1#![warn(missing_docs)]
48#![warn(unused_extern_crates)]
49
50mod metrics;
51
52pub use sp_statement_store::{Error, StatementStore, MAX_TOPICS};
53
54use metrics::MetricsLink as PrometheusMetrics;
55use parking_lot::RwLock;
56use prometheus_endpoint::Registry as PrometheusRegistry;
57use sc_keystore::LocalKeystore;
58use sp_api::ProvideRuntimeApi;
59use sp_blockchain::HeaderBackend;
60use sp_core::{crypto::UncheckedFrom, hexdisplay::HexDisplay, traits::SpawnNamed, Decode, Encode};
61use sp_runtime::traits::Block as BlockT;
62use sp_statement_store::{
63 runtime_api::{
64 InvalidStatement, StatementSource, StatementStoreExt, ValidStatement, ValidateStatement,
65 },
66 AccountId, BlockHash, Channel, DecryptionKey, Hash, NetworkPriority, Proof, Result, Statement,
67 SubmitResult, Topic,
68};
69use std::{
70 collections::{BTreeMap, HashMap, HashSet},
71 sync::Arc,
72};
73
/// Key in the `META` column under which the database format version is stored.
const KEY_VERSION: &[u8] = b"version";
/// Database format version written to new databases and required of existing ones.
const CURRENT_VERSION: u32 = 1;

/// Log target used by every log line emitted from this crate.
const LOG_TARGET: &str = "statement-store";

/// Default number of seconds an expired statement hash is remembered before it is purged
/// (48 hours).
pub const DEFAULT_PURGE_AFTER_SEC: u64 = 48 * 60 * 60;
/// Default upper bound on the number of statements held by the store (4 Mi entries).
pub const DEFAULT_MAX_TOTAL_STATEMENTS: usize = 4 << 20;
/// Default upper bound, in bytes, on the statement data held by the store (2 GiB).
pub const DEFAULT_MAX_TOTAL_SIZE: usize = 2 << 30;
/// Interval between two runs of the background maintenance task.
const MAINTENANCE_PERIOD: std::time::Duration = std::time::Duration::from_secs(30);
88
/// Column indices of the underlying database.
mod col {
    /// Store metadata; holds the database version record.
    pub const META: u8 = 0;
    /// Encoded statements, keyed by statement hash.
    pub const STATEMENTS: u8 = META + 1;
    /// Encoded `(hash, timestamp)` records of expired statements, keyed by statement hash.
    pub const EXPIRED: u8 = STATEMENTS + 1;

