Skip to main content

exarrow_rs/connection/
session.rs

1//! Session management for Exasol database connections.
2//!
3//! This module handles session lifecycle, state tracking, and connection pooling support.
4
5use crate::connection::auth::{AuthResponseData, AuthenticationHandler};
6use crate::error::ConnectionError;
7use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
8use std::sync::Arc;
9use std::time::{Duration, Instant};
10use tokio::sync::RwLock;
11
/// Tunable settings applied to a session.
#[derive(Debug, Clone)]
pub struct SessionConfig {
    /// How long a session may go without activity before it counts as timed out
    pub idle_timeout: Duration,

    /// Whether automatic session keepalive is enabled
    pub enable_keepalive: bool,

    /// Time between keepalive messages
    pub keepalive_interval: Duration,

    /// Upper bound on retries for a failed operation
    pub max_retries: u32,

    /// Whether transactions run in auto-commit mode
    pub auto_commit: bool,

    /// Fetch size used for queries when none is given explicitly
    pub default_fetch_size: usize,

    /// Time limit for a query
    pub query_timeout: Duration,
}

impl Default for SessionConfig {
    /// Build the stock configuration: 10-minute idle timeout, keepalive every
    /// 60 seconds, 3 retries, auto-commit on, fetch size 1000 and a 5-minute
    /// query timeout.
    fn default() -> Self {
        let idle_timeout = Duration::from_secs(600);
        let keepalive_interval = Duration::from_secs(60);
        let query_timeout = Duration::from_secs(300);

        SessionConfig {
            idle_timeout,
            enable_keepalive: true,
            keepalive_interval,
            max_retries: 3,
            auto_commit: true,
            default_fetch_size: 1000,
            query_timeout,
        }
    }
}
50
/// Lifecycle phase a session can be in.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SessionState {
    /// Initialization is still in progress
    Initializing,

    /// Connected and ready for work
    Ready,

    /// A query is currently being executed
    Executing,

    /// A transaction is open
    InTransaction,

    /// Connected but idle
    Idle,

    /// Shutdown has started
    Closing,

    /// Shutdown has finished
    Closed,

    /// The session hit an error
    Error,
}

impl SessionState {
    /// Report whether this state counts as active.
    ///
    /// `Ready`, `Executing`, `InTransaction` and `Idle` are active; every other
    /// state is not. The match is exhaustive so that adding a variant forces a
    /// decision here.
    pub fn is_active(&self) -> bool {
        match self {
            Self::Ready | Self::Executing | Self::InTransaction | Self::Idle => true,
            Self::Initializing | Self::Closing | Self::Closed | Self::Error => false,
        }
    }

