1use std::net::SocketAddr;
2use std::path::PathBuf;
3use std::sync::Arc;
4use std::time::Instant;
5
6use axum::extract::{Path, State};
7use axum::http::StatusCode;
8use axum::response::sse::{Event, Sse};
9use axum::response::{Html, IntoResponse};
10use axum::routing::{get, post};
11use axum::{Json, Router};
12use tokio::sync::broadcast;
13use tokio::sync::RwLock;
14use tokio_util::sync::CancellationToken;
15
16use crate::error::Result;
17use crate::node::binary::NoopProgress;
18use crate::node::daemon::supervisor::{
19 spawn_liveness_monitor, spawn_upgrade_monitor, Supervisor, LIVENESS_POLL_INTERVAL,
20 UPGRADE_POLL_INTERVAL,
21};
22use crate::node::events::NodeEvent;
23use crate::node::registry::NodeRegistry;
24use crate::node::types::{
25 AddNodeOpts, AddNodeResult, DaemonConfig, DaemonStatus, NodeInfo, NodeStarted, NodeStatus,
26 NodeStatusResult, NodeStatusSummary, NodeStopped, RemoveNodeResult, ResetResult,
27 StartNodeResult, StopNodeResult,
28};
29
/// Shared state handed to every HTTP handler through axum's `State` extractor.
pub struct AppState {
    /// Node registry; the same `Arc` is also handed to the background monitors in `start`.
    pub registry: Arc<RwLock<NodeRegistry>>,
    /// Node supervisor; the same `Arc` is also handed to the background monitors in `start`.
    pub supervisor: Arc<RwLock<Supervisor>>,
    /// Sender side of the node-event broadcast channel; `get_events` subscribes to it.
    pub event_tx: broadcast::Sender<NodeEvent>,
    /// Instant captured when this state was built; `get_status` derives the uptime from it.
    pub start_time: Instant,
    /// Configuration the daemon was started with.
    pub config: DaemonConfig,
    /// Port the listener actually bound to (the OS picks one when no port is configured).
    pub bound_port: u16,
}
40
41pub async fn start(
45 config: DaemonConfig,
46 mut registry: NodeRegistry,
47 shutdown: CancellationToken,
48) -> Result<SocketAddr> {
49 let (event_tx, _) = broadcast::channel(256);
50
51 let addr = SocketAddr::new(config.listen_addr, config.port.unwrap_or(0));
52 let listener = tokio::net::TcpListener::bind(addr)
53 .await
54 .map_err(|e| crate::error::Error::BindError(e.to_string()))?;
55 let bound_addr = listener
56 .local_addr()
57 .map_err(|e| crate::error::Error::BindError(e.to_string()))?;
58
59 reconcile_registry_versions(&mut registry).await;
64
65 let registry = Arc::new(RwLock::new(registry));
66 let supervisor = Arc::new(RwLock::new(Supervisor::new(event_tx.clone())));
67
68 {
73 let reg = registry.read().await;
74 let mut sup = supervisor.write().await;
75 let adopted = sup.adopt_from_registry(®);
76 if !adopted.is_empty() {
77 tracing::info!(
78 "Adopted {} running node(s) from a previous daemon instance: {:?}",
79 adopted.len(),
80 adopted
81 );
82 }
83 }
84
85 let state = Arc::new(AppState {
86 registry: registry.clone(),
87 supervisor: supervisor.clone(),
88 event_tx: event_tx.clone(),
89 start_time: Instant::now(),
90 config: config.clone(),
91 bound_port: bound_addr.port(),
92 });
93
94 spawn_upgrade_monitor(
98 registry.clone(),
99 supervisor.clone(),
100 UPGRADE_POLL_INTERVAL,
101 shutdown.clone(),
102 );
103
104 spawn_liveness_monitor(
108 registry,
109 supervisor,
110 event_tx,
111 LIVENESS_POLL_INTERVAL,
112 shutdown.clone(),
113 );
114
115 let app = build_router(state.clone());
116
117 write_file(&config.port_file_path, &bound_addr.port().to_string())?;
119 write_file(&config.pid_file_path, &std::process::id().to_string())?;
120
121 let port_file = config.port_file_path.clone();
122 let pid_file = config.pid_file_path.clone();
123
124 tokio::spawn(async move {
125 axum::serve(listener, app)
126 .with_graceful_shutdown(shutdown.cancelled_owned())
127 .await
128 .ok();
129
130 let _ = std::fs::remove_file(&port_file);
132 let _ = std::fs::remove_file(&pid_file);
133 });
134
135 Ok(bound_addr)
136}
137
138fn build_router(state: Arc<AppState>) -> Router {
139 use axum::http::HeaderValue;
140 use tower_http::cors::{Any, CorsLayer};
141
142 let origin = format!("http://127.0.0.1:{}", state.bound_port);
146 let cors = CorsLayer::new()
147 .allow_origin([origin.parse::<HeaderValue>().unwrap()])
148 .allow_methods(Any)
149 .allow_headers(Any);
150
151 Router::new()
152 .route("/console", get(get_console))
153 .route("/api/v1/status", get(get_status))
154 .route("/api/v1/events", get(get_events))
155 .route("/api/v1/nodes/status", get(get_nodes_status))
156 .route("/api/v1/nodes", post(post_nodes))
157 .route(
158 "/api/v1/nodes/{id}",
159 get(get_node_detail).delete(delete_node),
160 )
161 .route("/api/v1/nodes/{id}/start", post(post_start_node))
162 .route("/api/v1/nodes/start-all", post(post_start_all))
163 .route("/api/v1/nodes/{id}/stop", post(post_stop_node))
164 .route("/api/v1/nodes/stop-all", post(post_stop_all))
165 .route("/api/v1/reset", post(post_reset))
166 .route("/api/v1/openapi.json", get(get_openapi))
167 .layer(cors)
168 .with_state(state)
169}
170
171async fn get_status(State(state): State<Arc<AppState>>) -> Json<DaemonStatus> {
172 let registry = state.registry.read().await;
173 let supervisor = state.supervisor.read().await;
174 let (running, stopped, errored) = supervisor.node_counts();
175
176 Json(DaemonStatus {
177 running: true,
178 pid: Some(std::process::id()),
179 port: Some(state.bound_port),
180 uptime_secs: Some(state.start_time.elapsed().as_secs()),
181 nodes_total: registry.len() as u32,
182 nodes_running: running,
183 nodes_stopped: stopped,
184 nodes_errored: errored,
185 })
186}
187
188async fn get_events(
189 State(state): State<Arc<AppState>>,
190) -> Sse<impl futures_core::Stream<Item = std::result::Result<Event, std::convert::Infallible>>> {
191 let mut rx = state.event_tx.subscribe();
192
193 let stream = async_stream::stream! {
194 loop {
195 match rx.recv().await {
196 Ok(event) => {
197 let event_type = event.event_type().to_string();
198 if let Ok(data) = serde_json::to_string(&event) {
199 yield Ok(Event::default().event(event_type).data(data));
200 }
201 }
202 Err(broadcast::error::RecvError::Lagged(_)) => continue,
203 Err(broadcast::error::RecvError::Closed) => break,
204 }
205 }
206 };
207
208 Sse::new(stream)
209}
210
211async fn get_nodes_status(State(state): State<Arc<AppState>>) -> Json<NodeStatusResult> {
213 let registry = state.registry.read().await;
214 let supervisor = state.supervisor.read().await;
215
216 let mut nodes = Vec::new();
217 let mut total_running = 0u32;
218 let mut total_stopped = 0u32;
219
220 for config in registry.list() {
221 let status = supervisor
222 .node_status(config.id)
223 .unwrap_or(NodeStatus::Stopped);
224
225 match status {
226 NodeStatus::Running | NodeStatus::Starting | NodeStatus::UpgradeScheduled => {
227 total_running += 1
228 }
229 _ => total_stopped += 1,
230 }
231
232 let pid = supervisor.node_pid(config.id);
233 let uptime_secs = supervisor.node_uptime_secs(config.id);
234 let pending_version = supervisor.node_pending_version(config.id);
235
236 nodes.push(NodeStatusSummary {
237 node_id: config.id,
238 name: config.service_name.clone(),
239 version: config.version.clone(),
240 status,
241 pid,
242 uptime_secs,
243 pending_version,
244 });
245 }
246
247 Json(NodeStatusResult {
248 nodes,
249 total_running,
250 total_stopped,
251 })
252}
253
254async fn get_node_detail(
256 State(state): State<Arc<AppState>>,
257 Path(id): Path<u32>,
258) -> std::result::Result<Json<NodeInfo>, (StatusCode, Json<serde_json::Value>)> {
259 let registry = state.registry.read().await;
260 let config = match registry.get(id) {
261 Ok(config) => config.clone(),
262 Err(_) => {
263 return Err((
264 StatusCode::NOT_FOUND,
265 Json(serde_json::json!({ "error": format!("Node not found: {id}") })),
266 ))
267 }
268 };
269
270 let supervisor = state.supervisor.read().await;
271 let status = supervisor.node_status(id).unwrap_or(NodeStatus::Stopped);
272 let pid = supervisor.node_pid(id);
273 let uptime_secs = supervisor.node_uptime_secs(id);
274 let pending_version = supervisor.node_pending_version(id);
275
276 Ok(Json(NodeInfo {
277 config,
278 status,
279 pid,
280 uptime_secs,
281 pending_version,
282 }))
283}
284
285async fn post_nodes(
287 State(state): State<Arc<AppState>>,
288 Json(opts): Json<AddNodeOpts>,
289) -> std::result::Result<(StatusCode, Json<AddNodeResult>), (StatusCode, Json<serde_json::Value>)> {
290 let registry_path = state.config.registry_path.clone();
291 let progress = NoopProgress;
292
293 match crate::node::add_nodes(opts, ®istry_path, &progress).await {
294 Ok(result) => {
295 let mut registry = state.registry.write().await;
297 if let Ok(fresh) = NodeRegistry::load(®istry_path) {
298 *registry = fresh;
299 }
300 Ok((StatusCode::CREATED, Json(result)))
301 }
302 Err(e) => Err((
303 StatusCode::BAD_REQUEST,
304 Json(serde_json::json!({ "error": e.to_string() })),
305 )),
306 }
307}
308
309async fn delete_node(
311 State(state): State<Arc<AppState>>,
312 Path(id): Path<u32>,
313) -> std::result::Result<Json<RemoveNodeResult>, (StatusCode, Json<serde_json::Value>)> {
314 let supervisor = state.supervisor.read().await;
316 if supervisor.is_running(id) {
317 return Err((
318 StatusCode::CONFLICT,
319 Json(serde_json::json!({
320 "error": format!("Cannot remove node {id} while it is running. Stop it first."),
321 "current_state": { "node_id": id, "status": "running" }
322 })),
323 ));
324 }
325 drop(supervisor);
326
327 let registry_path = state.config.registry_path.clone();
328
329 match crate::node::remove_node(id, ®istry_path) {
330 Ok(result) => {
331 let mut registry = state.registry.write().await;
333 if let Ok(fresh) = NodeRegistry::load(®istry_path) {
334 *registry = fresh;
335 }
336 Ok(Json(result))
337 }
338 Err(crate::error::Error::NodeNotFound(id)) => Err((
339 StatusCode::NOT_FOUND,
340 Json(serde_json::json!({ "error": format!("Node not found: {id}") })),
341 )),
342 Err(e) => Err((
343 StatusCode::INTERNAL_SERVER_ERROR,
344 Json(serde_json::json!({ "error": e.to_string() })),
345 )),
346 }
347}
348
349async fn post_start_node(
351 State(state): State<Arc<AppState>>,
352 Path(id): Path<u32>,
353) -> std::result::Result<Json<NodeStarted>, (StatusCode, Json<serde_json::Value>)> {
354 let registry = state.registry.read().await;
355 let config = match registry.get(id) {
356 Ok(config) => config.clone(),
357 Err(_) => {
358 return Err((
359 StatusCode::NOT_FOUND,
360 Json(serde_json::json!({ "error": format!("Node not found: {id}") })),
361 ))
362 }
363 };
364 drop(registry);
365
366 let supervisor_ref = state.supervisor.clone();
367
368 let mut supervisor = state.supervisor.write().await;
370 if supervisor.is_running(id) {
371 let pid = supervisor.node_pid(id);
372 let uptime_secs = supervisor.node_uptime_secs(id);
373 return Err((
374 StatusCode::CONFLICT,
375 Json(serde_json::json!({
376 "error": format!("Node {id} is already running"),
377 "current_state": {
378 "node_id": id,
379 "status": "running",
380 "pid": pid,
381 "uptime_secs": uptime_secs,
382 }
383 })),
384 ));
385 }
386
387 let registry_ref = state.registry.clone();
388 match supervisor
389 .start_node(&config, supervisor_ref, registry_ref)
390 .await
391 {
392 Ok(started) => Ok(Json(started)),
393 Err(crate::error::Error::NodeAlreadyRunning(id)) => {
394 let pid = supervisor.node_pid(id);
395 let uptime_secs = supervisor.node_uptime_secs(id);
396 Err((
397 StatusCode::CONFLICT,
398 Json(serde_json::json!({
399 "error": format!("Node {id} is already running"),
400 "current_state": {
401 "node_id": id,
402 "status": "running",
403 "pid": pid,
404 "uptime_secs": uptime_secs,
405 }
406 })),
407 ))
408 }
409 Err(e) => Err((
410 StatusCode::INTERNAL_SERVER_ERROR,
411 Json(serde_json::json!({ "error": e.to_string() })),
412 )),
413 }
414}
415
416async fn post_start_all(State(state): State<Arc<AppState>>) -> Json<StartNodeResult> {
418 let registry = state.registry.read().await;
419 let configs: Vec<_> = registry.list().into_iter().cloned().collect();
420 drop(registry);
421
422 let mut started = Vec::new();
423 let mut failed = Vec::new();
424 let mut already_running = Vec::new();
425
426 let supervisor_ref = state.supervisor.clone();
427 let registry_ref = state.registry.clone();
428
429 for config in &configs {
430 let mut supervisor = state.supervisor.write().await;
431 if supervisor.is_running(config.id) {
432 already_running.push(config.id);
433 continue;
434 }
435
436 match supervisor
437 .start_node(config, supervisor_ref.clone(), registry_ref.clone())
438 .await
439 {
440 Ok(result) => started.push(result),
441 Err(crate::error::Error::NodeAlreadyRunning(id)) => {
442 already_running.push(id);
443 }
444 Err(e) => {
445 failed.push(crate::node::types::NodeStartFailed {
446 node_id: config.id,
447 service_name: config.service_name.clone(),
448 error: e.to_string(),
449 });
450 }
451 }
452 }
453
454 Json(StartNodeResult {
455 started,
456 failed,
457 already_running,
458 })
459}
460
461async fn post_stop_node(
463 State(state): State<Arc<AppState>>,
464 Path(id): Path<u32>,
465) -> std::result::Result<Json<NodeStopped>, (StatusCode, Json<serde_json::Value>)> {
466 let registry = state.registry.read().await;
467 let config = match registry.get(id) {
468 Ok(config) => config.clone(),
469 Err(_) => {
470 return Err((
471 StatusCode::NOT_FOUND,
472 Json(serde_json::json!({ "error": format!("Node not found: {id}") })),
473 ))
474 }
475 };
476 drop(registry);
477
478 let mut supervisor = state.supervisor.write().await;
480 if !supervisor.is_running(id) {
481 let status = supervisor
482 .node_status(id)
483 .unwrap_or(crate::node::types::NodeStatus::Stopped);
484 return Err((
485 StatusCode::CONFLICT,
486 Json(serde_json::json!({
487 "error": format!("Node {id} is not running"),
488 "current_state": {
489 "node_id": id,
490 "status": status,
491 }
492 })),
493 ));
494 }
495
496 match supervisor.stop_node(id).await {
497 Ok(()) => Ok(Json(NodeStopped {
498 node_id: id,
499 service_name: config.service_name,
500 })),
501 Err(crate::error::Error::NodeNotRunning(id)) => {
502 let status = supervisor
503 .node_status(id)
504 .unwrap_or(crate::node::types::NodeStatus::Stopped);
505 Err((
506 StatusCode::CONFLICT,
507 Json(serde_json::json!({
508 "error": format!("Node {id} is not running"),
509 "current_state": {
510 "node_id": id,
511 "status": status,
512 }
513 })),
514 ))
515 }
516 Err(e) => Err((
517 StatusCode::INTERNAL_SERVER_ERROR,
518 Json(serde_json::json!({ "error": e.to_string() })),
519 )),
520 }
521}
522
523async fn post_stop_all(State(state): State<Arc<AppState>>) -> Json<StopNodeResult> {
525 let registry = state.registry.read().await;
526 let configs: Vec<(u32, String)> = registry
527 .list()
528 .into_iter()
529 .map(|c| (c.id, c.service_name.clone()))
530 .collect();
531 drop(registry);
532
533 let mut supervisor = state.supervisor.write().await;
534 let result = supervisor.stop_all_nodes(&configs).await;
535
536 Json(result)
537}
538
539async fn post_reset(
541 State(state): State<Arc<AppState>>,
542) -> std::result::Result<Json<ResetResult>, (StatusCode, Json<serde_json::Value>)> {
543 let supervisor = state.supervisor.write().await;
546 let (running, _, _) = supervisor.node_counts();
547 if running > 0 {
548 return Err((
549 StatusCode::CONFLICT,
550 Json(serde_json::json!({
551 "error": format!("Cannot reset while nodes are running ({running} node(s) still running). Stop all nodes first."),
552 "nodes_running": running,
553 })),
554 ));
555 }
556 drop(supervisor);
557
558 let registry_path = state.config.registry_path.clone();
559
560 match crate::node::reset(®istry_path) {
561 Ok(result) => {
562 let mut registry = state.registry.write().await;
564 if let Ok(fresh) = NodeRegistry::load(®istry_path) {
565 *registry = fresh;
566 }
567 Ok(Json(result))
568 }
569 Err(e) => Err((
570 StatusCode::INTERNAL_SERVER_ERROR,
571 Json(serde_json::json!({ "error": e.to_string() })),
572 )),
573 }
574}
575
576async fn get_openapi() -> impl IntoResponse {
577 let spec = serde_json::json!({
580 "openapi": "3.1.0",
581 "info": {
582 "title": "Ant Daemon API",
583 "version": "0.1.0",
584 "description": "REST API for the ant node management daemon"
585 },
586 "paths": {
587 "/api/v1/status": {
588 "get": {
589 "summary": "Daemon status",
590 "description": "Returns daemon health, uptime, and node count summary",
591 "responses": {
592 "200": {
593 "description": "Daemon status",
594 "content": {
595 "application/json": {
596 "schema": { "$ref": "#/components/schemas/DaemonStatus" }
597 }
598 }
599 }
600 }
601 }
602 },
603 "/api/v1/events": {
604 "get": {
605 "summary": "Event stream",
606 "description": "SSE stream of real-time node events",
607 "responses": {
608 "200": {
609 "description": "SSE event stream"
610 }
611 }
612 }
613 },
614 "/api/v1/nodes": {
615 "post": {
616 "summary": "Add nodes",
617 "description": "Add one or more nodes to the registry",
618 "requestBody": {
619 "required": true,
620 "content": {
621 "application/json": {
622 "schema": { "$ref": "#/components/schemas/AddNodeOpts" }
623 }
624 }
625 },
626 "responses": {
627 "201": {
628 "description": "Nodes added",
629 "content": {
630 "application/json": {
631 "schema": { "$ref": "#/components/schemas/AddNodeResult" }
632 }
633 }
634 },
635 "400": {
636 "description": "Invalid request"
637 }
638 }
639 }
640 },
641 "/api/v1/nodes/{id}": {
642 "delete": {
643 "summary": "Remove node",
644 "description": "Remove a node from the registry",
645 "parameters": [{
646 "name": "id",
647 "in": "path",
648 "required": true,
649 "schema": { "type": "integer" }
650 }],
651 "responses": {
652 "200": {
653 "description": "Node removed",
654 "content": {
655 "application/json": {
656 "schema": { "$ref": "#/components/schemas/RemoveNodeResult" }
657 }
658 }
659 },
660 "404": {
661 "description": "Node not found"
662 }
663 }
664 }
665 },
666 "/api/v1/nodes/{id}/start": {
667 "post": {
668 "summary": "Start a node",
669 "description": "Start a specific node by ID. Returns 409 if already running with current_state.",
670 "parameters": [{
671 "name": "id",
672 "in": "path",
673 "required": true,
674 "schema": { "type": "integer" }
675 }],
676 "responses": {
677 "200": {
678 "description": "Node started",
679 "content": {
680 "application/json": {
681 "schema": { "$ref": "#/components/schemas/NodeStarted" }
682 }
683 }
684 },
685 "404": {
686 "description": "Node not found"
687 },
688 "409": {
689 "description": "Node already running (includes current_state)"
690 },
691 "500": {
692 "description": "Failed to start node"
693 }
694 }
695 }
696 },
697 "/api/v1/nodes/start-all": {
698 "post": {
699 "summary": "Start all nodes",
700 "description": "Start all registered nodes. Returns per-node results.",
701 "responses": {
702 "200": {
703 "description": "Start results",
704 "content": {
705 "application/json": {
706 "schema": { "$ref": "#/components/schemas/StartNodeResult" }
707 }
708 }
709 }
710 }
711 }
712 },
713 "/api/v1/nodes/{id}/stop": {
714 "post": {
715 "summary": "Stop a node",
716 "description": "Stop a specific node by ID. Returns 409 if already stopped with current_state.",
717 "parameters": [{
718 "name": "id",
719 "in": "path",
720 "required": true,
721 "schema": { "type": "integer" }
722 }],
723 "responses": {
724 "200": {
725 "description": "Node stopped",
726 "content": {
727 "application/json": {
728 "schema": { "$ref": "#/components/schemas/NodeStopped" }
729 }
730 }
731 },
732 "404": {
733 "description": "Node not found"
734 },
735 "409": {
736 "description": "Node not running (includes current_state)"
737 },
738 "500": {
739 "description": "Failed to stop node"
740 }
741 }
742 }
743 },
744 "/api/v1/nodes/stop-all": {
745 "post": {
746 "summary": "Stop all nodes",
747 "description": "Stop all running nodes. Returns per-node results.",
748 "responses": {
749 "200": {
750 "description": "Stop results",
751 "content": {
752 "application/json": {
753 "schema": { "$ref": "#/components/schemas/StopNodeResult" }
754 }
755 }
756 }
757 }
758 }
759 },
760 "/api/v1/reset": {
761 "post": {
762 "summary": "Reset all node state",
763 "description": "Remove all node data directories, log directories, and clear the registry. Fails if any nodes are running.",
764 "responses": {
765 "200": {
766 "description": "Reset successful",
767 "content": {
768 "application/json": {
769 "schema": { "$ref": "#/components/schemas/ResetResult" }
770 }
771 }
772 },
773 "409": {
774 "description": "Nodes still running"
775 }
776 }
777 }
778 }
779 },
780 "components": {
781 "schemas": {
782 "DaemonStatus": {
783 "type": "object",
784 "properties": {
785 "running": { "type": "boolean" },
786 "pid": { "type": "integer", "nullable": true },
787 "port": { "type": "integer", "nullable": true },
788 "uptime_secs": { "type": "integer", "nullable": true },
789 "nodes_total": { "type": "integer" },
790 "nodes_running": { "type": "integer" },
791 "nodes_stopped": { "type": "integer" },
792 "nodes_errored": { "type": "integer" }
793 }
794 }
795 }
796 }
797 });
798 Json(spec)
799}
800
801async fn get_console() -> Html<&'static str> {
802 Html(include_str!("console.html"))
803}
804
/// Writes `contents` to `path`, creating any missing parent directories first.
///
/// Takes a `&Path` (spelled out in full because `Path` in this file refers to
/// the axum extractor); existing `&PathBuf` arguments coerce automatically.
///
/// # Errors
/// Propagates the I/O error from creating the directories or writing the file.
fn write_file(path: &std::path::Path, contents: &str) -> Result<()> {
    if let Some(parent) = path.parent() {
        std::fs::create_dir_all(parent)?;
    }
    std::fs::write(path, contents)?;
    Ok(())
}
812
813async fn reconcile_registry_versions(registry: &mut NodeRegistry) {
819 let node_ids: Vec<u32> = registry.list().iter().map(|c| c.id).collect();
820 let mut changed = false;
821
822 for id in node_ids {
823 let (binary_path, recorded_version) = match registry.get(id) {
824 Ok(c) => (c.binary_path.clone(), c.version.clone()),
825 Err(_) => continue,
826 };
827
828 if !binary_path.exists() {
829 continue;
830 }
831
832 let Ok(disk_version) = crate::node::binary::extract_version(&binary_path).await else {
833 continue;
834 };
835
836 if disk_version == recorded_version {
837 continue;
838 }
839
840 if let Ok(entry) = registry.get_mut(id) {
841 entry.version = disk_version;
842 changed = true;
843 }
844 }
845
846 if changed {
847 let _ = registry.save();
848 }
849}
850
#[cfg(all(test, unix))]
mod tests {
    use super::*;
    use crate::node::registry::NodeRegistry;
    use crate::node::types::NodeConfig;
    use std::collections::HashMap;
    use std::os::unix::fs::PermissionsExt;

