turul-mcp-client

Comprehensive MCP client library with multi-transport support and full MCP 2025-06-18 protocol compliance.
Overview
turul-mcp-client provides a complete client implementation for the Model Context Protocol (MCP), supporting multiple transport layers and offering both high-level and low-level APIs for interacting with MCP servers.
Features
- ✅ Multi-Transport Support - HTTP, SSE, WebSocket, and stdio transports
- ✅ MCP 2025-06-18 Compliance - Full protocol specification support
- ✅ Session Management - Automatic session handling with recovery
- ✅ Streaming Support - Real-time event streaming and progress tracking
- ✅ Async/Await - Built on Tokio for high performance
- ✅ Error Recovery - Comprehensive error types and retry mechanisms
- ✅ Connection Pooling - Efficient resource management
Quick Start
Add this to your Cargo.toml:
[dependencies]
turul-mcp-client = "0.1.1"
tokio = { version = "1.0", features = ["full"] }
serde_json = "1.0" # the examples below build tool arguments with serde_json::json!
Basic HTTP Client
use turul_mcp_client::{McpClient, McpClientBuilder};
use turul_mcp_client::transport::HttpTransport;
/// Connects to a local MCP server over HTTP, prints how many tools it offers,
/// and calls the `calculator` tool once.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Build the client on top of an HTTP transport.
    let http = HttpTransport::new("http://localhost:8080/mcp")?;
    let builder = McpClientBuilder::new()
        .with_transport(http)
        .with_client_info("My MCP Client", "1.0.0");
    let client = builder.build();

    // Connect before issuing any request.
    client.connect().await?;

    let available = client.list_tools().await?;
    println!("Available tools: {}", available.len());

    // Arguments for the `calculator` tool.
    let arguments = serde_json::json!({
        "operation": "add",
        "a": 5,
        "b": 3
    });
    let outcome = client.call_tool("calculator", arguments).await?;
    println!("Tool result: {:?}", outcome);

    Ok(())
}
Transport Types
HTTP Transport (Streamable HTTP)
For modern MCP servers supporting MCP 2025-06-18:
use turul_mcp_client::transport::HttpTransport;
// `RetryConfig` is exported from the crate root; it must be in scope for the
// `.with_retry_config(...)` call below.
use turul_mcp_client::RetryConfig;

// HTTP transport with a timeout of 30 seconds and the default retry configuration.
let transport = HttpTransport::new("http://localhost:8080/mcp")?
    .with_timeout(std::time::Duration::from_secs(30))
    .with_retry_config(RetryConfig::default());

let client = McpClientBuilder::new()
    .with_transport(transport)
    .build();
SSE Transport (HTTP+SSE)
For servers supporting server-sent events:
use turul_mcp_client::transport::SseTransport;
// SSE transport with its heartbeat interval set to 30 seconds.
let heartbeat = std::time::Duration::from_secs(30);
let transport = SseTransport::new("http://localhost:8080/mcp")?
    .with_heartbeat_interval(heartbeat);

// Streaming is enabled explicitly on the builder.
let client = McpClientBuilder::new()
    .with_transport(transport)
    .enable_streaming()
    .build();
WebSocket Transport
For WebSocket-based MCP servers (requires the websocket feature, which is not part of the default feature set):
use turul_mcp_client::transport::WebSocketTransport;
// WebSocket transport with its ping interval set to 30 seconds.
let ping_every = std::time::Duration::from_secs(30);
let transport = WebSocketTransport::new("ws://localhost:8080/mcp")?
    .with_ping_interval(ping_every);

let client = McpClientBuilder::new()
    .with_transport(transport)
    .build();
Stdio Transport
For command-line MCP server executables (requires the stdio feature, which is not part of the default feature set):
use turul_mcp_client::transport::StdioTransport;
// Arguments passed to the server executable.
let server_args = vec!["--config", "config.json"];

// Stdio transport for the `./my-mcp-server` executable, with a working directory set.
let transport = StdioTransport::new("./my-mcp-server")?
    .with_args(server_args)
    .with_working_dir("/path/to/server");

