1use crate::ant_protocol::CHUNK_PROTOCOL_ID;
7use crate::config::{default_root_dir, NODES_SUBDIR, NODE_IDENTITY_FILENAME};
8use crate::payment::{
9 EvmVerifierConfig, PaymentVerifier, PaymentVerifierConfig, QuoteGenerator,
10 QuotingMetricsTracker,
11};
12use crate::storage::{AntProtocol, LmdbStorage, LmdbStorageConfig};
13use ant_evm::RewardsAddress;
14use evmlib::Network as EvmNetwork;
15use rand::Rng;
16use saorsa_core::identity::NodeIdentity;
17use saorsa_core::{
18 IPDiversityConfig, MultiAddr, NodeConfig as CoreNodeConfig, P2PEvent, P2PNode, PeerId,
19};
20use serde::{Deserialize, Serialize};
21use std::net::{Ipv4Addr, SocketAddr};
22use std::path::PathBuf;
23use std::sync::Arc;
24use std::time::Duration;
25use tokio::sync::RwLock;
26use tokio::task::JoinHandle;
27use tokio::time::Instant;
28use tokio_util::sync::CancellationToken;
29use tracing::{debug, info, warn};
30
/// Lower bound (inclusive) of the range from which a random devnet base port is drawn.
pub const DEVNET_PORT_RANGE_MIN: u16 = 20_000;

/// Upper bound of the devnet port range; `base_port + node_count` may not exceed it.
pub const DEVNET_PORT_RANGE_MAX: u16 = 60_000;

/// Default pause, in milliseconds, between starting consecutive nodes.
const DEFAULT_SPAWN_DELAY_MS: u64 = 200;

/// Default time, in seconds, allowed for the network to stabilize.
const DEFAULT_STABILIZATION_TIMEOUT_SECS: u64 = 120;

/// Default time, in seconds, allowed for nodes to report a ready state.
const DEFAULT_NODE_STARTUP_TIMEOUT_SECS: u64 = 30;

/// Stabilization timeout, in seconds, used by the `minimal` preset.
const MINIMAL_STABILIZATION_TIMEOUT_SECS: u64 = 30;

/// Stabilization timeout, in seconds, used by the `small` preset.
const SMALL_STABILIZATION_TIMEOUT_SECS: u64 = 60;

/// Interval, in milliseconds, between node state polls while waiting for readiness.
const NODE_READY_POLL_INTERVAL_MS: u64 = 100;

/// Interval, in seconds, between peer-count polls while waiting for stabilization.
const STABILIZATION_POLL_INTERVAL_SECS: u64 = 1;

/// Cap on the per-node peer count required for stabilization
/// (the requirement is `min(bootstrap_count, this cap)`).
const STABILIZATION_MIN_CONNECTIONS_CAP: usize = 3;

/// Interval, in seconds, between health-monitor sweeps over the running nodes.
const HEALTH_CHECK_INTERVAL_SECS: u64 = 5;

/// Value passed as `cache_capacity` to each node's payment verifier configuration.
const DEVNET_PAYMENT_CACHE_CAPACITY: usize = 1000;

/// Fixed 20-byte rewards address (every byte `0x01`) shared by all devnet nodes.
const DEVNET_REWARDS_ADDRESS: [u8; 20] = [0x01; 20];

/// First argument to `QuotingMetricsTracker::new` for devnet nodes
/// (presumably the maximum record count — confirm against the tracker's API).
const DEVNET_MAX_RECORDS: usize = 100_000;

/// Second argument to `QuotingMetricsTracker::new` for devnet nodes
/// (presumably the initial record count — confirm against the tracker's API).
const DEVNET_INITIAL_RECORDS: usize = 1000;

/// Number of nodes in the default devnet configuration.
pub const DEFAULT_NODE_COUNT: usize = 25;

/// Number of bootstrap nodes in the default devnet configuration.
pub const DEFAULT_BOOTSTRAP_COUNT: usize = 3;

/// Number of nodes in the `minimal` preset.
pub const MINIMAL_NODE_COUNT: usize = 5;

/// Number of bootstrap nodes used by both the `minimal` and `small` presets.
pub const MINIMAL_BOOTSTRAP_COUNT: usize = 2;

