1use crate::ant_protocol::CHUNK_PROTOCOL_ID;
4use crate::config::{
5 default_nodes_dir, default_root_dir, NetworkMode, NodeConfig, NODE_IDENTITY_FILENAME,
6};
7use crate::error::{Error, Result};
8use crate::event::{create_event_channel, NodeEvent, NodeEventsChannel, NodeEventsSender};
9use crate::logging::{debug, error, info, warn};
10use crate::payment::metrics::QuotingMetricsTracker;
11use crate::payment::wallet::parse_rewards_address;
12use crate::payment::{EvmVerifierConfig, PaymentVerifier, PaymentVerifierConfig, QuoteGenerator};
13use crate::replication::config::ReplicationConfig;
14use crate::replication::ReplicationEngine;
15use crate::storage::lmdb::MIB;
16use crate::storage::{AntProtocol, LmdbStorage, LmdbStorageConfig};
17use crate::upgrade::{
18 upgrade_cache_dir, AutoApplyUpgrader, BinaryCache, ReleaseCache, UpgradeMonitor, UpgradeResult,
19};
20use rand::Rng;
21use saorsa_core::identity::NodeIdentity;
22use saorsa_core::{
23 IPDiversityConfig as CoreDiversityConfig, MultiAddr, NodeConfig as CoreNodeConfig, P2PEvent,
24 P2PNode,
25};
26use std::path::PathBuf;
27use std::sync::atomic::{AtomicI32, Ordering};
28use std::sync::Arc;
29use tokio::sync::Semaphore;
30use tokio::task::JoinHandle;
31use tokio_util::sync::CancellationToken;
32
33#[cfg(unix)]
34use tokio::signal::unix::{signal, SignalKind};
35
36pub struct NodeBuilder {
38 config: NodeConfig,
39}
40
41impl NodeBuilder {
42 #[must_use]
44 pub fn new(config: NodeConfig) -> Self {
45 Self { config }
46 }
47
48 fn validate_production_rewards_address(config: &NodeConfig) -> Result<()> {
59 if config.network_mode != NetworkMode::Production {
60 return Ok(());
61 }
62 let configured = config
63 .payment
64 .rewards_address
65 .as_deref()
66 .is_some_and(|addr| !addr.is_empty() && addr != "0xYOUR_ARBITRUM_ADDRESS_HERE");
67 if configured {
68 Ok(())
69 } else {
70 Err(Error::Config(
71 "CRITICAL: Rewards address is not configured. \
72 Set payment.rewards_address in config to your Arbitrum wallet address."
73 .to_string(),
74 ))
75 }
76 }
77
78 pub async fn build(mut self) -> Result<RunningNode> {
84 info!("Building ant-node with config: {:?}", self.config);
85
86 Self::validate_production_rewards_address(&self.config)?;
87
88 let identity = Arc::new(Self::resolve_identity(&mut self.config).await?);
90 let peer_id = identity.peer_id().to_hex();
91
92 info!(peer_id = %peer_id, root_dir = %self.config.root_dir.display(), "Node identity resolved");
93
94 std::fs::create_dir_all(&self.config.root_dir)?;
96
97 let shutdown = CancellationToken::new();
99
100 let (events_tx, events_rx) = create_event_channel();
102
103 let mut core_config = Self::build_core_config(&self.config)?;
105 core_config.node_identity = Some(Arc::clone(&identity));
108 debug!("Core config: {:?}", core_config);
109
110 let p2p_node = P2PNode::new(core_config)
112 .await
113 .map_err(|e| Error::Startup(format!("Failed to create P2P node: {e}")))?;
114
115 let upgrade_monitor = {
117 let node_id_seed = p2p_node.peer_id().as_bytes();
118 Some(Self::build_upgrade_monitor(&self.config, node_id_seed))
119 };
120
121 let repl_config = ReplicationConfig::default();
122
123 let (ant_protocol, fresh_write_rx) = if self.config.storage.enabled {
126 let (fresh_write_tx, fresh_write_rx) = tokio::sync::mpsc::unbounded_channel();
127 let mut protocol =
128 Self::build_ant_protocol(&self.config, &identity, repl_config.close_group_size)
129 .await?;
130 protocol.set_fresh_write_sender(fresh_write_tx);
131 (Some(Arc::new(protocol)), Some(fresh_write_rx))
132 } else {
133 info!("Chunk storage disabled");
134 (None, None)
135 };
136
137 let p2p_arc = Arc::new(p2p_node);
138
139 if let Some(ref protocol) = ant_protocol {
142 protocol.attach_p2p_node(Arc::clone(&p2p_arc));
143 }
144
145 let replication_engine =
147 if let (Some(ref protocol), Some(fresh_rx)) = (&ant_protocol, fresh_write_rx) {
148 let storage_arc = protocol.storage();
149 let payment_verifier_arc = protocol.payment_verifier_arc();
150 match ReplicationEngine::new(
151 repl_config,
152 Arc::clone(&p2p_arc),
153 storage_arc,
154 payment_verifier_arc,
155 Arc::clone(&identity),
156 &self.config.root_dir,
157 fresh_rx,
158 shutdown.clone(),
159 )
160 .await
161 {
162 Ok(engine) => Some(engine),
163 Err(e) => {
164 warn!("Failed to initialize replication engine: {e}");
165 None
166 }
167 }
168 } else {
169 None
170 };
171
172 let node = RunningNode {
173 config: self.config,
174 p2p_node: p2p_arc,
175 shutdown,
176 events_tx,
177 events_rx: Some(events_rx),
178 upgrade_monitor,
179 ant_protocol,
180 replication_engine,
181 protocol_task: None,
182 upgrade_exit_code: Arc::new(AtomicI32::new(-1)),
183 };
184
185 Ok(node)
186 }
187
188 fn build_core_config(config: &NodeConfig) -> Result<CoreNodeConfig> {
190 let local = matches!(config.network_mode, NetworkMode::Development);
191
192 let mut core_config = CoreNodeConfig::builder()
193 .port(config.port)
194 .ipv6(!config.ipv4_only)
195 .local(local)
196 .max_message_size(config.max_message_size)
197 .build()
198 .map_err(|e| Error::Config(format!("Failed to create core config: {e}")))?;
199
200 core_config.bootstrap_peers = config
202 .bootstrap
203 .iter()
204 .map(|addr| MultiAddr::quic(*addr))
205 .collect();
206
207 match config.network_mode {
209 NetworkMode::Production => {
210 core_config.diversity_config = Some(CoreDiversityConfig::default());
211 }
212 NetworkMode::Testnet => {
213 core_config.allow_loopback = true;
215 core_config.diversity_config = Some(CoreDiversityConfig {
216 max_per_ip: config.testnet.max_per_ip,
217 max_per_subnet: config.testnet.max_per_subnet,
218 });
219 }
220 NetworkMode::Development => {
221 core_config.diversity_config = Some(CoreDiversityConfig::permissive());
222 }
223 }
224
225 core_config.close_group_cache_dir = Some(
228 config
229 .close_group_cache_dir
230 .clone()
231 .unwrap_or_else(|| config.root_dir.clone()),
232 );
233
234 Ok(core_config)
235 }
236
237 async fn resolve_identity(config: &mut NodeConfig) -> Result<NodeIdentity> {
251 if config.root_dir != default_root_dir() {
252 return Self::load_or_generate_identity(&config.root_dir).await;
253 }
254
255 let nodes_dir = default_nodes_dir();
256 let identity_dirs = Self::scan_identity_dirs(&nodes_dir)?;
257
258 match identity_dirs.len() {
259 0 => {
260 let identity = NodeIdentity::generate().map_err(|e| {
262 Error::Startup(format!("Failed to generate node identity: {e}"))
263 })?;
264 let peer_id = identity.peer_id().to_hex();
265 let peer_dir = nodes_dir.join(&peer_id);
266 std::fs::create_dir_all(&peer_dir)?;
267 identity
268 .save_to_file(&peer_dir.join(NODE_IDENTITY_FILENAME))
269 .await
270 .map_err(|e| Error::Startup(format!("Failed to save node identity: {e}")))?;
271 config.root_dir = peer_dir;
272 Ok(identity)
273 }
274 1 => {
275 let dir = identity_dirs
276 .first()
277 .ok_or_else(|| Error::Config("No identity dirs found".to_string()))?;
278 let identity = NodeIdentity::load_from_file(&dir.join(NODE_IDENTITY_FILENAME))
279 .await
280 .map_err(|e| Error::Startup(format!("Failed to load node identity: {e}")))?;
281 config.root_dir.clone_from(dir);
282 Ok(identity)
283 }
284 _ => {
285 let dirs: Vec<String> = identity_dirs
286 .iter()
287 .filter_map(|d| d.file_name().map(|n| n.to_string_lossy().into_owned()))
288 .collect();
289 Err(Error::Config(format!(
290 "Multiple node identities found at {}: [{}]. Specify --root-dir to select one.",
291 nodes_dir.display(),
292 dirs.join(", ")
293 )))
294 }
295 }
296 }
297
298 async fn load_or_generate_identity(dir: &std::path::Path) -> Result<NodeIdentity> {
300 let key_path = dir.join(NODE_IDENTITY_FILENAME);
301 if key_path.exists() {
302 NodeIdentity::load_from_file(&key_path)
303 .await
304 .map_err(|e| Error::Startup(format!("Failed to load node identity: {e}")))
305 } else {
306 let identity = NodeIdentity::generate()
307 .map_err(|e| Error::Startup(format!("Failed to generate node identity: {e}")))?;
308 std::fs::create_dir_all(dir)?;
309 identity
310 .save_to_file(&key_path)
311 .await
312 .map_err(|e| Error::Startup(format!("Failed to save node identity: {e}")))?;
313 Ok(identity)
314 }
315 }
316
317 fn scan_identity_dirs(base_dir: &std::path::Path) -> Result<Vec<PathBuf>> {
319 let mut dirs = Vec::new();
320 let read_dir = match std::fs::read_dir(base_dir) {
321 Ok(rd) => rd,
322 Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(dirs),
323 Err(e) => return Err(e.into()),
324 };
325 for entry in read_dir {
326 let entry = entry?;
327 let path = entry.path();
328 if path.is_dir() && path.join(NODE_IDENTITY_FILENAME).exists() {
329 dirs.push(path);
330 }
331 }
332 Ok(dirs)
333 }
334
335 fn build_upgrade_monitor(config: &NodeConfig, node_id_seed: &[u8]) -> UpgradeMonitor {
336 let mut monitor = UpgradeMonitor::new(
337 config.upgrade.github_repo.clone(),
338 config.upgrade.channel,
339 config.upgrade.check_interval_hours,
340 );
341
342 if let Ok(cache_dir) = upgrade_cache_dir() {
343 monitor = monitor.with_release_cache(ReleaseCache::new(
344 cache_dir,
345 std::time::Duration::from_secs(3600),
346 ));
347 }
348
349 if config.upgrade.staged_rollout_hours > 0 {
350 monitor =
351 monitor.with_staged_rollout(node_id_seed, config.upgrade.staged_rollout_hours);
352 }
353
354 monitor
355 }
356
357 async fn build_ant_protocol(
362 config: &NodeConfig,
363 identity: &NodeIdentity,
364 close_group_size: usize,
365 ) -> Result<AntProtocol> {
366 let storage_config = LmdbStorageConfig {
368 root_dir: config.root_dir.clone(),
369 verify_on_read: config.storage.verify_on_read,
370 max_map_size: config.storage.db_size_gb.saturating_mul(1024 * 1024 * 1024),
371 disk_reserve: config.storage.disk_reserve_mb.saturating_mul(MIB),
372 };
373 let storage = LmdbStorage::new(storage_config)
374 .await
375 .map_err(|e| Error::Startup(format!("Failed to create LMDB storage: {e}")))?;
376
377 let rewards_address = match config.payment.rewards_address {
379 Some(ref addr) => parse_rewards_address(addr)?,
380 None => {
381 return Err(Error::Startup(
382 "No rewards address configured. Set --rewards-address or payment.rewards_address in config.".to_string(),
383 ));
384 }
385 };
386
387 let evm_network = config.payment.evm_network.clone().into_evm_network();
389 let payment_config = PaymentVerifierConfig {
390 evm: EvmVerifierConfig {
391 network: evm_network,
392 },
393 cache_capacity: config.payment.cache_capacity,
394 close_group_size,
395 local_rewards_address: rewards_address,
396 };
397 let payment_verifier = PaymentVerifier::new(payment_config);
398 let metrics_tracker = QuotingMetricsTracker::new(0);
399 let mut quote_generator = QuoteGenerator::new(rewards_address, metrics_tracker);
400
401 crate::payment::wire_ml_dsa_signer(&mut quote_generator, identity)?;
404
405 let storage = Arc::new(storage);
406 let payment_verifier = Arc::new(payment_verifier);
407
408 let protocol = AntProtocol::new(storage, payment_verifier, Arc::new(quote_generator));
409
410 info!(
411 "ANT protocol handler initialized with ML-DSA-65 signing (protocol={CHUNK_PROTOCOL_ID})"
412 );
413
414 Ok(protocol)
415 }
416}
417
418pub struct RunningNode {
420 config: NodeConfig,
421 p2p_node: Arc<P2PNode>,
422 shutdown: CancellationToken,
423 events_tx: NodeEventsSender,
424 events_rx: Option<NodeEventsChannel>,
425 upgrade_monitor: Option<UpgradeMonitor>,
426 ant_protocol: Option<Arc<AntProtocol>>,
428 replication_engine: Option<ReplicationEngine>,
430 protocol_task: Option<JoinHandle<()>>,
432 upgrade_exit_code: Arc<AtomicI32>,
434}
435
436impl RunningNode {
437 #[must_use]
439 pub fn root_dir(&self) -> &PathBuf {
440 &self.config.root_dir
441 }
442
443 pub fn events(&mut self) -> Option<NodeEventsChannel> {
447 self.events_rx.take()
448 }
449
450 #[must_use]
452 pub fn subscribe_events(&self) -> NodeEventsChannel {
453 self.events_tx.subscribe()
454 }
455
456 #[allow(clippy::too_many_lines)]
462 pub async fn run(&mut self) -> Result<()> {
463 info!("Node runtime loop starting");
464
465 let dht_events_for_bootstrap = self
469 .replication_engine
470 .as_ref()
471 .map(|_| self.p2p_node.dht_manager().subscribe_events());
472
473 self.p2p_node
475 .start()
476 .await
477 .map_err(|e| Error::Startup(format!("Failed to start P2P node: {e}")))?;
478
479 let listen_addrs = self.p2p_node.listen_addrs().await;
480 info!(listen_addrs = ?listen_addrs, "P2P node started");
481
482 let actual_port = listen_addrs
484 .first()
485 .and_then(MultiAddr::port)
486 .unwrap_or(self.config.port);
487 info!(
488 port = actual_port,
489 "Node is running on port: {}", actual_port
490 );
491
492 if let Err(e) = self.events_tx.send(NodeEvent::Started) {
494 warn!("Failed to send Started event: {e}");
495 }
496
497 self.start_protocol_routing();
499
500 if let Some(ref mut engine) = self.replication_engine {
502 if let Some(dht_events) = dht_events_for_bootstrap {
505 engine.start(dht_events);
506 }
507 info!("Replication engine started");
508 }
509
510 if let Some(monitor) = self.upgrade_monitor.take() {
512 let events_tx = self.events_tx.clone();
513 let shutdown = self.shutdown.clone();
514 let stop_on_upgrade = self.config.upgrade.stop_on_upgrade;
515 let upgrade_exit_code = Arc::clone(&self.upgrade_exit_code);
516
517 tokio::spawn(async move {
518 let mut monitor = monitor;
519 let mut upgrader = AutoApplyUpgrader::new().with_stop_on_upgrade(stop_on_upgrade);
520 if let Ok(cache_dir) = upgrade_cache_dir() {
521 upgrader = upgrader.with_binary_cache(BinaryCache::new(cache_dir));
522 }
523
524 {
527 let jitter_duration = jittered_interval(monitor.check_interval());
528 let first_check_time = chrono::Utc::now()
529 + chrono::Duration::from_std(jitter_duration).unwrap_or_else(|e| {
530 warn!("chrono::Duration::from_std failed for jitter ({e}), defaulting to 1 minute");
531 chrono::Duration::minutes(1)
532 });
533 info!(
534 "First upgrade check scheduled for {} (jitter: {}s)",
535 first_check_time.to_rfc3339(),
536 jitter_duration.as_secs()
537 );
538 tokio::time::sleep(jitter_duration).await;
539 }
540
541 loop {
542 tokio::select! {
543 () = shutdown.cancelled() => {
544 break;
545 }
546 result = monitor.check_for_ready_upgrade() => {
547 match result {
548 Ok(Some(upgrade_info)) => {
549 info!(
550 current_version = %upgrader.current_version(),
551 new_version = %upgrade_info.version,
552 "Upgrade available"
553 );
554
555 if let Err(e) = events_tx.send(NodeEvent::UpgradeAvailable {
557 version: upgrade_info.version.to_string(),
558 }) {
559 warn!("Failed to send UpgradeAvailable event: {e}");
560 }
561
562 info!("Starting auto-apply upgrade...");
564 match upgrader.apply_upgrade(&upgrade_info).await {
565 Ok(UpgradeResult::Success { version, exit_code }) => {
566 info!("Upgrade to {} successful, initiating graceful shutdown", version);
567 upgrade_exit_code.store(exit_code, Ordering::SeqCst);
568 shutdown.cancel();
569 break;
570 }
571 Ok(UpgradeResult::RolledBack { reason }) => {
572 warn!("Error during upgrade process: {}", reason);
573 }
574 Ok(UpgradeResult::NoUpgrade) => {
575 info!("Already running latest version");
576 }
577 Err(e) => {
578 error!("Error during upgrade process: {}", e);
579 }
580 }
581 }
582 Ok(None) => {
583 if let Some(remaining) = monitor.time_until_upgrade() {
584 info!(
585 "Upgrade pending, rollout delay remaining: {}m {}s",
586 remaining.as_secs() / 60,
587 remaining.as_secs() % 60
588 );
589 } else {
590 info!("No upgrade available");
591 }
592 }
593 Err(e) => {
594 warn!("Error during upgrade process: {}", e);
595 }
596 }
597 let sleep_duration = monitor.time_until_upgrade().map_or_else(
601 || {
602 let jittered_duration =
604 jittered_interval(monitor.check_interval());
605 let next_check = chrono::Utc::now()
606 + chrono::Duration::from_std(jittered_duration).unwrap_or_else(|e| {
607 warn!("chrono::Duration::from_std failed for interval ({e}), defaulting to 1 hour");
608 chrono::Duration::hours(1)
609 });
610 info!("Next upgrade check scheduled for {}", next_check.to_rfc3339());
611 jittered_duration
612 },
613 |remaining| {
614 if remaining.is_zero() {
618 let backoff = jittered_interval(monitor.check_interval());
619 let next_check = chrono::Utc::now()
620 + chrono::Duration::from_std(backoff).unwrap_or_else(|e| {
621 warn!("chrono::Duration::from_std failed for backoff ({e}), defaulting to 1 hour");
622 chrono::Duration::hours(1)
623 });
624 info!(
625 "Upgrade rollout delay elapsed but previous apply did not succeed; \
626 backing off, next check scheduled for {}",
627 next_check.to_rfc3339()
628 );
629 backoff
630 } else {
631 let wake_time = chrono::Utc::now()
632 + chrono::Duration::from_std(remaining).unwrap_or_else(|e| {
633 warn!("chrono::Duration::from_std failed for rollout delay ({e}), defaulting to 1 minute");
634 chrono::Duration::minutes(1)
635 });
636 info!("Will apply upgrade at {}", wake_time.to_rfc3339());
637 remaining
638 }
639 },
640 );
641 tokio::select! {
644 () = shutdown.cancelled() => {
645 break;
646 }
647 () = tokio::time::sleep(sleep_duration) => {}
648 }
649 }
650 }
651 }
652 });
653 }
654
655 info!("Node running, waiting for shutdown signal");
656
657 self.run_event_loop().await?;
659
660 if let Some(ref mut engine) = self.replication_engine {
663 engine.shutdown().await;
664 }
665
666 if let Some(handle) = self.protocol_task.take() {
668 handle.abort();
669 }
670
671 info!("Shutting down P2P node...");
673 if let Err(e) = self.p2p_node.shutdown().await {
674 warn!("Error during P2P node shutdown: {e}");
675 }
676
677 if let Err(e) = self.events_tx.send(NodeEvent::ShuttingDown) {
678 warn!("Failed to send ShuttingDown event: {e}");
679 }
680 info!("Node shutdown complete");
681
682 let exit_code = self.upgrade_exit_code.load(Ordering::SeqCst);
686 if exit_code >= 0 {
687 info!("Exiting with code {} for upgrade restart", exit_code);
688 std::process::exit(exit_code);
689 }
690
691 Ok(())
692 }
693
694 #[cfg(unix)]
696 async fn run_event_loop(&self) -> Result<()> {
697 let mut sigterm = signal(SignalKind::terminate())?;
698 let mut sighup = signal(SignalKind::hangup())?;
699
700 loop {
701 tokio::select! {
702 () = self.shutdown.cancelled() => {
703 info!("Shutdown signal received");
704 break;
705 }
706 _ = tokio::signal::ctrl_c() => {
707 info!("Received SIGINT (Ctrl-C), initiating shutdown");
708 self.shutdown();
709 break;
710 }
711 _ = sigterm.recv() => {
712 info!("Received SIGTERM, initiating shutdown");
713 self.shutdown();
714 break;
715 }
716 _ = sighup.recv() => {
717 info!("Received SIGHUP (config reload not yet supported)");
718 }
719 }
720 }
721 Ok(())
722 }
723
724 #[cfg(not(unix))]
726 async fn run_event_loop(&self) -> Result<()> {
727 loop {
728 tokio::select! {
729 () = self.shutdown.cancelled() => {
730 info!("Shutdown signal received");
731 break;
732 }
733 _ = tokio::signal::ctrl_c() => {
734 info!("Received Ctrl-C, initiating shutdown");
735 self.shutdown();
736 break;
737 }
738 }
739 }
740 Ok(())
741 }
742
743 fn start_protocol_routing(&mut self) {
748 let protocol = match self.ant_protocol {
749 Some(ref p) => Arc::clone(p),
750 None => return,
751 };
752
753 let mut events = self.p2p_node.subscribe_events();
754 let p2p = Arc::clone(&self.p2p_node);
755 let semaphore = Arc::new(Semaphore::new(64));
756
757 self.protocol_task = Some(tokio::spawn(async move {
758 while let Ok(event) = events.recv().await {
759 if let P2PEvent::Message {
760 topic,
761 source: Some(source),
762 data,
763 ..
764 } = event
765 {
766 let handler_info: Option<(&str, &str)> = if topic == CHUNK_PROTOCOL_ID {
767 Some(("chunk", CHUNK_PROTOCOL_ID))
768 } else {
769 None
770 };
771
772 if let Some((data_type, response_topic)) = handler_info {
773 debug!("Received {data_type} protocol message from {source}");
774 let protocol = Arc::clone(&protocol);
775 let p2p = Arc::clone(&p2p);
776 let sem = semaphore.clone();
777 tokio::spawn(async move {
778 let Ok(_permit) = sem.acquire().await else {
779 return;
780 };
781 let result = match data_type {
782 "chunk" => protocol.try_handle_request(&data).await,
783 _ => return,
784 };
785 match result {
786 Ok(Some(response)) => {
787 if let Err(e) = p2p
788 .send_message(
789 &source,
790 response_topic,
791 response.to_vec(),
792 &[],
793 )
794 .await
795 {
796 warn!("Failed to send {data_type} protocol response to {source}: {e}");
797 }
798 }
799 Ok(None) => {}
800 Err(e) => {
801 warn!("{data_type} protocol handler error: {e}");
802 }
803 }
804 });
805 }
806 }
807 }
808 }));
809 info!("Protocol message routing started");
810 }
811
812 pub fn shutdown(&self) {
814 self.shutdown.cancel();
815 }
816}
817
818fn jittered_interval(base: std::time::Duration) -> std::time::Duration {
821 let secs = base.as_secs();
822 let variance = secs / 20; if variance == 0 {
824 return base;
825 }
826 let jitter = rand::thread_rng().gen_range(0..=variance * 2);
827 std::time::Duration::from_secs(secs.saturating_sub(variance) + jitter)
828}
829
830#[cfg(test)]
831#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
832mod tests {
833 use super::*;
834 use crate::config::NODES_SUBDIR;
835
836 #[test]
837 fn test_build_upgrade_monitor_staged_rollout_enabled() {
838 let config = NodeConfig {
839 upgrade: crate::config::UpgradeConfig {
840 staged_rollout_hours: 24,
841 ..Default::default()
842 },
843 ..Default::default()
844 };
845 let seed = b"node-seed";
846
847 let monitor = NodeBuilder::build_upgrade_monitor(&config, seed);
848 assert!(monitor.has_staged_rollout());
849 }
850
851 #[test]
852 fn test_build_upgrade_monitor_staged_rollout_disabled() {
853 let config = NodeConfig {
854 upgrade: crate::config::UpgradeConfig {
855 staged_rollout_hours: 0,
856 ..Default::default()
857 },
858 ..Default::default()
859 };
860 let seed = b"node-seed";
861
862 let monitor = NodeBuilder::build_upgrade_monitor(&config, seed);
863 assert!(!monitor.has_staged_rollout());
864 }
865
866 #[test]
867 fn test_build_core_config_sets_production_mode() {
868 let config = NodeConfig {
869 network_mode: NetworkMode::Production,
870 ..Default::default()
871 };
872 let core = NodeBuilder::build_core_config(&config).expect("core config");
873 assert!(core.diversity_config.is_some());
874 }
875
876 #[test]
877 fn test_build_core_config_ipv4_only() {
878 let config = NodeConfig {
879 ipv4_only: true,
880 ..Default::default()
881 };
882 let core = NodeBuilder::build_core_config(&config).expect("core config");
883 assert!(!core.ipv6, "ipv4_only should disable IPv6");
884 }
885
886 #[test]
887 fn test_build_core_config_dual_stack_by_default() {
888 let config = NodeConfig::default();
889 let core = NodeBuilder::build_core_config(&config).expect("core config");
890 assert!(core.ipv6, "dual-stack should be the default");
891 }
892
893 #[test]
894 fn test_build_core_config_sets_development_mode_permissive() {
895 let config = NodeConfig {
896 network_mode: NetworkMode::Development,
897 ..Default::default()
898 };
899 let core = NodeBuilder::build_core_config(&config).expect("core config");
900 let diversity = core.diversity_config.expect("diversity");
901 assert_eq!(diversity.max_per_ip, Some(usize::MAX));
902 assert_eq!(diversity.max_per_subnet, Some(usize::MAX));
903 }
904
905 #[test]
906 fn test_scan_identity_dirs_empty_dir() {
907 let tmp = tempfile::tempdir().unwrap();
908 let dirs = NodeBuilder::scan_identity_dirs(tmp.path()).unwrap();
909 assert!(dirs.is_empty());
910 }
911
912 #[test]
913 fn test_scan_identity_dirs_nonexistent_dir() {
914 let tmp = tempfile::tempdir().unwrap();
915 let path = tmp.path().join("nonexistent_identity_dir");
916 let dirs = NodeBuilder::scan_identity_dirs(&path).unwrap();
917 assert!(dirs.is_empty());
918 }
919
920 #[test]
921 fn test_scan_identity_dirs_finds_one() {
922 let tmp = tempfile::tempdir().unwrap();
923 let node_dir = tmp.path().join("abc123");
924 std::fs::create_dir_all(&node_dir).unwrap();
925 std::fs::write(node_dir.join(NODE_IDENTITY_FILENAME), "{}").unwrap();
926
927 let dirs = NodeBuilder::scan_identity_dirs(tmp.path()).unwrap();
928 assert_eq!(dirs.len(), 1);
929 assert_eq!(dirs[0], node_dir);
930 }
931
932 #[test]
933 fn test_scan_identity_dirs_finds_multiple() {
934 let tmp = tempfile::tempdir().unwrap();
935 for name in &["node_a", "node_b"] {
936 let dir = tmp.path().join(name);
937 std::fs::create_dir_all(&dir).unwrap();
938 std::fs::write(dir.join(NODE_IDENTITY_FILENAME), "{}").unwrap();
939 }
940 std::fs::create_dir_all(tmp.path().join("no_key")).unwrap();
942
943 let dirs = NodeBuilder::scan_identity_dirs(tmp.path()).unwrap();
944 assert_eq!(dirs.len(), 2);
945 }
946
947 #[tokio::test]
948 async fn test_resolve_identity_first_run_creates_identity() {
949 let tmp = tempfile::tempdir().unwrap();
950 let mut config = NodeConfig {
951 root_dir: tmp.path().to_path_buf(),
952 ..Default::default()
953 };
954
955 let identity = NodeBuilder::resolve_identity(&mut config).await.unwrap();
956 assert!(tmp.path().join(NODE_IDENTITY_FILENAME).exists());
958 let peer_id = identity.peer_id().to_hex();
960 assert_eq!(peer_id.len(), 64); }
962
963 #[tokio::test]
964 async fn test_resolve_identity_loads_existing() {
965 let tmp = tempfile::tempdir().unwrap();
966
967 let original = NodeIdentity::generate().unwrap();
969 original
970 .save_to_file(&tmp.path().join(NODE_IDENTITY_FILENAME))
971 .await
972 .unwrap();
973
974 let mut config = NodeConfig {
975 root_dir: tmp.path().to_path_buf(),
976 ..Default::default()
977 };
978
979 let loaded = NodeBuilder::resolve_identity(&mut config).await.unwrap();
980 assert_eq!(loaded.peer_id(), original.peer_id());
981 }
982
983 #[test]
984 fn test_peer_id_hex_length() {
985 let id = saorsa_core::identity::PeerId::from_bytes([0x42; 32]);
986 let hex = id.to_hex();
987 assert_eq!(hex.len(), 64); }
989
990 #[tokio::test]
994 async fn test_identity_persisted_across_restarts() {
995 let base_dir = tempfile::tempdir().unwrap();
996 let nodes_dir = base_dir.path().join(NODES_SUBDIR);
997
998 let identity1 = NodeIdentity::generate().unwrap();
1000 let peer_id1 = identity1.peer_id().to_hex();
1001 let peer_dir = nodes_dir.join(&peer_id1);
1002 std::fs::create_dir_all(&peer_dir).unwrap();
1003 identity1
1004 .save_to_file(&peer_dir.join(NODE_IDENTITY_FILENAME))
1005 .await
1006 .unwrap();
1007
1008 assert_eq!(peer_id1.len(), 64);
1010 assert_eq!(peer_dir.file_name().unwrap().to_string_lossy(), peer_id1);
1011
1012 let identity_dirs = NodeBuilder::scan_identity_dirs(&nodes_dir).unwrap();
1014 assert_eq!(identity_dirs.len(), 1);
1015 let loaded = NodeIdentity::load_from_file(&identity_dirs[0].join(NODE_IDENTITY_FILENAME))
1016 .await
1017 .unwrap();
1018 let peer_id2 = loaded.peer_id().to_hex();
1019
1020 assert_eq!(peer_id1, peer_id2, "peer_id must survive restart");
1021 assert_eq!(
1022 identity_dirs[0], peer_dir,
1023 "root_dir must be the same directory"
1024 );
1025 }
1026
1027 #[tokio::test]
1030 async fn test_multiple_identities_errors() {
1031 let base_dir = tempfile::tempdir().unwrap();
1032 let nodes_dir = base_dir.path().join(NODES_SUBDIR);
1033
1034 for name in &["aaaa", "bbbb"] {
1036 let dir = nodes_dir.join(name);
1037 std::fs::create_dir_all(&dir).unwrap();
1038 let identity = NodeIdentity::generate().unwrap();
1039 identity
1040 .save_to_file(&dir.join(NODE_IDENTITY_FILENAME))
1041 .await
1042 .unwrap();
1043 }
1044
1045 let identity_dirs = NodeBuilder::scan_identity_dirs(&nodes_dir).unwrap();
1046 assert_eq!(identity_dirs.len(), 2, "should find both identity dirs");
1047 }
1048
1049 #[tokio::test]
1052 async fn test_explicit_root_dir_persists_across_restarts() {
1053 let tmp = tempfile::tempdir().unwrap();
1054
1055 let mut config1 = NodeConfig {
1057 root_dir: tmp.path().to_path_buf(),
1058 ..Default::default()
1059 };
1060 let identity1 = NodeBuilder::resolve_identity(&mut config1).await.unwrap();
1061
1062 let mut config2 = NodeConfig {
1064 root_dir: tmp.path().to_path_buf(),
1065 ..Default::default()
1066 };
1067 let identity2 = NodeBuilder::resolve_identity(&mut config2).await.unwrap();
1068
1069 assert_eq!(
1070 identity1.peer_id(),
1071 identity2.peer_id(),
1072 "explicit --root-dir must yield stable identity"
1073 );
1074 }
1075}