#![warn(missing_docs)]
#![warn(unused_extern_crates)]
mod metrics;
mod subscription;
use self::subscription::{SubscriptionStatementsStream, SubscriptionsHandle};
use futures::FutureExt;
use metrics::MetricsLink as PrometheusMetrics;
use parking_lot::{lock_api::RwLockUpgradableReadGuard, RwLock};
use soil_prometheus::Registry as PrometheusRegistry;
use soil_client::blockchain::HeaderBackend;
use soil_client::client_api::{backend::StorageProvider, Backend, StorageKey};
use soil_client::keystore::LocalKeystore;
use soil_statement_store::{
runtime_api::{StatementSource, StatementStoreExt},
AccountId, BlockHash, Channel, DecryptionKey, FilterDecision, Hash, InvalidReason,
OptimizedTopicFilter, Proof, RejectionReason, Result, SignatureVerificationResult, Statement,
StatementAllowance, StatementEvent, SubmitResult, Topic,
};
pub use soil_statement_store::{Error, StatementStore, MAX_TOPICS};
use std::{
collections::{BTreeMap, HashMap, HashSet},
sync::Arc,
time::{Duration, Instant},
};
pub use subscription::StatementStoreSubscriptionApi;
use subsoil::core::{
crypto::UncheckedFrom, hexdisplay::HexDisplay, traits::SpawnNamed, Decode, Encode,
};
use subsoil::runtime::traits::Block as BlockT;
/// Key in the `col::META` column under which the database format version is stored.
const KEY_VERSION: &[u8] = b"version".as_slice();
/// Database format version written to a fresh database and required of an existing one.
const CURRENT_VERSION: u32 = 1;
/// Log target used by the log statements in this module.
const LOG_TARGET: &str = "statement-store";
/// Default time, in seconds, an expired-statement record is kept before it is purged (48 hours).
pub const DEFAULT_PURGE_AFTER_SEC: u64 = 2 * 24 * 60 * 60;
/// Default limit on the number of statements kept in the store.
pub const DEFAULT_MAX_TOTAL_STATEMENTS: usize = 4 * 1024 * 1024;
/// Default limit on the summed statement data size kept in the store (2 GiB).
pub const DEFAULT_MAX_TOTAL_SIZE: usize = 2 * 1024 * 1024 * 1024;
/// Largest encoded statement size accepted by `submit`; one less than the maximum statement
/// notification size.
pub const MAX_STATEMENT_SIZE: usize =
    crate::statement::config::MAX_STATEMENT_NOTIFICATION_SIZE as usize - 1;
/// `enforce_limits` stops scanning accounts once this many statements are queued for eviction.
const MAX_EXPIRY_STATEMENTS_PER_ITERATION: usize = 10_000;
/// `enforce_limits` stops scanning after this many accounts in a single pass.
const MAX_EXPIRY_ACCOUNTS_PER_ITERATION: usize = 10_000;
/// `enforce_limits` stops scanning accounts once this much time has elapsed in a single pass.
const MAX_EXPIRY_TIME_PER_ITERATION: Duration = Duration::from_millis(100);
/// Worker count handed to `SubscriptionsHandle::new`.
const NUM_FILTER_WORKERS: usize = 1;
/// Interval between `Store::maintain` runs in the background task.
const MAINTENANCE_PERIOD: std::time::Duration = std::time::Duration::from_secs(29);
/// Interval between `Store::enforce_limits` runs in the background task.
const ENFORCE_LIMITS_PERIOD: std::time::Duration = std::time::Duration::from_secs(31);
/// Column indices of the statement database.
mod col {
    /// Metadata column; holds the database version.
    pub const META: u8 = 0;
    /// Encoded statements, keyed by statement hash.
    pub const STATEMENTS: u8 = 1;
    /// Records of expired statements: encoded `(hash, timestamp)`, keyed by statement hash.
    pub const EXPIRED: u8 = 2;
    /// Total number of columns.
    pub const COUNT: u8 = 3;
}
/// Expiry value of a statement, as returned by `Statement::expiry`.
///
/// Statements are ordered by this value; a lower value is evicted first.
// NOTE(review): `expired_by_iter` compares against `current_time << 32`, which suggests the
// upper 32 bits carry a timestamp in seconds — confirm against `Statement::expiry`.
#[derive(Eq, PartialEq, Debug, Ord, PartialOrd, Clone, Copy)]
struct Expiry(u64);
/// Key of a statement in the per-account priority map.
///
/// Keys sort by expiry first and by statement hash second, so iterating the map in order
/// visits the statements with the lowest expiry first.
#[derive(PartialEq, Eq)]
struct PriorityKey {
    hash: Hash,
    expiry: Expiry,
}

impl PartialOrd for PriorityKey {
    fn partial_cmp(&self, rhs: &Self) -> Option<std::cmp::Ordering> {
        Some(Ord::cmp(self, rhs))
    }
}

