ant-node 0.10.1

Pure quantum-proof network node for the Autonomi decentralized network
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
//! Local devnet infrastructure for spawning and managing multiple nodes.
//!
//! This module provides a local, in-process devnet suitable for running
//! multi-node networks on a single machine.

use crate::ant_protocol::CHUNK_PROTOCOL_ID;
use crate::config::{default_root_dir, NODES_SUBDIR, NODE_IDENTITY_FILENAME};
use crate::logging::{debug, info, warn};
use crate::payment::{
    EvmVerifierConfig, PaymentVerifier, PaymentVerifierConfig, QuoteGenerator,
    QuotingMetricsTracker,
};
use crate::storage::{AntProtocol, LmdbStorage, LmdbStorageConfig};
use evmlib::Network as EvmNetwork;
use evmlib::RewardsAddress;
use rand::Rng;
use saorsa_core::identity::NodeIdentity;
use saorsa_core::{
    IPDiversityConfig, MultiAddr, NodeConfig as CoreNodeConfig, P2PEvent, P2PNode, PeerId,
};
use serde::{Deserialize, Serialize};
use std::net::{Ipv4Addr, SocketAddr};
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::RwLock;
use tokio::task::JoinHandle;
use tokio::time::Instant;
use tokio_util::sync::CancellationToken;

// =============================================================================
// Devnet Constants
// =============================================================================

/// Minimum port for random devnet allocation.
pub const DEVNET_PORT_RANGE_MIN: u16 = 20_000;

/// Maximum port for random devnet allocation.
pub const DEVNET_PORT_RANGE_MAX: u16 = 60_000;

// =============================================================================
// Default Timing Constants
// =============================================================================

/// Default delay between spawning nodes (milliseconds).
const DEFAULT_SPAWN_DELAY_MS: u64 = 200;

/// Default timeout for network stabilization (seconds).
const DEFAULT_STABILIZATION_TIMEOUT_SECS: u64 = 120;

/// Default timeout for single node startup (seconds).
const DEFAULT_NODE_STARTUP_TIMEOUT_SECS: u64 = 30;

/// Stabilization timeout for minimal network (seconds).
const MINIMAL_STABILIZATION_TIMEOUT_SECS: u64 = 30;

/// Stabilization timeout for small network (seconds).
const SMALL_STABILIZATION_TIMEOUT_SECS: u64 = 60;

/// Polling interval when waiting for individual nodes to become ready (milliseconds).
const NODE_READY_POLL_INTERVAL_MS: u64 = 100;

/// Polling interval when waiting for network stabilization (seconds).
const STABILIZATION_POLL_INTERVAL_SECS: u64 = 1;

/// Maximum minimum connections required per node during stabilization.
const STABILIZATION_MIN_CONNECTIONS_CAP: usize = 3;

/// Health monitor check interval (seconds).
const HEALTH_CHECK_INTERVAL_SECS: u64 = 5;

// =============================================================================
// AntProtocol Devnet Configuration
// =============================================================================

/// Payment cache capacity for devnet nodes.
const DEVNET_PAYMENT_CACHE_CAPACITY: usize = 1000;

/// Devnet rewards address (20 bytes, all 0x01).
const DEVNET_REWARDS_ADDRESS: [u8; 20] = [0x01; 20];

/// Initial records for quoting metrics (devnet value).
const DEVNET_INITIAL_RECORDS: usize = 1000;

// =============================================================================
// Default Node Counts
// =============================================================================

/// Default number of nodes in a full devnet.
pub const DEFAULT_NODE_COUNT: usize = 25;

/// Default number of bootstrap nodes.
pub const DEFAULT_BOOTSTRAP_COUNT: usize = 3;

/// Number of nodes in a minimal devnet.
pub const MINIMAL_NODE_COUNT: usize = 5;

/// Number of bootstrap nodes in a minimal network.
pub const MINIMAL_BOOTSTRAP_COUNT: usize = 2;

/// Number of nodes in a small devnet.
pub const SMALL_NODE_COUNT: usize = 10;

/// Error type for devnet operations.
#[derive(Debug, thiserror::Error)]
pub enum DevnetError {
    /// Configuration error
    #[error("Configuration error: {0}")]
    Config(String),

    /// Node startup error
    #[error("Node startup error: {0}")]
    Startup(String),

    /// Network stabilization error
    #[error("Network stabilization error: {0}")]
    Stabilization(String),

