turul_http_mcp_server/
stream_manager.rs

1//! Enhanced Stream Manager with MCP 2025-06-18 Resumability
2//!
3//! This module provides proper SSE stream management with:
4//! - Event IDs for resumability
5//! - Last-Event-ID header support
6//! - Per-session event targeting (not broadcast to all)
7//! - Event persistence and replay
8//! - Proper HTTP status codes and headers
9
10use bytes::Bytes;
11use futures::{Stream, StreamExt};
12use http_body_util::{BodyExt, StreamBody};
13use hyper::header::{ACCESS_CONTROL_ALLOW_ORIGIN, CACHE_CONTROL, CONTENT_TYPE};
14use hyper::{Response, StatusCode};
15use serde_json::Value;
16use std::collections::{HashMap, HashSet};
17use std::pin::Pin;
18use std::sync::Arc;
19use tokio::sync::{RwLock, mpsc};
20use tracing::{debug, error, info, warn};
21
22use turul_mcp_session_storage::SseEvent;
23
24/// Connection ID for tracking individual SSE streams
25pub type ConnectionId = String;
26pub type SessionConnections = HashMap<ConnectionId, mpsc::Sender<SseEvent>>;
27pub type ConnectionsMap = Arc<RwLock<HashMap<String, SessionConnections>>>;
28
29/// Enhanced stream manager with resumability support (MCP spec compliant)
30pub struct StreamManager {
31    /// Session storage backend for persistence
32    storage: Arc<turul_mcp_session_storage::BoxedSessionStorage>,
33    /// Per-session connections for real-time events (MCP compliant - no broadcasting)
34    connections: ConnectionsMap,
35    /// Per-session notification subscriptions (what notifications each session wants)
36    subscriptions: Arc<RwLock<HashMap<String, HashSet<String>>>>,
37    /// Configuration
38    config: StreamConfig,
39    /// Unique instance ID for debugging
40    instance_id: String,
41}
42
43/// Configuration for stream management
44#[derive(Debug, Clone)]
45pub struct StreamConfig {
46    /// Channel buffer size for real-time broadcasting
47    pub channel_buffer_size: usize,
48    /// Maximum events to replay on reconnection
49    pub max_replay_events: usize,
50    /// Keep-alive interval in seconds
51    pub keepalive_interval_seconds: u64,
52    /// CORS configuration
53    pub cors_origin: String,
54}
55
56impl Default for StreamConfig {
57    fn default() -> Self {
58        Self {
59            channel_buffer_size: 1000,
60            max_replay_events: 100,
61            keepalive_interval_seconds: 30,
62            cors_origin: "*".to_string(),
63        }
64    }
65}
66
67/// SSE stream wrapper that formats events properly (MCP compliant - one connection per stream)
68pub struct SseStream {
69    /// Underlying event stream
70    stream: Option<Pin<Box<dyn Stream<Item = SseEvent> + Send>>>,
71    /// Session metadata
72    session_id: String,
73    /// Connection identifier (for MCP spec compliance)
74    connection_id: ConnectionId,
75}
76
77impl SseStream {
78    /// Get the session ID this stream belongs to
79    pub fn session_id(&self) -> &str {
80        &self.session_id
81    }
82
83    /// Get the connection ID for this stream
84    pub fn connection_id(&self) -> &str {
85        &self.connection_id
86    }
87
88    /// Get stream identifier for logging (session + connection)
89    pub fn stream_identifier(&self) -> String {
90        format!("{}:{}", self.session_id, self.connection_id)
91    }
92}
93
94impl Drop for SseStream {
95    fn drop(&mut self) {
96        debug!(
97            "DROP: SseStream - session={}, connection={}",
98            self.session_id, self.connection_id
99        );
100        if self.stream.is_some() {
101            debug!("Stream still present during drop - this indicates early cleanup");
102        } else {
103            debug!("Stream was properly extracted before drop");
104        }
105    }
106}
107
108/// Error type for stream management
109#[derive(Debug, thiserror::Error)]
110pub enum StreamError {
111    #[error("Session not found: {0}")]
112    SessionNotFound(String),
113    #[error("Stream not found: session={0}, stream={1}")]
114    StreamNotFound(String, String),
115    #[error("Storage error: {0}")]
116    StorageError(String),
117    #[error("Connection error: {0}")]
118    ConnectionError(String),
119    #[error("No connections available for session: {0}")]
120    NoConnections(String),
121    #[error("Session {0} not subscribed to notification type: {1}")]
122    NotSubscribed(String, String),
123}
124
125impl StreamManager {
126    /// Create new stream manager with session storage backend
127    pub fn new(storage: Arc<turul_mcp_session_storage::BoxedSessionStorage>) -> Self {
128        Self::with_config(storage, StreamConfig::default())
129    }
130
131    /// Create stream manager with custom configuration
132    pub fn with_config(
133        storage: Arc<turul_mcp_session_storage::BoxedSessionStorage>,
134        config: StreamConfig,
135    ) -> Self {
136        use uuid::Uuid;
137        let instance_id = Uuid::now_v7().to_string();
138        debug!("Creating StreamManager instance: {}", instance_id);
139        Self {
140            storage,
141            connections: Arc::new(RwLock::new(HashMap::new())),
142            subscriptions: Arc::new(RwLock::new(HashMap::new())),
143            config,
144            instance_id,
145        }
146    }
147
148    /// Handle SSE connection request with proper resumability
149    pub async fn handle_sse_connection(
150        &self,
151        session_id: String,
152        connection_id: ConnectionId,
153        last_event_id: Option<u64>,
154    ) -> Result<
155        Response<http_body_util::combinators::UnsyncBoxBody<Bytes, hyper::Error>>,
156        StreamError,
157    > {
158        info!(
159            "๐ŸŒŠ handle_sse_connection called: session={}, connection={}, last_event_id={:?}",
160            session_id, connection_id, last_event_id
161        );
162
163        // Verify session exists
164        if self
165            .storage
166            .get_session(&session_id)
167            .await
168            .map_err(|e| StreamError::StorageError(e.to_string()))?
169            .is_none()
170        {
171            return Err(StreamError::SessionNotFound(session_id));
172        }
173
174        // Create the SSE stream (one per connection, MCP compliant)
175        debug!(
176            "๐ŸŒŠ Creating SSE stream for session={}, connection={}",
177            session_id, connection_id
178        );
179        let sse_stream = self
180            .create_sse_stream(session_id.clone(), connection_id.clone(), last_event_id)
181            .await?;
182
183        // Convert to HTTP response
184        debug!("๐ŸŒŠ Converting SSE stream to HTTP response");
185        let response = self.stream_to_response(sse_stream).await;
186
187        debug!(
188            "Created SSE connection: session={}, connection={}, last_event_id={:?}",
189            session_id, connection_id, last_event_id
190        );
191
192        Ok(response)
193    }
194
195    /// Create SSE stream with resumability support (MCP compliant - no broadcast)
196    async fn create_sse_stream(
197        &self,
198        session_id: String,
199        connection_id: ConnectionId,
200        last_event_id: Option<u64>,
201    ) -> Result<SseStream, StreamError> {
202        // Create mpsc channel for this specific connection (MCP compliant)
203        let (sender, mut receiver) = mpsc::channel(self.config.channel_buffer_size);
204
205        // Register this connection with the session
206        self.register_connection(&session_id, connection_id.clone(), sender)
207            .await;
208
209        // Create the combined stream
210        let storage = self.storage.clone();
211        let session_id_clone = session_id.clone();
212        let connection_id_clone = connection_id.clone();
213        let config = self.config.clone();
214
215        let combined_stream = async_stream::stream! {
216            // 1. First, yield any historical events (resumability)
217            let after_id = last_event_id.unwrap_or(0);
218            debug!("๐ŸŒŠ Fetching events after ID {} for session={}, connection={}",
219                   after_id, session_id_clone, connection_id_clone);
220
221            match storage.get_events_after(&session_id_clone, after_id).await {
222                Ok(events) => {
223                    debug!("๐ŸŒŠ Found {} stored events to send", events.len());
224                    for event in events.into_iter().take(config.max_replay_events) {
225                        debug!("๐ŸŒŠ Yielding event: id={}, type={}", event.id, event.event_type);
226                        yield event;
227                    }
228                },
229                Err(e) => {
230                    error!("Failed to get historical events: {}", e);
231                    // Continue with real-time events even if historical replay fails
232                }
233            }
234
235            // 2. Then, stream real-time events from dedicated channel
236            let mut keepalive_interval = tokio::time::interval(
237                tokio::time::Duration::from_secs(config.keepalive_interval_seconds)
238            );
239
240            loop {
241                tokio::select! {
242                    // Real-time events from this connection's channel
243                    event = receiver.recv() => {
244                        match event {
245                            Some(event) => {
246                                debug!("Received event for connection {}: {}", connection_id_clone, event.event_type);
247                                yield event;
248                            },
249                            None => {
250                                debug!("Connection channel closed for session={}, connection={}", session_id_clone, connection_id_clone);
251                                break;
252                            }
253                        }
254                    },
255
256                    // Keep-alive pings (comment-style to preserve Last-Event-ID for resumability)
257                    _ = keepalive_interval.tick() => {
258                        let keepalive_event = SseEvent {
259                            id: 0, // Will be ignored - comment-style keepalives don't have id field
260                            timestamp: chrono::Utc::now().timestamp_millis() as u64,
261                            event_type: "keepalive".to_string(), // Triggers comment-style formatting
262                            data: serde_json::Value::Null, // No data for comment-style keepalives
263                            retry: None,
264                        };
265                        yield keepalive_event;
266                    }
267                }
268            }
269
270            // Clean up connection when stream ends
271            debug!("Cleaning up connection: session={}, connection={}", session_id_clone, connection_id_clone);
272        };
273
274        Ok(SseStream {
275            stream: Some(Box::pin(combined_stream)),
276            session_id,
277            connection_id,
278        })
279    }
280
281    /// Register a new connection for a session (MCP compliant)
282    async fn register_connection(
283        &self,
284        session_id: &str,
285        connection_id: ConnectionId,
286        sender: mpsc::Sender<SseEvent>,
287    ) {
288        let mut connections = self.connections.write().await;
289
290        debug!(
291            "[{}] ๐Ÿ” BEFORE registration: HashMap has {} sessions",
292            self.instance_id,
293            connections.len()
294        );
295        for (sid, conns) in connections.iter() {
296            debug!(
297                "[{}] ๐Ÿ” Existing session before: {} with {} connections",
298                self.instance_id,
299                sid,
300                conns.len()
301            );
302        }
303
304        // Get or create session entry
305        let session_connections = connections
306            .entry(session_id.to_string())
307            .or_insert_with(HashMap::new);
308
309        // Add this connection
310        session_connections.insert(connection_id.clone(), sender);
311
312        debug!(
313            "[{}] ๐Ÿ”— Registered connection: session={}, connection={}, total_connections={}",
314            self.instance_id,
315            session_id,
316            connection_id,
317            session_connections.len()
318        );
319
320        debug!(
321            "[{}] ๐Ÿ” AFTER registration: HashMap has {} sessions",
322            self.instance_id,
323            connections.len()
324        );
325        for (sid, conns) in connections.iter() {
326            debug!(
327                "[{}] ๐Ÿ” Session after: {} with {} connections",
328                self.instance_id,
329                sid,
330                conns.len()
331            );
332        }
333    }
334
335    /// Register a streaming connection to receive events for a session (public API for POST streaming)
336    pub async fn register_streaming_connection(
337        &self,
338        session_id: &str,
339        connection_id: ConnectionId,
340        sender: mpsc::Sender<SseEvent>,
341    ) -> Result<(), StreamError> {
342        // Verify session exists first
343        if self
344            .storage
345            .get_session(session_id)
346            .await
347            .map_err(|e| StreamError::StorageError(e.to_string()))?
348            .is_none()
349        {
350            return Err(StreamError::SessionNotFound(session_id.to_string()));
351        }
352
353        self.register_connection(session_id, connection_id, sender)
354            .await;
355        Ok(())
356    }
357
358    /// Remove a connection when it's closed
359    pub async fn unregister_connection(&self, session_id: &str, connection_id: &ConnectionId) {
360        debug!(
361            "๐Ÿ”ด UNREGISTER called for session={}, connection={}",
362            session_id, connection_id
363        );
364        let mut connections = self.connections.write().await;
365
366        debug!(
367            "๐Ÿ” BEFORE unregister: HashMap has {} sessions",
368            connections.len()
369        );
370
371        if let Some(session_connections) = connections.get_mut(session_id)
372            && session_connections.remove(connection_id).is_some()
373        {
374            debug!(
375                "๐Ÿ”Œ Unregistered connection: session={}, connection={}",
376                session_id, connection_id
377            );
378
379            // Clean up empty sessions
380            if session_connections.is_empty() {
381                connections.remove(session_id);
382                debug!("๐Ÿงน Removed empty session: {}", session_id);
383            }
384        }
385
386        debug!(
387            "๐Ÿ” AFTER unregister: HashMap has {} sessions",
388            connections.len()
389        );
390    }
391
392    /// Close all SSE connections for a session (useful for session termination)
393    pub async fn close_session_connections(&self, session_id: &str) -> usize {
394        debug!("๐Ÿ”ด Closing all connections for session: {}", session_id);
395        let mut connections = self.connections.write().await;
396
397        let closed_count = if let Some(session_connections) = connections.remove(session_id) {
398            let count = session_connections.len();
399            debug!(
400                "๐Ÿ”Œ Closed {} SSE connections for session: {}",
401                count, session_id
402            );
403            count
404        } else {
405            debug!("๐Ÿ” No SSE connections found for session: {}", session_id);
406            0
407        };
408
409        // Also clear subscriptions for this session
410        self.clear_subscriptions(session_id).await;
411
412        debug!("๐Ÿงน Session {} removed from stream manager", session_id);
413        closed_count
414    }
415
416    /// Convert SSE stream to HTTP response with proper headers
417    async fn stream_to_response(
418        &self,
419        mut sse_stream: SseStream,
420    ) -> Response<http_body_util::combinators::UnsyncBoxBody<Bytes, hyper::Error>> {
421        // Extract session info before moving the stream
422        let session_id = sse_stream.session_id().to_string();
423        let stream_identifier = sse_stream.stream_identifier();
424
425        // Log stream creation with session identifier
426        debug!(
427            "Converting SSE stream to HTTP response: {}",
428            stream_identifier
429        );
430        debug!("Stream details: session_id={}", session_id);
431
432        // Transform events to SSE format and create proper HTTP frames
433        // Extract stream from Option wrapper
434        let stream = sse_stream
435            .stream
436            .take()
437            .expect("Stream should be present in SseStream");
438
439        let formatted_stream = stream.map(|event| {
440            let sse_formatted = event.format();
441            debug!(
442                "๐Ÿ“ก Streaming SSE event: id={}, event_type={}",
443                event.id, event.event_type
444            );
445            Ok(hyper::body::Frame::data(Bytes::from(sse_formatted)))
446        });
447
448        // Create streaming body from the actual event stream and box it
449        let body = StreamBody::new(formatted_stream).boxed_unsync();
450
451        // Build response with proper SSE headers for streaming
452        Response::builder()
453            .status(StatusCode::OK)
454            .header(CONTENT_TYPE, "text/event-stream")
455            .header(CACHE_CONTROL, "no-cache")
456            .header(ACCESS_CONTROL_ALLOW_ORIGIN, &self.config.cors_origin)
457            .header("Connection", "keep-alive")
458            .body(body)
459            .unwrap()
460    }
461
462    /// Check if a session has any active SSE connections
463    pub async fn has_connections(&self, session_id: &str) -> bool {
464        let connections = self.connections.read().await;
465        connections
466            .get(session_id)
467            .map(|session_connections| !session_connections.is_empty())
468            .unwrap_or(false)
469    }
470
471    /// Send event to specific session (MCP compliant - ONE connection only)
472    pub async fn broadcast_to_session(
473        &self,
474        session_id: &str,
475        event_type: String,
476        data: Value,
477    ) -> Result<u64, StreamError> {
478        self.broadcast_to_session_with_options(session_id, event_type, data, true)
479            .await
480    }
481
482    /// Send event to specific session with option to suppress when no connections exist
483    pub async fn broadcast_to_session_with_options(
484        &self,
485        session_id: &str,
486        event_type: String,
487        data: Value,
488        store_when_no_connections: bool,
489    ) -> Result<u64, StreamError> {
490        // Check subscription filtering first
491        let is_subscribed = self.is_subscribed(session_id, &event_type).await;
492        info!(
493            "๐Ÿ” Subscription check: session={}, event_type={}, is_subscribed={}",
494            session_id, event_type, is_subscribed
495        );
496        if !is_subscribed {
497            warn!(
498                "๐Ÿšซ Session {} not subscribed to notification type: {}",
499                session_id, event_type
500            );
501            return Err(StreamError::NotSubscribed(
502                session_id.to_string(),
503                event_type,
504            ));
505        }
506
507        // Check if we should suppress notifications when no connections exist
508        if !store_when_no_connections && !self.has_connections(session_id).await {
509            debug!(
510                "๐Ÿšซ Suppressing notification for session {} (no connections, store_when_no_connections=false)",
511                session_id
512            );
513            return Err(StreamError::NoConnections(session_id.to_string()));
514        }
515
516        // Create the event
517        let event = SseEvent::new(event_type.clone(), data);
518
519        // Store event for resumability (always store for compliant clients)
520        let stored_event = self
521            .storage
522            .store_event(session_id, event)
523            .await
524            .map_err(|e| StreamError::StorageError(e.to_string()))?;
525
526        // DEBUG: Check connection state more thoroughly
527        let connections = self.connections.read().await;
528        debug!(
529            "[{}] ๐Ÿ” Checking connections for session {}: connections hashmap has {} sessions",
530            self.instance_id,
531            session_id,
532            connections.len()
533        );
534
535        if let Some(session_connections) = connections.get(session_id) {
536            debug!(
537                "๐Ÿ” Session {} found with {} connections",
538                session_id,
539                session_connections.len()
540            );
541
542            if !session_connections.is_empty() {
543                // Pick the FIRST available connection (MCP compliant)
544                let (selected_connection_id, selected_sender) =
545                    session_connections.iter().next().unwrap();
546
547                // Check if sender is closed
548                if selected_sender.is_closed() {
549                    warn!(
550                        "๐Ÿ”Œ Sender is closed for connection: session={}, connection={}",
551                        session_id, selected_connection_id
552                    );
553                    debug!("๐Ÿ“ญ Connection sender was closed, event stored for reconnection");
554                } else {
555                    debug!(
556                        "โœ… Sender is open, attempting to send to connection: session={}, connection={}",
557                        session_id, selected_connection_id
558                    );
559
560                    match selected_sender.try_send(stored_event.clone()) {
561                        Ok(()) => {
562                            debug!(
563                                "Sent notification to ONE connection: session={}, connection={}, event_id={}, method={}",
564                                session_id,
565                                selected_connection_id,
566                                stored_event.id,
567                                stored_event.event_type
568                            );
569                        }
570                        Err(mpsc::error::TrySendError::Full(_)) => {
571                            warn!(
572                                "โš ๏ธ Connection buffer full: session={}, connection={}",
573                                session_id, selected_connection_id
574                            );
575                            // Event is still stored for reconnection
576                        }
577                        Err(mpsc::error::TrySendError::Closed(_)) => {
578                            warn!(
579                                "๐Ÿ”Œ Connection closed during send: session={}, connection={}",
580                                session_id, selected_connection_id
581                            );
582                            // Event is still stored for reconnection
583                        }
584                    }
585                }
586            } else {
587                debug!(
588                    "๐Ÿ“ญ No active connections for session: {} (event stored for reconnection)",
589                    session_id
590                );
591            }
592        } else {
593            debug!(
594                "๐Ÿ“ญ No connections registered for session: {} (event stored for reconnection)",
595                session_id
596            );
597
598            // DEBUG: List all sessions in connections
599            for (sid, conns) in connections.iter() {
600                debug!(
601                    "๐Ÿ” Available session: {} with {} connections",
602                    sid,
603                    conns.len()
604                );
605            }
606        }
607
608        Ok(stored_event.id)
609    }
610
611    /// Broadcast to all sessions (for server-wide notifications)
612    pub async fn broadcast_to_all_sessions(
613        &self,
614        event_type: String,
615        data: Value,
616    ) -> Result<Vec<String>, StreamError> {
617        // Get all session IDs
618        let session_ids = self
619            .storage
620            .list_sessions()
621            .await
622            .map_err(|e| StreamError::StorageError(e.to_string()))?;
623
624        let mut failed_sessions = Vec::new();
625
626        for session_id in session_ids {
627            if let Err(e) = self
628                .broadcast_to_session(&session_id, event_type.clone(), data.clone())
629                .await
630            {
631                error!("Failed to broadcast to session {}: {}", session_id, e);
632                failed_sessions.push(session_id);
633            }
634        }
635
636        Ok(failed_sessions)
637    }
638
639    /// Clean up closed connections
640    pub async fn cleanup_connections(&self) -> usize {
641        debug!("๐Ÿงน CLEANUP_CONNECTIONS called");
642        let mut connections = self.connections.write().await;
643        let mut total_cleaned = 0;
644
645        debug!(
646            "๐Ÿ” BEFORE cleanup: HashMap has {} sessions",
647            connections.len()
648        );
649
650        // Clean up closed connections
651        connections.retain(|session_id, session_connections| {
652            let initial_count = session_connections.len();
653
654            // Remove closed connections
655            session_connections.retain(|connection_id, sender| {
656                if sender.is_closed() {
657                    debug!(
658                        "๐Ÿงน Cleaned up closed connection: session={}, connection={}",
659                        session_id, connection_id
660                    );
661                    false
662                } else {
663                    true
664                }
665            });
666
667            let cleaned_count = initial_count - session_connections.len();
668            total_cleaned += cleaned_count;
669
670            // Keep session if it has active connections
671            !session_connections.is_empty()
672        });
673
674        if total_cleaned > 0 {
675            debug!("Cleaned up {} inactive connections", total_cleaned);
676        }
677
678        total_cleaned
679    }
680
681    /// Create SSE stream for POST requests (MCP Streamable HTTP)
682    pub async fn create_post_sse_stream(
683        &self,
684        session_id: String,
685        response: turul_mcp_json_rpc_server::JsonRpcResponse,
686    ) -> Result<
687        hyper::Response<
688            http_body_util::combinators::BoxBody<bytes::Bytes, std::convert::Infallible>,
689        >,
690        StreamError,
691    > {
692        // Verify session exists
693        if self
694            .storage
695            .get_session(&session_id)
696            .await
697            .map_err(|e| StreamError::StorageError(e.to_string()))?
698            .is_none()
699        {
700            return Err(StreamError::SessionNotFound(session_id));
701        }
702
703        debug!("Creating POST SSE stream for session: {}", session_id);
704
705        // Create the SSE response body
706        let response_json = serde_json::to_string(&response).map_err(|e| {
707            StreamError::StorageError(format!("Failed to serialize response: {}", e))
708        })?;
709
710        // 1. Include recent notifications that were generated during tool execution
711        // Tool execution is fully awaited, and storage writes use consistent reads,
712        // so all notifications should be immediately available
713        let mut sse_frames = Vec::new();
714        let mut event_id_counter = 1;
715
716        if let Ok(events) = self.storage.get_recent_events(&session_id, 10).await {
717            for event in events {
718                // Convert stored SSE event to notification JSON-RPC format
719                if event.event_type != "ping" {
720                    // Skip keepalive events
721                    // Use "message" event type for MCP Inspector compatibility
722                    // The JSON-RPC method is already in the data payload
723                    let notification_sse = format!(
724                        "id: {}\nevent: message\ndata: {}\n\n",
725                        event_id_counter, event.data
726                    );
727                    debug!(
728                        "๐Ÿ“ค Including notification in POST SSE stream: id={}, json_rpc_method={}",
729                        event_id_counter, event.event_type
730                    );
731                    sse_frames.push(http_body::Frame::data(Bytes::from(notification_sse)));
732                    event_id_counter += 1;
733                }
734            }
735        }
736
737        // 2. Add the JSON-RPC tool response
738        // Use "message" event type for MCP Inspector compatibility
739        let response_sse = format!(
740            "id: {}\nevent: message\ndata: {}\n\n",
741            event_id_counter, response_json
742        );
743        debug!(
744            "๐Ÿ“ค Sending JSON-RPC response as SSE event: id={}, event=message",
745            event_id_counter
746        );
747        sse_frames.push(http_body::Frame::data(Bytes::from(response_sse)));
748
749        // Create a simple stream from the collected frames
750        let stream = futures::stream::iter(
751            sse_frames
752                .into_iter()
753                .map(Ok::<_, std::convert::Infallible>),
754        );
755
756        // Create StreamBody from the stream and box it for type erasure
757        let body = StreamBody::new(stream);
758        let boxed_body = http_body_util::combinators::BoxBody::new(body);
759
760        debug!(
761            "๐Ÿ“ก POST SSE streaming response created: session={}",
762            session_id
763        );
764
765        // Build response with proper SSE headers including MCP session ID
766        Ok(hyper::Response::builder()
767            .status(hyper::StatusCode::OK)
768            .header(hyper::header::CONTENT_TYPE, "text/event-stream")
769            .header(hyper::header::CACHE_CONTROL, "no-cache")
770            .header(
771                hyper::header::ACCESS_CONTROL_ALLOW_ORIGIN,
772                &self.config.cors_origin,
773            )
774            .header("Connection", "keep-alive")
775            .header("X-Accel-Buffering", "no") // Prevent proxy buffering
776            .header("Mcp-Session-Id", &session_id)
777            .body(boxed_body)
778            .unwrap())
779    }
780
781    /// Subscribe a session to specific notification types
782    pub async fn subscribe_to_notifications(
783        &self,
784        session_id: &str,
785        notification_types: Vec<String>,
786    ) {
787        let mut subscriptions = self.subscriptions.write().await;
788        let session_subscriptions = subscriptions
789            .entry(session_id.to_string())
790            .or_insert_with(HashSet::new);
791
792        for notification_type in notification_types {
793            session_subscriptions.insert(notification_type.clone());
794            debug!(
795                "๐Ÿ“ Session {} subscribed to notification: {}",
796                session_id, notification_type
797            );
798        }
799
800        debug!(
801            "Session {} now has {} subscriptions",
802            session_id,
803            session_subscriptions.len()
804        );
805    }
806
807    /// Unsubscribe a session from specific notification types
808    pub async fn unsubscribe_from_notifications(
809        &self,
810        session_id: &str,
811        notification_types: Vec<String>,
812    ) {
813        let mut subscriptions = self.subscriptions.write().await;
814        if let Some(session_subscriptions) = subscriptions.get_mut(session_id) {
815            for notification_type in notification_types {
816                if session_subscriptions.remove(&notification_type) {
817                    debug!(
818                        "๐Ÿ“ Session {} unsubscribed from notification: {}",
819                        session_id, notification_type
820                    );
821                }
822            }
823
824            // Remove session entry if no subscriptions remain
825            if session_subscriptions.is_empty() {
826                subscriptions.remove(session_id);
827                debug!(
828                    "๐Ÿ—‘๏ธ Removed subscription entry for session {} (no remaining subscriptions)",
829                    session_id
830                );
831            }
832        }
833    }
834
835    /// Check if a session is subscribed to a specific notification type
836    pub async fn is_subscribed(&self, session_id: &str, notification_type: &str) -> bool {
837        let subscriptions = self.subscriptions.read().await;
838        subscriptions
839            .get(session_id)
840            .map(|session_subscriptions| session_subscriptions.contains(notification_type))
841            .unwrap_or(true) // Default: allow all notifications if no explicit subscriptions
842    }
843
844    /// Get all subscriptions for a session
845    pub async fn get_subscriptions(&self, session_id: &str) -> HashSet<String> {
846        let subscriptions = self.subscriptions.read().await;
847        subscriptions.get(session_id).cloned().unwrap_or_default()
848    }
849
850    /// Clear all subscriptions for a session (used during session cleanup)
851    pub async fn clear_subscriptions(&self, session_id: &str) {
852        let mut subscriptions = self.subscriptions.write().await;
853        if subscriptions.remove(session_id).is_some() {
854            debug!("๐Ÿ—‘๏ธ Cleared all subscriptions for session: {}", session_id);
855        }
856    }
857
858    /// Get the stream configuration (for testing and debugging)
859    pub fn get_config(&self) -> &StreamConfig {
860        &self.config
861    }
862
863    /// Get statistics about active streams
864    pub async fn get_stats(&self) -> StreamStats {
865        let connections = self.connections.read().await;
866        let session_count = self.storage.session_count().await.unwrap_or(0);
867        let event_count = self.storage.event_count().await.unwrap_or(0);
868
869        // Count total active connections
870        let total_connections: usize = connections
871            .values()
872            .map(|session_connections| session_connections.len())
873            .sum();
874
875        StreamStats {
876            active_broadcasters: total_connections, // Now tracks active connections
877            total_sessions: session_count,
878            total_events: event_count,
879            channel_buffer_size: self.config.channel_buffer_size,
880        }
881    }
882}
883
884impl Drop for StreamManager {
885    fn drop(&mut self) {
886        debug!(
887            "DROP: StreamManager instance {} - this may cause connection loss!",
888            self.instance_id
889        );
890        debug!("If this appears during request processing, it indicates architecture problem");
891    }
892}
893
894/// Stream manager statistics
895#[derive(Debug, Clone)]
896pub struct StreamStats {
897    pub active_broadcasters: usize,
898    pub total_sessions: usize,
899    pub total_events: usize,
900    pub channel_buffer_size: usize,
901}
902
903// Helper to create async stream
904#[cfg(not(test))]
905use async_stream;
906
907#[cfg(test)]
908mod tests {
909    use super::*;
910    use turul_mcp_protocol::ServerCapabilities;
911    use turul_mcp_session_storage::{InMemorySessionStorage, SessionStorage};
912
913    #[tokio::test]
914    async fn test_stream_manager_creation() {
915        let storage = Arc::new(InMemorySessionStorage::new());
916        let manager = StreamManager::new(storage);
917
918        let stats = manager.get_stats().await;
919        assert_eq!(stats.active_broadcasters, 0);
920        assert_eq!(stats.total_sessions, 0);
921    }
922
923    #[tokio::test]
924    async fn test_broadcast_to_session() {
925        let storage = Arc::new(InMemorySessionStorage::new());
926        let manager = StreamManager::new(storage.clone());
927
928        // Create a session
929        let session = storage
930            .create_session(ServerCapabilities::default())
931            .await
932            .unwrap();
933        let session_id = session.session_id.clone();
934
935        // Broadcast an event
936        let event_id = manager
937            .broadcast_to_session(
938                &session_id,
939                "test".to_string(),
940                serde_json::json!({"message": "test"}),
941            )
942            .await
943            .unwrap();
944
945        assert!(event_id > 0);
946
947        // Verify event was stored
948        let events = storage.get_events_after(&session_id, 0).await.unwrap();
949        assert_eq!(events.len(), 1);
950        assert_eq!(events[0].id, event_id);
951    }
952}