Skip to main content

ant_core/node/daemon/
client.rs

1use std::path::Path;
2use std::time::Duration;
3
4use crate::error::{Error, Result};
5use crate::node::process::detach;
6use crate::node::types::{
7    DaemonConfig, DaemonInfo, DaemonStartResult, DaemonStatus, DaemonStopResult, NodeStarted,
8    NodeStatusResult, NodeStopped, StartNodeResult, StopNodeResult,
9};
10
11/// Get the daemon's current status by querying its REST API.
12///
13/// If the daemon is not running, returns a `DaemonStatus` with `running: false`.
14pub async fn status(config: &DaemonConfig) -> Result<DaemonStatus> {
15    let port = match read_port_file(&config.port_file_path) {
16        Some(port) => port,
17        None => {
18            return Ok(DaemonStatus {
19                running: false,
20                pid: None,
21                port: None,
22                uptime_secs: None,
23                nodes_total: 0,
24                nodes_running: 0,
25                nodes_stopped: 0,
26                nodes_errored: 0,
27            });
28        }
29    };
30
31    let url = format!("http://127.0.0.1:{port}/api/v1/status");
32    match reqwest::get(&url).await {
33        Ok(resp) => resp
34            .json::<DaemonStatus>()
35            .await
36            .map_err(|e| Error::HttpRequest(e.to_string())),
37        Err(_) => Ok(DaemonStatus {
38            running: false,
39            pid: None,
40            port: Some(port),
41            uptime_secs: None,
42            nodes_total: 0,
43            nodes_running: 0,
44            nodes_stopped: 0,
45            nodes_errored: 0,
46        }),
47    }
48}
49
50/// Stop the running daemon.
51///
52/// Reads the PID from the PID file, validates the process is actually a daemon
53/// instance, sends SIGTERM (Unix) or Ctrl+C (Windows), and waits for the process
54/// to exit.
55pub async fn stop(config: &DaemonConfig) -> Result<DaemonStopResult> {
56    let pid = read_pid_file(&config.pid_file_path)?;
57
58    // Validate the process is actually our daemon before killing it.
59    // After a crash, the PID may have been reused by an unrelated process.
60    if !is_process_alive(pid) {
61        // Process is already dead — just clean up stale files
62        let _ = std::fs::remove_file(&config.pid_file_path);
63        let _ = std::fs::remove_file(&config.port_file_path);
64        return Ok(DaemonStopResult { pid });
65    }
66
67    if !validate_daemon_process(pid) {
68        // PID is alive but isn't our daemon — clean up stale files without killing
69        let _ = std::fs::remove_file(&config.pid_file_path);
70        let _ = std::fs::remove_file(&config.port_file_path);
71        return Err(Error::DaemonStopFailed(format!(
72            "PID {pid} is alive but does not appear to be the ant daemon (possible PID reuse). \
73             Stale PID file removed."
74        )));
75    }
76
77    send_terminate(pid);
78
79    // Wait for process to exit
80    for _ in 0..50 {
81        tokio::time::sleep(Duration::from_millis(100)).await;
82        if !is_process_alive(pid) {
83            break;
84        }
85    }
86
87    // Verify the process actually died
88    if is_process_alive(pid) {
89        return Err(Error::DaemonStopFailed(format!(
90            "Daemon (PID {pid}) is still alive after 5 seconds"
91        )));
92    }
93
94    // Clean up files if they still exist
95    let _ = std::fs::remove_file(&config.pid_file_path);
96    let _ = std::fs::remove_file(&config.port_file_path);
97
98    Ok(DaemonStopResult { pid })
99}
100
101/// Start the daemon as a detached background process.
102///
103/// If the daemon is already running, returns a result with `already_running: true`.
104/// Otherwise, spawns the daemon and polls for the port file to confirm startup.
105pub async fn start(config: &DaemonConfig) -> Result<DaemonStartResult> {
106    // Check if daemon is already running
107    if let Some(pid) = check_running(&config.pid_file_path) {
108        let port = read_port_file(&config.port_file_path);
109        return Ok(DaemonStartResult {
110            already_running: true,
111            pid,
112            port,
113        });
114    }
115
116    // Get the path to the current executable
117    let exe = std::env::current_exe()
118        .map_err(|e| Error::ProcessSpawn(format!("Failed to get current executable: {e}")))?;
119    let exe_str = exe
120        .to_str()
121        .ok_or_else(|| Error::ProcessSpawn("Executable path is not valid UTF-8".to_string()))?;
122
123    let args = daemon_run_args(config);
124    let arg_refs: Vec<&str> = args.iter().map(String::as_str).collect();
125    let pid = detach::spawn_detached(exe_str, &arg_refs)?;
126
127    // Wait briefly for the daemon to write its port file
128    let mut port = None;
129    for _ in 0..20 {
130        tokio::time::sleep(Duration::from_millis(100)).await;
131        if let Some(p) = read_port_file(&config.port_file_path) {
132            port = Some(p);
133            break;
134        }
135    }
136
137    Ok(DaemonStartResult {
138        already_running: false,
139        pid,
140        port,
141    })
142}
143
144/// Start a specific node by ID via the daemon REST API.
145pub async fn start_node(config: &DaemonConfig, node_id: u32) -> Result<NodeStarted> {
146    let port = read_port_file(&config.port_file_path).ok_or(Error::DaemonNotRunning)?;
147
148    let url = format!("http://127.0.0.1:{port}/api/v1/nodes/{node_id}/start");
149    let resp = reqwest::Client::new()
150        .post(&url)
151        .send()
152        .await
153        .map_err(|e| Error::HttpRequest(e.to_string()))?;
154
155    if resp.status().is_success() {
156        resp.json::<NodeStarted>()
157            .await
158            .map_err(|e| Error::HttpRequest(e.to_string()))
159    } else {
160        let body = resp.text().await.unwrap_or_default();
161        Err(Error::HttpRequest(body))
162    }
163}
164
165/// Start all registered nodes via the daemon REST API.
166pub async fn start_all_nodes(config: &DaemonConfig) -> Result<StartNodeResult> {
167    let port = read_port_file(&config.port_file_path).ok_or(Error::DaemonNotRunning)?;
168
169    let url = format!("http://127.0.0.1:{port}/api/v1/nodes/start-all");
170    let resp = reqwest::Client::new()
171        .post(&url)
172        .send()
173        .await
174        .map_err(|e| Error::HttpRequest(e.to_string()))?;
175
176    if resp.status().is_success() {
177        resp.json::<StartNodeResult>()
178            .await
179            .map_err(|e| Error::HttpRequest(e.to_string()))
180    } else {
181        let body = resp.text().await.unwrap_or_default();
182        Err(Error::HttpRequest(body))
183    }
184}
185
186/// Stop a specific node by ID via the daemon REST API.
187pub async fn stop_node(config: &DaemonConfig, node_id: u32) -> Result<NodeStopped> {
188    let port = read_port_file(&config.port_file_path).ok_or(Error::DaemonNotRunning)?;
189
190    let url = format!("http://127.0.0.1:{port}/api/v1/nodes/{node_id}/stop");
191    let resp = reqwest::Client::new()
192        .post(&url)
193        .send()
194        .await
195        .map_err(|e| Error::HttpRequest(e.to_string()))?;
196
197    if resp.status().is_success() {
198        resp.json::<NodeStopped>()
199            .await
200            .map_err(|e| Error::HttpRequest(e.to_string()))
201    } else {
202        let body = resp.text().await.unwrap_or_default();
203        Err(Error::HttpRequest(body))
204    }
205}
206
207/// Get the status of all registered nodes via the daemon REST API.
208pub async fn node_status(config: &DaemonConfig) -> Result<NodeStatusResult> {
209    let port = read_port_file(&config.port_file_path).ok_or(Error::DaemonNotRunning)?;
210
211    let url = format!("http://127.0.0.1:{port}/api/v1/nodes/status");
212    let resp = reqwest::get(&url)
213        .await
214        .map_err(|e| Error::HttpRequest(e.to_string()))?;
215
216    if resp.status().is_success() {
217        resp.json::<NodeStatusResult>()
218            .await
219            .map_err(|e| Error::HttpRequest(e.to_string()))
220    } else {
221        let body = resp.text().await.unwrap_or_default();
222        Err(Error::HttpRequest(body))
223    }
224}
225
226/// Stop all running nodes via the daemon REST API.
227pub async fn stop_all_nodes(config: &DaemonConfig) -> Result<StopNodeResult> {
228    let port = read_port_file(&config.port_file_path).ok_or(Error::DaemonNotRunning)?;
229
230    let url = format!("http://127.0.0.1:{port}/api/v1/nodes/stop-all");
231    let resp = reqwest::Client::new()
232        .post(&url)
233        .send()
234        .await
235        .map_err(|e| Error::HttpRequest(e.to_string()))?;
236
237    if resp.status().is_success() {
238        resp.json::<StopNodeResult>()
239            .await
240            .map_err(|e| Error::HttpRequest(e.to_string()))
241    } else {
242        let body = resp.text().await.unwrap_or_default();
243        Err(Error::HttpRequest(body))
244    }
245}
246
247/// Get daemon connection info for programmatic use.
248///
249/// Reads PID and port files and checks if the process is alive.
250pub fn info(config: &DaemonConfig) -> DaemonInfo {
251    let pid = std::fs::read_to_string(&config.pid_file_path)
252        .ok()
253        .and_then(|s| s.trim().parse::<u32>().ok());
254
255    let port = read_port_file(&config.port_file_path);
256
257    let running = pid.is_some_and(is_process_alive);
258
259    DaemonInfo {
260        running,
261        pid,
262        port,
263        api_base: port.map(|p| format!("http://127.0.0.1:{p}/api/v1")),
264    }
265}
266
267/// Run the daemon in the foreground (the actual daemon process entry point).
268///
269/// Starts the HTTP server, sets up signal handling, and blocks until shutdown.
270pub async fn run(config: DaemonConfig) -> Result<()> {
271    use crate::node::daemon::server;
272    use crate::node::registry::NodeRegistry;
273
274    let registry = NodeRegistry::load(&config.registry_path)?;
275    let shutdown = tokio_util::sync::CancellationToken::new();
276
277    let shutdown_clone = shutdown.clone();
278    tokio::spawn(async move {
279        tokio::signal::ctrl_c().await.ok();
280        shutdown_clone.cancel();
281    });
282
283    let _addr = server::start(config, registry, shutdown.clone()).await?;
284
285    shutdown.cancelled().await;
286    // Give the server a moment to clean up
287    tokio::time::sleep(Duration::from_millis(100)).await;
288
289    Ok(())
290}
291
/// Validate that a PID belongs to an ant daemon process by checking its
/// command line. This guards against PID reuse after a daemon crash.
///
/// The command line is read from `/proc/<pid>/cmdline`. If that file cannot be
/// read, the PID file is trusted and `true` is returned. This is always the
/// case on Unix systems without procfs, such as macOS.
#[cfg(unix)]
fn validate_daemon_process(pid: u32) -> bool {
    match std::fs::read(format!("/proc/{pid}/cmdline")) {
        Ok(raw) => cmdline_looks_like_daemon(&raw),
        // On non-Linux Unix (macOS), /proc doesn't exist. Fall back to
        // trusting the PID file, since there's no cheap way to inspect the
        // command line without shelling out.
        Err(_) => true,
    }
}

