1use crate::ant_protocol::CHUNK_PROTOCOL_ID;
4use crate::attestation::VerificationLevel;
5use crate::config::{
6 AttestationMode, AttestationNodeConfig, EvmNetworkConfig, IpVersion, NetworkMode, NodeConfig,
7};
8use crate::error::{Error, Result};
9use crate::event::{create_event_channel, NodeEvent, NodeEventsChannel, NodeEventsSender};
10use crate::payment::metrics::QuotingMetricsTracker;
11use crate::payment::wallet::parse_rewards_address;
12use crate::payment::{PaymentVerifier, PaymentVerifierConfig, QuoteGenerator};
13use crate::storage::{AntProtocol, DiskStorage, DiskStorageConfig};
14use crate::upgrade::{AutoApplyUpgrader, UpgradeMonitor, UpgradeResult};
15use ant_evm::RewardsAddress;
16use evmlib::Network as EvmNetwork;
17use saorsa_core::{
18 AttestationConfig as CoreAttestationConfig, BootstrapConfig as CoreBootstrapConfig,
19 BootstrapManager, EnforcementMode as CoreEnforcementMode,
20 IPDiversityConfig as CoreDiversityConfig, NodeConfig as CoreNodeConfig, P2PEvent, P2PNode,
21 ProductionConfig as CoreProductionConfig,
22};
23use std::net::SocketAddr;
24use std::path::PathBuf;
25use std::sync::Arc;
26use tokio::sync::watch;
27use tokio::task::JoinHandle;
28use tracing::{debug, error, info, warn};
29
30const DEFAULT_MAX_QUOTING_RECORDS: usize = 100_000;
32
33const DEFAULT_REWARDS_ADDRESS: [u8; 20] = [0u8; 20];
35
36#[cfg(unix)]
37use tokio::signal::unix::{signal, SignalKind};
38
39pub struct NodeBuilder {
41 config: NodeConfig,
42}
43
44impl NodeBuilder {
45 #[must_use]
47 pub fn new(config: NodeConfig) -> Self {
48 Self { config }
49 }
50
51 pub async fn build(self) -> Result<RunningNode> {
58 info!("Building saorsa-node with config: {:?}", self.config);
59
60 Self::validate_attestation_security(&self.config)?;
62
63 std::fs::create_dir_all(&self.config.root_dir)?;
65
66 let (shutdown_tx, shutdown_rx) = watch::channel(false);
68
69 let (events_tx, events_rx) = create_event_channel();
71
72 let core_config = Self::build_core_config(&self.config)?;
74 debug!("Core config: {:?}", core_config);
75
76 let p2p_node = P2PNode::new(core_config)
78 .await
79 .map_err(|e| Error::Startup(format!("Failed to create P2P node: {e}")))?;
80
81 let upgrade_monitor = if self.config.upgrade.enabled {
83 let node_id_seed = p2p_node.peer_id().as_bytes();
84 Some(Self::build_upgrade_monitor(&self.config, node_id_seed))
85 } else {
86 None
87 };
88
89 let bootstrap_manager = if self.config.bootstrap_cache.enabled {
91 Self::build_bootstrap_manager(&self.config).await
92 } else {
93 info!("Bootstrap cache disabled");
94 None
95 };
96
97 let ant_protocol = if self.config.storage.enabled {
99 Some(Arc::new(Self::build_ant_protocol(&self.config).await?))
100 } else {
101 info!("Chunk storage disabled");
102 None
103 };
104
105 let node = RunningNode {
106 config: self.config,
107 p2p_node: Arc::new(p2p_node),
108 shutdown_tx,
109 shutdown_rx,
110 events_tx,
111 events_rx: Some(events_rx),
112 upgrade_monitor,
113 bootstrap_manager,
114 ant_protocol,
115 protocol_task: None,
116 };
117
118 Ok(node)
119 }
120
121 fn build_core_config(config: &NodeConfig) -> Result<CoreNodeConfig> {
123 let listen_addr: SocketAddr = match config.ip_version {
125 IpVersion::Ipv4 | IpVersion::Dual => format!("0.0.0.0:{}", config.port)
126 .parse()
127 .map_err(|e| Error::Config(format!("Invalid listen address: {e}")))?,
128 IpVersion::Ipv6 => format!("[::]:{}", config.port)
129 .parse()
130 .map_err(|e| Error::Config(format!("Invalid listen address: {e}")))?,
131 };
132
133 let mut core_config = CoreNodeConfig::new()
134 .map_err(|e| Error::Config(format!("Failed to create core config: {e}")))?;
135
136 core_config.listen_addr = listen_addr;
138 core_config.listen_addrs = vec![listen_addr];
139
140 core_config.enable_ipv6 = matches!(config.ip_version, IpVersion::Ipv6 | IpVersion::Dual);
142
143 core_config.bootstrap_peers.clone_from(&config.bootstrap);
145
146 match config.network_mode {
148 NetworkMode::Production => {
149 core_config.production_config = Some(CoreProductionConfig::default());
150 core_config.diversity_config = Some(CoreDiversityConfig::default());
151 }
152 NetworkMode::Testnet => {
153 core_config.production_config = Some(CoreProductionConfig::default());
154 let mut diversity = CoreDiversityConfig::testnet();
155 diversity.max_nodes_per_asn = config.testnet.max_nodes_per_asn;
156 diversity.max_nodes_per_64 = config.testnet.max_nodes_per_64;
157 diversity.enable_geolocation_check = config.testnet.enable_geo_checks;
158 diversity.min_geographic_diversity = if config.testnet.enable_geo_checks {
159 3
160 } else {
161 1
162 };
163 core_config.diversity_config = Some(diversity);
164
165 if config.testnet.enforce_age_requirements {
166 warn!(
167 "testnet.enforce_age_requirements is set but saorsa-core does not yet \
168 expose a knob; age checks may remain relaxed"
169 );
170 }
171 }
172 NetworkMode::Development => {
173 core_config.production_config = None;
174 core_config.diversity_config = Some(CoreDiversityConfig::permissive());
175 }
176 }
177
178 core_config.attestation_config = Self::build_attestation_config(&config.attestation)?;
180
181 Ok(core_config)
182 }
183
184 fn validate_attestation_security(config: &NodeConfig) -> Result<()> {
188 if !config.attestation.enabled {
189 return Ok(());
190 }
191
192 let level = VerificationLevel::current();
193 info!("Attestation verification level: {}", level);
194
195 match level {
196 VerificationLevel::None => {
197 error!("SECURITY: Attestation enabled without verification feature!");
198 error!(
199 "Enable zkvm-prover or zkvm-verifier-groth16 feature for real verification."
200 );
201 error!("Build with: cargo build --features zkvm-prover");
202 return Err(Error::Config(
203 "Attestation requires zkvm-prover or zkvm-verifier-groth16 feature. \
204 Without a verification feature, proofs use mock verification \
205 which provides NO CRYPTOGRAPHIC SECURITY. \
206 Build with: cargo build --features zkvm-prover"
207 .into(),
208 ));
209 }
210 VerificationLevel::Groth16 => {
211 if config.attestation.require_pq_secure {
212 error!(
213 "SECURITY: require_pq_secure=true but only Groth16 verification available"
214 );
215 return Err(Error::Config(
216 "require_pq_secure=true but only Groth16 available (not post-quantum secure). \
217 Either enable zkvm-prover feature for STARK verification, \
218 or set require_pq_secure=false in attestation config."
219 .into(),
220 ));
221 }
222 warn!(
223 "Attestation using Groth16 verification - NOT post-quantum secure. \
224 Consider enabling zkvm-prover feature for production deployments."
225 );
226 }
227 VerificationLevel::Stark => {
228 info!("Attestation using STARK verification (post-quantum secure)");
229 }
230 }
231
232 Ok(())
233 }
234
235 fn build_attestation_config(config: &AttestationNodeConfig) -> Result<CoreAttestationConfig> {
237 let enforcement_mode = match config.mode {
238 AttestationMode::Off => CoreEnforcementMode::Off,
239 AttestationMode::Soft => CoreEnforcementMode::Soft,
240 AttestationMode::Hard => CoreEnforcementMode::Hard,
241 };
242
243 let allowed_binary_hashes = config
245 .allowed_binary_hashes
246 .iter()
247 .map(|hex_str| {
248 let bytes = hex::decode(hex_str).map_err(|e| {
249 Error::Config(format!(
250 "Invalid hex in allowed_binary_hashes '{hex_str}': {e}"
251 ))
252 })?;
253 if bytes.len() != 32 {
254 let len = bytes.len();
255 return Err(Error::Config(format!(
256 "Binary hash must be 32 bytes (64 hex chars), got {len} bytes for '{hex_str}'"
257 )));
258 }
259 let mut arr = [0u8; 32];
260 arr.copy_from_slice(&bytes);
261 Ok(arr)
262 })
263 .collect::<Result<Vec<_>>>()?;
264
265 if config.mode == AttestationMode::Hard && config.enabled {
266 if allowed_binary_hashes.is_empty() {
267 warn!(
268 "Attestation in Hard mode with empty allowed_binary_hashes - \
269 all binaries will be accepted. Consider specifying allowed hashes."
270 );
271 } else {
272 info!(
273 "Attestation in Hard mode with {} allowed binary hash(es)",
274 allowed_binary_hashes.len()
275 );
276 }
277 }
278
279 Ok(CoreAttestationConfig {
280 enabled: config.enabled,
281 enforcement_mode,
282 allowed_binary_hashes,
283 sunset_grace_days: config.sunset_grace_days,
284 })
285 }
286
287 fn build_upgrade_monitor(config: &NodeConfig, node_id_seed: &[u8]) -> Arc<UpgradeMonitor> {
288 let monitor = UpgradeMonitor::new(
289 config.upgrade.github_repo.clone(),
290 config.upgrade.channel,
291 config.upgrade.check_interval_hours,
292 );
293
294 if config.upgrade.staged_rollout_hours > 0 {
295 Arc::new(monitor.with_staged_rollout(node_id_seed, config.upgrade.staged_rollout_hours))
296 } else {
297 Arc::new(monitor)
298 }
299 }
300
301 async fn build_ant_protocol(config: &NodeConfig) -> Result<AntProtocol> {
305 let storage_config = DiskStorageConfig {
307 root_dir: config.root_dir.clone(),
308 verify_on_read: config.storage.verify_on_read,
309 max_chunks: config.storage.max_chunks,
310 };
311 let storage = DiskStorage::new(storage_config)
312 .await
313 .map_err(|e| Error::Startup(format!("Failed to create disk storage: {e}")))?;
314
315 let evm_network = match config.payment.evm_network {
317 EvmNetworkConfig::ArbitrumOne => EvmNetwork::ArbitrumOne,
318 EvmNetworkConfig::ArbitrumSepolia => EvmNetwork::ArbitrumSepoliaTest,
319 };
320 let payment_config = PaymentVerifierConfig {
321 evm: crate::payment::EvmVerifierConfig {
322 enabled: config.payment.enabled,
323 network: evm_network,
324 },
325 cache_capacity: config.payment.cache_capacity,
326 };
327 let payment_verifier = PaymentVerifier::new(payment_config);
328
329 let rewards_address = match config.payment.rewards_address {
331 Some(ref addr) => parse_rewards_address(addr)?,
332 None => RewardsAddress::new(DEFAULT_REWARDS_ADDRESS),
333 };
334 let metrics_tracker = QuotingMetricsTracker::new(DEFAULT_MAX_QUOTING_RECORDS, 0);
335 let quote_generator = QuoteGenerator::new(rewards_address, metrics_tracker);
336
337 info!(
338 "ANT protocol handler initialized (protocol={})",
339 CHUNK_PROTOCOL_ID
340 );
341
342 Ok(AntProtocol::new(
343 Arc::new(storage),
344 Arc::new(payment_verifier),
345 Arc::new(quote_generator),
346 ))
347 }
348
349 async fn build_bootstrap_manager(config: &NodeConfig) -> Option<BootstrapManager> {
351 let cache_dir = config
352 .bootstrap_cache
353 .cache_dir
354 .clone()
355 .unwrap_or_else(|| config.root_dir.join("bootstrap_cache"));
356
357 if let Err(e) = std::fs::create_dir_all(&cache_dir) {
359 warn!("Failed to create bootstrap cache directory: {}", e);
360 return None;
361 }
362
363 let bootstrap_config = CoreBootstrapConfig {
364 cache_dir,
365 max_peers: config.bootstrap_cache.max_contacts,
366 ..CoreBootstrapConfig::default()
367 };
368
369 match BootstrapManager::with_config(bootstrap_config).await {
370 Ok(manager) => {
371 info!(
372 "Bootstrap cache initialized with {} max contacts",
373 config.bootstrap_cache.max_contacts
374 );
375 Some(manager)
376 }
377 Err(e) => {
378 warn!("Failed to initialize bootstrap cache: {}", e);
379 None
380 }
381 }
382 }
383}
384
385pub struct RunningNode {
387 config: NodeConfig,
388 p2p_node: Arc<P2PNode>,
389 shutdown_tx: watch::Sender<bool>,
390 shutdown_rx: watch::Receiver<bool>,
391 events_tx: NodeEventsSender,
392 events_rx: Option<NodeEventsChannel>,
393 upgrade_monitor: Option<Arc<UpgradeMonitor>>,
394 bootstrap_manager: Option<BootstrapManager>,
396 ant_protocol: Option<Arc<AntProtocol>>,
398 protocol_task: Option<JoinHandle<()>>,
400}
401
402impl RunningNode {
403 #[must_use]
405 pub fn root_dir(&self) -> &PathBuf {
406 &self.config.root_dir
407 }
408
409 pub fn events(&mut self) -> Option<NodeEventsChannel> {
413 self.events_rx.take()
414 }
415
416 #[must_use]
418 pub fn subscribe_events(&self) -> NodeEventsChannel {
419 self.events_tx.subscribe()
420 }
421
422 pub async fn run(&mut self) -> Result<()> {
428 info!("Starting saorsa-node");
429
430 self.p2p_node
432 .start()
433 .await
434 .map_err(|e| Error::Startup(format!("Failed to start P2P node: {e}")))?;
435
436 info!(
437 "P2P node started, listening on {:?}",
438 self.p2p_node.listen_addrs().await
439 );
440
441 if let Err(e) = self.events_tx.send(NodeEvent::Started) {
443 warn!("Failed to send Started event: {e}");
444 }
445
446 self.start_protocol_routing();
448
449 if let Some(ref monitor) = self.upgrade_monitor {
451 let monitor = Arc::clone(monitor);
452 let events_tx = self.events_tx.clone();
453 let mut shutdown_rx = self.shutdown_rx.clone();
454
455 tokio::spawn(async move {
456 let upgrader = AutoApplyUpgrader::new();
457
458 loop {
459 tokio::select! {
460 _ = shutdown_rx.changed() => {
461 if *shutdown_rx.borrow() {
462 break;
463 }
464 }
465 result = monitor.check_for_updates() => {
466 if let Ok(Some(upgrade_info)) = result {
467 info!(
468 "Upgrade available: {} -> {}",
469 upgrader.current_version(),
470 upgrade_info.version
471 );
472
473 if let Err(e) = events_tx.send(NodeEvent::UpgradeAvailable {
475 version: upgrade_info.version.to_string(),
476 }) {
477 warn!("Failed to send UpgradeAvailable event: {e}");
478 }
479
480 info!("Starting auto-apply upgrade...");
482 match upgrader.apply_upgrade(&upgrade_info).await {
483 Ok(UpgradeResult::Success { version }) => {
484 info!("Upgrade to {} successful! Process will restart.", version);
485 }
487 Ok(UpgradeResult::RolledBack { reason }) => {
488 warn!("Upgrade rolled back: {}", reason);
489 }
490 Ok(UpgradeResult::NoUpgrade) => {
491 debug!("No upgrade needed");
492 }
493 Err(e) => {
494 error!("Critical upgrade error: {}", e);
495 }
496 }
497 }
498 tokio::time::sleep(monitor.check_interval()).await;
500 }
501 }
502 }
503 });
504 }
505
506 info!("Node running, waiting for shutdown signal");
507
508 self.run_event_loop().await?;
510
511 if let Some(ref manager) = self.bootstrap_manager {
513 match manager.get_stats().await {
514 Ok(stats) => {
515 info!(
516 "Bootstrap cache shutdown: {} contacts, avg quality {:.2}",
517 stats.total_contacts, stats.average_quality_score
518 );
519 }
520 Err(e) => {
521 debug!("Failed to get bootstrap cache stats: {}", e);
522 }
523 }
524 }
525
526 if let Some(handle) = self.protocol_task.take() {
528 handle.abort();
529 }
530
531 info!("Shutting down P2P node...");
533 if let Err(e) = self.p2p_node.shutdown().await {
534 warn!("Error during P2P node shutdown: {e}");
535 }
536
537 if let Err(e) = self.events_tx.send(NodeEvent::ShuttingDown) {
538 warn!("Failed to send ShuttingDown event: {e}");
539 }
540 info!("Node shutdown complete");
541 Ok(())
542 }
543
544 #[cfg(unix)]
546 async fn run_event_loop(&mut self) -> Result<()> {
547 let mut sigterm = signal(SignalKind::terminate())?;
548 let mut sighup = signal(SignalKind::hangup())?;
549
550 loop {
551 tokio::select! {
552 _ = self.shutdown_rx.changed() => {
553 if *self.shutdown_rx.borrow() {
554 info!("Shutdown signal received");
555 break;
556 }
557 }
558 _ = tokio::signal::ctrl_c() => {
559 info!("Received SIGINT (Ctrl-C), initiating shutdown");
560 self.shutdown();
561 break;
562 }
563 _ = sigterm.recv() => {
564 info!("Received SIGTERM, initiating shutdown");
565 self.shutdown();
566 break;
567 }
568 _ = sighup.recv() => {
569 info!("Received SIGHUP, could reload config here");
570 }
572 }
573 }
574 Ok(())
575 }
576
577 #[cfg(not(unix))]
579 async fn run_event_loop(&mut self) -> Result<()> {
580 loop {
581 tokio::select! {
582 _ = self.shutdown_rx.changed() => {
583 if *self.shutdown_rx.borrow() {
584 info!("Shutdown signal received");
585 break;
586 }
587 }
588 _ = tokio::signal::ctrl_c() => {
589 info!("Received Ctrl-C, initiating shutdown");
590 self.shutdown();
591 break;
592 }
593 }
594 }
595 Ok(())
596 }
597
598 fn start_protocol_routing(&mut self) {
603 let protocol = match self.ant_protocol {
604 Some(ref p) => Arc::clone(p),
605 None => return,
606 };
607
608 let mut events = self.p2p_node.subscribe_events();
609 let p2p = Arc::clone(&self.p2p_node);
610
611 self.protocol_task = Some(tokio::spawn(async move {
612 while let Ok(event) = events.recv().await {
613 if let P2PEvent::Message {
614 topic,
615 source,
616 data,
617 } = event
618 {
619 if topic == CHUNK_PROTOCOL_ID {
620 debug!("Received chunk protocol message from {}", source);
621 let protocol = Arc::clone(&protocol);
622 let p2p = Arc::clone(&p2p);
623 tokio::spawn(async move {
624 match protocol.handle_message(&data).await {
625 Ok(response) => {
626 if let Err(e) = p2p
627 .send_message(&source, CHUNK_PROTOCOL_ID, response.to_vec())
628 .await
629 {
630 warn!(
631 "Failed to send protocol response to {}: {}",
632 source, e
633 );
634 }
635 }
636 Err(e) => {
637 warn!("Protocol handler error: {}", e);
638 }
639 }
640 });
641 }
642 }
643 }
644 }));
645 info!("Protocol message routing started");
646 }
647
648 pub fn shutdown(&self) {
650 if let Err(e) = self.shutdown_tx.send(true) {
651 warn!("Failed to send shutdown signal: {e}");
652 }
653 }
654}
655
656#[cfg(test)]
657#[allow(clippy::unwrap_used, clippy::expect_used)]
658mod tests {
659 use super::*;
660
661 #[test]
662 fn test_build_upgrade_monitor_staged_rollout_enabled() {
663 let config = NodeConfig {
664 upgrade: crate::config::UpgradeConfig {
665 enabled: true,
666 staged_rollout_hours: 24,
667 ..Default::default()
668 },
669 ..Default::default()
670 };
671 let seed = b"node-seed";
672
673 let monitor = NodeBuilder::build_upgrade_monitor(&config, seed);
674 assert!(monitor.has_staged_rollout());
675 }
676
677 #[test]
678 fn test_build_upgrade_monitor_staged_rollout_disabled() {
679 let config = NodeConfig {
680 upgrade: crate::config::UpgradeConfig {
681 enabled: true,
682 staged_rollout_hours: 0,
683 ..Default::default()
684 },
685 ..Default::default()
686 };
687 let seed = b"node-seed";
688
689 let monitor = NodeBuilder::build_upgrade_monitor(&config, seed);
690 assert!(!monitor.has_staged_rollout());
691 }
692
693 #[test]
694 fn test_build_core_config_sets_production_mode() {
695 let config = NodeConfig {
696 network_mode: NetworkMode::Production,
697 ..Default::default()
698 };
699 let core = NodeBuilder::build_core_config(&config).expect("core config");
700 assert!(core.production_config.is_some());
701 assert!(core.diversity_config.is_some());
702 }
703
704 #[test]
705 fn test_build_core_config_sets_development_mode_relaxed() {
706 let config = NodeConfig {
707 network_mode: NetworkMode::Development,
708 ..Default::default()
709 };
710 let core = NodeBuilder::build_core_config(&config).expect("core config");
711 assert!(core.production_config.is_none());
712 let diversity = core.diversity_config.expect("diversity");
713 assert!(diversity.is_relaxed());
714 }
715}