use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;
use crate::logging::{debug, info, warn};
use rand::seq::SliceRandom;
use rand::Rng;
use crate::ant_protocol::XorName;
use crate::replication::config::{ReplicationConfig, REPLICATION_PROTOCOL_ID};
use crate::replication::protocol::{
compute_audit_digest, AuditChallenge, AuditResponse, ReplicationMessage,
ReplicationMessageBody, ABSENT_KEY_DIGEST,
};
use crate::replication::types::{AuditFailureReason, FailureEvidence, PeerSyncRecord};
use crate::storage::LmdbStorage;
use saorsa_core::identity::PeerId;
use saorsa_core::P2PNode;
/// Outcome of a single [`audit_tick`] round.
#[derive(Debug)]
pub enum AuditTickResult {
    /// The challenged peer returned a matching digest for every checked key.
    Passed {
        challenged_peer: PeerId,
        /// Number of keys whose digests were verified.
        keys_checked: usize,
    },
    /// The audit failed for at least one responsibility-confirmed key.
    Failed {
        evidence: FailureEvidence,
    },
    /// The peer answered that it is still bootstrapping instead of digests.
    BootstrapClaim {
        peer: PeerId,
    },
    /// Nothing was audited this round (no eligible peer, no overlapping keys,
    /// or a transient local error such as a storage/encode failure).
    Idle,
    // NOTE(review): not constructed anywhere in this file — presumably
    // produced by a caller or another module; confirm it is still needed.
    InsufficientKeys,
}
/// Run one proactive audit round: pick a random eligible peer, sample local
/// keys the peer's close group should hold, send an `AuditChallenge`, and
/// classify the response (digest verification, bootstrap claim, rejection,
/// timeout, or malformed reply).
///
/// Returns [`AuditTickResult::Idle`] when there is nothing to do or when a
/// transient local error (storage read, message encode) prevents auditing
/// this round.
#[allow(clippy::implicit_hasher, clippy::too_many_lines)]
pub async fn audit_tick(
    p2p_node: &Arc<P2PNode>,
    storage: &Arc<LmdbStorage>,
    config: &ReplicationConfig,
    sync_history: &HashMap<PeerId, PeerSyncRecord>,
    bootstrap_claims: &HashMap<PeerId, Instant>,
    is_bootstrapping: bool,
) -> AuditTickResult {
    // A node that is itself still bootstrapping does not audit others.
    if is_bootstrapping {
        return AuditTickResult::Idle;
    }
    let dht = p2p_node.dht_manager();
    let now = Instant::now();
    // Eligible peers: those with a repair opportunity whose bootstrap claim
    // (if any) is still within the configured grace period.
    // NOTE(review): this filter keeps peers *inside* the grace window and
    // excludes peers whose claim is older than the grace period — confirm
    // that stale claims are pruned from `bootstrap_claims` elsewhere,
    // otherwise a peer that once claimed bootstrapping would never become
    // auditable again.
    let eligible_peers: Vec<PeerId> = sync_history
        .iter()
        .filter(|(_, record)| record.has_repair_opportunity())
        .filter(|(peer, _)| {
            bootstrap_claims.get(peer).map_or(true, |first_seen| {
                now.duration_since(*first_seen) <= config.bootstrap_claim_grace_period
            })
        })
        .map(|(peer, _)| *peer)
        .collect();
    if eligible_peers.is_empty() {
        return AuditTickResult::Idle;
    }
    // RNG is confined to this block so the non-Send ThreadRng is dropped
    // before the first `.await` below.
    let (challenged_peer, nonce, challenge_id) = {
        let mut rng = rand::thread_rng();
        let selected = match eligible_peers.choose(&mut rng) {
            Some(p) => *p,
            None => return AuditTickResult::Idle,
        };
        let n: [u8; 32] = rng.gen();
        let c: u64 = rng.gen();
        (selected, n, c)
    };
    let all_keys = match storage.all_keys().await {
        Ok(keys) => keys,
        Err(e) => {
            warn!("Audit: failed to read local keys: {e}");
            return AuditTickResult::Idle;
        }
    };
    if all_keys.is_empty() {
        return AuditTickResult::Idle;
    }
    // Sample size scales with the number of locally held keys.
    let sample_count = ReplicationConfig::audit_sample_count(all_keys.len());
    let sampled_keys: Vec<XorName> = {
        let mut rng = rand::thread_rng();
        all_keys
            .choose_multiple(&mut rng, sample_count)
            .copied()
            .collect()
    };
    // Keep only the sampled keys whose close group (per the local routing
    // table) contains the challenged peer — only those can be audited.
    let mut peer_keys = Vec::new();
    for key in &sampled_keys {
        let closest = dht
            .find_closest_nodes_local_with_self(key, config.close_group_size)
            .await;
        if closest.iter().any(|n| n.peer_id == challenged_peer) {
            peer_keys.push(*key);
        }
    }
    if peer_keys.is_empty() {
        return AuditTickResult::Idle;
    }
    let challenge = AuditChallenge {
        challenge_id,
        nonce,
        challenged_peer_id: *challenged_peer.as_bytes(),
        keys: peer_keys.clone(),
    };
    let msg = ReplicationMessage {
        request_id: challenge_id,
        body: ReplicationMessageBody::AuditChallenge(challenge),
    };
    let encoded = match msg.encode() {
        Ok(data) => data,
        Err(e) => {
            warn!("Audit: failed to encode challenge: {e}");
            return AuditTickResult::Idle;
        }
    };
    // Timeout scales with the number of challenged keys.
    let response = match p2p_node
        .send_request(
            &challenged_peer,
            REPLICATION_PROTOCOL_ID,
            encoded,
            config.audit_response_timeout(peer_keys.len()),
        )
        .await
    {
        Ok(resp) => resp,
        Err(e) => {
            // No reply in time: treated as a timeout failure (still subject
            // to responsibility confirmation downstream).
            debug!("Audit: challenge to {challenged_peer} failed: {e}");
            return handle_audit_timeout(
                &challenged_peer,
                challenge_id,
                &peer_keys,
                p2p_node,
                config,
            )
            .await;
        }
    };
    let resp_msg = match ReplicationMessage::decode(&response.data) {
        Ok(m) => m,
        Err(e) => {
            warn!("Audit: failed to decode response from {challenged_peer}: {e}");
            return handle_audit_failure(
                &challenged_peer,
                challenge_id,
                &peer_keys,
                AuditFailureReason::MalformedResponse,
                p2p_node,
                config,
            )
            .await;
        }
    };
    // Every response variant must echo our challenge_id; a mismatch is
    // treated as a malformed response.
    match resp_msg.body {
        ReplicationMessageBody::AuditResponse(AuditResponse::Bootstrapping {
            challenge_id: resp_id,
        }) => {
            if resp_id != challenge_id {
                warn!("Audit: challenge ID mismatch on Bootstrapping from {challenged_peer}");
                return handle_audit_failure(
                    &challenged_peer,
                    challenge_id,
                    &peer_keys,
                    AuditFailureReason::MalformedResponse,
                    p2p_node,
                    config,
                )
                .await;
            }
            AuditTickResult::BootstrapClaim {
                peer: challenged_peer,
            }
        }
        ReplicationMessageBody::AuditResponse(AuditResponse::Digests {
            challenge_id: resp_id,
            digests,
        }) => {
            if resp_id != challenge_id {
                warn!("Audit: challenge ID mismatch from {challenged_peer}");
                return handle_audit_failure(
                    &challenged_peer,
                    challenge_id,
                    &peer_keys,
                    AuditFailureReason::MalformedResponse,
                    p2p_node,
                    config,
                )
                .await;
            }
            verify_digests(
                &challenged_peer,
                challenge_id,
                &nonce,
                &peer_keys,
                &digests,
                storage,
                p2p_node,
                config,
            )
            .await
        }
        ReplicationMessageBody::AuditResponse(AuditResponse::Rejected {
            challenge_id: resp_id,
            reason,
        }) => {
            if resp_id != challenge_id {
                warn!("Audit: challenge ID mismatch on Rejected from {challenged_peer}");
                return handle_audit_failure(
                    &challenged_peer,
                    challenge_id,
                    &peer_keys,
                    AuditFailureReason::MalformedResponse,
                    p2p_node,
                    config,
                )
                .await;
            }
            warn!("Audit: challenge rejected by {challenged_peer}: {reason}");
            handle_audit_failure(
                &challenged_peer,
                challenge_id,
                &peer_keys,
                AuditFailureReason::Rejected,
                p2p_node,
                config,
            )
            .await
        }
        _ => {
            warn!("Audit: unexpected response type from {challenged_peer}");
            handle_audit_failure(
                &challenged_peer,
                challenge_id,
                &peer_keys,
                AuditFailureReason::MalformedResponse,
                p2p_node,
                config,
            )
            .await
        }
    }
}
/// Verify the digests an audited peer returned against locally stored data.
///
/// For each challenged key the peer's digest is compared with a digest
/// recomputed from the local copy. Keys the peer reported as absent (the
/// sentinel digest) or whose digests do not match are collected and handed
/// to [`handle_audit_failure`] for responsibility confirmation. Keys whose
/// local copy cannot be read are given the benefit of the doubt and skipped.
#[allow(clippy::too_many_arguments)]
async fn verify_digests(
    challenged_peer: &PeerId,
    challenge_id: u64,
    nonce: &[u8; 32],
    keys: &[XorName],
    digests: &[[u8; 32]],
    storage: &Arc<LmdbStorage>,
    p2p_node: &Arc<P2PNode>,
    config: &ReplicationConfig,
) -> AuditTickResult {
    // A well-formed response carries exactly one digest per challenged key.
    if keys.len() != digests.len() {
        warn!(
            "Audit: malformed response from {challenged_peer}: {} digests for {} keys",
            digests.len(),
            keys.len()
        );
        return handle_audit_failure(
            challenged_peer,
            challenge_id,
            keys,
            AuditFailureReason::MalformedResponse,
            p2p_node,
            config,
        )
        .await;
    }
    let peer_bytes = challenged_peer.as_bytes();
    let mut mismatched = Vec::new();
    for (key, received) in keys.iter().zip(digests.iter()) {
        // The sentinel digest means the peer does not hold this key at all.
        if *received == ABSENT_KEY_DIGEST {
            mismatched.push(*key);
            continue;
        }
        match storage.get_raw(key).await {
            Ok(Some(bytes)) => {
                let expected = compute_audit_digest(nonce, peer_bytes, key, &bytes);
                if expected != *received {
                    mismatched.push(*key);
                }
            }
            Ok(None) => {
                // Our own copy vanished mid-audit; we cannot judge the peer
                // on this key, so it is skipped rather than failed.
                warn!(
                    "Audit: local key {} disappeared during audit",
                    hex::encode(key)
                );
            }
            Err(e) => {
                warn!("Audit: failed to read local key {}: {e}", hex::encode(key));
            }
        }
    }
    if mismatched.is_empty() {
        info!(
            "Audit: peer {challenged_peer} passed (all {} keys verified)",
            keys.len()
        );
        AuditTickResult::Passed {
            challenged_peer: *challenged_peer,
            keys_checked: keys.len(),
        }
    } else {
        handle_audit_failure(
            challenged_peer,
            challenge_id,
            &mismatched,
            AuditFailureReason::DigestMismatch,
            p2p_node,
            config,
        )
        .await
    }
}
/// Reduce a set of failed audit keys to those the challenged peer is still
/// responsible for, then package the survivors as failure evidence.
///
/// Each failed key is re-checked against the local routing table; a key
/// whose close group no longer contains the peer is dropped from the
/// failure set. Returns `Idle` when nothing remains, so routing churn alone
/// never produces evidence against a peer.
async fn handle_audit_failure(
    challenged_peer: &PeerId,
    challenge_id: u64,
    failed_keys: &[XorName],
    reason: AuditFailureReason,
    p2p_node: &Arc<P2PNode>,
    config: &ReplicationConfig,
) -> AuditTickResult {
    let dht = p2p_node.dht_manager();
    let mut still_responsible = Vec::new();
    for key in failed_keys {
        let close_group = dht
            .find_closest_nodes_local_with_self(key, config.close_group_size)
            .await;
        let responsible = close_group.iter().any(|n| n.peer_id == *challenged_peer);
        if responsible {
            still_responsible.push(*key);
        } else {
            debug!(
                "Audit: peer {challenged_peer} not responsible for {} (removed from failure set)",
                hex::encode(key)
            );
        }
    }
    if still_responsible.is_empty() {
        info!("Audit: all failures for {challenged_peer} cleared by responsibility confirmation");
        AuditTickResult::Idle
    } else {
        AuditTickResult::Failed {
            evidence: FailureEvidence::AuditFailure {
                challenge_id,
                challenged_peer: *challenged_peer,
                confirmed_failed_keys: still_responsible,
                reason,
            },
        }
    }
}
/// Convert a transport-level failure (no response before the deadline) into
/// an audit failure with [`AuditFailureReason::Timeout`].
///
/// All challenged keys are treated as failed; [`handle_audit_failure`] still
/// confirms responsibility for each key before any evidence is emitted.
async fn handle_audit_timeout(
    challenged_peer: &PeerId,
    challenge_id: u64,
    keys: &[XorName],
    p2p_node: &Arc<P2PNode>,
    config: &ReplicationConfig,
) -> AuditTickResult {
    let reason = AuditFailureReason::Timeout;
    handle_audit_failure(challenged_peer, challenge_id, keys, reason, p2p_node, config).await
}
/// Respond to an incoming audit challenge from another node.
///
/// A node that is still bootstrapping declines with `Bootstrapping`; a
/// challenge addressed to a different peer id, or carrying more keys than
/// the dynamic limit allows, is `Rejected`. Otherwise one digest is produced
/// per challenged key, with [`ABSENT_KEY_DIGEST`] standing in for keys that
/// are missing locally (a local read error is treated the same way).
pub async fn handle_audit_challenge(
    challenge: &AuditChallenge,
    storage: &LmdbStorage,
    self_peer_id: &PeerId,
    is_bootstrapping: bool,
    stored_chunks: usize,
) -> AuditResponse {
    let challenge_id = challenge.challenge_id;
    // A bootstrapping node has not yet pulled its data and cannot be audited.
    if is_bootstrapping {
        return AuditResponse::Bootstrapping { challenge_id };
    }
    // The digest binds to a specific peer id; answering on behalf of another
    // node's id would be meaningless, so refuse mistargeted challenges.
    if *self_peer_id.as_bytes() != challenge.challenged_peer_id {
        warn!(
            "Audit challenge targeted wrong peer: expected {}, got {}",
            hex::encode(self_peer_id.as_bytes()),
            hex::encode(challenge.challenged_peer_id),
        );
        return AuditResponse::Rejected {
            challenge_id,
            reason: "challenged_peer_id does not match this node".to_string(),
        };
    }
    // Cap per-challenge work so a hostile auditor cannot force arbitrarily
    // expensive digest computation.
    let max_keys = ReplicationConfig::max_incoming_audit_keys(stored_chunks);
    let key_count = challenge.keys.len();
    if key_count > max_keys {
        warn!(
            "Audit challenge rejected: {} keys exceeds dynamic limit of {max_keys} \
            (stored_chunks={stored_chunks})",
            key_count,
        );
        return AuditResponse::Rejected {
            challenge_id,
            reason: format!(
                "challenge contains {} keys, limit is {max_keys}",
                key_count
            ),
        };
    }
    let mut digests = Vec::with_capacity(key_count);
    for key in &challenge.keys {
        let digest = match storage.get_raw(key).await {
            Ok(Some(data)) => compute_audit_digest(
                &challenge.nonce,
                &challenge.challenged_peer_id,
                key,
                &data,
            ),
            Ok(None) => ABSENT_KEY_DIGEST,
            Err(e) => {
                warn!(
                    "Audit responder: failed to read key {}: {e}",
                    hex::encode(key)
                );
                ABSENT_KEY_DIGEST
            }
        };
        digests.push(digest);
    }
    AuditResponse::Digests {
        challenge_id,
        digests,
    }
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
    use super::*;
    use crate::replication::protocol::compute_audit_digest;
    use crate::replication::types::NeighborSyncState;
    use crate::storage::LmdbStorageConfig;
    use tempfile::TempDir;

    // Large enough that the dynamic audit-key limit never rejects a test
    // challenge.
    const TEST_STORED_CHUNKS: usize = 1_000_000;

    // Fresh LMDB storage backed by a temp dir; the caller must keep the
    // returned TempDir alive for the storage's lifetime.
    async fn create_test_storage() -> (LmdbStorage, TempDir) {
        let temp_dir = TempDir::new().expect("create temp dir");
        let config = LmdbStorageConfig {
            root_dir: temp_dir.path().to_path_buf(),
            verify_on_read: false,
            max_map_size: 0,
            disk_reserve: 0,
        };
        let storage = LmdbStorage::new(config).await.expect("create storage");
        (storage, temp_dir)
    }

    // Shorthand constructor for an AuditChallenge.
    fn make_challenge(
        challenge_id: u64,
        nonce: [u8; 32],
        peer_id: [u8; 32],
        keys: Vec<XorName>,
    ) -> AuditChallenge {
        AuditChallenge {
            challenge_id,
            nonce,
            challenged_peer_id: peer_id,
            keys,
        }
    }

    fn peer_id_from_bytes(bytes: [u8; 32]) -> PeerId {
        PeerId::from_bytes(bytes)
    }

    #[tokio::test]
    async fn handle_challenge_present_keys_returns_correct_digests() {
        let (storage, _temp) = create_test_storage().await;
        let content_a = b"chunk alpha";
        let addr_a = LmdbStorage::compute_address(content_a);
        storage.put(&addr_a, content_a).await.expect("put a");
        let content_b = b"chunk beta";
        let addr_b = LmdbStorage::compute_address(content_b);
        storage.put(&addr_b, content_b).await.expect("put b");
        let nonce = [0xAA; 32];
        let peer_id = [0xBB; 32];
        let challenge = make_challenge(42, nonce, peer_id, vec![addr_a, addr_b]);
        let self_id = peer_id_from_bytes(peer_id);
        let response =
            handle_audit_challenge(&challenge, &storage, &self_id, false, TEST_STORED_CHUNKS).await;
        match response {
            AuditResponse::Digests {
                challenge_id,
                digests,
            } => {
                assert_eq!(challenge_id, 42);
                assert_eq!(digests.len(), 2);
                let expected_a = compute_audit_digest(&nonce, &peer_id, &addr_a, content_a);
                let expected_b = compute_audit_digest(&nonce, &peer_id, &addr_b, content_b);
                assert_eq!(digests[0], expected_a);
                assert_eq!(digests[1], expected_b);
            }
            AuditResponse::Bootstrapping { .. } => {
                panic!("expected Digests, got Bootstrapping");
            }
            AuditResponse::Rejected { .. } => {
                panic!("Unexpected Rejected response");
            }
        }
    }

    #[tokio::test]
    async fn handle_challenge_absent_keys_returns_sentinel() {
        let (storage, _temp) = create_test_storage().await;
        let absent_key = [0xFF; 32];
        let nonce = [0x11; 32];
        let peer_id = [0x22; 32];
        let challenge = make_challenge(99, nonce, peer_id, vec![absent_key]);
        let self_id = peer_id_from_bytes(peer_id);
        let response =
            handle_audit_challenge(&challenge, &storage, &self_id, false, TEST_STORED_CHUNKS).await;
        match response {
            AuditResponse::Digests {
                challenge_id,
                digests,
            } => {
                assert_eq!(challenge_id, 99);
                assert_eq!(digests.len(), 1);
                assert_eq!(
                    digests[0], ABSENT_KEY_DIGEST,
                    "absent key should produce sentinel digest"
                );
            }
            AuditResponse::Bootstrapping { .. } => {
                panic!("expected Digests, got Bootstrapping");
            }
            AuditResponse::Rejected { .. } => {
                panic!("Unexpected Rejected response");
            }
        }
    }

    #[tokio::test]
    async fn handle_challenge_mixed_present_and_absent() {
        let (storage, _temp) = create_test_storage().await;
        let content = b"present chunk";
        let addr_present = LmdbStorage::compute_address(content);
        storage.put(&addr_present, content).await.expect("put");
        let addr_absent = [0xDE; 32];
        let nonce = [0x33; 32];
        let peer_id = [0x44; 32];
        let challenge = make_challenge(7, nonce, peer_id, vec![addr_present, addr_absent]);
        let self_id = peer_id_from_bytes(peer_id);
        let response =
            handle_audit_challenge(&challenge, &storage, &self_id, false, TEST_STORED_CHUNKS).await;
        match response {
            AuditResponse::Digests { digests, .. } => {
                assert_eq!(digests.len(), 2);
                let expected_present =
                    compute_audit_digest(&nonce, &peer_id, &addr_present, content);
                assert_eq!(digests[0], expected_present);
                assert_eq!(
                    digests[1], ABSENT_KEY_DIGEST,
                    "absent key should be sentinel"
                );
            }
            AuditResponse::Bootstrapping { .. } => {
                panic!("expected Digests, got Bootstrapping");
            }
            AuditResponse::Rejected { .. } => {
                panic!("Unexpected Rejected response");
            }
        }
    }

    #[tokio::test]
    async fn handle_challenge_bootstrapping_returns_bootstrapping_response() {
        let (storage, _temp) = create_test_storage().await;
        let challenge = make_challenge(55, [0x00; 32], [0x01; 32], vec![[0x02; 32]]);
        let self_id = peer_id_from_bytes([0x01; 32]);
        let response =
            handle_audit_challenge(&challenge, &storage, &self_id, true, TEST_STORED_CHUNKS).await;
        match response {
            AuditResponse::Bootstrapping { challenge_id } => {
                assert_eq!(challenge_id, 55);
            }
            AuditResponse::Digests { .. } => {
                panic!("expected Bootstrapping, got Digests");
            }
            AuditResponse::Rejected { .. } => {
                panic!("Unexpected Rejected response");
            }
        }
    }

    #[tokio::test]
    async fn handle_challenge_empty_keys_returns_empty_digests() {
        let (storage, _temp) = create_test_storage().await;
        let challenge = make_challenge(100, [0x10; 32], [0x20; 32], vec![]);
        let self_id = peer_id_from_bytes([0x20; 32]);
        let response =
            handle_audit_challenge(&challenge, &storage, &self_id, false, TEST_STORED_CHUNKS).await;
        match response {
            AuditResponse::Digests {
                challenge_id,
                digests,
            } => {
                assert_eq!(challenge_id, 100);
                assert!(
                    digests.is_empty(),
                    "empty key list should yield empty digests"
                );
            }
            AuditResponse::Bootstrapping { .. } => {
                panic!("expected Digests, got Bootstrapping");
            }
            AuditResponse::Rejected { .. } => {
                panic!("Unexpected Rejected response");
            }
        }
    }

    // --- Digest function properties ---------------------------------------

    #[test]
    fn digest_verification_matching() {
        let nonce = [0x01; 32];
        let peer_id = [0x02; 32];
        let key: XorName = [0x03; 32];
        let data = b"correct data";
        let expected = compute_audit_digest(&nonce, &peer_id, &key, data);
        let recomputed = compute_audit_digest(&nonce, &peer_id, &key, data);
        assert_eq!(
            expected, recomputed,
            "same inputs must produce identical digests"
        );
        assert_ne!(
            expected, ABSENT_KEY_DIGEST,
            "real digest must not be sentinel"
        );
    }

    #[test]
    fn digest_verification_mismatching_data() {
        let nonce = [0x01; 32];
        let peer_id = [0x02; 32];
        let key: XorName = [0x03; 32];
        let digest_a = compute_audit_digest(&nonce, &peer_id, &key, b"data version A");
        let digest_b = compute_audit_digest(&nonce, &peer_id, &key, b"data version B");
        assert_ne!(
            digest_a, digest_b,
            "different data must produce different digests"
        );
    }

    #[test]
    fn digest_verification_mismatching_nonce() {
        let peer_id = [0x02; 32];
        let key: XorName = [0x03; 32];
        let data = b"same data";
        let digest_a = compute_audit_digest(&[0x01; 32], &peer_id, &key, data);
        let digest_b = compute_audit_digest(&[0xFF; 32], &peer_id, &key, data);
        assert_ne!(
            digest_a, digest_b,
            "different nonces must produce different digests"
        );
    }

    #[test]
    fn digest_verification_mismatching_peer() {
        let nonce = [0x01; 32];
        let key: XorName = [0x03; 32];
        let data = b"same data";
        let digest_a = compute_audit_digest(&nonce, &[0x02; 32], &key, data);
        let digest_b = compute_audit_digest(&nonce, &[0xFE; 32], &key, data);
        assert_ne!(
            digest_a, digest_b,
            "different peers must produce different digests"
        );
    }

    #[test]
    fn digest_verification_mismatching_key() {
        let nonce = [0x01; 32];
        let peer_id = [0x02; 32];
        let data = b"same data";
        let digest_a = compute_audit_digest(&nonce, &peer_id, &[0x03; 32], data);
        let digest_b = compute_audit_digest(&nonce, &peer_id, &[0xFC; 32], data);
        assert_ne!(
            digest_a, digest_b,
            "different keys must produce different digests"
        );
    }

    #[test]
    fn absent_sentinel_is_all_zeros() {
        assert_eq!(ABSENT_KEY_DIGEST, [0u8; 32], "sentinel must be all zeros");
    }

    #[tokio::test]
    async fn bootstrapping_skips_digest_computation() {
        let (storage, _temp) = create_test_storage().await;
        let content = b"stored but bootstrapping";
        let addr = LmdbStorage::compute_address(content);
        storage.put(&addr, content).await.expect("put");
        let challenge = make_challenge(200, [0xCC; 32], [0xDD; 32], vec![addr]);
        let self_id = peer_id_from_bytes([0xDD; 32]);
        let response =
            handle_audit_challenge(&challenge, &storage, &self_id, true, TEST_STORED_CHUNKS).await;
        assert!(
            matches!(response, AuditResponse::Bootstrapping { challenge_id: 200 }),
            "bootstrapping node must not compute digests"
        );
    }

    // --- Numbered scenario tests -------------------------------------------

    #[tokio::test]
    async fn scenario_19_partial_failure_mixed_responsibility() {
        let (storage, _temp) = create_test_storage().await;
        let nonce = [0x42u8; 32];
        let peer_id = [0xAA; 32];
        let content_k1 = b"key one data";
        let addr_k1 = LmdbStorage::compute_address(content_k1);
        storage.put(&addr_k1, content_k1).await.unwrap();
        let content_k2 = b"key two data";
        let addr_k2 = LmdbStorage::compute_address(content_k2);
        storage.put(&addr_k2, content_k2).await.unwrap();
        let addr_k3 = [0xFF; 32];
        let challenge = AuditChallenge {
            challenge_id: 100,
            nonce,
            challenged_peer_id: peer_id,
            keys: vec![addr_k1, addr_k2, addr_k3],
        };
        let self_id = peer_id_from_bytes(peer_id);
        let response =
            handle_audit_challenge(&challenge, &storage, &self_id, false, TEST_STORED_CHUNKS).await;
        match response {
            AuditResponse::Digests { digests, .. } => {
                assert_eq!(digests.len(), 3);
                let expected_k1 = compute_audit_digest(&nonce, &peer_id, &addr_k1, content_k1);
                assert_eq!(digests[0], expected_k1);
                let expected_k2 = compute_audit_digest(&nonce, &peer_id, &addr_k2, content_k2);
                assert_eq!(digests[1], expected_k2);
                assert_eq!(digests[2], ABSENT_KEY_DIGEST);
            }
            AuditResponse::Bootstrapping { .. } => panic!("Expected Digests response"),
            AuditResponse::Rejected { .. } => panic!("Unexpected Rejected response"),
        }
    }

    #[tokio::test]
    async fn scenario_54_all_digests_pass() {
        let (storage, _temp) = create_test_storage().await;
        let nonce = [0x10; 32];
        let peer_id = [0x20; 32];
        let c1 = b"chunk alpha";
        let c2 = b"chunk beta";
        let c3 = b"chunk gamma";
        let a1 = LmdbStorage::compute_address(c1);
        let a2 = LmdbStorage::compute_address(c2);
        let a3 = LmdbStorage::compute_address(c3);
        storage.put(&a1, c1).await.unwrap();
        storage.put(&a2, c2).await.unwrap();
        storage.put(&a3, c3).await.unwrap();
        let challenge = AuditChallenge {
            challenge_id: 200,
            nonce,
            challenged_peer_id: peer_id,
            keys: vec![a1, a2, a3],
        };
        let self_id = peer_id_from_bytes(peer_id);
        let response =
            handle_audit_challenge(&challenge, &storage, &self_id, false, TEST_STORED_CHUNKS).await;
        match response {
            AuditResponse::Digests { digests, .. } => {
                assert_eq!(digests.len(), 3);
                for (i, (addr, content)) in [(a1, &c1[..]), (a2, &c2[..]), (a3, &c3[..])]
                    .iter()
                    .enumerate()
                {
                    let expected = compute_audit_digest(&nonce, &peer_id, addr, content);
                    assert_eq!(digests[i], expected, "Key {i} digest should match");
                }
            }
            AuditResponse::Bootstrapping { .. } => panic!("Expected Digests"),
            AuditResponse::Rejected { .. } => panic!("Unexpected Rejected response"),
        }
    }

    #[tokio::test]
    async fn scenario_55_no_confirmed_responsibility_no_evidence() {
        let (storage, _temp) = create_test_storage().await;
        let nonce = [0x55; 32];
        let peer_id = [0x55; 32];
        let c1 = b"scenario 55 key one";
        let c2 = b"scenario 55 key two";
        let k1 = LmdbStorage::compute_address(c1);
        let k2 = LmdbStorage::compute_address(c2);
        storage.put(&k1, c1).await.expect("put k1");
        storage.put(&k2, c2).await.expect("put k2");
        let expected_d1 = compute_audit_digest(&nonce, &peer_id, &k1, c1);
        let expected_d2 = compute_audit_digest(&nonce, &peer_id, &k2, c2);
        let wrong_d1 = compute_audit_digest(&nonce, &peer_id, &k1, b"corrupted k1");
        let wrong_d2 = compute_audit_digest(&nonce, &peer_id, &k2, b"corrupted k2");
        assert_ne!(wrong_d1, expected_d1, "K1 digest should mismatch");
        assert_ne!(wrong_d2, expected_d2, "K2 digest should mismatch");
        let keys = [k1, k2];
        let expected = [expected_d1, expected_d2];
        let received = [wrong_d1, wrong_d2];
        let mut failed_keys = Vec::new();
        for i in 0..keys.len() {
            if received[i] != expected[i] {
                failed_keys.push(keys[i]);
            }
        }
        assert_eq!(
            failed_keys.len(),
            2,
            "Both keys should be identified as digest mismatches"
        );
        // Simulate the responsibility-confirmation step with an empty
        // confirmed set: no evidence should survive.
        let confirmed_responsible_keys: Vec<XorName> = Vec::new();
        let confirmed_failures: Vec<XorName> = failed_keys
            .into_iter()
            .filter(|k| confirmed_responsible_keys.contains(k))
            .collect();
        assert!(
            confirmed_failures.is_empty(),
            "With no confirmed responsibility, failure set must be empty — \
no AuditFailure evidence should be emitted"
        );
        let peer = PeerId::from_bytes(peer_id);
        let evidence = FailureEvidence::AuditFailure {
            challenge_id: 5500,
            challenged_peer: peer,
            confirmed_failed_keys: confirmed_failures,
            reason: AuditFailureReason::DigestMismatch,
        };
        if let FailureEvidence::AuditFailure {
            confirmed_failed_keys,
            ..
        } = evidence
        {
            assert!(
                confirmed_failed_keys.is_empty(),
                "Evidence with empty failure set should not trigger a trust penalty"
            );
        }
    }

    #[test]
    fn scenario_56_repair_opportunity_filters_never_synced() {
        let never_synced = PeerSyncRecord {
            last_sync: None,
            cycles_since_sync: 5,
        };
        assert!(!never_synced.has_repair_opportunity());
        let synced_no_cycle = PeerSyncRecord {
            last_sync: Some(Instant::now()),
            cycles_since_sync: 0,
        };
        assert!(!synced_no_cycle.has_repair_opportunity());
        let synced_with_cycle = PeerSyncRecord {
            last_sync: Some(Instant::now()),
            cycles_since_sync: 1,
        };
        assert!(synced_with_cycle.has_repair_opportunity());
    }

    #[tokio::test]
    async fn audit_response_must_match_key_count() {
        let (storage, _temp) = create_test_storage().await;
        let nonce = [0x50; 32];
        let peer_id = [0x60; 32];
        let content = b"single chunk";
        let addr = LmdbStorage::compute_address(content);
        storage.put(&addr, content).await.unwrap();
        let absent_keys: Vec<XorName> = (1..=4u8).map(|i| [i; 32]).collect();
        let mut keys = vec![addr];
        keys.extend_from_slice(&absent_keys);
        let key_count = keys.len();
        let challenge = make_challenge(300, nonce, peer_id, keys);
        let self_id = peer_id_from_bytes(peer_id);
        let response =
            handle_audit_challenge(&challenge, &storage, &self_id, false, TEST_STORED_CHUNKS).await;
        match response {
            AuditResponse::Digests { digests, .. } => {
                assert_eq!(
                    digests.len(),
                    key_count,
                    "must produce exactly one digest per challenged key"
                );
            }
            AuditResponse::Bootstrapping { .. } => panic!("Expected Digests"),
            AuditResponse::Rejected { .. } => panic!("Unexpected Rejected response"),
        }
    }

    #[test]
    fn audit_digest_uses_full_record_bytes() {
        let nonce = [1u8; 32];
        let peer = [2u8; 32];
        let key = [3u8; 32];
        let d1 = compute_audit_digest(&nonce, &peer, &key, b"data version 1");
        let d2 = compute_audit_digest(&nonce, &peer, &key, b"data version 2");
        assert_ne!(
            d1, d2,
            "Different record bytes must produce different digests"
        );
    }

    #[tokio::test]
    async fn scenario_29_audit_start_gate_during_bootstrap() {
        let (storage, _temp) = create_test_storage().await;
        let content = b"should not be audited during bootstrap";
        let addr = LmdbStorage::compute_address(content);
        storage.put(&addr, content).await.expect("put");
        let challenge = make_challenge(2900, [0x29; 32], [0x29; 32], vec![addr]);
        let self_id = peer_id_from_bytes([0x29; 32]);
        let response =
            handle_audit_challenge(&challenge, &storage, &self_id, true, TEST_STORED_CHUNKS).await;
        assert!(
            matches!(
                response,
                AuditResponse::Bootstrapping { challenge_id: 2900 }
            ),
            "bootstrapping node must not compute digests — audit start gate"
        );
        let response =
            handle_audit_challenge(&challenge, &storage, &self_id, false, TEST_STORED_CHUNKS).await;
        assert!(
            matches!(response, AuditResponse::Digests { .. }),
            "drained node should compute digests normally"
        );
    }

    #[test]
    fn scenario_30_audit_peer_selection_from_sampled_keys() {
        assert_eq!(
            ReplicationConfig::audit_sample_count(100),
            10,
            "sample count should scale with sqrt(total_keys)"
        );
        assert_eq!(ReplicationConfig::audit_sample_count(3), 1, "sqrt(3) = 1");
        assert_eq!(
            ReplicationConfig::audit_sample_count(10_000),
            100,
            "sqrt(10000) = 100"
        );
        let never = PeerSyncRecord {
            last_sync: None,
            cycles_since_sync: 10,
        };
        assert!(!never.has_repair_opportunity());
        let too_soon = PeerSyncRecord {
            last_sync: Some(Instant::now()),
            cycles_since_sync: 0,
        };
        assert!(!too_soon.has_repair_opportunity());
        let eligible = PeerSyncRecord {
            last_sync: Some(Instant::now()),
            cycles_since_sync: 2,
        };
        assert!(eligible.has_repair_opportunity());
    }

    #[tokio::test]
    async fn scenario_32_dynamic_challenge_size() {
        let (storage, _temp) = create_test_storage().await;
        let mut addrs = Vec::new();
        for i in 0u8..5 {
            let content = format!("dynamic challenge key {i}");
            let addr = LmdbStorage::compute_address(content.as_bytes());
            storage.put(&addr, content.as_bytes()).await.expect("put");
            addrs.push(addr);
        }
        let nonce = [0x32; 32];
        let peer_id = [0x32; 32];
        let self_id = peer_id_from_bytes(peer_id);
        let challenge1 = make_challenge(3201, nonce, peer_id, vec![addrs[0]]);
        let resp1 =
            handle_audit_challenge(&challenge1, &storage, &self_id, false, TEST_STORED_CHUNKS)
                .await;
        if let AuditResponse::Digests { digests, .. } = resp1 {
            assert_eq!(digests.len(), 1, "|PeerKeySet| = 1 → 1 digest");
        }
        let challenge3 = make_challenge(3203, nonce, peer_id, addrs[0..3].to_vec());
        let resp3 =
            handle_audit_challenge(&challenge3, &storage, &self_id, false, TEST_STORED_CHUNKS)
                .await;
        if let AuditResponse::Digests { digests, .. } = resp3 {
            assert_eq!(digests.len(), 3, "|PeerKeySet| = 3 → 3 digests");
        }
        let challenge5 = make_challenge(3205, nonce, peer_id, addrs.clone());
        let resp5 =
            handle_audit_challenge(&challenge5, &storage, &self_id, false, TEST_STORED_CHUNKS)
                .await;
        if let AuditResponse::Digests { digests, .. } = resp5 {
            assert_eq!(digests.len(), 5, "|PeerKeySet| = 5 → 5 digests");
        }
        let challenge0 = make_challenge(3200, nonce, peer_id, vec![]);
        let resp0 =
            handle_audit_challenge(&challenge0, &storage, &self_id, false, TEST_STORED_CHUNKS)
                .await;
        if let AuditResponse::Digests { digests, .. } = resp0 {
            assert!(digests.is_empty(), "|PeerKeySet| = 0 → 0 digests (idle)");
        }
    }

    #[tokio::test]
    async fn scenario_47_bootstrap_claim_grace_period_audit() {
        let (storage, _temp) = create_test_storage().await;
        let content = b"bootstrap grace test";
        let addr = LmdbStorage::compute_address(content);
        storage.put(&addr, content).await.expect("put");
        let challenge = make_challenge(4700, [0x47; 32], [0x47; 32], vec![addr]);
        let self_id = peer_id_from_bytes([0x47; 32]);
        let response =
            handle_audit_challenge(&challenge, &storage, &self_id, true, TEST_STORED_CHUNKS).await;
        let challenge_id = match response {
            AuditResponse::Bootstrapping { challenge_id } => challenge_id,
            AuditResponse::Digests { .. } => {
                panic!("Expected Bootstrapping response during grace period")
            }
            AuditResponse::Rejected { .. } => {
                panic!("Unexpected Rejected response")
            }
        };
        assert_eq!(challenge_id, 4700);
        let peer = PeerId::from_bytes([0x47; 32]);
        let mut state = NeighborSyncState::new_cycle(vec![peer]);
        let now = Instant::now();
        state.bootstrap_claims.entry(peer).or_insert(now);
        assert!(
            state.bootstrap_claims.contains_key(&peer),
            "BootstrapClaimFirstSeen should be recorded after grace-period claim"
        );
    }

    #[tokio::test]
    async fn scenario_53_partial_failure_mixed_responsibility() {
        let (storage, _temp) = create_test_storage().await;
        let nonce = [0x53; 32];
        let peer_id = [0x53; 32];
        let c1 = b"scenario 53 key one";
        let c2 = b"scenario 53 key two";
        let c3 = b"scenario 53 key three";
        let k1 = LmdbStorage::compute_address(c1);
        let k2 = LmdbStorage::compute_address(c2);
        let k3 = LmdbStorage::compute_address(c3);
        storage.put(&k1, c1).await.expect("put k1");
        storage.put(&k2, c2).await.expect("put k2");
        storage.put(&k3, c3).await.expect("put k3");
        let d1_expected = compute_audit_digest(&nonce, &peer_id, &k1, c1);
        let d2_expected = compute_audit_digest(&nonce, &peer_id, &k2, c2);
        let d3_expected = compute_audit_digest(&nonce, &peer_id, &k3, c3);
        let d2_wrong = compute_audit_digest(&nonce, &peer_id, &k2, b"tampered k2");
        let d3_wrong = compute_audit_digest(&nonce, &peer_id, &k3, b"tampered k3");
        // NOTE(review): the first assertion compares d1_expected with itself
        // and is tautological; presumably it was meant to recompute the
        // digest independently — confirm and tighten.
        assert_eq!(d1_expected, d1_expected, "K1 should match");
        assert_ne!(d2_wrong, d2_expected, "K2 should mismatch");
        assert_ne!(d3_wrong, d3_expected, "K3 should mismatch");
        let digests = [d1_expected, d2_wrong, d3_wrong];
        let keys = [k1, k2, k3];
        let contents: [&[u8]; 3] = [c1, c2, c3];
        let mut failed_keys = Vec::new();
        for (i, key) in keys.iter().enumerate() {
            if digests[i] == ABSENT_KEY_DIGEST {
                failed_keys.push(*key);
                continue;
            }
            let expected = compute_audit_digest(&nonce, &peer_id, key, contents[i]);
            if digests[i] != expected {
                failed_keys.push(*key);
            }
        }
        assert_eq!(failed_keys.len(), 2, "K2 and K3 should be in failure set");
        assert!(failed_keys.contains(&k2));
        assert!(failed_keys.contains(&k3));
        assert!(!failed_keys.contains(&k1), "K1 passed digest check");
        // Simulated responsibility confirmation: only K2 is still owned by
        // the challenged peer.
        let responsible_for_k2 = true;
        let responsible_for_k3 = false;
        let mut confirmed = Vec::new();
        for key in &failed_keys {
            let is_responsible = if *key == k2 {
                responsible_for_k2
            } else {
                responsible_for_k3
            };
            if is_responsible {
                confirmed.push(*key);
            }
        }
        assert_eq!(confirmed, vec![k2], "Only K2 should be in confirmed set");
        let challenged_peer = PeerId::from_bytes(peer_id);
        let evidence = FailureEvidence::AuditFailure {
            challenge_id: 5300,
            challenged_peer,
            confirmed_failed_keys: confirmed,
            reason: AuditFailureReason::DigestMismatch,
        };
        match evidence {
            FailureEvidence::AuditFailure {
                confirmed_failed_keys,
                ..
            } => {
                assert_eq!(
                    confirmed_failed_keys.len(),
                    1,
                    "Only K2 should generate evidence"
                );
                assert_eq!(confirmed_failed_keys[0], k2);
            }
            _ => panic!("Expected AuditFailure evidence"),
        }
    }
}