Skip to main content

ant_node/replication/
possession.rs

1//! Delayed possession verification for fresh replication (ADR-0003).
2//!
3//! After a node fresh-replicates a chunk, every close-group peer responsible
4//! for it is checked 5-15 minutes later for actual possession. The check is a
5//! single-key cryptographic
6//! [`AuditChallenge`]: the probed
7//! peer must return `BLAKE3(nonce ‖ peer_id ‖ key ‖ bytes)` computed over the
8//! chunk it claims to hold. It cannot produce that digest without the bytes, so
9//! — unlike a self-reported presence flag — a peer cannot escape the check by
10//! falsely asserting possession. A peer that holds the chunk earns nothing —
11//! storing what it was paid to store is the baseline expectation, not
12//! meritorious; a peer that returns the absent sentinel, or a digest that does
13//! not match the checker's canonical copy (cryptographic proof it lacks the
14//! bytes), is penalised at `AuditChallenge` severity. Delivery of the original
15//! push is irrelevant: a peer the push never reached is still checked and
16//! penalised if it lacks the chunk.
17//!
18//! A peer unreachable at check time is penalised immediately at audit severity,
19//! matching the responsible-chunk `AuditChallenge` path. A matching bootstrap
20//! claim uses the shared bootstrap-claim grace/abuse tracker; peer-side
21//! malformed, rejected, or mismatched responses are audit failures.
22
23use std::sync::Arc;
24use std::time::{Duration, Instant};
25
26use rand::Rng;
27use saorsa_core::identity::PeerId;
28use saorsa_core::{P2PNode, TrustEvent};
29use tokio::sync::RwLock;
30use tokio_util::sync::CancellationToken;
31
32use crate::ant_protocol::XorName;
33use crate::logging::{debug, warn};
34use crate::replication::config::{
35    ReplicationConfig, AUDIT_FAILURE_TRUST_WEIGHT, REPLICATION_PROTOCOL_ID,
36};
37use crate::replication::protocol::{
38    compute_audit_digest, AuditChallenge, AuditResponse, ReplicationMessage,
39    ReplicationMessageBody, ABSENT_KEY_DIGEST,
40};
41use crate::replication::types::{BootstrapClaimObservation, NeighborSyncState};
42use crate::storage::LmdbStorage;
43
44use super::REPLICATION_TRUST_WEIGHT;
45
/// Number of keys carried by one possession probe. Every probe challenges a
/// single key, so this is the key count used when sizing the per-probe
/// audit-response deadline.
const POSSESSION_PROBE_KEY_COUNT: usize = 1;
49
50/// A scheduled possession check for one freshly-replicated chunk.
51pub struct PossessionCheckEvent {
52    /// Content-address of the chunk.
53    pub key: XorName,
54    /// Close-group peers responsible for holding it (excludes self).
55    pub peers: Vec<PeerId>,
56}
57
/// Result of one cryptographic possession probe against a single peer.
#[cfg_attr(test, derive(Debug, PartialEq, Eq))]
enum ProbeOutcome {
    /// The returned digest proves the peer holds the chunk's bytes.
    Present,
    /// The peer answered but failed the challenge: absent sentinel, digest
    /// mismatch, rejection, wrong challenge ID, wrong digest count, or a
    /// malformed reply.
    Failed,
    /// Nothing came back (transport error or deadline). Penalised immediately
    /// at audit-failure severity.
    Timeout,
    /// The peer answered with a matching bootstrap claim. Any grace is granted
    /// only through the shared bootstrap-claim tracker.
    BootstrapClaim,
    /// The probe could not be sent because of a local problem. Graced: the
    /// peer is not penalised.
    Inconclusive,
}
75
76/// Pick a randomised delay in `[min, max]` to wait before a possession check
77/// runs. The bounds come from `ReplicationConfig` (defaulting to
78/// `POSSESSION_CHECK_DELAY_MIN`/`MAX`) so tests can shorten them.
79#[must_use]
80pub fn random_delay(min: Duration, max: Duration) -> Duration {
81    let to_millis = |d: Duration| u64::try_from(d.as_millis()).unwrap_or(u64::MAX);
82    let min_ms = to_millis(min);
83    let max_ms = to_millis(max);
84    if min_ms >= max_ms {
85        return min;
86    }
87    Duration::from_millis(rand::thread_rng().gen_range(min_ms..=max_ms))
88}
89
90/// Run the possession check for one chunk against every responsible peer.
91///
92/// Recomputes the expected audit digest from the checker's own canonical copy
93/// of `key`, so the check is meaningful only while the checker still holds the
94/// chunk — which it does immediately after accepting and fresh-replicating a
95/// PUT. If the checker no longer holds it (e.g. pruned), the check is moot and
96/// is skipped without penalising anyone.
97///
98/// A peer that fails to prove possession, including by timeout, is penalised at
99/// `AuditChallenge` severity immediately. A responsive peer is left unrewarded.
100pub(crate) async fn run_possession_check(
101    key: XorName,
102    peers: Vec<PeerId>,
103    p2p_node: &Arc<P2PNode>,
104    storage: &Arc<LmdbStorage>,
105    config: &ReplicationConfig,
106    sync_state: &Arc<RwLock<NeighborSyncState>>,
107    shutdown: &CancellationToken,
108) {
109    let key_hex = hex::encode(key);
110
111    // Read our canonical copy once: the audit digest is recomputed from these
112    // bytes for every peer (hoisted out of the per-peer loop). If we no longer
113    // hold the chunk we cannot verify any peer's proof, and we are no longer a
114    // responsible checker for it — skip without penalising anyone.
115    let local_bytes = match storage.get_raw(&key).await {
116        Ok(Some(bytes)) => bytes,
117        Ok(None) => {
118            debug!("Possession check: checker no longer holds {key_hex}; skipping");
119            return;
120        }
121        Err(e) => {
122            warn!("Possession check: failed to read local {key_hex}: {e}; skipping");
123            return;
124        }
125    };
126
127    // Single-key probe budget, matched to the audit response timeout's
128    // bandwidth-calibrated deadline (tight enough that a relay that must refetch
129    // the bytes blows it, generous for an honest local-disk read).
130    let probe_timeout = config.audit_response_timeout(POSSESSION_PROBE_KEY_COUNT);
131
132    for peer in peers {
133        if shutdown.is_cancelled() {
134            return;
135        }
136        match probe_once(&key, &local_bytes, &peer, p2p_node, probe_timeout).await {
137            ProbeOutcome::Present => {
138                debug!("Possession check: {peer} proved possession of {key_hex}");
139                clear_possession_bootstrap_claim(&peer, sync_state).await;
140            }
141            ProbeOutcome::Failed => {
142                clear_possession_bootstrap_claim(&peer, sync_state).await;
143                report_possession_audit_failure(
144                    &peer,
145                    &key_hex,
146                    "failed to prove possession",
147                    p2p_node,
148                )
149                .await;
150            }
151            ProbeOutcome::Timeout => {
152                report_possession_audit_failure(&peer, &key_hex, "timed out", p2p_node).await;
153            }
154            ProbeOutcome::BootstrapClaim => {
155                handle_possession_bootstrap_claim(&peer, &key_hex, p2p_node, config, sync_state)
156                    .await;
157            }
158            ProbeOutcome::Inconclusive => {
159                debug!(
160                    "Possession check: inconclusive probe of {peer} for {key_hex}; not penalised"
161                );
162            }
163        }
164    }
165}
166
167async fn clear_possession_bootstrap_claim(
168    peer: &PeerId,
169    sync_state: &Arc<RwLock<NeighborSyncState>>,
170) {
171    sync_state.write().await.clear_active_bootstrap_claim(peer);
172}
173
174async fn report_possession_audit_failure(
175    peer: &PeerId,
176    key_hex: &str,
177    reason: &str,
178    p2p_node: &Arc<P2PNode>,
179) {
180    warn!("Possession check: {peer} {reason} for {key_hex}; penalising at audit severity");
181    p2p_node
182        .report_trust_event(
183            peer,
184            TrustEvent::ApplicationFailure(AUDIT_FAILURE_TRUST_WEIGHT),
185        )
186        .await;
187}
188
189async fn handle_possession_bootstrap_claim(
190    peer: &PeerId,
191    key_hex: &str,
192    p2p_node: &Arc<P2PNode>,
193    config: &ReplicationConfig,
194    sync_state: &Arc<RwLock<NeighborSyncState>>,
195) {
196    let (now, observation) = {
197        let now = Instant::now();
198        let mut state = sync_state.write().await;
199        (
200            now,
201            state.observe_bootstrap_claim(*peer, now, config.bootstrap_claim_grace_period),
202        )
203    };
204
205    match observation {
206        BootstrapClaimObservation::WithinGrace { .. } => {
207            debug!(
208                "Possession check: peer {peer} claims bootstrapping for {key_hex} \
209                 (within grace period)"
210            );
211        }
212        BootstrapClaimObservation::PastGrace { first_seen } => {
213            warn!(
214                "Possession check: peer {peer} claiming bootstrap for {key_hex} past grace period \
215                 ({:?} > {:?}), reporting abuse",
216                now.duration_since(first_seen),
217                config.bootstrap_claim_grace_period,
218            );
219            p2p_node
220                .report_trust_event(
221                    peer,
222                    TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
223                )
224                .await;
225        }
226        BootstrapClaimObservation::Repeated { first_seen } => {
227            warn!(
228                "Possession check: peer {peer} repeated bootstrap claim for {key_hex} after \
229                 previously stopping; first claim was {:?} ago, reporting abuse",
230                now.duration_since(first_seen),
231            );
232            p2p_node
233                .report_trust_event(
234                    peer,
235                    TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
236                )
237                .await;
238        }
239    }
240}
241
242/// Send one single-key cryptographic [`AuditChallenge`] and interpret the
243/// response. The peer proves possession by returning
244/// `compute_audit_digest(nonce, peer, key, bytes)`; absence is proven by the
245/// [`ABSENT_KEY_DIGEST`] sentinel or any digest that does not match the
246/// checker's canonical copy. A transport failure / deadline is a `Timeout`; a
247/// matching bootstrap response is a `BootstrapClaim`; a local encode failure is
248/// `Inconclusive`; peer-side malformed, rejected, or mismatched replies are
249/// `Failed`.
250async fn probe_once(
251    key: &XorName,
252    local_bytes: &[u8],
253    peer: &PeerId,
254    p2p_node: &Arc<P2PNode>,
255    probe_timeout: Duration,
256) -> ProbeOutcome {
257    // Fresh nonce per probe so a stored digest cannot be replayed, and bind the
258    // challenge to this peer's identity so it cannot relay another node's proof.
259    let (nonce, challenge_id) = {
260        let mut rng = rand::thread_rng();
261        let nonce: [u8; 32] = rng.gen();
262        let challenge_id: u64 = rng.gen();
263        (nonce, challenge_id)
264    };
265    let challenge = AuditChallenge {
266        challenge_id,
267        nonce,
268        challenged_peer_id: *peer.as_bytes(),
269        keys: vec![*key],
270    };
271    let msg = ReplicationMessage {
272        request_id: challenge_id,
273        body: ReplicationMessageBody::AuditChallenge(challenge),
274    };
275    let Ok(encoded) = msg.encode() else {
276        warn!(
277            "Failed to encode possession challenge for {}",
278            hex::encode(key)
279        );
280        return ProbeOutcome::Inconclusive;
281    };
282
283    let response = match p2p_node
284        .send_request(peer, REPLICATION_PROTOCOL_ID, encoded, probe_timeout)
285        .await
286    {
287        Ok(response) => response,
288        Err(e) => {
289            debug!("Possession probe to {peer} got no response: {e}");
290            return ProbeOutcome::Timeout;
291        }
292    };
293
294    let decoded = match ReplicationMessage::decode(&response.data) {
295        Ok(decoded) => decoded,
296        Err(e) => {
297            debug!("Failed to decode possession response from {peer}: {e}");
298            return ProbeOutcome::Failed;
299        }
300    };
301
302    let ReplicationMessageBody::AuditResponse(resp) = decoded.body else {
303        debug!("Unexpected possession response type from {peer}");
304        return ProbeOutcome::Failed;
305    };
306
307    interpret_audit_response(
308        key,
309        local_bytes,
310        peer.as_bytes(),
311        &nonce,
312        challenge_id,
313        resp,
314    )
315}
316
317/// Classify an [`AuditResponse`] into a possession verdict. Pure (no I/O): the
318/// digest is verified against `local_bytes`, the checker's canonical copy.
319fn interpret_audit_response(
320    key: &XorName,
321    local_bytes: &[u8],
322    challenged_peer_id: &[u8; 32],
323    nonce: &[u8; 32],
324    challenge_id: u64,
325    response: AuditResponse,
326) -> ProbeOutcome {
327    match response {
328        AuditResponse::Digests {
329            challenge_id: resp_id,
330            digests,
331        } => {
332            if resp_id != challenge_id || digests.len() != 1 {
333                return ProbeOutcome::Failed;
334            }
335            let received = digests[0];
336            if received == ABSENT_KEY_DIGEST {
337                return ProbeOutcome::Failed;
338            }
339            let expected = compute_audit_digest(nonce, challenged_peer_id, key, local_bytes);
340            if received == expected {
341                ProbeOutcome::Present
342            } else {
343                // A non-sentinel digest that does not match our canonical bytes
344                // proves the peer cannot reproduce the content — treat as absent
345                // (matches the audit's DigestMismatch handling).
346                ProbeOutcome::Failed
347            }
348        }
349        AuditResponse::Bootstrapping {
350            challenge_id: resp_id,
351        } => {
352            if resp_id == challenge_id {
353                ProbeOutcome::BootstrapClaim
354            } else {
355                ProbeOutcome::Failed
356            }
357        }
358        AuditResponse::Rejected { .. } => ProbeOutcome::Failed,
359    }
360}
361
#[cfg(test)]
mod tests {
    use super::*;
    use crate::replication::config::{POSSESSION_CHECK_DELAY_MAX, POSSESSION_CHECK_DELAY_MIN};

