Skip to main content

ant_core/node/daemon/
server.rs

1use std::net::SocketAddr;
2use std::path::PathBuf;
3use std::sync::Arc;
4use std::time::Instant;
5
6use axum::extract::{Path, State};
7use axum::http::StatusCode;
8use axum::response::sse::{Event, Sse};
9use axum::response::{Html, IntoResponse};
10use axum::routing::{get, post};
11use axum::{Json, Router};
12use tokio::sync::broadcast;
13use tokio::sync::RwLock;
14use tokio_util::sync::CancellationToken;
15
16use crate::error::Result;
17use crate::node::binary::NoopProgress;
18use crate::node::daemon::health::{DiskThresholds, FleetHealth};
19use crate::node::daemon::supervisor::{
20    spawn_eviction_monitor, spawn_liveness_monitor, spawn_upgrade_monitor, Supervisor,
21    EVICTION_POLL_INTERVAL, LIVENESS_POLL_INTERVAL, UPGRADE_POLL_INTERVAL,
22};
23use crate::node::events::NodeEvent;
24use crate::node::registry::NodeRegistry;
25use crate::node::types::{
26    AddNodeOpts, AddNodeResult, DaemonConfig, DaemonStatus, NodeInfo, NodeStarted, NodeStatus,
27    NodeStatusResult, NodeStatusSummary, NodeStopped, RemoveNodeResult, ResetResult,
28    StartNodeResult, StopNodeResult,
29};
30
31/// Shared application state for the daemon HTTP server.
32pub struct AppState {
33    pub registry: Arc<RwLock<NodeRegistry>>,
34    pub supervisor: Arc<RwLock<Supervisor>>,
35    pub event_tx: broadcast::Sender<NodeEvent>,
36    pub start_time: Instant,
37    pub config: DaemonConfig,
38    /// The actual address the server bound to (resolves port 0 to real port).
39    pub bound_port: u16,
40    /// Latest fleet health snapshot, refreshed by the eviction monitor and served at
41    /// `GET /api/v1/health`.
42    pub health: Arc<RwLock<FleetHealth>>,
43}
44
45/// Start the daemon HTTP server.
46///
47/// Returns the actual address the server bound to (useful when port is 0).
48pub async fn start(
49    config: DaemonConfig,
50    mut registry: NodeRegistry,
51    shutdown: CancellationToken,
52) -> Result<SocketAddr> {
53    let (event_tx, _) = broadcast::channel(256);
54
55    let addr = SocketAddr::new(config.listen_addr, config.port.unwrap_or(0));
56    let listener = tokio::net::TcpListener::bind(addr)
57        .await
58        .map_err(|e| crate::error::Error::BindError(e.to_string()))?;
59    let bound_addr = listener
60        .local_addr()
61        .map_err(|e| crate::error::Error::BindError(e.to_string()))?;
62
63    // Heal any stale `version` entries in the registry. If an earlier daemon ran without the
64    // upgrade-aware supervisor, the on-disk binary may have been replaced without the registry
65    // being updated. We re-read each binary's version and persist any differences before the
66    // supervisor comes up, so subsequent status queries reflect reality.
67    reconcile_registry_versions(&mut registry).await;
68
69    let registry = Arc::new(RwLock::new(registry));
70    let supervisor = Arc::new(RwLock::new(Supervisor::new(event_tx.clone())));
71
72    // Adopt node processes spawned by a previous daemon instance. Must run before
73    // `axum::serve` starts accepting requests — the window between supervisor
74    // creation and adoption is where `/api/v1/nodes/status` would otherwise report
75    // live nodes as Stopped (the supervisor's default when it has no runtime entry).
76    {
77        let reg = registry.read().await;
78        let mut sup = supervisor.write().await;
79        let adopted = sup.adopt_from_registry(&reg);
80        if !adopted.is_empty() {
81            tracing::info!(
82                "Adopted {} running node(s) from a previous daemon instance: {:?}",
83                adopted.len(),
84                adopted
85            );
86        }
87    }
88
89    let health = Arc::new(RwLock::new(FleetHealth::healthy()));
90
91    let state = Arc::new(AppState {
92        registry: registry.clone(),
93        supervisor: supervisor.clone(),
94        event_tx: event_tx.clone(),
95        start_time: Instant::now(),
96        config: config.clone(),
97        bound_port: bound_addr.port(),
98        health: health.clone(),
99    });
100
101    // Background task: probe each Running node's on-disk binary for version drift caused by
102    // ant-node's auto-upgrade, and flip them to UpgradeScheduled so the supervisor knows the
103    // next exit is expected.
104    spawn_upgrade_monitor(
105        registry.clone(),
106        supervisor.clone(),
107        UPGRADE_POLL_INTERVAL,
108        shutdown.clone(),
109    );
110
111    // Background task: monitor free disk space at node data directories. Refreshes the fleet health
112    // snapshot every tick and auto-evicts a node (smallest data dir) on any partition that has
113    // fallen to the eviction threshold while ≥2 nodes remain. The threshold is a fixed internal
114    // constant (mirroring ant-node's own refuse-to-store reserve), not user-configurable.
115    spawn_eviction_monitor(
116        registry.clone(),
117        supervisor.clone(),
118        event_tx.clone(),
119        health,
120        DiskThresholds::default(),
121        EVICTION_POLL_INTERVAL,
122        shutdown.clone(),
123    );
124
125    // Background task: poll adopted nodes' PIDs for OS liveness. Daemon-spawned nodes
126    // get exit detection via `monitor_node`'s owned `Child` handle; adopted nodes don't,
127    // so this poll is the only way the supervisor learns when one of them exits.
128    spawn_liveness_monitor(
129        registry,
130        supervisor,
131        event_tx,
132        LIVENESS_POLL_INTERVAL,
133        shutdown.clone(),
134    );
135
136    let app = build_router(state.clone());
137
138    // Write port and PID files
139    write_file(&config.port_file_path, &bound_addr.port().to_string())?;
140    write_file(&config.pid_file_path, &std::process::id().to_string())?;
141
142    let port_file = config.port_file_path.clone();
143    let pid_file = config.pid_file_path.clone();
144
145    tokio::spawn(async move {
146        axum::serve(listener, app)
147            .with_graceful_shutdown(shutdown.cancelled_owned())
148            .await
149            .ok();
150
151        // Clean up port and PID files on shutdown
152        let _ = std::fs::remove_file(&port_file);
153        let _ = std::fs::remove_file(&pid_file);
154    });
155
156    Ok(bound_addr)
157}
158
159fn build_router(state: Arc<AppState>) -> Router {
160    use axum::http::HeaderValue;
161    use tower_http::cors::{Any, CorsLayer};
162
163    // Restrict CORS to the daemon's own origin to prevent cross-origin CSRF
164    // attacks from malicious webpages. Non-browser clients (CLI, AI agents)
165    // don't send Origin headers so CORS doesn't affect them.
166    let origin = format!("http://127.0.0.1:{}", state.bound_port);
167    let cors = CorsLayer::new()
168        .allow_origin([origin.parse::<HeaderValue>().unwrap()])
169        .allow_methods(Any)
170        .allow_headers(Any);
171
172    Router::new()
173        .route("/console", get(get_console))
174        .route("/api/v1/status", get(get_status))
175        .route("/api/v1/health", get(get_health))
176        .route("/api/v1/events", get(get_events))
177        .route("/api/v1/nodes/status", get(get_nodes_status))
178        .route("/api/v1/nodes", post(post_nodes))
179        .route(
180            "/api/v1/nodes/{id}",
181            get(get_node_detail).delete(delete_node),
182        )
183        .route("/api/v1/nodes/{id}/start", post(post_start_node))
184        .route("/api/v1/nodes/start-all", post(post_start_all))
185        .route("/api/v1/nodes/{id}/stop", post(post_stop_node))
186        .route("/api/v1/nodes/stop-all", post(post_stop_all))
187        .route("/api/v1/reset", post(post_reset))
188        .route("/api/v1/openapi.json", get(get_openapi))
189        .layer(cors)
190        .with_state(state)
191}
192
193async fn get_status(State(state): State<Arc<AppState>>) -> Json<DaemonStatus> {
194    let registry = state.registry.read().await;
195    let supervisor = state.supervisor.read().await;
196    let (running, stopped, errored) = supervisor.node_counts();
197
198    Json(DaemonStatus {
199        running: true,
200        pid: Some(std::process::id()),
201        port: Some(state.bound_port),
202        uptime_secs: Some(state.start_time.elapsed().as_secs()),
203        nodes_total: registry.len() as u32,
204        nodes_running: running,
205        nodes_stopped: stopped,
206        nodes_errored: errored,
207    })
208}
209
210/// GET /api/v1/health — Current fleet health (overall level + per-check findings).
211///
212/// Refreshed by the eviction monitor; reflects disk pressure and the next eviction candidate.
213async fn get_health(State(state): State<Arc<AppState>>) -> Json<FleetHealth> {
214    Json(state.health.read().await.clone())
215}
216
217async fn get_events(
218    State(state): State<Arc<AppState>>,
219) -> Sse<impl futures_core::Stream<Item = std::result::Result<Event, std::convert::Infallible>>> {
220    let mut rx = state.event_tx.subscribe();
221
222    let stream = async_stream::stream! {
223        loop {
224            match rx.recv().await {
225                Ok(event) => {
226                    let event_type = event.event_type().to_string();
227                    if let Ok(data) = serde_json::to_string(&event) {
228                        yield Ok(Event::default().event(event_type).data(data));
229                    }
230                }
231                Err(broadcast::error::RecvError::Lagged(_)) => continue,
232                Err(broadcast::error::RecvError::Closed) => break,
233            }
234        }
235    };
236
237    Sse::new(stream)
238}
239
240/// GET /api/v1/nodes/status — Get status of all registered nodes.
241async fn get_nodes_status(State(state): State<Arc<AppState>>) -> Json<NodeStatusResult> {
242    let registry = state.registry.read().await;
243    let supervisor = state.supervisor.read().await;
244
245    let mut nodes = Vec::new();
246    let mut total_running = 0u32;
247    let mut total_stopped = 0u32;
248
249    for config in registry.list() {
250        // An evicted node has no live process: its persisted marker takes precedence over any
251        // runtime status the supervisor might still report.
252        let status = if config.eviction.is_some() {
253            NodeStatus::Evicted
254        } else {
255            supervisor
256                .node_status(config.id)
257                .unwrap_or(NodeStatus::Stopped)
258        };
259
260        match status {
261            NodeStatus::Running | NodeStatus::Starting | NodeStatus::UpgradeScheduled => {
262                total_running += 1
263            }
264            _ => total_stopped += 1,
265        }
266
267        let (pid, uptime_secs, pending_version) = if config.eviction.is_some() {
268            (None, None, None)
269        } else {
270            (
271                supervisor.node_pid(config.id),
272                supervisor.node_uptime_secs(config.id),
273                supervisor.node_pending_version(config.id),
274            )
275        };
276
277        nodes.push(NodeStatusSummary {
278            node_id: config.id,
279            name: config.service_name.clone(),
280            version: config.version.clone(),
281            status,
282            pid,
283            uptime_secs,
284            pending_version,
285            eviction: config.eviction.clone(),
286        });
287    }
288
289    Json(NodeStatusResult {
290        nodes,
291        total_running,
292        total_stopped,
293    })
294}
295
296/// GET /api/v1/nodes/:id — Get full detail for a single node.
297async fn get_node_detail(
298    State(state): State<Arc<AppState>>,
299    Path(id): Path<u32>,
300) -> std::result::Result<Json<NodeInfo>, (StatusCode, Json<serde_json::Value>)> {
301    let registry = state.registry.read().await;
302    let config = match registry.get(id) {
303        Ok(config) => config.clone(),
304        Err(_) => {
305            return Err((
306                StatusCode::NOT_FOUND,
307                Json(serde_json::json!({ "error": format!("Node not found: {id}") })),
308            ))
309        }
310    };
311
312    let supervisor = state.supervisor.read().await;
313    // A persisted eviction marker takes precedence over any runtime status.
314    let (status, pid, uptime_secs, pending_version) = if config.eviction.is_some() {
315        (NodeStatus::Evicted, None, None, None)
316    } else {
317        (
318            supervisor.node_status(id).unwrap_or(NodeStatus::Stopped),
319            supervisor.node_pid(id),
320            supervisor.node_uptime_secs(id),
321            supervisor.node_pending_version(id),
322        )
323    };
324
325    Ok(Json(NodeInfo {
326        config,
327        status,
328        pid,
329        uptime_secs,
330        pending_version,
331    }))
332}
333
334/// POST /api/v1/nodes — Add one or more nodes to the registry.
335async fn post_nodes(
336    State(state): State<Arc<AppState>>,
337    Json(opts): Json<AddNodeOpts>,
338) -> std::result::Result<(StatusCode, Json<AddNodeResult>), (StatusCode, Json<serde_json::Value>)> {
339    let registry_path = state.config.registry_path.clone();
340    let progress = NoopProgress;
341
342    match crate::node::add_nodes(opts, &registry_path, &progress).await {
343        Ok(result) => {
344            // Update the in-memory registry to stay in sync
345            let mut registry = state.registry.write().await;
346            if let Ok(fresh) = NodeRegistry::load(&registry_path) {
347                *registry = fresh;
348            }
349            Ok((StatusCode::CREATED, Json(result)))
350        }
351        Err(e) => Err((
352            StatusCode::BAD_REQUEST,
353            Json(serde_json::json!({ "error": e.to_string() })),
354        )),
355    }
356}
357
358/// DELETE /api/v1/nodes/:id — Remove a node from the registry.
359async fn delete_node(
360    State(state): State<Arc<AppState>>,
361    Path(id): Path<u32>,
362) -> std::result::Result<Json<RemoveNodeResult>, (StatusCode, Json<serde_json::Value>)> {
363    // Prevent removing a running node (would orphan the process)
364    let supervisor = state.supervisor.read().await;
365    if supervisor.is_running(id) {
366        return Err((
367            StatusCode::CONFLICT,
368            Json(serde_json::json!({
369                "error": format!("Cannot remove node {id} while it is running. Stop it first."),
370                "current_state": { "node_id": id, "status": "running" }
371            })),
372        ));
373    }
374    drop(supervisor);
375
376    let registry_path = state.config.registry_path.clone();
377
378    match crate::node::remove_node(id, &registry_path) {
379        Ok(result) => {
380            // Update the in-memory registry to stay in sync
381            let mut registry = state.registry.write().await;
382            if let Ok(fresh) = NodeRegistry::load(&registry_path) {
383                *registry = fresh;
384            }
385            Ok(Json(result))
386        }
387        Err(crate::error::Error::NodeNotFound(id)) => Err((
388            StatusCode::NOT_FOUND,
389            Json(serde_json::json!({ "error": format!("Node not found: {id}") })),
390        )),
391        Err(e) => Err((
392            StatusCode::INTERNAL_SERVER_ERROR,
393            Json(serde_json::json!({ "error": e.to_string() })),
394        )),
395    }
396}
397
398/// POST /api/v1/nodes/:id/start — Start a specific node.
399async fn post_start_node(
400    State(state): State<Arc<AppState>>,
401    Path(id): Path<u32>,
402) -> std::result::Result<Json<NodeStarted>, (StatusCode, Json<serde_json::Value>)> {
403    let registry = state.registry.read().await;
404    let config = match registry.get(id) {
405        Ok(config) => config.clone(),
406        Err(_) => {
407            return Err((
408                StatusCode::NOT_FOUND,
409                Json(serde_json::json!({ "error": format!("Node not found: {id}") })),
410            ))
411        }
412    };
413    drop(registry);
414
415    // An evicted node's data directory is gone; refuse to start it (recovery is to dismiss and
416    // re-add, not restart).
417    if config.eviction.is_some() {
418        return Err((
419            StatusCode::CONFLICT,
420            Json(serde_json::json!({
421                "error": format!(
422                    "Node {id} has been evicted and cannot be started. Dismiss it with \
423                     `ant node dismiss {id}` and add a new node instead."
424                ),
425                "current_state": { "node_id": id, "status": "evicted" }
426            })),
427        ));
428    }
429
430    let supervisor_ref = state.supervisor.clone();
431
432    // Acquire write lock once for atomic check-and-act (avoids TOCTOU race)
433    let mut supervisor = state.supervisor.write().await;
434    if supervisor.is_running(id) {
435        let pid = supervisor.node_pid(id);
436        let uptime_secs = supervisor.node_uptime_secs(id);
437        return Err((
438            StatusCode::CONFLICT,
439            Json(serde_json::json!({
440                "error": format!("Node {id} is already running"),
441                "current_state": {
442                    "node_id": id,
443                    "status": "running",
444                    "pid": pid,
445                    "uptime_secs": uptime_secs,
446                }
447            })),
448        ));
449    }
450
451    let registry_ref = state.registry.clone();
452    match supervisor
453        .start_node(&config, supervisor_ref, registry_ref)
454        .await
455    {
456        Ok(started) => Ok(Json(started)),
457        Err(crate::error::Error::NodeAlreadyRunning(id)) => {
458            let pid = supervisor.node_pid(id);
459            let uptime_secs = supervisor.node_uptime_secs(id);
460            Err((
461                StatusCode::CONFLICT,
462                Json(serde_json::json!({
463                    "error": format!("Node {id} is already running"),
464                    "current_state": {
465                        "node_id": id,
466                        "status": "running",
467                        "pid": pid,
468                        "uptime_secs": uptime_secs,
469                    }
470                })),
471            ))
472        }
473        Err(e) => Err((
474            StatusCode::INTERNAL_SERVER_ERROR,
475            Json(serde_json::json!({ "error": e.to_string() })),
476        )),
477    }
478}
479
480/// POST /api/v1/nodes/start-all — Start all registered nodes.
481async fn post_start_all(State(state): State<Arc<AppState>>) -> Json<StartNodeResult> {
482    let registry = state.registry.read().await;
483    // Evicted nodes have no data directory; skip them silently rather than attempting a spawn that
484    // would fail. They remain visible as `Evicted` in status until dismissed.
485    let configs: Vec<_> = registry
486        .list()
487        .into_iter()
488        .filter(|c| c.eviction.is_none())
489        .cloned()
490        .collect();
491    drop(registry);
492
493    let mut started = Vec::new();
494    let mut failed = Vec::new();
495    let mut already_running = Vec::new();
496
497    let supervisor_ref = state.supervisor.clone();
498    let registry_ref = state.registry.clone();
499
500    for config in &configs {
501        let mut supervisor = state.supervisor.write().await;
502        if supervisor.is_running(config.id) {
503            already_running.push(config.id);
504            continue;
505        }
506
507        match supervisor
508            .start_node(config, supervisor_ref.clone(), registry_ref.clone())
509            .await
510        {
511            Ok(result) => started.push(result),
512            Err(crate::error::Error::NodeAlreadyRunning(id)) => {
513                already_running.push(id);
514            }
515            Err(e) => {
516                failed.push(crate::node::types::NodeStartFailed {
517                    node_id: config.id,
518                    service_name: config.service_name.clone(),
519                    error: e.to_string(),
520                });
521            }
522        }
523    }
524
525    Json(StartNodeResult {
526        started,
527        failed,
528        already_running,
529    })
530}
531
532/// POST /api/v1/nodes/:id/stop — Stop a specific node.
533async fn post_stop_node(
534    State(state): State<Arc<AppState>>,
535    Path(id): Path<u32>,
536) -> std::result::Result<Json<NodeStopped>, (StatusCode, Json<serde_json::Value>)> {
537    let registry = state.registry.read().await;
538    let config = match registry.get(id) {
539        Ok(config) => config.clone(),
540        Err(_) => {
541            return Err((
542                StatusCode::NOT_FOUND,
543                Json(serde_json::json!({ "error": format!("Node not found: {id}") })),
544            ))
545        }
546    };
547    drop(registry);
548
549    // An evicted node is already stopped and its data directory deleted; there is nothing to stop.
550    if config.eviction.is_some() {
551        return Err((
552            StatusCode::CONFLICT,
553            Json(serde_json::json!({
554                "error": format!(
555                    "Node {id} has been evicted; there is nothing to stop. Dismiss it with \
556                     `ant node dismiss {id}`."
557                ),
558                "current_state": { "node_id": id, "status": "evicted" }
559            })),
560        ));
561    }
562
563    // Acquire write lock once for atomic check-and-act (avoids TOCTOU race)
564    let mut supervisor = state.supervisor.write().await;
565    if !supervisor.is_running(id) {
566        let status = supervisor
567            .node_status(id)
568            .unwrap_or(crate::node::types::NodeStatus::Stopped);
569        return Err((
570            StatusCode::CONFLICT,
571            Json(serde_json::json!({
572                "error": format!("Node {id} is not running"),
573                "current_state": {
574                    "node_id": id,
575                    "status": status,
576                }
577            })),
578        ));
579    }
580
581    match supervisor.stop_node(id).await {
582        Ok(()) => Ok(Json(NodeStopped {
583            node_id: id,
584            service_name: config.service_name,
585        })),
586        Err(crate::error::Error::NodeNotRunning(id)) => {
587            let status = supervisor
588                .node_status(id)
589                .unwrap_or(crate::node::types::NodeStatus::Stopped);
590            Err((
591                StatusCode::CONFLICT,
592                Json(serde_json::json!({
593                    "error": format!("Node {id} is not running"),
594                    "current_state": {
595                        "node_id": id,
596                        "status": status,
597                    }
598                })),
599            ))
600        }
601        Err(e) => Err((
602            StatusCode::INTERNAL_SERVER_ERROR,
603            Json(serde_json::json!({ "error": e.to_string() })),
604        )),
605    }
606}
607
608/// POST /api/v1/nodes/stop-all — Stop all running nodes.
609async fn post_stop_all(State(state): State<Arc<AppState>>) -> Json<StopNodeResult> {
610    let registry = state.registry.read().await;
611    // Skip evicted nodes — there is nothing to stop, and they should stay `Evicted` until dismissed.
612    let configs: Vec<(u32, String)> = registry
613        .list()
614        .into_iter()
615        .filter(|c| c.eviction.is_none())
616        .map(|c| (c.id, c.service_name.clone()))
617        .collect();
618    drop(registry);
619
620    let mut supervisor = state.supervisor.write().await;
621    let result = supervisor.stop_all_nodes(&configs).await;
622
623    Json(result)
624}
625
626/// POST /api/v1/reset — Reset all node state.
627async fn post_reset(
628    State(state): State<Arc<AppState>>,
629) -> std::result::Result<Json<ResetResult>, (StatusCode, Json<serde_json::Value>)> {
630    // Hold write lock for atomic check-and-act (prevents nodes being started
631    // between the running check and the reset operation)
632    let supervisor = state.supervisor.write().await;
633    let (running, _, _) = supervisor.node_counts();
634    if running > 0 {
635        return Err((
636            StatusCode::CONFLICT,
637            Json(serde_json::json!({
638                "error": format!("Cannot reset while nodes are running ({running} node(s) still running). Stop all nodes first."),
639                "nodes_running": running,
640            })),
641        ));
642    }
643    drop(supervisor);
644
645    let registry_path = state.config.registry_path.clone();
646
647    match crate::node::reset(&registry_path) {
648        Ok(result) => {
649            // Update the in-memory registry to stay in sync
650            let mut registry = state.registry.write().await;
651            if let Ok(fresh) = NodeRegistry::load(&registry_path) {
652                *registry = fresh;
653            }
654            Ok(Json(result))
655        }
656        Err(e) => Err((
657            StatusCode::INTERNAL_SERVER_ERROR,
658            Json(serde_json::json!({ "error": e.to_string() })),
659        )),
660    }
661}
662
663async fn get_openapi() -> impl IntoResponse {
664    // TODO: Migrate to utoipa-generated OpenAPI spec. Types already derive
665    // utoipa::ToSchema but this spec is still hand-written JSON.
666    let spec = serde_json::json!({
667        "openapi": "3.1.0",
668        "info": {
669            "title": "Ant Daemon API",
670            "version": "0.1.0",
671            "description": "REST API for the ant node management daemon"
672        },
673        "paths": {
674            "/api/v1/status": {
675                "get": {
676                    "summary": "Daemon status",
677                    "description": "Returns daemon health, uptime, and node count summary",
678                    "responses": {
679                        "200": {
680                            "description": "Daemon status",
681                            "content": {
682                                "application/json": {
683                                    "schema": { "$ref": "#/components/schemas/DaemonStatus" }
684                                }
685                            }
686                        }
687                    }
688                }
689            },
690            "/api/v1/events": {
691                "get": {
692                    "summary": "Event stream",
693                    "description": "SSE stream of real-time node events",
694                    "responses": {
695                        "200": {
696                            "description": "SSE event stream"
697                        }
698                    }
699                }
700            },
701            "/api/v1/nodes": {
702                "post": {
703                    "summary": "Add nodes",
704                    "description": "Add one or more nodes to the registry",
705                    "requestBody": {
706                        "required": true,
707                        "content": {
708                            "application/json": {
709                                "schema": { "$ref": "#/components/schemas/AddNodeOpts" }
710                            }
711                        }
712                    },
713                    "responses": {
714                        "201": {
715                            "description": "Nodes added",
716                            "content": {
717                                "application/json": {
718                                    "schema": { "$ref": "#/components/schemas/AddNodeResult" }
719                                }
720                            }
721                        },
722                        "400": {
723                            "description": "Invalid request"
724                        }
725                    }
726                }
727            },
728            "/api/v1/nodes/{id}": {
729                "delete": {
730                    "summary": "Remove node",
731                    "description": "Remove a node from the registry",
732                    "parameters": [{
733                        "name": "id",
734                        "in": "path",
735                        "required": true,
736                        "schema": { "type": "integer" }
737                    }],
738                    "responses": {
739                        "200": {
740                            "description": "Node removed",
741                            "content": {
742                                "application/json": {
743                                    "schema": { "$ref": "#/components/schemas/RemoveNodeResult" }
744                                }
745                            }
746                        },
747                        "404": {
748                            "description": "Node not found"
749                        }
750                    }
751                }
752            },
753            "/api/v1/nodes/{id}/start": {
754                "post": {
755                    "summary": "Start a node",
756                    "description": "Start a specific node by ID. Returns 409 if already running with current_state.",
757                    "parameters": [{
758                        "name": "id",
759                        "in": "path",
760                        "required": true,
761                        "schema": { "type": "integer" }
762                    }],
763                    "responses": {
764                        "200": {
765                            "description": "Node started",
766                            "content": {
767                                "application/json": {
768                                    "schema": { "$ref": "#/components/schemas/NodeStarted" }
769                                }
770                            }
771                        },
772                        "404": {
773                            "description": "Node not found"
774                        },
775                        "409": {
776                            "description": "Node already running (includes current_state)"
777                        },
778                        "500": {
779                            "description": "Failed to start node"
780                        }
781                    }
782                }
783            },
784            "/api/v1/nodes/start-all": {
785                "post": {
786                    "summary": "Start all nodes",
787                    "description": "Start all registered nodes. Returns per-node results.",
788                    "responses": {
789                        "200": {
790                            "description": "Start results",
791                            "content": {
792                                "application/json": {
793                                    "schema": { "$ref": "#/components/schemas/StartNodeResult" }
794                                }
795                            }
796                        }
797                    }
798                }
799            },
800            "/api/v1/nodes/{id}/stop": {
801                "post": {
802                    "summary": "Stop a node",
803                    "description": "Stop a specific node by ID. Returns 409 if already stopped with current_state.",
804                    "parameters": [{
805                        "name": "id",
806                        "in": "path",
807                        "required": true,
808                        "schema": { "type": "integer" }
809                    }],
810                    "responses": {
811                        "200": {
812                            "description": "Node stopped",
813                            "content": {
814                                "application/json": {
815                                    "schema": { "$ref": "#/components/schemas/NodeStopped" }
816                                }
817                            }
818                        },
819                        "404": {
820                            "description": "Node not found"
821                        },
822                        "409": {
823                            "description": "Node not running (includes current_state)"
824                        },
825                        "500": {
826                            "description": "Failed to stop node"
827                        }
828                    }
829                }
830            },
831            "/api/v1/nodes/stop-all": {
832                "post": {
833                    "summary": "Stop all nodes",
834                    "description": "Stop all running nodes. Returns per-node results.",
835                    "responses": {
836                        "200": {
837                            "description": "Stop results",
838                            "content": {
839                                "application/json": {
840                                    "schema": { "$ref": "#/components/schemas/StopNodeResult" }
841                                }
842                            }
843                        }
844                    }
845                }
846            },
847            "/api/v1/reset": {
848                "post": {
849                    "summary": "Reset all node state",
850                    "description": "Remove all node data directories, log directories, and clear the registry. Fails if any nodes are running.",
851                    "responses": {
852                        "200": {
853                            "description": "Reset successful",
854                            "content": {
855                                "application/json": {
856                                    "schema": { "$ref": "#/components/schemas/ResetResult" }
857                                }
858                            }
859                        },
860                        "409": {
861                            "description": "Nodes still running"
862                        }
863                    }
864                }
865            }
866        },
867        "components": {
868            "schemas": {
869                "DaemonStatus": {
870                    "type": "object",
871                    "properties": {
872                        "running": { "type": "boolean" },
873                        "pid": { "type": "integer", "nullable": true },
874                        "port": { "type": "integer", "nullable": true },
875                        "uptime_secs": { "type": "integer", "nullable": true },
876                        "nodes_total": { "type": "integer" },
877                        "nodes_running": { "type": "integer" },
878                        "nodes_stopped": { "type": "integer" },
879                        "nodes_errored": { "type": "integer" }
880                    }
881                }
882            }
883        }
884    });
885    Json(spec)
886}
887
888async fn get_console() -> Html<&'static str> {
889    Html(include_str!("console.html"))
890}
891
/// Write `contents` to the file at `path`, creating missing parent directories first.
///
/// # Errors
///
/// Returns an error if the parent directories cannot be created or if the file itself
/// cannot be written.
fn write_file(path: &PathBuf, contents: &str) -> Result<()> {
    // `parent()` is `None` for a root or empty path; in that case there is nothing to create.
    path.parent().map(std::fs::create_dir_all).transpose()?;
    std::fs::write(path, contents)?;
    Ok(())
}
899
900/// Refresh each registered node's `version` against what its on-disk binary reports.
901///
902/// Intended as a one-time pass at daemon startup to heal registries left in a stale state by
903/// earlier daemon versions that didn't track auto-upgrades. Missing binaries and transient
904/// `--version` failures are silently skipped so daemon startup never aborts on this.
905async fn reconcile_registry_versions(registry: &mut NodeRegistry) {
906    let node_ids: Vec<u32> = registry.list().iter().map(|c| c.id).collect();
907    let mut changed = false;
908
909    for id in node_ids {
910        let (binary_path, recorded_version) = match registry.get(id) {
911            Ok(c) => (c.binary_path.clone(), c.version.clone()),
912            Err(_) => continue,
913        };
914
915        if !binary_path.exists() {
916            continue;
917        }
918
919        let Ok(disk_version) = crate::node::binary::extract_version(&binary_path).await else {
920            continue;
921        };
922
923        if disk_version == recorded_version {
924            continue;
925        }
926
927        if let Ok(entry) = registry.get_mut(id) {
928            entry.version = disk_version;
929            changed = true;
930        }
931    }
932
933    if changed {
934        let _ = registry.save();
935    }
936}
937
#[cfg(all(test, unix))]
mod tests {
    use super::*;
    use crate::node::registry::NodeRegistry;
    use crate::node::types::{EvmNetwork, NodeConfig};
    use std::collections::HashMap;
    use std::os::unix::fs::PermissionsExt;

