Skip to main content

ant_core/node/daemon/
client.rs

1use std::path::Path;
2use std::time::Duration;
3
4use crate::error::{Error, Result};
5use crate::node::daemon::health::FleetHealth;
6use crate::node::process::detach;
7use crate::node::types::{
8    DaemonConfig, DaemonInfo, DaemonStartResult, DaemonStatus, DaemonStopResult, NodeStarted,
9    NodeStatusResult, NodeStopped, RemoveNodeResult, StartNodeResult, StopNodeResult,
10};
11
12/// Get the daemon's current status by querying its REST API.
13///
14/// If the daemon is not running, returns a `DaemonStatus` with `running: false`.
15pub async fn status(config: &DaemonConfig) -> Result<DaemonStatus> {
16    let port = match read_port_file(&config.port_file_path) {
17        Some(port) => port,
18        None => {
19            return Ok(DaemonStatus {
20                running: false,
21                pid: None,
22                port: None,
23                uptime_secs: None,
24                nodes_total: 0,
25                nodes_running: 0,
26                nodes_stopped: 0,
27                nodes_errored: 0,
28            });
29        }
30    };
31
32    let url = format!("http://127.0.0.1:{port}/api/v1/status");
33    match reqwest::get(&url).await {
34        Ok(resp) => resp
35            .json::<DaemonStatus>()
36            .await
37            .map_err(|e| Error::HttpRequest(e.to_string())),
38        Err(_) => Ok(DaemonStatus {
39            running: false,
40            pid: None,
41            port: Some(port),
42            uptime_secs: None,
43            nodes_total: 0,
44            nodes_running: 0,
45            nodes_stopped: 0,
46            nodes_errored: 0,
47        }),
48    }
49}
50
51/// Stop the running daemon.
52///
53/// Reads the PID from the PID file, validates the process is actually a daemon
54/// instance, sends SIGTERM (Unix) or Ctrl+C (Windows), and waits for the process
55/// to exit.
56pub async fn stop(config: &DaemonConfig) -> Result<DaemonStopResult> {
57    let pid = read_pid_file(&config.pid_file_path)?;
58
59    // Validate the process is actually our daemon before killing it.
60    // After a crash, the PID may have been reused by an unrelated process.
61    if !is_process_alive(pid) {
62        // Process is already dead — just clean up stale files
63        let _ = std::fs::remove_file(&config.pid_file_path);
64        let _ = std::fs::remove_file(&config.port_file_path);
65        return Ok(DaemonStopResult { pid });
66    }
67
68    if !validate_daemon_process(pid) {
69        // PID is alive but isn't our daemon — clean up stale files without killing
70        let _ = std::fs::remove_file(&config.pid_file_path);
71        let _ = std::fs::remove_file(&config.port_file_path);
72        return Err(Error::DaemonStopFailed(format!(
73            "PID {pid} is alive but does not appear to be the ant daemon (possible PID reuse). \
74             Stale PID file removed."
75        )));
76    }
77
78    send_terminate(pid);
79
80    // Wait for process to exit
81    for _ in 0..50 {
82        tokio::time::sleep(Duration::from_millis(100)).await;
83        if !is_process_alive(pid) {
84            break;
85        }
86    }
87
88    // Verify the process actually died
89    if is_process_alive(pid) {
90        return Err(Error::DaemonStopFailed(format!(
91            "Daemon (PID {pid}) is still alive after 5 seconds"
92        )));
93    }
94
95    // Clean up files if they still exist
96    let _ = std::fs::remove_file(&config.pid_file_path);
97    let _ = std::fs::remove_file(&config.port_file_path);
98
99    Ok(DaemonStopResult { pid })
100}
101
102/// Start the daemon as a detached background process.
103///
104/// If the daemon is already running, returns a result with `already_running: true`.
105/// Otherwise, spawns the daemon and polls for the port file to confirm startup.
106pub async fn start(config: &DaemonConfig) -> Result<DaemonStartResult> {
107    // Check if daemon is already running
108    if let Some(pid) = check_running(&config.pid_file_path) {
109        let port = read_port_file(&config.port_file_path);
110        return Ok(DaemonStartResult {
111            already_running: true,
112            pid,
113            port,
114        });
115    }
116
117    // Get the path to the current executable
118    let exe = std::env::current_exe()
119        .map_err(|e| Error::ProcessSpawn(format!("Failed to get current executable: {e}")))?;
120    let exe_str = exe
121        .to_str()
122        .ok_or_else(|| Error::ProcessSpawn("Executable path is not valid UTF-8".to_string()))?;
123
124    let args = daemon_run_args(config);
125    let arg_refs: Vec<&str> = args.iter().map(String::as_str).collect();
126    let pid = detach::spawn_detached(exe_str, &arg_refs)?;
127
128    // Wait briefly for the daemon to write its port file
129    let mut port = None;
130    for _ in 0..20 {
131        tokio::time::sleep(Duration::from_millis(100)).await;
132        if let Some(p) = read_port_file(&config.port_file_path) {
133            port = Some(p);
134            break;
135        }
136    }
137
138    Ok(DaemonStartResult {
139        already_running: false,
140        pid,
141        port,
142    })
143}
144
145/// Start a specific node by ID via the daemon REST API.
146pub async fn start_node(config: &DaemonConfig, node_id: u32) -> Result<NodeStarted> {
147    let port = read_port_file(&config.port_file_path).ok_or(Error::DaemonNotRunning)?;
148
149    let url = format!("http://127.0.0.1:{port}/api/v1/nodes/{node_id}/start");
150    let resp = reqwest::Client::new()
151        .post(&url)
152        .send()
153        .await
154        .map_err(|e| Error::HttpRequest(e.to_string()))?;
155
156    if resp.status().is_success() {
157        resp.json::<NodeStarted>()
158            .await
159            .map_err(|e| Error::HttpRequest(e.to_string()))
160    } else {
161        let body = resp.text().await.unwrap_or_default();
162        Err(Error::HttpRequest(body))
163    }
164}
165
166/// Start all registered nodes via the daemon REST API.
167pub async fn start_all_nodes(config: &DaemonConfig) -> Result<StartNodeResult> {
168    let port = read_port_file(&config.port_file_path).ok_or(Error::DaemonNotRunning)?;
169
170    let url = format!("http://127.0.0.1:{port}/api/v1/nodes/start-all");
171    let resp = reqwest::Client::new()
172        .post(&url)
173        .send()
174        .await
175        .map_err(|e| Error::HttpRequest(e.to_string()))?;
176
177    if resp.status().is_success() {
178        resp.json::<StartNodeResult>()
179            .await
180            .map_err(|e| Error::HttpRequest(e.to_string()))
181    } else {
182        let body = resp.text().await.unwrap_or_default();
183        Err(Error::HttpRequest(body))
184    }
185}
186
187/// Stop a specific node by ID via the daemon REST API.
188pub async fn stop_node(config: &DaemonConfig, node_id: u32) -> Result<NodeStopped> {
189    let port = read_port_file(&config.port_file_path).ok_or(Error::DaemonNotRunning)?;
190
191    let url = format!("http://127.0.0.1:{port}/api/v1/nodes/{node_id}/stop");
192    let resp = reqwest::Client::new()
193        .post(&url)
194        .send()
195        .await
196        .map_err(|e| Error::HttpRequest(e.to_string()))?;
197
198    if resp.status().is_success() {
199        resp.json::<NodeStopped>()
200            .await
201            .map_err(|e| Error::HttpRequest(e.to_string()))
202    } else {
203        let body = resp.text().await.unwrap_or_default();
204        Err(Error::HttpRequest(body))
205    }
206}
207
208/// Dismiss a node — remove it from the registry — via the daemon REST API.
209///
210/// Intended for evicted nodes (whose data directory has already been deleted), but the daemon will
211/// remove any non-running node. Running nodes are rejected with a conflict error.
212pub async fn dismiss_node(config: &DaemonConfig, node_id: u32) -> Result<RemoveNodeResult> {
213    let port = read_port_file(&config.port_file_path).ok_or(Error::DaemonNotRunning)?;
214
215    let url = format!("http://127.0.0.1:{port}/api/v1/nodes/{node_id}");
216    let resp = reqwest::Client::new()
217        .delete(&url)
218        .send()
219        .await
220        .map_err(|e| Error::HttpRequest(e.to_string()))?;
221
222    if resp.status().is_success() {
223        resp.json::<RemoveNodeResult>()
224            .await
225            .map_err(|e| Error::HttpRequest(e.to_string()))
226    } else {
227        let body = resp.text().await.unwrap_or_default();
228        Err(Error::HttpRequest(body))
229    }
230}
231
232/// Get the current fleet health snapshot via the daemon REST API.
233pub async fn fleet_health(config: &DaemonConfig) -> Result<FleetHealth> {
234    let port = read_port_file(&config.port_file_path).ok_or(Error::DaemonNotRunning)?;
235
236    let url = format!("http://127.0.0.1:{port}/api/v1/health");
237    let resp = reqwest::get(&url)
238        .await
239        .map_err(|e| Error::HttpRequest(e.to_string()))?;
240
241    if resp.status().is_success() {
242        resp.json::<FleetHealth>()
243            .await
244            .map_err(|e| Error::HttpRequest(e.to_string()))
245    } else {
246        let body = resp.text().await.unwrap_or_default();
247        Err(Error::HttpRequest(body))
248    }
249}
250
251/// Get the status of all registered nodes via the daemon REST API.
252pub async fn node_status(config: &DaemonConfig) -> Result<NodeStatusResult> {
253    let port = read_port_file(&config.port_file_path).ok_or(Error::DaemonNotRunning)?;
254
255    let url = format!("http://127.0.0.1:{port}/api/v1/nodes/status");
256    let resp = reqwest::get(&url)
257        .await
258        .map_err(|e| Error::HttpRequest(e.to_string()))?;
259
260    if resp.status().is_success() {
261        resp.json::<NodeStatusResult>()
262            .await
263            .map_err(|e| Error::HttpRequest(e.to_string()))
264    } else {
265        let body = resp.text().await.unwrap_or_default();
266        Err(Error::HttpRequest(body))
267    }
268}
269
270/// Stop all running nodes via the daemon REST API.
271pub async fn stop_all_nodes(config: &DaemonConfig) -> Result<StopNodeResult> {
272    let port = read_port_file(&config.port_file_path).ok_or(Error::DaemonNotRunning)?;
273
274    let url = format!("http://127.0.0.1:{port}/api/v1/nodes/stop-all");
275    let resp = reqwest::Client::new()
276        .post(&url)
277        .send()
278        .await
279        .map_err(|e| Error::HttpRequest(e.to_string()))?;
280
281    if resp.status().is_success() {
282        resp.json::<StopNodeResult>()
283            .await
284            .map_err(|e| Error::HttpRequest(e.to_string()))
285    } else {
286        let body = resp.text().await.unwrap_or_default();
287        Err(Error::HttpRequest(body))
288    }
289}
290
291/// Get daemon connection info for programmatic use.
292///
293/// Reads PID and port files and checks if the process is alive.
294pub fn info(config: &DaemonConfig) -> DaemonInfo {
295    let pid = std::fs::read_to_string(&config.pid_file_path)
296        .ok()
297        .and_then(|s| s.trim().parse::<u32>().ok());
298
299    let port = read_port_file(&config.port_file_path);
300
301    let running = pid.is_some_and(is_process_alive);
302
303    DaemonInfo {
304        running,
305        pid,
306        port,
307        api_base: port.map(|p| format!("http://127.0.0.1:{p}/api/v1")),
308    }
309}
310
311/// Run the daemon in the foreground (the actual daemon process entry point).
312///
313/// Starts the HTTP server, sets up signal handling, and blocks until shutdown.
314pub async fn run(config: DaemonConfig) -> Result<()> {
315    use crate::node::daemon::server;
316    use crate::node::registry::NodeRegistry;
317
318    let registry = NodeRegistry::load(&config.registry_path)?;
319    let shutdown = tokio_util::sync::CancellationToken::new();
320
321    let shutdown_clone = shutdown.clone();
322    tokio::spawn(async move {
323        tokio::signal::ctrl_c().await.ok();
324        shutdown_clone.cancel();
325    });
326
327    let _addr = server::start(config, registry, shutdown.clone()).await?;
328
329    shutdown.cancelled().await;
330    // Give the server a moment to clean up
331    tokio::time::sleep(Duration::from_millis(100)).await;
332
333    Ok(())
334}
335
/// Decide whether `pid` looks like an ant daemon by inspecting its command
/// line, as a guard against PID reuse after a daemon crash.
///
/// The check passes when both hold for `/proc/{pid}/cmdline`:
///
/// * the basename of the first entry is exactly `ant` or `ant.exe`, and
/// * some entry is exactly `daemon`.
///
/// Exact matching keeps look-alikes such as `rant` or `phantom-daemon` from
/// passing. When the file cannot be read at all the function returns `true`,
/// i.e. the PID file is trusted.
#[cfg(unix)]
fn validate_daemon_process(pid: u32) -> bool {
    let Ok(raw) = std::fs::read(format!("/proc/{pid}/cmdline")) else {
        // On non-Linux Unix (macOS) there is no /proc. Fall back to trusting
        // the PID file, since inspecting the command line there would mean
        // shelling out.
        return true;
    };

    // Entries in /proc/PID/cmdline are separated by NUL bytes.
    let argv: Vec<String> = raw
        .split(|&byte| byte == 0)
        .filter(|entry| !entry.is_empty())
        .map(|entry| String::from_utf8_lossy(entry).into_owned())
        .collect();

    let exe_basename = argv
        .first()
        .and_then(|exe| std::path::Path::new(exe).file_name())
        .and_then(|name| name.to_str());
    let exe_matches = matches!(exe_basename, Some("ant" | "ant.exe"));

    exe_matches && argv.iter().any(|arg| arg == "daemon")
}
368
/// Decide whether `pid` looks like an ant daemon by inspecting the path of
/// its executable image.
///
/// Returns `true` only when the process can be opened, its image path can be
/// queried, and the file stem of that path is exactly `ant`. Any failure along
/// the way yields `false`.
#[cfg(windows)]
fn validate_daemon_process(pid: u32) -> bool {
    use windows_sys::Win32::Foundation::CloseHandle;
    use windows_sys::Win32::System::Threading::{
        OpenProcess, QueryFullProcessImageNameW, PROCESS_QUERY_LIMITED_INFORMATION,
    };

    let mut image_path = [0u16; 1024];
    // In: capacity of `image_path`. Used below as the length of the returned path.
    let mut path_len = image_path.len() as u32;

    // SAFETY: the handle is null-checked before use and closed exactly once;
    // the buffer pointer and `path_len` describe the same live local array.
    let queried = unsafe {
        let process = OpenProcess(PROCESS_QUERY_LIMITED_INFORMATION, 0, pid);
        if process.is_null() {
            return false;
        }
        let status =
            QueryFullProcessImageNameW(process, 0, image_path.as_mut_ptr(), &mut path_len);
        CloseHandle(process);
        status != 0
    };
    if !queried {
        return false;
    }

    let path = String::from_utf16_lossy(&image_path[..path_len as usize]);
    // Compare the executable's stem, not a substring of the whole path.
    let stem = std::path::Path::new(&path)
        .file_stem()
        .and_then(|s| s.to_str());
    stem == Some("ant")
}
397
398/// Build the arg list passed to the detached `ant node daemon run` child.
399///
400/// Overrides are forwarded explicitly so the child binds to the same address
401/// and port the caller asked for. Unset fields fall through to the child's
402/// own defaults (loopback + OS-assigned port).
403fn daemon_run_args(config: &DaemonConfig) -> Vec<String> {
404    let defaults = DaemonConfig::default();
405    let mut args = vec!["node".to_string(), "daemon".to_string(), "run".to_string()];
406    if let Some(port) = config.port {
407        args.push("--port".to_string());
408        args.push(port.to_string());
409    }
410    if config.listen_addr != defaults.listen_addr {
411        args.push("--listen-addr".to_string());
412        args.push(config.listen_addr.to_string());
413    }
414    args
415}
416
/// Read a TCP port from the file at `path`.
///
/// Surrounding whitespace is ignored. Returns `None` when the file cannot be
/// read or its trimmed contents do not parse as a `u16`.
fn read_port_file(path: &Path) -> Option<u16> {
    let contents = std::fs::read_to_string(path).ok()?;
    contents.trim().parse::<u16>().ok()
}
422
423fn read_pid_file(path: &Path) -> Result<u32> {
424    let contents = std::fs::read_to_string(path).map_err(|_| Error::DaemonNotRunning)?;
425    contents
426        .trim()
427        .parse::<u32>()
428        .map_err(|_| Error::DaemonNotRunning)
429}
430
431/// Check if a daemon is running. Returns the PID if so.
432fn check_running(pid_file: &Path) -> Option<u32> {
433    let pid = read_pid_file(pid_file).ok()?;
434    if is_process_alive(pid) {
435        Some(pid)
436    } else {
437        None
438    }
439}
440
/// Convert a `u32` PID into the signed form passed to `libc::kill`.
///
/// Returns `None` for `0` and for values that do not fit in an `i32`, so
/// callers only ever pass a strictly positive PID to `kill`.
#[cfg(unix)]
fn pid_to_i32(pid: u32) -> Option<i32> {
    match i32::try_from(pid) {
        Ok(signed) if signed > 0 => Some(signed),
        _ => None,
    }
}
445
446#[cfg(unix)]
447fn send_terminate(pid: u32) {
448    if let Some(pid) = pid_to_i32(pid) {
449        unsafe {
450            libc::kill(pid, libc::SIGTERM);
451        }
452    }
453}
454
/// Terminate `pid` (Windows).
///
/// Opens the process with `PROCESS_TERMINATE` rights and calls
/// `TerminateProcess` with exit code 1. Does nothing when the process cannot
/// be opened; the result of `TerminateProcess` is not inspected.
#[cfg(windows)]
fn send_terminate(pid: u32) {
    use windows_sys::Win32::Foundation::CloseHandle;
    use windows_sys::Win32::System::Threading::{OpenProcess, TerminateProcess, PROCESS_TERMINATE};

    // SAFETY: the handle is only used when non-null and is closed exactly once.
    unsafe {
        let process = OpenProcess(PROCESS_TERMINATE, 0, pid);
        if process.is_null() {
            return;
        }
        TerminateProcess(process, 1);
        CloseHandle(process);
    }
}
468
469#[cfg(unix)]
470fn is_process_alive(pid: u32) -> bool {
471    let Some(pid) = pid_to_i32(pid) else {
472        return false;
473    };
474    let ret = unsafe { libc::kill(pid, 0) };
475    if ret == 0 {
476        return true;
477    }
478    // EPERM means the process exists but we lack permission to signal it
479    std::io::Error::last_os_error().raw_os_error() == Some(libc::EPERM)
480}
481
/// Report whether a process with the given PID is currently running (Windows).
///
/// Opens the process with `PROCESS_QUERY_LIMITED_INFORMATION` and reads its
/// exit code; the process counts as alive only when the query succeeds and
/// the exit code equals `STILL_ACTIVE`. A process that cannot be opened is
/// reported as not alive.
#[cfg(windows)]
fn is_process_alive(pid: u32) -> bool {
    use windows_sys::Win32::Foundation::{CloseHandle, STILL_ACTIVE};
    use windows_sys::Win32::System::Threading::{
        GetExitCodeProcess, OpenProcess, PROCESS_QUERY_LIMITED_INFORMATION,
    };

    let mut exit_code: u32 = 0;

    // SAFETY: the handle is null-checked before use and closed exactly once;
    // `exit_code` is a live local for the duration of the call.
    let queried = unsafe {
        let process = OpenProcess(PROCESS_QUERY_LIMITED_INFORMATION, 0, pid);
        if process.is_null() {
            return false;
        }
        let status = GetExitCodeProcess(process, &mut exit_code);
        CloseHandle(process);
        status != 0
    };

    queried && exit_code == STILL_ACTIVE as u32
}
500
#[cfg(test)]
mod tests {
    use super::*;
    use std::net::{IpAddr, Ipv4Addr};

