// ant_node/node.rs
1//! Node implementation - thin wrapper around saorsa-core's `P2PNode`.
2
3use crate::ant_protocol::CHUNK_PROTOCOL_ID;
4use crate::config::{
5    default_nodes_dir, default_root_dir, EvmNetworkConfig, NetworkMode, NodeConfig,
6    NODE_IDENTITY_FILENAME,
7};
8use crate::error::{Error, Result};
9use crate::event::{create_event_channel, NodeEvent, NodeEventsChannel, NodeEventsSender};
10use crate::logging::{debug, error, info, warn};
11use crate::payment::metrics::QuotingMetricsTracker;
12use crate::payment::wallet::parse_rewards_address;
13use crate::payment::{EvmVerifierConfig, PaymentVerifier, PaymentVerifierConfig, QuoteGenerator};
14use crate::replication::config::ReplicationConfig;
15use crate::replication::ReplicationEngine;
16use crate::storage::lmdb::MIB;
17use crate::storage::{AntProtocol, LmdbStorage, LmdbStorageConfig};
18use crate::upgrade::{
19    upgrade_cache_dir, AutoApplyUpgrader, BinaryCache, ReleaseCache, UpgradeMonitor, UpgradeResult,
20};
21use evmlib::Network as EvmNetwork;
22use rand::Rng;
23use saorsa_core::identity::NodeIdentity;
24use saorsa_core::{
25    BootstrapConfig as CoreBootstrapConfig, BootstrapManager,
26    IPDiversityConfig as CoreDiversityConfig, MultiAddr, NodeConfig as CoreNodeConfig, P2PEvent,
27    P2PNode,
28};
29use std::path::PathBuf;
30use std::sync::atomic::{AtomicI32, Ordering};
31use std::sync::Arc;
32use tokio::sync::Semaphore;
33use tokio::task::JoinHandle;
34use tokio_util::sync::CancellationToken;
35
36#[cfg(unix)]
37use tokio::signal::unix::{signal, SignalKind};
38
39/// Builder for constructing an Ant node.
40pub struct NodeBuilder {
41    config: NodeConfig,
42}
43
44impl NodeBuilder {
45    /// Create a new node builder with the given configuration.
46    #[must_use]
47    pub fn new(config: NodeConfig) -> Self {
48        Self { config }
49    }
50
51    /// Build and start the node.
52    ///
53    /// # Errors
54    ///
55    /// Returns an error if the node fails to start.
56    pub async fn build(mut self) -> Result<RunningNode> {
57        info!("Building ant-node with config: {:?}", self.config);
58
59        // Validate rewards address in production
60        if self.config.network_mode == NetworkMode::Production {
61            match self.config.payment.rewards_address {
62                None => {
63                    return Err(Error::Config(
64                        "CRITICAL: Rewards address is not configured. \
65                         Set payment.rewards_address in config to your Arbitrum wallet address."
66                            .to_string(),
67                    ));
68                }
69                Some(ref addr) if addr == "0xYOUR_ARBITRUM_ADDRESS_HERE" || addr.is_empty() => {
70                    return Err(Error::Config(
71                        "CRITICAL: Rewards address is not configured. \
72                         Set payment.rewards_address in config to your Arbitrum wallet address."
73                            .to_string(),
74                    ));
75                }
76                Some(_) => {}
77            }
78        }
79
80        // Resolve identity and root_dir (may update self.config.root_dir)
81        let identity = Arc::new(Self::resolve_identity(&mut self.config).await?);
82        let peer_id = identity.peer_id().to_hex();
83
84        info!(peer_id = %peer_id, root_dir = %self.config.root_dir.display(), "Node identity resolved");
85
86        // Ensure root directory exists
87        std::fs::create_dir_all(&self.config.root_dir)?;
88
89        // Create shutdown token
90        let shutdown = CancellationToken::new();
91
92        // Create event channel
93        let (events_tx, events_rx) = create_event_channel();
94
95        // Convert our config to saorsa-core's config
96        let mut core_config = Self::build_core_config(&self.config)?;
97        // Inject the ML-DSA identity so the P2PNode's transport peer ID
98        // matches the pub_key embedded in payment quotes.
99        core_config.node_identity = Some(Arc::clone(&identity));
100        debug!("Core config: {:?}", core_config);
101
102        // Initialize saorsa-core's P2PNode
103        let p2p_node = P2PNode::new(core_config)
104            .await
105            .map_err(|e| Error::Startup(format!("Failed to create P2P node: {e}")))?;
106
107        // Create upgrade monitor
108        let upgrade_monitor = {
109            let node_id_seed = p2p_node.peer_id().as_bytes();
110            Some(Self::build_upgrade_monitor(&self.config, node_id_seed))
111        };
112
113        // Initialize bootstrap cache manager if enabled
114        let bootstrap_manager = if self.config.bootstrap_cache.enabled {
115            Self::build_bootstrap_manager(&self.config).await
116        } else {
117            info!("Bootstrap cache disabled");
118            None
119        };
120
121        // Initialize ANT protocol handler for chunk storage and
122        // wire the fresh-write channel so PUTs trigger replication.
123        let (ant_protocol, fresh_write_rx) = if self.config.storage.enabled {
124            let (fresh_write_tx, fresh_write_rx) = tokio::sync::mpsc::unbounded_channel();
125            let mut protocol = Self::build_ant_protocol(&self.config, &identity).await?;
126            protocol.set_fresh_write_sender(fresh_write_tx);
127            (Some(Arc::new(protocol)), Some(fresh_write_rx))
128        } else {
129            info!("Chunk storage disabled");
130            (None, None)
131        };
132
133        let p2p_arc = Arc::new(p2p_node);
134
135        // Initialize replication engine (if storage is enabled)
136        let replication_engine =
137            if let (Some(ref protocol), Some(fresh_rx)) = (&ant_protocol, fresh_write_rx) {
138                let repl_config = ReplicationConfig::default();
139                let storage_arc = protocol.storage();
140                let payment_verifier_arc = protocol.payment_verifier_arc();
141                match ReplicationEngine::new(
142                    repl_config,
143                    Arc::clone(&p2p_arc),
144                    storage_arc,
145                    payment_verifier_arc,
146                    &self.config.root_dir,
147                    fresh_rx,
148                    shutdown.clone(),
149                )
150                .await
151                {
152                    Ok(engine) => Some(engine),
153                    Err(e) => {
154                        warn!("Failed to initialize replication engine: {e}");
155                        None
156                    }
157                }
158            } else {
159                None
160            };
161
162        let node = RunningNode {
163            config: self.config,
164            p2p_node: p2p_arc,
165            shutdown,
166            events_tx,
167            events_rx: Some(events_rx),
168            upgrade_monitor,
169            bootstrap_manager,
170            ant_protocol,
171            replication_engine,
172            protocol_task: None,
173            upgrade_exit_code: Arc::new(AtomicI32::new(-1)),
174        };
175
176        Ok(node)
177    }
178
179    /// Build the saorsa-core `NodeConfig` from our config.
180    fn build_core_config(config: &NodeConfig) -> Result<CoreNodeConfig> {
181        let local = matches!(config.network_mode, NetworkMode::Development);
182
183        let mut core_config = CoreNodeConfig::builder()
184            .port(config.port)
185            .ipv6(!config.ipv4_only)
186            .local(local)
187            .max_message_size(config.max_message_size)
188            .build()
189            .map_err(|e| Error::Config(format!("Failed to create core config: {e}")))?;
190
191        // Add bootstrap peers.
192        core_config.bootstrap_peers = config
193            .bootstrap
194            .iter()
195            .map(|addr| MultiAddr::quic(*addr))
196            .collect();
197
198        // Propagate network-mode tuning into saorsa-core where supported.
199        match config.network_mode {
200            NetworkMode::Production => {
201                core_config.diversity_config = Some(CoreDiversityConfig::default());
202            }
203            NetworkMode::Testnet => {
204                // Testnet allows loopback so nodes can be co-located on one machine.
205                core_config.allow_loopback = true;
206                core_config.diversity_config = Some(CoreDiversityConfig {
207                    max_per_ip: config.testnet.max_per_ip,
208                    max_per_subnet: config.testnet.max_per_subnet,
209                });
210            }
211            NetworkMode::Development => {
212                core_config.diversity_config = Some(CoreDiversityConfig::permissive());
213            }
214        }
215
216        // Persist close group peers + trust scores across restarts.
217        // Default to root_dir (alongside node_identity.key) when not explicitly set.
218        core_config.close_group_cache_dir = Some(
219            config
220                .close_group_cache_dir
221                .clone()
222                .unwrap_or_else(|| config.root_dir.clone()),
223        );
224
225        Ok(core_config)
226    }
227
228    /// Resolve the node identity from disk or generate a new one.
229    ///
230    /// **When `root_dir` differs from the platform default** (set via `--root-dir`
231    /// or loaded from `config.toml`):
232    ///   - Use `root_dir` directly: load existing identity or generate a new one.
233    ///
234    /// **When `root_dir` is the platform default** (first run, no config file):
235    ///   1. Scan `{default_root_dir}/nodes/` for subdirectories containing
236    ///      `node_identity.key`.
237    ///   2. **None found** — first run: generate identity, create
238    ///      `nodes/{full_peer_id}/`, save identity there, update `config.root_dir`.
239    ///   3. **Exactly one found** — load it and update `config.root_dir`.
240    ///   4. **Multiple found** — return an error asking for `--root-dir`.
241    async fn resolve_identity(config: &mut NodeConfig) -> Result<NodeIdentity> {
242        if config.root_dir != default_root_dir() {
243            return Self::load_or_generate_identity(&config.root_dir).await;
244        }
245
246        let nodes_dir = default_nodes_dir();
247        let identity_dirs = Self::scan_identity_dirs(&nodes_dir)?;
248
249        match identity_dirs.len() {
250            0 => {
251                // First run: generate new identity and create a peer-id-scoped subdirectory
252                let identity = NodeIdentity::generate().map_err(|e| {
253                    Error::Startup(format!("Failed to generate node identity: {e}"))
254                })?;
255                let peer_id = identity.peer_id().to_hex();
256                let peer_dir = nodes_dir.join(&peer_id);
257                std::fs::create_dir_all(&peer_dir)?;
258                identity
259                    .save_to_file(&peer_dir.join(NODE_IDENTITY_FILENAME))
260                    .await
261                    .map_err(|e| Error::Startup(format!("Failed to save node identity: {e}")))?;
262                config.root_dir = peer_dir;
263                Ok(identity)
264            }
265            1 => {
266                let dir = identity_dirs
267                    .first()
268                    .ok_or_else(|| Error::Config("No identity dirs found".to_string()))?;
269                let identity = NodeIdentity::load_from_file(&dir.join(NODE_IDENTITY_FILENAME))
270                    .await
271                    .map_err(|e| Error::Startup(format!("Failed to load node identity: {e}")))?;
272                config.root_dir.clone_from(dir);
273                Ok(identity)
274            }
275            _ => {
276                let dirs: Vec<String> = identity_dirs
277                    .iter()
278                    .filter_map(|d| d.file_name().map(|n| n.to_string_lossy().into_owned()))
279                    .collect();
280                Err(Error::Config(format!(
281                    "Multiple node identities found at {}: [{}]. Specify --root-dir to select one.",
282                    nodes_dir.display(),
283                    dirs.join(", ")
284                )))
285            }
286        }
287    }
288
289    /// Load an existing identity from `dir/node_identity.key`, or generate and save a new one.
290    async fn load_or_generate_identity(dir: &std::path::Path) -> Result<NodeIdentity> {
291        let key_path = dir.join(NODE_IDENTITY_FILENAME);
292        if key_path.exists() {
293            NodeIdentity::load_from_file(&key_path)
294                .await
295                .map_err(|e| Error::Startup(format!("Failed to load node identity: {e}")))
296        } else {
297            let identity = NodeIdentity::generate()
298                .map_err(|e| Error::Startup(format!("Failed to generate node identity: {e}")))?;
299            std::fs::create_dir_all(dir)?;
300            identity
301                .save_to_file(&key_path)
302                .await
303                .map_err(|e| Error::Startup(format!("Failed to save node identity: {e}")))?;
304            Ok(identity)
305        }
306    }
307
308    /// Scan `base_dir` for immediate subdirectories that contain `node_identity.key`.
309    fn scan_identity_dirs(base_dir: &std::path::Path) -> Result<Vec<PathBuf>> {
310        let mut dirs = Vec::new();
311        let read_dir = match std::fs::read_dir(base_dir) {
312            Ok(rd) => rd,
313            Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(dirs),
314            Err(e) => return Err(e.into()),
315        };
316        for entry in read_dir {
317            let entry = entry?;
318            let path = entry.path();
319            if path.is_dir() && path.join(NODE_IDENTITY_FILENAME).exists() {
320                dirs.push(path);
321            }
322        }
323        Ok(dirs)
324    }
325
326    fn build_upgrade_monitor(config: &NodeConfig, node_id_seed: &[u8]) -> UpgradeMonitor {
327        let mut monitor = UpgradeMonitor::new(
328            config.upgrade.github_repo.clone(),
329            config.upgrade.channel,
330            config.upgrade.check_interval_hours,
331        );
332
333        if let Ok(cache_dir) = upgrade_cache_dir() {
334            monitor = monitor.with_release_cache(ReleaseCache::new(
335                cache_dir,
336                std::time::Duration::from_secs(3600),
337            ));
338        }
339
340        if config.upgrade.staged_rollout_hours > 0 {
341            monitor =
342                monitor.with_staged_rollout(node_id_seed, config.upgrade.staged_rollout_hours);
343        }
344
345        monitor
346    }
347
348    /// Build the ANT protocol handler from config.
349    ///
350    /// Initializes LMDB storage, payment verifier, and quote generator.
351    /// Wires ML-DSA-65 signing from the node's identity into the quote generator.
352    async fn build_ant_protocol(
353        config: &NodeConfig,
354        identity: &NodeIdentity,
355    ) -> Result<AntProtocol> {
356        // Create LMDB storage
357        let storage_config = LmdbStorageConfig {
358            root_dir: config.root_dir.clone(),
359            verify_on_read: config.storage.verify_on_read,
360            max_map_size: config.storage.db_size_gb.saturating_mul(1024 * 1024 * 1024),
361            disk_reserve: config.storage.disk_reserve_mb.saturating_mul(MIB),
362        };
363        let storage = LmdbStorage::new(storage_config)
364            .await
365            .map_err(|e| Error::Startup(format!("Failed to create LMDB storage: {e}")))?;
366
367        // Parse rewards address (required — node must know where to receive payments)
368        let rewards_address = match config.payment.rewards_address {
369            Some(ref addr) => parse_rewards_address(addr)?,
370            None => {
371                return Err(Error::Startup(
372                    "No rewards address configured. Set --rewards-address or payment.rewards_address in config.".to_string(),
373                ));
374            }
375        };
376
377        // Create payment verifier
378        let evm_network = match config.payment.evm_network {
379            EvmNetworkConfig::ArbitrumOne => EvmNetwork::ArbitrumOne,
380            EvmNetworkConfig::ArbitrumSepolia => EvmNetwork::ArbitrumSepoliaTest,
381        };
382        let payment_config = PaymentVerifierConfig {
383            evm: EvmVerifierConfig {
384                network: evm_network,
385            },
386            cache_capacity: config.payment.cache_capacity,
387            local_rewards_address: rewards_address,
388        };
389        let payment_verifier = PaymentVerifier::new(payment_config);
390        let metrics_tracker = QuotingMetricsTracker::new(0);
391        let mut quote_generator = QuoteGenerator::new(rewards_address, metrics_tracker);
392
393        // Wire ML-DSA-65 signing from node identity.
394        // This same signer is used for both regular quotes and merkle candidate quotes.
395        crate::payment::wire_ml_dsa_signer(&mut quote_generator, identity)?;
396
397        let protocol = AntProtocol::new(
398            Arc::new(storage),
399            Arc::new(payment_verifier),
400            Arc::new(quote_generator),
401        );
402
403        info!(
404            "ANT protocol handler initialized with ML-DSA-65 signing (protocol={CHUNK_PROTOCOL_ID})"
405        );
406
407        Ok(protocol)
408    }
409
410    /// Build the bootstrap cache manager from config.
411    async fn build_bootstrap_manager(config: &NodeConfig) -> Option<BootstrapManager> {
412        let cache_dir = config
413            .bootstrap_cache
414            .cache_dir
415            .clone()
416            .unwrap_or_else(|| config.root_dir.join("bootstrap_cache"));
417
418        // Create cache directory
419        if let Err(e) = std::fs::create_dir_all(&cache_dir) {
420            warn!("Failed to create bootstrap cache directory: {e}");
421            return None;
422        }
423
424        let bootstrap_config = CoreBootstrapConfig {
425            cache_dir,
426            max_peers: config.bootstrap_cache.max_contacts,
427            ..CoreBootstrapConfig::default()
428        };
429
430        match BootstrapManager::with_config(bootstrap_config).await {
431            Ok(manager) => {
432                info!(
433                    "Bootstrap cache initialized with {} max contacts",
434                    config.bootstrap_cache.max_contacts
435                );
436                Some(manager)
437            }
438            Err(e) => {
439                warn!("Failed to initialize bootstrap cache: {e}");
440                None
441            }
442        }
443    }
444}
445
/// A running Ant node.
///
/// Owns the P2P transport and the optional subsystems created by
/// [`NodeBuilder::build`]; `run()` drives them until shutdown.
pub struct RunningNode {
    /// Node configuration; `root_dir` may have been rewritten during identity resolution.
    config: NodeConfig,
    /// saorsa-core transport node, shared with background tasks via `Arc`.
    p2p_node: Arc<P2PNode>,
    /// Cooperative shutdown token observed by all background tasks.
    shutdown: CancellationToken,
    /// Sender half of the node event channel (cloneable for spawned tasks).
    events_tx: NodeEventsSender,
    /// Receiver half, handed out at most once via `events()`.
    events_rx: Option<NodeEventsChannel>,
    /// Upgrade monitor; `take`n when `run()` spawns the monitor task.
    upgrade_monitor: Option<UpgradeMonitor>,
    /// Bootstrap cache manager for persistent peer storage.
    bootstrap_manager: Option<BootstrapManager>,
    /// ANT protocol handler for chunk storage.
    ant_protocol: Option<Arc<AntProtocol>>,
    /// Replication engine (manages neighbor sync, verification, audits).
    replication_engine: Option<ReplicationEngine>,
    /// Protocol message routing background task.
    protocol_task: Option<JoinHandle<()>>,
    /// Exit code requested by a successful upgrade (-1 = no upgrade exit pending).
    upgrade_exit_code: Arc<AtomicI32>,
}
465
466impl RunningNode {
467    /// Get the node's root directory.
468    #[must_use]
469    pub fn root_dir(&self) -> &PathBuf {
470        &self.config.root_dir
471    }
472
473    /// Get a receiver for node events.
474    ///
475    /// Note: Can only be called once. Subsequent calls return None.
476    pub fn events(&mut self) -> Option<NodeEventsChannel> {
477        self.events_rx.take()
478    }
479
480    /// Subscribe to node events.
481    #[must_use]
482    pub fn subscribe_events(&self) -> NodeEventsChannel {
483        self.events_tx.subscribe()
484    }
485
    /// Run the node until shutdown is requested.
    ///
    /// Starts the P2P transport, replication, protocol routing, and (if
    /// configured) the upgrade-monitor task, then blocks on the signal-aware
    /// event loop. On exit it tears subsystems down in dependency order.
    ///
    /// # Errors
    ///
    /// Returns an error if the node encounters a fatal error.
    #[allow(clippy::too_many_lines)]
    pub async fn run(&mut self) -> Result<()> {
        info!("Node runtime loop starting");

        // Subscribe to DHT events BEFORE starting the P2P node so the
        // bootstrap-sync task does not miss the BootstrapComplete event
        // emitted during P2PNode::start().
        let dht_events_for_bootstrap = self
            .replication_engine
            .as_ref()
            .map(|_| self.p2p_node.dht_manager().subscribe_events());

        // Start the P2P node
        self.p2p_node
            .start()
            .await
            .map_err(|e| Error::Startup(format!("Failed to start P2P node: {e}")))?;

        let listen_addrs = self.p2p_node.listen_addrs().await;
        info!(listen_addrs = ?listen_addrs, "P2P node started");

        // Extract the actual bound port (config port may be 0 = auto-select)
        let actual_port = listen_addrs
            .first()
            .and_then(MultiAddr::port)
            .unwrap_or(self.config.port);
        info!(
            port = actual_port,
            "Node is running on port: {}", actual_port
        );

        // Emit started event (best-effort: failures are only logged).
        if let Err(e) = self.events_tx.send(NodeEvent::Started) {
            warn!("Failed to send Started event: {e}");
        }

        // Start protocol message routing (P2P → AntProtocol → P2P response)
        self.start_protocol_routing();

        // Start replication engine background tasks
        if let Some(ref mut engine) = self.replication_engine {
            // Safety: dht_events_for_bootstrap is Some when replication_engine
            // is Some (both arms use the same condition).
            if let Some(dht_events) = dht_events_for_bootstrap {
                engine.start(dht_events);
            }
            info!("Replication engine started");
        }

        // Start upgrade monitor if enabled (the monitor is take()n so this
        // task is spawned at most once per RunningNode).
        if let Some(monitor) = self.upgrade_monitor.take() {
            let events_tx = self.events_tx.clone();
            let shutdown = self.shutdown.clone();
            let stop_on_upgrade = self.config.upgrade.stop_on_upgrade;
            let upgrade_exit_code = Arc::clone(&self.upgrade_exit_code);

            tokio::spawn(async move {
                let mut monitor = monitor;
                let mut upgrader = AutoApplyUpgrader::new().with_stop_on_upgrade(stop_on_upgrade);
                if let Ok(cache_dir) = upgrade_cache_dir() {
                    upgrader = upgrader.with_binary_cache(BinaryCache::new(cache_dir));
                }

                // Add randomized jitter before the first upgrade check to prevent all nodes
                // from hitting the GitHub API simultaneously when started together.
                {
                    let jitter_duration = jittered_interval(monitor.check_interval());
                    let first_check_time = chrono::Utc::now()
                        + chrono::Duration::from_std(jitter_duration).unwrap_or_else(|e| {
                            warn!("chrono::Duration::from_std failed for jitter ({e}), defaulting to 1 minute");
                            chrono::Duration::minutes(1)
                        });
                    info!(
                        "First upgrade check scheduled for {} (jitter: {}s)",
                        first_check_time.to_rfc3339(),
                        jitter_duration.as_secs()
                    );
                    tokio::time::sleep(jitter_duration).await;
                }

                loop {
                    tokio::select! {
                        () = shutdown.cancelled() => {
                            break;
                        }
                        result = monitor.check_for_ready_upgrade() => {
                            match result {
                                Ok(Some(upgrade_info)) => {
                                    info!(
                                        current_version = %upgrader.current_version(),
                                        new_version = %upgrade_info.version,
                                        "Upgrade available"
                                    );

                                    // Send notification event
                                    if let Err(e) = events_tx.send(NodeEvent::UpgradeAvailable {
                                        version: upgrade_info.version.to_string(),
                                    }) {
                                        warn!("Failed to send UpgradeAvailable event: {e}");
                                    }

                                    // Auto-apply the upgrade
                                    info!("Starting auto-apply upgrade...");
                                    match upgrader.apply_upgrade(&upgrade_info).await {
                                        Ok(UpgradeResult::Success { version, exit_code }) => {
                                            // Record the exit code and cancel the shared token:
                                            // run() picks the code up after cleanup completes.
                                            info!("Upgrade to {} successful, initiating graceful shutdown", version);
                                            upgrade_exit_code.store(exit_code, Ordering::SeqCst);
                                            shutdown.cancel();
                                            break;
                                        }
                                        Ok(UpgradeResult::RolledBack { reason }) => {
                                            warn!("Error during upgrade process: {}", reason);
                                        }
                                        Ok(UpgradeResult::NoUpgrade) => {
                                            info!("Already running latest version");
                                        }
                                        Err(e) => {
                                            error!("Error during upgrade process: {}", e);
                                        }
                                    }
                                }
                                Ok(None) => {
                                    if let Some(remaining) = monitor.time_until_upgrade() {
                                        info!(
                                            "Upgrade pending, rollout delay remaining: {}m {}s",
                                            remaining.as_secs() / 60,
                                            remaining.as_secs() % 60
                                        );
                                    } else {
                                        info!("No upgrade available");
                                    }
                                }
                                Err(e) => {
                                    warn!("Error during upgrade process: {}", e);
                                }
                            }
                            // If an upgrade is pending, sleep for exactly the remaining
                            // rollout delay so the node restarts at its scheduled time
                            // rather than waiting for the next check interval tick.
                            let sleep_duration = monitor.time_until_upgrade().map_or_else(
                                || {
                                    // No pending upgrade - schedule next check with jitter
                                    let jittered_duration =
                                        jittered_interval(monitor.check_interval());
                                    let next_check = chrono::Utc::now()
                                        + chrono::Duration::from_std(jittered_duration).unwrap_or_else(|e| {
                                            warn!("chrono::Duration::from_std failed for interval ({e}), defaulting to 1 hour");
                                            chrono::Duration::hours(1)
                                        });
                                    info!("Next upgrade check scheduled for {}", next_check.to_rfc3339());
                                    jittered_duration
                                },
                                |remaining| {
                                    // If the rollout delay has fully elapsed but the upgrade was
                                    // not successfully applied, avoid a tight loop by backing off
                                    // at least one check interval before retrying.
                                    if remaining.is_zero() {
                                        let backoff = jittered_interval(monitor.check_interval());
                                        let next_check = chrono::Utc::now()
                                            + chrono::Duration::from_std(backoff).unwrap_or_else(|e| {
                                                warn!("chrono::Duration::from_std failed for backoff ({e}), defaulting to 1 hour");
                                                chrono::Duration::hours(1)
                                            });
                                        info!(
                                            "Upgrade rollout delay elapsed but previous apply did not succeed; \
                                             backing off, next check scheduled for {}",
                                            next_check.to_rfc3339()
                                        );
                                        backoff
                                    } else {
                                        let wake_time = chrono::Utc::now()
                                            + chrono::Duration::from_std(remaining).unwrap_or_else(|e| {
                                                warn!("chrono::Duration::from_std failed for rollout delay ({e}), defaulting to 1 minute");
                                                chrono::Duration::minutes(1)
                                            });
                                        info!("Will apply upgrade at {}", wake_time.to_rfc3339());
                                        remaining
                                    }
                                },
                            );
                            // Use select! so shutdown can interrupt long sleeps
                            // (e.g. during a full rollout window delay).
                            tokio::select! {
                                () = shutdown.cancelled() => {
                                    break;
                                }
                                () = tokio::time::sleep(sleep_duration) => {}
                            }
                        }
                    }
                }
            });
        }

        info!("Node running, waiting for shutdown signal");

        // Run the main event loop with signal handling
        self.run_event_loop().await?;

        // ---- Shutdown sequence (order matters below) ----

        // Log bootstrap cache stats before shutdown
        if let Some(ref manager) = self.bootstrap_manager {
            let stats = manager.stats().await;
            info!(
                "Bootstrap cache shutdown: {} peers, avg quality {:.2}",
                stats.total_peers, stats.average_quality
            );
        }

        // Shutdown replication engine before P2P so background tasks don't
        // use a dead P2P layer, and Arc<LmdbStorage> references are released.
        if let Some(ref mut engine) = self.replication_engine {
            engine.shutdown().await;
        }

        // Stop protocol routing task
        if let Some(handle) = self.protocol_task.take() {
            handle.abort();
        }

        // Shutdown P2P node
        info!("Shutting down P2P node...");
        if let Err(e) = self.p2p_node.shutdown().await {
            warn!("Error during P2P node shutdown: {e}");
        }

        if let Err(e) = self.events_tx.send(NodeEvent::ShuttingDown) {
            warn!("Failed to send ShuttingDown event: {e}");
        }
        info!("Node shutdown complete");

        // If an upgrade triggered the shutdown, exit with the requested code.
        // This happens *after* all cleanup (P2P shutdown, log flush, etc.) so
        // that destructors and async resources are properly torn down.
        let exit_code = self.upgrade_exit_code.load(Ordering::SeqCst);
        if exit_code >= 0 {
            info!("Exiting with code {} for upgrade restart", exit_code);
            std::process::exit(exit_code);
        }

        Ok(())
    }
732
733    /// Run the main event loop, handling shutdown and signals.
734    #[cfg(unix)]
735    async fn run_event_loop(&self) -> Result<()> {
736        let mut sigterm = signal(SignalKind::terminate())?;
737        let mut sighup = signal(SignalKind::hangup())?;
738
739        loop {
740            tokio::select! {
741                () = self.shutdown.cancelled() => {
742                    info!("Shutdown signal received");
743                    break;
744                }
745                _ = tokio::signal::ctrl_c() => {
746                    info!("Received SIGINT (Ctrl-C), initiating shutdown");
747                    self.shutdown();
748                    break;
749                }
750                _ = sigterm.recv() => {
751                    info!("Received SIGTERM, initiating shutdown");
752                    self.shutdown();
753                    break;
754                }
755                _ = sighup.recv() => {
756                    info!("Received SIGHUP (config reload not yet supported)");
757                }
758            }
759        }
760        Ok(())
761    }
762
763    /// Run the main event loop, handling shutdown signals (non-Unix version).
764    #[cfg(not(unix))]
765    async fn run_event_loop(&self) -> Result<()> {
766        loop {
767            tokio::select! {
768                () = self.shutdown.cancelled() => {
769                    info!("Shutdown signal received");
770                    break;
771                }
772                _ = tokio::signal::ctrl_c() => {
773                    info!("Received Ctrl-C, initiating shutdown");
774                    self.shutdown();
775                    break;
776                }
777            }
778        }
779        Ok(())
780    }
781
782    /// Start the protocol message routing background task.
783    ///
784    /// Subscribes to P2P events and routes incoming chunk protocol messages
785    /// to the `AntProtocol` handler, sending responses back to the sender.
786    fn start_protocol_routing(&mut self) {
787        let protocol = match self.ant_protocol {
788            Some(ref p) => Arc::clone(p),
789            None => return,
790        };
791
792        let mut events = self.p2p_node.subscribe_events();
793        let p2p = Arc::clone(&self.p2p_node);
794        let semaphore = Arc::new(Semaphore::new(64));
795
796        self.protocol_task = Some(tokio::spawn(async move {
797            while let Ok(event) = events.recv().await {
798                if let P2PEvent::Message {
799                    topic,
800                    source: Some(source),
801                    data,
802                } = event
803                {
804                    let handler_info: Option<(&str, &str)> = if topic == CHUNK_PROTOCOL_ID {
805                        Some(("chunk", CHUNK_PROTOCOL_ID))
806                    } else {
807                        None
808                    };
809
810                    if let Some((data_type, response_topic)) = handler_info {
811                        debug!("Received {data_type} protocol message from {source}");
812                        let protocol = Arc::clone(&protocol);
813                        let p2p = Arc::clone(&p2p);
814                        let sem = semaphore.clone();
815                        tokio::spawn(async move {
816                            let Ok(_permit) = sem.acquire().await else {
817                                return;
818                            };
819                            let result = match data_type {
820                                "chunk" => protocol.try_handle_request(&data).await,
821                                _ => return,
822                            };
823                            match result {
824                                Ok(Some(response)) => {
825                                    if let Err(e) = p2p
826                                        .send_message(
827                                            &source,
828                                            response_topic,
829                                            response.to_vec(),
830                                            &[],
831                                        )
832                                        .await
833                                    {
834                                        warn!("Failed to send {data_type} protocol response to {source}: {e}");
835                                    }
836                                }
837                                Ok(None) => {}
838                                Err(e) => {
839                                    warn!("{data_type} protocol handler error: {e}");
840                                }
841                            }
842                        });
843                    }
844                }
845            }
846        }));
847        info!("Protocol message routing started");
848    }
849
    /// Request the node to shut down.
    ///
    /// Cancels the shared `CancellationToken`; the main event loop and the
    /// background tasks that `select!` on `shutdown.cancelled()` observe the
    /// cancellation and exit on their next poll.
    pub fn shutdown(&self) {
        self.shutdown.cancel();
    }
854}
855
856/// Apply ±5% jitter to a base interval to prevent thundering-herd behaviour
857/// when multiple nodes check for upgrades on the same schedule.
858fn jittered_interval(base: std::time::Duration) -> std::time::Duration {
859    let secs = base.as_secs();
860    let variance = secs / 20; // 5%
861    if variance == 0 {
862        return base;
863    }
864    let jitter = rand::thread_rng().gen_range(0..=variance * 2);
865    std::time::Duration::from_secs(secs.saturating_sub(variance) + jitter)
866}
867
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
    use super::*;
    use crate::config::NODES_SUBDIR;

    // A non-zero `staged_rollout_hours` must yield a monitor with staged
    // rollout enabled.
    #[test]
    fn test_build_upgrade_monitor_staged_rollout_enabled() {
        let config = NodeConfig {
            upgrade: crate::config::UpgradeConfig {
                staged_rollout_hours: 24,
                ..Default::default()
            },
            ..Default::default()
        };
        let seed = b"node-seed";

        let monitor = NodeBuilder::build_upgrade_monitor(&config, seed);
        assert!(monitor.has_staged_rollout());
    }

    // `staged_rollout_hours == 0` disables staged rollout entirely.
    #[test]
    fn test_build_upgrade_monitor_staged_rollout_disabled() {
        let config = NodeConfig {
            upgrade: crate::config::UpgradeConfig {
                staged_rollout_hours: 0,
                ..Default::default()
            },
            ..Default::default()
        };
        let seed = b"node-seed";

        let monitor = NodeBuilder::build_upgrade_monitor(&config, seed);
        assert!(!monitor.has_staged_rollout());
    }

    // Production mode must enable IP-diversity limits in the core config.
    #[test]
    fn test_build_core_config_sets_production_mode() {
        let config = NodeConfig {
            network_mode: NetworkMode::Production,
            ..Default::default()
        };
        let core = NodeBuilder::build_core_config(&config).expect("core config");
        assert!(core.diversity_config.is_some());
    }

    // `ipv4_only` must turn off IPv6 in the core config.
    #[test]
    fn test_build_core_config_ipv4_only() {
        let config = NodeConfig {
            ipv4_only: true,
            ..Default::default()
        };
        let core = NodeBuilder::build_core_config(&config).expect("core config");
        assert!(!core.ipv6, "ipv4_only should disable IPv6");
    }

    // Without `ipv4_only`, the node listens on both stacks.
    #[test]
    fn test_build_core_config_dual_stack_by_default() {
        let config = NodeConfig::default();
        let core = NodeBuilder::build_core_config(&config).expect("core config");
        assert!(core.ipv6, "dual-stack should be the default");
    }

    // Development mode keeps diversity config present but effectively
    // unlimited (usize::MAX per IP / subnet) for local multi-node testing.
    #[test]
    fn test_build_core_config_sets_development_mode_permissive() {
        let config = NodeConfig {
            network_mode: NetworkMode::Development,
            ..Default::default()
        };
        let core = NodeBuilder::build_core_config(&config).expect("core config");
        let diversity = core.diversity_config.expect("diversity");
        assert_eq!(diversity.max_per_ip, Some(usize::MAX));
        assert_eq!(diversity.max_per_subnet, Some(usize::MAX));
    }

    // An existing but empty nodes dir yields no identity directories.
    #[test]
    fn test_scan_identity_dirs_empty_dir() {
        let tmp = tempfile::tempdir().unwrap();
        let dirs = NodeBuilder::scan_identity_dirs(tmp.path()).unwrap();
        assert!(dirs.is_empty());
    }

    // A missing nodes dir is not an error — it scans as empty.
    #[test]
    fn test_scan_identity_dirs_nonexistent_dir() {
        let tmp = tempfile::tempdir().unwrap();
        let path = tmp.path().join("nonexistent_identity_dir");
        let dirs = NodeBuilder::scan_identity_dirs(&path).unwrap();
        assert!(dirs.is_empty());
    }

    // A subdirectory containing the identity key file is discovered.
    #[test]
    fn test_scan_identity_dirs_finds_one() {
        let tmp = tempfile::tempdir().unwrap();
        let node_dir = tmp.path().join("abc123");
        std::fs::create_dir_all(&node_dir).unwrap();
        std::fs::write(node_dir.join(NODE_IDENTITY_FILENAME), "{}").unwrap();

        let dirs = NodeBuilder::scan_identity_dirs(tmp.path()).unwrap();
        assert_eq!(dirs.len(), 1);
        assert_eq!(dirs[0], node_dir);
    }

    // Multiple identity subdirs are all found; dirs without a key file are
    // not counted.
    #[test]
    fn test_scan_identity_dirs_finds_multiple() {
        let tmp = tempfile::tempdir().unwrap();
        for name in &["node_a", "node_b"] {
            let dir = tmp.path().join(name);
            std::fs::create_dir_all(&dir).unwrap();
            std::fs::write(dir.join(NODE_IDENTITY_FILENAME), "{}").unwrap();
        }
        // A directory without a key file should be ignored
        std::fs::create_dir_all(tmp.path().join("no_key")).unwrap();

        let dirs = NodeBuilder::scan_identity_dirs(tmp.path()).unwrap();
        assert_eq!(dirs.len(), 2);
    }

    // First run with a fresh root_dir creates and persists a new identity.
    #[tokio::test]
    async fn test_resolve_identity_first_run_creates_identity() {
        let tmp = tempfile::tempdir().unwrap();
        let mut config = NodeConfig {
            root_dir: tmp.path().to_path_buf(),
            ..Default::default()
        };

        let identity = NodeBuilder::resolve_identity(&mut config).await.unwrap();
        // Key file should exist
        assert!(tmp.path().join(NODE_IDENTITY_FILENAME).exists());
        // peer_id should be derivable from the identity
        let peer_id = identity.peer_id().to_hex();
        assert_eq!(peer_id.len(), 64); // 32 bytes hex-encoded
    }

    // A pre-existing key file in root_dir is loaded rather than replaced.
    #[tokio::test]
    async fn test_resolve_identity_loads_existing() {
        let tmp = tempfile::tempdir().unwrap();

        // Generate and save an identity
        let original = NodeIdentity::generate().unwrap();
        original
            .save_to_file(&tmp.path().join(NODE_IDENTITY_FILENAME))
            .await
            .unwrap();

        let mut config = NodeConfig {
            root_dir: tmp.path().to_path_buf(),
            ..Default::default()
        };

        let loaded = NodeBuilder::resolve_identity(&mut config).await.unwrap();
        assert_eq!(loaded.peer_id(), original.peer_id());
    }

    // PeerId hex encoding is always 64 chars (32 raw bytes).
    #[test]
    fn test_peer_id_hex_length() {
        let id = saorsa_core::identity::PeerId::from_bytes([0x42; 32]);
        let hex = id.to_hex();
        assert_eq!(hex.len(), 64); // 32 bytes = 64 hex chars
    }

    /// Simulates a node restart: first run creates identity in a scoped subdir
    /// under `nodes/`, second run discovers and reloads it — `peer_id` must be
    /// identical and the directory name is the full 64-char hex peer ID.
    #[tokio::test]
    async fn test_identity_persisted_across_restarts() {
        let base_dir = tempfile::tempdir().unwrap();
        let nodes_dir = base_dir.path().join(NODES_SUBDIR);

        // First "boot": generate identity, save it in nodes/{peer_id}/
        let identity1 = NodeIdentity::generate().unwrap();
        let peer_id1 = identity1.peer_id().to_hex();
        let peer_dir = nodes_dir.join(&peer_id1);
        std::fs::create_dir_all(&peer_dir).unwrap();
        identity1
            .save_to_file(&peer_dir.join(NODE_IDENTITY_FILENAME))
            .await
            .unwrap();

        // Verify directory name is the full 64-char hex peer ID
        assert_eq!(peer_id1.len(), 64);
        assert_eq!(peer_dir.file_name().unwrap().to_string_lossy(), peer_id1);

        // Second "boot": scan should find and reload the same identity
        let identity_dirs = NodeBuilder::scan_identity_dirs(&nodes_dir).unwrap();
        assert_eq!(identity_dirs.len(), 1);
        let loaded = NodeIdentity::load_from_file(&identity_dirs[0].join(NODE_IDENTITY_FILENAME))
            .await
            .unwrap();
        let peer_id2 = loaded.peer_id().to_hex();

        assert_eq!(peer_id1, peer_id2, "peer_id must survive restart");
        assert_eq!(
            identity_dirs[0], peer_dir,
            "root_dir must be the same directory"
        );
    }

    /// When two identity subdirs exist under `nodes/`, the scan finds multiple
    /// and the resolve path would error asking for `--root-dir`.
    #[tokio::test]
    async fn test_multiple_identities_errors() {
        let base_dir = tempfile::tempdir().unwrap();
        let nodes_dir = base_dir.path().join(NODES_SUBDIR);

        // Create two identity subdirectories under nodes/
        for name in &["aaaa", "bbbb"] {
            let dir = nodes_dir.join(name);
            std::fs::create_dir_all(&dir).unwrap();
            let identity = NodeIdentity::generate().unwrap();
            identity
                .save_to_file(&dir.join(NODE_IDENTITY_FILENAME))
                .await
                .unwrap();
        }

        let identity_dirs = NodeBuilder::scan_identity_dirs(&nodes_dir).unwrap();
        assert_eq!(identity_dirs.len(), 2, "should find both identity dirs");
    }

    /// With a non-default `root_dir` (explicit path), the identity is created on
    /// first run and reloaded on subsequent runs from the same directory.
    #[tokio::test]
    async fn test_explicit_root_dir_persists_across_restarts() {
        let tmp = tempfile::tempdir().unwrap();

        // First boot — non-default root_dir triggers explicit path
        let mut config1 = NodeConfig {
            root_dir: tmp.path().to_path_buf(),
            ..Default::default()
        };
        let identity1 = NodeBuilder::resolve_identity(&mut config1).await.unwrap();

        // Second boot — same dir
        let mut config2 = NodeConfig {
            root_dir: tmp.path().to_path_buf(),
            ..Default::default()
        };
        let identity2 = NodeBuilder::resolve_identity(&mut config2).await.unwrap();

        assert_eq!(
            identity1.peer_id(),
            identity2.peer_id(),
            "explicit --root-dir must yield stable identity"
        );
    }
}