Skip to main content

ant_node/replication/
paid_list.rs

1//! Persistent `PaidForList` backed by LMDB.
2//!
3//! Tracks keys this node believes are paid-authorized. Survives restarts
4//! (Invariant 15). Bounded by `PaidCloseGroup` membership with
5//! hysteresis-based pruning.
6//!
7//! ## Storage layout
8//!
9//! ```text
10//! {root}/paid_list.mdb/   -- LMDB environment directory
11//! ```
12//!
13//! One unnamed database stores set membership: key = 32-byte `XorName`,
14//! value = empty byte slice.
15//!
16//! ## Out-of-range timestamps
17//!
18//! Per-key `PaidOutOfRangeFirstSeen` and `RecordOutOfRangeFirstSeen`
19//! timestamps live in memory only. On restart the hysteresis clock
20//! restarts from zero, which is safe: the prune timer simply starts
21//! fresh.
22
23use crate::ant_protocol::XorName;
24use crate::error::{Error, Result};
25use crate::logging::{debug, trace, warn};
26use heed::types::Bytes;
27use heed::{Database, Env, EnvOpenOptions};
28use parking_lot::RwLock;
29use std::collections::HashMap;
30use std::path::Path;
31use std::time::Instant;
32use tokio::task::spawn_blocking;
33
34use crate::ant_protocol::XORNAME_LEN;
35
/// LMDB map size used when opening the paid-list environment: 256 MiB.
///
/// Entries are 32-byte keys with empty values, so this leaves generous
/// headroom even for very large close-group memberships.
const DEFAULT_MAP_SIZE: usize = 256 * (1 << 20); // `1 << 20` bytes is one MiB
41
42/// Persistent paid-for-list backed by LMDB.
43///
44/// Tracks which keys this node believes are paid-authorized.
45/// Survives node restarts via LMDB persistence.
46pub struct PaidList {
47    /// LMDB environment.
48    env: Env,
49    /// The unnamed default database (key = `XorName` bytes, value = empty).
50    db: Database<Bytes, Bytes>,
51    /// In-memory: when each paid key first went out of `PaidCloseGroup` range.
52    /// Cleared on restart (safe: hysteresis clock restarts from zero).
53    paid_out_of_range: RwLock<HashMap<XorName, Instant>>,
54    /// In-memory: when each stored record first went out of
55    /// storage-responsibility range.
56    record_out_of_range: RwLock<HashMap<XorName, Instant>>,
57    /// Cursor used by paid-list pruning to rotate through expired entries when
58    /// the per-pass remote confirmation cap is exhausted.
59    paid_prune_cursor: RwLock<usize>,
60}
61
62impl PaidList {
63    /// Open or create a `PaidList` backed by LMDB at `{root_dir}/paid_list.mdb/`.
64    ///
65    /// # Errors
66    ///
67    /// Returns an error if the LMDB environment cannot be opened or the
68    /// database cannot be created.
69    #[allow(unsafe_code)]
70    pub async fn new(root_dir: &Path) -> Result<Self> {
71        let env_dir = root_dir.join("paid_list.mdb");
72
73        std::fs::create_dir_all(&env_dir)
74            .map_err(|e| Error::Storage(format!("Failed to create paid-list directory: {e}")))?;
75
76        let env_dir_clone = env_dir.clone();
77        let (env, db) = spawn_blocking(move || -> Result<(Env, Database<Bytes, Bytes>)> {
78            // SAFETY: `EnvOpenOptions::open()` is unsafe because LMDB uses
79            // memory-mapped I/O and relies on OS file-locking to prevent
80            // corruption from concurrent access by multiple processes. We
81            // satisfy this by giving each node instance a unique `root_dir`
82            // (typically named by its full 64-hex peer ID), ensuring no two
83            // processes open the same LMDB environment.
84            let env = unsafe {
85                EnvOpenOptions::new()
86                    .map_size(DEFAULT_MAP_SIZE)
87                    .max_dbs(1)
88                    .open(&env_dir_clone)
89                    .map_err(|e| {
90                        Error::Storage(format!("Failed to open paid-list LMDB env: {e}"))
91                    })?
92            };
93
94            let mut wtxn = env
95                .write_txn()
96                .map_err(|e| Error::Storage(format!("Failed to create write txn: {e}")))?;
97            let db: Database<Bytes, Bytes> = env
98                .create_database(&mut wtxn, None)
99                .map_err(|e| Error::Storage(format!("Failed to create paid-list database: {e}")))?;
100            wtxn.commit()
101                .map_err(|e| Error::Storage(format!("Failed to commit db creation: {e}")))?;
102
103            Ok((env, db))
104        })
105        .await
106        .map_err(|e| Error::Storage(format!("Paid-list init task failed: {e}")))??;
107
108        let paid_list = Self {
109            env,
110            db,
111            paid_out_of_range: RwLock::new(HashMap::new()),
112            record_out_of_range: RwLock::new(HashMap::new()),
113            paid_prune_cursor: RwLock::new(0),
114        };
115
116        let count = paid_list.count()?;
117        debug!("Initialized paid-list at {env_dir:?} ({count} existing keys)");
118
119        Ok(paid_list)
120    }
121
122    /// Insert a key into the paid-for set.
123    ///
124    /// Returns `true` if the key was newly added, `false` if it already existed.
125    ///
126    /// # Errors
127    ///
128    /// Returns an error if the LMDB write transaction fails.
129    pub async fn insert(&self, key: &XorName) -> Result<bool> {
130        // Fast-path: avoid write transaction if key already present.
131        if self.contains(key)? {
132            trace!("Paid-list key {} already present", hex::encode(key));
133            return Ok(false);
134        }
135
136        let key_owned = *key;
137        let env = self.env.clone();
138        let db = self.db;
139
140        let was_new = spawn_blocking(move || -> Result<bool> {
141            let mut wtxn = env
142                .write_txn()
143                .map_err(|e| Error::Storage(format!("Failed to create write txn: {e}")))?;
144
145            // Authoritative existence check inside the serialized write txn.
146            if db
147                .get(&wtxn, &key_owned)
148                .map_err(|e| Error::Storage(format!("Failed to check paid-list existence: {e}")))?
149                .is_some()
150            {
151                return Ok(false);
152            }
153
154            db.put(&mut wtxn, &key_owned, &[])
155                .map_err(|e| Error::Storage(format!("Failed to insert into paid-list: {e}")))?;
156            wtxn.commit()
157                .map_err(|e| Error::Storage(format!("Failed to commit paid-list insert: {e}")))?;
158
159            Ok(true)
160        })
161        .await
162        .map_err(|e| Error::Storage(format!("Paid-list insert task failed: {e}")))??;
163
164        if was_new {
165            debug!("Added key {} to paid-list", hex::encode(key));
166        }
167
168        Ok(was_new)
169    }
170
171    /// Remove a key from the paid-for set.
172    ///
173    /// Also clears any in-memory out-of-range timestamps for this key.
174    ///
175    /// Returns `true` if the key existed and was removed, `false` otherwise.
176    ///
177    /// # Errors
178    ///
179    /// Returns an error if the LMDB write transaction fails.
180    pub async fn remove(&self, key: &XorName) -> Result<bool> {
181        let key_owned = *key;
182        let env = self.env.clone();
183        let db = self.db;
184
185        let existed = spawn_blocking(move || -> Result<bool> {
186            let mut wtxn = env
187                .write_txn()
188                .map_err(|e| Error::Storage(format!("Failed to create write txn: {e}")))?;
189            let deleted = db
190                .delete(&mut wtxn, &key_owned)
191                .map_err(|e| Error::Storage(format!("Failed to delete from paid-list: {e}")))?;
192            wtxn.commit()
193                .map_err(|e| Error::Storage(format!("Failed to commit paid-list delete: {e}")))?;
194            Ok(deleted)
195        })
196        .await
197        .map_err(|e| Error::Storage(format!("Paid-list remove task failed: {e}")))??;
198
199        if existed {
200            self.paid_out_of_range.write().remove(key);
201            self.record_out_of_range.write().remove(key);
202            debug!("Removed key {} from paid-list", hex::encode(key));
203        }
204
205        Ok(existed)
206    }
207
208    /// Check whether a key is in the paid-for set.
209    ///
210    /// This is a synchronous read-only operation (no write transaction needed).
211    ///
212    /// # Errors
213    ///
214    /// Returns an error if the LMDB read transaction fails.
215    pub fn contains(&self, key: &XorName) -> Result<bool> {
216        let rtxn = self
217            .env
218            .read_txn()
219            .map_err(|e| Error::Storage(format!("Failed to create read txn: {e}")))?;
220        let found = self
221            .db
222            .get(&rtxn, key.as_ref())
223            .map_err(|e| Error::Storage(format!("Failed to check paid-list membership: {e}")))?
224            .is_some();
225        Ok(found)
226    }
227
228    /// Return the number of keys in the paid-for set.
229    ///
230    /// This is an O(1) read of the B-tree page header, not a full scan.
231    ///
232    /// # Errors
233    ///
234    /// Returns an error if the LMDB read transaction fails.
235    pub fn count(&self) -> Result<u64> {
236        let rtxn = self
237            .env
238            .read_txn()
239            .map_err(|e| Error::Storage(format!("Failed to create read txn: {e}")))?;
240        let entries = self
241            .db
242            .stat(&rtxn)
243            .map_err(|e| Error::Storage(format!("Failed to read paid-list stats: {e}")))?
244            .entries;
245        Ok(entries as u64)
246    }
247
248    /// Return all keys in the paid-for set.
249    ///
250    /// Used during hint construction to advertise which keys this node holds.
251    ///
252    /// # Errors
253    ///
254    /// Returns an error if the LMDB read transaction or iteration fails.
255    pub fn all_keys(&self) -> Result<Vec<XorName>> {
256        let rtxn = self
257            .env
258            .read_txn()
259            .map_err(|e| Error::Storage(format!("Failed to create read txn: {e}")))?;
260        let mut keys = Vec::new();
261        let iter = self
262            .db
263            .iter(&rtxn)
264            .map_err(|e| Error::Storage(format!("Failed to iterate paid-list: {e}")))?;
265        for result in iter {
266            let (key_bytes, _) = result
267                .map_err(|e| Error::Storage(format!("Failed to read paid-list entry: {e}")))?;
268            if key_bytes.len() == XORNAME_LEN {
269                let mut key = [0u8; XORNAME_LEN];
270                key.copy_from_slice(key_bytes);
271                keys.push(key);
272            } else {
273                warn!(
274                    "PaidList: skipping entry with unexpected key length {} (expected {XORNAME_LEN})",
275                    key_bytes.len()
276                );
277            }
278        }
279        Ok(keys)
280    }
281
282    /// Record the `PaidOutOfRangeFirstSeen` timestamp for a key.
283    ///
284    /// Only sets the timestamp if one is not already recorded (first
285    /// observation wins).
286    pub fn set_paid_out_of_range(&self, key: &XorName) {
287        self.paid_out_of_range
288            .write()
289            .entry(*key)
290            .or_insert_with(Instant::now);
291    }
292
293    /// Clear the `PaidOutOfRangeFirstSeen` timestamp for a key.
294    ///
295    /// Called when the key moves back into `PaidCloseGroup` range.
296    pub fn clear_paid_out_of_range(&self, key: &XorName) {
297        self.paid_out_of_range.write().remove(key);
298    }
299
300    /// Get the `PaidOutOfRangeFirstSeen` timestamp for a key.
301    ///
302    /// Returns `None` if the key is currently in range (no timestamp set).
303    pub fn paid_out_of_range_since(&self, key: &XorName) -> Option<Instant> {
304        self.paid_out_of_range.read().get(key).copied()
305    }
306
307    /// Record the `RecordOutOfRangeFirstSeen` timestamp for a key.
308    ///
309    /// Only sets the timestamp if one is not already recorded (first
310    /// observation wins).
311    pub fn set_record_out_of_range(&self, key: &XorName) {
312        self.record_out_of_range
313            .write()
314            .entry(*key)
315            .or_insert_with(Instant::now);
316    }
317
318    /// Clear the `RecordOutOfRangeFirstSeen` timestamp for a key.
319    ///
320    /// Called when the record moves back into storage-responsibility range.
321    pub fn clear_record_out_of_range(&self, key: &XorName) {
322        self.record_out_of_range.write().remove(key);
323    }
324
325    /// Get the `RecordOutOfRangeFirstSeen` timestamp for a key.
326    ///
327    /// Returns `None` if the record is currently in range (no timestamp set).
328    pub fn record_out_of_range_since(&self, key: &XorName) -> Option<Instant> {
329        self.record_out_of_range.read().get(key).copied()
330    }
331
332    /// Starting offset for the next paid-list prune scan.
333    ///
334    /// LMDB iteration order is stable, so a bounded prune pass must rotate its
335    /// verification window or later expired entries can be starved behind
336    /// earlier unconfirmed entries.
337    pub(crate) fn paid_prune_scan_start(&self, paid_key_count: usize) -> usize {
338        if paid_key_count == 0 {
339            return 0;
340        }
341
342        *self.paid_prune_cursor.read() % paid_key_count
343    }
344
345    /// Advance the paid-list prune cursor after one pass.
346    pub(crate) fn advance_paid_prune_cursor(
347        &self,
348        paid_key_count: usize,
349        scan_start: usize,
350        last_selected_offset: Option<usize>,
351    ) {
352        let mut cursor = self.paid_prune_cursor.write();
353        if paid_key_count == 0 {
354            *cursor = 0;
355            return;
356        }
357
358        let advance_by = last_selected_offset.map_or(1, |offset| offset.saturating_add(1));
359        *cursor = (scan_start + advance_by) % paid_key_count;
360    }
361
362    /// Remove multiple keys in a single write transaction.
363    ///
364    /// Also clears any in-memory out-of-range timestamps for removed keys.
365    ///
366    /// Returns the number of keys that were actually present and removed.
367    ///
368    /// # Errors
369    ///
370    /// Returns an error if the LMDB write transaction fails.
371    pub async fn remove_batch(&self, keys: &[XorName]) -> Result<usize> {
372        if keys.is_empty() {
373            return Ok(0);
374        }
375
376        let keys_owned: Vec<XorName> = keys.to_vec();
377        let env = self.env.clone();
378        let db = self.db;
379
380        let removed_keys = spawn_blocking(move || -> Result<Vec<XorName>> {
381            let mut wtxn = env
382                .write_txn()
383                .map_err(|e| Error::Storage(format!("Failed to create write txn: {e}")))?;
384
385            let mut removed = Vec::new();
386            for key in &keys_owned {
387                let deleted = db
388                    .delete(&mut wtxn, key.as_ref())
389                    .map_err(|e| Error::Storage(format!("Failed to delete from paid-list: {e}")))?;
390                if deleted {
391                    removed.push(*key);
392                }
393            }
394
395            wtxn.commit()
396                .map_err(|e| Error::Storage(format!("Failed to commit batch remove: {e}")))?;
397
398            Ok(removed)
399        })
400        .await
401        .map_err(|e| Error::Storage(format!("Paid-list batch remove task failed: {e}")))??;
402
403        // Clear in-memory timestamps for all removed keys.
404        // Acquire and release each lock separately to minimize hold time.
405        if !removed_keys.is_empty() {
406            {
407                let mut paid_oor = self.paid_out_of_range.write();
408                for key in &removed_keys {
409                    paid_oor.remove(key);
410                }
411            }
412            {
413                let mut record_oor = self.record_out_of_range.write();
414                for key in &removed_keys {
415                    record_oor.remove(key);
416                }
417            }
418        }
419
420        let count = removed_keys.len();
421        debug!("Batch-removed {count} keys from paid-list");
422        Ok(count)
423    }
424}
425
426#[cfg(test)]
427#[allow(clippy::unwrap_used, clippy::expect_used)]
428mod tests {
429    use super::*;
430    use crate::replication::config::{BOOTSTRAP_CLAIM_GRACE_PERIOD, PRUNE_HYSTERESIS_DURATION};
431    use crate::replication::types::{
432        BootstrapClaimObservation, FailureEvidence, NeighborSyncState,
433    };
434    use saorsa_core::identity::PeerId;
435    use tempfile::TempDir;
436
437    async fn create_test_paid_list() -> (PaidList, TempDir) {
438        let temp_dir = TempDir::new().expect("create temp dir");
439        let paid_list = PaidList::new(temp_dir.path())
440            .await
441            .expect("create paid list");
442        (paid_list, temp_dir)
443    }
444
445    #[tokio::test]
446    async fn test_insert_and_contains() {
447        let (pl, _temp) = create_test_paid_list().await;
448
449        let key: XorName = [0xAA; 32];
450        assert!(!pl.contains(&key).expect("contains before insert"));
451
452        let was_new = pl.insert(&key).await.expect("insert");
453        assert!(was_new);
454
455        assert!(pl.contains(&key).expect("contains after insert"));
456    }
457
458    #[tokio::test]
459    async fn test_insert_duplicate_returns_false() {
460        let (pl, _temp) = create_test_paid_list().await;
461
462        let key: XorName = [0xBB; 32];
463
464        let first = pl.insert(&key).await.expect("first insert");
465        assert!(first);
466
467        let second = pl.insert(&key).await.expect("second insert");
468        assert!(!second);
469    }
470
471    #[tokio::test]
472    async fn test_remove_existing() {
473        let (pl, _temp) = create_test_paid_list().await;
474
475        let key: XorName = [0xCC; 32];
476        pl.insert(&key).await.expect("insert");
477        assert!(pl.contains(&key).expect("contains"));
478
479        let removed = pl.remove(&key).await.expect("remove");
480        assert!(removed);
481        assert!(!pl.contains(&key).expect("contains after remove"));
482    }
483
484    #[tokio::test]
485    async fn test_remove_nonexistent() {
486        let (pl, _temp) = create_test_paid_list().await;
487
488        let key: XorName = [0xDD; 32];
489        let removed = pl.remove(&key).await.expect("remove nonexistent");
490        assert!(!removed);
491    }
492
493    #[tokio::test]
494    async fn test_persistence_across_reopen() {
495        let temp_dir = TempDir::new().expect("create temp dir");
496        let key: XorName = [0xEE; 32];
497
498        // Insert a key, then drop the PaidList.
499        {
500            let pl = PaidList::new(temp_dir.path())
501                .await
502                .expect("create paid list");
503            pl.insert(&key).await.expect("insert");
504            assert_eq!(pl.count().expect("count"), 1);
505        }
506
507        // Re-open and verify the key persisted.
508        {
509            let pl = PaidList::new(temp_dir.path())
510                .await
511                .expect("reopen paid list");
512            assert_eq!(pl.count().expect("count"), 1);
513            assert!(pl.contains(&key).expect("contains after reopen"));
514        }
515    }
516
517    #[tokio::test]
518    async fn test_all_keys() {
519        let (pl, _temp) = create_test_paid_list().await;
520
521        let key_a: XorName = [0x01; 32];
522        let key_b: XorName = [0x02; 32];
523        let key_c: XorName = [0x03; 32];
524
525        pl.insert(&key_a).await.expect("insert 1");
526        pl.insert(&key_b).await.expect("insert 2");
527        pl.insert(&key_c).await.expect("insert 3");
528
529        let mut keys = pl.all_keys().expect("all_keys");
530        keys.sort_unstable();
531
532        let mut expected = vec![key_a, key_b, key_c];
533        expected.sort_unstable();
534
535        assert_eq!(keys, expected);
536    }
537
538    #[tokio::test]
539    async fn test_count() {
540        let (pl, _temp) = create_test_paid_list().await;
541
542        assert_eq!(pl.count().expect("count empty"), 0);
543
544        let key1: XorName = [0x10; 32];
545        let key2: XorName = [0x20; 32];
546
547        pl.insert(&key1).await.expect("insert 1");
548        assert_eq!(pl.count().expect("count after 1"), 1);
549
550        pl.insert(&key2).await.expect("insert 2");
551        assert_eq!(pl.count().expect("count after 2"), 2);
552
553        pl.remove(&key1).await.expect("remove 1");
554        assert_eq!(pl.count().expect("count after remove"), 1);
555    }
556
557    #[tokio::test]
558    async fn test_paid_out_of_range_timestamps() {
559        let (pl, _temp) = create_test_paid_list().await;
560
561        let key: XorName = [0xF0; 32];
562
563        // Initially no timestamp.
564        assert!(pl.paid_out_of_range_since(&key).is_none());
565
566        // Set timestamp.
567        let before = Instant::now();
568        pl.set_paid_out_of_range(&key);
569        let after = Instant::now();
570
571        let ts = pl
572            .paid_out_of_range_since(&key)
573            .expect("timestamp should exist");
574        assert!(ts >= before);
575        assert!(ts <= after);
576
577        // Setting again should not update (first observation wins).
578        std::thread::sleep(std::time::Duration::from_millis(10));
579        pl.set_paid_out_of_range(&key);
580        let ts2 = pl
581            .paid_out_of_range_since(&key)
582            .expect("timestamp should still exist");
583        assert_eq!(ts, ts2);
584
585        // Clear.
586        pl.clear_paid_out_of_range(&key);
587        assert!(pl.paid_out_of_range_since(&key).is_none());
588    }
589
590    #[tokio::test]
591    async fn test_record_out_of_range_timestamps() {
592        let (pl, _temp) = create_test_paid_list().await;
593
594        let key: XorName = [0xF1; 32];
595
596        assert!(pl.record_out_of_range_since(&key).is_none());
597
598        let before = Instant::now();
599        pl.set_record_out_of_range(&key);
600        let after = Instant::now();
601
602        let ts = pl
603            .record_out_of_range_since(&key)
604            .expect("timestamp should exist");
605        assert!(ts >= before);
606        assert!(ts <= after);
607
608        // Setting again should not update.
609        std::thread::sleep(std::time::Duration::from_millis(10));
610        pl.set_record_out_of_range(&key);
611        let ts2 = pl
612            .record_out_of_range_since(&key)
613            .expect("timestamp should still exist");
614        assert_eq!(ts, ts2);
615
616        // Clear.
617        pl.clear_record_out_of_range(&key);
618        assert!(pl.record_out_of_range_since(&key).is_none());
619    }
620
621    #[tokio::test]
622    async fn test_remove_clears_timestamps() {
623        let (pl, _temp) = create_test_paid_list().await;
624
625        let key: XorName = [0xA0; 32];
626        pl.insert(&key).await.expect("insert");
627
628        pl.set_paid_out_of_range(&key);
629        pl.set_record_out_of_range(&key);
630        assert!(pl.paid_out_of_range_since(&key).is_some());
631        assert!(pl.record_out_of_range_since(&key).is_some());
632
633        pl.remove(&key).await.expect("remove");
634        assert!(pl.paid_out_of_range_since(&key).is_none());
635        assert!(pl.record_out_of_range_since(&key).is_none());
636    }
637
638    #[tokio::test]
639    async fn test_remove_batch() {
640        let (pl, _temp) = create_test_paid_list().await;
641
642        let key1: XorName = [0x01; 32];
643        let key2: XorName = [0x02; 32];
644        let key3: XorName = [0x03; 32];
645        let key4: XorName = [0x04; 32]; // not inserted
646
647        pl.insert(&key1).await.expect("insert 1");
648        pl.insert(&key2).await.expect("insert 2");
649        pl.insert(&key3).await.expect("insert 3");
650
651        // Set timestamps to verify they get cleared.
652        pl.set_paid_out_of_range(&key1);
653        pl.set_record_out_of_range(&key2);
654
655        let removed = pl
656            .remove_batch(&[key1, key2, key4])
657            .await
658            .expect("remove_batch");
659        assert_eq!(removed, 2); // key1 and key2 existed; key4 did not
660
661        assert!(!pl.contains(&key1).expect("key1 gone"));
662        assert!(!pl.contains(&key2).expect("key2 gone"));
663        assert!(pl.contains(&key3).expect("key3 still present"));
664        assert_eq!(pl.count().expect("count"), 1);
665
666        // Timestamps should be cleared for removed keys.
667        assert!(pl.paid_out_of_range_since(&key1).is_none());
668        assert!(pl.record_out_of_range_since(&key2).is_none());
669    }
670
671    #[tokio::test]
672    async fn test_remove_batch_empty() {
673        let (pl, _temp) = create_test_paid_list().await;
674
675        let removed = pl.remove_batch(&[]).await.expect("remove_batch empty");
676        assert_eq!(removed, 0);
677    }
678
679    #[tokio::test]
680    async fn paid_prune_cursor_advances_past_selected_window() {
681        const PAID_KEY_COUNT: usize = 10;
682        const START_CURSOR: usize = 2;
683        const LAST_SELECTED_OFFSET: usize = 3;
684        const EXPECTED_CURSOR: usize = 6;
685
686        let (pl, _temp) = create_test_paid_list().await;
687        *pl.paid_prune_cursor.write() = START_CURSOR;
688
689        let scan_start = pl.paid_prune_scan_start(PAID_KEY_COUNT);
690        pl.advance_paid_prune_cursor(PAID_KEY_COUNT, scan_start, Some(LAST_SELECTED_OFFSET));
691
692        assert_eq!(*pl.paid_prune_cursor.read(), EXPECTED_CURSOR);
693    }
694
695    #[tokio::test]
696    async fn paid_prune_cursor_advances_even_without_selected_entry() {
697        const PAID_KEY_COUNT: usize = 10;
698        const START_CURSOR: usize = 9;
699        const EXPECTED_CURSOR: usize = 0;
700
701        let (pl, _temp) = create_test_paid_list().await;
702        *pl.paid_prune_cursor.write() = START_CURSOR;
703
704        let scan_start = pl.paid_prune_scan_start(PAID_KEY_COUNT);
705        pl.advance_paid_prune_cursor(PAID_KEY_COUNT, scan_start, None);
706
707        assert_eq!(*pl.paid_prune_cursor.read(), EXPECTED_CURSOR);
708    }
709
710    #[tokio::test]
711    async fn paid_prune_cursor_resets_for_empty_paid_list() {
712        const STALE_CURSOR: usize = 7;
713        const EMPTY_PAID_KEY_COUNT: usize = 0;
714        const EXPECTED_CURSOR: usize = 0;
715
716        let (pl, _temp) = create_test_paid_list().await;
717        *pl.paid_prune_cursor.write() = STALE_CURSOR;
718
719        let scan_start = pl.paid_prune_scan_start(EMPTY_PAID_KEY_COUNT);
720        pl.advance_paid_prune_cursor(EMPTY_PAID_KEY_COUNT, scan_start, Some(STALE_CURSOR));
721
722        assert_eq!(*pl.paid_prune_cursor.read(), EXPECTED_CURSOR);
723    }
724
725    // -- Scenario tests -------------------------------------------------------
726
727    /// #50: Key goes out of range. `set_record_out_of_range` called.
728    /// Immediately the elapsed time is less than `PRUNE_HYSTERESIS_DURATION`,
729    /// so a prune pass should NOT delete it. We verify the timestamp is
730    /// present but recent.
731    #[tokio::test]
732    async fn scenario_50_hysteresis_prevents_premature_deletion() {
733        let (pl, _temp) = create_test_paid_list().await;
734        let key: XorName = [0x50; 32];
735
736        // Key goes out of range — record the timestamp.
737        pl.set_record_out_of_range(&key);
738
739        // Timestamp must be present.
740        let since = pl
741            .record_out_of_range_since(&key)
742            .expect("timestamp should exist after set");
743
744        // Elapsed time is effectively zero — well below hysteresis threshold.
745        let elapsed = since.elapsed();
746        assert!(
747            elapsed < PRUNE_HYSTERESIS_DURATION,
748            "elapsed ({elapsed:?}) should be far below PRUNE_HYSTERESIS_DURATION ({PRUNE_HYSTERESIS_DURATION:?})",
749        );
750    }
751
752    /// #51: Key goes out of range, then comes back. Timestamp is cleared.
753    /// If the key leaves again, the clock restarts from now.
754    #[tokio::test]
755    async fn scenario_51_timestamp_reset_on_heal() {
756        let (pl, _temp) = create_test_paid_list().await;
757        let key: XorName = [0x51; 32];
758
759        // Key goes out of range.
760        pl.set_record_out_of_range(&key);
761        assert!(
762            pl.record_out_of_range_since(&key).is_some(),
763            "timestamp should exist after going out of range"
764        );
765
766        // Partition heals — key comes back in range.
767        pl.clear_record_out_of_range(&key);
768        assert!(
769            pl.record_out_of_range_since(&key).is_none(),
770            "timestamp should be cleared after heal"
771        );
772
773        // Key goes out of range again — clock must restart.
774        let before_second = Instant::now();
775        pl.set_record_out_of_range(&key);
776        let second_ts = pl
777            .record_out_of_range_since(&key)
778            .expect("timestamp should exist after second out-of-range");
779        assert!(
780            second_ts >= before_second,
781            "new timestamp should be >= the instant before second set call"
782        );
783    }
784
785    /// #52: Paid and record out-of-range timestamps are independent.
786    /// Clearing one must not affect the other.
787    #[tokio::test]
788    async fn scenario_52_paid_and_record_timestamps_independent() {
789        let (pl, _temp) = create_test_paid_list().await;
790        let key: XorName = [0x52; 32];
791
792        // Set both timestamps.
793        pl.set_paid_out_of_range(&key);
794        pl.set_record_out_of_range(&key);
795        assert!(pl.paid_out_of_range_since(&key).is_some());
796        assert!(pl.record_out_of_range_since(&key).is_some());
797
798        // Clear record — paid must survive.
799        pl.clear_record_out_of_range(&key);
800        assert!(
801            pl.paid_out_of_range_since(&key).is_some(),
802            "paid timestamp should survive clearing record timestamp"
803        );
804        assert!(pl.record_out_of_range_since(&key).is_none());
805
806        // Re-set record, then clear paid — record must survive.
807        pl.set_record_out_of_range(&key);
808        pl.clear_paid_out_of_range(&key);
809        assert!(
810            pl.record_out_of_range_since(&key).is_some(),
811            "record timestamp should survive clearing paid timestamp"
812        );
813        assert!(pl.paid_out_of_range_since(&key).is_none());
814    }
815
816    /// #23: Inserting then removing a key from the paid list clears both
817    /// the persistence entry and any in-memory out-of-range timestamps.
818    #[tokio::test]
819    async fn scenario_23_paid_list_entry_removed() {
820        let (pl, _temp) = create_test_paid_list().await;
821        let key: XorName = [0x23; 32];
822
823        // Insert key and attach out-of-range timestamps.
824        pl.insert(&key).await.expect("insert");
825        pl.set_paid_out_of_range(&key);
826        pl.set_record_out_of_range(&key);
827
828        // Remove — should clear everything.
829        let removed = pl.remove(&key).await.expect("remove");
830        assert!(removed, "key should have existed");
831        assert!(
832            !pl.contains(&key).expect("contains check"),
833            "key should be gone from paid list"
834        );
835        assert!(
836            pl.paid_out_of_range_since(&key).is_none(),
837            "paid timestamp should be cleaned up on remove"
838        );
839        assert!(
840            pl.record_out_of_range_since(&key).is_none(),
841            "record timestamp should be cleaned up on remove"
842        );
843    }
844
845    /// #13: Responsible range shrink — out-of-range records have their
846    /// timestamp recorded, are NOT pruned before `PRUNE_HYSTERESIS_DURATION`,
847    /// and new in-range keys are still accepted while out-of-range keys
848    /// await expiry.
849    #[tokio::test]
850    async fn scenario_13_responsible_range_shrink() {
851        let (pl, _temp) = create_test_paid_list().await;
852
853        let out_of_range_key: XorName = [0x13; 32];
854        let in_range_key: XorName = [0x14; 32];
855
856        // Insert both keys initially (simulating they were once in range).
857        pl.insert(&out_of_range_key)
858            .await
859            .expect("insert out-of-range");
860        pl.insert(&in_range_key).await.expect("insert in-range");
861
862        // Range shrinks: out_of_range_key is no longer in responsibility range.
863        // Record RecordOutOfRangeFirstSeen.
864        pl.set_record_out_of_range(&out_of_range_key);
865        let first_seen = pl
866            .record_out_of_range_since(&out_of_range_key)
867            .expect("timestamp should be recorded for out-of-range key");
868
869        // Key must NOT be pruned yet — elapsed time is far below hysteresis.
870        let elapsed = first_seen.elapsed();
871        assert!(
872            elapsed < PRUNE_HYSTERESIS_DURATION,
873            "elapsed {elapsed:?} should be below PRUNE_HYSTERESIS_DURATION \
874             ({PRUNE_HYSTERESIS_DURATION:?}) — key must not be pruned yet"
875        );
876
877        // The key should still exist in the paid list (not deleted).
878        assert!(
879            pl.contains(&out_of_range_key).expect("contains"),
880            "out-of-range key should still be retained within hysteresis window"
881        );
882
883        // In-range key is unaffected — no out-of-range timestamp set.
884        assert!(
885            pl.record_out_of_range_since(&in_range_key).is_none(),
886            "in-range key should have no out-of-range timestamp"
887        );
888
889        // New in-range keys are still accepted during this period.
890        let new_key: XorName = [0x15; 32];
891        let was_new = pl.insert(&new_key).await.expect("insert new key");
892        assert!(
893            was_new,
894            "new in-range keys should still be accepted while out-of-range keys await expiry"
895        );
896        assert!(
897            pl.contains(&new_key).expect("contains new"),
898            "newly inserted in-range key should be present"
899        );
900    }
901
902    /// #46: Bootstrap claim first-seen is recorded and follows
903    /// first-observation-wins semantics.
904    #[test]
905    fn scenario_46_bootstrap_claim_first_seen_recorded() {
906        let peer = PeerId::from_bytes([0x46; 32]);
907        let mut state = NeighborSyncState::new_cycle(vec![peer]);
908
909        let first_ts = Instant::now()
910            .checked_sub(std::time::Duration::from_secs(3))
911            .unwrap_or_else(Instant::now);
912        let observed = state.observe_bootstrap_claim(peer, first_ts, BOOTSTRAP_CLAIM_GRACE_PERIOD);
913        assert_eq!(
914            observed,
915            BootstrapClaimObservation::WithinGrace {
916                first_seen: first_ts
917            }
918        );
919
920        // Verify recorded.
921        assert_eq!(
922            state.bootstrap_claims.get(&peer),
923            Some(&first_ts),
924            "first-seen timestamp should be recorded"
925        );
926        assert_eq!(
927            state.bootstrap_claim_history.get(&peer),
928            Some(&first_ts),
929            "first-ever timestamp should be retained"
930        );
931
932        // Observe again while still active — must NOT overwrite
933        // (first-observation-wins).
934        let later_ts = Instant::now();
935        let observed = state.observe_bootstrap_claim(peer, later_ts, BOOTSTRAP_CLAIM_GRACE_PERIOD);
936        assert_eq!(
937            observed,
938            BootstrapClaimObservation::WithinGrace {
939                first_seen: first_ts
940            }
941        );
942        assert_eq!(
943            state.bootstrap_claims.get(&peer),
944            Some(&first_ts),
945            "second insert must not overwrite the original timestamp"
946        );
947    }
948
949    /// #48: Peer P first claimed bootstrapping >24 h ago.  On next interaction
950    /// the claim age exceeds `BOOTSTRAP_CLAIM_GRACE_PERIOD` and the node emits
951    /// `BootstrapClaimAbuse` evidence.
952    #[test]
953    fn scenario_48_bootstrap_claim_abuse_after_grace_period() {
954        let peer = PeerId::from_bytes([0x48; 32]);
955        let mut state = NeighborSyncState::new_cycle(vec![peer]);
956
957        // Record a first-seen timestamp >24 h ago.
958        // `Instant::checked_sub` can fail on Windows where the epoch is
959        // process-start, so fall back to a recent instant when the platform
960        // cannot represent the backdated time (the claim-age assertion is
961        // skipped in that case since the subtraction itself proves nothing
962        // about production behaviour).
963        let grace_plus_margin = BOOTSTRAP_CLAIM_GRACE_PERIOD + std::time::Duration::from_secs(3600);
964        let first_seen = Instant::now()
965            .checked_sub(grace_plus_margin)
966            .unwrap_or_else(Instant::now);
967        state.bootstrap_claims.insert(peer, first_seen);
968        state.bootstrap_claim_history.insert(peer, first_seen);
969
970        // On platforms that support the backdated instant, verify claim age.
971        let claim_age = Instant::now().duration_since(first_seen);
972        if claim_age > std::time::Duration::from_secs(1) {
973            assert!(
974                claim_age > BOOTSTRAP_CLAIM_GRACE_PERIOD,
975                "claim age {claim_age:?} should exceed grace period {BOOTSTRAP_CLAIM_GRACE_PERIOD:?}",
976            );
977        }
978
979        // Caller constructs BootstrapClaimAbuse evidence.
980        let evidence = FailureEvidence::BootstrapClaimAbuse { peer, first_seen };
981
982        let FailureEvidence::BootstrapClaimAbuse {
983            peer: p,
984            first_seen: fs,
985        } = evidence
986        else {
987            unreachable!("evidence was just constructed as BootstrapClaimAbuse");
988        };
989        assert_eq!(p, peer);
990        assert_eq!(fs, first_seen);
991    }
992
993    /// #49: Bootstrap claim is cleared when a peer responds normally.
994    #[test]
995    fn scenario_49_bootstrap_claim_cleared() {
996        let peer = PeerId::from_bytes([0x49; 32]);
997        let mut state = NeighborSyncState::new_cycle(vec![peer]);
998
999        // Record a bootstrap claim.
1000        let first_seen = Instant::now();
1001        let _ = state.observe_bootstrap_claim(peer, first_seen, BOOTSTRAP_CLAIM_GRACE_PERIOD);
1002        assert!(
1003            state.bootstrap_claims.contains_key(&peer),
1004            "claim should exist after insert"
1005        );
1006
1007        // Peer responded normally — clear only the active claim.
1008        state.clear_active_bootstrap_claim(&peer);
1009        assert!(
1010            !state.bootstrap_claims.contains_key(&peer),
1011            "claim should be gone after normal response"
1012        );
1013        assert!(
1014            state.bootstrap_claim_history.contains_key(&peer),
1015            "claim history should remain so the peer cannot claim bootstrapping again"
1016        );
1017
1018        let repeated =
1019            state.observe_bootstrap_claim(peer, Instant::now(), BOOTSTRAP_CLAIM_GRACE_PERIOD);
1020        assert_eq!(
1021            repeated,
1022            BootstrapClaimObservation::Repeated { first_seen },
1023            "a second bootstrap claim should be classified as repeated abuse"
1024        );
1025    }
1026}