1use crate::ant_protocol::CHUNK_PROTOCOL_ID;
7use crate::config::{default_root_dir, NODES_SUBDIR, NODE_IDENTITY_FILENAME};
8use crate::logging::{debug, info, warn};
9use crate::payment::{
10 EvmVerifierConfig, PaymentVerifier, PaymentVerifierConfig, QuoteGenerator,
11 QuotingMetricsTracker,
12};
13use crate::storage::{AntProtocol, LmdbStorage, LmdbStorageConfig};
14use evmlib::Network as EvmNetwork;
15use evmlib::RewardsAddress;
16use rand::Rng;
17use saorsa_core::identity::NodeIdentity;
18use saorsa_core::{
19 IPDiversityConfig, MultiAddr, NodeConfig as CoreNodeConfig, P2PEvent, P2PNode, PeerId,
20};
21use std::net::{Ipv4Addr, SocketAddr};
22use std::path::PathBuf;
23use std::sync::Arc;
24use std::time::Duration;
25use tokio::sync::RwLock;
26use tokio::task::JoinHandle;
27use tokio::time::Instant;
28use tokio_util::sync::CancellationToken;
29
// ---------------------------------------------------------------------------
// Port window
// ---------------------------------------------------------------------------

/// Lowest port that a randomly selected devnet base port may take.
pub const DEVNET_PORT_RANGE_MIN: u16 = 20_000;

/// Upper bound of the devnet port window; `Devnet::new` rejects any
/// configuration whose `base_port + node_count` exceeds this value.
pub const DEVNET_PORT_RANGE_MAX: u16 = 60_000;

// ---------------------------------------------------------------------------
// Network sizes
// ---------------------------------------------------------------------------

/// Node count used by `DevnetConfig::default()`.
pub const DEFAULT_NODE_COUNT: usize = 25;

/// Bootstrap node count used by `DevnetConfig::default()`.
pub const DEFAULT_BOOTSTRAP_COUNT: usize = 3;

/// Node count used by `DevnetConfig::minimal()`.
pub const MINIMAL_NODE_COUNT: usize = 5;

/// Bootstrap node count used by both `DevnetConfig::minimal()` and
/// `DevnetConfig::small()`.
pub const MINIMAL_BOOTSTRAP_COUNT: usize = 2;

/// Node count used by `DevnetConfig::small()`.
pub const SMALL_NODE_COUNT: usize = 10;

// ---------------------------------------------------------------------------
// Timing
// ---------------------------------------------------------------------------

/// Default pause, in milliseconds, inserted after each node is started.
const DEFAULT_SPAWN_DELAY_MS: u64 = 200;

/// Default time budget, in seconds, for the network to stabilize.
const DEFAULT_STABILIZATION_TIMEOUT_SECS: u64 = 120;

/// Default time budget, in seconds, for nodes to report a ready state.
const DEFAULT_NODE_STARTUP_TIMEOUT_SECS: u64 = 30;

/// Stabilization budget, in seconds, for the `minimal()` preset.
const MINIMAL_STABILIZATION_TIMEOUT_SECS: u64 = 30;

/// Stabilization budget, in seconds, for the `small()` preset.
const SMALL_STABILIZATION_TIMEOUT_SECS: u64 = 60;

/// Sleep, in milliseconds, between node-state polls while waiting for readiness.
const NODE_READY_POLL_INTERVAL_MS: u64 = 100;

/// Sleep, in seconds, between peer-count sweeps while waiting for stabilization.
const STABILIZATION_POLL_INTERVAL_SECS: u64 = 1;

/// Upper limit on the per-node peer count required for stabilization; the
/// actual requirement is `min(bootstrap_count, this cap)`.
const STABILIZATION_MIN_CONNECTIONS_CAP: usize = 3;

/// Sleep, in seconds, between health-monitor sweeps.
const HEALTH_CHECK_INTERVAL_SECS: u64 = 5;

// ---------------------------------------------------------------------------
// Payment / quoting
// ---------------------------------------------------------------------------

/// `cache_capacity` handed to every node's `PaymentVerifierConfig`.
const DEVNET_PAYMENT_CACHE_CAPACITY: usize = 1_000;

/// Raw bytes of the rewards address shared by every devnet node.
const DEVNET_REWARDS_ADDRESS: [u8; 20] = [0x01; 20];