    /// Total number of columns the database is opened with.
    pub const COUNT: u8 = EXPIRED + 1;
}
96
/// Priority of a statement, taken from the statement's priority field (0 when absent).
/// Higher values order after lower ones.
#[derive(Eq, PartialEq, Debug, Ord, PartialOrd, Clone, Copy)]
struct Priority(u32);
99
/// Key of the per-account priority map: a statement hash together with its priority.
#[derive(PartialEq, Eq)]
struct PriorityKey {
    /// Hash of the statement.
    hash: Hash,
    /// Priority of the statement.
    priority: Priority,
}
105
106impl PartialOrd for PriorityKey {
107 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
108 Some(self.cmp(other))
109 }
110}
111
112impl Ord for PriorityKey {
113 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
114 self.priority.cmp(&other.priority).then_with(|| self.hash.cmp(&other.hash))
115 }
116}
117
/// The statement currently recorded for a channel of an account.
#[derive(PartialEq, Eq)]
struct ChannelEntry {
    /// Hash of the statement recorded for the channel.
    hash: Hash,
    /// Priority of that statement.
    priority: Priority,
}
123
/// Per-account bookkeeping of the statements currently held in the index.
#[derive(Default)]
struct StatementsForAccount {
    /// Statements of the account ordered by priority, then hash. The value holds the
    /// statement's channel (if any) and its data length.
    by_priority: BTreeMap<PriorityKey, (Option<Channel>, usize)>,
    /// The statement recorded for each channel used by the account.
    channels: HashMap<Channel, ChannelEntry>,
    /// Sum of the data lengths of the account's statements.
    data_size: usize,
}
133
/// Store configuration.
pub struct Options {
    /// Maximum number of statements held in the store. A statement whose insertion
    /// would exceed this limit is ignored.
    max_total_statements: usize,
    /// Maximum total statement data size held in the store. A statement whose insertion
    /// would exceed this limit is ignored.
    max_total_size: usize,
    /// Number of seconds the hash of an expired statement is remembered before it is purged.
    purge_after_sec: u64,
}
145
146impl Default for Options {
147 fn default() -> Self {
148 Options {
149 max_total_statements: DEFAULT_MAX_TOTAL_STATEMENTS,
150 max_total_size: DEFAULT_MAX_TOTAL_SIZE,
151 purge_after_sec: DEFAULT_PURGE_AFTER_SEC,
152 }
153 }
154}
155
/// In-memory index over the statements held in the database.
#[derive(Default)]
struct Index {
    /// Hashes of the statements carrying a given topic.
    by_topic: HashMap<Topic, HashSet<Hash>>,
    /// Hashes of the statements with a given decryption key; `None` collects the
    /// statements that have no decryption key.
    by_dec_key: HashMap<Option<DecryptionKey>, HashSet<Hash>>,
    /// Topics and decryption key of a statement. Only populated for statements that
    /// have at least one topic or a decryption key.
    topics_and_keys: HashMap<Hash, ([Option<Topic>; MAX_TOPICS], Option<DecryptionKey>)>,
    /// Active statements: owning account, priority and data length.
    entries: HashMap<Hash, (AccountId, Priority, usize)>,
    /// Expired statements and the timestamp (seconds) at which they expired.
    expired: HashMap<Hash, u64>,
    /// Per-account bookkeeping.
    accounts: HashMap<AccountId, StatementsForAccount>,
    /// Limits applied to the index.
    options: Options,
    /// Sum of the data lengths of all active statements.
    total_size: usize,
}
167
/// Binds a client to a block type so that statement validation can be called through
/// a type-erased closure.
struct ClientWrapper<Block, Client> {
    /// Client providing the runtime API and chain info.
    client: Arc<Client>,
    /// Marker for the otherwise unused `Block` type parameter.
    _block: std::marker::PhantomData<Block>,
}
172
173impl<Block, Client> ClientWrapper<Block, Client>
174where
175 Block: BlockT,
176 Block::Hash: From<BlockHash>,
177 Client: ProvideRuntimeApi<Block> + HeaderBackend<Block> + Send + Sync + 'static,
178 Client::Api: ValidateStatement<Block>,
179{
180 fn validate_statement(
181 &self,
182 block: Option<BlockHash>,
183 source: StatementSource,
184 statement: Statement,
185 ) -> std::result::Result<ValidStatement, InvalidStatement> {
186 let api = self.client.runtime_api();
187 let block = block.map(Into::into).unwrap_or_else(|| {
188 self.client.info().finalized_hash
190 });
191 api.validate_statement(block, source, statement)
192 .map_err(|_| InvalidStatement::InternalError)?
193 }
194}
195
/// Statement store backed by a database and an in-memory index.
pub struct Store {
    /// Database holding the encoded statements and the expired-statement records.
    db: parity_db::Db,
    /// In-memory index over the database content.
    index: RwLock<Index>,
    /// Type-erased statement validation function; receives the optional block to
    /// validate at, the statement source and the statement.
    validate_fn: Box<
        dyn Fn(
                Option<BlockHash>,
                StatementSource,
                Statement,
            ) -> std::result::Result<ValidStatement, InvalidStatement>
            + Send
            + Sync,
    >,
    /// Keystore consulted when decrypting statement data.
    keystore: Arc<LocalKeystore>,
    /// When set, used as the current time (seconds) instead of the system clock.
    time_override: Option<u64>,
    /// Prometheus metrics handle.
    metrics: PrometheusMetrics,
}
214
/// Result of looking a statement hash up in the index.
enum IndexQuery {
    /// The hash is neither active nor expired.
    Unknown,
    /// The hash belongs to an active statement.
    Exists,
    /// The hash belongs to an expired statement.
    Expired,
}
220
/// Outcome of an attempt to insert a statement into the index.
enum MaybeInserted {
    /// The statement was inserted; carries the hashes of the statements evicted to
    /// make room for it.
    Inserted(HashSet<Hash>),
    /// The statement was not inserted and the index was left untouched.
    Ignored,
}
225
226impl Index {
227 fn new(options: Options) -> Index {
228 Index { options, ..Default::default() }
229 }
230
231 fn insert_new(&mut self, hash: Hash, account: AccountId, statement: &Statement) {
232 let mut all_topics = [None; MAX_TOPICS];
233 let mut nt = 0;
234 while let Some(t) = statement.topic(nt) {
235 self.by_topic.entry(t).or_default().insert(hash);
236 all_topics[nt] = Some(t);
237 nt += 1;
238 }
239 let key = statement.decryption_key();
240 self.by_dec_key.entry(key).or_default().insert(hash);
241 if nt > 0 || key.is_some() {
242 self.topics_and_keys.insert(hash, (all_topics, key));
243 }
244 let priority = Priority(statement.priority().unwrap_or(0));
245 self.entries.insert(hash, (account, priority, statement.data_len()));
246 self.total_size += statement.data_len();
247 let account_info = self.accounts.entry(account).or_default();
248 account_info.data_size += statement.data_len();
249 if let Some(channel) = statement.channel() {
250 account_info.channels.insert(channel, ChannelEntry { hash, priority });
251 }
252 account_info
253 .by_priority
254 .insert(PriorityKey { hash, priority }, (statement.channel(), statement.data_len()));
255 }
256
257 fn query(&self, hash: &Hash) -> IndexQuery {
258 if self.entries.contains_key(hash) {
259 return IndexQuery::Exists
260 }
261 if self.expired.contains_key(hash) {
262 return IndexQuery::Expired
263 }
264 IndexQuery::Unknown
265 }
266
267 fn insert_expired(&mut self, hash: Hash, timestamp: u64) {
268 self.expired.insert(hash, timestamp);
269 }
270
271 fn iterate_with(
272 &self,
273 key: Option<DecryptionKey>,
274 match_all_topics: &[Topic],
275 mut f: impl FnMut(&Hash) -> Result<()>,
276 ) -> Result<()> {
277 let empty = HashSet::new();
278 let mut sets: [&HashSet<Hash>; MAX_TOPICS + 1] = [∅ MAX_TOPICS + 1];
279 if match_all_topics.len() > MAX_TOPICS {
280 return Ok(())
281 }
282 let key_set = self.by_dec_key.get(&key);
283 if key_set.map_or(0, |s| s.len()) == 0 {
284 return Ok(())
286 }
287 sets[0] = key_set.expect("Function returns if key_set is None");
288 for (i, t) in match_all_topics.iter().enumerate() {
289 let set = self.by_topic.get(t);
290 if set.map_or(0, |s| s.len()) == 0 {
291 return Ok(())
293 }
294 sets[i + 1] = set.expect("Function returns if set is None");
295 }
296 let sets = &mut sets[0..match_all_topics.len() + 1];
297 sets.sort_by_key(|s| s.len());
299 for item in sets[0] {
300 if sets[1..].iter().all(|set| set.contains(item)) {
301 log::trace!(
302 target: LOG_TARGET,
303 "Iterating by topic/key: statement {:?}",
304 HexDisplay::from(item)
305 );
306 f(item)?
307 }
308 }
309 Ok(())
310 }
311
312 fn maintain(&mut self, current_time: u64) -> Vec<Hash> {
313 let mut purged = Vec::new();
315 self.expired.retain(|hash, timestamp| {
316 if *timestamp + self.options.purge_after_sec <= current_time {
317 purged.push(*hash);
318 log::trace!(target: LOG_TARGET, "Purged statement {:?}", HexDisplay::from(hash));
319 false
320 } else {
321 true
322 }
323 });
324 purged
325 }
326
327 fn make_expired(&mut self, hash: &Hash, current_time: u64) -> bool {
328 if let Some((account, priority, len)) = self.entries.remove(hash) {
329 self.total_size -= len;
330 if let Some((topics, key)) = self.topics_and_keys.remove(hash) {
331 for t in topics.into_iter().flatten() {
332 if let std::collections::hash_map::Entry::Occupied(mut set) =
333 self.by_topic.entry(t)
334 {
335 set.get_mut().remove(hash);
336 if set.get().is_empty() {
337 set.remove_entry();
338 }
339 }
340 }
341 if let std::collections::hash_map::Entry::Occupied(mut set) =
342 self.by_dec_key.entry(key)
343 {
344 set.get_mut().remove(hash);
345 if set.get().is_empty() {
346 set.remove_entry();
347 }
348 }
349 }
350 self.expired.insert(*hash, current_time);
351 if let std::collections::hash_map::Entry::Occupied(mut account_rec) =
352 self.accounts.entry(account)
353 {
354 let key = PriorityKey { hash: *hash, priority };
355 if let Some((channel, len)) = account_rec.get_mut().by_priority.remove(&key) {
356 account_rec.get_mut().data_size -= len;
357 if let Some(channel) = channel {
358 account_rec.get_mut().channels.remove(&channel);
359 }
360 }
361 if account_rec.get().by_priority.is_empty() {
362 account_rec.remove_entry();
363 }
364 }
365 log::trace!(target: LOG_TARGET, "Expired statement {:?}", HexDisplay::from(hash));
366 true
367 } else {
368 false
369 }
370 }
371
372 fn insert(
373 &mut self,
374 hash: Hash,
375 statement: &Statement,
376 account: &AccountId,
377 validation: &ValidStatement,
378 current_time: u64,
379 ) -> MaybeInserted {
380 let statement_len = statement.data_len();
381 if statement_len > validation.max_size as usize {
382 log::debug!(
383 target: LOG_TARGET,
384 "Ignored oversize message: {:?} ({} bytes)",
385 HexDisplay::from(&hash),
386 statement_len,
387 );
388 return MaybeInserted::Ignored
389 }
390
391 let mut evicted = HashSet::new();
392 let mut would_free_size = 0;
393 let priority = Priority(statement.priority().unwrap_or(0));
394 let (max_size, max_count) = (validation.max_size as usize, validation.max_count as usize);
395 if let Some(account_rec) = self.accounts.get(account) {
399 if let Some(channel) = statement.channel() {
400 if let Some(channel_record) = account_rec.channels.get(&channel) {
401 if priority <= channel_record.priority {
402 log::debug!(
404 target: LOG_TARGET,
405 "Ignored lower priority channel message: {:?} {:?} <= {:?}",
406 HexDisplay::from(&hash),
407 priority,
408 channel_record.priority,
409 );
410 return MaybeInserted::Ignored
411 } else {
412 log::debug!(
415 target: LOG_TARGET,
416 "Replacing higher priority channel message: {:?} ({:?}) > {:?} ({:?})",
417 HexDisplay::from(&hash),
418 priority,
419 HexDisplay::from(&channel_record.hash),
420 channel_record.priority,
421 );
422 let key = PriorityKey {
423 hash: channel_record.hash,
424 priority: channel_record.priority,
425 };
426 if let Some((_channel, len)) = account_rec.by_priority.get(&key) {
427 would_free_size += *len;
428 evicted.insert(channel_record.hash);
429 }
430 }
431 }
432 }
433 for (entry, (_, len)) in account_rec.by_priority.iter() {
435 if (account_rec.data_size - would_free_size + statement_len <= max_size) &&
436 account_rec.by_priority.len() + 1 - evicted.len() <= max_count
437 {
438 break
440 }
441 if evicted.contains(&entry.hash) {
442 continue
444 }
445 if entry.priority >= priority {
446 log::debug!(
447 target: LOG_TARGET,
448 "Ignored message due to constraints {:?} {:?} < {:?}",
449 HexDisplay::from(&hash),
450 priority,
451 entry.priority,
452 );
453 return MaybeInserted::Ignored
454 }
455 evicted.insert(entry.hash);
456 would_free_size += len;
457 }
458 }
459 if !((self.total_size - would_free_size + statement_len <= self.options.max_total_size) &&
461 self.entries.len() + 1 - evicted.len() <= self.options.max_total_statements)
462 {
463 log::debug!(
464 target: LOG_TARGET,
465 "Ignored statement {} because the store is full (size={}, count={})",
466 HexDisplay::from(&hash),
467 self.total_size,
468 self.entries.len(),
469 );
470 return MaybeInserted::Ignored
471 }
472
473 for h in &evicted {
474 self.make_expired(h, current_time);
475 }
476 self.insert_new(hash, *account, statement);
477 MaybeInserted::Inserted(evicted)
478 }
479}
480
481impl Store {
482 pub fn new_shared<Block, Client>(
485 path: &std::path::Path,
486 options: Options,
487 client: Arc<Client>,
488 keystore: Arc<LocalKeystore>,
489 prometheus: Option<&PrometheusRegistry>,
490 task_spawner: &dyn SpawnNamed,
491 ) -> Result<Arc<Store>>
492 where
493 Block: BlockT,
494 Block::Hash: From<BlockHash>,
495 Client: ProvideRuntimeApi<Block> + HeaderBackend<Block> + Send + Sync + 'static,
496 Client::Api: ValidateStatement<Block>,
497 {
498 let store = Arc::new(Self::new(path, options, client, keystore, prometheus)?);
499
500 let worker_store = store.clone();
502 task_spawner.spawn(
503 "statement-store-maintenance",
504 Some("statement-store"),
505 Box::pin(async move {
506 let mut interval = tokio::time::interval(MAINTENANCE_PERIOD);
507 loop {
508 interval.tick().await;
509 worker_store.maintain();
510 }
511 }),
512 );
513
514 Ok(store)
515 }
516
517 fn new<Block, Client>(
520 path: &std::path::Path,
521 options: Options,
522 client: Arc<Client>,
523 keystore: Arc<LocalKeystore>,
524 prometheus: Option<&PrometheusRegistry>,
525 ) -> Result<Store>
526 where
527 Block: BlockT,
528 Block::Hash: From<BlockHash>,
529 Client: ProvideRuntimeApi<Block> + HeaderBackend<Block> + Send + Sync + 'static,
530 Client::Api: ValidateStatement<Block>,
531 {
532 let mut path: std::path::PathBuf = path.into();
533 path.push("statements");
534
535 let mut config = parity_db::Options::with_columns(&path, col::COUNT);
536
537 let statement_col = &mut config.columns[col::STATEMENTS as usize];
538 statement_col.ref_counted = false;
539 statement_col.preimage = true;
540 statement_col.uniform = true;
541 let db = parity_db::Db::open_or_create(&config).map_err(|e| Error::Db(e.to_string()))?;
542 match db.get(col::META, &KEY_VERSION).map_err(|e| Error::Db(e.to_string()))? {
543 Some(version) => {
544 let version = u32::from_le_bytes(
545 version
546 .try_into()
547 .map_err(|_| Error::Db("Error reading database version".into()))?,
548 );
549 if version != CURRENT_VERSION {
550 return Err(Error::Db(format!("Unsupported database version: {version}")))
551 }
552 },
553 None => {
554 db.commit([(
555 col::META,
556 KEY_VERSION.to_vec(),
557 Some(CURRENT_VERSION.to_le_bytes().to_vec()),
558 )])
559 .map_err(|e| Error::Db(e.to_string()))?;
560 },
561 }
562
563 let validator = ClientWrapper { client, _block: Default::default() };
564 let validate_fn = Box::new(move |block, source, statement| {
565 validator.validate_statement(block, source, statement)
566 });
567
568 let store = Store {
569 db,
570 index: RwLock::new(Index::new(options)),
571 validate_fn,
572 keystore,
573 time_override: None,
574 metrics: PrometheusMetrics::new(prometheus),
575 };
576 store.populate()?;
577 Ok(store)
578 }
579
580 fn populate(&self) -> Result<()> {
585 {
586 let mut index = self.index.write();
587 self.db
588 .iter_column_while(col::STATEMENTS, |item| {
589 let statement = item.value;
590 if let Ok(statement) = Statement::decode(&mut statement.as_slice()) {
591 let hash = statement.hash();
592 log::trace!(
593 target: LOG_TARGET,
594 "Statement loaded {:?}",
595 HexDisplay::from(&hash)
596 );
597 if let Some(account_id) = statement.account_id() {
598 index.insert_new(hash, account_id, &statement);
599 } else {
600 log::debug!(
601 target: LOG_TARGET,
602 "Error decoding statement loaded from the DB: {:?}",
603 HexDisplay::from(&hash)
604 );
605 }
606 }
607 true
608 })
609 .map_err(|e| Error::Db(e.to_string()))?;
610 self.db
611 .iter_column_while(col::EXPIRED, |item| {
612 let expired_info = item.value;
613 if let Ok((hash, timestamp)) =
614 <(Hash, u64)>::decode(&mut expired_info.as_slice())
615 {
616 log::trace!(
617 target: LOG_TARGET,
618 "Statement loaded (expired): {:?}",
619 HexDisplay::from(&hash)
620 );
621 index.insert_expired(hash, timestamp);
622 }
623 true
624 })
625 .map_err(|e| Error::Db(e.to_string()))?;
626 }
627
628 self.maintain();
629 Ok(())
630 }
631
632 fn collect_statements<R>(
633 &self,
634 key: Option<DecryptionKey>,
635 match_all_topics: &[Topic],
636 mut f: impl FnMut(Statement) -> Option<R>,
637 ) -> Result<Vec<R>> {
638 let mut result = Vec::new();
639 let index = self.index.read();
640 index.iterate_with(key, match_all_topics, |hash| {
641 match self.db.get(col::STATEMENTS, hash).map_err(|e| Error::Db(e.to_string()))? {
642 Some(entry) => {
643 if let Ok(statement) = Statement::decode(&mut entry.as_slice()) {
644 if let Some(data) = f(statement) {
645 result.push(data);
646 }
647 } else {
648 log::warn!(
650 target: LOG_TARGET,
651 "Corrupt statement {:?}",
652 HexDisplay::from(hash)
653 );
654 }
655 },
656 None => {
657 log::warn!(
659 target: LOG_TARGET,
660 "Missing statement {:?}",
661 HexDisplay::from(hash)
662 );
663 },
664 }
665 Ok(())
666 })?;
667 Ok(result)
668 }
669
670 pub fn maintain(&self) {
672 log::trace!(target: LOG_TARGET, "Started store maintenance");
673 let (deleted, active_count, expired_count): (Vec<_>, usize, usize) = {
674 let mut index = self.index.write();
675 let deleted = index.maintain(self.timestamp());
676 (deleted, index.entries.len(), index.expired.len())
677 };
678 let deleted: Vec<_> =
679 deleted.into_iter().map(|hash| (col::EXPIRED, hash.to_vec(), None)).collect();
680 let deleted_count = deleted.len() as u64;
681 if let Err(e) = self.db.commit(deleted) {
682 log::warn!(target: LOG_TARGET, "Error writing to the statement database: {:?}", e);
683 } else {
684 self.metrics.report(|metrics| metrics.statements_pruned.inc_by(deleted_count));
685 }
686 log::trace!(
687 target: LOG_TARGET,
688 "Completed store maintenance. Purged: {}, Active: {}, Expired: {}",
689 deleted_count,
690 active_count,
691 expired_count
692 );
693 }
694
695 fn timestamp(&self) -> u64 {
696 self.time_override.unwrap_or_else(|| {
697 std::time::SystemTime::now()
698 .duration_since(std::time::UNIX_EPOCH)
699 .unwrap_or_default()
700 .as_secs()
701 })
702 }
703
704 #[cfg(test)]
705 fn set_time(&mut self, time: u64) {
706 self.time_override = Some(time);
707 }
708
709 pub fn as_statement_store_ext(self: Arc<Self>) -> StatementStoreExt {
711 StatementStoreExt::new(self)
712 }
713
714 fn posted_clear_inner<R>(
717 &self,
718 match_all_topics: &[Topic],
719 dest: [u8; 32],
720 mut map_f: impl FnMut(Statement, Vec<u8>) -> R,
722 ) -> Result<Vec<R>> {
723 self.collect_statements(Some(dest), match_all_topics, |statement| {
724 if let (Some(key), Some(_)) = (statement.decryption_key(), statement.data()) {
725 let public: sp_core::ed25519::Public = UncheckedFrom::unchecked_from(key);
726 let public: sp_statement_store::ed25519::Public = public.into();
727 match self.keystore.key_pair::<sp_statement_store::ed25519::Pair>(&public) {
728 Err(e) => {
729 log::debug!(
730 target: LOG_TARGET,
731 "Keystore error: {:?}, for statement {:?}",
732 e,
733 HexDisplay::from(&statement.hash())
734 );
735 None
736 },
737 Ok(None) => {
738 log::debug!(
739 target: LOG_TARGET,
740 "Keystore is missing key for statement {:?}",
741 HexDisplay::from(&statement.hash())
742 );
743 None
744 },
745 Ok(Some(pair)) => match statement.decrypt_private(&pair.into_inner()) {
746 Ok(r) => r.map(|data| map_f(statement, data)),
747 Err(e) => {
748 log::debug!(
749 target: LOG_TARGET,
750 "Decryption error: {:?}, for statement {:?}",
751 e,
752 HexDisplay::from(&statement.hash())
753 );
754 None
755 },
756 },
757 }
758 } else {
759 None
760 }
761 })
762 }
763}
764
impl StatementStore for Store {
    /// Returns every active statement together with its hash.
    ///
    /// Indexed statements that are missing from the database or fail to decode are
    /// silently skipped; a database read error is returned as `Error::Db`.
    fn statements(&self) -> Result<Vec<(Hash, Statement)>> {
        let index = self.index.read();
        let mut result = Vec::with_capacity(index.entries.len());
        for h in index.entries.keys() {
            let encoded = self.db.get(col::STATEMENTS, h).map_err(|e| Error::Db(e.to_string()))?;
            if let Some(encoded) = encoded {
                if let Ok(statement) = Statement::decode(&mut encoded.as_slice()) {
                    let hash = statement.hash();
                    result.push((hash, statement));
                }
            }
        }
        Ok(result)
    }

