Skip to main content

ant_node/
devnet.rs

1//! Local devnet infrastructure for spawning and managing multiple nodes.
2//!
3//! This module provides a local, in-process devnet suitable for running
4//! multi-node networks on a single machine.
5
6use crate::ant_protocol::CHUNK_PROTOCOL_ID;
7use crate::config::{default_root_dir, NODES_SUBDIR, NODE_IDENTITY_FILENAME};
8use crate::logging::{debug, info, warn};
9use crate::payment::{
10    EvmVerifierConfig, PaymentVerifier, PaymentVerifierConfig, QuoteGenerator,
11    QuotingMetricsTracker,
12};
13use crate::storage::{AntProtocol, LmdbStorage, LmdbStorageConfig};
14use evmlib::Network as EvmNetwork;
15use evmlib::RewardsAddress;
16use rand::Rng;
17use saorsa_core::identity::NodeIdentity;
18use saorsa_core::{
19    IPDiversityConfig, MultiAddr, NodeConfig as CoreNodeConfig, P2PEvent, P2PNode, PeerId,
20};
21use std::net::{Ipv4Addr, SocketAddr};
22use std::path::PathBuf;
23use std::sync::Arc;
24use std::time::Duration;
25use tokio::sync::RwLock;
26use tokio::task::JoinHandle;
27use tokio::time::Instant;
28use tokio_util::sync::CancellationToken;
29
30// =============================================================================
31// Devnet Constants
32// =============================================================================
33
/// Lower bound (inclusive) of the window from which a random devnet base port is drawn.
pub const DEVNET_PORT_RANGE_MIN: u16 = 20_000;

/// Upper bound of the devnet port window.
///
/// Random base ports are drawn below this value minus the node count, and
/// `Devnet::new` rejects a configuration whose `base_port + node_count` exceeds it.
pub const DEVNET_PORT_RANGE_MAX: u16 = 60_000;
39
40// =============================================================================
41// Default Timing Constants
42// =============================================================================
43
/// Pause inserted after each node is spawned, in milliseconds (default config value).
const DEFAULT_SPAWN_DELAY_MS: u64 = 200;

/// How long the default configuration waits for the network to stabilize, in seconds.
const DEFAULT_STABILIZATION_TIMEOUT_SECS: u64 = 120;

/// How long the default configuration waits for nodes to report ready, in seconds.
const DEFAULT_NODE_STARTUP_TIMEOUT_SECS: u64 = 30;

/// Stabilization timeout used by the minimal preset, in seconds.
const MINIMAL_STABILIZATION_TIMEOUT_SECS: u64 = 30;

/// Stabilization timeout used by the small preset, in seconds.
const SMALL_STABILIZATION_TIMEOUT_SECS: u64 = 60;

/// Sleep between state polls while waiting for a node to become ready, in milliseconds.
const NODE_READY_POLL_INTERVAL_MS: u64 = 100;

/// Sleep between connection-count polls while waiting for stabilization, in seconds.
const STABILIZATION_POLL_INTERVAL_SECS: u64 = 1;

/// Upper bound on the per-node connection count required for stabilization.
///
/// The required count is the bootstrap count, capped at this value.
const STABILIZATION_MIN_CONNECTIONS_CAP: usize = 3;

/// Period between health monitor sweeps, in seconds.
const HEALTH_CHECK_INTERVAL_SECS: u64 = 5;
70
71// =============================================================================
72// AntProtocol Devnet Configuration
73// =============================================================================
74
/// Cache capacity handed to each devnet node's payment verifier.
const DEVNET_PAYMENT_CACHE_CAPACITY: usize = 1000;

/// Fixed rewards address shared by all devnet nodes: 20 bytes, each `0x01`.
const DEVNET_REWARDS_ADDRESS: [u8; 20] = [0x01; 20];

/// Value passed to the quoting metrics tracker as its initial record count.
const DEVNET_INITIAL_RECORDS: usize = 1000;
83
84// =============================================================================
85// Default Node Counts
86// =============================================================================
87
/// Total node count of the default (full) devnet.
pub const DEFAULT_NODE_COUNT: usize = 25;