    /// Wildcard IPv4 address, used here as a listen address that differs from
    /// the default configuration's.
    const ANY_V4: IpAddr = IpAddr::V4(Ipv4Addr::UNSPECIFIED);

    /// Shorthand: the spawn arguments produced for `config`.
    fn args_for(config: DaemonConfig) -> Vec<String> {
        daemon_run_args(&config)
    }

    /// With nothing overridden, only the bare subcommand is passed.
    #[test]
    fn run_args_default_config_has_no_overrides() {
        let args = args_for(DaemonConfig::default());
        assert_eq!(args, ["node", "daemon", "run"]);
    }

    /// An explicit port is forwarded as `--port <n>`.
    #[test]
    fn run_args_forward_explicit_port() {
        let args = args_for(DaemonConfig {
            port: Some(8765),
            ..DaemonConfig::default()
        });
        assert_eq!(args, ["node", "daemon", "run", "--port", "8765"]);
    }

    /// A non-default listen address is forwarded as `--listen-addr <addr>`.
    #[test]
    fn run_args_forward_explicit_listen_addr() {
        let args = args_for(DaemonConfig {
            listen_addr: ANY_V4,
            ..DaemonConfig::default()
        });
        assert_eq!(
            args,
            ["node", "daemon", "run", "--listen-addr", "0.0.0.0"]
        );
    }

    /// Both overrides are forwarded, port first.
    #[test]
    fn run_args_forward_both_overrides() {
        let args = args_for(DaemonConfig {
            port: Some(8765),
            listen_addr: ANY_V4,
            ..DaemonConfig::default()
        });
        assert_eq!(
            args,
            [
                "node",
                "daemon",
                "run",
                "--port",
                "8765",
                "--listen-addr",
                "0.0.0.0",
            ]
        );
    }

    /// Explicit `--port 0` is preserved so the user's intent (OS-assigned)
    /// round-trips through the spawn, even though the child's default would
    /// produce the same bind behavior.
    #[test]
    fn run_args_forward_explicit_zero_port() {
        let args = args_for(DaemonConfig {
            port: Some(0),
            ..DaemonConfig::default()
        });
        assert_eq!(args, ["node", "daemon", "run", "--port", "0"]);
    }
}