    /// Reads the statement stored under `hash` straight from the database.
    ///
    /// Returns `Ok(None)` when no record exists and `Error::Decode` when the record
    /// cannot be decoded.
    fn statement(&self, hash: &Hash) -> Result<Option<Statement>> {
        Ok(
            match self
                .db
                .get(col::STATEMENTS, hash.as_slice())
                .map_err(|e| Error::Db(e.to_string()))?
            {
                Some(entry) => {
                    log::trace!(
                        target: LOG_TARGET,
                        "Queried statement {:?}",
                        HexDisplay::from(hash)
                    );
                    Some(
                        Statement::decode(&mut entry.as_slice())
                            .map_err(|e| Error::Decode(e.to_string()))?,
                    )
                },
                None => {
                    log::trace!(
                        target: LOG_TARGET,
                        "Queried missing statement {:?}",
                        HexDisplay::from(hash)
                    );
                    None
                },
            },
        )
    }

    /// Returns the data of every statement that has no decryption key and carries all
    /// of `match_all_topics`.
    fn broadcasts(&self, match_all_topics: &[Topic]) -> Result<Vec<Vec<u8>>> {
        self.collect_statements(None, match_all_topics, |statement| statement.into_data())
    }

    /// Returns the data, as stored (not decrypted), of every statement whose
    /// decryption key is `dest` and that carries all of `match_all_topics`.
    fn posted(&self, match_all_topics: &[Topic], dest: [u8; 32]) -> Result<Vec<Vec<u8>>> {
        self.collect_statements(Some(dest), match_all_topics, |statement| statement.into_data())
    }