impl Ord for PriorityKey {
    fn cmp(&self, rhs: &Self) -> std::cmp::Ordering {
        // The hash only breaks ties between equal expiries.
        match self.expiry.cmp(&rhs.expiry) {
            std::cmp::Ordering::Equal => self.hash.cmp(&rhs.hash),
            unequal => unequal,
        }
    }
}
/// The statement currently occupying a channel of an account.
#[derive(PartialEq, Eq)]
struct ChannelEntry {
    /// Hash of the statement occupying the channel.
    hash: Hash,
    /// Expiry of that statement; a new statement for the channel must exceed it.
    expiry: Expiry,
}
/// Per-account bookkeeping of the statements currently in the index.
#[derive(Default)]
struct StatementsForAccount {
    /// Statements of the account ordered by `PriorityKey`; the value is the statement's
    /// channel (if any) and its data length.
    by_priority: BTreeMap<PriorityKey, (Option<Channel>, usize)>,
    /// Statement currently occupying each channel.
    channels: HashMap<Channel, ChannelEntry>,
    /// Sum of the data lengths of all statements in `by_priority`.
    data_size: usize,
}
impl StatementsForAccount {
    /// Iterates, in priority order, over the statements whose key sorts below
    /// `(Expiry(current_time << 32), Hash::default())`.
    // NOTE(review): the shift presumably lines `current_time` (seconds) up with a timestamp
    // stored in the upper 32 bits of the expiry — confirm against `Statement::expiry`.
    fn expired_by_iter(
        &self,
        current_time: u64,
    ) -> impl Iterator<Item = (&PriorityKey, &(Option<Channel>, usize))> {
        let lower_bound = PriorityKey { hash: Hash::default(), expiry: Expiry(0) };
        let upper_bound =
            PriorityKey { hash: Hash::default(), expiry: Expiry(current_time << 32) };
        self.by_priority.range(lower_bound..upper_bound)
    }
}
/// Limits and retention settings of the statement store.
pub struct Options {
    /// Maximum number of statements kept in the store.
    max_total_statements: usize,
    /// Maximum summed data size of all statements kept in the store.
    max_total_size: usize,
    /// Number of seconds an expired-statement record is kept before it is purged.
    purge_after_sec: u64,
}
impl Default for Options {
    /// Builds the options from the `DEFAULT_*` constants of this module.
    fn default() -> Self {
        Self {
            purge_after_sec: DEFAULT_PURGE_AFTER_SEC,
            max_total_size: DEFAULT_MAX_TOTAL_SIZE,
            max_total_statements: DEFAULT_MAX_TOTAL_STATEMENTS,
        }
    }
}
/// In-memory index over the statements and expired-statement records held in the database.
#[derive(Default)]
struct Index {
    /// Hashes inserted as "recent" and not yet handed out by `take_recent`.
    recent: HashSet<Hash>,
    /// Topic -> hashes of the statements carrying that topic.
    by_topic: HashMap<Topic, HashSet<Hash>>,
    /// Decryption key (`None` for statements without one) -> statement hashes.
    by_dec_key: HashMap<Option<DecryptionKey>, HashSet<Hash>>,
    /// Topics and decryption key of a statement; only recorded for statements that have at
    /// least one topic or a decryption key.
    topics_and_keys: HashMap<Hash, ([Option<Topic>; MAX_TOPICS], Option<DecryptionKey>)>,
    /// Every indexed statement: hash -> (account, expiry, data length).
    entries: HashMap<Hash, (AccountId, Expiry, usize)>,
    /// Expired statements: hash -> timestamp at which the statement was marked expired.
    expired: HashMap<Hash, u64>,
    /// Per-account statement bookkeeping.
    accounts: HashMap<AccountId, StatementsForAccount>,
    /// Work list of accounts still to be visited by `Store::enforce_limits`; consumed from
    /// the back.
    accounts_to_check_for_expiry_stmts: Vec<AccountId>,
    /// Store limits and retention settings.
    options: Options,
    /// Sum of the data lengths of all statements in `entries`.
    total_size: usize,
}
/// Thin wrapper around the client, used to read statement allowances from chain storage.
struct ClientWrapper<Block, Client, BE> {
    /// Client providing header information and storage access.
    client: Arc<Client>,
    /// Ties the wrapper to a block type without storing one.
    _block: std::marker::PhantomData<Block>,
    /// Ties the wrapper to a backend type without storing one.
    _backend: std::marker::PhantomData<BE>,
}
impl<Block, Client, BE> ClientWrapper<Block, Client, BE>
where
Block: BlockT,
Block::Hash: From<BlockHash>,
BE: Backend<Block> + 'static,
Client: HeaderBackend<Block> + StorageProvider<Block, BE> + Send + Sync + 'static,
{
fn read_allowance(
&self,
account_id: &AccountId,
block_hash: Option<Block::Hash>,
) -> Result<Option<StatementAllowance>> {
use soil_statement_store::{statement_allowance_key, StatementAllowance};
let block_hash = block_hash.unwrap_or(self.client.info().finalized_hash);
let key = statement_allowance_key(account_id);
let storage_key = StorageKey(key);
self.client
.storage(block_hash, &storage_key)
.map_err(|e| Error::Storage(format!("Failed to read allowance: {:?}", e)))?
.map(|value| {
StatementAllowance::decode(&mut &value.0[..])
.map_err(|e| Error::Decode(format!("Failed to decode allowance: {:?}", e)))
})
.transpose()
}
}
/// Statement store backed by a database, with an in-memory index.
pub struct Store {
    /// Database holding the encoded statements and the expired-statement records.
    db: parity_db::Db,
    /// In-memory index over the database contents.
    index: RwLock<Index>,
    /// Reads the statement allowance of an account, optionally at a given block hash.
    read_allowance_fn: Box<
        dyn Fn(&AccountId, Option<BlockHash>) -> Result<Option<StatementAllowance>> + Send + Sync,
    >,
    /// Handle used to register subscriptions and to notify them about accepted statements.
    subscription_manager: SubscriptionsHandle,
    /// Keystore queried for the key pair needed to decrypt statements.
    keystore: Arc<LocalKeystore>,
    /// When set, `timestamp` returns this value instead of the system time.
    time_override: Option<u64>,
    /// Metrics reporting handle.
    metrics: PrometheusMetrics,
}
/// Result of looking a statement hash up in the index.
enum IndexQuery {
    /// The hash is neither an indexed statement nor a known expired one.
    Unknown,
    /// The hash belongs to a statement currently in the index.
    Exists,
    /// The hash belongs to a statement recorded as expired.
    Expired,
}
impl Index {
fn new(options: Options) -> Index {
Index { options, ..Default::default() }
}
/// Registers `statement` in every lookup structure of the index.
///
/// No limits are checked here. When `is_recent` is set the hash is also added to the
/// `recent` set that `take_recent` drains.
fn insert_new(
    &mut self,
    hash: Hash,
    account: AccountId,
    statement: &Statement,
    is_recent: bool,
) {
    // Index the statement under each topic it carries, remembering the topics for removal.
    let mut all_topics = [None; MAX_TOPICS];
    let mut topic_count = 0;
    while let Some(topic) = statement.topic(topic_count) {
        self.by_topic.entry(topic).or_default().insert(hash);
        all_topics[topic_count] = Some(topic);
        topic_count += 1;
    }

    // Every statement is indexed by its (possibly absent) decryption key.
    let dec_key = statement.decryption_key();
    self.by_dec_key.entry(dec_key).or_default().insert(hash);
    if topic_count > 0 || dec_key.is_some() {
        self.topics_and_keys.insert(hash, (all_topics, dec_key));
    }

    let expiry = Expiry(statement.expiry());
    let data_len = statement.data_len();
    self.entries.insert(hash, (account, expiry, data_len));
    if is_recent {
        self.recent.insert(hash);
    }
    self.total_size += data_len;

    // Per-account bookkeeping: size, channel occupancy and priority order.
    let account_info = self.accounts.entry(account).or_default();
    account_info.data_size += data_len;
    if let Some(channel) = statement.channel() {
        account_info.channels.insert(channel, ChannelEntry { hash, expiry });
    }
    account_info
        .by_priority
        .insert(PriorityKey { hash, expiry }, (statement.channel(), data_len));
}
/// Classifies `hash` as an indexed statement, a known expired statement, or unknown.
///
/// An indexed statement takes precedence over an expired record with the same hash.
fn query(&self, hash: &Hash) -> IndexQuery {
    if self.entries.contains_key(hash) {
        IndexQuery::Exists
    } else if self.expired.contains_key(hash) {
        IndexQuery::Expired
    } else {
        IndexQuery::Unknown
    }
}
/// Records `hash` as expired at `timestamp`, replacing any earlier record for the same hash.
fn insert_expired(&mut self, hash: Hash, timestamp: u64) {
    self.expired.insert(hash, timestamp);
}
/// Calls `f` for the hashes of the statements indexed under decryption key `key` that match
/// the `topic` filter, dispatching to the iteration routine of the filter variant.
///
/// Iteration stops at, and returns, the first error produced by `f`.
fn iterate_with(
    &self,
    key: Option<DecryptionKey>,
    topic: &OptimizedTopicFilter,
    f: impl FnMut(&Hash) -> Result<()>,
) -> Result<()> {
    match topic {
        OptimizedTopicFilter::MatchAny(topics) =>
            self.iterate_with_match_any(key, topics.iter(), f),
        OptimizedTopicFilter::MatchAll(topics) =>
            self.iterate_with_match_all(key, topics.iter(), f),
        OptimizedTopicFilter::Any => self.iterate_with_any(key, f),
    }
}
/// Calls `f` once for each statement indexed under decryption key `key` that carries at
/// least one of `match_any_topics`.
///
/// Returns `Ok(())` without calling `f` when no statement is indexed under `key`. Iteration
/// stops at, and returns, the first error produced by `f`.
fn iterate_with_match_any<'a>(
    &self,
    key: Option<DecryptionKey>,
    match_any_topics: impl ExactSizeIterator<Item = &'a Topic>,
    mut f: impl FnMut(&Hash) -> Result<()>,
) -> Result<()> {
    let Some(key_set) = self.by_dec_key.get(&key).filter(|k| !k.is_empty()) else {
        return Ok(());
    };
    // A statement carrying several of the requested topics shows up in several topic sets.
    // Track what was already reported so the callback sees each statement only once. With a
    // single topic there is nothing to deduplicate, so the set is left untouched.
    let dedup = match_any_topics.len() > 1;
    let mut reported: HashSet<&Hash> = HashSet::new();
    for t in match_any_topics {
        let Some(set) = self.by_topic.get(t) else {
            continue;
        };
        for item in set {
            if !key_set.contains(item) {
                continue;
            }
            if dedup && !reported.insert(item) {
                continue;
            }
            log::trace!(
                target: LOG_TARGET,
                "Iterating by topic/key: statement {:?}",
                HexDisplay::from(item)
            );
            f(item)?;
        }
    }
    Ok(())
}
/// Calls `f` for every statement indexed under decryption key `key`, regardless of topics.
///
/// Iteration stops at, and returns, the first error produced by `f`.
fn iterate_with_any(
    &self,
    key: Option<DecryptionKey>,
    mut f: impl FnMut(&Hash) -> Result<()>,
) -> Result<()> {
    // A missing key set and an empty one both mean there is nothing to visit.
    let Some(hashes) = self.by_dec_key.get(&key) else {
        return Ok(());
    };
    hashes.iter().try_for_each(|hash| f(hash))
}
/// Calls `f` for every statement indexed under decryption key `key` that carries all of
/// `match_all_topics`.
///
/// Returns `Ok(())` without calling `f` when more than `MAX_TOPICS` topics are requested,
/// when no statement is indexed under `key`, or when any requested topic has no statements.
/// Iteration stops at, and returns, the first error produced by `f`.
fn iterate_with_match_all<'a>(
    &self,
    key: Option<DecryptionKey>,
    match_all_topics: impl ExactSizeIterator<Item = &'a Topic>,
    mut f: impl FnMut(&Hash) -> Result<()>,
) -> Result<()> {
    let num_topics = match_all_topics.len();
    if num_topics > MAX_TOPICS {
        return Ok(());
    }
    let Some(key_set) = self.by_dec_key.get(&key).filter(|s| !s.is_empty()) else {
        return Ok(());
    };
    // Slot 0 holds the key set and slots 1.. hold one set per requested topic. Unused slots
    // point at `empty` and are sliced off before use.
    let empty = HashSet::new();
    let mut sets: [&HashSet<Hash>; MAX_TOPICS + 1] = [&empty; MAX_TOPICS + 1];
    sets[0] = key_set;
    for (i, t) in match_all_topics.enumerate() {
        let Some(set) = self.by_topic.get(t).filter(|s| !s.is_empty()) else {
            return Ok(());
        };
        sets[i + 1] = set;
    }
    let sets = &mut sets[0..num_topics + 1];
    // Walk the smallest set and probe the others, keeping the intersection cheap.
    sets.sort_by_key(|s| s.len());
    for item in sets[0] {
        if sets[1..].iter().all(|set| set.contains(item)) {
            log::trace!(
                target: LOG_TARGET,
                "Iterating by topic/key: statement {:?}",
                HexDisplay::from(item)
            );
            f(item)?
        }
    }
    Ok(())
}
/// Drops the expired-statement records whose retention period has elapsed at `current_time`
/// and returns their hashes.
fn maintain(&mut self, current_time: u64) -> Vec<Hash> {
    let purge_after = self.options.purge_after_sec;
    let mut purged = Vec::new();
    self.expired.retain(|hash, expired_at| {
        let keep = *expired_at + purge_after > current_time;
        if !keep {
            purged.push(*hash);
            log::trace!(target: LOG_TARGET, "Purged statement {:?}", HexDisplay::from(hash));
        }
        keep
    });
    purged
}
/// Returns the set of recent statement hashes, leaving an empty set in its place.
fn take_recent(&mut self) -> HashSet<Hash> {
    std::mem::take(&mut self.recent)
}
fn make_expired(&mut self, hash: &Hash, current_time: u64) -> bool {
if let Some((account, expiry, len)) = self.entries.remove(hash) {
self.total_size -= len;
if let Some((topics, key)) = self.topics_and_keys.remove(hash) {
for t in topics.into_iter().flatten() {
if let std::collections::hash_map::Entry::Occupied(mut set) =
self.by_topic.entry(t)
{
set.get_mut().remove(hash);
if set.get().is_empty() {
set.remove_entry();
}
}
}
if let std::collections::hash_map::Entry::Occupied(mut set) =
self.by_dec_key.entry(key)
{
set.get_mut().remove(hash);
if set.get().is_empty() {
set.remove_entry();
}
}
}
let _ = self.recent.remove(hash);
self.expired.insert(*hash, current_time);
if let std::collections::hash_map::Entry::Occupied(mut account_rec) =
self.accounts.entry(account)
{
let key = PriorityKey { hash: *hash, expiry };
if let Some((channel, len)) = account_rec.get_mut().by_priority.remove(&key) {
account_rec.get_mut().data_size -= len;
if let Some(channel) = channel {
account_rec.get_mut().channels.remove(&channel);
}
}
if account_rec.get().by_priority.is_empty() {
account_rec.remove_entry();
}
}
log::trace!(target: LOG_TARGET, "Expired statement {:?}", HexDisplay::from(hash));
true
} else {
false
}
}
/// Tries to add `statement` to the index, evicting lower-expiry statements of the same
/// account when that is required to respect the account allowance in `validation`.
///
/// On success the statement is inserted as "recent", the evicted statements are marked as
/// expired at `current_time`, and their hashes are returned. On rejection the index is left
/// unchanged.
///
/// # Errors
///
/// - `RejectionReason::DataTooLarge`: the statement alone exceeds `validation.max_size`.
/// - `RejectionReason::ChannelPriorityTooLow`: the statement's channel is occupied by a
///   statement with an equal or higher expiry.
/// - `RejectionReason::AccountFull`: the account limits cannot be met by evicting
///   statements with a lower expiry.
/// - `RejectionReason::StoreFull`: the global size or count limit would be exceeded.
fn insert(
    &mut self,
    hash: Hash,
    statement: &Statement,
    account: &AccountId,
    validation: &StatementAllowance,
    current_time: u64,
) -> std::result::Result<HashSet<Hash>, RejectionReason> {
    let statement_len = statement.data_len();
    if statement_len > validation.max_size as usize {
        log::debug!(
            target: LOG_TARGET,
            "Ignored oversize message: {:?} ({} bytes)",
            HexDisplay::from(&hash),
            statement_len,
        );
        return Err(RejectionReason::DataTooLarge {
            submitted_size: statement_len,
            available_size: validation.max_size as usize,
        });
    }
    // Statements that have to go to make room, and the data size that frees up.
    let mut evicted = HashSet::new();
    let mut would_free_size = 0;
    let expiry = Expiry(statement.expiry());
    let (max_size, max_count) = (validation.max_size as usize, validation.max_count as usize);
    // NOTE(review): when the account has no record yet, only the size check above and the
    // global limits below apply; `max_count` is not consulted on that path — confirm this is
    // intended for an allowance with `max_count == 0`.
    if let Some(account_rec) = self.accounts.get(account) {
        if let Some(channel) = statement.channel() {
            if let Some(channel_record) = account_rec.channels.get(&channel) {
                if expiry <= channel_record.expiry {
                    // The channel is held by a statement that is at least as important.
                    log::debug!(
                        target: LOG_TARGET,
                        "Ignored lower priority channel message: {:?} {:?} <= {:?}",
                        HexDisplay::from(&hash),
                        expiry,
                        channel_record.expiry,
                    );
                    return Err(RejectionReason::ChannelPriorityTooLow {
                        submitted_expiry: expiry.0,
                        min_expiry: channel_record.expiry.0,
                    });
                } else {
                    // The new statement replaces the channel's current one. The account
                    // limits are still checked below.
                    log::debug!(
                        target: LOG_TARGET,
                        "Replacing higher priority channel message: {:?} ({:?}) > {:?} ({:?})",
                        HexDisplay::from(&hash),
                        expiry,
                        HexDisplay::from(&channel_record.hash),
                        channel_record.expiry,
                    );
                    let key = PriorityKey {
                        hash: channel_record.hash,
                        expiry: channel_record.expiry,
                    };
                    if let Some((_channel, len)) = account_rec.by_priority.get(&key) {
                        would_free_size += *len;
                        evicted.insert(channel_record.hash);
                    }
                }
            }
        }
        // Walk the account's statements from the lowest expiry upwards, evicting until both
        // the size and the count limit are satisfied.
        for (entry, (_, len)) in account_rec.by_priority.iter() {
            if (account_rec.data_size - would_free_size + statement_len <= max_size)
                && account_rec.by_priority.len() + 1 - evicted.len() <= max_count
            {
                break;
            }
            if evicted.contains(&entry.hash) {
                // Already accounted for by the channel replacement above.
                continue;
            }
            if entry.expiry >= expiry {
                // Everything left is at least as important as the new statement.
                log::debug!(
                    target: LOG_TARGET,
                    "Ignored message due to constraints {:?} {:?} < {:?}",
                    HexDisplay::from(&hash),
                    expiry,
                    entry.expiry,
                );
                return Err(RejectionReason::AccountFull {
                    submitted_expiry: expiry.0,
                    min_expiry: entry.expiry.0,
                });
            }
            evicted.insert(entry.hash);
            would_free_size += len;
        }
    }
    // Global limits, taking the planned evictions into account.
    if !((self.total_size - would_free_size + statement_len <= self.options.max_total_size)
        && self.entries.len() + 1 - evicted.len() <= self.options.max_total_statements)
    {
        log::debug!(
            target: LOG_TARGET,
            "Ignored statement {} because the store is full (size={}, count={})",
            HexDisplay::from(&hash),
            self.total_size,
            self.entries.len(),
        );
        return Err(RejectionReason::StoreFull);
    }
    // All checks passed: only now is the index modified.
    for h in &evicted {
        self.make_expired(h, current_time);
    }
    self.insert_new(hash, *account, statement, true);
    Ok(evicted)
}
}
impl Store {
/// Creates a store via [`Store::new`], wraps it in an `Arc` and spawns the background task
/// that periodically runs `maintain` and `enforce_limits`.
///
/// # Errors
///
/// Returns whatever error [`Store::new`] returns.
pub fn new_shared<Block, Client, BE>(
    path: &std::path::Path,
    options: Options,
    client: Arc<Client>,
    keystore: Arc<LocalKeystore>,
    prometheus: Option<&PrometheusRegistry>,
    task_spawner: Box<dyn SpawnNamed>,
) -> Result<Arc<Store>>
where
    Block: BlockT,
    Block::Hash: From<BlockHash>,
    BE: Backend<Block> + 'static,
    Client: HeaderBackend<Block> + StorageProvider<Block, BE> + Send + Sync + 'static,
{
    let store =
        Self::new(path, options, client, keystore, prometheus, task_spawner.clone())
            .map(Arc::new)?;

    // The task keeps its own handle to the store and loops forever, running whichever job's
    // interval ticks.
    let worker_store = Arc::clone(&store);
    let background_jobs = async move {
        let mut maintenance_interval = tokio::time::interval(MAINTENANCE_PERIOD);
        let mut enforce_limits_interval = tokio::time::interval(ENFORCE_LIMITS_PERIOD);
        loop {
            futures::select! {
                _ = maintenance_interval.tick().fuse() => {worker_store.maintain();}
                _ = enforce_limits_interval.tick().fuse() => {worker_store.enforce_limits();}
            }
        }
    };
    task_spawner.spawn(
        "statement-store-maintenance",
        Some("statement-store"),
        Box::pin(background_jobs),
    );
    Ok(store)
}
/// Opens (or creates) the statement database under `path/statements`, checks its version
/// and builds the in-memory index from its contents.
///
/// Unlike `new_shared`, no background maintenance task is spawned.
///
/// # Errors
///
/// `Error::Db` if the database cannot be opened, read or written, or if its stored version
/// is malformed or differs from `CURRENT_VERSION`.
#[doc(hidden)]
pub fn new<Block, Client, BE>(
    path: &std::path::Path,
    options: Options,
    client: Arc<Client>,
    keystore: Arc<LocalKeystore>,
    prometheus: Option<&PrometheusRegistry>,
    task_spawner: Box<dyn SpawnNamed>,
) -> Result<Store>
where
    Block: BlockT,
    Block::Hash: From<BlockHash>,
    BE: Backend<Block> + 'static,
    Client: HeaderBackend<Block> + StorageProvider<Block, BE> + Send + Sync + 'static,
{
    let mut path: std::path::PathBuf = path.into();
    path.push("statements");
    let mut config = parity_db::Options::with_columns(&path, col::COUNT);
    // Column settings for the statements column.
    let statement_col = &mut config.columns[col::STATEMENTS as usize];
    statement_col.ref_counted = false;
    statement_col.preimage = true;
    statement_col.uniform = true;
    let db = parity_db::Db::open_or_create(&config).map_err(|e| Error::Db(e.to_string()))?;
    // An existing database must carry the current version; a fresh one gets it written.
    match db.get(col::META, &KEY_VERSION).map_err(|e| Error::Db(e.to_string()))? {
        Some(version) => {
            let version = u32::from_le_bytes(
                version
                    .try_into()
                    .map_err(|_| Error::Db("Error reading database version".into()))?,
            );
            if version != CURRENT_VERSION {
                return Err(Error::Db(format!("Unsupported database version: {version}")));
            }
        },
        None => {
            db.commit([(
                col::META,
                KEY_VERSION.to_vec(),
                Some(CURRENT_VERSION.to_le_bytes().to_vec()),
            )])
            .map_err(|e| Error::Db(e.to_string()))?;
        },
    }
    // Erase the `Block`/`Client`/`BE` generics behind a boxed closure so that `Store` itself
    // does not need to be generic.
    let storage_reader =
        ClientWrapper { client, _block: Default::default(), _backend: Default::default() };
    let read_allowance_fn =
        Box::new(move |account_id: &AccountId, block_hash: Option<BlockHash>| {
            storage_reader.read_allowance(account_id, block_hash.map(Into::into))
        });
    let store = Store {
        db,
        index: RwLock::new(Index::new(options)),
        read_allowance_fn,
        keystore,
        time_override: None,
        metrics: PrometheusMetrics::new(prometheus),
        subscription_manager: SubscriptionsHandle::new(
            task_spawner.clone(),
            NUM_FILTER_WORKERS,
        ),
    };
    // Load the database contents into the index.
    store.populate()?;
    Ok(store)
}
/// Rebuilds the in-memory index from the database and then runs one maintenance pass.
///
/// Entries that fail to decode are skipped. Statements loaded here are not marked recent.
///
/// # Errors
///
/// `Error::Db` if iterating either database column fails.
fn populate(&self) -> Result<()> {
    {
        let mut index = self.index.write();

        self.db
            .iter_column_while(col::STATEMENTS, |item| {
                let Ok(statement) = Statement::decode(&mut item.value.as_slice()) else {
                    return true;
                };
                let hash = statement.hash();
                log::trace!(
                    target: LOG_TARGET,
                    "Statement loaded {:?}",
                    HexDisplay::from(&hash)
                );
                match statement.account_id() {
                    Some(account_id) => index.insert_new(hash, account_id, &statement, false),
                    None => log::debug!(
                        target: LOG_TARGET,
                        "Error decoding statement loaded from the DB: {:?}",
                        HexDisplay::from(&hash)
                    ),
                }
                true
            })
            .map_err(|e| Error::Db(e.to_string()))?;

        self.db
            .iter_column_while(col::EXPIRED, |item| {
                let decoded = <(Hash, u64)>::decode(&mut item.value.as_slice());
                if let Ok((hash, timestamp)) = decoded {
                    log::trace!(
                        target: LOG_TARGET,
                        "Statement loaded (expired): {:?}",
                        HexDisplay::from(&hash)
                    );
                    index.insert_expired(hash, timestamp);
                }
                true
            })
            .map_err(|e| Error::Db(e.to_string()))?;
    }
    // The write guard above must be released first: `maintain` takes the index lock itself.
    self.maintain();
    Ok(())
}
fn collect_statements_locked<R>(
&self,
key: Option<DecryptionKey>,
topic_filter: &OptimizedTopicFilter,
index: &Index,
result: &mut Vec<R>,
mut f: impl FnMut(Statement) -> Option<R>,
) -> Result<()> {
index.iterate_with(key, topic_filter, |hash| {
match self.db.get(col::STATEMENTS, hash).map_err(|e| Error::Db(e.to_string()))? {
Some(entry) => {
if let Ok(statement) = Statement::decode(&mut entry.as_slice()) {
if let Some(data) = f(statement) {
result.push(data);
}
} else {
log::warn!(
target: LOG_TARGET,
"Corrupt statement {:?}",
HexDisplay::from(hash)
);
}
},
None => {
log::warn!(
target: LOG_TARGET,
"Missing statement {:?}",
HexDisplay::from(hash)
);
},
}
Ok(())
})?;
Ok(())
}
/// Takes a read lock on the index and collects the statements selected by `key` and
/// `topic_filter`, mapped through `f`; statements for which `f` returns `None` are dropped.
///
/// # Errors
///
/// `Error::Db` if a database read fails.
fn collect_statements<R>(
    &self,
    key: Option<DecryptionKey>,
    topic_filter: &OptimizedTopicFilter,
    f: impl FnMut(Statement) -> Option<R>,
) -> Result<Vec<R>> {
    let mut collected = Vec::new();
    let guard = self.index.read();
    self.collect_statements_locked(key, topic_filter, &guard, &mut collected, f)?;
    Ok(collected)
}
/// Determines which statements of `account` should be removed: those reported by
/// `expired_by_iter` for `current_time`, plus as many of the remaining lowest-expiry
/// statements as needed to bring the account back within its current allowance.
///
/// An account without an allowance is treated as having a zero allowance. If the allowance
/// cannot be read, only the expired statements are returned.
fn collect_evictions(
    &self,
    account: &AccountId,
    account_rec: &StatementsForAccount,
    current_time: u64,
) -> Vec<Hash> {
    let mut to_evict = Vec::new();
    let mut expired_count = 0usize;
    let mut expired_size = 0usize;
    // First pass: everything that has expired by time.
    for (key, (_, len)) in account_rec.expired_by_iter(current_time) {
        to_evict.push(key.hash);
        expired_count += 1;
        expired_size += len;
    }
    // `None`: read the allowance at the client's default (finalized) block.
    let allowance = match (self.read_allowance_fn)(account, None) {
        Ok(Some(allowance)) => allowance,
        Ok(None) => {
            log::debug!(
                target: LOG_TARGET,
                "No allowance found for account {:?}, treating as zero allowance",
                HexDisplay::from(account)
            );
            StatementAllowance { max_count: 0, max_size: 0 }
        },
        Err(e) => {
            log::error!(target: LOG_TARGET, "Error reading allowance: {:?}", e);
            return to_evict;
        },
    };
    // What would be left after the expired statements are gone.
    let mut remaining_count = account_rec.by_priority.len() - expired_count;
    let mut remaining_size = account_rec.data_size - expired_size;
    if remaining_count > allowance.max_count as usize
        || remaining_size > allowance.max_size as usize
    {
        log::debug!(
            target: LOG_TARGET,
            "Account {:?} exceeds allowance: count={}/{}, size={}/{}",
            HexDisplay::from(account),
            remaining_count,
            allowance.max_count,
            remaining_size,
            allowance.max_size
        );
        // The expired statements are the first `expired_count` entries of `by_priority`
        // (they come from a range starting at the smallest key), so skipping them leaves
        // exactly the not-yet-evicted statements, lowest expiry first.
        for (key, (_, len)) in account_rec.by_priority.iter().skip(expired_count) {
            if remaining_count <= allowance.max_count as usize
                && remaining_size <= allowance.max_size as usize
            {
                break;
            }
            to_evict.push(key.hash);
            remaining_count -= 1;
            remaining_size -= len;
            log::debug!(
                target: LOG_TARGET,
                "Evicting statement {:?} due to allowance enforcement",
                HexDisplay::from(&key.hash)
            );
        }
    }
    to_evict
}
/// Runs one bounded pass of expiry and allowance enforcement.
///
/// Accounts are taken from the back of `accounts_to_check_for_expiry_stmts`. When that list
/// is empty, this call only refills it with all current accounts and returns. Otherwise
/// accounts are examined until one of the `MAX_EXPIRY_*_PER_ITERATION` limits is hit, the
/// collected statements are removed, and the examined accounts are dropped from the list.
fn enforce_limits(&self) {
    let _start_check_expiration_timer = self.metrics.start_check_expiration_timer();
    let current_time = self.timestamp();
    let (to_evict, num_accounts_checked) = {
        // Upgradable read: only the refill path below needs write access.
        let index = self.index.upgradable_read();
        if index.accounts_to_check_for_expiry_stmts.is_empty() {
            let existing_accounts = index.accounts.keys().cloned().collect::<Vec<_>>();
            let mut index = RwLockUpgradableReadGuard::upgrade(index);
            index.accounts_to_check_for_expiry_stmts = existing_accounts;
            return;
        }
        let mut to_evict = Vec::new();
        let mut num_accounts_checked = 0;
        let start = Instant::now();
        // Iterate from the back so that the processed accounts can be truncated off later.
        for account in index.accounts_to_check_for_expiry_stmts.iter().rev() {
            num_accounts_checked += 1;
            if let Some(account_rec) = index.accounts.get(account) {
                to_evict.extend(self.collect_evictions(account, account_rec, current_time));
            }
            if to_evict.len() >= MAX_EXPIRY_STATEMENTS_PER_ITERATION
                || num_accounts_checked >= MAX_EXPIRY_ACCOUNTS_PER_ITERATION
                || start.elapsed() >= MAX_EXPIRY_TIME_PER_ITERATION
            {
                break;
            }
        }
        (to_evict, num_accounts_checked)
    };
    // The index guard is released here; `remove` takes the write lock for each statement.
    let mut expired = 0;
    for hash in to_evict {
        if let Err(e) = self.remove(&hash) {
            log::debug!(
                target: LOG_TARGET,
                "Error marking statement {:?} as expired: {:?}",
                HexDisplay::from(&hash),
                e
            );
        } else {
            expired += 1;
            log::trace!(
                target: LOG_TARGET,
                "Marked statement {:?} as expired",
                HexDisplay::from(&hash)
            );
        }
    }
    // Drop the accounts examined in this pass from the back of the work list.
    let mut index = self.index.write();
    let new_len = index
        .accounts_to_check_for_expiry_stmts
        .len()
        .saturating_sub(num_accounts_checked);
    index.accounts_to_check_for_expiry_stmts.truncate(new_len);
    // Drop the timer explicitly so it ends before the metrics report below.
    drop(_start_check_expiration_timer);
    self.metrics.report(|metrics| {
        metrics.statements_expired_total.inc_by(expired);
    });
}
/// Purges expired-statement records whose retention period has elapsed, from both the index
/// and the database, and refreshes the store gauges.
///
/// A database write failure is logged and otherwise ignored.
pub fn maintain(&self) {
    log::trace!(target: LOG_TARGET, "Started store maintenance");

    // Take everything needed from the index in one short critical section.
    let (
        purged,
        active_count,
        expired_count,
        total_size,
        accounts_count,
        capacity_statements,
        capacity_bytes,
    ) = {
        let mut index = self.index.write();
        let purged = index.maintain(self.timestamp());
        (
            purged,
            index.entries.len(),
            index.expired.len(),
            index.total_size,
            index.accounts.len(),
            index.options.max_total_statements,
            index.options.max_total_size,
        )
    };

    let removals: Vec<_> =
        purged.into_iter().map(|hash| (col::EXPIRED, hash.to_vec(), None)).collect();
    let deleted_count = removals.len() as u64;
    match self.db.commit(removals) {
        Ok(()) => {
            self.metrics.report(|metrics| metrics.statements_pruned.inc_by(deleted_count));
        },
        Err(e) => {
            log::warn!(target: LOG_TARGET, "Error writing to the statement database: {:?}", e);
        },
    }

    self.metrics.report(|metrics| {
        metrics.statements_total.set(active_count as u64);
        metrics.bytes_total.set(total_size as u64);
        metrics.accounts_total.set(accounts_count as u64);
        metrics.expired_total.set(expired_count as u64);
        metrics.capacity_statements.set(capacity_statements as u64);
        metrics.capacity_bytes.set(capacity_bytes as u64);
    });
    log::trace!(
        target: LOG_TARGET,
        "Completed store maintenance. Purged: {}, Active: {}, Expired: {}",
        deleted_count,
        active_count,
        expired_count
    );
}
/// Current time in seconds since the Unix epoch, or the overridden time when one is set.
///
/// Yields `0` if the system clock reports a time before the epoch.
fn timestamp(&self) -> u64 {
    if let Some(fixed) = self.time_override {
        return fixed;
    }
    std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map_or(0, |since_epoch| since_epoch.as_secs())
}
/// Test helper: makes `timestamp` return `time` instead of the system time.
#[cfg(test)]
fn set_time(&mut self, time: u64) {
    self.time_override = Some(time);
}
/// Wraps this store in a `StatementStoreExt`.
pub fn as_statement_store_ext(self: Arc<Self>) -> StatementStoreExt {
    StatementStoreExt::new(self)
}
/// Collects the statements addressed to `dest` that carry all of `match_all_topics`,
/// decrypts each with the matching key pair from the keystore and maps the statement and
/// its decrypted data through `map_f`.
///
/// Statements without data, without a usable key in the keystore, or that fail to decrypt
/// are skipped (the latter two are logged).
///
/// # Errors
///
/// `Error::Db` if a database read fails.
fn posted_clear_inner<R>(
    &self,
    match_all_topics: &[Topic],
    dest: [u8; 32],
    mut map_f: impl FnMut(Statement, Vec<u8>) -> R,
) -> Result<Vec<R>> {
    let filter = OptimizedTopicFilter::MatchAll(match_all_topics.iter().cloned().collect());
    self.collect_statements(Some(dest), &filter, |statement| {
        let (Some(key), Some(_)) = (statement.decryption_key(), statement.data()) else {
            return None;
        };
        let public: subsoil::core::ed25519::Public = UncheckedFrom::unchecked_from(key);
        let public: soil_statement_store::ed25519::Public = public.into();
        let pair = match self.keystore.key_pair::<soil_statement_store::ed25519::Pair>(&public)
        {
            Ok(Some(pair)) => pair,
            Ok(None) => {
                log::debug!(
                    target: LOG_TARGET,
                    "Keystore is missing key for statement {:?}",
                    HexDisplay::from(&statement.hash())
                );
                return None;
            },
            Err(e) => {
                log::debug!(
                    target: LOG_TARGET,
                    "Keystore error: {:?}, for statement {:?}",
                    e,
                    HexDisplay::from(&statement.hash())
                );
                return None;
            },
        };
        match statement.decrypt_private(&pair.into_inner()) {
            Ok(decrypted) => decrypted.map(|data| map_f(statement, data)),
            Err(e) => {
                log::debug!(
                    target: LOG_TARGET,
                    "Decryption error: {:?}, for statement {:?}",
                    e,
                    HexDisplay::from(&statement.hash())
                );
                None
            },
        }
    })
}
}
impl StatementStore for Store {
fn statements(&self) -> Result<Vec<(Hash, Statement)>> {
let index = self.index.read();
let mut result = Vec::with_capacity(index.entries.len());
for hash in index.entries.keys().cloned() {
let Some(encoded) =
self.db.get(col::STATEMENTS, &hash).map_err(|e| Error::Db(e.to_string()))?
else {
continue;
};
if let Ok(statement) = Statement::decode(&mut encoded.as_slice()) {
result.push((hash, statement));
}
}
Ok(result)
}
fn take_recent_statements(&self) -> Result<Vec<(Hash, Statement)>> {
let mut index = self.index.write();
let recent = index.take_recent();
let mut result = Vec::with_capacity(recent.len());
for hash in recent {
let Some(encoded) =
self.db.get(col::STATEMENTS, &hash).map_err(|e| Error::Db(e.to_string()))?
else {
continue;
};
if let Ok(statement) = Statement::decode(&mut encoded.as_slice()) {
result.push((hash, statement));
}
}
Ok(result)
}
fn statement(&self, hash: &Hash) -> Result<Option<Statement>> {
Ok(
match self
.db
.get(col::STATEMENTS, hash.as_slice())
.map_err(|e| Error::Db(e.to_string()))?
{
Some(entry) => {
log::trace!(
target: LOG_TARGET,
"Queried statement {:?}",
HexDisplay::from(hash)
);
Some(
Statement::decode(&mut entry.as_slice())
.map_err(|e| Error::Decode(e.to_string()))?,
)
},
None => {
log::trace!(
target: LOG_TARGET,
"Queried missing statement {:?}",
HexDisplay::from(hash)
);
None
},
},
)
}
/// Returns `true` if `hash` belongs to a statement currently in the index. Expired
/// statements do not count.
fn has_statement(&self, hash: &Hash) -> bool {
    self.index.read().entries.contains_key(hash)
}
/// Returns the hashes of all statements currently in the index, in no particular order.
fn statement_hashes(&self) -> Vec<Hash> {
    let index = self.index.read();
    index.entries.keys().copied().collect()
}
fn statements_by_hashes(
&self,
hashes: &[Hash],
filter: &mut dyn FnMut(&Hash, &[u8], &Statement) -> FilterDecision,
) -> Result<(Vec<(Hash, Statement)>, usize)> {
let mut result = Vec::new();
let mut processed = 0;
for hash in hashes {
processed += 1;
let Some(encoded) =
self.db.get(col::STATEMENTS, hash).map_err(|e| Error::Db(e.to_string()))?
else {
continue;
};
let Ok(statement) = Statement::decode(&mut encoded.as_slice()) else { continue };
match filter(hash, &encoded, &statement) {
FilterDecision::Skip => {},
FilterDecision::Take => {
result.push((*hash, statement));
},
FilterDecision::Abort => {
processed -= 1;
break;
},
}
}
Ok((result, processed))
}
/// Returns the data of every statement without a decryption key that carries all of
/// `match_all_topics`; statements whose `into_data` yields `None` are omitted.
fn broadcasts(&self, match_all_topics: &[Topic]) -> Result<Vec<Vec<u8>>> {
    let filter = OptimizedTopicFilter::MatchAll(match_all_topics.iter().cloned().collect());
    self.collect_statements(None, &filter, |statement| statement.into_data())
}
/// Returns the (still encrypted) data of every statement addressed to decryption key `dest`
/// that carries all of `match_all_topics`; statements whose `into_data` yields `None` are
/// omitted.
fn posted(&self, match_all_topics: &[Topic], dest: [u8; 32]) -> Result<Vec<Vec<u8>>> {
    let filter = OptimizedTopicFilter::MatchAll(match_all_topics.iter().cloned().collect());
    self.collect_statements(Some(dest), &filter, |statement| statement.into_data())
}
/// Returns the decrypted data of every statement addressed to decryption key `dest` that
/// carries all of `match_all_topics` and can be decrypted with a key from the keystore.
fn posted_clear(&self, match_all_topics: &[Topic], dest: [u8; 32]) -> Result<Vec<Vec<u8>>> {
    self.posted_clear_inner(match_all_topics, dest, |_statement, data| data)
}
/// Returns the full encoding of every statement without a decryption key that carries all
/// of `match_all_topics`.
fn broadcasts_stmt(&self, match_all_topics: &[Topic]) -> Result<Vec<Vec<u8>>> {
    let filter = OptimizedTopicFilter::MatchAll(match_all_topics.iter().cloned().collect());
    self.collect_statements(None, &filter, |statement| Some(statement.encode()))
}
/// Returns the full encoding of every statement addressed to decryption key `dest` that
/// carries all of `match_all_topics`.
fn posted_stmt(&self, match_all_topics: &[Topic], dest: [u8; 32]) -> Result<Vec<Vec<u8>>> {
    let filter = OptimizedTopicFilter::MatchAll(match_all_topics.iter().cloned().collect());
    self.collect_statements(Some(dest), &filter, |statement| Some(statement.encode()))
}
/// Like `posted_clear`, but each result is the encoded statement immediately followed by
/// its decrypted data.
fn posted_clear_stmt(
    &self,
    match_all_topics: &[Topic],
    dest: [u8; 32],
) -> Result<Vec<Vec<u8>>> {
    self.posted_clear_inner(match_all_topics, dest, |statement, decrypted| {
        let mut out = Vec::with_capacity(statement.size_hint() + decrypted.len());
        statement.encode_to(&mut out);
        out.extend_from_slice(&decrypted);
        out
    })
}
/// Validates `statement` and, if accepted, stores it, evicting lower-priority statements of
/// the same account when needed, and notifies subscribers.
///
/// Checks run in this order: expiration time, encoded size, known/expired lookup, proof
/// presence, signature, account allowance, and finally the index limits.
fn submit(&self, statement: Statement, source: StatementSource) -> SubmitResult {
    let _histogram_submit_start_timer = self.metrics.start_submit_timer();
    let hash = statement.hash();
    // Reject statements whose expiration time is not in the future.
    if self.timestamp() >= statement.get_expiration_timestamp_secs().into() {
        log::debug!(
            target: LOG_TARGET,
            "Statement is already expired: {:?}",
            HexDisplay::from(&hash),
        );
        self.metrics.report(|metrics| metrics.validations_invalid.inc());
        return SubmitResult::Invalid(InvalidReason::AlreadyExpired);
    }
    // Reject statements whose encoding exceeds `MAX_STATEMENT_SIZE`.
    let encoded_size = statement.encoded_size();
    if encoded_size > MAX_STATEMENT_SIZE {
        log::debug!(
            target: LOG_TARGET,
            "Statement is too big for propogation: {:?} ({}/{} bytes)",
            HexDisplay::from(&hash),
            statement.encoded_size(),
            MAX_STATEMENT_SIZE
        );
        self.metrics.report(|metrics| metrics.validations_invalid.inc());
        return SubmitResult::Invalid(InvalidReason::EncodingTooLarge {
            submitted_size: encoded_size,
            max_size: MAX_STATEMENT_SIZE,
        });
    }
    // Already known statements are only processed again for sources that allow resubmission.
    // NOTE(review): on that path `Index::insert` does not special-case a hash that is already
    // indexed — confirm the size/count accounting stays correct for resubmissions.
    match self.index.read().query(&hash) {
        IndexQuery::Expired => {
            if !source.can_be_resubmitted() {
                return SubmitResult::KnownExpired;
            }
        },
        IndexQuery::Exists => {
            if !source.can_be_resubmitted() {
                return SubmitResult::Known;
            }
        },
        IndexQuery::Unknown => {},
    }
    // A statement must identify its account.
    let Some(account_id) = statement.account_id() else {
        log::debug!(
            target: LOG_TARGET,
            "Statement validation failed: Missing proof ({:?})",
            HexDisplay::from(&hash),
        );
        self.metrics.report(|metrics| metrics.validations_invalid.inc());
        return SubmitResult::Invalid(InvalidReason::NoProof);
    };
    match statement.verify_signature() {
        SignatureVerificationResult::Valid(_) => {},
        SignatureVerificationResult::Invalid => {
            log::debug!(
                target: LOG_TARGET,
                "Statement validation failed: BadProof, {:?}",
                HexDisplay::from(&hash),
            );
            self.metrics.report(|metrics| metrics.validations_invalid.inc());
            return SubmitResult::Invalid(InvalidReason::BadProof);
        },
        SignatureVerificationResult::NoSignature => {
            // Only an on-chain proof is acceptable without a signature.
            if let Some(Proof::OnChain { .. }) = statement.proof() {
                log::debug!(
                    target: LOG_TARGET,
                    "Statement with OnChain proof accepted: {:?}",
                    HexDisplay::from(&hash),
                );
            } else {
                log::debug!(
                    target: LOG_TARGET,
                    "Statement validation failed: NoProof, {:?}",
                    HexDisplay::from(&hash),
                );
                self.metrics.report(|metrics| metrics.validations_invalid.inc());
                return SubmitResult::Invalid(InvalidReason::NoProof);
            }
        },
    };
    // Read the account's allowance: at the proof's block for on-chain proofs, otherwise at
    // the allowance reader's default block.
    let validation = match (self.read_allowance_fn)(
        &account_id,
        statement.proof().and_then(|p| match p {
            Proof::OnChain { block_hash, .. } => Some(*block_hash),
            _ => None,
        }),
    ) {
        Ok(Some(allowance)) => allowance,
        Ok(None) => {
            log::debug!(
                target: LOG_TARGET,
                "Account {} has no statement allowance set",
                HexDisplay::from(&account_id),
            );
            return SubmitResult::Rejected(RejectionReason::NoAllowance);
        },
        Err(e) => {
            log::debug!(
                target: LOG_TARGET,
                "Reading statement allowance for account {} failed",
                HexDisplay::from(&account_id),
            );
            return SubmitResult::InternalError(e);
        },
    };
    let current_time = self.timestamp();
    let mut commit = Vec::new();
    {
        // The write lock is held across the index update, the database commit and the
        // subscriber notification.
        let mut index = self.index.write();
        let evicted =
            match index.insert(hash, &statement, &account_id, &validation, current_time) {
                Ok(evicted) => evicted,
                Err(reason) => {
                    self.metrics.report(|metrics| {
                        metrics.rejections.with_label_values(&[reason.label()]).inc();
                    });
                    return SubmitResult::Rejected(reason);
                },
            };
        // Persist the new statement and move every evicted one to the expired column.
        commit.push((col::STATEMENTS, hash.to_vec(), Some(statement.encode())));
        for hash in evicted {
            commit.push((col::STATEMENTS, hash.to_vec(), None));
            commit.push((col::EXPIRED, hash.to_vec(), Some((hash, current_time).encode())));
        }
        if let Err(e) = self.db.commit(commit) {
            log::debug!(
                target: LOG_TARGET,
                "Statement validation failed: database error {}, {:?}",
                e,
                statement
            );
            return SubmitResult::InternalError(Error::Db(e.to_string()));
        }
        self.subscription_manager.notify(statement);
    }
    self.metrics.report(|metrics| metrics.submitted_statements.inc());
    log::trace!(target: LOG_TARGET, "Statement submitted: {:?}", HexDisplay::from(&hash));
    SubmitResult::New
}
fn remove(&self, hash: &Hash) -> Result<()> {
let current_time = self.timestamp();
{
let mut index = self.index.write();
if index.make_expired(hash, current_time) {
let commit = [
(col::STATEMENTS, hash.to_vec(), None),
(col::EXPIRED, hash.to_vec(), Some((hash, current_time).encode())),
];
if let Err(e) = self.db.commit(commit) {
log::debug!(
target: LOG_TARGET,
"Error removing statement: database error {}, {:?}",
e,
HexDisplay::from(hash),
);
return Err(Error::Db(e.to_string()));
}
}
}
Ok(())
}
fn remove_by(&self, who: [u8; 32]) -> Result<()> {
let mut index = self.index.write();
let mut evicted = Vec::new();
if let Some(account_rec) = index.accounts.get(&who) {
evicted.extend(account_rec.by_priority.keys().map(|k| k.hash));
}
let current_time = self.timestamp();
let mut commit = Vec::new();
for hash in evicted {
index.make_expired(&hash, current_time);
commit.push((col::STATEMENTS, hash.to_vec(), None));
commit.push((col::EXPIRED, hash.to_vec(), Some((hash, current_time).encode())));
}
self.db.commit(commit).map_err(|e| {
log::debug!(
target: LOG_TARGET,
"Error removing statement: database error {}, remove by {:?}",
e,
HexDisplay::from(&who),
);
Error::Db(e.to_string())
})
}
}
impl StatementStoreSubscriptionApi for Store {
    /// Opens a subscription for statements matching `topic_filter`.
    ///
    /// Returns the SCALE-encoded statements that already match the filter,
    /// together with the sender and stream halves of the new subscription.
    /// When nothing matches yet, an empty `NewStatements` event with
    /// `remaining: Some(0)` is pushed into the subscription straight away.
    ///
    /// Fails only if collecting the existing statements fails.
    fn subscribe_statement(
        &self,
        topic_filter: OptimizedTopicFilter,
    ) -> Result<(Vec<Vec<u8>>, async_channel::Sender<StatementEvent>, SubscriptionStatementsStream)>
    {
        // The read guard is kept until the function returns, so the snapshot
        // and the registration with the subscription manager both happen
        // while the index is read-locked.
        let index_guard = self.index.read();
        let mut snapshot = Vec::new();
        self.collect_statements_locked(
            None,
            &topic_filter,
            &index_guard,
            &mut snapshot,
            |stmt| Some(stmt.encode()),
        )?;

        let (sender, stream) = self.subscription_manager.subscribe(topic_filter);

        if snapshot.is_empty() {
            let initial = StatementEvent::NewStatements { statements: vec![], remaining: Some(0) };
            // Best effort: a failed send is deliberately ignored.
            let _ = sender.send_blocking(initial);
        }

        Ok((snapshot, sender, stream))
    }
}
#[cfg(test)]
mod tests {
use super::{col, Store, MAX_STATEMENT_SIZE};
use soil_client::keystore::Keystore;
use soil_statement_store::{
AccountId, Channel, DecryptionKey, InvalidReason, Proof, Statement, StatementSource,
StatementStore, SubmitResult, Topic,
};
use subsoil::core::{Decode, Encode, Pair};
// Concrete runtime types used to instantiate the generic store in these tests.
type Extrinsic = subsoil::runtime::OpaqueExtrinsic;
type Hash = subsoil::core::H256;
type Hashing = subsoil::runtime::traits::BlakeTwo256;
type BlockNumber = u64;
type Header = subsoil::runtime::generic::Header<BlockNumber, Hashing>;
type Block = subsoil::runtime::generic::Block<Header, Extrinsic>;
// Block hash that `TestClient::info` reports as best and finalized, and that
// the `statement` helper puts into the on-chain proof of the statements it builds.
const CORRECT_BLOCK_HASH: [u8; 32] = [1u8; 32];
/// Stateless stand-in for a blockchain client; the storage-provider and
/// header-backend traits are implemented for it in this test module.
#[derive(Clone)]
pub(crate) struct TestClient;
/// In-memory backend type supplied as the backend type parameter of
/// `Store::new` in these tests.
pub(crate) type TestBackend = soil_client::client_api::in_mem::Backend<Block>;
impl soil_client::client_api::StorageProvider<Block, TestBackend> for TestClient {
fn storage(
&self,
_hash: Hash,
key: &soil_client::client_api::StorageKey,
) -> soil_client::blockchain::Result<Option<soil_client::client_api::StorageData>> {
use soil_statement_store::StatementAllowance;
assert_eq!(&key.0[0..21], b":statement_allowance:" as &[u8],);
let account_bytes = &key.0[21..53];
let account_id: u64 = u64::from_le_bytes(account_bytes[0..8].try_into().unwrap());
let allowance = match account_id {
0 => return Ok(None),
1 => StatementAllowance::new(1, 1000),
2 => StatementAllowance::new(2, 1000),
3 => StatementAllowance::new(3, 1000),
4 => StatementAllowance::new(4, 1000),
42 => StatementAllowance::new(42, (42 * MAX_STATEMENT_SIZE) as u32),
_ => StatementAllowance::new(100, 1000),
};
Ok(Some(soil_client::client_api::StorageData(allowance.encode())))
}
fn storage_hash(
&self,
_hash: Hash,
_key: &soil_client::client_api::StorageKey,
) -> soil_client::blockchain::Result<Option<Hash>> {
unimplemented!()
}
fn storage_keys(
&self,
_hash: Hash,
_prefix: Option<&soil_client::client_api::StorageKey>,
_start_key: Option<&soil_client::client_api::StorageKey>,
) -> soil_client::blockchain::Result<
soil_client::client_api::backend::KeysIter<
<TestBackend as soil_client::client_api::Backend<Block>>::State,
Block,
>,
> {
unimplemented!()
}
fn storage_pairs(
&self,
_hash: Hash,
_prefix: Option<&soil_client::client_api::StorageKey>,
_start_key: Option<&soil_client::client_api::StorageKey>,
) -> soil_client::blockchain::Result<
soil_client::client_api::backend::PairsIter<
<TestBackend as soil_client::client_api::Backend<Block>>::State,
Block,
>,
> {
unimplemented!()
}
fn child_storage(
&self,
_hash: Hash,
_child_info: &soil_client::client_api::ChildInfo,
_key: &soil_client::client_api::StorageKey,
) -> soil_client::blockchain::Result<Option<soil_client::client_api::StorageData>> {
unimplemented!()
}
fn child_storage_keys(
&self,
_hash: Hash,
_child_info: soil_client::client_api::ChildInfo,
_prefix: Option<&soil_client::client_api::StorageKey>,
_start_key: Option<&soil_client::client_api::StorageKey>,
) -> soil_client::blockchain::Result<
soil_client::client_api::backend::KeysIter<
<TestBackend as soil_client::client_api::Backend<Block>>::State,
Block,
>,
> {
unimplemented!()
}
fn child_storage_hash(
&self,
_hash: Hash,
_child_info: &soil_client::client_api::ChildInfo,
_key: &soil_client::client_api::StorageKey,
) -> soil_client::blockchain::Result<Option<Hash>> {
unimplemented!()
}
fn closest_merkle_value(
&self,
_hash: Hash,
_key: &soil_client::client_api::StorageKey,
) -> soil_client::blockchain::Result<Option<soil_client::client_api::MerkleValue<Hash>>> {
unimplemented!()
}
fn child_closest_merkle_value(
&self,
_hash: Hash,
_child_info: &soil_client::client_api::ChildInfo,
_key: &soil_client::client_api::StorageKey,
) -> soil_client::blockchain::Result<Option<soil_client::client_api::MerkleValue<Hash>>> {
unimplemented!()
}
}
impl soil_client::blockchain::HeaderBackend<Block> for TestClient {
    /// Not needed by these tests; panics if called.
    fn header(&self, _hash: Hash) -> soil_client::blockchain::Result<Option<Header>> {
        unimplemented!()
    }

