pub struct InMemoryEventStore { /* private fields */ }
Implementations§
Source§impl InMemoryEventStore
In-memory implementation of the EventStore
trait for MCP’s Streamable HTTP transport.
impl InMemoryEventStore
In-memory implementation of the EventStore
trait for MCP’s Streamable HTTP transport.
Stores events in a HashMap
of session IDs to VecDeque
s of events, with a per-session limit.
Events are identified by event_id
(format: session-.-stream-.-timestamp
) and used for SSE resumption.
Thread-safe via RwLock
for concurrent access.
Sourcepub fn new(max_events_per_session: Option<usize>) -> InMemoryEventStore
pub fn new(max_events_per_session: Option<usize>) -> InMemoryEventStore
Creates a new InMemoryEventStore
with an optional maximum events per session.
§Arguments
max_events_per_session
: Maximum number of events per session. Defaults toMAX_EVENTS_PER_SESSION
(32) ifNone
.
§Returns
A new InMemoryEventStore
instance with an empty HashMap
wrapped in a RwLock
.
§Example
let store = InMemoryEventStore::new(Some(10));
assert_eq!(store.max_events_per_session, 10);
Sourcepub fn parse_event_id<'a>(
&self,
event_id: &'a str,
) -> Option<(&'a str, &'a str, &'a str)>
pub fn parse_event_id<'a>( &self, event_id: &'a str, ) -> Option<(&'a str, &'a str, &'a str)>
Parses an event ID into its session, stream, and timestamp components.
The event ID must follow the format session-.-stream-.-timestamp
.
Returns None
if the format is invalid, empty, or contains invalid characters (e.g., NULL).
§Arguments
event_id
: The event ID string to parse.
§Returns
An Option
containing a tuple of (session_id, stream_id, time_stamp)
as string slices,
or None
if the format is invalid.
§Example
let store = InMemoryEventStore::new(None);
let event_id = "session1-.-stream1-.-12345";
assert_eq!(
store.parse_event_id(event_id),
Some(("session1", "stream1", "12345"))
);
assert_eq!(store.parse_event_id("invalid"), None);
Trait Implementations§
Source§impl Debug for InMemoryEventStore
impl Debug for InMemoryEventStore
Source§impl Default for InMemoryEventStore
impl Default for InMemoryEventStore
Source§fn default() -> InMemoryEventStore
fn default() -> InMemoryEventStore
Source§impl EventStore for InMemoryEventStore
impl EventStore for InMemoryEventStore
Source§fn store_event<'life0, 'async_trait>(
&'life0 self,
session_id: String,
stream_id: String,
time_stamp: u128,
message: String,
) -> Pin<Box<dyn Future<Output = String> + Send + 'async_trait>>where
'life0: 'async_trait,
InMemoryEventStore: 'async_trait,
fn store_event<'life0, 'async_trait>(
&'life0 self,
session_id: String,
stream_id: String,
time_stamp: u128,
message: String,
) -> Pin<Box<dyn Future<Output = String> + Send + 'async_trait>>where
'life0: 'async_trait,
InMemoryEventStore: 'async_trait,
Stores an event for a given session and stream, returning its event_id
.
Adds the event to the session’s VecDeque
, removing the oldest event if the session
reaches max_events_per_session
.
§Arguments
session_id
: The session identifier.stream_id
: The stream identifier.time_stamp
: The event timestamp (u128).message
: TheServerMessages
payload.
§Returns
The generated EventId
for the stored event.
Source§fn remove_stream_in_session<'life0, 'async_trait>(
&'life0 self,
session_id: String,
stream_id: String,
) -> Pin<Box<dyn Future<Output = ()> + Send + 'async_trait>>where
'life0: 'async_trait,
InMemoryEventStore: 'async_trait,
fn remove_stream_in_session<'life0, 'async_trait>(
&'life0 self,
session_id: String,
stream_id: String,
) -> Pin<Box<dyn Future<Output = ()> + Send + 'async_trait>>where
'life0: 'async_trait,
InMemoryEventStore: 'async_trait,
Removes all events associated with a given stream ID within a specific session.
Removes events matching stream_id
from the specified session_id
’s event queue.
If the session’s queue becomes empty, it is removed from the store.
Idempotent if session_id
or stream_id
doesn’t exist.
§Arguments
session_id
: The session identifier to target.stream_id
: The stream identifier to remove.
Source§fn remove_by_session_id<'life0, 'async_trait>(
&'life0 self,
session_id: String,
) -> Pin<Box<dyn Future<Output = ()> + Send + 'async_trait>>where
'life0: 'async_trait,
InMemoryEventStore: 'async_trait,
fn remove_by_session_id<'life0, 'async_trait>(
&'life0 self,
session_id: String,
) -> Pin<Box<dyn Future<Output = ()> + Send + 'async_trait>>where
'life0: 'async_trait,
InMemoryEventStore: 'async_trait,
Removes all events associated with a given session ID.
Removes the entire session from the store. Idempotent if session_id
doesn’t exist.
§Arguments
session_id
: The session identifier to remove.
Source§fn events_after<'life0, 'async_trait>(
&'life0 self,
last_event_id: String,
) -> Pin<Box<dyn Future<Output = Option<EventStoreMessages>> + Send + 'async_trait>>where
'life0: 'async_trait,
InMemoryEventStore: 'async_trait,
fn events_after<'life0, 'async_trait>(
&'life0 self,
last_event_id: String,
) -> Pin<Box<dyn Future<Output = Option<EventStoreMessages>> + Send + 'async_trait>>where
'life0: 'async_trait,
InMemoryEventStore: 'async_trait,
Retrieves events after a given event_id
for a specific session and stream.
Parses last_event_id
to extract session_id
, stream_id
, and time_stamp
.
Returns events after the matching event in the session’s stream, sorted by timestamp
in ascending order (earliest to latest). Returns None
if the event_id
is invalid,
the session doesn’t exist, or the timestamp is non-numeric.
§Arguments
last_event_id
: The event ID (format:session-.-stream-.-timestamp
) to start after.
§Returns
An Option
containing EventStoreMessages
with the session ID, stream ID, and sorted messages,
or None
if no events are found or the input is invalid.