/// Decide whether a raw `/proc/<pid>/cmdline` buffer describes an ant daemon.
///
/// The buffer holds NUL-separated arguments. It matches when the basename of
/// the first argument is exactly `ant` or `ant.exe` and some later argument is
/// exactly `daemon`. The exact comparisons avoid false positives from
/// processes such as `rant` or `phantom-daemon`.
#[cfg(unix)]
fn cmdline_looks_like_daemon(raw: &[u8]) -> bool {
    let mut args = raw
        .split(|&b| b == 0)
        .filter(|s| !s.is_empty())
        .map(String::from_utf8_lossy);

    let exe_matches = args.next().is_some_and(|exe| {
        std::path::Path::new(&*exe)
            .file_name()
            .and_then(|name| name.to_str())
            .is_some_and(|name| name == "ant" || name == "ant.exe")
    });

    // The executable itself can never be "daemon" once its basename is "ant",
    // so scanning only the remaining arguments is sufficient.
    exe_matches && args.any(|arg| arg == "daemon")
}
324
/// Validate that a PID belongs to an ant daemon process (Windows).
///
/// Looks up the full image path of the process and accepts it when the file
/// stem is exactly `ant`. Unlike the Unix variant, the command-line arguments
/// are not inspected. Any failure to open or query the process yields `false`.
#[cfg(windows)]
fn validate_daemon_process(pid: u32) -> bool {
    use windows_sys::Win32::Foundation::CloseHandle;
    use windows_sys::Win32::System::Threading::{
        OpenProcess, QueryFullProcessImageNameW, PROCESS_QUERY_LIMITED_INFORMATION,
    };

    let mut image_path = [0u16; 1024];
    let mut image_len = image_path.len() as u32;

    // SAFETY: the handle from `OpenProcess` is checked for null before use and
    // closed exactly once. `QueryFullProcessImageNameW` is told the buffer
    // holds `image_len` UTF-16 units, which is exactly `image_path.len()`.
    let queried = unsafe {
        let handle = OpenProcess(PROCESS_QUERY_LIMITED_INFORMATION, 0, pid);
        if handle.is_null() {
            return false;
        }
        let rc = QueryFullProcessImageNameW(handle, 0, image_path.as_mut_ptr(), &mut image_len);
        CloseHandle(handle);
        rc != 0
    };
    if !queried {
        return false;
    }

    let image = String::from_utf16_lossy(&image_path[..image_len as usize]);
    // Compare the executable's basename exactly, not as a substring.
    let stem = std::path::Path::new(&image)
        .file_stem()
        .and_then(|s| s.to_str());
    stem == Some("ant")
}
353
354/// Build the arg list passed to the detached `ant node daemon run` child.
355///
356/// Overrides are forwarded explicitly so the child binds to the same address
357/// and port the caller asked for. Unset fields fall through to the child's
358/// own defaults (loopback + OS-assigned port).
359fn daemon_run_args(config: &DaemonConfig) -> Vec<String> {
360    let defaults = DaemonConfig::default();
361    let mut args = vec!["node".to_string(), "daemon".to_string(), "run".to_string()];
362    if let Some(port) = config.port {
363        args.push("--port".to_string());
364        args.push(port.to_string());
365    }
366    if config.listen_addr != defaults.listen_addr {
367        args.push("--listen-addr".to_string());
368        args.push(config.listen_addr.to_string());
369    }
370    args
371}
372
/// Read the daemon's port file and return the port it records.
///
/// Returns `None` when the file cannot be read or when its contents, after
/// trimming surrounding whitespace, are not a valid `u16`.
fn read_port_file(path: &Path) -> Option<u16> {
    let text = std::fs::read_to_string(path).ok()?;
    text.trim().parse().ok()
}
378
379fn read_pid_file(path: &Path) -> Result<u32> {
380    let contents = std::fs::read_to_string(path).map_err(|_| Error::DaemonNotRunning)?;
381    contents
382        .trim()
383        .parse::<u32>()
384        .map_err(|_| Error::DaemonNotRunning)
385}
386
387/// Check if a daemon is running. Returns the PID if so.
388fn check_running(pid_file: &Path) -> Option<u32> {
389    let pid = read_pid_file(pid_file).ok()?;
390    if is_process_alive(pid) {
391        Some(pid)
392    } else {
393        None
394    }
395}
396
/// Convert a PID to the `pid_t`-sized value `libc` expects.
///
/// Only strictly positive values that fit in `i32` are accepted. `0` and
/// anything above `i32::MAX` (which would wrap negative) yield `None`.
#[cfg(unix)]
fn pid_to_i32(pid: u32) -> Option<i32> {
    match i32::try_from(pid) {
        Ok(value) if value > 0 => Some(value),
        _ => None,
    }
}
401
402#[cfg(unix)]
403fn send_terminate(pid: u32) {
404    if let Some(pid) = pid_to_i32(pid) {
405        unsafe {
406            libc::kill(pid, libc::SIGTERM);
407        }
408    }
409}
410
/// Terminate process `pid` (Windows).
///
/// This is a hard kill via `TerminateProcess` with exit code 1; the target
/// gets no chance to run its own shutdown logic. If the process cannot be
/// opened, nothing happens, and the result of `TerminateProcess` is not checked.
#[cfg(windows)]
fn send_terminate(pid: u32) {
    use windows_sys::Win32::Foundation::CloseHandle;
    use windows_sys::Win32::System::Threading::{OpenProcess, TerminateProcess, PROCESS_TERMINATE};

    // SAFETY: the handle returned by `OpenProcess` is checked for null before
    // it is used, and it is closed exactly once.
    unsafe {
        let handle = OpenProcess(PROCESS_TERMINATE, 0, pid);
        if handle.is_null() {
            return;
        }
        TerminateProcess(handle, 1);
        CloseHandle(handle);
    }
}
424
425#[cfg(unix)]
426fn is_process_alive(pid: u32) -> bool {
427    let Some(pid) = pid_to_i32(pid) else {
428        return false;
429    };
430    let ret = unsafe { libc::kill(pid, 0) };
431    if ret == 0 {
432        return true;
433    }
434    // EPERM means the process exists but we lack permission to signal it
435    std::io::Error::last_os_error().raw_os_error() == Some(libc::EPERM)
436}
437
/// Report whether a process with this PID is still running (Windows).
///
/// The process counts as alive when it can be opened and its exit code is
/// `STILL_ACTIVE`. Failure to open or query it is reported as not alive.
#[cfg(windows)]
fn is_process_alive(pid: u32) -> bool {
    use windows_sys::Win32::Foundation::{CloseHandle, STILL_ACTIVE};
    use windows_sys::Win32::System::Threading::{
        GetExitCodeProcess, OpenProcess, PROCESS_QUERY_LIMITED_INFORMATION,
    };

    let mut exit_code: u32 = 0;

    // SAFETY: the handle is checked for null before use and closed exactly
    // once. `exit_code` is a valid, writable `u32` for the duration of the call.
    let queried = unsafe {
        let handle = OpenProcess(PROCESS_QUERY_LIMITED_INFORMATION, 0, pid);
        if handle.is_null() {
            return false;
        }
        let rc = GetExitCodeProcess(handle, &mut exit_code);
        CloseHandle(handle);
        rc != 0
    };

    queried && exit_code == STILL_ACTIVE as u32
}
456
#[cfg(test)]
mod tests {
    use super::*;
    use std::net::Ipv4Addr;