    /// Returns the decrypted data of every statement whose decryption key is `dest`
    /// and that carries all of `match_all_topics`; statements that cannot be
    /// decrypted with the keystore are skipped.
    fn posted_clear(&self, match_all_topics: &[Topic], dest: [u8; 32]) -> Result<Vec<Vec<u8>>> {
        self.posted_clear_inner(match_all_topics, dest, |_statement, data| data)
    }

    /// Like `broadcasts`, but returns the whole encoded statement instead of its data.
    fn broadcasts_stmt(&self, match_all_topics: &[Topic]) -> Result<Vec<Vec<u8>>> {
        self.collect_statements(None, match_all_topics, |statement| Some(statement.encode()))
    }

    /// Like `posted`, but returns the whole encoded statement instead of its data.
    fn posted_stmt(&self, match_all_topics: &[Topic], dest: [u8; 32]) -> Result<Vec<Vec<u8>>> {
        self.collect_statements(Some(dest), match_all_topics, |statement| Some(statement.encode()))
    }

    /// Like `posted_clear`, but each result is the encoded statement immediately
    /// followed by the decrypted data.
    fn posted_clear_stmt(
        &self,
        match_all_topics: &[Topic],
        dest: [u8; 32],
    ) -> Result<Vec<Vec<u8>>> {
        self.posted_clear_inner(match_all_topics, dest, |statement, data| {
            let mut res = Vec::with_capacity(statement.size_hint() + data.len());
            statement.encode_to(&mut res);
            res.extend_from_slice(&data);
            res
        })
    }

