ant_node/replication/admission.rs
1//! Neighbor-sync hint admission rules (Section 7).
2//!
3//! Per-key admission filtering before verification pipeline entry.
4//!
5//! When a neighbor sync hint arrives, each key must pass admission before
6//! entering verification. The admission rules check:
7//! 1. Sender is authenticated and in `LocalRT(self)` (checked before calling
8//! this module).
9//! 2. Key is relevant to the receiver (checked here).
10
11use std::collections::HashSet;
12use std::sync::Arc;
13
14use saorsa_core::identity::PeerId;
15use saorsa_core::P2PNode;
16
17use crate::ant_protocol::XorName;
18use crate::replication::config::{storage_admission_width, ReplicationConfig};
19use crate::replication::paid_list::PaidList;
20use crate::storage::LmdbStorage;
21
22/// Result of admitting a set of hints from a neighbor sync.
23#[derive(Debug)]
24pub struct AdmissionResult {
25 /// Keys admitted into the replica-hint pipeline (fetch-eligible).
26 pub replica_keys: Vec<XorName>,
27 /// Keys admitted into the paid-hint-only pipeline (`PaidForList` update
28 /// only).
29 pub paid_only_keys: Vec<XorName>,
30 /// Keys rejected (not relevant to this node).
31 pub rejected_keys: Vec<XorName>,
32}
33
34/// Check if this node is within a caller-supplied closest-peer width for key
35/// `K`.
36///
37/// Returns `true` if `self_id` is among the `responsibility_width` nearest
38/// peers to `K` in `SelfInclusiveRT`.
39pub async fn is_responsible(
40 self_id: &PeerId,
41 key: &XorName,
42 p2p_node: &Arc<P2PNode>,
43 responsibility_width: usize,
44) -> bool {
45 let closest = p2p_node
46 .dht_manager()
47 .find_closest_nodes_local_with_self(key, responsibility_width)
48 .await;
49 closest.iter().any(|n| n.peer_id == *self_id)
50}
51
52/// Check if this node is in the `PaidCloseGroup` for key `K`.
53///
54/// `PaidCloseGroup` = `paid_list_close_group_size` nearest peers to `K` in
55/// `SelfInclusiveRT`.
56pub async fn is_in_paid_close_group(
57 self_id: &PeerId,
58 key: &XorName,
59 p2p_node: &Arc<P2PNode>,
60 paid_list_close_group_size: usize,
61) -> bool {
62 let closest = p2p_node
63 .dht_manager()
64 .find_closest_nodes_local_with_self(key, paid_list_close_group_size)
65 .await;
66 closest.iter().any(|n| n.peer_id == *self_id)
67}
68
69/// Admit neighbor-sync hints per Section 7.1 rules.
70///
71/// For each key in `replica_hints` and `paid_hints`:
72/// - **Cross-set precedence**: if a key appears in both sets, keep only the
73/// replica-hint entry.
74/// - **Replica hints**: admitted if `self` is in the storage-admission group
75/// (`close_group_size + STORAGE_ADMISSION_MARGIN`) or key already exists in
76/// local store / pending set.
77/// - **Paid hints**: admitted if `self` is in `PaidCloseGroup(K)` or key is
78/// already in `PaidForList`.
79///
80/// Returns an [`AdmissionResult`] with keys sorted into pipelines.
81#[allow(clippy::too_many_arguments, clippy::implicit_hasher)]
82pub async fn admit_hints(
83 self_id: &PeerId,
84 replica_hints: &[XorName],
85 paid_hints: &[XorName],
86 p2p_node: &Arc<P2PNode>,
87 config: &ReplicationConfig,
88 storage: &Arc<LmdbStorage>,
89 paid_list: &Arc<PaidList>,
90 pending_keys: &HashSet<XorName>,
91) -> AdmissionResult {
92 let mut result = AdmissionResult {
93 replica_keys: Vec::new(),
94 paid_only_keys: Vec::new(),
95 rejected_keys: Vec::new(),
96 };
97
98 // Track all processed keys to deduplicate within and across sets.
99 let mut seen = HashSet::new();
100
101 // Process replica hints.
102 for &key in replica_hints {
103 if !seen.insert(key) {
104 continue;
105 }
106
107 // Fast path: already local or pending -- no routing-table lookup needed.
108 let already_local = storage.exists(&key).unwrap_or(false);
109 let already_pending = pending_keys.contains(&key);
110
111 if already_local || already_pending {
112 result.replica_keys.push(key);
113 continue;
114 }
115
116 if is_responsible(
117 self_id,
118 &key,
119 p2p_node,
120 storage_admission_width(config.close_group_size),
121 )
122 .await
123 {
124 result.replica_keys.push(key);
125 } else {
126 result.rejected_keys.push(key);
127 }
128 }
129
130 // Process paid hints. Cross-set dedup is handled by `seen` — any key
131 // already processed in the replica-hints loop above is skipped here.
132 for &key in paid_hints {
133 if !seen.insert(key) {
134 continue;
135 }
136
137 // Fast path: already in PaidForList -- no routing-table lookup needed.
138 let already_paid = paid_list.contains(&key).unwrap_or(false);
139
140 if already_paid {
141 result.paid_only_keys.push(key);
142 continue;
143 }
144
145 if is_in_paid_close_group(self_id, &key, p2p_node, config.paid_list_close_group_size).await
146 {
147 result.paid_only_keys.push(key);
148 } else {
149 result.rejected_keys.push(key);
150 }
151 }
152
153 result
154}
155
156// ---------------------------------------------------------------------------
157// Tests
158// ---------------------------------------------------------------------------
159
160#[cfg(test)]
161#[allow(clippy::unwrap_used, clippy::expect_used)]
162mod tests {
163 use super::*;
164 use crate::client::xor_distance;
165 use crate::replication::config::ReplicationConfig;
166
167 /// Build a `PeerId` from a single byte (zero-padded to 32 bytes).
168 fn peer_id_from_byte(b: u8) -> PeerId {
169 let mut bytes = [0u8; 32];
170 bytes[0] = b;
171 PeerId::from_bytes(bytes)
172 }
173
174 /// Build an `XorName` from a single byte (repeated to 32 bytes).
175 fn xor_name_from_byte(b: u8) -> XorName {
176 [b; 32]
177 }
178
179 // -----------------------------------------------------------------------
180 // AdmissionResult construction helpers for pure-logic tests
181 //
182 // The full `admit_hints` function requires a live DHT + LMDB backend.
183 // For unit tests we directly exercise:
184 // 1. Cross-set precedence logic
185 // 2. Deduplication logic
186 // 3. evaluate_key_evidence (in quorum.rs)
187 //
188 // Below we simulate admission by using the pure-logic portions.
189 // -----------------------------------------------------------------------
190
191 #[test]
192 fn cross_set_precedence_replica_wins() {
193 // When a key appears in both replica_hints and paid_hints, the
194 // paid_hints entry should be suppressed by cross-set precedence.
195 let key = xor_name_from_byte(0xAA);
196 let replica_set: HashSet<XorName> = std::iter::once(key).collect();
197
198 // Simulating the paid-hint loop: key is in replica_set, so it should
199 // be skipped.
200 assert!(
201 replica_set.contains(&key),
202 "paid-hint key present in replica set should be skipped"
203 );
204 }
205
206 #[test]
207 fn deduplication_within_replica_hints() {
208 // Duplicate keys in replica_hints should only appear once.
209 let key_a = xor_name_from_byte(0x01);
210 let key_b = xor_name_from_byte(0x02);
211 let hints = vec![key_a, key_b, key_a, key_a, key_b];
212
213 let mut seen = HashSet::new();
214 let mut unique = Vec::new();
215 for &key in &hints {
216 if seen.insert(key) {
217 unique.push(key);
218 }
219 }
220
221 assert_eq!(unique.len(), 2);
222 assert_eq!(unique[0], key_a);
223 assert_eq!(unique[1], key_b);
224 }
225
226 #[test]
227 fn deduplication_across_sets() {
228 // If a key appears in replica_hints AND paid_hints, the paid entry
229 // is skipped because seen already contains it from replica processing.
230 let key = xor_name_from_byte(0xFF);
231 let replica_hints = vec![key];
232 let paid_hints = vec![key];
233
234 let replica_set: HashSet<XorName> = replica_hints.iter().copied().collect();
235 let mut seen: HashSet<XorName> = HashSet::new();
236
237 // Process replica hints first.
238 for &k in &replica_hints {
239 seen.insert(k);
240 }
241
242 // Process paid hints: key is already in `seen` AND in `replica_set`.
243 let mut paid_admitted = Vec::new();
244 for &k in &paid_hints {
245 if !seen.insert(k) {
246 continue; // duplicate
247 }
248 if replica_set.contains(&k) {
249 continue; // cross-set precedence
250 }
251 paid_admitted.push(k);
252 }
253
254 assert!(
255 paid_admitted.is_empty(),
256 "paid-hint should be suppressed when key is also a replica hint"
257 );
258 }
259
260 #[test]
261 fn admission_result_empty_inputs() {
262 let result = AdmissionResult {
263 replica_keys: Vec::new(),
264 paid_only_keys: Vec::new(),
265 rejected_keys: Vec::new(),
266 };
267
268 assert!(result.replica_keys.is_empty());
269 assert!(result.paid_only_keys.is_empty());
270 assert!(result.rejected_keys.is_empty());
271 }
272
273 #[test]
274 fn out_of_range_keys_rejected_by_distance() {
275 // Simulate rejection: a key whose XOR distance from self is large
276 // should not appear in a close-group of size 3 when there are closer
277 // peers.
278 let _self_id = peer_id_from_byte(0x00);
279 let key = xor_name_from_byte(0xFF);
280 let _config = ReplicationConfig::default();
281
282 // Distance from self (0x00...) to key (0xFF...):
283 let self_xor: XorName = [0u8; 32];
284 let dist = xor_distance(&self_xor, &key);
285
286 // A very far key would have high distance -- this proves the concept.
287 assert_eq!(dist[0], 0xFF, "distance first byte should be 0xFF");
288
289 // Meanwhile a close key would have a small distance.
290 let close_key = xor_name_from_byte(0x01);
291 let close_dist = xor_distance(&self_xor, &close_key);
292 assert_eq!(
293 close_dist[0], 0x01,
294 "close distance first byte should be 0x01"
295 );
296
297 assert!(
298 dist > close_dist,
299 "far key should have greater distance than close key"
300 );
301 }
302
303 #[test]
304 fn config_close_group_sizes_are_valid() {
305 let config = ReplicationConfig::default();
306 assert!(
307 config.close_group_size > 0,
308 "close_group_size must be positive"
309 );
310 assert!(
311 config.paid_list_close_group_size > 0,
312 "paid_list_close_group_size must be positive"
313 );
314 assert!(
315 config.paid_list_close_group_size >= config.close_group_size,
316 "paid_list_close_group_size should be >= close_group_size"
317 );
318 }
319
320 // -----------------------------------------------------------------------
321 // Section 18 scenarios
322 // -----------------------------------------------------------------------
323
324 /// Scenario 5: Unauthorized sync peer — hints from peers not in
325 /// `LocalRT(self)` are dropped and do not enter verification.
326 ///
327 /// Two layers enforce this:
328 /// (a) `handle_sync_request` in `neighbor_sync.rs` returns
329 /// `sender_in_rt = false` when the sender is not in `LocalRT`.
330 /// The caller (`handle_neighbor_sync_request` in `mod.rs`) returns
331 /// early without processing ANY inbound hints. This is the primary
332 /// gate tested at the e2e level (scenario 17 tests the positive
333 /// case).
334 /// (b) Even if a sender IS in `LocalRT`, the per-key relevance check
335 /// (`is_responsible` with storage-admission width /
336 /// `is_in_paid_close_group`) in `admit_hints` still applies. Sender
337 /// identity does not grant key admission.
338 ///
339 /// This test exercises layer (b): the admission pipeline's dedup,
340 /// cross-set precedence, and relevance filtering using the same logic
341 /// that `admit_hints` performs — without the `P2PNode` dependency
342 /// needed for the actual `is_responsible` DHT lookup.
343 #[test]
344 fn scenario_5_sender_does_not_grant_key_relevance() {
345 let key_pending = xor_name_from_byte(0xB0);
346 let key_not_pending = xor_name_from_byte(0xB1);
347 let key_paid_existing = xor_name_from_byte(0xB2);
348 let _sender = peer_id_from_byte(0x01);
349
350 // Simulate local state: only key_pending is in the pending set,
351 // key_paid_existing is in the paid list.
352 let pending: HashSet<XorName> = std::iter::once(key_pending).collect();
353 let paid_set: HashSet<XorName> = std::iter::once(key_paid_existing).collect();
354
355 // Trace through admit_hints logic for replica hints:
356 let replica_hints = [key_pending, key_not_pending];
357 let replica_set: HashSet<XorName> = replica_hints.iter().copied().collect();
358 let mut seen = HashSet::new();
359 let mut admitted_replica = Vec::new();
360 let mut rejected = Vec::new();
361
362 for &key in &replica_hints {
363 if !seen.insert(key) {
364 continue; // dedup
365 }
366 // Fast path: already pending -> admitted.
367 if pending.contains(&key) {
368 admitted_replica.push(key);
369 continue;
370 }
371 // key_not_pending: not pending, not local -> needs the
372 // storage-admission check. Simulate it returning false.
373 let is_responsible = false;
374 if is_responsible {
375 admitted_replica.push(key);
376 } else {
377 rejected.push(key);
378 }
379 }
380
381 // Trace through paid hints:
382 let paid_hints = [key_paid_existing, key_pending]; // key_pending overlaps with replica
383 let mut admitted_paid = Vec::new();
384
385 for &key in &paid_hints {
386 if !seen.insert(key) {
387 continue; // dedup: key_pending already seen
388 }
389 if replica_set.contains(&key) {
390 continue; // cross-set precedence
391 }
392 // Fast path: already in paid list -> admitted.
393 if paid_set.contains(&key) {
394 admitted_paid.push(key);
395 continue;
396 }
397 rejected.push(key);
398 }
399
400 // Verify outcomes:
401 assert_eq!(
402 admitted_replica,
403 vec![key_pending],
404 "only the pending key should be admitted as replica"
405 );
406 assert_eq!(
407 rejected,
408 vec![key_not_pending],
409 "non-pending, non-responsible key must be rejected"
410 );
411 assert_eq!(
412 admitted_paid,
413 vec![key_paid_existing],
414 "existing paid-list key should be admitted via fast path"
415 );
416
417 // Cross-set precedence: key_pending appeared in both replica and
418 // paid hints — it was processed as replica only, paid duplicate
419 // was deduped.
420 assert!(
421 !admitted_paid.contains(&key_pending),
422 "key in both hint sets must be processed as replica only"
423 );
424 }
425
426 /// Scenario 7: Out-of-range key hint rejected regardless of quorum.
427 ///
428 /// A key whose XOR distance from self is much larger than the distance
429 /// of the storage-admission members fails the `is_responsible` check in
430 /// `admit_hints`. The key never enters the verification pipeline, so
431 /// quorum is irrelevant.
432 ///
433 /// This test exercises the distance-based reasoning that `admit_hints`
434 /// uses, tracing through the same logic path. Full `is_responsible`
435 /// requires a `P2PNode` for DHT lookups; here we verify the distance
436 /// comparison and admission outcome for both close and far keys.
437 #[test]
438 fn scenario_7_out_of_range_key_rejected() {
439 let self_xor: XorName = [0u8; 32];
440
441 // -- Distance proof: far key vs close key --
442
443 let far_key = xor_name_from_byte(0xFF);
444 let close_key = xor_name_from_byte(0x01);
445 let far_dist = xor_distance(&self_xor, &far_key);
446 let close_dist = xor_distance(&self_xor, &close_key);
447
448 assert_eq!(far_dist[0], 0xFF, "far_key distance should be maximal");
449 assert_eq!(close_dist[0], 0x01, "close_key distance should be small");
450 assert!(far_dist > close_dist, "far key is further than close key");
451
452 // -- Simulate admit_hints for these keys --
453 //
454 // When the storage-admission peers are all closer to far_key than
455 // self, `is_responsible(self, far_key)` returns false. The key is
456 // rejected without entering verification or quorum.
457
458 let pending: HashSet<XorName> = HashSet::new();
459 let replica_hints = [far_key, close_key];
460 let mut seen = HashSet::new();
461 let mut admitted = Vec::new();
462 let mut rejected = Vec::new();
463
464 for &key in &replica_hints {
465 if !seen.insert(key) {
466 continue;
467 }
468 // Not pending, not local.
469 if pending.contains(&key) {
470 admitted.push(key);
471 continue;
472 }
473 // Simulate is_responsible: self (0x00) has the full
474 // storage-admission group closer to far_key (0xFF) than itself.
475 // For close_key (0x01), self is very close -> responsible.
476 let distance = xor_distance(&self_xor, &key);
477 let simulated_responsible = distance[0] < 0x80;
478 if simulated_responsible {
479 admitted.push(key);
480 } else {
481 rejected.push(key);
482 }
483 }
484
485 assert_eq!(
486 admitted,
487 vec![close_key],
488 "only close key should be admitted"
489 );
490 assert_eq!(
491 rejected,
492 vec![far_key],
493 "far key should be rejected regardless of quorum — it never enters verification"
494 );
495
496 // Verify the key doesn't sneak in via paid hints either.
497 // far_key was already seen (deduped), so paid processing skips it.
498 let paid_hints = [far_key];
499 let replica_set: HashSet<XorName> = replica_hints.iter().copied().collect();
500 let mut paid_admitted = Vec::new();
501
502 for &key in &paid_hints {
503 if !seen.insert(key) {
504 continue; // already seen from replica processing
505 }
506 if replica_set.contains(&key) {
507 continue; // cross-set precedence
508 }
509 paid_admitted.push(key);
510 }
511
512 assert!(
513 paid_admitted.is_empty(),
514 "far key already processed as replica (and rejected) should not re-enter via paid hints"
515 );
516 }
517}