use crate::rpc::{
DagStats, MemoryStats, NetworkStats, NetworkTestResult, NodeStatus, PeerInfo, RpcError,
RpcRequest, RpcResponse, WalletInfo,
};
use anyhow::{anyhow, Result};
use qudag_network::{NetworkAddress, PeerId};
use qudag_protocol::{Node, NodeConfig, ProtocolState};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio::sync::RwLock;
use uuid::Uuid;
#[derive(Debug, Clone)]
pub struct MockBehavior {
pub should_succeed: bool,
pub latency_ms: u64,
pub error_message: String,
pub custom_response: Option<serde_json::Value>,
}
impl Default for MockBehavior {
fn default() -> Self {
Self {
should_succeed: true,
latency_ms: 10,
error_message: "Mock error".to_string(),
custom_response: None,
}
}
}
pub struct MockNode {
pub id: String,
pub state: Arc<RwLock<NodeState>>,
pub behaviors: Arc<RwLock<HashMap<String, MockBehavior>>>,
pub peers: Arc<RwLock<Vec<MockPeer>>>,
pub network_stats: Arc<RwLock<NetworkStats>>,
pub dag_stats: Arc<RwLock<DagStats>>,
pub memory_stats: Arc<RwLock<MemoryStats>>,
pub start_time: SystemTime,
}
#[derive(Debug, Clone)]
pub enum NodeState {
Stopped,
Starting,
Running,
Stopping,
Error(String),
}
#[derive(Debug, Clone)]
pub struct MockPeer {
pub id: String,
pub address: String,
pub connected_at: SystemTime,
pub messages_sent: u64,
pub messages_received: u64,
pub last_seen: SystemTime,
}
impl MockNode {
pub fn new(id: String) -> Self {
Self {
id,
state: Arc::new(RwLock::new(NodeState::Stopped)),
behaviors: Arc::new(RwLock::new(HashMap::new())),
peers: Arc::new(RwLock::new(Vec::new())),
network_stats: Arc::new(RwLock::new(NetworkStats {
total_connections: 0,
active_connections: 0,
messages_sent: 0,
messages_received: 0,
bytes_sent: 0,
bytes_received: 0,
average_latency: 0.0,
uptime: 0,
})),
dag_stats: Arc::new(RwLock::new(DagStats {
vertex_count: 0,
edge_count: 0,
tip_count: 0,
finalized_height: 0,
pending_transactions: 0,
})),
memory_stats: Arc::new(RwLock::new(MemoryStats {
total_allocated: 0,
current_usage: 0,
peak_usage: 0,
})),
start_time: SystemTime::now(),
}
}
pub async fn set_behavior(&self, method: &str, behavior: MockBehavior) {
self.behaviors
.write()
.await
.insert(method.to_string(), behavior);
}
pub async fn get_status(&self) -> NodeStatus {
let state = self.state.read().await;
let peers = self.peers.read().await;
let network_stats = self.network_stats.read().await;
let dag_stats = self.dag_stats.read().await;
let memory_stats = self.memory_stats.read().await;
let uptime = SystemTime::now()
.duration_since(self.start_time)
.unwrap_or(Duration::from_secs(0))
.as_secs();
let peer_info: Vec<PeerInfo> = peers
.iter()
.map(|p| PeerInfo {
id: p.id.clone(),
address: p.address.clone(),
connected_duration: SystemTime::now()
.duration_since(p.connected_at)
.unwrap_or(Duration::from_secs(0))
.as_secs(),
messages_sent: p.messages_sent,
messages_received: p.messages_received,
last_seen: SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs(),
status: "Connected".to_string(),
latency: None,
})
.collect();
NodeStatus {
node_id: self.id.clone(),
state: format!("{:?}", *state),
uptime,
peers: peer_info,
network_stats: network_stats.clone(),
dag_stats: dag_stats.clone(),
memory_usage: memory_stats.clone(),
}
}
pub async fn start(&self) -> Result<()> {
let mut state = self.state.write().await;
match &*state {
NodeState::Stopped => {
*state = NodeState::Starting;
tokio::time::sleep(Duration::from_millis(100)).await;
*state = NodeState::Running;
let mut dag_stats = self.dag_stats.write().await;
dag_stats.vertex_count = 10;
dag_stats.edge_count = 15;
dag_stats.tip_count = 3;
Ok(())
}
_ => Err(anyhow!("Node is already running or in transition")),
}
}
pub async fn stop(&self) -> Result<()> {
let mut state = self.state.write().await;
match &*state {
NodeState::Running => {
*state = NodeState::Stopping;
tokio::time::sleep(Duration::from_millis(50)).await;
*state = NodeState::Stopped;
self.peers.write().await.clear();
Ok(())
}
_ => Err(anyhow!("Node is not running")),
}
}
pub async fn add_peer(&self, address: String) -> Result<()> {
let mut peers = self.peers.write().await;
if peers.iter().any(|p| p.address == address) {
return Err(anyhow!("Peer already connected"));
}
let peer = MockPeer {
id: format!("peer-{}", uuid::Uuid::new_v4()),
address,
connected_at: SystemTime::now(),
messages_sent: 0,
messages_received: 0,
last_seen: SystemTime::now(),
};
peers.push(peer);
let mut stats = self.network_stats.write().await;
stats.total_connections += 1;
stats.active_connections += 1;
Ok(())
}
pub async fn remove_peer(&self, peer_id: &str) -> Result<()> {
let mut peers = self.peers.write().await;
let initial_count = peers.len();
peers.retain(|p| p.id != peer_id);
if peers.len() == initial_count {
return Err(anyhow!("Peer not found"));
}
let mut stats = self.network_stats.write().await;
stats.active_connections = stats.active_connections.saturating_sub(1);
Ok(())
}
pub async fn simulate_activity(&self) {
let mut peers = self.peers.write().await;
let mut stats = self.network_stats.write().await;
for peer in peers.iter_mut() {
peer.messages_sent += rand::random::<u64>() % 10;
peer.messages_received += rand::random::<u64>() % 10;
peer.last_seen = SystemTime::now();
stats.messages_sent += peer.messages_sent;
stats.messages_received += peer.messages_received;
}
stats.bytes_sent = stats.messages_sent * 256; stats.bytes_received = stats.messages_received * 256;
stats.average_latency = 20.0 + (rand::random::<f64>() * 10.0);
let mut dag = self.dag_stats.write().await;
dag.vertex_count += rand::random::<usize>() % 5;
dag.edge_count += rand::random::<usize>() % 8;
dag.pending_transactions = rand::random::<usize>() % 20;
}
}
pub struct MockPeerManager {
pub peers: Arc<RwLock<HashMap<String, MockPeer>>>,
pub connection_attempts: Arc<Mutex<Vec<ConnectionAttempt>>>,
pub behaviors: Arc<RwLock<HashMap<String, MockBehavior>>>,
}
#[derive(Debug, Clone)]
pub struct ConnectionAttempt {
pub address: String,
pub timestamp: SystemTime,
pub success: bool,
pub error: Option<String>,
}
impl MockPeerManager {
pub fn new() -> Self {
Self {
peers: Arc::new(RwLock::new(HashMap::new())),
connection_attempts: Arc::new(Mutex::new(Vec::new())),
behaviors: Arc::new(RwLock::new(HashMap::new())),
}
}
pub async fn add_known_peer(&self, peer: MockPeer) {
self.peers.write().await.insert(peer.id.clone(), peer);
}
pub async fn connect_to_peer(&self, address: String) -> Result<String> {
let behaviors = self.behaviors.read().await;
let behavior = behaviors.get("connect").cloned().unwrap_or_default();
if behavior.latency_ms > 0 {
tokio::time::sleep(Duration::from_millis(behavior.latency_ms)).await;
}
let attempt = ConnectionAttempt {
address: address.clone(),
timestamp: SystemTime::now(),
success: behavior.should_succeed,
error: if behavior.should_succeed {
None
} else {
Some(behavior.error_message.clone())
},
};
self.connection_attempts
.lock()
.unwrap()
.push(attempt.clone());
if behavior.should_succeed {
let peer_id = format!("peer-{}", uuid::Uuid::new_v4());
let peer = MockPeer {
id: peer_id.clone(),
address,
connected_at: SystemTime::now(),
messages_sent: 0,
messages_received: 0,
last_seen: SystemTime::now(),
};
self.peers.write().await.insert(peer_id.clone(), peer);
Ok(peer_id)
} else {
Err(anyhow!(behavior.error_message))
}
}
pub fn get_connection_attempts(&self) -> Vec<ConnectionAttempt> {
self.connection_attempts.lock().unwrap().clone()
}
}
pub struct MockNetworkStats {
pub stats: Arc<RwLock<NetworkStats>>,
pub history: Arc<Mutex<Vec<NetworkStatsSnapshot>>>,
pub update_interval: Duration,
}
#[derive(Debug, Clone)]
pub struct NetworkStatsSnapshot {
pub timestamp: SystemTime,
pub stats: NetworkStats,
}
impl MockNetworkStats {
pub fn new() -> Self {
Self {
stats: Arc::new(RwLock::new(NetworkStats {
total_connections: 0,
active_connections: 0,
messages_sent: 0,
messages_received: 0,
bytes_sent: 0,
bytes_received: 0,
average_latency: 0.0,
uptime: 0,
})),
history: Arc::new(Mutex::new(Vec::new())),
update_interval: Duration::from_secs(1),
}
}
pub async fn update(&self) {
let mut stats = self.stats.write().await;
stats.messages_sent += rand::random::<u64>() % 100;
stats.messages_received += rand::random::<u64>() % 100;
stats.bytes_sent += stats.messages_sent * 256;
stats.bytes_received += stats.messages_received * 256;
stats.average_latency = 15.0 + (rand::random::<f64>() * 20.0);
let snapshot = NetworkStatsSnapshot {
timestamp: SystemTime::now(),
stats: stats.clone(),
};
self.history.lock().unwrap().push(snapshot);
}
pub async fn get_current(&self) -> NetworkStats {
self.stats.read().await.clone()
}
pub fn get_history(&self) -> Vec<NetworkStatsSnapshot> {
self.history.lock().unwrap().clone()
}
pub async fn reset(&self) {
let mut stats = self.stats.write().await;
*stats = NetworkStats {
total_connections: stats.total_connections,
active_connections: stats.active_connections,
messages_sent: 0,
messages_received: 0,
bytes_sent: 0,
bytes_received: 0,
average_latency: 0.0,
uptime: stats.uptime,
};
self.history.lock().unwrap().clear();
}
}
pub struct MockRpcClient {
pub node: Arc<MockNode>,
pub behaviors: Arc<RwLock<HashMap<String, MockBehavior>>>,
pub request_history: Arc<Mutex<Vec<RpcRequest>>>,
}
impl MockRpcClient {
pub fn new(node: Arc<MockNode>) -> Self {
Self {
node,
behaviors: Arc::new(RwLock::new(HashMap::new())),
request_history: Arc::new(Mutex::new(Vec::new())),
}
}
pub async fn set_behavior(&self, method: &str, behavior: MockBehavior) {
self.behaviors
.write()
.await
.insert(method.to_string(), behavior);
}
pub async fn process_request(&self, request: RpcRequest) -> RpcResponse {
self.request_history.lock().unwrap().push(request.clone());
let behaviors = self.behaviors.read().await;
let behavior = behaviors.get(&request.method).cloned().unwrap_or_default();
if behavior.latency_ms > 0 {
tokio::time::sleep(Duration::from_millis(behavior.latency_ms)).await;
}
if let Some(custom) = behavior.custom_response {
return RpcResponse {
id: request.id,
result: Some(custom),
error: None,
};
}
if behavior.should_succeed {
let result = match request.method.as_str() {
"get_status" => {
let status = self.node.get_status().await;
serde_json::to_value(status).ok()
}
"start" => match self.node.start().await {
Ok(_) => Some(serde_json::Value::Bool(true)),
Err(e) => {
return RpcResponse {
id: request.id,
result: None,
error: Some(RpcError {
code: -1,
message: e.to_string(),
data: None,
}),
}
}
},
"stop" => match self.node.stop().await {
Ok(_) => Some(serde_json::Value::Bool(true)),
Err(e) => {
return RpcResponse {
id: request.id,
result: None,
error: Some(RpcError {
code: -1,
message: e.to_string(),
data: None,
}),
}
}
},
"list_peers" => {
let peers = self.node.peers.read().await;
let peer_info: Vec<PeerInfo> = peers
.iter()
.map(|p| PeerInfo {
id: p.id.clone(),
address: p.address.clone(),
connected_duration: SystemTime::now()
.duration_since(p.connected_at)
.unwrap_or(Duration::from_secs(0))
.as_secs(),
messages_sent: p.messages_sent,
messages_received: p.messages_received,
last_seen: SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs(),
status: "Connected".to_string(),
latency: None,
})
.collect();
serde_json::to_value(peer_info).ok()
}
"add_peer" => {
if let Some(params) = request.params.as_object() {
if let Some(address) = params.get("address").and_then(|v| v.as_str()) {
match self.node.add_peer(address.to_string()).await {
Ok(_) => Some(serde_json::Value::Bool(true)),
Err(e) => {
return RpcResponse {
id: request.id,
result: None,
error: Some(RpcError {
code: -1,
message: e.to_string(),
data: None,
}),
}
}
}
} else {
return RpcResponse {
id: request.id,
result: None,
error: Some(RpcError {
code: -32602,
message: "Invalid params: missing address".to_string(),
data: None,
}),
};
}
} else {
return RpcResponse {
id: request.id,
result: None,
error: Some(RpcError {
code: -32602,
message: "Invalid params".to_string(),
data: None,
}),
};
}
}
"get_network_stats" => {
let stats = self.node.network_stats.read().await;
serde_json::to_value(&*stats).ok()
}
"test_network" => {
let results = vec![
NetworkTestResult {
peer_id: "peer-1".to_string(),
address: "192.168.1.10:8080".to_string(),
reachable: true,
latency: Some(15.5),
error: None,
},
NetworkTestResult {
peer_id: "peer-2".to_string(),
address: "192.168.1.11:8080".to_string(),
reachable: true,
latency: Some(22.3),
error: None,
},
NetworkTestResult {
peer_id: "peer-3".to_string(),
address: "192.168.1.12:8080".to_string(),
reachable: false,
latency: None,
error: Some("Connection timeout".to_string()),
},
];
serde_json::to_value(results).ok()
}
_ => Some(serde_json::json!({
"message": format!("Mock response for {}", request.method)
})),
};
RpcResponse {
id: request.id,
result,
error: None,
}
} else {
RpcResponse {
id: request.id,
result: None,
error: Some(RpcError {
code: -1,
message: behavior.error_message,
data: None,
}),
}
}
}
pub fn get_request_history(&self) -> Vec<RpcRequest> {
self.request_history.lock().unwrap().clone()
}
pub fn clear_history(&self) {
self.request_history.lock().unwrap().clear();
}
}
pub struct TestScenarioBuilder {
pub nodes: HashMap<String, Arc<MockNode>>,
pub topology: NetworkTopology,
pub global_behaviors: HashMap<String, MockBehavior>,
}
#[derive(Debug, Clone)]
pub enum NetworkTopology {
FullMesh,
Ring,
Star { center: String },
Custom { connections: Vec<(String, String)> },
}
impl TestScenarioBuilder {
pub fn new() -> Self {
Self {
nodes: HashMap::new(),
topology: NetworkTopology::FullMesh,
global_behaviors: HashMap::new(),
}
}
pub fn add_node(mut self, id: String) -> Self {
let node = Arc::new(MockNode::new(id.clone()));
self.nodes.insert(id, node);
self
}
pub fn with_topology(mut self, topology: NetworkTopology) -> Self {
self.topology = topology;
self
}
pub fn with_global_behavior(mut self, method: String, behavior: MockBehavior) -> Self {
self.global_behaviors.insert(method, behavior);
self
}
pub async fn build(self) -> TestScenario {
let mut scenario = TestScenario {
nodes: self.nodes,
start_time: SystemTime::now(),
};
for (method, behavior) in self.global_behaviors {
for node in scenario.nodes.values() {
node.set_behavior(&method, behavior.clone()).await;
}
}
match self.topology {
NetworkTopology::FullMesh => {
scenario.configure_full_mesh().await;
}
NetworkTopology::Ring => {
scenario.configure_ring().await;
}
NetworkTopology::Star { center } => {
scenario.configure_star(¢er).await;
}
NetworkTopology::Custom { connections } => {
scenario.configure_custom(connections).await;
}
}
scenario
}
}
pub struct TestScenario {
pub nodes: HashMap<String, Arc<MockNode>>,
pub start_time: SystemTime,
}
impl TestScenario {
async fn configure_full_mesh(&self) {
let node_ids: Vec<String> = self.nodes.keys().cloned().collect();
for (i, node_id) in node_ids.iter().enumerate() {
if let Some(node) = self.nodes.get(node_id) {
for (j, peer_id) in node_ids.iter().enumerate() {
if i != j {
let address = format!("{}:8080", peer_id);
let _ = node.add_peer(address).await;
}
}
}
}
}
async fn configure_ring(&self) {
let node_ids: Vec<String> = self.nodes.keys().cloned().collect();
let n = node_ids.len();
for (i, node_id) in node_ids.iter().enumerate() {
if let Some(node) = self.nodes.get(node_id) {
let next_idx = (i + 1) % n;
let next_id = &node_ids[next_idx];
let address = format!("{}:8080", next_id);
let _ = node.add_peer(address).await;
let prev_idx = if i == 0 { n - 1 } else { i - 1 };
let prev_id = &node_ids[prev_idx];
let address = format!("{}:8080", prev_id);
let _ = node.add_peer(address).await;
}
}
}
async fn configure_star(&self, center: &str) {
if let Some(center_node) = self.nodes.get(center) {
for (node_id, node) in &self.nodes {
if node_id != center {
let center_address = format!("{}:8080", center);
let _ = node.add_peer(center_address).await;
let node_address = format!("{}:8080", node_id);
let _ = center_node.add_peer(node_address).await;
}
}
}
}
async fn configure_custom(&self, connections: Vec<(String, String)>) {
for (from, to) in connections {
if let Some(from_node) = self.nodes.get(&from) {
let to_address = format!("{}:8080", to);
let _ = from_node.add_peer(to_address).await;
}
}
}
pub async fn start_all_nodes(&self) -> Result<()> {
for node in self.nodes.values() {
node.start().await?;
}
Ok(())
}
pub async fn stop_all_nodes(&self) -> Result<()> {
for node in self.nodes.values() {
node.stop().await?;
}
Ok(())
}
pub async fn simulate_activity(&self, duration: Duration) {
let end_time = SystemTime::now() + duration;
while SystemTime::now() < end_time {
for node in self.nodes.values() {
node.simulate_activity().await;
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
}
pub async fn get_aggregate_stats(&self) -> NetworkStats {
let mut total_stats = NetworkStats {
total_connections: 0,
active_connections: 0,
messages_sent: 0,
messages_received: 0,
bytes_sent: 0,
bytes_received: 0,
average_latency: 0.0,
uptime: 0,
};
let mut latency_sum = 0.0;
let mut latency_count = 0;
for node in self.nodes.values() {
let stats = node.network_stats.read().await;
total_stats.total_connections += stats.total_connections;
total_stats.active_connections += stats.active_connections;
total_stats.messages_sent += stats.messages_sent;
total_stats.messages_received += stats.messages_received;
total_stats.bytes_sent += stats.bytes_sent;
total_stats.bytes_received += stats.bytes_received;
if stats.average_latency > 0.0 {
latency_sum += stats.average_latency;
latency_count += 1;
}
}
if latency_count > 0 {
total_stats.average_latency = latency_sum / latency_count as f64;
}
total_stats
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_mock_node_lifecycle() {
let node = MockNode::new("test-node".to_string());
assert!(matches!(*node.state.read().await, NodeState::Stopped));
assert!(node.start().await.is_ok());
assert!(matches!(*node.state.read().await, NodeState::Running));
assert!(node.start().await.is_err());
assert!(node.stop().await.is_ok());
assert!(matches!(*node.state.read().await, NodeState::Stopped));
}
#[tokio::test]
async fn test_mock_peer_management() {
let node = MockNode::new("test-node".to_string());
assert!(node.add_peer("192.168.1.10:8080".to_string()).await.is_ok());
assert_eq!(node.peers.read().await.len(), 1);
assert!(node
.add_peer("192.168.1.10:8080".to_string())
.await
.is_err());
let peer_id = node.peers.read().await[0].id.clone();
assert!(node.remove_peer(&peer_id).await.is_ok());
assert_eq!(node.peers.read().await.len(), 0);
assert!(node.remove_peer("non-existent").await.is_err());
}
#[tokio::test]
async fn test_mock_rpc_client() {
let node = Arc::new(MockNode::new("test-node".to_string()));
let rpc = MockRpcClient::new(node);
let request = RpcRequest {
id: Uuid::new_v4(),
method: "get_status".to_string(),
params: serde_json::Value::Null,
};
let response = rpc.process_request(request.clone()).await;
assert!(response.result.is_some());
assert!(response.error.is_none());
rpc.set_behavior(
"test_error",
MockBehavior {
should_succeed: false,
error_message: "Test error".to_string(),
..Default::default()
},
)
.await;
let error_request = RpcRequest {
id: Uuid::new_v4(),
method: "test_error".to_string(),
params: serde_json::Value::Null,
};
let error_response = rpc.process_request(error_request).await;
assert!(error_response.result.is_none());
assert!(error_response.error.is_some());
}
#[tokio::test]
async fn test_scenario_builder() {
let scenario = TestScenarioBuilder::new()
.add_node("node1".to_string())
.add_node("node2".to_string())
.add_node("node3".to_string())
.with_topology(NetworkTopology::Ring)
.build()
.await;
assert!(scenario.start_all_nodes().await.is_ok());
for node in scenario.nodes.values() {
assert_eq!(node.peers.read().await.len(), 2); }
assert!(scenario.stop_all_nodes().await.is_ok());
}
}