1use crate::ant_protocol::CHUNK_PROTOCOL_ID;
7use crate::config::{default_root_dir, NODES_SUBDIR, NODE_IDENTITY_FILENAME};
8use crate::payment::{
9 EvmVerifierConfig, PaymentVerifier, PaymentVerifierConfig, QuoteGenerator,
10 QuotingMetricsTracker,
11};
12use crate::storage::{AntProtocol, LmdbStorage, LmdbStorageConfig};
13use evmlib::Network as EvmNetwork;
14use evmlib::RewardsAddress;
15use rand::Rng;
16use saorsa_core::identity::NodeIdentity;
17use saorsa_core::{
18 IPDiversityConfig, MultiAddr, NodeConfig as CoreNodeConfig, P2PEvent, P2PNode, PeerId,
19};
20use serde::{Deserialize, Serialize};
21use std::net::{Ipv4Addr, SocketAddr};
22use std::path::PathBuf;
23use std::sync::Arc;
24use std::time::Duration;
25use tokio::sync::RwLock;
26use tokio::task::JoinHandle;
27use tokio::time::Instant;
28use tokio_util::sync::CancellationToken;
29use tracing::{debug, info, warn};
30
/// Lower bound (inclusive) of the port window reserved for devnet nodes.
pub const DEVNET_PORT_RANGE_MIN: u16 = 20_000;

/// Upper bound of the port window reserved for devnet nodes.
pub const DEVNET_PORT_RANGE_MAX: u16 = 60_000;

/// Pause between spawning consecutive nodes, in milliseconds.
const DEFAULT_SPAWN_DELAY_MS: u64 = 200;

/// Default maximum time to wait for the whole network to stabilize.
const DEFAULT_STABILIZATION_TIMEOUT_SECS: u64 = 120;

/// Default maximum time to wait for a single node to report ready.
const DEFAULT_NODE_STARTUP_TIMEOUT_SECS: u64 = 30;

/// Stabilization timeout used by the `minimal()` preset.
const MINIMAL_STABILIZATION_TIMEOUT_SECS: u64 = 30;

/// Stabilization timeout used by the `small()` preset.
const SMALL_STABILIZATION_TIMEOUT_SECS: u64 = 60;

/// Interval between node-readiness polls, in milliseconds.
const NODE_READY_POLL_INTERVAL_MS: u64 = 100;

/// Interval between stabilization polls, in seconds.
const STABILIZATION_POLL_INTERVAL_SECS: u64 = 1;

/// Cap on the per-node peer count required before the network counts as
/// stabilized (the effective requirement is `min(bootstrap_count, cap)`).
const STABILIZATION_MIN_CONNECTIONS_CAP: usize = 3;

/// Interval between background health checks, in seconds.
const HEALTH_CHECK_INTERVAL_SECS: u64 = 5;

/// Payment-verifier cache capacity used for devnet nodes.
const DEVNET_PAYMENT_CACHE_CAPACITY: usize = 1000;

/// Placeholder rewards address (all `0x01` bytes) shared by devnet nodes.
const DEVNET_REWARDS_ADDRESS: [u8; 20] = [0x01; 20];

/// Record-count parameters passed to `QuotingMetricsTracker::new`
/// (presumably max and initial record counts — TODO confirm against its API).
const DEVNET_MAX_RECORDS: usize = 100_000;

/// See [`DEVNET_MAX_RECORDS`]; second argument to the metrics tracker.
const DEVNET_INITIAL_RECORDS: usize = 1000;

/// Node count for a default full-size devnet.
pub const DEFAULT_NODE_COUNT: usize = 25;

/// Bootstrap node count for a default devnet.
pub const DEFAULT_BOOTSTRAP_COUNT: usize = 3;

/// Node count for the `minimal()` preset.
pub const MINIMAL_NODE_COUNT: usize = 5;

/// Bootstrap count shared by the `minimal()` and `small()` presets.
pub const MINIMAL_BOOTSTRAP_COUNT: usize = 2;

