1use crate::connection::auth::{AuthResponseData, AuthenticationHandler};
6use crate::error::ConnectionError;
7use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
8use std::sync::Arc;
9use std::time::{Duration, Instant};
10use tokio::sync::RwLock;
11
/// Tunable settings attached to a [`Session`] and to the [`SessionManager`].
///
/// Within this file only `idle_timeout` is read (by `Session::is_idle_timeout`);
/// the remaining fields are carried for callers.
// NOTE(review): the other fields are presumably consumed by the connection /
// query layers outside this file — confirm against their users.
#[derive(Debug, Clone)]
pub struct SessionConfig {
    /// Inactivity period after which `Session::is_idle_timeout` reports `true`.
    pub idle_timeout: Duration,

    /// Whether keepalive is enabled.
    pub enable_keepalive: bool,

    /// Interval associated with keepalive.
    pub keepalive_interval: Duration,

    /// Upper bound on retries.
    pub max_retries: u32,

    /// Whether auto-commit is enabled.
    pub auto_commit: bool,

    /// Default fetch size.
    pub default_fetch_size: usize,

    /// Timeout applied to queries.
    pub query_timeout: Duration,
}

impl Default for SessionConfig {
    /// Returns the stock configuration: 600 s idle timeout, keepalive enabled
    /// with a 60 s interval, 3 retries, auto-commit on, a fetch size of 1000
    /// and a 300 s query timeout.
    fn default() -> Self {
        let idle_timeout = Duration::from_secs(600);
        let keepalive_interval = Duration::from_secs(60);
        let query_timeout = Duration::from_secs(300);

        Self {
            idle_timeout,
            enable_keepalive: true,
            keepalive_interval,
            max_retries: 3,
            auto_commit: true,
            default_fetch_size: 1000,
            query_timeout,
        }
    }
}
50
/// Lifecycle state tracked for a session.
///
/// The two predicates below partition the variants:
/// `is_active` holds for `Ready`, `Executing`, `InTransaction` and `Idle`;
/// `can_execute` holds for the same set minus `Executing`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SessionState {
    /// Neither active nor able to execute.
    Initializing,

    /// Active and able to execute.
    Ready,

    /// Active, but not able to execute.
    Executing,

    /// Active and able to execute.
    InTransaction,

    /// Active and able to execute.
    Idle,

    /// Neither active nor able to execute.
    Closing,

    /// Neither active nor able to execute.
    Closed,

    /// Neither active nor able to execute.
    Error,
}

impl SessionState {
    /// Returns `true` for `Ready`, `Executing`, `InTransaction` and `Idle`.
    pub fn is_active(&self) -> bool {
        match self {
            Self::Ready | Self::Executing | Self::InTransaction | Self::Idle => true,
            Self::Initializing | Self::Closing | Self::Closed | Self::Error => false,
        }
    }

