Skip to main content

ant_node/
devnet.rs

1//! Local devnet infrastructure for spawning and managing multiple nodes.
2//!
3//! This module provides a local, in-process devnet suitable for running
4//! multi-node networks on a single machine.
5
6use crate::ant_protocol::CHUNK_PROTOCOL_ID;
7use crate::config::{default_root_dir, NODES_SUBDIR, NODE_IDENTITY_FILENAME};
8use crate::logging::{debug, info, warn};
9use crate::payment::{
10    EvmVerifierConfig, PaymentVerifier, PaymentVerifierConfig, QuoteGenerator,
11    QuotingMetricsTracker,
12};
13use crate::replication::config::ReplicationConfig;
14use crate::storage::{AntProtocol, LmdbStorage, LmdbStorageConfig};
15use evmlib::Network as EvmNetwork;
16use evmlib::RewardsAddress;
17use rand::Rng;
18use saorsa_core::identity::NodeIdentity;
19use saorsa_core::{
20    IPDiversityConfig, MultiAddr, NodeConfig as CoreNodeConfig, P2PEvent, P2PNode, PeerId,
21};
22use std::net::{Ipv4Addr, SocketAddr};
23use std::path::PathBuf;
24use std::sync::Arc;
25use std::time::Duration;
26use tokio::sync::RwLock;
27use tokio::task::JoinHandle;
28use tokio::time::Instant;
29use tokio_util::sync::CancellationToken;
30
// =============================================================================
// Devnet Constants
// =============================================================================

/// Minimum port for random devnet allocation (inclusive lower bound of a
/// randomly chosen base port).
pub const DEVNET_PORT_RANGE_MIN: u16 = 20_000;

/// Maximum port for devnet allocation.
///
/// A randomly chosen base port leaves room for every node below this value, and
/// `Devnet::new` rejects configurations where `base_port + node_count` exceeds it.
pub const DEVNET_PORT_RANGE_MAX: u16 = 60_000;

// =============================================================================
// Default Timing Constants
// =============================================================================

/// Default delay between spawning consecutive nodes (milliseconds).
const DEFAULT_SPAWN_DELAY_MS: u64 = 200;

/// Default timeout for network stabilization (seconds).
const DEFAULT_STABILIZATION_TIMEOUT_SECS: u64 = 120;

/// Default deadline used while waiting for started nodes to report ready (seconds).
const DEFAULT_NODE_STARTUP_TIMEOUT_SECS: u64 = 30;

/// Stabilization timeout used by the minimal preset (seconds).
const MINIMAL_STABILIZATION_TIMEOUT_SECS: u64 = 30;

/// Stabilization timeout used by the small preset (seconds).
const SMALL_STABILIZATION_TIMEOUT_SECS: u64 = 60;

/// Polling interval when waiting for individual nodes to become ready (milliseconds).
const NODE_READY_POLL_INTERVAL_MS: u64 = 100;

/// Polling interval when waiting for network stabilization (seconds).
const STABILIZATION_POLL_INTERVAL_SECS: u64 = 1;

/// Cap on the per-node peer count required for stabilization: every node must
/// reach `min(bootstrap_count, STABILIZATION_MIN_CONNECTIONS_CAP)` peers.
const STABILIZATION_MIN_CONNECTIONS_CAP: usize = 3;

/// Interval between health monitor checks (seconds).
const HEALTH_CHECK_INTERVAL_SECS: u64 = 5;

// =============================================================================
// AntProtocol Devnet Configuration
// =============================================================================

/// Payment verifier cache capacity for devnet nodes.
const DEVNET_PAYMENT_CACHE_CAPACITY: usize = 1000;

/// Devnet rewards address (20 bytes, all 0x01), shared by every devnet node for
/// both quote generation and payment verification.
const DEVNET_REWARDS_ADDRESS: [u8; 20] = [0x01; 20];

/// Initial record count passed to `QuotingMetricsTracker::new` (devnet value).
const DEVNET_INITIAL_RECORDS: usize = 1000;

// =============================================================================
// Default Node Counts
// =============================================================================

/// Default number of nodes in a full devnet.
pub const DEFAULT_NODE_COUNT: usize = 25;

/// Default number of bootstrap nodes.
pub const DEFAULT_BOOTSTRAP_COUNT: usize = 3;

/// Number of nodes in a minimal devnet.
pub const MINIMAL_NODE_COUNT: usize = 5;