    // Fixed challenge fixture shared by the classifier tests.
    const PEER_ID: [u8; 32] = [0x42; 32];
    const NONCE: [u8; 32] = [0x7a; 32];
    const CHALLENGE_ID: u64 = 0xDEAD_BEEF;
    const KEY: XorName = [0x11; 32];
    const BYTES: &[u8] = b"possession-check payload";

    /// Build a `Digests` response with the given challenge ID and digests.
    fn digests_response(challenge_id: u64, digests: Vec<[u8; 32]>) -> AuditResponse {
        AuditResponse::Digests {
            challenge_id,
            digests,
        }
    }

    /// Digest a genuine holder of `BYTES` returns for the fixture challenge.
    fn honest_digest() -> [u8; 32] {
        compute_audit_digest(&NONCE, &PEER_ID, &KEY, BYTES)
    }

    /// Classify `response` against the fixture challenge.
    fn classify(response: AuditResponse) -> ProbeOutcome {
        interpret_audit_response(&KEY, BYTES, &PEER_ID, &NONCE, CHALLENGE_ID, response)
    }

    #[test]
    fn random_delay_is_within_bounds() {
        for _ in 0..100 {
            let delay = random_delay(POSSESSION_CHECK_DELAY_MIN, POSSESSION_CHECK_DELAY_MAX);
            assert!(delay >= POSSESSION_CHECK_DELAY_MIN);
            assert!(delay <= POSSESSION_CHECK_DELAY_MAX);
        }
    }

