fastmcp-rs 0.2.0

Rust prototype for the FastMCP server
Documentation
use std::net::SocketAddr;
use std::sync::Once;
use std::time::Duration;

use fastmcp_rs::{
    FastMcpServer, ToolDefinition, ToolResponse,
    prompt::{PromptMessage, PromptMessageContent, PromptTemplate},
    resource::{ResourceContent, ResourceDefinition},
    start_http,
};
use futures::StreamExt;
use reqwest_eventsource::{Event, EventSource};
use serde_json::{Value, json};
use tokio::sync::mpsc;

/// One-shot guard used by `init_tracing` so the subscriber setup closure runs at most once.
static INIT: Once = Once::new();

/// Sets up `tracing` output for the tests in this file.
///
/// The setup closure is guarded by `INIT`, so calling this from several tests
/// is safe: only the first call attempts to install the subscriber.
fn init_tracing() {
    INIT.call_once(|| {
        // Filter directives are taken from the process environment.
        let env_filter = tracing_subscriber::EnvFilter::from_default_env();
        let subscriber = tracing_subscriber::fmt().with_env_filter(env_filter);
        // `try_init` reports an error instead of panicking; the result is
        // deliberately discarded so a failed install never fails the test.
        let _ = subscriber.try_init();
    });
}

/// Sends `request` and decodes the response body as JSON.
///
/// # Panics
///
/// Panics if the request cannot be sent or if the body is not valid JSON;
/// both are test failures for the caller.
async fn fetch_json(request: reqwest::RequestBuilder) -> Value {
    request
        .send()
        .await
        .expect("HTTP request failed")
        .json::<Value>()
        .await
        .expect("response body is not valid JSON")
}

/// Removes every complete (`\n`-terminated) line from the front of `buffer`
/// and returns the ones that are not blank, without their trailing newline.
///
/// Bytes after the last newline stay in `buffer` so that a line (or a
/// multi-byte UTF-8 character) split across two network chunks is only decoded
/// once it has arrived completely.
fn drain_complete_lines(buffer: &mut Vec<u8>) -> Vec<String> {
    let mut lines = Vec::new();
    while let Some(newline_at) = buffer.iter().position(|&byte| byte == b'\n') {
        let raw: Vec<u8> = buffer.drain(..=newline_at).collect();
        // `raw` ends with the newline itself; decode everything before it.
        let line = String::from_utf8_lossy(&raw[..newline_at]);
        if !line.trim().is_empty() {
            lines.push(line.into_owned());
        }
    }
    lines
}