/// Number of nodes in the `small` preset.
pub const SMALL_NODE_COUNT: usize = 10;
106
/// Errors produced while configuring, starting, or stabilizing a devnet.
#[derive(Debug, thiserror::Error)]
pub enum DevnetError {
    /// The supplied configuration is invalid (bad counts, port range, or index).
    #[error("Configuration error: {0}")]
    Config(String),

    /// A node could not be created or started, or reported a failed state.
    #[error("Node startup error: {0}")]
    Startup(String),

    /// The network did not reach the required connectivity in time.
    #[error("Network stabilization error: {0}")]
    Stabilization(String),

    /// A filesystem operation failed; converted automatically from `std::io::Error`.
    #[error("IO error: {0}")]
    Io(#[from] std::io::Error),

    /// A failure reported by the underlying node components
    /// (identity handling, core configuration, or storage creation).
    #[error("Core error: {0}")]
    Core(String),
}

/// Result alias whose error type is [`DevnetError`].
pub type Result<T> = std::result::Result<T, DevnetError>;
133
/// Settings that describe a local devnet: its size, ports, storage, and timeouts.
#[derive(Debug, Clone)]
pub struct DevnetConfig {
    /// Total number of nodes, bootstrap nodes included.
    pub node_count: usize,

    /// Port of node 0; node `i` listens on `base_port + i`.
    /// A value of `0` asks `Devnet::new` to pick a random base port.
    pub base_port: u16,

    /// Number of bootstrap nodes; must be at least 1 and less than `node_count`.
    pub bootstrap_count: usize,

    /// Root directory under which per-node data directories are created.
    pub data_dir: PathBuf,

    /// Pause inserted after each node is started.
    pub spawn_delay: Duration,

    /// Maximum time to wait for the network to stabilize.
    pub stabilization_timeout: Duration,

    /// Maximum time to wait for nodes to report a ready state.
    pub node_startup_timeout: Duration,

    /// Whether per-node logging is enabled.
    /// NOTE(review): not read anywhere in the visible code — confirm where it is consumed.
    pub enable_node_logging: bool,

    /// When `true`, `data_dir` is removed recursively during shutdown.
    pub cleanup_data_dir: bool,

