use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use calimero_crypto::{Nonce, SharedKey};
use calimero_dag::{CausalDelta, DagStore};
use calimero_primitives::context::ContextId;
type ContextHash = [u8; 32];
use calimero_primitives::identity::{PrivateKey, PublicKey};
use calimero_storage::action::Action;
use calimero_storage::address::Id;
use calimero_storage::delta::StorageDelta;
use tokio::sync::{Mutex, RwLock};
struct MockPeer {
peer_id: String,
public_key: PublicKey,
private_key: Arc<PrivateKey>, dag: Arc<RwLock<DagStore<Vec<Action>>>>,
received_deltas: Arc<Mutex<Vec<([u8; 32], Vec<Action>)>>>,
latency_ms: u64,
}
impl Clone for MockPeer {
fn clone(&self) -> Self {
Self {
peer_id: self.peer_id.clone(),
public_key: self.public_key,
private_key: Arc::clone(&self.private_key),
dag: Arc::clone(&self.dag),
received_deltas: Arc::clone(&self.received_deltas),
latency_ms: self.latency_ms,
}
}
}
impl MockPeer {
fn new(peer_id: &str, latency_ms: u64) -> Self {
let private_key = PrivateKey::random(&mut rand::thread_rng());
let public_key = private_key.public_key();
Self {
peer_id: peer_id.to_string(),
public_key,
private_key: Arc::new(private_key),
dag: Arc::new(RwLock::new(DagStore::new([0; 32]))),
received_deltas: Arc::new(Mutex::new(Vec::new())),
latency_ms,
}
}
async fn simulate_latency(&self) {
if self.latency_ms > 0 {
tokio::time::sleep(Duration::from_millis(self.latency_ms)).await;
}
}
}
struct MockNetwork {
peers: HashMap<String, MockPeer>,
subscriptions: Arc<RwLock<HashMap<ContextId, Vec<String>>>>,
broadcast_queue: Arc<Mutex<Vec<BroadcastMessage>>>,
}
#[derive(Clone)]
struct BroadcastMessage {
context_id: ContextId,
author_id: PublicKey,
delta_id: [u8; 32],
parent_ids: Vec<[u8; 32]>,
root_hash: ContextHash,
encrypted_artifact: Vec<u8>,
nonce: Nonce,
}
impl MockNetwork {
fn new() -> Self {
Self {
peers: HashMap::new(),
subscriptions: Arc::new(RwLock::new(HashMap::new())),
broadcast_queue: Arc::new(Mutex::new(Vec::new())),
}
}
fn add_peer(&mut self, peer: MockPeer) {
self.peers.insert(peer.peer_id.clone(), peer);
}
async fn subscribe(&self, peer_id: &str, context_id: ContextId) {
let mut subs = self.subscriptions.write().await;
subs.entry(context_id)
.or_insert_with(Vec::new)
.push(peer_id.to_string());
}
async fn broadcast(
&self,
context_id: ContextId,
author_peer_id: &str,
delta_id: [u8; 32],
parent_ids: Vec<[u8; 32]>,
root_hash: ContextHash,
artifact: Vec<u8>,
sender_key: &PrivateKey,
) -> eyre::Result<()> {
let shared_key = SharedKey::from_sk(sender_key);
let nonce: Nonce = rand::random();
let encrypted = shared_key
.encrypt(artifact.clone(), nonce)
.ok_or_else(|| eyre::eyre!("Failed to encrypt"))?;
let msg = BroadcastMessage {
context_id,
author_id: sender_key.public_key(),
delta_id,
parent_ids,
root_hash,
encrypted_artifact: encrypted,
nonce,
};
self.broadcast_queue.lock().await.push(msg.clone());
let subs = self.subscriptions.read().await;
if let Some(subscribers) = subs.get(&context_id) {
for peer_id in subscribers {
if peer_id != author_peer_id {
if let Some(peer) = self.peers.get(peer_id) {
peer.simulate_latency().await;
let decrypted = shared_key
.decrypt(msg.encrypted_artifact.clone(), msg.nonce)
.ok_or_else(|| eyre::eyre!("Decryption failed"))?;
let storage_delta: StorageDelta = borsh::from_slice(&decrypted)?;
if let StorageDelta::Actions(actions) = storage_delta {
peer.received_deltas
.lock()
.await
.push((msg.delta_id, actions.clone()));
let dag_delta = CausalDelta {
id: msg.delta_id,
parents: msg.parent_ids.clone(),
payload: actions,
hlc: calimero_storage::env::hlc_timestamp(),
expected_root_hash: msg.root_hash.into(),
kind: calimero_dag::DeltaKind::Regular,
};
peer.dag.write().await.has_delta(&dag_delta.id);
}
}
}
}
}
Ok(())
}
async fn request_delta(
&self,
from_peer_id: &str,
delta_id: [u8; 32],
) -> Option<CausalDelta<Vec<Action>>> {
let peer = self.peers.get(from_peer_id)?;
peer.simulate_latency().await;
peer.dag.read().await.get_delta(&delta_id).cloned()
}
async fn request_dag_heads(&self, from_peer_id: &str) -> Option<(Vec<[u8; 32]>, ContextHash)> {
let peer = self.peers.get(from_peer_id)?;
peer.simulate_latency().await;
let heads = peer.dag.read().await.get_heads();
Some((heads, [0; 32])) }
}
#[tokio::test]
async fn test_encrypted_delta_broadcast() {
let mut network = MockNetwork::new();
let context_id = ContextId::from([1; 32]);
let peer_a = MockPeer::new("peer_a", 10);
let peer_b = MockPeer::new("peer_b", 15);
let peer_c = MockPeer::new("peer_c", 20);
network.add_peer(peer_a.clone());
network.add_peer(peer_b.clone());
network.add_peer(peer_c.clone());
network.subscribe("peer_a", context_id).await;
network.subscribe("peer_b", context_id).await;
network.subscribe("peer_c", context_id).await;
let id = Id::from([100; 32]);
let action = Action::Update {
id,
data: vec![1, 2, 3, 4],
ancestors: vec![],
metadata: Default::default(),
};
let storage_delta = StorageDelta::Actions(vec![action.clone()]);
let artifact = borsh::to_vec(&storage_delta).unwrap();
network
.broadcast(
context_id,
"peer_a",
[1; 32],
vec![[0; 32]],
[0; 32],
artifact,
&peer_a.private_key,
)
.await
.unwrap();
tokio::time::sleep(Duration::from_millis(50)).await;
let peer_b_received = peer_b.received_deltas.lock().await;
assert_eq!(peer_b_received.len(), 1);
assert_eq!(peer_b_received[0].0, [1; 32]);
assert_eq!(peer_b_received[0].1.len(), 1);
let peer_c_received = peer_c.received_deltas.lock().await;
assert_eq!(peer_c_received.len(), 1);
assert_eq!(peer_c_received[0].0, [1; 32]);
let peer_a_received = peer_a.received_deltas.lock().await;
assert_eq!(peer_a_received.len(), 0);
}
#[tokio::test]
async fn test_encrypted_broadcast_with_different_keys() {
let mut network = MockNetwork::new();
let context_id = ContextId::from([2; 32]);
let peer_a = MockPeer::new("peer_a", 0);
let peer_b = MockPeer::new("peer_b", 0);
network.add_peer(peer_a.clone());
network.add_peer(peer_b.clone());
network.subscribe("peer_a", context_id).await;
network.subscribe("peer_b", context_id).await;
let storage_delta = StorageDelta::Actions(vec![]);
let artifact = borsh::to_vec(&storage_delta).unwrap();
network
.broadcast(
context_id,
"peer_a",
[10; 32],
vec![[0; 32]],
[0; 32],
artifact.clone(),
&peer_a.private_key,
)
.await
.unwrap();
network
.broadcast(
context_id,
"peer_b",
[20; 32],
vec![[0; 32]],
[0; 32],
artifact,
&peer_b.private_key,
)
.await
.unwrap();
tokio::time::sleep(Duration::from_millis(10)).await;
assert_eq!(peer_a.received_deltas.lock().await.len(), 1);
assert_eq!(peer_b.received_deltas.lock().await.len(), 1);
}
#[tokio::test]
async fn test_p2p_delta_request() {
let mut network = MockNetwork::new();
let peer_a = MockPeer::new("peer_a", 5);
let peer_b = MockPeer::new("peer_b", 5);
network.add_peer(peer_a.clone());
network.add_peer(peer_b.clone());
let _delta = CausalDelta {
id: [50; 32],
parents: vec![[0; 32]],
payload: vec![Action::Update {
id: Id::from([99; 32]),
data: vec![5, 6, 7],
ancestors: vec![],
metadata: Default::default(),
}],
hlc: calimero_storage::env::hlc_timestamp(),
expected_root_hash: [0xDE; 32],
kind: calimero_dag::DeltaKind::Regular,
};
let requested = network.request_delta("peer_a", [50; 32]).await;
assert!(requested.is_none());
}
#[tokio::test]
async fn test_dag_heads_request() {
let mut network = MockNetwork::new();
let peer_a = MockPeer::new("peer_a", 0);
let peer_b = MockPeer::new("peer_b", 0);
network.add_peer(peer_a.clone());
network.add_peer(peer_b.clone());
let result = network.request_dag_heads("peer_a").await;
assert!(result.is_some());
let (heads, _root_hash) = result.unwrap();
assert_eq!(heads, vec![[0; 32]]);
}
#[tokio::test]
async fn test_network_latency_simulation() {
let mut network = MockNetwork::new();
let context_id = ContextId::from([4; 32]);
let peer_a = MockPeer::new("peer_a", 5);
let peer_b = MockPeer::new("peer_b", 100);
let peer_recv = MockPeer::new("peer_recv", 0);
network.add_peer(peer_a.clone());
network.add_peer(peer_b.clone());
network.add_peer(peer_recv.clone());
network.subscribe("peer_a", context_id).await;
network.subscribe("peer_b", context_id).await;
network.subscribe("peer_recv", context_id).await;
let storage_delta = StorageDelta::Actions(vec![]);
let artifact = borsh::to_vec(&storage_delta).unwrap();
network
.broadcast(
context_id,
"peer_a",
[10; 32],
vec![[0; 32]],
[0; 32],
artifact.clone(),
&peer_a.private_key,
)
.await
.unwrap();
tokio::time::sleep(Duration::from_millis(20)).await;
network
.broadcast(
context_id,
"peer_b",
[20; 32],
vec![[0; 32]],
[0; 32],
artifact,
&peer_b.private_key,
)
.await
.unwrap();
tokio::time::sleep(Duration::from_millis(120)).await;
let received = peer_recv.received_deltas.lock().await;
assert_eq!(received.len(), 2, "Should receive deltas from both peers");
let delta_ids: Vec<_> = received.iter().map(|(id, _)| *id).collect();
assert!(
delta_ids.contains(&[10; 32]),
"Should receive peer_a's delta"
);
assert!(
delta_ids.contains(&[20; 32]),
"Should receive peer_b's delta"
);
}
#[tokio::test]
async fn test_concurrent_broadcasts_from_multiple_peers() {
let mut network = MockNetwork::new();
let context_id = ContextId::from([5; 32]);
let peers: Vec<_> = (0..5)
.map(|i| {
let peer = MockPeer::new(&format!("peer_{}", i), i * 5);
network.add_peer(peer.clone());
peer
})
.collect();
for i in 0..5 {
network.subscribe(&format!("peer_{}", i), context_id).await;
}
for (i, peer) in peers.iter().enumerate() {
let storage_delta = StorageDelta::Actions(vec![]);
let artifact = borsh::to_vec(&storage_delta).unwrap();
let peer_id = format!("peer_{}", i);
network
.broadcast(
context_id,
&peer_id,
[i as u8; 32],
vec![[0; 32]],
[0; 32],
artifact,
&peer.private_key,
)
.await
.unwrap();
}
tokio::time::sleep(Duration::from_millis(50)).await;
for (i, peer) in peers.iter().enumerate() {
let received = peer.received_deltas.lock().await;
assert_eq!(received.len(), 4, "Peer {} should receive 4 deltas", i);
}
}
#[tokio::test]
async fn test_only_subscribed_peers_receive_broadcasts() {
let mut network = MockNetwork::new();
let context_a = ContextId::from([10; 32]);
let context_b = ContextId::from([20; 32]);
let peer_1 = MockPeer::new("peer_1", 0);
let peer_2 = MockPeer::new("peer_2", 0);
let peer_3 = MockPeer::new("peer_3", 0);
network.add_peer(peer_1.clone());
network.add_peer(peer_2.clone());
network.add_peer(peer_3.clone());
network.subscribe("peer_1", context_a).await;
network.subscribe("peer_2", context_a).await;
network.subscribe("peer_3", context_b).await;
let storage_delta = StorageDelta::Actions(vec![]);
let artifact = borsh::to_vec(&storage_delta).unwrap();
network
.broadcast(
context_a,
"peer_1",
[1; 32],
vec![[0; 32]],
[0; 32],
artifact,
&peer_1.private_key,
)
.await
.unwrap();
tokio::time::sleep(Duration::from_millis(10)).await;
assert_eq!(peer_2.received_deltas.lock().await.len(), 1);
assert_eq!(peer_3.received_deltas.lock().await.len(), 0);
}