use crate::ant_protocol::XorName;
use crate::error::{Error, Result};
use crate::logging::{debug, trace, warn};
use heed::types::Bytes;
use heed::{Database, Env, EnvOpenOptions};
use parking_lot::RwLock;
use std::collections::HashMap;
use std::path::Path;
use std::time::Instant;
use tokio::task::spawn_blocking;
use crate::ant_protocol::XORNAME_LEN;
/// Map size in bytes (256 MiB) handed to `EnvOpenOptions::map_size` when the
/// paid-list LMDB environment is opened.
const DEFAULT_MAP_SIZE: usize = 256 * 1_024 * 1_024;
/// Set of paid keys persisted in an LMDB database, together with two
/// in-memory maps of "out of range since" timestamps.
///
/// Keys are stored in the database with an empty value; the timestamp maps
/// are not written to disk and start empty every time the store is opened.
pub struct PaidList {
/// LMDB environment that owns the on-disk paid-list data.
env: Env,
/// Database inside `env`, with raw byte keys and raw byte values.
db: Database<Bytes, Bytes>,
/// Instant at which each key was first marked via `set_paid_out_of_range`.
paid_out_of_range: RwLock<HashMap<XorName, Instant>>,
/// Instant at which each key was first marked via `set_record_out_of_range`.
record_out_of_range: RwLock<HashMap<XorName, Instant>>,
}
impl PaidList {
/// Opens the paid-list store rooted at `root_dir`, creating it if needed.
///
/// The LMDB environment lives in the `paid_list.mdb` sub-directory of
/// `root_dir`. Directory creation, environment opening and database creation
/// (with no database name) all run on Tokio's blocking pool so the async
/// executor thread is not stalled by filesystem I/O. Both out-of-range
/// timestamp maps start empty.
///
/// # Errors
///
/// Returns `Error::Storage` if the directory cannot be created, the
/// environment or database cannot be opened, the set-up transaction cannot be
/// created or committed, the blocking task does not complete, or the initial
/// entry count cannot be read.
#[allow(unsafe_code)] // heed only exposes environment opening as an `unsafe` call.
pub async fn new(root_dir: &Path) -> Result<Self> {
    let env_dir = root_dir.join("paid_list.mdb");
    // The closure takes ownership of its own copy; `env_dir` is still needed
    // for the log line below.
    let env_dir_clone = env_dir.clone();
    let (env, db) = spawn_blocking(move || -> Result<(Env, Database<Bytes, Bytes>)> {
        // Blocking filesystem call: performed here rather than on the async
        // executor thread.
        std::fs::create_dir_all(&env_dir_clone)
            .map_err(|e| Error::Storage(format!("Failed to create paid-list directory: {e}")))?;
        // SAFETY: heed declares `open` unsafe because LMDB memory-maps the
        // environment files. NOTE(review): this relies on `env_dir` not being
        // opened a second time in this process while this `Env` is alive and
        // on the files not being modified externally — confirm against the
        // heed `EnvOpenOptions::open` safety section and the callers of `new`.
        let env = unsafe {
            EnvOpenOptions::new()
                .map_size(DEFAULT_MAP_SIZE)
                .max_dbs(1)
                .open(&env_dir_clone)
                .map_err(|e| {
                    Error::Storage(format!("Failed to open paid-list LMDB env: {e}"))
                })?
        };
        // The database is created (or opened, if it already exists) inside a
        // write transaction that must be committed before the handle is used.
        let mut wtxn = env
            .write_txn()
            .map_err(|e| Error::Storage(format!("Failed to create write txn: {e}")))?;
        let db: Database<Bytes, Bytes> = env
            .create_database(&mut wtxn, None)
            .map_err(|e| Error::Storage(format!("Failed to create paid-list database: {e}")))?;
        wtxn.commit()
            .map_err(|e| Error::Storage(format!("Failed to commit db creation: {e}")))?;
        Ok((env, db))
    })
    .await
    .map_err(|e| Error::Storage(format!("Paid-list init task failed: {e}")))??;
    let paid_list = Self {
        env,
        db,
        paid_out_of_range: RwLock::new(HashMap::new()),
        record_out_of_range: RwLock::new(HashMap::new()),
    };
    let count = paid_list.count()?;
    debug!("Initialized paid-list at {env_dir:?} ({count} existing keys)");
    Ok(paid_list)
}
/// Adds `key` to the persistent paid-list.
///
/// Returns `Ok(true)` when the key was written by this call and `Ok(false)`
/// when it was already present.
///
/// # Errors
///
/// Returns `Error::Storage` if a transaction cannot be created or committed,
/// a database read or write fails, or the blocking task does not complete.
pub async fn insert(&self, key: &XorName) -> Result<bool> {
    // Fast path: a read transaction is enough to answer for known keys.
    let already_present = self.contains(key)?;
    if already_present {
        trace!("Paid-list key {} already present", hex::encode(key));
        return Ok(false);
    }

    let (env, db, owned_key) = (self.env.clone(), self.db, *key);
    let write_task = spawn_blocking(move || -> Result<bool> {
        let mut txn = env
            .write_txn()
            .map_err(|e| Error::Storage(format!("Failed to create write txn: {e}")))?;
        // Check again inside the write transaction: the key may have been
        // added between the fast-path read and this point.
        let present_in_txn = db
            .get(&txn, &owned_key)
            .map_err(|e| Error::Storage(format!("Failed to check paid-list existence: {e}")))?
            .is_some();
        if present_in_txn {
            return Ok(false);
        }
        // Only the key carries information; the stored value is empty.
        db.put(&mut txn, &owned_key, &[])
            .map_err(|e| Error::Storage(format!("Failed to insert into paid-list: {e}")))?;
        txn.commit()
            .map_err(|e| Error::Storage(format!("Failed to commit paid-list insert: {e}")))?;
        Ok(true)
    });
    let was_new = write_task
        .await
        .map_err(|e| Error::Storage(format!("Paid-list insert task failed: {e}")))??;

    if was_new {
        debug!("Added key {} to paid-list", hex::encode(key));
    }
    Ok(was_new)
}
/// Deletes `key` from the persistent paid-list.
///
/// Returns `Ok(true)` when an entry was deleted and `Ok(false)` when the key
/// was not stored. When an entry was deleted, both out-of-range timestamps
/// for the key are dropped as well.
///
/// # Errors
///
/// Returns `Error::Storage` if the write transaction cannot be created or
/// committed, the delete fails, or the blocking task does not complete.
pub async fn remove(&self, key: &XorName) -> Result<bool> {
    let (env, db, target) = (self.env.clone(), self.db, *key);
    let delete_task = spawn_blocking(move || -> Result<bool> {
        let mut txn = env
            .write_txn()
            .map_err(|e| Error::Storage(format!("Failed to create write txn: {e}")))?;
        let was_present = db
            .delete(&mut txn, &target)
            .map_err(|e| Error::Storage(format!("Failed to delete from paid-list: {e}")))?;
        txn.commit()
            .map_err(|e| Error::Storage(format!("Failed to commit paid-list delete: {e}")))?;
        Ok(was_present)
    });
    let existed = delete_task
        .await
        .map_err(|e| Error::Storage(format!("Paid-list remove task failed: {e}")))??;

    // Nothing was deleted, so the timestamp maps are left untouched.
    if !existed {
        return Ok(false);
    }

    self.paid_out_of_range.write().remove(key);
    self.record_out_of_range.write().remove(key);
    debug!("Removed key {} from paid-list", hex::encode(key));
    Ok(true)
}
pub fn contains(&self, key: &XorName) -> Result<bool> {
let rtxn = self
.env
.read_txn()
.map_err(|e| Error::Storage(format!("Failed to create read txn: {e}")))?;
let found = self
.db
.get(&rtxn, key.as_ref())
.map_err(|e| Error::Storage(format!("Failed to check paid-list membership: {e}")))?
.is_some();
Ok(found)
}
pub fn count(&self) -> Result<u64> {
let rtxn = self
.env
.read_txn()
.map_err(|e| Error::Storage(format!("Failed to create read txn: {e}")))?;
let entries = self
.db
.stat(&rtxn)
.map_err(|e| Error::Storage(format!("Failed to read paid-list stats: {e}")))?
.entries;
Ok(entries as u64)
}
pub fn all_keys(&self) -> Result<Vec<XorName>> {
let rtxn = self
.env
.read_txn()
.map_err(|e| Error::Storage(format!("Failed to create read txn: {e}")))?;
let mut keys = Vec::new();
let iter = self
.db
.iter(&rtxn)
.map_err(|e| Error::Storage(format!("Failed to iterate paid-list: {e}")))?;
for result in iter {
let (key_bytes, _) = result
.map_err(|e| Error::Storage(format!("Failed to read paid-list entry: {e}")))?;
if key_bytes.len() == XORNAME_LEN {
let mut key = [0u8; XORNAME_LEN];
key.copy_from_slice(key_bytes);
keys.push(key);
} else {
warn!(
"PaidList: skipping entry with unexpected key length {} (expected {XORNAME_LEN})",
key_bytes.len()
);
}
}
Ok(keys)
}
/// Marks `key` as paid-out-of-range, recording the current instant.
///
/// If the key is already marked, the existing instant is kept.
pub fn set_paid_out_of_range(&self, key: &XorName) {
    let mut first_seen = self.paid_out_of_range.write();
    first_seen.entry(*key).or_insert_with(Instant::now);
}
/// Forgets the paid-out-of-range instant recorded for `key`, if any.
pub fn clear_paid_out_of_range(&self, key: &XorName) {
    let mut first_seen = self.paid_out_of_range.write();
    first_seen.remove(key);
}
/// Returns the instant at which `key` was marked paid-out-of-range, or `None`
/// if it is not currently marked.
pub fn paid_out_of_range_since(&self, key: &XorName) -> Option<Instant> {
    let first_seen = self.paid_out_of_range.read();
    first_seen.get(key).copied()
}
/// Marks `key` as record-out-of-range, recording the current instant.
///
/// If the key is already marked, the existing instant is kept.
pub fn set_record_out_of_range(&self, key: &XorName) {
    let mut first_seen = self.record_out_of_range.write();
    first_seen.entry(*key).or_insert_with(Instant::now);
}
/// Forgets the record-out-of-range instant recorded for `key`, if any.
pub fn clear_record_out_of_range(&self, key: &XorName) {
    let mut first_seen = self.record_out_of_range.write();
    first_seen.remove(key);
}
/// Returns the instant at which `key` was marked record-out-of-range, or
/// `None` if it is not currently marked.
pub fn record_out_of_range_since(&self, key: &XorName) -> Option<Instant> {
    let first_seen = self.record_out_of_range.read();
    first_seen.get(key).copied()
}
/// Deletes every key in `keys` from the paid-list inside one write
/// transaction.
///
/// Returns how many of the given keys were actually deleted. Out-of-range
/// timestamps are dropped for exactly those deleted keys. An empty `keys`
/// slice returns `Ok(0)` without touching the database.
///
/// # Errors
///
/// Returns `Error::Storage` if the write transaction cannot be created or
/// committed, a delete fails, or the blocking task does not complete.
pub async fn remove_batch(&self, keys: &[XorName]) -> Result<usize> {
    if keys.is_empty() {
        return Ok(0);
    }

    let (env, db) = (self.env.clone(), self.db);
    let candidates: Vec<XorName> = keys.to_vec();
    let delete_task = spawn_blocking(move || -> Result<Vec<XorName>> {
        let mut txn = env
            .write_txn()
            .map_err(|e| Error::Storage(format!("Failed to create write txn: {e}")))?;
        let mut deleted_keys = Vec::new();
        for candidate in &candidates {
            let was_present = db
                .delete(&mut txn, candidate.as_ref())
                .map_err(|e| Error::Storage(format!("Failed to delete from paid-list: {e}")))?;
            if was_present {
                deleted_keys.push(*candidate);
            }
        }
        txn.commit()
            .map_err(|e| Error::Storage(format!("Failed to commit batch remove: {e}")))?;
        Ok(deleted_keys)
    });
    let removed_keys = delete_task
        .await
        .map_err(|e| Error::Storage(format!("Paid-list batch remove task failed: {e}")))??;

    if !removed_keys.is_empty() {
        // One map at a time: the first write guard is released before the
        // second one is taken.
        for tracker in [&self.paid_out_of_range, &self.record_out_of_range] {
            let mut first_seen = tracker.write();
            for key in &removed_keys {
                first_seen.remove(key);
            }
        }
    }

    let count = removed_keys.len();
    debug!("Batch-removed {count} keys from paid-list");
    Ok(count)
}
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
use super::*;
use crate::replication::config::{BOOTSTRAP_CLAIM_GRACE_PERIOD, PRUNE_HYSTERESIS_DURATION};
use crate::replication::types::{FailureEvidence, NeighborSyncState};
use saorsa_core::identity::PeerId;
use tempfile::TempDir;
// Builds a `PaidList` in a fresh temporary directory. The `TempDir` is
// returned so the caller keeps the directory alive for the test's duration.
async fn create_test_paid_list() -> (PaidList, TempDir) {
    let dir = TempDir::new().expect("create temp dir");
    let store = PaidList::new(dir.path()).await.expect("create paid list");
    (store, dir)
}
// A key is absent before insertion and present afterwards.
#[tokio::test]
async fn test_insert_and_contains() {
    let (store, _dir) = create_test_paid_list().await;
    let key: XorName = [0xAA; 32];

    let present_before = store.contains(&key).expect("contains before insert");
    assert!(!present_before);

    let inserted = store.insert(&key).await.expect("insert");
    assert!(inserted);

    let present_after = store.contains(&key).expect("contains after insert");
    assert!(present_after);
}
// Inserting the same key twice reports `true` first and `false` second.
#[tokio::test]
async fn test_insert_duplicate_returns_false() {
    let (store, _dir) = create_test_paid_list().await;
    let key: XorName = [0xBB; 32];

    let first_attempt = store.insert(&key).await.expect("first insert");
    let second_attempt = store.insert(&key).await.expect("second insert");

    assert!(first_attempt);
    assert!(!second_attempt);
}
// Removing a stored key returns `true` and the key is gone afterwards.
#[tokio::test]
async fn test_remove_existing() {
    let (store, _dir) = create_test_paid_list().await;
    let key: XorName = [0xCC; 32];

    store.insert(&key).await.expect("insert");
    let present = store.contains(&key).expect("contains");
    assert!(present);

    let was_removed = store.remove(&key).await.expect("remove");
    assert!(was_removed);

    let still_present = store.contains(&key).expect("contains after remove");
    assert!(!still_present);
}
// Removing a key that was never inserted succeeds and returns `false`.
#[tokio::test]
async fn test_remove_nonexistent() {
    let (store, _dir) = create_test_paid_list().await;
    let missing: XorName = [0xDD; 32];

    let was_removed = store.remove(&missing).await.expect("remove nonexistent");

    assert!(!was_removed);
}
// A key written by one `PaidList` instance is visible to a second instance
// opened on the same directory after the first has been dropped.
#[tokio::test]
async fn test_persistence_across_reopen() {
    let dir = TempDir::new().expect("create temp dir");
    let key: XorName = [0xEE; 32];

    let first = PaidList::new(dir.path())
        .await
        .expect("create paid list");
    first.insert(&key).await.expect("insert");
    assert_eq!(first.count().expect("count"), 1);
    // Release the first instance before the directory is opened again.
    drop(first);

    let reopened = PaidList::new(dir.path())
        .await
        .expect("reopen paid list");
    assert_eq!(reopened.count().expect("count"), 1);
    assert!(reopened.contains(&key).expect("contains after reopen"));
}
// `all_keys` returns exactly the inserted keys (compared order-independently).
#[tokio::test]
async fn test_all_keys() {
    let (store, _dir) = create_test_paid_list().await;
    let key_a: XorName = [0x01; 32];
    let key_b: XorName = [0x02; 32];
    let key_c: XorName = [0x03; 32];

    for (key, label) in [(key_a, "insert 1"), (key_b, "insert 2"), (key_c, "insert 3")] {
        store.insert(&key).await.expect(label);
    }

    let mut listed = store.all_keys().expect("all_keys");
    listed.sort_unstable();
    let mut expected = vec![key_a, key_b, key_c];
    expected.sort_unstable();
    assert_eq!(listed, expected);
}
// `count` tracks inserts and removals one entry at a time.
#[tokio::test]
async fn test_count() {
    let (store, _dir) = create_test_paid_list().await;
    let first: XorName = [0x10; 32];
    let second: XorName = [0x20; 32];

    let empty = store.count().expect("count empty");
    assert_eq!(empty, 0);

    store.insert(&first).await.expect("insert 1");
    let after_one = store.count().expect("count after 1");
    assert_eq!(after_one, 1);

    store.insert(&second).await.expect("insert 2");
    let after_two = store.count().expect("count after 2");
    assert_eq!(after_two, 2);

    store.remove(&first).await.expect("remove 1");
    let after_removal = store.count().expect("count after remove");
    assert_eq!(after_removal, 1);
}
// The paid out-of-range timestamp is set once, survives a repeated `set`,
// and disappears on `clear`.
#[tokio::test]
async fn test_paid_out_of_range_timestamps() {
    let (store, _dir) = create_test_paid_list().await;
    let key: XorName = [0xF0; 32];
    assert_eq!(store.paid_out_of_range_since(&key), None);

    // Bracket the call so the recorded instant can be bounded on both sides.
    let before = Instant::now();
    store.set_paid_out_of_range(&key);
    let after = Instant::now();
    let recorded = store
        .paid_out_of_range_since(&key)
        .expect("timestamp should exist");
    assert!((before..=after).contains(&recorded));

    // Let the clock advance so an overwrite would be detectable.
    let pause = std::time::Duration::from_millis(10);
    std::thread::sleep(pause);
    store.set_paid_out_of_range(&key);
    let recorded_again = store
        .paid_out_of_range_since(&key)
        .expect("timestamp should still exist");
    assert_eq!(recorded, recorded_again);

    store.clear_paid_out_of_range(&key);
    assert_eq!(store.paid_out_of_range_since(&key), None);
}
// The record out-of-range timestamp is set once, survives a repeated `set`,
// and disappears on `clear`.
#[tokio::test]
async fn test_record_out_of_range_timestamps() {
    let (store, _dir) = create_test_paid_list().await;
    let key: XorName = [0xF1; 32];
    assert_eq!(store.record_out_of_range_since(&key), None);

    // Bracket the call so the recorded instant can be bounded on both sides.
    let before = Instant::now();
    store.set_record_out_of_range(&key);
    let after = Instant::now();
    let recorded = store
        .record_out_of_range_since(&key)
        .expect("timestamp should exist");
    assert!((before..=after).contains(&recorded));

    // Let the clock advance so an overwrite would be detectable.
    let pause = std::time::Duration::from_millis(10);
    std::thread::sleep(pause);
    store.set_record_out_of_range(&key);
    let recorded_again = store
        .record_out_of_range_since(&key)
        .expect("timestamp should still exist");
    assert_eq!(recorded, recorded_again);

    store.clear_record_out_of_range(&key);
    assert_eq!(store.record_out_of_range_since(&key), None);
}
// Removing a stored key also drops both of its out-of-range timestamps.
#[tokio::test]
async fn test_remove_clears_timestamps() {
    let (store, _dir) = create_test_paid_list().await;
    let key: XorName = [0xA0; 32];

    store.insert(&key).await.expect("insert");
    store.set_paid_out_of_range(&key);
    store.set_record_out_of_range(&key);
    let paid_before = store.paid_out_of_range_since(&key);
    let record_before = store.record_out_of_range_since(&key);
    assert!(paid_before.is_some());
    assert!(record_before.is_some());

    store.remove(&key).await.expect("remove");

    let paid_after = store.paid_out_of_range_since(&key);
    let record_after = store.record_out_of_range_since(&key);
    assert!(paid_after.is_none());
    assert!(record_after.is_none());
}
// A batch removal deletes the stored keys it names, ignores the one that was
// never stored, leaves other keys alone, and clears timestamps of deleted keys.
#[tokio::test]
async fn test_remove_batch() {
    let (store, _dir) = create_test_paid_list().await;
    let stored_a: XorName = [0x01; 32];
    let stored_b: XorName = [0x02; 32];
    let untouched: XorName = [0x03; 32];
    let never_stored: XorName = [0x04; 32];

    store.insert(&stored_a).await.expect("insert 1");
    store.insert(&stored_b).await.expect("insert 2");
    store.insert(&untouched).await.expect("insert 3");
    store.set_paid_out_of_range(&stored_a);
    store.set_record_out_of_range(&stored_b);

    let batch = [stored_a, stored_b, never_stored];
    let removed = store.remove_batch(&batch).await.expect("remove_batch");
    assert_eq!(removed, 2);

    assert!(!store.contains(&stored_a).expect("key1 gone"));
    assert!(!store.contains(&stored_b).expect("key2 gone"));
    assert!(store.contains(&untouched).expect("key3 still present"));
    assert_eq!(store.count().expect("count"), 1);

    assert!(store.paid_out_of_range_since(&stored_a).is_none());
    assert!(store.record_out_of_range_since(&stored_b).is_none());
}
#[tokio::test]
async fn test_remove_batch_empty() {
let (pl, _temp) = create_test_paid_list().await;
let removed = pl.remove_batch(&[]).await.expect("remove_batch empty");
assert_eq!(removed, 0);
}
// Right after a key is marked record-out-of-range, the elapsed time is still
// below `PRUNE_HYSTERESIS_DURATION`.
#[tokio::test]
async fn scenario_50_hysteresis_prevents_premature_deletion() {
    let (store, _dir) = create_test_paid_list().await;
    let key: XorName = [0x50; 32];

    store.set_record_out_of_range(&key);
    let marked_at = store
        .record_out_of_range_since(&key)
        .expect("timestamp should exist after set");

    let elapsed = marked_at.elapsed();
    assert!(
        elapsed < PRUNE_HYSTERESIS_DURATION,
        "elapsed ({elapsed:?}) should be far below PRUNE_HYSTERESIS_DURATION ({PRUNE_HYSTERESIS_DURATION:?})",
    );
}
// Clearing the record timestamp and marking the key again produces a fresh
// timestamp rather than reviving the old one.
#[tokio::test]
async fn scenario_51_timestamp_reset_on_heal() {
    let (store, _dir) = create_test_paid_list().await;
    let key: XorName = [0x51; 32];

    store.set_record_out_of_range(&key);
    let after_first_mark = store.record_out_of_range_since(&key);
    assert!(
        after_first_mark.is_some(),
        "timestamp should exist after going out of range"
    );

    store.clear_record_out_of_range(&key);
    let after_heal = store.record_out_of_range_since(&key);
    assert!(
        after_heal.is_none(),
        "timestamp should be cleared after heal"
    );

    let before_second = Instant::now();
    store.set_record_out_of_range(&key);
    let second_mark = store
        .record_out_of_range_since(&key)
        .expect("timestamp should exist after second out-of-range");
    assert!(
        second_mark >= before_second,
        "new timestamp should be >= the instant before second set call"
    );
}
// The paid and record timestamps for the same key are tracked separately:
// clearing one never affects the other.
#[tokio::test]
async fn scenario_52_paid_and_record_timestamps_independent() {
    let (store, _dir) = create_test_paid_list().await;
    let key: XorName = [0x52; 32];

    store.set_paid_out_of_range(&key);
    store.set_record_out_of_range(&key);
    assert!(store.paid_out_of_range_since(&key).is_some());
    assert!(store.record_out_of_range_since(&key).is_some());

    // Clear only the record side.
    store.clear_record_out_of_range(&key);
    let paid_after_record_clear = store.paid_out_of_range_since(&key);
    assert!(
        paid_after_record_clear.is_some(),
        "paid timestamp should survive clearing record timestamp"
    );
    assert!(store.record_out_of_range_since(&key).is_none());

    // Restore the record side, then clear only the paid side.
    store.set_record_out_of_range(&key);
    store.clear_paid_out_of_range(&key);
    let record_after_paid_clear = store.record_out_of_range_since(&key);
    assert!(
        record_after_paid_clear.is_some(),
        "record timestamp should survive clearing paid timestamp"
    );
    assert!(store.paid_out_of_range_since(&key).is_none());
}
// Removing a paid-list entry deletes the key and cleans up both timestamps.
#[tokio::test]
async fn scenario_23_paid_list_entry_removed() {
    let (store, _dir) = create_test_paid_list().await;
    let key: XorName = [0x23; 32];

    store.insert(&key).await.expect("insert");
    store.set_paid_out_of_range(&key);
    store.set_record_out_of_range(&key);

    let was_removed = store.remove(&key).await.expect("remove");
    assert!(was_removed, "key should have existed");

    let still_listed = store.contains(&key).expect("contains check");
    assert!(!still_listed, "key should be gone from paid list");

    let paid_mark = store.paid_out_of_range_since(&key);
    assert!(
        paid_mark.is_none(),
        "paid timestamp should be cleaned up on remove"
    );
    let record_mark = store.record_out_of_range_since(&key);
    assert!(
        record_mark.is_none(),
        "record timestamp should be cleaned up on remove"
    );
}
// A key marked out-of-range is retained while inside the hysteresis window,
// an unmarked key has no timestamp, and new keys can still be inserted.
#[tokio::test]
async fn scenario_13_responsible_range_shrink() {
    let (store, _dir) = create_test_paid_list().await;
    let far_key: XorName = [0x13; 32];
    let near_key: XorName = [0x14; 32];

    store.insert(&far_key).await.expect("insert out-of-range");
    store.insert(&near_key).await.expect("insert in-range");

    store.set_record_out_of_range(&far_key);
    let marked_at = store
        .record_out_of_range_since(&far_key)
        .expect("timestamp should be recorded for out-of-range key");

    let elapsed = marked_at.elapsed();
    assert!(
        elapsed < PRUNE_HYSTERESIS_DURATION,
        "elapsed {elapsed:?} should be below PRUNE_HYSTERESIS_DURATION \
        ({PRUNE_HYSTERESIS_DURATION:?}) — key must not be pruned yet"
    );

    let far_key_listed = store.contains(&far_key).expect("contains");
    assert!(
        far_key_listed,
        "out-of-range key should still be retained within hysteresis window"
    );

    let near_key_mark = store.record_out_of_range_since(&near_key);
    assert!(
        near_key_mark.is_none(),
        "in-range key should have no out-of-range timestamp"
    );

    let fresh_key: XorName = [0x15; 32];
    let was_new = store.insert(&fresh_key).await.expect("insert new key");
    assert!(
        was_new,
        "new in-range keys should still be accepted while out-of-range keys await expiry"
    );
    let fresh_key_listed = store.contains(&fresh_key).expect("contains new");
    assert!(
        fresh_key_listed,
        "newly inserted in-range key should be present"
    );
}
// Checks that a peer's first-seen bootstrap-claim timestamp is stored and that
// a later `entry(..).or_insert(..)` for the same peer does not replace it.
#[test]
fn scenario_46_bootstrap_claim_first_seen_recorded() {
let peer = PeerId::from_bytes([0x46; 32]);
let mut state = NeighborSyncState::new_cycle(vec![peer]);
// Back-date the first timestamp by 3 s so it differs from `later_ts`;
// if the clock cannot be back-dated, fall back to the current instant.
let first_ts = Instant::now()
.checked_sub(std::time::Duration::from_secs(3))
.unwrap_or_else(Instant::now);
state.bootstrap_claims.insert(peer, first_ts);
assert_eq!(
state.bootstrap_claims.get(&peer),
Some(&first_ts),
"first-seen timestamp should be recorded"
);
let later_ts = Instant::now();
// `or_insert` only writes when the entry is vacant, so the original stays.
state.bootstrap_claims.entry(peer).or_insert(later_ts);
assert_eq!(
state.bootstrap_claims.get(&peer),
Some(&first_ts),
"second insert must not overwrite the original timestamp"
);
}
// Builds a bootstrap claim older than `BOOTSTRAP_CLAIM_GRACE_PERIOD` and checks
// that a `FailureEvidence::BootstrapClaimAbuse` value carries the same peer and
// first-seen instant it was constructed with.
#[test]
fn scenario_48_bootstrap_claim_abuse_after_grace_period() {
let peer = PeerId::from_bytes([0x48; 32]);
let mut state = NeighborSyncState::new_cycle(vec![peer]);
// Back-date by the grace period plus one hour of margin.
let grace_plus_margin = BOOTSTRAP_CLAIM_GRACE_PERIOD + std::time::Duration::from_secs(3600);
// `checked_sub` yields `None` when the instant cannot be moved that far
// back; in that case the current instant is used instead.
let first_seen = Instant::now()
.checked_sub(grace_plus_margin)
.unwrap_or_else(Instant::now);
state.bootstrap_claims.insert(peer, first_seen);
let claim_age = Instant::now().duration_since(first_seen);
// Only assert on the age when back-dating evidently succeeded (the fallback
// above would leave the claim age near zero).
if claim_age > std::time::Duration::from_secs(1) {
assert!(
claim_age > BOOTSTRAP_CLAIM_GRACE_PERIOD,
"claim age {claim_age:?} should exceed grace period {BOOTSTRAP_CLAIM_GRACE_PERIOD:?}",
);
}
let evidence = FailureEvidence::BootstrapClaimAbuse { peer, first_seen };
// Destructure the evidence again to verify the fields round-trip.
let FailureEvidence::BootstrapClaimAbuse {
peer: p,
first_seen: fs,
} = evidence
else {
unreachable!("evidence was just constructed as BootstrapClaimAbuse");
};
assert_eq!(p, peer);
assert_eq!(fs, first_seen);
}
// Checks that a bootstrap claim inserted for a peer is present afterwards and
// is gone once it has been removed from `bootstrap_claims`.
#[test]
fn scenario_49_bootstrap_claim_cleared() {
let peer = PeerId::from_bytes([0x49; 32]);
let mut state = NeighborSyncState::new_cycle(vec![peer]);
state.bootstrap_claims.insert(peer, Instant::now());
assert!(
state.bootstrap_claims.contains_key(&peer),
"claim should exist after insert"
);
// The removal stands in for the peer answering normally (per the assert
// message below).
state.bootstrap_claims.remove(&peer);
assert!(
!state.bootstrap_claims.contains_key(&peer),
"claim should be gone after normal response"
);
}
}