sc_statement_store/
lib.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! Disk-backed statement store.
20//!
21//! This module contains an implementation of `sp_statement_store::StatementStore` which is backed
22//! by a database.
23//!
24//! Constraint management.
25//!
26//! Each time a new statement is inserted into the store, it is first validated with the runtime
27//! Validation function computes `global_priority`, 'max_count' and `max_size` for a statement.
28//! The following constraints are then checked:
29//! * For a given account id, there may be at most `max_count` statements with `max_size` total data
30//!   size. To satisfy this, statements for this account ID are removed from the store starting with
31//!   the lowest priority until a constraint is satisfied.
32//! * There may not be more than `MAX_TOTAL_STATEMENTS` total statements with `MAX_TOTAL_SIZE` size.
33//!   To satisfy this, statements are removed from the store starting with the lowest
34//!   `global_priority` until a constraint is satisfied.
35//!
36//! When a new statement is inserted that would not satisfy constraints in the first place, no
37//! statements are deleted and `Ignored` result is returned.
38//! The order in which statements with the same priority are deleted is unspecified.
39//!
40//! Statement expiration.
41//!
42//! Each time a statement is removed from the store (Either evicted by higher priority statement or
43//! explicitly with the `remove` function) the statement is marked as expired. Expired statements
44//! can't be added to the store for `Options::purge_after_sec` seconds. This is to prevent old
45//! statements from being propagated on the network.
46
47#![warn(missing_docs)]
48#![warn(unused_extern_crates)]
49
50mod metrics;
51
52pub use sp_statement_store::{Error, StatementStore, MAX_TOPICS};
53
54use metrics::MetricsLink as PrometheusMetrics;
55use parking_lot::RwLock;
56use prometheus_endpoint::Registry as PrometheusRegistry;
57use sc_keystore::LocalKeystore;
58use sp_api::ProvideRuntimeApi;
59use sp_blockchain::HeaderBackend;
60use sp_core::{crypto::UncheckedFrom, hexdisplay::HexDisplay, traits::SpawnNamed, Decode, Encode};
61use sp_runtime::traits::Block as BlockT;
62use sp_statement_store::{
63	runtime_api::{
64		InvalidStatement, StatementSource, StatementStoreExt, ValidStatement, ValidateStatement,
65	},
66	AccountId, BlockHash, Channel, DecryptionKey, Hash, NetworkPriority, Proof, Result, Statement,
67	SubmitResult, Topic,
68};
69use std::{
70	collections::{BTreeMap, HashMap, HashSet},
71	sync::Arc,
72};
73
74const KEY_VERSION: &[u8] = b"version".as_slice();
75const CURRENT_VERSION: u32 = 1;
76
77const LOG_TARGET: &str = "statement-store";
78
79/// The amount of time an expired statement is kept before it is removed from the store entirely.
80pub const DEFAULT_PURGE_AFTER_SEC: u64 = 2 * 24 * 60 * 60; //48h
81/// The maximum number of statements the statement store can hold.
82pub const DEFAULT_MAX_TOTAL_STATEMENTS: usize = 4 * 1024 * 1024; // ~4 million
83/// The maximum amount of data the statement store can hold, regardless of the number of
84/// statements from which the data originates.
85pub const DEFAULT_MAX_TOTAL_SIZE: usize = 2 * 1024 * 1024 * 1024; // 2GiB
86
87const MAINTENANCE_PERIOD: std::time::Duration = std::time::Duration::from_secs(30);
88
89mod col {
90	pub const META: u8 = 0;
91	pub const STATEMENTS: u8 = 1;
92	pub const EXPIRED: u8 = 2;
93
94	pub const COUNT: u8 = 3;
95}
96
97#[derive(Eq, PartialEq, Debug, Ord, PartialOrd, Clone, Copy)]
98struct Priority(u32);
99
100#[derive(PartialEq, Eq)]
101struct PriorityKey {
102	hash: Hash,
103	priority: Priority,
104}
105
106impl PartialOrd for PriorityKey {
107	fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
108		Some(self.cmp(other))
109	}
110}
111
112impl Ord for PriorityKey {
113	fn cmp(&self, other: &Self) -> std::cmp::Ordering {
114		self.priority.cmp(&other.priority).then_with(|| self.hash.cmp(&other.hash))
115	}
116}
117
118#[derive(PartialEq, Eq)]
119struct ChannelEntry {
120	hash: Hash,
121	priority: Priority,
122}
123
124#[derive(Default)]
125struct StatementsForAccount {
126	// Statements ordered by priority.
127	by_priority: BTreeMap<PriorityKey, (Option<Channel>, usize)>,
128	// Channel to statement map. Only one statement per channel is allowed.
129	channels: HashMap<Channel, ChannelEntry>,
130	// Sum of all `Data` field sizes.
131	data_size: usize,
132}
133
134/// Store configuration
135pub struct Options {
136	/// Maximum statement allowed in the store. Once this limit is reached lower-priority
137	/// statements may be evicted.
138	max_total_statements: usize,
139	/// Maximum total data size allowed in the store. Once this limit is reached lower-priority
140	/// statements may be evicted.
141	max_total_size: usize,
142	/// Number of seconds for which removed statements won't be allowed to be added back in.
143	purge_after_sec: u64,
144}
145
146impl Default for Options {
147	fn default() -> Self {
148		Options {
149			max_total_statements: DEFAULT_MAX_TOTAL_STATEMENTS,
150			max_total_size: DEFAULT_MAX_TOTAL_SIZE,
151			purge_after_sec: DEFAULT_PURGE_AFTER_SEC,
152		}
153	}
154}
155
156#[derive(Default)]
157struct Index {
158	by_topic: HashMap<Topic, HashSet<Hash>>,
159	by_dec_key: HashMap<Option<DecryptionKey>, HashSet<Hash>>,
160	topics_and_keys: HashMap<Hash, ([Option<Topic>; MAX_TOPICS], Option<DecryptionKey>)>,
161	entries: HashMap<Hash, (AccountId, Priority, usize)>,
162	expired: HashMap<Hash, u64>, // Value is expiration timestamp.
163	accounts: HashMap<AccountId, StatementsForAccount>,
164	options: Options,
165	total_size: usize,
166}
167
168struct ClientWrapper<Block, Client> {
169	client: Arc<Client>,
170	_block: std::marker::PhantomData<Block>,
171}
172
173impl<Block, Client> ClientWrapper<Block, Client>
174where
175	Block: BlockT,
176	Block::Hash: From<BlockHash>,
177	Client: ProvideRuntimeApi<Block> + HeaderBackend<Block> + Send + Sync + 'static,
178	Client::Api: ValidateStatement<Block>,
179{
180	fn validate_statement(
181		&self,
182		block: Option<BlockHash>,
183		source: StatementSource,
184		statement: Statement,
185	) -> std::result::Result<ValidStatement, InvalidStatement> {
186		let api = self.client.runtime_api();
187		let block = block.map(Into::into).unwrap_or_else(|| {
188			// Validate against the finalized state.
189			self.client.info().finalized_hash
190		});
191		api.validate_statement(block, source, statement)
192			.map_err(|_| InvalidStatement::InternalError)?
193	}
194}
195
196/// Statement store.
197pub struct Store {
198	db: parity_db::Db,
199	index: RwLock<Index>,
200	validate_fn: Box<
201		dyn Fn(
202				Option<BlockHash>,
203				StatementSource,
204				Statement,
205			) -> std::result::Result<ValidStatement, InvalidStatement>
206			+ Send
207			+ Sync,
208	>,
209	keystore: Arc<LocalKeystore>,
210	// Used for testing
211	time_override: Option<u64>,
212	metrics: PrometheusMetrics,
213}
214
215enum IndexQuery {
216	Unknown,
217	Exists,
218	Expired,
219}
220
221enum MaybeInserted {
222	Inserted(HashSet<Hash>),
223	Ignored,
224}
225
226impl Index {
227	fn new(options: Options) -> Index {
228		Index { options, ..Default::default() }
229	}
230
231	fn insert_new(&mut self, hash: Hash, account: AccountId, statement: &Statement) {
232		let mut all_topics = [None; MAX_TOPICS];
233		let mut nt = 0;
234		while let Some(t) = statement.topic(nt) {
235			self.by_topic.entry(t).or_default().insert(hash);
236			all_topics[nt] = Some(t);
237			nt += 1;
238		}
239		let key = statement.decryption_key();
240		self.by_dec_key.entry(key).or_default().insert(hash);
241		if nt > 0 || key.is_some() {
242			self.topics_and_keys.insert(hash, (all_topics, key));
243		}
244		let priority = Priority(statement.priority().unwrap_or(0));
245		self.entries.insert(hash, (account, priority, statement.data_len()));
246		self.total_size += statement.data_len();
247		let account_info = self.accounts.entry(account).or_default();
248		account_info.data_size += statement.data_len();
249		if let Some(channel) = statement.channel() {
250			account_info.channels.insert(channel, ChannelEntry { hash, priority });
251		}
252		account_info
253			.by_priority
254			.insert(PriorityKey { hash, priority }, (statement.channel(), statement.data_len()));
255	}
256
257	fn query(&self, hash: &Hash) -> IndexQuery {
258		if self.entries.contains_key(hash) {
259			return IndexQuery::Exists
260		}
261		if self.expired.contains_key(hash) {
262			return IndexQuery::Expired
263		}
264		IndexQuery::Unknown
265	}
266
267	fn insert_expired(&mut self, hash: Hash, timestamp: u64) {
268		self.expired.insert(hash, timestamp);
269	}
270
271	fn iterate_with(
272		&self,
273		key: Option<DecryptionKey>,
274		match_all_topics: &[Topic],
275		mut f: impl FnMut(&Hash) -> Result<()>,
276	) -> Result<()> {
277		let empty = HashSet::new();
278		let mut sets: [&HashSet<Hash>; MAX_TOPICS + 1] = [&empty; MAX_TOPICS + 1];
279		if match_all_topics.len() > MAX_TOPICS {
280			return Ok(())
281		}
282		let key_set = self.by_dec_key.get(&key);
283		if key_set.map_or(0, |s| s.len()) == 0 {
284			// Key does not exist in the index.
285			return Ok(())
286		}
287		sets[0] = key_set.expect("Function returns if key_set is None");
288		for (i, t) in match_all_topics.iter().enumerate() {
289			let set = self.by_topic.get(t);
290			if set.map_or(0, |s| s.len()) == 0 {
291				// At least one of the match_all_topics does not exist in the index.
292				return Ok(())
293			}
294			sets[i + 1] = set.expect("Function returns if set is None");
295		}
296		let sets = &mut sets[0..match_all_topics.len() + 1];
297		// Start with the smallest topic set or the key set.
298		sets.sort_by_key(|s| s.len());
299		for item in sets[0] {
300			if sets[1..].iter().all(|set| set.contains(item)) {
301				log::trace!(
302					target: LOG_TARGET,
303					"Iterating by topic/key: statement {:?}",
304					HexDisplay::from(item)
305				);
306				f(item)?
307			}
308		}
309		Ok(())
310	}
311
312	fn maintain(&mut self, current_time: u64) -> Vec<Hash> {
313		// Purge previously expired messages.
314		let mut purged = Vec::new();
315		self.expired.retain(|hash, timestamp| {
316			if *timestamp + self.options.purge_after_sec <= current_time {
317				purged.push(*hash);
318				log::trace!(target: LOG_TARGET, "Purged statement {:?}", HexDisplay::from(hash));
319				false
320			} else {
321				true
322			}
323		});
324		purged
325	}
326
327	fn make_expired(&mut self, hash: &Hash, current_time: u64) -> bool {
328		if let Some((account, priority, len)) = self.entries.remove(hash) {
329			self.total_size -= len;
330			if let Some((topics, key)) = self.topics_and_keys.remove(hash) {
331				for t in topics.into_iter().flatten() {
332					if let std::collections::hash_map::Entry::Occupied(mut set) =
333						self.by_topic.entry(t)
334					{
335						set.get_mut().remove(hash);
336						if set.get().is_empty() {
337							set.remove_entry();
338						}
339					}
340				}
341				if let std::collections::hash_map::Entry::Occupied(mut set) =
342					self.by_dec_key.entry(key)
343				{
344					set.get_mut().remove(hash);
345					if set.get().is_empty() {
346						set.remove_entry();
347					}
348				}
349			}
350			self.expired.insert(*hash, current_time);
351			if let std::collections::hash_map::Entry::Occupied(mut account_rec) =
352				self.accounts.entry(account)
353			{
354				let key = PriorityKey { hash: *hash, priority };
355				if let Some((channel, len)) = account_rec.get_mut().by_priority.remove(&key) {
356					account_rec.get_mut().data_size -= len;
357					if let Some(channel) = channel {
358						account_rec.get_mut().channels.remove(&channel);
359					}
360				}
361				if account_rec.get().by_priority.is_empty() {
362					account_rec.remove_entry();
363				}
364			}
365			log::trace!(target: LOG_TARGET, "Expired statement {:?}", HexDisplay::from(hash));
366			true
367		} else {
368			false
369		}
370	}
371
372	fn insert(
373		&mut self,
374		hash: Hash,
375		statement: &Statement,
376		account: &AccountId,
377		validation: &ValidStatement,
378		current_time: u64,
379	) -> MaybeInserted {
380		let statement_len = statement.data_len();
381		if statement_len > validation.max_size as usize {
382			log::debug!(
383				target: LOG_TARGET,
384				"Ignored oversize message: {:?} ({} bytes)",
385				HexDisplay::from(&hash),
386				statement_len,
387			);
388			return MaybeInserted::Ignored
389		}
390
391		let mut evicted = HashSet::new();
392		let mut would_free_size = 0;
393		let priority = Priority(statement.priority().unwrap_or(0));
394		let (max_size, max_count) = (validation.max_size as usize, validation.max_count as usize);
395		// It may happen that we can't delete enough lower priority messages
396		// to satisfy size constraints. We check for that before deleting anything,
397		// taking into account channel message replacement.
398		if let Some(account_rec) = self.accounts.get(account) {
399			if let Some(channel) = statement.channel() {
400				if let Some(channel_record) = account_rec.channels.get(&channel) {
401					if priority <= channel_record.priority {
402						// Trying to replace channel message with lower priority
403						log::debug!(
404							target: LOG_TARGET,
405							"Ignored lower priority channel message: {:?} {:?} <= {:?}",
406							HexDisplay::from(&hash),
407							priority,
408							channel_record.priority,
409						);
410						return MaybeInserted::Ignored
411					} else {
412						// Would replace channel message. Still need to check for size constraints
413						// below.
414						log::debug!(
415							target: LOG_TARGET,
416							"Replacing higher priority channel message: {:?} ({:?}) > {:?} ({:?})",
417							HexDisplay::from(&hash),
418							priority,
419							HexDisplay::from(&channel_record.hash),
420							channel_record.priority,
421						);
422						let key = PriorityKey {
423							hash: channel_record.hash,
424							priority: channel_record.priority,
425						};
426						if let Some((_channel, len)) = account_rec.by_priority.get(&key) {
427							would_free_size += *len;
428							evicted.insert(channel_record.hash);
429						}
430					}
431				}
432			}
433			// Check if we can evict enough lower priority statements to satisfy constraints
434			for (entry, (_, len)) in account_rec.by_priority.iter() {
435				if (account_rec.data_size - would_free_size + statement_len <= max_size) &&
436					account_rec.by_priority.len() + 1 - evicted.len() <= max_count
437				{
438					// Satisfied
439					break
440				}
441				if evicted.contains(&entry.hash) {
442					// Already accounted for above
443					continue
444				}
445				if entry.priority >= priority {
446					log::debug!(
447						target: LOG_TARGET,
448						"Ignored message due to constraints {:?} {:?} < {:?}",
449						HexDisplay::from(&hash),
450						priority,
451						entry.priority,
452					);
453					return MaybeInserted::Ignored
454				}
455				evicted.insert(entry.hash);
456				would_free_size += len;
457			}
458		}
459		// Now check global constraints as well.
460		if !((self.total_size - would_free_size + statement_len <= self.options.max_total_size) &&
461			self.entries.len() + 1 - evicted.len() <= self.options.max_total_statements)
462		{
463			log::debug!(
464				target: LOG_TARGET,
465				"Ignored statement {} because the store is full (size={}, count={})",
466				HexDisplay::from(&hash),
467				self.total_size,
468				self.entries.len(),
469			);
470			return MaybeInserted::Ignored
471		}
472
473		for h in &evicted {
474			self.make_expired(h, current_time);
475		}
476		self.insert_new(hash, *account, statement);
477		MaybeInserted::Inserted(evicted)
478	}
479}
480
481impl Store {
482	/// Create a new shared store instance. There should only be one per process.
483	/// `path` will be used to open a statement database or create a new one if it does not exist.
484	pub fn new_shared<Block, Client>(
485		path: &std::path::Path,
486		options: Options,
487		client: Arc<Client>,
488		keystore: Arc<LocalKeystore>,
489		prometheus: Option<&PrometheusRegistry>,
490		task_spawner: &dyn SpawnNamed,
491	) -> Result<Arc<Store>>
492	where
493		Block: BlockT,
494		Block::Hash: From<BlockHash>,
495		Client: ProvideRuntimeApi<Block> + HeaderBackend<Block> + Send + Sync + 'static,
496		Client::Api: ValidateStatement<Block>,
497	{
498		let store = Arc::new(Self::new(path, options, client, keystore, prometheus)?);
499
500		// Perform periodic statement store maintenance
501		let worker_store = store.clone();
502		task_spawner.spawn(
503			"statement-store-maintenance",
504			Some("statement-store"),
505			Box::pin(async move {
506				let mut interval = tokio::time::interval(MAINTENANCE_PERIOD);
507				loop {
508					interval.tick().await;
509					worker_store.maintain();
510				}
511			}),
512		);
513
514		Ok(store)
515	}
516
517	/// Create a new instance.
518	/// `path` will be used to open a statement database or create a new one if it does not exist.
519	fn new<Block, Client>(
520		path: &std::path::Path,
521		options: Options,
522		client: Arc<Client>,
523		keystore: Arc<LocalKeystore>,
524		prometheus: Option<&PrometheusRegistry>,
525	) -> Result<Store>
526	where
527		Block: BlockT,
528		Block::Hash: From<BlockHash>,
529		Client: ProvideRuntimeApi<Block> + HeaderBackend<Block> + Send + Sync + 'static,
530		Client::Api: ValidateStatement<Block>,
531	{
532		let mut path: std::path::PathBuf = path.into();
533		path.push("statements");
534
535		let mut config = parity_db::Options::with_columns(&path, col::COUNT);
536
537		let statement_col = &mut config.columns[col::STATEMENTS as usize];
538		statement_col.ref_counted = false;
539		statement_col.preimage = true;
540		statement_col.uniform = true;
541		let db = parity_db::Db::open_or_create(&config).map_err(|e| Error::Db(e.to_string()))?;
542		match db.get(col::META, &KEY_VERSION).map_err(|e| Error::Db(e.to_string()))? {
543			Some(version) => {
544				let version = u32::from_le_bytes(
545					version
546						.try_into()
547						.map_err(|_| Error::Db("Error reading database version".into()))?,
548				);
549				if version != CURRENT_VERSION {
550					return Err(Error::Db(format!("Unsupported database version: {version}")))
551				}
552			},
553			None => {
554				db.commit([(
555					col::META,
556					KEY_VERSION.to_vec(),
557					Some(CURRENT_VERSION.to_le_bytes().to_vec()),
558				)])
559				.map_err(|e| Error::Db(e.to_string()))?;
560			},
561		}
562
563		let validator = ClientWrapper { client, _block: Default::default() };
564		let validate_fn = Box::new(move |block, source, statement| {
565			validator.validate_statement(block, source, statement)
566		});
567
568		let store = Store {
569			db,
570			index: RwLock::new(Index::new(options)),
571			validate_fn,
572			keystore,
573			time_override: None,
574			metrics: PrometheusMetrics::new(prometheus),
575		};
576		store.populate()?;
577		Ok(store)
578	}
579
580	/// Create memory index from the data.
581	// This may be moved to a background thread if it slows startup too much.
582	// This function should only be used on startup. There should be no other DB operations when
583	// iterating the index.
584	fn populate(&self) -> Result<()> {
585		{
586			let mut index = self.index.write();
587			self.db
588				.iter_column_while(col::STATEMENTS, |item| {
589					let statement = item.value;
590					if let Ok(statement) = Statement::decode(&mut statement.as_slice()) {
591						let hash = statement.hash();
592						log::trace!(
593							target: LOG_TARGET,
594							"Statement loaded {:?}",
595							HexDisplay::from(&hash)
596						);
597						if let Some(account_id) = statement.account_id() {
598							index.insert_new(hash, account_id, &statement);
599						} else {
600							log::debug!(
601								target: LOG_TARGET,
602								"Error decoding statement loaded from the DB: {:?}",
603								HexDisplay::from(&hash)
604							);
605						}
606					}
607					true
608				})
609				.map_err(|e| Error::Db(e.to_string()))?;
610			self.db
611				.iter_column_while(col::EXPIRED, |item| {
612					let expired_info = item.value;
613					if let Ok((hash, timestamp)) =
614						<(Hash, u64)>::decode(&mut expired_info.as_slice())
615					{
616						log::trace!(
617							target: LOG_TARGET,
618							"Statement loaded (expired): {:?}",
619							HexDisplay::from(&hash)
620						);
621						index.insert_expired(hash, timestamp);
622					}
623					true
624				})
625				.map_err(|e| Error::Db(e.to_string()))?;
626		}
627
628		self.maintain();
629		Ok(())
630	}
631
632	fn collect_statements<R>(
633		&self,
634		key: Option<DecryptionKey>,
635		match_all_topics: &[Topic],
636		mut f: impl FnMut(Statement) -> Option<R>,
637	) -> Result<Vec<R>> {
638		let mut result = Vec::new();
639		let index = self.index.read();
640		index.iterate_with(key, match_all_topics, |hash| {
641			match self.db.get(col::STATEMENTS, hash).map_err(|e| Error::Db(e.to_string()))? {
642				Some(entry) => {
643					if let Ok(statement) = Statement::decode(&mut entry.as_slice()) {
644						if let Some(data) = f(statement) {
645							result.push(data);
646						}
647					} else {
648						// DB inconsistency
649						log::warn!(
650							target: LOG_TARGET,
651							"Corrupt statement {:?}",
652							HexDisplay::from(hash)
653						);
654					}
655				},
656				None => {
657					// DB inconsistency
658					log::warn!(
659						target: LOG_TARGET,
660						"Missing statement {:?}",
661						HexDisplay::from(hash)
662					);
663				},
664			}
665			Ok(())
666		})?;
667		Ok(result)
668	}
669
670	/// Perform periodic store maintenance
671	pub fn maintain(&self) {
672		log::trace!(target: LOG_TARGET, "Started store maintenance");
673		let (deleted, active_count, expired_count): (Vec<_>, usize, usize) = {
674			let mut index = self.index.write();
675			let deleted = index.maintain(self.timestamp());
676			(deleted, index.entries.len(), index.expired.len())
677		};
678		let deleted: Vec<_> =
679			deleted.into_iter().map(|hash| (col::EXPIRED, hash.to_vec(), None)).collect();
680		let deleted_count = deleted.len() as u64;
681		if let Err(e) = self.db.commit(deleted) {
682			log::warn!(target: LOG_TARGET, "Error writing to the statement database: {:?}", e);
683		} else {
684			self.metrics.report(|metrics| metrics.statements_pruned.inc_by(deleted_count));
685		}
686		log::trace!(
687			target: LOG_TARGET,
688			"Completed store maintenance. Purged: {}, Active: {}, Expired: {}",
689			deleted_count,
690			active_count,
691			expired_count
692		);
693	}
694
695	fn timestamp(&self) -> u64 {
696		self.time_override.unwrap_or_else(|| {
697			std::time::SystemTime::now()
698				.duration_since(std::time::UNIX_EPOCH)
699				.unwrap_or_default()
700				.as_secs()
701		})
702	}
703
704	#[cfg(test)]
705	fn set_time(&mut self, time: u64) {
706		self.time_override = Some(time);
707	}
708
709	/// Returns `self` as [`StatementStoreExt`].
710	pub fn as_statement_store_ext(self: Arc<Self>) -> StatementStoreExt {
711		StatementStoreExt::new(self)
712	}
713
714	/// Return information of all known statements whose decryption key is identified as
715	/// `dest`. The key must be available to the client.
716	fn posted_clear_inner<R>(
717		&self,
718		match_all_topics: &[Topic],
719		dest: [u8; 32],
720		// Map the statement and the decrypted data to the desired result.
721		mut map_f: impl FnMut(Statement, Vec<u8>) -> R,
722	) -> Result<Vec<R>> {
723		self.collect_statements(Some(dest), match_all_topics, |statement| {
724			if let (Some(key), Some(_)) = (statement.decryption_key(), statement.data()) {
725				let public: sp_core::ed25519::Public = UncheckedFrom::unchecked_from(key);
726				let public: sp_statement_store::ed25519::Public = public.into();
727				match self.keystore.key_pair::<sp_statement_store::ed25519::Pair>(&public) {
728					Err(e) => {
729						log::debug!(
730							target: LOG_TARGET,
731							"Keystore error: {:?}, for statement {:?}",
732							e,
733							HexDisplay::from(&statement.hash())
734						);
735						None
736					},
737					Ok(None) => {
738						log::debug!(
739							target: LOG_TARGET,
740							"Keystore is missing key for statement {:?}",
741							HexDisplay::from(&statement.hash())
742						);
743						None
744					},
745					Ok(Some(pair)) => match statement.decrypt_private(&pair.into_inner()) {
746						Ok(r) => r.map(|data| map_f(statement, data)),
747						Err(e) => {
748							log::debug!(
749								target: LOG_TARGET,
750								"Decryption error: {:?}, for statement {:?}",
751								e,
752								HexDisplay::from(&statement.hash())
753							);
754							None
755						},
756					},
757				}
758			} else {
759				None
760			}
761		})
762	}
763}
764
765impl StatementStore for Store {
766	/// Return all statements.
767	fn statements(&self) -> Result<Vec<(Hash, Statement)>> {
768		let index = self.index.read();
769		let mut result = Vec::with_capacity(index.entries.len());
770		for h in index.entries.keys() {
771			let encoded = self.db.get(col::STATEMENTS, h).map_err(|e| Error::Db(e.to_string()))?;
772			if let Some(encoded) = encoded {
773				if let Ok(statement) = Statement::decode(&mut encoded.as_slice()) {
774					let hash = statement.hash();
775					result.push((hash, statement));
776				}
777			}
778		}
779		Ok(result)
780	}
781
782	/// Returns a statement by hash.
783	fn statement(&self, hash: &Hash) -> Result<Option<Statement>> {
784		Ok(
785			match self
786				.db
787				.get(col::STATEMENTS, hash.as_slice())
788				.map_err(|e| Error::Db(e.to_string()))?
789			{
790				Some(entry) => {
791					log::trace!(
792						target: LOG_TARGET,
793						"Queried statement {:?}",
794						HexDisplay::from(hash)
795					);
796					Some(
797						Statement::decode(&mut entry.as_slice())
798							.map_err(|e| Error::Decode(e.to_string()))?,
799					)
800				},
801				None => {
802					log::trace!(
803						target: LOG_TARGET,
804						"Queried missing statement {:?}",
805						HexDisplay::from(hash)
806					);
807					None
808				},
809			},
810		)
811	}
812
813	/// Return the data of all known statements which include all topics and have no `DecryptionKey`
814	/// field.
815	fn broadcasts(&self, match_all_topics: &[Topic]) -> Result<Vec<Vec<u8>>> {
816		self.collect_statements(None, match_all_topics, |statement| statement.into_data())
817	}
818
819	/// Return the data of all known statements whose decryption key is identified as `dest` (this
820	/// will generally be the public key or a hash thereof for symmetric ciphers, or a hash of the
821	/// private key for symmetric ciphers).
822	fn posted(&self, match_all_topics: &[Topic], dest: [u8; 32]) -> Result<Vec<Vec<u8>>> {
823		self.collect_statements(Some(dest), match_all_topics, |statement| statement.into_data())
824	}
825
826	/// Return the decrypted data of all known statements whose decryption key is identified as
827	/// `dest`. The key must be available to the client.
828	fn posted_clear(&self, match_all_topics: &[Topic], dest: [u8; 32]) -> Result<Vec<Vec<u8>>> {
829		self.posted_clear_inner(match_all_topics, dest, |_statement, data| data)
830	}
831
832	/// Return all known statements which include all topics and have no `DecryptionKey`
833	/// field.
834	fn broadcasts_stmt(&self, match_all_topics: &[Topic]) -> Result<Vec<Vec<u8>>> {
835		self.collect_statements(None, match_all_topics, |statement| Some(statement.encode()))
836	}
837
838	/// Return all known statements whose decryption key is identified as `dest` (this
839	/// will generally be the public key or a hash thereof for symmetric ciphers, or a hash of the
840	/// private key for symmetric ciphers).
841	fn posted_stmt(&self, match_all_topics: &[Topic], dest: [u8; 32]) -> Result<Vec<Vec<u8>>> {
842		self.collect_statements(Some(dest), match_all_topics, |statement| Some(statement.encode()))
843	}
844
845	/// Return the statement and the decrypted data of all known statements whose decryption key is
846	/// identified as `dest`. The key must be available to the client.
847	fn posted_clear_stmt(
848		&self,
849		match_all_topics: &[Topic],
850		dest: [u8; 32],
851	) -> Result<Vec<Vec<u8>>> {
852		self.posted_clear_inner(match_all_topics, dest, |statement, data| {
853			let mut res = Vec::with_capacity(statement.size_hint() + data.len());
854			statement.encode_to(&mut res);
855			res.extend_from_slice(&data);
856			res
857		})
858	}
859
860	/// Submit a statement to the store. Validates the statement and returns validation result.
861	fn submit(&self, statement: Statement, source: StatementSource) -> SubmitResult {
862		let hash = statement.hash();
863		match self.index.read().query(&hash) {
864			IndexQuery::Expired =>
865				if !source.can_be_resubmitted() {
866					return SubmitResult::KnownExpired
867				},
868			IndexQuery::Exists =>
869				if !source.can_be_resubmitted() {
870					return SubmitResult::Known
871				},
872			IndexQuery::Unknown => {},
873		}
874
875		let Some(account_id) = statement.account_id() else {
876			log::debug!(
877				target: LOG_TARGET,
878				"Statement validation failed: Missing proof ({:?})",
879				HexDisplay::from(&hash),
880			);
881			self.metrics.report(|metrics| metrics.validations_invalid.inc());
882			return SubmitResult::Bad("No statement proof")
883		};
884
885		// Validate.
886		let at_block = if let Some(Proof::OnChain { block_hash, .. }) = statement.proof() {
887			Some(*block_hash)
888		} else {
889			None
890		};
891		let validation_result = (self.validate_fn)(at_block, source, statement.clone());
892		let validation = match validation_result {
893			Ok(validation) => validation,
894			Err(InvalidStatement::BadProof) => {
895				log::debug!(
896					target: LOG_TARGET,
897					"Statement validation failed: BadProof, {:?}",
898					HexDisplay::from(&hash),
899				);
900				self.metrics.report(|metrics| metrics.validations_invalid.inc());
901				return SubmitResult::Bad("Bad statement proof")
902			},
903			Err(InvalidStatement::NoProof) => {
904				log::debug!(
905					target: LOG_TARGET,
906					"Statement validation failed: NoProof, {:?}",
907					HexDisplay::from(&hash),
908				);
909				self.metrics.report(|metrics| metrics.validations_invalid.inc());
910				return SubmitResult::Bad("Missing statement proof")
911			},
912			Err(InvalidStatement::InternalError) =>
913				return SubmitResult::InternalError(Error::Runtime),
914		};
915
916		let current_time = self.timestamp();
917		let mut commit = Vec::new();
918		{
919			let mut index = self.index.write();
920
921			let evicted =
922				match index.insert(hash, &statement, &account_id, &validation, current_time) {
923					MaybeInserted::Ignored => return SubmitResult::Ignored,
924					MaybeInserted::Inserted(evicted) => evicted,
925				};
926
927			commit.push((col::STATEMENTS, hash.to_vec(), Some(statement.encode())));
928			for hash in evicted {
929				commit.push((col::STATEMENTS, hash.to_vec(), None));
930				commit.push((col::EXPIRED, hash.to_vec(), Some((hash, current_time).encode())));
931			}
932			if let Err(e) = self.db.commit(commit) {
933				log::debug!(
934					target: LOG_TARGET,
935					"Statement validation failed: database error {}, {:?}",
936					e,
937					statement
938				);
939				return SubmitResult::InternalError(Error::Db(e.to_string()))
940			}
941		} // Release index lock
942		self.metrics.report(|metrics| metrics.submitted_statements.inc());
943		let network_priority = NetworkPriority::High;
944		log::trace!(target: LOG_TARGET, "Statement submitted: {:?}", HexDisplay::from(&hash));
945		SubmitResult::New(network_priority)
946	}
947
948	/// Remove a statement by hash.
949	fn remove(&self, hash: &Hash) -> Result<()> {
950		let current_time = self.timestamp();
951		{
952			let mut index = self.index.write();
953			if index.make_expired(hash, current_time) {
954				let commit = [
955					(col::STATEMENTS, hash.to_vec(), None),
956					(col::EXPIRED, hash.to_vec(), Some((hash, current_time).encode())),
957				];
958				if let Err(e) = self.db.commit(commit) {
959					log::debug!(
960						target: LOG_TARGET,
961						"Error removing statement: database error {}, {:?}",
962						e,
963						HexDisplay::from(hash),
964					);
965					return Err(Error::Db(e.to_string()))
966				}
967			}
968		}
969		Ok(())
970	}
971
972	/// Remove all statements by an account.
973	fn remove_by(&self, who: [u8; 32]) -> Result<()> {
974		let mut index = self.index.write();
975		let mut evicted = Vec::new();
976		if let Some(account_rec) = index.accounts.get(&who) {
977			evicted.extend(account_rec.by_priority.keys().map(|k| k.hash));
978		}
979
980		let current_time = self.timestamp();
981		let mut commit = Vec::new();
982		for hash in evicted {
983			index.make_expired(&hash, current_time);
984			commit.push((col::STATEMENTS, hash.to_vec(), None));
985			commit.push((col::EXPIRED, hash.to_vec(), Some((hash, current_time).encode())));
986		}
987		self.db.commit(commit).map_err(|e| {
988			log::debug!(
989				target: LOG_TARGET,
990				"Error removing statement: database error {}, remove by {:?}",
991				e,
992				HexDisplay::from(&who),
993			);
994
995			Error::Db(e.to_string())
996		})
997	}
998}
999
1000#[cfg(test)]
1001mod tests {
1002	use crate::Store;
1003	use sc_keystore::Keystore;
1004	use sp_core::{Decode, Encode, Pair};
1005	use sp_statement_store::{
1006		runtime_api::{InvalidStatement, ValidStatement, ValidateStatement},
1007		AccountId, Channel, DecryptionKey, NetworkPriority, Proof, SignatureVerificationResult,
1008		Statement, StatementSource, StatementStore, SubmitResult, Topic,
1009	};
1010
1011	type Extrinsic = sp_runtime::OpaqueExtrinsic;
1012	type Hash = sp_core::H256;
1013	type Hashing = sp_runtime::traits::BlakeTwo256;
1014	type BlockNumber = u64;
1015	type Header = sp_runtime::generic::Header<BlockNumber, Hashing>;
1016	type Block = sp_runtime::generic::Block<Header, Extrinsic>;
1017
1018	const CORRECT_BLOCK_HASH: [u8; 32] = [1u8; 32];
1019
1020	#[derive(Clone)]
1021	pub(crate) struct TestClient;
1022
1023	pub(crate) struct RuntimeApi {
1024		_inner: TestClient,
1025	}
1026
1027	impl sp_api::ProvideRuntimeApi<Block> for TestClient {
1028		type Api = RuntimeApi;
1029		fn runtime_api(&self) -> sp_api::ApiRef<Self::Api> {
1030			RuntimeApi { _inner: self.clone() }.into()
1031		}
1032	}
1033
1034	sp_api::mock_impl_runtime_apis! {
1035		impl ValidateStatement<Block> for RuntimeApi {
1036			fn validate_statement(
1037				_source: StatementSource,
1038				statement: Statement,
1039			) -> std::result::Result<ValidStatement, InvalidStatement> {
1040				use crate::tests::account;
1041				match statement.verify_signature() {
1042					SignatureVerificationResult::Valid(_) => Ok(ValidStatement{max_count: 100, max_size: 1000}),
1043					SignatureVerificationResult::Invalid => Err(InvalidStatement::BadProof),
1044					SignatureVerificationResult::NoSignature => {
1045						if let Some(Proof::OnChain { block_hash, .. }) = statement.proof() {
1046							if block_hash == &CORRECT_BLOCK_HASH {
1047								let (max_count, max_size) = match statement.account_id() {
1048									Some(a) if a == account(1) => (1, 1000),
1049									Some(a) if a == account(2) => (2, 1000),
1050									Some(a) if a == account(3) => (3, 1000),
1051									Some(a) if a == account(4) => (4, 1000),
1052									_ => (2, 2000),
1053								};
1054								Ok(ValidStatement{ max_count, max_size })
1055							} else {
1056								Err(InvalidStatement::BadProof)
1057							}
1058						} else {
1059							Err(InvalidStatement::BadProof)
1060						}
1061					}
1062				}
1063			}
1064		}
1065	}
1066
1067	impl sp_blockchain::HeaderBackend<Block> for TestClient {
1068		fn header(&self, _hash: Hash) -> sp_blockchain::Result<Option<Header>> {
1069			unimplemented!()
1070		}
1071		fn info(&self) -> sp_blockchain::Info<Block> {
1072			sp_blockchain::Info {
1073				best_hash: CORRECT_BLOCK_HASH.into(),
1074				best_number: 0,
1075				genesis_hash: Default::default(),
1076				finalized_hash: CORRECT_BLOCK_HASH.into(),
1077				finalized_number: 1,
1078				finalized_state: None,
1079				number_leaves: 0,
1080				block_gap: None,
1081			}
1082		}
1083		fn status(&self, _hash: Hash) -> sp_blockchain::Result<sp_blockchain::BlockStatus> {
1084			unimplemented!()
1085		}
1086		fn number(&self, _hash: Hash) -> sp_blockchain::Result<Option<BlockNumber>> {
1087			unimplemented!()
1088		}
1089		fn hash(&self, _number: BlockNumber) -> sp_blockchain::Result<Option<Hash>> {
1090			unimplemented!()
1091		}
1092	}
1093
1094	fn test_store() -> (Store, tempfile::TempDir) {
1095		sp_tracing::init_for_tests();
1096		let temp_dir = tempfile::Builder::new().tempdir().expect("Error creating test dir");
1097
1098		let client = std::sync::Arc::new(TestClient);
1099		let mut path: std::path::PathBuf = temp_dir.path().into();
1100		path.push("db");
1101		let keystore = std::sync::Arc::new(sc_keystore::LocalKeystore::in_memory());
1102		let store = Store::new(&path, Default::default(), client, keystore, None).unwrap();
1103		(store, temp_dir) // return order is important. Store must be dropped before TempDir
1104	}
1105
1106	fn signed_statement(data: u8) -> Statement {
1107		signed_statement_with_topics(data, &[], None)
1108	}
1109
1110	fn signed_statement_with_topics(
1111		data: u8,
1112		topics: &[Topic],
1113		dec_key: Option<DecryptionKey>,
1114	) -> Statement {
1115		let mut statement = Statement::new();
1116		statement.set_plain_data(vec![data]);
1117		for i in 0..topics.len() {
1118			statement.set_topic(i, topics[i]);
1119		}
1120		if let Some(key) = dec_key {
1121			statement.set_decryption_key(key);
1122		}
1123		let kp = sp_core::ed25519::Pair::from_string("//Alice", None).unwrap();
1124		statement.sign_ed25519_private(&kp);
1125		statement
1126	}
1127
1128	fn topic(data: u64) -> Topic {
1129		let mut topic: Topic = Default::default();
1130		topic[0..8].copy_from_slice(&data.to_le_bytes());
1131		topic
1132	}
1133
1134	fn dec_key(data: u64) -> DecryptionKey {
1135		let mut dec_key: DecryptionKey = Default::default();
1136		dec_key[0..8].copy_from_slice(&data.to_le_bytes());
1137		dec_key
1138	}
1139
1140	fn account(id: u64) -> AccountId {
1141		let mut account: AccountId = Default::default();
1142		account[0..8].copy_from_slice(&id.to_le_bytes());
1143		account
1144	}
1145
1146	fn channel(id: u64) -> Channel {
1147		let mut channel: Channel = Default::default();
1148		channel[0..8].copy_from_slice(&id.to_le_bytes());
1149		channel
1150	}
1151
1152	fn statement(account_id: u64, priority: u32, c: Option<u64>, data_len: usize) -> Statement {
1153		let mut statement = Statement::new();
1154		let mut data = Vec::new();
1155		data.resize(data_len, 0);
1156		statement.set_plain_data(data);
1157		statement.set_priority(priority);
1158		if let Some(c) = c {
1159			statement.set_channel(channel(c));
1160		}
1161		statement.set_proof(Proof::OnChain {
1162			block_hash: CORRECT_BLOCK_HASH,
1163			who: account(account_id),
1164			event_index: 0,
1165		});
1166		statement
1167	}
1168
1169	#[test]
1170	fn submit_one() {
1171		let (store, _temp) = test_store();
1172		let statement0 = signed_statement(0);
1173		assert_eq!(
1174			store.submit(statement0, StatementSource::Network),
1175			SubmitResult::New(NetworkPriority::High)
1176		);
1177		let unsigned = statement(0, 1, None, 0);
1178		assert_eq!(
1179			store.submit(unsigned, StatementSource::Network),
1180			SubmitResult::New(NetworkPriority::High)
1181		);
1182	}
1183
1184	#[test]
1185	fn save_and_load_statements() {
1186		let (store, temp) = test_store();
1187		let statement0 = signed_statement(0);
1188		let statement1 = signed_statement(1);
1189		let statement2 = signed_statement(2);
1190		assert_eq!(
1191			store.submit(statement0.clone(), StatementSource::Network),
1192			SubmitResult::New(NetworkPriority::High)
1193		);
1194		assert_eq!(
1195			store.submit(statement1.clone(), StatementSource::Network),
1196			SubmitResult::New(NetworkPriority::High)
1197		);
1198		assert_eq!(
1199			store.submit(statement2.clone(), StatementSource::Network),
1200			SubmitResult::New(NetworkPriority::High)
1201		);
1202		assert_eq!(store.statements().unwrap().len(), 3);
1203		assert_eq!(store.broadcasts(&[]).unwrap().len(), 3);
1204		assert_eq!(store.statement(&statement1.hash()).unwrap(), Some(statement1.clone()));
1205		let keystore = store.keystore.clone();
1206		drop(store);
1207
1208		let client = std::sync::Arc::new(TestClient);
1209		let mut path: std::path::PathBuf = temp.path().into();
1210		path.push("db");
1211		let store = Store::new(&path, Default::default(), client, keystore, None).unwrap();
1212		assert_eq!(store.statements().unwrap().len(), 3);
1213		assert_eq!(store.broadcasts(&[]).unwrap().len(), 3);
1214		assert_eq!(store.statement(&statement1.hash()).unwrap(), Some(statement1));
1215	}
1216
1217	#[test]
1218	fn search_by_topic_and_key() {
1219		let (store, _temp) = test_store();
1220		let statement0 = signed_statement(0);
1221		let statement1 = signed_statement_with_topics(1, &[topic(0)], None);
1222		let statement2 = signed_statement_with_topics(2, &[topic(0), topic(1)], Some(dec_key(2)));
1223		let statement3 = signed_statement_with_topics(3, &[topic(0), topic(1), topic(2)], None);
1224		let statement4 =
1225			signed_statement_with_topics(4, &[topic(0), topic(42), topic(2), topic(3)], None);
1226		let statements = vec![statement0, statement1, statement2, statement3, statement4];
1227		for s in &statements {
1228			store.submit(s.clone(), StatementSource::Network);
1229		}
1230
1231		let assert_topics = |topics: &[u64], key: Option<u64>, expected: &[u8]| {
1232			let key = key.map(dec_key);
1233			let topics: Vec<_> = topics.iter().map(|t| topic(*t)).collect();
1234			let mut got_vals: Vec<_> = if let Some(key) = key {
1235				store.posted(&topics, key).unwrap().into_iter().map(|d| d[0]).collect()
1236			} else {
1237				store.broadcasts(&topics).unwrap().into_iter().map(|d| d[0]).collect()
1238			};
1239			got_vals.sort();
1240			assert_eq!(expected.to_vec(), got_vals);
1241		};
1242
1243		assert_topics(&[], None, &[0, 1, 3, 4]);
1244		assert_topics(&[], Some(2), &[2]);
1245		assert_topics(&[0], None, &[1, 3, 4]);
1246		assert_topics(&[1], None, &[3]);
1247		assert_topics(&[2], None, &[3, 4]);
1248		assert_topics(&[3], None, &[4]);
1249		assert_topics(&[42], None, &[4]);
1250
1251		assert_topics(&[0, 1], None, &[3]);
1252		assert_topics(&[0, 1], Some(2), &[2]);
1253		assert_topics(&[0, 1, 99], Some(2), &[]);
1254		assert_topics(&[1, 2], None, &[3]);
1255		assert_topics(&[99], None, &[]);
1256		assert_topics(&[0, 99], None, &[]);
1257		assert_topics(&[0, 1, 2, 3, 42], None, &[]);
1258	}
1259
1260	#[test]
1261	fn constraints() {
1262		let (store, _temp) = test_store();
1263
1264		store.index.write().options.max_total_size = 3000;
1265		let source = StatementSource::Network;
1266		let ok = SubmitResult::New(NetworkPriority::High);
1267		let ignored = SubmitResult::Ignored;
1268
1269		// Account 1 (limit = 1 msg, 1000 bytes)
1270
1271		// Oversized statement is not allowed. Limit for account 1 is 1 msg, 1000 bytes
1272		assert_eq!(store.submit(statement(1, 1, Some(1), 2000), source), ignored);
1273		assert_eq!(store.submit(statement(1, 1, Some(1), 500), source), ok);
1274		// Would not replace channel message with same priority
1275		assert_eq!(store.submit(statement(1, 1, Some(1), 200), source), ignored);
1276		assert_eq!(store.submit(statement(1, 2, Some(1), 600), source), ok);
1277		// Submit another message to another channel with lower priority. Should not be allowed
1278		// because msg count limit is 1
1279		assert_eq!(store.submit(statement(1, 1, Some(2), 100), source), ignored);
1280		assert_eq!(store.index.read().expired.len(), 1);
1281
1282		// Account 2 (limit = 2 msg, 1000 bytes)
1283
1284		assert_eq!(store.submit(statement(2, 1, None, 500), source), ok);
1285		assert_eq!(store.submit(statement(2, 2, None, 100), source), ok);
1286		// Should evict priority 1
1287		assert_eq!(store.submit(statement(2, 3, None, 500), source), ok);
1288		assert_eq!(store.index.read().expired.len(), 2);
1289		// Should evict all
1290		assert_eq!(store.submit(statement(2, 4, None, 1000), source), ok);
1291		assert_eq!(store.index.read().expired.len(), 4);
1292
1293		// Account 3 (limit = 3 msg, 1000 bytes)
1294
1295		assert_eq!(store.submit(statement(3, 2, Some(1), 300), source), ok);
1296		assert_eq!(store.submit(statement(3, 3, Some(2), 300), source), ok);
1297		assert_eq!(store.submit(statement(3, 4, Some(3), 300), source), ok);
1298		// Should evict 2 and 3
1299		assert_eq!(store.submit(statement(3, 5, None, 500), source), ok);
1300		assert_eq!(store.index.read().expired.len(), 6);
1301
1302		assert_eq!(store.index.read().total_size, 2400);
1303		assert_eq!(store.index.read().entries.len(), 4);
1304
1305		// Should be over the global size limit
1306		assert_eq!(store.submit(statement(1, 1, None, 700), source), ignored);
1307		// Should be over the global count limit
1308		store.index.write().options.max_total_statements = 4;
1309		assert_eq!(store.submit(statement(1, 1, None, 100), source), ignored);
1310
1311		let mut expected_statements = vec![
1312			statement(1, 2, Some(1), 600).hash(),
1313			statement(2, 4, None, 1000).hash(),
1314			statement(3, 4, Some(3), 300).hash(),
1315			statement(3, 5, None, 500).hash(),
1316		];
1317		expected_statements.sort();
1318		let mut statements: Vec<_> =
1319			store.statements().unwrap().into_iter().map(|(hash, _)| hash).collect();
1320		statements.sort();
1321		assert_eq!(expected_statements, statements);
1322	}
1323
1324	#[test]
1325	fn expired_statements_are_purged() {
1326		use super::DEFAULT_PURGE_AFTER_SEC;
1327		let (mut store, temp) = test_store();
1328		let mut statement = statement(1, 1, Some(3), 100);
1329		store.set_time(0);
1330		statement.set_topic(0, topic(4));
1331		store.submit(statement.clone(), StatementSource::Network);
1332		assert_eq!(store.index.read().entries.len(), 1);
1333		store.remove(&statement.hash()).unwrap();
1334		assert_eq!(store.index.read().entries.len(), 0);
1335		assert_eq!(store.index.read().accounts.len(), 0);
1336		store.set_time(DEFAULT_PURGE_AFTER_SEC + 1);
1337		store.maintain();
1338		assert_eq!(store.index.read().expired.len(), 0);
1339		let keystore = store.keystore.clone();
1340		drop(store);
1341
1342		let client = std::sync::Arc::new(TestClient);
1343		let mut path: std::path::PathBuf = temp.path().into();
1344		path.push("db");
1345		let store = Store::new(&path, Default::default(), client, keystore, None).unwrap();
1346		assert_eq!(store.statements().unwrap().len(), 0);
1347		assert_eq!(store.index.read().expired.len(), 0);
1348	}
1349
1350	#[test]
1351	fn posted_clear_decrypts() {
1352		let (store, _temp) = test_store();
1353		let public = store
1354			.keystore
1355			.ed25519_generate_new(sp_core::crypto::key_types::STATEMENT, None)
1356			.unwrap();
1357		let statement1 = statement(1, 1, None, 100);
1358		let mut statement2 = statement(1, 2, None, 0);
1359		let plain = b"The most valuable secret".to_vec();
1360		statement2.encrypt(&plain, &public).unwrap();
1361		store.submit(statement1, StatementSource::Network);
1362		store.submit(statement2, StatementSource::Network);
1363		let posted_clear = store.posted_clear(&[], public.into()).unwrap();
1364		assert_eq!(posted_clear, vec![plain]);
1365	}
1366
1367	#[test]
1368	fn broadcasts_stmt_returns_encoded_statements() {
1369		let (store, _tmp) = test_store();
1370
1371		// no key, no topic
1372		let s0 = signed_statement_with_topics(0, &[], None);
1373		// same, but with a topic = 42
1374		let s1 = signed_statement_with_topics(1, &[topic(42)], None);
1375		// has a decryption key -> must NOT be returned by broadcasts_stmt
1376		let s2 = signed_statement_with_topics(2, &[topic(42)], Some(dec_key(99)));
1377
1378		for s in [&s0, &s1, &s2] {
1379			store.submit(s.clone(), StatementSource::Network);
1380		}
1381
1382		// no topic filter
1383		let mut hashes: Vec<_> = store
1384			.broadcasts_stmt(&[])
1385			.unwrap()
1386			.into_iter()
1387			.map(|bytes| Statement::decode(&mut &bytes[..]).unwrap().hash())
1388			.collect();
1389		hashes.sort();
1390		let expected_hashes = {
1391			let mut e = vec![s0.hash(), s1.hash()];
1392			e.sort();
1393			e
1394		};
1395		assert_eq!(hashes, expected_hashes);
1396
1397		// filter on topic 42
1398		let got = store.broadcasts_stmt(&[topic(42)]).unwrap();
1399		assert_eq!(got.len(), 1);
1400		let st = Statement::decode(&mut &got[0][..]).unwrap();
1401		assert_eq!(st.hash(), s1.hash());
1402	}
1403
1404	#[test]
1405	fn posted_stmt_returns_encoded_statements_for_dest() {
1406		let (store, _tmp) = test_store();
1407
1408		let public1 = store
1409			.keystore
1410			.ed25519_generate_new(sp_core::crypto::key_types::STATEMENT, None)
1411			.unwrap();
1412		let dest: [u8; 32] = public1.into();
1413
1414		let public2 = store
1415			.keystore
1416			.ed25519_generate_new(sp_core::crypto::key_types::STATEMENT, None)
1417			.unwrap();
1418
1419		// A statement that does have dec_key = dest
1420		let mut s_with_key = statement(1, 1, None, 0);
1421		let plain1 = b"The most valuable secret".to_vec();
1422		s_with_key.encrypt(&plain1, &public1).unwrap();
1423
1424		// A statement with a different dec_key
1425		let mut s_other_key = statement(2, 2, None, 0);
1426		let plain2 = b"The second most valuable secret".to_vec();
1427		s_other_key.encrypt(&plain2, &public2).unwrap();
1428
1429		// Submit them all
1430		for s in [&s_with_key, &s_other_key] {
1431			store.submit(s.clone(), StatementSource::Network);
1432		}
1433
1434		// posted_stmt should only return the one with dec_key = dest
1435		let retrieved = store.posted_stmt(&[], dest).unwrap();
1436		assert_eq!(retrieved.len(), 1, "Only one statement has dec_key=dest");
1437
1438		// Re-decode that returned statement to confirm it is correct
1439		let returned_stmt = Statement::decode(&mut &retrieved[0][..]).unwrap();
1440		assert_eq!(
1441			returned_stmt.hash(),
1442			s_with_key.hash(),
1443			"Returned statement must match s_with_key"
1444		);
1445	}
1446
1447	#[test]
1448	fn posted_clear_stmt_returns_statement_followed_by_plain_data() {
1449		let (store, _tmp) = test_store();
1450
1451		let public1 = store
1452			.keystore
1453			.ed25519_generate_new(sp_core::crypto::key_types::STATEMENT, None)
1454			.unwrap();
1455		let dest: [u8; 32] = public1.into();
1456
1457		let public2 = store
1458			.keystore
1459			.ed25519_generate_new(sp_core::crypto::key_types::STATEMENT, None)
1460			.unwrap();
1461
1462		// A statement that does have dec_key = dest
1463		let mut s_with_key = statement(1, 1, None, 0);
1464		let plain1 = b"The most valuable secret".to_vec();
1465		s_with_key.encrypt(&plain1, &public1).unwrap();
1466
1467		// A statement with a different dec_key
1468		let mut s_other_key = statement(2, 2, None, 0);
1469		let plain2 = b"The second most valuable secret".to_vec();
1470		s_other_key.encrypt(&plain2, &public2).unwrap();
1471
1472		// Submit them all
1473		for s in [&s_with_key, &s_other_key] {
1474			store.submit(s.clone(), StatementSource::Network);
1475		}
1476
1477		// posted_stmt should only return the one with dec_key = dest
1478		let retrieved = store.posted_clear_stmt(&[], dest).unwrap();
1479		assert_eq!(retrieved.len(), 1, "Only one statement has dec_key=dest");
1480
1481		// We expect: [ encoded Statement ] + [ the decrypted bytes ]
1482		let encoded_stmt = s_with_key.encode();
1483		let stmt_len = encoded_stmt.len();
1484
1485		// 1) statement is first
1486		assert_eq!(&retrieved[0][..stmt_len], &encoded_stmt[..]);
1487
1488		// 2) followed by the decrypted payload
1489		let trailing = &retrieved[0][stmt_len..];
1490		assert_eq!(trailing, &plain1[..]);
1491	}
1492
1493	#[test]
1494	fn posted_clear_returns_plain_data_for_dest_and_topics() {
1495		let (store, _tmp) = test_store();
1496
1497		// prepare two key-pairs
1498		let public_dest = store
1499			.keystore
1500			.ed25519_generate_new(sp_core::crypto::key_types::STATEMENT, None)
1501			.unwrap();
1502		let dest: [u8; 32] = public_dest.into();
1503
1504		let public_other = store
1505			.keystore
1506			.ed25519_generate_new(sp_core::crypto::key_types::STATEMENT, None)
1507			.unwrap();
1508
1509		// statement that SHOULD be returned (matches dest & topic 42)
1510		let mut s_good = statement(1, 1, None, 0);
1511		let plaintext_good = b"The most valuable secret".to_vec();
1512		s_good.encrypt(&plaintext_good, &public_dest).unwrap();
1513		s_good.set_topic(0, topic(42));
1514
1515		// statement that should NOT be returned (same dest but different topic)
1516		let mut s_wrong_topic = statement(2, 2, None, 0);
1517		s_wrong_topic.encrypt(b"Wrong topic", &public_dest).unwrap();
1518		s_wrong_topic.set_topic(0, topic(99));
1519
1520		// statement that should NOT be returned (different dest)
1521		let mut s_other_dest = statement(3, 3, None, 0);
1522		s_other_dest.encrypt(b"Other dest", &public_other).unwrap();
1523		s_other_dest.set_topic(0, topic(42));
1524
1525		// submit all
1526		for s in [&s_good, &s_wrong_topic, &s_other_dest] {
1527			store.submit(s.clone(), StatementSource::Network);
1528		}
1529
1530		// call posted_clear with the topic filter and dest
1531		let retrieved = store.posted_clear(&[topic(42)], dest).unwrap();
1532
1533		// exactly one element, equal to the expected plaintext
1534		assert_eq!(retrieved, vec![plaintext_good]);
1535	}
1536
1537	#[test]
1538	fn remove_by_covers_various_situations() {
1539		use sp_statement_store::{StatementSource, StatementStore, SubmitResult};
1540
1541		// Use a fresh store and fixed time so we can control purging.
1542		let (mut store, _temp) = test_store();
1543		store.set_time(0);
1544
1545		// Reuse helpers from this module.
1546		let t42 = topic(42);
1547		let k7 = dec_key(7);
1548
1549		// Account A = 4 (has per-account limits (4, 1000) in the mock runtime)
1550		// - Mix of topic, decryption-key and channel to exercise every index.
1551		let mut s_a1 = statement(4, 10, Some(100), 100);
1552		s_a1.set_topic(0, t42);
1553		let h_a1 = s_a1.hash();
1554
1555		let mut s_a2 = statement(4, 20, Some(200), 150);
1556		s_a2.set_decryption_key(k7);
1557		let h_a2 = s_a2.hash();
1558
1559		let s_a3 = statement(4, 30, None, 50);
1560		let h_a3 = s_a3.hash();
1561
1562		// Account B = 3 (control group that must remain untouched).
1563		let s_b1 = statement(3, 10, None, 100);
1564		let h_b1 = s_b1.hash();
1565
1566		let mut s_b2 = statement(3, 15, Some(300), 100);
1567		s_b2.set_topic(0, t42);
1568		s_b2.set_decryption_key(k7);
1569		let h_b2 = s_b2.hash();
1570
1571		// Submit all statements.
1572		for s in [&s_a1, &s_a2, &s_a3, &s_b1, &s_b2] {
1573			assert!(matches!(
1574				store.submit(s.clone(), StatementSource::Network),
1575				SubmitResult::New(_)
1576			));
1577		}
1578
1579		// --- Pre-conditions: everything is indexed as expected.
1580		{
1581			let idx = store.index.read();
1582			assert_eq!(idx.entries.len(), 5, "all 5 should be present");
1583			assert!(idx.accounts.contains_key(&account(4)));
1584			assert!(idx.accounts.contains_key(&account(3)));
1585			assert_eq!(idx.total_size, 100 + 150 + 50 + 100 + 100);
1586
1587			// Topic and key sets contain both A & B entries.
1588			let set_t = idx.by_topic.get(&t42).expect("topic set exists");
1589			assert!(set_t.contains(&h_a1) && set_t.contains(&h_b2));
1590
1591			let set_k = idx.by_dec_key.get(&Some(k7)).expect("key set exists");
1592			assert!(set_k.contains(&h_a2) && set_k.contains(&h_b2));
1593		}
1594
1595		// --- Action: remove all statements by Account A.
1596		store.remove_by(account(4)).expect("remove_by should succeed");
1597
1598		// --- Post-conditions: A's statements are gone and marked expired; B's remain.
1599		{
1600			// A's statements removed from DB view.
1601			for h in [h_a1, h_a2, h_a3] {
1602				assert!(store.statement(&h).unwrap().is_none(), "A's statement should be removed");
1603			}
1604
1605			// B's statements still present.
1606			for h in [h_b1, h_b2] {
1607				assert!(store.statement(&h).unwrap().is_some(), "B's statement should remain");
1608			}
1609
1610			let idx = store.index.read();
1611
1612			// Account map updated.
1613			assert!(!idx.accounts.contains_key(&account(4)), "Account A must be gone");
1614			assert!(idx.accounts.contains_key(&account(3)), "Account B must remain");
1615
1616			// Removed statements are marked expired.
1617			assert!(idx.expired.contains_key(&h_a1));
1618			assert!(idx.expired.contains_key(&h_a2));
1619			assert!(idx.expired.contains_key(&h_a3));
1620			assert_eq!(idx.expired.len(), 3);
1621
1622			// Entry count & total_size reflect only B's data.
1623			assert_eq!(idx.entries.len(), 2);
1624			assert_eq!(idx.total_size, 100 + 100);
1625
1626			// Topic index: only B2 remains for topic 42.
1627			let set_t = idx.by_topic.get(&t42).expect("topic set exists");
1628			assert!(set_t.contains(&h_b2));
1629			assert!(!set_t.contains(&h_a1));
1630
1631			// Decryption-key index: only B2 remains for key 7.
1632			let set_k = idx.by_dec_key.get(&Some(k7)).expect("key set exists");
1633			assert!(set_k.contains(&h_b2));
1634			assert!(!set_k.contains(&h_a2));
1635		}
1636
1637		// --- Idempotency: removing again is a no-op and should not error.
1638		store.remove_by(account(4)).expect("second remove_by should be a no-op");
1639
1640		// --- Purge: advance time beyond TTL and run maintenance; expired entries disappear.
1641		let purge_after = store.index.read().options.purge_after_sec;
1642		store.set_time(purge_after + 1);
1643		store.maintain();
1644		assert_eq!(store.index.read().expired.len(), 0, "expired entries should be purged");
1645
1646		// --- Reuse: Account A can submit again after purge.
1647		let s_new = statement(4, 40, None, 10);
1648		assert!(matches!(store.submit(s_new, StatementSource::Network), SubmitResult::New(_)));
1649	}
1650}