1#![warn(missing_docs)]
48#![warn(unused_extern_crates)]
49
50mod metrics;
51
52pub use pezsp_statement_store::{Error, StatementStore, MAX_TOPICS};
53
54use metrics::MetricsLink as PrometheusMetrics;
55use parking_lot::RwLock;
56use pezsc_keystore::LocalKeystore;
57use pezsp_api::ProvideRuntimeApi;
58use pezsp_blockchain::HeaderBackend;
59use pezsp_core::{
60 crypto::UncheckedFrom, hexdisplay::HexDisplay, traits::SpawnNamed, Decode, Encode,
61};
62use pezsp_runtime::traits::Block as BlockT;
63use pezsp_statement_store::{
64 runtime_api::{
65 InvalidStatement, StatementSource, StatementStoreExt, ValidStatement, ValidateStatement,
66 },
67 AccountId, BlockHash, Channel, DecryptionKey, Hash, NetworkPriority, Proof, Result, Statement,
68 SubmitResult, Topic,
69};
70use prometheus_endpoint::Registry as PrometheusRegistry;
71use std::{
72 collections::{BTreeMap, HashMap, HashSet},
73 sync::Arc,
74};
75
/// Key in the `META` column under which the database format version is stored.
const KEY_VERSION: &[u8] = b"version".as_slice();
/// Database format version written to a fresh database and required of an existing one.
const CURRENT_VERSION: u32 = 1;

/// Log target used by the log statements in this file.
const LOG_TARGET: &str = "statement-store";

/// Default number of seconds an expired statement hash is remembered before it is purged
/// (48 hours).
pub const DEFAULT_PURGE_AFTER_SEC: u64 = 2 * 24 * 60 * 60;
/// Default upper bound on the number of statements held by the store.
pub const DEFAULT_MAX_TOTAL_STATEMENTS: usize = 4 * 1024 * 1024;
/// Default upper bound on the summed data size of all statements held by the store (2 GiB).
pub const DEFAULT_MAX_TOTAL_SIZE: usize = 2 * 1024 * 1024 * 1024;
/// Largest encoded statement size, in bytes, that `submit` accepts: one byte less than the
/// statement notification size limit of the network protocol.
pub const MAX_STATEMENT_SIZE: usize =
    pezsc_network_statement::config::MAX_STATEMENT_NOTIFICATION_SIZE as usize - 1;

/// Interval between two runs of the background maintenance task.
const MAINTENANCE_PERIOD: std::time::Duration = std::time::Duration::from_secs(30);
94
/// Column indices of the statement database.
mod col {
    /// Store metadata; holds the format version under `KEY_VERSION`.
    pub const META: u8 = 0;
    /// Encoded statements, keyed by statement hash.
    pub const STATEMENTS: u8 = 1;
    /// Encoded `(hash, timestamp)` records of expired statements, keyed by statement hash.
    pub const EXPIRED: u8 = 2;

