use chaincraft::{
clear_local_registry,
network::PeerId,
shared::MessageType,
shared::SharedMessage,
shared_object::{ApplicationObject, MerkelizedChain},
storage::MemoryStorage,
ChaincraftNode,
};
use std::sync::Arc;
use tokio::time::{sleep, Duration};
async fn create_network_ephemeral(num_nodes: usize) -> Vec<ChaincraftNode> {
clear_local_registry();
let mut nodes = Vec::new();
for _ in 0..num_nodes {
let id = PeerId::new();
let storage = Arc::new(MemoryStorage::new());
let mut node = ChaincraftNode::new(id, storage);
node.set_port(0);
node.disable_local_discovery();
let chain: Box<dyn ApplicationObject> = Box::new(MerkelizedChain::new());
node.add_shared_object(chain).await.unwrap();
node.start().await.unwrap();
nodes.push(node);
}
nodes
}
async fn connect_nodes(nodes: &mut [ChaincraftNode]) {
let num_nodes = nodes.len();
for i in 0..num_nodes {
for j in 0..num_nodes {
if i == j {
continue;
}
let addr = format!("{}:{}", nodes[j].host(), nodes[j].port());
let _ = nodes[i].connect_to_peer(&addr).await;
}
}
}
async fn wait_for_chain_sync(
nodes: &[ChaincraftNode],
min_length: usize,
timeout_secs: u64,
) -> bool {
let start = std::time::Instant::now();
let timeout = Duration::from_secs(timeout_secs);
while start.elapsed() < timeout {
let mut lengths = Vec::new();
for node in nodes {
let mut shared_objects = node.shared_objects().await;
if let Some(mut obj) = shared_objects.pop() {
let obj_any = obj.as_any_mut();
if let Some(chain) = obj_any.downcast_mut::<MerkelizedChain>() {
lengths.push(chain.chain_length());
}
}
}
let all_at_min = lengths.iter().all(|&l| l >= min_length);
let all_same = lengths.windows(2).all(|w| w[0] == w[1]);
if all_at_min && all_same && !lengths.is_empty() {
return true;
}
sleep(Duration::from_millis(500)).await;
}
false
}
async fn add_hash_and_broadcast(node: &mut ChaincraftNode, hash: &str) {
let data = serde_json::json!(hash);
node.create_shared_message_with_data(data).await.unwrap();
}
#[tokio::test]
async fn test_merkelized_chain_basic() {
let id = PeerId::new();
let storage = Arc::new(MemoryStorage::new());
let mut node = ChaincraftNode::new(id, storage);
node.set_port(0);
let chain: Box<dyn ApplicationObject> = Box::new(MerkelizedChain::new());
let _chain_id = node.add_shared_object(chain).await.unwrap();
node.start().await.unwrap();
let mut shared_objects = node.shared_objects().await;
let chain = shared_objects
.first_mut()
.unwrap()
.as_any_mut()
.downcast_mut::<MerkelizedChain>()
.unwrap();
assert_eq!(chain.chain_length(), 1); assert!(!chain.genesis_hash().is_empty());
chain.add_next_hash();
chain.add_next_hash();
chain.add_next_hash();
assert_eq!(chain.chain_length(), 4);
node.close().await.unwrap();
}
#[tokio::test]
async fn test_shared_object_chain_propagation() {
let num_nodes = 5;
let mut nodes = create_network_ephemeral(num_nodes).await;
connect_nodes(&mut nodes).await;
sleep(Duration::from_secs(2)).await;
let next_hash = {
let mut shared_objects = nodes[0].shared_objects().await;
let chain = shared_objects
.first_mut()
.unwrap()
.as_any_mut()
.downcast_mut::<MerkelizedChain>()
.unwrap();
MerkelizedChain::calculate_next_hash(chain.latest_hash())
};
add_hash_and_broadcast(&mut nodes[0], &next_hash).await;
sleep(Duration::from_millis(100)).await;
let next_hash2 = {
let mut shared_objects = nodes[0].shared_objects().await;
let chain = shared_objects
.first_mut()
.unwrap()
.as_any_mut()
.downcast_mut::<MerkelizedChain>()
.unwrap();
MerkelizedChain::calculate_next_hash(chain.latest_hash())
};
add_hash_and_broadcast(&mut nodes[0], &next_hash2).await;
sleep(Duration::from_millis(100)).await;
let next_hash3 = {
let mut shared_objects = nodes[0].shared_objects().await;
let chain = shared_objects
.first_mut()
.unwrap()
.as_any_mut()
.downcast_mut::<MerkelizedChain>()
.unwrap();
MerkelizedChain::calculate_next_hash(chain.latest_hash())
};
add_hash_and_broadcast(&mut nodes[0], &next_hash3).await;
let expected_length = 4;
assert!(
wait_for_chain_sync(&nodes, expected_length, 30).await,
"Chain did not sync to expected length {expected_length} within timeout"
);
let first_chain = {
let mut shared_objects = nodes[0].shared_objects().await;
let chain = shared_objects
.first_mut()
.unwrap()
.as_any_mut()
.downcast_mut::<MerkelizedChain>()
.unwrap();
chain.chain().to_vec()
};
for (i, node) in nodes.iter().enumerate() {
let mut shared_objects = node.shared_objects().await;
let chain = shared_objects
.first_mut()
.unwrap()
.as_any_mut()
.downcast_mut::<MerkelizedChain>()
.unwrap();
assert_eq!(chain.chain_length(), expected_length, "Node {i} has incorrect chain length");
let node_chain: Vec<String> = chain
.chain()
.iter()
.take(expected_length)
.cloned()
.collect();
assert_eq!(node_chain, first_chain, "Node {i} chain doesn't match expected prefix");
}
println!("All {num_nodes} nodes synced to chain length {expected_length}!");
for mut node in nodes {
node.close().await.unwrap();
}
}
#[tokio::test]
async fn test_digest_based_sync() {
let num_nodes = 3;
let mut nodes = create_network_ephemeral(num_nodes).await;
connect_nodes(&mut nodes).await;
sleep(Duration::from_secs(2)).await;
let hash1 = {
let mut shared_objects = nodes[0].shared_objects().await;
let chain = shared_objects
.first_mut()
.unwrap()
.as_any_mut()
.downcast_mut::<MerkelizedChain>()
.unwrap();
MerkelizedChain::calculate_next_hash(chain.latest_hash())
};
add_hash_and_broadcast(&mut nodes[0], &hash1).await;
sleep(Duration::from_millis(100)).await;
let hash2 = {
let mut shared_objects = nodes[0].shared_objects().await;
let chain = shared_objects
.first_mut()
.unwrap()
.as_any_mut()
.downcast_mut::<MerkelizedChain>()
.unwrap();
MerkelizedChain::calculate_next_hash(chain.latest_hash())
};
add_hash_and_broadcast(&mut nodes[0], &hash2).await;
assert!(wait_for_chain_sync(&nodes, 3, 30).await, "Chain did not sync to length >= 3");
let mut shared_objects = nodes[0].shared_objects().await;
let obj = shared_objects.first_mut().unwrap();
let genesis = {
let any = obj.as_any_mut();
let chain = any.downcast_mut::<MerkelizedChain>().unwrap();
chain.genesis_hash().to_string()
};
let messages = obj.gossip_messages(Some(&genesis)).await.unwrap();
assert!(!messages.is_empty(), "Should have messages after genesis");
println!("Got {} messages since genesis", messages.len());
for mut node in nodes {
node.close().await.unwrap();
}
}
#[tokio::test]
async fn test_chain_hash_validation() {
let id = PeerId::new();
let storage = Arc::new(MemoryStorage::new());
let mut node = ChaincraftNode::new(id, storage);
node.set_port(0);
let chain: Box<dyn ApplicationObject> = Box::new(MerkelizedChain::new());
let _chain_id = node.add_shared_object(chain).await.unwrap();
node.start().await.unwrap();
let mut shared_objects = node.shared_objects().await;
let chain = shared_objects
.first_mut()
.unwrap()
.as_any_mut()
.downcast_mut::<MerkelizedChain>()
.unwrap();
let next_hash = MerkelizedChain::calculate_next_hash(chain.latest_hash());
let valid_msg = SharedMessage::new(
MessageType::Custom("chain_update".to_string()),
serde_json::json!(next_hash),
);
assert!(chain.is_valid(&valid_msg).await.unwrap(), "Should accept valid next hash");
let invalid_hash = "invalid_hash_not_following_chain";
let invalid_msg = SharedMessage::new(
MessageType::Custom("chain_update".to_string()),
serde_json::json!(invalid_hash),
);
assert!(!chain.is_valid(&invalid_msg).await.unwrap(), "Should reject invalid hash");
node.close().await.unwrap();
}
#[tokio::test]
async fn test_deduplication() {
let id = PeerId::new();
let storage = Arc::new(MemoryStorage::new());
let mut node = ChaincraftNode::new(id, storage);
node.set_port(0);
let chain: Box<dyn ApplicationObject> = Box::new(MerkelizedChain::new());
let _chain_id = node.add_shared_object(chain).await.unwrap();
node.start().await.unwrap();
let mut shared_objects = node.shared_objects().await;
let chain = shared_objects
.first_mut()
.unwrap()
.as_any_mut()
.downcast_mut::<MerkelizedChain>()
.unwrap();
chain.add_next_hash();
let len_before = chain.chain_length();
let existing_hash = chain.latest_hash().to_string();
let dup_msg = SharedMessage::new(
MessageType::Custom("chain_update".to_string()),
serde_json::json!(existing_hash),
);
chain.add_message(dup_msg.clone()).await.unwrap();
assert_eq!(
chain.chain_length(),
len_before,
"Duplicate hash should not increase chain length"
);
node.close().await.unwrap();
}