1use bytes::Bytes;
4use clasp_core::{Action, Message, Scope, WelcomeMessage, PROTOCOL_VERSION};
5use clasp_transport::TransportSender;
6use parking_lot::RwLock;
7use std::collections::HashSet;
8use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
9use std::sync::Arc;
10use std::time::Instant;
11use uuid::Uuid;
12
13pub type SessionId = String;
15
16const DROP_NOTIFICATION_THRESHOLD: u32 = 100; const DROP_WINDOW_SECONDS: u64 = 10; const DROP_NOTIFICATION_COOLDOWN_SECONDS: u64 = 10; pub struct Session {
23 pub id: SessionId,
25 pub name: String,
27 pub features: Vec<String>,
29 sender: Arc<dyn TransportSender>,
31 subscriptions: RwLock<HashSet<u32>>,
33 pub created_at: Instant,
35 pub last_activity: RwLock<Instant>,
37 pub authenticated: bool,
39 pub token: Option<String>,
41 pub subject: Option<String>,
43 scopes: Vec<Scope>,
45 messages_this_second: AtomicU32,
47 last_rate_limit_second: AtomicU64,
49 drops_in_window: AtomicU32,
51 drop_window_start: AtomicU64,
53 last_drop_notification: AtomicU64,
55 total_drops: AtomicU64,
57 #[cfg(feature = "federation")]
59 federation_peer: bool,
60 #[cfg(feature = "federation")]
62 federation_router_id: parking_lot::RwLock<Option<String>>,
63 #[cfg(feature = "federation")]
65 federation_namespaces: parking_lot::RwLock<Vec<String>>,
66}
67
/// Placeholder transport sender handed to the stub `Session` constructors.
/// Its `TransportSender` impl accepts every call without doing any work and
/// reports the connection as not connected.
#[doc(hidden)]
struct StubSender;
71
72#[async_trait::async_trait]
73impl TransportSender for StubSender {
74 async fn send(&self, _data: Bytes) -> clasp_transport::Result<()> {
75 Ok(())
76 }
77 fn try_send(&self, _data: Bytes) -> clasp_transport::Result<()> {
78 Ok(())
79 }
80 fn is_connected(&self) -> bool {
81 false
82 }
83 async fn close(&self) -> clasp_transport::Result<()> {
84 Ok(())
85 }
86}
87
88impl Session {
89 #[doc(hidden)]
91 pub fn stub(subject: Option<String>) -> Self {
92 let mut s = Self::new(Arc::new(StubSender), "test-stub".to_string(), vec![]);
93 s.subject = subject;
94 s
95 }
96
/// Builds a session around a `StubSender` with the given name and a feature
/// list containing only `"federation"`.
#[doc(hidden)]
#[cfg(feature = "federation")]
pub fn stub_federation(name: &str) -> Self {
    let features = vec![String::from("federation")];
    Self::new(Arc::new(StubSender), name.to_owned(), features)
}
107
108 pub fn new(sender: Arc<dyn TransportSender>, name: String, features: Vec<String>) -> Self {
110 let now = Instant::now();
111 #[cfg(feature = "federation")]
112 let is_federation_peer = features.iter().any(|f| f == "federation");
113 Self {
114 id: Uuid::new_v4().to_string(),
115 name,
116 features,
117 sender,
118 subscriptions: RwLock::new(HashSet::new()),
119 created_at: now,
120 last_activity: RwLock::new(now),
121 authenticated: false,
122 token: None,
123 subject: None,
124 scopes: Vec::new(),
125 messages_this_second: AtomicU32::new(0),
126 last_rate_limit_second: AtomicU64::new(0),
127 drops_in_window: AtomicU32::new(0),
128 drop_window_start: AtomicU64::new(0),
129 last_drop_notification: AtomicU64::new(0),
130 total_drops: AtomicU64::new(0),
131 #[cfg(feature = "federation")]
132 federation_peer: is_federation_peer,
133 #[cfg(feature = "federation")]
134 federation_router_id: parking_lot::RwLock::new(None),
135 #[cfg(feature = "federation")]
136 federation_namespaces: parking_lot::RwLock::new(Vec::new()),
137 }
138 }
139
140 pub fn set_authenticated(
142 &mut self,
143 token: String,
144 subject: Option<String>,
145 scopes: Vec<Scope>,
146 ) {
147 self.authenticated = true;
148 self.token = Some(token);
149 self.subject = subject;
150 self.scopes = scopes;
151 }
152
153 pub fn has_scope(&self, action: Action, address: &str) -> bool {
155 if self.scopes.is_empty() && !self.authenticated {
158 return true;
159 }
160 self.scopes
161 .iter()
162 .any(|scope| scope.allows(action, address))
163 }
164
165 pub fn has_strict_read_scope(&self, address: &str) -> bool {
172 if self.scopes.is_empty() && !self.authenticated {
173 return true;
174 }
175 self.scopes
176 .iter()
177 .any(|scope| scope.action() == Action::Read && scope.allows(Action::Read, address))
178 }
179
180 pub fn scopes(&self) -> &[Scope] {
182 &self.scopes
183 }
184
185 pub async fn send(&self, data: Bytes) -> Result<(), clasp_transport::TransportError> {
187 self.sender.send(data).await?;
188 *self.last_activity.write() = Instant::now();
189 Ok(())
190 }
191
192 pub fn try_send(&self, data: Bytes) -> Result<(), clasp_transport::TransportError> {
195 self.sender.try_send(data)?;
196 *self.last_activity.write() = Instant::now();
197 Ok(())
198 }
199
200 pub async fn send_message(&self, message: &Message) -> Result<(), clasp_core::Error> {
202 let data = clasp_core::codec::encode(message)?;
203 self.send(data)
204 .await
205 .map_err(|e| clasp_core::Error::ConnectionError(e.to_string()))?;
206 Ok(())
207 }
208
209 pub fn welcome_message(&self, server_name: &str, server_features: &[String]) -> Message {
211 Message::Welcome(WelcomeMessage {
212 version: PROTOCOL_VERSION,
213 session: self.id.clone(),
214 name: server_name.to_string(),
215 features: server_features.to_vec(),
216 time: clasp_core::time::now(),
217 token: None,
218 })
219 }
220
221 pub fn add_subscription(&self, id: u32) {
223 self.subscriptions.write().insert(id);
224 }
225
226 pub fn remove_subscription(&self, id: u32) -> bool {
228 self.subscriptions.write().remove(&id)
229 }
230
231 pub fn subscriptions(&self) -> Vec<u32> {
233 self.subscriptions.read().iter().cloned().collect()
234 }
235
236 pub fn is_connected(&self) -> bool {
238 self.sender.is_connected()
239 }
240
241 pub fn touch(&self) {
243 *self.last_activity.write() = Instant::now();
244 }
245
246 pub fn idle_duration(&self) -> std::time::Duration {
248 self.last_activity.read().elapsed()
249 }
250
251 pub fn check_rate_limit(&self, max_per_second: u32) -> bool {
254 if max_per_second == 0 {
255 return true; }
257
258 let now = std::time::SystemTime::now()
259 .duration_since(std::time::UNIX_EPOCH)
260 .unwrap_or_default()
261 .as_secs();
262
263 let last_second = self.last_rate_limit_second.load(Ordering::Relaxed);
264
265 if now != last_second {
266 self.messages_this_second.store(1, Ordering::Relaxed);
268 self.last_rate_limit_second.store(now, Ordering::Relaxed);
269 true
270 } else {
271 let count = self.messages_this_second.fetch_add(1, Ordering::Relaxed) + 1;
273 count <= max_per_second
274 }
275 }
276
277 pub fn messages_per_second(&self) -> u32 {
279 self.messages_this_second.load(Ordering::Relaxed)
280 }
281
282 pub fn record_drop(&self) -> bool {
285 let now = std::time::SystemTime::now()
286 .duration_since(std::time::UNIX_EPOCH)
287 .unwrap_or_default()
288 .as_secs();
289
290 self.total_drops.fetch_add(1, Ordering::Relaxed);
292
293 let window_start = self.drop_window_start.load(Ordering::Relaxed);
295 if now >= window_start + DROP_WINDOW_SECONDS {
296 self.drops_in_window.store(1, Ordering::Relaxed);
298 self.drop_window_start.store(now, Ordering::Relaxed);
299 return false; }
301
302 let drops = self.drops_in_window.fetch_add(1, Ordering::Relaxed) + 1;
304 if drops >= DROP_NOTIFICATION_THRESHOLD {
305 let last_notification = self.last_drop_notification.load(Ordering::Relaxed);
307 if now >= last_notification + DROP_NOTIFICATION_COOLDOWN_SECONDS {
308 self.last_drop_notification.store(now, Ordering::Relaxed);
310 return true;
311 }
312 }
313
314 false
315 }
316
317 pub fn total_drops(&self) -> u64 {
319 self.total_drops.load(Ordering::Relaxed)
320 }
321
322 pub fn drops_in_window(&self) -> u32 {
324 self.drops_in_window.load(Ordering::Relaxed)
325 }
326
/// Whether this session was created with `"federation"` among its features.
#[cfg(feature = "federation")]
pub fn is_federation_peer(&self) -> bool {
    let Self { federation_peer, .. } = self;
    *federation_peer
}
332
/// Returns a copy of the stored federation router id, or `None` if none has
/// been set.
#[cfg(feature = "federation")]
pub fn federation_router_id(&self) -> Option<String> {
    let stored = self.federation_router_id.read();
    stored.clone()
}
338
/// Stores `id` as the federation router id, replacing any previous value.
#[cfg(feature = "federation")]
pub fn set_federation_router_id(&self, id: String) {
    let replacement = Some(id);
    *self.federation_router_id.write() = replacement;
}
344
/// Returns a copy of the stored federation namespace patterns.
#[cfg(feature = "federation")]
pub fn federation_namespaces(&self) -> Vec<String> {
    let stored = self.federation_namespaces.read();
    stored.clone()
}
350
/// Replaces the stored federation namespace patterns with `patterns`.
#[cfg(feature = "federation")]
pub fn set_federation_namespaces(&self, patterns: Vec<String>) {
    let mut stored = self.federation_namespaces.write();
    *stored = patterns;
}
356}
357
358impl std::fmt::Debug for Session {
359 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
360 f.debug_struct("Session")
361 .field("id", &self.id)
362 .field("name", &self.name)
363 .field("features", &self.features)
364 .field("authenticated", &self.authenticated)
365 .field("subject", &self.subject)
366 .field("scopes", &self.scopes.len())
367 .finish()
368 }
369}