    /// Report whether a new query may be started from this state.
    ///
    /// Only `Ready`, `InTransaction` and `Idle` qualify; note that `Executing`
    /// is active but cannot start another query.
    pub fn can_execute(&self) -> bool {
        match self {
            Self::Ready | Self::InTransaction | Self::Idle => true,
            Self::Initializing
            | Self::Executing
            | Self::Closing
            | Self::Closed
            | Self::Error => false,
        }
    }
}
99
100/// Database session information and state tracking.
101pub struct Session {
102    /// Session ID from the server
103    session_id: String,
104
105    /// Server information from authentication
106    server_info: AuthResponseData,
107
108    /// Session configuration
109    config: SessionConfig,
110
111    /// Current session state
112    state: Arc<RwLock<SessionState>>,
113
114    /// Last activity timestamp
115    last_activity: Arc<RwLock<Instant>>,
116
117    /// Query execution counter
118    query_count: AtomicU64,
119
120    /// Transaction active flag
121    in_transaction: AtomicBool,
122
123    /// Current schema
124    current_schema: Arc<RwLock<Option<String>>>,
125
126    /// Session attributes
127    attributes: Arc<RwLock<std::collections::HashMap<String, String>>>,
128}
129
130impl Session {
131    /// Create a new session.
132    pub fn new(session_id: String, server_info: AuthResponseData, config: SessionConfig) -> Self {
133        Self {
134            session_id,
135            server_info,
136            config,
137            state: Arc::new(RwLock::new(SessionState::Ready)),
138            last_activity: Arc::new(RwLock::new(Instant::now())),
139            query_count: AtomicU64::new(0),
140            in_transaction: AtomicBool::new(false),
141            current_schema: Arc::new(RwLock::new(None)),
142            attributes: Arc::new(RwLock::new(std::collections::HashMap::new())),
143        }
144    }
145
146    /// Get the session ID.
147    pub fn session_id(&self) -> &str {
148        &self.session_id
149    }
150
151    /// Get server information.
152    pub fn server_info(&self) -> &AuthResponseData {
153        &self.server_info
154    }
155
156    /// Get session configuration.
157    pub fn config(&self) -> &SessionConfig {
158        &self.config
159    }
160
161    /// Get current session state.
162    pub async fn state(&self) -> SessionState {
163        *self.state.read().await
164    }
165
166    /// Set session state.
167    pub async fn set_state(&self, new_state: SessionState) {
168        let mut state = self.state.write().await;
169        *state = new_state;
170    }
171
172    /// Update last activity timestamp.
173    pub async fn update_activity(&self) {
174        let mut last_activity = self.last_activity.write().await;
175        *last_activity = Instant::now();
176    }
177
178    /// Get time since last activity.
179    pub async fn idle_duration(&self) -> Duration {
180        let last_activity = self.last_activity.read().await;
181        last_activity.elapsed()
182    }
183
184    /// Check if session is idle beyond timeout.
185    pub async fn is_idle_timeout(&self) -> bool {
186        self.idle_duration().await > self.config.idle_timeout
187    }
188
189    /// Increment query counter.
190    pub fn increment_query_count(&self) -> u64 {
191        self.query_count.fetch_add(1, Ordering::SeqCst) + 1
192    }
193
194    /// Get total query count.
195    pub fn query_count(&self) -> u64 {
196        self.query_count.load(Ordering::SeqCst)
197    }
198
199    /// Check if in transaction.
200    pub fn in_transaction(&self) -> bool {
201        self.in_transaction.load(Ordering::SeqCst)
202    }
203
204    /// Begin a transaction.
205    pub async fn begin_transaction(&self) -> Result<(), ConnectionError> {
206        let state = self.state().await;
207        if !state.can_execute() {
208            return Err(ConnectionError::ConnectionClosed);
209        }
210
211        if self.in_transaction() {
212            return Err(ConnectionError::InvalidParameter {
213                parameter: "transaction".to_string(),
214                message: "Transaction already active".to_string(),
215            });
216        }
217
218        self.in_transaction.store(true, Ordering::SeqCst);
219        self.set_state(SessionState::InTransaction).await;
220        self.update_activity().await;
221
222        Ok(())
223    }
224
225    /// Commit the current transaction.
226    pub async fn commit_transaction(&self) -> Result<(), ConnectionError> {
227        if !self.in_transaction() {
228            return Err(ConnectionError::InvalidParameter {
229                parameter: "transaction".to_string(),
230                message: "No active transaction".to_string(),
231            });
232        }
233
234        self.in_transaction.store(false, Ordering::SeqCst);
235        self.set_state(SessionState::Ready).await;
236        self.update_activity().await;
237
238        Ok(())
239    }
240
241    /// Rollback the current transaction.
242    pub async fn rollback_transaction(&self) -> Result<(), ConnectionError> {
243        if !self.in_transaction() {
244            return Err(ConnectionError::InvalidParameter {
245                parameter: "transaction".to_string(),
246                message: "No active transaction".to_string(),
247            });
248        }
249
250        self.in_transaction.store(false, Ordering::SeqCst);
251        self.set_state(SessionState::Ready).await;
252        self.update_activity().await;
253
254        Ok(())
255    }
256
257    /// Get current schema.
258    pub async fn current_schema(&self) -> Option<String> {
259        self.current_schema.read().await.clone()
260    }
261
262    /// Set current schema.
263    pub async fn set_current_schema(&self, schema: Option<String>) {
264        let mut current_schema = self.current_schema.write().await;
265        *current_schema = schema;
266        self.update_activity().await;
267    }
268
269    /// Get a session attribute.
270    pub async fn get_attribute(&self, key: &str) -> Option<String> {
271        let attributes = self.attributes.read().await;
272        attributes.get(key).cloned()
273    }
274
275    /// Set a session attribute.
276    pub async fn set_attribute(&self, key: String, value: String) {
277        let mut attributes = self.attributes.write().await;
278        attributes.insert(key, value);
279    }
280
281    /// Remove a session attribute.
282    pub async fn remove_attribute(&self, key: &str) -> Option<String> {
283        let mut attributes = self.attributes.write().await;
284        attributes.remove(key)
285    }
286
287    /// Close the session.
288    pub async fn close(&self) -> Result<(), ConnectionError> {
289        self.set_state(SessionState::Closing).await;
290
291        // Clean up resources
292        if self.in_transaction() {
293            // Force rollback if in transaction
294            self.in_transaction.store(false, Ordering::SeqCst);
295        }
296
297        self.set_state(SessionState::Closed).await;
298
299        Ok(())
300    }
301
302    /// Check if session is closed.
303    pub async fn is_closed(&self) -> bool {
304        matches!(self.state().await, SessionState::Closed)
305    }
306
307    /// Mark session as having an error.
308    pub async fn mark_error(&self) {
309        self.set_state(SessionState::Error).await;
310    }
311
312    /// Validate session is ready for operations.
313    pub async fn validate_ready(&self) -> Result<(), ConnectionError> {
314        let state = self.state().await;
315
316        match state {
317            SessionState::Closed => Err(ConnectionError::ConnectionClosed),
318            SessionState::Error => Err(ConnectionError::InvalidParameter {
319                parameter: "session".to_string(),
320                message: "Session is in error state".to_string(),
321            }),
322            SessionState::Closing => Err(ConnectionError::ConnectionClosed),
323            _ if !state.is_active() => Err(ConnectionError::InvalidParameter {
324                parameter: "session".to_string(),
325                message: format!("Session is not active: {:?}", state),
326            }),
327            _ => Ok(()),
328        }
329    }
330}
331
332impl std::fmt::Debug for Session {
333    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
334        f.debug_struct("Session")
335            .field("session_id", &self.session_id)
336            .field("config", &self.config)
337            .field("query_count", &self.query_count())
338            .field("in_transaction", &self.in_transaction())
339            .finish()
340    }
341}
342
343/// Session manager for connection pooling and lifecycle management.
344pub struct SessionManager {
345    /// Active sessions
346    sessions: Arc<RwLock<std::collections::HashMap<String, Arc<Session>>>>,
347
348    /// Session factory (authentication handler)
349    _auth_handler: Arc<AuthenticationHandler>,
350
351    /// Session configuration
352    config: SessionConfig,
353}
354
355impl SessionManager {
356    /// Create a new session manager.
357    pub fn new(auth_handler: Arc<AuthenticationHandler>, config: SessionConfig) -> Self {
358        Self {
359            sessions: Arc::new(RwLock::new(std::collections::HashMap::new())),
360            _auth_handler: auth_handler,
361            config,
362        }
363    }
364
365    /// Register a new session.
366    pub async fn register_session(&self, session: Arc<Session>) {
367        let mut sessions = self.sessions.write().await;
368        sessions.insert(session.session_id().to_string(), session);
369    }
370
371    /// Get a session by ID.
372    pub async fn get_session(&self, session_id: &str) -> Option<Arc<Session>> {
373        let sessions = self.sessions.read().await;
374        sessions.get(session_id).cloned()
375    }
376
377    /// Remove a session.
378    pub async fn remove_session(&self, session_id: &str) -> Option<Arc<Session>> {
379        let mut sessions = self.sessions.write().await;
380        sessions.remove(session_id)
381    }
382
383    /// Get all active sessions.
384    pub async fn active_sessions(&self) -> Vec<Arc<Session>> {
385        let sessions = self.sessions.read().await;
386        sessions.values().cloned().collect()
387    }
388
389    /// Close all sessions.
390    pub async fn close_all(&self) -> Result<(), ConnectionError> {
391        let sessions = {
392            let mut sessions = self.sessions.write().await;
393            let active: Vec<_> = sessions.drain().map(|(_, s)| s).collect();
394            active
395        };
396
397        for session in sessions {
398            session.close().await?;
399        }
400
401        Ok(())
402    }
403
404    /// Clean up idle sessions beyond timeout.
405    pub async fn cleanup_idle_sessions(&self) -> usize {
406        let sessions = self.sessions.read().await;
407        let mut to_remove = Vec::new();
408
409        for (id, session) in sessions.iter() {
410            if session.is_idle_timeout().await {
411                to_remove.push(id.clone());
412            }
413        }
414
415        drop(sessions);
416
417        let count = to_remove.len();
418        for id in to_remove {
419            if let Some(session) = self.remove_session(&id).await {
420                let _ = session.close().await;
421            }
422        }
423
424        count
425    }
426}
427
428impl std::fmt::Debug for SessionManager {
429    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
430        f.debug_struct("SessionManager")
431            .field("config", &self.config)
432            .finish()
433    }
434}
435
#[cfg(test)]
mod tests {
    use super::*;
    use crate::connection::auth::{AuthResponseData, Credentials};

