1use std::path::Path;
2use std::time::Duration;
3
4use crate::error::{Error, Result};
5use crate::node::daemon::health::FleetHealth;
6use crate::node::process::detach;
7use crate::node::types::{
8 DaemonConfig, DaemonInfo, DaemonStartResult, DaemonStatus, DaemonStopResult, NodeStarted,
9 NodeStatusResult, NodeStopped, RemoveNodeResult, StartNodeResult, StopNodeResult,
10};
11
12pub async fn status(config: &DaemonConfig) -> Result<DaemonStatus> {
16 let port = match read_port_file(&config.port_file_path) {
17 Some(port) => port,
18 None => {
19 return Ok(DaemonStatus {
20 running: false,
21 pid: None,
22 port: None,
23 uptime_secs: None,
24 nodes_total: 0,
25 nodes_running: 0,
26 nodes_stopped: 0,
27 nodes_errored: 0,
28 });
29 }
30 };
31
32 let url = format!("http://127.0.0.1:{port}/api/v1/status");
33 match reqwest::get(&url).await {
34 Ok(resp) => resp
35 .json::<DaemonStatus>()
36 .await
37 .map_err(|e| Error::HttpRequest(e.to_string())),
38 Err(_) => Ok(DaemonStatus {
39 running: false,
40 pid: None,
41 port: Some(port),
42 uptime_secs: None,
43 nodes_total: 0,
44 nodes_running: 0,
45 nodes_stopped: 0,
46 nodes_errored: 0,
47 }),
48 }
49}
50
51pub async fn stop(config: &DaemonConfig) -> Result<DaemonStopResult> {
57 let pid = read_pid_file(&config.pid_file_path)?;
58
59 if !is_process_alive(pid) {
62 let _ = std::fs::remove_file(&config.pid_file_path);
64 let _ = std::fs::remove_file(&config.port_file_path);
65 return Ok(DaemonStopResult { pid });
66 }
67
68 if !validate_daemon_process(pid) {
69 let _ = std::fs::remove_file(&config.pid_file_path);
71 let _ = std::fs::remove_file(&config.port_file_path);
72 return Err(Error::DaemonStopFailed(format!(
73 "PID {pid} is alive but does not appear to be the ant daemon (possible PID reuse). \
74 Stale PID file removed."
75 )));
76 }
77
78 send_terminate(pid);
79
80 for _ in 0..50 {
82 tokio::time::sleep(Duration::from_millis(100)).await;
83 if !is_process_alive(pid) {
84 break;
85 }
86 }
87
88 if is_process_alive(pid) {
90 return Err(Error::DaemonStopFailed(format!(
91 "Daemon (PID {pid}) is still alive after 5 seconds"
92 )));
93 }
94
95 let _ = std::fs::remove_file(&config.pid_file_path);
97 let _ = std::fs::remove_file(&config.port_file_path);
98
99 Ok(DaemonStopResult { pid })
100}
101
102pub async fn start(config: &DaemonConfig) -> Result<DaemonStartResult> {
107 if let Some(pid) = check_running(&config.pid_file_path) {
109 let port = read_port_file(&config.port_file_path);
110 return Ok(DaemonStartResult {
111 already_running: true,
112 pid,
113 port,
114 });
115 }
116
117 let exe = std::env::current_exe()
119 .map_err(|e| Error::ProcessSpawn(format!("Failed to get current executable: {e}")))?;
120 let exe_str = exe
121 .to_str()
122 .ok_or_else(|| Error::ProcessSpawn("Executable path is not valid UTF-8".to_string()))?;
123
124 let args = daemon_run_args(config);
125 let arg_refs: Vec<&str> = args.iter().map(String::as_str).collect();
126 let pid = detach::spawn_detached(exe_str, &arg_refs)?;
127
128 let mut port = None;
130 for _ in 0..20 {
131 tokio::time::sleep(Duration::from_millis(100)).await;
132 if let Some(p) = read_port_file(&config.port_file_path) {
133 port = Some(p);
134 break;
135 }
136 }
137
138 Ok(DaemonStartResult {
139 already_running: false,
140 pid,
141 port,
142 })
143}
144
145pub async fn start_node(config: &DaemonConfig, node_id: u32) -> Result<NodeStarted> {
147 let port = read_port_file(&config.port_file_path).ok_or(Error::DaemonNotRunning)?;
148
149 let url = format!("http://127.0.0.1:{port}/api/v1/nodes/{node_id}/start");
150 let resp = reqwest::Client::new()
151 .post(&url)
152 .send()
153 .await
154 .map_err(|e| Error::HttpRequest(e.to_string()))?;
155
156 if resp.status().is_success() {
157 resp.json::<NodeStarted>()
158 .await
159 .map_err(|e| Error::HttpRequest(e.to_string()))
160 } else {
161 let body = resp.text().await.unwrap_or_default();
162 Err(Error::HttpRequest(body))
163 }
164}
165
166pub async fn start_all_nodes(config: &DaemonConfig) -> Result<StartNodeResult> {
168 let port = read_port_file(&config.port_file_path).ok_or(Error::DaemonNotRunning)?;
169
170 let url = format!("http://127.0.0.1:{port}/api/v1/nodes/start-all");
171 let resp = reqwest::Client::new()
172 .post(&url)
173 .send()
174 .await
175 .map_err(|e| Error::HttpRequest(e.to_string()))?;
176
177 if resp.status().is_success() {
178 resp.json::<StartNodeResult>()
179 .await
180 .map_err(|e| Error::HttpRequest(e.to_string()))
181 } else {
182 let body = resp.text().await.unwrap_or_default();
183 Err(Error::HttpRequest(body))
184 }
185}
186
187pub async fn stop_node(config: &DaemonConfig, node_id: u32) -> Result<NodeStopped> {
189 let port = read_port_file(&config.port_file_path).ok_or(Error::DaemonNotRunning)?;
190
191 let url = format!("http://127.0.0.1:{port}/api/v1/nodes/{node_id}/stop");
192 let resp = reqwest::Client::new()
193 .post(&url)
194 .send()
195 .await
196 .map_err(|e| Error::HttpRequest(e.to_string()))?;
197
198 if resp.status().is_success() {
199 resp.json::<NodeStopped>()
200 .await
201 .map_err(|e| Error::HttpRequest(e.to_string()))
202 } else {
203 let body = resp.text().await.unwrap_or_default();
204 Err(Error::HttpRequest(body))
205 }
206}
207
208pub async fn dismiss_node(config: &DaemonConfig, node_id: u32) -> Result<RemoveNodeResult> {
213 let port = read_port_file(&config.port_file_path).ok_or(Error::DaemonNotRunning)?;
214
215 let url = format!("http://127.0.0.1:{port}/api/v1/nodes/{node_id}");
216 let resp = reqwest::Client::new()
217 .delete(&url)
218 .send()
219 .await
220 .map_err(|e| Error::HttpRequest(e.to_string()))?;
221
222 if resp.status().is_success() {
223 resp.json::<RemoveNodeResult>()
224 .await
225 .map_err(|e| Error::HttpRequest(e.to_string()))
226 } else {
227 let body = resp.text().await.unwrap_or_default();
228 Err(Error::HttpRequest(body))
229 }
230}
231
232pub async fn fleet_health(config: &DaemonConfig) -> Result<FleetHealth> {
234 let port = read_port_file(&config.port_file_path).ok_or(Error::DaemonNotRunning)?;
235
236 let url = format!("http://127.0.0.1:{port}/api/v1/health");
237 let resp = reqwest::get(&url)
238 .await
239 .map_err(|e| Error::HttpRequest(e.to_string()))?;
240
241 if resp.status().is_success() {
242 resp.json::<FleetHealth>()
243 .await
244 .map_err(|e| Error::HttpRequest(e.to_string()))
245 } else {
246 let body = resp.text().await.unwrap_or_default();
247 Err(Error::HttpRequest(body))
248 }
249}
250
251pub async fn node_status(config: &DaemonConfig) -> Result<NodeStatusResult> {
253 let port = read_port_file(&config.port_file_path).ok_or(Error::DaemonNotRunning)?;
254
255 let url = format!("http://127.0.0.1:{port}/api/v1/nodes/status");
256 let resp = reqwest::get(&url)
257 .await
258 .map_err(|e| Error::HttpRequest(e.to_string()))?;
259
260 if resp.status().is_success() {
261 resp.json::<NodeStatusResult>()
262 .await
263 .map_err(|e| Error::HttpRequest(e.to_string()))
264 } else {
265 let body = resp.text().await.unwrap_or_default();
266 Err(Error::HttpRequest(body))
267 }
268}
269
270pub async fn stop_all_nodes(config: &DaemonConfig) -> Result<StopNodeResult> {
272 let port = read_port_file(&config.port_file_path).ok_or(Error::DaemonNotRunning)?;
273
274 let url = format!("http://127.0.0.1:{port}/api/v1/nodes/stop-all");
275 let resp = reqwest::Client::new()
276 .post(&url)
277 .send()
278 .await
279 .map_err(|e| Error::HttpRequest(e.to_string()))?;
280
281 if resp.status().is_success() {
282 resp.json::<StopNodeResult>()
283 .await
284 .map_err(|e| Error::HttpRequest(e.to_string()))
285 } else {
286 let body = resp.text().await.unwrap_or_default();
287 Err(Error::HttpRequest(body))
288 }
289}
290
291pub fn info(config: &DaemonConfig) -> DaemonInfo {
295 let pid = std::fs::read_to_string(&config.pid_file_path)
296 .ok()
297 .and_then(|s| s.trim().parse::<u32>().ok());
298
299 let port = read_port_file(&config.port_file_path);
300
301 let running = pid.is_some_and(is_process_alive);
302
303 DaemonInfo {
304 running,
305 pid,
306 port,
307 api_base: port.map(|p| format!("http://127.0.0.1:{p}/api/v1")),
308 }
309}
310
311pub async fn run(config: DaemonConfig) -> Result<()> {
315 use crate::node::daemon::server;
316 use crate::node::registry::NodeRegistry;
317
318 let registry = NodeRegistry::load(&config.registry_path)?;
319 let shutdown = tokio_util::sync::CancellationToken::new();
320
321 let shutdown_clone = shutdown.clone();
322 tokio::spawn(async move {
323 tokio::signal::ctrl_c().await.ok();
324 shutdown_clone.cancel();
325 });
326
327 let _addr = server::start(config, registry, shutdown.clone()).await?;
328
329 shutdown.cancelled().await;
330 tokio::time::sleep(Duration::from_millis(100)).await;
332
333 Ok(())
334}
335
#[cfg(unix)]
/// Heuristically checks whether `pid` is the ant daemon, to guard against PID reuse.
///
/// Reads `/proc/<pid>/cmdline`. The process qualifies when its first argument's file name is
/// `ant` or `ant.exe` and some argument equals `daemon`. If the file cannot be read (for
/// example, no `/proc` on this platform), the check cannot decide and returns `true`.
fn validate_daemon_process(pid: u32) -> bool {
    let Ok(raw) = std::fs::read(format!("/proc/{pid}/cmdline")) else {
        return true;
    };

    // Arguments are NUL-separated; empty pieces (e.g. after a trailing NUL) are dropped.
    let argv: Vec<String> = raw
        .split(|&byte| byte == 0)
        .filter(|piece| !piece.is_empty())
        .map(|piece| String::from_utf8_lossy(piece).into_owned())
        .collect();

    let Some(program) = argv.first() else {
        return false;
    };
    let program_is_ant = matches!(
        std::path::Path::new(program)
            .file_name()
            .and_then(|name| name.to_str()),
        Some("ant" | "ant.exe")
    );

    program_is_ant && argv.iter().any(|arg| arg == "daemon")
}
368
#[cfg(windows)]
/// Checks whether `pid` is the ant daemon by inspecting its executable image name.
///
/// Returns `true` only if the image path's file stem is exactly `ant`. Any failure to open
/// or query the process yields `false`.
fn validate_daemon_process(pid: u32) -> bool {
    use windows_sys::Win32::Foundation::CloseHandle;
    use windows_sys::Win32::System::Threading::{
        OpenProcess, QueryFullProcessImageNameW, PROCESS_QUERY_LIMITED_INFORMATION,
    };

    let mut image = [0u16; 1024];
    let mut len = image.len() as u32;

    // SAFETY: the handle is null-checked before use and closed exactly once. `image` and
    // `len` are live, properly sized locals. `len` holds the buffer capacity on input, as
    // QueryFullProcessImageNameW requires.
    let queried = unsafe {
        let process = OpenProcess(PROCESS_QUERY_LIMITED_INFORMATION, 0, pid);
        if process.is_null() {
            false
        } else {
            let ok = QueryFullProcessImageNameW(process, 0, image.as_mut_ptr(), &mut len) != 0;
            CloseHandle(process);
            ok
        }
    };
    if !queried {
        return false;
    }

    let image_path = String::from_utf16_lossy(&image[..len as usize]);
    matches!(
        std::path::Path::new(&image_path)
            .file_stem()
            .and_then(|stem| stem.to_str()),
        Some("ant")
    )
}
397
398fn daemon_run_args(config: &DaemonConfig) -> Vec<String> {
404 let defaults = DaemonConfig::default();
405 let mut args = vec!["node".to_string(), "daemon".to_string(), "run".to_string()];
406 if let Some(port) = config.port {
407 args.push("--port".to_string());
408 args.push(port.to_string());
409 }
410 if config.listen_addr != defaults.listen_addr {
411 args.push("--listen-addr".to_string());
412 args.push(config.listen_addr.to_string());
413 }
414 args
415}
416
/// Reads a TCP port from `path`, ignoring surrounding whitespace.
///
/// Returns `None` if the file cannot be read or does not contain a valid `u16`.
fn read_port_file(path: &Path) -> Option<u16> {
    let contents = std::fs::read_to_string(path).ok()?;
    contents.trim().parse::<u16>().ok()
}
422
423fn read_pid_file(path: &Path) -> Result<u32> {
424 let contents = std::fs::read_to_string(path).map_err(|_| Error::DaemonNotRunning)?;
425 contents
426 .trim()
427 .parse::<u32>()
428 .map_err(|_| Error::DaemonNotRunning)
429}
430
431fn check_running(pid_file: &Path) -> Option<u32> {
433 let pid = read_pid_file(pid_file).ok()?;
434 if is_process_alive(pid) {
435 Some(pid)
436 } else {
437 None
438 }
439}
440
#[cfg(unix)]
/// Converts a PID to the signed form libc expects, rejecting values that are not positive.
///
/// Zero and values above `i32::MAX` map to `None`. They would otherwise address process
/// groups or wrap to negative numbers when passed to `kill`.
fn pid_to_i32(pid: u32) -> Option<i32> {
    match i32::try_from(pid) {
        Ok(signed) if signed > 0 => Some(signed),
        _ => None,
    }
}
445
446#[cfg(unix)]
447fn send_terminate(pid: u32) {
448 if let Some(pid) = pid_to_i32(pid) {
449 unsafe {
450 libc::kill(pid, libc::SIGTERM);
451 }
452 }
453}
454
#[cfg(windows)]
/// Forcibly terminates `pid` with exit code 1 via `TerminateProcess`.
///
/// If the process cannot be opened with `PROCESS_TERMINATE` access, nothing happens. The
/// result of `TerminateProcess` is ignored.
fn send_terminate(pid: u32) {
    use windows_sys::Win32::Foundation::CloseHandle;
    use windows_sys::Win32::System::Threading::{OpenProcess, TerminateProcess, PROCESS_TERMINATE};

    // SAFETY: the handle is null-checked before use and closed exactly once afterwards.
    unsafe {
        let process = OpenProcess(PROCESS_TERMINATE, 0, pid);
        if process.is_null() {
            return;
        }
        TerminateProcess(process, 1);
        CloseHandle(process);
    }
}
468
469#[cfg(unix)]
470fn is_process_alive(pid: u32) -> bool {
471 let Some(pid) = pid_to_i32(pid) else {
472 return false;
473 };
474 let ret = unsafe { libc::kill(pid, 0) };
475 if ret == 0 {
476 return true;
477 }
478 std::io::Error::last_os_error().raw_os_error() == Some(libc::EPERM)
480}
481
#[cfg(windows)]
/// Reports whether `pid` is a running process by checking for the `STILL_ACTIVE` exit code.
///
/// Returns `false` if the process cannot be opened or its exit code cannot be queried.
fn is_process_alive(pid: u32) -> bool {
    use windows_sys::Win32::Foundation::{CloseHandle, STILL_ACTIVE};
    use windows_sys::Win32::System::Threading::{
        GetExitCodeProcess, OpenProcess, PROCESS_QUERY_LIMITED_INFORMATION,
    };

    let mut exit_code: u32 = 0;

    // SAFETY: the handle is null-checked before use and closed exactly once. `exit_code` is
    // a live local the call writes into.
    let queried = unsafe {
        let process = OpenProcess(PROCESS_QUERY_LIMITED_INFORMATION, 0, pid);
        if process.is_null() {
            return false;
        }
        let ok = GetExitCodeProcess(process, &mut exit_code);
        CloseHandle(process);
        ok != 0
    };

    queried && exit_code == STILL_ACTIVE as u32
}
500
#[cfg(test)]
mod tests {
    use super::*;
    use std::net::{IpAddr, Ipv4Addr};

