Skip to main content

pezsc_statement_store/
lib.rs

1// This file is part of Bizinikiwi.
2
3// Copyright (C) Parity Technologies (UK) Ltd. and Dijital Kurdistan Tech Institute
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! Disk-backed statement store.
20//!
21//! This module contains an implementation of `pezsp_statement_store::StatementStore` which is
22//! backed by a database.
23//!
24//! Constraint management.
25//!
//! Each time a new statement is inserted into the store, it is first validated with the runtime.
//! The validation function computes `global_priority`, `max_count` and `max_size` for a statement.
28//! The following constraints are then checked:
29//! * For a given account id, there may be at most `max_count` statements with `max_size` total data
30//!   size. To satisfy this, statements for this account ID are removed from the store starting with
31//!   the lowest priority until a constraint is satisfied.
32//! * There may not be more than `MAX_TOTAL_STATEMENTS` total statements with `MAX_TOTAL_SIZE` size.
33//!   To satisfy this, statements are removed from the store starting with the lowest
34//!   `global_priority` until a constraint is satisfied.
35//!
36//! When a new statement is inserted that would not satisfy constraints in the first place, no
37//! statements are deleted and `Ignored` result is returned.
38//! The order in which statements with the same priority are deleted is unspecified.
39//!
40//! Statement expiration.
41//!
//! Each time a statement is removed from the store (either evicted by a higher-priority statement
//! or explicitly with the `remove` function), the statement is marked as expired. Expired
//! statements can't be added to the store for `Options::purge_after_sec` seconds. This is to
//! prevent old statements from being propagated on the network.
46
47#![warn(missing_docs)]
48#![warn(unused_extern_crates)]
49
50mod metrics;
51
52pub use pezsp_statement_store::{Error, StatementStore, MAX_TOPICS};
53
54use metrics::MetricsLink as PrometheusMetrics;
55use parking_lot::RwLock;
56use pezsc_keystore::LocalKeystore;
57use pezsp_api::ProvideRuntimeApi;
58use pezsp_blockchain::HeaderBackend;
59use pezsp_core::{
60	crypto::UncheckedFrom, hexdisplay::HexDisplay, traits::SpawnNamed, Decode, Encode,
61};
62use pezsp_runtime::traits::Block as BlockT;
63use pezsp_statement_store::{
64	runtime_api::{
65		InvalidStatement, StatementSource, StatementStoreExt, ValidStatement, ValidateStatement,
66	},
67	AccountId, BlockHash, Channel, DecryptionKey, Hash, NetworkPriority, Proof, Result, Statement,
68	SubmitResult, Topic,
69};
70use prometheus_endpoint::Registry as PrometheusRegistry;
71use std::{
72	collections::{BTreeMap, HashMap, HashSet},
73	sync::Arc,
74};
75
/// Key in the `col::META` column under which the database format version is stored.
const KEY_VERSION: &[u8] = b"version";
/// Database format version written to new databases and required of existing ones.
const CURRENT_VERSION: u32 = 1;

/// Log target used by every log statement in this file.
const LOG_TARGET: &str = "statement-store";

/// The amount of time an expired statement is kept before it is removed from the store entirely.
pub const DEFAULT_PURGE_AFTER_SEC: u64 = 48 * 60 * 60; // 48h
/// The maximum number of statements the statement store can hold.
pub const DEFAULT_MAX_TOTAL_STATEMENTS: usize = 4 << 20; // ~4 million
/// The maximum amount of data the statement store can hold, regardless of the number of
/// statements from which the data originates.
pub const DEFAULT_MAX_TOTAL_SIZE: usize = 2 << 30; // 2GiB
88/// The maximum size of a single statement in bytes.
89/// Accounts for the 1-byte vector length prefix when statements are gossiped as `Vec<Statement>`.
90pub const MAX_STATEMENT_SIZE: usize =
91	pezsc_network_statement::config::MAX_STATEMENT_NOTIFICATION_SIZE as usize - 1;
92
93const MAINTENANCE_PERIOD: std::time::Duration = std::time::Duration::from_secs(30);
94
/// Column indices of the statement database.
mod col {
	/// Store metadata; holds the database format version.
	pub const META: u8 = 0;
	/// Encoded statements, keyed by statement hash.
	pub const STATEMENTS: u8 = 1;
	/// Encoded `(hash, timestamp)` records of expired statements, keyed by statement hash.
	pub const EXPIRED: u8 = 2;