    /// Canned authentication response used by every test session.
    fn mock_server_info() -> AuthResponseData {
        AuthResponseData {
            session_id: "test_session".to_string(),
            protocol_version: 3,
            release_version: "7.1.0".to_string(),
            database_name: "EXA".to_string(),
            product_name: "Exasol".to_string(),
            max_data_message_size: 4_194_304,
            max_identifier_length: 128,
            max_varchar_length: 2_000_000,
            identifier_quote_string: "\"".to_string(),
            time_zone: "UTC".to_string(),
            time_zone_behavior: "INVALID TIMESTAMP TO DOUBLE".to_string(),
        }
    }

    /// Session with the given ID, mock server info and default configuration.
    fn session_with_id(id: String) -> Session {
        Session::new(id, mock_server_info(), SessionConfig::default())
    }

    /// The session most tests operate on (ID `sess123`).
    fn new_session() -> Session {
        session_with_id("sess123".to_string())
    }

    /// Manager backed by throwaway credentials and default configuration.
    fn new_manager() -> SessionManager {
        let creds = Credentials::new("admin".to_string(), "secret".to_string());
        let auth_handler = Arc::new(AuthenticationHandler::new(
            creds,
            "test".to_string(),
            "1.0".to_string(),
        ));
        SessionManager::new(auth_handler, SessionConfig::default())
    }

