Skip to main content

brainwires_agent_network/client/
client.rs

1use anyhow::Result;
2use std::sync::atomic::{AtomicU64, Ordering};
3use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter};
4use tokio::process::{Child, ChildStdin, ChildStdout, Command};
5
6use super::error::AgentNetworkClientError;
7use super::protocol;
8
9/// MCP relay client that communicates with a subprocess over stdio.
10pub struct AgentNetworkClient {
11    /// Child process handle.
12    child: Child,
13    /// Buffered writer to the child's stdin.
14    stdin: BufWriter<ChildStdin>,
15    /// Buffered reader from the child's stdout.
16    stdout: BufReader<ChildStdout>,
17    /// Monotonically increasing request ID counter.
18    request_id: AtomicU64,
19    /// Whether the initialize handshake has completed.
20    initialized: bool,
21}
22
23impl AgentNetworkClient {
24    /// Connect to a relay process using default MCP server arguments.
25    pub async fn connect(binary_path: &str) -> Result<Self, AgentNetworkClientError> {
26        Self::connect_with_args(binary_path, &["chat", "--mcp-server"]).await
27    }
28
29    /// Connect to a relay process with custom arguments.
30    pub async fn connect_with_args(
31        binary_path: &str,
32        args: &[&str],
33    ) -> Result<Self, AgentNetworkClientError> {
34        let mut child = Command::new(binary_path)
35            .args(args)
36            .stdin(std::process::Stdio::piped())
37            .stdout(std::process::Stdio::piped())
38            .stderr(std::process::Stdio::null())
39            .spawn()
40            .map_err(AgentNetworkClientError::SpawnFailed)?;
41
42        let stdin = child.stdin.take().ok_or_else(|| {
43            AgentNetworkClientError::Protocol("Failed to capture stdin".to_string())
44        })?;
45        let stdout = child.stdout.take().ok_or_else(|| {
46            AgentNetworkClientError::Protocol("Failed to capture stdout".to_string())
47        })?;
48
49        Ok(Self {
50            child,
51            stdin: BufWriter::new(stdin),
52            stdout: BufReader::new(stdout),
53            request_id: AtomicU64::new(1),
54            initialized: false,
55        })
56    }
57
58    fn next_id(&self) -> u64 {
59        self.request_id.fetch_add(1, Ordering::SeqCst)
60    }
61
62    /// Send a JSON-RPC request and read the response.
63    pub async fn send_request(
64        &mut self,
65        method: &str,
66        params: Option<serde_json::Value>,
67    ) -> Result<serde_json::Value, AgentNetworkClientError> {
68        let id = self.next_id();
69        let request = brainwires_mcp::JsonRpcRequest {
70            jsonrpc: "2.0".to_string(),
71            id: serde_json::json!(id),
72            method: method.to_string(),
73            params,
74        };
75
76        let json = serde_json::to_string(&request)?;
77        self.stdin
78            .write_all(format!("{json}\n").as_bytes())
79            .await
80            .map_err(AgentNetworkClientError::Io)?;
81        self.stdin
82            .flush()
83            .await
84            .map_err(AgentNetworkClientError::Io)?;
85
86        // Read response
87        let mut line = String::new();
88        let bytes = self
89            .stdout
90            .read_line(&mut line)
91            .await
92            .map_err(AgentNetworkClientError::Io)?;
93
94        if bytes == 0 {
95            return Err(AgentNetworkClientError::ProcessExited);
96        }
97
98        let response = protocol::parse_response(line.trim())?;
99        protocol::extract_result(response)
100    }
101
102    /// Perform the MCP initialize handshake with the relay process.
103    pub async fn initialize(&mut self) -> Result<serde_json::Value, AgentNetworkClientError> {
104        let id = self.next_id();
105        let request = protocol::build_initialize_request(id);
106
107        let json = serde_json::to_string(&request)?;
108        self.stdin
109            .write_all(format!("{json}\n").as_bytes())
110            .await
111            .map_err(AgentNetworkClientError::Io)?;
112        self.stdin
113            .flush()
114            .await
115            .map_err(AgentNetworkClientError::Io)?;
116
117        // Read initialize response
118        let mut line = String::new();
119        let bytes = self
120            .stdout
121            .read_line(&mut line)
122            .await
123            .map_err(AgentNetworkClientError::Io)?;
124
125        if bytes == 0 {
126            return Err(AgentNetworkClientError::ProcessExited);
127        }
128
129        let response = protocol::parse_response(line.trim())?;
130        let result = protocol::extract_result(response)?;
131
132        // Send initialized notification
133        let notif = protocol::build_initialized_notification();
134        self.stdin
135            .write_all(format!("{notif}\n").as_bytes())
136            .await
137            .map_err(AgentNetworkClientError::Io)?;
138        self.stdin
139            .flush()
140            .await
141            .map_err(AgentNetworkClientError::Io)?;
142
143        self.initialized = true;
144        Ok(result)
145    }
146
147    /// Call a tool on the relay server by name with the given arguments.
148    pub async fn call_tool(
149        &mut self,
150        name: &str,
151        args: serde_json::Value,
152    ) -> Result<serde_json::Value, AgentNetworkClientError> {
153        if !self.initialized {
154            return Err(AgentNetworkClientError::NotInitialized);
155        }
156
157        self.send_request(
158            "tools/call",
159            Some(serde_json::json!({
160                "name": name,
161                "arguments": args
162            })),
163        )
164        .await
165    }
166
167    /// List all tools available on the relay server.
168    pub async fn list_tools(&mut self) -> Result<serde_json::Value, AgentNetworkClientError> {
169        if !self.initialized {
170            return Err(AgentNetworkClientError::NotInitialized);
171        }
172        self.send_request("tools/list", None).await
173    }
174
175    /// Shut down the relay client and terminate the child process.
176    pub async fn shutdown(mut self) -> Result<(), AgentNetworkClientError> {
177        // Close stdin to signal EOF to the child process
178        drop(self.stdin);
179        // Wait for child to exit (with timeout)
180        let _ = tokio::time::timeout(std::time::Duration::from_secs(5), self.child.wait()).await;
181        Ok(())
182    }
183
184    /// Check whether the client has completed initialization.
185    pub fn is_initialized(&self) -> bool {
186        self.initialized
187    }
188}