	/// Total number of columns in the database.
	pub const COUNT: u8 = EXPIRED + 1;
}
102
103#[derive(Eq, PartialEq, Debug, Ord, PartialOrd, Clone, Copy)]
104struct Priority(u32);
105
106#[derive(PartialEq, Eq)]
107struct PriorityKey {
108	hash: Hash,
109	priority: Priority,
110}
111
112impl PartialOrd for PriorityKey {
113	fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
114		Some(self.cmp(other))
115	}
116}
117
118impl Ord for PriorityKey {
119	fn cmp(&self, other: &Self) -> std::cmp::Ordering {
120		self.priority.cmp(&other.priority).then_with(|| self.hash.cmp(&other.hash))
121	}
122}
123
124#[derive(PartialEq, Eq)]
125struct ChannelEntry {
126	hash: Hash,
127	priority: Priority,
128}
129
130#[derive(Default)]
131struct StatementsForAccount {
132	// Statements ordered by priority.
133	by_priority: BTreeMap<PriorityKey, (Option<Channel>, usize)>,
134	// Channel to statement map. Only one statement per channel is allowed.
135	channels: HashMap<Channel, ChannelEntry>,
136	// Sum of all `Data` field sizes.
137	data_size: usize,
138}
139
140/// Store configuration
141pub struct Options {
142	/// Maximum statement allowed in the store. Once this limit is reached lower-priority
143	/// statements may be evicted.
144	max_total_statements: usize,
145	/// Maximum total data size allowed in the store. Once this limit is reached lower-priority
146	/// statements may be evicted.
147	max_total_size: usize,
148	/// Number of seconds for which removed statements won't be allowed to be added back in.
149	purge_after_sec: u64,
150}
151
152impl Default for Options {
153	fn default() -> Self {
154		Options {
155			max_total_statements: DEFAULT_MAX_TOTAL_STATEMENTS,
156			max_total_size: DEFAULT_MAX_TOTAL_SIZE,
157			purge_after_sec: DEFAULT_PURGE_AFTER_SEC,
158		}
159	}
160}
161
162#[derive(Default)]
163struct Index {
164	recent: HashSet<Hash>,
165	by_topic: HashMap<Topic, HashSet<Hash>>,
166	by_dec_key: HashMap<Option<DecryptionKey>, HashSet<Hash>>,
167	topics_and_keys: HashMap<Hash, ([Option<Topic>; MAX_TOPICS], Option<DecryptionKey>)>,
168	entries: HashMap<Hash, (AccountId, Priority, usize)>,
169	expired: HashMap<Hash, u64>, // Value is expiration timestamp.
170	accounts: HashMap<AccountId, StatementsForAccount>,
171	options: Options,
172	total_size: usize,
173}
174
175struct ClientWrapper<Block, Client> {
176	client: Arc<Client>,
177	_block: std::marker::PhantomData<Block>,
178}
179
180impl<Block, Client> ClientWrapper<Block, Client>
181where
182	Block: BlockT,
183	Block::Hash: From<BlockHash>,
184	Client: ProvideRuntimeApi<Block> + HeaderBackend<Block> + Send + Sync + 'static,
185	Client::Api: ValidateStatement<Block>,
186{
187	fn validate_statement(
188		&self,
189		block: Option<BlockHash>,
190		source: StatementSource,
191		statement: Statement,
192	) -> std::result::Result<ValidStatement, InvalidStatement> {
193		let api = self.client.runtime_api();
194		let block = block.map(Into::into).unwrap_or_else(|| {
195			// Validate against the finalized state.
196			self.client.info().finalized_hash
197		});
198		api.validate_statement(block, source, statement)
199			.map_err(|_| InvalidStatement::InternalError)?
200	}
201}
202
203/// Statement store.
204pub struct Store {
205	db: parity_db::Db,
206	index: RwLock<Index>,
207	validate_fn: Box<
208		dyn Fn(
209				Option<BlockHash>,
210				StatementSource,
211				Statement,
212			) -> std::result::Result<ValidStatement, InvalidStatement>
213			+ Send
214			+ Sync,
215	>,
216	keystore: Arc<LocalKeystore>,
217	// Used for testing
218	time_override: Option<u64>,
219	metrics: PrometheusMetrics,
220}
221
222enum IndexQuery {
223	Unknown,
224	Exists,
225	Expired,
226}
227
228enum MaybeInserted {
229	Inserted(HashSet<Hash>),
230	Ignored,
231}
232
233impl Index {
234	fn new(options: Options) -> Index {
235		Index { options, ..Default::default() }
236	}
237
238	fn insert_new(&mut self, hash: Hash, account: AccountId, statement: &Statement) {
239		let mut all_topics = [None; MAX_TOPICS];
240		let mut nt = 0;
241		while let Some(t) = statement.topic(nt) {
242			self.by_topic.entry(t).or_default().insert(hash);
243			all_topics[nt] = Some(t);
244			nt += 1;
245		}
246		let key = statement.decryption_key();
247		self.by_dec_key.entry(key).or_default().insert(hash);
248		if nt > 0 || key.is_some() {
249			self.topics_and_keys.insert(hash, (all_topics, key));
250		}
251		let priority = Priority(statement.priority().unwrap_or(0));
252		self.entries.insert(hash, (account, priority, statement.data_len()));
253		self.recent.insert(hash);
254		self.total_size += statement.data_len();
255		let account_info = self.accounts.entry(account).or_default();
256		account_info.data_size += statement.data_len();
257		if let Some(channel) = statement.channel() {
258			account_info.channels.insert(channel, ChannelEntry { hash, priority });
259		}
260		account_info
261			.by_priority
262			.insert(PriorityKey { hash, priority }, (statement.channel(), statement.data_len()));
263	}
264
265	fn query(&self, hash: &Hash) -> IndexQuery {
266		if self.entries.contains_key(hash) {
267			return IndexQuery::Exists;
268		}
269		if self.expired.contains_key(hash) {
270			return IndexQuery::Expired;
271		}
272		IndexQuery::Unknown
273	}
274
275	fn insert_expired(&mut self, hash: Hash, timestamp: u64) {
276		self.expired.insert(hash, timestamp);
277	}
278
279	fn iterate_with(
280		&self,
281		key: Option<DecryptionKey>,
282		match_all_topics: &[Topic],
283		mut f: impl FnMut(&Hash) -> Result<()>,
284	) -> Result<()> {
285		let empty = HashSet::new();
286		let mut sets: [&HashSet<Hash>; MAX_TOPICS + 1] = [&empty; MAX_TOPICS + 1];
287		if match_all_topics.len() > MAX_TOPICS {
288			return Ok(());
289		}
290		let key_set = self.by_dec_key.get(&key);
291		if key_set.map_or(0, |s| s.len()) == 0 {
292			// Key does not exist in the index.
293			return Ok(());
294		}
295		sets[0] = key_set.expect("Function returns if key_set is None");
296		for (i, t) in match_all_topics.iter().enumerate() {
297			let set = self.by_topic.get(t);
298			if set.map_or(0, |s| s.len()) == 0 {
299				// At least one of the match_all_topics does not exist in the index.
300				return Ok(());
301			}
302			sets[i + 1] = set.expect("Function returns if set is None");
303		}
304		let sets = &mut sets[0..match_all_topics.len() + 1];
305		// Start with the smallest topic set or the key set.
306		sets.sort_by_key(|s| s.len());
307		for item in sets[0] {
308			if sets[1..].iter().all(|set| set.contains(item)) {
309				log::trace!(
310					target: LOG_TARGET,
311					"Iterating by topic/key: statement {:?}",
312					HexDisplay::from(item)
313				);
314				f(item)?
315			}
316		}
317		Ok(())
318	}
319
320	fn maintain(&mut self, current_time: u64) -> Vec<Hash> {
321		// Purge previously expired messages.
322		let mut purged = Vec::new();
323		self.expired.retain(|hash, timestamp| {
324			if *timestamp + self.options.purge_after_sec <= current_time {
325				purged.push(*hash);
326				log::trace!(target: LOG_TARGET, "Purged statement {:?}", HexDisplay::from(hash));
327				false
328			} else {
329				true
330			}
331		});
332		purged
333	}
334
335	fn take_recent(&mut self) -> HashSet<Hash> {
336		std::mem::take(&mut self.recent)
337	}
338
339	fn make_expired(&mut self, hash: &Hash, current_time: u64) -> bool {
340		if let Some((account, priority, len)) = self.entries.remove(hash) {
341			self.total_size -= len;
342			if let Some((topics, key)) = self.topics_and_keys.remove(hash) {
343				for t in topics.into_iter().flatten() {
344					if let std::collections::hash_map::Entry::Occupied(mut set) =
345						self.by_topic.entry(t)
346					{
347						set.get_mut().remove(hash);
348						if set.get().is_empty() {
349							set.remove_entry();
350						}
351					}
352				}
353				if let std::collections::hash_map::Entry::Occupied(mut set) =
354					self.by_dec_key.entry(key)
355				{
356					set.get_mut().remove(hash);
357					if set.get().is_empty() {
358						set.remove_entry();
359					}
360				}
361			}
362			let _ = self.recent.remove(hash);
363			self.expired.insert(*hash, current_time);
364			if let std::collections::hash_map::Entry::Occupied(mut account_rec) =
365				self.accounts.entry(account)
366			{
367				let key = PriorityKey { hash: *hash, priority };
368				if let Some((channel, len)) = account_rec.get_mut().by_priority.remove(&key) {
369					account_rec.get_mut().data_size -= len;
370					if let Some(channel) = channel {
371						account_rec.get_mut().channels.remove(&channel);
372					}
373				}
374				if account_rec.get().by_priority.is_empty() {
375					account_rec.remove_entry();
376				}
377			}
378			log::trace!(target: LOG_TARGET, "Expired statement {:?}", HexDisplay::from(hash));
379			true
380		} else {
381			false
382		}
383	}
384
385	fn insert(
386		&mut self,
387		hash: Hash,
388		statement: &Statement,
389		account: &AccountId,
390		validation: &ValidStatement,
391		current_time: u64,
392	) -> MaybeInserted {
393		let statement_len = statement.data_len();
394		if statement_len > validation.max_size as usize {
395			log::debug!(
396				target: LOG_TARGET,
397				"Ignored oversize message: {:?} ({} bytes)",
398				HexDisplay::from(&hash),
399				statement_len,
400			);
401			return MaybeInserted::Ignored;
402		}
403
404		let mut evicted = HashSet::new();
405		let mut would_free_size = 0;
406		let priority = Priority(statement.priority().unwrap_or(0));
407		let (max_size, max_count) = (validation.max_size as usize, validation.max_count as usize);
408		// It may happen that we can't delete enough lower priority messages
409		// to satisfy size constraints. We check for that before deleting anything,
410		// taking into account channel message replacement.
411		if let Some(account_rec) = self.accounts.get(account) {
412			if let Some(channel) = statement.channel() {
413				if let Some(channel_record) = account_rec.channels.get(&channel) {
414					if priority <= channel_record.priority {
415						// Trying to replace channel message with lower priority
416						log::debug!(
417							target: LOG_TARGET,
418							"Ignored lower priority channel message: {:?} {:?} <= {:?}",
419							HexDisplay::from(&hash),
420							priority,
421							channel_record.priority,
422						);
423						return MaybeInserted::Ignored;
424					} else {
425						// Would replace channel message. Still need to check for size constraints
426						// below.
427						log::debug!(
428							target: LOG_TARGET,
429							"Replacing higher priority channel message: {:?} ({:?}) > {:?} ({:?})",
430							HexDisplay::from(&hash),
431							priority,
432							HexDisplay::from(&channel_record.hash),
433							channel_record.priority,
434						);
435						let key = PriorityKey {
436							hash: channel_record.hash,
437							priority: channel_record.priority,
438						};
439						if let Some((_channel, len)) = account_rec.by_priority.get(&key) {
440							would_free_size += *len;
441							evicted.insert(channel_record.hash);
442						}
443					}
444				}
445			}
446			// Check if we can evict enough lower priority statements to satisfy constraints
447			for (entry, (_, len)) in account_rec.by_priority.iter() {
448				if (account_rec.data_size - would_free_size + statement_len <= max_size)
449					&& account_rec.by_priority.len() + 1 - evicted.len() <= max_count
450				{
451					// Satisfied
452					break;
453				}
454				if evicted.contains(&entry.hash) {
455					// Already accounted for above
456					continue;
457				}
458				if entry.priority >= priority {
459					log::debug!(
460						target: LOG_TARGET,
461						"Ignored message due to constraints {:?} {:?} < {:?}",
462						HexDisplay::from(&hash),
463						priority,
464						entry.priority,
465					);
466					return MaybeInserted::Ignored;
467				}
468				evicted.insert(entry.hash);
469				would_free_size += len;
470			}
471		}
472		// Now check global constraints as well.
473		if !((self.total_size - would_free_size + statement_len <= self.options.max_total_size)
474			&& self.entries.len() + 1 - evicted.len() <= self.options.max_total_statements)
475		{
476			log::debug!(
477				target: LOG_TARGET,
478				"Ignored statement {} because the store is full (size={}, count={})",
479				HexDisplay::from(&hash),
480				self.total_size,
481				self.entries.len(),
482			);
483			return MaybeInserted::Ignored;
484		}
485
486		for h in &evicted {
487			self.make_expired(h, current_time);
488		}
489		self.insert_new(hash, *account, statement);
490		MaybeInserted::Inserted(evicted)
491	}
492}
493
494impl Store {
495	/// Create a new shared store instance. There should only be one per process.
496	/// `path` will be used to open a statement database or create a new one if it does not exist.
497	pub fn new_shared<Block, Client>(
498		path: &std::path::Path,
499		options: Options,
500		client: Arc<Client>,
501		keystore: Arc<LocalKeystore>,
502		prometheus: Option<&PrometheusRegistry>,
503		task_spawner: &dyn SpawnNamed,
504	) -> Result<Arc<Store>>
505	where
506		Block: BlockT,
507		Block::Hash: From<BlockHash>,
508		Client: ProvideRuntimeApi<Block> + HeaderBackend<Block> + Send + Sync + 'static,
509		Client::Api: ValidateStatement<Block>,
510	{
511		let store = Arc::new(Self::new(path, options, client, keystore, prometheus)?);
512
513		// Perform periodic statement store maintenance
514		let worker_store = store.clone();
515		task_spawner.spawn(
516			"statement-store-maintenance",
517			Some("statement-store"),
518			Box::pin(async move {
519				let mut interval = tokio::time::interval(MAINTENANCE_PERIOD);
520				loop {
521					interval.tick().await;
522					worker_store.maintain();
523				}
524			}),
525		);
526
527		Ok(store)
528	}
529
530	/// Create a new instance.
531	/// `path` will be used to open a statement database or create a new one if it does not exist.
532	#[doc(hidden)]
533	pub fn new<Block, Client>(
534		path: &std::path::Path,
535		options: Options,
536		client: Arc<Client>,
537		keystore: Arc<LocalKeystore>,
538		prometheus: Option<&PrometheusRegistry>,
539	) -> Result<Store>
540	where
541		Block: BlockT,
542		Block::Hash: From<BlockHash>,
543		Client: ProvideRuntimeApi<Block> + HeaderBackend<Block> + Send + Sync + 'static,
544		Client::Api: ValidateStatement<Block>,
545	{
546		let mut path: std::path::PathBuf = path.into();
547		path.push("statements");
548
549		let mut config = parity_db::Options::with_columns(&path, col::COUNT);
550
551		let statement_col = &mut config.columns[col::STATEMENTS as usize];
552		statement_col.ref_counted = false;
553		statement_col.preimage = true;
554		statement_col.uniform = true;
555		let db = parity_db::Db::open_or_create(&config).map_err(|e| Error::Db(e.to_string()))?;
556		match db.get(col::META, &KEY_VERSION).map_err(|e| Error::Db(e.to_string()))? {
557			Some(version) => {
558				let version = u32::from_le_bytes(
559					version
560						.try_into()
561						.map_err(|_| Error::Db("Error reading database version".into()))?,
562				);
563				if version != CURRENT_VERSION {
564					return Err(Error::Db(format!("Unsupported database version: {version}")));
565				}
566			},
567			None => {
568				db.commit([(
569					col::META,
570					KEY_VERSION.to_vec(),
571					Some(CURRENT_VERSION.to_le_bytes().to_vec()),
572				)])
573				.map_err(|e| Error::Db(e.to_string()))?;
574			},
575		}
576
577		let validator = ClientWrapper { client, _block: Default::default() };
578		let validate_fn = Box::new(move |block, source, statement| {
579			validator.validate_statement(block, source, statement)
580		});
581
582		let store = Store {
583			db,
584			index: RwLock::new(Index::new(options)),
585			validate_fn,
586			keystore,
587			time_override: None,
588			metrics: PrometheusMetrics::new(prometheus),
589		};
590		store.populate()?;
591		Ok(store)
592	}
593
594	/// Create memory index from the data.
595	// This may be moved to a background thread if it slows startup too much.
596	// This function should only be used on startup. There should be no other DB operations when
597	// iterating the index.
598	fn populate(&self) -> Result<()> {
599		{
600			let mut index = self.index.write();
601			self.db
602				.iter_column_while(col::STATEMENTS, |item| {
603					let statement = item.value;
604					if let Ok(statement) = Statement::decode(&mut statement.as_slice()) {
605						let hash = statement.hash();
606						log::trace!(
607							target: LOG_TARGET,
608							"Statement loaded {:?}",
609							HexDisplay::from(&hash)
610						);
611						if let Some(account_id) = statement.account_id() {
612							index.insert_new(hash, account_id, &statement);
613						} else {
614							log::debug!(
615								target: LOG_TARGET,
616								"Error decoding statement loaded from the DB: {:?}",
617								HexDisplay::from(&hash)
618							);
619						}
620					}
621					true
622				})
623				.map_err(|e| Error::Db(e.to_string()))?;
624			self.db
625				.iter_column_while(col::EXPIRED, |item| {
626					let expired_info = item.value;
627					if let Ok((hash, timestamp)) =
628						<(Hash, u64)>::decode(&mut expired_info.as_slice())
629					{
630						log::trace!(
631							target: LOG_TARGET,
632							"Statement loaded (expired): {:?}",
633							HexDisplay::from(&hash)
634						);
635						index.insert_expired(hash, timestamp);
636					}
637					true
638				})
639				.map_err(|e| Error::Db(e.to_string()))?;
640		}
641
642		self.maintain();
643		Ok(())
644	}
645
646	fn collect_statements<R>(
647		&self,
648		key: Option<DecryptionKey>,
649		match_all_topics: &[Topic],
650		mut f: impl FnMut(Statement) -> Option<R>,
651	) -> Result<Vec<R>> {
652		let mut result = Vec::new();
653		let index = self.index.read();
654		index.iterate_with(key, match_all_topics, |hash| {
655			match self.db.get(col::STATEMENTS, hash).map_err(|e| Error::Db(e.to_string()))? {
656				Some(entry) => {
657					if let Ok(statement) = Statement::decode(&mut entry.as_slice()) {
658						if let Some(data) = f(statement) {
659							result.push(data);
660						}
661					} else {
662						// DB inconsistency
663						log::warn!(
664							target: LOG_TARGET,
665							"Corrupt statement {:?}",
666							HexDisplay::from(hash)
667						);
668					}
669				},
670				None => {
671					// DB inconsistency
672					log::warn!(
673						target: LOG_TARGET,
674						"Missing statement {:?}",
675						HexDisplay::from(hash)
676					);
677				},
678			}
679			Ok(())
680		})?;
681		Ok(result)
682	}
683
684	/// Perform periodic store maintenance
685	pub fn maintain(&self) {
686		log::trace!(target: LOG_TARGET, "Started store maintenance");
687		let (deleted, active_count, expired_count): (Vec<_>, usize, usize) = {
688			let mut index = self.index.write();
689			let deleted = index.maintain(self.timestamp());
690			(deleted, index.entries.len(), index.expired.len())
691		};
692		let deleted: Vec<_> =
693			deleted.into_iter().map(|hash| (col::EXPIRED, hash.to_vec(), None)).collect();
694		let deleted_count = deleted.len() as u64;
695		if let Err(e) = self.db.commit(deleted) {
696			log::warn!(target: LOG_TARGET, "Error writing to the statement database: {:?}", e);
697		} else {
698			self.metrics.report(|metrics| metrics.statements_pruned.inc_by(deleted_count));
699		}
700		log::trace!(
701			target: LOG_TARGET,
702			"Completed store maintenance. Purged: {}, Active: {}, Expired: {}",
703			deleted_count,
704			active_count,
705			expired_count
706		);
707	}
708
709	fn timestamp(&self) -> u64 {
710		self.time_override.unwrap_or_else(|| {
711			std::time::SystemTime::now()
712				.duration_since(std::time::UNIX_EPOCH)
713				.unwrap_or_default()
714				.as_secs()
715		})
716	}
717
718	#[cfg(test)]
719	fn set_time(&mut self, time: u64) {
720		self.time_override = Some(time);
721	}
722
723	/// Returns `self` as [`StatementStoreExt`].
724	pub fn as_statement_store_ext(self: Arc<Self>) -> StatementStoreExt {
725		StatementStoreExt::new(self)
726	}
727
728	/// Return information of all known statements whose decryption key is identified as
729	/// `dest`. The key must be available to the client.
730	fn posted_clear_inner<R>(
731		&self,
732		match_all_topics: &[Topic],
733		dest: [u8; 32],
734		// Map the statement and the decrypted data to the desired result.
735		mut map_f: impl FnMut(Statement, Vec<u8>) -> R,
736	) -> Result<Vec<R>> {
737		self.collect_statements(Some(dest), match_all_topics, |statement| {
738			if let (Some(key), Some(_)) = (statement.decryption_key(), statement.data()) {
739				let public: pezsp_core::ed25519::Public = UncheckedFrom::unchecked_from(key);
740				let public: pezsp_statement_store::ed25519::Public = public.into();
741				match self.keystore.key_pair::<pezsp_statement_store::ed25519::Pair>(&public) {
742					Err(e) => {
743						log::debug!(
744							target: LOG_TARGET,
745							"Keystore error: {:?}, for statement {:?}",
746							e,
747							HexDisplay::from(&statement.hash())
748						);
749						None
750					},
751					Ok(None) => {
752						log::debug!(
753							target: LOG_TARGET,
754							"Keystore is missing key for statement {:?}",
755							HexDisplay::from(&statement.hash())
756						);
757						None
758					},
759					Ok(Some(pair)) => match statement.decrypt_private(&pair.into_inner()) {
760						Ok(r) => r.map(|data| map_f(statement, data)),
761						Err(e) => {
762							log::debug!(
763								target: LOG_TARGET,
764								"Decryption error: {:?}, for statement {:?}",
765								e,
766								HexDisplay::from(&statement.hash())
767							);
768							None
769						},
770					},
771				}
772			} else {
773				None
774			}
775		})
776	}
777}
778
779impl StatementStore for Store {
780	/// Return all statements.
781	fn statements(&self) -> Result<Vec<(Hash, Statement)>> {
782		let index = self.index.read();
783		let mut result = Vec::with_capacity(index.entries.len());
784		for hash in index.entries.keys().cloned() {
785			let Some(encoded) =
786				self.db.get(col::STATEMENTS, &hash).map_err(|e| Error::Db(e.to_string()))?
787			else {
788				continue;
789			};
790			if let Ok(statement) = Statement::decode(&mut encoded.as_slice()) {
791				result.push((hash, statement));
792			}
793		}
794		Ok(result)
795	}
796
797	fn take_recent_statements(&self) -> Result<Vec<(Hash, Statement)>> {
798		let mut index = self.index.write();
799		let recent = index.take_recent();
800		let mut result = Vec::with_capacity(recent.len());
801		for hash in recent {
802			let Some(encoded) =
803				self.db.get(col::STATEMENTS, &hash).map_err(|e| Error::Db(e.to_string()))?
804			else {
805				continue;
806			};
807			if let Ok(statement) = Statement::decode(&mut encoded.as_slice()) {
808				result.push((hash, statement));
809			}
810		}
811		Ok(result)
812	}
813
814	/// Returns a statement by hash.
815	fn statement(&self, hash: &Hash) -> Result<Option<Statement>> {
816		Ok(
817			match self
818				.db
819				.get(col::STATEMENTS, hash.as_slice())
820				.map_err(|e| Error::Db(e.to_string()))?
821			{
822				Some(entry) => {
823					log::trace!(
824						target: LOG_TARGET,
825						"Queried statement {:?}",
826						HexDisplay::from(hash)
827					);
828					Some(
829						Statement::decode(&mut entry.as_slice())
830							.map_err(|e| Error::Decode(e.to_string()))?,
831					)
832				},
833				None => {
834					log::trace!(
835						target: LOG_TARGET,
836						"Queried missing statement {:?}",
837						HexDisplay::from(hash)
838					);
839					None
840				},
841			},
842		)
843	}
844
845	fn has_statement(&self, hash: &Hash) -> bool {
846		self.index.read().entries.contains_key(hash)
847	}
848
849	/// Return the data of all known statements which include all topics and have no `DecryptionKey`
850	/// field.
851	fn broadcasts(&self, match_all_topics: &[Topic]) -> Result<Vec<Vec<u8>>> {
852		self.collect_statements(None, match_all_topics, |statement| statement.into_data())
853	}
854
855	/// Return the data of all known statements whose decryption key is identified as `dest` (this
856	/// will generally be the public key or a hash thereof for symmetric ciphers, or a hash of the
857	/// private key for symmetric ciphers).
858	fn posted(&self, match_all_topics: &[Topic], dest: [u8; 32]) -> Result<Vec<Vec<u8>>> {
859		self.collect_statements(Some(dest), match_all_topics, |statement| statement.into_data())
860	}
861
862	/// Return the decrypted data of all known statements whose decryption key is identified as
863	/// `dest`. The key must be available to the client.
864	fn posted_clear(&self, match_all_topics: &[Topic], dest: [u8; 32]) -> Result<Vec<Vec<u8>>> {
865		self.posted_clear_inner(match_all_topics, dest, |_statement, data| data)
866	}
867
868	/// Return all known statements which include all topics and have no `DecryptionKey`
869	/// field.
870	fn broadcasts_stmt(&self, match_all_topics: &[Topic]) -> Result<Vec<Vec<u8>>> {
871		self.collect_statements(None, match_all_topics, |statement| Some(statement.encode()))
872	}
873
874	/// Return all known statements whose decryption key is identified as `dest` (this
875	/// will generally be the public key or a hash thereof for symmetric ciphers, or a hash of the
876	/// private key for symmetric ciphers).
877	fn posted_stmt(&self, match_all_topics: &[Topic], dest: [u8; 32]) -> Result<Vec<Vec<u8>>> {
878		self.collect_statements(Some(dest), match_all_topics, |statement| Some(statement.encode()))
879	}
880
881	/// Return the statement and the decrypted data of all known statements whose decryption key is
882	/// identified as `dest`. The key must be available to the client.
883	fn posted_clear_stmt(
884		&self,
885		match_all_topics: &[Topic],
886		dest: [u8; 32],
887	) -> Result<Vec<Vec<u8>>> {
888		self.posted_clear_inner(match_all_topics, dest, |statement, data| {
889			let mut res = Vec::with_capacity(statement.size_hint() + data.len());
890			statement.encode_to(&mut res);
891			res.extend_from_slice(&data);
892			res
893		})
894	}
895
896	/// Submit a statement to the store. Validates the statement and returns validation result.
897	fn submit(&self, statement: Statement, source: StatementSource) -> SubmitResult {
898		let hash = statement.hash();
899		let encoded_size = statement.encoded_size();
900		if encoded_size > MAX_STATEMENT_SIZE {
901			log::debug!(
902				target: LOG_TARGET,
903				"Statement is too big for propogation: {:?} ({}/{} bytes)",
904				HexDisplay::from(&hash),
905				statement.encoded_size(),
906				MAX_STATEMENT_SIZE
907			);
908			return SubmitResult::Ignored;
909		}
910
911		match self.index.read().query(&hash) {
912			IndexQuery::Expired => {
913				if !source.can_be_resubmitted() {
914					return SubmitResult::KnownExpired;
915				}
916			},
917			IndexQuery::Exists => {
918				if !source.can_be_resubmitted() {
919					return SubmitResult::Known;
920				}
921			},
922			IndexQuery::Unknown => {},
923		}
924
925		let Some(account_id) = statement.account_id() else {
926			log::debug!(
927				target: LOG_TARGET,
928				"Statement validation failed: Missing proof ({:?})",
929				HexDisplay::from(&hash),
930			);
931			self.metrics.report(|metrics| metrics.validations_invalid.inc());
932			return SubmitResult::Bad("No statement proof");
933		};
934
935		// Validate.
936		let at_block = if let Some(Proof::OnChain { block_hash, .. }) = statement.proof() {
937			Some(*block_hash)
938		} else {
939			None
940		};
941		let validation_result = (self.validate_fn)(at_block, source, statement.clone());
942		let validation = match validation_result {
943			Ok(validation) => validation,
944			Err(InvalidStatement::BadProof) => {
945				log::debug!(
946					target: LOG_TARGET,
947					"Statement validation failed: BadProof, {:?}",
948					HexDisplay::from(&hash),
949				);
950				self.metrics.report(|metrics| metrics.validations_invalid.inc());
951				return SubmitResult::Bad("Bad statement proof");
952			},
953			Err(InvalidStatement::NoProof) => {
954				log::debug!(
955					target: LOG_TARGET,
956					"Statement validation failed: NoProof, {:?}",
957					HexDisplay::from(&hash),
958				);
959				self.metrics.report(|metrics| metrics.validations_invalid.inc());
960				return SubmitResult::Bad("Missing statement proof");
961			},
962			Err(InvalidStatement::InternalError) => {
963				return SubmitResult::InternalError(Error::Runtime)
964			},
965		};
966
967		let current_time = self.timestamp();
968		let mut commit = Vec::new();
969		{
970			let mut index = self.index.write();
971
972			let evicted =
973				match index.insert(hash, &statement, &account_id, &validation, current_time) {
974					MaybeInserted::Ignored => return SubmitResult::Ignored,
975					MaybeInserted::Inserted(evicted) => evicted,
976				};
977
978			commit.push((col::STATEMENTS, hash.to_vec(), Some(statement.encode())));
979			for hash in evicted {
980				commit.push((col::STATEMENTS, hash.to_vec(), None));
981				commit.push((col::EXPIRED, hash.to_vec(), Some((hash, current_time).encode())));
982			}
983			if let Err(e) = self.db.commit(commit) {
984				log::debug!(
985					target: LOG_TARGET,
986					"Statement validation failed: database error {}, {:?}",
987					e,
988					statement
989				);
990				return SubmitResult::InternalError(Error::Db(e.to_string()));
991			}
992		} // Release index lock
993		self.metrics.report(|metrics| metrics.submitted_statements.inc());
994		let network_priority = NetworkPriority::High;
995		log::trace!(target: LOG_TARGET, "Statement submitted: {:?}", HexDisplay::from(&hash));
996		SubmitResult::New(network_priority)
997	}
998
999	/// Remove a statement by hash.
1000	fn remove(&self, hash: &Hash) -> Result<()> {
1001		let current_time = self.timestamp();
1002		{
1003			let mut index = self.index.write();
1004			if index.make_expired(hash, current_time) {
1005				let commit = [
1006					(col::STATEMENTS, hash.to_vec(), None),
1007					(col::EXPIRED, hash.to_vec(), Some((hash, current_time).encode())),
1008				];
1009				if let Err(e) = self.db.commit(commit) {
1010					log::debug!(
1011						target: LOG_TARGET,
1012						"Error removing statement: database error {}, {:?}",
1013						e,
1014						HexDisplay::from(hash),
1015					);
1016					return Err(Error::Db(e.to_string()));
1017				}
1018			}
1019		}
1020		Ok(())
1021	}
1022
1023	/// Remove all statements by an account.
1024	fn remove_by(&self, who: [u8; 32]) -> Result<()> {
1025		let mut index = self.index.write();
1026		let mut evicted = Vec::new();
1027		if let Some(account_rec) = index.accounts.get(&who) {
1028			evicted.extend(account_rec.by_priority.keys().map(|k| k.hash));
1029		}
1030
1031		let current_time = self.timestamp();
1032		let mut commit = Vec::new();
1033		for hash in evicted {
1034			index.make_expired(&hash, current_time);
1035			commit.push((col::STATEMENTS, hash.to_vec(), None));
1036			commit.push((col::EXPIRED, hash.to_vec(), Some((hash, current_time).encode())));
1037		}
1038		self.db.commit(commit).map_err(|e| {
1039			log::debug!(
1040				target: LOG_TARGET,
1041				"Error removing statement: database error {}, remove by {:?}",
1042				e,
1043				HexDisplay::from(&who),
1044			);
1045
1046			Error::Db(e.to_string())
1047		})
1048	}
1049}
1050
1051#[cfg(test)]
1052mod tests {
1053	use crate::Store;
1054	use pezsc_keystore::Keystore;
1055	use pezsp_core::{Decode, Encode, Pair};
1056	use pezsp_statement_store::{
1057		runtime_api::{InvalidStatement, ValidStatement, ValidateStatement},
1058		AccountId, Channel, DecryptionKey, NetworkPriority, Proof, SignatureVerificationResult,
1059		Statement, StatementSource, StatementStore, SubmitResult, Topic,
1060	};
1061
	// Block-related type aliases used by the mocked client in these tests.
	type Extrinsic = pezsp_runtime::OpaqueExtrinsic;
	type Hash = pezsp_core::H256;
	type Hashing = pezsp_runtime::traits::BlakeTwo256;
	type BlockNumber = u64;
	type Header = pezsp_runtime::generic::Header<BlockNumber, Hashing>;
	type Block = pezsp_runtime::generic::Block<Header, Extrinsic>;

	/// Block hash the mocked runtime accepts in `Proof::OnChain` proofs. The mocked client
	/// also reports it as both the best and the finalized block hash.
	const CORRECT_BLOCK_HASH: [u8; 32] = [1u8; 32];
1070
	/// Stateless stand-in for the client; provides the mocked runtime API and header backend.
	#[derive(Clone)]
	pub(crate) struct TestClient;

	/// Mocked runtime API handle handed out by [`TestClient`].
	pub(crate) struct RuntimeApi {
		// Holds a clone of the client; the leading underscore marks it as intentionally unused.
		_inner: TestClient,
	}
1077
1078	impl pezsp_api::ProvideRuntimeApi<Block> for TestClient {
1079		type Api = RuntimeApi;
1080		fn runtime_api(&self) -> pezsp_api::ApiRef<'_, Self::Api> {
1081			RuntimeApi { _inner: self.clone() }.into()
1082		}
1083	}
1084
	// Mocked `ValidateStatement` runtime API.
	//
	// * Statements with a valid signature get limits of 100 statements / 1000 bytes.
	// * Statements with an invalid signature are rejected with `BadProof`.
	// * Unsigned statements must carry a `Proof::OnChain` whose block hash equals
	//   `CORRECT_BLOCK_HASH`; their limits depend on the account id (see the table below).
	//   Anything else is rejected with `BadProof`.
	pezsp_api::mock_impl_runtime_apis! {
		impl ValidateStatement<Block> for RuntimeApi {
			fn validate_statement(
				_source: StatementSource,
				statement: Statement,
			) -> std::result::Result<ValidStatement, InvalidStatement> {
				use crate::tests::account;
				match statement.verify_signature() {
					SignatureVerificationResult::Valid(_) => Ok(ValidStatement{max_count: 100, max_size: 1000}),
					SignatureVerificationResult::Invalid => Err(InvalidStatement::BadProof),
					SignatureVerificationResult::NoSignature => {
						if let Some(Proof::OnChain { block_hash, .. }) = statement.proof() {
							if block_hash == &CORRECT_BLOCK_HASH {
								// Per-account (max_count, max_size) limits used by the tests.
								let (max_count, max_size) = match statement.account_id() {
									Some(a) if a == account(1) => (1, 1000),
									Some(a) if a == account(2) => (2, 1000),
									Some(a) if a == account(3) => (3, 1000),
									Some(a) if a == account(4) => (4, 1000),
									Some(a) if a == account(42) => (42, 42 * crate::MAX_STATEMENT_SIZE as u32),
									// Any other (or missing) account id.
									_ => (2, 2000),
								};
								Ok(ValidStatement{ max_count, max_size })
							} else {
								Err(InvalidStatement::BadProof)
							}
						} else {
							Err(InvalidStatement::BadProof)
						}
					}
				}
			}
		}
	}
