Skip to main content

soil_network/statement_store/
mod.rs

1// This file is part of Soil.
2
3// Copyright (C) Soil contributors.
4// Copyright (C) Parity Technologies (UK) Ltd.
5// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
6
7//! Disk-backed statement store.
8//!
9//! This module contains an implementation of `soil_statement_store::StatementStore` which is backed
10//! by a database.
11//!
12//! Constraint management.
13//!
14//! The statement store validates statements using node-side signature verification and
15//! static runtime allowance limits.
16//! The following constraints are then checked:
17//! * For a given account id, there may be at most `max_count` statements with `max_size` total data
18//!   size. To satisfy this, statements for this account ID are removed from the store starting with
19//!   the lowest priority until a constraint is satisfied.
20//! * There may not be more than `MAX_TOTAL_STATEMENTS` total statements with `MAX_TOTAL_SIZE` size.
21//!   To satisfy this, statements are removed from the store starting with the lowest
22//!   `global_priority` until a constraint is satisfied.
23//!
24//! When a new statement is inserted that would not satisfy constraints in the first place, no
25//! statements are deleted and `Ignored` result is returned.
26//! The order in which statements with the same priority are deleted is unspecified.
27//!
28//! Statement expiration.
29//!
30//! Each time a statement is removed from the store (Either evicted by higher priority statement or
31//! explicitly with the `remove` function) the statement is marked as expired. Expired statements
32//! can't be added to the store for `Options::purge_after_sec` seconds. This is to prevent old
33//! statements from being propagated on the network.
34
35#![warn(missing_docs)]
36#![warn(unused_extern_crates)]
37
38mod metrics;
39mod subscription;
40
41use self::subscription::{SubscriptionStatementsStream, SubscriptionsHandle};
42use futures::FutureExt;
43use metrics::MetricsLink as PrometheusMetrics;
44use parking_lot::{lock_api::RwLockUpgradableReadGuard, RwLock};
45use soil_prometheus::Registry as PrometheusRegistry;
46use soil_client::blockchain::HeaderBackend;
47use soil_client::client_api::{backend::StorageProvider, Backend, StorageKey};
48use soil_client::keystore::LocalKeystore;
49use soil_statement_store::{
50	runtime_api::{StatementSource, StatementStoreExt},
51	AccountId, BlockHash, Channel, DecryptionKey, FilterDecision, Hash, InvalidReason,
52	OptimizedTopicFilter, Proof, RejectionReason, Result, SignatureVerificationResult, Statement,
53	StatementAllowance, StatementEvent, SubmitResult, Topic,
54};
55pub use soil_statement_store::{Error, StatementStore, MAX_TOPICS};
56use std::{
57	collections::{BTreeMap, HashMap, HashSet},
58	sync::Arc,
59	time::{Duration, Instant},
60};
61pub use subscription::StatementStoreSubscriptionApi;
62use subsoil::core::{
63	crypto::UncheckedFrom, hexdisplay::HexDisplay, traits::SpawnNamed, Decode, Encode,
64};
65use subsoil::runtime::traits::Block as BlockT;
66
67const KEY_VERSION: &[u8] = b"version".as_slice();
68const CURRENT_VERSION: u32 = 1;
69
70const LOG_TARGET: &str = "statement-store";
71
72/// The amount of time an expired statement is kept before it is removed from the store entirely.
73pub const DEFAULT_PURGE_AFTER_SEC: u64 = 2 * 24 * 60 * 60; // 48h
74/// The maximum number of statements the statement store can hold.
75pub const DEFAULT_MAX_TOTAL_STATEMENTS: usize = 4 * 1024 * 1024; // ~4 million
76/// The maximum amount of data the statement store can hold, regardless of the number of
77/// statements from which the data originates.
78pub const DEFAULT_MAX_TOTAL_SIZE: usize = 2 * 1024 * 1024 * 1024; // 2GiB
79/// The maximum size of a single statement in bytes.
80/// Accounts for the 1-byte vector length prefix when statements are gossiped as `Vec<Statement>`.
81pub const MAX_STATEMENT_SIZE: usize =
82	crate::statement::config::MAX_STATEMENT_NOTIFICATION_SIZE as usize - 1;
83
84/// Maximum number of statements to expire in a single iteration.
85const MAX_EXPIRY_STATEMENTS_PER_ITERATION: usize = 10_000;
86/// Maximum number of accounts to check for expiry in a single iteration.
87const MAX_EXPIRY_ACCOUNTS_PER_ITERATION: usize = 10_000;
88/// Maximum time in milliseconds to spend checking for expiry in a single iteration.
89const MAX_EXPIRY_TIME_PER_ITERATION: Duration = Duration::from_millis(100);
90
91/// Number of subscription filter worker tasks.
92const NUM_FILTER_WORKERS: usize = 1;
93
94const MAINTENANCE_PERIOD: std::time::Duration = std::time::Duration::from_secs(29);
95
96// Period between enforcing limits (checking for expired statements and making sure statements stay
97// within allowances). Different from maintenance period to avoid keeping the lock for too long for
98// maintenance tasks.
99const ENFORCE_LIMITS_PERIOD: std::time::Duration = std::time::Duration::from_secs(31);
100
101mod col {
102	pub const META: u8 = 0;
103	pub const STATEMENTS: u8 = 1;
104	pub const EXPIRED: u8 = 2;
105
106	pub const COUNT: u8 = 3;
107}
108
109#[derive(Eq, PartialEq, Debug, Ord, PartialOrd, Clone, Copy)]
110struct Expiry(u64);
111
112#[derive(PartialEq, Eq)]
113struct PriorityKey {
114	hash: Hash,
115	expiry: Expiry,
116}
117
118impl PartialOrd for PriorityKey {
119	fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
120		Some(self.cmp(other))
121	}
122}
123
124impl Ord for PriorityKey {
125	fn cmp(&self, other: &Self) -> std::cmp::Ordering {
126		self.expiry.cmp(&other.expiry).then_with(|| self.hash.cmp(&other.hash))
127	}
128}
129
130#[derive(PartialEq, Eq)]
131struct ChannelEntry {
132	hash: Hash,
133	expiry: Expiry,
134}
135
136#[derive(Default)]
137struct StatementsForAccount {
138	// Statements ordered by priority.
139	by_priority: BTreeMap<PriorityKey, (Option<Channel>, usize)>,
140	// Channel to statement map. Only one statement per channel is allowed.
141	channels: HashMap<Channel, ChannelEntry>,
142	// Sum of all `Data` field sizes.
143	data_size: usize,
144}
145
146impl StatementsForAccount {
147	/// Returns an iterator over statements that have expired by `current_time`.
148	fn expired_by_iter(
149		&self,
150		current_time: u64,
151	) -> impl Iterator<Item = (&PriorityKey, &(Option<Channel>, usize))> {
152		let range = PriorityKey { hash: Hash::default(), expiry: Expiry(0) }..PriorityKey {
153			hash: Hash::default(),
154			expiry: Expiry(current_time << 32),
155		};
156		self.by_priority.range(range)
157	}
158}
159
160/// Store configuration
161pub struct Options {
162	/// Maximum statement allowed in the store. Once this limit is reached lower-priority
163	/// statements may be evicted.
164	max_total_statements: usize,
165	/// Maximum total data size allowed in the store. Once this limit is reached lower-priority
166	/// statements may be evicted.
167	max_total_size: usize,
168	/// Number of seconds for which removed statements won't be allowed to be added back in.
169	purge_after_sec: u64,
170}
171
172impl Default for Options {
173	fn default() -> Self {
174		Options {
175			max_total_statements: DEFAULT_MAX_TOTAL_STATEMENTS,
176			max_total_size: DEFAULT_MAX_TOTAL_SIZE,
177			purge_after_sec: DEFAULT_PURGE_AFTER_SEC,
178		}
179	}
180}
181
182#[derive(Default)]
183struct Index {
184	recent: HashSet<Hash>,
185	by_topic: HashMap<Topic, HashSet<Hash>>,
186	by_dec_key: HashMap<Option<DecryptionKey>, HashSet<Hash>>,
187	topics_and_keys: HashMap<Hash, ([Option<Topic>; MAX_TOPICS], Option<DecryptionKey>)>,
188	entries: HashMap<Hash, (AccountId, Expiry, usize)>,
189	expired: HashMap<Hash, u64>, // Value is expiration timestamp.
190	accounts: HashMap<AccountId, StatementsForAccount>,
191	accounts_to_check_for_expiry_stmts: Vec<AccountId>,
192	options: Options,
193	total_size: usize,
194}
195
196struct ClientWrapper<Block, Client, BE> {
197	client: Arc<Client>,
198	_block: std::marker::PhantomData<Block>,
199	_backend: std::marker::PhantomData<BE>,
200}
201
202impl<Block, Client, BE> ClientWrapper<Block, Client, BE>
203where
204	Block: BlockT,
205	Block::Hash: From<BlockHash>,
206	BE: Backend<Block> + 'static,
207	Client: HeaderBackend<Block> + StorageProvider<Block, BE> + Send + Sync + 'static,
208{
209	fn read_allowance(
210		&self,
211		account_id: &AccountId,
212		block_hash: Option<Block::Hash>,
213	) -> Result<Option<StatementAllowance>> {
214		use soil_statement_store::{statement_allowance_key, StatementAllowance};
215
216		let block_hash = block_hash.unwrap_or(self.client.info().finalized_hash);
217		let key = statement_allowance_key(account_id);
218		let storage_key = StorageKey(key);
219		self.client
220			.storage(block_hash, &storage_key)
221			.map_err(|e| Error::Storage(format!("Failed to read allowance: {:?}", e)))?
222			.map(|value| {
223				StatementAllowance::decode(&mut &value.0[..])
224					.map_err(|e| Error::Decode(format!("Failed to decode allowance: {:?}", e)))
225			})
226			.transpose()
227	}
228}
229
230/// Statement store.
231pub struct Store {
232	db: parity_db::Db,
233	index: RwLock<Index>,
234	read_allowance_fn: Box<
235		dyn Fn(&AccountId, Option<BlockHash>) -> Result<Option<StatementAllowance>> + Send + Sync,
236	>,
237	subscription_manager: SubscriptionsHandle,
238	keystore: Arc<LocalKeystore>,
239	// Used for testing
240	time_override: Option<u64>,
241	metrics: PrometheusMetrics,
242}
243
244enum IndexQuery {
245	Unknown,
246	Exists,
247	Expired,
248}
249
250impl Index {
251	fn new(options: Options) -> Index {
252		Index { options, ..Default::default() }
253	}
254
255	fn insert_new(
256		&mut self,
257		hash: Hash,
258		account: AccountId,
259		statement: &Statement,
260		is_recent: bool,
261	) {
262		let mut all_topics = [None; MAX_TOPICS];
263		let mut nt = 0;
264		while let Some(t) = statement.topic(nt) {
265			self.by_topic.entry(t).or_default().insert(hash);
266			all_topics[nt] = Some(t);
267			nt += 1;
268		}
269		let key = statement.decryption_key();
270		self.by_dec_key.entry(key).or_default().insert(hash);
271		if nt > 0 || key.is_some() {
272			self.topics_and_keys.insert(hash, (all_topics, key));
273		}
274		let expiry = Expiry(statement.expiry());
275		self.entries.insert(hash, (account, expiry, statement.data_len()));
276		if is_recent {
277			self.recent.insert(hash);
278		}
279		self.total_size += statement.data_len();
280		let account_info = self.accounts.entry(account).or_default();
281		account_info.data_size += statement.data_len();
282		if let Some(channel) = statement.channel() {
283			account_info.channels.insert(channel, ChannelEntry { hash, expiry });
284		}
285		account_info
286			.by_priority
287			.insert(PriorityKey { hash, expiry }, (statement.channel(), statement.data_len()));
288	}
289
290	fn query(&self, hash: &Hash) -> IndexQuery {
291		if self.entries.contains_key(hash) {
292			return IndexQuery::Exists;
293		}
294		if self.expired.contains_key(hash) {
295			return IndexQuery::Expired;
296		}
297		IndexQuery::Unknown
298	}
299
300	fn insert_expired(&mut self, hash: Hash, timestamp: u64) {
301		self.expired.insert(hash, timestamp);
302	}
303
304	fn iterate_with(
305		&self,
306		key: Option<DecryptionKey>,
307		topic: &OptimizedTopicFilter,
308		f: impl FnMut(&Hash) -> Result<()>,
309	) -> Result<()> {
310		match topic {
311			OptimizedTopicFilter::Any => self.iterate_with_any(key, f),
312			OptimizedTopicFilter::MatchAll(topics) => {
313				self.iterate_with_match_all(key, topics.iter(), f)
314			},
315			OptimizedTopicFilter::MatchAny(topics) => {
316				self.iterate_with_match_any(key, topics.iter(), f)
317			},
318		}
319	}
320
321	fn iterate_with_match_any<'a>(
322		&self,
323		key: Option<DecryptionKey>,
324		match_any_topics: impl ExactSizeIterator<Item = &'a Topic>,
325		mut f: impl FnMut(&Hash) -> Result<()>,
326	) -> Result<()> {
327		let Some(key_set) = self.by_dec_key.get(&key).filter(|k| !k.is_empty()) else {
328			return Ok(());
329		};
330
331		for t in match_any_topics {
332			let set = self.by_topic.get(t);
333
334			for item in set.iter().flat_map(|set| set.iter()) {
335				if key_set.contains(item) {
336					log::trace!(
337						target: LOG_TARGET,
338						"Iterating by topic/key: statement {:?}",
339						HexDisplay::from(item)
340					);
341					f(item)?
342				}
343			}
344		}
345		Ok(())
346	}
347
348	fn iterate_with_any(
349		&self,
350		key: Option<DecryptionKey>,
351		mut f: impl FnMut(&Hash) -> Result<()>,
352	) -> Result<()> {
353		let key_set = self.by_dec_key.get(&key);
354		if key_set.map_or(true, |s| s.is_empty()) {
355			// Key does not exist in the index.
356			return Ok(());
357		}
358
359		for item in key_set.map(|hashes| hashes.iter()).into_iter().flatten() {
360			f(item)?
361		}
362		Ok(())
363	}
364
365	fn iterate_with_match_all<'a>(
366		&self,
367		key: Option<DecryptionKey>,
368		match_all_topics: impl ExactSizeIterator<Item = &'a Topic>,
369		mut f: impl FnMut(&Hash) -> Result<()>,
370	) -> Result<()> {
371		let empty = HashSet::new();
372		let mut sets: [&HashSet<Hash>; MAX_TOPICS + 1] = [&empty; MAX_TOPICS + 1];
373		let num_topics = match_all_topics.len();
374		if num_topics > MAX_TOPICS {
375			return Ok(());
376		}
377		let key_set = self.by_dec_key.get(&key);
378		if key_set.map_or(true, |s| s.is_empty()) {
379			// Key does not exist in the index.
380			return Ok(());
381		}
382		sets[0] = key_set.expect("Function returns if key_set is None");
383		for (i, t) in match_all_topics.enumerate() {
384			let set = self.by_topic.get(t);
385			if set.map_or(0, |s| s.len()) == 0 {
386				// At least one of the match_all_topics does not exist in the index.
387				return Ok(());
388			}
389			sets[i + 1] = set.expect("Function returns if set is None");
390		}
391		let sets = &mut sets[0..num_topics + 1];
392		// Start with the smallest topic set or the key set.
393		sets.sort_by_key(|s| s.len());
394		for item in sets[0] {
395			if sets[1..].iter().all(|set| set.contains(item)) {
396				log::trace!(
397					target: LOG_TARGET,
398					"Iterating by topic/key: statement {:?}",
399					HexDisplay::from(item)
400				);
401				f(item)?
402			}
403		}
404		Ok(())
405	}
406
407	fn maintain(&mut self, current_time: u64) -> Vec<Hash> {
408		// Purge previously expired messages.
409		let mut purged = Vec::new();
410		self.expired.retain(|hash, timestamp| {
411			if *timestamp + self.options.purge_after_sec <= current_time {
412				purged.push(*hash);
413				log::trace!(target: LOG_TARGET, "Purged statement {:?}", HexDisplay::from(hash));
414				false
415			} else {
416				true
417			}
418		});
419		purged
420	}
421
422	fn take_recent(&mut self) -> HashSet<Hash> {
423		std::mem::take(&mut self.recent)
424	}
425
426	fn make_expired(&mut self, hash: &Hash, current_time: u64) -> bool {
427		if let Some((account, expiry, len)) = self.entries.remove(hash) {
428			self.total_size -= len;
429			if let Some((topics, key)) = self.topics_and_keys.remove(hash) {
430				for t in topics.into_iter().flatten() {
431					if let std::collections::hash_map::Entry::Occupied(mut set) =
432						self.by_topic.entry(t)
433					{
434						set.get_mut().remove(hash);
435						if set.get().is_empty() {
436							set.remove_entry();
437						}
438					}
439				}
440				if let std::collections::hash_map::Entry::Occupied(mut set) =
441					self.by_dec_key.entry(key)
442				{
443					set.get_mut().remove(hash);
444					if set.get().is_empty() {
445						set.remove_entry();
446					}
447				}
448			}
449			let _ = self.recent.remove(hash);
450			self.expired.insert(*hash, current_time);
451			if let std::collections::hash_map::Entry::Occupied(mut account_rec) =
452				self.accounts.entry(account)
453			{
454				let key = PriorityKey { hash: *hash, expiry };
455				if let Some((channel, len)) = account_rec.get_mut().by_priority.remove(&key) {
456					account_rec.get_mut().data_size -= len;
457					if let Some(channel) = channel {
458						account_rec.get_mut().channels.remove(&channel);
459					}
460				}
461				if account_rec.get().by_priority.is_empty() {
462					account_rec.remove_entry();
463				}
464			}
465			log::trace!(target: LOG_TARGET, "Expired statement {:?}", HexDisplay::from(hash));
466			true
467		} else {
468			false
469		}
470	}
471
472	fn insert(
473		&mut self,
474		hash: Hash,
475		statement: &Statement,
476		account: &AccountId,
477		validation: &StatementAllowance,
478		current_time: u64,
479	) -> std::result::Result<HashSet<Hash>, RejectionReason> {
480		let statement_len = statement.data_len();
481		if statement_len > validation.max_size as usize {
482			log::debug!(
483				target: LOG_TARGET,
484				"Ignored oversize message: {:?} ({} bytes)",
485				HexDisplay::from(&hash),
486				statement_len,
487			);
488			return Err(RejectionReason::DataTooLarge {
489				submitted_size: statement_len,
490				available_size: validation.max_size as usize,
491			});
492		}
493
494		let mut evicted = HashSet::new();
495		let mut would_free_size = 0;
496		let expiry = Expiry(statement.expiry());
497		let (max_size, max_count) = (validation.max_size as usize, validation.max_count as usize);
498		// It may happen that we can't delete enough lower priority messages
499		// to satisfy size constraints. We check for that before deleting anything,
500		// taking into account channel message replacement.
501		if let Some(account_rec) = self.accounts.get(account) {
502			if let Some(channel) = statement.channel() {
503				if let Some(channel_record) = account_rec.channels.get(&channel) {
504					if expiry <= channel_record.expiry {
505						// Trying to replace channel message with lower expiry.
506						log::debug!(
507							target: LOG_TARGET,
508							"Ignored lower priority channel message: {:?} {:?} <= {:?}",
509							HexDisplay::from(&hash),
510							expiry,
511							channel_record.expiry,
512						);
513						return Err(RejectionReason::ChannelPriorityTooLow {
514							submitted_expiry: expiry.0,
515							min_expiry: channel_record.expiry.0,
516						});
517					} else {
518						// Would replace channel message. Still need to check for size constraints
519						// below.
520						log::debug!(
521							target: LOG_TARGET,
522							"Replacing higher priority channel message: {:?} ({:?}) > {:?} ({:?})",
523							HexDisplay::from(&hash),
524							expiry,
525							HexDisplay::from(&channel_record.hash),
526							channel_record.expiry,
527						);
528						let key = PriorityKey {
529							hash: channel_record.hash,
530							expiry: channel_record.expiry,
531						};
532						if let Some((_channel, len)) = account_rec.by_priority.get(&key) {
533							would_free_size += *len;
534							evicted.insert(channel_record.hash);
535						}
536					}
537				}
538			}
539			// Check if we can evict enough lower priority statements to satisfy constraints
540			for (entry, (_, len)) in account_rec.by_priority.iter() {
541				if (account_rec.data_size - would_free_size + statement_len <= max_size)
542					&& account_rec.by_priority.len() + 1 - evicted.len() <= max_count
543				{
544					// Satisfied
545					break;
546				}
547				if evicted.contains(&entry.hash) {
548					// Already accounted for above
549					continue;
550				}
551				if entry.expiry >= expiry {
552					log::debug!(
553						target: LOG_TARGET,
554						"Ignored message due to constraints {:?} {:?} < {:?}",
555						HexDisplay::from(&hash),
556						expiry,
557						entry.expiry,
558					);
559					return Err(RejectionReason::AccountFull {
560						submitted_expiry: expiry.0,
561						min_expiry: entry.expiry.0,
562					});
563				}
564				evicted.insert(entry.hash);
565				would_free_size += len;
566			}
567		}
568		// Now check global constraints as well.
569		if !((self.total_size - would_free_size + statement_len <= self.options.max_total_size)
570			&& self.entries.len() + 1 - evicted.len() <= self.options.max_total_statements)
571		{
572			log::debug!(
573				target: LOG_TARGET,
574				"Ignored statement {} because the store is full (size={}, count={})",
575				HexDisplay::from(&hash),
576				self.total_size,
577				self.entries.len(),
578			);
579			return Err(RejectionReason::StoreFull);
580		}
581
582		for h in &evicted {
583			self.make_expired(h, current_time);
584		}
585		self.insert_new(hash, *account, statement, true);
586		Ok(evicted)
587	}
588}
589
590impl Store {
591	/// Create a new shared store instance. There should only be one per process.
592	/// `path` will be used to open a statement database or create a new one if it does not exist.
593	pub fn new_shared<Block, Client, BE>(
594		path: &std::path::Path,
595		options: Options,
596		client: Arc<Client>,
597		keystore: Arc<LocalKeystore>,
598		prometheus: Option<&PrometheusRegistry>,
599		task_spawner: Box<dyn SpawnNamed>,
600	) -> Result<Arc<Store>>
601	where
602		Block: BlockT,
603		Block::Hash: From<BlockHash>,
604		BE: Backend<Block> + 'static,
605		Client: HeaderBackend<Block> + StorageProvider<Block, BE> + Send + Sync + 'static,
606	{
607		let store =
608			Arc::new(Self::new(path, options, client, keystore, prometheus, task_spawner.clone())?);
609
610		// Perform periodic statement store maintenance
611		let worker_store = store.clone();
612		task_spawner.spawn(
613			"statement-store-maintenance",
614			Some("statement-store"),
615			Box::pin(async move {
616				let mut maintenance_interval = tokio::time::interval(MAINTENANCE_PERIOD);
617				let mut enforce_limits_interval = tokio::time::interval(ENFORCE_LIMITS_PERIOD);
618				loop {
619					futures::select! {
620						_ = maintenance_interval.tick().fuse() => {worker_store.maintain();}
621						_ = enforce_limits_interval.tick().fuse() => {worker_store.enforce_limits();}
622					}
623				}
624			}),
625		);
626
627		Ok(store)
628	}
629
630	/// Create a new instance.
631	/// `path` will be used to open a statement database or create a new one if it does not exist.
632	#[doc(hidden)]
633	pub fn new<Block, Client, BE>(
634		path: &std::path::Path,
635		options: Options,
636		client: Arc<Client>,
637		keystore: Arc<LocalKeystore>,
638		prometheus: Option<&PrometheusRegistry>,
639		task_spawner: Box<dyn SpawnNamed>,
640	) -> Result<Store>
641	where
642		Block: BlockT,
643		Block::Hash: From<BlockHash>,
644		BE: Backend<Block> + 'static,
645		Client: HeaderBackend<Block> + StorageProvider<Block, BE> + Send + Sync + 'static,
646	{
647		let mut path: std::path::PathBuf = path.into();
648		path.push("statements");
649
650		let mut config = parity_db::Options::with_columns(&path, col::COUNT);
651
652		let statement_col = &mut config.columns[col::STATEMENTS as usize];
653		statement_col.ref_counted = false;
654		statement_col.preimage = true;
655		statement_col.uniform = true;
656		let db = parity_db::Db::open_or_create(&config).map_err(|e| Error::Db(e.to_string()))?;
657		match db.get(col::META, &KEY_VERSION).map_err(|e| Error::Db(e.to_string()))? {
658			Some(version) => {
659				let version = u32::from_le_bytes(
660					version
661						.try_into()
662						.map_err(|_| Error::Db("Error reading database version".into()))?,
663				);
664				if version != CURRENT_VERSION {
665					return Err(Error::Db(format!("Unsupported database version: {version}")));
666				}
667			},
668			None => {
669				db.commit([(
670					col::META,
671					KEY_VERSION.to_vec(),
672					Some(CURRENT_VERSION.to_le_bytes().to_vec()),
673				)])
674				.map_err(|e| Error::Db(e.to_string()))?;
675			},
676		}
677
678		let storage_reader =
679			ClientWrapper { client, _block: Default::default(), _backend: Default::default() };
680		let read_allowance_fn =
681			Box::new(move |account_id: &AccountId, block_hash: Option<BlockHash>| {
682				storage_reader.read_allowance(account_id, block_hash.map(Into::into))
683			});
684
685		let store = Store {
686			db,
687			index: RwLock::new(Index::new(options)),
688			read_allowance_fn,
689			keystore,
690			time_override: None,
691			metrics: PrometheusMetrics::new(prometheus),
692			subscription_manager: SubscriptionsHandle::new(
693				task_spawner.clone(),
694				NUM_FILTER_WORKERS,
695			),
696		};
697		store.populate()?;
698		Ok(store)
699	}
700
701	/// Create memory index from the data.
702	// This may be moved to a background thread if it slows startup too much.
703	// This function should only be used on startup. There should be no other DB operations when
704	// iterating the index.
705	fn populate(&self) -> Result<()> {
706		{
707			let mut index = self.index.write();
708			self.db
709				.iter_column_while(col::STATEMENTS, |item| {
710					let statement = item.value;
711					if let Ok(statement) = Statement::decode(&mut statement.as_slice()) {
712						let hash = statement.hash();
713						log::trace!(
714							target: LOG_TARGET,
715							"Statement loaded {:?}",
716							HexDisplay::from(&hash)
717						);
718						if let Some(account_id) = statement.account_id() {
719							index.insert_new(hash, account_id, &statement, false);
720						} else {
721							log::debug!(
722								target: LOG_TARGET,
723								"Error decoding statement loaded from the DB: {:?}",
724								HexDisplay::from(&hash)
725							);
726						}
727					}
728					true
729				})
730				.map_err(|e| Error::Db(e.to_string()))?;
731			self.db
732				.iter_column_while(col::EXPIRED, |item| {
733					let expired_info = item.value;
734					if let Ok((hash, timestamp)) =
735						<(Hash, u64)>::decode(&mut expired_info.as_slice())
736					{
737						log::trace!(
738							target: LOG_TARGET,
739							"Statement loaded (expired): {:?}",
740							HexDisplay::from(&hash)
741						);
742						index.insert_expired(hash, timestamp);
743					}
744					true
745				})
746				.map_err(|e| Error::Db(e.to_string()))?;
747		}
748
749		self.maintain();
750		Ok(())
751	}
752
753	fn collect_statements_locked<R>(
754		&self,
755		key: Option<DecryptionKey>,
756		topic_filter: &OptimizedTopicFilter,
757		index: &Index,
758		result: &mut Vec<R>,
759		mut f: impl FnMut(Statement) -> Option<R>,
760	) -> Result<()> {
761		index.iterate_with(key, topic_filter, |hash| {
762			match self.db.get(col::STATEMENTS, hash).map_err(|e| Error::Db(e.to_string()))? {
763				Some(entry) => {
764					if let Ok(statement) = Statement::decode(&mut entry.as_slice()) {
765						if let Some(data) = f(statement) {
766							result.push(data);
767						}
768					} else {
769						// DB inconsistency
770						log::warn!(
771							target: LOG_TARGET,
772							"Corrupt statement {:?}",
773							HexDisplay::from(hash)
774						);
775					}
776				},
777				None => {
778					// DB inconsistency
779					log::warn!(
780						target: LOG_TARGET,
781						"Missing statement {:?}",
782						HexDisplay::from(hash)
783					);
784				},
785			}
786			Ok(())
787		})?;
788		Ok(())
789	}
790
791	fn collect_statements<R>(
792		&self,
793		key: Option<DecryptionKey>,
794		topic_filter: &OptimizedTopicFilter,
795		f: impl FnMut(Statement) -> Option<R>,
796	) -> Result<Vec<R>> {
797		let mut result = Vec::new();
798		let index = self.index.read();
799		self.collect_statements_locked(key, topic_filter, &index, &mut result, f)?;
800		Ok(result)
801	}
802
803	// Collects expired and over-allowance statement hashes for a single account.
804	fn collect_evictions(
805		&self,
806		account: &AccountId,
807		account_rec: &StatementsForAccount,
808		current_time: u64,
809	) -> Vec<Hash> {
810		let mut to_evict = Vec::new();
811		let mut expired_count = 0usize;
812		let mut expired_size = 0usize;
813		for (key, (_, len)) in account_rec.expired_by_iter(current_time) {
814			to_evict.push(key.hash);
815			expired_count += 1;
816			expired_size += len;
817		}
818
819		// Enforce allowances for remaining (non-expired) statements
820		let allowance = match (self.read_allowance_fn)(account, None) {
821			Ok(Some(allowance)) => allowance,
822			Ok(None) => {
823				log::debug!(
824					target: LOG_TARGET,
825					"No allowance found for account {:?}, treating as zero allowance",
826					HexDisplay::from(account)
827				);
828				StatementAllowance { max_count: 0, max_size: 0 }
829			},
830			Err(e) => {
831				log::error!(target: LOG_TARGET, "Error reading allowance: {:?}", e);
832				// Skip allowance enforcement for this account on error
833				return to_evict;
834			},
835		};
836
837		// Calculate remaining count and size after expiring statements
838		let mut remaining_count = account_rec.by_priority.len() - expired_count;
839		let mut remaining_size = account_rec.data_size - expired_size;
840
841		// Evict lowest priority statements that exceed allowance
842		if remaining_count > allowance.max_count as usize
843			|| remaining_size > allowance.max_size as usize
844		{
845			log::debug!(
846				target: LOG_TARGET,
847				"Account {:?} exceeds allowance: count={}/{}, size={}/{}",
848				HexDisplay::from(account),
849				remaining_count,
850				allowance.max_count,
851				remaining_size,
852				allowance.max_size
853			);
854
855			// Skip expired statements (they're at the beginning due to BTreeMap ordering)
856			for (key, (_, len)) in account_rec.by_priority.iter().skip(expired_count) {
857				if remaining_count <= allowance.max_count as usize
858					&& remaining_size <= allowance.max_size as usize
859				{
860					break;
861				}
862				to_evict.push(key.hash);
863				remaining_count -= 1;
864				remaining_size -= len;
865				log::debug!(
866					target: LOG_TARGET,
867					"Evicting statement {:?} due to allowance enforcement",
868					HexDisplay::from(&key.hash)
869				);
870			}
871		}
872
873		to_evict
874	}
875
876	// Checks for expired statements and enforces allowances, marking violating statements
877	// as expired in the index.
878	//
879	// This function performs incremental checking to avoid blocking the store for too long.
880	// It processes accounts in batches and stops when any of these limits are reached:
881	// - `MAX_EXPIRY_STATEMENTS_PER_ITERATION` statements found to expire/evict
882	// - `MAX_EXPIRY_ACCOUNTS_PER_ITERATION` accounts checked
883	// - `MAX_EXPIRY_TIME_MS_PER_ITERATION` milliseconds elapsed
884	//
885	// The function maintains a list of accounts to check (`accounts_to_check_for_expiry_stmts`).
886	// When this list is empty, it repopulates it with all current accounts and returns early,
887	// deferring the actual check to the next call. This ensures the process eventually covers
888	// all accounts across multiple invocations.
889	//
890	// Statements are considered expired when their priority (which encodes the expiration
891	// timestamp in the upper 32 bits) is less than the current timestamp.
892	fn enforce_limits(&self) {
893		let _start_check_expiration_timer = self.metrics.start_check_expiration_timer();
894		let current_time = self.timestamp();
895
896		let (to_evict, num_accounts_checked) = {
897			let index = self.index.upgradable_read();
898			if index.accounts_to_check_for_expiry_stmts.is_empty() {
899				let existing_accounts = index.accounts.keys().cloned().collect::<Vec<_>>();
900				let mut index = RwLockUpgradableReadGuard::upgrade(index);
901				index.accounts_to_check_for_expiry_stmts = existing_accounts;
902				return;
903			}
904
905			let mut to_evict = Vec::new();
906			let mut num_accounts_checked = 0;
907			let start = Instant::now();
908
909			for account in index.accounts_to_check_for_expiry_stmts.iter().rev() {
910				num_accounts_checked += 1;
911				if let Some(account_rec) = index.accounts.get(account) {
912					to_evict.extend(self.collect_evictions(account, account_rec, current_time));
913				}
914
915				if to_evict.len() >= MAX_EXPIRY_STATEMENTS_PER_ITERATION
916					|| num_accounts_checked >= MAX_EXPIRY_ACCOUNTS_PER_ITERATION
917					|| start.elapsed() >= MAX_EXPIRY_TIME_PER_ITERATION
918				{
919					break;
920				}
921			}
922
923			(to_evict, num_accounts_checked)
924		};
925
926		let mut expired = 0;
927
928		for hash in to_evict {
929			if let Err(e) = self.remove(&hash) {
930				log::debug!(
931					target: LOG_TARGET,
932					"Error marking statement {:?} as expired: {:?}",
933					HexDisplay::from(&hash),
934					e
935				);
936			} else {
937				expired += 1;
938				log::trace!(
939					target: LOG_TARGET,
940					"Marked statement {:?} as expired",
941					HexDisplay::from(&hash)
942				);
943			}
944		}
945
946		let mut index = self.index.write();
947		let new_len = index
948			.accounts_to_check_for_expiry_stmts
949			.len()
950			.saturating_sub(num_accounts_checked);
951		index.accounts_to_check_for_expiry_stmts.truncate(new_len);
952
953		drop(_start_check_expiration_timer);
954
955		self.metrics.report(|metrics| {
956			metrics.statements_expired_total.inc_by(expired);
957		});
958	}
959
960	/// Perform periodic store maintenance
961	pub fn maintain(&self) {
962		log::trace!(target: LOG_TARGET, "Started store maintenance");
963		let (
964			deleted,
965			active_count,
966			expired_count,
967			total_size,
968			accounts_count,
969			capacity_statements,
970			capacity_bytes,
971		): (Vec<_>, usize, usize, usize, usize, usize, usize) = {
972			let mut index = self.index.write();
973			let deleted = index.maintain(self.timestamp());
974			(
975				deleted,
976				index.entries.len(),
977				index.expired.len(),
978				index.total_size,
979				index.accounts.len(),
980				index.options.max_total_statements,
981				index.options.max_total_size,
982			)
983		};
984		let deleted: Vec<_> =
985			deleted.into_iter().map(|hash| (col::EXPIRED, hash.to_vec(), None)).collect();
986		let deleted_count = deleted.len() as u64;
987		if let Err(e) = self.db.commit(deleted) {
988			log::warn!(target: LOG_TARGET, "Error writing to the statement database: {:?}", e);
989		} else {
990			self.metrics.report(|metrics| metrics.statements_pruned.inc_by(deleted_count));
991		}
992
993		self.metrics.report(|metrics| {
994			metrics.statements_total.set(active_count as u64);
995			metrics.bytes_total.set(total_size as u64);
996			metrics.accounts_total.set(accounts_count as u64);
997			metrics.expired_total.set(expired_count as u64);
998			metrics.capacity_statements.set(capacity_statements as u64);
999			metrics.capacity_bytes.set(capacity_bytes as u64);
1000		});
1001
1002		log::trace!(
1003			target: LOG_TARGET,
1004			"Completed store maintenance. Purged: {}, Active: {}, Expired: {}",
1005			deleted_count,
1006			active_count,
1007			expired_count
1008		);
1009	}
1010
1011	fn timestamp(&self) -> u64 {
1012		self.time_override.unwrap_or_else(|| {
1013			std::time::SystemTime::now()
1014				.duration_since(std::time::UNIX_EPOCH)
1015				.unwrap_or_default()
1016				.as_secs()
1017		})
1018	}
1019
1020	#[cfg(test)]
1021	fn set_time(&mut self, time: u64) {
1022		self.time_override = Some(time);
1023	}
1024
1025	/// Returns `self` as [`StatementStoreExt`].
1026	pub fn as_statement_store_ext(self: Arc<Self>) -> StatementStoreExt {
1027		StatementStoreExt::new(self)
1028	}
1029
1030	/// Return information of all known statements whose decryption key is identified as
1031	/// `dest`. The key must be available to the client.
1032	fn posted_clear_inner<R>(
1033		&self,
1034		match_all_topics: &[Topic],
1035		dest: [u8; 32],
1036		// Map the statement and the decrypted data to the desired result.
1037		mut map_f: impl FnMut(Statement, Vec<u8>) -> R,
1038	) -> Result<Vec<R>> {
1039		self.collect_statements(
1040			Some(dest),
1041			&OptimizedTopicFilter::MatchAll(match_all_topics.iter().cloned().collect()),
1042			|statement| {
1043				if let (Some(key), Some(_)) = (statement.decryption_key(), statement.data()) {
1044					let public: subsoil::core::ed25519::Public = UncheckedFrom::unchecked_from(key);
1045					let public: soil_statement_store::ed25519::Public = public.into();
1046					match self.keystore.key_pair::<soil_statement_store::ed25519::Pair>(&public) {
1047						Err(e) => {
1048							log::debug!(
1049								target: LOG_TARGET,
1050								"Keystore error: {:?}, for statement {:?}",
1051								e,
1052								HexDisplay::from(&statement.hash())
1053							);
1054							None
1055						},
1056						Ok(None) => {
1057							log::debug!(
1058								target: LOG_TARGET,
1059								"Keystore is missing key for statement {:?}",
1060								HexDisplay::from(&statement.hash())
1061							);
1062							None
1063						},
1064						Ok(Some(pair)) => match statement.decrypt_private(&pair.into_inner()) {
1065							Ok(r) => r.map(|data| map_f(statement, data)),
1066							Err(e) => {
1067								log::debug!(
1068									target: LOG_TARGET,
1069									"Decryption error: {:?}, for statement {:?}",
1070									e,
1071									HexDisplay::from(&statement.hash())
1072								);
1073								None
1074							},
1075						},
1076					}
1077				} else {
1078					None
1079				}
1080			},
1081		)
1082	}
1083}
1084
1085impl StatementStore for Store {
1086	/// Return all statements.
1087	fn statements(&self) -> Result<Vec<(Hash, Statement)>> {
1088		let index = self.index.read();
1089		let mut result = Vec::with_capacity(index.entries.len());
1090		for hash in index.entries.keys().cloned() {
1091			let Some(encoded) =
1092				self.db.get(col::STATEMENTS, &hash).map_err(|e| Error::Db(e.to_string()))?
1093			else {
1094				continue;
1095			};
1096			if let Ok(statement) = Statement::decode(&mut encoded.as_slice()) {
1097				result.push((hash, statement));
1098			}
1099		}
1100		Ok(result)
1101	}
1102
1103	fn take_recent_statements(&self) -> Result<Vec<(Hash, Statement)>> {
1104		let mut index = self.index.write();
1105		let recent = index.take_recent();
1106		let mut result = Vec::with_capacity(recent.len());
1107		for hash in recent {
1108			let Some(encoded) =
1109				self.db.get(col::STATEMENTS, &hash).map_err(|e| Error::Db(e.to_string()))?
1110			else {
1111				continue;
1112			};
1113			if let Ok(statement) = Statement::decode(&mut encoded.as_slice()) {
1114				result.push((hash, statement));
1115			}
1116		}
1117		Ok(result)
1118	}
1119
1120	/// Returns a statement by hash.
1121	fn statement(&self, hash: &Hash) -> Result<Option<Statement>> {
1122		Ok(
1123			match self
1124				.db
1125				.get(col::STATEMENTS, hash.as_slice())
1126				.map_err(|e| Error::Db(e.to_string()))?
1127			{
1128				Some(entry) => {
1129					log::trace!(
1130						target: LOG_TARGET,
1131						"Queried statement {:?}",
1132						HexDisplay::from(hash)
1133					);
1134					Some(
1135						Statement::decode(&mut entry.as_slice())
1136							.map_err(|e| Error::Decode(e.to_string()))?,
1137					)
1138				},
1139				None => {
1140					log::trace!(
1141						target: LOG_TARGET,
1142						"Queried missing statement {:?}",
1143						HexDisplay::from(hash)
1144					);
1145					None
1146				},
1147			},
1148		)
1149	}
1150
1151	fn has_statement(&self, hash: &Hash) -> bool {
1152		self.index.read().entries.contains_key(hash)
1153	}
1154
1155	fn statement_hashes(&self) -> Vec<Hash> {
1156		self.index.read().entries.keys().cloned().collect()
1157	}
1158
1159	fn statements_by_hashes(
1160		&self,
1161		hashes: &[Hash],
1162		filter: &mut dyn FnMut(&Hash, &[u8], &Statement) -> FilterDecision,
1163	) -> Result<(Vec<(Hash, Statement)>, usize)> {
1164		let mut result = Vec::new();
1165		let mut processed = 0;
1166		for hash in hashes {
1167			processed += 1;
1168			let Some(encoded) =
1169				self.db.get(col::STATEMENTS, hash).map_err(|e| Error::Db(e.to_string()))?
1170			else {
1171				continue;
1172			};
1173			let Ok(statement) = Statement::decode(&mut encoded.as_slice()) else { continue };
1174			match filter(hash, &encoded, &statement) {
1175				FilterDecision::Skip => {},
1176				FilterDecision::Take => {
1177					result.push((*hash, statement));
1178				},
1179				FilterDecision::Abort => {
1180					// We did not process it :)
1181					processed -= 1;
1182					break;
1183				},
1184			}
1185		}
1186
1187		Ok((result, processed))
1188	}
1189
1190	/// Return the data of all known statements which include all topics and have no `DecryptionKey`
1191	/// field.
1192	fn broadcasts(&self, match_all_topics: &[Topic]) -> Result<Vec<Vec<u8>>> {
1193		self.collect_statements(
1194			None,
1195			&OptimizedTopicFilter::MatchAll(match_all_topics.iter().cloned().collect()),
1196			|statement| statement.into_data(),
1197		)
1198	}
1199
1200	/// Return the data of all known statements whose decryption key is identified as `dest` (this
1201	/// will generally be the public key or a hash thereof for symmetric ciphers, or a hash of the
1202	/// private key for symmetric ciphers).
1203	fn posted(&self, match_all_topics: &[Topic], dest: [u8; 32]) -> Result<Vec<Vec<u8>>> {
1204		self.collect_statements(
1205			Some(dest),
1206			&OptimizedTopicFilter::MatchAll(match_all_topics.iter().cloned().collect()),
1207			|statement| statement.into_data(),
1208		)
1209	}
1210
1211	/// Return the decrypted data of all known statements whose decryption key is identified as
1212	/// `dest`. The key must be available to the client.
1213	fn posted_clear(&self, match_all_topics: &[Topic], dest: [u8; 32]) -> Result<Vec<Vec<u8>>> {
1214		self.posted_clear_inner(match_all_topics, dest, |_statement, data| data)
1215	}
1216
1217	/// Return all known statements which include all topics and have no `DecryptionKey`
1218	/// field.
1219	fn broadcasts_stmt(&self, match_all_topics: &[Topic]) -> Result<Vec<Vec<u8>>> {
1220		self.collect_statements(
1221			None,
1222			&OptimizedTopicFilter::MatchAll(match_all_topics.iter().cloned().collect()),
1223			|statement| Some(statement.encode()),
1224		)
1225	}
1226
1227	/// Return all known statements whose decryption key is identified as `dest` (this
1228	/// will generally be the public key or a hash thereof for symmetric ciphers, or a hash of the
1229	/// private key for symmetric ciphers).
1230	fn posted_stmt(&self, match_all_topics: &[Topic], dest: [u8; 32]) -> Result<Vec<Vec<u8>>> {
1231		self.collect_statements(
1232			Some(dest),
1233			&OptimizedTopicFilter::MatchAll(match_all_topics.iter().cloned().collect()),
1234			|statement| Some(statement.encode()),
1235		)
1236	}
1237
1238	/// Return the statement and the decrypted data of all known statements whose decryption key is
1239	/// identified as `dest`. The key must be available to the client.
1240	fn posted_clear_stmt(
1241		&self,
1242		match_all_topics: &[Topic],
1243		dest: [u8; 32],
1244	) -> Result<Vec<Vec<u8>>> {
1245		self.posted_clear_inner(match_all_topics, dest, |statement, data| {
1246			let mut res = Vec::with_capacity(statement.size_hint() + data.len());
1247			statement.encode_to(&mut res);
1248			res.extend_from_slice(&data);
1249			res
1250		})
1251	}
1252
1253	/// Submit a statement to the store. Validates the statement and returns validation result.
1254	fn submit(&self, statement: Statement, source: StatementSource) -> SubmitResult {
1255		let _histogram_submit_start_timer = self.metrics.start_submit_timer();
1256		let hash = statement.hash();
1257		// Get unix timestamp
1258		if self.timestamp() >= statement.get_expiration_timestamp_secs().into() {
1259			log::debug!(
1260				target: LOG_TARGET,
1261				"Statement is already expired: {:?}",
1262				HexDisplay::from(&hash),
1263			);
1264			self.metrics.report(|metrics| metrics.validations_invalid.inc());
1265			return SubmitResult::Invalid(InvalidReason::AlreadyExpired);
1266		}
1267		let encoded_size = statement.encoded_size();
1268		if encoded_size > MAX_STATEMENT_SIZE {
1269			log::debug!(
1270				target: LOG_TARGET,
1271				"Statement is too big for propogation: {:?} ({}/{} bytes)",
1272				HexDisplay::from(&hash),
1273				statement.encoded_size(),
1274				MAX_STATEMENT_SIZE
1275			);
1276			self.metrics.report(|metrics| metrics.validations_invalid.inc());
1277			return SubmitResult::Invalid(InvalidReason::EncodingTooLarge {
1278				submitted_size: encoded_size,
1279				max_size: MAX_STATEMENT_SIZE,
1280			});
1281		}
1282
1283		match self.index.read().query(&hash) {
1284			IndexQuery::Expired => {
1285				if !source.can_be_resubmitted() {
1286					return SubmitResult::KnownExpired;
1287				}
1288			},
1289			IndexQuery::Exists => {
1290				if !source.can_be_resubmitted() {
1291					return SubmitResult::Known;
1292				}
1293			},
1294			IndexQuery::Unknown => {},
1295		}
1296
1297		let Some(account_id) = statement.account_id() else {
1298			log::debug!(
1299				target: LOG_TARGET,
1300				"Statement validation failed: Missing proof ({:?})",
1301				HexDisplay::from(&hash),
1302			);
1303			self.metrics.report(|metrics| metrics.validations_invalid.inc());
1304			return SubmitResult::Invalid(InvalidReason::NoProof);
1305		};
1306
1307		match statement.verify_signature() {
1308			SignatureVerificationResult::Valid(_) => {},
1309			SignatureVerificationResult::Invalid => {
1310				log::debug!(
1311					target: LOG_TARGET,
1312					"Statement validation failed: BadProof, {:?}",
1313					HexDisplay::from(&hash),
1314				);
1315				self.metrics.report(|metrics| metrics.validations_invalid.inc());
1316				return SubmitResult::Invalid(InvalidReason::BadProof);
1317			},
1318			SignatureVerificationResult::NoSignature => {
1319				if let Some(Proof::OnChain { .. }) = statement.proof() {
1320					log::debug!(
1321						target: LOG_TARGET,
1322						"Statement with OnChain proof accepted: {:?}",
1323						HexDisplay::from(&hash),
1324					);
1325				} else {
1326					log::debug!(
1327						target: LOG_TARGET,
1328						"Statement validation failed: NoProof, {:?}",
1329						HexDisplay::from(&hash),
1330					);
1331					self.metrics.report(|metrics| metrics.validations_invalid.inc());
1332					return SubmitResult::Invalid(InvalidReason::NoProof);
1333				}
1334			},
1335		};
1336
1337		let validation = match (self.read_allowance_fn)(
1338			&account_id,
1339			statement.proof().and_then(|p| match p {
1340				Proof::OnChain { block_hash, .. } => Some(*block_hash),
1341				_ => None,
1342			}),
1343		) {
1344			Ok(Some(allowance)) => allowance,
1345			Ok(None) => {
1346				log::debug!(
1347					target: LOG_TARGET,
1348					"Account {} has no statement allowance set",
1349					HexDisplay::from(&account_id),
1350				);
1351				return SubmitResult::Rejected(RejectionReason::NoAllowance);
1352			},
1353			Err(e) => {
1354				log::debug!(
1355					target: LOG_TARGET,
1356					"Reading statement allowance for account {} failed",
1357					HexDisplay::from(&account_id),
1358				);
1359				return SubmitResult::InternalError(e);
1360			},
1361		};
1362
1363		let current_time = self.timestamp();
1364		let mut commit = Vec::new();
1365		{
1366			let mut index = self.index.write();
1367
1368			let evicted =
1369				match index.insert(hash, &statement, &account_id, &validation, current_time) {
1370					Ok(evicted) => evicted,
1371					Err(reason) => {
1372						self.metrics.report(|metrics| {
1373							metrics.rejections.with_label_values(&[reason.label()]).inc();
1374						});
1375						return SubmitResult::Rejected(reason);
1376					},
1377				};
1378
1379			commit.push((col::STATEMENTS, hash.to_vec(), Some(statement.encode())));
1380			for hash in evicted {
1381				commit.push((col::STATEMENTS, hash.to_vec(), None));
1382				commit.push((col::EXPIRED, hash.to_vec(), Some((hash, current_time).encode())));
1383			}
1384			if let Err(e) = self.db.commit(commit) {
1385				log::debug!(
1386					target: LOG_TARGET,
1387					"Statement validation failed: database error {}, {:?}",
1388					e,
1389					statement
1390				);
1391				return SubmitResult::InternalError(Error::Db(e.to_string()));
1392			}
1393			self.subscription_manager.notify(statement);
1394		} // Release index lock
1395		self.metrics.report(|metrics| metrics.submitted_statements.inc());
1396		log::trace!(target: LOG_TARGET, "Statement submitted: {:?}", HexDisplay::from(&hash));
1397		SubmitResult::New
1398	}
1399
1400	/// Remove a statement by hash.
1401	fn remove(&self, hash: &Hash) -> Result<()> {
1402		let current_time = self.timestamp();
1403		{
1404			let mut index = self.index.write();
1405			if index.make_expired(hash, current_time) {
1406				let commit = [
1407					(col::STATEMENTS, hash.to_vec(), None),
1408					(col::EXPIRED, hash.to_vec(), Some((hash, current_time).encode())),
1409				];
1410				if let Err(e) = self.db.commit(commit) {
1411					log::debug!(
1412						target: LOG_TARGET,
1413						"Error removing statement: database error {}, {:?}",
1414						e,
1415						HexDisplay::from(hash),
1416					);
1417					return Err(Error::Db(e.to_string()));
1418				}
1419			}
1420		}
1421		Ok(())
1422	}
1423
1424	/// Remove all statements by an account.
1425	fn remove_by(&self, who: [u8; 32]) -> Result<()> {
1426		let mut index = self.index.write();
1427		let mut evicted = Vec::new();
1428		if let Some(account_rec) = index.accounts.get(&who) {
1429			evicted.extend(account_rec.by_priority.keys().map(|k| k.hash));
1430		}
1431
1432		let current_time = self.timestamp();
1433		let mut commit = Vec::new();
1434		for hash in evicted {
1435			index.make_expired(&hash, current_time);
1436			commit.push((col::STATEMENTS, hash.to_vec(), None));
1437			commit.push((col::EXPIRED, hash.to_vec(), Some((hash, current_time).encode())));
1438		}
1439		self.db.commit(commit).map_err(|e| {
1440			log::debug!(
1441				target: LOG_TARGET,
1442				"Error removing statement: database error {}, remove by {:?}",
1443				e,
1444				HexDisplay::from(&who),
1445			);
1446
1447			Error::Db(e.to_string())
1448		})
1449	}
1450}
1451
1452impl StatementStoreSubscriptionApi for Store {
1453	fn subscribe_statement(
1454		&self,
1455		topic_filter: OptimizedTopicFilter,
1456	) -> Result<(Vec<Vec<u8>>, async_channel::Sender<StatementEvent>, SubscriptionStatementsStream)>
1457	{
1458		// Keep the index read lock until after we have subscribed to avoid missing statements.
1459		let mut existing_statements = Vec::new();
1460		let index = self.index.read();
1461		self.collect_statements_locked(
1462			None,
1463			&topic_filter,
1464			&index,
1465			&mut existing_statements,
1466			|statement| Some(statement.encode()),
1467		)?;
1468		let (subscription_sender, subscription_stream) =
1469			self.subscription_manager.subscribe(topic_filter);
1470		if existing_statements.is_empty() {
1471			subscription_sender
1472				.send_blocking(StatementEvent::NewStatements {
1473					statements: vec![],
1474					remaining: Some(0),
1475				})
1476				.ok();
1477		}
1478		Ok((existing_statements, subscription_sender, subscription_stream))
1479	}
1480}
1481
1482#[cfg(test)]
1483mod tests {
1484
1485	use super::{col, Store, MAX_STATEMENT_SIZE};
1486	use soil_client::keystore::Keystore;
1487	use soil_statement_store::{
1488		AccountId, Channel, DecryptionKey, InvalidReason, Proof, Statement, StatementSource,
1489		StatementStore, SubmitResult, Topic,
1490	};
1491	use subsoil::core::{Decode, Encode, Pair};
1492
1493	type Extrinsic = subsoil::runtime::OpaqueExtrinsic;
1494	type Hash = subsoil::core::H256;
1495	type Hashing = subsoil::runtime::traits::BlakeTwo256;
1496	type BlockNumber = u64;
1497	type Header = subsoil::runtime::generic::Header<BlockNumber, Hashing>;
1498	type Block = subsoil::runtime::generic::Block<Header, Extrinsic>;
1499
1500	const CORRECT_BLOCK_HASH: [u8; 32] = [1u8; 32];
1501
1502	#[derive(Clone)]
1503	pub(crate) struct TestClient;
1504
1505	pub(crate) type TestBackend = soil_client::client_api::in_mem::Backend<Block>;
1506
1507	impl soil_client::client_api::StorageProvider<Block, TestBackend> for TestClient {
1508		fn storage(
1509			&self,
1510			_hash: Hash,
1511			key: &soil_client::client_api::StorageKey,
1512		) -> soil_client::blockchain::Result<Option<soil_client::client_api::StorageData>> {
1513			use soil_statement_store::StatementAllowance;
1514
1515			assert_eq!(&key.0[0..21], b":statement_allowance:" as &[u8],);
1516
1517			// Extract account ID (32 bytes) from the storage key
1518			let account_bytes = &key.0[21..53];
1519			let account_id: u64 = u64::from_le_bytes(account_bytes[0..8].try_into().unwrap());
1520			let allowance = match account_id {
1521				// Account 0 has no allowance (used to test eviction of all statements)
1522				0 => return Ok(None),
1523				1 => StatementAllowance::new(1, 1000),
1524				2 => StatementAllowance::new(2, 1000),
1525				3 => StatementAllowance::new(3, 1000),
1526				4 => StatementAllowance::new(4, 1000),
1527				42 => StatementAllowance::new(42, (42 * MAX_STATEMENT_SIZE) as u32),
1528				_ => StatementAllowance::new(100, 1000),
1529			};
1530			Ok(Some(soil_client::client_api::StorageData(allowance.encode())))
1531		}
1532
1533		fn storage_hash(
1534			&self,
1535			_hash: Hash,
1536			_key: &soil_client::client_api::StorageKey,
1537		) -> soil_client::blockchain::Result<Option<Hash>> {
1538			unimplemented!()
1539		}
1540
1541		fn storage_keys(
1542			&self,
1543			_hash: Hash,
1544			_prefix: Option<&soil_client::client_api::StorageKey>,
1545			_start_key: Option<&soil_client::client_api::StorageKey>,
1546		) -> soil_client::blockchain::Result<
1547			soil_client::client_api::backend::KeysIter<
1548				<TestBackend as soil_client::client_api::Backend<Block>>::State,
1549				Block,
1550			>,
1551		> {
1552			unimplemented!()
1553		}
1554
1555		fn storage_pairs(
1556			&self,
1557			_hash: Hash,
1558			_prefix: Option<&soil_client::client_api::StorageKey>,
1559			_start_key: Option<&soil_client::client_api::StorageKey>,
1560		) -> soil_client::blockchain::Result<
1561			soil_client::client_api::backend::PairsIter<
1562				<TestBackend as soil_client::client_api::Backend<Block>>::State,
1563				Block,
1564			>,
1565		> {
1566			unimplemented!()
1567		}
1568
1569		fn child_storage(
1570			&self,
1571			_hash: Hash,
1572			_child_info: &soil_client::client_api::ChildInfo,
1573			_key: &soil_client::client_api::StorageKey,
1574		) -> soil_client::blockchain::Result<Option<soil_client::client_api::StorageData>> {
1575			unimplemented!()
1576		}
1577
1578		fn child_storage_keys(
1579			&self,
1580			_hash: Hash,
1581			_child_info: soil_client::client_api::ChildInfo,
1582			_prefix: Option<&soil_client::client_api::StorageKey>,
1583			_start_key: Option<&soil_client::client_api::StorageKey>,
1584		) -> soil_client::blockchain::Result<
1585			soil_client::client_api::backend::KeysIter<
1586				<TestBackend as soil_client::client_api::Backend<Block>>::State,
1587				Block,
1588			>,
1589		> {
1590			unimplemented!()
1591		}
1592
1593		fn child_storage_hash(
1594			&self,
1595			_hash: Hash,
1596			_child_info: &soil_client::client_api::ChildInfo,
1597			_key: &soil_client::client_api::StorageKey,
1598		) -> soil_client::blockchain::Result<Option<Hash>> {
1599			unimplemented!()
1600		}
1601
1602		fn closest_merkle_value(
1603			&self,
1604			_hash: Hash,
1605			_key: &soil_client::client_api::StorageKey,
1606		) -> soil_client::blockchain::Result<Option<soil_client::client_api::MerkleValue<Hash>>> {
1607			unimplemented!()
1608		}
1609
1610		fn child_closest_merkle_value(
1611			&self,
1612			_hash: Hash,
1613			_child_info: &soil_client::client_api::ChildInfo,
1614			_key: &soil_client::client_api::StorageKey,
1615		) -> soil_client::blockchain::Result<Option<soil_client::client_api::MerkleValue<Hash>>> {
1616			unimplemented!()
1617		}
1618	}
1619
1620	impl soil_client::blockchain::HeaderBackend<Block> for TestClient {
1621		fn header(&self, _hash: Hash) -> soil_client::blockchain::Result<Option<Header>> {
1622			unimplemented!()
1623		}
1624		fn info(&self) -> soil_client::blockchain::Info<Block> {
1625			soil_client::blockchain::Info {
1626				best_hash: CORRECT_BLOCK_HASH.into(),
1627				best_number: 0,
1628				genesis_hash: Default::default(),
1629				finalized_hash: CORRECT_BLOCK_HASH.into(),
1630				finalized_number: 1,
1631				finalized_state: None,
1632				number_leaves: 0,
1633				block_gap: None,
1634			}
1635		}
1636		fn status(
1637			&self,
1638			_hash: Hash,
1639		) -> soil_client::blockchain::Result<soil_client::blockchain::BlockStatus> {
1640			unimplemented!()
1641		}
1642		fn number(&self, _hash: Hash) -> soil_client::blockchain::Result<Option<BlockNumber>> {
1643			unimplemented!()
1644		}
1645		fn hash(&self, _number: BlockNumber) -> soil_client::blockchain::Result<Option<Hash>> {
1646			unimplemented!()
1647		}
1648	}
1649
1650	fn test_store() -> (Store, tempfile::TempDir) {
1651		subsoil::tracing::init_for_tests();
1652		let temp_dir = tempfile::Builder::new().tempdir().expect("Error creating test dir");
1653
1654		let client = std::sync::Arc::new(TestClient);
1655		let mut path: std::path::PathBuf = temp_dir.path().into();
1656		path.push("db");
1657		let keystore = std::sync::Arc::new(soil_client::keystore::LocalKeystore::in_memory());
1658		let store = Store::new::<Block, TestClient, TestBackend>(
1659			&path,
1660			Default::default(),
1661			client,
1662			keystore,
1663			None,
1664			Box::new(subsoil::core::testing::TaskExecutor::new()),
1665		)
1666		.unwrap();
1667		(store, temp_dir) // return order is important. Store must be dropped before TempDir
1668	}
1669
1670	pub fn signed_statement(data: u8) -> Statement {
1671		signed_statement_with_topics(data, &[], None)
1672	}
1673
1674	fn signed_statement_with_topics(
1675		data: u8,
1676		topics: &[Topic],
1677		dec_key: Option<DecryptionKey>,
1678	) -> Statement {
1679		let mut statement = Statement::new();
1680		statement.set_plain_data(vec![data]);
1681		statement.set_expiry(u64::MAX);
1682
1683		for i in 0..topics.len() {
1684			statement.set_topic(i, topics[i]);
1685		}
1686		if let Some(key) = dec_key {
1687			statement.set_decryption_key(key);
1688		}
1689		let kp = subsoil::core::ed25519::Pair::from_string("//Alice", None).unwrap();
1690		statement.sign_ed25519_private(&kp);
1691		statement
1692	}
1693
1694	fn topic(data: u64) -> Topic {
1695		let mut bytes = [0u8; 32];
1696		bytes[0..8].copy_from_slice(&data.to_le_bytes());
1697		Topic::from(bytes)
1698	}
1699
1700	fn dec_key(data: u64) -> DecryptionKey {
1701		let mut dec_key: DecryptionKey = Default::default();
1702		dec_key[0..8].copy_from_slice(&data.to_le_bytes());
1703		dec_key
1704	}
1705
1706	fn account(id: u64) -> AccountId {
1707		let mut account: AccountId = Default::default();
1708		account[0..8].copy_from_slice(&id.to_le_bytes());
1709		account
1710	}
1711
1712	fn channel(id: u64) -> Channel {
1713		let mut channel: Channel = Default::default();
1714		channel[0..8].copy_from_slice(&id.to_le_bytes());
1715		channel
1716	}
1717
1718	fn statement(account_id: u64, priority: u32, c: Option<u64>, data_len: usize) -> Statement {
1719		let mut statement = Statement::new();
1720		let mut data = Vec::new();
1721		data.resize(data_len, 0);
1722		statement.set_plain_data(data);
1723		statement.set_expiry_from_parts(u32::MAX, priority);
1724		if let Some(c) = c {
1725			statement.set_channel(channel(c));
1726		}
1727		statement.set_proof(Proof::OnChain {
1728			block_hash: CORRECT_BLOCK_HASH,
1729			who: account(account_id),
1730			event_index: 0,
1731		});
1732		statement
1733	}
1734
1735	#[test]
1736	fn submit_one() {
1737		let (store, _temp) = test_store();
1738		let statement0 = signed_statement(0);
1739		assert_eq!(store.submit(statement0, StatementSource::Network), SubmitResult::New);
1740		let unsigned = statement(1, 1, None, 0);
1741		assert_eq!(store.submit(unsigned, StatementSource::Network), SubmitResult::New);
1742	}
1743
1744	#[test]
1745	fn save_and_load_statements() {
1746		let (store, temp) = test_store();
1747		let statement0 = signed_statement(0);
1748		let statement1 = signed_statement(1);
1749		let statement2 = signed_statement(2);
1750		assert_eq!(store.submit(statement0.clone(), StatementSource::Network), SubmitResult::New);
1751		assert_eq!(store.submit(statement1.clone(), StatementSource::Network), SubmitResult::New);
1752		assert_eq!(store.submit(statement2.clone(), StatementSource::Network), SubmitResult::New);
1753		assert_eq!(store.statements().unwrap().len(), 3);
1754		assert_eq!(store.broadcasts(&[]).unwrap().len(), 3);
1755		assert_eq!(store.statement(&statement1.hash()).unwrap(), Some(statement1.clone()));
1756		let keystore = store.keystore.clone();
1757		drop(store);
1758
1759		let client = std::sync::Arc::new(TestClient);
1760		let mut path: std::path::PathBuf = temp.path().into();
1761		path.push("db");
1762		let store = Store::new::<Block, TestClient, TestBackend>(
1763			&path,
1764			Default::default(),
1765			client,
1766			keystore,
1767			None,
1768			Box::new(subsoil::core::testing::TaskExecutor::new()),
1769		)
1770		.unwrap();
1771		assert_eq!(store.statements().unwrap().len(), 3);
1772		assert_eq!(store.broadcasts(&[]).unwrap().len(), 3);
1773		assert_eq!(store.statement(&statement1.hash()).unwrap(), Some(statement1));
1774	}
1775
1776	#[test]
1777	fn take_recent_statements_clears_index() {
1778		let (store, _temp) = test_store();
1779		let statement0 = signed_statement(0);
1780		let statement1 = signed_statement(1);
1781		let statement2 = signed_statement(2);
1782		let statement3 = signed_statement(3);
1783
1784		let _ = store.submit(statement0.clone(), StatementSource::Local);
1785		let _ = store.submit(statement1.clone(), StatementSource::Local);
1786		let _ = store.submit(statement2.clone(), StatementSource::Local);
1787
1788		let recent1 = store.take_recent_statements().unwrap();
1789		let (recent1_hashes, recent1_statements): (Vec<_>, Vec<_>) = recent1.into_iter().unzip();
1790		let expected1 = vec![statement0, statement1, statement2];
1791		assert!(expected1.iter().all(|s| recent1_hashes.contains(&s.hash())));
1792		assert!(expected1.iter().all(|s| recent1_statements.contains(s)));
1793
1794		// Recent statements are cleared.
1795		let recent2 = store.take_recent_statements().unwrap();
1796		assert_eq!(recent2.len(), 0);
1797
1798		store.submit(statement3.clone(), StatementSource::Network);
1799
1800		let recent3 = store.take_recent_statements().unwrap();
1801		let (recent3_hashes, recent3_statements): (Vec<_>, Vec<_>) = recent3.into_iter().unzip();
1802		let expected3 = vec![statement3];
1803		assert!(expected3.iter().all(|s| recent3_hashes.contains(&s.hash())));
1804		assert!(expected3.iter().all(|s| recent3_statements.contains(s)));
1805
1806		// Recent statements are cleared, but statements remain in the store.
1807		assert_eq!(store.statements().unwrap().len(), 4);
1808	}
1809
1810	#[test]
1811	fn search_by_topic_and_key() {
1812		let (store, _temp) = test_store();
1813		let statement0 = signed_statement(0);
1814		let statement1 = signed_statement_with_topics(1, &[topic(0)], None);
1815		let statement2 = signed_statement_with_topics(2, &[topic(0), topic(1)], Some(dec_key(2)));
1816		let statement3 = signed_statement_with_topics(3, &[topic(0), topic(1), topic(2)], None);
1817		let statement4 =
1818			signed_statement_with_topics(4, &[topic(0), topic(42), topic(2), topic(3)], None);
1819		let statements = vec![statement0, statement1, statement2, statement3, statement4];
1820		for s in &statements {
1821			store.submit(s.clone(), StatementSource::Network);
1822		}
1823
1824		let assert_topics = |topics: &[u64], key: Option<u64>, expected: &[u8]| {
1825			let key = key.map(dec_key);
1826			let topics: Vec<_> = topics.iter().map(|t| topic(*t)).collect();
1827			let mut got_vals: Vec<_> = if let Some(key) = key {
1828				store.posted(&topics, key).unwrap().into_iter().map(|d| d[0]).collect()
1829			} else {
1830				store.broadcasts(&topics).unwrap().into_iter().map(|d| d[0]).collect()
1831			};
1832			got_vals.sort();
1833			assert_eq!(expected.to_vec(), got_vals);
1834		};
1835
1836		assert_topics(&[], None, &[0, 1, 3, 4]);
1837		assert_topics(&[], Some(2), &[2]);
1838		assert_topics(&[0], None, &[1, 3, 4]);
1839		assert_topics(&[1], None, &[3]);
1840		assert_topics(&[2], None, &[3, 4]);
1841		assert_topics(&[3], None, &[4]);
1842		assert_topics(&[42], None, &[4]);
1843
1844		assert_topics(&[0, 1], None, &[3]);
1845		assert_topics(&[0, 1], Some(2), &[2]);
1846		assert_topics(&[0, 1, 99], Some(2), &[]);
1847		assert_topics(&[1, 2], None, &[3]);
1848		assert_topics(&[99], None, &[]);
1849		assert_topics(&[0, 99], None, &[]);
1850		assert_topics(&[0, 1, 2, 3, 42], None, &[]);
1851	}
1852
1853	#[test]
1854	fn constraints() {
1855		let (store, _temp) = test_store();
1856
1857		store.index.write().options.max_total_size = 3000;
1858		let source = StatementSource::Network;
1859		let ok = SubmitResult::New;
1860
1861		// Account 1 (limit = 1 msg, 1000 bytes)
1862
1863		// Oversized statement is not allowed. Limit for account 1 is 1 msg, 1000 bytes
1864		assert!(matches!(
1865			store.submit(statement(1, 1, Some(1), 2000), source),
1866			SubmitResult::Rejected(_)
1867		));
1868		assert_eq!(store.submit(statement(1, 1, Some(1), 500), source), ok);
1869		// Would not replace channel message with same priority
1870		assert!(matches!(
1871			store.submit(statement(1, 1, Some(1), 200), source),
1872			SubmitResult::Rejected(_)
1873		));
1874		assert_eq!(store.submit(statement(1, 2, Some(1), 600), source), ok);
1875		// Submit another message to another channel with lower priority. Should not be allowed
1876		// because msg count limit is 1
1877		assert!(matches!(
1878			store.submit(statement(1, 1, Some(2), 100), source),
1879			SubmitResult::Rejected(_)
1880		));
1881		assert_eq!(store.index.read().expired.len(), 1);
1882
1883		// Account 2 (limit = 2 msg, 1000 bytes)
1884
1885		assert_eq!(store.submit(statement(2, 1, None, 500), source), ok);
1886		assert_eq!(store.submit(statement(2, 2, None, 100), source), ok);
1887		// Should evict priority 1
1888		assert_eq!(store.submit(statement(2, 3, None, 500), source), ok);
1889		assert_eq!(store.index.read().expired.len(), 2);
1890		// Should evict all
1891		assert_eq!(store.submit(statement(2, 4, None, 1000), source), ok);
1892		assert_eq!(store.index.read().expired.len(), 4);
1893
1894		// Account 3 (limit = 3 msg, 1000 bytes)
1895
1896		assert_eq!(store.submit(statement(3, 2, Some(1), 300), source), ok);
1897		assert_eq!(store.submit(statement(3, 3, Some(2), 300), source), ok);
1898		assert_eq!(store.submit(statement(3, 4, Some(3), 300), source), ok);
1899		// Should evict 2 and 3
1900		assert_eq!(store.submit(statement(3, 5, None, 500), source), ok);
1901		assert_eq!(store.index.read().expired.len(), 6);
1902
1903		assert_eq!(store.index.read().total_size, 2400);
1904		assert_eq!(store.index.read().entries.len(), 4);
1905
1906		// Should be over the global size limit
1907		assert!(matches!(
1908			store.submit(statement(1, 1, None, 700), source),
1909			SubmitResult::Rejected(_)
1910		));
1911		// Should be over the global count limit
1912		store.index.write().options.max_total_statements = 4;
1913		assert!(matches!(
1914			store.submit(statement(1, 1, None, 100), source),
1915			SubmitResult::Rejected(_)
1916		));
1917
1918		let mut expected_statements = vec![
1919			statement(1, 2, Some(1), 600).hash(),
1920			statement(2, 4, None, 1000).hash(),
1921			statement(3, 4, Some(3), 300).hash(),
1922			statement(3, 5, None, 500).hash(),
1923		];
1924		expected_statements.sort();
1925		let mut statements: Vec<_> =
1926			store.statements().unwrap().into_iter().map(|(hash, _)| hash).collect();
1927		statements.sort();
1928		assert_eq!(expected_statements, statements);
1929	}
1930
1931	#[test]
1932	fn max_statement_size_for_gossiping() {
1933		let (store, _temp) = test_store();
1934		store.index.write().options.max_total_size = 42 * MAX_STATEMENT_SIZE;
1935
1936		assert_eq!(
1937			store.submit(
1938				statement(42, 1, Some(1), MAX_STATEMENT_SIZE - 500),
1939				StatementSource::Local
1940			),
1941			SubmitResult::New
1942		);
1943
1944		assert!(matches!(
1945			store.submit(statement(42, 2, Some(1), 2 * MAX_STATEMENT_SIZE), StatementSource::Local),
1946			SubmitResult::Invalid(_)
1947		));
1948	}
1949
1950	#[test]
1951	fn expired_statements_are_purged() {
1952		use super::DEFAULT_PURGE_AFTER_SEC;
1953		let (mut store, temp) = test_store();
1954		let mut statement = statement(1, 1, Some(3), 100);
1955		store.set_time(0);
1956		statement.set_topic(0, topic(4));
1957		store.submit(statement.clone(), StatementSource::Network);
1958		assert_eq!(store.index.read().entries.len(), 1);
1959		store.remove(&statement.hash()).unwrap();
1960		assert_eq!(store.index.read().entries.len(), 0);
1961		assert_eq!(store.index.read().accounts.len(), 0);
1962		store.set_time(DEFAULT_PURGE_AFTER_SEC + 1);
1963		store.maintain();
1964		assert_eq!(store.index.read().expired.len(), 0);
1965		let keystore = store.keystore.clone();
1966		drop(store);
1967
1968		let client = std::sync::Arc::new(TestClient);
1969		let mut path: std::path::PathBuf = temp.path().into();
1970		path.push("db");
1971		let store = Store::new::<Block, TestClient, TestBackend>(
1972			&path,
1973			Default::default(),
1974			client,
1975			keystore,
1976			None,
1977			Box::new(subsoil::core::testing::TaskExecutor::new()),
1978		)
1979		.unwrap();
1980		assert_eq!(store.statements().unwrap().len(), 0);
1981		assert_eq!(store.index.read().expired.len(), 0);
1982	}
1983
1984	#[test]
1985	fn posted_clear_decrypts() {
1986		let (store, _temp) = test_store();
1987		let public = store
1988			.keystore
1989			.ed25519_generate_new(subsoil::core::crypto::key_types::STATEMENT, None)
1990			.unwrap();
1991		let statement1 = statement(1, 1, None, 100);
1992		let mut statement2 = statement(1, 2, None, 0);
1993		let plain = b"The most valuable secret".to_vec();
1994		statement2.encrypt(&plain, &public).unwrap();
1995		store.submit(statement1, StatementSource::Network);
1996		store.submit(statement2, StatementSource::Network);
1997		let posted_clear = store.posted_clear(&[], public.into()).unwrap();
1998		assert_eq!(posted_clear, vec![plain]);
1999	}
2000
2001	#[test]
2002	fn broadcasts_stmt_returns_encoded_statements() {
2003		let (store, _tmp) = test_store();
2004
2005		// no key, no topic
2006		let s0 = signed_statement_with_topics(0, &[], None);
2007		// same, but with a topic = 42
2008		let s1 = signed_statement_with_topics(1, &[topic(42)], None);
2009		// has a decryption key -> must NOT be returned by broadcasts_stmt
2010		let s2 = signed_statement_with_topics(2, &[topic(42)], Some(dec_key(99)));
2011
2012		for s in [&s0, &s1, &s2] {
2013			store.submit(s.clone(), StatementSource::Network);
2014		}
2015
2016		// no topic filter
2017		let mut hashes: Vec<_> = store
2018			.broadcasts_stmt(&[])
2019			.unwrap()
2020			.into_iter()
2021			.map(|bytes| Statement::decode(&mut &bytes[..]).unwrap().hash())
2022			.collect();
2023		hashes.sort();
2024		let expected_hashes = {
2025			let mut e = vec![s0.hash(), s1.hash()];
2026			e.sort();
2027			e
2028		};
2029		assert_eq!(hashes, expected_hashes);
2030
2031		// filter on topic 42
2032		let got = store.broadcasts_stmt(&[topic(42)]).unwrap();
2033		assert_eq!(got.len(), 1);
2034		let st = Statement::decode(&mut &got[0][..]).unwrap();
2035		assert_eq!(st.hash(), s1.hash());
2036	}
2037
2038	#[test]
2039	fn posted_stmt_returns_encoded_statements_for_dest() {
2040		let (store, _tmp) = test_store();
2041
2042		let public1 = store
2043			.keystore
2044			.ed25519_generate_new(subsoil::core::crypto::key_types::STATEMENT, None)
2045			.unwrap();
2046		let dest: [u8; 32] = public1.into();
2047
2048		let public2 = store
2049			.keystore
2050			.ed25519_generate_new(subsoil::core::crypto::key_types::STATEMENT, None)
2051			.unwrap();
2052
2053		// A statement that does have dec_key = dest
2054		let mut s_with_key = statement(1, 1, None, 0);
2055		let plain1 = b"The most valuable secret".to_vec();
2056		s_with_key.encrypt(&plain1, &public1).unwrap();
2057
2058		// A statement with a different dec_key
2059		let mut s_other_key = statement(2, 2, None, 0);
2060		let plain2 = b"The second most valuable secret".to_vec();
2061		s_other_key.encrypt(&plain2, &public2).unwrap();
2062
2063		// Submit them all
2064		for s in [&s_with_key, &s_other_key] {
2065			store.submit(s.clone(), StatementSource::Network);
2066		}
2067
2068		// posted_stmt should only return the one with dec_key = dest
2069		let retrieved = store.posted_stmt(&[], dest).unwrap();
2070		assert_eq!(retrieved.len(), 1, "Only one statement has dec_key=dest");
2071
2072		// Re-decode that returned statement to confirm it is correct
2073		let returned_stmt = Statement::decode(&mut &retrieved[0][..]).unwrap();
2074		assert_eq!(
2075			returned_stmt.hash(),
2076			s_with_key.hash(),
2077			"Returned statement must match s_with_key"
2078		);
2079	}
2080
2081	#[test]
2082	fn posted_clear_stmt_returns_statement_followed_by_plain_data() {
2083		let (store, _tmp) = test_store();
2084
2085		let public1 = store
2086			.keystore
2087			.ed25519_generate_new(subsoil::core::crypto::key_types::STATEMENT, None)
2088			.unwrap();
2089		let dest: [u8; 32] = public1.into();
2090
2091		let public2 = store
2092			.keystore
2093			.ed25519_generate_new(subsoil::core::crypto::key_types::STATEMENT, None)
2094			.unwrap();
2095
2096		// A statement that does have dec_key = dest
2097		let mut s_with_key = statement(1, 1, None, 0);
2098		let plain1 = b"The most valuable secret".to_vec();
2099		s_with_key.encrypt(&plain1, &public1).unwrap();
2100
2101		// A statement with a different dec_key
2102		let mut s_other_key = statement(2, 2, None, 0);
2103		let plain2 = b"The second most valuable secret".to_vec();
2104		s_other_key.encrypt(&plain2, &public2).unwrap();
2105
2106		// Submit them all
2107		for s in [&s_with_key, &s_other_key] {
2108			store.submit(s.clone(), StatementSource::Network);
2109		}
2110
2111		// posted_stmt should only return the one with dec_key = dest
2112		let retrieved = store.posted_clear_stmt(&[], dest).unwrap();
2113		assert_eq!(retrieved.len(), 1, "Only one statement has dec_key=dest");
2114
2115		// We expect: [ encoded Statement ] + [ the decrypted bytes ]
2116		let encoded_stmt = s_with_key.encode();
2117		let stmt_len = encoded_stmt.len();
2118
2119		// 1) statement is first
2120		assert_eq!(&retrieved[0][..stmt_len], &encoded_stmt[..]);
2121
2122		// 2) followed by the decrypted payload
2123		let trailing = &retrieved[0][stmt_len..];
2124		assert_eq!(trailing, &plain1[..]);
2125	}
2126
2127	#[test]
2128	fn posted_clear_returns_plain_data_for_dest_and_topics() {
2129		let (store, _tmp) = test_store();
2130
2131		// prepare two key-pairs
2132		let public_dest = store
2133			.keystore
2134			.ed25519_generate_new(subsoil::core::crypto::key_types::STATEMENT, None)
2135			.unwrap();
2136		let dest: [u8; 32] = public_dest.into();
2137
2138		let public_other = store
2139			.keystore
2140			.ed25519_generate_new(subsoil::core::crypto::key_types::STATEMENT, None)
2141			.unwrap();
2142
2143		// statement that SHOULD be returned (matches dest & topic 42)
2144		let mut s_good = statement(1, 1, None, 0);
2145		let plaintext_good = b"The most valuable secret".to_vec();
2146		s_good.encrypt(&plaintext_good, &public_dest).unwrap();
2147		s_good.set_topic(0, topic(42));
2148
2149		// statement that should NOT be returned (same dest but different topic)
2150		let mut s_wrong_topic = statement(2, 2, None, 0);
2151		s_wrong_topic.encrypt(b"Wrong topic", &public_dest).unwrap();
2152		s_wrong_topic.set_topic(0, topic(99));
2153
2154		// statement that should NOT be returned (different dest)
2155		let mut s_other_dest = statement(3, 3, None, 0);
2156		s_other_dest.encrypt(b"Other dest", &public_other).unwrap();
2157		s_other_dest.set_topic(0, topic(42));
2158
2159		// submit all
2160		for s in [&s_good, &s_wrong_topic, &s_other_dest] {
2161			store.submit(s.clone(), StatementSource::Network);
2162		}
2163
2164		// call posted_clear with the topic filter and dest
2165		let retrieved = store.posted_clear(&[topic(42)], dest).unwrap();
2166
2167		// exactly one element, equal to the expected plaintext
2168		assert_eq!(retrieved, vec![plaintext_good]);
2169	}
2170
2171	#[test]
2172	fn already_expired_statement_is_rejected() {
2173		let (mut store, _temp) = test_store();
2174
2175		// Set current time to 1000 seconds
2176		store.set_time(1000);
2177
2178		// Create a statement that has already expired (expiration at 500 seconds, before current
2179		// time)
2180		let mut expired_statement = statement(1, 1, None, 100);
2181		// set_expiry_from_parts: first arg is expiration timestamp in seconds, second is priority
2182		expired_statement.set_expiry_from_parts(500, 1);
2183
2184		// Submit should fail with AlreadyExpired
2185		assert_eq!(
2186			store.submit(expired_statement, StatementSource::Network),
2187			SubmitResult::Invalid(InvalidReason::AlreadyExpired)
2188		);
2189
2190		// Verify the statement was not added
2191		assert_eq!(store.statements().unwrap().len(), 0);
2192
2193		// Now create a statement that is not expired (expiration at 2000 seconds, after current
2194		// time)
2195		let mut valid_statement = statement(1, 1, None, 100);
2196		valid_statement.set_expiry_from_parts(2000, 1);
2197
2198		// Submit should succeed
2199		assert_eq!(store.submit(valid_statement, StatementSource::Network), SubmitResult::New);
2200		assert_eq!(store.statements().unwrap().len(), 1);
2201	}
2202
2203	#[test]
2204	fn remove_by_covers_various_situations() {
2205		use soil_statement_store::{StatementSource, StatementStore, SubmitResult};
2206
2207		// Use a fresh store and fixed time so we can control purging.
2208		let (mut store, _temp) = test_store();
2209		store.set_time(0);
2210
2211		// Reuse helpers from this module.
2212		let t42 = topic(42);
2213		let k7 = dec_key(7);
2214
2215		// Account A = 4 (has per-account limits (4, 1000) in the mock runtime)
2216		// - Mix of topic, decryption-key and channel to exercise every index.
2217		let mut s_a1 = statement(4, 10, Some(100), 100);
2218		s_a1.set_topic(0, t42);
2219		let h_a1 = s_a1.hash();
2220
2221		let mut s_a2 = statement(4, 20, Some(200), 150);
2222		s_a2.set_decryption_key(k7);
2223		let h_a2 = s_a2.hash();
2224
2225		let s_a3 = statement(4, 30, None, 50);
2226		let h_a3 = s_a3.hash();
2227
2228		// Account B = 3 (control group that must remain untouched).
2229		let s_b1 = statement(3, 10, None, 100);
2230		let h_b1 = s_b1.hash();
2231
2232		let mut s_b2 = statement(3, 15, Some(300), 100);
2233		s_b2.set_topic(0, t42);
2234		s_b2.set_decryption_key(k7);
2235		let h_b2 = s_b2.hash();
2236
2237		// Submit all statements.
2238		for s in [&s_a1, &s_a2, &s_a3, &s_b1, &s_b2] {
2239			assert_eq!(store.submit(s.clone(), StatementSource::Network), SubmitResult::New);
2240		}
2241
2242		// --- Pre-conditions: everything is indexed as expected.
2243		{
2244			let idx = store.index.read();
2245			assert_eq!(idx.entries.len(), 5, "all 5 should be present");
2246			assert!(idx.accounts.contains_key(&account(4)));
2247			assert!(idx.accounts.contains_key(&account(3)));
2248			assert_eq!(idx.total_size, 100 + 150 + 50 + 100 + 100);
2249
2250			// Topic and key sets contain both A & B entries.
2251			let set_t = idx.by_topic.get(&t42).expect("topic set exists");
2252			assert!(set_t.contains(&h_a1) && set_t.contains(&h_b2));
2253
2254			let set_k = idx.by_dec_key.get(&Some(k7)).expect("key set exists");
2255			assert!(set_k.contains(&h_a2) && set_k.contains(&h_b2));
2256		}
2257
2258		// --- Action: remove all statements by Account A.
2259		store.remove_by(account(4)).expect("remove_by should succeed");
2260
2261		// --- Post-conditions: A's statements are gone and marked expired; B's remain.
2262		{
2263			// A's statements removed from DB view.
2264			for h in [h_a1, h_a2, h_a3] {
2265				assert!(store.statement(&h).unwrap().is_none(), "A's statement should be removed");
2266			}
2267
2268			// B's statements still present.
2269			for h in [h_b1, h_b2] {
2270				assert!(store.statement(&h).unwrap().is_some(), "B's statement should remain");
2271			}
2272
2273			let idx = store.index.read();
2274
2275			// Account map updated.
2276			assert!(!idx.accounts.contains_key(&account(4)), "Account A must be gone");
2277			assert!(idx.accounts.contains_key(&account(3)), "Account B must remain");
2278
2279			// Removed statements are marked expired.
2280			assert!(idx.expired.contains_key(&h_a1));
2281			assert!(idx.expired.contains_key(&h_a2));
2282			assert!(idx.expired.contains_key(&h_a3));
2283			assert_eq!(idx.expired.len(), 3);
2284
2285			// Entry count & total_size reflect only B's data.
2286			assert_eq!(idx.entries.len(), 2);
2287			assert_eq!(idx.total_size, 100 + 100);
2288
2289			// Topic index: only B2 remains for topic 42.
2290			let set_t = idx.by_topic.get(&t42).expect("topic set exists");
2291			assert!(set_t.contains(&h_b2));
2292			assert!(!set_t.contains(&h_a1));
2293
2294			// Decryption-key index: only B2 remains for key 7.
2295			let set_k = idx.by_dec_key.get(&Some(k7)).expect("key set exists");
2296			assert!(set_k.contains(&h_b2));
2297			assert!(!set_k.contains(&h_a2));
2298		}
2299
2300		// --- Idempotency: removing again is a no-op and should not error.
2301		store.remove_by(account(4)).expect("second remove_by should be a no-op");
2302
2303		// --- Purge: advance time beyond TTL and run maintenance; expired entries disappear.
2304		let purge_after = store.index.read().options.purge_after_sec;
2305		store.set_time(purge_after + 1);
2306		store.maintain();
2307		assert_eq!(store.index.read().expired.len(), 0, "expired entries should be purged");
2308
2309		// --- Reuse: Account A can submit again after purge.
2310		let s_new = statement(4, 40, None, 10);
2311		assert_eq!(store.submit(s_new, StatementSource::Network), SubmitResult::New);
2312	}
2313
2314	#[test]
2315	fn check_expiration_repopulates_account_list_when_empty() {
2316		let (mut store, _temp) = test_store();
2317		store.set_time(1000);
2318
2319		// Create statements for multiple accounts
2320		// Note: The statement() helper uses set_expiry_from_parts(u32::MAX, priority)
2321		// which creates a very large expiry value that won't trigger expiration
2322		let s1 = statement(1, 1, None, 100);
2323		let s2 = statement(2, 1, None, 100);
2324		let s3 = statement(3, 1, None, 100);
2325
2326		for s in [&s1, &s2, &s3] {
2327			store.submit(s.clone(), StatementSource::Network);
2328		}
2329
2330		// Initially, accounts_to_check_for_expiry_stmts is empty
2331		assert!(store.index.read().accounts_to_check_for_expiry_stmts.is_empty());
2332
2333		// First call to check_expiration should populate the list
2334		store.enforce_limits();
2335
2336		// Now accounts_to_check_for_expiry_stmts should contain all 3 accounts
2337		let accounts = store.index.read().accounts_to_check_for_expiry_stmts.clone();
2338		assert_eq!(accounts.len(), 3, "Should have 3 accounts to check");
2339		assert!(accounts.contains(&account(1)));
2340		assert!(accounts.contains(&account(2)));
2341		assert!(accounts.contains(&account(3)));
2342
2343		// No statements should have been expired since they're all valid
2344		assert_eq!(store.index.read().expired.len(), 0);
2345		assert_eq!(store.index.read().entries.len(), 3);
2346	}
2347
2348	#[test]
2349	fn check_expiration_expires_statements_past_current_time() {
2350		let (mut store, _temp) = test_store();
2351
2352		// The check_expiration function compares Expiry(current_time << 32) against
2353		// Expiry(expiry) where expiry is the full 64-bit value with timestamp in high 32 bits.
2354		// Statements with expiration timestamp < current_time will be expired.
2355
2356		store.set_time(100);
2357
2358		// Create a statement that will expire at timestamp 500
2359		let mut expired_stmt = statement(1, 1, None, 100);
2360		expired_stmt.set_expiry_from_parts(500, 1);
2361		let expired_hash = expired_stmt.hash();
2362		store.submit(expired_stmt, StatementSource::Network);
2363
2364		// Create a statement that won't expire (far future expiry)
2365		let valid_stmt = statement(2, 1, None, 100); // Uses u32::MAX as timestamp
2366		let valid_hash = valid_stmt.hash();
2367		store.submit(valid_stmt, StatementSource::Network);
2368
2369		// Verify both statements are in the store
2370		assert_eq!(store.index.read().entries.len(), 2);
2371
2372		// First check_expiration populates the account list
2373		store.enforce_limits();
2374		assert!(!store.index.read().accounts_to_check_for_expiry_stmts.is_empty());
2375
2376		// Advance time past the expiry of the first statement
2377		store.set_time(1000);
2378
2379		// Second check_expiration should find and expire the statement
2380		store.enforce_limits();
2381
2382		// Check the expired statement is now in the expired list
2383		let index = store.index.read();
2384		assert!(index.expired.contains_key(&expired_hash), "Expired statement should be marked");
2385		assert!(
2386			!index.entries.contains_key(&expired_hash),
2387			"Expired statement should be removed from entries"
2388		);
2389
2390		// The valid statement should still be in entries
2391		assert!(
2392			index.entries.contains_key(&valid_hash),
2393			"Valid statement should still be in entries"
2394		);
2395		assert!(!index.expired.contains_key(&valid_hash), "Valid statement should not be expired");
2396	}
2397
2398	#[test]
2399	fn check_expiration_removes_checked_accounts_from_list_when_expiring() {
2400		let (mut store, _temp) = test_store();
2401		store.set_time(100);
2402
2403		// Create statements with expiry at timestamp 200
2404		let mut stmt1 = statement(1, 1, None, 100);
2405		stmt1.set_expiry_from_parts(200, 1);
2406		store.submit(stmt1, StatementSource::Network);
2407
2408		let mut stmt2 = statement(2, 1, None, 100);
2409		stmt2.set_expiry_from_parts(200, 1);
2410		store.submit(stmt2, StatementSource::Network);
2411
2412		let mut stmt3 = statement(3, 1, None, 100);
2413		stmt3.set_expiry_from_parts(200, 1);
2414		store.submit(stmt3, StatementSource::Network);
2415
2416		// First call populates the list
2417		store.enforce_limits();
2418		assert_eq!(
2419			store.index.read().accounts_to_check_for_expiry_stmts.len(),
2420			3,
2421			"Should have 3 accounts to check"
2422		);
2423
2424		// Advance time past expiry
2425		store.set_time(300);
2426
2427		// Second call should check accounts, expire statements, and remove checked accounts
2428		store.enforce_limits();
2429
2430		// The list should now be empty (all accounts checked and removed)
2431		assert!(
2432			store.index.read().accounts_to_check_for_expiry_stmts.is_empty(),
2433			"All accounts should have been checked and removed after expiration"
2434		);
2435
2436		// All statements should have been expired
2437		assert_eq!(store.index.read().expired.len(), 3);
2438		assert_eq!(store.index.read().entries.len(), 0);
2439	}
2440
2441	#[test]
2442	fn check_expiration_truncates_list_even_when_nothing_expires() {
2443		let (mut store, _temp) = test_store();
2444		store.set_time(1000);
2445
2446		// Create statements for multiple accounts with far future expiry (using statement helper)
2447		// The statement() helper uses set_expiry_from_parts(u32::MAX, priority) which creates
2448		// a very large expiry value that won't trigger expiration
2449		for acc_id in 1..=5u64 {
2450			let stmt = statement(acc_id, 1, None, 100);
2451			store.submit(stmt, StatementSource::Network);
2452		}
2453
2454		// First call populates the list
2455		store.enforce_limits();
2456		assert_eq!(store.index.read().accounts_to_check_for_expiry_stmts.len(), 5);
2457
2458		// Second call checks accounts and truncates the list (even though nothing expires)
2459		store.enforce_limits();
2460
2461		// The list should now be empty - accounts are removed after being checked
2462		assert!(
2463			store.index.read().accounts_to_check_for_expiry_stmts.is_empty(),
2464			"List should be empty after all accounts have been checked"
2465		);
2466
2467		// No statements should have been expired
2468		assert_eq!(store.index.read().expired.len(), 0);
2469		assert_eq!(store.index.read().entries.len(), 5);
2470	}
2471
2472	#[test]
2473	fn check_expiration_handles_multiple_statements_per_account() {
2474		let (mut store, _temp) = test_store();
2475		store.set_time(100);
2476
2477		// Create multiple statements for the same account with different expiry timestamps
2478		// Account 42 has limit of 42 statements
2479		let mut stmt1 = statement(42, 1, Some(1), 100);
2480		stmt1.set_expiry_from_parts(200, 1); // Expires at timestamp 200
2481		let hash1 = stmt1.hash();
2482		store.submit(stmt1, StatementSource::Network);
2483
2484		let mut stmt2 = statement(42, 2, Some(2), 100);
2485		stmt2.set_expiry_from_parts(300, 2); // Expires at timestamp 300
2486		let hash2 = stmt2.hash();
2487		store.submit(stmt2, StatementSource::Network);
2488
2489		let mut stmt3 = statement(42, 3, Some(3), 100);
2490		stmt3.set_expiry_from_parts(500, 3); // Expires at timestamp 500
2491		let hash3 = stmt3.hash();
2492		store.submit(stmt3, StatementSource::Network);
2493
2494		// Verify all statements are in the store
2495		assert_eq!(store.index.read().entries.len(), 3);
2496
2497		// First check_expiration populates the account list
2498		store.enforce_limits();
2499
2500		// Advance time to 250 (stmt1 should expire since 250 > 200)
2501		store.set_time(250);
2502		store.enforce_limits();
2503
2504		{
2505			let index = store.index.read();
2506			assert!(index.expired.contains_key(&hash1), "stmt1 should be expired");
2507			assert!(!index.expired.contains_key(&hash2), "stmt2 should not be expired yet");
2508			assert!(!index.expired.contains_key(&hash3), "stmt3 should not be expired yet");
2509			assert_eq!(index.entries.len(), 2);
2510		}
2511
2512		// Repopulate the account list for next check
2513		store.enforce_limits();
2514
2515		// Advance time to 400 (stmt2 should also expire since 400 > 300)
2516		store.set_time(400);
2517		store.enforce_limits();
2518
2519		{
2520			let index = store.index.read();
2521			assert!(index.expired.contains_key(&hash1));
2522			assert!(index.expired.contains_key(&hash2), "stmt2 should be expired");
2523			assert!(!index.expired.contains_key(&hash3), "stmt3 should not be expired yet");
2524			assert_eq!(index.entries.len(), 1);
2525		}
2526
2527		// Repopulate and check again at time 600 (stmt3 should expire since 600 > 500)
2528		store.enforce_limits();
2529		store.set_time(600);
2530		store.enforce_limits();
2531
2532		{
2533			let index = store.index.read();
2534			assert!(index.expired.contains_key(&hash1));
2535			assert!(index.expired.contains_key(&hash2));
2536			assert!(index.expired.contains_key(&hash3), "stmt3 should be expired");
2537			assert_eq!(index.entries.len(), 0);
2538		}
2539	}
2540
2541	#[test]
2542	fn check_expiration_does_nothing_when_no_expired_statements() {
2543		let (mut store, _temp) = test_store();
2544		store.set_time(1000);
2545
2546		// Create statement with expiry far in the future
2547		// The statement() helper uses set_expiry_from_parts(u32::MAX, priority)
2548		let stmt = statement(1, 1, None, 100);
2549		let hash = stmt.hash();
2550		store.submit(stmt, StatementSource::Network);
2551
2552		// Populate the account list
2553		store.enforce_limits();
2554
2555		// Check expiration - nothing should happen
2556		store.enforce_limits();
2557
2558		// Statement should still be there
2559		let index = store.index.read();
2560		assert!(index.entries.contains_key(&hash));
2561		assert!(!index.expired.contains_key(&hash));
2562		assert_eq!(index.entries.len(), 1);
2563		assert_eq!(index.expired.len(), 0);
2564	}
2565
2566	#[test]
2567	fn check_expiration_correctly_updates_account_data() {
2568		let (mut store, _temp) = test_store();
2569		store.set_time(100);
2570
2571		// Create a statement with expiry at timestamp 200
2572		let mut stmt = statement(1, 1, Some(1), 100);
2573		stmt.set_expiry_from_parts(200, 1);
2574		let hash = stmt.hash();
2575		store.submit(stmt, StatementSource::Network);
2576
2577		// Verify account exists before expiration
2578		{
2579			let index = store.index.read();
2580			assert!(index.accounts.contains_key(&account(1)));
2581			assert_eq!(index.total_size, 100);
2582		}
2583
2584		// Populate and then expire
2585		store.enforce_limits();
2586		store.set_time(300);
2587		store.enforce_limits();
2588
2589		// Verify account is removed after its only statement expires
2590		{
2591			let index = store.index.read();
2592			assert!(
2593				!index.accounts.contains_key(&account(1)),
2594				"Account should be removed when all its statements expire"
2595			);
2596			assert_eq!(index.total_size, 0, "Total size should be zero");
2597			assert!(index.expired.contains_key(&hash));
2598		}
2599	}
2600
2601	#[test]
2602	fn check_expiration_clears_topic_and_key_indexes() {
2603		let (mut store, _temp) = test_store();
2604		store.set_time(100);
2605
2606		// Create a statement with topic and decryption key
2607		let mut stmt = statement(1, 1, Some(1), 100);
2608		stmt.set_expiry_from_parts(200, 1);
2609		stmt.set_topic(0, topic(42));
2610		stmt.set_decryption_key(dec_key(7));
2611		let hash = stmt.hash();
2612		store.submit(stmt, StatementSource::Network);
2613
2614		// Verify indexes are populated
2615		{
2616			let index = store.index.read();
2617			assert!(index.by_topic.get(&topic(42)).map_or(false, |s| s.contains(&hash)));
2618			assert!(index.by_dec_key.get(&Some(dec_key(7))).map_or(false, |s| s.contains(&hash)));
2619		}
2620
2621		// Populate and then expire
2622		store.enforce_limits();
2623		store.set_time(300);
2624		store.enforce_limits();
2625
2626		// Verify indexes are cleared
2627		{
2628			let index = store.index.read();
2629			// Topic set should be empty or removed
2630			assert!(
2631				index.by_topic.get(&topic(42)).map_or(true, |s| s.is_empty()),
2632				"Topic index should be cleared"
2633			);
2634			// Key set should be empty or removed
2635			assert!(
2636				index.by_dec_key.get(&Some(dec_key(7))).map_or(true, |s| s.is_empty()),
2637				"Decryption key index should be cleared"
2638			);
2639			assert!(index.expired.contains_key(&hash));
2640		}
2641	}
2642
2643	#[test]
2644	fn check_expiration_handles_empty_store() {
2645		let (mut store, _temp) = test_store();
2646		store.set_time(1000);
2647
2648		// With no statements, check_expiration should not panic
2649		store.enforce_limits();
2650
2651		// Second call should also work (empty repopulation)
2652		store.enforce_limits();
2653
2654		assert!(store.index.read().accounts_to_check_for_expiry_stmts.is_empty());
2655		assert_eq!(store.index.read().entries.len(), 0);
2656		assert_eq!(store.index.read().expired.len(), 0);
2657	}
2658
2659	#[test]
2660	fn check_expiration_expires_properly_formatted_statements() {
2661		// With the fix (Expiry(current_time << 32)), check_expiration properly
2662		// compares timestamps and can expire statements submitted through normal flow.
2663
2664		let (mut store, _temp) = test_store();
2665		store.set_time(1000);
2666
2667		// Create a statement with expiration timestamp just 1 second in the future
2668		let mut stmt = statement(1, 1, None, 100);
2669		stmt.set_expiry_from_parts(1001, 1); // Expires at timestamp 1001
2670		let hash = stmt.hash();
2671		store.submit(stmt, StatementSource::Network);
2672
2673		assert_eq!(store.index.read().entries.len(), 1);
2674
2675		// Populate the accounts list
2676		store.enforce_limits();
2677
2678		// Advance time past the expiration timestamp
2679		store.set_time(2000);
2680		store.enforce_limits();
2681
2682		// Statement SHOULD be expired because check_expiration now compares
2683		// Expiry(2000 << 32) against Expiry(1001 << 32 | 1), and
2684		// (2000 << 32) > (1001 << 32 | 1)
2685		let index = store.index.read();
2686		assert!(
2687			!index.entries.contains_key(&hash),
2688			"Statement should be removed from entries after expiration"
2689		);
2690		assert!(index.expired.contains_key(&hash), "Statement should be in expired list");
2691	}
2692
2693	#[test]
2694	fn check_expiration_updates_database_columns() {
2695		// This test verifies that check_expiration properly updates the database.
2696		let (mut store, _temp) = test_store();
2697		store.set_time(100);
2698
2699		// Create a statement with expiry at timestamp 200
2700		let mut stmt = statement(1, 1, None, 100);
2701		stmt.set_expiry_from_parts(200, 1);
2702		let hash = stmt.hash();
2703		store.submit(stmt.clone(), StatementSource::Network);
2704
2705		// Verify statement is in the database
2706		let db_entry = store.db.get(col::STATEMENTS, &hash).unwrap();
2707		assert!(db_entry.is_some(), "Statement should be in col::STATEMENTS after submit");
2708
2709		// Populate the accounts list
2710		store.enforce_limits();
2711
2712		// Advance time past expiry and run check_expiration
2713		store.set_time(300);
2714		store.enforce_limits();
2715
2716		// Verify in-memory state is updated correctly
2717		{
2718			let index = store.index.read();
2719			assert!(
2720				!index.entries.contains_key(&hash),
2721				"Statement should be removed from in-memory entries"
2722			);
2723			assert!(
2724				index.expired.contains_key(&hash),
2725				"Statement should be in in-memory expired map"
2726			);
2727		}
2728
2729		let db_entry = store.db.get(col::STATEMENTS, &hash).unwrap();
2730		assert!(
2731			db_entry.is_none(),
2732			"Statement should be removed from col::STATEMENTS after expiration"
2733		);
2734
2735		let expired_entry = store.db.get(col::EXPIRED, &hash).unwrap();
2736		assert!(expired_entry.is_some(), "Expiration info should be written to col::EXPIRED");
2737	}
2738
2739	#[test]
2740	fn enforce_allowances_evicts_excess_statements() {
2741		// This test verifies that check_expiration correctly evicts statements
2742		// when statements exceed the current allowance. We directly insert into
2743		// the index (bypassing submit's validation) to simulate statements that
2744		// existed before allowances were reduced.
2745		let (mut store, _temp) = test_store();
2746		store.set_time(0);
2747
2748		// Account 4 has allowance (4 statements, 1000 bytes) from TestClient
2749		let s1 = statement(4, 10, None, 100); // lowest priority - will be evicted
2750		let s2 = statement(4, 20, None, 100);
2751		let s3 = statement(4, 30, None, 100);
2752		let s4 = statement(4, 40, None, 100);
2753		let s5 = statement(4, 50, None, 100); // highest priority
2754
2755		let h1 = s1.hash();
2756		let h5 = s5.hash();
2757
2758		// Directly insert into index, bypassing `submit`'s allowance check
2759		{
2760			let mut index = store.index.write();
2761			for s in [&s1, &s2, &s3, &s4, &s5] {
2762				index.insert_new(s.hash(), account(4), s, false);
2763			}
2764		}
2765
2766		// Verify initial state - all 5 should be present
2767		assert_eq!(store.index.read().entries.len(), 5);
2768		assert_eq!(store.index.read().total_size, 500);
2769
2770		// Run check_expiration which handles both expiration and allowance enforcement
2771		// First call populates the accounts list, second call processes them
2772		// Since account 4 has max_count=4, one statement should be evicted
2773		store.enforce_limits();
2774		store.enforce_limits();
2775
2776		// Should evict the lowest priority statement (s1)
2777		let index = store.index.read();
2778		assert_eq!(index.entries.len(), 4, "Should have 4 statements after eviction");
2779		assert!(!index.entries.contains_key(&h1), "Lowest priority should be evicted");
2780		assert!(index.entries.contains_key(&h5), "Highest priority should remain");
2781		assert_eq!(index.total_size, 400);
2782
2783		// Evicted statement should be marked as expired
2784		assert!(index.expired.contains_key(&h1));
2785	}
2786
2787	#[test]
2788	fn enforce_allowances_evicts_all_when_no_allowance_found() {
2789		let (mut store, _temp) = test_store();
2790		store.set_time(0);
2791
2792		// Account 0 has NO allowance in TestClient
2793		let s1 = statement(0, 10, None, 100);
2794		let s2 = statement(0, 20, None, 150);
2795
2796		let h1 = s1.hash();
2797		let h2 = s2.hash();
2798
2799		// Directly insert statements for account with no allowance
2800		{
2801			let mut index = store.index.write();
2802			index.insert_new(h1, account(0), &s1, false);
2803			index.insert_new(h2, account(0), &s2, false);
2804		}
2805
2806		assert_eq!(store.index.read().entries.len(), 2);
2807
2808		// Run check_expiration - should evict ALL statements since no allowance exists
2809		// First call populates the accounts list, second call processes them
2810		store.enforce_limits();
2811		store.enforce_limits();
2812
2813		let index = store.index.read();
2814		assert_eq!(index.entries.len(), 0, "All statements should be evicted");
2815		assert!(!index.accounts.contains_key(&account(0)), "Account should be removed");
2816		assert!(index.expired.contains_key(&h1));
2817		assert!(index.expired.contains_key(&h2));
2818	}
2819
2820	#[test]
2821	fn enforce_allowances_based_on_size() {
2822		// This test verifies that check_expiration evicts based on size limits.
2823		let (mut store, _temp) = test_store();
2824		store.set_time(0);
2825
2826		// Account 2 has allowance (2, 1000) from TestClient
2827		// Insert 2 statements that together exceed 1000 bytes
2828		let s1 = statement(2, 10, None, 600); // lowest priority
2829		let s2 = statement(2, 20, None, 600); // higher priority
2830
2831		let h1 = s1.hash();
2832		let h2 = s2.hash();
2833
2834		// Directly insert both statements (total 1200 bytes > 1000 limit)
2835		{
2836			let mut index = store.index.write();
2837			index.insert_new(h1, account(2), &s1, false);
2838			index.insert_new(h2, account(2), &s2, false);
2839		}
2840
2841		assert_eq!(store.index.read().total_size, 1200);
2842
2843		// Run check_expiration - should evict s1 to get under 1000 bytes
2844		// First call populates the accounts list, second call processes them
2845		store.enforce_limits();
2846		store.enforce_limits();
2847
2848		let index = store.index.read();
2849		assert_eq!(index.entries.len(), 1);
2850		assert!(index.entries.contains_key(&h2), "Higher priority should remain");
2851		assert!(!index.entries.contains_key(&h1), "Lower priority should be evicted");
2852		assert_eq!(index.total_size, 600);
2853	}
2854}