klieo-mcp-server 2.2.0

Expose any klieo ToolInvoker or Agent as an MCP server over stdio or HTTP. The inverse of klieo-tools-mcp.
Documentation
//! Per-session state used by the MCP server's transport containers.
//!
//! One `Session` per stdio process and one per active HTTP
//! `Mcp-Session-Id`. Stdio's session has `id == None` (no
//! `Mcp-Session-Id` on stdio) and lives in `McpServer.stdio_session`;
//! HTTP sessions have `id == Some(uuid)` minted by
//! `handle_initialize_post` and live in the `McpServer.sessions`
//! registry until the client DELETEs them, the SSE body drops, or
//! the idle reaper evicts them.

// Fields are constructed unconditionally but several
// (`id`, `last_activity_millis`, `closed`, `outbound_tx`) are only
// read by `#[cfg(feature = "http")]` code paths. File-scope allow
// keeps the stdio-only build clean without annotating each field.
#![allow(dead_code)]

use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Instant;

use tokio::sync::OnceCell;

use crate::outbound::OutboundRequests;
#[cfg(feature = "http")]
use crate::outbound_ring::RingSender;
use crate::roots::RootsCache;

/// Per-connection state shared by the server's transport containers.
///
/// One value exists per stdio process and one per active HTTP
/// `Mcp-Session-Id`. The identity fields (`id`, `principal`) are fixed
/// at construction; everything else is interior-mutable (`OnceCell`,
/// atomics, and — with the `http` feature — a `parking_lot::Mutex`),
/// so every method on the type takes `&self`.
///
/// Several fields are only read by `#[cfg(feature = "http")]` code
/// paths; they are still constructed unconditionally unless the field
/// itself carries the `http` gate.
pub(crate) struct Session {
    /// Wire identifier. `None` for stdio (no `Mcp-Session-Id` on
    /// stdio). `Some(_)` for HTTP, minted by
    /// `handle_initialize_post`.
    pub(crate) id: Option<uuid::Uuid>,

    /// Authenticated principal that minted this session. `None` for
    /// stdio (no auth context) and for HTTP sessions opened against
    /// a server with no `Authenticator` wired. `Some(_)` when an
    /// authenticator was configured and the verified `Identity` was
    /// captured at `handle_initialize_post` time. Per-request session
    /// lookup (`require_session`) compares this against the caller's
    /// `Identity` and rejects mismatches as 404 to deny cross-tenant
    /// session-kill / context-steal (CWE-639). The string is the
    /// opaque value returned by `Identity::as_str` — typically the
    /// OAuth `sub` claim.
    pub(crate) principal: Option<String>,

    /// Per-session outbound correlation table + sink. Populated by
    /// stdio at server start or by HTTP at `GET /mcp`. `OnceCell`
    /// preserves the prior single-write semantics; race losers flow
    /// through `mint_session_race_500`.
    ///
    /// Starts empty in both constructors; `close_and_drain` reads it
    /// with `get()` and skips the drain while it is still unset.
    pub(crate) outbound: OnceCell<Arc<OutboundRequests>>,

    /// Per-session roots cache. Populated alongside `outbound` when
    /// the server was built with `with_client_sampling`.
    pub(crate) roots_cache: OnceCell<Arc<RootsCache>>,

    /// HTTP outbound ring producer; cloned into the SSE body stream
    /// at `GET /mcp` time. Cell stays empty on stdio sessions.
    ///
    /// The ring carries `Arc<Value>` payloads so producer-side cloning
    /// into the replay buffer is a refcount bump rather than a JSON
    /// tree walk; the SSE body builder reads through the `Arc` for
    /// serialisation.
    #[cfg(feature = "http")]
    pub(crate) outbound_tx: OnceCell<RingSender<(u64, std::sync::Arc<serde_json::Value>)>>,

    /// Millis since the owning `McpServer.server_start` at which this
    /// session last saw activity. Read by the idle reaper for
    /// eviction; written by request handlers on POST and GET via
    /// [`Self::mark_active`].
    ///
    /// Seeded with `0` by [`Self::new_stdio`] and with the elapsed
    /// millis at construction by [`Self::new_http`].
    ///
    /// `Ordering::Relaxed` — the reaper tolerates stale reads; at
    /// worst a session that became idle just before a tick survives
    /// one extra tick.
    pub(crate) last_activity_millis: AtomicU64,

