ant_core/node/daemon/
client.rs1use std::path::Path;
2use std::time::Duration;
3
4use crate::error::{Error, Result};
5use crate::node::process::detach;
6use crate::node::types::{
7 DaemonConfig, DaemonInfo, DaemonStartResult, DaemonStatus, DaemonStopResult, NodeStarted,
8 NodeStatusResult, NodeStopped, StartNodeResult, StopNodeResult,
9};
10
11pub async fn status(config: &DaemonConfig) -> Result<DaemonStatus> {
15 let port = match read_port_file(&config.port_file_path) {
16 Some(port) => port,
17 None => {
18 return Ok(DaemonStatus {
19 running: false,
20 pid: None,
21 port: None,
22 uptime_secs: None,
23 nodes_total: 0,
24 nodes_running: 0,
25 nodes_stopped: 0,
26 nodes_errored: 0,
27 });
28 }
29 };
30
31 let url = format!("http://127.0.0.1:{port}/api/v1/status");
32 match reqwest::get(&url).await {
33 Ok(resp) => resp
34 .json::<DaemonStatus>()
35 .await
36 .map_err(|e| Error::HttpRequest(e.to_string())),
37 Err(_) => Ok(DaemonStatus {
38 running: false,
39 pid: None,
40 port: Some(port),
41 uptime_secs: None,
42 nodes_total: 0,
43 nodes_running: 0,
44 nodes_stopped: 0,
45 nodes_errored: 0,
46 }),
47 }
48}
49
50pub async fn stop(config: &DaemonConfig) -> Result<DaemonStopResult> {
56 let pid = read_pid_file(&config.pid_file_path)?;
57
58 if !is_process_alive(pid) {
61 let _ = std::fs::remove_file(&config.pid_file_path);
63 let _ = std::fs::remove_file(&config.port_file_path);
64 return Ok(DaemonStopResult { pid });
65 }
66
67 if !validate_daemon_process(pid) {
68 let _ = std::fs::remove_file(&config.pid_file_path);
70 let _ = std::fs::remove_file(&config.port_file_path);
71 return Err(Error::DaemonStopFailed(format!(
72 "PID {pid} is alive but does not appear to be the ant daemon (possible PID reuse). \
73 Stale PID file removed."
74 )));
75 }
76
77 send_terminate(pid);
78
79 for _ in 0..50 {
81 tokio::time::sleep(Duration::from_millis(100)).await;
82 if !is_process_alive(pid) {
83 break;
84 }
85 }
86
87 if is_process_alive(pid) {
89 return Err(Error::DaemonStopFailed(format!(
90 "Daemon (PID {pid}) is still alive after 5 seconds"
91 )));
92 }
93
94 let _ = std::fs::remove_file(&config.pid_file_path);
96 let _ = std::fs::remove_file(&config.port_file_path);
97
98 Ok(DaemonStopResult { pid })
99}
100
101pub async fn start(config: &DaemonConfig) -> Result<DaemonStartResult> {
106 if let Some(pid) = check_running(&config.pid_file_path) {
108 let port = read_port_file(&config.port_file_path);
109 return Ok(DaemonStartResult {
110 already_running: true,
111 pid,
112 port,
113 });
114 }
115
116 let exe = std::env::current_exe()
118 .map_err(|e| Error::ProcessSpawn(format!("Failed to get current executable: {e}")))?;
119 let exe_str = exe
120 .to_str()
121 .ok_or_else(|| Error::ProcessSpawn("Executable path is not valid UTF-8".to_string()))?;
122
123 let pid = detach::spawn_detached(exe_str, &["node", "daemon", "run"])?;
124
125 let mut port = None;
127 for _ in 0..20 {
128 tokio::time::sleep(Duration::from_millis(100)).await;
129 if let Some(p) = read_port_file(&config.port_file_path) {
130 port = Some(p);
131 break;
132 }
133 }
134
135 Ok(DaemonStartResult {
136 already_running: false,
137 pid,
138 port,
139 })
140}
141
142pub async fn start_node(config: &DaemonConfig, node_id: u32) -> Result<NodeStarted> {
144 let port = read_port_file(&config.port_file_path).ok_or(Error::DaemonNotRunning)?;
145
146 let url = format!("http://127.0.0.1:{port}/api/v1/nodes/{node_id}/start");
147 let resp = reqwest::Client::new()
148 .post(&url)
149 .send()
150 .await
151 .map_err(|e| Error::HttpRequest(e.to_string()))?;
152
153 if resp.status().is_success() {
154 resp.json::<NodeStarted>()
155 .await
156 .map_err(|e| Error::HttpRequest(e.to_string()))
157 } else {
158 let body = resp.text().await.unwrap_or_default();
159 Err(Error::HttpRequest(body))
160 }
161}
162
163pub async fn start_all_nodes(config: &DaemonConfig) -> Result<StartNodeResult> {
165 let port = read_port_file(&config.port_file_path).ok_or(Error::DaemonNotRunning)?;
166
167 let url = format!("http://127.0.0.1:{port}/api/v1/nodes/start-all");
168 let resp = reqwest::Client::new()
169 .post(&url)
170 .send()
171 .await
172 .map_err(|e| Error::HttpRequest(e.to_string()))?;
173
174 if resp.status().is_success() {
175 resp.json::<StartNodeResult>()
176 .await
177 .map_err(|e| Error::HttpRequest(e.to_string()))
178 } else {
179 let body = resp.text().await.unwrap_or_default();
180 Err(Error::HttpRequest(body))
181 }
182}
183
184pub async fn stop_node(config: &DaemonConfig, node_id: u32) -> Result<NodeStopped> {
186 let port = read_port_file(&config.port_file_path).ok_or(Error::DaemonNotRunning)?;
187
188 let url = format!("http://127.0.0.1:{port}/api/v1/nodes/{node_id}/stop");
189 let resp = reqwest::Client::new()
190 .post(&url)
191 .send()
192 .await
193 .map_err(|e| Error::HttpRequest(e.to_string()))?;
194
195 if resp.status().is_success() {
196 resp.json::<NodeStopped>()
197 .await
198 .map_err(|e| Error::HttpRequest(e.to_string()))
199 } else {
200 let body = resp.text().await.unwrap_or_default();
201 Err(Error::HttpRequest(body))
202 }
203}
204
205pub async fn node_status(config: &DaemonConfig) -> Result<NodeStatusResult> {
207 let port = read_port_file(&config.port_file_path).ok_or(Error::DaemonNotRunning)?;
208
209 let url = format!("http://127.0.0.1:{port}/api/v1/nodes/status");
210 let resp = reqwest::get(&url)
211 .await
212 .map_err(|e| Error::HttpRequest(e.to_string()))?;
213
214 if resp.status().is_success() {
215 resp.json::<NodeStatusResult>()
216 .await
217 .map_err(|e| Error::HttpRequest(e.to_string()))
218 } else {
219 let body = resp.text().await.unwrap_or_default();
220 Err(Error::HttpRequest(body))
221 }
222}
223
224pub async fn stop_all_nodes(config: &DaemonConfig) -> Result<StopNodeResult> {
226 let port = read_port_file(&config.port_file_path).ok_or(Error::DaemonNotRunning)?;
227
228 let url = format!("http://127.0.0.1:{port}/api/v1/nodes/stop-all");
229 let resp = reqwest::Client::new()
230 .post(&url)
231 .send()
232 .await
233 .map_err(|e| Error::HttpRequest(e.to_string()))?;
234
235 if resp.status().is_success() {
236 resp.json::<StopNodeResult>()
237 .await
238 .map_err(|e| Error::HttpRequest(e.to_string()))
239 } else {
240 let body = resp.text().await.unwrap_or_default();
241 Err(Error::HttpRequest(body))
242 }
243}
244
245pub fn info(config: &DaemonConfig) -> DaemonInfo {
249 let pid = std::fs::read_to_string(&config.pid_file_path)
250 .ok()
251 .and_then(|s| s.trim().parse::<u32>().ok());
252
253 let port = read_port_file(&config.port_file_path);
254
255 let running = pid.is_some_and(is_process_alive);
256
257 DaemonInfo {
258 running,
259 pid,
260 port,
261 api_base: port.map(|p| format!("http://127.0.0.1:{p}/api/v1")),
262 }
263}
264
265pub async fn run(config: DaemonConfig) -> Result<()> {
269 use crate::node::daemon::server;
270 use crate::node::registry::NodeRegistry;
271
272 let registry = NodeRegistry::load(&config.registry_path)?;
273 let shutdown = tokio_util::sync::CancellationToken::new();
274
275 let shutdown_clone = shutdown.clone();
276 tokio::spawn(async move {
277 tokio::signal::ctrl_c().await.ok();
278 shutdown_clone.cancel();
279 });
280
281 let _addr = server::start(config, registry, shutdown.clone()).await?;
282
283 shutdown.cancelled().await;
284 tokio::time::sleep(Duration::from_millis(100)).await;
286
287 Ok(())
288}
289
#[cfg(unix)]
/// Heuristically checks that `pid` is the ant daemon rather than a process that reused its PID.
///
/// The process must have been launched from a binary named `ant` or `ant.exe`, and must have
/// `daemon` among its arguments. If `/proc/<pid>/cmdline` cannot be read (for example on a
/// system without procfs), the process cannot be inspected and is assumed to be the daemon.
fn validate_daemon_process(pid: u32) -> bool {
    let raw = match std::fs::read(format!("/proc/{pid}/cmdline")) {
        Ok(bytes) => bytes,
        Err(_) => return true,
    };

    // argv entries are NUL-separated; empty fragments (e.g. the trailing NUL) are skipped.
    let mut argv = raw
        .split(|&byte| byte == 0)
        .filter(|arg| !arg.is_empty())
        .map(String::from_utf8_lossy);

    let Some(program) = argv.next() else {
        return false;
    };
    let program_name = std::path::Path::new(&*program)
        .file_name()
        .and_then(|name| name.to_str());
    let is_ant_binary = matches!(program_name, Some("ant" | "ant.exe"));

    // argv[0] itself can never be "daemon" when it names an `ant` binary, so scanning only
    // the remaining arguments gives the same answer as scanning all of them.
    is_ant_binary && argv.any(|arg| arg == "daemon")
}
322
#[cfg(windows)]
/// Heuristically checks that `pid` is the ant daemon rather than a process that reused its PID.
///
/// The check passes when the process image's file stem is `ant`. The comparison ignores ASCII
/// case because Windows file names are case-insensitive, so `Ant.exe` is also accepted.
/// Returns `false` if the process cannot be opened or its image name cannot be queried.
fn validate_daemon_process(pid: u32) -> bool {
    use windows_sys::Win32::Foundation::CloseHandle;
    use windows_sys::Win32::System::Threading::{
        OpenProcess, QueryFullProcessImageNameW, PROCESS_QUERY_LIMITED_INFORMATION,
    };

    let mut buf = [0u16; 1024];
    let mut size = buf.len() as u32;

    // SAFETY: `OpenProcess` takes plain values. `handle` is checked for null before use and is
    // closed exactly once. `buf` is a live, writable buffer whose capacity in u16 units is passed
    // via `size`, and `size` is a valid, exclusive `&mut u32` for the duration of the call.
    let queried = unsafe {
        let handle = OpenProcess(PROCESS_QUERY_LIMITED_INFORMATION, 0, pid);
        if handle.is_null() {
            return false;
        }
        let ok = QueryFullProcessImageNameW(handle, 0, buf.as_mut_ptr(), &mut size);
        CloseHandle(handle);
        ok != 0
    };
    if !queried {
        return false;
    }

    // On success `size` holds the path length (excluding the terminator). Clamp it anyway so a
    // surprising value can never cause an out-of-bounds slice.
    let len = (size as usize).min(buf.len());
    let path = String::from_utf16_lossy(&buf[..len]);
    std::path::Path::new(&path)
        .file_stem()
        .and_then(|stem| stem.to_str())
        .is_some_and(|name| name.eq_ignore_ascii_case("ant"))
}
351
/// Reads the daemon's port from `path`, ignoring surrounding whitespace.
///
/// Returns `None` if the file cannot be read or does not contain a valid `u16`.
fn read_port_file(path: &Path) -> Option<u16> {
    let contents = std::fs::read_to_string(path).ok()?;
    contents.trim().parse::<u16>().ok()
}
357
358fn read_pid_file(path: &Path) -> Result<u32> {
359 let contents = std::fs::read_to_string(path).map_err(|_| Error::DaemonNotRunning)?;
360 contents
361 .trim()
362 .parse::<u32>()
363 .map_err(|_| Error::DaemonNotRunning)
364}
365
366fn check_running(pid_file: &Path) -> Option<u32> {
368 let pid = read_pid_file(pid_file).ok()?;
369 if is_process_alive(pid) {
370 Some(pid)
371 } else {
372 None
373 }
374}
375
#[cfg(unix)]
/// Converts a PID to the signed form libc expects, rejecting values that are not strictly
/// positive. Zero and negative values would target process groups in `kill(2)`.
fn pid_to_i32(pid: u32) -> Option<i32> {
    match i32::try_from(pid) {
        Ok(signed) if signed > 0 => Some(signed),
        _ => None,
    }
}
380
381#[cfg(unix)]
382fn send_terminate(pid: u32) {
383 if let Some(pid) = pid_to_i32(pid) {
384 unsafe {
385 libc::kill(pid, libc::SIGTERM);
386 }
387 }
388}
389
#[cfg(windows)]
/// Forcibly terminates `pid` with exit code 1.
///
/// If the process cannot be opened with terminate rights, nothing happens; the caller
/// observes the outcome by polling `is_process_alive`.
fn send_terminate(pid: u32) {
    use windows_sys::Win32::Foundation::CloseHandle;
    use windows_sys::Win32::System::Threading::{OpenProcess, TerminateProcess, PROCESS_TERMINATE};

    // SAFETY: `OpenProcess` takes plain values. The returned handle is checked for null before
    // use and is closed exactly once after `TerminateProcess`.
    unsafe {
        let process = OpenProcess(PROCESS_TERMINATE, 0, pid);
        if process.is_null() {
            return;
        }
        TerminateProcess(process, 1);
        CloseHandle(process);
    }
}
403
404#[cfg(unix)]
405fn is_process_alive(pid: u32) -> bool {
406 let Some(pid) = pid_to_i32(pid) else {
407 return false;
408 };
409 let ret = unsafe { libc::kill(pid, 0) };
410 if ret == 0 {
411 return true;
412 }
413 std::io::Error::last_os_error().raw_os_error() == Some(libc::EPERM)
415}
416
#[cfg(windows)]
/// Returns whether `pid` can be opened and has not yet exited.
///
/// Processes that cannot be opened with query rights are reported as not alive. A process
/// that really exited with code `STILL_ACTIVE` (259) is indistinguishable from a running one.
fn is_process_alive(pid: u32) -> bool {
    use windows_sys::Win32::Foundation::{CloseHandle, STILL_ACTIVE};
    use windows_sys::Win32::System::Threading::{
        GetExitCodeProcess, OpenProcess, PROCESS_QUERY_LIMITED_INFORMATION,
    };

    let mut exit_code: u32 = 0;

    // SAFETY: `OpenProcess` takes plain values. The handle is checked for null before use and
    // closed exactly once. `exit_code` is a valid, exclusive `&mut u32` for the duration of
    // `GetExitCodeProcess`.
    let queried = unsafe {
        let process = OpenProcess(PROCESS_QUERY_LIMITED_INFORMATION, 0, pid);
        if process.is_null() {
            return false;
        }
        let ok = GetExitCodeProcess(process, &mut exit_code);
        CloseHandle(process);
        ok != 0
    };

    queried && exit_code == STILL_ACTIVE as u32
}