use async_trait::async_trait;
use std::env;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tenx_mcp::{
error::{MCPError, Result},
schema::*,
server::{MCPServer, ToolHandler},
transport::TcpServerTransport,
};
use tokio::net::TcpListener;
use tokio::signal;
use tokio::time::sleep;
use tracing::{error, info, warn};
/// Test tool whose `execute` fails a fixed number of times before succeeding.
struct FlakeyTool {
    /// Number of `execute` calls seen so far in the current fail/succeed cycle.
    fail_count: Arc<AtomicU32>,
    /// How many calls must fail before one is allowed to succeed.
    failures_before_success: u32,
}

impl FlakeyTool {
    /// Builds a tool that fails `failures_before_success` times before it
    /// succeeds; the call counter starts at zero.
    fn new(failures_before_success: u32) -> Self {
        let fail_count = Arc::new(AtomicU32::new(0));
        FlakeyTool {
            fail_count,
            failures_before_success,
        }
    }
}
#[async_trait]
impl ToolHandler for FlakeyTool {
    /// Describes the tool as `flakey_operation`, taking an `object` input with
    /// no declared properties or required fields.
    fn metadata(&self) -> Tool {
        Tool {
            name: "flakey_operation".to_string(),
            description: Some("Simulates a flaky network operation".to_string()),
            input_schema: ToolInputSchema {
                schema_type: "object".to_string(),
                properties: None,
                required: None,
            },
            annotations: None,
        }
    }

    /// Fails with `MCPError::ConnectionClosed` until `failures_before_success`
    /// calls have been made, then succeeds once and resets the counter so the
    /// next call starts a new cycle. The arguments are ignored.
    async fn execute(&self, _arguments: Option<serde_json::Value>) -> Result<Vec<Content>> {
        // `fetch_add` returns the value *before* the increment, i.e. the number
        // of calls already made in this cycle.
        let count = self.fail_count.fetch_add(1, Ordering::SeqCst);
        let attempt = count + 1;

        if count < self.failures_before_success {
            warn!("FlakeyTool failing (attempt {})", attempt);
            return Err(MCPError::ConnectionClosed);
        }

        info!("FlakeyTool succeeding (attempt {})", attempt);
        // Reaching this point already implies `count >= failures_before_success`,
        // so the counter is reset unconditionally.
        self.fail_count.store(0, Ordering::SeqCst);
        Ok(vec![Content::Text(TextContent {
            text: format!("Success after {} attempts", attempt),
            annotations: None,
        })])
    }
}
/// Test tool whose `execute` sleeps for a fixed time before succeeding.
struct SlowTool {
    /// Length of the sleep, in whole seconds.
    delay_seconds: u64,
}

impl SlowTool {
    /// Builds a tool that sleeps for `delay_seconds` seconds per call.
    fn new(delay_seconds: u64) -> Self {
        SlowTool { delay_seconds }
    }
}
#[async_trait]
impl ToolHandler for SlowTool {
    /// Describes the tool as `slow_operation`, taking an `object` input with
    /// no declared properties or required fields.
    fn metadata(&self) -> Tool {
        let input_schema = ToolInputSchema {
            schema_type: "object".to_string(),
            properties: None,
            required: None,
        };
        let description = "Simulates a slow operation that may timeout".to_string();

        Tool {
            name: "slow_operation".to_string(),
            description: Some(description),
            input_schema,
            annotations: None,
        }
    }

    /// Sleeps for `delay_seconds` seconds without blocking the runtime thread,
    /// then returns a single text item. The arguments are ignored.
    async fn execute(&self, _arguments: Option<serde_json::Value>) -> Result<Vec<Content>> {
        let delay = Duration::from_secs(self.delay_seconds);

        info!("SlowTool starting execution (will take {} seconds)...", self.delay_seconds);
        sleep(delay).await;
        info!("SlowTool completed");

        let reply = TextContent {
            text: "Operation completed successfully".to_string(),
            annotations: None,
        };
        Ok(vec![Content::Text(reply)])
    }
}
/// Test tool whose `execute` returns `MCPError::InvalidParams` on every call.
struct BrokenTool;

