1#![warn(missing_docs)]
48#![warn(unused_extern_crates)]
49
50mod metrics;
51
52pub use sp_statement_store::{Error, StatementStore, MAX_TOPICS};
53
54use metrics::MetricsLink as PrometheusMetrics;
55use parking_lot::RwLock;
56use prometheus_endpoint::Registry as PrometheusRegistry;
57use sc_keystore::LocalKeystore;
58use sp_api::ProvideRuntimeApi;
59use sp_blockchain::HeaderBackend;
60use sp_core::{crypto::UncheckedFrom, hexdisplay::HexDisplay, traits::SpawnNamed, Decode, Encode};
61use sp_runtime::traits::Block as BlockT;
62use sp_statement_store::{
63 runtime_api::{
64 InvalidStatement, StatementSource, StatementStoreExt, ValidStatement, ValidateStatement,
65 },
66 AccountId, BlockHash, Channel, DecryptionKey, Hash, NetworkPriority, Proof, Result, Statement,
67 SubmitResult, Topic,
68};
69use std::{
70 collections::{BTreeMap, HashMap, HashSet},
71 sync::Arc,
72};
73
74const KEY_VERSION: &[u8] = b"version".as_slice();
75const CURRENT_VERSION: u32 = 1;
76
77const LOG_TARGET: &str = "statement-store";
78
79const DEFAULT_PURGE_AFTER_SEC: u64 = 2 * 24 * 60 * 60; const DEFAULT_MAX_TOTAL_STATEMENTS: usize = 8192;
81const DEFAULT_MAX_TOTAL_SIZE: usize = 64 * 1024 * 1024;
82
83const MAINTENANCE_PERIOD: std::time::Duration = std::time::Duration::from_secs(30);
84
85mod col {
86 pub const META: u8 = 0;
87 pub const STATEMENTS: u8 = 1;
88 pub const EXPIRED: u8 = 2;
89
90 pub const COUNT: u8 = 3;
91}
92
93#[derive(Eq, PartialEq, Debug, Ord, PartialOrd, Clone, Copy)]
94struct Priority(u32);
95
96#[derive(PartialEq, Eq)]
97struct PriorityKey {
98 hash: Hash,
99 priority: Priority,
100}
101
102impl PartialOrd for PriorityKey {
103 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
104 Some(self.cmp(other))
105 }
106}
107
108impl Ord for PriorityKey {
109 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
110 self.priority.cmp(&other.priority).then_with(|| self.hash.cmp(&other.hash))
111 }
112}
113
114#[derive(PartialEq, Eq)]
115struct ChannelEntry {
116 hash: Hash,
117 priority: Priority,
118}
119
120#[derive(Default)]
121struct StatementsForAccount {
122 by_priority: BTreeMap<PriorityKey, (Option<Channel>, usize)>,
124 channels: HashMap<Channel, ChannelEntry>,
126 data_size: usize,
128}
129
130pub struct Options {
132 max_total_statements: usize,
135 max_total_size: usize,
138 purge_after_sec: u64,
140}
141
142impl Default for Options {
143 fn default() -> Self {
144 Options {
145 max_total_statements: DEFAULT_MAX_TOTAL_STATEMENTS,
146 max_total_size: DEFAULT_MAX_TOTAL_SIZE,
147 purge_after_sec: DEFAULT_PURGE_AFTER_SEC,
148 }
149 }
150}
151
152#[derive(Default)]
153struct Index {
154 by_topic: HashMap<Topic, HashSet<Hash>>,
155 by_dec_key: HashMap<Option<DecryptionKey>, HashSet<Hash>>,
156 topics_and_keys: HashMap<Hash, ([Option<Topic>; MAX_TOPICS], Option<DecryptionKey>)>,
157 entries: HashMap<Hash, (AccountId, Priority, usize)>,
158 expired: HashMap<Hash, u64>, accounts: HashMap<AccountId, StatementsForAccount>,
160 options: Options,
161 total_size: usize,
162}
163
164struct ClientWrapper<Block, Client> {
165 client: Arc<Client>,
166 _block: std::marker::PhantomData<Block>,
167}
168
169impl<Block, Client> ClientWrapper<Block, Client>
170where
171 Block: BlockT,
172 Block::Hash: From<BlockHash>,
173 Client: ProvideRuntimeApi<Block> + HeaderBackend<Block> + Send + Sync + 'static,
174 Client::Api: ValidateStatement<Block>,
175{
176 fn validate_statement(
177 &self,
178 block: Option<BlockHash>,
179 source: StatementSource,
180 statement: Statement,
181 ) -> std::result::Result<ValidStatement, InvalidStatement> {
182 let api = self.client.runtime_api();
183 let block = block.map(Into::into).unwrap_or_else(|| {
184 self.client.info().finalized_hash
186 });
187 api.validate_statement(block, source, statement)
188 .map_err(|_| InvalidStatement::InternalError)?
189 }
190}
191
192pub struct Store {
194 db: parity_db::Db,
195 index: RwLock<Index>,
196 validate_fn: Box<
197 dyn Fn(
198 Option<BlockHash>,
199 StatementSource,
200 Statement,
201 ) -> std::result::Result<ValidStatement, InvalidStatement>
202 + Send
203 + Sync,
204 >,
205 keystore: Arc<LocalKeystore>,
206 time_override: Option<u64>,
208 metrics: PrometheusMetrics,
209}
210
211enum IndexQuery {
212 Unknown,
213 Exists,
214 Expired,
215}
216
217enum MaybeInserted {
218 Inserted(HashSet<Hash>),
219 Ignored,
220}
221
222impl Index {
223 fn new(options: Options) -> Index {
224 Index { options, ..Default::default() }
225 }
226
227 fn insert_new(&mut self, hash: Hash, account: AccountId, statement: &Statement) {
228 let mut all_topics = [None; MAX_TOPICS];
229 let mut nt = 0;
230 while let Some(t) = statement.topic(nt) {
231 self.by_topic.entry(t).or_default().insert(hash);
232 all_topics[nt] = Some(t);
233 nt += 1;
234 }
235 let key = statement.decryption_key();
236 self.by_dec_key.entry(key).or_default().insert(hash);
237 if nt > 0 || key.is_some() {
238 self.topics_and_keys.insert(hash, (all_topics, key));
239 }
240 let priority = Priority(statement.priority().unwrap_or(0));
241 self.entries.insert(hash, (account, priority, statement.data_len()));
242 self.total_size += statement.data_len();
243 let account_info = self.accounts.entry(account).or_default();
244 account_info.data_size += statement.data_len();
245 if let Some(channel) = statement.channel() {
246 account_info.channels.insert(channel, ChannelEntry { hash, priority });
247 }
248 account_info
249 .by_priority
250 .insert(PriorityKey { hash, priority }, (statement.channel(), statement.data_len()));
251 }
252
253 fn query(&self, hash: &Hash) -> IndexQuery {
254 if self.entries.contains_key(hash) {
255 return IndexQuery::Exists
256 }
257 if self.expired.contains_key(hash) {
258 return IndexQuery::Expired
259 }
260 IndexQuery::Unknown
261 }
262
263 fn insert_expired(&mut self, hash: Hash, timestamp: u64) {
264 self.expired.insert(hash, timestamp);
265 }
266
267 fn iterate_with(
268 &self,
269 key: Option<DecryptionKey>,
270 match_all_topics: &[Topic],
271 mut f: impl FnMut(&Hash) -> Result<()>,
272 ) -> Result<()> {
273 let empty = HashSet::new();
274 let mut sets: [&HashSet<Hash>; MAX_TOPICS + 1] = [∅ MAX_TOPICS + 1];
275 if match_all_topics.len() > MAX_TOPICS {
276 return Ok(())
277 }
278 let key_set = self.by_dec_key.get(&key);
279 if key_set.map_or(0, |s| s.len()) == 0 {
280 return Ok(())
282 }
283 sets[0] = key_set.expect("Function returns if key_set is None");
284 for (i, t) in match_all_topics.iter().enumerate() {
285 let set = self.by_topic.get(t);
286 if set.map_or(0, |s| s.len()) == 0 {
287 return Ok(())
289 }
290 sets[i + 1] = set.expect("Function returns if set is None");
291 }
292 let sets = &mut sets[0..match_all_topics.len() + 1];
293 sets.sort_by_key(|s| s.len());
295 for item in sets[0] {
296 if sets[1..].iter().all(|set| set.contains(item)) {
297 log::trace!(
298 target: LOG_TARGET,
299 "Iterating by topic/key: statement {:?}",
300 HexDisplay::from(item)
301 );
302 f(item)?
303 }
304 }
305 Ok(())
306 }
307
308 fn maintain(&mut self, current_time: u64) -> Vec<Hash> {
309 let mut purged = Vec::new();
311 self.expired.retain(|hash, timestamp| {
312 if *timestamp + self.options.purge_after_sec <= current_time {
313 purged.push(*hash);
314 log::trace!(target: LOG_TARGET, "Purged statement {:?}", HexDisplay::from(hash));
315 false
316 } else {
317 true
318 }
319 });
320 purged
321 }
322
323 fn make_expired(&mut self, hash: &Hash, current_time: u64) -> bool {
324 if let Some((account, priority, len)) = self.entries.remove(hash) {
325 self.total_size -= len;
326 if let Some((topics, key)) = self.topics_and_keys.remove(hash) {
327 for t in topics.into_iter().flatten() {
328 if let std::collections::hash_map::Entry::Occupied(mut set) =
329 self.by_topic.entry(t)
330 {
331 set.get_mut().remove(hash);
332 if set.get().is_empty() {
333 set.remove_entry();
334 }
335 }
336 }
337 if let std::collections::hash_map::Entry::Occupied(mut set) =
338 self.by_dec_key.entry(key)
339 {
340 set.get_mut().remove(hash);
341 if set.get().is_empty() {
342 set.remove_entry();
343 }
344 }
345 }
346 self.expired.insert(*hash, current_time);
347 if let std::collections::hash_map::Entry::Occupied(mut account_rec) =
348 self.accounts.entry(account)
349 {
350 let key = PriorityKey { hash: *hash, priority };
351 if let Some((channel, len)) = account_rec.get_mut().by_priority.remove(&key) {
352 account_rec.get_mut().data_size -= len;
353 if let Some(channel) = channel {
354 account_rec.get_mut().channels.remove(&channel);
355 }
356 }
357 if account_rec.get().by_priority.is_empty() {
358 account_rec.remove_entry();
359 }
360 }
361 log::trace!(target: LOG_TARGET, "Expired statement {:?}", HexDisplay::from(hash));
362 true
363 } else {
364 false
365 }
366 }
367
368 fn insert(
369 &mut self,
370 hash: Hash,
371 statement: &Statement,
372 account: &AccountId,
373 validation: &ValidStatement,
374 current_time: u64,
375 ) -> MaybeInserted {
376 let statement_len = statement.data_len();
377 if statement_len > validation.max_size as usize {
378 log::debug!(
379 target: LOG_TARGET,
380 "Ignored oversize message: {:?} ({} bytes)",
381 HexDisplay::from(&hash),
382 statement_len,
383 );
384 return MaybeInserted::Ignored
385 }
386
387 let mut evicted = HashSet::new();
388 let mut would_free_size = 0;
389 let priority = Priority(statement.priority().unwrap_or(0));
390 let (max_size, max_count) = (validation.max_size as usize, validation.max_count as usize);
391 if let Some(account_rec) = self.accounts.get(account) {
395 if let Some(channel) = statement.channel() {
396 if let Some(channel_record) = account_rec.channels.get(&channel) {
397 if priority <= channel_record.priority {
398 log::debug!(
400 target: LOG_TARGET,
401 "Ignored lower priority channel message: {:?} {:?} <= {:?}",
402 HexDisplay::from(&hash),
403 priority,
404 channel_record.priority,
405 );
406 return MaybeInserted::Ignored
407 } else {
408 log::debug!(
411 target: LOG_TARGET,
412 "Replacing higher priority channel message: {:?} ({:?}) > {:?} ({:?})",
413 HexDisplay::from(&hash),
414 priority,
415 HexDisplay::from(&channel_record.hash),
416 channel_record.priority,
417 );
418 let key = PriorityKey {
419 hash: channel_record.hash,
420 priority: channel_record.priority,
421 };
422 if let Some((_channel, len)) = account_rec.by_priority.get(&key) {
423 would_free_size += *len;
424 evicted.insert(channel_record.hash);
425 }
426 }
427 }
428 }
429 for (entry, (_, len)) in account_rec.by_priority.iter() {
431 if (account_rec.data_size - would_free_size + statement_len <= max_size) &&
432 account_rec.by_priority.len() + 1 - evicted.len() <= max_count
433 {
434 break
436 }
437 if evicted.contains(&entry.hash) {
438 continue
440 }
441 if entry.priority >= priority {
442 log::debug!(
443 target: LOG_TARGET,
444 "Ignored message due to constraints {:?} {:?} < {:?}",
445 HexDisplay::from(&hash),
446 priority,
447 entry.priority,
448 );
449 return MaybeInserted::Ignored
450 }
451 evicted.insert(entry.hash);
452 would_free_size += len;
453 }
454 }
455 if !((self.total_size - would_free_size + statement_len <= self.options.max_total_size) &&
457 self.entries.len() + 1 - evicted.len() <= self.options.max_total_statements)
458 {
459 log::debug!(
460 target: LOG_TARGET,
461 "Ignored statement {} because the store is full (size={}, count={})",
462 HexDisplay::from(&hash),
463 self.total_size,
464 self.entries.len(),
465 );
466 return MaybeInserted::Ignored
467 }
468
469 for h in &evicted {
470 self.make_expired(h, current_time);
471 }
472 self.insert_new(hash, *account, statement);
473 MaybeInserted::Inserted(evicted)
474 }
475}
476
477impl Store {
478 pub fn new_shared<Block, Client>(
481 path: &std::path::Path,
482 options: Options,
483 client: Arc<Client>,
484 keystore: Arc<LocalKeystore>,
485 prometheus: Option<&PrometheusRegistry>,
486 task_spawner: &dyn SpawnNamed,
487 ) -> Result<Arc<Store>>
488 where
489 Block: BlockT,
490 Block::Hash: From<BlockHash>,
491 Client: ProvideRuntimeApi<Block> + HeaderBackend<Block> + Send + Sync + 'static,
492 Client::Api: ValidateStatement<Block>,
493 {
494 let store = Arc::new(Self::new(path, options, client, keystore, prometheus)?);
495
496 let worker_store = store.clone();
498 task_spawner.spawn(
499 "statement-store-maintenance",
500 Some("statement-store"),
501 Box::pin(async move {
502 let mut interval = tokio::time::interval(MAINTENANCE_PERIOD);
503 loop {
504 interval.tick().await;
505 worker_store.maintain();
506 }
507 }),
508 );
509
510 Ok(store)
511 }
512
513 fn new<Block, Client>(
516 path: &std::path::Path,
517 options: Options,
518 client: Arc<Client>,
519 keystore: Arc<LocalKeystore>,
520 prometheus: Option<&PrometheusRegistry>,
521 ) -> Result<Store>
522 where
523 Block: BlockT,
524 Block::Hash: From<BlockHash>,
525 Client: ProvideRuntimeApi<Block> + HeaderBackend<Block> + Send + Sync + 'static,
526 Client::Api: ValidateStatement<Block>,
527 {
528 let mut path: std::path::PathBuf = path.into();
529 path.push("statements");
530
531 let mut config = parity_db::Options::with_columns(&path, col::COUNT);
532
533 let statement_col = &mut config.columns[col::STATEMENTS as usize];
534 statement_col.ref_counted = false;
535 statement_col.preimage = true;
536 statement_col.uniform = true;
537 let db = parity_db::Db::open_or_create(&config).map_err(|e| Error::Db(e.to_string()))?;
538 match db.get(col::META, &KEY_VERSION).map_err(|e| Error::Db(e.to_string()))? {
539 Some(version) => {
540 let version = u32::from_le_bytes(
541 version
542 .try_into()
543 .map_err(|_| Error::Db("Error reading database version".into()))?,
544 );
545 if version != CURRENT_VERSION {
546 return Err(Error::Db(format!("Unsupported database version: {version}")))
547 }
548 },
549 None => {
550 db.commit([(
551 col::META,
552 KEY_VERSION.to_vec(),
553 Some(CURRENT_VERSION.to_le_bytes().to_vec()),
554 )])
555 .map_err(|e| Error::Db(e.to_string()))?;
556 },
557 }
558
559 let validator = ClientWrapper { client, _block: Default::default() };
560 let validate_fn = Box::new(move |block, source, statement| {
561 validator.validate_statement(block, source, statement)
562 });
563
564 let store = Store {
565 db,
566 index: RwLock::new(Index::new(options)),
567 validate_fn,
568 keystore,
569 time_override: None,
570 metrics: PrometheusMetrics::new(prometheus),
571 };
572 store.populate()?;
573 Ok(store)
574 }
575
576 fn populate(&self) -> Result<()> {
581 {
582 let mut index = self.index.write();
583 self.db
584 .iter_column_while(col::STATEMENTS, |item| {
585 let statement = item.value;
586 if let Ok(statement) = Statement::decode(&mut statement.as_slice()) {
587 let hash = statement.hash();
588 log::trace!(
589 target: LOG_TARGET,
590 "Statement loaded {:?}",
591 HexDisplay::from(&hash)
592 );
593 if let Some(account_id) = statement.account_id() {
594 index.insert_new(hash, account_id, &statement);
595 } else {
596 log::debug!(
597 target: LOG_TARGET,
598 "Error decoding statement loaded from the DB: {:?}",
599 HexDisplay::from(&hash)
600 );
601 }
602 }
603 true
604 })
605 .map_err(|e| Error::Db(e.to_string()))?;
606 self.db
607 .iter_column_while(col::EXPIRED, |item| {
608 let expired_info = item.value;
609 if let Ok((hash, timestamp)) =
610 <(Hash, u64)>::decode(&mut expired_info.as_slice())
611 {
612 log::trace!(
613 target: LOG_TARGET,
614 "Statement loaded (expired): {:?}",
615 HexDisplay::from(&hash)
616 );
617 index.insert_expired(hash, timestamp);
618 }
619 true
620 })
621 .map_err(|e| Error::Db(e.to_string()))?;
622 }
623
624 self.maintain();
625 Ok(())
626 }
627
628 fn collect_statements<R>(
629 &self,
630 key: Option<DecryptionKey>,
631 match_all_topics: &[Topic],
632 mut f: impl FnMut(Statement) -> Option<R>,
633 ) -> Result<Vec<R>> {
634 let mut result = Vec::new();
635 let index = self.index.read();
636 index.iterate_with(key, match_all_topics, |hash| {
637 match self.db.get(col::STATEMENTS, hash).map_err(|e| Error::Db(e.to_string()))? {
638 Some(entry) => {
639 if let Ok(statement) = Statement::decode(&mut entry.as_slice()) {
640 if let Some(data) = f(statement) {
641 result.push(data);
642 }
643 } else {
644 log::warn!(
646 target: LOG_TARGET,
647 "Corrupt statement {:?}",
648 HexDisplay::from(hash)
649 );
650 }
651 },
652 None => {
653 log::warn!(
655 target: LOG_TARGET,
656 "Missing statement {:?}",
657 HexDisplay::from(hash)
658 );
659 },
660 }
661 Ok(())
662 })?;
663 Ok(result)
664 }
665
666 pub fn maintain(&self) {
668 log::trace!(target: LOG_TARGET, "Started store maintenance");
669 let (deleted, active_count, expired_count): (Vec<_>, usize, usize) = {
670 let mut index = self.index.write();
671 let deleted = index.maintain(self.timestamp());
672 (deleted, index.entries.len(), index.expired.len())
673 };
674 let deleted: Vec<_> =
675 deleted.into_iter().map(|hash| (col::EXPIRED, hash.to_vec(), None)).collect();
676 let deleted_count = deleted.len() as u64;
677 if let Err(e) = self.db.commit(deleted) {
678 log::warn!(target: LOG_TARGET, "Error writing to the statement database: {:?}", e);
679 } else {
680 self.metrics.report(|metrics| metrics.statements_pruned.inc_by(deleted_count));
681 }
682 log::trace!(
683 target: LOG_TARGET,
684 "Completed store maintenance. Purged: {}, Active: {}, Expired: {}",
685 deleted_count,
686 active_count,
687 expired_count
688 );
689 }
690
691 fn timestamp(&self) -> u64 {
692 self.time_override.unwrap_or_else(|| {
693 std::time::SystemTime::now()
694 .duration_since(std::time::UNIX_EPOCH)
695 .unwrap_or_default()
696 .as_secs()
697 })
698 }
699
700 #[cfg(test)]
701 fn set_time(&mut self, time: u64) {
702 self.time_override = Some(time);
703 }
704
705 pub fn as_statement_store_ext(self: Arc<Self>) -> StatementStoreExt {
707 StatementStoreExt::new(self)
708 }
709
710 fn posted_clear_inner<R>(
713 &self,
714 match_all_topics: &[Topic],
715 dest: [u8; 32],
716 mut map_f: impl FnMut(Statement, Vec<u8>) -> R,
718 ) -> Result<Vec<R>> {
719 self.collect_statements(Some(dest), match_all_topics, |statement| {
720 if let (Some(key), Some(_)) = (statement.decryption_key(), statement.data()) {
721 let public: sp_core::ed25519::Public = UncheckedFrom::unchecked_from(key);
722 let public: sp_statement_store::ed25519::Public = public.into();
723 match self.keystore.key_pair::<sp_statement_store::ed25519::Pair>(&public) {
724 Err(e) => {
725 log::debug!(
726 target: LOG_TARGET,
727 "Keystore error: {:?}, for statement {:?}",
728 e,
729 HexDisplay::from(&statement.hash())
730 );
731 None
732 },
733 Ok(None) => {
734 log::debug!(
735 target: LOG_TARGET,
736 "Keystore is missing key for statement {:?}",
737 HexDisplay::from(&statement.hash())
738 );
739 None
740 },
741 Ok(Some(pair)) => match statement.decrypt_private(&pair.into_inner()) {
742 Ok(r) => r.map(|data| map_f(statement, data)),
743 Err(e) => {
744 log::debug!(
745 target: LOG_TARGET,
746 "Decryption error: {:?}, for statement {:?}",
747 e,
748 HexDisplay::from(&statement.hash())
749 );
750 None
751 },
752 },
753 }
754 } else {
755 None
756 }
757 })
758 }
759}
760
761impl StatementStore for Store {
762 fn statements(&self) -> Result<Vec<(Hash, Statement)>> {
764 let index = self.index.read();
765 let mut result = Vec::with_capacity(index.entries.len());
766 for h in index.entries.keys() {
767 let encoded = self.db.get(col::STATEMENTS, h).map_err(|e| Error::Db(e.to_string()))?;
768 if let Some(encoded) = encoded {
769 if let Ok(statement) = Statement::decode(&mut encoded.as_slice()) {
770 let hash = statement.hash();
771 result.push((hash, statement));
772 }
773 }
774 }
775 Ok(result)
776 }
777
778 fn statement(&self, hash: &Hash) -> Result<Option<Statement>> {
780 Ok(
781 match self
782 .db
783 .get(col::STATEMENTS, hash.as_slice())
784 .map_err(|e| Error::Db(e.to_string()))?
785 {
786 Some(entry) => {
787 log::trace!(
788 target: LOG_TARGET,
789 "Queried statement {:?}",
790 HexDisplay::from(hash)
791 );
792 Some(
793 Statement::decode(&mut entry.as_slice())
794 .map_err(|e| Error::Decode(e.to_string()))?,
795 )
796 },
797 None => {
798 log::trace!(
799 target: LOG_TARGET,
800 "Queried missing statement {:?}",
801 HexDisplay::from(hash)
802 );
803 None
804 },
805 },
806 )
807 }
808
809 fn broadcasts(&self, match_all_topics: &[Topic]) -> Result<Vec<Vec<u8>>> {
812 self.collect_statements(None, match_all_topics, |statement| statement.into_data())
813 }
814
815 fn posted(&self, match_all_topics: &[Topic], dest: [u8; 32]) -> Result<Vec<Vec<u8>>> {
819 self.collect_statements(Some(dest), match_all_topics, |statement| statement.into_data())
820 }
821
822 fn posted_clear(&self, match_all_topics: &[Topic], dest: [u8; 32]) -> Result<Vec<Vec<u8>>> {
825 self.posted_clear_inner(match_all_topics, dest, |_statement, data| data)
826 }
827
828 fn broadcasts_stmt(&self, match_all_topics: &[Topic]) -> Result<Vec<Vec<u8>>> {
831 self.collect_statements(None, match_all_topics, |statement| Some(statement.encode()))
832 }
833
834 fn posted_stmt(&self, match_all_topics: &[Topic], dest: [u8; 32]) -> Result<Vec<Vec<u8>>> {
838 self.collect_statements(Some(dest), match_all_topics, |statement| Some(statement.encode()))
839 }
840
841 fn posted_clear_stmt(
844 &self,
845 match_all_topics: &[Topic],
846 dest: [u8; 32],
847 ) -> Result<Vec<Vec<u8>>> {
848 self.posted_clear_inner(match_all_topics, dest, |statement, data| {
849 let mut res = Vec::with_capacity(statement.size_hint() + data.len());
850 statement.encode_to(&mut res);
851 res.extend_from_slice(&data);
852 res
853 })
854 }
855
856 fn submit(&self, statement: Statement, source: StatementSource) -> SubmitResult {
858 let hash = statement.hash();
859 match self.index.read().query(&hash) {
860 IndexQuery::Expired =>
861 if !source.can_be_resubmitted() {
862 return SubmitResult::KnownExpired
863 },
864 IndexQuery::Exists =>
865 if !source.can_be_resubmitted() {
866 return SubmitResult::Known
867 },
868 IndexQuery::Unknown => {},
869 }
870
871 let Some(account_id) = statement.account_id() else {
872 log::debug!(
873 target: LOG_TARGET,
874 "Statement validation failed: Missing proof ({:?})",
875 HexDisplay::from(&hash),
876 );
877 self.metrics.report(|metrics| metrics.validations_invalid.inc());
878 return SubmitResult::Bad("No statement proof")
879 };
880
881 let at_block = if let Some(Proof::OnChain { block_hash, .. }) = statement.proof() {
883 Some(*block_hash)
884 } else {
885 None
886 };
887 let validation_result = (self.validate_fn)(at_block, source, statement.clone());
888 let validation = match validation_result {
889 Ok(validation) => validation,
890 Err(InvalidStatement::BadProof) => {
891 log::debug!(
892 target: LOG_TARGET,
893 "Statement validation failed: BadProof, {:?}",
894 HexDisplay::from(&hash),
895 );
896 self.metrics.report(|metrics| metrics.validations_invalid.inc());
897 return SubmitResult::Bad("Bad statement proof")
898 },
899 Err(InvalidStatement::NoProof) => {
900 log::debug!(
901 target: LOG_TARGET,
902 "Statement validation failed: NoProof, {:?}",
903 HexDisplay::from(&hash),
904 );
905 self.metrics.report(|metrics| metrics.validations_invalid.inc());
906 return SubmitResult::Bad("Missing statement proof")
907 },
908 Err(InvalidStatement::InternalError) =>
909 return SubmitResult::InternalError(Error::Runtime),
910 };
911
912 let current_time = self.timestamp();
913 let mut commit = Vec::new();
914 {
915 let mut index = self.index.write();
916
917 let evicted =
918 match index.insert(hash, &statement, &account_id, &validation, current_time) {
919 MaybeInserted::Ignored => return SubmitResult::Ignored,
920 MaybeInserted::Inserted(evicted) => evicted,
921 };
922
923 commit.push((col::STATEMENTS, hash.to_vec(), Some(statement.encode())));
924 for hash in evicted {
925 commit.push((col::STATEMENTS, hash.to_vec(), None));
926 commit.push((col::EXPIRED, hash.to_vec(), Some((hash, current_time).encode())));
927 }
928 if let Err(e) = self.db.commit(commit) {
929 log::debug!(
930 target: LOG_TARGET,
931 "Statement validation failed: database error {}, {:?}",
932 e,
933 statement
934 );
935 return SubmitResult::InternalError(Error::Db(e.to_string()))
936 }
937 } self.metrics.report(|metrics| metrics.submitted_statements.inc());
939 let network_priority = NetworkPriority::High;
940 log::trace!(target: LOG_TARGET, "Statement submitted: {:?}", HexDisplay::from(&hash));
941 SubmitResult::New(network_priority)
942 }
943
944 fn remove(&self, hash: &Hash) -> Result<()> {
946 let current_time = self.timestamp();
947 {
948 let mut index = self.index.write();
949 if index.make_expired(hash, current_time) {
950 let commit = [
951 (col::STATEMENTS, hash.to_vec(), None),
952 (col::EXPIRED, hash.to_vec(), Some((hash, current_time).encode())),
953 ];
954 if let Err(e) = self.db.commit(commit) {
955 log::debug!(
956 target: LOG_TARGET,
957 "Error removing statement: database error {}, {:?}",
958 e,
959 HexDisplay::from(hash),
960 );
961 return Err(Error::Db(e.to_string()))
962 }
963 }
964 }
965 Ok(())
966 }
967
968 fn remove_by(&self, who: [u8; 32]) -> Result<()> {
970 let mut index = self.index.write();
971 let mut evicted = Vec::new();
972 if let Some(account_rec) = index.accounts.get(&who) {
973 evicted.extend(account_rec.by_priority.keys().map(|k| k.hash));
974 }
975
976 let current_time = self.timestamp();
977 let mut commit = Vec::new();
978 for hash in evicted {
979 index.make_expired(&hash, current_time);
980 commit.push((col::STATEMENTS, hash.to_vec(), None));
981 commit.push((col::EXPIRED, hash.to_vec(), Some((hash, current_time).encode())));
982 }
983 self.db.commit(commit).map_err(|e| {
984 log::debug!(
985 target: LOG_TARGET,
986 "Error removing statement: database error {}, remove by {:?}",
987 e,
988 HexDisplay::from(&who),
989 );
990
991 Error::Db(e.to_string())
992 })
993 }
994}
995
996#[cfg(test)]
997mod tests {
998 use crate::Store;
999 use sc_keystore::Keystore;
1000 use sp_core::{Decode, Encode, Pair};
1001 use sp_statement_store::{
1002 runtime_api::{InvalidStatement, ValidStatement, ValidateStatement},
1003 AccountId, Channel, DecryptionKey, NetworkPriority, Proof, SignatureVerificationResult,
1004 Statement, StatementSource, StatementStore, SubmitResult, Topic,
1005 };
1006
1007 type Extrinsic = sp_runtime::OpaqueExtrinsic;
1008 type Hash = sp_core::H256;
1009 type Hashing = sp_runtime::traits::BlakeTwo256;
1010 type BlockNumber = u64;
1011 type Header = sp_runtime::generic::Header<BlockNumber, Hashing>;
1012 type Block = sp_runtime::generic::Block<Header, Extrinsic>;
1013
1014 const CORRECT_BLOCK_HASH: [u8; 32] = [1u8; 32];
1015
1016 #[derive(Clone)]
1017 pub(crate) struct TestClient;
1018
1019 pub(crate) struct RuntimeApi {
1020 _inner: TestClient,
1021 }
1022
1023 impl sp_api::ProvideRuntimeApi<Block> for TestClient {
1024 type Api = RuntimeApi;
1025 fn runtime_api(&self) -> sp_api::ApiRef<Self::Api> {
1026 RuntimeApi { _inner: self.clone() }.into()
1027 }
1028 }
1029
1030 sp_api::mock_impl_runtime_apis! {
1031 impl ValidateStatement<Block> for RuntimeApi {
1032 fn validate_statement(
1033 _source: StatementSource,
1034 statement: Statement,
1035 ) -> std::result::Result<ValidStatement, InvalidStatement> {
1036 use crate::tests::account;
1037 match statement.verify_signature() {
1038 SignatureVerificationResult::Valid(_) => Ok(ValidStatement{max_count: 100, max_size: 1000}),
1039 SignatureVerificationResult::Invalid => Err(InvalidStatement::BadProof),
1040 SignatureVerificationResult::NoSignature => {
1041 if let Some(Proof::OnChain { block_hash, .. }) = statement.proof() {
1042 if block_hash == &CORRECT_BLOCK_HASH {
1043 let (max_count, max_size) = match statement.account_id() {
1044 Some(a) if a == account(1) => (1, 1000),
1045 Some(a) if a == account(2) => (2, 1000),
1046 Some(a) if a == account(3) => (3, 1000),
1047 Some(a) if a == account(4) => (4, 1000),
1048 _ => (2, 2000),
1049 };
1050 Ok(ValidStatement{ max_count, max_size })
1051 } else {
1052 Err(InvalidStatement::BadProof)
1053 }
1054 } else {
1055 Err(InvalidStatement::BadProof)
1056 }
1057 }
1058 }
1059 }
1060 }
1061 }
1062
1063 impl sp_blockchain::HeaderBackend<Block> for TestClient {
1064 fn header(&self, _hash: Hash) -> sp_blockchain::Result<Option<Header>> {
1065 unimplemented!()
1066 }
1067 fn info(&self) -> sp_blockchain::Info<Block> {
1068 sp_blockchain::Info {
1069 best_hash: CORRECT_BLOCK_HASH.into(),
1070 best_number: 0,
1071 genesis_hash: Default::default(),
1072 finalized_hash: CORRECT_BLOCK_HASH.into(),
1073 finalized_number: 1,
1074 finalized_state: None,
1075 number_leaves: 0,
1076 block_gap: None,
1077 }
1078 }
1079 fn status(&self, _hash: Hash) -> sp_blockchain::Result<sp_blockchain::BlockStatus> {
1080 unimplemented!()
1081 }
1082 fn number(&self, _hash: Hash) -> sp_blockchain::Result<Option<BlockNumber>> {
1083 unimplemented!()
1084 }
1085 fn hash(&self, _number: BlockNumber) -> sp_blockchain::Result<Option<Hash>> {
1086 unimplemented!()
1087 }
1088 }
1089
1090 fn test_store() -> (Store, tempfile::TempDir) {
1091 sp_tracing::init_for_tests();
1092 let temp_dir = tempfile::Builder::new().tempdir().expect("Error creating test dir");
1093
1094 let client = std::sync::Arc::new(TestClient);
1095 let mut path: std::path::PathBuf = temp_dir.path().into();
1096 path.push("db");
1097 let keystore = std::sync::Arc::new(sc_keystore::LocalKeystore::in_memory());
1098 let store = Store::new(&path, Default::default(), client, keystore, None).unwrap();
1099 (store, temp_dir) }
1101
1102 fn signed_statement(data: u8) -> Statement {
1103 signed_statement_with_topics(data, &[], None)
1104 }
1105
1106 fn signed_statement_with_topics(
1107 data: u8,
1108 topics: &[Topic],
1109 dec_key: Option<DecryptionKey>,
1110 ) -> Statement {
1111 let mut statement = Statement::new();
1112 statement.set_plain_data(vec![data]);
1113 for i in 0..topics.len() {
1114 statement.set_topic(i, topics[i]);
1115 }
1116 if let Some(key) = dec_key {
1117 statement.set_decryption_key(key);
1118 }
1119 let kp = sp_core::ed25519::Pair::from_string("//Alice", None).unwrap();
1120 statement.sign_ed25519_private(&kp);
1121 statement
1122 }
1123
1124 fn topic(data: u64) -> Topic {
1125 let mut topic: Topic = Default::default();
1126 topic[0..8].copy_from_slice(&data.to_le_bytes());
1127 topic
1128 }
1129
1130 fn dec_key(data: u64) -> DecryptionKey {
1131 let mut dec_key: DecryptionKey = Default::default();
1132 dec_key[0..8].copy_from_slice(&data.to_le_bytes());
1133 dec_key
1134 }
1135
1136 fn account(id: u64) -> AccountId {
1137 let mut account: AccountId = Default::default();
1138 account[0..8].copy_from_slice(&id.to_le_bytes());
1139 account
1140 }
1141
1142 fn channel(id: u64) -> Channel {
1143 let mut channel: Channel = Default::default();
1144 channel[0..8].copy_from_slice(&id.to_le_bytes());
1145 channel
1146 }
1147
1148 fn statement(account_id: u64, priority: u32, c: Option<u64>, data_len: usize) -> Statement {
1149 let mut statement = Statement::new();
1150 let mut data = Vec::new();
1151 data.resize(data_len, 0);
1152 statement.set_plain_data(data);
1153 statement.set_priority(priority);
1154 if let Some(c) = c {
1155 statement.set_channel(channel(c));
1156 }
1157 statement.set_proof(Proof::OnChain {
1158 block_hash: CORRECT_BLOCK_HASH,
1159 who: account(account_id),
1160 event_index: 0,
1161 });
1162 statement
1163 }
1164
1165 #[test]
1166 fn submit_one() {
1167 let (store, _temp) = test_store();
1168 let statement0 = signed_statement(0);
1169 assert_eq!(
1170 store.submit(statement0, StatementSource::Network),
1171 SubmitResult::New(NetworkPriority::High)
1172 );
1173 let unsigned = statement(0, 1, None, 0);
1174 assert_eq!(
1175 store.submit(unsigned, StatementSource::Network),
1176 SubmitResult::New(NetworkPriority::High)
1177 );
1178 }
1179
1180 #[test]
1181 fn save_and_load_statements() {
1182 let (store, temp) = test_store();
1183 let statement0 = signed_statement(0);
1184 let statement1 = signed_statement(1);
1185 let statement2 = signed_statement(2);
1186 assert_eq!(
1187 store.submit(statement0.clone(), StatementSource::Network),
1188 SubmitResult::New(NetworkPriority::High)
1189 );
1190 assert_eq!(
1191 store.submit(statement1.clone(), StatementSource::Network),
1192 SubmitResult::New(NetworkPriority::High)
1193 );
1194 assert_eq!(
1195 store.submit(statement2.clone(), StatementSource::Network),
1196 SubmitResult::New(NetworkPriority::High)
1197 );
1198 assert_eq!(store.statements().unwrap().len(), 3);
1199 assert_eq!(store.broadcasts(&[]).unwrap().len(), 3);
1200 assert_eq!(store.statement(&statement1.hash()).unwrap(), Some(statement1.clone()));
1201 let keystore = store.keystore.clone();
1202 drop(store);
1203
1204 let client = std::sync::Arc::new(TestClient);
1205 let mut path: std::path::PathBuf = temp.path().into();
1206 path.push("db");
1207 let store = Store::new(&path, Default::default(), client, keystore, None).unwrap();
1208 assert_eq!(store.statements().unwrap().len(), 3);
1209 assert_eq!(store.broadcasts(&[]).unwrap().len(), 3);
1210 assert_eq!(store.statement(&statement1.hash()).unwrap(), Some(statement1));
1211 }
1212
1213 #[test]
1214 fn search_by_topic_and_key() {
1215 let (store, _temp) = test_store();
1216 let statement0 = signed_statement(0);
1217 let statement1 = signed_statement_with_topics(1, &[topic(0)], None);
1218 let statement2 = signed_statement_with_topics(2, &[topic(0), topic(1)], Some(dec_key(2)));
1219 let statement3 = signed_statement_with_topics(3, &[topic(0), topic(1), topic(2)], None);
1220 let statement4 =
1221 signed_statement_with_topics(4, &[topic(0), topic(42), topic(2), topic(3)], None);
1222 let statements = vec![statement0, statement1, statement2, statement3, statement4];
1223 for s in &statements {
1224 store.submit(s.clone(), StatementSource::Network);
1225 }
1226
1227 let assert_topics = |topics: &[u64], key: Option<u64>, expected: &[u8]| {
1228 let key = key.map(dec_key);
1229 let topics: Vec<_> = topics.iter().map(|t| topic(*t)).collect();
1230 let mut got_vals: Vec<_> = if let Some(key) = key {
1231 store.posted(&topics, key).unwrap().into_iter().map(|d| d[0]).collect()
1232 } else {
1233 store.broadcasts(&topics).unwrap().into_iter().map(|d| d[0]).collect()
1234 };
1235 got_vals.sort();
1236 assert_eq!(expected.to_vec(), got_vals);
1237 };
1238
1239 assert_topics(&[], None, &[0, 1, 3, 4]);
1240 assert_topics(&[], Some(2), &[2]);
1241 assert_topics(&[0], None, &[1, 3, 4]);
1242 assert_topics(&[1], None, &[3]);
1243 assert_topics(&[2], None, &[3, 4]);
1244 assert_topics(&[3], None, &[4]);
1245 assert_topics(&[42], None, &[4]);
1246
1247 assert_topics(&[0, 1], None, &[3]);
1248 assert_topics(&[0, 1], Some(2), &[2]);
1249 assert_topics(&[0, 1, 99], Some(2), &[]);
1250 assert_topics(&[1, 2], None, &[3]);
1251 assert_topics(&[99], None, &[]);
1252 assert_topics(&[0, 99], None, &[]);
1253 assert_topics(&[0, 1, 2, 3, 42], None, &[]);
1254 }
1255
1256 #[test]
1257 fn constraints() {
1258 let (store, _temp) = test_store();
1259
1260 store.index.write().options.max_total_size = 3000;
1261 let source = StatementSource::Network;
1262 let ok = SubmitResult::New(NetworkPriority::High);
1263 let ignored = SubmitResult::Ignored;
1264
1265 assert_eq!(store.submit(statement(1, 1, Some(1), 2000), source), ignored);
1269 assert_eq!(store.submit(statement(1, 1, Some(1), 500), source), ok);
1270 assert_eq!(store.submit(statement(1, 1, Some(1), 200), source), ignored);
1272 assert_eq!(store.submit(statement(1, 2, Some(1), 600), source), ok);
1273 assert_eq!(store.submit(statement(1, 1, Some(2), 100), source), ignored);
1276 assert_eq!(store.index.read().expired.len(), 1);
1277
1278 assert_eq!(store.submit(statement(2, 1, None, 500), source), ok);
1281 assert_eq!(store.submit(statement(2, 2, None, 100), source), ok);
1282 assert_eq!(store.submit(statement(2, 3, None, 500), source), ok);
1284 assert_eq!(store.index.read().expired.len(), 2);
1285 assert_eq!(store.submit(statement(2, 4, None, 1000), source), ok);
1287 assert_eq!(store.index.read().expired.len(), 4);
1288
1289 assert_eq!(store.submit(statement(3, 2, Some(1), 300), source), ok);
1292 assert_eq!(store.submit(statement(3, 3, Some(2), 300), source), ok);
1293 assert_eq!(store.submit(statement(3, 4, Some(3), 300), source), ok);
1294 assert_eq!(store.submit(statement(3, 5, None, 500), source), ok);
1296 assert_eq!(store.index.read().expired.len(), 6);
1297
1298 assert_eq!(store.index.read().total_size, 2400);
1299 assert_eq!(store.index.read().entries.len(), 4);
1300
1301 assert_eq!(store.submit(statement(1, 1, None, 700), source), ignored);
1303 store.index.write().options.max_total_statements = 4;
1305 assert_eq!(store.submit(statement(1, 1, None, 100), source), ignored);
1306
1307 let mut expected_statements = vec![
1308 statement(1, 2, Some(1), 600).hash(),
1309 statement(2, 4, None, 1000).hash(),
1310 statement(3, 4, Some(3), 300).hash(),
1311 statement(3, 5, None, 500).hash(),
1312 ];
1313 expected_statements.sort();
1314 let mut statements: Vec<_> =
1315 store.statements().unwrap().into_iter().map(|(hash, _)| hash).collect();
1316 statements.sort();
1317 assert_eq!(expected_statements, statements);
1318 }
1319
1320 #[test]
1321 fn expired_statements_are_purged() {
1322 use super::DEFAULT_PURGE_AFTER_SEC;
1323 let (mut store, temp) = test_store();
1324 let mut statement = statement(1, 1, Some(3), 100);
1325 store.set_time(0);
1326 statement.set_topic(0, topic(4));
1327 store.submit(statement.clone(), StatementSource::Network);
1328 assert_eq!(store.index.read().entries.len(), 1);
1329 store.remove(&statement.hash()).unwrap();
1330 assert_eq!(store.index.read().entries.len(), 0);
1331 assert_eq!(store.index.read().accounts.len(), 0);
1332 store.set_time(DEFAULT_PURGE_AFTER_SEC + 1);
1333 store.maintain();
1334 assert_eq!(store.index.read().expired.len(), 0);
1335 let keystore = store.keystore.clone();
1336 drop(store);
1337
1338 let client = std::sync::Arc::new(TestClient);
1339 let mut path: std::path::PathBuf = temp.path().into();
1340 path.push("db");
1341 let store = Store::new(&path, Default::default(), client, keystore, None).unwrap();
1342 assert_eq!(store.statements().unwrap().len(), 0);
1343 assert_eq!(store.index.read().expired.len(), 0);
1344 }
1345
1346 #[test]
1347 fn posted_clear_decrypts() {
1348 let (store, _temp) = test_store();
1349 let public = store
1350 .keystore
1351 .ed25519_generate_new(sp_core::crypto::key_types::STATEMENT, None)
1352 .unwrap();
1353 let statement1 = statement(1, 1, None, 100);
1354 let mut statement2 = statement(1, 2, None, 0);
1355 let plain = b"The most valuable secret".to_vec();
1356 statement2.encrypt(&plain, &public).unwrap();
1357 store.submit(statement1, StatementSource::Network);
1358 store.submit(statement2, StatementSource::Network);
1359 let posted_clear = store.posted_clear(&[], public.into()).unwrap();
1360 assert_eq!(posted_clear, vec![plain]);
1361 }
1362
1363 #[test]
1364 fn broadcasts_stmt_returns_encoded_statements() {
1365 let (store, _tmp) = test_store();
1366
1367 let s0 = signed_statement_with_topics(0, &[], None);
1369 let s1 = signed_statement_with_topics(1, &[topic(42)], None);
1371 let s2 = signed_statement_with_topics(2, &[topic(42)], Some(dec_key(99)));
1373
1374 for s in [&s0, &s1, &s2] {
1375 store.submit(s.clone(), StatementSource::Network);
1376 }
1377
1378 let mut hashes: Vec<_> = store
1380 .broadcasts_stmt(&[])
1381 .unwrap()
1382 .into_iter()
1383 .map(|bytes| Statement::decode(&mut &bytes[..]).unwrap().hash())
1384 .collect();
1385 hashes.sort();
1386 let expected_hashes = {
1387 let mut e = vec![s0.hash(), s1.hash()];
1388 e.sort();
1389 e
1390 };
1391 assert_eq!(hashes, expected_hashes);
1392
1393 let got = store.broadcasts_stmt(&[topic(42)]).unwrap();
1395 assert_eq!(got.len(), 1);
1396 let st = Statement::decode(&mut &got[0][..]).unwrap();
1397 assert_eq!(st.hash(), s1.hash());
1398 }
1399
1400 #[test]
1401 fn posted_stmt_returns_encoded_statements_for_dest() {
1402 let (store, _tmp) = test_store();
1403
1404 let public1 = store
1405 .keystore
1406 .ed25519_generate_new(sp_core::crypto::key_types::STATEMENT, None)
1407 .unwrap();
1408 let dest: [u8; 32] = public1.into();
1409
1410 let public2 = store
1411 .keystore
1412 .ed25519_generate_new(sp_core::crypto::key_types::STATEMENT, None)
1413 .unwrap();
1414
1415 let mut s_with_key = statement(1, 1, None, 0);
1417 let plain1 = b"The most valuable secret".to_vec();
1418 s_with_key.encrypt(&plain1, &public1).unwrap();
1419
1420 let mut s_other_key = statement(2, 2, None, 0);
1422 let plain2 = b"The second most valuable secret".to_vec();
1423 s_other_key.encrypt(&plain2, &public2).unwrap();
1424
1425 for s in [&s_with_key, &s_other_key] {
1427 store.submit(s.clone(), StatementSource::Network);
1428 }
1429
1430 let retrieved = store.posted_stmt(&[], dest).unwrap();
1432 assert_eq!(retrieved.len(), 1, "Only one statement has dec_key=dest");
1433
1434 let returned_stmt = Statement::decode(&mut &retrieved[0][..]).unwrap();
1436 assert_eq!(
1437 returned_stmt.hash(),
1438 s_with_key.hash(),
1439 "Returned statement must match s_with_key"
1440 );
1441 }
1442
1443 #[test]
1444 fn posted_clear_stmt_returns_statement_followed_by_plain_data() {
1445 let (store, _tmp) = test_store();
1446
1447 let public1 = store
1448 .keystore
1449 .ed25519_generate_new(sp_core::crypto::key_types::STATEMENT, None)
1450 .unwrap();
1451 let dest: [u8; 32] = public1.into();
1452
1453 let public2 = store
1454 .keystore
1455 .ed25519_generate_new(sp_core::crypto::key_types::STATEMENT, None)
1456 .unwrap();
1457
1458 let mut s_with_key = statement(1, 1, None, 0);
1460 let plain1 = b"The most valuable secret".to_vec();
1461 s_with_key.encrypt(&plain1, &public1).unwrap();
1462
1463 let mut s_other_key = statement(2, 2, None, 0);
1465 let plain2 = b"The second most valuable secret".to_vec();
1466 s_other_key.encrypt(&plain2, &public2).unwrap();
1467
1468 for s in [&s_with_key, &s_other_key] {
1470 store.submit(s.clone(), StatementSource::Network);
1471 }
1472
1473 let retrieved = store.posted_clear_stmt(&[], dest).unwrap();
1475 assert_eq!(retrieved.len(), 1, "Only one statement has dec_key=dest");
1476
1477 let encoded_stmt = s_with_key.encode();
1479 let stmt_len = encoded_stmt.len();
1480
1481 assert_eq!(&retrieved[0][..stmt_len], &encoded_stmt[..]);
1483
1484 let trailing = &retrieved[0][stmt_len..];
1486 assert_eq!(trailing, &plain1[..]);
1487 }
1488
1489 #[test]
1490 fn posted_clear_returns_plain_data_for_dest_and_topics() {
1491 let (store, _tmp) = test_store();
1492
1493 let public_dest = store
1495 .keystore
1496 .ed25519_generate_new(sp_core::crypto::key_types::STATEMENT, None)
1497 .unwrap();
1498 let dest: [u8; 32] = public_dest.into();
1499
1500 let public_other = store
1501 .keystore
1502 .ed25519_generate_new(sp_core::crypto::key_types::STATEMENT, None)
1503 .unwrap();
1504
1505 let mut s_good = statement(1, 1, None, 0);
1507 let plaintext_good = b"The most valuable secret".to_vec();
1508 s_good.encrypt(&plaintext_good, &public_dest).unwrap();
1509 s_good.set_topic(0, topic(42));
1510
1511 let mut s_wrong_topic = statement(2, 2, None, 0);
1513 s_wrong_topic.encrypt(b"Wrong topic", &public_dest).unwrap();
1514 s_wrong_topic.set_topic(0, topic(99));
1515
1516 let mut s_other_dest = statement(3, 3, None, 0);
1518 s_other_dest.encrypt(b"Other dest", &public_other).unwrap();
1519 s_other_dest.set_topic(0, topic(42));
1520
1521 for s in [&s_good, &s_wrong_topic, &s_other_dest] {
1523 store.submit(s.clone(), StatementSource::Network);
1524 }
1525
1526 let retrieved = store.posted_clear(&[topic(42)], dest).unwrap();
1528
1529 assert_eq!(retrieved, vec![plaintext_good]);
1531 }
1532
1533 #[test]
1534 fn remove_by_covers_various_situations() {
1535 use sp_statement_store::{StatementSource, StatementStore, SubmitResult};
1536
1537 let (mut store, _temp) = test_store();
1539 store.set_time(0);
1540
1541 let t42 = topic(42);
1543 let k7 = dec_key(7);
1544
1545 let mut s_a1 = statement(4, 10, Some(100), 100);
1548 s_a1.set_topic(0, t42);
1549 let h_a1 = s_a1.hash();
1550
1551 let mut s_a2 = statement(4, 20, Some(200), 150);
1552 s_a2.set_decryption_key(k7);
1553 let h_a2 = s_a2.hash();
1554
1555 let s_a3 = statement(4, 30, None, 50);
1556 let h_a3 = s_a3.hash();
1557
1558 let s_b1 = statement(3, 10, None, 100);
1560 let h_b1 = s_b1.hash();
1561
1562 let mut s_b2 = statement(3, 15, Some(300), 100);
1563 s_b2.set_topic(0, t42);
1564 s_b2.set_decryption_key(k7);
1565 let h_b2 = s_b2.hash();
1566
1567 for s in [&s_a1, &s_a2, &s_a3, &s_b1, &s_b2] {
1569 assert!(matches!(
1570 store.submit(s.clone(), StatementSource::Network),
1571 SubmitResult::New(_)
1572 ));
1573 }
1574
1575 {
1577 let idx = store.index.read();
1578 assert_eq!(idx.entries.len(), 5, "all 5 should be present");
1579 assert!(idx.accounts.contains_key(&account(4)));
1580 assert!(idx.accounts.contains_key(&account(3)));
1581 assert_eq!(idx.total_size, 100 + 150 + 50 + 100 + 100);
1582
1583 let set_t = idx.by_topic.get(&t42).expect("topic set exists");
1585 assert!(set_t.contains(&h_a1) && set_t.contains(&h_b2));
1586
1587 let set_k = idx.by_dec_key.get(&Some(k7)).expect("key set exists");
1588 assert!(set_k.contains(&h_a2) && set_k.contains(&h_b2));
1589 }
1590
1591 store.remove_by(account(4)).expect("remove_by should succeed");
1593
1594 {
1596 for h in [h_a1, h_a2, h_a3] {
1598 assert!(store.statement(&h).unwrap().is_none(), "A's statement should be removed");
1599 }
1600
1601 for h in [h_b1, h_b2] {
1603 assert!(store.statement(&h).unwrap().is_some(), "B's statement should remain");
1604 }
1605
1606 let idx = store.index.read();
1607
1608 assert!(!idx.accounts.contains_key(&account(4)), "Account A must be gone");
1610 assert!(idx.accounts.contains_key(&account(3)), "Account B must remain");
1611
1612 assert!(idx.expired.contains_key(&h_a1));
1614 assert!(idx.expired.contains_key(&h_a2));
1615 assert!(idx.expired.contains_key(&h_a3));
1616 assert_eq!(idx.expired.len(), 3);
1617
1618 assert_eq!(idx.entries.len(), 2);
1620 assert_eq!(idx.total_size, 100 + 100);
1621
1622 let set_t = idx.by_topic.get(&t42).expect("topic set exists");
1624 assert!(set_t.contains(&h_b2));
1625 assert!(!set_t.contains(&h_a1));
1626
1627 let set_k = idx.by_dec_key.get(&Some(k7)).expect("key set exists");
1629 assert!(set_k.contains(&h_b2));
1630 assert!(!set_k.contains(&h_a2));
1631 }
1632
1633 store.remove_by(account(4)).expect("second remove_by should be a no-op");
1635
1636 let purge_after = store.index.read().options.purge_after_sec;
1638 store.set_time(purge_after + 1);
1639 store.maintain();
1640 assert_eq!(store.index.read().expired.len(), 0, "expired entries should be purged");
1641
1642 let s_new = statement(4, 40, None, 10);
1644 assert!(matches!(store.submit(s_new, StatementSource::Network), SubmitResult::New(_)));
1645 }
1646}