Skip to main content

ant_node/
devnet.rs

1//! Local devnet infrastructure for spawning and managing multiple nodes.
2//!
3//! This module provides a local, in-process devnet suitable for running
4//! multi-node networks on a single machine.
5
6use crate::ant_protocol::CHUNK_PROTOCOL_ID;
7use crate::config::{default_root_dir, NODES_SUBDIR, NODE_IDENTITY_FILENAME};
8use crate::logging::{debug, info, warn};
9use crate::payment::{
10    EvmVerifierConfig, PaymentVerifier, PaymentVerifierConfig, QuoteGenerator,
11    QuotingMetricsTracker,
12};
13use crate::storage::{AntProtocol, LmdbStorage, LmdbStorageConfig};
14use evmlib::Network as EvmNetwork;
15use evmlib::RewardsAddress;
16use rand::Rng;
17use saorsa_core::identity::NodeIdentity;
18use saorsa_core::{
19    IPDiversityConfig, MultiAddr, NodeConfig as CoreNodeConfig, P2PEvent, P2PNode, PeerId,
20};
21use serde::{Deserialize, Serialize};
22use std::net::{Ipv4Addr, SocketAddr};
23use std::path::PathBuf;
24use std::sync::Arc;
25use std::time::Duration;
26use tokio::sync::RwLock;
27use tokio::task::JoinHandle;
28use tokio::time::Instant;
29use tokio_util::sync::CancellationToken;
30
// =============================================================================
// Devnet Constants
// =============================================================================

/// Lowest port a randomly allocated devnet may start at.
pub const DEVNET_PORT_RANGE_MIN: u16 = 20_000;

/// Upper limit of the devnet port range; `Devnet::new` rejects configurations
/// whose port span would go past it.
pub const DEVNET_PORT_RANGE_MAX: u16 = 60_000;

// =============================================================================
// Default Timing Constants
// =============================================================================

/// Pause inserted after each node is spawned, in milliseconds.
const DEFAULT_SPAWN_DELAY_MS: u64 = 200;

/// How long the full-size preset waits for the network to stabilize, in seconds.
const DEFAULT_STABILIZATION_TIMEOUT_SECS: u64 = 120;

/// How long to wait for one node to report readiness, in seconds.
const DEFAULT_NODE_STARTUP_TIMEOUT_SECS: u64 = 30;

/// Stabilization timeout used by the minimal preset, in seconds.
const MINIMAL_STABILIZATION_TIMEOUT_SECS: u64 = 30;

/// Stabilization timeout used by the small preset, in seconds.
const SMALL_STABILIZATION_TIMEOUT_SECS: u64 = 60;

/// Sleep between readiness checks of an individual node, in milliseconds.
const NODE_READY_POLL_INTERVAL_MS: u64 = 100;

/// Sleep between connectivity checks while stabilizing, in seconds.
const STABILIZATION_POLL_INTERVAL_SECS: u64 = 1;

/// Cap on the per-node peer count demanded during stabilization; the actual
/// requirement is the smaller of this value and the bootstrap count.
const STABILIZATION_MIN_CONNECTIONS_CAP: usize = 3;

/// Period of the background health check, in seconds.
const HEALTH_CHECK_INTERVAL_SECS: u64 = 5;

// =============================================================================
// AntProtocol Devnet Configuration
// =============================================================================

/// Capacity of each devnet node's payment-verification cache.
const DEVNET_PAYMENT_CACHE_CAPACITY: usize = 1000;

/// Rewards address shared by all devnet nodes: twenty `0x01` bytes.
const DEVNET_REWARDS_ADDRESS: [u8; 20] = [0x01; 20];

/// Initial record count handed to the quoting metrics tracker on devnets.
const DEVNET_INITIAL_RECORDS: usize = 1000;

// =============================================================================
// Default Node Counts
// =============================================================================

/// Node count of the full-size (default) devnet.
pub const DEFAULT_NODE_COUNT: usize = 25;

/// Bootstrap node count of the full-size (default) devnet.
pub const DEFAULT_BOOTSTRAP_COUNT: usize = 3;

/// Node count of the minimal preset.
pub const MINIMAL_NODE_COUNT: usize = 5;

