klieo-mcp-server 2.2.0

Expose any klieo ToolInvoker or Agent as an MCP server over stdio or HTTP. The inverse of klieo-tools-mcp.
Documentation
//! MCP roots — client-declared URI scopes the server may operate on.
//!
//! [`Root`] is the wire-format type returned by the client's
//! `roots/list` response. `RootsCache` (crate-private) holds the latest
//! snapshot and broadcasts updates to subscribers via a
//! [`tokio::sync::watch`] channel. Refresh is driven by
//! `notifications/roots/list_changed` arriving from the client side;
//! consumers read either the latest snapshot via `RootsCache::snapshot`
//! or subscribe via `RootsCache::subscribe` to react to changes.

use klieo_core::{ServerOutbound, ServerOutboundError};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::watch;

/// Deadline handed to every outbound `roots/list` request issued by the
/// cache refresh path.
///
/// Ten seconds leaves generous headroom for an interactive MCP client
/// to answer while still ensuring a refresh cannot wait on the peer
/// indefinitely. When the deadline elapses the outbound call is
/// expected to report [`ServerOutboundError::Timeout`] (the timeout is
/// enforced by the transport, not by this module).
const ROOTS_LIST_TIMEOUT: Duration = Duration::from_secs(10);

/// One client-declared root URI plus an optional human-readable label.
///
/// The MCP spec admits any URI scheme, though `file://` is by far the
/// most common in practice. The optional `name` is what well-behaved
/// clients render when surfacing the root to an end user.
///
/// The type is `#[non_exhaustive]`, so code outside this crate cannot
/// build it with struct-literal syntax and should go through
/// [`Root::new`]. It round-trips through serde; a `None` name is left
/// out of the serialised form instead of being written as `null`.
#[non_exhaustive]
#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct Root {
    /// Root URI (typically `file://...` but the MCP spec allows any
    /// URI scheme). Stored as an opaque string; no validation happens
    /// in this type.
    pub uri: String,
    /// Optional display name surfaced by clients that show roots to
    /// the end user. Skipped entirely during serialisation when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub name: Option<String>,
}

impl Root {
    /// Build a [`Root`] from a URI and an optional display name.
    ///
    /// [`Root`] is `#[non_exhaustive]`, which rules out struct-literal
    /// syntax for code outside this crate; this constructor is the way
    /// to create one there. `uri` accepts anything convertible into a
    /// `String`, and `name` is stored exactly as given.
    pub fn new(uri: impl Into<String>, name: Option<String>) -> Self {
        let uri = uri.into();
        Self { uri, name }
    }
}

/// Per-server cache of client roots. Lifetime tied to the owning
/// [`crate::McpServer`]. Updates fan out via a
/// [`tokio::sync::watch::Sender`] so multiple subscribers see the same
/// snapshot without polling.
pub(crate) struct RootsCache {
    /// Sending half of the watch channel. It doubles as the storage
    /// for the latest snapshot: readers borrow the current value from
    /// it, and new receivers are created from it on demand.
    tx: watch::Sender<Vec<Root>>,
    /// Outbound channel over which the `roots/list` request is issued
    /// when the cache is refreshed.
    outbound: Arc<dyn ServerOutbound>,
}

impl RootsCache {
    /// Create a cache that sends its `roots/list` requests over
    /// `outbound`. The cache starts out holding an empty root list and
    /// keeps it until a call to [`Self::refresh`] succeeds.
    pub(crate) fn new(outbound: Arc<dyn ServerOutbound>) -> Self {
        // The receiver created alongside the sender is deliberately
        // discarded: readers use `snapshot` or ask for their own
        // receiver through `subscribe`.
        let (tx, _initial_rx) = watch::channel(Vec::<Root>::new());
        Self { outbound, tx }
    }

    /// Hand out a new receiver for the cached root list. The receiver
    /// is notified each time [`Self::refresh`] stores a new value.
    pub(crate) fn subscribe(&self) -> watch::Receiver<Vec<Root>> {
        watch::Sender::subscribe(&self.tx)
    }

    /// Return an owned copy of the root list currently held by the
    /// watch channel. Works whether or not any receiver exists.
    pub(crate) fn snapshot(&self) -> Vec<Root> {
        let current = self.tx.borrow();
        current.to_vec()
    }

