sc_statement_store/
lib.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! Disk-backed statement store.
20//!
21//! This module contains an implementation of `sp_statement_store::StatementStore` which is backed
22//! by a database.
23//!
24//! Constraint management.
25//!
26//! Each time a new statement is inserted into the store, it is first validated with the runtime
27//! Validation function computes `global_priority`, 'max_count' and `max_size` for a statement.
28//! The following constraints are then checked:
29//! * For a given account id, there may be at most `max_count` statements with `max_size` total data
30//!   size. To satisfy this, statements for this account ID are removed from the store starting with
31//!   the lowest priority until a constraint is satisfied.
32//! * There may not be more than `MAX_TOTAL_STATEMENTS` total statements with `MAX_TOTAL_SIZE` size.
33//!   To satisfy this, statements are removed from the store starting with the lowest
34//!   `global_priority` until a constraint is satisfied.
35//!
36//! When a new statement is inserted that would not satisfy constraints in the first place, no
37//! statements are deleted and `Ignored` result is returned.
38//! The order in which statements with the same priority are deleted is unspecified.
39//!
40//! Statement expiration.
41//!
42//! Each time a statement is removed from the store (Either evicted by higher priority statement or
43//! explicitly with the `remove` function) the statement is marked as expired. Expired statements
44//! can't be added to the store for `Options::purge_after_sec` seconds. This is to prevent old
45//! statements from being propagated on the network.
46
47#![warn(missing_docs)]
48#![warn(unused_extern_crates)]
49
50mod metrics;
51
52pub use sp_statement_store::{Error, StatementStore, MAX_TOPICS};
53
54use metrics::MetricsLink as PrometheusMetrics;
55use parking_lot::RwLock;
56use prometheus_endpoint::Registry as PrometheusRegistry;
57use sc_keystore::LocalKeystore;
58use sp_api::ProvideRuntimeApi;
59use sp_blockchain::HeaderBackend;
60use sp_core::{crypto::UncheckedFrom, hexdisplay::HexDisplay, traits::SpawnNamed, Decode, Encode};
61use sp_runtime::traits::Block as BlockT;
62use sp_statement_store::{
63	runtime_api::{
64		InvalidStatement, StatementSource, StatementStoreExt, ValidStatement, ValidateStatement,
65	},
66	AccountId, BlockHash, Channel, DecryptionKey, Hash, NetworkPriority, Proof, Result, Statement,
67	SubmitResult, Topic,
68};
69use std::{
70	collections::{BTreeMap, HashMap, HashSet},
71	sync::Arc,
72};
73
74const KEY_VERSION: &[u8] = b"version".as_slice();
75const CURRENT_VERSION: u32 = 1;
76
77const LOG_TARGET: &str = "statement-store";
78
79/// The amount of time an expired statement is kept before it is removed from the store entirely.
80pub const DEFAULT_PURGE_AFTER_SEC: u64 = 2 * 24 * 60 * 60; //48h
81/// The maximum number of statements the statement store can hold.
82pub const DEFAULT_MAX_TOTAL_STATEMENTS: usize = 4 * 1024 * 1024; // ~4 million
83/// The maximum amount of data the statement store can hold, regardless of the number of
84/// statements from which the data originates.
85pub const DEFAULT_MAX_TOTAL_SIZE: usize = 2 * 1024 * 1024 * 1024; // 2GiB
86/// The maximum size of a single statement in bytes.
87/// Accounts for the 1-byte vector length prefix when statements are gossiped as `Vec<Statement>`.
88pub const MAX_STATEMENT_SIZE: usize =
89	sc_network_statement::config::MAX_STATEMENT_NOTIFICATION_SIZE as usize - 1;
90
91const MAINTENANCE_PERIOD: std::time::Duration = std::time::Duration::from_secs(30);
92
93mod col {
94	pub const META: u8 = 0;
95	pub const STATEMENTS: u8 = 1;
96	pub const EXPIRED: u8 = 2;
97
98	pub const COUNT: u8 = 3;
99}
100
101#[derive(Eq, PartialEq, Debug, Ord, PartialOrd, Clone, Copy)]
102struct Priority(u32);
103
104#[derive(PartialEq, Eq)]
105struct PriorityKey {
106	hash: Hash,
107	priority: Priority,
108}
109
110impl PartialOrd for PriorityKey {
111	fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
112		Some(self.cmp(other))
113	}
114}
115
116impl Ord for PriorityKey {
117	fn cmp(&self, other: &Self) -> std::cmp::Ordering {
118		self.priority.cmp(&other.priority).then_with(|| self.hash.cmp(&other.hash))
119	}
120}
121
122#[derive(PartialEq, Eq)]
123struct ChannelEntry {
124	hash: Hash,
125	priority: Priority,
126}
127
128#[derive(Default)]
129struct StatementsForAccount {
130	// Statements ordered by priority.
131	by_priority: BTreeMap<PriorityKey, (Option<Channel>, usize)>,
132	// Channel to statement map. Only one statement per channel is allowed.
133	channels: HashMap<Channel, ChannelEntry>,
134	// Sum of all `Data` field sizes.
135	data_size: usize,
136}
137
138/// Store configuration
139pub struct Options {
140	/// Maximum statement allowed in the store. Once this limit is reached lower-priority
141	/// statements may be evicted.
142	max_total_statements: usize,
143	/// Maximum total data size allowed in the store. Once this limit is reached lower-priority
144	/// statements may be evicted.
145	max_total_size: usize,
146	/// Number of seconds for which removed statements won't be allowed to be added back in.
147	purge_after_sec: u64,
148}
149
150impl Default for Options {
151	fn default() -> Self {
152		Options {
153			max_total_statements: DEFAULT_MAX_TOTAL_STATEMENTS,
154			max_total_size: DEFAULT_MAX_TOTAL_SIZE,
155			purge_after_sec: DEFAULT_PURGE_AFTER_SEC,
156		}
157	}
158}
159
160#[derive(Default)]
161struct Index {
162	recent: HashSet<Hash>,
163	by_topic: HashMap<Topic, HashSet<Hash>>,
164	by_dec_key: HashMap<Option<DecryptionKey>, HashSet<Hash>>,
165	topics_and_keys: HashMap<Hash, ([Option<Topic>; MAX_TOPICS], Option<DecryptionKey>)>,
166	entries: HashMap<Hash, (AccountId, Priority, usize)>,
167	expired: HashMap<Hash, u64>, // Value is expiration timestamp.
168	accounts: HashMap<AccountId, StatementsForAccount>,
169	options: Options,
170	total_size: usize,
171}
172
173struct ClientWrapper<Block, Client> {
174	client: Arc<Client>,
175	_block: std::marker::PhantomData<Block>,
176}
177
178impl<Block, Client> ClientWrapper<Block, Client>
179where
180	Block: BlockT,
181	Block::Hash: From<BlockHash>,
182	Client: ProvideRuntimeApi<Block> + HeaderBackend<Block> + Send + Sync + 'static,
183	Client::Api: ValidateStatement<Block>,
184{
185	fn validate_statement(
186		&self,
187		block: Option<BlockHash>,
188		source: StatementSource,
189		statement: Statement,
190	) -> std::result::Result<ValidStatement, InvalidStatement> {
191		let api = self.client.runtime_api();
192		let block = block.map(Into::into).unwrap_or_else(|| {
193			// Validate against the finalized state.
194			self.client.info().finalized_hash
195		});
196		api.validate_statement(block, source, statement)
197			.map_err(|_| InvalidStatement::InternalError)?
198	}
199}
200
201/// Statement store.
202pub struct Store {
203	db: parity_db::Db,
204	index: RwLock<Index>,
205	validate_fn: Box<
206		dyn Fn(
207				Option<BlockHash>,
208				StatementSource,
209				Statement,
210			) -> std::result::Result<ValidStatement, InvalidStatement>
211			+ Send
212			+ Sync,
213	>,
214	keystore: Arc<LocalKeystore>,
215	// Used for testing
216	time_override: Option<u64>,
217	metrics: PrometheusMetrics,
218}
219
220enum IndexQuery {
221	Unknown,
222	Exists,
223	Expired,
224}
225
226enum MaybeInserted {
227	Inserted(HashSet<Hash>),
228	Ignored,
229}
230
231impl Index {
232	fn new(options: Options) -> Index {
233		Index { options, ..Default::default() }
234	}
235
236	fn insert_new(&mut self, hash: Hash, account: AccountId, statement: &Statement) {
237		let mut all_topics = [None; MAX_TOPICS];
238		let mut nt = 0;
239		while let Some(t) = statement.topic(nt) {
240			self.by_topic.entry(t).or_default().insert(hash);
241			all_topics[nt] = Some(t);
242			nt += 1;
243		}
244		let key = statement.decryption_key();
245		self.by_dec_key.entry(key).or_default().insert(hash);
246		if nt > 0 || key.is_some() {
247			self.topics_and_keys.insert(hash, (all_topics, key));
248		}
249		let priority = Priority(statement.priority().unwrap_or(0));
250		self.entries.insert(hash, (account, priority, statement.data_len()));
251		self.recent.insert(hash);
252		self.total_size += statement.data_len();
253		let account_info = self.accounts.entry(account).or_default();
254		account_info.data_size += statement.data_len();
255		if let Some(channel) = statement.channel() {
256			account_info.channels.insert(channel, ChannelEntry { hash, priority });
257		}
258		account_info
259			.by_priority
260			.insert(PriorityKey { hash, priority }, (statement.channel(), statement.data_len()));
261	}
262
263	fn query(&self, hash: &Hash) -> IndexQuery {
264		if self.entries.contains_key(hash) {
265			return IndexQuery::Exists
266		}
267		if self.expired.contains_key(hash) {
268			return IndexQuery::Expired
269		}
270		IndexQuery::Unknown
271	}
272
273	fn insert_expired(&mut self, hash: Hash, timestamp: u64) {
274		self.expired.insert(hash, timestamp);
275	}
276
277	fn iterate_with(
278		&self,
279		key: Option<DecryptionKey>,
280		match_all_topics: &[Topic],
281		mut f: impl FnMut(&Hash) -> Result<()>,
282	) -> Result<()> {
283		let empty = HashSet::new();
284		let mut sets: [&HashSet<Hash>; MAX_TOPICS + 1] = [&empty; MAX_TOPICS + 1];
285		if match_all_topics.len() > MAX_TOPICS {
286			return Ok(())
287		}
288		let key_set = self.by_dec_key.get(&key);
289		if key_set.map_or(0, |s| s.len()) == 0 {
290			// Key does not exist in the index.
291			return Ok(())
292		}
293		sets[0] = key_set.expect("Function returns if key_set is None");
294		for (i, t) in match_all_topics.iter().enumerate() {
295			let set = self.by_topic.get(t);
296			if set.map_or(0, |s| s.len()) == 0 {
297				// At least one of the match_all_topics does not exist in the index.
298				return Ok(())
299			}
300			sets[i + 1] = set.expect("Function returns if set is None");
301		}
302		let sets = &mut sets[0..match_all_topics.len() + 1];
303		// Start with the smallest topic set or the key set.
304		sets.sort_by_key(|s| s.len());
305		for item in sets[0] {
306			if sets[1..].iter().all(|set| set.contains(item)) {
307				log::trace!(
308					target: LOG_TARGET,
309					"Iterating by topic/key: statement {:?}",
310					HexDisplay::from(item)
311				);
312				f(item)?
313			}
314		}
315		Ok(())
316	}
317
318	fn maintain(&mut self, current_time: u64) -> Vec<Hash> {
319		// Purge previously expired messages.
320		let mut purged = Vec::new();
321		self.expired.retain(|hash, timestamp| {
322			if *timestamp + self.options.purge_after_sec <= current_time {
323				purged.push(*hash);
324				log::trace!(target: LOG_TARGET, "Purged statement {:?}", HexDisplay::from(hash));
325				false
326			} else {
327				true
328			}
329		});
330		purged
331	}
332
333	fn take_recent(&mut self) -> HashSet<Hash> {
334		std::mem::take(&mut self.recent)
335	}
336
337	fn make_expired(&mut self, hash: &Hash, current_time: u64) -> bool {
338		if let Some((account, priority, len)) = self.entries.remove(hash) {
339			self.total_size -= len;
340			if let Some((topics, key)) = self.topics_and_keys.remove(hash) {
341				for t in topics.into_iter().flatten() {
342					if let std::collections::hash_map::Entry::Occupied(mut set) =
343						self.by_topic.entry(t)
344					{
345						set.get_mut().remove(hash);
346						if set.get().is_empty() {
347							set.remove_entry();
348						}
349					}
350				}
351				if let std::collections::hash_map::Entry::Occupied(mut set) =
352					self.by_dec_key.entry(key)
353				{
354					set.get_mut().remove(hash);
355					if set.get().is_empty() {
356						set.remove_entry();
357					}
358				}
359			}
360			let _ = self.recent.remove(hash);
361			self.expired.insert(*hash, current_time);
362			if let std::collections::hash_map::Entry::Occupied(mut account_rec) =
363				self.accounts.entry(account)
364			{
365				let key = PriorityKey { hash: *hash, priority };
366				if let Some((channel, len)) = account_rec.get_mut().by_priority.remove(&key) {
367					account_rec.get_mut().data_size -= len;
368					if let Some(channel) = channel {
369						account_rec.get_mut().channels.remove(&channel);
370					}
371				}
372				if account_rec.get().by_priority.is_empty() {
373					account_rec.remove_entry();
374				}
375			}
376			log::trace!(target: LOG_TARGET, "Expired statement {:?}", HexDisplay::from(hash));
377			true
378		} else {
379			false
380		}
381	}
382
383	fn insert(
384		&mut self,
385		hash: Hash,
386		statement: &Statement,
387		account: &AccountId,
388		validation: &ValidStatement,
389		current_time: u64,
390	) -> MaybeInserted {
391		let statement_len = statement.data_len();
392		if statement_len > validation.max_size as usize {
393			log::debug!(
394				target: LOG_TARGET,
395				"Ignored oversize message: {:?} ({} bytes)",
396				HexDisplay::from(&hash),
397				statement_len,
398			);
399			return MaybeInserted::Ignored
400		}
401
402		let mut evicted = HashSet::new();
403		let mut would_free_size = 0;
404		let priority = Priority(statement.priority().unwrap_or(0));
405		let (max_size, max_count) = (validation.max_size as usize, validation.max_count as usize);
406		// It may happen that we can't delete enough lower priority messages
407		// to satisfy size constraints. We check for that before deleting anything,
408		// taking into account channel message replacement.
409		if let Some(account_rec) = self.accounts.get(account) {
410			if let Some(channel) = statement.channel() {
411				if let Some(channel_record) = account_rec.channels.get(&channel) {
412					if priority <= channel_record.priority {
413						// Trying to replace channel message with lower priority
414						log::debug!(
415							target: LOG_TARGET,
416							"Ignored lower priority channel message: {:?} {:?} <= {:?}",
417							HexDisplay::from(&hash),
418							priority,
419							channel_record.priority,
420						);
421						return MaybeInserted::Ignored
422					} else {
423						// Would replace channel message. Still need to check for size constraints
424						// below.
425						log::debug!(
426							target: LOG_TARGET,
427							"Replacing higher priority channel message: {:?} ({:?}) > {:?} ({:?})",
428							HexDisplay::from(&hash),
429							priority,
430							HexDisplay::from(&channel_record.hash),
431							channel_record.priority,
432						);
433						let key = PriorityKey {
434							hash: channel_record.hash,
435							priority: channel_record.priority,
436						};
437						if let Some((_channel, len)) = account_rec.by_priority.get(&key) {
438							would_free_size += *len;
439							evicted.insert(channel_record.hash);
440						}
441					}
442				}
443			}
444			// Check if we can evict enough lower priority statements to satisfy constraints
445			for (entry, (_, len)) in account_rec.by_priority.iter() {
446				if (account_rec.data_size - would_free_size + statement_len <= max_size) &&
447					account_rec.by_priority.len() + 1 - evicted.len() <= max_count
448				{
449					// Satisfied
450					break
451				}
452				if evicted.contains(&entry.hash) {
453					// Already accounted for above
454					continue
455				}
456				if entry.priority >= priority {
457					log::debug!(
458						target: LOG_TARGET,
459						"Ignored message due to constraints {:?} {:?} < {:?}",
460						HexDisplay::from(&hash),
461						priority,
462						entry.priority,
463					);
464					return MaybeInserted::Ignored
465				}
466				evicted.insert(entry.hash);
467				would_free_size += len;
468			}
469		}
470		// Now check global constraints as well.
471		if !((self.total_size - would_free_size + statement_len <= self.options.max_total_size) &&
472			self.entries.len() + 1 - evicted.len() <= self.options.max_total_statements)
473		{
474			log::debug!(
475				target: LOG_TARGET,
476				"Ignored statement {} because the store is full (size={}, count={})",
477				HexDisplay::from(&hash),
478				self.total_size,
479				self.entries.len(),
480			);
481			return MaybeInserted::Ignored
482		}
483
484		for h in &evicted {
485			self.make_expired(h, current_time);
486		}
487		self.insert_new(hash, *account, statement);
488		MaybeInserted::Inserted(evicted)
489	}
490}
491
492impl Store {
493	/// Create a new shared store instance. There should only be one per process.
494	/// `path` will be used to open a statement database or create a new one if it does not exist.
495	pub fn new_shared<Block, Client>(
496		path: &std::path::Path,
497		options: Options,
498		client: Arc<Client>,
499		keystore: Arc<LocalKeystore>,
500		prometheus: Option<&PrometheusRegistry>,
501		task_spawner: &dyn SpawnNamed,
502	) -> Result<Arc<Store>>
503	where
504		Block: BlockT,
505		Block::Hash: From<BlockHash>,
506		Client: ProvideRuntimeApi<Block> + HeaderBackend<Block> + Send + Sync + 'static,
507		Client::Api: ValidateStatement<Block>,
508	{
509		let store = Arc::new(Self::new(path, options, client, keystore, prometheus)?);
510
511		// Perform periodic statement store maintenance
512		let worker_store = store.clone();
513		task_spawner.spawn(
514			"statement-store-maintenance",
515			Some("statement-store"),
516			Box::pin(async move {
517				let mut interval = tokio::time::interval(MAINTENANCE_PERIOD);
518				loop {
519					interval.tick().await;
520					worker_store.maintain();
521				}
522			}),
523		);
524
525		Ok(store)
526	}
527
528	/// Create a new instance.
529	/// `path` will be used to open a statement database or create a new one if it does not exist.
530	#[doc(hidden)]
531	pub fn new<Block, Client>(
532		path: &std::path::Path,
533		options: Options,
534		client: Arc<Client>,
535		keystore: Arc<LocalKeystore>,
536		prometheus: Option<&PrometheusRegistry>,
537	) -> Result<Store>
538	where
539		Block: BlockT,
540		Block::Hash: From<BlockHash>,
541		Client: ProvideRuntimeApi<Block> + HeaderBackend<Block> + Send + Sync + 'static,
542		Client::Api: ValidateStatement<Block>,
543	{
544		let mut path: std::path::PathBuf = path.into();
545		path.push("statements");
546
547		let mut config = parity_db::Options::with_columns(&path, col::COUNT);
548
549		let statement_col = &mut config.columns[col::STATEMENTS as usize];
550		statement_col.ref_counted = false;
551		statement_col.preimage = true;
552		statement_col.uniform = true;
553		let db = parity_db::Db::open_or_create(&config).map_err(|e| Error::Db(e.to_string()))?;
554		match db.get(col::META, &KEY_VERSION).map_err(|e| Error::Db(e.to_string()))? {
555			Some(version) => {
556				let version = u32::from_le_bytes(
557					version
558						.try_into()
559						.map_err(|_| Error::Db("Error reading database version".into()))?,
560				);
561				if version != CURRENT_VERSION {
562					return Err(Error::Db(format!("Unsupported database version: {version}")))
563				}
564			},
565			None => {
566				db.commit([(
567					col::META,
568					KEY_VERSION.to_vec(),
569					Some(CURRENT_VERSION.to_le_bytes().to_vec()),
570				)])
571				.map_err(|e| Error::Db(e.to_string()))?;
572			},
573		}
574
575		let validator = ClientWrapper { client, _block: Default::default() };
576		let validate_fn = Box::new(move |block, source, statement| {
577			validator.validate_statement(block, source, statement)
578		});
579
580		let store = Store {
581			db,
582			index: RwLock::new(Index::new(options)),
583			validate_fn,
584			keystore,
585			time_override: None,
586			metrics: PrometheusMetrics::new(prometheus),
587		};
588		store.populate()?;
589		Ok(store)
590	}
591
592	/// Create memory index from the data.
593	// This may be moved to a background thread if it slows startup too much.
594	// This function should only be used on startup. There should be no other DB operations when
595	// iterating the index.
596	fn populate(&self) -> Result<()> {
597		{
598			let mut index = self.index.write();
599			self.db
600				.iter_column_while(col::STATEMENTS, |item| {
601					let statement = item.value;
602					if let Ok(statement) = Statement::decode(&mut statement.as_slice()) {
603						let hash = statement.hash();
604						log::trace!(
605							target: LOG_TARGET,
606							"Statement loaded {:?}",
607							HexDisplay::from(&hash)
608						);
609						if let Some(account_id) = statement.account_id() {
610							index.insert_new(hash, account_id, &statement);
611						} else {
612							log::debug!(
613								target: LOG_TARGET,
614								"Error decoding statement loaded from the DB: {:?}",
615								HexDisplay::from(&hash)
616							);
617						}
618					}
619					true
620				})
621				.map_err(|e| Error::Db(e.to_string()))?;
622			self.db
623				.iter_column_while(col::EXPIRED, |item| {
624					let expired_info = item.value;
625					if let Ok((hash, timestamp)) =
626						<(Hash, u64)>::decode(&mut expired_info.as_slice())
627					{
628						log::trace!(
629							target: LOG_TARGET,
630							"Statement loaded (expired): {:?}",
631							HexDisplay::from(&hash)
632						);
633						index.insert_expired(hash, timestamp);
634					}
635					true
636				})
637				.map_err(|e| Error::Db(e.to_string()))?;
638		}
639
640		self.maintain();
641		Ok(())
642	}
643
644	fn collect_statements<R>(
645		&self,
646		key: Option<DecryptionKey>,
647		match_all_topics: &[Topic],
648		mut f: impl FnMut(Statement) -> Option<R>,
649	) -> Result<Vec<R>> {
650		let mut result = Vec::new();
651		let index = self.index.read();
652		index.iterate_with(key, match_all_topics, |hash| {
653			match self.db.get(col::STATEMENTS, hash).map_err(|e| Error::Db(e.to_string()))? {
654				Some(entry) => {
655					if let Ok(statement) = Statement::decode(&mut entry.as_slice()) {
656						if let Some(data) = f(statement) {
657							result.push(data);
658						}
659					} else {
660						// DB inconsistency
661						log::warn!(
662							target: LOG_TARGET,
663							"Corrupt statement {:?}",
664							HexDisplay::from(hash)
665						);
666					}
667				},
668				None => {
669					// DB inconsistency
670					log::warn!(
671						target: LOG_TARGET,
672						"Missing statement {:?}",
673						HexDisplay::from(hash)
674					);
675				},
676			}
677			Ok(())
678		})?;
679		Ok(result)
680	}
681
682	/// Perform periodic store maintenance
683	pub fn maintain(&self) {
684		log::trace!(target: LOG_TARGET, "Started store maintenance");
685		let (deleted, active_count, expired_count): (Vec<_>, usize, usize) = {
686			let mut index = self.index.write();
687			let deleted = index.maintain(self.timestamp());
688			(deleted, index.entries.len(), index.expired.len())
689		};
690		let deleted: Vec<_> =
691			deleted.into_iter().map(|hash| (col::EXPIRED, hash.to_vec(), None)).collect();
692		let deleted_count = deleted.len() as u64;
693		if let Err(e) = self.db.commit(deleted) {
694			log::warn!(target: LOG_TARGET, "Error writing to the statement database: {:?}", e);
695		} else {
696			self.metrics.report(|metrics| metrics.statements_pruned.inc_by(deleted_count));
697		}
698		log::trace!(
699			target: LOG_TARGET,
700			"Completed store maintenance. Purged: {}, Active: {}, Expired: {}",
701			deleted_count,
702			active_count,
703			expired_count
704		);
705	}
706
707	fn timestamp(&self) -> u64 {
708		self.time_override.unwrap_or_else(|| {
709			std::time::SystemTime::now()
710				.duration_since(std::time::UNIX_EPOCH)
711				.unwrap_or_default()
712				.as_secs()
713		})
714	}
715
716	#[cfg(test)]
717	fn set_time(&mut self, time: u64) {
718		self.time_override = Some(time);
719	}
720
721	/// Returns `self` as [`StatementStoreExt`].
722	pub fn as_statement_store_ext(self: Arc<Self>) -> StatementStoreExt {
723		StatementStoreExt::new(self)
724	}
725
726	/// Return information of all known statements whose decryption key is identified as
727	/// `dest`. The key must be available to the client.
728	fn posted_clear_inner<R>(
729		&self,
730		match_all_topics: &[Topic],
731		dest: [u8; 32],
732		// Map the statement and the decrypted data to the desired result.
733		mut map_f: impl FnMut(Statement, Vec<u8>) -> R,
734	) -> Result<Vec<R>> {
735		self.collect_statements(Some(dest), match_all_topics, |statement| {
736			if let (Some(key), Some(_)) = (statement.decryption_key(), statement.data()) {
737				let public: sp_core::ed25519::Public = UncheckedFrom::unchecked_from(key);
738				let public: sp_statement_store::ed25519::Public = public.into();
739				match self.keystore.key_pair::<sp_statement_store::ed25519::Pair>(&public) {
740					Err(e) => {
741						log::debug!(
742							target: LOG_TARGET,
743							"Keystore error: {:?}, for statement {:?}",
744							e,
745							HexDisplay::from(&statement.hash())
746						);
747						None
748					},
749					Ok(None) => {
750						log::debug!(
751							target: LOG_TARGET,
752							"Keystore is missing key for statement {:?}",
753							HexDisplay::from(&statement.hash())
754						);
755						None
756					},
757					Ok(Some(pair)) => match statement.decrypt_private(&pair.into_inner()) {
758						Ok(r) => r.map(|data| map_f(statement, data)),
759						Err(e) => {
760							log::debug!(
761								target: LOG_TARGET,
762								"Decryption error: {:?}, for statement {:?}",
763								e,
764								HexDisplay::from(&statement.hash())
765							);
766							None
767						},
768					},
769				}
770			} else {
771				None
772			}
773		})
774	}
775}
776
777impl StatementStore for Store {
778	/// Return all statements.
779	fn statements(&self) -> Result<Vec<(Hash, Statement)>> {
780		let index = self.index.read();
781		let mut result = Vec::with_capacity(index.entries.len());
782		for hash in index.entries.keys().cloned() {
783			let Some(encoded) =
784				self.db.get(col::STATEMENTS, &hash).map_err(|e| Error::Db(e.to_string()))?
785			else {
786				continue
787			};
788			if let Ok(statement) = Statement::decode(&mut encoded.as_slice()) {
789				result.push((hash, statement));
790			}
791		}
792		Ok(result)
793	}
794
795	fn take_recent_statements(&self) -> Result<Vec<(Hash, Statement)>> {
796		let mut index = self.index.write();
797		let recent = index.take_recent();
798		let mut result = Vec::with_capacity(recent.len());
799		for hash in recent {
800			let Some(encoded) =
801				self.db.get(col::STATEMENTS, &hash).map_err(|e| Error::Db(e.to_string()))?
802			else {
803				continue
804			};
805			if let Ok(statement) = Statement::decode(&mut encoded.as_slice()) {
806				result.push((hash, statement));
807			}
808		}
809		Ok(result)
810	}
811
812	/// Returns a statement by hash.
813	fn statement(&self, hash: &Hash) -> Result<Option<Statement>> {
814		Ok(
815			match self
816				.db
817				.get(col::STATEMENTS, hash.as_slice())
818				.map_err(|e| Error::Db(e.to_string()))?
819			{
820				Some(entry) => {
821					log::trace!(
822						target: LOG_TARGET,
823						"Queried statement {:?}",
824						HexDisplay::from(hash)
825					);
826					Some(
827						Statement::decode(&mut entry.as_slice())
828							.map_err(|e| Error::Decode(e.to_string()))?,
829					)
830				},
831				None => {
832					log::trace!(
833						target: LOG_TARGET,
834						"Queried missing statement {:?}",
835						HexDisplay::from(hash)
836					);
837					None
838				},
839			},
840		)
841	}
842
843	fn has_statement(&self, hash: &Hash) -> bool {
844		self.index.read().entries.contains_key(hash)
845	}
846
847	/// Return the data of all known statements which include all topics and have no `DecryptionKey`
848	/// field.
849	fn broadcasts(&self, match_all_topics: &[Topic]) -> Result<Vec<Vec<u8>>> {
850		self.collect_statements(None, match_all_topics, |statement| statement.into_data())
851	}
852
853	/// Return the data of all known statements whose decryption key is identified as `dest` (this
854	/// will generally be the public key or a hash thereof for symmetric ciphers, or a hash of the
855	/// private key for symmetric ciphers).
856	fn posted(&self, match_all_topics: &[Topic], dest: [u8; 32]) -> Result<Vec<Vec<u8>>> {
857		self.collect_statements(Some(dest), match_all_topics, |statement| statement.into_data())
858	}
859
860	/// Return the decrypted data of all known statements whose decryption key is identified as
861	/// `dest`. The key must be available to the client.
862	fn posted_clear(&self, match_all_topics: &[Topic], dest: [u8; 32]) -> Result<Vec<Vec<u8>>> {
863		self.posted_clear_inner(match_all_topics, dest, |_statement, data| data)
864	}
865
866	/// Return all known statements which include all topics and have no `DecryptionKey`
867	/// field.
868	fn broadcasts_stmt(&self, match_all_topics: &[Topic]) -> Result<Vec<Vec<u8>>> {
869		self.collect_statements(None, match_all_topics, |statement| Some(statement.encode()))
870	}
871
872	/// Return all known statements whose decryption key is identified as `dest` (this
873	/// will generally be the public key or a hash thereof for symmetric ciphers, or a hash of the
874	/// private key for symmetric ciphers).
875	fn posted_stmt(&self, match_all_topics: &[Topic], dest: [u8; 32]) -> Result<Vec<Vec<u8>>> {
876		self.collect_statements(Some(dest), match_all_topics, |statement| Some(statement.encode()))
877	}
878
879	/// Return the statement and the decrypted data of all known statements whose decryption key is
880	/// identified as `dest`. The key must be available to the client.
881	fn posted_clear_stmt(
882		&self,
883		match_all_topics: &[Topic],
884		dest: [u8; 32],
885	) -> Result<Vec<Vec<u8>>> {
886		self.posted_clear_inner(match_all_topics, dest, |statement, data| {
887			let mut res = Vec::with_capacity(statement.size_hint() + data.len());
888			statement.encode_to(&mut res);
889			res.extend_from_slice(&data);
890			res
891		})
892	}
893
894	/// Submit a statement to the store. Validates the statement and returns validation result.
895	fn submit(&self, statement: Statement, source: StatementSource) -> SubmitResult {
896		let hash = statement.hash();
897		let encoded_size = statement.encoded_size();
898		if encoded_size > MAX_STATEMENT_SIZE {
899			log::debug!(
900				target: LOG_TARGET,
901				"Statement is too big for propogation: {:?} ({}/{} bytes)",
902				HexDisplay::from(&hash),
903				statement.encoded_size(),
904				MAX_STATEMENT_SIZE
905			);
906			return SubmitResult::Ignored
907		}
908
909		match self.index.read().query(&hash) {
910			IndexQuery::Expired =>
911				if !source.can_be_resubmitted() {
912					return SubmitResult::KnownExpired
913				},
914			IndexQuery::Exists =>
915				if !source.can_be_resubmitted() {
916					return SubmitResult::Known
917				},
918			IndexQuery::Unknown => {},
919		}
920
921		let Some(account_id) = statement.account_id() else {
922			log::debug!(
923				target: LOG_TARGET,
924				"Statement validation failed: Missing proof ({:?})",
925				HexDisplay::from(&hash),
926			);
927			self.metrics.report(|metrics| metrics.validations_invalid.inc());
928			return SubmitResult::Bad("No statement proof")
929		};
930
931		// Validate.
932		let at_block = if let Some(Proof::OnChain { block_hash, .. }) = statement.proof() {
933			Some(*block_hash)
934		} else {
935			None
936		};
937		let validation_result = (self.validate_fn)(at_block, source, statement.clone());
938		let validation = match validation_result {
939			Ok(validation) => validation,
940			Err(InvalidStatement::BadProof) => {
941				log::debug!(
942					target: LOG_TARGET,
943					"Statement validation failed: BadProof, {:?}",
944					HexDisplay::from(&hash),
945				);
946				self.metrics.report(|metrics| metrics.validations_invalid.inc());
947				return SubmitResult::Bad("Bad statement proof")
948			},
949			Err(InvalidStatement::NoProof) => {
950				log::debug!(
951					target: LOG_TARGET,
952					"Statement validation failed: NoProof, {:?}",
953					HexDisplay::from(&hash),
954				);
955				self.metrics.report(|metrics| metrics.validations_invalid.inc());
956				return SubmitResult::Bad("Missing statement proof")
957			},
958			Err(InvalidStatement::InternalError) =>
959				return SubmitResult::InternalError(Error::Runtime),
960		};
961
962		let current_time = self.timestamp();
963		let mut commit = Vec::new();
964		{
965			let mut index = self.index.write();
966
967			let evicted =
968				match index.insert(hash, &statement, &account_id, &validation, current_time) {
969					MaybeInserted::Ignored => return SubmitResult::Ignored,
970					MaybeInserted::Inserted(evicted) => evicted,
971				};
972
973			commit.push((col::STATEMENTS, hash.to_vec(), Some(statement.encode())));
974			for hash in evicted {
975				commit.push((col::STATEMENTS, hash.to_vec(), None));
976				commit.push((col::EXPIRED, hash.to_vec(), Some((hash, current_time).encode())));
977			}
978			if let Err(e) = self.db.commit(commit) {
979				log::debug!(
980					target: LOG_TARGET,
981					"Statement validation failed: database error {}, {:?}",
982					e,
983					statement
984				);
985				return SubmitResult::InternalError(Error::Db(e.to_string()))
986			}
987		} // Release index lock
988		self.metrics.report(|metrics| metrics.submitted_statements.inc());
989		let network_priority = NetworkPriority::High;
990		log::trace!(target: LOG_TARGET, "Statement submitted: {:?}", HexDisplay::from(&hash));
991		SubmitResult::New(network_priority)
992	}
993
994	/// Remove a statement by hash.
995	fn remove(&self, hash: &Hash) -> Result<()> {
996		let current_time = self.timestamp();
997		{
998			let mut index = self.index.write();
999			if index.make_expired(hash, current_time) {
1000				let commit = [
1001					(col::STATEMENTS, hash.to_vec(), None),
1002					(col::EXPIRED, hash.to_vec(), Some((hash, current_time).encode())),
1003				];
1004				if let Err(e) = self.db.commit(commit) {
1005					log::debug!(
1006						target: LOG_TARGET,
1007						"Error removing statement: database error {}, {:?}",
1008						e,
1009						HexDisplay::from(hash),
1010					);
1011					return Err(Error::Db(e.to_string()))
1012				}
1013			}
1014		}
1015		Ok(())
1016	}
1017
1018	/// Remove all statements by an account.
1019	fn remove_by(&self, who: [u8; 32]) -> Result<()> {
1020		let mut index = self.index.write();
1021		let mut evicted = Vec::new();
1022		if let Some(account_rec) = index.accounts.get(&who) {
1023			evicted.extend(account_rec.by_priority.keys().map(|k| k.hash));
1024		}
1025
1026		let current_time = self.timestamp();
1027		let mut commit = Vec::new();
1028		for hash in evicted {
1029			index.make_expired(&hash, current_time);
1030			commit.push((col::STATEMENTS, hash.to_vec(), None));
1031			commit.push((col::EXPIRED, hash.to_vec(), Some((hash, current_time).encode())));
1032		}
1033		self.db.commit(commit).map_err(|e| {
1034			log::debug!(
1035				target: LOG_TARGET,
1036				"Error removing statement: database error {}, remove by {:?}",
1037				e,
1038				HexDisplay::from(&who),
1039			);
1040
1041			Error::Db(e.to_string())
1042		})
1043	}
1044}
1045
1046#[cfg(test)]
1047mod tests {
1048	use crate::Store;
1049	use sc_keystore::Keystore;
1050	use sp_core::{Decode, Encode, Pair};
1051	use sp_statement_store::{
1052		runtime_api::{InvalidStatement, ValidStatement, ValidateStatement},
1053		AccountId, Channel, DecryptionKey, NetworkPriority, Proof, SignatureVerificationResult,
1054		Statement, StatementSource, StatementStore, SubmitResult, Topic,
1055	};
1056
1057	type Extrinsic = sp_runtime::OpaqueExtrinsic;
1058	type Hash = sp_core::H256;
1059	type Hashing = sp_runtime::traits::BlakeTwo256;
1060	type BlockNumber = u64;
1061	type Header = sp_runtime::generic::Header<BlockNumber, Hashing>;
1062	type Block = sp_runtime::generic::Block<Header, Extrinsic>;
1063
1064	const CORRECT_BLOCK_HASH: [u8; 32] = [1u8; 32];
1065
1066	#[derive(Clone)]
1067	pub(crate) struct TestClient;
1068
1069	pub(crate) struct RuntimeApi {
1070		_inner: TestClient,
1071	}
1072
1073	impl sp_api::ProvideRuntimeApi<Block> for TestClient {
1074		type Api = RuntimeApi;
1075		fn runtime_api(&self) -> sp_api::ApiRef<'_, Self::Api> {
1076			RuntimeApi { _inner: self.clone() }.into()
1077		}
1078	}
1079
1080	sp_api::mock_impl_runtime_apis! {
1081		impl ValidateStatement<Block> for RuntimeApi {
1082			fn validate_statement(
1083				_source: StatementSource,
1084				statement: Statement,
1085			) -> std::result::Result<ValidStatement, InvalidStatement> {
1086				use crate::tests::account;
1087				match statement.verify_signature() {
1088					SignatureVerificationResult::Valid(_) => Ok(ValidStatement{max_count: 100, max_size: 1000}),
1089					SignatureVerificationResult::Invalid => Err(InvalidStatement::BadProof),
1090					SignatureVerificationResult::NoSignature => {
1091						if let Some(Proof::OnChain { block_hash, .. }) = statement.proof() {
1092							if block_hash == &CORRECT_BLOCK_HASH {
1093								let (max_count, max_size) = match statement.account_id() {
1094									Some(a) if a == account(1) => (1, 1000),
1095									Some(a) if a == account(2) => (2, 1000),
1096									Some(a) if a == account(3) => (3, 1000),
1097									Some(a) if a == account(4) => (4, 1000),
1098									Some(a) if a == account(42) => (42, 42 * crate::MAX_STATEMENT_SIZE as u32),
1099									_ => (2, 2000),
1100								};
1101								Ok(ValidStatement{ max_count, max_size })
1102							} else {
1103								Err(InvalidStatement::BadProof)
1104							}
1105						} else {
1106							Err(InvalidStatement::BadProof)
1107						}
1108					}
1109				}
1110			}
1111		}
1112	}
1113
1114	impl sp_blockchain::HeaderBackend<Block> for TestClient {
1115		fn header(&self, _hash: Hash) -> sp_blockchain::Result<Option<Header>> {
1116			unimplemented!()
1117		}
1118		fn info(&self) -> sp_blockchain::Info<Block> {
1119			sp_blockchain::Info {
1120				best_hash: CORRECT_BLOCK_HASH.into(),
1121				best_number: 0,
1122				genesis_hash: Default::default(),
1123				finalized_hash: CORRECT_BLOCK_HASH.into(),
1124				finalized_number: 1,
1125				finalized_state: None,
1126				number_leaves: 0,
1127				block_gap: None,
1128			}
1129		}
1130		fn status(&self, _hash: Hash) -> sp_blockchain::Result<sp_blockchain::BlockStatus> {
1131			unimplemented!()
1132		}
1133		fn number(&self, _hash: Hash) -> sp_blockchain::Result<Option<BlockNumber>> {
1134			unimplemented!()
1135		}
1136		fn hash(&self, _number: BlockNumber) -> sp_blockchain::Result<Option<Hash>> {
1137			unimplemented!()
1138		}
1139	}
1140
1141	fn test_store() -> (Store, tempfile::TempDir) {
1142		sp_tracing::init_for_tests();
1143		let temp_dir = tempfile::Builder::new().tempdir().expect("Error creating test dir");
1144
1145		let client = std::sync::Arc::new(TestClient);
1146		let mut path: std::path::PathBuf = temp_dir.path().into();
1147		path.push("db");
1148		let keystore = std::sync::Arc::new(sc_keystore::LocalKeystore::in_memory());
1149		let store = Store::new(&path, Default::default(), client, keystore, None).unwrap();
1150		(store, temp_dir) // return order is important. Store must be dropped before TempDir
1151	}
1152
1153	fn signed_statement(data: u8) -> Statement {
1154		signed_statement_with_topics(data, &[], None)
1155	}
1156
1157	fn signed_statement_with_topics(
1158		data: u8,
1159		topics: &[Topic],
1160		dec_key: Option<DecryptionKey>,
1161	) -> Statement {
1162		let mut statement = Statement::new();
1163		statement.set_plain_data(vec![data]);
1164		for i in 0..topics.len() {
1165			statement.set_topic(i, topics[i]);
1166		}
1167		if let Some(key) = dec_key {
1168			statement.set_decryption_key(key);
1169		}
1170		let kp = sp_core::ed25519::Pair::from_string("//Alice", None).unwrap();
1171		statement.sign_ed25519_private(&kp);
1172		statement
1173	}
1174
1175	fn topic(data: u64) -> Topic {
1176		let mut topic: Topic = Default::default();
1177		topic[0..8].copy_from_slice(&data.to_le_bytes());
1178		topic
1179	}
1180
1181	fn dec_key(data: u64) -> DecryptionKey {
1182		let mut dec_key: DecryptionKey = Default::default();
1183		dec_key[0..8].copy_from_slice(&data.to_le_bytes());
1184		dec_key
1185	}
1186
1187	fn account(id: u64) -> AccountId {
1188		let mut account: AccountId = Default::default();
1189		account[0..8].copy_from_slice(&id.to_le_bytes());
1190		account
1191	}
1192
1193	fn channel(id: u64) -> Channel {
1194		let mut channel: Channel = Default::default();
1195		channel[0..8].copy_from_slice(&id.to_le_bytes());
1196		channel
1197	}
1198
1199	fn statement(account_id: u64, priority: u32, c: Option<u64>, data_len: usize) -> Statement {
1200		let mut statement = Statement::new();
1201		let mut data = Vec::new();
1202		data.resize(data_len, 0);
1203		statement.set_plain_data(data);
1204		statement.set_priority(priority);
1205		if let Some(c) = c {
1206			statement.set_channel(channel(c));
1207		}
1208		statement.set_proof(Proof::OnChain {
1209			block_hash: CORRECT_BLOCK_HASH,
1210			who: account(account_id),
1211			event_index: 0,
1212		});
1213		statement
1214	}
1215
1216	#[test]
1217	fn submit_one() {
1218		let (store, _temp) = test_store();
1219		let statement0 = signed_statement(0);
1220		assert_eq!(
1221			store.submit(statement0, StatementSource::Network),
1222			SubmitResult::New(NetworkPriority::High)
1223		);
1224		let unsigned = statement(0, 1, None, 0);
1225		assert_eq!(
1226			store.submit(unsigned, StatementSource::Network),
1227			SubmitResult::New(NetworkPriority::High)
1228		);
1229	}
1230
1231	#[test]
1232	fn save_and_load_statements() {
1233		let (store, temp) = test_store();
1234		let statement0 = signed_statement(0);
1235		let statement1 = signed_statement(1);
1236		let statement2 = signed_statement(2);
1237		assert_eq!(
1238			store.submit(statement0.clone(), StatementSource::Network),
1239			SubmitResult::New(NetworkPriority::High)
1240		);
1241		assert_eq!(
1242			store.submit(statement1.clone(), StatementSource::Network),
1243			SubmitResult::New(NetworkPriority::High)
1244		);
1245		assert_eq!(
1246			store.submit(statement2.clone(), StatementSource::Network),
1247			SubmitResult::New(NetworkPriority::High)
1248		);
1249		assert_eq!(store.statements().unwrap().len(), 3);
1250		assert_eq!(store.broadcasts(&[]).unwrap().len(), 3);
1251		assert_eq!(store.statement(&statement1.hash()).unwrap(), Some(statement1.clone()));
1252		let keystore = store.keystore.clone();
1253		drop(store);
1254
1255		let client = std::sync::Arc::new(TestClient);
1256		let mut path: std::path::PathBuf = temp.path().into();
1257		path.push("db");
1258		let store = Store::new(&path, Default::default(), client, keystore, None).unwrap();
1259		assert_eq!(store.statements().unwrap().len(), 3);
1260		assert_eq!(store.broadcasts(&[]).unwrap().len(), 3);
1261		assert_eq!(store.statement(&statement1.hash()).unwrap(), Some(statement1));
1262	}
1263
1264	#[test]
1265	fn take_recent_statements_clears_index() {
1266		let (store, _temp) = test_store();
1267		let statement0 = signed_statement(0);
1268		let statement1 = signed_statement(1);
1269		let statement2 = signed_statement(2);
1270		let statement3 = signed_statement(3);
1271
1272		let _ = store.submit(statement0.clone(), StatementSource::Local);
1273		let _ = store.submit(statement1.clone(), StatementSource::Local);
1274		let _ = store.submit(statement2.clone(), StatementSource::Local);
1275
1276		let recent1 = store.take_recent_statements().unwrap();
1277		let (recent1_hashes, recent1_statements): (Vec<_>, Vec<_>) = recent1.into_iter().unzip();
1278		let expected1 = vec![statement0, statement1, statement2];
1279		assert!(expected1.iter().all(|s| recent1_hashes.contains(&s.hash())));
1280		assert!(expected1.iter().all(|s| recent1_statements.contains(s)));
1281
1282		// Recent statements are cleared.
1283		let recent2 = store.take_recent_statements().unwrap();
1284		assert_eq!(recent2.len(), 0);
1285
1286		store.submit(statement3.clone(), StatementSource::Network);
1287
1288		let recent3 = store.take_recent_statements().unwrap();
1289		let (recent3_hashes, recent3_statements): (Vec<_>, Vec<_>) = recent3.into_iter().unzip();
1290		let expected3 = vec![statement3];
1291		assert!(expected3.iter().all(|s| recent3_hashes.contains(&s.hash())));
1292		assert!(expected3.iter().all(|s| recent3_statements.contains(s)));
1293
1294		// Recent statements are cleared, but statements remain in the store.
1295		assert_eq!(store.statements().unwrap().len(), 4);
1296	}
1297
1298	#[test]
1299	fn search_by_topic_and_key() {
1300		let (store, _temp) = test_store();
1301		let statement0 = signed_statement(0);
1302		let statement1 = signed_statement_with_topics(1, &[topic(0)], None);
1303		let statement2 = signed_statement_with_topics(2, &[topic(0), topic(1)], Some(dec_key(2)));
1304		let statement3 = signed_statement_with_topics(3, &[topic(0), topic(1), topic(2)], None);
1305		let statement4 =
1306			signed_statement_with_topics(4, &[topic(0), topic(42), topic(2), topic(3)], None);
1307		let statements = vec![statement0, statement1, statement2, statement3, statement4];
1308		for s in &statements {
1309			store.submit(s.clone(), StatementSource::Network);
1310		}
1311
1312		let assert_topics = |topics: &[u64], key: Option<u64>, expected: &[u8]| {
1313			let key = key.map(dec_key);
1314			let topics: Vec<_> = topics.iter().map(|t| topic(*t)).collect();
1315			let mut got_vals: Vec<_> = if let Some(key) = key {
1316				store.posted(&topics, key).unwrap().into_iter().map(|d| d[0]).collect()
1317			} else {
1318				store.broadcasts(&topics).unwrap().into_iter().map(|d| d[0]).collect()
1319			};
1320			got_vals.sort();
1321			assert_eq!(expected.to_vec(), got_vals);
1322		};
1323
1324		assert_topics(&[], None, &[0, 1, 3, 4]);
1325		assert_topics(&[], Some(2), &[2]);
1326		assert_topics(&[0], None, &[1, 3, 4]);
1327		assert_topics(&[1], None, &[3]);
1328		assert_topics(&[2], None, &[3, 4]);
1329		assert_topics(&[3], None, &[4]);
1330		assert_topics(&[42], None, &[4]);
1331
1332		assert_topics(&[0, 1], None, &[3]);
1333		assert_topics(&[0, 1], Some(2), &[2]);
1334		assert_topics(&[0, 1, 99], Some(2), &[]);
1335		assert_topics(&[1, 2], None, &[3]);
1336		assert_topics(&[99], None, &[]);
1337		assert_topics(&[0, 99], None, &[]);
1338		assert_topics(&[0, 1, 2, 3, 42], None, &[]);
1339	}
1340
1341	#[test]
1342	fn constraints() {
1343		let (store, _temp) = test_store();
1344
1345		store.index.write().options.max_total_size = 3000;
1346		let source = StatementSource::Network;
1347		let ok = SubmitResult::New(NetworkPriority::High);
1348		let ignored = SubmitResult::Ignored;
1349
1350		// Account 1 (limit = 1 msg, 1000 bytes)
1351
1352		// Oversized statement is not allowed. Limit for account 1 is 1 msg, 1000 bytes
1353		assert_eq!(store.submit(statement(1, 1, Some(1), 2000), source), ignored);
1354		assert_eq!(store.submit(statement(1, 1, Some(1), 500), source), ok);
1355		// Would not replace channel message with same priority
1356		assert_eq!(store.submit(statement(1, 1, Some(1), 200), source), ignored);
1357		assert_eq!(store.submit(statement(1, 2, Some(1), 600), source), ok);
1358		// Submit another message to another channel with lower priority. Should not be allowed
1359		// because msg count limit is 1
1360		assert_eq!(store.submit(statement(1, 1, Some(2), 100), source), ignored);
1361		assert_eq!(store.index.read().expired.len(), 1);
1362
1363		// Account 2 (limit = 2 msg, 1000 bytes)
1364
1365		assert_eq!(store.submit(statement(2, 1, None, 500), source), ok);
1366		assert_eq!(store.submit(statement(2, 2, None, 100), source), ok);
1367		// Should evict priority 1
1368		assert_eq!(store.submit(statement(2, 3, None, 500), source), ok);
1369		assert_eq!(store.index.read().expired.len(), 2);
1370		// Should evict all
1371		assert_eq!(store.submit(statement(2, 4, None, 1000), source), ok);
1372		assert_eq!(store.index.read().expired.len(), 4);
1373
1374		// Account 3 (limit = 3 msg, 1000 bytes)
1375
1376		assert_eq!(store.submit(statement(3, 2, Some(1), 300), source), ok);
1377		assert_eq!(store.submit(statement(3, 3, Some(2), 300), source), ok);
1378		assert_eq!(store.submit(statement(3, 4, Some(3), 300), source), ok);
1379		// Should evict 2 and 3
1380		assert_eq!(store.submit(statement(3, 5, None, 500), source), ok);
1381		assert_eq!(store.index.read().expired.len(), 6);
1382
1383		assert_eq!(store.index.read().total_size, 2400);
1384		assert_eq!(store.index.read().entries.len(), 4);
1385
1386		// Should be over the global size limit
1387		assert_eq!(store.submit(statement(1, 1, None, 700), source), ignored);
1388		// Should be over the global count limit
1389		store.index.write().options.max_total_statements = 4;
1390		assert_eq!(store.submit(statement(1, 1, None, 100), source), ignored);
1391
1392		let mut expected_statements = vec![
1393			statement(1, 2, Some(1), 600).hash(),
1394			statement(2, 4, None, 1000).hash(),
1395			statement(3, 4, Some(3), 300).hash(),
1396			statement(3, 5, None, 500).hash(),
1397		];
1398		expected_statements.sort();
1399		let mut statements: Vec<_> =
1400			store.statements().unwrap().into_iter().map(|(hash, _)| hash).collect();
1401		statements.sort();
1402		assert_eq!(expected_statements, statements);
1403	}
1404
1405	#[test]
1406	fn max_statement_size_for_gossiping() {
1407		let (store, _temp) = test_store();
1408		store.index.write().options.max_total_size = 42 * crate::MAX_STATEMENT_SIZE;
1409
1410		assert_eq!(
1411			store.submit(
1412				statement(42, 1, Some(1), crate::MAX_STATEMENT_SIZE - 500),
1413				StatementSource::Local
1414			),
1415			SubmitResult::New(NetworkPriority::High)
1416		);
1417
1418		assert_eq!(
1419			store.submit(
1420				statement(42, 2, Some(1), 2 * crate::MAX_STATEMENT_SIZE),
1421				StatementSource::Local
1422			),
1423			SubmitResult::Ignored
1424		);
1425	}
1426
1427	#[test]
1428	fn expired_statements_are_purged() {
1429		use super::DEFAULT_PURGE_AFTER_SEC;
1430		let (mut store, temp) = test_store();
1431		let mut statement = statement(1, 1, Some(3), 100);
1432		store.set_time(0);
1433		statement.set_topic(0, topic(4));
1434		store.submit(statement.clone(), StatementSource::Network);
1435		assert_eq!(store.index.read().entries.len(), 1);
1436		store.remove(&statement.hash()).unwrap();
1437		assert_eq!(store.index.read().entries.len(), 0);
1438		assert_eq!(store.index.read().accounts.len(), 0);
1439		store.set_time(DEFAULT_PURGE_AFTER_SEC + 1);
1440		store.maintain();
1441		assert_eq!(store.index.read().expired.len(), 0);
1442		let keystore = store.keystore.clone();
1443		drop(store);
1444
1445		let client = std::sync::Arc::new(TestClient);
1446		let mut path: std::path::PathBuf = temp.path().into();
1447		path.push("db");
1448		let store = Store::new(&path, Default::default(), client, keystore, None).unwrap();
1449		assert_eq!(store.statements().unwrap().len(), 0);
1450		assert_eq!(store.index.read().expired.len(), 0);
1451	}
1452
1453	#[test]
1454	fn posted_clear_decrypts() {
1455		let (store, _temp) = test_store();
1456		let public = store
1457			.keystore
1458			.ed25519_generate_new(sp_core::crypto::key_types::STATEMENT, None)
1459			.unwrap();
1460		let statement1 = statement(1, 1, None, 100);
1461		let mut statement2 = statement(1, 2, None, 0);
1462		let plain = b"The most valuable secret".to_vec();
1463		statement2.encrypt(&plain, &public).unwrap();
1464		store.submit(statement1, StatementSource::Network);
1465		store.submit(statement2, StatementSource::Network);
1466		let posted_clear = store.posted_clear(&[], public.into()).unwrap();
1467		assert_eq!(posted_clear, vec![plain]);
1468	}
1469
1470	#[test]
1471	fn broadcasts_stmt_returns_encoded_statements() {
1472		let (store, _tmp) = test_store();
1473
1474		// no key, no topic
1475		let s0 = signed_statement_with_topics(0, &[], None);
1476		// same, but with a topic = 42
1477		let s1 = signed_statement_with_topics(1, &[topic(42)], None);
1478		// has a decryption key -> must NOT be returned by broadcasts_stmt
1479		let s2 = signed_statement_with_topics(2, &[topic(42)], Some(dec_key(99)));
1480
1481		for s in [&s0, &s1, &s2] {
1482			store.submit(s.clone(), StatementSource::Network);
1483		}
1484
1485		// no topic filter
1486		let mut hashes: Vec<_> = store
1487			.broadcasts_stmt(&[])
1488			.unwrap()
1489			.into_iter()
1490			.map(|bytes| Statement::decode(&mut &bytes[..]).unwrap().hash())
1491			.collect();
1492		hashes.sort();
1493		let expected_hashes = {
1494			let mut e = vec![s0.hash(), s1.hash()];
1495			e.sort();
1496			e
1497		};
1498		assert_eq!(hashes, expected_hashes);
1499
1500		// filter on topic 42
1501		let got = store.broadcasts_stmt(&[topic(42)]).unwrap();
1502		assert_eq!(got.len(), 1);
1503		let st = Statement::decode(&mut &got[0][..]).unwrap();
1504		assert_eq!(st.hash(), s1.hash());
1505	}
1506
1507	#[test]
1508	fn posted_stmt_returns_encoded_statements_for_dest() {
1509		let (store, _tmp) = test_store();
1510
1511		let public1 = store
1512			.keystore
1513			.ed25519_generate_new(sp_core::crypto::key_types::STATEMENT, None)
1514			.unwrap();
1515		let dest: [u8; 32] = public1.into();
1516
1517		let public2 = store
1518			.keystore
1519			.ed25519_generate_new(sp_core::crypto::key_types::STATEMENT, None)
1520			.unwrap();
1521
1522		// A statement that does have dec_key = dest
1523		let mut s_with_key = statement(1, 1, None, 0);
1524		let plain1 = b"The most valuable secret".to_vec();
1525		s_with_key.encrypt(&plain1, &public1).unwrap();
1526
1527		// A statement with a different dec_key
1528		let mut s_other_key = statement(2, 2, None, 0);
1529		let plain2 = b"The second most valuable secret".to_vec();
1530		s_other_key.encrypt(&plain2, &public2).unwrap();
1531
1532		// Submit them all
1533		for s in [&s_with_key, &s_other_key] {
1534			store.submit(s.clone(), StatementSource::Network);
1535		}
1536
1537		// posted_stmt should only return the one with dec_key = dest
1538		let retrieved = store.posted_stmt(&[], dest).unwrap();
1539		assert_eq!(retrieved.len(), 1, "Only one statement has dec_key=dest");
1540
1541		// Re-decode that returned statement to confirm it is correct
1542		let returned_stmt = Statement::decode(&mut &retrieved[0][..]).unwrap();
1543		assert_eq!(
1544			returned_stmt.hash(),
1545			s_with_key.hash(),
1546			"Returned statement must match s_with_key"
1547		);
1548	}
1549
1550	#[test]
1551	fn posted_clear_stmt_returns_statement_followed_by_plain_data() {
1552		let (store, _tmp) = test_store();
1553
1554		let public1 = store
1555			.keystore
1556			.ed25519_generate_new(sp_core::crypto::key_types::STATEMENT, None)
1557			.unwrap();
1558		let dest: [u8; 32] = public1.into();
1559
1560		let public2 = store
1561			.keystore
1562			.ed25519_generate_new(sp_core::crypto::key_types::STATEMENT, None)
1563			.unwrap();
1564
1565		// A statement that does have dec_key = dest
1566		let mut s_with_key = statement(1, 1, None, 0);
1567		let plain1 = b"The most valuable secret".to_vec();
1568		s_with_key.encrypt(&plain1, &public1).unwrap();
1569
1570		// A statement with a different dec_key
1571		let mut s_other_key = statement(2, 2, None, 0);
1572		let plain2 = b"The second most valuable secret".to_vec();
1573		s_other_key.encrypt(&plain2, &public2).unwrap();
1574
1575		// Submit them all
1576		for s in [&s_with_key, &s_other_key] {
1577			store.submit(s.clone(), StatementSource::Network);
1578		}
1579
1580		// posted_stmt should only return the one with dec_key = dest
1581		let retrieved = store.posted_clear_stmt(&[], dest).unwrap();
1582		assert_eq!(retrieved.len(), 1, "Only one statement has dec_key=dest");
1583
1584		// We expect: [ encoded Statement ] + [ the decrypted bytes ]
1585		let encoded_stmt = s_with_key.encode();
1586		let stmt_len = encoded_stmt.len();
1587
1588		// 1) statement is first
1589		assert_eq!(&retrieved[0][..stmt_len], &encoded_stmt[..]);
1590
1591		// 2) followed by the decrypted payload
1592		let trailing = &retrieved[0][stmt_len..];
1593		assert_eq!(trailing, &plain1[..]);
1594	}
1595
1596	#[test]
1597	fn posted_clear_returns_plain_data_for_dest_and_topics() {
1598		let (store, _tmp) = test_store();
1599
1600		// prepare two key-pairs
1601		let public_dest = store
1602			.keystore
1603			.ed25519_generate_new(sp_core::crypto::key_types::STATEMENT, None)
1604			.unwrap();
1605		let dest: [u8; 32] = public_dest.into();
1606
1607		let public_other = store
1608			.keystore
1609			.ed25519_generate_new(sp_core::crypto::key_types::STATEMENT, None)
1610			.unwrap();
1611
1612		// statement that SHOULD be returned (matches dest & topic 42)
1613		let mut s_good = statement(1, 1, None, 0);
1614		let plaintext_good = b"The most valuable secret".to_vec();
1615		s_good.encrypt(&plaintext_good, &public_dest).unwrap();
1616		s_good.set_topic(0, topic(42));
1617
1618		// statement that should NOT be returned (same dest but different topic)
1619		let mut s_wrong_topic = statement(2, 2, None, 0);
1620		s_wrong_topic.encrypt(b"Wrong topic", &public_dest).unwrap();
1621		s_wrong_topic.set_topic(0, topic(99));
1622
1623		// statement that should NOT be returned (different dest)
1624		let mut s_other_dest = statement(3, 3, None, 0);
1625		s_other_dest.encrypt(b"Other dest", &public_other).unwrap();
1626		s_other_dest.set_topic(0, topic(42));
1627
1628		// submit all
1629		for s in [&s_good, &s_wrong_topic, &s_other_dest] {
1630			store.submit(s.clone(), StatementSource::Network);
1631		}
1632
1633		// call posted_clear with the topic filter and dest
1634		let retrieved = store.posted_clear(&[topic(42)], dest).unwrap();
1635
1636		// exactly one element, equal to the expected plaintext
1637		assert_eq!(retrieved, vec![plaintext_good]);
1638	}
1639
1640	#[test]
1641	fn remove_by_covers_various_situations() {
1642		use sp_statement_store::{StatementSource, StatementStore, SubmitResult};
1643
1644		// Use a fresh store and fixed time so we can control purging.
1645		let (mut store, _temp) = test_store();
1646		store.set_time(0);
1647
1648		// Reuse helpers from this module.
1649		let t42 = topic(42);
1650		let k7 = dec_key(7);
1651
1652		// Account A = 4 (has per-account limits (4, 1000) in the mock runtime)
1653		// - Mix of topic, decryption-key and channel to exercise every index.
1654		let mut s_a1 = statement(4, 10, Some(100), 100);
1655		s_a1.set_topic(0, t42);
1656		let h_a1 = s_a1.hash();
1657
1658		let mut s_a2 = statement(4, 20, Some(200), 150);
1659		s_a2.set_decryption_key(k7);
1660		let h_a2 = s_a2.hash();
1661
1662		let s_a3 = statement(4, 30, None, 50);
1663		let h_a3 = s_a3.hash();
1664
1665		// Account B = 3 (control group that must remain untouched).
1666		let s_b1 = statement(3, 10, None, 100);
1667		let h_b1 = s_b1.hash();
1668
1669		let mut s_b2 = statement(3, 15, Some(300), 100);
1670		s_b2.set_topic(0, t42);
1671		s_b2.set_decryption_key(k7);
1672		let h_b2 = s_b2.hash();
1673
1674		// Submit all statements.
1675		for s in [&s_a1, &s_a2, &s_a3, &s_b1, &s_b2] {
1676			assert!(matches!(
1677				store.submit(s.clone(), StatementSource::Network),
1678				SubmitResult::New(_)
1679			));
1680		}
1681
1682		// --- Pre-conditions: everything is indexed as expected.
1683		{
1684			let idx = store.index.read();
1685			assert_eq!(idx.entries.len(), 5, "all 5 should be present");
1686			assert!(idx.accounts.contains_key(&account(4)));
1687			assert!(idx.accounts.contains_key(&account(3)));
1688			assert_eq!(idx.total_size, 100 + 150 + 50 + 100 + 100);
1689
1690			// Topic and key sets contain both A & B entries.
1691			let set_t = idx.by_topic.get(&t42).expect("topic set exists");
1692			assert!(set_t.contains(&h_a1) && set_t.contains(&h_b2));
1693
1694			let set_k = idx.by_dec_key.get(&Some(k7)).expect("key set exists");
1695			assert!(set_k.contains(&h_a2) && set_k.contains(&h_b2));
1696		}
1697
1698		// --- Action: remove all statements by Account A.
1699		store.remove_by(account(4)).expect("remove_by should succeed");
1700
1701		// --- Post-conditions: A's statements are gone and marked expired; B's remain.
1702		{
1703			// A's statements removed from DB view.
1704			for h in [h_a1, h_a2, h_a3] {
1705				assert!(store.statement(&h).unwrap().is_none(), "A's statement should be removed");
1706			}
1707
1708			// B's statements still present.
1709			for h in [h_b1, h_b2] {
1710				assert!(store.statement(&h).unwrap().is_some(), "B's statement should remain");
1711			}
1712
1713			let idx = store.index.read();
1714
1715			// Account map updated.
1716			assert!(!idx.accounts.contains_key(&account(4)), "Account A must be gone");
1717			assert!(idx.accounts.contains_key(&account(3)), "Account B must remain");
1718
1719			// Removed statements are marked expired.
1720			assert!(idx.expired.contains_key(&h_a1));
1721			assert!(idx.expired.contains_key(&h_a2));
1722			assert!(idx.expired.contains_key(&h_a3));
1723			assert_eq!(idx.expired.len(), 3);
1724
1725			// Entry count & total_size reflect only B's data.
1726			assert_eq!(idx.entries.len(), 2);
1727			assert_eq!(idx.total_size, 100 + 100);
1728
1729			// Topic index: only B2 remains for topic 42.
1730			let set_t = idx.by_topic.get(&t42).expect("topic set exists");
1731			assert!(set_t.contains(&h_b2));
1732			assert!(!set_t.contains(&h_a1));
1733
1734			// Decryption-key index: only B2 remains for key 7.
1735			let set_k = idx.by_dec_key.get(&Some(k7)).expect("key set exists");
1736			assert!(set_k.contains(&h_b2));
1737			assert!(!set_k.contains(&h_a2));
1738		}
1739
1740		// --- Idempotency: removing again is a no-op and should not error.
1741		store.remove_by(account(4)).expect("second remove_by should be a no-op");
1742
1743		// --- Purge: advance time beyond TTL and run maintenance; expired entries disappear.
1744		let purge_after = store.index.read().options.purge_after_sec;
1745		store.set_time(purge_after + 1);
1746		store.maintain();
1747		assert_eq!(store.index.read().expired.len(), 0, "expired entries should be purged");
1748
1749		// --- Reuse: Account A can submit again after purge.
1750		let s_new = statement(4, 40, None, 10);
1751		assert!(matches!(store.submit(s_new, StatementSource::Network), SubmitResult::New(_)));
1752	}
1753}