Skip to main content

klieo_mcp_server/
lib.rs

1#![deny(missing_docs)]
2#![deny(rustdoc::broken_intra_doc_links)]
3//! Expose any klieo [`ToolInvoker`] as an MCP server over stdio or HTTP (see the `http` cargo feature).
4//!
5//! This crate is the inverse of `klieo-tools-mcp`: instead of a Rust
6//! host calling external MCP servers, it lets external MCP hosts
7//! (Claude Desktop, Continue, LangGraph, OpenAI Agents SDK) call
8//! into a Rust-built klieo `ToolInvoker`.
9//!
10//! Polyglot strategy: klieo exposes three wire-level contracts so
11//! non-Rust hosts can integrate without depending on klieo crates —
12//! the NATS bus (durable pipelines), this MCP server (stdio/HTTP
13//! tool calls), and the A2A peer protocol (JSON-RPC). This crate
14//! covers the MCP-server contract.
15//!
16//! ## Scope
17//!
18//! - **stdio transport** — newline-delimited JSON-RPC 2.0 frames on
19//!   stdin/stdout, matching the MCP reference spec.
20//! - **`tools/list`** — derived from the invoker's catalogue.
21//! - **`tools/call`** — dispatches to `ToolInvoker::invoke` with a
22//!   `ToolCtx` minted per request from the
23//!   [`McpServerBuilder::with_tool_ctx_factory`]-supplied factory.
24//!   Default factory is the in-memory noop wiring (`Pubsub`/`KvStore`/
25//!   `JobQueue` from `klieo-bus-memory`, fresh per call). Override
26//!   the factory to expose tools that need real bus access.
27//! - **`initialize` / `shutdown`** — standard MCP handshake.
28//!
29//! Out-of-scope:
30//! - SSE streaming responses (HTTP transport returns JSON only).
31//! - Multi-tenant auth (wrap `router()` with a tower auth layer, or front with an auth-enforcing reverse proxy).
32//!
33//! ## Agent exposure (0.9, ADR-010)
34//!
35//! `McpServer::expose_agent_with_schema(agent, schema, ctx_factory)`
36//! wraps any [`klieo_core::Agent`] as a single MCP tool whose
37//! `inputSchema` is the caller-supplied JSON Schema. The
38//! `ctx_factory` closure is called per `tools/call` to mint a
39//! fresh [`klieo_core::agent::AgentContext`] (so each invocation gets its own RunId).
40//!
41//! Behind the `schemars` cargo feature, `expose_agent::<A>(agent,
42//! ctx_factory)` derives the schema automatically via
43//! `schema_for!(A::Input)`. See ADR-010 for the trade-off.
44
45#[cfg(feature = "http")]
46pub(crate) mod http;
47
48pub(crate) mod outbound;
49
50#[cfg(feature = "http")]
51pub(crate) mod outbound_ring;
52
53pub(crate) mod session;
54
55pub(crate) mod workflow;
56
57// Inbound per-tenant LLM budget governor. Wired through
58// `McpServerBuilder::with_governor`; gated on `feature = "governor"`
59// so adopters that bring their own rate-limiting layer keep the lean
60// default dep graph.
61#[cfg(feature = "governor")]
62pub(crate) mod governor;
63
64// The HTTP `klieo/run/resume` handler is the only production consumer;
65// stdio builds still need the module compiled so the tests + workflow
66// suspend path can mint tickets, but the read-side API
67// (`peek` / `claim` / decode-error variant) only fires under
68// `--features http`.
69#[cfg_attr(not(feature = "http"), allow(dead_code))]
70pub(crate) mod resume_ticket;
71
72pub mod outbound_sink;
73pub use outbound_sink::{OutboundFrameSink, OutboundSinkError};
74
75pub mod sampling;
76pub use sampling::{
77    ModelHint, ModelPreferences, SamplingContent, SamplingMessage, SamplingRequest,
78    SamplingResponse,
79};
80
81pub mod roots;
82pub use roots::Root;
83
84pub mod outbound_ext;
85pub use outbound_ext::McpOutboundExt;
86
/// Re-export the cluster-0.24 follower-side orphan re-invoke gate
/// for integration tests. Gated on the `http` and `test-fixtures`
/// features so production callers cannot reach the helper. See
/// [`crate::http::OrphanOutcome`] and
/// [`crate::http::handle_dead_leader_orphan_mcp`] for the contract.
#[cfg(all(feature = "http", feature = "test-fixtures"))]
pub use http::{handle_dead_leader_orphan_mcp, OrphanOutcome};
93
94#[cfg(all(feature = "http", feature = "bench"))]
95pub use http::encode_sse_frame;
96#[cfg(feature = "bench")]
97pub use outbound_sink::bench_stdio_sink;
98
/// Returns clones of the replay-buffer entries whose `event_id` is
/// strictly greater than `since_id`, in snapshot order.
///
/// Approximates the per-frame scan cost of the SSE replay path
/// without HTTP header parsing or lock acquisition overhead.
/// Compiled only when both the `http` and `bench` features are on.
#[cfg(all(feature = "http", feature = "bench"))]
pub fn bench_filter_replay(
    entries: &[(u64, std::sync::Arc<serde_json::Value>)],
    since_id: u64,
) -> Vec<(u64, std::sync::Arc<serde_json::Value>)> {
    entries
        .iter()
        .filter_map(|entry| (entry.0 > since_id).then(|| entry.clone()))
        .collect()
}
114
115use async_trait::async_trait;
116use klieo_core::agent::Agent;
117use klieo_core::error::ToolError;
118use klieo_core::llm::ToolDef;
119use klieo_core::tool::{ToolCtx, ToolInvoker};
120use std::sync::Arc;
121use thiserror::Error;
122use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
123use tokio::sync::Mutex;
124use tokio_util::sync::CancellationToken;
125use tracing::warn;
126
/// JSON-RPC 2.0 standard error codes used in handler dispatch.
/// See <https://www.jsonrpc.org/specification#error_object>.
///
/// This one is the spec's "Parse error" code.
pub(crate) const JSONRPC_PARSE_ERROR: i64 = -32700;
/// JSON-RPC 2.0 "Method not found" code.
const JSONRPC_METHOD_NOT_FOUND: i64 = -32601;
/// JSON-RPC 2.0 "Invalid params" code.
#[cfg(feature = "http")]
pub(crate) const JSONRPC_INVALID_PARAMS: i64 = -32602;
/// Generic server-side failure code (`-32000`); bus transport
/// failures are mapped to it on the wire (see
/// [`McpServerError::Bus`]).
pub(crate) const JSONRPC_SERVER_ERROR: i64 = -32000;
/// Klieo-specific: resume requested with an expired window.
/// Maps to JSON-RPC application code `-32011`. Used by `stream_resume`
/// in the HTTP transport.
#[cfg(feature = "http")]
pub(crate) const JSONRPC_RESUME_BUFFER_EXPIRED: i64 = -32011;
/// Klieo-specific: resume requested for an unknown progressToken.
/// Maps to JSON-RPC application code `-32012`. Used by `stream_resume`
/// in the HTTP transport.
#[cfg(feature = "http")]
pub(crate) const JSONRPC_RESUME_BUFFER_NOT_FOUND: i64 = -32012;
/// Klieo-specific: stream leader (originating replica running the
/// invoke) died or never claimed leadership. A follower observed an
/// orphaned resume buffer and wrote a terminal frame so the client
/// sees clean termination and can retry. Mirrors `codes::LEADER_DIED`
/// on the A2A side. ADR-020.
#[cfg(feature = "http")]
pub(crate) const JSONRPC_LEADER_DIED: i64 = -32099;
/// Klieo-specific: a second `initialize` arrived while the HTTP
/// transport already owns an active session. Returned in the JSON-RPC
/// error envelope alongside a `409 Conflict` HTTP status. Renumbered
/// from `-32099` to `-32002` in 0.28 to resolve the collision with
/// [`JSONRPC_LEADER_DIED`]; LEADER_DIED kept the older slot because
/// it has the longer external visibility. ADR-028, ADR-029.
#[cfg(feature = "http")]
pub(crate) const JSONRPC_SESSION_CONFLICT: i64 = -32002;
/// JSON-RPC application code returned when an
/// [`klieo_auth_common::Authenticator`] wired via
/// [`McpServerBuilder::with_authenticator`] rejects a request —
/// either `authenticate(headers)` failed or `authorize_method`
/// denied the principal for the requested method. Mirrors the
/// `UNAUTHENTICATED` code (`-32001`) from `klieo_auth_common` so both
/// transports surface auth failures with the same wire code. ADR-021.
#[cfg(feature = "http")]
pub(crate) const JSONRPC_UNAUTHENTICATED: i64 = -32001;
168
// Compile-time guard: the JSONRPC_* codes listed below must be
// pairwise distinct. Introducing a code whose value duplicates an
// existing one fails the build. `while` loops are used because the
// check runs in a const context.
#[cfg(feature = "http")]
const _: () = {
    const ALL_CODES: [i64; 9] = [
        JSONRPC_PARSE_ERROR,
        JSONRPC_METHOD_NOT_FOUND,
        JSONRPC_INVALID_PARAMS,
        JSONRPC_SERVER_ERROR,
        JSONRPC_UNAUTHENTICATED,
        JSONRPC_RESUME_BUFFER_EXPIRED,
        JSONRPC_RESUME_BUFFER_NOT_FOUND,
        JSONRPC_LEADER_DIED,
        JSONRPC_SESSION_CONFLICT,
    ];
    // Compare every entry against all entries that precede it; this
    // visits each unordered pair exactly once.
    let mut later = 1;
    while later < ALL_CODES.len() {
        let mut earlier = 0;
        while earlier < later {
            assert!(
                ALL_CODES[earlier] != ALL_CODES[later],
                "JSONRPC_* code collision"
            );
            earlier += 1;
        }
        later += 1;
    }
};
195
/// Leader-claim TTL for the `klieo-leaders` KV bucket used by
/// `stream_tools_call` (claim on invoke) and `stream_resume` (orphan
/// check on resume). Operators MUST configure the JetStream KV bucket
/// with `max_age` equal to this value so a dead replica's leader
/// entry evicts automatically; the registry's heartbeat runs every
/// `TTL / 2`. See ADR-020.
// NOTE(review): `McpServer` also stores a per-server `leader_ttl`;
// presumably this constant is the default for it — confirm against
// the builder before relying on the `max_age` guidance above.
pub const LEADER_TTL: std::time::Duration = std::time::Duration::from_secs(5);

/// Leader-key prefix for the MCP transport in the shared
/// `klieo-leaders` bucket. Mirrors `a2a.<task_id>` on the A2A side.
pub const MCP_LEADER_KEY_PREFIX: &str = "mcp.";

