Struct McpClient

Source
pub struct McpClient<S>
where S: Service<JsonRpcMessage, Response = JsonRpcMessage> + Clone + Send + Sync + 'static, S::Error: Into<Error>, S::Future: Send,
{ /* private fields */ }
Expand description

The MCP client is the interface for MCP operations.

Implementations§

Source§

impl<S> McpClient<S>
where S: Service<JsonRpcMessage, Response = JsonRpcMessage> + Clone + Send + Sync + 'static, S::Error: Into<Error>, S::Future: Send,

Source

pub fn new(service: S) -> Self

Examples found in repository?
examples/stdio.rs (line 32)
12async fn main() -> Result<(), ClientError> {
13    // Initialize logging
14    tracing_subscriber::fmt()
15        .with_env_filter(
16            EnvFilter::from_default_env()
17                .add_directive("mcp_client=debug".parse().unwrap())
18                .add_directive("eventsource_client=debug".parse().unwrap()),
19        )
20        .init();
21
22    // 1) Create the transport
23    let transport = StdioTransport::new("uvx", vec!["mcp-server-git".to_string()], HashMap::new());
24
25    // 2) Start the transport to get a handle
26    let transport_handle = transport.start().await?;
27
28    // 3) Create the service with timeout middleware
29    let service = McpService::with_timeout(transport_handle, Duration::from_secs(10));
30
31    // 4) Create the client with the middleware-wrapped service
32    let mut client = McpClient::new(service);
33
34    // Initialize
35    let server_info = client
36        .initialize(
37            ClientInfo {
38                name: "test-client".into(),
39                version: "1.0.0".into(),
40            },
41            ClientCapabilities::default(),
42        )
43        .await?;
44    println!("Connected to server: {server_info:?}\n");
45
46    // List tools
47    let tools = client.list_tools(None).await?;
48    println!("Available tools: {tools:?}\n");
49
50    // Call tool 'git_status' with arguments = {"repo_path": "."}
51    let tool_result = client
52        .call_tool("git_status", serde_json::json!({ "repo_path": "." }))
53        .await?;
54    println!("Tool result: {tool_result:?}\n");
55
56    // List resources
57    let resources = client.list_resources(None).await?;
58    println!("Available resources: {resources:?}\n");
59
60    Ok(())
61}
More examples
Hide additional examples
examples/sse.rs (line 30)
10async fn main() -> Result<()> {
11    // Initialize logging
12    tracing_subscriber::fmt()
13        .with_env_filter(
14            EnvFilter::from_default_env()
15                .add_directive("mcp_client=debug".parse().unwrap())
16                .add_directive("eventsource_client=info".parse().unwrap()),
17        )
18        .init();
19
20    // Create the base transport
21    let transport = SseTransport::new("http://localhost:8000/sse", HashMap::new());
22
23    // Start transport
24    let handle = transport.start().await?;
25
26    // Create the service with timeout middleware
27    let service = McpService::with_timeout(handle, Duration::from_secs(3));
28
29    // Create client
30    let mut client = McpClient::new(service);
31    println!("Client created\n");
32
33    // Initialize
34    let server_info = client
35        .initialize(
36            ClientInfo {
37                name: "test-client".into(),
38                version: "1.0.0".into(),
39            },
40            ClientCapabilities::default(),
41        )
42        .await?;
43    println!("Connected to server: {server_info:?}\n");
44
45    // Sleep for 500ms to allow the server to start - surprisingly this is required!
46    tokio::time::sleep(Duration::from_millis(500)).await;
47
48    // List tools
49    let tools = client.list_tools(None).await?;
50    println!("Available tools: {tools:?}\n");
51
52    // Call tool
53    let tool_result = client
54        .call_tool(
55            "echo_tool",
56            serde_json::json!({ "message": "Client with SSE transport - calling a tool" }),
57        )
58        .await?;
59    println!("Tool result: {tool_result:?}\n");
60
61    // List resources
62    let resources = client.list_resources(None).await?;
63    println!("Resources: {resources:?}\n");
64
65    // Read resource
66    let resource = client.read_resource("echo://fixedresource").await?;
67    println!("Resource: {resource:?}\n");
68
69    Ok(())
70}
examples/stdio_integration.rs (line 41)
14async fn main() -> Result<(), ClientError> {
15    // Initialize logging
16    tracing_subscriber::fmt()
17        .with_env_filter(
18            EnvFilter::from_default_env()
19                .add_directive("mcp_client=debug".parse().unwrap())
20                .add_directive("eventsource_client=debug".parse().unwrap()),
21        )
22        .init();
23
24    // Create the transport
25    let transport = StdioTransport::new(
26        "cargo",
27        vec!["run", "-p", "mcp-server"]
28            .into_iter()
29            .map(|s| s.to_string())
30            .collect(),
31        HashMap::new(),
32    );
33
34    // Start the transport to get a handle
35    let transport_handle = transport.start().await.unwrap();
36
37    // Create the service with timeout middleware
38    let service = McpService::with_timeout(transport_handle, Duration::from_secs(10));
39
40    // Create client
41    let mut client = McpClient::new(service);
42
43    // Initialize
44    let server_info = client
45        .initialize(
46            ClientInfo {
47                name: "test-client".into(),
48                version: "1.0.0".into(),
49            },
50            ClientCapabilities::default(),
51        )
52        .await?;
53    println!("Connected to server: {server_info:?}\n");
54
55    // List tools
56    let tools = client.list_tools(None).await?;
57    println!("Available tools: {tools:?}\n");
58
59    // Call the 'increment' tool 3 times
60    for _ in 0..3 {
61        let increment_result = client.call_tool("increment", serde_json::json!({})).await?;
62        println!("Tool result for 'increment': {increment_result:?}\n");
63    }
64
65    // Call tool 'get_value'
66    let get_value_result = client.call_tool("get_value", serde_json::json!({})).await?;
67    println!("Tool result for 'get_value': {get_value_result:?}\n");
68
69    // Call tool 'decrement' once
70    let decrement_result = client.call_tool("decrement", serde_json::json!({})).await?;
71    println!("Tool result for 'decrement': {decrement_result:?}\n");
72
73    // Call tool 'get_value'
74    let get_value_result = client.call_tool("get_value", serde_json::json!({})).await?;
75    println!("Tool result for 'get_value': {get_value_result:?}\n");
76
77    // List resources
78    let resources = client.list_resources(None).await?;
79    println!("Resources: {resources:?}\n");
80
81    // Read resource
82    let resource = client.read_resource("memo://insights").await?;
83    println!("Resource: {resource:?}\n");
84
85    let prompts = client.list_prompts(None).await?;
86    println!("Prompts: {prompts:?}\n");
87
88    let prompt = client
89        .get_prompt(
90            "example_prompt",
91            serde_json::json!({"message": "hello there!"}),
92        )
93        .await?;
94    println!("Prompt: {prompt:?}\n");
95
96    Ok(())
97}
examples/clients.rs (line 24)
13async fn main() -> Result<(), Box<dyn std::error::Error>> {
14    // Initialize logging
15    tracing_subscriber::fmt()
16        .with_env_filter(
17            EnvFilter::from_default_env().add_directive("mcp_client=debug".parse().unwrap()),
18        )
19        .init();
20
21    let transport1 = StdioTransport::new("uvx", vec!["mcp-server-git".to_string()], HashMap::new());
22    let handle1 = transport1.start().await?;
23    let service1 = McpService::with_timeout(handle1, Duration::from_secs(30));
24    let client1 = McpClient::new(service1);
25
26    let transport2 = StdioTransport::new("uvx", vec!["mcp-server-git".to_string()], HashMap::new());
27    let handle2 = transport2.start().await?;
28    let service2 = McpService::with_timeout(handle2, Duration::from_secs(30));
29    let client2 = McpClient::new(service2);
30
31    let transport3 = SseTransport::new("http://localhost:8000/sse", HashMap::new());
32    let handle3 = transport3.start().await?;
33    let service3 = McpService::with_timeout(handle3, Duration::from_secs(10));
34    let client3 = McpClient::new(service3);
35
36    // Collect all three clients as boxed trait objects
37    let mut clients: Vec<Box<dyn McpClientTrait>> =
38        vec![Box::new(client1), Box::new(client2), Box::new(client3)];
39
40    // Initialize all clients
41    for (i, client) in clients.iter_mut().enumerate() {
42        let info = ClientInfo {
43            name: format!("example-client-{}", i + 1),
44            version: "1.0.0".to_string(),
45        };
46        let capabilities = ClientCapabilities::default();
47
48        println!("\nInitializing client {}", i + 1);
49        let init_result = client.initialize(info, capabilities).await?;
50        println!("Client {} initialized: {:?}", i + 1, init_result);
51    }
52
53    // List tools for all clients
54    for (i, client) in clients.iter_mut().enumerate() {
55        let tools = client.list_tools(None).await?;
56        println!("\nClient {} tools: {:?}", i + 1, tools);
57    }
58
59    println!("\n\n----------------------------------\n\n");
60
61    // Wrap clients in Arc before spawning tasks
62    let clients = Arc::new(clients);
63    let mut handles = vec![];
64
65    for i in 0..20 {
66        let clients = Arc::clone(&clients);
67        let handle = tokio::spawn(async move {
68            // let mut rng = rand::thread_rng();
69            let mut rng = rand::rngs::StdRng::from_entropy();
70            tokio::time::sleep(Duration::from_millis(rng.gen_range(5..50))).await;
71
72            // Randomly select an operation
73            match rng.gen_range(0..4) {
74                0 => {
75                    println!("\n{i}: Listing tools for client 1 (stdio)");
76                    match clients[0].list_tools(None).await {
77                        Ok(tools) => {
78                            println!("  {i}: -> Got tools, first one: {:?}", tools.tools.first())
79                        }
80                        Err(e) => println!("  {i}: -> Error: {}", e),
81                    }
82                }
83                1 => {
84                    println!("\n{i}: Calling tool for client 2 (stdio)");
85                    match clients[1]
86                        .call_tool("git_status", serde_json::json!({ "repo_path": "." }))
87                        .await
88                    {
89                        Ok(result) => println!(
90                            "  {i}: -> Tool execution result, is_error: {:?}",
91                            result.is_error
92                        ),
93                        Err(e) => println!("  {i}: -> Error: {}", e),
94                    }
95                }
96                2 => {
97                    println!("\n{i}: Listing tools for client 3 (sse)");
98                    match clients[2].list_tools(None).await {
99                        Ok(tools) => {
100                            println!("  {i}: -> Got tools, first one: {:?}", tools.tools.first())
101                        }
102                        Err(e) => println!("  {i}: -> Error: {}", e),
103                    }
104                }
105                3 => {
106                    println!("\n{i}: Calling tool for client 3 (sse)");
107                    match clients[2]
108                            .call_tool(
109                                "echo_tool",
110                                serde_json::json!({ "message": "Client with SSE transport - calling a tool" }),
111                            )
112                            .await
113                        {
114                        Ok(result) => println!("  {i}: -> Tool execution result, is_error: {:?}", result.is_error),
115                        Err(e) => println!("  {i}: -> Error: {}", e),
116                    }
117                }
118                _ => unreachable!(),
119            }
120            Ok::<(), Box<dyn std::error::Error + Send + Sync>>(())
121        });
122        handles.push(handle);
123    }
124
125    // Wait for all tasks to complete
126    for handle in handles {
127        handle.await.unwrap().unwrap();
128    }
129
130    Ok(())
131}

