sc_statement_store/
lib.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! Disk-backed statement store.
20//!
21//! This module contains an implementation of `sp_statement_store::StatementStore` which is backed
22//! by a database.
23//!
24//! Constraint management.
25//!
26//! Each time a new statement is inserted into the store, it is first validated with the runtime
27//! Validation function computes `global_priority`, 'max_count' and `max_size` for a statement.
28//! The following constraints are then checked:
29//! * For a given account id, there may be at most `max_count` statements with `max_size` total data
30//!   size. To satisfy this, statements for this account ID are removed from the store starting with
31//!   the lowest priority until a constraint is satisfied.
32//! * There may not be more than `MAX_TOTAL_STATEMENTS` total statements with `MAX_TOTAL_SIZE` size.
33//!   To satisfy this, statements are removed from the store starting with the lowest
34//!   `global_priority` until a constraint is satisfied.
35//!
36//! When a new statement is inserted that would not satisfy constraints in the first place, no
37//! statements are deleted and `Ignored` result is returned.
38//! The order in which statements with the same priority are deleted is unspecified.
39//!
40//! Statement expiration.
41//!
42//! Each time a statement is removed from the store (Either evicted by higher priority statement or
43//! explicitly with the `remove` function) the statement is marked as expired. Expired statements
44//! can't be added to the store for `Options::purge_after_sec` seconds. This is to prevent old
45//! statements from being propagated on the network.
46
47#![warn(missing_docs)]
48#![warn(unused_extern_crates)]
49
50mod metrics;
51
52pub use sp_statement_store::{Error, StatementStore, MAX_TOPICS};
53
54use metrics::MetricsLink as PrometheusMetrics;
55use parking_lot::RwLock;
56use prometheus_endpoint::Registry as PrometheusRegistry;
57use sc_keystore::LocalKeystore;
58use sp_api::ProvideRuntimeApi;
59use sp_blockchain::HeaderBackend;
60use sp_core::{crypto::UncheckedFrom, hexdisplay::HexDisplay, traits::SpawnNamed, Decode, Encode};
61use sp_runtime::traits::Block as BlockT;
62use sp_statement_store::{
63	runtime_api::{
64		InvalidStatement, StatementSource, StatementStoreExt, ValidStatement, ValidateStatement,
65	},
66	AccountId, BlockHash, Channel, DecryptionKey, Hash, NetworkPriority, Proof, Result, Statement,
67	SubmitResult, Topic,
68};
69use std::{
70	collections::{BTreeMap, HashMap, HashSet},
71	sync::Arc,
72};
73
74const KEY_VERSION: &[u8] = b"version".as_slice();
75const CURRENT_VERSION: u32 = 1;
76
77const LOG_TARGET: &str = "statement-store";
78
79const DEFAULT_PURGE_AFTER_SEC: u64 = 2 * 24 * 60 * 60; //48h
80const DEFAULT_MAX_TOTAL_STATEMENTS: usize = 8192;
81const DEFAULT_MAX_TOTAL_SIZE: usize = 64 * 1024 * 1024;
82
83const MAINTENANCE_PERIOD: std::time::Duration = std::time::Duration::from_secs(30);
84
85mod col {
86	pub const META: u8 = 0;
87	pub const STATEMENTS: u8 = 1;
88	pub const EXPIRED: u8 = 2;
89
90	pub const COUNT: u8 = 3;
91}
92
93#[derive(Eq, PartialEq, Debug, Ord, PartialOrd, Clone, Copy)]
94struct Priority(u32);
95
96#[derive(PartialEq, Eq)]
97struct PriorityKey {
98	hash: Hash,
99	priority: Priority,
100}
101
102impl PartialOrd for PriorityKey {
103	fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
104		Some(self.cmp(other))
105	}
106}
107
108impl Ord for PriorityKey {
109	fn cmp(&self, other: &Self) -> std::cmp::Ordering {
110		self.priority.cmp(&other.priority).then_with(|| self.hash.cmp(&other.hash))
111	}
112}
113
114#[derive(PartialEq, Eq)]
115struct ChannelEntry {
116	hash: Hash,
117	priority: Priority,
118}
119
120#[derive(Default)]
121struct StatementsForAccount {
122	// Statements ordered by priority.
123	by_priority: BTreeMap<PriorityKey, (Option<Channel>, usize)>,
124	// Channel to statement map. Only one statement per channel is allowed.
125	channels: HashMap<Channel, ChannelEntry>,
126	// Sum of all `Data` field sizes.
127	data_size: usize,
128}
129
130/// Store configuration
131pub struct Options {
132	/// Maximum statement allowed in the store. Once this limit is reached lower-priority
133	/// statements may be evicted.
134	max_total_statements: usize,
135	/// Maximum total data size allowed in the store. Once this limit is reached lower-priority
136	/// statements may be evicted.
137	max_total_size: usize,
138	/// Number of seconds for which removed statements won't be allowed to be added back in.
139	purge_after_sec: u64,
140}
141
142impl Default for Options {
143	fn default() -> Self {
144		Options {
145			max_total_statements: DEFAULT_MAX_TOTAL_STATEMENTS,
146			max_total_size: DEFAULT_MAX_TOTAL_SIZE,
147			purge_after_sec: DEFAULT_PURGE_AFTER_SEC,
148		}
149	}
150}
151
152#[derive(Default)]
153struct Index {
154	by_topic: HashMap<Topic, HashSet<Hash>>,
155	by_dec_key: HashMap<Option<DecryptionKey>, HashSet<Hash>>,
156	topics_and_keys: HashMap<Hash, ([Option<Topic>; MAX_TOPICS], Option<DecryptionKey>)>,
157	entries: HashMap<Hash, (AccountId, Priority, usize)>,
158	expired: HashMap<Hash, u64>, // Value is expiration timestamp.
159	accounts: HashMap<AccountId, StatementsForAccount>,
160	options: Options,
161	total_size: usize,
162}
163
164struct ClientWrapper<Block, Client> {
165	client: Arc<Client>,
166	_block: std::marker::PhantomData<Block>,
167}
168
169impl<Block, Client> ClientWrapper<Block, Client>
170where
171	Block: BlockT,
172	Block::Hash: From<BlockHash>,
173	Client: ProvideRuntimeApi<Block> + HeaderBackend<Block> + Send + Sync + 'static,
174	Client::Api: ValidateStatement<Block>,
175{
176	fn validate_statement(
177		&self,
178		block: Option<BlockHash>,
179		source: StatementSource,
180		statement: Statement,
181	) -> std::result::Result<ValidStatement, InvalidStatement> {
182		let api = self.client.runtime_api();
183		let block = block.map(Into::into).unwrap_or_else(|| {
184			// Validate against the finalized state.
185			self.client.info().finalized_hash
186		});
187		api.validate_statement(block, source, statement)
188			.map_err(|_| InvalidStatement::InternalError)?
189	}
190}
191
192/// Statement store.
193pub struct Store {
194	db: parity_db::Db,
195	index: RwLock<Index>,
196	validate_fn: Box<
197		dyn Fn(
198				Option<BlockHash>,
199				StatementSource,
200				Statement,
201			) -> std::result::Result<ValidStatement, InvalidStatement>
202			+ Send
203			+ Sync,
204	>,
205	keystore: Arc<LocalKeystore>,
206	// Used for testing
207	time_override: Option<u64>,
208	metrics: PrometheusMetrics,
209}
210
211enum IndexQuery {
212	Unknown,
213	Exists,
214	Expired,
215}
216
217enum MaybeInserted {
218	Inserted(HashSet<Hash>),
219	Ignored,
220}
221
222impl Index {
223	fn new(options: Options) -> Index {
224		Index { options, ..Default::default() }
225	}
226
227	fn insert_new(&mut self, hash: Hash, account: AccountId, statement: &Statement) {
228		let mut all_topics = [None; MAX_TOPICS];
229		let mut nt = 0;
230		while let Some(t) = statement.topic(nt) {
231			self.by_topic.entry(t).or_default().insert(hash);
232			all_topics[nt] = Some(t);
233			nt += 1;
234		}
235		let key = statement.decryption_key();
236		self.by_dec_key.entry(key).or_default().insert(hash);
237		if nt > 0 || key.is_some() {
238			self.topics_and_keys.insert(hash, (all_topics, key));
239		}
240		let priority = Priority(statement.priority().unwrap_or(0));
241		self.entries.insert(hash, (account, priority, statement.data_len()));
242		self.total_size += statement.data_len();
243		let account_info = self.accounts.entry(account).or_default();
244		account_info.data_size += statement.data_len();
245		if let Some(channel) = statement.channel() {
246			account_info.channels.insert(channel, ChannelEntry { hash, priority });
247		}
248		account_info
249			.by_priority
250			.insert(PriorityKey { hash, priority }, (statement.channel(), statement.data_len()));
251	}
252
253	fn query(&self, hash: &Hash) -> IndexQuery {
254		if self.entries.contains_key(hash) {
255			return IndexQuery::Exists
256		}
257		if self.expired.contains_key(hash) {
258			return IndexQuery::Expired
259		}
260		IndexQuery::Unknown
261	}
262
263	fn insert_expired(&mut self, hash: Hash, timestamp: u64) {
264		self.expired.insert(hash, timestamp);
265	}
266
267	fn iterate_with(
268		&self,
269		key: Option<DecryptionKey>,
270		match_all_topics: &[Topic],
271		mut f: impl FnMut(&Hash) -> Result<()>,
272	) -> Result<()> {
273		let empty = HashSet::new();
274		let mut sets: [&HashSet<Hash>; MAX_TOPICS + 1] = [&empty; MAX_TOPICS + 1];
275		if match_all_topics.len() > MAX_TOPICS {
276			return Ok(())
277		}
278		let key_set = self.by_dec_key.get(&key);
279		if key_set.map_or(0, |s| s.len()) == 0 {
280			// Key does not exist in the index.
281			return Ok(())
282		}
283		sets[0] = key_set.expect("Function returns if key_set is None");
284		for (i, t) in match_all_topics.iter().enumerate() {
285			let set = self.by_topic.get(t);
286			if set.map_or(0, |s| s.len()) == 0 {
287				// At least one of the match_all_topics does not exist in the index.
288				return Ok(())
289			}
290			sets[i + 1] = set.expect("Function returns if set is None");
291		}
292		let sets = &mut sets[0..match_all_topics.len() + 1];
293		// Start with the smallest topic set or the key set.
294		sets.sort_by_key(|s| s.len());
295		for item in sets[0] {
296			if sets[1..].iter().all(|set| set.contains(item)) {
297				log::trace!(
298					target: LOG_TARGET,
299					"Iterating by topic/key: statement {:?}",
300					HexDisplay::from(item)
301				);
302				f(item)?
303			}
304		}
305		Ok(())
306	}
307
308	fn maintain(&mut self, current_time: u64) -> Vec<Hash> {
309		// Purge previously expired messages.
310		let mut purged = Vec::new();
311		self.expired.retain(|hash, timestamp| {
312			if *timestamp + self.options.purge_after_sec <= current_time {
313				purged.push(*hash);
314				log::trace!(target: LOG_TARGET, "Purged statement {:?}", HexDisplay::from(hash));
315				false
316			} else {
317				true
318			}
319		});
320		purged
321	}
322
323	fn make_expired(&mut self, hash: &Hash, current_time: u64) -> bool {
324		if let Some((account, priority, len)) = self.entries.remove(hash) {
325			self.total_size -= len;
326			if let Some((topics, key)) = self.topics_and_keys.remove(hash) {
327				for t in topics.into_iter().flatten() {
328					if let std::collections::hash_map::Entry::Occupied(mut set) =
329						self.by_topic.entry(t)
330					{
331						set.get_mut().remove(hash);
332						if set.get().is_empty() {
333							set.remove_entry();
334						}
335					}
336				}
337				if let std::collections::hash_map::Entry::Occupied(mut set) =
338					self.by_dec_key.entry(key)
339				{
340					set.get_mut().remove(hash);
341					if set.get().is_empty() {
342						set.remove_entry();
343					}
344				}
345			}
346			self.expired.insert(*hash, current_time);
347			if let std::collections::hash_map::Entry::Occupied(mut account_rec) =
348				self.accounts.entry(account)
349			{
350				let key = PriorityKey { hash: *hash, priority };
351				if let Some((channel, len)) = account_rec.get_mut().by_priority.remove(&key) {
352					account_rec.get_mut().data_size -= len;
353					if let Some(channel) = channel {
354						account_rec.get_mut().channels.remove(&channel);
355					}
356				}
357				if account_rec.get().by_priority.is_empty() {
358					account_rec.remove_entry();
359				}
360			}
361			log::trace!(target: LOG_TARGET, "Expired statement {:?}", HexDisplay::from(hash));
362			true
363		} else {
364			false
365		}
366	}
367
368	fn insert(
369		&mut self,
370		hash: Hash,
371		statement: &Statement,
372		account: &AccountId,
373		validation: &ValidStatement,
374		current_time: u64,
375	) -> MaybeInserted {
376		let statement_len = statement.data_len();
377		if statement_len > validation.max_size as usize {
378			log::debug!(
379				target: LOG_TARGET,
380				"Ignored oversize message: {:?} ({} bytes)",
381				HexDisplay::from(&hash),
382				statement_len,
383			);
384			return MaybeInserted::Ignored
385		}
386
387		let mut evicted = HashSet::new();
388		let mut would_free_size = 0;
389		let priority = Priority(statement.priority().unwrap_or(0));
390		let (max_size, max_count) = (validation.max_size as usize, validation.max_count as usize);
391		// It may happen that we can't delete enough lower priority messages
392		// to satisfy size constraints. We check for that before deleting anything,
393		// taking into account channel message replacement.
394		if let Some(account_rec) = self.accounts.get(account) {
395			if let Some(channel) = statement.channel() {
396				if let Some(channel_record) = account_rec.channels.get(&channel) {
397					if priority <= channel_record.priority {
398						// Trying to replace channel message with lower priority
399						log::debug!(
400							target: LOG_TARGET,
401							"Ignored lower priority channel message: {:?} {:?} <= {:?}",
402							HexDisplay::from(&hash),
403							priority,
404							channel_record.priority,
405						);
406						return MaybeInserted::Ignored
407					} else {
408						// Would replace channel message. Still need to check for size constraints
409						// below.
410						log::debug!(
411							target: LOG_TARGET,
412							"Replacing higher priority channel message: {:?} ({:?}) > {:?} ({:?})",
413							HexDisplay::from(&hash),
414							priority,
415							HexDisplay::from(&channel_record.hash),
416							channel_record.priority,
417						);
418						let key = PriorityKey {
419							hash: channel_record.hash,
420							priority: channel_record.priority,
421						};
422						if let Some((_channel, len)) = account_rec.by_priority.get(&key) {
423							would_free_size += *len;
424							evicted.insert(channel_record.hash);
425						}
426					}
427				}
428			}
429			// Check if we can evict enough lower priority statements to satisfy constraints
430			for (entry, (_, len)) in account_rec.by_priority.iter() {
431				if (account_rec.data_size - would_free_size + statement_len <= max_size) &&
432					account_rec.by_priority.len() + 1 - evicted.len() <= max_count
433				{
434					// Satisfied
435					break
436				}
437				if evicted.contains(&entry.hash) {
438					// Already accounted for above
439					continue
440				}
441				if entry.priority >= priority {
442					log::debug!(
443						target: LOG_TARGET,
444						"Ignored message due to constraints {:?} {:?} < {:?}",
445						HexDisplay::from(&hash),
446						priority,
447						entry.priority,
448					);
449					return MaybeInserted::Ignored
450				}
451				evicted.insert(entry.hash);
452				would_free_size += len;
453			}
454		}
455		// Now check global constraints as well.
456		if !((self.total_size - would_free_size + statement_len <= self.options.max_total_size) &&
457			self.entries.len() + 1 - evicted.len() <= self.options.max_total_statements)
458		{
459			log::debug!(
460				target: LOG_TARGET,
461				"Ignored statement {} because the store is full (size={}, count={})",
462				HexDisplay::from(&hash),
463				self.total_size,
464				self.entries.len(),
465			);
466			return MaybeInserted::Ignored
467		}
468
469		for h in &evicted {
470			self.make_expired(h, current_time);
471		}
472		self.insert_new(hash, *account, statement);
473		MaybeInserted::Inserted(evicted)
474	}
475}
476
477impl Store {
478	/// Create a new shared store instance. There should only be one per process.
479	/// `path` will be used to open a statement database or create a new one if it does not exist.
480	pub fn new_shared<Block, Client>(
481		path: &std::path::Path,
482		options: Options,
483		client: Arc<Client>,
484		keystore: Arc<LocalKeystore>,
485		prometheus: Option<&PrometheusRegistry>,
486		task_spawner: &dyn SpawnNamed,
487	) -> Result<Arc<Store>>
488	where
489		Block: BlockT,
490		Block::Hash: From<BlockHash>,
491		Client: ProvideRuntimeApi<Block> + HeaderBackend<Block> + Send + Sync + 'static,
492		Client::Api: ValidateStatement<Block>,
493	{
494		let store = Arc::new(Self::new(path, options, client, keystore, prometheus)?);
495
496		// Perform periodic statement store maintenance
497		let worker_store = store.clone();
498		task_spawner.spawn(
499			"statement-store-maintenance",
500			Some("statement-store"),
501			Box::pin(async move {
502				let mut interval = tokio::time::interval(MAINTENANCE_PERIOD);
503				loop {
504					interval.tick().await;
505					worker_store.maintain();
506				}
507			}),
508		);
509
510		Ok(store)
511	}
512
513	/// Create a new instance.
514	/// `path` will be used to open a statement database or create a new one if it does not exist.
515	fn new<Block, Client>(
516		path: &std::path::Path,
517		options: Options,
518		client: Arc<Client>,
519		keystore: Arc<LocalKeystore>,
520		prometheus: Option<&PrometheusRegistry>,
521	) -> Result<Store>
522	where
523		Block: BlockT,
524		Block::Hash: From<BlockHash>,
525		Client: ProvideRuntimeApi<Block> + HeaderBackend<Block> + Send + Sync + 'static,
526		Client::Api: ValidateStatement<Block>,
527	{
528		let mut path: std::path::PathBuf = path.into();
529		path.push("statements");
530
531		let mut config = parity_db::Options::with_columns(&path, col::COUNT);
532
533		let statement_col = &mut config.columns[col::STATEMENTS as usize];
534		statement_col.ref_counted = false;
535		statement_col.preimage = true;
536		statement_col.uniform = true;
537		let db = parity_db::Db::open_or_create(&config).map_err(|e| Error::Db(e.to_string()))?;
538		match db.get(col::META, &KEY_VERSION).map_err(|e| Error::Db(e.to_string()))? {
539			Some(version) => {
540				let version = u32::from_le_bytes(
541					version
542						.try_into()
543						.map_err(|_| Error::Db("Error reading database version".into()))?,
544				);
545				if version != CURRENT_VERSION {
546					return Err(Error::Db(format!("Unsupported database version: {version}")))
547				}
548			},
549			None => {
550				db.commit([(
551					col::META,
552					KEY_VERSION.to_vec(),
553					Some(CURRENT_VERSION.to_le_bytes().to_vec()),
554				)])
555				.map_err(|e| Error::Db(e.to_string()))?;
556			},
557		}
558
559		let validator = ClientWrapper { client, _block: Default::default() };
560		let validate_fn = Box::new(move |block, source, statement| {
561			validator.validate_statement(block, source, statement)
562		});
563
564		let store = Store {
565			db,
566			index: RwLock::new(Index::new(options)),
567			validate_fn,
568			keystore,
569			time_override: None,
570			metrics: PrometheusMetrics::new(prometheus),
571		};
572		store.populate()?;
573		Ok(store)
574	}
575
576	/// Create memory index from the data.
577	// This may be moved to a background thread if it slows startup too much.
578	// This function should only be used on startup. There should be no other DB operations when
579	// iterating the index.
580	fn populate(&self) -> Result<()> {
581		{
582			let mut index = self.index.write();
583			self.db
584				.iter_column_while(col::STATEMENTS, |item| {
585					let statement = item.value;
586					if let Ok(statement) = Statement::decode(&mut statement.as_slice()) {
587						let hash = statement.hash();
588						log::trace!(
589							target: LOG_TARGET,
590							"Statement loaded {:?}",
591							HexDisplay::from(&hash)
592						);
593						if let Some(account_id) = statement.account_id() {
594							index.insert_new(hash, account_id, &statement);
595						} else {
596							log::debug!(
597								target: LOG_TARGET,
598								"Error decoding statement loaded from the DB: {:?}",
599								HexDisplay::from(&hash)
600							);
601						}
602					}
603					true
604				})
605				.map_err(|e| Error::Db(e.to_string()))?;
606			self.db
607				.iter_column_while(col::EXPIRED, |item| {
608					let expired_info = item.value;
609					if let Ok((hash, timestamp)) =
610						<(Hash, u64)>::decode(&mut expired_info.as_slice())
611					{
612						log::trace!(
613							target: LOG_TARGET,
614							"Statement loaded (expired): {:?}",
615							HexDisplay::from(&hash)
616						);
617						index.insert_expired(hash, timestamp);
618					}
619					true
620				})
621				.map_err(|e| Error::Db(e.to_string()))?;
622		}
623
624		self.maintain();
625		Ok(())
626	}
627
628	fn collect_statements<R>(
629		&self,
630		key: Option<DecryptionKey>,
631		match_all_topics: &[Topic],
632		mut f: impl FnMut(Statement) -> Option<R>,
633	) -> Result<Vec<R>> {
634		let mut result = Vec::new();
635		let index = self.index.read();
636		index.iterate_with(key, match_all_topics, |hash| {
637			match self.db.get(col::STATEMENTS, hash).map_err(|e| Error::Db(e.to_string()))? {
638				Some(entry) => {
639					if let Ok(statement) = Statement::decode(&mut entry.as_slice()) {
640						if let Some(data) = f(statement) {
641							result.push(data);
642						}
643					} else {
644						// DB inconsistency
645						log::warn!(
646							target: LOG_TARGET,
647							"Corrupt statement {:?}",
648							HexDisplay::from(hash)
649						);
650					}
651				},
652				None => {
653					// DB inconsistency
654					log::warn!(
655						target: LOG_TARGET,
656						"Missing statement {:?}",
657						HexDisplay::from(hash)
658					);
659				},
660			}
661			Ok(())
662		})?;
663		Ok(result)
664	}
665
666	/// Perform periodic store maintenance
667	pub fn maintain(&self) {
668		log::trace!(target: LOG_TARGET, "Started store maintenance");
669		let (deleted, active_count, expired_count): (Vec<_>, usize, usize) = {
670			let mut index = self.index.write();
671			let deleted = index.maintain(self.timestamp());
672			(deleted, index.entries.len(), index.expired.len())
673		};
674		let deleted: Vec<_> =
675			deleted.into_iter().map(|hash| (col::EXPIRED, hash.to_vec(), None)).collect();
676		let deleted_count = deleted.len() as u64;
677		if let Err(e) = self.db.commit(deleted) {
678			log::warn!(target: LOG_TARGET, "Error writing to the statement database: {:?}", e);
679		} else {
680			self.metrics.report(|metrics| metrics.statements_pruned.inc_by(deleted_count));
681		}
682		log::trace!(
683			target: LOG_TARGET,
684			"Completed store maintenance. Purged: {}, Active: {}, Expired: {}",
685			deleted_count,
686			active_count,
687			expired_count
688		);
689	}
690
691	fn timestamp(&self) -> u64 {
692		self.time_override.unwrap_or_else(|| {
693			std::time::SystemTime::now()
694				.duration_since(std::time::UNIX_EPOCH)
695				.unwrap_or_default()
696				.as_secs()
697		})
698	}
699
700	#[cfg(test)]
701	fn set_time(&mut self, time: u64) {
702		self.time_override = Some(time);
703	}
704
705	/// Returns `self` as [`StatementStoreExt`].
706	pub fn as_statement_store_ext(self: Arc<Self>) -> StatementStoreExt {
707		StatementStoreExt::new(self)
708	}
709
710	/// Return information of all known statements whose decryption key is identified as
711	/// `dest`. The key must be available to the client.
712	fn posted_clear_inner<R>(
713		&self,
714		match_all_topics: &[Topic],
715		dest: [u8; 32],
716		// Map the statement and the decrypted data to the desired result.
717		mut map_f: impl FnMut(Statement, Vec<u8>) -> R,
718	) -> Result<Vec<R>> {
719		self.collect_statements(Some(dest), match_all_topics, |statement| {
720			if let (Some(key), Some(_)) = (statement.decryption_key(), statement.data()) {
721				let public: sp_core::ed25519::Public = UncheckedFrom::unchecked_from(key);
722				let public: sp_statement_store::ed25519::Public = public.into();
723				match self.keystore.key_pair::<sp_statement_store::ed25519::Pair>(&public) {
724					Err(e) => {
725						log::debug!(
726							target: LOG_TARGET,
727							"Keystore error: {:?}, for statement {:?}",
728							e,
729							HexDisplay::from(&statement.hash())
730						);
731						None
732					},
733					Ok(None) => {
734						log::debug!(
735							target: LOG_TARGET,
736							"Keystore is missing key for statement {:?}",
737							HexDisplay::from(&statement.hash())
738						);
739						None
740					},
741					Ok(Some(pair)) => match statement.decrypt_private(&pair.into_inner()) {
742						Ok(r) => r.map(|data| map_f(statement, data)),
743						Err(e) => {
744							log::debug!(
745								target: LOG_TARGET,
746								"Decryption error: {:?}, for statement {:?}",
747								e,
748								HexDisplay::from(&statement.hash())
749							);
750							None
751						},
752					},
753				}
754			} else {
755				None
756			}
757		})
758	}
759}
760
761impl StatementStore for Store {
762	/// Return all statements.
763	fn statements(&self) -> Result<Vec<(Hash, Statement)>> {
764		let index = self.index.read();
765		let mut result = Vec::with_capacity(index.entries.len());
766		for h in index.entries.keys() {
767			let encoded = self.db.get(col::STATEMENTS, h).map_err(|e| Error::Db(e.to_string()))?;
768			if let Some(encoded) = encoded {
769				if let Ok(statement) = Statement::decode(&mut encoded.as_slice()) {
770					let hash = statement.hash();
771					result.push((hash, statement));
772				}
773			}
774		}
775		Ok(result)
776	}
777
778	/// Returns a statement by hash.
779	fn statement(&self, hash: &Hash) -> Result<Option<Statement>> {
780		Ok(
781			match self
782				.db
783				.get(col::STATEMENTS, hash.as_slice())
784				.map_err(|e| Error::Db(e.to_string()))?
785			{
786				Some(entry) => {
787					log::trace!(
788						target: LOG_TARGET,
789						"Queried statement {:?}",
790						HexDisplay::from(hash)
791					);
792					Some(
793						Statement::decode(&mut entry.as_slice())
794							.map_err(|e| Error::Decode(e.to_string()))?,
795					)
796				},
797				None => {
798					log::trace!(
799						target: LOG_TARGET,
800						"Queried missing statement {:?}",
801						HexDisplay::from(hash)
802					);
803					None
804				},
805			},
806		)
807	}
808
809	/// Return the data of all known statements which include all topics and have no `DecryptionKey`
810	/// field.
811	fn broadcasts(&self, match_all_topics: &[Topic]) -> Result<Vec<Vec<u8>>> {
812		self.collect_statements(None, match_all_topics, |statement| statement.into_data())
813	}
814
815	/// Return the data of all known statements whose decryption key is identified as `dest` (this
816	/// will generally be the public key or a hash thereof for symmetric ciphers, or a hash of the
817	/// private key for symmetric ciphers).
818	fn posted(&self, match_all_topics: &[Topic], dest: [u8; 32]) -> Result<Vec<Vec<u8>>> {
819		self.collect_statements(Some(dest), match_all_topics, |statement| statement.into_data())
820	}
821
822	/// Return the decrypted data of all known statements whose decryption key is identified as
823	/// `dest`. The key must be available to the client.
824	fn posted_clear(&self, match_all_topics: &[Topic], dest: [u8; 32]) -> Result<Vec<Vec<u8>>> {
825		self.posted_clear_inner(match_all_topics, dest, |_statement, data| data)
826	}
827
828	/// Return all known statements which include all topics and have no `DecryptionKey`
829	/// field.
830	fn broadcasts_stmt(&self, match_all_topics: &[Topic]) -> Result<Vec<Vec<u8>>> {
831		self.collect_statements(None, match_all_topics, |statement| Some(statement.encode()))
832	}
833
834	/// Return all known statements whose decryption key is identified as `dest` (this
835	/// will generally be the public key or a hash thereof for symmetric ciphers, or a hash of the
836	/// private key for symmetric ciphers).
837	fn posted_stmt(&self, match_all_topics: &[Topic], dest: [u8; 32]) -> Result<Vec<Vec<u8>>> {
838		self.collect_statements(Some(dest), match_all_topics, |statement| Some(statement.encode()))
839	}
840
841	/// Return the statement and the decrypted data of all known statements whose decryption key is
842	/// identified as `dest`. The key must be available to the client.
843	fn posted_clear_stmt(
844		&self,
845		match_all_topics: &[Topic],
846		dest: [u8; 32],
847	) -> Result<Vec<Vec<u8>>> {
848		self.posted_clear_inner(match_all_topics, dest, |statement, data| {
849			let mut res = Vec::with_capacity(statement.size_hint() + data.len());
850			statement.encode_to(&mut res);
851			res.extend_from_slice(&data);
852			res
853		})
854	}
855
856	/// Submit a statement to the store. Validates the statement and returns validation result.
857	fn submit(&self, statement: Statement, source: StatementSource) -> SubmitResult {
858		let hash = statement.hash();
859		match self.index.read().query(&hash) {
860			IndexQuery::Expired =>
861				if !source.can_be_resubmitted() {
862					return SubmitResult::KnownExpired
863				},
864			IndexQuery::Exists =>
865				if !source.can_be_resubmitted() {
866					return SubmitResult::Known
867				},
868			IndexQuery::Unknown => {},
869		}
870
871		let Some(account_id) = statement.account_id() else {
872			log::debug!(
873				target: LOG_TARGET,
874				"Statement validation failed: Missing proof ({:?})",
875				HexDisplay::from(&hash),
876			);
877			self.metrics.report(|metrics| metrics.validations_invalid.inc());
878			return SubmitResult::Bad("No statement proof")
879		};
880
881		// Validate.
882		let at_block = if let Some(Proof::OnChain { block_hash, .. }) = statement.proof() {
883			Some(*block_hash)
884		} else {
885			None
886		};
887		let validation_result = (self.validate_fn)(at_block, source, statement.clone());
888		let validation = match validation_result {
889			Ok(validation) => validation,
890			Err(InvalidStatement::BadProof) => {
891				log::debug!(
892					target: LOG_TARGET,
893					"Statement validation failed: BadProof, {:?}",
894					HexDisplay::from(&hash),
895				);
896				self.metrics.report(|metrics| metrics.validations_invalid.inc());
897				return SubmitResult::Bad("Bad statement proof")
898			},
899			Err(InvalidStatement::NoProof) => {
900				log::debug!(
901					target: LOG_TARGET,
902					"Statement validation failed: NoProof, {:?}",
903					HexDisplay::from(&hash),
904				);
905				self.metrics.report(|metrics| metrics.validations_invalid.inc());
906				return SubmitResult::Bad("Missing statement proof")
907			},
908			Err(InvalidStatement::InternalError) =>
909				return SubmitResult::InternalError(Error::Runtime),
910		};
911
912		let current_time = self.timestamp();
913		let mut commit = Vec::new();
914		{
915			let mut index = self.index.write();
916
917			let evicted =
918				match index.insert(hash, &statement, &account_id, &validation, current_time) {
919					MaybeInserted::Ignored => return SubmitResult::Ignored,
920					MaybeInserted::Inserted(evicted) => evicted,
921				};
922
923			commit.push((col::STATEMENTS, hash.to_vec(), Some(statement.encode())));
924			for hash in evicted {
925				commit.push((col::STATEMENTS, hash.to_vec(), None));
926				commit.push((col::EXPIRED, hash.to_vec(), Some((hash, current_time).encode())));
927			}
928			if let Err(e) = self.db.commit(commit) {
929				log::debug!(
930					target: LOG_TARGET,
931					"Statement validation failed: database error {}, {:?}",
932					e,
933					statement
934				);
935				return SubmitResult::InternalError(Error::Db(e.to_string()))
936			}
937		} // Release index lock
938		self.metrics.report(|metrics| metrics.submitted_statements.inc());
939		let network_priority = NetworkPriority::High;
940		log::trace!(target: LOG_TARGET, "Statement submitted: {:?}", HexDisplay::from(&hash));
941		SubmitResult::New(network_priority)
942	}
943
944	/// Remove a statement by hash.
945	fn remove(&self, hash: &Hash) -> Result<()> {
946		let current_time = self.timestamp();
947		{
948			let mut index = self.index.write();
949			if index.make_expired(hash, current_time) {
950				let commit = [
951					(col::STATEMENTS, hash.to_vec(), None),
952					(col::EXPIRED, hash.to_vec(), Some((hash, current_time).encode())),
953				];
954				if let Err(e) = self.db.commit(commit) {
955					log::debug!(
956						target: LOG_TARGET,
957						"Error removing statement: database error {}, {:?}",
958						e,
959						HexDisplay::from(hash),
960					);
961					return Err(Error::Db(e.to_string()))
962				}
963			}
964		}
965		Ok(())
966	}
967
968	/// Remove all statements by an account.
969	fn remove_by(&self, who: [u8; 32]) -> Result<()> {
970		let mut index = self.index.write();
971		let mut evicted = Vec::new();
972		if let Some(account_rec) = index.accounts.get(&who) {
973			evicted.extend(account_rec.by_priority.keys().map(|k| k.hash));
974		}
975
976		let current_time = self.timestamp();
977		let mut commit = Vec::new();
978		for hash in evicted {
979			index.make_expired(&hash, current_time);
980			commit.push((col::STATEMENTS, hash.to_vec(), None));
981			commit.push((col::EXPIRED, hash.to_vec(), Some((hash, current_time).encode())));
982		}
983		self.db.commit(commit).map_err(|e| {
984			log::debug!(
985				target: LOG_TARGET,
986				"Error removing statement: database error {}, remove by {:?}",
987				e,
988				HexDisplay::from(&who),
989			);
990
991			Error::Db(e.to_string())
992		})
993	}
994}
995
996#[cfg(test)]
997mod tests {
998	use crate::Store;
999	use sc_keystore::Keystore;
1000	use sp_core::{Decode, Encode, Pair};
1001	use sp_statement_store::{
1002		runtime_api::{InvalidStatement, ValidStatement, ValidateStatement},
1003		AccountId, Channel, DecryptionKey, NetworkPriority, Proof, SignatureVerificationResult,
1004		Statement, StatementSource, StatementStore, SubmitResult, Topic,
1005	};
1006
1007	type Extrinsic = sp_runtime::OpaqueExtrinsic;
1008	type Hash = sp_core::H256;
1009	type Hashing = sp_runtime::traits::BlakeTwo256;
1010	type BlockNumber = u64;
1011	type Header = sp_runtime::generic::Header<BlockNumber, Hashing>;
1012	type Block = sp_runtime::generic::Block<Header, Extrinsic>;
1013
1014	const CORRECT_BLOCK_HASH: [u8; 32] = [1u8; 32];
1015
1016	#[derive(Clone)]
1017	pub(crate) struct TestClient;
1018
1019	pub(crate) struct RuntimeApi {
1020		_inner: TestClient,
1021	}
1022
1023	impl sp_api::ProvideRuntimeApi<Block> for TestClient {
1024		type Api = RuntimeApi;
1025		fn runtime_api(&self) -> sp_api::ApiRef<Self::Api> {
1026			RuntimeApi { _inner: self.clone() }.into()
1027		}
1028	}
1029
1030	sp_api::mock_impl_runtime_apis! {
1031		impl ValidateStatement<Block> for RuntimeApi {
1032			fn validate_statement(
1033				_source: StatementSource,
1034				statement: Statement,
1035			) -> std::result::Result<ValidStatement, InvalidStatement> {
1036				use crate::tests::account;
1037				match statement.verify_signature() {
1038					SignatureVerificationResult::Valid(_) => Ok(ValidStatement{max_count: 100, max_size: 1000}),
1039					SignatureVerificationResult::Invalid => Err(InvalidStatement::BadProof),
1040					SignatureVerificationResult::NoSignature => {
1041						if let Some(Proof::OnChain { block_hash, .. }) = statement.proof() {
1042							if block_hash == &CORRECT_BLOCK_HASH {
1043								let (max_count, max_size) = match statement.account_id() {
1044									Some(a) if a == account(1) => (1, 1000),
1045									Some(a) if a == account(2) => (2, 1000),
1046									Some(a) if a == account(3) => (3, 1000),
1047									Some(a) if a == account(4) => (4, 1000),
1048									_ => (2, 2000),
1049								};
1050								Ok(ValidStatement{ max_count, max_size })
1051							} else {
1052								Err(InvalidStatement::BadProof)
1053							}
1054						} else {
1055							Err(InvalidStatement::BadProof)
1056						}
1057					}
1058				}
1059			}
1060		}
1061	}
1062
1063	impl sp_blockchain::HeaderBackend<Block> for TestClient {
1064		fn header(&self, _hash: Hash) -> sp_blockchain::Result<Option<Header>> {
1065			unimplemented!()
1066		}
1067		fn info(&self) -> sp_blockchain::Info<Block> {
1068			sp_blockchain::Info {
1069				best_hash: CORRECT_BLOCK_HASH.into(),
1070				best_number: 0,
1071				genesis_hash: Default::default(),
1072				finalized_hash: CORRECT_BLOCK_HASH.into(),
1073				finalized_number: 1,
1074				finalized_state: None,
1075				number_leaves: 0,
1076				block_gap: None,
1077			}
1078		}
1079		fn status(&self, _hash: Hash) -> sp_blockchain::Result<sp_blockchain::BlockStatus> {
1080			unimplemented!()
1081		}
1082		fn number(&self, _hash: Hash) -> sp_blockchain::Result<Option<BlockNumber>> {
1083			unimplemented!()
1084		}
1085		fn hash(&self, _number: BlockNumber) -> sp_blockchain::Result<Option<Hash>> {
1086			unimplemented!()
1087		}
1088	}
1089
1090	fn test_store() -> (Store, tempfile::TempDir) {
1091		sp_tracing::init_for_tests();
1092		let temp_dir = tempfile::Builder::new().tempdir().expect("Error creating test dir");
1093
1094		let client = std::sync::Arc::new(TestClient);
1095		let mut path: std::path::PathBuf = temp_dir.path().into();
1096		path.push("db");
1097		let keystore = std::sync::Arc::new(sc_keystore::LocalKeystore::in_memory());
1098		let store = Store::new(&path, Default::default(), client, keystore, None).unwrap();
1099		(store, temp_dir) // return order is important. Store must be dropped before TempDir
1100	}
1101
1102	fn signed_statement(data: u8) -> Statement {
1103		signed_statement_with_topics(data, &[], None)
1104	}
1105
1106	fn signed_statement_with_topics(
1107		data: u8,
1108		topics: &[Topic],
1109		dec_key: Option<DecryptionKey>,
1110	) -> Statement {
1111		let mut statement = Statement::new();
1112		statement.set_plain_data(vec![data]);
1113		for i in 0..topics.len() {
1114			statement.set_topic(i, topics[i]);
1115		}
1116		if let Some(key) = dec_key {
1117			statement.set_decryption_key(key);
1118		}
1119		let kp = sp_core::ed25519::Pair::from_string("//Alice", None).unwrap();
1120		statement.sign_ed25519_private(&kp);
1121		statement
1122	}
1123
1124	fn topic(data: u64) -> Topic {
1125		let mut topic: Topic = Default::default();
1126		topic[0..8].copy_from_slice(&data.to_le_bytes());
1127		topic
1128	}
1129
1130	fn dec_key(data: u64) -> DecryptionKey {
1131		let mut dec_key: DecryptionKey = Default::default();
1132		dec_key[0..8].copy_from_slice(&data.to_le_bytes());
1133		dec_key
1134	}
1135
1136	fn account(id: u64) -> AccountId {
1137		let mut account: AccountId = Default::default();
1138		account[0..8].copy_from_slice(&id.to_le_bytes());
1139		account
1140	}
1141
1142	fn channel(id: u64) -> Channel {
1143		let mut channel: Channel = Default::default();
1144		channel[0..8].copy_from_slice(&id.to_le_bytes());
1145		channel
1146	}
1147
1148	fn statement(account_id: u64, priority: u32, c: Option<u64>, data_len: usize) -> Statement {
1149		let mut statement = Statement::new();
1150		let mut data = Vec::new();
1151		data.resize(data_len, 0);
1152		statement.set_plain_data(data);
1153		statement.set_priority(priority);
1154		if let Some(c) = c {
1155			statement.set_channel(channel(c));
1156		}
1157		statement.set_proof(Proof::OnChain {
1158			block_hash: CORRECT_BLOCK_HASH,
1159			who: account(account_id),
1160			event_index: 0,
1161		});
1162		statement
1163	}
1164
1165	#[test]
1166	fn submit_one() {
1167		let (store, _temp) = test_store();
1168		let statement0 = signed_statement(0);
1169		assert_eq!(
1170			store.submit(statement0, StatementSource::Network),
1171			SubmitResult::New(NetworkPriority::High)
1172		);
1173		let unsigned = statement(0, 1, None, 0);
1174		assert_eq!(
1175			store.submit(unsigned, StatementSource::Network),
1176			SubmitResult::New(NetworkPriority::High)
1177		);
1178	}
1179
1180	#[test]
1181	fn save_and_load_statements() {
1182		let (store, temp) = test_store();
1183		let statement0 = signed_statement(0);
1184		let statement1 = signed_statement(1);
1185		let statement2 = signed_statement(2);
1186		assert_eq!(
1187			store.submit(statement0.clone(), StatementSource::Network),
1188			SubmitResult::New(NetworkPriority::High)
1189		);
1190		assert_eq!(
1191			store.submit(statement1.clone(), StatementSource::Network),
1192			SubmitResult::New(NetworkPriority::High)
1193		);
1194		assert_eq!(
1195			store.submit(statement2.clone(), StatementSource::Network),
1196			SubmitResult::New(NetworkPriority::High)
1197		);
1198		assert_eq!(store.statements().unwrap().len(), 3);
1199		assert_eq!(store.broadcasts(&[]).unwrap().len(), 3);
1200		assert_eq!(store.statement(&statement1.hash()).unwrap(), Some(statement1.clone()));
1201		let keystore = store.keystore.clone();
1202		drop(store);
1203
1204		let client = std::sync::Arc::new(TestClient);
1205		let mut path: std::path::PathBuf = temp.path().into();
1206		path.push("db");
1207		let store = Store::new(&path, Default::default(), client, keystore, None).unwrap();
1208		assert_eq!(store.statements().unwrap().len(), 3);
1209		assert_eq!(store.broadcasts(&[]).unwrap().len(), 3);
1210		assert_eq!(store.statement(&statement1.hash()).unwrap(), Some(statement1));
1211	}
1212
1213	#[test]
1214	fn search_by_topic_and_key() {
1215		let (store, _temp) = test_store();
1216		let statement0 = signed_statement(0);
1217		let statement1 = signed_statement_with_topics(1, &[topic(0)], None);
1218		let statement2 = signed_statement_with_topics(2, &[topic(0), topic(1)], Some(dec_key(2)));
1219		let statement3 = signed_statement_with_topics(3, &[topic(0), topic(1), topic(2)], None);
1220		let statement4 =
1221			signed_statement_with_topics(4, &[topic(0), topic(42), topic(2), topic(3)], None);
1222		let statements = vec![statement0, statement1, statement2, statement3, statement4];
1223		for s in &statements {
1224			store.submit(s.clone(), StatementSource::Network);
1225		}
1226
1227		let assert_topics = |topics: &[u64], key: Option<u64>, expected: &[u8]| {
1228			let key = key.map(dec_key);
1229			let topics: Vec<_> = topics.iter().map(|t| topic(*t)).collect();
1230			let mut got_vals: Vec<_> = if let Some(key) = key {
1231				store.posted(&topics, key).unwrap().into_iter().map(|d| d[0]).collect()
1232			} else {
1233				store.broadcasts(&topics).unwrap().into_iter().map(|d| d[0]).collect()
1234			};
1235			got_vals.sort();
1236			assert_eq!(expected.to_vec(), got_vals);
1237		};
1238
1239		assert_topics(&[], None, &[0, 1, 3, 4]);
1240		assert_topics(&[], Some(2), &[2]);
1241		assert_topics(&[0], None, &[1, 3, 4]);
1242		assert_topics(&[1], None, &[3]);
1243		assert_topics(&[2], None, &[3, 4]);
1244		assert_topics(&[3], None, &[4]);
1245		assert_topics(&[42], None, &[4]);
1246
1247		assert_topics(&[0, 1], None, &[3]);
1248		assert_topics(&[0, 1], Some(2), &[2]);
1249		assert_topics(&[0, 1, 99], Some(2), &[]);
1250		assert_topics(&[1, 2], None, &[3]);
1251		assert_topics(&[99], None, &[]);
1252		assert_topics(&[0, 99], None, &[]);
1253		assert_topics(&[0, 1, 2, 3, 42], None, &[]);
1254	}
1255
1256	#[test]
1257	fn constraints() {
1258		let (store, _temp) = test_store();
1259
1260		store.index.write().options.max_total_size = 3000;
1261		let source = StatementSource::Network;
1262		let ok = SubmitResult::New(NetworkPriority::High);
1263		let ignored = SubmitResult::Ignored;
1264
1265		// Account 1 (limit = 1 msg, 1000 bytes)
1266
1267		// Oversized statement is not allowed. Limit for account 1 is 1 msg, 1000 bytes
1268		assert_eq!(store.submit(statement(1, 1, Some(1), 2000), source), ignored);
1269		assert_eq!(store.submit(statement(1, 1, Some(1), 500), source), ok);
1270		// Would not replace channel message with same priority
1271		assert_eq!(store.submit(statement(1, 1, Some(1), 200), source), ignored);
1272		assert_eq!(store.submit(statement(1, 2, Some(1), 600), source), ok);
1273		// Submit another message to another channel with lower priority. Should not be allowed
1274		// because msg count limit is 1
1275		assert_eq!(store.submit(statement(1, 1, Some(2), 100), source), ignored);
1276		assert_eq!(store.index.read().expired.len(), 1);
1277
1278		// Account 2 (limit = 2 msg, 1000 bytes)
1279
1280		assert_eq!(store.submit(statement(2, 1, None, 500), source), ok);
1281		assert_eq!(store.submit(statement(2, 2, None, 100), source), ok);
1282		// Should evict priority 1
1283		assert_eq!(store.submit(statement(2, 3, None, 500), source), ok);
1284		assert_eq!(store.index.read().expired.len(), 2);
1285		// Should evict all
1286		assert_eq!(store.submit(statement(2, 4, None, 1000), source), ok);
1287		assert_eq!(store.index.read().expired.len(), 4);
1288
1289		// Account 3 (limit = 3 msg, 1000 bytes)
1290
1291		assert_eq!(store.submit(statement(3, 2, Some(1), 300), source), ok);
1292		assert_eq!(store.submit(statement(3, 3, Some(2), 300), source), ok);
1293		assert_eq!(store.submit(statement(3, 4, Some(3), 300), source), ok);
1294		// Should evict 2 and 3
1295		assert_eq!(store.submit(statement(3, 5, None, 500), source), ok);
1296		assert_eq!(store.index.read().expired.len(), 6);
1297
1298		assert_eq!(store.index.read().total_size, 2400);
1299		assert_eq!(store.index.read().entries.len(), 4);
1300
1301		// Should be over the global size limit
1302		assert_eq!(store.submit(statement(1, 1, None, 700), source), ignored);
1303		// Should be over the global count limit
1304		store.index.write().options.max_total_statements = 4;
1305		assert_eq!(store.submit(statement(1, 1, None, 100), source), ignored);
1306
1307		let mut expected_statements = vec![
1308			statement(1, 2, Some(1), 600).hash(),
1309			statement(2, 4, None, 1000).hash(),
1310			statement(3, 4, Some(3), 300).hash(),
1311			statement(3, 5, None, 500).hash(),
1312		];
1313		expected_statements.sort();
1314		let mut statements: Vec<_> =
1315			store.statements().unwrap().into_iter().map(|(hash, _)| hash).collect();
1316		statements.sort();
1317		assert_eq!(expected_statements, statements);
1318	}
1319
1320	#[test]
1321	fn expired_statements_are_purged() {
1322		use super::DEFAULT_PURGE_AFTER_SEC;
1323		let (mut store, temp) = test_store();
1324		let mut statement = statement(1, 1, Some(3), 100);
1325		store.set_time(0);
1326		statement.set_topic(0, topic(4));
1327		store.submit(statement.clone(), StatementSource::Network);
1328		assert_eq!(store.index.read().entries.len(), 1);
1329		store.remove(&statement.hash()).unwrap();
1330		assert_eq!(store.index.read().entries.len(), 0);
1331		assert_eq!(store.index.read().accounts.len(), 0);
1332		store.set_time(DEFAULT_PURGE_AFTER_SEC + 1);
1333		store.maintain();
1334		assert_eq!(store.index.read().expired.len(), 0);
1335		let keystore = store.keystore.clone();
1336		drop(store);
1337
1338		let client = std::sync::Arc::new(TestClient);
1339		let mut path: std::path::PathBuf = temp.path().into();
1340		path.push("db");
1341		let store = Store::new(&path, Default::default(), client, keystore, None).unwrap();
1342		assert_eq!(store.statements().unwrap().len(), 0);
1343		assert_eq!(store.index.read().expired.len(), 0);
1344	}
1345
1346	#[test]
1347	fn posted_clear_decrypts() {
1348		let (store, _temp) = test_store();
1349		let public = store
1350			.keystore
1351			.ed25519_generate_new(sp_core::crypto::key_types::STATEMENT, None)
1352			.unwrap();
1353		let statement1 = statement(1, 1, None, 100);
1354		let mut statement2 = statement(1, 2, None, 0);
1355		let plain = b"The most valuable secret".to_vec();
1356		statement2.encrypt(&plain, &public).unwrap();
1357		store.submit(statement1, StatementSource::Network);
1358		store.submit(statement2, StatementSource::Network);
1359		let posted_clear = store.posted_clear(&[], public.into()).unwrap();
1360		assert_eq!(posted_clear, vec![plain]);
1361	}
1362
1363	#[test]
1364	fn broadcasts_stmt_returns_encoded_statements() {
1365		let (store, _tmp) = test_store();
1366
1367		// no key, no topic
1368		let s0 = signed_statement_with_topics(0, &[], None);
1369		// same, but with a topic = 42
1370		let s1 = signed_statement_with_topics(1, &[topic(42)], None);
1371		// has a decryption key -> must NOT be returned by broadcasts_stmt
1372		let s2 = signed_statement_with_topics(2, &[topic(42)], Some(dec_key(99)));
1373
1374		for s in [&s0, &s1, &s2] {
1375			store.submit(s.clone(), StatementSource::Network);
1376		}
1377
1378		// no topic filter
1379		let mut hashes: Vec<_> = store
1380			.broadcasts_stmt(&[])
1381			.unwrap()
1382			.into_iter()
1383			.map(|bytes| Statement::decode(&mut &bytes[..]).unwrap().hash())
1384			.collect();
1385		hashes.sort();
1386		let expected_hashes = {
1387			let mut e = vec![s0.hash(), s1.hash()];
1388			e.sort();
1389			e
1390		};
1391		assert_eq!(hashes, expected_hashes);
1392
1393		// filter on topic 42
1394		let got = store.broadcasts_stmt(&[topic(42)]).unwrap();
1395		assert_eq!(got.len(), 1);
1396		let st = Statement::decode(&mut &got[0][..]).unwrap();
1397		assert_eq!(st.hash(), s1.hash());
1398	}
1399
1400	#[test]
1401	fn posted_stmt_returns_encoded_statements_for_dest() {
1402		let (store, _tmp) = test_store();
1403
1404		let public1 = store
1405			.keystore
1406			.ed25519_generate_new(sp_core::crypto::key_types::STATEMENT, None)
1407			.unwrap();
1408		let dest: [u8; 32] = public1.into();
1409
1410		let public2 = store
1411			.keystore
1412			.ed25519_generate_new(sp_core::crypto::key_types::STATEMENT, None)
1413			.unwrap();
1414
1415		// A statement that does have dec_key = dest
1416		let mut s_with_key = statement(1, 1, None, 0);
1417		let plain1 = b"The most valuable secret".to_vec();
1418		s_with_key.encrypt(&plain1, &public1).unwrap();
1419
1420		// A statement with a different dec_key
1421		let mut s_other_key = statement(2, 2, None, 0);
1422		let plain2 = b"The second most valuable secret".to_vec();
1423		s_other_key.encrypt(&plain2, &public2).unwrap();
1424
1425		// Submit them all
1426		for s in [&s_with_key, &s_other_key] {
1427			store.submit(s.clone(), StatementSource::Network);
1428		}
1429
1430		// posted_stmt should only return the one with dec_key = dest
1431		let retrieved = store.posted_stmt(&[], dest).unwrap();
1432		assert_eq!(retrieved.len(), 1, "Only one statement has dec_key=dest");
1433
1434		// Re-decode that returned statement to confirm it is correct
1435		let returned_stmt = Statement::decode(&mut &retrieved[0][..]).unwrap();
1436		assert_eq!(
1437			returned_stmt.hash(),
1438			s_with_key.hash(),
1439			"Returned statement must match s_with_key"
1440		);
1441	}
1442
1443	#[test]
1444	fn posted_clear_stmt_returns_statement_followed_by_plain_data() {
1445		let (store, _tmp) = test_store();
1446
1447		let public1 = store
1448			.keystore
1449			.ed25519_generate_new(sp_core::crypto::key_types::STATEMENT, None)
1450			.unwrap();
1451		let dest: [u8; 32] = public1.into();
1452
1453		let public2 = store
1454			.keystore
1455			.ed25519_generate_new(sp_core::crypto::key_types::STATEMENT, None)
1456			.unwrap();
1457
1458		// A statement that does have dec_key = dest
1459		let mut s_with_key = statement(1, 1, None, 0);
1460		let plain1 = b"The most valuable secret".to_vec();
1461		s_with_key.encrypt(&plain1, &public1).unwrap();
1462
1463		// A statement with a different dec_key
1464		let mut s_other_key = statement(2, 2, None, 0);
1465		let plain2 = b"The second most valuable secret".to_vec();
1466		s_other_key.encrypt(&plain2, &public2).unwrap();
1467
1468		// Submit them all
1469		for s in [&s_with_key, &s_other_key] {
1470			store.submit(s.clone(), StatementSource::Network);
1471		}
1472
1473		// posted_stmt should only return the one with dec_key = dest
1474		let retrieved = store.posted_clear_stmt(&[], dest).unwrap();
1475		assert_eq!(retrieved.len(), 1, "Only one statement has dec_key=dest");
1476
1477		// We expect: [ encoded Statement ] + [ the decrypted bytes ]
1478		let encoded_stmt = s_with_key.encode();
1479		let stmt_len = encoded_stmt.len();
1480
1481		// 1) statement is first
1482		assert_eq!(&retrieved[0][..stmt_len], &encoded_stmt[..]);
1483
1484		// 2) followed by the decrypted payload
1485		let trailing = &retrieved[0][stmt_len..];
1486		assert_eq!(trailing, &plain1[..]);
1487	}
1488
1489	#[test]
1490	fn posted_clear_returns_plain_data_for_dest_and_topics() {
1491		let (store, _tmp) = test_store();
1492
1493		// prepare two key-pairs
1494		let public_dest = store
1495			.keystore
1496			.ed25519_generate_new(sp_core::crypto::key_types::STATEMENT, None)
1497			.unwrap();
1498		let dest: [u8; 32] = public_dest.into();
1499
1500		let public_other = store
1501			.keystore
1502			.ed25519_generate_new(sp_core::crypto::key_types::STATEMENT, None)
1503			.unwrap();
1504
1505		// statement that SHOULD be returned (matches dest & topic 42)
1506		let mut s_good = statement(1, 1, None, 0);
1507		let plaintext_good = b"The most valuable secret".to_vec();
1508		s_good.encrypt(&plaintext_good, &public_dest).unwrap();
1509		s_good.set_topic(0, topic(42));
1510
1511		// statement that should NOT be returned (same dest but different topic)
1512		let mut s_wrong_topic = statement(2, 2, None, 0);
1513		s_wrong_topic.encrypt(b"Wrong topic", &public_dest).unwrap();
1514		s_wrong_topic.set_topic(0, topic(99));
1515
1516		// statement that should NOT be returned (different dest)
1517		let mut s_other_dest = statement(3, 3, None, 0);
1518		s_other_dest.encrypt(b"Other dest", &public_other).unwrap();
1519		s_other_dest.set_topic(0, topic(42));
1520
1521		// submit all
1522		for s in [&s_good, &s_wrong_topic, &s_other_dest] {
1523			store.submit(s.clone(), StatementSource::Network);
1524		}
1525
1526		// call posted_clear with the topic filter and dest
1527		let retrieved = store.posted_clear(&[topic(42)], dest).unwrap();
1528
1529		// exactly one element, equal to the expected plaintext
1530		assert_eq!(retrieved, vec![plaintext_good]);
1531	}
1532
1533	#[test]
1534	fn remove_by_covers_various_situations() {
1535		use sp_statement_store::{StatementSource, StatementStore, SubmitResult};
1536
1537		// Use a fresh store and fixed time so we can control purging.
1538		let (mut store, _temp) = test_store();
1539		store.set_time(0);
1540
1541		// Reuse helpers from this module.
1542		let t42 = topic(42);
1543		let k7 = dec_key(7);
1544
1545		// Account A = 4 (has per-account limits (4, 1000) in the mock runtime)
1546		// - Mix of topic, decryption-key and channel to exercise every index.
1547		let mut s_a1 = statement(4, 10, Some(100), 100);
1548		s_a1.set_topic(0, t42);
1549		let h_a1 = s_a1.hash();
1550
1551		let mut s_a2 = statement(4, 20, Some(200), 150);
1552		s_a2.set_decryption_key(k7);
1553		let h_a2 = s_a2.hash();
1554
1555		let s_a3 = statement(4, 30, None, 50);
1556		let h_a3 = s_a3.hash();
1557
1558		// Account B = 3 (control group that must remain untouched).
1559		let s_b1 = statement(3, 10, None, 100);
1560		let h_b1 = s_b1.hash();
1561
1562		let mut s_b2 = statement(3, 15, Some(300), 100);
1563		s_b2.set_topic(0, t42);
1564		s_b2.set_decryption_key(k7);
1565		let h_b2 = s_b2.hash();
1566
1567		// Submit all statements.
1568		for s in [&s_a1, &s_a2, &s_a3, &s_b1, &s_b2] {
1569			assert!(matches!(
1570				store.submit(s.clone(), StatementSource::Network),
1571				SubmitResult::New(_)
1572			));
1573		}
1574
1575		// --- Pre-conditions: everything is indexed as expected.
1576		{
1577			let idx = store.index.read();
1578			assert_eq!(idx.entries.len(), 5, "all 5 should be present");
1579			assert!(idx.accounts.contains_key(&account(4)));
1580			assert!(idx.accounts.contains_key(&account(3)));
1581			assert_eq!(idx.total_size, 100 + 150 + 50 + 100 + 100);
1582
1583			// Topic and key sets contain both A & B entries.
1584			let set_t = idx.by_topic.get(&t42).expect("topic set exists");
1585			assert!(set_t.contains(&h_a1) && set_t.contains(&h_b2));
1586
1587			let set_k = idx.by_dec_key.get(&Some(k7)).expect("key set exists");
1588			assert!(set_k.contains(&h_a2) && set_k.contains(&h_b2));
1589		}
1590
1591		// --- Action: remove all statements by Account A.
1592		store.remove_by(account(4)).expect("remove_by should succeed");
1593
1594		// --- Post-conditions: A's statements are gone and marked expired; B's remain.
1595		{
1596			// A's statements removed from DB view.
1597			for h in [h_a1, h_a2, h_a3] {
1598				assert!(store.statement(&h).unwrap().is_none(), "A's statement should be removed");
1599			}
1600
1601			// B's statements still present.
1602			for h in [h_b1, h_b2] {
1603				assert!(store.statement(&h).unwrap().is_some(), "B's statement should remain");
1604			}
1605
1606			let idx = store.index.read();
1607
1608			// Account map updated.
1609			assert!(!idx.accounts.contains_key(&account(4)), "Account A must be gone");
1610			assert!(idx.accounts.contains_key(&account(3)), "Account B must remain");
1611
1612			// Removed statements are marked expired.
1613			assert!(idx.expired.contains_key(&h_a1));
1614			assert!(idx.expired.contains_key(&h_a2));
1615			assert!(idx.expired.contains_key(&h_a3));
1616			assert_eq!(idx.expired.len(), 3);
1617
1618			// Entry count & total_size reflect only B's data.
1619			assert_eq!(idx.entries.len(), 2);
1620			assert_eq!(idx.total_size, 100 + 100);
1621
1622			// Topic index: only B2 remains for topic 42.
1623			let set_t = idx.by_topic.get(&t42).expect("topic set exists");
1624			assert!(set_t.contains(&h_b2));
1625			assert!(!set_t.contains(&h_a1));
1626
1627			// Decryption-key index: only B2 remains for key 7.
1628			let set_k = idx.by_dec_key.get(&Some(k7)).expect("key set exists");
1629			assert!(set_k.contains(&h_b2));
1630			assert!(!set_k.contains(&h_a2));
1631		}
1632
1633		// --- Idempotency: removing again is a no-op and should not error.
1634		store.remove_by(account(4)).expect("second remove_by should be a no-op");
1635
1636		// --- Purge: advance time beyond TTL and run maintenance; expired entries disappear.
1637		let purge_after = store.index.read().options.purge_after_sec;
1638		store.set_time(purge_after + 1);
1639		store.maintain();
1640		assert_eq!(store.index.read().expired.len(), 0, "expired entries should be purged");
1641
1642		// --- Reuse: Account A can submit again after purge.
1643		let s_new = statement(4, 40, None, 10);
1644		assert!(matches!(store.submit(s_new, StatementSource::Network), SubmitResult::New(_)));
1645	}
1646}