    #[tokio::test]
    async fn test_session_creation() {
        let session = new_session();

        assert_eq!(session.session_id(), "sess123");
        assert_eq!(session.state().await, SessionState::Ready);
        assert_eq!(session.query_count(), 0);
        assert!(!session.in_transaction());
    }

    #[tokio::test]
    async fn test_session_state_transitions() {
        let session = new_session();

        assert_eq!(session.state().await, SessionState::Ready);

        // Walk through the same sequence of states one by one.
        for next in [
            SessionState::Executing,
            SessionState::Idle,
            SessionState::Closed,
        ] {
            session.set_state(next).await;
            assert_eq!(session.state().await, next);
        }
    }

    #[tokio::test]
    async fn test_session_query_counter() {
        let session = new_session();

        assert_eq!(session.increment_query_count(), 1);
        assert_eq!(session.increment_query_count(), 2);
        assert_eq!(session.query_count(), 2);
    }

    #[tokio::test]
    async fn test_session_transaction() {
        let session = new_session();

        assert!(!session.in_transaction());

        // Opening a transaction flips both the flag and the state.
        session.begin_transaction().await.unwrap();
        assert!(session.in_transaction());
        assert_eq!(session.state().await, SessionState::InTransaction);

        // A second begin while one is open must be rejected.
        let second_begin = session.begin_transaction().await;
        assert!(second_begin.is_err());

        // Committing returns the session to Ready.
        session.commit_transaction().await.unwrap();
        assert!(!session.in_transaction());
        assert_eq!(session.state().await, SessionState::Ready);
    }

