// ant_node/node.rs
//! Node implementation - thin wrapper around saorsa-core's `P2PNode`.
2
3use crate::ant_protocol::CHUNK_PROTOCOL_ID;
4use crate::config::{
5    default_nodes_dir, default_root_dir, NetworkMode, NodeConfig, NODE_IDENTITY_FILENAME,
6};
7use crate::error::{Error, Result};
8use crate::event::{create_event_channel, NodeEvent, NodeEventsChannel, NodeEventsSender};
9use crate::logging::{debug, error, info, warn};
10use crate::payment::metrics::QuotingMetricsTracker;
11use crate::payment::wallet::parse_rewards_address;
12use crate::payment::{EvmVerifierConfig, PaymentVerifier, PaymentVerifierConfig, QuoteGenerator};
13use crate::replication::config::ReplicationConfig;
14use crate::replication::ReplicationEngine;
15use crate::storage::lmdb::MIB;
16use crate::storage::{AntProtocol, LmdbStorage, LmdbStorageConfig};
17use crate::upgrade::{
18    upgrade_cache_dir, AutoApplyUpgrader, BinaryCache, ReleaseCache, UpgradeMonitor, UpgradeResult,
19};
20use rand::Rng;
21use saorsa_core::identity::NodeIdentity;
22use saorsa_core::{
23    BootstrapConfig as CoreBootstrapConfig, BootstrapManager,
24    IPDiversityConfig as CoreDiversityConfig, MultiAddr, NodeConfig as CoreNodeConfig, P2PEvent,
25    P2PNode,
26};
27use std::path::PathBuf;
28use std::sync::atomic::{AtomicI32, Ordering};
29use std::sync::Arc;
30use tokio::sync::Semaphore;
31use tokio::task::JoinHandle;
32use tokio_util::sync::CancellationToken;
33
34#[cfg(unix)]
35use tokio::signal::unix::{signal, SignalKind};
36
/// Builder for constructing an Ant node.
pub struct NodeBuilder {
    // Configuration consumed by `build()`. May be mutated during the build
    // (identity resolution can rewrite `root_dir`).
    config: NodeConfig,
}
41
42impl NodeBuilder {
    /// Create a new node builder with the given configuration.
    ///
    /// Construction is infallible; validation happens in [`Self::build`].
    #[must_use]
    pub fn new(config: NodeConfig) -> Self {
        Self { config }
    }
48
    /// Build and start the node.
    ///
    /// Resolves the node identity (possibly rewriting `config.root_dir`),
    /// creates the saorsa-core `P2PNode` with that identity, and wires up the
    /// optional subsystems according to configuration: upgrade monitor,
    /// bootstrap cache, chunk storage (ANT protocol), and replication.
    ///
    /// # Errors
    ///
    /// Returns an error if the node fails to start.
    pub async fn build(mut self) -> Result<RunningNode> {
        info!("Building ant-node with config: {:?}", self.config);

        // Validate rewards address in production: refuse to start when it is
        // missing, empty, or still the documentation placeholder.
        if self.config.network_mode == NetworkMode::Production {
            match self.config.payment.rewards_address {
                None => {
                    return Err(Error::Config(
                        "CRITICAL: Rewards address is not configured. \
                         Set payment.rewards_address in config to your Arbitrum wallet address."
                            .to_string(),
                    ));
                }
                Some(ref addr) if addr == "0xYOUR_ARBITRUM_ADDRESS_HERE" || addr.is_empty() => {
                    return Err(Error::Config(
                        "CRITICAL: Rewards address is not configured. \
                         Set payment.rewards_address in config to your Arbitrum wallet address."
                            .to_string(),
                    ));
                }
                Some(_) => {}
            }
        }

        // Resolve identity and root_dir (may update self.config.root_dir)
        let identity = Arc::new(Self::resolve_identity(&mut self.config).await?);
        let peer_id = identity.peer_id().to_hex();

        info!(peer_id = %peer_id, root_dir = %self.config.root_dir.display(), "Node identity resolved");

        // Ensure root directory exists
        std::fs::create_dir_all(&self.config.root_dir)?;

        // Create shutdown token
        let shutdown = CancellationToken::new();

        // Create event channel
        let (events_tx, events_rx) = create_event_channel();

        // Convert our config to saorsa-core's config
        let mut core_config = Self::build_core_config(&self.config)?;
        // Inject the ML-DSA identity so the P2PNode's transport peer ID
        // matches the pub_key embedded in payment quotes.
        core_config.node_identity = Some(Arc::clone(&identity));
        debug!("Core config: {:?}", core_config);

        // Initialize saorsa-core's P2PNode
        let p2p_node = P2PNode::new(core_config)
            .await
            .map_err(|e| Error::Startup(format!("Failed to create P2P node: {e}")))?;

        // Create upgrade monitor, seeded from the transport peer ID
        // (the seed is used for staged-rollout scheduling).
        let upgrade_monitor = {
            let node_id_seed = p2p_node.peer_id().as_bytes();
            Some(Self::build_upgrade_monitor(&self.config, node_id_seed))
        };

        // Initialize bootstrap cache manager if enabled
        let bootstrap_manager = if self.config.bootstrap_cache.enabled {
            Self::build_bootstrap_manager(&self.config).await
        } else {
            info!("Bootstrap cache disabled");
            None
        };

        // Initialize ANT protocol handler for chunk storage and
        // wire the fresh-write channel so PUTs trigger replication.
        let (ant_protocol, fresh_write_rx) = if self.config.storage.enabled {
            let (fresh_write_tx, fresh_write_rx) = tokio::sync::mpsc::unbounded_channel();
            let mut protocol = Self::build_ant_protocol(&self.config, &identity).await?;
            protocol.set_fresh_write_sender(fresh_write_tx);
            (Some(Arc::new(protocol)), Some(fresh_write_rx))
        } else {
            info!("Chunk storage disabled");
            (None, None)
        };

        let p2p_arc = Arc::new(p2p_node);

        // Wire the P2PNode handle into the payment verifier so merkle-payment
        // checks can query the live DHT for peers actually closest to a pool
        // midpoint (pay-yourself defence).
        if let Some(ref protocol) = ant_protocol {
            protocol
                .payment_verifier_arc()
                .attach_p2p_node(Arc::clone(&p2p_arc));
        }

        // Initialize replication engine (if storage is enabled).
        // Failure here is non-fatal: the node runs without replication.
        let replication_engine =
            if let (Some(ref protocol), Some(fresh_rx)) = (&ant_protocol, fresh_write_rx) {
                let repl_config = ReplicationConfig::default();
                let storage_arc = protocol.storage();
                let payment_verifier_arc = protocol.payment_verifier_arc();
                match ReplicationEngine::new(
                    repl_config,
                    Arc::clone(&p2p_arc),
                    storage_arc,
                    payment_verifier_arc,
                    &self.config.root_dir,
                    fresh_rx,
                    shutdown.clone(),
                )
                .await
                {
                    Ok(engine) => Some(engine),
                    Err(e) => {
                        warn!("Failed to initialize replication engine: {e}");
                        None
                    }
                }
            } else {
                None
            };

        let node = RunningNode {
            config: self.config,
            p2p_node: p2p_arc,
            shutdown,
            events_tx,
            events_rx: Some(events_rx),
            upgrade_monitor,
            bootstrap_manager,
            ant_protocol,
            replication_engine,
            protocol_task: None,
            // -1 sentinel: no upgrade-driven exit pending.
            upgrade_exit_code: Arc::new(AtomicI32::new(-1)),
        };

        Ok(node)
    }
185
186    /// Build the saorsa-core `NodeConfig` from our config.
187    fn build_core_config(config: &NodeConfig) -> Result<CoreNodeConfig> {
188        let local = matches!(config.network_mode, NetworkMode::Development);
189
190        let mut core_config = CoreNodeConfig::builder()
191            .port(config.port)
192            .ipv6(!config.ipv4_only)
193            .local(local)
194            .max_message_size(config.max_message_size)
195            .build()
196            .map_err(|e| Error::Config(format!("Failed to create core config: {e}")))?;
197
198        // Add bootstrap peers.
199        core_config.bootstrap_peers = config
200            .bootstrap
201            .iter()
202            .map(|addr| MultiAddr::quic(*addr))
203            .collect();
204
205        // Propagate network-mode tuning into saorsa-core where supported.
206        match config.network_mode {
207            NetworkMode::Production => {
208                core_config.diversity_config = Some(CoreDiversityConfig::default());
209            }
210            NetworkMode::Testnet => {
211                // Testnet allows loopback so nodes can be co-located on one machine.
212                core_config.allow_loopback = true;
213                core_config.diversity_config = Some(CoreDiversityConfig {
214                    max_per_ip: config.testnet.max_per_ip,
215                    max_per_subnet: config.testnet.max_per_subnet,
216                });
217            }
218            NetworkMode::Development => {
219                core_config.diversity_config = Some(CoreDiversityConfig::permissive());
220            }
221        }
222
223        // Persist close group peers + trust scores across restarts.
224        // Default to root_dir (alongside node_identity.key) when not explicitly set.
225        core_config.close_group_cache_dir = Some(
226            config
227                .close_group_cache_dir
228                .clone()
229                .unwrap_or_else(|| config.root_dir.clone()),
230        );
231
232        Ok(core_config)
233    }
234
235    /// Resolve the node identity from disk or generate a new one.
236    ///
237    /// **When `root_dir` differs from the platform default** (set via `--root-dir`
238    /// or loaded from `config.toml`):
239    ///   - Use `root_dir` directly: load existing identity or generate a new one.
240    ///
241    /// **When `root_dir` is the platform default** (first run, no config file):
242    ///   1. Scan `{default_root_dir}/nodes/` for subdirectories containing
243    ///      `node_identity.key`.
244    ///   2. **None found** — first run: generate identity, create
245    ///      `nodes/{full_peer_id}/`, save identity there, update `config.root_dir`.
246    ///   3. **Exactly one found** — load it and update `config.root_dir`.
247    ///   4. **Multiple found** — return an error asking for `--root-dir`.
248    async fn resolve_identity(config: &mut NodeConfig) -> Result<NodeIdentity> {
249        if config.root_dir != default_root_dir() {
250            return Self::load_or_generate_identity(&config.root_dir).await;
251        }
252
253        let nodes_dir = default_nodes_dir();
254        let identity_dirs = Self::scan_identity_dirs(&nodes_dir)?;
255
256        match identity_dirs.len() {
257            0 => {
258                // First run: generate new identity and create a peer-id-scoped subdirectory
259                let identity = NodeIdentity::generate().map_err(|e| {
260                    Error::Startup(format!("Failed to generate node identity: {e}"))
261                })?;
262                let peer_id = identity.peer_id().to_hex();
263                let peer_dir = nodes_dir.join(&peer_id);
264                std::fs::create_dir_all(&peer_dir)?;
265                identity
266                    .save_to_file(&peer_dir.join(NODE_IDENTITY_FILENAME))
267                    .await
268                    .map_err(|e| Error::Startup(format!("Failed to save node identity: {e}")))?;
269                config.root_dir = peer_dir;
270                Ok(identity)
271            }
272            1 => {
273                let dir = identity_dirs
274                    .first()
275                    .ok_or_else(|| Error::Config("No identity dirs found".to_string()))?;
276                let identity = NodeIdentity::load_from_file(&dir.join(NODE_IDENTITY_FILENAME))
277                    .await
278                    .map_err(|e| Error::Startup(format!("Failed to load node identity: {e}")))?;
279                config.root_dir.clone_from(dir);
280                Ok(identity)
281            }
282            _ => {
283                let dirs: Vec<String> = identity_dirs
284                    .iter()
285                    .filter_map(|d| d.file_name().map(|n| n.to_string_lossy().into_owned()))
286                    .collect();
287                Err(Error::Config(format!(
288                    "Multiple node identities found at {}: [{}]. Specify --root-dir to select one.",
289                    nodes_dir.display(),
290                    dirs.join(", ")
291                )))
292            }
293        }
294    }
295
296    /// Load an existing identity from `dir/node_identity.key`, or generate and save a new one.
297    async fn load_or_generate_identity(dir: &std::path::Path) -> Result<NodeIdentity> {
298        let key_path = dir.join(NODE_IDENTITY_FILENAME);
299        if key_path.exists() {
300            NodeIdentity::load_from_file(&key_path)
301                .await
302                .map_err(|e| Error::Startup(format!("Failed to load node identity: {e}")))
303        } else {
304            let identity = NodeIdentity::generate()
305                .map_err(|e| Error::Startup(format!("Failed to generate node identity: {e}")))?;
306            std::fs::create_dir_all(dir)?;
307            identity
308                .save_to_file(&key_path)
309                .await
310                .map_err(|e| Error::Startup(format!("Failed to save node identity: {e}")))?;
311            Ok(identity)
312        }
313    }
314
315    /// Scan `base_dir` for immediate subdirectories that contain `node_identity.key`.
316    fn scan_identity_dirs(base_dir: &std::path::Path) -> Result<Vec<PathBuf>> {
317        let mut dirs = Vec::new();
318        let read_dir = match std::fs::read_dir(base_dir) {
319            Ok(rd) => rd,
320            Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(dirs),
321            Err(e) => return Err(e.into()),
322        };
323        for entry in read_dir {
324            let entry = entry?;
325            let path = entry.path();
326            if path.is_dir() && path.join(NODE_IDENTITY_FILENAME).exists() {
327                dirs.push(path);
328            }
329        }
330        Ok(dirs)
331    }
332
333    fn build_upgrade_monitor(config: &NodeConfig, node_id_seed: &[u8]) -> UpgradeMonitor {
334        let mut monitor = UpgradeMonitor::new(
335            config.upgrade.github_repo.clone(),
336            config.upgrade.channel,
337            config.upgrade.check_interval_hours,
338        );
339
340        if let Ok(cache_dir) = upgrade_cache_dir() {
341            monitor = monitor.with_release_cache(ReleaseCache::new(
342                cache_dir,
343                std::time::Duration::from_secs(3600),
344            ));
345        }
346
347        if config.upgrade.staged_rollout_hours > 0 {
348            monitor =
349                monitor.with_staged_rollout(node_id_seed, config.upgrade.staged_rollout_hours);
350        }
351
352        monitor
353    }
354
    /// Build the ANT protocol handler from config.
    ///
    /// Initializes LMDB storage, payment verifier, and quote generator.
    /// Wires ML-DSA-65 signing from the node's identity into the quote generator.
    ///
    /// # Errors
    ///
    /// Returns an error if storage creation fails, no rewards address is
    /// configured (or it fails to parse), or signer wiring fails.
    async fn build_ant_protocol(
        config: &NodeConfig,
        identity: &NodeIdentity,
    ) -> Result<AntProtocol> {
        // Create LMDB storage
        let storage_config = LmdbStorageConfig {
            root_dir: config.root_dir.clone(),
            verify_on_read: config.storage.verify_on_read,
            // db_size_gb → bytes (multiplier is 1024^3, i.e. GiB).
            max_map_size: config.storage.db_size_gb.saturating_mul(1024 * 1024 * 1024),
            // disk_reserve_mb → bytes via the MIB constant.
            disk_reserve: config.storage.disk_reserve_mb.saturating_mul(MIB),
        };
        let storage = LmdbStorage::new(storage_config)
            .await
            .map_err(|e| Error::Startup(format!("Failed to create LMDB storage: {e}")))?;

        // Parse rewards address (required — node must know where to receive payments)
        let rewards_address = match config.payment.rewards_address {
            Some(ref addr) => parse_rewards_address(addr)?,
            None => {
                return Err(Error::Startup(
                    "No rewards address configured. Set --rewards-address or payment.rewards_address in config.".to_string(),
                ));
            }
        };

        // Create payment verifier; it needs our own rewards address so it can
        // compare against the addresses named in incoming payments.
        let evm_network = config.payment.evm_network.clone().into_evm_network();
        let payment_config = PaymentVerifierConfig {
            evm: EvmVerifierConfig {
                network: evm_network,
            },
            cache_capacity: config.payment.cache_capacity,
            local_rewards_address: rewards_address,
        };
        let payment_verifier = PaymentVerifier::new(payment_config);
        let metrics_tracker = QuotingMetricsTracker::new(0);
        let mut quote_generator = QuoteGenerator::new(rewards_address, metrics_tracker);

        // Wire ML-DSA-65 signing from node identity.
        // This same signer is used for both regular quotes and merkle candidate quotes.
        crate::payment::wire_ml_dsa_signer(&mut quote_generator, identity)?;

        let protocol = AntProtocol::new(
            Arc::new(storage),
            Arc::new(payment_verifier),
            Arc::new(quote_generator),
        );

        info!(
            "ANT protocol handler initialized with ML-DSA-65 signing (protocol={CHUNK_PROTOCOL_ID})"
        );

        Ok(protocol)
    }
413
414    /// Build the bootstrap cache manager from config.
415    async fn build_bootstrap_manager(config: &NodeConfig) -> Option<BootstrapManager> {
416        let cache_dir = config
417            .bootstrap_cache
418            .cache_dir
419            .clone()
420            .unwrap_or_else(|| config.root_dir.join("bootstrap_cache"));
421
422        // Create cache directory
423        if let Err(e) = std::fs::create_dir_all(&cache_dir) {
424            warn!("Failed to create bootstrap cache directory: {e}");
425            return None;
426        }
427
428        let bootstrap_config = CoreBootstrapConfig {
429            cache_dir,
430            max_peers: config.bootstrap_cache.max_contacts,
431            ..CoreBootstrapConfig::default()
432        };
433
434        match BootstrapManager::with_config(bootstrap_config).await {
435            Ok(manager) => {
436                info!(
437                    "Bootstrap cache initialized with {} max contacts",
438                    config.bootstrap_cache.max_contacts
439                );
440                Some(manager)
441            }
442            Err(e) => {
443                warn!("Failed to initialize bootstrap cache: {e}");
444                None
445            }
446        }
447    }
448}
449
/// A running Ant node.
pub struct RunningNode {
    /// Effective node configuration (`root_dir` may have been rewritten
    /// during identity resolution at build time).
    config: NodeConfig,
    /// Underlying saorsa-core P2P node.
    p2p_node: Arc<P2PNode>,
    /// Token used to request shutdown of all background tasks.
    shutdown: CancellationToken,
    /// Sender side of the node event channel; also used to create
    /// additional subscriptions via `subscribe_events()`.
    events_tx: NodeEventsSender,
    /// Receiver created at build time; handed out once via `events()`.
    events_rx: Option<NodeEventsChannel>,
    /// Upgrade monitor; taken (consumed) when `run()` spawns the upgrade task.
    upgrade_monitor: Option<UpgradeMonitor>,
    /// Bootstrap cache manager for persistent peer storage.
    bootstrap_manager: Option<BootstrapManager>,
    /// ANT protocol handler for chunk storage.
    ant_protocol: Option<Arc<AntProtocol>>,
    /// Replication engine (manages neighbor sync, verification, audits).
    replication_engine: Option<ReplicationEngine>,
    /// Protocol message routing background task.
    protocol_task: Option<JoinHandle<()>>,
    /// Exit code requested by a successful upgrade (-1 = no upgrade exit pending).
    upgrade_exit_code: Arc<AtomicI32>,
}
469
470impl RunningNode {
    /// Get the node's root directory.
    ///
    /// This is where the identity key lives, and the default parent for
    /// storage and cache directories.
    #[must_use]
    pub fn root_dir(&self) -> &PathBuf {
        &self.config.root_dir
    }
476
    /// Get a receiver for node events.
    ///
    /// Takes the receiver created at build time out of the node.
    /// Note: Can only be called once. Subsequent calls return None.
    pub fn events(&mut self) -> Option<NodeEventsChannel> {
        self.events_rx.take()
    }
483
    /// Subscribe to node events.
    ///
    /// Unlike [`Self::events`], this creates a fresh subscription from the
    /// sender and can be called any number of times.
    #[must_use]
    pub fn subscribe_events(&self) -> NodeEventsChannel {
        self.events_tx.subscribe()
    }
489
490    /// Run the node until shutdown is requested.
491    ///
492    /// # Errors
493    ///
494    /// Returns an error if the node encounters a fatal error.
495    #[allow(clippy::too_many_lines)]
496    pub async fn run(&mut self) -> Result<()> {
497        info!("Node runtime loop starting");
498
499        // Subscribe to DHT events BEFORE starting the P2P node so the
500        // bootstrap-sync task does not miss the BootstrapComplete event
501        // emitted during P2PNode::start().
502        let dht_events_for_bootstrap = self
503            .replication_engine
504            .as_ref()
505            .map(|_| self.p2p_node.dht_manager().subscribe_events());
506
507        // Start the P2P node
508        self.p2p_node
509            .start()
510            .await
511            .map_err(|e| Error::Startup(format!("Failed to start P2P node: {e}")))?;
512
513        let listen_addrs = self.p2p_node.listen_addrs().await;
514        info!(listen_addrs = ?listen_addrs, "P2P node started");
515
516        // Extract the actual bound port (config port may be 0 = auto-select)
517        let actual_port = listen_addrs
518            .first()
519            .and_then(MultiAddr::port)
520            .unwrap_or(self.config.port);
521        info!(
522            port = actual_port,
523            "Node is running on port: {}", actual_port
524        );
525
526        // Emit started event
527        if let Err(e) = self.events_tx.send(NodeEvent::Started) {
528            warn!("Failed to send Started event: {e}");
529        }
530
531        // Start protocol message routing (P2P → AntProtocol → P2P response)
532        self.start_protocol_routing();
533
534        // Start replication engine background tasks
535        if let Some(ref mut engine) = self.replication_engine {
536            // Safety: dht_events_for_bootstrap is Some when replication_engine
537            // is Some (both arms use the same condition).
538            if let Some(dht_events) = dht_events_for_bootstrap {
539                engine.start(dht_events);
540            }
541            info!("Replication engine started");
542        }
543
544        // Start upgrade monitor if enabled
545        if let Some(monitor) = self.upgrade_monitor.take() {
546            let events_tx = self.events_tx.clone();
547            let shutdown = self.shutdown.clone();
548            let stop_on_upgrade = self.config.upgrade.stop_on_upgrade;
549            let upgrade_exit_code = Arc::clone(&self.upgrade_exit_code);
550
551            tokio::spawn(async move {
552                let mut monitor = monitor;
553                let mut upgrader = AutoApplyUpgrader::new().with_stop_on_upgrade(stop_on_upgrade);
554                if let Ok(cache_dir) = upgrade_cache_dir() {
555                    upgrader = upgrader.with_binary_cache(BinaryCache::new(cache_dir));
556                }
557
558                // Add randomized jitter before the first upgrade check to prevent all nodes
559                // from hitting the GitHub API simultaneously when started together.
560                {
561                    let jitter_duration = jittered_interval(monitor.check_interval());
562                    let first_check_time = chrono::Utc::now()
563                        + chrono::Duration::from_std(jitter_duration).unwrap_or_else(|e| {
564                            warn!("chrono::Duration::from_std failed for jitter ({e}), defaulting to 1 minute");
565                            chrono::Duration::minutes(1)
566                        });
567                    info!(
568                        "First upgrade check scheduled for {} (jitter: {}s)",
569                        first_check_time.to_rfc3339(),
570                        jitter_duration.as_secs()
571                    );
572                    tokio::time::sleep(jitter_duration).await;
573                }
574
575                loop {
576                    tokio::select! {
577                        () = shutdown.cancelled() => {
578                            break;
579                        }
580                        result = monitor.check_for_ready_upgrade() => {
581                            match result {
582                                Ok(Some(upgrade_info)) => {
583                                    info!(
584                                        current_version = %upgrader.current_version(),
585                                        new_version = %upgrade_info.version,
586                                        "Upgrade available"
587                                    );
588
589                                    // Send notification event
590                                    if let Err(e) = events_tx.send(NodeEvent::UpgradeAvailable {
591                                        version: upgrade_info.version.to_string(),
592                                    }) {
593                                        warn!("Failed to send UpgradeAvailable event: {e}");
594                                    }
595
596                                    // Auto-apply the upgrade
597                                    info!("Starting auto-apply upgrade...");
598                                    match upgrader.apply_upgrade(&upgrade_info).await {
599                                        Ok(UpgradeResult::Success { version, exit_code }) => {
600                                            info!("Upgrade to {} successful, initiating graceful shutdown", version);
601                                            upgrade_exit_code.store(exit_code, Ordering::SeqCst);
602                                            shutdown.cancel();
603                                            break;
604                                        }
605                                        Ok(UpgradeResult::RolledBack { reason }) => {
606                                            warn!("Error during upgrade process: {}", reason);
607                                        }
608                                        Ok(UpgradeResult::NoUpgrade) => {
609                                            info!("Already running latest version");
610                                        }
611                                        Err(e) => {
612                                            error!("Error during upgrade process: {}", e);
613                                        }
614                                    }
615                                }
616                                Ok(None) => {
617                                    if let Some(remaining) = monitor.time_until_upgrade() {
618                                        info!(
619                                            "Upgrade pending, rollout delay remaining: {}m {}s",
620                                            remaining.as_secs() / 60,
621                                            remaining.as_secs() % 60
622                                        );
623                                    } else {
624                                        info!("No upgrade available");
625                                    }
626                                }
627                                Err(e) => {
628                                    warn!("Error during upgrade process: {}", e);
629                                }
630                            }
631                            // If an upgrade is pending, sleep for exactly the remaining
632                            // rollout delay so the node restarts at its scheduled time
633                            // rather than waiting for the next check interval tick.
634                            let sleep_duration = monitor.time_until_upgrade().map_or_else(
635                                || {
636                                    // No pending upgrade - schedule next check with jitter
637                                    let jittered_duration =
638                                        jittered_interval(monitor.check_interval());
639                                    let next_check = chrono::Utc::now()
640                                        + chrono::Duration::from_std(jittered_duration).unwrap_or_else(|e| {
641                                            warn!("chrono::Duration::from_std failed for interval ({e}), defaulting to 1 hour");
642                                            chrono::Duration::hours(1)
643                                        });
644                                    info!("Next upgrade check scheduled for {}", next_check.to_rfc3339());
645                                    jittered_duration
646                                },
647                                |remaining| {
648                                    // If the rollout delay has fully elapsed but the upgrade was
649                                    // not successfully applied, avoid a tight loop by backing off
650                                    // at least one check interval before retrying.
651                                    if remaining.is_zero() {
652                                        let backoff = jittered_interval(monitor.check_interval());
653                                        let next_check = chrono::Utc::now()
654                                            + chrono::Duration::from_std(backoff).unwrap_or_else(|e| {
655                                                warn!("chrono::Duration::from_std failed for backoff ({e}), defaulting to 1 hour");
656                                                chrono::Duration::hours(1)
657                                            });
658                                        info!(
659                                            "Upgrade rollout delay elapsed but previous apply did not succeed; \
660                                             backing off, next check scheduled for {}",
661                                            next_check.to_rfc3339()
662                                        );
663                                        backoff
664                                    } else {
665                                        let wake_time = chrono::Utc::now()
666                                            + chrono::Duration::from_std(remaining).unwrap_or_else(|e| {
667                                                warn!("chrono::Duration::from_std failed for rollout delay ({e}), defaulting to 1 minute");
668                                                chrono::Duration::minutes(1)
669                                            });
670                                        info!("Will apply upgrade at {}", wake_time.to_rfc3339());
671                                        remaining
672                                    }
673                                },
674                            );
675                            // Use select! so shutdown can interrupt long sleeps
676                            // (e.g. during a full rollout window delay).
677                            tokio::select! {
678                                () = shutdown.cancelled() => {
679                                    break;
680                                }
681                                () = tokio::time::sleep(sleep_duration) => {}
682                            }
683                        }
684                    }
685                }
686            });
687        }
688
689        info!("Node running, waiting for shutdown signal");
690
691        // Run the main event loop with signal handling
692        self.run_event_loop().await?;
693
694        // Log bootstrap cache stats before shutdown
695        if let Some(ref manager) = self.bootstrap_manager {
696            let stats = manager.stats().await;
697            info!(
698                "Bootstrap cache shutdown: {} peers, avg quality {:.2}",
699                stats.total_peers, stats.average_quality
700            );
701        }
702
703        // Shutdown replication engine before P2P so background tasks don't
704        // use a dead P2P layer, and Arc<LmdbStorage> references are released.
705        if let Some(ref mut engine) = self.replication_engine {
706            engine.shutdown().await;
707        }
708
709        // Stop protocol routing task
710        if let Some(handle) = self.protocol_task.take() {
711            handle.abort();
712        }
713
714        // Shutdown P2P node
715        info!("Shutting down P2P node...");
716        if let Err(e) = self.p2p_node.shutdown().await {
717            warn!("Error during P2P node shutdown: {e}");
718        }
719
720        if let Err(e) = self.events_tx.send(NodeEvent::ShuttingDown) {
721            warn!("Failed to send ShuttingDown event: {e}");
722        }
723        info!("Node shutdown complete");
724
725        // If an upgrade triggered the shutdown, exit with the requested code.
726        // This happens *after* all cleanup (P2P shutdown, log flush, etc.) so
727        // that destructors and async resources are properly torn down.
728        let exit_code = self.upgrade_exit_code.load(Ordering::SeqCst);
729        if exit_code >= 0 {
730            info!("Exiting with code {} for upgrade restart", exit_code);
731            std::process::exit(exit_code);
732        }
733
734        Ok(())
735    }
736
737    /// Run the main event loop, handling shutdown and signals.
738    #[cfg(unix)]
739    async fn run_event_loop(&self) -> Result<()> {
740        let mut sigterm = signal(SignalKind::terminate())?;
741        let mut sighup = signal(SignalKind::hangup())?;
742
743        loop {
744            tokio::select! {
745                () = self.shutdown.cancelled() => {
746                    info!("Shutdown signal received");
747                    break;
748                }
749                _ = tokio::signal::ctrl_c() => {
750                    info!("Received SIGINT (Ctrl-C), initiating shutdown");
751                    self.shutdown();
752                    break;
753                }
754                _ = sigterm.recv() => {
755                    info!("Received SIGTERM, initiating shutdown");
756                    self.shutdown();
757                    break;
758                }
759                _ = sighup.recv() => {
760                    info!("Received SIGHUP (config reload not yet supported)");
761                }
762            }
763        }
764        Ok(())
765    }
766
767    /// Run the main event loop, handling shutdown signals (non-Unix version).
768    #[cfg(not(unix))]
769    async fn run_event_loop(&self) -> Result<()> {
770        loop {
771            tokio::select! {
772                () = self.shutdown.cancelled() => {
773                    info!("Shutdown signal received");
774                    break;
775                }
776                _ = tokio::signal::ctrl_c() => {
777                    info!("Received Ctrl-C, initiating shutdown");
778                    self.shutdown();
779                    break;
780                }
781            }
782        }
783        Ok(())
784    }
785
786    /// Start the protocol message routing background task.
787    ///
788    /// Subscribes to P2P events and routes incoming chunk protocol messages
789    /// to the `AntProtocol` handler, sending responses back to the sender.
790    fn start_protocol_routing(&mut self) {
791        let protocol = match self.ant_protocol {
792            Some(ref p) => Arc::clone(p),
793            None => return,
794        };
795
796        let mut events = self.p2p_node.subscribe_events();
797        let p2p = Arc::clone(&self.p2p_node);
798        let semaphore = Arc::new(Semaphore::new(64));
799
800        self.protocol_task = Some(tokio::spawn(async move {
801            while let Ok(event) = events.recv().await {
802                if let P2PEvent::Message {
803                    topic,
804                    source: Some(source),
805                    data,
806                } = event
807                {
808                    let handler_info: Option<(&str, &str)> = if topic == CHUNK_PROTOCOL_ID {
809                        Some(("chunk", CHUNK_PROTOCOL_ID))
810                    } else {
811                        None
812                    };
813
814                    if let Some((data_type, response_topic)) = handler_info {
815                        debug!("Received {data_type} protocol message from {source}");
816                        let protocol = Arc::clone(&protocol);
817                        let p2p = Arc::clone(&p2p);
818                        let sem = semaphore.clone();
819                        tokio::spawn(async move {
820                            let Ok(_permit) = sem.acquire().await else {
821                                return;
822                            };
823                            let result = match data_type {
824                                "chunk" => protocol.try_handle_request(&data).await,
825                                _ => return,
826                            };
827                            match result {
828                                Ok(Some(response)) => {
829                                    if let Err(e) = p2p
830                                        .send_message(
831                                            &source,
832                                            response_topic,
833                                            response.to_vec(),
834                                            &[],
835                                        )
836                                        .await
837                                    {
838                                        warn!("Failed to send {data_type} protocol response to {source}: {e}");
839                                    }
840                                }
841                                Ok(None) => {}
842                                Err(e) => {
843                                    warn!("{data_type} protocol handler error: {e}");
844                                }
845                            }
846                        });
847                    }
848                }
849            }
850        }));
851        info!("Protocol message routing started");
852    }
853
    /// Request the node to shut down.
    ///
    /// Cancels the shared shutdown token, which unblocks the main event loop
    /// and any background tasks that select on `shutdown.cancelled()`.
    /// Calling this more than once is a no-op (token cancellation is
    /// idempotent per tokio-util's `CancellationToken` contract).
    pub fn shutdown(&self) {
        self.shutdown.cancel();
    }
858}
859
860/// Apply ±5% jitter to a base interval to prevent thundering-herd behaviour
861/// when multiple nodes check for upgrades on the same schedule.
862fn jittered_interval(base: std::time::Duration) -> std::time::Duration {
863    let secs = base.as_secs();
864    let variance = secs / 20; // 5%
865    if variance == 0 {
866        return base;
867    }
868    let jitter = rand::thread_rng().gen_range(0..=variance * 2);
869    std::time::Duration::from_secs(secs.saturating_sub(variance) + jitter)
870}
871
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
    use super::*;
    use crate::config::NODES_SUBDIR;

    /// A non-zero staged rollout window must enable staged rollout on the
    /// monitor.
    #[test]
    fn test_build_upgrade_monitor_staged_rollout_enabled() {
        let upgrade = crate::config::UpgradeConfig {
            staged_rollout_hours: 24,
            ..Default::default()
        };
        let config = NodeConfig {
            upgrade,
            ..Default::default()
        };

        let monitor = NodeBuilder::build_upgrade_monitor(&config, b"node-seed");
        assert!(monitor.has_staged_rollout());
    }

    /// A zero-hour rollout window must disable staged rollout.
    #[test]
    fn test_build_upgrade_monitor_staged_rollout_disabled() {
        let upgrade = crate::config::UpgradeConfig {
            staged_rollout_hours: 0,
            ..Default::default()
        };
        let config = NodeConfig {
            upgrade,
            ..Default::default()
        };

        let monitor = NodeBuilder::build_upgrade_monitor(&config, b"node-seed");
        assert!(!monitor.has_staged_rollout());
    }

    #[test]
    fn test_build_core_config_sets_production_mode() {
        let cfg = NodeConfig {
            network_mode: NetworkMode::Production,
            ..Default::default()
        };
        let core_cfg = NodeBuilder::build_core_config(&cfg).expect("core config");
        // Production mode carries IP-diversity limits.
        assert!(core_cfg.diversity_config.is_some());
    }

    #[test]
    fn test_build_core_config_ipv4_only() {
        let cfg = NodeConfig {
            ipv4_only: true,
            ..Default::default()
        };
        let core_cfg = NodeBuilder::build_core_config(&cfg).expect("core config");
        assert!(!core_cfg.ipv6, "ipv4_only should disable IPv6");
    }

    #[test]
    fn test_build_core_config_dual_stack_by_default() {
        let core_cfg =
            NodeBuilder::build_core_config(&NodeConfig::default()).expect("core config");
        assert!(core_cfg.ipv6, "dual-stack should be the default");
    }

    #[test]
    fn test_build_core_config_sets_development_mode_permissive() {
        let cfg = NodeConfig {
            network_mode: NetworkMode::Development,
            ..Default::default()
        };
        let core_cfg = NodeBuilder::build_core_config(&cfg).expect("core config");
        // Development mode should impose effectively no diversity limits.
        let diversity = core_cfg.diversity_config.expect("diversity");
        assert_eq!(diversity.max_per_ip, Some(usize::MAX));
        assert_eq!(diversity.max_per_subnet, Some(usize::MAX));
    }

    #[test]
    fn test_scan_identity_dirs_empty_dir() {
        let dir = tempfile::tempdir().unwrap();
        let found = NodeBuilder::scan_identity_dirs(dir.path()).unwrap();
        assert!(found.is_empty());
    }

    #[test]
    fn test_scan_identity_dirs_nonexistent_dir() {
        let dir = tempfile::tempdir().unwrap();
        let missing = dir.path().join("nonexistent_identity_dir");
        let found = NodeBuilder::scan_identity_dirs(&missing).unwrap();
        assert!(found.is_empty());
    }

    #[test]
    fn test_scan_identity_dirs_finds_one() {
        let dir = tempfile::tempdir().unwrap();
        let node_dir = dir.path().join("abc123");
        std::fs::create_dir_all(&node_dir).unwrap();
        std::fs::write(node_dir.join(NODE_IDENTITY_FILENAME), "{}").unwrap();

        let found = NodeBuilder::scan_identity_dirs(dir.path()).unwrap();
        assert_eq!(found, vec![node_dir]);
    }

    #[test]
    fn test_scan_identity_dirs_finds_multiple() {
        let dir = tempfile::tempdir().unwrap();
        for name in ["node_a", "node_b"] {
            let sub = dir.path().join(name);
            std::fs::create_dir_all(&sub).unwrap();
            std::fs::write(sub.join(NODE_IDENTITY_FILENAME), "{}").unwrap();
        }
        // A directory without a key file should be ignored
        std::fs::create_dir_all(dir.path().join("no_key")).unwrap();

        let found = NodeBuilder::scan_identity_dirs(dir.path()).unwrap();
        assert_eq!(found.len(), 2);
    }

    #[tokio::test]
    async fn test_resolve_identity_first_run_creates_identity() {
        let dir = tempfile::tempdir().unwrap();
        let mut cfg = NodeConfig {
            root_dir: dir.path().to_path_buf(),
            ..Default::default()
        };

        let identity = NodeBuilder::resolve_identity(&mut cfg).await.unwrap();

        // The key file must have been written to the root dir.
        assert!(dir.path().join(NODE_IDENTITY_FILENAME).exists());
        // The peer ID derives from the identity: 32 bytes, hex-encoded.
        assert_eq!(identity.peer_id().to_hex().len(), 64);
    }

    #[tokio::test]
    async fn test_resolve_identity_loads_existing() {
        let dir = tempfile::tempdir().unwrap();
        let key_path = dir.path().join(NODE_IDENTITY_FILENAME);

        // Persist a freshly generated identity first.
        let original = NodeIdentity::generate().unwrap();
        original.save_to_file(&key_path).await.unwrap();

        let mut cfg = NodeConfig {
            root_dir: dir.path().to_path_buf(),
            ..Default::default()
        };
        let loaded = NodeBuilder::resolve_identity(&mut cfg).await.unwrap();
        assert_eq!(loaded.peer_id(), original.peer_id());
    }

    #[test]
    fn test_peer_id_hex_length() {
        // 32 bytes = 64 hex chars
        let hex = saorsa_core::identity::PeerId::from_bytes([0x42; 32]).to_hex();
        assert_eq!(hex.len(), 64);
    }

    /// Simulates a node restart: first run creates identity in a scoped subdir
    /// under `nodes/`, second run discovers and reloads it — `peer_id` must be
    /// identical and the directory name is the full 64-char hex peer ID.
    #[tokio::test]
    async fn test_identity_persisted_across_restarts() {
        let base = tempfile::tempdir().unwrap();
        let nodes_dir = base.path().join(NODES_SUBDIR);

        // First "boot": generate an identity and persist it in nodes/{peer_id}/.
        let first = NodeIdentity::generate().unwrap();
        let hex_id = first.peer_id().to_hex();
        let peer_dir = nodes_dir.join(&hex_id);
        std::fs::create_dir_all(&peer_dir).unwrap();
        first
            .save_to_file(&peer_dir.join(NODE_IDENTITY_FILENAME))
            .await
            .unwrap();

        // The directory is named with the full 64-char hex peer ID.
        assert_eq!(hex_id.len(), 64);
        assert_eq!(peer_dir.file_name().unwrap().to_string_lossy(), hex_id);

        // Second "boot": scanning must find and reload the same identity.
        let found = NodeBuilder::scan_identity_dirs(&nodes_dir).unwrap();
        assert_eq!(found.len(), 1);
        let reloaded = NodeIdentity::load_from_file(&found[0].join(NODE_IDENTITY_FILENAME))
            .await
            .unwrap();

        assert_eq!(
            hex_id,
            reloaded.peer_id().to_hex(),
            "peer_id must survive restart"
        );
        assert_eq!(found[0], peer_dir, "root_dir must be the same directory");
    }

    /// When two identity subdirs exist under `nodes/`, the scan finds multiple
    /// and the resolve path would error asking for `--root-dir`.
    #[tokio::test]
    async fn test_multiple_identities_errors() {
        let base = tempfile::tempdir().unwrap();
        let nodes_dir = base.path().join(NODES_SUBDIR);

        // Lay down two independent identities under nodes/.
        for name in ["aaaa", "bbbb"] {
            let sub = nodes_dir.join(name);
            std::fs::create_dir_all(&sub).unwrap();
            NodeIdentity::generate()
                .unwrap()
                .save_to_file(&sub.join(NODE_IDENTITY_FILENAME))
                .await
                .unwrap();
        }

        let found = NodeBuilder::scan_identity_dirs(&nodes_dir).unwrap();
        assert_eq!(found.len(), 2, "should find both identity dirs");
    }

    /// With a non-default `root_dir` (explicit path), the identity is created on
    /// first run and reloaded on subsequent runs from the same directory.
    #[tokio::test]
    async fn test_explicit_root_dir_persists_across_restarts() {
        let dir = tempfile::tempdir().unwrap();
        let make_config = || NodeConfig {
            root_dir: dir.path().to_path_buf(),
            ..Default::default()
        };

        // First boot creates the identity; second boot must reload it.
        let mut boot1 = make_config();
        let first = NodeBuilder::resolve_identity(&mut boot1).await.unwrap();
        let mut boot2 = make_config();
        let second = NodeBuilder::resolve_identity(&mut boot2).await.unwrap();

        assert_eq!(
            first.peer_id(),
            second.peer_id(),
            "explicit --root-dir must yield stable identity"
        );
    }
}