1use crate::ant_protocol::{CHUNK_PROTOCOL_ID, MAX_CHUNK_SIZE};
4use crate::config::{
5 default_nodes_dir, default_root_dir, EvmNetworkConfig, NetworkMode, NodeConfig,
6 NODE_IDENTITY_FILENAME,
7};
8use crate::error::{Error, Result};
9use crate::event::{create_event_channel, NodeEvent, NodeEventsChannel, NodeEventsSender};
10use crate::payment::metrics::QuotingMetricsTracker;
11use crate::payment::wallet::parse_rewards_address;
12use crate::payment::{EvmVerifierConfig, PaymentVerifier, PaymentVerifierConfig, QuoteGenerator};
13use crate::storage::{AntProtocol, LmdbStorage, LmdbStorageConfig};
14use crate::upgrade::{
15 upgrade_cache_dir, AutoApplyUpgrader, BinaryCache, ReleaseCache, UpgradeMonitor, UpgradeResult,
16};
17use evmlib::Network as EvmNetwork;
18use rand::Rng;
19use saorsa_core::identity::NodeIdentity;
20use saorsa_core::{
21 BootstrapConfig as CoreBootstrapConfig, BootstrapManager,
22 IPDiversityConfig as CoreDiversityConfig, MultiAddr, NodeConfig as CoreNodeConfig, P2PEvent,
23 P2PNode,
24};
25use std::path::PathBuf;
26use std::sync::atomic::{AtomicI32, Ordering};
27use std::sync::Arc;
28use tokio::sync::Semaphore;
29use tokio::task::JoinHandle;
30use tokio_util::sync::CancellationToken;
31use tracing::{debug, error, info, warn};
32
/// Hard cap on the bytes of chunk data a single node will store (5 GiB).
/// Also used to size the advertised quoting capacity in `build_ant_protocol`.
pub const NODE_STORAGE_LIMIT_BYTES: u64 = 5 * 1024 * 1024 * 1024;
40
41#[cfg(unix)]
42use tokio::signal::unix::{signal, SignalKind};
43
/// Builder that assembles a [`RunningNode`] from a validated [`NodeConfig`].
pub struct NodeBuilder {
    // Consumed by `build()`; `root_dir` may be rewritten to the resolved
    // identity directory during identity resolution.
    config: NodeConfig,
}
48
49impl NodeBuilder {
50 #[must_use]
52 pub fn new(config: NodeConfig) -> Self {
53 Self { config }
54 }
55
56 pub async fn build(mut self) -> Result<RunningNode> {
62 info!("Building ant-node with config: {:?}", self.config);
63
64 if self.config.network_mode == NetworkMode::Production {
66 match self.config.payment.rewards_address {
67 None => {
68 return Err(Error::Config(
69 "CRITICAL: Rewards address is not configured. \
70 Set payment.rewards_address in config to your Arbitrum wallet address."
71 .to_string(),
72 ));
73 }
74 Some(ref addr) if addr == "0xYOUR_ARBITRUM_ADDRESS_HERE" || addr.is_empty() => {
75 return Err(Error::Config(
76 "CRITICAL: Rewards address is not configured. \
77 Set payment.rewards_address in config to your Arbitrum wallet address."
78 .to_string(),
79 ));
80 }
81 Some(_) => {}
82 }
83 }
84
85 let identity = Arc::new(Self::resolve_identity(&mut self.config).await?);
87 let peer_id = identity.peer_id().to_hex();
88
89 info!(peer_id = %peer_id, root_dir = %self.config.root_dir.display(), "Node identity resolved");
90
91 std::fs::create_dir_all(&self.config.root_dir)?;
93
94 let shutdown = CancellationToken::new();
96
97 let (events_tx, events_rx) = create_event_channel();
99
100 let mut core_config = Self::build_core_config(&self.config)?;
102 core_config.node_identity = Some(Arc::clone(&identity));
105 debug!("Core config: {:?}", core_config);
106
107 let p2p_node = P2PNode::new(core_config)
109 .await
110 .map_err(|e| Error::Startup(format!("Failed to create P2P node: {e}")))?;
111
112 let upgrade_monitor = {
114 let node_id_seed = p2p_node.peer_id().as_bytes();
115 Some(Self::build_upgrade_monitor(&self.config, node_id_seed))
116 };
117
118 let bootstrap_manager = if self.config.bootstrap_cache.enabled {
120 Self::build_bootstrap_manager(&self.config).await
121 } else {
122 info!("Bootstrap cache disabled");
123 None
124 };
125
126 let ant_protocol = if self.config.storage.enabled {
128 Some(Arc::new(
129 Self::build_ant_protocol(&self.config, &identity).await?,
130 ))
131 } else {
132 info!("Chunk storage disabled");
133 None
134 };
135
136 let node = RunningNode {
137 config: self.config,
138 p2p_node: Arc::new(p2p_node),
139 shutdown,
140 events_tx,
141 events_rx: Some(events_rx),
142 upgrade_monitor,
143 bootstrap_manager,
144 ant_protocol,
145 protocol_task: None,
146 upgrade_exit_code: Arc::new(AtomicI32::new(-1)),
147 };
148
149 Ok(node)
150 }
151
152 fn build_core_config(config: &NodeConfig) -> Result<CoreNodeConfig> {
154 let local = matches!(config.network_mode, NetworkMode::Development);
155
156 let mut core_config = CoreNodeConfig::builder()
157 .port(config.port)
158 .ipv6(!config.ipv4_only)
159 .local(local)
160 .max_message_size(config.max_message_size)
161 .build()
162 .map_err(|e| Error::Config(format!("Failed to create core config: {e}")))?;
163
164 core_config.bootstrap_peers = config
166 .bootstrap
167 .iter()
168 .map(|addr| MultiAddr::quic(*addr))
169 .collect();
170
171 match config.network_mode {
173 NetworkMode::Production => {
174 core_config.diversity_config = Some(CoreDiversityConfig::default());
175 }
176 NetworkMode::Testnet => {
177 core_config.allow_loopback = true;
179 core_config.diversity_config = Some(CoreDiversityConfig {
180 max_per_ip: config.testnet.max_per_ip,
181 max_per_subnet: config.testnet.max_per_subnet,
182 });
183 }
184 NetworkMode::Development => {
185 core_config.diversity_config = Some(CoreDiversityConfig::permissive());
186 }
187 }
188
189 core_config.close_group_cache_dir = Some(
192 config
193 .close_group_cache_dir
194 .clone()
195 .unwrap_or_else(|| config.root_dir.clone()),
196 );
197
198 Ok(core_config)
199 }
200
    /// Determines which node identity to use and updates `config.root_dir`
    /// to the directory that owns it.
    ///
    /// With an explicit (non-default) `root_dir`, the identity is loaded from
    /// or created in that directory. With the default `root_dir`, identities
    /// live in per-peer subdirectories of the shared nodes dir: zero existing
    /// identities means generate a fresh one, exactly one means reuse it, and
    /// more than one is an error the operator must resolve with `--root-dir`.
    async fn resolve_identity(config: &mut NodeConfig) -> Result<NodeIdentity> {
        if config.root_dir != default_root_dir() {
            return Self::load_or_generate_identity(&config.root_dir).await;
        }

        let nodes_dir = default_nodes_dir();
        let identity_dirs = Self::scan_identity_dirs(&nodes_dir)?;

        match identity_dirs.len() {
            0 => {
                // First run: generate an identity and store it under a
                // directory named after its peer id hex.
                let identity = NodeIdentity::generate().map_err(|e| {
                    Error::Startup(format!("Failed to generate node identity: {e}"))
                })?;
                let peer_id = identity.peer_id().to_hex();
                let peer_dir = nodes_dir.join(&peer_id);
                std::fs::create_dir_all(&peer_dir)?;
                identity
                    .save_to_file(&peer_dir.join(NODE_IDENTITY_FILENAME))
                    .await
                    .map_err(|e| Error::Startup(format!("Failed to save node identity: {e}")))?;
                config.root_dir = peer_dir;
                Ok(identity)
            }
            1 => {
                // Exactly one identity on disk: adopt it and its directory.
                let dir = identity_dirs
                    .first()
                    .ok_or_else(|| Error::Config("No identity dirs found".to_string()))?;
                let identity = NodeIdentity::load_from_file(&dir.join(NODE_IDENTITY_FILENAME))
                    .await
                    .map_err(|e| Error::Startup(format!("Failed to load node identity: {e}")))?;
                config.root_dir.clone_from(dir);
                Ok(identity)
            }
            _ => {
                // Ambiguous: refuse to guess which identity the operator wants.
                let dirs: Vec<String> = identity_dirs
                    .iter()
                    .filter_map(|d| d.file_name().map(|n| n.to_string_lossy().into_owned()))
                    .collect();
                Err(Error::Config(format!(
                    "Multiple node identities found at {}: [{}]. Specify --root-dir to select one.",
                    nodes_dir.display(),
                    dirs.join(", ")
                )))
            }
        }
    }
261
262 async fn load_or_generate_identity(dir: &std::path::Path) -> Result<NodeIdentity> {
264 let key_path = dir.join(NODE_IDENTITY_FILENAME);
265 if key_path.exists() {
266 NodeIdentity::load_from_file(&key_path)
267 .await
268 .map_err(|e| Error::Startup(format!("Failed to load node identity: {e}")))
269 } else {
270 let identity = NodeIdentity::generate()
271 .map_err(|e| Error::Startup(format!("Failed to generate node identity: {e}")))?;
272 std::fs::create_dir_all(dir)?;
273 identity
274 .save_to_file(&key_path)
275 .await
276 .map_err(|e| Error::Startup(format!("Failed to save node identity: {e}")))?;
277 Ok(identity)
278 }
279 }
280
281 fn scan_identity_dirs(base_dir: &std::path::Path) -> Result<Vec<PathBuf>> {
283 let mut dirs = Vec::new();
284 let read_dir = match std::fs::read_dir(base_dir) {
285 Ok(rd) => rd,
286 Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(dirs),
287 Err(e) => return Err(e.into()),
288 };
289 for entry in read_dir {
290 let entry = entry?;
291 let path = entry.path();
292 if path.is_dir() && path.join(NODE_IDENTITY_FILENAME).exists() {
293 dirs.push(path);
294 }
295 }
296 Ok(dirs)
297 }
298
299 fn build_upgrade_monitor(config: &NodeConfig, node_id_seed: &[u8]) -> UpgradeMonitor {
300 let mut monitor = UpgradeMonitor::new(
301 config.upgrade.github_repo.clone(),
302 config.upgrade.channel,
303 config.upgrade.check_interval_hours,
304 );
305
306 if let Ok(cache_dir) = upgrade_cache_dir() {
307 monitor = monitor.with_release_cache(ReleaseCache::new(
308 cache_dir,
309 std::time::Duration::from_secs(3600),
310 ));
311 }
312
313 if config.upgrade.staged_rollout_hours > 0 {
314 monitor =
315 monitor.with_staged_rollout(node_id_seed, config.upgrade.staged_rollout_hours);
316 }
317
318 monitor
319 }
320
    /// Assembles the chunk-storage protocol handler: LMDB-backed storage, an
    /// EVM payment verifier, and a quote generator signed with the node's
    /// ML-DSA identity.
    ///
    /// # Errors
    ///
    /// Returns [`Error::Startup`] when storage cannot be created or no rewards
    /// address is configured, and propagates rewards-address parse failures.
    async fn build_ant_protocol(
        config: &NodeConfig,
        identity: &NodeIdentity,
    ) -> Result<AntProtocol> {
        let storage_config = LmdbStorageConfig {
            root_dir: config.root_dir.clone(),
            verify_on_read: config.storage.verify_on_read,
            max_chunks: config.storage.max_chunks,
            // db_size_gb -> bytes (1 GiB = 1_073_741_824), saturating on overflow.
            max_map_size: config.storage.db_size_gb.saturating_mul(1_073_741_824),
        };
        let storage = LmdbStorage::new(storage_config)
            .await
            .map_err(|e| Error::Startup(format!("Failed to create LMDB storage: {e}")))?;

        // Storage always needs a payee address for quotes, regardless of
        // network mode (the `build()` check only covers Production).
        let rewards_address = match config.payment.rewards_address {
            Some(ref addr) => parse_rewards_address(addr)?,
            None => {
                return Err(Error::Startup(
                    "No rewards address configured. Set --rewards-address or payment.rewards_address in config.".to_string(),
                ));
            }
        };

        let evm_network = match config.payment.evm_network {
            EvmNetworkConfig::ArbitrumOne => EvmNetwork::ArbitrumOne,
            EvmNetworkConfig::ArbitrumSepolia => EvmNetwork::ArbitrumSepoliaTest,
        };
        let payment_config = PaymentVerifierConfig {
            evm: EvmVerifierConfig {
                network: evm_network,
            },
            cache_capacity: config.payment.cache_capacity,
            local_rewards_address: rewards_address,
        };
        let payment_verifier = PaymentVerifier::new(payment_config);
        // Capacity used for quoting: how many max-size chunks fit in the
        // per-node storage limit. Truncation is fine on 64-bit targets.
        #[allow(clippy::cast_possible_truncation)]
        let max_records = (NODE_STORAGE_LIMIT_BYTES as usize) / MAX_CHUNK_SIZE;
        // NOTE(review): second argument is presumably the initial record
        // count — confirm against `QuotingMetricsTracker::new`.
        let metrics_tracker = QuotingMetricsTracker::new(max_records, 0);
        let mut quote_generator = QuoteGenerator::new(rewards_address, metrics_tracker);

        // Quotes are signed with the node's ML-DSA identity key.
        crate::payment::wire_ml_dsa_signer(&mut quote_generator, identity)?;

        let protocol = AntProtocol::new(
            Arc::new(storage),
            Arc::new(payment_verifier),
            Arc::new(quote_generator),
        );

        info!(
            "ANT protocol handler initialized with ML-DSA-65 signing (protocol={CHUNK_PROTOCOL_ID})"
        );

        Ok(protocol)
    }
385
386 async fn build_bootstrap_manager(config: &NodeConfig) -> Option<BootstrapManager> {
388 let cache_dir = config
389 .bootstrap_cache
390 .cache_dir
391 .clone()
392 .unwrap_or_else(|| config.root_dir.join("bootstrap_cache"));
393
394 if let Err(e) = std::fs::create_dir_all(&cache_dir) {
396 warn!("Failed to create bootstrap cache directory: {e}");
397 return None;
398 }
399
400 let bootstrap_config = CoreBootstrapConfig {
401 cache_dir,
402 max_peers: config.bootstrap_cache.max_contacts,
403 ..CoreBootstrapConfig::default()
404 };
405
406 match BootstrapManager::with_config(bootstrap_config).await {
407 Ok(manager) => {
408 info!(
409 "Bootstrap cache initialized with {} max contacts",
410 config.bootstrap_cache.max_contacts
411 );
412 Some(manager)
413 }
414 Err(e) => {
415 warn!("Failed to initialize bootstrap cache: {e}");
416 None
417 }
418 }
419 }
420}
421
/// A fully-wired node produced by [`NodeBuilder::build`], ready to run.
pub struct RunningNode {
    // Effective configuration; `root_dir` is pinned to the identity directory.
    config: NodeConfig,
    // Shared P2P transport; also cloned into the protocol-routing task.
    p2p_node: Arc<P2PNode>,
    // Cooperative shutdown signal observed by every background task.
    shutdown: CancellationToken,
    // Broadcast sender for node lifecycle events.
    events_tx: NodeEventsSender,
    // Primary receiver, handed out at most once via `events()`.
    events_rx: Option<NodeEventsChannel>,
    // Taken by `run()` when spawning the background upgrade loop.
    upgrade_monitor: Option<UpgradeMonitor>,
    // Present when the bootstrap cache was enabled and initialized.
    bootstrap_manager: Option<BootstrapManager>,
    // Present when chunk storage is enabled.
    ant_protocol: Option<Arc<AntProtocol>>,
    // Handle of the protocol-routing task; aborted during shutdown.
    protocol_task: Option<JoinHandle<()>>,
    // -1 = no upgrade; >= 0 = exit code to restart with after an upgrade.
    upgrade_exit_code: Arc<AtomicI32>,
}
439
440impl RunningNode {
    /// Directory holding this node's identity and on-disk state.
    #[must_use]
    pub fn root_dir(&self) -> &PathBuf {
        &self.config.root_dir
    }
446
    /// Takes the primary event receiver; returns `None` on every call after
    /// the first. Use `subscribe_events` for additional listeners.
    pub fn events(&mut self) -> Option<NodeEventsChannel> {
        self.events_rx.take()
    }
453
    /// Opens an additional broadcast subscription to node lifecycle events.
    #[must_use]
    pub fn subscribe_events(&self) -> NodeEventsChannel {
        self.events_tx.subscribe()
    }
459
    /// Runs the node until shutdown: starts the P2P transport, wires protocol
    /// routing, spawns the background upgrade loop, then blocks on shutdown
    /// signals and tears everything down.
    ///
    /// When an upgrade was applied, the process exits directly with the
    /// upgrade's exit code (so a service manager restarts the new binary)
    /// instead of returning.
    ///
    /// # Errors
    ///
    /// Returns [`Error::Startup`] when the P2P node fails to start, and
    /// propagates I/O errors from installing signal handlers.
    #[allow(clippy::too_many_lines)]
    pub async fn run(&mut self) -> Result<()> {
        info!("Node runtime loop starting");

        self.p2p_node
            .start()
            .await
            .map_err(|e| Error::Startup(format!("Failed to start P2P node: {e}")))?;

        let listen_addrs = self.p2p_node.listen_addrs().await;
        info!(listen_addrs = ?listen_addrs, "P2P node started");

        // The OS may have assigned a different port; report what was bound,
        // falling back to the configured port when no address is available.
        let actual_port = listen_addrs
            .first()
            .and_then(MultiAddr::port)
            .unwrap_or(self.config.port);
        info!(
            port = actual_port,
            "Node is running on port: {}", actual_port
        );

        if let Err(e) = self.events_tx.send(NodeEvent::Started) {
            warn!("Failed to send Started event: {e}");
        }

        self.start_protocol_routing();

        // Background upgrade loop: periodically checks for releases, applies
        // them, and on success cancels `shutdown` with a restart exit code.
        if let Some(monitor) = self.upgrade_monitor.take() {
            let events_tx = self.events_tx.clone();
            let shutdown = self.shutdown.clone();
            let stop_on_upgrade = self.config.upgrade.stop_on_upgrade;
            let upgrade_exit_code = Arc::clone(&self.upgrade_exit_code);

            tokio::spawn(async move {
                let mut monitor = monitor;
                let mut upgrader = AutoApplyUpgrader::new().with_stop_on_upgrade(stop_on_upgrade);
                if let Ok(cache_dir) = upgrade_cache_dir() {
                    upgrader = upgrader.with_binary_cache(BinaryCache::new(cache_dir));
                }

                // Jitter the first check so a fleet restarting together does
                // not stampede the release endpoint at the same instant.
                {
                    let jitter_duration = jittered_interval(monitor.check_interval());
                    let first_check_time = chrono::Utc::now()
                        + chrono::Duration::from_std(jitter_duration).unwrap_or_else(|e| {
                            warn!("chrono::Duration::from_std failed for jitter ({e}), defaulting to 1 minute");
                            chrono::Duration::minutes(1)
                        });
                    info!(
                        "First upgrade check scheduled for {} (jitter: {}s)",
                        first_check_time.to_rfc3339(),
                        jitter_duration.as_secs()
                    );
                    tokio::time::sleep(jitter_duration).await;
                }

                loop {
                    tokio::select! {
                        () = shutdown.cancelled() => {
                            break;
                        }
                        result = monitor.check_for_ready_upgrade() => {
                            match result {
                                Ok(Some(upgrade_info)) => {
                                    info!(
                                        current_version = %upgrader.current_version(),
                                        new_version = %upgrade_info.version,
                                        "Upgrade available"
                                    );

                                    if let Err(e) = events_tx.send(NodeEvent::UpgradeAvailable {
                                        version: upgrade_info.version.to_string(),
                                    }) {
                                        warn!("Failed to send UpgradeAvailable event: {e}");
                                    }

                                    info!("Starting auto-apply upgrade...");
                                    match upgrader.apply_upgrade(&upgrade_info).await {
                                        Ok(UpgradeResult::Success { version, exit_code }) => {
                                            info!("Upgrade to {} successful, initiating graceful shutdown", version);
                                            // `run()` reads this after shutdown completes and
                                            // exits the process with it for the restart.
                                            upgrade_exit_code.store(exit_code, Ordering::SeqCst);
                                            shutdown.cancel();
                                            break;
                                        }
                                        Ok(UpgradeResult::RolledBack { reason }) => {
                                            warn!("Error during upgrade process: {}", reason);
                                        }
                                        Ok(UpgradeResult::NoUpgrade) => {
                                            info!("Already running latest version");
                                        }
                                        Err(e) => {
                                            error!("Error during upgrade process: {}", e);
                                        }
                                    }
                                }
                                Ok(None) => {
                                    // No upgrade ready; it may still be pending a
                                    // staged-rollout delay.
                                    if let Some(remaining) = monitor.time_until_upgrade() {
                                        info!(
                                            "Upgrade pending, rollout delay remaining: {}m {}s",
                                            remaining.as_secs() / 60,
                                            remaining.as_secs() % 60
                                        );
                                    } else {
                                        info!("No upgrade available");
                                    }
                                }
                                Err(e) => {
                                    warn!("Error during upgrade process: {}", e);
                                }
                            }
                            // Sleep until either the staged-rollout delay elapses
                            // (apply exactly then) or the next jittered periodic check.
                            let sleep_duration = monitor.time_until_upgrade().map_or_else(
                                || {
                                    let jittered_duration =
                                        jittered_interval(monitor.check_interval());
                                    let next_check = chrono::Utc::now()
                                        + chrono::Duration::from_std(jittered_duration).unwrap_or_else(|e| {
                                            warn!("chrono::Duration::from_std failed for interval ({e}), defaulting to 1 hour");
                                            chrono::Duration::hours(1)
                                        });
                                    info!("Next upgrade check scheduled for {}", next_check.to_rfc3339());
                                    jittered_duration
                                },
                                |remaining| {
                                    if remaining.is_zero() {
                                        // Rollout delay elapsed but the apply above did
                                        // not succeed; back off a full interval instead
                                        // of spinning with a zero-length sleep.
                                        let backoff = jittered_interval(monitor.check_interval());
                                        let next_check = chrono::Utc::now()
                                            + chrono::Duration::from_std(backoff).unwrap_or_else(|e| {
                                                warn!("chrono::Duration::from_std failed for backoff ({e}), defaulting to 1 hour");
                                                chrono::Duration::hours(1)
                                            });
                                        info!(
                                            "Upgrade rollout delay elapsed but previous apply did not succeed; \
                                            backing off, next check scheduled for {}",
                                            next_check.to_rfc3339()
                                        );
                                        backoff
                                    } else {
                                        let wake_time = chrono::Utc::now()
                                            + chrono::Duration::from_std(remaining).unwrap_or_else(|e| {
                                                warn!("chrono::Duration::from_std failed for rollout delay ({e}), defaulting to 1 minute");
                                                chrono::Duration::minutes(1)
                                            });
                                        info!("Will apply upgrade at {}", wake_time.to_rfc3339());
                                        remaining
                                    }
                                },
                            );
                            tokio::select! {
                                () = shutdown.cancelled() => {
                                    break;
                                }
                                () = tokio::time::sleep(sleep_duration) => {}
                            }
                        }
                    }
                }
            });
        }

        info!("Node running, waiting for shutdown signal");

        // Blocks until a signal or `shutdown()` cancels the token.
        self.run_event_loop().await?;

        if let Some(ref manager) = self.bootstrap_manager {
            let stats = manager.stats().await;
            info!(
                "Bootstrap cache shutdown: {} peers, avg quality {:.2}",
                stats.total_peers, stats.average_quality
            );
        }

        if let Some(handle) = self.protocol_task.take() {
            handle.abort();
        }

        info!("Shutting down P2P node...");
        if let Err(e) = self.p2p_node.shutdown().await {
            warn!("Error during P2P node shutdown: {e}");
        }

        if let Err(e) = self.events_tx.send(NodeEvent::ShuttingDown) {
            warn!("Failed to send ShuttingDown event: {e}");
        }
        info!("Node shutdown complete");

        // exit_code >= 0 only when an upgrade succeeded; exit the process so
        // the service manager restarts us into the new binary.
        let exit_code = self.upgrade_exit_code.load(Ordering::SeqCst);
        if exit_code >= 0 {
            info!("Exiting with code {} for upgrade restart", exit_code);
            std::process::exit(exit_code);
        }

        Ok(())
    }
682
    /// Blocks until a shutdown trigger: the shared cancellation token,
    /// SIGINT (Ctrl-C), or SIGTERM. SIGHUP is logged and ignored.
    #[cfg(unix)]
    async fn run_event_loop(&self) -> Result<()> {
        let mut sigterm = signal(SignalKind::terminate())?;
        let mut sighup = signal(SignalKind::hangup())?;

        loop {
            tokio::select! {
                () = self.shutdown.cancelled() => {
                    info!("Shutdown signal received");
                    break;
                }
                _ = tokio::signal::ctrl_c() => {
                    info!("Received SIGINT (Ctrl-C), initiating shutdown");
                    self.shutdown();
                    break;
                }
                _ = sigterm.recv() => {
                    info!("Received SIGTERM, initiating shutdown");
                    self.shutdown();
                    break;
                }
                _ = sighup.recv() => {
                    // SIGHUP does not break the loop; keep waiting.
                    info!("Received SIGHUP (config reload not yet supported)");
                }
            }
        }
        Ok(())
    }
712
    /// Non-Unix variant: blocks until the cancellation token fires or Ctrl-C
    /// is received (no SIGTERM/SIGHUP support outside Unix).
    #[cfg(not(unix))]
    async fn run_event_loop(&self) -> Result<()> {
        loop {
            tokio::select! {
                () = self.shutdown.cancelled() => {
                    info!("Shutdown signal received");
                    break;
                }
                _ = tokio::signal::ctrl_c() => {
                    info!("Received Ctrl-C, initiating shutdown");
                    self.shutdown();
                    break;
                }
            }
        }
        Ok(())
    }
731
    /// Spawns the background task that routes inbound P2P messages to the
    /// ANT chunk protocol handler. No-op when chunk storage is disabled.
    ///
    /// A semaphore caps concurrent request handlers at 64 so a burst of
    /// inbound messages cannot spawn unbounded work.
    fn start_protocol_routing(&mut self) {
        let protocol = match self.ant_protocol {
            Some(ref p) => Arc::clone(p),
            None => return,
        };

        let mut events = self.p2p_node.subscribe_events();
        let p2p = Arc::clone(&self.p2p_node);
        let semaphore = Arc::new(Semaphore::new(64));

        self.protocol_task = Some(tokio::spawn(async move {
            while let Ok(event) = events.recv().await {
                if let P2PEvent::Message {
                    topic,
                    source: Some(source),
                    data,
                } = event
                {
                    // Only the chunk protocol is routed today; the
                    // (data_type, response_topic) tuple leaves room for more.
                    let handler_info: Option<(&str, &str)> = if topic == CHUNK_PROTOCOL_ID {
                        Some(("chunk", CHUNK_PROTOCOL_ID))
                    } else {
                        None
                    };

                    if let Some((data_type, response_topic)) = handler_info {
                        debug!("Received {data_type} protocol message from {source}");
                        let protocol = Arc::clone(&protocol);
                        let p2p = Arc::clone(&p2p);
                        let sem = semaphore.clone();
                        tokio::spawn(async move {
                            // Acquire fails only when the semaphore is closed;
                            // bail quietly in that case. The permit is held for
                            // the whole handler via the `_permit` binding.
                            let Ok(_permit) = sem.acquire().await else {
                                return;
                            };
                            let result = match data_type {
                                "chunk" => protocol.try_handle_request(&data).await,
                                _ => return,
                            };
                            match result {
                                Ok(Some(response)) => {
                                    if let Err(e) = p2p
                                        .send_message(
                                            &source,
                                            response_topic,
                                            response.to_vec(),
                                            &[],
                                        )
                                        .await
                                    {
                                        warn!("Failed to send {data_type} protocol response to {source}: {e}");
                                    }
                                }
                                // Ok(None): message required no reply.
                                Ok(None) => {}
                                Err(e) => {
                                    warn!("{data_type} protocol handler error: {e}");
                                }
                            }
                        });
                    }
                }
            }
        }));
        info!("Protocol message routing started");
    }
799
    /// Requests a graceful shutdown; `run()` observes the cancellation and
    /// tears the node down.
    pub fn shutdown(&self) {
        self.shutdown.cancel();
    }
804}
805
806fn jittered_interval(base: std::time::Duration) -> std::time::Duration {
809 let secs = base.as_secs();
810 let variance = secs / 20; if variance == 0 {
812 return base;
813 }
814 let jitter = rand::thread_rng().gen_range(0..=variance * 2);
815 std::time::Duration::from_secs(secs.saturating_sub(variance) + jitter)
816}
817
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
    use super::*;
    use crate::config::NODES_SUBDIR;

    #[test]
    fn test_build_upgrade_monitor_staged_rollout_enabled() {
        let config = NodeConfig {
            upgrade: crate::config::UpgradeConfig {
                staged_rollout_hours: 24,
                ..Default::default()
            },
            ..Default::default()
        };
        let seed = b"node-seed";

        let monitor = NodeBuilder::build_upgrade_monitor(&config, seed);
        assert!(monitor.has_staged_rollout());
    }

    #[test]
    fn test_build_upgrade_monitor_staged_rollout_disabled() {
        // Zero rollout hours must disable staged rollout entirely.
        let config = NodeConfig {
            upgrade: crate::config::UpgradeConfig {
                staged_rollout_hours: 0,
                ..Default::default()
            },
            ..Default::default()
        };
        let seed = b"node-seed";

        let monitor = NodeBuilder::build_upgrade_monitor(&config, seed);
        assert!(!monitor.has_staged_rollout());
    }

    #[test]
    fn test_build_core_config_sets_production_mode() {
        let config = NodeConfig {
            network_mode: NetworkMode::Production,
            ..Default::default()
        };
        let core = NodeBuilder::build_core_config(&config).expect("core config");
        assert!(core.diversity_config.is_some());
    }

    #[test]
    fn test_build_core_config_ipv4_only() {
        let config = NodeConfig {
            ipv4_only: true,
            ..Default::default()
        };
        let core = NodeBuilder::build_core_config(&config).expect("core config");
        assert!(!core.ipv6, "ipv4_only should disable IPv6");
    }

    #[test]
    fn test_build_core_config_dual_stack_by_default() {
        let config = NodeConfig::default();
        let core = NodeBuilder::build_core_config(&config).expect("core config");
        assert!(core.ipv6, "dual-stack should be the default");
    }

    #[test]
    fn test_build_core_config_sets_development_mode_permissive() {
        // Development mode should place no effective IP-diversity limits.
        let config = NodeConfig {
            network_mode: NetworkMode::Development,
            ..Default::default()
        };
        let core = NodeBuilder::build_core_config(&config).expect("core config");
        let diversity = core.diversity_config.expect("diversity");
        assert_eq!(diversity.max_per_ip, Some(usize::MAX));
        assert_eq!(diversity.max_per_subnet, Some(usize::MAX));
    }

    #[test]
    fn test_scan_identity_dirs_empty_dir() {
        let tmp = tempfile::tempdir().unwrap();
        let dirs = NodeBuilder::scan_identity_dirs(tmp.path()).unwrap();
        assert!(dirs.is_empty());
    }

    #[test]
    fn test_scan_identity_dirs_nonexistent_dir() {
        // A missing base directory is not an error: it yields an empty list.
        let tmp = tempfile::tempdir().unwrap();
        let path = tmp.path().join("nonexistent_identity_dir");
        let dirs = NodeBuilder::scan_identity_dirs(&path).unwrap();
        assert!(dirs.is_empty());
    }

    #[test]
    fn test_scan_identity_dirs_finds_one() {
        let tmp = tempfile::tempdir().unwrap();
        let node_dir = tmp.path().join("abc123");
        std::fs::create_dir_all(&node_dir).unwrap();
        std::fs::write(node_dir.join(NODE_IDENTITY_FILENAME), "{}").unwrap();

        let dirs = NodeBuilder::scan_identity_dirs(tmp.path()).unwrap();
        assert_eq!(dirs.len(), 1);
        assert_eq!(dirs[0], node_dir);
    }

    #[test]
    fn test_scan_identity_dirs_finds_multiple() {
        let tmp = tempfile::tempdir().unwrap();
        for name in &["node_a", "node_b"] {
            let dir = tmp.path().join(name);
            std::fs::create_dir_all(&dir).unwrap();
            std::fs::write(dir.join(NODE_IDENTITY_FILENAME), "{}").unwrap();
        }
        // Directories without a key file must be ignored by the scan.
        std::fs::create_dir_all(tmp.path().join("no_key")).unwrap();

        let dirs = NodeBuilder::scan_identity_dirs(tmp.path()).unwrap();
        assert_eq!(dirs.len(), 2);
    }

    #[tokio::test]
    async fn test_resolve_identity_first_run_creates_identity() {
        let tmp = tempfile::tempdir().unwrap();
        let mut config = NodeConfig {
            root_dir: tmp.path().to_path_buf(),
            ..Default::default()
        };

        let identity = NodeBuilder::resolve_identity(&mut config).await.unwrap();
        assert!(tmp.path().join(NODE_IDENTITY_FILENAME).exists());
        // Peer ids are 32 bytes rendered as 64 hex characters.
        let peer_id = identity.peer_id().to_hex();
        assert_eq!(peer_id.len(), 64);
    }

    #[tokio::test]
    async fn test_resolve_identity_loads_existing() {
        let tmp = tempfile::tempdir().unwrap();

        let original = NodeIdentity::generate().unwrap();
        original
            .save_to_file(&tmp.path().join(NODE_IDENTITY_FILENAME))
            .await
            .unwrap();

        let mut config = NodeConfig {
            root_dir: tmp.path().to_path_buf(),
            ..Default::default()
        };

        let loaded = NodeBuilder::resolve_identity(&mut config).await.unwrap();
        assert_eq!(loaded.peer_id(), original.peer_id());
    }

    #[test]
    fn test_peer_id_hex_length() {
        let id = saorsa_core::identity::PeerId::from_bytes([0x42; 32]);
        let hex = id.to_hex();
        assert_eq!(hex.len(), 64);
    }

    #[tokio::test]
    async fn test_identity_persisted_across_restarts() {
        let base_dir = tempfile::tempdir().unwrap();
        let nodes_dir = base_dir.path().join(NODES_SUBDIR);

        // Simulate the first run: save an identity under its peer-id dir.
        let identity1 = NodeIdentity::generate().unwrap();
        let peer_id1 = identity1.peer_id().to_hex();
        let peer_dir = nodes_dir.join(&peer_id1);
        std::fs::create_dir_all(&peer_dir).unwrap();
        identity1
            .save_to_file(&peer_dir.join(NODE_IDENTITY_FILENAME))
            .await
            .unwrap();

        assert_eq!(peer_id1.len(), 64);
        assert_eq!(peer_dir.file_name().unwrap().to_string_lossy(), peer_id1);

        // Simulate a restart: the scan must find and reload the same identity.
        let identity_dirs = NodeBuilder::scan_identity_dirs(&nodes_dir).unwrap();
        assert_eq!(identity_dirs.len(), 1);
        let loaded = NodeIdentity::load_from_file(&identity_dirs[0].join(NODE_IDENTITY_FILENAME))
            .await
            .unwrap();
        let peer_id2 = loaded.peer_id().to_hex();

        assert_eq!(peer_id1, peer_id2, "peer_id must survive restart");
        assert_eq!(
            identity_dirs[0], peer_dir,
            "root_dir must be the same directory"
        );
    }

    #[tokio::test]
    async fn test_multiple_identities_errors() {
        // NOTE(review): despite the name, this only asserts that the scan
        // reports both identities; the "multiple identities" error path of
        // `resolve_identity` relies on the process-global default nodes dir.
        let base_dir = tempfile::tempdir().unwrap();
        let nodes_dir = base_dir.path().join(NODES_SUBDIR);

        for name in &["aaaa", "bbbb"] {
            let dir = nodes_dir.join(name);
            std::fs::create_dir_all(&dir).unwrap();
            let identity = NodeIdentity::generate().unwrap();
            identity
                .save_to_file(&dir.join(NODE_IDENTITY_FILENAME))
                .await
                .unwrap();
        }

        let identity_dirs = NodeBuilder::scan_identity_dirs(&nodes_dir).unwrap();
        assert_eq!(identity_dirs.len(), 2, "should find both identity dirs");
    }

    #[tokio::test]
    async fn test_explicit_root_dir_persists_across_restarts() {
        let tmp = tempfile::tempdir().unwrap();

        let mut config1 = NodeConfig {
            root_dir: tmp.path().to_path_buf(),
            ..Default::default()
        };
        let identity1 = NodeBuilder::resolve_identity(&mut config1).await.unwrap();

        let mut config2 = NodeConfig {
            root_dir: tmp.path().to_path_buf(),
            ..Default::default()
        };
        let identity2 = NodeBuilder::resolve_identity(&mut config2).await.unwrap();

        assert_eq!(
            identity1.peer_id(),
            identity2.peer_id(),
            "explicit --root-dir must yield stable identity"
        );
    }
}