1use log::{trace, warn};
10use std::collections::{hash_map, BTreeSet, HashMap};
11use std::slice;
12use std::sync::Arc;
13
14use crate::{
15 error,
16 listener::{Listener, NoopListener},
17 options::Options,
18 ready::{Readiness, Ready},
19 replace::{ReplaceTransaction, ShouldReplace},
20 scoring::{self, ScoreWithRef, Scoring},
21 status::{LightStatus, Status},
22 transactions::{AddResult, Transactions},
23 VerifiedTransaction,
24};
25
26#[derive(Debug)]
32pub struct Transaction<T> {
33 pub insertion_id: u64,
35 pub transaction: Arc<T>,
37}
38
39impl<T> Clone for Transaction<T> {
40 fn clone(&self) -> Self {
41 Transaction { insertion_id: self.insertion_id, transaction: self.transaction.clone() }
42 }
43}
44
45impl<T> ::std::ops::Deref for Transaction<T> {
46 type Target = Arc<T>;
47
48 fn deref(&self) -> &Self::Target {
49 &self.transaction
50 }
51}
52
53#[derive(Debug)]
55pub struct Pool<T: VerifiedTransaction, S: Scoring<T>, L = NoopListener> {
56 listener: L,
57 scoring: S,
58 options: Options,
59 mem_usage: usize,
60
61 transactions: HashMap<T::Sender, Transactions<T, S>>,
62 by_hash: HashMap<T::Hash, Transaction<T>>,
63
64 best_transactions: BTreeSet<ScoreWithRef<T, S::Score>>,
65 worst_transactions: BTreeSet<ScoreWithRef<T, S::Score>>,
66
67 insertion_id: u64,
68}
69
70impl<T: VerifiedTransaction, S: Scoring<T> + Default> Default for Pool<T, S> {
71 fn default() -> Self {
72 Self::with_scoring(S::default(), Options::default())
73 }
74}
75
76impl<T: VerifiedTransaction, S: Scoring<T> + Default> Pool<T, S> {
77 pub fn with_options(options: Options) -> Self {
80 Self::with_scoring(S::default(), options)
81 }
82}
83
84impl<T: VerifiedTransaction, S: Scoring<T>> Pool<T, S> {
85 pub fn with_scoring(scoring: S, options: Options) -> Self {
87 Self::new(NoopListener, scoring, options)
88 }
89}
90
91const INITIAL_NUMBER_OF_SENDERS: usize = 16;
92
93impl<T, S, L> Pool<T, S, L>
94where
95 T: VerifiedTransaction,
96 S: Scoring<T>,
97 L: Listener<T>,
98{
99 pub fn new(listener: L, scoring: S, options: Options) -> Self {
101 let transactions = HashMap::with_capacity(INITIAL_NUMBER_OF_SENDERS);
102 let by_hash = HashMap::with_capacity(options.max_count / 16);
103
104 Pool {
105 listener,
106 scoring,
107 options,
108 mem_usage: 0,
109 transactions,
110 by_hash,
111 best_transactions: Default::default(),
112 worst_transactions: Default::default(),
113 insertion_id: 0,
114 }
115 }
116
117 pub fn import(&mut self, transaction: T, replace: &dyn ShouldReplace<T>) -> error::Result<Arc<T>, T::Hash> {
131 let mem_usage = transaction.mem_usage();
132
133 if self.by_hash.contains_key(transaction.hash()) {
134 return Err(error::Error::AlreadyImported(transaction.hash().clone()));
135 }
136
137 self.insertion_id += 1;
138 let transaction = Transaction { insertion_id: self.insertion_id, transaction: Arc::new(transaction) };
139
140 {
143 let remove_worst = |s: &mut Self, transaction| match s.remove_worst(transaction, replace) {
144 Err(err) => {
145 s.listener.rejected(transaction, &err);
146 Err(err)
147 }
148 Ok(None) => Ok(false),
149 Ok(Some(removed)) => {
150 s.listener.dropped(&removed, Some(transaction));
151 s.finalize_remove(removed.hash());
152 Ok(true)
153 }
154 };
155
156 while self.by_hash.len() + 1 > self.options.max_count {
157 trace!("Count limit reached: {} > {}", self.by_hash.len() + 1, self.options.max_count);
158 if !remove_worst(self, &transaction)? {
159 break;
160 }
161 }
162
163 while self.mem_usage + mem_usage > self.options.max_mem_usage {
164 trace!("Mem limit reached: {} > {}", self.mem_usage + mem_usage, self.options.max_mem_usage);
165 if !remove_worst(self, &transaction)? {
166 break;
167 }
168 }
169 }
170
171 let (result, prev_state, current_state) = {
172 let transactions =
173 self.transactions.entry(transaction.sender().clone()).or_insert_with(Transactions::default);
174 let prev = transactions.worst_and_best();
176 let result = transactions.add(transaction, &self.scoring, self.options.max_per_sender);
177 let current = transactions.worst_and_best();
178 (result, prev, current)
179 };
180
181 self.update_senders_worst_and_best(prev_state, current_state);
183
184 match result {
185 AddResult::Ok(tx) => {
186 self.listener.added(&tx, None);
187 self.finalize_insert(&tx, None);
188 Ok(tx.transaction)
189 }
190 AddResult::PushedOut { new, old } | AddResult::Replaced { new, old } => {
191 self.listener.added(&new, Some(&old));
192 self.finalize_insert(&new, Some(&old));
193 Ok(new.transaction)
194 }
195 AddResult::TooCheap { new, old } => {
196 let error = error::Error::TooCheapToReplace(old.hash().clone(), new.hash().clone());
197 self.listener.rejected(&new, &error);
198 return Err(error);
199 }
200 AddResult::TooCheapToEnter(new, score) => {
201 let error = error::Error::TooCheapToEnter(new.hash().clone(), format!("{:#x}", score));
202 self.listener.rejected(&new, &error);
203 return Err(error);
204 }
205 }
206 }
207
208 fn finalize_insert(&mut self, new: &Transaction<T>, old: Option<&Transaction<T>>) {
210 self.mem_usage += new.mem_usage();
211 self.by_hash.insert(new.hash().clone(), new.clone());
212
213 if let Some(old) = old {
214 self.finalize_remove(old.hash());
215 }
216 }
217
218 fn finalize_remove(&mut self, hash: &T::Hash) -> Option<Arc<T>> {
220 self.by_hash.remove(hash).map(|old| {
221 self.mem_usage -= old.transaction.mem_usage();
222 old.transaction
223 })
224 }
225
226 fn update_senders_worst_and_best(
228 &mut self,
229 previous: Option<((S::Score, Transaction<T>), (S::Score, Transaction<T>))>,
230 current: Option<((S::Score, Transaction<T>), (S::Score, Transaction<T>))>,
231 ) {
232 let worst_collection = &mut self.worst_transactions;
233 let best_collection = &mut self.best_transactions;
234
235 let is_same =
236 |a: &(S::Score, Transaction<T>), b: &(S::Score, Transaction<T>)| a.0 == b.0 && a.1.hash() == b.1.hash();
237
238 let update = |collection: &mut BTreeSet<_>, (score, tx), remove| {
239 if remove {
240 collection.remove(&ScoreWithRef::new(score, tx));
241 } else {
242 collection.insert(ScoreWithRef::new(score, tx));
243 }
244 };
245
246 match (previous, current) {
247 (None, Some((worst, best))) => {
248 update(worst_collection, worst, false);
249 update(best_collection, best, false);
250 }
251 (Some((worst, best)), None) => {
252 self.transactions.remove(worst.1.sender());
255 update(worst_collection, worst, true);
256 update(best_collection, best, true);
257 }
258 (Some((w1, b1)), Some((w2, b2))) => {
259 if !is_same(&w1, &w2) {
260 update(worst_collection, w1, true);
261 update(worst_collection, w2, false);
262 }
263 if !is_same(&b1, &b2) {
264 update(best_collection, b1, true);
265 update(best_collection, b2, false);
266 }
267 }
268 (None, None) => {}
269 }
270 }
271
272 fn remove_worst(
277 &mut self,
278 transaction: &Transaction<T>,
279 replace: &dyn ShouldReplace<T>,
280 ) -> error::Result<Option<Transaction<T>>, T::Hash> {
281 let to_remove = match self.worst_transactions.iter().next_back() {
282 None => {
284 warn!("The pool is full but there are no transactions to remove.");
285 return Err(error::Error::TooCheapToEnter(transaction.hash().clone(), "unknown".into()));
286 }
287 Some(old) => {
288 let txs = &self.transactions;
289 let get_replace_tx = |tx| {
290 let sender_txs = txs.get(transaction.sender()).map(|txs| txs.iter().as_slice());
291 ReplaceTransaction::new(tx, sender_txs)
292 };
293 let old_replace = get_replace_tx(&old.transaction);
294 let new_replace = get_replace_tx(transaction);
295
296 match replace.should_replace(&old_replace, &new_replace) {
297 scoring::Choice::InsertNew => None,
299 scoring::Choice::ReplaceOld => Some(old.clone()),
301 scoring::Choice::RejectNew => {
303 return Err(error::Error::TooCheapToEnter(
304 transaction.hash().clone(),
305 format!("{:#x}", old.score),
306 ))
307 }
308 }
309 }
310 };
311
312 if let Some(to_remove) = to_remove {
313 self.remove_from_set(to_remove.transaction.sender(), |set, scoring| {
315 set.remove(&to_remove.transaction, scoring)
316 });
317
318 Ok(Some(to_remove.transaction))
319 } else {
320 Ok(None)
321 }
322 }
323
324 fn remove_from_set<R, F: FnOnce(&mut Transactions<T, S>, &S) -> R>(
326 &mut self,
327 sender: &T::Sender,
328 f: F,
329 ) -> Option<R> {
330 let (prev, next, result) = if let Some(set) = self.transactions.get_mut(sender) {
331 let prev = set.worst_and_best();
332 let result = f(set, &self.scoring);
333 (prev, set.worst_and_best(), result)
334 } else {
335 return None;
336 };
337
338 self.update_senders_worst_and_best(prev, next);
339 Some(result)
340 }
341
342 pub fn clear(&mut self) {
346 self.mem_usage = 0;
347 self.transactions.clear();
348 self.best_transactions.clear();
349 self.worst_transactions.clear();
350
351 for (_hash, tx) in self.by_hash.drain() {
352 self.listener.dropped(&tx.transaction, None)
353 }
354 }
355
356 pub fn remove(&mut self, hash: &T::Hash, is_invalid: bool) -> Option<Arc<T>> {
360 if let Some(tx) = self.finalize_remove(hash) {
361 self.remove_from_set(tx.sender(), |set, scoring| set.remove(&tx, scoring));
362 if is_invalid {
363 self.listener.invalid(&tx);
364 } else {
365 self.listener.canceled(&tx);
366 }
367 Some(tx)
368 } else {
369 None
370 }
371 }
372
373 fn remove_stalled<R: Ready<T>>(&mut self, sender: &T::Sender, ready: &mut R) -> usize {
375 let removed_from_set = self.remove_from_set(sender, |transactions, scoring| transactions.cull(ready, scoring));
376
377 match removed_from_set {
378 Some(removed) => {
379 let len = removed.len();
380 for tx in removed {
381 self.finalize_remove(tx.hash());
382 self.listener.culled(&tx);
383 }
384 len
385 }
386 None => 0,
387 }
388 }
389
390 pub fn cull<R: Ready<T>>(&mut self, senders: Option<&[T::Sender]>, mut ready: R) -> usize {
392 let mut removed = 0;
393 match senders {
394 Some(senders) => {
395 for sender in senders {
396 removed += self.remove_stalled(sender, &mut ready);
397 }
398 }
399 None => {
400 let senders = self.transactions.keys().cloned().collect::<Vec<_>>();
401 for sender in senders {
402 removed += self.remove_stalled(&sender, &mut ready);
403 }
404 }
405 }
406
407 removed
408 }
409
410 pub fn find(&self, hash: &T::Hash) -> Option<Arc<T>> {
412 self.by_hash.get(hash).map(|t| t.transaction.clone())
413 }
414
415 pub fn worst_transaction(&self) -> Option<Arc<T>> {
417 self.worst_transactions.iter().next_back().map(|x| x.transaction.transaction.clone())
418 }
419
420 pub fn is_full(&self) -> bool {
422 self.by_hash.len() >= self.options.max_count || self.mem_usage >= self.options.max_mem_usage
423 }
424
425 pub fn senders(&self) -> impl Iterator<Item = &T::Sender> {
427 self.best_transactions.iter().map(|tx| tx.transaction.sender())
428 }
429
430 pub fn pending<R: Ready<T>>(&self, ready: R) -> PendingIterator<'_, T, R, S, L> {
432 PendingIterator { ready, best_transactions: self.best_transactions.clone(), pool: self }
433 }
434
435 pub fn pending_from_sender<R: Ready<T>>(&self, ready: R, sender: &T::Sender) -> PendingIterator<'_, T, R, S, L> {
437 let best_transactions = self
438 .transactions
439 .get(sender)
440 .and_then(|transactions| transactions.worst_and_best())
441 .map(|(_, best)| ScoreWithRef::new(best.0, best.1))
442 .map(|s| {
443 let mut set = BTreeSet::new();
444 set.insert(s);
445 set
446 })
447 .unwrap_or_default();
448
449 PendingIterator { ready, best_transactions, pool: self }
450 }
451
452 pub fn unordered_pending<R: Ready<T>>(&self, ready: R) -> UnorderedIterator<'_, T, R, S> {
454 UnorderedIterator { ready, senders: self.transactions.iter(), transactions: None }
455 }
456
457 pub fn update_scores(&mut self, sender: &T::Sender, event: S::Event) {
459 let res = if let Some(set) = self.transactions.get_mut(sender) {
460 let prev = set.worst_and_best();
461 set.update_scores(&self.scoring, event);
462 let current = set.worst_and_best();
463 Some((prev, current))
464 } else {
465 None
466 };
467
468 if let Some((prev, current)) = res {
469 self.update_senders_worst_and_best(prev, current);
470 }
471 }
472
473 pub fn status<R: Ready<T>>(&self, mut ready: R) -> Status {
475 let mut status = Status::default();
476
477 for (_sender, transactions) in &self.transactions {
478 let len = transactions.len();
479 for (idx, tx) in transactions.iter().enumerate() {
480 match ready.is_ready(tx) {
481 Readiness::Stale => status.stalled += 1,
482 Readiness::Ready => status.pending += 1,
483 Readiness::Future => {
484 status.future += len - idx;
485 break;
486 }
487 }
488 }
489 }
490
491 status
492 }
493
494 pub fn light_status(&self) -> LightStatus {
496 LightStatus {
497 mem_usage: self.mem_usage,
498 transaction_count: self.by_hash.len(),
499 senders: self.transactions.len(),
500 }
501 }
502
503 pub fn options(&self) -> Options {
505 self.options.clone()
506 }
507
508 pub fn listener(&self) -> &L {
510 &self.listener
511 }
512
513 pub fn scoring(&self) -> &S {
515 &self.scoring
516 }
517
518 pub fn listener_mut(&mut self) -> &mut L {
520 &mut self.listener
521 }
522}
523
524pub struct UnorderedIterator<'a, T, R, S>
532where
533 T: VerifiedTransaction + 'a,
534 S: Scoring<T> + 'a,
535{
536 ready: R,
537 senders: hash_map::Iter<'a, T::Sender, Transactions<T, S>>,
538 transactions: Option<slice::Iter<'a, Transaction<T>>>,
539}
540
541impl<'a, T, R, S> Iterator for UnorderedIterator<'a, T, R, S>
542where
543 T: VerifiedTransaction,
544 R: Ready<T>,
545 S: Scoring<T>,
546{
547 type Item = Arc<T>;
548
549 fn next(&mut self) -> Option<Self::Item> {
550 loop {
551 if let Some(transactions) = self.transactions.as_mut() {
552 if let Some(tx) = transactions.next() {
553 match self.ready.is_ready(&tx) {
554 Readiness::Ready => {
555 return Some(tx.transaction.clone());
556 }
557 state => trace!("[{:?}] Ignoring {:?} transaction.", tx.hash(), state),
558 }
559 }
560 }
561
562 let next_sender = self.senders.next()?;
564 self.transactions = Some(next_sender.1.iter());
565 }
566 }
567}
568
569pub struct PendingIterator<'a, T, R, S, L>
573where
574 T: VerifiedTransaction + 'a,
575 S: Scoring<T> + 'a,
576 L: 'a,
577{
578 ready: R,
579 best_transactions: BTreeSet<ScoreWithRef<T, S::Score>>,
580 pool: &'a Pool<T, S, L>,
581}
582
583impl<'a, T, R, S, L> Iterator for PendingIterator<'a, T, R, S, L>
584where
585 T: VerifiedTransaction,
586 R: Ready<T>,
587 S: Scoring<T>,
588{
589 type Item = Arc<T>;
590
591 fn next(&mut self) -> Option<Self::Item> {
592 while !self.best_transactions.is_empty() {
593 let best = {
594 let best = self.best_transactions.iter().next().expect("current_best is not empty; qed").clone();
595 self.best_transactions.take(&best).expect("Just taken from iterator; qed")
596 };
597
598 let tx_state = self.ready.is_ready(&best.transaction);
599 match tx_state {
601 Readiness::Ready | Readiness::Stale => {
602 let next = self
604 .pool
605 .transactions
606 .get(best.transaction.sender())
607 .and_then(|s| s.find_next(&best.transaction, &self.pool.scoring));
608 if let Some((score, tx)) = next {
609 self.best_transactions.insert(ScoreWithRef::new(score, tx));
610 }
611 }
612 _ => (),
613 }
614
615 if tx_state == Readiness::Ready {
616 return Some(best.transaction.transaction);
617 }
618
619 trace!("[{:?}] Ignoring {:?} transaction.", best.transaction.hash(), tx_state);
620 }
621
622 None
623 }
624}