klieo-mcp-server 2.2.0

Expose any klieo ToolInvoker or Agent as an MCP server over stdio or HTTP. The inverse of klieo-tools-mcp.
Documentation
//! Opaque, principal-scoped resume tickets for suspended workflow runs
//! (ADR-045).
//!
//! A workflow that suspends via [`klieo_core::error::Error::Suspended`]
//! returns a freshly minted opaque token in place of the raw checkpoint.
//! The checkpoint persists under the token in a dedicated KV bucket,
//! tagged with the issuing caller's verified principal. A follow-up
//! `klieo/run/resume` call authorises the supplied token against the
//! caller principal BEFORE the one-shot atomic claim — so a foreign
//! principal cannot consume someone else's ticket (IDOR / CWE-639),
//! and the token cannot be re-played by the same caller either
//! (one-shot via compare-and-set, CWE-367 TOCTOU-safe).
//!
//! The token IS the credential. Treat it accordingly: 256-bit OsRng
//! entropy, base64url string, never logged.

use std::sync::Arc;

use base64::engine::general_purpose::URL_SAFE_NO_PAD;
use base64::Engine as _;
use bytes::Bytes;
use chrono::{DateTime, Utc};
use klieo_core::checkpoint::RunCheckpoint;
use klieo_core::error::BusError;
use klieo_core::KvStore;
use rand::rngs::OsRng;
use rand::RngCore;
use serde::{Deserialize, Serialize};

/// KV bucket under which resume tickets persist. Distinct from the
/// runtime checkpoint bucket (`klieo.run-checkpoints`) so the
/// ADR-045 resume-latch GC never trips on a ticket entry and vice
/// versa.
///
/// Every `get`, `cas` and `delete` issued by [`ResumeTicketStore`]
/// passes this constant as the bucket argument; the ticket token is
/// the key within it.
pub(crate) const RESUME_TICKET_BUCKET: &str = "klieo.mcp.resume-tickets";

/// Token entropy in bytes. 256 bits is the minimum unguessable-
/// token bar for a credential that the server treats as opaque.
///
/// Sizes the random buffer filled in [`ResumeTicketStore::mint_token`];
/// 32 raw bytes encode to a 43-character unpadded base64url string.
const TOKEN_ENTROPY_BYTES: usize = 32;

/// Tombstone payload written by [`ResumeTicketStore::claim`] before
/// the best-effort delete. One byte (not empty) so a reader of the raw
/// entry can distinguish "already-claimed but not yet cleaned up" from
/// "never existed".
///
/// Both [`ResumeTicketStore::peek`] and [`ResumeTicketStore::claim`]
/// compare the stored value against this constant and return `Ok(None)`
/// on a match, so the two cases fail the same way at the call site
/// (IDOR parity).
///
/// A lone NUL byte is not valid JSON, so it cannot be mistaken for a
/// serialised [`ResumeTicketRecord`].
///
/// NOTE(review): nothing in this module logs the tombstone-vs-missing
/// distinction — confirm whether a caller is expected to.
const CLAIMED_TOMBSTONE: &[u8] = b"\x00";

/// Server-side record bound to a resume ticket. NEVER crosses the
/// wire — the ticket itself is the only opaque handle the peer sees.
///
/// Serialised to JSON by [`ResumeTicketStore::persist`] and decoded
/// again by [`ResumeTicketStore::peek`] / [`ResumeTicketStore::claim`].
///
/// NOTE(review): the derived `Debug` prints the principal and the full
/// checkpoint — avoid `{:?}`-logging a record if either is sensitive.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[non_exhaustive]
pub struct ResumeTicketRecord {
    /// Verified caller principal at the moment of suspend. The
    /// `klieo/run/resume` authz gate compares the resuming caller's
    /// `Identity::as_str()` against this value (a plain string compare
    /// is safe — the unguessable lookup secret is the random token, not
    /// the principal).
    pub principal: String,
    /// Name of the workflow that suspended. Used by the resume path
    /// to look up the right [`crate::workflow::WorkflowResumeHandle`].
    pub workflow_name: String,
    /// Continuation state needed to drive
    /// `klieo_core::runtime::resume_from_checkpoint`.
    pub checkpoint: RunCheckpoint,
    /// Suspend wall-clock, for downstream TTL/GC. Nothing in this
    /// module reads it or enforces expiry.
    /// NOTE(review): confirm where the TTL is actually applied.
    pub created_at: DateTime<Utc>,
}

/// KV-backed one-shot ticket store.
///
/// The store holds nothing but a shared `Arc<dyn KvStore>` handle, so
/// `Clone` is a reference-count bump and every clone operates on the
/// same underlying KV.
#[derive(Clone)]
pub struct ResumeTicketStore {
    /// Backing key-value store; all ticket entries live in one fixed
    /// bucket inside it.
    kv: Arc<dyn KvStore>,
}

