Skip to main content

ant_node/
devnet.rs

1//! Local devnet infrastructure for spawning and managing multiple nodes.
2//!
3//! This module provides a local, in-process devnet suitable for running
4//! multi-node networks on a single machine.
5
6use crate::ant_protocol::CHUNK_PROTOCOL_ID;
7use crate::config::{default_root_dir, NODES_SUBDIR, NODE_IDENTITY_FILENAME};
8use crate::payment::{
9    EvmVerifierConfig, PaymentVerifier, PaymentVerifierConfig, QuoteGenerator,
10    QuotingMetricsTracker,
11};
12use crate::storage::{AntProtocol, LmdbStorage, LmdbStorageConfig};
13use evmlib::Network as EvmNetwork;
14use evmlib::RewardsAddress;
15use rand::Rng;
16use saorsa_core::identity::NodeIdentity;
17use saorsa_core::{
18    IPDiversityConfig, MultiAddr, NodeConfig as CoreNodeConfig, P2PEvent, P2PNode, PeerId,
19};
20use serde::{Deserialize, Serialize};
21use std::net::{Ipv4Addr, SocketAddr};
22use std::path::PathBuf;
23use std::sync::Arc;
24use std::time::Duration;
25use tokio::sync::RwLock;
26use tokio::task::JoinHandle;
27use tokio::time::Instant;
28use tokio_util::sync::CancellationToken;
29use tracing::{debug, info, warn};
30
// =============================================================================
// Devnet Constants
// =============================================================================

/// Lowest base port that automatic (random) port allocation may pick.
pub const DEVNET_PORT_RANGE_MIN: u16 = 20_000;

/// Upper limit of the devnet port range: `base_port + node_count` must not
/// exceed this value.
pub const DEVNET_PORT_RANGE_MAX: u16 = 60_000;
40
// =============================================================================
// Default Timing Constants
// =============================================================================

/// Pause inserted after each node spawn by default, in milliseconds.
const DEFAULT_SPAWN_DELAY_MS: u64 = 200;

/// Default time budget for the whole network to stabilize, in seconds.
const DEFAULT_STABILIZATION_TIMEOUT_SECS: u64 = 120;

/// Default time budget for nodes to report ready after being started, in seconds.
const DEFAULT_NODE_STARTUP_TIMEOUT_SECS: u64 = 30;

/// Stabilization time budget used by the minimal preset, in seconds.
const MINIMAL_STABILIZATION_TIMEOUT_SECS: u64 = 30;

/// Stabilization time budget used by the small preset, in seconds.
const SMALL_STABILIZATION_TIMEOUT_SECS: u64 = 60;

/// How often a node's state is re-checked while waiting for it to become
/// ready, in milliseconds.
const NODE_READY_POLL_INTERVAL_MS: u64 = 100;

/// How often peer counts are re-sampled while waiting for stabilization, in seconds.
const STABILIZATION_POLL_INTERVAL_SECS: u64 = 1;

/// Cap on the per-node connection threshold used during stabilization: the
/// threshold is the bootstrap count, but never more than this value.
const STABILIZATION_MIN_CONNECTIONS_CAP: usize = 3;

/// Interval between health monitor sweeps, in seconds.
const HEALTH_CHECK_INTERVAL_SECS: u64 = 5;
71
// =============================================================================
// AntProtocol Devnet Configuration
// =============================================================================

/// Cache capacity handed to each devnet node's payment verifier.
const DEVNET_PAYMENT_CACHE_CAPACITY: usize = 1000;

/// Fixed rewards address shared by all devnet nodes (20 bytes, each `0x01`).
const DEVNET_REWARDS_ADDRESS: [u8; 20] = [0x01; 20];

/// "Max records" argument passed to the quoting metrics tracker on devnet nodes.
const DEVNET_MAX_RECORDS: usize = 100_000;

/// "Initial records" argument passed to the quoting metrics tracker on devnet nodes.
const DEVNET_INITIAL_RECORDS: usize = 1000;
87
// =============================================================================
// Default Node Counts
// =============================================================================

/// Node count of the default (full) devnet.
pub const DEFAULT_NODE_COUNT: usize = 25;

