1use serde_json::Value;
4use std::sync::Arc;
5use std::time::{Duration, Instant};
6use tokio::sync::RwLock;
7use tracing::{debug, info, warn};
8
9use crate::config::ClientConfig;
10use crate::error::{McpClientResult, SessionError};
11use turul_mcp_protocol::{
12 ClientCapabilities, Implementation, InitializeRequest, ServerCapabilities,
13};
14
15#[derive(Debug, Clone, PartialEq)]
17pub enum SessionState {
18 Uninitialized,
20 Initializing,
22 Active,
24 Reconnecting,
26 Terminated,
28 Error(String),
30}
31
32impl std::fmt::Display for SessionState {
33 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
34 match self {
35 SessionState::Uninitialized => write!(f, "uninitialized"),
36 SessionState::Initializing => write!(f, "initializing"),
37 SessionState::Active => write!(f, "active"),
38 SessionState::Reconnecting => write!(f, "reconnecting"),
39 SessionState::Terminated => write!(f, "terminated"),
40 SessionState::Error(err) => write!(f, "error: {}", err),
41 }
42 }
43}
44
45#[derive(Debug, Clone)]
47pub struct SessionInfo {
48 pub session_id: Option<String>,
50
51 pub state: SessionState,
53
54 pub client_capabilities: Option<ClientCapabilities>,
56
57 pub server_capabilities: Option<ServerCapabilities>,
59
60 pub protocol_version: Option<String>,
62
63 pub created_at: Instant,
65
66 pub last_activity: Instant,
68
69 pub connection_attempts: u32,
71
72 pub metadata: Value,
74}
75
76impl SessionInfo {
77 pub fn new() -> Self {
79 let now = Instant::now();
80 Self {
81 session_id: None,
82 state: SessionState::Uninitialized,
83 client_capabilities: None,
84 server_capabilities: None,
85 protocol_version: None,
86 created_at: now,
87 last_activity: now,
88 connection_attempts: 0,
89 metadata: Value::Null,
90 }
91 }
92
93 pub fn update_activity(&mut self) {
95 self.last_activity = Instant::now();
96 }
97
98 pub fn duration(&self) -> Duration {
100 self.last_activity.duration_since(self.created_at)
101 }
102
103 pub fn idle_time(&self) -> Duration {
105 Instant::now().duration_since(self.last_activity)
106 }
107
108 pub fn is_active(&self) -> bool {
110 self.state == SessionState::Active
111 }
112
113 pub fn is_ready(&self) -> bool {
115 matches!(self.state, SessionState::Active)
116 }
117
118 pub fn needs_initialization(&self) -> bool {
120 matches!(self.state, SessionState::Uninitialized)
121 }
122}
123
124impl Default for SessionInfo {
125 fn default() -> Self {
126 Self::new()
127 }
128}
129
130#[derive(Debug)]
132pub struct SessionManager {
133 session: Arc<RwLock<SessionInfo>>,
135
136 config: ClientConfig,
138}
139
140impl SessionManager {
141 pub fn new(config: ClientConfig) -> Self {
143 Self {
144 session: Arc::new(RwLock::new(SessionInfo::new())),
145 config,
146 }
147 }
148
149 pub async fn session_info(&self) -> SessionInfo {
151 self.session.read().await.clone()
152 }
153
154 pub async fn session_id(&self) -> McpClientResult<String> {
156 let session = self.session.read().await;
157 session
158 .session_id
159 .clone()
160 .ok_or_else(|| SessionError::NotInitialized.into())
161 }
162
163 pub async fn session_id_optional(&self) -> Option<String> {
165 self.session.read().await.session_id.clone()
166 }
167
168 pub async fn set_session_id(&self, session_id: String) -> McpClientResult<()> {
170 let mut session = self.session.write().await;
171 session.session_id = Some(session_id);
172 Ok(())
173 }
174
175 pub async fn state(&self) -> SessionState {
177 self.session.read().await.state.clone()
178 }
179
180 pub async fn set_state(&self, state: SessionState) {
182 let mut session = self.session.write().await;
183 debug!("Session state transition: {} -> {}", session.state, state);
184 session.state = state;
185 session.update_activity();
186 }
187
188 pub async fn is_ready(&self) -> bool {
190 self.session.read().await.is_ready()
191 }
192
193 pub async fn initialize(
195 &self,
196 client_capabilities: ClientCapabilities,
197 server_capabilities: ServerCapabilities,
198 protocol_version: String,
199 ) -> McpClientResult<()> {
200 let mut session = self.session.write().await;
201
202 if !matches!(
203 session.state,
204 SessionState::Uninitialized | SessionState::Initializing
205 ) {
206 return Err(SessionError::AlreadyInitialized.into());
207 }
208
209 session.client_capabilities = Some(client_capabilities);
210 session.server_capabilities = Some(server_capabilities);
211 session.protocol_version = Some(protocol_version.clone());
212 session.state = SessionState::Active;
213 session.update_activity();
214
215 info!(
216 session_id = %session.session_id.as_deref().unwrap_or("None"),
217 protocol_version = %protocol_version,
218 "Session initialized successfully"
219 );
220
221 Ok(())
222 }
223
224 pub async fn mark_initializing(&self) -> McpClientResult<()> {
226 let mut session = self.session.write().await;
227
228 if !session.needs_initialization() {
229 return Err(SessionError::AlreadyInitialized.into());
230 }
231
232 session.state = SessionState::Initializing;
233 session.connection_attempts += 1;
234 session.update_activity();
235
236 debug!(
237 session_id = %session.session_id.as_deref().unwrap_or("None"),
238 attempt = session.connection_attempts,
239 "Session initialization started"
240 );
241
242 Ok(())
243 }
244
245 pub async fn terminate(&self, reason: Option<String>) {
247 let mut session = self.session.write().await;
248
249 let previous_state = session.state.clone();
250 session.state = SessionState::Terminated;
251 session.update_activity();
252
253 info!(
254 session_id = %session.session_id.as_deref().unwrap_or("None"),
255 previous_state = %previous_state,
256 reason = reason.as_deref().unwrap_or("user requested"),
257 "Session terminated"
258 );
259 }
260
261 pub async fn handle_error(&self, error: String) {
263 let mut session = self.session.write().await;
264
265 let previous_state = session.state.clone();
266 session.state = SessionState::Error(error.clone());
267 session.update_activity();
268
269 warn!(
270 session_id = %session.session_id.as_deref().unwrap_or("None"),
271 previous_state = %previous_state,
272 error = %error,
273 "Session encountered error"
274 );
275 }
276
277 pub async fn start_reconnection(&self) {
279 let mut session = self.session.write().await;
280
281 if matches!(session.state, SessionState::Terminated) {
282 return; }
284
285 session.state = SessionState::Reconnecting;
286 session.connection_attempts += 1;
287 session.update_activity();
288
289 info!(
290 session_id = %session.session_id.as_deref().unwrap_or("None"),
291 attempt = session.connection_attempts,
292 "Session reconnection started"
293 );
294 }
295
296 pub async fn reset(&self) {
298 let mut session = self.session.write().await;
299 *session = SessionInfo::new();
300
301 debug!(
302 session_id = %session.session_id.as_deref().unwrap_or("None"),
303 "Session reset for new connection"
304 );
305 }
306
307 pub async fn update_activity(&self) {
309 self.session.write().await.update_activity();
310 }
311
312 pub fn create_client_capabilities(&self) -> ClientCapabilities {
314 ClientCapabilities {
315 experimental: None,
316 sampling: None,
317 elicitation: None,
318 roots: None,
319 }
320 }
321
322 pub async fn create_initialize_request(&self) -> InitializeRequest {
324 let client_info = &self.config.client_info;
325
326 InitializeRequest {
327 protocol_version: "2025-06-18".to_string(),
328 capabilities: self.create_client_capabilities(),
329 client_info: Implementation {
330 name: client_info.name.clone(),
331 version: client_info.version.clone(),
332 title: None,
333 },
334 }
335 }
336
337 pub async fn validate_server_capabilities(
339 &self,
340 server_capabilities: &ServerCapabilities,
341 ) -> McpClientResult<()> {
342 debug!(
344 tools = ?server_capabilities.tools,
345 resources = ?server_capabilities.resources,
346 prompts = ?server_capabilities.prompts,
347 "Validating server capabilities"
348 );
349
350 Ok(())
353 }
354
355 pub async fn statistics(&self) -> SessionStatistics {
357 let session = self.session.read().await;
358
359 SessionStatistics {
360 session_id: session.session_id.clone(),
361 state: session.state.clone(),
362 duration: session.duration(),
363 idle_time: session.idle_time(),
364 connection_attempts: session.connection_attempts,
365 protocol_version: session.protocol_version.clone(),
366 }
367 }
368}
369
370#[derive(Debug, Clone)]
372pub struct SessionStatistics {
373 pub session_id: Option<String>,
374 pub state: SessionState,
375 pub duration: Duration,
376 pub idle_time: Duration,
377 pub connection_attempts: u32,
378 pub protocol_version: Option<String>,
379}
380
381impl SessionStatistics {
382 pub fn is_healthy(&self) -> bool {
384 matches!(self.state, SessionState::Active) && self.idle_time < Duration::from_secs(300)
385 }
386
387 pub fn status_summary(&self) -> String {
389 let session_display = match &self.session_id {
390 Some(id) => &id[..id.len().min(8)],
391 None => "None",
392 };
393 format!(
394 "Session {} ({}) - Duration: {:?}, Idle: {:?}, Attempts: {}",
395 session_display, self.state, self.duration, self.idle_time, self.connection_attempts
396 )
397 }
398}
399
400#[cfg(test)]
401mod tests {
402 use super::*;
403 use crate::config::ClientConfig;
404
405 #[tokio::test]
406 async fn test_session_lifecycle() {
407 let config = ClientConfig::default();
408 let manager = SessionManager::new(config);
409
410 assert_eq!(manager.state().await, SessionState::Uninitialized);
412 assert!(!manager.is_ready().await);
413
414 manager.mark_initializing().await.unwrap();
416 assert_eq!(manager.state().await, SessionState::Initializing);
417
418 let client_caps = manager.create_client_capabilities();
420 let server_caps = ServerCapabilities {
421 experimental: None,
422 logging: None,
423 prompts: None,
424 resources: None,
425 tools: None,
426 completions: None,
427 elicitation: None,
428 };
429
430 manager
431 .initialize(client_caps, server_caps, "2025-06-18".to_string())
432 .await
433 .unwrap();
434 assert_eq!(manager.state().await, SessionState::Active);
435 assert!(manager.is_ready().await);
436
437 manager.terminate(Some("test completed".to_string())).await;
439 assert_eq!(manager.state().await, SessionState::Terminated);
440 assert!(!manager.is_ready().await);
441 }
442
443 #[tokio::test]
444 async fn test_session_error_handling() {
445 let config = ClientConfig::default();
446 let manager = SessionManager::new(config);
447
448 manager.handle_error("test error".to_string()).await;
449
450 let SessionState::Error(msg) = manager.state().await else {
451 panic!("Expected error state, got: {:?}", manager.state().await);
452 };
453 assert_eq!(msg, "test error");
454 }
455
456 #[tokio::test]
457 async fn test_session_reset() {
458 let config = ClientConfig::default();
459 let manager = SessionManager::new(config);
460
461 manager
463 .set_session_id("test-session-id".to_string())
464 .await
465 .unwrap();
466 let _original_id = manager.session_id().await.unwrap();
467
468 manager.reset().await;
469
470 assert!(manager.session_id().await.is_err());
472 assert_eq!(manager.state().await, SessionState::Uninitialized);
473
474 assert!(manager.session_id_optional().await.is_none());
476 }
477}