impl ResumeTicketStore {
    /// Build a fresh store over the supplied KV. The bucket is
    /// fixed at [`RESUME_TICKET_BUCKET`].
    pub fn new(kv: Arc<dyn KvStore>) -> Self {
        Self { kv }
    }

    /// Mint a fresh 256-bit opaque token (URL-safe base64, no
    /// padding). Caller persists it via [`Self::persist`]; the token
    /// IS the credential, so never log it.
    pub fn mint_token() -> String {
        let mut bytes = [0u8; TOKEN_ENTROPY_BYTES];
        OsRng.fill_bytes(&mut bytes);
        URL_SAFE_NO_PAD.encode(bytes)
    }

    /// Persist `record` under `token`. CAS with `expected = None` so a
    /// freak token collision (effectively impossible at 256 bits, but
    /// still a fail-closed invariant) is rejected rather than
    /// overwriting an existing ticket.
    pub async fn persist(
        &self,
        token: &str,
        record: &ResumeTicketRecord,
    ) -> Result<(), TicketStoreError> {
        let bytes = serde_json::to_vec(record).map_err(TicketStoreError::Encode)?;
        match self
            .kv
            .cas(RESUME_TICKET_BUCKET, token, Bytes::from(bytes), None)
            .await
        {
            Ok(_) => Ok(()),
            Err(err) => Err(TicketStoreError::Backend(err)),
        }
    }

    /// Read the record under `token` without consuming it. Used by
    /// the resume path to run the principal-authz gate BEFORE the
    /// atomic claim — leaking neither "ticket unknown" nor "wrong
    /// principal" through the wire envelope (same fail-closed
    /// message in both cases at the call site).
    pub async fn peek(
        &self,
        token: &str,
    ) -> Result<Option<ResumeTicketRecord>, TicketStoreError> {
        let entry = self
            .kv
            .get(RESUME_TICKET_BUCKET, token)
            .await
            .map_err(TicketStoreError::Backend)?;
        let Some(entry) = entry else {
            return Ok(None);
        };
        if entry.value.as_ref() == CLAIMED_TOMBSTONE {
            return Ok(None);
        }
        let record = serde_json::from_slice::<ResumeTicketRecord>(&entry.value)
            .map_err(TicketStoreError::Decode)?;
        Ok(Some(record))
    }

    /// Atomically remove-and-return the record under `token`. Exactly
    /// one concurrent caller wins; every other call returns `Ok(None)`.
    /// The CAS replaces the entry with [`CLAIMED_TOMBSTONE`] using the
    /// observed revision; on success the entry is best-effort deleted
    /// for cleanup. A loser of the CAS race (`BusError::CasConflict`)
    /// is mapped to `Ok(None)` so the caller cannot tell a race apart
    /// from "ticket unknown" — IDOR parity is preserved (CWE-639 /
    /// CWE-367).
    pub async fn claim(
        &self,
        token: &str,
    ) -> Result<Option<ResumeTicketRecord>, TicketStoreError> {
        let entry = self
            .kv
            .get(RESUME_TICKET_BUCKET, token)
            .await
            .map_err(TicketStoreError::Backend)?;
        let Some(entry) = entry else {
            return Ok(None);
        };
        if entry.value.as_ref() == CLAIMED_TOMBSTONE {
            return Ok(None);
        }
        let record = serde_json::from_slice::<ResumeTicketRecord>(&entry.value)
            .map_err(TicketStoreError::Decode)?;
        match self
            .kv
            .cas(
                RESUME_TICKET_BUCKET,
                token,
                Bytes::from_static(CLAIMED_TOMBSTONE),
                Some(entry.revision),
            )
            .await
        {
            Ok(_) => {}
            Err(BusError::CasConflict { .. }) => return Ok(None),
            Err(err) => return Err(TicketStoreError::Backend(err)),
        }
        if let Err(err) = self.kv.delete(RESUME_TICKET_BUCKET, token).await {
            tracing::warn!(
                target: "klieo.mcp.resume_ticket",
                error = %err,
                "best-effort delete after ticket claim failed; tombstone remains",
            );
        }
        Ok(Some(record))
    }
}