    /// IO error
    #[error("IO error: {0}")]
    Io(#[from] std::io::Error),

    /// Core error
    #[error("Core error: {0}")]
    Core(String),
}

/// Result type for devnet operations.
pub type Result<T> = std::result::Result<T, DevnetError>;

/// Configuration for the devnet.
///
/// Each configuration is automatically isolated with unique ports and
/// data directories to prevent collisions when running multiple devnets.
#[derive(Debug, Clone)]
pub struct DevnetConfig {
    /// Number of nodes to spawn (default: 25).
    pub node_count: usize,

    /// Base port for node allocation (0 = auto).
    pub base_port: u16,

    /// Number of bootstrap nodes (first N nodes, default: 3).
    pub bootstrap_count: usize,

    /// Root directory for devnet data.
    pub data_dir: PathBuf,

    /// Delay between node spawns (default: 200ms).
    pub spawn_delay: Duration,

    /// Timeout for network stabilization (default: 120s).
    pub stabilization_timeout: Duration,

    /// Timeout for single node startup (default: 30s).
    pub node_startup_timeout: Duration,

    /// Enable verbose logging for devnet nodes.
    pub enable_node_logging: bool,

    /// Whether to remove the data directory on shutdown.
    pub cleanup_data_dir: bool,

    /// Optional EVM network for payment verification.
    /// When `Some`, nodes will use this network (e.g. Anvil testnet) for
    /// on-chain verification. Defaults to Arbitrum One when `None`.
    pub evm_network: Option<EvmNetwork>,
}

impl Default for DevnetConfig {
    fn default() -> Self {
        let mut rng = rand::thread_rng();

        #[allow(clippy::cast_possible_truncation)] // DEFAULT_NODE_COUNT is 25, always fits u16
        let max_base_port = DEVNET_PORT_RANGE_MAX.saturating_sub(DEFAULT_NODE_COUNT as u16);
        let base_port = rng.gen_range(DEVNET_PORT_RANGE_MIN..max_base_port);

        Self {
            node_count: DEFAULT_NODE_COUNT,
            base_port,
            bootstrap_count: DEFAULT_BOOTSTRAP_COUNT,
            data_dir: default_root_dir(),
            spawn_delay: Duration::from_millis(DEFAULT_SPAWN_DELAY_MS),
            stabilization_timeout: Duration::from_secs(DEFAULT_STABILIZATION_TIMEOUT_SECS),
            node_startup_timeout: Duration::from_secs(DEFAULT_NODE_STARTUP_TIMEOUT_SECS),
            enable_node_logging: false,
            cleanup_data_dir: true,
            evm_network: None,
        }
    }
}

impl DevnetConfig {
    /// Minimal devnet preset (5 nodes).
    #[must_use]
    pub fn minimal() -> Self {
        Self {
            node_count: MINIMAL_NODE_COUNT,
            bootstrap_count: MINIMAL_BOOTSTRAP_COUNT,
            stabilization_timeout: Duration::from_secs(MINIMAL_STABILIZATION_TIMEOUT_SECS),
            ..Self::default()
        }
    }

    /// Small devnet preset (10 nodes).
    #[must_use]
    pub fn small() -> Self {
        Self {
            node_count: SMALL_NODE_COUNT,
            bootstrap_count: MINIMAL_BOOTSTRAP_COUNT,
            stabilization_timeout: Duration::from_secs(SMALL_STABILIZATION_TIMEOUT_SECS),
            ..Self::default()
        }
    }
}

/// Devnet manifest for client discovery.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DevnetManifest {
    /// Base port for nodes.
    pub base_port: u16,
    /// Node count.
    pub node_count: usize,
    /// Bootstrap addresses.
    pub bootstrap: Vec<MultiAddr>,
    /// Data directory.
    pub data_dir: PathBuf,
    /// Creation time in RFC3339.
    pub created_at: String,
    /// EVM configuration (present when EVM payment enforcement is enabled).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub evm: Option<DevnetEvmInfo>,
}

/// EVM configuration info included in the devnet manifest.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DevnetEvmInfo {
    /// Anvil RPC URL.
    pub rpc_url: String,
    /// Funded wallet private key (hex-encoded with 0x prefix).
    pub wallet_private_key: String,
    /// Payment token contract address.
    pub payment_token_address: String,
    /// Unified payment vault contract address (handles both single-node and merkle payments).
    pub payment_vault_address: String,
}

