1use crate::ant_protocol::CHUNK_PROTOCOL_ID;
7use crate::config::{default_root_dir, NODES_SUBDIR, NODE_IDENTITY_FILENAME};
8use crate::logging::{debug, info, warn};
9use crate::payment::{
10 EvmVerifierConfig, PaymentVerifier, PaymentVerifierConfig, QuoteGenerator,
11 QuotingMetricsTracker,
12};
13use crate::storage::{AntProtocol, LmdbStorage, LmdbStorageConfig};
14use evmlib::Network as EvmNetwork;
15use evmlib::RewardsAddress;
16use rand::Rng;
17use saorsa_core::identity::NodeIdentity;
18use saorsa_core::{
19 IPDiversityConfig, MultiAddr, NodeConfig as CoreNodeConfig, P2PEvent, P2PNode, PeerId,
20};
21use serde::{Deserialize, Serialize};
22use std::net::{Ipv4Addr, SocketAddr};
23use std::path::PathBuf;
24use std::sync::Arc;
25use std::time::Duration;
26use tokio::sync::RwLock;
27use tokio::task::JoinHandle;
28use tokio::time::Instant;
29use tokio_util::sync::CancellationToken;
30
/// Inclusive lower bound of the port range devnet nodes may bind to.
pub const DEVNET_PORT_RANGE_MIN: u16 = 20_000;

/// Upper bound of the port range devnet nodes may bind to.
pub const DEVNET_PORT_RANGE_MAX: u16 = 60_000;

/// Default pause between spawning consecutive nodes, in milliseconds.
const DEFAULT_SPAWN_DELAY_MS: u64 = 200;

/// Default time budget for the whole network to stabilize, in seconds.
const DEFAULT_STABILIZATION_TIMEOUT_SECS: u64 = 120;

/// Default time budget for individual nodes to become ready, in seconds.
const DEFAULT_NODE_STARTUP_TIMEOUT_SECS: u64 = 30;

/// Stabilization timeout used by the `minimal` preset, in seconds.
const MINIMAL_STABILIZATION_TIMEOUT_SECS: u64 = 30;

/// Stabilization timeout used by the `small` preset, in seconds.
const SMALL_STABILIZATION_TIMEOUT_SECS: u64 = 60;

/// Interval between per-node readiness polls, in milliseconds.
const NODE_READY_POLL_INTERVAL_MS: u64 = 100;

/// Interval between network stabilization polls, in seconds.
const STABILIZATION_POLL_INTERVAL_SECS: u64 = 1;

/// Cap on the minimum per-node connection count required for stabilization.
const STABILIZATION_MIN_CONNECTIONS_CAP: usize = 3;

/// Interval between background health checks, in seconds.
const HEALTH_CHECK_INTERVAL_SECS: u64 = 5;

/// Capacity of the payment verifier cache used by devnet nodes.
const DEVNET_PAYMENT_CACHE_CAPACITY: usize = 1000;

/// Fixed rewards address shared by all devnet nodes (placeholder test value).
const DEVNET_REWARDS_ADDRESS: [u8; 20] = [0x01; 20];

/// Initial record count seeded into the quoting metrics tracker.
const DEVNET_INITIAL_RECORDS: usize = 1000;

/// Default total node count (bootstrap + regular) for a devnet.
pub const DEFAULT_NODE_COUNT: usize = 25;

/// Default number of bootstrap nodes.
pub const DEFAULT_BOOTSTRAP_COUNT: usize = 3;

/// Total node count for the `minimal` preset.
pub const MINIMAL_NODE_COUNT: usize = 5;

/// Bootstrap count shared by the `minimal` and `small` presets.
pub const MINIMAL_BOOTSTRAP_COUNT: usize = 2;

/// Total node count for the `small` preset.
pub const SMALL_NODE_COUNT: usize = 10;
/// Errors produced while configuring, starting, or running a devnet.
#[derive(Debug, thiserror::Error)]
pub enum DevnetError {
    /// Invalid or inconsistent configuration values.
    #[error("Configuration error: {0}")]
    Config(String),