    /// Send `roots/list` over the outbound channel and store the
    /// decoded response as the new snapshot.
    ///
    /// # Errors
    ///
    /// Any [`ServerOutboundError`] from the outbound request is
    /// returned unchanged, and in that case the cached snapshot is
    /// left untouched. A response that arrives but carries no
    /// decodable `roots` array is not an error: it replaces the
    /// snapshot with an empty list.
    ///
    /// The value is stored with [`watch::Sender::send_replace`] rather
    /// than `send`, because `send` refuses to update the value when
    /// there are no receivers; [`Self::snapshot`] must reflect the
    /// newest roots even if [`Self::subscribe`] was never called.
    pub(crate) async fn refresh(&self) -> Result<(), ServerOutboundError> {
        let request = self.outbound.outbound_request(
            "roots/list",
            serde_json::Value::Null,
            ROOTS_LIST_TIMEOUT,
        );
        let payload = request.await?;
        let fresh = parse_roots_payload(&payload);
        self.tx.send_replace(fresh);
        Ok(())
    }
}

/// Extract the `roots` array from a `roots/list` response payload.
/// Missing or malformed payloads yield an empty vec — the wire spec
/// allows the field to be absent, and we treat a malformed entry as
/// equivalent rather than letting one bad root invalidate the whole
/// snapshot. Pulled out of [`RootsCache::refresh`] so the deserialise
/// fallback is independently testable.
fn parse_roots_payload(response: &serde_json::Value) -> Vec<Root> {
    response
        .get("roots")
        .and_then(|v| serde_json::from_value(v.clone()).ok())
        .unwrap_or_default()
}

#[cfg(test)]
mod tests {
    //! Unit tests for the roots cache, the payload parser, and the
    //! roots-related accessors on [`crate::McpServer`].

    use super::*;
    use async_trait::async_trait;

    /// In-memory [`ServerOutbound`] used by the cache tests. Captures
    /// a single canned response and returns it (or a sentinel error)
    /// on every `outbound_request` call.
    struct MockOutbound {
        /// Canned outcome. An `Ok` payload is cloned out on each call;
        /// any `Err` is reported as [`ServerOutboundError::Timeout`].
        response: Result<serde_json::Value, ServerOutboundError>,
    }

    #[async_trait]
    impl ServerOutbound for MockOutbound {
        /// Ignore method, params and timeout and replay the canned
        /// outcome.
        async fn outbound_request(
            &self,
            _method: &str,
            _params: serde_json::Value,
            _timeout: Duration,
        ) -> Result<serde_json::Value, ServerOutboundError> {
            match &self.response {
                Ok(v) => Ok(v.clone()),
                // Sentinel — the cache tests don't care which variant
                // bubbles up, only that the Err arm propagates.
                Err(_) => Err(ServerOutboundError::Timeout),
            }
        }
    }

    /// Build a mock outbound that answers every request with `payload`.
    fn ok_outbound(payload: serde_json::Value) -> Arc<dyn ServerOutbound> {
        Arc::new(MockOutbound {
            response: Ok(payload),
        })
    }

    /// Build a mock outbound that fails every request with
    /// [`ServerOutboundError::Timeout`].
    fn err_outbound() -> Arc<dyn ServerOutbound> {
        Arc::new(MockOutbound {
            response: Err(ServerOutboundError::Timeout),
        })
    }

    /// Minimal [`ToolInvoker`] used by the McpServer-level tests. The
    /// server requires at least one invoker at build time; we stub a
    /// no-tool, never-invoked one rather than reach into a real impl.
    struct StubInvoker;

    #[async_trait]
    impl klieo_core::tool::ToolInvoker for StubInvoker {
        /// Advertise no tools.
        fn catalogue(&self) -> Vec<klieo_core::llm::ToolDef> {
            Vec::new()
        }
        /// Reject every invocation as an unknown tool, echoing the
        /// requested name in the error.
        async fn invoke(
            &self,
            name: &str,
            _args: serde_json::Value,
            _ctx: klieo_core::tool::ToolCtx,
        ) -> Result<serde_json::Value, klieo_core::error::ToolError> {
            Err(klieo_core::error::ToolError::UnknownTool(name.into()))
        }
    }

