sim-lib-mcp 0.1.0-rc.1

Library-only MCP surface projection for SIM.
Documentation
use std::collections::VecDeque;
use std::sync::{
    Arc, Mutex,
    atomic::{AtomicUsize, Ordering},
};

use sim_codec_mcp::{McpEnvelope, McpNotification, McpRequest, McpResponse};
use sim_kernel::{
    AbiVersion, Args, Callable, CapabilityName, Cx, Error, Export, Lib, LibManifest, LibTarget,
    Linker, LoadCx, Object, ObjectCompat, Result, ShapeRef, Stream, Symbol, Value, Version,
};
use sim_lib_stream_core::StreamPacket;
use sim_shape::{AnyShape, shape_value};

use crate::{
    McpExportFacet, McpNativeCard, McpProfile, McpRouter, McpSession, mcp_cancelled_data_kind,
    mcp_tools_call_capability,
};
use sim_kernel::Expr;

/// A `tools/call` request that carries a `progressToken` must yield one
/// `notifications/progress` envelope per packet of the tool's stream, in
/// stream order, followed by the final response.
///
/// Every progress notification is checked for both its payload and the
/// caller-supplied token, so a notification that loses its token is caught
/// regardless of its position.
#[test]
fn tools_call_with_progress_token_emits_ordered_notifications_and_final_response() {
    let mut cx = cx();
    let symbol = Symbol::qualified("fixture", "streaming");
    let function = install_streaming_tool(&mut cx, symbol.clone());
    let capability = CapabilityName::new("fixture.streaming.call");
    let session = session_with_streaming_tool(symbol, "fixture.streaming", capability.clone())
        .with_granted_capability(mcp_tools_call_capability())
        .with_granted_capability(capability);
    let mut router = McpRouter::new(session);
    let token = Expr::String("tok-1".to_owned());

    let replies = router
        .handle_many(
            &mut cx,
            request(
                Expr::String("stream-1".to_owned()),
                "tools/call",
                call_params_with_token("fixture.streaming", Some(token.clone())),
            ),
        )
        .unwrap();

    // Two progress notifications (one per fixture packet) plus the final response.
    assert_eq!(replies.len(), 3);
    for (index, expected) in ["first", "second"].into_iter().enumerate() {
        let reply = &replies[index];
        assert_eq!(
            progress_payload(reply),
            Some(&Expr::String(expected.to_owned()))
        );
        assert_eq!(progress_token(reply), Some(&token));
    }
    assert_final_success(&replies[2]);
    assert_eq!(function.call_count(), 1);
    assert_eq!(function.next_count(), 2);
}

/// Without a `progressToken` the tool is still called once and its stream is
/// still pulled twice, but the only envelope returned is the final response;
/// the two packets are found on the session instead.
#[test]
fn tools_call_without_progress_token_records_stream_but_emits_only_final_response() {
    let mut cx = cx();
    let tool_symbol = Symbol::qualified("fixture", "quiet-streaming");
    let tool = install_streaming_tool(&mut cx, tool_symbol.clone());
    let tool_capability = CapabilityName::new("fixture.quiet-streaming.call");
    let mut router = McpRouter::new(
        session_with_streaming_tool(tool_symbol, "fixture.quiet", tool_capability.clone())
            .with_granted_capability(mcp_tools_call_capability())
            .with_granted_capability(tool_capability),
    );

    let call = request(
        Expr::String("stream-2".to_owned()),
        "tools/call",
        call_params_with_token("fixture.quiet", None),
    );
    let replies = router.handle_many(&mut cx, call).unwrap();

    // Only the final response comes back to the caller.
    assert_eq!(replies.len(), 1);
    assert_final_success(&replies[0]);

    // The tool ran once and both stream items were consumed and recorded.
    assert_eq!(tool.call_count(), 1);
    assert_eq!(tool.next_count(), 2);
    assert_eq!(router.session().stream_packets().len(), 2);
}

/// A `notifications/cancelled` notification for a request that was begun on
/// the session produces no replies, flags that request as cancelled, and
/// leaves exactly one data packet carrying the request id and the reason.
#[test]
fn cancellation_notification_marks_active_request_and_records_data_packet() {
    let mut cx = cx();
    let mut router = McpRouter::fixture();
    let request_id = Expr::String("cancel-1".to_owned());
    let reason = Expr::String("client stopped".to_owned());
    router.session_mut().begin_request(&request_id);

    let cancellation = McpEnvelope::Notification(McpNotification {
        method: "notifications/cancelled".to_owned(),
        params: Expr::Map(vec![
            field("requestId", request_id.clone()),
            field("reason", reason.clone()),
        ]),
    });
    let replies = router.handle_many(&mut cx, cancellation).unwrap();

    assert!(replies.is_empty());
    assert!(router.session().request_cancelled(&request_id));

    // The session must hold a single packet, and it must be a data packet.
    let [StreamPacket::Data(packet)] = router.session().stream_packets() else {
        panic!("expected one cancellation data packet");
    };
    assert_eq!(packet.kind, mcp_cancelled_data_kind());
    assert_eq!(map_field(&packet.payload, "requestId"), Some(&request_id));
    assert_eq!(map_field(&packet.payload, "reason"), Some(&reason));
}

