1use crate::ant_protocol::{CHUNK_PROTOCOL_ID, MAX_CHUNK_SIZE};
4use crate::config::{
5 default_nodes_dir, default_root_dir, EvmNetworkConfig, IpVersion, NetworkMode, NodeConfig,
6 NODE_IDENTITY_FILENAME,
7};
8use crate::error::{Error, Result};
9use crate::event::{create_event_channel, NodeEvent, NodeEventsChannel, NodeEventsSender};
10use crate::payment::metrics::QuotingMetricsTracker;
11use crate::payment::wallet::parse_rewards_address;
12use crate::payment::{EvmVerifierConfig, PaymentVerifier, PaymentVerifierConfig, QuoteGenerator};
13use crate::storage::{AntProtocol, LmdbStorage, LmdbStorageConfig};
14use crate::upgrade::{
15 upgrade_cache_dir, AutoApplyUpgrader, BinaryCache, ReleaseCache, UpgradeMonitor, UpgradeResult,
16};
17use evmlib::Network as EvmNetwork;
18use rand::Rng;
19use saorsa_core::identity::NodeIdentity;
20use saorsa_core::{
21 BootstrapConfig as CoreBootstrapConfig, BootstrapManager,
22 IPDiversityConfig as CoreDiversityConfig, MultiAddr, NodeConfig as CoreNodeConfig, P2PEvent,
23 P2PNode,
24};
25use std::path::PathBuf;
26use std::sync::atomic::{AtomicI32, Ordering};
27use std::sync::Arc;
28use tokio::sync::Semaphore;
29use tokio::task::JoinHandle;
30use tokio_util::sync::CancellationToken;
31use tracing::{debug, error, info, warn};
32
33pub const NODE_STORAGE_LIMIT_BYTES: u64 = 5 * 1024 * 1024 * 1024;
40
41#[cfg(unix)]
42use tokio::signal::unix::{signal, SignalKind};
43
44pub struct NodeBuilder {
46 config: NodeConfig,
47}
48
49impl NodeBuilder {
50 #[must_use]
52 pub fn new(config: NodeConfig) -> Self {
53 Self { config }
54 }
55
56 pub async fn build(mut self) -> Result<RunningNode> {
62 info!("Building ant-node with config: {:?}", self.config);
63
64 if self.config.network_mode == NetworkMode::Production {
66 match self.config.payment.rewards_address {
67 None => {
68 return Err(Error::Config(
69 "CRITICAL: Rewards address is not configured. \
70 Set payment.rewards_address in config to your Arbitrum wallet address."
71 .to_string(),
72 ));
73 }
74 Some(ref addr) if addr == "0xYOUR_ARBITRUM_ADDRESS_HERE" || addr.is_empty() => {
75 return Err(Error::Config(
76 "CRITICAL: Rewards address is not configured. \
77 Set payment.rewards_address in config to your Arbitrum wallet address."
78 .to_string(),
79 ));
80 }
81 Some(_) => {}
82 }
83 }
84
85 let identity = Arc::new(Self::resolve_identity(&mut self.config).await?);
87 let peer_id = identity.peer_id().to_hex();
88
89 info!(peer_id = %peer_id, root_dir = %self.config.root_dir.display(), "Node identity resolved");
90
91 std::fs::create_dir_all(&self.config.root_dir)?;
93
94 let shutdown = CancellationToken::new();
96
97 let (events_tx, events_rx) = create_event_channel();
99
100 let mut core_config = Self::build_core_config(&self.config)?;
102 core_config.node_identity = Some(Arc::clone(&identity));
105 debug!("Core config: {:?}", core_config);
106
107 let p2p_node = P2PNode::new(core_config)
109 .await
110 .map_err(|e| Error::Startup(format!("Failed to create P2P node: {e}")))?;
111
112 let upgrade_monitor = {
114 let node_id_seed = p2p_node.peer_id().as_bytes();
115 Some(Self::build_upgrade_monitor(&self.config, node_id_seed))
116 };
117
118 let bootstrap_manager = if self.config.bootstrap_cache.enabled {
120 Self::build_bootstrap_manager(&self.config).await
121 } else {
122 info!("Bootstrap cache disabled");
123 None
124 };
125
126 let ant_protocol = if self.config.storage.enabled {
128 Some(Arc::new(
129 Self::build_ant_protocol(&self.config, &identity).await?,
130 ))
131 } else {
132 info!("Chunk storage disabled");
133 None
134 };
135
136 let node = RunningNode {
137 config: self.config,
138 p2p_node: Arc::new(p2p_node),
139 shutdown,
140 events_tx,
141 events_rx: Some(events_rx),
142 upgrade_monitor,
143 bootstrap_manager,
144 ant_protocol,
145 protocol_task: None,
146 upgrade_exit_code: Arc::new(AtomicI32::new(-1)),
147 };
148
149 Ok(node)
150 }
151
152 fn build_core_config(config: &NodeConfig) -> Result<CoreNodeConfig> {
154 let ipv6 = matches!(config.ip_version, IpVersion::Ipv6 | IpVersion::Dual);
155 let local = matches!(config.network_mode, NetworkMode::Development);
156
157 let mut core_config = CoreNodeConfig::builder()
158 .port(config.port)
159 .ipv6(ipv6)
160 .local(local)
161 .max_message_size(config.max_message_size)
162 .build()
163 .map_err(|e| Error::Config(format!("Failed to create core config: {e}")))?;
164
165 core_config.bootstrap_peers = config
167 .bootstrap
168 .iter()
169 .map(|addr| MultiAddr::quic(*addr))
170 .collect();
171
172 match config.network_mode {
174 NetworkMode::Production => {
175 core_config.diversity_config = Some(CoreDiversityConfig::default());
176 }
177 NetworkMode::Testnet => {
178 core_config.allow_loopback = true;
180 let mut diversity = CoreDiversityConfig::testnet();
181 diversity.max_nodes_per_asn = config.testnet.max_nodes_per_asn;
182 diversity.max_nodes_per_ipv6_64 = config.testnet.max_nodes_per_64;
183 diversity.enable_geolocation_check = config.testnet.enable_geo_checks;
184 diversity.min_geographic_diversity = if config.testnet.enable_geo_checks {
185 3
186 } else {
187 1
188 };
189 core_config.diversity_config = Some(diversity);
190
191 if config.testnet.enforce_age_requirements {
192 warn!(
193 "testnet.enforce_age_requirements is set but saorsa-core does not yet \
194 expose a knob; age checks may remain relaxed"
195 );
196 }
197 }
198 NetworkMode::Development => {
199 core_config.diversity_config = Some(CoreDiversityConfig::permissive());
200 }
201 }
202
203 Ok(core_config)
204 }
205
206 async fn resolve_identity(config: &mut NodeConfig) -> Result<NodeIdentity> {
220 if config.root_dir != default_root_dir() {
221 return Self::load_or_generate_identity(&config.root_dir).await;
222 }
223
224 let nodes_dir = default_nodes_dir();
225 let identity_dirs = Self::scan_identity_dirs(&nodes_dir)?;
226
227 match identity_dirs.len() {
228 0 => {
229 let identity = NodeIdentity::generate().map_err(|e| {
231 Error::Startup(format!("Failed to generate node identity: {e}"))
232 })?;
233 let peer_id = identity.peer_id().to_hex();
234 let peer_dir = nodes_dir.join(&peer_id);
235 std::fs::create_dir_all(&peer_dir)?;
236 identity
237 .save_to_file(&peer_dir.join(NODE_IDENTITY_FILENAME))
238 .await
239 .map_err(|e| Error::Startup(format!("Failed to save node identity: {e}")))?;
240 config.root_dir = peer_dir;
241 Ok(identity)
242 }
243 1 => {
244 let dir = identity_dirs
245 .first()
246 .ok_or_else(|| Error::Config("No identity dirs found".to_string()))?;
247 let identity = NodeIdentity::load_from_file(&dir.join(NODE_IDENTITY_FILENAME))
248 .await
249 .map_err(|e| Error::Startup(format!("Failed to load node identity: {e}")))?;
250 config.root_dir.clone_from(dir);
251 Ok(identity)
252 }
253 _ => {
254 let dirs: Vec<String> = identity_dirs
255 .iter()
256 .filter_map(|d| d.file_name().map(|n| n.to_string_lossy().into_owned()))
257 .collect();
258 Err(Error::Config(format!(
259 "Multiple node identities found at {}: [{}]. Specify --root-dir to select one.",
260 nodes_dir.display(),
261 dirs.join(", ")
262 )))
263 }
264 }
265 }
266
267 async fn load_or_generate_identity(dir: &std::path::Path) -> Result<NodeIdentity> {
269 let key_path = dir.join(NODE_IDENTITY_FILENAME);
270 if key_path.exists() {
271 NodeIdentity::load_from_file(&key_path)
272 .await
273 .map_err(|e| Error::Startup(format!("Failed to load node identity: {e}")))
274 } else {
275 let identity = NodeIdentity::generate()
276 .map_err(|e| Error::Startup(format!("Failed to generate node identity: {e}")))?;
277 std::fs::create_dir_all(dir)?;
278 identity
279 .save_to_file(&key_path)
280 .await
281 .map_err(|e| Error::Startup(format!("Failed to save node identity: {e}")))?;
282 Ok(identity)
283 }
284 }
285
286 fn scan_identity_dirs(base_dir: &std::path::Path) -> Result<Vec<PathBuf>> {
288 let mut dirs = Vec::new();
289 let read_dir = match std::fs::read_dir(base_dir) {
290 Ok(rd) => rd,
291 Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(dirs),
292 Err(e) => return Err(e.into()),
293 };
294 for entry in read_dir {
295 let entry = entry?;
296 let path = entry.path();
297 if path.is_dir() && path.join(NODE_IDENTITY_FILENAME).exists() {
298 dirs.push(path);
299 }
300 }
301 Ok(dirs)
302 }
303
304 fn build_upgrade_monitor(config: &NodeConfig, node_id_seed: &[u8]) -> UpgradeMonitor {
305 let mut monitor = UpgradeMonitor::new(
306 config.upgrade.github_repo.clone(),
307 config.upgrade.channel,
308 config.upgrade.check_interval_hours,
309 );
310
311 if let Ok(cache_dir) = upgrade_cache_dir() {
312 monitor = monitor.with_release_cache(ReleaseCache::new(
313 cache_dir,
314 std::time::Duration::from_secs(3600),
315 ));
316 }
317
318 if config.upgrade.staged_rollout_hours > 0 {
319 monitor =
320 monitor.with_staged_rollout(node_id_seed, config.upgrade.staged_rollout_hours);
321 }
322
323 monitor
324 }
325
326 async fn build_ant_protocol(
331 config: &NodeConfig,
332 identity: &NodeIdentity,
333 ) -> Result<AntProtocol> {
334 let storage_config = LmdbStorageConfig {
336 root_dir: config.root_dir.clone(),
337 verify_on_read: config.storage.verify_on_read,
338 max_chunks: config.storage.max_chunks,
339 max_map_size: config.storage.db_size_gb.saturating_mul(1_073_741_824),
340 };
341 let storage = LmdbStorage::new(storage_config)
342 .await
343 .map_err(|e| Error::Startup(format!("Failed to create LMDB storage: {e}")))?;
344
345 let rewards_address = match config.payment.rewards_address {
347 Some(ref addr) => parse_rewards_address(addr)?,
348 None => {
349 return Err(Error::Startup(
350 "No rewards address configured. Set --rewards-address or payment.rewards_address in config.".to_string(),
351 ));
352 }
353 };
354
355 let evm_network = match config.payment.evm_network {
357 EvmNetworkConfig::ArbitrumOne => EvmNetwork::ArbitrumOne,
358 EvmNetworkConfig::ArbitrumSepolia => EvmNetwork::ArbitrumSepoliaTest,
359 };
360 let payment_config = PaymentVerifierConfig {
361 evm: EvmVerifierConfig {
362 network: evm_network,
363 },
364 cache_capacity: config.payment.cache_capacity,
365 local_rewards_address: rewards_address,
366 };
367 let payment_verifier = PaymentVerifier::new(payment_config);
368 #[allow(clippy::cast_possible_truncation)]
370 let max_records = (NODE_STORAGE_LIMIT_BYTES as usize) / MAX_CHUNK_SIZE;
371 let metrics_tracker = QuotingMetricsTracker::new(max_records, 0);
372 let mut quote_generator = QuoteGenerator::new(rewards_address, metrics_tracker);
373
374 crate::payment::wire_ml_dsa_signer(&mut quote_generator, identity)?;
377
378 let protocol = AntProtocol::new(
379 Arc::new(storage),
380 Arc::new(payment_verifier),
381 Arc::new(quote_generator),
382 );
383
384 info!(
385 "ANT protocol handler initialized with ML-DSA-65 signing (protocol={CHUNK_PROTOCOL_ID})"
386 );
387
388 Ok(protocol)
389 }
390
391 async fn build_bootstrap_manager(config: &NodeConfig) -> Option<BootstrapManager> {
393 let cache_dir = config
394 .bootstrap_cache
395 .cache_dir
396 .clone()
397 .unwrap_or_else(|| config.root_dir.join("bootstrap_cache"));
398
399 if let Err(e) = std::fs::create_dir_all(&cache_dir) {
401 warn!("Failed to create bootstrap cache directory: {e}");
402 return None;
403 }
404
405 let bootstrap_config = CoreBootstrapConfig {
406 cache_dir,
407 max_peers: config.bootstrap_cache.max_contacts,
408 ..CoreBootstrapConfig::default()
409 };
410
411 match BootstrapManager::with_config(bootstrap_config).await {
412 Ok(manager) => {
413 info!(
414 "Bootstrap cache initialized with {} max contacts",
415 config.bootstrap_cache.max_contacts
416 );
417 Some(manager)
418 }
419 Err(e) => {
420 warn!("Failed to initialize bootstrap cache: {e}");
421 None
422 }
423 }
424 }
425}
426
427pub struct RunningNode {
429 config: NodeConfig,
430 p2p_node: Arc<P2PNode>,
431 shutdown: CancellationToken,
432 events_tx: NodeEventsSender,
433 events_rx: Option<NodeEventsChannel>,
434 upgrade_monitor: Option<UpgradeMonitor>,
435 bootstrap_manager: Option<BootstrapManager>,
437 ant_protocol: Option<Arc<AntProtocol>>,
439 protocol_task: Option<JoinHandle<()>>,
441 upgrade_exit_code: Arc<AtomicI32>,
443}
444
445impl RunningNode {
446 #[must_use]
448 pub fn root_dir(&self) -> &PathBuf {
449 &self.config.root_dir
450 }
451
452 pub fn events(&mut self) -> Option<NodeEventsChannel> {
456 self.events_rx.take()
457 }
458
459 #[must_use]
461 pub fn subscribe_events(&self) -> NodeEventsChannel {
462 self.events_tx.subscribe()
463 }
464
465 #[allow(clippy::too_many_lines)]
471 pub async fn run(&mut self) -> Result<()> {
472 info!("Node runtime loop starting");
473
474 self.p2p_node
476 .start()
477 .await
478 .map_err(|e| Error::Startup(format!("Failed to start P2P node: {e}")))?;
479
480 let listen_addrs = self.p2p_node.listen_addrs().await;
481 info!(listen_addrs = ?listen_addrs, "P2P node started");
482
483 let actual_port = listen_addrs
485 .first()
486 .and_then(MultiAddr::port)
487 .unwrap_or(self.config.port);
488 info!(
489 port = actual_port,
490 "Node is running on port: {}", actual_port
491 );
492
493 if let Err(e) = self.events_tx.send(NodeEvent::Started) {
495 warn!("Failed to send Started event: {e}");
496 }
497
498 self.start_protocol_routing();
500
501 if let Some(monitor) = self.upgrade_monitor.take() {
503 let events_tx = self.events_tx.clone();
504 let shutdown = self.shutdown.clone();
505 let stop_on_upgrade = self.config.upgrade.stop_on_upgrade;
506 let upgrade_exit_code = Arc::clone(&self.upgrade_exit_code);
507
508 tokio::spawn(async move {
509 let mut monitor = monitor;
510 let mut upgrader = AutoApplyUpgrader::new().with_stop_on_upgrade(stop_on_upgrade);
511 if let Ok(cache_dir) = upgrade_cache_dir() {
512 upgrader = upgrader.with_binary_cache(BinaryCache::new(cache_dir));
513 }
514
515 {
518 let jitter_duration = jittered_interval(monitor.check_interval());
519 let first_check_time = chrono::Utc::now()
520 + chrono::Duration::from_std(jitter_duration).unwrap_or_else(|e| {
521 warn!("chrono::Duration::from_std failed for jitter ({e}), defaulting to 1 minute");
522 chrono::Duration::minutes(1)
523 });
524 info!(
525 "First upgrade check scheduled for {} (jitter: {}s)",
526 first_check_time.to_rfc3339(),
527 jitter_duration.as_secs()
528 );
529 tokio::time::sleep(jitter_duration).await;
530 }
531
532 loop {
533 tokio::select! {
534 () = shutdown.cancelled() => {
535 break;
536 }
537 result = monitor.check_for_ready_upgrade() => {
538 match result {
539 Ok(Some(upgrade_info)) => {
540 info!(
541 current_version = %upgrader.current_version(),
542 new_version = %upgrade_info.version,
543 "Upgrade available"
544 );
545
546 if let Err(e) = events_tx.send(NodeEvent::UpgradeAvailable {
548 version: upgrade_info.version.to_string(),
549 }) {
550 warn!("Failed to send UpgradeAvailable event: {e}");
551 }
552
553 info!("Starting auto-apply upgrade...");
555 match upgrader.apply_upgrade(&upgrade_info).await {
556 Ok(UpgradeResult::Success { version, exit_code }) => {
557 info!("Upgrade to {} successful, initiating graceful shutdown", version);
558 upgrade_exit_code.store(exit_code, Ordering::SeqCst);
559 shutdown.cancel();
560 break;
561 }
562 Ok(UpgradeResult::RolledBack { reason }) => {
563 warn!("Error during upgrade process: {}", reason);
564 }
565 Ok(UpgradeResult::NoUpgrade) => {
566 info!("Already running latest version");
567 }
568 Err(e) => {
569 error!("Error during upgrade process: {}", e);
570 }
571 }
572 }
573 Ok(None) => {
574 if let Some(remaining) = monitor.time_until_upgrade() {
575 info!(
576 "Upgrade pending, rollout delay remaining: {}m {}s",
577 remaining.as_secs() / 60,
578 remaining.as_secs() % 60
579 );
580 } else {
581 info!("No upgrade available");
582 }
583 }
584 Err(e) => {
585 warn!("Error during upgrade process: {}", e);
586 }
587 }
588 let jittered_duration =
590 jittered_interval(monitor.check_interval());
591 let next_check = chrono::Utc::now()
592 + chrono::Duration::from_std(jittered_duration).unwrap_or_else(|e| {
593 warn!("chrono::Duration::from_std failed for interval ({e}), defaulting to 1 hour");
594 chrono::Duration::hours(1)
595 });
596 info!("Next upgrade check scheduled for {}", next_check.to_rfc3339());
597 tokio::time::sleep(jittered_duration).await;
598 }
599 }
600 }
601 });
602 }
603
604 info!("Node running, waiting for shutdown signal");
605
606 self.run_event_loop().await?;
608
609 if let Some(ref manager) = self.bootstrap_manager {
611 let stats = manager.stats().await;
612 info!(
613 "Bootstrap cache shutdown: {} peers, avg quality {:.2}",
614 stats.total_peers, stats.average_quality
615 );
616 }
617
618 if let Some(handle) = self.protocol_task.take() {
620 handle.abort();
621 }
622
623 info!("Shutting down P2P node...");
625 if let Err(e) = self.p2p_node.shutdown().await {
626 warn!("Error during P2P node shutdown: {e}");
627 }
628
629 if let Err(e) = self.events_tx.send(NodeEvent::ShuttingDown) {
630 warn!("Failed to send ShuttingDown event: {e}");
631 }
632 info!("Node shutdown complete");
633
634 let exit_code = self.upgrade_exit_code.load(Ordering::SeqCst);
638 if exit_code >= 0 {
639 info!("Exiting with code {} for upgrade restart", exit_code);
640 std::process::exit(exit_code);
641 }
642
643 Ok(())
644 }
645
646 #[cfg(unix)]
648 async fn run_event_loop(&self) -> Result<()> {
649 let mut sigterm = signal(SignalKind::terminate())?;
650 let mut sighup = signal(SignalKind::hangup())?;
651
652 loop {
653 tokio::select! {
654 () = self.shutdown.cancelled() => {
655 info!("Shutdown signal received");
656 break;
657 }
658 _ = tokio::signal::ctrl_c() => {
659 info!("Received SIGINT (Ctrl-C), initiating shutdown");
660 self.shutdown();
661 break;
662 }
663 _ = sigterm.recv() => {
664 info!("Received SIGTERM, initiating shutdown");
665 self.shutdown();
666 break;
667 }
668 _ = sighup.recv() => {
669 info!("Received SIGHUP (config reload not yet supported)");
670 }
671 }
672 }
673 Ok(())
674 }
675
676 #[cfg(not(unix))]
678 async fn run_event_loop(&self) -> Result<()> {
679 loop {
680 tokio::select! {
681 () = self.shutdown.cancelled() => {
682 info!("Shutdown signal received");
683 break;
684 }
685 _ = tokio::signal::ctrl_c() => {
686 info!("Received Ctrl-C, initiating shutdown");
687 self.shutdown();
688 break;
689 }
690 }
691 }
692 Ok(())
693 }
694
695 fn start_protocol_routing(&mut self) {
700 let protocol = match self.ant_protocol {
701 Some(ref p) => Arc::clone(p),
702 None => return,
703 };
704
705 let mut events = self.p2p_node.subscribe_events();
706 let p2p = Arc::clone(&self.p2p_node);
707 let semaphore = Arc::new(Semaphore::new(64));
708
709 self.protocol_task = Some(tokio::spawn(async move {
710 while let Ok(event) = events.recv().await {
711 if let P2PEvent::Message {
712 topic,
713 source: Some(source),
714 data,
715 } = event
716 {
717 let handler_info: Option<(&str, &str)> = if topic == CHUNK_PROTOCOL_ID {
718 Some(("chunk", CHUNK_PROTOCOL_ID))
719 } else {
720 None
721 };
722
723 if let Some((data_type, response_topic)) = handler_info {
724 debug!("Received {data_type} protocol message from {source}");
725 let protocol = Arc::clone(&protocol);
726 let p2p = Arc::clone(&p2p);
727 let sem = semaphore.clone();
728 tokio::spawn(async move {
729 let Ok(_permit) = sem.acquire().await else {
730 return;
731 };
732 let result = match data_type {
733 "chunk" => protocol.try_handle_request(&data).await,
734 _ => return,
735 };
736 match result {
737 Ok(Some(response)) => {
738 if let Err(e) = p2p
739 .send_message(
740 &source,
741 response_topic,
742 response.to_vec(),
743 &[],
744 )
745 .await
746 {
747 warn!("Failed to send {data_type} protocol response to {source}: {e}");
748 }
749 }
750 Ok(None) => {}
751 Err(e) => {
752 warn!("{data_type} protocol handler error: {e}");
753 }
754 }
755 });
756 }
757 }
758 }
759 }));
760 info!("Protocol message routing started");
761 }
762
763 pub fn shutdown(&self) {
765 self.shutdown.cancel();
766 }
767}
768
769fn jittered_interval(base: std::time::Duration) -> std::time::Duration {
772 let secs = base.as_secs();
773 let variance = secs / 20; if variance == 0 {
775 return base;
776 }
777 let jitter = rand::thread_rng().gen_range(0..=variance * 2);
778 std::time::Duration::from_secs(secs.saturating_sub(variance) + jitter)
779}
780
781#[cfg(test)]
782#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
783mod tests {
784 use super::*;
785 use crate::config::NODES_SUBDIR;
786
787 #[test]
788 fn test_build_upgrade_monitor_staged_rollout_enabled() {
789 let config = NodeConfig {
790 upgrade: crate::config::UpgradeConfig {
791 staged_rollout_hours: 24,
792 ..Default::default()
793 },
794 ..Default::default()
795 };
796 let seed = b"node-seed";
797
798 let monitor = NodeBuilder::build_upgrade_monitor(&config, seed);
799 assert!(monitor.has_staged_rollout());
800 }
801
802 #[test]
803 fn test_build_upgrade_monitor_staged_rollout_disabled() {
804 let config = NodeConfig {
805 upgrade: crate::config::UpgradeConfig {
806 staged_rollout_hours: 0,
807 ..Default::default()
808 },
809 ..Default::default()
810 };
811 let seed = b"node-seed";
812
813 let monitor = NodeBuilder::build_upgrade_monitor(&config, seed);
814 assert!(!monitor.has_staged_rollout());
815 }
816
817 #[test]
818 fn test_build_core_config_sets_production_mode() {
819 let config = NodeConfig {
820 network_mode: NetworkMode::Production,
821 ..Default::default()
822 };
823 let core = NodeBuilder::build_core_config(&config).expect("core config");
824 assert!(core.diversity_config.is_some());
825 }
826
827 #[test]
828 fn test_build_core_config_sets_development_mode_relaxed() {
829 let config = NodeConfig {
830 network_mode: NetworkMode::Development,
831 ..Default::default()
832 };
833 let core = NodeBuilder::build_core_config(&config).expect("core config");
834 let diversity = core.diversity_config.expect("diversity");
835 assert!(diversity.is_relaxed());
836 }
837
838 #[test]
839 fn test_scan_identity_dirs_empty_dir() {
840 let tmp = tempfile::tempdir().unwrap();
841 let dirs = NodeBuilder::scan_identity_dirs(tmp.path()).unwrap();
842 assert!(dirs.is_empty());
843 }
844
845 #[test]
846 fn test_scan_identity_dirs_nonexistent_dir() {
847 let tmp = tempfile::tempdir().unwrap();
848 let path = tmp.path().join("nonexistent_identity_dir");
849 let dirs = NodeBuilder::scan_identity_dirs(&path).unwrap();
850 assert!(dirs.is_empty());
851 }
852
853 #[test]
854 fn test_scan_identity_dirs_finds_one() {
855 let tmp = tempfile::tempdir().unwrap();
856 let node_dir = tmp.path().join("abc123");
857 std::fs::create_dir_all(&node_dir).unwrap();
858 std::fs::write(node_dir.join(NODE_IDENTITY_FILENAME), "{}").unwrap();
859
860 let dirs = NodeBuilder::scan_identity_dirs(tmp.path()).unwrap();
861 assert_eq!(dirs.len(), 1);
862 assert_eq!(dirs[0], node_dir);
863 }
864
865 #[test]
866 fn test_scan_identity_dirs_finds_multiple() {
867 let tmp = tempfile::tempdir().unwrap();
868 for name in &["node_a", "node_b"] {
869 let dir = tmp.path().join(name);
870 std::fs::create_dir_all(&dir).unwrap();
871 std::fs::write(dir.join(NODE_IDENTITY_FILENAME), "{}").unwrap();
872 }
873 std::fs::create_dir_all(tmp.path().join("no_key")).unwrap();
875
876 let dirs = NodeBuilder::scan_identity_dirs(tmp.path()).unwrap();
877 assert_eq!(dirs.len(), 2);
878 }
879
880 #[tokio::test]
881 async fn test_resolve_identity_first_run_creates_identity() {
882 let tmp = tempfile::tempdir().unwrap();
883 let mut config = NodeConfig {
884 root_dir: tmp.path().to_path_buf(),
885 ..Default::default()
886 };
887
888 let identity = NodeBuilder::resolve_identity(&mut config).await.unwrap();
889 assert!(tmp.path().join(NODE_IDENTITY_FILENAME).exists());
891 let peer_id = identity.peer_id().to_hex();
893 assert_eq!(peer_id.len(), 64); }
895
896 #[tokio::test]
897 async fn test_resolve_identity_loads_existing() {
898 let tmp = tempfile::tempdir().unwrap();
899
900 let original = NodeIdentity::generate().unwrap();
902 original
903 .save_to_file(&tmp.path().join(NODE_IDENTITY_FILENAME))
904 .await
905 .unwrap();
906
907 let mut config = NodeConfig {
908 root_dir: tmp.path().to_path_buf(),
909 ..Default::default()
910 };
911
912 let loaded = NodeBuilder::resolve_identity(&mut config).await.unwrap();
913 assert_eq!(loaded.peer_id(), original.peer_id());
914 }
915
916 #[test]
917 fn test_peer_id_hex_length() {
918 let id = saorsa_core::identity::PeerId::from_bytes([0x42; 32]);
919 let hex = id.to_hex();
920 assert_eq!(hex.len(), 64); }
922
923 #[tokio::test]
927 async fn test_identity_persisted_across_restarts() {
928 let base_dir = tempfile::tempdir().unwrap();
929 let nodes_dir = base_dir.path().join(NODES_SUBDIR);
930
931 let identity1 = NodeIdentity::generate().unwrap();
933 let peer_id1 = identity1.peer_id().to_hex();
934 let peer_dir = nodes_dir.join(&peer_id1);
935 std::fs::create_dir_all(&peer_dir).unwrap();
936 identity1
937 .save_to_file(&peer_dir.join(NODE_IDENTITY_FILENAME))
938 .await
939 .unwrap();
940
941 assert_eq!(peer_id1.len(), 64);
943 assert_eq!(peer_dir.file_name().unwrap().to_string_lossy(), peer_id1);
944
945 let identity_dirs = NodeBuilder::scan_identity_dirs(&nodes_dir).unwrap();
947 assert_eq!(identity_dirs.len(), 1);
948 let loaded = NodeIdentity::load_from_file(&identity_dirs[0].join(NODE_IDENTITY_FILENAME))
949 .await
950 .unwrap();
951 let peer_id2 = loaded.peer_id().to_hex();
952
953 assert_eq!(peer_id1, peer_id2, "peer_id must survive restart");
954 assert_eq!(
955 identity_dirs[0], peer_dir,
956 "root_dir must be the same directory"
957 );
958 }
959
960 #[tokio::test]
963 async fn test_multiple_identities_errors() {
964 let base_dir = tempfile::tempdir().unwrap();
965 let nodes_dir = base_dir.path().join(NODES_SUBDIR);
966
967 for name in &["aaaa", "bbbb"] {
969 let dir = nodes_dir.join(name);
970 std::fs::create_dir_all(&dir).unwrap();
971 let identity = NodeIdentity::generate().unwrap();
972 identity
973 .save_to_file(&dir.join(NODE_IDENTITY_FILENAME))
974 .await
975 .unwrap();
976 }
977
978 let identity_dirs = NodeBuilder::scan_identity_dirs(&nodes_dir).unwrap();
979 assert_eq!(identity_dirs.len(), 2, "should find both identity dirs");
980 }
981
982 #[tokio::test]
985 async fn test_explicit_root_dir_persists_across_restarts() {
986 let tmp = tempfile::tempdir().unwrap();
987
988 let mut config1 = NodeConfig {
990 root_dir: tmp.path().to_path_buf(),
991 ..Default::default()
992 };
993 let identity1 = NodeBuilder::resolve_identity(&mut config1).await.unwrap();
994
995 let mut config2 = NodeConfig {
997 root_dir: tmp.path().to_path_buf(),
998 ..Default::default()
999 };
1000 let identity2 = NodeBuilder::resolve_identity(&mut config2).await.unwrap();
1001
1002 assert_eq!(
1003 identity1.peer_id(),
1004 identity2.peer_id(),
1005 "explicit --root-dir must yield stable identity"
1006 );
1007 }
1008}