1use bytes::Bytes;
11use futures::{Stream, StreamExt};
12use http_body_util::{BodyExt, StreamBody};
13use hyper::header::{ACCESS_CONTROL_ALLOW_ORIGIN, CACHE_CONTROL, CONTENT_TYPE};
14use hyper::{Response, StatusCode};
15use serde_json::Value;
16use std::collections::{HashMap, HashSet};
17use std::pin::Pin;
18use std::sync::Arc;
19use tokio::sync::{RwLock, mpsc};
20use tracing::{debug, error, info, warn};
21
22use turul_mcp_session_storage::SseEvent;
23
/// Identifier of one SSE connection within a session.
pub type ConnectionId = String;
/// Connections of a single session: connection id -> sender half of that
/// connection's event channel.
pub type SessionConnections = HashMap<ConnectionId, mpsc::Sender<SseEvent>>;
/// Shared, lock-protected registry of all sessions' connections, keyed by
/// session id.
pub type ConnectionsMap = Arc<RwLock<HashMap<String, SessionConnections>>>;
28
/// Manages SSE connections, event delivery and notification subscriptions
/// for sessions held in a session storage backend.
pub struct StreamManager {
    /// Storage backend used for session lookup, event persistence and replay.
    storage: Arc<turul_mcp_session_storage::BoxedSessionStorage>,
    /// Registered connections: session id -> connection id -> event sender.
    connections: ConnectionsMap,
    /// Explicit notification subscriptions: session id -> notification types.
    subscriptions: Arc<RwLock<HashMap<String, HashSet<String>>>>,
    /// Channel, replay, keepalive and CORS settings.
    config: StreamConfig,
    /// UUIDv7 string generated in `with_config`; used to tag log lines.
    instance_id: String,
}
42
/// Tunables for [`StreamManager`].
#[derive(Debug, Clone)]
pub struct StreamConfig {
    /// Capacity of the per-connection event channel.
    pub channel_buffer_size: usize,
    /// Upper bound on stored events replayed when a connection is opened.
    pub max_replay_events: usize,
    /// Seconds between keepalive events on an open stream; use a non-zero value.
    pub keepalive_interval_seconds: u64,
    /// Value sent in the `Access-Control-Allow-Origin` response header.
    pub cors_origin: String,
}

