Skip to main content

ant_node/
devnet.rs

1//! Local devnet infrastructure for spawning and managing multiple nodes.
2//!
3//! This module provides a local, in-process devnet suitable for running
4//! multi-node networks on a single machine.
5
6use crate::ant_protocol::CHUNK_PROTOCOL_ID;
7use crate::config::{default_root_dir, NODES_SUBDIR, NODE_IDENTITY_FILENAME};
8use crate::logging::{debug, info, warn};
9use crate::payment::{
10    EvmVerifierConfig, PaymentVerifier, PaymentVerifierConfig, QuoteGenerator,
11    QuotingMetricsTracker,
12};
13use crate::storage::{AntProtocol, LmdbStorage, LmdbStorageConfig};
14use evmlib::Network as EvmNetwork;
15use evmlib::RewardsAddress;
16use rand::Rng;
17use saorsa_core::identity::NodeIdentity;
18use saorsa_core::{
19    IPDiversityConfig, MultiAddr, NodeConfig as CoreNodeConfig, P2PEvent, P2PNode, PeerId,
20};
21use std::net::{Ipv4Addr, SocketAddr};
22use std::path::PathBuf;
23use std::sync::Arc;
24use std::time::Duration;
25use tokio::sync::RwLock;
26use tokio::task::JoinHandle;
27use tokio::time::Instant;
28use tokio_util::sync::CancellationToken;
29
30// =============================================================================
31// Devnet Constants
32// =============================================================================
33
34/// Minimum port for random devnet allocation.
35pub const DEVNET_PORT_RANGE_MIN: u16 = 20_000;
36
37/// Maximum port for random devnet allocation.
38pub const DEVNET_PORT_RANGE_MAX: u16 = 60_000;
39
40// =============================================================================
41// Default Timing Constants
42// =============================================================================
43
44/// Default delay between spawning nodes (milliseconds).
45const DEFAULT_SPAWN_DELAY_MS: u64 = 200;
46
47/// Default timeout for network stabilization (seconds).
48const DEFAULT_STABILIZATION_TIMEOUT_SECS: u64 = 120;
49
50/// Default timeout for single node startup (seconds).
51const DEFAULT_NODE_STARTUP_TIMEOUT_SECS: u64 = 30;
52
53/// Stabilization timeout for minimal network (seconds).
54const MINIMAL_STABILIZATION_TIMEOUT_SECS: u64 = 30;
55
56/// Stabilization timeout for small network (seconds).
57const SMALL_STABILIZATION_TIMEOUT_SECS: u64 = 60;
58
59/// Polling interval when waiting for individual nodes to become ready (milliseconds).
60const NODE_READY_POLL_INTERVAL_MS: u64 = 100;
61
62/// Polling interval when waiting for network stabilization (seconds).
63const STABILIZATION_POLL_INTERVAL_SECS: u64 = 1;
64
65/// Maximum minimum connections required per node during stabilization.
66const STABILIZATION_MIN_CONNECTIONS_CAP: usize = 3;
67
68/// Health monitor check interval (seconds).
69const HEALTH_CHECK_INTERVAL_SECS: u64 = 5;
70
71// =============================================================================
72// AntProtocol Devnet Configuration
73// =============================================================================
74
75/// Payment cache capacity for devnet nodes.
76const DEVNET_PAYMENT_CACHE_CAPACITY: usize = 1000;
77
78/// Devnet rewards address (20 bytes, all 0x01).
79const DEVNET_REWARDS_ADDRESS: [u8; 20] = [0x01; 20];
80
81/// Initial records for quoting metrics (devnet value).
82const DEVNET_INITIAL_RECORDS: usize = 1000;
83
84// =============================================================================
85// Default Node Counts
86// =============================================================================
87
88/// Default number of nodes in a full devnet.
89pub const DEFAULT_NODE_COUNT: usize = 25;
90
91/// Default number of bootstrap nodes.
92pub const DEFAULT_BOOTSTRAP_COUNT: usize = 3;
93
94/// Number of nodes in a minimal devnet.
95pub const MINIMAL_NODE_COUNT: usize = 5;
96
97/// Number of bootstrap nodes in a minimal network.
98pub const MINIMAL_BOOTSTRAP_COUNT: usize = 2;
99
100/// Number of nodes in a small devnet.
101pub const SMALL_NODE_COUNT: usize = 10;
102
103/// Error type for devnet operations.
104#[derive(Debug, thiserror::Error)]
105pub enum DevnetError {
106    /// Configuration error
107    #[error("Configuration error: {0}")]
108    Config(String),
109
110    /// Node startup error
111    #[error("Node startup error: {0}")]
112    Startup(String),
113
114    /// Network stabilization error
115    #[error("Network stabilization error: {0}")]
116    Stabilization(String),
117
118    /// IO error
119    #[error("IO error: {0}")]
120    Io(#[from] std::io::Error),
121
122    /// Core error
123    #[error("Core error: {0}")]
124    Core(String),
125}
126
127/// Result type for devnet operations.
128pub type Result<T> = std::result::Result<T, DevnetError>;
129
130/// Configuration for the devnet.
131///
132/// Each configuration is automatically isolated with unique ports and
133/// data directories to prevent collisions when running multiple devnets.
134#[derive(Debug, Clone)]
135pub struct DevnetConfig {
136    /// Number of nodes to spawn (default: 25).
137    pub node_count: usize,
138
139    /// Base port for node allocation (0 = auto).
140    pub base_port: u16,
141
142    /// Number of bootstrap nodes (first N nodes, default: 3).
143    pub bootstrap_count: usize,
144
145    /// Root directory for devnet data.
146    pub data_dir: PathBuf,
147
148    /// Delay between node spawns (default: 200ms).
149    pub spawn_delay: Duration,
150
151    /// Timeout for network stabilization (default: 120s).
152    pub stabilization_timeout: Duration,
153
154    /// Timeout for single node startup (default: 30s).
155    pub node_startup_timeout: Duration,
156
157    /// Enable verbose logging for devnet nodes.
158    pub enable_node_logging: bool,
159
160    /// Whether to remove the data directory on shutdown.
161    pub cleanup_data_dir: bool,
162
163    /// Optional EVM network for payment verification.
164    /// When `Some`, nodes will use this network (e.g. Anvil testnet) for
165    /// on-chain verification. Defaults to Arbitrum One when `None`.
166    pub evm_network: Option<EvmNetwork>,
167}
168
169impl Default for DevnetConfig {
170    fn default() -> Self {
171        let mut rng = rand::thread_rng();
172
173        #[allow(clippy::cast_possible_truncation)] // DEFAULT_NODE_COUNT is 25, always fits u16
174        let max_base_port = DEVNET_PORT_RANGE_MAX.saturating_sub(DEFAULT_NODE_COUNT as u16);
175        let base_port = rng.gen_range(DEVNET_PORT_RANGE_MIN..max_base_port);
176
177        Self {
178            node_count: DEFAULT_NODE_COUNT,
179            base_port,
180            bootstrap_count: DEFAULT_BOOTSTRAP_COUNT,
181            data_dir: default_root_dir(),
182            spawn_delay: Duration::from_millis(DEFAULT_SPAWN_DELAY_MS),
183            stabilization_timeout: Duration::from_secs(DEFAULT_STABILIZATION_TIMEOUT_SECS),
184            node_startup_timeout: Duration::from_secs(DEFAULT_NODE_STARTUP_TIMEOUT_SECS),
185            enable_node_logging: false,
186            cleanup_data_dir: true,
187            evm_network: None,
188        }
189    }
190}
191
192impl DevnetConfig {
193    /// Minimal devnet preset (5 nodes).
194    #[must_use]
195    pub fn minimal() -> Self {
196        Self {
197            node_count: MINIMAL_NODE_COUNT,
198            bootstrap_count: MINIMAL_BOOTSTRAP_COUNT,
199            stabilization_timeout: Duration::from_secs(MINIMAL_STABILIZATION_TIMEOUT_SECS),
200            ..Self::default()
201        }
202    }
203
204    /// Small devnet preset (10 nodes).
205    #[must_use]
206    pub fn small() -> Self {
207        Self {
208            node_count: SMALL_NODE_COUNT,
209            bootstrap_count: MINIMAL_BOOTSTRAP_COUNT,
210            stabilization_timeout: Duration::from_secs(SMALL_STABILIZATION_TIMEOUT_SECS),
211            ..Self::default()
212        }
213    }
214}
215
216// The manifest types are shared with ant-client (CLI reads them, devnet
217// writes them), so they live in ant-protocol. Re-exported here for
218// backwards compatibility.
219pub use ant_protocol::devnet_manifest::{DevnetEvmInfo, DevnetManifest};
220
221/// Network state for devnet startup lifecycle.
222#[derive(Debug, Clone)]
223pub enum NetworkState {
224    /// Not started.
225    Uninitialized,
226    /// Bootstrapping nodes are starting.
227    BootstrappingPhase,
228    /// Regular nodes are starting.
229    NodeSpawningPhase,
230    /// Waiting for stabilization.
231    Stabilizing,
232    /// Network is ready.
233    Ready,
234    /// Shutting down.
235    ShuttingDown,
236    /// Stopped.
237    Stopped,
238}
239
240/// Node state for devnet nodes.
241#[derive(Debug, Clone)]
242pub enum NodeState {
243    /// Not started yet.
244    Pending,
245    /// Starting up.
246    Starting,
247    /// Running.
248    Running,
249    /// Connected to peers.
250    Connected,
251    /// Stopped.
252    Stopped,
253    /// Failed to start.
254    Failed(String),
255}
256
257/// A single devnet node instance.
258#[allow(dead_code)]
259pub struct DevnetNode {
260    index: usize,
261    label: String,
262    peer_id: PeerId,
263    port: u16,
264    data_dir: PathBuf,
265    p2p_node: Option<Arc<P2PNode>>,
266    ant_protocol: Option<Arc<AntProtocol>>,
267    is_bootstrap: bool,
268    state: Arc<RwLock<NodeState>>,
269    bootstrap_addrs: Vec<MultiAddr>,
270    protocol_task: Option<JoinHandle<()>>,
271}
272
273impl DevnetNode {
274    /// Get the node's peer count.
275    pub async fn peer_count(&self) -> usize {
276        if let Some(ref node) = self.p2p_node {
277            node.peer_count().await
278        } else {
279            0
280        }
281    }
282}
283
284/// A local devnet composed of multiple nodes.
285pub struct Devnet {
286    config: DevnetConfig,
287    nodes: Vec<DevnetNode>,
288    shutdown: CancellationToken,
289    state: Arc<RwLock<NetworkState>>,
290    health_monitor: Option<JoinHandle<()>>,
291}
292
293impl Devnet {
294    /// Create a new devnet with the given configuration.
295    ///
296    /// # Errors
297    ///
298    /// Returns `DevnetError::Config` if the configuration is invalid (e.g. bootstrap
299    /// count exceeds node count or port range overflow).
300    /// Returns `DevnetError::Io` if the data directory cannot be created.
301    pub async fn new(mut config: DevnetConfig) -> Result<Self> {
302        if config.bootstrap_count >= config.node_count {
303            return Err(DevnetError::Config(
304                "Bootstrap count must be less than node count".to_string(),
305            ));
306        }
307
308        if config.bootstrap_count == 0 {
309            return Err(DevnetError::Config(
310                "At least one bootstrap node is required".to_string(),
311            ));
312        }
313
314        let node_count = config.node_count;
315        let node_count_u16 = u16::try_from(node_count).map_err(|_| {
316            DevnetError::Config(format!("Node count {node_count} exceeds u16::MAX"))
317        })?;
318
319        if config.base_port == 0 {
320            let mut rng = rand::thread_rng();
321            let max_base_port = DEVNET_PORT_RANGE_MAX.saturating_sub(node_count_u16);
322            config.base_port = rng.gen_range(DEVNET_PORT_RANGE_MIN..max_base_port);
323        }
324
325        let base_port = config.base_port;
326        let max_port = base_port
327            .checked_add(node_count_u16)
328            .ok_or_else(|| {
329                DevnetError::Config(format!(
330                    "Port range overflow: base_port {base_port} + node_count {node_count} exceeds u16::MAX"
331                ))
332            })?;
333        if max_port > DEVNET_PORT_RANGE_MAX {
334            return Err(DevnetError::Config(format!(
335                "Port range overflow: max port {max_port} exceeds DEVNET_PORT_RANGE_MAX {DEVNET_PORT_RANGE_MAX}"
336            )));
337        }
338
339        tokio::fs::create_dir_all(&config.data_dir).await?;
340
341        Ok(Self {
342            config,
343            nodes: Vec::new(),
344            shutdown: CancellationToken::new(),
345            state: Arc::new(RwLock::new(NetworkState::Uninitialized)),
346            health_monitor: None,
347        })
348    }
349
350    /// Start the devnet.
351    ///
352    /// # Errors
353    ///
354    /// Returns `DevnetError::Startup` if any node fails to start, or
355    /// `DevnetError::Stabilization` if the network does not stabilize within the timeout.
356    pub async fn start(&mut self) -> Result<()> {
357        info!(
358            "Starting devnet with {} nodes ({} bootstrap)",
359            self.config.node_count, self.config.bootstrap_count
360        );
361
362        *self.state.write().await = NetworkState::BootstrappingPhase;
363        self.start_bootstrap_nodes().await?;
364
365        *self.state.write().await = NetworkState::NodeSpawningPhase;
366        self.start_regular_nodes().await?;
367
368        *self.state.write().await = NetworkState::Stabilizing;
369        self.wait_for_stabilization().await?;
370
371        self.start_health_monitor();
372
373        *self.state.write().await = NetworkState::Ready;
374        info!("Devnet is ready");
375        Ok(())
376    }
377
378    /// Shutdown the devnet.
379    ///
380    /// # Errors
381    ///
382    /// Returns `DevnetError::Io` if the data directory cleanup fails.
383    pub async fn shutdown(&mut self) -> Result<()> {
384        info!("Shutting down devnet");
385        *self.state.write().await = NetworkState::ShuttingDown;
386
387        self.shutdown.cancel();
388
389        if let Some(handle) = self.health_monitor.take() {
390            handle.abort();
391        }
392
393        let mut shutdown_futures = Vec::with_capacity(self.nodes.len());
394        for node in self.nodes.iter_mut().rev() {
395            debug!("Stopping node {}", node.index);
396            if let Some(handle) = node.protocol_task.take() {
397                handle.abort();
398            }
399
400            let node_index = node.index;
401            let node_state = Arc::clone(&node.state);
402            let p2p_node = node.p2p_node.take();
403
404            shutdown_futures.push(async move {
405                if let Some(p2p) = p2p_node {
406                    if let Err(e) = p2p.shutdown().await {
407                        warn!("Error shutting down node {node_index}: {e}");
408                    }
409                }
410                *node_state.write().await = NodeState::Stopped;
411            });
412        }
413        futures::future::join_all(shutdown_futures).await;
414
415        if self.config.cleanup_data_dir {
416            if let Err(e) = tokio::fs::remove_dir_all(&self.config.data_dir).await {
417                warn!("Failed to cleanup devnet data directory: {e}");
418            }
419        }
420
421        *self.state.write().await = NetworkState::Stopped;
422        info!("Devnet shutdown complete");
423        Ok(())
424    }
425
426    /// Get devnet configuration.
427    #[must_use]
428    pub fn config(&self) -> &DevnetConfig {
429        &self.config
430    }
431
432    /// Get bootstrap addresses.
433    #[must_use]
434    pub fn bootstrap_addrs(&self) -> Vec<MultiAddr> {
435        self.nodes
436            .iter()
437            .take(self.config.bootstrap_count)
438            .map(|n| MultiAddr::quic(SocketAddr::from((Ipv4Addr::LOCALHOST, n.port))))
439            .collect()
440    }
441
442    async fn start_bootstrap_nodes(&mut self) -> Result<()> {
443        info!("Starting {} bootstrap nodes", self.config.bootstrap_count);
444
445        for i in 0..self.config.bootstrap_count {
446            let node = self.create_node(i, true, vec![]).await?;
447            self.start_node(node).await?;
448            tokio::time::sleep(self.config.spawn_delay).await;
449        }
450
451        self.wait_for_nodes_ready(0..self.config.bootstrap_count)
452            .await?;
453
454        info!("All bootstrap nodes are ready");
455        Ok(())
456    }
457
458    async fn start_regular_nodes(&mut self) -> Result<()> {
459        let regular_count = self.config.node_count - self.config.bootstrap_count;
460        info!("Starting {} regular nodes", regular_count);
461
462        let bootstrap_addrs: Vec<MultiAddr> = self
463            .nodes
464            .get(0..self.config.bootstrap_count)
465            .ok_or_else(|| {
466                DevnetError::Config(format!(
467                    "Bootstrap count {} exceeds nodes length {}",
468                    self.config.bootstrap_count,
469                    self.nodes.len()
470                ))
471            })?
472            .iter()
473            .map(|n| MultiAddr::quic(SocketAddr::from((Ipv4Addr::LOCALHOST, n.port))))
474            .collect();
475
476        for i in self.config.bootstrap_count..self.config.node_count {
477            let node = self.create_node(i, false, bootstrap_addrs.clone()).await?;
478            self.start_node(node).await?;
479            tokio::time::sleep(self.config.spawn_delay).await;
480        }
481
482        info!("All regular nodes started");
483        Ok(())
484    }
485
486    async fn create_node(
487        &self,
488        index: usize,
489        is_bootstrap: bool,
490        bootstrap_addrs: Vec<MultiAddr>,
491    ) -> Result<DevnetNode> {
492        let index_u16 = u16::try_from(index)
493            .map_err(|_| DevnetError::Config(format!("Node index {index} exceeds u16::MAX")))?;
494        let port = self.config.base_port + index_u16;
495
496        // Generate identity first so we can use peer_id as the directory name
497        let identity = NodeIdentity::generate()
498            .map_err(|e| DevnetError::Core(format!("Failed to generate node identity: {e}")))?;
499        let peer_id = *identity.peer_id();
500        let label = format!("devnet_node_{index}");
501        let data_dir = self
502            .config
503            .data_dir
504            .join(NODES_SUBDIR)
505            .join(peer_id.to_hex());
506
507        tokio::fs::create_dir_all(&data_dir).await?;
508
509        identity
510            .save_to_file(&data_dir.join(NODE_IDENTITY_FILENAME))
511            .await
512            .map_err(|e| DevnetError::Core(format!("Failed to save node identity: {e}")))?;
513
514        let ant_protocol = Self::create_ant_protocol(&data_dir, &identity, &self.config).await?;
515
516        Ok(DevnetNode {
517            index,
518            label,
519            peer_id,
520            port,
521            data_dir,
522            p2p_node: None,
523            ant_protocol: Some(Arc::new(ant_protocol)),
524            is_bootstrap,
525            state: Arc::new(RwLock::new(NodeState::Pending)),
526            bootstrap_addrs,
527            protocol_task: None,
528        })
529    }
530
531    async fn create_ant_protocol(
532        data_dir: &std::path::Path,
533        identity: &NodeIdentity,
534        config: &DevnetConfig,
535    ) -> Result<AntProtocol> {
536        let storage_config = LmdbStorageConfig {
537            root_dir: data_dir.to_path_buf(),
538            verify_on_read: true,
539            ..LmdbStorageConfig::default()
540        };
541        let storage = LmdbStorage::new(storage_config)
542            .await
543            .map_err(|e| DevnetError::Core(format!("Failed to create LMDB storage: {e}")))?;
544
545        let evm_config = EvmVerifierConfig {
546            network: config
547                .evm_network
548                .clone()
549                .unwrap_or(EvmNetwork::ArbitrumOne),
550        };
551
552        let rewards_address = RewardsAddress::new(DEVNET_REWARDS_ADDRESS);
553        let payment_config = PaymentVerifierConfig {
554            evm: evm_config,
555            cache_capacity: DEVNET_PAYMENT_CACHE_CAPACITY,
556            local_rewards_address: rewards_address,
557        };
558        let payment_verifier = PaymentVerifier::new(payment_config);
559        let metrics_tracker = QuotingMetricsTracker::new(DEVNET_INITIAL_RECORDS);
560        let mut quote_generator = QuoteGenerator::new(rewards_address, metrics_tracker);
561
562        // Wire ML-DSA-65 signing from the devnet node's identity
563        crate::payment::wire_ml_dsa_signer(&mut quote_generator, identity)
564            .map_err(|e| DevnetError::Startup(format!("Failed to wire ML-DSA-65 signer: {e}")))?;
565
566        Ok(AntProtocol::new(
567            Arc::new(storage),
568            Arc::new(payment_verifier),
569            Arc::new(quote_generator),
570        ))
571    }
572
573    async fn start_node(&mut self, mut node: DevnetNode) -> Result<()> {
574        debug!("Starting node {} on port {}", node.index, node.port);
575        *node.state.write().await = NodeState::Starting;
576
577        let mut core_config = CoreNodeConfig::builder()
578            .port(node.port)
579            .local(true)
580            .max_message_size(crate::ant_protocol::MAX_WIRE_MESSAGE_SIZE)
581            .build()
582            .map_err(|e| DevnetError::Core(format!("Failed to create core config: {e}")))?;
583
584        // Load the node identity for app-level message signing.
585        let identity = NodeIdentity::load_from_file(
586            &node.data_dir.join(crate::config::NODE_IDENTITY_FILENAME),
587        )
588        .await
589        .map_err(|e| DevnetError::Core(format!("Failed to load node identity: {e}")))?;
590
591        core_config.node_identity = Some(Arc::new(identity));
592        core_config
593            .bootstrap_peers
594            .clone_from(&node.bootstrap_addrs);
595        core_config.diversity_config = Some(IPDiversityConfig::permissive());
596
597        let index = node.index;
598        let p2p_node = P2PNode::new(core_config)
599            .await
600            .map_err(|e| DevnetError::Startup(format!("Failed to create node {index}: {e}")))?;
601
602        p2p_node
603            .start()
604            .await
605            .map_err(|e| DevnetError::Startup(format!("Failed to start node {index}: {e}")))?;
606
607        node.p2p_node = Some(Arc::new(p2p_node));
608        *node.state.write().await = NodeState::Running;
609
610        if let (Some(ref p2p), Some(ref protocol)) = (&node.p2p_node, &node.ant_protocol) {
611            // Wire the P2PNode into the payment verifier for merkle-closeness checks.
612            protocol
613                .payment_verifier_arc()
614                .attach_p2p_node(Arc::clone(p2p));
615
616            let mut events = p2p.subscribe_events();
617            let p2p_clone = Arc::clone(p2p);
618            let protocol_clone = Arc::clone(protocol);
619            let node_index = node.index;
620            node.protocol_task = Some(tokio::spawn(async move {
621                while let Ok(event) = events.recv().await {
622                    if let P2PEvent::Message {
623                        topic,
624                        source: Some(source),
625                        data,
626                    } = event
627                    {
628                        if topic == CHUNK_PROTOCOL_ID {
629                            debug!(
630                                "Node {node_index} received chunk protocol message from {source}"
631                            );
632                            let protocol = Arc::clone(&protocol_clone);
633                            let p2p = Arc::clone(&p2p_clone);
634                            tokio::spawn(async move {
635                                match protocol.try_handle_request(&data).await {
636                                    Ok(Some(response)) => {
637                                        if let Err(e) = p2p
638                                            .send_message(
639                                                &source,
640                                                CHUNK_PROTOCOL_ID,
641                                                response.to_vec(),
642                                                &[],
643                                            )
644                                            .await
645                                        {
646                                            warn!(
647                                                "Node {node_index} failed to send response to {source}: {e}"
648                                            );
649                                        }
650                                    }
651                                    Ok(None) => {}
652                                    Err(e) => {
653                                        warn!("Node {node_index} protocol handler error: {e}");
654                                    }
655                                }
656                            });
657                        }
658                    }
659                }
660            }));
661        }
662
663        debug!("Node {} started successfully", node.index);
664        self.nodes.push(node);
665        Ok(())
666    }
667
668    async fn wait_for_nodes_ready(&self, range: std::ops::Range<usize>) -> Result<()> {
669        let deadline = Instant::now() + self.config.node_startup_timeout;
670
671        for i in range {
672            while Instant::now() < deadline {
673                let node = self.nodes.get(i).ok_or_else(|| {
674                    DevnetError::Config(format!(
675                        "Node index {i} out of bounds (len: {})",
676                        self.nodes.len()
677                    ))
678                })?;
679                let state = node.state.read().await.clone();
680                match state {
681                    NodeState::Running | NodeState::Connected => break,
682                    NodeState::Failed(ref e) => {
683                        return Err(DevnetError::Startup(format!("Node {i} failed: {e}")));
684                    }
685                    _ => {
686                        tokio::time::sleep(Duration::from_millis(NODE_READY_POLL_INTERVAL_MS))
687                            .await;
688                    }
689                }
690            }
691        }
692        Ok(())
693    }
694
695    async fn wait_for_stabilization(&self) -> Result<()> {
696        let deadline = Instant::now() + self.config.stabilization_timeout;
697        let min_connections = self
698            .config
699            .bootstrap_count
700            .min(STABILIZATION_MIN_CONNECTIONS_CAP);
701
702        info!(
703            "Waiting for devnet stabilization (min {} connections per node)",
704            min_connections
705        );
706
707        while Instant::now() < deadline {
708            let mut all_connected = true;
709            let mut total_connections = 0;
710
711            for node in &self.nodes {
712                let peer_count = node.peer_count().await;
713                total_connections += peer_count;
714
715                if peer_count < min_connections {
716                    all_connected = false;
717                }
718            }
719
720            if all_connected {
721                info!("Devnet stabilized: {} total connections", total_connections);
722                return Ok(());
723            }
724
725            debug!(
726                "Waiting for stabilization: {} total connections",
727                total_connections
728            );
729            tokio::time::sleep(Duration::from_secs(STABILIZATION_POLL_INTERVAL_SECS)).await;
730        }
731
732        Err(DevnetError::Stabilization(
733            "Devnet failed to stabilize within timeout".to_string(),
734        ))
735    }
736
737    fn start_health_monitor(&mut self) {
738        let nodes: Vec<Arc<P2PNode>> = self
739            .nodes
740            .iter()
741            .filter_map(|n| n.p2p_node.clone())
742            .collect();
743        let shutdown = self.shutdown.clone();
744
745        self.health_monitor = Some(tokio::spawn(async move {
746            let check_interval = Duration::from_secs(HEALTH_CHECK_INTERVAL_SECS);
747
748            loop {
749                tokio::select! {
750                    () = shutdown.cancelled() => break,
751                    () = tokio::time::sleep(check_interval) => {
752                        for (i, node) in nodes.iter().enumerate() {
753                            if !node.is_running() {
754                                warn!("Node {} appears unhealthy", i);
755                            }
756                        }
757                    }
758                }
759            }
760        }));
761    }
762}
763
764impl Drop for Devnet {
765    fn drop(&mut self) {
766        self.shutdown.cancel();
767        if let Some(handle) = self.health_monitor.take() {
768            handle.abort();
769        }
770    }
771}