use std::sync::Arc;
use std::time::{Duration, Instant};
use rand::Rng;
use saorsa_core::identity::PeerId;
use saorsa_core::{P2PNode, TrustEvent};
use tokio::sync::RwLock;
use tokio_util::sync::CancellationToken;
use crate::ant_protocol::XorName;
use crate::logging::{debug, warn};
use crate::replication::config::{
ReplicationConfig, AUDIT_FAILURE_TRUST_WEIGHT, REPLICATION_PROTOCOL_ID,
};
use crate::replication::protocol::{
compute_audit_digest, AuditChallenge, AuditResponse, ReplicationMessage,
ReplicationMessageBody, ABSENT_KEY_DIGEST,
};
use crate::replication::types::{BootstrapClaimObservation, NeighborSyncState};
use crate::storage::LmdbStorage;
use super::REPLICATION_TRUST_WEIGHT;
/// Number of keys carried by a single possession probe.
///
/// Passed to `ReplicationConfig::audit_response_timeout` to derive the
/// per-probe response timeout in `run_possession_check`.
const POSSESSION_PROBE_KEY_COUNT: usize = 1;
/// A key paired with a list of peers.
///
/// NOTE(review): nothing in the visible part of this file constructs or
/// consumes this type; presumably it is the queued input for
/// `run_possession_check`, which takes the same `key` / `peers` pair —
/// confirm against callers.
pub struct PossessionCheckEvent {
/// Key the event refers to.
pub key: XorName,
/// Peers associated with `key`; presumably the peers to probe — TODO confirm.
pub peers: Vec<PeerId>,
}
/// Verdict of probing one peer for possession of one key.
#[cfg_attr(test, derive(Debug, PartialEq, Eq))]
enum ProbeOutcome {
/// The peer returned exactly the digest the checker computed from its own
/// copy of the record.
Present,
/// The peer answered but did not prove possession: undecodable or
/// unexpected reply, wrong challenge id, wrong number of digests, the
/// absent-key sentinel, a digest mismatch, or an explicit rejection.
Failed,
/// Sending the request returned an error, so no response was obtained.
Timeout,
/// The peer replied that it is bootstrapping, echoing the challenge id.
BootstrapClaim,
/// The challenge could not be encoded locally, so the peer was never
/// actually asked; callers do not penalise this outcome.
Inconclusive,
}
/// Picks a random delay in the inclusive range `[min, max]`.
///
/// Sampling happens at whole-millisecond granularity. When the range is
/// empty or degenerate at that granularity (`min` rounds to the same
/// millisecond as `max`, or lies above it), `min` is returned unchanged.
///
/// The result is never shorter than `min`: both bounds are rounded *down* to
/// milliseconds before sampling, so without the final clamp a `min` with a
/// sub-millisecond component (e.g. 1.5 ms) could produce a sample (1 ms)
/// below the requested lower bound.
#[must_use]
pub fn random_delay(min: Duration, max: Duration) -> Duration {
    // `as_millis` yields a `u128`; saturate instead of truncating.
    let to_millis = |d: Duration| u64::try_from(d.as_millis()).unwrap_or(u64::MAX);
    let min_ms = to_millis(min);
    let max_ms = to_millis(max);
    if min_ms >= max_ms {
        return min;
    }
    let sampled = Duration::from_millis(rand::thread_rng().gen_range(min_ms..=max_ms));
    // `sampled <= max` always holds because `max_ms` is `max` rounded down;
    // only the lower bound needs restoring after the rounding.
    sampled.max(min)
}
/// Probes every peer in `peers` for possession of `key` and acts on each verdict.
///
/// The checker's own copy of the record is loaded from `storage` first; if it
/// is missing or unreadable the whole check is skipped, since there is nothing
/// to compare the peers' digests against.
///
/// Peers are probed one at a time. Before each probe the `shutdown` token is
/// consulted and the check stops early once it has been cancelled.
///
/// Per-verdict handling:
/// * `Present` – logs and clears the peer's active bootstrap claim.
/// * `Failed` – clears the active bootstrap claim, then reports an audit failure.
/// * `Timeout` – reports an audit failure.
/// * `BootstrapClaim` – delegated to `handle_possession_bootstrap_claim`.
/// * `Inconclusive` – logged only.
pub(crate) async fn run_possession_check(
    key: XorName,
    peers: Vec<PeerId>,
    p2p_node: &Arc<P2PNode>,
    storage: &Arc<LmdbStorage>,
    config: &ReplicationConfig,
    sync_state: &Arc<RwLock<NeighborSyncState>>,
    shutdown: &CancellationToken,
) {
    let key_hex = hex::encode(key);

    // Without a local copy there is no reference digest to verify against.
    let local_bytes = match storage.get_raw(&key).await {
        Err(e) => {
            warn!("Possession check: failed to read local {key_hex}: {e}; skipping");
            return;
        }
        Ok(None) => {
            debug!("Possession check: checker no longer holds {key_hex}; skipping");
            return;
        }
        Ok(Some(bytes)) => bytes,
    };

    let probe_timeout = config.audit_response_timeout(POSSESSION_PROBE_KEY_COUNT);

    for peer in peers {
        if shutdown.is_cancelled() {
            break;
        }

        let outcome = probe_once(&key, &local_bytes, &peer, p2p_node, probe_timeout).await;
        match outcome {
            ProbeOutcome::Inconclusive => {
                debug!(
                    "Possession check: inconclusive probe of {peer} for {key_hex}; not penalised"
                );
            }
            ProbeOutcome::BootstrapClaim => {
                handle_possession_bootstrap_claim(&peer, &key_hex, p2p_node, config, sync_state)
                    .await;
            }
            ProbeOutcome::Timeout => {
                report_possession_audit_failure(&peer, &key_hex, "timed out", p2p_node).await;
            }
            ProbeOutcome::Failed => {
                clear_possession_bootstrap_claim(&peer, sync_state).await;
                report_possession_audit_failure(
                    &peer,
                    &key_hex,
                    "failed to prove possession",
                    p2p_node,
                )
                .await;
            }
            ProbeOutcome::Present => {
                debug!("Possession check: {peer} proved possession of {key_hex}");
                clear_possession_bootstrap_claim(&peer, sync_state).await;
            }
        }
    }
}
/// Clears `peer`'s active bootstrap claim in the shared neighbor-sync state.
///
/// Takes the write lock on `sync_state` for the duration of the call to
/// `NeighborSyncState::clear_active_bootstrap_claim`.
async fn clear_possession_bootstrap_claim(
    peer: &PeerId,
    sync_state: &Arc<RwLock<NeighborSyncState>>,
) {
    let mut state = sync_state.write().await;
    state.clear_active_bootstrap_claim(peer);
}
/// Logs a possession failure for `peer` and reports it to the trust system.
///
/// `reason` is spliced into the warning between the peer and the key, and the
/// penalty is an `ApplicationFailure` weighted by `AUDIT_FAILURE_TRUST_WEIGHT`.
async fn report_possession_audit_failure(
    peer: &PeerId,
    key_hex: &str,
    reason: &str,
    p2p_node: &Arc<P2PNode>,
) {
    let penalty = TrustEvent::ApplicationFailure(AUDIT_FAILURE_TRUST_WEIGHT);
    warn!("Possession check: {peer} {reason} for {key_hex}; penalising at audit severity");
    p2p_node.report_trust_event(peer, penalty).await;
}
/// Records a bootstrap claim made by `peer` and penalises it when abusive.
///
/// The claim is registered through `NeighborSyncState::observe_bootstrap_claim`
/// using the configured grace period. A claim still within grace is only
/// logged; a claim past grace, or one repeated after the peer had previously
/// stopped claiming, is logged as abuse and reported as an
/// `ApplicationFailure` weighted by `REPLICATION_TRUST_WEIGHT`.
async fn handle_possession_bootstrap_claim(
    peer: &PeerId,
    key_hex: &str,
    p2p_node: &Arc<P2PNode>,
    config: &ReplicationConfig,
    sync_state: &Arc<RwLock<NeighborSyncState>>,
) {
    // The timestamp is captured before waiting for the lock; the write guard
    // is a temporary released at the end of the `let` statement.
    let now = Instant::now();
    let observation = sync_state.write().await.observe_bootstrap_claim(
        *peer,
        now,
        config.bootstrap_claim_grace_period,
    );

    let abusive = match observation {
        BootstrapClaimObservation::WithinGrace { .. } => {
            debug!(
                "Possession check: peer {peer} claims bootstrapping for {key_hex} \
                 (within grace period)"
            );
            false
        }
        BootstrapClaimObservation::PastGrace { first_seen } => {
            warn!(
                "Possession check: peer {peer} claiming bootstrap for {key_hex} past grace period \
                 ({:?} > {:?}), reporting abuse",
                now.duration_since(first_seen),
                config.bootstrap_claim_grace_period,
            );
            true
        }
        BootstrapClaimObservation::Repeated { first_seen } => {
            warn!(
                "Possession check: peer {peer} repeated bootstrap claim for {key_hex} after \
                 previously stopping; first claim was {:?} ago, reporting abuse",
                now.duration_since(first_seen),
            );
            true
        }
    };

    if abusive {
        p2p_node
            .report_trust_event(
                peer,
                TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
            )
            .await;
    }
}
/// Sends one audit challenge for `key` to `peer` and classifies the reply.
///
/// A fresh random nonce and challenge id are generated per probe; the
/// challenge id doubles as the message's request id.
///
/// Returns:
/// * `Inconclusive` when the challenge cannot be encoded,
/// * `Timeout` when `send_request` returns an error,
/// * `Failed` when the reply cannot be decoded or is not an audit response,
/// * otherwise whatever `interpret_audit_response` decides.
async fn probe_once(
    key: &XorName,
    local_bytes: &[u8],
    peer: &PeerId,
    p2p_node: &Arc<P2PNode>,
    probe_timeout: Duration,
) -> ProbeOutcome {
    // Each RNG handle is a statement-scoped temporary, so none is held
    // across the `.await` below.
    let nonce = rand::thread_rng().gen::<[u8; 32]>();
    let challenge_id = rand::thread_rng().gen::<u64>();

    let msg = ReplicationMessage {
        request_id: challenge_id,
        body: ReplicationMessageBody::AuditChallenge(AuditChallenge {
            challenge_id,
            nonce,
            challenged_peer_id: *peer.as_bytes(),
            keys: vec![*key],
        }),
    };

    let encoded = match msg.encode() {
        Ok(bytes) => bytes,
        Err(_) => {
            warn!(
                "Failed to encode possession challenge for {}",
                hex::encode(key)
            );
            return ProbeOutcome::Inconclusive;
        }
    };

    let response = match p2p_node
        .send_request(peer, REPLICATION_PROTOCOL_ID, encoded, probe_timeout)
        .await
    {
        Err(e) => {
            debug!("Possession probe to {peer} got no response: {e}");
            return ProbeOutcome::Timeout;
        }
        Ok(reply) => reply,
    };

    let decoded = match ReplicationMessage::decode(&response.data) {
        Err(e) => {
            debug!("Failed to decode possession response from {peer}: {e}");
            return ProbeOutcome::Failed;
        }
        Ok(message) => message,
    };

    let resp = match decoded.body {
        ReplicationMessageBody::AuditResponse(audit) => audit,
        _ => {
            debug!("Unexpected possession response type from {peer}");
            return ProbeOutcome::Failed;
        }
    };

    interpret_audit_response(
        key,
        local_bytes,
        peer.as_bytes(),
        &nonce,
        challenge_id,
        resp,
    )
}
/// Turns a decoded audit response into a [`ProbeOutcome`].
///
/// * `Rejected` is always `Failed`.
/// * `Bootstrapping` is a `BootstrapClaim` only when it echoes `challenge_id`;
///   otherwise it is `Failed`.
/// * `Digests` is `Present` only when it echoes `challenge_id`, carries
///   exactly one digest, that digest is not `ABSENT_KEY_DIGEST`, and it equals
///   the digest computed over `local_bytes` with the challenge's `nonce`,
///   `challenged_peer_id` and `key`. Any other case is `Failed`.
fn interpret_audit_response(
    key: &XorName,
    local_bytes: &[u8],
    challenged_peer_id: &[u8; 32],
    nonce: &[u8; 32],
    challenge_id: u64,
    response: AuditResponse,
) -> ProbeOutcome {
    // Dispose of the non-digest replies first, leaving only the digest case.
    let (resp_id, digests) = match response {
        AuditResponse::Rejected { .. } => return ProbeOutcome::Failed,
        AuditResponse::Bootstrapping {
            challenge_id: resp_id,
        } => {
            return if resp_id == challenge_id {
                ProbeOutcome::BootstrapClaim
            } else {
                ProbeOutcome::Failed
            };
        }
        AuditResponse::Digests {
            challenge_id: resp_id,
            digests,
        } => (resp_id, digests),
    };

    if resp_id != challenge_id {
        return ProbeOutcome::Failed;
    }

    // The probe asked about exactly one key, so exactly one digest must return.
    let [received] = digests.as_slice() else {
        return ProbeOutcome::Failed;
    };

    if *received == ABSENT_KEY_DIGEST {
        return ProbeOutcome::Failed;
    }

    if *received == compute_audit_digest(nonce, challenged_peer_id, key, local_bytes) {
        ProbeOutcome::Present
    } else {
        ProbeOutcome::Failed
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::replication::config::{POSSESSION_CHECK_DELAY_MAX, POSSESSION_CHECK_DELAY_MIN};

    // Fixture challenge shared by every `interpret_audit_response` test.
    const PEER_ID: [u8; 32] = [0x42; 32];
    const NONCE: [u8; 32] = [0x7a; 32];
    const CHALLENGE_ID: u64 = 0xDEAD_BEEF;
    const KEY: XorName = [0x11; 32];
    const BYTES: &[u8] = b"possession-check payload";

    /// Builds a `Digests` response with the given id and digest list.
    fn digests_response(challenge_id: u64, digests: Vec<[u8; 32]>) -> AuditResponse {
        AuditResponse::Digests {
            challenge_id,
            digests,
        }
    }

    /// Digest that correctly answers the fixture challenge for `BYTES`.
    fn valid_digest() -> [u8; 32] {
        compute_audit_digest(&NONCE, &PEER_ID, &KEY, BYTES)
    }

    /// Interprets `response` against the fixture challenge.
    fn verdict_for(response: AuditResponse) -> ProbeOutcome {
        interpret_audit_response(&KEY, BYTES, &PEER_ID, &NONCE, CHALLENGE_ID, response)
    }

    #[test]
    fn random_delay_is_within_bounds() {
        for _ in 0..100 {
            let d = random_delay(POSSESSION_CHECK_DELAY_MIN, POSSESSION_CHECK_DELAY_MAX);
            assert!(d >= POSSESSION_CHECK_DELAY_MIN);
            assert!(d <= POSSESSION_CHECK_DELAY_MAX);
        }
    }

    #[test]
    fn matching_digest_is_present() {
        let verdict = verdict_for(digests_response(CHALLENGE_ID, vec![valid_digest()]));
        assert_eq!(verdict, ProbeOutcome::Present);
    }

    #[test]
    fn absent_sentinel_is_failed() {
        let verdict = verdict_for(digests_response(CHALLENGE_ID, vec![ABSENT_KEY_DIGEST]));
        assert_eq!(verdict, ProbeOutcome::Failed);
    }

    #[test]
    fn forged_digest_is_failed() {
        let forged = [0x99; 32];
        let valid = valid_digest();
        assert_ne!(forged, valid, "test fixture must use a wrong digest");
        let verdict = verdict_for(digests_response(CHALLENGE_ID, vec![forged]));
        assert_eq!(verdict, ProbeOutcome::Failed);
    }

    #[test]
    fn mismatched_challenge_id_is_failed() {
        let wrong_id = CHALLENGE_ID.wrapping_add(1);
        let verdict = verdict_for(digests_response(wrong_id, vec![valid_digest()]));
        assert_eq!(verdict, ProbeOutcome::Failed);
    }

    #[test]
    fn wrong_arity_is_failed() {
        let verdict = verdict_for(digests_response(
            CHALLENGE_ID,
            vec![valid_digest(), ABSENT_KEY_DIGEST],
        ));
        assert_eq!(verdict, ProbeOutcome::Failed);
    }

    #[test]
    fn bootstrapping_is_bootstrap_claim() {
        let verdict = verdict_for(AuditResponse::Bootstrapping {
            challenge_id: CHALLENGE_ID,
        });
        assert_eq!(verdict, ProbeOutcome::BootstrapClaim);
    }

    #[test]
    fn bootstrapping_with_wrong_challenge_id_is_failed() {
        let verdict = verdict_for(AuditResponse::Bootstrapping {
            challenge_id: CHALLENGE_ID.wrapping_add(1),
        });
        assert_eq!(verdict, ProbeOutcome::Failed);
    }

    #[test]
    fn rejected_is_failed() {
        let verdict = verdict_for(AuditResponse::Rejected {
            challenge_id: CHALLENGE_ID,
            reason: "nope".to_string(),
        });
        assert_eq!(verdict, ProbeOutcome::Failed);
    }

    #[tokio::test]
    async fn possession_success_clears_active_bootstrap_claim_but_keeps_history() {
        let peer = PeerId::from_bytes(PEER_ID);
        let sync_state = Arc::new(RwLock::new(NeighborSyncState::new_cycle(Vec::new())));

        // Seed both the active claim and the history entry for the peer.
        {
            let now = Instant::now();
            let mut state = sync_state.write().await;
            state.bootstrap_claims.insert(peer, now);
            state.bootstrap_claim_history.insert(peer, now);
        }

        clear_possession_bootstrap_claim(&peer, &sync_state).await;

        let state = sync_state.read().await;
        assert!(!state.bootstrap_claims.contains_key(&peer));
        assert!(state.bootstrap_claim_history.contains_key(&peer));
    }
}