    /// A node failed to start or to become ready.
    #[error("Node startup error: {0}")]
    Startup(String),

    /// The network did not reach the required connectivity in time.
    #[error("Network stabilization error: {0}")]
    Stabilization(String),

    /// Filesystem error while creating or removing devnet directories.
    #[error("IO error: {0}")]
    Io(#[from] std::io::Error),

    /// Error from an underlying subsystem (identity, storage, core config).
    #[error("Core error: {0}")]
    Core(String),
}

/// Convenience alias for results returned throughout this module.
pub type Result<T> = std::result::Result<T, DevnetError>;
130
/// Tunable parameters for a local development network.
#[derive(Debug, Clone)]
pub struct DevnetConfig {
    /// Total number of nodes (bootstrap + regular) to run.
    pub node_count: usize,

    /// First port in the contiguous range assigned to nodes; `0` means
    /// "pick a random base port" when the devnet is created.
    pub base_port: u16,

    /// How many of the nodes act as bootstrap nodes.
    pub bootstrap_count: usize,

    /// Root directory under which per-node data (identities, storage) lives.
    pub data_dir: PathBuf,

    /// Delay inserted between spawning consecutive nodes.
    pub spawn_delay: Duration,

    /// Maximum time to wait for the network to stabilize.
    pub stabilization_timeout: Duration,

    /// Maximum time to wait for individual nodes to become ready.
    pub node_startup_timeout: Duration,

    /// Enable per-node logging (not read within this file).
    pub enable_node_logging: bool,

    /// Remove `data_dir` during shutdown when `true`.
    pub cleanup_data_dir: bool,