/// Bootstrap node count of the default devnet.
pub const DEFAULT_BOOTSTRAP_COUNT: usize = 3;

/// Total node count of the minimal preset.
pub const MINIMAL_NODE_COUNT: usize = 5;

/// Bootstrap node count of the minimal preset (the small preset reuses it).
pub const MINIMAL_BOOTSTRAP_COUNT: usize = 2;

/// Total node count of the small preset.
pub const SMALL_NODE_COUNT: usize = 10;
102
103/// Error type for devnet operations.
104#[derive(Debug, thiserror::Error)]
105pub enum DevnetError {
106    /// Configuration error
107    #[error("Configuration error: {0}")]
108    Config(String),
109
110    /// Node startup error
111    #[error("Node startup error: {0}")]
112    Startup(String),
113
114    /// Network stabilization error
115    #[error("Network stabilization error: {0}")]
116    Stabilization(String),
117
118    /// IO error
119    #[error("IO error: {0}")]
120    Io(#[from] std::io::Error),
121
122    /// Core error
123    #[error("Core error: {0}")]
124    Core(String),
125}
126
127/// Result type for devnet operations.
128pub type Result<T> = std::result::Result<T, DevnetError>;
129
130/// Configuration for the devnet.
131///
132/// Each configuration is automatically isolated with unique ports and
133/// data directories to prevent collisions when running multiple devnets.
134#[derive(Debug, Clone)]
135pub struct DevnetConfig {
136    /// Number of nodes to spawn (default: 25).
137    pub node_count: usize,
138
139    /// Base port for node allocation (0 = auto).
140    pub base_port: u16,
141
142    /// Number of bootstrap nodes (first N nodes, default: 3).
143    pub bootstrap_count: usize,
144
145    /// Root directory for devnet data.
146    pub data_dir: PathBuf,
147
148    /// Delay between node spawns (default: 200ms).
149    pub spawn_delay: Duration,
150
151    /// Timeout for network stabilization (default: 120s).
152    pub stabilization_timeout: Duration,
153
154    /// Timeout for single node startup (default: 30s).
155    pub node_startup_timeout: Duration,
156
157    /// Enable verbose logging for devnet nodes.
158    pub enable_node_logging: bool,
159
160    /// Whether to remove the data directory on shutdown.
161    pub cleanup_data_dir: bool,
162
163    /// Optional EVM network for payment verification.
164    /// When `Some`, nodes will use this network (e.g. Anvil testnet) for
165    /// on-chain verification. Defaults to Arbitrum One when `None`.
166    pub evm_network: Option<EvmNetwork>,
167}
168
169impl Default for DevnetConfig {
170    fn default() -> Self {
171        let mut rng = rand::thread_rng();
172
173        #[allow(clippy::cast_possible_truncation)] // DEFAULT_NODE_COUNT is 25, always fits u16
174        let max_base_port = DEVNET_PORT_RANGE_MAX.saturating_sub(DEFAULT_NODE_COUNT as u16);
175        let base_port = rng.gen_range(DEVNET_PORT_RANGE_MIN..max_base_port);
176
177        Self {
178            node_count: DEFAULT_NODE_COUNT,
179            base_port,
180            bootstrap_count: DEFAULT_BOOTSTRAP_COUNT,
181            data_dir: default_root_dir(),
182            spawn_delay: Duration::from_millis(DEFAULT_SPAWN_DELAY_MS),
183            stabilization_timeout: Duration::from_secs(DEFAULT_STABILIZATION_TIMEOUT_SECS),
184            node_startup_timeout: Duration::from_secs(DEFAULT_NODE_STARTUP_TIMEOUT_SECS),
185            enable_node_logging: false,
186            cleanup_data_dir: true,
187            evm_network: None,
188        }
189    }
190}
191
192impl DevnetConfig {
193    /// Minimal devnet preset (5 nodes).
194    #[must_use]
195    pub fn minimal() -> Self {
196        Self {
197            node_count: MINIMAL_NODE_COUNT,
198            bootstrap_count: MINIMAL_BOOTSTRAP_COUNT,
199            stabilization_timeout: Duration::from_secs(MINIMAL_STABILIZATION_TIMEOUT_SECS),
200            ..Self::default()
201        }
202    }
203
204    /// Small devnet preset (10 nodes).
205    #[must_use]
206    pub fn small() -> Self {
207        Self {
208            node_count: SMALL_NODE_COUNT,
209            bootstrap_count: MINIMAL_BOOTSTRAP_COUNT,
210            stabilization_timeout: Duration::from_secs(SMALL_STABILIZATION_TIMEOUT_SECS),
211            ..Self::default()
212        }
213    }
214}
215
216// The manifest types are shared with ant-client (CLI reads them, devnet
217// writes them), so they live in ant-protocol. Re-exported here for
218// backwards compatibility.
219pub use ant_protocol::devnet_manifest::{DevnetEvmInfo, DevnetManifest};
220
/// Lifecycle phases of a devnet as a whole.
#[derive(Debug, Clone)]
pub enum NetworkState {
    /// The devnet has been created but `start` has not run yet.
    Uninitialized,
    /// Bootstrap nodes are being started.
    BootstrappingPhase,
    /// Non-bootstrap nodes are being started.
    NodeSpawningPhase,
    /// All nodes are started; waiting for enough peer connections.
    Stabilizing,
    /// Startup finished and the network is usable.
    Ready,
    /// Shutdown is in progress.
    ShuttingDown,
    /// Shutdown has completed.
    Stopped,
}
239
/// Lifecycle phases of an individual devnet node.
#[derive(Debug, Clone)]
pub enum NodeState {
    /// Created but not yet asked to start.
    Pending,
    /// Startup is in progress.
    Starting,
    /// The underlying P2P node has been started.
    Running,
    /// Connected to peers (treated as ready, like `Running`, by the startup wait).
    Connected,
    /// The node has been shut down.
    Stopped,
    /// Startup failed; the payload describes why.
    Failed(String),
}
256
257/// A single devnet node instance.
258#[allow(dead_code)]
259pub struct DevnetNode {
260    index: usize,
261    label: String,
262    peer_id: PeerId,
263    port: u16,
264    data_dir: PathBuf,
265    p2p_node: Option<Arc<P2PNode>>,
266    ant_protocol: Option<Arc<AntProtocol>>,
267    is_bootstrap: bool,
268    state: Arc<RwLock<NodeState>>,
269    bootstrap_addrs: Vec<MultiAddr>,
270    protocol_task: Option<JoinHandle<()>>,
271}
272
273impl DevnetNode {
274    /// Get the node's peer count.
275    pub async fn peer_count(&self) -> usize {
276        if let Some(ref node) = self.p2p_node {
277            node.peer_count().await
278        } else {
279            0
280        }
281    }
282}
283
284/// A local devnet composed of multiple nodes.
285pub struct Devnet {
286    config: DevnetConfig,
287    nodes: Vec<DevnetNode>,
288    shutdown: CancellationToken,
289    state: Arc<RwLock<NetworkState>>,
290    health_monitor: Option<JoinHandle<()>>,
291}
292
293impl Devnet {
294    /// Create a new devnet with the given configuration.
295    ///
296    /// # Errors
297    ///
298    /// Returns `DevnetError::Config` if the configuration is invalid (e.g. bootstrap
299    /// count exceeds node count or port range overflow).
300    /// Returns `DevnetError::Io` if the data directory cannot be created.
301    pub async fn new(mut config: DevnetConfig) -> Result<Self> {
302        if config.bootstrap_count >= config.node_count {
303            return Err(DevnetError::Config(
304                "Bootstrap count must be less than node count".to_string(),
305            ));
306        }
307
308        if config.bootstrap_count == 0 {
309            return Err(DevnetError::Config(
310                "At least one bootstrap node is required".to_string(),
311            ));
312        }
313
314        let node_count = config.node_count;
315        let node_count_u16 = u16::try_from(node_count).map_err(|_| {
316            DevnetError::Config(format!("Node count {node_count} exceeds u16::MAX"))
317        })?;
318
319        if config.base_port == 0 {
320            let mut rng = rand::thread_rng();
321            let max_base_port = DEVNET_PORT_RANGE_MAX.saturating_sub(node_count_u16);
322            config.base_port = rng.gen_range(DEVNET_PORT_RANGE_MIN..max_base_port);
323        }
324
325        let base_port = config.base_port;
326        let max_port = base_port
327            .checked_add(node_count_u16)
328            .ok_or_else(|| {
329                DevnetError::Config(format!(
330                    "Port range overflow: base_port {base_port} + node_count {node_count} exceeds u16::MAX"
331                ))
332            })?;
333        if max_port > DEVNET_PORT_RANGE_MAX {
334            return Err(DevnetError::Config(format!(
335                "Port range overflow: max port {max_port} exceeds DEVNET_PORT_RANGE_MAX {DEVNET_PORT_RANGE_MAX}"
336            )));
337        }
338
339        tokio::fs::create_dir_all(&config.data_dir).await?;
340
341        Ok(Self {
342            config,
343            nodes: Vec::new(),
344            shutdown: CancellationToken::new(),
345            state: Arc::new(RwLock::new(NetworkState::Uninitialized)),
346            health_monitor: None,
347        })
348    }
349
350    /// Start the devnet.
351    ///
352    /// # Errors
353    ///
354    /// Returns `DevnetError::Startup` if any node fails to start, or
355    /// `DevnetError::Stabilization` if the network does not stabilize within the timeout.
356    pub async fn start(&mut self) -> Result<()> {
357        info!(
358            "Starting devnet with {} nodes ({} bootstrap)",
359            self.config.node_count, self.config.bootstrap_count
360        );
361
362        *self.state.write().await = NetworkState::BootstrappingPhase;
363        self.start_bootstrap_nodes().await?;
364
365        *self.state.write().await = NetworkState::NodeSpawningPhase;
366        self.start_regular_nodes().await?;
367
368        *self.state.write().await = NetworkState::Stabilizing;
369        self.wait_for_stabilization().await?;
370
371        self.start_health_monitor();
372
373        *self.state.write().await = NetworkState::Ready;
374        info!("Devnet is ready");
375        Ok(())
376    }
377
378    /// Shutdown the devnet.
379    ///
380    /// # Errors
381    ///
382    /// Returns `DevnetError::Io` if the data directory cleanup fails.
383    pub async fn shutdown(&mut self) -> Result<()> {
384        info!("Shutting down devnet");
385        *self.state.write().await = NetworkState::ShuttingDown;
386
387        self.shutdown.cancel();
388
389        if let Some(handle) = self.health_monitor.take() {
390            handle.abort();
391        }
392
393        let mut shutdown_futures = Vec::with_capacity(self.nodes.len());
394        for node in self.nodes.iter_mut().rev() {
395            debug!("Stopping node {}", node.index);
396            if let Some(handle) = node.protocol_task.take() {
397                handle.abort();
398            }
399
400            let node_index = node.index;
401            let node_state = Arc::clone(&node.state);
402            let p2p_node = node.p2p_node.take();
403
404            shutdown_futures.push(async move {
405                if let Some(p2p) = p2p_node {
406                    if let Err(e) = p2p.shutdown().await {
407                        warn!("Error shutting down node {node_index}: {e}");
408                    }
409                }
410                *node_state.write().await = NodeState::Stopped;
411            });
412        }
413        futures::future::join_all(shutdown_futures).await;
414
415        if self.config.cleanup_data_dir {
416            if let Err(e) = tokio::fs::remove_dir_all(&self.config.data_dir).await {
417                warn!("Failed to cleanup devnet data directory: {e}");
418            }
419        }
420
421        *self.state.write().await = NetworkState::Stopped;
422        info!("Devnet shutdown complete");
423        Ok(())
424    }
425
426    /// Get devnet configuration.
427    #[must_use]
428    pub fn config(&self) -> &DevnetConfig {
429        &self.config
430    }
431
432    /// Get bootstrap addresses.
433    #[must_use]
434    pub fn bootstrap_addrs(&self) -> Vec<MultiAddr> {
435        self.nodes
436            .iter()
437            .take(self.config.bootstrap_count)
438            .map(|n| MultiAddr::quic(SocketAddr::from((Ipv4Addr::LOCALHOST, n.port))))
439            .collect()
440    }
441
442    async fn start_bootstrap_nodes(&mut self) -> Result<()> {
443        info!("Starting {} bootstrap nodes", self.config.bootstrap_count);
444
445        for i in 0..self.config.bootstrap_count {
446            let node = self.create_node(i, true, vec![]).await?;
447            self.start_node(node).await?;
448            tokio::time::sleep(self.config.spawn_delay).await;
449        }
450
451        self.wait_for_nodes_ready(0..self.config.bootstrap_count)
452            .await?;
453
454        info!("All bootstrap nodes are ready");
455        Ok(())
456    }
457
458    async fn start_regular_nodes(&mut self) -> Result<()> {
459        let regular_count = self.config.node_count - self.config.bootstrap_count;
460        info!("Starting {} regular nodes", regular_count);
461
462        let bootstrap_addrs: Vec<MultiAddr> = self
463            .nodes
464            .get(0..self.config.bootstrap_count)
465            .ok_or_else(|| {
466                DevnetError::Config(format!(
467                    "Bootstrap count {} exceeds nodes length {}",
468                    self.config.bootstrap_count,
469                    self.nodes.len()
470                ))
471            })?
472            .iter()
473            .map(|n| MultiAddr::quic(SocketAddr::from((Ipv4Addr::LOCALHOST, n.port))))
474            .collect();
475
476        for i in self.config.bootstrap_count..self.config.node_count {
477            let node = self.create_node(i, false, bootstrap_addrs.clone()).await?;
478            self.start_node(node).await?;
479            tokio::time::sleep(self.config.spawn_delay).await;
480        }
481
482        info!("All regular nodes started");
483        Ok(())
484    }
485
486    async fn create_node(
487        &self,
488        index: usize,
489        is_bootstrap: bool,
490        bootstrap_addrs: Vec<MultiAddr>,
491    ) -> Result<DevnetNode> {
492        let index_u16 = u16::try_from(index)
493            .map_err(|_| DevnetError::Config(format!("Node index {index} exceeds u16::MAX")))?;
494        let port = self.config.base_port + index_u16;
495
496        // Generate identity first so we can use peer_id as the directory name
497        let identity = NodeIdentity::generate()
498            .map_err(|e| DevnetError::Core(format!("Failed to generate node identity: {e}")))?;
499        let peer_id = *identity.peer_id();
500        let label = format!("devnet_node_{index}");
501        let data_dir = self
502            .config
503            .data_dir
504            .join(NODES_SUBDIR)
505            .join(peer_id.to_hex());
506
507        tokio::fs::create_dir_all(&data_dir).await?;
508
509        identity
510            .save_to_file(&data_dir.join(NODE_IDENTITY_FILENAME))
511            .await
512            .map_err(|e| DevnetError::Core(format!("Failed to save node identity: {e}")))?;
513
514        let ant_protocol = Self::create_ant_protocol(&data_dir, &identity, &self.config).await?;
515
516        Ok(DevnetNode {
517            index,
518            label,
519            peer_id,
520            port,
521            data_dir,
522            p2p_node: None,
523            ant_protocol: Some(Arc::new(ant_protocol)),
524            is_bootstrap,
525            state: Arc::new(RwLock::new(NodeState::Pending)),
526            bootstrap_addrs,
527            protocol_task: None,
528        })
529    }
530
531    async fn create_ant_protocol(
532        data_dir: &std::path::Path,
533        identity: &NodeIdentity,
534        config: &DevnetConfig,
535    ) -> Result<AntProtocol> {
536        let storage_config = LmdbStorageConfig {
537            root_dir: data_dir.to_path_buf(),
538            verify_on_read: true,
539            ..LmdbStorageConfig::default()
540        };
541        let storage = LmdbStorage::new(storage_config)
542            .await
543            .map_err(|e| DevnetError::Core(format!("Failed to create LMDB storage: {e}")))?;
544
545        let evm_config = EvmVerifierConfig {
546            network: config
547                .evm_network
548                .clone()
549                .unwrap_or(EvmNetwork::ArbitrumOne),
550        };
551
552        let rewards_address = RewardsAddress::new(DEVNET_REWARDS_ADDRESS);
553        let payment_config = PaymentVerifierConfig {
554            evm: evm_config,
555            cache_capacity: DEVNET_PAYMENT_CACHE_CAPACITY,
556            local_rewards_address: rewards_address,
557        };
558        let payment_verifier = PaymentVerifier::new(payment_config);
559        let metrics_tracker = QuotingMetricsTracker::new(DEVNET_INITIAL_RECORDS);
560        let mut quote_generator = QuoteGenerator::new(rewards_address, metrics_tracker);
561
562        // Wire ML-DSA-65 signing from the devnet node's identity
563        crate::payment::wire_ml_dsa_signer(&mut quote_generator, identity)
564            .map_err(|e| DevnetError::Startup(format!("Failed to wire ML-DSA-65 signer: {e}")))?;
565
566        let storage = Arc::new(storage);
567        let payment_verifier = Arc::new(payment_verifier);
568
569        Ok(AntProtocol::new(
570            storage,
571            payment_verifier,
572            Arc::new(quote_generator),
573        ))
574    }
575
576    async fn start_node(&mut self, mut node: DevnetNode) -> Result<()> {
577        debug!("Starting node {} on port {}", node.index, node.port);
578        *node.state.write().await = NodeState::Starting;
579
580        let mut core_config = CoreNodeConfig::builder()
581            .port(node.port)
582            .local(true)
583            .max_message_size(crate::ant_protocol::MAX_WIRE_MESSAGE_SIZE)
584            .build()
585            .map_err(|e| DevnetError::Core(format!("Failed to create core config: {e}")))?;
586
587        // Load the node identity for app-level message signing.
588        let identity = NodeIdentity::load_from_file(
589            &node.data_dir.join(crate::config::NODE_IDENTITY_FILENAME),
590        )
591        .await
592        .map_err(|e| DevnetError::Core(format!("Failed to load node identity: {e}")))?;
593
594        core_config.node_identity = Some(Arc::new(identity));
595        core_config
596            .bootstrap_peers
597            .clone_from(&node.bootstrap_addrs);
598        core_config.diversity_config = Some(IPDiversityConfig::permissive());
599
600        let index = node.index;
601        let p2p_node = P2PNode::new(core_config)
602            .await
603            .map_err(|e| DevnetError::Startup(format!("Failed to create node {index}: {e}")))?;
604
605        p2p_node
606            .start()
607            .await
608            .map_err(|e| DevnetError::Startup(format!("Failed to start node {index}: {e}")))?;
609
610        node.p2p_node = Some(Arc::new(p2p_node));
611        *node.state.write().await = NodeState::Running;
612
613        if let (Some(ref p2p), Some(ref protocol)) = (&node.p2p_node, &node.ant_protocol) {
614            // Wire the P2PNode into the payment verifier for merkle-closeness checks.
615            protocol
616                .payment_verifier_arc()
617                .attach_p2p_node(Arc::clone(p2p));
618
619            let mut events = p2p.subscribe_events();
620            let p2p_clone = Arc::clone(p2p);
621            let protocol_clone = Arc::clone(protocol);
622            let node_index = node.index;
623            node.protocol_task = Some(tokio::spawn(async move {
624                while let Ok(event) = events.recv().await {
625                    if let P2PEvent::Message {
626                        topic,
627                        source: Some(source),
628                        data,
629                        ..
630                    } = event
631                    {
632                        if topic == CHUNK_PROTOCOL_ID {
633                            debug!(
634                                "Node {node_index} received chunk protocol message from {source}"
635                            );
636                            let protocol = Arc::clone(&protocol_clone);
637                            let p2p = Arc::clone(&p2p_clone);
638                            tokio::spawn(async move {
639                                match protocol.try_handle_request(&data).await {
640                                    Ok(Some(response)) => {
641                                        if let Err(e) = p2p
642                                            .send_message(
643                                                &source,
644                                                CHUNK_PROTOCOL_ID,
645                                                response.to_vec(),
646                                                &[],
647                                            )
648                                            .await
649                                        {
650                                            warn!(
651                                                "Node {node_index} failed to send response to {source}: {e}"
652                                            );
653                                        }
654                                    }
655                                    Ok(None) => {}
656                                    Err(e) => {
657                                        warn!("Node {node_index} protocol handler error: {e}");
658                                    }
659                                }
660                            });
661                        }
662                    }
663                }
664            }));
665        }
666
667        debug!("Node {} started successfully", node.index);
668        self.nodes.push(node);
669        Ok(())
670    }
671
672    async fn wait_for_nodes_ready(&self, range: std::ops::Range<usize>) -> Result<()> {
673        let deadline = Instant::now() + self.config.node_startup_timeout;
674
675        for i in range {
676            while Instant::now() < deadline {
677                let node = self.nodes.get(i).ok_or_else(|| {
678                    DevnetError::Config(format!(
679                        "Node index {i} out of bounds (len: {})",
680                        self.nodes.len()
681                    ))
682                })?;
683                let state = node.state.read().await.clone();
684                match state {
685                    NodeState::Running | NodeState::Connected => break,
686                    NodeState::Failed(ref e) => {
687                        return Err(DevnetError::Startup(format!("Node {i} failed: {e}")));
688                    }
689                    _ => {
690                        tokio::time::sleep(Duration::from_millis(NODE_READY_POLL_INTERVAL_MS))
691                            .await;
692                    }
693                }
694            }
695        }
696        Ok(())
697    }
698
699    async fn wait_for_stabilization(&self) -> Result<()> {
700        let deadline = Instant::now() + self.config.stabilization_timeout;
701        let min_connections = self
702            .config
703            .bootstrap_count
704            .min(STABILIZATION_MIN_CONNECTIONS_CAP);
705
706        info!(
707            "Waiting for devnet stabilization (min {} connections per node)",
708            min_connections
709        );
710
711        while Instant::now() < deadline {
712            let mut all_connected = true;
713            let mut total_connections = 0;
714
715            for node in &self.nodes {
716                let peer_count = node.peer_count().await;
717                total_connections += peer_count;
718
719                if peer_count < min_connections {
720                    all_connected = false;
721                }
722            }
723
724            if all_connected {
725                info!("Devnet stabilized: {} total connections", total_connections);
726                return Ok(());
727            }
728
729            debug!(
730                "Waiting for stabilization: {} total connections",
731                total_connections
732            );
733            tokio::time::sleep(Duration::from_secs(STABILIZATION_POLL_INTERVAL_SECS)).await;
734        }
735
736        Err(DevnetError::Stabilization(
737            "Devnet failed to stabilize within timeout".to_string(),
738        ))
739    }
740
741    fn start_health_monitor(&mut self) {
742        let nodes: Vec<Arc<P2PNode>> = self
743            .nodes
744            .iter()
745            .filter_map(|n| n.p2p_node.clone())
746            .collect();
747        let shutdown = self.shutdown.clone();
748
749        self.health_monitor = Some(tokio::spawn(async move {
750            let check_interval = Duration::from_secs(HEALTH_CHECK_INTERVAL_SECS);
751
752            loop {
753                tokio::select! {
754                    () = shutdown.cancelled() => break,
755                    () = tokio::time::sleep(check_interval) => {
756                        for (i, node) in nodes.iter().enumerate() {
757                            if !node.is_running() {
758                                warn!("Node {} appears unhealthy", i);
759                            }
760                        }
761                    }
762                }
763            }
764        }));
765    }
766}
767
768impl Drop for Devnet {
769    fn drop(&mut self) {
770        self.shutdown.cancel();
771        if let Some(handle) = self.health_monitor.take() {
772            handle.abort();
773        }
774    }
775}