1use std::net::SocketAddr;
2use std::path::PathBuf;
3use std::sync::Arc;
4use std::time::Instant;
5
6use axum::extract::{Path, State};
7use axum::http::StatusCode;
8use axum::response::sse::{Event, Sse};
9use axum::response::{Html, IntoResponse};
10use axum::routing::{get, post};
11use axum::{Json, Router};
12use tokio::sync::broadcast;
13use tokio::sync::RwLock;
14use tokio_util::sync::CancellationToken;
15
16use crate::error::Result;
17use crate::node::binary::NoopProgress;
18use crate::node::daemon::health::{DiskThresholds, FleetHealth};
19use crate::node::daemon::supervisor::{
20 spawn_eviction_monitor, spawn_liveness_monitor, spawn_upgrade_monitor, Supervisor,
21 EVICTION_POLL_INTERVAL, LIVENESS_POLL_INTERVAL, UPGRADE_POLL_INTERVAL,
22};
23use crate::node::events::NodeEvent;
24use crate::node::registry::NodeRegistry;
25use crate::node::types::{
26 AddNodeOpts, AddNodeResult, DaemonConfig, DaemonStatus, NodeInfo, NodeStarted, NodeStatus,
27 NodeStatusResult, NodeStatusSummary, NodeStopped, RemoveNodeResult, ResetResult,
28 StartNodeResult, StopNodeResult,
29};
30
/// Shared state handed to every HTTP handler of the daemon.
///
/// Built once in `start` and given to the router behind an `Arc`.
pub struct AppState {
    /// Node registry, shared behind an async read/write lock.
    pub registry: Arc<RwLock<NodeRegistry>>,
    /// Supervisor queried for node status, PID and uptime, and used to start and stop nodes.
    pub supervisor: Arc<RwLock<Supervisor>>,
    /// Sender side of the node-event broadcast channel; `/api/v1/events` subscribes to it.
    pub event_tx: broadcast::Sender<NodeEvent>,
    /// Instant captured when the state was built; used to report daemon uptime.
    pub start_time: Instant,
    /// Daemon configuration the server was started with.
    pub config: DaemonConfig,
    /// Port the listener actually bound to (the configured port may be unset, i.e. OS-chosen).
    pub bound_port: u16,
    /// Fleet health snapshot served by `/api/v1/health`; the same handle is given to the
    /// eviction monitor in `start`.
    pub health: Arc<RwLock<FleetHealth>>,
}
44
45pub async fn start(
49 config: DaemonConfig,
50 mut registry: NodeRegistry,
51 shutdown: CancellationToken,
52) -> Result<SocketAddr> {
53 let (event_tx, _) = broadcast::channel(256);
54
55 let addr = SocketAddr::new(config.listen_addr, config.port.unwrap_or(0));
56 let listener = tokio::net::TcpListener::bind(addr)
57 .await
58 .map_err(|e| crate::error::Error::BindError(e.to_string()))?;
59 let bound_addr = listener
60 .local_addr()
61 .map_err(|e| crate::error::Error::BindError(e.to_string()))?;
62
63 reconcile_registry_versions(&mut registry).await;
68
69 let registry = Arc::new(RwLock::new(registry));
70 let supervisor = Arc::new(RwLock::new(Supervisor::new(event_tx.clone())));
71
72 {
77 let reg = registry.read().await;
78 let mut sup = supervisor.write().await;
79 let adopted = sup.adopt_from_registry(®);
80 if !adopted.is_empty() {
81 tracing::info!(
82 "Adopted {} running node(s) from a previous daemon instance: {:?}",
83 adopted.len(),
84 adopted
85 );
86 }
87 }
88
89 let health = Arc::new(RwLock::new(FleetHealth::healthy()));
90
91 let state = Arc::new(AppState {
92 registry: registry.clone(),
93 supervisor: supervisor.clone(),
94 event_tx: event_tx.clone(),
95 start_time: Instant::now(),
96 config: config.clone(),
97 bound_port: bound_addr.port(),
98 health: health.clone(),
99 });
100
101 spawn_upgrade_monitor(
105 registry.clone(),
106 supervisor.clone(),
107 UPGRADE_POLL_INTERVAL,
108 shutdown.clone(),
109 );
110
111 spawn_eviction_monitor(
116 registry.clone(),
117 supervisor.clone(),
118 event_tx.clone(),
119 health,
120 DiskThresholds::default(),
121 EVICTION_POLL_INTERVAL,
122 shutdown.clone(),
123 );
124
125 spawn_liveness_monitor(
129 registry,
130 supervisor,
131 event_tx,
132 LIVENESS_POLL_INTERVAL,
133 shutdown.clone(),
134 );
135
136 let app = build_router(state.clone());
137
138 write_file(&config.port_file_path, &bound_addr.port().to_string())?;
140 write_file(&config.pid_file_path, &std::process::id().to_string())?;
141
142 let port_file = config.port_file_path.clone();
143 let pid_file = config.pid_file_path.clone();
144
145 tokio::spawn(async move {
146 axum::serve(listener, app)
147 .with_graceful_shutdown(shutdown.cancelled_owned())
148 .await
149 .ok();
150
151 let _ = std::fs::remove_file(&port_file);
153 let _ = std::fs::remove_file(&pid_file);
154 });
155
156 Ok(bound_addr)
157}
158
159fn build_router(state: Arc<AppState>) -> Router {
160 use axum::http::HeaderValue;
161 use tower_http::cors::{Any, CorsLayer};
162
163 let origin = format!("http://127.0.0.1:{}", state.bound_port);
167 let cors = CorsLayer::new()
168 .allow_origin([origin.parse::<HeaderValue>().unwrap()])
169 .allow_methods(Any)
170 .allow_headers(Any);
171
172 Router::new()
173 .route("/console", get(get_console))
174 .route("/api/v1/status", get(get_status))
175 .route("/api/v1/health", get(get_health))
176 .route("/api/v1/events", get(get_events))
177 .route("/api/v1/nodes/status", get(get_nodes_status))
178 .route("/api/v1/nodes", post(post_nodes))
179 .route(
180 "/api/v1/nodes/{id}",
181 get(get_node_detail).delete(delete_node),
182 )
183 .route("/api/v1/nodes/{id}/start", post(post_start_node))
184 .route("/api/v1/nodes/start-all", post(post_start_all))
185 .route("/api/v1/nodes/{id}/stop", post(post_stop_node))
186 .route("/api/v1/nodes/stop-all", post(post_stop_all))
187 .route("/api/v1/reset", post(post_reset))
188 .route("/api/v1/openapi.json", get(get_openapi))
189 .layer(cors)
190 .with_state(state)
191}
192
193async fn get_status(State(state): State<Arc<AppState>>) -> Json<DaemonStatus> {
194 let registry = state.registry.read().await;
195 let supervisor = state.supervisor.read().await;
196 let (running, stopped, errored) = supervisor.node_counts();
197
198 Json(DaemonStatus {
199 running: true,
200 pid: Some(std::process::id()),
201 port: Some(state.bound_port),
202 uptime_secs: Some(state.start_time.elapsed().as_secs()),
203 nodes_total: registry.len() as u32,
204 nodes_running: running,
205 nodes_stopped: stopped,
206 nodes_errored: errored,
207 })
208}
209
210async fn get_health(State(state): State<Arc<AppState>>) -> Json<FleetHealth> {
214 Json(state.health.read().await.clone())
215}
216
217async fn get_events(
218 State(state): State<Arc<AppState>>,
219) -> Sse<impl futures_core::Stream<Item = std::result::Result<Event, std::convert::Infallible>>> {
220 let mut rx = state.event_tx.subscribe();
221
222 let stream = async_stream::stream! {
223 loop {
224 match rx.recv().await {
225 Ok(event) => {
226 let event_type = event.event_type().to_string();
227 if let Ok(data) = serde_json::to_string(&event) {
228 yield Ok(Event::default().event(event_type).data(data));
229 }
230 }
231 Err(broadcast::error::RecvError::Lagged(_)) => continue,
232 Err(broadcast::error::RecvError::Closed) => break,
233 }
234 }
235 };
236
237 Sse::new(stream)
238}
239
240async fn get_nodes_status(State(state): State<Arc<AppState>>) -> Json<NodeStatusResult> {
242 let registry = state.registry.read().await;
243 let supervisor = state.supervisor.read().await;
244
245 let mut nodes = Vec::new();
246 let mut total_running = 0u32;
247 let mut total_stopped = 0u32;
248
249 for config in registry.list() {
250 let status = if config.eviction.is_some() {
253 NodeStatus::Evicted
254 } else {
255 supervisor
256 .node_status(config.id)
257 .unwrap_or(NodeStatus::Stopped)
258 };
259
260 match status {
261 NodeStatus::Running | NodeStatus::Starting | NodeStatus::UpgradeScheduled => {
262 total_running += 1
263 }
264 _ => total_stopped += 1,
265 }
266
267 let (pid, uptime_secs, pending_version) = if config.eviction.is_some() {
268 (None, None, None)
269 } else {
270 (
271 supervisor.node_pid(config.id),
272 supervisor.node_uptime_secs(config.id),
273 supervisor.node_pending_version(config.id),
274 )
275 };
276
277 nodes.push(NodeStatusSummary {
278 node_id: config.id,
279 name: config.service_name.clone(),
280 version: config.version.clone(),
281 status,
282 pid,
283 uptime_secs,
284 pending_version,
285 eviction: config.eviction.clone(),
286 });
287 }
288
289 Json(NodeStatusResult {
290 nodes,
291 total_running,
292 total_stopped,
293 })
294}
295
296async fn get_node_detail(
298 State(state): State<Arc<AppState>>,
299 Path(id): Path<u32>,
300) -> std::result::Result<Json<NodeInfo>, (StatusCode, Json<serde_json::Value>)> {
301 let registry = state.registry.read().await;
302 let config = match registry.get(id) {
303 Ok(config) => config.clone(),
304 Err(_) => {
305 return Err((
306 StatusCode::NOT_FOUND,
307 Json(serde_json::json!({ "error": format!("Node not found: {id}") })),
308 ))
309 }
310 };
311
312 let supervisor = state.supervisor.read().await;
313 let (status, pid, uptime_secs, pending_version) = if config.eviction.is_some() {
315 (NodeStatus::Evicted, None, None, None)
316 } else {
317 (
318 supervisor.node_status(id).unwrap_or(NodeStatus::Stopped),
319 supervisor.node_pid(id),
320 supervisor.node_uptime_secs(id),
321 supervisor.node_pending_version(id),
322 )
323 };
324
325 Ok(Json(NodeInfo {
326 config,
327 status,
328 pid,
329 uptime_secs,
330 pending_version,
331 }))
332}
333
334async fn post_nodes(
336 State(state): State<Arc<AppState>>,
337 Json(opts): Json<AddNodeOpts>,
338) -> std::result::Result<(StatusCode, Json<AddNodeResult>), (StatusCode, Json<serde_json::Value>)> {
339 let registry_path = state.config.registry_path.clone();
340 let progress = NoopProgress;
341
342 match crate::node::add_nodes(opts, ®istry_path, &progress).await {
343 Ok(result) => {
344 let mut registry = state.registry.write().await;
346 if let Ok(fresh) = NodeRegistry::load(®istry_path) {
347 *registry = fresh;
348 }
349 Ok((StatusCode::CREATED, Json(result)))
350 }
351 Err(e) => Err((
352 StatusCode::BAD_REQUEST,
353 Json(serde_json::json!({ "error": e.to_string() })),
354 )),
355 }
356}
357
358async fn delete_node(
360 State(state): State<Arc<AppState>>,
361 Path(id): Path<u32>,
362) -> std::result::Result<Json<RemoveNodeResult>, (StatusCode, Json<serde_json::Value>)> {
363 let supervisor = state.supervisor.read().await;
365 if supervisor.is_running(id) {
366 return Err((
367 StatusCode::CONFLICT,
368 Json(serde_json::json!({
369 "error": format!("Cannot remove node {id} while it is running. Stop it first."),
370 "current_state": { "node_id": id, "status": "running" }
371 })),
372 ));
373 }
374 drop(supervisor);
375
376 let registry_path = state.config.registry_path.clone();
377
378 match crate::node::remove_node(id, ®istry_path) {
379 Ok(result) => {
380 let mut registry = state.registry.write().await;
382 if let Ok(fresh) = NodeRegistry::load(®istry_path) {
383 *registry = fresh;
384 }
385 Ok(Json(result))
386 }
387 Err(crate::error::Error::NodeNotFound(id)) => Err((
388 StatusCode::NOT_FOUND,
389 Json(serde_json::json!({ "error": format!("Node not found: {id}") })),
390 )),
391 Err(e) => Err((
392 StatusCode::INTERNAL_SERVER_ERROR,
393 Json(serde_json::json!({ "error": e.to_string() })),
394 )),
395 }
396}
397
398async fn post_start_node(
400 State(state): State<Arc<AppState>>,
401 Path(id): Path<u32>,
402) -> std::result::Result<Json<NodeStarted>, (StatusCode, Json<serde_json::Value>)> {
403 let registry = state.registry.read().await;
404 let config = match registry.get(id) {
405 Ok(config) => config.clone(),
406 Err(_) => {
407 return Err((
408 StatusCode::NOT_FOUND,
409 Json(serde_json::json!({ "error": format!("Node not found: {id}") })),
410 ))
411 }
412 };
413 drop(registry);
414
415 if config.eviction.is_some() {
418 return Err((
419 StatusCode::CONFLICT,
420 Json(serde_json::json!({
421 "error": format!(
422 "Node {id} has been evicted and cannot be started. Dismiss it with \
423 `ant node dismiss {id}` and add a new node instead."
424 ),
425 "current_state": { "node_id": id, "status": "evicted" }
426 })),
427 ));
428 }
429
430 let supervisor_ref = state.supervisor.clone();
431
432 let mut supervisor = state.supervisor.write().await;
434 if supervisor.is_running(id) {
435 let pid = supervisor.node_pid(id);
436 let uptime_secs = supervisor.node_uptime_secs(id);
437 return Err((
438 StatusCode::CONFLICT,
439 Json(serde_json::json!({
440 "error": format!("Node {id} is already running"),
441 "current_state": {
442 "node_id": id,
443 "status": "running",
444 "pid": pid,
445 "uptime_secs": uptime_secs,
446 }
447 })),
448 ));
449 }
450
451 let registry_ref = state.registry.clone();
452 match supervisor
453 .start_node(&config, supervisor_ref, registry_ref)
454 .await
455 {
456 Ok(started) => Ok(Json(started)),
457 Err(crate::error::Error::NodeAlreadyRunning(id)) => {
458 let pid = supervisor.node_pid(id);
459 let uptime_secs = supervisor.node_uptime_secs(id);
460 Err((
461 StatusCode::CONFLICT,
462 Json(serde_json::json!({
463 "error": format!("Node {id} is already running"),
464 "current_state": {
465 "node_id": id,
466 "status": "running",
467 "pid": pid,
468 "uptime_secs": uptime_secs,
469 }
470 })),
471 ))
472 }
473 Err(e) => Err((
474 StatusCode::INTERNAL_SERVER_ERROR,
475 Json(serde_json::json!({ "error": e.to_string() })),
476 )),
477 }
478}
479
480async fn post_start_all(State(state): State<Arc<AppState>>) -> Json<StartNodeResult> {
482 let registry = state.registry.read().await;
483 let configs: Vec<_> = registry
486 .list()
487 .into_iter()
488 .filter(|c| c.eviction.is_none())
489 .cloned()
490 .collect();
491 drop(registry);
492
493 let mut started = Vec::new();
494 let mut failed = Vec::new();
495 let mut already_running = Vec::new();
496
497 let supervisor_ref = state.supervisor.clone();
498 let registry_ref = state.registry.clone();
499
500 for config in &configs {
501 let mut supervisor = state.supervisor.write().await;
502 if supervisor.is_running(config.id) {
503 already_running.push(config.id);
504 continue;
505 }
506
507 match supervisor
508 .start_node(config, supervisor_ref.clone(), registry_ref.clone())
509 .await
510 {
511 Ok(result) => started.push(result),
512 Err(crate::error::Error::NodeAlreadyRunning(id)) => {
513 already_running.push(id);
514 }
515 Err(e) => {
516 failed.push(crate::node::types::NodeStartFailed {
517 node_id: config.id,
518 service_name: config.service_name.clone(),
519 error: e.to_string(),
520 });
521 }
522 }
523 }
524
525 Json(StartNodeResult {
526 started,
527 failed,
528 already_running,
529 })
530}
531
532async fn post_stop_node(
534 State(state): State<Arc<AppState>>,
535 Path(id): Path<u32>,
536) -> std::result::Result<Json<NodeStopped>, (StatusCode, Json<serde_json::Value>)> {
537 let registry = state.registry.read().await;
538 let config = match registry.get(id) {
539 Ok(config) => config.clone(),
540 Err(_) => {
541 return Err((
542 StatusCode::NOT_FOUND,
543 Json(serde_json::json!({ "error": format!("Node not found: {id}") })),
544 ))
545 }
546 };
547 drop(registry);
548
549 if config.eviction.is_some() {
551 return Err((
552 StatusCode::CONFLICT,
553 Json(serde_json::json!({
554 "error": format!(
555 "Node {id} has been evicted; there is nothing to stop. Dismiss it with \
556 `ant node dismiss {id}`."
557 ),
558 "current_state": { "node_id": id, "status": "evicted" }
559 })),
560 ));
561 }
562
563 let mut supervisor = state.supervisor.write().await;
565 if !supervisor.is_running(id) {
566 let status = supervisor
567 .node_status(id)
568 .unwrap_or(crate::node::types::NodeStatus::Stopped);
569 return Err((
570 StatusCode::CONFLICT,
571 Json(serde_json::json!({
572 "error": format!("Node {id} is not running"),
573 "current_state": {
574 "node_id": id,
575 "status": status,
576 }
577 })),
578 ));
579 }
580
581 match supervisor.stop_node(id).await {
582 Ok(()) => Ok(Json(NodeStopped {
583 node_id: id,
584 service_name: config.service_name,
585 })),
586 Err(crate::error::Error::NodeNotRunning(id)) => {
587 let status = supervisor
588 .node_status(id)
589 .unwrap_or(crate::node::types::NodeStatus::Stopped);
590 Err((
591 StatusCode::CONFLICT,
592 Json(serde_json::json!({
593 "error": format!("Node {id} is not running"),
594 "current_state": {
595 "node_id": id,
596 "status": status,
597 }
598 })),
599 ))
600 }
601 Err(e) => Err((
602 StatusCode::INTERNAL_SERVER_ERROR,
603 Json(serde_json::json!({ "error": e.to_string() })),
604 )),
605 }
606}
607
608async fn post_stop_all(State(state): State<Arc<AppState>>) -> Json<StopNodeResult> {
610 let registry = state.registry.read().await;
611 let configs: Vec<(u32, String)> = registry
613 .list()
614 .into_iter()
615 .filter(|c| c.eviction.is_none())
616 .map(|c| (c.id, c.service_name.clone()))
617 .collect();
618 drop(registry);
619
620 let mut supervisor = state.supervisor.write().await;
621 let result = supervisor.stop_all_nodes(&configs).await;
622
623 Json(result)
624}
625
626async fn post_reset(
628 State(state): State<Arc<AppState>>,
629) -> std::result::Result<Json<ResetResult>, (StatusCode, Json<serde_json::Value>)> {
630 let supervisor = state.supervisor.write().await;
633 let (running, _, _) = supervisor.node_counts();
634 if running > 0 {
635 return Err((
636 StatusCode::CONFLICT,
637 Json(serde_json::json!({
638 "error": format!("Cannot reset while nodes are running ({running} node(s) still running). Stop all nodes first."),
639 "nodes_running": running,
640 })),
641 ));
642 }
643 drop(supervisor);
644
645 let registry_path = state.config.registry_path.clone();
646
647 match crate::node::reset(®istry_path) {
648 Ok(result) => {
649 let mut registry = state.registry.write().await;
651 if let Ok(fresh) = NodeRegistry::load(®istry_path) {
652 *registry = fresh;
653 }
654 Ok(Json(result))
655 }
656 Err(e) => Err((
657 StatusCode::INTERNAL_SERVER_ERROR,
658 Json(serde_json::json!({ "error": e.to_string() })),
659 )),
660 }
661}
662
663async fn get_openapi() -> impl IntoResponse {
664 let spec = serde_json::json!({
667 "openapi": "3.1.0",
668 "info": {
669 "title": "Ant Daemon API",
670 "version": "0.1.0",
671 "description": "REST API for the ant node management daemon"
672 },
673 "paths": {
674 "/api/v1/status": {
675 "get": {
676 "summary": "Daemon status",
677 "description": "Returns daemon health, uptime, and node count summary",
678 "responses": {
679 "200": {
680 "description": "Daemon status",
681 "content": {
682 "application/json": {
683 "schema": { "$ref": "#/components/schemas/DaemonStatus" }
684 }
685 }
686 }
687 }
688 }
689 },
690 "/api/v1/events": {
691 "get": {
692 "summary": "Event stream",
693 "description": "SSE stream of real-time node events",
694 "responses": {
695 "200": {
696 "description": "SSE event stream"
697 }
698 }
699 }
700 },
701 "/api/v1/nodes": {
702 "post": {
703 "summary": "Add nodes",
704 "description": "Add one or more nodes to the registry",
705 "requestBody": {
706 "required": true,
707 "content": {
708 "application/json": {
709 "schema": { "$ref": "#/components/schemas/AddNodeOpts" }
710 }
711 }
712 },
713 "responses": {
714 "201": {
715 "description": "Nodes added",
716 "content": {
717 "application/json": {
718 "schema": { "$ref": "#/components/schemas/AddNodeResult" }
719 }
720 }
721 },
722 "400": {
723 "description": "Invalid request"
724 }
725 }
726 }
727 },
728 "/api/v1/nodes/{id}": {
729 "delete": {
730 "summary": "Remove node",
731 "description": "Remove a node from the registry",
732 "parameters": [{
733 "name": "id",
734 "in": "path",
735 "required": true,
736 "schema": { "type": "integer" }
737 }],
738 "responses": {
739 "200": {
740 "description": "Node removed",
741 "content": {
742 "application/json": {
743 "schema": { "$ref": "#/components/schemas/RemoveNodeResult" }
744 }
745 }
746 },
747 "404": {
748 "description": "Node not found"
749 }
750 }
751 }
752 },
753 "/api/v1/nodes/{id}/start": {
754 "post": {
755 "summary": "Start a node",
756 "description": "Start a specific node by ID. Returns 409 if already running with current_state.",
757 "parameters": [{
758 "name": "id",
759 "in": "path",
760 "required": true,
761 "schema": { "type": "integer" }
762 }],
763 "responses": {
764 "200": {
765 "description": "Node started",
766 "content": {
767 "application/json": {
768 "schema": { "$ref": "#/components/schemas/NodeStarted" }
769 }
770 }
771 },
772 "404": {
773 "description": "Node not found"
774 },
775 "409": {
776 "description": "Node already running (includes current_state)"
777 },
778 "500": {
779 "description": "Failed to start node"
780 }
781 }
782 }
783 },
784 "/api/v1/nodes/start-all": {
785 "post": {
786 "summary": "Start all nodes",
787 "description": "Start all registered nodes. Returns per-node results.",
788 "responses": {
789 "200": {
790 "description": "Start results",
791 "content": {
792 "application/json": {
793 "schema": { "$ref": "#/components/schemas/StartNodeResult" }
794 }
795 }
796 }
797 }
798 }
799 },
800 "/api/v1/nodes/{id}/stop": {
801 "post": {
802 "summary": "Stop a node",
803 "description": "Stop a specific node by ID. Returns 409 if already stopped with current_state.",
804 "parameters": [{
805 "name": "id",
806 "in": "path",
807 "required": true,
808 "schema": { "type": "integer" }
809 }],
810 "responses": {
811 "200": {
812 "description": "Node stopped",
813 "content": {
814 "application/json": {
815 "schema": { "$ref": "#/components/schemas/NodeStopped" }
816 }
817 }
818 },
819 "404": {
820 "description": "Node not found"
821 },
822 "409": {
823 "description": "Node not running (includes current_state)"
824 },
825 "500": {
826 "description": "Failed to stop node"
827 }
828 }
829 }
830 },
831 "/api/v1/nodes/stop-all": {
832 "post": {
833 "summary": "Stop all nodes",
834 "description": "Stop all running nodes. Returns per-node results.",
835 "responses": {
836 "200": {
837 "description": "Stop results",
838 "content": {
839 "application/json": {
840 "schema": { "$ref": "#/components/schemas/StopNodeResult" }
841 }
842 }
843 }
844 }
845 }
846 },
847 "/api/v1/reset": {
848 "post": {
849 "summary": "Reset all node state",
850 "description": "Remove all node data directories, log directories, and clear the registry. Fails if any nodes are running.",
851 "responses": {
852 "200": {
853 "description": "Reset successful",
854 "content": {
855 "application/json": {
856 "schema": { "$ref": "#/components/schemas/ResetResult" }
857 }
858 }
859 },
860 "409": {
861 "description": "Nodes still running"
862 }
863 }
864 }
865 }
866 },
867 "components": {
868 "schemas": {
869 "DaemonStatus": {
870 "type": "object",
871 "properties": {
872 "running": { "type": "boolean" },
873 "pid": { "type": "integer", "nullable": true },
874 "port": { "type": "integer", "nullable": true },
875 "uptime_secs": { "type": "integer", "nullable": true },
876 "nodes_total": { "type": "integer" },
877 "nodes_running": { "type": "integer" },
878 "nodes_stopped": { "type": "integer" },
879 "nodes_errored": { "type": "integer" }
880 }
881 }
882 }
883 }
884 });
885 Json(spec)
886}
887
888async fn get_console() -> Html<&'static str> {
889 Html(include_str!("console.html"))
890}
891
/// Writes `contents` to `path`, creating any missing parent directories first.
///
/// An existing file is overwritten. Takes `&std::path::Path` (spelled out because `Path`
/// names the axum extractor in this file); callers holding a `&PathBuf` still work
/// through deref coercion.
///
/// # Errors
///
/// Propagates the I/O error from creating the directories or writing the file.
fn write_file(path: &std::path::Path, contents: &str) -> Result<()> {
    if let Some(parent) = path.parent() {
        std::fs::create_dir_all(parent)?;
    }
    std::fs::write(path, contents)?;
    Ok(())
}
899
900async fn reconcile_registry_versions(registry: &mut NodeRegistry) {
906 let node_ids: Vec<u32> = registry.list().iter().map(|c| c.id).collect();
907 let mut changed = false;
908
909 for id in node_ids {
910 let (binary_path, recorded_version) = match registry.get(id) {
911 Ok(c) => (c.binary_path.clone(), c.version.clone()),
912 Err(_) => continue,
913 };
914
915 if !binary_path.exists() {
916 continue;
917 }
918
919 let Ok(disk_version) = crate::node::binary::extract_version(&binary_path).await else {
920 continue;
921 };
922
923 if disk_version == recorded_version {
924 continue;
925 }
926
927 if let Ok(entry) = registry.get_mut(id) {
928 entry.version = disk_version;
929 changed = true;
930 }
931 }
932
933 if changed {
934 let _ = registry.save();
935 }
936}
937
/// Tests for `reconcile_registry_versions`, using a shell script as a stand-in node binary
/// (hence Unix-only).
#[cfg(all(test, unix))]
mod tests {
    use super::*;
    use crate::node::registry::NodeRegistry;
    use crate::node::types::{EvmNetwork, NodeConfig};
    use std::collections::HashMap;
    use std::os::unix::fs::PermissionsExt;