    /// Total number of columns the database is opened with.
    pub const COUNT: u8 = 3;
}
102
103#[derive(Eq, PartialEq, Debug, Ord, PartialOrd, Clone, Copy)]
104struct Priority(u32);
105
106#[derive(PartialEq, Eq)]
107struct PriorityKey {
108 hash: Hash,
109 priority: Priority,
110}
111
112impl PartialOrd for PriorityKey {
113 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
114 Some(self.cmp(other))
115 }
116}
117
118impl Ord for PriorityKey {
119 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
120 self.priority.cmp(&other.priority).then_with(|| self.hash.cmp(&other.hash))
121 }
122}
123
/// The statement currently occupying a channel of an account.
#[derive(PartialEq, Eq)]
struct ChannelEntry {
    /// Hash of the statement in the channel.
    hash: Hash,
    /// Priority of that statement; a newcomer must exceed it to replace the statement.
    priority: Priority,
}
129
/// Per-account bookkeeping of the statements currently held in the store.
#[derive(Default)]
struct StatementsForAccount {
    /// Statements ordered by priority (lowest first); the value is the statement's channel, if
    /// any, and its data length.
    by_priority: BTreeMap<PriorityKey, (Option<Channel>, usize)>,
    /// Statement occupying each channel of the account.
    channels: HashMap<Channel, ChannelEntry>,
    /// Summed data length of all statements of the account.
    data_size: usize,
}
139
140pub struct Options {
142 max_total_statements: usize,
145 max_total_size: usize,
148 purge_after_sec: u64,
150}
151
152impl Default for Options {
153 fn default() -> Self {
154 Options {
155 max_total_statements: DEFAULT_MAX_TOTAL_STATEMENTS,
156 max_total_size: DEFAULT_MAX_TOTAL_SIZE,
157 purge_after_sec: DEFAULT_PURGE_AFTER_SEC,
158 }
159 }
160}
161
/// In-memory index over the statements kept in the database.
#[derive(Default)]
struct Index {
    /// Hashes inserted since the last call to `take_recent`.
    recent: HashSet<Hash>,
    /// Statement hashes grouped by topic.
    by_topic: HashMap<Topic, HashSet<Hash>>,
    /// Statement hashes grouped by decryption key; `None` groups statements without a key.
    by_dec_key: HashMap<Option<DecryptionKey>, HashSet<Hash>>,
    /// Topics and decryption key of each statement that has at least one of them.
    topics_and_keys: HashMap<Hash, ([Option<Topic>; MAX_TOPICS], Option<DecryptionKey>)>,
    /// Account, priority and data length of every statement held.
    entries: HashMap<Hash, (AccountId, Priority, usize)>,
    /// Expired statement hashes with the timestamp at which they expired.
    expired: HashMap<Hash, u64>,
    /// Per-account statement bookkeeping.
    accounts: HashMap<AccountId, StatementsForAccount>,
    /// Store-wide limits.
    options: Options,
    /// Summed data length of all statements in `entries`.
    total_size: usize,
}
174
175struct ClientWrapper<Block, Client> {
176 client: Arc<Client>,
177 _block: std::marker::PhantomData<Block>,
178}
179
180impl<Block, Client> ClientWrapper<Block, Client>
181where
182 Block: BlockT,
183 Block::Hash: From<BlockHash>,
184 Client: ProvideRuntimeApi<Block> + HeaderBackend<Block> + Send + Sync + 'static,
185 Client::Api: ValidateStatement<Block>,
186{
187 fn validate_statement(
188 &self,
189 block: Option<BlockHash>,
190 source: StatementSource,
191 statement: Statement,
192 ) -> std::result::Result<ValidStatement, InvalidStatement> {
193 let api = self.client.runtime_api();
194 let block = block.map(Into::into).unwrap_or_else(|| {
195 self.client.info().finalized_hash
197 });
198 api.validate_statement(block, source, statement)
199 .map_err(|_| InvalidStatement::InternalError)?
200 }
201}
202
/// Statement store backed by a `parity_db` database and an in-memory index.
pub struct Store {
    /// Database holding the encoded statements and the expired-statement records.
    db: parity_db::Db,
    /// In-memory index over the database content.
    index: RwLock<Index>,
    /// Statement validation callback; `new` wires it to the runtime API of the client.
    validate_fn: Box<
        dyn Fn(
                Option<BlockHash>,
                StatementSource,
                Statement,
            ) -> std::result::Result<ValidStatement, InvalidStatement>
            + Send
            + Sync,
    >,
    /// Keystore queried for the key pairs used to decrypt statement data.
    keystore: Arc<LocalKeystore>,
    /// When set, used as the current time instead of the system clock (settable in tests).
    time_override: Option<u64>,
    /// Prometheus metrics handle.
    metrics: PrometheusMetrics,
}
221
/// Result of looking a statement hash up in the index.
enum IndexQuery {
    /// The hash is neither held nor remembered as expired.
    Unknown,
    /// A statement with this hash is currently held.
    Exists,
    /// The hash is remembered as expired.
    Expired,
}
227
/// Outcome of `Index::insert`.
enum MaybeInserted {
    /// The statement was indexed; carries the hashes of the statements evicted to make room.
    Inserted(HashSet<Hash>),
    /// The statement was rejected and the index is unchanged.
    Ignored,
}
232
233impl Index {
234 fn new(options: Options) -> Index {
235 Index { options, ..Default::default() }
236 }
237
238 fn insert_new(&mut self, hash: Hash, account: AccountId, statement: &Statement) {
239 let mut all_topics = [None; MAX_TOPICS];
240 let mut nt = 0;
241 while let Some(t) = statement.topic(nt) {
242 self.by_topic.entry(t).or_default().insert(hash);
243 all_topics[nt] = Some(t);
244 nt += 1;
245 }
246 let key = statement.decryption_key();
247 self.by_dec_key.entry(key).or_default().insert(hash);
248 if nt > 0 || key.is_some() {
249 self.topics_and_keys.insert(hash, (all_topics, key));
250 }
251 let priority = Priority(statement.priority().unwrap_or(0));
252 self.entries.insert(hash, (account, priority, statement.data_len()));
253 self.recent.insert(hash);
254 self.total_size += statement.data_len();
255 let account_info = self.accounts.entry(account).or_default();
256 account_info.data_size += statement.data_len();
257 if let Some(channel) = statement.channel() {
258 account_info.channels.insert(channel, ChannelEntry { hash, priority });
259 }
260 account_info
261 .by_priority
262 .insert(PriorityKey { hash, priority }, (statement.channel(), statement.data_len()));
263 }
264
265 fn query(&self, hash: &Hash) -> IndexQuery {
266 if self.entries.contains_key(hash) {
267 return IndexQuery::Exists;
268 }
269 if self.expired.contains_key(hash) {
270 return IndexQuery::Expired;
271 }
272 IndexQuery::Unknown
273 }
274
    /// Records `hash` as expired at `timestamp`, replacing any previous record for it.
    fn insert_expired(&mut self, hash: Hash, timestamp: u64) {
        self.expired.insert(hash, timestamp);
    }
278
279 fn iterate_with(
280 &self,
281 key: Option<DecryptionKey>,
282 match_all_topics: &[Topic],
283 mut f: impl FnMut(&Hash) -> Result<()>,
284 ) -> Result<()> {
285 let empty = HashSet::new();
286 let mut sets: [&HashSet<Hash>; MAX_TOPICS + 1] = [∅ MAX_TOPICS + 1];
287 if match_all_topics.len() > MAX_TOPICS {
288 return Ok(());
289 }
290 let key_set = self.by_dec_key.get(&key);
291 if key_set.map_or(0, |s| s.len()) == 0 {
292 return Ok(());
294 }
295 sets[0] = key_set.expect("Function returns if key_set is None");
296 for (i, t) in match_all_topics.iter().enumerate() {
297 let set = self.by_topic.get(t);
298 if set.map_or(0, |s| s.len()) == 0 {
299 return Ok(());
301 }
302 sets[i + 1] = set.expect("Function returns if set is None");
303 }
304 let sets = &mut sets[0..match_all_topics.len() + 1];
305 sets.sort_by_key(|s| s.len());
307 for item in sets[0] {
308 if sets[1..].iter().all(|set| set.contains(item)) {
309 log::trace!(
310 target: LOG_TARGET,
311 "Iterating by topic/key: statement {:?}",
312 HexDisplay::from(item)
313 );
314 f(item)?
315 }
316 }
317 Ok(())
318 }
319
320 fn maintain(&mut self, current_time: u64) -> Vec<Hash> {
321 let mut purged = Vec::new();
323 self.expired.retain(|hash, timestamp| {
324 if *timestamp + self.options.purge_after_sec <= current_time {
325 purged.push(*hash);
326 log::trace!(target: LOG_TARGET, "Purged statement {:?}", HexDisplay::from(hash));
327 false
328 } else {
329 true
330 }
331 });
332 purged
333 }
334
335 fn take_recent(&mut self) -> HashSet<Hash> {
336 std::mem::take(&mut self.recent)
337 }
338
339 fn make_expired(&mut self, hash: &Hash, current_time: u64) -> bool {
340 if let Some((account, priority, len)) = self.entries.remove(hash) {
341 self.total_size -= len;
342 if let Some((topics, key)) = self.topics_and_keys.remove(hash) {
343 for t in topics.into_iter().flatten() {
344 if let std::collections::hash_map::Entry::Occupied(mut set) =
345 self.by_topic.entry(t)
346 {
347 set.get_mut().remove(hash);
348 if set.get().is_empty() {
349 set.remove_entry();
350 }
351 }
352 }
353 if let std::collections::hash_map::Entry::Occupied(mut set) =
354 self.by_dec_key.entry(key)
355 {
356 set.get_mut().remove(hash);
357 if set.get().is_empty() {
358 set.remove_entry();
359 }
360 }
361 }
362 let _ = self.recent.remove(hash);
363 self.expired.insert(*hash, current_time);
364 if let std::collections::hash_map::Entry::Occupied(mut account_rec) =
365 self.accounts.entry(account)
366 {
367 let key = PriorityKey { hash: *hash, priority };
368 if let Some((channel, len)) = account_rec.get_mut().by_priority.remove(&key) {
369 account_rec.get_mut().data_size -= len;
370 if let Some(channel) = channel {
371 account_rec.get_mut().channels.remove(&channel);
372 }
373 }
374 if account_rec.get().by_priority.is_empty() {
375 account_rec.remove_entry();
376 }
377 }
378 log::trace!(target: LOG_TARGET, "Expired statement {:?}", HexDisplay::from(hash));
379 true
380 } else {
381 false
382 }
383 }
384
    /// Tries to add a validated statement to the index, evicting lower-priority statements of
    /// the same account when its limits would otherwise be exceeded.
    ///
    /// `validation` supplies the limits applied to `account`: `max_size` bounds the summed
    /// data length and `max_count` the number of its statements.
    ///
    /// Returns `MaybeInserted::Inserted` with the evicted hashes (already marked expired at
    /// `current_time`) on success. Returns `MaybeInserted::Ignored`, leaving the index
    /// unchanged, when the statement is larger than `max_size`, does not outrank the
    /// statement in its channel, cannot be fitted by evicting strictly lower-priority
    /// statements, or would exceed the store-wide limits.
    fn insert(
        &mut self,
        hash: Hash,
        statement: &Statement,
        account: &AccountId,
        validation: &ValidStatement,
        current_time: u64,
    ) -> MaybeInserted {
        let statement_len = statement.data_len();
        // A statement larger than the whole account allowance can never fit.
        if statement_len > validation.max_size as usize {
            log::debug!(
                target: LOG_TARGET,
                "Ignored oversize message: {:?} ({} bytes)",
                HexDisplay::from(&hash),
                statement_len,
            );
            return MaybeInserted::Ignored;
        }

        // Statements that have to go to make room, and the data size that frees up.
        let mut evicted = HashSet::new();
        let mut would_free_size = 0;
        let priority = Priority(statement.priority().unwrap_or(0));
        let (max_size, max_count) = (validation.max_size as usize, validation.max_count as usize);
        if let Some(account_rec) = self.accounts.get(account) {
            // Channel rule: a channel holds one statement; only a strictly higher priority
            // statement may replace it.
            if let Some(channel) = statement.channel() {
                if let Some(channel_record) = account_rec.channels.get(&channel) {
                    if priority <= channel_record.priority {
                        log::debug!(
                            target: LOG_TARGET,
                            "Ignored lower priority channel message: {:?} {:?} <= {:?}",
                            HexDisplay::from(&hash),
                            priority,
                            channel_record.priority,
                        );
                        return MaybeInserted::Ignored;
                    } else {
                        log::debug!(
                            target: LOG_TARGET,
                            "Replacing higher priority channel message: {:?} ({:?}) > {:?} ({:?})",
                            HexDisplay::from(&hash),
                            priority,
                            HexDisplay::from(&channel_record.hash),
                            channel_record.priority,
                        );
                        let key = PriorityKey {
                            hash: channel_record.hash,
                            priority: channel_record.priority,
                        };
                        if let Some((_channel, len)) = account_rec.by_priority.get(&key) {
                            would_free_size += *len;
                            evicted.insert(channel_record.hash);
                        }
                    }
                }
            }
            // Account limits: walk the account's statements from the lowest priority upwards,
            // evicting until the new statement fits.
            for (entry, (_, len)) in account_rec.by_priority.iter() {
                if (account_rec.data_size - would_free_size + statement_len <= max_size)
                    && account_rec.by_priority.len() + 1 - evicted.len() <= max_count
                {
                    // Enough room has been made.
                    break;
                }
                if evicted.contains(&entry.hash) {
                    // Already accounted for by the channel replacement above.
                    continue;
                }
                if entry.priority >= priority {
                    // Only strictly lower priority statements may be evicted.
                    log::debug!(
                        target: LOG_TARGET,
                        "Ignored message due to constraints {:?} {:?} < {:?}",
                        HexDisplay::from(&hash),
                        priority,
                        entry.priority,
                    );
                    return MaybeInserted::Ignored;
                }
                evicted.insert(entry.hash);
                would_free_size += len;
            }
        }
        // Store-wide limits, taking the planned evictions into account.
        if !((self.total_size - would_free_size + statement_len <= self.options.max_total_size)
            && self.entries.len() + 1 - evicted.len() <= self.options.max_total_statements)
        {
            log::debug!(
                target: LOG_TARGET,
                "Ignored statement {} because the store is full (size={}, count={})",
                HexDisplay::from(&hash),
                self.total_size,
                self.entries.len(),
            );
            return MaybeInserted::Ignored;
        }

        // Nothing has been modified up to this point; apply the evictions and insert.
        for h in &evicted {
            self.make_expired(h, current_time);
        }
        self.insert_new(hash, *account, statement);
        MaybeInserted::Inserted(evicted)
    }
492}
493
494impl Store {
495 pub fn new_shared<Block, Client>(
498 path: &std::path::Path,
499 options: Options,
500 client: Arc<Client>,
501 keystore: Arc<LocalKeystore>,
502 prometheus: Option<&PrometheusRegistry>,
503 task_spawner: &dyn SpawnNamed,
504 ) -> Result<Arc<Store>>
505 where
506 Block: BlockT,
507 Block::Hash: From<BlockHash>,
508 Client: ProvideRuntimeApi<Block> + HeaderBackend<Block> + Send + Sync + 'static,
509 Client::Api: ValidateStatement<Block>,
510 {
511 let store = Arc::new(Self::new(path, options, client, keystore, prometheus)?);
512
513 let worker_store = store.clone();
515 task_spawner.spawn(
516 "statement-store-maintenance",
517 Some("statement-store"),
518 Box::pin(async move {
519 let mut interval = tokio::time::interval(MAINTENANCE_PERIOD);
520 loop {
521 interval.tick().await;
522 worker_store.maintain();
523 }
524 }),
525 );
526
527 Ok(store)
528 }
529
530 #[doc(hidden)]
533 pub fn new<Block, Client>(
534 path: &std::path::Path,
535 options: Options,
536 client: Arc<Client>,
537 keystore: Arc<LocalKeystore>,
538 prometheus: Option<&PrometheusRegistry>,
539 ) -> Result<Store>
540 where
541 Block: BlockT,
542 Block::Hash: From<BlockHash>,
543 Client: ProvideRuntimeApi<Block> + HeaderBackend<Block> + Send + Sync + 'static,
544 Client::Api: ValidateStatement<Block>,
545 {
546 let mut path: std::path::PathBuf = path.into();
547 path.push("statements");
548
549 let mut config = parity_db::Options::with_columns(&path, col::COUNT);
550
551 let statement_col = &mut config.columns[col::STATEMENTS as usize];
552 statement_col.ref_counted = false;
553 statement_col.preimage = true;
554 statement_col.uniform = true;
555 let db = parity_db::Db::open_or_create(&config).map_err(|e| Error::Db(e.to_string()))?;
556 match db.get(col::META, &KEY_VERSION).map_err(|e| Error::Db(e.to_string()))? {
557 Some(version) => {
558 let version = u32::from_le_bytes(
559 version
560 .try_into()
561 .map_err(|_| Error::Db("Error reading database version".into()))?,
562 );
563 if version != CURRENT_VERSION {
564 return Err(Error::Db(format!("Unsupported database version: {version}")));
565 }
566 },
567 None => {
568 db.commit([(
569 col::META,
570 KEY_VERSION.to_vec(),
571 Some(CURRENT_VERSION.to_le_bytes().to_vec()),
572 )])
573 .map_err(|e| Error::Db(e.to_string()))?;
574 },
575 }
576
577 let validator = ClientWrapper { client, _block: Default::default() };
578 let validate_fn = Box::new(move |block, source, statement| {
579 validator.validate_statement(block, source, statement)
580 });
581
582 let store = Store {
583 db,
584 index: RwLock::new(Index::new(options)),
585 validate_fn,
586 keystore,
587 time_override: None,
588 metrics: PrometheusMetrics::new(prometheus),
589 };
590 store.populate()?;
591 Ok(store)
592 }
593
594 fn populate(&self) -> Result<()> {
599 {
600 let mut index = self.index.write();
601 self.db
602 .iter_column_while(col::STATEMENTS, |item| {
603 let statement = item.value;
604 if let Ok(statement) = Statement::decode(&mut statement.as_slice()) {
605 let hash = statement.hash();
606 log::trace!(
607 target: LOG_TARGET,
608 "Statement loaded {:?}",
609 HexDisplay::from(&hash)
610 );
611 if let Some(account_id) = statement.account_id() {
612 index.insert_new(hash, account_id, &statement);
613 } else {
614 log::debug!(
615 target: LOG_TARGET,
616 "Error decoding statement loaded from the DB: {:?}",
617 HexDisplay::from(&hash)
618 );
619 }
620 }
621 true
622 })
623 .map_err(|e| Error::Db(e.to_string()))?;
624 self.db
625 .iter_column_while(col::EXPIRED, |item| {
626 let expired_info = item.value;
627 if let Ok((hash, timestamp)) =
628 <(Hash, u64)>::decode(&mut expired_info.as_slice())
629 {
630 log::trace!(
631 target: LOG_TARGET,
632 "Statement loaded (expired): {:?}",
633 HexDisplay::from(&hash)
634 );
635 index.insert_expired(hash, timestamp);
636 }
637 true
638 })
639 .map_err(|e| Error::Db(e.to_string()))?;
640 }
641
642 self.maintain();
643 Ok(())
644 }
645
646 fn collect_statements<R>(
647 &self,
648 key: Option<DecryptionKey>,
649 match_all_topics: &[Topic],
650 mut f: impl FnMut(Statement) -> Option<R>,
651 ) -> Result<Vec<R>> {
652 let mut result = Vec::new();
653 let index = self.index.read();
654 index.iterate_with(key, match_all_topics, |hash| {
655 match self.db.get(col::STATEMENTS, hash).map_err(|e| Error::Db(e.to_string()))? {
656 Some(entry) => {
657 if let Ok(statement) = Statement::decode(&mut entry.as_slice()) {
658 if let Some(data) = f(statement) {
659 result.push(data);
660 }
661 } else {
662 log::warn!(
664 target: LOG_TARGET,
665 "Corrupt statement {:?}",
666 HexDisplay::from(hash)
667 );
668 }
669 },
670 None => {
671 log::warn!(
673 target: LOG_TARGET,
674 "Missing statement {:?}",
675 HexDisplay::from(hash)
676 );
677 },
678 }
679 Ok(())
680 })?;
681 Ok(result)
682 }
683
684 pub fn maintain(&self) {
686 log::trace!(target: LOG_TARGET, "Started store maintenance");
687 let (deleted, active_count, expired_count): (Vec<_>, usize, usize) = {
688 let mut index = self.index.write();
689 let deleted = index.maintain(self.timestamp());
690 (deleted, index.entries.len(), index.expired.len())
691 };
692 let deleted: Vec<_> =
693 deleted.into_iter().map(|hash| (col::EXPIRED, hash.to_vec(), None)).collect();
694 let deleted_count = deleted.len() as u64;
695 if let Err(e) = self.db.commit(deleted) {
696 log::warn!(target: LOG_TARGET, "Error writing to the statement database: {:?}", e);
697 } else {
698 self.metrics.report(|metrics| metrics.statements_pruned.inc_by(deleted_count));
699 }
700 log::trace!(
701 target: LOG_TARGET,
702 "Completed store maintenance. Purged: {}, Active: {}, Expired: {}",
703 deleted_count,
704 active_count,
705 expired_count
706 );
707 }
708
709 fn timestamp(&self) -> u64 {
710 self.time_override.unwrap_or_else(|| {
711 std::time::SystemTime::now()
712 .duration_since(std::time::UNIX_EPOCH)
713 .unwrap_or_default()
714 .as_secs()
715 })
716 }
717
    /// Test helper: pins the time reported by `timestamp` to `time`.
    #[cfg(test)]
    fn set_time(&mut self, time: u64) {
        self.time_override = Some(time);
    }
722
    /// Wraps the shared store in a `StatementStoreExt`.
    pub fn as_statement_store_ext(self: Arc<Self>) -> StatementStoreExt {
        StatementStoreExt::new(self)
    }
727
728 fn posted_clear_inner<R>(
731 &self,
732 match_all_topics: &[Topic],
733 dest: [u8; 32],
734 mut map_f: impl FnMut(Statement, Vec<u8>) -> R,
736 ) -> Result<Vec<R>> {
737 self.collect_statements(Some(dest), match_all_topics, |statement| {
738 if let (Some(key), Some(_)) = (statement.decryption_key(), statement.data()) {
739 let public: pezsp_core::ed25519::Public = UncheckedFrom::unchecked_from(key);
740 let public: pezsp_statement_store::ed25519::Public = public.into();
741 match self.keystore.key_pair::<pezsp_statement_store::ed25519::Pair>(&public) {
742 Err(e) => {
743 log::debug!(
744 target: LOG_TARGET,
745 "Keystore error: {:?}, for statement {:?}",
746 e,
747 HexDisplay::from(&statement.hash())
748 );
749 None
750 },
751 Ok(None) => {
752 log::debug!(
753 target: LOG_TARGET,
754 "Keystore is missing key for statement {:?}",
755 HexDisplay::from(&statement.hash())
756 );
757 None
758 },
759 Ok(Some(pair)) => match statement.decrypt_private(&pair.into_inner()) {
760 Ok(r) => r.map(|data| map_f(statement, data)),
761 Err(e) => {
762 log::debug!(
763 target: LOG_TARGET,
764 "Decryption error: {:?}, for statement {:?}",
765 e,
766 HexDisplay::from(&statement.hash())
767 );
768 None
769 },
770 },
771 }
772 } else {
773 None
774 }
775 })
776 }
777}
778
779impl StatementStore for Store {
780 fn statements(&self) -> Result<Vec<(Hash, Statement)>> {
782 let index = self.index.read();
783 let mut result = Vec::with_capacity(index.entries.len());
784 for hash in index.entries.keys().cloned() {
785 let Some(encoded) =
786 self.db.get(col::STATEMENTS, &hash).map_err(|e| Error::Db(e.to_string()))?
787 else {
788 continue;
789 };
790 if let Ok(statement) = Statement::decode(&mut encoded.as_slice()) {
791 result.push((hash, statement));
792 }
793 }
794 Ok(result)
795 }
796
797 fn take_recent_statements(&self) -> Result<Vec<(Hash, Statement)>> {
798 let mut index = self.index.write();
799 let recent = index.take_recent();
800 let mut result = Vec::with_capacity(recent.len());
801 for hash in recent {
802 let Some(encoded) =
803 self.db.get(col::STATEMENTS, &hash).map_err(|e| Error::Db(e.to_string()))?
804 else {
805 continue;
806 };
807 if let Ok(statement) = Statement::decode(&mut encoded.as_slice()) {
808 result.push((hash, statement));
809 }
810 }
811 Ok(result)
812 }
813
814 fn statement(&self, hash: &Hash) -> Result<Option<Statement>> {
816 Ok(
817 match self
818 .db
819 .get(col::STATEMENTS, hash.as_slice())
820 .map_err(|e| Error::Db(e.to_string()))?
821 {
822 Some(entry) => {
823 log::trace!(
824 target: LOG_TARGET,
825 "Queried statement {:?}",
826 HexDisplay::from(hash)
827 );
828 Some(
829 Statement::decode(&mut entry.as_slice())
830 .map_err(|e| Error::Decode(e.to_string()))?,
831 )
832 },
833 None => {
834 log::trace!(
835 target: LOG_TARGET,
836 "Queried missing statement {:?}",
837 HexDisplay::from(hash)
838 );
839 None
840 },
841 },
842 )
843 }
844
845 fn has_statement(&self, hash: &Hash) -> bool {
846 self.index.read().entries.contains_key(hash)
847 }
848
849 fn broadcasts(&self, match_all_topics: &[Topic]) -> Result<Vec<Vec<u8>>> {
852 self.collect_statements(None, match_all_topics, |statement| statement.into_data())
853 }
854
855 fn posted(&self, match_all_topics: &[Topic], dest: [u8; 32]) -> Result<Vec<Vec<u8>>> {
859 self.collect_statements(Some(dest), match_all_topics, |statement| statement.into_data())
860 }
861
862 fn posted_clear(&self, match_all_topics: &[Topic], dest: [u8; 32]) -> Result<Vec<Vec<u8>>> {
865 self.posted_clear_inner(match_all_topics, dest, |_statement, data| data)
866 }
867
868 fn broadcasts_stmt(&self, match_all_topics: &[Topic]) -> Result<Vec<Vec<u8>>> {
871 self.collect_statements(None, match_all_topics, |statement| Some(statement.encode()))
872 }
873
874 fn posted_stmt(&self, match_all_topics: &[Topic], dest: [u8; 32]) -> Result<Vec<Vec<u8>>> {
878 self.collect_statements(Some(dest), match_all_topics, |statement| Some(statement.encode()))
879 }
880
881 fn posted_clear_stmt(
884 &self,
885 match_all_topics: &[Topic],
886 dest: [u8; 32],
887 ) -> Result<Vec<Vec<u8>>> {
888 self.posted_clear_inner(match_all_topics, dest, |statement, data| {
889 let mut res = Vec::with_capacity(statement.size_hint() + data.len());
890 statement.encode_to(&mut res);
891 res.extend_from_slice(&data);
892 res
893 })
894 }
895
896 fn submit(&self, statement: Statement, source: StatementSource) -> SubmitResult {
898 let hash = statement.hash();
899 let encoded_size = statement.encoded_size();
900 if encoded_size > MAX_STATEMENT_SIZE {
901 log::debug!(
902 target: LOG_TARGET,
903 "Statement is too big for propogation: {:?} ({}/{} bytes)",
904 HexDisplay::from(&hash),
905 statement.encoded_size(),
906 MAX_STATEMENT_SIZE
907 );
908 return SubmitResult::Ignored;
909 }
910
911 match self.index.read().query(&hash) {
912 IndexQuery::Expired => {
913 if !source.can_be_resubmitted() {
914 return SubmitResult::KnownExpired;
915 }
916 },
917 IndexQuery::Exists => {
918 if !source.can_be_resubmitted() {
919 return SubmitResult::Known;
920 }
921 },
922 IndexQuery::Unknown => {},
923 }
924
925 let Some(account_id) = statement.account_id() else {
926 log::debug!(
927 target: LOG_TARGET,
928 "Statement validation failed: Missing proof ({:?})",
929 HexDisplay::from(&hash),
930 );
931 self.metrics.report(|metrics| metrics.validations_invalid.inc());
932 return SubmitResult::Bad("No statement proof");
933 };
934
935 let at_block = if let Some(Proof::OnChain { block_hash, .. }) = statement.proof() {
937 Some(*block_hash)
938 } else {
939 None
940 };
941 let validation_result = (self.validate_fn)(at_block, source, statement.clone());
942 let validation = match validation_result {
943 Ok(validation) => validation,
944 Err(InvalidStatement::BadProof) => {
945 log::debug!(
946 target: LOG_TARGET,
947 "Statement validation failed: BadProof, {:?}",
948 HexDisplay::from(&hash),
949 );
950 self.metrics.report(|metrics| metrics.validations_invalid.inc());
951 return SubmitResult::Bad("Bad statement proof");
952 },
953 Err(InvalidStatement::NoProof) => {
954 log::debug!(
955 target: LOG_TARGET,
956 "Statement validation failed: NoProof, {:?}",
957 HexDisplay::from(&hash),
958 );
959 self.metrics.report(|metrics| metrics.validations_invalid.inc());
960 return SubmitResult::Bad("Missing statement proof");
961 },
962 Err(InvalidStatement::InternalError) => {
963 return SubmitResult::InternalError(Error::Runtime)
964 },
965 };
966
967 let current_time = self.timestamp();
968 let mut commit = Vec::new();
969 {
970 let mut index = self.index.write();
971
972 let evicted =
973 match index.insert(hash, &statement, &account_id, &validation, current_time) {
974 MaybeInserted::Ignored => return SubmitResult::Ignored,
975 MaybeInserted::Inserted(evicted) => evicted,
976 };
977
978 commit.push((col::STATEMENTS, hash.to_vec(), Some(statement.encode())));
979 for hash in evicted {
980 commit.push((col::STATEMENTS, hash.to_vec(), None));
981 commit.push((col::EXPIRED, hash.to_vec(), Some((hash, current_time).encode())));
982 }
983 if let Err(e) = self.db.commit(commit) {
984 log::debug!(
985 target: LOG_TARGET,
986 "Statement validation failed: database error {}, {:?}",
987 e,
988 statement
989 );
990 return SubmitResult::InternalError(Error::Db(e.to_string()));
991 }
992 } self.metrics.report(|metrics| metrics.submitted_statements.inc());
994 let network_priority = NetworkPriority::High;
995 log::trace!(target: LOG_TARGET, "Statement submitted: {:?}", HexDisplay::from(&hash));
996 SubmitResult::New(network_priority)
997 }
998
999 fn remove(&self, hash: &Hash) -> Result<()> {
1001 let current_time = self.timestamp();
1002 {
1003 let mut index = self.index.write();
1004 if index.make_expired(hash, current_time) {
1005 let commit = [
1006 (col::STATEMENTS, hash.to_vec(), None),
1007 (col::EXPIRED, hash.to_vec(), Some((hash, current_time).encode())),
1008 ];
1009 if let Err(e) = self.db.commit(commit) {
1010 log::debug!(
1011 target: LOG_TARGET,
1012 "Error removing statement: database error {}, {:?}",
1013 e,
1014 HexDisplay::from(hash),
1015 );
1016 return Err(Error::Db(e.to_string()));
1017 }
1018 }
1019 }
1020 Ok(())
1021 }
1022
1023 fn remove_by(&self, who: [u8; 32]) -> Result<()> {
1025 let mut index = self.index.write();
1026 let mut evicted = Vec::new();
1027 if let Some(account_rec) = index.accounts.get(&who) {
1028 evicted.extend(account_rec.by_priority.keys().map(|k| k.hash));
1029 }
1030
1031 let current_time = self.timestamp();
1032 let mut commit = Vec::new();
1033 for hash in evicted {
1034 index.make_expired(&hash, current_time);
1035 commit.push((col::STATEMENTS, hash.to_vec(), None));
1036 commit.push((col::EXPIRED, hash.to_vec(), Some((hash, current_time).encode())));
1037 }
1038 self.db.commit(commit).map_err(|e| {
1039 log::debug!(
1040 target: LOG_TARGET,
1041 "Error removing statement: database error {}, remove by {:?}",
1042 e,
1043 HexDisplay::from(&who),
1044 );
1045
1046 Error::Db(e.to_string())
1047 })
1048 }
1049}
1050
1051#[cfg(test)]
1052mod tests {
1053 use crate::Store;
1054 use pezsc_keystore::Keystore;
1055 use pezsp_core::{Decode, Encode, Pair};
1056 use pezsp_statement_store::{
1057 runtime_api::{InvalidStatement, ValidStatement, ValidateStatement},
1058 AccountId, Channel, DecryptionKey, NetworkPriority, Proof, SignatureVerificationResult,
1059 Statement, StatementSource, StatementStore, SubmitResult, Topic,
1060 };
1061
    // Minimal block types for the mock client used by the tests.
    type Extrinsic = pezsp_runtime::OpaqueExtrinsic;
    type Hash = pezsp_core::H256;
    type Hashing = pezsp_runtime::traits::BlakeTwo256;
    type BlockNumber = u64;
    type Header = pezsp_runtime::generic::Header<BlockNumber, Hashing>;
    type Block = pezsp_runtime::generic::Block<Header, Extrinsic>;

    /// The only block hash for which the mock runtime accepts on-chain proofs; also reported
    /// by the mock client as best and finalized hash.
    const CORRECT_BLOCK_HASH: [u8; 32] = [1u8; 32];
1070
    /// Stateless mock client handed to the store under test.
    #[derive(Clone)]
    pub(crate) struct TestClient;

    /// Mock runtime API object produced by `TestClient`.
    pub(crate) struct RuntimeApi {
        _inner: TestClient,
    }

    impl pezsp_api::ProvideRuntimeApi<Block> for TestClient {
        type Api = RuntimeApi;
        /// Hands out a fresh `RuntimeApi` wrapping a clone of the client.
        fn runtime_api(&self) -> pezsp_api::ApiRef<'_, Self::Api> {
            RuntimeApi { _inner: self.clone() }.into()
        }
    }
