1use std::path::Path;
2use std::time::Duration;
3
4use crate::error::{Error, Result};
5use crate::node::process::detach;
6use crate::node::types::{
7 DaemonConfig, DaemonInfo, DaemonStartResult, DaemonStatus, DaemonStopResult, NodeStarted,
8 NodeStatusResult, NodeStopped, StartNodeResult, StopNodeResult,
9};
10
11pub async fn status(config: &DaemonConfig) -> Result<DaemonStatus> {
15 let port = match read_port_file(&config.port_file_path) {
16 Some(port) => port,
17 None => {
18 return Ok(DaemonStatus {
19 running: false,
20 pid: None,
21 port: None,
22 uptime_secs: None,
23 nodes_total: 0,
24 nodes_running: 0,
25 nodes_stopped: 0,
26 nodes_errored: 0,
27 });
28 }
29 };
30
31 let url = format!("http://127.0.0.1:{port}/api/v1/status");
32 match reqwest::get(&url).await {
33 Ok(resp) => resp
34 .json::<DaemonStatus>()
35 .await
36 .map_err(|e| Error::HttpRequest(e.to_string())),
37 Err(_) => Ok(DaemonStatus {
38 running: false,
39 pid: None,
40 port: Some(port),
41 uptime_secs: None,
42 nodes_total: 0,
43 nodes_running: 0,
44 nodes_stopped: 0,
45 nodes_errored: 0,
46 }),
47 }
48}
49
50pub async fn stop(config: &DaemonConfig) -> Result<DaemonStopResult> {
56 let pid = read_pid_file(&config.pid_file_path)?;
57
58 if !is_process_alive(pid) {
61 let _ = std::fs::remove_file(&config.pid_file_path);
63 let _ = std::fs::remove_file(&config.port_file_path);
64 return Ok(DaemonStopResult { pid });
65 }
66
67 if !validate_daemon_process(pid) {
68 let _ = std::fs::remove_file(&config.pid_file_path);
70 let _ = std::fs::remove_file(&config.port_file_path);
71 return Err(Error::DaemonStopFailed(format!(
72 "PID {pid} is alive but does not appear to be the ant daemon (possible PID reuse). \
73 Stale PID file removed."
74 )));
75 }
76
77 send_terminate(pid);
78
79 for _ in 0..50 {
81 tokio::time::sleep(Duration::from_millis(100)).await;
82 if !is_process_alive(pid) {
83 break;
84 }
85 }
86
87 if is_process_alive(pid) {
89 return Err(Error::DaemonStopFailed(format!(
90 "Daemon (PID {pid}) is still alive after 5 seconds"
91 )));
92 }
93
94 let _ = std::fs::remove_file(&config.pid_file_path);
96 let _ = std::fs::remove_file(&config.port_file_path);
97
98 Ok(DaemonStopResult { pid })
99}
100
101pub async fn start(config: &DaemonConfig) -> Result<DaemonStartResult> {
106 if let Some(pid) = check_running(&config.pid_file_path) {
108 let port = read_port_file(&config.port_file_path);
109 return Ok(DaemonStartResult {
110 already_running: true,
111 pid,
112 port,
113 });
114 }
115
116 let exe = std::env::current_exe()
118 .map_err(|e| Error::ProcessSpawn(format!("Failed to get current executable: {e}")))?;
119 let exe_str = exe
120 .to_str()
121 .ok_or_else(|| Error::ProcessSpawn("Executable path is not valid UTF-8".to_string()))?;
122
123 let args = daemon_run_args(config);
124 let arg_refs: Vec<&str> = args.iter().map(String::as_str).collect();
125 let pid = detach::spawn_detached(exe_str, &arg_refs)?;
126
127 let mut port = None;
129 for _ in 0..20 {
130 tokio::time::sleep(Duration::from_millis(100)).await;
131 if let Some(p) = read_port_file(&config.port_file_path) {
132 port = Some(p);
133 break;
134 }
135 }
136
137 Ok(DaemonStartResult {
138 already_running: false,
139 pid,
140 port,
141 })
142}
143
144pub async fn start_node(config: &DaemonConfig, node_id: u32) -> Result<NodeStarted> {
146 let port = read_port_file(&config.port_file_path).ok_or(Error::DaemonNotRunning)?;
147
148 let url = format!("http://127.0.0.1:{port}/api/v1/nodes/{node_id}/start");
149 let resp = reqwest::Client::new()
150 .post(&url)
151 .send()
152 .await
153 .map_err(|e| Error::HttpRequest(e.to_string()))?;
154
155 if resp.status().is_success() {
156 resp.json::<NodeStarted>()
157 .await
158 .map_err(|e| Error::HttpRequest(e.to_string()))
159 } else {
160 let body = resp.text().await.unwrap_or_default();
161 Err(Error::HttpRequest(body))
162 }
163}
164
165pub async fn start_all_nodes(config: &DaemonConfig) -> Result<StartNodeResult> {
167 let port = read_port_file(&config.port_file_path).ok_or(Error::DaemonNotRunning)?;
168
169 let url = format!("http://127.0.0.1:{port}/api/v1/nodes/start-all");
170 let resp = reqwest::Client::new()
171 .post(&url)
172 .send()
173 .await
174 .map_err(|e| Error::HttpRequest(e.to_string()))?;
175
176 if resp.status().is_success() {
177 resp.json::<StartNodeResult>()
178 .await
179 .map_err(|e| Error::HttpRequest(e.to_string()))
180 } else {
181 let body = resp.text().await.unwrap_or_default();
182 Err(Error::HttpRequest(body))
183 }
184}
185
186pub async fn stop_node(config: &DaemonConfig, node_id: u32) -> Result<NodeStopped> {
188 let port = read_port_file(&config.port_file_path).ok_or(Error::DaemonNotRunning)?;
189
190 let url = format!("http://127.0.0.1:{port}/api/v1/nodes/{node_id}/stop");
191 let resp = reqwest::Client::new()
192 .post(&url)
193 .send()
194 .await
195 .map_err(|e| Error::HttpRequest(e.to_string()))?;
196
197 if resp.status().is_success() {
198 resp.json::<NodeStopped>()
199 .await
200 .map_err(|e| Error::HttpRequest(e.to_string()))
201 } else {
202 let body = resp.text().await.unwrap_or_default();
203 Err(Error::HttpRequest(body))
204 }
205}
206
207pub async fn node_status(config: &DaemonConfig) -> Result<NodeStatusResult> {
209 let port = read_port_file(&config.port_file_path).ok_or(Error::DaemonNotRunning)?;
210
211 let url = format!("http://127.0.0.1:{port}/api/v1/nodes/status");
212 let resp = reqwest::get(&url)
213 .await
214 .map_err(|e| Error::HttpRequest(e.to_string()))?;
215
216 if resp.status().is_success() {
217 resp.json::<NodeStatusResult>()
218 .await
219 .map_err(|e| Error::HttpRequest(e.to_string()))
220 } else {
221 let body = resp.text().await.unwrap_or_default();
222 Err(Error::HttpRequest(body))
223 }
224}
225
226pub async fn stop_all_nodes(config: &DaemonConfig) -> Result<StopNodeResult> {
228 let port = read_port_file(&config.port_file_path).ok_or(Error::DaemonNotRunning)?;
229
230 let url = format!("http://127.0.0.1:{port}/api/v1/nodes/stop-all");
231 let resp = reqwest::Client::new()
232 .post(&url)
233 .send()
234 .await
235 .map_err(|e| Error::HttpRequest(e.to_string()))?;
236
237 if resp.status().is_success() {
238 resp.json::<StopNodeResult>()
239 .await
240 .map_err(|e| Error::HttpRequest(e.to_string()))
241 } else {
242 let body = resp.text().await.unwrap_or_default();
243 Err(Error::HttpRequest(body))
244 }
245}
246
247pub fn info(config: &DaemonConfig) -> DaemonInfo {
251 let pid = std::fs::read_to_string(&config.pid_file_path)
252 .ok()
253 .and_then(|s| s.trim().parse::<u32>().ok());
254
255 let port = read_port_file(&config.port_file_path);
256
257 let running = pid.is_some_and(is_process_alive);
258
259 DaemonInfo {
260 running,
261 pid,
262 port,
263 api_base: port.map(|p| format!("http://127.0.0.1:{p}/api/v1")),
264 }
265}
266
267pub async fn run(config: DaemonConfig) -> Result<()> {
271 use crate::node::daemon::server;
272 use crate::node::registry::NodeRegistry;
273
274 let registry = NodeRegistry::load(&config.registry_path)?;
275 let shutdown = tokio_util::sync::CancellationToken::new();
276
277 let shutdown_clone = shutdown.clone();
278 tokio::spawn(async move {
279 tokio::signal::ctrl_c().await.ok();
280 shutdown_clone.cancel();
281 });
282
283 let _addr = server::start(config, registry, shutdown.clone()).await?;
284
285 shutdown.cancelled().await;
286 tokio::time::sleep(Duration::from_millis(100)).await;
288
289 Ok(())
290}
291
/// Checks whether the process `pid` looks like the `ant` daemon (Unix).
///
/// Reads `/proc/{pid}/cmdline`, splits it on NUL bytes and requires both that the file name
/// of the first argument is `ant` (or `ant.exe`) and that some argument equals `daemon`.
///
/// When the cmdline file cannot be read, the process is accepted (`true`); presumably this
/// keeps systems without a `/proc` filesystem working (confirm the intent).
#[cfg(unix)]
fn validate_daemon_process(pid: u32) -> bool {
    let Ok(cmdline) = std::fs::read(format!("/proc/{pid}/cmdline")) else {
        // Unable to inspect the process: accept it rather than reject it.
        return true;
    };

    // Arguments are NUL-separated; drop the empty pieces produced by the separators.
    let argv: Vec<String> = cmdline
        .split(|byte| *byte == 0)
        .filter(|piece| !piece.is_empty())
        .map(|piece| String::from_utf8_lossy(piece).to_string())
        .collect();

    let binary_name = argv
        .first()
        .and_then(|argv0| std::path::Path::new(argv0).file_name())
        .and_then(|name| name.to_str());
    if !matches!(binary_name, Some("ant") | Some("ant.exe")) {
        return false;
    }

    argv.iter().any(|arg| arg == "daemon")
}
324
/// Checks whether the process `pid` looks like the `ant` daemon (Windows).
///
/// Opens the process with `PROCESS_QUERY_LIMITED_INFORMATION`, queries its full image path
/// into a 1024-element UTF-16 buffer and accepts the process when the file stem of that path
/// is exactly `ant`. Returns `false` when the process cannot be opened or the query fails.
#[cfg(windows)]
fn validate_daemon_process(pid: u32) -> bool {
    use windows_sys::Win32::Foundation::CloseHandle;
    use windows_sys::Win32::System::Threading::{
        OpenProcess, QueryFullProcessImageNameW, PROCESS_QUERY_LIMITED_INFORMATION,
    };

    let mut image_path = [0u16; 1024];
    // In: buffer capacity in UTF-16 units. Out: number of units written.
    let mut len = image_path.len() as u32;

    // SAFETY: `image_path` is a live buffer of `len` u16 elements and `len` is a valid
    // out-pointer for the duration of the call; the handle is checked for null before use
    // and closed exactly once.
    let queried = unsafe {
        let process = OpenProcess(PROCESS_QUERY_LIMITED_INFORMATION, 0, pid);
        if process.is_null() {
            return false;
        }
        let ok = QueryFullProcessImageNameW(process, 0, image_path.as_mut_ptr(), &mut len);
        CloseHandle(process);
        ok != 0
    };
    if !queried {
        return false;
    }

    let image_path = String::from_utf16_lossy(&image_path[..len as usize]);
    let stem = std::path::Path::new(&image_path)
        .file_stem()
        .and_then(|stem| stem.to_str());
    stem == Some("ant")
}
353
354fn daemon_run_args(config: &DaemonConfig) -> Vec<String> {
360 let defaults = DaemonConfig::default();
361 let mut args = vec!["node".to_string(), "daemon".to_string(), "run".to_string()];
362 if let Some(port) = config.port {
363 args.push("--port".to_string());
364 args.push(port.to_string());
365 }
366 if config.listen_addr != defaults.listen_addr {
367 args.push("--listen-addr".to_string());
368 args.push(config.listen_addr.to_string());
369 }
370 args
371}
372
/// Reads a port number from the file at `path`.
///
/// Surrounding whitespace is ignored. Returns `None` when the file cannot be read or its
/// trimmed contents do not parse as a `u16`.
fn read_port_file(path: &Path) -> Option<u16> {
    let raw = std::fs::read_to_string(path).ok()?;
    raw.trim().parse::<u16>().ok()
}
378
379fn read_pid_file(path: &Path) -> Result<u32> {
380 let contents = std::fs::read_to_string(path).map_err(|_| Error::DaemonNotRunning)?;
381 contents
382 .trim()
383 .parse::<u32>()
384 .map_err(|_| Error::DaemonNotRunning)
385}
386
387fn check_running(pid_file: &Path) -> Option<u32> {
389 let pid = read_pid_file(pid_file).ok()?;
390 if is_process_alive(pid) {
391 Some(pid)
392 } else {
393 None
394 }
395}
396
/// Converts a `u32` PID into a strictly positive `i32`.
///
/// Returns `None` for `0` and for values that do not fit in an `i32`, so the result is never
/// zero or negative.
#[cfg(unix)]
fn pid_to_i32(pid: u32) -> Option<i32> {
    match i32::try_from(pid) {
        Ok(signed) if signed > 0 => Some(signed),
        _ => None,
    }
}
401
402#[cfg(unix)]
403fn send_terminate(pid: u32) {
404 if let Some(pid) = pid_to_i32(pid) {
405 unsafe {
406 libc::kill(pid, libc::SIGTERM);
407 }
408 }
409}
410
/// Terminates the process `pid` with exit code `1` (Windows).
///
/// Does nothing when the process cannot be opened with `PROCESS_TERMINATE`. The result of
/// `TerminateProcess` is not inspected.
#[cfg(windows)]
fn send_terminate(pid: u32) {
    use windows_sys::Win32::Foundation::CloseHandle;
    use windows_sys::Win32::System::Threading::{OpenProcess, TerminateProcess, PROCESS_TERMINATE};

    // SAFETY: the handle is checked for null before use and closed exactly once; no pointers
    // into program memory are passed.
    unsafe {
        let process = OpenProcess(PROCESS_TERMINATE, 0, pid);
        if process.is_null() {
            return;
        }
        TerminateProcess(process, 1);
        CloseHandle(process);
    }
}
424
425#[cfg(unix)]
426fn is_process_alive(pid: u32) -> bool {
427 let Some(pid) = pid_to_i32(pid) else {
428 return false;
429 };
430 let ret = unsafe { libc::kill(pid, 0) };
431 if ret == 0 {
432 return true;
433 }
434 std::io::Error::last_os_error().raw_os_error() == Some(libc::EPERM)
436}
437
/// Reports whether a process with the given PID is still running (Windows).
///
/// Opens the process with `PROCESS_QUERY_LIMITED_INFORMATION` and reads its exit code. The
/// process counts as alive when the query succeeds and the exit code equals `STILL_ACTIVE`.
/// Returns `false` when the process cannot be opened.
#[cfg(windows)]
fn is_process_alive(pid: u32) -> bool {
    use windows_sys::Win32::Foundation::{CloseHandle, STILL_ACTIVE};
    use windows_sys::Win32::System::Threading::{
        GetExitCodeProcess, OpenProcess, PROCESS_QUERY_LIMITED_INFORMATION,
    };

    let mut exit_code: u32 = 0;

    // SAFETY: `exit_code` is a valid out-pointer for the duration of the call; the handle is
    // checked for null before use and closed exactly once.
    let queried = unsafe {
        let process = OpenProcess(PROCESS_QUERY_LIMITED_INFORMATION, 0, pid);
        if process.is_null() {
            return false;
        }
        let ok = GetExitCodeProcess(process, &mut exit_code);
        CloseHandle(process);
        ok != 0
    };

    queried && exit_code == STILL_ACTIVE as u32
}
456
#[cfg(test)]
mod tests {
    use super::*;
    use std::net::{IpAddr, Ipv4Addr};