    /// Returns `true` for `Ready`, `InTransaction` and `Idle`.
    ///
    /// Unlike [`SessionState::is_active`], `Executing` is excluded.
    pub fn can_execute(&self) -> bool {
        match self {
            Self::Ready | Self::InTransaction | Self::Idle => true,
            Self::Initializing
            | Self::Executing
            | Self::Closing
            | Self::Closed
            | Self::Error => false,
        }
    }
}
99
100pub struct Session {
102 session_id: String,
104
105 server_info: AuthResponseData,
107
108 config: SessionConfig,
110
111 state: Arc<RwLock<SessionState>>,
113
114 last_activity: Arc<RwLock<Instant>>,
116
117 query_count: AtomicU64,
119
120 in_transaction: AtomicBool,
122
123 current_schema: Arc<RwLock<Option<String>>>,
125
126 attributes: Arc<RwLock<std::collections::HashMap<String, String>>>,
128}
129
130impl Session {
131 pub fn new(session_id: String, server_info: AuthResponseData, config: SessionConfig) -> Self {
133 Self {
134 session_id,
135 server_info,
136 config,
137 state: Arc::new(RwLock::new(SessionState::Ready)),
138 last_activity: Arc::new(RwLock::new(Instant::now())),
139 query_count: AtomicU64::new(0),
140 in_transaction: AtomicBool::new(false),
141 current_schema: Arc::new(RwLock::new(None)),
142 attributes: Arc::new(RwLock::new(std::collections::HashMap::new())),
143 }
144 }
145
146 pub fn session_id(&self) -> &str {
148 &self.session_id
149 }
150
151 pub fn server_info(&self) -> &AuthResponseData {
153 &self.server_info
154 }
155
156 pub fn config(&self) -> &SessionConfig {
158 &self.config
159 }
160
161 pub async fn state(&self) -> SessionState {
163 *self.state.read().await
164 }
165
166 pub async fn set_state(&self, new_state: SessionState) {
168 let mut state = self.state.write().await;
169 *state = new_state;
170 }
171
172 pub async fn update_activity(&self) {
174 let mut last_activity = self.last_activity.write().await;
175 *last_activity = Instant::now();
176 }
177
178 pub async fn idle_duration(&self) -> Duration {
180 let last_activity = self.last_activity.read().await;
181 last_activity.elapsed()
182 }
183
184 pub async fn is_idle_timeout(&self) -> bool {
186 self.idle_duration().await > self.config.idle_timeout
187 }
188
189 pub fn increment_query_count(&self) -> u64 {
191 self.query_count.fetch_add(1, Ordering::SeqCst) + 1
192 }
193
194 pub fn query_count(&self) -> u64 {
196 self.query_count.load(Ordering::SeqCst)
197 }
198
199 pub fn in_transaction(&self) -> bool {
201 self.in_transaction.load(Ordering::SeqCst)
202 }
203
204 pub async fn begin_transaction(&self) -> Result<(), ConnectionError> {
206 let state = self.state().await;
207 if !state.can_execute() {
208 return Err(ConnectionError::ConnectionClosed);
209 }
210
211 if self.in_transaction() {
212 return Err(ConnectionError::InvalidParameter {
213 parameter: "transaction".to_string(),
214 message: "Transaction already active".to_string(),
215 });
216 }
217
218 self.in_transaction.store(true, Ordering::SeqCst);
219 self.set_state(SessionState::InTransaction).await;
220 self.update_activity().await;
221
222 Ok(())
223 }
224
225 pub async fn commit_transaction(&self) -> Result<(), ConnectionError> {
227 if !self.in_transaction() {
228 return Err(ConnectionError::InvalidParameter {
229 parameter: "transaction".to_string(),
230 message: "No active transaction".to_string(),
231 });
232 }
233
234 self.in_transaction.store(false, Ordering::SeqCst);
235 self.set_state(SessionState::Ready).await;
236 self.update_activity().await;
237
238 Ok(())
239 }
240
241 pub async fn rollback_transaction(&self) -> Result<(), ConnectionError> {
243 if !self.in_transaction() {
244 return Err(ConnectionError::InvalidParameter {
245 parameter: "transaction".to_string(),
246 message: "No active transaction".to_string(),
247 });
248 }
249
250 self.in_transaction.store(false, Ordering::SeqCst);
251 self.set_state(SessionState::Ready).await;
252 self.update_activity().await;
253
254 Ok(())
255 }
256
257 pub async fn current_schema(&self) -> Option<String> {
259 self.current_schema.read().await.clone()
260 }
261
262 pub async fn set_current_schema(&self, schema: Option<String>) {
264 let mut current_schema = self.current_schema.write().await;
265 *current_schema = schema;
266 self.update_activity().await;
267 }
268
269 pub async fn get_attribute(&self, key: &str) -> Option<String> {
271 let attributes = self.attributes.read().await;
272 attributes.get(key).cloned()
273 }
274
275 pub async fn set_attribute(&self, key: String, value: String) {
277 let mut attributes = self.attributes.write().await;
278 attributes.insert(key, value);
279 }
280
281 pub async fn remove_attribute(&self, key: &str) -> Option<String> {
283 let mut attributes = self.attributes.write().await;
284 attributes.remove(key)
285 }
286
287 pub async fn close(&self) -> Result<(), ConnectionError> {
289 self.set_state(SessionState::Closing).await;
290
291 if self.in_transaction() {
293 self.in_transaction.store(false, Ordering::SeqCst);
295 }
296
297 self.set_state(SessionState::Closed).await;
298
299 Ok(())
300 }
301
302 pub async fn is_closed(&self) -> bool {
304 matches!(self.state().await, SessionState::Closed)
305 }
306
307 pub async fn mark_error(&self) {
309 self.set_state(SessionState::Error).await;
310 }
311
312 pub async fn validate_ready(&self) -> Result<(), ConnectionError> {
314 let state = self.state().await;
315
316 match state {
317 SessionState::Closed => Err(ConnectionError::ConnectionClosed),
318 SessionState::Error => Err(ConnectionError::InvalidParameter {
319 parameter: "session".to_string(),
320 message: "Session is in error state".to_string(),
321 }),
322 SessionState::Closing => Err(ConnectionError::ConnectionClosed),
323 _ if !state.is_active() => Err(ConnectionError::InvalidParameter {
324 parameter: "session".to_string(),
325 message: format!("Session is not active: {:?}", state),
326 }),
327 _ => Ok(()),
328 }
329 }
330}
331
332impl std::fmt::Debug for Session {
333 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
334 f.debug_struct("Session")
335 .field("session_id", &self.session_id)
336 .field("config", &self.config)
337 .field("query_count", &self.query_count())
338 .field("in_transaction", &self.in_transaction())
339 .finish()
340 }
341}
342
343pub struct SessionManager {
345 sessions: Arc<RwLock<std::collections::HashMap<String, Arc<Session>>>>,
347
348 _auth_handler: Arc<AuthenticationHandler>,
350
351 config: SessionConfig,
353}
354
355impl SessionManager {
356 pub fn new(auth_handler: Arc<AuthenticationHandler>, config: SessionConfig) -> Self {
358 Self {
359 sessions: Arc::new(RwLock::new(std::collections::HashMap::new())),
360 _auth_handler: auth_handler,
361 config,
362 }
363 }
364
365 pub async fn register_session(&self, session: Arc<Session>) {
367 let mut sessions = self.sessions.write().await;
368 sessions.insert(session.session_id().to_string(), session);
369 }
370
371 pub async fn get_session(&self, session_id: &str) -> Option<Arc<Session>> {
373 let sessions = self.sessions.read().await;
374 sessions.get(session_id).cloned()
375 }
376
377 pub async fn remove_session(&self, session_id: &str) -> Option<Arc<Session>> {
379 let mut sessions = self.sessions.write().await;
380 sessions.remove(session_id)
381 }
382
383 pub async fn active_sessions(&self) -> Vec<Arc<Session>> {
385 let sessions = self.sessions.read().await;
386 sessions.values().cloned().collect()
387 }
388
389 pub async fn close_all(&self) -> Result<(), ConnectionError> {
391 let sessions = {
392 let mut sessions = self.sessions.write().await;
393 let active: Vec<_> = sessions.drain().map(|(_, s)| s).collect();
394 active
395 };
396
397 for session in sessions {
398 session.close().await?;
399 }
400
401 Ok(())
402 }
403
404 pub async fn cleanup_idle_sessions(&self) -> usize {
406 let sessions = self.sessions.read().await;
407 let mut to_remove = Vec::new();
408
409 for (id, session) in sessions.iter() {
410 if session.is_idle_timeout().await {
411 to_remove.push(id.clone());
412 }
413 }
414
415 drop(sessions);
416
417 let count = to_remove.len();
418 for id in to_remove {
419 if let Some(session) = self.remove_session(&id).await {
420 let _ = session.close().await;
421 }
422 }
423
424 count
425 }
426}
427
428impl std::fmt::Debug for SessionManager {
429 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
430 f.debug_struct("SessionManager")
431 .field("config", &self.config)
432 .finish()
433 }
434}
435
#[cfg(test)]
mod tests {
    use super::*;
    use crate::connection::auth::{AuthResponseData, Credentials};

