1use std::{cmp::Ordering, collections::HashSet, fmt, hash, sync::Arc, time::Instant};
12
13use crate::LOG_TARGET;
14use serde::Serialize;
15use soil_client::transaction_pool::{error, InPoolTransaction, PoolStatus};
16use subsoil::core::hexdisplay::HexDisplay;
17use subsoil::runtime::{
18 traits::Member,
19 transaction_validity::{
20 TransactionLongevity as Longevity, TransactionPriority as Priority, TransactionSource,
21 TransactionTag as Tag,
22 },
23};
24use tracing::{trace, warn};
25
26use super::{
27 future::{FutureTransactions, WaitingTransaction},
28 ready::{BestIterator, ReadyTransactions, TransactionRef},
29};
30
/// Outcome of successfully importing a transaction into the pool.
#[derive(Debug, PartialEq, Eq)]
pub enum Imported<Hash, Ex> {
    /// The transaction was placed in the ready queue.
    Ready {
        /// Hash of the transaction that was imported.
        hash: Hash,
        /// Hashes of other transactions that entered the ready queue as a consequence of this
        /// import.
        promoted: Vec<Hash>,
        /// Hashes of transactions whose follow-up import into the ready queue failed.
        failed: Vec<Hash>,
        /// Transactions dropped while importing: the ones replaced in the ready queue and the
        /// follow-up ones rejected for too low priority.
        removed: Vec<Arc<Transaction<Hash, Ex>>>,
    },
    /// The transaction was placed in the future queue.
    Future {
        /// Hash of the transaction that was imported.
        hash: Hash,
    },
}
51
52impl<Hash, Ex> Imported<Hash, Ex> {
53 pub fn hash(&self) -> &Hash {
55 use self::Imported::*;
56 match *self {
57 Ready { ref hash, .. } => hash,
58 Future { ref hash, .. } => hash,
59 }
60 }
61}
62
/// Result of pruning a set of tags from the pool (see `BasePool::prune_tags`).
#[derive(Debug)]
pub struct PruneStatus<Hash, Ex> {
    /// Import results of the future transactions that were promoted to the ready queue.
    pub promoted: Vec<Imported<Hash, Ex>>,
    /// Hashes of transactions removed from the future queue by the pruning, plus those whose
    /// promotion failed.
    pub failed: Vec<Hash>,
    /// Transactions pruned from the ready queue.
    pub pruned: Vec<Arc<Transaction<Hash, Ex>>>,
}
73
/// A [`TransactionSource`] paired with an optional creation timestamp.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TimedTransactionSource {
    /// Where the transaction came from.
    pub source: TransactionSource,

    /// Instant captured by the constructors when they are asked to record a timestamp;
    /// `None` otherwise.
    pub timestamp: Option<Instant>,
}
83
/// Converts a timed source back into the plain [`TransactionSource`], discarding the timestamp.
impl From<TimedTransactionSource> for TransactionSource {
    fn from(value: TimedTransactionSource) -> Self {
        value.source
    }
}
89
90impl TimedTransactionSource {
91 pub fn new_in_block(with_timestamp: bool) -> Self {
94 Self { source: TransactionSource::InBlock, timestamp: with_timestamp.then(Instant::now) }
95 }
96 pub fn new_external(with_timestamp: bool) -> Self {
99 Self { source: TransactionSource::External, timestamp: with_timestamp.then(Instant::now) }
100 }
101 pub fn new_local(with_timestamp: bool) -> Self {
104 Self { source: TransactionSource::Local, timestamp: with_timestamp.then(Instant::now) }
105 }
106 pub fn from_transaction_source(source: TransactionSource, with_timestamp: bool) -> Self {
108 Self { source, timestamp: with_timestamp.then(Instant::now) }
109 }
110}
111
/// A transaction as stored in the pool, together with the metadata the pool works with.
#[derive(PartialEq, Eq, Clone)]
pub struct Transaction<Hash, Extrinsic> {
    /// The wrapped extrinsic.
    pub data: Extrinsic,
    /// Size accounted for this transaction (presumably its encoded length in bytes; confirm
    /// against the code that constructs it).
    pub bytes: usize,
    /// Hash identifying the transaction.
    pub hash: Hash,
    /// Priority of the transaction.
    pub priority: Priority,
    /// Longevity bound reported by `InPoolTransaction::longevity`.
    pub valid_till: Longevity,
    /// Tags this transaction requires.
    pub requires: Vec<Tag>,
    /// Tags this transaction provides.
    pub provides: Vec<Tag>,
    /// Value reported by `InPoolTransaction::is_propagable`.
    pub propagate: bool,
    /// Origin of the transaction, with an optional timestamp.
    pub source: TimedTransactionSource,
}
134
/// Borrows the wrapped extrinsic (`data`) of a [`Transaction`].
impl<Hash, Extrinsic> AsRef<Extrinsic> for Transaction<Hash, Extrinsic> {
    fn as_ref(&self) -> &Extrinsic {
        &self.data
    }
}
140
/// Implements the pool-facing accessors by handing out references to the matching fields.
impl<Hash, Extrinsic> InPoolTransaction for Transaction<Hash, Extrinsic> {
    type Transaction = Extrinsic;
    type Hash = Hash;