/// MCP protocol revision advertised on every `initialize` response.
///
/// Pinned to the 2025-03-26 Streamable HTTP revision the HTTP
/// transport implements (`Mcp-Session-Id` lifecycle + SSE upgrade).
/// This is the single source for the `protocolVersion` field — the
/// conformance test fails if any handshake site diverges from it.
pub const MCP_PROTOCOL_VERSION: &str = "2025-03-26";
215
216/// Top-level error from the MCP server loop.
217#[derive(Debug, Error)]
218#[non_exhaustive]
219pub enum McpServerError {
220    /// stdin/stdout I/O failure.
221    #[error("io error: {0}")]
222    Io(#[from] std::io::Error),
223    /// JSON-RPC payload could not be decoded.
224    #[error("json decode error: {0}")]
225    Json(#[from] serde_json::Error),
226    /// Klieo-specific: resume requested with an expired window.
227    /// Maps to JSON-RPC application code `-32011`.
228    #[error("resume window expired (since_id={since_id})")]
229    ResumeBufferExpired {
230        /// Cursor the caller supplied.
231        since_id: u64,
232    },
233
234    /// Klieo-specific: resume requested for an unknown progressToken.
235    /// Maps to JSON-RPC application code `-32012`.
236    #[error("no buffered stream for progressToken: {0}")]
237    ResumeBufferNotFound(String),
238
239    /// Underlying tool invocation failed.
240    #[error("tool error: {0}")]
241    Tool(#[from] ToolError),
242
243    /// Caller-supplied subject segment (progressToken, task id, …)
244    /// failed [`klieo_core::validate_subject_token`] — contains a
245    /// reserved metacharacter (`.`, `*`, `>`), whitespace, or
246    /// non-ASCII byte. Split out of [`McpServerError::Bus`] so
247    /// callers can distinguish caller-input validation failures
248    /// (permanent — retry would fail identically) from genuine
249    /// bus transport failures.
250    ///
251    /// The typed cause is reachable via `Error::source()` —
252    /// `BusError::Invalid(_)`.
253    #[error("invalid subject token: {0}")]
254    InvalidSubject(#[source] klieo_core::BusError),
255
256    /// Bus publish or subscribe transport-class failure (connection,
257    /// timeout, retryable, permanent, …). The wire envelope maps
258    /// this to JSON-RPC `SERVER_ERROR` (`-32000`); the typed cause
259    /// stays on `e.source()` for downstream tracing.
260    ///
261    /// Caller-input validation failures
262    /// ([`klieo_core::BusError::Invalid`]) are surfaced as
263    /// [`McpServerError::InvalidSubject`] instead.
264    #[error("bus error: {0}")]
265    Bus(#[source] klieo_core::BusError),
266
267    /// Outbound request timed out (peer didn't respond within the
268    /// configured deadline).
269    #[error("outbound request timed out")]
270    OutboundTimeout,
271
272    /// Peer returned an MCP-spec error envelope on an outbound
273    /// request.
274    #[error("client returned error: code={code} message={message}")]
275    ClientReturnedError {
276        /// JSON-RPC error code returned by the peer.
277        code: i64,
278        /// JSON-RPC error message returned by the peer.
279        message: String,
280    },
281
282    /// Transport closed while an outbound request was in flight.
283    #[error("transport closed")]
284    TransportClosed,
285
286    /// The transport carrying this server does not support
287    /// server-initiated outbound requests (e.g. HTTP today).
288    #[error("outbound channel unsupported on this transport")]
289    OutboundUnsupported,
290
291    /// The outbound request frame could not be serialised. Indicates an
292    /// internal invariant violation rather than a transport or capability failure.
293    #[error("outbound serialisation failed: {0}")]
294    OutboundSerialisation(#[source] serde_json::Error),
295
296    /// Failed to serialise a sampling request to JSON.
297    #[error("failed to serialise sampling request: {0}")]
298    SamplingSerialise(serde_json::Error),
299
300    /// Failed to deserialise the client's sampling response.
301    #[error("failed to deserialise sampling response: {0}")]
302    SamplingDeserialise(serde_json::Error),
303}
304
305impl From<klieo_core::ServerOutboundError> for McpServerError {
306    fn from(e: klieo_core::ServerOutboundError) -> Self {
307        use klieo_core::ServerOutboundError as E;
308        match e {
309            E::Timeout => McpServerError::OutboundTimeout,
310            E::PeerError { code, message } => McpServerError::ClientReturnedError { code, message },
311            E::TransportClosed => McpServerError::TransportClosed,
312            E::Unsupported => McpServerError::OutboundUnsupported,
313            E::Serialisation(err) => McpServerError::OutboundSerialisation(err),
314            // `ServerOutboundError` is `#[non_exhaustive]`. Future
315            // variants surface as `OutboundUnsupported` until an
316            // explicit arm is added here.
317            _ => McpServerError::OutboundUnsupported,
318        }
319    }
320}
321
322impl From<klieo_core::BusError> for McpServerError {
323    fn from(e: klieo_core::BusError) -> Self {
324        match e {
325            klieo_core::BusError::Invalid(_) => Self::InvalidSubject(e),
326            other => Self::Bus(other),
327        }
328    }
329}
330
331/// Errors returned by [`McpServerBuilder::build`] and [`McpServerBuilder::build_arc`].
332#[derive(Debug, thiserror::Error)]
333#[non_exhaustive]
334pub enum McpBuildError {
335    /// `with_cancel_subscription` was set but `build()` (not `build_arc()`) was called.
336    #[error("with_cancel_subscription requires build_arc()")]
337    CancelRequiresArc,
338    /// No invokers registered — call `add_tools`, `add_agent`, or `add_agent_with_schema` first.
339    #[error("at least one invoker required; call add_tools or add_agent before build")]
340    NoInvokers,
341    /// Two agents/invokers share a tool name — rename or split the colliding agent.
342    #[error("duplicate tool name {0:?} across registered invokers")]
343    DuplicateTool(String),
344    /// The builder's `profile(..)` requirements were not met (e.g. regulated
345    /// profile without tenant binding or with an anonymous authenticator).
346    #[error(transparent)]
347    RegulatedProfile(#[from] klieo_core::ProfileViolation),
348    /// A workflow was registered via `add_workflow_with_schema` /
349    /// `add_workflow` without first calling
350    /// [`McpServerBuilder::with_hitl`]. The workflow path drives
351    /// [`klieo_hitl::run_with_hitl`], which requires a shared HITL
352    /// client + config; without them every suspension would fail with
353    /// no compliance endpoint to submit to.
354    #[error("workflow registered without with_hitl(..); call with_hitl before build")]
355    WorkflowWithoutHitl,
356    /// A workflow was registered without a wired
357    /// [`McpServerBuilder::with_governor`]. The build hard-gates
358    /// workflow exposure on a per-tenant LLM budget: an inbound MCP
359    /// caller must NOT drive unbounded paid LLM spend. The error is
360    /// emitted regardless of the `governor` cargo feature — the build
361    /// rejects the unsafe configuration both ways so adopters can't
362    /// silently fall through by toggling a feature flag.
363    #[error("workflow registered without with_governor(..); call with_governor before build")]
364    WorkflowWithoutGovernor,
365}
366
367/// MCP server that exposes a `klieo-core` `ToolInvoker` as a stdio
368/// MCP server.
369///
370/// ## Construction
371///
372/// ```no_run
373/// # use std::sync::Arc;
374/// # use klieo_core::tool::ToolInvoker;
375/// # async fn _ex(invoker: Arc<dyn ToolInvoker>) {
376/// use klieo_mcp_server::McpServer;
377/// let server = Arc::new(McpServer::expose_tools(invoker));
378/// server.serve_stdio().await.expect("server loop");
379/// # }
380/// ```
381pub struct McpServer {
382    pub(crate) invoker: Arc<dyn ToolInvoker>,
383    tool_ctx_factory: ToolCtxFactory,
384    pub(crate) parent_cancel: CancellationToken,
385    pub(crate) resume_buffer: std::sync::Arc<dyn klieo_core::resume::ResumeBuffer>,
386    pub(crate) pubsub: std::sync::Arc<dyn klieo_core::Pubsub>,
387    pub(crate) cancel_registry: klieo_core::CancelRegistry<String>,
388    /// Bounds in-flight SSE-frame fanout publishes on the per-
389    /// progressToken bus subject (`klieo.mcp.progress.{token}`) and
390    /// drop-time cross-replica cancel publishes. Shared with
391    /// [`crate::http::spawn_publish`] and the `CancelOnDrop` body
392    /// so every replica caps fanout work at the same value;
393    /// saturation drops the publish + emits a `warn`. Default
394    /// [`DEFAULT_PUBLISH_PERMITS`] (64); configurable via
395    /// [`McpServerBuilder::with_publish_concurrency`].
396    #[cfg(feature = "http")]
397    pub(crate) publish_permits: std::sync::Arc<tokio::sync::Semaphore>,
398    pub(crate) leader_registry: Option<klieo_core::LeaderRegistry>,
399    pub(crate) ownership_registry: Option<klieo_core::OwnershipRegistry>,
400    /// One-shot ticket store keyed under `klieo.mcp.resume-tickets`,
401    /// populated when the builder was wired with
402    /// [`McpServerBuilder::with_checkpoint_kv`]. `None` keeps the
403    /// slice-1 no-ticket suspend behaviour. Read only by the
404    /// HTTP `klieo/run/resume` handler; gated dead-code accordingly.
405    #[cfg_attr(not(feature = "http"), allow(dead_code))]
406    pub(crate) resume_ticket_store: Option<Arc<crate::resume_ticket::ResumeTicketStore>>,
407    /// Resume-side workflow handles keyed by `workflow_name`.
408    /// Populated only when at least one workflow was registered AND
409    /// the builder was wired with both `with_hitl` + the workflow
410    /// registration path. Read by the HTTP `klieo/run/resume`
411    /// handler after the ticket-bound authz gate has cleared.
412    #[cfg_attr(not(feature = "http"), allow(dead_code))]
413    pub(crate) workflow_resume_handles:
414        std::collections::HashMap<String, Arc<dyn crate::workflow::WorkflowResumeHandle>>,
415    /// Optional [`klieo_auth_common::Authenticator`] wired into the
416    /// HTTP entry path via
417    /// [`McpServerBuilder::with_authenticator`]. `post_mcp` calls
418    /// `authenticate(headers)` + `authorize_method(&identity, method)`
419    /// before dispatch / SSE upgrade when this is `Some`. None
420    /// preserves the pre-0.21 caller-owned-middleware contract
421    /// (no auth applied by the server).
422    pub(crate) authenticator: Option<Arc<dyn klieo_auth_common::Authenticator>>,
423    pub(crate) leader_ttl: std::time::Duration,
424    pub(crate) leader_heartbeat_interval: std::time::Duration,
425    pub(crate) max_failover_attempts: u32,
426    pub(crate) kv_reaper_interval: Option<std::time::Duration>,
427    /// Held for Drop semantics — Drop aborts the background scan
428    /// task spawned by [`McpServerBuilder::with_kv_reaper`]. `None`
429    /// when the builder was not asked to spawn the reaper or when
430    /// neither leader-election nor tenant-binding was wired (no
431    /// bucket to scan).
432    _kv_reaper: Option<klieo_core::KvReaperHandle>,
433    /// Stdio transport's per-process session container. Populated by
434    /// [`Self::ensure_outbound_and_roots`] on stdio entry and holds
435    /// the same `Session` shape as the HTTP registry entries.
436    /// Remains empty for HTTP-only servers; stdio servers populate
437    /// once per process lifetime.
438    pub(crate) stdio_session: tokio::sync::OnceCell<std::sync::Arc<crate::session::Session>>,
439    /// HTTP session registry, keyed by minted `Mcp-Session-Id` UUID.
440    /// Populated by `handle_initialize_post` when a fresh session
441    /// passes the cap check; evicted by `delete_mcp` on explicit
442    /// client teardown or by the idle reaper when the per-session
443    /// watchdog fires. Stdio transports leave this empty.
444    #[cfg(feature = "http")]
445    pub(crate) sessions: std::sync::Arc<
446        tokio::sync::RwLock<
447            std::collections::HashMap<uuid::Uuid, std::sync::Arc<crate::session::Session>>,
448        >,
449    >,
450    /// Hard cap on concurrent HTTP sessions held in
451    /// [`Self::sessions`]. Read by `handle_initialize_post` to
452    /// decide whether a fresh `initialize` POST proceeds or yields
453    /// 503. Configured via
454    /// [`McpServerBuilder::with_max_sessions`]; defaults to
455    /// [`DEFAULT_MAX_SESSIONS`].
456    #[cfg(feature = "http")]
457    pub(crate) max_sessions: usize,
458    /// Hard cap on concurrent HTTP sessions per authenticated
459    /// principal. Resolved at build time from
460    /// [`McpServerBuilder::with_max_sessions_per_principal`] or the
461    /// default [`default_max_sessions_per_principal`] applied to
462    /// `max_sessions`. Read by `handle_initialize_post` to decide
463    /// whether a fresh `initialize` POST from an already-tracked
464    /// principal proceeds or yields 503.
465    #[cfg(feature = "http")]
466    pub(crate) max_sessions_per_principal: usize,
467    /// Per-session SSE replay buffer capacity in frames. Resolved at
468    /// build time from the builder; default
469    /// [`DEFAULT_SSE_REPLAY_CAPACITY`]. Setting to 0 disables SSE
470    /// resumption.
471    #[cfg(feature = "http")]
472    pub(crate) sse_replay_capacity: usize,
473    /// Per-principal session count cache. Keyed by the verified
474    /// [`klieo_auth_common::Identity::as_str`] value of the principal
475    /// that minted the session; auth-disabled deployments leave this
476    /// map empty.
477    ///
478    /// Lock acquisition order: [`Self::sessions`] write FIRST, then
479    /// `principal_counts` write. Never the reverse. Either may be
480    /// taken alone. Eviction paths decrement OUTSIDE the `sessions`
481    /// critical section to preserve the invariant.
482    #[cfg(feature = "http")]
483    pub(crate) principal_counts:
484        std::sync::Arc<tokio::sync::RwLock<std::collections::HashMap<String, usize>>>,
485    /// Idempotent spawn sentinel for the idle-session reaper task.
486    /// The first call to `ensure_idle_reaper` populates this cell
487    /// and spawns the background scan; subsequent calls are no-ops
488    /// so duplicate `initialize` POSTs do not spawn duplicate
489    /// reapers.
490    #[cfg(feature = "http")]
491    pub(crate) idle_reaper_started: tokio::sync::OnceCell<()>,
492    /// Set by [`McpServerBuilder::with_client_sampling`]. Gates the
493    /// `capabilities.sampling = {}` field on the initialize response
494    /// and signals to the stdio loop that outbound correlation should
495    /// be wired when the transport mints its shared writer.
496    pub(crate) declare_sampling: bool,
497    /// Shared stdout writer for the stdio transport. Primed by
498    /// [`Self::serve_with_streams`] on the first call so
499    /// [`Self::ensure_outbound_and_roots`] hands the same `Arc` to the
500    /// outbound primitive — preventing interleaved writes between the
501    /// inbound reply path and the outbound request path on a single
502    /// underlying stream.
503    pub(crate) stdout_writer: tokio::sync::OnceCell<crate::outbound::SharedWriter>,
504    /// Per-server snapshot of client-declared capabilities, parsed from
505    /// the `initialize` request's `params.capabilities` payload. Read
506    /// by the `notifications/initialized` arm to decide whether to
507    /// drive the initial `roots/list` fetch.
508    pub(crate) client_caps: tokio::sync::Mutex<ClientCaps>,
509    /// Idle deadline for the active session. Zero disables the
510    /// watchdog entirely. Read by the GET handler and the watchdog
511    /// task spawned on `initialize`. ADR-028.
512    #[cfg(feature = "http")]
513    pub(crate) session_idle_timeout: std::time::Duration,
514    /// Tick cadence for the idle-reaper background task. Defaults to
515    /// 10 seconds in production builds; test builds can shorten this
516    /// through [`McpServerBuilder::with_idle_reaper_tick`] so short
517    /// `session_idle_timeout` values trip within the test deadline.
518    #[cfg(feature = "http")]
519    pub(crate) idle_reaper_tick: std::time::Duration,
520    /// Reference instant captured at server build time. Each HTTP
521    /// session's `last_activity_millis` is encoded as millis since
522    /// this anchor; the idle reaper compares `server_start.elapsed()`
523    /// against per-session values to decide eviction. Monotonic
524    /// (`Instant`) so wall-clock jumps cannot corrupt the comparison.
525    #[cfg(feature = "http")]
526    pub(crate) server_start: std::time::Instant,
527}
528
/// Client-declared MCP capabilities captured from the `initialize`
/// request, one snapshot per server. Stored behind
/// [`McpServer::client_caps`] (a `tokio::sync::Mutex`) so that the
/// write on `initialize` and the read on `notifications/initialized`
/// cannot race.
#[derive(Debug, Default)]
pub(crate) struct ClientCaps {
    /// `true` iff the client advertised `capabilities.roots` (any
    /// non-null payload) on `initialize`. Gates the spawned
    /// initial-`roots/list` fetch on `notifications/initialized`.
    pub roots_supported: bool,
}
541
542/// Factory for fresh [`klieo_core::agent::AgentContext`] values,
543/// called once per `tools/call` so each invocation gets its own
544/// `RunId` + cancel token. Caller-owned to keep `klieo-mcp-server`
545/// free of opinions about which memory / bus / llm backends an
546/// agent runs against.
547pub type AgentContextFactory =
548    Arc<dyn Fn() -> klieo_core::agent::AgentContext + Send + Sync + 'static>;
549
550/// Factory for fresh [`klieo_core::tool::ToolCtx`] values, called
551/// once per `tools/call` so each invocation gets its own
552/// `Pubsub` / `KvStore` / `JobQueue` (or a shared one, caller's
553/// choice). Default = `default_tool_ctx_factory` which mints
554/// the same per-request in-memory wiring `serve_stdio` has used
555/// since 0.8. Override via
556/// [`McpServerBuilder::with_tool_ctx_factory`] to expose
557/// bus-using tools through this server.
558pub type ToolCtxFactory = Arc<dyn Fn() -> klieo_core::tool::ToolCtx + Send + Sync + 'static>;
559
560/// Returns the default [`ToolCtxFactory`]: a per-request in-memory
561/// `Pubsub` / `KvStore` / `JobQueue`, equivalent to what
562/// `serve_stdio` used before the factory was introduced.
563fn default_tool_ctx_factory() -> ToolCtxFactory {
564    Arc::new(noop_ctx)
565}
566
// Design notes for `McpServerBuilder`. Kept as plain comments so
// rustdoc does not attach them to the constant declared below.
//
// The builder collects one or more invokers (raw `ToolInvoker`s or
// `Agent`s wrapped as single-tool invokers) and configures
// server-level concerns such as a parent `CancellationToken` that
// propagates into every minted `AgentContext`.
//
// ## Why a builder
//
// The original 0.9.0 surface offered four constructors along two
// orthogonal axes (schema-explicit vs schemars-derive, cancellable
// vs non-cancellable). Adding a third axis (multi-agent / multi-
// tool) would have produced eight constructors. The builder
// collapses those axes into a single fluent API.
//
// ## Multi-invoker dispatch
//
// Repeated `add_agent_with_schema` / `add_agent` / `add_tools` calls
// accumulate invokers; `McpServerBuilder::build` wraps them in a
// private `MergedInvoker` that merges their catalogues and routes
// each `tools/call` to the inner invoker that claims the named tool.
// Tool-name collisions across invokers are rejected at `build` time
// (caller bug — fix by renaming or splitting the colliding agent).

/// Default per-server cap on concurrent in-flight SSE-frame fanout
/// publishes and drop-time cross-replica cancel publishes. Bounds the
/// runtime work `stream_tools_call` can pile up when a slow bus
/// backend (e.g. stalled NATS) lets publishes accumulate. Configurable
/// via [`McpServerBuilder::with_publish_concurrency`]; mirrors the
/// same default applied to `A2aDispatcher` so both transports share
/// the cap.
#[cfg(feature = "http")]
const DEFAULT_PUBLISH_PERMITS: usize = 64;
598
/// Default idle deadline (300 seconds) for an HTTP streamable session
/// before the per-session watchdog forcibly closes it. Configurable
/// via [`McpServerBuilder::with_session_idle_timeout`]. ADR-028.
#[cfg(feature = "http")]
pub(crate) const DEFAULT_SESSION_IDLE_TIMEOUT: std::time::Duration =
    std::time::Duration::from_secs(300);

/// Hard cap on concurrent HTTP sessions held in
/// `McpServer::sessions`. Configurable via
/// [`McpServerBuilder::with_max_sessions`]. Reaching the cap returns
/// 503 on subsequent `initialize` POSTs until a session is evicted
/// (via `DELETE /mcp` or the idle reaper).
#[cfg(feature = "http")]
pub(crate) const DEFAULT_MAX_SESSIONS: usize = 1024;

/// Divisor for the default per-principal sub-cap relative to
/// [`DEFAULT_MAX_SESSIONS`]. With `DEFAULT_MAX_SESSIONS = 1024` and
/// divisor 16, the default per-principal cap is 64. Applied by
/// [`default_max_sessions_per_principal`], which also floors the
/// result at 1.
#[cfg(feature = "http")]
pub(crate) const DEFAULT_MAX_SESSIONS_PER_PRINCIPAL_DIVISOR: usize = 16;

/// Default per-session SSE replay buffer capacity (frame count).
///
/// Servers retain the most recent N outbound frames per session
/// so a disconnected client can reconnect with `Last-Event-Id`
/// and replay the strictly-newer slice.
#[cfg(feature = "http")]
pub(crate) const DEFAULT_SSE_REPLAY_CAPACITY: usize = 256;
627
/// Derives the default per-principal session sub-cap from
/// `max_sessions`: the integer quotient by
/// [`DEFAULT_MAX_SESSIONS_PER_PRINCIPAL_DIVISOR`], never less than 1.
///
/// The floor of 1 ensures tiny `max_sessions` configurations never
/// produce a sub-cap of zero, which would reject every authenticated
/// `initialize` POST with no recovery.
#[cfg(feature = "http")]
pub(crate) fn default_max_sessions_per_principal(max_sessions: usize) -> usize {
    let share = max_sessions / DEFAULT_MAX_SESSIONS_PER_PRINCIPAL_DIVISOR;
    std::cmp::max(share, 1)
}
636
/// Default idle-reaper tick cadence (10 seconds). The reaper wakes on
/// this period to scan the session registry for idle entries. Sized
/// for production: a 10-second tick keeps overhead negligible against
/// the default 5-minute idle timeout
/// ([`DEFAULT_SESSION_IDLE_TIMEOUT`]). Tests override via
/// [`McpServerBuilder::with_idle_reaper_tick`].
#[cfg(feature = "http")]
pub(crate) const DEFAULT_IDLE_REAPER_TICK: std::time::Duration = std::time::Duration::from_secs(10);
644
/// Builder for [`McpServer`]. Collects one or more invokers (raw
/// `ToolInvoker`s or `Agent`s wrapped as single-tool invokers) and
/// configures server-level concerns such as transports, parent
/// cancel, leader election, tenant binding, and authentication.
///
/// See [`McpServer::builder`] for the entry point and
/// [`Self::build`] / [`Self::build_arc`] for the terminal calls.
///
/// Every `Option` field starts as `None` (see [`Self::new`]) and is
/// filled in by the matching `with_*` setter; how an unset value is
/// defaulted is decided at build time, outside this struct.
pub struct McpServerBuilder {
    /// Invokers registered so far, in registration order. Appended to
    /// by `add_tools` and `add_agent_with_schema`.
    invokers: Vec<Arc<dyn ToolInvoker>>,
    /// Parent cancellation token. A fresh token from `new`, replaced
    /// by `with_parent_cancel`.
    parent_cancel: CancellationToken,
    /// Factory for the per-request `ToolCtx`. Starts as
    /// `default_tool_ctx_factory()`, replaced by
    /// `with_tool_ctx_factory`.
    tool_ctx_factory: ToolCtxFactory,
    /// Resume buffer wired via `with_resume_buffer`.
    resume_buffer: Option<std::sync::Arc<dyn klieo_core::resume::ResumeBuffer>>,
    /// Shared pubsub wired via `with_pubsub`.
    pubsub: Option<std::sync::Arc<dyn klieo_core::Pubsub>>,
    /// Set by `with_cancel_subscription`. `build` refuses the builder
    /// with `McpBuildError::CancelRequiresArc` when this is `true`.
    subscribe_cancels: bool,
    /// Publish-concurrency override from `with_publish_concurrency`.
    #[cfg(feature = "http")]
    publish_permits: Option<usize>,
    /// KV store for leader election, wired via `with_leader_election`.
    leader_kv: Option<Arc<dyn klieo_core::KvStore>>,
    /// KV store for tenant binding, wired via `with_tenant_binding` or
    /// `with_tenant_binding_strict`.
    tenant_kv: Option<Arc<dyn klieo_core::KvStore>>,
    /// Durable store for suspended-workflow resume tickets
    /// (ADR-045). Kept separate from `tenant_kv`
    /// (tenant binding is per `tools/call` invocation, scoped under
    /// `klieo-tenants`) — tickets live under their own bucket
    /// (`klieo.mcp.resume-tickets`) and their lifecycle is bound to
    /// the suspended run, not the request. Sharing one underlying
    /// backend is fine and recommended, but the wiring is explicit
    /// so a deployment can opt in to one without the other.
    checkpoint_kv: Option<Arc<dyn klieo_core::KvStore>>,
    /// `true` when tenant binding was wired through
    /// `with_tenant_binding_strict`, `false` when wired through
    /// `with_tenant_binding` (each setter overwrites this flag).
    tenant_strict: bool,
    /// Deployment profile set via `profile`; `Unprofiled` by default.
    profile: klieo_core::DeploymentProfile,
    /// Authenticator wired via `with_authenticator`.
    authenticator: Option<Arc<dyn klieo_auth_common::Authenticator>>,
    /// Leader TTL override from `with_leader_ttl`.
    leader_ttl: Option<std::time::Duration>,
    /// Heartbeat interval override from
    /// `with_leader_heartbeat_interval`.
    leader_heartbeat_interval: Option<std::time::Duration>,
    /// Failover-attempt cap override from `with_max_failover_attempts`.
    max_failover_attempts: Option<u32>,
    /// KV reaper scan interval from `with_kv_reaper`; `None` means the
    /// reaper was not requested.
    kv_reaper_interval: Option<std::time::Duration>,
    /// Set by `with_client_sampling`.
    declare_sampling: bool,
    /// Session idle timeout override from `with_session_idle_timeout`.
    #[cfg(feature = "http")]
    session_idle_timeout: Option<std::time::Duration>,
    /// Concurrent-session cap override from `with_max_sessions`
    /// (that setter rejects zero).
    #[cfg(feature = "http")]
    max_sessions: Option<usize>,
    /// Per-principal sub-cap override from
    /// `with_max_sessions_per_principal` (that setter rejects zero).
    #[cfg(feature = "http")]
    max_sessions_per_principal: Option<usize>,
    /// SSE replay buffer capacity override from
    /// `with_sse_replay_capacity`.
    #[cfg(feature = "http")]
    sse_replay_capacity: Option<usize>,
    /// Test-only override for the idle-reaper tick cadence. Production
    /// callers cannot reach this; integration + unit tests use it to
    /// drive the reaper fast enough that short idle deadlines (tens to
    /// hundreds of milliseconds) trip within the test timeout. See
    /// [`McpServerBuilder::with_idle_reaper_tick`].
    #[cfg(all(feature = "http", any(test, feature = "test-fixtures")))]
    idle_reaper_tick: Option<std::time::Duration>,
    /// Shared HITL wiring for every workflow registered via
    /// [`Self::add_workflow_with_schema`] / [`Self::add_workflow`].
    /// `None` until the caller invokes [`Self::with_hitl`].
    hitl: Option<crate::workflow::HitlBundle>,
    /// Pending workflow registrations. Materialised into
    /// [`crate::workflow::WorkflowAsToolInvoker`] at `build()` time
    /// once the shared HITL bundle is known. Kept as type-erased
    /// closures so the builder doesn't need a `WorkflowAsToolInvoker<A>`
    /// generic at storage time.
    pending_workflows: Vec<crate::workflow::WorkflowRegistration>,
    /// Shared per-tenant LLM budget governor wired via
    /// [`Self::with_governor`]. `None` keeps the legacy non-governed
    /// path; workflow exposure refuses to build without it. Held
    /// regardless of the `governor` cargo feature so the
    /// `WorkflowWithoutGovernor` hard gate fires uniformly — only the
    /// runtime wire-in lives behind the feature flag.
    ///
    /// NOTE(review): the `WorkflowWithoutGovernor` check in
    /// `build_inner` is compiled under `#[cfg(feature = "governor")]`,
    /// and its comment says the off-feature build keeps the legacy
    /// ungoverned path. That contradicts "fires uniformly" above —
    /// confirm which statement is current.
    governor_bundle: Option<GovernorBundleHolder>,
}
713
/// Feature-gated payload carried by the builder. The outer
/// [`Option<GovernorBundleHolder>`] field stays present in every build
/// so the hard gate fires uniformly; the inner type erases to a unit
/// struct when the `governor` feature is off (the build still rejects
/// the unsafe configuration, just without a governor to wire).
///
/// With the `governor` feature enabled this is a plain alias for
/// `crate::governor::GovernorBundle`.
///
/// NOTE(review): the gate in `build_inner` sits behind
/// `#[cfg(feature = "governor")]` and is commented as not firing in
/// the off-feature build — confirm whether "still rejects" above is
/// stale.
#[cfg(feature = "governor")]
pub(crate) type GovernorBundleHolder = crate::governor::GovernorBundle;
721
/// Zero-sized stand-in used when the `governor` feature is disabled.
/// `with_governor` is then unreachable, but the
/// [`McpBuildError::WorkflowWithoutGovernor`] hard gate still fires
/// — the no-feature build rejects workflows outright.
///
/// NOTE(review): the gate in `build_inner` is compiled only under
/// `#[cfg(feature = "governor")]`, and its comment states that the
/// off-feature build retains the legacy ungoverned path. The sentence
/// above appears to describe older behaviour — confirm and reconcile.
#[cfg(not(feature = "governor"))]
#[derive(Clone)]
pub(crate) struct GovernorBundleHolder;
729
730impl Default for McpServerBuilder {
731    fn default() -> Self {
732        Self::new()
733    }
734}
735
736/// Spawn the cluster-0.25 KV reaper iff the builder was asked for it
737/// AND at least one bucket (leader or ownership) is wired AND a
738/// non-noop resume buffer is available. Returns `None` when any
739/// precondition fails — the reaper has nothing useful to do without
740/// a bucket to scan or a buffer to query for terminal liveness.
741fn spawn_reaper_if_configured(
742    interval: Option<std::time::Duration>,
743    leader_registry: Option<&klieo_core::LeaderRegistry>,
744    ownership_registry: Option<&klieo_core::OwnershipRegistry>,
745    resume_buffer: &Arc<dyn klieo_core::resume::ResumeBuffer>,
746) -> Option<klieo_core::KvReaperHandle> {
747    let interval = interval?;
748    let mut buckets: Vec<String> = Vec::new();
749    let kv = if let Some(reg) = leader_registry {
750        buckets.push(reg.bucket().to_string());
751        reg.kv().clone()
752    } else if let Some(reg) = ownership_registry {
753        buckets.push(reg.bucket().to_string());
754        reg.kv().clone()
755    } else {
756        return None;
757    };
758    if let (Some(_), Some(ownership)) = (leader_registry, ownership_registry) {
759        buckets.push(ownership.bucket().to_string());
760    }
761    Some(klieo_core::spawn_kv_reaper(
762        kv,
763        resume_buffer.clone(),
764        buckets,
765        interval,
766    ))
767}
768
769impl McpServerBuilder {
770    /// Open a fresh builder with an empty invoker list and a
771    /// never-cancelled parent token.
772    pub fn new() -> Self {
773        Self {
774            invokers: Vec::new(),
775            parent_cancel: CancellationToken::new(),
776            tool_ctx_factory: default_tool_ctx_factory(),
777            resume_buffer: None,
778            pubsub: None,
779            subscribe_cancels: false,
780            #[cfg(feature = "http")]
781            publish_permits: None,
782            leader_kv: None,
783            tenant_kv: None,
784            checkpoint_kv: None,
785            tenant_strict: false,
786            profile: klieo_core::DeploymentProfile::Unprofiled,
787            authenticator: None,
788            leader_ttl: None,
789            leader_heartbeat_interval: None,
790            max_failover_attempts: None,
791            kv_reaper_interval: None,
792            declare_sampling: false,
793            #[cfg(feature = "http")]
794            session_idle_timeout: None,
795            #[cfg(feature = "http")]
796            max_sessions: None,
797            #[cfg(feature = "http")]
798            max_sessions_per_principal: None,
799            #[cfg(feature = "http")]
800            sse_replay_capacity: None,
801            #[cfg(all(feature = "http", any(test, feature = "test-fixtures")))]
802            idle_reaper_tick: None,
803            hitl: None,
804            pending_workflows: Vec::new(),
805            governor_bundle: None,
806        }
807    }
808
809    /// Configure the parent [`CancellationToken`] applied to every
810    /// agent registered via [`Self::add_agent_with_schema`] or
811    /// [`Self::add_agent`]. Cancelling this token propagates into
812    /// all in-flight agent runs by overriding the `cancel` field of
813    /// each freshly-minted `AgentContext` with a child token.
814    ///
815    /// Suitable for SIGINT-driven graceful shutdown of the stdio
816    /// loop. Raw `ToolInvoker`s registered via [`Self::add_tools`]
817    /// are unaffected (they manage their own concurrency).
818    pub fn with_parent_cancel(mut self, parent_cancel: CancellationToken) -> Self {
819        self.parent_cancel = parent_cancel;
820        self
821    }
822
823    /// Override the per-request `ToolCtx`. Default = noop in-memory
824    /// `Pubsub`/`KvStore`/`JobQueue` (the same wiring `serve_stdio`
825    /// used in 0.8/0.9). Override to wire a shared bus so bus-using
826    /// tools can be exposed through this server.
827    ///
828    /// Applies to all transports (stdio + http).
829    pub fn with_tool_ctx_factory(mut self, factory: ToolCtxFactory) -> Self {
830        self.tool_ctx_factory = factory;
831        self
832    }
833
834    /// Opt in to SSE resumption via `klieo/tools/resume`. Default is
835    /// [`klieo_core::resume::NoopResumeBuffer`] (zero-cost no-op).
836    #[must_use]
837    pub fn with_resume_buffer(
838        mut self,
839        buffer: std::sync::Arc<dyn klieo_core::resume::ResumeBuffer>,
840    ) -> Self {
841        self.resume_buffer = Some(buffer);
842        self
843    }
844
845    /// Opt in to cross-replica progress-event fanout. Default is a
846    /// fresh in-process `MemoryBus` pubsub (single-replica only).
847    /// Multi-replica deployments wire a shared `Arc<dyn Pubsub>`
848    /// (NATS-backed) via this builder method.
849    ///
850    /// # Security — progressToken IS the ownership credential
851    ///
852    /// Per the MCP spec, `_meta.progressToken` is caller-supplied and
853    /// klieo treats it as an opaque session identifier: anyone who can
854    /// present a progressToken to `klieo/tools/resume` receives the
855    /// associated stream's buffered + live events. With cross-replica
856    /// fanout, that exposure spans every replica reading from the same
857    /// shared pubsub.
858    ///
859    /// **Operators MUST ensure progressTokens are unguessable** (UUID v4
860    /// or equivalent) and authorise their issuance at the tenant
861    /// boundary BEFORE the `POST /mcp` request reaches `McpServer`.
862    /// `klieo-mcp-server` performs NO progressToken→tenant binding —
863    /// any auth proxy in front cannot infer it either. Sharing one
864    /// `Arc<dyn Pubsub>` across replicas serving multiple tenants without
865    /// an upstream layer that mints + validates per-tenant unguessable
866    /// progressTokens is a cross-tenant data-leak vector (CWE-639).
867    ///
868    /// See ADR-018 for the full threat model.
869    #[must_use]
870    pub fn with_pubsub(mut self, pubsub: std::sync::Arc<dyn klieo_core::Pubsub>) -> Self {
871        self.pubsub = Some(pubsub);
872        self
873    }
874
    /// Spawn the wildcard cancel-subject background subscriber on
    /// [`Self::build_arc`]. Required for multi-replica deployments —
    /// without it, only same-replica drop-cancel works. The
    /// background task subscribes to `klieo.mcp.cancel.>` and
    /// dispatches each inbound cancel through the local
    /// `cancel_registry`. Subscribe failure at startup is logged
    /// at `error` and the task exits; replica falls back to
    /// single-replica semantics.
    ///
    /// The spawned task holds an [`Arc`] clone of the server, so
    /// this flag is only honoured by [`Self::build_arc`].
    /// [`Self::build`] does not panic when this flag is set: it
    /// rejects the builder by returning
    /// [`McpBuildError::CancelRequiresArc`] (the unwrapped
    /// `McpServer` shape cannot keep the background task alive).
    #[must_use]
    pub fn with_cancel_subscription(mut self) -> Self {
        self.subscribe_cancels = true;
        self
    }
893
894    /// Replace the server's publish-concurrency cap. The semaphore is
895    /// shared with every SSE-yield publish in `stream_tools_call` (private
896    /// helper in `crate::http`) and with the drop-time cross-replica
897    /// cancel publish in `CancelOnDrop`, so a single dial bounds all
898    /// best-effort fanout work this server performs.
899    ///
900    /// Pass `0` to drop all fanout publishes (useful in tests that
901    /// want to assert the saturation branch — see
902    /// `tests/publish_backpressure.rs`). Pass a larger value when the
903    /// bus backend has demonstrated headroom and 64 in-flight publishes
904    /// are a bottleneck.
905    #[cfg(feature = "http")]
906    #[must_use]
907    pub fn with_publish_concurrency(mut self, permits: usize) -> Self {
908        self.publish_permits = Some(permits);
909        self
910    }
911
912    /// Opt in to leader election for multi-replica orphan
913    /// detection. On `tools/call` start the server claims
914    /// leadership in the `klieo-leaders` KV bucket; on
915    /// `klieo/tools/resume` the server checks the leader is
916    /// alive and writes a terminal "leader died" SSE frame to
917    /// the resume buffer if not.
918    ///
919    /// Operators MUST configure the `klieo-leaders` JetStream KV
920    /// bucket with `max_age = 5s` (matches the cluster's fixed
921    /// TTL). See ADR-020.
922    #[must_use]
923    pub fn with_leader_election(mut self, kv: Arc<dyn klieo_core::KvStore>) -> Self {
924        self.leader_kv = Some(kv);
925        self
926    }
927
928    /// Opt in to tenant binding. On `tools/call` start the
929    /// server claims ownership in the `klieo-tenants` KV
930    /// bucket keyed by the authenticated `Identity.principal`;
931    /// on `klieo/tools/resume` the server rejects mismatched
932    /// principals as `JSONRPC_STREAM_NOT_FOUND` (-32004; deny-
933    /// as-NotFound per OWASP IDOR best practice).
934    ///
935    /// Operators MUST also wire an `Authenticator` via
936    /// `with_authenticator` that produces non-anonymous
937    /// identities (typically OAuth via cluster 0.21); without
938    /// it no entries are written and no checks run. See
939    /// ADR-022.
940    #[must_use]
941    pub fn with_tenant_binding(mut self, kv: Arc<dyn klieo_core::KvStore>) -> Self {
942        self.tenant_kv = Some(kv);
943        self.tenant_strict = false;
944        self
945    }
946
947    /// Like [`with_tenant_binding`](Self::with_tenant_binding) but **fail-closed**
948    /// on store error: when the `klieo-tenants` KV is unreachable, an invoke
949    /// claim and a `klieo/tools/resume` ownership check both DENY (503) rather
950    /// than proceed, closing the transient-KV-blip cross-tenant-resume window
951    /// for regulated multi-tenant deployments (at the cost of availability
952    /// during a KV outage). An unclaimed key still proceeds. See ADR-022.
953    #[must_use]
954    pub fn with_tenant_binding_strict(mut self, kv: Arc<dyn klieo_core::KvStore>) -> Self {
955        self.tenant_kv = Some(kv);
956        self.tenant_strict = true;
957        self
958    }
959
960    /// Opt in to principal-scoped resume tickets (ADR-045). When a
961    /// workflow registered via
962    /// [`Self::add_workflow_with_schema`] suspends on a `ReviewPolicy`,
963    /// the server persists the checkpoint to `kv` under an opaque
964    /// 256-bit token bound to the verified caller principal and
965    /// returns the token in the wire envelope. A later
966    /// `klieo/run/resume` request authorises the token against the
967    /// caller before atomically consuming it.
968    ///
969    /// Distinct from [`Self::with_tenant_binding`] (per-`tools/call`
970    /// ownership claim in the `klieo-tenants` bucket) — tickets live
971    /// under `klieo.mcp.resume-tickets` and carry the checkpoint
972    /// payload across the operator-review window. Sharing one KV
973    /// backend across both is fine.
974    ///
975    /// Leaving this unwired keeps the slice-1 no-ticket suspend
976    /// envelope so unprincipaled / unauthenticated deployments stay
977    /// backward-compatible.
978    #[must_use]
979    pub fn with_checkpoint_kv(mut self, kv: Arc<dyn klieo_core::KvStore>) -> Self {
980        self.checkpoint_kv = Some(kv);
981        self
982    }
983
984    /// Wire an [`klieo_auth_common::Authenticator`] into the HTTP
985    /// entry path. `post_mcp` calls `authenticate(headers)` +
986    /// `authorize_method(&identity, method)` before dispatch /
987    /// SSE upgrade. Auth failures return a JSON-RPC -32001
988    /// envelope (no SSE upgrade on failure).
989    ///
990    /// Designed for `klieo_auth_oauth::OAuthAuthenticator` but
991    /// any `Authenticator` impl works (e.g. an in-house bearer
992    /// validator, or `klieo_auth_common::AllowAnonymous` for
993    /// explicit no-auth wiring).
994    #[must_use]
995    pub fn with_authenticator(
996        mut self,
997        authenticator: Arc<dyn klieo_auth_common::Authenticator>,
998    ) -> Self {
999        self.authenticator = Some(authenticator);
1000        self
1001    }
1002
1003    /// Apply a [`DeploymentProfile`](klieo_core::DeploymentProfile). Regulated
1004    /// profiles force strict tenant binding + require a non-anonymous
1005    /// authenticator; `build()` fails closed if a prerequisite is missing.
1006    pub fn profile(mut self, profile: klieo_core::DeploymentProfile) -> Self {
1007        self.profile = profile;
1008        self
1009    }
1010
1011    /// Override the cluster-0.20 leader TTL (default [`LEADER_TTL`],
1012    /// 5s). Wider TTL tolerates network blips at the cost of slower
1013    /// dead-leader detection; tighter TTL detects deaths faster but
1014    /// risks spurious orphan probes under load. The heartbeat
1015    /// interval defaults to `ttl / 2` — override independently via
1016    /// [`Self::with_leader_heartbeat_interval`]. Cluster 0.25.
1017    #[must_use]
1018    pub fn with_leader_ttl(mut self, ttl: std::time::Duration) -> Self {
1019        self.leader_ttl = Some(ttl);
1020        self
1021    }
1022
1023    /// Override the leader heartbeat interval. Default is half the
1024    /// configured TTL (or [`LEADER_TTL`] / 2 when the TTL itself is
1025    /// at its default). Tighten when KV write latency is high and
1026    /// you want extra headroom before TTL expires. Cluster 0.25.
1027    #[must_use]
1028    pub fn with_leader_heartbeat_interval(mut self, interval: std::time::Duration) -> Self {
1029        self.leader_heartbeat_interval = Some(interval);
1030        self
1031    }
1032
1033    /// Override the cluster-0.24 failover-attempt cap (default
1034    /// [`klieo_core::FAILOVER_ATTEMPT_CAP`], 3). Higher cap
1035    /// accommodates flaky downstream where bad-state tools are
1036    /// rare; lower cap exits failover loops faster. Cluster 0.25.
1037    #[must_use]
1038    pub fn with_max_failover_attempts(mut self, cap: u32) -> Self {
1039        self.max_failover_attempts = Some(cap);
1040        self
1041    }
1042
1043    /// Opt in to the cluster-0.25 KV reaper. Spawns a background
1044    /// task scanning `klieo-leaders` (+ `klieo-tenants` when tenant
1045    /// binding is wired) every `interval`, evicting entries whose
1046    /// resume buffer is terminal. The scan task is spawned at
1047    /// [`Self::build`] / [`Self::build_arc`] time (the `McpServer`
1048    /// holds both leader-KV / ownership-KV via its registries AND
1049    /// the resume buffer directly); calling this method without
1050    /// [`Self::with_resume_buffer`] AND at least one of
1051    /// [`Self::with_leader_election`] / [`Self::with_tenant_binding`]
1052    /// is a no-op.
1053    ///
1054    /// Recommended: 60s for production NATS deployments running
1055    /// `klieo-leaders` / `klieo-tenants` without bucket TTLs. Bucket
1056    /// TTLs are still the primary eviction mechanism — the reaper
1057    /// is a backstop for deployments where TTLs are not configured.
1058    #[must_use]
1059    pub fn with_kv_reaper(mut self, interval: std::time::Duration) -> Self {
1060        self.kv_reaper_interval = Some(interval);
1061        self
1062    }
1063
1064    /// Declare that this server may issue `sampling/createMessage`
1065    /// outbound requests to the client. Sets
1066    /// `capabilities.sampling = {}` on the initialize response so
1067    /// MCP-conformant clients know to listen.
1068    ///
1069    /// This is the precondition for constructing the outbound
1070    /// correlation primitive: until the flag is set, the stdio read
1071    /// loop drops inbound JSON-RPC responses (no table to route them
1072    /// into). Tools-only deployments leave the flag unset and pay
1073    /// zero outbound cost.
1074    #[must_use]
1075    pub fn with_client_sampling(mut self) -> Self {
1076        self.declare_sampling = true;
1077        self
1078    }
1079
1080    /// Configure the HTTP streamable-session idle timeout. Default:
1081    /// 5 minutes (see `DEFAULT_SESSION_IDLE_TIMEOUT`). Pass
1082    /// [`std::time::Duration::ZERO`] to disable the per-session
1083    /// watchdog entirely (sessions then live until the client SSE
1084    /// disconnects). ADR-028.
1085    #[cfg(feature = "http")]
1086    #[must_use]
1087    pub fn with_session_idle_timeout(mut self, ttl: std::time::Duration) -> Self {
1088        self.session_idle_timeout = Some(ttl);
1089        self
1090    }
1091
1092    /// Override the `DEFAULT_MAX_SESSIONS` cap on concurrent HTTP
1093    /// sessions held in the server's private `sessions` registry.
1094    /// Reaching the cap causes `handle_initialize_post` to return 503
1095    /// until a session is evicted via `DELETE /mcp` or the idle reaper.
1096    ///
1097    /// Panics when `cap == 0` — a zero cap would deadlock the
1098    /// `initialize` path with no recovery, which is a caller bug.
1099    #[cfg(feature = "http")]
1100    #[must_use]
1101    pub fn with_max_sessions(mut self, cap: usize) -> Self {
1102        assert!(cap > 0, "max_sessions must be > 0");
1103        self.max_sessions = Some(cap);
1104        self
1105    }
1106
1107    /// Override the default per-principal session sub-cap. The
1108    /// default is `default_max_sessions_per_principal` applied to
1109    /// the effective `max_sessions` —
1110    /// `max_sessions / DEFAULT_MAX_SESSIONS_PER_PRINCIPAL_DIVISOR`
1111    /// floored at 1. Reaching the sub-cap causes
1112    /// `handle_initialize_post` to return 503 on subsequent
1113    /// `initialize` POSTs from the same authenticated principal until
1114    /// one of that principal's sessions is evicted (via `DELETE /mcp`
1115    /// or the idle reaper).
1116    ///
1117    /// Panics when `cap == 0` — a zero sub-cap would reject every
1118    /// authenticated `initialize` with no recovery, which is a caller
1119    /// bug.
1120    #[cfg(feature = "http")]
1121    #[must_use]
1122    pub fn with_max_sessions_per_principal(mut self, cap: usize) -> Self {
1123        assert!(cap > 0, "max_sessions_per_principal must be > 0");
1124        self.max_sessions_per_principal = Some(cap);
1125        self
1126    }
1127
1128    /// Cap on the per-session SSE replay buffer (frame count).
1129    ///
1130    /// Defaults to `DEFAULT_SSE_REPLAY_CAPACITY` (256). A value of
1131    /// `0` disables resumption; reconnects with `Last-Event-Id` then
1132    /// return 501 Not Implemented.
1133    #[cfg(feature = "http")]
1134    #[must_use]
1135    pub fn with_sse_replay_capacity(mut self, capacity: usize) -> Self {
1136        self.sse_replay_capacity = Some(capacity);
1137        self
1138    }
1139
1140    /// Override the idle-reaper tick cadence used by the per-server
1141    /// background task. Production callers leave this unset — the
1142    /// reaper wakes every 10 seconds, which is the right cadence for
1143    /// minute-scale `session_idle_timeout` values. Tests configure
1144    /// short idle deadlines (tens of milliseconds) and rely on a
1145    /// matching short tick so the reaper fires within the test
1146    /// timeout.
1147    ///
1148    /// Gated on `test-fixtures` (or `#[cfg(test)]` inside the crate)
1149    /// so the production wire surface stays fixed.
1150    #[cfg(all(feature = "http", any(test, feature = "test-fixtures")))]
1151    #[must_use]
1152    pub fn with_idle_reaper_tick(mut self, tick: std::time::Duration) -> Self {
1153        self.idle_reaper_tick = Some(tick);
1154        self
1155    }
1156
1157    /// Append a raw [`ToolInvoker`]. The invoker's catalogue will
1158    /// be merged into the server's `tools/list` response.
1159    pub fn add_tools(mut self, invoker: Arc<dyn ToolInvoker>) -> Self {
1160        self.invokers.push(invoker);
1161        self
1162    }
1163
1164    /// Register an [`Agent`] as a single MCP tool with the
1165    /// caller-supplied JSON Schema. See [`McpServer::expose_agent_with_schema`]
1166    /// for the dispatch semantics.
1167    pub fn add_agent_with_schema<A>(
1168        mut self,
1169        agent: A,
1170        input_schema: serde_json::Value,
1171        ctx_factory: AgentContextFactory,
1172    ) -> Self
1173    where
1174        A: Agent + 'static,
1175        A::Input: serde::de::DeserializeOwned + Send + 'static,
1176        A::Output: serde::Serialize + Send + 'static,
1177    {
1178        let name = agent.name().to_string();
1179        let invoker: Arc<dyn ToolInvoker> = Arc::new(AgentAsToolInvoker {
1180            agent: Arc::new(agent),
1181            name,
1182            input_schema,
1183            ctx_factory,
1184            // Governed agents opt in iff `with_governor` wired the
1185            // bundle at registration time. Cloning here is
1186            // cheap (Arc + ProviderId) and binds the invoker to the
1187            // governor visible at registration — later `with_governor`
1188            // calls do not retroactively govern earlier-registered
1189            // agents.
1190            #[cfg(feature = "governor")]
1191            governor: self.governor_bundle.clone(),
1192        });
1193        self.invokers.push(invoker);
1194        self
1195    }
1196
1197    /// Auto-derive variant of [`Self::add_agent_with_schema`].
1198    /// Requires `A::Input: schemars::JsonSchema` and the `schemars`
1199    /// cargo feature on `klieo-mcp-server`.
1200    #[cfg(feature = "schemars")]
1201    pub fn add_agent<A>(self, agent: A, ctx_factory: AgentContextFactory) -> Self
1202    where
1203        A: Agent + 'static,
1204        A::Input: serde::de::DeserializeOwned + schemars::JsonSchema + Send + 'static,
1205        A::Output: serde::Serialize + Send + 'static,
1206    {
1207        let schema = serde_json::to_value(schemars::schema_for!(A::Input))
1208            .expect("schemars::Schema serialises to JSON via #[derive(Serialize)]");
1209        self.add_agent_with_schema(agent, schema, ctx_factory)
1210    }
1211
1212    /// Wire the inbound per-tenant LLM budget governor.
1213    ///
1214    /// `governor` enforces the RPS cap; `provider` is the
1215    /// [`klieo_ops::ProviderId`] every inbound LLM call is scoped to.
1216    /// Workflows registered via [`Self::add_workflow_with_schema`] /
1217    /// [`Self::add_workflow`] are hard-gated on this wiring — the
1218    /// build refuses to ship a workflow-exposing server without a
1219    /// governor and surfaces [`McpBuildError::WorkflowWithoutGovernor`].
1220    /// Governed agents (registered via [`Self::add_agent`]) opt in
1221    /// automatically: the governor wraps their `ctx.llm` whenever
1222    /// one is wired, otherwise they keep the legacy ungoverned path.
1223    ///
1224    /// Available only with the `governor` cargo feature. Without
1225    /// the feature, the build still refuses workflows (the hard gate
1226    /// fires regardless), so adopters always know exposure of a paid
1227    /// LLM path through MCP must be explicit.
1228    #[cfg(feature = "governor")]
1229    #[must_use]
1230    pub fn with_governor(
1231        mut self,
1232        governor: Arc<dyn klieo_ops::governor::Governor>,
1233        provider: klieo_ops::ProviderId,
1234    ) -> Self {
1235        self.governor_bundle = Some(crate::governor::GovernorBundle { governor, provider });
1236        self
1237    }
1238
1239    /// Configure the shared HITL client + config consumed by every
1240    /// workflow registered via [`Self::add_workflow_with_schema`] /
1241    /// [`Self::add_workflow`]. Building with a workflow but no HITL
1242    /// wiring fails with [`McpBuildError::WorkflowWithoutHitl`] — the
1243    /// workflow path has no compliance endpoint to submit to without
1244    /// it (ADR-045).
1245    pub fn with_hitl(
1246        mut self,
1247        client: Arc<klieo_hitl_client::HitlClient>,
1248        cfg: Arc<klieo_hitl::HitlConfig>,
1249    ) -> Self {
1250        self.hitl = Some(crate::workflow::HitlBundle { client, cfg });
1251        self
1252    }
1253
1254    /// Register an [`Agent`] as a single MCP tool whose `tools/call`
1255    /// drives [`klieo_hitl::run_with_hitl`] instead of the bare
1256    /// [`Agent::run`]. `system_prompt` is the prompt the underlying
1257    /// `run_steps` loop uses; `run_options` carries the
1258    /// [`klieo_core::runtime::ReviewPolicy`] that decides when to
1259    /// suspend. On suspend the response is
1260    /// `{"status":"suspended","reason":...}` (no checkpoint / ticket
1261    /// when no resume KV is wired; ADR-045).
1262    ///
1263    /// Building this server without first calling [`Self::with_hitl`]
1264    /// fails with [`McpBuildError::WorkflowWithoutHitl`].
1265    pub fn add_workflow_with_schema<A>(
1266        mut self,
1267        agent: A,
1268        system_prompt: impl Into<String>,
1269        input_schema: serde_json::Value,
1270        run_options: klieo_core::runtime::RunOptions,
1271        ctx_factory: AgentContextFactory,
1272    ) -> Self
1273    where
1274        A: Agent + 'static,
1275        A::Input: serde::de::DeserializeOwned + Send + 'static,
1276    {
1277        let name = agent.name().to_string();
1278        let prompt = system_prompt.into();
1279        // The workflow body never calls `A::run`; the agent value is
1280        // captured purely to anchor the generic `A` (which carries
1281        // `A::Input`'s DeserializeOwned + Send bounds through the
1282        // type-erased registration). PhantomData on the invoker
1283        // does the same job once materialised.
1284        drop(agent);
1285        let materialise: crate::workflow::WorkflowMaterialiser = Box::new(
1286            move |bundle: crate::workflow::HitlBundle,
1287                  ticket_store: Option<Arc<crate::resume_ticket::ResumeTicketStore>>,
1288                  governor_bundle: Option<crate::GovernorBundleHolder>| {
1289                #[cfg(not(feature = "governor"))]
1290                let _ = governor_bundle;
1291                let invoker = Arc::new(crate::workflow::WorkflowAsToolInvoker::<A>::new(
1292                    name.clone(),
1293                    prompt.clone(),
1294                    input_schema.clone(),
1295                    ctx_factory.clone(),
1296                    run_options.clone(),
1297                    bundle,
1298                    ticket_store,
1299                    #[cfg(feature = "governor")]
1300                    governor_bundle,
1301                ));
1302                crate::workflow::WorkflowMaterialisation {
1303                    name: name.clone(),
1304                    resume_handle: invoker.clone()
1305                        as Arc<dyn crate::workflow::WorkflowResumeHandle>,
1306                    invoker: invoker as Arc<dyn ToolInvoker>,
1307                }
1308            },
1309        );
1310        self.pending_workflows
1311            .push(crate::workflow::WorkflowRegistration { materialise });
1312        self
1313    }
1314
1315    /// Auto-derive variant of [`Self::add_workflow_with_schema`].
1316    /// Requires `A::Input: schemars::JsonSchema` and the `schemars`
1317    /// cargo feature on `klieo-mcp-server`.
1318    #[cfg(feature = "schemars")]
1319    pub fn add_workflow<A>(
1320        self,
1321        agent: A,
1322        system_prompt: impl Into<String>,
1323        run_options: klieo_core::runtime::RunOptions,
1324        ctx_factory: AgentContextFactory,
1325    ) -> Self
1326    where
1327        A: Agent + 'static,
1328        A::Input: serde::de::DeserializeOwned + schemars::JsonSchema + Send + 'static,
1329    {
1330        let schema = serde_json::to_value(schemars::schema_for!(A::Input))
1331            .expect("schemars::Schema serialises to JSON via #[derive(Serialize)]");
1332        self.add_workflow_with_schema(agent, system_prompt, schema, run_options, ctx_factory)
1333    }
1334
1335    /// Finalise the builder into a runnable [`McpServer`].
1336    ///
1337    /// - **0 invokers** → [`McpBuildError::NoInvokers`].
1338    /// - **1 invoker** → wrap directly with no merge overhead.
1339    /// - **≥2 invokers** → wrap in `MergedInvoker`. Returns
1340    ///   [`McpBuildError::DuplicateTool`] on the first duplicate tool
1341    ///   name across the merged catalogues.
1342    /// - [`Self::with_cancel_subscription`] set → [`McpBuildError::CancelRequiresArc`].
1343    pub fn build(self) -> Result<McpServer, McpBuildError> {
1344        if self.subscribe_cancels {
1345            return Err(McpBuildError::CancelRequiresArc);
1346        }
1347        self.build_inner()
1348    }
1349
1350    fn build_inner(mut self) -> Result<McpServer, McpBuildError> {
1351        let ticket_store = self
1352            .checkpoint_kv
1353            .clone()
1354            .map(|kv| Arc::new(crate::resume_ticket::ResumeTicketStore::new(kv)));
1355        let mut workflow_resume_handles: std::collections::HashMap<
1356            String,
1357            Arc<dyn crate::workflow::WorkflowResumeHandle>,
1358        > = std::collections::HashMap::new();
1359        if !self.pending_workflows.is_empty() {
1360            let bundle = self
1361                .hitl
1362                .clone()
1363                .ok_or(McpBuildError::WorkflowWithoutHitl)?;
1364            // Hard gate: workflow exposure is illegal without a
1365            // governor — an inbound MCP caller must not be
1366            // able to drive unbounded paid LLM spend. Mirrors
1367            // `WorkflowWithoutHitl`. The gate fires only when the
1368            // `governor` cargo feature is enabled; the off-feature
1369            // build retains the legacy ungoverned path because the
1370            // adopter explicitly opted out of inbound budget
1371            // enforcement at compile time.
1372            #[cfg(feature = "governor")]
1373            if self.governor_bundle.is_none() {
1374                return Err(McpBuildError::WorkflowWithoutGovernor);
1375            }
1376            // The off-feature build path is unreachable because the
1377            // hard gate above already errored; we still propagate the
1378            // (always-`None`) holder so the materialiser signature
1379            // stays feature-uniform.
1380            let governor_bundle = self.governor_bundle.clone();
1381            let pending = std::mem::take(&mut self.pending_workflows);
1382            for reg in pending {
1383                let mat = (reg.materialise)(
1384                    bundle.clone(),
1385                    ticket_store.clone(),
1386                    governor_bundle.clone(),
1387                );
1388                if workflow_resume_handles
1389                    .insert(mat.name.clone(), mat.resume_handle)
1390                    .is_some()
1391                {
1392                    return Err(McpBuildError::DuplicateTool(mat.name));
1393                }
1394                self.invokers.push(mat.invoker);
1395            }
1396        }
1397        let invoker_count = self.invokers.len();
1398        if invoker_count == 0 {
1399            return Err(McpBuildError::NoInvokers);
1400        }
1401        let invoker: Arc<dyn ToolInvoker> = if invoker_count == 1 {
1402            self.invokers.into_iter().next().unwrap() // safe: len == 1
1403        } else {
1404            Arc::new(MergedInvoker::new(self.invokers)?)
1405        };
1406        #[cfg(feature = "http")]
1407        let permits = self.publish_permits.unwrap_or(DEFAULT_PUBLISH_PERMITS);
1408        #[cfg(feature = "http")]
1409        let max_sessions = self.max_sessions.unwrap_or(DEFAULT_MAX_SESSIONS);
1410        #[cfg(feature = "http")]
1411        let max_sessions_per_principal = self
1412            .max_sessions_per_principal
1413            .unwrap_or_else(|| default_max_sessions_per_principal(max_sessions));
1414        let leader_registry = self.leader_kv.map(|kv| {
1415            klieo_core::LeaderRegistry::new(
1416                kv,
1417                "klieo-leaders".into(),
1418                uuid::Uuid::new_v4().to_string(),
1419            )
1420        });
1421        let profile = self.profile;
1422        profile.validate(
1423            self.tenant_kv.is_some(),
1424            self.authenticator.as_ref().map(|a| a.allows_anonymous()),
1425        )?;
1426        let tenant_strict = self.tenant_strict || profile.requires_strict_binding();
1427        let ownership_registry = self.tenant_kv.map(|kv| {
1428            let bucket = "klieo-tenants".into();
1429            if tenant_strict {
1430                klieo_core::OwnershipRegistry::new_strict(kv, bucket)
1431            } else {
1432                klieo_core::OwnershipRegistry::new(kv, bucket)
1433            }
1434        });
1435        if profile.requires_strict_binding() || profile.requires_named_principal() {
1436            tracing::warn!(
1437                target: "klieo.security",
1438                cwe = 639,
1439                "regulated multi-tenant profile active on this replica; \
1440                 cross-replica tenant isolation assumes ALL replicas run the \
1441                 same profile — a lenient peer reintroduces CWE-639. Fleet \
1442                 homogeneity is NOT verified by this replica."
1443            );
1444        }
1445        let resume_buffer = self
1446            .resume_buffer
1447            .unwrap_or_else(|| std::sync::Arc::new(klieo_core::resume::NoopResumeBuffer));
1448        let leader_ttl = self.leader_ttl.unwrap_or(LEADER_TTL);
1449        let leader_heartbeat_interval = self.leader_heartbeat_interval.unwrap_or(leader_ttl / 2);
1450        let max_failover_attempts = self
1451            .max_failover_attempts
1452            .unwrap_or(klieo_core::FAILOVER_ATTEMPT_CAP);
1453        let kv_reaper = spawn_reaper_if_configured(
1454            self.kv_reaper_interval,
1455            leader_registry.as_ref(),
1456            ownership_registry.as_ref(),
1457            &resume_buffer,
1458        );
1459        Ok(McpServer {
1460            invoker,
1461            tool_ctx_factory: self.tool_ctx_factory,
1462            parent_cancel: self.parent_cancel,
1463            resume_buffer,
1464            pubsub: self
1465                .pubsub
1466                .unwrap_or_else(|| klieo_bus_memory::MemoryBus::new().pubsub.clone()),
1467            cancel_registry: klieo_core::CancelRegistry::new(),
1468            #[cfg(feature = "http")]
1469            publish_permits: std::sync::Arc::new(tokio::sync::Semaphore::new(permits)),
1470            leader_registry,
1471            ownership_registry,
1472            resume_ticket_store: ticket_store,
1473            workflow_resume_handles,
1474            authenticator: self.authenticator,
1475            leader_ttl,
1476            leader_heartbeat_interval,
1477            max_failover_attempts,
1478            kv_reaper_interval: self.kv_reaper_interval,
1479            _kv_reaper: kv_reaper,
1480            // Stdio session container starts empty. The stdio entry
1481            // path populates it on first inbound request; HTTP
1482            // transports never touch it.
1483            stdio_session: tokio::sync::OnceCell::new(),
1484            // HTTP session registry starts empty. The initialize POST
1485            // handler inserts a fresh entry when a client mints a
1486            // session; the reaper and DELETE handler remove entries.
1487            #[cfg(feature = "http")]
1488            sessions: std::sync::Arc::new(tokio::sync::RwLock::new(
1489                std::collections::HashMap::new(),
1490            )),
1491            #[cfg(feature = "http")]
1492            max_sessions,
1493            #[cfg(feature = "http")]
1494            max_sessions_per_principal,
1495            #[cfg(feature = "http")]
1496            sse_replay_capacity: self
1497                .sse_replay_capacity
1498                .unwrap_or(DEFAULT_SSE_REPLAY_CAPACITY),
1499            #[cfg(feature = "http")]
1500            principal_counts: std::sync::Arc::new(tokio::sync::RwLock::new(
1501                std::collections::HashMap::new(),
1502            )),
1503            // Idle-reaper spawn sentinel starts empty. The first
1504            // initialize POST that needs the reaper populates it via
1505            // `ensure_idle_reaper`; subsequent populates are no-ops.
1506            #[cfg(feature = "http")]
1507            idle_reaper_started: tokio::sync::OnceCell::new(),
1508            declare_sampling: self.declare_sampling,
1509            // Shared stdout writer is minted lazily by `serve_stdio`
1510            // on first call (build-time stays sync; the writer needs
1511            // an async runtime context).
1512            stdout_writer: tokio::sync::OnceCell::new(),
1513            // Client capabilities populated by the `initialize` arm.
1514            client_caps: tokio::sync::Mutex::new(ClientCaps::default()),
1515            #[cfg(feature = "http")]
1516            session_idle_timeout: self
1517                .session_idle_timeout
1518                .unwrap_or(DEFAULT_SESSION_IDLE_TIMEOUT),
1519            #[cfg(all(feature = "http", any(test, feature = "test-fixtures")))]
1520            idle_reaper_tick: self.idle_reaper_tick.unwrap_or(DEFAULT_IDLE_REAPER_TICK),
1521            #[cfg(all(feature = "http", not(any(test, feature = "test-fixtures"))))]
1522            idle_reaper_tick: DEFAULT_IDLE_REAPER_TICK,
1523            // Reference instant for per-session `last_activity_millis`.
1524            // Captured once at build time and never mutated; lives for
1525            // the lifetime of the server.
1526            #[cfg(feature = "http")]
1527            server_start: std::time::Instant::now(),
1528        })
1529    }
1530
1531    /// Identical to [`Self::build`] but returns `Arc<McpServer>`,
1532    /// which is required by the HTTP transport (it shares state
1533    /// across handler invocations).
1534    ///
1535    /// If [`Self::with_cancel_subscription`] was called, this also
1536    /// spawns the wildcard `klieo.mcp.cancel.>` background task
1537    /// against an [`Arc`] clone of the returned server.
1538    pub fn build_arc(self) -> Result<std::sync::Arc<McpServer>, McpBuildError> {
1539        let spawn_subscriber = self.subscribe_cancels;
1540        let server = std::sync::Arc::new(self.build_inner()?);
1541        if spawn_subscriber {
1542            klieo_core::cancel::spawn_wildcard_cancel_subscriber(
1543                server.pubsub.clone(),
1544                "klieo.mcp.cancel.>".to_string(),
1545                "klieo.mcp.cancel.".to_string(),
1546                server.cancel_registry.clone(),
1547                "mcp.cancel",
1548            );
1549        }
1550        Ok(server)
1551    }
1552}
1553
1554impl McpServer {
1555    /// Open a [`McpServerBuilder`] for accumulating tools / agents
1556    /// and configuring server-level concerns such as a parent
1557    /// [`CancellationToken`].
1558    ///
1559    /// The three shorthand constructors below
1560    /// ([`Self::expose_tools`], [`Self::expose_agent_with_schema`],
1561    /// [`Self::expose_agent`]) build on this — they are zero-config
1562    /// shims for the single-tool / single-agent cases.
1563    pub fn builder() -> McpServerBuilder {
1564        McpServerBuilder::new()
1565    }
1566
1567    /// Borrow the leader registry if configured via the builder's
1568    /// `with_leader_election`. None when running single-replica
1569    /// without orphan detection.
1570    pub fn leader_registry(&self) -> Option<&klieo_core::LeaderRegistry> {
1571        self.leader_registry.as_ref()
1572    }
1573
1574    /// Borrow the ownership registry if configured via the
1575    /// builder's `with_tenant_binding`. None when running
1576    /// without tenant binding (pre-0.22 deployments).
1577    pub fn ownership_registry(&self) -> Option<&klieo_core::OwnershipRegistry> {
1578        self.ownership_registry.as_ref()
1579    }
1580
1581    /// Borrow the configured authenticator if any.
1582    pub fn authenticator(&self) -> Option<&Arc<dyn klieo_auth_common::Authenticator>> {
1583        self.authenticator.as_ref()
1584    }
1585
1586    /// Configured leader TTL (cluster-0.20 + cluster-0.25 tunable).
1587    /// Default [`LEADER_TTL`] when
1588    /// [`McpServerBuilder::with_leader_ttl`] was not called.
1589    pub fn leader_ttl(&self) -> std::time::Duration {
1590        self.leader_ttl
1591    }
1592
1593    /// Configured leader heartbeat interval. Default is half the
1594    /// configured TTL when
1595    /// [`McpServerBuilder::with_leader_heartbeat_interval`] was not
1596    /// called.
1597    pub fn leader_heartbeat_interval(&self) -> std::time::Duration {
1598        self.leader_heartbeat_interval
1599    }
1600
1601    /// Configured failover-attempt cap (cluster-0.24 + cluster-0.25
1602    /// tunable). Default [`klieo_core::FAILOVER_ATTEMPT_CAP`] when
1603    /// [`McpServerBuilder::with_max_failover_attempts`] was not
1604    /// called.
1605    pub fn max_failover_attempts(&self) -> u32 {
1606        self.max_failover_attempts
1607    }
1608
1609    /// Configured KV-reaper interval, if any. `None` when
1610    /// [`McpServerBuilder::with_kv_reaper`] was not called or when
1611    /// no leader/ownership KV was wired (no bucket to scan).
1612    pub fn kv_reaper_interval(&self) -> Option<std::time::Duration> {
1613        self.kv_reaper_interval
1614    }
1615
1616    /// Snapshot of every live HTTP session id, taken under the
1617    /// registry read-lock. The lock is released before this returns,
1618    /// so the snapshot's relationship with concurrent `DELETE /mcp`
1619    /// requests or idle-reaper evictions depends on observation
1620    /// timing. Order is undefined. Returns an empty vector on stdio
1621    /// servers and on HTTP servers with no live sessions.
1622    #[cfg(feature = "http")]
1623    pub async fn session_ids(&self) -> Vec<uuid::Uuid> {
1624        self.sessions.read().await.keys().copied().collect()
1625    }
1626
1627    /// Close state of a specific HTTP session, looked up by minted
1628    /// `Mcp-Session-Id`. Returns `None` when the id is unknown — the
1629    /// session either never existed or has already been evicted via
1630    /// `DELETE /mcp` or the idle reaper. Returns `Some(true)` after
1631    /// the session's drain path has run; `Some(false)` while the
1632    /// session is still serving traffic.
1633    #[cfg(feature = "http")]
1634    pub async fn is_session_closed_by_id(&self, id: uuid::Uuid) -> Option<bool> {
1635        self.sessions.read().await.get(&id).map(|s| s.is_closed())
1636    }
1637
1638    /// Returns `true` when SSE resumption is enabled
1639    /// (`sse_replay_capacity > 0`). Read by the SSE producer to
1640    /// gate buffer writes and by the `GET /mcp` resume branch to
1641    /// reject `Last-Event-Id` with 501 when disabled.
1642    #[cfg(feature = "http")]
1643    pub(crate) fn sse_replay_enabled(&self) -> bool {
1644        self.sse_replay_capacity > 0
1645    }
1646
1647    /// Decrement the per-principal session count for `principal`.
1648    /// Removes the entry when the count reaches zero so the HashMap
1649    /// does not grow unbounded under principal churn. No-op when
1650    /// `principal` is `None` (stdio session or auth-disabled
1651    /// deployment, neither of which populates the count cache).
1652    ///
1653    /// Callers MUST release any [`Self::sessions`] write guard before
1654    /// invoking this method to preserve the documented lock-acquisition
1655    /// order.
1656    #[cfg(feature = "http")]
1657    pub(crate) async fn decrement_principal_count(&self, principal: Option<&str>) {
1658        let Some(principal) = principal else { return };
1659        let mut counts = self.principal_counts.write().await;
1660        if let Some(entry) = counts.get_mut(principal) {
1661            *entry = entry.saturating_sub(1);
1662            if *entry == 0 {
1663                counts.remove(principal);
1664            }
1665        }
1666    }
1667
1668    /// Borrow the stdio transport's server-initiated outbound channel
1669    /// — `Some` once the stdio loop has wired it (i.e. the builder
1670    /// opted in via [`McpServerBuilder::with_client_sampling`] AND
1671    /// [`Self::serve_with_streams`] has run at least once), `None`
1672    /// otherwise. Surfaces an `Arc<dyn ServerOutbound>` so callers
1673    /// thread it onto `ToolCtx::server_outbound` in their own
1674    /// [`McpServerBuilder::with_tool_ctx_factory`] closures.
1675    ///
1676    /// Scoped to the stdio transport: HTTP servers carry one outbound
1677    /// per session, accessible via the per-session registry rather
1678    /// than through this accessor.
1679    pub fn outbound(&self) -> Option<Arc<dyn klieo_core::ServerOutbound>> {
1680        self.stdio_session
1681            .get()
1682            .and_then(|session| session.outbound.get())
1683            .map(|o| o.clone() as Arc<dyn klieo_core::ServerOutbound>)
1684    }
1685
1686    /// Push a notification-shaped JSON-RPC frame through the named
1687    /// HTTP session's wired outbound primitive. Used by the
1688    /// slow-SSE-consumer integration test to drive the per-session
1689    /// ring past [`crate::outbound_sink::OUTBOUND_QUEUE_CAPACITY`]
1690    /// without fabricating pending-response entries — notifications
1691    /// carry no `id`, so the table stays empty while the ring fills.
1692    /// The `payload_bytes` parameter inflates each frame so the
1693    /// kernel TCP send buffer saturates before the ring does —
1694    /// otherwise the receiver dequeues fast enough that drop-oldest
1695    /// never fires.
1696    ///
1697    /// Returns `Ok(())` when the frame was enqueued (including the
1698    /// drop-oldest path, which the sink reports out-of-band via
1699    /// metrics and a `warn` log). Returns `Err(())` when the session
1700    /// id is unknown OR when no outbound primitive has been wired
1701    /// (caller must run `POST /mcp initialize` then `GET /mcp` first).
1702    ///
1703    /// Gated on `test-fixtures` so production callers cannot reach it.
1704    /// Test-only snapshot of the per-HTTP-session roots cache, keyed
1705    /// by minted `Mcp-Session-Id`. Returns `None` when the id is
1706    /// unknown OR the roots cache has not been wired (the server was
1707    /// not built with `with_client_sampling`). Gated on
1708    /// `test-fixtures` so production callers cannot reach it.
1709    #[cfg(all(feature = "http", feature = "test-fixtures"))]
1710    pub async fn client_roots_for_session(&self, session_id: uuid::Uuid) -> Option<Vec<Root>> {
1711        let sessions = self.sessions.read().await;
1712        let session = sessions.get(&session_id)?;
1713        let cache = session.roots_cache.get()?;
1714        Some(cache.snapshot())
1715    }
1716
1717    /// Test-only lookup of a session's wired outbound primitive,
1718    /// keyed by minted `Mcp-Session-Id`. Returns `Some` once the SSE
1719    /// `GET /mcp` for the session has installed the primitive and the
1720    /// session is still in the registry; `None` for unknown ids or
1721    /// before the SSE wiring completes. Gated on `test-fixtures` so
1722    /// production callers cannot reach it — see [`Self::session_ids`]
1723    /// for the production-facing accessor.
1724    #[cfg(all(feature = "http", feature = "test-fixtures"))]
1725    pub async fn outbound_for_session(
1726        &self,
1727        session_id: uuid::Uuid,
1728    ) -> Option<Arc<dyn klieo_core::ServerOutbound>> {
1729        let sessions = self.sessions.read().await;
1730        let session = sessions.get(&session_id)?;
1731        let outbound = session.outbound.get()?.clone();
1732        Some(outbound as Arc<dyn klieo_core::ServerOutbound>)
1733    }
1734
1735    /// Test-only read of the resolved per-session SSE replay buffer
1736    /// capacity. Returns the value handed in via
1737    /// [`McpServerBuilder::with_sse_replay_capacity`] or the builder
1738    /// default when the knob was not set. Gated on `test-fixtures`
1739    /// so production callers cannot reach it.
1740    #[cfg(all(feature = "http", feature = "test-fixtures"))]
1741    pub fn sse_replay_capacity(&self) -> usize {
1742        self.sse_replay_capacity
1743    }
1744
1745    /// Test-only snapshot of a session's SSE replay buffer as a list
1746    /// of `(event_id, frame)` pairs in delivery order. Returns
1747    /// `None` when the session id is absent from the registry. Gated
1748    /// on `test-fixtures` so production callers cannot reach it.
1749    #[cfg(all(feature = "http", feature = "test-fixtures"))]
1750    pub async fn sse_replay_snapshot(
1751        &self,
1752        session_id: uuid::Uuid,
1753    ) -> Option<Vec<(u64, std::sync::Arc<serde_json::Value>)>> {
1754        let sessions = self.sessions.read().await;
1755        let session = sessions.get(&session_id)?;
1756        let buffer = session.sse_replay_buffer.lock();
1757        Some(buffer.iter().cloned().collect())
1758    }
1759
1760    /// Push a notification-shaped JSON-RPC frame through the named
1761    /// HTTP session's wired outbound primitive — used by the slow-SSE
1762    /// backpressure integration test to drive the per-session ring past
1763    /// [`crate::outbound_sink::OUTBOUND_QUEUE_CAPACITY`]. Each frame is
1764    /// inflated to `payload_bytes` so the kernel TCP send buffer
1765    /// saturates before the ring does. Returns `Err(())` when the
1766    /// session id is unknown or no outbound primitive is wired.
1767    ///
1768    /// Gated on `test-fixtures` so production callers cannot reach it.
1769    #[cfg(all(feature = "http", feature = "test-fixtures"))]
1770    pub async fn emit_test_notification(
1771        &self,
1772        session_id: uuid::Uuid,
1773        method: &str,
1774        payload_bytes: usize,
1775    ) -> Result<(), ()> {
1776        let session = {
1777            let sessions = self.sessions.read().await;
1778            sessions.get(&session_id).cloned()
1779        };
1780        let Some(outbound) = session.as_ref().and_then(|s| s.outbound.get()) else {
1781            return Err(());
1782        };
1783        outbound
1784            .send_notification_frame(method, payload_bytes)
1785            .await
1786            .map_err(|_| ())
1787    }
1788
1789    /// Return the latest cached client roots for the stdio transport.
1790    /// Returns an empty vec when the client did not advertise roots
1791    /// support, when the outbound primitive was not enabled at build
1792    /// time (no cache to query), or when the server is HTTP-only.
1793    /// Callers that want to react to changes should use
1794    /// [`Self::subscribe_root_changes`] instead of polling this.
1795    ///
1796    /// Scoped to the stdio transport: HTTP servers carry one roots
1797    /// cache per session.
1798    pub fn client_roots(&self) -> Vec<Root> {
1799        self.stdio_session
1800            .get()
1801            .and_then(|session| session.roots_cache.get())
1802            .map(|c| c.snapshot())
1803            .unwrap_or_default()
1804    }
1805
1806    /// Subscribe to roots-changed notifications for the stdio
1807    /// transport. Returns `None` when the cache is not wired (no
1808    /// outbound primitive) or when the server is HTTP-only. When
1809    /// `Some`, the receiver fires every time a fresh `roots/list`
1810    /// response lands; consumers read the current value via
1811    /// [`tokio::sync::watch::Receiver::borrow`].
1812    ///
1813    /// Scoped to the stdio transport: HTTP servers carry one roots
1814    /// cache per session.
1815    pub fn subscribe_root_changes(&self) -> Option<tokio::sync::watch::Receiver<Vec<Root>>> {
1816        self.stdio_session
1817            .get()
1818            .and_then(|session| session.roots_cache.get())
1819            .map(|c| c.subscribe())
1820    }
1821
1822    /// Shim for [`McpServerBuilder::add_tools`] + [`McpServerBuilder::build`].
1823    /// The invoker's catalogue becomes the MCP `tools/list` response;
1824    /// `tools/call` dispatches via `ToolInvoker::invoke`.
1825    pub fn expose_tools(invoker: Arc<dyn ToolInvoker>) -> Self {
1826        Self::builder()
1827            .add_tools(invoker)
1828            .build()
1829            .expect("single-invoker build cannot fail")
1830    }
1831
1832    /// Shim for [`McpServerBuilder::add_agent_with_schema`] +
1833    /// [`McpServerBuilder::build`]. Wraps an [`Agent`] as a single
1834    /// MCP tool. `tools/list` reports one entry named `agent.name()`
1835    /// whose `inputSchema` is the caller-supplied `input_schema` (an
1836    /// arbitrary JSON Schema). `tools/call` decodes the `arguments`
1837    /// blob into `A::Input`, mints a fresh
1838    /// [`AgentContext`](klieo_core::agent::AgentContext) via
1839    /// `ctx_factory`, runs `agent.run(ctx, input).await`, and returns
1840    /// the JSON-serialised `A::Output`.
1841    ///
1842    /// Use this when the agent's `Input` cannot easily derive
1843    /// `schemars::JsonSchema`. For the auto-derive path enable the
1844    /// `schemars` cargo feature and use [`Self::expose_agent`].
1845    ///
1846    /// For graceful-shutdown — propagating a parent
1847    /// [`CancellationToken`] into every minted `AgentContext` — open
1848    /// the builder explicitly:
1849    /// `McpServer::builder().with_parent_cancel(tok).add_agent_with_schema(..).build()`.
1850    /// See ADR-010 for the design trade-off.
1851    pub fn expose_agent_with_schema<A>(
1852        agent: A,
1853        input_schema: serde_json::Value,
1854        ctx_factory: AgentContextFactory,
1855    ) -> Self
1856    where
1857        A: Agent + 'static,
1858        A::Input: serde::de::DeserializeOwned + Send + 'static,
1859        A::Output: serde::Serialize + Send + 'static,
1860    {
1861        Self::builder()
1862            .add_agent_with_schema(agent, input_schema, ctx_factory)
1863            .build()
1864            .expect("single-invoker build cannot fail")
1865    }
1866
1867    /// Shim for [`McpServerBuilder::add_agent`] + [`McpServerBuilder::build`].
1868    /// Auto-derive variant of [`Self::expose_agent_with_schema`].
1869    /// Requires `A::Input: schemars::JsonSchema` and the `schemars`
1870    /// cargo feature on `klieo-mcp-server`.
1871    #[cfg(feature = "schemars")]
1872    pub fn expose_agent<A>(agent: A, ctx_factory: AgentContextFactory) -> Self
1873    where
1874        A: Agent + 'static,
1875        A::Input: serde::de::DeserializeOwned + schemars::JsonSchema + Send + 'static,
1876        A::Output: serde::Serialize + Send + 'static,
1877    {
1878        Self::builder()
1879            .add_agent(agent, ctx_factory)
1880            .build()
1881            .expect("single-invoker build cannot fail")
1882    }
1883
1884    /// Shim for [`McpServerBuilder::with_hitl`] +
1885    /// [`McpServerBuilder::add_workflow_with_schema`] +
1886    /// [`McpServerBuilder::build`]. Wraps an [`Agent`] as a single MCP
1887    /// tool whose `tools/call` drives [`klieo_hitl::run_with_hitl`]
1888    /// against the supplied HITL client + config. Suspensions surface
1889    /// as `{"status":"suspended","reason":...}` (no checkpoint / ticket
1890    /// when no resume KV is wired; ADR-045).
1891    ///
1892    /// Ungoverned-build convenience constructor. With the `governor`
1893    /// feature on, the workflow hard gate requires `with_governor`, so use
1894    /// the builder path (`with_hitl` + `with_governor` + `add_workflow_*` +
1895    /// `build`) instead — this shorthand is compiled out there.
1896    #[cfg(not(feature = "governor"))]
1897    pub fn expose_workflow_with_schema<A>(
1898        agent: A,
1899        system_prompt: impl Into<String>,
1900        input_schema: serde_json::Value,
1901        run_options: klieo_core::runtime::RunOptions,
1902        hitl_client: Arc<klieo_hitl_client::HitlClient>,
1903        hitl_cfg: Arc<klieo_hitl::HitlConfig>,
1904        ctx_factory: AgentContextFactory,
1905    ) -> Result<Self, McpBuildError>
1906    where
1907        A: Agent + 'static,
1908        A::Input: serde::de::DeserializeOwned + Send + 'static,
1909    {
1910        Self::builder()
1911            .with_hitl(hitl_client, hitl_cfg)
1912            .add_workflow_with_schema(agent, system_prompt, input_schema, run_options, ctx_factory)
1913            .build()
1914    }
1915
1916    /// Auto-derive variant of [`Self::expose_workflow_with_schema`].
1917    /// Requires `A::Input: schemars::JsonSchema` and the `schemars`
1918    /// cargo feature. Ungoverned-build only (see the sibling shorthand);
1919    /// governed builds use the `with_governor` builder path.
1920    #[cfg(all(feature = "schemars", not(feature = "governor")))]
1921    pub fn expose_workflow<A>(
1922        agent: A,
1923        system_prompt: impl Into<String>,
1924        run_options: klieo_core::runtime::RunOptions,
1925        hitl_client: Arc<klieo_hitl_client::HitlClient>,
1926        hitl_cfg: Arc<klieo_hitl::HitlConfig>,
1927        ctx_factory: AgentContextFactory,
1928    ) -> Result<Self, McpBuildError>
1929    where
1930        A: Agent + 'static,
1931        A::Input: serde::de::DeserializeOwned + schemars::JsonSchema + Send + 'static,
1932    {
1933        Self::builder()
1934            .with_hitl(hitl_client, hitl_cfg)
1935            .add_workflow(agent, system_prompt, run_options, ctx_factory)
1936            .build()
1937    }
1938
1939    /// Borrow the merged [`ToolInvoker`] the server dispatches against.
1940    /// Test helper for asserting per-invocation behaviour (governor
1941    /// wrap, tenant isolation, error mapping) without standing up the
1942    /// full HTTP transport. Production callers route through
1943    /// [`Self::serve_stdio`] / [`Self::serve_http`] instead.
1944    pub fn invoker(&self) -> &std::sync::Arc<dyn ToolInvoker> {
1945        &self.invoker
1946    }
1947
1948    /// Borrow the configured resume buffer (test helper).
1949    pub fn resume_buffer(&self) -> &std::sync::Arc<dyn klieo_core::resume::ResumeBuffer> {
1950        &self.resume_buffer
1951    }
1952
1953    /// Borrow the configured pubsub (test helper + cross-replica
1954    /// fanout consumers).
1955    pub fn pubsub(&self) -> &std::sync::Arc<dyn klieo_core::Pubsub> {
1956        &self.pubsub
1957    }
1958
1959    /// Borrow the per-server [`klieo_core::CancelRegistry`] keyed by
1960    /// progressToken. The cancel-subject subscription task and the
1961    /// `tools/call` dispatch path share a single registry instance:
1962    /// `tools/call` registers a
1963    /// [`tokio_util::sync::CancellationToken`] under the progressToken
1964    /// at invocation start, the subscription task fires it on inbound
1965    /// `klieo.mcp.cancel.{progressToken}` messages, and `tools/call`
1966    /// deregisters in a finally-style cleanup once the invocation
1967    /// terminates.
1968    pub fn cancel_registry(&self) -> &klieo_core::CancelRegistry<String> {
1969        &self.cancel_registry
1970    }
1971
1972    /// Publish a cross-replica cancel signal for `progress_token` on
1973    /// `klieo.mcp.cancel.{progress_token}`. Thin wrapper around
1974    /// [`klieo_core::cancel::publish_cancel_signal`] with explicit
1975    /// mapping of [`klieo_core::BusError::Invalid`] onto
1976    /// [`McpServerError::InvalidSubject`] so callers can distinguish
1977    /// caller-input validation failure from transport failure
1978    /// without source-chain downcasting.
1979    ///
1980    /// # Security
1981    /// `progress_token` is validated against
1982    /// [`klieo_core::validate_subject_token`] before subject
1983    /// construction; metacharacters (`.`, `*`, `>`, whitespace,
1984    /// non-ASCII) yield [`McpServerError::InvalidSubject`],
1985    /// preventing a caller-controlled progressToken from collapsing
1986    /// or wildcarding the subject namespace (CWE-74). Same guard
1987    /// shape as the per-progressToken event subject published from
1988    /// the HTTP transport's streaming `tools/call` path.
1989    ///
1990    /// Cancel signals share the progressToken-as-credential threat
1991    /// model documented for resume in ADR-018 / ADR-019: knowledge
1992    /// of the progressToken grants the ability to cancel the
1993    /// invocation. Operators MUST mint unguessable progressTokens
1994    /// (UUID v4 or stronger) and gate per-tenant authorisation
1995    /// BEFORE the request reaches this server; without that,
1996    /// cross-tenant cancel becomes possible (CWE-639 IDOR).
1997    pub async fn publish_cancel(&self, progress_token: &str) -> Result<(), McpServerError> {
1998        // `?` routes via `From<BusError> for McpServerError`, which
1999        // maps `BusError::Invalid` → `McpServerError::InvalidSubject`
2000        // and every other variant → `McpServerError::Bus`. Keep the
2001        // two-arm dispatch in the `From` impl so this thin wrapper
2002        // does not duplicate the mapping.
2003        klieo_core::cancel::publish_cancel_signal(
2004            &self.pubsub,
2005            "klieo.mcp.cancel.",
2006            progress_token,
2007        )
2008        .await?;
2009        Ok(())
2010    }
2011
2012    /// Mint a [`ToolCtx`] for the streaming path, overlaying the supplied
2013    /// progress sender and request-scoped cancel token. Used by the
2014    /// HTTP transport's `stream_tools_call` so the broadcast is in
2015    /// place and disconnect-cancel is observable before `invoke` runs.
2016    #[cfg(feature = "http")]
2017    pub(crate) fn tool_ctx_with_progress(
2018        &self,
2019        progress: tokio::sync::broadcast::Sender<klieo_core::AgentEvent>,
2020        cancel: tokio_util::sync::CancellationToken,
2021        caller_principal: Option<String>,
2022        parent_anchor: Option<String>,
2023    ) -> klieo_core::tool::ToolCtx {
2024        let mut ctx = (self.tool_ctx_factory)()
2025            .with_progress(progress)
2026            .with_cancel(cancel);
2027        if let Some(principal) = caller_principal {
2028            ctx = ctx.with_caller_principal(principal);
2029        }
2030        if let Some(anchor) = parent_anchor {
2031            ctx = ctx.with_parent_anchor(anchor);
2032        }
2033        ctx
2034    }
2035
2036    /// Drive the stdio loop against the real process stdin/stdout. Thin
2037    /// wrapper around [`Self::serve_with_streams`] that mints a writer
2038    /// over [`tokio::io::stdout`] and feeds [`tokio::io::stdin`] as the
2039    /// reader; all loop semantics live in `serve_with_streams` so the
2040    /// integration tests drive the same code path over
2041    /// [`tokio::io::duplex`] pairs.
2042    ///
2043    /// Returns when stdin closes (peer disconnect) or on fatal I/O
2044    /// error.
2045    pub async fn serve_stdio(self: Arc<Self>) -> Result<(), McpServerError> {
2046        let stdin = tokio::io::stdin();
2047        let stdout: outbound::SharedWriter = Arc::new(Mutex::new(tokio::io::stdout()));
2048        self.serve_with_streams(stdin, stdout).await
2049    }
2050
2051    /// Drive the stdio loop against caller-supplied I/O streams. Reads
2052    /// newline-delimited JSON-RPC frames from `reader`, classifies each
2053    /// by shape, and either dispatches a response onto `writer` or
2054    /// routes a server-initiated response into the outbound correlation
2055    /// table. The same `writer` is primed into the private
2056    /// `stdout_writer` slot so the outbound primitive shares it —
2057    /// preventing interleaved writes between inbound replies and
2058    /// outbound requests on a single underlying stream.
2059    ///
2060    /// ## Classification
2061    ///
2062    /// - **Request** (`method` + `id`): dispatched via the private
2063    ///   `handle_jsonrpc` helper; the wire envelope is written onto
2064    ///   `writer`.
2065    /// - **Notification** (`method`, no `id`): dispatched for side
2066    ///   effects only; nothing is written back per JSON-RPC §4.1.
2067    /// - **Outbound response** (`id` + `result`/`error`, no `method`):
2068    ///   routed into `outbound::OutboundRequests::complete_pending`
2069    ///   when the server is wired with outbound support; logged and
2070    ///   dropped otherwise.
2071    /// - **Unparseable** (no `method`, no `id`): logged at `warn`.
2072    pub async fn serve_with_streams<R>(
2073        self: Arc<Self>,
2074        reader: R,
2075        writer: outbound::SharedWriter,
2076    ) -> Result<(), McpServerError>
2077    where
2078        R: tokio::io::AsyncRead + Unpin,
2079    {
2080        let stdout = self.ensure_stdout_writer_with(writer).await;
2081        self.ensure_outbound_and_roots().await;
2082        let mut lines = BufReader::new(reader).lines();
2083
2084        while let Some(line) = lines.next_line().await? {
2085            if line.trim().is_empty() {
2086                continue;
2087            }
2088            self.dispatch_stdio_line(line, stdout.clone());
2089        }
2090        Ok(())
2091    }
2092
2093    /// Dispatch one inbound frame on a spawned task so the read loop
2094    /// keeps draining stdin while the handler runs. Spawning is
2095    /// essential for the sampling path: a `tools/call` whose handler
2096    /// issues `sampling/createMessage` over `ctx.server_outbound`
2097    /// blocks until the matching response arrives — and that response
2098    /// can only arrive once the read loop reads the next frame. Without
2099    /// spawning, the loop and the handler deadlock.
2100    fn dispatch_stdio_line(self: &Arc<Self>, line: String, writer: outbound::SharedWriter) {
2101        let server = self.clone();
2102        tokio::spawn(async move {
2103            if let Err(error) = server.process_stdio_line(&line, &writer).await {
2104                warn!(error = ?error, "stdio dispatch task failed");
2105            }
2106        });
2107    }
2108
2109    /// Prime [`Self::stdout_writer`] with `writer` iff the cell is
2110    /// empty, then return a clone of the cell's value.
2111    /// [`tokio::sync::OnceCell::set`] is no-op-on-populated, so the
2112    /// first caller's writer wins; subsequent [`Self::serve_with_streams`]
2113    /// / [`Self::serve_stdio`] calls on the same server reuse the cached
2114    /// handle so the outbound primitive never observes a writer swap
2115    /// mid-flight.
2116    async fn ensure_stdout_writer_with(
2117        &self,
2118        writer: outbound::SharedWriter,
2119    ) -> outbound::SharedWriter {
2120        // `set` returns `Err` when the cell is already populated — that
2121        // is the expected fast path on every subsequent invocation. We
2122        // ignore the result and read back via `get().expect(..)` because
2123        // we've just guaranteed the cell is non-empty.
2124        let _ = self.stdout_writer.set(writer);
2125        self.stdout_writer
2126            .get()
2127            .expect("stdout_writer populated above")
2128            .clone()
2129    }
2130
2131    /// Construct the outbound correlation primitive + roots cache when
2132    /// the builder opted in via [`McpServerBuilder::with_client_sampling`].
2133    /// No-op when the flag is unset (tools-only deployments pay zero cost)
2134    /// or when the cells are already populated (subsequent
2135    /// [`Self::serve_stdio`] calls share the same primitives).
2136    async fn ensure_outbound_and_roots(&self) {
2137        if !self.declare_sampling {
2138            return;
2139        }
2140        // Cloning the cached handle keeps inbound reply writes and
2141        // outbound request writes serialised on the same `Mutex`.
2142        let writer = self
2143            .stdout_writer
2144            .get()
2145            .expect("serve_with_streams primes stdout_writer before ensure_outbound_and_roots")
2146            .clone();
2147        let session = self
2148            .stdio_session
2149            .get_or_init(|| async { std::sync::Arc::new(crate::session::Session::new_stdio()) })
2150            .await
2151            .clone();
2152        let outbound = session
2153            .outbound
2154            .get_or_init(|| async {
2155                let sink: Arc<dyn OutboundFrameSink> =
2156                    Arc::new(crate::outbound_sink::StdioFrameSink::new(writer.clone()));
2157                Arc::new(crate::outbound::OutboundRequests::new(sink))
2158            })
2159            .await
2160            .clone();
2161        let _ = session
2162            .roots_cache
2163            .get_or_init(|| async {
2164                let outbound: Arc<dyn klieo_core::ServerOutbound> = outbound.clone();
2165                Arc::new(crate::roots::RootsCache::new(outbound))
2166            })
2167            .await;
2168    }
2169
2170    /// Classify one inbound JSON-RPC frame and dispatch to the request,
2171    /// notification, or outbound-response path.
2172    async fn process_stdio_line(
2173        &self,
2174        line: &str,
2175        writer: &outbound::SharedWriter,
2176    ) -> Result<(), McpServerError> {
2177        let parsed: serde_json::Value = match serde_json::from_str(line) {
2178            Ok(value) => value,
2179            Err(error) => {
2180                // Sanitise: log the serde error, return a stable message so attacker bytes never echo back.
2181                warn!(error = %error, "rejected malformed JSON-RPC frame");
2182                let envelope = rpc_error(None, JSONRPC_PARSE_ERROR, "malformed JSON-RPC frame");
2183                return write_frame(writer, &envelope).await;
2184            }
2185        };
2186        let stdio_session = self.stdio_session.get();
2187        match classify_inbound(&parsed) {
2188            InboundKind::Request => {
2189                let envelope = self.handle_jsonrpc(parsed, stdio_session).await;
2190                write_frame(writer, &envelope).await
2191            }
2192            InboundKind::Notification => {
2193                // JSON-RPC §4.1 — notifications carry no response.
2194                self.handle_jsonrpc(parsed, stdio_session).await;
2195                Ok(())
2196            }
2197            InboundKind::OutboundResponse(id) => {
2198                self.route_outbound_response(id, parsed).await;
2199                Ok(())
2200            }
2201            InboundKind::Unparseable => {
2202                warn!("rejected inbound frame: no method and no id");
2203                Ok(())
2204            }
2205        }
2206    }
2207
2208    /// Route a server-initiated response into the stdio session's
2209    /// correlation table. When the table is absent (the builder did
2210    /// not opt in to a transport that carries reverse-direction
2211    /// JSON-RPC) the frame is logged at `warn` and dropped — the peer
2212    /// is misbehaving or the server is misconfigured, but neither
2213    /// warrants tearing down the stdio loop.
2214    async fn route_outbound_response(&self, id: i64, frame: serde_json::Value) {
2215        if let Some(outbound) = self
2216            .stdio_session
2217            .get()
2218            .and_then(|session| session.outbound.get())
2219        {
2220            outbound.complete_pending(id, frame).await;
2221        } else {
2222            warn!(
2223                rpc_id = id,
2224                "outbound response received but server has no outbound table wired"
2225            );
2226        }
2227    }
2228
2229    /// Parse + dispatch a single newline-delimited JSON-RPC frame.
2230    /// Retained from the pre-T2 surface for unit tests that feed a
2231    /// request line straight into the dispatcher and expect the wire
2232    /// envelope back. Notifications + outbound responses go through
2233    /// [`Self::process_stdio_line`] instead, which classifies the
2234    /// frame by shape before dispatch.
2235    #[cfg(test)]
2236    async fn handle_line(&self, line: &str) -> serde_json::Value {
2237        let req: serde_json::Value = match serde_json::from_str(line) {
2238            Ok(v) => v,
2239            Err(e) => {
2240                warn!(error = %e, "rejected malformed JSON-RPC frame");
2241                return rpc_error(None, JSONRPC_PARSE_ERROR, "malformed JSON-RPC frame");
2242            }
2243        };
2244        self.handle_jsonrpc(req, self.stdio_session.get()).await
2245    }
2246
2247    /// Dispatch one already-parsed JSON-RPC request value to the
2248    /// matching handler. Returns a fully-formed JSON-RPC response
2249    /// envelope (`result` on success, `error` on failure). Shared
2250    /// by all transports.
2251    ///
2252    /// `session` carries the per-transport [`crate::session::Session`]
2253    /// that owns the outbound primitive + roots cache for this
2254    /// dispatch. Stdio callers pass `self.stdio_session.get()`; HTTP
2255    /// callers pass the session minted by `handle_initialize_post` or
2256    /// resolved by `require_session`. Notification arms that drive
2257    /// per-session side effects (roots seed + refresh) read from this
2258    /// parameter; method arms that touch only catalogue state ignore
2259    /// it. `None` degrades the notification arms to a silent no-op.
2260    pub(crate) async fn handle_jsonrpc(
2261        &self,
2262        req: serde_json::Value,
2263        session: Option<&std::sync::Arc<crate::session::Session>>,
2264    ) -> serde_json::Value {
2265        let id = req.get("id").cloned();
2266        let method = req.get("method").and_then(|m| m.as_str()).unwrap_or("");
2267
2268        match method {
2269            "initialize" => rpc_ok(id, self.handle_initialize(&req).await),
2270            "notifications/initialized" => {
2271                self.handle_initialized_notification(session).await;
2272                serde_json::Value::Null
2273            }
2274            "notifications/roots/list_changed" => {
2275                self.handle_roots_list_changed_notification(session);
2276                serde_json::Value::Null
2277            }
2278            "shutdown" => rpc_ok(id, serde_json::Value::Null),
2279            "tools/list" => rpc_ok(id, self.tools_list()),
2280            "tools/call" => match self.tools_call(req.get("params")).await {
2281                Ok(v) => rpc_ok(id, v),
2282                Err(e) => tool_error_to_envelope(id, e),
2283            },
2284            other => {
2285                warn!(rpc_id = ?id, method = other, "method not found");
2286                rpc_error(
2287                    id,
2288                    JSONRPC_METHOD_NOT_FOUND,
2289                    &format!("method not found: {other}"),
2290                )
2291            }
2292        }
2293    }
2294
2295    /// Parse the client-declared capabilities from the `initialize`
2296    /// request, store the flags this server cares about under
2297    /// [`Self::client_caps`], and return the result payload for the
2298    /// outgoing JSON-RPC response. Splits the "absorb caps + mint
2299    /// response" pair out of [`Self::handle_jsonrpc`] so the dispatch
2300    /// arm stays a single-statement readable line.
2301    async fn handle_initialize(&self, req: &serde_json::Value) -> serde_json::Value {
2302        let roots_supported = req.pointer("/params/capabilities/roots").is_some();
2303        {
2304            let mut caps = self.client_caps.lock().await;
2305            caps.roots_supported = roots_supported;
2306        }
2307        if self.declare_sampling {
2308            initialize_result_with_sampling()
2309        } else {
2310            initialize_result_without_sampling()
2311        }
2312    }
2313
2314    /// Handle `notifications/initialized`. When the client advertised
2315    /// `capabilities.roots` on `initialize` AND the dispatch session
2316    /// carries a wired outbound primitive (i.e. the builder opted in
2317    /// via [`McpServerBuilder::with_client_sampling`]), spawn a
2318    /// one-shot `roots/list` fetch to seed the cache. The fetch runs
2319    /// on a detached task so the notification dispatch does not block
2320    /// on peer round-trip latency.
2321    ///
2322    /// `session` resolves to the per-transport session that owns the
2323    /// cache; `None` (no session in dispatch context) or a missing
2324    /// cache both degrade silently.
2325    async fn handle_initialized_notification(
2326        &self,
2327        session: Option<&std::sync::Arc<crate::session::Session>>,
2328    ) {
2329        let roots_supported = self.client_caps.lock().await.roots_supported;
2330        if !roots_supported {
2331            return;
2332        }
2333        let Some(cache) = session
2334            .and_then(|session| session.roots_cache.get())
2335            .cloned()
2336        else {
2337            return;
2338        };
2339        tokio::spawn(async move {
2340            if let Err(error) = cache.refresh().await {
2341                warn!(error = ?error, "initial roots/list fetch failed");
2342            }
2343        });
2344    }
2345
2346    /// Handle `notifications/roots/list_changed`. The client signals
2347    /// that its declared roots have changed; the server reacts by
2348    /// re-issuing `roots/list` to refresh the cached snapshot. The
2349    /// refresh runs on a detached task so the notification dispatch
2350    /// does not block on peer round-trip latency.
2351    ///
2352    /// `session` resolves to the per-transport session that owns the
2353    /// cache. Acts as a no-op when `None` (no session in dispatch
2354    /// context) or when the cache is not wired — tools-only
2355    /// deployments that never opted in to the outbound primitive
2356    /// have nothing to refresh, and a misbehaving peer sending the
2357    /// notification anyway should not panic the server.
2358    fn handle_roots_list_changed_notification(
2359        &self,
2360        session: Option<&std::sync::Arc<crate::session::Session>>,
2361    ) {
2362        let Some(cache) = session
2363            .and_then(|session| session.roots_cache.get())
2364            .cloned()
2365        else {
2366            return;
2367        };
2368        tokio::spawn(async move {
2369            if let Err(error) = cache.refresh().await {
2370                warn!(error = ?error, "roots list_changed re-fetch failed");
2371            }
2372        });
2373    }
2374
2375    fn tools_list(&self) -> serde_json::Value {
2376        let tools: Vec<serde_json::Value> = self
2377            .invoker
2378            .catalogue()
2379            .iter()
2380            .map(tool_def_to_mcp_descriptor)
2381            .collect();
2382        serde_json::json!({ "tools": tools })
2383    }
2384
2385    async fn tools_call(
2386        &self,
2387        params: Option<&serde_json::Value>,
2388    ) -> Result<serde_json::Value, ToolError> {
2389        let params = params.ok_or_else(|| ToolError::InvalidArgs("missing params".into()))?;
2390        let name = params
2391            .get("name")
2392            .and_then(|n| n.as_str())
2393            .ok_or_else(|| ToolError::InvalidArgs("missing tool name".into()))?;
2394        let args = params
2395            .get("arguments")
2396            .cloned()
2397            .unwrap_or(serde_json::Value::Null);
2398        let ctx = (self.tool_ctx_factory)().with_cancel(self.parent_cancel.child_token());
2399        let out = self.invoker.invoke(name, args, ctx).await?;
2400        Ok(serde_json::json!({
2401            "content": [
2402                { "type": "text", "text": out.to_string() }
2403            ]
2404        }))
2405    }
2406}
2407
/// Internal adapter: presents one [`Agent`] as a single-tool
/// [`ToolInvoker`] so the MCP dispatch loop need not learn about
/// agents. The wrapper mints an [`AgentContext`] per request via
/// `ctx_factory` and derives a child cancel token from the incoming
/// [`ToolCtx`], propagating both server-level and per-request
/// cancellation into the agent run.
struct AgentAsToolInvoker<A>
where
    A: Agent + 'static,
    A::Input: serde::de::DeserializeOwned + Send + 'static,
    A::Output: serde::Serialize + Send + 'static,
{
    /// The wrapped agent; `invoke` runs it once per request.
    agent: Arc<A>,
    /// Tool name advertised in the catalogue and matched against the
    /// name passed to `invoke`.
    name: String,
    /// JSON Schema advertised as the tool's schema in the catalogue.
    input_schema: serde_json::Value,
    /// Called once per `invoke` to mint the context handed to the agent.
    ctx_factory: AgentContextFactory,
    /// When the builder was wired with a governor, every agent run
    /// wraps `ctx.llm` with a [`GovernedLlmClient`] keyed off the
    /// tenant label. `None` keeps the legacy
    /// ungoverned path so adopters that govern out-of-band stay
    /// unaffected.
    #[cfg(feature = "governor")]
    governor: Option<crate::governor::GovernorBundle>,
}
2432
2433#[async_trait]
2434impl<A> ToolInvoker for AgentAsToolInvoker<A>
2435where
2436    A: Agent + 'static,
2437    A::Input: serde::de::DeserializeOwned + Send + 'static,
2438    A::Output: serde::Serialize + Send + 'static,
2439{
2440    fn catalogue(&self) -> Vec<ToolDef> {
2441        vec![ToolDef::new(
2442            self.name.clone(),
2443            format!("klieo agent: {}", self.name),
2444            self.input_schema.clone(),
2445        )]
2446    }
2447
2448    async fn invoke(
2449        &self,
2450        name: &str,
2451        args: serde_json::Value,
2452        tool_ctx: ToolCtx,
2453    ) -> Result<serde_json::Value, ToolError> {
2454        if name != self.name {
2455            return Err(ToolError::UnknownTool(name.into()));
2456        }
2457        // Sanitise wire-bound error messages: keep stable strings on
2458        // the response payload; log the full third-party Display
2459        // server-side. Mirrors the parse-error sanitisation already
2460        // applied in `handle_line` (CWE-209: no internal error
2461        // text — type names, file paths, response bodies — over the
2462        // transport boundary).
2463        let input: A::Input = serde_json::from_value(args).map_err(|e| {
2464            warn!(agent = %self.name, error = %e, "decode of MCP tools/call args failed");
2465            ToolError::InvalidArgs("arguments do not match inputSchema".into())
2466        })?;
2467        let mut ctx = (self.ctx_factory)();
2468        // Child token so both server-level cancel (carried into `tool_ctx`
2469        // by `tools_call`) and per-request cancel (carried in by the HTTP
2470        // streaming path's `DropGuard`) reach the agent without the agent
2471        // needing to know which origin triggered the disconnect.
2472        ctx.cancel = tool_ctx.cancel.child_token();
2473        ctx.progress = tool_ctx.progress.clone();
2474        // The verified raw principal stays server-side; hash it into a
2475        // non-PII label so the audit trail records attribution via
2476        // `Episode::RunAttributed` without admitting the principal into
2477        // agent memory or any LLM-visible surface (ADR-045).
2478        if let Some(principal) = tool_ctx.caller_principal.as_ref() {
2479            ctx = ctx.with_tenant_label(klieo_core::principal_hash(principal.as_str()));
2480        }
2481        // The HTTP boundary threads the cross-hop anchor only for
2482        // authenticated callers, so a stamped `Episode::RunOrigin` is
2483        // always co-attributable to the `RunAttributed` principal above.
2484        // Recorded verbatim; never reaches an LLM-visible surface.
2485        if let Some(anchor) = tool_ctx.parent_anchor.as_ref() {
2486            ctx = ctx.with_parent_anchor(anchor.as_str().to_string());
2487        }
2488        // Agents are governed only when a bundle was wired at
2489        // registration; ungoverned agents keep the legacy path (no hard
2490        // gate at `add_agent` time, unlike workflows).
2491        #[cfg(feature = "governor")]
2492        if let Some(bundle) = self.governor.as_ref() {
2493            ctx = crate::governor::wrap_ctx_with_governor(ctx, bundle);
2494        }
2495        let output = self.agent.run(ctx, input).await.map_err(|e| {
2496            warn!(agent = %self.name, error = %e, "exposed agent execution failed");
2497            ToolError::Permanent("agent execution failed".into())
2498        })?;
2499        serde_json::to_value(output).map_err(|e| {
2500            warn!(agent = %self.name, error = %e, "encode of agent output failed");
2501            ToolError::Permanent("agent output not serialisable".into())
2502        })
2503    }
2504}
2505
/// Internal multi-invoker: merges the catalogues of multiple
/// [`ToolInvoker`]s and routes each `invoke(name, ..)` to the inner
/// invoker that claims the named tool.
///
/// Construction walks every inner catalogue once and builds two
/// memoised structures:
/// - `routes: HashMap<tool_name, inner_index>` for O(1) dispatch.
/// - `merged_catalogue: Vec<ToolDef>` cached so `catalogue()` does
///   not re-walk the inner invokers per `tools/list` request.
///
/// Tool-name collisions are rejected at construction time —
/// [`Self::new`] returns [`McpBuildError::DuplicateTool`] for the
/// first duplicate so routing stays unambiguous (silent
/// first-match-wins is fragile when two agents share a name).
struct MergedInvoker {
    /// The merged invokers, in registration order; `routes` indexes
    /// into this vector.
    inner: Vec<Arc<dyn ToolInvoker>>,
    /// Tool name → index of the owning invoker in `inner`.
    routes: std::collections::HashMap<String, usize>,
    /// Concatenation of every inner catalogue, captured at construction.
    merged_catalogue: Vec<ToolDef>,
}
2525
2526impl MergedInvoker {
2527    fn new(inner: Vec<Arc<dyn ToolInvoker>>) -> Result<Self, McpBuildError> {
2528        let mut routes: std::collections::HashMap<String, usize> = std::collections::HashMap::new();
2529        let mut merged_catalogue: Vec<ToolDef> = Vec::new();
2530        for (index, invoker) in inner.iter().enumerate() {
2531            for tool in invoker.catalogue() {
2532                if routes.insert(tool.name.clone(), index).is_some() {
2533                    return Err(McpBuildError::DuplicateTool(tool.name));
2534                }
2535                merged_catalogue.push(tool);
2536            }
2537        }
2538        Ok(Self {
2539            inner,
2540            routes,
2541            merged_catalogue,
2542        })
2543    }
2544}
2545
2546#[async_trait]
2547impl ToolInvoker for MergedInvoker {
2548    fn catalogue(&self) -> Vec<ToolDef> {
2549        self.merged_catalogue.clone()
2550    }
2551
2552    async fn invoke(
2553        &self,
2554        name: &str,
2555        args: serde_json::Value,
2556        ctx: ToolCtx,
2557    ) -> Result<serde_json::Value, ToolError> {
2558        match self.routes.get(name) {
2559            Some(&index) => self.inner[index].invoke(name, args, ctx).await,
2560            None => Err(ToolError::UnknownTool(name.into())),
2561        }
2562    }
2563
2564    /// Forward to the inner invoker that owns `name` so per-tool
2565    /// idempotency overrides survive the merge. Tools whose owning
2566    /// invoker is not registered fall through to the default `false`.
2567    /// Cluster-0.24 follower re-invoke reads this on the orphan
2568    /// resume path; see [`crate::http::handle_dead_leader_orphan_mcp`].
2569    fn is_tool_idempotent(&self, name: &str) -> bool {
2570        match self.routes.get(name) {
2571            Some(&index) => self.inner[index].is_tool_idempotent(name),
2572            None => false,
2573        }
2574    }
2575
2576    /// Forward to the inner invoker that owns `name` so a PII-flagged
2577    /// tool's audit-redaction requirement survives the merge. Without
2578    /// this the decorator's default `false` would make dispatch record
2579    /// raw PII for any tool reached through a merged catalogue.
2580    fn tool_redacts_audit(&self, name: &str) -> bool {
2581        match self.routes.get(name) {
2582            Some(&index) => self.inner[index].tool_redacts_audit(name),
2583            None => false,
2584        }
2585    }
2586}
2587
2588/// Map a [`ToolError`] to a JSON-RPC error envelope. Applies the
2589/// standard CWE-209 redaction policy shared by both the JSON path
2590/// (in `handle_jsonrpc`) and the SSE streaming path.
2591///
2592/// - `UnknownTool(name)` — safe to echo; surfaces tool name verbatim.
2593/// - `InvalidArgs(reason)` — sanitised by the invoker layer; echo as-is.
2594/// - All other variants — log full error server-side, surface only the
2595///   stable `"tool invocation failed"` string.
2596pub(crate) fn tool_error_to_envelope(
2597    id: Option<serde_json::Value>,
2598    e: ToolError,
2599) -> serde_json::Value {
2600    let stable_msg = match &e {
2601        ToolError::UnknownTool(name) => {
2602            warn!(rpc_id = ?id, tool = %name, "tools/call: unknown tool");
2603            format!("unknown tool: {name}")
2604        }
2605        ToolError::InvalidArgs(reason) => {
2606            warn!(rpc_id = ?id, reason = %reason, "tools/call: invalid args");
2607            reason.clone()
2608        }
2609        _ => {
2610            warn!(rpc_id = ?id, error = %e, "tools/call failed");
2611            "tool invocation failed".into()
2612        }
2613    };
2614    rpc_error(id, JSONRPC_SERVER_ERROR, &stable_msg)
2615}
2616
/// Render one [`ToolDef`] as the JSON object listed under `tools` in a
/// `tools/list` result: `name` and `description` are copied across and
/// the definition's `json_schema` is exposed as `inputSchema`.
fn tool_def_to_mcp_descriptor(def: &ToolDef) -> serde_json::Value {
    serde_json::json!({
        "name": def.name,
        "description": def.description,
        "inputSchema": def.json_schema,
    })
}
2624
/// `initialize` result payload that declares the `sampling`
/// capability; delegates to `initialize_result_inner(true)`.
fn initialize_result_with_sampling() -> serde_json::Value {
    initialize_result_inner(true)
}
2628
/// `initialize` result payload that omits the `sampling` capability;
/// delegates to `initialize_result_inner(false)`.
fn initialize_result_without_sampling() -> serde_json::Value {
    initialize_result_inner(false)
}
2632
2633/// Build the JSON payload of an `initialize` response.
2634///
2635/// `with_sampling` gates the `capabilities.sampling = {}` field so
2636/// MCP-conformant clients listen for `sampling/createMessage` outbound
2637/// requests only when the server actually wires the outbound primitive.
2638fn initialize_result_inner(with_sampling: bool) -> serde_json::Value {
2639    let mut capabilities = serde_json::json!({ "tools": {} });
2640    if with_sampling {
2641        capabilities["sampling"] = serde_json::json!({});
2642    }
2643    serde_json::json!({
2644        "protocolVersion": MCP_PROTOCOL_VERSION,
2645        "capabilities": capabilities,
2646        "serverInfo": { "name": "klieo-mcp-server", "version": env!("CARGO_PKG_VERSION") }
2647    })
2648}
2649
/// Build a JSON-RPC 2.0 success envelope carrying `result` for request
/// `id`. An `id` of `None` is serialised as JSON `null`.
pub(crate) fn rpc_ok(id: Option<serde_json::Value>, result: serde_json::Value) -> serde_json::Value {
    serde_json::json!({ "jsonrpc": "2.0", "id": id, "result": result })
}
2653
/// Shape of an inbound JSON-RPC frame as classified by
/// [`classify_inbound`]. The stdio read loop maps each variant to a
/// distinct dispatch path (request vs notification vs server-initiated
/// response vs reject).
#[derive(Debug)]
enum InboundKind {
    /// `method` + `id`: peer-initiated request — dispatch + write the
    /// response. Only the presence of the `id` member is checked, not
    /// its JSON type.
    Request,
    /// `method`, no `id`: peer-initiated notification — dispatch for
    /// side effects only; no reply per JSON-RPC §4.1.
    Notification,
    /// `id` + (`result` or `error`), no `method`: response to a
    /// previous server-initiated outbound request. Carries the frame's
    /// `id`, which must be readable as an `i64` to land here.
    OutboundResponse(i64),
    /// No `method`, and either no `result`/`error` member or no `id`
    /// readable as an `i64` — peer is misbehaving; log + drop.
    Unparseable,
}
2673
2674/// Classify an already-parsed JSON-RPC frame by shape. Pure function:
2675/// no side effects, fully unit-testable.
2676fn classify_inbound(value: &serde_json::Value) -> InboundKind {
2677    let has_method = value.get("method").is_some();
2678    let id = value.get("id");
2679    if has_method {
2680        return if id.is_some() {
2681            InboundKind::Request
2682        } else {
2683            InboundKind::Notification
2684        };
2685    }
2686    let has_payload = value.get("result").is_some() || value.get("error").is_some();
2687    match (id.and_then(serde_json::Value::as_i64), has_payload) {
2688        (Some(id), true) => InboundKind::OutboundResponse(id),
2689        _ => InboundKind::Unparseable,
2690    }
2691}
2692
2693/// Serialise + flush a single JSON-RPC envelope onto the shared
2694/// transport writer. Holds the writer mutex across write + flush so
2695/// two frames cannot interleave on a single underlying file
2696/// descriptor (stdout in production, in-memory duplex in tests).
2697async fn write_frame(
2698    writer: &outbound::SharedWriter,
2699    envelope: &serde_json::Value,
2700) -> Result<(), McpServerError> {
2701    let bytes = serde_json::to_vec(envelope)?;
2702    let mut guard = writer.lock().await;
2703    guard.write_all(&bytes).await?;
2704    guard.write_all(b"\n").await?;
2705    guard.flush().await?;
2706    Ok(())
2707}
2708
2709pub(crate) fn rpc_error(
2710    id: Option<serde_json::Value>,
2711    code: i64,
2712    message: &str,
2713) -> serde_json::Value {
2714    serde_json::json!({
2715        "jsonrpc": "2.0",
2716        "id": id,
2717        "error": { "code": code, "message": message }
2718    })
2719}
2720
/// Construct a default-shape `ToolCtx` for use in downstream
/// integration tests. Hidden from rustdoc — not part of the
/// public API contract.
///
/// Thin public wrapper around the private `noop_ctx` helper; every
/// call returns a freshly built context.
#[doc(hidden)]
pub fn __test_noop_ctx() -> klieo_core::tool::ToolCtx {
    noop_ctx()
}
2728
2729fn noop_ctx() -> ToolCtx {
2730    let bus = klieo_bus_memory::MemoryBus::new();
2731    ToolCtx::new(bus.pubsub, bus.kv, bus.jobs)
2732}
2733
2734#[cfg(test)]
2735mod tests {
2736    use super::*;
2737    use async_trait::async_trait;
2738    use klieo_core::tool::Tool;
2739    use std::sync::OnceLock;
2740
2741    struct EmptyInvoker;
2742
2743    #[async_trait]
2744    impl ToolInvoker for EmptyInvoker {
2745        fn catalogue(&self) -> Vec<ToolDef> {
2746            Vec::new()
2747        }
2748        async fn invoke(
2749            &self,
2750            name: &str,
2751            _args: serde_json::Value,
2752            _ctx: ToolCtx,
2753        ) -> Result<serde_json::Value, ToolError> {
2754            Err(ToolError::UnknownTool(name.into()))
2755        }
2756    }
2757
2758    struct Echo;
2759
2760    #[async_trait]
2761    impl Tool for Echo {
2762        fn name(&self) -> &str {
2763            "echo"
2764        }
2765        fn description(&self) -> &str {
2766            "echoes back its args"
2767        }
2768        fn json_schema(&self) -> &serde_json::Value {
2769            static S: OnceLock<serde_json::Value> = OnceLock::new();
2770            S.get_or_init(|| serde_json::json!({"type": "object"}))
2771        }
2772        async fn invoke(
2773            &self,
2774            args: serde_json::Value,
2775            _ctx: ToolCtx,
2776        ) -> Result<serde_json::Value, ToolError> {
2777            Ok(args)
2778        }
2779    }
2780
2781    struct OneToolInvoker;
2782
2783    #[async_trait]
2784    impl ToolInvoker for OneToolInvoker {
2785        fn catalogue(&self) -> Vec<ToolDef> {
2786            vec![ToolDef::new(
2787                "echo",
2788                "echoes back its args",
2789                serde_json::json!({"type": "object"}),
2790            )]
2791        }
2792        async fn invoke(
2793            &self,
2794            name: &str,
2795            args: serde_json::Value,
2796            ctx: ToolCtx,
2797        ) -> Result<serde_json::Value, ToolError> {
2798            if name == "echo" {
2799                Echo.invoke(args, ctx).await
2800            } else {
2801                Err(ToolError::UnknownTool(name.into()))
2802            }
2803        }
2804    }
2805
2806    #[tokio::test]
2807    async fn initialize_returns_server_info() {
2808        let server = McpServer::expose_tools(Arc::new(OneToolInvoker));
2809        let resp = server
2810            .handle_line(r#"{"jsonrpc":"2.0","id":1,"method":"initialize","params":{}}"#)
2811            .await;
2812        let info = resp["result"]["serverInfo"]["name"].as_str().unwrap();
2813        assert_eq!(info, "klieo-mcp-server");
2814    }
2815
2816    #[tokio::test]
2817    async fn tools_list_surfaces_invoker_catalogue() {
2818        let server = McpServer::expose_tools(Arc::new(OneToolInvoker));
2819        let resp = server
2820            .handle_line(r#"{"jsonrpc":"2.0","id":2,"method":"tools/list"}"#)
2821            .await;
2822        let tools = resp["result"]["tools"].as_array().unwrap();
2823        assert_eq!(tools.len(), 1);
2824        assert_eq!(tools[0]["name"], "echo");
2825    }
2826
2827    #[tokio::test]
2828    async fn tools_call_dispatches_to_invoker() {
2829        let server = McpServer::expose_tools(Arc::new(OneToolInvoker));
2830        let resp = server
2831            .handle_line(
2832                r#"{"jsonrpc":"2.0","id":3,"method":"tools/call","params":{"name":"echo","arguments":{"hello":"world"}}}"#,
2833            )
2834            .await;
2835        let text = resp["result"]["content"][0]["text"].as_str().unwrap();
2836        assert!(text.contains("hello"));
2837        assert!(text.contains("world"));
2838    }
2839
2840    #[tokio::test]
2841    async fn unknown_method_returns_method_not_found() {
2842        let server = McpServer::expose_tools(Arc::new(OneToolInvoker));
2843        let resp = server
2844            .handle_line(r#"{"jsonrpc":"2.0","id":4,"method":"nope"}"#)
2845            .await;
2846        assert_eq!(resp["error"]["code"], JSONRPC_METHOD_NOT_FOUND);
2847    }
2848
2849    #[tokio::test]
2850    async fn tools_call_without_params_returns_server_error() {
2851        let server = McpServer::expose_tools(Arc::new(OneToolInvoker));
2852        let resp = server
2853            .handle_line(r#"{"jsonrpc":"2.0","id":5,"method":"tools/call"}"#)
2854            .await;
2855        assert_eq!(resp["error"]["code"], JSONRPC_SERVER_ERROR);
2856        assert!(resp["error"]["message"]
2857            .as_str()
2858            .unwrap()
2859            .contains("missing params"));
2860    }
2861
2862    #[tokio::test]
2863    async fn tools_call_unknown_tool_surfaces_invoker_error() {
2864        let server = McpServer::expose_tools(Arc::new(OneToolInvoker));
2865        let resp = server
2866            .handle_line(
2867                r#"{"jsonrpc":"2.0","id":6,"method":"tools/call","params":{"name":"does-not-exist","arguments":{}}}"#,
2868            )
2869            .await;
2870        assert_eq!(resp["error"]["code"], JSONRPC_SERVER_ERROR);
2871        assert!(resp["error"]["message"]
2872            .as_str()
2873            .unwrap()
2874            .contains("does-not-exist"));
2875    }
2876
2877    #[tokio::test]
2878    async fn malformed_frame_returns_sanitised_parse_error() {
2879        let server = McpServer::expose_tools(Arc::new(OneToolInvoker));
2880        let resp = server.handle_line("not json").await;
2881        assert_eq!(resp["error"]["code"], JSONRPC_PARSE_ERROR);
2882        // The peer must NOT see the underlying serde_json error text;
2883        // only the stable sanitised message.
2884        let msg = resp["error"]["message"].as_str().unwrap();
2885        assert_eq!(msg, "malformed JSON-RPC frame");
2886    }
2887
2888    #[tokio::test]
2889    async fn handle_jsonrpc_dispatches_initialize() {
2890        let server = McpServer::builder()
2891            .add_tools(Arc::new(EmptyInvoker))
2892            .build()
2893            .unwrap();
2894        let req = serde_json::json!({
2895            "jsonrpc": "2.0",
2896            "id": 1,
2897            "method": "initialize",
2898            "params": {}
2899        });
2900        let resp = server.handle_jsonrpc(req, None).await;
2901        assert_eq!(resp["jsonrpc"], "2.0");
2902        assert_eq!(resp["id"], 1);
2903        assert!(resp["result"].is_object());
2904    }
2905
2906    #[tokio::test]
2907    async fn handle_jsonrpc_returns_method_not_found_for_unknown() {
2908        let server = McpServer::builder()
2909            .add_tools(Arc::new(EmptyInvoker))
2910            .build()
2911            .unwrap();
2912        let req = serde_json::json!({
2913            "jsonrpc": "2.0",
2914            "id": 7,
2915            "method": "no_such_method"
2916        });
2917        let resp = server.handle_jsonrpc(req, None).await;
2918        assert_eq!(resp["error"]["code"], JSONRPC_METHOD_NOT_FOUND);
2919        assert_eq!(resp["id"], 7);
2920    }
2921
2922    #[tokio::test]
2923    async fn classify_inbound_recognises_request_shape() {
2924        let frame = serde_json::json!({
2925            "jsonrpc": "2.0",
2926            "id": 1,
2927            "method": "tools/list"
2928        });
2929        assert!(matches!(classify_inbound(&frame), InboundKind::Request));
2930    }
2931
2932    #[tokio::test]
2933    async fn classify_inbound_recognises_notification_shape() {
2934        let frame = serde_json::json!({
2935            "jsonrpc": "2.0",
2936            "method": "notifications/initialized"
2937        });
2938        assert!(matches!(
2939            classify_inbound(&frame),
2940            InboundKind::Notification
2941        ));
2942    }
2943
2944    #[tokio::test]
2945    async fn classify_inbound_recognises_outbound_result_shape() {
2946        let frame = serde_json::json!({
2947            "jsonrpc": "2.0",
2948            "id": 42,
2949            "result": {"role": "assistant"}
2950        });
2951        match classify_inbound(&frame) {
2952            InboundKind::OutboundResponse(id) => assert_eq!(id, 42),
2953            other => panic!("expected OutboundResponse(42), got {other:?}"),
2954        }
2955    }
2956
2957    #[tokio::test]
2958    async fn classify_inbound_recognises_outbound_error_shape() {
2959        let frame = serde_json::json!({
2960            "jsonrpc": "2.0",
2961            "id": 7,
2962            "error": {"code": -32601, "message": "Method not found"}
2963        });
2964        match classify_inbound(&frame) {
2965            InboundKind::OutboundResponse(id) => assert_eq!(id, 7),
2966            other => panic!("expected OutboundResponse(7), got {other:?}"),
2967        }
2968    }
2969
2970    #[tokio::test]
2971    async fn classify_inbound_rejects_no_method_no_id() {
2972        let frame = serde_json::json!({"jsonrpc": "2.0"});
2973        assert!(matches!(classify_inbound(&frame), InboundKind::Unparseable));
2974    }
2975
2976    #[tokio::test]
2977    async fn classify_inbound_rejects_bare_id_without_payload() {
2978        // `id` present, but neither `result` nor `error` and no
2979        // `method`. JSON-RPC does not define this shape; treat as
2980        // unparseable rather than guess the peer's intent.
2981        let frame = serde_json::json!({"jsonrpc": "2.0", "id": 9});
2982        assert!(matches!(classify_inbound(&frame), InboundKind::Unparseable));
2983    }
2984
    /// Shared in-memory byte buffer that [`BufferSink`] appends to and
    /// that tests read back via [`captured_bytes`]. Wrapped in
    /// `std::sync::Mutex` so `AsyncWrite::poll_write` can append
    /// synchronously without yielding to the runtime.
    type CapturedBytes = std::sync::Arc<std::sync::Mutex<Vec<u8>>>;
2989
2990    /// Mint a [`outbound::SharedWriter`] backed by an in-memory buffer
2991    /// and return the buffer handle so the test can read back what
2992    /// the loop wrote.
2993    fn duplex_writer() -> (outbound::SharedWriter, CapturedBytes) {
2994        let buffer: CapturedBytes = std::sync::Arc::new(std::sync::Mutex::new(Vec::new()));
2995        let shared: outbound::SharedWriter = Arc::new(Mutex::new(BufferSink(buffer.clone())));
2996        (shared, buffer)
2997    }
2998
2999    /// In-memory [`tokio::io::AsyncWrite`] sink that appends every
3000    /// byte into a shared `Vec<u8>` so the test can read back what the
3001    /// loop wrote out.
3002    struct BufferSink(CapturedBytes);
3003
3004    impl tokio::io::AsyncWrite for BufferSink {
3005        fn poll_write(
3006            self: std::pin::Pin<&mut Self>,
3007            _cx: &mut std::task::Context<'_>,
3008            buf: &[u8],
3009        ) -> std::task::Poll<std::io::Result<usize>> {
3010            self.0
3011                .lock()
3012                .expect("BufferSink mutex poisoned in test")
3013                .extend_from_slice(buf);
3014            std::task::Poll::Ready(Ok(buf.len()))
3015        }
3016
3017        fn poll_flush(
3018            self: std::pin::Pin<&mut Self>,
3019            _cx: &mut std::task::Context<'_>,
3020        ) -> std::task::Poll<std::io::Result<()>> {
3021            std::task::Poll::Ready(Ok(()))
3022        }
3023
3024        fn poll_shutdown(
3025            self: std::pin::Pin<&mut Self>,
3026            _cx: &mut std::task::Context<'_>,
3027        ) -> std::task::Poll<std::io::Result<()>> {
3028            std::task::Poll::Ready(Ok(()))
3029        }
3030    }
3031
3032    fn captured_bytes(buffer: &CapturedBytes) -> Vec<u8> {
3033        buffer
3034            .lock()
3035            .expect("captured-bytes mutex poisoned in test")
3036            .clone()
3037    }
3038
3039    #[tokio::test]
3040    async fn process_stdio_line_writes_response_for_request() {
3041        let server = McpServer::expose_tools(Arc::new(OneToolInvoker));
3042        let (writer, buffer) = duplex_writer();
3043        let request = r#"{"jsonrpc":"2.0","id":11,"method":"tools/list"}"#;
3044        server
3045            .process_stdio_line(request, &writer)
3046            .await
3047            .expect("stdio dispatch must not fail");
3048        let bytes = captured_bytes(&buffer);
3049        assert!(bytes.ends_with(b"\n"), "frames are newline-delimited");
3050        let envelope: serde_json::Value =
3051            serde_json::from_slice(bytes.trim_ascii_end()).expect("written frame must be JSON");
3052        assert_eq!(envelope["id"], 11);
3053        assert!(envelope["result"]["tools"].is_array());
3054    }
3055
3056    #[tokio::test]
3057    async fn process_stdio_line_drops_outbound_response_when_table_absent() {
3058        // Server built without the future `with_client_sampling` opt-in
3059        // has `outbound = None`. An incoming response shape must be
3060        // logged and dropped, NOT written back to the peer (that would
3061        // echo the same id back and confuse the client).
3062        let server = McpServer::expose_tools(Arc::new(OneToolInvoker));
3063        assert!(
3064            server
3065                .stdio_session
3066                .get()
3067                .and_then(|s| s.outbound.get())
3068                .is_none(),
3069            "default-built server must not wire an outbound table"
3070        );
3071        let (writer, buffer) = duplex_writer();
3072        let stray = r#"{"jsonrpc":"2.0","id":99,"result":{"role":"assistant"}}"#;
3073        server
3074            .process_stdio_line(stray, &writer)
3075            .await
3076            .expect("stray response must not break the loop");
3077        assert!(
3078            captured_bytes(&buffer).is_empty(),
3079            "stray outbound responses must never produce wire output"
3080        );
3081    }
3082
3083    #[tokio::test]
3084    async fn process_stdio_line_drops_notification_without_writing() {
3085        // Per JSON-RPC §4.1, a notification (method + no id) must not
3086        // produce a wire response — even when `handle_jsonrpc`'s
3087        // method-not-found path generates an envelope internally.
3088        let server = McpServer::expose_tools(Arc::new(OneToolInvoker));
3089        let (writer, buffer) = duplex_writer();
3090        let notification = r#"{"jsonrpc":"2.0","method":"notifications/initialized"}"#;
3091        server
3092            .process_stdio_line(notification, &writer)
3093            .await
3094            .expect("notification dispatch must not fail");
3095        assert!(
3096            captured_bytes(&buffer).is_empty(),
3097            "notifications must not produce wire output"
3098        );
3099    }
3100
3101    #[tokio::test]
3102    async fn process_stdio_line_drops_unparseable_frame() {
3103        // Frame missing both `method` and a usable `id` payload is
3104        // unparseable per `classify_inbound`. Must log + drop without
3105        // writing.
3106        let server = McpServer::expose_tools(Arc::new(OneToolInvoker));
3107        let (writer, buffer) = duplex_writer();
3108        let unparseable = r#"{"jsonrpc":"2.0"}"#;
3109        server
3110            .process_stdio_line(unparseable, &writer)
3111            .await
3112            .expect("unparseable frame must not break the loop");
3113        assert!(
3114            captured_bytes(&buffer).is_empty(),
3115            "unparseable frames must not produce wire output"
3116        );
3117    }
3118
3119    #[tokio::test]
3120    async fn process_stdio_line_writes_parse_error_for_malformed_json() {
3121        // Malformed bytes still produce a wire-visible parse error so
3122        // the peer can react. The detailed serde error stays server-
3123        // side; only the sanitised message reaches the wire.
3124        let server = McpServer::expose_tools(Arc::new(OneToolInvoker));
3125        let (writer, buffer) = duplex_writer();
3126        server
3127            .process_stdio_line("not json", &writer)
3128            .await
3129            .expect("parse-error path must not fail the loop");
3130        let bytes = captured_bytes(&buffer);
3131        let envelope: serde_json::Value =
3132            serde_json::from_slice(bytes.trim_ascii_end()).expect("parse-error envelope is JSON");
3133        assert_eq!(envelope["error"]["code"], JSONRPC_PARSE_ERROR);
3134        assert_eq!(envelope["error"]["message"], "malformed JSON-RPC frame");
3135    }
3136
3137    #[tokio::test]
3138    async fn initialize_arm_records_roots_capability_when_advertised() {
3139        // Client advertised `capabilities.roots = {}` → server records
3140        // `roots_supported = true` so the subsequent
3141        // `notifications/initialized` arm can drive the seed fetch.
3142        let server = McpServer::expose_tools(Arc::new(OneToolInvoker));
3143        let req = serde_json::json!({
3144            "jsonrpc": "2.0",
3145            "id": 400,
3146            "method": "initialize",
3147            "params": { "capabilities": { "roots": {} } }
3148        });
3149        server.handle_jsonrpc(req, None).await;
3150        assert!(
3151            server.client_caps.lock().await.roots_supported,
3152            "initialize must record advertised roots capability"
3153        );
3154    }
3155
3156    #[tokio::test]
3157    async fn initialize_arm_defaults_roots_unsupported_when_absent() {
3158        // No `capabilities.roots` on the initialize payload → server
3159        // must NOT spawn the seed fetch on `notifications/initialized`.
3160        let server = McpServer::expose_tools(Arc::new(OneToolInvoker));
3161        let req = serde_json::json!({
3162            "jsonrpc": "2.0",
3163            "id": 401,
3164            "method": "initialize",
3165            "params": { "capabilities": {} }
3166        });
3167        server.handle_jsonrpc(req, None).await;
3168        assert!(
3169            !server.client_caps.lock().await.roots_supported,
3170            "initialize must leave roots_supported=false when absent"
3171        );
3172    }
3173
3174    #[tokio::test]
3175    async fn initialize_result_includes_sampling_when_flag_set() {
3176        let payload = super::initialize_result_with_sampling();
3177        assert!(
3178            payload["capabilities"]["sampling"].is_object(),
3179            "initialize_result_with_sampling must surface capabilities.sampling; got: {payload}"
3180        );
3181    }
3182
3183    #[tokio::test]
3184    async fn initialize_result_omits_sampling_when_flag_unset() {
3185        let payload = super::initialize_result_without_sampling();
3186        assert!(
3187            payload["capabilities"].get("sampling").is_none(),
3188            "initialize_result_without_sampling must omit capabilities.sampling; got: {payload}"
3189        );
3190    }
3191
3192    #[tokio::test]
3193    async fn initialized_notification_returns_null_value() {
3194        // Per JSON-RPC §4.1 the dispatcher returns `Value::Null` for a
3195        // notification so the stdio loop's classifier drops the frame
3196        // without writing a reply. Asserts the response shape directly;
3197        // the spawned roots/list fetch (when roots_supported) is covered
3198        // by T9's integration suite.
3199        let server = McpServer::expose_tools(Arc::new(OneToolInvoker));
3200        let req = serde_json::json!({
3201            "jsonrpc": "2.0",
3202            "method": "notifications/initialized"
3203        });
3204        let resp = server.handle_jsonrpc(req, None).await;
3205        assert!(
3206            resp.is_null(),
3207            "notifications/initialized must yield a Null sentinel; got: {resp}"
3208        );
3209    }
3210
3211    #[tokio::test]
3212    async fn list_changed_notification_returns_null_value() {
3213        // `notifications/roots/list_changed` is a JSON-RPC notification —
3214        // the dispatcher returns `Value::Null` so the stdio loop's
3215        // classifier discards the frame without writing a reply.
3216        // Cache-refresh side effect is asserted end-to-end by T9.
3217        let server = McpServer::expose_tools(Arc::new(OneToolInvoker));
3218        let req = serde_json::json!({
3219            "jsonrpc": "2.0",
3220            "method": "notifications/roots/list_changed"
3221        });
3222        let resp = server.handle_jsonrpc(req, None).await;
3223        assert!(
3224            resp.is_null(),
3225            "notifications/roots/list_changed must yield a Null sentinel; got: {resp}"
3226        );
3227    }
3228
3229    #[tokio::test]
3230    async fn list_changed_when_cache_absent_is_noop() {
3231        // Default-built server (no `with_client_sampling`) never wires
3232        // the outbound primitive or the roots cache. The dispatch arm
3233        // must degrade silently — no panic, still returns the Null
3234        // sentinel so the stdio classifier drops the frame.
3235        let server = McpServer::expose_tools(Arc::new(OneToolInvoker));
3236        assert!(
3237            server
3238                .stdio_session
3239                .get()
3240                .and_then(|s| s.roots_cache.get())
3241                .is_none(),
3242            "default-built server must not wire a roots cache"
3243        );
3244        let req = serde_json::json!({
3245            "jsonrpc": "2.0",
3246            "method": "notifications/roots/list_changed"
3247        });
3248        let resp = server.handle_jsonrpc(req, None).await;
3249        assert!(
3250            resp.is_null(),
3251            "cache-absent list_changed must still yield Null; got: {resp}"
3252        );
3253    }
3254
3255    #[cfg(feature = "http")]
3256    #[tokio::test]
3257    async fn tool_ctx_with_progress_threads_cancel() {
3258        use std::sync::Arc;
3259        use tokio_util::sync::CancellationToken;
3260        let server = Arc::new(
3261            McpServer::builder()
3262                .add_tools(Arc::new(EmptyInvoker))
3263                .build()
3264                .unwrap(),
3265        );
3266        let (tx, _rx) = tokio::sync::broadcast::channel::<klieo_core::AgentEvent>(8);
3267        let token = CancellationToken::new();
3268        let ctx = server.tool_ctx_with_progress(tx, token.clone(), None, None);
3269        token.cancel();
3270        assert!(ctx.cancel.is_cancelled());
3271    }
3272
3273    mod expose_agent_tests {
3274        use super::*;
3275        use async_trait::async_trait;
3276        use klieo_core::agent::{Agent, AgentContext};
3277        use klieo_core::error::Error as KlieoError;
3278        use klieo_core::llm::ToolDef;
3279        use klieo_core::test_utils::fake_context;
3280        use serde::{Deserialize, Serialize};
3281
3282        #[derive(Debug, Clone, Deserialize, Serialize)]
3283        struct GreetIn {
3284            who: String,
3285        }
3286
3287        #[derive(Debug, Clone, Serialize)]
3288        struct GreetOut {
3289            greeting: String,
3290        }
3291
3292        struct Greeter;
3293
3294        #[async_trait]
3295        impl Agent for Greeter {
3296            type Input = GreetIn;
3297            type Output = GreetOut;
3298            type Error = KlieoError;
3299
3300            fn name(&self) -> &str {
3301                "greeter"
3302            }
3303            fn system_prompt(&self) -> &str {
3304                ""
3305            }
3306            fn tools(&self) -> &[ToolDef] {
3307                &[]
3308            }
3309            async fn run(
3310                &self,
3311                _ctx: AgentContext,
3312                input: GreetIn,
3313            ) -> Result<GreetOut, KlieoError> {
3314                Ok(GreetOut {
3315                    greeting: format!("hello {}", input.who),
3316                })
3317            }
3318        }
3319
3320        // Used by both the propagation and default-token tests so
3321        // each cancel scenario refers to the same Agent shape.
3322
3323        #[derive(Debug, Clone, Serialize)]
3324        struct CancelObserveOut {
3325            state: String,
3326        }
3327
3328        struct CancelObserver;
3329
3330        #[async_trait]
3331        impl Agent for CancelObserver {
3332            type Input = serde_json::Value;
3333            type Output = CancelObserveOut;
3334            type Error = KlieoError;
3335            fn name(&self) -> &str {
3336                "cancel-observer"
3337            }
3338            fn system_prompt(&self) -> &str {
3339                ""
3340            }
3341            fn tools(&self) -> &[ToolDef] {
3342                &[]
3343            }
3344            async fn run(
3345                &self,
3346                ctx: AgentContext,
3347                _input: serde_json::Value,
3348            ) -> Result<CancelObserveOut, KlieoError> {
3349                let state = if ctx.cancel.is_cancelled() {
3350                    "cancelled".into()
3351                } else {
3352                    "ran".into()
3353                };
3354                Ok(CancelObserveOut { state })
3355            }
3356        }
3357
        /// Context factory handed to the server in these tests: each
        /// call returns whatever `fake_context("greeter")` builds.
        fn fresh_ctx() -> AgentContext {
            fake_context("greeter")
        }
3361
3362        fn one_object_schema() -> serde_json::Value {
3363            serde_json::json!({
3364                "type": "object",
3365                "properties": {"who": {"type": "string"}},
3366                "required": ["who"]
3367            })
3368        }
3369
3370        #[tokio::test]
3371        async fn expose_agent_with_schema_lists_agent_as_single_tool() {
3372            let server = McpServer::expose_agent_with_schema(
3373                Greeter,
3374                one_object_schema(),
3375                Arc::new(fresh_ctx),
3376            );
3377            let resp = server
3378                .handle_line(r#"{"jsonrpc":"2.0","id":1,"method":"tools/list"}"#)
3379                .await;
3380            let tools = resp["result"]["tools"].as_array().unwrap();
3381            assert_eq!(tools.len(), 1);
3382            assert_eq!(tools[0]["name"], "greeter");
3383            assert_eq!(tools[0]["inputSchema"]["type"], "object");
3384        }
3385
3386        #[tokio::test]
3387        async fn expose_agent_with_schema_dispatches_tools_call_through_agent() {
3388            let server = McpServer::expose_agent_with_schema(
3389                Greeter,
3390                one_object_schema(),
3391                Arc::new(fresh_ctx),
3392            );
3393            let resp = server
3394                .handle_line(
3395                    r#"{"jsonrpc":"2.0","id":2,"method":"tools/call","params":{"name":"greeter","arguments":{"who":"world"}}}"#,
3396                )
3397                .await;
3398            let text = resp["result"]["content"][0]["text"].as_str().unwrap();
3399            assert!(
3400                text.contains(r#""greeting":"hello world""#),
3401                "tools/call must return serialised agent output; got: {text}"
3402            );
3403        }
3404
3405        #[tokio::test]
3406        async fn expose_agent_with_schema_rejects_unknown_tool_name() {
3407            let server = McpServer::expose_agent_with_schema(
3408                Greeter,
3409                one_object_schema(),
3410                Arc::new(fresh_ctx),
3411            );
3412            let resp = server
3413                .handle_line(
3414                    r#"{"jsonrpc":"2.0","id":3,"method":"tools/call","params":{"name":"not-greeter","arguments":{}}}"#,
3415                )
3416                .await;
3417            assert_eq!(resp["error"]["code"], JSONRPC_SERVER_ERROR);
3418            assert!(resp["error"]["message"]
3419                .as_str()
3420                .unwrap()
3421                .contains("not-greeter"));
3422        }
3423
3424        #[tokio::test]
3425        async fn expose_agent_with_schema_rejects_malformed_args() {
3426            let server = McpServer::expose_agent_with_schema(
3427                Greeter,
3428                one_object_schema(),
3429                Arc::new(fresh_ctx),
3430            );
3431            // `who` field missing — A::Input decode fails.
3432            let resp = server
3433                .handle_line(
3434                    r#"{"jsonrpc":"2.0","id":4,"method":"tools/call","params":{"name":"greeter","arguments":{}}}"#,
3435                )
3436                .await;
3437            assert_eq!(resp["error"]["code"], JSONRPC_SERVER_ERROR);
3438            let msg = resp["error"]["message"].as_str().unwrap();
3439            assert!(
3440                msg.contains("arguments do not match inputSchema"),
3441                "wire message must be the sanitised string; got: {msg}"
3442            );
3443            // The serde decode internals (struct names, paths) MUST NOT
3444            // surface to the peer — verify they are NOT present.
3445            assert!(
3446                !msg.contains("GreetIn") && !msg.contains("missing field"),
3447                "internal decode detail must not leak: {msg}"
3448            );
3449        }
3450
        /// Agent execution errors are sanitised over the wire — the
        /// peer sees a message containing the stable
        /// `"tool invocation failed"` string rather than the agent's
        /// own error text (CWE-209). Server-side logging of the full
        /// chain is not asserted here.
        /// Asserts both directions: stable string present + internal
        /// secret-bearing content absent.
        #[tokio::test]
        async fn expose_agent_sanitises_run_error_on_wire() {
            // Agent fixture whose `run` always fails with an error
            // carrying secret-looking text and a URL.
            struct Failing;
            #[async_trait]
            impl Agent for Failing {
                type Input = serde_json::Value;
                type Output = serde_json::Value;
                type Error = KlieoError;
                fn name(&self) -> &str {
                    "failing"
                }
                fn system_prompt(&self) -> &str {
                    ""
                }
                fn tools(&self) -> &[ToolDef] {
                    &[]
                }
                async fn run(
                    &self,
                    _ctx: AgentContext,
                    _input: serde_json::Value,
                ) -> Result<serde_json::Value, KlieoError> {
                    Err(KlieoError::BadResponse(
                        "internal: token=secret-abc upstream=https://provider/url".into(),
                    ))
                }
            }
            let server = McpServer::expose_agent_with_schema(
                Failing,
                serde_json::json!({}),
                Arc::new(fresh_ctx),
            );
            let resp = server
                .handle_line(
                    r#"{"jsonrpc":"2.0","id":99,"method":"tools/call","params":{"name":"failing","arguments":{}}}"#,
                )
                .await;
            assert_eq!(resp["error"]["code"], JSONRPC_SERVER_ERROR);
            let msg = resp["error"]["message"].as_str().unwrap();
            assert!(
                msg.contains("tool invocation failed"),
                "wire message must contain the sanitised stable string; got: {msg}"
            );
            // The agent's internal error payload (secret-like content,
            // URLs) MUST NOT reach the peer.
            assert!(
                !msg.contains("secret-abc") && !msg.contains("https://"),
                "internal error detail must not leak: {msg}"
            );
        }
3506
3507        /// Cancellation propagation via the builder: cancelling the
3508        /// parent token mid-flight cancels every in-flight agent's
3509        /// `ctx.cancel`. Agent observes `ctx.cancel.is_cancelled()`
3510        /// and reports `"ran"` or `"cancelled"` in its Output so the
3511        /// test can distinguish before and after `parent.cancel()`.
3512        #[tokio::test]
3513        async fn builder_propagates_parent_cancel_into_ctx() {
3514            let parent = CancellationToken::new();
3515            let server = McpServer::builder()
3516                .with_parent_cancel(parent.clone())
3517                .add_agent_with_schema(CancelObserver, serde_json::json!({}), Arc::new(fresh_ctx))
3518                .build()
3519                .unwrap();
3520
3521            // Uncancelled parent → agent sees a live cancel token.
3522            let resp = server
3523                .handle_line(
3524                    r#"{"jsonrpc":"2.0","id":200,"method":"tools/call","params":{"name":"cancel-observer","arguments":{}}}"#,
3525                )
3526                .await;
3527            let text = resp["result"]["content"][0]["text"].as_str().unwrap();
3528            assert!(
3529                text.contains(r#""state":"ran""#),
3530                "live parent token must produce live ctx.cancel; got: {text}"
3531            );
3532
3533            // Cancel the parent; new invocations must observe a
3534            // cancelled token via the child-token override.
3535            parent.cancel();
3536            let resp = server
3537                .handle_line(
3538                    r#"{"jsonrpc":"2.0","id":201,"method":"tools/call","params":{"name":"cancel-observer","arguments":{}}}"#,
3539                )
3540                .await;
3541            let text = resp["result"]["content"][0]["text"].as_str().unwrap();
3542            assert!(
3543                text.contains(r#""state":"cancelled""#),
3544                "cancelled parent must propagate into ctx.cancel via child_token; got: {text}"
3545            );
3546        }
3547
3548        /// Streaming path: `tool_ctx_with_progress(tx, token)` threads
3549        /// the request-scoped token into the `ToolCtx`; `AgentAsToolInvoker`
3550        /// derives a child and places it on the minted `AgentContext`. The
3551        /// `CancelObserver` agent reports its `ctx.cancel` state so the
3552        /// test can assert end-to-end propagation without needing SSE wiring.
3553        #[cfg(feature = "http")]
3554        #[tokio::test]
3555        async fn tool_ctx_with_progress_cancel_cascades_into_agent_context() {
3556            let request_cancel = CancellationToken::new();
3557            let server = Arc::new(
3558                McpServer::builder()
3559                    .add_agent_with_schema(
3560                        CancelObserver,
3561                        serde_json::json!({}),
3562                        Arc::new(fresh_ctx),
3563                    )
3564                    .build()
3565                    .unwrap(),
3566            );
3567            let (tx, _rx) = tokio::sync::broadcast::channel::<klieo_core::AgentEvent>(8);
3568            request_cancel.cancel();
3569            let tool_ctx = server.tool_ctx_with_progress(tx, request_cancel, None, None);
3570            let result = server
3571                .invoker
3572                .invoke("cancel-observer", serde_json::json!({}), tool_ctx)
3573                .await
3574                .unwrap();
3575            let text = result.to_string();
3576            assert!(
3577                text.contains(r#""state":"cancelled""#),
3578                "cancelled request token must cascade into AgentContext.cancel; got: {text}"
3579            );
3580        }
3581
3582        /// The shorthand `expose_agent_with_schema` ctor — which
3583        /// delegates to the builder without ever calling
3584        /// `with_parent_cancel` — yields a fresh, never-cancelled
3585        /// default token. Agents see `ctx.cancel.is_cancelled() ==
3586        /// false`. Guards against accidental wiring of an external
3587        /// token into the default path.
3588        #[tokio::test]
3589        async fn shim_ctor_uses_default_uncancelled_parent_token() {
3590            let server = McpServer::expose_agent_with_schema(
3591                CancelObserver,
3592                serde_json::json!({}),
3593                Arc::new(fresh_ctx),
3594            );
3595            let resp = server
3596                .handle_line(
3597                    r#"{"jsonrpc":"2.0","id":202,"method":"tools/call","params":{"name":"cancel-observer","arguments":{}}}"#,
3598                )
3599                .await;
3600            let text = resp["result"]["content"][0]["text"].as_str().unwrap();
3601            assert!(
3602                text.contains(r#""state":"ran""#),
3603                "shim ctor must default to a never-cancelled parent token; got: {text}"
3604            );
3605        }
3606
3607        /// An `AgentAsToolInvoker` receiving a
3608        /// `ToolCtx` with `caller_principal=Some(p)` mints an
3609        /// `AgentContext` whose `tenant_label=principal_hash(p)`. The
3610        /// run loop then records exactly one `Episode::RunAttributed`
3611        /// carrying that label, and the raw principal never reaches
3612        /// episodic memory or short-term memory.
3613        #[tokio::test]
3614        async fn agent_as_tool_invoker_installs_tenant_label_from_caller_principal() {
3615            use klieo_core::test_utils::{noop_bus, FakeLlmClient, FakeLlmStep};
3616            const PRINCIPAL: &str = "alice@example.com";
3617
3618            struct EchoLoopAgent;
3619
3620            #[async_trait]
3621            impl Agent for EchoLoopAgent {
3622                type Input = serde_json::Value;
3623                type Output = serde_json::Value;
3624                type Error = KlieoError;
3625                fn name(&self) -> &str {
3626                    "echo-loop"
3627                }
3628                fn system_prompt(&self) -> &str {
3629                    ""
3630                }
3631                fn tools(&self) -> &[ToolDef] {
3632                    &[]
3633                }
3634                async fn run(
3635                    &self,
3636                    ctx: AgentContext,
3637                    _input: serde_json::Value,
3638                ) -> Result<serde_json::Value, KlieoError> {
3639                    let out = klieo_core::runtime::run_steps(
3640                        &ctx,
3641                        "",
3642                        klieo_core::ids::ThreadId::new("echo-loop-thread"),
3643                        klieo_core::runtime::RunOptions::default(),
3644                    )
3645                    .await?;
3646                    Ok(serde_json::Value::String(out))
3647                }
3648            }
3649
3650            let mut ctx_seed = fake_context("echo-loop");
3651            ctx_seed.llm = Arc::new(
3652                FakeLlmClient::new("fake").with_steps(vec![FakeLlmStep::Text("done".into())]),
3653            );
3654            let episodic_for_probe = ctx_seed.episodic.clone();
3655            let short_term_for_probe = ctx_seed.short_term.clone();
3656            let run_id_for_probe = ctx_seed.run_id;
3657
3658            let slot = Arc::new(std::sync::Mutex::new(Some(ctx_seed)));
3659            let ctx_factory: AgentContextFactory = Arc::new(move || {
3660                slot.lock()
3661                    .unwrap()
3662                    .take()
3663                    .expect("ctx_factory called more than once")
3664            });
3665            let server = McpServer::builder()
3666                .add_agent_with_schema(
3667                    EchoLoopAgent,
3668                    serde_json::json!({"type": "object"}),
3669                    ctx_factory,
3670                )
3671                .build()
3672                .unwrap();
3673            let (pubsub, _, kv, jobs) = noop_bus();
3674            let tool_ctx = klieo_core::tool::ToolCtx::new(pubsub, kv, jobs)
3675                .with_caller_principal(PRINCIPAL.into());
3676            let _ = server
3677                .invoker
3678                .invoke(
3679                    "echo-loop",
3680                    serde_json::json!({}),
3681                    tool_ctx,
3682                )
3683                .await
3684                .unwrap();
3685
3686            let expected = klieo_core::principal_hash(PRINCIPAL);
3687            let episodes = episodic_for_probe.replay(run_id_for_probe).await.unwrap();
3688            let labels: Vec<&str> = episodes
3689                .iter()
3690                .filter_map(|e| match e {
3691                    klieo_core::Episode::RunAttributed { tenant_label } => {
3692                        Some(tenant_label.as_str())
3693                    }
3694                    _ => None,
3695                })
3696                .collect();
3697            assert_eq!(
3698                labels,
3699                vec![expected.as_str()],
3700                "exactly one RunAttributed carrying principal_hash; got {episodes:?}",
3701            );
3702            for ep in &episodes {
3703                let payload = serde_json::to_string(ep).unwrap();
3704                assert!(
3705                    !payload.contains(PRINCIPAL),
3706                    "raw principal leaked into recorded episode: {payload}",
3707                );
3708            }
3709            let history = short_term_for_probe
3710                .load(klieo_core::ids::ThreadId::new("echo-loop-thread"), 8192)
3711                .await
3712                .unwrap_or_default();
3713            for msg in &history {
3714                assert!(
3715                    !msg.content.contains(PRINCIPAL),
3716                    "principal leaked into short-term memory: {}",
3717                    msg.content
3718                );
3719            }
3720        }
3721
3722        /// An `AgentAsToolInvoker` receiving a `ToolCtx`
3723        /// with `parent_anchor=Some(a)` records exactly one
3724        /// `Episode::RunOrigin` carrying the anchor **verbatim** (not
3725        /// hashed/rewritten), co-emitted with the `RunAttributed`
3726        /// attribution for the authenticated caller, and never admits the
3727        /// anchor into short-term (LLM-visible) memory.
3728        #[tokio::test]
3729        async fn agent_as_tool_invoker_records_run_origin_from_parent_anchor() {
3730            use klieo_core::test_utils::{noop_bus, FakeLlmClient, FakeLlmStep};
3731            const PRINCIPAL: &str = "alice@example.com";
3732            const ANCHOR: &str = "sha256:deadbeefcafe0123";
3733
3734            struct EchoLoopAgent;
3735
3736            #[async_trait]
3737            impl Agent for EchoLoopAgent {
3738                type Input = serde_json::Value;
3739                type Output = serde_json::Value;
3740                type Error = KlieoError;
3741                fn name(&self) -> &str {
3742                    "echo-origin"
3743                }
3744                fn system_prompt(&self) -> &str {
3745                    ""
3746                }
3747                fn tools(&self) -> &[ToolDef] {
3748                    &[]
3749                }
3750                async fn run(
3751                    &self,
3752                    ctx: AgentContext,
3753                    _input: serde_json::Value,
3754                ) -> Result<serde_json::Value, KlieoError> {
3755                    let out = klieo_core::runtime::run_steps(
3756                        &ctx,
3757                        "",
3758                        klieo_core::ids::ThreadId::new("echo-origin-thread"),
3759                        klieo_core::runtime::RunOptions::default(),
3760                    )
3761                    .await?;
3762                    Ok(serde_json::Value::String(out))
3763                }
3764            }
3765
3766            let mut ctx_seed = fake_context("echo-origin");
3767            ctx_seed.llm = Arc::new(
3768                FakeLlmClient::new("fake").with_steps(vec![FakeLlmStep::Text("done".into())]),
3769            );
3770            let episodic_for_probe = ctx_seed.episodic.clone();
3771            let short_term_for_probe = ctx_seed.short_term.clone();
3772            let run_id_for_probe = ctx_seed.run_id;
3773
3774            let slot = Arc::new(std::sync::Mutex::new(Some(ctx_seed)));
3775            let ctx_factory: AgentContextFactory = Arc::new(move || {
3776                slot.lock()
3777                    .unwrap()
3778                    .take()
3779                    .expect("ctx_factory called more than once")
3780            });
3781            let server = McpServer::builder()
3782                .add_agent_with_schema(
3783                    EchoLoopAgent,
3784                    serde_json::json!({"type": "object"}),
3785                    ctx_factory,
3786                )
3787                .build()
3788                .unwrap();
3789            let (pubsub, _, kv, jobs) = noop_bus();
3790            let tool_ctx = klieo_core::tool::ToolCtx::new(pubsub, kv, jobs)
3791                .with_caller_principal(PRINCIPAL.into())
3792                .with_parent_anchor(ANCHOR.into());
3793            let _ = server
3794                .invoker
3795                .invoke("echo-origin", serde_json::json!({}), tool_ctx)
3796                .await
3797                .unwrap();
3798
3799            let episodes = episodic_for_probe.replay(run_id_for_probe).await.unwrap();
3800            let anchors: Vec<&str> = episodes
3801                .iter()
3802                .filter_map(|e| match e {
3803                    klieo_core::Episode::RunOrigin { parent_anchor } => {
3804                        Some(parent_anchor.as_str())
3805                    }
3806                    _ => None,
3807                })
3808                .collect();
3809            assert_eq!(
3810                anchors,
3811                vec![ANCHOR],
3812                "exactly one RunOrigin carrying the verbatim anchor; got {episodes:?}",
3813            );
3814            // Co-attribution: RunOrigin never stands alone — the
3815            // authenticated principal is recorded in the same run so the
3816            // unverified parent claim is attributable.
3817            let attributed = episodes
3818                .iter()
3819                .filter(|e| matches!(e, klieo_core::Episode::RunAttributed { .. }))
3820                .count();
3821            assert_eq!(
3822                attributed, 1,
3823                "RunOrigin co-emitted with exactly one RunAttributed; got {episodes:?}",
3824            );
3825            let history = short_term_for_probe
3826                .load(klieo_core::ids::ThreadId::new("echo-origin-thread"), 8192)
3827                .await
3828                .unwrap_or_default();
3829            for msg in &history {
3830                assert!(
3831                    !msg.content.contains(ANCHOR),
3832                    "anchor leaked into short-term memory: {}",
3833                    msg.content
3834                );
3835            }
3836        }
3837
3838        /// `build()` returns `NoInvokers` when called with zero
3839        /// invokers — the server has nothing to serve. Documents
3840        /// the typed-error guard.
3841        #[tokio::test]
3842        async fn builder_with_no_invokers_returns_no_invokers_error() {
3843            let result = McpServer::builder().build();
3844            assert!(matches!(result, Err(McpBuildError::NoInvokers)));
3845        }
3846
3847        /// Opting into client sampling via the builder threads the
3848        /// flag onto the built `McpServer`. The capability-negotiation
3849        /// arm reads it to gate `capabilities.sampling = {}` on the
3850        /// initialize response.
3851        #[tokio::test]
3852        async fn with_client_sampling_sets_capability_flag() {
3853            let server = McpServer::builder()
3854                .with_client_sampling()
3855                .add_agent_with_schema(Greeter, one_object_schema(), Arc::new(fresh_ctx))
3856                .build()
3857                .unwrap();
3858            assert!(
3859                server.declare_sampling,
3860                "with_client_sampling() must set declare_sampling=true on the built server"
3861            );
3862        }
3863
3864        /// A builder that never calls `with_client_sampling()` produces
3865        /// a server with the flag unset. Initialize must NOT advertise
3866        /// `capabilities.sampling` for tools-only deployments.
3867        #[tokio::test]
3868        async fn default_builder_does_not_declare_sampling() {
3869            let server = McpServer::builder()
3870                .add_agent_with_schema(Greeter, one_object_schema(), Arc::new(fresh_ctx))
3871                .build()
3872                .unwrap();
3873            assert!(
3874                !server.declare_sampling,
3875                "default builder must leave declare_sampling=false"
3876            );
3877        }
3878
3879        /// A builder that never calls `with_session_idle_timeout`
3880        /// produces a server whose idle deadline matches
3881        /// `DEFAULT_SESSION_IDLE_TIMEOUT` (5 minutes per ADR-028).
3882        /// Also pins that the per-session SSE tx + activity clock
3883        /// fields are initialised to their pre-session sentinels.
3884        #[cfg(feature = "http")]
3885        #[tokio::test]
3886        async fn default_session_idle_timeout_is_5min() {
3887            let server = McpServer::builder()
3888                .add_agent_with_schema(Greeter, one_object_schema(), Arc::new(fresh_ctx))
3889                .build()
3890                .unwrap();
3891            assert_eq!(
3892                server.session_idle_timeout,
3893                std::time::Duration::from_secs(300),
3894                "default session idle timeout must be 5 minutes"
3895            );
3896            assert!(
3897                server.sessions.read().await.is_empty(),
3898                "HTTP server must hold zero sessions before any initialize POST"
3899            );
3900        }
3901
3902        /// `with_session_idle_timeout(ttl)` overrides the default
3903        /// idle deadline on the built server. Pins the builder
3904        /// method threads its argument through `build_inner`.
3905        #[cfg(feature = "http")]
3906        #[tokio::test]
3907        async fn with_session_idle_timeout_overrides_default() {
3908            let server = McpServer::builder()
3909                .add_agent_with_schema(Greeter, one_object_schema(), Arc::new(fresh_ctx))
3910                .with_session_idle_timeout(std::time::Duration::from_secs(42))
3911                .build()
3912                .unwrap();
3913            assert_eq!(
3914                server.session_idle_timeout,
3915                std::time::Duration::from_secs(42),
3916                "with_session_idle_timeout must override the default"
3917            );
3918        }
3919
3920        /// `with_session_idle_timeout(Duration::ZERO)` records the
3921        /// disabled-watchdog sentinel. The watchdog task spawned on
3922        /// `initialize` reads this and returns immediately when zero.
3923        #[cfg(feature = "http")]
3924        #[tokio::test]
3925        async fn zero_duration_records_disabled_watchdog() {
3926            let server = McpServer::builder()
3927                .add_agent_with_schema(Greeter, one_object_schema(), Arc::new(fresh_ctx))
3928                .with_session_idle_timeout(std::time::Duration::ZERO)
3929                .build()
3930                .unwrap();
3931            assert_eq!(
3932                server.session_idle_timeout,
3933                std::time::Duration::ZERO,
3934                "Duration::ZERO must thread through to record disabled-watchdog intent"
3935            );
3936        }
3937
3938        /// `DEFAULT_MAX_SESSIONS` pins the concurrent-HTTP-session cap
3939        /// at 1024. Operators tuning the knob downward are calling
3940        /// `with_max_sessions` against this baseline; the value must
3941        /// not drift without a CHANGELOG entry.
3942        #[cfg(feature = "http")]
3943        #[test]
3944        fn default_max_sessions_is_1024() {
3945            assert_eq!(DEFAULT_MAX_SESSIONS, 1024);
3946        }
3947
3948        /// `with_max_sessions(cap)` overrides the
3949        /// [`DEFAULT_MAX_SESSIONS`] baseline on the built server.
3950        /// Pins that the builder method threads its argument through
3951        /// `build_inner` into the `max_sessions` field read by
3952        /// `handle_initialize_post`.
3953        #[cfg(feature = "http")]
3954        #[tokio::test]
3955        async fn with_max_sessions_overrides_default() {
3956            let server = McpServer::builder()
3957                .add_agent_with_schema(Greeter, one_object_schema(), Arc::new(fresh_ctx))
3958                .with_max_sessions(64)
3959                .build()
3960                .unwrap();
3961            assert_eq!(
3962                server.max_sessions, 64,
3963                "with_max_sessions must override the default cap"
3964            );
3965        }
3966
3967        /// `with_max_sessions(0)` panics — a zero cap would deadlock
3968        /// the `initialize` POST path with no recovery, so the
3969        /// builder rejects it eagerly rather than producing a
3970        /// permanently 503ing server.
3971        #[cfg(feature = "http")]
3972        #[test]
3973        #[should_panic(expected = "max_sessions must be > 0")]
3974        fn with_max_sessions_panics_on_zero() {
3975            let _ = McpServer::builder().with_max_sessions(0);
3976        }
3977
3978        /// `DEFAULT_MAX_SESSIONS_PER_PRINCIPAL_DIVISOR` pins the
3979        /// divisor used by [`default_max_sessions_per_principal`] at
3980        /// 16. Operators see the default sub-cap as
3981        /// `max_sessions / 16`; drifting the constant changes the
3982        /// implicit cap on every untouched deployment.
3983        #[cfg(feature = "http")]
3984        #[test]
3985        fn default_divisor_is_sixteen() {
3986            assert_eq!(DEFAULT_MAX_SESSIONS_PER_PRINCIPAL_DIVISOR, 16);
3987        }
3988
3989        /// `default_max_sessions_per_principal` derives the sub-cap
3990        /// by dividing `max_sessions` by
3991        /// [`DEFAULT_MAX_SESSIONS_PER_PRINCIPAL_DIVISOR`]. Pins the
3992        /// production-baseline pair (1024 → 64) plus a smaller
3993        /// configuration (32 → 2) to lock in the linear scaling.
3994        #[cfg(feature = "http")]
3995        #[test]
3996        fn default_per_principal_derives_from_max_sessions() {
3997            assert_eq!(default_max_sessions_per_principal(1024), 64);
3998            assert_eq!(default_max_sessions_per_principal(32), 2);
3999        }
4000
4001        /// `default_max_sessions_per_principal` floors at 1 when
4002        /// integer division would yield 0. Guarantees the default
4003        /// sub-cap is always reachable so authenticated `initialize`
4004        /// POSTs never face a permanent 503 under small
4005        /// `max_sessions` values.
4006        #[cfg(feature = "http")]
4007        #[test]
4008        fn default_per_principal_floors_at_one() {
4009            assert_eq!(default_max_sessions_per_principal(0), 1);
4010            assert_eq!(default_max_sessions_per_principal(15), 1);
4011            assert_eq!(default_max_sessions_per_principal(16), 1);
4012        }
4013
4014        /// `with_max_sessions_per_principal(cap)` overrides the
4015        /// default sub-cap on the built server. Pins that the builder
4016        /// method threads its argument through `build_inner` into the
4017        /// `max_sessions_per_principal` field read by the admission
4018        /// branch on `handle_initialize_post`.
4019        #[cfg(feature = "http")]
4020        #[tokio::test]
4021        async fn with_max_sessions_per_principal_overrides_default() {
4022            let server = McpServer::builder()
4023                .add_agent_with_schema(Greeter, one_object_schema(), Arc::new(fresh_ctx))
4024                .with_max_sessions(1024)
4025                .with_max_sessions_per_principal(8)
4026                .build()
4027                .unwrap();
4028            assert_eq!(
4029                server.max_sessions_per_principal, 8,
4030                "with_max_sessions_per_principal must override the default sub-cap"
4031            );
4032        }
4033
4034        /// `with_max_sessions_per_principal(0)` panics — a zero
4035        /// sub-cap rejects every authenticated `initialize` with no
4036        /// recovery, so the builder rejects it eagerly rather than
4037        /// producing a permanently 503ing server for any
4038        /// authenticated principal.
4039        #[cfg(feature = "http")]
4040        #[test]
4041        #[should_panic(expected = "max_sessions_per_principal must be > 0")]
4042        fn with_max_sessions_per_principal_panics_on_zero() {
4043            let _ = McpServer::builder().with_max_sessions_per_principal(0);
4044        }
4045
4046        /// `DEFAULT_SSE_REPLAY_CAPACITY` pins the production-baseline
4047        /// per-session SSE replay buffer capacity at 256 frames.
4048        /// Operators observing the implicit ring size depend on this
4049        /// number; a drift would silently change memory pressure on
4050        /// every untouched deployment.
4051        #[cfg(feature = "http")]
4052        #[test]
4053        fn default_sse_replay_capacity_is_256() {
4054            assert_eq!(DEFAULT_SSE_REPLAY_CAPACITY, 256);
4055        }
4056
4057        /// `with_sse_replay_capacity(capacity)` overrides the default
4058        /// on the built server. Pins that the builder method threads
4059        /// its argument through `build_inner` into the
4060        /// `sse_replay_capacity` field, and that `sse_replay_enabled()`
4061        /// returns `true` for positive capacities and `false` for 0
4062        /// (the disable knob).
4063        #[cfg(feature = "http")]
4064        #[tokio::test]
4065        async fn with_sse_replay_capacity_overrides_default() {
4066            let server = McpServer::builder()
4067                .add_agent_with_schema(Greeter, one_object_schema(), Arc::new(fresh_ctx))
4068                .with_sse_replay_capacity(8)
4069                .build()
4070                .unwrap();
4071            assert_eq!(server.sse_replay_capacity, 8);
4072            assert!(server.sse_replay_enabled());
4073
4074            let off = McpServer::builder()
4075                .add_agent_with_schema(Greeter, one_object_schema(), Arc::new(fresh_ctx))
4076                .with_sse_replay_capacity(0)
4077                .build()
4078                .unwrap();
4079            assert_eq!(off.sse_replay_capacity, 0);
4080            assert!(!off.sse_replay_enabled());
4081        }
4082
4083        /// Two agents registered via repeated `add_agent_with_schema`
4084        /// both show up in `tools/list` and each `tools/call` routes
4085        /// to the correct agent through `MergedInvoker`.
4086        #[tokio::test]
4087        async fn builder_supports_multi_agent_dispatch() {
4088            let server = McpServer::builder()
4089                .add_agent_with_schema(Greeter, one_object_schema(), Arc::new(fresh_ctx))
4090                .add_agent_with_schema(CancelObserver, serde_json::json!({}), Arc::new(fresh_ctx))
4091                .build()
4092                .unwrap();
4093
4094            // tools/list surfaces both agents.
4095            let resp = server
4096                .handle_line(r#"{"jsonrpc":"2.0","id":300,"method":"tools/list"}"#)
4097                .await;
4098            let tools = resp["result"]["tools"].as_array().unwrap();
4099            let names: Vec<&str> = tools.iter().map(|t| t["name"].as_str().unwrap()).collect();
4100            assert_eq!(tools.len(), 2);
4101            assert!(names.contains(&"greeter"));
4102            assert!(names.contains(&"cancel-observer"));
4103
4104            // tools/call to greeter dispatches to Greeter.
4105            let resp = server
4106                .handle_line(
4107                    r#"{"jsonrpc":"2.0","id":301,"method":"tools/call","params":{"name":"greeter","arguments":{"who":"multi"}}}"#,
4108                )
4109                .await;
4110            let text = resp["result"]["content"][0]["text"].as_str().unwrap();
4111            assert!(text.contains(r#""greeting":"hello multi""#));
4112
4113            // tools/call to cancel-observer dispatches to CancelObserver.
4114            let resp = server
4115                .handle_line(
4116                    r#"{"jsonrpc":"2.0","id":302,"method":"tools/call","params":{"name":"cancel-observer","arguments":{}}}"#,
4117                )
4118                .await;
4119            let text = resp["result"]["content"][0]["text"].as_str().unwrap();
4120            assert!(text.contains(r#""state":"ran""#));
4121        }
4122
4123        /// Parent-cancel token registered ONCE on the builder
4124        /// propagates into every agent the builder produces. Pinning
4125        /// this prevents a future refactor from giving each
4126        /// `add_agent_*` call its own copy that doesn't share the
4127        /// cancellation chain.
4128        #[tokio::test]
4129        async fn builder_parent_cancel_propagates_into_every_agent() {
4130            let parent = CancellationToken::new();
4131            let server = McpServer::builder()
4132                .with_parent_cancel(parent.clone())
4133                .add_agent_with_schema(CancelObserver, serde_json::json!({}), Arc::new(fresh_ctx))
4134                .add_tools(Arc::new(super::OneToolInvoker))
4135                .build()
4136                .unwrap();
4137
4138            parent.cancel();
4139            let resp = server
4140                .handle_line(
4141                    r#"{"jsonrpc":"2.0","id":303,"method":"tools/call","params":{"name":"cancel-observer","arguments":{}}}"#,
4142                )
4143                .await;
4144            let text = resp["result"]["content"][0]["text"].as_str().unwrap();
4145            assert!(
4146                text.contains(r#""state":"cancelled""#),
4147                "builder-level parent_cancel must reach every add_agent_* invoker; got: {text}"
4148            );
4149        }
4150
4151        /// On a multi-invoker server, `tools/call` for a name that
4152        /// matches no inner catalogue maps to `ToolError::UnknownTool`,
4153        /// surfaced as a JSON-RPC server error containing the
4154        /// unknown name. Covers `MergedInvoker::invoke`'s `None`
4155        /// branch which the dispatch tests do not exercise.
4156        #[tokio::test]
4157        async fn builder_multi_agent_unknown_tool_returns_error() {
4158            let server = McpServer::builder()
4159                .add_agent_with_schema(Greeter, one_object_schema(), Arc::new(fresh_ctx))
4160                .add_agent_with_schema(CancelObserver, serde_json::json!({}), Arc::new(fresh_ctx))
4161                .build()
4162                .unwrap();
4163
4164            let resp = server
4165                .handle_line(
4166                    r#"{"jsonrpc":"2.0","id":304,"method":"tools/call","params":{"name":"no-such-agent","arguments":{}}}"#,
4167                )
4168                .await;
4169            assert_eq!(resp["error"]["code"], JSONRPC_SERVER_ERROR);
4170            assert!(
4171                resp["error"]["message"]
4172                    .as_str()
4173                    .unwrap()
4174                    .contains("no-such-agent"),
4175                "UnknownTool error must reference the requested name"
4176            );
4177        }
4178
4179        /// Two agents that report the same `name()` are a caller bug.
4180        /// `build` with ≥2 invokers returns `DuplicateTool` on the first
4181        /// duplicate so routing stays unambiguous.
4182        #[tokio::test]
4183        async fn builder_build_returns_duplicate_tool_error() {
4184            let result = McpServer::builder()
4185                .add_agent_with_schema(Greeter, one_object_schema(), Arc::new(fresh_ctx))
4186                .add_agent_with_schema(Greeter, one_object_schema(), Arc::new(fresh_ctx))
4187                .build();
4188            let Err(McpBuildError::DuplicateTool(ref name)) = result else {
4189                panic!("expected DuplicateTool error");
4190            };
4191            assert!(
4192                !name.is_empty(),
4193                "DuplicateTool must carry the colliding tool name"
4194            );
4195            assert_eq!(
4196                name, "greeter",
4197                "DuplicateTool must name the colliding tool"
4198            );
4199        }
4200
/// `MergedInvoker::tool_redacts_audit` must delegate to whichever inner
/// invoker owns the tool. A PII-flagged tool therefore keeps its
/// audit-redaction requirement after merging; falling back to the
/// decorator default of `false` would let dispatch record raw PII for
/// any tool reached through a merged catalogue. Three arms are checked:
/// routed and redacting, routed and not redacting, and an unrouted name.
#[test]
fn merged_invoker_forwards_tool_redacts_audit_per_owner() {
    use klieo_core::test_utils::FakeToolInvoker;

    let flagged = FakeToolInvoker::new().with_redacting_tool("claimant_lookup", "handles PII", Ok);
    let unflagged = FakeToolInvoker::new().with_tool("echo", "plain", Ok);
    let owners: Vec<Arc<dyn ToolInvoker>> = vec![Arc::new(flagged), Arc::new(unflagged)];

    let merged = MergedInvoker::new(owners)
        .expect("distinct tool names must merge without DuplicateTool");

    let pii_tool_redacts = merged.tool_redacts_audit("claimant_lookup");
    assert!(
        pii_tool_redacts,
        "a PII-flagged tool's redaction must survive the merge; \
         default-false here would record raw PII (fail-open)"
    );

    let plain_tool_redacts = merged.tool_redacts_audit("echo");
    assert!(
        !plain_tool_redacts,
        "an unflagged tool must not be reported as redacting"
    );

    let unknown_tool_redacts = merged.tool_redacts_audit("no-such-tool");
    assert!(
        !unknown_tool_redacts,
        "an unrouted name hits the None arm and must default to false"
    );
}
4234
/// Requesting a cancel subscription and then calling `build()` instead
/// of `build_arc()` must fail with `CancelRequiresArc`: the unwrapped
/// server shape cannot keep a background subscriber task alive.
#[test]
fn mcp_builder_build_returns_cancel_requires_arc_error() {
    let builder = McpServer::builder()
        .add_tools(Arc::new(OneToolInvoker))
        .with_cancel_subscription();
    let outcome = builder.build();

    let rejected = matches!(outcome, Err(McpBuildError::CancelRequiresArc));
    assert!(
        rejected,
        "build() with cancel subscription must return CancelRequiresArc",
    );
}
4249
4250        /// Output-encode failure is sanitised on the wire the same way
4251        /// as run-error and decode-error. An Agent whose Output type's
4252        /// Serialize impl returns Err must NOT leak the inner serde
4253        /// error Display (which can embed arbitrary internal state
4254        /// chosen by the impl author) over the JSON-RPC boundary.
4255        #[tokio::test]
4256        async fn expose_agent_sanitises_encode_error_on_wire() {
4257            struct NonSerialisable;
4258
4259            impl serde::Serialize for NonSerialisable {
4260                fn serialize<S>(&self, _serializer: S) -> Result<S::Ok, S::Error>
4261                where
4262                    S: serde::Serializer,
4263                {
4264                    Err(serde::ser::Error::custom(
4265                        "internal: token=secret-encode-abc upstream=https://provider/encode",
4266                    ))
4267                }
4268            }
4269
4270            struct EncodeFailing;
4271            #[async_trait]
4272            impl Agent for EncodeFailing {
4273                type Input = serde_json::Value;
4274                type Output = NonSerialisable;
4275                type Error = KlieoError;
4276                fn name(&self) -> &str {
4277                    "encode-failing"
4278                }
4279                fn system_prompt(&self) -> &str {
4280                    ""
4281                }
4282                fn tools(&self) -> &[ToolDef] {
4283                    &[]
4284                }
4285                async fn run(
4286                    &self,
4287                    _ctx: AgentContext,
4288                    _input: serde_json::Value,
4289                ) -> Result<NonSerialisable, KlieoError> {
4290                    Ok(NonSerialisable)
4291                }
4292            }
4293
4294            let server = McpServer::expose_agent_with_schema(
4295                EncodeFailing,
4296                serde_json::json!({}),
4297                Arc::new(fresh_ctx),
4298            );
4299            let resp = server
4300                .handle_line(
4301                    r#"{"jsonrpc":"2.0","id":100,"method":"tools/call","params":{"name":"encode-failing","arguments":{}}}"#,
4302                )
4303                .await;
4304            assert_eq!(resp["error"]["code"], JSONRPC_SERVER_ERROR);
4305            let msg = resp["error"]["message"].as_str().unwrap();
4306            assert!(
4307                msg.contains("tool invocation failed"),
4308                "wire message must contain the sanitised stable string; got: {msg}"
4309            );
4310            assert!(
4311                !msg.contains("secret-encode-abc") && !msg.contains("https://"),
4312                "internal encode-error detail must not leak: {msg}"
4313            );
4314        }
4315
4316        #[tokio::test]
4317        async fn tool_ctx_factory_invoked_per_request() {
4318            use std::sync::atomic::{AtomicUsize, Ordering};
4319            let counter = Arc::new(AtomicUsize::new(0));
4320            let c2 = counter.clone();
4321            let factory: ToolCtxFactory = Arc::new(move || {
4322                c2.fetch_add(1, Ordering::SeqCst);
4323                default_tool_ctx_factory()()
4324            });
4325
4326            let server = McpServer::builder()
4327                .with_tool_ctx_factory(factory)
4328                .add_tools(Arc::new(super::OneToolInvoker))
4329                .build()
4330                .unwrap();
4331
4332            let req = serde_json::json!({
4333                "jsonrpc": "2.0", "id": 1, "method": "tools/call",
4334                "params": { "name": "echo", "arguments": {"x": 1} }
4335            });
4336            server.handle_jsonrpc(req.clone(), None).await;
4337            server.handle_jsonrpc(req, None).await;
4338
4339            assert_eq!(counter.load(Ordering::SeqCst), 2);
4340        }
4341
4342        /// `AgentAsToolInvoker::invoke` overlays `ToolCtx::progress`
4343        /// onto the `AgentContext` it mints via `ctx_factory`. A
4344        /// transport that injects a `broadcast::Sender<AgentEvent>`
4345        /// into the `ToolCtx` must see it arrive inside `Agent::run`.
4346        #[tokio::test]
4347        async fn agent_as_tool_invoker_propagates_progress_to_agent_context() {
4348            use klieo_core::AgentEvent;
4349            use std::sync::Mutex;
4350            use tokio::sync::broadcast;
4351
4352            struct CapturingAgent {
4353                captured: Arc<Mutex<Option<Option<broadcast::Sender<AgentEvent>>>>>,
4354            }
4355
4356            #[async_trait]
4357            impl Agent for CapturingAgent {
4358                type Input = serde_json::Value;
4359                type Output = serde_json::Value;
4360                type Error = KlieoError;
4361
4362                fn name(&self) -> &str {
4363                    "capturing"
4364                }
4365                fn system_prompt(&self) -> &str {
4366                    ""
4367                }
4368                fn tools(&self) -> &[ToolDef] {
4369                    &[]
4370                }
4371                async fn run(
4372                    &self,
4373                    ctx: AgentContext,
4374                    _input: serde_json::Value,
4375                ) -> Result<serde_json::Value, KlieoError> {
4376                    *self.captured.lock().unwrap() = Some(ctx.progress.clone());
4377                    Ok(serde_json::json!({}))
4378                }
4379            }
4380
4381            let captured = Arc::new(Mutex::new(None::<Option<broadcast::Sender<AgentEvent>>>));
4382            let agent = CapturingAgent {
4383                captured: captured.clone(),
4384            };
4385
4386            let (tx, _rx) = broadcast::channel::<AgentEvent>(16);
4387            let tx_for_factory = tx.clone();
4388            let factory: ToolCtxFactory = Arc::new(move || {
4389                let bus = klieo_bus_memory::MemoryBus::new();
4390                klieo_core::tool::ToolCtx::new(bus.pubsub, bus.kv, bus.jobs)
4391                    .with_progress(tx_for_factory.clone())
4392            });
4393
4394            let server = McpServer::builder()
4395                .with_tool_ctx_factory(factory)
4396                .add_agent_with_schema(agent, serde_json::json!({}), Arc::new(fresh_ctx))
4397                .build()
4398                .unwrap();
4399
4400            let req = serde_json::json!({
4401                "jsonrpc": "2.0", "id": 1, "method": "tools/call",
4402                "params": { "name": "capturing", "arguments": {} }
4403            });
4404            let resp = server.handle_jsonrpc(req, None).await;
4405            assert!(
4406                resp["result"].is_object(),
4407                "tools/call must succeed; got: {resp}"
4408            );
4409
4410            let captured_progress = captured
4411                .lock()
4412                .unwrap()
4413                .clone()
4414                .expect("Agent::run was never invoked");
4415            assert!(
4416                captured_progress.is_some(),
4417                "AgentContext.progress was None despite ToolCtx.progress=Some"
4418            );
4419        }
4420
/// Tests for the schema auto-derivation path, compiled only when the
/// `schemars` feature is enabled.
#[cfg(feature = "schemars")]
mod auto_derive {
    use super::*;
    use schemars::JsonSchema;

    /// Agent input whose JSON Schema is derived by `schemars`.
    #[derive(Clone, Debug, Serialize, Deserialize, JsonSchema)]
    struct DerivedIn {
        who: String,
    }

    /// Agent output carrying the produced greeting.
    #[derive(Clone, Debug, Serialize)]
    struct DerivedOut {
        greeting: String,
    }

    /// Minimal agent that greets the `who` named in its input.
    struct DerivedGreeter;

    #[async_trait]
    impl Agent for DerivedGreeter {
        type Input = DerivedIn;
        type Output = DerivedOut;
        type Error = KlieoError;

        fn name(&self) -> &str {
            "derived-greeter"
        }

        fn system_prompt(&self) -> &str {
            ""
        }

        fn tools(&self) -> &[ToolDef] {
            &[]
        }

        async fn run(
            &self,
            _ctx: AgentContext,
            input: DerivedIn,
        ) -> Result<DerivedOut, KlieoError> {
            let greeting = format!("hi {}", input.who);
            Ok(DerivedOut { greeting })
        }
    }

    /// `expose_agent` takes no explicit schema; the `inputSchema`
    /// reported by `tools/list` must list the `who` property.
    #[tokio::test]
    async fn expose_agent_auto_derives_schema_via_schemars() {
        let server = McpServer::expose_agent(DerivedGreeter, Arc::new(fresh_ctx));
        let request = r#"{"jsonrpc":"2.0","id":1,"method":"tools/list"}"#;
        let reply = server.handle_line(request).await;

        // schemars emits a JSON Schema with a "properties" map.
        let schema = &reply["result"]["tools"][0]["inputSchema"];
        let has_who = schema["properties"]["who"].is_object();
        assert!(
            has_who,
            "derived schema must include the `who` field; got: {schema}"
        );
    }
}
4478    }
4479
4480
4481    // These tests exercise the no-governor convenience shim. Under
4482    // `--features governor` they would hit the
4483    // `WorkflowWithoutGovernor` hard gate; that's the intended
4484    // behaviour (workflows must wire a governor when the feature is
4485    // on). `governor_inbound.rs` covers the same contracts on the
4486    // builder + `with_governor` path.
4487    #[cfg(not(feature = "governor"))]
4488    mod expose_workflow_tests {
4489        use super::*;
4490        use async_trait::async_trait;
4491        use chrono::Utc;
4492        use klieo_core::agent::{Agent, AgentContext};
4493        use klieo_core::error::Error as KlieoError;
4494        use klieo_core::llm::Message;
4495        use klieo_core::runtime::{ReviewPolicy, RunOptions};
4496        use klieo_core::test_utils::{fake_context, fake_kv, FakeLlmClient, FakeLlmStep};
4497        use klieo_core::ToolDef;
4498        use klieo_hitl::HitlConfig;
4499        use klieo_hitl_client::HitlClient;
4500        use secrecy::SecretString;
4501        use serde::{Deserialize, Serialize};
4502        use serde_json::json;
4503        use std::time::Duration;
4504        use wiremock::matchers::{method, path};
4505        use wiremock::{Mock, MockServer, ResponseTemplate};
4506
/// Marker text scripted as the fake LLM's reply in the suspension test;
/// the wire response is asserted not to contain it.
const PLANTED_SENTINEL: &str = "PLANT-SENTINEL-9F7C";
/// Workspace identifier used by the mocked HITL items and `hitl_cfg`.
const WORKSPACE_ID: &str = "ws-test";
/// Checkpoint bucket name shared by `hitl_cfg` and `gated_run_options`.
const CHECKPOINT_BUCKET: &str = "klieo.run-checkpoints";
4510
4511        #[derive(Debug, Clone, Deserialize, Serialize)]
4512        struct WorkflowIn {
4513            #[allow(dead_code)]
4514            payload: String,
4515        }
4516
4517        #[derive(Debug, Clone, Serialize)]
4518        struct UnusedOut;
4519
4520        struct WorkflowAgent {
4521            name: &'static str,
4522        }
4523
4524        #[async_trait]
4525        impl Agent for WorkflowAgent {
4526            type Input = WorkflowIn;
4527            type Output = UnusedOut;
4528            type Error = KlieoError;
4529
4530            fn name(&self) -> &str {
4531                self.name
4532            }
4533            fn system_prompt(&self) -> &str {
4534                ""
4535            }
4536            fn tools(&self) -> &[ToolDef] {
4537                &[]
4538            }
4539            async fn run(
4540                &self,
4541                _ctx: AgentContext,
4542                _input: WorkflowIn,
4543            ) -> Result<UnusedOut, KlieoError> {
4544                Err(KlieoError::BadResponse(
4545                    "workflow path must not call Agent::run".into(),
4546                ))
4547            }
4548        }
4549
/// Review policy state: the flag records whether a pause has already
/// been requested. Starts out `false`.
struct PauseOnce(std::sync::atomic::AtomicBool);

impl PauseOnce {
    /// Creates a policy that has not yet requested a pause.
    fn new() -> Self {
        use std::sync::atomic::AtomicBool;

        let already_paused = AtomicBool::new(false);
        Self(already_paused)
    }
}
4557
4558        #[async_trait]
4559        impl ReviewPolicy for PauseOnce {
4560            async fn should_pause_for_approval(
4561                &self,
4562                _step: u32,
4563                _message: &Message,
4564            ) -> Result<Option<String>, KlieoError> {
4565                if self.0.swap(true, std::sync::atomic::Ordering::SeqCst) {
4566                    Ok(None)
4567                } else {
4568                    Ok(Some("policy reason that MUST NOT leak to peer".into()))
4569                }
4570            }
4571        }
4572
4573        fn workflow_ctx_with(steps: Vec<FakeLlmStep>) -> AgentContext {
4574            let mut ctx = fake_context("workflow-test");
4575            ctx.llm = Arc::new(FakeLlmClient::new("fake").with_steps(steps));
4576            ctx.kv = fake_kv();
4577            ctx
4578        }
4579
4580        fn item_json(id: &str, state: &str) -> serde_json::Value {
4581            json!({
4582                "id": id, "workspace_id": WORKSPACE_ID, "state": state, "version": 1,
4583                "escalation_count": 0,
4584                "decision_context": {"subject_ref":"x","run_id":"r","payload_hash_hex":"h"},
4585                "reviewer": null, "updated_at": "2026-06-18T00:00:00Z"
4586            })
4587        }
4588
4589        fn hitl_cfg(poll_timeout: Duration) -> HitlConfig {
4590            HitlConfig::new(
4591                WORKSPACE_ID,
4592                CHECKPOINT_BUCKET,
4593                Duration::from_millis(1),
4594                poll_timeout,
4595            )
4596        }
4597
4598        fn gated_run_options() -> RunOptions {
4599            RunOptions::default()
4600                .with_review_policy(Arc::new(PauseOnce::new()))
4601                .with_checkpoint_bucket(CHECKPOINT_BUCKET)
4602        }
4603
4604        fn plain_run_options() -> RunOptions {
4605            RunOptions::default()
4606        }
4607
4608        fn one_shot_ctx_factory(ctx: AgentContext) -> AgentContextFactory {
4609            let slot = Arc::new(std::sync::Mutex::new(Some(ctx)));
4610            Arc::new(move || {
4611                slot.lock()
4612                    .unwrap()
4613                    .take()
4614                    .expect("ctx_factory called more than once")
4615            })
4616        }
4617
4618        fn input_schema() -> serde_json::Value {
4619            json!({
4620                "type": "object",
4621                "properties": {"payload": {"type": "string"}},
4622                "required": ["payload"]
4623            })
4624        }
4625
4626        /// Happy path: NeverReview-equivalent runs (no policy installed)
4627        /// drive `run_with_hitl` straight through. The HITL endpoint is
4628        /// never hit, and the response carries the final LLM text.
4629        #[tokio::test]
4630        async fn invoke_happy_path_returns_text_without_hitl_traffic() {
4631            let mock = MockServer::start().await;
4632            // Any HITL traffic at all signals an incorrect suspension.
4633            Mock::given(method("POST"))
4634                .and(path("/api/v1/hitl/items"))
4635                .respond_with(ResponseTemplate::new(500))
4636                .expect(0)
4637                .mount(&mock)
4638                .await;
4639
4640            let ctx = workflow_ctx_with(vec![FakeLlmStep::Text("workflow done".into())]);
4641            let client = Arc::new(HitlClient::new(
4642                mock.uri(),
4643                SecretString::from("tok".to_string()),
4644            ));
4645            let cfg = Arc::new(hitl_cfg(Duration::from_secs(1)));
4646
4647            let server = McpServer::expose_workflow_with_schema(
4648                WorkflowAgent { name: "wf-happy" },
4649                "you are a workflow",
4650                input_schema(),
4651                plain_run_options(),
4652                client,
4653                cfg,
4654                one_shot_ctx_factory(ctx),
4655            )
4656            .unwrap();
4657
4658            let resp = server
4659                .handle_line(
4660                    r#"{"jsonrpc":"2.0","id":1,"method":"tools/call","params":{"name":"wf-happy","arguments":{"payload":"hi"}}}"#,
4661                )
4662                .await;
4663
4664            let text = resp["result"]["content"][0]["text"].as_str().unwrap();
4665            assert!(
4666                text.contains("workflow done"),
4667                "tools/call must return run_with_hitl text body; got: {text}"
4668            );
4669        }
4670
4671        /// Suspension path: a pausing ReviewPolicy + near-zero
4672        /// poll_timeout drives `run_with_hitl` into `Suspended`. The
4673        /// wire response must use the SAFE reason string, MUST NOT echo
4674        /// the raw policy reason, and MUST NOT contain the planted
4675        /// sentinel (proves checkpoint/conversation bytes are dropped).
4676        #[tokio::test]
4677        async fn invoke_suspend_path_redacts_reason_and_drops_checkpoint() {
4678            let mock = MockServer::start().await;
4679            Mock::given(method("POST"))
4680                .and(path("/api/v1/hitl/items"))
4681                .respond_with(
4682                    ResponseTemplate::new(201)
4683                        .set_body_json(item_json("item-suspend", "awaiting")),
4684                )
4685                .mount(&mock)
4686                .await;
4687            Mock::given(method("GET"))
4688                .and(path("/api/v1/hitl/items/item-suspend"))
4689                .respond_with(
4690                    ResponseTemplate::new(200)
4691                        .set_body_json(item_json("item-suspend", "awaiting")),
4692                )
4693                .mount(&mock)
4694                .await;
4695
4696            let ctx = workflow_ctx_with(vec![FakeLlmStep::Text(PLANTED_SENTINEL.into())]);
4697            let client = Arc::new(HitlClient::new(
4698                mock.uri(),
4699                SecretString::from("tok".to_string()),
4700            ));
4701            let cfg = Arc::new(hitl_cfg(Duration::from_millis(5)));
4702
4703            let server = McpServer::expose_workflow_with_schema(
4704                WorkflowAgent {
4705                    name: "wf-suspend",
4706                },
4707                "you are a suspending workflow",
4708                input_schema(),
4709                gated_run_options(),
4710                client,
4711                cfg,
4712                one_shot_ctx_factory(ctx),
4713            )
4714            .unwrap();
4715
4716            let resp = server
4717                .handle_line(
4718                    r#"{"jsonrpc":"2.0","id":2,"method":"tools/call","params":{"name":"wf-suspend","arguments":{"payload":"please-approve"}}}"#,
4719                )
4720                .await;
4721
4722            let text = resp["result"]["content"][0]["text"].as_str().unwrap();
4723            assert!(
4724                text.contains(r#""status":"suspended""#),
4725                "suspend response must carry status=suspended; got: {text}"
4726            );
4727            assert!(
4728                text.contains("workflow suspended for human review"),
4729                "suspend response must carry the safe wire reason; got: {text}"
4730            );
4731            assert!(
4732                !text.contains("policy reason that MUST NOT leak"),
4733                "raw ReviewPolicy reason leaked to peer: {text}"
4734            );
4735            assert!(
4736                !text.contains(PLANTED_SENTINEL),
4737                "checkpoint/conversation bytes leaked to peer: {text}"
4738            );
4739        }
4740
4741        /// Error path: HITL submit fails (compliance endpoint 403). The
4742        /// wire response must be the sanitised stable error envelope;
4743        /// the underlying `HitlClientError` body MUST NOT leak.
4744        #[tokio::test]
4745        async fn invoke_hitl_submit_failure_maps_to_sanitised_tool_error() {
4746            let mock = MockServer::start().await;
4747            Mock::given(method("POST"))
4748                .and(path("/api/v1/hitl/items"))
4749                .respond_with(ResponseTemplate::new(403).set_body_string("forbidden: token=xyz"))
4750                .mount(&mock)
4751                .await;
4752
4753            let ctx = workflow_ctx_with(vec![FakeLlmStep::Text("never reached".into())]);
4754            let client = Arc::new(HitlClient::new(
4755                mock.uri(),
4756                SecretString::from("tok".to_string()),
4757            ));
4758            let cfg = Arc::new(hitl_cfg(Duration::from_secs(1)));
4759
4760            let server = McpServer::expose_workflow_with_schema(
4761                WorkflowAgent { name: "wf-err" },
4762                "",
4763                input_schema(),
4764                gated_run_options(),
4765                client,
4766                cfg,
4767                one_shot_ctx_factory(ctx),
4768            )
4769            .unwrap();
4770
4771            let resp = server
4772                .handle_line(
4773                    r#"{"jsonrpc":"2.0","id":3,"method":"tools/call","params":{"name":"wf-err","arguments":{"payload":"x"}}}"#,
4774                )
4775                .await;
4776
4777            assert!(
4778                resp.get("error").is_some(),
4779                "submit failure must surface as JSON-RPC error; got: {resp}"
4780            );
4781            let msg = resp["error"]["message"].as_str().unwrap();
4782            assert!(
4783                msg.contains("tool invocation failed"),
4784                "wire message must be the sanitised stable string; got: {msg}"
4785            );
4786            assert!(
4787                !msg.contains("token=xyz") && !msg.contains("forbidden"),
4788                "internal HitlClientError detail leaked: {msg}"
4789            );
4790        }
4791
4792
4793        /// Drive a workflow run through the public `ToolInvoker`
4794        /// surface with `caller_principal=Some("alice@x")` and assert
4795        /// no recorded message contains the principal — server-side
4796        /// authz metadata must never leak into agent memory (D3).
4797        #[tokio::test]
4798        async fn caller_principal_does_not_enter_run_state() {
4799            use klieo_core::test_utils::noop_bus;
4800            const PRINCIPAL: &str = "alice-NEVER-IN-MEMORY@x";
4801
4802            let mock = MockServer::start().await;
4803            Mock::given(method("POST"))
4804                .and(path("/api/v1/hitl/items"))
4805                .respond_with(ResponseTemplate::new(500))
4806                .expect(0)
4807                .mount(&mock)
4808                .await;
4809
4810            let ctx = workflow_ctx_with(vec![FakeLlmStep::Text("done".into())]);
4811            let short_term_for_probe = ctx.short_term.clone();
4812            let episodic_for_probe = ctx.episodic.clone();
4813            let run_id_for_probe = ctx.run_id;
4814            let client = Arc::new(HitlClient::new(
4815                mock.uri(),
4816                SecretString::from("tok".to_string()),
4817            ));
4818            let cfg = Arc::new(hitl_cfg(Duration::from_secs(1)));
4819
4820            let server = Arc::new(
4821                McpServer::builder()
4822                    .with_hitl(client, cfg)
4823                    .add_workflow_with_schema(
4824                        WorkflowAgent { name: "wf-no-leak" },
4825                        "you are a workflow",
4826                        input_schema(),
4827                        plain_run_options(),
4828                        one_shot_ctx_factory(ctx),
4829                    )
4830                    .build()
4831                    .unwrap(),
4832            );
4833
4834            let (pubsub, _, kv, jobs) = noop_bus();
4835            let tool_ctx = klieo_core::tool::ToolCtx::new(pubsub, kv, jobs)
4836                .with_caller_principal(PRINCIPAL.into());
4837            let _result = server
4838                .invoker
4839                .invoke(
4840                    "wf-no-leak",
4841                    json!({"payload": "hi"}),
4842                    tool_ctx,
4843                )
4844                .await
4845                .unwrap();
4846
4847            let history = short_term_for_probe
4848                .load(klieo_core::ids::ThreadId::new("wf-no-leak:any"), 8192)
4849                .await
4850                .unwrap_or_default();
4851            for msg in &history {
4852                assert!(
4853                    !msg.content.contains(PRINCIPAL),
4854                    "principal leaked into short-term memory: {}",
4855                    msg.content
4856                );
4857            }
4858
4859            // Hashed attribution label MUST land on the audit trail,
4860            // raw principal MUST NOT.
4861            let episodes = episodic_for_probe
4862                .replay(run_id_for_probe)
4863                .await
4864                .expect("episodic replay must succeed");
4865            let expected_label = klieo_core::principal_hash(PRINCIPAL);
4866            let attributed_labels: Vec<&str> = episodes
4867                .iter()
4868                .filter_map(|e| match e {
4869                    klieo_core::Episode::RunAttributed { tenant_label } => {
4870                        Some(tenant_label.as_str())
4871                    }
4872                    _ => None,
4873                })
4874                .collect();
4875            assert_eq!(
4876                attributed_labels,
4877                vec![expected_label.as_str()],
4878                "exactly one RunAttributed carrying principal_hash; got {episodes:?}",
4879            );
4880            for ep in &episodes {
4881                let payload = serde_json::to_string(ep).expect("episode serialises");
4882                assert!(
4883                    !payload.contains(PRINCIPAL),
4884                    "raw principal leaked into recorded episode: {payload}",
4885                );
4886            }
4887        }
4888
4889        /// Negative path: no `caller_principal` on the inbound
4890        /// `ToolCtx` (non-MCP / unauthenticated transport) records
4891        /// zero `RunAttributed` episodes.
4892        #[tokio::test]
4893        async fn no_principal_yields_no_run_attributed_episode() {
4894            use klieo_core::test_utils::noop_bus;
4895            let mock = MockServer::start().await;
4896            Mock::given(method("POST"))
4897                .and(path("/api/v1/hitl/items"))
4898                .respond_with(ResponseTemplate::new(500))
4899                .expect(0)
4900                .mount(&mock)
4901                .await;
4902            let ctx = workflow_ctx_with(vec![FakeLlmStep::Text("done".into())]);
4903            let episodic_for_probe = ctx.episodic.clone();
4904            let run_id_for_probe = ctx.run_id;
4905            let client = Arc::new(HitlClient::new(
4906                mock.uri(),
4907                SecretString::from("tok".to_string()),
4908            ));
4909            let cfg = Arc::new(hitl_cfg(Duration::from_secs(1)));
4910            let server = Arc::new(
4911                McpServer::builder()
4912                    .with_hitl(client, cfg)
4913                    .add_workflow_with_schema(
4914                        WorkflowAgent { name: "wf-anon" },
4915                        "you are a workflow",
4916                        input_schema(),
4917                        plain_run_options(),
4918                        one_shot_ctx_factory(ctx),
4919                    )
4920                    .build()
4921                    .unwrap(),
4922            );
4923            let (pubsub, _, kv, jobs) = noop_bus();
4924            let tool_ctx = klieo_core::tool::ToolCtx::new(pubsub, kv, jobs);
4925            let _ = server
4926                .invoker
4927                .invoke("wf-anon", json!({"payload": "hi"}), tool_ctx)
4928                .await
4929                .unwrap();
4930            let episodes = episodic_for_probe
4931                .replay(run_id_for_probe)
4932                .await
4933                .expect("episodic replay must succeed");
4934            let attributed_count = episodes
4935                .iter()
4936                .filter(|e| matches!(e, klieo_core::Episode::RunAttributed { .. }))
4937                .count();
4938            assert_eq!(
4939                attributed_count, 0,
4940                "RunAttributed must not appear without a caller_principal; got {episodes:?}",
4941            );
4942        }
4943
4944        /// Ticket-issuance gate: suspend with both `with_checkpoint_kv`
4945        /// AND a principal yields `{status,ticket,reason}`; suspend
4946        /// missing either falls back to the slice-1 envelope. In both
4947        /// paths the planted PLANTED_SENTINEL bytes (carried inside
4948        /// the dropped checkpoint) MUST NOT cross the wire.
4949        async fn run_suspend_with(
4950            with_kv: bool,
4951            with_principal: bool,
4952        ) -> serde_json::Value {
4953            use klieo_core::test_utils::noop_bus;
4954            let mock = MockServer::start().await;
4955            Mock::given(method("POST"))
4956                .and(path("/api/v1/hitl/items"))
4957                .respond_with(
4958                    ResponseTemplate::new(201)
4959                        .set_body_json(item_json("item-suspend", "awaiting")),
4960                )
4961                .mount(&mock)
4962                .await;
4963            Mock::given(method("GET"))
4964                .and(path("/api/v1/hitl/items/item-suspend"))
4965                .respond_with(
4966                    ResponseTemplate::new(200)
4967                        .set_body_json(item_json("item-suspend", "awaiting")),
4968                )
4969                .mount(&mock)
4970                .await;
4971
4972            let ctx = workflow_ctx_with(vec![FakeLlmStep::Text(PLANTED_SENTINEL.into())]);
4973            let client = Arc::new(HitlClient::new(
4974                mock.uri(),
4975                SecretString::from("tok".to_string()),
4976            ));
4977            let cfg = Arc::new(hitl_cfg(Duration::from_millis(5)));
4978
4979            let mut builder = McpServer::builder()
4980                .with_hitl(client, cfg)
4981                .add_workflow_with_schema(
4982                    WorkflowAgent { name: "wf-suspend" },
4983                    "",
4984                    input_schema(),
4985                    gated_run_options(),
4986                    one_shot_ctx_factory(ctx),
4987                );
4988            if with_kv {
4989                builder = builder.with_checkpoint_kv(fake_kv());
4990            }
4991            let server = Arc::new(builder.build().unwrap());
4992
4993            let (pubsub, _, kv, jobs) = noop_bus();
4994            let mut tool_ctx = klieo_core::tool::ToolCtx::new(pubsub, kv, jobs);
4995            if with_principal {
4996                tool_ctx = tool_ctx.with_caller_principal("alice@x".into());
4997            }
4998            server
4999                .invoker
5000                .invoke(
5001                    "wf-suspend",
5002                    json!({"payload": "please-approve"}),
5003                    tool_ctx,
5004                )
5005                .await
5006                .unwrap()
5007        }
5008
5009        #[tokio::test]
5010        async fn suspend_with_kv_and_principal_issues_ticket() {
5011            let envelope = run_suspend_with(true, true).await;
5012            let body = envelope.to_string();
5013            assert_eq!(envelope["status"], "suspended");
5014            let ticket = envelope["ticket"].as_str().expect("ticket present");
5015            assert!(!ticket.is_empty(), "issued ticket must be non-empty");
5016            assert!(
5017                !body.contains(PLANTED_SENTINEL),
5018                "checkpoint bytes leaked: {body}"
5019            );
5020            assert!(
5021                !body.contains("policy reason that MUST NOT leak"),
5022                "raw policy reason leaked: {body}"
5023            );
5024        }
5025
5026        #[tokio::test]
5027        async fn suspend_without_kv_falls_back_to_no_ticket_envelope() {
5028            let envelope = run_suspend_with(false, true).await;
5029            assert_eq!(envelope["status"], "suspended");
5030            assert!(
5031                envelope.get("ticket").is_none(),
5032                "no checkpoint KV must yield slice-1 envelope (no ticket field)",
5033            );
5034            let body = envelope.to_string();
5035            assert!(!body.contains(PLANTED_SENTINEL));
5036        }
5037
5038        #[tokio::test]
5039        async fn suspend_without_principal_falls_back_to_no_ticket_envelope() {
5040            let envelope = run_suspend_with(true, false).await;
5041            assert_eq!(envelope["status"], "suspended");
5042            assert!(
5043                envelope.get("ticket").is_none(),
5044                "no caller principal must yield slice-1 envelope (no ticket field)",
5045            );
5046            let body = envelope.to_string();
5047            assert!(!body.contains(PLANTED_SENTINEL));
5048        }
5049
5050        /// IDOR mandatory test: principal B resumes principal A's
5051        /// ticket → fail-closed and the ticket stays consumable by A.
5052        /// Drives the lookup-then-authz-then-claim sequence directly
5053        /// against the ticket store (the HTTP method is a thin wrapper
5054        /// around it).
5055        #[tokio::test]
5056        async fn principal_b_cannot_consume_principal_a_ticket() {
5057            use crate::resume_ticket::{ResumeTicketRecord, ResumeTicketStore};
5058            let store = ResumeTicketStore::new(fake_kv());
5059            let token = ResumeTicketStore::mint_token();
5060            let cp_json = serde_json::json!({
5061                "run_id": klieo_core::ids::RunId::new(),
5062                "step_index": 1,
5063                "thread_id": "t-idor",
5064                "messages": [],
5065                "pending_tool_calls": null,
5066                "created_at": "2026-06-18T00:00:00Z",
5067            });
5068            let checkpoint = serde_json::from_value(cp_json).unwrap();
5069            let record = ResumeTicketRecord {
5070                principal: "alice@x".into(),
5071                workflow_name: "wf".into(),
5072                checkpoint,
5073                created_at: Utc::now(),
5074            };
5075            store.persist(&token, &record).await.unwrap();
5076
5077            // Principal B follows the lookup→authz→claim sequence.
5078            let peeked = store.peek(&token).await.unwrap().expect("ticket present");
5079            let principal_b = "mallory@x";
5080            assert_ne!(
5081                peeked.principal, principal_b,
5082                "fixture must seed a distinct principal so the authz arm engages"
5083            );
5084            // The handler refuses BEFORE the claim — record this:
5085            // the ticket must therefore stay consumable by Alice.
5086            let consumed_by_alice = store.claim(&token).await.unwrap();
5087            assert!(
5088                consumed_by_alice.is_some(),
5089                "after a foreign-principal denial the rightful owner can still resume"
5090            );
5091            // And a second claim now returns None (the previous succeeded).
5092            let after = store.claim(&token).await.unwrap();
5093            assert!(after.is_none(), "the now-consumed ticket cannot be reused");
5094        }
5095
5096        /// Concurrent double-resume of one ticket: exactly one wins,
5097        /// the other observes None (and would fail-closed at the
5098        /// handler seam).
5099        #[tokio::test]
5100        async fn concurrent_resume_runs_exactly_once() {
5101            use crate::resume_ticket::{ResumeTicketRecord, ResumeTicketStore};
5102            let store = Arc::new(ResumeTicketStore::new(fake_kv()));
5103            let token = ResumeTicketStore::mint_token();
5104            let cp_json = serde_json::json!({
5105                "run_id": klieo_core::ids::RunId::new(),
5106                "step_index": 1,
5107                "thread_id": "t-conc",
5108                "messages": [],
5109                "pending_tool_calls": null,
5110                "created_at": "2026-06-18T00:00:00Z",
5111            });
5112            let checkpoint = serde_json::from_value(cp_json).unwrap();
5113            let record = ResumeTicketRecord {
5114                principal: "alice@x".into(),
5115                workflow_name: "wf".into(),
5116                checkpoint,
5117                created_at: Utc::now(),
5118            };
5119            store.persist(&token, &record).await.unwrap();
5120            let racers: Vec<_> = (0..8)
5121                .map(|_| {
5122                    let store = store.clone();
5123                    let token = token.clone();
5124                    tokio::spawn(async move { store.claim(&token).await })
5125                })
5126                .collect();
5127            let mut winners = 0usize;
5128            for handle in racers {
5129                if handle.await.unwrap().unwrap().is_some() {
5130                    winners += 1;
5131                }
5132            }
5133            assert_eq!(
5134                winners, 1,
5135                "concurrent ticket consumption must run exactly once; got {winners}"
5136            );
5137        }
5138
5139        /// Happy approve resume: a `WorkflowAsToolInvoker` resume
5140        /// handle drives `resume_from_checkpoint` to completion when
5141        /// the ctx_factory mints a real bus + kv. Validates D8 (the
5142        /// resume-time ctx shares the same KV semantics as suspend
5143        /// — here we exercise the bucketless branch, the latch test
5144        /// in klieo-core covers the bucket variant).
5145        #[tokio::test]
5146        async fn approve_resume_drives_run_to_completion() {
5147            use klieo_core::checkpoint::ApprovalDecision;
5148            use std::sync::Mutex;
5149
5150            let ctx = workflow_ctx_with(vec![FakeLlmStep::Text("approved".into())]);
5151            let cp_json = serde_json::json!({
5152                "run_id": ctx.run_id,
5153                "step_index": 1,
5154                "thread_id": "t-resume-approve",
5155                "messages": [],
5156                "pending_tool_calls": null,
5157                "created_at": "2026-06-18T00:00:00Z",
5158            });
5159            let checkpoint: klieo_core::checkpoint::RunCheckpoint =
5160                serde_json::from_value(cp_json).unwrap();
5161
5162            let client = Arc::new(HitlClient::new(
5163                "http://unused".to_string(),
5164                SecretString::from("tok".to_string()),
5165            ));
5166            let cfg = Arc::new(hitl_cfg(Duration::from_secs(1)));
5167            let ctx_holder = Arc::new(Mutex::new(Some(ctx)));
5168            let ctx_factory: AgentContextFactory = Arc::new(move || {
5169                ctx_holder
5170                    .lock()
5171                    .unwrap()
5172                    .take()
5173                    .expect("ctx_factory drained")
5174            });
5175
5176            let invoker = Arc::new(crate::workflow::WorkflowAsToolInvoker::<WorkflowAgent>::new(
5177                "wf-resume".into(),
5178                "".into(),
5179                input_schema(),
5180                ctx_factory,
5181                plain_run_options(),
5182                crate::workflow::HitlBundle { client, cfg },
5183                None,
5184                #[cfg(feature = "governor")]
5185                None,
5186            ));
5187
5188            let handle: Arc<dyn crate::workflow::WorkflowResumeHandle> = invoker;
5189            let result = handle
5190                .resume(checkpoint, ApprovalDecision::Approved, "hashed-tenant".into())
5191                .await
5192                .unwrap();
5193            assert_eq!(result, serde_json::Value::String("approved".into()));
5194        }
5195
5196        /// Reject resume: the rejection reason is appended to short-
5197        /// term memory so the LLM sees the operator's verdict, then
5198        /// the run completes.
5199        #[tokio::test]
5200        async fn reject_resume_feeds_reason_back_to_model() {
5201            use klieo_core::checkpoint::ApprovalDecision;
5202            use klieo_core::llm::Role;
5203            use std::sync::Mutex;
5204
5205            let ctx = workflow_ctx_with(vec![FakeLlmStep::Text("acknowledged".into())]);
5206            let short_term_for_probe = ctx.short_term.clone();
5207            let cp_json = serde_json::json!({
5208                "run_id": ctx.run_id,
5209                "step_index": 1,
5210                "thread_id": "t-resume-reject",
5211                "messages": [],
5212                "pending_tool_calls": null,
5213                "created_at": "2026-06-18T00:00:00Z",
5214            });
5215            let checkpoint: klieo_core::checkpoint::RunCheckpoint =
5216                serde_json::from_value(cp_json).unwrap();
5217
5218            let client = Arc::new(HitlClient::new(
5219                "http://unused".to_string(),
5220                SecretString::from("tok".to_string()),
5221            ));
5222            let cfg = Arc::new(hitl_cfg(Duration::from_secs(1)));
5223            let ctx_holder = Arc::new(Mutex::new(Some(ctx)));
5224            let ctx_factory: AgentContextFactory = Arc::new(move || {
5225                ctx_holder
5226                    .lock()
5227                    .unwrap()
5228                    .take()
5229                    .expect("ctx_factory drained")
5230            });
5231
5232            let invoker = Arc::new(crate::workflow::WorkflowAsToolInvoker::<WorkflowAgent>::new(
5233                "wf-reject".into(),
5234                "".into(),
5235                input_schema(),
5236                ctx_factory,
5237                plain_run_options(),
5238                crate::workflow::HitlBundle { client, cfg },
5239                None,
5240                #[cfg(feature = "governor")]
5241                None,
5242            ));
5243
5244            let handle: Arc<dyn crate::workflow::WorkflowResumeHandle> = invoker;
5245            let _ = handle
5246                .resume(
5247                    checkpoint,
5248                    ApprovalDecision::Rejected {
5249                        reason: "BAD-IDEA-XYZ".into(),
5250                    },
5251                    "hashed-tenant".into(),
5252                )
5253                .await
5254                .unwrap();
5255
5256            let history = short_term_for_probe
5257                .load(
5258                    klieo_core::ids::ThreadId::new("t-resume-reject"),
5259                    8192,
5260                )
5261                .await
5262                .unwrap();
5263            let rejection_seen = history
5264                .iter()
5265                .any(|m| m.role == Role::Tool && m.content.contains("BAD-IDEA-XYZ"));
5266            assert!(
5267                rejection_seen,
5268                "the model must see the operator's rejection reason on resume"
5269            );
5270        }
5271
5272        /// Builder guard: workflow registered without `with_hitl` must
5273        /// fail `build()` with the typed `WorkflowWithoutHitl` variant.
5274        #[test]
5275        fn builder_rejects_workflow_without_hitl() {
5276            let ctx_factory: AgentContextFactory = Arc::new(|| fake_context("guard-test"));
5277            let err = McpServer::builder()
5278                .add_workflow_with_schema(
5279                    WorkflowAgent { name: "wf-guard" },
5280                    "",
5281                    input_schema(),
5282                    plain_run_options(),
5283                    ctx_factory,
5284                )
5285                .build()
5286                .err()
5287                .expect("workflow without with_hitl must fail build");
5288            assert!(
5289                matches!(err, McpBuildError::WorkflowWithoutHitl),
5290                "expected WorkflowWithoutHitl, got: {err:?}"
5291            );
5292        }
5293    }
5294
5295    #[test]
5296    fn resume_errors_render_messages() {
5297        let a = McpServerError::ResumeBufferExpired { since_id: 7 };
5298        assert_eq!(a.to_string(), "resume window expired (since_id=7)");
5299        let b = McpServerError::ResumeBufferNotFound("tok".into());
5300        assert_eq!(b.to_string(), "no buffered stream for progressToken: tok");
5301    }
5302
5303    #[test]
5304    fn from_server_outbound_serialisation_maps_to_outbound_serialisation() {
5305        use klieo_core::ServerOutboundError;
5306        let serde_err = serde_json::from_str::<serde_json::Value>("{invalid}").unwrap_err();
5307        let mcp_err = McpServerError::from(ServerOutboundError::Serialisation(serde_err));
5308        assert!(
5309            matches!(mcp_err, McpServerError::OutboundSerialisation(_)),
5310            "ServerOutboundError::Serialisation must map to McpServerError::OutboundSerialisation; got {mcp_err:?}"
5311        );
5312        use std::error::Error;
5313        assert!(
5314            mcp_err.source().is_some(),
5315            "McpServerError::OutboundSerialisation must expose source via #[source]"
5316        );
5317    }
5318
5319    struct NamedAuthn;
5320
5321    #[async_trait]
5322    impl klieo_auth_common::Authenticator for NamedAuthn {
5323        async fn authenticate(
5324            &self,
5325            _headers: &dyn klieo_auth_common::Headers,
5326            _payload: &[u8],
5327        ) -> Result<klieo_auth_common::Identity, klieo_auth_common::AuthError> {
5328            Ok(klieo_auth_common::Identity::new("alice"))
5329        }
5330    }
5331
5332    #[test]
5333    fn regulated_without_tenant_kv_fails_closed() {
5334        let err = McpServer::builder()
5335            .add_tools(Arc::new(OneToolInvoker))
5336            .with_authenticator(Arc::new(NamedAuthn))
5337            .profile(klieo_core::DeploymentProfile::RegulatedMultiTenant)
5338            .build()
5339            .err()
5340            .expect("must fail closed");
5341        assert!(matches!(
5342            err,
5343            McpBuildError::RegulatedProfile(klieo_core::ProfileViolation::MissingTenantKv)
5344        ));
5345    }
5346
5347    #[test]
5348    fn regulated_without_authenticator_fails_closed() {
5349        let kv = klieo_bus_memory::MemoryBus::new().kv;
5350        let err = McpServer::builder()
5351            .add_tools(Arc::new(OneToolInvoker))
5352            .with_tenant_binding(kv)
5353            .profile(klieo_core::DeploymentProfile::RegulatedMultiTenant)
5354            .build()
5355            .err()
5356            .expect("must fail closed");
5357        assert!(matches!(
5358            err,
5359            McpBuildError::RegulatedProfile(klieo_core::ProfileViolation::AnonymousAuth)
5360        ));
5361    }
5362
5363    struct AnonAuthn;
5364
5365    #[async_trait]
5366    impl klieo_auth_common::Authenticator for AnonAuthn {
5367        async fn authenticate(
5368            &self,
5369            _headers: &dyn klieo_auth_common::Headers,
5370            _payload: &[u8],
5371        ) -> Result<klieo_auth_common::Identity, klieo_auth_common::AuthError> {
5372            Ok(klieo_auth_common::Identity::anonymous())
5373        }
5374
5375        fn allows_anonymous(&self) -> bool {
5376            true
5377        }
5378    }
5379
5380    #[test]
5381    fn regulated_with_anonymous_authenticator_fails_closed() {
5382        let kv = klieo_bus_memory::MemoryBus::new().kv;
5383        let err = McpServer::builder()
5384            .add_tools(Arc::new(OneToolInvoker))
5385            .with_tenant_binding(kv)
5386            .with_authenticator(Arc::new(AnonAuthn))
5387            .profile(klieo_core::DeploymentProfile::RegulatedMultiTenant)
5388            .build()
5389            .err()
5390            .expect("must fail closed");
5391        assert!(matches!(
5392            err,
5393            McpBuildError::RegulatedProfile(klieo_core::ProfileViolation::AnonymousAuth)
5394        ));
5395    }
5396
5397    #[test]
5398    fn regulated_forces_strict_over_lenient_binding() {
5399        let kv = klieo_bus_memory::MemoryBus::new().kv;
5400        let server = McpServer::builder()
5401            .add_tools(Arc::new(OneToolInvoker))
5402            .with_tenant_binding(kv) // lenient — profile must upgrade
5403            .with_authenticator(Arc::new(NamedAuthn))
5404            .profile(klieo_core::DeploymentProfile::RegulatedMultiTenant)
5405            .build()
5406            .expect("regulated build with named auth + kv must succeed");
5407        assert_eq!(
5408            server.ownership_registry.as_ref().map(|r| r.is_strict()),
5409            Some(true)
5410        );
5411    }
5412
5413    #[test]
5414    fn unprofiled_keeps_lenient_binding() {
5415        let kv = klieo_bus_memory::MemoryBus::new().kv;
5416        let server = McpServer::builder()
5417            .add_tools(Arc::new(OneToolInvoker))
5418            .with_tenant_binding(kv)
5419            .with_authenticator(Arc::new(NamedAuthn))
5420            .build()
5421            .expect("unprofiled build ok");
5422        assert_eq!(
5423            server.ownership_registry.as_ref().map(|r| r.is_strict()),
5424            Some(false)
5425        );
5426    }
5427}
5428
#[cfg(test)]
#[cfg(feature = "http")]
mod jsonrpc_const_tests {
    use super::*;

    /// Runtime mirror of the compile-time `const _` uniqueness block:
    /// the `[i64; 9]` annotation pins the constants' type, and the
    /// set-based scan checks that no two codes collide.
    #[test]
    fn jsonrpc_constants_are_i64_and_unique_at_runtime() {
        use std::collections::HashSet;

        // The const block itself enforces the contract at compile
        // time; this test keeps it grep-able and adds a negative
        // fixture proving the dedup logic really catches duplicates.
        let codes: [i64; 9] = [
            JSONRPC_PARSE_ERROR,
            JSONRPC_METHOD_NOT_FOUND,
            JSONRPC_INVALID_PARAMS,
            JSONRPC_SERVER_ERROR,
            JSONRPC_UNAUTHENTICATED,
            JSONRPC_RESUME_BUFFER_EXPIRED,
            JSONRPC_RESUME_BUFFER_NOT_FOUND,
            JSONRPC_LEADER_DIED,
            JSONRPC_SESSION_CONFLICT,
        ];

        // `insert` returns false for a value already present, so the
        // filter keeps exactly the repeated codes.
        let mut seen: HashSet<i64> = HashSet::new();
        let duplicates: Vec<i64> = codes
            .iter()
            .copied()
            .filter(|code| !seen.insert(*code))
            .collect();
        assert!(
            duplicates.is_empty(),
            "JSONRPC_* codes must be unique; found duplicates: {duplicates:?}"
        );

        // Negative fixture — re-inserting a known member must be
        // rejected (regression guard against the test itself drifting
        // into vacuous truth).
        let mut probe = seen.clone();
        let newly_inserted = probe.insert(JSONRPC_PARSE_ERROR);
        assert!(!newly_inserted, "duplicate detection logic broken");
    }
}