Skip to main content

ant_core/node/daemon/
server.rs

1use std::net::SocketAddr;
2use std::path::PathBuf;
3use std::sync::Arc;
4use std::time::Instant;
5
6use axum::extract::{Path, State};
7use axum::http::StatusCode;
8use axum::response::sse::{Event, Sse};
9use axum::response::{Html, IntoResponse};
10use axum::routing::{get, post};
11use axum::{Json, Router};
12use tokio::sync::broadcast;
13use tokio::sync::RwLock;
14use tokio_util::sync::CancellationToken;
15
16use crate::error::Result;
17use crate::node::binary::NoopProgress;
18use crate::node::daemon::supervisor::{
19    spawn_liveness_monitor, spawn_upgrade_monitor, Supervisor, LIVENESS_POLL_INTERVAL,
20    UPGRADE_POLL_INTERVAL,
21};
22use crate::node::events::NodeEvent;
23use crate::node::registry::NodeRegistry;
24use crate::node::types::{
25    AddNodeOpts, AddNodeResult, DaemonConfig, DaemonStatus, NodeInfo, NodeStarted, NodeStatus,
26    NodeStatusResult, NodeStatusSummary, NodeStopped, RemoveNodeResult, ResetResult,
27    StartNodeResult, StopNodeResult,
28};
29
/// Shared application state for the daemon HTTP server.
///
/// A single instance is built in `start`, wrapped in an `Arc`, and handed to the router so
/// every handler can reach it through the `State` extractor.
pub struct AppState {
    /// Node registry; `start` also hands clones of this handle to the background monitors.
    pub registry: Arc<RwLock<NodeRegistry>>,
    /// Node supervisor; `start` also hands clones of this handle to the background monitors.
    pub supervisor: Arc<RwLock<Supervisor>>,
    /// Sender half of the node-event broadcast channel; `/api/v1/events` subscribes to it.
    pub event_tx: broadcast::Sender<NodeEvent>,
    /// Instant captured when the state was constructed; used to report daemon uptime.
    pub start_time: Instant,
    /// Daemon configuration; handlers read `registry_path` from it.
    pub config: DaemonConfig,
    /// The actual port the server bound to (resolves a requested port of 0 to the real port).
    pub bound_port: u16,
}
40
41/// Start the daemon HTTP server.
42///
43/// Returns the actual address the server bound to (useful when port is 0).
44pub async fn start(
45    config: DaemonConfig,
46    mut registry: NodeRegistry,
47    shutdown: CancellationToken,
48) -> Result<SocketAddr> {
49    let (event_tx, _) = broadcast::channel(256);
50
51    let addr = SocketAddr::new(config.listen_addr, config.port.unwrap_or(0));
52    let listener = tokio::net::TcpListener::bind(addr)
53        .await
54        .map_err(|e| crate::error::Error::BindError(e.to_string()))?;
55    let bound_addr = listener
56        .local_addr()
57        .map_err(|e| crate::error::Error::BindError(e.to_string()))?;
58
59    // Heal any stale `version` entries in the registry. If an earlier daemon ran without the
60    // upgrade-aware supervisor, the on-disk binary may have been replaced without the registry
61    // being updated. We re-read each binary's version and persist any differences before the
62    // supervisor comes up, so subsequent status queries reflect reality.
63    reconcile_registry_versions(&mut registry).await;
64
65    let registry = Arc::new(RwLock::new(registry));
66    let supervisor = Arc::new(RwLock::new(Supervisor::new(event_tx.clone())));
67
68    // Adopt node processes spawned by a previous daemon instance. Must run before
69    // `axum::serve` starts accepting requests — the window between supervisor
70    // creation and adoption is where `/api/v1/nodes/status` would otherwise report
71    // live nodes as Stopped (the supervisor's default when it has no runtime entry).
72    {
73        let reg = registry.read().await;
74        let mut sup = supervisor.write().await;
75        let adopted = sup.adopt_from_registry(&reg);
76        if !adopted.is_empty() {
77            tracing::info!(
78                "Adopted {} running node(s) from a previous daemon instance: {:?}",
79                adopted.len(),
80                adopted
81            );
82        }
83    }
84
85    let state = Arc::new(AppState {
86        registry: registry.clone(),
87        supervisor: supervisor.clone(),
88        event_tx: event_tx.clone(),
89        start_time: Instant::now(),
90        config: config.clone(),
91        bound_port: bound_addr.port(),
92    });
93
94    // Background task: probe each Running node's on-disk binary for version drift caused by
95    // ant-node's auto-upgrade, and flip them to UpgradeScheduled so the supervisor knows the
96    // next exit is expected.
97    spawn_upgrade_monitor(
98        registry.clone(),
99        supervisor.clone(),
100        UPGRADE_POLL_INTERVAL,
101        shutdown.clone(),
102    );
103
104    // Background task: poll adopted nodes' PIDs for OS liveness. Daemon-spawned nodes
105    // get exit detection via `monitor_node`'s owned `Child` handle; adopted nodes don't,
106    // so this poll is the only way the supervisor learns when one of them exits.
107    spawn_liveness_monitor(
108        registry,
109        supervisor,
110        event_tx,
111        LIVENESS_POLL_INTERVAL,
112        shutdown.clone(),
113    );
114
115    let app = build_router(state.clone());
116
117    // Write port and PID files
118    write_file(&config.port_file_path, &bound_addr.port().to_string())?;
119    write_file(&config.pid_file_path, &std::process::id().to_string())?;
120
121    let port_file = config.port_file_path.clone();
122    let pid_file = config.pid_file_path.clone();
123
124    tokio::spawn(async move {
125        axum::serve(listener, app)
126            .with_graceful_shutdown(shutdown.cancelled_owned())
127            .await
128            .ok();
129
130        // Clean up port and PID files on shutdown
131        let _ = std::fs::remove_file(&port_file);
132        let _ = std::fs::remove_file(&pid_file);
133    });
134
135    Ok(bound_addr)
136}
137
138fn build_router(state: Arc<AppState>) -> Router {
139    use axum::http::HeaderValue;
140    use tower_http::cors::{Any, CorsLayer};
141
142    // Restrict CORS to the daemon's own origin to prevent cross-origin CSRF
143    // attacks from malicious webpages. Non-browser clients (CLI, AI agents)
144    // don't send Origin headers so CORS doesn't affect them.
145    let origin = format!("http://127.0.0.1:{}", state.bound_port);
146    let cors = CorsLayer::new()
147        .allow_origin([origin.parse::<HeaderValue>().unwrap()])
148        .allow_methods(Any)
149        .allow_headers(Any);
150
151    Router::new()
152        .route("/console", get(get_console))
153        .route("/api/v1/status", get(get_status))
154        .route("/api/v1/events", get(get_events))
155        .route("/api/v1/nodes/status", get(get_nodes_status))
156        .route("/api/v1/nodes", post(post_nodes))
157        .route(
158            "/api/v1/nodes/{id}",
159            get(get_node_detail).delete(delete_node),
160        )
161        .route("/api/v1/nodes/{id}/start", post(post_start_node))
162        .route("/api/v1/nodes/start-all", post(post_start_all))
163        .route("/api/v1/nodes/{id}/stop", post(post_stop_node))
164        .route("/api/v1/nodes/stop-all", post(post_stop_all))
165        .route("/api/v1/reset", post(post_reset))
166        .route("/api/v1/openapi.json", get(get_openapi))
167        .layer(cors)
168        .with_state(state)
169}
170
171async fn get_status(State(state): State<Arc<AppState>>) -> Json<DaemonStatus> {
172    let registry = state.registry.read().await;
173    let supervisor = state.supervisor.read().await;
174    let (running, stopped, errored) = supervisor.node_counts();
175
176    Json(DaemonStatus {
177        running: true,
178        pid: Some(std::process::id()),
179        port: Some(state.bound_port),
180        uptime_secs: Some(state.start_time.elapsed().as_secs()),
181        nodes_total: registry.len() as u32,
182        nodes_running: running,
183        nodes_stopped: stopped,
184        nodes_errored: errored,
185    })
186}
187
188async fn get_events(
189    State(state): State<Arc<AppState>>,
190) -> Sse<impl futures_core::Stream<Item = std::result::Result<Event, std::convert::Infallible>>> {
191    let mut rx = state.event_tx.subscribe();
192
193    let stream = async_stream::stream! {
194        loop {
195            match rx.recv().await {
196                Ok(event) => {
197                    let event_type = event.event_type().to_string();
198                    if let Ok(data) = serde_json::to_string(&event) {
199                        yield Ok(Event::default().event(event_type).data(data));
200                    }
201                }
202                Err(broadcast::error::RecvError::Lagged(_)) => continue,
203                Err(broadcast::error::RecvError::Closed) => break,
204            }
205        }
206    };
207
208    Sse::new(stream)
209}
210
211/// GET /api/v1/nodes/status — Get status of all registered nodes.
212async fn get_nodes_status(State(state): State<Arc<AppState>>) -> Json<NodeStatusResult> {
213    let registry = state.registry.read().await;
214    let supervisor = state.supervisor.read().await;
215
216    let mut nodes = Vec::new();
217    let mut total_running = 0u32;
218    let mut total_stopped = 0u32;
219
220    for config in registry.list() {
221        let status = supervisor
222            .node_status(config.id)
223            .unwrap_or(NodeStatus::Stopped);
224
225        match status {
226            NodeStatus::Running | NodeStatus::Starting | NodeStatus::UpgradeScheduled => {
227                total_running += 1
228            }
229            _ => total_stopped += 1,
230        }
231
232        let pid = supervisor.node_pid(config.id);
233        let uptime_secs = supervisor.node_uptime_secs(config.id);
234        let pending_version = supervisor.node_pending_version(config.id);
235
236        nodes.push(NodeStatusSummary {
237            node_id: config.id,
238            name: config.service_name.clone(),
239            version: config.version.clone(),
240            status,
241            pid,
242            uptime_secs,
243            pending_version,
244        });
245    }
246
247    Json(NodeStatusResult {
248        nodes,
249        total_running,
250        total_stopped,
251    })
252}
253
254/// GET /api/v1/nodes/:id — Get full detail for a single node.
255async fn get_node_detail(
256    State(state): State<Arc<AppState>>,
257    Path(id): Path<u32>,
258) -> std::result::Result<Json<NodeInfo>, (StatusCode, Json<serde_json::Value>)> {
259    let registry = state.registry.read().await;
260    let config = match registry.get(id) {
261        Ok(config) => config.clone(),
262        Err(_) => {
263            return Err((
264                StatusCode::NOT_FOUND,
265                Json(serde_json::json!({ "error": format!("Node not found: {id}") })),
266            ))
267        }
268    };
269
270    let supervisor = state.supervisor.read().await;
271    let status = supervisor.node_status(id).unwrap_or(NodeStatus::Stopped);
272    let pid = supervisor.node_pid(id);
273    let uptime_secs = supervisor.node_uptime_secs(id);
274    let pending_version = supervisor.node_pending_version(id);
275
276    Ok(Json(NodeInfo {
277        config,
278        status,
279        pid,
280        uptime_secs,
281        pending_version,
282    }))
283}
284
285/// POST /api/v1/nodes — Add one or more nodes to the registry.
286async fn post_nodes(
287    State(state): State<Arc<AppState>>,
288    Json(opts): Json<AddNodeOpts>,
289) -> std::result::Result<(StatusCode, Json<AddNodeResult>), (StatusCode, Json<serde_json::Value>)> {
290    let registry_path = state.config.registry_path.clone();
291    let progress = NoopProgress;
292
293    match crate::node::add_nodes(opts, &registry_path, &progress).await {
294        Ok(result) => {
295            // Update the in-memory registry to stay in sync
296            let mut registry = state.registry.write().await;
297            if let Ok(fresh) = NodeRegistry::load(&registry_path) {
298                *registry = fresh;
299            }
300            Ok((StatusCode::CREATED, Json(result)))
301        }
302        Err(e) => Err((
303            StatusCode::BAD_REQUEST,
304            Json(serde_json::json!({ "error": e.to_string() })),
305        )),
306    }
307}
308
309/// DELETE /api/v1/nodes/:id — Remove a node from the registry.
310async fn delete_node(
311    State(state): State<Arc<AppState>>,
312    Path(id): Path<u32>,
313) -> std::result::Result<Json<RemoveNodeResult>, (StatusCode, Json<serde_json::Value>)> {
314    // Prevent removing a running node (would orphan the process)
315    let supervisor = state.supervisor.read().await;
316    if supervisor.is_running(id) {
317        return Err((
318            StatusCode::CONFLICT,
319            Json(serde_json::json!({
320                "error": format!("Cannot remove node {id} while it is running. Stop it first."),
321                "current_state": { "node_id": id, "status": "running" }
322            })),
323        ));
324    }
325    drop(supervisor);
326
327    let registry_path = state.config.registry_path.clone();
328
329    match crate::node::remove_node(id, &registry_path) {
330        Ok(result) => {
331            // Update the in-memory registry to stay in sync
332            let mut registry = state.registry.write().await;
333            if let Ok(fresh) = NodeRegistry::load(&registry_path) {
334                *registry = fresh;
335            }
336            Ok(Json(result))
337        }
338        Err(crate::error::Error::NodeNotFound(id)) => Err((
339            StatusCode::NOT_FOUND,
340            Json(serde_json::json!({ "error": format!("Node not found: {id}") })),
341        )),
342        Err(e) => Err((
343            StatusCode::INTERNAL_SERVER_ERROR,
344            Json(serde_json::json!({ "error": e.to_string() })),
345        )),
346    }
347}
348
349/// POST /api/v1/nodes/:id/start — Start a specific node.
350async fn post_start_node(
351    State(state): State<Arc<AppState>>,
352    Path(id): Path<u32>,
353) -> std::result::Result<Json<NodeStarted>, (StatusCode, Json<serde_json::Value>)> {
354    let registry = state.registry.read().await;
355    let config = match registry.get(id) {
356        Ok(config) => config.clone(),
357        Err(_) => {
358            return Err((
359                StatusCode::NOT_FOUND,
360                Json(serde_json::json!({ "error": format!("Node not found: {id}") })),
361            ))
362        }
363    };
364    drop(registry);
365
366    let supervisor_ref = state.supervisor.clone();
367
368    // Acquire write lock once for atomic check-and-act (avoids TOCTOU race)
369    let mut supervisor = state.supervisor.write().await;
370    if supervisor.is_running(id) {
371        let pid = supervisor.node_pid(id);
372        let uptime_secs = supervisor.node_uptime_secs(id);
373        return Err((
374            StatusCode::CONFLICT,
375            Json(serde_json::json!({
376                "error": format!("Node {id} is already running"),
377                "current_state": {
378                    "node_id": id,
379                    "status": "running",
380                    "pid": pid,
381                    "uptime_secs": uptime_secs,
382                }
383            })),
384        ));
385    }
386
387    let registry_ref = state.registry.clone();
388    match supervisor
389        .start_node(&config, supervisor_ref, registry_ref)
390        .await
391    {
392        Ok(started) => Ok(Json(started)),
393        Err(crate::error::Error::NodeAlreadyRunning(id)) => {
394            let pid = supervisor.node_pid(id);
395            let uptime_secs = supervisor.node_uptime_secs(id);
396            Err((
397                StatusCode::CONFLICT,
398                Json(serde_json::json!({
399                    "error": format!("Node {id} is already running"),
400                    "current_state": {
401                        "node_id": id,
402                        "status": "running",
403                        "pid": pid,
404                        "uptime_secs": uptime_secs,
405                    }
406                })),
407            ))
408        }
409        Err(e) => Err((
410            StatusCode::INTERNAL_SERVER_ERROR,
411            Json(serde_json::json!({ "error": e.to_string() })),
412        )),
413    }
414}
415
416/// POST /api/v1/nodes/start-all — Start all registered nodes.
417async fn post_start_all(State(state): State<Arc<AppState>>) -> Json<StartNodeResult> {
418    let registry = state.registry.read().await;
419    let configs: Vec<_> = registry.list().into_iter().cloned().collect();
420    drop(registry);
421
422    let mut started = Vec::new();
423    let mut failed = Vec::new();
424    let mut already_running = Vec::new();
425
426    let supervisor_ref = state.supervisor.clone();
427    let registry_ref = state.registry.clone();
428
429    for config in &configs {
430        let mut supervisor = state.supervisor.write().await;
431        if supervisor.is_running(config.id) {
432            already_running.push(config.id);
433            continue;
434        }
435
436        match supervisor
437            .start_node(config, supervisor_ref.clone(), registry_ref.clone())
438            .await
439        {
440            Ok(result) => started.push(result),
441            Err(crate::error::Error::NodeAlreadyRunning(id)) => {
442                already_running.push(id);
443            }
444            Err(e) => {
445                failed.push(crate::node::types::NodeStartFailed {
446                    node_id: config.id,
447                    service_name: config.service_name.clone(),
448                    error: e.to_string(),
449                });
450            }
451        }
452    }
453
454    Json(StartNodeResult {
455        started,
456        failed,
457        already_running,
458    })
459}
460
461/// POST /api/v1/nodes/:id/stop — Stop a specific node.
462async fn post_stop_node(
463    State(state): State<Arc<AppState>>,
464    Path(id): Path<u32>,
465) -> std::result::Result<Json<NodeStopped>, (StatusCode, Json<serde_json::Value>)> {
466    let registry = state.registry.read().await;
467    let config = match registry.get(id) {
468        Ok(config) => config.clone(),
469        Err(_) => {
470            return Err((
471                StatusCode::NOT_FOUND,
472                Json(serde_json::json!({ "error": format!("Node not found: {id}") })),
473            ))
474        }
475    };
476    drop(registry);
477
478    // Acquire write lock once for atomic check-and-act (avoids TOCTOU race)
479    let mut supervisor = state.supervisor.write().await;
480    if !supervisor.is_running(id) {
481        let status = supervisor
482            .node_status(id)
483            .unwrap_or(crate::node::types::NodeStatus::Stopped);
484        return Err((
485            StatusCode::CONFLICT,
486            Json(serde_json::json!({
487                "error": format!("Node {id} is not running"),
488                "current_state": {
489                    "node_id": id,
490                    "status": status,
491                }
492            })),
493        ));
494    }
495
496    match supervisor.stop_node(id).await {
497        Ok(()) => Ok(Json(NodeStopped {
498            node_id: id,
499            service_name: config.service_name,
500        })),
501        Err(crate::error::Error::NodeNotRunning(id)) => {
502            let status = supervisor
503                .node_status(id)
504                .unwrap_or(crate::node::types::NodeStatus::Stopped);
505            Err((
506                StatusCode::CONFLICT,
507                Json(serde_json::json!({
508                    "error": format!("Node {id} is not running"),
509                    "current_state": {
510                        "node_id": id,
511                        "status": status,
512                    }
513                })),
514            ))
515        }
516        Err(e) => Err((
517            StatusCode::INTERNAL_SERVER_ERROR,
518            Json(serde_json::json!({ "error": e.to_string() })),
519        )),
520    }
521}
522
523/// POST /api/v1/nodes/stop-all — Stop all running nodes.
524async fn post_stop_all(State(state): State<Arc<AppState>>) -> Json<StopNodeResult> {
525    let registry = state.registry.read().await;
526    let configs: Vec<(u32, String)> = registry
527        .list()
528        .into_iter()
529        .map(|c| (c.id, c.service_name.clone()))
530        .collect();
531    drop(registry);
532
533    let mut supervisor = state.supervisor.write().await;
534    let result = supervisor.stop_all_nodes(&configs).await;
535
536    Json(result)
537}
538
539/// POST /api/v1/reset — Reset all node state.
540async fn post_reset(
541    State(state): State<Arc<AppState>>,
542) -> std::result::Result<Json<ResetResult>, (StatusCode, Json<serde_json::Value>)> {
543    // Hold write lock for atomic check-and-act (prevents nodes being started
544    // between the running check and the reset operation)
545    let supervisor = state.supervisor.write().await;
546    let (running, _, _) = supervisor.node_counts();
547    if running > 0 {
548        return Err((
549            StatusCode::CONFLICT,
550            Json(serde_json::json!({
551                "error": format!("Cannot reset while nodes are running ({running} node(s) still running). Stop all nodes first."),
552                "nodes_running": running,
553            })),
554        ));
555    }
556    drop(supervisor);
557
558    let registry_path = state.config.registry_path.clone();
559
560    match crate::node::reset(&registry_path) {
561        Ok(result) => {
562            // Update the in-memory registry to stay in sync
563            let mut registry = state.registry.write().await;
564            if let Ok(fresh) = NodeRegistry::load(&registry_path) {
565                *registry = fresh;
566            }
567            Ok(Json(result))
568        }
569        Err(e) => Err((
570            StatusCode::INTERNAL_SERVER_ERROR,
571            Json(serde_json::json!({ "error": e.to_string() })),
572        )),
573    }
574}
575
576async fn get_openapi() -> impl IntoResponse {
577    // TODO: Migrate to utoipa-generated OpenAPI spec. Types already derive
578    // utoipa::ToSchema but this spec is still hand-written JSON.
579    let spec = serde_json::json!({
580        "openapi": "3.1.0",
581        "info": {
582            "title": "Ant Daemon API",
583            "version": "0.1.0",
584            "description": "REST API for the ant node management daemon"
585        },
586        "paths": {
587            "/api/v1/status": {
588                "get": {
589                    "summary": "Daemon status",
590                    "description": "Returns daemon health, uptime, and node count summary",
591                    "responses": {
592                        "200": {
593                            "description": "Daemon status",
594                            "content": {
595                                "application/json": {
596                                    "schema": { "$ref": "#/components/schemas/DaemonStatus" }
597                                }
598                            }
599                        }
600                    }
601                }
602            },
603            "/api/v1/events": {
604                "get": {
605                    "summary": "Event stream",
606                    "description": "SSE stream of real-time node events",
607                    "responses": {
608                        "200": {
609                            "description": "SSE event stream"
610                        }
611                    }
612                }
613            },
614            "/api/v1/nodes": {
615                "post": {
616                    "summary": "Add nodes",
617                    "description": "Add one or more nodes to the registry",
618                    "requestBody": {
619                        "required": true,
620                        "content": {
621                            "application/json": {
622                                "schema": { "$ref": "#/components/schemas/AddNodeOpts" }
623                            }
624                        }
625                    },
626                    "responses": {
627                        "201": {
628                            "description": "Nodes added",
629                            "content": {
630                                "application/json": {
631                                    "schema": { "$ref": "#/components/schemas/AddNodeResult" }
632                                }
633                            }
634                        },
635                        "400": {
636                            "description": "Invalid request"
637                        }
638                    }
639                }
640            },
641            "/api/v1/nodes/{id}": {
642                "delete": {
643                    "summary": "Remove node",
644                    "description": "Remove a node from the registry",
645                    "parameters": [{
646                        "name": "id",
647                        "in": "path",
648                        "required": true,
649                        "schema": { "type": "integer" }
650                    }],
651                    "responses": {
652                        "200": {
653                            "description": "Node removed",
654                            "content": {
655                                "application/json": {
656                                    "schema": { "$ref": "#/components/schemas/RemoveNodeResult" }
657                                }
658                            }
659                        },
660                        "404": {
661                            "description": "Node not found"
662                        }
663                    }
664                }
665            },
666            "/api/v1/nodes/{id}/start": {
667                "post": {
668                    "summary": "Start a node",
669                    "description": "Start a specific node by ID. Returns 409 if already running with current_state.",
670                    "parameters": [{
671                        "name": "id",
672                        "in": "path",
673                        "required": true,
674                        "schema": { "type": "integer" }
675                    }],
676                    "responses": {
677                        "200": {
678                            "description": "Node started",
679                            "content": {
680                                "application/json": {
681                                    "schema": { "$ref": "#/components/schemas/NodeStarted" }
682                                }
683                            }
684                        },
685                        "404": {
686                            "description": "Node not found"
687                        },
688                        "409": {
689                            "description": "Node already running (includes current_state)"
690                        },
691                        "500": {
692                            "description": "Failed to start node"
693                        }
694                    }
695                }
696            },
697            "/api/v1/nodes/start-all": {
698                "post": {
699                    "summary": "Start all nodes",
700                    "description": "Start all registered nodes. Returns per-node results.",
701                    "responses": {
702                        "200": {
703                            "description": "Start results",
704                            "content": {
705                                "application/json": {
706                                    "schema": { "$ref": "#/components/schemas/StartNodeResult" }
707                                }
708                            }
709                        }
710                    }
711                }
712            },
713            "/api/v1/nodes/{id}/stop": {
714                "post": {
715                    "summary": "Stop a node",
716                    "description": "Stop a specific node by ID. Returns 409 if already stopped with current_state.",
717                    "parameters": [{
718                        "name": "id",
719                        "in": "path",
720                        "required": true,
721                        "schema": { "type": "integer" }
722                    }],
723                    "responses": {
724                        "200": {
725                            "description": "Node stopped",
726                            "content": {
727                                "application/json": {
728                                    "schema": { "$ref": "#/components/schemas/NodeStopped" }
729                                }
730                            }
731                        },
732                        "404": {
733                            "description": "Node not found"
734                        },
735                        "409": {
736                            "description": "Node not running (includes current_state)"
737                        },
738                        "500": {
739                            "description": "Failed to stop node"
740                        }
741                    }
742                }
743            },
744            "/api/v1/nodes/stop-all": {
745                "post": {
746                    "summary": "Stop all nodes",
747                    "description": "Stop all running nodes. Returns per-node results.",
748                    "responses": {
749                        "200": {
750                            "description": "Stop results",
751                            "content": {
752                                "application/json": {
753                                    "schema": { "$ref": "#/components/schemas/StopNodeResult" }
754                                }
755                            }
756                        }
757                    }
758                }
759            },
760            "/api/v1/reset": {
761                "post": {
762                    "summary": "Reset all node state",
763                    "description": "Remove all node data directories, log directories, and clear the registry. Fails if any nodes are running.",
764                    "responses": {
765                        "200": {
766                            "description": "Reset successful",
767                            "content": {
768                                "application/json": {
769                                    "schema": { "$ref": "#/components/schemas/ResetResult" }
770                                }
771                            }
772                        },
773                        "409": {
774                            "description": "Nodes still running"
775                        }
776                    }
777                }
778            }
779        },
780        "components": {
781            "schemas": {
782                "DaemonStatus": {
783                    "type": "object",
784                    "properties": {
785                        "running": { "type": "boolean" },
786                        "pid": { "type": "integer", "nullable": true },
787                        "port": { "type": "integer", "nullable": true },
788                        "uptime_secs": { "type": "integer", "nullable": true },
789                        "nodes_total": { "type": "integer" },
790                        "nodes_running": { "type": "integer" },
791                        "nodes_stopped": { "type": "integer" },
792                        "nodes_errored": { "type": "integer" }
793                    }
794                }
795            }
796        }
797    });
798    Json(spec)
799}
800
801async fn get_console() -> Html<&'static str> {
802    Html(include_str!("console.html"))
803}
804
/// Write `contents` to `path`, creating any missing parent directories first.
///
/// An existing file at `path` is overwritten. The parameter is a borrowed
/// `std::path::Path` (spelled out in full because `Path` in this file is axum's extractor);
/// call sites passing `&PathBuf` keep working through deref coercion.
///
/// # Errors
///
/// Propagates the I/O error from creating the parent directories or writing the file.
fn write_file(path: &std::path::Path, contents: &str) -> Result<()> {
    if let Some(parent) = path.parent() {
        std::fs::create_dir_all(parent)?;
    }
    std::fs::write(path, contents)?;
    Ok(())
}
812
813/// Refresh each registered node's `version` against what its on-disk binary reports.
814///
815/// Intended as a one-time pass at daemon startup to heal registries left in a stale state by
816/// earlier daemon versions that didn't track auto-upgrades. Missing binaries and transient
817/// `--version` failures are silently skipped so daemon startup never aborts on this.
818async fn reconcile_registry_versions(registry: &mut NodeRegistry) {
819    let node_ids: Vec<u32> = registry.list().iter().map(|c| c.id).collect();
820    let mut changed = false;
821
822    for id in node_ids {
823        let (binary_path, recorded_version) = match registry.get(id) {
824            Ok(c) => (c.binary_path.clone(), c.version.clone()),
825            Err(_) => continue,
826        };
827
828        if !binary_path.exists() {
829            continue;
830        }
831
832        let Ok(disk_version) = crate::node::binary::extract_version(&binary_path).await else {
833            continue;
834        };
835
836        if disk_version == recorded_version {
837            continue;
838        }
839
840        if let Ok(entry) = registry.get_mut(id) {
841            entry.version = disk_version;
842            changed = true;
843        }
844    }
845
846    if changed {
847        let _ = registry.save();
848    }
849}
850
#[cfg(all(test, unix))]
mod tests {
    use super::*;
    use crate::node::registry::NodeRegistry;
    use crate::node::types::NodeConfig;
    use std::collections::HashMap;
    use std::os::unix::fs::PermissionsExt;