impl Default for StreamConfig {
    /// Returns the stock settings: 1000-slot channels, at most 100 replayed
    /// events, a 30 second keepalive and a wildcard CORS origin.
    fn default() -> Self {
        let cors_origin = String::from("*");
        Self {
            channel_buffer_size: 1_000,
            max_replay_events: 100,
            keepalive_interval_seconds: 30,
            cors_origin,
        }
    }
}
66
/// An event stream bound to one session/connection pair.
pub struct SseStream {
    /// The underlying event stream. Wrapped in `Option` so it can be taken
    /// out (as `stream_to_response` does) before this wrapper is dropped.
    stream: Option<Pin<Box<dyn Stream<Item = SseEvent> + Send>>>,
    /// Session this stream belongs to.
    session_id: String,
    /// Connection this stream belongs to.
    connection_id: ConnectionId,
}
76
77impl SseStream {
78 pub fn session_id(&self) -> &str {
80 &self.session_id
81 }
82
83 pub fn connection_id(&self) -> &str {
85 &self.connection_id
86 }
87
88 pub fn stream_identifier(&self) -> String {
90 format!("{}:{}", self.session_id, self.connection_id)
91 }
92}
93
94impl Drop for SseStream {
95 fn drop(&mut self) {
96 debug!(
97 "DROP: SseStream - session={}, connection={}",
98 self.session_id, self.connection_id
99 );
100 if self.stream.is_some() {
101 debug!("Stream still present during drop - this indicates early cleanup");
102 } else {
103 debug!("Stream was properly extracted before drop");
104 }
105 }
106}
107
/// Errors reported by [`StreamManager`] operations.
#[derive(Debug, thiserror::Error)]
pub enum StreamError {
    /// The storage backend has no session with this id.
    #[error("Session not found: {0}")]
    SessionNotFound(String),
    /// No stream exists for the given (session, stream) pair.
    #[error("Stream not found: session={0}, stream={1}")]
    StreamNotFound(String, String),
    /// A storage operation failed; carries the error rendered as text.
    #[error("Storage error: {0}")]
    StorageError(String),
    /// A connection-level failure, described as text.
    #[error("Connection error: {0}")]
    ConnectionError(String),
    /// The session has no registered connections and the caller asked not to
    /// store events in that case.
    #[error("No connections available for session: {0}")]
    NoConnections(String),
    /// The session (first field) is not subscribed to the notification type
    /// (second field).
    #[error("Session {0} not subscribed to notification type: {1}")]
    NotSubscribed(String, String),
}
124
125impl StreamManager {
126 pub fn new(storage: Arc<turul_mcp_session_storage::BoxedSessionStorage>) -> Self {
128 Self::with_config(storage, StreamConfig::default())
129 }
130
131 pub fn with_config(
133 storage: Arc<turul_mcp_session_storage::BoxedSessionStorage>,
134 config: StreamConfig,
135 ) -> Self {
136 use uuid::Uuid;
137 let instance_id = Uuid::now_v7().to_string();
138 debug!("Creating StreamManager instance: {}", instance_id);
139 Self {
140 storage,
141 connections: Arc::new(RwLock::new(HashMap::new())),
142 subscriptions: Arc::new(RwLock::new(HashMap::new())),
143 config,
144 instance_id,
145 }
146 }
147
148 pub async fn handle_sse_connection(
150 &self,
151 session_id: String,
152 connection_id: ConnectionId,
153 last_event_id: Option<u64>,
154 ) -> Result<
155 Response<http_body_util::combinators::UnsyncBoxBody<Bytes, hyper::Error>>,
156 StreamError,
157 > {
158 info!(
159 "๐ handle_sse_connection called: session={}, connection={}, last_event_id={:?}",
160 session_id, connection_id, last_event_id
161 );
162
163 if self
165 .storage
166 .get_session(&session_id)
167 .await
168 .map_err(|e| StreamError::StorageError(e.to_string()))?
169 .is_none()
170 {
171 return Err(StreamError::SessionNotFound(session_id));
172 }
173
174 debug!(
176 "๐ Creating SSE stream for session={}, connection={}",
177 session_id, connection_id
178 );
179 let sse_stream = self
180 .create_sse_stream(session_id.clone(), connection_id.clone(), last_event_id)
181 .await?;
182
183 debug!("๐ Converting SSE stream to HTTP response");
185 let response = self.stream_to_response(sse_stream).await;
186
187 debug!(
188 "Created SSE connection: session={}, connection={}, last_event_id={:?}",
189 session_id, connection_id, last_event_id
190 );
191
192 Ok(response)
193 }
194
195 async fn create_sse_stream(
197 &self,
198 session_id: String,
199 connection_id: ConnectionId,
200 last_event_id: Option<u64>,
201 ) -> Result<SseStream, StreamError> {
202 let (sender, mut receiver) = mpsc::channel(self.config.channel_buffer_size);
204
205 self.register_connection(&session_id, connection_id.clone(), sender)
207 .await;
208
209 let storage = self.storage.clone();
211 let session_id_clone = session_id.clone();
212 let connection_id_clone = connection_id.clone();
213 let config = self.config.clone();
214
215 let combined_stream = async_stream::stream! {
216 let after_id = last_event_id.unwrap_or(0);
218 debug!("๐ Fetching events after ID {} for session={}, connection={}",
219 after_id, session_id_clone, connection_id_clone);
220
221 match storage.get_events_after(&session_id_clone, after_id).await {
222 Ok(events) => {
223 debug!("๐ Found {} stored events to send", events.len());
224 for event in events.into_iter().take(config.max_replay_events) {
225 debug!("๐ Yielding event: id={}, type={}", event.id, event.event_type);
226 yield event;
227 }
228 },
229 Err(e) => {
230 error!("Failed to get historical events: {}", e);
231 }
233 }
234
235 let mut keepalive_interval = tokio::time::interval(
237 tokio::time::Duration::from_secs(config.keepalive_interval_seconds)
238 );
239
240 loop {
241 tokio::select! {
242 event = receiver.recv() => {
244 match event {
245 Some(event) => {
246 debug!("Received event for connection {}: {}", connection_id_clone, event.event_type);
247 yield event;
248 },
249 None => {
250 debug!("Connection channel closed for session={}, connection={}", session_id_clone, connection_id_clone);
251 break;
252 }
253 }
254 },
255
256 _ = keepalive_interval.tick() => {
258 let keepalive_event = SseEvent {
259 id: 0, timestamp: chrono::Utc::now().timestamp_millis() as u64,
261 event_type: "keepalive".to_string(), data: serde_json::Value::Null, retry: None,
264 };
265 yield keepalive_event;
266 }
267 }
268 }
269
270 debug!("Cleaning up connection: session={}, connection={}", session_id_clone, connection_id_clone);
272 };
273
274 Ok(SseStream {
275 stream: Some(Box::pin(combined_stream)),
276 session_id,
277 connection_id,
278 })
279 }
280
281 async fn register_connection(
283 &self,
284 session_id: &str,
285 connection_id: ConnectionId,
286 sender: mpsc::Sender<SseEvent>,
287 ) {
288 let mut connections = self.connections.write().await;
289
290 debug!(
291 "[{}] ๐ BEFORE registration: HashMap has {} sessions",
292 self.instance_id,
293 connections.len()
294 );
295 for (sid, conns) in connections.iter() {
296 debug!(
297 "[{}] ๐ Existing session before: {} with {} connections",
298 self.instance_id,
299 sid,
300 conns.len()
301 );
302 }
303
304 let session_connections = connections
306 .entry(session_id.to_string())
307 .or_insert_with(HashMap::new);
308
309 session_connections.insert(connection_id.clone(), sender);
311
312 debug!(
313 "[{}] ๐ Registered connection: session={}, connection={}, total_connections={}",
314 self.instance_id,
315 session_id,
316 connection_id,
317 session_connections.len()
318 );
319
320 debug!(
321 "[{}] ๐ AFTER registration: HashMap has {} sessions",
322 self.instance_id,
323 connections.len()
324 );
325 for (sid, conns) in connections.iter() {
326 debug!(
327 "[{}] ๐ Session after: {} with {} connections",
328 self.instance_id,
329 sid,
330 conns.len()
331 );
332 }
333 }
334
335 pub async fn register_streaming_connection(
337 &self,
338 session_id: &str,
339 connection_id: ConnectionId,
340 sender: mpsc::Sender<SseEvent>,
341 ) -> Result<(), StreamError> {
342 if self
344 .storage
345 .get_session(session_id)
346 .await
347 .map_err(|e| StreamError::StorageError(e.to_string()))?
348 .is_none()
349 {
350 return Err(StreamError::SessionNotFound(session_id.to_string()));
351 }
352
353 self.register_connection(session_id, connection_id, sender)
354 .await;
355 Ok(())
356 }
357
358 pub async fn unregister_connection(&self, session_id: &str, connection_id: &ConnectionId) {
360 debug!(
361 "๐ด UNREGISTER called for session={}, connection={}",
362 session_id, connection_id
363 );
364 let mut connections = self.connections.write().await;
365
366 debug!(
367 "๐ BEFORE unregister: HashMap has {} sessions",
368 connections.len()
369 );
370
371 if let Some(session_connections) = connections.get_mut(session_id)
372 && session_connections.remove(connection_id).is_some()
373 {
374 debug!(
375 "๐ Unregistered connection: session={}, connection={}",
376 session_id, connection_id
377 );
378
379 if session_connections.is_empty() {
381 connections.remove(session_id);
382 debug!("๐งน Removed empty session: {}", session_id);
383 }
384 }
385
386 debug!(
387 "๐ AFTER unregister: HashMap has {} sessions",
388 connections.len()
389 );
390 }
391
392 pub async fn close_session_connections(&self, session_id: &str) -> usize {
394 debug!("๐ด Closing all connections for session: {}", session_id);
395 let mut connections = self.connections.write().await;
396
397 let closed_count = if let Some(session_connections) = connections.remove(session_id) {
398 let count = session_connections.len();
399 debug!(
400 "๐ Closed {} SSE connections for session: {}",
401 count, session_id
402 );
403 count
404 } else {
405 debug!("๐ No SSE connections found for session: {}", session_id);
406 0
407 };
408
409 self.clear_subscriptions(session_id).await;
411
412 debug!("๐งน Session {} removed from stream manager", session_id);
413 closed_count
414 }
415
416 async fn stream_to_response(
418 &self,
419 mut sse_stream: SseStream,
420 ) -> Response<http_body_util::combinators::UnsyncBoxBody<Bytes, hyper::Error>> {
421 let session_id = sse_stream.session_id().to_string();
423 let stream_identifier = sse_stream.stream_identifier();
424
425 debug!(
427 "Converting SSE stream to HTTP response: {}",
428 stream_identifier
429 );
430 debug!("Stream details: session_id={}", session_id);
431
432 let stream = sse_stream
435 .stream
436 .take()
437 .expect("Stream should be present in SseStream");
438
439 let formatted_stream = stream.map(|event| {
440 let sse_formatted = event.format();
441 debug!(
442 "๐ก Streaming SSE event: id={}, event_type={}",
443 event.id, event.event_type
444 );
445 Ok(hyper::body::Frame::data(Bytes::from(sse_formatted)))
446 });
447
448 let body = StreamBody::new(formatted_stream).boxed_unsync();
450
451 Response::builder()
453 .status(StatusCode::OK)
454 .header(CONTENT_TYPE, "text/event-stream")
455 .header(CACHE_CONTROL, "no-cache")
456 .header(ACCESS_CONTROL_ALLOW_ORIGIN, &self.config.cors_origin)
457 .header("Connection", "keep-alive")
458 .body(body)
459 .unwrap()
460 }
461
462 pub async fn has_connections(&self, session_id: &str) -> bool {
464 let connections = self.connections.read().await;
465 connections
466 .get(session_id)
467 .map(|session_connections| !session_connections.is_empty())
468 .unwrap_or(false)
469 }
470
471 pub async fn broadcast_to_session(
473 &self,
474 session_id: &str,
475 event_type: String,
476 data: Value,
477 ) -> Result<u64, StreamError> {
478 self.broadcast_to_session_with_options(session_id, event_type, data, true)
479 .await
480 }
481
482 pub async fn broadcast_to_session_with_options(
484 &self,
485 session_id: &str,
486 event_type: String,
487 data: Value,
488 store_when_no_connections: bool,
489 ) -> Result<u64, StreamError> {
490 let is_subscribed = self.is_subscribed(session_id, &event_type).await;
492 info!(
493 "๐ Subscription check: session={}, event_type={}, is_subscribed={}",
494 session_id, event_type, is_subscribed
495 );
496 if !is_subscribed {
497 warn!(
498 "๐ซ Session {} not subscribed to notification type: {}",
499 session_id, event_type
500 );
501 return Err(StreamError::NotSubscribed(
502 session_id.to_string(),
503 event_type,
504 ));
505 }
506
507 if !store_when_no_connections && !self.has_connections(session_id).await {
509 debug!(
510 "๐ซ Suppressing notification for session {} (no connections, store_when_no_connections=false)",
511 session_id
512 );
513 return Err(StreamError::NoConnections(session_id.to_string()));
514 }
515
516 let event = SseEvent::new(event_type.clone(), data);
518
519 let stored_event = self
521 .storage
522 .store_event(session_id, event)
523 .await
524 .map_err(|e| StreamError::StorageError(e.to_string()))?;
525
526 let connections = self.connections.read().await;
528 debug!(
529 "[{}] ๐ Checking connections for session {}: connections hashmap has {} sessions",
530 self.instance_id,
531 session_id,
532 connections.len()
533 );
534
535 if let Some(session_connections) = connections.get(session_id) {
536 debug!(
537 "๐ Session {} found with {} connections",
538 session_id,
539 session_connections.len()
540 );
541
542 if !session_connections.is_empty() {
543 let (selected_connection_id, selected_sender) =
545 session_connections.iter().next().unwrap();
546
547 if selected_sender.is_closed() {
549 warn!(
550 "๐ Sender is closed for connection: session={}, connection={}",
551 session_id, selected_connection_id
552 );
553 debug!("๐ญ Connection sender was closed, event stored for reconnection");
554 } else {
555 debug!(
556 "โ
Sender is open, attempting to send to connection: session={}, connection={}",
557 session_id, selected_connection_id
558 );
559
560 match selected_sender.try_send(stored_event.clone()) {
561 Ok(()) => {
562 debug!(
563 "Sent notification to ONE connection: session={}, connection={}, event_id={}, method={}",
564 session_id,
565 selected_connection_id,
566 stored_event.id,
567 stored_event.event_type
568 );
569 }
570 Err(mpsc::error::TrySendError::Full(_)) => {
571 warn!(
572 "โ ๏ธ Connection buffer full: session={}, connection={}",
573 session_id, selected_connection_id
574 );
575 }
577 Err(mpsc::error::TrySendError::Closed(_)) => {
578 warn!(
579 "๐ Connection closed during send: session={}, connection={}",
580 session_id, selected_connection_id
581 );
582 }
584 }
585 }
586 } else {
587 debug!(
588 "๐ญ No active connections for session: {} (event stored for reconnection)",
589 session_id
590 );
591 }
592 } else {
593 debug!(
594 "๐ญ No connections registered for session: {} (event stored for reconnection)",
595 session_id
596 );
597
598 for (sid, conns) in connections.iter() {
600 debug!(
601 "๐ Available session: {} with {} connections",
602 sid,
603 conns.len()
604 );
605 }
606 }
607
608 Ok(stored_event.id)
609 }
610
611 pub async fn broadcast_to_all_sessions(
613 &self,
614 event_type: String,
615 data: Value,
616 ) -> Result<Vec<String>, StreamError> {
617 let session_ids = self
619 .storage
620 .list_sessions()
621 .await
622 .map_err(|e| StreamError::StorageError(e.to_string()))?;
623
624 let mut failed_sessions = Vec::new();
625
626 for session_id in session_ids {
627 if let Err(e) = self
628 .broadcast_to_session(&session_id, event_type.clone(), data.clone())
629 .await
630 {
631 error!("Failed to broadcast to session {}: {}", session_id, e);
632 failed_sessions.push(session_id);
633 }
634 }
635
636 Ok(failed_sessions)
637 }
638
639 pub async fn cleanup_connections(&self) -> usize {
641 debug!("๐งน CLEANUP_CONNECTIONS called");
642 let mut connections = self.connections.write().await;
643 let mut total_cleaned = 0;
644
645 debug!(
646 "๐ BEFORE cleanup: HashMap has {} sessions",
647 connections.len()
648 );
649
650 connections.retain(|session_id, session_connections| {
652 let initial_count = session_connections.len();
653
654 session_connections.retain(|connection_id, sender| {
656 if sender.is_closed() {
657 debug!(
658 "๐งน Cleaned up closed connection: session={}, connection={}",
659 session_id, connection_id
660 );
661 false
662 } else {
663 true
664 }
665 });
666
667 let cleaned_count = initial_count - session_connections.len();
668 total_cleaned += cleaned_count;
669
670 !session_connections.is_empty()
672 });
673
674 if total_cleaned > 0 {
675 debug!("Cleaned up {} inactive connections", total_cleaned);
676 }
677
678 total_cleaned
679 }
680
681 pub async fn create_post_sse_stream(
683 &self,
684 session_id: String,
685 response: turul_mcp_json_rpc_server::JsonRpcResponse,
686 ) -> Result<
687 hyper::Response<
688 http_body_util::combinators::BoxBody<bytes::Bytes, std::convert::Infallible>,
689 >,
690 StreamError,
691 > {
692 if self
694 .storage
695 .get_session(&session_id)
696 .await
697 .map_err(|e| StreamError::StorageError(e.to_string()))?
698 .is_none()
699 {
700 return Err(StreamError::SessionNotFound(session_id));
701 }
702
703 debug!("Creating POST SSE stream for session: {}", session_id);
704
705 let response_json = serde_json::to_string(&response).map_err(|e| {
707 StreamError::StorageError(format!("Failed to serialize response: {}", e))
708 })?;
709
710 let mut sse_frames = Vec::new();
714 let mut event_id_counter = 1;
715
716 if let Ok(events) = self.storage.get_recent_events(&session_id, 10).await {
717 for event in events {
718 if event.event_type != "ping" {
720 let notification_sse = format!(
724 "id: {}\nevent: message\ndata: {}\n\n",
725 event_id_counter, event.data
726 );
727 debug!(
728 "๐ค Including notification in POST SSE stream: id={}, json_rpc_method={}",
729 event_id_counter, event.event_type
730 );
731 sse_frames.push(http_body::Frame::data(Bytes::from(notification_sse)));
732 event_id_counter += 1;
733 }
734 }
735 }
736
737 let response_sse = format!(
740 "id: {}\nevent: message\ndata: {}\n\n",
741 event_id_counter, response_json
742 );
743 debug!(
744 "๐ค Sending JSON-RPC response as SSE event: id={}, event=message",
745 event_id_counter
746 );
747 sse_frames.push(http_body::Frame::data(Bytes::from(response_sse)));
748
749 let stream = futures::stream::iter(
751 sse_frames
752 .into_iter()
753 .map(Ok::<_, std::convert::Infallible>),
754 );
755
756 let body = StreamBody::new(stream);
758 let boxed_body = http_body_util::combinators::BoxBody::new(body);
759
760 debug!(
761 "๐ก POST SSE streaming response created: session={}",
762 session_id
763 );
764
765 Ok(hyper::Response::builder()
767 .status(hyper::StatusCode::OK)
768 .header(hyper::header::CONTENT_TYPE, "text/event-stream")
769 .header(hyper::header::CACHE_CONTROL, "no-cache")
770 .header(
771 hyper::header::ACCESS_CONTROL_ALLOW_ORIGIN,
772 &self.config.cors_origin,
773 )
774 .header("Connection", "keep-alive")
775 .header("X-Accel-Buffering", "no") .header("Mcp-Session-Id", &session_id)
777 .body(boxed_body)
778 .unwrap())
779 }
780
781 pub async fn subscribe_to_notifications(
783 &self,
784 session_id: &str,
785 notification_types: Vec<String>,
786 ) {
787 let mut subscriptions = self.subscriptions.write().await;
788 let session_subscriptions = subscriptions
789 .entry(session_id.to_string())
790 .or_insert_with(HashSet::new);
791
792 for notification_type in notification_types {
793 session_subscriptions.insert(notification_type.clone());
794 debug!(
795 "๐ Session {} subscribed to notification: {}",
796 session_id, notification_type
797 );
798 }
799
800 debug!(
801 "Session {} now has {} subscriptions",
802 session_id,
803 session_subscriptions.len()
804 );
805 }
806
807 pub async fn unsubscribe_from_notifications(
809 &self,
810 session_id: &str,
811 notification_types: Vec<String>,
812 ) {
813 let mut subscriptions = self.subscriptions.write().await;
814 if let Some(session_subscriptions) = subscriptions.get_mut(session_id) {
815 for notification_type in notification_types {
816 if session_subscriptions.remove(¬ification_type) {
817 debug!(
818 "๐ Session {} unsubscribed from notification: {}",
819 session_id, notification_type
820 );
821 }
822 }
823
824 if session_subscriptions.is_empty() {
826 subscriptions.remove(session_id);
827 debug!(
828 "๐๏ธ Removed subscription entry for session {} (no remaining subscriptions)",
829 session_id
830 );
831 }
832 }
833 }
834
835 pub async fn is_subscribed(&self, session_id: &str, notification_type: &str) -> bool {
837 let subscriptions = self.subscriptions.read().await;
838 subscriptions
839 .get(session_id)
840 .map(|session_subscriptions| session_subscriptions.contains(notification_type))
841 .unwrap_or(true) }
843
844 pub async fn get_subscriptions(&self, session_id: &str) -> HashSet<String> {
846 let subscriptions = self.subscriptions.read().await;
847 subscriptions.get(session_id).cloned().unwrap_or_default()
848 }
849
850 pub async fn clear_subscriptions(&self, session_id: &str) {
852 let mut subscriptions = self.subscriptions.write().await;
853 if subscriptions.remove(session_id).is_some() {
854 debug!("๐๏ธ Cleared all subscriptions for session: {}", session_id);
855 }
856 }
857
858 pub fn get_config(&self) -> &StreamConfig {
860 &self.config
861 }
862
863 pub async fn get_stats(&self) -> StreamStats {
865 let connections = self.connections.read().await;
866 let session_count = self.storage.session_count().await.unwrap_or(0);
867 let event_count = self.storage.event_count().await.unwrap_or(0);
868
869 let total_connections: usize = connections
871 .values()
872 .map(|session_connections| session_connections.len())
873 .sum();
874
875 StreamStats {
876 active_broadcasters: total_connections, total_sessions: session_count,
878 total_events: event_count,
879 channel_buffer_size: self.config.channel_buffer_size,
880 }
881 }
882}
883
884impl Drop for StreamManager {
885 fn drop(&mut self) {
886 debug!(
887 "DROP: StreamManager instance {} - this may cause connection loss!",
888 self.instance_id
889 );
890 debug!("If this appears during request processing, it indicates architecture problem");
891 }
892}
893
/// Snapshot of stream manager counters, as produced by
/// `StreamManager::get_stats`.
#[derive(Debug, Clone)]
pub struct StreamStats {
    /// Total number of registered connections across all sessions.
    pub active_broadcasters: usize,
    /// Number of sessions reported by storage (0 if the query failed).
    pub total_sessions: usize,
    /// Number of events reported by storage (0 if the query failed).
    pub total_events: usize,
    /// Configured per-connection channel capacity.
    pub channel_buffer_size: usize,
}
902
903#[cfg(not(test))]
905use async_stream;
906
#[cfg(test)]
mod tests {
    use super::*;
    use turul_mcp_protocol::ServerCapabilities;
    use turul_mcp_session_storage::{InMemorySessionStorage, SessionStorage};

    /// A fresh manager over empty storage reports no connections or sessions.
    #[tokio::test]
    async fn test_stream_manager_creation() {
        let storage = Arc::new(InMemorySessionStorage::new());
        let manager = StreamManager::new(storage);

        let StreamStats {
            active_broadcasters,
            total_sessions,
            ..
        } = manager.get_stats().await;

        assert_eq!(active_broadcasters, 0);
        assert_eq!(total_sessions, 0);
    }

    /// Broadcasting to a session without connections still stores the event
    /// and returns its id.
    #[tokio::test]
    async fn test_broadcast_to_session() {
        let storage = Arc::new(InMemorySessionStorage::new());
        let manager = StreamManager::new(storage.clone());

        let session_id = storage
            .create_session(ServerCapabilities::default())
            .await
            .unwrap()
            .session_id
            .clone();

        let payload = serde_json::json!({"message": "test"});
        let event_id = manager
            .broadcast_to_session(&session_id, "test".to_string(), payload)
            .await
            .unwrap();

        assert!(event_id > 0);

        let stored = storage.get_events_after(&session_id, 0).await.unwrap();
        assert_eq!(stored.len(), 1);
        assert_eq!(stored[0].id, event_id);
    }
}