    /// Validates `statement` and, when accepted, stores it in the index and database.
    ///
    /// Known or expired statements are rejected up front unless `source` allows
    /// resubmission. Statements without an account id or failing runtime validation
    /// are reported as `SubmitResult::Bad`; statements rejected by the index limits
    /// as `SubmitResult::Ignored`; runtime or database failures as
    /// `SubmitResult::InternalError`.
    fn submit(&self, statement: Statement, source: StatementSource) -> SubmitResult {
        let hash = statement.hash();
        match self.index.read().query(&hash) {
            IndexQuery::Expired =>
                if !source.can_be_resubmitted() {
                    return SubmitResult::KnownExpired
                },
            IndexQuery::Exists =>
                if !source.can_be_resubmitted() {
                    return SubmitResult::Known
                },
            IndexQuery::Unknown => {},
        }

        let Some(account_id) = statement.account_id() else {
            log::debug!(
                target: LOG_TARGET,
                "Statement validation failed: Missing proof ({:?})",
                HexDisplay::from(&hash),
            );
            self.metrics.report(|metrics| metrics.validations_invalid.inc());
            return SubmitResult::Bad("No statement proof")
        };

        // Validate an on-chain proof at the block it refers to; any other proof is
        // validated at the block chosen by the validation function.
        let at_block = if let Some(Proof::OnChain { block_hash, .. }) = statement.proof() {
            Some(*block_hash)
        } else {
            None
        };
        let validation_result = (self.validate_fn)(at_block, source, statement.clone());
        let validation = match validation_result {
            Ok(validation) => validation,
            Err(InvalidStatement::BadProof) => {
                log::debug!(
                    target: LOG_TARGET,
                    "Statement validation failed: BadProof, {:?}",
                    HexDisplay::from(&hash),
                );
                self.metrics.report(|metrics| metrics.validations_invalid.inc());
                return SubmitResult::Bad("Bad statement proof")
            },
            Err(InvalidStatement::NoProof) => {
                log::debug!(
                    target: LOG_TARGET,
                    "Statement validation failed: NoProof, {:?}",
                    HexDisplay::from(&hash),
                );
                self.metrics.report(|metrics| metrics.validations_invalid.inc());
                return SubmitResult::Bad("Missing statement proof")
            },
            Err(InvalidStatement::InternalError) =>
                return SubmitResult::InternalError(Error::Runtime),
        };

        let current_time = self.timestamp();
        let mut commit = Vec::new();
        {
            // The index write lock is held until the database commit below finishes.
            let mut index = self.index.write();

            let evicted =
                match index.insert(hash, &statement, &account_id, &validation, current_time) {
                    MaybeInserted::Ignored => return SubmitResult::Ignored,
                    MaybeInserted::Inserted(evicted) => evicted,
                };

            commit.push((col::STATEMENTS, hash.to_vec(), Some(statement.encode())));
            // Evicted statements are deleted and recorded as expired.
            for hash in evicted {
                commit.push((col::STATEMENTS, hash.to_vec(), None));
                commit.push((col::EXPIRED, hash.to_vec(), Some((hash, current_time).encode())));
            }
            if let Err(e) = self.db.commit(commit) {
                log::debug!(
                    target: LOG_TARGET,
                    "Statement validation failed: database error {}, {:?}",
                    e,
                    statement
                );
                return SubmitResult::InternalError(Error::Db(e.to_string()))
            }
        }
        self.metrics.report(|metrics| metrics.submitted_statements.inc());
        let network_priority = NetworkPriority::High;
        log::trace!(target: LOG_TARGET, "Statement submitted: {:?}", HexDisplay::from(&hash));
        SubmitResult::New(network_priority)
    }

    /// Expires the statement `hash`: deletes it from the database and records it as
    /// expired. Does nothing when the hash is not an active statement.
    fn remove(&self, hash: &Hash) -> Result<()> {
        let current_time = self.timestamp();
        {
            let mut index = self.index.write();
            if index.make_expired(hash, current_time) {
                let commit = [
                    (col::STATEMENTS, hash.to_vec(), None),
                    (col::EXPIRED, hash.to_vec(), Some((hash, current_time).encode())),
                ];
                if let Err(e) = self.db.commit(commit) {
                    log::debug!(
                        target: LOG_TARGET,
                        "Error removing statement: database error {}, {:?}",
                        e,
                        HexDisplay::from(hash),
                    );
                    return Err(Error::Db(e.to_string()))
                }
            }
        }
        Ok(())
    }