    /// Writes an executable shell script at `path` that prints `stdout` followed by a newline.
    fn write_fake_binary(path: &std::path::Path, stdout: &str) {
        let script = format!("#!/bin/sh\nprintf '%s\\n' '{stdout}'\n");
        std::fs::write(path, script).unwrap();
        let mut perm = std::fs::metadata(path).unwrap().permissions();
        perm.set_mode(0o755);
        std::fs::set_permissions(path, perm).unwrap();
    }

    /// Builds a minimal node config pointing at `binary_path` with the given recorded `version`.
    fn seed_config(binary_path: PathBuf, version: &str, data_dir: PathBuf) -> NodeConfig {
        NodeConfig {
            id: 0,
            service_name: String::new(),
            rewards_address: "0x0".into(),
            data_dir,
            log_dir: None,
            node_port: None,
            metrics_port: None,
            network_id: Some(1),
            binary_path,
            version: version.into(),
            env_variables: HashMap::new(),
            bootstrap_peers: vec![],
        }
    }

    /// A stale recorded version is replaced by the binary's version, in memory and on disk.
    #[tokio::test]
    async fn reconcile_updates_stale_version_and_persists() {
        let tmp = tempfile::tempdir().unwrap();
        let reg_path = tmp.path().join("registry.json");
        let bin_path = tmp.path().join("ant-node");
        write_fake_binary(&bin_path, "ant-node 0.10.11-rc.1");

        let mut registry = NodeRegistry::load(&reg_path).unwrap();
        let id = registry.add(seed_config(
            bin_path.clone(),
            "0.10.1",
            tmp.path().join("data"),
        ));
        registry.save().unwrap();

        reconcile_registry_versions(&mut registry).await;

        assert_eq!(registry.get(id).unwrap().version, "0.10.11-rc.1");

        // Reload from disk to prove the change was persisted, not just applied in memory.
        let reloaded = NodeRegistry::load(&reg_path).unwrap();
        assert_eq!(reloaded.get(id).unwrap().version, "0.10.11-rc.1");
    }

