Skip to main content

lean_ctx/
daemon_client.rs

1use anyhow::{Context, Result};
2use tokio::io::{AsyncReadExt, AsyncWriteExt};
3use tokio::net::UnixStream;
4
5use crate::daemon;
6
7/// Send an HTTP request to the daemon over the Unix Domain Socket.
8/// Returns the response body as a string.
9pub async fn daemon_request(method: &str, path: &str, body: &str) -> Result<String> {
10    use std::time::Duration;
11    use tokio::time::timeout;
12
13    const CONNECT_TIMEOUT: Duration = Duration::from_secs(3);
14    const IO_TIMEOUT: Duration = Duration::from_secs(10);
15
16    let socket_path = daemon::daemon_socket_path();
17    if !socket_path.exists() {
18        anyhow::bail!(
19            "Daemon socket not found at {}. Is the daemon running?",
20            socket_path.display()
21        );
22    }
23
24    let mut stream = timeout(CONNECT_TIMEOUT, UnixStream::connect(&socket_path))
25        .await
26        .with_context(|| {
27            format!(
28                "connect to daemon timed out ({}s)",
29                CONNECT_TIMEOUT.as_secs()
30            )
31        })?
32        .with_context(|| format!("cannot connect to daemon at {}", socket_path.display()))?;
33
34    let request = format_http_request(method, path, body);
35    timeout(IO_TIMEOUT, stream.write_all(request.as_bytes()))
36        .await
37        .context("write to daemon timed out")?
38        .context("failed to write request to daemon socket")?;
39
40    let mut response_buf = Vec::with_capacity(4096);
41    timeout(IO_TIMEOUT, stream.read_to_end(&mut response_buf))
42        .await
43        .context("read from daemon timed out")?
44        .context("failed to read response from daemon")?;
45
46    parse_http_response(&response_buf)
47}
48
49/// Check if the daemon is reachable by hitting /health.
50pub async fn daemon_health_check() -> bool {
51    match daemon_request("GET", "/health", "").await {
52        Ok(body) => body.trim() == "ok",
53        Err(_) => false,
54    }
55}
56
57/// Call a tool on the daemon's REST API.
58pub async fn daemon_tool_call(name: &str, arguments: Option<&serde_json::Value>) -> Result<String> {
59    let body = serde_json::json!({
60        "name": name,
61        "arguments": arguments,
62    });
63    daemon_request("POST", "/v1/tools/call", &body.to_string()).await
64}
65
66fn format_http_request(method: &str, path: &str, body: &str) -> String {
67    if body.is_empty() {
68        format!("{method} {path} HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n")
69    } else {
70        let content_length = body.len();
71        format!(
72            "{method} {path} HTTP/1.1\r\nHost: localhost\r\nContent-Type: application/json\r\nContent-Length: {content_length}\r\nConnection: close\r\n\r\n{body}"
73        )
74    }
75}
76
77fn parse_http_response(raw: &[u8]) -> Result<String> {
78    let response_str = std::str::from_utf8(raw).context("daemon response is not valid UTF-8")?;
79
80    let Some(header_end) = response_str.find("\r\n\r\n") else {
81        anyhow::bail!("malformed HTTP response from daemon (no header boundary)");
82    };
83
84    let headers = &response_str[..header_end];
85    let body = &response_str[header_end + 4..];
86
87    let status_line = headers.lines().next().unwrap_or("");
88    let status_code = status_line
89        .split_whitespace()
90        .nth(1)
91        .and_then(|s| s.parse::<u16>().ok())
92        .unwrap_or(0);
93
94    if status_code >= 400 {
95        anyhow::bail!("daemon returned HTTP {status_code}: {body}");
96    }
97
98    Ok(body.to_string())
99}
100
101/// Attempt to connect to the daemon. Returns `None` if not running.
102pub async fn try_daemon_request(method: &str, path: &str, body: &str) -> Option<String> {
103    if !daemon::is_daemon_running() {
104        return None;
105    }
106    daemon_request(method, path, body).await.ok()
107}
108
109/// Blocking helper for CLI commands: calls a daemon tool if the daemon is running.
110/// Returns `None` if the daemon is not running or the call fails.
111/// On Unix, attempts to auto-start the daemon if it's not already running.
112#[allow(clippy::needless_pass_by_value)]
113pub fn try_daemon_tool_call_blocking(
114    name: &str,
115    arguments: Option<serde_json::Value>,
116) -> Option<String> {
117    use std::time::Duration;
118
119    // Always create the runtime once per CLI call. We also use it for
120    // best-effort health checks while a daemon may be starting.
121    let rt = tokio::runtime::Runtime::new().ok()?;
122
123    let socket_path = daemon::daemon_socket_path();
124    let mut ready = socket_path.exists() && rt.block_on(async { daemon_health_check().await });
125
126    if !ready {
127        // SAFETY: Never auto-start the daemon from inside a hook subprocess.
128        // Hooks have a tight watchdog timeout; spawning a daemon would create
129        // orphan processes when the watchdog fires.
130        if std::env::var("LEAN_CTX_HOOK_CHILD").is_ok() {
131            return None;
132        }
133
134        #[cfg(unix)]
135        {
136            // is_daemon_running() now cleans stale PID/socket files when the
137            // PID is dead, so subsequent connect attempts won't hang on a
138            // stale socket.
139
140            let lock = crate::core::startup_guard::try_acquire_lock(
141                "daemon-start",
142                Duration::from_millis(1200),
143                Duration::from_secs(5),
144            );
145
146            if let Some(g) = lock {
147                g.touch();
148                let mut did_start = false;
149
150                if !daemon::is_daemon_running() {
151                    if daemon::start_daemon(&[]).is_ok() {
152                        did_start = true;
153                    } else {
154                        return None;
155                    }
156                }
157
158                // Max 3s readiness wait (60 × 50ms) — keeps CLI snappy.
159                for _ in 0..60 {
160                    if socket_path.exists() && rt.block_on(async { daemon_health_check().await }) {
161                        ready = true;
162                        break;
163                    }
164                    std::thread::sleep(Duration::from_millis(50));
165                }
166
167                if ready && did_start {
168                    eprintln!("\x1b[2mâ–¸ daemon auto-started\x1b[0m");
169                }
170            } else {
171                // Another process likely holds the start lock; wait briefly.
172                for _ in 0..60 {
173                    if socket_path.exists() && rt.block_on(async { daemon_health_check().await }) {
174                        ready = true;
175                        break;
176                    }
177                    std::thread::sleep(Duration::from_millis(50));
178                }
179            }
180        }
181        #[cfg(not(unix))]
182        {
183            return None;
184        }
185    }
186
187    if !ready {
188        return None;
189    }
190
191    if let Some(out) = rt.block_on(async { daemon_tool_call(name, arguments.as_ref()).await.ok() })
192    {
193        return Some(out);
194    }
195
196    // Brief retry — the first request can lose a race even after /health succeeds.
197    for _ in 0..5 {
198        std::thread::sleep(Duration::from_millis(50));
199        if let Some(out) =
200            rt.block_on(async { daemon_tool_call(name, arguments.as_ref()).await.ok() })
201        {
202            return Some(out);
203        }
204    }
205
206    None
207}
208
209fn unwrap_mcp_tool_text(body: &str) -> Option<String> {
210    let v: serde_json::Value = serde_json::from_str(body).ok()?;
211    let result = v.get("result")?;
212
213    if let Some(content) = result.get("content").and_then(|c| c.as_array()) {
214        let mut texts: Vec<String> = Vec::new();
215        for item in content {
216            if let Some(text) = item.get("text").and_then(|t| t.as_str()) {
217                if !text.is_empty() {
218                    texts.push(text.to_string());
219                }
220            }
221        }
222        if !texts.is_empty() {
223            return Some(texts.join("\n"));
224        }
225    }
226
227    if let Some(text) = result.get("text").and_then(|t| t.as_str()) {
228        return Some(text.to_string());
229    }
230
231    result.as_str().map(std::string::ToString::to_string)
232}
233
234/// Like `try_daemon_tool_call_blocking`, but unwraps MCP JSON responses to text for CLI output.
235pub fn try_daemon_tool_call_blocking_text(
236    name: &str,
237    arguments: Option<serde_json::Value>,
238) -> Option<String> {
239    let body = try_daemon_tool_call_blocking(name, arguments)?;
240    let trimmed = body.trim_start();
241    if !trimmed.starts_with('{') {
242        return Some(body);
243    }
244    Some(unwrap_mcp_tool_text(&body).unwrap_or(body))
245}