    /// True once the session has been torn down (idle reaper, GET
    /// disconnect). Read by handlers to short-circuit late frames.
    ///
    /// Set through [`Self::mark_closed`] and never cleared by any
    /// method in this module; both constructors start it at `false`.
    pub(crate) closed: AtomicBool,

    /// Monotonic event id counter for outbound SSE frames. Starts at
    /// 1 at session construction; increments on every
    /// `HttpFrameSink::send_frame` call. Never resets across
    /// reconnects of the same session; the value carried by the
    /// SSE `id:` line compares against a reconnecting client's
    /// `Last-Event-Id` header.
    #[cfg(feature = "http")]
    pub(crate) next_event_id: AtomicU64,

    /// Drop-oldest ring of recent outbound frames keyed by event
    /// id. The producer (`HttpFrameSink::send_frame`) pushes
    /// `(event_id, frame)` under this Mutex; a reconnecting `GET
    /// /mcp` with `Last-Event-Id` snapshots the slice with
    /// `id > Last-Event-Id` under the same Mutex.
    ///
    /// Capacity is the owning server's `sse_replay_capacity`.
    /// Setting that to 0 disables resumption: the buffer stays
    /// empty.
    ///
    /// Sync `parking_lot::Mutex`: the producer's critical section
    /// holds id allocation, deque push/pop, and a lock-free ring
    /// push — all synchronous — so the guard is released without
    /// suspending the task.
    #[cfg(feature = "http")]
    pub(crate) sse_replay_buffer:
        parking_lot::Mutex<std::collections::VecDeque<(u64, std::sync::Arc<serde_json::Value>)>>,
}

impl Session {
    /// Construct an empty stdio session (no wire id, no principal —
    /// stdio carries no authentication context).
    pub(crate) fn new_stdio() -> Self {
        Self {
            id: None,
            principal: None,
            outbound: OnceCell::new(),
            roots_cache: OnceCell::new(),
            #[cfg(feature = "http")]
            outbound_tx: OnceCell::new(),
            last_activity_millis: AtomicU64::new(0),
            closed: AtomicBool::new(false),
            #[cfg(feature = "http")]
            next_event_id: AtomicU64::new(1),
            #[cfg(feature = "http")]
            sse_replay_buffer: parking_lot::Mutex::new(std::collections::VecDeque::new()),
        }
    }

    /// Construct an empty HTTP session with the given minted UUID and
    /// the verified caller principal (or `None` when no authenticator
    /// was wired into the server). The principal is the value
    /// returned by `Identity::as_str` at `initialize` time and stays
    /// fixed for the session's lifetime; `require_session` enforces
    /// it on every subsequent request.
    ///
    /// `server_start` is the owning server's reference instant; the
    /// session's `last_activity_millis` is initialised to the current
    /// elapsed millis since that instant so the idle reaper treats
    /// the session as freshly active at construction.
    pub(crate) fn new_http(
        id: uuid::Uuid,
        principal: Option<String>,
        server_start: Instant,
    ) -> Self {
        let now_millis = server_start.elapsed().as_millis() as u64;
        Self {
            id: Some(id),
            principal,
            outbound: OnceCell::new(),
            roots_cache: OnceCell::new(),
            #[cfg(feature = "http")]
            outbound_tx: OnceCell::new(),
            last_activity_millis: AtomicU64::new(now_millis),
            closed: AtomicBool::new(false),
            #[cfg(feature = "http")]
            next_event_id: AtomicU64::new(1),
            #[cfg(feature = "http")]
            sse_replay_buffer: parking_lot::Mutex::new(std::collections::VecDeque::new()),
        }
    }

    /// Refresh `last_activity_millis` to current elapsed millis since
    /// the owning server's `server_start`. Lock-free relaxed store.
    pub(crate) fn mark_active(&self, server_start: Instant) {
        let now = server_start.elapsed().as_millis() as u64;
        self.last_activity_millis.store(now, Ordering::Relaxed);
    }