1118
	/// Minimal header backend for the tests: only `info` is implemented, every other
	/// method panics via `unimplemented!()`.
	impl pezsp_blockchain::HeaderBackend<Block> for TestClient {
		fn header(&self, _hash: Hash) -> pezsp_blockchain::Result<Option<Header>> {
			unimplemented!()
		}
		/// Reports `CORRECT_BLOCK_HASH` as both the best and the finalized block.
		// NOTE(review): `finalized_number` (1) is greater than `best_number` (0) although both
		// refer to the same hash - presumably irrelevant for these tests; confirm.
		fn info(&self) -> pezsp_blockchain::Info<Block> {
			pezsp_blockchain::Info {
				best_hash: CORRECT_BLOCK_HASH.into(),
				best_number: 0,
				genesis_hash: Default::default(),
				finalized_hash: CORRECT_BLOCK_HASH.into(),
				finalized_number: 1,
				finalized_state: None,
				number_leaves: 0,
				block_gap: None,
			}
		}
		fn status(&self, _hash: Hash) -> pezsp_blockchain::Result<pezsp_blockchain::BlockStatus> {
			unimplemented!()
		}
		fn number(&self, _hash: Hash) -> pezsp_blockchain::Result<Option<BlockNumber>> {
			unimplemented!()
		}
		fn hash(&self, _number: BlockNumber) -> pezsp_blockchain::Result<Option<Hash>> {
			unimplemented!()
		}
	}
