1#![warn(missing_docs)]
36#![warn(unused_extern_crates)]
37
38mod metrics;
39mod subscription;
40
41use self::subscription::{SubscriptionStatementsStream, SubscriptionsHandle};
42use futures::FutureExt;
43use metrics::MetricsLink as PrometheusMetrics;
44use parking_lot::{lock_api::RwLockUpgradableReadGuard, RwLock};
45use soil_prometheus::Registry as PrometheusRegistry;
46use soil_client::blockchain::HeaderBackend;
47use soil_client::client_api::{backend::StorageProvider, Backend, StorageKey};
48use soil_client::keystore::LocalKeystore;
49use soil_statement_store::{
50 runtime_api::{StatementSource, StatementStoreExt},
51 AccountId, BlockHash, Channel, DecryptionKey, FilterDecision, Hash, InvalidReason,
52 OptimizedTopicFilter, Proof, RejectionReason, Result, SignatureVerificationResult, Statement,
53 StatementAllowance, StatementEvent, SubmitResult, Topic,
54};
55pub use soil_statement_store::{Error, StatementStore, MAX_TOPICS};
56use std::{
57 collections::{BTreeMap, HashMap, HashSet},
58 sync::Arc,
59 time::{Duration, Instant},
60};
61pub use subscription::StatementStoreSubscriptionApi;
62use subsoil::core::{
63 crypto::UncheckedFrom, hexdisplay::HexDisplay, traits::SpawnNamed, Decode, Encode,
64};
65use subsoil::runtime::traits::Block as BlockT;
66
67const KEY_VERSION: &[u8] = b"version".as_slice();
68const CURRENT_VERSION: u32 = 1;
69
70const LOG_TARGET: &str = "statement-store";
71
72pub const DEFAULT_PURGE_AFTER_SEC: u64 = 2 * 24 * 60 * 60; pub const DEFAULT_MAX_TOTAL_STATEMENTS: usize = 4 * 1024 * 1024; pub const DEFAULT_MAX_TOTAL_SIZE: usize = 2 * 1024 * 1024 * 1024; pub const MAX_STATEMENT_SIZE: usize =
82 crate::statement::config::MAX_STATEMENT_NOTIFICATION_SIZE as usize - 1;
83
84const MAX_EXPIRY_STATEMENTS_PER_ITERATION: usize = 10_000;
86const MAX_EXPIRY_ACCOUNTS_PER_ITERATION: usize = 10_000;
88const MAX_EXPIRY_TIME_PER_ITERATION: Duration = Duration::from_millis(100);
90
91const NUM_FILTER_WORKERS: usize = 1;
93
94const MAINTENANCE_PERIOD: std::time::Duration = std::time::Duration::from_secs(29);
95
96const ENFORCE_LIMITS_PERIOD: std::time::Duration = std::time::Duration::from_secs(31);
100
101mod col {
102 pub const META: u8 = 0;
103 pub const STATEMENTS: u8 = 1;
104 pub const EXPIRED: u8 = 2;
105
106 pub const COUNT: u8 = 3;
107}
108
109#[derive(Eq, PartialEq, Debug, Ord, PartialOrd, Clone, Copy)]
110struct Expiry(u64);
111
112#[derive(PartialEq, Eq)]
113struct PriorityKey {
114 hash: Hash,
115 expiry: Expiry,
116}
117
118impl PartialOrd for PriorityKey {
119 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
120 Some(self.cmp(other))
121 }
122}
123
124impl Ord for PriorityKey {
125 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
126 self.expiry.cmp(&other.expiry).then_with(|| self.hash.cmp(&other.hash))
127 }
128}
129
130#[derive(PartialEq, Eq)]
131struct ChannelEntry {
132 hash: Hash,
133 expiry: Expiry,
134}
135
136#[derive(Default)]
137struct StatementsForAccount {
138 by_priority: BTreeMap<PriorityKey, (Option<Channel>, usize)>,
140 channels: HashMap<Channel, ChannelEntry>,
142 data_size: usize,
144}
145
146impl StatementsForAccount {
147 fn expired_by_iter(
149 &self,
150 current_time: u64,
151 ) -> impl Iterator<Item = (&PriorityKey, &(Option<Channel>, usize))> {
152 let range = PriorityKey { hash: Hash::default(), expiry: Expiry(0) }..PriorityKey {
153 hash: Hash::default(),
154 expiry: Expiry(current_time << 32),
155 };
156 self.by_priority.range(range)
157 }
158}
159
160pub struct Options {
162 max_total_statements: usize,
165 max_total_size: usize,
168 purge_after_sec: u64,
170}
171
172impl Default for Options {
173 fn default() -> Self {
174 Options {
175 max_total_statements: DEFAULT_MAX_TOTAL_STATEMENTS,
176 max_total_size: DEFAULT_MAX_TOTAL_SIZE,
177 purge_after_sec: DEFAULT_PURGE_AFTER_SEC,
178 }
179 }
180}
181
182#[derive(Default)]
183struct Index {
184 recent: HashSet<Hash>,
185 by_topic: HashMap<Topic, HashSet<Hash>>,
186 by_dec_key: HashMap<Option<DecryptionKey>, HashSet<Hash>>,
187 topics_and_keys: HashMap<Hash, ([Option<Topic>; MAX_TOPICS], Option<DecryptionKey>)>,
188 entries: HashMap<Hash, (AccountId, Expiry, usize)>,
189 expired: HashMap<Hash, u64>, accounts: HashMap<AccountId, StatementsForAccount>,
191 accounts_to_check_for_expiry_stmts: Vec<AccountId>,
192 options: Options,
193 total_size: usize,
194}
195
196struct ClientWrapper<Block, Client, BE> {
197 client: Arc<Client>,
198 _block: std::marker::PhantomData<Block>,
199 _backend: std::marker::PhantomData<BE>,
200}
201
202impl<Block, Client, BE> ClientWrapper<Block, Client, BE>
203where
204 Block: BlockT,
205 Block::Hash: From<BlockHash>,
206 BE: Backend<Block> + 'static,
207 Client: HeaderBackend<Block> + StorageProvider<Block, BE> + Send + Sync + 'static,
208{
209 fn read_allowance(
210 &self,
211 account_id: &AccountId,
212 block_hash: Option<Block::Hash>,
213 ) -> Result<Option<StatementAllowance>> {
214 use soil_statement_store::{statement_allowance_key, StatementAllowance};
215
216 let block_hash = block_hash.unwrap_or(self.client.info().finalized_hash);
217 let key = statement_allowance_key(account_id);
218 let storage_key = StorageKey(key);
219 self.client
220 .storage(block_hash, &storage_key)
221 .map_err(|e| Error::Storage(format!("Failed to read allowance: {:?}", e)))?
222 .map(|value| {
223 StatementAllowance::decode(&mut &value.0[..])
224 .map_err(|e| Error::Decode(format!("Failed to decode allowance: {:?}", e)))
225 })
226 .transpose()
227 }
228}
229
230pub struct Store {
232 db: parity_db::Db,
233 index: RwLock<Index>,
234 read_allowance_fn: Box<
235 dyn Fn(&AccountId, Option<BlockHash>) -> Result<Option<StatementAllowance>> + Send + Sync,
236 >,
237 subscription_manager: SubscriptionsHandle,
238 keystore: Arc<LocalKeystore>,
239 time_override: Option<u64>,
241 metrics: PrometheusMetrics,
242}
243
244enum IndexQuery {
245 Unknown,
246 Exists,
247 Expired,
248}
249
250impl Index {
251 fn new(options: Options) -> Index {
252 Index { options, ..Default::default() }
253 }
254
255 fn insert_new(
256 &mut self,
257 hash: Hash,
258 account: AccountId,
259 statement: &Statement,
260 is_recent: bool,
261 ) {
262 let mut all_topics = [None; MAX_TOPICS];
263 let mut nt = 0;
264 while let Some(t) = statement.topic(nt) {
265 self.by_topic.entry(t).or_default().insert(hash);
266 all_topics[nt] = Some(t);
267 nt += 1;
268 }
269 let key = statement.decryption_key();
270 self.by_dec_key.entry(key).or_default().insert(hash);
271 if nt > 0 || key.is_some() {
272 self.topics_and_keys.insert(hash, (all_topics, key));
273 }
274 let expiry = Expiry(statement.expiry());
275 self.entries.insert(hash, (account, expiry, statement.data_len()));
276 if is_recent {
277 self.recent.insert(hash);
278 }
279 self.total_size += statement.data_len();
280 let account_info = self.accounts.entry(account).or_default();
281 account_info.data_size += statement.data_len();
282 if let Some(channel) = statement.channel() {
283 account_info.channels.insert(channel, ChannelEntry { hash, expiry });
284 }
285 account_info
286 .by_priority
287 .insert(PriorityKey { hash, expiry }, (statement.channel(), statement.data_len()));
288 }
289
290 fn query(&self, hash: &Hash) -> IndexQuery {
291 if self.entries.contains_key(hash) {
292 return IndexQuery::Exists;
293 }
294 if self.expired.contains_key(hash) {
295 return IndexQuery::Expired;
296 }
297 IndexQuery::Unknown
298 }
299
300 fn insert_expired(&mut self, hash: Hash, timestamp: u64) {
301 self.expired.insert(hash, timestamp);
302 }
303
304 fn iterate_with(
305 &self,
306 key: Option<DecryptionKey>,
307 topic: &OptimizedTopicFilter,
308 f: impl FnMut(&Hash) -> Result<()>,
309 ) -> Result<()> {
310 match topic {
311 OptimizedTopicFilter::Any => self.iterate_with_any(key, f),
312 OptimizedTopicFilter::MatchAll(topics) => {
313 self.iterate_with_match_all(key, topics.iter(), f)
314 },
315 OptimizedTopicFilter::MatchAny(topics) => {
316 self.iterate_with_match_any(key, topics.iter(), f)
317 },
318 }
319 }
320
321 fn iterate_with_match_any<'a>(
322 &self,
323 key: Option<DecryptionKey>,
324 match_any_topics: impl ExactSizeIterator<Item = &'a Topic>,
325 mut f: impl FnMut(&Hash) -> Result<()>,
326 ) -> Result<()> {
327 let Some(key_set) = self.by_dec_key.get(&key).filter(|k| !k.is_empty()) else {
328 return Ok(());
329 };
330
331 for t in match_any_topics {
332 let set = self.by_topic.get(t);
333
334 for item in set.iter().flat_map(|set| set.iter()) {
335 if key_set.contains(item) {
336 log::trace!(
337 target: LOG_TARGET,
338 "Iterating by topic/key: statement {:?}",
339 HexDisplay::from(item)
340 );
341 f(item)?
342 }
343 }
344 }
345 Ok(())
346 }
347
348 fn iterate_with_any(
349 &self,
350 key: Option<DecryptionKey>,
351 mut f: impl FnMut(&Hash) -> Result<()>,
352 ) -> Result<()> {
353 let key_set = self.by_dec_key.get(&key);
354 if key_set.map_or(true, |s| s.is_empty()) {
355 return Ok(());
357 }
358
359 for item in key_set.map(|hashes| hashes.iter()).into_iter().flatten() {
360 f(item)?
361 }
362 Ok(())
363 }
364
365 fn iterate_with_match_all<'a>(
366 &self,
367 key: Option<DecryptionKey>,
368 match_all_topics: impl ExactSizeIterator<Item = &'a Topic>,
369 mut f: impl FnMut(&Hash) -> Result<()>,
370 ) -> Result<()> {
371 let empty = HashSet::new();
372 let mut sets: [&HashSet<Hash>; MAX_TOPICS + 1] = [∅ MAX_TOPICS + 1];
373 let num_topics = match_all_topics.len();
374 if num_topics > MAX_TOPICS {
375 return Ok(());
376 }
377 let key_set = self.by_dec_key.get(&key);
378 if key_set.map_or(true, |s| s.is_empty()) {
379 return Ok(());
381 }
382 sets[0] = key_set.expect("Function returns if key_set is None");
383 for (i, t) in match_all_topics.enumerate() {
384 let set = self.by_topic.get(t);
385 if set.map_or(0, |s| s.len()) == 0 {
386 return Ok(());
388 }
389 sets[i + 1] = set.expect("Function returns if set is None");
390 }
391 let sets = &mut sets[0..num_topics + 1];
392 sets.sort_by_key(|s| s.len());
394 for item in sets[0] {
395 if sets[1..].iter().all(|set| set.contains(item)) {
396 log::trace!(
397 target: LOG_TARGET,
398 "Iterating by topic/key: statement {:?}",
399 HexDisplay::from(item)
400 );
401 f(item)?
402 }
403 }
404 Ok(())
405 }
406
407 fn maintain(&mut self, current_time: u64) -> Vec<Hash> {
408 let mut purged = Vec::new();
410 self.expired.retain(|hash, timestamp| {
411 if *timestamp + self.options.purge_after_sec <= current_time {
412 purged.push(*hash);
413 log::trace!(target: LOG_TARGET, "Purged statement {:?}", HexDisplay::from(hash));
414 false
415 } else {
416 true
417 }
418 });
419 purged
420 }
421
422 fn take_recent(&mut self) -> HashSet<Hash> {
423 std::mem::take(&mut self.recent)
424 }
425
426 fn make_expired(&mut self, hash: &Hash, current_time: u64) -> bool {
427 if let Some((account, expiry, len)) = self.entries.remove(hash) {
428 self.total_size -= len;
429 if let Some((topics, key)) = self.topics_and_keys.remove(hash) {
430 for t in topics.into_iter().flatten() {
431 if let std::collections::hash_map::Entry::Occupied(mut set) =
432 self.by_topic.entry(t)
433 {
434 set.get_mut().remove(hash);
435 if set.get().is_empty() {
436 set.remove_entry();
437 }
438 }
439 }
440 if let std::collections::hash_map::Entry::Occupied(mut set) =
441 self.by_dec_key.entry(key)
442 {
443 set.get_mut().remove(hash);
444 if set.get().is_empty() {
445 set.remove_entry();
446 }
447 }
448 }
449 let _ = self.recent.remove(hash);
450 self.expired.insert(*hash, current_time);
451 if let std::collections::hash_map::Entry::Occupied(mut account_rec) =
452 self.accounts.entry(account)
453 {
454 let key = PriorityKey { hash: *hash, expiry };
455 if let Some((channel, len)) = account_rec.get_mut().by_priority.remove(&key) {
456 account_rec.get_mut().data_size -= len;
457 if let Some(channel) = channel {
458 account_rec.get_mut().channels.remove(&channel);
459 }
460 }
461 if account_rec.get().by_priority.is_empty() {
462 account_rec.remove_entry();
463 }
464 }
465 log::trace!(target: LOG_TARGET, "Expired statement {:?}", HexDisplay::from(hash));
466 true
467 } else {
468 false
469 }
470 }
471
472 fn insert(
473 &mut self,
474 hash: Hash,
475 statement: &Statement,
476 account: &AccountId,
477 validation: &StatementAllowance,
478 current_time: u64,
479 ) -> std::result::Result<HashSet<Hash>, RejectionReason> {
480 let statement_len = statement.data_len();
481 if statement_len > validation.max_size as usize {
482 log::debug!(
483 target: LOG_TARGET,
484 "Ignored oversize message: {:?} ({} bytes)",
485 HexDisplay::from(&hash),
486 statement_len,
487 );
488 return Err(RejectionReason::DataTooLarge {
489 submitted_size: statement_len,
490 available_size: validation.max_size as usize,
491 });
492 }
493
494 let mut evicted = HashSet::new();
495 let mut would_free_size = 0;
496 let expiry = Expiry(statement.expiry());
497 let (max_size, max_count) = (validation.max_size as usize, validation.max_count as usize);
498 if let Some(account_rec) = self.accounts.get(account) {
502 if let Some(channel) = statement.channel() {
503 if let Some(channel_record) = account_rec.channels.get(&channel) {
504 if expiry <= channel_record.expiry {
505 log::debug!(
507 target: LOG_TARGET,
508 "Ignored lower priority channel message: {:?} {:?} <= {:?}",
509 HexDisplay::from(&hash),
510 expiry,
511 channel_record.expiry,
512 );
513 return Err(RejectionReason::ChannelPriorityTooLow {
514 submitted_expiry: expiry.0,
515 min_expiry: channel_record.expiry.0,
516 });
517 } else {
518 log::debug!(
521 target: LOG_TARGET,
522 "Replacing higher priority channel message: {:?} ({:?}) > {:?} ({:?})",
523 HexDisplay::from(&hash),
524 expiry,
525 HexDisplay::from(&channel_record.hash),
526 channel_record.expiry,
527 );
528 let key = PriorityKey {
529 hash: channel_record.hash,
530 expiry: channel_record.expiry,
531 };
532 if let Some((_channel, len)) = account_rec.by_priority.get(&key) {
533 would_free_size += *len;
534 evicted.insert(channel_record.hash);
535 }
536 }
537 }
538 }
539 for (entry, (_, len)) in account_rec.by_priority.iter() {
541 if (account_rec.data_size - would_free_size + statement_len <= max_size)
542 && account_rec.by_priority.len() + 1 - evicted.len() <= max_count
543 {
544 break;
546 }
547 if evicted.contains(&entry.hash) {
548 continue;
550 }
551 if entry.expiry >= expiry {
552 log::debug!(
553 target: LOG_TARGET,
554 "Ignored message due to constraints {:?} {:?} < {:?}",
555 HexDisplay::from(&hash),
556 expiry,
557 entry.expiry,
558 );
559 return Err(RejectionReason::AccountFull {
560 submitted_expiry: expiry.0,
561 min_expiry: entry.expiry.0,
562 });
563 }
564 evicted.insert(entry.hash);
565 would_free_size += len;
566 }
567 }
568 if !((self.total_size - would_free_size + statement_len <= self.options.max_total_size)
570 && self.entries.len() + 1 - evicted.len() <= self.options.max_total_statements)
571 {
572 log::debug!(
573 target: LOG_TARGET,
574 "Ignored statement {} because the store is full (size={}, count={})",
575 HexDisplay::from(&hash),
576 self.total_size,
577 self.entries.len(),
578 );
579 return Err(RejectionReason::StoreFull);
580 }
581
582 for h in &evicted {
583 self.make_expired(h, current_time);
584 }
585 self.insert_new(hash, *account, statement, true);
586 Ok(evicted)
587 }
588}
589
590impl Store {
591 pub fn new_shared<Block, Client, BE>(
594 path: &std::path::Path,
595 options: Options,
596 client: Arc<Client>,
597 keystore: Arc<LocalKeystore>,
598 prometheus: Option<&PrometheusRegistry>,
599 task_spawner: Box<dyn SpawnNamed>,
600 ) -> Result<Arc<Store>>
601 where
602 Block: BlockT,
603 Block::Hash: From<BlockHash>,
604 BE: Backend<Block> + 'static,
605 Client: HeaderBackend<Block> + StorageProvider<Block, BE> + Send + Sync + 'static,
606 {
607 let store =
608 Arc::new(Self::new(path, options, client, keystore, prometheus, task_spawner.clone())?);
609
610 let worker_store = store.clone();
612 task_spawner.spawn(
613 "statement-store-maintenance",
614 Some("statement-store"),
615 Box::pin(async move {
616 let mut maintenance_interval = tokio::time::interval(MAINTENANCE_PERIOD);
617 let mut enforce_limits_interval = tokio::time::interval(ENFORCE_LIMITS_PERIOD);
618 loop {
619 futures::select! {
620 _ = maintenance_interval.tick().fuse() => {worker_store.maintain();}
621 _ = enforce_limits_interval.tick().fuse() => {worker_store.enforce_limits();}
622 }
623 }
624 }),
625 );
626
627 Ok(store)
628 }
629
630 #[doc(hidden)]
633 pub fn new<Block, Client, BE>(
634 path: &std::path::Path,
635 options: Options,
636 client: Arc<Client>,
637 keystore: Arc<LocalKeystore>,
638 prometheus: Option<&PrometheusRegistry>,
639 task_spawner: Box<dyn SpawnNamed>,
640 ) -> Result<Store>
641 where
642 Block: BlockT,
643 Block::Hash: From<BlockHash>,
644 BE: Backend<Block> + 'static,
645 Client: HeaderBackend<Block> + StorageProvider<Block, BE> + Send + Sync + 'static,
646 {
647 let mut path: std::path::PathBuf = path.into();
648 path.push("statements");
649
650 let mut config = parity_db::Options::with_columns(&path, col::COUNT);
651
652 let statement_col = &mut config.columns[col::STATEMENTS as usize];
653 statement_col.ref_counted = false;
654 statement_col.preimage = true;
655 statement_col.uniform = true;
656 let db = parity_db::Db::open_or_create(&config).map_err(|e| Error::Db(e.to_string()))?;
657 match db.get(col::META, &KEY_VERSION).map_err(|e| Error::Db(e.to_string()))? {
658 Some(version) => {
659 let version = u32::from_le_bytes(
660 version
661 .try_into()
662 .map_err(|_| Error::Db("Error reading database version".into()))?,
663 );
664 if version != CURRENT_VERSION {
665 return Err(Error::Db(format!("Unsupported database version: {version}")));
666 }
667 },
668 None => {
669 db.commit([(
670 col::META,
671 KEY_VERSION.to_vec(),
672 Some(CURRENT_VERSION.to_le_bytes().to_vec()),
673 )])
674 .map_err(|e| Error::Db(e.to_string()))?;
675 },
676 }
677
678 let storage_reader =
679 ClientWrapper { client, _block: Default::default(), _backend: Default::default() };
680 let read_allowance_fn =
681 Box::new(move |account_id: &AccountId, block_hash: Option<BlockHash>| {
682 storage_reader.read_allowance(account_id, block_hash.map(Into::into))
683 });
684
685 let store = Store {
686 db,
687 index: RwLock::new(Index::new(options)),
688 read_allowance_fn,
689 keystore,
690 time_override: None,
691 metrics: PrometheusMetrics::new(prometheus),
692 subscription_manager: SubscriptionsHandle::new(
693 task_spawner.clone(),
694 NUM_FILTER_WORKERS,
695 ),
696 };
697 store.populate()?;
698 Ok(store)
699 }
700
701 fn populate(&self) -> Result<()> {
706 {
707 let mut index = self.index.write();
708 self.db
709 .iter_column_while(col::STATEMENTS, |item| {
710 let statement = item.value;
711 if let Ok(statement) = Statement::decode(&mut statement.as_slice()) {
712 let hash = statement.hash();
713 log::trace!(
714 target: LOG_TARGET,
715 "Statement loaded {:?}",
716 HexDisplay::from(&hash)
717 );
718 if let Some(account_id) = statement.account_id() {
719 index.insert_new(hash, account_id, &statement, false);
720 } else {
721 log::debug!(
722 target: LOG_TARGET,
723 "Error decoding statement loaded from the DB: {:?}",
724 HexDisplay::from(&hash)
725 );
726 }
727 }
728 true
729 })
730 .map_err(|e| Error::Db(e.to_string()))?;
731 self.db
732 .iter_column_while(col::EXPIRED, |item| {
733 let expired_info = item.value;
734 if let Ok((hash, timestamp)) =
735 <(Hash, u64)>::decode(&mut expired_info.as_slice())
736 {
737 log::trace!(
738 target: LOG_TARGET,
739 "Statement loaded (expired): {:?}",
740 HexDisplay::from(&hash)
741 );
742 index.insert_expired(hash, timestamp);
743 }
744 true
745 })
746 .map_err(|e| Error::Db(e.to_string()))?;
747 }
748
749 self.maintain();
750 Ok(())
751 }
752
753 fn collect_statements_locked<R>(
754 &self,
755 key: Option<DecryptionKey>,
756 topic_filter: &OptimizedTopicFilter,
757 index: &Index,
758 result: &mut Vec<R>,
759 mut f: impl FnMut(Statement) -> Option<R>,
760 ) -> Result<()> {
761 index.iterate_with(key, topic_filter, |hash| {
762 match self.db.get(col::STATEMENTS, hash).map_err(|e| Error::Db(e.to_string()))? {
763 Some(entry) => {
764 if let Ok(statement) = Statement::decode(&mut entry.as_slice()) {
765 if let Some(data) = f(statement) {
766 result.push(data);
767 }
768 } else {
769 log::warn!(
771 target: LOG_TARGET,
772 "Corrupt statement {:?}",
773 HexDisplay::from(hash)
774 );
775 }
776 },
777 None => {
778 log::warn!(
780 target: LOG_TARGET,
781 "Missing statement {:?}",
782 HexDisplay::from(hash)
783 );
784 },
785 }
786 Ok(())
787 })?;
788 Ok(())
789 }
790
791 fn collect_statements<R>(
792 &self,
793 key: Option<DecryptionKey>,
794 topic_filter: &OptimizedTopicFilter,
795 f: impl FnMut(Statement) -> Option<R>,
796 ) -> Result<Vec<R>> {
797 let mut result = Vec::new();
798 let index = self.index.read();
799 self.collect_statements_locked(key, topic_filter, &index, &mut result, f)?;
800 Ok(result)
801 }
802
803 fn collect_evictions(
805 &self,
806 account: &AccountId,
807 account_rec: &StatementsForAccount,
808 current_time: u64,
809 ) -> Vec<Hash> {
810 let mut to_evict = Vec::new();
811 let mut expired_count = 0usize;
812 let mut expired_size = 0usize;
813 for (key, (_, len)) in account_rec.expired_by_iter(current_time) {
814 to_evict.push(key.hash);
815 expired_count += 1;
816 expired_size += len;
817 }
818
819 let allowance = match (self.read_allowance_fn)(account, None) {
821 Ok(Some(allowance)) => allowance,
822 Ok(None) => {
823 log::debug!(
824 target: LOG_TARGET,
825 "No allowance found for account {:?}, treating as zero allowance",
826 HexDisplay::from(account)
827 );
828 StatementAllowance { max_count: 0, max_size: 0 }
829 },
830 Err(e) => {
831 log::error!(target: LOG_TARGET, "Error reading allowance: {:?}", e);
832 return to_evict;
834 },
835 };
836
837 let mut remaining_count = account_rec.by_priority.len() - expired_count;
839 let mut remaining_size = account_rec.data_size - expired_size;
840
841 if remaining_count > allowance.max_count as usize
843 || remaining_size > allowance.max_size as usize
844 {
845 log::debug!(
846 target: LOG_TARGET,
847 "Account {:?} exceeds allowance: count={}/{}, size={}/{}",
848 HexDisplay::from(account),
849 remaining_count,
850 allowance.max_count,
851 remaining_size,
852 allowance.max_size
853 );
854
855 for (key, (_, len)) in account_rec.by_priority.iter().skip(expired_count) {
857 if remaining_count <= allowance.max_count as usize
858 && remaining_size <= allowance.max_size as usize
859 {
860 break;
861 }
862 to_evict.push(key.hash);
863 remaining_count -= 1;
864 remaining_size -= len;
865 log::debug!(
866 target: LOG_TARGET,
867 "Evicting statement {:?} due to allowance enforcement",
868 HexDisplay::from(&key.hash)
869 );
870 }
871 }
872
873 to_evict
874 }
875
876 fn enforce_limits(&self) {
893 let _start_check_expiration_timer = self.metrics.start_check_expiration_timer();
894 let current_time = self.timestamp();
895
896 let (to_evict, num_accounts_checked) = {
897 let index = self.index.upgradable_read();
898 if index.accounts_to_check_for_expiry_stmts.is_empty() {
899 let existing_accounts = index.accounts.keys().cloned().collect::<Vec<_>>();
900 let mut index = RwLockUpgradableReadGuard::upgrade(index);
901 index.accounts_to_check_for_expiry_stmts = existing_accounts;
902 return;
903 }
904
905 let mut to_evict = Vec::new();
906 let mut num_accounts_checked = 0;
907 let start = Instant::now();
908
909 for account in index.accounts_to_check_for_expiry_stmts.iter().rev() {
910 num_accounts_checked += 1;
911 if let Some(account_rec) = index.accounts.get(account) {
912 to_evict.extend(self.collect_evictions(account, account_rec, current_time));
913 }
914
915 if to_evict.len() >= MAX_EXPIRY_STATEMENTS_PER_ITERATION
916 || num_accounts_checked >= MAX_EXPIRY_ACCOUNTS_PER_ITERATION
917 || start.elapsed() >= MAX_EXPIRY_TIME_PER_ITERATION
918 {
919 break;
920 }
921 }
922
923 (to_evict, num_accounts_checked)
924 };
925
926 let mut expired = 0;
927
928 for hash in to_evict {
929 if let Err(e) = self.remove(&hash) {
930 log::debug!(
931 target: LOG_TARGET,
932 "Error marking statement {:?} as expired: {:?}",
933 HexDisplay::from(&hash),
934 e
935 );
936 } else {
937 expired += 1;
938 log::trace!(
939 target: LOG_TARGET,
940 "Marked statement {:?} as expired",
941 HexDisplay::from(&hash)
942 );
943 }
944 }
945
946 let mut index = self.index.write();
947 let new_len = index
948 .accounts_to_check_for_expiry_stmts
949 .len()
950 .saturating_sub(num_accounts_checked);
951 index.accounts_to_check_for_expiry_stmts.truncate(new_len);
952
953 drop(_start_check_expiration_timer);
954
955 self.metrics.report(|metrics| {
956 metrics.statements_expired_total.inc_by(expired);
957 });
958 }
959
960 pub fn maintain(&self) {
962 log::trace!(target: LOG_TARGET, "Started store maintenance");
963 let (
964 deleted,
965 active_count,
966 expired_count,
967 total_size,
968 accounts_count,
969 capacity_statements,
970 capacity_bytes,
971 ): (Vec<_>, usize, usize, usize, usize, usize, usize) = {
972 let mut index = self.index.write();
973 let deleted = index.maintain(self.timestamp());
974 (
975 deleted,
976 index.entries.len(),
977 index.expired.len(),
978 index.total_size,
979 index.accounts.len(),
980 index.options.max_total_statements,
981 index.options.max_total_size,
982 )
983 };
984 let deleted: Vec<_> =
985 deleted.into_iter().map(|hash| (col::EXPIRED, hash.to_vec(), None)).collect();
986 let deleted_count = deleted.len() as u64;
987 if let Err(e) = self.db.commit(deleted) {
988 log::warn!(target: LOG_TARGET, "Error writing to the statement database: {:?}", e);
989 } else {
990 self.metrics.report(|metrics| metrics.statements_pruned.inc_by(deleted_count));
991 }
992
993 self.metrics.report(|metrics| {
994 metrics.statements_total.set(active_count as u64);
995 metrics.bytes_total.set(total_size as u64);
996 metrics.accounts_total.set(accounts_count as u64);
997 metrics.expired_total.set(expired_count as u64);
998 metrics.capacity_statements.set(capacity_statements as u64);
999 metrics.capacity_bytes.set(capacity_bytes as u64);
1000 });
1001
1002 log::trace!(
1003 target: LOG_TARGET,
1004 "Completed store maintenance. Purged: {}, Active: {}, Expired: {}",
1005 deleted_count,
1006 active_count,
1007 expired_count
1008 );
1009 }
1010
1011 fn timestamp(&self) -> u64 {
1012 self.time_override.unwrap_or_else(|| {
1013 std::time::SystemTime::now()
1014 .duration_since(std::time::UNIX_EPOCH)
1015 .unwrap_or_default()
1016 .as_secs()
1017 })
1018 }
1019
1020 #[cfg(test)]
1021 fn set_time(&mut self, time: u64) {
1022 self.time_override = Some(time);
1023 }
1024
1025 pub fn as_statement_store_ext(self: Arc<Self>) -> StatementStoreExt {
1027 StatementStoreExt::new(self)
1028 }
1029
1030 fn posted_clear_inner<R>(
1033 &self,
1034 match_all_topics: &[Topic],
1035 dest: [u8; 32],
1036 mut map_f: impl FnMut(Statement, Vec<u8>) -> R,
1038 ) -> Result<Vec<R>> {
1039 self.collect_statements(
1040 Some(dest),
1041 &OptimizedTopicFilter::MatchAll(match_all_topics.iter().cloned().collect()),
1042 |statement| {
1043 if let (Some(key), Some(_)) = (statement.decryption_key(), statement.data()) {
1044 let public: subsoil::core::ed25519::Public = UncheckedFrom::unchecked_from(key);
1045 let public: soil_statement_store::ed25519::Public = public.into();
1046 match self.keystore.key_pair::<soil_statement_store::ed25519::Pair>(&public) {
1047 Err(e) => {
1048 log::debug!(
1049 target: LOG_TARGET,
1050 "Keystore error: {:?}, for statement {:?}",
1051 e,
1052 HexDisplay::from(&statement.hash())
1053 );
1054 None
1055 },
1056 Ok(None) => {
1057 log::debug!(
1058 target: LOG_TARGET,
1059 "Keystore is missing key for statement {:?}",
1060 HexDisplay::from(&statement.hash())
1061 );
1062 None
1063 },
1064 Ok(Some(pair)) => match statement.decrypt_private(&pair.into_inner()) {
1065 Ok(r) => r.map(|data| map_f(statement, data)),
1066 Err(e) => {
1067 log::debug!(
1068 target: LOG_TARGET,
1069 "Decryption error: {:?}, for statement {:?}",
1070 e,
1071 HexDisplay::from(&statement.hash())
1072 );
1073 None
1074 },
1075 },
1076 }
1077 } else {
1078 None
1079 }
1080 },
1081 )
1082 }
1083}
1084
1085impl StatementStore for Store {
1086 fn statements(&self) -> Result<Vec<(Hash, Statement)>> {
1088 let index = self.index.read();
1089 let mut result = Vec::with_capacity(index.entries.len());
1090 for hash in index.entries.keys().cloned() {
1091 let Some(encoded) =
1092 self.db.get(col::STATEMENTS, &hash).map_err(|e| Error::Db(e.to_string()))?
1093 else {
1094 continue;
1095 };
1096 if let Ok(statement) = Statement::decode(&mut encoded.as_slice()) {
1097 result.push((hash, statement));
1098 }
1099 }
1100 Ok(result)
1101 }
1102
1103 fn take_recent_statements(&self) -> Result<Vec<(Hash, Statement)>> {
1104 let mut index = self.index.write();
1105 let recent = index.take_recent();
1106 let mut result = Vec::with_capacity(recent.len());
1107 for hash in recent {
1108 let Some(encoded) =
1109 self.db.get(col::STATEMENTS, &hash).map_err(|e| Error::Db(e.to_string()))?
1110 else {
1111 continue;
1112 };
1113 if let Ok(statement) = Statement::decode(&mut encoded.as_slice()) {
1114 result.push((hash, statement));
1115 }
1116 }
1117 Ok(result)
1118 }
1119
1120 fn statement(&self, hash: &Hash) -> Result<Option<Statement>> {
1122 Ok(
1123 match self
1124 .db
1125 .get(col::STATEMENTS, hash.as_slice())
1126 .map_err(|e| Error::Db(e.to_string()))?
1127 {
1128 Some(entry) => {
1129 log::trace!(
1130 target: LOG_TARGET,
1131 "Queried statement {:?}",
1132 HexDisplay::from(hash)
1133 );
1134 Some(
1135 Statement::decode(&mut entry.as_slice())
1136 .map_err(|e| Error::Decode(e.to_string()))?,
1137 )
1138 },
1139 None => {
1140 log::trace!(
1141 target: LOG_TARGET,
1142 "Queried missing statement {:?}",
1143 HexDisplay::from(hash)
1144 );
1145 None
1146 },
1147 },
1148 )
1149 }
1150
1151 fn has_statement(&self, hash: &Hash) -> bool {
1152 self.index.read().entries.contains_key(hash)
1153 }
1154
1155 fn statement_hashes(&self) -> Vec<Hash> {
1156 self.index.read().entries.keys().cloned().collect()
1157 }
1158
1159 fn statements_by_hashes(
1160 &self,
1161 hashes: &[Hash],
1162 filter: &mut dyn FnMut(&Hash, &[u8], &Statement) -> FilterDecision,
1163 ) -> Result<(Vec<(Hash, Statement)>, usize)> {
1164 let mut result = Vec::new();
1165 let mut processed = 0;
1166 for hash in hashes {
1167 processed += 1;
1168 let Some(encoded) =
1169 self.db.get(col::STATEMENTS, hash).map_err(|e| Error::Db(e.to_string()))?
1170 else {
1171 continue;
1172 };
1173 let Ok(statement) = Statement::decode(&mut encoded.as_slice()) else { continue };
1174 match filter(hash, &encoded, &statement) {
1175 FilterDecision::Skip => {},
1176 FilterDecision::Take => {
1177 result.push((*hash, statement));
1178 },
1179 FilterDecision::Abort => {
1180 processed -= 1;
1182 break;
1183 },
1184 }
1185 }
1186
1187 Ok((result, processed))
1188 }
1189
1190 fn broadcasts(&self, match_all_topics: &[Topic]) -> Result<Vec<Vec<u8>>> {
1193 self.collect_statements(
1194 None,
1195 &OptimizedTopicFilter::MatchAll(match_all_topics.iter().cloned().collect()),
1196 |statement| statement.into_data(),
1197 )
1198 }
1199
1200 fn posted(&self, match_all_topics: &[Topic], dest: [u8; 32]) -> Result<Vec<Vec<u8>>> {
1204 self.collect_statements(
1205 Some(dest),
1206 &OptimizedTopicFilter::MatchAll(match_all_topics.iter().cloned().collect()),
1207 |statement| statement.into_data(),
1208 )
1209 }
1210
1211 fn posted_clear(&self, match_all_topics: &[Topic], dest: [u8; 32]) -> Result<Vec<Vec<u8>>> {
1214 self.posted_clear_inner(match_all_topics, dest, |_statement, data| data)
1215 }
1216
1217 fn broadcasts_stmt(&self, match_all_topics: &[Topic]) -> Result<Vec<Vec<u8>>> {
1220 self.collect_statements(
1221 None,
1222 &OptimizedTopicFilter::MatchAll(match_all_topics.iter().cloned().collect()),
1223 |statement| Some(statement.encode()),
1224 )
1225 }
1226
1227 fn posted_stmt(&self, match_all_topics: &[Topic], dest: [u8; 32]) -> Result<Vec<Vec<u8>>> {
1231 self.collect_statements(
1232 Some(dest),
1233 &OptimizedTopicFilter::MatchAll(match_all_topics.iter().cloned().collect()),
1234 |statement| Some(statement.encode()),
1235 )
1236 }
1237
1238 fn posted_clear_stmt(
1241 &self,
1242 match_all_topics: &[Topic],
1243 dest: [u8; 32],
1244 ) -> Result<Vec<Vec<u8>>> {
1245 self.posted_clear_inner(match_all_topics, dest, |statement, data| {
1246 let mut res = Vec::with_capacity(statement.size_hint() + data.len());
1247 statement.encode_to(&mut res);
1248 res.extend_from_slice(&data);
1249 res
1250 })
1251 }
1252
1253 fn submit(&self, statement: Statement, source: StatementSource) -> SubmitResult {
1255 let _histogram_submit_start_timer = self.metrics.start_submit_timer();
1256 let hash = statement.hash();
1257 if self.timestamp() >= statement.get_expiration_timestamp_secs().into() {
1259 log::debug!(
1260 target: LOG_TARGET,
1261 "Statement is already expired: {:?}",
1262 HexDisplay::from(&hash),
1263 );
1264 self.metrics.report(|metrics| metrics.validations_invalid.inc());
1265 return SubmitResult::Invalid(InvalidReason::AlreadyExpired);
1266 }
1267 let encoded_size = statement.encoded_size();
1268 if encoded_size > MAX_STATEMENT_SIZE {
1269 log::debug!(
1270 target: LOG_TARGET,
1271 "Statement is too big for propogation: {:?} ({}/{} bytes)",
1272 HexDisplay::from(&hash),
1273 statement.encoded_size(),
1274 MAX_STATEMENT_SIZE
1275 );
1276 self.metrics.report(|metrics| metrics.validations_invalid.inc());
1277 return SubmitResult::Invalid(InvalidReason::EncodingTooLarge {
1278 submitted_size: encoded_size,
1279 max_size: MAX_STATEMENT_SIZE,
1280 });
1281 }
1282
1283 match self.index.read().query(&hash) {
1284 IndexQuery::Expired => {
1285 if !source.can_be_resubmitted() {
1286 return SubmitResult::KnownExpired;
1287 }
1288 },
1289 IndexQuery::Exists => {
1290 if !source.can_be_resubmitted() {
1291 return SubmitResult::Known;
1292 }
1293 },
1294 IndexQuery::Unknown => {},
1295 }
1296
1297 let Some(account_id) = statement.account_id() else {
1298 log::debug!(
1299 target: LOG_TARGET,
1300 "Statement validation failed: Missing proof ({:?})",
1301 HexDisplay::from(&hash),
1302 );
1303 self.metrics.report(|metrics| metrics.validations_invalid.inc());
1304 return SubmitResult::Invalid(InvalidReason::NoProof);
1305 };
1306
1307 match statement.verify_signature() {
1308 SignatureVerificationResult::Valid(_) => {},
1309 SignatureVerificationResult::Invalid => {
1310 log::debug!(
1311 target: LOG_TARGET,
1312 "Statement validation failed: BadProof, {:?}",
1313 HexDisplay::from(&hash),
1314 );
1315 self.metrics.report(|metrics| metrics.validations_invalid.inc());
1316 return SubmitResult::Invalid(InvalidReason::BadProof);
1317 },
1318 SignatureVerificationResult::NoSignature => {
1319 if let Some(Proof::OnChain { .. }) = statement.proof() {
1320 log::debug!(
1321 target: LOG_TARGET,
1322 "Statement with OnChain proof accepted: {:?}",
1323 HexDisplay::from(&hash),
1324 );
1325 } else {
1326 log::debug!(
1327 target: LOG_TARGET,
1328 "Statement validation failed: NoProof, {:?}",
1329 HexDisplay::from(&hash),
1330 );
1331 self.metrics.report(|metrics| metrics.validations_invalid.inc());
1332 return SubmitResult::Invalid(InvalidReason::NoProof);
1333 }
1334 },
1335 };
1336
1337 let validation = match (self.read_allowance_fn)(
1338 &account_id,
1339 statement.proof().and_then(|p| match p {
1340 Proof::OnChain { block_hash, .. } => Some(*block_hash),
1341 _ => None,
1342 }),
1343 ) {
1344 Ok(Some(allowance)) => allowance,
1345 Ok(None) => {
1346 log::debug!(
1347 target: LOG_TARGET,
1348 "Account {} has no statement allowance set",
1349 HexDisplay::from(&account_id),
1350 );
1351 return SubmitResult::Rejected(RejectionReason::NoAllowance);
1352 },
1353 Err(e) => {
1354 log::debug!(
1355 target: LOG_TARGET,
1356 "Reading statement allowance for account {} failed",
1357 HexDisplay::from(&account_id),
1358 );
1359 return SubmitResult::InternalError(e);
1360 },
1361 };
1362
1363 let current_time = self.timestamp();
1364 let mut commit = Vec::new();
1365 {
1366 let mut index = self.index.write();
1367
1368 let evicted =
1369 match index.insert(hash, &statement, &account_id, &validation, current_time) {
1370 Ok(evicted) => evicted,
1371 Err(reason) => {
1372 self.metrics.report(|metrics| {
1373 metrics.rejections.with_label_values(&[reason.label()]).inc();
1374 });
1375 return SubmitResult::Rejected(reason);
1376 },
1377 };
1378
1379 commit.push((col::STATEMENTS, hash.to_vec(), Some(statement.encode())));
1380 for hash in evicted {
1381 commit.push((col::STATEMENTS, hash.to_vec(), None));
1382 commit.push((col::EXPIRED, hash.to_vec(), Some((hash, current_time).encode())));
1383 }
1384 if let Err(e) = self.db.commit(commit) {
1385 log::debug!(
1386 target: LOG_TARGET,
1387 "Statement validation failed: database error {}, {:?}",
1388 e,
1389 statement
1390 );
1391 return SubmitResult::InternalError(Error::Db(e.to_string()));
1392 }
1393 self.subscription_manager.notify(statement);
1394 } self.metrics.report(|metrics| metrics.submitted_statements.inc());
1396 log::trace!(target: LOG_TARGET, "Statement submitted: {:?}", HexDisplay::from(&hash));
1397 SubmitResult::New
1398 }
1399
1400 fn remove(&self, hash: &Hash) -> Result<()> {
1402 let current_time = self.timestamp();
1403 {
1404 let mut index = self.index.write();
1405 if index.make_expired(hash, current_time) {
1406 let commit = [
1407 (col::STATEMENTS, hash.to_vec(), None),
1408 (col::EXPIRED, hash.to_vec(), Some((hash, current_time).encode())),
1409 ];
1410 if let Err(e) = self.db.commit(commit) {
1411 log::debug!(
1412 target: LOG_TARGET,
1413 "Error removing statement: database error {}, {:?}",
1414 e,
1415 HexDisplay::from(hash),
1416 );
1417 return Err(Error::Db(e.to_string()));
1418 }
1419 }
1420 }
1421 Ok(())
1422 }
1423
1424 fn remove_by(&self, who: [u8; 32]) -> Result<()> {
1426 let mut index = self.index.write();
1427 let mut evicted = Vec::new();
1428 if let Some(account_rec) = index.accounts.get(&who) {
1429 evicted.extend(account_rec.by_priority.keys().map(|k| k.hash));
1430 }
1431
1432 let current_time = self.timestamp();
1433 let mut commit = Vec::new();
1434 for hash in evicted {
1435 index.make_expired(&hash, current_time);
1436 commit.push((col::STATEMENTS, hash.to_vec(), None));
1437 commit.push((col::EXPIRED, hash.to_vec(), Some((hash, current_time).encode())));
1438 }
1439 self.db.commit(commit).map_err(|e| {
1440 log::debug!(
1441 target: LOG_TARGET,
1442 "Error removing statement: database error {}, remove by {:?}",
1443 e,
1444 HexDisplay::from(&who),
1445 );
1446
1447 Error::Db(e.to_string())
1448 })
1449 }
1450}
1451
1452impl StatementStoreSubscriptionApi for Store {
1453 fn subscribe_statement(
1454 &self,
1455 topic_filter: OptimizedTopicFilter,
1456 ) -> Result<(Vec<Vec<u8>>, async_channel::Sender<StatementEvent>, SubscriptionStatementsStream)>
1457 {
1458 let mut existing_statements = Vec::new();
1460 let index = self.index.read();
1461 self.collect_statements_locked(
1462 None,
1463 &topic_filter,
1464 &index,
1465 &mut existing_statements,
1466 |statement| Some(statement.encode()),
1467 )?;
1468 let (subscription_sender, subscription_stream) =
1469 self.subscription_manager.subscribe(topic_filter);
1470 if existing_statements.is_empty() {
1471 subscription_sender
1472 .send_blocking(StatementEvent::NewStatements {
1473 statements: vec![],
1474 remaining: Some(0),
1475 })
1476 .ok();
1477 }
1478 Ok((existing_statements, subscription_sender, subscription_stream))
1479 }
1480}
1481
1482#[cfg(test)]
1483mod tests {
1484
1485 use super::{col, Store, MAX_STATEMENT_SIZE};
1486 use soil_client::keystore::Keystore;
1487 use soil_statement_store::{
1488 AccountId, Channel, DecryptionKey, InvalidReason, Proof, Statement, StatementSource,
1489 StatementStore, SubmitResult, Topic,
1490 };
1491 use subsoil::core::{Decode, Encode, Pair};
1492
1493 type Extrinsic = subsoil::runtime::OpaqueExtrinsic;
1494 type Hash = subsoil::core::H256;
1495 type Hashing = subsoil::runtime::traits::BlakeTwo256;
1496 type BlockNumber = u64;
1497 type Header = subsoil::runtime::generic::Header<BlockNumber, Hashing>;
1498 type Block = subsoil::runtime::generic::Block<Header, Extrinsic>;
1499
1500 const CORRECT_BLOCK_HASH: [u8; 32] = [1u8; 32];
1501
1502 #[derive(Clone)]
1503 pub(crate) struct TestClient;
1504
1505 pub(crate) type TestBackend = soil_client::client_api::in_mem::Backend<Block>;
1506
1507 impl soil_client::client_api::StorageProvider<Block, TestBackend> for TestClient {
1508 fn storage(
1509 &self,
1510 _hash: Hash,
1511 key: &soil_client::client_api::StorageKey,
1512 ) -> soil_client::blockchain::Result<Option<soil_client::client_api::StorageData>> {
1513 use soil_statement_store::StatementAllowance;
1514
1515 assert_eq!(&key.0[0..21], b":statement_allowance:" as &[u8],);
1516
1517 let account_bytes = &key.0[21..53];
1519 let account_id: u64 = u64::from_le_bytes(account_bytes[0..8].try_into().unwrap());
1520 let allowance = match account_id {
1521 0 => return Ok(None),
1523 1 => StatementAllowance::new(1, 1000),
1524 2 => StatementAllowance::new(2, 1000),
1525 3 => StatementAllowance::new(3, 1000),
1526 4 => StatementAllowance::new(4, 1000),
1527 42 => StatementAllowance::new(42, (42 * MAX_STATEMENT_SIZE) as u32),
1528 _ => StatementAllowance::new(100, 1000),
1529 };
1530 Ok(Some(soil_client::client_api::StorageData(allowance.encode())))
1531 }
1532
1533 fn storage_hash(
1534 &self,
1535 _hash: Hash,
1536 _key: &soil_client::client_api::StorageKey,
1537 ) -> soil_client::blockchain::Result<Option<Hash>> {
1538 unimplemented!()
1539 }
1540
1541 fn storage_keys(
1542 &self,
1543 _hash: Hash,
1544 _prefix: Option<&soil_client::client_api::StorageKey>,
1545 _start_key: Option<&soil_client::client_api::StorageKey>,
1546 ) -> soil_client::blockchain::Result<
1547 soil_client::client_api::backend::KeysIter<
1548 <TestBackend as soil_client::client_api::Backend<Block>>::State,
1549 Block,
1550 >,
1551 > {
1552 unimplemented!()
1553 }
1554
1555 fn storage_pairs(
1556 &self,
1557 _hash: Hash,
1558 _prefix: Option<&soil_client::client_api::StorageKey>,
1559 _start_key: Option<&soil_client::client_api::StorageKey>,
1560 ) -> soil_client::blockchain::Result<
1561 soil_client::client_api::backend::PairsIter<
1562 <TestBackend as soil_client::client_api::Backend<Block>>::State,
1563 Block,
1564 >,
1565 > {
1566 unimplemented!()
1567 }
1568
1569 fn child_storage(
1570 &self,
1571 _hash: Hash,
1572 _child_info: &soil_client::client_api::ChildInfo,
1573 _key: &soil_client::client_api::StorageKey,
1574 ) -> soil_client::blockchain::Result<Option<soil_client::client_api::StorageData>> {
1575 unimplemented!()
1576 }
1577
1578 fn child_storage_keys(
1579 &self,
1580 _hash: Hash,
1581 _child_info: soil_client::client_api::ChildInfo,
1582 _prefix: Option<&soil_client::client_api::StorageKey>,
1583 _start_key: Option<&soil_client::client_api::StorageKey>,
1584 ) -> soil_client::blockchain::Result<
1585 soil_client::client_api::backend::KeysIter<
1586 <TestBackend as soil_client::client_api::Backend<Block>>::State,
1587 Block,
1588 >,
1589 > {
1590 unimplemented!()
1591 }
1592
1593 fn child_storage_hash(
1594 &self,
1595 _hash: Hash,
1596 _child_info: &soil_client::client_api::ChildInfo,
1597 _key: &soil_client::client_api::StorageKey,
1598 ) -> soil_client::blockchain::Result<Option<Hash>> {
1599 unimplemented!()
1600 }
1601
1602 fn closest_merkle_value(
1603 &self,
1604 _hash: Hash,
1605 _key: &soil_client::client_api::StorageKey,
1606 ) -> soil_client::blockchain::Result<Option<soil_client::client_api::MerkleValue<Hash>>> {
1607 unimplemented!()
1608 }
1609
1610 fn child_closest_merkle_value(
1611 &self,
1612 _hash: Hash,
1613 _child_info: &soil_client::client_api::ChildInfo,
1614 _key: &soil_client::client_api::StorageKey,
1615 ) -> soil_client::blockchain::Result<Option<soil_client::client_api::MerkleValue<Hash>>> {
1616 unimplemented!()
1617 }
1618 }
1619
1620 impl soil_client::blockchain::HeaderBackend<Block> for TestClient {
1621 fn header(&self, _hash: Hash) -> soil_client::blockchain::Result<Option<Header>> {
1622 unimplemented!()
1623 }
1624 fn info(&self) -> soil_client::blockchain::Info<Block> {
1625 soil_client::blockchain::Info {
1626 best_hash: CORRECT_BLOCK_HASH.into(),
1627 best_number: 0,
1628 genesis_hash: Default::default(),
1629 finalized_hash: CORRECT_BLOCK_HASH.into(),
1630 finalized_number: 1,
1631 finalized_state: None,
1632 number_leaves: 0,
1633 block_gap: None,
1634 }
1635 }
1636 fn status(
1637 &self,
1638 _hash: Hash,
1639 ) -> soil_client::blockchain::Result<soil_client::blockchain::BlockStatus> {
1640 unimplemented!()
1641 }
1642 fn number(&self, _hash: Hash) -> soil_client::blockchain::Result<Option<BlockNumber>> {
1643 unimplemented!()
1644 }
1645 fn hash(&self, _number: BlockNumber) -> soil_client::blockchain::Result<Option<Hash>> {
1646 unimplemented!()
1647 }
1648 }
1649
1650 fn test_store() -> (Store, tempfile::TempDir) {
1651 subsoil::tracing::init_for_tests();
1652 let temp_dir = tempfile::Builder::new().tempdir().expect("Error creating test dir");
1653
1654 let client = std::sync::Arc::new(TestClient);
1655 let mut path: std::path::PathBuf = temp_dir.path().into();
1656 path.push("db");
1657 let keystore = std::sync::Arc::new(soil_client::keystore::LocalKeystore::in_memory());
1658 let store = Store::new::<Block, TestClient, TestBackend>(
1659 &path,
1660 Default::default(),
1661 client,
1662 keystore,
1663 None,
1664 Box::new(subsoil::core::testing::TaskExecutor::new()),
1665 )
1666 .unwrap();
1667 (store, temp_dir) }
1669
1670 pub fn signed_statement(data: u8) -> Statement {
1671 signed_statement_with_topics(data, &[], None)
1672 }
1673
1674 fn signed_statement_with_topics(
1675 data: u8,
1676 topics: &[Topic],
1677 dec_key: Option<DecryptionKey>,
1678 ) -> Statement {
1679 let mut statement = Statement::new();
1680 statement.set_plain_data(vec![data]);
1681 statement.set_expiry(u64::MAX);
1682
1683 for i in 0..topics.len() {
1684 statement.set_topic(i, topics[i]);
1685 }
1686 if let Some(key) = dec_key {
1687 statement.set_decryption_key(key);
1688 }
1689 let kp = subsoil::core::ed25519::Pair::from_string("//Alice", None).unwrap();
1690 statement.sign_ed25519_private(&kp);
1691 statement
1692 }
1693
1694 fn topic(data: u64) -> Topic {
1695 let mut bytes = [0u8; 32];
1696 bytes[0..8].copy_from_slice(&data.to_le_bytes());
1697 Topic::from(bytes)
1698 }
1699
1700 fn dec_key(data: u64) -> DecryptionKey {
1701 let mut dec_key: DecryptionKey = Default::default();
1702 dec_key[0..8].copy_from_slice(&data.to_le_bytes());
1703 dec_key
1704 }
1705
1706 fn account(id: u64) -> AccountId {
1707 let mut account: AccountId = Default::default();
1708 account[0..8].copy_from_slice(&id.to_le_bytes());
1709 account
1710 }
1711
1712 fn channel(id: u64) -> Channel {
1713 let mut channel: Channel = Default::default();
1714 channel[0..8].copy_from_slice(&id.to_le_bytes());
1715 channel
1716 }
1717
1718 fn statement(account_id: u64, priority: u32, c: Option<u64>, data_len: usize) -> Statement {
1719 let mut statement = Statement::new();
1720 let mut data = Vec::new();
1721 data.resize(data_len, 0);
1722 statement.set_plain_data(data);
1723 statement.set_expiry_from_parts(u32::MAX, priority);
1724 if let Some(c) = c {
1725 statement.set_channel(channel(c));
1726 }
1727 statement.set_proof(Proof::OnChain {
1728 block_hash: CORRECT_BLOCK_HASH,
1729 who: account(account_id),
1730 event_index: 0,
1731 });
1732 statement
1733 }
1734
1735 #[test]
1736 fn submit_one() {
1737 let (store, _temp) = test_store();
1738 let statement0 = signed_statement(0);
1739 assert_eq!(store.submit(statement0, StatementSource::Network), SubmitResult::New);
1740 let unsigned = statement(1, 1, None, 0);
1741 assert_eq!(store.submit(unsigned, StatementSource::Network), SubmitResult::New);
1742 }
1743
1744 #[test]
1745 fn save_and_load_statements() {
1746 let (store, temp) = test_store();
1747 let statement0 = signed_statement(0);
1748 let statement1 = signed_statement(1);
1749 let statement2 = signed_statement(2);
1750 assert_eq!(store.submit(statement0.clone(), StatementSource::Network), SubmitResult::New);
1751 assert_eq!(store.submit(statement1.clone(), StatementSource::Network), SubmitResult::New);
1752 assert_eq!(store.submit(statement2.clone(), StatementSource::Network), SubmitResult::New);
1753 assert_eq!(store.statements().unwrap().len(), 3);
1754 assert_eq!(store.broadcasts(&[]).unwrap().len(), 3);
1755 assert_eq!(store.statement(&statement1.hash()).unwrap(), Some(statement1.clone()));
1756 let keystore = store.keystore.clone();
1757 drop(store);
1758
1759 let client = std::sync::Arc::new(TestClient);
1760 let mut path: std::path::PathBuf = temp.path().into();
1761 path.push("db");
1762 let store = Store::new::<Block, TestClient, TestBackend>(
1763 &path,
1764 Default::default(),
1765 client,
1766 keystore,
1767 None,
1768 Box::new(subsoil::core::testing::TaskExecutor::new()),
1769 )
1770 .unwrap();
1771 assert_eq!(store.statements().unwrap().len(), 3);
1772 assert_eq!(store.broadcasts(&[]).unwrap().len(), 3);
1773 assert_eq!(store.statement(&statement1.hash()).unwrap(), Some(statement1));
1774 }
1775
1776 #[test]
1777 fn take_recent_statements_clears_index() {
1778 let (store, _temp) = test_store();
1779 let statement0 = signed_statement(0);
1780 let statement1 = signed_statement(1);
1781 let statement2 = signed_statement(2);
1782 let statement3 = signed_statement(3);
1783
1784 let _ = store.submit(statement0.clone(), StatementSource::Local);
1785 let _ = store.submit(statement1.clone(), StatementSource::Local);
1786 let _ = store.submit(statement2.clone(), StatementSource::Local);
1787
1788 let recent1 = store.take_recent_statements().unwrap();
1789 let (recent1_hashes, recent1_statements): (Vec<_>, Vec<_>) = recent1.into_iter().unzip();
1790 let expected1 = vec![statement0, statement1, statement2];
1791 assert!(expected1.iter().all(|s| recent1_hashes.contains(&s.hash())));
1792 assert!(expected1.iter().all(|s| recent1_statements.contains(s)));
1793
1794 let recent2 = store.take_recent_statements().unwrap();
1796 assert_eq!(recent2.len(), 0);
1797
1798 store.submit(statement3.clone(), StatementSource::Network);
1799
1800 let recent3 = store.take_recent_statements().unwrap();
1801 let (recent3_hashes, recent3_statements): (Vec<_>, Vec<_>) = recent3.into_iter().unzip();
1802 let expected3 = vec![statement3];
1803 assert!(expected3.iter().all(|s| recent3_hashes.contains(&s.hash())));
1804 assert!(expected3.iter().all(|s| recent3_statements.contains(s)));
1805
1806 assert_eq!(store.statements().unwrap().len(), 4);
1808 }
1809
1810 #[test]
1811 fn search_by_topic_and_key() {
1812 let (store, _temp) = test_store();
1813 let statement0 = signed_statement(0);
1814 let statement1 = signed_statement_with_topics(1, &[topic(0)], None);
1815 let statement2 = signed_statement_with_topics(2, &[topic(0), topic(1)], Some(dec_key(2)));
1816 let statement3 = signed_statement_with_topics(3, &[topic(0), topic(1), topic(2)], None);
1817 let statement4 =
1818 signed_statement_with_topics(4, &[topic(0), topic(42), topic(2), topic(3)], None);
1819 let statements = vec![statement0, statement1, statement2, statement3, statement4];
1820 for s in &statements {
1821 store.submit(s.clone(), StatementSource::Network);
1822 }
1823
1824 let assert_topics = |topics: &[u64], key: Option<u64>, expected: &[u8]| {
1825 let key = key.map(dec_key);
1826 let topics: Vec<_> = topics.iter().map(|t| topic(*t)).collect();
1827 let mut got_vals: Vec<_> = if let Some(key) = key {
1828 store.posted(&topics, key).unwrap().into_iter().map(|d| d[0]).collect()
1829 } else {
1830 store.broadcasts(&topics).unwrap().into_iter().map(|d| d[0]).collect()
1831 };
1832 got_vals.sort();
1833 assert_eq!(expected.to_vec(), got_vals);
1834 };
1835
1836 assert_topics(&[], None, &[0, 1, 3, 4]);
1837 assert_topics(&[], Some(2), &[2]);
1838 assert_topics(&[0], None, &[1, 3, 4]);
1839 assert_topics(&[1], None, &[3]);
1840 assert_topics(&[2], None, &[3, 4]);
1841 assert_topics(&[3], None, &[4]);
1842 assert_topics(&[42], None, &[4]);
1843
1844 assert_topics(&[0, 1], None, &[3]);
1845 assert_topics(&[0, 1], Some(2), &[2]);
1846 assert_topics(&[0, 1, 99], Some(2), &[]);
1847 assert_topics(&[1, 2], None, &[3]);
1848 assert_topics(&[99], None, &[]);
1849 assert_topics(&[0, 99], None, &[]);
1850 assert_topics(&[0, 1, 2, 3, 42], None, &[]);
1851 }
1852
1853 #[test]
1854 fn constraints() {
1855 let (store, _temp) = test_store();
1856
1857 store.index.write().options.max_total_size = 3000;
1858 let source = StatementSource::Network;
1859 let ok = SubmitResult::New;
1860
1861 assert!(matches!(
1865 store.submit(statement(1, 1, Some(1), 2000), source),
1866 SubmitResult::Rejected(_)
1867 ));
1868 assert_eq!(store.submit(statement(1, 1, Some(1), 500), source), ok);
1869 assert!(matches!(
1871 store.submit(statement(1, 1, Some(1), 200), source),
1872 SubmitResult::Rejected(_)
1873 ));
1874 assert_eq!(store.submit(statement(1, 2, Some(1), 600), source), ok);
1875 assert!(matches!(
1878 store.submit(statement(1, 1, Some(2), 100), source),
1879 SubmitResult::Rejected(_)
1880 ));
1881 assert_eq!(store.index.read().expired.len(), 1);
1882
1883 assert_eq!(store.submit(statement(2, 1, None, 500), source), ok);
1886 assert_eq!(store.submit(statement(2, 2, None, 100), source), ok);
1887 assert_eq!(store.submit(statement(2, 3, None, 500), source), ok);
1889 assert_eq!(store.index.read().expired.len(), 2);
1890 assert_eq!(store.submit(statement(2, 4, None, 1000), source), ok);
1892 assert_eq!(store.index.read().expired.len(), 4);
1893
1894 assert_eq!(store.submit(statement(3, 2, Some(1), 300), source), ok);
1897 assert_eq!(store.submit(statement(3, 3, Some(2), 300), source), ok);
1898 assert_eq!(store.submit(statement(3, 4, Some(3), 300), source), ok);
1899 assert_eq!(store.submit(statement(3, 5, None, 500), source), ok);
1901 assert_eq!(store.index.read().expired.len(), 6);
1902
1903 assert_eq!(store.index.read().total_size, 2400);
1904 assert_eq!(store.index.read().entries.len(), 4);
1905
1906 assert!(matches!(
1908 store.submit(statement(1, 1, None, 700), source),
1909 SubmitResult::Rejected(_)
1910 ));
1911 store.index.write().options.max_total_statements = 4;
1913 assert!(matches!(
1914 store.submit(statement(1, 1, None, 100), source),
1915 SubmitResult::Rejected(_)
1916 ));
1917
1918 let mut expected_statements = vec![
1919 statement(1, 2, Some(1), 600).hash(),
1920 statement(2, 4, None, 1000).hash(),
1921 statement(3, 4, Some(3), 300).hash(),
1922 statement(3, 5, None, 500).hash(),
1923 ];
1924 expected_statements.sort();
1925 let mut statements: Vec<_> =
1926 store.statements().unwrap().into_iter().map(|(hash, _)| hash).collect();
1927 statements.sort();
1928 assert_eq!(expected_statements, statements);
1929 }
1930
1931 #[test]
1932 fn max_statement_size_for_gossiping() {
1933 let (store, _temp) = test_store();
1934 store.index.write().options.max_total_size = 42 * MAX_STATEMENT_SIZE;
1935
1936 assert_eq!(
1937 store.submit(
1938 statement(42, 1, Some(1), MAX_STATEMENT_SIZE - 500),
1939 StatementSource::Local
1940 ),
1941 SubmitResult::New
1942 );
1943
1944 assert!(matches!(
1945 store.submit(statement(42, 2, Some(1), 2 * MAX_STATEMENT_SIZE), StatementSource::Local),
1946 SubmitResult::Invalid(_)
1947 ));
1948 }
1949
1950 #[test]
1951 fn expired_statements_are_purged() {
1952 use super::DEFAULT_PURGE_AFTER_SEC;
1953 let (mut store, temp) = test_store();
1954 let mut statement = statement(1, 1, Some(3), 100);
1955 store.set_time(0);
1956 statement.set_topic(0, topic(4));
1957 store.submit(statement.clone(), StatementSource::Network);
1958 assert_eq!(store.index.read().entries.len(), 1);
1959 store.remove(&statement.hash()).unwrap();
1960 assert_eq!(store.index.read().entries.len(), 0);
1961 assert_eq!(store.index.read().accounts.len(), 0);
1962 store.set_time(DEFAULT_PURGE_AFTER_SEC + 1);
1963 store.maintain();
1964 assert_eq!(store.index.read().expired.len(), 0);
1965 let keystore = store.keystore.clone();
1966 drop(store);
1967
1968 let client = std::sync::Arc::new(TestClient);
1969 let mut path: std::path::PathBuf = temp.path().into();
1970 path.push("db");
1971 let store = Store::new::<Block, TestClient, TestBackend>(
1972 &path,
1973 Default::default(),
1974 client,
1975 keystore,
1976 None,
1977 Box::new(subsoil::core::testing::TaskExecutor::new()),
1978 )
1979 .unwrap();
1980 assert_eq!(store.statements().unwrap().len(), 0);
1981 assert_eq!(store.index.read().expired.len(), 0);
1982 }
1983
1984 #[test]
1985 fn posted_clear_decrypts() {
1986 let (store, _temp) = test_store();
1987 let public = store
1988 .keystore
1989 .ed25519_generate_new(subsoil::core::crypto::key_types::STATEMENT, None)
1990 .unwrap();
1991 let statement1 = statement(1, 1, None, 100);
1992 let mut statement2 = statement(1, 2, None, 0);
1993 let plain = b"The most valuable secret".to_vec();
1994 statement2.encrypt(&plain, &public).unwrap();
1995 store.submit(statement1, StatementSource::Network);
1996 store.submit(statement2, StatementSource::Network);
1997 let posted_clear = store.posted_clear(&[], public.into()).unwrap();
1998 assert_eq!(posted_clear, vec![plain]);
1999 }
2000
2001 #[test]
2002 fn broadcasts_stmt_returns_encoded_statements() {
2003 let (store, _tmp) = test_store();
2004
2005 let s0 = signed_statement_with_topics(0, &[], None);
2007 let s1 = signed_statement_with_topics(1, &[topic(42)], None);
2009 let s2 = signed_statement_with_topics(2, &[topic(42)], Some(dec_key(99)));
2011
2012 for s in [&s0, &s1, &s2] {
2013 store.submit(s.clone(), StatementSource::Network);
2014 }
2015
2016 let mut hashes: Vec<_> = store
2018 .broadcasts_stmt(&[])
2019 .unwrap()
2020 .into_iter()
2021 .map(|bytes| Statement::decode(&mut &bytes[..]).unwrap().hash())
2022 .collect();
2023 hashes.sort();
2024 let expected_hashes = {
2025 let mut e = vec![s0.hash(), s1.hash()];
2026 e.sort();
2027 e
2028 };
2029 assert_eq!(hashes, expected_hashes);
2030
2031 let got = store.broadcasts_stmt(&[topic(42)]).unwrap();
2033 assert_eq!(got.len(), 1);
2034 let st = Statement::decode(&mut &got[0][..]).unwrap();
2035 assert_eq!(st.hash(), s1.hash());
2036 }
2037
2038 #[test]
2039 fn posted_stmt_returns_encoded_statements_for_dest() {
2040 let (store, _tmp) = test_store();
2041
2042 let public1 = store
2043 .keystore
2044 .ed25519_generate_new(subsoil::core::crypto::key_types::STATEMENT, None)
2045 .unwrap();
2046 let dest: [u8; 32] = public1.into();
2047
2048 let public2 = store
2049 .keystore
2050 .ed25519_generate_new(subsoil::core::crypto::key_types::STATEMENT, None)
2051 .unwrap();
2052
2053 let mut s_with_key = statement(1, 1, None, 0);
2055 let plain1 = b"The most valuable secret".to_vec();
2056 s_with_key.encrypt(&plain1, &public1).unwrap();
2057
2058 let mut s_other_key = statement(2, 2, None, 0);
2060 let plain2 = b"The second most valuable secret".to_vec();
2061 s_other_key.encrypt(&plain2, &public2).unwrap();
2062
2063 for s in [&s_with_key, &s_other_key] {
2065 store.submit(s.clone(), StatementSource::Network);
2066 }
2067
2068 let retrieved = store.posted_stmt(&[], dest).unwrap();
2070 assert_eq!(retrieved.len(), 1, "Only one statement has dec_key=dest");
2071
2072 let returned_stmt = Statement::decode(&mut &retrieved[0][..]).unwrap();
2074 assert_eq!(
2075 returned_stmt.hash(),
2076 s_with_key.hash(),
2077 "Returned statement must match s_with_key"
2078 );
2079 }
2080
2081 #[test]
2082 fn posted_clear_stmt_returns_statement_followed_by_plain_data() {
2083 let (store, _tmp) = test_store();
2084
2085 let public1 = store
2086 .keystore
2087 .ed25519_generate_new(subsoil::core::crypto::key_types::STATEMENT, None)
2088 .unwrap();
2089 let dest: [u8; 32] = public1.into();
2090
2091 let public2 = store
2092 .keystore
2093 .ed25519_generate_new(subsoil::core::crypto::key_types::STATEMENT, None)
2094 .unwrap();
2095
2096 let mut s_with_key = statement(1, 1, None, 0);
2098 let plain1 = b"The most valuable secret".to_vec();
2099 s_with_key.encrypt(&plain1, &public1).unwrap();
2100
2101 let mut s_other_key = statement(2, 2, None, 0);
2103 let plain2 = b"The second most valuable secret".to_vec();
2104 s_other_key.encrypt(&plain2, &public2).unwrap();
2105
2106 for s in [&s_with_key, &s_other_key] {
2108 store.submit(s.clone(), StatementSource::Network);
2109 }
2110
2111 let retrieved = store.posted_clear_stmt(&[], dest).unwrap();
2113 assert_eq!(retrieved.len(), 1, "Only one statement has dec_key=dest");
2114
2115 let encoded_stmt = s_with_key.encode();
2117 let stmt_len = encoded_stmt.len();
2118
2119 assert_eq!(&retrieved[0][..stmt_len], &encoded_stmt[..]);
2121
2122 let trailing = &retrieved[0][stmt_len..];
2124 assert_eq!(trailing, &plain1[..]);
2125 }
2126
2127 #[test]
2128 fn posted_clear_returns_plain_data_for_dest_and_topics() {
2129 let (store, _tmp) = test_store();
2130
2131 let public_dest = store
2133 .keystore
2134 .ed25519_generate_new(subsoil::core::crypto::key_types::STATEMENT, None)
2135 .unwrap();
2136 let dest: [u8; 32] = public_dest.into();
2137
2138 let public_other = store
2139 .keystore
2140 .ed25519_generate_new(subsoil::core::crypto::key_types::STATEMENT, None)
2141 .unwrap();
2142
2143 let mut s_good = statement(1, 1, None, 0);
2145 let plaintext_good = b"The most valuable secret".to_vec();
2146 s_good.encrypt(&plaintext_good, &public_dest).unwrap();
2147 s_good.set_topic(0, topic(42));
2148
2149 let mut s_wrong_topic = statement(2, 2, None, 0);
2151 s_wrong_topic.encrypt(b"Wrong topic", &public_dest).unwrap();
2152 s_wrong_topic.set_topic(0, topic(99));
2153
2154 let mut s_other_dest = statement(3, 3, None, 0);
2156 s_other_dest.encrypt(b"Other dest", &public_other).unwrap();
2157 s_other_dest.set_topic(0, topic(42));
2158
2159 for s in [&s_good, &s_wrong_topic, &s_other_dest] {
2161 store.submit(s.clone(), StatementSource::Network);
2162 }
2163
2164 let retrieved = store.posted_clear(&[topic(42)], dest).unwrap();
2166
2167 assert_eq!(retrieved, vec![plaintext_good]);
2169 }
2170
2171 #[test]
2172 fn already_expired_statement_is_rejected() {
2173 let (mut store, _temp) = test_store();
2174
2175 store.set_time(1000);
2177
2178 let mut expired_statement = statement(1, 1, None, 100);
2181 expired_statement.set_expiry_from_parts(500, 1);
2183
2184 assert_eq!(
2186 store.submit(expired_statement, StatementSource::Network),
2187 SubmitResult::Invalid(InvalidReason::AlreadyExpired)
2188 );
2189
2190 assert_eq!(store.statements().unwrap().len(), 0);
2192
2193 let mut valid_statement = statement(1, 1, None, 100);
2196 valid_statement.set_expiry_from_parts(2000, 1);
2197
2198 assert_eq!(store.submit(valid_statement, StatementSource::Network), SubmitResult::New);
2200 assert_eq!(store.statements().unwrap().len(), 1);
2201 }
2202
2203 #[test]
2204 fn remove_by_covers_various_situations() {
2205 use soil_statement_store::{StatementSource, StatementStore, SubmitResult};
2206
2207 let (mut store, _temp) = test_store();
2209 store.set_time(0);
2210
2211 let t42 = topic(42);
2213 let k7 = dec_key(7);
2214
2215 let mut s_a1 = statement(4, 10, Some(100), 100);
2218 s_a1.set_topic(0, t42);
2219 let h_a1 = s_a1.hash();
2220
2221 let mut s_a2 = statement(4, 20, Some(200), 150);
2222 s_a2.set_decryption_key(k7);
2223 let h_a2 = s_a2.hash();
2224
2225 let s_a3 = statement(4, 30, None, 50);
2226 let h_a3 = s_a3.hash();
2227
2228 let s_b1 = statement(3, 10, None, 100);
2230 let h_b1 = s_b1.hash();
2231
2232 let mut s_b2 = statement(3, 15, Some(300), 100);
2233 s_b2.set_topic(0, t42);
2234 s_b2.set_decryption_key(k7);
2235 let h_b2 = s_b2.hash();
2236
2237 for s in [&s_a1, &s_a2, &s_a3, &s_b1, &s_b2] {
2239 assert_eq!(store.submit(s.clone(), StatementSource::Network), SubmitResult::New);
2240 }
2241
2242 {
2244 let idx = store.index.read();
2245 assert_eq!(idx.entries.len(), 5, "all 5 should be present");
2246 assert!(idx.accounts.contains_key(&account(4)));
2247 assert!(idx.accounts.contains_key(&account(3)));
2248 assert_eq!(idx.total_size, 100 + 150 + 50 + 100 + 100);
2249
2250 let set_t = idx.by_topic.get(&t42).expect("topic set exists");
2252 assert!(set_t.contains(&h_a1) && set_t.contains(&h_b2));
2253
2254 let set_k = idx.by_dec_key.get(&Some(k7)).expect("key set exists");
2255 assert!(set_k.contains(&h_a2) && set_k.contains(&h_b2));
2256 }
2257
2258 store.remove_by(account(4)).expect("remove_by should succeed");
2260
2261 {
2263 for h in [h_a1, h_a2, h_a3] {
2265 assert!(store.statement(&h).unwrap().is_none(), "A's statement should be removed");
2266 }
2267
2268 for h in [h_b1, h_b2] {
2270 assert!(store.statement(&h).unwrap().is_some(), "B's statement should remain");
2271 }
2272
2273 let idx = store.index.read();
2274
2275 assert!(!idx.accounts.contains_key(&account(4)), "Account A must be gone");
2277 assert!(idx.accounts.contains_key(&account(3)), "Account B must remain");
2278
2279 assert!(idx.expired.contains_key(&h_a1));
2281 assert!(idx.expired.contains_key(&h_a2));
2282 assert!(idx.expired.contains_key(&h_a3));
2283 assert_eq!(idx.expired.len(), 3);
2284
2285 assert_eq!(idx.entries.len(), 2);
2287 assert_eq!(idx.total_size, 100 + 100);
2288
2289 let set_t = idx.by_topic.get(&t42).expect("topic set exists");
2291 assert!(set_t.contains(&h_b2));
2292 assert!(!set_t.contains(&h_a1));
2293
2294 let set_k = idx.by_dec_key.get(&Some(k7)).expect("key set exists");
2296 assert!(set_k.contains(&h_b2));
2297 assert!(!set_k.contains(&h_a2));
2298 }
2299
2300 store.remove_by(account(4)).expect("second remove_by should be a no-op");
2302
2303 let purge_after = store.index.read().options.purge_after_sec;
2305 store.set_time(purge_after + 1);
2306 store.maintain();
2307 assert_eq!(store.index.read().expired.len(), 0, "expired entries should be purged");
2308
2309 let s_new = statement(4, 40, None, 10);
2311 assert_eq!(store.submit(s_new, StatementSource::Network), SubmitResult::New);
2312 }
2313
2314 #[test]
2315 fn check_expiration_repopulates_account_list_when_empty() {
2316 let (mut store, _temp) = test_store();
2317 store.set_time(1000);
2318
2319 let s1 = statement(1, 1, None, 100);
2323 let s2 = statement(2, 1, None, 100);
2324 let s3 = statement(3, 1, None, 100);
2325
2326 for s in [&s1, &s2, &s3] {
2327 store.submit(s.clone(), StatementSource::Network);
2328 }
2329
2330 assert!(store.index.read().accounts_to_check_for_expiry_stmts.is_empty());
2332
2333 store.enforce_limits();
2335
2336 let accounts = store.index.read().accounts_to_check_for_expiry_stmts.clone();
2338 assert_eq!(accounts.len(), 3, "Should have 3 accounts to check");
2339 assert!(accounts.contains(&account(1)));
2340 assert!(accounts.contains(&account(2)));
2341 assert!(accounts.contains(&account(3)));
2342
2343 assert_eq!(store.index.read().expired.len(), 0);
2345 assert_eq!(store.index.read().entries.len(), 3);
2346 }
2347
2348 #[test]
2349 fn check_expiration_expires_statements_past_current_time() {
2350 let (mut store, _temp) = test_store();
2351
2352 store.set_time(100);
2357
2358 let mut expired_stmt = statement(1, 1, None, 100);
2360 expired_stmt.set_expiry_from_parts(500, 1);
2361 let expired_hash = expired_stmt.hash();
2362 store.submit(expired_stmt, StatementSource::Network);
2363
2364 let valid_stmt = statement(2, 1, None, 100); let valid_hash = valid_stmt.hash();
2367 store.submit(valid_stmt, StatementSource::Network);
2368
2369 assert_eq!(store.index.read().entries.len(), 2);
2371
2372 store.enforce_limits();
2374 assert!(!store.index.read().accounts_to_check_for_expiry_stmts.is_empty());
2375
2376 store.set_time(1000);
2378
2379 store.enforce_limits();
2381
2382 let index = store.index.read();
2384 assert!(index.expired.contains_key(&expired_hash), "Expired statement should be marked");
2385 assert!(
2386 !index.entries.contains_key(&expired_hash),
2387 "Expired statement should be removed from entries"
2388 );
2389
2390 assert!(
2392 index.entries.contains_key(&valid_hash),
2393 "Valid statement should still be in entries"
2394 );
2395 assert!(!index.expired.contains_key(&valid_hash), "Valid statement should not be expired");
2396 }
2397
2398 #[test]
2399 fn check_expiration_removes_checked_accounts_from_list_when_expiring() {
2400 let (mut store, _temp) = test_store();
2401 store.set_time(100);
2402
2403 let mut stmt1 = statement(1, 1, None, 100);
2405 stmt1.set_expiry_from_parts(200, 1);
2406 store.submit(stmt1, StatementSource::Network);
2407
2408 let mut stmt2 = statement(2, 1, None, 100);
2409 stmt2.set_expiry_from_parts(200, 1);
2410 store.submit(stmt2, StatementSource::Network);
2411
2412 let mut stmt3 = statement(3, 1, None, 100);
2413 stmt3.set_expiry_from_parts(200, 1);
2414 store.submit(stmt3, StatementSource::Network);
2415
2416 store.enforce_limits();
2418 assert_eq!(
2419 store.index.read().accounts_to_check_for_expiry_stmts.len(),
2420 3,
2421 "Should have 3 accounts to check"
2422 );
2423
2424 store.set_time(300);
2426
2427 store.enforce_limits();
2429
2430 assert!(
2432 store.index.read().accounts_to_check_for_expiry_stmts.is_empty(),
2433 "All accounts should have been checked and removed after expiration"
2434 );
2435
2436 assert_eq!(store.index.read().expired.len(), 3);
2438 assert_eq!(store.index.read().entries.len(), 0);
2439 }
2440
2441 #[test]
2442 fn check_expiration_truncates_list_even_when_nothing_expires() {
2443 let (mut store, _temp) = test_store();
2444 store.set_time(1000);
2445
2446 for acc_id in 1..=5u64 {
2450 let stmt = statement(acc_id, 1, None, 100);
2451 store.submit(stmt, StatementSource::Network);
2452 }
2453
2454 store.enforce_limits();
2456 assert_eq!(store.index.read().accounts_to_check_for_expiry_stmts.len(), 5);
2457
2458 store.enforce_limits();
2460
2461 assert!(
2463 store.index.read().accounts_to_check_for_expiry_stmts.is_empty(),
2464 "List should be empty after all accounts have been checked"
2465 );
2466
2467 assert_eq!(store.index.read().expired.len(), 0);
2469 assert_eq!(store.index.read().entries.len(), 5);
2470 }
2471
2472 #[test]
2473 fn check_expiration_handles_multiple_statements_per_account() {
2474 let (mut store, _temp) = test_store();
2475 store.set_time(100);
2476
2477 let mut stmt1 = statement(42, 1, Some(1), 100);
2480 stmt1.set_expiry_from_parts(200, 1); let hash1 = stmt1.hash();
2482 store.submit(stmt1, StatementSource::Network);
2483
2484 let mut stmt2 = statement(42, 2, Some(2), 100);
2485 stmt2.set_expiry_from_parts(300, 2); let hash2 = stmt2.hash();
2487 store.submit(stmt2, StatementSource::Network);
2488
2489 let mut stmt3 = statement(42, 3, Some(3), 100);
2490 stmt3.set_expiry_from_parts(500, 3); let hash3 = stmt3.hash();
2492 store.submit(stmt3, StatementSource::Network);
2493
2494 assert_eq!(store.index.read().entries.len(), 3);
2496
2497 store.enforce_limits();
2499
2500 store.set_time(250);
2502 store.enforce_limits();
2503
2504 {
2505 let index = store.index.read();
2506 assert!(index.expired.contains_key(&hash1), "stmt1 should be expired");
2507 assert!(!index.expired.contains_key(&hash2), "stmt2 should not be expired yet");
2508 assert!(!index.expired.contains_key(&hash3), "stmt3 should not be expired yet");
2509 assert_eq!(index.entries.len(), 2);
2510 }
2511
2512 store.enforce_limits();
2514
2515 store.set_time(400);
2517 store.enforce_limits();
2518
2519 {
2520 let index = store.index.read();
2521 assert!(index.expired.contains_key(&hash1));
2522 assert!(index.expired.contains_key(&hash2), "stmt2 should be expired");
2523 assert!(!index.expired.contains_key(&hash3), "stmt3 should not be expired yet");
2524 assert_eq!(index.entries.len(), 1);
2525 }
2526
2527 store.enforce_limits();
2529 store.set_time(600);
2530 store.enforce_limits();
2531
2532 {
2533 let index = store.index.read();
2534 assert!(index.expired.contains_key(&hash1));
2535 assert!(index.expired.contains_key(&hash2));
2536 assert!(index.expired.contains_key(&hash3), "stmt3 should be expired");
2537 assert_eq!(index.entries.len(), 0);
2538 }
2539 }
2540
2541 #[test]
2542 fn check_expiration_does_nothing_when_no_expired_statements() {
2543 let (mut store, _temp) = test_store();
2544 store.set_time(1000);
2545
2546 let stmt = statement(1, 1, None, 100);
2549 let hash = stmt.hash();
2550 store.submit(stmt, StatementSource::Network);
2551
2552 store.enforce_limits();
2554
2555 store.enforce_limits();
2557
2558 let index = store.index.read();
2560 assert!(index.entries.contains_key(&hash));
2561 assert!(!index.expired.contains_key(&hash));
2562 assert_eq!(index.entries.len(), 1);
2563 assert_eq!(index.expired.len(), 0);
2564 }
2565
2566 #[test]
2567 fn check_expiration_correctly_updates_account_data() {
2568 let (mut store, _temp) = test_store();
2569 store.set_time(100);
2570
2571 let mut stmt = statement(1, 1, Some(1), 100);
2573 stmt.set_expiry_from_parts(200, 1);
2574 let hash = stmt.hash();
2575 store.submit(stmt, StatementSource::Network);
2576
2577 {
2579 let index = store.index.read();
2580 assert!(index.accounts.contains_key(&account(1)));
2581 assert_eq!(index.total_size, 100);
2582 }
2583
2584 store.enforce_limits();
2586 store.set_time(300);
2587 store.enforce_limits();
2588
2589 {
2591 let index = store.index.read();
2592 assert!(
2593 !index.accounts.contains_key(&account(1)),
2594 "Account should be removed when all its statements expire"
2595 );
2596 assert_eq!(index.total_size, 0, "Total size should be zero");
2597 assert!(index.expired.contains_key(&hash));
2598 }
2599 }
2600
2601 #[test]
2602 fn check_expiration_clears_topic_and_key_indexes() {
2603 let (mut store, _temp) = test_store();
2604 store.set_time(100);
2605
2606 let mut stmt = statement(1, 1, Some(1), 100);
2608 stmt.set_expiry_from_parts(200, 1);
2609 stmt.set_topic(0, topic(42));
2610 stmt.set_decryption_key(dec_key(7));
2611 let hash = stmt.hash();
2612 store.submit(stmt, StatementSource::Network);
2613
2614 {
2616 let index = store.index.read();
2617 assert!(index.by_topic.get(&topic(42)).map_or(false, |s| s.contains(&hash)));
2618 assert!(index.by_dec_key.get(&Some(dec_key(7))).map_or(false, |s| s.contains(&hash)));
2619 }
2620
2621 store.enforce_limits();
2623 store.set_time(300);
2624 store.enforce_limits();
2625
2626 {
2628 let index = store.index.read();
2629 assert!(
2631 index.by_topic.get(&topic(42)).map_or(true, |s| s.is_empty()),
2632 "Topic index should be cleared"
2633 );
2634 assert!(
2636 index.by_dec_key.get(&Some(dec_key(7))).map_or(true, |s| s.is_empty()),
2637 "Decryption key index should be cleared"
2638 );
2639 assert!(index.expired.contains_key(&hash));
2640 }
2641 }
2642
2643 #[test]
2644 fn check_expiration_handles_empty_store() {
2645 let (mut store, _temp) = test_store();
2646 store.set_time(1000);
2647
2648 store.enforce_limits();
2650
2651 store.enforce_limits();
2653
2654 assert!(store.index.read().accounts_to_check_for_expiry_stmts.is_empty());
2655 assert_eq!(store.index.read().entries.len(), 0);
2656 assert_eq!(store.index.read().expired.len(), 0);
2657 }
2658
2659 #[test]
2660 fn check_expiration_expires_properly_formatted_statements() {
2661 let (mut store, _temp) = test_store();
2665 store.set_time(1000);
2666
2667 let mut stmt = statement(1, 1, None, 100);
2669 stmt.set_expiry_from_parts(1001, 1); let hash = stmt.hash();
2671 store.submit(stmt, StatementSource::Network);
2672
2673 assert_eq!(store.index.read().entries.len(), 1);
2674
2675 store.enforce_limits();
2677
2678 store.set_time(2000);
2680 store.enforce_limits();
2681
2682 let index = store.index.read();
2686 assert!(
2687 !index.entries.contains_key(&hash),
2688 "Statement should be removed from entries after expiration"
2689 );
2690 assert!(index.expired.contains_key(&hash), "Statement should be in expired list");
2691 }
2692
2693 #[test]
2694 fn check_expiration_updates_database_columns() {
2695 let (mut store, _temp) = test_store();
2697 store.set_time(100);
2698
2699 let mut stmt = statement(1, 1, None, 100);
2701 stmt.set_expiry_from_parts(200, 1);
2702 let hash = stmt.hash();
2703 store.submit(stmt.clone(), StatementSource::Network);
2704
2705 let db_entry = store.db.get(col::STATEMENTS, &hash).unwrap();
2707 assert!(db_entry.is_some(), "Statement should be in col::STATEMENTS after submit");
2708
2709 store.enforce_limits();
2711
2712 store.set_time(300);
2714 store.enforce_limits();
2715
2716 {
2718 let index = store.index.read();
2719 assert!(
2720 !index.entries.contains_key(&hash),
2721 "Statement should be removed from in-memory entries"
2722 );
2723 assert!(
2724 index.expired.contains_key(&hash),
2725 "Statement should be in in-memory expired map"
2726 );
2727 }
2728
2729 let db_entry = store.db.get(col::STATEMENTS, &hash).unwrap();
2730 assert!(
2731 db_entry.is_none(),
2732 "Statement should be removed from col::STATEMENTS after expiration"
2733 );
2734
2735 let expired_entry = store.db.get(col::EXPIRED, &hash).unwrap();
2736 assert!(expired_entry.is_some(), "Expiration info should be written to col::EXPIRED");
2737 }
2738
2739 #[test]
2740 fn enforce_allowances_evicts_excess_statements() {
2741 let (mut store, _temp) = test_store();
2746 store.set_time(0);
2747
2748 let s1 = statement(4, 10, None, 100); let s2 = statement(4, 20, None, 100);
2751 let s3 = statement(4, 30, None, 100);
2752 let s4 = statement(4, 40, None, 100);
2753 let s5 = statement(4, 50, None, 100); let h1 = s1.hash();
2756 let h5 = s5.hash();
2757
2758 {
2760 let mut index = store.index.write();
2761 for s in [&s1, &s2, &s3, &s4, &s5] {
2762 index.insert_new(s.hash(), account(4), s, false);
2763 }
2764 }
2765
2766 assert_eq!(store.index.read().entries.len(), 5);
2768 assert_eq!(store.index.read().total_size, 500);
2769
2770 store.enforce_limits();
2774 store.enforce_limits();
2775
2776 let index = store.index.read();
2778 assert_eq!(index.entries.len(), 4, "Should have 4 statements after eviction");
2779 assert!(!index.entries.contains_key(&h1), "Lowest priority should be evicted");
2780 assert!(index.entries.contains_key(&h5), "Highest priority should remain");
2781 assert_eq!(index.total_size, 400);
2782
2783 assert!(index.expired.contains_key(&h1));
2785 }
2786
2787 #[test]
2788 fn enforce_allowances_evicts_all_when_no_allowance_found() {
2789 let (mut store, _temp) = test_store();
2790 store.set_time(0);
2791
2792 let s1 = statement(0, 10, None, 100);
2794 let s2 = statement(0, 20, None, 150);
2795
2796 let h1 = s1.hash();
2797 let h2 = s2.hash();
2798
2799 {
2801 let mut index = store.index.write();
2802 index.insert_new(h1, account(0), &s1, false);
2803 index.insert_new(h2, account(0), &s2, false);
2804 }
2805
2806 assert_eq!(store.index.read().entries.len(), 2);
2807
2808 store.enforce_limits();
2811 store.enforce_limits();
2812
2813 let index = store.index.read();
2814 assert_eq!(index.entries.len(), 0, "All statements should be evicted");
2815 assert!(!index.accounts.contains_key(&account(0)), "Account should be removed");
2816 assert!(index.expired.contains_key(&h1));
2817 assert!(index.expired.contains_key(&h2));
2818 }
2819
2820 #[test]
2821 fn enforce_allowances_based_on_size() {
2822 let (mut store, _temp) = test_store();
2824 store.set_time(0);
2825
2826 let s1 = statement(2, 10, None, 600); let s2 = statement(2, 20, None, 600); let h1 = s1.hash();
2832 let h2 = s2.hash();
2833
2834 {
2836 let mut index = store.index.write();
2837 index.insert_new(h1, account(2), &s1, false);
2838 index.insert_new(h2, account(2), &s2, false);
2839 }
2840
2841 assert_eq!(store.index.read().total_size, 1200);
2842
2843 store.enforce_limits();
2846 store.enforce_limits();
2847
2848 let index = store.index.read();
2849 assert_eq!(index.entries.len(), 1);
2850 assert!(index.entries.contains_key(&h2), "Higher priority should remain");
2851 assert!(!index.entries.contains_key(&h1), "Lower priority should be evicted");
2852 assert_eq!(index.total_size, 600);
2853 }
2854}