    /// Create an executable `/bin/sh` script at `path` that prints `stdout` and a newline.
    fn write_fake_binary(path: &std::path::Path, stdout: &str) {
        let body = format!("#!/bin/sh\nprintf '%s\\n' '{stdout}'\n");
        std::fs::write(path, body).unwrap();
        let executable = std::fs::Permissions::from_mode(0o755);
        std::fs::set_permissions(path, executable).unwrap();
    }

    /// Build a node config pointing at `binary_path` with the given recorded `version`.
    fn seed_config(binary_path: PathBuf, version: &str, data_dir: PathBuf) -> NodeConfig {
        NodeConfig {
            id: 0,
            service_name: String::new(),
            rewards_address: "0x0".into(),
            data_dir,
            log_dir: None,
            node_port: None,
            metrics_port: None,
            network_id: Some(1),
            binary_path,
            version: version.into(),
            env_variables: HashMap::new(),
            bootstrap_peers: vec![],
        }
    }

    /// A recorded version that differs from the binary's output is rewritten and saved to disk.
    #[tokio::test]
    async fn reconcile_updates_stale_version_and_persists() {
        let dir = tempfile::tempdir().unwrap();
        let registry_file = dir.path().join("registry.json");
        let binary = dir.path().join("ant-node");
        write_fake_binary(&binary, "ant-node 0.10.11-rc.1");

        let mut registry = NodeRegistry::load(&registry_file).unwrap();
        let stale = seed_config(binary, "0.10.1", dir.path().join("data"));
        let node_id = registry.add(stale);
        registry.save().unwrap();

        reconcile_registry_versions(&mut registry).await;

        // The in-memory entry carries the version reported by the binary...
        assert_eq!(registry.get(node_id).unwrap().version, "0.10.11-rc.1");

        // ...and so does a registry freshly loaded from the same file.
        let from_disk = NodeRegistry::load(&registry_file).unwrap();
        assert_eq!(from_disk.get(node_id).unwrap().version, "0.10.11-rc.1");
    }