let client = McpClientBuilder::new()
    .with_transport(transport)
    .build();
Advanced Usage
Client Configuration
use turul_mcp_client::{McpClientBuilder, ClientConfig, RetryConfig, TimeoutConfig};
// The retry settings are built first so the `ClientConfig` literal stays flat.
let retry_config = RetryConfig {
    max_retries: 3,
    initial_delay: std::time::Duration::from_millis(100),
    max_delay: std::time::Duration::from_secs(5),
    exponential_base: 2.0,
};

let config = ClientConfig {
    connect_timeout: std::time::Duration::from_secs(10),
    request_timeout: std::time::Duration::from_secs(30),
    retry_config,
    enable_streaming: true,
    buffer_size: 8192,
};

// `transport` is any transport value built as in the sections above.
let client = McpClientBuilder::new()
    .with_config(config)
    .with_transport(transport)
    .build();
Session Management
use turul_mcp_client::{McpClient, SessionState};
// React to the current session state: report it, connect, or reconnect.
match client.session_state().await {
    SessionState::Connected { session_id } => println!("Connected with session: {}", session_id),
    SessionState::Disconnected => {
        println!("Not connected");
        client.connect().await?;
    }
    SessionState::Error { error } => {
        println!("Session error: {}", error);
        client.reconnect().await?;
    }
}

// Print the details of the current session.
let info = client.session_info().await?;
println!("Session ID: {}", info.session_id);
println!("Created: {}", info.created_at);
println!("Last activity: {}", info.last_activity);
Error Handling
use turul_mcp_client::{McpClientError, McpClientResult};
async fn robust_tool_call(client: &McpClient) -> McpClientResult<serde_json::Value> {
match client.call_tool("my_tool", serde_json::json!({"param": "value"})).await {
Ok(result) => Ok(result),
Err(McpClientError::Transport(e)) => {
tracing::warn!("Transport error, retrying: {}", e);
client.reconnect().await?;
client.call_tool("my_tool", serde_json::json!({"param": "value"})).await
}
Err(McpClientError::Session(e)) => {
tracing::error!("Session error: {}", e);
client.disconnect().await?;
client.connect().await?;
client.call_tool("my_tool", serde_json::json!({"param": "value"})).await
}
Err(e) => Err(e),
}
}
Core Operations
Tools
// List the tools and print each name with its description (default value when absent).
let tools = client.list_tools().await?;
for tool in tools {
    let description = tool.description.unwrap_or_default();
    println!("Tool: {} - {}", tool.name, description);
}

// Call a tool with JSON arguments and print the returned content.
let arguments = serde_json::json!({
    "operation": "multiply",
    "a": 7,
    "b": 6
});
let result = client.call_tool("calculator", arguments).await?;
println!("Result: {:?}", result.content);

// Call a tool and receive progress updates through a callback.
let on_progress = |progress| {
    println!("Progress: {}%", progress.progress);
    if let Some(message) = &progress.message {
        println!("Status: {}", message);
    }
};
let result = client
    .call_tool_with_progress("long_task", serde_json::json!({"duration": 10}), on_progress)
    .await?;
Resources
// List the resources and print each URI with its description (default value when absent).
let resources = client.list_resources().await?;
for resource in resources {
    let description = resource.description.unwrap_or_default();
    println!("Resource: {} - {}", resource.uri, description);
}

// Read one resource by URI.
let content = client.read_resource("file:///path/to/file.txt").await?;
println!("Resource content: {:?}", content);

// Register a callback that prints the URI of each update notification.
client
    .subscribe_to_resource_updates(|notification| {
        println!("Resource updated: {}", notification.uri);
    })
    .await?;
Prompts
// List the prompts and print each name with its description (default value when absent).
let prompts = client.list_prompts().await?;
for prompt in prompts {
    let description = prompt.description.unwrap_or_default();
    println!("Prompt: {} - {}", prompt.name, description);
}

