Skip to main content

clasp_router/
session.rs

1//! Session management
2
3use bytes::Bytes;
4use clasp_core::{Action, Message, Scope, WelcomeMessage, PROTOCOL_VERSION};
5use clasp_transport::TransportSender;
6use parking_lot::RwLock;
7use std::collections::HashSet;
8use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
9use std::sync::Arc;
10use std::time::Instant;
11use uuid::Uuid;
12
13/// Session identifier
14pub type SessionId = String;
15
16/// Drop tracking configuration
17const DROP_NOTIFICATION_THRESHOLD: u32 = 100; // Drops before notification
18const DROP_WINDOW_SECONDS: u64 = 10; // Time window for counting drops
19const DROP_NOTIFICATION_COOLDOWN_SECONDS: u64 = 10; // Min time between notifications
20
21/// A connected client session
22pub struct Session {
23    /// Unique session ID
24    pub id: SessionId,
25    /// Client name
26    pub name: String,
27    /// Client features
28    pub features: Vec<String>,
29    /// Transport sender for this session
30    sender: Arc<dyn TransportSender>,
31    /// Active subscriptions (subscription IDs)
32    subscriptions: RwLock<HashSet<u32>>,
33    /// Session creation time
34    pub created_at: Instant,
35    /// Last activity time
36    pub last_activity: RwLock<Instant>,
37    /// Is authenticated
38    pub authenticated: bool,
39    /// Permission token (if any)
40    pub token: Option<String>,
41    /// Subject identifier from token (user, device, or service ID)
42    pub subject: Option<String>,
43    /// Scopes granted to this session
44    scopes: Vec<Scope>,
45    /// Messages received in the current second (for rate limiting)
46    messages_this_second: AtomicU32,
47    /// The second when the message count was last reset (Unix timestamp)
48    last_rate_limit_second: AtomicU64,
49    /// Dropped messages in the current window
50    drops_in_window: AtomicU32,
51    /// Start of the current drop counting window (Unix timestamp)
52    drop_window_start: AtomicU64,
53    /// Last time a drop notification was sent (Unix timestamp)
54    last_drop_notification: AtomicU64,
55    /// Total drops since session started
56    total_drops: AtomicU64,
57    /// Whether this session is a federation peer (advertised "federation" feature in HELLO)
58    #[cfg(feature = "federation")]
59    federation_peer: bool,
60    /// Federation peer's router ID (from HELLO name or DeclareNamespaces origin)
61    #[cfg(feature = "federation")]
62    federation_router_id: parking_lot::RwLock<Option<String>>,
63    /// Namespace patterns declared by this federation peer
64    #[cfg(feature = "federation")]
65    federation_namespaces: parking_lot::RwLock<Vec<String>>,
66}
67
68/// No-op transport sender for test sessions.
69#[doc(hidden)]
70struct StubSender;
71
72#[async_trait::async_trait]
73impl TransportSender for StubSender {
74    async fn send(&self, _data: Bytes) -> clasp_transport::Result<()> {
75        Ok(())
76    }
77    fn try_send(&self, _data: Bytes) -> clasp_transport::Result<()> {
78        Ok(())
79    }
80    fn is_connected(&self) -> bool {
81        false
82    }
83    async fn close(&self) -> clasp_transport::Result<()> {
84        Ok(())
85    }
86}
87
88impl Session {
89    /// Create a minimal session for unit tests. Uses a no-op transport sender.
90    #[doc(hidden)]
91    pub fn stub(subject: Option<String>) -> Self {
92        let mut s = Self::new(Arc::new(StubSender), "test-stub".to_string(), vec![]);
93        s.subject = subject;
94        s
95    }
96
97    /// Create a federation peer session for unit tests.
98    #[doc(hidden)]
99    #[cfg(feature = "federation")]
100    pub fn stub_federation(name: &str) -> Self {
101        Self::new(
102            Arc::new(StubSender),
103            name.to_string(),
104            vec!["federation".to_string()],
105        )
106    }
107
108    /// Create a new session
109    pub fn new(sender: Arc<dyn TransportSender>, name: String, features: Vec<String>) -> Self {
110        let now = Instant::now();
111        #[cfg(feature = "federation")]
112        let is_federation_peer = features.iter().any(|f| f == "federation");
113        Self {
114            id: Uuid::new_v4().to_string(),
115            name,
116            features,
117            sender,
118            subscriptions: RwLock::new(HashSet::new()),
119            created_at: now,
120            last_activity: RwLock::new(now),
121            authenticated: false,
122            token: None,
123            subject: None,
124            scopes: Vec::new(),
125            messages_this_second: AtomicU32::new(0),
126            last_rate_limit_second: AtomicU64::new(0),
127            drops_in_window: AtomicU32::new(0),
128            drop_window_start: AtomicU64::new(0),
129            last_drop_notification: AtomicU64::new(0),
130            total_drops: AtomicU64::new(0),
131            #[cfg(feature = "federation")]
132            federation_peer: is_federation_peer,
133            #[cfg(feature = "federation")]
134            federation_router_id: parking_lot::RwLock::new(None),
135            #[cfg(feature = "federation")]
136            federation_namespaces: parking_lot::RwLock::new(Vec::new()),
137        }
138    }
139
140    /// Set authentication info from a validated token
141    pub fn set_authenticated(
142        &mut self,
143        token: String,
144        subject: Option<String>,
145        scopes: Vec<Scope>,
146    ) {
147        self.authenticated = true;
148        self.token = Some(token);
149        self.subject = subject;
150        self.scopes = scopes;
151    }
152
153    /// Check if this session has permission for the given action on the given address
154    pub fn has_scope(&self, action: Action, address: &str) -> bool {
155        // Unauthenticated sessions in open mode have no scope restrictions
156        // (handled by router based on SecurityMode)
157        if self.scopes.is_empty() && !self.authenticated {
158            return true;
159        }
160        self.scopes
161            .iter()
162            .any(|scope| scope.allows(action, address))
163    }
164
165    /// Check if this session has an explicit read scope for the given address.
166    ///
167    /// Unlike `has_scope(Action::Read, ...)`, this does NOT match write scopes
168    /// (which normally imply read via `Action::Write.allows(Action::Read)`).
169    /// Use this for SUBSCRIBE checks to prevent write-only scopes from granting
170    /// subscription access to paths they should only write to.
171    pub fn has_strict_read_scope(&self, address: &str) -> bool {
172        if self.scopes.is_empty() && !self.authenticated {
173            return true;
174        }
175        self.scopes
176            .iter()
177            .any(|scope| scope.action() == Action::Read && scope.allows(Action::Read, address))
178    }
179
180    /// Get the scopes for this session
181    pub fn scopes(&self) -> &[Scope] {
182        &self.scopes
183    }
184
185    /// Send a message to this session
186    pub async fn send(&self, data: Bytes) -> Result<(), clasp_transport::TransportError> {
187        self.sender.send(data).await?;
188        *self.last_activity.write() = Instant::now();
189        Ok(())
190    }
191
192    /// Try to send a message without blocking (for broadcasts)
193    /// Returns Ok if sent or queued, Err if buffer is full
194    pub fn try_send(&self, data: Bytes) -> Result<(), clasp_transport::TransportError> {
195        self.sender.try_send(data)?;
196        *self.last_activity.write() = Instant::now();
197        Ok(())
198    }
199
200    /// Send a Clasp message
201    pub async fn send_message(&self, message: &Message) -> Result<(), clasp_core::Error> {
202        let data = clasp_core::codec::encode(message)?;
203        self.send(data)
204            .await
205            .map_err(|e| clasp_core::Error::ConnectionError(e.to_string()))?;
206        Ok(())
207    }
208
209    /// Create welcome message for this session
210    pub fn welcome_message(&self, server_name: &str, server_features: &[String]) -> Message {
211        Message::Welcome(WelcomeMessage {
212            version: PROTOCOL_VERSION,
213            session: self.id.clone(),
214            name: server_name.to_string(),
215            features: server_features.to_vec(),
216            time: clasp_core::time::now(),
217            token: None,
218        })
219    }
220
221    /// Add a subscription
222    pub fn add_subscription(&self, id: u32) {
223        self.subscriptions.write().insert(id);
224    }
225
226    /// Remove a subscription
227    pub fn remove_subscription(&self, id: u32) -> bool {
228        self.subscriptions.write().remove(&id)
229    }
230
231    /// Get all subscription IDs
232    pub fn subscriptions(&self) -> Vec<u32> {
233        self.subscriptions.read().iter().cloned().collect()
234    }
235
236    /// Check if connected
237    pub fn is_connected(&self) -> bool {
238        self.sender.is_connected()
239    }
240
241    /// Touch to update last activity
242    pub fn touch(&self) {
243        *self.last_activity.write() = Instant::now();
244    }
245
246    /// Get idle duration
247    pub fn idle_duration(&self) -> std::time::Duration {
248        self.last_activity.read().elapsed()
249    }
250
251    /// Check and increment rate limit counter
252    /// Returns true if within rate limit, false if exceeded
253    pub fn check_rate_limit(&self, max_per_second: u32) -> bool {
254        if max_per_second == 0 {
255            return true; // No rate limiting
256        }
257
258        let now = std::time::SystemTime::now()
259            .duration_since(std::time::UNIX_EPOCH)
260            .unwrap_or_default()
261            .as_secs();
262
263        let last_second = self.last_rate_limit_second.load(Ordering::Relaxed);
264
265        if now != last_second {
266            // New second, reset counter
267            self.messages_this_second.store(1, Ordering::Relaxed);
268            self.last_rate_limit_second.store(now, Ordering::Relaxed);
269            true
270        } else {
271            // Same second, increment and check
272            let count = self.messages_this_second.fetch_add(1, Ordering::Relaxed) + 1;
273            count <= max_per_second
274        }
275    }
276
277    /// Get current message count for this second
278    pub fn messages_per_second(&self) -> u32 {
279        self.messages_this_second.load(Ordering::Relaxed)
280    }
281
282    /// Record a dropped message and check if notification is needed.
283    /// Returns true if a drop notification should be sent to the client.
284    pub fn record_drop(&self) -> bool {
285        let now = std::time::SystemTime::now()
286            .duration_since(std::time::UNIX_EPOCH)
287            .unwrap_or_default()
288            .as_secs();
289
290        // Increment total drops
291        self.total_drops.fetch_add(1, Ordering::Relaxed);
292
293        // Check if we're in a new window
294        let window_start = self.drop_window_start.load(Ordering::Relaxed);
295        if now >= window_start + DROP_WINDOW_SECONDS {
296            // New window, reset counter
297            self.drops_in_window.store(1, Ordering::Relaxed);
298            self.drop_window_start.store(now, Ordering::Relaxed);
299            return false; // First drop in new window, don't notify yet
300        }
301
302        // Same window, increment and check threshold
303        let drops = self.drops_in_window.fetch_add(1, Ordering::Relaxed) + 1;
304        if drops >= DROP_NOTIFICATION_THRESHOLD {
305            // Check cooldown
306            let last_notification = self.last_drop_notification.load(Ordering::Relaxed);
307            if now >= last_notification + DROP_NOTIFICATION_COOLDOWN_SECONDS {
308                // Update notification time and signal to send notification
309                self.last_drop_notification.store(now, Ordering::Relaxed);
310                return true;
311            }
312        }
313
314        false
315    }
316
317    /// Get the total number of dropped messages for this session
318    pub fn total_drops(&self) -> u64 {
319        self.total_drops.load(Ordering::Relaxed)
320    }
321
322    /// Get the drops in the current window
323    pub fn drops_in_window(&self) -> u32 {
324        self.drops_in_window.load(Ordering::Relaxed)
325    }
326
327    /// Check if this session is a federation peer
328    #[cfg(feature = "federation")]
329    pub fn is_federation_peer(&self) -> bool {
330        self.federation_peer
331    }
332
333    /// Get the federation router ID (if set)
334    #[cfg(feature = "federation")]
335    pub fn federation_router_id(&self) -> Option<String> {
336        self.federation_router_id.read().clone()
337    }
338
339    /// Set the federation router ID
340    #[cfg(feature = "federation")]
341    pub fn set_federation_router_id(&self, id: String) {
342        *self.federation_router_id.write() = Some(id);
343    }
344
345    /// Get the federation namespace patterns
346    #[cfg(feature = "federation")]
347    pub fn federation_namespaces(&self) -> Vec<String> {
348        self.federation_namespaces.read().clone()
349    }
350
351    /// Set the federation namespace patterns
352    #[cfg(feature = "federation")]
353    pub fn set_federation_namespaces(&self, patterns: Vec<String>) {
354        *self.federation_namespaces.write() = patterns;
355    }
356}
357
358impl std::fmt::Debug for Session {
359    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
360        f.debug_struct("Session")
361            .field("id", &self.id)
362            .field("name", &self.name)
363            .field("features", &self.features)
364            .field("authenticated", &self.authenticated)
365            .field("subject", &self.subject)
366            .field("scopes", &self.scopes.len())
367            .finish()
368    }
369}