    /// Path in the system temp dir that is unique to this test process and
    /// `name`, so parallel tests do not clobber each other's files.
    fn scratch_path(name: &str) -> std::path::PathBuf {
        std::env::temp_dir().join(format!("ant-daemon-client-{}-{name}", std::process::id()))
    }

    #[test]
    fn run_args_default_config_has_no_overrides() {
        let config = DaemonConfig::default();
        let args = daemon_run_args(&config);
        assert_eq!(args, vec!["node", "daemon", "run"]);
    }

    #[test]
    fn run_args_forward_explicit_port() {
        let config = DaemonConfig {
            port: Some(8765),
            ..DaemonConfig::default()
        };
        let args = daemon_run_args(&config);
        assert_eq!(args, vec!["node", "daemon", "run", "--port", "8765"]);
    }

    #[test]
    fn run_args_forward_explicit_listen_addr() {
        let config = DaemonConfig {
            listen_addr: std::net::IpAddr::V4(Ipv4Addr::UNSPECIFIED),
            ..DaemonConfig::default()
        };
        let args = daemon_run_args(&config);
        assert_eq!(
            args,
            vec!["node", "daemon", "run", "--listen-addr", "0.0.0.0"]
        );
    }

    #[test]
    fn run_args_forward_both_overrides() {
        let config = DaemonConfig {
            port: Some(8765),
            listen_addr: std::net::IpAddr::V4(Ipv4Addr::UNSPECIFIED),
            ..DaemonConfig::default()
        };
        let args = daemon_run_args(&config);
        assert_eq!(
            args,
            vec![
                "node",
                "daemon",
                "run",
                "--port",
                "8765",
                "--listen-addr",
                "0.0.0.0",
            ]
        );
    }