    /// The unspecified IPv4 address (`0.0.0.0`), used as a non-default listen address.
    const ALL_INTERFACES: IpAddr = IpAddr::V4(Ipv4Addr::UNSPECIFIED);

    /// Asserts that `config` produces exactly `expected` as the daemon launch arguments.
    fn check(config: DaemonConfig, expected: &[&str]) {
        assert_eq!(daemon_run_args(&config), expected);
    }

    #[test]
    fn run_args_default_config_has_no_overrides() {
        check(DaemonConfig::default(), &["node", "daemon", "run"]);
    }

    #[test]
    fn run_args_forward_explicit_port() {
        let config = DaemonConfig {
            port: Some(8765),
            ..DaemonConfig::default()
        };
        check(config, &["node", "daemon", "run", "--port", "8765"]);
    }

    #[test]
    fn run_args_forward_explicit_listen_addr() {
        let config = DaemonConfig {
            listen_addr: ALL_INTERFACES,
            ..DaemonConfig::default()
        };
        check(config, &["node", "daemon", "run", "--listen-addr", "0.0.0.0"]);
    }

    #[test]
    fn run_args_forward_both_overrides() {
        let config = DaemonConfig {
            port: Some(8765),
            listen_addr: ALL_INTERFACES,
            ..DaemonConfig::default()
        };
        check(
            config,
            &[
                "node",
                "daemon",
                "run",
                "--port",
                "8765",
                "--listen-addr",
                "0.0.0.0",
            ],
        );
    }

    #[test]
    fn run_args_forward_explicit_zero_port() {
        // An explicit `Some(0)` must still be forwarded rather than treated as "unset".
        let config = DaemonConfig {
            port: Some(0),
            ..DaemonConfig::default()
        };
        check(config, &["node", "daemon", "run", "--port", "0"]);
    }
}