    /// Expires every active statement of the account `who`, in one database commit.
    fn remove_by(&self, who: [u8; 32]) -> Result<()> {
        let mut index = self.index.write();
        let mut evicted = Vec::new();
        if let Some(account_rec) = index.accounts.get(&who) {
            evicted.extend(account_rec.by_priority.keys().map(|k| k.hash));
        }

        let current_time = self.timestamp();
        let mut commit = Vec::new();
        for hash in evicted {
            index.make_expired(&hash, current_time);
            commit.push((col::STATEMENTS, hash.to_vec(), None));
            commit.push((col::EXPIRED, hash.to_vec(), Some((hash, current_time).encode())));
        }
        self.db.commit(commit).map_err(|e| {
            log::debug!(
                target: LOG_TARGET,
                "Error removing statement: database error {}, remove by {:?}",
                e,
                HexDisplay::from(&who),
            );

            Error::Db(e.to_string())
        })
    }
}
999
1000#[cfg(test)]
1001mod tests {
1002 use crate::Store;
1003 use sc_keystore::Keystore;
1004 use sp_core::{Decode, Encode, Pair};
1005 use sp_statement_store::{
1006 runtime_api::{InvalidStatement, ValidStatement, ValidateStatement},
1007 AccountId, Channel, DecryptionKey, NetworkPriority, Proof, SignatureVerificationResult,
1008 Statement, StatementSource, StatementStore, SubmitResult, Topic,
1009 };
1010
    // Minimal block types used to instantiate the store in tests.
    type Extrinsic = sp_runtime::OpaqueExtrinsic;
    type Hash = sp_core::H256;
    type Hashing = sp_runtime::traits::BlakeTwo256;
    type BlockNumber = u64;
    type Header = sp_runtime::generic::Header<BlockNumber, Hashing>;
    type Block = sp_runtime::generic::Block<Header, Extrinsic>;

    // The only block hash the mock runtime accepts in on-chain proofs; also reported
    // by the test client as its best and finalized block.
    const CORRECT_BLOCK_HASH: [u8; 32] = [1u8; 32];
1019
    /// Stateless client stand-in used by the tests.
    #[derive(Clone)]
    pub(crate) struct TestClient;
1022
    /// Mock runtime API handed out by `TestClient`.
    pub(crate) struct RuntimeApi {
        /// The client this API was created from; not read by the mock.
        _inner: TestClient,
    }
1026
1027 impl sp_api::ProvideRuntimeApi<Block> for TestClient {
1028 type Api = RuntimeApi;
1029 fn runtime_api(&self) -> sp_api::ApiRef<Self::Api> {
1030 RuntimeApi { _inner: self.clone() }.into()
1031 }
1032 }
1033
    // Mock statement validation used by the tests:
    // - a valid signature is accepted with limits of 100 statements / 1000 bytes;
    // - an invalid signature is rejected as `BadProof`;
    // - an unsigned statement is accepted only with an on-chain proof that refers to
    //   `CORRECT_BLOCK_HASH`, with per-account limits chosen from the account id.
    sp_api::mock_impl_runtime_apis! {
        impl ValidateStatement<Block> for RuntimeApi {
            fn validate_statement(
                _source: StatementSource,
                statement: Statement,
            ) -> std::result::Result<ValidStatement, InvalidStatement> {
                use crate::tests::account;
                match statement.verify_signature() {
                    SignatureVerificationResult::Valid(_) => Ok(ValidStatement{max_count: 100, max_size: 1000}),
                    SignatureVerificationResult::Invalid => Err(InvalidStatement::BadProof),
                    SignatureVerificationResult::NoSignature => {
                        if let Some(Proof::OnChain { block_hash, .. }) = statement.proof() {
                            if block_hash == &CORRECT_BLOCK_HASH {
                                // Accounts 1 to 4 may hold that many statements;
                                // every other account gets 2 statements / 2000 bytes.
                                let (max_count, max_size) = match statement.account_id() {
                                    Some(a) if a == account(1) => (1, 1000),
                                    Some(a) if a == account(2) => (2, 1000),
                                    Some(a) if a == account(3) => (3, 1000),
                                    Some(a) if a == account(4) => (4, 1000),
                                    _ => (2, 2000),
                                };
                                Ok(ValidStatement{ max_count, max_size })
                            } else {
                                Err(InvalidStatement::BadProof)
                            }
                        } else {
                            Err(InvalidStatement::BadProof)
                        }
                    }
                }
            }
        }
    }
1066
1067 impl sp_blockchain::HeaderBackend<Block> for TestClient {
1068 fn header(&self, _hash: Hash) -> sp_blockchain::Result<Option<Header>> {
1069 unimplemented!()
1070 }
1071 fn info(&self) -> sp_blockchain::Info<Block> {
1072 sp_blockchain::Info {
1073 best_hash: CORRECT_BLOCK_HASH.into(),
1074 best_number: 0,
1075 genesis_hash: Default::default(),
1076 finalized_hash: CORRECT_BLOCK_HASH.into(),
1077 finalized_number: 1,
1078 finalized_state: None,
1079 number_leaves: 0,
1080 block_gap: None,
1081 }
1082 }
1083 fn status(&self, _hash: Hash) -> sp_blockchain::Result<sp_blockchain::BlockStatus> {
1084 unimplemented!()
1085 }
1086 fn number(&self, _hash: Hash) -> sp_blockchain::Result<Option<BlockNumber>> {
1087 unimplemented!()
1088 }
1089 fn hash(&self, _number: BlockNumber) -> sp_blockchain::Result<Option<Hash>> {
1090 unimplemented!()
1091 }
1092 }
1093
1094 fn test_store() -> (Store, tempfile::TempDir) {
1095 sp_tracing::init_for_tests();
1096 let temp_dir = tempfile::Builder::new().tempdir().expect("Error creating test dir");
1097
1098 let client = std::sync::Arc::new(TestClient);
1099 let mut path: std::path::PathBuf = temp_dir.path().into();
1100 path.push("db");
1101 let keystore = std::sync::Arc::new(sc_keystore::LocalKeystore::in_memory());
1102 let store = Store::new(&path, Default::default(), client, keystore, None).unwrap();
1103 (store, temp_dir) }
1105
    /// Builds a signed statement whose data is the single byte `data`, with no topics
    /// and no decryption key.
    fn signed_statement(data: u8) -> Statement {
        signed_statement_with_topics(data, &[], None)
    }
1109
1110 fn signed_statement_with_topics(
1111 data: u8,
1112 topics: &[Topic],
1113 dec_key: Option<DecryptionKey>,
1114 ) -> Statement {
1115 let mut statement = Statement::new();
1116 statement.set_plain_data(vec![data]);
1117 for i in 0..topics.len() {
1118 statement.set_topic(i, topics[i]);
1119 }
1120 if let Some(key) = dec_key {
1121 statement.set_decryption_key(key);
1122 }
1123 let kp = sp_core::ed25519::Pair::from_string("//Alice", None).unwrap();
1124 statement.sign_ed25519_private(&kp);
1125 statement
1126 }
1127
1128 fn topic(data: u64) -> Topic {
1129 let mut topic: Topic = Default::default();
1130 topic[0..8].copy_from_slice(&data.to_le_bytes());
1131 topic
1132 }
1133
1134 fn dec_key(data: u64) -> DecryptionKey {
1135 let mut dec_key: DecryptionKey = Default::default();
1136 dec_key[0..8].copy_from_slice(&data.to_le_bytes());
1137 dec_key
1138 }
1139
1140 fn account(id: u64) -> AccountId {
1141 let mut account: AccountId = Default::default();
1142 account[0..8].copy_from_slice(&id.to_le_bytes());
1143 account
1144 }
1145
1146 fn channel(id: u64) -> Channel {
1147 let mut channel: Channel = Default::default();
1148 channel[0..8].copy_from_slice(&id.to_le_bytes());
1149 channel
1150 }
1151
1152 fn statement(account_id: u64, priority: u32, c: Option<u64>, data_len: usize) -> Statement {
1153 let mut statement = Statement::new();
1154 let mut data = Vec::new();
1155 data.resize(data_len, 0);
1156 statement.set_plain_data(data);
1157 statement.set_priority(priority);
1158 if let Some(c) = c {
1159 statement.set_channel(channel(c));
1160 }
1161 statement.set_proof(Proof::OnChain {
1162 block_hash: CORRECT_BLOCK_HASH,
1163 who: account(account_id),
1164 event_index: 0,
1165 });
1166 statement
1167 }
1168
1169 #[test]
1170 fn submit_one() {
1171 let (store, _temp) = test_store();
1172 let statement0 = signed_statement(0);
1173 assert_eq!(
1174 store.submit(statement0, StatementSource::Network),
1175 SubmitResult::New(NetworkPriority::High)
1176 );
1177 let unsigned = statement(0, 1, None, 0);
1178 assert_eq!(
1179 store.submit(unsigned, StatementSource::Network),
1180 SubmitResult::New(NetworkPriority::High)
1181 );
1182 }
1183
/// Statements submitted to a store must still be readable after the store is
/// dropped and a new one is opened on the same database path.
#[test]
fn save_and_load_statements() {
    let (store, temp) = test_store();
    let originals = [signed_statement(0), signed_statement(1), signed_statement(2)];
    for stmt in &originals {
        assert_eq!(
            store.submit(stmt.clone(), StatementSource::Network),
            SubmitResult::New(NetworkPriority::High)
        );
    }

    // The middle statement is the one looked up by hash before and after reopening.
    let probe = originals[1].clone();
    let probe_hash = probe.hash();

    assert_eq!(store.statements().unwrap().len(), 3);
    assert_eq!(store.broadcasts(&[]).unwrap().len(), 3);
    assert_eq!(store.statement(&probe_hash).unwrap(), Some(probe.clone()));

    // Keep the keystore so the reopened store is built with the same one.
    let keystore = store.keystore.clone();
    drop(store);

    let db_path = std::path::PathBuf::from(temp.path()).join("db");
    let reopened =
        Store::new(&db_path, Default::default(), std::sync::Arc::new(TestClient), keystore, None)
            .unwrap();
    assert_eq!(reopened.statements().unwrap().len(), 3);
    assert_eq!(reopened.broadcasts(&[]).unwrap().len(), 3);
    assert_eq!(reopened.statement(&probe_hash).unwrap(), Some(probe));
}
1216
/// Checks how `broadcasts` and `posted` filter by topics and decryption key.
///
/// Five signed fixture statements are stored; statement 2 is the only one
/// created with a decryption key. Every query result is reduced to the first
/// byte of each returned payload, sorted, and compared with the expected list.
#[test]
fn search_by_topic_and_key() {
    let (store, _temp) = test_store();
    let statements = vec![
        signed_statement(0),
        signed_statement_with_topics(1, &[topic(0)], None),
        signed_statement_with_topics(2, &[topic(0), topic(1)], Some(dec_key(2))),
        signed_statement_with_topics(3, &[topic(0), topic(1), topic(2)], None),
        signed_statement_with_topics(4, &[topic(0), topic(42), topic(2), topic(3)], None),
    ];
    for s in statements {
        // Fail right here if a fixture is rejected, instead of surfacing later
        // as a hard-to-read mismatch in one of the queries below.
        assert!(
            matches!(store.submit(s, StatementSource::Network), SubmitResult::New(_)),
            "every fixture statement must be accepted by the store"
        );
    }

    // `key == None` queries `broadcasts`; `Some(k)` queries `posted` with `dec_key(k)`.
    // NOTE(review): the expectations presume the first payload byte equals the
    // fixture number passed to `signed_statement*` - confirm against those helpers.
    let assert_topics = |topics: &[u64], key: Option<u64>, expected: &[u8]| {
        let topics: Vec<_> = topics.iter().copied().map(topic).collect();
        let mut got_vals: Vec<_> = match key.map(dec_key) {
            Some(key) => store.posted(&topics, key).unwrap().into_iter().map(|d| d[0]).collect(),
            None => store.broadcasts(&topics).unwrap().into_iter().map(|d| d[0]).collect(),
        };
        got_vals.sort();
        assert_eq!(expected.to_vec(), got_vals);
    };

    // No topic filter and single-topic filters.
    assert_topics(&[], None, &[0, 1, 3, 4]);
    assert_topics(&[], Some(2), &[2]);
    assert_topics(&[0], None, &[1, 3, 4]);
    assert_topics(&[1], None, &[3]);
    assert_topics(&[2], None, &[3, 4]);
    assert_topics(&[3], None, &[4]);
    assert_topics(&[42], None, &[4]);

    // Several topics at once.
    assert_topics(&[0, 1], None, &[3]);
    assert_topics(&[0, 1], Some(2), &[2]);
    assert_topics(&[0, 1, 99], Some(2), &[]);
    assert_topics(&[1, 2], None, &[3]);
    assert_topics(&[99], None, &[]);
    assert_topics(&[0, 99], None, &[]);
    assert_topics(&[0, 1, 2, 3, 42], None, &[]);
}
1259
/// Drives the store through a fixed sequence of submissions from accounts 1-3
/// with a reduced `max_total_size`, checking after each step whether the
/// statement is accepted or ignored and how many entries have been expired.
#[test]
fn constraints() {
    let (store, _temp) = test_store();

    store.index.write().options.max_total_size = 3000;
    let source = StatementSource::Network;
    let accepted = SubmitResult::New(NetworkPriority::High);
    let rejected = SubmitResult::Ignored;

    // Shorthand: build a statement with `statement` and submit it from the network.
    let submit = |who: u64, priority: u32, chan: Option<u64>, len: usize| {
        store.submit(statement(who, priority, chan, len), source)
    };
    // Shorthand: current number of entries in the index's `expired` map.
    let expired_count = || store.index.read().expired.len();

    // Account 1.
    assert_eq!(submit(1, 1, Some(1), 2000), rejected);
    assert_eq!(submit(1, 1, Some(1), 500), accepted);
    assert_eq!(submit(1, 1, Some(1), 200), rejected);
    assert_eq!(submit(1, 2, Some(1), 600), accepted);
    assert_eq!(submit(1, 1, Some(2), 100), rejected);
    assert_eq!(expired_count(), 1);

    // Account 2.
    assert_eq!(submit(2, 1, None, 500), accepted);
    assert_eq!(submit(2, 2, None, 100), accepted);
    assert_eq!(submit(2, 3, None, 500), accepted);
    assert_eq!(expired_count(), 2);
    assert_eq!(submit(2, 4, None, 1000), accepted);
    assert_eq!(expired_count(), 4);

    // Account 3.
    assert_eq!(submit(3, 2, Some(1), 300), accepted);
    assert_eq!(submit(3, 3, Some(2), 300), accepted);
    assert_eq!(submit(3, 4, Some(3), 300), accepted);
    assert_eq!(submit(3, 5, None, 500), accepted);
    assert_eq!(expired_count(), 6);

    assert_eq!(store.index.read().total_size, 2400);
    assert_eq!(store.index.read().entries.len(), 4);

    // Further submissions are ignored, both before and after the statement-count
    // limit is lowered to 4.
    assert_eq!(submit(1, 1, None, 700), rejected);
    store.index.write().options.max_total_statements = 4;
    assert_eq!(submit(1, 1, None, 100), rejected);

    let mut expected: Vec<_> = [
        statement(1, 2, Some(1), 600),
        statement(2, 4, None, 1000),
        statement(3, 4, Some(3), 300),
        statement(3, 5, None, 500),
    ]
    .iter()
    .map(|s| s.hash())
    .collect();
    expected.sort();

    let mut stored: Vec<_> =
        store.statements().unwrap().into_iter().map(|(hash, _)| hash).collect();
    stored.sort();
    assert_eq!(expected, stored);
}
1323
/// After a statement is removed and the clock is moved past
/// `DEFAULT_PURGE_AFTER_SEC`, `maintain` must leave no expired entries, and a
/// store reopened on the same path must contain neither statements nor
/// expired entries.
#[test]
fn expired_statements_are_purged() {
    use super::DEFAULT_PURGE_AFTER_SEC;

    let (mut store, temp) = test_store();
    store.set_time(0);

    let mut stmt = statement(1, 1, Some(3), 100);
    stmt.set_topic(0, topic(4));
    let stmt_hash = stmt.hash();

    store.submit(stmt, StatementSource::Network);
    assert_eq!(store.index.read().entries.len(), 1);

    store.remove(&stmt_hash).unwrap();
    assert_eq!(store.index.read().entries.len(), 0);
    assert_eq!(store.index.read().accounts.len(), 0);

    // Move the clock just past the purge window before running maintenance.
    store.set_time(DEFAULT_PURGE_AFTER_SEC + 1);
    store.maintain();
    assert_eq!(store.index.read().expired.len(), 0);

    let keystore = store.keystore.clone();
    drop(store);

    let db_path = std::path::PathBuf::from(temp.path()).join("db");
    let reopened =
        Store::new(&db_path, Default::default(), std::sync::Arc::new(TestClient), keystore, None)
            .unwrap();
    assert_eq!(reopened.statements().unwrap().len(), 0);
    assert_eq!(reopened.index.read().expired.len(), 0);
}
1349
/// `posted_clear` must return the original plaintext of a statement that was
/// encrypted to a key generated in the store's keystore.
#[test]
fn posted_clear_decrypts() {
    let (store, _temp) = test_store();
    let recipient = store
        .keystore
        .ed25519_generate_new(sp_core::crypto::key_types::STATEMENT, None)
        .unwrap();

    // One statement with plain zero data, one encrypted to `recipient`.
    let clear_stmt = statement(1, 1, None, 100);
    let secret = b"The most valuable secret".to_vec();
    let mut secret_stmt = statement(1, 2, None, 0);
    secret_stmt.encrypt(&secret, &recipient).unwrap();

    store.submit(clear_stmt, StatementSource::Network);
    store.submit(secret_stmt, StatementSource::Network);

    let decrypted = store.posted_clear(&[], recipient.into()).unwrap();
    assert_eq!(decrypted, vec![secret]);
}
1366
/// `broadcasts_stmt` must return SCALE-encoded statements: decoding each
/// returned item has to yield the hash of the matching submitted statement.
#[test]
fn broadcasts_stmt_returns_encoded_statements() {
    let (store, _tmp) = test_store();

    // s0: no topics, s1: one topic, s2: same topic plus a decryption key.
    let s0 = signed_statement_with_topics(0, &[], None);
    let s1 = signed_statement_with_topics(1, &[topic(42)], None);
    let s2 = signed_statement_with_topics(2, &[topic(42)], Some(dec_key(99)));
    for s in [&s0, &s1, &s2] {
        store.submit(s.clone(), StatementSource::Network);
    }

    // Decodes one returned item and gives back the statement hash.
    let hash_of = |encoded: &[u8]| {
        let mut input = encoded;
        Statement::decode(&mut input).unwrap().hash()
    };

    // Without a topic filter only s0 and s1 are expected back.
    let mut all_hashes: Vec<_> = store
        .broadcasts_stmt(&[])
        .unwrap()
        .iter()
        .map(|bytes| hash_of(&bytes[..]))
        .collect();
    all_hashes.sort();
    let mut expected_hashes = vec![s0.hash(), s1.hash()];
    expected_hashes.sort();
    assert_eq!(all_hashes, expected_hashes);

    // Filtering by topic 42 must leave exactly s1.
    let with_topic = store.broadcasts_stmt(&[topic(42)]).unwrap();
    assert_eq!(with_topic.len(), 1);
    assert_eq!(hash_of(&with_topic[0][..]), s1.hash());
}
1403
/// `posted_stmt` must return only the statement encrypted to the requested
/// destination key, in a form that decodes back to that statement.
#[test]
fn posted_stmt_returns_encoded_statements_for_dest() {
    let (store, _tmp) = test_store();

    // Generates a fresh statement key in the store's keystore.
    let new_key = || {
        store
            .keystore
            .ed25519_generate_new(sp_core::crypto::key_types::STATEMENT, None)
            .unwrap()
    };
    let dest_key = new_key();
    let other_key = new_key();
    let dest: [u8; 32] = dest_key.into();

    let mut s_with_key = statement(1, 1, None, 0);
    s_with_key.encrypt(&b"The most valuable secret".to_vec(), &dest_key).unwrap();
    let expected_hash = s_with_key.hash();

    let mut s_other_key = statement(2, 2, None, 0);
    s_other_key.encrypt(&b"The second most valuable secret".to_vec(), &other_key).unwrap();

    store.submit(s_with_key, StatementSource::Network);
    store.submit(s_other_key, StatementSource::Network);

    let retrieved = store.posted_stmt(&[], dest).unwrap();
    assert_eq!(retrieved.len(), 1, "Only one statement has dec_key=dest");

    let mut encoded = &retrieved[0][..];
    let returned_stmt = Statement::decode(&mut encoded).unwrap();
    assert_eq!(returned_stmt.hash(), expected_hash, "Returned statement must match s_with_key");
}
1446
/// `posted_clear_stmt` must return, for the matching destination key, the
/// encoded statement immediately followed by its decrypted plain data.
#[test]
fn posted_clear_stmt_returns_statement_followed_by_plain_data() {
    let (store, _tmp) = test_store();

    // Generates a fresh statement key in the store's keystore.
    let new_key = || {
        store
            .keystore
            .ed25519_generate_new(sp_core::crypto::key_types::STATEMENT, None)
            .unwrap()
    };
    let dest_key = new_key();
    let other_key = new_key();
    let dest: [u8; 32] = dest_key.into();

    let plain1 = b"The most valuable secret".to_vec();
    let mut s_with_key = statement(1, 1, None, 0);
    s_with_key.encrypt(&plain1, &dest_key).unwrap();
    let encoded_stmt = s_with_key.encode();

    let plain2 = b"The second most valuable secret".to_vec();
    let mut s_other_key = statement(2, 2, None, 0);
    s_other_key.encrypt(&plain2, &other_key).unwrap();

    store.submit(s_with_key, StatementSource::Network);
    store.submit(s_other_key, StatementSource::Network);

    let retrieved = store.posted_clear_stmt(&[], dest).unwrap();
    assert_eq!(retrieved.len(), 1, "Only one statement has dec_key=dest");

    // Leading bytes are the encoded statement, everything after is the plaintext.
    let (leading, trailing) = retrieved[0].split_at(encoded_stmt.len());
    assert_eq!(leading, &encoded_stmt[..]);
    assert_eq!(trailing, &plain1[..]);
}
1492
/// `posted_clear` must apply the destination key and the topic filter together:
/// of three encrypted statements only the one with both the requested key and
/// the requested topic is returned, as plaintext.
#[test]
fn posted_clear_returns_plain_data_for_dest_and_topics() {
    let (store, _tmp) = test_store();

    // Generates a fresh statement key in the store's keystore.
    let new_key = || {
        store
            .keystore
            .ed25519_generate_new(sp_core::crypto::key_types::STATEMENT, None)
            .unwrap()
    };
    let public_dest = new_key();
    let public_other = new_key();
    let dest: [u8; 32] = public_dest.into();

    // Requested key and requested topic.
    let plaintext_good = b"The most valuable secret".to_vec();
    let mut s_good = statement(1, 1, None, 0);
    s_good.encrypt(&plaintext_good, &public_dest).unwrap();
    s_good.set_topic(0, topic(42));

    // Requested key, different topic.
    let mut s_wrong_topic = statement(2, 2, None, 0);
    s_wrong_topic.encrypt(b"Wrong topic", &public_dest).unwrap();
    s_wrong_topic.set_topic(0, topic(99));

    // Requested topic, different key.
    let mut s_other_dest = statement(3, 3, None, 0);
    s_other_dest.encrypt(b"Other dest", &public_other).unwrap();
    s_other_dest.set_topic(0, topic(42));

    for s in [s_good, s_wrong_topic, s_other_dest] {
        store.submit(s, StatementSource::Network);
    }

    let retrieved = store.posted_clear(&[topic(42)], dest).unwrap();
    assert_eq!(retrieved, vec![plaintext_good]);
}
1536
/// Exercises `remove_by` on a store holding statements from two accounts
/// ("A" = `account(4)`, "B" = `account(3)`):
///
/// 1. all of A's statements disappear while B's stay,
/// 2. the index (accounts, expired map, entry count, total size, topic and
///    key sets) reflects the removal,
/// 3. a second `remove_by` for the same account succeeds,
/// 4. after the purge window `maintain` empties the expired map, and
/// 5. account A can submit a new statement again.
#[test]
fn remove_by_covers_various_situations() {
    use sp_statement_store::{StatementSource, StatementStore, SubmitResult};

    let (mut store, _temp) = test_store();
    store.set_time(0);

    let t42 = topic(42);
    let k7 = dec_key(7);

    // Account A: one statement with a topic, one with a decryption key, one plain.
    let s_a1 = {
        let mut s = statement(4, 10, Some(100), 100);
        s.set_topic(0, t42);
        s
    };
    let s_a2 = {
        let mut s = statement(4, 20, Some(200), 150);
        s.set_decryption_key(k7);
        s
    };
    let s_a3 = statement(4, 30, None, 50);

    // Account B: one plain statement, one sharing A's topic and decryption key.
    let s_b1 = statement(3, 10, None, 100);
    let s_b2 = {
        let mut s = statement(3, 15, Some(300), 100);
        s.set_topic(0, t42);
        s.set_decryption_key(k7);
        s
    };

    let (h_a1, h_a2, h_a3) = (s_a1.hash(), s_a2.hash(), s_a3.hash());
    let (h_b1, h_b2) = (s_b1.hash(), s_b2.hash());

    for s in [s_a1, s_a2, s_a3, s_b1, s_b2] {
        assert!(matches!(store.submit(s, StatementSource::Network), SubmitResult::New(_)));
    }

    // Index state before the removal.
    {
        let idx = store.index.read();
        assert_eq!(idx.entries.len(), 5, "all 5 should be present");
        assert!(idx.accounts.contains_key(&account(4)));
        assert!(idx.accounts.contains_key(&account(3)));
        assert_eq!(idx.total_size, 100 + 150 + 50 + 100 + 100);

        let set_t = idx.by_topic.get(&t42).expect("topic set exists");
        assert!([h_a1, h_b2].iter().all(|h| set_t.contains(h)));

        let set_k = idx.by_dec_key.get(&Some(k7)).expect("key set exists");
        assert!([h_a2, h_b2].iter().all(|h| set_k.contains(h)));
    }

    store.remove_by(account(4)).expect("remove_by should succeed");

    // Store and index state after the removal.
    {
        for h in [h_a1, h_a2, h_a3] {
            assert!(store.statement(&h).unwrap().is_none(), "A's statement should be removed");
        }
        for h in [h_b1, h_b2] {
            assert!(store.statement(&h).unwrap().is_some(), "B's statement should remain");
        }

        let idx = store.index.read();

        assert!(!idx.accounts.contains_key(&account(4)), "Account A must be gone");
        assert!(idx.accounts.contains_key(&account(3)), "Account B must remain");

        for h in [h_a1, h_a2, h_a3] {
            assert!(idx.expired.contains_key(&h));
        }
        assert_eq!(idx.expired.len(), 3);

        assert_eq!(idx.entries.len(), 2);
        assert_eq!(idx.total_size, 100 + 100);

        let set_t = idx.by_topic.get(&t42).expect("topic set exists");
        assert!(set_t.contains(&h_b2));
        assert!(!set_t.contains(&h_a1));

        let set_k = idx.by_dec_key.get(&Some(k7)).expect("key set exists");
        assert!(set_k.contains(&h_b2));
        assert!(!set_k.contains(&h_a2));
    }

    store.remove_by(account(4)).expect("second remove_by should be a no-op");

    // Step past the configured purge window and run maintenance.
    let purge_after = store.index.read().options.purge_after_sec;
    store.set_time(purge_after + 1);
    store.maintain();
    assert_eq!(store.index.read().expired.len(), 0, "expired entries should be purged");

    // Account A must be able to submit again.
    let s_new = statement(4, 40, None, 10);
    assert!(matches!(store.submit(s_new, StatementSource::Network), SubmitResult::New(_)));
}
1650}