/// Typed error from ticket-store ops. Public so the call site can
/// pattern-match without source-chain downcasting; the wire surface
/// flattens every variant to the same opaque envelope (CWE-209).
///
/// The `Display` strings are deliberately generic; the underlying
/// cause is only reachable through the `#[source]` chain.
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum TicketStoreError {
    /// Serializing a record to JSON failed. Produced by
    /// [`ResumeTicketStore::persist`].
    #[error("resume-ticket encode failed")]
    Encode(#[source] serde_json::Error),
    /// A persisted ticket payload could not be parsed as a record.
    /// Produced by [`ResumeTicketStore::peek`] and
    /// [`ResumeTicketStore::claim`].
    #[error("resume-ticket decode failed")]
    Decode(#[source] serde_json::Error),
    /// The underlying KV returned an error. `persist` surfaces every
    /// CAS failure here (including a conflict on an existing key),
    /// whereas `claim` maps `BusError::CasConflict` to `Ok(None)` and
    /// only reports other failures through this variant.
    #[error("resume-ticket KV failure")]
    Backend(#[source] BusError),
}

#[cfg(test)]
mod tests {
    use super::*;
    use klieo_core::test_utils::fake_kv;

    /// Build a ticket record owned by `principal` for workflow
    /// `wf-test`.
    ///
    /// `RunCheckpoint` is `#[non_exhaustive]` outside its defining
    /// crate, so the checkpoint is obtained by deserialising the legacy
    /// (pre-resume_attempted) JSON form instead of calling the
    /// crate-private constructor.
    fn record_for(principal: &str) -> ResumeTicketRecord {
        let checkpoint = serde_json::from_value(serde_json::json!({
            "run_id": klieo_core::ids::RunId::new(),
            "step_index": 1,
            "thread_id": "t-test",
            "messages": [],
            "pending_tool_calls": null,
            "created_at": "2026-06-18T00:00:00Z",
        }))
        .unwrap();
        ResumeTicketRecord {
            principal: principal.to_owned(),
            workflow_name: String::from("wf-test"),
            checkpoint,
            created_at: Utc::now(),
        }
    }

    /// A minted token decodes to exactly the configured entropy size,
    /// and two consecutive mints differ.
    #[test]
    fn mint_token_yields_256_bits_of_url_safe_base64() {
        let first = ResumeTicketStore::mint_token();
        let raw = URL_SAFE_NO_PAD.decode(first.as_bytes()).unwrap();
        assert_eq!(
            raw.len(),
            TOKEN_ENTROPY_BYTES,
            "minted token must decode to exactly {TOKEN_ENTROPY_BYTES} bytes"
        );

        let second = ResumeTicketStore::mint_token();
        assert_ne!(first, second, "two mints must not collide");
    }

    /// A persisted record is readable through `peek` with its fields
    /// intact.
    #[tokio::test]
    async fn persist_then_peek_returns_original_record() {
        let store = ResumeTicketStore::new(fake_kv());
        let token = ResumeTicketStore::mint_token();
        store.persist(&token, &record_for("alice@x")).await.unwrap();

        let fetched = store.peek(&token).await.unwrap().expect("ticket present");
        assert_eq!(fetched.principal, "alice@x");
        assert_eq!(fetched.workflow_name, "wf-test");
    }

    /// Peeking a key that was never written yields `None`, not an error.
    #[tokio::test]
    async fn peek_unknown_token_is_none() {
        let store = ResumeTicketStore::new(fake_kv());
        assert!(store.peek("never-persisted").await.unwrap().is_none());
    }

    /// Sequential claims: the first wins, the second and any later peek
    /// see nothing.
    #[tokio::test]
    async fn claim_consumes_ticket_exactly_once() {
        let store = ResumeTicketStore::new(fake_kv());
        let token = ResumeTicketStore::mint_token();
        store.persist(&token, &record_for("alice@x")).await.unwrap();

        assert!(
            store.claim(&token).await.unwrap().is_some(),
            "first claim must surface the persisted record"
        );
        assert!(
            store.claim(&token).await.unwrap().is_none(),
            "second claim of the same token must return None — one-shot"
        );
        assert!(
            store.peek(&token).await.unwrap().is_none(),
            "a consumed ticket peeks as absent so resume cannot re-authz against it"
        );
    }

    /// Eight tasks race to claim the same ticket; exactly one may get
    /// the record.
    #[tokio::test]
    async fn concurrent_claim_yields_exactly_one_winner() {
        let store = Arc::new(ResumeTicketStore::new(fake_kv()));
        let token = ResumeTicketStore::mint_token();
        store.persist(&token, &record_for("alice@x")).await.unwrap();

        // Spawn every claimer up front so they all contend for one entry.
        let mut handles = Vec::with_capacity(8);
        for _ in 0..8 {
            let store = Arc::clone(&store);
            let token = token.clone();
            handles.push(tokio::spawn(async move { store.claim(&token).await }));
        }

        let mut winners = 0usize;
        for handle in handles {
            if handle.await.unwrap().unwrap().is_some() {
                winners += 1;
            }
        }
        assert_eq!(
            winners, 1,
            "exactly one concurrent claim must surface the record; got {winners}"
        );
    }
}