Skip to main content

ant_core/node/daemon/
client.rs

1use std::path::Path;
2use std::time::Duration;
3
4use crate::error::{Error, Result};
5use crate::node::process::detach;
6use crate::node::types::{
7    DaemonConfig, DaemonInfo, DaemonStartResult, DaemonStatus, DaemonStopResult, NodeStarted,
8    NodeStatusResult, NodeStopped, StartNodeResult, StopNodeResult,
9};
10
11/// Get the daemon's current status by querying its REST API.
12///
13/// If the daemon is not running, returns a `DaemonStatus` with `running: false`.
14pub async fn status(config: &DaemonConfig) -> Result<DaemonStatus> {
15    let port = match read_port_file(&config.port_file_path) {
16        Some(port) => port,
17        None => {
18            return Ok(DaemonStatus {
19                running: false,
20                pid: None,
21                port: None,
22                uptime_secs: None,
23                nodes_total: 0,
24                nodes_running: 0,
25                nodes_stopped: 0,
26                nodes_errored: 0,
27            });
28        }
29    };
30
31    let url = format!("http://127.0.0.1:{port}/api/v1/status");
32    match reqwest::get(&url).await {
33        Ok(resp) => resp
34            .json::<DaemonStatus>()
35            .await
36            .map_err(|e| Error::HttpRequest(e.to_string())),
37        Err(_) => Ok(DaemonStatus {
38            running: false,
39            pid: None,
40            port: Some(port),
41            uptime_secs: None,
42            nodes_total: 0,
43            nodes_running: 0,
44            nodes_stopped: 0,
45            nodes_errored: 0,
46        }),
47    }
48}
49
50/// Stop the running daemon.
51///
52/// Reads the PID from the PID file, validates the process is actually a daemon
53/// instance, sends SIGTERM (Unix) or Ctrl+C (Windows), and waits for the process
54/// to exit.
55pub async fn stop(config: &DaemonConfig) -> Result<DaemonStopResult> {
56    let pid = read_pid_file(&config.pid_file_path)?;
57
58    // Validate the process is actually our daemon before killing it.
59    // After a crash, the PID may have been reused by an unrelated process.
60    if !is_process_alive(pid) {
61        // Process is already dead — just clean up stale files
62        let _ = std::fs::remove_file(&config.pid_file_path);
63        let _ = std::fs::remove_file(&config.port_file_path);
64        return Ok(DaemonStopResult { pid });
65    }
66
67    if !validate_daemon_process(pid) {
68        // PID is alive but isn't our daemon — clean up stale files without killing
69        let _ = std::fs::remove_file(&config.pid_file_path);
70        let _ = std::fs::remove_file(&config.port_file_path);
71        return Err(Error::DaemonStopFailed(format!(
72            "PID {pid} is alive but does not appear to be the ant daemon (possible PID reuse). \
73             Stale PID file removed."
74        )));
75    }
76
77    send_terminate(pid);
78
79    // Wait for process to exit
80    for _ in 0..50 {
81        tokio::time::sleep(Duration::from_millis(100)).await;
82        if !is_process_alive(pid) {
83            break;
84        }
85    }
86
87    // Verify the process actually died
88    if is_process_alive(pid) {
89        return Err(Error::DaemonStopFailed(format!(
90            "Daemon (PID {pid}) is still alive after 5 seconds"
91        )));
92    }
93
94    // Clean up files if they still exist
95    let _ = std::fs::remove_file(&config.pid_file_path);
96    let _ = std::fs::remove_file(&config.port_file_path);
97
98    Ok(DaemonStopResult { pid })
99}
100
101/// Start the daemon as a detached background process.
102///
103/// If the daemon is already running, returns a result with `already_running: true`.
104/// Otherwise, spawns the daemon and polls for the port file to confirm startup.
105pub async fn start(config: &DaemonConfig) -> Result<DaemonStartResult> {
106    // Check if daemon is already running
107    if let Some(pid) = check_running(&config.pid_file_path) {
108        let port = read_port_file(&config.port_file_path);
109        return Ok(DaemonStartResult {
110            already_running: true,
111            pid,
112            port,
113        });
114    }
115
116    // Get the path to the current executable
117    let exe = std::env::current_exe()
118        .map_err(|e| Error::ProcessSpawn(format!("Failed to get current executable: {e}")))?;
119    let exe_str = exe
120        .to_str()
121        .ok_or_else(|| Error::ProcessSpawn("Executable path is not valid UTF-8".to_string()))?;
122
123    let pid = detach::spawn_detached(exe_str, &["node", "daemon", "run"])?;
124
125    // Wait briefly for the daemon to write its port file
126    let mut port = None;
127    for _ in 0..20 {
128        tokio::time::sleep(Duration::from_millis(100)).await;
129        if let Some(p) = read_port_file(&config.port_file_path) {
130            port = Some(p);
131            break;
132        }
133    }
134
135    Ok(DaemonStartResult {
136        already_running: false,
137        pid,
138        port,
139    })
140}
141
142/// Start a specific node by ID via the daemon REST API.
143pub async fn start_node(config: &DaemonConfig, node_id: u32) -> Result<NodeStarted> {
144    let port = read_port_file(&config.port_file_path).ok_or(Error::DaemonNotRunning)?;
145
146    let url = format!("http://127.0.0.1:{port}/api/v1/nodes/{node_id}/start");
147    let resp = reqwest::Client::new()
148        .post(&url)
149        .send()
150        .await
151        .map_err(|e| Error::HttpRequest(e.to_string()))?;
152
153    if resp.status().is_success() {
154        resp.json::<NodeStarted>()
155            .await
156            .map_err(|e| Error::HttpRequest(e.to_string()))
157    } else {
158        let body = resp.text().await.unwrap_or_default();
159        Err(Error::HttpRequest(body))
160    }
161}
162
163/// Start all registered nodes via the daemon REST API.
164pub async fn start_all_nodes(config: &DaemonConfig) -> Result<StartNodeResult> {
165    let port = read_port_file(&config.port_file_path).ok_or(Error::DaemonNotRunning)?;
166
167    let url = format!("http://127.0.0.1:{port}/api/v1/nodes/start-all");
168    let resp = reqwest::Client::new()
169        .post(&url)
170        .send()
171        .await
172        .map_err(|e| Error::HttpRequest(e.to_string()))?;
173
174    if resp.status().is_success() {
175        resp.json::<StartNodeResult>()
176            .await
177            .map_err(|e| Error::HttpRequest(e.to_string()))
178    } else {
179        let body = resp.text().await.unwrap_or_default();
180        Err(Error::HttpRequest(body))
181    }
182}
183
184/// Stop a specific node by ID via the daemon REST API.
185pub async fn stop_node(config: &DaemonConfig, node_id: u32) -> Result<NodeStopped> {
186    let port = read_port_file(&config.port_file_path).ok_or(Error::DaemonNotRunning)?;
187
188    let url = format!("http://127.0.0.1:{port}/api/v1/nodes/{node_id}/stop");
189    let resp = reqwest::Client::new()
190        .post(&url)
191        .send()
192        .await
193        .map_err(|e| Error::HttpRequest(e.to_string()))?;
194
195    if resp.status().is_success() {
196        resp.json::<NodeStopped>()
197            .await
198            .map_err(|e| Error::HttpRequest(e.to_string()))
199    } else {
200        let body = resp.text().await.unwrap_or_default();
201        Err(Error::HttpRequest(body))
202    }
203}
204
205/// Get the status of all registered nodes via the daemon REST API.
206pub async fn node_status(config: &DaemonConfig) -> Result<NodeStatusResult> {
207    let port = read_port_file(&config.port_file_path).ok_or(Error::DaemonNotRunning)?;
208
209    let url = format!("http://127.0.0.1:{port}/api/v1/nodes/status");
210    let resp = reqwest::get(&url)
211        .await
212        .map_err(|e| Error::HttpRequest(e.to_string()))?;
213
214    if resp.status().is_success() {
215        resp.json::<NodeStatusResult>()
216            .await
217            .map_err(|e| Error::HttpRequest(e.to_string()))
218    } else {
219        let body = resp.text().await.unwrap_or_default();
220        Err(Error::HttpRequest(body))
221    }
222}
223
224/// Stop all running nodes via the daemon REST API.
225pub async fn stop_all_nodes(config: &DaemonConfig) -> Result<StopNodeResult> {
226    let port = read_port_file(&config.port_file_path).ok_or(Error::DaemonNotRunning)?;
227
228    let url = format!("http://127.0.0.1:{port}/api/v1/nodes/stop-all");
229    let resp = reqwest::Client::new()
230        .post(&url)
231        .send()
232        .await
233        .map_err(|e| Error::HttpRequest(e.to_string()))?;
234
235    if resp.status().is_success() {
236        resp.json::<StopNodeResult>()
237            .await
238            .map_err(|e| Error::HttpRequest(e.to_string()))
239    } else {
240        let body = resp.text().await.unwrap_or_default();
241        Err(Error::HttpRequest(body))
242    }
243}
244
245/// Get daemon connection info for programmatic use.
246///
247/// Reads PID and port files and checks if the process is alive.
248pub fn info(config: &DaemonConfig) -> DaemonInfo {
249    let pid = std::fs::read_to_string(&config.pid_file_path)
250        .ok()
251        .and_then(|s| s.trim().parse::<u32>().ok());
252
253    let port = read_port_file(&config.port_file_path);
254
255    let running = pid.is_some_and(is_process_alive);
256
257    DaemonInfo {
258        running,
259        pid,
260        port,
261        api_base: port.map(|p| format!("http://127.0.0.1:{p}/api/v1")),
262    }
263}
264
265/// Run the daemon in the foreground (the actual daemon process entry point).
266///
267/// Starts the HTTP server, sets up signal handling, and blocks until shutdown.
268pub async fn run(config: DaemonConfig) -> Result<()> {
269    use crate::node::daemon::server;
270    use crate::node::registry::NodeRegistry;
271
272    let registry = NodeRegistry::load(&config.registry_path)?;
273    let shutdown = tokio_util::sync::CancellationToken::new();
274
275    let shutdown_clone = shutdown.clone();
276    tokio::spawn(async move {
277        tokio::signal::ctrl_c().await.ok();
278        shutdown_clone.cancel();
279    });
280
281    let _addr = server::start(config, registry, shutdown.clone()).await?;
282
283    shutdown.cancelled().await;
284    // Give the server a moment to clean up
285    tokio::time::sleep(Duration::from_millis(100)).await;
286
287    Ok(())
288}
289
/// Decide whether `pid` looks like an ant daemon by inspecting its command
/// line, as a guard against PID reuse after a daemon crash.
///
/// Reads `/proc/{pid}/cmdline` (NUL-separated arguments) and returns `true`
/// only when both hold:
/// * the file name of the first argument is exactly `ant` or `ant.exe`
///   (so `rant` or `phantom-daemon` do not match), and
/// * some argument is exactly `daemon`.
///
/// If the file cannot be read at all — for example on Unix systems without
/// `/proc`, such as macOS — the PID file is trusted and `true` is returned,
/// since there is no cheap way to inspect the command line without shelling
/// out.
#[cfg(unix)]
fn validate_daemon_process(pid: u32) -> bool {
    let Ok(cmdline) = std::fs::read(format!("/proc/{pid}/cmdline")) else {
        return true;
    };

    let mut exe_is_ant = false;
    let mut has_daemon_arg = false;

    let arguments = cmdline
        .split(|&byte| byte == 0)
        .filter(|chunk| !chunk.is_empty());

    for (index, chunk) in arguments.enumerate() {
        let lossy = String::from_utf8_lossy(chunk);
        let arg: &str = &lossy;

        // Only the first argument (the executable path) is matched by basename.
        if index == 0 {
            let basename = std::path::Path::new(arg)
                .file_name()
                .and_then(|name| name.to_str());
            exe_is_ant = matches!(basename, Some("ant" | "ant.exe"));
        }

        if arg == "daemon" {
            has_daemon_arg = true;
        }
    }

    exe_is_ant && has_daemon_arg
}
322
/// Decide whether `pid` looks like an ant daemon by inspecting the path of its
/// executable image.
///
/// Returns `true` only when the process can be opened, its image path can be
/// queried, and the file stem of that path is exactly `ant`. Unlike the Unix
/// variant, the command-line arguments are not examined.
#[cfg(windows)]
fn validate_daemon_process(pid: u32) -> bool {
    use windows_sys::Win32::Foundation::CloseHandle;
    use windows_sys::Win32::System::Threading::{
        OpenProcess, QueryFullProcessImageNameW, PROCESS_QUERY_LIMITED_INFORMATION,
    };

    let mut image_path = [0u16; 1024];
    // In: capacity of the buffer; out: number of UTF-16 units written.
    let mut length = image_path.len() as u32;

    // SAFETY: the handle is checked for null before use and closed exactly
    // once; the buffer pointer and `length` refer to locals that outlive the
    // call, and `length` is initialised to the buffer's capacity.
    let queried = unsafe {
        let process = OpenProcess(PROCESS_QUERY_LIMITED_INFORMATION, 0, pid);
        if process.is_null() {
            return false;
        }
        let status = QueryFullProcessImageNameW(process, 0, image_path.as_mut_ptr(), &mut length);
        CloseHandle(process);
        status != 0
    };

    if !queried {
        return false;
    }

    let full_path = String::from_utf16_lossy(&image_path[..length as usize]);
    // Compare the executable's stem, not a substring of the path.
    let stem = std::path::Path::new(&full_path)
        .file_stem()
        .and_then(|s| s.to_str());
    stem == Some("ant")
}
351
/// Read a TCP port number from the file at `path`.
///
/// Surrounding whitespace is ignored. Returns `None` when the file cannot be
/// read or its trimmed contents do not parse as a `u16`.
fn read_port_file(path: &Path) -> Option<u16> {
    let contents = std::fs::read_to_string(path).ok()?;
    contents.trim().parse().ok()
}
357
358fn read_pid_file(path: &Path) -> Result<u32> {
359    let contents = std::fs::read_to_string(path).map_err(|_| Error::DaemonNotRunning)?;
360    contents
361        .trim()
362        .parse::<u32>()
363        .map_err(|_| Error::DaemonNotRunning)
364}
365
366/// Check if a daemon is running. Returns the PID if so.
367fn check_running(pid_file: &Path) -> Option<u32> {
368    let pid = read_pid_file(pid_file).ok()?;
369    if is_process_alive(pid) {
370        Some(pid)
371    } else {
372        None
373    }
374}
375
/// Convert a PID to the signed form passed to `libc::kill` in this file.
///
/// Returns `None` for `0` and for values that do not fit in an `i32`, so only
/// strictly positive PIDs are ever produced.
#[cfg(unix)]
fn pid_to_i32(pid: u32) -> Option<i32> {
    match i32::try_from(pid) {
        Ok(signed) if signed > 0 => Some(signed),
        _ => None,
    }
}
380
381#[cfg(unix)]
382fn send_terminate(pid: u32) {
383    if let Some(pid) = pid_to_i32(pid) {
384        unsafe {
385            libc::kill(pid, libc::SIGTERM);
386        }
387    }
388}
389
/// Forcibly terminate `pid` via `TerminateProcess` with exit code 1.
///
/// Does nothing when the process cannot be opened with `PROCESS_TERMINATE`
/// access. The result of `TerminateProcess` is not inspected; callers are
/// expected to check afterwards whether the process actually exited.
#[cfg(windows)]
fn send_terminate(pid: u32) {
    use windows_sys::Win32::Foundation::CloseHandle;
    use windows_sys::Win32::System::Threading::{OpenProcess, TerminateProcess, PROCESS_TERMINATE};

    // SAFETY: the handle is checked for null before use and closed exactly
    // once; no pointers into Rust-owned memory are passed.
    unsafe {
        let process = OpenProcess(PROCESS_TERMINATE, 0, pid);
        if process.is_null() {
            return;
        }
        TerminateProcess(process, 1);
        CloseHandle(process);
    }
}
403
404#[cfg(unix)]
405fn is_process_alive(pid: u32) -> bool {
406    let Some(pid) = pid_to_i32(pid) else {
407        return false;
408    };
409    let ret = unsafe { libc::kill(pid, 0) };
410    if ret == 0 {
411        return true;
412    }
413    // EPERM means the process exists but we lack permission to signal it
414    std::io::Error::last_os_error().raw_os_error() == Some(libc::EPERM)
415}
416
/// Report whether a process with the given PID is still running.
///
/// Opens the process with `PROCESS_QUERY_LIMITED_INFORMATION` and reads its
/// exit code; the process counts as alive only when the query succeeds and the
/// exit code equals `STILL_ACTIVE`. A process that cannot be opened is
/// reported as not alive.
#[cfg(windows)]
fn is_process_alive(pid: u32) -> bool {
    use windows_sys::Win32::Foundation::{CloseHandle, STILL_ACTIVE};
    use windows_sys::Win32::System::Threading::{
        GetExitCodeProcess, OpenProcess, PROCESS_QUERY_LIMITED_INFORMATION,
    };

    let mut exit_code: u32 = 0;

    // SAFETY: the handle is checked for null before use and closed exactly
    // once; `exit_code` is a live local for the duration of the call.
    let queried = unsafe {
        let process = OpenProcess(PROCESS_QUERY_LIMITED_INFORMATION, 0, pid);
        if process.is_null() {
            return false;
        }
        let status = GetExitCodeProcess(process, &mut exit_code);
        CloseHandle(process);
        status != 0
    };

    queried && exit_code == STILL_ACTIVE as u32
}
434}