1145
1146	fn test_store() -> (Store, tempfile::TempDir) {
1147		pezsp_tracing::init_for_tests();
1148		let temp_dir = tempfile::Builder::new().tempdir().expect("Error creating test dir");
1149
1150		let client = std::sync::Arc::new(TestClient);
1151		let mut path: std::path::PathBuf = temp_dir.path().into();
1152		path.push("db");
1153		let keystore = std::sync::Arc::new(pezsc_keystore::LocalKeystore::in_memory());
1154		let store = Store::new(&path, Default::default(), client, keystore, None).unwrap();
1155		(store, temp_dir) // return order is important. Store must be dropped before TempDir
1156	}
1157
1158	fn signed_statement(data: u8) -> Statement {
1159		signed_statement_with_topics(data, &[], None)
1160	}
1161
1162	fn signed_statement_with_topics(
1163		data: u8,
1164		topics: &[Topic],
1165		dec_key: Option<DecryptionKey>,
1166	) -> Statement {
1167		let mut statement = Statement::new();
1168		statement.set_plain_data(vec![data]);
1169		for i in 0..topics.len() {
1170			statement.set_topic(i, topics[i]);
1171		}
1172		if let Some(key) = dec_key {
1173			statement.set_decryption_key(key);
1174		}
1175		let kp = pezsp_core::ed25519::Pair::from_string("//Alice", None).unwrap();
1176		statement.sign_ed25519_private(&kp);
1177		statement
1178	}
1179
1180	fn topic(data: u64) -> Topic {
1181		let mut topic: Topic = Default::default();
1182		topic[0..8].copy_from_slice(&data.to_le_bytes());
1183		topic
1184	}
1185
1186	fn dec_key(data: u64) -> DecryptionKey {
1187		let mut dec_key: DecryptionKey = Default::default();
1188		dec_key[0..8].copy_from_slice(&data.to_le_bytes());
1189		dec_key
1190	}
1191
1192	fn account(id: u64) -> AccountId {
1193		let mut account: AccountId = Default::default();
1194		account[0..8].copy_from_slice(&id.to_le_bytes());
1195		account
1196	}
1197
1198	fn channel(id: u64) -> Channel {
1199		let mut channel: Channel = Default::default();
1200		channel[0..8].copy_from_slice(&id.to_le_bytes());
1201		channel
1202	}
1203
1204	fn statement(account_id: u64, priority: u32, c: Option<u64>, data_len: usize) -> Statement {
1205		let mut statement = Statement::new();
1206		let mut data = Vec::new();
1207		data.resize(data_len, 0);
1208		statement.set_plain_data(data);
1209		statement.set_priority(priority);
1210		if let Some(c) = c {
1211			statement.set_channel(channel(c));
1212		}
1213		statement.set_proof(Proof::OnChain {
1214			block_hash: CORRECT_BLOCK_HASH,
1215			who: account(account_id),
1216			event_index: 0,
1217		});
1218		statement
1219	}
1220
1221	#[test]
1222	fn submit_one() {
1223		let (store, _temp) = test_store();
1224		let statement0 = signed_statement(0);
1225		assert_eq!(
1226			store.submit(statement0, StatementSource::Network),
1227			SubmitResult::New(NetworkPriority::High)
1228		);
1229		let unsigned = statement(0, 1, None, 0);
1230		assert_eq!(
1231			store.submit(unsigned, StatementSource::Network),
1232			SubmitResult::New(NetworkPriority::High)
1233		);
1234	}
1235
1236	#[test]
1237	fn save_and_load_statements() {
1238		let (store, temp) = test_store();
1239		let statement0 = signed_statement(0);
1240		let statement1 = signed_statement(1);
1241		let statement2 = signed_statement(2);
1242		assert_eq!(
1243			store.submit(statement0.clone(), StatementSource::Network),
1244			SubmitResult::New(NetworkPriority::High)
1245		);
1246		assert_eq!(
1247			store.submit(statement1.clone(), StatementSource::Network),
1248			SubmitResult::New(NetworkPriority::High)
1249		);
1250		assert_eq!(
1251			store.submit(statement2.clone(), StatementSource::Network),
1252			SubmitResult::New(NetworkPriority::High)
1253		);
1254		assert_eq!(store.statements().unwrap().len(), 3);
1255		assert_eq!(store.broadcasts(&[]).unwrap().len(), 3);
1256		assert_eq!(store.statement(&statement1.hash()).unwrap(), Some(statement1.clone()));
1257		let keystore = store.keystore.clone();
1258		drop(store);
1259
1260		let client = std::sync::Arc::new(TestClient);
1261		let mut path: std::path::PathBuf = temp.path().into();
1262		path.push("db");
1263		let store = Store::new(&path, Default::default(), client, keystore, None).unwrap();
1264		assert_eq!(store.statements().unwrap().len(), 3);
1265		assert_eq!(store.broadcasts(&[]).unwrap().len(), 3);
1266		assert_eq!(store.statement(&statement1.hash()).unwrap(), Some(statement1));
1267	}
1268
1269	#[test]
1270	fn take_recent_statements_clears_index() {
1271		let (store, _temp) = test_store();
1272		let statement0 = signed_statement(0);
1273		let statement1 = signed_statement(1);
1274		let statement2 = signed_statement(2);
1275		let statement3 = signed_statement(3);
1276
1277		let _ = store.submit(statement0.clone(), StatementSource::Local);
1278		let _ = store.submit(statement1.clone(), StatementSource::Local);
1279		let _ = store.submit(statement2.clone(), StatementSource::Local);
1280
1281		let recent1 = store.take_recent_statements().unwrap();
1282		let (recent1_hashes, recent1_statements): (Vec<_>, Vec<_>) = recent1.into_iter().unzip();
1283		let expected1 = vec![statement0, statement1, statement2];
1284		assert!(expected1.iter().all(|s| recent1_hashes.contains(&s.hash())));
1285		assert!(expected1.iter().all(|s| recent1_statements.contains(s)));
1286
1287		// Recent statements are cleared.
1288		let recent2 = store.take_recent_statements().unwrap();
1289		assert_eq!(recent2.len(), 0);
1290
1291		store.submit(statement3.clone(), StatementSource::Network);
1292
1293		let recent3 = store.take_recent_statements().unwrap();
1294		let (recent3_hashes, recent3_statements): (Vec<_>, Vec<_>) = recent3.into_iter().unzip();
1295		let expected3 = vec![statement3];
1296		assert!(expected3.iter().all(|s| recent3_hashes.contains(&s.hash())));
1297		assert!(expected3.iter().all(|s| recent3_statements.contains(s)));
1298
1299		// Recent statements are cleared, but statements remain in the store.
1300		assert_eq!(store.statements().unwrap().len(), 4);
1301	}
1302
1303	#[test]
1304	fn search_by_topic_and_key() {
1305		let (store, _temp) = test_store();
1306		let statement0 = signed_statement(0);
1307		let statement1 = signed_statement_with_topics(1, &[topic(0)], None);
1308		let statement2 = signed_statement_with_topics(2, &[topic(0), topic(1)], Some(dec_key(2)));
1309		let statement3 = signed_statement_with_topics(3, &[topic(0), topic(1), topic(2)], None);
1310		let statement4 =
1311			signed_statement_with_topics(4, &[topic(0), topic(42), topic(2), topic(3)], None);
1312		let statements = vec![statement0, statement1, statement2, statement3, statement4];
1313		for s in &statements {
1314			store.submit(s.clone(), StatementSource::Network);
1315		}
1316
1317		let assert_topics = |topics: &[u64], key: Option<u64>, expected: &[u8]| {
1318			let key = key.map(dec_key);
1319			let topics: Vec<_> = topics.iter().map(|t| topic(*t)).collect();
1320			let mut got_vals: Vec<_> = if let Some(key) = key {
1321				store.posted(&topics, key).unwrap().into_iter().map(|d| d[0]).collect()
1322			} else {
1323				store.broadcasts(&topics).unwrap().into_iter().map(|d| d[0]).collect()
1324			};
1325			got_vals.sort();
1326			assert_eq!(expected.to_vec(), got_vals);
1327		};
1328
1329		assert_topics(&[], None, &[0, 1, 3, 4]);
1330		assert_topics(&[], Some(2), &[2]);
1331		assert_topics(&[0], None, &[1, 3, 4]);
1332		assert_topics(&[1], None, &[3]);
1333		assert_topics(&[2], None, &[3, 4]);
1334		assert_topics(&[3], None, &[4]);
1335		assert_topics(&[42], None, &[4]);
1336
1337		assert_topics(&[0, 1], None, &[3]);
1338		assert_topics(&[0, 1], Some(2), &[2]);
1339		assert_topics(&[0, 1, 99], Some(2), &[]);
1340		assert_topics(&[1, 2], None, &[3]);
1341		assert_topics(&[99], None, &[]);
1342		assert_topics(&[0, 99], None, &[]);
1343		assert_topics(&[0, 1, 2, 3, 42], None, &[]);
1344	}
1345
	/// Exercises per-account and global limits. The per-account limits come from the mocked
	/// runtime API; the global limits are overridden on the index options below. The
	/// `expired` counts asserted along the way are cumulative, so the order of submissions
	/// matters.
	#[test]
	fn constraints() {
		let (store, _temp) = test_store();

		// Lower the global size limit so it can be hit with a handful of small statements.
		store.index.write().options.max_total_size = 3000;
		let source = StatementSource::Network;
		let ok = SubmitResult::New(NetworkPriority::High);
		let ignored = SubmitResult::Ignored;

		// Account 1 (limit = 1 msg, 1000 bytes)

		// Oversized statement is not allowed. Limit for account 1 is 1 msg, 1000 bytes
		assert_eq!(store.submit(statement(1, 1, Some(1), 2000), source), ignored);
		assert_eq!(store.submit(statement(1, 1, Some(1), 500), source), ok);
		// Would not replace channel message with same priority
		assert_eq!(store.submit(statement(1, 1, Some(1), 200), source), ignored);
		assert_eq!(store.submit(statement(1, 2, Some(1), 600), source), ok);
		// Submit another message to another channel with lower priority. Should not be allowed
		// because msg count limit is 1
		assert_eq!(store.submit(statement(1, 1, Some(2), 100), source), ignored);
		assert_eq!(store.index.read().expired.len(), 1);

		// Account 2 (limit = 2 msg, 1000 bytes)

		assert_eq!(store.submit(statement(2, 1, None, 500), source), ok);
		assert_eq!(store.submit(statement(2, 2, None, 100), source), ok);
		// Should evict priority 1
		assert_eq!(store.submit(statement(2, 3, None, 500), source), ok);
		assert_eq!(store.index.read().expired.len(), 2);
		// Should evict all
		assert_eq!(store.submit(statement(2, 4, None, 1000), source), ok);
		assert_eq!(store.index.read().expired.len(), 4);

		// Account 3 (limit = 3 msg, 1000 bytes)

		assert_eq!(store.submit(statement(3, 2, Some(1), 300), source), ok);
		assert_eq!(store.submit(statement(3, 3, Some(2), 300), source), ok);
		assert_eq!(store.submit(statement(3, 4, Some(3), 300), source), ok);
		// Should evict 2 and 3
		assert_eq!(store.submit(statement(3, 5, None, 500), source), ok);
		assert_eq!(store.index.read().expired.len(), 6);

		// Remaining statements: 600 + 1000 + 300 + 500 bytes.
		assert_eq!(store.index.read().total_size, 2400);
		assert_eq!(store.index.read().entries.len(), 4);

		// Should be over the global size limit
		assert_eq!(store.submit(statement(1, 1, None, 700), source), ignored);
		// Should be over the global count limit
		store.index.write().options.max_total_statements = 4;
		assert_eq!(store.submit(statement(1, 1, None, 100), source), ignored);

		// The store must contain exactly the four surviving statements.
		let mut expected_statements = vec![
			statement(1, 2, Some(1), 600).hash(),
			statement(2, 4, None, 1000).hash(),
			statement(3, 4, Some(3), 300).hash(),
			statement(3, 5, None, 500).hash(),
		];
		expected_statements.sort();
		let mut statements: Vec<_> =
			store.statements().unwrap().into_iter().map(|(hash, _)| hash).collect();
		statements.sort();
		assert_eq!(expected_statements, statements);
	}
