saorsa_node/
node.rs

1//! Node implementation - thin wrapper around saorsa-core's `P2PNode`.
2
3use crate::attestation::VerificationLevel;
4use crate::config::{AttestationMode, AttestationNodeConfig, IpVersion, NetworkMode, NodeConfig};
5use crate::error::{Error, Result};
6use crate::event::{create_event_channel, NodeEvent, NodeEventsChannel, NodeEventsSender};
7use crate::upgrade::{AutoApplyUpgrader, UpgradeMonitor, UpgradeResult};
8use saorsa_core::{
9    AttestationConfig as CoreAttestationConfig, BootstrapConfig as CoreBootstrapConfig,
10    BootstrapManager, EnforcementMode as CoreEnforcementMode,
11    IPDiversityConfig as CoreDiversityConfig, NodeConfig as CoreNodeConfig, P2PNode,
12    ProductionConfig as CoreProductionConfig,
13};
14use std::net::SocketAddr;
15use std::path::PathBuf;
16use std::sync::Arc;
17use tokio::sync::watch;
18use tracing::{debug, error, info, warn};
19
20#[cfg(unix)]
21use tokio::signal::unix::{signal, SignalKind};
22
23/// Builder for constructing a saorsa node.
24pub struct NodeBuilder {
25    config: NodeConfig,
26}
27
28impl NodeBuilder {
29    /// Create a new node builder with the given configuration.
30    #[must_use]
31    pub fn new(config: NodeConfig) -> Self {
32        Self { config }
33    }
34
35    /// Build and start the node.
36    ///
37    /// # Errors
38    ///
39    /// Returns an error if the node fails to start, or if attestation is enabled
40    /// without a proper verification feature (blocks startup for security).
41    pub async fn build(self) -> Result<RunningNode> {
42        info!("Building saorsa-node with config: {:?}", self.config);
43
44        // Validate attestation security BEFORE proceeding
45        Self::validate_attestation_security(&self.config)?;
46
47        // Ensure root directory exists
48        std::fs::create_dir_all(&self.config.root_dir)?;
49
50        // Create shutdown channel
51        let (shutdown_tx, shutdown_rx) = watch::channel(false);
52
53        // Create event channel
54        let (events_tx, events_rx) = create_event_channel();
55
56        // Convert our config to saorsa-core's config
57        let core_config = Self::build_core_config(&self.config)?;
58        debug!("Core config: {:?}", core_config);
59
60        // Initialize saorsa-core's P2PNode
61        let p2p_node = P2PNode::new(core_config)
62            .await
63            .map_err(|e| Error::Startup(format!("Failed to create P2P node: {e}")))?;
64
65        // Create upgrade monitor if enabled
66        let upgrade_monitor = if self.config.upgrade.enabled {
67            let node_id_seed = p2p_node.peer_id().as_bytes();
68            Some(Self::build_upgrade_monitor(&self.config, node_id_seed))
69        } else {
70            None
71        };
72
73        // Initialize bootstrap cache manager if enabled
74        let bootstrap_manager = if self.config.bootstrap_cache.enabled {
75            Self::build_bootstrap_manager(&self.config).await
76        } else {
77            info!("Bootstrap cache disabled");
78            None
79        };
80
81        let node = RunningNode {
82            config: self.config,
83            p2p_node: Arc::new(p2p_node),
84            shutdown_tx,
85            shutdown_rx,
86            events_tx,
87            events_rx: Some(events_rx),
88            upgrade_monitor,
89            bootstrap_manager,
90        };
91
92        Ok(node)
93    }
94
95    /// Build the saorsa-core `NodeConfig` from our config.
96    fn build_core_config(config: &NodeConfig) -> Result<CoreNodeConfig> {
97        // Determine listen address based on port and IP version
98        let listen_addr: SocketAddr = match config.ip_version {
99            IpVersion::Ipv4 | IpVersion::Dual => format!("0.0.0.0:{}", config.port)
100                .parse()
101                .map_err(|e| Error::Config(format!("Invalid listen address: {e}")))?,
102            IpVersion::Ipv6 => format!("[::]:{}", config.port)
103                .parse()
104                .map_err(|e| Error::Config(format!("Invalid listen address: {e}")))?,
105        };
106
107        let mut core_config = CoreNodeConfig::new()
108            .map_err(|e| Error::Config(format!("Failed to create core config: {e}")))?;
109
110        // Set listen address
111        core_config.listen_addr = listen_addr;
112        core_config.listen_addrs = vec![listen_addr];
113
114        // Enable IPv6 if configured
115        core_config.enable_ipv6 = matches!(config.ip_version, IpVersion::Ipv6 | IpVersion::Dual);
116
117        // Add bootstrap peers
118        core_config.bootstrap_peers.clone_from(&config.bootstrap);
119
120        // Propagate network-mode tuning into saorsa-core where supported.
121        match config.network_mode {
122            NetworkMode::Production => {
123                core_config.production_config = Some(CoreProductionConfig::default());
124                core_config.diversity_config = Some(CoreDiversityConfig::default());
125            }
126            NetworkMode::Testnet => {
127                core_config.production_config = Some(CoreProductionConfig::default());
128                let mut diversity = CoreDiversityConfig::testnet();
129                diversity.max_nodes_per_asn = config.testnet.max_nodes_per_asn;
130                diversity.max_nodes_per_64 = config.testnet.max_nodes_per_64;
131                diversity.enable_geolocation_check = config.testnet.enable_geo_checks;
132                diversity.min_geographic_diversity = if config.testnet.enable_geo_checks {
133                    3
134                } else {
135                    1
136                };
137                core_config.diversity_config = Some(diversity);
138
139                if config.testnet.enforce_age_requirements {
140                    warn!(
141                        "testnet.enforce_age_requirements is set but saorsa-core does not yet \
142                         expose a knob; age checks may remain relaxed"
143                    );
144                }
145            }
146            NetworkMode::Development => {
147                core_config.production_config = None;
148                core_config.diversity_config = Some(CoreDiversityConfig::permissive());
149            }
150        }
151
152        // Configure attestation
153        core_config.attestation_config = Self::build_attestation_config(&config.attestation)?;
154
155        Ok(core_config)
156    }
157
158    /// Validate attestation security configuration.
159    ///
160    /// **BLOCKS STARTUP** if attestation is enabled without a verification feature.
161    fn validate_attestation_security(config: &NodeConfig) -> Result<()> {
162        if !config.attestation.enabled {
163            return Ok(());
164        }
165
166        let level = VerificationLevel::current();
167        info!("Attestation verification level: {}", level);
168
169        match level {
170            VerificationLevel::None => {
171                error!("SECURITY: Attestation enabled without verification feature!");
172                error!(
173                    "Enable zkvm-prover or zkvm-verifier-groth16 feature for real verification."
174                );
175                error!("Build with: cargo build --features zkvm-prover");
176                return Err(Error::Config(
177                    "Attestation requires zkvm-prover or zkvm-verifier-groth16 feature. \
178                     Without a verification feature, proofs use mock verification \
179                     which provides NO CRYPTOGRAPHIC SECURITY. \
180                     Build with: cargo build --features zkvm-prover"
181                        .into(),
182                ));
183            }
184            VerificationLevel::Groth16 => {
185                if config.attestation.require_pq_secure {
186                    error!(
187                        "SECURITY: require_pq_secure=true but only Groth16 verification available"
188                    );
189                    return Err(Error::Config(
190                        "require_pq_secure=true but only Groth16 available (not post-quantum secure). \
191                         Either enable zkvm-prover feature for STARK verification, \
192                         or set require_pq_secure=false in attestation config."
193                            .into(),
194                    ));
195                }
196                warn!(
197                    "Attestation using Groth16 verification - NOT post-quantum secure. \
198                     Consider enabling zkvm-prover feature for production deployments."
199                );
200            }
201            VerificationLevel::Stark => {
202                info!("Attestation using STARK verification (post-quantum secure)");
203            }
204        }
205
206        Ok(())
207    }
208
209    /// Build the saorsa-core `AttestationConfig` from our config.
210    fn build_attestation_config(config: &AttestationNodeConfig) -> Result<CoreAttestationConfig> {
211        let enforcement_mode = match config.mode {
212            AttestationMode::Off => CoreEnforcementMode::Off,
213            AttestationMode::Soft => CoreEnforcementMode::Soft,
214            AttestationMode::Hard => CoreEnforcementMode::Hard,
215        };
216
217        // Parse hex-encoded binary hashes
218        let allowed_binary_hashes = config
219            .allowed_binary_hashes
220            .iter()
221            .map(|hex_str| {
222                let bytes = hex::decode(hex_str).map_err(|e| {
223                    Error::Config(format!(
224                        "Invalid hex in allowed_binary_hashes '{hex_str}': {e}"
225                    ))
226                })?;
227                if bytes.len() != 32 {
228                    let len = bytes.len();
229                    return Err(Error::Config(format!(
230                        "Binary hash must be 32 bytes (64 hex chars), got {len} bytes for '{hex_str}'"
231                    )));
232                }
233                let mut arr = [0u8; 32];
234                arr.copy_from_slice(&bytes);
235                Ok(arr)
236            })
237            .collect::<Result<Vec<_>>>()?;
238
239        if config.mode == AttestationMode::Hard && config.enabled {
240            if allowed_binary_hashes.is_empty() {
241                warn!(
242                    "Attestation in Hard mode with empty allowed_binary_hashes - \
243                     all binaries will be accepted. Consider specifying allowed hashes."
244                );
245            } else {
246                info!(
247                    "Attestation in Hard mode with {} allowed binary hash(es)",
248                    allowed_binary_hashes.len()
249                );
250            }
251        }
252
253        Ok(CoreAttestationConfig {
254            enabled: config.enabled,
255            enforcement_mode,
256            allowed_binary_hashes,
257            sunset_grace_days: config.sunset_grace_days,
258        })
259    }
260
261    fn build_upgrade_monitor(config: &NodeConfig, node_id_seed: &[u8]) -> Arc<UpgradeMonitor> {
262        let monitor = UpgradeMonitor::new(
263            config.upgrade.github_repo.clone(),
264            config.upgrade.channel,
265            config.upgrade.check_interval_hours,
266        );
267
268        if config.upgrade.staged_rollout_hours > 0 {
269            Arc::new(monitor.with_staged_rollout(node_id_seed, config.upgrade.staged_rollout_hours))
270        } else {
271            Arc::new(monitor)
272        }
273    }
274
275    /// Build the bootstrap cache manager from config.
276    async fn build_bootstrap_manager(config: &NodeConfig) -> Option<BootstrapManager> {
277        let cache_dir = config
278            .bootstrap_cache
279            .cache_dir
280            .clone()
281            .unwrap_or_else(|| config.root_dir.join("bootstrap_cache"));
282
283        // Create cache directory
284        if let Err(e) = std::fs::create_dir_all(&cache_dir) {
285            warn!("Failed to create bootstrap cache directory: {}", e);
286            return None;
287        }
288
289        let bootstrap_config = CoreBootstrapConfig {
290            cache_dir,
291            max_peers: config.bootstrap_cache.max_contacts,
292            ..CoreBootstrapConfig::default()
293        };
294
295        match BootstrapManager::with_config(bootstrap_config).await {
296            Ok(manager) => {
297                info!(
298                    "Bootstrap cache initialized with {} max contacts",
299                    config.bootstrap_cache.max_contacts
300                );
301                Some(manager)
302            }
303            Err(e) => {
304                warn!("Failed to initialize bootstrap cache: {}", e);
305                None
306            }
307        }
308    }
309}
310
311/// A running saorsa node.
312pub struct RunningNode {
313    config: NodeConfig,
314    p2p_node: Arc<P2PNode>,
315    shutdown_tx: watch::Sender<bool>,
316    shutdown_rx: watch::Receiver<bool>,
317    events_tx: NodeEventsSender,
318    events_rx: Option<NodeEventsChannel>,
319    upgrade_monitor: Option<Arc<UpgradeMonitor>>,
320    /// Bootstrap cache manager for persistent peer storage.
321    bootstrap_manager: Option<BootstrapManager>,
322}
323
324impl RunningNode {
325    /// Get the node's root directory.
326    #[must_use]
327    pub fn root_dir(&self) -> &PathBuf {
328        &self.config.root_dir
329    }
330
331    /// Get a receiver for node events.
332    ///
333    /// Note: Can only be called once. Subsequent calls return None.
334    pub fn events(&mut self) -> Option<NodeEventsChannel> {
335        self.events_rx.take()
336    }
337
338    /// Subscribe to node events.
339    #[must_use]
340    pub fn subscribe_events(&self) -> NodeEventsChannel {
341        self.events_tx.subscribe()
342    }
343
344    /// Run the node until shutdown is requested.
345    ///
346    /// # Errors
347    ///
348    /// Returns an error if the node encounters a fatal error.
349    pub async fn run(&mut self) -> Result<()> {
350        info!("Starting saorsa-node");
351
352        // Start the P2P node
353        self.p2p_node
354            .start()
355            .await
356            .map_err(|e| Error::Startup(format!("Failed to start P2P node: {e}")))?;
357
358        info!(
359            "P2P node started, listening on {:?}",
360            self.p2p_node.listen_addrs().await
361        );
362
363        // Emit started event
364        if let Err(e) = self.events_tx.send(NodeEvent::Started) {
365            warn!("Failed to send Started event: {e}");
366        }
367
368        // Start upgrade monitor if enabled
369        if let Some(ref monitor) = self.upgrade_monitor {
370            let monitor = Arc::clone(monitor);
371            let events_tx = self.events_tx.clone();
372            let mut shutdown_rx = self.shutdown_rx.clone();
373
374            tokio::spawn(async move {
375                let upgrader = AutoApplyUpgrader::new();
376
377                loop {
378                    tokio::select! {
379                        _ = shutdown_rx.changed() => {
380                            if *shutdown_rx.borrow() {
381                                break;
382                            }
383                        }
384                        result = monitor.check_for_updates() => {
385                            if let Ok(Some(upgrade_info)) = result {
386                                info!(
387                                    "Upgrade available: {} -> {}",
388                                    upgrader.current_version(),
389                                    upgrade_info.version
390                                );
391
392                                // Send notification event
393                                if let Err(e) = events_tx.send(NodeEvent::UpgradeAvailable {
394                                    version: upgrade_info.version.to_string(),
395                                }) {
396                                    warn!("Failed to send UpgradeAvailable event: {e}");
397                                }
398
399                                // Auto-apply the upgrade
400                                info!("Starting auto-apply upgrade...");
401                                match upgrader.apply_upgrade(&upgrade_info).await {
402                                    Ok(UpgradeResult::Success { version }) => {
403                                        info!("Upgrade to {} successful! Process will restart.", version);
404                                        // If we reach here, exec() failed or not supported
405                                    }
406                                    Ok(UpgradeResult::RolledBack { reason }) => {
407                                        warn!("Upgrade rolled back: {}", reason);
408                                    }
409                                    Ok(UpgradeResult::NoUpgrade) => {
410                                        debug!("No upgrade needed");
411                                    }
412                                    Err(e) => {
413                                        error!("Critical upgrade error: {}", e);
414                                    }
415                                }
416                            }
417                            // Wait for next check interval
418                            tokio::time::sleep(monitor.check_interval()).await;
419                        }
420                    }
421                }
422            });
423        }
424
425        info!("Node running, waiting for shutdown signal");
426
427        // Run the main event loop with signal handling
428        self.run_event_loop().await?;
429
430        // Log bootstrap cache stats before shutdown
431        if let Some(ref manager) = self.bootstrap_manager {
432            match manager.get_stats().await {
433                Ok(stats) => {
434                    info!(
435                        "Bootstrap cache shutdown: {} contacts, avg quality {:.2}",
436                        stats.total_contacts, stats.average_quality_score
437                    );
438                }
439                Err(e) => {
440                    debug!("Failed to get bootstrap cache stats: {}", e);
441                }
442            }
443        }
444
445        // Shutdown P2P node
446        info!("Shutting down P2P node...");
447        if let Err(e) = self.p2p_node.shutdown().await {
448            warn!("Error during P2P node shutdown: {e}");
449        }
450
451        if let Err(e) = self.events_tx.send(NodeEvent::ShuttingDown) {
452            warn!("Failed to send ShuttingDown event: {e}");
453        }
454        info!("Node shutdown complete");
455        Ok(())
456    }
457
458    /// Run the main event loop, handling shutdown and signals.
459    #[cfg(unix)]
460    async fn run_event_loop(&mut self) -> Result<()> {
461        let mut sigterm = signal(SignalKind::terminate())?;
462        let mut sighup = signal(SignalKind::hangup())?;
463
464        loop {
465            tokio::select! {
466                _ = self.shutdown_rx.changed() => {
467                    if *self.shutdown_rx.borrow() {
468                        info!("Shutdown signal received");
469                        break;
470                    }
471                }
472                _ = tokio::signal::ctrl_c() => {
473                    info!("Received SIGINT (Ctrl-C), initiating shutdown");
474                    self.shutdown();
475                    break;
476                }
477                _ = sigterm.recv() => {
478                    info!("Received SIGTERM, initiating shutdown");
479                    self.shutdown();
480                    break;
481                }
482                _ = sighup.recv() => {
483                    info!("Received SIGHUP, could reload config here");
484                    // TODO: Implement config reload on SIGHUP
485                }
486            }
487        }
488        Ok(())
489    }
490
491    /// Run the main event loop, handling shutdown signals (non-Unix version).
492    #[cfg(not(unix))]
493    async fn run_event_loop(&mut self) -> Result<()> {
494        loop {
495            tokio::select! {
496                _ = self.shutdown_rx.changed() => {
497                    if *self.shutdown_rx.borrow() {
498                        info!("Shutdown signal received");
499                        break;
500                    }
501                }
502                _ = tokio::signal::ctrl_c() => {
503                    info!("Received Ctrl-C, initiating shutdown");
504                    self.shutdown();
505                    break;
506                }
507            }
508        }
509        Ok(())
510    }
511
512    /// Request the node to shut down.
513    pub fn shutdown(&self) {
514        if let Err(e) = self.shutdown_tx.send(true) {
515            warn!("Failed to send shutdown signal: {e}");
516        }
517    }
518}
519
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use super::*;

    /// Build an upgrade monitor from a default config carrying `upgrade`,
    /// using a fixed node-id seed.
    fn monitor_for(upgrade: crate::config::UpgradeConfig) -> Arc<UpgradeMonitor> {
        let config = NodeConfig {
            upgrade,
            ..Default::default()
        };
        let seed = b"node-seed";
        NodeBuilder::build_upgrade_monitor(&config, seed)
    }

    /// Build the core config for a default node config in the given mode.
    fn core_config_for(network_mode: NetworkMode) -> CoreNodeConfig {
        let config = NodeConfig {
            network_mode,
            ..Default::default()
        };
        NodeBuilder::build_core_config(&config).expect("core config")
    }

    /// A positive rollout window must enable staged rollout on the monitor.
    #[test]
    fn test_build_upgrade_monitor_staged_rollout_enabled() {
        let monitor = monitor_for(crate::config::UpgradeConfig {
            enabled: true,
            staged_rollout_hours: 24,
            ..Default::default()
        });
        assert!(monitor.has_staged_rollout());
    }

    /// A zero rollout window must leave staged rollout disabled.
    #[test]
    fn test_build_upgrade_monitor_staged_rollout_disabled() {
        let monitor = monitor_for(crate::config::UpgradeConfig {
            enabled: true,
            staged_rollout_hours: 0,
            ..Default::default()
        });
        assert!(!monitor.has_staged_rollout());
    }

    /// Production mode sets both the production and diversity configs.
    #[test]
    fn test_build_core_config_sets_production_mode() {
        let core = core_config_for(NetworkMode::Production);
        assert!(core.production_config.is_some());
        assert!(core.diversity_config.is_some());
    }

    /// Development mode drops the production config and relaxes diversity.
    #[test]
    fn test_build_core_config_sets_development_mode_relaxed() {
        let core = core_config_for(NetworkMode::Development);
        assert!(core.production_config.is_none());
        let diversity = core.diversity_config.expect("diversity");
        assert!(diversity.is_relaxed());
    }
}
579}