/// Loads a [`StreamingLib`] exporting a fresh [`StreamingFunction`] under
/// `symbol` into `cx` and returns a shared handle to that function so the
/// caller can inspect its counters.
///
/// Panics if loading the library fails.
fn install_streaming_tool(cx: &mut Cx, symbol: Symbol) -> Arc<StreamingFunction> {
    let function = Arc::new(StreamingFunction::new());
    let lib = StreamingLib {
        // The library id is derived from the symbol before the symbol is moved.
        id: Symbol::qualified("mcp-stream-test", symbol.to_string()),
        function: Arc::clone(&function),
        symbol,
    };
    cx.load_lib(&lib).unwrap();
    function
}

/// Builds an `"mcp-stream"` session with every profile enabled and a single
/// native card for `symbol`, exported as a tool called `name` and guarded by
/// `capability`.
fn session_with_streaming_tool(
    symbol: Symbol,
    name: &str,
    capability: CapabilityName,
) -> McpSession {
    let session = McpSession::new("mcp-stream", McpProfile::all());
    let card = McpNativeCard::new(symbol, "Fixture streaming MCP native tool")
        .with_shapes(any_shape("stream-args"), any_shape("stream-result"))
        .with_capability(capability)
        .exported(McpExportFacet::tool().with_name(name.to_owned()));
    session.with_native_cards(vec![card])
}

/// Test double whose two shared counters record how often it was called and
/// how many items were pulled from the streams it produced.
struct StreamingFunction {
    /// Number of invocations.
    calls: Arc<AtomicUsize>,
    /// Number of stream items handed out; shared with each produced stream.
    nexts: Arc<AtomicUsize>,
}

impl StreamingFunction {
    /// Creates a function with both counters at zero.
    fn new() -> Self {
        let zeroed = || Arc::new(AtomicUsize::new(0));
        Self {
            calls: zeroed(),
            nexts: zeroed(),
        }
    }

    /// Current value of the call counter.
    fn call_count(&self) -> usize {
        AtomicUsize::load(&self.calls, Ordering::SeqCst)
    }

    /// Current value of the stream-item counter.
    fn next_count(&self) -> usize {
        AtomicUsize::load(&self.nexts, Ordering::SeqCst)
    }
}

impl Object for StreamingFunction {
    /// Fixed printable tag for the fixture function.
    fn display(&self, _cx: &mut Cx) -> Result<String> {
        Ok(String::from("#<mcp-stream-test-function>"))
    }

    /// Exposes the concrete value for downcasting.
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }
}

impl ObjectCompat for StreamingFunction {
    /// The fixture function is callable through its `Callable` impl.
    fn as_callable(&self) -> Option<&dyn Callable> {
        let callable: &dyn Callable = self;
        Some(callable)
    }
}

impl Callable for StreamingFunction {
    fn call(&self, cx: &mut Cx, _args: Args) -> Result<Value> {
        self.calls.fetch_add(1, Ordering::SeqCst);
        cx.factory().opaque(Arc::new(FixtureStream::new(
            vec![
                StreamPacket::data(
                    Symbol::qualified("stream/data", "fixture"),
                    Expr::String("first".to_owned()),
                )
                .to_expr(),
                StreamPacket::data(
                    Symbol::qualified("stream/data", "fixture"),
                    Expr::String("second".to_owned()),
                )
                .to_expr(),
            ],
            self.nexts.clone(),
        )))
    }
}

/// Stream test double backed by a fixed queue of expressions.
struct FixtureStream {
    /// Items still to be handed out, front first.
    items: Mutex<VecDeque<Expr>>,
    /// Counter incremented for every item handed out.
    nexts: Arc<AtomicUsize>,
}

impl FixtureStream {
    /// Wraps `items` in a queue (first element is yielded first) and keeps
    /// `nexts` as the counter to bump per yielded item.
    fn new(items: Vec<Expr>, nexts: Arc<AtomicUsize>) -> Self {
        let queue = VecDeque::from(items);
        Self {
            items: Mutex::new(queue),
            nexts,
        }
    }
}

impl Object for FixtureStream {
    /// Fixed printable tag for the fixture stream.
    fn display(&self, _cx: &mut Cx) -> Result<String> {
        let tag = "#<mcp-stream-test-stream>";
        Ok(String::from(tag))
    }

    /// Exposes the concrete value for downcasting.
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }
}

impl ObjectCompat for FixtureStream {
    /// The fixture is usable as a stream through its `Stream` impl.
    fn as_stream(&self) -> Option<&dyn Stream> {
        Some(self)
    }

    /// Describes the stream as a map with `kind = "fixture-stream"` and a
    /// `done` flag that is true once the queue is empty.
    ///
    /// # Errors
    ///
    /// Returns `Error::PoisonedLock` when the queue's mutex is poisoned, the
    /// same way `next` does, instead of silently reporting `done = true`.
    fn as_expr(&self, _cx: &mut Cx) -> Result<Expr> {
        // The guard is a temporary and is released at the end of this statement.
        let done = self
            .items
            .lock()
            .map_err(|_| Error::PoisonedLock("fixture MCP stream"))?
            .is_empty();
        Ok(Expr::Map(vec![
            (
                Expr::String("kind".to_owned()),
                Expr::String("fixture-stream".to_owned()),
            ),
            (Expr::String("done".to_owned()), Expr::Bool(done)),
        ]))
    }
}

