Skip to main content

mabi_opcua/sdk/
session.rs

1//! OPC UA session management.
2//!
3//! Sessions represent authenticated connections from clients to the server.
4
5use std::sync::atomic::{AtomicU32, Ordering};
6
7use chrono::{DateTime, Duration, Utc};
8use dashmap::DashMap;
9use serde::{Deserialize, Serialize};
10use tokio::sync::broadcast;
11use tracing::{info, warn};
12
13use crate::types::NodeId;
14
15/// Session manager configuration.
16#[derive(Debug, Clone)]
17pub struct SessionManagerConfig {
18    /// Maximum number of sessions.
19    pub max_sessions: usize,
20    /// Session timeout in milliseconds.
21    pub session_timeout_ms: u64,
22    /// Maximum subscriptions per session.
23    pub max_subscriptions_per_session: usize,
24}
25
26impl Default for SessionManagerConfig {
27    fn default() -> Self {
28        Self {
29            max_sessions: 1000,
30            session_timeout_ms: 60_000, // 1 minute
31            max_subscriptions_per_session: 100,
32        }
33    }
34}
35
36/// Type alias for backward compatibility.
37pub type SessionConfig = SessionManagerConfig;
38
39/// A session instance.
40///
41/// Represents an active connection from a client.
42pub struct Session {
43    /// Session information.
44    info: SessionInfo,
45}
46
47impl Session {
48    /// Create a new session.
49    pub fn new(info: SessionInfo) -> Self {
50        Self { info }
51    }
52
53    /// Get the session ID.
54    pub fn id(&self) -> &NodeId {
55        &self.info.session_id
56    }
57
58    /// Get the session info.
59    pub fn info(&self) -> &SessionInfo {
60        &self.info
61    }
62
63    /// Get mutable session info.
64    pub fn info_mut(&mut self) -> &mut SessionInfo {
65        &mut self.info
66    }
67
68    /// Check if session is active.
69    pub fn is_active(&self) -> bool {
70        self.info.is_active()
71    }
72
73    /// Check if session is timed out.
74    pub fn is_timed_out(&self) -> bool {
75        self.info.is_timed_out()
76    }
77
78    /// Update last activity.
79    pub fn touch(&mut self) {
80        self.info.touch();
81    }
82
83    /// Activate the session.
84    pub fn activate(&mut self, user_identity: UserIdentity) {
85        self.info.activate(user_identity);
86    }
87
88    /// Close the session.
89    pub fn close(&mut self) {
90        self.info.close();
91    }
92}
93
94/// User identity type.
95#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
96pub enum UserIdentity {
97    /// Anonymous user.
98    Anonymous,
99    /// Username/password authentication.
100    UserName {
101        username: String,
102        // Note: Password is not stored
103    },
104    /// X.509 certificate authentication.
105    Certificate { thumbprint: String, subject: String },
106    /// Issued token (e.g., JWT).
107    IssuedToken { token_type: String },
108}
109
110impl Default for UserIdentity {
111    fn default() -> Self {
112        Self::Anonymous
113    }
114}
115
116/// Session state.
117#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
118pub enum SessionState {
119    /// Session created but not activated.
120    Created,
121    /// Session is active.
122    Active,
123    /// Session is closed.
124    Closed,
125    /// Session timed out.
126    TimedOut,
127}
128
129/// Session information.
130#[derive(Debug, Clone, Serialize, Deserialize)]
131pub struct SessionInfo {
132    /// Session ID.
133    pub session_id: NodeId,
134    /// Authentication token.
135    pub authentication_token: NodeId,
136    /// Session name.
137    pub session_name: String,
138    /// Client description.
139    pub client_description: String,
140    /// Server URI.
141    pub server_uri: String,
142    /// Endpoint URL.
143    pub endpoint_url: String,
144    /// Security policy URI.
145    pub security_policy_uri: String,
146    /// Security mode.
147    pub security_mode: String,
148    /// User identity.
149    pub user_identity: UserIdentity,
150    /// Session state.
151    pub state: SessionState,
152    /// Creation time.
153    pub created_at: DateTime<Utc>,
154    /// Last activity time.
155    pub last_activity: DateTime<Utc>,
156    /// Timeout in milliseconds.
157    pub timeout_ms: u64,
158    /// Subscription IDs.
159    pub subscriptions: Vec<u32>,
160    /// Maximum response message size.
161    pub max_response_message_size: u32,
162}
163
164impl SessionInfo {
165    /// Create a new session info.
166    pub fn new(session_id: NodeId, session_name: impl Into<String>, timeout_ms: u64) -> Self {
167        let now = Utc::now();
168        Self {
169            session_id: session_id.clone(),
170            authentication_token: NodeId::next_numeric(0),
171            session_name: session_name.into(),
172            client_description: String::new(),
173            server_uri: String::new(),
174            endpoint_url: String::new(),
175            security_policy_uri: String::new(),
176            security_mode: String::new(),
177            user_identity: UserIdentity::Anonymous,
178            state: SessionState::Created,
179            created_at: now,
180            last_activity: now,
181            timeout_ms,
182            subscriptions: Vec::new(),
183            max_response_message_size: 0,
184        }
185    }
186
187    /// Check if session is timed out.
188    pub fn is_timed_out(&self) -> bool {
189        let timeout = Duration::milliseconds(self.timeout_ms as i64);
190        Utc::now() - self.last_activity > timeout
191    }
192
193    /// Check if session is active.
194    pub fn is_active(&self) -> bool {
195        self.state == SessionState::Active && !self.is_timed_out()
196    }
197
198    /// Update last activity time.
199    pub fn touch(&mut self) {
200        self.last_activity = Utc::now();
201    }
202
203    /// Activate the session.
204    pub fn activate(&mut self, user_identity: UserIdentity) {
205        self.state = SessionState::Active;
206        self.user_identity = user_identity;
207        self.touch();
208    }
209
210    /// Close the session.
211    pub fn close(&mut self) {
212        self.state = SessionState::Closed;
213    }
214
215    /// Add a subscription.
216    pub fn add_subscription(&mut self, subscription_id: u32) {
217        if !self.subscriptions.contains(&subscription_id) {
218            self.subscriptions.push(subscription_id);
219        }
220    }
221
222    /// Remove a subscription.
223    pub fn remove_subscription(&mut self, subscription_id: u32) {
224        self.subscriptions.retain(|&id| id != subscription_id);
225    }
226}
227
228/// Session event.
229#[derive(Debug, Clone)]
230pub enum SessionEvent {
231    /// Session created.
232    Created { session_id: NodeId },
233    /// Session activated.
234    Activated {
235        session_id: NodeId,
236        user: UserIdentity,
237    },
238    /// Session closed.
239    Closed { session_id: NodeId },
240    /// Session timed out.
241    TimedOut { session_id: NodeId },
242}
243
244/// Session manager.
245///
246/// Manages all active sessions and their lifecycle.
247pub struct SessionManager {
248    config: SessionManagerConfig,
249    sessions: DashMap<NodeId, SessionInfo>,
250    session_counter: AtomicU32,
251    event_tx: broadcast::Sender<SessionEvent>,
252}
253
254impl SessionManager {
255    /// Create a new session manager with default config.
256    pub fn new() -> Self {
257        Self::with_config(SessionManagerConfig::default())
258    }
259
260    /// Create a new session manager with config.
261    pub fn with_config(config: SessionManagerConfig) -> Self {
262        let (event_tx, _) = broadcast::channel(256);
263
264        Self {
265            config,
266            sessions: DashMap::new(),
267            session_counter: AtomicU32::new(1),
268            event_tx,
269        }
270    }
271
272    /// Create a new session.
273    pub fn create_session(
274        &self,
275        session_name: impl Into<String>,
276    ) -> Result<SessionInfo, SessionError> {
277        if self.sessions.len() >= self.config.max_sessions {
278            return Err(SessionError::MaxSessionsReached);
279        }
280
281        let session_id = self.next_session_id();
282        let session = SessionInfo::new(
283            session_id.clone(),
284            session_name,
285            self.config.session_timeout_ms,
286        );
287
288        self.sessions.insert(session_id.clone(), session.clone());
289
290        info!(session_id = %session_id, "Session created");
291        let _ = self.event_tx.send(SessionEvent::Created { session_id });
292
293        Ok(session)
294    }
295
296    /// Activate a session.
297    pub fn activate_session(
298        &self,
299        session_id: &NodeId,
300        user_identity: UserIdentity,
301    ) -> Result<(), SessionError> {
302        let mut session = self
303            .sessions
304            .get_mut(session_id)
305            .ok_or(SessionError::SessionNotFound)?;
306
307        if session.state != SessionState::Created {
308            return Err(SessionError::InvalidState);
309        }
310
311        session.activate(user_identity.clone());
312
313        info!(session_id = %session_id, "Session activated");
314        let _ = self.event_tx.send(SessionEvent::Activated {
315            session_id: session_id.clone(),
316            user: user_identity,
317        });
318
319        Ok(())
320    }
321
322    /// Close a session.
323    pub fn close_session(&self, session_id: &NodeId) -> Result<(), SessionError> {
324        if let Some(mut session) = self.sessions.get_mut(session_id) {
325            session.close();
326            drop(session);
327
328            // Remove after updating state
329            self.sessions.remove(session_id);
330
331            info!(session_id = %session_id, "Session closed");
332            let _ = self.event_tx.send(SessionEvent::Closed {
333                session_id: session_id.clone(),
334            });
335
336            Ok(())
337        } else {
338            Err(SessionError::SessionNotFound)
339        }
340    }
341
342    /// Get a session.
343    pub fn get_session(&self, session_id: &NodeId) -> Option<SessionInfo> {
344        self.sessions.get(session_id).map(|s| s.clone())
345    }
346
347    /// Get session by authentication token.
348    pub fn get_session_by_token(&self, auth_token: &NodeId) -> Option<SessionInfo> {
349        self.sessions
350            .iter()
351            .find(|e| &e.authentication_token == auth_token)
352            .map(|e| e.clone())
353    }
354
355    /// Update session activity.
356    pub fn touch_session(&self, session_id: &NodeId) {
357        if let Some(mut session) = self.sessions.get_mut(session_id) {
358            session.touch();
359        }
360    }
361
362    /// Add subscription to session.
363    pub fn add_subscription(
364        &self,
365        session_id: &NodeId,
366        subscription_id: u32,
367    ) -> Result<(), SessionError> {
368        let mut session = self
369            .sessions
370            .get_mut(session_id)
371            .ok_or(SessionError::SessionNotFound)?;
372
373        if session.subscriptions.len() >= self.config.max_subscriptions_per_session {
374            return Err(SessionError::TooManySubscriptions);
375        }
376
377        session.add_subscription(subscription_id);
378        Ok(())
379    }
380
381    /// Remove subscription from session.
382    pub fn remove_subscription(&self, session_id: &NodeId, subscription_id: u32) {
383        if let Some(mut session) = self.sessions.get_mut(session_id) {
384            session.remove_subscription(subscription_id);
385        }
386    }
387
388    /// Cleanup expired sessions.
389    pub fn cleanup_expired(&self) {
390        let expired: Vec<NodeId> = self
391            .sessions
392            .iter()
393            .filter(|e| e.is_timed_out())
394            .map(|e| e.session_id.clone())
395            .collect();
396
397        for session_id in expired {
398            if let Some((_, session)) = self.sessions.remove(&session_id) {
399                warn!(session_id = %session_id, "Session timed out");
400                let _ = self.event_tx.send(SessionEvent::TimedOut {
401                    session_id: session.session_id,
402                });
403            }
404        }
405    }
406
407    /// Get the number of active sessions.
408    pub fn session_count(&self) -> usize {
409        self.sessions.len()
410    }
411
412    /// Get all session IDs.
413    pub fn session_ids(&self) -> Vec<NodeId> {
414        self.sessions.iter().map(|e| e.key().clone()).collect()
415    }
416
417    /// Subscribe to session events.
418    pub fn subscribe_events(&self) -> broadcast::Receiver<SessionEvent> {
419        self.event_tx.subscribe()
420    }
421
422    /// Generate next session ID.
423    fn next_session_id(&self) -> NodeId {
424        let id = self.session_counter.fetch_add(1, Ordering::SeqCst);
425        NodeId::numeric(1, id)
426    }
427}
428
429impl Default for SessionManager {
430    fn default() -> Self {
431        Self::new()
432    }
433}
434
435/// Session error types.
436#[derive(Debug, Clone, thiserror::Error)]
437pub enum SessionError {
438    #[error("Maximum sessions reached")]
439    MaxSessionsReached,
440    #[error("Session not found")]
441    SessionNotFound,
442    #[error("Invalid session state")]
443    InvalidState,
444    #[error("Too many subscriptions")]
445    TooManySubscriptions,
446    #[error("Session timed out")]
447    TimedOut,
448    #[error("Not authorized")]
449    NotAuthorized,
450}
451
452#[cfg(test)]
453mod tests {
454    use super::*;
455
456    #[test]
457    fn test_create_session() {
458        let manager = SessionManager::default();
459
460        let session = manager.create_session("TestSession").unwrap();
461        assert_eq!(session.state, SessionState::Created);
462        assert_eq!(manager.session_count(), 1);
463    }
464
465    #[test]
466    fn test_activate_session() {
467        let manager = SessionManager::default();
468
469        let session = manager.create_session("TestSession").unwrap();
470        manager
471            .activate_session(&session.session_id, UserIdentity::Anonymous)
472            .unwrap();
473
474        let updated = manager.get_session(&session.session_id).unwrap();
475        assert_eq!(updated.state, SessionState::Active);
476    }
477
478    #[test]
479    fn test_close_session() {
480        let manager = SessionManager::default();
481
482        let session = manager.create_session("TestSession").unwrap();
483        manager.close_session(&session.session_id).unwrap();
484
485        assert!(manager.get_session(&session.session_id).is_none());
486        assert_eq!(manager.session_count(), 0);
487    }
488
489    #[test]
490    fn test_max_sessions() {
491        let manager = SessionManager::with_config(SessionManagerConfig {
492            max_sessions: 2,
493            ..Default::default()
494        });
495
496        manager.create_session("Session1").unwrap();
497        manager.create_session("Session2").unwrap();
498
499        let result = manager.create_session("Session3");
500        assert!(matches!(result, Err(SessionError::MaxSessionsReached)));
501    }
502
503    #[test]
504    fn test_session_subscriptions() {
505        let manager = SessionManager::default();
506
507        let session = manager.create_session("TestSession").unwrap();
508        manager.add_subscription(&session.session_id, 1).unwrap();
509        manager.add_subscription(&session.session_id, 2).unwrap();
510
511        let updated = manager.get_session(&session.session_id).unwrap();
512        assert_eq!(updated.subscriptions.len(), 2);
513
514        manager.remove_subscription(&session.session_id, 1);
515
516        let updated = manager.get_session(&session.session_id).unwrap();
517        assert_eq!(updated.subscriptions.len(), 1);
518    }
519}