    /// A server built with `expose_tools` and nothing else has no
    /// roots cache wired, and `client_roots` falls back to an empty
    /// list.
    #[tokio::test]
    async fn client_roots_returns_empty_when_cache_missing() {
        let server = crate::McpServer::expose_tools(Arc::new(StubInvoker));
        assert!(
            server
                .stdio_session
                .get()
                .and_then(|s| s.roots_cache.get())
                .is_none(),
            "default-built server must not wire a roots cache"
        );
        assert_eq!(server.client_roots(), Vec::<Root>::new());
    }

    /// Without a roots cache there is nothing to subscribe to, so
    /// `subscribe_root_changes` yields `None`.
    #[tokio::test]
    async fn subscribe_root_changes_returns_none_when_cache_missing() {
        let server = crate::McpServer::expose_tools(Arc::new(StubInvoker));
        assert!(server.subscribe_root_changes().is_none());
    }

    /// A fresh cache starts empty; after a value is pushed through the
    /// watch sender, both a subscriber and `snapshot` observe it.
    #[tokio::test]
    async fn cache_snapshot_returns_initial_empty_then_updated() {
        let cache = RootsCache::new(ok_outbound(serde_json::json!({})));
        assert_eq!(cache.snapshot(), Vec::<Root>::new());

        // Subscribe before sending: plain `send` fails when the channel
        // has no receivers.
        let rx = cache.subscribe();
        let next = vec![Root {
            uri: "file:///workspace".into(),
            name: None,
        }];
        cache.tx.send(next.clone()).expect("watch send");
        assert_eq!(*rx.borrow(), next);
        assert_eq!(cache.snapshot(), next);
    }

    /// `refresh` decodes a well-formed `roots` array, preserving entry
    /// order and treating a missing `name` as `None`.
    #[tokio::test]
    async fn cache_refresh_deserialises_roots_array() {
        let payload = serde_json::json!({
            "roots": [
                {"uri": "file:///a"},
                {"uri": "file:///b", "name": "home"}
            ]
        });
        let cache = RootsCache::new(ok_outbound(payload));
        cache.refresh().await.expect("refresh ok");
        let snapshot = cache.snapshot();
        assert_eq!(snapshot.len(), 2);
        assert_eq!(snapshot[0].uri, "file:///a");
        assert_eq!(snapshot[0].name, None);
        assert_eq!(snapshot[1].uri, "file:///b");
        assert_eq!(snapshot[1].name.as_deref(), Some("home"));
    }

    /// An outbound failure surfaces from `refresh` as the same error
    /// and leaves the cached snapshot as it was.
    #[tokio::test]
    async fn cache_refresh_propagates_outbound_error() {
        let cache = RootsCache::new(err_outbound());
        let outcome = cache.refresh().await;
        assert!(
            matches!(outcome, Err(ServerOutboundError::Timeout)),
            "transport-level failure must propagate verbatim; got {outcome:?}"
        );
        assert_eq!(
            cache.snapshot(),
            Vec::<Root>::new(),
            "failed refresh must not mutate the cached snapshot"
        );
    }

    /// A payload without a `roots` key parses to an empty list.
    #[test]
    fn parse_roots_payload_handles_missing_field() {
        assert_eq!(
            parse_roots_payload(&serde_json::json!({})),
            Vec::<Root>::new()
        );
    }

    /// A `roots` array containing an undecodable entry parses to an
    /// empty list rather than an error.
    #[test]
    fn parse_roots_payload_handles_malformed_entry() {
        // `uri` field missing → serde fails to decode the array →
        // fallback to empty vec.
        let response = serde_json::json!({"roots": [{"name": "no-uri"}]});
        assert_eq!(parse_roots_payload(&response), Vec::<Root>::new());
    }

    /// A root with no name serialises to an object holding only `uri`;
    /// no `"name": null` member is emitted.
    #[test]
    fn root_serialises_without_null_name() {
        let root = Root {
            uri: "file:///a".into(),
            name: None,
        };
        let serialised = serde_json::to_value(&root).expect("encode");
        assert_eq!(serialised, serde_json::json!({"uri": "file:///a"}));
    }
}