lean_ctx/
daemon_client.rs1use anyhow::{Context, Result};
2use tokio::io::{AsyncReadExt, AsyncWriteExt};
3use tokio::net::UnixStream;
4
5use crate::daemon;
6
7pub async fn daemon_request(method: &str, path: &str, body: &str) -> Result<String> {
10 use std::time::Duration;
11 use tokio::time::timeout;
12
13 const CONNECT_TIMEOUT: Duration = Duration::from_secs(3);
14 const IO_TIMEOUT: Duration = Duration::from_secs(10);
15
16 let socket_path = daemon::daemon_socket_path();
17 if !socket_path.exists() {
18 anyhow::bail!(
19 "Daemon socket not found at {}. Is the daemon running?",
20 socket_path.display()
21 );
22 }
23
24 let mut stream = timeout(CONNECT_TIMEOUT, UnixStream::connect(&socket_path))
25 .await
26 .with_context(|| {
27 format!(
28 "connect to daemon timed out ({}s)",
29 CONNECT_TIMEOUT.as_secs()
30 )
31 })?
32 .with_context(|| format!("cannot connect to daemon at {}", socket_path.display()))?;
33
34 let request = format_http_request(method, path, body);
35 timeout(IO_TIMEOUT, stream.write_all(request.as_bytes()))
36 .await
37 .context("write to daemon timed out")?
38 .context("failed to write request to daemon socket")?;
39
40 let mut response_buf = Vec::with_capacity(4096);
41 timeout(IO_TIMEOUT, stream.read_to_end(&mut response_buf))
42 .await
43 .context("read from daemon timed out")?
44 .context("failed to read response from daemon")?;
45
46 parse_http_response(&response_buf)
47}
48
49pub async fn daemon_health_check() -> bool {
51 match daemon_request("GET", "/health", "").await {
52 Ok(body) => body.trim() == "ok",
53 Err(_) => false,
54 }
55}
56
57pub async fn daemon_tool_call(name: &str, arguments: Option<&serde_json::Value>) -> Result<String> {
59 let body = serde_json::json!({
60 "name": name,
61 "arguments": arguments,
62 });
63 daemon_request("POST", "/v1/tools/call", &body.to_string()).await
64}
65
66fn format_http_request(method: &str, path: &str, body: &str) -> String {
67 if body.is_empty() {
68 format!("{method} {path} HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n")
69 } else {
70 let content_length = body.len();
71 format!(
72 "{method} {path} HTTP/1.1\r\nHost: localhost\r\nContent-Type: application/json\r\nContent-Length: {content_length}\r\nConnection: close\r\n\r\n{body}"
73 )
74 }
75}
76
77fn parse_http_response(raw: &[u8]) -> Result<String> {
78 let response_str = std::str::from_utf8(raw).context("daemon response is not valid UTF-8")?;
79
80 let Some(header_end) = response_str.find("\r\n\r\n") else {
81 anyhow::bail!("malformed HTTP response from daemon (no header boundary)");
82 };
83
84 let headers = &response_str[..header_end];
85 let body = &response_str[header_end + 4..];
86
87 let status_line = headers.lines().next().unwrap_or("");
88 let status_code = status_line
89 .split_whitespace()
90 .nth(1)
91 .and_then(|s| s.parse::<u16>().ok())
92 .unwrap_or(0);
93
94 if status_code >= 400 {
95 anyhow::bail!("daemon returned HTTP {status_code}: {body}");
96 }
97
98 Ok(body.to_string())
99}
100
101pub async fn try_daemon_request(method: &str, path: &str, body: &str) -> Option<String> {
103 if !daemon::is_daemon_running() {
104 return None;
105 }
106 daemon_request(method, path, body).await.ok()
107}
108
109#[allow(clippy::needless_pass_by_value)]
113pub fn try_daemon_tool_call_blocking(
114 name: &str,
115 arguments: Option<serde_json::Value>,
116) -> Option<String> {
117 use std::time::Duration;
118
119 let rt = tokio::runtime::Runtime::new().ok()?;
122
123 let socket_path = daemon::daemon_socket_path();
124 let mut ready = socket_path.exists() && rt.block_on(async { daemon_health_check().await });
125
126 if !ready {
127 if std::env::var("LEAN_CTX_HOOK_CHILD").is_ok() {
131 return None;
132 }
133
134 #[cfg(unix)]
135 {
136 let lock = crate::core::startup_guard::try_acquire_lock(
141 "daemon-start",
142 Duration::from_millis(1200),
143 Duration::from_secs(5),
144 );
145
146 if let Some(g) = lock {
147 g.touch();
148 let mut did_start = false;
149
150 if !daemon::is_daemon_running() {
151 if daemon::start_daemon(&[]).is_ok() {
152 did_start = true;
153 } else {
154 return None;
155 }
156 }
157
158 for _ in 0..60 {
160 if socket_path.exists() && rt.block_on(async { daemon_health_check().await }) {
161 ready = true;
162 break;
163 }
164 std::thread::sleep(Duration::from_millis(50));
165 }
166
167 if ready && did_start {
168 eprintln!("\x1b[2mâ–¸ daemon auto-started\x1b[0m");
169 }
170 } else {
171 for _ in 0..60 {
173 if socket_path.exists() && rt.block_on(async { daemon_health_check().await }) {
174 ready = true;
175 break;
176 }
177 std::thread::sleep(Duration::from_millis(50));
178 }
179 }
180 }
181 #[cfg(not(unix))]
182 {
183 return None;
184 }
185 }
186
187 if !ready {
188 return None;
189 }
190
191 if let Some(out) = rt.block_on(async { daemon_tool_call(name, arguments.as_ref()).await.ok() })
192 {
193 return Some(out);
194 }
195
196 for _ in 0..5 {
198 std::thread::sleep(Duration::from_millis(50));
199 if let Some(out) =
200 rt.block_on(async { daemon_tool_call(name, arguments.as_ref()).await.ok() })
201 {
202 return Some(out);
203 }
204 }
205
206 None
207}
208
209fn unwrap_mcp_tool_text(body: &str) -> Option<String> {
210 let v: serde_json::Value = serde_json::from_str(body).ok()?;
211 let result = v.get("result")?;
212
213 if let Some(content) = result.get("content").and_then(|c| c.as_array()) {
214 let mut texts: Vec<String> = Vec::new();
215 for item in content {
216 if let Some(text) = item.get("text").and_then(|t| t.as_str()) {
217 if !text.is_empty() {
218 texts.push(text.to_string());
219 }
220 }
221 }
222 if !texts.is_empty() {
223 return Some(texts.join("\n"));
224 }
225 }
226
227 if let Some(text) = result.get("text").and_then(|t| t.as_str()) {
228 return Some(text.to_string());
229 }
230
231 result.as_str().map(std::string::ToString::to_string)
232}
233
234pub fn try_daemon_tool_call_blocking_text(
236 name: &str,
237 arguments: Option<serde_json::Value>,
238) -> Option<String> {
239 let body = try_daemon_tool_call_blocking(name, arguments)?;
240 let trimmed = body.trim_start();
241 if !trimmed.starts_with('{') {
242 return Some(body);
243 }
244 Some(unwrap_mcp_tool_text(&body).unwrap_or(body))
245}