1084
    // Mock statement validation:
    // - a valid signature is accepted with limits (count 100, size 1000);
    // - an invalid signature is rejected as `BadProof`;
    // - an unsigned statement is accepted only with an on-chain proof for
    //   `CORRECT_BLOCK_HASH`, with limits that depend on the account.
    pezsp_api::mock_impl_runtime_apis! {
        impl ValidateStatement<Block> for RuntimeApi {
            fn validate_statement(
                _source: StatementSource,
                statement: Statement,
            ) -> std::result::Result<ValidStatement, InvalidStatement> {
                use crate::tests::account;
                match statement.verify_signature() {
                    SignatureVerificationResult::Valid(_) => Ok(ValidStatement{max_count: 100, max_size: 1000}),
                    SignatureVerificationResult::Invalid => Err(InvalidStatement::BadProof),
                    SignatureVerificationResult::NoSignature => {
                        if let Some(Proof::OnChain { block_hash, .. }) = statement.proof() {
                            if block_hash == &CORRECT_BLOCK_HASH {
                                // (max_count, max_size) per test account.
                                let (max_count, max_size) = match statement.account_id() {
                                    Some(a) if a == account(1) => (1, 1000),
                                    Some(a) if a == account(2) => (2, 1000),
                                    Some(a) if a == account(3) => (3, 1000),
                                    Some(a) if a == account(4) => (4, 1000),
                                    Some(a) if a == account(42) => (42, 42 * crate::MAX_STATEMENT_SIZE as u32),
                                    _ => (2, 2000),
                                };
                                Ok(ValidStatement{ max_count, max_size })
                            } else {
                                Err(InvalidStatement::BadProof)
                            }
                        } else {
                            Err(InvalidStatement::BadProof)
                        }
                    }
                }
            }
        }
    }