    /// EVM network used for payment verification; when `None`, nodes fall
    /// back to Arbitrum One.
    pub evm_network: Option<EvmNetwork>,
}
169
170impl Default for DevnetConfig {
171 fn default() -> Self {
172 let mut rng = rand::thread_rng();
173
174 #[allow(clippy::cast_possible_truncation)] let max_base_port = DEVNET_PORT_RANGE_MAX.saturating_sub(DEFAULT_NODE_COUNT as u16);
176 let base_port = rng.gen_range(DEVNET_PORT_RANGE_MIN..max_base_port);
177
178 Self {
179 node_count: DEFAULT_NODE_COUNT,
180 base_port,
181 bootstrap_count: DEFAULT_BOOTSTRAP_COUNT,
182 data_dir: default_root_dir(),
183 spawn_delay: Duration::from_millis(DEFAULT_SPAWN_DELAY_MS),
184 stabilization_timeout: Duration::from_secs(DEFAULT_STABILIZATION_TIMEOUT_SECS),
185 node_startup_timeout: Duration::from_secs(DEFAULT_NODE_STARTUP_TIMEOUT_SECS),
186 enable_node_logging: false,
187 cleanup_data_dir: true,
188 evm_network: None,
189 }
190 }
191}
192
193impl DevnetConfig {
194 #[must_use]
196 pub fn minimal() -> Self {
197 Self {
198 node_count: MINIMAL_NODE_COUNT,
199 bootstrap_count: MINIMAL_BOOTSTRAP_COUNT,
200 stabilization_timeout: Duration::from_secs(MINIMAL_STABILIZATION_TIMEOUT_SECS),
201 ..Self::default()
202 }
203 }
204
205 #[must_use]
207 pub fn small() -> Self {
208 Self {
209 node_count: SMALL_NODE_COUNT,
210 bootstrap_count: MINIMAL_BOOTSTRAP_COUNT,
211 stabilization_timeout: Duration::from_secs(SMALL_STABILIZATION_TIMEOUT_SECS),
212 ..Self::default()
213 }
214 }
215}
216
/// Serializable description of a running devnet, for sharing with clients.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DevnetManifest {
    /// First port of the contiguous node port range.
    pub base_port: u16,
    /// Total number of nodes in the devnet.
    pub node_count: usize,
    /// Addresses of the bootstrap nodes to join through.
    pub bootstrap: Vec<MultiAddr>,
    /// Root data directory of the devnet.
    pub data_dir: PathBuf,
    /// Creation timestamp (string form; format set by the writer).
    pub created_at: String,
    /// Optional EVM endpoint details, omitted from JSON when absent.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub evm: Option<DevnetEvmInfo>,
}
234
/// EVM endpoint and contract details associated with a devnet.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DevnetEvmInfo {
    /// RPC endpoint URL of the EVM node.
    pub rpc_url: String,
    /// Private key of the funded devnet wallet (test-only secret).
    pub wallet_private_key: String,
    /// Address of the payment token contract.
    pub payment_token_address: String,
    /// Address of the payment vault contract.
    pub payment_vault_address: String,
}
247
/// Lifecycle phase of the devnet as a whole.
#[derive(Debug, Clone)]
pub enum NetworkState {
    /// Created but `start` has not been called yet.
    Uninitialized,
    /// Bootstrap nodes are being started.
    BootstrappingPhase,
    /// Regular nodes are being started.
    NodeSpawningPhase,
    /// Waiting for nodes to reach the minimum connection count.
    Stabilizing,
    /// All nodes started and the network stabilized.
    Ready,
    /// `shutdown` is in progress.
    ShuttingDown,
    /// Shutdown has completed.
    Stopped,
}
266
/// Lifecycle state of a single devnet node.
#[derive(Debug, Clone)]
pub enum NodeState {
    /// Created but not yet started.
    Pending,
    /// Startup is in progress.
    Starting,
    /// P2P layer started successfully.
    Running,
    /// Connected to peers (treated as ready alongside `Running`).
    Connected,
    /// Shut down cleanly.
    Stopped,
    /// Startup or runtime failure, with a description.
    Failed(String),
}
283
/// A single in-process devnet node together with its runtime handles.
#[allow(dead_code)]
pub struct DevnetNode {
    // 0-based position of this node within the devnet.
    index: usize,
    // Human-readable label, e.g. "devnet_node_3".
    label: String,
    // Peer identity derived from the generated NodeIdentity.
    peer_id: PeerId,
    // Port this node listens on (base_port + index).
    port: u16,
    // Per-node data directory (identity file, storage).
    data_dir: PathBuf,
    // P2P layer; None until the node is started and after shutdown takes it.
    p2p_node: Option<Arc<P2PNode>>,
    // ANT protocol stack (storage + payments + quoting).
    ant_protocol: Option<Arc<AntProtocol>>,
    // Whether this node is one of the bootstrap nodes.
    is_bootstrap: bool,
    // Current lifecycle state, shared with background tasks.
    state: Arc<RwLock<NodeState>>,
    // Bootstrap addresses this node dials on startup (empty for bootstrap nodes).
    bootstrap_addrs: Vec<MultiAddr>,
    // Background task handling chunk-protocol messages; aborted on shutdown.
    protocol_task: Option<JoinHandle<()>>,
}
299
300impl DevnetNode {
301 pub async fn peer_count(&self) -> usize {
303 if let Some(ref node) = self.p2p_node {
304 node.peer_count().await
305 } else {
306 0
307 }
308 }
309}
310
/// An in-process development network of nodes, managed as a unit.
pub struct Devnet {
    // Effective configuration (base_port may be filled in at construction).
    config: DevnetConfig,
    // All nodes, bootstrap nodes first.
    nodes: Vec<DevnetNode>,
    // Cancelled on shutdown; observed by background tasks.
    shutdown: CancellationToken,
    // Current lifecycle phase, shared for concurrent observation.
    state: Arc<RwLock<NetworkState>>,
    // Background task that periodically checks node health.
    health_monitor: Option<JoinHandle<()>>,
}
319
320impl Devnet {
321 pub async fn new(mut config: DevnetConfig) -> Result<Self> {
329 if config.bootstrap_count >= config.node_count {
330 return Err(DevnetError::Config(
331 "Bootstrap count must be less than node count".to_string(),
332 ));
333 }
334
335 if config.bootstrap_count == 0 {
336 return Err(DevnetError::Config(
337 "At least one bootstrap node is required".to_string(),
338 ));
339 }
340
341 let node_count = config.node_count;
342 let node_count_u16 = u16::try_from(node_count).map_err(|_| {
343 DevnetError::Config(format!("Node count {node_count} exceeds u16::MAX"))
344 })?;
345
346 if config.base_port == 0 {
347 let mut rng = rand::thread_rng();
348 let max_base_port = DEVNET_PORT_RANGE_MAX.saturating_sub(node_count_u16);
349 config.base_port = rng.gen_range(DEVNET_PORT_RANGE_MIN..max_base_port);
350 }
351
352 let base_port = config.base_port;
353 let max_port = base_port
354 .checked_add(node_count_u16)
355 .ok_or_else(|| {
356 DevnetError::Config(format!(
357 "Port range overflow: base_port {base_port} + node_count {node_count} exceeds u16::MAX"
358 ))
359 })?;
360 if max_port > DEVNET_PORT_RANGE_MAX {
361 return Err(DevnetError::Config(format!(
362 "Port range overflow: max port {max_port} exceeds DEVNET_PORT_RANGE_MAX {DEVNET_PORT_RANGE_MAX}"
363 )));
364 }
365
366 tokio::fs::create_dir_all(&config.data_dir).await?;
367
368 Ok(Self {
369 config,
370 nodes: Vec::new(),
371 shutdown: CancellationToken::new(),
372 state: Arc::new(RwLock::new(NetworkState::Uninitialized)),
373 health_monitor: None,
374 })
375 }
376
377 pub async fn start(&mut self) -> Result<()> {
384 info!(
385 "Starting devnet with {} nodes ({} bootstrap)",
386 self.config.node_count, self.config.bootstrap_count
387 );
388
389 *self.state.write().await = NetworkState::BootstrappingPhase;
390 self.start_bootstrap_nodes().await?;
391
392 *self.state.write().await = NetworkState::NodeSpawningPhase;
393 self.start_regular_nodes().await?;
394
395 *self.state.write().await = NetworkState::Stabilizing;
396 self.wait_for_stabilization().await?;
397
398 self.start_health_monitor();
399
400 *self.state.write().await = NetworkState::Ready;
401 info!("Devnet is ready");
402 Ok(())
403 }
404
405 pub async fn shutdown(&mut self) -> Result<()> {
411 info!("Shutting down devnet");
412 *self.state.write().await = NetworkState::ShuttingDown;
413
414 self.shutdown.cancel();
415
416 if let Some(handle) = self.health_monitor.take() {
417 handle.abort();
418 }
419
420 let mut shutdown_futures = Vec::with_capacity(self.nodes.len());
421 for node in self.nodes.iter_mut().rev() {
422 debug!("Stopping node {}", node.index);
423 if let Some(handle) = node.protocol_task.take() {
424 handle.abort();
425 }
426
427 let node_index = node.index;
428 let node_state = Arc::clone(&node.state);
429 let p2p_node = node.p2p_node.take();
430
431 shutdown_futures.push(async move {
432 if let Some(p2p) = p2p_node {
433 if let Err(e) = p2p.shutdown().await {
434 warn!("Error shutting down node {node_index}: {e}");
435 }
436 }
437 *node_state.write().await = NodeState::Stopped;
438 });
439 }
440 futures::future::join_all(shutdown_futures).await;
441
442 if self.config.cleanup_data_dir {
443 if let Err(e) = tokio::fs::remove_dir_all(&self.config.data_dir).await {
444 warn!("Failed to cleanup devnet data directory: {e}");
445 }
446 }
447
448 *self.state.write().await = NetworkState::Stopped;
449 info!("Devnet shutdown complete");
450 Ok(())
451 }
452
453 #[must_use]
455 pub fn config(&self) -> &DevnetConfig {
456 &self.config
457 }
458
459 #[must_use]
461 pub fn bootstrap_addrs(&self) -> Vec<MultiAddr> {
462 self.nodes
463 .iter()
464 .take(self.config.bootstrap_count)
465 .map(|n| MultiAddr::quic(SocketAddr::from((Ipv4Addr::LOCALHOST, n.port))))
466 .collect()
467 }
468
469 async fn start_bootstrap_nodes(&mut self) -> Result<()> {
470 info!("Starting {} bootstrap nodes", self.config.bootstrap_count);
471
472 for i in 0..self.config.bootstrap_count {
473 let node = self.create_node(i, true, vec![]).await?;
474 self.start_node(node).await?;
475 tokio::time::sleep(self.config.spawn_delay).await;
476 }
477
478 self.wait_for_nodes_ready(0..self.config.bootstrap_count)
479 .await?;
480
481 info!("All bootstrap nodes are ready");
482 Ok(())
483 }
484
485 async fn start_regular_nodes(&mut self) -> Result<()> {
486 let regular_count = self.config.node_count - self.config.bootstrap_count;
487 info!("Starting {} regular nodes", regular_count);
488
489 let bootstrap_addrs: Vec<MultiAddr> = self
490 .nodes
491 .get(0..self.config.bootstrap_count)
492 .ok_or_else(|| {
493 DevnetError::Config(format!(
494 "Bootstrap count {} exceeds nodes length {}",
495 self.config.bootstrap_count,
496 self.nodes.len()
497 ))
498 })?
499 .iter()
500 .map(|n| MultiAddr::quic(SocketAddr::from((Ipv4Addr::LOCALHOST, n.port))))
501 .collect();
502
503 for i in self.config.bootstrap_count..self.config.node_count {
504 let node = self.create_node(i, false, bootstrap_addrs.clone()).await?;
505 self.start_node(node).await?;
506 tokio::time::sleep(self.config.spawn_delay).await;
507 }
508
509 info!("All regular nodes started");
510 Ok(())
511 }
512
513 async fn create_node(
514 &self,
515 index: usize,
516 is_bootstrap: bool,
517 bootstrap_addrs: Vec<MultiAddr>,
518 ) -> Result<DevnetNode> {
519 let index_u16 = u16::try_from(index)
520 .map_err(|_| DevnetError::Config(format!("Node index {index} exceeds u16::MAX")))?;
521 let port = self.config.base_port + index_u16;
522
523 let identity = NodeIdentity::generate()
525 .map_err(|e| DevnetError::Core(format!("Failed to generate node identity: {e}")))?;
526 let peer_id = *identity.peer_id();
527 let label = format!("devnet_node_{index}");
528 let data_dir = self
529 .config
530 .data_dir
531 .join(NODES_SUBDIR)
532 .join(peer_id.to_hex());
533
534 tokio::fs::create_dir_all(&data_dir).await?;
535
536 identity
537 .save_to_file(&data_dir.join(NODE_IDENTITY_FILENAME))
538 .await
539 .map_err(|e| DevnetError::Core(format!("Failed to save node identity: {e}")))?;
540
541 let ant_protocol = Self::create_ant_protocol(&data_dir, &identity, &self.config).await?;
542
543 Ok(DevnetNode {
544 index,
545 label,
546 peer_id,
547 port,
548 data_dir,
549 p2p_node: None,
550 ant_protocol: Some(Arc::new(ant_protocol)),
551 is_bootstrap,
552 state: Arc::new(RwLock::new(NodeState::Pending)),
553 bootstrap_addrs,
554 protocol_task: None,
555 })
556 }
557
558 async fn create_ant_protocol(
559 data_dir: &std::path::Path,
560 identity: &NodeIdentity,
561 config: &DevnetConfig,
562 ) -> Result<AntProtocol> {
563 let storage_config = LmdbStorageConfig {
564 root_dir: data_dir.to_path_buf(),
565 verify_on_read: true,
566 ..LmdbStorageConfig::default()
567 };
568 let storage = LmdbStorage::new(storage_config)
569 .await
570 .map_err(|e| DevnetError::Core(format!("Failed to create LMDB storage: {e}")))?;
571
572 let evm_config = EvmVerifierConfig {
573 network: config
574 .evm_network
575 .clone()
576 .unwrap_or(EvmNetwork::ArbitrumOne),
577 };
578
579 let rewards_address = RewardsAddress::new(DEVNET_REWARDS_ADDRESS);
580 let payment_config = PaymentVerifierConfig {
581 evm: evm_config,
582 cache_capacity: DEVNET_PAYMENT_CACHE_CAPACITY,
583 local_rewards_address: rewards_address,
584 };
585 let payment_verifier = PaymentVerifier::new(payment_config);
586 let metrics_tracker = QuotingMetricsTracker::new(DEVNET_INITIAL_RECORDS);
587 let mut quote_generator = QuoteGenerator::new(rewards_address, metrics_tracker);
588
589 crate::payment::wire_ml_dsa_signer(&mut quote_generator, identity)
591 .map_err(|e| DevnetError::Startup(format!("Failed to wire ML-DSA-65 signer: {e}")))?;
592
593 Ok(AntProtocol::new(
594 Arc::new(storage),
595 Arc::new(payment_verifier),
596 Arc::new(quote_generator),
597 ))
598 }
599
600 async fn start_node(&mut self, mut node: DevnetNode) -> Result<()> {
601 debug!("Starting node {} on port {}", node.index, node.port);
602 *node.state.write().await = NodeState::Starting;
603
604 let mut core_config = CoreNodeConfig::builder()
605 .port(node.port)
606 .local(true)
607 .max_message_size(crate::ant_protocol::MAX_WIRE_MESSAGE_SIZE)
608 .build()
609 .map_err(|e| DevnetError::Core(format!("Failed to create core config: {e}")))?;
610
611 let identity = NodeIdentity::load_from_file(
613 &node.data_dir.join(crate::config::NODE_IDENTITY_FILENAME),
614 )
615 .await
616 .map_err(|e| DevnetError::Core(format!("Failed to load node identity: {e}")))?;
617
618 core_config.node_identity = Some(Arc::new(identity));
619 core_config
620 .bootstrap_peers
621 .clone_from(&node.bootstrap_addrs);
622 core_config.diversity_config = Some(IPDiversityConfig::permissive());
623
624 let index = node.index;
625 let p2p_node = P2PNode::new(core_config)
626 .await
627 .map_err(|e| DevnetError::Startup(format!("Failed to create node {index}: {e}")))?;
628
629 p2p_node
630 .start()
631 .await
632 .map_err(|e| DevnetError::Startup(format!("Failed to start node {index}: {e}")))?;
633
634 node.p2p_node = Some(Arc::new(p2p_node));
635 *node.state.write().await = NodeState::Running;
636
637 if let (Some(ref p2p), Some(ref protocol)) = (&node.p2p_node, &node.ant_protocol) {
638 let mut events = p2p.subscribe_events();
639 let p2p_clone = Arc::clone(p2p);
640 let protocol_clone = Arc::clone(protocol);
641 let node_index = node.index;
642 node.protocol_task = Some(tokio::spawn(async move {
643 while let Ok(event) = events.recv().await {
644 if let P2PEvent::Message {
645 topic,
646 source: Some(source),
647 data,
648 } = event
649 {
650 if topic == CHUNK_PROTOCOL_ID {
651 debug!(
652 "Node {node_index} received chunk protocol message from {source}"
653 );
654 let protocol = Arc::clone(&protocol_clone);
655 let p2p = Arc::clone(&p2p_clone);
656 tokio::spawn(async move {
657 match protocol.try_handle_request(&data).await {
658 Ok(Some(response)) => {
659 if let Err(e) = p2p
660 .send_message(
661 &source,
662 CHUNK_PROTOCOL_ID,
663 response.to_vec(),
664 &[],
665 )
666 .await
667 {
668 warn!(
669 "Node {node_index} failed to send response to {source}: {e}"
670 );
671 }
672 }
673 Ok(None) => {}
674 Err(e) => {
675 warn!("Node {node_index} protocol handler error: {e}");
676 }
677 }
678 });
679 }
680 }
681 }
682 }));
683 }
684
685 debug!("Node {} started successfully", node.index);
686 self.nodes.push(node);
687 Ok(())
688 }
689
690 async fn wait_for_nodes_ready(&self, range: std::ops::Range<usize>) -> Result<()> {
691 let deadline = Instant::now() + self.config.node_startup_timeout;
692
693 for i in range {
694 while Instant::now() < deadline {
695 let node = self.nodes.get(i).ok_or_else(|| {
696 DevnetError::Config(format!(
697 "Node index {i} out of bounds (len: {})",
698 self.nodes.len()
699 ))
700 })?;
701 let state = node.state.read().await.clone();
702 match state {
703 NodeState::Running | NodeState::Connected => break,
704 NodeState::Failed(ref e) => {
705 return Err(DevnetError::Startup(format!("Node {i} failed: {e}")));
706 }
707 _ => {
708 tokio::time::sleep(Duration::from_millis(NODE_READY_POLL_INTERVAL_MS))
709 .await;
710 }
711 }
712 }
713 }
714 Ok(())
715 }
716
717 async fn wait_for_stabilization(&self) -> Result<()> {
718 let deadline = Instant::now() + self.config.stabilization_timeout;
719 let min_connections = self
720 .config
721 .bootstrap_count
722 .min(STABILIZATION_MIN_CONNECTIONS_CAP);
723
724 info!(
725 "Waiting for devnet stabilization (min {} connections per node)",
726 min_connections
727 );
728
729 while Instant::now() < deadline {
730 let mut all_connected = true;
731 let mut total_connections = 0;
732
733 for node in &self.nodes {
734 let peer_count = node.peer_count().await;
735 total_connections += peer_count;
736
737 if peer_count < min_connections {
738 all_connected = false;
739 }
740 }
741
742 if all_connected {
743 info!("Devnet stabilized: {} total connections", total_connections);
744 return Ok(());
745 }
746
747 debug!(
748 "Waiting for stabilization: {} total connections",
749 total_connections
750 );
751 tokio::time::sleep(Duration::from_secs(STABILIZATION_POLL_INTERVAL_SECS)).await;
752 }
753
754 Err(DevnetError::Stabilization(
755 "Devnet failed to stabilize within timeout".to_string(),
756 ))
757 }
758
759 fn start_health_monitor(&mut self) {
760 let nodes: Vec<Arc<P2PNode>> = self
761 .nodes
762 .iter()
763 .filter_map(|n| n.p2p_node.clone())
764 .collect();
765 let shutdown = self.shutdown.clone();
766
767 self.health_monitor = Some(tokio::spawn(async move {
768 let check_interval = Duration::from_secs(HEALTH_CHECK_INTERVAL_SECS);
769
770 loop {
771 tokio::select! {
772 () = shutdown.cancelled() => break,
773 () = tokio::time::sleep(check_interval) => {
774 for (i, node) in nodes.iter().enumerate() {
775 if !node.is_running() {
776 warn!("Node {} appears unhealthy", i);
777 }
778 }
779 }
780 }
781 }
782 }));
783 }
784}
785
786impl Drop for Devnet {
787 fn drop(&mut self) {
788 self.shutdown.cancel();
789 if let Some(handle) = self.health_monitor.take() {
790 handle.abort();
791 }
792 }
793}