lean_ctx/
daemon_client.rs1use anyhow::{Context, Result};
2use tokio::io::{AsyncReadExt, AsyncWriteExt};
3
4use crate::daemon;
5use crate::ipc;
6
7pub async fn daemon_request(method: &str, path: &str, body: &str) -> Result<String> {
10 use std::time::Duration;
11 use tokio::time::timeout;
12
13 const CONNECT_TIMEOUT: Duration = Duration::from_secs(3);
14 const IO_TIMEOUT: Duration = Duration::from_secs(10);
15
16 let addr = daemon::daemon_addr();
17 if !addr.is_listening() {
18 anyhow::bail!(
19 "Daemon endpoint not found at {}. Is the daemon running?",
20 addr.display()
21 );
22 }
23
24 let request = format_http_request(method, path, body);
25
26 #[cfg(unix)]
27 {
28 let mut stream = timeout(CONNECT_TIMEOUT, ipc::connect(&addr))
29 .await
30 .with_context(|| {
31 format!(
32 "connect to daemon timed out ({}s)",
33 CONNECT_TIMEOUT.as_secs()
34 )
35 })?
36 .with_context(|| format!("cannot connect to daemon at {}", addr.display()))?;
37
38 timeout(IO_TIMEOUT, stream.write_all(request.as_bytes()))
39 .await
40 .context("write to daemon timed out")?
41 .context("failed to write request to daemon")?;
42
43 let mut response_buf = Vec::with_capacity(4096);
44 timeout(IO_TIMEOUT, stream.read_to_end(&mut response_buf))
45 .await
46 .context("read from daemon timed out")?
47 .context("failed to read response from daemon")?;
48
49 parse_http_response(&response_buf)
50 }
51
52 #[cfg(windows)]
53 {
54 let mut stream = timeout(CONNECT_TIMEOUT, ipc::connect(&addr))
55 .await
56 .with_context(|| {
57 format!(
58 "connect to daemon timed out ({}s)",
59 CONNECT_TIMEOUT.as_secs()
60 )
61 })?
62 .with_context(|| format!("cannot connect to daemon at {}", addr.display()))?;
63
64 timeout(IO_TIMEOUT, stream.write_all(request.as_bytes()))
65 .await
66 .context("write to daemon timed out")?
67 .context("failed to write request to daemon")?;
68
69 let mut response_buf = Vec::with_capacity(4096);
70 timeout(IO_TIMEOUT, stream.read_to_end(&mut response_buf))
71 .await
72 .context("read from daemon timed out")?
73 .context("failed to read response from daemon")?;
74
75 parse_http_response(&response_buf)
76 }
77}
78
79pub async fn daemon_health_check() -> bool {
81 match daemon_request("GET", "/health", "").await {
82 Ok(body) => body.trim() == "ok",
83 Err(_) => false,
84 }
85}
86
87pub async fn daemon_tool_call(name: &str, arguments: Option<&serde_json::Value>) -> Result<String> {
89 let body = serde_json::json!({
90 "name": name,
91 "arguments": arguments,
92 });
93 daemon_request("POST", "/v1/tools/call", &body.to_string()).await
94}
95
/// Renders a minimal HTTP/1.1 request with `Host: localhost` and
/// `Connection: close`.
///
/// A non-empty `body` is appended after the headers and announced with
/// `Content-Type: application/json` plus a `Content-Length` equal to the
/// body's length in bytes; an empty `body` produces neither header.
fn format_http_request(method: &str, path: &str, body: &str) -> String {
    // Entity headers are only emitted when there is a payload to describe.
    let entity_headers = if body.is_empty() {
        String::new()
    } else {
        format!(
            "Content-Type: application/json\r\nContent-Length: {}\r\n",
            body.len()
        )
    };

    format!(
        "{method} {path} HTTP/1.1\r\nHost: localhost\r\n{entity_headers}Connection: close\r\n\r\n{body}"
    )
}
106
107fn parse_http_response(raw: &[u8]) -> Result<String> {
108 let response_str = std::str::from_utf8(raw).context("daemon response is not valid UTF-8")?;
109
110 let Some(header_end) = response_str.find("\r\n\r\n") else {
111 anyhow::bail!("malformed HTTP response from daemon (no header boundary)");
112 };
113
114 let headers = &response_str[..header_end];
115 let body = &response_str[header_end + 4..];
116
117 let status_line = headers.lines().next().unwrap_or("");
118 let status_code = status_line
119 .split_whitespace()
120 .nth(1)
121 .and_then(|s| s.parse::<u16>().ok())
122 .unwrap_or(0);
123
124 if status_code >= 400 {
125 anyhow::bail!("daemon returned HTTP {status_code}: {body}");
126 }
127
128 Ok(body.to_string())
129}
130
131pub async fn try_daemon_request(method: &str, path: &str, body: &str) -> Option<String> {
133 if !daemon::is_daemon_running() {
134 return None;
135 }
136 daemon_request(method, path, body).await.ok()
137}
138
139#[allow(clippy::needless_pass_by_value)]
143pub fn try_daemon_tool_call_blocking(
144 name: &str,
145 arguments: Option<serde_json::Value>,
146) -> Option<String> {
147 use std::time::Duration;
148
149 let rt = tokio::runtime::Runtime::new().ok()?;
150
151 let addr = daemon::daemon_addr();
152 let mut ready = addr.is_listening() && rt.block_on(async { daemon_health_check().await });
153
154 if !ready {
155 if std::env::var("LEAN_CTX_HOOK_CHILD").is_ok() {
156 return None;
157 }
158
159 let lock = crate::core::startup_guard::try_acquire_lock(
160 "daemon-start",
161 Duration::from_millis(1200),
162 Duration::from_secs(5),
163 );
164
165 if let Some(g) = lock {
166 g.touch();
167 let mut did_start = false;
168
169 if !daemon::is_daemon_running() {
170 if daemon::start_daemon(&[]).is_ok() {
171 did_start = true;
172 } else {
173 return None;
174 }
175 }
176
177 for _ in 0..60 {
178 if addr.is_listening() && rt.block_on(async { daemon_health_check().await }) {
179 ready = true;
180 break;
181 }
182 std::thread::sleep(Duration::from_millis(50));
183 }
184
185 if ready && did_start && crate::core::protocol::meta_visible() {
186 eprintln!("\x1b[2m▸ daemon auto-started\x1b[0m");
187 }
188 } else {
189 for _ in 0..60 {
190 if addr.is_listening() && rt.block_on(async { daemon_health_check().await }) {
191 ready = true;
192 break;
193 }
194 std::thread::sleep(Duration::from_millis(50));
195 }
196 }
197 }
198
199 if !ready {
200 return None;
201 }
202
203 if let Some(out) = rt.block_on(async { daemon_tool_call(name, arguments.as_ref()).await.ok() })
204 {
205 return Some(out);
206 }
207
208 for _ in 0..5 {
209 std::thread::sleep(Duration::from_millis(50));
210 if let Some(out) =
211 rt.block_on(async { daemon_tool_call(name, arguments.as_ref()).await.ok() })
212 {
213 return Some(out);
214 }
215 }
216
217 None
218}
219
220fn unwrap_mcp_tool_text(body: &str) -> Option<String> {
221 let v: serde_json::Value = serde_json::from_str(body).ok()?;
222 let result = v.get("result")?;
223
224 if let Some(content) = result.get("content").and_then(|c| c.as_array()) {
225 let mut texts: Vec<String> = Vec::new();
226 for item in content {
227 if let Some(text) = item.get("text").and_then(|t| t.as_str()) {
228 if !text.is_empty() {
229 texts.push(text.to_string());
230 }
231 }
232 }
233 if !texts.is_empty() {
234 return Some(texts.join("\n"));
235 }
236 }
237
238 if let Some(text) = result.get("text").and_then(|t| t.as_str()) {
239 return Some(text.to_string());
240 }
241
242 result.as_str().map(std::string::ToString::to_string)
243}
244
245pub fn try_daemon_tool_call_blocking_text(
247 name: &str,
248 arguments: Option<serde_json::Value>,
249) -> Option<String> {
250 let body = try_daemon_tool_call_blocking(name, arguments)?;
251 let trimmed = body.trim_start();
252 if !trimmed.starts_with('{') {
253 return Some(body);
254 }
255 Some(unwrap_mcp_tool_text(&body).unwrap_or(body))
256}