1118
    /// Only `info` is implemented; every other method panics with `unimplemented!()`.
    impl pezsp_blockchain::HeaderBackend<Block> for TestClient {
        fn header(&self, _hash: Hash) -> pezsp_blockchain::Result<Option<Header>> {
            unimplemented!()
        }
        /// Reports `CORRECT_BLOCK_HASH` as both the best and the finalized block.
        fn info(&self) -> pezsp_blockchain::Info<Block> {
            pezsp_blockchain::Info {
                best_hash: CORRECT_BLOCK_HASH.into(),
                best_number: 0,
                genesis_hash: Default::default(),
                finalized_hash: CORRECT_BLOCK_HASH.into(),
                finalized_number: 1,
                finalized_state: None,
                number_leaves: 0,
                block_gap: None,
            }
        }
        fn status(&self, _hash: Hash) -> pezsp_blockchain::Result<pezsp_blockchain::BlockStatus> {
            unimplemented!()
        }
        fn number(&self, _hash: Hash) -> pezsp_blockchain::Result<Option<BlockNumber>> {
            unimplemented!()
        }
        fn hash(&self, _number: BlockNumber) -> pezsp_blockchain::Result<Option<Hash>> {
            unimplemented!()
        }
    }
1145
1146 fn test_store() -> (Store, tempfile::TempDir) {
1147 pezsp_tracing::init_for_tests();
1148 let temp_dir = tempfile::Builder::new().tempdir().expect("Error creating test dir");
1149
1150 let client = std::sync::Arc::new(TestClient);
1151 let mut path: std::path::PathBuf = temp_dir.path().into();
1152 path.push("db");
1153 let keystore = std::sync::Arc::new(pezsc_keystore::LocalKeystore::in_memory());
1154 let store = Store::new(&path, Default::default(), client, keystore, None).unwrap();
1155 (store, temp_dir) }
1157
    /// Builds a signed statement with the single data byte `data`, no topics and no
    /// decryption key.
    fn signed_statement(data: u8) -> Statement {
        signed_statement_with_topics(data, &[], None)
    }
