Skip to main content

ant_node/replication/
admission.rs

1//! Neighbor-sync hint admission rules (Section 7).
2//!
3//! Per-key admission filtering before verification pipeline entry.
4//!
5//! When a neighbor sync hint arrives, each key must pass admission before
6//! entering verification. The admission rules check:
7//! 1. Sender is authenticated and in `LocalRT(self)` (checked before calling
8//!    this module).
9//! 2. Key is relevant to the receiver (checked here).
10
11use std::collections::HashSet;
12use std::sync::Arc;
13
14use saorsa_core::identity::PeerId;
15use saorsa_core::P2PNode;
16
17use crate::ant_protocol::XorName;
18use crate::replication::config::ReplicationConfig;
19use crate::replication::paid_list::PaidList;
20use crate::storage::LmdbStorage;
21
22/// Result of admitting a set of hints from a neighbor sync.
23#[derive(Debug)]
24pub struct AdmissionResult {
25    /// Keys admitted into the replica-hint pipeline (fetch-eligible).
26    pub replica_keys: Vec<XorName>,
27    /// Keys admitted into the paid-hint-only pipeline (`PaidForList` update
28    /// only).
29    pub paid_only_keys: Vec<XorName>,
30    /// Keys rejected (not relevant to this node).
31    pub rejected_keys: Vec<XorName>,
32}
33
34/// Check if this node is responsible for key `K`.
35///
36/// Returns `true` if `self_id` is among the `close_group_size` nearest peers
37/// to `K` in `SelfInclusiveRT`.
38pub async fn is_responsible(
39    self_id: &PeerId,
40    key: &XorName,
41    p2p_node: &Arc<P2PNode>,
42    close_group_size: usize,
43) -> bool {
44    let closest = p2p_node
45        .dht_manager()
46        .find_closest_nodes_local_with_self(key, close_group_size)
47        .await;
48    closest.iter().any(|n| n.peer_id == *self_id)
49}
50
51/// Check if this node is in the `PaidCloseGroup` for key `K`.
52///
53/// `PaidCloseGroup` = `paid_list_close_group_size` nearest peers to `K` in
54/// `SelfInclusiveRT`.
55pub async fn is_in_paid_close_group(
56    self_id: &PeerId,
57    key: &XorName,
58    p2p_node: &Arc<P2PNode>,
59    paid_list_close_group_size: usize,
60) -> bool {
61    let closest = p2p_node
62        .dht_manager()
63        .find_closest_nodes_local_with_self(key, paid_list_close_group_size)
64        .await;
65    closest.iter().any(|n| n.peer_id == *self_id)
66}
67
68/// Admit neighbor-sync hints per Section 7.1 rules.
69///
70/// For each key in `replica_hints` and `paid_hints`:
71/// - **Cross-set precedence**: if a key appears in both sets, keep only the
72///   replica-hint entry.
73/// - **Replica hints**: admitted if `IsResponsible(self, K)` or key already
74///   exists in local store / pending set.
75/// - **Paid hints**: admitted if `self` is in `PaidCloseGroup(K)` or key is
76///   already in `PaidForList`.
77///
78/// Returns an [`AdmissionResult`] with keys sorted into pipelines.
79#[allow(clippy::too_many_arguments, clippy::implicit_hasher)]
80pub async fn admit_hints(
81    self_id: &PeerId,
82    replica_hints: &[XorName],
83    paid_hints: &[XorName],
84    p2p_node: &Arc<P2PNode>,
85    config: &ReplicationConfig,
86    storage: &Arc<LmdbStorage>,
87    paid_list: &Arc<PaidList>,
88    pending_keys: &HashSet<XorName>,
89) -> AdmissionResult {
90    let mut result = AdmissionResult {
91        replica_keys: Vec::new(),
92        paid_only_keys: Vec::new(),
93        rejected_keys: Vec::new(),
94    };
95
96    // Track all processed keys to deduplicate within and across sets.
97    let mut seen = HashSet::new();
98
99    // Process replica hints.
100    for &key in replica_hints {
101        if !seen.insert(key) {
102            continue;
103        }
104
105        // Fast path: already local or pending -- no routing-table lookup needed.
106        let already_local = storage.exists(&key).unwrap_or(false);
107        let already_pending = pending_keys.contains(&key);
108
109        if already_local || already_pending {
110            result.replica_keys.push(key);
111            continue;
112        }
113
114        if is_responsible(self_id, &key, p2p_node, config.close_group_size).await {
115            result.replica_keys.push(key);
116        } else {
117            result.rejected_keys.push(key);
118        }
119    }
120
121    // Process paid hints. Cross-set dedup is handled by `seen` — any key
122    // already processed in the replica-hints loop above is skipped here.
123    for &key in paid_hints {
124        if !seen.insert(key) {
125            continue;
126        }
127
128        // Fast path: already in PaidForList -- no routing-table lookup needed.
129        let already_paid = paid_list.contains(&key).unwrap_or(false);
130
131        if already_paid {
132            result.paid_only_keys.push(key);
133            continue;
134        }
135
136        if is_in_paid_close_group(self_id, &key, p2p_node, config.paid_list_close_group_size).await
137        {
138            result.paid_only_keys.push(key);
139        } else {
140            result.rejected_keys.push(key);
141        }
142    }
143
144    result
145}
146
147// ---------------------------------------------------------------------------
148// Tests
149// ---------------------------------------------------------------------------
150
151#[cfg(test)]
152#[allow(clippy::unwrap_used, clippy::expect_used)]
153mod tests {
154    use super::*;
155    use crate::client::xor_distance;
156    use crate::replication::config::ReplicationConfig;
157
158    /// Build a `PeerId` from a single byte (zero-padded to 32 bytes).
159    fn peer_id_from_byte(b: u8) -> PeerId {
160        let mut bytes = [0u8; 32];
161        bytes[0] = b;
162        PeerId::from_bytes(bytes)
163    }
164
165    /// Build an `XorName` from a single byte (repeated to 32 bytes).
166    fn xor_name_from_byte(b: u8) -> XorName {
167        [b; 32]
168    }
169
170    // -----------------------------------------------------------------------
171    // AdmissionResult construction helpers for pure-logic tests
172    //
173    // The full `admit_hints` function requires a live DHT + LMDB backend.
174    // For unit tests we directly exercise:
175    //   1. Cross-set precedence logic
176    //   2. Deduplication logic
177    //   3. evaluate_key_evidence (in quorum.rs)
178    //
179    // Below we simulate admission by using the pure-logic portions.
180    // -----------------------------------------------------------------------
181
182    #[test]
183    fn cross_set_precedence_replica_wins() {
184        // When a key appears in both replica_hints and paid_hints, the
185        // paid_hints entry should be suppressed by cross-set precedence.
186        let key = xor_name_from_byte(0xAA);
187        let replica_set: HashSet<XorName> = std::iter::once(key).collect();
188
189        // Simulating the paid-hint loop: key is in replica_set, so it should
190        // be skipped.
191        assert!(
192            replica_set.contains(&key),
193            "paid-hint key present in replica set should be skipped"
194        );
195    }
196
197    #[test]
198    fn deduplication_within_replica_hints() {
199        // Duplicate keys in replica_hints should only appear once.
200        let key_a = xor_name_from_byte(0x01);
201        let key_b = xor_name_from_byte(0x02);
202        let hints = vec![key_a, key_b, key_a, key_a, key_b];
203
204        let mut seen = HashSet::new();
205        let mut unique = Vec::new();
206        for &key in &hints {
207            if seen.insert(key) {
208                unique.push(key);
209            }
210        }
211
212        assert_eq!(unique.len(), 2);
213        assert_eq!(unique[0], key_a);
214        assert_eq!(unique[1], key_b);
215    }
216
217    #[test]
218    fn deduplication_across_sets() {
219        // If a key appears in replica_hints AND paid_hints, the paid entry
220        // is skipped because seen already contains it from replica processing.
221        let key = xor_name_from_byte(0xFF);
222        let replica_hints = vec![key];
223        let paid_hints = vec![key];
224
225        let replica_set: HashSet<XorName> = replica_hints.iter().copied().collect();
226        let mut seen: HashSet<XorName> = HashSet::new();
227
228        // Process replica hints first.
229        for &k in &replica_hints {
230            seen.insert(k);
231        }
232
233        // Process paid hints: key is already in `seen` AND in `replica_set`.
234        let mut paid_admitted = Vec::new();
235        for &k in &paid_hints {
236            if !seen.insert(k) {
237                continue; // duplicate
238            }
239            if replica_set.contains(&k) {
240                continue; // cross-set precedence
241            }
242            paid_admitted.push(k);
243        }
244
245        assert!(
246            paid_admitted.is_empty(),
247            "paid-hint should be suppressed when key is also a replica hint"
248        );
249    }
250
251    #[test]
252    fn admission_result_empty_inputs() {
253        let result = AdmissionResult {
254            replica_keys: Vec::new(),
255            paid_only_keys: Vec::new(),
256            rejected_keys: Vec::new(),
257        };
258
259        assert!(result.replica_keys.is_empty());
260        assert!(result.paid_only_keys.is_empty());
261        assert!(result.rejected_keys.is_empty());
262    }
263
264    #[test]
265    fn out_of_range_keys_rejected_by_distance() {
266        // Simulate rejection: a key whose XOR distance from self is large
267        // should not appear in a close-group of size 3 when there are closer
268        // peers.
269        let _self_id = peer_id_from_byte(0x00);
270        let key = xor_name_from_byte(0xFF);
271        let _config = ReplicationConfig::default();
272
273        // Distance from self (0x00...) to key (0xFF...):
274        let self_xor: XorName = [0u8; 32];
275        let dist = xor_distance(&self_xor, &key);
276
277        // A very far key would have high distance -- this proves the concept.
278        assert_eq!(dist[0], 0xFF, "distance first byte should be 0xFF");
279
280        // Meanwhile a close key would have a small distance.
281        let close_key = xor_name_from_byte(0x01);
282        let close_dist = xor_distance(&self_xor, &close_key);
283        assert_eq!(
284            close_dist[0], 0x01,
285            "close distance first byte should be 0x01"
286        );
287
288        assert!(
289            dist > close_dist,
290            "far key should have greater distance than close key"
291        );
292    }
293
294    #[test]
295    fn config_close_group_sizes_are_valid() {
296        let config = ReplicationConfig::default();
297        assert!(
298            config.close_group_size > 0,
299            "close_group_size must be positive"
300        );
301        assert!(
302            config.paid_list_close_group_size > 0,
303            "paid_list_close_group_size must be positive"
304        );
305        assert!(
306            config.paid_list_close_group_size >= config.close_group_size,
307            "paid_list_close_group_size should be >= close_group_size"
308        );
309    }
310
311    // -----------------------------------------------------------------------
312    // Section 18 scenarios
313    // -----------------------------------------------------------------------
314
315    /// Scenario 5: Unauthorized sync peer — hints from peers not in
316    /// `LocalRT(self)` are dropped and do not enter verification.
317    ///
318    /// Two layers enforce this:
319    /// (a) `handle_sync_request` in `neighbor_sync.rs` returns
320    ///     `sender_in_rt = false` when the sender is not in `LocalRT`.
321    ///     The caller (`handle_neighbor_sync_request` in `mod.rs`) returns
322    ///     early without processing ANY inbound hints. This is the primary
323    ///     gate tested at the e2e level (scenario 17 tests the positive
324    ///     case).
325    /// (b) Even if a sender IS in `LocalRT`, the per-key relevance check
326    ///     (`is_responsible` / `is_in_paid_close_group`) in `admit_hints`
327    ///     still applies. Sender identity does not grant key admission.
328    ///
329    /// This test exercises layer (b): the admission pipeline's dedup,
330    /// cross-set precedence, and relevance filtering using the same logic
331    /// that `admit_hints` performs — without the `P2PNode` dependency
332    /// needed for the actual `is_responsible` DHT lookup.
333    #[test]
334    fn scenario_5_sender_does_not_grant_key_relevance() {
335        let key_pending = xor_name_from_byte(0xB0);
336        let key_not_pending = xor_name_from_byte(0xB1);
337        let key_paid_existing = xor_name_from_byte(0xB2);
338        let _sender = peer_id_from_byte(0x01);
339
340        // Simulate local state: only key_pending is in the pending set,
341        // key_paid_existing is in the paid list.
342        let pending: HashSet<XorName> = std::iter::once(key_pending).collect();
343        let paid_set: HashSet<XorName> = std::iter::once(key_paid_existing).collect();
344
345        // Trace through admit_hints logic for replica hints:
346        let replica_hints = [key_pending, key_not_pending];
347        let replica_set: HashSet<XorName> = replica_hints.iter().copied().collect();
348        let mut seen = HashSet::new();
349        let mut admitted_replica = Vec::new();
350        let mut rejected = Vec::new();
351
352        for &key in &replica_hints {
353            if !seen.insert(key) {
354                continue; // dedup
355            }
356            // Fast path: already pending -> admitted.
357            if pending.contains(&key) {
358                admitted_replica.push(key);
359                continue;
360            }
361            // key_not_pending: not pending, not local -> needs is_responsible.
362            // Simulate is_responsible returning false (out of range).
363            let is_responsible = false;
364            if is_responsible {
365                admitted_replica.push(key);
366            } else {
367                rejected.push(key);
368            }
369        }
370
371        // Trace through paid hints:
372        let paid_hints = [key_paid_existing, key_pending]; // key_pending overlaps with replica
373        let mut admitted_paid = Vec::new();
374
375        for &key in &paid_hints {
376            if !seen.insert(key) {
377                continue; // dedup: key_pending already seen
378            }
379            if replica_set.contains(&key) {
380                continue; // cross-set precedence
381            }
382            // Fast path: already in paid list -> admitted.
383            if paid_set.contains(&key) {
384                admitted_paid.push(key);
385                continue;
386            }
387            rejected.push(key);
388        }
389
390        // Verify outcomes:
391        assert_eq!(
392            admitted_replica,
393            vec![key_pending],
394            "only the pending key should be admitted as replica"
395        );
396        assert_eq!(
397            rejected,
398            vec![key_not_pending],
399            "non-pending, non-responsible key must be rejected"
400        );
401        assert_eq!(
402            admitted_paid,
403            vec![key_paid_existing],
404            "existing paid-list key should be admitted via fast path"
405        );
406
407        // Cross-set precedence: key_pending appeared in both replica and
408        // paid hints — it was processed as replica only, paid duplicate
409        // was deduped.
410        assert!(
411            !admitted_paid.contains(&key_pending),
412            "key in both hint sets must be processed as replica only"
413        );
414    }
415
416    /// Scenario 7: Out-of-range key hint rejected regardless of quorum.
417    ///
418    /// A key whose XOR distance from self is much larger than the distance
419    /// of the close-group members fails the `is_responsible` check in
420    /// `admit_hints`. The key never enters the verification pipeline, so
421    /// quorum is irrelevant.
422    ///
423    /// This test exercises the distance-based reasoning that `admit_hints`
424    /// uses, tracing through the same logic path. Full `is_responsible`
425    /// requires a `P2PNode` for DHT lookups; here we verify the distance
426    /// comparison and admission outcome for both close and far keys.
427    #[test]
428    fn scenario_7_out_of_range_key_rejected() {
429        let self_xor: XorName = [0u8; 32];
430
431        // -- Distance proof: far key vs close key --
432
433        let far_key = xor_name_from_byte(0xFF);
434        let close_key = xor_name_from_byte(0x01);
435        let far_dist = xor_distance(&self_xor, &far_key);
436        let close_dist = xor_distance(&self_xor, &close_key);
437
438        assert_eq!(far_dist[0], 0xFF, "far_key distance should be maximal");
439        assert_eq!(close_dist[0], 0x01, "close_key distance should be small");
440        assert!(far_dist > close_dist, "far key is further than close key");
441
442        // -- Simulate admit_hints for these keys --
443        //
444        // When `close_group_size` peers are all closer to far_key than
445        // self, `is_responsible(self, far_key)` returns false. The key is
446        // rejected without entering verification or quorum.
447
448        let pending: HashSet<XorName> = HashSet::new();
449        let replica_hints = [far_key, close_key];
450        let mut seen = HashSet::new();
451        let mut admitted = Vec::new();
452        let mut rejected = Vec::new();
453
454        for &key in &replica_hints {
455            if !seen.insert(key) {
456                continue;
457            }
458            // Not pending, not local.
459            if pending.contains(&key) {
460                admitted.push(key);
461                continue;
462            }
463            // Simulate is_responsible: self (0x00) has close_group_size
464            // peers closer to far_key (0xFF) than itself -> not responsible.
465            // For close_key (0x01), self is very close -> responsible.
466            let distance = xor_distance(&self_xor, &key);
467            let simulated_responsible = distance[0] < 0x80;
468            if simulated_responsible {
469                admitted.push(key);
470            } else {
471                rejected.push(key);
472            }
473        }
474
475        assert_eq!(
476            admitted,
477            vec![close_key],
478            "only close key should be admitted"
479        );
480        assert_eq!(
481            rejected,
482            vec![far_key],
483            "far key should be rejected regardless of quorum — it never enters verification"
484        );
485
486        // Verify the key doesn't sneak in via paid hints either.
487        // far_key was already seen (deduped), so paid processing skips it.
488        let paid_hints = [far_key];
489        let replica_set: HashSet<XorName> = replica_hints.iter().copied().collect();
490        let mut paid_admitted = Vec::new();
491
492        for &key in &paid_hints {
493            if !seen.insert(key) {
494                continue; // already seen from replica processing
495            }
496            if replica_set.contains(&key) {
497                continue; // cross-set precedence
498            }
499            paid_admitted.push(key);
500        }
501
502        assert!(
503            paid_admitted.is_empty(),
504            "far key already processed as replica (and rejected) should not re-enter via paid hints"
505        );
506    }
507}