/// Value passed to `QuotingMetricsTracker::new` for every devnet node.
// NOTE(review): presumably an initial/maximum record count — confirm against the tracker.
const DEVNET_INITIAL_RECORDS: usize = 1_000;
102
103#[derive(Debug, thiserror::Error)]
105pub enum DevnetError {
106 #[error("Configuration error: {0}")]
108 Config(String),
109
110 #[error("Node startup error: {0}")]
112 Startup(String),
113
114 #[error("Network stabilization error: {0}")]
116 Stabilization(String),
117
118 #[error("IO error: {0}")]
120 Io(#[from] std::io::Error),
121
122 #[error("Core error: {0}")]
124 Core(String),
125}
126
127pub type Result<T> = std::result::Result<T, DevnetError>;
129
130#[derive(Debug, Clone)]
135pub struct DevnetConfig {
136 pub node_count: usize,
138
139 pub base_port: u16,
141
142 pub bootstrap_count: usize,
144
145 pub data_dir: PathBuf,
147
148 pub spawn_delay: Duration,
150
151 pub stabilization_timeout: Duration,
153
154 pub node_startup_timeout: Duration,
156
157 pub enable_node_logging: bool,
159
160 pub cleanup_data_dir: bool,
162
163 pub evm_network: Option<EvmNetwork>,
167}
168
169impl Default for DevnetConfig {
170 fn default() -> Self {
171 let mut rng = rand::thread_rng();
172
173 #[allow(clippy::cast_possible_truncation)] let max_base_port = DEVNET_PORT_RANGE_MAX.saturating_sub(DEFAULT_NODE_COUNT as u16);
175 let base_port = rng.gen_range(DEVNET_PORT_RANGE_MIN..max_base_port);
176
177 Self {
178 node_count: DEFAULT_NODE_COUNT,
179 base_port,
180 bootstrap_count: DEFAULT_BOOTSTRAP_COUNT,
181 data_dir: default_root_dir(),
182 spawn_delay: Duration::from_millis(DEFAULT_SPAWN_DELAY_MS),
183 stabilization_timeout: Duration::from_secs(DEFAULT_STABILIZATION_TIMEOUT_SECS),
184 node_startup_timeout: Duration::from_secs(DEFAULT_NODE_STARTUP_TIMEOUT_SECS),
185 enable_node_logging: false,
186 cleanup_data_dir: true,
187 evm_network: None,
188 }
189 }
190}
191
192impl DevnetConfig {
193 #[must_use]
195 pub fn minimal() -> Self {
196 Self {
197 node_count: MINIMAL_NODE_COUNT,
198 bootstrap_count: MINIMAL_BOOTSTRAP_COUNT,
199 stabilization_timeout: Duration::from_secs(MINIMAL_STABILIZATION_TIMEOUT_SECS),
200 ..Self::default()
201 }
202 }
203
204 #[must_use]
206 pub fn small() -> Self {
207 Self {
208 node_count: SMALL_NODE_COUNT,
209 bootstrap_count: MINIMAL_BOOTSTRAP_COUNT,
210 stabilization_timeout: Duration::from_secs(SMALL_STABILIZATION_TIMEOUT_SECS),
211 ..Self::default()
212 }
213 }
214}
215
216pub use ant_protocol::devnet_manifest::{DevnetEvmInfo, DevnetManifest};
220
/// Lifecycle phase of the devnet as a whole.
///
/// Derives `PartialEq`/`Eq` so callers can compare phases directly
/// (`state == NetworkState::Ready`, `assert_eq!`) instead of pattern matching.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum NetworkState {
    /// Constructed, but `start` has not been called yet.
    Uninitialized,
    /// Bootstrap nodes are being started.
    BootstrappingPhase,
    /// Regular (non-bootstrap) nodes are being started.
    NodeSpawningPhase,
    /// All nodes were started; waiting for the minimum connectivity.
    Stabilizing,
    /// Startup finished successfully.
    Ready,
    /// `shutdown` is in progress.
    ShuttingDown,
    /// `shutdown` has completed.
    Stopped,
}
239
/// Lifecycle phase of a single devnet node.
///
/// Derives `PartialEq`/`Eq` so callers can compare states directly
/// (`state == NodeState::Running`, `assert_eq!`) instead of pattern matching.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum NodeState {
    /// Created but not yet started.
    Pending,
    /// Start-up is in progress.
    Starting,
    /// The underlying P2P node was started successfully.
    Running,
    /// Treated as ready alongside `Running` when waiting for node readiness.
    // NOTE(review): never assigned in this file's visible code — confirm who sets it.
    Connected,
    /// The node was shut down.
    Stopped,
    /// The node failed; the payload describes why.
    Failed(String),
}
256
257#[allow(dead_code)]
259pub struct DevnetNode {
260 index: usize,
261 label: String,
262 peer_id: PeerId,
263 port: u16,
264 data_dir: PathBuf,
265 p2p_node: Option<Arc<P2PNode>>,
266 ant_protocol: Option<Arc<AntProtocol>>,
267 is_bootstrap: bool,
268 state: Arc<RwLock<NodeState>>,
269 bootstrap_addrs: Vec<MultiAddr>,
270 protocol_task: Option<JoinHandle<()>>,
271}
272
273impl DevnetNode {
274 pub async fn peer_count(&self) -> usize {
276 if let Some(ref node) = self.p2p_node {
277 node.peer_count().await
278 } else {
279 0
280 }
281 }
282}
283
284pub struct Devnet {
286 config: DevnetConfig,
287 nodes: Vec<DevnetNode>,
288 shutdown: CancellationToken,
289 state: Arc<RwLock<NetworkState>>,
290 health_monitor: Option<JoinHandle<()>>,
291}
292
293impl Devnet {
294 pub async fn new(mut config: DevnetConfig) -> Result<Self> {
302 if config.bootstrap_count >= config.node_count {
303 return Err(DevnetError::Config(
304 "Bootstrap count must be less than node count".to_string(),
305 ));
306 }
307
308 if config.bootstrap_count == 0 {
309 return Err(DevnetError::Config(
310 "At least one bootstrap node is required".to_string(),
311 ));
312 }
313
314 let node_count = config.node_count;
315 let node_count_u16 = u16::try_from(node_count).map_err(|_| {
316 DevnetError::Config(format!("Node count {node_count} exceeds u16::MAX"))
317 })?;
318
319 if config.base_port == 0 {
320 let mut rng = rand::thread_rng();
321 let max_base_port = DEVNET_PORT_RANGE_MAX.saturating_sub(node_count_u16);
322 config.base_port = rng.gen_range(DEVNET_PORT_RANGE_MIN..max_base_port);
323 }
324
325 let base_port = config.base_port;
326 let max_port = base_port
327 .checked_add(node_count_u16)
328 .ok_or_else(|| {
329 DevnetError::Config(format!(
330 "Port range overflow: base_port {base_port} + node_count {node_count} exceeds u16::MAX"
331 ))
332 })?;
333 if max_port > DEVNET_PORT_RANGE_MAX {
334 return Err(DevnetError::Config(format!(
335 "Port range overflow: max port {max_port} exceeds DEVNET_PORT_RANGE_MAX {DEVNET_PORT_RANGE_MAX}"
336 )));
337 }
338
339 tokio::fs::create_dir_all(&config.data_dir).await?;
340
341 Ok(Self {
342 config,
343 nodes: Vec::new(),
344 shutdown: CancellationToken::new(),
345 state: Arc::new(RwLock::new(NetworkState::Uninitialized)),
346 health_monitor: None,
347 })
348 }
349
350 pub async fn start(&mut self) -> Result<()> {
357 info!(
358 "Starting devnet with {} nodes ({} bootstrap)",
359 self.config.node_count, self.config.bootstrap_count
360 );
361
362 *self.state.write().await = NetworkState::BootstrappingPhase;
363 self.start_bootstrap_nodes().await?;
364
365 *self.state.write().await = NetworkState::NodeSpawningPhase;
366 self.start_regular_nodes().await?;
367
368 *self.state.write().await = NetworkState::Stabilizing;
369 self.wait_for_stabilization().await?;
370
371 self.start_health_monitor();
372
373 *self.state.write().await = NetworkState::Ready;
374 info!("Devnet is ready");
375 Ok(())
376 }
377
378 pub async fn shutdown(&mut self) -> Result<()> {
384 info!("Shutting down devnet");
385 *self.state.write().await = NetworkState::ShuttingDown;
386
387 self.shutdown.cancel();
388
389 if let Some(handle) = self.health_monitor.take() {
390 handle.abort();
391 }
392
393 let mut shutdown_futures = Vec::with_capacity(self.nodes.len());
394 for node in self.nodes.iter_mut().rev() {
395 debug!("Stopping node {}", node.index);
396 if let Some(handle) = node.protocol_task.take() {
397 handle.abort();
398 }
399
400 let node_index = node.index;
401 let node_state = Arc::clone(&node.state);
402 let p2p_node = node.p2p_node.take();
403
404 shutdown_futures.push(async move {
405 if let Some(p2p) = p2p_node {
406 if let Err(e) = p2p.shutdown().await {
407 warn!("Error shutting down node {node_index}: {e}");
408 }
409 }
410 *node_state.write().await = NodeState::Stopped;
411 });
412 }
413 futures::future::join_all(shutdown_futures).await;
414
415 if self.config.cleanup_data_dir {
416 if let Err(e) = tokio::fs::remove_dir_all(&self.config.data_dir).await {
417 warn!("Failed to cleanup devnet data directory: {e}");
418 }
419 }
420
421 *self.state.write().await = NetworkState::Stopped;
422 info!("Devnet shutdown complete");
423 Ok(())
424 }
425
426 #[must_use]
428 pub fn config(&self) -> &DevnetConfig {
429 &self.config
430 }
431
432 #[must_use]
434 pub fn bootstrap_addrs(&self) -> Vec<MultiAddr> {
435 self.nodes
436 .iter()
437 .take(self.config.bootstrap_count)
438 .map(|n| MultiAddr::quic(SocketAddr::from((Ipv4Addr::LOCALHOST, n.port))))
439 .collect()
440 }
441
442 async fn start_bootstrap_nodes(&mut self) -> Result<()> {
443 info!("Starting {} bootstrap nodes", self.config.bootstrap_count);
444
445 for i in 0..self.config.bootstrap_count {
446 let node = self.create_node(i, true, vec![]).await?;
447 self.start_node(node).await?;
448 tokio::time::sleep(self.config.spawn_delay).await;
449 }
450
451 self.wait_for_nodes_ready(0..self.config.bootstrap_count)
452 .await?;
453
454 info!("All bootstrap nodes are ready");
455 Ok(())
456 }
457
458 async fn start_regular_nodes(&mut self) -> Result<()> {
459 let regular_count = self.config.node_count - self.config.bootstrap_count;
460 info!("Starting {} regular nodes", regular_count);
461
462 let bootstrap_addrs: Vec<MultiAddr> = self
463 .nodes
464 .get(0..self.config.bootstrap_count)
465 .ok_or_else(|| {
466 DevnetError::Config(format!(
467 "Bootstrap count {} exceeds nodes length {}",
468 self.config.bootstrap_count,
469 self.nodes.len()
470 ))
471 })?
472 .iter()
473 .map(|n| MultiAddr::quic(SocketAddr::from((Ipv4Addr::LOCALHOST, n.port))))
474 .collect();
475
476 for i in self.config.bootstrap_count..self.config.node_count {
477 let node = self.create_node(i, false, bootstrap_addrs.clone()).await?;
478 self.start_node(node).await?;
479 tokio::time::sleep(self.config.spawn_delay).await;
480 }
481
482 info!("All regular nodes started");
483 Ok(())
484 }
485
486 async fn create_node(
487 &self,
488 index: usize,
489 is_bootstrap: bool,
490 bootstrap_addrs: Vec<MultiAddr>,
491 ) -> Result<DevnetNode> {
492 let index_u16 = u16::try_from(index)
493 .map_err(|_| DevnetError::Config(format!("Node index {index} exceeds u16::MAX")))?;
494 let port = self.config.base_port + index_u16;
495
496 let identity = NodeIdentity::generate()
498 .map_err(|e| DevnetError::Core(format!("Failed to generate node identity: {e}")))?;
499 let peer_id = *identity.peer_id();
500 let label = format!("devnet_node_{index}");
501 let data_dir = self
502 .config
503 .data_dir
504 .join(NODES_SUBDIR)
505 .join(peer_id.to_hex());
506
507 tokio::fs::create_dir_all(&data_dir).await?;
508
509 identity
510 .save_to_file(&data_dir.join(NODE_IDENTITY_FILENAME))
511 .await
512 .map_err(|e| DevnetError::Core(format!("Failed to save node identity: {e}")))?;
513
514 let ant_protocol = Self::create_ant_protocol(&data_dir, &identity, &self.config).await?;
515
516 Ok(DevnetNode {
517 index,
518 label,
519 peer_id,
520 port,
521 data_dir,
522 p2p_node: None,
523 ant_protocol: Some(Arc::new(ant_protocol)),
524 is_bootstrap,
525 state: Arc::new(RwLock::new(NodeState::Pending)),
526 bootstrap_addrs,
527 protocol_task: None,
528 })
529 }
530
531 async fn create_ant_protocol(
532 data_dir: &std::path::Path,
533 identity: &NodeIdentity,
534 config: &DevnetConfig,
535 ) -> Result<AntProtocol> {
536 let storage_config = LmdbStorageConfig {
537 root_dir: data_dir.to_path_buf(),
538 verify_on_read: true,
539 ..LmdbStorageConfig::default()
540 };
541 let storage = LmdbStorage::new(storage_config)
542 .await
543 .map_err(|e| DevnetError::Core(format!("Failed to create LMDB storage: {e}")))?;
544
545 let evm_config = EvmVerifierConfig {
546 network: config
547 .evm_network
548 .clone()
549 .unwrap_or(EvmNetwork::ArbitrumOne),
550 };
551
552 let rewards_address = RewardsAddress::new(DEVNET_REWARDS_ADDRESS);
553 let payment_config = PaymentVerifierConfig {
554 evm: evm_config,
555 cache_capacity: DEVNET_PAYMENT_CACHE_CAPACITY,
556 local_rewards_address: rewards_address,
557 };
558 let payment_verifier = PaymentVerifier::new(payment_config);
559 let metrics_tracker = QuotingMetricsTracker::new(DEVNET_INITIAL_RECORDS);
560 let mut quote_generator = QuoteGenerator::new(rewards_address, metrics_tracker);
561
562 crate::payment::wire_ml_dsa_signer(&mut quote_generator, identity)
564 .map_err(|e| DevnetError::Startup(format!("Failed to wire ML-DSA-65 signer: {e}")))?;
565
566 Ok(AntProtocol::new(
567 Arc::new(storage),
568 Arc::new(payment_verifier),
569 Arc::new(quote_generator),
570 ))
571 }
572
573 async fn start_node(&mut self, mut node: DevnetNode) -> Result<()> {
574 debug!("Starting node {} on port {}", node.index, node.port);
575 *node.state.write().await = NodeState::Starting;
576
577 let mut core_config = CoreNodeConfig::builder()
578 .port(node.port)
579 .local(true)
580 .max_message_size(crate::ant_protocol::MAX_WIRE_MESSAGE_SIZE)
581 .build()
582 .map_err(|e| DevnetError::Core(format!("Failed to create core config: {e}")))?;
583
584 let identity = NodeIdentity::load_from_file(
586 &node.data_dir.join(crate::config::NODE_IDENTITY_FILENAME),
587 )
588 .await
589 .map_err(|e| DevnetError::Core(format!("Failed to load node identity: {e}")))?;
590
591 core_config.node_identity = Some(Arc::new(identity));
592 core_config
593 .bootstrap_peers
594 .clone_from(&node.bootstrap_addrs);
595 core_config.diversity_config = Some(IPDiversityConfig::permissive());
596
597 let index = node.index;
598 let p2p_node = P2PNode::new(core_config)
599 .await
600 .map_err(|e| DevnetError::Startup(format!("Failed to create node {index}: {e}")))?;
601
602 p2p_node
603 .start()
604 .await
605 .map_err(|e| DevnetError::Startup(format!("Failed to start node {index}: {e}")))?;
606
607 node.p2p_node = Some(Arc::new(p2p_node));
608 *node.state.write().await = NodeState::Running;
609
610 if let (Some(ref p2p), Some(ref protocol)) = (&node.p2p_node, &node.ant_protocol) {
611 protocol
613 .payment_verifier_arc()
614 .attach_p2p_node(Arc::clone(p2p));
615
616 let mut events = p2p.subscribe_events();
617 let p2p_clone = Arc::clone(p2p);
618 let protocol_clone = Arc::clone(protocol);
619 let node_index = node.index;
620 node.protocol_task = Some(tokio::spawn(async move {
621 while let Ok(event) = events.recv().await {
622 if let P2PEvent::Message {
623 topic,
624 source: Some(source),
625 data,
626 } = event
627 {
628 if topic == CHUNK_PROTOCOL_ID {
629 debug!(
630 "Node {node_index} received chunk protocol message from {source}"
631 );
632 let protocol = Arc::clone(&protocol_clone);
633 let p2p = Arc::clone(&p2p_clone);
634 tokio::spawn(async move {
635 match protocol.try_handle_request(&data).await {
636 Ok(Some(response)) => {
637 if let Err(e) = p2p
638 .send_message(
639 &source,
640 CHUNK_PROTOCOL_ID,
641 response.to_vec(),
642 &[],
643 )
644 .await
645 {
646 warn!(
647 "Node {node_index} failed to send response to {source}: {e}"
648 );
649 }
650 }
651 Ok(None) => {}
652 Err(e) => {
653 warn!("Node {node_index} protocol handler error: {e}");
654 }
655 }
656 });
657 }
658 }
659 }
660 }));
661 }
662
663 debug!("Node {} started successfully", node.index);
664 self.nodes.push(node);
665 Ok(())
666 }
667
668 async fn wait_for_nodes_ready(&self, range: std::ops::Range<usize>) -> Result<()> {
669 let deadline = Instant::now() + self.config.node_startup_timeout;
670
671 for i in range {
672 while Instant::now() < deadline {
673 let node = self.nodes.get(i).ok_or_else(|| {
674 DevnetError::Config(format!(
675 "Node index {i} out of bounds (len: {})",
676 self.nodes.len()
677 ))
678 })?;
679 let state = node.state.read().await.clone();
680 match state {
681 NodeState::Running | NodeState::Connected => break,
682 NodeState::Failed(ref e) => {
683 return Err(DevnetError::Startup(format!("Node {i} failed: {e}")));
684 }
685 _ => {
686 tokio::time::sleep(Duration::from_millis(NODE_READY_POLL_INTERVAL_MS))
687 .await;
688 }
689 }
690 }
691 }
692 Ok(())
693 }
694
695 async fn wait_for_stabilization(&self) -> Result<()> {
696 let deadline = Instant::now() + self.config.stabilization_timeout;
697 let min_connections = self
698 .config
699 .bootstrap_count
700 .min(STABILIZATION_MIN_CONNECTIONS_CAP);
701
702 info!(
703 "Waiting for devnet stabilization (min {} connections per node)",
704 min_connections
705 );
706
707 while Instant::now() < deadline {
708 let mut all_connected = true;
709 let mut total_connections = 0;
710
711 for node in &self.nodes {
712 let peer_count = node.peer_count().await;
713 total_connections += peer_count;
714
715 if peer_count < min_connections {
716 all_connected = false;
717 }
718 }
719
720 if all_connected {
721 info!("Devnet stabilized: {} total connections", total_connections);
722 return Ok(());
723 }
724
725 debug!(
726 "Waiting for stabilization: {} total connections",
727 total_connections
728 );
729 tokio::time::sleep(Duration::from_secs(STABILIZATION_POLL_INTERVAL_SECS)).await;
730 }
731
732 Err(DevnetError::Stabilization(
733 "Devnet failed to stabilize within timeout".to_string(),
734 ))
735 }
736
737 fn start_health_monitor(&mut self) {
738 let nodes: Vec<Arc<P2PNode>> = self
739 .nodes
740 .iter()
741 .filter_map(|n| n.p2p_node.clone())
742 .collect();
743 let shutdown = self.shutdown.clone();
744
745 self.health_monitor = Some(tokio::spawn(async move {
746 let check_interval = Duration::from_secs(HEALTH_CHECK_INTERVAL_SECS);
747
748 loop {
749 tokio::select! {
750 () = shutdown.cancelled() => break,
751 () = tokio::time::sleep(check_interval) => {
752 for (i, node) in nodes.iter().enumerate() {
753 if !node.is_running() {
754 warn!("Node {} appears unhealthy", i);
755 }
756 }
757 }
758 }
759 }
760 }));
761 }
762}
763
764impl Drop for Devnet {
765 fn drop(&mut self) {
766 self.shutdown.cancel();
767 if let Some(handle) = self.health_monitor.take() {
768 handle.abort();
769 }
770 }
771}