    /// Returns the wrapped extrinsic (`data`).
    fn data(&self) -> &Extrinsic {
        &self.data
    }

    /// Returns the transaction hash.
    fn hash(&self) -> &Hash {
        &self.hash
    }

    /// Returns the transaction priority.
    fn priority(&self) -> &Priority {
        &self.priority
    }

    /// Returns the `valid_till` field.
    fn longevity(&self) -> &Longevity {
        &self.valid_till
    }

    /// Returns the tags this transaction requires.
    fn requires(&self) -> &[Tag] {
        &self.requires
    }

    /// Returns the tags this transaction provides.
    fn provides(&self) -> &[Tag] {
        &self.provides
    }

    /// Returns the `propagate` flag.
    fn is_propagable(&self) -> bool {
        self.propagate
    }
}
173
174impl<Hash: Clone, Extrinsic: Clone> Transaction<Hash, Extrinsic> {
175 pub fn duplicate(&self) -> Self {
181 Self {
182 data: self.data.clone(),
183 bytes: self.bytes,
184 hash: self.hash.clone(),
185 priority: self.priority,
186 source: self.source.clone(),
187 valid_till: self.valid_till,
188 requires: self.requires.clone(),
189 provides: self.provides.clone(),
190 propagate: self.propagate,
191 }
192 }
193}
194
195impl<Hash, Extrinsic> fmt::Debug for Transaction<Hash, Extrinsic>
196where
197 Hash: fmt::Debug,
198 Extrinsic: fmt::Debug,
199{
200 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
201 let join_tags = |tags: &[Tag]| {
202 tags.iter()
203 .map(|tag| HexDisplay::from(tag).to_string())
204 .collect::<Vec<_>>()
205 .join(", ")
206 };
207
208 write!(fmt, "Transaction {{ ")?;
209 write!(fmt, "hash: {:?}, ", &self.hash)?;
210 write!(fmt, "priority: {:?}, ", &self.priority)?;
211 write!(fmt, "valid_till: {:?}, ", &self.valid_till)?;
212 write!(fmt, "bytes: {:?}, ", &self.bytes)?;
213 write!(fmt, "propagate: {:?}, ", &self.propagate)?;
214 write!(fmt, "source: {:?}, ", &self.source)?;
215 write!(fmt, "requires: [{}], ", join_tags(&self.requires))?;
216 write!(fmt, "provides: [{}], ", join_tags(&self.provides))?;
217 write!(fmt, "data: {:?}", &self.data)?;
218 write!(fmt, "}}")?;
219 Ok(())
220 }
221}
222
/// Number of recently-pruned tag sets that `BasePool` keeps around.
const RECENTLY_PRUNED_TAGS: usize = 2;
225
/// Transaction pool core: a queue of ready transactions plus a queue of future transactions
/// that are still waiting for some of their required tags.
#[derive(Clone, Debug)]
pub struct BasePool<Hash: hash::Hash + Eq, Ex> {
    /// When `true`, `import` rejects transactions that are not ready instead of queueing them.
    reject_future_transactions: bool,
    /// Transactions that are not ready yet.
    future: FutureTransactions<Hash, Ex>,
    /// Transactions that are ready.
    ready: ReadyTransactions<Hash, Ex>,
    /// Tag sets recorded by the most recent `prune_tags` calls; passed to
    /// `WaitingTransaction::new` when importing.
    recently_pruned: [HashSet<Tag>; RECENTLY_PRUNED_TAGS],
    /// Index of the `recently_pruned` slot that the next `prune_tags` call overwrites.
    recently_pruned_index: usize,
}
248
/// The default pool accepts future transactions (`reject_future_transactions` is `false`).
impl<Hash: hash::Hash + Member + Serialize, Ex: std::fmt::Debug> Default for BasePool<Hash, Ex> {
    fn default() -> Self {
        Self::new(false)
    }
}
254
255impl<Hash: hash::Hash + Member + Serialize, Ex: std::fmt::Debug> BasePool<Hash, Ex> {
256 pub fn new(reject_future_transactions: bool) -> Self {
258 Self {
259 reject_future_transactions,
260 future: Default::default(),
261 ready: Default::default(),
262 recently_pruned: Default::default(),
263 recently_pruned_index: 0,
264 }
265 }
266
267 pub fn clear_recently_pruned(&mut self) {
269 self.recently_pruned = Default::default();
270 self.recently_pruned_index = 0;
271 }
272
273 pub(crate) fn with_futures_enabled<T>(
279 &mut self,
280 closure: impl FnOnce(&mut Self, bool) -> T,
281 ) -> T {
282 let previous = self.reject_future_transactions;
283 self.reject_future_transactions = false;
284 let return_value = closure(self, previous);
285 self.reject_future_transactions = previous;
286 return_value
287 }
288
289 pub fn is_imported(&self, tx_hash: &Hash) -> bool {
291 self.future.contains(tx_hash) || self.ready.contains(tx_hash)
292 }
293
294 pub fn import(&mut self, tx: Transaction<Hash, Ex>) -> error::Result<Imported<Hash, Ex>> {
302 if self.is_imported(&tx.hash) {
303 return Err(error::Error::AlreadyImported(Box::new(tx.hash)));
304 }
305
306 let tx = WaitingTransaction::new(tx, self.ready.provided_tags(), &self.recently_pruned);
307 trace!(
308 target: LOG_TARGET,
309 tx_hash = ?tx.transaction.hash,
310 ?tx,
311 set = if tx.is_ready() { "ready" } else { "future" },
312 "Importing transaction"
313 );
314
315 if !tx.is_ready() {
317 if self.reject_future_transactions {
318 return Err(error::Error::RejectedFutureTransaction);
319 }
320
321 let hash = tx.transaction.hash.clone();
322 self.future.import(tx);
323 return Ok(Imported::Future { hash });
324 }
325
326 self.import_to_ready(tx)
327 }
328
    /// Imports `tx` into the ready queue and then keeps importing every future transaction
    /// that becomes unlocked by the tags provided along the way.
    ///
    /// On success the result lists the hashes promoted from the future queue, the hashes of
    /// follow-up transactions that failed to import, and the transactions that were removed.
    ///
    /// # Errors
    ///
    /// - The error from the ready queue when `tx` itself cannot be imported.
    /// - `CycleDetected` when `tx` ends up among the removed transactions; the promoted
    ///   transactions are removed from the ready queue in that case.
    fn import_to_ready(
        &mut self,
        tx: WaitingTransaction<Hash, Ex>,
    ) -> error::Result<Imported<Hash, Ex>> {
        let tx_hash = tx.transaction.hash.clone();
        let mut promoted = vec![];
        let mut failed = vec![];
        let mut removed = vec![];

        // `first` is true only while processing the transaction passed in by the caller.
        let mut first = true;
        let mut to_import = vec![tx];

        // Work list: take the next transaction to import.
        while let Some(tx) = to_import.pop() {
            // Queue up the future transactions unlocked by the tags this one provides.
            to_import.append(&mut self.future.satisfy_tags(&tx.transaction.provides));

            // Import the transaction itself.
            let current_hash = tx.transaction.hash.clone();
            let current_tx = tx.transaction.clone();
            match self.ready.import(tx) {
                Ok(mut replaced) => {
                    if !first {
                        promoted.push(current_hash.clone());
                    }
                    // Anything that was just replaced no longer counts as promoted.
                    promoted.retain(|hash| replaced.iter().all(|tx| *hash != tx.hash));
                    // Report the replaced transactions as removed.
                    removed.append(&mut replaced);
                },
                Err(error @ error::Error::TooLowPriority { .. }) => {
                    trace!(
                        target: LOG_TARGET,
                        tx_hash = ?current_tx.hash,
                        ?first,
                        %error,
                        "Error importing transaction"
                    );
                    if first {
                        return Err(error);
                    } else {
                        // A follow-up transaction with too low priority is reported as removed.
                        removed.push(current_tx);
                        promoted.retain(|hash| *hash != current_hash);
                    }
                },
                Err(error) => {
                    // Any other import failure.
                    trace!(
                        target: LOG_TARGET,
                        tx_hash = ?current_tx.hash,
                        ?error,
                        first,
                        "Error importing transaction"
                    );
                    if first {
                        return Err(error);
                    } else {
                        failed.push(current_tx.hash.clone());
                    }
                },
            }
            first = false;
        }

        // The transaction we started with got pushed out again by a transaction it unlocked:
        // treat this as a cycle.
        if removed.iter().any(|tx| tx.hash == tx_hash) {
            // Drop everything that was promoted during this import from the ready queue.
            self.ready.remove_subtree(&promoted);

            trace!(
                target: LOG_TARGET,
                ?tx_hash,
                "Cycle detected, bailing."
            );
            return Err(error::Error::CycleDetected);
        }

        Ok(Imported::Ready { hash: tx_hash, promoted, failed, removed })
    }
418
    /// Returns the iterator over ready transactions produced by the ready queue.
    pub fn ready(&self) -> BestIterator<Hash, Ex> {
        self.ready.get()
    }
423
    /// Returns an iterator over all transactions in the future queue.
    pub fn futures(&self) -> impl Iterator<Item = &Transaction<Hash, Ex>> {
        self.future.all()
    }
428
429 pub fn by_hashes(&self, hashes: &[Hash]) -> Vec<Option<Arc<Transaction<Hash, Ex>>>> {
434 let ready = self.ready.by_hashes(hashes);
435 let future = self.future.by_hashes(hashes);
436
437 ready.into_iter().zip(future).map(|(a, b)| a.or(b)).collect()
438 }
439
    /// Looks up a single transaction by hash in the ready queue only.
    pub fn ready_by_hash(&self, hash: &Hash) -> Option<Arc<Transaction<Hash, Ex>>> {
        self.ready.by_hash(hash)
    }
444
445 pub fn enforce_limits(
452 &mut self,
453 ready: &Limit,
454 future: &Limit,
455 ) -> Vec<Arc<Transaction<Hash, Ex>>> {
456 let mut removed = vec![];
457
458 while ready.is_exceeded(self.ready.len(), self.ready.bytes()) {
459 let worst =
461 self.ready.fold::<Option<TransactionRef<Hash, Ex>>, _>(None, |worst, current| {
462 let transaction = ¤t.transaction;
463 worst
464 .map(|worst| {
465 match worst.transaction.priority.cmp(&transaction.transaction.priority)
470 {
471 Ordering::Less => worst,
472 Ordering::Equal => {
473 if worst.insertion_id > transaction.insertion_id {
474 transaction.clone()
475 } else {
476 worst
477 }
478 },
479 Ordering::Greater => transaction.clone(),
480 }
481 })
482 .or_else(|| Some(transaction.clone()))
483 });
484
485 if let Some(worst) = worst {
486 removed.append(&mut self.remove_subtree(&[worst.transaction.hash.clone()]))
487 } else {
488 break;
489 }
490 }
491
492 while future.is_exceeded(self.future.len(), self.future.bytes()) {
493 let worst = self.future.fold(|worst, current| match worst {
495 None => Some(current.clone()),
496 Some(worst) => Some(
497 match (worst.transaction.source.timestamp, current.transaction.source.timestamp)
498 {
499 (Some(worst_timestamp), Some(current_timestamp)) => {
500 if worst_timestamp > current_timestamp {
501 current.clone()
502 } else {
503 worst
504 }
505 },
506 _ => {
507 if worst.imported_at > current.imported_at {
508 current.clone()
509 } else {
510 worst
511 }
512 },
513 },
514 ),
515 });
516
517 if let Some(worst) = worst {
518 removed.append(&mut self.remove_subtree(&[worst.transaction.hash.clone()]))
519 } else {
520 break;
521 }
522 }
523
524 removed
525 }
526
527 pub fn remove_subtree(&mut self, hashes: &[Hash]) -> Vec<Arc<Transaction<Hash, Ex>>> {
536 let mut removed = self.ready.remove_subtree(hashes);
537 removed.extend(self.future.remove(hashes));
538 removed
539 }
540
    /// Clears the future queue and returns the transactions it yields.
    pub fn clear_future(&mut self) -> Vec<Arc<Transaction<Hash, Ex>>> {
        self.future.clear()
    }
545
546 pub fn prune_tags(&mut self, tags: impl IntoIterator<Item = Tag>) -> PruneStatus<Hash, Ex> {
553 let mut to_import = vec![];
554 let mut pruned = vec![];
555 let recently_pruned = &mut self.recently_pruned[self.recently_pruned_index];
556 self.recently_pruned_index = (self.recently_pruned_index + 1) % RECENTLY_PRUNED_TAGS;
557 recently_pruned.clear();
558
559 let tags = tags.into_iter().collect::<Vec<_>>();
560 let futures_removed = self.future.prune_tags(&tags);
561
562 for tag in tags {
563 to_import.append(&mut self.future.satisfy_tags(std::iter::once(&tag)));
565 pruned.append(&mut self.ready.prune_tags(tag.clone()));
567 recently_pruned.insert(tag);
569 }
570
571 let mut promoted = vec![];
572 let mut failed = vec![];
573 for tx in futures_removed {
574 failed.push(tx.hash.clone());
575 }
576
577 for tx in to_import {
578 let tx_hash = tx.transaction.hash.clone();
579 match self.import_to_ready(tx) {
580 Ok(res) => promoted.push(res),
581 Err(error) => {
582 warn!(
583 target: LOG_TARGET,
584 ?tx_hash,
585 ?error,
586 "Failed to promote during pruning."
587 );
588 failed.push(tx_hash)
589 },
590 }
591 }
592
593 PruneStatus { pruned, failed, promoted }
594 }
595
596 pub fn status(&self) -> PoolStatus {
598 PoolStatus {
599 ready: self.ready.len(),
600 ready_bytes: self.ready.bytes(),
601 future: self.future.len(),
602 future_bytes: self.future.bytes(),
603 }
604 }
605}
606
/// Upper bounds for one queue of the pool.
#[derive(Debug, Clone)]
pub struct Limit {
    /// Maximum number of transactions.
    pub count: usize,
    /// Maximum total size in bytes.
    pub total_bytes: usize,
}

