1use std::sync::atomic::{AtomicU32, Ordering};
6
7use chrono::{DateTime, Duration, Utc};
8use dashmap::DashMap;
9use serde::{Deserialize, Serialize};
10use tokio::sync::broadcast;
11use tracing::{info, warn};
12
13use crate::types::NodeId;
14
/// Tunable limits applied by [`SessionManager`].
#[derive(Debug, Clone)]
pub struct SessionManagerConfig {
    /// Number of stored sessions at which `create_session` starts refusing new ones.
    pub max_sessions: usize,

    /// Inactivity timeout, in milliseconds, handed to every newly created session.
    pub session_timeout_ms: u64,

    /// Number of subscriptions a single session may hold before further additions are refused.
    pub max_subscriptions_per_session: usize,
}
25
26impl Default for SessionManagerConfig {
27 fn default() -> Self {
28 Self {
29 max_sessions: 1000,
30 session_timeout_ms: 60_000, max_subscriptions_per_session: 100,
32 }
33 }
34}
35
36pub type SessionConfig = SessionManagerConfig;
38
39pub struct Session {
43 info: SessionInfo,
45}
46
47impl Session {
48 pub fn new(info: SessionInfo) -> Self {
50 Self { info }
51 }
52
53 pub fn id(&self) -> &NodeId {
55 &self.info.session_id
56 }
57
58 pub fn info(&self) -> &SessionInfo {
60 &self.info
61 }
62
63 pub fn info_mut(&mut self) -> &mut SessionInfo {
65 &mut self.info
66 }
67
68 pub fn is_active(&self) -> bool {
70 self.info.is_active()
71 }
72
73 pub fn is_timed_out(&self) -> bool {
75 self.info.is_timed_out()
76 }
77
78 pub fn touch(&mut self) {
80 self.info.touch();
81 }
82
83 pub fn activate(&mut self, user_identity: UserIdentity) {
85 self.info.activate(user_identity);
86 }
87
88 pub fn close(&mut self) {
90 self.info.close();
91 }
92}
93
94#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
96pub enum UserIdentity {
97 Anonymous,
99 UserName {
101 username: String,
102 },
104 Certificate { thumbprint: String, subject: String },
106 IssuedToken { token_type: String },
108}
109
110impl Default for UserIdentity {
111 fn default() -> Self {
112 Self::Anonymous
113 }
114}
115
116#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
118pub enum SessionState {
119 Created,
121 Active,
123 Closed,
125 TimedOut,
127}
128
129#[derive(Debug, Clone, Serialize, Deserialize)]
131pub struct SessionInfo {
132 pub session_id: NodeId,
134 pub authentication_token: NodeId,
136 pub session_name: String,
138 pub client_description: String,
140 pub server_uri: String,
142 pub endpoint_url: String,
144 pub security_policy_uri: String,
146 pub security_mode: String,
148 pub user_identity: UserIdentity,
150 pub state: SessionState,
152 pub created_at: DateTime<Utc>,
154 pub last_activity: DateTime<Utc>,
156 pub timeout_ms: u64,
158 pub subscriptions: Vec<u32>,
160 pub max_response_message_size: u32,
162}
163
164impl SessionInfo {
165 pub fn new(session_id: NodeId, session_name: impl Into<String>, timeout_ms: u64) -> Self {
167 let now = Utc::now();
168 Self {
169 session_id: session_id.clone(),
170 authentication_token: NodeId::next_numeric(0),
171 session_name: session_name.into(),
172 client_description: String::new(),
173 server_uri: String::new(),
174 endpoint_url: String::new(),
175 security_policy_uri: String::new(),
176 security_mode: String::new(),
177 user_identity: UserIdentity::Anonymous,
178 state: SessionState::Created,
179 created_at: now,
180 last_activity: now,
181 timeout_ms,
182 subscriptions: Vec::new(),
183 max_response_message_size: 0,
184 }
185 }
186
187 pub fn is_timed_out(&self) -> bool {
189 let timeout = Duration::milliseconds(self.timeout_ms as i64);
190 Utc::now() - self.last_activity > timeout
191 }
192
193 pub fn is_active(&self) -> bool {
195 self.state == SessionState::Active && !self.is_timed_out()
196 }
197
198 pub fn touch(&mut self) {
200 self.last_activity = Utc::now();
201 }
202
203 pub fn activate(&mut self, user_identity: UserIdentity) {
205 self.state = SessionState::Active;
206 self.user_identity = user_identity;
207 self.touch();
208 }
209
210 pub fn close(&mut self) {
212 self.state = SessionState::Closed;
213 }
214
215 pub fn add_subscription(&mut self, subscription_id: u32) {
217 if !self.subscriptions.contains(&subscription_id) {
218 self.subscriptions.push(subscription_id);
219 }
220 }
221
222 pub fn remove_subscription(&mut self, subscription_id: u32) {
224 self.subscriptions.retain(|&id| id != subscription_id);
225 }
226}
227
228#[derive(Debug, Clone)]
230pub enum SessionEvent {
231 Created { session_id: NodeId },
233 Activated {
235 session_id: NodeId,
236 user: UserIdentity,
237 },
238 Closed { session_id: NodeId },
240 TimedOut { session_id: NodeId },
242}
243
244pub struct SessionManager {
248 config: SessionManagerConfig,
249 sessions: DashMap<NodeId, SessionInfo>,
250 session_counter: AtomicU32,
251 event_tx: broadcast::Sender<SessionEvent>,
252}
253
254impl SessionManager {
255 pub fn new() -> Self {
257 Self::with_config(SessionManagerConfig::default())
258 }
259
260 pub fn with_config(config: SessionManagerConfig) -> Self {
262 let (event_tx, _) = broadcast::channel(256);
263
264 Self {
265 config,
266 sessions: DashMap::new(),
267 session_counter: AtomicU32::new(1),
268 event_tx,
269 }
270 }
271
272 pub fn create_session(
274 &self,
275 session_name: impl Into<String>,
276 ) -> Result<SessionInfo, SessionError> {
277 if self.sessions.len() >= self.config.max_sessions {
278 return Err(SessionError::MaxSessionsReached);
279 }
280
281 let session_id = self.next_session_id();
282 let session = SessionInfo::new(
283 session_id.clone(),
284 session_name,
285 self.config.session_timeout_ms,
286 );
287
288 self.sessions.insert(session_id.clone(), session.clone());
289
290 info!(session_id = %session_id, "Session created");
291 let _ = self.event_tx.send(SessionEvent::Created { session_id });
292
293 Ok(session)
294 }
295
296 pub fn activate_session(
298 &self,
299 session_id: &NodeId,
300 user_identity: UserIdentity,
301 ) -> Result<(), SessionError> {
302 let mut session = self
303 .sessions
304 .get_mut(session_id)
305 .ok_or(SessionError::SessionNotFound)?;
306
307 if session.state != SessionState::Created {
308 return Err(SessionError::InvalidState);
309 }
310
311 session.activate(user_identity.clone());
312
313 info!(session_id = %session_id, "Session activated");
314 let _ = self.event_tx.send(SessionEvent::Activated {
315 session_id: session_id.clone(),
316 user: user_identity,
317 });
318
319 Ok(())
320 }
321
322 pub fn close_session(&self, session_id: &NodeId) -> Result<(), SessionError> {
324 if let Some(mut session) = self.sessions.get_mut(session_id) {
325 session.close();
326 drop(session);
327
328 self.sessions.remove(session_id);
330
331 info!(session_id = %session_id, "Session closed");
332 let _ = self.event_tx.send(SessionEvent::Closed {
333 session_id: session_id.clone(),
334 });
335
336 Ok(())
337 } else {
338 Err(SessionError::SessionNotFound)
339 }
340 }
341
342 pub fn get_session(&self, session_id: &NodeId) -> Option<SessionInfo> {
344 self.sessions.get(session_id).map(|s| s.clone())
345 }
346
347 pub fn get_session_by_token(&self, auth_token: &NodeId) -> Option<SessionInfo> {
349 self.sessions
350 .iter()
351 .find(|e| &e.authentication_token == auth_token)
352 .map(|e| e.clone())
353 }
354
355 pub fn touch_session(&self, session_id: &NodeId) {
357 if let Some(mut session) = self.sessions.get_mut(session_id) {
358 session.touch();
359 }
360 }
361
362 pub fn add_subscription(
364 &self,
365 session_id: &NodeId,
366 subscription_id: u32,
367 ) -> Result<(), SessionError> {
368 let mut session = self
369 .sessions
370 .get_mut(session_id)
371 .ok_or(SessionError::SessionNotFound)?;
372
373 if session.subscriptions.len() >= self.config.max_subscriptions_per_session {
374 return Err(SessionError::TooManySubscriptions);
375 }
376
377 session.add_subscription(subscription_id);
378 Ok(())
379 }
380
381 pub fn remove_subscription(&self, session_id: &NodeId, subscription_id: u32) {
383 if let Some(mut session) = self.sessions.get_mut(session_id) {
384 session.remove_subscription(subscription_id);
385 }
386 }
387
388 pub fn cleanup_expired(&self) {
390 let expired: Vec<NodeId> = self
391 .sessions
392 .iter()
393 .filter(|e| e.is_timed_out())
394 .map(|e| e.session_id.clone())
395 .collect();
396
397 for session_id in expired {
398 if let Some((_, session)) = self.sessions.remove(&session_id) {
399 warn!(session_id = %session_id, "Session timed out");
400 let _ = self.event_tx.send(SessionEvent::TimedOut {
401 session_id: session.session_id,
402 });
403 }
404 }
405 }
406
407 pub fn session_count(&self) -> usize {
409 self.sessions.len()
410 }
411
412 pub fn session_ids(&self) -> Vec<NodeId> {
414 self.sessions.iter().map(|e| e.key().clone()).collect()
415 }
416
417 pub fn subscribe_events(&self) -> broadcast::Receiver<SessionEvent> {
419 self.event_tx.subscribe()
420 }
421
422 fn next_session_id(&self) -> NodeId {
424 let id = self.session_counter.fetch_add(1, Ordering::SeqCst);
425 NodeId::numeric(1, id)
426 }
427}
428
429impl Default for SessionManager {
430 fn default() -> Self {
431 Self::new()
432 }
433}
434
435#[derive(Debug, Clone, thiserror::Error)]
437pub enum SessionError {
438 #[error("Maximum sessions reached")]
439 MaxSessionsReached,
440 #[error("Session not found")]
441 SessionNotFound,
442 #[error("Invalid session state")]
443 InvalidState,
444 #[error("Too many subscriptions")]
445 TooManySubscriptions,
446 #[error("Session timed out")]
447 TimedOut,
448 #[error("Not authorized")]
449 NotAuthorized,
450}
451
#[cfg(test)]
mod tests {
    use super::*;

