#![allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
use super::TestHarness;
use ant_node::client::compute_address;
use ant_node::replication::config::REPLICATION_PROTOCOL_ID;
use ant_node::replication::protocol::{
compute_audit_digest, AuditChallenge, AuditResponse, FetchRequest, FetchResponse,
FreshReplicationOffer, FreshReplicationResponse, NeighborSyncRequest, ReplicationMessage,
ReplicationMessageBody, VerificationRequest, ABSENT_KEY_DIGEST,
};
use ant_node::replication::scheduling::ReplicationQueues;
use saorsa_core::identity::PeerId;
use saorsa_core::{P2PNode, TrustEvent};
use serial_test::serial;
use std::time::Duration;
/// Maximum time a test will wait for replicated state to appear on other nodes.
const PROPAGATION_TIMEOUT: Duration = Duration::from_secs(15);
/// Interval between polls while waiting for propagation to complete.
const PROPAGATION_POLL_INTERVAL: Duration = Duration::from_millis(200);
/// Encodes `msg`, sends it to `target` over the replication protocol, and
/// decodes the peer's reply. Any encode/transport/decode failure panics via
/// `expect`, which is the desired behavior inside these tests.
async fn send_replication_request(
    sender: &P2PNode,
    target: &PeerId,
    msg: ReplicationMessage,
    timeout: Duration,
) -> ReplicationMessage {
    let payload = msg.encode().expect("encode replication request");
    let raw = sender
        .send_request(target, REPLICATION_PROTOCOL_ID, payload, timeout)
        .await
        .expect("send_request");
    ReplicationMessage::decode(&raw.data).expect("decode replication response")
}
/// A chunk freshly stored and replicated via `replicate_fresh` must appear on
/// at least one other node within `PROPAGATION_TIMEOUT`.
#[tokio::test]
#[serial]
async fn test_fresh_replication_propagates_to_close_group() {
    let harness = TestHarness::setup_minimal().await.expect("setup");
    harness.warmup_dht().await.expect("warmup");
    // Fix: the original crammed these two statements onto a single line
    // (rustfmt violation).
    let source_idx = 3;
    let source = harness.test_node(source_idx).expect("source node");
    let source_protocol = source.ant_protocol.as_ref().expect("protocol");
    let source_storage = source_protocol.storage();
    let content = b"hello replication world";
    let address = compute_address(content);
    source_storage.put(&address, content).await.expect("put");
    // Pre-seed every node's payment-verifier cache so the dummy PoP below is
    // accepted without a real payment proof.
    for i in 0..harness.node_count() {
        if let Some(node) = harness.test_node(i) {
            if let Some(protocol) = &node.ant_protocol {
                protocol.payment_verifier().cache_insert(address);
            }
        }
    }
    let dummy_pop = [0x01u8; 64];
    if let Some(ref engine) = source.replication_engine {
        engine.replicate_fresh(&address, content, &dummy_pop).await;
    }
    // Poll all non-source nodes until the chunk shows up or we time out.
    let deadline = tokio::time::Instant::now() + PROPAGATION_TIMEOUT;
    let mut found_on_other = false;
    while tokio::time::Instant::now() < deadline {
        for i in 0..harness.node_count() {
            if i == source_idx {
                continue;
            }
            if let Some(node) = harness.test_node(i) {
                if let Some(protocol) = &node.ant_protocol {
                    if protocol.storage().exists(&address).unwrap_or(false) {
                        found_on_other = true;
                    }
                }
            }
        }
        if found_on_other {
            break;
        }
        tokio::time::sleep(PROPAGATION_POLL_INTERVAL).await;
    }
    assert!(
        found_on_other,
        "Chunk should have replicated to at least one other node"
    );
    harness.teardown().await.expect("teardown");
}
/// A key inserted into the paid list must still be readable after the
/// replication engine is shut down and the list is reopened from disk.
#[tokio::test]
#[serial]
async fn test_paid_list_persistence() {
    let mut harness = TestHarness::setup_minimal().await.expect("setup");
    let key = [0xAA; 32];
    // Insert the key on node 3 and remember where its data lives on disk.
    let dir = {
        let node = harness.test_node(3).expect("node");
        if let Some(ref engine) = node.replication_engine {
            engine.paid_list().insert(&key).await.expect("insert");
            assert!(engine.paid_list().contains(&key).expect("contains"));
        }
        node.data_dir.clone()
    };
    // Tear the engine down so the on-disk state is the only remaining copy.
    {
        let node = harness.network_mut().node_mut(3).expect("node");
        if let Some(ref mut engine) = node.replication_engine {
            engine.shutdown().await;
        }
        node.replication_engine = None;
        node.replication_shutdown = None;
    }
    // Reopen from the same directory and confirm the key survived.
    let reopened = ant_node::replication::paid_list::PaidList::new(&dir)
        .await
        .expect("reopen");
    assert!(reopened.contains(&key).expect("contains after reopen"));
    harness.teardown().await.expect("teardown");
}
/// A VerificationRequest must report per-key presence: stored keys come back
/// present, unknown keys come back absent.
#[tokio::test]
#[serial]
async fn test_verification_request_returns_presence() {
    let harness = TestHarness::setup_minimal().await.expect("setup");
    harness.warmup_dht().await.expect("warmup");
    let node_a = harness.test_node(3).expect("node_a");
    let node_b = harness.test_node(4).expect("node_b");
    let peer_a = *node_a.p2p_node.as_ref().expect("p2p_a").peer_id();
    let p2p_b = node_b.p2p_node.as_ref().expect("p2p_b");

    // Store one chunk on node A; the second queried key is never stored.
    let content = b"verification test data";
    let address = compute_address(content);
    node_a
        .ant_protocol
        .as_ref()
        .expect("protocol_a")
        .storage()
        .put(&address, content)
        .await
        .expect("put");
    let missing_key = [0xBB; 32];

    let msg = ReplicationMessage {
        request_id: 42,
        body: ReplicationMessageBody::VerificationRequest(VerificationRequest {
            keys: vec![address, missing_key],
            paid_list_check_indices: vec![],
        }),
    };
    let reply = send_replication_request(p2p_b, &peer_a, msg, Duration::from_secs(10)).await;
    match reply.body {
        ReplicationMessageBody::VerificationResponse(resp) => {
            assert_eq!(resp.results.len(), 2);
            assert!(resp.results[0].present, "First key should be present");
            assert!(!resp.results[1].present, "Second key should be absent");
        }
        _ => panic!("Expected VerificationResponse"),
    }
    harness.teardown().await.expect("teardown");
}
/// Fetching a key that node A holds must return exactly the stored bytes.
#[tokio::test]
#[serial]
async fn test_fetch_request_returns_record() {
    let harness = TestHarness::setup_minimal().await.expect("setup");
    harness.warmup_dht().await.expect("warmup");
    let node_a = harness.test_node(3).expect("node_a");
    let node_b = harness.test_node(4).expect("node_b");
    let peer_a = *node_a.p2p_node.as_ref().expect("p2p_a").peer_id();
    let p2p_b = node_b.p2p_node.as_ref().expect("p2p_b");

    let content = b"fetch me please";
    let address = compute_address(content);
    node_a
        .ant_protocol
        .as_ref()
        .expect("protocol_a")
        .storage()
        .put(&address, content)
        .await
        .expect("put");

    let msg = ReplicationMessage {
        request_id: 99,
        body: ReplicationMessageBody::FetchRequest(FetchRequest { key: address }),
    };
    let reply = send_replication_request(p2p_b, &peer_a, msg, Duration::from_secs(10)).await;
    match reply.body {
        ReplicationMessageBody::FetchResponse(FetchResponse::Success { key, data }) => {
            assert_eq!(key, address);
            assert_eq!(data, content);
        }
        _ => panic!("Expected FetchResponse::Success"),
    }
    harness.teardown().await.expect("teardown");
}
/// An audit challenge over a stored key must return the digest computed from
/// (nonce, challenged peer id, key, content).
#[tokio::test]
#[serial]
async fn test_audit_challenge_returns_correct_digest() {
    let harness = TestHarness::setup_minimal().await.expect("setup");
    harness.warmup_dht().await.expect("warmup");
    let node_a = harness.test_node(3).expect("node_a");
    let node_b = harness.test_node(4).expect("node_b");
    let peer_a = *node_a.p2p_node.as_ref().expect("p2p_a").peer_id();
    let p2p_b = node_b.p2p_node.as_ref().expect("p2p_b");

    let content = b"audit test data";
    let address = compute_address(content);
    node_a
        .ant_protocol
        .as_ref()
        .expect("protocol_a")
        .storage()
        .put(&address, content)
        .await
        .expect("put");

    let nonce = [0x42u8; 32];
    let msg = ReplicationMessage {
        request_id: 1234,
        body: ReplicationMessageBody::AuditChallenge(AuditChallenge {
            challenge_id: 1234,
            nonce,
            challenged_peer_id: *peer_a.as_bytes(),
            keys: vec![address],
        }),
    };
    let reply = send_replication_request(p2p_b, &peer_a, msg, Duration::from_secs(10)).await;
    match reply.body {
        ReplicationMessageBody::AuditResponse(AuditResponse::Digests {
            challenge_id,
            digests,
        }) => {
            assert_eq!(challenge_id, 1234);
            assert_eq!(digests.len(), 1);
            let expected = compute_audit_digest(&nonce, peer_a.as_bytes(), &address, content);
            assert_eq!(digests[0], expected);
        }
        _ => panic!("Expected AuditResponse::Digests"),
    }
    harness.teardown().await.expect("teardown");
}
/// Auditing a key the challenged node does not hold must yield the sentinel
/// digest rather than a real one.
#[tokio::test]
#[serial]
async fn test_audit_absent_key_returns_sentinel() {
    let harness = TestHarness::setup_minimal().await.expect("setup");
    harness.warmup_dht().await.expect("warmup");
    let node_a = harness.test_node(3).expect("node_a");
    let node_b = harness.test_node(4).expect("node_b");
    let peer_a = *node_a.p2p_node.as_ref().expect("p2p_a").peer_id();
    let p2p_b = node_b.p2p_node.as_ref().expect("p2p_b");

    let missing_key = [0xDD; 32];
    let nonce = [0x11u8; 32];
    let msg = ReplicationMessage {
        request_id: 5678,
        body: ReplicationMessageBody::AuditChallenge(AuditChallenge {
            challenge_id: 5678,
            nonce,
            challenged_peer_id: *peer_a.as_bytes(),
            keys: vec![missing_key],
        }),
    };
    let reply = send_replication_request(p2p_b, &peer_a, msg, Duration::from_secs(10)).await;
    match reply.body {
        ReplicationMessageBody::AuditResponse(AuditResponse::Digests { digests, .. }) => {
            assert_eq!(digests.len(), 1);
            assert_eq!(
                digests[0], ABSENT_KEY_DIGEST,
                "Absent key should return sentinel digest"
            );
        }
        _ => panic!("Expected AuditResponse::Digests"),
    }
    harness.teardown().await.expect("teardown");
}
/// Fetching a key that nobody stored must produce FetchResponse::NotFound.
#[tokio::test]
#[serial]
async fn test_fetch_not_found() {
    let harness = TestHarness::setup_minimal().await.expect("setup");
    harness.warmup_dht().await.expect("warmup");
    let node_a = harness.test_node(3).expect("node_a");
    let node_b = harness.test_node(4).expect("node_b");
    let peer_a = *node_a.p2p_node.as_ref().expect("p2p_a").peer_id();
    let p2p_b = node_b.p2p_node.as_ref().expect("p2p_b");

    let missing_key = [0xEE; 32];
    let msg = ReplicationMessage {
        request_id: 77,
        body: ReplicationMessageBody::FetchRequest(FetchRequest { key: missing_key }),
    };
    let reply = send_replication_request(p2p_b, &peer_a, msg, Duration::from_secs(10)).await;
    let is_not_found = matches!(
        reply.body,
        ReplicationMessageBody::FetchResponse(FetchResponse::NotFound { .. })
    );
    assert!(is_not_found, "Expected FetchResponse::NotFound");
    harness.teardown().await.expect("teardown");
}
/// Verification with a paid-list check index must report both presence and
/// paid status for a stored, paid-for key.
#[tokio::test]
#[serial]
async fn test_verification_with_paid_list_check() {
    let harness = TestHarness::setup_minimal().await.expect("setup");
    harness.warmup_dht().await.expect("warmup");
    let node_a = harness.test_node(3).expect("node_a");
    let node_b = harness.test_node(4).expect("node_b");
    let peer_a = *node_a.p2p_node.as_ref().expect("p2p_a").peer_id();
    let p2p_b = node_b.p2p_node.as_ref().expect("p2p_b");

    // Store the chunk and mark it paid on node A.
    let content = b"paid test data";
    let address = compute_address(content);
    node_a
        .ant_protocol
        .as_ref()
        .expect("protocol_a")
        .storage()
        .put(&address, content)
        .await
        .expect("put");
    if let Some(ref engine) = node_a.replication_engine {
        engine
            .paid_list()
            .insert(&address)
            .await
            .expect("paid_list insert");
    }

    let msg = ReplicationMessage {
        request_id: 55,
        body: ReplicationMessageBody::VerificationRequest(VerificationRequest {
            keys: vec![address],
            paid_list_check_indices: vec![0],
        }),
    };
    let reply = send_replication_request(p2p_b, &peer_a, msg, Duration::from_secs(10)).await;
    match reply.body {
        ReplicationMessageBody::VerificationResponse(resp) => {
            assert_eq!(resp.results.len(), 1);
            assert!(resp.results[0].present, "Key should be present");
            assert_eq!(
                resp.results[0].paid,
                Some(true),
                "Key should be in PaidForList"
            );
        }
        _ => panic!("Expected VerificationResponse"),
    }
    harness.teardown().await.expect("teardown");
}
#[tokio::test]
#[serial]
async fn test_fresh_offer_with_empty_pop_rejected() {
let harness = TestHarness::setup_minimal().await.expect("setup");
harness.warmup_dht().await.expect("warmup");
let node_a = harness.test_node(3).expect("node_a");
let node_b = harness.test_node(4).expect("node_b");
let p2p_b = node_b.p2p_node.as_ref().expect("p2p_b");
let peer_a = *node_a.p2p_node.as_ref().expect("p2p_a").peer_id();
let content = b"invalid pop test";
let address = ant_node::client::compute_address(content);
let offer = FreshReplicationOffer {
key: address,
data: content.to_vec(),
proof_of_payment: vec![], };
let msg = ReplicationMessage {
request_id: 1000,
body: ReplicationMessageBody::FreshReplicationOffer(offer),
};
let resp_msg = send_replication_request(p2p_b, &peer_a, msg, Duration::from_secs(10)).await;
match resp_msg.body {
ReplicationMessageBody::FreshReplicationResponse(FreshReplicationResponse::Rejected {
reason,
..
}) => {
assert!(
reason.contains("proof of payment") || reason.contains("Missing"),
"Should mention missing PoP, got: {reason}"
);
}
other => panic!("Expected Rejected, got: {other:?}"),
}
let protocol_a = node_a.ant_protocol.as_ref().expect("protocol");
assert!(
!protocol_a.storage().exists(&address).unwrap_or(false),
"Chunk should not be stored with empty PoP"
);
harness.teardown().await.expect("teardown");
}
#[tokio::test]
#[serial]
async fn test_neighbor_sync_request_returns_hints() {
let harness = TestHarness::setup_minimal().await.expect("setup");
harness.warmup_dht().await.expect("warmup");
let node_a = harness.test_node(3).expect("node_a");
let node_b = harness.test_node(4).expect("node_b");
let p2p_b = node_b.p2p_node.as_ref().expect("p2p_b");
let peer_a = *node_a.p2p_node.as_ref().expect("p2p_a").peer_id();
let content = b"sync test data";
let address = ant_node::client::compute_address(content);
let protocol_a = node_a.ant_protocol.as_ref().expect("protocol");
protocol_a
.storage()
.put(&address, content)
.await
.expect("put");
let request = NeighborSyncRequest {
replica_hints: vec![],
paid_hints: vec![],
bootstrapping: false,
};
let msg = ReplicationMessage {
request_id: 2000,
body: ReplicationMessageBody::NeighborSyncRequest(request),
};
let resp_msg = send_replication_request(p2p_b, &peer_a, msg, Duration::from_secs(10)).await;
match resp_msg.body {
ReplicationMessageBody::NeighborSyncResponse(resp) => {
assert!(!resp.bootstrapping, "Node A shouldn't claim bootstrapping");
}
other => panic!("Expected NeighborSyncResponse, got: {other:?}"),
}
harness.teardown().await.expect("teardown");
}
#[tokio::test]
#[serial]
async fn test_audit_challenge_multi_key() {
let harness = TestHarness::setup_minimal().await.expect("setup");
harness.warmup_dht().await.expect("warmup");
let node_a = harness.test_node(3).expect("node_a");
let node_b = harness.test_node(4).expect("node_b");
let p2p_a = node_a.p2p_node.as_ref().expect("p2p_a");
let protocol_a = node_a.ant_protocol.as_ref().expect("protocol_a");
let c1 = b"audit multi key 1";
let c2 = b"audit multi key 2";
let c3 = b"audit multi key 3 (padding)";
let c4 = b"audit multi key 4 (padding)";
let a1 = ant_node::client::compute_address(c1);
let a2 = ant_node::client::compute_address(c2);
let a3 = ant_node::client::compute_address(c3);
let a4 = ant_node::client::compute_address(c4);
protocol_a.storage().put(&a1, c1).await.expect("put 1");
protocol_a.storage().put(&a2, c2).await.expect("put 2");
protocol_a.storage().put(&a3, c3).await.expect("put 3");
protocol_a.storage().put(&a4, c4).await.expect("put 4");
let absent_key = [0xCC; 32];
let peer_a = *p2p_a.peer_id();
let nonce = [0x55; 32];
let challenge = AuditChallenge {
challenge_id: 3000,
nonce,
challenged_peer_id: *peer_a.as_bytes(),
keys: vec![a1, absent_key, a2],
};
let msg = ReplicationMessage {
request_id: 3000,
body: ReplicationMessageBody::AuditChallenge(challenge),
};
let p2p_b = node_b.p2p_node.as_ref().expect("p2p_b");
let resp_msg = send_replication_request(p2p_b, &peer_a, msg, Duration::from_secs(10)).await;
if let ReplicationMessageBody::AuditResponse(AuditResponse::Digests {
challenge_id,
digests,
}) = resp_msg.body
{
assert_eq!(challenge_id, 3000);
assert_eq!(digests.len(), 3);
let expected_1 = compute_audit_digest(&nonce, peer_a.as_bytes(), &a1, c1);
assert_eq!(digests[0], expected_1, "First key digest should match");
assert_eq!(
digests[1], ABSENT_KEY_DIGEST,
"Absent key should be sentinel"
);
let expected_2 = compute_audit_digest(&nonce, peer_a.as_bytes(), &a2, c2);
assert_eq!(digests[2], expected_2, "Third key digest should match");
} else {
panic!("Expected AuditResponse::Digests");
}
harness.teardown().await.expect("teardown");
}
/// A fetch for an all-zero (never stored) key must come back NotFound rather
/// than an error or a bogus record.
#[tokio::test]
#[serial]
async fn test_fetch_returns_error_for_corrupt_key() {
    let harness = TestHarness::setup_minimal().await.expect("setup");
    harness.warmup_dht().await.expect("warmup");
    let node_a = harness.test_node(3).expect("node_a");
    let node_b = harness.test_node(4).expect("node_b");
    let peer_a = *node_a.p2p_node.as_ref().expect("p2p_a").peer_id();
    let p2p_b = node_b.p2p_node.as_ref().expect("p2p_b");

    let fake_key = [0x00; 32];
    let msg = ReplicationMessage {
        request_id: 4000,
        body: ReplicationMessageBody::FetchRequest(FetchRequest { key: fake_key }),
    };
    let reply = send_replication_request(p2p_b, &peer_a, msg, Duration::from_secs(10)).await;
    let not_found = matches!(
        reply.body,
        ReplicationMessageBody::FetchResponse(FetchResponse::NotFound { .. })
    );
    assert!(not_found, "Expected NotFound for non-existent key");
    harness.teardown().await.expect("teardown");
}
/// Fresh replication should both copy the chunk to another node and
/// propagate the paid-list entry (scenarios 1 and 24 combined).
#[tokio::test]
#[serial]
async fn scenario_1_and_24_fresh_replication_stores_and_propagates_paid_list() {
    let harness = TestHarness::setup_minimal().await.expect("setup");
    harness.warmup_dht().await.expect("warmup");
    let source_idx = 3;
    let source = harness.test_node(source_idx).expect("source");
    let content = b"scenario 3 quorum pass test";
    let address = compute_address(content);
    source
        .ant_protocol
        .as_ref()
        .expect("protocol")
        .storage()
        .put(&address, content)
        .await
        .expect("put");
    // Seed every payment-verifier cache so the dummy PoP is accepted.
    for idx in 0..harness.node_count() {
        if let Some(node) = harness.test_node(idx) {
            if let Some(p) = &node.ant_protocol {
                p.payment_verifier().cache_insert(address);
            }
        }
    }
    let dummy_pop = [0x01u8; 64];
    if let Some(ref engine) = source.replication_engine {
        engine.replicate_fresh(&address, content, &dummy_pop).await;
    }
    // Poll until both conditions hold somewhere else, or the deadline hits.
    let deadline = tokio::time::Instant::now() + PROPAGATION_TIMEOUT;
    let mut stored_elsewhere = false;
    let mut paid_listed_elsewhere = false;
    loop {
        for idx in 0..harness.node_count() {
            if idx == source_idx {
                continue;
            }
            if let Some(node) = harness.test_node(idx) {
                if let Some(p) = &node.ant_protocol {
                    if p.storage().exists(&address).unwrap_or(false) {
                        stored_elsewhere = true;
                    }
                }
                if let Some(ref engine) = node.replication_engine {
                    if engine.paid_list().contains(&address).unwrap_or(false) {
                        paid_listed_elsewhere = true;
                    }
                }
            }
        }
        if (stored_elsewhere && paid_listed_elsewhere) || tokio::time::Instant::now() >= deadline {
            break;
        }
        tokio::time::sleep(PROPAGATION_POLL_INTERVAL).await;
    }
    assert!(
        stored_elsewhere,
        "Chunk should be stored on at least one other node"
    );
    assert!(
        paid_listed_elsewhere,
        "Key should be in PaidForList on at least one other node"
    );
    harness.teardown().await.expect("teardown");
}
/// A failed fetch should retry against the remaining enqueued source, and a
/// second retry with no sources left should yield None.
#[test]
#[serial]
fn scenario_9_fetch_retry_uses_alternate_source() {
    // Fix: this is pure queue logic with no `.await`, so a plain `#[test]`
    // suffices — the original spun up a tokio runtime for nothing.
    let mut queues = ReplicationQueues::new();
    let key = [0x09; 32];
    let distance = [0x01; 32];
    let source_a = PeerId::from_bytes([0xA0; 32]);
    let source_b = PeerId::from_bytes([0xB0; 32]);
    queues.enqueue_fetch(key, distance, vec![source_a, source_b]);
    let candidate = queues.dequeue_fetch().expect("dequeue");
    queues.start_fetch(key, source_a, candidate.sources);
    // First retry falls back to the remaining source.
    let next = queues.retry_fetch(&key);
    assert_eq!(next, Some(source_b), "Should retry with alternate source");
    // Second retry has no sources left.
    let exhausted = queues.retry_fetch(&key);
    assert!(exhausted.is_none(), "No more sources available");
}
/// When a fetch's single source is exhausted, retry returns None and
/// completing the fetch clears the in-flight entry.
#[test]
#[serial]
fn scenario_10_fetch_retry_exhaustion() {
    // Fix: this is pure queue logic with no `.await`, so a plain `#[test]`
    // suffices — the original spun up a tokio runtime for nothing.
    let mut queues = ReplicationQueues::new();
    let key = [0x10; 32];
    let distance = [0x01; 32];
    let source = PeerId::from_bytes([0xC0; 32]);
    queues.enqueue_fetch(key, distance, vec![source]);
    let _candidate = queues.dequeue_fetch().expect("dequeue");
    queues.start_fetch(key, source, vec![source]);
    // The only source has already been tried; no retry target remains.
    let next = queues.retry_fetch(&key);
    assert!(next.is_none(), "Single source exhausted");
    // Completing the fetch must remove the in-flight entry.
    let entry = queues.complete_fetch(&key);
    assert!(entry.is_some(), "Should have in-flight entry to complete");
    assert_eq!(queues.in_flight_count(), 0);
}
/// Reporting repeated application failures against a peer must lower the
/// local trust score for that peer.
#[tokio::test]
#[serial]
async fn scenario_11_repeated_failures_decrease_trust() {
    let harness = TestHarness::setup_minimal().await.expect("setup");
    harness.warmup_dht().await.expect("warmup");
    let node_a = harness.test_node(3).expect("node_a");
    let node_b = harness.test_node(4).expect("node_b");
    let p2p_a = node_a.p2p_node.as_ref().expect("p2p_a");
    let peer_b = *node_b.p2p_node.as_ref().expect("p2p_b").peer_id();

    let initial_trust = p2p_a.peer_trust(&peer_b);
    // Report several weighted failures against peer B.
    let failure_count = 5;
    let failure_weight = 3.0;
    for _ in 0..failure_count {
        p2p_a
            .report_trust_event(&peer_b, TrustEvent::ApplicationFailure(failure_weight))
            .await;
    }
    let final_trust = p2p_a.peer_trust(&peer_b);
    assert!(
        final_trust < initial_trust,
        "Trust should decrease after repeated failures: {initial_trust} -> {final_trust}"
    );
    harness.teardown().await.expect("teardown");
}
/// A querier polling several holders of the same key should collect enough
/// presence and paid-list confirmations to reach quorum.
#[tokio::test]
#[serial]
async fn scenario_12_bootstrap_quorum_aggregation() {
    let harness = TestHarness::setup_minimal().await.expect("setup");
    harness.warmup_dht().await.expect("warmup");
    let content = b"bootstrap quorum test";
    let address = compute_address(content);
    let holder_count = 4;
    // Store the chunk and mark it paid on each holder node.
    for holder in 0..holder_count {
        let node = harness.test_node(holder).expect("node");
        node.ant_protocol
            .as_ref()
            .expect("protocol")
            .storage()
            .put(&address, content)
            .await
            .expect("put");
        if let Some(ref engine) = node.replication_engine {
            engine
                .paid_list()
                .insert(&address)
                .await
                .expect("paid insert");
        }
    }
    let querier = harness.test_node(4).expect("querier");
    let p2p_q = querier.p2p_node.as_ref().expect("p2p");
    // Query every holder and tally the confirmations.
    let mut presence_confirmations = 0u32;
    let mut paid_confirmations = 0u32;
    for holder in 0..holder_count {
        let target = harness.test_node(holder).expect("target");
        let peer = *target.p2p_node.as_ref().expect("p2p").peer_id();
        let msg = ReplicationMessage {
            request_id: 1200 + holder as u64,
            body: ReplicationMessageBody::VerificationRequest(VerificationRequest {
                keys: vec![address],
                paid_list_check_indices: vec![0],
            }),
        };
        let reply = send_replication_request(p2p_q, &peer, msg, Duration::from_secs(10)).await;
        if let ReplicationMessageBody::VerificationResponse(resp) = reply.body {
            if let Some(result) = resp.results.first() {
                if result.present {
                    presence_confirmations += 1;
                }
                if result.paid == Some(true) {
                    paid_confirmations += 1;
                }
            }
        }
    }
    let min_quorum = 3;
    assert!(
        presence_confirmations >= min_quorum,
        "Bootstrap node should receive enough presence confirmations for quorum: \
         got {presence_confirmations}, need {min_quorum}"
    );
    assert!(
        paid_confirmations >= min_quorum,
        "Bootstrap node should receive enough paid-list confirmations: \
         got {paid_confirmations}, need {min_quorum}"
    );
    harness.teardown().await.expect("teardown");
}
/// Every key stored locally must be advertised in the replica hints of a
/// neighbor-sync response (no backlog key is silently dropped).
#[tokio::test]
#[serial]
async fn scenario_14_sync_hints_cover_all_local_keys() {
    let harness = TestHarness::setup_minimal().await.expect("setup");
    harness.warmup_dht().await.expect("warmup");
    let node_a = harness.test_node(3).expect("node_a");
    let node_b = harness.test_node(4).expect("node_b");
    let p2p_b = node_b.p2p_node.as_ref().expect("p2p_b");
    let peer_a = *node_a.p2p_node.as_ref().expect("p2p_a").peer_id();
    let protocol_a = node_a.ant_protocol.as_ref().expect("protocol_a");
    let storage_a = protocol_a.storage();

    // Build a small backlog of chunks on node A.
    let chunk_count = 10u8;
    let mut addresses = Vec::new();
    for i in 0..chunk_count {
        let content = format!("backlog test chunk {i}");
        let address = compute_address(content.as_bytes());
        storage_a
            .put(&address, content.as_bytes())
            .await
            .expect("put");
        addresses.push(address);
    }
    let all_keys = storage_a.all_keys().await.expect("all_keys");
    assert_eq!(
        all_keys.len(),
        addresses.len(),
        "all_keys should cover every stored chunk"
    );

    let msg = ReplicationMessage {
        request_id: 1400,
        body: ReplicationMessageBody::NeighborSyncRequest(NeighborSyncRequest {
            replica_hints: vec![],
            paid_hints: vec![],
            bootstrapping: false,
        }),
    };
    let reply = send_replication_request(p2p_b, &peer_a, msg, Duration::from_secs(10)).await;
    let hints = match reply.body {
        ReplicationMessageBody::NeighborSyncResponse(resp) => resp.replica_hints,
        other => panic!("Expected NeighborSyncResponse, got: {other:?}"),
    };
    // Each stored address must appear somewhere in the returned hints.
    for addr in &addresses {
        assert!(
            hints.contains(addr),
            "Sync response hints should include stored key {addr:?}; \
             got {} hints total",
            hints.len()
        );
    }
    harness.teardown().await.expect("teardown");
}
/// After one replica holder is partitioned away, the surviving holder must
/// still serve the chunk and confirm its paid-list status.
#[tokio::test]
#[serial]
async fn scenario_15_partition_and_heal() {
    let mut harness = TestHarness::setup_minimal().await.expect("setup");
    harness.warmup_dht().await.expect("warmup");
    let content = b"partition test data";
    let address = compute_address(content);
    // Seed the chunk and paid-list entry on two holders.
    for idx in [3, 4] {
        let node = harness.test_node(idx).expect("node");
        node.ant_protocol
            .as_ref()
            .expect("protocol")
            .storage()
            .put(&address, content)
            .await
            .expect("put");
        if let Some(ref engine) = node.replication_engine {
            engine
                .paid_list()
                .insert(&address)
                .await
                .expect("paid insert");
        }
    }
    // Partition: node 4 goes away.
    harness.shutdown_node(4).await.expect("shutdown");
    let node3 = harness.test_node(3).expect("node3 after partition");
    let protocol3 = node3.ant_protocol.as_ref().expect("protocol");
    assert!(
        protocol3.storage().exists(&address).expect("exists"),
        "Data should survive partition on remaining node"
    );
    // An unrelated node now verifies presence and paid status on node 3.
    let querier = harness.test_node(0).expect("querier");
    let p2p_q = querier.p2p_node.as_ref().expect("p2p");
    let node3_peer = *node3.p2p_node.as_ref().expect("p2p").peer_id();
    let msg = ReplicationMessage {
        request_id: 1500,
        body: ReplicationMessageBody::VerificationRequest(VerificationRequest {
            keys: vec![address],
            paid_list_check_indices: vec![0],
        }),
    };
    let reply = send_replication_request(p2p_q, &node3_peer, msg, Duration::from_secs(10)).await;
    match reply.body {
        ReplicationMessageBody::VerificationResponse(resp) => {
            let result = resp.results.first().expect("should have a result");
            assert!(
                result.present,
                "Node 3 should still report chunk as present after partition"
            );
            assert_eq!(
                result.paid,
                Some(true),
                "Node 3 should still confirm paid-list status — this enables recovery \
                 when paid-list authorization survives the partition"
            );
        }
        _ => panic!("Expected VerificationResponse"),
    }
    harness.teardown().await.expect("teardown");
}
/// When the sync sender is already in the receiver's routing table, the
/// receiver should answer with its own outbound replica hints.
#[tokio::test]
#[serial]
async fn scenario_17_bidirectional_sync_when_sender_in_rt() {
    let harness = TestHarness::setup_minimal().await.expect("setup");
    harness.warmup_dht().await.expect("warmup");
    let node_a = harness.test_node(3).expect("node_a");
    let node_b = harness.test_node(4).expect("node_b");
    let p2p_b = node_b.p2p_node.as_ref().expect("p2p_b");
    let peer_a = *node_a.p2p_node.as_ref().expect("p2p_a").peer_id();

    let content = b"admission asymmetry test";
    let address = compute_address(content);
    node_a
        .ant_protocol
        .as_ref()
        .expect("protocol")
        .storage()
        .put(&address, content)
        .await
        .expect("put");

    // Offer one inbound hint so the exchange is bidirectional.
    let inbound_hint = [0x17; 32];
    let msg = ReplicationMessage {
        request_id: 1700,
        body: ReplicationMessageBody::NeighborSyncRequest(NeighborSyncRequest {
            replica_hints: vec![inbound_hint],
            paid_hints: vec![],
            bootstrapping: false,
        }),
    };
    let reply = send_replication_request(p2p_b, &peer_a, msg, Duration::from_secs(10)).await;
    match reply.body {
        ReplicationMessageBody::NeighborSyncResponse(resp) => {
            assert!(!resp.bootstrapping, "Node A should not claim bootstrapping");
            assert!(
                resp.replica_hints.contains(&address),
                "When sender is in receiver's RT, receiver should send outbound \
                 replica hints. Expected address {address:?} in hints, got {} hints.",
                resp.replica_hints.len()
            );
        }
        other => panic!("Expected NeighborSyncResponse, got: {other:?}"),
    }
    harness.teardown().await.expect("teardown");
}
/// Several peers that independently hold a paid-list entry should each
/// confirm paid status to a querier, forming a majority.
#[tokio::test]
#[serial]
async fn scenario_21_paid_list_majority_from_multiple_peers() {
    let harness = TestHarness::setup_minimal().await.expect("setup");
    harness.warmup_dht().await.expect("warmup");
    let key = [0x21; 32];
    let populated_count = 4;
    // Seed the paid list on the first few nodes.
    for idx in 0..populated_count {
        if let Some(node) = harness.test_node(idx) {
            if let Some(ref engine) = node.replication_engine {
                engine.paid_list().insert(&key).await.expect("paid insert");
            }
        }
    }
    let querier = harness.test_node(4).expect("querier");
    let p2p_q = querier.p2p_node.as_ref().expect("p2p");
    // Ask each populated peer and count paid confirmations.
    let mut paid_confirmations = 0u32;
    for idx in 0..populated_count {
        let target = harness.test_node(idx).expect("target");
        let peer = *target.p2p_node.as_ref().expect("target_p2p").peer_id();
        let msg = ReplicationMessage {
            request_id: 2100 + idx as u64,
            body: ReplicationMessageBody::VerificationRequest(VerificationRequest {
                keys: vec![key],
                paid_list_check_indices: vec![0],
            }),
        };
        let reply = send_replication_request(p2p_q, &peer, msg, Duration::from_secs(10)).await;
        if let ReplicationMessageBody::VerificationResponse(resp) = reply.body {
            if resp.results.first().and_then(|r| r.paid) == Some(true) {
                paid_confirmations += 1;
            }
        }
    }
    let min_confirmations = 3;
    assert!(
        paid_confirmations >= min_confirmations,
        "Should get paid confirmations from multiple peers, got {paid_confirmations}"
    );
    harness.teardown().await.expect("teardown");
}
/// After a fresh replication, the paid-notify mechanism should land the key
/// in at least one other node's paid list.
#[tokio::test]
#[serial]
async fn scenario_24_fresh_replication_propagates_paid_notify() {
    let harness = TestHarness::setup_minimal().await.expect("setup");
    harness.warmup_dht().await.expect("warmup");
    let source_idx = 3;
    let source = harness.test_node(source_idx).expect("source");
    let content = b"paid notify propagation test";
    let address = compute_address(content);
    source
        .ant_protocol
        .as_ref()
        .expect("protocol")
        .storage()
        .put(&address, content)
        .await
        .expect("put");
    // Accept the dummy PoP on every node's payment verifier.
    for idx in 0..harness.node_count() {
        if let Some(node) = harness.test_node(idx) {
            if let Some(p) = &node.ant_protocol {
                p.payment_verifier().cache_insert(address);
            }
        }
    }
    let dummy_pop = [0x01u8; 64];
    if let Some(ref engine) = source.replication_engine {
        engine.replicate_fresh(&address, content, &dummy_pop).await;
    }
    // Count paid-list hits on non-source nodes until one appears or the
    // deadline passes.
    let deadline = tokio::time::Instant::now() + PROPAGATION_TIMEOUT;
    let mut paid_count;
    loop {
        paid_count = 0u32;
        for idx in 0..harness.node_count() {
            if idx == source_idx {
                continue;
            }
            if let Some(node) = harness.test_node(idx) {
                if let Some(ref engine) = node.replication_engine {
                    if engine.paid_list().contains(&address).unwrap_or(false) {
                        paid_count += 1;
                    }
                }
            }
        }
        if paid_count >= 1 || tokio::time::Instant::now() >= deadline {
            break;
        }
        tokio::time::sleep(PROPAGATION_POLL_INTERVAL).await;
    }
    assert!(
        paid_count >= 1,
        "PaidNotify should propagate to at least 1 other node, got {paid_count}"
    );
    harness.teardown().await.expect("teardown");
}
/// Paid-list state seeded on several nodes should be confirmable by a
/// querier via verification requests (convergence check).
#[tokio::test]
#[serial]
async fn scenario_25_paid_list_convergence_via_verification() {
    let harness = TestHarness::setup_minimal().await.expect("setup");
    harness.warmup_dht().await.expect("warmup");
    let key = [0x25; 32];
    let populated_count = 3;
    // Seed the paid list on the first few nodes.
    for idx in 0..populated_count {
        if let Some(node) = harness.test_node(idx) {
            if let Some(ref engine) = node.replication_engine {
                engine.paid_list().insert(&key).await.expect("insert");
            }
        }
    }
    let querier = harness.test_node(4).expect("querier");
    let p2p_q = querier.p2p_node.as_ref().expect("p2p");
    // Poll each seeded node and count paid confirmations.
    let mut confirmations = 0u32;
    for idx in 0..populated_count {
        let target = harness.test_node(idx).expect("target");
        let peer = *target.p2p_node.as_ref().expect("p2p").peer_id();
        let msg = ReplicationMessage {
            request_id: 2500 + idx as u64,
            body: ReplicationMessageBody::VerificationRequest(VerificationRequest {
                keys: vec![key],
                paid_list_check_indices: vec![0],
            }),
        };
        let reply = send_replication_request(p2p_q, &peer, msg, Duration::from_secs(10)).await;
        if let ReplicationMessageBody::VerificationResponse(v) = reply.body {
            if v.results.first().and_then(|r| r.paid) == Some(true) {
                confirmations += 1;
            }
        }
    }
    let min_confirmations = 2;
    assert!(
        confirmations >= min_confirmations,
        "Majority of queried peers should confirm paid status, got {confirmations}"
    );
    harness.teardown().await.expect("teardown");
}
/// Cold-start recovery: a paid-list entry written before the engine shuts
/// down must still be readable when the list is reopened from disk.
#[tokio::test]
#[serial]
async fn scenario_43_paid_list_persists_across_restart() {
    let mut harness = TestHarness::setup_minimal().await.expect("setup");
    // Fix: the key literal was duplicated before and after the restart block
    // ([0x44; 32] declared twice); define it once so the halves cannot drift.
    let key = [0x44; 32];
    let data_dir = {
        let node = harness.test_node(3).expect("node");
        let dir = node.data_dir.clone();
        if let Some(ref engine) = node.replication_engine {
            engine.paid_list().insert(&key).await.expect("insert");
        }
        dir
    };
    // Simulate a restart by tearing the replication engine down.
    {
        let node = harness.network_mut().node_mut(3).expect("node");
        if let Some(ref mut engine) = node.replication_engine {
            engine.shutdown().await;
        }
        node.replication_engine = None;
        node.replication_shutdown = None;
    }
    let paid_list2 = ant_node::replication::paid_list::PaidList::new(&data_dir)
        .await
        .expect("reopen");
    assert!(
        paid_list2.contains(&key).expect("contains"),
        "PaidForList should survive restart (cold-start recovery)"
    );
    harness.teardown().await.expect("teardown");
}
#[tokio::test]
#[serial]
async fn scenario_45_unrecoverable_when_paid_list_lost() {
    // Scenario 45: payment records exist only in the PaidForList, so a
    // brand-new list (standing in for a lost/wiped one) must not contain a
    // key that an existing node previously recorded as paid.
    let harness = TestHarness::setup_minimal().await.expect("setup");
    let key = [0x45; 32];

    // Record the key as paid on node 3's live engine.
    let node = harness.test_node(3).expect("node");
    if let Some(ref engine) = node.replication_engine {
        engine.paid_list().insert(&key).await.expect("insert");
    }

    // A PaidList rooted in an empty temp directory simulates total loss of
    // the on-disk payment state.
    let temp_dir = tempfile::tempdir().expect("tempdir");
    let fresh_paid_list = ant_node::replication::paid_list::PaidList::new(temp_dir.path())
        .await
        .expect("fresh paid list");
    assert!(
        !fresh_paid_list.contains(&key).expect("contains"),
        "Key should NOT be found in a fresh (lost) PaidForList"
    );
    harness.teardown().await.expect("teardown");
}
#[tokio::test]
#[serial]
async fn test_late_joiner_replicates_responsible_chunks() {
// End-to-end check: a node that joins AFTER data has been replicated must
// eventually hold every chunk whose close group it now belongs to.
const REPLICATION_SETTLE_TIMEOUT: Duration = Duration::from_secs(90);
const SETTLE_POLL_INTERVAL: Duration = Duration::from_millis(500);
let mut harness = TestHarness::setup_minimal().await.expect("setup");
harness.warmup_dht().await.expect("warmup");
// Prepare 10 content-addressed chunks (address derived from content).
let chunk_count = 10;
let mut chunks: Vec<([u8; 32], Vec<u8>)> = Vec::with_capacity(chunk_count);
for i in 0..chunk_count {
let content = format!("late-joiner-test-chunk-{i}").into_bytes();
let address = compute_address(&content);
chunks.push((address, content));
}
// Store every chunk on the source node. Scoped so the immutable harness
// borrow is dropped before later `&mut harness` calls.
let source_idx = 2;
{
let source = harness.test_node(source_idx).expect("source node");
let storage = source.ant_protocol.as_ref().expect("protocol").storage();
for (address, content) in &chunks {
storage.put(address, content).await.expect("put");
}
}
// Pre-mark every chunk as paid on every existing node so payment
// verification does not block replication.
for (address, _) in &chunks {
for i in 0..harness.node_count() {
if let Some(node) = harness.test_node(i) {
if let Some(protocol) = &node.ant_protocol {
protocol.payment_verifier().cache_insert(*address);
}
}
}
}
// Trigger fresh replication from the source using a placeholder
// proof-of-payment (acceptable because of the verifier cache seeding above).
let dummy_pop = [0x01u8; 64];
{
let source = harness.test_node(source_idx).expect("source node");
if let Some(ref engine) = source.replication_engine {
for (address, content) in &chunks {
engine.replicate_fresh(address, content, &dummy_pop).await;
}
}
}
// Let the initial replication wave settle before the late joiner arrives.
tokio::time::sleep(Duration::from_secs(5)).await;
// Late joiner: add a brand-new node to the running network.
let new_idx = harness.add_node().await.expect("add_node");
// The new node must also consider the chunks paid, or it would refuse them.
{
let new_node = harness.test_node(new_idx).expect("new node");
let protocol = new_node.ant_protocol.as_ref().expect("protocol");
for (address, _) in &chunks {
protocol.payment_verifier().cache_insert(*address);
}
}
let new_node = harness.test_node(new_idx).expect("new node");
let new_p2p = new_node.p2p_node.as_ref().expect("p2p");
let new_peer_id = *new_p2p.peer_id();
let new_storage = new_node.ant_protocol.as_ref().expect("protocol").storage();
// Determine which chunks the new node is responsible for: those for which
// it appears in the locally computed close group of the chunk address.
let close_group_size = ant_node::CLOSE_GROUP_SIZE;
let mut responsible_chunks: Vec<[u8; 32]> = Vec::new();
for (address, _) in &chunks {
let closest = new_p2p
.dht_manager()
.find_closest_nodes_local_with_self(address, close_group_size)
.await;
if closest.iter().any(|n| n.peer_id == new_peer_id) {
responsible_chunks.push(*address);
}
}
assert!(
!responsible_chunks.is_empty(),
"New node should be in the close group for at least some chunks"
);
// Poll local storage until every responsible chunk is present or the
// settle timeout expires.
let deadline = tokio::time::Instant::now() + REPLICATION_SETTLE_TIMEOUT;
let mut all_present = false;
while tokio::time::Instant::now() < deadline {
let mut count = 0;
for address in &responsible_chunks {
if new_storage.exists(address).unwrap_or(false) {
count += 1;
}
}
if count == responsible_chunks.len() {
all_present = true;
break;
}
tokio::time::sleep(SETTLE_POLL_INTERVAL).await;
}
// On timeout, fail with the exact list of chunks that never arrived.
if !all_present {
let mut missing = Vec::new();
for address in &responsible_chunks {
if !new_storage.exists(address).unwrap_or(false) {
missing.push(hex::encode(address));
}
}
panic!(
"New node is missing {}/{} responsible chunks after {REPLICATION_SETTLE_TIMEOUT:?}: [{}]",
missing.len(),
responsible_chunks.len(),
missing.join(", "),
);
}
// Finally, verify content integrity: each replicated chunk must be
// byte-identical to the original.
for (address, content) in &chunks {
if responsible_chunks.contains(address) {
let stored = new_storage
.get(address)
.await
.expect("get should succeed")
.expect("chunk should exist");
assert_eq!(
&stored,
content,
"Stored chunk content should match original for key {}",
hex::encode(address),
);
}
}
harness.teardown().await.expect("teardown");
}