pub struct SessionState {
pub tenant_id: TenantId,
pub principal: Option<AuthenticatedPrincipal>,
pub created_at_ms: i64,
pub last_accessed_at_ms: AtomicI64,
pub event_tx: Sender<McpStreamEvent>,
pub next_event_id: AtomicU64,
pub event_replay_buffer: Arc<Mutex<VecDeque<McpStreamEvent>>>,
}
One session’s state. v0.11.0 P2 grows this from P1’s minimal “tenant + timestamps” record by adding the broadcast event channel
and the monotonic event-id counter that the resumable GET stream rides on.
Not Clone — atomics + broadcast::Sender make Clone a
surprising contract. The store hands out Arc<SessionState> so
concurrent requests observe each other’s touch() calls + share
one event channel. Callers that want a snapshot of the timestamps
can read them via the public fields directly.
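As a minimal sketch of why sharing through Arc works here: the atomic timestamp can be refreshed through a shared reference, so every holder of the Arc observes the update. The `Timestamps` type and the signature of `touch` below are hypothetical stand-ins, not the crate's actual definitions.

```rust
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::Arc;

/// Hypothetical stand-in for the timestamp fields of `SessionState`.
pub struct Timestamps {
    pub created_at_ms: i64,
    pub last_accessed_at_ms: AtomicI64,
}

impl Timestamps {
    pub fn new(now_ms: i64) -> Self {
        Self {
            created_at_ms: now_ms,
            last_accessed_at_ms: AtomicI64::new(now_ms),
        }
    }

    /// Assumed shape of `touch()`: it takes `&self`, so it works through an `Arc`
    /// without any outer lock or re-insertion into the store.
    pub fn touch(&self, now_ms: i64) {
        self.last_accessed_at_ms.store(now_ms, Ordering::SeqCst);
    }
}

/// Two handles to one record: a touch through one is visible through the other.
pub fn shared_touch_demo() -> (i64, i64) {
    let a = Arc::new(Timestamps::new(1_000));
    let b = Arc::clone(&a);
    a.touch(2_500);
    (b.created_at_ms, b.last_accessed_at_ms.load(Ordering::SeqCst))
}
```

Cloning the Arc copies a pointer, not the record, which is why a derived Clone on the struct itself would be misleading: a cloned struct would carry its own independent atomics.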
Fields§
tenant_id: TenantId
Tenant the session is bound to. Set on session create from the extractor-resolved tenant; a future priority will refuse to reuse a session under a different tenant.
principal: Option<AuthenticatedPrincipal>
Authenticated principal at session create time. None for
unauthenticated loopback deployments (the daemon default).
A future cross-principal access check uses this to refuse a
session presented with a different bearer / OIDC subject.
created_at_ms: i64
Wall-clock millis at session create. Compared against
MCP_SESSION_ABSOLUTE_TTL_MS.
last_accessed_at_ms: AtomicI64
Wall-clock millis updated on every successful SessionStore::get.
Compared against MCP_SESSION_INACTIVITY_TTL_MS. Stored as
AtomicI64 so reads via Arc<SessionState> can refresh without
re-inserting into the DashMap shard.
event_tx: Sender<McpStreamEvent>
v0.11.0 P2: broadcast channel fed by publish_event. The GET
handler subscribes to this on connect; P3 (progress) and P4
(notifications/message) publish into it. Capacity bounded by
MCP_SESSION_EVENT_BUFFER_CAPACITY per Decision E.
Note: broadcast::channel does NOT backfill freshly-subscribed
receivers with previously-sent events. To support the
Last-Event-ID resume contract we also keep a ring buffer
(event_replay_buffer) which the GET handler reads on connect
before tailing this channel for live events.
next_event_id: AtomicU64
v0.11.0 P2: monotonic per-session event id counter. Allocated
via fetch_add(1, SeqCst) from publish_event; first event
has id 1 (id 0 is the “never seen” sentinel clients send on
the first Last-Event-ID header).
event_replay_buffer: Arc<Mutex<VecDeque<McpStreamEvent>>>
v0.11.0 P2: bounded ring buffer of recent events for
Last-Event-ID replay. Capacity matches the broadcast channel
(MCP_SESSION_EVENT_BUFFER_CAPACITY); oldest entry evicted
on insert past the cap. std::sync::Mutex rather than
tokio::sync::Mutex because the critical sections are tiny
(push one event / clone a Vec out) — no await inside the
lock. Wrapping in Arc<Mutex<...>> keeps SessionState cheap
to share across the broadcast subscribers + the publisher.
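The ring-buffer behaviour described for event_replay_buffer can be sketched with std types alone. This is an illustration under assumptions, not the crate's code: `Event`, `ReplayBuffer`, and the capacity of 4 are hypothetical (the real cap is MCP_SESSION_EVENT_BUFFER_CAPACITY).

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};

/// Hypothetical capacity, kept small so eviction is easy to see.
pub const CAPACITY: usize = 4;

/// Hypothetical stand-in for `McpStreamEvent`.
#[derive(Clone, Debug, PartialEq)]
pub struct Event {
    pub id: u64,
    pub data: String,
}

#[derive(Clone)]
pub struct ReplayBuffer {
    inner: Arc<Mutex<VecDeque<Event>>>,
}

impl ReplayBuffer {
    pub fn new() -> Self {
        Self {
            inner: Arc::new(Mutex::new(VecDeque::with_capacity(CAPACITY))),
        }
    }

    /// Push one event, evicting the oldest entry once the cap is reached.
    /// The guard is dropped at the end of the function; nothing awaits under it.
    pub fn push(&self, event: Event) {
        let mut buf = self.inner.lock().unwrap();
        if buf.len() == CAPACITY {
            buf.pop_front();
        }
        buf.push_back(event);
    }

    /// Clone the contents out so the lock is released before the caller does any I/O.
    pub fn snapshot(&self) -> Vec<Event> {
        self.inner.lock().unwrap().iter().cloned().collect()
    }
}

/// Push six events into a four-slot buffer and report the surviving ids.
pub fn fill_past_cap() -> Vec<u64> {
    let buf = ReplayBuffer::new();
    for id in 1..=6 {
        buf.push(Event { id, data: format!("event {id}") });
    }
    buf.snapshot().into_iter().map(|e| e.id).collect()
}
```

With a cap of 4, pushing ids 1 through 6 leaves 3, 4, 5, 6 in the buffer: the two oldest entries have been evicted.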
Implementations§
impl SessionState
pub fn new(tenant_id: TenantId, principal: Option<AuthenticatedPrincipal>) -> Self
Build a fresh session-state record. Used by SessionStore::insert
and the session-extractor path. Allocates a fresh broadcast
channel (capacity MCP_SESSION_EVENT_BUFFER_CAPACITY) for the
session’s SSE stream and a matching-capacity replay ring buffer.
pub fn publish_event(&self, kind: McpEventKind, data: Value) -> u64
Allocate the next event id, construct an McpStreamEvent,
and (a) push it onto the replay ring buffer + (b) broadcast it
to every live subscriber. Returns the assigned id so callers
(P3/P4) can correlate their write with the resulting stream
entry.
Lossy on the broadcast side by design: if there are no live
receivers (or every receiver has been dropped)
broadcast::Sender::send returns Err(SendError) — the event
is silently dropped from the live channel but STILL appended
to the replay buffer so a future subscriber’s Last-Event-ID
replay observes it.
Replay buffer is bounded at MCP_SESSION_EVENT_BUFFER_CAPACITY
entries; pushing past the cap evicts the oldest entry. This
matches the broadcast channel’s capacity so the two stay in
lock-step — a subscriber that subscribed before any events
were published and then lags past 256 events sees the same
“buffer overrun” semantics whether it observes them via the
broadcast lagged-error path or via a Last-Event-ID resume.
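The publish path described above (allocate an id, append to the replay buffer, then attempt a lossy live send) might look roughly like the following. Everything here is a hypothetical model: `Publisher` is not the crate's type, a `std::sync::mpsc` sender stands in for the broadcast sender because it also returns an error once the receiver is gone, and the counter is assumed to start at 1 so that the first `fetch_add` yields id 1.

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Mutex;

pub struct Publisher {
    next_event_id: AtomicU64,
    replay: Mutex<VecDeque<(u64, String)>>,
    live: Sender<(u64, String)>, // stand-in for the broadcast sender
    cap: usize,
}

impl Publisher {
    pub fn new(cap: usize) -> (Self, Receiver<(u64, String)>) {
        let (live, rx) = channel();
        let publisher = Self {
            next_event_id: AtomicU64::new(1), // assumption: counter starts at 1
            replay: Mutex::new(VecDeque::new()),
            live,
            cap,
        };
        (publisher, rx)
    }

    pub fn publish(&self, data: &str) -> u64 {
        // fetch_add returns the previous value, which becomes this event's id.
        let id = self.next_event_id.fetch_add(1, Ordering::SeqCst);
        let event = (id, data.to_string());
        {
            let mut buf = self.replay.lock().unwrap();
            if buf.len() == self.cap {
                buf.pop_front();
            }
            buf.push_back(event.clone());
        }
        // Lossy on the live side: a send error (no receiver) is ignored on purpose.
        let _ = self.live.send(event);
        id
    }

    pub fn replay_ids(&self) -> Vec<u64> {
        self.replay.lock().unwrap().iter().map(|(id, _)| *id).collect()
    }
}

/// Publish twice with no live receiver; both events still reach the replay buffer.
pub fn publish_without_subscribers() -> (u64, u64, Vec<u64>) {
    let (publisher, rx) = Publisher::new(8);
    drop(rx); // no live subscriber
    let first = publisher.publish("a");
    let second = publisher.publish("b");
    (first, second, publisher.replay_ids())
}
```

The sketch does not try to order concurrent publishers between the id allocation and the buffer push; whether the real implementation needs to is not stated in the documentation above.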
pub fn subscribe_events(&self) -> Receiver<McpStreamEvent>
Subscribe to the session’s event stream. Returns a fresh
broadcast::Receiver that observes every event published from
this call forward. Combined with SessionState::snapshot_replay_buffer
and the Last-Event-ID replay logic in the GET handler, this gives the spec’s resume-from-missed-event semantics.
pub fn snapshot_replay_buffer(&self) -> Vec<McpStreamEvent>
Snapshot the current replay buffer. Returns a Vec<McpStreamEvent>
in monotonically increasing id order. The GET handler calls
this once on connect AFTER calling Self::subscribe_events
(so any event published during the snapshot lands in the live
receiver — the handler dedupes the overlap by id).
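One way the subscribe-then-snapshot ordering and the dedupe by id could fit together is sketched below. The `Event` type and the `resume` function are hypothetical; the real GET handler is not shown in this documentation, and the live receiver is modelled here as a plain slice.

```rust
/// Hypothetical stand-in for `McpStreamEvent`.
#[derive(Clone, Debug, PartialEq)]
pub struct Event {
    pub id: u64,
    pub data: String,
}

/// Emit the replay snapshot first, then live events, skipping anything at or
/// below the highest id already delivered. `last_event_id` is the value the
/// client sent in its Last-Event-ID header (0 meaning "never seen").
pub fn resume(last_event_id: u64, snapshot: &[Event], live: &[Event]) -> Vec<Event> {
    let mut out = Vec::new();
    let mut high_water = last_event_id;
    for ev in snapshot.iter().chain(live.iter()) {
        if ev.id > high_water {
            high_water = ev.id;
            out.push(ev.clone());
        }
    }
    out
}

/// Event 5 was published between subscribe and snapshot, so it appears in both
/// the snapshot and the live receiver; the client has already seen up to id 3.
pub fn resume_demo() -> Vec<u64> {
    let ev = |id: u64| Event { id, data: format!("event {id}") };
    let snapshot = vec![ev(3), ev(4), ev(5)];
    let live = vec![ev(5), ev(6)];
    resume(3, &snapshot, &live).into_iter().map(|e| e.id).collect()
}
```

Subscribing before taking the snapshot trades a possible duplicate for a guaranteed absence of gaps; the high-water mark then removes the duplicate, so the client receives ids 4, 5, 6 exactly once.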
Trait Implementations§
Auto Trait Implementations§
impl !Freeze for SessionState
impl !RefUnwindSafe for SessionState
impl Send for SessionState
impl Sync for SessionState
impl Unpin for SessionState
impl UnsafeUnpin for SessionState
impl !UnwindSafe for SessionState
Blanket Implementations§
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoEither for T
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self>
if into_left is true.
Converts self into a Right variant of Either<Self, Self>
otherwise.
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self>
if into_left(&self) returns true.
Converts self into a Right variant of Either<Self, Self>
otherwise.