Skip to main content

saorsa_node/
devnet.rs

1//! Local devnet infrastructure for spawning and managing multiple nodes.
2//!
3//! This module provides a local, in-process devnet suitable for running
4//! multi-node networks on a single machine.
5
6use crate::ant_protocol::CHUNK_PROTOCOL_ID;
7use crate::config::{default_root_dir, NODES_SUBDIR, NODE_IDENTITY_FILENAME};
8use crate::payment::{
9    EvmVerifierConfig, PaymentVerifier, PaymentVerifierConfig, QuoteGenerator,
10    QuotingMetricsTracker,
11};
12use crate::storage::{AntProtocol, LmdbStorage, LmdbStorageConfig};
13use ant_evm::RewardsAddress;
14use evmlib::Network as EvmNetwork;
15use rand::Rng;
16use saorsa_core::identity::NodeIdentity;
17use saorsa_core::{
18    IPDiversityConfig, MultiAddr, NodeConfig as CoreNodeConfig, P2PEvent, P2PNode, PeerId,
19};
20use serde::{Deserialize, Serialize};
21use std::net::{Ipv4Addr, SocketAddr};
22use std::path::PathBuf;
23use std::sync::Arc;
24use std::time::Duration;
25use tokio::sync::RwLock;
26use tokio::task::JoinHandle;
27use tokio::time::Instant;
28use tokio_util::sync::CancellationToken;
29use tracing::{debug, info, warn};
30
// =============================================================================
// Devnet Constants
// =============================================================================

/// Minimum port for random devnet allocation.
///
/// Inclusive lower bound of the range from which a random base port is drawn.
pub const DEVNET_PORT_RANGE_MIN: u16 = 20_000;

/// Maximum port for random devnet allocation.
///
/// `Devnet::new` rejects configurations where `base_port + node_count`
/// exceeds this value.
pub const DEVNET_PORT_RANGE_MAX: u16 = 60_000;

// =============================================================================
// Default Timing Constants
// =============================================================================

/// Default delay between spawning nodes (milliseconds).
const DEFAULT_SPAWN_DELAY_MS: u64 = 200;

/// Default timeout for network stabilization (seconds).
const DEFAULT_STABILIZATION_TIMEOUT_SECS: u64 = 120;

/// Default timeout for single node startup (seconds).
const DEFAULT_NODE_STARTUP_TIMEOUT_SECS: u64 = 30;

/// Stabilization timeout for minimal network (seconds).
const MINIMAL_STABILIZATION_TIMEOUT_SECS: u64 = 30;

/// Stabilization timeout for small network (seconds).
const SMALL_STABILIZATION_TIMEOUT_SECS: u64 = 60;

/// Polling interval when waiting for individual nodes to become ready (milliseconds).
const NODE_READY_POLL_INTERVAL_MS: u64 = 100;

/// Polling interval when waiting for network stabilization (seconds).
const STABILIZATION_POLL_INTERVAL_SECS: u64 = 1;

/// Upper bound on the per-node connection count required during stabilization.
///
/// The required minimum is `min(bootstrap_count, STABILIZATION_MIN_CONNECTIONS_CAP)`.
const STABILIZATION_MIN_CONNECTIONS_CAP: usize = 3;

/// Health monitor check interval (seconds).
const HEALTH_CHECK_INTERVAL_SECS: u64 = 5;

// =============================================================================
// AntProtocol Devnet Configuration
// =============================================================================

/// Payment cache capacity for devnet nodes.
///
/// Passed as `cache_capacity` to `PaymentVerifierConfig`.
const DEVNET_PAYMENT_CACHE_CAPACITY: usize = 1000;

/// Devnet rewards address (20 bytes, all 0x01).
///
/// Every devnet node is configured with this same address.
const DEVNET_REWARDS_ADDRESS: [u8; 20] = [0x01; 20];

/// Max records for quoting metrics (devnet value).
///
/// First argument to `QuotingMetricsTracker::new`.
const DEVNET_MAX_RECORDS: usize = 100_000;