    /// Create an executable `/bin/sh` script at `path` that prints `stdout` followed by a
    /// newline, standing in for a real node binary.
    fn write_fake_binary(path: &std::path::Path, stdout: &str) {
        let body = format!("#!/bin/sh\nprintf '%s\\n' '{stdout}'\n");
        std::fs::write(path, body).unwrap();
        std::fs::set_permissions(path, std::fs::Permissions::from_mode(0o755)).unwrap();
    }

    /// Build a minimal `NodeConfig` pointing at `binary_path` with the given recorded
    /// `version`; every other field is left empty or at its default.
    fn seed_config(binary_path: PathBuf, version: &str, data_dir: PathBuf) -> NodeConfig {
        NodeConfig {
            id: 0,
            service_name: String::new(),
            rewards_address: "0x0".into(),
            data_dir,
            log_dir: None,
            node_port: None,
            binary_path,
            version: version.into(),
            env_variables: HashMap::new(),
            bootstrap_peers: Vec::new(),
            upgrade_channel: None,
            evm_network: EvmNetwork::default(),
            eviction: None,
        }
    }

    /// A recorded version that differs from the binary's output is replaced in memory and
    /// written back to the registry file.
    #[tokio::test]
    async fn reconcile_updates_stale_version_and_persists() {
        let scratch = tempfile::tempdir().unwrap();
        let root = scratch.path();
        let registry_file = root.join("registry.json");
        let binary = root.join("ant-node");
        write_fake_binary(&binary, "ant-node 0.10.11-rc.1");

        let mut registry = NodeRegistry::load(&registry_file).unwrap();
        let stale = seed_config(binary, "0.10.1", root.join("data"));
        let id = registry.add(stale);
        registry.save().unwrap();

        reconcile_registry_versions(&mut registry).await;

        assert_eq!(registry.get(id).unwrap().version, "0.10.11-rc.1");

        let from_disk = NodeRegistry::load(&registry_file).unwrap();
        assert_eq!(from_disk.get(id).unwrap().version, "0.10.11-rc.1");
    }