1161
1162 fn signed_statement_with_topics(
1163 data: u8,
1164 topics: &[Topic],
1165 dec_key: Option<DecryptionKey>,
1166 ) -> Statement {
1167 let mut statement = Statement::new();
1168 statement.set_plain_data(vec![data]);
1169 for i in 0..topics.len() {
1170 statement.set_topic(i, topics[i]);
1171 }
1172 if let Some(key) = dec_key {
1173 statement.set_decryption_key(key);
1174 }
1175 let kp = pezsp_core::ed25519::Pair::from_string("//Alice", None).unwrap();
1176 statement.sign_ed25519_private(&kp);
1177 statement
1178 }
1179
1180 fn topic(data: u64) -> Topic {
1181 let mut topic: Topic = Default::default();
1182 topic[0..8].copy_from_slice(&data.to_le_bytes());
1183 topic
1184 }
1185
1186 fn dec_key(data: u64) -> DecryptionKey {
1187 let mut dec_key: DecryptionKey = Default::default();
1188 dec_key[0..8].copy_from_slice(&data.to_le_bytes());
1189 dec_key
1190 }
1191
1192 fn account(id: u64) -> AccountId {
1193 let mut account: AccountId = Default::default();
1194 account[0..8].copy_from_slice(&id.to_le_bytes());
1195 account
1196 }
1197
1198 fn channel(id: u64) -> Channel {
1199 let mut channel: Channel = Default::default();
1200 channel[0..8].copy_from_slice(&id.to_le_bytes());
1201 channel
1202 }
1203
1204 fn statement(account_id: u64, priority: u32, c: Option<u64>, data_len: usize) -> Statement {
1205 let mut statement = Statement::new();
1206 let mut data = Vec::new();
1207 data.resize(data_len, 0);
1208 statement.set_plain_data(data);
1209 statement.set_priority(priority);
1210 if let Some(c) = c {
1211 statement.set_channel(channel(c));
1212 }
1213 statement.set_proof(Proof::OnChain {
1214 block_hash: CORRECT_BLOCK_HASH,
1215 who: account(account_id),
1216 event_index: 0,
1217 });
1218 statement
1219 }
1220
#[test]
/// A signed statement and an unsigned on-chain-proof statement are each accepted as new
/// with high network priority.
fn submit_one() {
    let (store, _temp) = test_store();
    let accepted = SubmitResult::New(NetworkPriority::High);

    let signed = signed_statement(0);
    assert_eq!(store.submit(signed, StatementSource::Network), accepted);

    let unsigned = statement(0, 1, None, 0);
    assert_eq!(store.submit(unsigned, StatementSource::Network), accepted);
}
1235
#[test]
/// Statements submitted to a store remain readable after the store is dropped and a new
/// one is opened on the same database path.
fn save_and_load_statements() {
    let (store, temp) = test_store();
    let submitted: Vec<Statement> = (0..3u8).map(signed_statement).collect();
    for s in &submitted {
        assert_eq!(
            store.submit(s.clone(), StatementSource::Network),
            SubmitResult::New(NetworkPriority::High)
        );
    }
    // The middle statement is used as the lookup probe before and after reopening.
    let probe = submitted[1].clone();
    assert_eq!(store.statements().unwrap().len(), 3);
    assert_eq!(store.broadcasts(&[]).unwrap().len(), 3);
    assert_eq!(store.statement(&probe.hash()).unwrap(), Some(probe.clone()));
    let keystore = store.keystore.clone();
    drop(store);

    let db_path = temp.path().join("db");
    let client = std::sync::Arc::new(TestClient);
    let reopened = Store::new(&db_path, Default::default(), client, keystore, None).unwrap();
    assert_eq!(reopened.statements().unwrap().len(), 3);
    assert_eq!(reopened.broadcasts(&[]).unwrap().len(), 3);
    assert_eq!(reopened.statement(&probe.hash()).unwrap(), Some(probe));
}
1268
#[test]
/// `take_recent_statements` hands out the statements submitted since the previous call,
/// returns nothing when called again straight away, and leaves the stored statements in
/// place.
fn take_recent_statements_clears_index() {
    let (store, _temp) = test_store();
    let first_batch: Vec<Statement> = (0..3u8).map(signed_statement).collect();
    let late = signed_statement(3);

    for s in &first_batch {
        let _ = store.submit(s.clone(), StatementSource::Local);
    }

    let (hashes, stmts): (Vec<_>, Vec<_>) =
        store.take_recent_statements().unwrap().into_iter().unzip();
    assert!(first_batch.iter().all(|s| hashes.contains(&s.hash())));
    assert!(first_batch.iter().all(|s| stmts.contains(s)));

    // A second call without new submissions yields nothing.
    let drained = store.take_recent_statements().unwrap();
    assert_eq!(drained.len(), 0);

    store.submit(late.clone(), StatementSource::Network);

    let (hashes, stmts): (Vec<_>, Vec<_>) =
        store.take_recent_statements().unwrap().into_iter().unzip();
    assert!(hashes.contains(&late.hash()));
    assert!(stmts.contains(&late));

    // All four statements are still stored.
    assert_eq!(store.statements().unwrap().len(), 4);
}
1302
#[test]
/// Checks `broadcasts` and `posted` lookups against five statements with varying topics,
/// one of which also carries a decryption key.
fn search_by_topic_and_key() {
    let (store, _temp) = test_store();
    let statements = vec![
        signed_statement(0),
        signed_statement_with_topics(1, &[topic(0)], None),
        signed_statement_with_topics(2, &[topic(0), topic(1)], Some(dec_key(2))),
        signed_statement_with_topics(3, &[topic(0), topic(1), topic(2)], None),
        signed_statement_with_topics(4, &[topic(0), topic(42), topic(2), topic(3)], None),
    ];
    for s in statements {
        store.submit(s, StatementSource::Network);
    }

    // Queries with `posted` when a key id is given and `broadcasts` otherwise, then compares
    // the sorted first data byte of every returned item with `expected`.
    let check = |topic_ids: &[u64], key_id: Option<u64>, expected: &[u8]| {
        let wanted: Vec<_> = topic_ids.iter().copied().map(topic).collect();
        let found = match key_id.map(dec_key) {
            Some(key) => store.posted(&wanted, key),
            None => store.broadcasts(&wanted),
        };
        let mut got: Vec<_> = found.unwrap().into_iter().map(|d| d[0]).collect();
        got.sort();
        assert_eq!(expected.to_vec(), got);
    };

    check(&[], None, &[0, 1, 3, 4]);
    check(&[], Some(2), &[2]);
    check(&[0], None, &[1, 3, 4]);
    check(&[1], None, &[3]);
    check(&[2], None, &[3, 4]);
    check(&[3], None, &[4]);
    check(&[42], None, &[4]);

    check(&[0, 1], None, &[3]);
    check(&[0, 1], Some(2), &[2]);
    check(&[0, 1, 99], Some(2), &[]);
    check(&[1, 2], None, &[3]);
    check(&[99], None, &[]);
    check(&[0, 99], None, &[]);
    check(&[0, 1, 2, 3, 42], None, &[]);
}
1345
#[test]
// Exercises store admission and eviction: submissions are made for three accounts in a fixed
// order and the outcome (`ok` / `ignored`) plus the size of the expired index is checked
// after each step. The statements are order-dependent; do not reorder them.
// NOTE(review): the per-account limits that drive the outcomes come from statement
// validation defined outside this chunk — confirm there before changing the numbers.
fn constraints() {
    let (store, _temp) = test_store();

    // Global cap on the summed data size of stored statements for this test.
    store.index.write().options.max_total_size = 3000;
    let source = StatementSource::Network;
    let ok = SubmitResult::New(NetworkPriority::High);
    let ignored = SubmitResult::Ignored;

    // --- Account 1 ---
    // A 2000-byte statement is rejected (presumably over the account's size allowance).
    assert_eq!(store.submit(statement(1, 1, Some(1), 2000), source), ignored);
    assert_eq!(store.submit(statement(1, 1, Some(1), 500), source), ok);
    // Same channel, same priority: rejected.
    assert_eq!(store.submit(statement(1, 1, Some(1), 200), source), ignored);
    // Same channel, higher priority: accepted.
    assert_eq!(store.submit(statement(1, 2, Some(1), 600), source), ok);
    // Different channel, lower priority: rejected.
    assert_eq!(store.submit(statement(1, 1, Some(2), 100), source), ignored);
    // One statement has been moved to the expired index so far.
    assert_eq!(store.index.read().expired.len(), 1);

    // --- Account 2 (no channels) ---
    assert_eq!(store.submit(statement(2, 1, None, 500), source), ok);
    assert_eq!(store.submit(statement(2, 2, None, 100), source), ok);
    // Accepting this one expires one more statement (count goes 1 -> 2).
    assert_eq!(store.submit(statement(2, 3, None, 500), source), ok);
    assert_eq!(store.index.read().expired.len(), 2);
    // Accepting this one expires two more statements (count goes 2 -> 4).
    assert_eq!(store.submit(statement(2, 4, None, 1000), source), ok);
    assert_eq!(store.index.read().expired.len(), 4);

    // --- Account 3 ---
    assert_eq!(store.submit(statement(3, 2, Some(1), 300), source), ok);
    assert_eq!(store.submit(statement(3, 3, Some(2), 300), source), ok);
    assert_eq!(store.submit(statement(3, 4, Some(3), 300), source), ok);
    // Accepting this one expires two more statements (count goes 4 -> 6).
    assert_eq!(store.submit(statement(3, 5, None, 500), source), ok);
    assert_eq!(store.index.read().expired.len(), 6);

    // Four statements remain: 600 + 1000 + 300 + 500 = 2400 bytes of data.
    assert_eq!(store.index.read().total_size, 2400);
    assert_eq!(store.index.read().entries.len(), 4);

    // 2400 + 700 would exceed the global `max_total_size` of 3000.
    assert_eq!(store.submit(statement(1, 1, None, 700), source), ignored);
    // With the global statement count capped at the current 4 entries, even a small
    // statement is rejected.
    store.index.write().options.max_total_statements = 4;
    assert_eq!(store.submit(statement(1, 1, None, 100), source), ignored);

    // The surviving statements, compared by hash in sorted order.
    let mut expected_statements = vec![
        statement(1, 2, Some(1), 600).hash(),
        statement(2, 4, None, 1000).hash(),
        statement(3, 4, Some(3), 300).hash(),
        statement(3, 5, None, 500).hash(),
    ];
    expected_statements.sort();
    let mut statements: Vec<_> =
        store.statements().unwrap().into_iter().map(|(hash, _)| hash).collect();
    statements.sort();
    assert_eq!(expected_statements, statements);
}
1409
#[test]
/// A statement 500 bytes under `MAX_STATEMENT_SIZE` is accepted, while one with twice
/// `MAX_STATEMENT_SIZE` of data is ignored, even though the store's total size limit
/// (42 times `MAX_STATEMENT_SIZE`) would have room for it.
fn max_statement_size_for_gossiping() {
    let (store, _temp) = test_store();
    store.index.write().options.max_total_size = 42 * crate::MAX_STATEMENT_SIZE;

    let source = StatementSource::Local;
    let fits = statement(42, 1, Some(1), crate::MAX_STATEMENT_SIZE - 500);
    let oversized = statement(42, 2, Some(1), 2 * crate::MAX_STATEMENT_SIZE);

    assert_eq!(store.submit(fits, source), SubmitResult::New(NetworkPriority::High));
    assert_eq!(store.submit(oversized, source), SubmitResult::Ignored);
}
1431
#[test]
/// After a statement is removed and `maintain` runs at a time past
/// `DEFAULT_PURGE_AFTER_SEC`, the expired index is empty, and it stays empty when the
/// store is reopened from disk.
fn expired_statements_are_purged() {
    use super::DEFAULT_PURGE_AFTER_SEC;
    let (mut store, temp) = test_store();
    store.set_time(0);

    let mut stmt = statement(1, 1, Some(3), 100);
    stmt.set_topic(0, topic(4));
    let stmt_hash = stmt.hash();
    store.submit(stmt, StatementSource::Network);
    assert_eq!(store.index.read().entries.len(), 1);

    store.remove(&stmt_hash).unwrap();
    assert_eq!(store.index.read().entries.len(), 0);
    assert_eq!(store.index.read().accounts.len(), 0);

    store.set_time(DEFAULT_PURGE_AFTER_SEC + 1);
    store.maintain();
    assert_eq!(store.index.read().expired.len(), 0);
    let keystore = store.keystore.clone();
    drop(store);

    let db_path = temp.path().join("db");
    let client = std::sync::Arc::new(TestClient);
    let reopened = Store::new(&db_path, Default::default(), client, keystore, None).unwrap();
    assert_eq!(reopened.statements().unwrap().len(), 0);
    assert_eq!(reopened.index.read().expired.len(), 0);
}
1457
#[test]
/// `posted_clear` returns the decrypted payload of the statement encrypted to a key held
/// in the store's keystore, and nothing for the unencrypted statement.
fn posted_clear_decrypts() {
    let (store, _temp) = test_store();
    let public = store
        .keystore
        .ed25519_generate_new(pezsp_core::crypto::key_types::STATEMENT, None)
        .unwrap();
    let plain = b"The most valuable secret".to_vec();

    let unencrypted = statement(1, 1, None, 100);
    let mut encrypted = statement(1, 2, None, 0);
    encrypted.encrypt(&plain, &public).unwrap();

    for s in [unencrypted, encrypted] {
        store.submit(s, StatementSource::Network);
    }

    let posted_clear = store.posted_clear(&[], public.into()).unwrap();
    assert_eq!(posted_clear, vec![plain]);
}
1474
#[test]
/// `broadcasts_stmt` returns encoded statements: only those without a decryption key, and
/// only those matching the requested topics.
fn broadcasts_stmt_returns_encoded_statements() {
    let (store, _tmp) = test_store();

    let s0 = signed_statement_with_topics(0, &[], None);
    let s1 = signed_statement_with_topics(1, &[topic(42)], None);
    let s2 = signed_statement_with_topics(2, &[topic(42)], Some(dec_key(99)));

    [&s0, &s1, &s2].into_iter().for_each(|s| {
        store.submit(s.clone(), StatementSource::Network);
    });

    // No topic filter: both statements without a decryption key come back.
    let mut expected_hashes = vec![s0.hash(), s1.hash()];
    expected_hashes.sort();
    let mut hashes: Vec<_> = store
        .broadcasts_stmt(&[])
        .unwrap()
        .into_iter()
        .map(|bytes| Statement::decode(&mut &bytes[..]).unwrap().hash())
        .collect();
    hashes.sort();
    assert_eq!(hashes, expected_hashes);

    // Filtering by topic 42 leaves only `s1`.
    let got = store.broadcasts_stmt(&[topic(42)]).unwrap();
    assert_eq!(got.len(), 1);
    let st = Statement::decode(&mut &got[0][..]).unwrap();
    assert_eq!(st.hash(), s1.hash());
}
1511
#[test]
/// `posted_stmt` returns the encoded statement encrypted to `dest` and skips the one
/// encrypted to a different key.
fn posted_stmt_returns_encoded_statements_for_dest() {
    let (store, _tmp) = test_store();

    // Generates a fresh ed25519 statement key in the store's keystore.
    let new_key = || {
        store
            .keystore
            .ed25519_generate_new(pezsp_core::crypto::key_types::STATEMENT, None)
            .unwrap()
    };

    let public1 = new_key();
    let dest: [u8; 32] = public1.into();
    let public2 = new_key();

    let mut s_with_key = statement(1, 1, None, 0);
    let plain1 = b"The most valuable secret".to_vec();
    s_with_key.encrypt(&plain1, &public1).unwrap();
    let expected_hash = s_with_key.hash();

    let mut s_other_key = statement(2, 2, None, 0);
    let plain2 = b"The second most valuable secret".to_vec();
    s_other_key.encrypt(&plain2, &public2).unwrap();

    for s in [s_with_key, s_other_key] {
        store.submit(s, StatementSource::Network);
    }

    let retrieved = store.posted_stmt(&[], dest).unwrap();
    assert_eq!(retrieved.len(), 1, "Only one statement has dec_key=dest");

    let returned_stmt = Statement::decode(&mut &retrieved[0][..]).unwrap();
    assert_eq!(returned_stmt.hash(), expected_hash, "Returned statement must match s_with_key");
}
1554
#[test]
/// Each item returned by `posted_clear_stmt` is the encoded statement immediately followed
/// by its decrypted plain data; statements encrypted to another key are not returned.
fn posted_clear_stmt_returns_statement_followed_by_plain_data() {
    let (store, _tmp) = test_store();

    // Generates a fresh ed25519 statement key in the store's keystore.
    let new_key = || {
        store
            .keystore
            .ed25519_generate_new(pezsp_core::crypto::key_types::STATEMENT, None)
            .unwrap()
    };

    let public1 = new_key();
    let dest: [u8; 32] = public1.into();
    let public2 = new_key();

    let mut s_with_key = statement(1, 1, None, 0);
    let plain1 = b"The most valuable secret".to_vec();
    s_with_key.encrypt(&plain1, &public1).unwrap();
    let encoded_stmt = s_with_key.encode();

    let mut s_other_key = statement(2, 2, None, 0);
    let plain2 = b"The second most valuable secret".to_vec();
    s_other_key.encrypt(&plain2, &public2).unwrap();

    for s in [s_with_key, s_other_key] {
        store.submit(s, StatementSource::Network);
    }

    let retrieved = store.posted_clear_stmt(&[], dest).unwrap();
    assert_eq!(retrieved.len(), 1, "Only one statement has dec_key=dest");

    // Split the returned bytes at the encoded statement's length.
    let (leading, trailing) = retrieved[0].split_at(encoded_stmt.len());
    assert_eq!(leading, &encoded_stmt[..]);
    assert_eq!(trailing, &plain1[..]);
}
1600
#[test]
/// `posted_clear` filters by both destination key and topic: only the statement encrypted
/// to `dest` and tagged with topic 42 has its plaintext returned.
fn posted_clear_returns_plain_data_for_dest_and_topics() {
    let (store, _tmp) = test_store();

    // Generates a fresh ed25519 statement key in the store's keystore.
    let new_key = || {
        store
            .keystore
            .ed25519_generate_new(pezsp_core::crypto::key_types::STATEMENT, None)
            .unwrap()
    };

    let public_dest = new_key();
    let dest: [u8; 32] = public_dest.into();
    let public_other = new_key();

    // Right destination, right topic.
    let plaintext_good = b"The most valuable secret".to_vec();
    let mut s_good = statement(1, 1, None, 0);
    s_good.encrypt(&plaintext_good, &public_dest).unwrap();
    s_good.set_topic(0, topic(42));

    // Right destination, different topic.
    let mut s_wrong_topic = statement(2, 2, None, 0);
    s_wrong_topic.encrypt(b"Wrong topic", &public_dest).unwrap();
    s_wrong_topic.set_topic(0, topic(99));

    // Right topic, different destination.
    let mut s_other_dest = statement(3, 3, None, 0);
    s_other_dest.encrypt(b"Other dest", &public_other).unwrap();
    s_other_dest.set_topic(0, topic(42));

    for s in [s_good, s_wrong_topic, s_other_dest] {
        store.submit(s, StatementSource::Network);
    }

    let retrieved = store.posted_clear(&[topic(42)], dest).unwrap();

    assert_eq!(retrieved, vec![plaintext_good]);
}
1644
#[test]
/// `remove_by` drops every statement of one account, moves them to the expired index,
/// leaves other accounts' statements and their topic/key index entries intact, is safe to
/// call twice, and the account can submit again after the expired entries are purged.
fn remove_by_covers_various_situations() {
    use pezsp_statement_store::{StatementSource, StatementStore, SubmitResult};

    let (mut store, _temp) = test_store();
    store.set_time(0);

    let shared_topic = topic(42);
    let shared_key = dec_key(7);

    // Account 4: one statement with the shared topic, one with the shared key, one plain.
    let mut s_a1 = statement(4, 10, Some(100), 100);
    s_a1.set_topic(0, shared_topic);
    let h_a1 = s_a1.hash();

    let mut s_a2 = statement(4, 20, Some(200), 150);
    s_a2.set_decryption_key(shared_key);
    let h_a2 = s_a2.hash();

    let s_a3 = statement(4, 30, None, 50);
    let h_a3 = s_a3.hash();

    // Account 3: one plain statement and one with both the shared topic and key.
    let s_b1 = statement(3, 10, None, 100);
    let h_b1 = s_b1.hash();

    let mut s_b2 = statement(3, 15, Some(300), 100);
    s_b2.set_topic(0, shared_topic);
    s_b2.set_decryption_key(shared_key);
    let h_b2 = s_b2.hash();

    for s in [s_a1, s_a2, s_a3, s_b1, s_b2] {
        assert!(matches!(store.submit(s, StatementSource::Network), SubmitResult::New(_)));
    }

    // Index state before removal.
    {
        let index = store.index.read();
        assert_eq!(index.entries.len(), 5, "all 5 should be present");
        for id in [4, 3] {
            assert!(index.accounts.contains_key(&account(id)));
        }
        assert_eq!(index.total_size, 100 + 150 + 50 + 100 + 100);

        let topic_set = index.by_topic.get(&shared_topic).expect("topic set exists");
        assert!([h_a1, h_b2].iter().all(|h| topic_set.contains(h)));

        let key_set = index.by_dec_key.get(&Some(shared_key)).expect("key set exists");
        assert!([h_a2, h_b2].iter().all(|h| key_set.contains(h)));
    }

    store.remove_by(account(4)).expect("remove_by should succeed");

    // Store and index state after removal.
    {
        for removed in [h_a1, h_a2, h_a3] {
            assert!(
                store.statement(&removed).unwrap().is_none(),
                "A's statement should be removed"
            );
        }

        for kept in [h_b1, h_b2] {
            assert!(store.statement(&kept).unwrap().is_some(), "B's statement should remain");
        }

        let index = store.index.read();

        assert!(!index.accounts.contains_key(&account(4)), "Account A must be gone");
        assert!(index.accounts.contains_key(&account(3)), "Account B must remain");

        for expired in [h_a1, h_a2, h_a3] {
            assert!(index.expired.contains_key(&expired));
        }
        assert_eq!(index.expired.len(), 3);

        assert_eq!(index.entries.len(), 2);
        assert_eq!(index.total_size, 100 + 100);

        let topic_set = index.by_topic.get(&shared_topic).expect("topic set exists");
        assert!(topic_set.contains(&h_b2));
        assert!(!topic_set.contains(&h_a1));

        let key_set = index.by_dec_key.get(&Some(shared_key)).expect("key set exists");
        assert!(key_set.contains(&h_b2));
        assert!(!key_set.contains(&h_a2));
    }

    store.remove_by(account(4)).expect("second remove_by should be a no-op");

    // Read the configured purge delay first so the read guard is released before `set_time`.
    let purge_after = store.index.read().options.purge_after_sec;
    store.set_time(purge_after + 1);
    store.maintain();
    assert_eq!(store.index.read().expired.len(), 0, "expired entries should be purged");

    let s_new = statement(4, 40, None, 10);
    assert!(matches!(store.submit(s_new, StatementSource::Network), SubmitResult::New(_)));
}
1758}