1409
1410	#[test]
1411	fn max_statement_size_for_gossiping() {
1412		let (store, _temp) = test_store();
1413		store.index.write().options.max_total_size = 42 * crate::MAX_STATEMENT_SIZE;
1414
1415		assert_eq!(
1416			store.submit(
1417				statement(42, 1, Some(1), crate::MAX_STATEMENT_SIZE - 500),
1418				StatementSource::Local
1419			),
1420			SubmitResult::New(NetworkPriority::High)
1421		);
1422
1423		assert_eq!(
1424			store.submit(
1425				statement(42, 2, Some(1), 2 * crate::MAX_STATEMENT_SIZE),
1426				StatementSource::Local
1427			),
1428			SubmitResult::Ignored
1429		);
1430	}
1431
1432	#[test]
1433	fn expired_statements_are_purged() {
1434		use super::DEFAULT_PURGE_AFTER_SEC;
1435		let (mut store, temp) = test_store();
1436		let mut statement = statement(1, 1, Some(3), 100);
1437		store.set_time(0);
1438		statement.set_topic(0, topic(4));
1439		store.submit(statement.clone(), StatementSource::Network);
1440		assert_eq!(store.index.read().entries.len(), 1);
1441		store.remove(&statement.hash()).unwrap();
1442		assert_eq!(store.index.read().entries.len(), 0);
1443		assert_eq!(store.index.read().accounts.len(), 0);
1444		store.set_time(DEFAULT_PURGE_AFTER_SEC + 1);
1445		store.maintain();
1446		assert_eq!(store.index.read().expired.len(), 0);
1447		let keystore = store.keystore.clone();
1448		drop(store);
1449
1450		let client = std::sync::Arc::new(TestClient);
1451		let mut path: std::path::PathBuf = temp.path().into();
1452		path.push("db");
1453		let store = Store::new(&path, Default::default(), client, keystore, None).unwrap();
1454		assert_eq!(store.statements().unwrap().len(), 0);
1455		assert_eq!(store.index.read().expired.len(), 0);
1456	}
1457
1458	#[test]
1459	fn posted_clear_decrypts() {
1460		let (store, _temp) = test_store();
1461		let public = store
1462			.keystore
1463			.ed25519_generate_new(pezsp_core::crypto::key_types::STATEMENT, None)
1464			.unwrap();
1465		let statement1 = statement(1, 1, None, 100);
1466		let mut statement2 = statement(1, 2, None, 0);
1467		let plain = b"The most valuable secret".to_vec();
1468		statement2.encrypt(&plain, &public).unwrap();
1469		store.submit(statement1, StatementSource::Network);
1470		store.submit(statement2, StatementSource::Network);
1471		let posted_clear = store.posted_clear(&[], public.into()).unwrap();
1472		assert_eq!(posted_clear, vec![plain]);
1473	}
1474
1475	#[test]
1476	fn broadcasts_stmt_returns_encoded_statements() {
1477		let (store, _tmp) = test_store();
1478
1479		// no key, no topic
1480		let s0 = signed_statement_with_topics(0, &[], None);
1481		// same, but with a topic = 42
1482		let s1 = signed_statement_with_topics(1, &[topic(42)], None);
1483		// has a decryption key -> must NOT be returned by broadcasts_stmt
1484		let s2 = signed_statement_with_topics(2, &[topic(42)], Some(dec_key(99)));
1485
1486		for s in [&s0, &s1, &s2] {
1487			store.submit(s.clone(), StatementSource::Network);
1488		}
1489
1490		// no topic filter
1491		let mut hashes: Vec<_> = store
1492			.broadcasts_stmt(&[])
1493			.unwrap()
1494			.into_iter()
1495			.map(|bytes| Statement::decode(&mut &bytes[..]).unwrap().hash())
1496			.collect();
1497		hashes.sort();
1498		let expected_hashes = {
1499			let mut e = vec![s0.hash(), s1.hash()];
1500			e.sort();
1501			e
1502		};
1503		assert_eq!(hashes, expected_hashes);
1504
1505		// filter on topic 42
1506		let got = store.broadcasts_stmt(&[topic(42)]).unwrap();
1507		assert_eq!(got.len(), 1);
1508		let st = Statement::decode(&mut &got[0][..]).unwrap();
1509		assert_eq!(st.hash(), s1.hash());
1510	}
1511
1512	#[test]
1513	fn posted_stmt_returns_encoded_statements_for_dest() {
1514		let (store, _tmp) = test_store();
1515
1516		let public1 = store
1517			.keystore
1518			.ed25519_generate_new(pezsp_core::crypto::key_types::STATEMENT, None)
1519			.unwrap();
1520		let dest: [u8; 32] = public1.into();
1521
1522		let public2 = store
1523			.keystore
1524			.ed25519_generate_new(pezsp_core::crypto::key_types::STATEMENT, None)
1525			.unwrap();
1526
1527		// A statement that does have dec_key = dest
1528		let mut s_with_key = statement(1, 1, None, 0);
1529		let plain1 = b"The most valuable secret".to_vec();
1530		s_with_key.encrypt(&plain1, &public1).unwrap();
1531
1532		// A statement with a different dec_key
1533		let mut s_other_key = statement(2, 2, None, 0);
1534		let plain2 = b"The second most valuable secret".to_vec();
1535		s_other_key.encrypt(&plain2, &public2).unwrap();
1536
1537		// Submit them all
1538		for s in [&s_with_key, &s_other_key] {
1539			store.submit(s.clone(), StatementSource::Network);
1540		}
1541
1542		// posted_stmt should only return the one with dec_key = dest
1543		let retrieved = store.posted_stmt(&[], dest).unwrap();
1544		assert_eq!(retrieved.len(), 1, "Only one statement has dec_key=dest");
1545
1546		// Re-decode that returned statement to confirm it is correct
1547		let returned_stmt = Statement::decode(&mut &retrieved[0][..]).unwrap();
1548		assert_eq!(
1549			returned_stmt.hash(),
1550			s_with_key.hash(),
1551			"Returned statement must match s_with_key"
1552		);
1553	}
1554
	/// `posted_clear_stmt` returns, for the statement addressed to `dest`, the encoded
	/// statement immediately followed by the decrypted payload.
	#[test]
	fn posted_clear_stmt_returns_statement_followed_by_plain_data() {
		let (store, _tmp) = test_store();

		let public1 = store
			.keystore
			.ed25519_generate_new(pezsp_core::crypto::key_types::STATEMENT, None)
			.unwrap();
		let dest: [u8; 32] = public1.into();

		let public2 = store
			.keystore
			.ed25519_generate_new(pezsp_core::crypto::key_types::STATEMENT, None)
			.unwrap();

		// A statement that does have dec_key = dest
		let mut s_with_key = statement(1, 1, None, 0);
		let plain1 = b"The most valuable secret".to_vec();
		s_with_key.encrypt(&plain1, &public1).unwrap();

		// A statement with a different dec_key
		let mut s_other_key = statement(2, 2, None, 0);
		let plain2 = b"The second most valuable secret".to_vec();
		s_other_key.encrypt(&plain2, &public2).unwrap();

		// Submit them all
		for s in [&s_with_key, &s_other_key] {
			store.submit(s.clone(), StatementSource::Network);
		}

		// posted_clear_stmt should only return the one with dec_key = dest
		let retrieved = store.posted_clear_stmt(&[], dest).unwrap();
		assert_eq!(retrieved.len(), 1, "Only one statement has dec_key=dest");

		// We expect: [ encoded Statement ] + [ the decrypted bytes ]
		let encoded_stmt = s_with_key.encode();
		let stmt_len = encoded_stmt.len();

		// 1) statement is first
		assert_eq!(&retrieved[0][..stmt_len], &encoded_stmt[..]);

		// 2) followed by the decrypted payload
		let trailing = &retrieved[0][stmt_len..];
		assert_eq!(trailing, &plain1[..]);
	}