    /// A recorded version that already matches the binary's output stays untouched.
    #[tokio::test]
    async fn reconcile_leaves_matching_version_alone() {
        let scratch = tempfile::tempdir().unwrap();
        let root = scratch.path();
        let registry_file = root.join("registry.json");
        let binary = root.join("ant-node");
        write_fake_binary(&binary, "ant-node 0.10.1");

        let mut registry = NodeRegistry::load(&registry_file).unwrap();
        let current = seed_config(binary, "0.10.1", root.join("data"));
        let id = registry.add(current);

        reconcile_registry_versions(&mut registry).await;

        assert_eq!(registry.get(id).unwrap().version, "0.10.1");
    }

    /// A node whose binary path does not exist is skipped and keeps its recorded version.
    #[tokio::test]
    async fn reconcile_skips_missing_binary() {
        let scratch = tempfile::tempdir().unwrap();
        let root = scratch.path();
        let registry_file = root.join("registry.json");

        let mut registry = NodeRegistry::load(&registry_file).unwrap();
        let orphan = seed_config(root.join("does-not-exist"), "0.10.1", root.join("data"));
        let id = registry.add(orphan);

        reconcile_registry_versions(&mut registry).await;

        assert_eq!(registry.get(id).unwrap().version, "0.10.1");
    }
}