Skip to main content

lean_ctx/
daemon_client.rs

1use anyhow::{Context, Result};
2use tokio::io::{AsyncReadExt, AsyncWriteExt};
3
4use crate::daemon;
5use crate::ipc;
6
7/// Send an HTTP request to the daemon over the IPC channel.
8/// Returns the response body as a string.
9pub async fn daemon_request(method: &str, path: &str, body: &str) -> Result<String> {
10    use std::time::Duration;
11    use tokio::time::timeout;
12
13    const CONNECT_TIMEOUT: Duration = Duration::from_secs(3);
14    const IO_TIMEOUT: Duration = Duration::from_secs(10);
15
16    let addr = daemon::daemon_addr();
17    if !addr.is_listening() {
18        anyhow::bail!(
19            "Daemon endpoint not found at {}. Is the daemon running?",
20            addr.display()
21        );
22    }
23
24    let request = format_http_request(method, path, body);
25
26    #[cfg(unix)]
27    {
28        let mut stream = timeout(CONNECT_TIMEOUT, ipc::connect(&addr))
29            .await
30            .with_context(|| {
31                format!(
32                    "connect to daemon timed out ({}s)",
33                    CONNECT_TIMEOUT.as_secs()
34                )
35            })?
36            .with_context(|| format!("cannot connect to daemon at {}", addr.display()))?;
37
38        timeout(IO_TIMEOUT, stream.write_all(request.as_bytes()))
39            .await
40            .context("write to daemon timed out")?
41            .context("failed to write request to daemon")?;
42
43        let mut response_buf = Vec::with_capacity(4096);
44        timeout(IO_TIMEOUT, stream.read_to_end(&mut response_buf))
45            .await
46            .context("read from daemon timed out")?
47            .context("failed to read response from daemon")?;
48
49        parse_http_response(&response_buf)
50    }
51
52    #[cfg(windows)]
53    {
54        let mut stream = timeout(CONNECT_TIMEOUT, ipc::connect(&addr))
55            .await
56            .with_context(|| {
57                format!(
58                    "connect to daemon timed out ({}s)",
59                    CONNECT_TIMEOUT.as_secs()
60                )
61            })?
62            .with_context(|| format!("cannot connect to daemon at {}", addr.display()))?;
63
64        timeout(IO_TIMEOUT, stream.write_all(request.as_bytes()))
65            .await
66            .context("write to daemon timed out")?
67            .context("failed to write request to daemon")?;
68
69        let mut response_buf = Vec::with_capacity(4096);
70        timeout(IO_TIMEOUT, stream.read_to_end(&mut response_buf))
71            .await
72            .context("read from daemon timed out")?
73            .context("failed to read response from daemon")?;
74
75        parse_http_response(&response_buf)
76    }
77}
78
79/// Check if the daemon is reachable by hitting /health.
80pub async fn daemon_health_check() -> bool {
81    match daemon_request("GET", "/health", "").await {
82        Ok(body) => body.trim() == "ok",
83        Err(_) => false,
84    }
85}
86
87/// Call a tool on the daemon's REST API.
88pub async fn daemon_tool_call(name: &str, arguments: Option<&serde_json::Value>) -> Result<String> {
89    let body = serde_json::json!({
90        "name": name,
91        "arguments": arguments,
92    });
93    daemon_request("POST", "/v1/tools/call", &body.to_string()).await
94}
95
96fn format_http_request(method: &str, path: &str, body: &str) -> String {
97    if body.is_empty() {
98        format!("{method} {path} HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n")
99    } else {
100        let content_length = body.len();
101        format!(
102            "{method} {path} HTTP/1.1\r\nHost: localhost\r\nContent-Type: application/json\r\nContent-Length: {content_length}\r\nConnection: close\r\n\r\n{body}"
103        )
104    }
105}
106
107fn parse_http_response(raw: &[u8]) -> Result<String> {
108    let response_str = std::str::from_utf8(raw).context("daemon response is not valid UTF-8")?;
109
110    let Some(header_end) = response_str.find("\r\n\r\n") else {
111        anyhow::bail!("malformed HTTP response from daemon (no header boundary)");
112    };
113
114    let headers = &response_str[..header_end];
115    let body = &response_str[header_end + 4..];
116
117    let status_line = headers.lines().next().unwrap_or("");
118    let status_code = status_line
119        .split_whitespace()
120        .nth(1)
121        .and_then(|s| s.parse::<u16>().ok())
122        .unwrap_or(0);
123
124    if status_code >= 400 {
125        anyhow::bail!("daemon returned HTTP {status_code}: {body}");
126    }
127
128    Ok(body.to_string())
129}
130
131/// Attempt to connect to the daemon. Returns `None` if not running.
132pub async fn try_daemon_request(method: &str, path: &str, body: &str) -> Option<String> {
133    if !daemon::is_daemon_running() {
134        return None;
135    }
136    daemon_request(method, path, body).await.ok()
137}
138
139/// Blocking helper for CLI commands: calls a daemon tool if the daemon is running.
140/// Returns `None` if the daemon is not running or the call fails.
141/// Attempts to auto-start the daemon if it's not already running.
142#[allow(clippy::needless_pass_by_value)]
143pub fn try_daemon_tool_call_blocking(
144    name: &str,
145    arguments: Option<serde_json::Value>,
146) -> Option<String> {
147    use std::time::Duration;
148
149    let rt = tokio::runtime::Runtime::new().ok()?;
150
151    let addr = daemon::daemon_addr();
152    let mut ready = addr.is_listening() && rt.block_on(async { daemon_health_check().await });
153
154    if !ready {
155        if std::env::var("LEAN_CTX_HOOK_CHILD").is_ok() {
156            return None;
157        }
158
159        let lock = crate::core::startup_guard::try_acquire_lock(
160            "daemon-start",
161            Duration::from_millis(1200),
162            Duration::from_secs(5),
163        );
164
165        if let Some(g) = lock {
166            g.touch();
167            let mut did_start = false;
168
169            if !daemon::is_daemon_running() {
170                if daemon::start_daemon(&[]).is_ok() {
171                    did_start = true;
172                } else {
173                    return None;
174                }
175            }
176
177            for _ in 0..60 {
178                if addr.is_listening() && rt.block_on(async { daemon_health_check().await }) {
179                    ready = true;
180                    break;
181                }
182                std::thread::sleep(Duration::from_millis(50));
183            }
184
185            if ready && did_start {
186                eprintln!("\x1b[2m▸ daemon auto-started\x1b[0m");
187            }
188        } else {
189            for _ in 0..60 {
190                if addr.is_listening() && rt.block_on(async { daemon_health_check().await }) {
191                    ready = true;
192                    break;
193                }
194                std::thread::sleep(Duration::from_millis(50));
195            }
196        }
197    }
198
199    if !ready {
200        return None;
201    }
202
203    if let Some(out) = rt.block_on(async { daemon_tool_call(name, arguments.as_ref()).await.ok() })
204    {
205        return Some(out);
206    }
207
208    for _ in 0..5 {
209        std::thread::sleep(Duration::from_millis(50));
210        if let Some(out) =
211            rt.block_on(async { daemon_tool_call(name, arguments.as_ref()).await.ok() })
212        {
213            return Some(out);
214        }
215    }
216
217    None
218}
219
220fn unwrap_mcp_tool_text(body: &str) -> Option<String> {
221    let v: serde_json::Value = serde_json::from_str(body).ok()?;
222    let result = v.get("result")?;
223
224    if let Some(content) = result.get("content").and_then(|c| c.as_array()) {
225        let mut texts: Vec<String> = Vec::new();
226        for item in content {
227            if let Some(text) = item.get("text").and_then(|t| t.as_str()) {
228                if !text.is_empty() {
229                    texts.push(text.to_string());
230                }
231            }
232        }
233        if !texts.is_empty() {
234            return Some(texts.join("\n"));
235        }
236    }
237
238    if let Some(text) = result.get("text").and_then(|t| t.as_str()) {
239        return Some(text.to_string());
240    }
241
242    result.as_str().map(std::string::ToString::to_string)
243}
244
245/// Like `try_daemon_tool_call_blocking`, but unwraps MCP JSON responses to text for CLI output.
246pub fn try_daemon_tool_call_blocking_text(
247    name: &str,
248    arguments: Option<serde_json::Value>,
249) -> Option<String> {
250    let body = try_daemon_tool_call_blocking(name, arguments)?;
251    let trimmed = body.trim_start();
252    if !trimmed.starts_with('{') {
253        return Some(body);
254    }
255    Some(unwrap_mcp_tool_text(&body).unwrap_or(body))
256}