use std::net::SocketAddr;
use std::sync::Once;
use std::time::Duration;
use fastmcp_rs::{
FastMcpServer, ToolDefinition, ToolResponse,
prompt::{PromptMessage, PromptMessageContent, PromptTemplate},
resource::{ResourceContent, ResourceDefinition},
start_http,
};
use futures::StreamExt;
use reqwest_eventsource::{Event, EventSource};
use serde_json::{Value, json};
use tokio::sync::mpsc;
static INIT: Once = Once::new();
fn init_tracing() {
INIT.call_once(|| {
let _ = tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.try_init();
});
}
#[tokio::test]
async fn http_endpoints_work() {
init_tracing();
let server = FastMcpServer::builder()
.name("Rust FastMCP")
.build()
.into_shared();
server
.register_tool(
ToolDefinition::new("echo", |_, payload: serde_json::Value| async move {
Ok(ToolResponse::new(vec![json!({
"type": "text",
"text": payload.to_string()
})]))
})
.with_description("Echoes the payload"),
)
.unwrap();
server
.register_resource(
ResourceDefinition::static_resource(
"resource://hello",
ResourceContent::text("hello world"),
)
.with_description("Simple text resource"),
)
.unwrap();
server
.register_prompt(PromptTemplate::new(
"welcome",
vec![PromptMessage {
role: "system".into(),
content: PromptMessageContent::Text {
text: "Hello {{ user }}".into(),
},
}],
))
.unwrap();
let addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
let handle = start_http(server.clone(), addr).await.unwrap();
let base = format!("http://{}", handle.addr());
let client = reqwest::Client::new();
let metadata = client
.get(format!("{base}/metadata"))
.send()
.await
.unwrap()
.json::<serde_json::Value>()
.await
.unwrap();
assert_eq!(metadata["name"], "Rust FastMCP");
let tool_list = client
.get(format!("{base}/tools"))
.send()
.await
.unwrap()
.json::<serde_json::Value>()
.await
.unwrap();
assert_eq!(tool_list.as_array().unwrap().len(), 1);
let tool_response = client
.post(format!("{base}/tools/echo/call"))
.json(&json!({ "arguments": { "message": "hi" } }))
.send()
.await
.unwrap()
.json::<serde_json::Value>()
.await
.unwrap();
assert_eq!(tool_response["content"][0]["text"], "{\"message\":\"hi\"}");
let resource = client
.get(format!("{base}/resource"))
.query(&[("uri", "resource://hello")])
.send()
.await
.unwrap()
.json::<serde_json::Value>()
.await
.unwrap();
assert_eq!(resource["type"], "text");
let prompt = client
.post(format!("{base}/prompts/welcome/instantiate"))
.json(&json!({ "arguments": { "user": "Alice" } }))
.send()
.await
.unwrap()
.json::<serde_json::Value>()
.await
.unwrap();
assert_eq!(prompt["messages"][0]["content"]["text"], "Hello Alice");
let (event_tx, mut event_rx) = mpsc::unbounded_channel::<(String, String)>();
let event_client = client.clone();
let base_for_events = base.clone();
let listener = tokio::spawn(async move {
let mut source = EventSource::new(event_client.get(format!("{base_for_events}/sse")))
.expect("failed to create EventSource");
while let Some(event) = source.next().await {
match event {
Ok(Event::Open) => continue,
Ok(Event::Message(message)) => {
let _ = event_tx.send((message.event, message.data));
}
Err(_err) => break,
}
}
});
let gateway_response = client
.post(format!("{base}/messages"))
.json(&json!({
"command": "call_tool",
"name": "echo",
"arguments": { "message": "from_gateway" }
}))
.send()
.await
.unwrap()
.json::<Value>()
.await
.unwrap();
assert_eq!(gateway_response["type"], "tool_invocation");
let (event_name, event_data) = tokio::time::timeout(Duration::from_secs(5), event_rx.recv())
.await
.expect("event not received")
.expect("channel closed");
assert_eq!(event_name, "tool_invocation");
let payload: Value = serde_json::from_str(&event_data).unwrap();
assert_eq!(payload["type"], "tool_invocation");
assert_eq!(
payload["data"]["content"][0]["text"],
"{\"message\":\"from_gateway\"}"
);
let session_resp = client
.post(format!("{base}/streamable/session"))
.send()
.await
.unwrap()
.json::<Value>()
.await
.unwrap();
let session_id = session_resp["session_id"]
.as_str()
.expect("session id")
.to_string();
let stream_response = client
.get(format!("{base}/streamable/session/{session_id}"))
.send()
.await
.unwrap();
let (stream_sender, mut stream_receiver) = mpsc::unbounded_channel::<String>();
let stream_handle = tokio::spawn(async move {
let mut buffer = String::new();
let mut stream = stream_response.bytes_stream();
while let Some(chunk) = stream.next().await {
match chunk {
Ok(bytes) => {
buffer.push_str(&String::from_utf8_lossy(&bytes));
loop {
if let Some(pos) = buffer.find('\n') {
let line = buffer[..pos].to_string();
buffer.drain(..=pos);
if !line.trim().is_empty() {
let _ = stream_sender.send(line);
}
} else {
break;
}
}
}
Err(_err) => break,
}
}
});
tokio::time::sleep(Duration::from_millis(100)).await;
let stream_command = client
.post(format!("{base}/streamable/session/{session_id}/messages"))
.json(&json!({
"command": "call_tool",
"name": "echo",
"arguments": { "message": "from_stream" }
}))
.send()
.await
.unwrap()
.json::<Value>()
.await
.unwrap();
assert_eq!(stream_command["type"], "tool_invocation");
let stream_line = tokio::time::timeout(Duration::from_secs(5), stream_receiver.recv())
.await
.expect("stream event not received")
.expect("stream channel closed");
let stream_payload: Value = serde_json::from_str(&stream_line).unwrap();
assert_eq!(stream_payload["type"], "tool_invocation");
assert_eq!(
stream_payload["data"]["content"][0]["text"],
"{\"message\":\"from_stream\"}"
);
let shutdown_resp = client
.post(format!("{base}/streamable/session/{session_id}/messages"))
.json(&json!({ "command": "shutdown" }))
.send()
.await
.unwrap()
.json::<Value>()
.await
.unwrap();
assert_eq!(shutdown_resp["type"], "ack");
let _ = tokio::time::timeout(Duration::from_secs(5), stream_receiver.recv()).await;
stream_handle.abort();
let _ = stream_handle.await;
listener.abort();
let _ = listener.await;
handle.shutdown().await;
}