pub struct McpService<T: TransportHandle> { /* private fields */ }
Expand description
A wrapper service that implements Tower’s `Service` trait for an MCP transport.
Implementations§
Source§impl<T: TransportHandle> McpService<T>
impl<T: TransportHandle> McpService<T>
Source§impl<T> McpService<T>where
T: TransportHandle,
impl<T> McpService<T>where
T: TransportHandle,
Sourcepub fn with_timeout(transport: T, timeout: Duration) -> Timeout<McpService<T>>
pub fn with_timeout(transport: T, timeout: Duration) -> Timeout<McpService<T>>
Examples found in repository?
examples/stdio.rs (line 29)
12async fn main() -> Result<(), ClientError> {
13 // Initialize logging
14 tracing_subscriber::fmt()
15 .with_env_filter(
16 EnvFilter::from_default_env()
17 .add_directive("mcp_client=debug".parse().unwrap())
18 .add_directive("eventsource_client=debug".parse().unwrap()),
19 )
20 .init();
21
22 // 1) Create the transport
23 let transport = StdioTransport::new("uvx", vec!["mcp-server-git".to_string()], HashMap::new());
24
25 // 2) Start the transport to get a handle
26 let transport_handle = transport.start().await?;
27
28 // 3) Create the service with timeout middleware
29 let service = McpService::with_timeout(transport_handle, Duration::from_secs(10));
30
31 // 4) Create the client with the middleware-wrapped service
32 let mut client = McpClient::new(service);
33
34 // Initialize
35 let server_info = client
36 .initialize(
37 ClientInfo {
38 name: "test-client".into(),
39 version: "1.0.0".into(),
40 },
41 ClientCapabilities::default(),
42 )
43 .await?;
44 println!("Connected to server: {server_info:?}\n");
45
46 // List tools
47 let tools = client.list_tools(None).await?;
48 println!("Available tools: {tools:?}\n");
49
50 // Call tool 'git_status' with arguments = {"repo_path": "."}
51 let tool_result = client
52 .call_tool("git_status", serde_json::json!({ "repo_path": "." }))
53 .await?;
54 println!("Tool result: {tool_result:?}\n");
55
56 // List resources
57 let resources = client.list_resources(None).await?;
58 println!("Available resources: {resources:?}\n");
59
60 Ok(())
61}
More examples
examples/sse.rs (line 27)
10async fn main() -> Result<()> {
11 // Initialize logging
12 tracing_subscriber::fmt()
13 .with_env_filter(
14 EnvFilter::from_default_env()
15 .add_directive("mcp_client=debug".parse().unwrap())
16 .add_directive("eventsource_client=info".parse().unwrap()),
17 )
18 .init();
19
20 // Create the base transport
21 let transport = SseTransport::new("http://localhost:8000/sse", HashMap::new());
22
23 // Start transport
24 let handle = transport.start().await?;
25
26 // Create the service with timeout middleware
27 let service = McpService::with_timeout(handle, Duration::from_secs(3));
28
29 // Create client
30 let mut client = McpClient::new(service);
31 println!("Client created\n");
32
33 // Initialize
34 let server_info = client
35 .initialize(
36 ClientInfo {
37 name: "test-client".into(),
38 version: "1.0.0".into(),
39 },
40 ClientCapabilities::default(),
41 )
42 .await?;
43 println!("Connected to server: {server_info:?}\n");
44
45 // Sleep for 500ms to allow the server to start - surprisingly this is required!
46 tokio::time::sleep(Duration::from_millis(500)).await;
47
48 // List tools
49 let tools = client.list_tools(None).await?;
50 println!("Available tools: {tools:?}\n");
51
52 // Call tool
53 let tool_result = client
54 .call_tool(
55 "echo_tool",
56 serde_json::json!({ "message": "Client with SSE transport - calling a tool" }),
57 )
58 .await?;
59 println!("Tool result: {tool_result:?}\n");
60
61 // List resources
62 let resources = client.list_resources(None).await?;
63 println!("Resources: {resources:?}\n");
64
65 // Read resource
66 let resource = client.read_resource("echo://fixedresource").await?;
67 println!("Resource: {resource:?}\n");
68
69 Ok(())
70}
examples/stdio_integration.rs (line 38)
14async fn main() -> Result<(), ClientError> {
15 // Initialize logging
16 tracing_subscriber::fmt()
17 .with_env_filter(
18 EnvFilter::from_default_env()
19 .add_directive("mcp_client=debug".parse().unwrap())
20 .add_directive("eventsource_client=debug".parse().unwrap()),
21 )
22 .init();
23
24 // Create the transport
25 let transport = StdioTransport::new(
26 "cargo",
27 vec!["run", "-p", "mcp-server"]
28 .into_iter()
29 .map(|s| s.to_string())
30 .collect(),
31 HashMap::new(),
32 );
33
34 // Start the transport to get a handle
35 let transport_handle = transport.start().await.unwrap();
36
37 // Create the service with timeout middleware
38 let service = McpService::with_timeout(transport_handle, Duration::from_secs(10));
39
40 // Create client
41 let mut client = McpClient::new(service);
42
43 // Initialize
44 let server_info = client
45 .initialize(
46 ClientInfo {
47 name: "test-client".into(),
48 version: "1.0.0".into(),
49 },
50 ClientCapabilities::default(),
51 )
52 .await?;
53 println!("Connected to server: {server_info:?}\n");
54
55 // List tools
56 let tools = client.list_tools(None).await?;
57 println!("Available tools: {tools:?}\n");
58
59 // Call tool 'increment' tool 3 times
60 for _ in 0..3 {
61 let increment_result = client.call_tool("increment", serde_json::json!({})).await?;
62 println!("Tool result for 'increment': {increment_result:?}\n");
63 }
64
65 // Call tool 'get_value'
66 let get_value_result = client.call_tool("get_value", serde_json::json!({})).await?;
67 println!("Tool result for 'get_value': {get_value_result:?}\n");
68
69 // Call tool 'decrement' once
70 let decrement_result = client.call_tool("decrement", serde_json::json!({})).await?;
71 println!("Tool result for 'decrement': {decrement_result:?}\n");
72
73 // Call tool 'get_value'
74 let get_value_result = client.call_tool("get_value", serde_json::json!({})).await?;
75 println!("Tool result for 'get_value': {get_value_result:?}\n");
76
77 // List resources
78 let resources = client.list_resources(None).await?;
79 println!("Resources: {resources:?}\n");
80
81 // Read resource
82 let resource = client.read_resource("memo://insights").await?;
83 println!("Resource: {resource:?}\n");
84
85 let prompts = client.list_prompts(None).await?;
86 println!("Prompts: {prompts:?}\n");
87
88 let prompt = client
89 .get_prompt(
90 "example_prompt",
91 serde_json::json!({"message": "hello there!"}),
92 )
93 .await?;
94 println!("Prompt: {prompt:?}\n");
95
96 Ok(())
97}
examples/clients.rs (line 23)
13async fn main() -> Result<(), Box<dyn std::error::Error>> {
14 // Initialize logging
15 tracing_subscriber::fmt()
16 .with_env_filter(
17 EnvFilter::from_default_env().add_directive("mcp_client=debug".parse().unwrap()),
18 )
19 .init();
20
21 let transport1 = StdioTransport::new("uvx", vec!["mcp-server-git".to_string()], HashMap::new());
22 let handle1 = transport1.start().await?;
23 let service1 = McpService::with_timeout(handle1, Duration::from_secs(30));
24 let client1 = McpClient::new(service1);
25
26 let transport2 = StdioTransport::new("uvx", vec!["mcp-server-git".to_string()], HashMap::new());
27 let handle2 = transport2.start().await?;
28 let service2 = McpService::with_timeout(handle2, Duration::from_secs(30));
29 let client2 = McpClient::new(service2);
30
31 let transport3 = SseTransport::new("http://localhost:8000/sse", HashMap::new());
32 let handle3 = transport3.start().await?;
33 let service3 = McpService::with_timeout(handle3, Duration::from_secs(10));
34 let client3 = McpClient::new(service3);
35
36 // Collect all three clients as boxed trait objects
37 let mut clients: Vec<Box<dyn McpClientTrait>> =
38 vec![Box::new(client1), Box::new(client2), Box::new(client3)];
39
40 // Initialize all clients
41 for (i, client) in clients.iter_mut().enumerate() {
42 let info = ClientInfo {
43 name: format!("example-client-{}", i + 1),
44 version: "1.0.0".to_string(),
45 };
46 let capabilities = ClientCapabilities::default();
47
48 println!("\nInitializing client {}", i + 1);
49 let init_result = client.initialize(info, capabilities).await?;
50 println!("Client {} initialized: {:?}", i + 1, init_result);
51 }
52
53 // List tools for all clients
54 for (i, client) in clients.iter_mut().enumerate() {
55 let tools = client.list_tools(None).await?;
56 println!("\nClient {} tools: {:?}", i + 1, tools);
57 }
58
59 println!("\n\n----------------------------------\n\n");
60
61 // Wrap clients in Arc before spawning tasks
62 let clients = Arc::new(clients);
63 let mut handles = vec![];
64
65 for i in 0..20 {
66 let clients = Arc::clone(&clients);
67 let handle = tokio::spawn(async move {
68 // let mut rng = rand::thread_rng();
69 let mut rng = rand::rngs::StdRng::from_entropy();
70 tokio::time::sleep(Duration::from_millis(rng.gen_range(5..50))).await;
71
72 // Randomly select an operation
73 match rng.gen_range(0..4) {
74 0 => {
75 println!("\n{i}: Listing tools for client 1 (stdio)");
76 match clients[0].list_tools(None).await {
77 Ok(tools) => {
78 println!(" {i}: -> Got tools, first one: {:?}", tools.tools.first())
79 }
80 Err(e) => println!(" {i}: -> Error: {}", e),
81 }
82 }
83 1 => {
84 println!("\n{i}: Calling tool for client 2 (stdio)");
85 match clients[1]
86 .call_tool("git_status", serde_json::json!({ "repo_path": "." }))
87 .await
88 {
89 Ok(result) => println!(
90 " {i}: -> Tool execution result, is_error: {:?}",
91 result.is_error
92 ),
93 Err(e) => println!(" {i}: -> Error: {}", e),
94 }
95 }
96 2 => {
97 println!("\n{i}: Listing tools for client 3 (sse)");
98 match clients[2].list_tools(None).await {
99 Ok(tools) => {
100 println!(" {i}: -> Got tools, first one: {:?}", tools.tools.first())
101 }
102 Err(e) => println!(" {i}: -> Error: {}", e),
103 }
104 }
105 3 => {
106 println!("\n{i}: Calling tool for client 3 (sse)");
107 match clients[2]
108 .call_tool(
109 "echo_tool",
110 serde_json::json!({ "message": "Client with SSE transport - calling a tool" }),
111 )
112 .await
113 {
114 Ok(result) => println!(" {i}: -> Tool execution result, is_error: {:?}", result.is_error),
115 Err(e) => println!(" {i}: -> Error: {}", e),
116 }
117 }
118 _ => unreachable!(),
119 }
120 Ok::<(), Box<dyn std::error::Error + Send + Sync>>(())
121 });
122 handles.push(handle);
123 }
124
125 // Wait for all tasks to complete
126 for handle in handles {
127 handle.await.unwrap().unwrap();
128 }
129
130 Ok(())
131}
Trait Implementations§
Source§impl<T: Clone + TransportHandle> Clone for McpService<T>
impl<T: Clone + TransportHandle> Clone for McpService<T>
Source§fn clone(&self) -> McpService<T>
fn clone(&self) -> McpService<T>
Returns a duplicate of the value. Read more
1.0.0 · Source§fn clone_from(&mut self, source: &Self)
fn clone_from(&mut self, source: &Self)
Performs copy-assignment from `source`. Read more
Source§impl<T> Service<JsonRpcMessage> for McpService<T>
impl<T> Service<JsonRpcMessage> for McpService<T>
Source§type Response = JsonRpcMessage
type Response = JsonRpcMessage
Responses given by the service.
Source§type Future = Pin<Box<dyn Future<Output = Result<<McpService<T> as Service<JsonRpcMessage>>::Response, <McpService<T> as Service<JsonRpcMessage>>::Error>> + Send>>
type Future = Pin<Box<dyn Future<Output = Result<<McpService<T> as Service<JsonRpcMessage>>::Response, <McpService<T> as Service<JsonRpcMessage>>::Error>> + Send>>
The future response value.
Auto Trait Implementations§
impl<T> Freeze for McpService<T>
impl<T> RefUnwindSafe for McpService<T>where
T: RefUnwindSafe,
impl<T> Send for McpService<T>
impl<T> Sync for McpService<T>
impl<T> Unpin for McpService<T>
impl<T> UnwindSafe for McpService<T>where
T: RefUnwindSafe,
Blanket Implementations§
Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more
Source§impl<T> CloneToUninit for Twhere
T: Clone,
impl<T> CloneToUninit for Twhere
T: Clone,
Source§impl<T> Instrument for T
impl<T> Instrument for T
Source§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
Source§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
Source§impl<T, Request> ServiceExt<Request> for T
impl<T, Request> ServiceExt<Request> for T
Source§fn ready(&mut self) -> Ready<'_, Self, Request>where
Self: Sized,
fn ready(&mut self) -> Ready<'_, Self, Request>where
Self: Sized,
Yields a mutable reference to the service when it is ready to accept a request.
Source§fn ready_and(&mut self) -> Ready<'_, Self, Request>where
Self: Sized,
fn ready_and(&mut self) -> Ready<'_, Self, Request>where
Self: Sized,
👎Deprecated since 0.4.6: please use the `ServiceExt::ready` method instead
Yields a mutable reference to the service when it is ready to accept a request.
Source§fn ready_oneshot(self) -> ReadyOneshot<Self, Request>where
Self: Sized,
fn ready_oneshot(self) -> ReadyOneshot<Self, Request>where
Self: Sized,
Yields the service when it is ready to accept a request.
Source§fn oneshot(self, req: Request) -> Oneshot<Self, Request>where
Self: Sized,
fn oneshot(self, req: Request) -> Oneshot<Self, Request>where
Self: Sized,
Consume this `Service`, calling it with the provided request once it is ready.
Source§fn and_then<F>(self, f: F) -> AndThen<Self, F>
fn and_then<F>(self, f: F) -> AndThen<Self, F>
Executes a new future after this service’s future resolves. This does not alter the behaviour of the `poll_ready` method. Read more
Source§fn map_response<F, Response>(self, f: F) -> MapResponse<Self, F>
fn map_response<F, Response>(self, f: F) -> MapResponse<Self, F>
Maps this service’s response value to a different value. This does not alter the behaviour of the `poll_ready` method. Read more
Source§fn map_err<F, Error>(self, f: F) -> MapErr<Self, F>
fn map_err<F, Error>(self, f: F) -> MapErr<Self, F>
Maps this service’s error value to a different value. This does not alter the behaviour of the `poll_ready` method. Read more
Source§fn map_result<F, Response, Error>(self, f: F) -> MapResult<Self, F>
fn map_result<F, Response, Error>(self, f: F) -> MapResult<Self, F>
Maps this service’s result type (`Result<Self::Response, Self::Error>`) to a different value, regardless of whether the future succeeds or fails. Read more
Source§fn map_request<F, NewRequest>(self, f: F) -> MapRequest<Self, F>
fn map_request<F, NewRequest>(self, f: F) -> MapRequest<Self, F>
Composes a function in front of the service. Read more
Source§fn then<F, Response, Error, Fut>(self, f: F) -> Then<Self, F>
fn then<F, Response, Error, Fut>(self, f: F) -> Then<Self, F>
Composes an asynchronous function after this service. Read more
Source§fn map_future<F, Fut, Response, Error>(self, f: F) -> MapFuture<Self, F>
fn map_future<F, Fut, Response, Error>(self, f: F) -> MapFuture<Self, F>
Composes a function that transforms futures produced by the service. Read more