    /// A recorded version that already matches the binary's output stays as it is.
    #[tokio::test]
    async fn reconcile_leaves_matching_version_alone() {
        let dir = tempfile::tempdir().unwrap();
        let registry_file = dir.path().join("registry.json");
        let binary = dir.path().join("ant-node");
        write_fake_binary(&binary, "ant-node 0.10.1");

        let mut registry = NodeRegistry::load(&registry_file).unwrap();
        let current = seed_config(binary, "0.10.1", dir.path().join("data"));
        let node_id = registry.add(current);

        reconcile_registry_versions(&mut registry).await;

        assert_eq!(registry.get(node_id).unwrap().version, "0.10.1");
    }

    /// A node whose binary path does not exist keeps its recorded version.
    #[tokio::test]
    async fn reconcile_skips_missing_binary() {
        let dir = tempfile::tempdir().unwrap();
        let registry_file = dir.path().join("registry.json");

        let mut registry = NodeRegistry::load(&registry_file).unwrap();
        let orphan = seed_config(
            dir.path().join("does-not-exist"),
            "0.10.1",
            dir.path().join("data"),
        );
        let node_id = registry.add(orphan);

        reconcile_registry_versions(&mut registry).await;

        assert_eq!(registry.get(node_id).unwrap().version, "0.10.1");
    }
}