rust_mcp_transport/event_store.rs
1mod in_memory_event_store;
2
3use crate::{EventId, SessionId, StreamId};
4use async_trait::async_trait;
5pub use in_memory_event_store::*;
6use thiserror::Error;
7
8#[derive(Debug, Clone)]
9pub struct EventStoreEntry {
10 pub session_id: SessionId,
11 pub stream_id: StreamId,
12 pub messages: Vec<String>,
13}
14
15#[derive(Debug, Error)]
16#[error("{message}")]
17pub struct EventStoreError {
18 pub message: String,
19}
20
21impl From<&str> for EventStoreError {
22 fn from(s: &str) -> Self {
23 EventStoreError {
24 message: s.to_string(),
25 }
26 }
27}
28
29impl From<String> for EventStoreError {
30 fn from(s: String) -> Self {
31 EventStoreError { message: s }
32 }
33}
34
35type EventStoreResult<T> = Result<T, EventStoreError>;
36
37/// Trait defining the interface for event storage and retrieval, used by the MCP server
38/// to store and replay events for state reconstruction after client reconnection
39#[async_trait]
40pub trait EventStore: Send + Sync {
41 /// Stores a new event in the store and returns the generated event ID.
42 /// For MCP, this stores protocol messages, timestamp is the number of microseconds since UNIX_EPOCH.
43 /// The timestamp helps determine the order in which messages arrived.
44 ///
45 /// # Parameters
46 /// - `session_id`: The session identifier for the event.
47 /// - `stream_id`: The stream identifier within the session.
48 /// - `timestamp`: The u128 timestamp of the event.
49 /// - `message`: The event payload as json string.
50 ///
51 /// # Returns
52 /// - `Ok(EventId)`: The generated ID (format: session_id:stream_id:timestamp) on success.
53 /// - `Err(Self::Error)`: If input is invalid or storage fails.
54 async fn store_event(
55 &self,
56 session_id: SessionId,
57 stream_id: StreamId,
58 timestamp: u128,
59 message: String,
60 ) -> EventStoreResult<EventId>;
61
62 /// Removes all events associated with a given session ID.
63 /// Used to clean up all events for a session when it is no longer needed (e.g., session ended).
64 ///
65 /// # Parameters
66 /// - `session_id`: The session ID whose events should be removed.
67 ///
68 async fn remove_by_session_id(&self, session_id: SessionId) -> EventStoreResult<()>;
69 /// Removes all events for a specific stream within a session.
70 /// Useful for cleaning up a specific stream without affecting others.
71 ///
72 /// # Parameters
73 /// - `session_id`: The session ID containing the stream.
74 /// - `stream_id`: The stream ID whose events should be removed.
75 ///
76 /// # Returns
77 /// - `Ok(())`: On successful deletion.
78 /// - `Err(Self::Error)`: If deletion fails.
79 async fn remove_stream_in_session(
80 &self,
81 session_id: SessionId,
82 stream_id: StreamId,
83 ) -> EventStoreResult<()>;
84 /// Clears all events from the store.
85 /// Used for resetting the store.
86 ///
87 async fn clear(&self) -> EventStoreResult<()>;
88 /// Retrieves events after a given event ID for a session and stream.
89 /// Critical for MCP server to replay events after a client reconnects, starting from the last known event.
90 /// Events are returned in chronological order (ascending timestamp) to reconstruct state.
91 ///
92 /// # Parameters
93 /// - `last_event_id`: The event ID to fetch events after.
94 ///
95 /// # Returns
96 /// - `Some(Some(EventStoreEntry))`: Events after the specified ID, if any.
97 /// - `None`: If no events exist after it OR the event ID is invalid.
98 async fn events_after(
99 &self,
100 last_event_id: EventId,
101 ) -> EventStoreResult<Option<EventStoreEntry>>;
102 /// Prunes excess events to control storage usage.
103 /// Implementations may apply custom logic, such as limiting
104 /// the number of events per session or removing events older than a certain timestamp.
105 /// Default implementation logs a warning if not overridden by the store.
106 ///
107 /// # Parameters
108 /// - `session_id`: Optional session ID to prune a specific session; if None, prunes all sessions.
109 async fn prune_excess_events(&self, _session_id: Option<SessionId>) -> EventStoreResult<()> {
110 tracing::warn!("prune_excess_events() is not implemented for the event store.");
111 Ok(())
112 }
113 /// Counts the total number of events in the store.
114 ///
115 /// # Returns
116 /// - The number of events across all sessions and streams.
117 async fn count(&self) -> EventStoreResult<usize>;
118}