Skip to main content

ant_node/replication/
admission.rs

1//! Neighbor-sync hint admission rules (Section 7).
2//!
3//! Per-key admission filtering before verification pipeline entry.
4//!
5//! When a neighbor sync hint arrives, each key must pass admission before
6//! entering verification. The admission rules check:
7//! 1. Sender is authenticated and in `LocalRT(self)` (checked before calling
8//!    this module).
9//! 2. Key is relevant to the receiver (checked here).
10
11use std::collections::HashSet;
12use std::sync::Arc;
13
14use saorsa_core::identity::PeerId;
15use saorsa_core::P2PNode;
16
17use crate::ant_protocol::XorName;
18use crate::replication::config::{storage_admission_width, ReplicationConfig};
19use crate::replication::paid_list::PaidList;
20use crate::storage::LmdbStorage;
21
22/// Result of admitting a set of hints from a neighbor sync.
23#[derive(Debug)]
24pub struct AdmissionResult {
25    /// Keys admitted into the replica-hint pipeline (fetch-eligible).
26    pub replica_keys: Vec<XorName>,
27    /// Keys admitted into the paid-hint-only pipeline (`PaidForList` update
28    /// only).
29    pub paid_only_keys: Vec<XorName>,
30    /// Keys rejected (not relevant to this node).
31    pub rejected_keys: Vec<XorName>,
32}
33
34/// Check if this node is within a caller-supplied closest-peer width for key
35/// `K`.
36///
37/// Returns `true` if `self_id` is among the `responsibility_width` nearest
38/// peers to `K` in `SelfInclusiveRT`.
39pub async fn is_responsible(
40    self_id: &PeerId,
41    key: &XorName,
42    p2p_node: &Arc<P2PNode>,
43    responsibility_width: usize,
44) -> bool {
45    let closest = p2p_node
46        .dht_manager()
47        .find_closest_nodes_local_with_self(key, responsibility_width)
48        .await;
49    closest.iter().any(|n| n.peer_id == *self_id)
50}
51
52/// Check if this node is in the `PaidCloseGroup` for key `K`.
53///
54/// `PaidCloseGroup` = `paid_list_close_group_size` nearest peers to `K` in
55/// `SelfInclusiveRT`.
56pub async fn is_in_paid_close_group(
57    self_id: &PeerId,
58    key: &XorName,
59    p2p_node: &Arc<P2PNode>,
60    paid_list_close_group_size: usize,
61) -> bool {
62    let closest = p2p_node
63        .dht_manager()
64        .find_closest_nodes_local_with_self(key, paid_list_close_group_size)
65        .await;
66    closest.iter().any(|n| n.peer_id == *self_id)
67}
68
69/// Admit neighbor-sync hints per Section 7.1 rules.
70///
71/// For each key in `replica_hints` and `paid_hints`:
72/// - **Cross-set precedence**: if a key appears in both sets, keep only the
73///   replica-hint entry.
74/// - **Replica hints**: admitted if `self` is in the storage-admission group
75///   (`close_group_size + STORAGE_ADMISSION_MARGIN`) or key already exists in
76///   local store / pending set.
77/// - **Paid hints**: admitted if `self` is in `PaidCloseGroup(K)` or key is
78///   already in `PaidForList`.
79///
80/// Returns an [`AdmissionResult`] with keys sorted into pipelines.
81#[allow(clippy::too_many_arguments, clippy::implicit_hasher)]
82pub async fn admit_hints(
83    self_id: &PeerId,
84    replica_hints: &[XorName],
85    paid_hints: &[XorName],
86    p2p_node: &Arc<P2PNode>,
87    config: &ReplicationConfig,
88    storage: &Arc<LmdbStorage>,
89    paid_list: &Arc<PaidList>,
90    pending_keys: &HashSet<XorName>,
91) -> AdmissionResult {
92    let mut result = AdmissionResult {
93        replica_keys: Vec::new(),
94        paid_only_keys: Vec::new(),
95        rejected_keys: Vec::new(),
96    };
97
98    // Track all processed keys to deduplicate within and across sets.
99    let mut seen = HashSet::new();
100
101    // Process replica hints.
102    for &key in replica_hints {
103        if !seen.insert(key) {
104            continue;
105        }
106
107        // Fast path: already local or pending -- no routing-table lookup needed.
108        let already_local = storage.exists(&key).unwrap_or(false);
109        let already_pending = pending_keys.contains(&key);
110
111        if already_local || already_pending {
112            result.replica_keys.push(key);
113            continue;
114        }
115
116        if is_responsible(
117            self_id,
118            &key,
119            p2p_node,
120            storage_admission_width(config.close_group_size),
121        )
122        .await
123        {
124            result.replica_keys.push(key);
125        } else {
126            result.rejected_keys.push(key);
127        }
128    }
129
130    // Process paid hints. Cross-set dedup is handled by `seen` — any key
131    // already processed in the replica-hints loop above is skipped here.
132    for &key in paid_hints {
133        if !seen.insert(key) {
134            continue;
135        }
136
137        // Fast path: already in PaidForList -- no routing-table lookup needed.
138        let already_paid = paid_list.contains(&key).unwrap_or(false);
139
140        if already_paid {
141            result.paid_only_keys.push(key);
142            continue;
143        }
144
145        if is_in_paid_close_group(self_id, &key, p2p_node, config.paid_list_close_group_size).await
146        {
147            result.paid_only_keys.push(key);
148        } else {
149            result.rejected_keys.push(key);
150        }
151    }
152
153    result
154}
155
156// ---------------------------------------------------------------------------
157// Tests
158// ---------------------------------------------------------------------------
159
160#[cfg(test)]
161#[allow(clippy::unwrap_used, clippy::expect_used)]
162mod tests {
163    use super::*;
164    use crate::client::xor_distance;
165    use crate::replication::config::ReplicationConfig;
166
167    /// Build a `PeerId` from a single byte (zero-padded to 32 bytes).
168    fn peer_id_from_byte(b: u8) -> PeerId {
169        let mut bytes = [0u8; 32];
170        bytes[0] = b;
171        PeerId::from_bytes(bytes)
172    }
173
174    /// Build an `XorName` from a single byte (repeated to 32 bytes).
175    fn xor_name_from_byte(b: u8) -> XorName {
176        [b; 32]
177    }
178
179    // -----------------------------------------------------------------------
180    // AdmissionResult construction helpers for pure-logic tests
181    //
182    // The full `admit_hints` function requires a live DHT + LMDB backend.
183    // For unit tests we directly exercise:
184    //   1. Cross-set precedence logic
185    //   2. Deduplication logic
186    //   3. evaluate_key_evidence (in quorum.rs)
187    //
188    // Below we simulate admission by using the pure-logic portions.
189    // -----------------------------------------------------------------------
190
191    #[test]
192    fn cross_set_precedence_replica_wins() {
193        // When a key appears in both replica_hints and paid_hints, the
194        // paid_hints entry should be suppressed by cross-set precedence.
195        let key = xor_name_from_byte(0xAA);
196        let replica_set: HashSet<XorName> = std::iter::once(key).collect();
197
198        // Simulating the paid-hint loop: key is in replica_set, so it should
199        // be skipped.
200        assert!(
201            replica_set.contains(&key),
202            "paid-hint key present in replica set should be skipped"
203        );
204    }
205
206    #[test]
207    fn deduplication_within_replica_hints() {
208        // Duplicate keys in replica_hints should only appear once.
209        let key_a = xor_name_from_byte(0x01);
210        let key_b = xor_name_from_byte(0x02);
211        let hints = vec![key_a, key_b, key_a, key_a, key_b];
212
213        let mut seen = HashSet::new();
214        let mut unique = Vec::new();
215        for &key in &hints {
216            if seen.insert(key) {
217                unique.push(key);
218            }
219        }
220
221        assert_eq!(unique.len(), 2);
222        assert_eq!(unique[0], key_a);
223        assert_eq!(unique[1], key_b);
224    }
225
226    #[test]
227    fn deduplication_across_sets() {
228        // If a key appears in replica_hints AND paid_hints, the paid entry
229        // is skipped because seen already contains it from replica processing.
230        let key = xor_name_from_byte(0xFF);
231        let replica_hints = vec![key];
232        let paid_hints = vec![key];
233
234        let replica_set: HashSet<XorName> = replica_hints.iter().copied().collect();
235        let mut seen: HashSet<XorName> = HashSet::new();
236
237        // Process replica hints first.
238        for &k in &replica_hints {
239            seen.insert(k);
240        }
241
242        // Process paid hints: key is already in `seen` AND in `replica_set`.
243        let mut paid_admitted = Vec::new();
244        for &k in &paid_hints {
245            if !seen.insert(k) {
246                continue; // duplicate
247            }
248            if replica_set.contains(&k) {
249                continue; // cross-set precedence
250            }
251            paid_admitted.push(k);
252        }
253
254        assert!(
255            paid_admitted.is_empty(),
256            "paid-hint should be suppressed when key is also a replica hint"
257        );
258    }
259
260    #[test]
261    fn admission_result_empty_inputs() {
262        let result = AdmissionResult {
263            replica_keys: Vec::new(),
264            paid_only_keys: Vec::new(),
265            rejected_keys: Vec::new(),
266        };
267
268        assert!(result.replica_keys.is_empty());
269        assert!(result.paid_only_keys.is_empty());
270        assert!(result.rejected_keys.is_empty());
271    }
272
273    #[test]
274    fn out_of_range_keys_rejected_by_distance() {
275        // Simulate rejection: a key whose XOR distance from self is large
276        // should not appear in a close-group of size 3 when there are closer
277        // peers.
278        let _self_id = peer_id_from_byte(0x00);
279        let key = xor_name_from_byte(0xFF);
280        let _config = ReplicationConfig::default();
281
282        // Distance from self (0x00...) to key (0xFF...):
283        let self_xor: XorName = [0u8; 32];
284        let dist = xor_distance(&self_xor, &key);
285
286        // A very far key would have high distance -- this proves the concept.
287        assert_eq!(dist[0], 0xFF, "distance first byte should be 0xFF");
288
289        // Meanwhile a close key would have a small distance.
290        let close_key = xor_name_from_byte(0x01);
291        let close_dist = xor_distance(&self_xor, &close_key);
292        assert_eq!(
293            close_dist[0], 0x01,
294            "close distance first byte should be 0x01"
295        );
296
297        assert!(
298            dist > close_dist,
299            "far key should have greater distance than close key"
300        );
301    }
302
303    #[test]
304    fn config_close_group_sizes_are_valid() {
305        let config = ReplicationConfig::default();
306        assert!(
307            config.close_group_size > 0,
308            "close_group_size must be positive"
309        );
310        assert!(
311            config.paid_list_close_group_size > 0,
312            "paid_list_close_group_size must be positive"
313        );
314        assert!(
315            config.paid_list_close_group_size >= config.close_group_size,
316            "paid_list_close_group_size should be >= close_group_size"
317        );
318    }
319
320    // -----------------------------------------------------------------------
321    // Section 18 scenarios
322    // -----------------------------------------------------------------------
323
324    /// Scenario 5: Unauthorized sync peer — hints from peers not in
325    /// `LocalRT(self)` are dropped and do not enter verification.
326    ///
327    /// Two layers enforce this:
328    /// (a) `handle_sync_request` in `neighbor_sync.rs` returns
329    ///     `sender_in_rt = false` when the sender is not in `LocalRT`.
330    ///     The caller (`handle_neighbor_sync_request` in `mod.rs`) returns
331    ///     early without processing ANY inbound hints. This is the primary
332    ///     gate tested at the e2e level (scenario 17 tests the positive
333    ///     case).
334    /// (b) Even if a sender IS in `LocalRT`, the per-key relevance check
335    ///     (`is_responsible` with storage-admission width /
336    ///     `is_in_paid_close_group`) in `admit_hints` still applies. Sender
337    ///     identity does not grant key admission.
338    ///
339    /// This test exercises layer (b): the admission pipeline's dedup,
340    /// cross-set precedence, and relevance filtering using the same logic
341    /// that `admit_hints` performs — without the `P2PNode` dependency
342    /// needed for the actual `is_responsible` DHT lookup.
343    #[test]
344    fn scenario_5_sender_does_not_grant_key_relevance() {
345        let key_pending = xor_name_from_byte(0xB0);
346        let key_not_pending = xor_name_from_byte(0xB1);
347        let key_paid_existing = xor_name_from_byte(0xB2);
348        let _sender = peer_id_from_byte(0x01);
349
350        // Simulate local state: only key_pending is in the pending set,
351        // key_paid_existing is in the paid list.
352        let pending: HashSet<XorName> = std::iter::once(key_pending).collect();
353        let paid_set: HashSet<XorName> = std::iter::once(key_paid_existing).collect();
354
355        // Trace through admit_hints logic for replica hints:
356        let replica_hints = [key_pending, key_not_pending];
357        let replica_set: HashSet<XorName> = replica_hints.iter().copied().collect();
358        let mut seen = HashSet::new();
359        let mut admitted_replica = Vec::new();
360        let mut rejected = Vec::new();
361
362        for &key in &replica_hints {
363            if !seen.insert(key) {
364                continue; // dedup
365            }
366            // Fast path: already pending -> admitted.
367            if pending.contains(&key) {
368                admitted_replica.push(key);
369                continue;
370            }
371            // key_not_pending: not pending, not local -> needs the
372            // storage-admission check. Simulate it returning false.
373            let is_responsible = false;
374            if is_responsible {
375                admitted_replica.push(key);
376            } else {
377                rejected.push(key);
378            }
379        }
380
381        // Trace through paid hints:
382        let paid_hints = [key_paid_existing, key_pending]; // key_pending overlaps with replica
383        let mut admitted_paid = Vec::new();
384
385        for &key in &paid_hints {
386            if !seen.insert(key) {
387                continue; // dedup: key_pending already seen
388            }
389            if replica_set.contains(&key) {
390                continue; // cross-set precedence
391            }
392            // Fast path: already in paid list -> admitted.
393            if paid_set.contains(&key) {
394                admitted_paid.push(key);
395                continue;
396            }
397            rejected.push(key);
398        }
399
400        // Verify outcomes:
401        assert_eq!(
402            admitted_replica,
403            vec![key_pending],
404            "only the pending key should be admitted as replica"
405        );
406        assert_eq!(
407            rejected,
408            vec![key_not_pending],
409            "non-pending, non-responsible key must be rejected"
410        );
411        assert_eq!(
412            admitted_paid,
413            vec![key_paid_existing],
414            "existing paid-list key should be admitted via fast path"
415        );
416
417        // Cross-set precedence: key_pending appeared in both replica and
418        // paid hints — it was processed as replica only, paid duplicate
419        // was deduped.
420        assert!(
421            !admitted_paid.contains(&key_pending),
422            "key in both hint sets must be processed as replica only"
423        );
424    }
425
426    /// Scenario 7: Out-of-range key hint rejected regardless of quorum.
427    ///
428    /// A key whose XOR distance from self is much larger than the distance
429    /// of the storage-admission members fails the `is_responsible` check in
430    /// `admit_hints`. The key never enters the verification pipeline, so
431    /// quorum is irrelevant.
432    ///
433    /// This test exercises the distance-based reasoning that `admit_hints`
434    /// uses, tracing through the same logic path. Full `is_responsible`
435    /// requires a `P2PNode` for DHT lookups; here we verify the distance
436    /// comparison and admission outcome for both close and far keys.
437    #[test]
438    fn scenario_7_out_of_range_key_rejected() {
439        let self_xor: XorName = [0u8; 32];
440
441        // -- Distance proof: far key vs close key --
442
443        let far_key = xor_name_from_byte(0xFF);
444        let close_key = xor_name_from_byte(0x01);
445        let far_dist = xor_distance(&self_xor, &far_key);
446        let close_dist = xor_distance(&self_xor, &close_key);
447
448        assert_eq!(far_dist[0], 0xFF, "far_key distance should be maximal");
449        assert_eq!(close_dist[0], 0x01, "close_key distance should be small");
450        assert!(far_dist > close_dist, "far key is further than close key");
451
452        // -- Simulate admit_hints for these keys --
453        //
454        // When the storage-admission peers are all closer to far_key than
455        // self, `is_responsible(self, far_key)` returns false. The key is
456        // rejected without entering verification or quorum.
457
458        let pending: HashSet<XorName> = HashSet::new();
459        let replica_hints = [far_key, close_key];
460        let mut seen = HashSet::new();
461        let mut admitted = Vec::new();
462        let mut rejected = Vec::new();
463
464        for &key in &replica_hints {
465            if !seen.insert(key) {
466                continue;
467            }
468            // Not pending, not local.
469            if pending.contains(&key) {
470                admitted.push(key);
471                continue;
472            }
473            // Simulate is_responsible: self (0x00) has the full
474            // storage-admission group closer to far_key (0xFF) than itself.
475            // For close_key (0x01), self is very close -> responsible.
476            let distance = xor_distance(&self_xor, &key);
477            let simulated_responsible = distance[0] < 0x80;
478            if simulated_responsible {
479                admitted.push(key);
480            } else {
481                rejected.push(key);
482            }
483        }
484
485        assert_eq!(
486            admitted,
487            vec![close_key],
488            "only close key should be admitted"
489        );
490        assert_eq!(
491            rejected,
492            vec![far_key],
493            "far key should be rejected regardless of quorum — it never enters verification"
494        );
495
496        // Verify the key doesn't sneak in via paid hints either.
497        // far_key was already seen (deduped), so paid processing skips it.
498        let paid_hints = [far_key];
499        let replica_set: HashSet<XorName> = replica_hints.iter().copied().collect();
500        let mut paid_admitted = Vec::new();
501
502        for &key in &paid_hints {
503            if !seen.insert(key) {
504                continue; // already seen from replica processing
505            }
506            if replica_set.contains(&key) {
507                continue; // cross-set precedence
508            }
509            paid_admitted.push(key);
510        }
511
512        assert!(
513            paid_admitted.is_empty(),
514            "far key already processed as replica (and rejected) should not re-enter via paid hints"
515        );
516    }
517}