impl Stream for FixtureStream {
    /// Pops the front item, counts it, and returns it as a value; returns
    /// `Ok(None)` once the queue is empty.
    ///
    /// # Errors
    ///
    /// Returns `Error::PoisonedLock` when the queue's mutex is poisoned, and
    /// propagates any error from converting the expression into a value.
    fn next(&self, cx: &mut Cx) -> Result<Option<Value>> {
        // Release the lock before touching `cx`: the guard only lives for
        // this statement.
        let item = self
            .items
            .lock()
            .map_err(|_| Error::PoisonedLock("fixture MCP stream"))?
            .pop_front();
        match item {
            Some(expr) => {
                self.nexts.fetch_add(1, Ordering::SeqCst);
                Ok(Some(cx.factory().expr(expr)?))
            }
            None => Ok(None),
        }
    }
}

/// Minimal library that exports one function value under one symbol.
struct StreamingLib {
    /// Identifier reported in the manifest.
    id: Symbol,
    /// Symbol the function is exported and linked under.
    symbol: Symbol,
    /// The function object handed to the linker on load.
    function: Arc<StreamingFunction>,
}

impl Lib for StreamingLib {
    /// Manifest with a single function export, no requirements and no
    /// capabilities, versioned with the crate's package version.
    fn manifest(&self) -> LibManifest {
        let export = Export::Function {
            symbol: self.symbol.clone(),
            function_id: None,
        };
        LibManifest {
            id: self.id.clone(),
            version: Version(env!("CARGO_PKG_VERSION").to_owned()),
            abi: AbiVersion { major: 0, minor: 1 },
            target: LibTarget::HostRegistered,
            requires: Vec::new(),
            capabilities: Vec::new(),
            exports: vec![export],
        }
    }

    /// Wraps the shared function as an opaque value and registers it with the
    /// linker under this library's symbol.
    fn load(&self, cx: &mut LoadCx, linker: &mut Linker<'_>) -> Result<()> {
        let callable = cx.factory().opaque(self.function.clone())?;
        linker.function_value(self.symbol.clone(), callable)?;
        Ok(())
    }
}

/// Wraps `id`, `method` and `params` in a request envelope.
fn request(id: Expr, method: &str, params: Expr) -> McpEnvelope {
    let method = String::from(method);
    McpEnvelope::Request(McpRequest { id, method, params })
}

/// Builds `tools/call` params for the tool `name` with an empty argument
/// list. When `token` is given, a `_meta` map holding it under
/// `progressToken` is appended as the last entry.
fn call_params_with_token(name: &str, token: Option<Expr>) -> Expr {
    let mut entries = vec![
        field("name", Expr::String(name.to_owned())),
        field("arguments", Expr::List(Vec::new())),
    ];
    // An `Option` iterates over zero or one item, so this appends at most one entry.
    entries.extend(
        token.map(|token| field("_meta", Expr::Map(vec![field("progressToken", token)]))),
    );
    Expr::Map(entries)
}

/// Asserts that `envelope` is a response whose result has `isError` set to
/// `false`; panics if it is any other kind of envelope.
fn assert_final_success(envelope: &McpEnvelope) {
    match envelope {
        McpEnvelope::Response(McpResponse { result, .. }) => {
            assert_eq!(map_field(result, "isError"), Some(&Expr::Bool(false)));
        }
        _ => panic!("expected final MCP response"),
    }
}

/// Returns `params.data.payload` of a progress notification, or `None` when
/// `envelope` is not a progress notification or a field is missing.
fn progress_payload(envelope: &McpEnvelope) -> Option<&Expr> {
    progress_params(envelope)
        .and_then(|params| map_field(params, "data"))
        .and_then(|data| map_field(data, "payload"))
}

/// Returns `params.progressToken` of a progress notification, or `None` when
/// `envelope` is not a progress notification or carries no token.
fn progress_token(envelope: &McpEnvelope) -> Option<&Expr> {
    let params = progress_params(envelope)?;
    map_field(params, "progressToken")
}

/// Returns the params of `envelope` when it is a notification whose method is
/// `notifications/progress`; otherwise `None`.
fn progress_params(envelope: &McpEnvelope) -> Option<&Expr> {
    match envelope {
        McpEnvelope::Notification(notification)
            if notification.method == "notifications/progress" =>
        {
            Some(&notification.params)
        }
        _ => None,
    }
}

use sim_value::access::field_any as map_field;

/// Builds a shape reference named `mcp-stream-test/<name>` backed by `AnyShape`.
fn any_shape(name: &str) -> ShapeRef {
    let symbol = Symbol::qualified("mcp-stream-test", name.to_owned());
    shape_value(symbol, Arc::new(AnyShape))
}

use sim_kernel::testing::eager_cx as cx;

use sim_value::build::entry as field;