/// Number of bootstrap nodes in the minimal (and small) presets.
pub const MINIMAL_BOOTSTRAP_COUNT: usize = 2;

/// Number of nodes in a small devnet.
pub const SMALL_NODE_COUNT: usize = 10;
103
/// Error type for devnet operations.
#[derive(Debug, thiserror::Error)]
pub enum DevnetError {
    /// Invalid devnet configuration (node/bootstrap counts, port range, node index).
    #[error("Configuration error: {0}")]
    Config(String),

    /// A node could not be created, started, wired up, or become ready.
    #[error("Node startup error: {0}")]
    Startup(String),

    /// The network did not reach the required per-node connectivity in time.
    #[error("Network stabilization error: {0}")]
    Stabilization(String),

    /// Filesystem I/O error (e.g. creating the data directory).
    #[error("IO error: {0}")]
    Io(#[from] std::io::Error),

    /// Error reported by a lower-level component (identity, storage, P2P config).
    #[error("Core error: {0}")]
    Core(String),
}

/// Result type for devnet operations.
pub type Result<T> = std::result::Result<T, DevnetError>;
130
/// Configuration for the devnet.
///
/// Configurations built from `Default` (including the `minimal` and `small`
/// presets) pick a random base port in the devnet port range, which makes port
/// collisions between concurrently running devnets unlikely.
#[derive(Debug, Clone)]
pub struct DevnetConfig {
    /// Number of nodes to spawn (default: 25). Must be greater than `bootstrap_count`.
    pub node_count: usize,

    /// First node port; node `i` listens on `base_port + i`.
    /// `0` lets `Devnet::new` choose a random base port.
    pub base_port: u16,

    /// Number of bootstrap nodes (first N nodes, default: 3). Must be at least 1.
    pub bootstrap_count: usize,

    /// Root directory for devnet data; per-node data lives under
    /// `<data_dir>/<NODES_SUBDIR>/<peer id hex>`.
    ///
    /// NOTE: when `cleanup_data_dir` is set, `Devnet::shutdown` removes this whole
    /// directory recursively — never point it at a directory holding other data.
    pub data_dir: PathBuf,

    /// Delay between node spawns (default: 200ms).
    pub spawn_delay: Duration,

    /// Timeout for network stabilization (default: 120s).
    pub stabilization_timeout: Duration,

    /// Deadline for started nodes to report ready (default: 30s).
    pub node_startup_timeout: Duration,

    /// Enable verbose logging for devnet nodes.
    // NOTE(review): not read anywhere in this file — confirm it is consumed elsewhere.
    pub enable_node_logging: bool,

    /// Whether to remove the data directory on shutdown.
    pub cleanup_data_dir: bool,