    /// A recorded version that already matches the binary is left unchanged.
    #[tokio::test]
    async fn reconcile_leaves_matching_version_alone() {
        let tmp = tempfile::tempdir().unwrap();
        let reg_path = tmp.path().join("registry.json");
        let bin_path = tmp.path().join("ant-node");
        write_fake_binary(&bin_path, "ant-node 0.10.1");

        let mut registry = NodeRegistry::load(&reg_path).unwrap();
        let id = registry.add(seed_config(
            bin_path.clone(),
            "0.10.1",
            tmp.path().join("data"),
        ));

        reconcile_registry_versions(&mut registry).await;

        assert_eq!(registry.get(id).unwrap().version, "0.10.1");
    }

    /// An entry whose binary does not exist keeps its recorded version.
    #[tokio::test]
    async fn reconcile_skips_missing_binary() {
        let tmp = tempfile::tempdir().unwrap();
        let reg_path = tmp.path().join("registry.json");

        let mut registry = NodeRegistry::load(&reg_path).unwrap();
        let id = registry.add(seed_config(
            tmp.path().join("does-not-exist"),
            "0.10.1",
            tmp.path().join("data"),
        ));

        reconcile_registry_versions(&mut registry).await;

        assert_eq!(registry.get(id).unwrap().version, "0.10.1");
    }
}