// Fetch the `greeting` prompt with one argument and print its messages.
let arguments = serde_json::json!({
    "name": "Alice"
});
let prompt = client.get_prompt("greeting", Some(arguments)).await?;

println!("Prompt messages:");
for message in prompt.messages {
    println!("  {}: {}", message.role, message.content);
}
Sampling
// A single user message to send.
let greeting = Message {
    role: Role::User,
    content: MessageContent::Text {
        text: String::from("Hello, can you help me?"),
    },
};
let messages = vec![greeting];

// The two optional arguments are left unset.
let sampling_result = client.create_message(messages, None, None).await?;
println!("Generated message: {:?}", sampling_result);
Streaming and Events
Real-time Notifications
use turul_mcp_client::streaming::EventHandler;
/// Event handler that prints every notification it is given.
struct MyEventHandler;

impl EventHandler for MyEventHandler {
    /// Prints the progress value followed by its message (default value when absent).
    async fn handle_progress(&self, progress: ProgressNotification) {
        let note = progress.message.unwrap_or_default();
        println!("Progress: {}% - {}", progress.progress, note);
    }

    /// Prints the log level and the debug form of the log data.
    async fn handle_log(&self, log: LoggingNotification) {
        let (level, data) = (log.level, log.data);
        println!("Log [{}]: {:?}", level, data);
    }

    /// Prints the URI carried by the resource-updated notification.
    async fn handle_resource_updated(&self, notification: ResourceUpdatedNotification) {
        let uri = notification.uri;
        println!("Resource updated: {}", uri);
    }
}
// Box the handler, register it on the builder, and enable streaming.
let handler = Box::new(MyEventHandler);
let client = McpClientBuilder::new()
    .with_transport(transport)
    .with_event_handler(handler)
    .enable_streaming()
    .build();
Progress Tracking
// Start the tool call and obtain a receiver of progress updates.
// The binding is `mut` because receiving from a channel receiver
// (as with Tokio's mpsc `Receiver::recv`) needs mutable access.
let mut progress_receiver = client.call_tool_with_progress_stream(
    "data_processing",
    serde_json::json!({"file_path": "/large/dataset.csv"})
).await?;

