InMemoryEventStore

Struct InMemoryEventStore 

pub struct InMemoryEventStore { /* private fields */ }

Implementations§

impl InMemoryEventStore

In-memory implementation of the EventStore trait for MCP’s Streamable HTTP transport.

Stores events in a HashMap that maps each session ID to a VecDeque of events, capped at a per-session limit. Each event is identified by an event_id (format: session-.-stream-.-timestamp), which is used for SSE resumption. An RwLock around the map makes the store safe for concurrent access.
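
The layout described above can be modeled with standard-library types. This is only a sketch: the real fields are private, so `SketchStore`, `SketchEvent`, and `session_len` are hypothetical names, and using `std::sync::RwLock` (rather than an async lock) is an assumption.

```rust
use std::collections::{HashMap, VecDeque};
use std::sync::RwLock;

/// Hypothetical stand-in for a stored event; the real entry type is private.
#[derive(Debug, Clone, PartialEq)]
struct SketchEvent {
    stream_id: String,
    time_stamp: u128,
    message: String,
}

/// Hypothetical model of the store: session ID -> bounded queue of events.
struct SketchStore {
    max_events_per_session: usize,
    sessions: RwLock<HashMap<String, VecDeque<SketchEvent>>>,
}

impl SketchStore {
    fn new(max_events_per_session: Option<usize>) -> Self {
        Self {
            // 32 mirrors the documented MAX_EVENTS_PER_SESSION default.
            max_events_per_session: max_events_per_session.unwrap_or(32),
            sessions: RwLock::new(HashMap::new()),
        }
    }

    /// Number of events currently held for a session (0 if the session is unknown).
    fn session_len(&self, session_id: &str) -> usize {
        // Readers share the lock; writers would take `write()` instead.
        let sessions = self.sessions.read().unwrap();
        sessions.get(session_id).map_or(0, |queue| queue.len())
    }
}
```

A read lock lets many readers inspect the map at once, while a mutation such as storing or removing events would take the write lock.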

pub fn new(max_events_per_session: Option<usize>) -> InMemoryEventStore

Creates a new InMemoryEventStore with an optional maximum events per session.

§Arguments
  • max_events_per_session: Maximum number of events per session. Defaults to MAX_EVENTS_PER_SESSION (32) if None.
§Returns

A new InMemoryEventStore instance with an empty HashMap wrapped in a RwLock.

§Example
let store = InMemoryEventStore::new(Some(10));
assert_eq!(store.max_events_per_session, 10);

pub fn parse_event_id<'a>( &self, event_id: &'a str, ) -> Option<(&'a str, &'a str, &'a str)>

Parses an event ID into its session, stream, and timestamp components.

The event ID must follow the format session-.-stream-.-timestamp. Returns None if the event ID is empty, does not match this format, or contains invalid characters (e.g., a NUL character).

§Arguments
  • event_id: The event ID string to parse.
§Returns

An Option containing a tuple of (session_id, stream_id, time_stamp) as string slices, or None if the format is invalid.

§Example
let store = InMemoryEventStore::new(None);
let event_id = "session1-.-stream1-.-12345";
assert_eq!(
    store.parse_event_id(event_id),
    Some(("session1", "stream1", "12345"))
);
assert_eq!(store.parse_event_id("invalid"), None);
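
For illustration, the documented format can be parsed with a plain split on the `-.-` separator. The function below is a standalone sketch, not the crate's implementation: `parse_event_id_sketch` and `SEPARATOR` are hypothetical names, and requiring exactly three non-empty components is an assumption about how strict the real parser is.

```rust
/// Separator used by the documented format `session-.-stream-.-timestamp`.
const SEPARATOR: &str = "-.-";

/// Splits an event ID into (session_id, stream_id, time_stamp) slices.
fn parse_event_id_sketch(event_id: &str) -> Option<(&str, &str, &str)> {
    // Reject empty input and NUL characters up front.
    if event_id.is_empty() || event_id.contains('\0') {
        return None;
    }

    let mut parts = event_id.split(SEPARATOR);
    let session_id = parts.next()?;
    let stream_id = parts.next()?;
    let time_stamp = parts.next()?;

    // Assumption: exactly three non-empty components are required.
    if parts.next().is_some()
        || session_id.is_empty()
        || stream_id.is_empty()
        || time_stamp.is_empty()
    {
        return None;
    }

    Some((session_id, stream_id, time_stamp))
}
```

Because the returned slices borrow from the input, no allocation is needed, which matches the `&'a str` lifetimes in the signature above.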

Trait Implementations§

impl Debug for InMemoryEventStore

fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), Error>

Formats the value using the given formatter.

impl Default for InMemoryEventStore

fn default() -> InMemoryEventStore

Returns the “default value” for a type.

impl EventStore for InMemoryEventStore

fn store_event<'life0, 'async_trait>( &'life0 self, session_id: String, stream_id: String, time_stamp: u128, message: String, ) -> Pin<Box<dyn Future<Output = String> + Send + 'async_trait>>
where 'life0: 'async_trait, InMemoryEventStore: 'async_trait,

Stores an event for a given session and stream, returning its event_id.

Appends the event to the session’s VecDeque, evicting the oldest event first if the session has already reached max_events_per_session.

§Arguments
  • session_id: The session identifier.
  • stream_id: The stream identifier.
  • time_stamp: The event timestamp (u128).
  • message: The ServerMessages payload.
§Returns

The generated EventId for the stored event.
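
The eviction rule and the ID format can be sketched in isolation. The helpers below are hypothetical (`make_event_id` and `push_bounded` are not part of the crate) and synchronous, whereas the real method is async and operates on the locked map.

```rust
use std::collections::VecDeque;

/// Builds an ID in the documented `session-.-stream-.-timestamp` format.
fn make_event_id(session_id: &str, stream_id: &str, time_stamp: u128) -> String {
    format!("{session_id}-.-{stream_id}-.-{time_stamp}")
}

/// Pushes onto a bounded queue, dropping the oldest entry when `max` is reached.
fn push_bounded<T>(queue: &mut VecDeque<T>, item: T, max: usize) {
    if queue.len() >= max {
        // The front of the queue holds the oldest event.
        queue.pop_front();
    }
    queue.push_back(item);
}
```

A VecDeque suits this pattern because removal at the front and insertion at the back are both constant-time.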

fn remove_stream_in_session<'life0, 'async_trait>( &'life0 self, session_id: String, stream_id: String, ) -> Pin<Box<dyn Future<Output = ()> + Send + 'async_trait>>
where 'life0: 'async_trait, InMemoryEventStore: 'async_trait,

Removes all events associated with a given stream ID within a specific session.

Removes events matching stream_id from the specified session_id’s event queue. If the session’s queue becomes empty, the session itself is removed from the store. The call is a no-op if session_id or stream_id doesn’t exist, so repeating it is safe.

§Arguments
  • session_id: The session identifier to target.
  • stream_id: The stream identifier to remove.
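
The removal behavior described for this method can be sketched on a plain map. This is an illustrative model only: `SketchQueue` and `remove_stream` are hypothetical names, and events are reduced to `(stream_id, message)` pairs.

```rust
use std::collections::{HashMap, VecDeque};

/// Hypothetical event representation: (stream_id, message).
type SketchQueue = VecDeque<(String, String)>;

/// Drops every event of `stream_id` in `session_id`, then drops the session if it is empty.
fn remove_stream(
    sessions: &mut HashMap<String, SketchQueue>,
    session_id: &str,
    stream_id: &str,
) {
    let now_empty = match sessions.get_mut(session_id) {
        Some(queue) => {
            // Keep only events that belong to other streams.
            queue.retain(|event| event.0.as_str() != stream_id);
            queue.is_empty()
        }
        // Unknown session: nothing to do.
        None => false,
    };

    if now_empty {
        sessions.remove(session_id);
    }
}
```

Calling it twice with the same arguments leaves the map unchanged the second time, which is the no-op behavior noted above.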

fn remove_by_session_id<'life0, 'async_trait>( &'life0 self, session_id: String, ) -> Pin<Box<dyn Future<Output = ()> + Send + 'async_trait>>
where 'life0: 'async_trait, InMemoryEventStore: 'async_trait,

Removes all events associated with a given session ID.

Removes the entire session from the store. The call is a no-op if session_id doesn’t exist, so repeating it is safe.

§Arguments
  • session_id: The session identifier to remove.

fn events_after<'life0, 'async_trait>( &'life0 self, last_event_id: String, ) -> Pin<Box<dyn Future<Output = Option<EventStoreMessages>> + Send + 'async_trait>>
where 'life0: 'async_trait, InMemoryEventStore: 'async_trait,

Retrieves events after a given event_id for a specific session and stream.

Parses last_event_id to extract session_id, stream_id, and time_stamp. Returns events after the matching event in the session’s stream, sorted by timestamp in ascending order (earliest to latest). Returns None if the event_id is invalid, the session doesn’t exist, or the timestamp is non-numeric.

§Arguments
  • last_event_id: The event ID (format: session-.-stream-.-timestamp) to start after.
§Returns

An Option containing EventStoreMessages with the session ID, stream ID, and sorted messages, or None if no events are found or the input is invalid.
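
The replay step can be sketched as a filter followed by a sort. The names here (`SketchEvent`, `events_after_sketch`) are hypothetical, and selecting events by a strictly greater timestamp is an assumption about how "after the matching event" is decided. The sketch returns bare messages rather than EventStoreMessages and does not model whether an empty result maps to None.

```rust
/// Hypothetical stored event; the real entry type is private.
#[derive(Debug, Clone)]
struct SketchEvent {
    stream_id: String,
    time_stamp: u128,
    message: String,
}

/// Returns the messages of `stream_id` newer than `last_time_stamp`, oldest first.
fn events_after_sketch(
    events: &[SketchEvent],
    stream_id: &str,
    last_time_stamp: &str,
) -> Option<Vec<String>> {
    // A non-numeric timestamp makes the whole lookup fail.
    let last: u128 = last_time_stamp.parse().ok()?;

    let mut newer: Vec<&SketchEvent> = events
        .iter()
        .filter(|e| e.stream_id == stream_id && e.time_stamp > last)
        .collect();

    // Ascending order: earliest to latest.
    newer.sort_by_key(|e| e.time_stamp);

    Some(newer.into_iter().map(|e| e.message.clone()).collect())
}
```

In the real method the session, stream, and timestamp inputs would come from parsing last_event_id rather than being passed separately.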

fn clear<'life0, 'async_trait>( &'life0 self, ) -> Pin<Box<dyn Future<Output = ()> + Send + 'async_trait>>
where 'life0: 'async_trait, InMemoryEventStore: 'async_trait,

Auto Trait Implementations§

Blanket Implementations§

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T> Instrument for T

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper.

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T> PolicyExt for T
where T: ?Sized,

fn and<P, B, E>(self, other: P) -> And<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow only if self and other return Action::Follow.

fn or<P, B, E>(self, other: P) -> Or<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow if either self or other returns Action::Follow.

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.

impl<T> WithSubscriber for T

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.

impl<T> ErasedDestructor for T
where T: 'static,