/// Network state for devnet startup lifecycle.
#[derive(Debug, Clone)]
pub enum NetworkState {
    /// Not started.
    Uninitialized,
    /// Bootstrapping nodes are starting.
    BootstrappingPhase,
    /// Regular nodes are starting.
    NodeSpawningPhase,
    /// Waiting for stabilization.
    Stabilizing,
    /// Network is ready.
    Ready,
    /// Shutting down.
    ShuttingDown,
    /// Stopped.
    Stopped,
}

/// Node state for devnet nodes.
#[derive(Debug, Clone)]
pub enum NodeState {
    /// Not started yet.
    Pending,
    /// Starting up.
    Starting,
    /// Running.
    Running,
    /// Connected to peers.
    Connected,
    /// Stopped.
    Stopped,
    /// Failed to start.
    Failed(String),
}

/// A single devnet node instance.
#[allow(dead_code)]
pub struct DevnetNode {
    index: usize,
    label: String,
    peer_id: PeerId,
    port: u16,
    data_dir: PathBuf,
    p2p_node: Option<Arc<P2PNode>>,
    ant_protocol: Option<Arc<AntProtocol>>,
    is_bootstrap: bool,
    state: Arc<RwLock<NodeState>>,
    bootstrap_addrs: Vec<MultiAddr>,
    protocol_task: Option<JoinHandle<()>>,
}

impl DevnetNode {
    /// Get the node's peer count.
    pub async fn peer_count(&self) -> usize {
        if let Some(ref node) = self.p2p_node {
            node.peer_count().await
        } else {
            0
        }
    }
}

/// A local devnet composed of multiple nodes.
pub struct Devnet {
    config: DevnetConfig,
    nodes: Vec<DevnetNode>,
    shutdown: CancellationToken,
    state: Arc<RwLock<NetworkState>>,
    health_monitor: Option<JoinHandle<()>>,
}

impl Devnet {
    /// Create a new devnet with the given configuration.
    ///
    /// # Errors
    ///
    /// Returns `DevnetError::Config` if the configuration is invalid (e.g. bootstrap
    /// count exceeds node count or port range overflow).
    /// Returns `DevnetError::Io` if the data directory cannot be created.
    pub async fn new(mut config: DevnetConfig) -> Result<Self> {
        if config.bootstrap_count >= config.node_count {
            return Err(DevnetError::Config(
                "Bootstrap count must be less than node count".to_string(),
            ));
        }

        if config.bootstrap_count == 0 {
            return Err(DevnetError::Config(
                "At least one bootstrap node is required".to_string(),
            ));
        }

        let node_count = config.node_count;
        let node_count_u16 = u16::try_from(node_count).map_err(|_| {
            DevnetError::Config(format!("Node count {node_count} exceeds u16::MAX"))
        })?;

        if config.base_port == 0 {
            let mut rng = rand::thread_rng();
            let max_base_port = DEVNET_PORT_RANGE_MAX.saturating_sub(node_count_u16);
            config.base_port = rng.gen_range(DEVNET_PORT_RANGE_MIN..max_base_port);
        }

        let base_port = config.base_port;
        let max_port = base_port
            .checked_add(node_count_u16)
            .ok_or_else(|| {
                DevnetError::Config(format!(
                    "Port range overflow: base_port {base_port} + node_count {node_count} exceeds u16::MAX"
                ))
            })?;
        if max_port > DEVNET_PORT_RANGE_MAX {
            return Err(DevnetError::Config(format!(
                "Port range overflow: max port {max_port} exceeds DEVNET_PORT_RANGE_MAX {DEVNET_PORT_RANGE_MAX}"
            )));
        }

        tokio::fs::create_dir_all(&config.data_dir).await?;

        Ok(Self {
            config,
            nodes: Vec::new(),
            shutdown: CancellationToken::new(),
            state: Arc::new(RwLock::new(NetworkState::Uninitialized)),
            health_monitor: None,
        })
    }

