use async_trait::async_trait;
use pmcp::server::streamable_http_server::StreamableHttpServer;
use pmcp::types::capabilities::ServerCapabilities;
use pmcp::types::ToolInfo;
use pmcp::{Server, SimpleTool, SyncTool, ToolHandler};
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use std::net::{Ipv4Addr, SocketAddr};
use std::sync::Arc;
use tokio::sync::Mutex;
use tracing::info;
struct EchoTool;
#[async_trait]
impl ToolHandler for EchoTool {
async fn handle(&self, args: Value, _extra: pmcp::RequestHandlerExtra) -> pmcp::Result<Value> {
let message = args
.get("message")
.and_then(|v| v.as_str())
.unwrap_or("(no message provided)");
Ok(json!({
"echo": message,
"timestamp": chrono::Utc::now().to_rfc3339()
}))
}
}
#[derive(Debug, Deserialize)]
struct CalculatorArgs {
operation: String,
a: f64,
b: f64,
}
#[derive(Debug, Serialize)]
struct CalculatorResult {
result: f64,
expression: String,
}
struct CalculatorTool;
#[async_trait]
impl ToolHandler for CalculatorTool {
async fn handle(&self, args: Value, _extra: pmcp::RequestHandlerExtra) -> pmcp::Result<Value> {
let params: CalculatorArgs = serde_json::from_value(args)
.map_err(|e| pmcp::Error::validation(format!("Invalid arguments: {}", e)))?;
let result = match params.operation.as_str() {
"add" => params.a + params.b,
"subtract" => params.a - params.b,
"multiply" => params.a * params.b,
"divide" => {
if params.b == 0.0 {
return Err(pmcp::Error::validation("Division by zero"));
}
params.a / params.b
},
op => {
return Err(pmcp::Error::validation(format!(
"Unknown operation: {}",
op
)));
},
};
let expression = format!(
"{} {} {} = {}",
params.a, params.operation, params.b, result
);
Ok(serde_json::to_value(CalculatorResult {
result,
expression,
})?)
}
fn metadata(&self) -> Option<ToolInfo> {
Some(ToolInfo::new(
"calculate",
Some("Perform basic arithmetic operations".to_string()),
json!({
"type": "object",
"properties": {
"operation": {
"type": "string",
"enum": ["add", "subtract", "multiply", "divide"],
"description": "The operation to perform"
},
"a": {
"type": "number",
"description": "First operand"
},
"b": {
"type": "number",
"description": "Second operand"
}
},
"required": ["operation", "a", "b"]
}),
))
}
}
struct SessionInfoTool;
#[async_trait]
impl ToolHandler for SessionInfoTool {
async fn handle(&self, _args: Value, _extra: pmcp::RequestHandlerExtra) -> pmcp::Result<Value> {
Ok(json!({
"message": "This is a stateful server with session management",
"session_active": true,
"server_mode": "stateful",
"timestamp": chrono::Utc::now().to_rfc3339()
}))
}
}
#[tokio::main]
async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::fmt()
.with_env_filter("pmcp=info")
.init();
info!("Starting Stateful Streamable HTTP Server Example");
let advanced_math_tool = SimpleTool::new(
"advanced_math",
Box::new(|args: Value, _extra: pmcp::RequestHandlerExtra| {
Box::pin(async move {
let func = args
.get("function")
.and_then(|v| v.as_str())
.ok_or_else(|| pmcp::Error::validation("Missing function parameter"))?;
let x = args
.get("x")
.and_then(|v| v.as_f64())
.ok_or_else(|| pmcp::Error::validation("Missing x parameter"))?;
let result = match func {
"sin" => x.sin(),
"cos" => x.cos(),
"tan" => x.tan(),
"sqrt" => {
if x < 0.0 {
return Err(pmcp::Error::validation(
"Cannot take square root of negative number",
));
}
x.sqrt()
},
"ln" => {
if x <= 0.0 {
return Err(pmcp::Error::validation(
"Cannot take natural log of non-positive number",
));
}
x.ln()
},
_ => {
return Err(pmcp::Error::validation(format!(
"Unknown function: {}",
func
)))
},
};
Ok(json!({
"result": result,
"function": func,
"input": x
}))
})
as std::pin::Pin<Box<dyn std::future::Future<Output = pmcp::Result<Value>> + Send>>
}),
)
.with_description("Perform advanced mathematical functions")
.with_schema(json!({
"type": "object",
"properties": {
"function": {
"type": "string",
"enum": ["sin", "cos", "tan", "sqrt", "ln"],
"description": "The mathematical function to apply"
},
"x": {
"type": "number",
"description": "The input value"
}
},
"required": ["function", "x"]
}));
let random_tool = SyncTool::new("random", |args| {
let min = args.get("min").and_then(|v| v.as_f64()).unwrap_or(0.0);
let max = args.get("max").and_then(|v| v.as_f64()).unwrap_or(1.0);
let seed = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_nanos() as f64;
let random = ((seed * 9.869604401089358) % 1.0).abs();
let value = min + (max - min) * random;
Ok(json!({
"value": value,
"min": min,
"max": max
}))
})
.with_description("Generate a random number within a range")
.with_schema(json!({
"type": "object",
"properties": {
"min": {
"type": "number",
"description": "Minimum value (inclusive)",
"default": 0
},
"max": {
"type": "number",
"description": "Maximum value (exclusive)",
"default": 1
}
}
}));
let server = Server::builder()
.name("stateful-http-example")
.version("1.0.0")
.capabilities(ServerCapabilities::tools_only())
.tool("echo", EchoTool)
.tool("calculate", CalculatorTool)
.tool("session_info", SessionInfoTool)
.tool("advanced_math", advanced_math_tool)
.tool("random", random_tool)
.build()
.map_err(|e| Box::new(e) as Box<dyn std::error::Error>)?;
let server = Arc::new(Mutex::new(server));
let addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 8080);
info!("Creating stateful HTTP server on {}", addr);
let http_server = StreamableHttpServer::new(addr, server);
let (bound_addr, server_handle) = http_server
.start()
.await
.map_err(|e| Box::new(e) as Box<dyn std::error::Error>)?;
println!("╔════════════════════════════════════════════════════════════╗");
println!("║ STATEFUL STREAMABLE HTTP SERVER RUNNING ║");
println!("╠════════════════════════════════════════════════════════════╣");
println!("║ Address: http://{:43} ║", bound_addr);
println!("║ Mode: Stateful (with session management) ║");
println!("╠════════════════════════════════════════════════════════════╣");
println!("║ Features: ║");
println!("║ • Session IDs are generated on initialization ║");
println!("║ • Each client gets a unique session ║");
println!("║ • Sessions are tracked and validated ║");
println!("║ • Re-initialization is prevented ║");
println!("╠════════════════════════════════════════════════════════════╣");
println!("║ Available Tools: ║");
println!("║ • echo - Echo back messages ║");
println!("║ • calculate - Perform arithmetic (with schema) ║");
println!("║ • session_info - Get session information ║");
println!("║ • advanced_math- Advanced math functions (with schema) ║");
println!("║ • random - Random number generator (with schema) ║");
println!("╠════════════════════════════════════════════════════════════╣");
println!("║ Connect with: ║");
println!("║ cargo run --example 24_streamable_http_client ║");
println!("╚════════════════════════════════════════════════════════════╝");
println!();
println!("Press Ctrl+C to stop the server");
server_handle
.await
.map_err(|e| pmcp::Error::Internal(e.to_string()))?;
Ok(())
}