/// Node count for the `small()` preset.
pub const SMALL_NODE_COUNT: usize = 10;
/// Errors produced while configuring, starting, or running a devnet.
#[derive(Debug, thiserror::Error)]
pub enum DevnetError {
    /// Invalid or inconsistent [`DevnetConfig`] values.
    #[error("Configuration error: {0}")]
    Config(String),

    /// A node failed to start or become ready.
    #[error("Node startup error: {0}")]
    Startup(String),

    /// The network did not reach the required connectivity in time.
    #[error("Network stabilization error: {0}")]
    Stabilization(String),

    /// Filesystem failure (data directory creation, cleanup, etc.).
    #[error("IO error: {0}")]
    Io(#[from] std::io::Error),

    /// Failure reported by the underlying core/identity/storage layers.
    #[error("Core error: {0}")]
    Core(String),
}

/// Convenience alias for results that fail with [`DevnetError`].
pub type Result<T> = std::result::Result<T, DevnetError>;
133
/// Configuration for a local development network.
#[derive(Debug, Clone)]
pub struct DevnetConfig {
    /// Total number of nodes (bootstrap + regular).
    pub node_count: usize,

    /// First listen port; node `i` uses `base_port + i`. A value of 0
    /// asks [`Devnet::new`] to pick a random port in the devnet window.
    pub base_port: u16,

    /// How many nodes act as bootstrap peers (spawned first).
    pub bootstrap_count: usize,

    /// Root directory that holds every node's data directory.
    pub data_dir: PathBuf,

    /// Delay between spawning consecutive nodes.
    pub spawn_delay: Duration,

    /// Maximum time to wait for the network to stabilize.
    pub stabilization_timeout: Duration,

    /// Maximum time to wait for a node to become ready.
    pub node_startup_timeout: Duration,

    /// Enable per-node logging (not consumed within this module).
    pub enable_node_logging: bool,

    /// Whether to delete `data_dir` during shutdown.
    pub cleanup_data_dir: bool,