    /// Writes an executable shell script at `path` that prints `stdout` followed by a newline.
    fn write_fake_binary(path: &std::path::Path, stdout: &str) {
        std::fs::write(
            path,
            format!("#!/bin/sh\nprintf '%s\\n' '{stdout}'\n"),
        )
        .unwrap();
        std::fs::set_permissions(path, std::fs::Permissions::from_mode(0o755)).unwrap();
    }

    /// Minimal node config pointing at `binary_path` with the given recorded `version`.
    fn seed_config(binary_path: PathBuf, version: &str, data_dir: PathBuf) -> NodeConfig {
        NodeConfig {
            id: 0,
            service_name: String::new(),
            rewards_address: "0x0".into(),
            data_dir,
            log_dir: None,
            node_port: None,
            binary_path,
            version: version.into(),
            env_variables: HashMap::new(),
            bootstrap_peers: vec![],
            upgrade_channel: None,
            evm_network: EvmNetwork::default(),
            eviction: None,
        }
    }

    #[tokio::test]
    async fn reconcile_updates_stale_version_and_persists() {
        let workdir = tempfile::tempdir().unwrap();
        let registry_file = workdir.path().join("registry.json");
        let binary = workdir.path().join("ant-node");
        write_fake_binary(&binary, "ant-node 0.10.11-rc.1");

        let mut registry = NodeRegistry::load(&registry_file).unwrap();
        let stale = seed_config(binary.clone(), "0.10.1", workdir.path().join("data"));
        let id = registry.add(stale);
        registry.save().unwrap();

        reconcile_registry_versions(&mut registry).await;

        // The in-memory entry is updated...
        assert_eq!(registry.get(id).unwrap().version, "0.10.11-rc.1");

        // ...and the change is persisted to the registry file.
        let reloaded = NodeRegistry::load(&registry_file).unwrap();
        assert_eq!(reloaded.get(id).unwrap().version, "0.10.11-rc.1");
    }

    #[tokio::test]
    async fn reconcile_leaves_matching_version_alone() {
        let workdir = tempfile::tempdir().unwrap();
        let registry_file = workdir.path().join("registry.json");
        let binary = workdir.path().join("ant-node");
        write_fake_binary(&binary, "ant-node 0.10.1");

        let mut registry = NodeRegistry::load(&registry_file).unwrap();
        let current = seed_config(binary.clone(), "0.10.1", workdir.path().join("data"));
        let id = registry.add(current);

        reconcile_registry_versions(&mut registry).await;

        assert_eq!(registry.get(id).unwrap().version, "0.10.1");
    }

    #[tokio::test]
    async fn reconcile_skips_missing_binary() {
        let workdir = tempfile::tempdir().unwrap();
        let registry_file = workdir.path().join("registry.json");

        let mut registry = NodeRegistry::load(&registry_file).unwrap();
        let orphan = seed_config(
            workdir.path().join("does-not-exist"),
            "0.10.1",
            workdir.path().join("data"),
        );
        let id = registry.add(orphan);

        reconcile_registry_versions(&mut registry).await;

        assert_eq!(registry.get(id).unwrap().version, "0.10.1");
    }
}