// saorsa_node/node.rs

1//! Node implementation - thin wrapper around saorsa-core's `P2PNode`.
2
3use crate::ant_protocol::{CHUNK_PROTOCOL_ID, MAX_CHUNK_SIZE};
4use crate::config::{
5    default_nodes_dir, default_root_dir, EvmNetworkConfig, IpVersion, NetworkMode, NodeConfig,
6    NODE_IDENTITY_FILENAME,
7};
8use crate::error::{Error, Result};
9use crate::event::{create_event_channel, NodeEvent, NodeEventsChannel, NodeEventsSender};
10use crate::payment::metrics::QuotingMetricsTracker;
11use crate::payment::wallet::parse_rewards_address;
12use crate::payment::{EvmVerifierConfig, PaymentVerifier, PaymentVerifierConfig, QuoteGenerator};
13use crate::storage::{AntProtocol, LmdbStorage, LmdbStorageConfig};
14use crate::upgrade::{AutoApplyUpgrader, UpgradeMonitor, UpgradeResult};
15
16use evmlib::Network as EvmNetwork;
17use saorsa_core::identity::NodeIdentity;
18use saorsa_core::{
19    BootstrapConfig as CoreBootstrapConfig, BootstrapManager,
20    IPDiversityConfig as CoreDiversityConfig, MultiAddr, NodeConfig as CoreNodeConfig, P2PEvent,
21    P2PNode,
22};
23use std::path::PathBuf;
24use std::sync::Arc;
25use tokio::sync::Semaphore;
26use tokio::task::JoinHandle;
27use tokio_util::sync::CancellationToken;
28use tracing::{debug, error, info, warn};
29
/// Node storage capacity limit (5 GiB, expressed in bytes).
///
/// Used to derive `max_records` for the quoting metrics pricing curve.
/// A node advertises `NODE_STORAGE_LIMIT_BYTES / MAX_CHUNK_SIZE` as
/// its maximum record count, giving the pricing algorithm a meaningful
/// fullness ratio instead of a hardcoded constant.
pub const NODE_STORAGE_LIMIT_BYTES: u64 = 5 * 1024 * 1024 * 1024;
37
38#[cfg(unix)]
39use tokio::signal::unix::{signal, SignalKind};
40
/// Builder for constructing a saorsa node.
pub struct NodeBuilder {
    // Node configuration; `build()` may rewrite `config.root_dir` during
    // identity resolution (see `resolve_identity`).
    config: NodeConfig,
}
45
46impl NodeBuilder {
47    /// Create a new node builder with the given configuration.
48    #[must_use]
49    pub fn new(config: NodeConfig) -> Self {
50        Self { config }
51    }
52
53    /// Build and start the node.
54    ///
55    /// # Errors
56    ///
57    /// Returns an error if the node fails to start.
58    pub async fn build(mut self) -> Result<RunningNode> {
59        info!("Building saorsa-node with config: {:?}", self.config);
60
61        // Validate rewards address in production
62        if self.config.network_mode == NetworkMode::Production {
63            match self.config.payment.rewards_address {
64                None => {
65                    return Err(Error::Config(
66                        "CRITICAL: Rewards address is not configured. \
67                         Set payment.rewards_address in config to your Arbitrum wallet address."
68                            .to_string(),
69                    ));
70                }
71                Some(ref addr) if addr == "0xYOUR_ARBITRUM_ADDRESS_HERE" || addr.is_empty() => {
72                    return Err(Error::Config(
73                        "CRITICAL: Rewards address is not configured. \
74                         Set payment.rewards_address in config to your Arbitrum wallet address."
75                            .to_string(),
76                    ));
77                }
78                Some(_) => {}
79            }
80        }
81
82        // Resolve identity and root_dir (may update self.config.root_dir)
83        let identity = Arc::new(Self::resolve_identity(&mut self.config).await?);
84        let peer_id = identity.peer_id().to_hex();
85
86        info!(peer_id = %peer_id, root_dir = %self.config.root_dir.display(), "Node identity resolved");
87
88        // Ensure root directory exists
89        std::fs::create_dir_all(&self.config.root_dir)?;
90
91        // Create shutdown token
92        let shutdown = CancellationToken::new();
93
94        // Create event channel
95        let (events_tx, events_rx) = create_event_channel();
96
97        // Convert our config to saorsa-core's config
98        let mut core_config = Self::build_core_config(&self.config)?;
99        // Inject the ML-DSA identity so the P2PNode's transport peer ID
100        // matches the pub_key embedded in payment quotes.
101        core_config.node_identity = Some(Arc::clone(&identity));
102        debug!("Core config: {:?}", core_config);
103
104        // Initialize saorsa-core's P2PNode
105        let p2p_node = P2PNode::new(core_config)
106            .await
107            .map_err(|e| Error::Startup(format!("Failed to create P2P node: {e}")))?;
108
109        // Create upgrade monitor if enabled
110        let upgrade_monitor = if self.config.upgrade.enabled {
111            let node_id_seed = p2p_node.peer_id().as_bytes();
112            Some(Self::build_upgrade_monitor(&self.config, node_id_seed))
113        } else {
114            None
115        };
116
117        // Initialize bootstrap cache manager if enabled
118        let bootstrap_manager = if self.config.bootstrap_cache.enabled {
119            Self::build_bootstrap_manager(&self.config).await
120        } else {
121            info!("Bootstrap cache disabled");
122            None
123        };
124
125        // Initialize ANT protocol handler for chunk storage
126        let ant_protocol = if self.config.storage.enabled {
127            Some(Arc::new(
128                Self::build_ant_protocol(&self.config, &identity).await?,
129            ))
130        } else {
131            info!("Chunk storage disabled");
132            None
133        };
134
135        let node = RunningNode {
136            config: self.config,
137            p2p_node: Arc::new(p2p_node),
138            shutdown,
139            events_tx,
140            events_rx: Some(events_rx),
141            upgrade_monitor,
142            bootstrap_manager,
143            ant_protocol,
144            protocol_task: None,
145        };
146
147        Ok(node)
148    }
149
150    /// Build the saorsa-core `NodeConfig` from our config.
151    fn build_core_config(config: &NodeConfig) -> Result<CoreNodeConfig> {
152        let ipv6 = matches!(config.ip_version, IpVersion::Ipv6 | IpVersion::Dual);
153        let local = matches!(config.network_mode, NetworkMode::Development);
154
155        let mut core_config = CoreNodeConfig::builder()
156            .port(config.port)
157            .ipv6(ipv6)
158            .local(local)
159            .max_message_size(config.max_message_size)
160            .build()
161            .map_err(|e| Error::Config(format!("Failed to create core config: {e}")))?;
162
163        // Add bootstrap peers.
164        core_config.bootstrap_peers = config
165            .bootstrap
166            .iter()
167            .map(|addr| MultiAddr::quic(*addr))
168            .collect();
169
170        // Propagate network-mode tuning into saorsa-core where supported.
171        match config.network_mode {
172            NetworkMode::Production => {
173                core_config.diversity_config = Some(CoreDiversityConfig::default());
174            }
175            NetworkMode::Testnet => {
176                // Testnet allows loopback so nodes can be co-located on one machine.
177                core_config.allow_loopback = true;
178                let mut diversity = CoreDiversityConfig::testnet();
179                diversity.max_nodes_per_asn = config.testnet.max_nodes_per_asn;
180                diversity.max_nodes_per_ipv6_64 = config.testnet.max_nodes_per_64;
181                diversity.enable_geolocation_check = config.testnet.enable_geo_checks;
182                diversity.min_geographic_diversity = if config.testnet.enable_geo_checks {
183                    3
184                } else {
185                    1
186                };
187                core_config.diversity_config = Some(diversity);
188
189                if config.testnet.enforce_age_requirements {
190                    warn!(
191                        "testnet.enforce_age_requirements is set but saorsa-core does not yet \
192                         expose a knob; age checks may remain relaxed"
193                    );
194                }
195            }
196            NetworkMode::Development => {
197                core_config.diversity_config = Some(CoreDiversityConfig::permissive());
198            }
199        }
200
201        Ok(core_config)
202    }
203
204    /// Resolve the node identity from disk or generate a new one.
205    ///
206    /// **When `root_dir` differs from the platform default** (set via `--root-dir`
207    /// or loaded from `config.toml`):
208    ///   - Use `root_dir` directly: load existing identity or generate a new one.
209    ///
210    /// **When `root_dir` is the platform default** (first run, no config file):
211    ///   1. Scan `{default_root_dir}/nodes/` for subdirectories containing
212    ///      `node_identity.key`.
213    ///   2. **None found** — first run: generate identity, create
214    ///      `nodes/{full_peer_id}/`, save identity there, update `config.root_dir`.
215    ///   3. **Exactly one found** — load it and update `config.root_dir`.
216    ///   4. **Multiple found** — return an error asking for `--root-dir`.
217    async fn resolve_identity(config: &mut NodeConfig) -> Result<NodeIdentity> {
218        if config.root_dir != default_root_dir() {
219            return Self::load_or_generate_identity(&config.root_dir).await;
220        }
221
222        let nodes_dir = default_nodes_dir();
223        let identity_dirs = Self::scan_identity_dirs(&nodes_dir)?;
224
225        match identity_dirs.len() {
226            0 => {
227                // First run: generate new identity and create a peer-id-scoped subdirectory
228                let identity = NodeIdentity::generate().map_err(|e| {
229                    Error::Startup(format!("Failed to generate node identity: {e}"))
230                })?;
231                let peer_id = identity.peer_id().to_hex();
232                let peer_dir = nodes_dir.join(&peer_id);
233                std::fs::create_dir_all(&peer_dir)?;
234                identity
235                    .save_to_file(&peer_dir.join(NODE_IDENTITY_FILENAME))
236                    .await
237                    .map_err(|e| Error::Startup(format!("Failed to save node identity: {e}")))?;
238                config.root_dir = peer_dir;
239                Ok(identity)
240            }
241            1 => {
242                let dir = identity_dirs
243                    .first()
244                    .ok_or_else(|| Error::Config("No identity dirs found".to_string()))?;
245                let identity = NodeIdentity::load_from_file(&dir.join(NODE_IDENTITY_FILENAME))
246                    .await
247                    .map_err(|e| Error::Startup(format!("Failed to load node identity: {e}")))?;
248                config.root_dir.clone_from(dir);
249                Ok(identity)
250            }
251            _ => {
252                let dirs: Vec<String> = identity_dirs
253                    .iter()
254                    .filter_map(|d| d.file_name().map(|n| n.to_string_lossy().into_owned()))
255                    .collect();
256                Err(Error::Config(format!(
257                    "Multiple node identities found at {}: [{}]. Specify --root-dir to select one.",
258                    nodes_dir.display(),
259                    dirs.join(", ")
260                )))
261            }
262        }
263    }
264
265    /// Load an existing identity from `dir/node_identity.key`, or generate and save a new one.
266    async fn load_or_generate_identity(dir: &std::path::Path) -> Result<NodeIdentity> {
267        let key_path = dir.join(NODE_IDENTITY_FILENAME);
268        if key_path.exists() {
269            NodeIdentity::load_from_file(&key_path)
270                .await
271                .map_err(|e| Error::Startup(format!("Failed to load node identity: {e}")))
272        } else {
273            let identity = NodeIdentity::generate()
274                .map_err(|e| Error::Startup(format!("Failed to generate node identity: {e}")))?;
275            std::fs::create_dir_all(dir)?;
276            identity
277                .save_to_file(&key_path)
278                .await
279                .map_err(|e| Error::Startup(format!("Failed to save node identity: {e}")))?;
280            Ok(identity)
281        }
282    }
283
284    /// Scan `base_dir` for immediate subdirectories that contain `node_identity.key`.
285    fn scan_identity_dirs(base_dir: &std::path::Path) -> Result<Vec<PathBuf>> {
286        let mut dirs = Vec::new();
287        let read_dir = match std::fs::read_dir(base_dir) {
288            Ok(rd) => rd,
289            Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(dirs),
290            Err(e) => return Err(e.into()),
291        };
292        for entry in read_dir {
293            let entry = entry?;
294            let path = entry.path();
295            if path.is_dir() && path.join(NODE_IDENTITY_FILENAME).exists() {
296                dirs.push(path);
297            }
298        }
299        Ok(dirs)
300    }
301
302    fn build_upgrade_monitor(config: &NodeConfig, node_id_seed: &[u8]) -> Arc<UpgradeMonitor> {
303        let monitor = UpgradeMonitor::new(
304            config.upgrade.github_repo.clone(),
305            config.upgrade.channel,
306            config.upgrade.check_interval_hours,
307        );
308
309        if config.upgrade.staged_rollout_hours > 0 {
310            Arc::new(monitor.with_staged_rollout(node_id_seed, config.upgrade.staged_rollout_hours))
311        } else {
312            Arc::new(monitor)
313        }
314    }
315
316    /// Build the ANT protocol handler from config.
317    ///
318    /// Initializes LMDB storage, payment verifier, and quote generator.
319    /// Wires ML-DSA-65 signing from the node's identity into the quote generator.
320    async fn build_ant_protocol(
321        config: &NodeConfig,
322        identity: &NodeIdentity,
323    ) -> Result<AntProtocol> {
324        // Create LMDB storage
325        let storage_config = LmdbStorageConfig {
326            root_dir: config.root_dir.clone(),
327            verify_on_read: config.storage.verify_on_read,
328            max_chunks: config.storage.max_chunks,
329            max_map_size: config.storage.db_size_gb.saturating_mul(1_073_741_824),
330        };
331        let storage = LmdbStorage::new(storage_config)
332            .await
333            .map_err(|e| Error::Startup(format!("Failed to create LMDB storage: {e}")))?;
334
335        // Parse rewards address (required — node must know where to receive payments)
336        let rewards_address = match config.payment.rewards_address {
337            Some(ref addr) => parse_rewards_address(addr)?,
338            None => {
339                return Err(Error::Startup(
340                    "No rewards address configured. Set --rewards-address or payment.rewards_address in config.".to_string(),
341                ));
342            }
343        };
344
345        // Create payment verifier
346        let evm_network = match config.payment.evm_network {
347            EvmNetworkConfig::ArbitrumOne => EvmNetwork::ArbitrumOne,
348            EvmNetworkConfig::ArbitrumSepolia => EvmNetwork::ArbitrumSepoliaTest,
349        };
350        let payment_config = PaymentVerifierConfig {
351            evm: EvmVerifierConfig {
352                network: evm_network,
353            },
354            cache_capacity: config.payment.cache_capacity,
355            local_rewards_address: rewards_address,
356        };
357        let payment_verifier = PaymentVerifier::new(payment_config);
358        // Safe: 5GB fits in usize on all supported 64-bit platforms.
359        #[allow(clippy::cast_possible_truncation)]
360        let max_records = (NODE_STORAGE_LIMIT_BYTES as usize) / MAX_CHUNK_SIZE;
361        let metrics_tracker = QuotingMetricsTracker::new(max_records, 0);
362        let mut quote_generator = QuoteGenerator::new(rewards_address, metrics_tracker);
363
364        // Wire ML-DSA-65 signing from node identity
365        crate::payment::wire_ml_dsa_signer(&mut quote_generator, identity)?;
366
367        info!(
368            "ANT protocol handler initialized with ML-DSA-65 signing (protocol={})",
369            CHUNK_PROTOCOL_ID
370        );
371
372        Ok(AntProtocol::new(
373            Arc::new(storage),
374            Arc::new(payment_verifier),
375            Arc::new(quote_generator),
376        ))
377    }
378
379    /// Build the bootstrap cache manager from config.
380    async fn build_bootstrap_manager(config: &NodeConfig) -> Option<BootstrapManager> {
381        let cache_dir = config
382            .bootstrap_cache
383            .cache_dir
384            .clone()
385            .unwrap_or_else(|| config.root_dir.join("bootstrap_cache"));
386
387        // Create cache directory
388        if let Err(e) = std::fs::create_dir_all(&cache_dir) {
389            warn!("Failed to create bootstrap cache directory: {e}");
390            return None;
391        }
392
393        let bootstrap_config = CoreBootstrapConfig {
394            cache_dir,
395            max_peers: config.bootstrap_cache.max_contacts,
396            ..CoreBootstrapConfig::default()
397        };
398
399        match BootstrapManager::with_config(bootstrap_config).await {
400            Ok(manager) => {
401                info!(
402                    "Bootstrap cache initialized with {} max contacts",
403                    config.bootstrap_cache.max_contacts
404                );
405                Some(manager)
406            }
407            Err(e) => {
408                warn!("Failed to initialize bootstrap cache: {e}");
409                None
410            }
411        }
412    }
413}
414
/// A running saorsa node.
pub struct RunningNode {
    /// Node configuration (with `root_dir` resolved by the builder).
    config: NodeConfig,
    /// saorsa-core transport node; shared with background tasks via `Arc`.
    p2p_node: Arc<P2PNode>,
    /// Cancellation token fired by `shutdown()`; stops the runtime loop
    /// and cooperating background tasks.
    shutdown: CancellationToken,
    /// Broadcast sender for node lifecycle events.
    events_tx: NodeEventsSender,
    /// Primary event receiver; taken at most once via `events()`.
    events_rx: Option<NodeEventsChannel>,
    /// Periodic upgrade checker (`None` when upgrades are disabled).
    upgrade_monitor: Option<Arc<UpgradeMonitor>>,
    /// Bootstrap cache manager for persistent peer storage.
    bootstrap_manager: Option<BootstrapManager>,
    /// ANT protocol handler for chunk storage.
    ant_protocol: Option<Arc<AntProtocol>>,
    /// Protocol message routing background task.
    protocol_task: Option<JoinHandle<()>>,
}
430
431impl RunningNode {
432    /// Get the node's root directory.
433    #[must_use]
434    pub fn root_dir(&self) -> &PathBuf {
435        &self.config.root_dir
436    }
437
438    /// Get a receiver for node events.
439    ///
440    /// Note: Can only be called once. Subsequent calls return None.
441    pub fn events(&mut self) -> Option<NodeEventsChannel> {
442        self.events_rx.take()
443    }
444
445    /// Subscribe to node events.
446    #[must_use]
447    pub fn subscribe_events(&self) -> NodeEventsChannel {
448        self.events_tx.subscribe()
449    }
450
451    /// Run the node until shutdown is requested.
452    ///
453    /// # Errors
454    ///
455    /// Returns an error if the node encounters a fatal error.
456    pub async fn run(&mut self) -> Result<()> {
457        info!("Node runtime loop starting");
458
459        // Start the P2P node
460        self.p2p_node
461            .start()
462            .await
463            .map_err(|e| Error::Startup(format!("Failed to start P2P node: {e}")))?;
464
465        let addrs = self.p2p_node.listen_addrs().await;
466        info!(listen_addrs = ?addrs, "P2P node started");
467
468        // Emit started event
469        if let Err(e) = self.events_tx.send(NodeEvent::Started) {
470            warn!("Failed to send Started event: {e}");
471        }
472
473        // Start protocol message routing (P2P → AntProtocol → P2P response)
474        self.start_protocol_routing();
475
476        // Start upgrade monitor if enabled
477        if let Some(ref monitor) = self.upgrade_monitor {
478            let monitor = Arc::clone(monitor);
479            let events_tx = self.events_tx.clone();
480            let shutdown = self.shutdown.clone();
481
482            tokio::spawn(async move {
483                let upgrader = AutoApplyUpgrader::new();
484
485                loop {
486                    tokio::select! {
487                        () = shutdown.cancelled() => {
488                            break;
489                        }
490                        result = monitor.check_for_updates() => {
491                            if let Ok(Some(upgrade_info)) = result {
492                                info!(
493                                    current_version = %upgrader.current_version(),
494                                    new_version = %upgrade_info.version,
495                                    "Upgrade available"
496                                );
497
498                                // Send notification event
499                                if let Err(e) = events_tx.send(NodeEvent::UpgradeAvailable {
500                                    version: upgrade_info.version.to_string(),
501                                }) {
502                                    warn!("Failed to send UpgradeAvailable event: {e}");
503                                }
504
505                                // Auto-apply the upgrade
506                                info!("Starting auto-apply upgrade...");
507                                match upgrader.apply_upgrade(&upgrade_info).await {
508                                    Ok(UpgradeResult::Success { version }) => {
509                                        info!(version = %version, "Upgrade successful, process will restart");
510                                        // If we reach here, exec() failed or not supported
511                                    }
512                                    Ok(UpgradeResult::RolledBack { reason }) => {
513                                        warn!("Upgrade rolled back: {reason}");
514                                    }
515                                    Ok(UpgradeResult::NoUpgrade) => {
516                                        debug!("No upgrade needed");
517                                    }
518                                    Err(e) => {
519                                        error!("Critical upgrade error: {e}");
520                                    }
521                                }
522                            }
523                            // Wait for next check interval
524                            tokio::time::sleep(monitor.check_interval()).await;
525                        }
526                    }
527                }
528            });
529        }
530
531        info!("Node running, waiting for shutdown signal");
532
533        // Run the main event loop with signal handling
534        self.run_event_loop().await?;
535
536        // Log bootstrap cache stats before shutdown
537        if let Some(ref manager) = self.bootstrap_manager {
538            let stats = manager.stats().await;
539            info!(
540                "Bootstrap cache shutdown: {} peers, avg quality {:.2}",
541                stats.total_peers, stats.average_quality
542            );
543        }
544
545        // Stop protocol routing task
546        if let Some(handle) = self.protocol_task.take() {
547            handle.abort();
548        }
549
550        // Shutdown P2P node
551        info!("Shutting down P2P node...");
552        if let Err(e) = self.p2p_node.shutdown().await {
553            warn!("Error during P2P node shutdown: {e}");
554        }
555
556        if let Err(e) = self.events_tx.send(NodeEvent::ShuttingDown) {
557            warn!("Failed to send ShuttingDown event: {e}");
558        }
559        info!("Node shutdown complete");
560        Ok(())
561    }
562
563    /// Run the main event loop, handling shutdown and signals.
564    #[cfg(unix)]
565    async fn run_event_loop(&self) -> Result<()> {
566        let mut sigterm = signal(SignalKind::terminate())?;
567        let mut sighup = signal(SignalKind::hangup())?;
568
569        loop {
570            tokio::select! {
571                () = self.shutdown.cancelled() => {
572                    info!("Shutdown signal received");
573                    break;
574                }
575                _ = tokio::signal::ctrl_c() => {
576                    info!("Received SIGINT (Ctrl-C), initiating shutdown");
577                    self.shutdown();
578                    break;
579                }
580                _ = sigterm.recv() => {
581                    info!("Received SIGTERM, initiating shutdown");
582                    self.shutdown();
583                    break;
584                }
585                _ = sighup.recv() => {
586                    info!("Received SIGHUP (config reload not yet supported)");
587                }
588            }
589        }
590        Ok(())
591    }
592
593    /// Run the main event loop, handling shutdown signals (non-Unix version).
594    #[cfg(not(unix))]
595    async fn run_event_loop(&self) -> Result<()> {
596        loop {
597            tokio::select! {
598                () = self.shutdown.cancelled() => {
599                    info!("Shutdown signal received");
600                    break;
601                }
602                _ = tokio::signal::ctrl_c() => {
603                    info!("Received Ctrl-C, initiating shutdown");
604                    self.shutdown();
605                    break;
606                }
607            }
608        }
609        Ok(())
610    }
611
612    /// Start the protocol message routing background task.
613    ///
614    /// Subscribes to P2P events and routes incoming chunk protocol messages
615    /// to the `AntProtocol` handler, sending responses back to the sender.
616    fn start_protocol_routing(&mut self) {
617        let protocol = match self.ant_protocol {
618            Some(ref p) => Arc::clone(p),
619            None => return,
620        };
621
622        let mut events = self.p2p_node.subscribe_events();
623        let p2p = Arc::clone(&self.p2p_node);
624        let semaphore = Arc::new(Semaphore::new(64));
625
626        self.protocol_task = Some(tokio::spawn(async move {
627            while let Ok(event) = events.recv().await {
628                if let P2PEvent::Message {
629                    topic,
630                    source: Some(source),
631                    data,
632                } = event
633                {
634                    let handler_info: Option<(&str, &str)> = if topic == CHUNK_PROTOCOL_ID {
635                        Some(("chunk", CHUNK_PROTOCOL_ID))
636                    } else {
637                        None
638                    };
639
640                    if let Some((data_type, response_topic)) = handler_info {
641                        debug!("Received {data_type} protocol message from {source}");
642                        let protocol = Arc::clone(&protocol);
643                        let p2p = Arc::clone(&p2p);
644                        let sem = semaphore.clone();
645                        tokio::spawn(async move {
646                            let Ok(_permit) = sem.acquire().await else {
647                                return;
648                            };
649                            let result = match data_type {
650                                "chunk" => protocol.handle_message(&data).await,
651                                _ => return,
652                            };
653                            match result {
654                                Ok(response) => {
655                                    if let Err(e) = p2p
656                                        .send_message(
657                                            &source,
658                                            response_topic,
659                                            response.to_vec(),
660                                            &[],
661                                        )
662                                        .await
663                                    {
664                                        warn!("Failed to send {data_type} protocol response to {source}: {e}");
665                                    }
666                                }
667                                Err(e) => {
668                                    warn!("{data_type} protocol handler error: {e}");
669                                }
670                            }
671                        });
672                    }
673                }
674            }
675        }));
676        info!("Protocol message routing started");
677    }
678
679    /// Request the node to shut down.
680    pub fn shutdown(&self) {
681        self.shutdown.cancel();
682    }
683}
684
685#[cfg(test)]
686#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
687mod tests {
688    use super::*;
689    use crate::config::NODES_SUBDIR;
690
691    #[test]
692    fn test_build_upgrade_monitor_staged_rollout_enabled() {
693        let config = NodeConfig {
694            upgrade: crate::config::UpgradeConfig {
695                enabled: true,
696                staged_rollout_hours: 24,
697                ..Default::default()
698            },
699            ..Default::default()
700        };
701        let seed = b"node-seed";
702
703        let monitor = NodeBuilder::build_upgrade_monitor(&config, seed);
704        assert!(monitor.has_staged_rollout());
705    }
706
707    #[test]
708    fn test_build_upgrade_monitor_staged_rollout_disabled() {
709        let config = NodeConfig {
710            upgrade: crate::config::UpgradeConfig {
711                enabled: true,
712                staged_rollout_hours: 0,
713                ..Default::default()
714            },
715            ..Default::default()
716        };
717        let seed = b"node-seed";
718
719        let monitor = NodeBuilder::build_upgrade_monitor(&config, seed);
720        assert!(!monitor.has_staged_rollout());
721    }
722
723    #[test]
724    fn test_build_core_config_sets_production_mode() {
725        let config = NodeConfig {
726            network_mode: NetworkMode::Production,
727            ..Default::default()
728        };
729        let core = NodeBuilder::build_core_config(&config).expect("core config");
730        assert!(core.diversity_config.is_some());
731    }
732
733    #[test]
734    fn test_build_core_config_sets_development_mode_relaxed() {
735        let config = NodeConfig {
736            network_mode: NetworkMode::Development,
737            ..Default::default()
738        };
739        let core = NodeBuilder::build_core_config(&config).expect("core config");
740        let diversity = core.diversity_config.expect("diversity");
741        assert!(diversity.is_relaxed());
742    }
743
744    #[test]
745    fn test_scan_identity_dirs_empty_dir() {
746        let tmp = tempfile::tempdir().unwrap();
747        let dirs = NodeBuilder::scan_identity_dirs(tmp.path()).unwrap();
748        assert!(dirs.is_empty());
749    }
750
751    #[test]
752    fn test_scan_identity_dirs_nonexistent_dir() {
753        let tmp = tempfile::tempdir().unwrap();
754        let path = tmp.path().join("nonexistent_identity_dir");
755        let dirs = NodeBuilder::scan_identity_dirs(&path).unwrap();
756        assert!(dirs.is_empty());
757    }
758
759    #[test]
760    fn test_scan_identity_dirs_finds_one() {
761        let tmp = tempfile::tempdir().unwrap();
762        let node_dir = tmp.path().join("abc123");
763        std::fs::create_dir_all(&node_dir).unwrap();
764        std::fs::write(node_dir.join(NODE_IDENTITY_FILENAME), "{}").unwrap();
765
766        let dirs = NodeBuilder::scan_identity_dirs(tmp.path()).unwrap();
767        assert_eq!(dirs.len(), 1);
768        assert_eq!(dirs[0], node_dir);
769    }
770
771    #[test]
772    fn test_scan_identity_dirs_finds_multiple() {
773        let tmp = tempfile::tempdir().unwrap();
774        for name in &["node_a", "node_b"] {
775            let dir = tmp.path().join(name);
776            std::fs::create_dir_all(&dir).unwrap();
777            std::fs::write(dir.join(NODE_IDENTITY_FILENAME), "{}").unwrap();
778        }
779        // A directory without a key file should be ignored
780        std::fs::create_dir_all(tmp.path().join("no_key")).unwrap();
781
782        let dirs = NodeBuilder::scan_identity_dirs(tmp.path()).unwrap();
783        assert_eq!(dirs.len(), 2);
784    }
785
786    #[tokio::test]
787    async fn test_resolve_identity_first_run_creates_identity() {
788        let tmp = tempfile::tempdir().unwrap();
789        let mut config = NodeConfig {
790            root_dir: tmp.path().to_path_buf(),
791            ..Default::default()
792        };
793
794        let identity = NodeBuilder::resolve_identity(&mut config).await.unwrap();
795        // Key file should exist
796        assert!(tmp.path().join(NODE_IDENTITY_FILENAME).exists());
797        // peer_id should be derivable from the identity
798        let peer_id = identity.peer_id().to_hex();
799        assert_eq!(peer_id.len(), 64); // 32 bytes hex-encoded
800    }
801
802    #[tokio::test]
803    async fn test_resolve_identity_loads_existing() {
804        let tmp = tempfile::tempdir().unwrap();
805
806        // Generate and save an identity
807        let original = NodeIdentity::generate().unwrap();
808        original
809            .save_to_file(&tmp.path().join(NODE_IDENTITY_FILENAME))
810            .await
811            .unwrap();
812
813        let mut config = NodeConfig {
814            root_dir: tmp.path().to_path_buf(),
815            ..Default::default()
816        };
817
818        let loaded = NodeBuilder::resolve_identity(&mut config).await.unwrap();
819        assert_eq!(loaded.peer_id(), original.peer_id());
820    }
821
822    #[test]
823    fn test_peer_id_hex_length() {
824        let id = saorsa_core::identity::PeerId::from_bytes([0x42; 32]);
825        let hex = id.to_hex();
826        assert_eq!(hex.len(), 64); // 32 bytes = 64 hex chars
827    }
828
829    /// Simulates a node restart: first run creates identity in a scoped subdir
830    /// under `nodes/`, second run discovers and reloads it — `peer_id` must be
831    /// identical and the directory name is the full 64-char hex peer ID.
832    #[tokio::test]
833    async fn test_identity_persisted_across_restarts() {
834        let base_dir = tempfile::tempdir().unwrap();
835        let nodes_dir = base_dir.path().join(NODES_SUBDIR);
836
837        // First "boot": generate identity, save it in nodes/{peer_id}/
838        let identity1 = NodeIdentity::generate().unwrap();
839        let peer_id1 = identity1.peer_id().to_hex();
840        let peer_dir = nodes_dir.join(&peer_id1);
841        std::fs::create_dir_all(&peer_dir).unwrap();
842        identity1
843            .save_to_file(&peer_dir.join(NODE_IDENTITY_FILENAME))
844            .await
845            .unwrap();
846
847        // Verify directory name is the full 64-char hex peer ID
848        assert_eq!(peer_id1.len(), 64);
849        assert_eq!(peer_dir.file_name().unwrap().to_string_lossy(), peer_id1);
850
851        // Second "boot": scan should find and reload the same identity
852        let identity_dirs = NodeBuilder::scan_identity_dirs(&nodes_dir).unwrap();
853        assert_eq!(identity_dirs.len(), 1);
854        let loaded = NodeIdentity::load_from_file(&identity_dirs[0].join(NODE_IDENTITY_FILENAME))
855            .await
856            .unwrap();
857        let peer_id2 = loaded.peer_id().to_hex();
858
859        assert_eq!(peer_id1, peer_id2, "peer_id must survive restart");
860        assert_eq!(
861            identity_dirs[0], peer_dir,
862            "root_dir must be the same directory"
863        );
864    }
865
866    /// When two identity subdirs exist under `nodes/`, the scan finds multiple
867    /// and the resolve path would error asking for `--root-dir`.
868    #[tokio::test]
869    async fn test_multiple_identities_errors() {
870        let base_dir = tempfile::tempdir().unwrap();
871        let nodes_dir = base_dir.path().join(NODES_SUBDIR);
872
873        // Create two identity subdirectories under nodes/
874        for name in &["aaaa", "bbbb"] {
875            let dir = nodes_dir.join(name);
876            std::fs::create_dir_all(&dir).unwrap();
877            let identity = NodeIdentity::generate().unwrap();
878            identity
879                .save_to_file(&dir.join(NODE_IDENTITY_FILENAME))
880                .await
881                .unwrap();
882        }
883
884        let identity_dirs = NodeBuilder::scan_identity_dirs(&nodes_dir).unwrap();
885        assert_eq!(identity_dirs.len(), 2, "should find both identity dirs");
886    }
887
888    /// With a non-default `root_dir` (explicit path), the identity is created on
889    /// first run and reloaded on subsequent runs from the same directory.
890    #[tokio::test]
891    async fn test_explicit_root_dir_persists_across_restarts() {
892        let tmp = tempfile::tempdir().unwrap();
893
894        // First boot — non-default root_dir triggers explicit path
895        let mut config1 = NodeConfig {
896            root_dir: tmp.path().to_path_buf(),
897            ..Default::default()
898        };
899        let identity1 = NodeBuilder::resolve_identity(&mut config1).await.unwrap();
900
901        // Second boot — same dir
902        let mut config2 = NodeConfig {
903            root_dir: tmp.path().to_path_buf(),
904            ..Default::default()
905        };
906        let identity2 = NodeBuilder::resolve_identity(&mut config2).await.unwrap();
907
908        assert_eq!(
909            identity1.peer_id(),
910            identity2.peer_id(),
911            "explicit --root-dir must yield stable identity"
912        );
913    }
914}