    #[test]
    fn run_args_forward_explicit_zero_port() {
        // Explicit `--port 0` is preserved so the user's intent (OS-assigned)
        // round-trips through the spawn, even though the child's default would
        // produce the same bind behavior.
        let config = DaemonConfig {
            port: Some(0),
            ..DaemonConfig::default()
        };
        let args = daemon_run_args(&config);
        assert_eq!(args, vec!["node", "daemon", "run", "--port", "0"]);
    }

    #[test]
    fn read_port_file_trims_whitespace() {
        let path = scratch_path("port-trim");
        std::fs::write(&path, "  8765\n").unwrap();
        assert_eq!(read_port_file(&path), Some(8765));
        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn read_port_file_rejects_invalid_contents() {
        let path = scratch_path("port-invalid");
        for bad in ["", "not-a-port", "70000", "-1"] {
            std::fs::write(&path, bad).unwrap();
            assert_eq!(read_port_file(&path), None, "contents: {bad:?}");
        }
        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn read_port_file_missing_file_is_none() {
        let path = scratch_path("port-missing");
        let _ = std::fs::remove_file(&path);
        assert_eq!(read_port_file(&path), None);
    }

    #[test]
    fn read_pid_file_parses_trimmed_pid() {
        let path = scratch_path("pid-ok");
        std::fs::write(&path, "4242\n").unwrap();
        assert_eq!(read_pid_file(&path).ok(), Some(4242));
        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn read_pid_file_missing_or_garbage_is_err() {
        let path = scratch_path("pid-bad");
        let _ = std::fs::remove_file(&path);
        assert!(read_pid_file(&path).is_err());
        std::fs::write(&path, "abc").unwrap();
        assert!(read_pid_file(&path).is_err());
        let _ = std::fs::remove_file(&path);
    }

    #[cfg(unix)]
    #[test]
    fn pid_to_i32_accepts_only_positive_i32_range() {
        assert_eq!(pid_to_i32(0), None);
        assert_eq!(pid_to_i32(1), Some(1));
        assert_eq!(pid_to_i32(i32::MAX as u32), Some(i32::MAX));
        assert_eq!(pid_to_i32(i32::MAX as u32 + 1), None);
        assert_eq!(pid_to_i32(u32::MAX), None);
    }
}