    /// Reports a fixed chain state in which `CORRECT_BLOCK_HASH` is both the
    /// best and the finalized block.
    fn info(&self) -> soil_client::blockchain::Info<Block> {
        let anchor: Hash = CORRECT_BLOCK_HASH.into();
        soil_client::blockchain::Info {
            best_hash: anchor,
            best_number: 0,
            genesis_hash: Default::default(),
            finalized_hash: anchor,
            finalized_number: 1,
            finalized_state: None,
            number_leaves: 0,
            block_gap: None,
        }
    }

    /// Not needed by these tests; panics if called.
    fn status(
        &self,
        _hash: Hash,
    ) -> soil_client::blockchain::Result<soil_client::blockchain::BlockStatus> {
        unimplemented!()
    }

    /// Not needed by these tests; panics if called.
    fn number(&self, _hash: Hash) -> soil_client::blockchain::Result<Option<BlockNumber>> {
        unimplemented!()
    }

    /// Not needed by these tests; panics if called.
    fn hash(&self, _number: BlockNumber) -> soil_client::blockchain::Result<Option<Hash>> {
        unimplemented!()
    }
}
/// Creates a fresh `Store` backed by a database in a new temporary directory.
///
/// The returned `TempDir` must be kept alive for as long as the store is
/// used; callers also use its path to reopen the same database.
fn test_store() -> (Store, tempfile::TempDir) {
    subsoil::tracing::init_for_tests();
    let temp_dir = tempfile::Builder::new().tempdir().expect("Error creating test dir");
    let db_path = temp_dir.path().join("db");
    let store = Store::new::<Block, TestClient, TestBackend>(
        &db_path,
        Default::default(),
        std::sync::Arc::new(TestClient),
        std::sync::Arc::new(soil_client::keystore::LocalKeystore::in_memory()),
        None,
        Box::new(subsoil::core::testing::TaskExecutor::new()),
    )
    .unwrap();
    (store, temp_dir)
}
pub fn signed_statement(data: u8) -> Statement {
signed_statement_with_topics(data, &[], None)
}
/// Builds a statement signed with the ed25519 key derived from `//Alice`.
///
/// * `data` - single byte stored as the statement's plain data.
/// * `topics` - topics assigned to consecutive topic slots, starting at 0.
/// * `dec_key` - optional decryption key to attach.
///
/// The expiry is set to `u64::MAX`.
fn signed_statement_with_topics(
    data: u8,
    topics: &[Topic],
    dec_key: Option<DecryptionKey>,
) -> Statement {
    let mut statement = Statement::new();
    statement.set_plain_data(vec![data]);
    statement.set_expiry(u64::MAX);
    // Walk the slice directly instead of indexing it by position.
    for (slot, t) in topics.iter().enumerate() {
        statement.set_topic(slot, *t);
    }
    if let Some(key) = dec_key {
        statement.set_decryption_key(key);
    }
    let kp = subsoil::core::ed25519::Pair::from_string("//Alice", None).unwrap();
    statement.sign_ed25519_private(&kp);
    statement
}
/// Builds a topic whose first eight bytes are `data` in little-endian order
/// and whose remaining bytes are zero.
fn topic(data: u64) -> Topic {
    let le = data.to_le_bytes();
    let mut raw = [0u8; 32];
    raw[0..le.len()].copy_from_slice(&le);
    Topic::from(raw)
}
/// Builds a decryption key from the default value with its first eight bytes
/// overwritten by `data` in little-endian order.
fn dec_key(data: u64) -> DecryptionKey {
    let le = data.to_le_bytes();
    let mut key: DecryptionKey = Default::default();
    key[0..8].copy_from_slice(&le);
    key
}
/// Builds an account id from the default value with its first eight bytes
/// overwritten by `id` in little-endian order.
fn account(id: u64) -> AccountId {
    let le = id.to_le_bytes();
    let mut who: AccountId = Default::default();
    who[0..8].copy_from_slice(&le);
    who
}
/// Builds a channel id from the default value with its first eight bytes
/// overwritten by `id` in little-endian order.
fn channel(id: u64) -> Channel {
    let le = id.to_le_bytes();
    let mut chan: Channel = Default::default();
    chan[0..8].copy_from_slice(&le);
    chan
}
/// Builds an unsigned statement carrying an on-chain proof for `account_id`.
///
/// * `account_id` - expanded with `account` and used as the proof's `who`.
/// * `priority` - second argument of `set_expiry_from_parts`; the first is
///   always `u32::MAX`.
/// * `c` - optional channel number, expanded with `channel`.
/// * `data_len` - number of zero bytes stored as plain data.
///
/// The proof always references `CORRECT_BLOCK_HASH` with event index 0.
fn statement(account_id: u64, priority: u32, c: Option<u64>, data_len: usize) -> Statement {
    let mut statement = Statement::new();
    // Allocate the zero-filled payload in one step.
    statement.set_plain_data(vec![0u8; data_len]);
    statement.set_expiry_from_parts(u32::MAX, priority);
    if let Some(c) = c {
        statement.set_channel(channel(c));
    }
    statement.set_proof(Proof::OnChain {
        block_hash: CORRECT_BLOCK_HASH,
        who: account(account_id),
        event_index: 0,
    });
    statement
}
/// A signed statement and an unsigned statement with an on-chain proof are
/// both accepted as new.
#[test]
fn submit_one() {
    let (store, _temp) = test_store();

    let signed_outcome = store.submit(signed_statement(0), StatementSource::Network);
    assert_eq!(signed_outcome, SubmitResult::New);

    let proof_outcome = store.submit(statement(1, 1, None, 0), StatementSource::Network);
    assert_eq!(proof_outcome, SubmitResult::New);
}
/// Statements submitted to a store are still available after the store is
/// dropped and reopened on the same database path.
#[test]
fn save_and_load_statements() {
    let (store, temp) = test_store();
    let stmts = vec![signed_statement(0), signed_statement(1), signed_statement(2)];
    for stmt in &stmts {
        assert_eq!(store.submit(stmt.clone(), StatementSource::Network), SubmitResult::New);
    }
    let probe = &stmts[1];
    assert_eq!(store.statements().unwrap().len(), 3);
    assert_eq!(store.broadcasts(&[]).unwrap().len(), 3);
    assert_eq!(store.statement(&probe.hash()).unwrap(), Some(probe.clone()));

    // Close the store but keep its keystore for the reopened instance.
    let keystore = store.keystore.clone();
    drop(store);

    let db_path = temp.path().join("db");
    let reopened = Store::new::<Block, TestClient, TestBackend>(
        &db_path,
        Default::default(),
        std::sync::Arc::new(TestClient),
        keystore,
        None,
        Box::new(subsoil::core::testing::TaskExecutor::new()),
    )
    .unwrap();

    assert_eq!(reopened.statements().unwrap().len(), 3);
    assert_eq!(reopened.broadcasts(&[]).unwrap().len(), 3);
    assert_eq!(reopened.statement(&probe.hash()).unwrap(), Some(probe.clone()));
}
/// `take_recent_statements` hands out each submitted statement once: a second
/// call returns nothing until another statement is submitted, while the
/// statements themselves stay in the store.
#[test]
fn take_recent_statements_clears_index() {
    let (store, _temp) = test_store();
    let first_batch = vec![signed_statement(0), signed_statement(1), signed_statement(2)];
    let late = signed_statement(3);

    for stmt in &first_batch {
        let _ = store.submit(stmt.clone(), StatementSource::Local);
    }

    let (hashes, stmts): (Vec<_>, Vec<_>) =
        store.take_recent_statements().unwrap().into_iter().unzip();
    assert!(first_batch.iter().all(|s| hashes.contains(&s.hash())));
    assert!(first_batch.iter().all(|s| stmts.contains(s)));

    // Nothing new was submitted in between, so the second take is empty.
    let again = store.take_recent_statements().unwrap();
    assert_eq!(again.len(), 0);

    store.submit(late.clone(), StatementSource::Network);
    let (hashes, stmts): (Vec<_>, Vec<_>) =
        store.take_recent_statements().unwrap().into_iter().unzip();
    let second_batch = vec![late];
    assert!(second_batch.iter().all(|s| hashes.contains(&s.hash())));
    assert!(second_batch.iter().all(|s| stmts.contains(s)));

    assert_eq!(store.statements().unwrap().len(), 4);
}
/// Queries by topic set and optional decryption key return exactly the
/// statements that carry all requested topics and the requested key.
///
/// Each statement's single data byte doubles as its identifier in the
/// expected results.
#[test]
fn search_by_topic_and_key() {
    let (store, _temp) = test_store();
    let fixtures = vec![
        signed_statement(0),
        signed_statement_with_topics(1, &[topic(0)], None),
        signed_statement_with_topics(2, &[topic(0), topic(1)], Some(dec_key(2))),
        signed_statement_with_topics(3, &[topic(0), topic(1), topic(2)], None),
        signed_statement_with_topics(4, &[topic(0), topic(42), topic(2), topic(3)], None),
    ];
    for stmt in fixtures {
        store.submit(stmt, StatementSource::Network);
    }

    // With a key the lookup goes through `posted`, otherwise through
    // `broadcasts`; the first data byte of every hit is collected and sorted.
    let assert_topics = |topics: &[u64], key: Option<u64>, expected: &[u8]| {
        let filter: Vec<_> = topics.iter().map(|t| topic(*t)).collect();
        let mut got_vals: Vec<_> = match key.map(dec_key) {
            Some(dest) =>
                store.posted(&filter, dest).unwrap().into_iter().map(|d| d[0]).collect(),
            None => store.broadcasts(&filter).unwrap().into_iter().map(|d| d[0]).collect(),
        };
        got_vals.sort();
        assert_eq!(expected.to_vec(), got_vals);
    };

    assert_topics(&[], None, &[0, 1, 3, 4]);
    assert_topics(&[], Some(2), &[2]);
    assert_topics(&[0], None, &[1, 3, 4]);
    assert_topics(&[1], None, &[3]);
    assert_topics(&[2], None, &[3, 4]);
    assert_topics(&[3], None, &[4]);
    assert_topics(&[42], None, &[4]);

    assert_topics(&[0, 1], None, &[3]);
    assert_topics(&[0, 1], Some(2), &[2]);
    assert_topics(&[0, 1, 99], Some(2), &[]);
    assert_topics(&[1, 2], None, &[3]);
    assert_topics(&[99], None, &[]);
    assert_topics(&[0, 99], None, &[]);
    assert_topics(&[0, 1, 2, 3, 42], None, &[]);
}
// Exercises per-account allowances and the global size/count limits of the store.
// The per-account allowances come from `TestClient::storage`: accounts 1, 2 and 3 get
// `StatementAllowance::new(1, 1000)`, `new(2, 1000)` and `new(3, 1000)` respectively
// (presumably a statement count and a byte budget -- confirm against `StatementAllowance`).
#[test]
fn constraints() {
let (store, _temp) = test_store();
// Lower the global size limit so it can be hit with small statements.
store.index.write().options.max_total_size = 3000;
let source = StatementSource::Network;
let ok = SubmitResult::New;
// Account 1: a 2000-byte statement is rejected outright.
assert!(matches!(
store.submit(statement(1, 1, Some(1), 2000), source),
SubmitResult::Rejected(_)
));
// Account 1: a 500-byte statement in channel 1 is accepted.
assert_eq!(store.submit(statement(1, 1, Some(1), 500), source), ok);
// Same channel, same priority: rejected.
assert!(matches!(
store.submit(statement(1, 1, Some(1), 200), source),
SubmitResult::Rejected(_)
));
// Same channel, higher priority: accepted.
assert_eq!(store.submit(statement(1, 2, Some(1), 600), source), ok);
// Different channel with a lower priority: rejected.
assert!(matches!(
store.submit(statement(1, 1, Some(2), 100), source),
SubmitResult::Rejected(_)
));
// Exactly one statement has been expired so far.
assert_eq!(store.index.read().expired.len(), 1);
// Account 2: three statements with increasing priority, no channel.
assert_eq!(store.submit(statement(2, 1, None, 500), source), ok);
assert_eq!(store.submit(statement(2, 2, None, 100), source), ok);
assert_eq!(store.submit(statement(2, 3, None, 500), source), ok);
// The third submission expired one more statement.
assert_eq!(store.index.read().expired.len(), 2);
// A 1000-byte statement with the highest priority expires two more.
assert_eq!(store.submit(statement(2, 4, None, 1000), source), ok);
assert_eq!(store.index.read().expired.len(), 4);
// Account 3: three channel statements followed by one without a channel.
assert_eq!(store.submit(statement(3, 2, Some(1), 300), source), ok);
assert_eq!(store.submit(statement(3, 3, Some(2), 300), source), ok);
assert_eq!(store.submit(statement(3, 4, Some(3), 300), source), ok);
assert_eq!(store.submit(statement(3, 5, None, 500), source), ok);
// Two of account 3's statements were expired along the way.
assert_eq!(store.index.read().expired.len(), 6);
assert_eq!(store.index.read().total_size, 2400);
assert_eq!(store.index.read().entries.len(), 4);
// 2400 + 700 would exceed the 3000-byte global limit set above.
assert!(matches!(
store.submit(statement(1, 1, None, 700), source),
SubmitResult::Rejected(_)
));
// With the global count limit set to the current number of entries, even a
// small statement is rejected.
store.index.write().options.max_total_statements = 4;
assert!(matches!(
store.submit(statement(1, 1, None, 100), source),
SubmitResult::Rejected(_)
));
// The surviving statements, compared by hash in sorted order.
let mut expected_statements = vec![
statement(1, 2, Some(1), 600).hash(),
statement(2, 4, None, 1000).hash(),
statement(3, 4, Some(3), 300).hash(),
statement(3, 5, None, 500).hash(),
];
expected_statements.sort();
let mut statements: Vec<_> =
store.statements().unwrap().into_iter().map(|(hash, _)| hash).collect();
statements.sort();
assert_eq!(expected_statements, statements);
}
/// A statement just under `MAX_STATEMENT_SIZE` is accepted, while one with
/// twice that much data is reported as invalid.
#[test]
fn max_statement_size_for_gossiping() {
    let (store, _temp) = test_store();
    store.index.write().options.max_total_size = 42 * MAX_STATEMENT_SIZE;

    let fits = statement(42, 1, Some(1), MAX_STATEMENT_SIZE - 500);
    assert_eq!(store.submit(fits, StatementSource::Local), SubmitResult::New);

    let oversized = statement(42, 2, Some(1), 2 * MAX_STATEMENT_SIZE);
    let verdict = store.submit(oversized, StatementSource::Local);
    assert!(matches!(verdict, SubmitResult::Invalid(_)));
}
/// A removed statement leaves the index at once, and its expiry record is
/// purged by `maintain` once more than `DEFAULT_PURGE_AFTER_SEC` seconds have
/// passed; a reopened store sees neither the statement nor the record.
#[test]
fn expired_statements_are_purged() {
    use super::DEFAULT_PURGE_AFTER_SEC;

    let (mut store, temp) = test_store();
    let mut stmt = statement(1, 1, Some(3), 100);
    store.set_time(0);
    stmt.set_topic(0, topic(4));
    let hash = stmt.hash();
    store.submit(stmt, StatementSource::Network);
    assert_eq!(store.index.read().entries.len(), 1);

    store.remove(&hash).unwrap();
    assert_eq!(store.index.read().entries.len(), 0);
    assert_eq!(store.index.read().accounts.len(), 0);

    store.set_time(DEFAULT_PURGE_AFTER_SEC + 1);
    store.maintain();
    assert_eq!(store.index.read().expired.len(), 0);

    // Reopen the database and check that the purge was persisted.
    let keystore = store.keystore.clone();
    drop(store);
    let db_path = temp.path().join("db");
    let reopened = Store::new::<Block, TestClient, TestBackend>(
        &db_path,
        Default::default(),
        std::sync::Arc::new(TestClient),
        keystore,
        None,
        Box::new(subsoil::core::testing::TaskExecutor::new()),
    )
    .unwrap();
    assert_eq!(reopened.statements().unwrap().len(), 0);
    assert_eq!(reopened.index.read().expired.len(), 0);
}
/// `posted_clear` returns the decrypted payload of a statement encrypted to a
/// key held in the store's keystore.
#[test]
fn posted_clear_decrypts() {
    let (store, _temp) = test_store();
    let public = store
        .keystore
        .ed25519_generate_new(subsoil::core::crypto::key_types::STATEMENT, None)
        .unwrap();

    let secret = b"The most valuable secret".to_vec();
    let unencrypted = statement(1, 1, None, 100);
    let mut encrypted = statement(1, 2, None, 0);
    encrypted.encrypt(&secret, &public).unwrap();

    store.submit(unencrypted, StatementSource::Network);
    store.submit(encrypted, StatementSource::Network);

    let cleartexts = store.posted_clear(&[], public.into()).unwrap();
    assert_eq!(cleartexts, vec![secret]);
}
/// `broadcasts_stmt` returns whole encoded statements, leaves out the one
/// that carries a decryption key, and honours the topic filter.
#[test]
fn broadcasts_stmt_returns_encoded_statements() {
    let (store, _tmp) = test_store();
    let plain = signed_statement_with_topics(0, &[], None);
    let with_topic = signed_statement_with_topics(1, &[topic(42)], None);
    let with_key = signed_statement_with_topics(2, &[topic(42)], Some(dec_key(99)));
    for stmt in [&plain, &with_topic, &with_key] {
        store.submit(stmt.clone(), StatementSource::Network);
    }

    // Without a topic filter both statements that have no key come back.
    let mut expected_hashes = vec![plain.hash(), with_topic.hash()];
    expected_hashes.sort();
    let mut hashes = Vec::new();
    for bytes in store.broadcasts_stmt(&[]).unwrap() {
        hashes.push(Statement::decode(&mut &bytes[..]).unwrap().hash());
    }
    hashes.sort();
    assert_eq!(hashes, expected_hashes);

    // Filtering on topic 42 leaves only the topic-bearing one.
    let got = store.broadcasts_stmt(&[topic(42)]).unwrap();
    assert_eq!(got.len(), 1);
    let decoded = Statement::decode(&mut &got[0][..]).unwrap();
    assert_eq!(decoded.hash(), with_topic.hash());
}
/// `posted_stmt` returns the encoded statement addressed to the requested
/// destination key and nothing addressed to another key.
#[test]
fn posted_stmt_returns_encoded_statements_for_dest() {
    let (store, _tmp) = test_store();
    let new_key = || {
        store
            .keystore
            .ed25519_generate_new(subsoil::core::crypto::key_types::STATEMENT, None)
            .unwrap()
    };
    let public1 = new_key();
    let dest: [u8; 32] = public1.into();
    let public2 = new_key();

    let mut for_dest = statement(1, 1, None, 0);
    let secret1 = b"The most valuable secret".to_vec();
    for_dest.encrypt(&secret1, &public1).unwrap();

    let mut for_other = statement(2, 2, None, 0);
    let secret2 = b"The second most valuable secret".to_vec();
    for_other.encrypt(&secret2, &public2).unwrap();

    store.submit(for_dest.clone(), StatementSource::Network);
    store.submit(for_other.clone(), StatementSource::Network);

    let retrieved = store.posted_stmt(&[], dest).unwrap();
    assert_eq!(retrieved.len(), 1, "Only one statement has dec_key=dest");

    let returned_stmt = Statement::decode(&mut &retrieved[0][..]).unwrap();
    assert_eq!(
        returned_stmt.hash(),
        for_dest.hash(),
        "Returned statement must match s_with_key"
    );
}
/// Each `posted_clear_stmt` entry is the encoded statement immediately
/// followed by its decrypted payload.
#[test]
fn posted_clear_stmt_returns_statement_followed_by_plain_data() {
    let (store, _tmp) = test_store();
    let new_key = || {
        store
            .keystore
            .ed25519_generate_new(subsoil::core::crypto::key_types::STATEMENT, None)
            .unwrap()
    };
    let public1 = new_key();
    let dest: [u8; 32] = public1.into();
    let public2 = new_key();

    let mut for_dest = statement(1, 1, None, 0);
    let secret1 = b"The most valuable secret".to_vec();
    for_dest.encrypt(&secret1, &public1).unwrap();

    let mut for_other = statement(2, 2, None, 0);
    let secret2 = b"The second most valuable secret".to_vec();
    for_other.encrypt(&secret2, &public2).unwrap();

    store.submit(for_dest.clone(), StatementSource::Network);
    store.submit(for_other.clone(), StatementSource::Network);

    let retrieved = store.posted_clear_stmt(&[], dest).unwrap();
    assert_eq!(retrieved.len(), 1, "Only one statement has dec_key=dest");

    // Split the entry at the length of the encoded statement.
    let encoded_stmt = for_dest.encode();
    let boundary = encoded_stmt.len();
    let head = &retrieved[0][..boundary];
    assert_eq!(head, &encoded_stmt[..]);
    let tail = &retrieved[0][boundary..];
    assert_eq!(tail, &secret1[..]);
}
/// `posted_clear` applies the destination key and the topic filter together:
/// only the statement that matches both is returned, decrypted.
#[test]
fn posted_clear_returns_plain_data_for_dest_and_topics() {
    let (store, _tmp) = test_store();
    let new_key = || {
        store
            .keystore
            .ed25519_generate_new(subsoil::core::crypto::key_types::STATEMENT, None)
            .unwrap()
    };
    let public_dest = new_key();
    let dest: [u8; 32] = public_dest.into();
    let public_other = new_key();

    // Right destination and right topic.
    let mut matching = statement(1, 1, None, 0);
    let plaintext_good = b"The most valuable secret".to_vec();
    matching.encrypt(&plaintext_good, &public_dest).unwrap();
    matching.set_topic(0, topic(42));

    // Right destination, wrong topic.
    let mut off_topic = statement(2, 2, None, 0);
    off_topic.encrypt(b"Wrong topic", &public_dest).unwrap();
    off_topic.set_topic(0, topic(99));

    // Right topic, wrong destination.
    let mut off_dest = statement(3, 3, None, 0);
    off_dest.encrypt(b"Other dest", &public_other).unwrap();
    off_dest.set_topic(0, topic(42));

    store.submit(matching, StatementSource::Network);
    store.submit(off_topic, StatementSource::Network);
    store.submit(off_dest, StatementSource::Network);

    let retrieved = store.posted_clear(&[topic(42)], dest).unwrap();
    assert_eq!(retrieved, vec![plaintext_good]);
}
/// A statement whose expiry lies before the store's current time is reported
/// as `AlreadyExpired` and not stored; one expiring later is accepted.
#[test]
fn already_expired_statement_is_rejected() {
    let (mut store, _temp) = test_store();
    store.set_time(1000);

    let mut stale = statement(1, 1, None, 100);
    stale.set_expiry_from_parts(500, 1);
    let stale_outcome = store.submit(stale, StatementSource::Network);
    assert_eq!(stale_outcome, SubmitResult::Invalid(InvalidReason::AlreadyExpired));
    assert_eq!(store.statements().unwrap().len(), 0);

    let mut fresh = statement(1, 1, None, 100);
    fresh.set_expiry_from_parts(2000, 1);
    let fresh_outcome = store.submit(fresh, StatementSource::Network);
    assert_eq!(fresh_outcome, SubmitResult::New);
    assert_eq!(store.statements().unwrap().len(), 1);
}
/// `remove_by` expires every statement of one account, leaves another
/// account untouched, keeps the topic and key indexes consistent, can be
/// repeated safely, and the account can submit again after the purge.
#[test]
fn remove_by_covers_various_situations() {
    let (mut store, _temp) = test_store();
    store.set_time(0);

    let t42 = topic(42);
    let k7 = dec_key(7);

    // Account 4 ("A"): a topic statement, a keyed statement and a plain one.
    let mut a_topic = statement(4, 10, Some(100), 100);
    a_topic.set_topic(0, t42);
    let h_a_topic = a_topic.hash();

    let mut a_keyed = statement(4, 20, Some(200), 150);
    a_keyed.set_decryption_key(k7);
    let h_a_keyed = a_keyed.hash();

    let a_plain = statement(4, 30, None, 50);
    let h_a_plain = a_plain.hash();

    // Account 3 ("B"): a plain statement and one with both topic and key.
    let b_plain = statement(3, 10, None, 100);
    let h_b_plain = b_plain.hash();

    let mut b_both = statement(3, 15, Some(300), 100);
    b_both.set_topic(0, t42);
    b_both.set_decryption_key(k7);
    let h_b_both = b_both.hash();

    for stmt in [a_topic, a_keyed, a_plain, b_plain, b_both] {
        assert_eq!(store.submit(stmt, StatementSource::Network), SubmitResult::New);
    }

    let removed = [h_a_topic, h_a_keyed, h_a_plain];
    let kept = [h_b_plain, h_b_both];

    // State before the removal.
    {
        let idx = store.index.read();
        assert_eq!(idx.entries.len(), 5, "all 5 should be present");
        assert!(idx.accounts.contains_key(&account(4)));
        assert!(idx.accounts.contains_key(&account(3)));
        assert_eq!(idx.total_size, 100 + 150 + 50 + 100 + 100);

        let set_t = idx.by_topic.get(&t42).expect("topic set exists");
        assert!(set_t.contains(&h_a_topic) && set_t.contains(&h_b_both));

        let set_k = idx.by_dec_key.get(&Some(k7)).expect("key set exists");
        assert!(set_k.contains(&h_a_keyed) && set_k.contains(&h_b_both));
    }

    store.remove_by(account(4)).expect("remove_by should succeed");

    // State after the removal.
    {
        for h in &removed {
            assert!(store.statement(h).unwrap().is_none(), "A's statement should be removed");
        }
        for h in &kept {
            assert!(store.statement(h).unwrap().is_some(), "B's statement should remain");
        }

        let idx = store.index.read();
        assert!(!idx.accounts.contains_key(&account(4)), "Account A must be gone");
        assert!(idx.accounts.contains_key(&account(3)), "Account B must remain");

        for h in &removed {
            assert!(idx.expired.contains_key(h));
        }
        assert_eq!(idx.expired.len(), 3);

        assert_eq!(idx.entries.len(), 2);
        assert_eq!(idx.total_size, 100 + 100);

        let set_t = idx.by_topic.get(&t42).expect("topic set exists");
        assert!(set_t.contains(&h_b_both));
        assert!(!set_t.contains(&h_a_topic));

        let set_k = idx.by_dec_key.get(&Some(k7)).expect("key set exists");
        assert!(set_k.contains(&h_b_both));
        assert!(!set_k.contains(&h_a_keyed));
    }

    store.remove_by(account(4)).expect("second remove_by should be a no-op");

    // Advance past the purge window and let maintenance drop the records.
    let purge_after = store.index.read().options.purge_after_sec;
    store.set_time(purge_after + 1);
    store.maintain();
    assert_eq!(store.index.read().expired.len(), 0, "expired entries should be purged");

    let resubmitted = statement(4, 40, None, 10);
    assert_eq!(store.submit(resubmitted, StatementSource::Network), SubmitResult::New);
}
/// Starting with an empty list of accounts to check, one `enforce_limits`
/// call fills the list with every account that has statements, without
/// expiring anything.
#[test]
fn check_expiration_repopulates_account_list_when_empty() {
    let (mut store, _temp) = test_store();
    store.set_time(1000);

    for acc_id in 1..=3u64 {
        store.submit(statement(acc_id, 1, None, 100), StatementSource::Network);
    }
    assert!(store.index.read().accounts_to_check_for_expiry_stmts.is_empty());

    store.enforce_limits();

    let pending = store.index.read().accounts_to_check_for_expiry_stmts.clone();
    assert_eq!(pending.len(), 3, "Should have 3 accounts to check");
    for acc_id in 1..=3u64 {
        assert!(pending.contains(&account(acc_id)));
    }

    assert_eq!(store.index.read().expired.len(), 0);
    assert_eq!(store.index.read().entries.len(), 3);
}
/// Once time has moved past a statement's expiry, `enforce_limits` moves that
/// statement from `entries` to `expired` and leaves a statement that expires
/// later untouched.
#[test]
fn check_expiration_expires_statements_past_current_time() {
    let (mut store, _temp) = test_store();
    store.set_time(100);

    // Expires at 500, i.e. before the later clock value of 1000.
    let mut short_lived = statement(1, 1, None, 100);
    short_lived.set_expiry_from_parts(500, 1);
    let short_hash = short_lived.hash();
    store.submit(short_lived, StatementSource::Network);

    // Keeps the helper's default expiry of `u32::MAX`.
    let long_lived = statement(2, 1, None, 100);
    let long_hash = long_lived.hash();
    store.submit(long_lived, StatementSource::Network);

    assert_eq!(store.index.read().entries.len(), 2);

    store.enforce_limits();
    assert!(!store.index.read().accounts_to_check_for_expiry_stmts.is_empty());

    store.set_time(1000);
    store.enforce_limits();

    let index = store.index.read();
    assert!(index.expired.contains_key(&short_hash), "Expired statement should be marked");
    assert!(
        !index.entries.contains_key(&short_hash),
        "Expired statement should be removed from entries"
    );
    assert!(
        index.entries.contains_key(&long_hash),
        "Valid statement should still be in entries"
    );
    assert!(!index.expired.contains_key(&long_hash), "Valid statement should not be expired");
}
/// After a pass that expires every statement, the list of accounts to check
/// is empty again.
#[test]
fn check_expiration_removes_checked_accounts_from_list_when_expiring() {
    let (mut store, _temp) = test_store();
    store.set_time(100);

    // Three accounts, each with one statement expiring at 200.
    for acc_id in 1..=3u64 {
        let mut stmt = statement(acc_id, 1, None, 100);
        stmt.set_expiry_from_parts(200, 1);
        store.submit(stmt, StatementSource::Network);
    }

    store.enforce_limits();
    let pending = store.index.read().accounts_to_check_for_expiry_stmts.len();
    assert_eq!(pending, 3, "Should have 3 accounts to check");

    store.set_time(300);
    store.enforce_limits();

    assert!(
        store.index.read().accounts_to_check_for_expiry_stmts.is_empty(),
        "All accounts should have been checked and removed after expiration"
    );
    assert_eq!(store.index.read().expired.len(), 3);
    assert_eq!(store.index.read().entries.len(), 0);
}
/// The list of accounts to check is drained by the next `enforce_limits`
/// pass even when no statement is due to expire.
#[test]
fn check_expiration_truncates_list_even_when_nothing_expires() {
    let (mut store, _temp) = test_store();
    store.set_time(1000);

    let account_ids = 1..=5u64;
    for acc_id in account_ids {
        store.submit(statement(acc_id, 1, None, 100), StatementSource::Network);
    }

    // First pass fills the list with all five accounts.
    store.enforce_limits();
    assert_eq!(store.index.read().accounts_to_check_for_expiry_stmts.len(), 5);

    // Second pass checks them all.
    store.enforce_limits();
    assert!(
        store.index.read().accounts_to_check_for_expiry_stmts.is_empty(),
        "List should be empty after all accounts have been checked"
    );

    assert_eq!(store.index.read().expired.len(), 0);
    assert_eq!(store.index.read().entries.len(), 5);
}
// Three statements of the same account (42) with expiries 200, 300 and 500 are expired
// one at a time as the clock is advanced to 250, 400 and 600.
// Every step calls `enforce_limits` twice: once before and once after moving the clock.
// NOTE(review): the first call of each pair presumably refills the list of accounts to
// check so that the second call processes account 42 -- confirm against `enforce_limits`.
#[test]
fn check_expiration_handles_multiple_statements_per_account() {
let (mut store, _temp) = test_store();
store.set_time(100);
// Distinct channels (1, 2, 3) and priorities (1, 2, 3) for the three statements.
let mut stmt1 = statement(42, 1, Some(1), 100);
stmt1.set_expiry_from_parts(200, 1); let hash1 = stmt1.hash();
store.submit(stmt1, StatementSource::Network);
let mut stmt2 = statement(42, 2, Some(2), 100);
stmt2.set_expiry_from_parts(300, 2); let hash2 = stmt2.hash();
store.submit(stmt2, StatementSource::Network);
let mut stmt3 = statement(42, 3, Some(3), 100);
stmt3.set_expiry_from_parts(500, 3); let hash3 = stmt3.hash();
store.submit(stmt3, StatementSource::Network);
assert_eq!(store.index.read().entries.len(), 3);
// Step 1: time 250 is past stmt1's expiry only.
store.enforce_limits();
store.set_time(250);
store.enforce_limits();
{
let index = store.index.read();
assert!(index.expired.contains_key(&hash1), "stmt1 should be expired");
assert!(!index.expired.contains_key(&hash2), "stmt2 should not be expired yet");
assert!(!index.expired.contains_key(&hash3), "stmt3 should not be expired yet");
assert_eq!(index.entries.len(), 2);
}
// Step 2: time 400 is past stmt2's expiry as well.
store.enforce_limits();
store.set_time(400);
store.enforce_limits();
{
let index = store.index.read();
assert!(index.expired.contains_key(&hash1));
assert!(index.expired.contains_key(&hash2), "stmt2 should be expired");
assert!(!index.expired.contains_key(&hash3), "stmt3 should not be expired yet");
assert_eq!(index.entries.len(), 1);
}
// Step 3: time 600 is past all three expiries.
store.enforce_limits();
store.set_time(600);
store.enforce_limits();
{
let index = store.index.read();
assert!(index.expired.contains_key(&hash1));
assert!(index.expired.contains_key(&hash2));
assert!(index.expired.contains_key(&hash3), "stmt3 should be expired");
assert_eq!(index.entries.len(), 0);
}
}
/// Two `enforce_limits` passes leave a statement that is not due to expire
/// exactly where it was.
#[test]
fn check_expiration_does_nothing_when_no_expired_statements() {
    let (mut store, _temp) = test_store();
    store.set_time(1000);

    let long_lived = statement(1, 1, None, 100);
    let long_hash = long_lived.hash();
    store.submit(long_lived, StatementSource::Network);

    for _pass in 0..2 {
        store.enforce_limits();
    }

    let index = store.index.read();
    assert!(index.entries.contains_key(&long_hash));
    assert!(!index.expired.contains_key(&long_hash));
    assert_eq!(index.entries.len(), 1);
    assert_eq!(index.expired.len(), 0);
}
/// When an account's only statement expires, the account record disappears
/// and the total size drops to zero.
#[test]
fn check_expiration_correctly_updates_account_data() {
    let (mut store, _temp) = test_store();
    store.set_time(100);

    let mut short_lived = statement(1, 1, Some(1), 100);
    short_lived.set_expiry_from_parts(200, 1);
    let short_hash = short_lived.hash();
    store.submit(short_lived, StatementSource::Network);

    let owner = account(1);
    {
        let before = store.index.read();
        assert!(before.accounts.contains_key(&owner));
        assert_eq!(before.total_size, 100);
    }

    store.enforce_limits();
    store.set_time(300);
    store.enforce_limits();

    {
        let after = store.index.read();
        assert!(
            !after.accounts.contains_key(&owner),
            "Account should be removed when all its statements expire"
        );
        assert_eq!(after.total_size, 0, "Total size should be zero");
        assert!(after.expired.contains_key(&short_hash));
    }
}
/// Expiring a statement also removes it from the topic and decryption-key
/// lookup maps.
#[test]
fn check_expiration_clears_topic_and_key_indexes() {
    let (mut store, _temp) = test_store();
    store.set_time(100);

    let the_topic = topic(42);
    let the_key = dec_key(7);

    let mut short_lived = statement(1, 1, Some(1), 100);
    short_lived.set_expiry_from_parts(200, 1);
    short_lived.set_topic(0, the_topic);
    short_lived.set_decryption_key(the_key);
    let short_hash = short_lived.hash();
    store.submit(short_lived, StatementSource::Network);

    // Both lookup maps reference the statement right after submission.
    {
        let before = store.index.read();
        let in_topic = before.by_topic.get(&the_topic).map_or(false, |s| s.contains(&short_hash));
        assert!(in_topic);
        let in_key =
            before.by_dec_key.get(&Some(the_key)).map_or(false, |s| s.contains(&short_hash));
        assert!(in_key);
    }

    store.enforce_limits();
    store.set_time(300);
    store.enforce_limits();

    // A missing set and an empty set both count as cleared.
    {
        let after = store.index.read();
        assert!(
            after.by_topic.get(&the_topic).map_or(true, |s| s.is_empty()),
            "Topic index should be cleared"
        );
        assert!(
            after.by_dec_key.get(&Some(the_key)).map_or(true, |s| s.is_empty()),
            "Decryption key index should be cleared"
        );
        assert!(after.expired.contains_key(&short_hash));
    }
}
/// `enforce_limits` on a store without statements leaves every collection
/// empty.
#[test]
fn check_expiration_handles_empty_store() {
    let (mut store, _temp) = test_store();
    store.set_time(1000);

    for _pass in 0..2 {
        store.enforce_limits();
    }

    assert!(store.index.read().accounts_to_check_for_expiry_stmts.is_empty());
    assert_eq!(store.index.read().entries.len(), 0);
    assert_eq!(store.index.read().expired.len(), 0);
}
/// A statement accepted with an expiry one unit ahead of the clock (1001 vs.
/// 1000) is expired once the clock has advanced to 2000.
#[test]
fn check_expiration_expires_properly_formatted_statements() {
    let (mut store, _temp) = test_store();
    store.set_time(1000);

    let mut short_lived = statement(1, 1, None, 100);
    short_lived.set_expiry_from_parts(1001, 1);
    let short_hash = short_lived.hash();
    store.submit(short_lived, StatementSource::Network);
    assert_eq!(store.index.read().entries.len(), 1);

    store.enforce_limits();
    store.set_time(2000);
    store.enforce_limits();

    let index = store.index.read();
    assert!(
        !index.entries.contains_key(&short_hash),
        "Statement should be removed from entries after expiration"
    );
    assert!(index.expired.contains_key(&short_hash), "Statement should be in expired list");
}
// Checks that expiry is reflected on disk as well as in memory: the statement
// leaves `col::STATEMENTS`, and an entry for its hash appears in `col::EXPIRED`.
#[test]
fn check_expiration_updates_database_columns() {
    let (mut store, _temp) = test_store();
    store.set_time(100);

    let mut stmt = statement(1, 1, None, 100);
    stmt.set_expiry_from_parts(200, 1);
    let hash = stmt.hash();
    // The statement is not needed after submission, so it is moved rather than cloned.
    store.submit(stmt, StatementSource::Network);

    let stored_before = store.db.get(col::STATEMENTS, &hash).unwrap();
    assert!(stored_before.is_some(), "Statement should be in col::STATEMENTS after submit");

    // One pass before and one pass after the clock moves to 300.
    // NOTE(review): presumably expiry is swept in two phases - confirm against
    // `enforce_limits`.
    store.enforce_limits();
    store.set_time(300);
    store.enforce_limits();

    // In-memory view first; the guard is scoped so it is released before the
    // database reads below.
    {
        let index = store.index.read();
        assert!(
            !index.entries.contains_key(&hash),
            "Statement should be removed from in-memory entries"
        );
        assert!(
            index.expired.contains_key(&hash),
            "Statement should be in in-memory expired map"
        );
    }

    // Then the on-disk view.
    let stored_after = store.db.get(col::STATEMENTS, &hash).unwrap();
    assert!(
        stored_after.is_none(),
        "Statement should be removed from col::STATEMENTS after expiration"
    );
    let expired_record = store.db.get(col::EXPIRED, &hash).unwrap();
    assert!(expired_record.is_some(), "Expiration info should be written to col::EXPIRED");
}
// Five statements are inserted directly into the index for account 4; after
// `enforce_limits` only four may remain, and the one created with the smallest
// second `statement` argument (10) is the one evicted into `expired`.
// NOTE(review): presumably the test allowance for account 4 is four statements -
// confirm against the test client.
#[test]
fn enforce_allowances_evicts_excess_statements() {
    let (mut store, _temp) = test_store();
    store.set_time(0);

    // Same account and size (100 bytes each), increasing priority.
    let statements = [10, 20, 30, 40, 50].map(|priority| statement(4, priority, None, 100));
    let lowest_hash = statements[0].hash();
    let highest_hash = statements[4].hash();

    // Insert through the index directly instead of going through `submit`.
    {
        let mut idx = store.index.write();
        for st in &statements {
            idx.insert_new(st.hash(), account(4), st, false);
        }
    }
    {
        let idx = store.index.read();
        assert_eq!(idx.entries.len(), 5);
        assert_eq!(idx.total_size, 500);
    }

    store.enforce_limits();
    store.enforce_limits();

    let idx = store.index.read();
    assert_eq!(idx.entries.len(), 4, "Should have 4 statements after eviction");
    let lowest_kept = idx.entries.contains_key(&lowest_hash);
    assert!(!lowest_kept, "Lowest priority should be evicted");
    assert!(idx.entries.contains_key(&highest_hash), "Highest priority should remain");
    assert_eq!(idx.total_size, 400);
    assert!(idx.expired.contains_key(&lowest_hash));
}
// Statements inserted for account 0 must all be evicted by `enforce_limits`, the
// account entry removed, and both hashes recorded in `expired`.
// NOTE(review): the test name implies account 0 has no allowance in the test
// client - confirm.
#[test]
fn enforce_allowances_evicts_all_when_no_allowance_found() {
    let (mut store, _temp) = test_store();
    store.set_time(0);

    let first = statement(0, 10, None, 100);
    let second = statement(0, 20, None, 150);
    let (first_hash, second_hash) = (first.hash(), second.hash());

    // Insert through the index directly instead of going through `submit`.
    let mut writer = store.index.write();
    writer.insert_new(first_hash, account(0), &first, false);
    writer.insert_new(second_hash, account(0), &second, false);
    // Release the write guard before reading or enforcing limits.
    drop(writer);

    assert_eq!(store.index.read().entries.len(), 2);

    store.enforce_limits();
    store.enforce_limits();

    let idx = store.index.read();
    assert_eq!(idx.entries.len(), 0, "All statements should be evicted");
    let account_still_tracked = idx.accounts.contains_key(&account(0));
    assert!(!account_still_tracked, "Account should be removed");
    assert!(idx.expired.contains_key(&first_hash));
    assert!(idx.expired.contains_key(&second_hash));
}
// Two 600-byte statements (1200 bytes in total) are inserted for account 2; after
// `enforce_limits` only the higher-priority one may remain and `total_size` must
// drop to 600.
// NOTE(review): presumably the size allowance for account 2 in the test client
// sits between 600 and 1200 bytes - confirm.
#[test]
fn enforce_allowances_based_on_size() {
    let (mut store, _temp) = test_store();
    store.set_time(0);

    let lower = statement(2, 10, None, 600);
    let higher = statement(2, 20, None, 600);
    let (lower_hash, higher_hash) = (lower.hash(), higher.hash());

    // Insert through the index directly instead of going through `submit`.
    let mut writer = store.index.write();
    writer.insert_new(lower_hash, account(2), &lower, false);
    writer.insert_new(higher_hash, account(2), &higher, false);
    // Release the write guard before reading or enforcing limits.
    drop(writer);

    assert_eq!(store.index.read().total_size, 1200);

    store.enforce_limits();
    store.enforce_limits();

    let idx = store.index.read();
    assert_eq!(idx.entries.len(), 1);
    assert!(idx.entries.contains_key(&higher_hash), "Higher priority should remain");
    let lower_kept = idx.entries.contains_key(&lower_hash);
    assert!(!lower_kept, "Lower priority should be evicted");
    assert_eq!(idx.total_size, 600);
}
}