    #[tokio::test]
    async fn test_session_rollback() {
        let session = new_session();

        session.begin_transaction().await.unwrap();
        assert!(session.in_transaction());

        session.rollback_transaction().await.unwrap();
        assert!(!session.in_transaction());
        assert_eq!(session.state().await, SessionState::Ready);
    }

    #[tokio::test]
    async fn test_session_schema() {
        let session = new_session();

        assert!(session.current_schema().await.is_none());

        let schema = Some("MY_SCHEMA".to_string());
        session.set_current_schema(schema.clone()).await;
        assert_eq!(session.current_schema().await, schema);

        session.set_current_schema(None).await;
        assert!(session.current_schema().await.is_none());
    }

    #[tokio::test]
    async fn test_session_attributes() {
        let session = new_session();

        assert!(session.get_attribute("key1").await.is_none());

        session
            .set_attribute("key1".to_string(), "value1".to_string())
            .await;
        let stored = session.get_attribute("key1").await;
        assert_eq!(stored, Some("value1".to_string()));

        let removed = session.remove_attribute("key1").await;
        assert_eq!(removed, Some("value1".to_string()));
        assert!(session.get_attribute("key1").await.is_none());
    }

    #[tokio::test]
    async fn test_session_activity() {
        let session = new_session();

        session.update_activity().await;

        let just_touched = session.idle_duration().await;
        assert!(just_touched < Duration::from_millis(100));

        tokio::time::sleep(Duration::from_millis(10)).await;
        let after_sleep = session.idle_duration().await;
        assert!(after_sleep >= Duration::from_millis(10));
    }

    #[tokio::test]
    async fn test_session_close() {
        let session = new_session();

        assert!(!session.is_closed().await);

        session.close().await.unwrap();
        assert!(session.is_closed().await);
        assert_eq!(session.state().await, SessionState::Closed);
    }

    #[tokio::test]
    async fn test_session_validate_ready() {
        let session = new_session();

        // A freshly created (Ready) session validates.
        assert!(session.validate_ready().await.is_ok());

        // Closed must be rejected.
        session.set_state(SessionState::Closed).await;
        assert!(session.validate_ready().await.is_err());

        // Error must be rejected as well.
        session.set_state(SessionState::Error).await;
        assert!(session.validate_ready().await.is_err());
    }

    #[tokio::test]
    async fn test_session_manager() {
        let manager = new_manager();
        let session = Arc::new(new_session());

        // Register
        manager.register_session(session.clone()).await;

        // Look up by ID
        let retrieved = manager.get_session("sess123").await;
        assert!(retrieved.is_some());
        assert_eq!(retrieved.unwrap().session_id(), "sess123");

        // Listing shows the one registered session
        let active = manager.active_sessions().await;
        assert_eq!(active.len(), 1);

        // Remove
        let removed = manager.remove_session("sess123").await;
        assert!(removed.is_some());

        // Gone afterwards
        assert!(manager.get_session("sess123").await.is_none());
    }

    #[tokio::test]
    async fn test_session_manager_close_all() {
        let manager = new_manager();

        // Register several sessions with distinct IDs.
        for i in 0..3 {
            let session = Arc::new(session_with_id(format!("sess{}", i)));
            manager.register_session(session).await;
        }

        assert_eq!(manager.active_sessions().await.len(), 3);

        // Closing everything empties the manager.
        manager.close_all().await.unwrap();
        assert_eq!(manager.active_sessions().await.len(), 0);
    }

    #[test]
    fn test_session_state_checks() {
        assert!(SessionState::Ready.is_active());
        assert!(SessionState::Executing.is_active());
        assert!(!SessionState::Closed.is_active());
        assert!(!SessionState::Error.is_active());

        assert!(SessionState::Ready.can_execute());
        assert!(SessionState::InTransaction.can_execute());
        assert!(!SessionState::Executing.can_execute());
        assert!(!SessionState::Closed.can_execute());
    }
}