Skip to main content

lean_ctx/
daemon_client.rs

1use anyhow::{Context, Result};
2use tokio::io::{AsyncReadExt, AsyncWriteExt};
3use tokio::net::UnixStream;
4
5use crate::daemon;
6
7/// Send an HTTP request to the daemon over the Unix Domain Socket.
8/// Returns the response body as a string.
9pub async fn daemon_request(method: &str, path: &str, body: &str) -> Result<String> {
10    let socket_path = daemon::daemon_socket_path();
11    if !socket_path.exists() {
12        anyhow::bail!(
13            "Daemon socket not found at {}. Is the daemon running?",
14            socket_path.display()
15        );
16    }
17
18    let mut stream = UnixStream::connect(&socket_path)
19        .await
20        .with_context(|| format!("cannot connect to daemon at {}", socket_path.display()))?;
21
22    let request = format_http_request(method, path, body);
23    stream
24        .write_all(request.as_bytes())
25        .await
26        .context("failed to write request to daemon socket")?;
27
28    let mut response_buf = Vec::with_capacity(4096);
29    stream
30        .read_to_end(&mut response_buf)
31        .await
32        .context("failed to read response from daemon")?;
33
34    parse_http_response(&response_buf)
35}
36
37/// Check if the daemon is reachable by hitting /health.
38pub async fn daemon_health_check() -> bool {
39    match daemon_request("GET", "/health", "").await {
40        Ok(body) => body.trim() == "ok",
41        Err(_) => false,
42    }
43}
44
45/// Call a tool on the daemon's REST API.
46pub async fn daemon_tool_call(name: &str, arguments: Option<&serde_json::Value>) -> Result<String> {
47    let body = serde_json::json!({
48        "name": name,
49        "arguments": arguments,
50    });
51    daemon_request("POST", "/v1/tools/call", &body.to_string()).await
52}
53
54fn format_http_request(method: &str, path: &str, body: &str) -> String {
55    if body.is_empty() {
56        format!("{method} {path} HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n")
57    } else {
58        let content_length = body.len();
59        format!(
60            "{method} {path} HTTP/1.1\r\nHost: localhost\r\nContent-Type: application/json\r\nContent-Length: {content_length}\r\nConnection: close\r\n\r\n{body}"
61        )
62    }
63}
64
65fn parse_http_response(raw: &[u8]) -> Result<String> {
66    let response_str = std::str::from_utf8(raw).context("daemon response is not valid UTF-8")?;
67
68    let Some(header_end) = response_str.find("\r\n\r\n") else {
69        anyhow::bail!("malformed HTTP response from daemon (no header boundary)");
70    };
71
72    let headers = &response_str[..header_end];
73    let body = &response_str[header_end + 4..];
74
75    let status_line = headers.lines().next().unwrap_or("");
76    let status_code = status_line
77        .split_whitespace()
78        .nth(1)
79        .and_then(|s| s.parse::<u16>().ok())
80        .unwrap_or(0);
81
82    if status_code >= 400 {
83        anyhow::bail!("daemon returned HTTP {status_code}: {body}");
84    }
85
86    Ok(body.to_string())
87}
88
89/// Attempt to connect to the daemon. Returns `None` if not running.
90pub async fn try_daemon_request(method: &str, path: &str, body: &str) -> Option<String> {
91    if !daemon::is_daemon_running() {
92        return None;
93    }
94    daemon_request(method, path, body).await.ok()
95}
96
97/// Blocking helper for CLI commands: calls a daemon tool if the daemon is running.
98/// Returns `None` if the daemon is not running or the call fails.
99/// On Unix, attempts to auto-start the daemon if it's not already running.
100#[allow(clippy::needless_pass_by_value)]
101pub fn try_daemon_tool_call_blocking(
102    name: &str,
103    arguments: Option<serde_json::Value>,
104) -> Option<String> {
105    use std::time::Duration;
106
107    // Always create the runtime once per CLI call. We also use it for
108    // best-effort health checks while a daemon may be starting.
109    let rt = tokio::runtime::Runtime::new().ok()?;
110
111    let socket_path = daemon::daemon_socket_path();
112    let mut ready = socket_path.exists() && rt.block_on(async { daemon_health_check().await });
113
114    if !ready {
115        // SAFETY: Never auto-start the daemon from inside a hook subprocess.
116        // Hooks have a tight watchdog timeout; spawning a daemon would create
117        // orphan processes when the watchdog fires.
118        if std::env::var("LEAN_CTX_HOOK_CHILD").is_ok() {
119            return None;
120        }
121
122        #[cfg(unix)]
123        {
124            // Prevent double-daemon races when multiple CLI commands auto-start concurrently.
125            // One process starts; others wait briefly for readiness.
126            let lock = crate::core::startup_guard::try_acquire_lock(
127                "daemon-start",
128                Duration::from_millis(1200),
129                Duration::from_secs(8),
130            );
131
132            if let Some(g) = lock {
133                g.touch();
134                let mut did_start = false;
135
136                // If a daemon process exists but isn't ready yet, don't try to start a second
137                // one (daemon::start_daemon would bail). Just wait for readiness.
138                if !daemon::is_daemon_running() {
139                    if daemon::start_daemon(&[]).is_ok() {
140                        did_start = true;
141                    } else {
142                        return None;
143                    }
144                }
145
146                // Wait for readiness (socket + /health).
147                for _ in 0..240 {
148                    if socket_path.exists() && rt.block_on(async { daemon_health_check().await }) {
149                        ready = true;
150                        break;
151                    }
152                    std::thread::sleep(Duration::from_millis(50));
153                }
154
155                if ready && did_start {
156                    eprintln!("\x1b[2m▸ daemon auto-started\x1b[0m");
157                }
158            } else {
159                // Another process likely holds the start lock; wait briefly for readiness.
160                for _ in 0..240 {
161                    if socket_path.exists() && rt.block_on(async { daemon_health_check().await }) {
162                        ready = true;
163                        break;
164                    }
165                    std::thread::sleep(Duration::from_millis(50));
166                }
167            }
168        }
169        #[cfg(not(unix))]
170        {
171            return None;
172        }
173    }
174
175    if !ready {
176        return None;
177    }
178
179    if let Some(out) = rt.block_on(async { daemon_tool_call(name, arguments.as_ref()).await.ok() })
180    {
181        return Some(out);
182    }
183
184    // If the daemon is starting up, the first request can still lose a race even after /health
185    // briefly succeeds. Retry once after a short wait.
186    for _ in 0..20 {
187        std::thread::sleep(Duration::from_millis(50));
188        if let Some(out) =
189            rt.block_on(async { daemon_tool_call(name, arguments.as_ref()).await.ok() })
190        {
191            return Some(out);
192        }
193    }
194
195    None
196}
197
198fn unwrap_mcp_tool_text(body: &str) -> Option<String> {
199    let v: serde_json::Value = serde_json::from_str(body).ok()?;
200    let result = v.get("result")?;
201
202    if let Some(content) = result.get("content").and_then(|c| c.as_array()) {
203        let mut texts: Vec<String> = Vec::new();
204        for item in content {
205            if let Some(text) = item.get("text").and_then(|t| t.as_str()) {
206                if !text.is_empty() {
207                    texts.push(text.to_string());
208                }
209            }
210        }
211        if !texts.is_empty() {
212            return Some(texts.join("\n"));
213        }
214    }
215
216    if let Some(text) = result.get("text").and_then(|t| t.as_str()) {
217        return Some(text.to_string());
218    }
219
220    result.as_str().map(std::string::ToString::to_string)
221}
222
223/// Like `try_daemon_tool_call_blocking`, but unwraps MCP JSON responses to text for CLI output.
224pub fn try_daemon_tool_call_blocking_text(
225    name: &str,
226    arguments: Option<serde_json::Value>,
227) -> Option<String> {
228    let body = try_daemon_tool_call_blocking(name, arguments)?;
229    let trimmed = body.trim_start();
230    if !trimmed.starts_with('{') {
231        return Some(body);
232    }
233    Some(unwrap_mcp_tool_text(&body).unwrap_or(body))
234}