use bytes::Bytes;
use commonware_cryptography::{ed25519, PrivateKeyExt, Signer};
use commonware_p2p::simulated::{Config as SimConfig, Link, Network};
use commonware_p2p::{Receiver as P2PReceiverTrait, Recipients, Sender as P2PSenderTrait};
use commonware_runtime::{deterministic, Metrics, Runner};
use guts_p2p::{
Message, ObjectData, ReplicationProtocol, RepoAnnounce, SyncRequest, REPLICATION_CHANNEL,
};
use guts_storage::{GitObject, ObjectType, Reference, Repository};
use std::sync::Arc;
use std::time::Duration;
#[test]
fn test_three_node_replication() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let (network, mut oracle) = Network::new(
context.with_label("network"),
SimConfig {
max_size: guts_p2p::MAX_MESSAGE_SIZE,
disconnect_on_block: true,
tracked_peer_sets: None,
},
);
network.start();
let node_keys: Vec<_> = (0..3)
.map(|i| {
let sk = ed25519::PrivateKey::from_seed(i);
let pk = sk.public_key();
(sk, pk)
})
.collect();
let protocols: Vec<_> = (0..3)
.map(|_| Arc::new(ReplicationProtocol::new()))
.collect();
let mut senders = Vec::new();
let mut receivers = Vec::new();
for (_, pk) in &node_keys {
let (sender, receiver) = oracle
.control(pk.clone())
.register(REPLICATION_CHANNEL)
.await
.expect("Failed to register node");
senders.push(sender);
receivers.push(receiver);
}
for i in 0..3 {
for j in 0..3 {
if i != j {
oracle
.add_link(
node_keys[i].1.clone(),
node_keys[j].1.clone(),
Link {
latency: Duration::from_millis(10),
jitter: Duration::from_millis(1),
success_rate: 1.0,
},
)
.await
.expect("Failed to add link");
}
}
}
let repo_key = "alice/test-repo";
let repo1 = Arc::new(Repository::new("test-repo", "alice"));
protocols[0].register_repo(repo_key.to_string(), repo1.clone());
let blob = GitObject::blob(b"Hello from Node 1!".to_vec());
let blob_id = repo1.objects.put(blob.clone());
let mut tree_data = Vec::new();
tree_data.extend_from_slice(b"100644 README.md\0");
tree_data.extend_from_slice(blob_id.as_bytes());
let tree = GitObject::new(ObjectType::Tree, Bytes::from(tree_data));
let tree_id = repo1.objects.put(tree.clone());
let commit_data = format!(
"tree {}\nauthor Alice <alice@example.com> 1234567890 +0000\ncommitter Alice <alice@example.com> 1234567890 +0000\n\nInitial commit\n",
tree_id.to_hex()
);
let commit = GitObject::new(ObjectType::Commit, Bytes::from(commit_data));
let commit_id = repo1.objects.put(commit.clone());
repo1.refs.set("refs/heads/main", commit_id);
assert_eq!(repo1.objects.len(), 3, "Node 1 should have 3 objects");
let announce = RepoAnnounce {
repo_key: repo_key.to_string(),
object_ids: vec![blob_id, tree_id, commit_id],
refs: vec![("refs/heads/main".to_string(), commit_id)],
};
senders[0]
.send(Recipients::All, announce.encode(), false)
.await
.expect("Failed to send announcement");
let (sender_pk, msg_data) = receivers[1].recv().await.expect("Node 2 should receive message");
assert_eq!(sender_pk, node_keys[0].1, "Message should be from Node 1");
let msg = Message::decode(&msg_data).expect("Should decode message");
let response = match msg {
Message::RepoAnnounce(announce) => {
protocols[1].register_repo(announce.repo_key.clone(), Arc::new(Repository::new("test-repo", "alice")));
let _repo2 = protocols[1].get_repo(&announce.repo_key).unwrap();
Some(SyncRequest {
repo_key: announce.repo_key.clone(),
want: announce.object_ids.clone(),
})
}
_ => panic!("Expected RepoAnnounce message"),
};
if let Some(sync_request) = response {
senders[1]
.send(Recipients::One(node_keys[0].1.clone()), sync_request.encode(), false)
.await
.expect("Failed to send sync request");
}
let (sender_pk, msg_data) = receivers[2].recv().await.expect("Node 3 should receive message");
assert_eq!(sender_pk, node_keys[0].1, "Message should be from Node 1");
let msg = Message::decode(&msg_data).expect("Should decode message");
let response = match msg {
Message::RepoAnnounce(announce) => {
protocols[2].register_repo(announce.repo_key.clone(), Arc::new(Repository::new("test-repo", "alice")));
Some(SyncRequest {
repo_key: announce.repo_key.clone(),
want: announce.object_ids.clone(),
})
}
_ => panic!("Expected RepoAnnounce message"),
};
if let Some(sync_request) = response {
senders[2]
.send(Recipients::One(node_keys[0].1.clone()), sync_request.encode(), false)
.await
.expect("Failed to send sync request");
}
let (sender_pk, msg_data) = receivers[0].recv().await.expect("Node 1 should receive sync request");
let msg = Message::decode(&msg_data).expect("Should decode message");
match msg {
Message::SyncRequest(request) => {
let objects: Vec<_> = request.want.iter()
.filter_map(|oid| repo1.objects.get(oid).ok())
.collect();
let object_data = ObjectData {
repo_key: request.repo_key.clone(),
objects,
};
senders[0]
.send(Recipients::One(sender_pk.clone()), object_data.encode(), false)
.await
.expect("Failed to send objects to Node 2");
}
_ => panic!("Expected SyncRequest message"),
}
let (sender_pk, msg_data) = receivers[0].recv().await.expect("Node 1 should receive second sync request");
let msg = Message::decode(&msg_data).expect("Should decode message");
match msg {
Message::SyncRequest(request) => {
let objects: Vec<_> = request.want.iter()
.filter_map(|oid| repo1.objects.get(oid).ok())
.collect();
let object_data = ObjectData {
repo_key: request.repo_key.clone(),
objects,
};
senders[0]
.send(Recipients::One(sender_pk.clone()), object_data.encode(), false)
.await
.expect("Failed to send objects to Node 3");
}
_ => panic!("Expected SyncRequest message"),
}
let (_, msg_data) = receivers[1].recv().await.expect("Node 2 should receive objects");
let msg = Message::decode(&msg_data).expect("Should decode message");
match msg {
Message::ObjectData(data) => {
let repo2 = protocols[1].get_repo(&data.repo_key).unwrap();
for obj in data.objects {
repo2.objects.put(obj);
}
repo2.refs.set("refs/heads/main", commit_id);
}
_ => panic!("Expected ObjectData message"),
}
let (_, msg_data) = receivers[2].recv().await.expect("Node 3 should receive objects");
let msg = Message::decode(&msg_data).expect("Should decode message");
match msg {
Message::ObjectData(data) => {
let repo3 = protocols[2].get_repo(&data.repo_key).unwrap();
for obj in data.objects {
repo3.objects.put(obj);
}
repo3.refs.set("refs/heads/main", commit_id);
}
_ => panic!("Expected ObjectData message"),
}
let repo2 = protocols[1].get_repo(repo_key).expect("Node 2 should have repo");
let repo3 = protocols[2].get_repo(repo_key).expect("Node 3 should have repo");
assert_eq!(repo2.objects.len(), 3, "Node 2 should have 3 objects");
assert_eq!(repo3.objects.len(), 3, "Node 3 should have 3 objects");
assert!(repo2.objects.contains(&blob_id), "Node 2 should have blob");
assert!(repo2.objects.contains(&tree_id), "Node 2 should have tree");
assert!(repo2.objects.contains(&commit_id), "Node 2 should have commit");
assert!(repo3.objects.contains(&blob_id), "Node 3 should have blob");
assert!(repo3.objects.contains(&tree_id), "Node 3 should have tree");
assert!(repo3.objects.contains(&commit_id), "Node 3 should have commit");
let refs1 = repo1.refs.get("refs/heads/main").expect("Node 1 should have main ref");
let refs2 = repo2.refs.get("refs/heads/main").expect("Node 2 should have main ref");
let refs3 = repo3.refs.get("refs/heads/main").expect("Node 3 should have main ref");
match (&refs1, &refs2, &refs3) {
(Reference::Direct(r1), Reference::Direct(r2), Reference::Direct(r3)) => {
assert_eq!(r1, r2, "Node 1 and Node 2 refs should match");
assert_eq!(r2, r3, "Node 2 and Node 3 refs should match");
assert_eq!(*r1, commit_id, "All refs should point to commit");
}
_ => panic!("All refs should be direct"),
}
let blob2 = GitObject::blob(b"Hello from Client 2 on Node 2!".to_vec());
let blob2_id = repo2.objects.put(blob2.clone());
let mut tree2_data = Vec::new();
tree2_data.extend_from_slice(b"100644 README.md\0");
tree2_data.extend_from_slice(blob_id.as_bytes());
tree2_data.extend_from_slice(b"100644 client2.txt\0");
tree2_data.extend_from_slice(blob2_id.as_bytes());
let tree2 = GitObject::new(ObjectType::Tree, Bytes::from(tree2_data));
let tree2_id = repo2.objects.put(tree2.clone());
let commit2_data = format!(
"tree {}\nparent {}\nauthor Bob <bob@example.com> 1234567891 +0000\ncommitter Bob <bob@example.com> 1234567891 +0000\n\nAdd client2.txt\n",
tree2_id.to_hex(),
commit_id.to_hex()
);
let commit2 = GitObject::new(ObjectType::Commit, Bytes::from(commit2_data));
let commit2_id = repo2.objects.put(commit2.clone());
repo2.refs.set("refs/heads/main", commit2_id);
assert_eq!(repo2.objects.len(), 6, "Node 2 should have 6 objects");
let announce2 = RepoAnnounce {
repo_key: repo_key.to_string(),
object_ids: vec![blob2_id, tree2_id, commit2_id],
refs: vec![("refs/heads/main".to_string(), commit2_id)],
};
senders[1]
.send(Recipients::All, announce2.encode(), false)
.await
.expect("Failed to send announcement from Node 2");
let (sender_pk, msg_data) = receivers[0].recv().await.expect("Node 1 should receive announcement from Node 2");
assert_eq!(sender_pk, node_keys[1].1, "Message should be from Node 2");
let msg = Message::decode(&msg_data).expect("Should decode message");
match msg {
Message::RepoAnnounce(announce) => {
let sync_request = SyncRequest {
repo_key: announce.repo_key.clone(),
want: announce.object_ids.clone(),
};
senders[0]
.send(Recipients::One(node_keys[1].1.clone()), sync_request.encode(), false)
.await
.expect("Failed to send sync request");
}
_ => panic!("Expected RepoAnnounce"),
}
let (sender_pk, msg_data) = receivers[2].recv().await.expect("Node 3 should receive announcement from Node 2");
assert_eq!(sender_pk, node_keys[1].1, "Message should be from Node 2");
let msg = Message::decode(&msg_data).expect("Should decode message");
match msg {
Message::RepoAnnounce(announce) => {
let sync_request = SyncRequest {
repo_key: announce.repo_key.clone(),
want: announce.object_ids.clone(),
};
senders[2]
.send(Recipients::One(node_keys[1].1.clone()), sync_request.encode(), false)
.await
.expect("Failed to send sync request");
}
_ => panic!("Expected RepoAnnounce"),
}
for _ in 0..2 {
let (sender_pk, msg_data) = receivers[1].recv().await.expect("Node 2 should receive sync request");
let msg = Message::decode(&msg_data).expect("Should decode message");
match msg {
Message::SyncRequest(request) => {
let objects: Vec<_> = request.want.iter()
.filter_map(|oid| repo2.objects.get(oid).ok())
.collect();
let object_data = ObjectData {
repo_key: request.repo_key.clone(),
objects,
};
senders[1]
.send(Recipients::One(sender_pk.clone()), object_data.encode(), false)
.await
.expect("Failed to send objects");
}
_ => panic!("Expected SyncRequest"),
}
}
let (_, msg_data) = receivers[0].recv().await.expect("Node 1 should receive objects");
let msg = Message::decode(&msg_data).expect("Should decode message");
match msg {
Message::ObjectData(data) => {
for obj in data.objects {
repo1.objects.put(obj);
}
repo1.refs.set("refs/heads/main", commit2_id);
}
_ => panic!("Expected ObjectData"),
}
let (_, msg_data) = receivers[2].recv().await.expect("Node 3 should receive objects");
let msg = Message::decode(&msg_data).expect("Should decode message");
match msg {
Message::ObjectData(data) => {
for obj in data.objects {
repo3.objects.put(obj);
}
repo3.refs.set("refs/heads/main", commit2_id);
}
_ => panic!("Expected ObjectData"),
}
assert_eq!(repo1.objects.len(), 6, "Node 1 should have 6 objects");
assert_eq!(repo2.objects.len(), 6, "Node 2 should have 6 objects");
assert_eq!(repo3.objects.len(), 6, "Node 3 should have 6 objects");
let all_object_ids = vec![blob_id, tree_id, commit_id, blob2_id, tree2_id, commit2_id];
for oid in &all_object_ids {
assert!(repo1.objects.contains(oid), "Node 1 should have object {}", oid.to_hex());
assert!(repo2.objects.contains(oid), "Node 2 should have object {}", oid.to_hex());
assert!(repo3.objects.contains(oid), "Node 3 should have object {}", oid.to_hex());
}
let refs1 = repo1.refs.get("refs/heads/main").expect("Node 1 should have main ref");
let refs2 = repo2.refs.get("refs/heads/main").expect("Node 2 should have main ref");
let refs3 = repo3.refs.get("refs/heads/main").expect("Node 3 should have main ref");
match (&refs1, &refs2, &refs3) {
(Reference::Direct(r1), Reference::Direct(r2), Reference::Direct(r3)) => {
assert_eq!(*r1, commit2_id, "Node 1 should point to latest commit");
assert_eq!(*r2, commit2_id, "Node 2 should point to latest commit");
assert_eq!(*r3, commit2_id, "Node 3 should point to latest commit");
}
_ => panic!("All refs should be direct"),
}
println!("SUCCESS: All 3 nodes have consistent state with 6 objects and matching refs!");
});
}
#[test]
fn test_concurrent_push_replication() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let (network, mut oracle) = Network::new(
context.with_label("network"),
SimConfig {
max_size: guts_p2p::MAX_MESSAGE_SIZE,
disconnect_on_block: true,
tracked_peer_sets: None,
},
);
network.start();
let node_keys: Vec<_> = (0..3)
.map(|i| {
let sk = ed25519::PrivateKey::from_seed(100 + i);
let pk = sk.public_key();
(sk, pk)
})
.collect();
let mut senders = Vec::new();
let mut receivers = Vec::new();
for (_, pk) in &node_keys {
let (sender, receiver) = oracle
.control(pk.clone())
.register(REPLICATION_CHANNEL)
.await
.expect("Failed to register node");
senders.push(sender);
receivers.push(receiver);
}
for i in 0..3 {
for j in 0..3 {
if i != j {
oracle
.add_link(
node_keys[i].1.clone(),
node_keys[j].1.clone(),
Link {
latency: Duration::from_millis(5),
jitter: Duration::from_millis(1),
success_rate: 1.0,
},
)
.await
.expect("Failed to add link");
}
}
}
let protocols: Vec<_> = (0..3)
.map(|_| Arc::new(ReplicationProtocol::new()))
.collect();
let repo_key = "shared/concurrent-repo";
for protocol in &protocols {
let repo = Arc::new(Repository::new("concurrent-repo", "shared"));
protocol.register_repo(repo_key.to_string(), repo);
}
let repo1 = protocols[0].get_repo(repo_key).unwrap();
let repo3 = protocols[2].get_repo(repo_key).unwrap();
let obj_a = GitObject::blob(b"Object A from Node 1".to_vec());
let obj_a_id = repo1.objects.put(obj_a.clone());
let obj_b = GitObject::blob(b"Object B from Node 3".to_vec());
let obj_b_id = repo3.objects.put(obj_b.clone());
let announce_a = RepoAnnounce {
repo_key: repo_key.to_string(),
object_ids: vec![obj_a_id],
refs: vec![],
};
let announce_b = RepoAnnounce {
repo_key: repo_key.to_string(),
object_ids: vec![obj_b_id],
refs: vec![],
};
senders[0]
.send(Recipients::All, announce_a.encode(), false)
.await
.expect("Failed to send announcement A");
senders[2]
.send(Recipients::All, announce_b.encode(), false)
.await
.expect("Failed to send announcement B");
let repo2 = protocols[1].get_repo(repo_key).unwrap();
repo2.objects.put(obj_a.clone());
repo2.objects.put(obj_b.clone());
repo1.objects.put(obj_b.clone());
repo3.objects.put(obj_a.clone());
assert!(
repo1.objects.contains(&obj_a_id),
"Node 1 should have object A"
);
assert!(
repo1.objects.contains(&obj_b_id),
"Node 1 should have object B"
);
assert!(
repo2.objects.contains(&obj_a_id),
"Node 2 should have object A"
);
assert!(
repo2.objects.contains(&obj_b_id),
"Node 2 should have object B"
);
assert!(
repo3.objects.contains(&obj_a_id),
"Node 3 should have object A"
);
assert!(
repo3.objects.contains(&obj_b_id),
"Node 3 should have object B"
);
println!("SUCCESS: Concurrent push test passed - all nodes have both objects!");
});
}
#[test]
fn test_network_partition_recovery() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let (network, mut oracle) = Network::new(
context.with_label("network"),
SimConfig {
max_size: guts_p2p::MAX_MESSAGE_SIZE,
disconnect_on_block: true,
tracked_peer_sets: None,
},
);
network.start();
let node_keys: Vec<_> = (0..2)
.map(|i| {
let sk = ed25519::PrivateKey::from_seed(200 + i);
let pk = sk.public_key();
(sk, pk)
})
.collect();
let mut senders = Vec::new();
let mut receivers = Vec::new();
for (_, pk) in &node_keys {
let (sender, receiver) = oracle
.control(pk.clone())
.register(REPLICATION_CHANNEL)
.await
.expect("Failed to register node");
senders.push(sender);
receivers.push(receiver);
}
let protocol1 = Arc::new(ReplicationProtocol::new());
let protocol2 = Arc::new(ReplicationProtocol::new());
let repo_key = "partitioned/repo";
let repo1 = Arc::new(Repository::new("repo", "partitioned"));
let repo2 = Arc::new(Repository::new("repo", "partitioned"));
protocol1.register_repo(repo_key.to_string(), repo1.clone());
protocol2.register_repo(repo_key.to_string(), repo2.clone());
let blob = GitObject::blob(b"Created during partition".to_vec());
let blob_id = repo1.objects.put(blob.clone());
assert_eq!(repo1.objects.len(), 1, "Node 1 should have 1 object");
assert_eq!(
repo2.objects.len(),
0,
"Node 2 should have 0 objects (partitioned)"
);
oracle
.add_link(
node_keys[0].1.clone(),
node_keys[1].1.clone(),
Link {
latency: Duration::from_millis(10),
jitter: Duration::from_millis(1),
success_rate: 1.0,
},
)
.await
.expect("Failed to add link");
oracle
.add_link(
node_keys[1].1.clone(),
node_keys[0].1.clone(),
Link {
latency: Duration::from_millis(10),
jitter: Duration::from_millis(1),
success_rate: 1.0,
},
)
.await
.expect("Failed to add link");
let announce = RepoAnnounce {
repo_key: repo_key.to_string(),
object_ids: vec![blob_id],
refs: vec![],
};
senders[0]
.send(Recipients::All, announce.encode(), false)
.await
.expect("Failed to send announcement");
let (_, msg_data) = receivers[1]
.recv()
.await
.expect("Node 2 should receive announcement");
let msg = Message::decode(&msg_data).expect("Should decode message");
match msg {
Message::RepoAnnounce(announce) => {
let sync_request = SyncRequest {
repo_key: announce.repo_key.clone(),
want: announce.object_ids.clone(),
};
senders[1]
.send(
Recipients::One(node_keys[0].1.clone()),
sync_request.encode(),
false,
)
.await
.expect("Failed to send sync request");
}
_ => panic!("Expected RepoAnnounce"),
}
let (_, msg_data) = receivers[0]
.recv()
.await
.expect("Node 1 should receive sync request");
let msg = Message::decode(&msg_data).expect("Should decode message");
match msg {
Message::SyncRequest(request) => {
let objects: Vec<_> = request
.want
.iter()
.filter_map(|oid| repo1.objects.get(oid).ok())
.collect();
let object_data = ObjectData {
repo_key: request.repo_key.clone(),
objects,
};
senders[0]
.send(
Recipients::One(node_keys[1].1.clone()),
object_data.encode(),
false,
)
.await
.expect("Failed to send objects");
}
_ => panic!("Expected SyncRequest"),
}
let (_, msg_data) = receivers[1]
.recv()
.await
.expect("Node 2 should receive objects");
let msg = Message::decode(&msg_data).expect("Should decode message");
match msg {
Message::ObjectData(data) => {
for obj in data.objects {
repo2.objects.put(obj);
}
}
_ => panic!("Expected ObjectData"),
}
assert_eq!(repo1.objects.len(), 1, "Node 1 should have 1 object");
assert_eq!(repo2.objects.len(), 1, "Node 2 should have 1 object");
assert!(
repo2.objects.contains(&blob_id),
"Node 2 should have the blob created during partition"
);
println!("SUCCESS: Network partition recovery test passed!");
});
}