// Consume updates on a separate task until the call completes or fails.
tokio::spawn(async move {
    while let Some(progress) = progress_receiver.recv().await {
        match progress {
            ProgressUpdate::Progress { progress, total, message } => {
                // When no total is reported, 1.0 is used as the divisor.
                let percentage = progress / total.unwrap_or(1.0) * 100.0;
                println!("Processing: {:.1}% - {}",
                    percentage,
                    message.unwrap_or_default());
            }
            ProgressUpdate::Completed { result } => {
                println!("Processing completed: {:?}", result);
                break;
            }
            ProgressUpdate::Failed { error } => {
                println!("Processing failed: {}", error);
                break;
            }
        }
    }
});
Testing Support
Mock Transport
use turul_mcp_client::transport::MockTransport;
/// Exercises `call_tool` against a mock transport loaded with canned
/// initialize and `calculator` responses.
#[tokio::test]
async fn test_tool_calling() {
    let mut mock_transport = MockTransport::new();

    // Canned initialize response.
    let init_reply = InitializeResult {
        protocol_version: "2025-06-18".to_string(),
        capabilities: Default::default(),
        server_info: ServerInfo {
            name: "Test Server".to_string(),
            version: "1.0.0".to_string(),
        },
    };
    mock_transport.expect_initialize_response(init_reply);

    // Canned response for the `calculator` tool.
    let tool_reply = CallToolResult {
        content: vec![ToolResult::text("42")],
        is_error: false,
        _meta: None,
    };
    mock_transport.expect_call_tool_response("calculator", tool_reply);

    let client = McpClientBuilder::new()
        .with_transport(mock_transport)
        .build();
    client.connect().await.unwrap();

    let arguments = serde_json::json!({"operation": "add", "a": 20, "b": 22});
    let result = client.call_tool("calculator", arguments).await.unwrap();

    assert_eq!(result.content[0].text.as_ref().unwrap(), "42");
}
Integration Testing
use turul_mcp_client::testing::{TestServer, TestTransport};
#[tokio::test]
async fn integration_test() {
let test_server = TestServer::new()
.with_tool("echo", |args| async move {
Ok(serde_json::json!({"echo": args}))
})
.start()
.await?;
let transport = HttpTransport::new(&test_server.url())?;
let client = McpClientBuilder::new()
.with_transport(transport)
.build();
client.connect().await?;
let result = client.call_tool("echo",
serde_json::json!({"message": "hello"})).await?;
assert_eq!(result.content[0].text.as_ref().unwrap(),
r#"{"echo": {"message": "hello"}}"#);
}
Performance Optimization
Connection Pooling
use turul_mcp_client::{McpClientPool, PoolConfig};
// Pool limits and timeouts.
let limits = PoolConfig {
    min_connections: 2,
    max_connections: 10,
    connection_timeout: std::time::Duration::from_secs(30),
    idle_timeout: std::time::Duration::from_secs(300),
};

// `transport_factory` and `args` are supplied by the surrounding application.
let pool = McpClientPool::new(transport_factory, limits).await?;

let client = pool.get_client().await?;
let result = client.call_tool("my_tool", args).await?;

// Explicitly drop the client once the call is done
// (presumably this hands it back to the pool; confirm against the pool API).
drop(client);
Batch Operations
// Each entry pairs a tool name with its JSON arguments.
let batch_requests = vec![
    ("tool_1", serde_json::json!({"param": "value1"})),
    ("tool_2", serde_json::json!({"param": "value2"})),
    ("tool_3", serde_json::json!({"param": "value3"})),
];

let results = client.batch_call_tools(batch_requests).await?;

// Print the results numbered from 1.
results
    .into_iter()
    .enumerate()
    .for_each(|(index, outcome)| println!("Tool {} result: {:?}", index + 1, outcome));
Feature Flags
[dependencies]
turul-mcp-client = { version = "0.1.1", features = ["all-transports"] }
Available features:
default = ["http", "sse"] - HTTP and SSE transport
http - HTTP transport support
sse - Server-Sent Events transport
websocket - WebSocket transport support (not enabled by default)
stdio - Standard I/O transport for executable servers (not enabled by default)
all-transports - Enable all transport types
Error Types
The client provides comprehensive error handling:
use turul_mcp_client::McpClientError;
// Write a message to stderr for each error variant.
match error {
    McpClientError::Transport(e) => eprintln!("Transport error: {}", e),
    McpClientError::Protocol(e) => eprintln!("Protocol error: {}", e),
    McpClientError::Session(e) => eprintln!("Session error: {}", e),
    McpClientError::Timeout => eprintln!("Request timed out"),
    McpClientError::ServerError { code, message } => {
        eprintln!("Server error {}: {}", code, message)
    }
}
Examples
Complete Application
use turul_mcp_client::{McpClient, McpClientBuilder};
use turul_mcp_client::transport::HttpTransport;
use tracing::{info, error};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::fmt::init();
let transport = HttpTransport::new("http://localhost:8080/mcp")?;
let client = McpClientBuilder::new()
.with_transport(transport)
.with_client_info("MCP Demo Client", "1.0.0")
.with_retry_attempts(3)
.enable_streaming()
.build();
info!("Connecting to MCP server...");
client.connect().await?;
info!("Connected successfully!");
let tools = client.list_tools().await?;
info!("Server provides {} tools", tools.len());
for tool in &tools {
info!(" - {}: {}", tool.name, tool.description.as_deref().unwrap_or("No description"));
}
if !tools.is_empty() {
let tool_name = &tools[0].name;
info!("Calling tool: {}", tool_name);
match client.call_tool(tool_name, serde_json::json!({})).await {
Ok(result) => {
info!("Tool result: {:?}", result.content);
}
Err(e) => {
error!("Tool call failed: {}", e);
}
}
}
info!("Disconnecting...");
client.disconnect().await?;
Ok(())
}
License
Licensed under the MIT License. See LICENSE for details.