1use crate::{common::tracing_log_xt::log_xt_trace, LOG_TARGET};
8use async_trait::async_trait;
9use futures::channel::mpsc::Receiver;
10use indexmap::IndexMap;
11use soil_client::blockchain::{HashAndNumber, TreeRoute};
12use soil_client::transaction_pool::error;
13use std::{
14 collections::HashMap,
15 sync::Arc,
16 time::{Duration, Instant},
17};
18use subsoil::runtime::{
19 generic::BlockId,
20 traits::{self, Block as BlockT, SaturatedConversion},
21 transaction_validity::{
22 TransactionSource, TransactionTag as Tag, TransactionValidity, TransactionValidityError,
23 },
24};
25use tracing::{debug, instrument, trace, Level};
26
27use super::{
28 base_pool as base,
29 validated_pool::{IsValidator, ValidatedPool, ValidatedTransaction},
30 EventHandler, ValidatedPoolSubmitOutcome,
31};
32
33pub type EventStream<H> = Receiver<H>;
35
36pub type BlockHash<A> = <<A as ChainApi>::Block as traits::Block>::Hash;
38pub type ExtrinsicHash<A> = <<A as ChainApi>::Block as traits::Block>::Hash;
40pub type ExtrinsicFor<A> = Arc<<<A as ChainApi>::Block as traits::Block>::Extrinsic>;
42pub type RawExtrinsicFor<A> = <<A as ChainApi>::Block as traits::Block>::Extrinsic;
44pub type NumberFor<A> = traits::NumberFor<<A as ChainApi>::Block>;
46pub type TransactionFor<A> = Arc<base::Transaction<ExtrinsicHash<A>, ExtrinsicFor<A>>>;
48pub type ValidatedTransactionFor<A> =
50 ValidatedTransaction<ExtrinsicHash<A>, ExtrinsicFor<A>, <A as ChainApi>::Error>;
51
/// Tells a [`ChainApi`] implementation why a transaction is being validated.
///
/// The value is forwarded verbatim to [`ChainApi::validate_transaction`]; how it is
/// used (e.g. for scheduling) is up to the implementation.
#[derive(Clone, Copy, PartialEq)]
pub enum ValidateTransactionPriority {
    /// Validation triggered by a submission (`submit_one`, `submit_and_watch`).
    Submitted,
    /// Validation triggered while pruning / re-verifying transactions for an imported block.
    Maintained,
}
64
65#[async_trait]
67pub trait ChainApi: Send + Sync {
68 type Block: BlockT;
70 type Error: From<error::Error> + error::IntoPoolError + error::IntoMetricsLabel;
72
73 async fn validate_transaction(
75 &self,
76 at: <Self::Block as BlockT>::Hash,
77 source: TransactionSource,
78 uxt: ExtrinsicFor<Self>,
79 validation_priority: ValidateTransactionPriority,
80 ) -> Result<TransactionValidity, Self::Error>;
81
82 fn validate_transaction_blocking(
87 &self,
88 at: <Self::Block as BlockT>::Hash,
89 source: TransactionSource,
90 uxt: ExtrinsicFor<Self>,
91 ) -> Result<TransactionValidity, Self::Error>;
92
93 fn block_id_to_number(
95 &self,
96 at: &BlockId<Self::Block>,
97 ) -> Result<Option<NumberFor<Self>>, Self::Error>;
98
99 fn block_id_to_hash(
101 &self,
102 at: &BlockId<Self::Block>,
103 ) -> Result<Option<<Self::Block as BlockT>::Hash>, Self::Error>;
104
105 fn hash_and_length(&self, uxt: &RawExtrinsicFor<Self>) -> (ExtrinsicHash<Self>, usize);
107
108 async fn block_body(
110 &self,
111 at: <Self::Block as BlockT>::Hash,
112 ) -> Result<Option<Vec<<Self::Block as traits::Block>::Extrinsic>>, Self::Error>;
113
114 fn block_header(
116 &self,
117 at: <Self::Block as BlockT>::Hash,
118 ) -> Result<Option<<Self::Block as BlockT>::Header>, Self::Error>;
119
120 fn tree_route(
122 &self,
123 from: <Self::Block as BlockT>::Hash,
124 to: <Self::Block as BlockT>::Hash,
125 ) -> Result<TreeRoute<Self::Block>, Self::Error>;
126
127 fn resolve_block_number(
129 &self,
130 at: <Self::Block as BlockT>::Hash,
131 ) -> Result<NumberFor<Self>, Self::Error> {
132 self.block_id_to_number(&BlockId::Hash(at)).and_then(|number| {
133 number.ok_or_else(|| error::Error::InvalidBlockId(format!("{:?}", at)).into())
134 })
135 }
136}
137
138#[derive(Debug, Clone)]
140pub struct Options {
141 pub ready: base::Limit,
143 pub future: base::Limit,
145 pub reject_future_transactions: bool,
147 pub ban_time: Duration,
149}
150
151impl Default for Options {
152 fn default() -> Self {
153 Self {
154 ready: base::Limit { count: 8192, total_bytes: 20 * 1024 * 1024 },
155 future: base::Limit { count: 512, total_bytes: 1 * 1024 * 1024 },
156 reject_future_transactions: false,
157 ban_time: Duration::from_secs(60 * 30),
158 }
159 }
160}
161
162impl Options {
163 pub fn total_count(&self) -> usize {
165 self.ready.count + self.future.count
166 }
167}
168
/// Selects whether the ban list is consulted before a transaction is verified.
#[derive(Clone, Copy)]
pub(crate) enum CheckBannedBeforeVerify {
    /// Do not ignore bans when checking whether the transaction is already known.
    Yes,
    /// Ignore bans when checking whether the transaction is already known.
    No,
}
176
177pub struct Pool<B: ChainApi, L: EventHandler<B>> {
179 validated_pool: Arc<ValidatedPool<B, L>>,
180}
181
182impl<B: ChainApi, L: EventHandler<B>> Pool<B, L> {
183 pub fn new_with_staticly_sized_rotator(
185 options: Options,
186 is_validator: IsValidator,
187 api: Arc<B>,
188 ) -> Self {
189 Self {
190 validated_pool: Arc::new(ValidatedPool::new_with_staticly_sized_rotator(
191 options,
192 is_validator,
193 api,
194 )),
195 }
196 }
197
198 pub fn new(options: Options, is_validator: IsValidator, api: Arc<B>) -> Self {
200 Self { validated_pool: Arc::new(ValidatedPool::new(options, is_validator, api)) }
201 }
202
203 pub fn new_with_event_handler(
205 options: Options,
206 is_validator: IsValidator,
207 api: Arc<B>,
208 event_handler: L,
209 ) -> Self {
210 Self {
211 validated_pool: Arc::new(ValidatedPool::new_with_event_handler(
212 options,
213 is_validator,
214 api,
215 event_handler,
216 )),
217 }
218 }
219
220 #[instrument(level = Level::TRACE, skip_all, target="txpool", name = "pool::submit_at")]
222 pub async fn submit_at(
223 &self,
224 at: &HashAndNumber<B::Block>,
225 xts: impl IntoIterator<Item = (base::TimedTransactionSource, ExtrinsicFor<B>)>,
226 validation_priority: ValidateTransactionPriority,
227 ) -> Vec<Result<ValidatedPoolSubmitOutcome<B>, B::Error>> {
228 let validated_transactions =
229 self.verify(at, xts, CheckBannedBeforeVerify::Yes, validation_priority).await;
230 self.validated_pool.submit(validated_transactions.into_values())
231 }
232
233 pub async fn resubmit_at(
237 &self,
238 at: &HashAndNumber<B::Block>,
239 xts: impl IntoIterator<Item = (base::TimedTransactionSource, ExtrinsicFor<B>)>,
240 validation_priority: ValidateTransactionPriority,
241 ) -> Vec<Result<ValidatedPoolSubmitOutcome<B>, B::Error>> {
242 let validated_transactions =
243 self.verify(at, xts, CheckBannedBeforeVerify::No, validation_priority).await;
244 self.validated_pool.submit(validated_transactions.into_values())
245 }
246
247 pub async fn submit_one(
249 &self,
250 at: &HashAndNumber<B::Block>,
251 source: base::TimedTransactionSource,
252 xt: ExtrinsicFor<B>,
253 ) -> Result<ValidatedPoolSubmitOutcome<B>, B::Error> {
254 let res = self
255 .submit_at(at, std::iter::once((source, xt)), ValidateTransactionPriority::Submitted)
256 .await
257 .pop();
258 res.expect("One extrinsic passed; one result returned; qed")
259 }
260
261 pub async fn submit_and_watch(
263 &self,
264 at: &HashAndNumber<B::Block>,
265 source: base::TimedTransactionSource,
266 xt: ExtrinsicFor<B>,
267 ) -> Result<ValidatedPoolSubmitOutcome<B>, B::Error> {
268 let (_, tx) = self
269 .verify_one(
270 at.hash,
271 at.number,
272 source,
273 xt,
274 CheckBannedBeforeVerify::Yes,
275 ValidateTransactionPriority::Submitted,
276 )
277 .await;
278 self.validated_pool.submit_and_watch(tx)
279 }
280
281 pub fn resubmit(
283 &self,
284 revalidated_transactions: IndexMap<ExtrinsicHash<B>, ValidatedTransactionFor<B>>,
285 ) {
286 let now = Instant::now();
287 self.validated_pool.resubmit(revalidated_transactions);
288 trace!(
289 target: LOG_TARGET,
290 duration = ?now.elapsed(),
291 status = ?self.validated_pool.status(),
292 "Resubmitted transaction."
293 );
294 }
295
296 pub fn prune_known(&self, at: &HashAndNumber<B::Block>, hashes: &[ExtrinsicHash<B>]) {
302 let in_pool_tags =
304 self.validated_pool.extrinsics_tags(hashes).into_iter().flatten().flatten();
305
306 let prune_status = self.validated_pool.prune_tags(in_pool_tags);
308 let pruned_transactions =
309 hashes.iter().cloned().chain(prune_status.pruned.iter().map(|tx| tx.hash));
310 self.validated_pool.fire_pruned(at, pruned_transactions);
311 }
312
313 pub async fn prune(
320 &self,
321 at: &HashAndNumber<B::Block>,
322 parent: <B::Block as BlockT>::Hash,
323 extrinsics: &[RawExtrinsicFor<B>],
324 known_provides_tags: Option<Arc<HashMap<ExtrinsicHash<B>, Vec<Tag>>>>,
325 ) {
326 debug!(
327 target: LOG_TARGET,
328 ?at,
329 extrinsics_count = extrinsics.len(),
330 "Starting pruning of block."
331 );
332 let in_pool_hashes =
334 extrinsics.iter().map(|extrinsic| self.hash_of(extrinsic)).collect::<Vec<_>>();
335 let in_pool_tags = self.validated_pool.extrinsics_tags(&in_pool_hashes);
336 let mut unknown_txs_count = 0usize;
338 let mut reused_txs_count = 0usize;
339 let tags = in_pool_hashes.iter().zip(in_pool_tags).map(|(tx_hash, tags)| {
340 tags.or_else(|| {
341 unknown_txs_count += 1;
342 known_provides_tags.as_ref().and_then(|inner| {
343 inner.get(&tx_hash).map(|found_tags| {
344 reused_txs_count += 1;
345 found_tags.clone()
346 })
347 })
348 })
349 });
350
351 let all = extrinsics.iter().zip(tags);
354 let mut validated_counter: usize = 0;
355 let mut future_tags = Vec::new();
356 let now = Instant::now();
357 for (extrinsic, in_pool_tags) in all {
358 match in_pool_tags {
359 Some(tags) => future_tags.extend(tags),
362 None => {
365 if !self.validated_pool.status().is_empty() {
367 validated_counter = validated_counter + 1;
368 let validity = self
369 .validated_pool
370 .api()
371 .validate_transaction(
372 parent,
373 TransactionSource::InBlock,
374 Arc::from(extrinsic.clone()),
375 ValidateTransactionPriority::Maintained,
376 )
377 .await;
378
379 trace!(
380 target: LOG_TARGET,
381 tx_hash = ?self.validated_pool.api().hash_and_length(&extrinsic.clone()).0,
382 ?validity,
383 "prune::revalidated"
384 );
385 if let Ok(Ok(validity)) = validity {
386 future_tags.extend(validity.provides);
387 }
388 } else {
389 trace!(
390 target: LOG_TARGET,
391 ?at,
392 "txpool is empty, skipping validation for block",
393 );
394 }
395 },
396 }
397 }
398
399 let known_provides_tags_len = known_provides_tags.map(|inner| inner.len()).unwrap_or(0);
400 debug!(
401 target: LOG_TARGET,
402 validated_counter,
403 known_provides_tags_len,
404 unknown_txs_count,
405 reused_txs_count,
406 duration = ?now.elapsed(),
407 "prune"
408 );
409 self.prune_tags(at, future_tags, in_pool_hashes).await
410 }
411
412 pub async fn prune_tags(
434 &self,
435 at: &HashAndNumber<B::Block>,
436 tags: impl IntoIterator<Item = Tag>,
437 known_imported_hashes: impl IntoIterator<Item = ExtrinsicHash<B>> + Clone,
438 ) {
439 let now = Instant::now();
440 trace!(target: LOG_TARGET, ?at, "Pruning tags.");
441 let prune_status = self.validated_pool.prune_tags(tags);
443
444 self.validated_pool
448 .ban(&Instant::now(), known_imported_hashes.clone().into_iter());
449
450 let pruned_transactions =
453 prune_status.pruned.into_iter().map(|tx| (tx.source.clone(), tx.data.clone()));
454
455 let reverified_transactions = self
456 .verify(
457 at,
458 pruned_transactions,
459 CheckBannedBeforeVerify::Yes,
460 ValidateTransactionPriority::Maintained,
461 )
462 .await;
463
464 let pruned_hashes = reverified_transactions.keys().map(Clone::clone).collect::<Vec<_>>();
465 debug!(
466 target: LOG_TARGET,
467 ?at,
468 reverified_transactions = reverified_transactions.len(),
469 duration = ?now.elapsed(),
470 "Pruned. Resubmitting transactions."
471 );
472 log_xt_trace!(data: tuple, target: LOG_TARGET, &reverified_transactions, "Resubmitting transaction: {:?}");
473
474 self.validated_pool.resubmit_pruned(
476 &at,
477 known_imported_hashes,
478 pruned_hashes,
479 reverified_transactions.into_values().collect(),
480 )
481 }
482
483 pub fn hash_of(&self, xt: &RawExtrinsicFor<B>) -> ExtrinsicHash<B> {
485 self.validated_pool.api().hash_and_length(xt).0
486 }
487
488 #[instrument(level = Level::TRACE, skip_all, target = "txpool",name = "pool::verify")]
490 async fn verify(
491 &self,
492 at: &HashAndNumber<B::Block>,
493 xts: impl IntoIterator<Item = (base::TimedTransactionSource, ExtrinsicFor<B>)>,
494 check: CheckBannedBeforeVerify,
495 validation_priority: ValidateTransactionPriority,
496 ) -> IndexMap<ExtrinsicHash<B>, ValidatedTransactionFor<B>> {
497 let HashAndNumber { number, hash } = *at;
498
499 let res = futures::future::join_all(xts.into_iter().map(|(source, xt)| {
500 self.verify_one(hash, number, source, xt, check, validation_priority)
501 }))
502 .await
503 .into_iter()
504 .collect::<IndexMap<_, _>>();
505
506 res
507 }
508
509 #[instrument(level = Level::TRACE, skip_all, target = "txpool",name = "pool::verify_one")]
511 pub(crate) async fn verify_one(
512 &self,
513 block_hash: <B::Block as BlockT>::Hash,
514 block_number: NumberFor<B>,
515 source: base::TimedTransactionSource,
516 xt: ExtrinsicFor<B>,
517 check: CheckBannedBeforeVerify,
518 validation_priority: ValidateTransactionPriority,
519 ) -> (ExtrinsicHash<B>, ValidatedTransactionFor<B>) {
520 let (hash, bytes) = self.validated_pool.api().hash_and_length(&xt);
521
522 let ignore_banned = matches!(check, CheckBannedBeforeVerify::No);
523 if let Err(err) = self.validated_pool.check_is_known(&hash, ignore_banned) {
524 return (hash, ValidatedTransaction::Invalid(hash, err));
525 }
526
527 let validation_result = self
528 .validated_pool
529 .api()
530 .validate_transaction(
531 block_hash,
532 source.clone().into(),
533 xt.clone(),
534 validation_priority,
535 )
536 .await;
537
538 let status = match validation_result {
539 Ok(status) => status,
540 Err(e) => return (hash, ValidatedTransaction::Invalid(hash, e)),
541 };
542
543 let validity = match status {
544 Ok(validity) => {
545 if validity.provides.is_empty() {
546 ValidatedTransaction::Invalid(hash, error::Error::NoTagsProvided.into())
547 } else {
548 ValidatedTransaction::valid_at(
549 block_number.saturated_into::<u64>(),
550 hash,
551 source,
552 xt,
553 bytes,
554 validity,
555 )
556 }
557 },
558 Err(TransactionValidityError::Invalid(e)) => {
559 ValidatedTransaction::Invalid(hash, error::Error::InvalidTransaction(e).into())
560 },
561 Err(TransactionValidityError::Unknown(e)) => {
562 ValidatedTransaction::Unknown(hash, error::Error::UnknownTransaction(e).into())
563 },
564 };
565
566 (hash, validity)
567 }
568
569 pub fn validated_pool(&self) -> &ValidatedPool<B, L> {
571 &self.validated_pool
572 }
573
574 pub fn clear_recently_pruned(&mut self) {
576 self.validated_pool.pool.write().clear_recently_pruned();
577 }
578}
579
580impl<B: ChainApi, L: EventHandler<B>> Pool<B, L> {
581 pub fn deep_clone_with_event_handler(&self, event_handler: L) -> Self {
585 let other: ValidatedPool<B, L> =
586 self.validated_pool().deep_clone_with_event_handler(event_handler);
587 Self { validated_pool: Arc::from(other) }
588 }
589}
590
591#[cfg(test)]
592mod tests {
593 use super::{super::base_pool::Limit, *};
594 use crate::common::tests::{pool, uxt, TestApi, INVALID_NONCE};
595 use assert_matches::assert_matches;
596 use base::TimedTransactionSource;
597 use codec::Encode;
598 use futures::executor::block_on;
599 use parking_lot::Mutex;
600 use soil_client::transaction_pool::TransactionStatus;
601 use std::{collections::HashMap, time::Instant};
602 use subsoil::runtime::transaction_validity::TransactionSource;
603 use soil_test_node_runtime::{AccountId, ExtrinsicBuilder, Transfer, H256};
604 use soil_test_node_runtime_client::Sr25519Keyring::{Alice, Bob};
605
606 const SOURCE: TimedTransactionSource =
607 TimedTransactionSource { source: TransactionSource::External, timestamp: None };
608
609 type Pool<Api> = super::Pool<Api, ()>;
610
611 #[test]
612 fn should_validate_and_import_transaction() {
613 let (pool, api) = pool();
615
616 let hash = block_on(
618 pool.submit_one(
619 &api.expect_hash_and_number(0),
620 SOURCE,
621 uxt(Transfer {
622 from: Alice.into(),
623 to: AccountId::from_h256(H256::from_low_u64_be(2)),
624 amount: 5,
625 nonce: 0,
626 })
627 .into(),
628 ),
629 )
630 .map(|outcome| outcome.hash())
631 .unwrap();
632
633 assert_eq!(pool.validated_pool().ready().map(|v| v.hash).collect::<Vec<_>>(), vec![hash]);
635 }
636
637 #[test]
638 fn submit_at_preserves_order() {
639 subsoil::tracing::try_init_simple();
640 let (pool, api) = pool();
642
643 let txs = (0..10)
644 .map(|i| {
645 uxt(Transfer {
646 from: Alice.into(),
647 to: AccountId::from_h256(H256::from_low_u64_be(i)),
648 amount: 5,
649 nonce: i,
650 })
651 .into()
652 })
653 .collect::<Vec<_>>();
654
655 let initial_hashes = txs.iter().map(|t| api.hash_and_length(t).0).collect::<Vec<_>>();
656
657 let txs = txs.into_iter().map(|x| (SOURCE, Arc::from(x))).collect::<Vec<_>>();
659 let hashes = block_on(pool.submit_at(
660 &api.expect_hash_and_number(0),
661 txs,
662 ValidateTransactionPriority::Submitted,
663 ))
664 .into_iter()
665 .map(|r| r.map(|o| o.hash()))
666 .collect::<Vec<_>>();
667 debug!(hashes = ?hashes, "-->");
668
669 hashes.into_iter().zip(initial_hashes.into_iter()).for_each(
671 |(result_hash, initial_hash)| {
672 assert_eq!(result_hash.unwrap(), initial_hash);
673 },
674 );
675 }
676
677 #[test]
678 fn should_reject_if_temporarily_banned() {
679 let (pool, api) = pool();
681 let uxt = uxt(Transfer {
682 from: Alice.into(),
683 to: AccountId::from_h256(H256::from_low_u64_be(2)),
684 amount: 5,
685 nonce: 0,
686 });
687
688 pool.validated_pool.ban(&Instant::now(), vec![pool.hash_of(&uxt)]);
690 let res = block_on(pool.submit_one(&api.expect_hash_and_number(0), SOURCE, uxt.into()))
691 .map(|o| o.hash());
692 assert_eq!(pool.validated_pool().status().ready, 0);
693 assert_eq!(pool.validated_pool().status().future, 0);
694
695 assert_matches!(res.unwrap_err(), error::Error::TemporarilyBanned);
697 }
698
699 #[test]
700 fn should_reject_unactionable_transactions() {
701 let api = Arc::new(TestApi::default());
703 let pool = Pool::new_with_staticly_sized_rotator(
704 Default::default(),
705 false.into(),
707 api.clone(),
708 );
709
710 let uxt = ExtrinsicBuilder::new_include_data(vec![42]).build();
712
713 let res = block_on(pool.submit_one(&api.expect_hash_and_number(0), SOURCE, uxt.into()))
715 .map(|o| o.hash());
716
717 assert_matches!(res.unwrap_err(), error::Error::Unactionable);
719 }
720
721 #[test]
722 fn should_notify_about_pool_events() {
723 let (stream, hash0, hash1) = {
724 let (pool, api) = pool();
726 let han_of_block0 = api.expect_hash_and_number(0);
727 let stream = pool.validated_pool().import_notification_stream();
728
729 let hash0 = block_on(
731 pool.submit_one(
732 &han_of_block0,
733 SOURCE,
734 uxt(Transfer {
735 from: Alice.into(),
736 to: AccountId::from_h256(H256::from_low_u64_be(2)),
737 amount: 5,
738 nonce: 0,
739 })
740 .into(),
741 ),
742 )
743 .unwrap()
744 .hash();
745 let hash1 = block_on(
746 pool.submit_one(
747 &han_of_block0,
748 SOURCE,
749 uxt(Transfer {
750 from: Alice.into(),
751 to: AccountId::from_h256(H256::from_low_u64_be(2)),
752 amount: 5,
753 nonce: 1,
754 })
755 .into(),
756 ),
757 )
758 .unwrap()
759 .hash();
760 let _hash = block_on(
762 pool.submit_one(
763 &han_of_block0,
764 SOURCE,
765 uxt(Transfer {
766 from: Alice.into(),
767 to: AccountId::from_h256(H256::from_low_u64_be(2)),
768 amount: 5,
769 nonce: 3,
770 })
771 .into(),
772 ),
773 )
774 .unwrap()
775 .hash();
776
777 assert_eq!(pool.validated_pool().status().ready, 2);
778 assert_eq!(pool.validated_pool().status().future, 1);
779
780 (stream, hash0, hash1)
781 };
782
783 let mut it = futures::executor::block_on_stream(stream);
785 assert_eq!(it.next(), Some(hash0));
786 assert_eq!(it.next(), Some(hash1));
787 assert_eq!(it.next(), None);
788 }
789
790 #[test]
791 fn should_clear_stale_transactions() {
792 let (pool, api) = pool();
794 let han_of_block0 = api.expect_hash_and_number(0);
795 let hash1 = block_on(
796 pool.submit_one(
797 &han_of_block0,
798 SOURCE,
799 uxt(Transfer {
800 from: Alice.into(),
801 to: AccountId::from_h256(H256::from_low_u64_be(2)),
802 amount: 5,
803 nonce: 0,
804 })
805 .into(),
806 ),
807 )
808 .unwrap()
809 .hash();
810 let hash2 = block_on(
811 pool.submit_one(
812 &han_of_block0,
813 SOURCE,
814 uxt(Transfer {
815 from: Alice.into(),
816 to: AccountId::from_h256(H256::from_low_u64_be(2)),
817 amount: 5,
818 nonce: 1,
819 })
820 .into(),
821 ),
822 )
823 .unwrap()
824 .hash();
825 let hash3 = block_on(
826 pool.submit_one(
827 &han_of_block0,
828 SOURCE,
829 uxt(Transfer {
830 from: Alice.into(),
831 to: AccountId::from_h256(H256::from_low_u64_be(2)),
832 amount: 5,
833 nonce: 3,
834 })
835 .into(),
836 ),
837 )
838 .unwrap()
839 .hash();
840
841 pool.validated_pool.clear_stale(&api.expect_hash_and_number(5));
843
844 assert_eq!(pool.validated_pool().ready().count(), 0);
846 assert_eq!(pool.validated_pool().status().future, 0);
847 assert_eq!(pool.validated_pool().status().ready, 0);
848 assert!(pool.validated_pool.is_banned(&hash1));
850 assert!(pool.validated_pool.is_banned(&hash2));
851 assert!(pool.validated_pool.is_banned(&hash3));
852 }
853
854 #[test]
855 fn should_ban_mined_transactions() {
856 let (pool, api) = pool();
858 let hash1 = block_on(
859 pool.submit_one(
860 &api.expect_hash_and_number(0),
861 SOURCE,
862 uxt(Transfer {
863 from: Alice.into(),
864 to: AccountId::from_h256(H256::from_low_u64_be(2)),
865 amount: 5,
866 nonce: 0,
867 })
868 .into(),
869 ),
870 )
871 .unwrap()
872 .hash();
873
874 block_on(pool.prune_tags(&api.expect_hash_and_number(1), vec![vec![0]], vec![hash1]));
876
877 assert!(pool.validated_pool.is_banned(&hash1));
879 }
880
881 #[test]
882 fn should_limit_futures() {
883 subsoil::tracing::try_init_simple();
884
885 let xt = uxt(Transfer {
886 from: Alice.into(),
887 to: AccountId::from_h256(H256::from_low_u64_be(2)),
888 amount: 5,
889 nonce: 1,
890 });
891
892 let limit = Limit { count: 100, total_bytes: xt.encoded_size() };
894
895 let options = Options { ready: limit.clone(), future: limit.clone(), ..Default::default() };
896
897 let api = Arc::new(TestApi::default());
898 let pool = Pool::new_with_staticly_sized_rotator(options, true.into(), api.clone());
899
900 let hash1 = block_on(pool.submit_one(&api.expect_hash_and_number(0), SOURCE, xt.into()))
901 .unwrap()
902 .hash();
903 assert_eq!(pool.validated_pool().status().future, 1);
904
905 let hash2 = block_on(
907 pool.submit_one(
908 &api.expect_hash_and_number(0),
909 SOURCE,
910 uxt(Transfer {
911 from: Bob.into(),
912 to: AccountId::from_h256(H256::from_low_u64_be(2)),
913 amount: 5,
914 nonce: 10,
915 })
916 .into(),
917 ),
918 )
919 .unwrap()
920 .hash();
921
922 assert_eq!(pool.validated_pool().status().future, 1);
924 assert!(pool.validated_pool.is_banned(&hash1));
925 assert!(!pool.validated_pool.is_banned(&hash2));
926 }
927
928 #[test]
929 fn should_error_if_reject_immediately() {
930 let limit = Limit { count: 100, total_bytes: 10 };
932
933 let options = Options { ready: limit.clone(), future: limit.clone(), ..Default::default() };
934
935 let api = Arc::new(TestApi::default());
936 let pool = Pool::new_with_staticly_sized_rotator(options, true.into(), api.clone());
937
938 block_on(
940 pool.submit_one(
941 &api.expect_hash_and_number(0),
942 SOURCE,
943 uxt(Transfer {
944 from: Alice.into(),
945 to: AccountId::from_h256(H256::from_low_u64_be(2)),
946 amount: 5,
947 nonce: 1,
948 })
949 .into(),
950 ),
951 )
952 .map(|o| o.hash())
953 .unwrap_err();
954
955 assert_eq!(pool.validated_pool().status().ready, 0);
957 assert_eq!(pool.validated_pool().status().future, 0);
958 }
959
960 #[test]
961 fn should_reject_transactions_with_no_provides() {
962 let (pool, api) = pool();
964
965 let err = block_on(
967 pool.submit_one(
968 &api.expect_hash_and_number(0),
969 SOURCE,
970 uxt(Transfer {
971 from: Alice.into(),
972 to: AccountId::from_h256(H256::from_low_u64_be(2)),
973 amount: 5,
974 nonce: INVALID_NONCE,
975 })
976 .into(),
977 ),
978 )
979 .map(|o| o.hash())
980 .unwrap_err();
981
982 assert_eq!(pool.validated_pool().status().ready, 0);
984 assert_eq!(pool.validated_pool().status().future, 0);
985 assert_matches!(err, error::Error::NoTagsProvided);
986 }
987
988 mod listener {
989 use super::*;
990
991 #[test]
992 fn should_trigger_ready_and_finalized() {
993 let (pool, api) = pool();
995 let watcher = block_on(
996 pool.submit_and_watch(
997 &api.expect_hash_and_number(0),
998 SOURCE,
999 uxt(Transfer {
1000 from: Alice.into(),
1001 to: AccountId::from_h256(H256::from_low_u64_be(2)),
1002 amount: 5,
1003 nonce: 0,
1004 })
1005 .into(),
1006 ),
1007 )
1008 .unwrap()
1009 .expect_watcher();
1010 assert_eq!(pool.validated_pool().status().ready, 1);
1011 assert_eq!(pool.validated_pool().status().future, 0);
1012
1013 let han_of_block2 = api.expect_hash_and_number(2);
1014
1015 block_on(pool.prune_tags(&han_of_block2, vec![vec![0u8]], vec![]));
1017 assert_eq!(pool.validated_pool().status().ready, 0);
1018 assert_eq!(pool.validated_pool().status().future, 0);
1019
1020 let mut stream = futures::executor::block_on_stream(watcher.into_stream());
1022 assert_eq!(stream.next(), Some(TransactionStatus::Ready));
1023 assert_eq!(
1024 stream.next(),
1025 Some(TransactionStatus::InBlock((han_of_block2.hash.into(), 0))),
1026 );
1027 }
1028
1029 #[test]
1030 fn should_trigger_ready_and_finalized_when_pruning_via_hash() {
1031 let (pool, api) = pool();
1033 let watcher = block_on(
1034 pool.submit_and_watch(
1035 &api.expect_hash_and_number(0),
1036 SOURCE,
1037 uxt(Transfer {
1038 from: Alice.into(),
1039 to: AccountId::from_h256(H256::from_low_u64_be(2)),
1040 amount: 5,
1041 nonce: 0,
1042 })
1043 .into(),
1044 ),
1045 )
1046 .unwrap()
1047 .expect_watcher();
1048 assert_eq!(pool.validated_pool().status().ready, 1);
1049 assert_eq!(pool.validated_pool().status().future, 0);
1050
1051 let han_of_block2 = api.expect_hash_and_number(2);
1052
1053 block_on(pool.prune_tags(&han_of_block2, vec![vec![0u8]], vec![*watcher.hash()]));
1055 assert_eq!(pool.validated_pool().status().ready, 0);
1056 assert_eq!(pool.validated_pool().status().future, 0);
1057
1058 let mut stream = futures::executor::block_on_stream(watcher.into_stream());
1060 assert_eq!(stream.next(), Some(TransactionStatus::Ready));
1061 assert_eq!(
1062 stream.next(),
1063 Some(TransactionStatus::InBlock((han_of_block2.hash.into(), 0))),
1064 );
1065 }
1066
1067 #[test]
1068 fn should_trigger_future_and_ready_after_promoted() {
1069 let (pool, api) = pool();
1071 let han_of_block0 = api.expect_hash_and_number(0);
1072
1073 let watcher = block_on(
1074 pool.submit_and_watch(
1075 &han_of_block0,
1076 SOURCE,
1077 uxt(Transfer {
1078 from: Alice.into(),
1079 to: AccountId::from_h256(H256::from_low_u64_be(2)),
1080 amount: 5,
1081 nonce: 1,
1082 })
1083 .into(),
1084 ),
1085 )
1086 .unwrap()
1087 .expect_watcher();
1088 assert_eq!(pool.validated_pool().status().ready, 0);
1089 assert_eq!(pool.validated_pool().status().future, 1);
1090
1091 block_on(
1093 pool.submit_one(
1094 &han_of_block0,
1095 SOURCE,
1096 uxt(Transfer {
1097 from: Alice.into(),
1098 to: AccountId::from_h256(H256::from_low_u64_be(2)),
1099 amount: 5,
1100 nonce: 0,
1101 })
1102 .into(),
1103 ),
1104 )
1105 .unwrap();
1106 assert_eq!(pool.validated_pool().status().ready, 2);
1107
1108 let mut stream = futures::executor::block_on_stream(watcher.into_stream());
1110 assert_eq!(stream.next(), Some(TransactionStatus::Future));
1111 assert_eq!(stream.next(), Some(TransactionStatus::Ready));
1112 }
1113
1114 #[test]
1115 fn should_trigger_invalid_and_ban() {
1116 let (pool, api) = pool();
1118 let uxt = uxt(Transfer {
1119 from: Alice.into(),
1120 to: AccountId::from_h256(H256::from_low_u64_be(2)),
1121 amount: 5,
1122 nonce: 0,
1123 });
1124 let watcher =
1125 block_on(pool.submit_and_watch(&api.expect_hash_and_number(0), SOURCE, uxt.into()))
1126 .unwrap()
1127 .expect_watcher();
1128 assert_eq!(pool.validated_pool().status().ready, 1);
1129
1130 pool.validated_pool.remove_invalid(&[*watcher.hash()]);
1132
1133 let mut stream = futures::executor::block_on_stream(watcher.into_stream());
1135 assert_eq!(stream.next(), Some(TransactionStatus::Ready));
1136 assert_eq!(stream.next(), Some(TransactionStatus::Invalid));
1137 assert_eq!(stream.next(), None);
1138 }
1139
1140 #[test]
1141 fn should_trigger_broadcasted() {
1142 let (pool, api) = pool();
1144 let uxt = uxt(Transfer {
1145 from: Alice.into(),
1146 to: AccountId::from_h256(H256::from_low_u64_be(2)),
1147 amount: 5,
1148 nonce: 0,
1149 });
1150 let watcher =
1151 block_on(pool.submit_and_watch(&api.expect_hash_and_number(0), SOURCE, uxt.into()))
1152 .unwrap()
1153 .expect_watcher();
1154 assert_eq!(pool.validated_pool().status().ready, 1);
1155
1156 let mut map = HashMap::new();
1158 let peers = vec!["a".into(), "b".into(), "c".into()];
1159 map.insert(*watcher.hash(), peers.clone());
1160 pool.validated_pool().on_broadcasted(map);
1161
1162 let mut stream = futures::executor::block_on_stream(watcher.into_stream());
1164 assert_eq!(stream.next(), Some(TransactionStatus::Ready));
1165 assert_eq!(stream.next(), Some(TransactionStatus::Broadcast(peers)));
1166 }
1167
1168 #[test]
1169 fn should_trigger_dropped_older() {
1170 let limit = Limit { count: 1, total_bytes: 1000 };
1172 let options =
1173 Options { ready: limit.clone(), future: limit.clone(), ..Default::default() };
1174
1175 let api = Arc::new(TestApi::default());
1176 let pool = Pool::new_with_staticly_sized_rotator(options, true.into(), api.clone());
1177
1178 let xt = uxt(Transfer {
1179 from: Alice.into(),
1180 to: AccountId::from_h256(H256::from_low_u64_be(2)),
1181 amount: 5,
1182 nonce: 0,
1183 });
1184 let watcher =
1185 block_on(pool.submit_and_watch(&api.expect_hash_and_number(0), SOURCE, xt.into()))
1186 .unwrap()
1187 .expect_watcher();
1188 assert_eq!(pool.validated_pool().status().ready, 1);
1189
1190 let xt = uxt(Transfer {
1192 from: Bob.into(),
1193 to: AccountId::from_h256(H256::from_low_u64_be(1)),
1194 amount: 4,
1195 nonce: 1,
1196 });
1197 block_on(pool.submit_one(&api.expect_hash_and_number(1), SOURCE, xt.into())).unwrap();
1198 assert_eq!(pool.validated_pool().status().ready, 1);
1199
1200 let mut stream = futures::executor::block_on_stream(watcher.into_stream());
1202 assert_eq!(stream.next(), Some(TransactionStatus::Ready));
1203 assert_eq!(stream.next(), Some(TransactionStatus::Dropped));
1204 }
1205
1206 #[test]
1207 fn should_trigger_dropped_lower_priority() {
1208 {
1209 let limit = Limit { count: 1, total_bytes: 1000 };
1211 let options =
1212 Options { ready: limit.clone(), future: limit.clone(), ..Default::default() };
1213
1214 let api = Arc::new(TestApi::default());
1215 let pool = Pool::new_with_staticly_sized_rotator(options, true.into(), api.clone());
1216
1217 let xt = ExtrinsicBuilder::new_include_data(Vec::new()).build();
1220 block_on(pool.submit_one(&api.expect_hash_and_number(0), SOURCE, xt.into()))
1221 .unwrap();
1222 assert_eq!(pool.validated_pool().status().ready, 1);
1223
1224 let xt = uxt(Transfer {
1228 from: Bob.into(),
1229 to: AccountId::from_h256(H256::from_low_u64_be(1)),
1230 amount: 4,
1231 nonce: 1,
1232 });
1233 let result =
1234 block_on(pool.submit_one(&api.expect_hash_and_number(1), SOURCE, xt.into()));
1235 assert!(matches!(
1236 result,
1237 Err(soil_client::transaction_pool::error::Error::ImmediatelyDropped)
1238 ));
1239 }
1240 {
1241 let limit = Limit { count: 2, total_bytes: 1000 };
1243 let options =
1244 Options { ready: limit.clone(), future: limit.clone(), ..Default::default() };
1245
1246 let api = Arc::new(TestApi::default());
1247 let pool = Pool::new_with_staticly_sized_rotator(options, true.into(), api.clone());
1248
1249 let han_of_block0 = api.expect_hash_and_number(0);
1250
1251 let xt = ExtrinsicBuilder::new_include_data(Vec::new()).build();
1254 block_on(pool.submit_and_watch(&han_of_block0, SOURCE, xt.into()))
1255 .unwrap()
1256 .expect_watcher();
1257 assert_eq!(pool.validated_pool().status().ready, 1);
1258
1259 let xt = uxt(Transfer {
1262 from: Alice.into(),
1263 to: AccountId::from_h256(H256::from_low_u64_be(2)),
1264 amount: 5,
1265 nonce: 0,
1266 });
1267 let watcher = block_on(pool.submit_and_watch(&han_of_block0, SOURCE, xt.into()))
1268 .unwrap()
1269 .expect_watcher();
1270 assert_eq!(pool.validated_pool().status().ready, 2);
1271
1272 let xt = ExtrinsicBuilder::new_indexed_call(Vec::new()).build();
1276 block_on(pool.submit_one(&api.expect_hash_and_number(1), SOURCE, xt.into()))
1277 .unwrap();
1278 assert_eq!(pool.validated_pool().status().ready, 2);
1279
1280 let mut stream = futures::executor::block_on_stream(watcher.into_stream());
1282 assert_eq!(stream.next(), Some(TransactionStatus::Ready));
1283 assert_eq!(stream.next(), Some(TransactionStatus::Dropped));
1284 }
1285 }
1286
        /// Submits a nonce-1 transfer on a background thread while the main thread submits
        /// and prunes the nonce-0 transfer, then checks that the background transaction
        /// ends up in the ready queue rather than the future queue.
        #[test]
        fn should_handle_pruning_in_the_middle_of_import() {
            // given
            // Rendezvous channel: the background thread signals when its import finished.
            let (ready, is_ready) = std::sync::mpsc::sync_channel(0);
            // Channel handed to `TestApi::delay`.
            // NOTE(review): presumably `TestApi` waits on `rx` while validating the
            // background transaction — confirm against the `TestApi` implementation.
            let (tx, rx) = std::sync::mpsc::sync_channel(1);
            let mut api = TestApi::default();
            api.delay = Arc::new(Mutex::new(rx.into()));
            let api = Arc::new(api);
            let pool = Arc::new(Pool::new_with_staticly_sized_rotator(
                Default::default(),
                true.into(),
                api.clone(),
            ));

            let han_of_block0 = api.expect_hash_and_number(0);

            // when
            let xt = uxt(Transfer {
                from: Alice.into(),
                to: AccountId::from_h256(H256::from_low_u64_be(2)),
                amount: 5,
                nonce: 1,
            });

            // Start importing the nonce-1 transfer on a separate thread.
            let pool2 = pool.clone();
            std::thread::spawn({
                let hash_of_block0 = han_of_block0.clone();
                move || {
                    block_on(pool2.submit_one(&hash_of_block0, SOURCE, xt.into())).unwrap();
                    ready.send(()).unwrap();
                }
            });

            // Meanwhile import the nonce-0 transfer on the main thread.
            let xt = uxt(Transfer {
                from: Alice.into(),
                to: AccountId::from_h256(H256::from_low_u64_be(2)),
                amount: 4,
                nonce: 0,
            });
            // Tag passed to `prune_tags` below; pruning it empties the ready queue.
            let provides = vec![0_u8];
            block_on(pool.submit_one(&han_of_block0, SOURCE, xt.into())).unwrap();
            assert_eq!(pool.validated_pool().status().ready, 1);

            // Prune the tag at block 1 before the background import completes.
            block_on(pool.prune_tags(&api.expect_hash_and_number(1), vec![provides], vec![]));
            assert_eq!(pool.validated_pool().status().ready, 0);

            // Send on the delay channel so the background import can proceed.
            tx.send(()).unwrap();

            // then
            // Wait until the background thread reports that its import finished.
            is_ready.recv().unwrap();
            assert_eq!(pool.validated_pool().status().ready, 1);
            assert_eq!(pool.validated_pool().status().future, 0);
        }
1349 }
1350}