/// Initial records for quoting metrics (devnet value).
///
/// Second argument to `QuotingMetricsTracker::new`.
const DEVNET_INITIAL_RECORDS: usize = 1000;

// =============================================================================
// Default Node Counts
// =============================================================================

/// Default number of nodes in a full devnet.
pub const DEFAULT_NODE_COUNT: usize = 25;

/// Default number of bootstrap nodes.
pub const DEFAULT_BOOTSTRAP_COUNT: usize = 3;

/// Number of nodes in a minimal devnet.
pub const MINIMAL_NODE_COUNT: usize = 5;

/// Number of bootstrap nodes in a minimal network.
///
/// Also used by the small preset (`DevnetConfig::small`).
pub const MINIMAL_BOOTSTRAP_COUNT: usize = 2;

/// Number of nodes in a small devnet.
pub const SMALL_NODE_COUNT: usize = 10;
106
/// Error type for devnet operations.
///
/// Most variants carry a pre-formatted, human-readable message; `Io` wraps the
/// underlying `std::io::Error` and is produced automatically by `?`.
#[derive(Debug, thiserror::Error)]
pub enum DevnetError {
    /// Configuration error (invalid node/bootstrap counts, port range
    /// overflow, out-of-range node index).
    #[error("Configuration error: {0}")]
    Config(String),

    /// Node startup error (creating or starting a node failed, signer wiring
    /// failed, or a node reported a failed state).
    #[error("Node startup error: {0}")]
    Startup(String),

    /// Network stabilization error (the devnet did not reach the required
    /// connectivity within the stabilization timeout).
    #[error("Network stabilization error: {0}")]
    Stabilization(String),

    /// IO error (e.g. creating a data directory).
    #[error("IO error: {0}")]
    Io(#[from] std::io::Error),

    /// Core error (identity generation/persistence, storage creation, or
    /// core node configuration failed).
    #[error("Core error: {0}")]
    Core(String),
}
130
/// Result type for devnet operations.
///
/// Shorthand for `std::result::Result<T, DevnetError>`.
pub type Result<T> = std::result::Result<T, DevnetError>;
133
/// Configuration for the devnet.
///
/// `Default` draws a random base port so that concurrently running devnets
/// are unlikely to collide on ports.
///
/// NOTE(review): `data_dir` defaults to `default_root_dir()`; whether that
/// path is unique per devnet is not visible from this module. Confirm before
/// relying on data-directory isolation, especially because `cleanup_data_dir`
/// defaults to `true`.
#[derive(Debug, Clone)]
pub struct DevnetConfig {
    /// Number of nodes to spawn (default: 25).
    pub node_count: usize,

    /// Base port for node allocation (0 = auto).
    ///
    /// Node `i` listens on `base_port + i`. A value of `0` makes
    /// `Devnet::new` pick a random base port inside the devnet port range.
    pub base_port: u16,

    /// Number of bootstrap nodes (first N nodes, default: 3).
    ///
    /// Must be at least 1 and strictly less than `node_count`.
    pub bootstrap_count: usize,

    /// Root directory for devnet data.
    pub data_dir: PathBuf,

    /// Delay between node spawns (default: 200ms).
    pub spawn_delay: Duration,

    /// Timeout for network stabilization (default: 120s).
    pub stabilization_timeout: Duration,

    /// Timeout for single node startup (default: 30s).
    pub node_startup_timeout: Duration,

    /// Enable verbose logging for devnet nodes.
    pub enable_node_logging: bool,

    /// Whether to remove the data directory on shutdown.
    ///
    /// When `true`, `Devnet::shutdown` recursively removes `data_dir`.
    pub cleanup_data_dir: bool,

