1use crate::ant_protocol::CHUNK_PROTOCOL_ID;
7use crate::config::{default_root_dir, NODES_SUBDIR, NODE_IDENTITY_FILENAME};
8use crate::logging::{debug, info, warn};
9use crate::payment::{
10 EvmVerifierConfig, PaymentVerifier, PaymentVerifierConfig, QuoteGenerator,
11 QuotingMetricsTracker,
12};
13use crate::storage::{AntProtocol, LmdbStorage, LmdbStorageConfig};
14use evmlib::Network as EvmNetwork;
15use evmlib::RewardsAddress;
16use rand::Rng;
17use saorsa_core::identity::NodeIdentity;
18use saorsa_core::{
19 IPDiversityConfig, MultiAddr, NodeConfig as CoreNodeConfig, P2PEvent, P2PNode, PeerId,
20};
21use std::net::{Ipv4Addr, SocketAddr};
22use std::path::PathBuf;
23use std::sync::Arc;
24use std::time::Duration;
25use tokio::sync::RwLock;
26use tokio::task::JoinHandle;
27use tokio::time::Instant;
28use tokio_util::sync::CancellationToken;
29
// ---------------------------------------------------------------------------
// Port range
// ---------------------------------------------------------------------------

/// Lowest base port that may be picked at random for a devnet.
pub const DEVNET_PORT_RANGE_MIN: u16 = 20_000;

/// Upper bound of the devnet port range; `base_port + node_count` is validated against it.
pub const DEVNET_PORT_RANGE_MAX: u16 = 60_000;

// ---------------------------------------------------------------------------
// Network size presets
// ---------------------------------------------------------------------------

/// Number of nodes in the default devnet configuration.
pub const DEFAULT_NODE_COUNT: usize = 25;

/// Number of bootstrap nodes in the default devnet configuration.
pub const DEFAULT_BOOTSTRAP_COUNT: usize = 3;

/// Number of nodes in the `minimal` preset.
pub const MINIMAL_NODE_COUNT: usize = 5;

/// Number of bootstrap nodes in the `minimal` and `small` presets.
pub const MINIMAL_BOOTSTRAP_COUNT: usize = 2;

/// Number of nodes in the `small` preset.
pub const SMALL_NODE_COUNT: usize = 10;

// ---------------------------------------------------------------------------
// Timing
// ---------------------------------------------------------------------------

/// Default pause between spawning consecutive nodes, in milliseconds.
const DEFAULT_SPAWN_DELAY_MS: u64 = 200;

/// Default time allowed for the whole network to stabilize, in seconds.
const DEFAULT_STABILIZATION_TIMEOUT_SECS: u64 = 120;

/// Default time allowed for a node to report itself ready, in seconds.
const DEFAULT_NODE_STARTUP_TIMEOUT_SECS: u64 = 30;

/// Stabilization timeout used by the `minimal` preset, in seconds.
const MINIMAL_STABILIZATION_TIMEOUT_SECS: u64 = 30;

/// Stabilization timeout used by the `small` preset, in seconds.
const SMALL_STABILIZATION_TIMEOUT_SECS: u64 = 60;

/// How often node readiness is polled, in milliseconds.
const NODE_READY_POLL_INTERVAL_MS: u64 = 100;

/// How often peer counts are polled while stabilizing, in seconds.
const STABILIZATION_POLL_INTERVAL_SECS: u64 = 1;

/// Upper cap on the per-node peer count required for the network to count as stable.
const STABILIZATION_MIN_CONNECTIONS_CAP: usize = 3;

/// Interval between background health checks, in seconds.
const HEALTH_CHECK_INTERVAL_SECS: u64 = 5;

// ---------------------------------------------------------------------------
// Payment / quoting
// ---------------------------------------------------------------------------

/// Capacity passed to each node's payment verifier cache.
const DEVNET_PAYMENT_CACHE_CAPACITY: usize = 1000;

/// Fixed rewards address shared by every devnet node (twenty `0x01` bytes).
const DEVNET_REWARDS_ADDRESS: [u8; 20] = [0x01_u8; 20];