#[async_trait]
impl ToolHandler for BrokenTool {
    /// Describes the tool as `broken_operation`, taking an `object` input with
    /// no declared properties or required fields.
    fn metadata(&self) -> Tool {
        let input_schema = ToolInputSchema {
            schema_type: "object".to_string(),
            properties: None,
            required: None,
        };
        let description = "Always fails with a non-retryable error".to_string();

        Tool {
            name: "broken_operation".to_string(),
            description: Some(description),
            input_schema,
            annotations: None,
        }
    }

    /// Always fails; the arguments are ignored.
    async fn execute(&self, _arguments: Option<serde_json::Value>) -> Result<Vec<Content>> {
        let failure = MCPError::InvalidParams {
            method: "broken_operation".to_string(),
            message: "This operation is permanently broken".to_string(),
        };
        Err(failure)
    }
}
/// Test tool whose `execute` succeeds on every call without waiting.
struct ReliableTool;

#[async_trait]
impl ToolHandler for ReliableTool {
    /// Describes the tool as `reliable_operation`, taking an `object` input
    /// with no declared properties or required fields.
    fn metadata(&self) -> Tool {
        let input_schema = ToolInputSchema {
            schema_type: "object".to_string(),
            properties: None,
            required: None,
        };
        let description = "Always succeeds immediately".to_string();

        Tool {
            name: "reliable_operation".to_string(),
            description: Some(description),
            input_schema,
            annotations: None,
        }
    }

    /// Returns a single fixed text item; the arguments are ignored.
    async fn execute(&self, _arguments: Option<serde_json::Value>) -> Result<Vec<Content>> {
        let reply = TextContent {
            text: "Reliable operation completed".to_string(),
            annotations: None,
        };
        Ok(vec![Content::Text(reply)])
    }
}
#[tokio::main]
async fn main() -> Result<()> {
tracing_subscriber::fmt()
.with_target(false)
.init();
let args: Vec<String> = env::args().collect();
let (host, port) = if args.len() == 3 {
(
args[1].clone(),
args[2].parse::<u16>().expect("Invalid port number"),
)
} else if args.len() == 1 {
("127.0.0.1".to_string(), 3001)
} else {
eprintln!("Usage: {} [host] [port]", args[0]);
eprintln!("Example: {} 127.0.0.1 3001", args[0]);
eprintln!("If no arguments provided, defaults to 127.0.0.1:3001");
std::process::exit(1);
};
let addr = format!("{host}:{port}");
let listener = TcpListener::bind(&addr).await?;
info!("Timeout Test Server listening on {}", addr);
loop {
tokio::select! {
result = listener.accept() => {
match result {
Ok((stream, peer_addr)) => {
info!("New connection from {}", peer_addr);
let mut server = MCPServer::new(
"timeout-test-server".to_string(),
"1.0.0".to_string(),
);
server = server.with_capabilities(ServerCapabilities {
tools: Some(ToolsCapability {
list_changed: Some(true),
}),
..Default::default()
});
server.register_tool(Box::new(FlakeyTool::new(2))).await; server.register_tool(Box::new(SlowTool::new(5))).await; server.register_tool(Box::new(BrokenTool)).await;
server.register_tool(Box::new(ReliableTool)).await;
info!("Registered tools for {}:", peer_addr);
info!(" - flakey_operation: Fails 2 times before succeeding");
info!(" - slow_operation: Takes 5 seconds to complete");
info!(" - broken_operation: Always fails with non-retryable error");
info!(" - reliable_operation: Always succeeds immediately");
let transport = Box::new(TcpServerTransport::new(stream));
tokio::spawn(async move {
info!("Handling connection from {}", peer_addr);
match server.serve(transport).await {
Ok(()) => info!("Connection from {} closed", peer_addr),
Err(e) => error!("Error handling connection from {}: {}", peer_addr, e),
}
});
}
Err(e) => {
error!("Failed to accept connection: {}", e);
}
}
}
_ = signal::ctrl_c() => {
info!("\nShutting down server...");
break;
}
}
}
info!("Server stopped");
Ok(())
}