1use crate::ant_protocol::CHUNK_PROTOCOL_ID;
7use crate::config::{default_root_dir, NODES_SUBDIR, NODE_IDENTITY_FILENAME};
8use crate::logging::{debug, info, warn};
9use crate::payment::{
10 EvmVerifierConfig, PaymentVerifier, PaymentVerifierConfig, QuoteGenerator,
11 QuotingMetricsTracker,
12};
13use crate::replication::config::ReplicationConfig;
14use crate::storage::{AntProtocol, LmdbStorage, LmdbStorageConfig};
15use evmlib::Network as EvmNetwork;
16use evmlib::RewardsAddress;
17use rand::Rng;
18use saorsa_core::identity::NodeIdentity;
19use saorsa_core::{
20 IPDiversityConfig, MultiAddr, NodeConfig as CoreNodeConfig, P2PEvent, P2PNode, PeerId,
21};
22use std::net::{Ipv4Addr, SocketAddr};
23use std::path::PathBuf;
24use std::sync::Arc;
25use std::time::Duration;
26use tokio::sync::RwLock;
27use tokio::task::JoinHandle;
28use tokio::time::Instant;
29use tokio_util::sync::CancellationToken;
30
/// Lowest base port a randomly chosen devnet port range may start at.
pub const DEVNET_PORT_RANGE_MIN: u16 = 20_000;

/// Upper bound for devnet ports: `Devnet::new` rejects configurations whose
/// `base_port + node_count` exceeds this value.
pub const DEVNET_PORT_RANGE_MAX: u16 = 60_000;

/// Default pause after starting each node, in milliseconds.
const DEFAULT_SPAWN_DELAY_MS: u64 = 200;

/// Default time allowed for the whole network to stabilize, in seconds.
const DEFAULT_STABILIZATION_TIMEOUT_SECS: u64 = 120;

/// Default time allowed for started nodes to report ready, in seconds.
const DEFAULT_NODE_STARTUP_TIMEOUT_SECS: u64 = 30;

/// Stabilization timeout used by `DevnetConfig::minimal`, in seconds.
const MINIMAL_STABILIZATION_TIMEOUT_SECS: u64 = 30;

/// Stabilization timeout used by `DevnetConfig::small`, in seconds.
const SMALL_STABILIZATION_TIMEOUT_SECS: u64 = 60;

/// How often node readiness is re-checked, in milliseconds.
const NODE_READY_POLL_INTERVAL_MS: u64 = 100;

/// How often peer counts are re-checked during stabilization, in seconds.
const STABILIZATION_POLL_INTERVAL_SECS: u64 = 1;

/// Upper cap on the per-node peer count required for stabilization; the actual
/// requirement is `min(bootstrap_count, STABILIZATION_MIN_CONNECTIONS_CAP)`.
const STABILIZATION_MIN_CONNECTIONS_CAP: usize = 3;

/// Interval between health-monitor passes, in seconds.
const HEALTH_CHECK_INTERVAL_SECS: u64 = 5;

/// `cache_capacity` passed to each node's payment verifier.
const DEVNET_PAYMENT_CACHE_CAPACITY: usize = 1000;

/// Rewards address shared by every devnet node (all 20 bytes set to `0x01`); used for
/// both quote generation and payment verification.
const DEVNET_REWARDS_ADDRESS: [u8; 20] = [0x01; 20];

/// Value passed to `QuotingMetricsTracker::new` for each node
/// (presumably an initial record count — confirm against that constructor).
const DEVNET_INITIAL_RECORDS: usize = 1000;

/// Total number of nodes (bootstrap included) in a default devnet.
pub const DEFAULT_NODE_COUNT: usize = 25;

/// Number of bootstrap nodes in a default devnet.
pub const DEFAULT_BOOTSTRAP_COUNT: usize = 3;

/// Total number of nodes in `DevnetConfig::minimal`.
pub const MINIMAL_NODE_COUNT: usize = 5;