/// Value passed to `QuotingMetricsTracker::new` for each devnet node
/// (presumably an initial record count — confirm against the tracker's docs).
const DEVNET_INITIAL_RECORDS: usize = 1000;
102
103#[derive(Debug, thiserror::Error)]
105pub enum DevnetError {
106 #[error("Configuration error: {0}")]
108 Config(String),
109
110 #[error("Node startup error: {0}")]
112 Startup(String),
113
114 #[error("Network stabilization error: {0}")]
116 Stabilization(String),
117
118 #[error("IO error: {0}")]
120 Io(#[from] std::io::Error),
121
122 #[error("Core error: {0}")]
124 Core(String),
125}
126
127pub type Result<T> = std::result::Result<T, DevnetError>;
129
130#[derive(Debug, Clone)]
135pub struct DevnetConfig {
136 pub node_count: usize,
138
139 pub base_port: u16,
141
142 pub bootstrap_count: usize,
144
145 pub data_dir: PathBuf,
147
148 pub spawn_delay: Duration,
150
151 pub stabilization_timeout: Duration,
153
154 pub node_startup_timeout: Duration,
156
157 pub enable_node_logging: bool,
159
160 pub cleanup_data_dir: bool,
162
163 pub evm_network: Option<EvmNetwork>,
167}
168
169impl Default for DevnetConfig {
170 fn default() -> Self {
171 let mut rng = rand::thread_rng();
172
173 #[allow(clippy::cast_possible_truncation)] let max_base_port = DEVNET_PORT_RANGE_MAX.saturating_sub(DEFAULT_NODE_COUNT as u16);
175 let base_port = rng.gen_range(DEVNET_PORT_RANGE_MIN..max_base_port);
176
177 Self {
178 node_count: DEFAULT_NODE_COUNT,
179 base_port,
180 bootstrap_count: DEFAULT_BOOTSTRAP_COUNT,
181 data_dir: default_root_dir(),
182 spawn_delay: Duration::from_millis(DEFAULT_SPAWN_DELAY_MS),
183 stabilization_timeout: Duration::from_secs(DEFAULT_STABILIZATION_TIMEOUT_SECS),
184 node_startup_timeout: Duration::from_secs(DEFAULT_NODE_STARTUP_TIMEOUT_SECS),
185 enable_node_logging: false,
186 cleanup_data_dir: true,
187 evm_network: None,
188 }
189 }
190}
191
192impl DevnetConfig {
193 #[must_use]
195 pub fn minimal() -> Self {
196 Self {
197 node_count: MINIMAL_NODE_COUNT,
198 bootstrap_count: MINIMAL_BOOTSTRAP_COUNT,
199 stabilization_timeout: Duration::from_secs(MINIMAL_STABILIZATION_TIMEOUT_SECS),
200 ..Self::default()
201 }
202 }
203
204 #[must_use]
206 pub fn small() -> Self {
207 Self {
208 node_count: SMALL_NODE_COUNT,
209 bootstrap_count: MINIMAL_BOOTSTRAP_COUNT,
210 stabilization_timeout: Duration::from_secs(SMALL_STABILIZATION_TIMEOUT_SECS),
211 ..Self::default()
212 }
213 }
214}
215
216pub use ant_protocol::devnet_manifest::{DevnetEvmInfo, DevnetManifest};
220
/// Lifecycle phase of a [`Devnet`] as a whole.
#[derive(Clone, Debug)]
pub enum NetworkState {
    /// Created but `start` has not been called.
    Uninitialized,
    /// Bootstrap nodes are being started.
    BootstrappingPhase,
    /// Regular (non-bootstrap) nodes are being started.
    NodeSpawningPhase,
    /// Waiting for every node to reach the minimum peer count.
    Stabilizing,
    /// Startup completed and the health monitor is running.
    Ready,
    /// `shutdown` is in progress.
    ShuttingDown,
    /// `shutdown` has completed.
    Stopped,
}
239
/// Lifecycle state of a single devnet node.
#[derive(Clone, Debug)]
pub enum NodeState {
    /// Created but not yet started.
    Pending,
    /// The P2P node is being configured and started.
    Starting,
    /// The P2P node has started.
    Running,
    /// Treated as "ready" by the readiness check
    /// (NOTE(review): not assigned in this module — confirm who sets it).
    Connected,
    /// The node has been shut down.
    Stopped,
    /// The node failed, with a human-readable reason
    /// (NOTE(review): not assigned in this module — confirm who sets it).
    Failed(String),
}
256
257#[allow(dead_code)]
259pub struct DevnetNode {
260 index: usize,
261 label: String,
262 peer_id: PeerId,
263 port: u16,
264 data_dir: PathBuf,
265 p2p_node: Option<Arc<P2PNode>>,
266 ant_protocol: Option<Arc<AntProtocol>>,
267 is_bootstrap: bool,
268 state: Arc<RwLock<NodeState>>,
269 bootstrap_addrs: Vec<MultiAddr>,
270 protocol_task: Option<JoinHandle<()>>,
271}
272
273impl DevnetNode {
274 pub async fn peer_count(&self) -> usize {
276 if let Some(ref node) = self.p2p_node {
277 node.peer_count().await
278 } else {
279 0
280 }
281 }
282}
283
284pub struct Devnet {
286 config: DevnetConfig,
287 nodes: Vec<DevnetNode>,
288 shutdown: CancellationToken,
289 state: Arc<RwLock<NetworkState>>,
290 health_monitor: Option<JoinHandle<()>>,
291}
292
293impl Devnet {
294 pub async fn new(mut config: DevnetConfig) -> Result<Self> {
302 if config.bootstrap_count >= config.node_count {
303 return Err(DevnetError::Config(
304 "Bootstrap count must be less than node count".to_string(),
305 ));
306 }
307
308 if config.bootstrap_count == 0 {
309 return Err(DevnetError::Config(
310 "At least one bootstrap node is required".to_string(),
311 ));
312 }
313
314 let node_count = config.node_count;
315 let node_count_u16 = u16::try_from(node_count).map_err(|_| {
316 DevnetError::Config(format!("Node count {node_count} exceeds u16::MAX"))
317 })?;
318
319 if config.base_port == 0 {
320 let mut rng = rand::thread_rng();
321 let max_base_port = DEVNET_PORT_RANGE_MAX.saturating_sub(node_count_u16);
322 config.base_port = rng.gen_range(DEVNET_PORT_RANGE_MIN..max_base_port);
323 }
324
325 let base_port = config.base_port;
326 let max_port = base_port
327 .checked_add(node_count_u16)
328 .ok_or_else(|| {
329 DevnetError::Config(format!(
330 "Port range overflow: base_port {base_port} + node_count {node_count} exceeds u16::MAX"
331 ))
332 })?;
333 if max_port > DEVNET_PORT_RANGE_MAX {
334 return Err(DevnetError::Config(format!(
335 "Port range overflow: max port {max_port} exceeds DEVNET_PORT_RANGE_MAX {DEVNET_PORT_RANGE_MAX}"
336 )));
337 }
338
339 tokio::fs::create_dir_all(&config.data_dir).await?;
340
341 Ok(Self {
342 config,
343 nodes: Vec::new(),
344 shutdown: CancellationToken::new(),
345 state: Arc::new(RwLock::new(NetworkState::Uninitialized)),
346 health_monitor: None,
347 })
348 }
349
350 pub async fn start(&mut self) -> Result<()> {
357 info!(
358 "Starting devnet with {} nodes ({} bootstrap)",
359 self.config.node_count, self.config.bootstrap_count
360 );
361
362 *self.state.write().await = NetworkState::BootstrappingPhase;
363 self.start_bootstrap_nodes().await?;
364
365 *self.state.write().await = NetworkState::NodeSpawningPhase;
366 self.start_regular_nodes().await?;
367
368 *self.state.write().await = NetworkState::Stabilizing;
369 self.wait_for_stabilization().await?;
370
371 self.start_health_monitor();
372
373 *self.state.write().await = NetworkState::Ready;
374 info!("Devnet is ready");
375 Ok(())
376 }
377
378 pub async fn shutdown(&mut self) -> Result<()> {
384 info!("Shutting down devnet");
385 *self.state.write().await = NetworkState::ShuttingDown;
386
387 self.shutdown.cancel();
388
389 if let Some(handle) = self.health_monitor.take() {
390 handle.abort();
391 }
392
393 let mut shutdown_futures = Vec::with_capacity(self.nodes.len());
394 for node in self.nodes.iter_mut().rev() {
395 debug!("Stopping node {}", node.index);
396 if let Some(handle) = node.protocol_task.take() {
397 handle.abort();
398 }
399
400 let node_index = node.index;
401 let node_state = Arc::clone(&node.state);
402 let p2p_node = node.p2p_node.take();
403
404 shutdown_futures.push(async move {
405 if let Some(p2p) = p2p_node {
406 if let Err(e) = p2p.shutdown().await {
407 warn!("Error shutting down node {node_index}: {e}");
408 }
409 }
410 *node_state.write().await = NodeState::Stopped;
411 });
412 }
413 futures::future::join_all(shutdown_futures).await;
414
415 if self.config.cleanup_data_dir {
416 if let Err(e) = tokio::fs::remove_dir_all(&self.config.data_dir).await {
417 warn!("Failed to cleanup devnet data directory: {e}");
418 }
419 }
420
421 *self.state.write().await = NetworkState::Stopped;
422 info!("Devnet shutdown complete");
423 Ok(())
424 }
425
426 #[must_use]
428 pub fn config(&self) -> &DevnetConfig {
429 &self.config
430 }
431
432 #[must_use]
434 pub fn bootstrap_addrs(&self) -> Vec<MultiAddr> {
435 self.nodes
436 .iter()
437 .take(self.config.bootstrap_count)
438 .map(|n| MultiAddr::quic(SocketAddr::from((Ipv4Addr::LOCALHOST, n.port))))
439 .collect()
440 }
441
442 async fn start_bootstrap_nodes(&mut self) -> Result<()> {
443 info!("Starting {} bootstrap nodes", self.config.bootstrap_count);
444
445 for i in 0..self.config.bootstrap_count {
446 let node = self.create_node(i, true, vec![]).await?;
447 self.start_node(node).await?;
448 tokio::time::sleep(self.config.spawn_delay).await;
449 }
450
451 self.wait_for_nodes_ready(0..self.config.bootstrap_count)
452 .await?;
453
454 info!("All bootstrap nodes are ready");
455 Ok(())
456 }
457
458 async fn start_regular_nodes(&mut self) -> Result<()> {
459 let regular_count = self.config.node_count - self.config.bootstrap_count;
460 info!("Starting {} regular nodes", regular_count);
461
462 let bootstrap_addrs: Vec<MultiAddr> = self
463 .nodes
464 .get(0..self.config.bootstrap_count)
465 .ok_or_else(|| {
466 DevnetError::Config(format!(
467 "Bootstrap count {} exceeds nodes length {}",
468 self.config.bootstrap_count,
469 self.nodes.len()
470 ))
471 })?
472 .iter()
473 .map(|n| MultiAddr::quic(SocketAddr::from((Ipv4Addr::LOCALHOST, n.port))))
474 .collect();
475
476 for i in self.config.bootstrap_count..self.config.node_count {
477 let node = self.create_node(i, false, bootstrap_addrs.clone()).await?;
478 self.start_node(node).await?;
479 tokio::time::sleep(self.config.spawn_delay).await;
480 }
481
482 info!("All regular nodes started");
483 Ok(())
484 }
485
486 async fn create_node(
487 &self,
488 index: usize,
489 is_bootstrap: bool,
490 bootstrap_addrs: Vec<MultiAddr>,
491 ) -> Result<DevnetNode> {
492 let index_u16 = u16::try_from(index)
493 .map_err(|_| DevnetError::Config(format!("Node index {index} exceeds u16::MAX")))?;
494 let port = self.config.base_port + index_u16;
495
496 let identity = NodeIdentity::generate()
498 .map_err(|e| DevnetError::Core(format!("Failed to generate node identity: {e}")))?;
499 let peer_id = *identity.peer_id();
500 let label = format!("devnet_node_{index}");
501 let data_dir = self
502 .config
503 .data_dir
504 .join(NODES_SUBDIR)
505 .join(peer_id.to_hex());
506
507 tokio::fs::create_dir_all(&data_dir).await?;
508
509 identity
510 .save_to_file(&data_dir.join(NODE_IDENTITY_FILENAME))
511 .await
512 .map_err(|e| DevnetError::Core(format!("Failed to save node identity: {e}")))?;
513
514 let ant_protocol = Self::create_ant_protocol(&data_dir, &identity, &self.config).await?;
515
516 Ok(DevnetNode {
517 index,
518 label,
519 peer_id,
520 port,
521 data_dir,
522 p2p_node: None,
523 ant_protocol: Some(Arc::new(ant_protocol)),
524 is_bootstrap,
525 state: Arc::new(RwLock::new(NodeState::Pending)),
526 bootstrap_addrs,
527 protocol_task: None,
528 })
529 }
530
531 async fn create_ant_protocol(
532 data_dir: &std::path::Path,
533 identity: &NodeIdentity,
534 config: &DevnetConfig,
535 ) -> Result<AntProtocol> {
536 let storage_config = LmdbStorageConfig {
537 root_dir: data_dir.to_path_buf(),
538 verify_on_read: true,
539 ..LmdbStorageConfig::default()
540 };
541 let storage = LmdbStorage::new(storage_config)
542 .await
543 .map_err(|e| DevnetError::Core(format!("Failed to create LMDB storage: {e}")))?;
544
545 let evm_config = EvmVerifierConfig {
546 network: config
547 .evm_network
548 .clone()
549 .unwrap_or(EvmNetwork::ArbitrumOne),
550 };
551
552 let rewards_address = RewardsAddress::new(DEVNET_REWARDS_ADDRESS);
553 let payment_config = PaymentVerifierConfig {
554 evm: evm_config,
555 cache_capacity: DEVNET_PAYMENT_CACHE_CAPACITY,
556 local_rewards_address: rewards_address,
557 };
558 let payment_verifier = PaymentVerifier::new(payment_config);
559 let metrics_tracker = QuotingMetricsTracker::new(DEVNET_INITIAL_RECORDS);
560 let mut quote_generator = QuoteGenerator::new(rewards_address, metrics_tracker);
561
562 crate::payment::wire_ml_dsa_signer(&mut quote_generator, identity)
564 .map_err(|e| DevnetError::Startup(format!("Failed to wire ML-DSA-65 signer: {e}")))?;
565
566 let storage = Arc::new(storage);
567 let payment_verifier = Arc::new(payment_verifier);
568
569 Ok(AntProtocol::new(
570 storage,
571 payment_verifier,
572 Arc::new(quote_generator),
573 ))
574 }
575
576 async fn start_node(&mut self, mut node: DevnetNode) -> Result<()> {
577 debug!("Starting node {} on port {}", node.index, node.port);
578 *node.state.write().await = NodeState::Starting;
579
580 let mut core_config = CoreNodeConfig::builder()
581 .port(node.port)
582 .local(true)
583 .max_message_size(crate::ant_protocol::MAX_WIRE_MESSAGE_SIZE)
584 .build()
585 .map_err(|e| DevnetError::Core(format!("Failed to create core config: {e}")))?;
586
587 let identity = NodeIdentity::load_from_file(
589 &node.data_dir.join(crate::config::NODE_IDENTITY_FILENAME),
590 )
591 .await
592 .map_err(|e| DevnetError::Core(format!("Failed to load node identity: {e}")))?;
593
594 core_config.node_identity = Some(Arc::new(identity));
595 core_config
596 .bootstrap_peers
597 .clone_from(&node.bootstrap_addrs);
598 core_config.diversity_config = Some(IPDiversityConfig::permissive());
599
600 let index = node.index;
601 let p2p_node = P2PNode::new(core_config)
602 .await
603 .map_err(|e| DevnetError::Startup(format!("Failed to create node {index}: {e}")))?;
604
605 p2p_node
606 .start()
607 .await
608 .map_err(|e| DevnetError::Startup(format!("Failed to start node {index}: {e}")))?;
609
610 node.p2p_node = Some(Arc::new(p2p_node));
611 *node.state.write().await = NodeState::Running;
612
613 if let (Some(ref p2p), Some(ref protocol)) = (&node.p2p_node, &node.ant_protocol) {
614 protocol
616 .payment_verifier_arc()
617 .attach_p2p_node(Arc::clone(p2p));
618
619 let mut events = p2p.subscribe_events();
620 let p2p_clone = Arc::clone(p2p);
621 let protocol_clone = Arc::clone(protocol);
622 let node_index = node.index;
623 node.protocol_task = Some(tokio::spawn(async move {
624 while let Ok(event) = events.recv().await {
625 if let P2PEvent::Message {
626 topic,
627 source: Some(source),
628 data,
629 ..
630 } = event
631 {
632 if topic == CHUNK_PROTOCOL_ID {
633 debug!(
634 "Node {node_index} received chunk protocol message from {source}"
635 );
636 let protocol = Arc::clone(&protocol_clone);
637 let p2p = Arc::clone(&p2p_clone);
638 tokio::spawn(async move {
639 match protocol.try_handle_request(&data).await {
640 Ok(Some(response)) => {
641 if let Err(e) = p2p
642 .send_message(
643 &source,
644 CHUNK_PROTOCOL_ID,
645 response.to_vec(),
646 &[],
647 )
648 .await
649 {
650 warn!(
651 "Node {node_index} failed to send response to {source}: {e}"
652 );
653 }
654 }
655 Ok(None) => {}
656 Err(e) => {
657 warn!("Node {node_index} protocol handler error: {e}");
658 }
659 }
660 });
661 }
662 }
663 }
664 }));
665 }
666
667 debug!("Node {} started successfully", node.index);
668 self.nodes.push(node);
669 Ok(())
670 }
671
672 async fn wait_for_nodes_ready(&self, range: std::ops::Range<usize>) -> Result<()> {
673 let deadline = Instant::now() + self.config.node_startup_timeout;
674
675 for i in range {
676 while Instant::now() < deadline {
677 let node = self.nodes.get(i).ok_or_else(|| {
678 DevnetError::Config(format!(
679 "Node index {i} out of bounds (len: {})",
680 self.nodes.len()
681 ))
682 })?;
683 let state = node.state.read().await.clone();
684 match state {
685 NodeState::Running | NodeState::Connected => break,
686 NodeState::Failed(ref e) => {
687 return Err(DevnetError::Startup(format!("Node {i} failed: {e}")));
688 }
689 _ => {
690 tokio::time::sleep(Duration::from_millis(NODE_READY_POLL_INTERVAL_MS))
691 .await;
692 }
693 }
694 }
695 }
696 Ok(())
697 }
698
699 async fn wait_for_stabilization(&self) -> Result<()> {
700 let deadline = Instant::now() + self.config.stabilization_timeout;
701 let min_connections = self
702 .config
703 .bootstrap_count
704 .min(STABILIZATION_MIN_CONNECTIONS_CAP);
705
706 info!(
707 "Waiting for devnet stabilization (min {} connections per node)",
708 min_connections
709 );
710
711 while Instant::now() < deadline {
712 let mut all_connected = true;
713 let mut total_connections = 0;
714
715 for node in &self.nodes {
716 let peer_count = node.peer_count().await;
717 total_connections += peer_count;
718
719 if peer_count < min_connections {
720 all_connected = false;
721 }
722 }
723
724 if all_connected {
725 info!("Devnet stabilized: {} total connections", total_connections);
726 return Ok(());
727 }
728
729 debug!(
730 "Waiting for stabilization: {} total connections",
731 total_connections
732 );
733 tokio::time::sleep(Duration::from_secs(STABILIZATION_POLL_INTERVAL_SECS)).await;
734 }
735
736 Err(DevnetError::Stabilization(
737 "Devnet failed to stabilize within timeout".to_string(),
738 ))
739 }
740
741 fn start_health_monitor(&mut self) {
742 let nodes: Vec<Arc<P2PNode>> = self
743 .nodes
744 .iter()
745 .filter_map(|n| n.p2p_node.clone())
746 .collect();
747 let shutdown = self.shutdown.clone();
748
749 self.health_monitor = Some(tokio::spawn(async move {
750 let check_interval = Duration::from_secs(HEALTH_CHECK_INTERVAL_SECS);
751
752 loop {
753 tokio::select! {
754 () = shutdown.cancelled() => break,
755 () = tokio::time::sleep(check_interval) => {
756 for (i, node) in nodes.iter().enumerate() {
757 if !node.is_running() {
758 warn!("Node {} appears unhealthy", i);
759 }
760 }
761 }
762 }
763 }
764 }));
765 }
766}
767
768impl Drop for Devnet {
769 fn drop(&mut self) {
770 self.shutdown.cancel();
771 if let Some(handle) = self.health_monitor.take() {
772 handle.abort();
773 }
774 }
775}