1use crate::ant_protocol::CHUNK_PROTOCOL_ID;
4use crate::config::{
5 default_nodes_dir, default_root_dir, NetworkMode, NodeConfig, NODE_IDENTITY_FILENAME,
6};
7use crate::error::{Error, Result};
8use crate::event::{create_event_channel, NodeEvent, NodeEventsChannel, NodeEventsSender};
9use crate::logging::{debug, error, info, warn};
10use crate::payment::metrics::QuotingMetricsTracker;
11use crate::payment::wallet::parse_rewards_address;
12use crate::payment::{EvmVerifierConfig, PaymentVerifier, PaymentVerifierConfig, QuoteGenerator};
13use crate::replication::config::ReplicationConfig;
14use crate::replication::ReplicationEngine;
15use crate::storage::lmdb::MIB;
16use crate::storage::{AntProtocol, LmdbStorage, LmdbStorageConfig};
17use crate::upgrade::{
18 upgrade_cache_dir, AutoApplyUpgrader, BinaryCache, ReleaseCache, UpgradeMonitor, UpgradeResult,
19};
20use rand::Rng;
21use saorsa_core::identity::NodeIdentity;
22use saorsa_core::{
23 BootstrapConfig as CoreBootstrapConfig, BootstrapManager,
24 IPDiversityConfig as CoreDiversityConfig, MultiAddr, NodeConfig as CoreNodeConfig, P2PEvent,
25 P2PNode,
26};
27use std::path::PathBuf;
28use std::sync::atomic::{AtomicI32, Ordering};
29use std::sync::Arc;
30use tokio::sync::Semaphore;
31use tokio::task::JoinHandle;
32use tokio_util::sync::CancellationToken;
33
34#[cfg(unix)]
35use tokio::signal::unix::{signal, SignalKind};
36
/// Builder that assembles every node subsystem from a [`NodeConfig`] and
/// produces a [`RunningNode`] ready to be driven by [`RunningNode::run`].
pub struct NodeBuilder {
    /// Node configuration; `root_dir` may be rewritten during identity
    /// resolution (see `resolve_identity`).
    config: NodeConfig,
}
41
42impl NodeBuilder {
    /// Create a builder from an already-loaded [`NodeConfig`].
    #[must_use]
    pub fn new(config: NodeConfig) -> Self {
        Self { config }
    }
48
49 pub async fn build(mut self) -> Result<RunningNode> {
55 info!("Building ant-node with config: {:?}", self.config);
56
57 if self.config.network_mode == NetworkMode::Production {
59 match self.config.payment.rewards_address {
60 None => {
61 return Err(Error::Config(
62 "CRITICAL: Rewards address is not configured. \
63 Set payment.rewards_address in config to your Arbitrum wallet address."
64 .to_string(),
65 ));
66 }
67 Some(ref addr) if addr == "0xYOUR_ARBITRUM_ADDRESS_HERE" || addr.is_empty() => {
68 return Err(Error::Config(
69 "CRITICAL: Rewards address is not configured. \
70 Set payment.rewards_address in config to your Arbitrum wallet address."
71 .to_string(),
72 ));
73 }
74 Some(_) => {}
75 }
76 }
77
78 let identity = Arc::new(Self::resolve_identity(&mut self.config).await?);
80 let peer_id = identity.peer_id().to_hex();
81
82 info!(peer_id = %peer_id, root_dir = %self.config.root_dir.display(), "Node identity resolved");
83
84 std::fs::create_dir_all(&self.config.root_dir)?;
86
87 let shutdown = CancellationToken::new();
89
90 let (events_tx, events_rx) = create_event_channel();
92
93 let mut core_config = Self::build_core_config(&self.config)?;
95 core_config.node_identity = Some(Arc::clone(&identity));
98 debug!("Core config: {:?}", core_config);
99
100 let p2p_node = P2PNode::new(core_config)
102 .await
103 .map_err(|e| Error::Startup(format!("Failed to create P2P node: {e}")))?;
104
105 let upgrade_monitor = {
107 let node_id_seed = p2p_node.peer_id().as_bytes();
108 Some(Self::build_upgrade_monitor(&self.config, node_id_seed))
109 };
110
111 let bootstrap_manager = if self.config.bootstrap_cache.enabled {
113 Self::build_bootstrap_manager(&self.config).await
114 } else {
115 info!("Bootstrap cache disabled");
116 None
117 };
118
119 let (ant_protocol, fresh_write_rx) = if self.config.storage.enabled {
122 let (fresh_write_tx, fresh_write_rx) = tokio::sync::mpsc::unbounded_channel();
123 let mut protocol = Self::build_ant_protocol(&self.config, &identity).await?;
124 protocol.set_fresh_write_sender(fresh_write_tx);
125 (Some(Arc::new(protocol)), Some(fresh_write_rx))
126 } else {
127 info!("Chunk storage disabled");
128 (None, None)
129 };
130
131 let p2p_arc = Arc::new(p2p_node);
132
133 let replication_engine =
135 if let (Some(ref protocol), Some(fresh_rx)) = (&ant_protocol, fresh_write_rx) {
136 let repl_config = ReplicationConfig::default();
137 let storage_arc = protocol.storage();
138 let payment_verifier_arc = protocol.payment_verifier_arc();
139 match ReplicationEngine::new(
140 repl_config,
141 Arc::clone(&p2p_arc),
142 storage_arc,
143 payment_verifier_arc,
144 &self.config.root_dir,
145 fresh_rx,
146 shutdown.clone(),
147 )
148 .await
149 {
150 Ok(engine) => Some(engine),
151 Err(e) => {
152 warn!("Failed to initialize replication engine: {e}");
153 None
154 }
155 }
156 } else {
157 None
158 };
159
160 let node = RunningNode {
161 config: self.config,
162 p2p_node: p2p_arc,
163 shutdown,
164 events_tx,
165 events_rx: Some(events_rx),
166 upgrade_monitor,
167 bootstrap_manager,
168 ant_protocol,
169 replication_engine,
170 protocol_task: None,
171 upgrade_exit_code: Arc::new(AtomicI32::new(-1)),
172 };
173
174 Ok(node)
175 }
176
177 fn build_core_config(config: &NodeConfig) -> Result<CoreNodeConfig> {
179 let local = matches!(config.network_mode, NetworkMode::Development);
180
181 let mut core_config = CoreNodeConfig::builder()
182 .port(config.port)
183 .ipv6(!config.ipv4_only)
184 .local(local)
185 .max_message_size(config.max_message_size)
186 .build()
187 .map_err(|e| Error::Config(format!("Failed to create core config: {e}")))?;
188
189 core_config.bootstrap_peers = config
191 .bootstrap
192 .iter()
193 .map(|addr| MultiAddr::quic(*addr))
194 .collect();
195
196 match config.network_mode {
198 NetworkMode::Production => {
199 core_config.diversity_config = Some(CoreDiversityConfig::default());
200 }
201 NetworkMode::Testnet => {
202 core_config.allow_loopback = true;
204 core_config.diversity_config = Some(CoreDiversityConfig {
205 max_per_ip: config.testnet.max_per_ip,
206 max_per_subnet: config.testnet.max_per_subnet,
207 });
208 }
209 NetworkMode::Development => {
210 core_config.diversity_config = Some(CoreDiversityConfig::permissive());
211 }
212 }
213
214 core_config.close_group_cache_dir = Some(
217 config
218 .close_group_cache_dir
219 .clone()
220 .unwrap_or_else(|| config.root_dir.clone()),
221 );
222
223 Ok(core_config)
224 }
225
226 async fn resolve_identity(config: &mut NodeConfig) -> Result<NodeIdentity> {
240 if config.root_dir != default_root_dir() {
241 return Self::load_or_generate_identity(&config.root_dir).await;
242 }
243
244 let nodes_dir = default_nodes_dir();
245 let identity_dirs = Self::scan_identity_dirs(&nodes_dir)?;
246
247 match identity_dirs.len() {
248 0 => {
249 let identity = NodeIdentity::generate().map_err(|e| {
251 Error::Startup(format!("Failed to generate node identity: {e}"))
252 })?;
253 let peer_id = identity.peer_id().to_hex();
254 let peer_dir = nodes_dir.join(&peer_id);
255 std::fs::create_dir_all(&peer_dir)?;
256 identity
257 .save_to_file(&peer_dir.join(NODE_IDENTITY_FILENAME))
258 .await
259 .map_err(|e| Error::Startup(format!("Failed to save node identity: {e}")))?;
260 config.root_dir = peer_dir;
261 Ok(identity)
262 }
263 1 => {
264 let dir = identity_dirs
265 .first()
266 .ok_or_else(|| Error::Config("No identity dirs found".to_string()))?;
267 let identity = NodeIdentity::load_from_file(&dir.join(NODE_IDENTITY_FILENAME))
268 .await
269 .map_err(|e| Error::Startup(format!("Failed to load node identity: {e}")))?;
270 config.root_dir.clone_from(dir);
271 Ok(identity)
272 }
273 _ => {
274 let dirs: Vec<String> = identity_dirs
275 .iter()
276 .filter_map(|d| d.file_name().map(|n| n.to_string_lossy().into_owned()))
277 .collect();
278 Err(Error::Config(format!(
279 "Multiple node identities found at {}: [{}]. Specify --root-dir to select one.",
280 nodes_dir.display(),
281 dirs.join(", ")
282 )))
283 }
284 }
285 }
286
287 async fn load_or_generate_identity(dir: &std::path::Path) -> Result<NodeIdentity> {
289 let key_path = dir.join(NODE_IDENTITY_FILENAME);
290 if key_path.exists() {
291 NodeIdentity::load_from_file(&key_path)
292 .await
293 .map_err(|e| Error::Startup(format!("Failed to load node identity: {e}")))
294 } else {
295 let identity = NodeIdentity::generate()
296 .map_err(|e| Error::Startup(format!("Failed to generate node identity: {e}")))?;
297 std::fs::create_dir_all(dir)?;
298 identity
299 .save_to_file(&key_path)
300 .await
301 .map_err(|e| Error::Startup(format!("Failed to save node identity: {e}")))?;
302 Ok(identity)
303 }
304 }
305
306 fn scan_identity_dirs(base_dir: &std::path::Path) -> Result<Vec<PathBuf>> {
308 let mut dirs = Vec::new();
309 let read_dir = match std::fs::read_dir(base_dir) {
310 Ok(rd) => rd,
311 Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(dirs),
312 Err(e) => return Err(e.into()),
313 };
314 for entry in read_dir {
315 let entry = entry?;
316 let path = entry.path();
317 if path.is_dir() && path.join(NODE_IDENTITY_FILENAME).exists() {
318 dirs.push(path);
319 }
320 }
321 Ok(dirs)
322 }
323
324 fn build_upgrade_monitor(config: &NodeConfig, node_id_seed: &[u8]) -> UpgradeMonitor {
325 let mut monitor = UpgradeMonitor::new(
326 config.upgrade.github_repo.clone(),
327 config.upgrade.channel,
328 config.upgrade.check_interval_hours,
329 );
330
331 if let Ok(cache_dir) = upgrade_cache_dir() {
332 monitor = monitor.with_release_cache(ReleaseCache::new(
333 cache_dir,
334 std::time::Duration::from_secs(3600),
335 ));
336 }
337
338 if config.upgrade.staged_rollout_hours > 0 {
339 monitor =
340 monitor.with_staged_rollout(node_id_seed, config.upgrade.staged_rollout_hours);
341 }
342
343 monitor
344 }
345
    /// Build the ANT protocol handler: LMDB-backed chunk storage, an EVM
    /// payment verifier, and a quote generator signed with the node's
    /// ML-DSA identity key.
    ///
    /// # Errors
    ///
    /// Fails when storage cannot be opened, when no rewards address is
    /// configured (or it does not parse), or when the signer cannot be wired.
    async fn build_ant_protocol(
        config: &NodeConfig,
        identity: &NodeIdentity,
    ) -> Result<AntProtocol> {
        let storage_config = LmdbStorageConfig {
            root_dir: config.root_dir.clone(),
            verify_on_read: config.storage.verify_on_read,
            // Config units are GB / MB; convert to bytes without overflowing.
            max_map_size: config.storage.db_size_gb.saturating_mul(1024 * 1024 * 1024),
            disk_reserve: config.storage.disk_reserve_mb.saturating_mul(MIB),
        };
        let storage = LmdbStorage::new(storage_config)
            .await
            .map_err(|e| Error::Startup(format!("Failed to create LMDB storage: {e}")))?;

        // Storage is only enabled together with payments, so a missing
        // rewards address is a hard startup error here.
        let rewards_address = match config.payment.rewards_address {
            Some(ref addr) => parse_rewards_address(addr)?,
            None => {
                return Err(Error::Startup(
                    "No rewards address configured. Set --rewards-address or payment.rewards_address in config.".to_string(),
                ));
            }
        };

        let evm_network = config.payment.evm_network.clone().into_evm_network();
        let payment_config = PaymentVerifierConfig {
            evm: EvmVerifierConfig {
                network: evm_network,
            },
            cache_capacity: config.payment.cache_capacity,
            local_rewards_address: rewards_address,
        };
        let payment_verifier = PaymentVerifier::new(payment_config);
        let metrics_tracker = QuotingMetricsTracker::new(0);
        let mut quote_generator = QuoteGenerator::new(rewards_address, metrics_tracker);

        // Quotes are signed with the node's ML-DSA identity key.
        crate::payment::wire_ml_dsa_signer(&mut quote_generator, identity)?;

        let protocol = AntProtocol::new(
            Arc::new(storage),
            Arc::new(payment_verifier),
            Arc::new(quote_generator),
        );

        info!(
            "ANT protocol handler initialized with ML-DSA-65 signing (protocol={CHUNK_PROTOCOL_ID})"
        );

        Ok(protocol)
    }
404
405 async fn build_bootstrap_manager(config: &NodeConfig) -> Option<BootstrapManager> {
407 let cache_dir = config
408 .bootstrap_cache
409 .cache_dir
410 .clone()
411 .unwrap_or_else(|| config.root_dir.join("bootstrap_cache"));
412
413 if let Err(e) = std::fs::create_dir_all(&cache_dir) {
415 warn!("Failed to create bootstrap cache directory: {e}");
416 return None;
417 }
418
419 let bootstrap_config = CoreBootstrapConfig {
420 cache_dir,
421 max_peers: config.bootstrap_cache.max_contacts,
422 ..CoreBootstrapConfig::default()
423 };
424
425 match BootstrapManager::with_config(bootstrap_config).await {
426 Ok(manager) => {
427 info!(
428 "Bootstrap cache initialized with {} max contacts",
429 config.bootstrap_cache.max_contacts
430 );
431 Some(manager)
432 }
433 Err(e) => {
434 warn!("Failed to initialize bootstrap cache: {e}");
435 None
436 }
437 }
438 }
439}
440
/// A fully-assembled node. Drive it with [`RunningNode::run`]; request a
/// graceful stop from another task via [`RunningNode::shutdown`].
pub struct RunningNode {
    /// Effective configuration (`root_dir` already resolved to the identity dir).
    config: NodeConfig,
    /// Core P2P transport, shared with routing/replication tasks.
    p2p_node: Arc<P2PNode>,
    /// Cooperative cancellation token observed by every background task.
    shutdown: CancellationToken,
    /// Sender side of the node event channel.
    events_tx: NodeEventsSender,
    /// Primary event receiver, handed out once via `events()`.
    events_rx: Option<NodeEventsChannel>,
    /// Taken by `run()` when spawning the background upgrade task.
    upgrade_monitor: Option<UpgradeMonitor>,
    /// Bootstrap peer cache; `None` when disabled or init failed.
    bootstrap_manager: Option<BootstrapManager>,
    /// Chunk protocol handler; `None` when storage is disabled.
    ant_protocol: Option<Arc<AntProtocol>>,
    /// Replication engine; `None` when storage is disabled or init failed.
    replication_engine: Option<ReplicationEngine>,
    /// Handle of the protocol-routing task; aborted during shutdown.
    protocol_task: Option<JoinHandle<()>>,
    /// Exit code requested by a successful self-upgrade; -1 means none.
    upgrade_exit_code: Arc<AtomicI32>,
}
460
461impl RunningNode {
    /// The node's data directory (identity key, storage, caches).
    #[must_use]
    pub fn root_dir(&self) -> &PathBuf {
        &self.config.root_dir
    }
467
    /// Take the primary event receiver.
    ///
    /// Returns `Some` only on the first call; afterwards use
    /// [`Self::subscribe_events`] to obtain additional receivers.
    pub fn events(&mut self) -> Option<NodeEventsChannel> {
        self.events_rx.take()
    }
474
    /// Create an additional event receiver; it only sees events sent after
    /// this call.
    #[must_use]
    pub fn subscribe_events(&self) -> NodeEventsChannel {
        self.events_tx.subscribe()
    }
480
    /// Start the node and block until shutdown.
    ///
    /// Startup ordering matters:
    /// 1. Subscribe to DHT events *before* `start()` so the replication
    ///    engine cannot miss early events.
    /// 2. Start the P2P node and report the actual listen port.
    /// 3. Start protocol routing, replication, and the background upgrade task.
    /// 4. Park in the signal loop until shutdown is requested, then tear
    ///    subsystems down in reverse order.
    ///
    /// # Errors
    ///
    /// Returns an error when the P2P node fails to start or signal handlers
    /// cannot be installed.
    #[allow(clippy::too_many_lines)]
    pub async fn run(&mut self) -> Result<()> {
        info!("Node runtime loop starting");

        // Only needed when a replication engine exists; subscribing here,
        // before start(), guarantees no DHT events are dropped.
        let dht_events_for_bootstrap = self
            .replication_engine
            .as_ref()
            .map(|_| self.p2p_node.dht_manager().subscribe_events());

        self.p2p_node
            .start()
            .await
            .map_err(|e| Error::Startup(format!("Failed to start P2P node: {e}")))?;

        let listen_addrs = self.p2p_node.listen_addrs().await;
        info!(listen_addrs = ?listen_addrs, "P2P node started");

        // Prefer the port we are actually listening on (the OS may have
        // assigned one) over the configured value.
        let actual_port = listen_addrs
            .first()
            .and_then(MultiAddr::port)
            .unwrap_or(self.config.port);
        info!(
            port = actual_port,
            "Node is running on port: {}", actual_port
        );

        if let Err(e) = self.events_tx.send(NodeEvent::Started) {
            warn!("Failed to send Started event: {e}");
        }

        self.start_protocol_routing();

        if let Some(ref mut engine) = self.replication_engine {
            if let Some(dht_events) = dht_events_for_bootstrap {
                engine.start(dht_events);
            }
            info!("Replication engine started");
        }

        // Spawn the background upgrade loop; the monitor is moved out of
        // `self` so the task owns it.
        if let Some(monitor) = self.upgrade_monitor.take() {
            let events_tx = self.events_tx.clone();
            let shutdown = self.shutdown.clone();
            let stop_on_upgrade = self.config.upgrade.stop_on_upgrade;
            let upgrade_exit_code = Arc::clone(&self.upgrade_exit_code);

            tokio::spawn(async move {
                let mut monitor = monitor;
                let mut upgrader = AutoApplyUpgrader::new().with_stop_on_upgrade(stop_on_upgrade);
                if let Ok(cache_dir) = upgrade_cache_dir() {
                    upgrader = upgrader.with_binary_cache(BinaryCache::new(cache_dir));
                }

                // Jitter the first check so a fleet restarted together does
                // not query the release host in lockstep.
                {
                    let jitter_duration = jittered_interval(monitor.check_interval());
                    let first_check_time = chrono::Utc::now()
                        + chrono::Duration::from_std(jitter_duration).unwrap_or_else(|e| {
                            warn!("chrono::Duration::from_std failed for jitter ({e}), defaulting to 1 minute");
                            chrono::Duration::minutes(1)
                        });
                    info!(
                        "First upgrade check scheduled for {} (jitter: {}s)",
                        first_check_time.to_rfc3339(),
                        jitter_duration.as_secs()
                    );
                    tokio::time::sleep(jitter_duration).await;
                }

                loop {
                    tokio::select! {
                        () = shutdown.cancelled() => {
                            break;
                        }
                        result = monitor.check_for_ready_upgrade() => {
                            match result {
                                Ok(Some(upgrade_info)) => {
                                    info!(
                                        current_version = %upgrader.current_version(),
                                        new_version = %upgrade_info.version,
                                        "Upgrade available"
                                    );

                                    if let Err(e) = events_tx.send(NodeEvent::UpgradeAvailable {
                                        version: upgrade_info.version.to_string(),
                                    }) {
                                        warn!("Failed to send UpgradeAvailable event: {e}");
                                    }

                                    info!("Starting auto-apply upgrade...");
                                    match upgrader.apply_upgrade(&upgrade_info).await {
                                        Ok(UpgradeResult::Success { version, exit_code }) => {
                                            info!("Upgrade to {} successful, initiating graceful shutdown", version);
                                            // Publish the exit code first, then cancel so the
                                            // main loop tears down and restarts the binary.
                                            upgrade_exit_code.store(exit_code, Ordering::SeqCst);
                                            shutdown.cancel();
                                            break;
                                        }
                                        Ok(UpgradeResult::RolledBack { reason }) => {
                                            warn!("Error during upgrade process: {}", reason);
                                        }
                                        Ok(UpgradeResult::NoUpgrade) => {
                                            info!("Already running latest version");
                                        }
                                        Err(e) => {
                                            error!("Error during upgrade process: {}", e);
                                        }
                                    }
                                }
                                Ok(None) => {
                                    if let Some(remaining) = monitor.time_until_upgrade() {
                                        info!(
                                            "Upgrade pending, rollout delay remaining: {}m {}s",
                                            remaining.as_secs() / 60,
                                            remaining.as_secs() % 60
                                        );
                                    } else {
                                        info!("No upgrade available");
                                    }
                                }
                                Err(e) => {
                                    warn!("Error during upgrade process: {}", e);
                                }
                            }
                            // Next wake-up: the staged-rollout delay when one is
                            // pending, otherwise a jittered regular interval.
                            let sleep_duration = monitor.time_until_upgrade().map_or_else(
                                || {
                                    let jittered_duration =
                                        jittered_interval(monitor.check_interval());
                                    let next_check = chrono::Utc::now()
                                        + chrono::Duration::from_std(jittered_duration).unwrap_or_else(|e| {
                                            warn!("chrono::Duration::from_std failed for interval ({e}), defaulting to 1 hour");
                                            chrono::Duration::hours(1)
                                        });
                                    info!("Next upgrade check scheduled for {}", next_check.to_rfc3339());
                                    jittered_duration
                                },
                                |remaining| {
                                    if remaining.is_zero() {
                                        // Rollout delay elapsed but the apply above did
                                        // not succeed; back off a full jittered interval.
                                        let backoff = jittered_interval(monitor.check_interval());
                                        let next_check = chrono::Utc::now()
                                            + chrono::Duration::from_std(backoff).unwrap_or_else(|e| {
                                                warn!("chrono::Duration::from_std failed for backoff ({e}), defaulting to 1 hour");
                                                chrono::Duration::hours(1)
                                            });
                                        info!(
                                            "Upgrade rollout delay elapsed but previous apply did not succeed; \
                                             backing off, next check scheduled for {}",
                                            next_check.to_rfc3339()
                                        );
                                        backoff
                                    } else {
                                        let wake_time = chrono::Utc::now()
                                            + chrono::Duration::from_std(remaining).unwrap_or_else(|e| {
                                                warn!("chrono::Duration::from_std failed for rollout delay ({e}), defaulting to 1 minute");
                                                chrono::Duration::minutes(1)
                                            });
                                        info!("Will apply upgrade at {}", wake_time.to_rfc3339());
                                        remaining
                                    }
                                },
                            );
                            tokio::select! {
                                () = shutdown.cancelled() => {
                                    break;
                                }
                                () = tokio::time::sleep(sleep_duration) => {}
                            }
                        }
                    }
                }
            });
        }

        info!("Node running, waiting for shutdown signal");

        self.run_event_loop().await?;

        // --- Shutdown sequence ---
        if let Some(ref manager) = self.bootstrap_manager {
            let stats = manager.stats().await;
            info!(
                "Bootstrap cache shutdown: {} peers, avg quality {:.2}",
                stats.total_peers, stats.average_quality
            );
        }

        if let Some(ref mut engine) = self.replication_engine {
            engine.shutdown().await;
        }

        if let Some(handle) = self.protocol_task.take() {
            handle.abort();
        }

        info!("Shutting down P2P node...");
        if let Err(e) = self.p2p_node.shutdown().await {
            warn!("Error during P2P node shutdown: {e}");
        }

        if let Err(e) = self.events_tx.send(NodeEvent::ShuttingDown) {
            warn!("Failed to send ShuttingDown event: {e}");
        }
        info!("Node shutdown complete");

        // A successful self-upgrade stored a non-negative code; exiting with
        // it signals the supervisor to restart into the new binary.
        let exit_code = self.upgrade_exit_code.load(Ordering::SeqCst);
        if exit_code >= 0 {
            info!("Exiting with code {} for upgrade restart", exit_code);
            std::process::exit(exit_code);
        }

        Ok(())
    }
727
    /// Wait for shutdown on Unix: the internal token, SIGINT (Ctrl-C), or
    /// SIGTERM stop the node; SIGHUP is logged and ignored.
    ///
    /// # Errors
    ///
    /// Fails if the SIGTERM/SIGHUP handlers cannot be installed.
    #[cfg(unix)]
    async fn run_event_loop(&self) -> Result<()> {
        let mut sigterm = signal(SignalKind::terminate())?;
        let mut sighup = signal(SignalKind::hangup())?;

        loop {
            tokio::select! {
                () = self.shutdown.cancelled() => {
                    info!("Shutdown signal received");
                    break;
                }
                _ = tokio::signal::ctrl_c() => {
                    info!("Received SIGINT (Ctrl-C), initiating shutdown");
                    self.shutdown();
                    break;
                }
                _ = sigterm.recv() => {
                    info!("Received SIGTERM, initiating shutdown");
                    self.shutdown();
                    break;
                }
                _ = sighup.recv() => {
                    // Keep running so service managers can send SIGHUP safely.
                    info!("Received SIGHUP (config reload not yet supported)");
                }
            }
        }
        Ok(())
    }
757
    /// Wait for shutdown on non-Unix platforms: the internal token or Ctrl-C.
    #[cfg(not(unix))]
    async fn run_event_loop(&self) -> Result<()> {
        loop {
            tokio::select! {
                () = self.shutdown.cancelled() => {
                    info!("Shutdown signal received");
                    break;
                }
                _ = tokio::signal::ctrl_c() => {
                    info!("Received Ctrl-C, initiating shutdown");
                    self.shutdown();
                    break;
                }
            }
        }
        Ok(())
    }
776
    /// Spawn the task that routes inbound protocol messages to the ANT
    /// handler and sends responses back to the originating peer.
    ///
    /// No-op when storage (and therefore the protocol handler) is disabled.
    /// Each message is handled on its own task, with a semaphore capping
    /// concurrent handlers at 64.
    fn start_protocol_routing(&mut self) {
        let protocol = match self.ant_protocol {
            Some(ref p) => Arc::clone(p),
            None => return,
        };

        let mut events = self.p2p_node.subscribe_events();
        let p2p = Arc::clone(&self.p2p_node);
        // Backpressure: at most 64 requests in flight at once.
        let semaphore = Arc::new(Semaphore::new(64));

        self.protocol_task = Some(tokio::spawn(async move {
            while let Ok(event) = events.recv().await {
                if let P2PEvent::Message {
                    topic,
                    source: Some(source),
                    data,
                } = event
                {
                    // Map topic -> (handler label, response topic). Only the
                    // chunk protocol is routed today; the table shape leaves
                    // room for more protocols.
                    let handler_info: Option<(&str, &str)> = if topic == CHUNK_PROTOCOL_ID {
                        Some(("chunk", CHUNK_PROTOCOL_ID))
                    } else {
                        None
                    };

                    if let Some((data_type, response_topic)) = handler_info {
                        debug!("Received {data_type} protocol message from {source}");
                        let protocol = Arc::clone(&protocol);
                        let p2p = Arc::clone(&p2p);
                        let sem = semaphore.clone();
                        tokio::spawn(async move {
                            // acquire() only fails when the semaphore is
                            // closed; drop the message in that case.
                            let Ok(_permit) = sem.acquire().await else {
                                return;
                            };
                            let result = match data_type {
                                "chunk" => protocol.try_handle_request(&data).await,
                                _ => return,
                            };
                            match result {
                                Ok(Some(response)) => {
                                    if let Err(e) = p2p
                                        .send_message(
                                            &source,
                                            response_topic,
                                            response.to_vec(),
                                            &[],
                                        )
                                        .await
                                    {
                                        warn!("Failed to send {data_type} protocol response to {source}: {e}");
                                    }
                                }
                                // `None`: the request needed no response.
                                Ok(None) => {}
                                Err(e) => {
                                    warn!("{data_type} protocol handler error: {e}");
                                }
                            }
                        });
                    }
                }
            }
        }));
        info!("Protocol message routing started");
    }
844
    /// Request a graceful shutdown; `run()` observes the cancelled token and
    /// tears the node down.
    pub fn shutdown(&self) {
        self.shutdown.cancel();
    }
849}
850
851fn jittered_interval(base: std::time::Duration) -> std::time::Duration {
854 let secs = base.as_secs();
855 let variance = secs / 20; if variance == 0 {
857 return base;
858 }
859 let jitter = rand::thread_rng().gen_range(0..=variance * 2);
860 std::time::Duration::from_secs(secs.saturating_sub(variance) + jitter)
861}
862
863#[cfg(test)]
864#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
865mod tests {
866 use super::*;
867 use crate::config::NODES_SUBDIR;
868
    // Staged rollout must be armed whenever a non-zero window is configured.
    #[test]
    fn test_build_upgrade_monitor_staged_rollout_enabled() {
        let config = NodeConfig {
            upgrade: crate::config::UpgradeConfig {
                staged_rollout_hours: 24,
                ..Default::default()
            },
            ..Default::default()
        };
        let seed = b"node-seed";

        let monitor = NodeBuilder::build_upgrade_monitor(&config, seed);
        assert!(monitor.has_staged_rollout());
    }

    // A zero-hour window must disable staged rollout entirely.
    #[test]
    fn test_build_upgrade_monitor_staged_rollout_disabled() {
        let config = NodeConfig {
            upgrade: crate::config::UpgradeConfig {
                staged_rollout_hours: 0,
                ..Default::default()
            },
            ..Default::default()
        };
        let seed = b"node-seed";

        let monitor = NodeBuilder::build_upgrade_monitor(&config, seed);
        assert!(!monitor.has_staged_rollout());
    }
898
    // Production mode must enable an IP-diversity policy.
    #[test]
    fn test_build_core_config_sets_production_mode() {
        let config = NodeConfig {
            network_mode: NetworkMode::Production,
            ..Default::default()
        };
        let core = NodeBuilder::build_core_config(&config).expect("core config");
        assert!(core.diversity_config.is_some());
    }

    #[test]
    fn test_build_core_config_ipv4_only() {
        let config = NodeConfig {
            ipv4_only: true,
            ..Default::default()
        };
        let core = NodeBuilder::build_core_config(&config).expect("core config");
        assert!(!core.ipv6, "ipv4_only should disable IPv6");
    }

    #[test]
    fn test_build_core_config_dual_stack_by_default() {
        let config = NodeConfig::default();
        let core = NodeBuilder::build_core_config(&config).expect("core config");
        assert!(core.ipv6, "dual-stack should be the default");
    }

    // Development mode uses the permissive diversity policy (no caps).
    #[test]
    fn test_build_core_config_sets_development_mode_permissive() {
        let config = NodeConfig {
            network_mode: NetworkMode::Development,
            ..Default::default()
        };
        let core = NodeBuilder::build_core_config(&config).expect("core config");
        let diversity = core.diversity_config.expect("diversity");
        assert_eq!(diversity.max_per_ip, Some(usize::MAX));
        assert_eq!(diversity.max_per_subnet, Some(usize::MAX));
    }
937
    #[test]
    fn test_scan_identity_dirs_empty_dir() {
        let tmp = tempfile::tempdir().unwrap();
        let dirs = NodeBuilder::scan_identity_dirs(tmp.path()).unwrap();
        assert!(dirs.is_empty());
    }

    // A missing base dir must be treated as "no identities", not an error.
    #[test]
    fn test_scan_identity_dirs_nonexistent_dir() {
        let tmp = tempfile::tempdir().unwrap();
        let path = tmp.path().join("nonexistent_identity_dir");
        let dirs = NodeBuilder::scan_identity_dirs(&path).unwrap();
        assert!(dirs.is_empty());
    }

    #[test]
    fn test_scan_identity_dirs_finds_one() {
        let tmp = tempfile::tempdir().unwrap();
        let node_dir = tmp.path().join("abc123");
        std::fs::create_dir_all(&node_dir).unwrap();
        std::fs::write(node_dir.join(NODE_IDENTITY_FILENAME), "{}").unwrap();

        let dirs = NodeBuilder::scan_identity_dirs(tmp.path()).unwrap();
        assert_eq!(dirs.len(), 1);
        assert_eq!(dirs[0], node_dir);
    }

    #[test]
    fn test_scan_identity_dirs_finds_multiple() {
        let tmp = tempfile::tempdir().unwrap();
        for name in &["node_a", "node_b"] {
            let dir = tmp.path().join(name);
            std::fs::create_dir_all(&dir).unwrap();
            std::fs::write(dir.join(NODE_IDENTITY_FILENAME), "{}").unwrap();
        }
        // A directory without a key file must NOT be counted.
        std::fs::create_dir_all(tmp.path().join("no_key")).unwrap();

        let dirs = NodeBuilder::scan_identity_dirs(tmp.path()).unwrap();
        assert_eq!(dirs.len(), 2);
    }
979
    // First run with an explicit root_dir: key is created directly inside it.
    #[tokio::test]
    async fn test_resolve_identity_first_run_creates_identity() {
        let tmp = tempfile::tempdir().unwrap();
        let mut config = NodeConfig {
            root_dir: tmp.path().to_path_buf(),
            ..Default::default()
        };

        let identity = NodeBuilder::resolve_identity(&mut config).await.unwrap();
        assert!(tmp.path().join(NODE_IDENTITY_FILENAME).exists());
        let peer_id = identity.peer_id().to_hex();
        // 32-byte peer id, hex-encoded.
        assert_eq!(peer_id.len(), 64);
    }

    // A saved key in an explicit root_dir must be loaded, not regenerated.
    #[tokio::test]
    async fn test_resolve_identity_loads_existing() {
        let tmp = tempfile::tempdir().unwrap();

        let original = NodeIdentity::generate().unwrap();
        original
            .save_to_file(&tmp.path().join(NODE_IDENTITY_FILENAME))
            .await
            .unwrap();

        let mut config = NodeConfig {
            root_dir: tmp.path().to_path_buf(),
            ..Default::default()
        };

        let loaded = NodeBuilder::resolve_identity(&mut config).await.unwrap();
        assert_eq!(loaded.peer_id(), original.peer_id());
    }

    #[test]
    fn test_peer_id_hex_length() {
        let id = saorsa_core::identity::PeerId::from_bytes([0x42; 32]);
        let hex = id.to_hex();
        // 32-byte peer id, hex-encoded.
        assert_eq!(hex.len(), 64);
    }
1022
    // Simulates a save/restart/load cycle under the shared nodes dir: the
    // scanned directory and the peer id must be identical across "runs".
    #[tokio::test]
    async fn test_identity_persisted_across_restarts() {
        let base_dir = tempfile::tempdir().unwrap();
        let nodes_dir = base_dir.path().join(NODES_SUBDIR);

        let identity1 = NodeIdentity::generate().unwrap();
        let peer_id1 = identity1.peer_id().to_hex();
        let peer_dir = nodes_dir.join(&peer_id1);
        std::fs::create_dir_all(&peer_dir).unwrap();
        identity1
            .save_to_file(&peer_dir.join(NODE_IDENTITY_FILENAME))
            .await
            .unwrap();

        assert_eq!(peer_id1.len(), 64);
        assert_eq!(peer_dir.file_name().unwrap().to_string_lossy(), peer_id1);

        let identity_dirs = NodeBuilder::scan_identity_dirs(&nodes_dir).unwrap();
        assert_eq!(identity_dirs.len(), 1);
        let loaded = NodeIdentity::load_from_file(&identity_dirs[0].join(NODE_IDENTITY_FILENAME))
            .await
            .unwrap();
        let peer_id2 = loaded.peer_id().to_hex();

        assert_eq!(peer_id1, peer_id2, "peer_id must survive restart");
        assert_eq!(
            identity_dirs[0], peer_dir,
            "root_dir must be the same directory"
        );
    }

    // Validates the precondition of resolve_identity's "multiple identities"
    // error path: the scan reports both dirs. (resolve_identity itself is not
    // called here because it reads the real default nodes dir.)
    #[tokio::test]
    async fn test_multiple_identities_errors() {
        let base_dir = tempfile::tempdir().unwrap();
        let nodes_dir = base_dir.path().join(NODES_SUBDIR);

        for name in &["aaaa", "bbbb"] {
            let dir = nodes_dir.join(name);
            std::fs::create_dir_all(&dir).unwrap();
            let identity = NodeIdentity::generate().unwrap();
            identity
                .save_to_file(&dir.join(NODE_IDENTITY_FILENAME))
                .await
                .unwrap();
        }

        let identity_dirs = NodeBuilder::scan_identity_dirs(&nodes_dir).unwrap();
        assert_eq!(identity_dirs.len(), 2, "should find both identity dirs");
    }

    // Resolving twice against the same explicit root_dir must yield the same
    // identity (generate on the first run, load on the second).
    #[tokio::test]
    async fn test_explicit_root_dir_persists_across_restarts() {
        let tmp = tempfile::tempdir().unwrap();

        let mut config1 = NodeConfig {
            root_dir: tmp.path().to_path_buf(),
            ..Default::default()
        };
        let identity1 = NodeBuilder::resolve_identity(&mut config1).await.unwrap();

        let mut config2 = NodeConfig {
            root_dir: tmp.path().to_path_buf(),
            ..Default::default()
        };
        let identity2 = NodeBuilder::resolve_identity(&mut config2).await.unwrap();

        assert_eq!(
            identity1.peer_id(),
            identity2.peer_id(),
            "explicit --root-dir must yield stable identity"
        );
    }
1108}