EventStore

Trait EventStore 

Source
pub trait EventStore: Send + Sync {
    // Required methods
    fn store_event<'life0, 'async_trait>(
        &'life0 self,
        session_id: String,
        stream_id: String,
        timestamp: u128,
        message: String,
    ) -> Pin<Box<dyn Future<Output = Result<String, EventStoreError>> + Send + 'async_trait>>
       where 'life0: 'async_trait,
             Self: 'async_trait;
    fn remove_by_session_id<'life0, 'async_trait>(
        &'life0 self,
        session_id: String,
    ) -> Pin<Box<dyn Future<Output = Result<(), EventStoreError>> + Send + 'async_trait>>
       where 'life0: 'async_trait,
             Self: 'async_trait;
    fn remove_stream_in_session<'life0, 'async_trait>(
        &'life0 self,
        session_id: String,
        stream_id: String,
    ) -> Pin<Box<dyn Future<Output = Result<(), EventStoreError>> + Send + 'async_trait>>
       where 'life0: 'async_trait,
             Self: 'async_trait;
    fn clear<'life0, 'async_trait>(
        &'life0 self,
    ) -> Pin<Box<dyn Future<Output = Result<(), EventStoreError>> + Send + 'async_trait>>
       where 'life0: 'async_trait,
             Self: 'async_trait;
    fn events_after<'life0, 'async_trait>(
        &'life0 self,
        last_event_id: String,
    ) -> Pin<Box<dyn Future<Output = Result<Option<EventStoreEntry>, EventStoreError>> + Send + 'async_trait>>
       where 'life0: 'async_trait,
             Self: 'async_trait;
    fn count<'life0, 'async_trait>(
        &'life0 self,
    ) -> Pin<Box<dyn Future<Output = Result<usize, EventStoreError>> + Send + 'async_trait>>
       where 'life0: 'async_trait,
             Self: 'async_trait;

    // Provided method
    fn prune_excess_events<'life0, 'async_trait>(
        &'life0 self,
        _session_id: Option<String>,
    ) -> Pin<Box<dyn Future<Output = Result<(), EventStoreError>> + Send + 'async_trait>>
       where 'life0: 'async_trait,
             Self: 'async_trait { ... }
}
Expand description

Trait defining the interface for event storage and retrieval, used by the MCP server to store and replay events for state reconstruction after client reconnection

Required Methods§

Source

fn store_event<'life0, 'async_trait>( &'life0 self, session_id: String, stream_id: String, timestamp: u128, message: String, ) -> Pin<Box<dyn Future<Output = Result<String, EventStoreError>> + Send + 'async_trait>>
where 'life0: 'async_trait, Self: 'async_trait,

Stores a new event in the store and returns the generated event ID. For MCP, this stores protocol messages, timestamp is the number of microseconds since UNIX_EPOCH. The timestamp helps determine the order in which messages arrived.

§Parameters
  • session_id: The session identifier for the event.
  • stream_id: The stream identifier within the session.
  • timestamp: The u128 timestamp of the event.
  • message: The event payload as json string.
§Returns
  • Ok(EventId): The generated ID (format: session_id:stream_id:timestamp) on success.
  • Err(Self::Error): If input is invalid or storage fails.
Source

fn remove_by_session_id<'life0, 'async_trait>( &'life0 self, session_id: String, ) -> Pin<Box<dyn Future<Output = Result<(), EventStoreError>> + Send + 'async_trait>>
where 'life0: 'async_trait, Self: 'async_trait,

Removes all events associated with a given session ID. Used to clean up all events for a session when it is no longer needed (e.g., session ended).

§Parameters
  • session_id: The session ID whose events should be removed.
Source

fn remove_stream_in_session<'life0, 'async_trait>( &'life0 self, session_id: String, stream_id: String, ) -> Pin<Box<dyn Future<Output = Result<(), EventStoreError>> + Send + 'async_trait>>
where 'life0: 'async_trait, Self: 'async_trait,

Removes all events for a specific stream within a session. Useful for cleaning up a specific stream without affecting others.

§Parameters
  • session_id: The session ID containing the stream.
  • stream_id: The stream ID whose events should be removed.
§Returns
  • Ok(()): On successful deletion.
  • Err(Self::Error): If deletion fails.
Source

fn clear<'life0, 'async_trait>( &'life0 self, ) -> Pin<Box<dyn Future<Output = Result<(), EventStoreError>> + Send + 'async_trait>>
where 'life0: 'async_trait, Self: 'async_trait,

Clears all events from the store. Used for resetting the store.

Source

fn events_after<'life0, 'async_trait>( &'life0 self, last_event_id: String, ) -> Pin<Box<dyn Future<Output = Result<Option<EventStoreEntry>, EventStoreError>> + Send + 'async_trait>>
where 'life0: 'async_trait, Self: 'async_trait,

Retrieves events after a given event ID for a session and stream. Critical for MCP server to replay events after a client reconnects, starting from the last known event. Events are returned in chronological order (ascending timestamp) to reconstruct state.

§Parameters
  • last_event_id: The event ID to fetch events after.
§Returns
  • Some(Some(EventStoreEntry)): Events after the specified ID, if any.
  • None: If no events exist after it OR the event ID is invalid.
Source

fn count<'life0, 'async_trait>( &'life0 self, ) -> Pin<Box<dyn Future<Output = Result<usize, EventStoreError>> + Send + 'async_trait>>
where 'life0: 'async_trait, Self: 'async_trait,

Counts the total number of events in the store.

§Returns
  • The number of events across all sessions and streams.

Provided Methods§

Source

fn prune_excess_events<'life0, 'async_trait>( &'life0 self, _session_id: Option<String>, ) -> Pin<Box<dyn Future<Output = Result<(), EventStoreError>> + Send + 'async_trait>>
where 'life0: 'async_trait, Self: 'async_trait,

Prunes excess events to control storage usage. Implementations may apply custom logic, such as limiting the number of events per session or removing events older than a certain timestamp. Default implementation logs a warning if not overridden by the store.

§Parameters
  • session_id: Optional session ID to prune a specific session; if None, prunes all sessions.

Implementors§