    /// Start the devnet.
    ///
    /// # Errors
    ///
    /// Returns `DevnetError::Startup` if any node fails to start, or
    /// `DevnetError::Stabilization` if the network does not stabilize within the timeout.
    pub async fn start(&mut self) -> Result<()> {
        info!(
            "Starting devnet with {} nodes ({} bootstrap)",
            self.config.node_count, self.config.bootstrap_count
        );

        *self.state.write().await = NetworkState::BootstrappingPhase;
        self.start_bootstrap_nodes().await?;

        *self.state.write().await = NetworkState::NodeSpawningPhase;
        self.start_regular_nodes().await?;

        *self.state.write().await = NetworkState::Stabilizing;
        self.wait_for_stabilization().await?;

        self.start_health_monitor();

        *self.state.write().await = NetworkState::Ready;
        info!("Devnet is ready");
        Ok(())
    }

    /// Shutdown the devnet.
    ///
    /// # Errors
    ///
    /// Returns `DevnetError::Io` if the data directory cleanup fails.
    pub async fn shutdown(&mut self) -> Result<()> {
        info!("Shutting down devnet");
        *self.state.write().await = NetworkState::ShuttingDown;

        self.shutdown.cancel();

        if let Some(handle) = self.health_monitor.take() {
            handle.abort();
        }

        let mut shutdown_futures = Vec::with_capacity(self.nodes.len());
        for node in self.nodes.iter_mut().rev() {
            debug!("Stopping node {}", node.index);
            if let Some(handle) = node.protocol_task.take() {
                handle.abort();
            }

            let node_index = node.index;
            let node_state = Arc::clone(&node.state);
            let p2p_node = node.p2p_node.take();

            shutdown_futures.push(async move {
                if let Some(p2p) = p2p_node {
                    if let Err(e) = p2p.shutdown().await {
                        warn!("Error shutting down node {node_index}: {e}");
                    }
                }
                *node_state.write().await = NodeState::Stopped;
            });
        }
        futures::future::join_all(shutdown_futures).await;

        if self.config.cleanup_data_dir {
            if let Err(e) = tokio::fs::remove_dir_all(&self.config.data_dir).await {
                warn!("Failed to cleanup devnet data directory: {e}");
            }
        }

        *self.state.write().await = NetworkState::Stopped;
        info!("Devnet shutdown complete");
        Ok(())
    }

    /// Get devnet configuration.
    #[must_use]
    pub fn config(&self) -> &DevnetConfig {
        &self.config
    }

    /// Get bootstrap addresses.
    #[must_use]
    pub fn bootstrap_addrs(&self) -> Vec<MultiAddr> {
        self.nodes
            .iter()
            .take(self.config.bootstrap_count)
            .map(|n| MultiAddr::quic(SocketAddr::from((Ipv4Addr::LOCALHOST, n.port))))
            .collect()
    }

    async fn start_bootstrap_nodes(&mut self) -> Result<()> {
        info!("Starting {} bootstrap nodes", self.config.bootstrap_count);

        for i in 0..self.config.bootstrap_count {
            let node = self.create_node(i, true, vec![]).await?;
            self.start_node(node).await?;
            tokio::time::sleep(self.config.spawn_delay).await;
        }

        self.wait_for_nodes_ready(0..self.config.bootstrap_count)
            .await?;

        info!("All bootstrap nodes are ready");
        Ok(())
    }

    async fn start_regular_nodes(&mut self) -> Result<()> {
        let regular_count = self.config.node_count - self.config.bootstrap_count;
        info!("Starting {} regular nodes", regular_count);

        let bootstrap_addrs: Vec<MultiAddr> = self
            .nodes
            .get(0..self.config.bootstrap_count)
            .ok_or_else(|| {
                DevnetError::Config(format!(
                    "Bootstrap count {} exceeds nodes length {}",
                    self.config.bootstrap_count,
                    self.nodes.len()
                ))
            })?
            .iter()
            .map(|n| MultiAddr::quic(SocketAddr::from((Ipv4Addr::LOCALHOST, n.port))))
            .collect();

        for i in self.config.bootstrap_count..self.config.node_count {
            let node = self.create_node(i, false, bootstrap_addrs.clone()).await?;
            self.start_node(node).await?;
            tokio::time::sleep(self.config.spawn_delay).await;
        }

        info!("All regular nodes started");
        Ok(())
    }

