1use ckb_async_runtime::{
4 Handle,
5 tokio::{self, task::JoinHandle},
6};
7use ckb_logger::info;
8use ckb_notify::NotifyController;
9use ckb_stop_handler::{CancellationToken, new_tokio_exit_rx};
10use ckb_types::{core::TransactionView, packed::OutPoint};
11
12use std::collections::HashSet;
13use std::sync::{Arc, RwLock};
14
15const SUBSCRIBER_NAME: &str = "Indexer";
16
17#[derive(Default)]
20pub struct Pool {
21 dead_cells: HashSet<OutPoint>,
22}
23
24impl Pool {
25 pub fn transaction_committed(&mut self, tx: &TransactionView) {
27 for input in tx.inputs() {
28 self.dead_cells.remove(&input.previous_output());
29 }
30 }
31
32 pub fn transaction_rejected(&mut self, tx: &TransactionView) {
34 for input in tx.inputs() {
35 self.dead_cells.remove(&input.previous_output());
36 }
37 }
38
39 pub fn new_transaction(&mut self, tx: &TransactionView) {
41 for input in tx.inputs() {
42 self.dead_cells.insert(input.previous_output());
43 }
44 }
45
46 pub fn is_consumed_by_pool_tx(&self, out_point: &OutPoint) -> bool {
48 self.dead_cells.contains(out_point)
49 }
50
51 pub fn transactions_committed(&mut self, txs: &[TransactionView]) {
53 for tx in txs {
54 self.transaction_committed(tx);
55 }
56 }
57
58 pub fn dead_cells(&self) -> impl Iterator<Item = &OutPoint> {
60 self.dead_cells.iter()
61 }
62}
63
64#[derive(Clone)]
66pub struct PoolService {
67 pool: Option<Arc<RwLock<Pool>>>,
68 async_handle: Handle,
69 is_index_tx_pool_called: bool,
70}
71
72impl PoolService {
73 pub fn new(index_tx_pool: bool, async_handle: Handle) -> Self {
75 let pool = if index_tx_pool {
76 Some(Arc::new(RwLock::new(Pool::default())))
77 } else {
78 None
79 };
80
81 Self {
82 pool,
83 async_handle,
84 is_index_tx_pool_called: false,
85 }
86 }
87
88 pub fn pool(&self) -> Option<Arc<RwLock<Pool>>> {
90 self.pool.clone()
91 }
92
93 pub fn index_tx_pool(
95 &mut self,
96 notify_controller: NotifyController,
97 check_index_tx_pool_ready: JoinHandle<()>,
98 ) {
99 if self.is_index_tx_pool_called {
100 return;
101 }
102 self.is_index_tx_pool_called = true;
103
104 let service = self.clone();
105 let stop: CancellationToken = new_tokio_exit_rx();
106
107 self.async_handle.spawn(async move {
108 let _check_index_tx_pool_ready = check_index_tx_pool_ready.await;
109 if stop.is_cancelled() {
110 info!("Indexer received exit signal, cancel subscribe_new_transaction task, exit now");
111 return;
112 }
113
114 info!("check_index_tx_pool_ready finished");
115
116 let mut new_transaction_receiver = notify_controller
117 .subscribe_new_transaction(SUBSCRIBER_NAME.to_string())
118 .await;
119 let mut reject_transaction_receiver = notify_controller
120 .subscribe_reject_transaction(SUBSCRIBER_NAME.to_string())
121 .await;
122
123 loop {
124 tokio::select! {
125 Some(tx_entry) = new_transaction_receiver.recv() => {
126 if let Some(pool) = service.pool.as_ref() {
127 pool.write().expect("acquire lock").new_transaction(&tx_entry.transaction);
128 }
129 }
130 Some((tx_entry, _reject)) = reject_transaction_receiver.recv() => {
131 if let Some(pool) = service.pool.as_ref() {
132 pool.write()
133 .expect("acquire lock")
134 .transaction_rejected(&tx_entry.transaction);
135 }
136 }
137 _ = stop.cancelled() => {
138 info!("index_tx_pool received exit signal, exit now");
139 break
140 },
141 else => break,
142 }
143 }
144 });
145 }
146}