turul_http_mcp_server/
mcp_session.rs1use std::collections::HashMap;
7use std::time::{Duration, Instant};
8use tokio::sync::{Mutex, broadcast};
9use tracing::{info, warn};
10use uuid::Uuid;
11
12use crate::protocol::McpProtocolVersion;
13
14pub struct Session {
16 pub sender: broadcast::Sender<String>,
18 pub created: Instant,
20 pub version: McpProtocolVersion,
22}
23
24impl Session {
25 pub fn touch(&mut self) {
27 self.created = Instant::now();
28 }
29}
30
31pub struct SessionHandle {
33 pub session_id: String,
34 pub receiver: broadcast::Receiver<String>,
35}
36
37pub type SessionMap = Mutex<HashMap<String, Session>>;
38
39lazy_static::lazy_static! {
40 static ref SESSIONS: SessionMap = Mutex::new(HashMap::new());
42}
43
44pub async fn new_session(mcp_version: McpProtocolVersion) -> SessionHandle {
47 let session_id = Uuid::now_v7().to_string();
48 let (sender, receiver) = broadcast::channel(128);
50 let session = Session {
51 sender: sender.clone(),
52 created: Instant::now(),
53 version: mcp_version,
54 };
55 SESSIONS.lock().await.insert(session_id.clone(), session);
56 SessionHandle {
57 session_id,
58 receiver,
59 }
60}
61
62pub async fn get_sender(session_id: &str) -> Option<broadcast::Sender<String>> {
65 let mut sessions = SESSIONS.lock().await;
66 if let Some(session) = sessions.get_mut(session_id) {
67 session.touch();
69 return Some(session.sender.clone());
71 }
72 None
73}
74
75pub async fn get_receiver(session_id: &str) -> Option<broadcast::Receiver<String>> {
77 let mut sessions = SESSIONS.lock().await;
78 if let Some(session) = sessions.get_mut(session_id) {
79 session.touch();
80 return Some(session.sender.subscribe());
81 }
82 None
83}
84
85pub async fn session_exists(session_id: &str) -> bool {
87 let mut sessions = SESSIONS.lock().await;
88 if let Some(session) = sessions.get_mut(session_id) {
89 session.touch();
90 true
91 } else {
92 false
93 }
94}
95
96pub async fn remove_session(session_id: &str) -> bool {
99 SESSIONS.lock().await.remove(session_id).is_some()
100}
101
102pub async fn expire_old(max_age: Duration) {
104 let cutoff = Instant::now() - max_age;
105 let mut sessions = SESSIONS.lock().await;
106 sessions.retain(|sid, session| {
107 let alive = session.created >= cutoff;
108 if !alive {
109 info!("Session {} expired", sid);
110 }
111 alive
112 });
113}
114
115pub async fn send_to_session(session_id: &str, message: String) -> bool {
117 if let Some(sender) = get_sender(session_id).await {
118 sender.send(message).is_ok()
119 } else {
120 false
121 }
122}
123
124pub async fn broadcast_to_all(message: String) {
126 let sessions = SESSIONS.lock().await;
127 for (sid, session) in sessions.iter() {
128 warn!("Sending message: {} to session {}", message, sid);
129 let _ = session.sender.send(message.clone());
131 }
132}
133
134pub async fn disconnect_all() {
136 let mut sessions = SESSIONS.lock().await;
137 sessions.clear();
139 info!("All sessions have been disconnected");
140}
141
142pub async fn session_count() -> usize {
144 let sessions = SESSIONS.lock().await;
145 sessions.len()
146}
147
148pub fn spawn_session_cleanup() {
150 tokio::spawn(async {
151 let mut interval = tokio::time::interval(Duration::from_secs(60));
152 loop {
153 interval.tick().await;
154 expire_old(Duration::from_secs(30 * 60)).await; }
156 });
157}
158
159#[cfg(test)]
160mod tests {
161 use super::*;
162
163 #[tokio::test]
164 async fn test_session_lifecycle() {
165 let handle = new_session(McpProtocolVersion::V2025_06_18).await;
166 let session_id = handle.session_id.clone();
167
168 assert!(session_exists(&session_id).await);
170
171 assert!(get_sender(&session_id).await.is_some());
173
174 assert!(get_receiver(&session_id).await.is_some());
176
177 assert!(remove_session(&session_id).await);
179
180 assert!(!session_exists(&session_id).await);
182 }
183
184 #[tokio::test]
185 async fn test_session_messaging() {
186 let handle = new_session(McpProtocolVersion::V2025_06_18).await;
187 let session_id = handle.session_id.clone();
188
189 let message = r#"{"method":"test","params":{}}"#.to_string();
191 assert!(send_to_session(&session_id, message.clone()).await);
192
193 let mut receiver = handle.receiver;
195 let received = tokio::time::timeout(Duration::from_millis(100), receiver.recv()).await;
196
197 assert!(received.is_ok());
198 assert_eq!(received.unwrap().unwrap(), message);
199 }
200}