ant_node/replication/admission.rs
1//! Neighbor-sync hint admission rules (Section 7).
2//!
3//! Per-key admission filtering before verification pipeline entry.
4//!
5//! When a neighbor sync hint arrives, each key must pass admission before
6//! entering verification. The admission rules check:
7//! 1. Sender is authenticated and in `LocalRT(self)` (checked before calling
8//! this module).
9//! 2. Key is relevant to the receiver (checked here).
10
11use std::collections::HashSet;
12use std::sync::Arc;
13
14use saorsa_core::identity::PeerId;
15use saorsa_core::P2PNode;
16
17use crate::ant_protocol::XorName;
18use crate::replication::config::ReplicationConfig;
19use crate::replication::paid_list::PaidList;
20use crate::storage::LmdbStorage;
21
22/// Result of admitting a set of hints from a neighbor sync.
23#[derive(Debug)]
24pub struct AdmissionResult {
25 /// Keys admitted into the replica-hint pipeline (fetch-eligible).
26 pub replica_keys: Vec<XorName>,
27 /// Keys admitted into the paid-hint-only pipeline (`PaidForList` update
28 /// only).
29 pub paid_only_keys: Vec<XorName>,
30 /// Keys rejected (not relevant to this node).
31 pub rejected_keys: Vec<XorName>,
32}
33
34/// Check if this node is responsible for key `K`.
35///
36/// Returns `true` if `self_id` is among the `close_group_size` nearest peers
37/// to `K` in `SelfInclusiveRT`.
38pub async fn is_responsible(
39 self_id: &PeerId,
40 key: &XorName,
41 p2p_node: &Arc<P2PNode>,
42 close_group_size: usize,
43) -> bool {
44 let closest = p2p_node
45 .dht_manager()
46 .find_closest_nodes_local_with_self(key, close_group_size)
47 .await;
48 closest.iter().any(|n| n.peer_id == *self_id)
49}
50
51/// Check if this node is in the `PaidCloseGroup` for key `K`.
52///
53/// `PaidCloseGroup` = `paid_list_close_group_size` nearest peers to `K` in
54/// `SelfInclusiveRT`.
55pub async fn is_in_paid_close_group(
56 self_id: &PeerId,
57 key: &XorName,
58 p2p_node: &Arc<P2PNode>,
59 paid_list_close_group_size: usize,
60) -> bool {
61 let closest = p2p_node
62 .dht_manager()
63 .find_closest_nodes_local_with_self(key, paid_list_close_group_size)
64 .await;
65 closest.iter().any(|n| n.peer_id == *self_id)
66}
67
68/// Admit neighbor-sync hints per Section 7.1 rules.
69///
70/// For each key in `replica_hints` and `paid_hints`:
71/// - **Cross-set precedence**: if a key appears in both sets, keep only the
72/// replica-hint entry.
73/// - **Replica hints**: admitted if `IsResponsible(self, K)` or key already
74/// exists in local store / pending set.
75/// - **Paid hints**: admitted if `self` is in `PaidCloseGroup(K)` or key is
76/// already in `PaidForList`.
77///
78/// Returns an [`AdmissionResult`] with keys sorted into pipelines.
79#[allow(clippy::too_many_arguments, clippy::implicit_hasher)]
80pub async fn admit_hints(
81 self_id: &PeerId,
82 replica_hints: &[XorName],
83 paid_hints: &[XorName],
84 p2p_node: &Arc<P2PNode>,
85 config: &ReplicationConfig,
86 storage: &Arc<LmdbStorage>,
87 paid_list: &Arc<PaidList>,
88 pending_keys: &HashSet<XorName>,
89) -> AdmissionResult {
90 let mut result = AdmissionResult {
91 replica_keys: Vec::new(),
92 paid_only_keys: Vec::new(),
93 rejected_keys: Vec::new(),
94 };
95
96 // Track all processed keys to deduplicate within and across sets.
97 let mut seen = HashSet::new();
98
99 // Process replica hints.
100 for &key in replica_hints {
101 if !seen.insert(key) {
102 continue;
103 }
104
105 // Fast path: already local or pending -- no routing-table lookup needed.
106 let already_local = storage.exists(&key).unwrap_or(false);
107 let already_pending = pending_keys.contains(&key);
108
109 if already_local || already_pending {
110 result.replica_keys.push(key);
111 continue;
112 }
113
114 if is_responsible(self_id, &key, p2p_node, config.close_group_size).await {
115 result.replica_keys.push(key);
116 } else {
117 result.rejected_keys.push(key);
118 }
119 }
120
121 // Process paid hints. Cross-set dedup is handled by `seen` — any key
122 // already processed in the replica-hints loop above is skipped here.
123 for &key in paid_hints {
124 if !seen.insert(key) {
125 continue;
126 }
127
128 // Fast path: already in PaidForList -- no routing-table lookup needed.
129 let already_paid = paid_list.contains(&key).unwrap_or(false);
130
131 if already_paid {
132 result.paid_only_keys.push(key);
133 continue;
134 }
135
136 if is_in_paid_close_group(self_id, &key, p2p_node, config.paid_list_close_group_size).await
137 {
138 result.paid_only_keys.push(key);
139 } else {
140 result.rejected_keys.push(key);
141 }
142 }
143
144 result
145}
146
147// ---------------------------------------------------------------------------
148// Tests
149// ---------------------------------------------------------------------------
150
151#[cfg(test)]
152#[allow(clippy::unwrap_used, clippy::expect_used)]
153mod tests {
154 use super::*;
155 use crate::client::xor_distance;
156 use crate::replication::config::ReplicationConfig;
157
158 /// Build a `PeerId` from a single byte (zero-padded to 32 bytes).
159 fn peer_id_from_byte(b: u8) -> PeerId {
160 let mut bytes = [0u8; 32];
161 bytes[0] = b;
162 PeerId::from_bytes(bytes)
163 }
164
165 /// Build an `XorName` from a single byte (repeated to 32 bytes).
166 fn xor_name_from_byte(b: u8) -> XorName {
167 [b; 32]
168 }
169
170 // -----------------------------------------------------------------------
171 // AdmissionResult construction helpers for pure-logic tests
172 //
173 // The full `admit_hints` function requires a live DHT + LMDB backend.
174 // For unit tests we directly exercise:
175 // 1. Cross-set precedence logic
176 // 2. Deduplication logic
177 // 3. evaluate_key_evidence (in quorum.rs)
178 //
179 // Below we simulate admission by using the pure-logic portions.
180 // -----------------------------------------------------------------------
181
182 #[test]
183 fn cross_set_precedence_replica_wins() {
184 // When a key appears in both replica_hints and paid_hints, the
185 // paid_hints entry should be suppressed by cross-set precedence.
186 let key = xor_name_from_byte(0xAA);
187 let replica_set: HashSet<XorName> = std::iter::once(key).collect();
188
189 // Simulating the paid-hint loop: key is in replica_set, so it should
190 // be skipped.
191 assert!(
192 replica_set.contains(&key),
193 "paid-hint key present in replica set should be skipped"
194 );
195 }
196
197 #[test]
198 fn deduplication_within_replica_hints() {
199 // Duplicate keys in replica_hints should only appear once.
200 let key_a = xor_name_from_byte(0x01);
201 let key_b = xor_name_from_byte(0x02);
202 let hints = vec![key_a, key_b, key_a, key_a, key_b];
203
204 let mut seen = HashSet::new();
205 let mut unique = Vec::new();
206 for &key in &hints {
207 if seen.insert(key) {
208 unique.push(key);
209 }
210 }
211
212 assert_eq!(unique.len(), 2);
213 assert_eq!(unique[0], key_a);
214 assert_eq!(unique[1], key_b);
215 }
216
217 #[test]
218 fn deduplication_across_sets() {
219 // If a key appears in replica_hints AND paid_hints, the paid entry
220 // is skipped because seen already contains it from replica processing.
221 let key = xor_name_from_byte(0xFF);
222 let replica_hints = vec![key];
223 let paid_hints = vec![key];
224
225 let replica_set: HashSet<XorName> = replica_hints.iter().copied().collect();
226 let mut seen: HashSet<XorName> = HashSet::new();
227
228 // Process replica hints first.
229 for &k in &replica_hints {
230 seen.insert(k);
231 }
232
233 // Process paid hints: key is already in `seen` AND in `replica_set`.
234 let mut paid_admitted = Vec::new();
235 for &k in &paid_hints {
236 if !seen.insert(k) {
237 continue; // duplicate
238 }
239 if replica_set.contains(&k) {
240 continue; // cross-set precedence
241 }
242 paid_admitted.push(k);
243 }
244
245 assert!(
246 paid_admitted.is_empty(),
247 "paid-hint should be suppressed when key is also a replica hint"
248 );
249 }
250
251 #[test]
252 fn admission_result_empty_inputs() {
253 let result = AdmissionResult {
254 replica_keys: Vec::new(),
255 paid_only_keys: Vec::new(),
256 rejected_keys: Vec::new(),
257 };
258
259 assert!(result.replica_keys.is_empty());
260 assert!(result.paid_only_keys.is_empty());
261 assert!(result.rejected_keys.is_empty());
262 }
263
264 #[test]
265 fn out_of_range_keys_rejected_by_distance() {
266 // Simulate rejection: a key whose XOR distance from self is large
267 // should not appear in a close-group of size 3 when there are closer
268 // peers.
269 let _self_id = peer_id_from_byte(0x00);
270 let key = xor_name_from_byte(0xFF);
271 let _config = ReplicationConfig::default();
272
273 // Distance from self (0x00...) to key (0xFF...):
274 let self_xor: XorName = [0u8; 32];
275 let dist = xor_distance(&self_xor, &key);
276
277 // A very far key would have high distance -- this proves the concept.
278 assert_eq!(dist[0], 0xFF, "distance first byte should be 0xFF");
279
280 // Meanwhile a close key would have a small distance.
281 let close_key = xor_name_from_byte(0x01);
282 let close_dist = xor_distance(&self_xor, &close_key);
283 assert_eq!(
284 close_dist[0], 0x01,
285 "close distance first byte should be 0x01"
286 );
287
288 assert!(
289 dist > close_dist,
290 "far key should have greater distance than close key"
291 );
292 }
293
294 #[test]
295 fn config_close_group_sizes_are_valid() {
296 let config = ReplicationConfig::default();
297 assert!(
298 config.close_group_size > 0,
299 "close_group_size must be positive"
300 );
301 assert!(
302 config.paid_list_close_group_size > 0,
303 "paid_list_close_group_size must be positive"
304 );
305 assert!(
306 config.paid_list_close_group_size >= config.close_group_size,
307 "paid_list_close_group_size should be >= close_group_size"
308 );
309 }
310
311 // -----------------------------------------------------------------------
312 // Section 18 scenarios
313 // -----------------------------------------------------------------------
314
315 /// Scenario 5: Unauthorized sync peer — hints from peers not in
316 /// `LocalRT(self)` are dropped and do not enter verification.
317 ///
318 /// Two layers enforce this:
319 /// (a) `handle_sync_request` in `neighbor_sync.rs` returns
320 /// `sender_in_rt = false` when the sender is not in `LocalRT`.
321 /// The caller (`handle_neighbor_sync_request` in `mod.rs`) returns
322 /// early without processing ANY inbound hints. This is the primary
323 /// gate tested at the e2e level (scenario 17 tests the positive
324 /// case).
325 /// (b) Even if a sender IS in `LocalRT`, the per-key relevance check
326 /// (`is_responsible` / `is_in_paid_close_group`) in `admit_hints`
327 /// still applies. Sender identity does not grant key admission.
328 ///
329 /// This test exercises layer (b): the admission pipeline's dedup,
330 /// cross-set precedence, and relevance filtering using the same logic
331 /// that `admit_hints` performs — without the `P2PNode` dependency
332 /// needed for the actual `is_responsible` DHT lookup.
333 #[test]
334 fn scenario_5_sender_does_not_grant_key_relevance() {
335 let key_pending = xor_name_from_byte(0xB0);
336 let key_not_pending = xor_name_from_byte(0xB1);
337 let key_paid_existing = xor_name_from_byte(0xB2);
338 let _sender = peer_id_from_byte(0x01);
339
340 // Simulate local state: only key_pending is in the pending set,
341 // key_paid_existing is in the paid list.
342 let pending: HashSet<XorName> = std::iter::once(key_pending).collect();
343 let paid_set: HashSet<XorName> = std::iter::once(key_paid_existing).collect();
344
345 // Trace through admit_hints logic for replica hints:
346 let replica_hints = [key_pending, key_not_pending];
347 let replica_set: HashSet<XorName> = replica_hints.iter().copied().collect();
348 let mut seen = HashSet::new();
349 let mut admitted_replica = Vec::new();
350 let mut rejected = Vec::new();
351
352 for &key in &replica_hints {
353 if !seen.insert(key) {
354 continue; // dedup
355 }
356 // Fast path: already pending -> admitted.
357 if pending.contains(&key) {
358 admitted_replica.push(key);
359 continue;
360 }
361 // key_not_pending: not pending, not local -> needs is_responsible.
362 // Simulate is_responsible returning false (out of range).
363 let is_responsible = false;
364 if is_responsible {
365 admitted_replica.push(key);
366 } else {
367 rejected.push(key);
368 }
369 }
370
371 // Trace through paid hints:
372 let paid_hints = [key_paid_existing, key_pending]; // key_pending overlaps with replica
373 let mut admitted_paid = Vec::new();
374
375 for &key in &paid_hints {
376 if !seen.insert(key) {
377 continue; // dedup: key_pending already seen
378 }
379 if replica_set.contains(&key) {
380 continue; // cross-set precedence
381 }
382 // Fast path: already in paid list -> admitted.
383 if paid_set.contains(&key) {
384 admitted_paid.push(key);
385 continue;
386 }
387 rejected.push(key);
388 }
389
390 // Verify outcomes:
391 assert_eq!(
392 admitted_replica,
393 vec![key_pending],
394 "only the pending key should be admitted as replica"
395 );
396 assert_eq!(
397 rejected,
398 vec![key_not_pending],
399 "non-pending, non-responsible key must be rejected"
400 );
401 assert_eq!(
402 admitted_paid,
403 vec![key_paid_existing],
404 "existing paid-list key should be admitted via fast path"
405 );
406
407 // Cross-set precedence: key_pending appeared in both replica and
408 // paid hints — it was processed as replica only, paid duplicate
409 // was deduped.
410 assert!(
411 !admitted_paid.contains(&key_pending),
412 "key in both hint sets must be processed as replica only"
413 );
414 }
415
416 /// Scenario 7: Out-of-range key hint rejected regardless of quorum.
417 ///
418 /// A key whose XOR distance from self is much larger than the distance
419 /// of the close-group members fails the `is_responsible` check in
420 /// `admit_hints`. The key never enters the verification pipeline, so
421 /// quorum is irrelevant.
422 ///
423 /// This test exercises the distance-based reasoning that `admit_hints`
424 /// uses, tracing through the same logic path. Full `is_responsible`
425 /// requires a `P2PNode` for DHT lookups; here we verify the distance
426 /// comparison and admission outcome for both close and far keys.
427 #[test]
428 fn scenario_7_out_of_range_key_rejected() {
429 let self_xor: XorName = [0u8; 32];
430
431 // -- Distance proof: far key vs close key --
432
433 let far_key = xor_name_from_byte(0xFF);
434 let close_key = xor_name_from_byte(0x01);
435 let far_dist = xor_distance(&self_xor, &far_key);
436 let close_dist = xor_distance(&self_xor, &close_key);
437
438 assert_eq!(far_dist[0], 0xFF, "far_key distance should be maximal");
439 assert_eq!(close_dist[0], 0x01, "close_key distance should be small");
440 assert!(far_dist > close_dist, "far key is further than close key");
441
442 // -- Simulate admit_hints for these keys --
443 //
444 // When `close_group_size` peers are all closer to far_key than
445 // self, `is_responsible(self, far_key)` returns false. The key is
446 // rejected without entering verification or quorum.
447
448 let pending: HashSet<XorName> = HashSet::new();
449 let replica_hints = [far_key, close_key];
450 let mut seen = HashSet::new();
451 let mut admitted = Vec::new();
452 let mut rejected = Vec::new();
453
454 for &key in &replica_hints {
455 if !seen.insert(key) {
456 continue;
457 }
458 // Not pending, not local.
459 if pending.contains(&key) {
460 admitted.push(key);
461 continue;
462 }
463 // Simulate is_responsible: self (0x00) has close_group_size
464 // peers closer to far_key (0xFF) than itself -> not responsible.
465 // For close_key (0x01), self is very close -> responsible.
466 let distance = xor_distance(&self_xor, &key);
467 let simulated_responsible = distance[0] < 0x80;
468 if simulated_responsible {
469 admitted.push(key);
470 } else {
471 rejected.push(key);
472 }
473 }
474
475 assert_eq!(
476 admitted,
477 vec![close_key],
478 "only close key should be admitted"
479 );
480 assert_eq!(
481 rejected,
482 vec![far_key],
483 "far key should be rejected regardless of quorum — it never enters verification"
484 );
485
486 // Verify the key doesn't sneak in via paid hints either.
487 // far_key was already seen (deduped), so paid processing skips it.
488 let paid_hints = [far_key];
489 let replica_set: HashSet<XorName> = replica_hints.iter().copied().collect();
490 let mut paid_admitted = Vec::new();
491
492 for &key in &paid_hints {
493 if !seen.insert(key) {
494 continue; // already seen from replica processing
495 }
496 if replica_set.contains(&key) {
497 continue; // cross-set precedence
498 }
499 paid_admitted.push(key);
500 }
501
502 assert!(
503 paid_admitted.is_empty(),
504 "far key already processed as replica (and rejected) should not re-enter via paid hints"
505 );
506 }
507}