/// End-to-end check of the HTTP transport.
///
/// Registers one tool, one resource and one prompt on a server bound to an
/// ephemeral local port, then exercises:
///
/// 1. the plain JSON endpoints (`/metadata`, `/tools`, `/tools/echo/call`,
///    `/resource`, `/prompts/welcome/instantiate`),
/// 2. the `/messages` gateway together with the `/sse` event stream,
/// 3. a `/streamable/session` session, including its `shutdown` command.
#[tokio::test]
async fn http_endpoints_work() {
    init_tracing();

    let server = FastMcpServer::builder()
        .name("Rust FastMCP")
        .build()
        .into_shared();

    // The tool answers with the JSON text of whatever payload it was given.
    server
        .register_tool(
            ToolDefinition::new("echo", |_, payload: serde_json::Value| async move {
                Ok(ToolResponse::new(vec![json!({
                    "type": "text",
                    "text": payload.to_string()
                })]))
            })
            .with_description("Echoes the payload"),
        )
        .unwrap();

    server
        .register_resource(
            ResourceDefinition::static_resource(
                "resource://hello",
                ResourceContent::text("hello world"),
            )
            .with_description("Simple text resource"),
        )
        .unwrap();

    server
        .register_prompt(PromptTemplate::new(
            "welcome",
            vec![PromptMessage {
                role: "system".into(),
                content: PromptMessageContent::Text {
                    text: "Hello {{ user }}".into(),
                },
            }],
        ))
        .unwrap();

    // Port 0: the actual address is read back from the handle.
    let addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
    let handle = start_http(server.clone(), addr).await.unwrap();
    let base = format!("http://{}", handle.addr());
    let client = reqwest::Client::new();

    // --- Plain JSON endpoints -------------------------------------------

    let metadata = fetch_json(client.get(format!("{base}/metadata"))).await;
    assert_eq!(metadata["name"], "Rust FastMCP");

    let tool_list = fetch_json(client.get(format!("{base}/tools"))).await;
    assert_eq!(tool_list.as_array().unwrap().len(), 1);

    let tool_response = fetch_json(
        client
            .post(format!("{base}/tools/echo/call"))
            .json(&json!({ "arguments": { "message": "hi" } })),
    )
    .await;
    assert_eq!(tool_response["content"][0]["text"], "{\"message\":\"hi\"}");

    let resource = fetch_json(
        client
            .get(format!("{base}/resource"))
            .query(&[("uri", "resource://hello")]),
    )
    .await;
    assert_eq!(resource["type"], "text");

    let prompt = fetch_json(
        client
            .post(format!("{base}/prompts/welcome/instantiate"))
            .json(&json!({ "arguments": { "user": "Alice" } })),
    )
    .await;
    assert_eq!(prompt["messages"][0]["content"]["text"], "Hello Alice");

    // --- Gateway + SSE --------------------------------------------------

    let (event_tx, mut event_rx) = mpsc::unbounded_channel::<(String, String)>();
    // Signalled on the first `Event::Open`, so the gateway command below is
    // only posted once the SSE connection exists. Without this the POST can
    // be handled before the listener has connected and the event is missed.
    let (open_tx, open_rx) = tokio::sync::oneshot::channel::<()>();
    let event_client = client.clone();
    let base_for_events = base.clone();
    let listener = tokio::spawn(async move {
        let mut open_tx = Some(open_tx);
        let mut source = EventSource::new(event_client.get(format!("{base_for_events}/sse")))
            .expect("failed to create EventSource");
        while let Some(event) = source.next().await {
            match event {
                Ok(Event::Open) => {
                    if let Some(tx) = open_tx.take() {
                        // The receiver may already be gone if the test failed.
                        let _ = tx.send(());
                    }
                }
                Ok(Event::Message(message)) => {
                    let _ = event_tx.send((message.event, message.data));
                }
                Err(_err) => break,
            }
        }
    });

    tokio::time::timeout(Duration::from_secs(5), open_rx)
        .await
        .expect("SSE stream did not open in time")
        .expect("SSE listener exited before the stream opened");

    let gateway_response = fetch_json(client.post(format!("{base}/messages")).json(&json!({
        "command": "call_tool",
        "name": "echo",
        "arguments": { "message": "from_gateway" }
    })))
    .await;
    assert_eq!(gateway_response["type"], "tool_invocation");

    let (event_name, event_data) = tokio::time::timeout(Duration::from_secs(5), event_rx.recv())
        .await
        .expect("event not received")
        .expect("channel closed");
    assert_eq!(event_name, "tool_invocation");
    let payload: Value = serde_json::from_str(&event_data).unwrap();
    assert_eq!(payload["type"], "tool_invocation");
    assert_eq!(
        payload["data"]["content"][0]["text"],
        "{\"message\":\"from_gateway\"}"
    );

    // --- Streamable session ---------------------------------------------

    let session_resp = fetch_json(client.post(format!("{base}/streamable/session"))).await;
    let session_id = session_resp["session_id"]
        .as_str()
        .expect("session id")
        .to_string();

    let stream_response = client
        .get(format!("{base}/streamable/session/{session_id}"))
        .send()
        .await
        .unwrap();

    // A background task forwards each non-blank line of the response body.
    let (stream_sender, mut stream_receiver) = mpsc::unbounded_channel::<String>();
    let stream_handle = tokio::spawn(async move {
        let mut buffer: Vec<u8> = Vec::new();
        let mut stream = stream_response.bytes_stream();
        // Stops on end of stream or on the first transport error.
        while let Some(Ok(bytes)) = stream.next().await {
            buffer.extend_from_slice(&bytes);
            for line in drain_complete_lines(&mut buffer) {
                let _ = stream_sender.send(line);
            }
        }
    });

    // NOTE(review): presumably gives the reader task time to start polling
    // the session stream before a command is posted; confirm it is needed.
    tokio::time::sleep(Duration::from_millis(100)).await;

    let session_messages_url = format!("{base}/streamable/session/{session_id}/messages");

    let stream_command = fetch_json(client.post(&session_messages_url).json(&json!({
        "command": "call_tool",
        "name": "echo",
        "arguments": { "message": "from_stream" }
    })))
    .await;
    assert_eq!(stream_command["type"], "tool_invocation");

    let stream_line = tokio::time::timeout(Duration::from_secs(5), stream_receiver.recv())
        .await
        .expect("stream event not received")
        .expect("stream channel closed");
    let stream_payload: Value = serde_json::from_str(&stream_line).unwrap();
    assert_eq!(stream_payload["type"], "tool_invocation");
    assert_eq!(
        stream_payload["data"]["content"][0]["text"],
        "{\"message\":\"from_stream\"}"
    );

    let shutdown_resp = fetch_json(
        client
            .post(&session_messages_url)
            .json(&json!({ "command": "shutdown" })),
    )
    .await;
    assert_eq!(shutdown_resp["type"], "ack");

    // --- Teardown -------------------------------------------------------

    // Wait (bounded) for one more line or for the reader to finish; the
    // outcome is not asserted.
    let _ = tokio::time::timeout(Duration::from_secs(5), stream_receiver.recv()).await;
    stream_handle.abort();
    let _ = stream_handle.await;
    listener.abort();
    let _ = listener.await;

    handle.shutdown().await;
}