1use crate::ant_protocol::CHUNK_PROTOCOL_ID;
4use crate::config::{
5 default_nodes_dir, default_root_dir, EvmNetworkConfig, NetworkMode, NodeConfig,
6 NODE_IDENTITY_FILENAME,
7};
8use crate::error::{Error, Result};
9use crate::event::{create_event_channel, NodeEvent, NodeEventsChannel, NodeEventsSender};
10use crate::logging::{debug, error, info, warn};
11use crate::payment::metrics::QuotingMetricsTracker;
12use crate::payment::wallet::parse_rewards_address;
13use crate::payment::{EvmVerifierConfig, PaymentVerifier, PaymentVerifierConfig, QuoteGenerator};
14use crate::replication::config::ReplicationConfig;
15use crate::replication::ReplicationEngine;
16use crate::storage::lmdb::MIB;
17use crate::storage::{AntProtocol, LmdbStorage, LmdbStorageConfig};
18use crate::upgrade::{
19 upgrade_cache_dir, AutoApplyUpgrader, BinaryCache, ReleaseCache, UpgradeMonitor, UpgradeResult,
20};
21use evmlib::Network as EvmNetwork;
22use rand::Rng;
23use saorsa_core::identity::NodeIdentity;
24use saorsa_core::{
25 BootstrapConfig as CoreBootstrapConfig, BootstrapManager,
26 IPDiversityConfig as CoreDiversityConfig, MultiAddr, NodeConfig as CoreNodeConfig, P2PEvent,
27 P2PNode,
28};
29use std::path::PathBuf;
30use std::sync::atomic::{AtomicI32, Ordering};
31use std::sync::Arc;
32use tokio::sync::Semaphore;
33use tokio::task::JoinHandle;
34use tokio_util::sync::CancellationToken;
35
36#[cfg(unix)]
37use tokio::signal::unix::{signal, SignalKind};
38
/// Builder that assembles a [`RunningNode`] from a [`NodeConfig`].
///
/// Construction is a single async step: [`NodeBuilder::build`] validates
/// the configuration, resolves the on-disk identity, and wires up all
/// optional subsystems.
pub struct NodeBuilder {
    /// Full node configuration; consumed (and possibly amended, e.g.
    /// `root_dir`) by `build()`.
    config: NodeConfig,
}
43
44impl NodeBuilder {
    /// Create a builder over the given node configuration.
    #[must_use]
    pub fn new(config: NodeConfig) -> Self {
        Self { config }
    }
50
    /// Assemble the node: validate payment configuration, resolve the
    /// on-disk identity, construct the P2P core, and initialize the
    /// optional storage / replication / bootstrap / upgrade subsystems.
    ///
    /// Returns a [`RunningNode`] ready to be driven by `run()`.
    ///
    /// # Errors
    ///
    /// Returns [`Error::Config`] when production mode lacks a usable
    /// rewards address, or [`Error::Startup`] when identity resolution or
    /// any subsystem construction fails.
    pub async fn build(mut self) -> Result<RunningNode> {
        info!("Building ant-node with config: {:?}", self.config);

        // Production refuses to start without a real rewards address:
        // earnings would otherwise be unclaimable or go to the docs
        // placeholder value.
        // NOTE(review): the None arm and the placeholder/empty arm return
        // the same error text — consider collapsing into one check.
        if self.config.network_mode == NetworkMode::Production {
            match self.config.payment.rewards_address {
                None => {
                    return Err(Error::Config(
                        "CRITICAL: Rewards address is not configured. \
                        Set payment.rewards_address in config to your Arbitrum wallet address."
                            .to_string(),
                    ));
                }
                Some(ref addr) if addr == "0xYOUR_ARBITRUM_ADDRESS_HERE" || addr.is_empty() => {
                    return Err(Error::Config(
                        "CRITICAL: Rewards address is not configured. \
                        Set payment.rewards_address in config to your Arbitrum wallet address."
                            .to_string(),
                    ));
                }
                Some(_) => {}
            }
        }

        // May rewrite `self.config.root_dir` to point at the per-peer
        // directory (see resolve_identity).
        let identity = Arc::new(Self::resolve_identity(&mut self.config).await?);
        let peer_id = identity.peer_id().to_hex();

        info!(peer_id = %peer_id, root_dir = %self.config.root_dir.display(), "Node identity resolved");

        std::fs::create_dir_all(&self.config.root_dir)?;

        // Single cancellation token shared by every background task.
        let shutdown = CancellationToken::new();

        let (events_tx, events_rx) = create_event_channel();

        let mut core_config = Self::build_core_config(&self.config)?;
        core_config.node_identity = Some(Arc::clone(&identity));
        debug!("Core config: {:?}", core_config);

        let p2p_node = P2PNode::new(core_config)
            .await
            .map_err(|e| Error::Startup(format!("Failed to create P2P node: {e}")))?;

        // Seed staged-rollout scheduling with the peer id so each node
        // gets a stable but distinct upgrade slot.
        let upgrade_monitor = {
            let node_id_seed = p2p_node.peer_id().as_bytes();
            Some(Self::build_upgrade_monitor(&self.config, node_id_seed))
        };

        let bootstrap_manager = if self.config.bootstrap_cache.enabled {
            Self::build_bootstrap_manager(&self.config).await
        } else {
            info!("Bootstrap cache disabled");
            None
        };

        // When storage is enabled, wire a channel so fresh writes can be
        // forwarded to the replication engine constructed below.
        let (ant_protocol, fresh_write_rx) = if self.config.storage.enabled {
            let (fresh_write_tx, fresh_write_rx) = tokio::sync::mpsc::unbounded_channel();
            let mut protocol = Self::build_ant_protocol(&self.config, &identity).await?;
            protocol.set_fresh_write_sender(fresh_write_tx);
            (Some(Arc::new(protocol)), Some(fresh_write_rx))
        } else {
            info!("Chunk storage disabled");
            (None, None)
        };

        let p2p_arc = Arc::new(p2p_node);

        // Replication is best-effort: a failure here degrades the node
        // (warn + None) rather than aborting startup.
        let replication_engine =
            if let (Some(ref protocol), Some(fresh_rx)) = (&ant_protocol, fresh_write_rx) {
                let repl_config = ReplicationConfig::default();
                let storage_arc = protocol.storage();
                let payment_verifier_arc = protocol.payment_verifier_arc();
                match ReplicationEngine::new(
                    repl_config,
                    Arc::clone(&p2p_arc),
                    storage_arc,
                    payment_verifier_arc,
                    &self.config.root_dir,
                    fresh_rx,
                    shutdown.clone(),
                )
                .await
                {
                    Ok(engine) => Some(engine),
                    Err(e) => {
                        warn!("Failed to initialize replication engine: {e}");
                        None
                    }
                }
            } else {
                None
            };

        let node = RunningNode {
            config: self.config,
            p2p_node: p2p_arc,
            shutdown,
            events_tx,
            events_rx: Some(events_rx),
            upgrade_monitor,
            bootstrap_manager,
            ant_protocol,
            replication_engine,
            protocol_task: None,
            // -1 is the sentinel for "no upgrade-driven exit requested".
            upgrade_exit_code: Arc::new(AtomicI32::new(-1)),
        };

        Ok(node)
    }
178
179 fn build_core_config(config: &NodeConfig) -> Result<CoreNodeConfig> {
181 let local = matches!(config.network_mode, NetworkMode::Development);
182
183 let mut core_config = CoreNodeConfig::builder()
184 .port(config.port)
185 .ipv6(!config.ipv4_only)
186 .local(local)
187 .max_message_size(config.max_message_size)
188 .build()
189 .map_err(|e| Error::Config(format!("Failed to create core config: {e}")))?;
190
191 core_config.bootstrap_peers = config
193 .bootstrap
194 .iter()
195 .map(|addr| MultiAddr::quic(*addr))
196 .collect();
197
198 match config.network_mode {
200 NetworkMode::Production => {
201 core_config.diversity_config = Some(CoreDiversityConfig::default());
202 }
203 NetworkMode::Testnet => {
204 core_config.allow_loopback = true;
206 core_config.diversity_config = Some(CoreDiversityConfig {
207 max_per_ip: config.testnet.max_per_ip,
208 max_per_subnet: config.testnet.max_per_subnet,
209 });
210 }
211 NetworkMode::Development => {
212 core_config.diversity_config = Some(CoreDiversityConfig::permissive());
213 }
214 }
215
216 core_config.close_group_cache_dir = Some(
219 config
220 .close_group_cache_dir
221 .clone()
222 .unwrap_or_else(|| config.root_dir.clone()),
223 );
224
225 Ok(core_config)
226 }
227
228 async fn resolve_identity(config: &mut NodeConfig) -> Result<NodeIdentity> {
242 if config.root_dir != default_root_dir() {
243 return Self::load_or_generate_identity(&config.root_dir).await;
244 }
245
246 let nodes_dir = default_nodes_dir();
247 let identity_dirs = Self::scan_identity_dirs(&nodes_dir)?;
248
249 match identity_dirs.len() {
250 0 => {
251 let identity = NodeIdentity::generate().map_err(|e| {
253 Error::Startup(format!("Failed to generate node identity: {e}"))
254 })?;
255 let peer_id = identity.peer_id().to_hex();
256 let peer_dir = nodes_dir.join(&peer_id);
257 std::fs::create_dir_all(&peer_dir)?;
258 identity
259 .save_to_file(&peer_dir.join(NODE_IDENTITY_FILENAME))
260 .await
261 .map_err(|e| Error::Startup(format!("Failed to save node identity: {e}")))?;
262 config.root_dir = peer_dir;
263 Ok(identity)
264 }
265 1 => {
266 let dir = identity_dirs
267 .first()
268 .ok_or_else(|| Error::Config("No identity dirs found".to_string()))?;
269 let identity = NodeIdentity::load_from_file(&dir.join(NODE_IDENTITY_FILENAME))
270 .await
271 .map_err(|e| Error::Startup(format!("Failed to load node identity: {e}")))?;
272 config.root_dir.clone_from(dir);
273 Ok(identity)
274 }
275 _ => {
276 let dirs: Vec<String> = identity_dirs
277 .iter()
278 .filter_map(|d| d.file_name().map(|n| n.to_string_lossy().into_owned()))
279 .collect();
280 Err(Error::Config(format!(
281 "Multiple node identities found at {}: [{}]. Specify --root-dir to select one.",
282 nodes_dir.display(),
283 dirs.join(", ")
284 )))
285 }
286 }
287 }
288
289 async fn load_or_generate_identity(dir: &std::path::Path) -> Result<NodeIdentity> {
291 let key_path = dir.join(NODE_IDENTITY_FILENAME);
292 if key_path.exists() {
293 NodeIdentity::load_from_file(&key_path)
294 .await
295 .map_err(|e| Error::Startup(format!("Failed to load node identity: {e}")))
296 } else {
297 let identity = NodeIdentity::generate()
298 .map_err(|e| Error::Startup(format!("Failed to generate node identity: {e}")))?;
299 std::fs::create_dir_all(dir)?;
300 identity
301 .save_to_file(&key_path)
302 .await
303 .map_err(|e| Error::Startup(format!("Failed to save node identity: {e}")))?;
304 Ok(identity)
305 }
306 }
307
308 fn scan_identity_dirs(base_dir: &std::path::Path) -> Result<Vec<PathBuf>> {
310 let mut dirs = Vec::new();
311 let read_dir = match std::fs::read_dir(base_dir) {
312 Ok(rd) => rd,
313 Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(dirs),
314 Err(e) => return Err(e.into()),
315 };
316 for entry in read_dir {
317 let entry = entry?;
318 let path = entry.path();
319 if path.is_dir() && path.join(NODE_IDENTITY_FILENAME).exists() {
320 dirs.push(path);
321 }
322 }
323 Ok(dirs)
324 }
325
326 fn build_upgrade_monitor(config: &NodeConfig, node_id_seed: &[u8]) -> UpgradeMonitor {
327 let mut monitor = UpgradeMonitor::new(
328 config.upgrade.github_repo.clone(),
329 config.upgrade.channel,
330 config.upgrade.check_interval_hours,
331 );
332
333 if let Ok(cache_dir) = upgrade_cache_dir() {
334 monitor = monitor.with_release_cache(ReleaseCache::new(
335 cache_dir,
336 std::time::Duration::from_secs(3600),
337 ));
338 }
339
340 if config.upgrade.staged_rollout_hours > 0 {
341 monitor =
342 monitor.with_staged_rollout(node_id_seed, config.upgrade.staged_rollout_hours);
343 }
344
345 monitor
346 }
347
    /// Build the ANT chunk protocol handler: LMDB-backed storage, an EVM
    /// payment verifier, and a quote generator signed with this node's
    /// ML-DSA identity key.
    ///
    /// # Errors
    ///
    /// Fails when storage cannot be opened, when no rewards address is
    /// configured or it fails to parse, or when signer wiring fails.
    async fn build_ant_protocol(
        config: &NodeConfig,
        identity: &NodeIdentity,
    ) -> Result<AntProtocol> {
        let storage_config = LmdbStorageConfig {
            root_dir: config.root_dir.clone(),
            verify_on_read: config.storage.verify_on_read,
            // GiB -> bytes; saturating so absurd configs can't overflow.
            max_map_size: config.storage.db_size_gb.saturating_mul(1024 * 1024 * 1024),
            // MiB -> bytes.
            disk_reserve: config.storage.disk_reserve_mb.saturating_mul(MIB),
        };
        let storage = LmdbStorage::new(storage_config)
            .await
            .map_err(|e| Error::Startup(format!("Failed to create LMDB storage: {e}")))?;

        // A rewards address is mandatory whenever storage (and therefore
        // quoting/payments) is enabled, regardless of network mode.
        let rewards_address = match config.payment.rewards_address {
            Some(ref addr) => parse_rewards_address(addr)?,
            None => {
                return Err(Error::Startup(
                    "No rewards address configured. Set --rewards-address or payment.rewards_address in config.".to_string(),
                ));
            }
        };

        let evm_network = match config.payment.evm_network {
            EvmNetworkConfig::ArbitrumOne => EvmNetwork::ArbitrumOne,
            EvmNetworkConfig::ArbitrumSepolia => EvmNetwork::ArbitrumSepoliaTest,
        };
        let payment_config = PaymentVerifierConfig {
            evm: EvmVerifierConfig {
                network: evm_network,
            },
            cache_capacity: config.payment.cache_capacity,
            local_rewards_address: rewards_address,
        };
        let payment_verifier = PaymentVerifier::new(payment_config);
        let metrics_tracker = QuotingMetricsTracker::new(0);
        let mut quote_generator = QuoteGenerator::new(rewards_address, metrics_tracker);

        // Quotes are signed with the node's ML-DSA identity key.
        crate::payment::wire_ml_dsa_signer(&mut quote_generator, identity)?;

        let protocol = AntProtocol::new(
            Arc::new(storage),
            Arc::new(payment_verifier),
            Arc::new(quote_generator),
        );

        info!(
            "ANT protocol handler initialized with ML-DSA-65 signing (protocol={CHUNK_PROTOCOL_ID})"
        );

        Ok(protocol)
    }
409
410 async fn build_bootstrap_manager(config: &NodeConfig) -> Option<BootstrapManager> {
412 let cache_dir = config
413 .bootstrap_cache
414 .cache_dir
415 .clone()
416 .unwrap_or_else(|| config.root_dir.join("bootstrap_cache"));
417
418 if let Err(e) = std::fs::create_dir_all(&cache_dir) {
420 warn!("Failed to create bootstrap cache directory: {e}");
421 return None;
422 }
423
424 let bootstrap_config = CoreBootstrapConfig {
425 cache_dir,
426 max_peers: config.bootstrap_cache.max_contacts,
427 ..CoreBootstrapConfig::default()
428 };
429
430 match BootstrapManager::with_config(bootstrap_config).await {
431 Ok(manager) => {
432 info!(
433 "Bootstrap cache initialized with {} max contacts",
434 config.bootstrap_cache.max_contacts
435 );
436 Some(manager)
437 }
438 Err(e) => {
439 warn!("Failed to initialize bootstrap cache: {e}");
440 None
441 }
442 }
443 }
444}
445
/// A fully-assembled node: the P2P core plus optional subsystems, driven
/// by [`RunningNode::run`] until shutdown.
pub struct RunningNode {
    /// Effective configuration (`root_dir` may have been rewritten during
    /// build to the per-peer directory).
    config: NodeConfig,
    /// Shared handle to the saorsa-core P2P node.
    p2p_node: Arc<P2PNode>,
    /// Cooperative shutdown token observed by all background tasks.
    shutdown: CancellationToken,
    /// Sender side of the node lifecycle event channel.
    events_tx: NodeEventsSender,
    /// Primary event receiver, handed out once via `events()`.
    events_rx: Option<NodeEventsChannel>,
    /// Upgrade monitor; taken by `run()` when spawning the upgrade task.
    upgrade_monitor: Option<UpgradeMonitor>,
    /// Bootstrap peer cache, present when enabled and initialized.
    bootstrap_manager: Option<BootstrapManager>,
    /// ANT chunk protocol handler, present when storage is enabled.
    ant_protocol: Option<Arc<AntProtocol>>,
    /// Chunk replication engine, present when storage is enabled and its
    /// initialization succeeded.
    replication_engine: Option<ReplicationEngine>,
    /// Handle of the protocol-routing task, aborted on shutdown.
    protocol_task: Option<JoinHandle<()>>,
    /// Exit code requested by a successful upgrade; -1 means "none".
    upgrade_exit_code: Arc<AtomicI32>,
}
465
466impl RunningNode {
    /// The node's root data directory (identity, storage, caches).
    #[must_use]
    pub fn root_dir(&self) -> &PathBuf {
        &self.config.root_dir
    }
472
    /// Take the primary event receiver; returns `None` on every call
    /// after the first. Use [`Self::subscribe_events`] for additional
    /// receivers.
    pub fn events(&mut self) -> Option<NodeEventsChannel> {
        self.events_rx.take()
    }
479
    /// Create a new, independent subscription to node lifecycle events.
    #[must_use]
    pub fn subscribe_events(&self) -> NodeEventsChannel {
        self.events_tx.subscribe()
    }
485
    /// Drive the node: start the P2P core, protocol routing, replication
    /// and the background upgrade task, then block until a shutdown
    /// trigger and unwind all subsystems in order.
    ///
    /// If an upgrade was applied during the run, the process exits with
    /// the upgrade's requested exit code instead of returning.
    ///
    /// # Errors
    ///
    /// Returns an error if the P2P node fails to start or signal
    /// handlers cannot be installed.
    #[allow(clippy::too_many_lines)]
    pub async fn run(&mut self) -> Result<()> {
        info!("Node runtime loop starting");

        // Subscribe to DHT events *before* starting the node so the
        // replication engine doesn't miss early events.
        let dht_events_for_bootstrap = self
            .replication_engine
            .as_ref()
            .map(|_| self.p2p_node.dht_manager().subscribe_events());

        self.p2p_node
            .start()
            .await
            .map_err(|e| Error::Startup(format!("Failed to start P2P node: {e}")))?;

        let listen_addrs = self.p2p_node.listen_addrs().await;
        info!(listen_addrs = ?listen_addrs, "P2P node started");

        // Report the actually bound port (configured port may be 0 /
        // unavailable); fall back to the configured one if no listener.
        let actual_port = listen_addrs
            .first()
            .and_then(MultiAddr::port)
            .unwrap_or(self.config.port);
        info!(
            port = actual_port,
            "Node is running on port: {}", actual_port
        );

        if let Err(e) = self.events_tx.send(NodeEvent::Started) {
            warn!("Failed to send Started event: {e}");
        }

        self.start_protocol_routing();

        if let Some(ref mut engine) = self.replication_engine {
            if let Some(dht_events) = dht_events_for_bootstrap {
                engine.start(dht_events);
            }
            info!("Replication engine started");
        }

        // Background upgrade loop: periodically check for releases,
        // auto-apply, and request a restart via the shared exit code.
        if let Some(monitor) = self.upgrade_monitor.take() {
            let events_tx = self.events_tx.clone();
            let shutdown = self.shutdown.clone();
            let stop_on_upgrade = self.config.upgrade.stop_on_upgrade;
            let upgrade_exit_code = Arc::clone(&self.upgrade_exit_code);

            tokio::spawn(async move {
                let mut monitor = monitor;
                let mut upgrader = AutoApplyUpgrader::new().with_stop_on_upgrade(stop_on_upgrade);
                if let Ok(cache_dir) = upgrade_cache_dir() {
                    upgrader = upgrader.with_binary_cache(BinaryCache::new(cache_dir));
                }

                // Jitter the first check so a fleet restarting together
                // doesn't hit the release endpoint in lockstep.
                {
                    let jitter_duration = jittered_interval(monitor.check_interval());
                    let first_check_time = chrono::Utc::now()
                        + chrono::Duration::from_std(jitter_duration).unwrap_or_else(|e| {
                            warn!("chrono::Duration::from_std failed for jitter ({e}), defaulting to 1 minute");
                            chrono::Duration::minutes(1)
                        });
                    info!(
                        "First upgrade check scheduled for {} (jitter: {}s)",
                        first_check_time.to_rfc3339(),
                        jitter_duration.as_secs()
                    );
                    tokio::time::sleep(jitter_duration).await;
                }

                loop {
                    tokio::select! {
                        () = shutdown.cancelled() => {
                            break;
                        }
                        result = monitor.check_for_ready_upgrade() => {
                            match result {
                                Ok(Some(upgrade_info)) => {
                                    info!(
                                        current_version = %upgrader.current_version(),
                                        new_version = %upgrade_info.version,
                                        "Upgrade available"
                                    );

                                    if let Err(e) = events_tx.send(NodeEvent::UpgradeAvailable {
                                        version: upgrade_info.version.to_string(),
                                    }) {
                                        warn!("Failed to send UpgradeAvailable event: {e}");
                                    }

                                    info!("Starting auto-apply upgrade...");
                                    match upgrader.apply_upgrade(&upgrade_info).await {
                                        Ok(UpgradeResult::Success { version, exit_code }) => {
                                            // Store the exit code so run() can
                                            // exit the process for a restart.
                                            info!("Upgrade to {} successful, initiating graceful shutdown", version);
                                            upgrade_exit_code.store(exit_code, Ordering::SeqCst);
                                            shutdown.cancel();
                                            break;
                                        }
                                        Ok(UpgradeResult::RolledBack { reason }) => {
                                            // NOTE(review): message text says
                                            // "Error during upgrade process" but
                                            // this arm is a rollback — consider
                                            // clarifying the wording.
                                            warn!("Error during upgrade process: {}", reason);
                                        }
                                        Ok(UpgradeResult::NoUpgrade) => {
                                            info!("Already running latest version");
                                        }
                                        Err(e) => {
                                            error!("Error during upgrade process: {}", e);
                                        }
                                    }
                                }
                                Ok(None) => {
                                    if let Some(remaining) = monitor.time_until_upgrade() {
                                        info!(
                                            "Upgrade pending, rollout delay remaining: {}m {}s",
                                            remaining.as_secs() / 60,
                                            remaining.as_secs() % 60
                                        );
                                    } else {
                                        info!("No upgrade available");
                                    }
                                }
                                Err(e) => {
                                    warn!("Error during upgrade process: {}", e);
                                }
                            }
                            // Choose the next wake-up: normal jittered
                            // interval when nothing is pending, the rollout
                            // remainder when one is, or a jittered backoff
                            // when the rollout delay elapsed but applying
                            // did not succeed.
                            let sleep_duration = monitor.time_until_upgrade().map_or_else(
                                || {
                                    let jittered_duration =
                                        jittered_interval(monitor.check_interval());
                                    let next_check = chrono::Utc::now()
                                        + chrono::Duration::from_std(jittered_duration).unwrap_or_else(|e| {
                                            warn!("chrono::Duration::from_std failed for interval ({e}), defaulting to 1 hour");
                                            chrono::Duration::hours(1)
                                        });
                                    info!("Next upgrade check scheduled for {}", next_check.to_rfc3339());
                                    jittered_duration
                                },
                                |remaining| {
                                    if remaining.is_zero() {
                                        let backoff = jittered_interval(monitor.check_interval());
                                        let next_check = chrono::Utc::now()
                                            + chrono::Duration::from_std(backoff).unwrap_or_else(|e| {
                                                warn!("chrono::Duration::from_std failed for backoff ({e}), defaulting to 1 hour");
                                                chrono::Duration::hours(1)
                                            });
                                        info!(
                                            "Upgrade rollout delay elapsed but previous apply did not succeed; \
                                            backing off, next check scheduled for {}",
                                            next_check.to_rfc3339()
                                        );
                                        backoff
                                    } else {
                                        let wake_time = chrono::Utc::now()
                                            + chrono::Duration::from_std(remaining).unwrap_or_else(|e| {
                                                warn!("chrono::Duration::from_std failed for rollout delay ({e}), defaulting to 1 minute");
                                                chrono::Duration::minutes(1)
                                            });
                                        info!("Will apply upgrade at {}", wake_time.to_rfc3339());
                                        remaining
                                    }
                                },
                            );
                            // Sleep, but wake immediately on shutdown.
                            tokio::select! {
                                () = shutdown.cancelled() => {
                                    break;
                                }
                                () = tokio::time::sleep(sleep_duration) => {}
                            }
                        }
                    }
                }
            });
        }

        info!("Node running, waiting for shutdown signal");

        // Blocks until cancellation or an OS signal.
        self.run_event_loop().await?;

        // --- ordered shutdown sequence ---
        if let Some(ref manager) = self.bootstrap_manager {
            let stats = manager.stats().await;
            info!(
                "Bootstrap cache shutdown: {} peers, avg quality {:.2}",
                stats.total_peers, stats.average_quality
            );
        }

        if let Some(ref mut engine) = self.replication_engine {
            engine.shutdown().await;
        }

        if let Some(handle) = self.protocol_task.take() {
            handle.abort();
        }

        info!("Shutting down P2P node...");
        if let Err(e) = self.p2p_node.shutdown().await {
            warn!("Error during P2P node shutdown: {e}");
        }

        if let Err(e) = self.events_tx.send(NodeEvent::ShuttingDown) {
            warn!("Failed to send ShuttingDown event: {e}");
        }
        info!("Node shutdown complete");

        // A non-negative code means an upgrade was applied: exit so the
        // service manager restarts us into the new binary.
        let exit_code = self.upgrade_exit_code.load(Ordering::SeqCst);
        if exit_code >= 0 {
            info!("Exiting with code {} for upgrade restart", exit_code);
            std::process::exit(exit_code);
        }

        Ok(())
    }
732
    /// Block until a shutdown trigger: internal cancellation, SIGINT,
    /// or SIGTERM. SIGHUP is logged and otherwise ignored.
    #[cfg(unix)]
    async fn run_event_loop(&self) -> Result<()> {
        let mut sigterm = signal(SignalKind::terminate())?;
        let mut sighup = signal(SignalKind::hangup())?;

        loop {
            tokio::select! {
                () = self.shutdown.cancelled() => {
                    info!("Shutdown signal received");
                    break;
                }
                _ = tokio::signal::ctrl_c() => {
                    info!("Received SIGINT (Ctrl-C), initiating shutdown");
                    self.shutdown();
                    break;
                }
                _ = sigterm.recv() => {
                    info!("Received SIGTERM, initiating shutdown");
                    self.shutdown();
                    break;
                }
                _ = sighup.recv() => {
                    // Acknowledged only; config reload is not implemented.
                    info!("Received SIGHUP (config reload not yet supported)");
                }
            }
        }
        Ok(())
    }
762
    /// Block until a shutdown trigger: internal cancellation or Ctrl-C
    /// (non-Unix platforms have no SIGTERM/SIGHUP handling).
    #[cfg(not(unix))]
    async fn run_event_loop(&self) -> Result<()> {
        loop {
            tokio::select! {
                () = self.shutdown.cancelled() => {
                    info!("Shutdown signal received");
                    break;
                }
                _ = tokio::signal::ctrl_c() => {
                    info!("Received Ctrl-C, initiating shutdown");
                    self.shutdown();
                    break;
                }
            }
        }
        Ok(())
    }
781
    /// Spawn the background task that routes inbound protocol messages
    /// to the ANT chunk handler and replies on the same topic.
    ///
    /// No-op when storage (and hence the protocol handler) is disabled.
    fn start_protocol_routing(&mut self) {
        let protocol = match self.ant_protocol {
            Some(ref p) => Arc::clone(p),
            None => return,
        };

        let mut events = self.p2p_node.subscribe_events();
        let p2p = Arc::clone(&self.p2p_node);
        // Bound concurrent request handling to 64 in-flight messages.
        let semaphore = Arc::new(Semaphore::new(64));

        self.protocol_task = Some(tokio::spawn(async move {
            while let Ok(event) = events.recv().await {
                // Only messages with a known source can be replied to.
                if let P2PEvent::Message {
                    topic,
                    source: Some(source),
                    data,
                } = event
                {
                    // Map topic -> (handler label, response topic); only
                    // the chunk protocol is routed today.
                    let handler_info: Option<(&str, &str)> = if topic == CHUNK_PROTOCOL_ID {
                        Some(("chunk", CHUNK_PROTOCOL_ID))
                    } else {
                        None
                    };

                    if let Some((data_type, response_topic)) = handler_info {
                        debug!("Received {data_type} protocol message from {source}");
                        let protocol = Arc::clone(&protocol);
                        let p2p = Arc::clone(&p2p);
                        let sem = semaphore.clone();
                        // Each request is handled on its own task so slow
                        // handlers don't stall the event loop.
                        tokio::spawn(async move {
                            // acquire() only errs if the semaphore is
                            // closed; drop the message in that case.
                            let Ok(_permit) = sem.acquire().await else {
                                return;
                            };
                            let result = match data_type {
                                "chunk" => protocol.try_handle_request(&data).await,
                                _ => return,
                            };
                            match result {
                                Ok(Some(response)) => {
                                    if let Err(e) = p2p
                                        .send_message(
                                            &source,
                                            response_topic,
                                            response.to_vec(),
                                            &[],
                                        )
                                        .await
                                    {
                                        warn!("Failed to send {data_type} protocol response to {source}: {e}");
                                    }
                                }
                                // Ok(None): request requires no reply.
                                Ok(None) => {}
                                Err(e) => {
                                    warn!("{data_type} protocol handler error: {e}");
                                }
                            }
                        });
                    }
                }
            }
        }));
        info!("Protocol message routing started");
    }
849
    /// Request a graceful shutdown; every task watching the token
    /// unwinds cooperatively.
    pub fn shutdown(&self) {
        self.shutdown.cancel();
    }
854}
855
856fn jittered_interval(base: std::time::Duration) -> std::time::Duration {
859 let secs = base.as_secs();
860 let variance = secs / 20; if variance == 0 {
862 return base;
863 }
864 let jitter = rand::thread_rng().gen_range(0..=variance * 2);
865 std::time::Duration::from_secs(secs.saturating_sub(variance) + jitter)
866}
867
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
    use super::*;
    use crate::config::NODES_SUBDIR;

    // Staged rollout should be armed when a non-zero hour budget is set.
    #[test]
    fn test_build_upgrade_monitor_staged_rollout_enabled() {
        let config = NodeConfig {
            upgrade: crate::config::UpgradeConfig {
                staged_rollout_hours: 24,
                ..Default::default()
            },
            ..Default::default()
        };
        let seed = b"node-seed";

        let monitor = NodeBuilder::build_upgrade_monitor(&config, seed);
        assert!(monitor.has_staged_rollout());
    }

    // staged_rollout_hours == 0 must disable rollout spreading.
    #[test]
    fn test_build_upgrade_monitor_staged_rollout_disabled() {
        let config = NodeConfig {
            upgrade: crate::config::UpgradeConfig {
                staged_rollout_hours: 0,
                ..Default::default()
            },
            ..Default::default()
        };
        let seed = b"node-seed";

        let monitor = NodeBuilder::build_upgrade_monitor(&config, seed);
        assert!(!monitor.has_staged_rollout());
    }

    // Production mode must install peer-diversity limits.
    #[test]
    fn test_build_core_config_sets_production_mode() {
        let config = NodeConfig {
            network_mode: NetworkMode::Production,
            ..Default::default()
        };
        let core = NodeBuilder::build_core_config(&config).expect("core config");
        assert!(core.diversity_config.is_some());
    }

    #[test]
    fn test_build_core_config_ipv4_only() {
        let config = NodeConfig {
            ipv4_only: true,
            ..Default::default()
        };
        let core = NodeBuilder::build_core_config(&config).expect("core config");
        assert!(!core.ipv6, "ipv4_only should disable IPv6");
    }

    #[test]
    fn test_build_core_config_dual_stack_by_default() {
        let config = NodeConfig::default();
        let core = NodeBuilder::build_core_config(&config).expect("core config");
        assert!(core.ipv6, "dual-stack should be the default");
    }

    // Development mode uses the permissive diversity config (no limits).
    #[test]
    fn test_build_core_config_sets_development_mode_permissive() {
        let config = NodeConfig {
            network_mode: NetworkMode::Development,
            ..Default::default()
        };
        let core = NodeBuilder::build_core_config(&config).expect("core config");
        let diversity = core.diversity_config.expect("diversity");
        assert_eq!(diversity.max_per_ip, Some(usize::MAX));
        assert_eq!(diversity.max_per_subnet, Some(usize::MAX));
    }

    #[test]
    fn test_scan_identity_dirs_empty_dir() {
        let tmp = tempfile::tempdir().unwrap();
        let dirs = NodeBuilder::scan_identity_dirs(tmp.path()).unwrap();
        assert!(dirs.is_empty());
    }

    // A missing base dir is "no identities", not an error.
    #[test]
    fn test_scan_identity_dirs_nonexistent_dir() {
        let tmp = tempfile::tempdir().unwrap();
        let path = tmp.path().join("nonexistent_identity_dir");
        let dirs = NodeBuilder::scan_identity_dirs(&path).unwrap();
        assert!(dirs.is_empty());
    }

    #[test]
    fn test_scan_identity_dirs_finds_one() {
        let tmp = tempfile::tempdir().unwrap();
        let node_dir = tmp.path().join("abc123");
        std::fs::create_dir_all(&node_dir).unwrap();
        std::fs::write(node_dir.join(NODE_IDENTITY_FILENAME), "{}").unwrap();

        let dirs = NodeBuilder::scan_identity_dirs(tmp.path()).unwrap();
        assert_eq!(dirs.len(), 1);
        assert_eq!(dirs[0], node_dir);
    }

    // Directories without an identity file must be ignored.
    #[test]
    fn test_scan_identity_dirs_finds_multiple() {
        let tmp = tempfile::tempdir().unwrap();
        for name in &["node_a", "node_b"] {
            let dir = tmp.path().join(name);
            std::fs::create_dir_all(&dir).unwrap();
            std::fs::write(dir.join(NODE_IDENTITY_FILENAME), "{}").unwrap();
        }
        std::fs::create_dir_all(tmp.path().join("no_key")).unwrap();

        let dirs = NodeBuilder::scan_identity_dirs(tmp.path()).unwrap();
        assert_eq!(dirs.len(), 2);
    }

    // A non-default root_dir takes the load-or-generate path and writes
    // the identity file directly into that directory.
    #[tokio::test]
    async fn test_resolve_identity_first_run_creates_identity() {
        let tmp = tempfile::tempdir().unwrap();
        let mut config = NodeConfig {
            root_dir: tmp.path().to_path_buf(),
            ..Default::default()
        };

        let identity = NodeBuilder::resolve_identity(&mut config).await.unwrap();
        assert!(tmp.path().join(NODE_IDENTITY_FILENAME).exists());
        let peer_id = identity.peer_id().to_hex();
        // 32-byte peer id -> 64 hex characters.
        assert_eq!(peer_id.len(), 64);
    }

    #[tokio::test]
    async fn test_resolve_identity_loads_existing() {
        let tmp = tempfile::tempdir().unwrap();

        let original = NodeIdentity::generate().unwrap();
        original
            .save_to_file(&tmp.path().join(NODE_IDENTITY_FILENAME))
            .await
            .unwrap();

        let mut config = NodeConfig {
            root_dir: tmp.path().to_path_buf(),
            ..Default::default()
        };

        let loaded = NodeBuilder::resolve_identity(&mut config).await.unwrap();
        assert_eq!(loaded.peer_id(), original.peer_id());
    }

    #[test]
    fn test_peer_id_hex_length() {
        let id = saorsa_core::identity::PeerId::from_bytes([0x42; 32]);
        let hex = id.to_hex();
        // 32 bytes -> 64 hex characters.
        assert_eq!(hex.len(), 64);
    }

    // Simulates a restart: an identity persisted under the shared nodes
    // dir must be rediscovered with the same peer id and directory.
    #[tokio::test]
    async fn test_identity_persisted_across_restarts() {
        let base_dir = tempfile::tempdir().unwrap();
        let nodes_dir = base_dir.path().join(NODES_SUBDIR);

        let identity1 = NodeIdentity::generate().unwrap();
        let peer_id1 = identity1.peer_id().to_hex();
        let peer_dir = nodes_dir.join(&peer_id1);
        std::fs::create_dir_all(&peer_dir).unwrap();
        identity1
            .save_to_file(&peer_dir.join(NODE_IDENTITY_FILENAME))
            .await
            .unwrap();

        assert_eq!(peer_id1.len(), 64);
        assert_eq!(peer_dir.file_name().unwrap().to_string_lossy(), peer_id1);

        let identity_dirs = NodeBuilder::scan_identity_dirs(&nodes_dir).unwrap();
        assert_eq!(identity_dirs.len(), 1);
        let loaded = NodeIdentity::load_from_file(&identity_dirs[0].join(NODE_IDENTITY_FILENAME))
            .await
            .unwrap();
        let peer_id2 = loaded.peer_id().to_hex();

        assert_eq!(peer_id1, peer_id2, "peer_id must survive restart");
        assert_eq!(
            identity_dirs[0], peer_dir,
            "root_dir must be the same directory"
        );
    }

    // NOTE(review): the name suggests resolve_identity's error path, but
    // only the scan precondition (two identity dirs found) is asserted —
    // presumably because resolve_identity reads the real default nodes
    // dir, which a test can't safely redirect. Confirm intent.
    #[tokio::test]
    async fn test_multiple_identities_errors() {
        let base_dir = tempfile::tempdir().unwrap();
        let nodes_dir = base_dir.path().join(NODES_SUBDIR);

        for name in &["aaaa", "bbbb"] {
            let dir = nodes_dir.join(name);
            std::fs::create_dir_all(&dir).unwrap();
            let identity = NodeIdentity::generate().unwrap();
            identity
                .save_to_file(&dir.join(NODE_IDENTITY_FILENAME))
                .await
                .unwrap();
        }

        let identity_dirs = NodeBuilder::scan_identity_dirs(&nodes_dir).unwrap();
        assert_eq!(identity_dirs.len(), 2, "should find both identity dirs");
    }

    // Two resolve calls against the same explicit root_dir must yield
    // the same identity (generate once, load thereafter).
    #[tokio::test]
    async fn test_explicit_root_dir_persists_across_restarts() {
        let tmp = tempfile::tempdir().unwrap();

        let mut config1 = NodeConfig {
            root_dir: tmp.path().to_path_buf(),
            ..Default::default()
        };
        let identity1 = NodeBuilder::resolve_identity(&mut config1).await.unwrap();

        let mut config2 = NodeConfig {
            root_dir: tmp.path().to_path_buf(),
            ..Default::default()
        };
        let identity2 = NodeBuilder::resolve_identity(&mut config2).await.unwrap();

        assert_eq!(
            identity1.peer_id(),
            identity2.peer_id(),
            "explicit --root-dir must yield stable identity"
        );
    }
}
1113}