lean_ctx/
daemon_client.rs1use anyhow::{Context, Result};
2use tokio::io::{AsyncReadExt, AsyncWriteExt};
3use tokio::net::UnixStream;
4
5use crate::daemon;
6
7pub async fn daemon_request(method: &str, path: &str, body: &str) -> Result<String> {
10 let socket_path = daemon::daemon_socket_path();
11 if !socket_path.exists() {
12 anyhow::bail!(
13 "Daemon socket not found at {}. Is the daemon running?",
14 socket_path.display()
15 );
16 }
17
18 let mut stream = UnixStream::connect(&socket_path)
19 .await
20 .with_context(|| format!("cannot connect to daemon at {}", socket_path.display()))?;
21
22 let request = format_http_request(method, path, body);
23 stream
24 .write_all(request.as_bytes())
25 .await
26 .context("failed to write request to daemon socket")?;
27
28 let mut response_buf = Vec::with_capacity(4096);
29 stream
30 .read_to_end(&mut response_buf)
31 .await
32 .context("failed to read response from daemon")?;
33
34 parse_http_response(&response_buf)
35}
36
37pub async fn daemon_health_check() -> bool {
39 match daemon_request("GET", "/health", "").await {
40 Ok(body) => body.trim() == "ok",
41 Err(_) => false,
42 }
43}
44
45pub async fn daemon_tool_call(name: &str, arguments: Option<&serde_json::Value>) -> Result<String> {
47 let body = serde_json::json!({
48 "name": name,
49 "arguments": arguments,
50 });
51 daemon_request("POST", "/v1/tools/call", &body.to_string()).await
52}
53
54fn format_http_request(method: &str, path: &str, body: &str) -> String {
55 if body.is_empty() {
56 format!("{method} {path} HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n")
57 } else {
58 let content_length = body.len();
59 format!(
60 "{method} {path} HTTP/1.1\r\nHost: localhost\r\nContent-Type: application/json\r\nContent-Length: {content_length}\r\nConnection: close\r\n\r\n{body}"
61 )
62 }
63}
64
65fn parse_http_response(raw: &[u8]) -> Result<String> {
66 let response_str = std::str::from_utf8(raw).context("daemon response is not valid UTF-8")?;
67
68 let Some(header_end) = response_str.find("\r\n\r\n") else {
69 anyhow::bail!("malformed HTTP response from daemon (no header boundary)");
70 };
71
72 let headers = &response_str[..header_end];
73 let body = &response_str[header_end + 4..];
74
75 let status_line = headers.lines().next().unwrap_or("");
76 let status_code = status_line
77 .split_whitespace()
78 .nth(1)
79 .and_then(|s| s.parse::<u16>().ok())
80 .unwrap_or(0);
81
82 if status_code >= 400 {
83 anyhow::bail!("daemon returned HTTP {status_code}: {body}");
84 }
85
86 Ok(body.to_string())
87}
88
89pub async fn try_daemon_request(method: &str, path: &str, body: &str) -> Option<String> {
91 if !daemon::is_daemon_running() {
92 return None;
93 }
94 daemon_request(method, path, body).await.ok()
95}
96
97#[allow(clippy::needless_pass_by_value)]
101pub fn try_daemon_tool_call_blocking(
102 name: &str,
103 arguments: Option<serde_json::Value>,
104) -> Option<String> {
105 use std::time::Duration;
106
107 let rt = tokio::runtime::Runtime::new().ok()?;
110
111 let socket_path = daemon::daemon_socket_path();
112 let mut ready = socket_path.exists() && rt.block_on(async { daemon_health_check().await });
113
114 if !ready {
115 if std::env::var("LEAN_CTX_HOOK_CHILD").is_ok() {
119 return None;
120 }
121
122 #[cfg(unix)]
123 {
124 let lock = crate::core::startup_guard::try_acquire_lock(
127 "daemon-start",
128 Duration::from_millis(1200),
129 Duration::from_secs(8),
130 );
131
132 if let Some(g) = lock {
133 g.touch();
134 let mut did_start = false;
135
136 if !daemon::is_daemon_running() {
139 if daemon::start_daemon(&[]).is_ok() {
140 did_start = true;
141 } else {
142 return None;
143 }
144 }
145
146 for _ in 0..240 {
148 if socket_path.exists() && rt.block_on(async { daemon_health_check().await }) {
149 ready = true;
150 break;
151 }
152 std::thread::sleep(Duration::from_millis(50));
153 }
154
155 if ready && did_start {
156 eprintln!("\x1b[2m▸ daemon auto-started\x1b[0m");
157 }
158 } else {
159 for _ in 0..240 {
161 if socket_path.exists() && rt.block_on(async { daemon_health_check().await }) {
162 ready = true;
163 break;
164 }
165 std::thread::sleep(Duration::from_millis(50));
166 }
167 }
168 }
169 #[cfg(not(unix))]
170 {
171 return None;
172 }
173 }
174
175 if !ready {
176 return None;
177 }
178
179 if let Some(out) = rt.block_on(async { daemon_tool_call(name, arguments.as_ref()).await.ok() })
180 {
181 return Some(out);
182 }
183
184 for _ in 0..20 {
187 std::thread::sleep(Duration::from_millis(50));
188 if let Some(out) =
189 rt.block_on(async { daemon_tool_call(name, arguments.as_ref()).await.ok() })
190 {
191 return Some(out);
192 }
193 }
194
195 None
196}
197
198fn unwrap_mcp_tool_text(body: &str) -> Option<String> {
199 let v: serde_json::Value = serde_json::from_str(body).ok()?;
200 let result = v.get("result")?;
201
202 if let Some(content) = result.get("content").and_then(|c| c.as_array()) {
203 let mut texts: Vec<String> = Vec::new();
204 for item in content {
205 if let Some(text) = item.get("text").and_then(|t| t.as_str()) {
206 if !text.is_empty() {
207 texts.push(text.to_string());
208 }
209 }
210 }
211 if !texts.is_empty() {
212 return Some(texts.join("\n"));
213 }
214 }
215
216 if let Some(text) = result.get("text").and_then(|t| t.as_str()) {
217 return Some(text.to_string());
218 }
219
220 result.as_str().map(std::string::ToString::to_string)
221}
222
223pub fn try_daemon_tool_call_blocking_text(
225 name: &str,
226 arguments: Option<serde_json::Value>,
227) -> Option<String> {
228 let body = try_daemon_tool_call_blocking(name, arguments)?;
229 let trimmed = body.trim_start();
230 if !trimmed.starts_with('{') {
231 return Some(body);
232 }
233 Some(unwrap_mcp_tool_text(&body).unwrap_or(body))
234}