    /// Canned authentication response shared by every test.
    fn mock_server_info() -> AuthResponseData {
        AuthResponseData {
            session_id: "test_session".to_string(),
            protocol_version: 3,
            release_version: "7.1.0".to_string(),
            database_name: "EXA".to_string(),
            product_name: "Exasol".to_string(),
            max_data_message_size: 4_194_304,
            max_identifier_length: 128,
            max_varchar_length: 2_000_000,
            identifier_quote_string: "\"".to_string(),
            time_zone: "UTC".to_string(),
            time_zone_behavior: "INVALID TIMESTAMP TO DOUBLE".to_string(),
        }
    }

    /// Session with the given id, mock server info and default config.
    fn session_with_id(id: String) -> Session {
        Session::new(id, mock_server_info(), SessionConfig::default())
    }

    /// The `"sess123"` session used by most tests.
    fn new_session() -> Session {
        session_with_id("sess123".to_string())
    }

    /// Manager with throwaway credentials and default config.
    fn new_manager() -> SessionManager {
        let creds = Credentials::new("admin".to_string(), "secret".to_string());
        let auth_handler = Arc::new(AuthenticationHandler::new(
            creds,
            "test".to_string(),
            "1.0".to_string(),
        ));
        SessionManager::new(auth_handler, SessionConfig::default())
    }

    #[tokio::test]
    async fn test_session_creation() {
        let session = new_session();

        assert_eq!(session.session_id(), "sess123");
        assert_eq!(session.state().await, SessionState::Ready);
        assert_eq!(session.query_count(), 0);
        assert!(!session.in_transaction());
    }

    #[tokio::test]
    async fn test_session_state_transitions() {
        let session = new_session();

        assert_eq!(session.state().await, SessionState::Ready);

        // Each state is written and then read back, in this order.
        for next in [
            SessionState::Executing,
            SessionState::Idle,
            SessionState::Closed,
        ] {
            session.set_state(next).await;
            assert_eq!(session.state().await, next);
        }
    }