    async fn create_node(
        &self,
        index: usize,
        is_bootstrap: bool,
        bootstrap_addrs: Vec<MultiAddr>,
    ) -> Result<DevnetNode> {
        let index_u16 = u16::try_from(index)
            .map_err(|_| DevnetError::Config(format!("Node index {index} exceeds u16::MAX")))?;
        let port = self.config.base_port + index_u16;

        // Generate identity first so we can use peer_id as the directory name
        let identity = NodeIdentity::generate()
            .map_err(|e| DevnetError::Core(format!("Failed to generate node identity: {e}")))?;
        let peer_id = *identity.peer_id();
        let label = format!("devnet_node_{index}");
        let data_dir = self
            .config
            .data_dir
            .join(NODES_SUBDIR)
            .join(peer_id.to_hex());

        tokio::fs::create_dir_all(&data_dir).await?;

        identity
            .save_to_file(&data_dir.join(NODE_IDENTITY_FILENAME))
            .await
            .map_err(|e| DevnetError::Core(format!("Failed to save node identity: {e}")))?;

        let ant_protocol = Self::create_ant_protocol(&data_dir, &identity, &self.config).await?;

        Ok(DevnetNode {
            index,
            label,
            peer_id,
            port,
            data_dir,
            p2p_node: None,
            ant_protocol: Some(Arc::new(ant_protocol)),
            is_bootstrap,
            state: Arc::new(RwLock::new(NodeState::Pending)),
            bootstrap_addrs,
            protocol_task: None,
        })
    }

    async fn create_ant_protocol(
        data_dir: &std::path::Path,
        identity: &NodeIdentity,
        config: &DevnetConfig,
    ) -> Result<AntProtocol> {
        let storage_config = LmdbStorageConfig {
            root_dir: data_dir.to_path_buf(),
            verify_on_read: true,
            ..LmdbStorageConfig::default()
        };
        let storage = LmdbStorage::new(storage_config)
            .await
            .map_err(|e| DevnetError::Core(format!("Failed to create LMDB storage: {e}")))?;

        let evm_config = EvmVerifierConfig {
            network: config
                .evm_network
                .clone()
                .unwrap_or(EvmNetwork::ArbitrumOne),
        };

        let rewards_address = RewardsAddress::new(DEVNET_REWARDS_ADDRESS);
        let payment_config = PaymentVerifierConfig {
            evm: evm_config,
            cache_capacity: DEVNET_PAYMENT_CACHE_CAPACITY,
            local_rewards_address: rewards_address,
        };
        let payment_verifier = PaymentVerifier::new(payment_config);
        let metrics_tracker = QuotingMetricsTracker::new(DEVNET_INITIAL_RECORDS);
        let mut quote_generator = QuoteGenerator::new(rewards_address, metrics_tracker);

        // Wire ML-DSA-65 signing from the devnet node's identity
        crate::payment::wire_ml_dsa_signer(&mut quote_generator, identity)
            .map_err(|e| DevnetError::Startup(format!("Failed to wire ML-DSA-65 signer: {e}")))?;

        Ok(AntProtocol::new(
            Arc::new(storage),
            Arc::new(payment_verifier),
            Arc::new(quote_generator),
        ))
    }

    async fn start_node(&mut self, mut node: DevnetNode) -> Result<()> {
        debug!("Starting node {} on port {}", node.index, node.port);
        *node.state.write().await = NodeState::Starting;

        let mut core_config = CoreNodeConfig::builder()
            .port(node.port)
            .local(true)
            .max_message_size(crate::ant_protocol::MAX_WIRE_MESSAGE_SIZE)
            .build()
            .map_err(|e| DevnetError::Core(format!("Failed to create core config: {e}")))?;

        // Load the node identity for app-level message signing.
        let identity = NodeIdentity::load_from_file(
            &node.data_dir.join(crate::config::NODE_IDENTITY_FILENAME),
        )
        .await
        .map_err(|e| DevnetError::Core(format!("Failed to load node identity: {e}")))?;

        core_config.node_identity = Some(Arc::new(identity));
        core_config
            .bootstrap_peers
            .clone_from(&node.bootstrap_addrs);
        core_config.diversity_config = Some(IPDiversityConfig::permissive());

        let index = node.index;
        let p2p_node = P2PNode::new(core_config)
            .await
            .map_err(|e| DevnetError::Startup(format!("Failed to create node {index}: {e}")))?;

        p2p_node
            .start()
            .await
            .map_err(|e| DevnetError::Startup(format!("Failed to start node {index}: {e}")))?;

        node.p2p_node = Some(Arc::new(p2p_node));
        *node.state.write().await = NodeState::Running;

        if let (Some(ref p2p), Some(ref protocol)) = (&node.p2p_node, &node.ant_protocol) {
            let mut events = p2p.subscribe_events();
            let p2p_clone = Arc::clone(p2p);
            let protocol_clone = Arc::clone(protocol);
            let node_index = node.index;
            node.protocol_task = Some(tokio::spawn(async move {
                while let Ok(event) = events.recv().await {
                    if let P2PEvent::Message {
                        topic,
                        source: Some(source),
                        data,
                    } = event
                    {
                        if topic == CHUNK_PROTOCOL_ID {
                            debug!(
                                "Node {node_index} received chunk protocol message from {source}"
                            );
                            let protocol = Arc::clone(&protocol_clone);
                            let p2p = Arc::clone(&p2p_clone);
                            tokio::spawn(async move {
                                match protocol.try_handle_request(&data).await {
                                    Ok(Some(response)) => {
                                        if let Err(e) = p2p
                                            .send_message(
                                                &source,
                                                CHUNK_PROTOCOL_ID,
                                                response.to_vec(),
                                                &[],
                                            )
                                            .await
                                        {
                                            warn!(
                                                "Node {node_index} failed to send response to {source}: {e}"
                                            );
                                        }
                                    }
                                    Ok(None) => {}
                                    Err(e) => {
                                        warn!("Node {node_index} protocol handler error: {e}");
                                    }
                                }
                            });
                        }
                    }
                }
            }));
        }

        debug!("Node {} started successfully", node.index);
        self.nodes.push(node);
        Ok(())
    }

