Skip to main content

ant_node/
node.rs

1//! Node implementation - thin wrapper around saorsa-core's `P2PNode`.
2
3use crate::ant_protocol::CHUNK_PROTOCOL_ID;
4use crate::config::{
5    default_nodes_dir, default_root_dir, NetworkMode, NodeConfig, NODE_IDENTITY_FILENAME,
6};
7use crate::error::{Error, Result};
8use crate::event::{create_event_channel, NodeEvent, NodeEventsChannel, NodeEventsSender};
9use crate::logging::{debug, error, info, warn};
10use crate::payment::metrics::QuotingMetricsTracker;
11use crate::payment::wallet::parse_rewards_address;
12use crate::payment::{EvmVerifierConfig, PaymentVerifier, PaymentVerifierConfig, QuoteGenerator};
13use crate::replication::config::ReplicationConfig;
14use crate::replication::ReplicationEngine;
15use crate::storage::lmdb::MIB;
16use crate::storage::{AntProtocol, LmdbStorage, LmdbStorageConfig};
17use crate::upgrade::{
18    upgrade_cache_dir, AutoApplyUpgrader, BinaryCache, ReleaseCache, UpgradeMonitor, UpgradeResult,
19};
20use rand::Rng;
21use saorsa_core::identity::NodeIdentity;
22use saorsa_core::{
23    IPDiversityConfig as CoreDiversityConfig, MultiAddr, NodeConfig as CoreNodeConfig, P2PEvent,
24    P2PNode,
25};
26use std::path::PathBuf;
27use std::sync::atomic::{AtomicI32, Ordering};
28use std::sync::Arc;
29use tokio::sync::Semaphore;
30use tokio::task::JoinHandle;
31use tokio_util::sync::CancellationToken;
32
33#[cfg(unix)]
34use tokio::signal::unix::{signal, SignalKind};
35
36/// Builder for constructing an Ant node.
37pub struct NodeBuilder {
38    config: NodeConfig,
39}
40
41impl NodeBuilder {
42    /// Create a new node builder with the given configuration.
43    #[must_use]
44    pub fn new(config: NodeConfig) -> Self {
45        Self { config }
46    }
47
48    /// Reject startup in production mode without a usable rewards address.
49    ///
50    /// A node that cannot receive payment must not silently run on the
51    /// production network. The placeholder address shipped in the example
52    /// config and an empty string both count as "unconfigured".
53    ///
54    /// # Errors
55    ///
56    /// Returns [`Error::Config`] if `network_mode` is `Production` and
57    /// `payment.rewards_address` is unset, empty, or the example placeholder.
58    fn validate_production_rewards_address(config: &NodeConfig) -> Result<()> {
59        if config.network_mode != NetworkMode::Production {
60            return Ok(());
61        }
62        let configured = config
63            .payment
64            .rewards_address
65            .as_deref()
66            .is_some_and(|addr| !addr.is_empty() && addr != "0xYOUR_ARBITRUM_ADDRESS_HERE");
67        if configured {
68            Ok(())
69        } else {
70            Err(Error::Config(
71                "CRITICAL: Rewards address is not configured. \
72                 Set payment.rewards_address in config to your Arbitrum wallet address."
73                    .to_string(),
74            ))
75        }
76    }
77
78    /// Build and start the node.
79    ///
80    /// # Errors
81    ///
82    /// Returns an error if the node fails to start.
83    pub async fn build(mut self) -> Result<RunningNode> {
84        info!("Building ant-node with config: {:?}", self.config);
85
86        Self::validate_production_rewards_address(&self.config)?;
87
88        // Resolve identity and root_dir (may update self.config.root_dir)
89        let identity = Arc::new(Self::resolve_identity(&mut self.config).await?);
90        let peer_id = identity.peer_id().to_hex();
91
92        info!(peer_id = %peer_id, root_dir = %self.config.root_dir.display(), "Node identity resolved");
93
94        // Ensure root directory exists
95        std::fs::create_dir_all(&self.config.root_dir)?;
96
97        // Create shutdown token
98        let shutdown = CancellationToken::new();
99
100        // Create event channel
101        let (events_tx, events_rx) = create_event_channel();
102
103        // Convert our config to saorsa-core's config
104        let mut core_config = Self::build_core_config(&self.config)?;
105        // Inject the ML-DSA identity so the P2PNode's transport peer ID
106        // matches the pub_key embedded in payment quotes.
107        core_config.node_identity = Some(Arc::clone(&identity));
108        debug!("Core config: {:?}", core_config);
109
110        // Initialize saorsa-core's P2PNode
111        let p2p_node = P2PNode::new(core_config)
112            .await
113            .map_err(|e| Error::Startup(format!("Failed to create P2P node: {e}")))?;
114
115        // Create upgrade monitor
116        let upgrade_monitor = {
117            let node_id_seed = p2p_node.peer_id().as_bytes();
118            Some(Self::build_upgrade_monitor(&self.config, node_id_seed))
119        };
120
121        let repl_config = ReplicationConfig::default();
122
123        // Initialize ANT protocol handler for chunk storage and
124        // wire the fresh-write channel so PUTs trigger replication.
125        let (ant_protocol, fresh_write_rx) = if self.config.storage.enabled {
126            let (fresh_write_tx, fresh_write_rx) = tokio::sync::mpsc::unbounded_channel();
127            let mut protocol =
128                Self::build_ant_protocol(&self.config, &identity, repl_config.close_group_size)
129                    .await?;
130            protocol.set_fresh_write_sender(fresh_write_tx);
131            (Some(Arc::new(protocol)), Some(fresh_write_rx))
132        } else {
133            info!("Chunk storage disabled");
134            (None, None)
135        };
136
137        let p2p_arc = Arc::new(p2p_node);
138
139        // Wire the P2PNode handle into AntProtocol so payment proofs can query
140        // live-DHT closeness.
141        if let Some(ref protocol) = ant_protocol {
142            protocol.attach_p2p_node(Arc::clone(&p2p_arc));
143        }
144
145        // Initialize replication engine (if storage is enabled)
146        let replication_engine =
147            if let (Some(ref protocol), Some(fresh_rx)) = (&ant_protocol, fresh_write_rx) {
148                let storage_arc = protocol.storage();
149                let payment_verifier_arc = protocol.payment_verifier_arc();
150                match ReplicationEngine::new(
151                    repl_config,
152                    Arc::clone(&p2p_arc),
153                    storage_arc,
154                    payment_verifier_arc,
155                    Arc::clone(&identity),
156                    &self.config.root_dir,
157                    fresh_rx,
158                    shutdown.clone(),
159                )
160                .await
161                {
162                    Ok(engine) => Some(engine),
163                    Err(e) => {
164                        warn!("Failed to initialize replication engine: {e}");
165                        None
166                    }
167                }
168            } else {
169                None
170            };
171
172        let node = RunningNode {
173            config: self.config,
174            p2p_node: p2p_arc,
175            shutdown,
176            events_tx,
177            events_rx: Some(events_rx),
178            upgrade_monitor,
179            ant_protocol,
180            replication_engine,
181            protocol_task: None,
182            upgrade_exit_code: Arc::new(AtomicI32::new(-1)),
183        };
184
185        Ok(node)
186    }
187
188    /// Build the saorsa-core `NodeConfig` from our config.
189    fn build_core_config(config: &NodeConfig) -> Result<CoreNodeConfig> {
190        let local = matches!(config.network_mode, NetworkMode::Development);
191
192        let mut core_config = CoreNodeConfig::builder()
193            .port(config.port)
194            .ipv6(!config.ipv4_only)
195            .local(local)
196            .max_message_size(config.max_message_size)
197            .build()
198            .map_err(|e| Error::Config(format!("Failed to create core config: {e}")))?;
199
200        // Add bootstrap peers.
201        core_config.bootstrap_peers = config
202            .bootstrap
203            .iter()
204            .map(|addr| MultiAddr::quic(*addr))
205            .collect();
206
207        // Propagate network-mode tuning into saorsa-core where supported.
208        match config.network_mode {
209            NetworkMode::Production => {
210                core_config.diversity_config = Some(CoreDiversityConfig::default());
211            }
212            NetworkMode::Testnet => {
213                // Testnet allows loopback so nodes can be co-located on one machine.
214                core_config.allow_loopback = true;
215                core_config.diversity_config = Some(CoreDiversityConfig {
216                    max_per_ip: config.testnet.max_per_ip,
217                    max_per_subnet: config.testnet.max_per_subnet,
218                });
219            }
220            NetworkMode::Development => {
221                core_config.diversity_config = Some(CoreDiversityConfig::permissive());
222            }
223        }
224
225        // Persist close group peers + trust scores across restarts.
226        // Default to root_dir (alongside node_identity.key) when not explicitly set.
227        core_config.close_group_cache_dir = Some(
228            config
229                .close_group_cache_dir
230                .clone()
231                .unwrap_or_else(|| config.root_dir.clone()),
232        );
233
234        Ok(core_config)
235    }
236
237    /// Resolve the node identity from disk or generate a new one.
238    ///
239    /// **When `root_dir` differs from the platform default** (set via `--root-dir`
240    /// or loaded from `config.toml`):
241    ///   - Use `root_dir` directly: load existing identity or generate a new one.
242    ///
243    /// **When `root_dir` is the platform default** (first run, no config file):
244    ///   1. Scan `{default_root_dir}/nodes/` for subdirectories containing
245    ///      `node_identity.key`.
246    ///   2. **None found** — first run: generate identity, create
247    ///      `nodes/{full_peer_id}/`, save identity there, update `config.root_dir`.
248    ///   3. **Exactly one found** — load it and update `config.root_dir`.
249    ///   4. **Multiple found** — return an error asking for `--root-dir`.
250    async fn resolve_identity(config: &mut NodeConfig) -> Result<NodeIdentity> {
251        if config.root_dir != default_root_dir() {
252            return Self::load_or_generate_identity(&config.root_dir).await;
253        }
254
255        let nodes_dir = default_nodes_dir();
256        let identity_dirs = Self::scan_identity_dirs(&nodes_dir)?;
257
258        match identity_dirs.len() {
259            0 => {
260                // First run: generate new identity and create a peer-id-scoped subdirectory
261                let identity = NodeIdentity::generate().map_err(|e| {
262                    Error::Startup(format!("Failed to generate node identity: {e}"))
263                })?;
264                let peer_id = identity.peer_id().to_hex();
265                let peer_dir = nodes_dir.join(&peer_id);
266                std::fs::create_dir_all(&peer_dir)?;
267                identity
268                    .save_to_file(&peer_dir.join(NODE_IDENTITY_FILENAME))
269                    .await
270                    .map_err(|e| Error::Startup(format!("Failed to save node identity: {e}")))?;
271                config.root_dir = peer_dir;
272                Ok(identity)
273            }
274            1 => {
275                let dir = identity_dirs
276                    .first()
277                    .ok_or_else(|| Error::Config("No identity dirs found".to_string()))?;
278                let identity = NodeIdentity::load_from_file(&dir.join(NODE_IDENTITY_FILENAME))
279                    .await
280                    .map_err(|e| Error::Startup(format!("Failed to load node identity: {e}")))?;
281                config.root_dir.clone_from(dir);
282                Ok(identity)
283            }
284            _ => {
285                let dirs: Vec<String> = identity_dirs
286                    .iter()
287                    .filter_map(|d| d.file_name().map(|n| n.to_string_lossy().into_owned()))
288                    .collect();
289                Err(Error::Config(format!(
290                    "Multiple node identities found at {}: [{}]. Specify --root-dir to select one.",
291                    nodes_dir.display(),
292                    dirs.join(", ")
293                )))
294            }
295        }
296    }
297
298    /// Load an existing identity from `dir/node_identity.key`, or generate and save a new one.
299    async fn load_or_generate_identity(dir: &std::path::Path) -> Result<NodeIdentity> {
300        let key_path = dir.join(NODE_IDENTITY_FILENAME);
301        if key_path.exists() {
302            NodeIdentity::load_from_file(&key_path)
303                .await
304                .map_err(|e| Error::Startup(format!("Failed to load node identity: {e}")))
305        } else {
306            let identity = NodeIdentity::generate()
307                .map_err(|e| Error::Startup(format!("Failed to generate node identity: {e}")))?;
308            std::fs::create_dir_all(dir)?;
309            identity
310                .save_to_file(&key_path)
311                .await
312                .map_err(|e| Error::Startup(format!("Failed to save node identity: {e}")))?;
313            Ok(identity)
314        }
315    }
316
317    /// Scan `base_dir` for immediate subdirectories that contain `node_identity.key`.
318    fn scan_identity_dirs(base_dir: &std::path::Path) -> Result<Vec<PathBuf>> {
319        let mut dirs = Vec::new();
320        let read_dir = match std::fs::read_dir(base_dir) {
321            Ok(rd) => rd,
322            Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(dirs),
323            Err(e) => return Err(e.into()),
324        };
325        for entry in read_dir {
326            let entry = entry?;
327            let path = entry.path();
328            if path.is_dir() && path.join(NODE_IDENTITY_FILENAME).exists() {
329                dirs.push(path);
330            }
331        }
332        Ok(dirs)
333    }
334
335    fn build_upgrade_monitor(config: &NodeConfig, node_id_seed: &[u8]) -> UpgradeMonitor {
336        let mut monitor = UpgradeMonitor::new(
337            config.upgrade.github_repo.clone(),
338            config.upgrade.channel,
339            config.upgrade.check_interval_hours,
340        );
341
342        if let Ok(cache_dir) = upgrade_cache_dir() {
343            monitor = monitor.with_release_cache(ReleaseCache::new(
344                cache_dir,
345                std::time::Duration::from_secs(3600),
346            ));
347        }
348
349        if config.upgrade.staged_rollout_hours > 0 {
350            monitor =
351                monitor.with_staged_rollout(node_id_seed, config.upgrade.staged_rollout_hours);
352        }
353
354        monitor
355    }
356
357    /// Build the ANT protocol handler from config.
358    ///
359    /// Initializes LMDB storage, payment verifier, and quote generator.
360    /// Wires ML-DSA-65 signing from the node's identity into the quote generator.
361    async fn build_ant_protocol(
362        config: &NodeConfig,
363        identity: &NodeIdentity,
364        close_group_size: usize,
365    ) -> Result<AntProtocol> {
366        // Create LMDB storage
367        let storage_config = LmdbStorageConfig {
368            root_dir: config.root_dir.clone(),
369            verify_on_read: config.storage.verify_on_read,
370            max_map_size: config.storage.db_size_gb.saturating_mul(1024 * 1024 * 1024),
371            disk_reserve: config.storage.disk_reserve_mb.saturating_mul(MIB),
372        };
373        let storage = LmdbStorage::new(storage_config)
374            .await
375            .map_err(|e| Error::Startup(format!("Failed to create LMDB storage: {e}")))?;
376
377        // Parse rewards address (required — node must know where to receive payments)
378        let rewards_address = match config.payment.rewards_address {
379            Some(ref addr) => parse_rewards_address(addr)?,
380            None => {
381                return Err(Error::Startup(
382                    "No rewards address configured. Set --rewards-address or payment.rewards_address in config.".to_string(),
383                ));
384            }
385        };
386
387        // Create payment verifier
388        let evm_network = config.payment.evm_network.clone().into_evm_network();
389        let payment_config = PaymentVerifierConfig {
390            evm: EvmVerifierConfig {
391                network: evm_network,
392            },
393            cache_capacity: config.payment.cache_capacity,
394            close_group_size,
395            local_rewards_address: rewards_address,
396        };
397        let payment_verifier = PaymentVerifier::new(payment_config);
398        let metrics_tracker = QuotingMetricsTracker::new(0);
399        let mut quote_generator = QuoteGenerator::new(rewards_address, metrics_tracker);
400
401        // Wire ML-DSA-65 signing from node identity.
402        // This same signer is used for both regular quotes and merkle candidate quotes.
403        crate::payment::wire_ml_dsa_signer(&mut quote_generator, identity)?;
404
405        let storage = Arc::new(storage);
406        let payment_verifier = Arc::new(payment_verifier);
407
408        let protocol = AntProtocol::new(storage, payment_verifier, Arc::new(quote_generator));
409
410        info!(
411            "ANT protocol handler initialized with ML-DSA-65 signing (protocol={CHUNK_PROTOCOL_ID})"
412        );
413
414        Ok(protocol)
415    }
416}
417
418/// A running Ant node.
419pub struct RunningNode {
420    config: NodeConfig,
421    p2p_node: Arc<P2PNode>,
422    shutdown: CancellationToken,
423    events_tx: NodeEventsSender,
424    events_rx: Option<NodeEventsChannel>,
425    upgrade_monitor: Option<UpgradeMonitor>,
426    /// ANT protocol handler for chunk storage.
427    ant_protocol: Option<Arc<AntProtocol>>,
428    /// Replication engine (manages neighbor sync, verification, audits).
429    replication_engine: Option<ReplicationEngine>,
430    /// Protocol message routing background task.
431    protocol_task: Option<JoinHandle<()>>,
432    /// Exit code requested by a successful upgrade (-1 = no upgrade exit pending).
433    upgrade_exit_code: Arc<AtomicI32>,
434}
435
436impl RunningNode {
437    /// Get the node's root directory.
438    #[must_use]
439    pub fn root_dir(&self) -> &PathBuf {
440        &self.config.root_dir
441    }
442
443    /// Get a receiver for node events.
444    ///
445    /// Note: Can only be called once. Subsequent calls return None.
446    pub fn events(&mut self) -> Option<NodeEventsChannel> {
447        self.events_rx.take()
448    }
449
450    /// Subscribe to node events.
451    #[must_use]
452    pub fn subscribe_events(&self) -> NodeEventsChannel {
453        self.events_tx.subscribe()
454    }
455
456    /// Run the node until shutdown is requested.
457    ///
458    /// # Errors
459    ///
460    /// Returns an error if the node encounters a fatal error.
461    #[allow(clippy::too_many_lines)]
462    pub async fn run(&mut self) -> Result<()> {
463        info!("Node runtime loop starting");
464
465        // Subscribe to DHT events BEFORE starting the P2P node so the
466        // bootstrap-sync task does not miss the BootstrapComplete event
467        // emitted during P2PNode::start().
468        let dht_events_for_bootstrap = self
469            .replication_engine
470            .as_ref()
471            .map(|_| self.p2p_node.dht_manager().subscribe_events());
472
473        // Start the P2P node
474        self.p2p_node
475            .start()
476            .await
477            .map_err(|e| Error::Startup(format!("Failed to start P2P node: {e}")))?;
478
479        let listen_addrs = self.p2p_node.listen_addrs().await;
480        info!(listen_addrs = ?listen_addrs, "P2P node started");
481
482        // Extract the actual bound port (config port may be 0 = auto-select)
483        let actual_port = listen_addrs
484            .first()
485            .and_then(MultiAddr::port)
486            .unwrap_or(self.config.port);
487        info!(
488            port = actual_port,
489            "Node is running on port: {}", actual_port
490        );
491
492        // Emit started event
493        if let Err(e) = self.events_tx.send(NodeEvent::Started) {
494            warn!("Failed to send Started event: {e}");
495        }
496
497        // Start protocol message routing (P2P → AntProtocol → P2P response)
498        self.start_protocol_routing();
499
500        // Start replication engine background tasks
501        if let Some(ref mut engine) = self.replication_engine {
502            // Safety: dht_events_for_bootstrap is Some when replication_engine
503            // is Some (both arms use the same condition).
504            if let Some(dht_events) = dht_events_for_bootstrap {
505                engine.start(dht_events);
506            }
507            info!("Replication engine started");
508        }
509
510        // Start upgrade monitor if enabled
511        if let Some(monitor) = self.upgrade_monitor.take() {
512            let events_tx = self.events_tx.clone();
513            let shutdown = self.shutdown.clone();
514            let stop_on_upgrade = self.config.upgrade.stop_on_upgrade;
515            let upgrade_exit_code = Arc::clone(&self.upgrade_exit_code);
516
517            tokio::spawn(async move {
518                let mut monitor = monitor;
519                let mut upgrader = AutoApplyUpgrader::new().with_stop_on_upgrade(stop_on_upgrade);
520                if let Ok(cache_dir) = upgrade_cache_dir() {
521                    upgrader = upgrader.with_binary_cache(BinaryCache::new(cache_dir));
522                }
523
524                // Add randomized jitter before the first upgrade check to prevent all nodes
525                // from hitting the GitHub API simultaneously when started together.
526                {
527                    let jitter_duration = jittered_interval(monitor.check_interval());
528                    let first_check_time = chrono::Utc::now()
529                        + chrono::Duration::from_std(jitter_duration).unwrap_or_else(|e| {
530                            warn!("chrono::Duration::from_std failed for jitter ({e}), defaulting to 1 minute");
531                            chrono::Duration::minutes(1)
532                        });
533                    info!(
534                        "First upgrade check scheduled for {} (jitter: {}s)",
535                        first_check_time.to_rfc3339(),
536                        jitter_duration.as_secs()
537                    );
538                    tokio::time::sleep(jitter_duration).await;
539                }
540
541                loop {
542                    tokio::select! {
543                        () = shutdown.cancelled() => {
544                            break;
545                        }
546                        result = monitor.check_for_ready_upgrade() => {
547                            match result {
548                                Ok(Some(upgrade_info)) => {
549                                    info!(
550                                        current_version = %upgrader.current_version(),
551                                        new_version = %upgrade_info.version,
552                                        "Upgrade available"
553                                    );
554
555                                    // Send notification event
556                                    if let Err(e) = events_tx.send(NodeEvent::UpgradeAvailable {
557                                        version: upgrade_info.version.to_string(),
558                                    }) {
559                                        warn!("Failed to send UpgradeAvailable event: {e}");
560                                    }
561
562                                    // Auto-apply the upgrade
563                                    info!("Starting auto-apply upgrade...");
564                                    match upgrader.apply_upgrade(&upgrade_info).await {
565                                        Ok(UpgradeResult::Success { version, exit_code }) => {
566                                            info!("Upgrade to {} successful, initiating graceful shutdown", version);
567                                            upgrade_exit_code.store(exit_code, Ordering::SeqCst);
568                                            shutdown.cancel();
569                                            break;
570                                        }
571                                        Ok(UpgradeResult::RolledBack { reason }) => {
572                                            warn!("Error during upgrade process: {}", reason);
573                                        }
574                                        Ok(UpgradeResult::NoUpgrade) => {
575                                            info!("Already running latest version");
576                                        }
577                                        Err(e) => {
578                                            error!("Error during upgrade process: {}", e);
579                                        }
580                                    }
581                                }
582                                Ok(None) => {
583                                    if let Some(remaining) = monitor.time_until_upgrade() {
584                                        info!(
585                                            "Upgrade pending, rollout delay remaining: {}m {}s",
586                                            remaining.as_secs() / 60,
587                                            remaining.as_secs() % 60
588                                        );
589                                    } else {
590                                        info!("No upgrade available");
591                                    }
592                                }
593                                Err(e) => {
594                                    warn!("Error during upgrade process: {}", e);
595                                }
596                            }
597                            // If an upgrade is pending, sleep for exactly the remaining
598                            // rollout delay so the node restarts at its scheduled time
599                            // rather than waiting for the next check interval tick.
600                            let sleep_duration = monitor.time_until_upgrade().map_or_else(
601                                || {
602                                    // No pending upgrade - schedule next check with jitter
603                                    let jittered_duration =
604                                        jittered_interval(monitor.check_interval());
605                                    let next_check = chrono::Utc::now()
606                                        + chrono::Duration::from_std(jittered_duration).unwrap_or_else(|e| {
607                                            warn!("chrono::Duration::from_std failed for interval ({e}), defaulting to 1 hour");
608                                            chrono::Duration::hours(1)
609                                        });
610                                    info!("Next upgrade check scheduled for {}", next_check.to_rfc3339());
611                                    jittered_duration
612                                },
613                                |remaining| {
614                                    // If the rollout delay has fully elapsed but the upgrade was
615                                    // not successfully applied, avoid a tight loop by backing off
616                                    // at least one check interval before retrying.
617                                    if remaining.is_zero() {
618                                        let backoff = jittered_interval(monitor.check_interval());
619                                        let next_check = chrono::Utc::now()
620                                            + chrono::Duration::from_std(backoff).unwrap_or_else(|e| {
621                                                warn!("chrono::Duration::from_std failed for backoff ({e}), defaulting to 1 hour");
622                                                chrono::Duration::hours(1)
623                                            });
624                                        info!(
625                                            "Upgrade rollout delay elapsed but previous apply did not succeed; \
626                                             backing off, next check scheduled for {}",
627                                            next_check.to_rfc3339()
628                                        );
629                                        backoff
630                                    } else {
631                                        let wake_time = chrono::Utc::now()
632                                            + chrono::Duration::from_std(remaining).unwrap_or_else(|e| {
633                                                warn!("chrono::Duration::from_std failed for rollout delay ({e}), defaulting to 1 minute");
634                                                chrono::Duration::minutes(1)
635                                            });
636                                        info!("Will apply upgrade at {}", wake_time.to_rfc3339());
637                                        remaining
638                                    }
639                                },
640                            );
641                            // Use select! so shutdown can interrupt long sleeps
642                            // (e.g. during a full rollout window delay).
643                            tokio::select! {
644                                () = shutdown.cancelled() => {
645                                    break;
646                                }
647                                () = tokio::time::sleep(sleep_duration) => {}
648                            }
649                        }
650                    }
651                }
652            });
653        }
654
655        info!("Node running, waiting for shutdown signal");
656
657        // Run the main event loop with signal handling
658        self.run_event_loop().await?;
659
660        // Shutdown replication engine before P2P so background tasks don't
661        // use a dead P2P layer, and Arc<LmdbStorage> references are released.
662        if let Some(ref mut engine) = self.replication_engine {
663            engine.shutdown().await;
664        }
665
666        // Stop protocol routing task
667        if let Some(handle) = self.protocol_task.take() {
668            handle.abort();
669        }
670
671        // Shutdown P2P node
672        info!("Shutting down P2P node...");
673        if let Err(e) = self.p2p_node.shutdown().await {
674            warn!("Error during P2P node shutdown: {e}");
675        }
676
677        if let Err(e) = self.events_tx.send(NodeEvent::ShuttingDown) {
678            warn!("Failed to send ShuttingDown event: {e}");
679        }
680        info!("Node shutdown complete");
681
682        // If an upgrade triggered the shutdown, exit with the requested code.
683        // This happens *after* all cleanup (P2P shutdown, log flush, etc.) so
684        // that destructors and async resources are properly torn down.
685        let exit_code = self.upgrade_exit_code.load(Ordering::SeqCst);
686        if exit_code >= 0 {
687            info!("Exiting with code {} for upgrade restart", exit_code);
688            std::process::exit(exit_code);
689        }
690
691        Ok(())
692    }
693
694    /// Run the main event loop, handling shutdown and signals.
695    #[cfg(unix)]
696    async fn run_event_loop(&self) -> Result<()> {
697        let mut sigterm = signal(SignalKind::terminate())?;
698        let mut sighup = signal(SignalKind::hangup())?;
699
700        loop {
701            tokio::select! {
702                () = self.shutdown.cancelled() => {
703                    info!("Shutdown signal received");
704                    break;
705                }
706                _ = tokio::signal::ctrl_c() => {
707                    info!("Received SIGINT (Ctrl-C), initiating shutdown");
708                    self.shutdown();
709                    break;
710                }
711                _ = sigterm.recv() => {
712                    info!("Received SIGTERM, initiating shutdown");
713                    self.shutdown();
714                    break;
715                }
716                _ = sighup.recv() => {
717                    info!("Received SIGHUP (config reload not yet supported)");
718                }
719            }
720        }
721        Ok(())
722    }
723
724    /// Run the main event loop, handling shutdown signals (non-Unix version).
725    #[cfg(not(unix))]
726    async fn run_event_loop(&self) -> Result<()> {
727        loop {
728            tokio::select! {
729                () = self.shutdown.cancelled() => {
730                    info!("Shutdown signal received");
731                    break;
732                }
733                _ = tokio::signal::ctrl_c() => {
734                    info!("Received Ctrl-C, initiating shutdown");
735                    self.shutdown();
736                    break;
737                }
738            }
739        }
740        Ok(())
741    }
742
743    /// Start the protocol message routing background task.
744    ///
745    /// Subscribes to P2P events and routes incoming chunk protocol messages
746    /// to the `AntProtocol` handler, sending responses back to the sender.
747    fn start_protocol_routing(&mut self) {
748        let protocol = match self.ant_protocol {
749            Some(ref p) => Arc::clone(p),
750            None => return,
751        };
752
753        let mut events = self.p2p_node.subscribe_events();
754        let p2p = Arc::clone(&self.p2p_node);
755        let semaphore = Arc::new(Semaphore::new(64));
756
757        self.protocol_task = Some(tokio::spawn(async move {
758            while let Ok(event) = events.recv().await {
759                if let P2PEvent::Message {
760                    topic,
761                    source: Some(source),
762                    data,
763                    ..
764                } = event
765                {
766                    let handler_info: Option<(&str, &str)> = if topic == CHUNK_PROTOCOL_ID {
767                        Some(("chunk", CHUNK_PROTOCOL_ID))
768                    } else {
769                        None
770                    };
771
772                    if let Some((data_type, response_topic)) = handler_info {
773                        debug!("Received {data_type} protocol message from {source}");
774                        let protocol = Arc::clone(&protocol);
775                        let p2p = Arc::clone(&p2p);
776                        let sem = semaphore.clone();
777                        tokio::spawn(async move {
778                            let Ok(_permit) = sem.acquire().await else {
779                                return;
780                            };
781                            let result = match data_type {
782                                "chunk" => protocol.try_handle_request(&data).await,
783                                _ => return,
784                            };
785                            match result {
786                                Ok(Some(response)) => {
787                                    if let Err(e) = p2p
788                                        .send_message(
789                                            &source,
790                                            response_topic,
791                                            response.to_vec(),
792                                            &[],
793                                        )
794                                        .await
795                                    {
796                                        warn!("Failed to send {data_type} protocol response to {source}: {e}");
797                                    }
798                                }
799                                Ok(None) => {}
800                                Err(e) => {
801                                    warn!("{data_type} protocol handler error: {e}");
802                                }
803                            }
804                        });
805                    }
806                }
807            }
808        }));
809        info!("Protocol message routing started");
810    }
811
812    /// Request the node to shut down.
813    pub fn shutdown(&self) {
814        self.shutdown.cancel();
815    }
816}
817
818/// Apply ±5% jitter to a base interval to prevent thundering-herd behaviour
819/// when multiple nodes check for upgrades on the same schedule.
820fn jittered_interval(base: std::time::Duration) -> std::time::Duration {
821    let secs = base.as_secs();
822    let variance = secs / 20; // 5%
823    if variance == 0 {
824        return base;
825    }
826    let jitter = rand::thread_rng().gen_range(0..=variance * 2);
827    std::time::Duration::from_secs(secs.saturating_sub(variance) + jitter)
828}
829
#[cfg(test)]
// Tests are allowed to unwrap/expect/panic: a failure should abort the test.
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
    use super::*;
    use crate::config::{UpgradeConfig, NODES_SUBDIR};
    use std::path::Path;

    /// Default configuration with only `root_dir` overridden.
    fn config_rooted_at(root: &Path) -> NodeConfig {
        NodeConfig {
            root_dir: root.to_path_buf(),
            ..Default::default()
        }
    }

    /// Create `dir` (and any parents) and write a placeholder identity file
    /// into it.
    fn write_placeholder_identity(dir: &Path) {
        std::fs::create_dir_all(dir).unwrap();
        std::fs::write(dir.join(NODE_IDENTITY_FILENAME), "{}").unwrap();
    }

    /// A non-zero rollout window yields a monitor with staged rollout.
    #[test]
    fn test_build_upgrade_monitor_staged_rollout_enabled() {
        let upgrade = UpgradeConfig {
            staged_rollout_hours: 24,
            ..Default::default()
        };
        let config = NodeConfig {
            upgrade,
            ..Default::default()
        };

        let monitor = NodeBuilder::build_upgrade_monitor(&config, b"node-seed");
        assert!(monitor.has_staged_rollout());
    }

    /// A zero rollout window yields a monitor without staged rollout.
    #[test]
    fn test_build_upgrade_monitor_staged_rollout_disabled() {
        let upgrade = UpgradeConfig {
            staged_rollout_hours: 0,
            ..Default::default()
        };
        let config = NodeConfig {
            upgrade,
            ..Default::default()
        };

        let monitor = NodeBuilder::build_upgrade_monitor(&config, b"node-seed");
        assert!(!monitor.has_staged_rollout());
    }

    /// Production mode produces a core config with a diversity config set.
    #[test]
    fn test_build_core_config_sets_production_mode() {
        let production = NodeConfig {
            network_mode: NetworkMode::Production,
            ..Default::default()
        };
        let core = NodeBuilder::build_core_config(&production).expect("core config");
        assert!(core.diversity_config.is_some());
    }

    /// `ipv4_only` turns IPv6 off in the core config.
    #[test]
    fn test_build_core_config_ipv4_only() {
        let v4_only = NodeConfig {
            ipv4_only: true,
            ..Default::default()
        };
        let core = NodeBuilder::build_core_config(&v4_only).expect("core config");
        assert!(!core.ipv6, "ipv4_only should disable IPv6");
    }

    /// The default config leaves IPv6 enabled.
    #[test]
    fn test_build_core_config_dual_stack_by_default() {
        let core = NodeBuilder::build_core_config(&NodeConfig::default()).expect("core config");
        assert!(core.ipv6, "dual-stack should be the default");
    }

    /// Development mode lifts the per-IP and per-subnet limits to `usize::MAX`.
    #[test]
    fn test_build_core_config_sets_development_mode_permissive() {
        let development = NodeConfig {
            network_mode: NetworkMode::Development,
            ..Default::default()
        };
        let diversity = NodeBuilder::build_core_config(&development)
            .expect("core config")
            .diversity_config
            .expect("diversity");
        assert_eq!(diversity.max_per_ip, Some(usize::MAX));
        assert_eq!(diversity.max_per_subnet, Some(usize::MAX));
    }

    /// Scanning an empty directory finds nothing.
    #[test]
    fn test_scan_identity_dirs_empty_dir() {
        let root = tempfile::tempdir().unwrap();
        let found = NodeBuilder::scan_identity_dirs(root.path()).unwrap();
        assert!(found.is_empty());
    }

    /// Scanning a path that does not exist succeeds and finds nothing.
    #[test]
    fn test_scan_identity_dirs_nonexistent_dir() {
        let root = tempfile::tempdir().unwrap();
        let missing = root.path().join("nonexistent_identity_dir");
        let found = NodeBuilder::scan_identity_dirs(&missing).unwrap();
        assert!(found.is_empty());
    }

    /// A single subdirectory holding an identity file is reported.
    #[test]
    fn test_scan_identity_dirs_finds_one() {
        let root = tempfile::tempdir().unwrap();
        let node_dir = root.path().join("abc123");
        write_placeholder_identity(&node_dir);

        let found = NodeBuilder::scan_identity_dirs(root.path()).unwrap();
        assert_eq!(found.len(), 1);
        assert_eq!(found[0], node_dir);
    }

    /// Every subdirectory with an identity file is reported; others are not.
    #[test]
    fn test_scan_identity_dirs_finds_multiple() {
        let root = tempfile::tempdir().unwrap();
        for name in ["node_a", "node_b"] {
            write_placeholder_identity(&root.path().join(name));
        }
        // A directory without a key file should be ignored
        std::fs::create_dir_all(root.path().join("no_key")).unwrap();

        let found = NodeBuilder::scan_identity_dirs(root.path()).unwrap();
        assert_eq!(found.len(), 2);
    }

    /// With an empty explicit root dir, resolving creates the identity file.
    #[tokio::test]
    async fn test_resolve_identity_first_run_creates_identity() {
        let root = tempfile::tempdir().unwrap();
        let mut config = config_rooted_at(root.path());

        let identity = NodeBuilder::resolve_identity(&mut config).await.unwrap();

        // Key file should exist
        assert!(root.path().join(NODE_IDENTITY_FILENAME).exists());
        // peer_id should be derivable from the identity
        assert_eq!(identity.peer_id().to_hex().len(), 64); // 32 bytes hex-encoded
    }

    /// An identity already saved in the root dir is loaded, not regenerated.
    #[tokio::test]
    async fn test_resolve_identity_loads_existing() {
        let root = tempfile::tempdir().unwrap();
        let key_path = root.path().join(NODE_IDENTITY_FILENAME);

        // Generate and save an identity
        let original = NodeIdentity::generate().unwrap();
        original.save_to_file(&key_path).await.unwrap();

        let mut config = config_rooted_at(root.path());
        let loaded = NodeBuilder::resolve_identity(&mut config).await.unwrap();
        assert_eq!(loaded.peer_id(), original.peer_id());
    }

    /// A 32-byte peer ID hex-encodes to 64 characters.
    #[test]
    fn test_peer_id_hex_length() {
        let hex = saorsa_core::identity::PeerId::from_bytes([0x42; 32]).to_hex();
        assert_eq!(hex.len(), 64); // 32 bytes = 64 hex chars
    }

    /// Simulates a node restart: first run creates identity in a scoped subdir
    /// under `nodes/`, second run discovers and reloads it — `peer_id` must be
    /// identical and the directory name is the full 64-char hex peer ID.
    #[tokio::test]
    async fn test_identity_persisted_across_restarts() {
        let base = tempfile::tempdir().unwrap();
        let nodes_dir = base.path().join(NODES_SUBDIR);

        // First "boot": generate identity, save it in nodes/{peer_id}/
        let first_identity = NodeIdentity::generate().unwrap();
        let first_peer_id = first_identity.peer_id().to_hex();
        let peer_dir = nodes_dir.join(&first_peer_id);
        std::fs::create_dir_all(&peer_dir).unwrap();
        first_identity
            .save_to_file(&peer_dir.join(NODE_IDENTITY_FILENAME))
            .await
            .unwrap();

        // Verify directory name is the full 64-char hex peer ID
        assert_eq!(first_peer_id.len(), 64);
        assert_eq!(
            peer_dir.file_name().unwrap().to_string_lossy(),
            first_peer_id
        );

        // Second "boot": scan should find and reload the same identity
        let found = NodeBuilder::scan_identity_dirs(&nodes_dir).unwrap();
        assert_eq!(found.len(), 1);
        let reloaded = NodeIdentity::load_from_file(&found[0].join(NODE_IDENTITY_FILENAME))
            .await
            .unwrap();
        let second_peer_id = reloaded.peer_id().to_hex();

        assert_eq!(
            first_peer_id, second_peer_id,
            "peer_id must survive restart"
        );
        assert_eq!(found[0], peer_dir, "root_dir must be the same directory");
    }

    /// When two identity subdirs exist under `nodes/`, the scan finds multiple
    /// and the resolve path would error asking for `--root-dir`.
    #[tokio::test]
    async fn test_multiple_identities_errors() {
        let base = tempfile::tempdir().unwrap();
        let nodes_dir = base.path().join(NODES_SUBDIR);

        // Create two identity subdirectories under nodes/
        for name in ["aaaa", "bbbb"] {
            let dir = nodes_dir.join(name);
            std::fs::create_dir_all(&dir).unwrap();
            let identity = NodeIdentity::generate().unwrap();
            identity
                .save_to_file(&dir.join(NODE_IDENTITY_FILENAME))
                .await
                .unwrap();
        }

        let found = NodeBuilder::scan_identity_dirs(&nodes_dir).unwrap();
        assert_eq!(found.len(), 2, "should find both identity dirs");
    }

    /// With a non-default `root_dir` (explicit path), the identity is created on
    /// first run and reloaded on subsequent runs from the same directory.
    #[tokio::test]
    async fn test_explicit_root_dir_persists_across_restarts() {
        let root = tempfile::tempdir().unwrap();

        // First boot — non-default root_dir triggers explicit path
        let mut first_boot = config_rooted_at(root.path());
        let first = NodeBuilder::resolve_identity(&mut first_boot)
            .await
            .unwrap();

        // Second boot — same dir
        let mut second_boot = config_rooted_at(root.path());
        let second = NodeBuilder::resolve_identity(&mut second_boot)
            .await
            .unwrap();

        assert_eq!(
            first.peer_id(),
            second.peer_id(),
            "explicit --root-dir must yield stable identity"
        );
    }
}