    #[tokio::test]
    async fn test_session_query_counter() {
        let session = new_session();

        assert_eq!(session.increment_query_count(), 1);
        assert_eq!(session.increment_query_count(), 2);
        assert_eq!(session.query_count(), 2);
    }

    #[tokio::test]
    async fn test_session_transaction() {
        let session = new_session();

        assert!(!session.in_transaction());

        // Begin opens the transaction and switches the state.
        session.begin_transaction().await.unwrap();
        assert!(session.in_transaction());
        assert_eq!(session.state().await, SessionState::InTransaction);

        // A nested begin is rejected.
        let second_begin = session.begin_transaction().await;
        assert!(second_begin.is_err());

        // Commit closes the transaction and returns to Ready.
        session.commit_transaction().await.unwrap();
        assert!(!session.in_transaction());
        assert_eq!(session.state().await, SessionState::Ready);
    }

    #[tokio::test]
    async fn test_session_rollback() {
        let session = new_session();

        session.begin_transaction().await.unwrap();
        assert!(session.in_transaction());

        session.rollback_transaction().await.unwrap();
        assert!(!session.in_transaction());
        assert_eq!(session.state().await, SessionState::Ready);
    }

    #[tokio::test]
    async fn test_session_schema() {
        let session = new_session();

        assert!(session.current_schema().await.is_none());

        let schema = Some("MY_SCHEMA".to_string());
        session.set_current_schema(schema.clone()).await;
        assert_eq!(session.current_schema().await, schema);

        session.set_current_schema(None).await;
        assert!(session.current_schema().await.is_none());
    }

    #[tokio::test]
    async fn test_session_attributes() {
        let session = new_session();

        assert!(session.get_attribute("key1").await.is_none());

        session
            .set_attribute("key1".to_string(), "value1".to_string())
            .await;
        let stored = session.get_attribute("key1").await;
        assert_eq!(stored, Some("value1".to_string()));

        let removed = session.remove_attribute("key1").await;
        assert_eq!(removed, Some("value1".to_string()));
        assert!(session.get_attribute("key1").await.is_none());
    }

    #[tokio::test]
    async fn test_session_activity() {
        let session = new_session();

        session.update_activity().await;

        let right_after = session.idle_duration().await;
        assert!(right_after < Duration::from_millis(100));

        tokio::time::sleep(Duration::from_millis(10)).await;
        let after_sleep = session.idle_duration().await;
        assert!(after_sleep >= Duration::from_millis(10));
    }

    #[tokio::test]
    async fn test_session_close() {
        let session = new_session();

        assert!(!session.is_closed().await);

        session.close().await.unwrap();
        assert!(session.is_closed().await);
        assert_eq!(session.state().await, SessionState::Closed);
    }

    #[tokio::test]
    async fn test_session_validate_ready() {
        let session = new_session();

        // A fresh session is Ready and therefore valid.
        assert!(session.validate_ready().await.is_ok());

        // Closed and Error must both be rejected.
        for rejected in [SessionState::Closed, SessionState::Error] {
            session.set_state(rejected).await;
            assert!(session.validate_ready().await.is_err());
        }
    }

    #[tokio::test]
    async fn test_session_manager() {
        let manager = new_manager();
        let session = Arc::new(new_session());

        manager.register_session(session.clone()).await;

        let retrieved = manager.get_session("sess123").await;
        assert!(retrieved.is_some());
        assert_eq!(retrieved.unwrap().session_id(), "sess123");

        let active = manager.active_sessions().await;
        assert_eq!(active.len(), 1);

        let removed = manager.remove_session("sess123").await;
        assert!(removed.is_some());

        assert!(manager.get_session("sess123").await.is_none());
    }

    #[tokio::test]
    async fn test_session_manager_close_all() {
        let manager = new_manager();

        for i in 0..3 {
            let session = Arc::new(session_with_id(format!("sess{}", i)));
            manager.register_session(session).await;
        }

        assert_eq!(manager.active_sessions().await.len(), 3);

        manager.close_all().await.unwrap();
        assert_eq!(manager.active_sessions().await.len(), 0);
    }

    #[test]
    fn test_session_state_checks() {
        assert!(SessionState::Ready.is_active());
        assert!(SessionState::Executing.is_active());
        assert!(!SessionState::Closed.is_active());
        assert!(!SessionState::Error.is_active());

        assert!(SessionState::Ready.can_execute());
        assert!(SessionState::InTransaction.can_execute());
        assert!(!SessionState::Executing.can_execute());
        assert!(!SessionState::Closed.can_execute());
    }
}