Skip to main content

ant_node/
node.rs

1//! Node implementation - thin wrapper around saorsa-core's `P2PNode`.
2
3use crate::ant_protocol::{CHUNK_PROTOCOL_ID, MAX_CHUNK_SIZE};
4use crate::config::{
5    default_nodes_dir, default_root_dir, EvmNetworkConfig, IpVersion, NetworkMode, NodeConfig,
6    NODE_IDENTITY_FILENAME,
7};
8use crate::error::{Error, Result};
9use crate::event::{create_event_channel, NodeEvent, NodeEventsChannel, NodeEventsSender};
10use crate::payment::metrics::QuotingMetricsTracker;
11use crate::payment::wallet::parse_rewards_address;
12use crate::payment::{EvmVerifierConfig, PaymentVerifier, PaymentVerifierConfig, QuoteGenerator};
13use crate::storage::{AntProtocol, LmdbStorage, LmdbStorageConfig};
14use crate::upgrade::{
15    upgrade_cache_dir, AutoApplyUpgrader, BinaryCache, ReleaseCache, UpgradeMonitor, UpgradeResult,
16};
17use evmlib::Network as EvmNetwork;
18use rand::Rng;
19use saorsa_core::identity::NodeIdentity;
20use saorsa_core::{
21    BootstrapConfig as CoreBootstrapConfig, BootstrapManager,
22    IPDiversityConfig as CoreDiversityConfig, MultiAddr, NodeConfig as CoreNodeConfig, P2PEvent,
23    P2PNode,
24};
25use std::path::PathBuf;
26use std::sync::atomic::{AtomicI32, Ordering};
27use std::sync::Arc;
28use tokio::sync::Semaphore;
29use tokio::task::JoinHandle;
30use tokio_util::sync::CancellationToken;
31use tracing::{debug, error, info, warn};
32
/// Node storage capacity limit (5 GB).
///
/// Drives the quoting-metrics pricing curve: a node advertises
/// `NODE_STORAGE_LIMIT_BYTES / MAX_CHUNK_SIZE` as its maximum record
/// count, so the pricing algorithm sees a real fullness ratio rather
/// than a hardcoded constant.
pub const NODE_STORAGE_LIMIT_BYTES: u64 = 5 * 1_073_741_824; // 5 * 2^30 bytes
40
41#[cfg(unix)]
42use tokio::signal::unix::{signal, SignalKind};
43
/// Builder for constructing an Ant node.
pub struct NodeBuilder {
    // Full node configuration consumed by `build()`; `root_dir` may be
    // rewritten during identity resolution (see `resolve_identity`).
    config: NodeConfig,
}
48
49impl NodeBuilder {
50    /// Create a new node builder with the given configuration.
51    #[must_use]
52    pub fn new(config: NodeConfig) -> Self {
53        Self { config }
54    }
55
56    /// Build and start the node.
57    ///
58    /// # Errors
59    ///
60    /// Returns an error if the node fails to start.
61    pub async fn build(mut self) -> Result<RunningNode> {
62        info!("Building ant-node with config: {:?}", self.config);
63
64        // Validate rewards address in production
65        if self.config.network_mode == NetworkMode::Production {
66            match self.config.payment.rewards_address {
67                None => {
68                    return Err(Error::Config(
69                        "CRITICAL: Rewards address is not configured. \
70                         Set payment.rewards_address in config to your Arbitrum wallet address."
71                            .to_string(),
72                    ));
73                }
74                Some(ref addr) if addr == "0xYOUR_ARBITRUM_ADDRESS_HERE" || addr.is_empty() => {
75                    return Err(Error::Config(
76                        "CRITICAL: Rewards address is not configured. \
77                         Set payment.rewards_address in config to your Arbitrum wallet address."
78                            .to_string(),
79                    ));
80                }
81                Some(_) => {}
82            }
83        }
84
85        // Resolve identity and root_dir (may update self.config.root_dir)
86        let identity = Arc::new(Self::resolve_identity(&mut self.config).await?);
87        let peer_id = identity.peer_id().to_hex();
88
89        info!(peer_id = %peer_id, root_dir = %self.config.root_dir.display(), "Node identity resolved");
90
91        // Ensure root directory exists
92        std::fs::create_dir_all(&self.config.root_dir)?;
93
94        // Create shutdown token
95        let shutdown = CancellationToken::new();
96
97        // Create event channel
98        let (events_tx, events_rx) = create_event_channel();
99
100        // Convert our config to saorsa-core's config
101        let mut core_config = Self::build_core_config(&self.config)?;
102        // Inject the ML-DSA identity so the P2PNode's transport peer ID
103        // matches the pub_key embedded in payment quotes.
104        core_config.node_identity = Some(Arc::clone(&identity));
105        debug!("Core config: {:?}", core_config);
106
107        // Initialize saorsa-core's P2PNode
108        let p2p_node = P2PNode::new(core_config)
109            .await
110            .map_err(|e| Error::Startup(format!("Failed to create P2P node: {e}")))?;
111
112        // Create upgrade monitor
113        let upgrade_monitor = {
114            let node_id_seed = p2p_node.peer_id().as_bytes();
115            Some(Self::build_upgrade_monitor(&self.config, node_id_seed))
116        };
117
118        // Initialize bootstrap cache manager if enabled
119        let bootstrap_manager = if self.config.bootstrap_cache.enabled {
120            Self::build_bootstrap_manager(&self.config).await
121        } else {
122            info!("Bootstrap cache disabled");
123            None
124        };
125
126        // Initialize ANT protocol handler for chunk storage
127        let ant_protocol = if self.config.storage.enabled {
128            Some(Arc::new(
129                Self::build_ant_protocol(&self.config, &identity).await?,
130            ))
131        } else {
132            info!("Chunk storage disabled");
133            None
134        };
135
136        let node = RunningNode {
137            config: self.config,
138            p2p_node: Arc::new(p2p_node),
139            shutdown,
140            events_tx,
141            events_rx: Some(events_rx),
142            upgrade_monitor,
143            bootstrap_manager,
144            ant_protocol,
145            protocol_task: None,
146            upgrade_exit_code: Arc::new(AtomicI32::new(-1)),
147        };
148
149        Ok(node)
150    }
151
152    /// Build the saorsa-core `NodeConfig` from our config.
153    fn build_core_config(config: &NodeConfig) -> Result<CoreNodeConfig> {
154        let ipv6 = matches!(config.ip_version, IpVersion::Ipv6 | IpVersion::Dual);
155        let local = matches!(config.network_mode, NetworkMode::Development);
156
157        let mut core_config = CoreNodeConfig::builder()
158            .port(config.port)
159            .ipv6(ipv6)
160            .local(local)
161            .max_message_size(config.max_message_size)
162            .build()
163            .map_err(|e| Error::Config(format!("Failed to create core config: {e}")))?;
164
165        // Add bootstrap peers.
166        core_config.bootstrap_peers = config
167            .bootstrap
168            .iter()
169            .map(|addr| MultiAddr::quic(*addr))
170            .collect();
171
172        // Propagate network-mode tuning into saorsa-core where supported.
173        match config.network_mode {
174            NetworkMode::Production => {
175                core_config.diversity_config = Some(CoreDiversityConfig::default());
176            }
177            NetworkMode::Testnet => {
178                // Testnet allows loopback so nodes can be co-located on one machine.
179                core_config.allow_loopback = true;
180                let mut diversity = CoreDiversityConfig::testnet();
181                diversity.max_nodes_per_asn = config.testnet.max_nodes_per_asn;
182                diversity.max_nodes_per_ipv6_64 = config.testnet.max_nodes_per_64;
183                diversity.enable_geolocation_check = config.testnet.enable_geo_checks;
184                diversity.min_geographic_diversity = if config.testnet.enable_geo_checks {
185                    3
186                } else {
187                    1
188                };
189                core_config.diversity_config = Some(diversity);
190
191                if config.testnet.enforce_age_requirements {
192                    warn!(
193                        "testnet.enforce_age_requirements is set but saorsa-core does not yet \
194                         expose a knob; age checks may remain relaxed"
195                    );
196                }
197            }
198            NetworkMode::Development => {
199                core_config.diversity_config = Some(CoreDiversityConfig::permissive());
200            }
201        }
202
203        Ok(core_config)
204    }
205
206    /// Resolve the node identity from disk or generate a new one.
207    ///
208    /// **When `root_dir` differs from the platform default** (set via `--root-dir`
209    /// or loaded from `config.toml`):
210    ///   - Use `root_dir` directly: load existing identity or generate a new one.
211    ///
212    /// **When `root_dir` is the platform default** (first run, no config file):
213    ///   1. Scan `{default_root_dir}/nodes/` for subdirectories containing
214    ///      `node_identity.key`.
215    ///   2. **None found** — first run: generate identity, create
216    ///      `nodes/{full_peer_id}/`, save identity there, update `config.root_dir`.
217    ///   3. **Exactly one found** — load it and update `config.root_dir`.
218    ///   4. **Multiple found** — return an error asking for `--root-dir`.
219    async fn resolve_identity(config: &mut NodeConfig) -> Result<NodeIdentity> {
220        if config.root_dir != default_root_dir() {
221            return Self::load_or_generate_identity(&config.root_dir).await;
222        }
223
224        let nodes_dir = default_nodes_dir();
225        let identity_dirs = Self::scan_identity_dirs(&nodes_dir)?;
226
227        match identity_dirs.len() {
228            0 => {
229                // First run: generate new identity and create a peer-id-scoped subdirectory
230                let identity = NodeIdentity::generate().map_err(|e| {
231                    Error::Startup(format!("Failed to generate node identity: {e}"))
232                })?;
233                let peer_id = identity.peer_id().to_hex();
234                let peer_dir = nodes_dir.join(&peer_id);
235                std::fs::create_dir_all(&peer_dir)?;
236                identity
237                    .save_to_file(&peer_dir.join(NODE_IDENTITY_FILENAME))
238                    .await
239                    .map_err(|e| Error::Startup(format!("Failed to save node identity: {e}")))?;
240                config.root_dir = peer_dir;
241                Ok(identity)
242            }
243            1 => {
244                let dir = identity_dirs
245                    .first()
246                    .ok_or_else(|| Error::Config("No identity dirs found".to_string()))?;
247                let identity = NodeIdentity::load_from_file(&dir.join(NODE_IDENTITY_FILENAME))
248                    .await
249                    .map_err(|e| Error::Startup(format!("Failed to load node identity: {e}")))?;
250                config.root_dir.clone_from(dir);
251                Ok(identity)
252            }
253            _ => {
254                let dirs: Vec<String> = identity_dirs
255                    .iter()
256                    .filter_map(|d| d.file_name().map(|n| n.to_string_lossy().into_owned()))
257                    .collect();
258                Err(Error::Config(format!(
259                    "Multiple node identities found at {}: [{}]. Specify --root-dir to select one.",
260                    nodes_dir.display(),
261                    dirs.join(", ")
262                )))
263            }
264        }
265    }
266
267    /// Load an existing identity from `dir/node_identity.key`, or generate and save a new one.
268    async fn load_or_generate_identity(dir: &std::path::Path) -> Result<NodeIdentity> {
269        let key_path = dir.join(NODE_IDENTITY_FILENAME);
270        if key_path.exists() {
271            NodeIdentity::load_from_file(&key_path)
272                .await
273                .map_err(|e| Error::Startup(format!("Failed to load node identity: {e}")))
274        } else {
275            let identity = NodeIdentity::generate()
276                .map_err(|e| Error::Startup(format!("Failed to generate node identity: {e}")))?;
277            std::fs::create_dir_all(dir)?;
278            identity
279                .save_to_file(&key_path)
280                .await
281                .map_err(|e| Error::Startup(format!("Failed to save node identity: {e}")))?;
282            Ok(identity)
283        }
284    }
285
286    /// Scan `base_dir` for immediate subdirectories that contain `node_identity.key`.
287    fn scan_identity_dirs(base_dir: &std::path::Path) -> Result<Vec<PathBuf>> {
288        let mut dirs = Vec::new();
289        let read_dir = match std::fs::read_dir(base_dir) {
290            Ok(rd) => rd,
291            Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(dirs),
292            Err(e) => return Err(e.into()),
293        };
294        for entry in read_dir {
295            let entry = entry?;
296            let path = entry.path();
297            if path.is_dir() && path.join(NODE_IDENTITY_FILENAME).exists() {
298                dirs.push(path);
299            }
300        }
301        Ok(dirs)
302    }
303
304    fn build_upgrade_monitor(config: &NodeConfig, node_id_seed: &[u8]) -> UpgradeMonitor {
305        let mut monitor = UpgradeMonitor::new(
306            config.upgrade.github_repo.clone(),
307            config.upgrade.channel,
308            config.upgrade.check_interval_hours,
309        );
310
311        if let Ok(cache_dir) = upgrade_cache_dir() {
312            monitor = monitor.with_release_cache(ReleaseCache::new(
313                cache_dir,
314                std::time::Duration::from_secs(3600),
315            ));
316        }
317
318        if config.upgrade.staged_rollout_hours > 0 {
319            monitor =
320                monitor.with_staged_rollout(node_id_seed, config.upgrade.staged_rollout_hours);
321        }
322
323        monitor
324    }
325
    /// Build the ANT protocol handler from config.
    ///
    /// Initializes LMDB storage, payment verifier, and quote generator.
    /// Wires ML-DSA-65 signing from the node's identity into the quote generator.
    ///
    /// # Errors
    ///
    /// Returns `Error::Startup` when LMDB creation fails or no rewards
    /// address is configured; propagates errors from address parsing and
    /// signer wiring.
    async fn build_ant_protocol(
        config: &NodeConfig,
        identity: &NodeIdentity,
    ) -> Result<AntProtocol> {
        // Create LMDB storage rooted at the node's data directory.
        let storage_config = LmdbStorageConfig {
            root_dir: config.root_dir.clone(),
            verify_on_read: config.storage.verify_on_read,
            max_chunks: config.storage.max_chunks,
            // db_size_gb * 1 GiB (1_073_741_824 bytes); saturates instead of overflowing.
            max_map_size: config.storage.db_size_gb.saturating_mul(1_073_741_824),
        };
        let storage = LmdbStorage::new(storage_config)
            .await
            .map_err(|e| Error::Startup(format!("Failed to create LMDB storage: {e}")))?;

        // Parse rewards address (required — node must know where to receive payments)
        let rewards_address = match config.payment.rewards_address {
            Some(ref addr) => parse_rewards_address(addr)?,
            None => {
                return Err(Error::Startup(
                    "No rewards address configured. Set --rewards-address or payment.rewards_address in config.".to_string(),
                ));
            }
        };

        // Map our config enum onto evmlib's network selection.
        let evm_network = match config.payment.evm_network {
            EvmNetworkConfig::ArbitrumOne => EvmNetwork::ArbitrumOne,
            EvmNetworkConfig::ArbitrumSepolia => EvmNetwork::ArbitrumSepoliaTest,
        };
        let payment_config = PaymentVerifierConfig {
            evm: EvmVerifierConfig {
                network: evm_network,
            },
            cache_capacity: config.payment.cache_capacity,
            local_rewards_address: rewards_address,
        };
        let payment_verifier = PaymentVerifier::new(payment_config);
        // Derive max_records from the 5 GB storage limit so the quoting
        // pricing curve sees a meaningful fullness ratio (see NODE_STORAGE_LIMIT_BYTES).
        // Safe: 5GB fits in usize on all supported 64-bit platforms.
        #[allow(clippy::cast_possible_truncation)]
        let max_records = (NODE_STORAGE_LIMIT_BYTES as usize) / MAX_CHUNK_SIZE;
        let metrics_tracker = QuotingMetricsTracker::new(max_records, 0);
        let mut quote_generator = QuoteGenerator::new(rewards_address, metrics_tracker);

        // Wire ML-DSA-65 signing from node identity.
        // This same signer is used for both regular quotes and merkle candidate quotes.
        crate::payment::wire_ml_dsa_signer(&mut quote_generator, identity)?;

        let protocol = AntProtocol::new(
            Arc::new(storage),
            Arc::new(payment_verifier),
            Arc::new(quote_generator),
        );

        info!(
            "ANT protocol handler initialized with ML-DSA-65 signing (protocol={CHUNK_PROTOCOL_ID})"
        );

        Ok(protocol)
    }
390
391    /// Build the bootstrap cache manager from config.
392    async fn build_bootstrap_manager(config: &NodeConfig) -> Option<BootstrapManager> {
393        let cache_dir = config
394            .bootstrap_cache
395            .cache_dir
396            .clone()
397            .unwrap_or_else(|| config.root_dir.join("bootstrap_cache"));
398
399        // Create cache directory
400        if let Err(e) = std::fs::create_dir_all(&cache_dir) {
401            warn!("Failed to create bootstrap cache directory: {e}");
402            return None;
403        }
404
405        let bootstrap_config = CoreBootstrapConfig {
406            cache_dir,
407            max_peers: config.bootstrap_cache.max_contacts,
408            ..CoreBootstrapConfig::default()
409        };
410
411        match BootstrapManager::with_config(bootstrap_config).await {
412            Ok(manager) => {
413                info!(
414                    "Bootstrap cache initialized with {} max contacts",
415                    config.bootstrap_cache.max_contacts
416                );
417                Some(manager)
418            }
419            Err(e) => {
420                warn!("Failed to initialize bootstrap cache: {e}");
421                None
422            }
423        }
424    }
425}
426
/// A running Ant node.
pub struct RunningNode {
    /// Resolved node configuration (`root_dir` may have been rewritten
    /// during identity resolution).
    config: NodeConfig,
    /// Underlying saorsa-core P2P node.
    p2p_node: Arc<P2PNode>,
    /// Token cancelled to request graceful shutdown of all tasks.
    shutdown: CancellationToken,
    /// Sender half of the node event channel.
    events_tx: NodeEventsSender,
    /// Receiver created at build time; handed out once via `events()`.
    events_rx: Option<NodeEventsChannel>,
    /// Upgrade monitor, consumed when `run()` spawns the upgrade task.
    upgrade_monitor: Option<UpgradeMonitor>,
    /// Bootstrap cache manager for persistent peer storage.
    bootstrap_manager: Option<BootstrapManager>,
    /// ANT protocol handler for chunk storage.
    ant_protocol: Option<Arc<AntProtocol>>,
    /// Protocol message routing background task.
    protocol_task: Option<JoinHandle<()>>,
    /// Exit code requested by a successful upgrade (-1 = no upgrade exit pending).
    upgrade_exit_code: Arc<AtomicI32>,
}
444
445impl RunningNode {
    /// Get the node's root directory.
    ///
    /// This is the resolved directory (possibly a peer-id-scoped
    /// subdirectory of the default nodes dir) that holds the identity
    /// key, LMDB storage, and the default bootstrap cache.
    #[must_use]
    pub fn root_dir(&self) -> &PathBuf {
        &self.config.root_dir
    }
451
    /// Get a receiver for node events.
    ///
    /// Note: Can only be called once. Subsequent calls return None.
    /// (It takes the receiver created at build time; use
    /// `subscribe_events` for additional receivers.)
    pub fn events(&mut self) -> Option<NodeEventsChannel> {
        self.events_rx.take()
    }
458
    /// Subscribe to node events.
    ///
    /// Creates a fresh receiver from the events sender; may be called
    /// any number of times, unlike `events()`.
    #[must_use]
    pub fn subscribe_events(&self) -> NodeEventsChannel {
        self.events_tx.subscribe()
    }
464
465    /// Run the node until shutdown is requested.
466    ///
467    /// # Errors
468    ///
469    /// Returns an error if the node encounters a fatal error.
470    #[allow(clippy::too_many_lines)]
471    pub async fn run(&mut self) -> Result<()> {
472        info!("Node runtime loop starting");
473
474        // Start the P2P node
475        self.p2p_node
476            .start()
477            .await
478            .map_err(|e| Error::Startup(format!("Failed to start P2P node: {e}")))?;
479
480        let listen_addrs = self.p2p_node.listen_addrs().await;
481        info!(listen_addrs = ?listen_addrs, "P2P node started");
482
483        // Extract the actual bound port (config port may be 0 = auto-select)
484        let actual_port = listen_addrs
485            .first()
486            .and_then(MultiAddr::port)
487            .unwrap_or(self.config.port);
488        info!(
489            port = actual_port,
490            "Node is running on port: {}", actual_port
491        );
492
493        // Emit started event
494        if let Err(e) = self.events_tx.send(NodeEvent::Started) {
495            warn!("Failed to send Started event: {e}");
496        }
497
498        // Start protocol message routing (P2P → AntProtocol → P2P response)
499        self.start_protocol_routing();
500
501        // Start upgrade monitor if enabled
502        if let Some(monitor) = self.upgrade_monitor.take() {
503            let events_tx = self.events_tx.clone();
504            let shutdown = self.shutdown.clone();
505            let stop_on_upgrade = self.config.upgrade.stop_on_upgrade;
506            let upgrade_exit_code = Arc::clone(&self.upgrade_exit_code);
507
508            tokio::spawn(async move {
509                let mut monitor = monitor;
510                let mut upgrader = AutoApplyUpgrader::new().with_stop_on_upgrade(stop_on_upgrade);
511                if let Ok(cache_dir) = upgrade_cache_dir() {
512                    upgrader = upgrader.with_binary_cache(BinaryCache::new(cache_dir));
513                }
514
515                // Add randomized jitter before the first upgrade check to prevent all nodes
516                // from hitting the GitHub API simultaneously when started together.
517                {
518                    let jitter_duration = jittered_interval(monitor.check_interval());
519                    let first_check_time = chrono::Utc::now()
520                        + chrono::Duration::from_std(jitter_duration).unwrap_or_else(|e| {
521                            warn!("chrono::Duration::from_std failed for jitter ({e}), defaulting to 1 minute");
522                            chrono::Duration::minutes(1)
523                        });
524                    info!(
525                        "First upgrade check scheduled for {} (jitter: {}s)",
526                        first_check_time.to_rfc3339(),
527                        jitter_duration.as_secs()
528                    );
529                    tokio::time::sleep(jitter_duration).await;
530                }
531
532                loop {
533                    tokio::select! {
534                        () = shutdown.cancelled() => {
535                            break;
536                        }
537                        result = monitor.check_for_ready_upgrade() => {
538                            match result {
539                                Ok(Some(upgrade_info)) => {
540                                    info!(
541                                        current_version = %upgrader.current_version(),
542                                        new_version = %upgrade_info.version,
543                                        "Upgrade available"
544                                    );
545
546                                    // Send notification event
547                                    if let Err(e) = events_tx.send(NodeEvent::UpgradeAvailable {
548                                        version: upgrade_info.version.to_string(),
549                                    }) {
550                                        warn!("Failed to send UpgradeAvailable event: {e}");
551                                    }
552
553                                    // Auto-apply the upgrade
554                                    info!("Starting auto-apply upgrade...");
555                                    match upgrader.apply_upgrade(&upgrade_info).await {
556                                        Ok(UpgradeResult::Success { version, exit_code }) => {
557                                            info!("Upgrade to {} successful, initiating graceful shutdown", version);
558                                            upgrade_exit_code.store(exit_code, Ordering::SeqCst);
559                                            shutdown.cancel();
560                                            break;
561                                        }
562                                        Ok(UpgradeResult::RolledBack { reason }) => {
563                                            warn!("Error during upgrade process: {}", reason);
564                                        }
565                                        Ok(UpgradeResult::NoUpgrade) => {
566                                            info!("Already running latest version");
567                                        }
568                                        Err(e) => {
569                                            error!("Error during upgrade process: {}", e);
570                                        }
571                                    }
572                                }
573                                Ok(None) => {
574                                    if let Some(remaining) = monitor.time_until_upgrade() {
575                                        info!(
576                                            "Upgrade pending, rollout delay remaining: {}m {}s",
577                                            remaining.as_secs() / 60,
578                                            remaining.as_secs() % 60
579                                        );
580                                    } else {
581                                        info!("No upgrade available");
582                                    }
583                                }
584                                Err(e) => {
585                                    warn!("Error during upgrade process: {}", e);
586                                }
587                            }
588                            // Schedule next check with jitter to prevent fleet re-alignment
589                            let jittered_duration =
590                                jittered_interval(monitor.check_interval());
591                            let next_check = chrono::Utc::now()
592                                + chrono::Duration::from_std(jittered_duration).unwrap_or_else(|e| {
593                                    warn!("chrono::Duration::from_std failed for interval ({e}), defaulting to 1 hour");
594                                    chrono::Duration::hours(1)
595                                });
596                            info!("Next upgrade check scheduled for {}", next_check.to_rfc3339());
597                            tokio::time::sleep(jittered_duration).await;
598                        }
599                    }
600                }
601            });
602        }
603
604        info!("Node running, waiting for shutdown signal");
605
606        // Run the main event loop with signal handling
607        self.run_event_loop().await?;
608
609        // Log bootstrap cache stats before shutdown
610        if let Some(ref manager) = self.bootstrap_manager {
611            let stats = manager.stats().await;
612            info!(
613                "Bootstrap cache shutdown: {} peers, avg quality {:.2}",
614                stats.total_peers, stats.average_quality
615            );
616        }
617
618        // Stop protocol routing task
619        if let Some(handle) = self.protocol_task.take() {
620            handle.abort();
621        }
622
623        // Shutdown P2P node
624        info!("Shutting down P2P node...");
625        if let Err(e) = self.p2p_node.shutdown().await {
626            warn!("Error during P2P node shutdown: {e}");
627        }
628
629        if let Err(e) = self.events_tx.send(NodeEvent::ShuttingDown) {
630            warn!("Failed to send ShuttingDown event: {e}");
631        }
632        info!("Node shutdown complete");
633
634        // If an upgrade triggered the shutdown, exit with the requested code.
635        // This happens *after* all cleanup (P2P shutdown, log flush, etc.) so
636        // that destructors and async resources are properly torn down.
637        let exit_code = self.upgrade_exit_code.load(Ordering::SeqCst);
638        if exit_code >= 0 {
639            info!("Exiting with code {} for upgrade restart", exit_code);
640            std::process::exit(exit_code);
641        }
642
643        Ok(())
644    }
645
    /// Run the main event loop, handling shutdown and signals.
    ///
    /// Blocks until the shutdown token is cancelled or a terminating
    /// signal (Ctrl-C / SIGTERM) arrives; SIGHUP is logged and ignored
    /// because config reload is not yet supported.
    #[cfg(unix)]
    async fn run_event_loop(&self) -> Result<()> {
        // Register the Unix signal streams up front; failing to install
        // a handler is reported to the caller as an error.
        let mut sigterm = signal(SignalKind::terminate())?;
        let mut sighup = signal(SignalKind::hangup())?;

        loop {
            tokio::select! {
                // Programmatic shutdown (e.g. `shutdown()` or an upgrade).
                () = self.shutdown.cancelled() => {
                    info!("Shutdown signal received");
                    break;
                }
                _ = tokio::signal::ctrl_c() => {
                    info!("Received SIGINT (Ctrl-C), initiating shutdown");
                    self.shutdown();
                    break;
                }
                _ = sigterm.recv() => {
                    info!("Received SIGTERM, initiating shutdown");
                    self.shutdown();
                    break;
                }
                // Stay in the loop: nothing is reloaded on SIGHUP yet.
                _ = sighup.recv() => {
                    info!("Received SIGHUP (config reload not yet supported)");
                }
            }
        }
        Ok(())
    }
675
    /// Run the main event loop, handling shutdown signals (non-Unix version).
    ///
    /// Blocks until the shutdown token is cancelled or Ctrl-C arrives;
    /// no other signals are handled on non-Unix platforms.
    #[cfg(not(unix))]
    async fn run_event_loop(&self) -> Result<()> {
        loop {
            tokio::select! {
                () = self.shutdown.cancelled() => {
                    info!("Shutdown signal received");
                    break;
                }
                _ = tokio::signal::ctrl_c() => {
                    info!("Received Ctrl-C, initiating shutdown");
                    self.shutdown();
                    break;
                }
            }
        }
        Ok(())
    }
694
695    /// Start the protocol message routing background task.
696    ///
697    /// Subscribes to P2P events and routes incoming chunk protocol messages
698    /// to the `AntProtocol` handler, sending responses back to the sender.
699    fn start_protocol_routing(&mut self) {
700        let protocol = match self.ant_protocol {
701            Some(ref p) => Arc::clone(p),
702            None => return,
703        };
704
705        let mut events = self.p2p_node.subscribe_events();
706        let p2p = Arc::clone(&self.p2p_node);
707        let semaphore = Arc::new(Semaphore::new(64));
708
709        self.protocol_task = Some(tokio::spawn(async move {
710            while let Ok(event) = events.recv().await {
711                if let P2PEvent::Message {
712                    topic,
713                    source: Some(source),
714                    data,
715                } = event
716                {
717                    let handler_info: Option<(&str, &str)> = if topic == CHUNK_PROTOCOL_ID {
718                        Some(("chunk", CHUNK_PROTOCOL_ID))
719                    } else {
720                        None
721                    };
722
723                    if let Some((data_type, response_topic)) = handler_info {
724                        debug!("Received {data_type} protocol message from {source}");
725                        let protocol = Arc::clone(&protocol);
726                        let p2p = Arc::clone(&p2p);
727                        let sem = semaphore.clone();
728                        tokio::spawn(async move {
729                            let Ok(_permit) = sem.acquire().await else {
730                                return;
731                            };
732                            let result = match data_type {
733                                "chunk" => protocol.try_handle_request(&data).await,
734                                _ => return,
735                            };
736                            match result {
737                                Ok(Some(response)) => {
738                                    if let Err(e) = p2p
739                                        .send_message(
740                                            &source,
741                                            response_topic,
742                                            response.to_vec(),
743                                            &[],
744                                        )
745                                        .await
746                                    {
747                                        warn!("Failed to send {data_type} protocol response to {source}: {e}");
748                                    }
749                                }
750                                Ok(None) => {}
751                                Err(e) => {
752                                    warn!("{data_type} protocol handler error: {e}");
753                                }
754                            }
755                        });
756                    }
757                }
758            }
759        }));
760        info!("Protocol message routing started");
761    }
762
    /// Request the node to shut down.
    ///
    /// Cancels the shared shutdown `CancellationToken`; tasks selecting on
    /// it (such as the main run loop above) observe the cancellation and
    /// exit. Safe to call multiple times — cancelling an already-cancelled
    /// token is a no-op.
    pub fn shutdown(&self) {
        self.shutdown.cancel();
    }
767}
768
769/// Apply ±5% jitter to a base interval to prevent thundering-herd behaviour
770/// when multiple nodes check for upgrades on the same schedule.
771fn jittered_interval(base: std::time::Duration) -> std::time::Duration {
772    let secs = base.as_secs();
773    let variance = secs / 20; // 5%
774    if variance == 0 {
775        return base;
776    }
777    let jitter = rand::thread_rng().gen_range(0..=variance * 2);
778    std::time::Duration::from_secs(secs.saturating_sub(variance) + jitter)
779}
780
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
    use super::*;
    use crate::config::NODES_SUBDIR;

    /// A non-zero `staged_rollout_hours` must yield a monitor with a
    /// staged-rollout window configured.
    #[test]
    fn test_build_upgrade_monitor_staged_rollout_enabled() {
        let config = NodeConfig {
            upgrade: crate::config::UpgradeConfig {
                staged_rollout_hours: 24,
                ..Default::default()
            },
            ..Default::default()
        };
        let seed = b"node-seed";

        let monitor = NodeBuilder::build_upgrade_monitor(&config, seed);
        assert!(monitor.has_staged_rollout());
    }

    /// `staged_rollout_hours: 0` disables the staged rollout entirely.
    #[test]
    fn test_build_upgrade_monitor_staged_rollout_disabled() {
        let config = NodeConfig {
            upgrade: crate::config::UpgradeConfig {
                staged_rollout_hours: 0,
                ..Default::default()
            },
            ..Default::default()
        };
        let seed = b"node-seed";

        let monitor = NodeBuilder::build_upgrade_monitor(&config, seed);
        assert!(!monitor.has_staged_rollout());
    }

    /// Production mode must set an IP-diversity configuration on the core
    /// node config.
    #[test]
    fn test_build_core_config_sets_production_mode() {
        let config = NodeConfig {
            network_mode: NetworkMode::Production,
            ..Default::default()
        };
        let core = NodeBuilder::build_core_config(&config).expect("core config");
        assert!(core.diversity_config.is_some());
    }

    /// Development mode still sets a diversity config, but a relaxed one.
    #[test]
    fn test_build_core_config_sets_development_mode_relaxed() {
        let config = NodeConfig {
            network_mode: NetworkMode::Development,
            ..Default::default()
        };
        let core = NodeBuilder::build_core_config(&config).expect("core config");
        let diversity = core.diversity_config.expect("diversity");
        assert!(diversity.is_relaxed());
    }

    /// Scanning an existing but empty directory yields no identity dirs.
    #[test]
    fn test_scan_identity_dirs_empty_dir() {
        let tmp = tempfile::tempdir().unwrap();
        let dirs = NodeBuilder::scan_identity_dirs(tmp.path()).unwrap();
        assert!(dirs.is_empty());
    }

    /// Scanning a path that does not exist is not an error — it yields an
    /// empty list.
    #[test]
    fn test_scan_identity_dirs_nonexistent_dir() {
        let tmp = tempfile::tempdir().unwrap();
        let path = tmp.path().join("nonexistent_identity_dir");
        let dirs = NodeBuilder::scan_identity_dirs(&path).unwrap();
        assert!(dirs.is_empty());
    }

    /// A single subdirectory containing an identity file is discovered.
    #[test]
    fn test_scan_identity_dirs_finds_one() {
        let tmp = tempfile::tempdir().unwrap();
        let node_dir = tmp.path().join("abc123");
        std::fs::create_dir_all(&node_dir).unwrap();
        std::fs::write(node_dir.join(NODE_IDENTITY_FILENAME), "{}").unwrap();

        let dirs = NodeBuilder::scan_identity_dirs(tmp.path()).unwrap();
        assert_eq!(dirs.len(), 1);
        assert_eq!(dirs[0], node_dir);
    }

    /// Multiple identity subdirectories are all found; directories without
    /// a key file are skipped.
    #[test]
    fn test_scan_identity_dirs_finds_multiple() {
        let tmp = tempfile::tempdir().unwrap();
        for name in &["node_a", "node_b"] {
            let dir = tmp.path().join(name);
            std::fs::create_dir_all(&dir).unwrap();
            std::fs::write(dir.join(NODE_IDENTITY_FILENAME), "{}").unwrap();
        }
        // A directory without a key file should be ignored
        std::fs::create_dir_all(tmp.path().join("no_key")).unwrap();

        let dirs = NodeBuilder::scan_identity_dirs(tmp.path()).unwrap();
        assert_eq!(dirs.len(), 2);
    }

    /// First run with an explicit root dir creates and persists a fresh
    /// identity whose peer_id is a 32-byte value.
    #[tokio::test]
    async fn test_resolve_identity_first_run_creates_identity() {
        let tmp = tempfile::tempdir().unwrap();
        let mut config = NodeConfig {
            root_dir: tmp.path().to_path_buf(),
            ..Default::default()
        };

        let identity = NodeBuilder::resolve_identity(&mut config).await.unwrap();
        // Key file should exist
        assert!(tmp.path().join(NODE_IDENTITY_FILENAME).exists());
        // peer_id should be derivable from the identity
        let peer_id = identity.peer_id().to_hex();
        assert_eq!(peer_id.len(), 64); // 32 bytes hex-encoded
    }

    /// An identity file already present in root_dir is loaded rather than
    /// regenerated.
    #[tokio::test]
    async fn test_resolve_identity_loads_existing() {
        let tmp = tempfile::tempdir().unwrap();

        // Generate and save an identity
        let original = NodeIdentity::generate().unwrap();
        original
            .save_to_file(&tmp.path().join(NODE_IDENTITY_FILENAME))
            .await
            .unwrap();

        let mut config = NodeConfig {
            root_dir: tmp.path().to_path_buf(),
            ..Default::default()
        };

        let loaded = NodeBuilder::resolve_identity(&mut config).await.unwrap();
        assert_eq!(loaded.peer_id(), original.peer_id());
    }

    /// Hex encoding of a 32-byte peer ID is always 64 characters.
    #[test]
    fn test_peer_id_hex_length() {
        let id = saorsa_core::identity::PeerId::from_bytes([0x42; 32]);
        let hex = id.to_hex();
        assert_eq!(hex.len(), 64); // 32 bytes = 64 hex chars
    }

    /// Simulates a node restart: first run creates identity in a scoped subdir
    /// under `nodes/`, second run discovers and reloads it — `peer_id` must be
    /// identical and the directory name is the full 64-char hex peer ID.
    #[tokio::test]
    async fn test_identity_persisted_across_restarts() {
        let base_dir = tempfile::tempdir().unwrap();
        let nodes_dir = base_dir.path().join(NODES_SUBDIR);

        // First "boot": generate identity, save it in nodes/{peer_id}/
        let identity1 = NodeIdentity::generate().unwrap();
        let peer_id1 = identity1.peer_id().to_hex();
        let peer_dir = nodes_dir.join(&peer_id1);
        std::fs::create_dir_all(&peer_dir).unwrap();
        identity1
            .save_to_file(&peer_dir.join(NODE_IDENTITY_FILENAME))
            .await
            .unwrap();

        // Verify directory name is the full 64-char hex peer ID
        assert_eq!(peer_id1.len(), 64);
        assert_eq!(peer_dir.file_name().unwrap().to_string_lossy(), peer_id1);

        // Second "boot": scan should find and reload the same identity
        let identity_dirs = NodeBuilder::scan_identity_dirs(&nodes_dir).unwrap();
        assert_eq!(identity_dirs.len(), 1);
        let loaded = NodeIdentity::load_from_file(&identity_dirs[0].join(NODE_IDENTITY_FILENAME))
            .await
            .unwrap();
        let peer_id2 = loaded.peer_id().to_hex();

        assert_eq!(peer_id1, peer_id2, "peer_id must survive restart");
        assert_eq!(
            identity_dirs[0], peer_dir,
            "root_dir must be the same directory"
        );
    }

    /// When two identity subdirs exist under `nodes/`, the scan finds multiple
    /// and the resolve path would error asking for `--root-dir`.
    #[tokio::test]
    async fn test_multiple_identities_errors() {
        let base_dir = tempfile::tempdir().unwrap();
        let nodes_dir = base_dir.path().join(NODES_SUBDIR);

        // Create two identity subdirectories under nodes/
        for name in &["aaaa", "bbbb"] {
            let dir = nodes_dir.join(name);
            std::fs::create_dir_all(&dir).unwrap();
            let identity = NodeIdentity::generate().unwrap();
            identity
                .save_to_file(&dir.join(NODE_IDENTITY_FILENAME))
                .await
                .unwrap();
        }

        let identity_dirs = NodeBuilder::scan_identity_dirs(&nodes_dir).unwrap();
        assert_eq!(identity_dirs.len(), 2, "should find both identity dirs");
    }

    /// With a non-default `root_dir` (explicit path), the identity is created on
    /// first run and reloaded on subsequent runs from the same directory.
    #[tokio::test]
    async fn test_explicit_root_dir_persists_across_restarts() {
        let tmp = tempfile::tempdir().unwrap();

        // First boot — non-default root_dir triggers explicit path
        let mut config1 = NodeConfig {
            root_dir: tmp.path().to_path_buf(),
            ..Default::default()
        };
        let identity1 = NodeBuilder::resolve_identity(&mut config1).await.unwrap();

        // Second boot — same dir
        let mut config2 = NodeConfig {
            root_dir: tmp.path().to_path_buf(),
            ..Default::default()
        };
        let identity2 = NodeBuilder::resolve_identity(&mut config2).await.unwrap();

        assert_eq!(
            identity1.peer_id(),
            identity2.peer_id(),
            "explicit --root-dir must yield stable identity"
        );
    }
}