    /// Idempotent close marker. Returns the previous state.
    pub(crate) fn mark_closed(&self) -> bool {
        self.closed.swap(true, Ordering::SeqCst)
    }

    /// Snapshot the close flag.
    pub(crate) fn is_closed(&self) -> bool {
        self.closed.load(Ordering::SeqCst)
    }

    /// Mark this session as closed and drain any pending outbound
    /// oneshots as
    /// [`klieo_core::ServerOutboundError::TransportClosed`] so
    /// awaiting callers wake immediately. Composes the two-step
    /// eviction tail shared by every HTTP removal path
    /// (`delete_mcp`, `spawn_session_cleanup`, `idle_reaper_loop`).
    ///
    /// Idempotent on the mark; the drain is a no-op the second time
    /// because `OutboundRequests::drain_pending_as_closed` clears the
    /// internal table on first call.
    ///
    /// `#[cfg(feature = "http")]` because
    /// `OutboundRequests::drain_pending_as_closed` is itself
    /// feature-gated; stdio has no idle reaper and tears down by
    /// process exit.
    #[cfg(feature = "http")]
    pub(crate) async fn close_and_drain(&self) {
        self.mark_closed();
        if let Some(outbound) = self.outbound.get() {
            outbound.drain_pending_as_closed().await;
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// A stdio session is built without a wire id and starts open.
    #[tokio::test]
    async fn stdio_session_has_no_wire_id() {
        let stdio = Session::new_stdio();
        assert_eq!(stdio.id, None);
        assert!(!stdio.is_closed());
    }

    /// An HTTP session stores exactly the UUID it was minted with
    /// and, absent a principal, records none.
    #[tokio::test]
    async fn http_session_carries_minted_uuid() {
        let minted = uuid::Uuid::new_v4();
        let http = Session::new_http(minted, None, Instant::now());
        assert_eq!(http.id, Some(minted));
        assert_eq!(http.principal, None);
        assert!(!http.is_closed());
    }

    /// A supplied principal is kept verbatim on the session.
    #[tokio::test]
    async fn http_session_records_principal_when_supplied() {
        let http = Session::new_http(
            uuid::Uuid::new_v4(),
            Some(String::from("alice")),
            Instant::now(),
        );
        assert_eq!(http.principal, Some(String::from("alice")));
    }

    /// `mark_closed` reports the prior flag value, so only the first
    /// call observes `false`; the flag stays set afterwards.
    #[tokio::test]
    async fn mark_closed_returns_previous_state_then_idempotent() {
        let session = Session::new_stdio();
        let first = session.mark_closed();
        assert!(!first, "first mark_closed sees false");
        let second = session.mark_closed();
        assert!(second, "second sees true");
        assert!(session.is_closed());
    }

    /// After real time has passed since `server_start`, `mark_active`
    /// stores a strictly larger timestamp than the stdio seed value.
    #[tokio::test]
    async fn mark_active_advances_last_activity_timestamp() {
        let server_start = Instant::now();
        let session = Session::new_stdio();
        let read_stamp = || session.last_activity_millis.load(Ordering::Relaxed);

        let before = read_stamp();
        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
        session.mark_active(server_start);
        let after = read_stamp();

        assert!(
            before < after,
            "mark_active must move the timestamp forward"
        );
    }

    /// Fresh sessions seed `next_event_id` at 1 so the first outbound
    /// SSE frame fetch-and-adds to event id 1 on the wire. `id: 0` is
    /// reserved as the unspecified value per the SSE spec and must
    /// never appear in a `data:` frame; this test pins the
    /// construction-time baseline that enforces that invariant.
    #[cfg(feature = "http")]
    #[tokio::test]
    async fn session_initialises_next_event_id_at_one() {
        let session = Session::new_stdio();
        assert_eq!(session.next_event_id.load(Ordering::Relaxed), 1);
    }

    /// Fresh sessions start with an empty SSE replay buffer. The
    /// HTTP frame sink populates it on first send; a reconnect
    /// arriving before any send produces an empty replay snapshot.
    #[cfg(feature = "http")]
    #[tokio::test]
    async fn session_initialises_empty_sse_replay_buffer() {
        let session = Session::new_stdio();
        let replay = session.sse_replay_buffer.lock();
        assert!(replay.is_empty());
    }
}