/// Number of bootstrap nodes used by both `DevnetConfig::minimal` and `DevnetConfig::small`.
pub const MINIMAL_BOOTSTRAP_COUNT: usize = 2;

/// Total number of nodes in `DevnetConfig::small`.
pub const SMALL_NODE_COUNT: usize = 10;
103
/// Errors produced while configuring, starting, or stabilizing a devnet.
#[derive(Debug, thiserror::Error)]
pub enum DevnetError {
    /// The devnet configuration is invalid (node/bootstrap counts, port range, indices).
    #[error("Configuration error: {0}")]
    Config(String),

    /// A node could not be brought up: P2P creation/start failed, signer wiring failed,
    /// or a node reported a failed state.
    #[error("Node startup error: {0}")]
    Startup(String),

    /// The network did not reach the required per-node peer count within the timeout.
    #[error("Network stabilization error: {0}")]
    Stabilization(String),

    /// An I/O error, e.g. while creating data directories.
    #[error("IO error: {0}")]
    Io(#[from] std::io::Error),

    /// A failure reported by a lower-level component (identity, storage, core config).
    #[error("Core error: {0}")]
    Core(String),
}

/// Result type used by all devnet operations.
pub type Result<T> = std::result::Result<T, DevnetError>;
130
/// Settings for a local development network.
#[derive(Debug, Clone)]
pub struct DevnetConfig {
    /// Total number of nodes, bootstrap nodes included.
    pub node_count: usize,

    /// First port of the range; node `i` listens on `base_port + i`.
    /// A value of `0` asks `Devnet::new` to choose a random base port.
    pub base_port: u16,

    /// Number of nodes (indices `0..bootstrap_count`) started first and used as
    /// bootstrap peers by every other node.
    pub bootstrap_count: usize,

    /// Root directory; each node's data lives under `data_dir/NODES_SUBDIR/<peer id hex>`.
    pub data_dir: PathBuf,

    /// Pause after starting each node.
    pub spawn_delay: Duration,

    /// Maximum time to wait for every node to reach the minimum peer count.
    pub stabilization_timeout: Duration,

    /// Maximum time to wait for the bootstrap nodes to report ready.
    pub node_startup_timeout: Duration,

    /// Per-node logging toggle.
    /// NOTE(review): not read anywhere in this module — confirm where it is consumed.
    pub enable_node_logging: bool,

    /// Whether `Devnet::shutdown` deletes `data_dir` (the whole directory) afterwards.
    pub cleanup_data_dir: bool,