/// Bootstrap node count used by the minimal and small presets.
pub const MINIMAL_BOOTSTRAP_COUNT: usize = 2;

/// Node count of the small preset.
pub const SMALL_NODE_COUNT: usize = 10;
103
104/// Error type for devnet operations.
105#[derive(Debug, thiserror::Error)]
106pub enum DevnetError {
107    /// Configuration error
108    #[error("Configuration error: {0}")]
109    Config(String),
110
111    /// Node startup error
112    #[error("Node startup error: {0}")]
113    Startup(String),
114
115    /// Network stabilization error
116    #[error("Network stabilization error: {0}")]
117    Stabilization(String),
118
119    /// IO error
120    #[error("IO error: {0}")]
121    Io(#[from] std::io::Error),
122
123    /// Core error
124    #[error("Core error: {0}")]
125    Core(String),
126}
127
128/// Result type for devnet operations.
129pub type Result<T> = std::result::Result<T, DevnetError>;
130
131/// Configuration for the devnet.
132///
133/// Each configuration is automatically isolated with unique ports and
134/// data directories to prevent collisions when running multiple devnets.
135#[derive(Debug, Clone)]
136pub struct DevnetConfig {
137    /// Number of nodes to spawn (default: 25).
138    pub node_count: usize,
139
140    /// Base port for node allocation (0 = auto).
141    pub base_port: u16,
142
143    /// Number of bootstrap nodes (first N nodes, default: 3).
144    pub bootstrap_count: usize,
145
146    /// Root directory for devnet data.
147    pub data_dir: PathBuf,
148
149    /// Delay between node spawns (default: 200ms).
150    pub spawn_delay: Duration,
151
152    /// Timeout for network stabilization (default: 120s).
153    pub stabilization_timeout: Duration,
154
155    /// Timeout for single node startup (default: 30s).
156    pub node_startup_timeout: Duration,
157
158    /// Enable verbose logging for devnet nodes.
159    pub enable_node_logging: bool,
160
161    /// Whether to remove the data directory on shutdown.
162    pub cleanup_data_dir: bool,
163
164    /// Optional EVM network for payment verification.
165    /// When `Some`, nodes will use this network (e.g. Anvil testnet) for
166    /// on-chain verification. Defaults to Arbitrum One when `None`.
167    pub evm_network: Option<EvmNetwork>,
168}
169
170impl Default for DevnetConfig {
171    fn default() -> Self {
172        let mut rng = rand::thread_rng();
173
174        #[allow(clippy::cast_possible_truncation)] // DEFAULT_NODE_COUNT is 25, always fits u16
175        let max_base_port = DEVNET_PORT_RANGE_MAX.saturating_sub(DEFAULT_NODE_COUNT as u16);
176        let base_port = rng.gen_range(DEVNET_PORT_RANGE_MIN..max_base_port);
177
178        Self {
179            node_count: DEFAULT_NODE_COUNT,
180            base_port,
181            bootstrap_count: DEFAULT_BOOTSTRAP_COUNT,
182            data_dir: default_root_dir(),
183            spawn_delay: Duration::from_millis(DEFAULT_SPAWN_DELAY_MS),
184            stabilization_timeout: Duration::from_secs(DEFAULT_STABILIZATION_TIMEOUT_SECS),
185            node_startup_timeout: Duration::from_secs(DEFAULT_NODE_STARTUP_TIMEOUT_SECS),
186            enable_node_logging: false,
187            cleanup_data_dir: true,
188            evm_network: None,
189        }
190    }
191}
192
193impl DevnetConfig {
194    /// Minimal devnet preset (5 nodes).
195    #[must_use]
196    pub fn minimal() -> Self {
197        Self {
198            node_count: MINIMAL_NODE_COUNT,
199            bootstrap_count: MINIMAL_BOOTSTRAP_COUNT,
200            stabilization_timeout: Duration::from_secs(MINIMAL_STABILIZATION_TIMEOUT_SECS),
201            ..Self::default()
202        }
203    }
204
205    /// Small devnet preset (10 nodes).
206    #[must_use]
207    pub fn small() -> Self {
208        Self {
209            node_count: SMALL_NODE_COUNT,
210            bootstrap_count: MINIMAL_BOOTSTRAP_COUNT,
211            stabilization_timeout: Duration::from_secs(SMALL_STABILIZATION_TIMEOUT_SECS),
212            ..Self::default()
213        }
214    }
215}
216
217/// Devnet manifest for client discovery.
218#[derive(Debug, Clone, Serialize, Deserialize)]
219pub struct DevnetManifest {
220    /// Base port for nodes.
221    pub base_port: u16,
222    /// Node count.
223    pub node_count: usize,
224    /// Bootstrap addresses.
225    pub bootstrap: Vec<MultiAddr>,
226    /// Data directory.
227    pub data_dir: PathBuf,
228    /// Creation time in RFC3339.
229    pub created_at: String,
230    /// EVM configuration (present when EVM payment enforcement is enabled).
231    #[serde(default, skip_serializing_if = "Option::is_none")]
232    pub evm: Option<DevnetEvmInfo>,
233}
234
235/// EVM configuration info included in the devnet manifest.
236#[derive(Debug, Clone, Serialize, Deserialize)]
237pub struct DevnetEvmInfo {
238    /// Anvil RPC URL.
239    pub rpc_url: String,
240    /// Funded wallet private key (hex-encoded with 0x prefix).
241    pub wallet_private_key: String,
242    /// Payment token contract address.
243    pub payment_token_address: String,
244    /// Unified payment vault contract address (handles both single-node and merkle payments).
245    pub payment_vault_address: String,
246}
247
/// Lifecycle phase of a devnet, advanced by `Devnet::start` and `Devnet::shutdown`.
///
/// Derives `PartialEq`/`Eq` so callers can compare phases directly
/// (`state == NetworkState::Ready`) instead of pattern matching.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum NetworkState {
    /// Created; `start()` has not been called yet.
    Uninitialized,
    /// Bootstrap nodes are being spawned.
    BootstrappingPhase,
    /// Regular (non-bootstrap) nodes are being spawned.
    NodeSpawningPhase,
    /// All nodes spawned; waiting for minimum connectivity.
    Stabilizing,
    /// Stabilization succeeded and the health monitor has been started.
    Ready,
    /// `shutdown()` is in progress.
    ShuttingDown,
    /// `shutdown()` has completed.
    Stopped,
}
266
/// Lifecycle state of a single devnet node.
///
/// Derives `PartialEq`/`Eq` so callers can compare states directly instead of
/// pattern matching.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum NodeState {
    /// Created but not yet started.
    Pending,
    /// Start-up is in progress.
    Starting,
    /// The P2P node has started.
    Running,
    /// Connected to peers.
    Connected,
    /// Shut down.
    Stopped,
    /// Failed to start; carries a description of the failure.
    Failed(String),
}
283
284/// A single devnet node instance.
285#[allow(dead_code)]
286pub struct DevnetNode {
287    index: usize,
288    label: String,
289    peer_id: PeerId,
290    port: u16,
291    data_dir: PathBuf,
292    p2p_node: Option<Arc<P2PNode>>,
293    ant_protocol: Option<Arc<AntProtocol>>,
294    is_bootstrap: bool,
295    state: Arc<RwLock<NodeState>>,
296    bootstrap_addrs: Vec<MultiAddr>,
297    protocol_task: Option<JoinHandle<()>>,
298}
299
300impl DevnetNode {
301    /// Get the node's peer count.
302    pub async fn peer_count(&self) -> usize {
303        if let Some(ref node) = self.p2p_node {
304            node.peer_count().await
305        } else {
306            0
307        }
308    }
309}
310
311/// A local devnet composed of multiple nodes.
312pub struct Devnet {
313    config: DevnetConfig,
314    nodes: Vec<DevnetNode>,
315    shutdown: CancellationToken,
316    state: Arc<RwLock<NetworkState>>,
317    health_monitor: Option<JoinHandle<()>>,
318}
319
320impl Devnet {
321    /// Create a new devnet with the given configuration.
322    ///
323    /// # Errors
324    ///
325    /// Returns `DevnetError::Config` if the configuration is invalid (e.g. bootstrap
326    /// count exceeds node count or port range overflow).
327    /// Returns `DevnetError::Io` if the data directory cannot be created.
328    pub async fn new(mut config: DevnetConfig) -> Result<Self> {
329        if config.bootstrap_count >= config.node_count {
330            return Err(DevnetError::Config(
331                "Bootstrap count must be less than node count".to_string(),
332            ));
333        }
334
335        if config.bootstrap_count == 0 {
336            return Err(DevnetError::Config(
337                "At least one bootstrap node is required".to_string(),
338            ));
339        }
340
341        let node_count = config.node_count;
342        let node_count_u16 = u16::try_from(node_count).map_err(|_| {
343            DevnetError::Config(format!("Node count {node_count} exceeds u16::MAX"))
344        })?;
345
346        if config.base_port == 0 {
347            let mut rng = rand::thread_rng();
348            let max_base_port = DEVNET_PORT_RANGE_MAX.saturating_sub(node_count_u16);
349            config.base_port = rng.gen_range(DEVNET_PORT_RANGE_MIN..max_base_port);
350        }
351
352        let base_port = config.base_port;
353        let max_port = base_port
354            .checked_add(node_count_u16)
355            .ok_or_else(|| {
356                DevnetError::Config(format!(
357                    "Port range overflow: base_port {base_port} + node_count {node_count} exceeds u16::MAX"
358                ))
359            })?;
360        if max_port > DEVNET_PORT_RANGE_MAX {
361            return Err(DevnetError::Config(format!(
362                "Port range overflow: max port {max_port} exceeds DEVNET_PORT_RANGE_MAX {DEVNET_PORT_RANGE_MAX}"
363            )));
364        }
365
366        tokio::fs::create_dir_all(&config.data_dir).await?;
367
368        Ok(Self {
369            config,
370            nodes: Vec::new(),
371            shutdown: CancellationToken::new(),
372            state: Arc::new(RwLock::new(NetworkState::Uninitialized)),
373            health_monitor: None,
374        })
375    }
376
377    /// Start the devnet.
378    ///
379    /// # Errors
380    ///
381    /// Returns `DevnetError::Startup` if any node fails to start, or
382    /// `DevnetError::Stabilization` if the network does not stabilize within the timeout.
383    pub async fn start(&mut self) -> Result<()> {
384        info!(
385            "Starting devnet with {} nodes ({} bootstrap)",
386            self.config.node_count, self.config.bootstrap_count
387        );
388
389        *self.state.write().await = NetworkState::BootstrappingPhase;
390        self.start_bootstrap_nodes().await?;
391
392        *self.state.write().await = NetworkState::NodeSpawningPhase;
393        self.start_regular_nodes().await?;
394
395        *self.state.write().await = NetworkState::Stabilizing;
396        self.wait_for_stabilization().await?;
397
398        self.start_health_monitor();
399
400        *self.state.write().await = NetworkState::Ready;
401        info!("Devnet is ready");
402        Ok(())
403    }
404
405    /// Shutdown the devnet.
406    ///
407    /// # Errors
408    ///
409    /// Returns `DevnetError::Io` if the data directory cleanup fails.
410    pub async fn shutdown(&mut self) -> Result<()> {
411        info!("Shutting down devnet");
412        *self.state.write().await = NetworkState::ShuttingDown;
413
414        self.shutdown.cancel();
415
416        if let Some(handle) = self.health_monitor.take() {
417            handle.abort();
418        }
419
420        let mut shutdown_futures = Vec::with_capacity(self.nodes.len());
421        for node in self.nodes.iter_mut().rev() {
422            debug!("Stopping node {}", node.index);
423            if let Some(handle) = node.protocol_task.take() {
424                handle.abort();
425            }
426
427            let node_index = node.index;
428            let node_state = Arc::clone(&node.state);
429            let p2p_node = node.p2p_node.take();
430
431            shutdown_futures.push(async move {
432                if let Some(p2p) = p2p_node {
433                    if let Err(e) = p2p.shutdown().await {
434                        warn!("Error shutting down node {node_index}: {e}");
435                    }
436                }
437                *node_state.write().await = NodeState::Stopped;
438            });
439        }
440        futures::future::join_all(shutdown_futures).await;
441
442        if self.config.cleanup_data_dir {
443            if let Err(e) = tokio::fs::remove_dir_all(&self.config.data_dir).await {
444                warn!("Failed to cleanup devnet data directory: {e}");
445            }
446        }
447
448        *self.state.write().await = NetworkState::Stopped;
449        info!("Devnet shutdown complete");
450        Ok(())
451    }
452
453    /// Get devnet configuration.
454    #[must_use]
455    pub fn config(&self) -> &DevnetConfig {
456        &self.config
457    }
458
459    /// Get bootstrap addresses.
460    #[must_use]
461    pub fn bootstrap_addrs(&self) -> Vec<MultiAddr> {
462        self.nodes
463            .iter()
464            .take(self.config.bootstrap_count)
465            .map(|n| MultiAddr::quic(SocketAddr::from((Ipv4Addr::LOCALHOST, n.port))))
466            .collect()
467    }
468
469    async fn start_bootstrap_nodes(&mut self) -> Result<()> {
470        info!("Starting {} bootstrap nodes", self.config.bootstrap_count);
471
472        for i in 0..self.config.bootstrap_count {
473            let node = self.create_node(i, true, vec![]).await?;
474            self.start_node(node).await?;
475            tokio::time::sleep(self.config.spawn_delay).await;
476        }
477
478        self.wait_for_nodes_ready(0..self.config.bootstrap_count)
479            .await?;
480
481        info!("All bootstrap nodes are ready");
482        Ok(())
483    }
484
485    async fn start_regular_nodes(&mut self) -> Result<()> {
486        let regular_count = self.config.node_count - self.config.bootstrap_count;
487        info!("Starting {} regular nodes", regular_count);
488
489        let bootstrap_addrs: Vec<MultiAddr> = self
490            .nodes
491            .get(0..self.config.bootstrap_count)
492            .ok_or_else(|| {
493                DevnetError::Config(format!(
494                    "Bootstrap count {} exceeds nodes length {}",
495                    self.config.bootstrap_count,
496                    self.nodes.len()
497                ))
498            })?
499            .iter()
500            .map(|n| MultiAddr::quic(SocketAddr::from((Ipv4Addr::LOCALHOST, n.port))))
501            .collect();
502
503        for i in self.config.bootstrap_count..self.config.node_count {
504            let node = self.create_node(i, false, bootstrap_addrs.clone()).await?;
505            self.start_node(node).await?;
506            tokio::time::sleep(self.config.spawn_delay).await;
507        }
508
509        info!("All regular nodes started");
510        Ok(())
511    }
512
513    async fn create_node(
514        &self,
515        index: usize,
516        is_bootstrap: bool,
517        bootstrap_addrs: Vec<MultiAddr>,
518    ) -> Result<DevnetNode> {
519        let index_u16 = u16::try_from(index)
520            .map_err(|_| DevnetError::Config(format!("Node index {index} exceeds u16::MAX")))?;
521        let port = self.config.base_port + index_u16;
522
523        // Generate identity first so we can use peer_id as the directory name
524        let identity = NodeIdentity::generate()
525            .map_err(|e| DevnetError::Core(format!("Failed to generate node identity: {e}")))?;
526        let peer_id = *identity.peer_id();
527        let label = format!("devnet_node_{index}");
528        let data_dir = self
529            .config
530            .data_dir
531            .join(NODES_SUBDIR)
532            .join(peer_id.to_hex());
533
534        tokio::fs::create_dir_all(&data_dir).await?;
535
536        identity
537            .save_to_file(&data_dir.join(NODE_IDENTITY_FILENAME))
538            .await
539            .map_err(|e| DevnetError::Core(format!("Failed to save node identity: {e}")))?;
540
541        let ant_protocol = Self::create_ant_protocol(&data_dir, &identity, &self.config).await?;
542
543        Ok(DevnetNode {
544            index,
545            label,
546            peer_id,
547            port,
548            data_dir,
549            p2p_node: None,
550            ant_protocol: Some(Arc::new(ant_protocol)),
551            is_bootstrap,
552            state: Arc::new(RwLock::new(NodeState::Pending)),
553            bootstrap_addrs,
554            protocol_task: None,
555        })
556    }
557
558    async fn create_ant_protocol(
559        data_dir: &std::path::Path,
560        identity: &NodeIdentity,
561        config: &DevnetConfig,
562    ) -> Result<AntProtocol> {
563        let storage_config = LmdbStorageConfig {
564            root_dir: data_dir.to_path_buf(),
565            verify_on_read: true,
566            ..LmdbStorageConfig::default()
567        };
568        let storage = LmdbStorage::new(storage_config)
569            .await
570            .map_err(|e| DevnetError::Core(format!("Failed to create LMDB storage: {e}")))?;
571
572        let evm_config = EvmVerifierConfig {
573            network: config
574                .evm_network
575                .clone()
576                .unwrap_or(EvmNetwork::ArbitrumOne),
577        };
578
579        let rewards_address = RewardsAddress::new(DEVNET_REWARDS_ADDRESS);
580        let payment_config = PaymentVerifierConfig {
581            evm: evm_config,
582            cache_capacity: DEVNET_PAYMENT_CACHE_CAPACITY,
583            local_rewards_address: rewards_address,
584        };
585        let payment_verifier = PaymentVerifier::new(payment_config);
586        let metrics_tracker = QuotingMetricsTracker::new(DEVNET_INITIAL_RECORDS);
587        let mut quote_generator = QuoteGenerator::new(rewards_address, metrics_tracker);
588
589        // Wire ML-DSA-65 signing from the devnet node's identity
590        crate::payment::wire_ml_dsa_signer(&mut quote_generator, identity)
591            .map_err(|e| DevnetError::Startup(format!("Failed to wire ML-DSA-65 signer: {e}")))?;
592
593        Ok(AntProtocol::new(
594            Arc::new(storage),
595            Arc::new(payment_verifier),
596            Arc::new(quote_generator),
597        ))
598    }
599
600    async fn start_node(&mut self, mut node: DevnetNode) -> Result<()> {
601        debug!("Starting node {} on port {}", node.index, node.port);
602        *node.state.write().await = NodeState::Starting;
603
604        let mut core_config = CoreNodeConfig::builder()
605            .port(node.port)
606            .local(true)
607            .max_message_size(crate::ant_protocol::MAX_WIRE_MESSAGE_SIZE)
608            .build()
609            .map_err(|e| DevnetError::Core(format!("Failed to create core config: {e}")))?;
610
611        // Load the node identity for app-level message signing.
612        let identity = NodeIdentity::load_from_file(
613            &node.data_dir.join(crate::config::NODE_IDENTITY_FILENAME),
614        )
615        .await
616        .map_err(|e| DevnetError::Core(format!("Failed to load node identity: {e}")))?;
617
618        core_config.node_identity = Some(Arc::new(identity));
619        core_config
620            .bootstrap_peers
621            .clone_from(&node.bootstrap_addrs);
622        core_config.diversity_config = Some(IPDiversityConfig::permissive());
623
624        let index = node.index;
625        let p2p_node = P2PNode::new(core_config)
626            .await
627            .map_err(|e| DevnetError::Startup(format!("Failed to create node {index}: {e}")))?;
628
629        p2p_node
630            .start()
631            .await
632            .map_err(|e| DevnetError::Startup(format!("Failed to start node {index}: {e}")))?;
633
634        node.p2p_node = Some(Arc::new(p2p_node));
635        *node.state.write().await = NodeState::Running;
636
637        if let (Some(ref p2p), Some(ref protocol)) = (&node.p2p_node, &node.ant_protocol) {
638            let mut events = p2p.subscribe_events();
639            let p2p_clone = Arc::clone(p2p);
640            let protocol_clone = Arc::clone(protocol);
641            let node_index = node.index;
642            node.protocol_task = Some(tokio::spawn(async move {
643                while let Ok(event) = events.recv().await {
644                    if let P2PEvent::Message {
645                        topic,
646                        source: Some(source),
647                        data,
648                    } = event
649                    {
650                        if topic == CHUNK_PROTOCOL_ID {
651                            debug!(
652                                "Node {node_index} received chunk protocol message from {source}"
653                            );
654                            let protocol = Arc::clone(&protocol_clone);
655                            let p2p = Arc::clone(&p2p_clone);
656                            tokio::spawn(async move {
657                                match protocol.try_handle_request(&data).await {
658                                    Ok(Some(response)) => {
659                                        if let Err(e) = p2p
660                                            .send_message(
661                                                &source,
662                                                CHUNK_PROTOCOL_ID,
663                                                response.to_vec(),
664                                                &[],
665                                            )
666                                            .await
667                                        {
668                                            warn!(
669                                                "Node {node_index} failed to send response to {source}: {e}"
670                                            );
671                                        }
672                                    }
673                                    Ok(None) => {}
674                                    Err(e) => {
675                                        warn!("Node {node_index} protocol handler error: {e}");
676                                    }
677                                }
678                            });
679                        }
680                    }
681                }
682            }));
683        }
684
685        debug!("Node {} started successfully", node.index);
686        self.nodes.push(node);
687        Ok(())
688    }
689
690    async fn wait_for_nodes_ready(&self, range: std::ops::Range<usize>) -> Result<()> {
691        let deadline = Instant::now() + self.config.node_startup_timeout;
692
693        for i in range {
694            while Instant::now() < deadline {
695                let node = self.nodes.get(i).ok_or_else(|| {
696                    DevnetError::Config(format!(
697                        "Node index {i} out of bounds (len: {})",
698                        self.nodes.len()
699                    ))
700                })?;
701                let state = node.state.read().await.clone();
702                match state {
703                    NodeState::Running | NodeState::Connected => break,
704                    NodeState::Failed(ref e) => {
705                        return Err(DevnetError::Startup(format!("Node {i} failed: {e}")));
706                    }
707                    _ => {
708                        tokio::time::sleep(Duration::from_millis(NODE_READY_POLL_INTERVAL_MS))
709                            .await;
710                    }
711                }
712            }
713        }
714        Ok(())
715    }
716
717    async fn wait_for_stabilization(&self) -> Result<()> {
718        let deadline = Instant::now() + self.config.stabilization_timeout;
719        let min_connections = self
720            .config
721            .bootstrap_count
722            .min(STABILIZATION_MIN_CONNECTIONS_CAP);
723
724        info!(
725            "Waiting for devnet stabilization (min {} connections per node)",
726            min_connections
727        );
728
729        while Instant::now() < deadline {
730            let mut all_connected = true;
731            let mut total_connections = 0;
732
733            for node in &self.nodes {
734                let peer_count = node.peer_count().await;
735                total_connections += peer_count;
736
737                if peer_count < min_connections {
738                    all_connected = false;
739                }
740            }
741
742            if all_connected {
743                info!("Devnet stabilized: {} total connections", total_connections);
744                return Ok(());
745            }
746
747            debug!(
748                "Waiting for stabilization: {} total connections",
749                total_connections
750            );
751            tokio::time::sleep(Duration::from_secs(STABILIZATION_POLL_INTERVAL_SECS)).await;
752        }
753
754        Err(DevnetError::Stabilization(
755            "Devnet failed to stabilize within timeout".to_string(),
756        ))
757    }
758
759    fn start_health_monitor(&mut self) {
760        let nodes: Vec<Arc<P2PNode>> = self
761            .nodes
762            .iter()
763            .filter_map(|n| n.p2p_node.clone())
764            .collect();
765        let shutdown = self.shutdown.clone();
766
767        self.health_monitor = Some(tokio::spawn(async move {
768            let check_interval = Duration::from_secs(HEALTH_CHECK_INTERVAL_SECS);
769
770            loop {
771                tokio::select! {
772                    () = shutdown.cancelled() => break,
773                    () = tokio::time::sleep(check_interval) => {
774                        for (i, node) in nodes.iter().enumerate() {
775                            if !node.is_running() {
776                                warn!("Node {} appears unhealthy", i);
777                            }
778                        }
779                    }
780                }
781            }
782        }));
783    }
784}
785
786impl Drop for Devnet {
787    fn drop(&mut self) {
788        self.shutdown.cancel();
789        if let Some(handle) = self.health_monitor.take() {
790            handle.abort();
791        }
792    }
793}