    #[test]
    fn matching_digest_is_present() {
        let response = digests_response(CHALLENGE_ID, vec![honest_digest()]);
        assert_eq!(classify(response), ProbeOutcome::Present);
    }

    #[test]
    fn absent_sentinel_is_failed() {
        let response = digests_response(CHALLENGE_ID, vec![ABSENT_KEY_DIGEST]);
        assert_eq!(classify(response), ProbeOutcome::Failed);
    }

    #[test]
    fn forged_digest_is_failed() {
        // Without the bytes a peer cannot derive the correct digest, so any
        // non-sentinel value it invents must fail against our canonical copy.
        let forged = [0x99; 32];
        assert_ne!(
            forged,
            honest_digest(),
            "test fixture must use a wrong digest"
        );
        let response = digests_response(CHALLENGE_ID, vec![forged]);
        assert_eq!(classify(response), ProbeOutcome::Failed);
    }

    #[test]
    fn mismatched_challenge_id_is_failed() {
        let response = digests_response(CHALLENGE_ID.wrapping_add(1), vec![honest_digest()]);
        assert_eq!(classify(response), ProbeOutcome::Failed);
    }

    #[test]
    fn wrong_arity_is_failed() {
        let response = digests_response(CHALLENGE_ID, vec![honest_digest(), ABSENT_KEY_DIGEST]);
        assert_eq!(classify(response), ProbeOutcome::Failed);
    }