Trait Implementations§

Source§

impl<S> McpClientTrait for McpClient<S>
where S: Service<JsonRpcMessage, Response = JsonRpcMessage> + Clone + Send + Sync + 'static, S::Error: Into<Error>, S::Future: Send,

Source§

fn initialize<'life0, 'async_trait>( &'life0 mut self, info: ClientInfo, capabilities: ClientCapabilities, ) -> Pin<Box<dyn Future<Output = Result<InitializeResult, Error>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Source§

fn list_resources<'life0, 'async_trait>( &'life0 self, next_cursor: Option<String>, ) -> Pin<Box<dyn Future<Output = Result<ListResourcesResult, Error>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Source§

fn read_resource<'life0, 'life1, 'async_trait>( &'life0 self, uri: &'life1 str, ) -> Pin<Box<dyn Future<Output = Result<ReadResourceResult, Error>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Source§

fn list_tools<'life0, 'async_trait>( &'life0 self, next_cursor: Option<String>, ) -> Pin<Box<dyn Future<Output = Result<ListToolsResult, Error>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Source§

fn call_tool<'life0, 'life1, 'async_trait>( &'life0 self, name: &'life1 str, arguments: Value, ) -> Pin<Box<dyn Future<Output = Result<CallToolResult, Error>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Source§

fn list_prompts<'life0, 'async_trait>( &'life0 self, next_cursor: Option<String>, ) -> Pin<Box<dyn Future<Output = Result<ListPromptsResult, Error>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Source§

fn get_prompt<'life0, 'life1, 'async_trait>( &'life0 self, name: &'life1 str, arguments: Value, ) -> Pin<Box<dyn Future<Output = Result<GetPromptResult, Error>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Auto Trait Implementations§

§

impl<S> !Freeze for McpClient<S>

§

impl<S> !RefUnwindSafe for McpClient<S>

§

impl<S> Send for McpClient<S>

§

impl<S> Sync for McpClient<S>

§

impl<S> Unpin for McpClient<S>
where <S as Service<JsonRpcMessage>>::Error: Sized, S: Unpin,

§

impl<S> !UnwindSafe for McpClient<S>

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

Source§

fn vzip(self) -> V

Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

impl<T> ErasedDestructor for T
where T: 'static,