    /// Optional EVM network for payment verification.
    /// When `Some`, nodes will use this network (e.g. Anvil testnet) for
    /// on-chain verification. Defaults to Arbitrum One when `None`.
    pub evm_network: Option<EvmNetwork>,
}
169
170impl Default for DevnetConfig {
171    fn default() -> Self {
172        let mut rng = rand::thread_rng();
173
174        #[allow(clippy::cast_possible_truncation)] // DEFAULT_NODE_COUNT is 25, always fits u16
175        let max_base_port = DEVNET_PORT_RANGE_MAX.saturating_sub(DEFAULT_NODE_COUNT as u16);
176        let base_port = rng.gen_range(DEVNET_PORT_RANGE_MIN..max_base_port);
177
178        Self {
179            node_count: DEFAULT_NODE_COUNT,
180            base_port,
181            bootstrap_count: DEFAULT_BOOTSTRAP_COUNT,
182            data_dir: default_root_dir(),
183            spawn_delay: Duration::from_millis(DEFAULT_SPAWN_DELAY_MS),
184            stabilization_timeout: Duration::from_secs(DEFAULT_STABILIZATION_TIMEOUT_SECS),
185            node_startup_timeout: Duration::from_secs(DEFAULT_NODE_STARTUP_TIMEOUT_SECS),
186            enable_node_logging: false,
187            cleanup_data_dir: true,
188            evm_network: None,
189        }
190    }
191}
192
193impl DevnetConfig {
194    /// Minimal devnet preset (5 nodes).
195    #[must_use]
196    pub fn minimal() -> Self {
197        Self {
198            node_count: MINIMAL_NODE_COUNT,
199            bootstrap_count: MINIMAL_BOOTSTRAP_COUNT,
200            stabilization_timeout: Duration::from_secs(MINIMAL_STABILIZATION_TIMEOUT_SECS),
201            ..Self::default()
202        }
203    }
204
205    /// Small devnet preset (10 nodes).
206    #[must_use]
207    pub fn small() -> Self {
208        Self {
209            node_count: SMALL_NODE_COUNT,
210            bootstrap_count: MINIMAL_BOOTSTRAP_COUNT,
211            stabilization_timeout: Duration::from_secs(SMALL_STABILIZATION_TIMEOUT_SECS),
212            ..Self::default()
213        }
214    }
215}
216
217// The manifest types are shared with ant-client (CLI reads them, devnet
218// writes them), so they live in ant-protocol. Re-exported here for
219// backwards compatibility.
220pub use ant_protocol::devnet_manifest::{DevnetEvmInfo, DevnetManifest};
221
/// Network state for the devnet startup/shutdown lifecycle.
///
/// Transitions driven by `Devnet`: `Uninitialized` → `BootstrappingPhase` →
/// `NodeSpawningPhase` → `Stabilizing` → `Ready`, and on shutdown
/// `ShuttingDown` → `Stopped`. Derives `PartialEq`/`Eq` so callers can compare
/// states directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum NetworkState {
    /// Not started.
    Uninitialized,
    /// Bootstrapping nodes are starting.
    BootstrappingPhase,
    /// Regular nodes are starting.
    NodeSpawningPhase,
    /// Waiting for stabilization.
    Stabilizing,
    /// Network is ready.
    Ready,
    /// Shutting down.
    ShuttingDown,
    /// Stopped.
    Stopped,
}
240
/// Lifecycle state of a single devnet node.
///
/// Derives `PartialEq`/`Eq` so callers can compare states directly (including
/// the failure message carried by `Failed`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum NodeState {
    /// Not started yet.
    Pending,
    /// Starting up.
    Starting,
    /// Running.
    Running,
    /// Connected to peers.
    Connected,
    /// Stopped.
    Stopped,
    /// Failed to start, with a description of the failure.
    Failed(String),
}
257
/// A single devnet node instance.
// `label`, `peer_id` and `is_bootstrap` are recorded for bookkeeping but not read
// anywhere in this file, hence the lint allowance.
#[allow(dead_code)]
pub struct DevnetNode {
    /// Position in the devnet; also the offset of `port` from the base port.
    index: usize,
    /// Human-readable name, `devnet_node_{index}`.
    label: String,
    /// Peer ID of the identity generated for this node.
    peer_id: PeerId,
    /// Local port the node listens on (`base_port + index`).
    port: u16,
    /// Per-node directory `<data_dir>/<NODES_SUBDIR>/<peer id hex>`; holds the
    /// identity file and the LMDB storage.
    data_dir: PathBuf,
    /// The P2P node; `None` until started and again after shutdown.
    p2p_node: Option<Arc<P2PNode>>,
    /// Chunk-protocol handler backed by this node's storage.
    ant_protocol: Option<Arc<AntProtocol>>,
    /// Whether this node is one of the first `bootstrap_count` nodes.
    is_bootstrap: bool,
    /// Lifecycle state, shared with the shutdown futures.
    state: Arc<RwLock<NodeState>>,
    /// Bootstrap peers passed to the P2P config (empty for bootstrap nodes).
    bootstrap_addrs: Vec<MultiAddr>,
    /// Task routing chunk-protocol messages to `ant_protocol`; aborted on shutdown.
    protocol_task: Option<JoinHandle<()>>,
}
273
274impl DevnetNode {
275    /// Get the node's peer count.
276    pub async fn peer_count(&self) -> usize {
277        if let Some(ref node) = self.p2p_node {
278            node.peer_count().await
279        } else {
280            0
281        }
282    }
283}
284
/// A local devnet composed of multiple nodes.
///
/// Built with `Devnet::new`, brought up with `Devnet::start` and torn down with
/// `Devnet::shutdown`.
pub struct Devnet {
    /// Validated configuration; `base_port` is already resolved by `new`.
    config: DevnetConfig,
    /// Started nodes in index order; bootstrap nodes come first.
    nodes: Vec<DevnetNode>,
    /// Cancelled on shutdown/drop to stop background tasks (e.g. the health monitor).
    shutdown: CancellationToken,
    /// Current lifecycle phase of the whole network.
    state: Arc<RwLock<NetworkState>>,
    /// Periodic liveness checker, spawned once the network has stabilized.
    health_monitor: Option<JoinHandle<()>>,
}
293
294impl Devnet {
295    /// Create a new devnet with the given configuration.
296    ///
297    /// # Errors
298    ///
299    /// Returns `DevnetError::Config` if the configuration is invalid (e.g. bootstrap
300    /// count exceeds node count or port range overflow).
301    /// Returns `DevnetError::Io` if the data directory cannot be created.
302    pub async fn new(mut config: DevnetConfig) -> Result<Self> {
303        if config.bootstrap_count >= config.node_count {
304            return Err(DevnetError::Config(
305                "Bootstrap count must be less than node count".to_string(),
306            ));
307        }
308
309        if config.bootstrap_count == 0 {
310            return Err(DevnetError::Config(
311                "At least one bootstrap node is required".to_string(),
312            ));
313        }
314
315        let node_count = config.node_count;
316        let node_count_u16 = u16::try_from(node_count).map_err(|_| {
317            DevnetError::Config(format!("Node count {node_count} exceeds u16::MAX"))
318        })?;
319
320        if config.base_port == 0 {
321            let mut rng = rand::thread_rng();
322            let max_base_port = DEVNET_PORT_RANGE_MAX.saturating_sub(node_count_u16);
323            config.base_port = rng.gen_range(DEVNET_PORT_RANGE_MIN..max_base_port);
324        }
325
326        let base_port = config.base_port;
327        let max_port = base_port
328            .checked_add(node_count_u16)
329            .ok_or_else(|| {
330                DevnetError::Config(format!(
331                    "Port range overflow: base_port {base_port} + node_count {node_count} exceeds u16::MAX"
332                ))
333            })?;
334        if max_port > DEVNET_PORT_RANGE_MAX {
335            return Err(DevnetError::Config(format!(
336                "Port range overflow: max port {max_port} exceeds DEVNET_PORT_RANGE_MAX {DEVNET_PORT_RANGE_MAX}"
337            )));
338        }
339
340        tokio::fs::create_dir_all(&config.data_dir).await?;
341
342        Ok(Self {
343            config,
344            nodes: Vec::new(),
345            shutdown: CancellationToken::new(),
346            state: Arc::new(RwLock::new(NetworkState::Uninitialized)),
347            health_monitor: None,
348        })
349    }
350
351    /// Start the devnet.
352    ///
353    /// # Errors
354    ///
355    /// Returns `DevnetError::Startup` if any node fails to start, or
356    /// `DevnetError::Stabilization` if the network does not stabilize within the timeout.
357    pub async fn start(&mut self) -> Result<()> {
358        info!(
359            "Starting devnet with {} nodes ({} bootstrap)",
360            self.config.node_count, self.config.bootstrap_count
361        );
362
363        *self.state.write().await = NetworkState::BootstrappingPhase;
364        self.start_bootstrap_nodes().await?;
365
366        *self.state.write().await = NetworkState::NodeSpawningPhase;
367        self.start_regular_nodes().await?;
368
369        *self.state.write().await = NetworkState::Stabilizing;
370        self.wait_for_stabilization().await?;
371
372        self.start_health_monitor();
373
374        *self.state.write().await = NetworkState::Ready;
375        info!("Devnet is ready");
376        Ok(())
377    }
378
379    /// Shutdown the devnet.
380    ///
381    /// # Errors
382    ///
383    /// Returns `DevnetError::Io` if the data directory cleanup fails.
384    pub async fn shutdown(&mut self) -> Result<()> {
385        info!("Shutting down devnet");
386        *self.state.write().await = NetworkState::ShuttingDown;
387
388        self.shutdown.cancel();
389
390        if let Some(handle) = self.health_monitor.take() {
391            handle.abort();
392        }
393
394        let mut shutdown_futures = Vec::with_capacity(self.nodes.len());
395        for node in self.nodes.iter_mut().rev() {
396            debug!("Stopping node {}", node.index);
397            if let Some(handle) = node.protocol_task.take() {
398                handle.abort();
399            }
400
401            let node_index = node.index;
402            let node_state = Arc::clone(&node.state);
403            let p2p_node = node.p2p_node.take();
404
405            shutdown_futures.push(async move {
406                if let Some(p2p) = p2p_node {
407                    if let Err(e) = p2p.shutdown().await {
408                        warn!("Error shutting down node {node_index}: {e}");
409                    }
410                }
411                *node_state.write().await = NodeState::Stopped;
412            });
413        }
414        futures::future::join_all(shutdown_futures).await;
415
416        if self.config.cleanup_data_dir {
417            if let Err(e) = tokio::fs::remove_dir_all(&self.config.data_dir).await {
418                warn!("Failed to cleanup devnet data directory: {e}");
419            }
420        }
421
422        *self.state.write().await = NetworkState::Stopped;
423        info!("Devnet shutdown complete");
424        Ok(())
425    }
426
427    /// Get devnet configuration.
428    #[must_use]
429    pub fn config(&self) -> &DevnetConfig {
430        &self.config
431    }
432
433    /// Get bootstrap addresses.
434    #[must_use]
435    pub fn bootstrap_addrs(&self) -> Vec<MultiAddr> {
436        self.nodes
437            .iter()
438            .take(self.config.bootstrap_count)
439            .map(|n| MultiAddr::quic(SocketAddr::from((Ipv4Addr::LOCALHOST, n.port))))
440            .collect()
441    }
442
443    async fn start_bootstrap_nodes(&mut self) -> Result<()> {
444        info!("Starting {} bootstrap nodes", self.config.bootstrap_count);
445
446        for i in 0..self.config.bootstrap_count {
447            let node = self.create_node(i, true, vec![]).await?;
448            self.start_node(node).await?;
449            tokio::time::sleep(self.config.spawn_delay).await;
450        }
451
452        self.wait_for_nodes_ready(0..self.config.bootstrap_count)
453            .await?;
454
455        info!("All bootstrap nodes are ready");
456        Ok(())
457    }
458
459    async fn start_regular_nodes(&mut self) -> Result<()> {
460        let regular_count = self.config.node_count - self.config.bootstrap_count;
461        info!("Starting {} regular nodes", regular_count);
462
463        let bootstrap_addrs: Vec<MultiAddr> = self
464            .nodes
465            .get(0..self.config.bootstrap_count)
466            .ok_or_else(|| {
467                DevnetError::Config(format!(
468                    "Bootstrap count {} exceeds nodes length {}",
469                    self.config.bootstrap_count,
470                    self.nodes.len()
471                ))
472            })?
473            .iter()
474            .map(|n| MultiAddr::quic(SocketAddr::from((Ipv4Addr::LOCALHOST, n.port))))
475            .collect();
476
477        for i in self.config.bootstrap_count..self.config.node_count {
478            let node = self.create_node(i, false, bootstrap_addrs.clone()).await?;
479            self.start_node(node).await?;
480            tokio::time::sleep(self.config.spawn_delay).await;
481        }
482
483        info!("All regular nodes started");
484        Ok(())
485    }
486
487    async fn create_node(
488        &self,
489        index: usize,
490        is_bootstrap: bool,
491        bootstrap_addrs: Vec<MultiAddr>,
492    ) -> Result<DevnetNode> {
493        let index_u16 = u16::try_from(index)
494            .map_err(|_| DevnetError::Config(format!("Node index {index} exceeds u16::MAX")))?;
495        let port = self.config.base_port + index_u16;
496
497        // Generate identity first so we can use peer_id as the directory name
498        let identity = NodeIdentity::generate()
499            .map_err(|e| DevnetError::Core(format!("Failed to generate node identity: {e}")))?;
500        let peer_id = *identity.peer_id();
501        let label = format!("devnet_node_{index}");
502        let data_dir = self
503            .config
504            .data_dir
505            .join(NODES_SUBDIR)
506            .join(peer_id.to_hex());
507
508        tokio::fs::create_dir_all(&data_dir).await?;
509
510        identity
511            .save_to_file(&data_dir.join(NODE_IDENTITY_FILENAME))
512            .await
513            .map_err(|e| DevnetError::Core(format!("Failed to save node identity: {e}")))?;
514
515        let ant_protocol = Self::create_ant_protocol(&data_dir, &identity, &self.config).await?;
516
517        Ok(DevnetNode {
518            index,
519            label,
520            peer_id,
521            port,
522            data_dir,
523            p2p_node: None,
524            ant_protocol: Some(Arc::new(ant_protocol)),
525            is_bootstrap,
526            state: Arc::new(RwLock::new(NodeState::Pending)),
527            bootstrap_addrs,
528            protocol_task: None,
529        })
530    }
531
532    async fn create_ant_protocol(
533        data_dir: &std::path::Path,
534        identity: &NodeIdentity,
535        config: &DevnetConfig,
536    ) -> Result<AntProtocol> {
537        let storage_config = LmdbStorageConfig {
538            root_dir: data_dir.to_path_buf(),
539            verify_on_read: true,
540            ..LmdbStorageConfig::default()
541        };
542        let storage = LmdbStorage::new(storage_config)
543            .await
544            .map_err(|e| DevnetError::Core(format!("Failed to create LMDB storage: {e}")))?;
545
546        let evm_config = EvmVerifierConfig {
547            network: config
548                .evm_network
549                .clone()
550                .unwrap_or(EvmNetwork::ArbitrumOne),
551        };
552
553        let rewards_address = RewardsAddress::new(DEVNET_REWARDS_ADDRESS);
554        let replication_config = ReplicationConfig::default();
555        let payment_config = PaymentVerifierConfig {
556            evm: evm_config,
557            cache_capacity: DEVNET_PAYMENT_CACHE_CAPACITY,
558            close_group_size: replication_config.close_group_size,
559            local_rewards_address: rewards_address,
560        };
561        let payment_verifier = PaymentVerifier::new(payment_config);
562        let metrics_tracker = QuotingMetricsTracker::new(DEVNET_INITIAL_RECORDS);
563        let mut quote_generator = QuoteGenerator::new(rewards_address, metrics_tracker);
564
565        // Wire ML-DSA-65 signing from the devnet node's identity
566        crate::payment::wire_ml_dsa_signer(&mut quote_generator, identity)
567            .map_err(|e| DevnetError::Startup(format!("Failed to wire ML-DSA-65 signer: {e}")))?;
568
569        let storage = Arc::new(storage);
570        let payment_verifier = Arc::new(payment_verifier);
571
572        Ok(AntProtocol::new(
573            storage,
574            payment_verifier,
575            Arc::new(quote_generator),
576        ))
577    }
578
579    async fn start_node(&mut self, mut node: DevnetNode) -> Result<()> {
580        debug!("Starting node {} on port {}", node.index, node.port);
581        *node.state.write().await = NodeState::Starting;
582
583        let mut core_config = CoreNodeConfig::builder()
584            .port(node.port)
585            .local(true)
586            .max_message_size(crate::ant_protocol::MAX_WIRE_MESSAGE_SIZE)
587            .build()
588            .map_err(|e| DevnetError::Core(format!("Failed to create core config: {e}")))?;
589
590        // Load the node identity for app-level message signing.
591        let identity = NodeIdentity::load_from_file(
592            &node.data_dir.join(crate::config::NODE_IDENTITY_FILENAME),
593        )
594        .await
595        .map_err(|e| DevnetError::Core(format!("Failed to load node identity: {e}")))?;
596
597        core_config.node_identity = Some(Arc::new(identity));
598        core_config
599            .bootstrap_peers
600            .clone_from(&node.bootstrap_addrs);
601        core_config.diversity_config = Some(IPDiversityConfig::permissive());
602
603        let index = node.index;
604        let p2p_node = P2PNode::new(core_config)
605            .await
606            .map_err(|e| DevnetError::Startup(format!("Failed to create node {index}: {e}")))?;
607
608        p2p_node
609            .start()
610            .await
611            .map_err(|e| DevnetError::Startup(format!("Failed to start node {index}: {e}")))?;
612
613        node.p2p_node = Some(Arc::new(p2p_node));
614        *node.state.write().await = NodeState::Running;
615
616        if let (Some(ref p2p), Some(ref protocol)) = (&node.p2p_node, &node.ant_protocol) {
617            // Wire P2P into AntProtocol for payment-proof closeness checks.
618            protocol.attach_p2p_node(Arc::clone(p2p));
619
620            let mut events = p2p.subscribe_events();
621            let p2p_clone = Arc::clone(p2p);
622            let protocol_clone = Arc::clone(protocol);
623            let node_index = node.index;
624            node.protocol_task = Some(tokio::spawn(async move {
625                while let Ok(event) = events.recv().await {
626                    if let P2PEvent::Message {
627                        topic,
628                        source: Some(source),
629                        data,
630                        ..
631                    } = event
632                    {
633                        if topic == CHUNK_PROTOCOL_ID {
634                            debug!(
635                                "Node {node_index} received chunk protocol message from {source}"
636                            );
637                            let protocol = Arc::clone(&protocol_clone);
638                            let p2p = Arc::clone(&p2p_clone);
639                            tokio::spawn(async move {
640                                match protocol.try_handle_request(&data).await {
641                                    Ok(Some(response)) => {
642                                        if let Err(e) = p2p
643                                            .send_message(
644                                                &source,
645                                                CHUNK_PROTOCOL_ID,
646                                                response.to_vec(),
647                                                &[],
648                                            )
649                                            .await
650                                        {
651                                            warn!(
652                                                "Node {node_index} failed to send response to {source}: {e}"
653                                            );
654                                        }
655                                    }
656                                    Ok(None) => {}
657                                    Err(e) => {
658                                        warn!("Node {node_index} protocol handler error: {e}");
659                                    }
660                                }
661                            });
662                        }
663                    }
664                }
665            }));
666        }
667
668        debug!("Node {} started successfully", node.index);
669        self.nodes.push(node);
670        Ok(())
671    }
672
673    async fn wait_for_nodes_ready(&self, range: std::ops::Range<usize>) -> Result<()> {
674        let deadline = Instant::now() + self.config.node_startup_timeout;
675
676        for i in range {
677            while Instant::now() < deadline {
678                let node = self.nodes.get(i).ok_or_else(|| {
679                    DevnetError::Config(format!(
680                        "Node index {i} out of bounds (len: {})",
681                        self.nodes.len()
682                    ))
683                })?;
684                let state = node.state.read().await.clone();
685                match state {
686                    NodeState::Running | NodeState::Connected => break,
687                    NodeState::Failed(ref e) => {
688                        return Err(DevnetError::Startup(format!("Node {i} failed: {e}")));
689                    }
690                    _ => {
691                        tokio::time::sleep(Duration::from_millis(NODE_READY_POLL_INTERVAL_MS))
692                            .await;
693                    }
694                }
695            }
696        }
697        Ok(())
698    }
699
700    async fn wait_for_stabilization(&self) -> Result<()> {
701        let deadline = Instant::now() + self.config.stabilization_timeout;
702        let min_connections = self
703            .config
704            .bootstrap_count
705            .min(STABILIZATION_MIN_CONNECTIONS_CAP);
706
707        info!(
708            "Waiting for devnet stabilization (min {} connections per node)",
709            min_connections
710        );
711
712        while Instant::now() < deadline {
713            let mut all_connected = true;
714            let mut total_connections = 0;
715
716            for node in &self.nodes {
717                let peer_count = node.peer_count().await;
718                total_connections += peer_count;
719
720                if peer_count < min_connections {
721                    all_connected = false;
722                }
723            }
724
725            if all_connected {
726                info!("Devnet stabilized: {} total connections", total_connections);
727                return Ok(());
728            }
729
730            debug!(
731                "Waiting for stabilization: {} total connections",
732                total_connections
733            );
734            tokio::time::sleep(Duration::from_secs(STABILIZATION_POLL_INTERVAL_SECS)).await;
735        }
736
737        Err(DevnetError::Stabilization(
738            "Devnet failed to stabilize within timeout".to_string(),
739        ))
740    }
741
742    fn start_health_monitor(&mut self) {
743        let nodes: Vec<Arc<P2PNode>> = self
744            .nodes
745            .iter()
746            .filter_map(|n| n.p2p_node.clone())
747            .collect();
748        let shutdown = self.shutdown.clone();
749
750        self.health_monitor = Some(tokio::spawn(async move {
751            let check_interval = Duration::from_secs(HEALTH_CHECK_INTERVAL_SECS);
752
753            loop {
754                tokio::select! {
755                    () = shutdown.cancelled() => break,
756                    () = tokio::time::sleep(check_interval) => {
757                        for (i, node) in nodes.iter().enumerate() {
758                            if !node.is_running() {
759                                warn!("Node {} appears unhealthy", i);
760                            }
761                        }
762                    }
763                }
764            }
765        }));
766    }
767}
768
769impl Drop for Devnet {
770    fn drop(&mut self) {
771        self.shutdown.cancel();
772        if let Some(handle) = self.health_monitor.take() {
773            handle.abort();
774        }
775    }
776}