1use std::sync::Arc;
24use std::time::{Duration, Instant};
25
26use rand::Rng;
27use saorsa_core::identity::PeerId;
28use saorsa_core::{P2PNode, TrustEvent};
29use tokio::sync::RwLock;
30use tokio_util::sync::CancellationToken;
31
32use crate::ant_protocol::XorName;
33use crate::logging::{debug, warn};
34use crate::replication::config::{
35 ReplicationConfig, AUDIT_FAILURE_TRUST_WEIGHT, REPLICATION_PROTOCOL_ID,
36};
37use crate::replication::protocol::{
38 compute_audit_digest, AuditChallenge, AuditResponse, ReplicationMessage,
39 ReplicationMessageBody, ABSENT_KEY_DIGEST,
40};
41use crate::replication::types::{BootstrapClaimObservation, NeighborSyncState};
42use crate::storage::LmdbStorage;
43
44use super::REPLICATION_TRUST_WEIGHT;
45
46const POSSESSION_PROBE_KEY_COUNT: usize = 1;
49
50pub struct PossessionCheckEvent {
52 pub key: XorName,
54 pub peers: Vec<PeerId>,
56}
57
58#[cfg_attr(test, derive(Debug, PartialEq, Eq))]
60enum ProbeOutcome {
61 Present,
63 Failed,
66 Timeout,
69 BootstrapClaim,
72 Inconclusive,
74}
75
76#[must_use]
80pub fn random_delay(min: Duration, max: Duration) -> Duration {
81 let to_millis = |d: Duration| u64::try_from(d.as_millis()).unwrap_or(u64::MAX);
82 let min_ms = to_millis(min);
83 let max_ms = to_millis(max);
84 if min_ms >= max_ms {
85 return min;
86 }
87 Duration::from_millis(rand::thread_rng().gen_range(min_ms..=max_ms))
88}
89
90pub(crate) async fn run_possession_check(
101 key: XorName,
102 peers: Vec<PeerId>,
103 p2p_node: &Arc<P2PNode>,
104 storage: &Arc<LmdbStorage>,
105 config: &ReplicationConfig,
106 sync_state: &Arc<RwLock<NeighborSyncState>>,
107 shutdown: &CancellationToken,
108) {
109 let key_hex = hex::encode(key);
110
111 let local_bytes = match storage.get_raw(&key).await {
116 Ok(Some(bytes)) => bytes,
117 Ok(None) => {
118 debug!("Possession check: checker no longer holds {key_hex}; skipping");
119 return;
120 }
121 Err(e) => {
122 warn!("Possession check: failed to read local {key_hex}: {e}; skipping");
123 return;
124 }
125 };
126
127 let probe_timeout = config.audit_response_timeout(POSSESSION_PROBE_KEY_COUNT);
131
132 for peer in peers {
133 if shutdown.is_cancelled() {
134 return;
135 }
136 match probe_once(&key, &local_bytes, &peer, p2p_node, probe_timeout).await {
137 ProbeOutcome::Present => {
138 debug!("Possession check: {peer} proved possession of {key_hex}");
139 clear_possession_bootstrap_claim(&peer, sync_state).await;
140 }
141 ProbeOutcome::Failed => {
142 clear_possession_bootstrap_claim(&peer, sync_state).await;
143 report_possession_audit_failure(
144 &peer,
145 &key_hex,
146 "failed to prove possession",
147 p2p_node,
148 )
149 .await;
150 }
151 ProbeOutcome::Timeout => {
152 report_possession_audit_failure(&peer, &key_hex, "timed out", p2p_node).await;
153 }
154 ProbeOutcome::BootstrapClaim => {
155 handle_possession_bootstrap_claim(&peer, &key_hex, p2p_node, config, sync_state)
156 .await;
157 }
158 ProbeOutcome::Inconclusive => {
159 debug!(
160 "Possession check: inconclusive probe of {peer} for {key_hex}; not penalised"
161 );
162 }
163 }
164 }
165}
166
167async fn clear_possession_bootstrap_claim(
168 peer: &PeerId,
169 sync_state: &Arc<RwLock<NeighborSyncState>>,
170) {
171 sync_state.write().await.clear_active_bootstrap_claim(peer);
172}
173
174async fn report_possession_audit_failure(
175 peer: &PeerId,
176 key_hex: &str,
177 reason: &str,
178 p2p_node: &Arc<P2PNode>,
179) {
180 warn!("Possession check: {peer} {reason} for {key_hex}; penalising at audit severity");
181 p2p_node
182 .report_trust_event(
183 peer,
184 TrustEvent::ApplicationFailure(AUDIT_FAILURE_TRUST_WEIGHT),
185 )
186 .await;
187}
188
189async fn handle_possession_bootstrap_claim(
190 peer: &PeerId,
191 key_hex: &str,
192 p2p_node: &Arc<P2PNode>,
193 config: &ReplicationConfig,
194 sync_state: &Arc<RwLock<NeighborSyncState>>,
195) {
196 let (now, observation) = {
197 let now = Instant::now();
198 let mut state = sync_state.write().await;
199 (
200 now,
201 state.observe_bootstrap_claim(*peer, now, config.bootstrap_claim_grace_period),
202 )
203 };
204
205 match observation {
206 BootstrapClaimObservation::WithinGrace { .. } => {
207 debug!(
208 "Possession check: peer {peer} claims bootstrapping for {key_hex} \
209 (within grace period)"
210 );
211 }
212 BootstrapClaimObservation::PastGrace { first_seen } => {
213 warn!(
214 "Possession check: peer {peer} claiming bootstrap for {key_hex} past grace period \
215 ({:?} > {:?}), reporting abuse",
216 now.duration_since(first_seen),
217 config.bootstrap_claim_grace_period,
218 );
219 p2p_node
220 .report_trust_event(
221 peer,
222 TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
223 )
224 .await;
225 }
226 BootstrapClaimObservation::Repeated { first_seen } => {
227 warn!(
228 "Possession check: peer {peer} repeated bootstrap claim for {key_hex} after \
229 previously stopping; first claim was {:?} ago, reporting abuse",
230 now.duration_since(first_seen),
231 );
232 p2p_node
233 .report_trust_event(
234 peer,
235 TrustEvent::ApplicationFailure(REPLICATION_TRUST_WEIGHT),
236 )
237 .await;
238 }
239 }
240}
241
242async fn probe_once(
251 key: &XorName,
252 local_bytes: &[u8],
253 peer: &PeerId,
254 p2p_node: &Arc<P2PNode>,
255 probe_timeout: Duration,
256) -> ProbeOutcome {
257 let (nonce, challenge_id) = {
260 let mut rng = rand::thread_rng();
261 let nonce: [u8; 32] = rng.gen();
262 let challenge_id: u64 = rng.gen();
263 (nonce, challenge_id)
264 };
265 let challenge = AuditChallenge {
266 challenge_id,
267 nonce,
268 challenged_peer_id: *peer.as_bytes(),
269 keys: vec![*key],
270 };
271 let msg = ReplicationMessage {
272 request_id: challenge_id,
273 body: ReplicationMessageBody::AuditChallenge(challenge),
274 };
275 let Ok(encoded) = msg.encode() else {
276 warn!(
277 "Failed to encode possession challenge for {}",
278 hex::encode(key)
279 );
280 return ProbeOutcome::Inconclusive;
281 };
282
283 let response = match p2p_node
284 .send_request(peer, REPLICATION_PROTOCOL_ID, encoded, probe_timeout)
285 .await
286 {
287 Ok(response) => response,
288 Err(e) => {
289 debug!("Possession probe to {peer} got no response: {e}");
290 return ProbeOutcome::Timeout;
291 }
292 };
293
294 let decoded = match ReplicationMessage::decode(&response.data) {
295 Ok(decoded) => decoded,
296 Err(e) => {
297 debug!("Failed to decode possession response from {peer}: {e}");
298 return ProbeOutcome::Failed;
299 }
300 };
301
302 let ReplicationMessageBody::AuditResponse(resp) = decoded.body else {
303 debug!("Unexpected possession response type from {peer}");
304 return ProbeOutcome::Failed;
305 };
306
307 interpret_audit_response(
308 key,
309 local_bytes,
310 peer.as_bytes(),
311 &nonce,
312 challenge_id,
313 resp,
314 )
315}
316
317fn interpret_audit_response(
320 key: &XorName,
321 local_bytes: &[u8],
322 challenged_peer_id: &[u8; 32],
323 nonce: &[u8; 32],
324 challenge_id: u64,
325 response: AuditResponse,
326) -> ProbeOutcome {
327 match response {
328 AuditResponse::Digests {
329 challenge_id: resp_id,
330 digests,
331 } => {
332 if resp_id != challenge_id || digests.len() != 1 {
333 return ProbeOutcome::Failed;
334 }
335 let received = digests[0];
336 if received == ABSENT_KEY_DIGEST {
337 return ProbeOutcome::Failed;
338 }
339 let expected = compute_audit_digest(nonce, challenged_peer_id, key, local_bytes);
340 if received == expected {
341 ProbeOutcome::Present
342 } else {
343 ProbeOutcome::Failed
347 }
348 }
349 AuditResponse::Bootstrapping {
350 challenge_id: resp_id,
351 } => {
352 if resp_id == challenge_id {
353 ProbeOutcome::BootstrapClaim
354 } else {
355 ProbeOutcome::Failed
356 }
357 }
358 AuditResponse::Rejected { .. } => ProbeOutcome::Failed,
359 }
360}
361
362#[cfg(test)]
363mod tests {
364 use super::*;
365 use crate::replication::config::{POSSESSION_CHECK_DELAY_MAX, POSSESSION_CHECK_DELAY_MIN};
366
367 const PEER_ID: [u8; 32] = [0x42; 32];
368 const NONCE: [u8; 32] = [0x7a; 32];
369 const CHALLENGE_ID: u64 = 0xDEAD_BEEF;
370 const KEY: XorName = [0x11; 32];
371 const BYTES: &[u8] = b"possession-check payload";
372
373 fn digests_response(challenge_id: u64, digests: Vec<[u8; 32]>) -> AuditResponse {
374 AuditResponse::Digests {
375 challenge_id,
376 digests,
377 }
378 }
379
380 #[test]
381 fn random_delay_is_within_bounds() {
382 for _ in 0..100 {
383 let d = random_delay(POSSESSION_CHECK_DELAY_MIN, POSSESSION_CHECK_DELAY_MAX);
384 assert!(d >= POSSESSION_CHECK_DELAY_MIN);
385 assert!(d <= POSSESSION_CHECK_DELAY_MAX);
386 }
387 }
388
389 #[test]
390 fn matching_digest_is_present() {
391 let valid = compute_audit_digest(&NONCE, &PEER_ID, &KEY, BYTES);
392 let verdict = interpret_audit_response(
393 &KEY,
394 BYTES,
395 &PEER_ID,
396 &NONCE,
397 CHALLENGE_ID,
398 digests_response(CHALLENGE_ID, vec![valid]),
399 );
400 assert_eq!(verdict, ProbeOutcome::Present);
401 }
402
403 #[test]
404 fn absent_sentinel_is_failed() {
405 let verdict = interpret_audit_response(
406 &KEY,
407 BYTES,
408 &PEER_ID,
409 &NONCE,
410 CHALLENGE_ID,
411 digests_response(CHALLENGE_ID, vec![ABSENT_KEY_DIGEST]),
412 );
413 assert_eq!(verdict, ProbeOutcome::Failed);
414 }
415
416 #[test]
417 fn forged_digest_is_failed() {
418 let forged = [0x99; 32];
421 let valid = compute_audit_digest(&NONCE, &PEER_ID, &KEY, BYTES);
422 assert_ne!(forged, valid, "test fixture must use a wrong digest");
423 let verdict = interpret_audit_response(
424 &KEY,
425 BYTES,
426 &PEER_ID,
427 &NONCE,
428 CHALLENGE_ID,
429 digests_response(CHALLENGE_ID, vec![forged]),
430 );
431 assert_eq!(verdict, ProbeOutcome::Failed);
432 }
433
434 #[test]
435 fn mismatched_challenge_id_is_failed() {
436 let valid = compute_audit_digest(&NONCE, &PEER_ID, &KEY, BYTES);
437 let verdict = interpret_audit_response(
438 &KEY,
439 BYTES,
440 &PEER_ID,
441 &NONCE,
442 CHALLENGE_ID,
443 digests_response(CHALLENGE_ID.wrapping_add(1), vec![valid]),
444 );
445 assert_eq!(verdict, ProbeOutcome::Failed);
446 }
447
448 #[test]
449 fn wrong_arity_is_failed() {
450 let valid = compute_audit_digest(&NONCE, &PEER_ID, &KEY, BYTES);
451 let verdict = interpret_audit_response(
452 &KEY,
453 BYTES,
454 &PEER_ID,
455 &NONCE,
456 CHALLENGE_ID,
457 digests_response(CHALLENGE_ID, vec![valid, ABSENT_KEY_DIGEST]),
458 );
459 assert_eq!(verdict, ProbeOutcome::Failed);
460 }
461
462 #[test]
463 fn bootstrapping_is_bootstrap_claim() {
464 let verdict = interpret_audit_response(
465 &KEY,
466 BYTES,
467 &PEER_ID,
468 &NONCE,
469 CHALLENGE_ID,
470 AuditResponse::Bootstrapping {
471 challenge_id: CHALLENGE_ID,
472 },
473 );
474 assert_eq!(verdict, ProbeOutcome::BootstrapClaim);
475 }
476
477 #[test]
478 fn bootstrapping_with_wrong_challenge_id_is_failed() {
479 let verdict = interpret_audit_response(
480 &KEY,
481 BYTES,
482 &PEER_ID,
483 &NONCE,
484 CHALLENGE_ID,
485 AuditResponse::Bootstrapping {
486 challenge_id: CHALLENGE_ID.wrapping_add(1),
487 },
488 );
489 assert_eq!(verdict, ProbeOutcome::Failed);
490 }
491
492 #[tokio::test]
493 async fn possession_success_clears_active_bootstrap_claim_but_keeps_history() {
494 let peer = PeerId::from_bytes(PEER_ID);
495 let sync_state = Arc::new(RwLock::new(NeighborSyncState::new_cycle(Vec::new())));
496 {
497 let mut state = sync_state.write().await;
498 let now = Instant::now();
499 state.bootstrap_claims.insert(peer, now);
500 state.bootstrap_claim_history.insert(peer, now);
501 }
502
503 clear_possession_bootstrap_claim(&peer, &sync_state).await;
504
505 let state = sync_state.read().await;
506 assert!(!state.bootstrap_claims.contains_key(&peer));
507 assert!(state.bootstrap_claim_history.contains_key(&peer));
508 }
509
510 #[test]
511 fn rejected_is_failed() {
512 let verdict = interpret_audit_response(
513 &KEY,
514 BYTES,
515 &PEER_ID,
516 &NONCE,
517 CHALLENGE_ID,
518 AuditResponse::Rejected {
519 challenge_id: CHALLENGE_ID,
520 reason: "nope".to_string(),
521 },
522 );
523 assert_eq!(verdict, ProbeOutcome::Failed);
524 }
525}