    /// EVM network used for payment verification; `None` falls back to Arbitrum One.
    pub evm_network: Option<EvmNetwork>,
}
172
173impl Default for DevnetConfig {
174 fn default() -> Self {
175 let mut rng = rand::thread_rng();
176
177 #[allow(clippy::cast_possible_truncation)] let max_base_port = DEVNET_PORT_RANGE_MAX.saturating_sub(DEFAULT_NODE_COUNT as u16);
179 let base_port = rng.gen_range(DEVNET_PORT_RANGE_MIN..max_base_port);
180
181 Self {
182 node_count: DEFAULT_NODE_COUNT,
183 base_port,
184 bootstrap_count: DEFAULT_BOOTSTRAP_COUNT,
185 data_dir: default_root_dir(),
186 spawn_delay: Duration::from_millis(DEFAULT_SPAWN_DELAY_MS),
187 stabilization_timeout: Duration::from_secs(DEFAULT_STABILIZATION_TIMEOUT_SECS),
188 node_startup_timeout: Duration::from_secs(DEFAULT_NODE_STARTUP_TIMEOUT_SECS),
189 enable_node_logging: false,
190 cleanup_data_dir: true,
191 evm_network: None,
192 }
193 }
194}
195
196impl DevnetConfig {
197 #[must_use]
199 pub fn minimal() -> Self {
200 Self {
201 node_count: MINIMAL_NODE_COUNT,
202 bootstrap_count: MINIMAL_BOOTSTRAP_COUNT,
203 stabilization_timeout: Duration::from_secs(MINIMAL_STABILIZATION_TIMEOUT_SECS),
204 ..Self::default()
205 }
206 }
207
208 #[must_use]
210 pub fn small() -> Self {
211 Self {
212 node_count: SMALL_NODE_COUNT,
213 bootstrap_count: MINIMAL_BOOTSTRAP_COUNT,
214 stabilization_timeout: Duration::from_secs(SMALL_STABILIZATION_TIMEOUT_SECS),
215 ..Self::default()
216 }
217 }
218}
219
/// Serializable description of a devnet (ports, bootstrap addresses, data location).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DevnetManifest {
    /// Port of the first node.
    pub base_port: u16,
    /// Total number of nodes in the devnet.
    pub node_count: usize,
    /// Addresses of the bootstrap nodes.
    pub bootstrap: Vec<MultiAddr>,
    /// Root data directory of the devnet.
    pub data_dir: PathBuf,
    /// Creation time as a string.
    /// NOTE(review): the timestamp format is not visible here — confirm with the producer.
    pub created_at: String,
    /// Optional EVM details; defaults to `None` when absent and is omitted from
    /// serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub evm: Option<DevnetEvmInfo>,
}
237
/// EVM connection details recorded alongside a devnet manifest.
///
/// NOTE(review): `wallet_private_key` is stored as a plain string and will be
/// written out by `Serialize` — presumably acceptable only for local test keys; confirm.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DevnetEvmInfo {
    /// RPC endpoint URL of the EVM node.
    pub rpc_url: String,
    /// Private key of the wallet used with this devnet.
    pub wallet_private_key: String,
    /// Address of the payment token contract.
    pub payment_token_address: String,
    /// Address of the data payments contract.
    pub data_payments_address: String,
}
250
/// Lifecycle phase of the devnet as a whole.
///
/// Implements `PartialEq`/`Eq` so callers can compare states directly
/// (for example `state == NetworkState::Ready`) instead of pattern matching.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum NetworkState {
    /// Created but not yet started.
    Uninitialized,
    /// Bootstrap nodes are being started.
    BootstrappingPhase,
    /// Regular (non-bootstrap) nodes are being started.
    NodeSpawningPhase,
    /// All nodes are started; waiting for them to connect to each other.
    Stabilizing,
    /// The network is stabilized and usable.
    Ready,
    /// Shutdown is in progress.
    ShuttingDown,
    /// Shutdown has completed.
    Stopped,
}
269
/// Lifecycle state of a single devnet node.
///
/// Implements `PartialEq`/`Eq` so callers can compare states directly;
/// two `Failed` values are equal only when their messages match.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum NodeState {
    /// Created but not yet started.
    Pending,
    /// Startup is in progress.
    Starting,
    /// The P2P node has been started.
    Running,
    /// The node is running and connected to peers.
    Connected,
    /// The node has been shut down.
    Stopped,
    /// The node failed; the payload is a human-readable reason.
    Failed(String),
}
286
/// One node of the devnet together with its runtime handles.
// Some fields are stored without being read in the code visible here
// (e.g. `label`, `peer_id`, `is_bootstrap`), hence the lint allowance.
#[allow(dead_code)]
pub struct DevnetNode {
    /// Zero-based position of the node; also its offset from the base port.
    index: usize,
    /// Human-readable name of the form `devnet_node_<index>`.
    label: String,
    /// Peer id derived from the node's generated identity.
    peer_id: PeerId,
    /// Port the node listens on (`base_port + index`).
    port: u16,
    /// Node-specific data directory holding its identity file and storage.
    data_dir: PathBuf,
    /// Running P2P node; `None` before startup and after shutdown.
    p2p_node: Option<Arc<P2PNode>>,
    /// Protocol handler used to answer chunk protocol messages.
    ant_protocol: Option<Arc<AntProtocol>>,
    /// Whether this node was created as a bootstrap node.
    is_bootstrap: bool,
    /// Shared, lock-protected lifecycle state of the node.
    state: Arc<RwLock<NodeState>>,
    /// Bootstrap peers handed to the node at startup (empty for bootstrap nodes).
    bootstrap_addrs: Vec<MultiAddr>,
    /// Background task that routes incoming chunk protocol messages.
    protocol_task: Option<JoinHandle<()>>,
}
302
303impl DevnetNode {
304 pub async fn peer_count(&self) -> usize {
306 if let Some(ref node) = self.p2p_node {
307 node.peer_count().await
308 } else {
309 0
310 }
311 }
312}
313
/// A locally running development network of nodes and its lifecycle handles.
pub struct Devnet {
    /// Validated configuration the devnet was created with.
    config: DevnetConfig,
    /// Started nodes, in start order (bootstrap nodes first).
    nodes: Vec<DevnetNode>,
    /// Token cancelled on shutdown or drop to stop background work.
    shutdown: CancellationToken,
    /// Shared, lock-protected lifecycle state of the whole network.
    state: Arc<RwLock<NetworkState>>,
    /// Handle of the periodic health-check task, once started.
    health_monitor: Option<JoinHandle<()>>,
}
322
323impl Devnet {
324 pub async fn new(mut config: DevnetConfig) -> Result<Self> {
332 if config.bootstrap_count >= config.node_count {
333 return Err(DevnetError::Config(
334 "Bootstrap count must be less than node count".to_string(),
335 ));
336 }
337
338 if config.bootstrap_count == 0 {
339 return Err(DevnetError::Config(
340 "At least one bootstrap node is required".to_string(),
341 ));
342 }
343
344 let node_count = config.node_count;
345 let node_count_u16 = u16::try_from(node_count).map_err(|_| {
346 DevnetError::Config(format!("Node count {node_count} exceeds u16::MAX"))
347 })?;
348
349 if config.base_port == 0 {
350 let mut rng = rand::thread_rng();
351 let max_base_port = DEVNET_PORT_RANGE_MAX.saturating_sub(node_count_u16);
352 config.base_port = rng.gen_range(DEVNET_PORT_RANGE_MIN..max_base_port);
353 }
354
355 let base_port = config.base_port;
356 let max_port = base_port
357 .checked_add(node_count_u16)
358 .ok_or_else(|| {
359 DevnetError::Config(format!(
360 "Port range overflow: base_port {base_port} + node_count {node_count} exceeds u16::MAX"
361 ))
362 })?;
363 if max_port > DEVNET_PORT_RANGE_MAX {
364 return Err(DevnetError::Config(format!(
365 "Port range overflow: max port {max_port} exceeds DEVNET_PORT_RANGE_MAX {DEVNET_PORT_RANGE_MAX}"
366 )));
367 }
368
369 tokio::fs::create_dir_all(&config.data_dir).await?;
370
371 Ok(Self {
372 config,
373 nodes: Vec::new(),
374 shutdown: CancellationToken::new(),
375 state: Arc::new(RwLock::new(NetworkState::Uninitialized)),
376 health_monitor: None,
377 })
378 }
379
380 pub async fn start(&mut self) -> Result<()> {
387 info!(
388 "Starting devnet with {} nodes ({} bootstrap)",
389 self.config.node_count, self.config.bootstrap_count
390 );
391
392 *self.state.write().await = NetworkState::BootstrappingPhase;
393 self.start_bootstrap_nodes().await?;
394
395 *self.state.write().await = NetworkState::NodeSpawningPhase;
396 self.start_regular_nodes().await?;
397
398 *self.state.write().await = NetworkState::Stabilizing;
399 self.wait_for_stabilization().await?;
400
401 self.start_health_monitor();
402
403 *self.state.write().await = NetworkState::Ready;
404 info!("Devnet is ready");
405 Ok(())
406 }
407
408 pub async fn shutdown(&mut self) -> Result<()> {
414 info!("Shutting down devnet");
415 *self.state.write().await = NetworkState::ShuttingDown;
416
417 self.shutdown.cancel();
418
419 if let Some(handle) = self.health_monitor.take() {
420 handle.abort();
421 }
422
423 let mut shutdown_futures = Vec::with_capacity(self.nodes.len());
424 for node in self.nodes.iter_mut().rev() {
425 debug!("Stopping node {}", node.index);
426 if let Some(handle) = node.protocol_task.take() {
427 handle.abort();
428 }
429
430 let node_index = node.index;
431 let node_state = Arc::clone(&node.state);
432 let p2p_node = node.p2p_node.take();
433
434 shutdown_futures.push(async move {
435 if let Some(p2p) = p2p_node {
436 if let Err(e) = p2p.shutdown().await {
437 warn!("Error shutting down node {node_index}: {e}");
438 }
439 }
440 *node_state.write().await = NodeState::Stopped;
441 });
442 }
443 futures::future::join_all(shutdown_futures).await;
444
445 if self.config.cleanup_data_dir {
446 if let Err(e) = tokio::fs::remove_dir_all(&self.config.data_dir).await {
447 warn!("Failed to cleanup devnet data directory: {e}");
448 }
449 }
450
451 *self.state.write().await = NetworkState::Stopped;
452 info!("Devnet shutdown complete");
453 Ok(())
454 }
455
    /// Returns the configuration this devnet was created with
    /// (including any base port chosen automatically during construction).
    #[must_use]
    pub fn config(&self) -> &DevnetConfig {
        &self.config
    }
461
462 #[must_use]
464 pub fn bootstrap_addrs(&self) -> Vec<MultiAddr> {
465 self.nodes
466 .iter()
467 .take(self.config.bootstrap_count)
468 .map(|n| MultiAddr::quic(SocketAddr::from((Ipv4Addr::LOCALHOST, n.port))))
469 .collect()
470 }
471
472 async fn start_bootstrap_nodes(&mut self) -> Result<()> {
473 info!("Starting {} bootstrap nodes", self.config.bootstrap_count);
474
475 for i in 0..self.config.bootstrap_count {
476 let node = self.create_node(i, true, vec![]).await?;
477 self.start_node(node).await?;
478 tokio::time::sleep(self.config.spawn_delay).await;
479 }
480
481 self.wait_for_nodes_ready(0..self.config.bootstrap_count)
482 .await?;
483
484 info!("All bootstrap nodes are ready");
485 Ok(())
486 }
487
488 async fn start_regular_nodes(&mut self) -> Result<()> {
489 let regular_count = self.config.node_count - self.config.bootstrap_count;
490 info!("Starting {} regular nodes", regular_count);
491
492 let bootstrap_addrs: Vec<MultiAddr> = self
493 .nodes
494 .get(0..self.config.bootstrap_count)
495 .ok_or_else(|| {
496 DevnetError::Config(format!(
497 "Bootstrap count {} exceeds nodes length {}",
498 self.config.bootstrap_count,
499 self.nodes.len()
500 ))
501 })?
502 .iter()
503 .map(|n| MultiAddr::quic(SocketAddr::from((Ipv4Addr::LOCALHOST, n.port))))
504 .collect();
505
506 for i in self.config.bootstrap_count..self.config.node_count {
507 let node = self.create_node(i, false, bootstrap_addrs.clone()).await?;
508 self.start_node(node).await?;
509 tokio::time::sleep(self.config.spawn_delay).await;
510 }
511
512 info!("All regular nodes started");
513 Ok(())
514 }
515
516 async fn create_node(
517 &self,
518 index: usize,
519 is_bootstrap: bool,
520 bootstrap_addrs: Vec<MultiAddr>,
521 ) -> Result<DevnetNode> {
522 let index_u16 = u16::try_from(index)
523 .map_err(|_| DevnetError::Config(format!("Node index {index} exceeds u16::MAX")))?;
524 let port = self.config.base_port + index_u16;
525
526 let identity = NodeIdentity::generate()
528 .map_err(|e| DevnetError::Core(format!("Failed to generate node identity: {e}")))?;
529 let peer_id = *identity.peer_id();
530 let label = format!("devnet_node_{index}");
531 let data_dir = self
532 .config
533 .data_dir
534 .join(NODES_SUBDIR)
535 .join(peer_id.to_hex());
536
537 tokio::fs::create_dir_all(&data_dir).await?;
538
539 identity
540 .save_to_file(&data_dir.join(NODE_IDENTITY_FILENAME))
541 .await
542 .map_err(|e| DevnetError::Core(format!("Failed to save node identity: {e}")))?;
543
544 let ant_protocol = Self::create_ant_protocol(&data_dir, &identity, &self.config).await?;
545
546 Ok(DevnetNode {
547 index,
548 label,
549 peer_id,
550 port,
551 data_dir,
552 p2p_node: None,
553 ant_protocol: Some(Arc::new(ant_protocol)),
554 is_bootstrap,
555 state: Arc::new(RwLock::new(NodeState::Pending)),
556 bootstrap_addrs,
557 protocol_task: None,
558 })
559 }
560
561 async fn create_ant_protocol(
562 data_dir: &std::path::Path,
563 identity: &NodeIdentity,
564 config: &DevnetConfig,
565 ) -> Result<AntProtocol> {
566 let storage_config = LmdbStorageConfig {
567 root_dir: data_dir.to_path_buf(),
568 verify_on_read: true,
569 max_chunks: 0,
570 max_map_size: 0,
571 };
572 let storage = LmdbStorage::new(storage_config)
573 .await
574 .map_err(|e| DevnetError::Core(format!("Failed to create LMDB storage: {e}")))?;
575
576 let evm_config = EvmVerifierConfig {
577 network: config
578 .evm_network
579 .clone()
580 .unwrap_or(EvmNetwork::ArbitrumOne),
581 };
582
583 let rewards_address = RewardsAddress::new(DEVNET_REWARDS_ADDRESS);
584 let payment_config = PaymentVerifierConfig {
585 evm: evm_config,
586 cache_capacity: DEVNET_PAYMENT_CACHE_CAPACITY,
587 local_rewards_address: rewards_address,
588 };
589 let payment_verifier = PaymentVerifier::new(payment_config);
590 let metrics_tracker =
591 QuotingMetricsTracker::new(DEVNET_MAX_RECORDS, DEVNET_INITIAL_RECORDS);
592 let mut quote_generator = QuoteGenerator::new(rewards_address, metrics_tracker);
593
594 crate::payment::wire_ml_dsa_signer(&mut quote_generator, identity)
596 .map_err(|e| DevnetError::Startup(format!("Failed to wire ML-DSA-65 signer: {e}")))?;
597
598 Ok(AntProtocol::new(
599 Arc::new(storage),
600 Arc::new(payment_verifier),
601 Arc::new(quote_generator),
602 ))
603 }
604
    /// Starts `node`'s P2P stack, spawns its chunk-protocol routing task, and
    /// appends the node to `self.nodes`.
    ///
    /// The node identity is reloaded from the identity file in the node's data
    /// directory. The node state moves `Starting` -> `Running`; on any error the
    /// node is dropped without being added to `self.nodes`.
    ///
    /// # Errors
    ///
    /// Returns [`DevnetError::Core`] if the core config cannot be built or the
    /// identity cannot be loaded, and [`DevnetError::Startup`] if the P2P node
    /// cannot be created or started.
    async fn start_node(&mut self, mut node: DevnetNode) -> Result<()> {
        debug!("Starting node {} on port {}", node.index, node.port);
        *node.state.write().await = NodeState::Starting;

        let mut core_config = CoreNodeConfig::builder()
            .port(node.port)
            .local(true)
            .max_message_size(crate::ant_protocol::MAX_WIRE_MESSAGE_SIZE)
            .build()
            .map_err(|e| DevnetError::Core(format!("Failed to create core config: {e}")))?;

        // Reuse the identity persisted for this node so its peer id stays stable.
        let identity = NodeIdentity::load_from_file(
            &node.data_dir.join(crate::config::NODE_IDENTITY_FILENAME),
        )
        .await
        .map_err(|e| DevnetError::Core(format!("Failed to load node identity: {e}")))?;

        core_config.node_identity = Some(Arc::new(identity));
        core_config
            .bootstrap_peers
            .clone_from(&node.bootstrap_addrs);
        // All devnet nodes share the loopback address, so the permissive
        // diversity preset is used (presumably to avoid rejecting same-IP peers — confirm).
        core_config.diversity_config = Some(IPDiversityConfig::permissive());

        let index = node.index;
        let p2p_node = P2PNode::new(core_config)
            .await
            .map_err(|e| DevnetError::Startup(format!("Failed to create node {index}: {e}")))?;

        p2p_node
            .start()
            .await
            .map_err(|e| DevnetError::Startup(format!("Failed to start node {index}: {e}")))?;

        node.p2p_node = Some(Arc::new(p2p_node));
        *node.state.write().await = NodeState::Running;

        // Route incoming chunk protocol messages to the protocol handler and
        // send each response back to the message's source.
        if let (Some(ref p2p), Some(ref protocol)) = (&node.p2p_node, &node.ant_protocol) {
            let mut events = p2p.subscribe_events();
            let p2p_clone = Arc::clone(p2p);
            let protocol_clone = Arc::clone(protocol);
            let node_index = node.index;
            node.protocol_task = Some(tokio::spawn(async move {
                // The loop ends on the first receive error.
                // NOTE(review): if `events` is a broadcast receiver, a "lagged"
                // error would also end routing for this node — confirm.
                while let Ok(event) = events.recv().await {
                    // Only messages with a known source can be answered.
                    if let P2PEvent::Message {
                        topic,
                        source: Some(source),
                        data,
                    } = event
                    {
                        if topic == CHUNK_PROTOCOL_ID {
                            debug!(
                                "Node {node_index} received chunk protocol message from {source}"
                            );
                            let protocol = Arc::clone(&protocol_clone);
                            let p2p = Arc::clone(&p2p_clone);
                            // Handle each message in its own task so a slow
                            // request does not block the event loop.
                            tokio::spawn(async move {
                                match protocol.handle_message(&data).await {
                                    Ok(response) => {
                                        if let Err(e) = p2p
                                            .send_message(
                                                &source,
                                                CHUNK_PROTOCOL_ID,
                                                response.to_vec(),
                                                &[],
                                            )
                                            .await
                                        {
                                            warn!(
                                                "Node {node_index} failed to send response to {source}: {e}"
                                            );
                                        }
                                    }
                                    Err(e) => {
                                        warn!("Node {node_index} protocol handler error: {e}");
                                    }
                                }
                            });
                        }
                    }
                }
            }));
        }

        debug!("Node {} started successfully", node.index);
        self.nodes.push(node);
        Ok(())
    }
693
694 async fn wait_for_nodes_ready(&self, range: std::ops::Range<usize>) -> Result<()> {
695 let deadline = Instant::now() + self.config.node_startup_timeout;
696
697 for i in range {
698 while Instant::now() < deadline {
699 let node = self.nodes.get(i).ok_or_else(|| {
700 DevnetError::Config(format!(
701 "Node index {i} out of bounds (len: {})",
702 self.nodes.len()
703 ))
704 })?;
705 let state = node.state.read().await.clone();
706 match state {
707 NodeState::Running | NodeState::Connected => break,
708 NodeState::Failed(ref e) => {
709 return Err(DevnetError::Startup(format!("Node {i} failed: {e}")));
710 }
711 _ => {
712 tokio::time::sleep(Duration::from_millis(NODE_READY_POLL_INTERVAL_MS))
713 .await;
714 }
715 }
716 }
717 }
718 Ok(())
719 }
720
721 async fn wait_for_stabilization(&self) -> Result<()> {
722 let deadline = Instant::now() + self.config.stabilization_timeout;
723 let min_connections = self
724 .config
725 .bootstrap_count
726 .min(STABILIZATION_MIN_CONNECTIONS_CAP);
727
728 info!(
729 "Waiting for devnet stabilization (min {} connections per node)",
730 min_connections
731 );
732
733 while Instant::now() < deadline {
734 let mut all_connected = true;
735 let mut total_connections = 0;
736
737 for node in &self.nodes {
738 let peer_count = node.peer_count().await;
739 total_connections += peer_count;
740
741 if peer_count < min_connections {
742 all_connected = false;
743 }
744 }
745
746 if all_connected {
747 info!("Devnet stabilized: {} total connections", total_connections);
748 return Ok(());
749 }
750
751 debug!(
752 "Waiting for stabilization: {} total connections",
753 total_connections
754 );
755 tokio::time::sleep(Duration::from_secs(STABILIZATION_POLL_INTERVAL_SECS)).await;
756 }
757
758 Err(DevnetError::Stabilization(
759 "Devnet failed to stabilize within timeout".to_string(),
760 ))
761 }
762
763 fn start_health_monitor(&mut self) {
764 let nodes: Vec<Arc<P2PNode>> = self
765 .nodes
766 .iter()
767 .filter_map(|n| n.p2p_node.clone())
768 .collect();
769 let shutdown = self.shutdown.clone();
770
771 self.health_monitor = Some(tokio::spawn(async move {
772 let check_interval = Duration::from_secs(HEALTH_CHECK_INTERVAL_SECS);
773
774 loop {
775 tokio::select! {
776 () = shutdown.cancelled() => break,
777 () = tokio::time::sleep(check_interval) => {
778 for (i, node) in nodes.iter().enumerate() {
779 if !node.is_running() {
780 warn!("Node {} appears unhealthy", i);
781 }
782 }
783 }
784 }
785 }
786 }));
787 }
788}
789
790impl Drop for Devnet {
791 fn drop(&mut self) {
792 self.shutdown.cancel();
793 if let Some(handle) = self.health_monitor.take() {
794 handle.abort();
795 }
796 }
797}