1600
1601	#[test]
1602	fn posted_clear_returns_plain_data_for_dest_and_topics() {
1603		let (store, _tmp) = test_store();
1604
1605		// prepare two key-pairs
1606		let public_dest = store
1607			.keystore
1608			.ed25519_generate_new(pezsp_core::crypto::key_types::STATEMENT, None)
1609			.unwrap();
1610		let dest: [u8; 32] = public_dest.into();
1611
1612		let public_other = store
1613			.keystore
1614			.ed25519_generate_new(pezsp_core::crypto::key_types::STATEMENT, None)
1615			.unwrap();
1616
1617		// statement that SHOULD be returned (matches dest & topic 42)
1618		let mut s_good = statement(1, 1, None, 0);
1619		let plaintext_good = b"The most valuable secret".to_vec();
1620		s_good.encrypt(&plaintext_good, &public_dest).unwrap();
1621		s_good.set_topic(0, topic(42));
1622
1623		// statement that should NOT be returned (same dest but different topic)
1624		let mut s_wrong_topic = statement(2, 2, None, 0);
1625		s_wrong_topic.encrypt(b"Wrong topic", &public_dest).unwrap();
1626		s_wrong_topic.set_topic(0, topic(99));
1627
1628		// statement that should NOT be returned (different dest)
1629		let mut s_other_dest = statement(3, 3, None, 0);
1630		s_other_dest.encrypt(b"Other dest", &public_other).unwrap();
1631		s_other_dest.set_topic(0, topic(42));
1632
1633		// submit all
1634		for s in [&s_good, &s_wrong_topic, &s_other_dest] {
1635			store.submit(s.clone(), StatementSource::Network);
1636		}
1637
1638		// call posted_clear with the topic filter and dest
1639		let retrieved = store.posted_clear(&[topic(42)], dest).unwrap();
1640
1641		// exactly one element, equal to the expected plaintext
1642		assert_eq!(retrieved, vec![plaintext_good]);
1643	}
1644
	/// End-to-end scenario for `remove_by`: statements of two accounts are indexed by
	/// topic, decryption key and channel; removing one account must expire exactly its
	/// statements, leave the other account untouched, be repeatable, and allow the account
	/// to submit again after the expired entries have been purged.
	#[test]
	fn remove_by_covers_various_situations() {
		// These names are also imported at the top of this test module.
		use pezsp_statement_store::{StatementSource, StatementStore, SubmitResult};

		// Use a fresh store and fixed time so we can control purging.
		let (mut store, _temp) = test_store();
		store.set_time(0);

		// Reuse helpers from this module.
		let t42 = topic(42);
		let k7 = dec_key(7);

		// Account A = 4 (has per-account limits (4, 1000) in the mock runtime)
		// - Mix of topic, decryption-key and channel to exercise every index.
		let mut s_a1 = statement(4, 10, Some(100), 100);
		s_a1.set_topic(0, t42);
		let h_a1 = s_a1.hash();

		let mut s_a2 = statement(4, 20, Some(200), 150);
		s_a2.set_decryption_key(k7);
		let h_a2 = s_a2.hash();

		let s_a3 = statement(4, 30, None, 50);
		let h_a3 = s_a3.hash();

		// Account B = 3 (control group that must remain untouched).
		let s_b1 = statement(3, 10, None, 100);
		let h_b1 = s_b1.hash();

		let mut s_b2 = statement(3, 15, Some(300), 100);
		s_b2.set_topic(0, t42);
		s_b2.set_decryption_key(k7);
		let h_b2 = s_b2.hash();

		// Submit all statements.
		for s in [&s_a1, &s_a2, &s_a3, &s_b1, &s_b2] {
			assert!(matches!(
				store.submit(s.clone(), StatementSource::Network),
				SubmitResult::New(_)
			));
		}

		// --- Pre-conditions: everything is indexed as expected.
		{
			let idx = store.index.read();
			assert_eq!(idx.entries.len(), 5, "all 5 should be present");
			assert!(idx.accounts.contains_key(&account(4)));
			assert!(idx.accounts.contains_key(&account(3)));
			assert_eq!(idx.total_size, 100 + 150 + 50 + 100 + 100);

			// Topic and key sets contain both A & B entries.
			let set_t = idx.by_topic.get(&t42).expect("topic set exists");
			assert!(set_t.contains(&h_a1) && set_t.contains(&h_b2));

			let set_k = idx.by_dec_key.get(&Some(k7)).expect("key set exists");
			assert!(set_k.contains(&h_a2) && set_k.contains(&h_b2));
		}

		// --- Action: remove all statements by Account A.
		store.remove_by(account(4)).expect("remove_by should succeed");

		// --- Post-conditions: A's statements are gone and marked expired; B's remain.
		{
			// A's statements removed from DB view.
			for h in [h_a1, h_a2, h_a3] {
				assert!(store.statement(&h).unwrap().is_none(), "A's statement should be removed");
			}

			// B's statements still present.
			for h in [h_b1, h_b2] {
				assert!(store.statement(&h).unwrap().is_some(), "B's statement should remain");
			}

			let idx = store.index.read();

			// Account map updated.
			assert!(!idx.accounts.contains_key(&account(4)), "Account A must be gone");
			assert!(idx.accounts.contains_key(&account(3)), "Account B must remain");

			// Removed statements are marked expired.
			assert!(idx.expired.contains_key(&h_a1));
			assert!(idx.expired.contains_key(&h_a2));
			assert!(idx.expired.contains_key(&h_a3));
			assert_eq!(idx.expired.len(), 3);

			// Entry count & total_size reflect only B's data.
			assert_eq!(idx.entries.len(), 2);
			assert_eq!(idx.total_size, 100 + 100);

			// Topic index: only B2 remains for topic 42.
			let set_t = idx.by_topic.get(&t42).expect("topic set exists");
			assert!(set_t.contains(&h_b2));
			assert!(!set_t.contains(&h_a1));

			// Decryption-key index: only B2 remains for key 7.
			let set_k = idx.by_dec_key.get(&Some(k7)).expect("key set exists");
			assert!(set_k.contains(&h_b2));
			assert!(!set_k.contains(&h_a2));
		}

		// --- Idempotency: removing again is a no-op and should not error.
		store.remove_by(account(4)).expect("second remove_by should be a no-op");

		// --- Purge: advance time beyond TTL and run maintenance; expired entries disappear.
		let purge_after = store.index.read().options.purge_after_sec;
		store.set_time(purge_after + 1);
		store.maintain();
		assert_eq!(store.index.read().expired.len(), 0, "expired entries should be purged");

		// --- Reuse: Account A can submit again after purge.
		let s_new = statement(4, 40, None, 10);
		assert!(matches!(store.submit(s_new, StatementSource::Network), SubmitResult::New(_)));
	}
1758}