    /// EVM network used for payment verification; `None` falls back to
    /// `EvmNetwork::ArbitrumOne`.
    pub evm_network: Option<EvmNetwork>,
}
169
170impl Default for DevnetConfig {
171 fn default() -> Self {
172 let mut rng = rand::thread_rng();
173
174 #[allow(clippy::cast_possible_truncation)] let max_base_port = DEVNET_PORT_RANGE_MAX.saturating_sub(DEFAULT_NODE_COUNT as u16);
176 let base_port = rng.gen_range(DEVNET_PORT_RANGE_MIN..max_base_port);
177
178 Self {
179 node_count: DEFAULT_NODE_COUNT,
180 base_port,
181 bootstrap_count: DEFAULT_BOOTSTRAP_COUNT,
182 data_dir: default_root_dir(),
183 spawn_delay: Duration::from_millis(DEFAULT_SPAWN_DELAY_MS),
184 stabilization_timeout: Duration::from_secs(DEFAULT_STABILIZATION_TIMEOUT_SECS),
185 node_startup_timeout: Duration::from_secs(DEFAULT_NODE_STARTUP_TIMEOUT_SECS),
186 enable_node_logging: false,
187 cleanup_data_dir: true,
188 evm_network: None,
189 }
190 }
191}
192
193impl DevnetConfig {
194 #[must_use]
196 pub fn minimal() -> Self {
197 Self {
198 node_count: MINIMAL_NODE_COUNT,
199 bootstrap_count: MINIMAL_BOOTSTRAP_COUNT,
200 stabilization_timeout: Duration::from_secs(MINIMAL_STABILIZATION_TIMEOUT_SECS),
201 ..Self::default()
202 }
203 }
204
205 #[must_use]
207 pub fn small() -> Self {
208 Self {
209 node_count: SMALL_NODE_COUNT,
210 bootstrap_count: MINIMAL_BOOTSTRAP_COUNT,
211 stabilization_timeout: Duration::from_secs(SMALL_STABILIZATION_TIMEOUT_SECS),
212 ..Self::default()
213 }
214 }
215}
216
217pub use ant_protocol::devnet_manifest::{DevnetEvmInfo, DevnetManifest};
221
/// Lifecycle phase of the devnet as a whole, advanced by `Devnet::start` and
/// `Devnet::shutdown`.
#[derive(Debug, Clone)]
pub enum NetworkState {
    /// Created, but `start` has not run yet.
    Uninitialized,
    /// Bootstrap nodes are being started.
    BootstrappingPhase,
    /// Regular (non-bootstrap) nodes are being started.
    NodeSpawningPhase,
    /// Waiting for every node to reach the minimum peer count.
    Stabilizing,
    /// All nodes started, the network stabilized, and the health monitor is running.
    Ready,
    /// `shutdown` is in progress.
    ShuttingDown,
    /// `shutdown` has completed.
    Stopped,
}
240
/// Lifecycle state of a single devnet node.
#[derive(Debug, Clone)]
pub enum NodeState {
    /// Created (identity and storage prepared) but not yet started.
    Pending,
    /// The P2P node is being configured and started.
    Starting,
    /// The P2P node started successfully.
    Running,
    /// Treated as ready by the readiness wait, just like `Running`.
    Connected,
    /// Shut down by `Devnet::shutdown`.
    Stopped,
    /// Failed with the given reason; the readiness wait reports it as a startup error.
    Failed(String),
}
257
/// A single node managed by the devnet, with its P2P and chunk-protocol handles.
// Some fields (e.g. `label`, `is_bootstrap`) appear to be unused here, hence the
// `dead_code` allowance.
#[allow(dead_code)]
pub struct DevnetNode {
    /// 0-based position of the node in the devnet; also its offset from `base_port`.
    index: usize,
    /// Human-readable name, `devnet_node_<index>`.
    label: String,
    /// Peer ID of the node's generated identity.
    peer_id: PeerId,
    /// Local port the node listens on.
    port: u16,
    /// Per-node directory holding the identity file and LMDB storage.
    data_dir: PathBuf,
    /// Running P2P node; `None` before start and after shutdown.
    p2p_node: Option<Arc<P2PNode>>,
    /// Chunk protocol handler backed by this node's storage and payment components.
    ant_protocol: Option<Arc<AntProtocol>>,
    /// Whether this node is one of the bootstrap nodes.
    is_bootstrap: bool,
    /// Current lifecycle state, shared with the shutdown futures.
    state: Arc<RwLock<NodeState>>,
    /// Bootstrap peers handed to the core config (empty for bootstrap nodes).
    bootstrap_addrs: Vec<MultiAddr>,
    /// Task dispatching chunk-protocol messages; aborted on shutdown.
    protocol_task: Option<JoinHandle<()>>,
}
273
274impl DevnetNode {
275 pub async fn peer_count(&self) -> usize {
277 if let Some(ref node) = self.p2p_node {
278 node.peer_count().await
279 } else {
280 0
281 }
282 }
283}
284
/// A local multi-node network used for development and testing.
pub struct Devnet {
    /// Validated configuration (with `base_port` resolved by `new`).
    config: DevnetConfig,
    /// Started nodes in start order: bootstrap nodes first, then regular nodes.
    nodes: Vec<DevnetNode>,
    /// Cancelled on `shutdown` and on drop to stop background tasks.
    shutdown: CancellationToken,
    /// Current lifecycle phase of the whole network.
    state: Arc<RwLock<NetworkState>>,
    /// Periodic health-check task, spawned once the network has stabilized.
    health_monitor: Option<JoinHandle<()>>,
}
293
294impl Devnet {
295 pub async fn new(mut config: DevnetConfig) -> Result<Self> {
303 if config.bootstrap_count >= config.node_count {
304 return Err(DevnetError::Config(
305 "Bootstrap count must be less than node count".to_string(),
306 ));
307 }
308
309 if config.bootstrap_count == 0 {
310 return Err(DevnetError::Config(
311 "At least one bootstrap node is required".to_string(),
312 ));
313 }
314
315 let node_count = config.node_count;
316 let node_count_u16 = u16::try_from(node_count).map_err(|_| {
317 DevnetError::Config(format!("Node count {node_count} exceeds u16::MAX"))
318 })?;
319
320 if config.base_port == 0 {
321 let mut rng = rand::thread_rng();
322 let max_base_port = DEVNET_PORT_RANGE_MAX.saturating_sub(node_count_u16);
323 config.base_port = rng.gen_range(DEVNET_PORT_RANGE_MIN..max_base_port);
324 }
325
326 let base_port = config.base_port;
327 let max_port = base_port
328 .checked_add(node_count_u16)
329 .ok_or_else(|| {
330 DevnetError::Config(format!(
331 "Port range overflow: base_port {base_port} + node_count {node_count} exceeds u16::MAX"
332 ))
333 })?;
334 if max_port > DEVNET_PORT_RANGE_MAX {
335 return Err(DevnetError::Config(format!(
336 "Port range overflow: max port {max_port} exceeds DEVNET_PORT_RANGE_MAX {DEVNET_PORT_RANGE_MAX}"
337 )));
338 }
339
340 tokio::fs::create_dir_all(&config.data_dir).await?;
341
342 Ok(Self {
343 config,
344 nodes: Vec::new(),
345 shutdown: CancellationToken::new(),
346 state: Arc::new(RwLock::new(NetworkState::Uninitialized)),
347 health_monitor: None,
348 })
349 }
350
351 pub async fn start(&mut self) -> Result<()> {
358 info!(
359 "Starting devnet with {} nodes ({} bootstrap)",
360 self.config.node_count, self.config.bootstrap_count
361 );
362
363 *self.state.write().await = NetworkState::BootstrappingPhase;
364 self.start_bootstrap_nodes().await?;
365
366 *self.state.write().await = NetworkState::NodeSpawningPhase;
367 self.start_regular_nodes().await?;
368
369 *self.state.write().await = NetworkState::Stabilizing;
370 self.wait_for_stabilization().await?;
371
372 self.start_health_monitor();
373
374 *self.state.write().await = NetworkState::Ready;
375 info!("Devnet is ready");
376 Ok(())
377 }
378
379 pub async fn shutdown(&mut self) -> Result<()> {
385 info!("Shutting down devnet");
386 *self.state.write().await = NetworkState::ShuttingDown;
387
388 self.shutdown.cancel();
389
390 if let Some(handle) = self.health_monitor.take() {
391 handle.abort();
392 }
393
394 let mut shutdown_futures = Vec::with_capacity(self.nodes.len());
395 for node in self.nodes.iter_mut().rev() {
396 debug!("Stopping node {}", node.index);
397 if let Some(handle) = node.protocol_task.take() {
398 handle.abort();
399 }
400
401 let node_index = node.index;
402 let node_state = Arc::clone(&node.state);
403 let p2p_node = node.p2p_node.take();
404
405 shutdown_futures.push(async move {
406 if let Some(p2p) = p2p_node {
407 if let Err(e) = p2p.shutdown().await {
408 warn!("Error shutting down node {node_index}: {e}");
409 }
410 }
411 *node_state.write().await = NodeState::Stopped;
412 });
413 }
414 futures::future::join_all(shutdown_futures).await;
415
416 if self.config.cleanup_data_dir {
417 if let Err(e) = tokio::fs::remove_dir_all(&self.config.data_dir).await {
418 warn!("Failed to cleanup devnet data directory: {e}");
419 }
420 }
421
422 *self.state.write().await = NetworkState::Stopped;
423 info!("Devnet shutdown complete");
424 Ok(())
425 }
426
427 #[must_use]
429 pub fn config(&self) -> &DevnetConfig {
430 &self.config
431 }
432
433 #[must_use]
435 pub fn bootstrap_addrs(&self) -> Vec<MultiAddr> {
436 self.nodes
437 .iter()
438 .take(self.config.bootstrap_count)
439 .map(|n| MultiAddr::quic(SocketAddr::from((Ipv4Addr::LOCALHOST, n.port))))
440 .collect()
441 }
442
443 async fn start_bootstrap_nodes(&mut self) -> Result<()> {
444 info!("Starting {} bootstrap nodes", self.config.bootstrap_count);
445
446 for i in 0..self.config.bootstrap_count {
447 let node = self.create_node(i, true, vec![]).await?;
448 self.start_node(node).await?;
449 tokio::time::sleep(self.config.spawn_delay).await;
450 }
451
452 self.wait_for_nodes_ready(0..self.config.bootstrap_count)
453 .await?;
454
455 info!("All bootstrap nodes are ready");
456 Ok(())
457 }
458
459 async fn start_regular_nodes(&mut self) -> Result<()> {
460 let regular_count = self.config.node_count - self.config.bootstrap_count;
461 info!("Starting {} regular nodes", regular_count);
462
463 let bootstrap_addrs: Vec<MultiAddr> = self
464 .nodes
465 .get(0..self.config.bootstrap_count)
466 .ok_or_else(|| {
467 DevnetError::Config(format!(
468 "Bootstrap count {} exceeds nodes length {}",
469 self.config.bootstrap_count,
470 self.nodes.len()
471 ))
472 })?
473 .iter()
474 .map(|n| MultiAddr::quic(SocketAddr::from((Ipv4Addr::LOCALHOST, n.port))))
475 .collect();
476
477 for i in self.config.bootstrap_count..self.config.node_count {
478 let node = self.create_node(i, false, bootstrap_addrs.clone()).await?;
479 self.start_node(node).await?;
480 tokio::time::sleep(self.config.spawn_delay).await;
481 }
482
483 info!("All regular nodes started");
484 Ok(())
485 }
486
487 async fn create_node(
488 &self,
489 index: usize,
490 is_bootstrap: bool,
491 bootstrap_addrs: Vec<MultiAddr>,
492 ) -> Result<DevnetNode> {
493 let index_u16 = u16::try_from(index)
494 .map_err(|_| DevnetError::Config(format!("Node index {index} exceeds u16::MAX")))?;
495 let port = self.config.base_port + index_u16;
496
497 let identity = NodeIdentity::generate()
499 .map_err(|e| DevnetError::Core(format!("Failed to generate node identity: {e}")))?;
500 let peer_id = *identity.peer_id();
501 let label = format!("devnet_node_{index}");
502 let data_dir = self
503 .config
504 .data_dir
505 .join(NODES_SUBDIR)
506 .join(peer_id.to_hex());
507
508 tokio::fs::create_dir_all(&data_dir).await?;
509
510 identity
511 .save_to_file(&data_dir.join(NODE_IDENTITY_FILENAME))
512 .await
513 .map_err(|e| DevnetError::Core(format!("Failed to save node identity: {e}")))?;
514
515 let ant_protocol = Self::create_ant_protocol(&data_dir, &identity, &self.config).await?;
516
517 Ok(DevnetNode {
518 index,
519 label,
520 peer_id,
521 port,
522 data_dir,
523 p2p_node: None,
524 ant_protocol: Some(Arc::new(ant_protocol)),
525 is_bootstrap,
526 state: Arc::new(RwLock::new(NodeState::Pending)),
527 bootstrap_addrs,
528 protocol_task: None,
529 })
530 }
531
532 async fn create_ant_protocol(
533 data_dir: &std::path::Path,
534 identity: &NodeIdentity,
535 config: &DevnetConfig,
536 ) -> Result<AntProtocol> {
537 let storage_config = LmdbStorageConfig {
538 root_dir: data_dir.to_path_buf(),
539 verify_on_read: true,
540 ..LmdbStorageConfig::default()
541 };
542 let storage = LmdbStorage::new(storage_config)
543 .await
544 .map_err(|e| DevnetError::Core(format!("Failed to create LMDB storage: {e}")))?;
545
546 let evm_config = EvmVerifierConfig {
547 network: config
548 .evm_network
549 .clone()
550 .unwrap_or(EvmNetwork::ArbitrumOne),
551 };
552
553 let rewards_address = RewardsAddress::new(DEVNET_REWARDS_ADDRESS);
554 let replication_config = ReplicationConfig::default();
555 let payment_config = PaymentVerifierConfig {
556 evm: evm_config,
557 cache_capacity: DEVNET_PAYMENT_CACHE_CAPACITY,
558 close_group_size: replication_config.close_group_size,
559 local_rewards_address: rewards_address,
560 };
561 let payment_verifier = PaymentVerifier::new(payment_config);
562 let metrics_tracker = QuotingMetricsTracker::new(DEVNET_INITIAL_RECORDS);
563 let mut quote_generator = QuoteGenerator::new(rewards_address, metrics_tracker);
564
565 crate::payment::wire_ml_dsa_signer(&mut quote_generator, identity)
567 .map_err(|e| DevnetError::Startup(format!("Failed to wire ML-DSA-65 signer: {e}")))?;
568
569 let storage = Arc::new(storage);
570 let payment_verifier = Arc::new(payment_verifier);
571
572 Ok(AntProtocol::new(
573 storage,
574 payment_verifier,
575 Arc::new(quote_generator),
576 ))
577 }
578
579 async fn start_node(&mut self, mut node: DevnetNode) -> Result<()> {
580 debug!("Starting node {} on port {}", node.index, node.port);
581 *node.state.write().await = NodeState::Starting;
582
583 let mut core_config = CoreNodeConfig::builder()
584 .port(node.port)
585 .local(true)
586 .max_message_size(crate::ant_protocol::MAX_WIRE_MESSAGE_SIZE)
587 .build()
588 .map_err(|e| DevnetError::Core(format!("Failed to create core config: {e}")))?;
589
590 let identity = NodeIdentity::load_from_file(
592 &node.data_dir.join(crate::config::NODE_IDENTITY_FILENAME),
593 )
594 .await
595 .map_err(|e| DevnetError::Core(format!("Failed to load node identity: {e}")))?;
596
597 core_config.node_identity = Some(Arc::new(identity));
598 core_config
599 .bootstrap_peers
600 .clone_from(&node.bootstrap_addrs);
601 core_config.diversity_config = Some(IPDiversityConfig::permissive());
602
603 let index = node.index;
604 let p2p_node = P2PNode::new(core_config)
605 .await
606 .map_err(|e| DevnetError::Startup(format!("Failed to create node {index}: {e}")))?;
607
608 p2p_node
609 .start()
610 .await
611 .map_err(|e| DevnetError::Startup(format!("Failed to start node {index}: {e}")))?;
612
613 node.p2p_node = Some(Arc::new(p2p_node));
614 *node.state.write().await = NodeState::Running;
615
616 if let (Some(ref p2p), Some(ref protocol)) = (&node.p2p_node, &node.ant_protocol) {
617 protocol.attach_p2p_node(Arc::clone(p2p));
619
620 let mut events = p2p.subscribe_events();
621 let p2p_clone = Arc::clone(p2p);
622 let protocol_clone = Arc::clone(protocol);
623 let node_index = node.index;
624 node.protocol_task = Some(tokio::spawn(async move {
625 while let Ok(event) = events.recv().await {
626 if let P2PEvent::Message {
627 topic,
628 source: Some(source),
629 data,
630 ..
631 } = event
632 {
633 if topic == CHUNK_PROTOCOL_ID {
634 debug!(
635 "Node {node_index} received chunk protocol message from {source}"
636 );
637 let protocol = Arc::clone(&protocol_clone);
638 let p2p = Arc::clone(&p2p_clone);
639 tokio::spawn(async move {
640 match protocol.try_handle_request(&data).await {
641 Ok(Some(response)) => {
642 if let Err(e) = p2p
643 .send_message(
644 &source,
645 CHUNK_PROTOCOL_ID,
646 response.to_vec(),
647 &[],
648 )
649 .await
650 {
651 warn!(
652 "Node {node_index} failed to send response to {source}: {e}"
653 );
654 }
655 }
656 Ok(None) => {}
657 Err(e) => {
658 warn!("Node {node_index} protocol handler error: {e}");
659 }
660 }
661 });
662 }
663 }
664 }
665 }));
666 }
667
668 debug!("Node {} started successfully", node.index);
669 self.nodes.push(node);
670 Ok(())
671 }
672
673 async fn wait_for_nodes_ready(&self, range: std::ops::Range<usize>) -> Result<()> {
674 let deadline = Instant::now() + self.config.node_startup_timeout;
675
676 for i in range {
677 while Instant::now() < deadline {
678 let node = self.nodes.get(i).ok_or_else(|| {
679 DevnetError::Config(format!(
680 "Node index {i} out of bounds (len: {})",
681 self.nodes.len()
682 ))
683 })?;
684 let state = node.state.read().await.clone();
685 match state {
686 NodeState::Running | NodeState::Connected => break,
687 NodeState::Failed(ref e) => {
688 return Err(DevnetError::Startup(format!("Node {i} failed: {e}")));
689 }
690 _ => {
691 tokio::time::sleep(Duration::from_millis(NODE_READY_POLL_INTERVAL_MS))
692 .await;
693 }
694 }
695 }
696 }
697 Ok(())
698 }
699
700 async fn wait_for_stabilization(&self) -> Result<()> {
701 let deadline = Instant::now() + self.config.stabilization_timeout;
702 let min_connections = self
703 .config
704 .bootstrap_count
705 .min(STABILIZATION_MIN_CONNECTIONS_CAP);
706
707 info!(
708 "Waiting for devnet stabilization (min {} connections per node)",
709 min_connections
710 );
711
712 while Instant::now() < deadline {
713 let mut all_connected = true;
714 let mut total_connections = 0;
715
716 for node in &self.nodes {
717 let peer_count = node.peer_count().await;
718 total_connections += peer_count;
719
720 if peer_count < min_connections {
721 all_connected = false;
722 }
723 }
724
725 if all_connected {
726 info!("Devnet stabilized: {} total connections", total_connections);
727 return Ok(());
728 }
729
730 debug!(
731 "Waiting for stabilization: {} total connections",
732 total_connections
733 );
734 tokio::time::sleep(Duration::from_secs(STABILIZATION_POLL_INTERVAL_SECS)).await;
735 }
736
737 Err(DevnetError::Stabilization(
738 "Devnet failed to stabilize within timeout".to_string(),
739 ))
740 }
741
742 fn start_health_monitor(&mut self) {
743 let nodes: Vec<Arc<P2PNode>> = self
744 .nodes
745 .iter()
746 .filter_map(|n| n.p2p_node.clone())
747 .collect();
748 let shutdown = self.shutdown.clone();
749
750 self.health_monitor = Some(tokio::spawn(async move {
751 let check_interval = Duration::from_secs(HEALTH_CHECK_INTERVAL_SECS);
752
753 loop {
754 tokio::select! {
755 () = shutdown.cancelled() => break,
756 () = tokio::time::sleep(check_interval) => {
757 for (i, node) in nodes.iter().enumerate() {
758 if !node.is_running() {
759 warn!("Node {} appears unhealthy", i);
760 }
761 }
762 }
763 }
764 }
765 }));
766 }
767}
768
769impl Drop for Devnet {
770 fn drop(&mut self) {
771 self.shutdown.cancel();
772 if let Some(handle) = self.health_monitor.take() {
773 handle.abort();
774 }
775 }
776}