1#![warn(missing_docs)]
48#![warn(unused_extern_crates)]
49
50mod metrics;
51
52pub use sp_statement_store::{Error, StatementStore, MAX_TOPICS};
53
54use metrics::MetricsLink as PrometheusMetrics;
55use parking_lot::RwLock;
56use prometheus_endpoint::Registry as PrometheusRegistry;
57use sc_keystore::LocalKeystore;
58use sp_api::ProvideRuntimeApi;
59use sp_blockchain::HeaderBackend;
60use sp_core::{crypto::UncheckedFrom, hexdisplay::HexDisplay, traits::SpawnNamed, Decode, Encode};
61use sp_runtime::traits::Block as BlockT;
62use sp_statement_store::{
63 runtime_api::{
64 InvalidStatement, StatementSource, StatementStoreExt, ValidStatement, ValidateStatement,
65 },
66 AccountId, BlockHash, Channel, DecryptionKey, Hash, NetworkPriority, Proof, Result, Statement,
67 SubmitResult, Topic,
68};
69use std::{
70 collections::{BTreeMap, HashMap, HashSet},
71 sync::Arc,
72};
73
74const KEY_VERSION: &[u8] = b"version".as_slice();
75const CURRENT_VERSION: u32 = 1;
76
77const LOG_TARGET: &str = "statement-store";
78
79pub const DEFAULT_PURGE_AFTER_SEC: u64 = 2 * 24 * 60 * 60; pub const DEFAULT_MAX_TOTAL_STATEMENTS: usize = 4 * 1024 * 1024; pub const DEFAULT_MAX_TOTAL_SIZE: usize = 2 * 1024 * 1024 * 1024; pub const MAX_STATEMENT_SIZE: usize =
89 sc_network_statement::config::MAX_STATEMENT_NOTIFICATION_SIZE as usize - 1;
90
91const MAINTENANCE_PERIOD: std::time::Duration = std::time::Duration::from_secs(30);
92
93mod col {
94 pub const META: u8 = 0;
95 pub const STATEMENTS: u8 = 1;
96 pub const EXPIRED: u8 = 2;
97
98 pub const COUNT: u8 = 3;
99}
100
101#[derive(Eq, PartialEq, Debug, Ord, PartialOrd, Clone, Copy)]
102struct Priority(u32);
103
104#[derive(PartialEq, Eq)]
105struct PriorityKey {
106 hash: Hash,
107 priority: Priority,
108}
109
110impl PartialOrd for PriorityKey {
111 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
112 Some(self.cmp(other))
113 }
114}
115
116impl Ord for PriorityKey {
117 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
118 self.priority.cmp(&other.priority).then_with(|| self.hash.cmp(&other.hash))
119 }
120}
121
122#[derive(PartialEq, Eq)]
123struct ChannelEntry {
124 hash: Hash,
125 priority: Priority,
126}
127
128#[derive(Default)]
129struct StatementsForAccount {
130 by_priority: BTreeMap<PriorityKey, (Option<Channel>, usize)>,
132 channels: HashMap<Channel, ChannelEntry>,
134 data_size: usize,
136}
137
138pub struct Options {
140 max_total_statements: usize,
143 max_total_size: usize,
146 purge_after_sec: u64,
148}
149
150impl Default for Options {
151 fn default() -> Self {
152 Options {
153 max_total_statements: DEFAULT_MAX_TOTAL_STATEMENTS,
154 max_total_size: DEFAULT_MAX_TOTAL_SIZE,
155 purge_after_sec: DEFAULT_PURGE_AFTER_SEC,
156 }
157 }
158}
159
160#[derive(Default)]
161struct Index {
162 recent: HashSet<Hash>,
163 by_topic: HashMap<Topic, HashSet<Hash>>,
164 by_dec_key: HashMap<Option<DecryptionKey>, HashSet<Hash>>,
165 topics_and_keys: HashMap<Hash, ([Option<Topic>; MAX_TOPICS], Option<DecryptionKey>)>,
166 entries: HashMap<Hash, (AccountId, Priority, usize)>,
167 expired: HashMap<Hash, u64>, accounts: HashMap<AccountId, StatementsForAccount>,
169 options: Options,
170 total_size: usize,
171}
172
173struct ClientWrapper<Block, Client> {
174 client: Arc<Client>,
175 _block: std::marker::PhantomData<Block>,
176}
177
178impl<Block, Client> ClientWrapper<Block, Client>
179where
180 Block: BlockT,
181 Block::Hash: From<BlockHash>,
182 Client: ProvideRuntimeApi<Block> + HeaderBackend<Block> + Send + Sync + 'static,
183 Client::Api: ValidateStatement<Block>,
184{
185 fn validate_statement(
186 &self,
187 block: Option<BlockHash>,
188 source: StatementSource,
189 statement: Statement,
190 ) -> std::result::Result<ValidStatement, InvalidStatement> {
191 let api = self.client.runtime_api();
192 let block = block.map(Into::into).unwrap_or_else(|| {
193 self.client.info().finalized_hash
195 });
196 api.validate_statement(block, source, statement)
197 .map_err(|_| InvalidStatement::InternalError)?
198 }
199}
200
201pub struct Store {
203 db: parity_db::Db,
204 index: RwLock<Index>,
205 validate_fn: Box<
206 dyn Fn(
207 Option<BlockHash>,
208 StatementSource,
209 Statement,
210 ) -> std::result::Result<ValidStatement, InvalidStatement>
211 + Send
212 + Sync,
213 >,
214 keystore: Arc<LocalKeystore>,
215 time_override: Option<u64>,
217 metrics: PrometheusMetrics,
218}
219
220enum IndexQuery {
221 Unknown,
222 Exists,
223 Expired,
224}
225
226enum MaybeInserted {
227 Inserted(HashSet<Hash>),
228 Ignored,
229}
230
231impl Index {
232 fn new(options: Options) -> Index {
233 Index { options, ..Default::default() }
234 }
235
236 fn insert_new(&mut self, hash: Hash, account: AccountId, statement: &Statement) {
237 let mut all_topics = [None; MAX_TOPICS];
238 let mut nt = 0;
239 while let Some(t) = statement.topic(nt) {
240 self.by_topic.entry(t).or_default().insert(hash);
241 all_topics[nt] = Some(t);
242 nt += 1;
243 }
244 let key = statement.decryption_key();
245 self.by_dec_key.entry(key).or_default().insert(hash);
246 if nt > 0 || key.is_some() {
247 self.topics_and_keys.insert(hash, (all_topics, key));
248 }
249 let priority = Priority(statement.priority().unwrap_or(0));
250 self.entries.insert(hash, (account, priority, statement.data_len()));
251 self.recent.insert(hash);
252 self.total_size += statement.data_len();
253 let account_info = self.accounts.entry(account).or_default();
254 account_info.data_size += statement.data_len();
255 if let Some(channel) = statement.channel() {
256 account_info.channels.insert(channel, ChannelEntry { hash, priority });
257 }
258 account_info
259 .by_priority
260 .insert(PriorityKey { hash, priority }, (statement.channel(), statement.data_len()));
261 }
262
263 fn query(&self, hash: &Hash) -> IndexQuery {
264 if self.entries.contains_key(hash) {
265 return IndexQuery::Exists
266 }
267 if self.expired.contains_key(hash) {
268 return IndexQuery::Expired
269 }
270 IndexQuery::Unknown
271 }
272
273 fn insert_expired(&mut self, hash: Hash, timestamp: u64) {
274 self.expired.insert(hash, timestamp);
275 }
276
277 fn iterate_with(
278 &self,
279 key: Option<DecryptionKey>,
280 match_all_topics: &[Topic],
281 mut f: impl FnMut(&Hash) -> Result<()>,
282 ) -> Result<()> {
283 let empty = HashSet::new();
284 let mut sets: [&HashSet<Hash>; MAX_TOPICS + 1] = [∅ MAX_TOPICS + 1];
285 if match_all_topics.len() > MAX_TOPICS {
286 return Ok(())
287 }
288 let key_set = self.by_dec_key.get(&key);
289 if key_set.map_or(0, |s| s.len()) == 0 {
290 return Ok(())
292 }
293 sets[0] = key_set.expect("Function returns if key_set is None");
294 for (i, t) in match_all_topics.iter().enumerate() {
295 let set = self.by_topic.get(t);
296 if set.map_or(0, |s| s.len()) == 0 {
297 return Ok(())
299 }
300 sets[i + 1] = set.expect("Function returns if set is None");
301 }
302 let sets = &mut sets[0..match_all_topics.len() + 1];
303 sets.sort_by_key(|s| s.len());
305 for item in sets[0] {
306 if sets[1..].iter().all(|set| set.contains(item)) {
307 log::trace!(
308 target: LOG_TARGET,
309 "Iterating by topic/key: statement {:?}",
310 HexDisplay::from(item)
311 );
312 f(item)?
313 }
314 }
315 Ok(())
316 }
317
318 fn maintain(&mut self, current_time: u64) -> Vec<Hash> {
319 let mut purged = Vec::new();
321 self.expired.retain(|hash, timestamp| {
322 if *timestamp + self.options.purge_after_sec <= current_time {
323 purged.push(*hash);
324 log::trace!(target: LOG_TARGET, "Purged statement {:?}", HexDisplay::from(hash));
325 false
326 } else {
327 true
328 }
329 });
330 purged
331 }
332
333 fn take_recent(&mut self) -> HashSet<Hash> {
334 std::mem::take(&mut self.recent)
335 }
336
337 fn make_expired(&mut self, hash: &Hash, current_time: u64) -> bool {
338 if let Some((account, priority, len)) = self.entries.remove(hash) {
339 self.total_size -= len;
340 if let Some((topics, key)) = self.topics_and_keys.remove(hash) {
341 for t in topics.into_iter().flatten() {
342 if let std::collections::hash_map::Entry::Occupied(mut set) =
343 self.by_topic.entry(t)
344 {
345 set.get_mut().remove(hash);
346 if set.get().is_empty() {
347 set.remove_entry();
348 }
349 }
350 }
351 if let std::collections::hash_map::Entry::Occupied(mut set) =
352 self.by_dec_key.entry(key)
353 {
354 set.get_mut().remove(hash);
355 if set.get().is_empty() {
356 set.remove_entry();
357 }
358 }
359 }
360 let _ = self.recent.remove(hash);
361 self.expired.insert(*hash, current_time);
362 if let std::collections::hash_map::Entry::Occupied(mut account_rec) =
363 self.accounts.entry(account)
364 {
365 let key = PriorityKey { hash: *hash, priority };
366 if let Some((channel, len)) = account_rec.get_mut().by_priority.remove(&key) {
367 account_rec.get_mut().data_size -= len;
368 if let Some(channel) = channel {
369 account_rec.get_mut().channels.remove(&channel);
370 }
371 }
372 if account_rec.get().by_priority.is_empty() {
373 account_rec.remove_entry();
374 }
375 }
376 log::trace!(target: LOG_TARGET, "Expired statement {:?}", HexDisplay::from(hash));
377 true
378 } else {
379 false
380 }
381 }
382
383 fn insert(
384 &mut self,
385 hash: Hash,
386 statement: &Statement,
387 account: &AccountId,
388 validation: &ValidStatement,
389 current_time: u64,
390 ) -> MaybeInserted {
391 let statement_len = statement.data_len();
392 if statement_len > validation.max_size as usize {
393 log::debug!(
394 target: LOG_TARGET,
395 "Ignored oversize message: {:?} ({} bytes)",
396 HexDisplay::from(&hash),
397 statement_len,
398 );
399 return MaybeInserted::Ignored
400 }
401
402 let mut evicted = HashSet::new();
403 let mut would_free_size = 0;
404 let priority = Priority(statement.priority().unwrap_or(0));
405 let (max_size, max_count) = (validation.max_size as usize, validation.max_count as usize);
406 if let Some(account_rec) = self.accounts.get(account) {
410 if let Some(channel) = statement.channel() {
411 if let Some(channel_record) = account_rec.channels.get(&channel) {
412 if priority <= channel_record.priority {
413 log::debug!(
415 target: LOG_TARGET,
416 "Ignored lower priority channel message: {:?} {:?} <= {:?}",
417 HexDisplay::from(&hash),
418 priority,
419 channel_record.priority,
420 );
421 return MaybeInserted::Ignored
422 } else {
423 log::debug!(
426 target: LOG_TARGET,
427 "Replacing higher priority channel message: {:?} ({:?}) > {:?} ({:?})",
428 HexDisplay::from(&hash),
429 priority,
430 HexDisplay::from(&channel_record.hash),
431 channel_record.priority,
432 );
433 let key = PriorityKey {
434 hash: channel_record.hash,
435 priority: channel_record.priority,
436 };
437 if let Some((_channel, len)) = account_rec.by_priority.get(&key) {
438 would_free_size += *len;
439 evicted.insert(channel_record.hash);
440 }
441 }
442 }
443 }
444 for (entry, (_, len)) in account_rec.by_priority.iter() {
446 if (account_rec.data_size - would_free_size + statement_len <= max_size) &&
447 account_rec.by_priority.len() + 1 - evicted.len() <= max_count
448 {
449 break
451 }
452 if evicted.contains(&entry.hash) {
453 continue
455 }
456 if entry.priority >= priority {
457 log::debug!(
458 target: LOG_TARGET,
459 "Ignored message due to constraints {:?} {:?} < {:?}",
460 HexDisplay::from(&hash),
461 priority,
462 entry.priority,
463 );
464 return MaybeInserted::Ignored
465 }
466 evicted.insert(entry.hash);
467 would_free_size += len;
468 }
469 }
470 if !((self.total_size - would_free_size + statement_len <= self.options.max_total_size) &&
472 self.entries.len() + 1 - evicted.len() <= self.options.max_total_statements)
473 {
474 log::debug!(
475 target: LOG_TARGET,
476 "Ignored statement {} because the store is full (size={}, count={})",
477 HexDisplay::from(&hash),
478 self.total_size,
479 self.entries.len(),
480 );
481 return MaybeInserted::Ignored
482 }
483
484 for h in &evicted {
485 self.make_expired(h, current_time);
486 }
487 self.insert_new(hash, *account, statement);
488 MaybeInserted::Inserted(evicted)
489 }
490}
491
492impl Store {
493 pub fn new_shared<Block, Client>(
496 path: &std::path::Path,
497 options: Options,
498 client: Arc<Client>,
499 keystore: Arc<LocalKeystore>,
500 prometheus: Option<&PrometheusRegistry>,
501 task_spawner: &dyn SpawnNamed,
502 ) -> Result<Arc<Store>>
503 where
504 Block: BlockT,
505 Block::Hash: From<BlockHash>,
506 Client: ProvideRuntimeApi<Block> + HeaderBackend<Block> + Send + Sync + 'static,
507 Client::Api: ValidateStatement<Block>,
508 {
509 let store = Arc::new(Self::new(path, options, client, keystore, prometheus)?);
510
511 let worker_store = store.clone();
513 task_spawner.spawn(
514 "statement-store-maintenance",
515 Some("statement-store"),
516 Box::pin(async move {
517 let mut interval = tokio::time::interval(MAINTENANCE_PERIOD);
518 loop {
519 interval.tick().await;
520 worker_store.maintain();
521 }
522 }),
523 );
524
525 Ok(store)
526 }
527
528 #[doc(hidden)]
531 pub fn new<Block, Client>(
532 path: &std::path::Path,
533 options: Options,
534 client: Arc<Client>,
535 keystore: Arc<LocalKeystore>,
536 prometheus: Option<&PrometheusRegistry>,
537 ) -> Result<Store>
538 where
539 Block: BlockT,
540 Block::Hash: From<BlockHash>,
541 Client: ProvideRuntimeApi<Block> + HeaderBackend<Block> + Send + Sync + 'static,
542 Client::Api: ValidateStatement<Block>,
543 {
544 let mut path: std::path::PathBuf = path.into();
545 path.push("statements");
546
547 let mut config = parity_db::Options::with_columns(&path, col::COUNT);
548
549 let statement_col = &mut config.columns[col::STATEMENTS as usize];
550 statement_col.ref_counted = false;
551 statement_col.preimage = true;
552 statement_col.uniform = true;
553 let db = parity_db::Db::open_or_create(&config).map_err(|e| Error::Db(e.to_string()))?;
554 match db.get(col::META, &KEY_VERSION).map_err(|e| Error::Db(e.to_string()))? {
555 Some(version) => {
556 let version = u32::from_le_bytes(
557 version
558 .try_into()
559 .map_err(|_| Error::Db("Error reading database version".into()))?,
560 );
561 if version != CURRENT_VERSION {
562 return Err(Error::Db(format!("Unsupported database version: {version}")))
563 }
564 },
565 None => {
566 db.commit([(
567 col::META,
568 KEY_VERSION.to_vec(),
569 Some(CURRENT_VERSION.to_le_bytes().to_vec()),
570 )])
571 .map_err(|e| Error::Db(e.to_string()))?;
572 },
573 }
574
575 let validator = ClientWrapper { client, _block: Default::default() };
576 let validate_fn = Box::new(move |block, source, statement| {
577 validator.validate_statement(block, source, statement)
578 });
579
580 let store = Store {
581 db,
582 index: RwLock::new(Index::new(options)),
583 validate_fn,
584 keystore,
585 time_override: None,
586 metrics: PrometheusMetrics::new(prometheus),
587 };
588 store.populate()?;
589 Ok(store)
590 }
591
592 fn populate(&self) -> Result<()> {
597 {
598 let mut index = self.index.write();
599 self.db
600 .iter_column_while(col::STATEMENTS, |item| {
601 let statement = item.value;
602 if let Ok(statement) = Statement::decode(&mut statement.as_slice()) {
603 let hash = statement.hash();
604 log::trace!(
605 target: LOG_TARGET,
606 "Statement loaded {:?}",
607 HexDisplay::from(&hash)
608 );
609 if let Some(account_id) = statement.account_id() {
610 index.insert_new(hash, account_id, &statement);
611 } else {
612 log::debug!(
613 target: LOG_TARGET,
614 "Error decoding statement loaded from the DB: {:?}",
615 HexDisplay::from(&hash)
616 );
617 }
618 }
619 true
620 })
621 .map_err(|e| Error::Db(e.to_string()))?;
622 self.db
623 .iter_column_while(col::EXPIRED, |item| {
624 let expired_info = item.value;
625 if let Ok((hash, timestamp)) =
626 <(Hash, u64)>::decode(&mut expired_info.as_slice())
627 {
628 log::trace!(
629 target: LOG_TARGET,
630 "Statement loaded (expired): {:?}",
631 HexDisplay::from(&hash)
632 );
633 index.insert_expired(hash, timestamp);
634 }
635 true
636 })
637 .map_err(|e| Error::Db(e.to_string()))?;
638 }
639
640 self.maintain();
641 Ok(())
642 }
643
644 fn collect_statements<R>(
645 &self,
646 key: Option<DecryptionKey>,
647 match_all_topics: &[Topic],
648 mut f: impl FnMut(Statement) -> Option<R>,
649 ) -> Result<Vec<R>> {
650 let mut result = Vec::new();
651 let index = self.index.read();
652 index.iterate_with(key, match_all_topics, |hash| {
653 match self.db.get(col::STATEMENTS, hash).map_err(|e| Error::Db(e.to_string()))? {
654 Some(entry) => {
655 if let Ok(statement) = Statement::decode(&mut entry.as_slice()) {
656 if let Some(data) = f(statement) {
657 result.push(data);
658 }
659 } else {
660 log::warn!(
662 target: LOG_TARGET,
663 "Corrupt statement {:?}",
664 HexDisplay::from(hash)
665 );
666 }
667 },
668 None => {
669 log::warn!(
671 target: LOG_TARGET,
672 "Missing statement {:?}",
673 HexDisplay::from(hash)
674 );
675 },
676 }
677 Ok(())
678 })?;
679 Ok(result)
680 }
681
682 pub fn maintain(&self) {
684 log::trace!(target: LOG_TARGET, "Started store maintenance");
685 let (deleted, active_count, expired_count): (Vec<_>, usize, usize) = {
686 let mut index = self.index.write();
687 let deleted = index.maintain(self.timestamp());
688 (deleted, index.entries.len(), index.expired.len())
689 };
690 let deleted: Vec<_> =
691 deleted.into_iter().map(|hash| (col::EXPIRED, hash.to_vec(), None)).collect();
692 let deleted_count = deleted.len() as u64;
693 if let Err(e) = self.db.commit(deleted) {
694 log::warn!(target: LOG_TARGET, "Error writing to the statement database: {:?}", e);
695 } else {
696 self.metrics.report(|metrics| metrics.statements_pruned.inc_by(deleted_count));
697 }
698 log::trace!(
699 target: LOG_TARGET,
700 "Completed store maintenance. Purged: {}, Active: {}, Expired: {}",
701 deleted_count,
702 active_count,
703 expired_count
704 );
705 }
706
707 fn timestamp(&self) -> u64 {
708 self.time_override.unwrap_or_else(|| {
709 std::time::SystemTime::now()
710 .duration_since(std::time::UNIX_EPOCH)
711 .unwrap_or_default()
712 .as_secs()
713 })
714 }
715
716 #[cfg(test)]
717 fn set_time(&mut self, time: u64) {
718 self.time_override = Some(time);
719 }
720
721 pub fn as_statement_store_ext(self: Arc<Self>) -> StatementStoreExt {
723 StatementStoreExt::new(self)
724 }
725
726 fn posted_clear_inner<R>(
729 &self,
730 match_all_topics: &[Topic],
731 dest: [u8; 32],
732 mut map_f: impl FnMut(Statement, Vec<u8>) -> R,
734 ) -> Result<Vec<R>> {
735 self.collect_statements(Some(dest), match_all_topics, |statement| {
736 if let (Some(key), Some(_)) = (statement.decryption_key(), statement.data()) {
737 let public: sp_core::ed25519::Public = UncheckedFrom::unchecked_from(key);
738 let public: sp_statement_store::ed25519::Public = public.into();
739 match self.keystore.key_pair::<sp_statement_store::ed25519::Pair>(&public) {
740 Err(e) => {
741 log::debug!(
742 target: LOG_TARGET,
743 "Keystore error: {:?}, for statement {:?}",
744 e,
745 HexDisplay::from(&statement.hash())
746 );
747 None
748 },
749 Ok(None) => {
750 log::debug!(
751 target: LOG_TARGET,
752 "Keystore is missing key for statement {:?}",
753 HexDisplay::from(&statement.hash())
754 );
755 None
756 },
757 Ok(Some(pair)) => match statement.decrypt_private(&pair.into_inner()) {
758 Ok(r) => r.map(|data| map_f(statement, data)),
759 Err(e) => {
760 log::debug!(
761 target: LOG_TARGET,
762 "Decryption error: {:?}, for statement {:?}",
763 e,
764 HexDisplay::from(&statement.hash())
765 );
766 None
767 },
768 },
769 }
770 } else {
771 None
772 }
773 })
774 }
775}
776
777impl StatementStore for Store {
778 fn statements(&self) -> Result<Vec<(Hash, Statement)>> {
780 let index = self.index.read();
781 let mut result = Vec::with_capacity(index.entries.len());
782 for hash in index.entries.keys().cloned() {
783 let Some(encoded) =
784 self.db.get(col::STATEMENTS, &hash).map_err(|e| Error::Db(e.to_string()))?
785 else {
786 continue
787 };
788 if let Ok(statement) = Statement::decode(&mut encoded.as_slice()) {
789 result.push((hash, statement));
790 }
791 }
792 Ok(result)
793 }
794
795 fn take_recent_statements(&self) -> Result<Vec<(Hash, Statement)>> {
796 let mut index = self.index.write();
797 let recent = index.take_recent();
798 let mut result = Vec::with_capacity(recent.len());
799 for hash in recent {
800 let Some(encoded) =
801 self.db.get(col::STATEMENTS, &hash).map_err(|e| Error::Db(e.to_string()))?
802 else {
803 continue
804 };
805 if let Ok(statement) = Statement::decode(&mut encoded.as_slice()) {
806 result.push((hash, statement));
807 }
808 }
809 Ok(result)
810 }
811
812 fn statement(&self, hash: &Hash) -> Result<Option<Statement>> {
814 Ok(
815 match self
816 .db
817 .get(col::STATEMENTS, hash.as_slice())
818 .map_err(|e| Error::Db(e.to_string()))?
819 {
820 Some(entry) => {
821 log::trace!(
822 target: LOG_TARGET,
823 "Queried statement {:?}",
824 HexDisplay::from(hash)
825 );
826 Some(
827 Statement::decode(&mut entry.as_slice())
828 .map_err(|e| Error::Decode(e.to_string()))?,
829 )
830 },
831 None => {
832 log::trace!(
833 target: LOG_TARGET,
834 "Queried missing statement {:?}",
835 HexDisplay::from(hash)
836 );
837 None
838 },
839 },
840 )
841 }
842
843 fn has_statement(&self, hash: &Hash) -> bool {
844 self.index.read().entries.contains_key(hash)
845 }
846
847 fn broadcasts(&self, match_all_topics: &[Topic]) -> Result<Vec<Vec<u8>>> {
850 self.collect_statements(None, match_all_topics, |statement| statement.into_data())
851 }
852
853 fn posted(&self, match_all_topics: &[Topic], dest: [u8; 32]) -> Result<Vec<Vec<u8>>> {
857 self.collect_statements(Some(dest), match_all_topics, |statement| statement.into_data())
858 }
859
860 fn posted_clear(&self, match_all_topics: &[Topic], dest: [u8; 32]) -> Result<Vec<Vec<u8>>> {
863 self.posted_clear_inner(match_all_topics, dest, |_statement, data| data)
864 }
865
866 fn broadcasts_stmt(&self, match_all_topics: &[Topic]) -> Result<Vec<Vec<u8>>> {
869 self.collect_statements(None, match_all_topics, |statement| Some(statement.encode()))
870 }
871
872 fn posted_stmt(&self, match_all_topics: &[Topic], dest: [u8; 32]) -> Result<Vec<Vec<u8>>> {
876 self.collect_statements(Some(dest), match_all_topics, |statement| Some(statement.encode()))
877 }
878
879 fn posted_clear_stmt(
882 &self,
883 match_all_topics: &[Topic],
884 dest: [u8; 32],
885 ) -> Result<Vec<Vec<u8>>> {
886 self.posted_clear_inner(match_all_topics, dest, |statement, data| {
887 let mut res = Vec::with_capacity(statement.size_hint() + data.len());
888 statement.encode_to(&mut res);
889 res.extend_from_slice(&data);
890 res
891 })
892 }
893
894 fn submit(&self, statement: Statement, source: StatementSource) -> SubmitResult {
896 let hash = statement.hash();
897 let encoded_size = statement.encoded_size();
898 if encoded_size > MAX_STATEMENT_SIZE {
899 log::debug!(
900 target: LOG_TARGET,
901 "Statement is too big for propogation: {:?} ({}/{} bytes)",
902 HexDisplay::from(&hash),
903 statement.encoded_size(),
904 MAX_STATEMENT_SIZE
905 );
906 return SubmitResult::Ignored
907 }
908
909 match self.index.read().query(&hash) {
910 IndexQuery::Expired =>
911 if !source.can_be_resubmitted() {
912 return SubmitResult::KnownExpired
913 },
914 IndexQuery::Exists =>
915 if !source.can_be_resubmitted() {
916 return SubmitResult::Known
917 },
918 IndexQuery::Unknown => {},
919 }
920
921 let Some(account_id) = statement.account_id() else {
922 log::debug!(
923 target: LOG_TARGET,
924 "Statement validation failed: Missing proof ({:?})",
925 HexDisplay::from(&hash),
926 );
927 self.metrics.report(|metrics| metrics.validations_invalid.inc());
928 return SubmitResult::Bad("No statement proof")
929 };
930
931 let at_block = if let Some(Proof::OnChain { block_hash, .. }) = statement.proof() {
933 Some(*block_hash)
934 } else {
935 None
936 };
937 let validation_result = (self.validate_fn)(at_block, source, statement.clone());
938 let validation = match validation_result {
939 Ok(validation) => validation,
940 Err(InvalidStatement::BadProof) => {
941 log::debug!(
942 target: LOG_TARGET,
943 "Statement validation failed: BadProof, {:?}",
944 HexDisplay::from(&hash),
945 );
946 self.metrics.report(|metrics| metrics.validations_invalid.inc());
947 return SubmitResult::Bad("Bad statement proof")
948 },
949 Err(InvalidStatement::NoProof) => {
950 log::debug!(
951 target: LOG_TARGET,
952 "Statement validation failed: NoProof, {:?}",
953 HexDisplay::from(&hash),
954 );
955 self.metrics.report(|metrics| metrics.validations_invalid.inc());
956 return SubmitResult::Bad("Missing statement proof")
957 },
958 Err(InvalidStatement::InternalError) =>
959 return SubmitResult::InternalError(Error::Runtime),
960 };
961
962 let current_time = self.timestamp();
963 let mut commit = Vec::new();
964 {
965 let mut index = self.index.write();
966
967 let evicted =
968 match index.insert(hash, &statement, &account_id, &validation, current_time) {
969 MaybeInserted::Ignored => return SubmitResult::Ignored,
970 MaybeInserted::Inserted(evicted) => evicted,
971 };
972
973 commit.push((col::STATEMENTS, hash.to_vec(), Some(statement.encode())));
974 for hash in evicted {
975 commit.push((col::STATEMENTS, hash.to_vec(), None));
976 commit.push((col::EXPIRED, hash.to_vec(), Some((hash, current_time).encode())));
977 }
978 if let Err(e) = self.db.commit(commit) {
979 log::debug!(
980 target: LOG_TARGET,
981 "Statement validation failed: database error {}, {:?}",
982 e,
983 statement
984 );
985 return SubmitResult::InternalError(Error::Db(e.to_string()))
986 }
987 } self.metrics.report(|metrics| metrics.submitted_statements.inc());
989 let network_priority = NetworkPriority::High;
990 log::trace!(target: LOG_TARGET, "Statement submitted: {:?}", HexDisplay::from(&hash));
991 SubmitResult::New(network_priority)
992 }
993
994 fn remove(&self, hash: &Hash) -> Result<()> {
996 let current_time = self.timestamp();
997 {
998 let mut index = self.index.write();
999 if index.make_expired(hash, current_time) {
1000 let commit = [
1001 (col::STATEMENTS, hash.to_vec(), None),
1002 (col::EXPIRED, hash.to_vec(), Some((hash, current_time).encode())),
1003 ];
1004 if let Err(e) = self.db.commit(commit) {
1005 log::debug!(
1006 target: LOG_TARGET,
1007 "Error removing statement: database error {}, {:?}",
1008 e,
1009 HexDisplay::from(hash),
1010 );
1011 return Err(Error::Db(e.to_string()))
1012 }
1013 }
1014 }
1015 Ok(())
1016 }
1017
1018 fn remove_by(&self, who: [u8; 32]) -> Result<()> {
1020 let mut index = self.index.write();
1021 let mut evicted = Vec::new();
1022 if let Some(account_rec) = index.accounts.get(&who) {
1023 evicted.extend(account_rec.by_priority.keys().map(|k| k.hash));
1024 }
1025
1026 let current_time = self.timestamp();
1027 let mut commit = Vec::new();
1028 for hash in evicted {
1029 index.make_expired(&hash, current_time);
1030 commit.push((col::STATEMENTS, hash.to_vec(), None));
1031 commit.push((col::EXPIRED, hash.to_vec(), Some((hash, current_time).encode())));
1032 }
1033 self.db.commit(commit).map_err(|e| {
1034 log::debug!(
1035 target: LOG_TARGET,
1036 "Error removing statement: database error {}, remove by {:?}",
1037 e,
1038 HexDisplay::from(&who),
1039 );
1040
1041 Error::Db(e.to_string())
1042 })
1043 }
1044}
1045
1046#[cfg(test)]
1047mod tests {
1048 use crate::Store;
1049 use sc_keystore::Keystore;
1050 use sp_core::{Decode, Encode, Pair};
1051 use sp_statement_store::{
1052 runtime_api::{InvalidStatement, ValidStatement, ValidateStatement},
1053 AccountId, Channel, DecryptionKey, NetworkPriority, Proof, SignatureVerificationResult,
1054 Statement, StatementSource, StatementStore, SubmitResult, Topic,
1055 };
1056
1057 type Extrinsic = sp_runtime::OpaqueExtrinsic;
1058 type Hash = sp_core::H256;
1059 type Hashing = sp_runtime::traits::BlakeTwo256;
1060 type BlockNumber = u64;
1061 type Header = sp_runtime::generic::Header<BlockNumber, Hashing>;
1062 type Block = sp_runtime::generic::Block<Header, Extrinsic>;
1063
1064 const CORRECT_BLOCK_HASH: [u8; 32] = [1u8; 32];
1065
1066 #[derive(Clone)]
1067 pub(crate) struct TestClient;
1068
1069 pub(crate) struct RuntimeApi {
1070 _inner: TestClient,
1071 }
1072
1073 impl sp_api::ProvideRuntimeApi<Block> for TestClient {
1074 type Api = RuntimeApi;
1075 fn runtime_api(&self) -> sp_api::ApiRef<'_, Self::Api> {
1076 RuntimeApi { _inner: self.clone() }.into()
1077 }
1078 }
1079
1080 sp_api::mock_impl_runtime_apis! {
1081 impl ValidateStatement<Block> for RuntimeApi {
1082 fn validate_statement(
1083 _source: StatementSource,
1084 statement: Statement,
1085 ) -> std::result::Result<ValidStatement, InvalidStatement> {
1086 use crate::tests::account;
1087 match statement.verify_signature() {
1088 SignatureVerificationResult::Valid(_) => Ok(ValidStatement{max_count: 100, max_size: 1000}),
1089 SignatureVerificationResult::Invalid => Err(InvalidStatement::BadProof),
1090 SignatureVerificationResult::NoSignature => {
1091 if let Some(Proof::OnChain { block_hash, .. }) = statement.proof() {
1092 if block_hash == &CORRECT_BLOCK_HASH {
1093 let (max_count, max_size) = match statement.account_id() {
1094 Some(a) if a == account(1) => (1, 1000),
1095 Some(a) if a == account(2) => (2, 1000),
1096 Some(a) if a == account(3) => (3, 1000),
1097 Some(a) if a == account(4) => (4, 1000),
1098 Some(a) if a == account(42) => (42, 42 * crate::MAX_STATEMENT_SIZE as u32),
1099 _ => (2, 2000),
1100 };
1101 Ok(ValidStatement{ max_count, max_size })
1102 } else {
1103 Err(InvalidStatement::BadProof)
1104 }
1105 } else {
1106 Err(InvalidStatement::BadProof)
1107 }
1108 }
1109 }
1110 }
1111 }
1112 }
1113
1114 impl sp_blockchain::HeaderBackend<Block> for TestClient {
1115 fn header(&self, _hash: Hash) -> sp_blockchain::Result<Option<Header>> {
1116 unimplemented!()
1117 }
1118 fn info(&self) -> sp_blockchain::Info<Block> {
1119 sp_blockchain::Info {
1120 best_hash: CORRECT_BLOCK_HASH.into(),
1121 best_number: 0,
1122 genesis_hash: Default::default(),
1123 finalized_hash: CORRECT_BLOCK_HASH.into(),
1124 finalized_number: 1,
1125 finalized_state: None,
1126 number_leaves: 0,
1127 block_gap: None,
1128 }
1129 }
1130 fn status(&self, _hash: Hash) -> sp_blockchain::Result<sp_blockchain::BlockStatus> {
1131 unimplemented!()
1132 }
1133 fn number(&self, _hash: Hash) -> sp_blockchain::Result<Option<BlockNumber>> {
1134 unimplemented!()
1135 }
1136 fn hash(&self, _number: BlockNumber) -> sp_blockchain::Result<Option<Hash>> {
1137 unimplemented!()
1138 }
1139 }
1140
1141 fn test_store() -> (Store, tempfile::TempDir) {
1142 sp_tracing::init_for_tests();
1143 let temp_dir = tempfile::Builder::new().tempdir().expect("Error creating test dir");
1144
1145 let client = std::sync::Arc::new(TestClient);
1146 let mut path: std::path::PathBuf = temp_dir.path().into();
1147 path.push("db");
1148 let keystore = std::sync::Arc::new(sc_keystore::LocalKeystore::in_memory());
1149 let store = Store::new(&path, Default::default(), client, keystore, None).unwrap();
1150 (store, temp_dir) }
1152
1153 fn signed_statement(data: u8) -> Statement {
1154 signed_statement_with_topics(data, &[], None)
1155 }
1156
1157 fn signed_statement_with_topics(
1158 data: u8,
1159 topics: &[Topic],
1160 dec_key: Option<DecryptionKey>,
1161 ) -> Statement {
1162 let mut statement = Statement::new();
1163 statement.set_plain_data(vec![data]);
1164 for i in 0..topics.len() {
1165 statement.set_topic(i, topics[i]);
1166 }
1167 if let Some(key) = dec_key {
1168 statement.set_decryption_key(key);
1169 }
1170 let kp = sp_core::ed25519::Pair::from_string("//Alice", None).unwrap();
1171 statement.sign_ed25519_private(&kp);
1172 statement
1173 }
1174
1175 fn topic(data: u64) -> Topic {
1176 let mut topic: Topic = Default::default();
1177 topic[0..8].copy_from_slice(&data.to_le_bytes());
1178 topic
1179 }
1180
1181 fn dec_key(data: u64) -> DecryptionKey {
1182 let mut dec_key: DecryptionKey = Default::default();
1183 dec_key[0..8].copy_from_slice(&data.to_le_bytes());
1184 dec_key
1185 }
1186
1187 fn account(id: u64) -> AccountId {
1188 let mut account: AccountId = Default::default();
1189 account[0..8].copy_from_slice(&id.to_le_bytes());
1190 account
1191 }
1192
1193 fn channel(id: u64) -> Channel {
1194 let mut channel: Channel = Default::default();
1195 channel[0..8].copy_from_slice(&id.to_le_bytes());
1196 channel
1197 }
1198
1199 fn statement(account_id: u64, priority: u32, c: Option<u64>, data_len: usize) -> Statement {
1200 let mut statement = Statement::new();
1201 let mut data = Vec::new();
1202 data.resize(data_len, 0);
1203 statement.set_plain_data(data);
1204 statement.set_priority(priority);
1205 if let Some(c) = c {
1206 statement.set_channel(channel(c));
1207 }
1208 statement.set_proof(Proof::OnChain {
1209 block_hash: CORRECT_BLOCK_HASH,
1210 who: account(account_id),
1211 event_index: 0,
1212 });
1213 statement
1214 }
1215
1216 #[test]
1217 fn submit_one() {
1218 let (store, _temp) = test_store();
1219 let statement0 = signed_statement(0);
1220 assert_eq!(
1221 store.submit(statement0, StatementSource::Network),
1222 SubmitResult::New(NetworkPriority::High)
1223 );
1224 let unsigned = statement(0, 1, None, 0);
1225 assert_eq!(
1226 store.submit(unsigned, StatementSource::Network),
1227 SubmitResult::New(NetworkPriority::High)
1228 );
1229 }
1230
1231 #[test]
1232 fn save_and_load_statements() {
1233 let (store, temp) = test_store();
1234 let statement0 = signed_statement(0);
1235 let statement1 = signed_statement(1);
1236 let statement2 = signed_statement(2);
1237 assert_eq!(
1238 store.submit(statement0.clone(), StatementSource::Network),
1239 SubmitResult::New(NetworkPriority::High)
1240 );
1241 assert_eq!(
1242 store.submit(statement1.clone(), StatementSource::Network),
1243 SubmitResult::New(NetworkPriority::High)
1244 );
1245 assert_eq!(
1246 store.submit(statement2.clone(), StatementSource::Network),
1247 SubmitResult::New(NetworkPriority::High)
1248 );
1249 assert_eq!(store.statements().unwrap().len(), 3);
1250 assert_eq!(store.broadcasts(&[]).unwrap().len(), 3);
1251 assert_eq!(store.statement(&statement1.hash()).unwrap(), Some(statement1.clone()));
1252 let keystore = store.keystore.clone();
1253 drop(store);
1254
1255 let client = std::sync::Arc::new(TestClient);
1256 let mut path: std::path::PathBuf = temp.path().into();
1257 path.push("db");
1258 let store = Store::new(&path, Default::default(), client, keystore, None).unwrap();
1259 assert_eq!(store.statements().unwrap().len(), 3);
1260 assert_eq!(store.broadcasts(&[]).unwrap().len(), 3);
1261 assert_eq!(store.statement(&statement1.hash()).unwrap(), Some(statement1));
1262 }
1263
1264 #[test]
1265 fn take_recent_statements_clears_index() {
1266 let (store, _temp) = test_store();
1267 let statement0 = signed_statement(0);
1268 let statement1 = signed_statement(1);
1269 let statement2 = signed_statement(2);
1270 let statement3 = signed_statement(3);
1271
1272 let _ = store.submit(statement0.clone(), StatementSource::Local);
1273 let _ = store.submit(statement1.clone(), StatementSource::Local);
1274 let _ = store.submit(statement2.clone(), StatementSource::Local);
1275
1276 let recent1 = store.take_recent_statements().unwrap();
1277 let (recent1_hashes, recent1_statements): (Vec<_>, Vec<_>) = recent1.into_iter().unzip();
1278 let expected1 = vec![statement0, statement1, statement2];
1279 assert!(expected1.iter().all(|s| recent1_hashes.contains(&s.hash())));
1280 assert!(expected1.iter().all(|s| recent1_statements.contains(s)));
1281
1282 let recent2 = store.take_recent_statements().unwrap();
1284 assert_eq!(recent2.len(), 0);
1285
1286 store.submit(statement3.clone(), StatementSource::Network);
1287
1288 let recent3 = store.take_recent_statements().unwrap();
1289 let (recent3_hashes, recent3_statements): (Vec<_>, Vec<_>) = recent3.into_iter().unzip();
1290 let expected3 = vec![statement3];
1291 assert!(expected3.iter().all(|s| recent3_hashes.contains(&s.hash())));
1292 assert!(expected3.iter().all(|s| recent3_statements.contains(s)));
1293
1294 assert_eq!(store.statements().unwrap().len(), 4);
1296 }
1297
1298 #[test]
1299 fn search_by_topic_and_key() {
1300 let (store, _temp) = test_store();
1301 let statement0 = signed_statement(0);
1302 let statement1 = signed_statement_with_topics(1, &[topic(0)], None);
1303 let statement2 = signed_statement_with_topics(2, &[topic(0), topic(1)], Some(dec_key(2)));
1304 let statement3 = signed_statement_with_topics(3, &[topic(0), topic(1), topic(2)], None);
1305 let statement4 =
1306 signed_statement_with_topics(4, &[topic(0), topic(42), topic(2), topic(3)], None);
1307 let statements = vec![statement0, statement1, statement2, statement3, statement4];
1308 for s in &statements {
1309 store.submit(s.clone(), StatementSource::Network);
1310 }
1311
1312 let assert_topics = |topics: &[u64], key: Option<u64>, expected: &[u8]| {
1313 let key = key.map(dec_key);
1314 let topics: Vec<_> = topics.iter().map(|t| topic(*t)).collect();
1315 let mut got_vals: Vec<_> = if let Some(key) = key {
1316 store.posted(&topics, key).unwrap().into_iter().map(|d| d[0]).collect()
1317 } else {
1318 store.broadcasts(&topics).unwrap().into_iter().map(|d| d[0]).collect()
1319 };
1320 got_vals.sort();
1321 assert_eq!(expected.to_vec(), got_vals);
1322 };
1323
1324 assert_topics(&[], None, &[0, 1, 3, 4]);
1325 assert_topics(&[], Some(2), &[2]);
1326 assert_topics(&[0], None, &[1, 3, 4]);
1327 assert_topics(&[1], None, &[3]);
1328 assert_topics(&[2], None, &[3, 4]);
1329 assert_topics(&[3], None, &[4]);
1330 assert_topics(&[42], None, &[4]);
1331
1332 assert_topics(&[0, 1], None, &[3]);
1333 assert_topics(&[0, 1], Some(2), &[2]);
1334 assert_topics(&[0, 1, 99], Some(2), &[]);
1335 assert_topics(&[1, 2], None, &[3]);
1336 assert_topics(&[99], None, &[]);
1337 assert_topics(&[0, 99], None, &[]);
1338 assert_topics(&[0, 1, 2, 3, 42], None, &[]);
1339 }
1340
1341 #[test]
1342 fn constraints() {
1343 let (store, _temp) = test_store();
1344
1345 store.index.write().options.max_total_size = 3000;
1346 let source = StatementSource::Network;
1347 let ok = SubmitResult::New(NetworkPriority::High);
1348 let ignored = SubmitResult::Ignored;
1349
1350 assert_eq!(store.submit(statement(1, 1, Some(1), 2000), source), ignored);
1354 assert_eq!(store.submit(statement(1, 1, Some(1), 500), source), ok);
1355 assert_eq!(store.submit(statement(1, 1, Some(1), 200), source), ignored);
1357 assert_eq!(store.submit(statement(1, 2, Some(1), 600), source), ok);
1358 assert_eq!(store.submit(statement(1, 1, Some(2), 100), source), ignored);
1361 assert_eq!(store.index.read().expired.len(), 1);
1362
1363 assert_eq!(store.submit(statement(2, 1, None, 500), source), ok);
1366 assert_eq!(store.submit(statement(2, 2, None, 100), source), ok);
1367 assert_eq!(store.submit(statement(2, 3, None, 500), source), ok);
1369 assert_eq!(store.index.read().expired.len(), 2);
1370 assert_eq!(store.submit(statement(2, 4, None, 1000), source), ok);
1372 assert_eq!(store.index.read().expired.len(), 4);
1373
1374 assert_eq!(store.submit(statement(3, 2, Some(1), 300), source), ok);
1377 assert_eq!(store.submit(statement(3, 3, Some(2), 300), source), ok);
1378 assert_eq!(store.submit(statement(3, 4, Some(3), 300), source), ok);
1379 assert_eq!(store.submit(statement(3, 5, None, 500), source), ok);
1381 assert_eq!(store.index.read().expired.len(), 6);
1382
1383 assert_eq!(store.index.read().total_size, 2400);
1384 assert_eq!(store.index.read().entries.len(), 4);
1385
1386 assert_eq!(store.submit(statement(1, 1, None, 700), source), ignored);
1388 store.index.write().options.max_total_statements = 4;
1390 assert_eq!(store.submit(statement(1, 1, None, 100), source), ignored);
1391
1392 let mut expected_statements = vec![
1393 statement(1, 2, Some(1), 600).hash(),
1394 statement(2, 4, None, 1000).hash(),
1395 statement(3, 4, Some(3), 300).hash(),
1396 statement(3, 5, None, 500).hash(),
1397 ];
1398 expected_statements.sort();
1399 let mut statements: Vec<_> =
1400 store.statements().unwrap().into_iter().map(|(hash, _)| hash).collect();
1401 statements.sort();
1402 assert_eq!(expected_statements, statements);
1403 }
1404
1405 #[test]
1406 fn max_statement_size_for_gossiping() {
1407 let (store, _temp) = test_store();
1408 store.index.write().options.max_total_size = 42 * crate::MAX_STATEMENT_SIZE;
1409
1410 assert_eq!(
1411 store.submit(
1412 statement(42, 1, Some(1), crate::MAX_STATEMENT_SIZE - 500),
1413 StatementSource::Local
1414 ),
1415 SubmitResult::New(NetworkPriority::High)
1416 );
1417
1418 assert_eq!(
1419 store.submit(
1420 statement(42, 2, Some(1), 2 * crate::MAX_STATEMENT_SIZE),
1421 StatementSource::Local
1422 ),
1423 SubmitResult::Ignored
1424 );
1425 }
1426
1427 #[test]
1428 fn expired_statements_are_purged() {
1429 use super::DEFAULT_PURGE_AFTER_SEC;
1430 let (mut store, temp) = test_store();
1431 let mut statement = statement(1, 1, Some(3), 100);
1432 store.set_time(0);
1433 statement.set_topic(0, topic(4));
1434 store.submit(statement.clone(), StatementSource::Network);
1435 assert_eq!(store.index.read().entries.len(), 1);
1436 store.remove(&statement.hash()).unwrap();
1437 assert_eq!(store.index.read().entries.len(), 0);
1438 assert_eq!(store.index.read().accounts.len(), 0);
1439 store.set_time(DEFAULT_PURGE_AFTER_SEC + 1);
1440 store.maintain();
1441 assert_eq!(store.index.read().expired.len(), 0);
1442 let keystore = store.keystore.clone();
1443 drop(store);
1444
1445 let client = std::sync::Arc::new(TestClient);
1446 let mut path: std::path::PathBuf = temp.path().into();
1447 path.push("db");
1448 let store = Store::new(&path, Default::default(), client, keystore, None).unwrap();
1449 assert_eq!(store.statements().unwrap().len(), 0);
1450 assert_eq!(store.index.read().expired.len(), 0);
1451 }
1452
1453 #[test]
1454 fn posted_clear_decrypts() {
1455 let (store, _temp) = test_store();
1456 let public = store
1457 .keystore
1458 .ed25519_generate_new(sp_core::crypto::key_types::STATEMENT, None)
1459 .unwrap();
1460 let statement1 = statement(1, 1, None, 100);
1461 let mut statement2 = statement(1, 2, None, 0);
1462 let plain = b"The most valuable secret".to_vec();
1463 statement2.encrypt(&plain, &public).unwrap();
1464 store.submit(statement1, StatementSource::Network);
1465 store.submit(statement2, StatementSource::Network);
1466 let posted_clear = store.posted_clear(&[], public.into()).unwrap();
1467 assert_eq!(posted_clear, vec![plain]);
1468 }
1469
1470 #[test]
1471 fn broadcasts_stmt_returns_encoded_statements() {
1472 let (store, _tmp) = test_store();
1473
1474 let s0 = signed_statement_with_topics(0, &[], None);
1476 let s1 = signed_statement_with_topics(1, &[topic(42)], None);
1478 let s2 = signed_statement_with_topics(2, &[topic(42)], Some(dec_key(99)));
1480
1481 for s in [&s0, &s1, &s2] {
1482 store.submit(s.clone(), StatementSource::Network);
1483 }
1484
1485 let mut hashes: Vec<_> = store
1487 .broadcasts_stmt(&[])
1488 .unwrap()
1489 .into_iter()
1490 .map(|bytes| Statement::decode(&mut &bytes[..]).unwrap().hash())
1491 .collect();
1492 hashes.sort();
1493 let expected_hashes = {
1494 let mut e = vec![s0.hash(), s1.hash()];
1495 e.sort();
1496 e
1497 };
1498 assert_eq!(hashes, expected_hashes);
1499
1500 let got = store.broadcasts_stmt(&[topic(42)]).unwrap();
1502 assert_eq!(got.len(), 1);
1503 let st = Statement::decode(&mut &got[0][..]).unwrap();
1504 assert_eq!(st.hash(), s1.hash());
1505 }
1506
1507 #[test]
1508 fn posted_stmt_returns_encoded_statements_for_dest() {
1509 let (store, _tmp) = test_store();
1510
1511 let public1 = store
1512 .keystore
1513 .ed25519_generate_new(sp_core::crypto::key_types::STATEMENT, None)
1514 .unwrap();
1515 let dest: [u8; 32] = public1.into();
1516
1517 let public2 = store
1518 .keystore
1519 .ed25519_generate_new(sp_core::crypto::key_types::STATEMENT, None)
1520 .unwrap();
1521
1522 let mut s_with_key = statement(1, 1, None, 0);
1524 let plain1 = b"The most valuable secret".to_vec();
1525 s_with_key.encrypt(&plain1, &public1).unwrap();
1526
1527 let mut s_other_key = statement(2, 2, None, 0);
1529 let plain2 = b"The second most valuable secret".to_vec();
1530 s_other_key.encrypt(&plain2, &public2).unwrap();
1531
1532 for s in [&s_with_key, &s_other_key] {
1534 store.submit(s.clone(), StatementSource::Network);
1535 }
1536
1537 let retrieved = store.posted_stmt(&[], dest).unwrap();
1539 assert_eq!(retrieved.len(), 1, "Only one statement has dec_key=dest");
1540
1541 let returned_stmt = Statement::decode(&mut &retrieved[0][..]).unwrap();
1543 assert_eq!(
1544 returned_stmt.hash(),
1545 s_with_key.hash(),
1546 "Returned statement must match s_with_key"
1547 );
1548 }
1549
1550 #[test]
1551 fn posted_clear_stmt_returns_statement_followed_by_plain_data() {
1552 let (store, _tmp) = test_store();
1553
1554 let public1 = store
1555 .keystore
1556 .ed25519_generate_new(sp_core::crypto::key_types::STATEMENT, None)
1557 .unwrap();
1558 let dest: [u8; 32] = public1.into();
1559
1560 let public2 = store
1561 .keystore
1562 .ed25519_generate_new(sp_core::crypto::key_types::STATEMENT, None)
1563 .unwrap();
1564
1565 let mut s_with_key = statement(1, 1, None, 0);
1567 let plain1 = b"The most valuable secret".to_vec();
1568 s_with_key.encrypt(&plain1, &public1).unwrap();
1569
1570 let mut s_other_key = statement(2, 2, None, 0);
1572 let plain2 = b"The second most valuable secret".to_vec();
1573 s_other_key.encrypt(&plain2, &public2).unwrap();
1574
1575 for s in [&s_with_key, &s_other_key] {
1577 store.submit(s.clone(), StatementSource::Network);
1578 }
1579
1580 let retrieved = store.posted_clear_stmt(&[], dest).unwrap();
1582 assert_eq!(retrieved.len(), 1, "Only one statement has dec_key=dest");
1583
1584 let encoded_stmt = s_with_key.encode();
1586 let stmt_len = encoded_stmt.len();
1587
1588 assert_eq!(&retrieved[0][..stmt_len], &encoded_stmt[..]);
1590
1591 let trailing = &retrieved[0][stmt_len..];
1593 assert_eq!(trailing, &plain1[..]);
1594 }
1595
1596 #[test]
1597 fn posted_clear_returns_plain_data_for_dest_and_topics() {
1598 let (store, _tmp) = test_store();
1599
1600 let public_dest = store
1602 .keystore
1603 .ed25519_generate_new(sp_core::crypto::key_types::STATEMENT, None)
1604 .unwrap();
1605 let dest: [u8; 32] = public_dest.into();
1606
1607 let public_other = store
1608 .keystore
1609 .ed25519_generate_new(sp_core::crypto::key_types::STATEMENT, None)
1610 .unwrap();
1611
1612 let mut s_good = statement(1, 1, None, 0);
1614 let plaintext_good = b"The most valuable secret".to_vec();
1615 s_good.encrypt(&plaintext_good, &public_dest).unwrap();
1616 s_good.set_topic(0, topic(42));
1617
1618 let mut s_wrong_topic = statement(2, 2, None, 0);
1620 s_wrong_topic.encrypt(b"Wrong topic", &public_dest).unwrap();
1621 s_wrong_topic.set_topic(0, topic(99));
1622
1623 let mut s_other_dest = statement(3, 3, None, 0);
1625 s_other_dest.encrypt(b"Other dest", &public_other).unwrap();
1626 s_other_dest.set_topic(0, topic(42));
1627
1628 for s in [&s_good, &s_wrong_topic, &s_other_dest] {
1630 store.submit(s.clone(), StatementSource::Network);
1631 }
1632
1633 let retrieved = store.posted_clear(&[topic(42)], dest).unwrap();
1635
1636 assert_eq!(retrieved, vec![plaintext_good]);
1638 }
1639
1640 #[test]
1641 fn remove_by_covers_various_situations() {
1642 use sp_statement_store::{StatementSource, StatementStore, SubmitResult};
1643
1644 let (mut store, _temp) = test_store();
1646 store.set_time(0);
1647
1648 let t42 = topic(42);
1650 let k7 = dec_key(7);
1651
1652 let mut s_a1 = statement(4, 10, Some(100), 100);
1655 s_a1.set_topic(0, t42);
1656 let h_a1 = s_a1.hash();
1657
1658 let mut s_a2 = statement(4, 20, Some(200), 150);
1659 s_a2.set_decryption_key(k7);
1660 let h_a2 = s_a2.hash();
1661
1662 let s_a3 = statement(4, 30, None, 50);
1663 let h_a3 = s_a3.hash();
1664
1665 let s_b1 = statement(3, 10, None, 100);
1667 let h_b1 = s_b1.hash();
1668
1669 let mut s_b2 = statement(3, 15, Some(300), 100);
1670 s_b2.set_topic(0, t42);
1671 s_b2.set_decryption_key(k7);
1672 let h_b2 = s_b2.hash();
1673
1674 for s in [&s_a1, &s_a2, &s_a3, &s_b1, &s_b2] {
1676 assert!(matches!(
1677 store.submit(s.clone(), StatementSource::Network),
1678 SubmitResult::New(_)
1679 ));
1680 }
1681
1682 {
1684 let idx = store.index.read();
1685 assert_eq!(idx.entries.len(), 5, "all 5 should be present");
1686 assert!(idx.accounts.contains_key(&account(4)));
1687 assert!(idx.accounts.contains_key(&account(3)));
1688 assert_eq!(idx.total_size, 100 + 150 + 50 + 100 + 100);
1689
1690 let set_t = idx.by_topic.get(&t42).expect("topic set exists");
1692 assert!(set_t.contains(&h_a1) && set_t.contains(&h_b2));
1693
1694 let set_k = idx.by_dec_key.get(&Some(k7)).expect("key set exists");
1695 assert!(set_k.contains(&h_a2) && set_k.contains(&h_b2));
1696 }
1697
1698 store.remove_by(account(4)).expect("remove_by should succeed");
1700
1701 {
1703 for h in [h_a1, h_a2, h_a3] {
1705 assert!(store.statement(&h).unwrap().is_none(), "A's statement should be removed");
1706 }
1707
1708 for h in [h_b1, h_b2] {
1710 assert!(store.statement(&h).unwrap().is_some(), "B's statement should remain");
1711 }
1712
1713 let idx = store.index.read();
1714
1715 assert!(!idx.accounts.contains_key(&account(4)), "Account A must be gone");
1717 assert!(idx.accounts.contains_key(&account(3)), "Account B must remain");
1718
1719 assert!(idx.expired.contains_key(&h_a1));
1721 assert!(idx.expired.contains_key(&h_a2));
1722 assert!(idx.expired.contains_key(&h_a3));
1723 assert_eq!(idx.expired.len(), 3);
1724
1725 assert_eq!(idx.entries.len(), 2);
1727 assert_eq!(idx.total_size, 100 + 100);
1728
1729 let set_t = idx.by_topic.get(&t42).expect("topic set exists");
1731 assert!(set_t.contains(&h_b2));
1732 assert!(!set_t.contains(&h_a1));
1733
1734 let set_k = idx.by_dec_key.get(&Some(k7)).expect("key set exists");
1736 assert!(set_k.contains(&h_b2));
1737 assert!(!set_k.contains(&h_a2));
1738 }
1739
1740 store.remove_by(account(4)).expect("second remove_by should be a no-op");
1742
1743 let purge_after = store.index.read().options.purge_after_sec;
1745 store.set_time(purge_after + 1);
1746 store.maintain();
1747 assert_eq!(store.index.read().expired.len(), 0, "expired entries should be purged");
1748
1749 let s_new = statement(4, 40, None, 10);
1751 assert!(matches!(store.submit(s_new, StatementSource::Network), SubmitResult::New(_)));
1752 }
1753}