/// Bootstrap node count of the default devnet.
pub const DEFAULT_BOOTSTRAP_COUNT: usize = 3;

/// Node count of the minimal devnet preset.
pub const MINIMAL_NODE_COUNT: usize = 5;

/// Bootstrap node count used by both the minimal and the small preset.
pub const MINIMAL_BOOTSTRAP_COUNT: usize = 2;

/// Node count of the small devnet preset.
pub const SMALL_NODE_COUNT: usize = 10;
106
107/// Error type for devnet operations.
108#[derive(Debug, thiserror::Error)]
109pub enum DevnetError {
110    /// Configuration error
111    #[error("Configuration error: {0}")]
112    Config(String),
113
114    /// Node startup error
115    #[error("Node startup error: {0}")]
116    Startup(String),
117
118    /// Network stabilization error
119    #[error("Network stabilization error: {0}")]
120    Stabilization(String),
121
122    /// IO error
123    #[error("IO error: {0}")]
124    Io(#[from] std::io::Error),
125
126    /// Core error
127    #[error("Core error: {0}")]
128    Core(String),
129}
130
131/// Result type for devnet operations.
132pub type Result<T> = std::result::Result<T, DevnetError>;
133
134/// Configuration for the devnet.
135///
136/// Each configuration is automatically isolated with unique ports and
137/// data directories to prevent collisions when running multiple devnets.
138#[derive(Debug, Clone)]
139pub struct DevnetConfig {
140    /// Number of nodes to spawn (default: 25).
141    pub node_count: usize,
142
143    /// Base port for node allocation (0 = auto).
144    pub base_port: u16,
145
146    /// Number of bootstrap nodes (first N nodes, default: 3).
147    pub bootstrap_count: usize,
148
149    /// Root directory for devnet data.
150    pub data_dir: PathBuf,
151
152    /// Delay between node spawns (default: 200ms).
153    pub spawn_delay: Duration,
154
155    /// Timeout for network stabilization (default: 120s).
156    pub stabilization_timeout: Duration,
157
158    /// Timeout for single node startup (default: 30s).
159    pub node_startup_timeout: Duration,
160
161    /// Enable verbose logging for devnet nodes.
162    pub enable_node_logging: bool,
163
164    /// Whether to remove the data directory on shutdown.
165    pub cleanup_data_dir: bool,
166
167    /// Optional EVM network for payment verification.
168    /// When `Some`, nodes will use this network (e.g. Anvil testnet) for
169    /// on-chain verification. Defaults to Arbitrum One when `None`.
170    pub evm_network: Option<EvmNetwork>,
171}
172
173impl Default for DevnetConfig {
174    fn default() -> Self {
175        let mut rng = rand::thread_rng();
176
177        #[allow(clippy::cast_possible_truncation)] // DEFAULT_NODE_COUNT is 25, always fits u16
178        let max_base_port = DEVNET_PORT_RANGE_MAX.saturating_sub(DEFAULT_NODE_COUNT as u16);
179        let base_port = rng.gen_range(DEVNET_PORT_RANGE_MIN..max_base_port);
180
181        Self {
182            node_count: DEFAULT_NODE_COUNT,
183            base_port,
184            bootstrap_count: DEFAULT_BOOTSTRAP_COUNT,
185            data_dir: default_root_dir(),
186            spawn_delay: Duration::from_millis(DEFAULT_SPAWN_DELAY_MS),
187            stabilization_timeout: Duration::from_secs(DEFAULT_STABILIZATION_TIMEOUT_SECS),
188            node_startup_timeout: Duration::from_secs(DEFAULT_NODE_STARTUP_TIMEOUT_SECS),
189            enable_node_logging: false,
190            cleanup_data_dir: true,
191            evm_network: None,
192        }
193    }
194}
195
196impl DevnetConfig {
197    /// Minimal devnet preset (5 nodes).
198    #[must_use]
199    pub fn minimal() -> Self {
200        Self {
201            node_count: MINIMAL_NODE_COUNT,
202            bootstrap_count: MINIMAL_BOOTSTRAP_COUNT,
203            stabilization_timeout: Duration::from_secs(MINIMAL_STABILIZATION_TIMEOUT_SECS),
204            ..Self::default()
205        }
206    }
207
208    /// Small devnet preset (10 nodes).
209    #[must_use]
210    pub fn small() -> Self {
211        Self {
212            node_count: SMALL_NODE_COUNT,
213            bootstrap_count: MINIMAL_BOOTSTRAP_COUNT,
214            stabilization_timeout: Duration::from_secs(SMALL_STABILIZATION_TIMEOUT_SECS),
215            ..Self::default()
216        }
217    }
218}
219
220/// Devnet manifest for client discovery.
221#[derive(Debug, Clone, Serialize, Deserialize)]
222pub struct DevnetManifest {
223    /// Base port for nodes.
224    pub base_port: u16,
225    /// Node count.
226    pub node_count: usize,
227    /// Bootstrap addresses.
228    pub bootstrap: Vec<MultiAddr>,
229    /// Data directory.
230    pub data_dir: PathBuf,
231    /// Creation time in RFC3339.
232    pub created_at: String,
233    /// EVM configuration (present when EVM payment enforcement is enabled).
234    #[serde(default, skip_serializing_if = "Option::is_none")]
235    pub evm: Option<DevnetEvmInfo>,
236}
237
238/// EVM configuration info included in the devnet manifest.
239#[derive(Debug, Clone, Serialize, Deserialize)]
240pub struct DevnetEvmInfo {
241    /// Anvil RPC URL.
242    pub rpc_url: String,
243    /// Funded wallet private key (hex-encoded with 0x prefix).
244    pub wallet_private_key: String,
245    /// Payment token contract address.
246    pub payment_token_address: String,
247    /// Data payments contract address.
248    pub data_payments_address: String,
249    /// Merkle payments contract address (for batch payments).
250    #[serde(default, skip_serializing_if = "Option::is_none")]
251    pub merkle_payments_address: Option<String>,
252}
253
/// Lifecycle phase of the devnet as a whole.
///
/// Variants are listed in the order a devnet passes through them.
#[derive(Debug, Clone)]
pub enum NetworkState {
    /// Created but not yet started.
    Uninitialized,
    /// Bootstrap nodes are being started.
    BootstrappingPhase,
    /// Non-bootstrap nodes are being started.
    NodeSpawningPhase,
    /// All nodes are started; waiting for enough peer connections.
    Stabilizing,
    /// The network is up and usable.
    Ready,
    /// Shutdown is in progress.
    ShuttingDown,
    /// Shutdown has completed.
    Stopped,
}
272
/// Lifecycle state of a single devnet node.
#[derive(Debug, Clone)]
pub enum NodeState {
    /// Created, but startup has not begun.
    Pending,
    /// Startup is in progress.
    Starting,
    /// The node has been started successfully.
    Running,
    /// The node is connected to peers.
    Connected,
    /// The node has been shut down.
    Stopped,
    /// Startup failed; the payload is the failure description.
    Failed(String),
}
289
290/// A single devnet node instance.
291#[allow(dead_code)]
292pub struct DevnetNode {
293    index: usize,
294    label: String,
295    peer_id: PeerId,
296    port: u16,
297    data_dir: PathBuf,
298    p2p_node: Option<Arc<P2PNode>>,
299    ant_protocol: Option<Arc<AntProtocol>>,
300    is_bootstrap: bool,
301    state: Arc<RwLock<NodeState>>,
302    bootstrap_addrs: Vec<MultiAddr>,
303    protocol_task: Option<JoinHandle<()>>,
304}
305
306impl DevnetNode {
307    /// Get the node's peer count.
308    pub async fn peer_count(&self) -> usize {
309        if let Some(ref node) = self.p2p_node {
310            node.peer_count().await
311        } else {
312            0
313        }
314    }
315}
316
317/// A local devnet composed of multiple nodes.
318pub struct Devnet {
319    config: DevnetConfig,
320    nodes: Vec<DevnetNode>,
321    shutdown: CancellationToken,
322    state: Arc<RwLock<NetworkState>>,
323    health_monitor: Option<JoinHandle<()>>,
324}
325
326impl Devnet {
327    /// Create a new devnet with the given configuration.
328    ///
329    /// # Errors
330    ///
331    /// Returns `DevnetError::Config` if the configuration is invalid (e.g. bootstrap
332    /// count exceeds node count or port range overflow).
333    /// Returns `DevnetError::Io` if the data directory cannot be created.
334    pub async fn new(mut config: DevnetConfig) -> Result<Self> {
335        if config.bootstrap_count >= config.node_count {
336            return Err(DevnetError::Config(
337                "Bootstrap count must be less than node count".to_string(),
338            ));
339        }
340
341        if config.bootstrap_count == 0 {
342            return Err(DevnetError::Config(
343                "At least one bootstrap node is required".to_string(),
344            ));
345        }
346
347        let node_count = config.node_count;
348        let node_count_u16 = u16::try_from(node_count).map_err(|_| {
349            DevnetError::Config(format!("Node count {node_count} exceeds u16::MAX"))
350        })?;
351
352        if config.base_port == 0 {
353            let mut rng = rand::thread_rng();
354            let max_base_port = DEVNET_PORT_RANGE_MAX.saturating_sub(node_count_u16);
355            config.base_port = rng.gen_range(DEVNET_PORT_RANGE_MIN..max_base_port);
356        }
357
358        let base_port = config.base_port;
359        let max_port = base_port
360            .checked_add(node_count_u16)
361            .ok_or_else(|| {
362                DevnetError::Config(format!(
363                    "Port range overflow: base_port {base_port} + node_count {node_count} exceeds u16::MAX"
364                ))
365            })?;
366        if max_port > DEVNET_PORT_RANGE_MAX {
367            return Err(DevnetError::Config(format!(
368                "Port range overflow: max port {max_port} exceeds DEVNET_PORT_RANGE_MAX {DEVNET_PORT_RANGE_MAX}"
369            )));
370        }
371
372        tokio::fs::create_dir_all(&config.data_dir).await?;
373
374        Ok(Self {
375            config,
376            nodes: Vec::new(),
377            shutdown: CancellationToken::new(),
378            state: Arc::new(RwLock::new(NetworkState::Uninitialized)),
379            health_monitor: None,
380        })
381    }
382
383    /// Start the devnet.
384    ///
385    /// # Errors
386    ///
387    /// Returns `DevnetError::Startup` if any node fails to start, or
388    /// `DevnetError::Stabilization` if the network does not stabilize within the timeout.
389    pub async fn start(&mut self) -> Result<()> {
390        info!(
391            "Starting devnet with {} nodes ({} bootstrap)",
392            self.config.node_count, self.config.bootstrap_count
393        );
394
395        *self.state.write().await = NetworkState::BootstrappingPhase;
396        self.start_bootstrap_nodes().await?;
397
398        *self.state.write().await = NetworkState::NodeSpawningPhase;
399        self.start_regular_nodes().await?;
400
401        *self.state.write().await = NetworkState::Stabilizing;
402        self.wait_for_stabilization().await?;
403
404        self.start_health_monitor();
405
406        *self.state.write().await = NetworkState::Ready;
407        info!("Devnet is ready");
408        Ok(())
409    }
410
411    /// Shutdown the devnet.
412    ///
413    /// # Errors
414    ///
415    /// Returns `DevnetError::Io` if the data directory cleanup fails.
416    pub async fn shutdown(&mut self) -> Result<()> {
417        info!("Shutting down devnet");
418        *self.state.write().await = NetworkState::ShuttingDown;
419
420        self.shutdown.cancel();
421
422        if let Some(handle) = self.health_monitor.take() {
423            handle.abort();
424        }
425
426        let mut shutdown_futures = Vec::with_capacity(self.nodes.len());
427        for node in self.nodes.iter_mut().rev() {
428            debug!("Stopping node {}", node.index);
429            if let Some(handle) = node.protocol_task.take() {
430                handle.abort();
431            }
432
433            let node_index = node.index;
434            let node_state = Arc::clone(&node.state);
435            let p2p_node = node.p2p_node.take();
436
437            shutdown_futures.push(async move {
438                if let Some(p2p) = p2p_node {
439                    if let Err(e) = p2p.shutdown().await {
440                        warn!("Error shutting down node {node_index}: {e}");
441                    }
442                }
443                *node_state.write().await = NodeState::Stopped;
444            });
445        }
446        futures::future::join_all(shutdown_futures).await;
447
448        if self.config.cleanup_data_dir {
449            if let Err(e) = tokio::fs::remove_dir_all(&self.config.data_dir).await {
450                warn!("Failed to cleanup devnet data directory: {e}");
451            }
452        }
453
454        *self.state.write().await = NetworkState::Stopped;
455        info!("Devnet shutdown complete");
456        Ok(())
457    }
458
459    /// Get devnet configuration.
460    #[must_use]
461    pub fn config(&self) -> &DevnetConfig {
462        &self.config
463    }
464
465    /// Get bootstrap addresses.
466    #[must_use]
467    pub fn bootstrap_addrs(&self) -> Vec<MultiAddr> {
468        self.nodes
469            .iter()
470            .take(self.config.bootstrap_count)
471            .map(|n| MultiAddr::quic(SocketAddr::from((Ipv4Addr::LOCALHOST, n.port))))
472            .collect()
473    }
474
475    async fn start_bootstrap_nodes(&mut self) -> Result<()> {
476        info!("Starting {} bootstrap nodes", self.config.bootstrap_count);
477
478        for i in 0..self.config.bootstrap_count {
479            let node = self.create_node(i, true, vec![]).await?;
480            self.start_node(node).await?;
481            tokio::time::sleep(self.config.spawn_delay).await;
482        }
483
484        self.wait_for_nodes_ready(0..self.config.bootstrap_count)
485            .await?;
486
487        info!("All bootstrap nodes are ready");
488        Ok(())
489    }
490
491    async fn start_regular_nodes(&mut self) -> Result<()> {
492        let regular_count = self.config.node_count - self.config.bootstrap_count;
493        info!("Starting {} regular nodes", regular_count);
494
495        let bootstrap_addrs: Vec<MultiAddr> = self
496            .nodes
497            .get(0..self.config.bootstrap_count)
498            .ok_or_else(|| {
499                DevnetError::Config(format!(
500                    "Bootstrap count {} exceeds nodes length {}",
501                    self.config.bootstrap_count,
502                    self.nodes.len()
503                ))
504            })?
505            .iter()
506            .map(|n| MultiAddr::quic(SocketAddr::from((Ipv4Addr::LOCALHOST, n.port))))
507            .collect();
508
509        for i in self.config.bootstrap_count..self.config.node_count {
510            let node = self.create_node(i, false, bootstrap_addrs.clone()).await?;
511            self.start_node(node).await?;
512            tokio::time::sleep(self.config.spawn_delay).await;
513        }
514
515        info!("All regular nodes started");
516        Ok(())
517    }
518
519    async fn create_node(
520        &self,
521        index: usize,
522        is_bootstrap: bool,
523        bootstrap_addrs: Vec<MultiAddr>,
524    ) -> Result<DevnetNode> {
525        let index_u16 = u16::try_from(index)
526            .map_err(|_| DevnetError::Config(format!("Node index {index} exceeds u16::MAX")))?;
527        let port = self.config.base_port + index_u16;
528
529        // Generate identity first so we can use peer_id as the directory name
530        let identity = NodeIdentity::generate()
531            .map_err(|e| DevnetError::Core(format!("Failed to generate node identity: {e}")))?;
532        let peer_id = *identity.peer_id();
533        let label = format!("devnet_node_{index}");
534        let data_dir = self
535            .config
536            .data_dir
537            .join(NODES_SUBDIR)
538            .join(peer_id.to_hex());
539
540        tokio::fs::create_dir_all(&data_dir).await?;
541
542        identity
543            .save_to_file(&data_dir.join(NODE_IDENTITY_FILENAME))
544            .await
545            .map_err(|e| DevnetError::Core(format!("Failed to save node identity: {e}")))?;
546
547        let ant_protocol = Self::create_ant_protocol(&data_dir, &identity, &self.config).await?;
548
549        Ok(DevnetNode {
550            index,
551            label,
552            peer_id,
553            port,
554            data_dir,
555            p2p_node: None,
556            ant_protocol: Some(Arc::new(ant_protocol)),
557            is_bootstrap,
558            state: Arc::new(RwLock::new(NodeState::Pending)),
559            bootstrap_addrs,
560            protocol_task: None,
561        })
562    }
563
564    async fn create_ant_protocol(
565        data_dir: &std::path::Path,
566        identity: &NodeIdentity,
567        config: &DevnetConfig,
568    ) -> Result<AntProtocol> {
569        let storage_config = LmdbStorageConfig {
570            root_dir: data_dir.to_path_buf(),
571            verify_on_read: true,
572            max_chunks: 0,
573            max_map_size: 0,
574        };
575        let storage = LmdbStorage::new(storage_config)
576            .await
577            .map_err(|e| DevnetError::Core(format!("Failed to create LMDB storage: {e}")))?;
578
579        let evm_config = EvmVerifierConfig {
580            network: config
581                .evm_network
582                .clone()
583                .unwrap_or(EvmNetwork::ArbitrumOne),
584        };
585
586        let rewards_address = RewardsAddress::new(DEVNET_REWARDS_ADDRESS);
587        let payment_config = PaymentVerifierConfig {
588            evm: evm_config,
589            cache_capacity: DEVNET_PAYMENT_CACHE_CAPACITY,
590            local_rewards_address: rewards_address,
591        };
592        let payment_verifier = PaymentVerifier::new(payment_config);
593        let metrics_tracker =
594            QuotingMetricsTracker::new(DEVNET_MAX_RECORDS, DEVNET_INITIAL_RECORDS);
595        let mut quote_generator = QuoteGenerator::new(rewards_address, metrics_tracker);
596
597        // Wire ML-DSA-65 signing from the devnet node's identity
598        crate::payment::wire_ml_dsa_signer(&mut quote_generator, identity)
599            .map_err(|e| DevnetError::Startup(format!("Failed to wire ML-DSA-65 signer: {e}")))?;
600
601        Ok(AntProtocol::new(
602            Arc::new(storage),
603            Arc::new(payment_verifier),
604            Arc::new(quote_generator),
605        ))
606    }
607
608    async fn start_node(&mut self, mut node: DevnetNode) -> Result<()> {
609        debug!("Starting node {} on port {}", node.index, node.port);
610        *node.state.write().await = NodeState::Starting;
611
612        let mut core_config = CoreNodeConfig::builder()
613            .port(node.port)
614            .local(true)
615            .max_message_size(crate::ant_protocol::MAX_WIRE_MESSAGE_SIZE)
616            .build()
617            .map_err(|e| DevnetError::Core(format!("Failed to create core config: {e}")))?;
618
619        // Load the node identity for app-level message signing.
620        let identity = NodeIdentity::load_from_file(
621            &node.data_dir.join(crate::config::NODE_IDENTITY_FILENAME),
622        )
623        .await
624        .map_err(|e| DevnetError::Core(format!("Failed to load node identity: {e}")))?;
625
626        core_config.node_identity = Some(Arc::new(identity));
627        core_config
628            .bootstrap_peers
629            .clone_from(&node.bootstrap_addrs);
630        core_config.diversity_config = Some(IPDiversityConfig::permissive());
631
632        let index = node.index;
633        let p2p_node = P2PNode::new(core_config)
634            .await
635            .map_err(|e| DevnetError::Startup(format!("Failed to create node {index}: {e}")))?;
636
637        p2p_node
638            .start()
639            .await
640            .map_err(|e| DevnetError::Startup(format!("Failed to start node {index}: {e}")))?;
641
642        node.p2p_node = Some(Arc::new(p2p_node));
643        *node.state.write().await = NodeState::Running;
644
645        if let (Some(ref p2p), Some(ref protocol)) = (&node.p2p_node, &node.ant_protocol) {
646            let mut events = p2p.subscribe_events();
647            let p2p_clone = Arc::clone(p2p);
648            let protocol_clone = Arc::clone(protocol);
649            let node_index = node.index;
650            node.protocol_task = Some(tokio::spawn(async move {
651                while let Ok(event) = events.recv().await {
652                    if let P2PEvent::Message {
653                        topic,
654                        source: Some(source),
655                        data,
656                    } = event
657                    {
658                        if topic == CHUNK_PROTOCOL_ID {
659                            debug!(
660                                "Node {node_index} received chunk protocol message from {source}"
661                            );
662                            let protocol = Arc::clone(&protocol_clone);
663                            let p2p = Arc::clone(&p2p_clone);
664                            tokio::spawn(async move {
665                                match protocol.try_handle_request(&data).await {
666                                    Ok(Some(response)) => {
667                                        if let Err(e) = p2p
668                                            .send_message(
669                                                &source,
670                                                CHUNK_PROTOCOL_ID,
671                                                response.to_vec(),
672                                                &[],
673                                            )
674                                            .await
675                                        {
676                                            warn!(
677                                                "Node {node_index} failed to send response to {source}: {e}"
678                                            );
679                                        }
680                                    }
681                                    Ok(None) => {}
682                                    Err(e) => {
683                                        warn!("Node {node_index} protocol handler error: {e}");
684                                    }
685                                }
686                            });
687                        }
688                    }
689                }
690            }));
691        }
692
693        debug!("Node {} started successfully", node.index);
694        self.nodes.push(node);
695        Ok(())
696    }
697
698    async fn wait_for_nodes_ready(&self, range: std::ops::Range<usize>) -> Result<()> {
699        let deadline = Instant::now() + self.config.node_startup_timeout;
700
701        for i in range {
702            while Instant::now() < deadline {
703                let node = self.nodes.get(i).ok_or_else(|| {
704                    DevnetError::Config(format!(
705                        "Node index {i} out of bounds (len: {})",
706                        self.nodes.len()
707                    ))
708                })?;
709                let state = node.state.read().await.clone();
710                match state {
711                    NodeState::Running | NodeState::Connected => break,
712                    NodeState::Failed(ref e) => {
713                        return Err(DevnetError::Startup(format!("Node {i} failed: {e}")));
714                    }
715                    _ => {
716                        tokio::time::sleep(Duration::from_millis(NODE_READY_POLL_INTERVAL_MS))
717                            .await;
718                    }
719                }
720            }
721        }
722        Ok(())
723    }
724
725    async fn wait_for_stabilization(&self) -> Result<()> {
726        let deadline = Instant::now() + self.config.stabilization_timeout;
727        let min_connections = self
728            .config
729            .bootstrap_count
730            .min(STABILIZATION_MIN_CONNECTIONS_CAP);
731
732        info!(
733            "Waiting for devnet stabilization (min {} connections per node)",
734            min_connections
735        );
736
737        while Instant::now() < deadline {
738            let mut all_connected = true;
739            let mut total_connections = 0;
740
741            for node in &self.nodes {
742                let peer_count = node.peer_count().await;
743                total_connections += peer_count;
744
745                if peer_count < min_connections {
746                    all_connected = false;
747                }
748            }
749
750            if all_connected {
751                info!("Devnet stabilized: {} total connections", total_connections);
752                return Ok(());
753            }
754
755            debug!(
756                "Waiting for stabilization: {} total connections",
757                total_connections
758            );
759            tokio::time::sleep(Duration::from_secs(STABILIZATION_POLL_INTERVAL_SECS)).await;
760        }
761
762        Err(DevnetError::Stabilization(
763            "Devnet failed to stabilize within timeout".to_string(),
764        ))
765    }
766
767    fn start_health_monitor(&mut self) {
768        let nodes: Vec<Arc<P2PNode>> = self
769            .nodes
770            .iter()
771            .filter_map(|n| n.p2p_node.clone())
772            .collect();
773        let shutdown = self.shutdown.clone();
774
775        self.health_monitor = Some(tokio::spawn(async move {
776            let check_interval = Duration::from_secs(HEALTH_CHECK_INTERVAL_SECS);
777
778            loop {
779                tokio::select! {
780                    () = shutdown.cancelled() => break,
781                    () = tokio::time::sleep(check_interval) => {
782                        for (i, node) in nodes.iter().enumerate() {
783                            if !node.is_running() {
784                                warn!("Node {} appears unhealthy", i);
785                            }
786                        }
787                    }
788                }
789            }
790        }));
791    }
792}
793
794impl Drop for Devnet {
795    fn drop(&mut self) {
796        self.shutdown.cancel();
797        if let Some(handle) = self.health_monitor.take() {
798            handle.abort();
799        }
800    }
801}