1use crate::ant_protocol::CHUNK_PROTOCOL_ID;
4use crate::config::{
5 default_nodes_dir, default_root_dir, NetworkMode, NodeConfig, NODE_IDENTITY_FILENAME,
6};
7use crate::error::{Error, Result};
8use crate::event::{create_event_channel, NodeEvent, NodeEventsChannel, NodeEventsSender};
9use crate::logging::{debug, error, info, warn};
10use crate::payment::metrics::QuotingMetricsTracker;
11use crate::payment::wallet::parse_rewards_address;
12use crate::payment::{EvmVerifierConfig, PaymentVerifier, PaymentVerifierConfig, QuoteGenerator};
13use crate::replication::config::ReplicationConfig;
14use crate::replication::ReplicationEngine;
15use crate::storage::lmdb::MIB;
16use crate::storage::{AntProtocol, LmdbStorage, LmdbStorageConfig};
17use crate::upgrade::{
18 upgrade_cache_dir, AutoApplyUpgrader, BinaryCache, ReleaseCache, UpgradeMonitor, UpgradeResult,
19};
20use rand::Rng;
21use saorsa_core::identity::NodeIdentity;
22use saorsa_core::{
23 BootstrapConfig as CoreBootstrapConfig, BootstrapManager,
24 IPDiversityConfig as CoreDiversityConfig, MultiAddr, NodeConfig as CoreNodeConfig, P2PEvent,
25 P2PNode,
26};
27use std::path::PathBuf;
28use std::sync::atomic::{AtomicI32, Ordering};
29use std::sync::Arc;
30use tokio::sync::Semaphore;
31use tokio::task::JoinHandle;
32use tokio_util::sync::CancellationToken;
33
34#[cfg(unix)]
35use tokio::signal::unix::{signal, SignalKind};
36
/// Builder that assembles a [`RunningNode`] from a validated [`NodeConfig`].
pub struct NodeBuilder {
    // Node configuration; `build()` may rewrite `root_dir` when resolving
    // the node identity.
    config: NodeConfig,
}
41
42impl NodeBuilder {
43 #[must_use]
45 pub fn new(config: NodeConfig) -> Self {
46 Self { config }
47 }
48
    /// Consumes the builder and assembles a [`RunningNode`].
    ///
    /// Sequence: validate the rewards address (mandatory in production),
    /// resolve or create the node identity, construct the core P2P node,
    /// then optionally wire up the upgrade monitor, bootstrap cache,
    /// chunk-storage protocol, and replication engine.
    ///
    /// # Errors
    /// Returns [`Error::Config`] for invalid configuration and
    /// [`Error::Startup`] when a subsystem fails to initialize.
    pub async fn build(mut self) -> Result<RunningNode> {
        info!("Building ant-node with config: {:?}", self.config);

        // Production nodes refuse to start without a configured rewards
        // address; the template placeholder and empty string count as
        // unconfigured.
        if self.config.network_mode == NetworkMode::Production {
            match self.config.payment.rewards_address {
                None => {
                    return Err(Error::Config(
                        "CRITICAL: Rewards address is not configured. \
                        Set payment.rewards_address in config to your Arbitrum wallet address."
                            .to_string(),
                    ));
                }
                Some(ref addr) if addr == "0xYOUR_ARBITRUM_ADDRESS_HERE" || addr.is_empty() => {
                    return Err(Error::Config(
                        "CRITICAL: Rewards address is not configured. \
                        Set payment.rewards_address in config to your Arbitrum wallet address."
                            .to_string(),
                    ));
                }
                Some(_) => {}
            }
        }

        // NOTE: may rewrite self.config.root_dir to a per-identity
        // directory (see resolve_identity).
        let identity = Arc::new(Self::resolve_identity(&mut self.config).await?);
        let peer_id = identity.peer_id().to_hex();

        info!(peer_id = %peer_id, root_dir = %self.config.root_dir.display(), "Node identity resolved");

        std::fs::create_dir_all(&self.config.root_dir)?;

        // Single cancellation token shared by every background task.
        let shutdown = CancellationToken::new();

        let (events_tx, events_rx) = create_event_channel();

        let mut core_config = Self::build_core_config(&self.config)?;
        core_config.node_identity = Some(Arc::clone(&identity));
        debug!("Core config: {:?}", core_config);

        let p2p_node = P2PNode::new(core_config)
            .await
            .map_err(|e| Error::Startup(format!("Failed to create P2P node: {e}")))?;

        // The peer id bytes seed the (optional) staged-rollout schedule so
        // each node gets a stable, node-specific upgrade offset.
        let upgrade_monitor = {
            let node_id_seed = p2p_node.peer_id().as_bytes();
            Some(Self::build_upgrade_monitor(&self.config, node_id_seed))
        };

        let bootstrap_manager = if self.config.bootstrap_cache.enabled {
            Self::build_bootstrap_manager(&self.config).await
        } else {
            info!("Bootstrap cache disabled");
            None
        };

        // Storage is optional; when enabled, fresh writes are forwarded to
        // the replication engine through an unbounded channel.
        let (ant_protocol, fresh_write_rx) = if self.config.storage.enabled {
            let (fresh_write_tx, fresh_write_rx) = tokio::sync::mpsc::unbounded_channel();
            let mut protocol = Self::build_ant_protocol(&self.config, &identity).await?;
            protocol.set_fresh_write_sender(fresh_write_tx);
            (Some(Arc::new(protocol)), Some(fresh_write_rx))
        } else {
            info!("Chunk storage disabled");
            (None, None)
        };

        let p2p_arc = Arc::new(p2p_node);

        // Give the payment verifier a handle to the P2P node.
        if let Some(ref protocol) = ant_protocol {
            protocol
                .payment_verifier_arc()
                .attach_p2p_node(Arc::clone(&p2p_arc));
        }

        // Replication requires storage; a failure here is non-fatal so the
        // node can still serve requests without background replication.
        let replication_engine =
            if let (Some(ref protocol), Some(fresh_rx)) = (&ant_protocol, fresh_write_rx) {
                let repl_config = ReplicationConfig::default();
                let storage_arc = protocol.storage();
                let payment_verifier_arc = protocol.payment_verifier_arc();
                match ReplicationEngine::new(
                    repl_config,
                    Arc::clone(&p2p_arc),
                    storage_arc,
                    payment_verifier_arc,
                    &self.config.root_dir,
                    fresh_rx,
                    shutdown.clone(),
                )
                .await
                {
                    Ok(engine) => Some(engine),
                    Err(e) => {
                        warn!("Failed to initialize replication engine: {e}");
                        None
                    }
                }
            } else {
                None
            };

        let node = RunningNode {
            config: self.config,
            p2p_node: p2p_arc,
            shutdown,
            events_tx,
            events_rx: Some(events_rx),
            upgrade_monitor,
            bootstrap_manager,
            ant_protocol,
            replication_engine,
            protocol_task: None,
            // -1 means "no upgrade restart requested" (see RunningNode::run).
            upgrade_exit_code: Arc::new(AtomicI32::new(-1)),
        };

        Ok(node)
    }
185
186 fn build_core_config(config: &NodeConfig) -> Result<CoreNodeConfig> {
188 let local = matches!(config.network_mode, NetworkMode::Development);
189
190 let mut core_config = CoreNodeConfig::builder()
191 .port(config.port)
192 .ipv6(!config.ipv4_only)
193 .local(local)
194 .max_message_size(config.max_message_size)
195 .build()
196 .map_err(|e| Error::Config(format!("Failed to create core config: {e}")))?;
197
198 core_config.bootstrap_peers = config
200 .bootstrap
201 .iter()
202 .map(|addr| MultiAddr::quic(*addr))
203 .collect();
204
205 match config.network_mode {
207 NetworkMode::Production => {
208 core_config.diversity_config = Some(CoreDiversityConfig::default());
209 }
210 NetworkMode::Testnet => {
211 core_config.allow_loopback = true;
213 core_config.diversity_config = Some(CoreDiversityConfig {
214 max_per_ip: config.testnet.max_per_ip,
215 max_per_subnet: config.testnet.max_per_subnet,
216 });
217 }
218 NetworkMode::Development => {
219 core_config.diversity_config = Some(CoreDiversityConfig::permissive());
220 }
221 }
222
223 core_config.close_group_cache_dir = Some(
226 config
227 .close_group_cache_dir
228 .clone()
229 .unwrap_or_else(|| config.root_dir.clone()),
230 );
231
232 Ok(core_config)
233 }
234
    /// Determines which node identity to use and where its files live.
    ///
    /// If the caller supplied an explicit (non-default) `root_dir`, the
    /// identity is loaded from — or created in — that directory. Otherwise
    /// the default nodes directory is scanned:
    /// - no identity found: generate one in a fresh per-peer-id subdirectory,
    /// - exactly one: load it and point `config.root_dir` at its directory,
    /// - several: error; the operator must disambiguate with `--root-dir`.
    async fn resolve_identity(config: &mut NodeConfig) -> Result<NodeIdentity> {
        if config.root_dir != default_root_dir() {
            return Self::load_or_generate_identity(&config.root_dir).await;
        }

        let nodes_dir = default_nodes_dir();
        let identity_dirs = Self::scan_identity_dirs(&nodes_dir)?;

        match identity_dirs.len() {
            0 => {
                let identity = NodeIdentity::generate().map_err(|e| {
                    Error::Startup(format!("Failed to generate node identity: {e}"))
                })?;
                // The directory is named after the hex peer id so several
                // node identities can coexist under the nodes dir.
                let peer_id = identity.peer_id().to_hex();
                let peer_dir = nodes_dir.join(&peer_id);
                std::fs::create_dir_all(&peer_dir)?;
                identity
                    .save_to_file(&peer_dir.join(NODE_IDENTITY_FILENAME))
                    .await
                    .map_err(|e| Error::Startup(format!("Failed to save node identity: {e}")))?;
                config.root_dir = peer_dir;
                Ok(identity)
            }
            1 => {
                let dir = identity_dirs
                    .first()
                    .ok_or_else(|| Error::Config("No identity dirs found".to_string()))?;
                let identity = NodeIdentity::load_from_file(&dir.join(NODE_IDENTITY_FILENAME))
                    .await
                    .map_err(|e| Error::Startup(format!("Failed to load node identity: {e}")))?;
                config.root_dir.clone_from(dir);
                Ok(identity)
            }
            _ => {
                // Ambiguous: report every candidate directory by name.
                let dirs: Vec<String> = identity_dirs
                    .iter()
                    .filter_map(|d| d.file_name().map(|n| n.to_string_lossy().into_owned()))
                    .collect();
                Err(Error::Config(format!(
                    "Multiple node identities found at {}: [{}]. Specify --root-dir to select one.",
                    nodes_dir.display(),
                    dirs.join(", ")
                )))
            }
        }
    }
295
296 async fn load_or_generate_identity(dir: &std::path::Path) -> Result<NodeIdentity> {
298 let key_path = dir.join(NODE_IDENTITY_FILENAME);
299 if key_path.exists() {
300 NodeIdentity::load_from_file(&key_path)
301 .await
302 .map_err(|e| Error::Startup(format!("Failed to load node identity: {e}")))
303 } else {
304 let identity = NodeIdentity::generate()
305 .map_err(|e| Error::Startup(format!("Failed to generate node identity: {e}")))?;
306 std::fs::create_dir_all(dir)?;
307 identity
308 .save_to_file(&key_path)
309 .await
310 .map_err(|e| Error::Startup(format!("Failed to save node identity: {e}")))?;
311 Ok(identity)
312 }
313 }
314
315 fn scan_identity_dirs(base_dir: &std::path::Path) -> Result<Vec<PathBuf>> {
317 let mut dirs = Vec::new();
318 let read_dir = match std::fs::read_dir(base_dir) {
319 Ok(rd) => rd,
320 Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(dirs),
321 Err(e) => return Err(e.into()),
322 };
323 for entry in read_dir {
324 let entry = entry?;
325 let path = entry.path();
326 if path.is_dir() && path.join(NODE_IDENTITY_FILENAME).exists() {
327 dirs.push(path);
328 }
329 }
330 Ok(dirs)
331 }
332
333 fn build_upgrade_monitor(config: &NodeConfig, node_id_seed: &[u8]) -> UpgradeMonitor {
334 let mut monitor = UpgradeMonitor::new(
335 config.upgrade.github_repo.clone(),
336 config.upgrade.channel,
337 config.upgrade.check_interval_hours,
338 );
339
340 if let Ok(cache_dir) = upgrade_cache_dir() {
341 monitor = monitor.with_release_cache(ReleaseCache::new(
342 cache_dir,
343 std::time::Duration::from_secs(3600),
344 ));
345 }
346
347 if config.upgrade.staged_rollout_hours > 0 {
348 monitor =
349 monitor.with_staged_rollout(node_id_seed, config.upgrade.staged_rollout_hours);
350 }
351
352 monitor
353 }
354
    /// Constructs the ANT chunk protocol handler: LMDB-backed storage, a
    /// payment verifier for the configured EVM network, and a quote
    /// generator signed with the node's identity key.
    ///
    /// # Errors
    /// Fails if storage cannot be created, the rewards address is missing
    /// or unparsable, or the ML-DSA signer cannot be wired up.
    async fn build_ant_protocol(
        config: &NodeConfig,
        identity: &NodeIdentity,
    ) -> Result<AntProtocol> {
        let storage_config = LmdbStorageConfig {
            root_dir: config.root_dir.clone(),
            verify_on_read: config.storage.verify_on_read,
            // Config values are in GiB / MiB; convert to bytes with
            // saturating math so extreme settings cannot overflow.
            max_map_size: config.storage.db_size_gb.saturating_mul(1024 * 1024 * 1024),
            disk_reserve: config.storage.disk_reserve_mb.saturating_mul(MIB),
        };
        let storage = LmdbStorage::new(storage_config)
            .await
            .map_err(|e| Error::Startup(format!("Failed to create LMDB storage: {e}")))?;

        // Storage implies quoting/payments, so the rewards address is
        // mandatory here even outside production mode.
        let rewards_address = match config.payment.rewards_address {
            Some(ref addr) => parse_rewards_address(addr)?,
            None => {
                return Err(Error::Startup(
                    "No rewards address configured. Set --rewards-address or payment.rewards_address in config.".to_string(),
                ));
            }
        };

        let evm_network = config.payment.evm_network.clone().into_evm_network();
        let payment_config = PaymentVerifierConfig {
            evm: EvmVerifierConfig {
                network: evm_network,
            },
            cache_capacity: config.payment.cache_capacity,
            local_rewards_address: rewards_address,
        };
        let payment_verifier = PaymentVerifier::new(payment_config);
        let metrics_tracker = QuotingMetricsTracker::new(0);
        let mut quote_generator = QuoteGenerator::new(rewards_address, metrics_tracker);

        // Attach the ML-DSA signer derived from the node identity.
        crate::payment::wire_ml_dsa_signer(&mut quote_generator, identity)?;

        let protocol = AntProtocol::new(
            Arc::new(storage),
            Arc::new(payment_verifier),
            Arc::new(quote_generator),
        );

        info!(
            "ANT protocol handler initialized with ML-DSA-65 signing (protocol={CHUNK_PROTOCOL_ID})"
        );

        Ok(protocol)
    }
413
414 async fn build_bootstrap_manager(config: &NodeConfig) -> Option<BootstrapManager> {
416 let cache_dir = config
417 .bootstrap_cache
418 .cache_dir
419 .clone()
420 .unwrap_or_else(|| config.root_dir.join("bootstrap_cache"));
421
422 if let Err(e) = std::fs::create_dir_all(&cache_dir) {
424 warn!("Failed to create bootstrap cache directory: {e}");
425 return None;
426 }
427
428 let bootstrap_config = CoreBootstrapConfig {
429 cache_dir,
430 max_peers: config.bootstrap_cache.max_contacts,
431 ..CoreBootstrapConfig::default()
432 };
433
434 match BootstrapManager::with_config(bootstrap_config).await {
435 Ok(manager) => {
436 info!(
437 "Bootstrap cache initialized with {} max contacts",
438 config.bootstrap_cache.max_contacts
439 );
440 Some(manager)
441 }
442 Err(e) => {
443 warn!("Failed to initialize bootstrap cache: {e}");
444 None
445 }
446 }
447 }
448}
449
/// A fully constructed node plus the handles needed to run and stop it.
pub struct RunningNode {
    config: NodeConfig,
    p2p_node: Arc<P2PNode>,
    // Cancelling this token stops every background task.
    shutdown: CancellationToken,
    events_tx: NodeEventsSender,
    // Taken once by `events()`; `None` afterwards.
    events_rx: Option<NodeEventsChannel>,
    // Consumed (taken) when `run()` spawns the background upgrade task.
    upgrade_monitor: Option<UpgradeMonitor>,
    bootstrap_manager: Option<BootstrapManager>,
    ant_protocol: Option<Arc<AntProtocol>>,
    replication_engine: Option<ReplicationEngine>,
    // Handle of the protocol-routing task; aborted during shutdown.
    protocol_task: Option<JoinHandle<()>>,
    // Exit code requested by a completed upgrade; -1 = no restart requested.
    upgrade_exit_code: Arc<AtomicI32>,
}
469
470impl RunningNode {
    /// Path of the node's root (identity + data) directory.
    #[must_use]
    pub fn root_dir(&self) -> &PathBuf {
        &self.config.root_dir
    }
476
    /// Takes the primary event receiver; returns `None` on subsequent calls.
    pub fn events(&mut self) -> Option<NodeEventsChannel> {
        self.events_rx.take()
    }
483
    /// Creates an additional event subscription.
    #[must_use]
    pub fn subscribe_events(&self) -> NodeEventsChannel {
        self.events_tx.subscribe()
    }
489
    /// Runs the node until shutdown: starts the P2P layer, wires up
    /// replication and protocol routing, spawns the background upgrade
    /// task, then waits for a shutdown signal and tears everything down.
    ///
    /// If a completed upgrade stored a non-negative exit code, the process
    /// exits with that code after cleanup (never returns in that case).
    ///
    /// # Errors
    /// Returns an error if the P2P node or signal handlers fail to start.
    #[allow(clippy::too_many_lines)]
    pub async fn run(&mut self) -> Result<()> {
        info!("Node runtime loop starting");

        // Subscribe to DHT events before start() — presumably so the
        // replication engine cannot miss early events; confirm upstream.
        let dht_events_for_bootstrap = self
            .replication_engine
            .as_ref()
            .map(|_| self.p2p_node.dht_manager().subscribe_events());

        self.p2p_node
            .start()
            .await
            .map_err(|e| Error::Startup(format!("Failed to start P2P node: {e}")))?;

        let listen_addrs = self.p2p_node.listen_addrs().await;
        info!(listen_addrs = ?listen_addrs, "P2P node started");

        // The bound port may differ from the configured one; fall back to
        // the configured port when no listen address reports one.
        let actual_port = listen_addrs
            .first()
            .and_then(MultiAddr::port)
            .unwrap_or(self.config.port);
        info!(
            port = actual_port,
            "Node is running on port: {}", actual_port
        );

        if let Err(e) = self.events_tx.send(NodeEvent::Started) {
            warn!("Failed to send Started event: {e}");
        }

        self.start_protocol_routing();

        if let Some(ref mut engine) = self.replication_engine {
            if let Some(dht_events) = dht_events_for_bootstrap {
                engine.start(dht_events);
            }
            info!("Replication engine started");
        }

        // Background upgrade task: periodically checks for releases, and on
        // a successful apply stores the exit code and cancels `shutdown`.
        if let Some(monitor) = self.upgrade_monitor.take() {
            let events_tx = self.events_tx.clone();
            let shutdown = self.shutdown.clone();
            let stop_on_upgrade = self.config.upgrade.stop_on_upgrade;
            let upgrade_exit_code = Arc::clone(&self.upgrade_exit_code);

            tokio::spawn(async move {
                let mut monitor = monitor;
                let mut upgrader = AutoApplyUpgrader::new().with_stop_on_upgrade(stop_on_upgrade);
                if let Ok(cache_dir) = upgrade_cache_dir() {
                    upgrader = upgrader.with_binary_cache(BinaryCache::new(cache_dir));
                }

                // Jittered initial delay spreads the first checks of
                // different nodes over time (see jittered_interval).
                {
                    let jitter_duration = jittered_interval(monitor.check_interval());
                    let first_check_time = chrono::Utc::now()
                        + chrono::Duration::from_std(jitter_duration).unwrap_or_else(|e| {
                            warn!("chrono::Duration::from_std failed for jitter ({e}), defaulting to 1 minute");
                            chrono::Duration::minutes(1)
                        });
                    info!(
                        "First upgrade check scheduled for {} (jitter: {}s)",
                        first_check_time.to_rfc3339(),
                        jitter_duration.as_secs()
                    );
                    tokio::time::sleep(jitter_duration).await;
                }

                loop {
                    tokio::select! {
                        () = shutdown.cancelled() => {
                            break;
                        }
                        result = monitor.check_for_ready_upgrade() => {
                            match result {
                                Ok(Some(upgrade_info)) => {
                                    info!(
                                        current_version = %upgrader.current_version(),
                                        new_version = %upgrade_info.version,
                                        "Upgrade available"
                                    );

                                    if let Err(e) = events_tx.send(NodeEvent::UpgradeAvailable {
                                        version: upgrade_info.version.to_string(),
                                    }) {
                                        warn!("Failed to send UpgradeAvailable event: {e}");
                                    }

                                    info!("Starting auto-apply upgrade...");
                                    match upgrader.apply_upgrade(&upgrade_info).await {
                                        Ok(UpgradeResult::Success { version, exit_code }) => {
                                            info!("Upgrade to {} successful, initiating graceful shutdown", version);
                                            // Record the code so run() can exit
                                            // the process after cleanup.
                                            upgrade_exit_code.store(exit_code, Ordering::SeqCst);
                                            shutdown.cancel();
                                            break;
                                        }
                                        Ok(UpgradeResult::RolledBack { reason }) => {
                                            warn!("Error during upgrade process: {}", reason);
                                        }
                                        Ok(UpgradeResult::NoUpgrade) => {
                                            info!("Already running latest version");
                                        }
                                        Err(e) => {
                                            error!("Error during upgrade process: {}", e);
                                        }
                                    }
                                }
                                Ok(None) => {
                                    if let Some(remaining) = monitor.time_until_upgrade() {
                                        info!(
                                            "Upgrade pending, rollout delay remaining: {}m {}s",
                                            remaining.as_secs() / 60,
                                            remaining.as_secs() % 60
                                        );
                                    } else {
                                        info!("No upgrade available");
                                    }
                                }
                                Err(e) => {
                                    warn!("Error during upgrade process: {}", e);
                                }
                            }
                            // Next wake-up: the remaining staged-rollout delay
                            // when one is pending, otherwise (or when the delay
                            // already elapsed without a successful apply) a
                            // fresh jittered check interval.
                            let sleep_duration = monitor.time_until_upgrade().map_or_else(
                                || {
                                    let jittered_duration =
                                        jittered_interval(monitor.check_interval());
                                    let next_check = chrono::Utc::now()
                                        + chrono::Duration::from_std(jittered_duration).unwrap_or_else(|e| {
                                            warn!("chrono::Duration::from_std failed for interval ({e}), defaulting to 1 hour");
                                            chrono::Duration::hours(1)
                                        });
                                    info!("Next upgrade check scheduled for {}", next_check.to_rfc3339());
                                    jittered_duration
                                },
                                |remaining| {
                                    if remaining.is_zero() {
                                        let backoff = jittered_interval(monitor.check_interval());
                                        let next_check = chrono::Utc::now()
                                            + chrono::Duration::from_std(backoff).unwrap_or_else(|e| {
                                                warn!("chrono::Duration::from_std failed for backoff ({e}), defaulting to 1 hour");
                                                chrono::Duration::hours(1)
                                            });
                                        info!(
                                            "Upgrade rollout delay elapsed but previous apply did not succeed; \
                                            backing off, next check scheduled for {}",
                                            next_check.to_rfc3339()
                                        );
                                        backoff
                                    } else {
                                        let wake_time = chrono::Utc::now()
                                            + chrono::Duration::from_std(remaining).unwrap_or_else(|e| {
                                                warn!("chrono::Duration::from_std failed for rollout delay ({e}), defaulting to 1 minute");
                                                chrono::Duration::minutes(1)
                                            });
                                        info!("Will apply upgrade at {}", wake_time.to_rfc3339());
                                        remaining
                                    }
                                },
                            );
                            tokio::select! {
                                () = shutdown.cancelled() => {
                                    break;
                                }
                                () = tokio::time::sleep(sleep_duration) => {}
                            }
                        }
                    }
                }
            });
        }

        info!("Node running, waiting for shutdown signal");

        // Blocks until a shutdown is requested (signal or cancellation).
        self.run_event_loop().await?;

        if let Some(ref manager) = self.bootstrap_manager {
            let stats = manager.stats().await;
            info!(
                "Bootstrap cache shutdown: {} peers, avg quality {:.2}",
                stats.total_peers, stats.average_quality
            );
        }

        if let Some(ref mut engine) = self.replication_engine {
            engine.shutdown().await;
        }

        if let Some(handle) = self.protocol_task.take() {
            handle.abort();
        }

        info!("Shutting down P2P node...");
        if let Err(e) = self.p2p_node.shutdown().await {
            warn!("Error during P2P node shutdown: {e}");
        }

        if let Err(e) = self.events_tx.send(NodeEvent::ShuttingDown) {
            warn!("Failed to send ShuttingDown event: {e}");
        }
        info!("Node shutdown complete");

        // A non-negative code means an upgrade completed and requested a
        // process restart with that code.
        let exit_code = self.upgrade_exit_code.load(Ordering::SeqCst);
        if exit_code >= 0 {
            info!("Exiting with code {} for upgrade restart", exit_code);
            std::process::exit(exit_code);
        }

        Ok(())
    }
736
    /// Blocks until a shutdown is requested, handling Unix signals.
    ///
    /// SIGINT (Ctrl-C) and SIGTERM trigger a graceful shutdown; SIGHUP is
    /// only logged (config reload is not implemented yet).
    #[cfg(unix)]
    async fn run_event_loop(&self) -> Result<()> {
        let mut sigterm = signal(SignalKind::terminate())?;
        let mut sighup = signal(SignalKind::hangup())?;

        loop {
            tokio::select! {
                () = self.shutdown.cancelled() => {
                    info!("Shutdown signal received");
                    break;
                }
                _ = tokio::signal::ctrl_c() => {
                    info!("Received SIGINT (Ctrl-C), initiating shutdown");
                    self.shutdown();
                    break;
                }
                _ = sigterm.recv() => {
                    info!("Received SIGTERM, initiating shutdown");
                    self.shutdown();
                    break;
                }
                _ = sighup.recv() => {
                    // SIGHUP intentionally does not break the loop.
                    info!("Received SIGHUP (config reload not yet supported)");
                }
            }
        }
        Ok(())
    }
766
    /// Blocks until a shutdown is requested (non-Unix variant).
    ///
    /// Only Ctrl-C and internal cancellation are handled; there are no
    /// POSIX signals on these platforms.
    #[cfg(not(unix))]
    async fn run_event_loop(&self) -> Result<()> {
        loop {
            tokio::select! {
                () = self.shutdown.cancelled() => {
                    info!("Shutdown signal received");
                    break;
                }
                _ = tokio::signal::ctrl_c() => {
                    info!("Received Ctrl-C, initiating shutdown");
                    self.shutdown();
                    break;
                }
            }
        }
        Ok(())
    }
785
    /// Spawns the task that routes inbound P2P messages to the ANT protocol
    /// handler. No-op when chunk storage (and thus the handler) is disabled.
    fn start_protocol_routing(&mut self) {
        let protocol = match self.ant_protocol {
            Some(ref p) => Arc::clone(p),
            None => return,
        };

        let mut events = self.p2p_node.subscribe_events();
        let p2p = Arc::clone(&self.p2p_node);
        // Caps the number of requests processed concurrently.
        let semaphore = Arc::new(Semaphore::new(64));

        self.protocol_task = Some(tokio::spawn(async move {
            while let Ok(event) = events.recv().await {
                // Only messages with a known source can be answered.
                if let P2PEvent::Message {
                    topic,
                    source: Some(source),
                    data,
                } = event
                {
                    // Map the topic to (handler name, response topic);
                    // currently only the chunk protocol is routed.
                    let handler_info: Option<(&str, &str)> = if topic == CHUNK_PROTOCOL_ID {
                        Some(("chunk", CHUNK_PROTOCOL_ID))
                    } else {
                        None
                    };

                    if let Some((data_type, response_topic)) = handler_info {
                        debug!("Received {data_type} protocol message from {source}");
                        let protocol = Arc::clone(&protocol);
                        let p2p = Arc::clone(&p2p);
                        let sem = semaphore.clone();
                        // Each request is handled on its own task so a slow
                        // handler does not stall the event loop.
                        tokio::spawn(async move {
                            // acquire() fails only when the semaphore is
                            // closed; drop the request in that case.
                            let Ok(_permit) = sem.acquire().await else {
                                return;
                            };
                            let result = match data_type {
                                "chunk" => protocol.try_handle_request(&data).await,
                                _ => return,
                            };
                            match result {
                                Ok(Some(response)) => {
                                    if let Err(e) = p2p
                                        .send_message(
                                            &source,
                                            response_topic,
                                            response.to_vec(),
                                            &[],
                                        )
                                        .await
                                    {
                                        warn!("Failed to send {data_type} protocol response to {source}: {e}");
                                    }
                                }
                                // Ok(None): handler produced no response.
                                Ok(None) => {}
                                Err(e) => {
                                    warn!("{data_type} protocol handler error: {e}");
                                }
                            }
                        });
                    }
                }
            }
        }));
        info!("Protocol message routing started");
    }
853
    /// Requests a graceful shutdown by cancelling the shared token.
    pub fn shutdown(&self) {
        self.shutdown.cancel();
    }
858}
859
860fn jittered_interval(base: std::time::Duration) -> std::time::Duration {
863 let secs = base.as_secs();
864 let variance = secs / 20; if variance == 0 {
866 return base;
867 }
868 let jitter = rand::thread_rng().gen_range(0..=variance * 2);
869 std::time::Duration::from_secs(secs.saturating_sub(variance) + jitter)
870}
871
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
    use super::*;
    use crate::config::NODES_SUBDIR;

    // Staged rollout is enabled whenever staged_rollout_hours > 0.
    #[test]
    fn test_build_upgrade_monitor_staged_rollout_enabled() {
        let config = NodeConfig {
            upgrade: crate::config::UpgradeConfig {
                staged_rollout_hours: 24,
                ..Default::default()
            },
            ..Default::default()
        };
        let seed = b"node-seed";

        let monitor = NodeBuilder::build_upgrade_monitor(&config, seed);
        assert!(monitor.has_staged_rollout());
    }

    // staged_rollout_hours == 0 must disable staged rollout.
    #[test]
    fn test_build_upgrade_monitor_staged_rollout_disabled() {
        let config = NodeConfig {
            upgrade: crate::config::UpgradeConfig {
                staged_rollout_hours: 0,
                ..Default::default()
            },
            ..Default::default()
        };
        let seed = b"node-seed";

        let monitor = NodeBuilder::build_upgrade_monitor(&config, seed);
        assert!(!monitor.has_staged_rollout());
    }

    #[test]
    fn test_build_core_config_sets_production_mode() {
        let config = NodeConfig {
            network_mode: NetworkMode::Production,
            ..Default::default()
        };
        let core = NodeBuilder::build_core_config(&config).expect("core config");
        assert!(core.diversity_config.is_some());
    }

    #[test]
    fn test_build_core_config_ipv4_only() {
        let config = NodeConfig {
            ipv4_only: true,
            ..Default::default()
        };
        let core = NodeBuilder::build_core_config(&config).expect("core config");
        assert!(!core.ipv6, "ipv4_only should disable IPv6");
    }

    #[test]
    fn test_build_core_config_dual_stack_by_default() {
        let config = NodeConfig::default();
        let core = NodeBuilder::build_core_config(&config).expect("core config");
        assert!(core.ipv6, "dual-stack should be the default");
    }

    // Development mode uses the permissive diversity config (no limits).
    #[test]
    fn test_build_core_config_sets_development_mode_permissive() {
        let config = NodeConfig {
            network_mode: NetworkMode::Development,
            ..Default::default()
        };
        let core = NodeBuilder::build_core_config(&config).expect("core config");
        let diversity = core.diversity_config.expect("diversity");
        assert_eq!(diversity.max_per_ip, Some(usize::MAX));
        assert_eq!(diversity.max_per_subnet, Some(usize::MAX));
    }

    #[test]
    fn test_scan_identity_dirs_empty_dir() {
        let tmp = tempfile::tempdir().unwrap();
        let dirs = NodeBuilder::scan_identity_dirs(tmp.path()).unwrap();
        assert!(dirs.is_empty());
    }

    // A missing base directory is treated as "no identities", not an error.
    #[test]
    fn test_scan_identity_dirs_nonexistent_dir() {
        let tmp = tempfile::tempdir().unwrap();
        let path = tmp.path().join("nonexistent_identity_dir");
        let dirs = NodeBuilder::scan_identity_dirs(&path).unwrap();
        assert!(dirs.is_empty());
    }

    #[test]
    fn test_scan_identity_dirs_finds_one() {
        let tmp = tempfile::tempdir().unwrap();
        let node_dir = tmp.path().join("abc123");
        std::fs::create_dir_all(&node_dir).unwrap();
        std::fs::write(node_dir.join(NODE_IDENTITY_FILENAME), "{}").unwrap();

        let dirs = NodeBuilder::scan_identity_dirs(tmp.path()).unwrap();
        assert_eq!(dirs.len(), 1);
        assert_eq!(dirs[0], node_dir);
    }

    #[test]
    fn test_scan_identity_dirs_finds_multiple() {
        let tmp = tempfile::tempdir().unwrap();
        for name in &["node_a", "node_b"] {
            let dir = tmp.path().join(name);
            std::fs::create_dir_all(&dir).unwrap();
            std::fs::write(dir.join(NODE_IDENTITY_FILENAME), "{}").unwrap();
        }
        // Directories without an identity file must be ignored.
        std::fs::create_dir_all(tmp.path().join("no_key")).unwrap();

        let dirs = NodeBuilder::scan_identity_dirs(tmp.path()).unwrap();
        assert_eq!(dirs.len(), 2);
    }

    #[tokio::test]
    async fn test_resolve_identity_first_run_creates_identity() {
        let tmp = tempfile::tempdir().unwrap();
        let mut config = NodeConfig {
            root_dir: tmp.path().to_path_buf(),
            ..Default::default()
        };

        let identity = NodeBuilder::resolve_identity(&mut config).await.unwrap();
        assert!(tmp.path().join(NODE_IDENTITY_FILENAME).exists());
        let peer_id = identity.peer_id().to_hex();
        // 32-byte peer id encodes to 64 hex characters.
        assert_eq!(peer_id.len(), 64);
    }

    #[tokio::test]
    async fn test_resolve_identity_loads_existing() {
        let tmp = tempfile::tempdir().unwrap();

        let original = NodeIdentity::generate().unwrap();
        original
            .save_to_file(&tmp.path().join(NODE_IDENTITY_FILENAME))
            .await
            .unwrap();

        let mut config = NodeConfig {
            root_dir: tmp.path().to_path_buf(),
            ..Default::default()
        };

        let loaded = NodeBuilder::resolve_identity(&mut config).await.unwrap();
        assert_eq!(loaded.peer_id(), original.peer_id());
    }

    #[test]
    fn test_peer_id_hex_length() {
        let id = saorsa_core::identity::PeerId::from_bytes([0x42; 32]);
        let hex = id.to_hex();
        // 32-byte peer id encodes to 64 hex characters.
        assert_eq!(hex.len(), 64);
    }

    // Simulates a restart: the identity written on first run must be found
    // again and yield the same peer id when loaded.
    #[tokio::test]
    async fn test_identity_persisted_across_restarts() {
        let base_dir = tempfile::tempdir().unwrap();
        let nodes_dir = base_dir.path().join(NODES_SUBDIR);

        let identity1 = NodeIdentity::generate().unwrap();
        let peer_id1 = identity1.peer_id().to_hex();
        let peer_dir = nodes_dir.join(&peer_id1);
        std::fs::create_dir_all(&peer_dir).unwrap();
        identity1
            .save_to_file(&peer_dir.join(NODE_IDENTITY_FILENAME))
            .await
            .unwrap();

        assert_eq!(peer_id1.len(), 64);
        assert_eq!(peer_dir.file_name().unwrap().to_string_lossy(), peer_id1);

        let identity_dirs = NodeBuilder::scan_identity_dirs(&nodes_dir).unwrap();
        assert_eq!(identity_dirs.len(), 1);
        let loaded = NodeIdentity::load_from_file(&identity_dirs[0].join(NODE_IDENTITY_FILENAME))
            .await
            .unwrap();
        let peer_id2 = loaded.peer_id().to_hex();

        assert_eq!(peer_id1, peer_id2, "peer_id must survive restart");
        assert_eq!(
            identity_dirs[0], peer_dir,
            "root_dir must be the same directory"
        );
    }

    // With two identities present the scan must report both (resolve_identity
    // then errors and asks for --root-dir).
    #[tokio::test]
    async fn test_multiple_identities_errors() {
        let base_dir = tempfile::tempdir().unwrap();
        let nodes_dir = base_dir.path().join(NODES_SUBDIR);

        for name in &["aaaa", "bbbb"] {
            let dir = nodes_dir.join(name);
            std::fs::create_dir_all(&dir).unwrap();
            let identity = NodeIdentity::generate().unwrap();
            identity
                .save_to_file(&dir.join(NODE_IDENTITY_FILENAME))
                .await
                .unwrap();
        }

        let identity_dirs = NodeBuilder::scan_identity_dirs(&nodes_dir).unwrap();
        assert_eq!(identity_dirs.len(), 2, "should find both identity dirs");
    }

    // Supplying the same explicit root_dir twice must load, not regenerate.
    #[tokio::test]
    async fn test_explicit_root_dir_persists_across_restarts() {
        let tmp = tempfile::tempdir().unwrap();

        let mut config1 = NodeConfig {
            root_dir: tmp.path().to_path_buf(),
            ..Default::default()
        };
        let identity1 = NodeBuilder::resolve_identity(&mut config1).await.unwrap();

        let mut config2 = NodeConfig {
            root_dir: tmp.path().to_path_buf(),
            ..Default::default()
        };
        let identity2 = NodeBuilder::resolve_identity(&mut config2).await.unwrap();

        assert_eq!(
            identity1.peer_id(),
            identity2.peer_id(),
            "explicit --root-dir must yield stable identity"
        );
    }
}
1117}