    /// `0.0.0.0`; the tests below expect this to differ from the default listen address.
    const ANY_V4: IpAddr = IpAddr::V4(Ipv4Addr::UNSPECIFIED);

    /// Asserts that `daemon_run_args` produces exactly `expected` for `config`.
    #[track_caller]
    fn assert_run_args(config: &DaemonConfig, expected: &[&str]) {
        let actual = daemon_run_args(config);
        assert_eq!(actual, expected);
    }

    /// A default configuration forwards no overrides.
    #[test]
    fn run_args_default_config_has_no_overrides() {
        assert_run_args(&DaemonConfig::default(), &["node", "daemon", "run"]);
    }

    /// An explicit port is forwarded as `--port`.
    #[test]
    fn run_args_forward_explicit_port() {
        let mut config = DaemonConfig::default();
        config.port = Some(8765);
        assert_run_args(&config, &["node", "daemon", "run", "--port", "8765"]);
    }

    /// A non-default listen address is forwarded as `--listen-addr`.
    #[test]
    fn run_args_forward_explicit_listen_addr() {
        let mut config = DaemonConfig::default();
        config.listen_addr = ANY_V4;
        assert_run_args(
            &config,
            &["node", "daemon", "run", "--listen-addr", "0.0.0.0"],
        );
    }

    /// Both overrides are forwarded, port first.
    #[test]
    fn run_args_forward_both_overrides() {
        let mut config = DaemonConfig::default();
        config.port = Some(8765);
        config.listen_addr = ANY_V4;
        assert_run_args(
            &config,
            &[
                "node",
                "daemon",
                "run",
                "--port",
                "8765",
                "--listen-addr",
                "0.0.0.0",
            ],
        );
    }

    /// `Some(0)` is still an explicit port and must be forwarded.
    #[test]
    fn run_args_forward_explicit_zero_port() {
        let mut config = DaemonConfig::default();
        config.port = Some(0);
        assert_run_args(&config, &["node", "daemon", "run", "--port", "0"]);
    }
}