use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Method, Request, Response, Server, StatusCode};
use rs_utcp::config::UtcpClientConfig;
use rs_utcp::providers::mcp::McpProvider;
use rs_utcp::repository::in_memory::InMemoryToolRepository;
use rs_utcp::tag::tag_search::TagSearchStrategy;
use rs_utcp::{UtcpClient, UtcpClientInterface};
use serde_json::json;
use std::collections::HashMap;
use std::convert::Infallible;
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() -> anyhow::Result<()> {
println!("🔄 MCP Streaming Example\n");
let addr = spawn_mcp_server().await?;
println!(" ✓ Started local MCP server at http://{}", addr);
let repo = Arc::new(InMemoryToolRepository::new());
let search = Arc::new(TagSearchStrategy::new(repo.clone(), 1.0));
let config = UtcpClientConfig::default();
let client = UtcpClient::new(config, repo.clone(), search).await?;
println!("\n📡 Example 1: HTTP MCP Provider with SSE");
let http_provider = Arc::new(McpProvider::new(
"http_mcp".to_string(),
format!("http://{}/mcp", addr),
None,
));
match client.register_tool_provider(http_provider.clone()).await {
Ok(tools) => {
println!(
" ✓ Registered HTTP MCP provider with {} tools",
tools.len()
);
if let Some(tool) = tools.first() {
let mut args = HashMap::new();
args.insert("query".to_string(), serde_json::json!("test"));
println!(" → Streaming results from tool: {}", tool.name);
match client.call_tool_stream(&tool.name, args).await {
Ok(mut stream) => {
while let Ok(Some(value)) = stream.next().await {
println!(" 📦 Received: {}", serde_json::to_string_pretty(&value)?);
}
stream.close().await?;
}
Err(e) => println!(" ⚠ Stream error: {}", e),
}
}
}
Err(e) => println!(" ⚠ Failed to register HTTP provider: {}", e),
}
println!("\n📝 Example 2: Stdio MCP Provider with Streaming");
let stdio_provider = Arc::new(McpProvider::new_stdio(
"stdio_mcp".to_string(),
"python3".to_string(),
Some(vec!["examples/mcp_stdio_server.py".to_string()]),
None,
));
match client.register_tool_provider(stdio_provider.clone()).await {
Ok(tools) => {
println!(
" ✓ Registered stdio MCP provider with {} tools",
tools.len()
);
let mut args = HashMap::new();
args.insert("a".to_string(), serde_json::json!(10));
args.insert("b".to_string(), serde_json::json!(20));
println!(" → Streaming results from tool: stdio_mcp.add");
match client.call_tool_stream("stdio_mcp.add", args).await {
Ok(mut stream) => {
while let Ok(Some(value)) = stream.next().await {
println!(" 📦 Received: {}", serde_json::to_string_pretty(&value)?);
}
stream.close().await?;
}
Err(e) => println!(" ⚠ Stream error: {}", e),
}
}
Err(e) => println!(" ⚠ Failed to register stdio provider: {}", e),
}
println!("\n✨ Demo complete!");
Ok(())
}
async fn spawn_mcp_server() -> anyhow::Result<SocketAddr> {
let make_svc = make_service_fn(|_conn| async { Ok::<_, Infallible>(service_fn(handle)) });
let server = Server::try_bind(&([127, 0, 0, 1], 0).into())?;
let addr = server.local_addr();
tokio::spawn(server.serve(make_svc));
Ok(addr)
}
async fn handle(req: Request<Body>) -> Result<Response<Body>, Infallible> {
if req.method() != Method::POST {
return Ok(Response::builder()
.status(StatusCode::METHOD_NOT_ALLOWED)
.body(Body::empty())
.unwrap());
}
let accept_header = req
.headers()
.get("Accept")
.and_then(|h| h.to_str().ok())
.unwrap_or("");
let is_sse = accept_header.contains("text/event-stream");
let body = hyper::body::to_bytes(req.into_body())
.await
.unwrap_or_default();
let payload: serde_json::Value = serde_json::from_slice(&body).unwrap_or(json!({}));
let method = payload.get("method").and_then(|m| m.as_str()).unwrap_or("");
match method {
"tools/list" => {
let resp = json!({
"jsonrpc": "2.0",
"result": { "tools": [{
"name": "stream_echo",
"description": "Echo args with streaming",
"inputs": { "type": "object" },
"outputs": { "type": "object" },
"tags": ["mcp", "stream"]
}]},
"id": payload.get("id").cloned().unwrap_or(json!(1))
});
Ok(json_response(StatusCode::OK, resp))
}
"tools/call" => {
if is_sse {
let (mut tx, body) = Body::channel();
tokio::spawn(async move {
let messages = vec!["Hello", "from", "SSE", "stream!"];
for msg in messages {
let data = json!({
"type": "chunk",
"content": msg
});
let event = format!("data: {}\n\n", data.to_string());
if tx.send_data(event.into()).await.is_err() {
break;
}
sleep(Duration::from_millis(200)).await;
}
});
Ok(Response::builder()
.status(StatusCode::OK)
.header("Content-Type", "text/event-stream")
.header("Cache-Control", "no-cache")
.header("Connection", "keep-alive")
.body(body)
.unwrap())
} else {
let resp = json!({
"jsonrpc": "2.0",
"result": payload.get("params").cloned().unwrap_or(json!({})),
"id": payload.get("id").cloned().unwrap_or(json!(1))
});
Ok(json_response(StatusCode::OK, resp))
}
}
_ => Ok(Response::builder()
.status(StatusCode::BAD_REQUEST)
.body(Body::empty())
.unwrap()),
}
}
fn json_response(status: StatusCode, body: serde_json::Value) -> Response<Body> {
Response::builder()
.status(status)
.header("content-type", "application/json")
.body(Body::from(body.to_string()))
.unwrap()
}