    /// Optional EVM network for payment verification.
    /// When `Some`, nodes will use this network (e.g. Anvil testnet) for
    /// on-chain verification. Defaults to Arbitrum One when `None`.
    pub evm_network: Option<EvmNetwork>,
}
172
173impl Default for DevnetConfig {
174    fn default() -> Self {
175        let mut rng = rand::thread_rng();
176
177        #[allow(clippy::cast_possible_truncation)] // DEFAULT_NODE_COUNT is 25, always fits u16
178        let max_base_port = DEVNET_PORT_RANGE_MAX.saturating_sub(DEFAULT_NODE_COUNT as u16);
179        let base_port = rng.gen_range(DEVNET_PORT_RANGE_MIN..max_base_port);
180
181        Self {
182            node_count: DEFAULT_NODE_COUNT,
183            base_port,
184            bootstrap_count: DEFAULT_BOOTSTRAP_COUNT,
185            data_dir: default_root_dir(),
186            spawn_delay: Duration::from_millis(DEFAULT_SPAWN_DELAY_MS),
187            stabilization_timeout: Duration::from_secs(DEFAULT_STABILIZATION_TIMEOUT_SECS),
188            node_startup_timeout: Duration::from_secs(DEFAULT_NODE_STARTUP_TIMEOUT_SECS),
189            enable_node_logging: false,
190            cleanup_data_dir: true,
191            evm_network: None,
192        }
193    }
194}
195
196impl DevnetConfig {
197    /// Minimal devnet preset (5 nodes).
198    #[must_use]
199    pub fn minimal() -> Self {
200        Self {
201            node_count: MINIMAL_NODE_COUNT,
202            bootstrap_count: MINIMAL_BOOTSTRAP_COUNT,
203            stabilization_timeout: Duration::from_secs(MINIMAL_STABILIZATION_TIMEOUT_SECS),
204            ..Self::default()
205        }
206    }
207
208    /// Small devnet preset (10 nodes).
209    #[must_use]
210    pub fn small() -> Self {
211        Self {
212            node_count: SMALL_NODE_COUNT,
213            bootstrap_count: MINIMAL_BOOTSTRAP_COUNT,
214            stabilization_timeout: Duration::from_secs(SMALL_STABILIZATION_TIMEOUT_SECS),
215            ..Self::default()
216        }
217    }
218}
219
/// Devnet manifest for client discovery.
///
/// Serializable description of a devnet: where its nodes listen, where its
/// data lives, and (optionally) how to reach its EVM environment.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DevnetManifest {
    /// Base port for nodes.
    pub base_port: u16,
    /// Node count.
    pub node_count: usize,
    /// Bootstrap addresses.
    pub bootstrap: Vec<MultiAddr>,
    /// Data directory.
    pub data_dir: PathBuf,
    /// Creation time in RFC3339.
    pub created_at: String,
    /// EVM configuration (present when EVM payment enforcement is enabled).
    ///
    /// Omitted from the serialized output when `None`, and defaults to `None`
    /// when missing on deserialization.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub evm: Option<DevnetEvmInfo>,
}
237
/// EVM configuration info included in the devnet manifest.
///
/// All fields are plain strings and are serialized as-is, including the
/// wallet private key.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DevnetEvmInfo {
    /// Anvil RPC URL.
    pub rpc_url: String,
    /// Funded wallet private key (hex-encoded with 0x prefix).
    pub wallet_private_key: String,
    /// Payment token contract address.
    pub payment_token_address: String,
    /// Data payments contract address.
    pub data_payments_address: String,
}
250
/// Network state for devnet startup lifecycle.
///
/// `Devnet::start` advances through `BootstrappingPhase`, `NodeSpawningPhase`,
/// `Stabilizing` and `Ready`; `Devnet::shutdown` sets `ShuttingDown` and then
/// `Stopped`.
#[derive(Debug, Clone)]
pub enum NetworkState {
    /// Not started.
    Uninitialized,
    /// Bootstrapping nodes are starting.
    BootstrappingPhase,
    /// Regular nodes are starting.
    NodeSpawningPhase,
    /// Waiting for stabilization.
    Stabilizing,
    /// Network is ready.
    Ready,
    /// Shutting down.
    ShuttingDown,
    /// Stopped.
    Stopped,
}
269
/// Node state for devnet nodes.
///
/// Within this module a node is created as `Pending`, moves to `Starting` and
/// then `Running` while it is being started, and becomes `Stopped` on
/// shutdown. `Connected` and `Failed` are matched on but never assigned by
/// the code visible in this module.
#[derive(Debug, Clone)]
pub enum NodeState {
    /// Not started yet.
    Pending,
    /// Starting up.
    Starting,
    /// Running.
    Running,
    /// Connected to peers.
    Connected,
    /// Stopped.
    Stopped,
    /// Failed to start. Carries a description of the failure.
    Failed(String),
}
286
/// A single devnet node instance.
// Several fields are stored for bookkeeping but not read by the code visible
// in this module, hence the `dead_code` allowance.
#[allow(dead_code)]
pub struct DevnetNode {
    // Position of the node in spawn order; its port is `base_port + index`.
    index: usize,
    // Human-readable label of the form `devnet_node_{index}`.
    label: String,
    // Peer id taken from the node's generated identity.
    peer_id: PeerId,
    // Port passed to the node's core configuration.
    port: u16,
    // Per-node directory: `<config.data_dir>/<NODES_SUBDIR>/<peer id as hex>`.
    data_dir: PathBuf,
    // `None` until the node has been started; taken again during shutdown.
    p2p_node: Option<Arc<P2PNode>>,
    // Protocol handler used to answer chunk-protocol messages.
    ant_protocol: Option<Arc<AntProtocol>>,
    // Whether the node was created as one of the first `bootstrap_count` nodes.
    is_bootstrap: bool,
    // Lifecycle state of this node.
    state: Arc<RwLock<NodeState>>,
    // Bootstrap peers handed to the core config (empty for bootstrap nodes).
    bootstrap_addrs: Vec<MultiAddr>,
    // Background task that routes incoming events to `ant_protocol`.
    protocol_task: Option<JoinHandle<()>>,
}
302
303impl DevnetNode {
304    /// Get the node's peer count.
305    pub async fn peer_count(&self) -> usize {
306        if let Some(ref node) = self.p2p_node {
307            node.peer_count().await
308        } else {
309            0
310        }
311    }
312}
313
/// A local devnet composed of multiple nodes.
pub struct Devnet {
    // Validated configuration; `base_port` is always non-zero after `new`.
    config: DevnetConfig,
    // Started nodes in spawn order (bootstrap nodes first).
    nodes: Vec<DevnetNode>,
    // Cancelled on shutdown/drop to stop the health monitor loop.
    shutdown: CancellationToken,
    // Lifecycle state of the devnet as a whole.
    state: Arc<RwLock<NetworkState>>,
    // Handle of the background health monitor task, once started.
    health_monitor: Option<JoinHandle<()>>,
}
322
323impl Devnet {
324    /// Create a new devnet with the given configuration.
325    ///
326    /// # Errors
327    ///
328    /// Returns `DevnetError::Config` if the configuration is invalid (e.g. bootstrap
329    /// count exceeds node count or port range overflow).
330    /// Returns `DevnetError::Io` if the data directory cannot be created.
331    pub async fn new(mut config: DevnetConfig) -> Result<Self> {
332        if config.bootstrap_count >= config.node_count {
333            return Err(DevnetError::Config(
334                "Bootstrap count must be less than node count".to_string(),
335            ));
336        }
337
338        if config.bootstrap_count == 0 {
339            return Err(DevnetError::Config(
340                "At least one bootstrap node is required".to_string(),
341            ));
342        }
343
344        let node_count = config.node_count;
345        let node_count_u16 = u16::try_from(node_count).map_err(|_| {
346            DevnetError::Config(format!("Node count {node_count} exceeds u16::MAX"))
347        })?;
348
349        if config.base_port == 0 {
350            let mut rng = rand::thread_rng();
351            let max_base_port = DEVNET_PORT_RANGE_MAX.saturating_sub(node_count_u16);
352            config.base_port = rng.gen_range(DEVNET_PORT_RANGE_MIN..max_base_port);
353        }
354
355        let base_port = config.base_port;
356        let max_port = base_port
357            .checked_add(node_count_u16)
358            .ok_or_else(|| {
359                DevnetError::Config(format!(
360                    "Port range overflow: base_port {base_port} + node_count {node_count} exceeds u16::MAX"
361                ))
362            })?;
363        if max_port > DEVNET_PORT_RANGE_MAX {
364            return Err(DevnetError::Config(format!(
365                "Port range overflow: max port {max_port} exceeds DEVNET_PORT_RANGE_MAX {DEVNET_PORT_RANGE_MAX}"
366            )));
367        }
368
369        tokio::fs::create_dir_all(&config.data_dir).await?;
370
371        Ok(Self {
372            config,
373            nodes: Vec::new(),
374            shutdown: CancellationToken::new(),
375            state: Arc::new(RwLock::new(NetworkState::Uninitialized)),
376            health_monitor: None,
377        })
378    }
379
380    /// Start the devnet.
381    ///
382    /// # Errors
383    ///
384    /// Returns `DevnetError::Startup` if any node fails to start, or
385    /// `DevnetError::Stabilization` if the network does not stabilize within the timeout.
386    pub async fn start(&mut self) -> Result<()> {
387        info!(
388            "Starting devnet with {} nodes ({} bootstrap)",
389            self.config.node_count, self.config.bootstrap_count
390        );
391
392        *self.state.write().await = NetworkState::BootstrappingPhase;
393        self.start_bootstrap_nodes().await?;
394
395        *self.state.write().await = NetworkState::NodeSpawningPhase;
396        self.start_regular_nodes().await?;
397
398        *self.state.write().await = NetworkState::Stabilizing;
399        self.wait_for_stabilization().await?;
400
401        self.start_health_monitor();
402
403        *self.state.write().await = NetworkState::Ready;
404        info!("Devnet is ready");
405        Ok(())
406    }
407
    /// Shutdown the devnet.
    ///
    /// Cancels the shutdown token, aborts the health monitor and every node's
    /// protocol task, shuts down all P2P nodes concurrently (collected in
    /// reverse spawn order) and, if `cleanup_data_dir` is set, removes the
    /// data directory.
    ///
    /// # Errors
    ///
    /// Currently always returns `Ok(())`: failures while shutting down a node
    /// or removing the data directory are logged as warnings rather than
    /// returned.
    pub async fn shutdown(&mut self) -> Result<()> {
        info!("Shutting down devnet");
        *self.state.write().await = NetworkState::ShuttingDown;

        // Signal the health monitor loop to exit.
        self.shutdown.cancel();

        if let Some(handle) = self.health_monitor.take() {
            handle.abort();
        }

        let mut shutdown_futures = Vec::with_capacity(self.nodes.len());
        for node in self.nodes.iter_mut().rev() {
            debug!("Stopping node {}", node.index);
            // Stop routing events before the node itself goes away.
            if let Some(handle) = node.protocol_task.take() {
                handle.abort();
            }

            let node_index = node.index;
            let node_state = Arc::clone(&node.state);
            let p2p_node = node.p2p_node.take();

            // The futures own everything they need so they can run concurrently.
            shutdown_futures.push(async move {
                if let Some(p2p) = p2p_node {
                    if let Err(e) = p2p.shutdown().await {
                        warn!("Error shutting down node {node_index}: {e}");
                    }
                }
                *node_state.write().await = NodeState::Stopped;
            });
        }
        futures::future::join_all(shutdown_futures).await;

        // Best-effort cleanup: a failure here is only logged.
        if self.config.cleanup_data_dir {
            if let Err(e) = tokio::fs::remove_dir_all(&self.config.data_dir).await {
                warn!("Failed to cleanup devnet data directory: {e}");
            }
        }

        *self.state.write().await = NetworkState::Stopped;
        info!("Devnet shutdown complete");
        Ok(())
    }
455
    /// Get devnet configuration.
    ///
    /// This is the configuration as stored by `Devnet::new`, i.e. with
    /// `base_port` already resolved when it was passed in as `0`.
    #[must_use]
    pub fn config(&self) -> &DevnetConfig {
        &self.config
    }
461
462    /// Get bootstrap addresses.
463    #[must_use]
464    pub fn bootstrap_addrs(&self) -> Vec<MultiAddr> {
465        self.nodes
466            .iter()
467            .take(self.config.bootstrap_count)
468            .map(|n| MultiAddr::quic(SocketAddr::from((Ipv4Addr::LOCALHOST, n.port))))
469            .collect()
470    }
471
472    async fn start_bootstrap_nodes(&mut self) -> Result<()> {
473        info!("Starting {} bootstrap nodes", self.config.bootstrap_count);
474
475        for i in 0..self.config.bootstrap_count {
476            let node = self.create_node(i, true, vec![]).await?;
477            self.start_node(node).await?;
478            tokio::time::sleep(self.config.spawn_delay).await;
479        }
480
481        self.wait_for_nodes_ready(0..self.config.bootstrap_count)
482            .await?;
483
484        info!("All bootstrap nodes are ready");
485        Ok(())
486    }
487
488    async fn start_regular_nodes(&mut self) -> Result<()> {
489        let regular_count = self.config.node_count - self.config.bootstrap_count;
490        info!("Starting {} regular nodes", regular_count);
491
492        let bootstrap_addrs: Vec<MultiAddr> = self
493            .nodes
494            .get(0..self.config.bootstrap_count)
495            .ok_or_else(|| {
496                DevnetError::Config(format!(
497                    "Bootstrap count {} exceeds nodes length {}",
498                    self.config.bootstrap_count,
499                    self.nodes.len()
500                ))
501            })?
502            .iter()
503            .map(|n| MultiAddr::quic(SocketAddr::from((Ipv4Addr::LOCALHOST, n.port))))
504            .collect();
505
506        for i in self.config.bootstrap_count..self.config.node_count {
507            let node = self.create_node(i, false, bootstrap_addrs.clone()).await?;
508            self.start_node(node).await?;
509            tokio::time::sleep(self.config.spawn_delay).await;
510        }
511
512        info!("All regular nodes started");
513        Ok(())
514    }
515
516    async fn create_node(
517        &self,
518        index: usize,
519        is_bootstrap: bool,
520        bootstrap_addrs: Vec<MultiAddr>,
521    ) -> Result<DevnetNode> {
522        let index_u16 = u16::try_from(index)
523            .map_err(|_| DevnetError::Config(format!("Node index {index} exceeds u16::MAX")))?;
524        let port = self.config.base_port + index_u16;
525
526        // Generate identity first so we can use peer_id as the directory name
527        let identity = NodeIdentity::generate()
528            .map_err(|e| DevnetError::Core(format!("Failed to generate node identity: {e}")))?;
529        let peer_id = *identity.peer_id();
530        let label = format!("devnet_node_{index}");
531        let data_dir = self
532            .config
533            .data_dir
534            .join(NODES_SUBDIR)
535            .join(peer_id.to_hex());
536
537        tokio::fs::create_dir_all(&data_dir).await?;
538
539        identity
540            .save_to_file(&data_dir.join(NODE_IDENTITY_FILENAME))
541            .await
542            .map_err(|e| DevnetError::Core(format!("Failed to save node identity: {e}")))?;
543
544        let ant_protocol = Self::create_ant_protocol(&data_dir, &identity, &self.config).await?;
545
546        Ok(DevnetNode {
547            index,
548            label,
549            peer_id,
550            port,
551            data_dir,
552            p2p_node: None,
553            ant_protocol: Some(Arc::new(ant_protocol)),
554            is_bootstrap,
555            state: Arc::new(RwLock::new(NodeState::Pending)),
556            bootstrap_addrs,
557            protocol_task: None,
558        })
559    }
560
561    async fn create_ant_protocol(
562        data_dir: &std::path::Path,
563        identity: &NodeIdentity,
564        config: &DevnetConfig,
565    ) -> Result<AntProtocol> {
566        let storage_config = LmdbStorageConfig {
567            root_dir: data_dir.to_path_buf(),
568            verify_on_read: true,
569            max_chunks: 0,
570            max_map_size: 0,
571        };
572        let storage = LmdbStorage::new(storage_config)
573            .await
574            .map_err(|e| DevnetError::Core(format!("Failed to create LMDB storage: {e}")))?;
575
576        let evm_config = EvmVerifierConfig {
577            network: config
578                .evm_network
579                .clone()
580                .unwrap_or(EvmNetwork::ArbitrumOne),
581        };
582
583        let rewards_address = RewardsAddress::new(DEVNET_REWARDS_ADDRESS);
584        let payment_config = PaymentVerifierConfig {
585            evm: evm_config,
586            cache_capacity: DEVNET_PAYMENT_CACHE_CAPACITY,
587            local_rewards_address: rewards_address,
588        };
589        let payment_verifier = PaymentVerifier::new(payment_config);
590        let metrics_tracker =
591            QuotingMetricsTracker::new(DEVNET_MAX_RECORDS, DEVNET_INITIAL_RECORDS);
592        let mut quote_generator = QuoteGenerator::new(rewards_address, metrics_tracker);
593
594        // Wire ML-DSA-65 signing from the devnet node's identity
595        crate::payment::wire_ml_dsa_signer(&mut quote_generator, identity)
596            .map_err(|e| DevnetError::Startup(format!("Failed to wire ML-DSA-65 signer: {e}")))?;
597
598        Ok(AntProtocol::new(
599            Arc::new(storage),
600            Arc::new(payment_verifier),
601            Arc::new(quote_generator),
602        ))
603    }
604
    /// Start a prepared node and register it with the devnet.
    ///
    /// Builds the core configuration, loads the node's saved identity, creates
    /// and starts the P2P node, spawns the task that routes chunk-protocol
    /// messages to the node's `AntProtocol`, and finally pushes the node onto
    /// `self.nodes`.
    ///
    /// On any error the node is dropped without being registered; its state is
    /// left at `Starting` and is not set to `Failed`.
    async fn start_node(&mut self, mut node: DevnetNode) -> Result<()> {
        debug!("Starting node {} on port {}", node.index, node.port);
        *node.state.write().await = NodeState::Starting;

        let mut core_config = CoreNodeConfig::builder()
            .port(node.port)
            .local(true)
            .max_message_size(crate::ant_protocol::MAX_WIRE_MESSAGE_SIZE)
            .build()
            .map_err(|e| DevnetError::Core(format!("Failed to create core config: {e}")))?;

        // Load the node identity for app-level message signing.
        let identity = NodeIdentity::load_from_file(
            &node.data_dir.join(crate::config::NODE_IDENTITY_FILENAME),
        )
        .await
        .map_err(|e| DevnetError::Core(format!("Failed to load node identity: {e}")))?;

        core_config.node_identity = Some(Arc::new(identity));
        core_config
            .bootstrap_peers
            .clone_from(&node.bootstrap_addrs);
        // All devnet nodes share the loopback address, so use the permissive
        // diversity settings.
        core_config.diversity_config = Some(IPDiversityConfig::permissive());

        let index = node.index;
        let p2p_node = P2PNode::new(core_config)
            .await
            .map_err(|e| DevnetError::Startup(format!("Failed to create node {index}: {e}")))?;

        p2p_node
            .start()
            .await
            .map_err(|e| DevnetError::Startup(format!("Failed to start node {index}: {e}")))?;

        node.p2p_node = Some(Arc::new(p2p_node));
        *node.state.write().await = NodeState::Running;

        if let (Some(ref p2p), Some(ref protocol)) = (&node.p2p_node, &node.ant_protocol) {
            let mut events = p2p.subscribe_events();
            let p2p_clone = Arc::clone(p2p);
            let protocol_clone = Arc::clone(protocol);
            let node_index = node.index;
            node.protocol_task = Some(tokio::spawn(async move {
                // NOTE(review): the loop ends on the first `Err` from `recv()`.
                // If the receiver can report a recoverable error (e.g. a lagged
                // broadcast receiver), message handling would stop for good —
                // confirm the receiver type.
                while let Ok(event) = events.recv().await {
                    // Only messages with a known sender can be answered.
                    if let P2PEvent::Message {
                        topic,
                        source: Some(source),
                        data,
                    } = event
                    {
                        if topic == CHUNK_PROTOCOL_ID {
                            debug!(
                                "Node {node_index} received chunk protocol message from {source}"
                            );
                            let protocol = Arc::clone(&protocol_clone);
                            let p2p = Arc::clone(&p2p_clone);
                            // Handle each message in its own task so a slow
                            // request does not block the event loop.
                            tokio::spawn(async move {
                                match protocol.handle_message(&data).await {
                                    Ok(response) => {
                                        if let Err(e) = p2p
                                            .send_message(
                                                &source,
                                                CHUNK_PROTOCOL_ID,
                                                response.to_vec(),
                                                &[],
                                            )
                                            .await
                                        {
                                            warn!(
                                                "Node {node_index} failed to send response to {source}: {e}"
                                            );
                                        }
                                    }
                                    Err(e) => {
                                        warn!("Node {node_index} protocol handler error: {e}");
                                    }
                                }
                            });
                        }
                    }
                }
            }));
        }

        debug!("Node {} started successfully", node.index);
        self.nodes.push(node);
        Ok(())
    }
693
694    async fn wait_for_nodes_ready(&self, range: std::ops::Range<usize>) -> Result<()> {
695        let deadline = Instant::now() + self.config.node_startup_timeout;
696
697        for i in range {
698            while Instant::now() < deadline {
699                let node = self.nodes.get(i).ok_or_else(|| {
700                    DevnetError::Config(format!(
701                        "Node index {i} out of bounds (len: {})",
702                        self.nodes.len()
703                    ))
704                })?;
705                let state = node.state.read().await.clone();
706                match state {
707                    NodeState::Running | NodeState::Connected => break,
708                    NodeState::Failed(ref e) => {
709                        return Err(DevnetError::Startup(format!("Node {i} failed: {e}")));
710                    }
711                    _ => {
712                        tokio::time::sleep(Duration::from_millis(NODE_READY_POLL_INTERVAL_MS))
713                            .await;
714                    }
715                }
716            }
717        }
718        Ok(())
719    }
720
721    async fn wait_for_stabilization(&self) -> Result<()> {
722        let deadline = Instant::now() + self.config.stabilization_timeout;
723        let min_connections = self
724            .config
725            .bootstrap_count
726            .min(STABILIZATION_MIN_CONNECTIONS_CAP);
727
728        info!(
729            "Waiting for devnet stabilization (min {} connections per node)",
730            min_connections
731        );
732
733        while Instant::now() < deadline {
734            let mut all_connected = true;
735            let mut total_connections = 0;
736
737            for node in &self.nodes {
738                let peer_count = node.peer_count().await;
739                total_connections += peer_count;
740
741                if peer_count < min_connections {
742                    all_connected = false;
743                }
744            }
745
746            if all_connected {
747                info!("Devnet stabilized: {} total connections", total_connections);
748                return Ok(());
749            }
750
751            debug!(
752                "Waiting for stabilization: {} total connections",
753                total_connections
754            );
755            tokio::time::sleep(Duration::from_secs(STABILIZATION_POLL_INTERVAL_SECS)).await;
756        }
757
758        Err(DevnetError::Stabilization(
759            "Devnet failed to stabilize within timeout".to_string(),
760        ))
761    }
762
763    fn start_health_monitor(&mut self) {
764        let nodes: Vec<Arc<P2PNode>> = self
765            .nodes
766            .iter()
767            .filter_map(|n| n.p2p_node.clone())
768            .collect();
769        let shutdown = self.shutdown.clone();
770
771        self.health_monitor = Some(tokio::spawn(async move {
772            let check_interval = Duration::from_secs(HEALTH_CHECK_INTERVAL_SECS);
773
774            loop {
775                tokio::select! {
776                    () = shutdown.cancelled() => break,
777                    () = tokio::time::sleep(check_interval) => {
778                        for (i, node) in nodes.iter().enumerate() {
779                            if !node.is_running() {
780                                warn!("Node {} appears unhealthy", i);
781                            }
782                        }
783                    }
784                }
785            }
786        }));
787    }
788}
789
790impl Drop for Devnet {
791    fn drop(&mut self) {
792        self.shutdown.cancel();
793        if let Some(handle) = self.health_monitor.take() {
794            handle.abort();
795        }
796    }
797}