impl Limit {
    /// Returns `true` when `count` or `bytes` is strictly greater than its bound.
    pub fn is_exceeded(&self, count: usize, bytes: usize) -> bool {
        let too_many = count > self.count;
        let too_large = bytes > self.total_bytes;
        too_many || too_large
    }
}
622
623#[cfg(test)]
624mod tests {
625 use super::*;
626
627 type Hash = u64;
628
629 fn pool() -> BasePool<Hash, Vec<u8>> {
630 BasePool::default()
631 }
632
633 fn default_tx() -> Transaction<Hash, Vec<u8>> {
634 Transaction {
635 data: vec![],
636 bytes: 1,
637 hash: 1u64,
638 priority: 5u64,
639 valid_till: 64u64,
640 requires: vec![],
641 provides: vec![],
642 propagate: true,
643 source: TimedTransactionSource::new_external(false),
644 }
645 }
646
647 #[test]
648 fn prune_for_ready_works() {
649 let mut pool = pool();
651
652 pool.import(Transaction {
654 data: vec![1u8].into(),
655 provides: vec![vec![2]],
656 ..default_tx().clone()
657 })
658 .unwrap();
659
660 assert_eq!(pool.ready().count(), 1);
662 assert_eq!(pool.ready.len(), 1);
663
664 let result = pool.prune_tags(vec![vec![2]]);
665 assert_eq!(pool.ready().count(), 0);
666 assert_eq!(pool.ready.len(), 0);
667 assert_eq!(result.pruned.len(), 1);
668 assert_eq!(result.failed.len(), 0);
669 assert_eq!(result.promoted.len(), 0);
670 }
671
672 #[test]
673 fn prune_for_future_works() {
674 let mut pool = pool();
676
677 pool.import(Transaction {
679 data: vec![1u8].into(),
680 requires: vec![vec![1]],
681 provides: vec![vec![2]],
682 hash: 0xaa,
683 ..default_tx().clone()
684 })
685 .unwrap();
686
687 assert_eq!(pool.futures().count(), 1);
689 assert_eq!(pool.future.len(), 1);
690
691 let result = pool.prune_tags(vec![vec![2]]);
692 assert_eq!(pool.ready().count(), 0);
693 assert_eq!(pool.ready.len(), 0);
694 assert_eq!(pool.futures().count(), 0);
695 assert_eq!(pool.future.len(), 0);
696
697 assert_eq!(result.pruned.len(), 0);
698 assert_eq!(result.failed.len(), 1);
699 assert_eq!(result.failed[0], 0xaa);
700 assert_eq!(result.promoted.len(), 0);
701 }
702
703 #[test]
704 fn should_import_transaction_to_ready() {
705 let mut pool = pool();
707
708 pool.import(Transaction {
710 data: vec![1u8].into(),
711 provides: vec![vec![1]],
712 ..default_tx().clone()
713 })
714 .unwrap();
715
716 assert_eq!(pool.ready().count(), 1);
718 assert_eq!(pool.ready.len(), 1);
719 }
720
721 #[test]
722 fn should_not_import_same_transaction_twice() {
723 let mut pool = pool();
725
726 pool.import(Transaction {
728 data: vec![1u8].into(),
729 provides: vec![vec![1]],
730 ..default_tx().clone()
731 })
732 .unwrap();
733 pool.import(Transaction {
734 data: vec![1u8].into(),
735 provides: vec![vec![1]],
736 ..default_tx().clone()
737 })
738 .unwrap_err();
739
740 assert_eq!(pool.ready().count(), 1);
742 assert_eq!(pool.ready.len(), 1);
743 }
744
745 #[test]
746 fn should_import_transaction_to_future_and_promote_it_later() {
747 let mut pool = pool();
749
750 pool.import(Transaction {
752 data: vec![1u8].into(),
753 requires: vec![vec![0]],
754 provides: vec![vec![1]],
755 ..default_tx().clone()
756 })
757 .unwrap();
758 assert_eq!(pool.ready().count(), 0);
759 assert_eq!(pool.ready.len(), 0);
760 pool.import(Transaction {
761 data: vec![2u8].into(),
762 hash: 2,
763 provides: vec![vec![0]],
764 ..default_tx().clone()
765 })
766 .unwrap();
767
768 assert_eq!(pool.ready().count(), 2);
770 assert_eq!(pool.ready.len(), 2);
771 }
772
773 #[test]
774 fn should_promote_a_subgraph() {
775 let mut pool = pool();
777
778 pool.import(Transaction {
780 data: vec![1u8].into(),
781 requires: vec![vec![0]],
782 provides: vec![vec![1]],
783 ..default_tx().clone()
784 })
785 .unwrap();
786 pool.import(Transaction {
787 data: vec![3u8].into(),
788 hash: 3,
789 requires: vec![vec![2]],
790 ..default_tx().clone()
791 })
792 .unwrap();
793 pool.import(Transaction {
794 data: vec![2u8].into(),
795 hash: 2,
796 requires: vec![vec![1]],
797 provides: vec![vec![3], vec![2]],
798 ..default_tx().clone()
799 })
800 .unwrap();
801 pool.import(Transaction {
802 data: vec![4u8].into(),
803 hash: 4,
804 priority: 1_000u64,
805 requires: vec![vec![3], vec![4]],
806 ..default_tx().clone()
807 })
808 .unwrap();
809 assert_eq!(pool.ready().count(), 0);
810 assert_eq!(pool.ready.len(), 0);
811
812 let res = pool
813 .import(Transaction {
814 data: vec![5u8].into(),
815 hash: 5,
816 provides: vec![vec![0], vec![4]],
817 ..default_tx().clone()
818 })
819 .unwrap();
820
821 let mut it = pool.ready().into_iter().map(|tx| tx.data[0]);
823
824 assert_eq!(it.next(), Some(5));
825 assert_eq!(it.next(), Some(1));
826 assert_eq!(it.next(), Some(2));
827 assert_eq!(it.next(), Some(4));
828 assert_eq!(it.next(), Some(3));
829 assert_eq!(it.next(), None);
830 assert_eq!(
831 res,
832 Imported::Ready {
833 hash: 5,
834 promoted: vec![1, 2, 3, 4],
835 failed: vec![],
836 removed: vec![],
837 }
838 );
839 }
840
841 #[test]
842 fn should_remove_conflicting_future() {
843 let mut pool = pool();
844 pool.import(Transaction {
845 data: vec![3u8].into(),
846 hash: 3,
847 requires: vec![vec![1]],
848 priority: 50u64,
849 provides: vec![vec![3]],
850 ..default_tx().clone()
851 })
852 .unwrap();
853 assert_eq!(pool.ready().count(), 0);
854 assert_eq!(pool.ready.len(), 0);
855
856 let tx2 = Transaction {
857 data: vec![2u8].into(),
858 hash: 2,
859 requires: vec![vec![1]],
860 provides: vec![vec![3]],
861 ..default_tx().clone()
862 };
863 pool.import(tx2.clone()).unwrap();
864 assert_eq!(pool.future.len(), 2);
865
866 let res = pool
867 .import(Transaction {
868 data: vec![1u8].into(),
869 hash: 1,
870 provides: vec![vec![1]],
871 ..default_tx().clone()
872 })
873 .unwrap();
874
875 assert_eq!(
876 res,
877 Imported::Ready {
878 hash: 1,
879 promoted: vec![3],
880 failed: vec![],
881 removed: vec![tx2.into()]
882 }
883 );
884
885 let mut it = pool.ready().into_iter().map(|tx| tx.data[0]);
886 assert_eq!(it.next(), Some(1));
887 assert_eq!(it.next(), Some(3));
888 assert_eq!(it.next(), None);
889
890 assert_eq!(pool.future.len(), 0);
891 }
892
893 #[test]
894 fn should_handle_a_cycle() {
895 let mut pool = pool();
897 pool.import(Transaction {
898 data: vec![1u8].into(),
899 requires: vec![vec![0]],
900 provides: vec![vec![1]],
901 ..default_tx().clone()
902 })
903 .unwrap();
904 pool.import(Transaction {
905 data: vec![3u8].into(),
906 hash: 3,
907 requires: vec![vec![1]],
908 provides: vec![vec![2]],
909 ..default_tx().clone()
910 })
911 .unwrap();
912 assert_eq!(pool.ready().count(), 0);
913 assert_eq!(pool.ready.len(), 0);
914
915 let tx2 = Transaction {
917 data: vec![2u8].into(),
918 hash: 2,
919 requires: vec![vec![2]],
920 provides: vec![vec![0]],
921 ..default_tx().clone()
922 };
923 pool.import(tx2.clone()).unwrap();
924
925 {
927 let mut it = pool.ready().into_iter().map(|tx| tx.data[0]);
928 assert_eq!(it.next(), None);
929 }
930 assert_eq!(pool.future.len(), 3);
932
933 let res = pool
935 .import(Transaction {
936 data: vec![4u8].into(),
937 hash: 4,
938 priority: 50u64,
939 provides: vec![vec![0]],
940 ..default_tx().clone()
941 })
942 .unwrap();
943 let mut it = pool.ready().into_iter().map(|tx| tx.data[0]);
944 assert_eq!(it.next(), Some(4));
945 assert_eq!(it.next(), Some(1));
946 assert_eq!(it.next(), Some(3));
947 assert_eq!(it.next(), None);
948 assert_eq!(
949 res,
950 Imported::Ready {
951 hash: 4,
952 promoted: vec![1, 3],
953 failed: vec![],
954 removed: vec![tx2.into()]
955 }
956 );
957 assert_eq!(pool.future.len(), 0);
958 }
959
960 #[test]
961 fn should_handle_a_cycle_with_low_priority() {
962 let mut pool = pool();
964 pool.import(Transaction {
965 data: vec![1u8].into(),
966 requires: vec![vec![0]],
967 provides: vec![vec![1]],
968 ..default_tx().clone()
969 })
970 .unwrap();
971 pool.import(Transaction {
972 data: vec![3u8].into(),
973 hash: 3,
974 requires: vec![vec![1]],
975 provides: vec![vec![2]],
976 ..default_tx().clone()
977 })
978 .unwrap();
979 assert_eq!(pool.ready().count(), 0);
980 assert_eq!(pool.ready.len(), 0);
981
982 pool.import(Transaction {
984 data: vec![2u8].into(),
985 hash: 2,
986 requires: vec![vec![2]],
987 provides: vec![vec![0]],
988 ..default_tx().clone()
989 })
990 .unwrap();
991
992 {
994 let mut it = pool.ready().into_iter().map(|tx| tx.data[0]);
995 assert_eq!(it.next(), None);
996 }
997 assert_eq!(pool.future.len(), 3);
999
1000 let err = pool
1002 .import(Transaction {
1003 data: vec![4u8].into(),
1004 hash: 4,
1005 priority: 1u64, provides: vec![vec![0]],
1007 ..default_tx().clone()
1008 })
1009 .unwrap_err();
1010 let mut it = pool.ready().into_iter().map(|tx| tx.data[0]);
1011 assert_eq!(it.next(), None);
1012 assert_eq!(pool.ready.len(), 0);
1013 assert_eq!(pool.future.len(), 0);
1014 if let error::Error::CycleDetected = err {
1015 } else {
1016 assert!(false, "Invalid error kind: {:?}", err);
1017 }
1018 }
1019
1020 #[test]
1021 fn should_remove_invalid_transactions() {
1022 let mut pool = pool();
1024 pool.import(Transaction {
1025 data: vec![5u8].into(),
1026 hash: 5,
1027 provides: vec![vec![0], vec![4]],
1028 ..default_tx().clone()
1029 })
1030 .unwrap();
1031 pool.import(Transaction {
1032 data: vec![1u8].into(),
1033 requires: vec![vec![0]],
1034 provides: vec![vec![1]],
1035 ..default_tx().clone()
1036 })
1037 .unwrap();
1038 pool.import(Transaction {
1039 data: vec![3u8].into(),
1040 hash: 3,
1041 requires: vec![vec![2]],
1042 ..default_tx().clone()
1043 })
1044 .unwrap();
1045 pool.import(Transaction {
1046 data: vec![2u8].into(),
1047 hash: 2,
1048 requires: vec![vec![1]],
1049 provides: vec![vec![3], vec![2]],
1050 ..default_tx().clone()
1051 })
1052 .unwrap();
1053 pool.import(Transaction {
1054 data: vec![4u8].into(),
1055 hash: 4,
1056 priority: 1_000u64,
1057 requires: vec![vec![3], vec![4]],
1058 ..default_tx().clone()
1059 })
1060 .unwrap();
1061 pool.import(Transaction {
1063 data: vec![6u8].into(),
1064 hash: 6,
1065 priority: 1_000u64,
1066 requires: vec![vec![11]],
1067 ..default_tx().clone()
1068 })
1069 .unwrap();
1070 assert_eq!(pool.ready().count(), 5);
1071 assert_eq!(pool.future.len(), 1);
1072
1073 pool.remove_subtree(&[6, 1]);
1075
1076 assert_eq!(pool.ready().count(), 1);
1078 assert_eq!(pool.future.len(), 0);
1079 }
1080
1081 #[test]
1082 fn should_prune_ready_transactions() {
1083 let mut pool = pool();
1085 pool.import(Transaction {
1087 data: vec![5u8].into(),
1088 hash: 5,
1089 requires: vec![vec![0]],
1090 provides: vec![vec![100]],
1091 ..default_tx().clone()
1092 })
1093 .unwrap();
1094 pool.import(Transaction {
1096 data: vec![1u8].into(),
1097 provides: vec![vec![1]],
1098 ..default_tx().clone()
1099 })
1100 .unwrap();
1101 pool.import(Transaction {
1102 data: vec![2u8].into(),
1103 hash: 2,
1104 requires: vec![vec![2]],
1105 provides: vec![vec![3]],
1106 ..default_tx().clone()
1107 })
1108 .unwrap();
1109 pool.import(Transaction {
1110 data: vec![3u8].into(),
1111 hash: 3,
1112 requires: vec![vec![1]],
1113 provides: vec![vec![2]],
1114 ..default_tx().clone()
1115 })
1116 .unwrap();
1117 pool.import(Transaction {
1118 data: vec![4u8].into(),
1119 hash: 4,
1120 priority: 1_000u64,
1121 requires: vec![vec![3], vec![2]],
1122 provides: vec![vec![4]],
1123 ..default_tx().clone()
1124 })
1125 .unwrap();
1126
1127 assert_eq!(pool.ready().count(), 4);
1128 assert_eq!(pool.future.len(), 1);
1129
1130 let result = pool.prune_tags(vec![vec![0], vec![2]]);
1132
1133 assert_eq!(result.pruned.len(), 2);
1135 assert_eq!(result.failed.len(), 0);
1136 assert_eq!(
1137 result.promoted[0],
1138 Imported::Ready { hash: 5, promoted: vec![], failed: vec![], removed: vec![] }
1139 );
1140 assert_eq!(result.promoted.len(), 1);
1141 assert_eq!(pool.future.len(), 0);
1142 assert_eq!(pool.ready.len(), 3);
1143 assert_eq!(pool.ready().count(), 3);
1144 }
1145
1146 #[test]
1147 fn transaction_debug() {
1148 assert_eq!(
1149 format!(
1150 "{:?}",
1151 Transaction {
1152 data: vec![4u8].into(),
1153 hash: 4,
1154 priority: 1_000u64,
1155 requires: vec![vec![3], vec![2]],
1156 provides: vec![vec![4]],
1157 ..default_tx().clone()
1158 }
1159 ),
1160 "Transaction { \
1161hash: 4, priority: 1000, valid_till: 64, bytes: 1, propagate: true, \
1162source: TimedTransactionSource { source: External, timestamp: None }, requires: [03, 02], provides: [04], data: [4]}"
1163 .to_owned()
1164 );
1165 }
1166
1167 #[test]
1168 fn transaction_propagation() {
1169 assert_eq!(
1170 Transaction {
1171 data: vec![4u8].into(),
1172 hash: 4,
1173 priority: 1_000u64,
1174 requires: vec![vec![3], vec![2]],
1175 provides: vec![vec![4]],
1176 ..default_tx().clone()
1177 }
1178 .is_propagable(),
1179 true
1180 );
1181
1182 assert_eq!(
1183 Transaction {
1184 data: vec![4u8].into(),
1185 hash: 4,
1186 priority: 1_000u64,
1187 requires: vec![vec![3], vec![2]],
1188 provides: vec![vec![4]],
1189 propagate: false,
1190 ..default_tx().clone()
1191 }
1192 .is_propagable(),
1193 false
1194 );
1195 }
1196
1197 #[test]
1198 fn should_reject_future_transactions() {
1199 let mut pool = pool();
1201
1202 pool.reject_future_transactions = true;
1204
1205 let err = pool.import(Transaction {
1207 data: vec![5u8].into(),
1208 hash: 5,
1209 requires: vec![vec![0]],
1210 ..default_tx().clone()
1211 });
1212
1213 if let Err(error::Error::RejectedFutureTransaction) = err {
1214 } else {
1215 assert!(false, "Invalid error kind: {:?}", err);
1216 }
1217 }
1218
1219 #[test]
1220 fn should_clear_future_queue() {
1221 let mut pool = pool();
1223
1224 pool.import(Transaction {
1226 data: vec![5u8].into(),
1227 hash: 5,
1228 requires: vec![vec![0]],
1229 ..default_tx().clone()
1230 })
1231 .unwrap();
1232
1233 assert_eq!(pool.future.len(), 1);
1235
1236 assert_eq!(pool.clear_future().len(), 1);
1238
1239 assert_eq!(pool.future.len(), 0);
1241 }
1242
1243 #[test]
1244 fn should_accept_future_transactions_when_explicitly_asked_to() {
1245 let mut pool = pool();
1247 pool.reject_future_transactions = true;
1248
1249 let flag_value = pool.with_futures_enabled(|pool, flag| {
1251 pool.import(Transaction {
1252 data: vec![5u8].into(),
1253 hash: 5,
1254 requires: vec![vec![0]],
1255 ..default_tx().clone()
1256 })
1257 .unwrap();
1258
1259 flag
1260 });
1261
1262 assert_eq!(flag_value, true);
1264 assert_eq!(pool.reject_future_transactions, true);
1265 assert_eq!(pool.future.len(), 1);
1266 }
1267}