    async fn wait_for_nodes_ready(&self, range: std::ops::Range<usize>) -> Result<()> {
        let deadline = Instant::now() + self.config.node_startup_timeout;

        for i in range {
            while Instant::now() < deadline {
                let node = self.nodes.get(i).ok_or_else(|| {
                    DevnetError::Config(format!(
                        "Node index {i} out of bounds (len: {})",
                        self.nodes.len()
                    ))
                })?;
                let state = node.state.read().await.clone();
                match state {
                    NodeState::Running | NodeState::Connected => break,
                    NodeState::Failed(ref e) => {
                        return Err(DevnetError::Startup(format!("Node {i} failed: {e}")));
                    }
                    _ => {
                        tokio::time::sleep(Duration::from_millis(NODE_READY_POLL_INTERVAL_MS))
                            .await;
                    }
                }
            }
        }
        Ok(())
    }

    async fn wait_for_stabilization(&self) -> Result<()> {
        let deadline = Instant::now() + self.config.stabilization_timeout;
        let min_connections = self
            .config
            .bootstrap_count
            .min(STABILIZATION_MIN_CONNECTIONS_CAP);

        info!(
            "Waiting for devnet stabilization (min {} connections per node)",
            min_connections
        );

        while Instant::now() < deadline {
            let mut all_connected = true;
            let mut total_connections = 0;

            for node in &self.nodes {
                let peer_count = node.peer_count().await;
                total_connections += peer_count;

                if peer_count < min_connections {
                    all_connected = false;
                }
            }

            if all_connected {
                info!("Devnet stabilized: {} total connections", total_connections);
                return Ok(());
            }

            debug!(
                "Waiting for stabilization: {} total connections",
                total_connections
            );
            tokio::time::sleep(Duration::from_secs(STABILIZATION_POLL_INTERVAL_SECS)).await;
        }

        Err(DevnetError::Stabilization(
            "Devnet failed to stabilize within timeout".to_string(),
        ))
    }

    fn start_health_monitor(&mut self) {
        let nodes: Vec<Arc<P2PNode>> = self
            .nodes
            .iter()
            .filter_map(|n| n.p2p_node.clone())
            .collect();
        let shutdown = self.shutdown.clone();

        self.health_monitor = Some(tokio::spawn(async move {
            let check_interval = Duration::from_secs(HEALTH_CHECK_INTERVAL_SECS);

            loop {
                tokio::select! {
                    () = shutdown.cancelled() => break,
                    () = tokio::time::sleep(check_interval) => {
                        for (i, node) in nodes.iter().enumerate() {
                            if !node.is_running() {
                                warn!("Node {} appears unhealthy", i);
                            }
                        }
                    }
                }
            }
        }));
    }
}

impl Drop for Devnet {
    fn drop(&mut self) {
        self.shutdown.cancel();
        if let Some(handle) = self.health_monitor.take() {
            handle.abort();
        }
    }
}