// ant_node/node.rs
1//! Node implementation - thin wrapper around saorsa-core's `P2PNode`.
2
3use crate::ant_protocol::{CHUNK_PROTOCOL_ID, MAX_CHUNK_SIZE};
4use crate::config::{
5    default_nodes_dir, default_root_dir, EvmNetworkConfig, NetworkMode, NodeConfig,
6    NODE_IDENTITY_FILENAME,
7};
8use crate::error::{Error, Result};
9use crate::event::{create_event_channel, NodeEvent, NodeEventsChannel, NodeEventsSender};
10use crate::payment::metrics::QuotingMetricsTracker;
11use crate::payment::wallet::parse_rewards_address;
12use crate::payment::{EvmVerifierConfig, PaymentVerifier, PaymentVerifierConfig, QuoteGenerator};
13use crate::storage::{AntProtocol, LmdbStorage, LmdbStorageConfig};
14use crate::upgrade::{
15    upgrade_cache_dir, AutoApplyUpgrader, BinaryCache, ReleaseCache, UpgradeMonitor, UpgradeResult,
16};
17use evmlib::Network as EvmNetwork;
18use rand::Rng;
19use saorsa_core::identity::NodeIdentity;
20use saorsa_core::{
21    BootstrapConfig as CoreBootstrapConfig, BootstrapManager,
22    IPDiversityConfig as CoreDiversityConfig, MultiAddr, NodeConfig as CoreNodeConfig, P2PEvent,
23    P2PNode,
24};
25use std::path::PathBuf;
26use std::sync::atomic::{AtomicI32, Ordering};
27use std::sync::Arc;
28use tokio::sync::Semaphore;
29use tokio::task::JoinHandle;
30use tokio_util::sync::CancellationToken;
31use tracing::{debug, error, info, warn};
32
/// Node storage capacity limit (5 GiB).
///
/// Used to derive `max_records` for the quoting metrics pricing curve.
/// A node advertises `NODE_STORAGE_LIMIT_BYTES / MAX_CHUNK_SIZE` as
/// its maximum record count, giving the pricing algorithm a meaningful
/// fullness ratio instead of a hardcoded constant.
pub const NODE_STORAGE_LIMIT_BYTES: u64 = 5 * 1024 * 1024 * 1024;
40
41#[cfg(unix)]
42use tokio::signal::unix::{signal, SignalKind};
43
/// Builder for constructing an Ant node.
pub struct NodeBuilder {
    /// Full node configuration, consumed (and possibly amended) by `build()`.
    config: NodeConfig,
}
48
49impl NodeBuilder {
50    /// Create a new node builder with the given configuration.
51    #[must_use]
52    pub fn new(config: NodeConfig) -> Self {
53        Self { config }
54    }
55
56    /// Build and start the node.
57    ///
58    /// # Errors
59    ///
60    /// Returns an error if the node fails to start.
61    pub async fn build(mut self) -> Result<RunningNode> {
62        info!("Building ant-node with config: {:?}", self.config);
63
64        // Validate rewards address in production
65        if self.config.network_mode == NetworkMode::Production {
66            match self.config.payment.rewards_address {
67                None => {
68                    return Err(Error::Config(
69                        "CRITICAL: Rewards address is not configured. \
70                         Set payment.rewards_address in config to your Arbitrum wallet address."
71                            .to_string(),
72                    ));
73                }
74                Some(ref addr) if addr == "0xYOUR_ARBITRUM_ADDRESS_HERE" || addr.is_empty() => {
75                    return Err(Error::Config(
76                        "CRITICAL: Rewards address is not configured. \
77                         Set payment.rewards_address in config to your Arbitrum wallet address."
78                            .to_string(),
79                    ));
80                }
81                Some(_) => {}
82            }
83        }
84
85        // Resolve identity and root_dir (may update self.config.root_dir)
86        let identity = Arc::new(Self::resolve_identity(&mut self.config).await?);
87        let peer_id = identity.peer_id().to_hex();
88
89        info!(peer_id = %peer_id, root_dir = %self.config.root_dir.display(), "Node identity resolved");
90
91        // Ensure root directory exists
92        std::fs::create_dir_all(&self.config.root_dir)?;
93
94        // Create shutdown token
95        let shutdown = CancellationToken::new();
96
97        // Create event channel
98        let (events_tx, events_rx) = create_event_channel();
99
100        // Convert our config to saorsa-core's config
101        let mut core_config = Self::build_core_config(&self.config)?;
102        // Inject the ML-DSA identity so the P2PNode's transport peer ID
103        // matches the pub_key embedded in payment quotes.
104        core_config.node_identity = Some(Arc::clone(&identity));
105        debug!("Core config: {:?}", core_config);
106
107        // Initialize saorsa-core's P2PNode
108        let p2p_node = P2PNode::new(core_config)
109            .await
110            .map_err(|e| Error::Startup(format!("Failed to create P2P node: {e}")))?;
111
112        // Create upgrade monitor
113        let upgrade_monitor = {
114            let node_id_seed = p2p_node.peer_id().as_bytes();
115            Some(Self::build_upgrade_monitor(&self.config, node_id_seed))
116        };
117
118        // Initialize bootstrap cache manager if enabled
119        let bootstrap_manager = if self.config.bootstrap_cache.enabled {
120            Self::build_bootstrap_manager(&self.config).await
121        } else {
122            info!("Bootstrap cache disabled");
123            None
124        };
125
126        // Initialize ANT protocol handler for chunk storage
127        let ant_protocol = if self.config.storage.enabled {
128            Some(Arc::new(
129                Self::build_ant_protocol(&self.config, &identity).await?,
130            ))
131        } else {
132            info!("Chunk storage disabled");
133            None
134        };
135
136        let node = RunningNode {
137            config: self.config,
138            p2p_node: Arc::new(p2p_node),
139            shutdown,
140            events_tx,
141            events_rx: Some(events_rx),
142            upgrade_monitor,
143            bootstrap_manager,
144            ant_protocol,
145            protocol_task: None,
146            upgrade_exit_code: Arc::new(AtomicI32::new(-1)),
147        };
148
149        Ok(node)
150    }
151
152    /// Build the saorsa-core `NodeConfig` from our config.
153    fn build_core_config(config: &NodeConfig) -> Result<CoreNodeConfig> {
154        let local = matches!(config.network_mode, NetworkMode::Development);
155
156        let mut core_config = CoreNodeConfig::builder()
157            .port(config.port)
158            .ipv6(!config.ipv4_only)
159            .local(local)
160            .max_message_size(config.max_message_size)
161            .build()
162            .map_err(|e| Error::Config(format!("Failed to create core config: {e}")))?;
163
164        // Add bootstrap peers.
165        core_config.bootstrap_peers = config
166            .bootstrap
167            .iter()
168            .map(|addr| MultiAddr::quic(*addr))
169            .collect();
170
171        // Propagate network-mode tuning into saorsa-core where supported.
172        match config.network_mode {
173            NetworkMode::Production => {
174                core_config.diversity_config = Some(CoreDiversityConfig::default());
175            }
176            NetworkMode::Testnet => {
177                // Testnet allows loopback so nodes can be co-located on one machine.
178                core_config.allow_loopback = true;
179                core_config.diversity_config = Some(CoreDiversityConfig {
180                    max_per_ip: config.testnet.max_per_ip,
181                    max_per_subnet: config.testnet.max_per_subnet,
182                });
183            }
184            NetworkMode::Development => {
185                core_config.diversity_config = Some(CoreDiversityConfig::permissive());
186            }
187        }
188
189        // Persist close group peers + trust scores across restarts.
190        // Default to root_dir (alongside node_identity.key) when not explicitly set.
191        core_config.close_group_cache_dir = Some(
192            config
193                .close_group_cache_dir
194                .clone()
195                .unwrap_or_else(|| config.root_dir.clone()),
196        );
197
198        Ok(core_config)
199    }
200
    /// Resolve the node identity from disk or generate a new one.
    ///
    /// **When `root_dir` differs from the platform default** (set via `--root-dir`
    /// or loaded from `config.toml`):
    ///   - Use `root_dir` directly: load existing identity or generate a new one.
    ///
    /// **When `root_dir` is the platform default** (first run, no config file):
    ///   1. Scan `{default_root_dir}/nodes/` for subdirectories containing
    ///      `node_identity.key`.
    ///   2. **None found** — first run: generate identity, create
    ///      `nodes/{full_peer_id}/`, save identity there, update `config.root_dir`.
    ///   3. **Exactly one found** — load it and update `config.root_dir`.
    ///   4. **Multiple found** — return an error asking for `--root-dir`.
    async fn resolve_identity(config: &mut NodeConfig) -> Result<NodeIdentity> {
        // Explicit root_dir: bypass the nodes/ scan entirely.
        if config.root_dir != default_root_dir() {
            return Self::load_or_generate_identity(&config.root_dir).await;
        }

        let nodes_dir = default_nodes_dir();
        let identity_dirs = Self::scan_identity_dirs(&nodes_dir)?;

        match identity_dirs.len() {
            0 => {
                // First run: generate new identity and create a peer-id-scoped subdirectory
                let identity = NodeIdentity::generate().map_err(|e| {
                    Error::Startup(format!("Failed to generate node identity: {e}"))
                })?;
                let peer_id = identity.peer_id().to_hex();
                let peer_dir = nodes_dir.join(&peer_id);
                std::fs::create_dir_all(&peer_dir)?;
                identity
                    .save_to_file(&peer_dir.join(NODE_IDENTITY_FILENAME))
                    .await
                    .map_err(|e| Error::Startup(format!("Failed to save node identity: {e}")))?;
                // Point the running config at the freshly created peer directory.
                config.root_dir = peer_dir;
                Ok(identity)
            }
            1 => {
                // Exactly one existing identity: adopt it and its directory.
                let dir = identity_dirs
                    .first()
                    .ok_or_else(|| Error::Config("No identity dirs found".to_string()))?;
                let identity = NodeIdentity::load_from_file(&dir.join(NODE_IDENTITY_FILENAME))
                    .await
                    .map_err(|e| Error::Startup(format!("Failed to load node identity: {e}")))?;
                config.root_dir.clone_from(dir);
                Ok(identity)
            }
            _ => {
                // Ambiguous: refuse to guess which identity the operator intends.
                let dirs: Vec<String> = identity_dirs
                    .iter()
                    .filter_map(|d| d.file_name().map(|n| n.to_string_lossy().into_owned()))
                    .collect();
                Err(Error::Config(format!(
                    "Multiple node identities found at {}: [{}]. Specify --root-dir to select one.",
                    nodes_dir.display(),
                    dirs.join(", ")
                )))
            }
        }
    }
261
262    /// Load an existing identity from `dir/node_identity.key`, or generate and save a new one.
263    async fn load_or_generate_identity(dir: &std::path::Path) -> Result<NodeIdentity> {
264        let key_path = dir.join(NODE_IDENTITY_FILENAME);
265        if key_path.exists() {
266            NodeIdentity::load_from_file(&key_path)
267                .await
268                .map_err(|e| Error::Startup(format!("Failed to load node identity: {e}")))
269        } else {
270            let identity = NodeIdentity::generate()
271                .map_err(|e| Error::Startup(format!("Failed to generate node identity: {e}")))?;
272            std::fs::create_dir_all(dir)?;
273            identity
274                .save_to_file(&key_path)
275                .await
276                .map_err(|e| Error::Startup(format!("Failed to save node identity: {e}")))?;
277            Ok(identity)
278        }
279    }
280
281    /// Scan `base_dir` for immediate subdirectories that contain `node_identity.key`.
282    fn scan_identity_dirs(base_dir: &std::path::Path) -> Result<Vec<PathBuf>> {
283        let mut dirs = Vec::new();
284        let read_dir = match std::fs::read_dir(base_dir) {
285            Ok(rd) => rd,
286            Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(dirs),
287            Err(e) => return Err(e.into()),
288        };
289        for entry in read_dir {
290            let entry = entry?;
291            let path = entry.path();
292            if path.is_dir() && path.join(NODE_IDENTITY_FILENAME).exists() {
293                dirs.push(path);
294            }
295        }
296        Ok(dirs)
297    }
298
299    fn build_upgrade_monitor(config: &NodeConfig, node_id_seed: &[u8]) -> UpgradeMonitor {
300        let mut monitor = UpgradeMonitor::new(
301            config.upgrade.github_repo.clone(),
302            config.upgrade.channel,
303            config.upgrade.check_interval_hours,
304        );
305
306        if let Ok(cache_dir) = upgrade_cache_dir() {
307            monitor = monitor.with_release_cache(ReleaseCache::new(
308                cache_dir,
309                std::time::Duration::from_secs(3600),
310            ));
311        }
312
313        if config.upgrade.staged_rollout_hours > 0 {
314            monitor =
315                monitor.with_staged_rollout(node_id_seed, config.upgrade.staged_rollout_hours);
316        }
317
318        monitor
319    }
320
    /// Build the ANT protocol handler from config.
    ///
    /// Initializes LMDB storage, payment verifier, and quote generator.
    /// Wires ML-DSA-65 signing from the node's identity into the quote generator.
    async fn build_ant_protocol(
        config: &NodeConfig,
        identity: &NodeIdentity,
    ) -> Result<AntProtocol> {
        // Create LMDB storage
        let storage_config = LmdbStorageConfig {
            root_dir: config.root_dir.clone(),
            verify_on_read: config.storage.verify_on_read,
            max_chunks: config.storage.max_chunks,
            // Convert the configured size to bytes (1 GiB = 1_073_741_824),
            // saturating rather than overflowing on absurd values.
            max_map_size: config.storage.db_size_gb.saturating_mul(1_073_741_824),
        };
        let storage = LmdbStorage::new(storage_config)
            .await
            .map_err(|e| Error::Startup(format!("Failed to create LMDB storage: {e}")))?;

        // Parse rewards address (required — node must know where to receive payments)
        let rewards_address = match config.payment.rewards_address {
            Some(ref addr) => parse_rewards_address(addr)?,
            None => {
                return Err(Error::Startup(
                    "No rewards address configured. Set --rewards-address or payment.rewards_address in config.".to_string(),
                ));
            }
        };

        // Create payment verifier
        let evm_network = match config.payment.evm_network {
            EvmNetworkConfig::ArbitrumOne => EvmNetwork::ArbitrumOne,
            EvmNetworkConfig::ArbitrumSepolia => EvmNetwork::ArbitrumSepoliaTest,
        };
        let payment_config = PaymentVerifierConfig {
            evm: EvmVerifierConfig {
                network: evm_network,
            },
            cache_capacity: config.payment.cache_capacity,
            local_rewards_address: rewards_address,
        };
        let payment_verifier = PaymentVerifier::new(payment_config);
        // Safe: 5GB fits in usize on all supported 64-bit platforms.
        #[allow(clippy::cast_possible_truncation)]
        let max_records = (NODE_STORAGE_LIMIT_BYTES as usize) / MAX_CHUNK_SIZE;
        // NOTE(review): second argument appears to be the initial record
        // count — confirm against QuotingMetricsTracker::new.
        let metrics_tracker = QuotingMetricsTracker::new(max_records, 0);
        let mut quote_generator = QuoteGenerator::new(rewards_address, metrics_tracker);

        // Wire ML-DSA-65 signing from node identity.
        // This same signer is used for both regular quotes and merkle candidate quotes.
        crate::payment::wire_ml_dsa_signer(&mut quote_generator, identity)?;

        let protocol = AntProtocol::new(
            Arc::new(storage),
            Arc::new(payment_verifier),
            Arc::new(quote_generator),
        );

        info!(
            "ANT protocol handler initialized with ML-DSA-65 signing (protocol={CHUNK_PROTOCOL_ID})"
        );

        Ok(protocol)
    }
385
386    /// Build the bootstrap cache manager from config.
387    async fn build_bootstrap_manager(config: &NodeConfig) -> Option<BootstrapManager> {
388        let cache_dir = config
389            .bootstrap_cache
390            .cache_dir
391            .clone()
392            .unwrap_or_else(|| config.root_dir.join("bootstrap_cache"));
393
394        // Create cache directory
395        if let Err(e) = std::fs::create_dir_all(&cache_dir) {
396            warn!("Failed to create bootstrap cache directory: {e}");
397            return None;
398        }
399
400        let bootstrap_config = CoreBootstrapConfig {
401            cache_dir,
402            max_peers: config.bootstrap_cache.max_contacts,
403            ..CoreBootstrapConfig::default()
404        };
405
406        match BootstrapManager::with_config(bootstrap_config).await {
407            Ok(manager) => {
408                info!(
409                    "Bootstrap cache initialized with {} max contacts",
410                    config.bootstrap_cache.max_contacts
411                );
412                Some(manager)
413            }
414            Err(e) => {
415                warn!("Failed to initialize bootstrap cache: {e}");
416                None
417            }
418        }
419    }
420}
421
/// A running Ant node.
pub struct RunningNode {
    /// Node configuration; `root_dir` may have been rewritten during identity resolution.
    config: NodeConfig,
    /// Underlying saorsa-core P2P node, shared with background tasks.
    p2p_node: Arc<P2PNode>,
    /// Cancellation token used to signal shutdown to all background tasks.
    shutdown: CancellationToken,
    /// Sender half of the node event channel.
    events_tx: NodeEventsSender,
    /// Receiver handed out by `events()`; `None` once taken.
    events_rx: Option<NodeEventsChannel>,
    /// Upgrade monitor; taken by `run()` when spawning the upgrade task.
    upgrade_monitor: Option<UpgradeMonitor>,
    /// Bootstrap cache manager for persistent peer storage.
    bootstrap_manager: Option<BootstrapManager>,
    /// ANT protocol handler for chunk storage.
    ant_protocol: Option<Arc<AntProtocol>>,
    /// Protocol message routing background task.
    protocol_task: Option<JoinHandle<()>>,
    /// Exit code requested by a successful upgrade (-1 = no upgrade exit pending).
    upgrade_exit_code: Arc<AtomicI32>,
}
439
440impl RunningNode {
    /// Get the node's root directory.
    ///
    /// Reflects any rewrite performed during identity resolution at build time.
    #[must_use]
    pub fn root_dir(&self) -> &PathBuf {
        &self.config.root_dir
    }
446
    /// Get a receiver for node events.
    ///
    /// Note: Can only be called once. Subsequent calls return None.
    pub fn events(&mut self) -> Option<NodeEventsChannel> {
        // `take` moves the receiver out, leaving `None` for later calls.
        self.events_rx.take()
    }
453
    /// Subscribe to node events.
    ///
    /// Unlike [`Self::events`], this may be called any number of times; each
    /// call creates a fresh receiver from the event sender.
    #[must_use]
    pub fn subscribe_events(&self) -> NodeEventsChannel {
        self.events_tx.subscribe()
    }
459
460    /// Run the node until shutdown is requested.
461    ///
462    /// # Errors
463    ///
464    /// Returns an error if the node encounters a fatal error.
465    #[allow(clippy::too_many_lines)]
466    pub async fn run(&mut self) -> Result<()> {
467        info!("Node runtime loop starting");
468
469        // Start the P2P node
470        self.p2p_node
471            .start()
472            .await
473            .map_err(|e| Error::Startup(format!("Failed to start P2P node: {e}")))?;
474
475        let listen_addrs = self.p2p_node.listen_addrs().await;
476        info!(listen_addrs = ?listen_addrs, "P2P node started");
477
478        // Extract the actual bound port (config port may be 0 = auto-select)
479        let actual_port = listen_addrs
480            .first()
481            .and_then(MultiAddr::port)
482            .unwrap_or(self.config.port);
483        info!(
484            port = actual_port,
485            "Node is running on port: {}", actual_port
486        );
487
488        // Emit started event
489        if let Err(e) = self.events_tx.send(NodeEvent::Started) {
490            warn!("Failed to send Started event: {e}");
491        }
492
493        // Start protocol message routing (P2P → AntProtocol → P2P response)
494        self.start_protocol_routing();
495
496        // Start upgrade monitor if enabled
497        if let Some(monitor) = self.upgrade_monitor.take() {
498            let events_tx = self.events_tx.clone();
499            let shutdown = self.shutdown.clone();
500            let stop_on_upgrade = self.config.upgrade.stop_on_upgrade;
501            let upgrade_exit_code = Arc::clone(&self.upgrade_exit_code);
502
503            tokio::spawn(async move {
504                let mut monitor = monitor;
505                let mut upgrader = AutoApplyUpgrader::new().with_stop_on_upgrade(stop_on_upgrade);
506                if let Ok(cache_dir) = upgrade_cache_dir() {
507                    upgrader = upgrader.with_binary_cache(BinaryCache::new(cache_dir));
508                }
509
510                // Add randomized jitter before the first upgrade check to prevent all nodes
511                // from hitting the GitHub API simultaneously when started together.
512                {
513                    let jitter_duration = jittered_interval(monitor.check_interval());
514                    let first_check_time = chrono::Utc::now()
515                        + chrono::Duration::from_std(jitter_duration).unwrap_or_else(|e| {
516                            warn!("chrono::Duration::from_std failed for jitter ({e}), defaulting to 1 minute");
517                            chrono::Duration::minutes(1)
518                        });
519                    info!(
520                        "First upgrade check scheduled for {} (jitter: {}s)",
521                        first_check_time.to_rfc3339(),
522                        jitter_duration.as_secs()
523                    );
524                    tokio::time::sleep(jitter_duration).await;
525                }
526
527                loop {
528                    tokio::select! {
529                        () = shutdown.cancelled() => {
530                            break;
531                        }
532                        result = monitor.check_for_ready_upgrade() => {
533                            match result {
534                                Ok(Some(upgrade_info)) => {
535                                    info!(
536                                        current_version = %upgrader.current_version(),
537                                        new_version = %upgrade_info.version,
538                                        "Upgrade available"
539                                    );
540
541                                    // Send notification event
542                                    if let Err(e) = events_tx.send(NodeEvent::UpgradeAvailable {
543                                        version: upgrade_info.version.to_string(),
544                                    }) {
545                                        warn!("Failed to send UpgradeAvailable event: {e}");
546                                    }
547
548                                    // Auto-apply the upgrade
549                                    info!("Starting auto-apply upgrade...");
550                                    match upgrader.apply_upgrade(&upgrade_info).await {
551                                        Ok(UpgradeResult::Success { version, exit_code }) => {
552                                            info!("Upgrade to {} successful, initiating graceful shutdown", version);
553                                            upgrade_exit_code.store(exit_code, Ordering::SeqCst);
554                                            shutdown.cancel();
555                                            break;
556                                        }
557                                        Ok(UpgradeResult::RolledBack { reason }) => {
558                                            warn!("Error during upgrade process: {}", reason);
559                                        }
560                                        Ok(UpgradeResult::NoUpgrade) => {
561                                            info!("Already running latest version");
562                                        }
563                                        Err(e) => {
564                                            error!("Error during upgrade process: {}", e);
565                                        }
566                                    }
567                                }
568                                Ok(None) => {
569                                    if let Some(remaining) = monitor.time_until_upgrade() {
570                                        info!(
571                                            "Upgrade pending, rollout delay remaining: {}m {}s",
572                                            remaining.as_secs() / 60,
573                                            remaining.as_secs() % 60
574                                        );
575                                    } else {
576                                        info!("No upgrade available");
577                                    }
578                                }
579                                Err(e) => {
580                                    warn!("Error during upgrade process: {}", e);
581                                }
582                            }
583                            // If an upgrade is pending, sleep for exactly the remaining
584                            // rollout delay so the node restarts at its scheduled time
585                            // rather than waiting for the next check interval tick.
586                            let sleep_duration = monitor.time_until_upgrade().map_or_else(
587                                || {
588                                    // No pending upgrade - schedule next check with jitter
589                                    let jittered_duration =
590                                        jittered_interval(monitor.check_interval());
591                                    let next_check = chrono::Utc::now()
592                                        + chrono::Duration::from_std(jittered_duration).unwrap_or_else(|e| {
593                                            warn!("chrono::Duration::from_std failed for interval ({e}), defaulting to 1 hour");
594                                            chrono::Duration::hours(1)
595                                        });
596                                    info!("Next upgrade check scheduled for {}", next_check.to_rfc3339());
597                                    jittered_duration
598                                },
599                                |remaining| {
600                                    // If the rollout delay has fully elapsed but the upgrade was
601                                    // not successfully applied, avoid a tight loop by backing off
602                                    // at least one check interval before retrying.
603                                    if remaining.is_zero() {
604                                        let backoff = jittered_interval(monitor.check_interval());
605                                        let next_check = chrono::Utc::now()
606                                            + chrono::Duration::from_std(backoff).unwrap_or_else(|e| {
607                                                warn!("chrono::Duration::from_std failed for backoff ({e}), defaulting to 1 hour");
608                                                chrono::Duration::hours(1)
609                                            });
610                                        info!(
611                                            "Upgrade rollout delay elapsed but previous apply did not succeed; \
612                                             backing off, next check scheduled for {}",
613                                            next_check.to_rfc3339()
614                                        );
615                                        backoff
616                                    } else {
617                                        let wake_time = chrono::Utc::now()
618                                            + chrono::Duration::from_std(remaining).unwrap_or_else(|e| {
619                                                warn!("chrono::Duration::from_std failed for rollout delay ({e}), defaulting to 1 minute");
620                                                chrono::Duration::minutes(1)
621                                            });
622                                        info!("Will apply upgrade at {}", wake_time.to_rfc3339());
623                                        remaining
624                                    }
625                                },
626                            );
627                            // Use select! so shutdown can interrupt long sleeps
628                            // (e.g. during a full rollout window delay).
629                            tokio::select! {
630                                () = shutdown.cancelled() => {
631                                    break;
632                                }
633                                () = tokio::time::sleep(sleep_duration) => {}
634                            }
635                        }
636                    }
637                }
638            });
639        }
640
641        info!("Node running, waiting for shutdown signal");
642
643        // Run the main event loop with signal handling
644        self.run_event_loop().await?;
645
646        // Log bootstrap cache stats before shutdown
647        if let Some(ref manager) = self.bootstrap_manager {
648            let stats = manager.stats().await;
649            info!(
650                "Bootstrap cache shutdown: {} peers, avg quality {:.2}",
651                stats.total_peers, stats.average_quality
652            );
653        }
654
655        // Stop protocol routing task
656        if let Some(handle) = self.protocol_task.take() {
657            handle.abort();
658        }
659
660        // Shutdown P2P node
661        info!("Shutting down P2P node...");
662        if let Err(e) = self.p2p_node.shutdown().await {
663            warn!("Error during P2P node shutdown: {e}");
664        }
665
666        if let Err(e) = self.events_tx.send(NodeEvent::ShuttingDown) {
667            warn!("Failed to send ShuttingDown event: {e}");
668        }
669        info!("Node shutdown complete");
670
671        // If an upgrade triggered the shutdown, exit with the requested code.
672        // This happens *after* all cleanup (P2P shutdown, log flush, etc.) so
673        // that destructors and async resources are properly torn down.
674        let exit_code = self.upgrade_exit_code.load(Ordering::SeqCst);
675        if exit_code >= 0 {
676            info!("Exiting with code {} for upgrade restart", exit_code);
677            std::process::exit(exit_code);
678        }
679
680        Ok(())
681    }
682
    /// Run the main event loop, handling shutdown and signals.
    ///
    /// Blocks until the internal shutdown token fires or a terminating
    /// signal (SIGINT/SIGTERM) arrives. SIGHUP is logged but otherwise
    /// ignored (no config reload support yet).
    #[cfg(unix)]
    async fn run_event_loop(&self) -> Result<()> {
        let mut sigterm = signal(SignalKind::terminate())?;
        let mut sighup = signal(SignalKind::hangup())?;

        loop {
            tokio::select! {
                () = self.shutdown.cancelled() => {
                    info!("Shutdown signal received");
                    break;
                }
                _ = tokio::signal::ctrl_c() => {
                    info!("Received SIGINT (Ctrl-C), initiating shutdown");
                    self.shutdown();
                    break;
                }
                _ = sigterm.recv() => {
                    info!("Received SIGTERM, initiating shutdown");
                    self.shutdown();
                    break;
                }
                _ = sighup.recv() => {
                    // Stay in the loop: SIGHUP does not terminate the node.
                    info!("Received SIGHUP (config reload not yet supported)");
                }
            }
        }
        Ok(())
    }
712
713    /// Run the main event loop, handling shutdown signals (non-Unix version).
714    #[cfg(not(unix))]
715    async fn run_event_loop(&self) -> Result<()> {
716        loop {
717            tokio::select! {
718                () = self.shutdown.cancelled() => {
719                    info!("Shutdown signal received");
720                    break;
721                }
722                _ = tokio::signal::ctrl_c() => {
723                    info!("Received Ctrl-C, initiating shutdown");
724                    self.shutdown();
725                    break;
726                }
727            }
728        }
729        Ok(())
730    }
731
    /// Start the protocol message routing background task.
    ///
    /// Subscribes to P2P events and routes incoming chunk protocol messages
    /// to the `AntProtocol` handler, sending responses back to the sender.
    /// Does nothing if no `AntProtocol` handler was configured. The spawned
    /// task handle is stored in `self.protocol_task` so shutdown can abort it.
    fn start_protocol_routing(&mut self) {
        // No storage protocol configured — routing is a no-op.
        let protocol = match self.ant_protocol {
            Some(ref p) => Arc::clone(p),
            None => return,
        };

        let mut events = self.p2p_node.subscribe_events();
        let p2p = Arc::clone(&self.p2p_node);
        // Cap concurrent request handlers at 64 so a burst of incoming
        // messages cannot spawn an unbounded number of tasks.
        let semaphore = Arc::new(Semaphore::new(64));

        self.protocol_task = Some(tokio::spawn(async move {
            // NOTE(review): the loop exits on *any* recv() error. If
            // subscribe_events() hands back a broadcast receiver, a Lagged
            // error (recoverable) would silently stop all routing — confirm
            // the error semantics against saorsa-core.
            while let Ok(event) = events.recv().await {
                if let P2PEvent::Message {
                    topic,
                    source: Some(source),
                    data,
                } = event
                {
                    // Map the topic to (log label, response topic). Structured
                    // this way so additional protocols can be added as arms.
                    let handler_info: Option<(&str, &str)> = if topic == CHUNK_PROTOCOL_ID {
                        Some(("chunk", CHUNK_PROTOCOL_ID))
                    } else {
                        None
                    };

                    if let Some((data_type, response_topic)) = handler_info {
                        debug!("Received {data_type} protocol message from {source}");
                        let protocol = Arc::clone(&protocol);
                        let p2p = Arc::clone(&p2p);
                        let sem = semaphore.clone();
                        // Handle each request on its own task so one slow
                        // handler does not stall the event stream.
                        tokio::spawn(async move {
                            // Permit held for the whole handler; acquire only
                            // fails if the semaphore is closed, in which case
                            // the request is dropped.
                            let Ok(_permit) = sem.acquire().await else {
                                return;
                            };
                            let result = match data_type {
                                "chunk" => protocol.try_handle_request(&data).await,
                                _ => return,
                            };
                            match result {
                                Ok(Some(response)) => {
                                    // Reply on the same topic the request came in on.
                                    if let Err(e) = p2p
                                        .send_message(
                                            &source,
                                            response_topic,
                                            response.to_vec(),
                                            &[],
                                        )
                                        .await
                                    {
                                        warn!("Failed to send {data_type} protocol response to {source}: {e}");
                                    }
                                }
                                // Handler consumed the message but produced no reply.
                                Ok(None) => {}
                                Err(e) => {
                                    warn!("{data_type} protocol handler error: {e}");
                                }
                            }
                        });
                    }
                }
            }
        }));
        info!("Protocol message routing started");
    }
799
    /// Request the node to shut down.
    ///
    /// Cancels the shared `CancellationToken`, which wakes
    /// `run_event_loop` (awaiting `shutdown.cancelled()`) so cleanup can
    /// proceed. Safe to call from any task; repeat calls are no-ops.
    pub fn shutdown(&self) {
        self.shutdown.cancel();
    }
804}
805
806/// Apply ±5% jitter to a base interval to prevent thundering-herd behaviour
807/// when multiple nodes check for upgrades on the same schedule.
808fn jittered_interval(base: std::time::Duration) -> std::time::Duration {
809    let secs = base.as_secs();
810    let variance = secs / 20; // 5%
811    if variance == 0 {
812        return base;
813    }
814    let jitter = rand::thread_rng().gen_range(0..=variance * 2);
815    std::time::Duration::from_secs(secs.saturating_sub(variance) + jitter)
816}
817
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
    use super::*;
    use crate::config::NODES_SUBDIR;

    #[test]
    fn test_build_upgrade_monitor_staged_rollout_enabled() {
        // A non-zero rollout window enables staged rollout.
        let upgrade = crate::config::UpgradeConfig {
            staged_rollout_hours: 24,
            ..Default::default()
        };
        let config = NodeConfig {
            upgrade,
            ..Default::default()
        };

        let monitor = NodeBuilder::build_upgrade_monitor(&config, b"node-seed");
        assert!(monitor.has_staged_rollout());
    }

    #[test]
    fn test_build_upgrade_monitor_staged_rollout_disabled() {
        // A zero-hour window disables staged rollout entirely.
        let upgrade = crate::config::UpgradeConfig {
            staged_rollout_hours: 0,
            ..Default::default()
        };
        let config = NodeConfig {
            upgrade,
            ..Default::default()
        };

        let monitor = NodeBuilder::build_upgrade_monitor(&config, b"node-seed");
        assert!(!monitor.has_staged_rollout());
    }

    #[test]
    fn test_build_core_config_sets_production_mode() {
        let config = NodeConfig {
            network_mode: NetworkMode::Production,
            ..Default::default()
        };

        let core = NodeBuilder::build_core_config(&config).expect("core config");
        // Production mode must come with IP-diversity enforcement.
        assert!(core.diversity_config.is_some());
    }

    #[test]
    fn test_build_core_config_ipv4_only() {
        let core = NodeBuilder::build_core_config(&NodeConfig {
            ipv4_only: true,
            ..Default::default()
        })
        .expect("core config");
        assert!(!core.ipv6, "ipv4_only should disable IPv6");
    }

    #[test]
    fn test_build_core_config_dual_stack_by_default() {
        let core = NodeBuilder::build_core_config(&NodeConfig::default()).expect("core config");
        assert!(core.ipv6, "dual-stack should be the default");
    }

    #[test]
    fn test_build_core_config_sets_development_mode_permissive() {
        let config = NodeConfig {
            network_mode: NetworkMode::Development,
            ..Default::default()
        };

        // Development mode keeps a diversity config but lifts all limits.
        let diversity = NodeBuilder::build_core_config(&config)
            .expect("core config")
            .diversity_config
            .expect("diversity");
        assert_eq!(diversity.max_per_ip, Some(usize::MAX));
        assert_eq!(diversity.max_per_subnet, Some(usize::MAX));
    }

    #[test]
    fn test_scan_identity_dirs_empty_dir() {
        let tmp_dir = tempfile::tempdir().unwrap();
        let found = NodeBuilder::scan_identity_dirs(tmp_dir.path()).unwrap();
        assert!(found.is_empty());
    }

    #[test]
    fn test_scan_identity_dirs_nonexistent_dir() {
        let tmp_dir = tempfile::tempdir().unwrap();
        let missing = tmp_dir.path().join("nonexistent_identity_dir");
        // A missing directory is not an error — just an empty result.
        let found = NodeBuilder::scan_identity_dirs(&missing).unwrap();
        assert!(found.is_empty());
    }

    #[test]
    fn test_scan_identity_dirs_finds_one() {
        let tmp_dir = tempfile::tempdir().unwrap();
        let node_dir = tmp_dir.path().join("abc123");
        std::fs::create_dir_all(&node_dir).unwrap();
        std::fs::write(node_dir.join(NODE_IDENTITY_FILENAME), "{}").unwrap();

        let found = NodeBuilder::scan_identity_dirs(tmp_dir.path()).unwrap();
        assert_eq!(found, vec![node_dir]);
    }

    #[test]
    fn test_scan_identity_dirs_finds_multiple() {
        let tmp_dir = tempfile::tempdir().unwrap();
        for name in ["node_a", "node_b"] {
            let dir = tmp_dir.path().join(name);
            std::fs::create_dir_all(&dir).unwrap();
            std::fs::write(dir.join(NODE_IDENTITY_FILENAME), "{}").unwrap();
        }
        // A directory without a key file should be ignored
        std::fs::create_dir_all(tmp_dir.path().join("no_key")).unwrap();

        let found = NodeBuilder::scan_identity_dirs(tmp_dir.path()).unwrap();
        assert_eq!(found.len(), 2);
    }

    #[tokio::test]
    async fn test_resolve_identity_first_run_creates_identity() {
        let tmp_dir = tempfile::tempdir().unwrap();
        let mut config = NodeConfig {
            root_dir: tmp_dir.path().to_path_buf(),
            ..Default::default()
        };

        let identity = NodeBuilder::resolve_identity(&mut config).await.unwrap();

        // The key file must have been written to disk.
        assert!(tmp_dir.path().join(NODE_IDENTITY_FILENAME).exists());
        // The peer_id is 32 bytes, hex-encoded to 64 characters.
        assert_eq!(identity.peer_id().to_hex().len(), 64);
    }

    #[tokio::test]
    async fn test_resolve_identity_loads_existing() {
        let tmp_dir = tempfile::tempdir().unwrap();
        let key_path = tmp_dir.path().join(NODE_IDENTITY_FILENAME);

        // Persist a freshly generated identity first.
        let original = NodeIdentity::generate().unwrap();
        original.save_to_file(&key_path).await.unwrap();

        let mut config = NodeConfig {
            root_dir: tmp_dir.path().to_path_buf(),
            ..Default::default()
        };

        let loaded = NodeBuilder::resolve_identity(&mut config).await.unwrap();
        assert_eq!(loaded.peer_id(), original.peer_id());
    }

    #[test]
    fn test_peer_id_hex_length() {
        let peer = saorsa_core::identity::PeerId::from_bytes([0x42; 32]);
        // 32 bytes = 64 hex chars
        assert_eq!(peer.to_hex().len(), 64);
    }

    /// Simulates a node restart: first run creates identity in a scoped subdir
    /// under `nodes/`, second run discovers and reloads it — `peer_id` must be
    /// identical and the directory name is the full 64-char hex peer ID.
    #[tokio::test]
    async fn test_identity_persisted_across_restarts() {
        let base_dir = tempfile::tempdir().unwrap();
        let nodes_dir = base_dir.path().join(NODES_SUBDIR);

        // Boot #1: generate an identity and persist it in nodes/{peer_id}/.
        let first = NodeIdentity::generate().unwrap();
        let first_hex = first.peer_id().to_hex();
        let peer_dir = nodes_dir.join(&first_hex);
        std::fs::create_dir_all(&peer_dir).unwrap();
        first
            .save_to_file(&peer_dir.join(NODE_IDENTITY_FILENAME))
            .await
            .unwrap();

        // The directory itself is named after the full hex peer id.
        assert_eq!(first_hex.len(), 64);
        assert_eq!(peer_dir.file_name().unwrap().to_string_lossy(), first_hex);

        // Boot #2: the scan must locate exactly that directory and reload
        // the very same identity.
        let identity_dirs = NodeBuilder::scan_identity_dirs(&nodes_dir).unwrap();
        assert_eq!(identity_dirs.len(), 1);
        let reloaded = NodeIdentity::load_from_file(&identity_dirs[0].join(NODE_IDENTITY_FILENAME))
            .await
            .unwrap();

        assert_eq!(
            first_hex,
            reloaded.peer_id().to_hex(),
            "peer_id must survive restart"
        );
        assert_eq!(
            identity_dirs[0], peer_dir,
            "root_dir must be the same directory"
        );
    }

    /// When two identity subdirs exist under `nodes/`, the scan finds multiple
    /// and the resolve path would error asking for `--root-dir`.
    #[tokio::test]
    async fn test_multiple_identities_errors() {
        let base_dir = tempfile::tempdir().unwrap();
        let nodes_dir = base_dir.path().join(NODES_SUBDIR);

        // Create two identity subdirectories under nodes/
        for name in ["aaaa", "bbbb"] {
            let dir = nodes_dir.join(name);
            std::fs::create_dir_all(&dir).unwrap();
            NodeIdentity::generate()
                .unwrap()
                .save_to_file(&dir.join(NODE_IDENTITY_FILENAME))
                .await
                .unwrap();
        }

        let identity_dirs = NodeBuilder::scan_identity_dirs(&nodes_dir).unwrap();
        assert_eq!(identity_dirs.len(), 2, "should find both identity dirs");
    }

    /// With a non-default `root_dir` (explicit path), the identity is created on
    /// first run and reloaded on subsequent runs from the same directory.
    #[tokio::test]
    async fn test_explicit_root_dir_persists_across_restarts() {
        let tmp_dir = tempfile::tempdir().unwrap();

        // Both "boots" point at the same explicit directory.
        let make_config = || NodeConfig {
            root_dir: tmp_dir.path().to_path_buf(),
            ..Default::default()
        };

        let mut config1 = make_config();
        let identity1 = NodeBuilder::resolve_identity(&mut config1).await.unwrap();

        let mut config2 = make_config();
        let identity2 = NodeBuilder::resolve_identity(&mut config2).await.unwrap();

        assert_eq!(
            identity1.peer_id(),
            identity2.peer_id(),
            "explicit --root-dir must yield stable identity"
        );
    }
}