rust_mcp_transport/
event_store.rs

1mod in_memory_event_store;
2
3use crate::{EventId, SessionId, StreamId};
4use async_trait::async_trait;
5pub use in_memory_event_store::*;
6use thiserror::Error;
7
8#[derive(Debug, Clone)]
9pub struct EventStoreEntry {
10    pub session_id: SessionId,
11    pub stream_id: StreamId,
12    pub messages: Vec<String>,
13}
14
15#[derive(Debug, Error)]
16#[error("{message}")]
17pub struct EventStoreError {
18    pub message: String,
19}
20
21impl From<&str> for EventStoreError {
22    fn from(s: &str) -> Self {
23        EventStoreError {
24            message: s.to_string(),
25        }
26    }
27}
28
29impl From<String> for EventStoreError {
30    fn from(s: String) -> Self {
31        EventStoreError { message: s }
32    }
33}
34
35type EventStoreResult<T> = Result<T, EventStoreError>;
36
37/// Trait defining the interface for event storage and retrieval, used by the MCP server
38/// to store and replay events for state reconstruction after client reconnection
39#[async_trait]
40pub trait EventStore: Send + Sync {
41    /// Stores a new event in the store and returns the generated event ID.
42    /// For MCP, this stores protocol messages, timestamp is the number of microseconds since UNIX_EPOCH.
43    /// The timestamp helps determine the order in which messages arrived.
44    ///
45    /// # Parameters
46    /// - `session_id`: The session identifier for the event.
47    /// - `stream_id`: The stream identifier within the session.
48    /// - `timestamp`: The u128 timestamp of the event.
49    /// - `message`: The event payload as json string.
50    ///
51    /// # Returns
52    /// - `Ok(EventId)`: The generated ID (format: session_id:stream_id:timestamp) on success.
53    /// - `Err(Self::Error)`: If input is invalid or storage fails.
54    async fn store_event(
55        &self,
56        session_id: SessionId,
57        stream_id: StreamId,
58        timestamp: u128,
59        message: String,
60    ) -> EventStoreResult<EventId>;
61
62    /// Removes all events associated with a given session ID.
63    /// Used to clean up all events for a session when it is no longer needed (e.g., session ended).
64    ///
65    /// # Parameters
66    /// - `session_id`: The session ID whose events should be removed.
67    ///
68    async fn remove_by_session_id(&self, session_id: SessionId) -> EventStoreResult<()>;
69    /// Removes all events for a specific stream within a session.
70    /// Useful for cleaning up a specific stream without affecting others.
71    ///
72    /// # Parameters
73    /// - `session_id`: The session ID containing the stream.
74    /// - `stream_id`: The stream ID whose events should be removed.
75    ///
76    /// # Returns
77    /// - `Ok(())`: On successful deletion.
78    /// - `Err(Self::Error)`: If deletion fails.
79    async fn remove_stream_in_session(
80        &self,
81        session_id: SessionId,
82        stream_id: StreamId,
83    ) -> EventStoreResult<()>;
84    /// Clears all events from the store.
85    /// Used for resetting the store.
86    ///
87    async fn clear(&self) -> EventStoreResult<()>;
88    /// Retrieves events after a given event ID for a session and stream.
89    /// Critical for MCP server to replay events after a client reconnects, starting from the last known event.
90    /// Events are returned in chronological order (ascending timestamp) to reconstruct state.
91    ///
92    /// # Parameters
93    /// - `last_event_id`: The event ID to fetch events after.
94    ///
95    /// # Returns
96    /// - `Some(Some(EventStoreEntry))`: Events after the specified ID, if any.
97    /// - `None`: If no events exist after it OR the event ID is invalid.
98    async fn events_after(
99        &self,
100        last_event_id: EventId,
101    ) -> EventStoreResult<Option<EventStoreEntry>>;
102    /// Prunes excess events to control storage usage.
103    /// Implementations may apply custom logic, such as limiting
104    /// the number of events per session or removing events older than a certain timestamp.
105    /// Default implementation logs a warning if not overridden by the store.
106    ///
107    /// # Parameters
108    /// - `session_id`: Optional session ID to prune a specific session; if None, prunes all sessions.
109    async fn prune_excess_events(&self, _session_id: Option<SessionId>) -> EventStoreResult<()> {
110        tracing::warn!("prune_excess_events() is not implemented for the event store.");
111        Ok(())
112    }
113    /// Counts the total number of events in the store.
114    ///
115    /// # Returns
116    /// - The number of events across all sessions and streams.
117    async fn count(&self) -> EventStoreResult<usize>;
118}