    /// EVM network used for payment verification; `None` falls back to
    /// `EvmNetwork::ArbitrumOne` when the protocol stack is built.
    pub evm_network: Option<EvmNetwork>,
}
172
173impl Default for DevnetConfig {
174 fn default() -> Self {
175 let mut rng = rand::thread_rng();
176
177 #[allow(clippy::cast_possible_truncation)] let max_base_port = DEVNET_PORT_RANGE_MAX.saturating_sub(DEFAULT_NODE_COUNT as u16);
179 let base_port = rng.gen_range(DEVNET_PORT_RANGE_MIN..max_base_port);
180
181 Self {
182 node_count: DEFAULT_NODE_COUNT,
183 base_port,
184 bootstrap_count: DEFAULT_BOOTSTRAP_COUNT,
185 data_dir: default_root_dir(),
186 spawn_delay: Duration::from_millis(DEFAULT_SPAWN_DELAY_MS),
187 stabilization_timeout: Duration::from_secs(DEFAULT_STABILIZATION_TIMEOUT_SECS),
188 node_startup_timeout: Duration::from_secs(DEFAULT_NODE_STARTUP_TIMEOUT_SECS),
189 enable_node_logging: false,
190 cleanup_data_dir: true,
191 evm_network: None,
192 }
193 }
194}
195
196impl DevnetConfig {
197 #[must_use]
199 pub fn minimal() -> Self {
200 Self {
201 node_count: MINIMAL_NODE_COUNT,
202 bootstrap_count: MINIMAL_BOOTSTRAP_COUNT,
203 stabilization_timeout: Duration::from_secs(MINIMAL_STABILIZATION_TIMEOUT_SECS),
204 ..Self::default()
205 }
206 }
207
208 #[must_use]
210 pub fn small() -> Self {
211 Self {
212 node_count: SMALL_NODE_COUNT,
213 bootstrap_count: MINIMAL_BOOTSTRAP_COUNT,
214 stabilization_timeout: Duration::from_secs(SMALL_STABILIZATION_TIMEOUT_SECS),
215 ..Self::default()
216 }
217 }
218}
219
/// Serializable snapshot of a running devnet's parameters.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DevnetManifest {
    /// First port in the contiguous per-node port range.
    pub base_port: u16,
    /// Total number of nodes in the network.
    pub node_count: usize,
    /// Multiaddresses of the bootstrap nodes.
    pub bootstrap: Vec<MultiAddr>,
    /// Root data directory of the devnet.
    pub data_dir: PathBuf,
    /// Creation timestamp, stored as a string.
    pub created_at: String,
    /// EVM endpoint details, when an EVM network is attached.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub evm: Option<DevnetEvmInfo>,
}
237
/// Connection details for the EVM network a devnet uses for payments.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DevnetEvmInfo {
    /// JSON-RPC endpoint URL.
    pub rpc_url: String,
    /// Private key of the test wallet (devnet only — never real funds).
    pub wallet_private_key: String,
    /// Address of the payment-token contract.
    pub payment_token_address: String,
    /// Address of the data-payments contract.
    pub data_payments_address: String,
    /// Address of the merkle-payments contract, when deployed.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub merkle_payments_address: Option<String>,
}
253
/// Lifecycle phases of the devnet as a whole.
#[derive(Debug, Clone)]
pub enum NetworkState {
    /// Created, but `start` has not been called yet.
    Uninitialized,
    /// Bootstrap nodes are being spawned.
    BootstrappingPhase,
    /// Regular (non-bootstrap) nodes are being spawned.
    NodeSpawningPhase,
    /// Waiting for every node to reach its minimum peer count.
    Stabilizing,
    /// All nodes are up and the network has stabilized.
    Ready,
    /// Shutdown is in progress.
    ShuttingDown,
    /// All nodes have been stopped.
    Stopped,
}
272
/// Lifecycle states of an individual devnet node.
#[derive(Debug, Clone)]
pub enum NodeState {
    /// Created but not yet started.
    Pending,
    /// Startup in progress.
    Starting,
    /// The P2P layer started successfully.
    Running,
    /// Running and connected to peers.
    Connected,
    /// Stopped during shutdown.
    Stopped,
    /// Startup or runtime failure, with a description.
    Failed(String),
}
289
/// One node in the devnet, together with the handles needed to run and
/// stop it.
#[allow(dead_code)]
pub struct DevnetNode {
    // Zero-based spawn-order position (bootstrap nodes come first).
    index: usize,
    // Human-readable label, `devnet_node_{index}`.
    label: String,
    // The node's peer identity.
    peer_id: PeerId,
    // Listen port: `base_port + index`.
    port: u16,
    // Per-node data directory (storage plus the saved identity file).
    data_dir: PathBuf,
    // Running P2P node; `None` before start and after shutdown.
    p2p_node: Option<Arc<P2PNode>>,
    // ANT protocol stack servicing chunk requests for this node.
    ant_protocol: Option<Arc<AntProtocol>>,
    // Whether this node serves as a bootstrap peer for others.
    is_bootstrap: bool,
    // Current lifecycle state, shared with background tasks.
    state: Arc<RwLock<NodeState>>,
    // Bootstrap peers this node dials (empty for bootstrap nodes).
    bootstrap_addrs: Vec<MultiAddr>,
    // Background task that services chunk-protocol messages.
    protocol_task: Option<JoinHandle<()>>,
}
305
306impl DevnetNode {
307 pub async fn peer_count(&self) -> usize {
309 if let Some(ref node) = self.p2p_node {
310 node.peer_count().await
311 } else {
312 0
313 }
314 }
315}
316
/// Orchestrates a local development network of nodes.
pub struct Devnet {
    // Validated configuration (base port resolved at construction).
    config: DevnetConfig,
    // All spawned nodes; the first `bootstrap_count` are bootstrap nodes.
    nodes: Vec<DevnetNode>,
    // Cancellation signal shared with background tasks.
    shutdown: CancellationToken,
    // Current lifecycle state of the network as a whole.
    state: Arc<RwLock<NetworkState>>,
    // Periodic health-check task; `None` until the network is started.
    health_monitor: Option<JoinHandle<()>>,
}
325
326impl Devnet {
327 pub async fn new(mut config: DevnetConfig) -> Result<Self> {
335 if config.bootstrap_count >= config.node_count {
336 return Err(DevnetError::Config(
337 "Bootstrap count must be less than node count".to_string(),
338 ));
339 }
340
341 if config.bootstrap_count == 0 {
342 return Err(DevnetError::Config(
343 "At least one bootstrap node is required".to_string(),
344 ));
345 }
346
347 let node_count = config.node_count;
348 let node_count_u16 = u16::try_from(node_count).map_err(|_| {
349 DevnetError::Config(format!("Node count {node_count} exceeds u16::MAX"))
350 })?;
351
352 if config.base_port == 0 {
353 let mut rng = rand::thread_rng();
354 let max_base_port = DEVNET_PORT_RANGE_MAX.saturating_sub(node_count_u16);
355 config.base_port = rng.gen_range(DEVNET_PORT_RANGE_MIN..max_base_port);
356 }
357
358 let base_port = config.base_port;
359 let max_port = base_port
360 .checked_add(node_count_u16)
361 .ok_or_else(|| {
362 DevnetError::Config(format!(
363 "Port range overflow: base_port {base_port} + node_count {node_count} exceeds u16::MAX"
364 ))
365 })?;
366 if max_port > DEVNET_PORT_RANGE_MAX {
367 return Err(DevnetError::Config(format!(
368 "Port range overflow: max port {max_port} exceeds DEVNET_PORT_RANGE_MAX {DEVNET_PORT_RANGE_MAX}"
369 )));
370 }
371
372 tokio::fs::create_dir_all(&config.data_dir).await?;
373
374 Ok(Self {
375 config,
376 nodes: Vec::new(),
377 shutdown: CancellationToken::new(),
378 state: Arc::new(RwLock::new(NetworkState::Uninitialized)),
379 health_monitor: None,
380 })
381 }
382
383 pub async fn start(&mut self) -> Result<()> {
390 info!(
391 "Starting devnet with {} nodes ({} bootstrap)",
392 self.config.node_count, self.config.bootstrap_count
393 );
394
395 *self.state.write().await = NetworkState::BootstrappingPhase;
396 self.start_bootstrap_nodes().await?;
397
398 *self.state.write().await = NetworkState::NodeSpawningPhase;
399 self.start_regular_nodes().await?;
400
401 *self.state.write().await = NetworkState::Stabilizing;
402 self.wait_for_stabilization().await?;
403
404 self.start_health_monitor();
405
406 *self.state.write().await = NetworkState::Ready;
407 info!("Devnet is ready");
408 Ok(())
409 }
410
411 pub async fn shutdown(&mut self) -> Result<()> {
417 info!("Shutting down devnet");
418 *self.state.write().await = NetworkState::ShuttingDown;
419
420 self.shutdown.cancel();
421
422 if let Some(handle) = self.health_monitor.take() {
423 handle.abort();
424 }
425
426 let mut shutdown_futures = Vec::with_capacity(self.nodes.len());
427 for node in self.nodes.iter_mut().rev() {
428 debug!("Stopping node {}", node.index);
429 if let Some(handle) = node.protocol_task.take() {
430 handle.abort();
431 }
432
433 let node_index = node.index;
434 let node_state = Arc::clone(&node.state);
435 let p2p_node = node.p2p_node.take();
436
437 shutdown_futures.push(async move {
438 if let Some(p2p) = p2p_node {
439 if let Err(e) = p2p.shutdown().await {
440 warn!("Error shutting down node {node_index}: {e}");
441 }
442 }
443 *node_state.write().await = NodeState::Stopped;
444 });
445 }
446 futures::future::join_all(shutdown_futures).await;
447
448 if self.config.cleanup_data_dir {
449 if let Err(e) = tokio::fs::remove_dir_all(&self.config.data_dir).await {
450 warn!("Failed to cleanup devnet data directory: {e}");
451 }
452 }
453
454 *self.state.write().await = NetworkState::Stopped;
455 info!("Devnet shutdown complete");
456 Ok(())
457 }
458
459 #[must_use]
461 pub fn config(&self) -> &DevnetConfig {
462 &self.config
463 }
464
465 #[must_use]
467 pub fn bootstrap_addrs(&self) -> Vec<MultiAddr> {
468 self.nodes
469 .iter()
470 .take(self.config.bootstrap_count)
471 .map(|n| MultiAddr::quic(SocketAddr::from((Ipv4Addr::LOCALHOST, n.port))))
472 .collect()
473 }
474
475 async fn start_bootstrap_nodes(&mut self) -> Result<()> {
476 info!("Starting {} bootstrap nodes", self.config.bootstrap_count);
477
478 for i in 0..self.config.bootstrap_count {
479 let node = self.create_node(i, true, vec![]).await?;
480 self.start_node(node).await?;
481 tokio::time::sleep(self.config.spawn_delay).await;
482 }
483
484 self.wait_for_nodes_ready(0..self.config.bootstrap_count)
485 .await?;
486
487 info!("All bootstrap nodes are ready");
488 Ok(())
489 }
490
491 async fn start_regular_nodes(&mut self) -> Result<()> {
492 let regular_count = self.config.node_count - self.config.bootstrap_count;
493 info!("Starting {} regular nodes", regular_count);
494
495 let bootstrap_addrs: Vec<MultiAddr> = self
496 .nodes
497 .get(0..self.config.bootstrap_count)
498 .ok_or_else(|| {
499 DevnetError::Config(format!(
500 "Bootstrap count {} exceeds nodes length {}",
501 self.config.bootstrap_count,
502 self.nodes.len()
503 ))
504 })?
505 .iter()
506 .map(|n| MultiAddr::quic(SocketAddr::from((Ipv4Addr::LOCALHOST, n.port))))
507 .collect();
508
509 for i in self.config.bootstrap_count..self.config.node_count {
510 let node = self.create_node(i, false, bootstrap_addrs.clone()).await?;
511 self.start_node(node).await?;
512 tokio::time::sleep(self.config.spawn_delay).await;
513 }
514
515 info!("All regular nodes started");
516 Ok(())
517 }
518
519 async fn create_node(
520 &self,
521 index: usize,
522 is_bootstrap: bool,
523 bootstrap_addrs: Vec<MultiAddr>,
524 ) -> Result<DevnetNode> {
525 let index_u16 = u16::try_from(index)
526 .map_err(|_| DevnetError::Config(format!("Node index {index} exceeds u16::MAX")))?;
527 let port = self.config.base_port + index_u16;
528
529 let identity = NodeIdentity::generate()
531 .map_err(|e| DevnetError::Core(format!("Failed to generate node identity: {e}")))?;
532 let peer_id = *identity.peer_id();
533 let label = format!("devnet_node_{index}");
534 let data_dir = self
535 .config
536 .data_dir
537 .join(NODES_SUBDIR)
538 .join(peer_id.to_hex());
539
540 tokio::fs::create_dir_all(&data_dir).await?;
541
542 identity
543 .save_to_file(&data_dir.join(NODE_IDENTITY_FILENAME))
544 .await
545 .map_err(|e| DevnetError::Core(format!("Failed to save node identity: {e}")))?;
546
547 let ant_protocol = Self::create_ant_protocol(&data_dir, &identity, &self.config).await?;
548
549 Ok(DevnetNode {
550 index,
551 label,
552 peer_id,
553 port,
554 data_dir,
555 p2p_node: None,
556 ant_protocol: Some(Arc::new(ant_protocol)),
557 is_bootstrap,
558 state: Arc::new(RwLock::new(NodeState::Pending)),
559 bootstrap_addrs,
560 protocol_task: None,
561 })
562 }
563
564 async fn create_ant_protocol(
565 data_dir: &std::path::Path,
566 identity: &NodeIdentity,
567 config: &DevnetConfig,
568 ) -> Result<AntProtocol> {
569 let storage_config = LmdbStorageConfig {
570 root_dir: data_dir.to_path_buf(),
571 verify_on_read: true,
572 max_chunks: 0,
573 max_map_size: 0,
574 };
575 let storage = LmdbStorage::new(storage_config)
576 .await
577 .map_err(|e| DevnetError::Core(format!("Failed to create LMDB storage: {e}")))?;
578
579 let evm_config = EvmVerifierConfig {
580 network: config
581 .evm_network
582 .clone()
583 .unwrap_or(EvmNetwork::ArbitrumOne),
584 };
585
586 let rewards_address = RewardsAddress::new(DEVNET_REWARDS_ADDRESS);
587 let payment_config = PaymentVerifierConfig {
588 evm: evm_config,
589 cache_capacity: DEVNET_PAYMENT_CACHE_CAPACITY,
590 local_rewards_address: rewards_address,
591 };
592 let payment_verifier = PaymentVerifier::new(payment_config);
593 let metrics_tracker =
594 QuotingMetricsTracker::new(DEVNET_MAX_RECORDS, DEVNET_INITIAL_RECORDS);
595 let mut quote_generator = QuoteGenerator::new(rewards_address, metrics_tracker);
596
597 crate::payment::wire_ml_dsa_signer(&mut quote_generator, identity)
599 .map_err(|e| DevnetError::Startup(format!("Failed to wire ML-DSA-65 signer: {e}")))?;
600
601 Ok(AntProtocol::new(
602 Arc::new(storage),
603 Arc::new(payment_verifier),
604 Arc::new(quote_generator),
605 ))
606 }
607
608 async fn start_node(&mut self, mut node: DevnetNode) -> Result<()> {
609 debug!("Starting node {} on port {}", node.index, node.port);
610 *node.state.write().await = NodeState::Starting;
611
612 let mut core_config = CoreNodeConfig::builder()
613 .port(node.port)
614 .local(true)
615 .max_message_size(crate::ant_protocol::MAX_WIRE_MESSAGE_SIZE)
616 .build()
617 .map_err(|e| DevnetError::Core(format!("Failed to create core config: {e}")))?;
618
619 let identity = NodeIdentity::load_from_file(
621 &node.data_dir.join(crate::config::NODE_IDENTITY_FILENAME),
622 )
623 .await
624 .map_err(|e| DevnetError::Core(format!("Failed to load node identity: {e}")))?;
625
626 core_config.node_identity = Some(Arc::new(identity));
627 core_config
628 .bootstrap_peers
629 .clone_from(&node.bootstrap_addrs);
630 core_config.diversity_config = Some(IPDiversityConfig::permissive());
631
632 let index = node.index;
633 let p2p_node = P2PNode::new(core_config)
634 .await
635 .map_err(|e| DevnetError::Startup(format!("Failed to create node {index}: {e}")))?;
636
637 p2p_node
638 .start()
639 .await
640 .map_err(|e| DevnetError::Startup(format!("Failed to start node {index}: {e}")))?;
641
642 node.p2p_node = Some(Arc::new(p2p_node));
643 *node.state.write().await = NodeState::Running;
644
645 if let (Some(ref p2p), Some(ref protocol)) = (&node.p2p_node, &node.ant_protocol) {
646 let mut events = p2p.subscribe_events();
647 let p2p_clone = Arc::clone(p2p);
648 let protocol_clone = Arc::clone(protocol);
649 let node_index = node.index;
650 node.protocol_task = Some(tokio::spawn(async move {
651 while let Ok(event) = events.recv().await {
652 if let P2PEvent::Message {
653 topic,
654 source: Some(source),
655 data,
656 } = event
657 {
658 if topic == CHUNK_PROTOCOL_ID {
659 debug!(
660 "Node {node_index} received chunk protocol message from {source}"
661 );
662 let protocol = Arc::clone(&protocol_clone);
663 let p2p = Arc::clone(&p2p_clone);
664 tokio::spawn(async move {
665 match protocol.try_handle_request(&data).await {
666 Ok(Some(response)) => {
667 if let Err(e) = p2p
668 .send_message(
669 &source,
670 CHUNK_PROTOCOL_ID,
671 response.to_vec(),
672 &[],
673 )
674 .await
675 {
676 warn!(
677 "Node {node_index} failed to send response to {source}: {e}"
678 );
679 }
680 }
681 Ok(None) => {}
682 Err(e) => {
683 warn!("Node {node_index} protocol handler error: {e}");
684 }
685 }
686 });
687 }
688 }
689 }
690 }));
691 }
692
693 debug!("Node {} started successfully", node.index);
694 self.nodes.push(node);
695 Ok(())
696 }
697
698 async fn wait_for_nodes_ready(&self, range: std::ops::Range<usize>) -> Result<()> {
699 let deadline = Instant::now() + self.config.node_startup_timeout;
700
701 for i in range {
702 while Instant::now() < deadline {
703 let node = self.nodes.get(i).ok_or_else(|| {
704 DevnetError::Config(format!(
705 "Node index {i} out of bounds (len: {})",
706 self.nodes.len()
707 ))
708 })?;
709 let state = node.state.read().await.clone();
710 match state {
711 NodeState::Running | NodeState::Connected => break,
712 NodeState::Failed(ref e) => {
713 return Err(DevnetError::Startup(format!("Node {i} failed: {e}")));
714 }
715 _ => {
716 tokio::time::sleep(Duration::from_millis(NODE_READY_POLL_INTERVAL_MS))
717 .await;
718 }
719 }
720 }
721 }
722 Ok(())
723 }
724
725 async fn wait_for_stabilization(&self) -> Result<()> {
726 let deadline = Instant::now() + self.config.stabilization_timeout;
727 let min_connections = self
728 .config
729 .bootstrap_count
730 .min(STABILIZATION_MIN_CONNECTIONS_CAP);
731
732 info!(
733 "Waiting for devnet stabilization (min {} connections per node)",
734 min_connections
735 );
736
737 while Instant::now() < deadline {
738 let mut all_connected = true;
739 let mut total_connections = 0;
740
741 for node in &self.nodes {
742 let peer_count = node.peer_count().await;
743 total_connections += peer_count;
744
745 if peer_count < min_connections {
746 all_connected = false;
747 }
748 }
749
750 if all_connected {
751 info!("Devnet stabilized: {} total connections", total_connections);
752 return Ok(());
753 }
754
755 debug!(
756 "Waiting for stabilization: {} total connections",
757 total_connections
758 );
759 tokio::time::sleep(Duration::from_secs(STABILIZATION_POLL_INTERVAL_SECS)).await;
760 }
761
762 Err(DevnetError::Stabilization(
763 "Devnet failed to stabilize within timeout".to_string(),
764 ))
765 }
766
767 fn start_health_monitor(&mut self) {
768 let nodes: Vec<Arc<P2PNode>> = self
769 .nodes
770 .iter()
771 .filter_map(|n| n.p2p_node.clone())
772 .collect();
773 let shutdown = self.shutdown.clone();
774
775 self.health_monitor = Some(tokio::spawn(async move {
776 let check_interval = Duration::from_secs(HEALTH_CHECK_INTERVAL_SECS);
777
778 loop {
779 tokio::select! {
780 () = shutdown.cancelled() => break,
781 () = tokio::time::sleep(check_interval) => {
782 for (i, node) in nodes.iter().enumerate() {
783 if !node.is_running() {
784 warn!("Node {} appears unhealthy", i);
785 }
786 }
787 }
788 }
789 }
790 }));
791 }
792}
793
794impl Drop for Devnet {
795 fn drop(&mut self) {
796 self.shutdown.cancel();
797 if let Some(handle) = self.health_monitor.take() {
798 handle.abort();
799 }
800 }
801}