    /// A new session starts in `Created` and is counted by the manager.
    #[test]
    fn test_create_session() {
        let mgr = SessionManager::default();
        let created = mgr.create_session("TestSession").unwrap();

        assert_eq!(created.state, SessionState::Created);
        assert_eq!(mgr.session_count(), 1);
    }

    /// Activating a fresh session switches its stored state to `Active`.
    #[test]
    fn test_activate_session() {
        let mgr = SessionManager::default();
        let id = mgr.create_session("TestSession").unwrap().session_id;

        mgr.activate_session(&id, UserIdentity::Anonymous).unwrap();

        let stored = mgr.get_session(&id).unwrap();
        assert_eq!(stored.state, SessionState::Active);
    }

    /// Closing a session removes it from the manager.
    #[test]
    fn test_close_session() {
        let mgr = SessionManager::default();
        let id = mgr.create_session("TestSession").unwrap().session_id;

        mgr.close_session(&id).unwrap();

        assert!(mgr.get_session(&id).is_none());
        assert_eq!(mgr.session_count(), 0);
    }

    /// Once `max_sessions` sessions exist, the next creation is rejected.
    #[test]
    fn test_max_sessions() {
        let limits = SessionManagerConfig {
            max_sessions: 2,
            ..Default::default()
        };
        let mgr = SessionManager::with_config(limits);

        for name in ["Session1", "Session2"] {
            mgr.create_session(name).unwrap();
        }

        let overflow = mgr.create_session("Session3");
        assert!(matches!(overflow, Err(SessionError::MaxSessionsReached)));
    }

    /// Subscriptions can be attached to and detached from a stored session.
    #[test]
    fn test_session_subscriptions() {
        let mgr = SessionManager::default();
        let id = mgr.create_session("TestSession").unwrap().session_id;

        for subscription_id in [1, 2] {
            mgr.add_subscription(&id, subscription_id).unwrap();
        }
        let after_add = mgr.get_session(&id).unwrap();
        assert_eq!(after_add.subscriptions.len(), 2);

        mgr.remove_subscription(&id, 1);
        let after_remove = mgr.get_session(&id).unwrap();
        assert_eq!(after_remove.subscriptions.len(), 1);
    }
}