turul_mcp_client/
session.rs

1//! Session management for MCP client
2
3use serde_json::Value;
4use std::sync::Arc;
5use std::time::{Duration, Instant};
6use tokio::sync::RwLock;
7use tracing::{debug, info, warn};
8
9use crate::config::ClientConfig;
10use crate::error::{McpClientResult, SessionError};
11use turul_mcp_protocol::{
12    ClientCapabilities, Implementation, InitializeRequest, ServerCapabilities,
13};
14
/// Lifecycle state of a client session.
///
/// Equality is by value: two `Error` states are equal only when their
/// messages are equal. The type is `Eq` as well as `PartialEq` because every
/// payload it carries (`String`) has total equality.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SessionState {
    /// No initialization has been attempted yet.
    Uninitialized,
    /// Initialization has started but has not completed.
    Initializing,
    /// The session is initialized and ready for operations.
    Active,
    /// The session is in the middle of a reconnection attempt.
    Reconnecting,
    /// The session has been terminated.
    Terminated,
    /// The session hit an error; the payload is the error message.
    Error(String),
}
31
32impl std::fmt::Display for SessionState {
33    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
34        match self {
35            SessionState::Uninitialized => write!(f, "uninitialized"),
36            SessionState::Initializing => write!(f, "initializing"),
37            SessionState::Active => write!(f, "active"),
38            SessionState::Reconnecting => write!(f, "reconnecting"),
39            SessionState::Terminated => write!(f, "terminated"),
40            SessionState::Error(err) => write!(f, "error: {}", err),
41        }
42    }
43}
44
45/// Session information and metadata
46#[derive(Debug, Clone)]
47pub struct SessionInfo {
48    /// Session identifier from server (None until server provides it)
49    pub session_id: Option<String>,
50
51    /// Session state
52    pub state: SessionState,
53
54    /// Client capabilities sent during initialization
55    pub client_capabilities: Option<ClientCapabilities>,
56
57    /// Server capabilities received during initialization
58    pub server_capabilities: Option<ServerCapabilities>,
59
60    /// Protocol version negotiated
61    pub protocol_version: Option<String>,
62
63    /// Session creation timestamp
64    pub created_at: Instant,
65
66    /// Last activity timestamp
67    pub last_activity: Instant,
68
69    /// Connection attempt count
70    pub connection_attempts: u32,
71
72    /// Session metadata
73    pub metadata: Value,
74}
75
76impl SessionInfo {
77    /// Create a new session info
78    pub fn new() -> Self {
79        let now = Instant::now();
80        Self {
81            session_id: None,
82            state: SessionState::Uninitialized,
83            client_capabilities: None,
84            server_capabilities: None,
85            protocol_version: None,
86            created_at: now,
87            last_activity: now,
88            connection_attempts: 0,
89            metadata: Value::Null,
90        }
91    }
92
93    /// Update last activity timestamp
94    pub fn update_activity(&mut self) {
95        self.last_activity = Instant::now();
96    }
97
98    /// Get session duration
99    pub fn duration(&self) -> Duration {
100        self.last_activity.duration_since(self.created_at)
101    }
102
103    /// Get time since last activity
104    pub fn idle_time(&self) -> Duration {
105        Instant::now().duration_since(self.last_activity)
106    }
107
108    /// Check if session is active
109    pub fn is_active(&self) -> bool {
110        self.state == SessionState::Active
111    }
112
113    /// Check if session can be used for operations
114    pub fn is_ready(&self) -> bool {
115        matches!(self.state, SessionState::Active)
116    }
117
118    /// Check if session needs initialization
119    pub fn needs_initialization(&self) -> bool {
120        matches!(self.state, SessionState::Uninitialized)
121    }
122}
123
124impl Default for SessionInfo {
125    fn default() -> Self {
126        Self::new()
127    }
128}
129
130/// Session manager handles session lifecycle and state
131#[derive(Debug)]
132pub struct SessionManager {
133    /// Current session information
134    session: Arc<RwLock<SessionInfo>>,
135
136    /// Client configuration
137    config: ClientConfig,
138}
139
140impl SessionManager {
141    /// Create a new session manager
142    pub fn new(config: ClientConfig) -> Self {
143        Self {
144            session: Arc::new(RwLock::new(SessionInfo::new())),
145            config,
146        }
147    }
148
149    /// Get current session information
150    pub async fn session_info(&self) -> SessionInfo {
151        self.session.read().await.clone()
152    }
153
154    /// Get session ID (returns error if not yet initialized by server)
155    pub async fn session_id(&self) -> McpClientResult<String> {
156        let session = self.session.read().await;
157        session
158            .session_id
159            .clone()
160            .ok_or_else(|| SessionError::NotInitialized.into())
161    }
162
163    /// Get session ID if available (returns None if not initialized)
164    pub async fn session_id_optional(&self) -> Option<String> {
165        self.session.read().await.session_id.clone()
166    }
167
168    /// Set session ID (called when server provides it during initialization)
169    pub async fn set_session_id(&self, session_id: String) -> McpClientResult<()> {
170        let mut session = self.session.write().await;
171        session.session_id = Some(session_id);
172        Ok(())
173    }
174
175    /// Get current session state
176    pub async fn state(&self) -> SessionState {
177        self.session.read().await.state.clone()
178    }
179
180    /// Update session state
181    pub async fn set_state(&self, state: SessionState) {
182        let mut session = self.session.write().await;
183        debug!("Session state transition: {} -> {}", session.state, state);
184        session.state = state;
185        session.update_activity();
186    }
187
188    /// Check if session is ready for operations
189    pub async fn is_ready(&self) -> bool {
190        self.session.read().await.is_ready()
191    }
192
193    /// Initialize session with server capabilities
194    pub async fn initialize(
195        &self,
196        client_capabilities: ClientCapabilities,
197        server_capabilities: ServerCapabilities,
198        protocol_version: String,
199    ) -> McpClientResult<()> {
200        let mut session = self.session.write().await;
201
202        if !matches!(
203            session.state,
204            SessionState::Uninitialized | SessionState::Initializing
205        ) {
206            return Err(SessionError::AlreadyInitialized.into());
207        }
208
209        session.client_capabilities = Some(client_capabilities);
210        session.server_capabilities = Some(server_capabilities);
211        session.protocol_version = Some(protocol_version.clone());
212        session.state = SessionState::Active;
213        session.update_activity();
214
215        info!(
216            session_id = %session.session_id.as_deref().unwrap_or("None"),
217            protocol_version = %protocol_version,
218            "Session initialized successfully"
219        );
220
221        Ok(())
222    }
223
224    /// Mark session as initializing
225    pub async fn mark_initializing(&self) -> McpClientResult<()> {
226        let mut session = self.session.write().await;
227
228        if !session.needs_initialization() {
229            return Err(SessionError::AlreadyInitialized.into());
230        }
231
232        session.state = SessionState::Initializing;
233        session.connection_attempts += 1;
234        session.update_activity();
235
236        debug!(
237            session_id = %session.session_id.as_deref().unwrap_or("None"),
238            attempt = session.connection_attempts,
239            "Session initialization started"
240        );
241
242        Ok(())
243    }
244
245    /// Terminate session
246    pub async fn terminate(&self, reason: Option<String>) {
247        let mut session = self.session.write().await;
248
249        let previous_state = session.state.clone();
250        session.state = SessionState::Terminated;
251        session.update_activity();
252
253        info!(
254            session_id = %session.session_id.as_deref().unwrap_or("None"),
255            previous_state = %previous_state,
256            reason = reason.as_deref().unwrap_or("user requested"),
257            "Session terminated"
258        );
259    }
260
261    /// Handle session error
262    pub async fn handle_error(&self, error: String) {
263        let mut session = self.session.write().await;
264
265        let previous_state = session.state.clone();
266        session.state = SessionState::Error(error.clone());
267        session.update_activity();
268
269        warn!(
270            session_id = %session.session_id.as_deref().unwrap_or("None"),
271            previous_state = %previous_state,
272            error = %error,
273            "Session encountered error"
274        );
275    }
276
277    /// Start reconnection process
278    pub async fn start_reconnection(&self) {
279        let mut session = self.session.write().await;
280
281        if matches!(session.state, SessionState::Terminated) {
282            return; // Cannot reconnect terminated sessions
283        }
284
285        session.state = SessionState::Reconnecting;
286        session.connection_attempts += 1;
287        session.update_activity();
288
289        info!(
290            session_id = %session.session_id.as_deref().unwrap_or("None"),
291            attempt = session.connection_attempts,
292            "Session reconnection started"
293        );
294    }
295
296    /// Reset session for new connection
297    pub async fn reset(&self) {
298        let mut session = self.session.write().await;
299        *session = SessionInfo::new();
300
301        debug!(
302            session_id = %session.session_id.as_deref().unwrap_or("None"),
303            "Session reset for new connection"
304        );
305    }
306
307    /// Update activity timestamp
308    pub async fn update_activity(&self) {
309        self.session.write().await.update_activity();
310    }
311
312    /// Get client capabilities for initialization
313    pub fn create_client_capabilities(&self) -> ClientCapabilities {
314        ClientCapabilities {
315            experimental: None,
316            sampling: None,
317            elicitation: None,
318            roots: None,
319        }
320    }
321
322    /// Create initialization request
323    pub async fn create_initialize_request(&self) -> InitializeRequest {
324        let client_info = &self.config.client_info;
325
326        InitializeRequest {
327            protocol_version: "2025-06-18".to_string(),
328            capabilities: self.create_client_capabilities(),
329            client_info: Implementation {
330                name: client_info.name.clone(),
331                version: client_info.version.clone(),
332                title: None,
333            },
334        }
335    }
336
337    /// Validate server capabilities
338    pub async fn validate_server_capabilities(
339        &self,
340        server_capabilities: &ServerCapabilities,
341    ) -> McpClientResult<()> {
342        // Basic validation - ensure server supports required capabilities
343        debug!(
344            tools = ?server_capabilities.tools,
345            resources = ?server_capabilities.resources,
346            prompts = ?server_capabilities.prompts,
347            "Validating server capabilities"
348        );
349
350        // For now, we accept any server capabilities
351        // In the future, we might want to check for required features
352        Ok(())
353    }
354
355    /// Get session statistics
356    pub async fn statistics(&self) -> SessionStatistics {
357        let session = self.session.read().await;
358
359        SessionStatistics {
360            session_id: session.session_id.clone(),
361            state: session.state.clone(),
362            duration: session.duration(),
363            idle_time: session.idle_time(),
364            connection_attempts: session.connection_attempts,
365            protocol_version: session.protocol_version.clone(),
366        }
367    }
368}
369
370/// Session statistics for monitoring and debugging
371#[derive(Debug, Clone)]
372pub struct SessionStatistics {
373    pub session_id: Option<String>,
374    pub state: SessionState,
375    pub duration: Duration,
376    pub idle_time: Duration,
377    pub connection_attempts: u32,
378    pub protocol_version: Option<String>,
379}
380
381impl SessionStatistics {
382    /// Check if session is healthy
383    pub fn is_healthy(&self) -> bool {
384        matches!(self.state, SessionState::Active) && self.idle_time < Duration::from_secs(300)
385    }
386
387    /// Get human-readable session status
388    pub fn status_summary(&self) -> String {
389        let session_display = match &self.session_id {
390            Some(id) => &id[..id.len().min(8)],
391            None => "None",
392        };
393        format!(
394            "Session {} ({}) - Duration: {:?}, Idle: {:?}, Attempts: {}",
395            session_display, self.state, self.duration, self.idle_time, self.connection_attempts
396        )
397    }
398}
399
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::ClientConfig;

    /// Builds a manager backed by the default client configuration.
    fn default_manager() -> SessionManager {
        SessionManager::new(ClientConfig::default())
    }

    /// Walks a session through uninitialized -> initializing -> active ->
    /// terminated and checks state and readiness at every step.
    #[tokio::test]
    async fn test_session_lifecycle() {
        let manager = default_manager();

        // A fresh session is uninitialized and not ready.
        assert_eq!(manager.state().await, SessionState::Uninitialized);
        assert!(!manager.is_ready().await);

        // Starting initialization moves it to `Initializing`.
        manager.mark_initializing().await.unwrap();
        assert_eq!(manager.state().await, SessionState::Initializing);

        // Completing initialization makes it active and ready.
        let server_caps = ServerCapabilities {
            experimental: None,
            logging: None,
            prompts: None,
            resources: None,
            tools: None,
            completions: None,
            elicitation: None,
        };
        let client_caps = manager.create_client_capabilities();
        let init_result = manager
            .initialize(client_caps, server_caps, "2025-06-18".to_string())
            .await;
        init_result.unwrap();
        assert_eq!(manager.state().await, SessionState::Active);
        assert!(manager.is_ready().await);

        // Termination ends readiness.
        manager.terminate(Some("test completed".to_string())).await;
        assert_eq!(manager.state().await, SessionState::Terminated);
        assert!(!manager.is_ready().await);
    }

    /// Reporting an error must put the session in the `Error` state with the
    /// reported message.
    #[tokio::test]
    async fn test_session_error_handling() {
        let manager = default_manager();

        manager.handle_error("test error".to_string()).await;

        match manager.state().await {
            SessionState::Error(msg) => assert_eq!(msg, "test error"),
            other => panic!("Expected error state, got: {:?}", other),
        }
    }

    /// Resetting must clear the session ID and return the session to the
    /// `Uninitialized` state.
    #[tokio::test]
    async fn test_session_reset() {
        let manager = default_manager();

        // Store a mock session ID to simulate server initialization.
        manager
            .set_session_id("test-session-id".to_string())
            .await
            .unwrap();
        let _original_id = manager.session_id().await.unwrap();

        manager.reset().await;

        // After the reset the ID lookup fails and the state starts over.
        assert!(manager.session_id().await.is_err());
        assert_eq!(manager.state().await, SessionState::Uninitialized);

        // The optional accessor must agree that the ID is gone.
        assert!(manager.session_id_optional().await.is_none());
    }
}