    #[test]
    fn bootstrapping_is_bootstrap_claim() {
        let response = AuditResponse::Bootstrapping {
            challenge_id: CHALLENGE_ID,
        };
        assert_eq!(classify(response), ProbeOutcome::BootstrapClaim);
    }

    #[test]
    fn bootstrapping_with_wrong_challenge_id_is_failed() {
        let response = AuditResponse::Bootstrapping {
            challenge_id: CHALLENGE_ID.wrapping_add(1),
        };
        assert_eq!(classify(response), ProbeOutcome::Failed);
    }

    #[tokio::test]
    async fn possession_success_clears_active_bootstrap_claim_but_keeps_history() {
        let peer = PeerId::from_bytes(PEER_ID);
        let sync_state = Arc::new(RwLock::new(NeighborSyncState::new_cycle(Vec::new())));

        // Seed both the active claim and the historical record for the peer.
        {
            let seeded_at = Instant::now();
            let mut state = sync_state.write().await;
            state.bootstrap_claims.insert(peer, seeded_at);
            state.bootstrap_claim_history.insert(peer, seeded_at);
        }

        clear_possession_bootstrap_claim(&peer, &sync_state).await;

        // Only the active claim is removed; the history entry must survive.
        let state = sync_state.read().await;
        assert!(!state.bootstrap_claims.contains_key(&peer));
        assert!(state.bootstrap_claim_history.contains_key(&peer));
    }

    #[test]
    fn rejected_is_failed() {
        let response = AuditResponse::Rejected {
            challenge_id: CHALLENGE_ID,
            reason: "nope".to_string(),
        };
        assert_eq!(classify(response), ProbeOutcome::Failed);
    }
}