ckb_indexer_sync/
pool.rs

1//! An overlay to index the pending txs in the ckb tx pool
2
3use ckb_async_runtime::{
4    Handle,
5    tokio::{self, task::JoinHandle},
6};
7use ckb_logger::info;
8use ckb_notify::NotifyController;
9use ckb_stop_handler::{CancellationToken, new_tokio_exit_rx};
10use ckb_types::{core::TransactionView, packed::OutPoint};
11
12use std::collections::HashSet;
13use std::sync::{Arc, RwLock};
14
15const SUBSCRIBER_NAME: &str = "Indexer";
16
17/// An overlay to index the pending txs in the ckb tx pool,
18/// currently only supports removals of dead cells from the pending txs
19#[derive(Default)]
20pub struct Pool {
21    dead_cells: HashSet<OutPoint>,
22}
23
24impl Pool {
25    /// the tx has been committed in a block, it should be removed from pending dead cells
26    pub fn transaction_committed(&mut self, tx: &TransactionView) {
27        for input in tx.inputs() {
28            self.dead_cells.remove(&input.previous_output());
29        }
30    }
31
32    /// the tx has been rejected for some reason, it should be removed from pending dead cells
33    pub fn transaction_rejected(&mut self, tx: &TransactionView) {
34        for input in tx.inputs() {
35            self.dead_cells.remove(&input.previous_output());
36        }
37    }
38
39    /// a new tx is submitted to the pool, mark its inputs as dead cells
40    pub fn new_transaction(&mut self, tx: &TransactionView) {
41        for input in tx.inputs() {
42            self.dead_cells.insert(input.previous_output());
43        }
44    }
45
46    /// Return weather out_point referred cell consumed by pooled transaction
47    pub fn is_consumed_by_pool_tx(&self, out_point: &OutPoint) -> bool {
48        self.dead_cells.contains(out_point)
49    }
50
51    /// the txs has been committed in a block, it should be removed from pending dead cells
52    pub fn transactions_committed(&mut self, txs: &[TransactionView]) {
53        for tx in txs {
54            self.transaction_committed(tx);
55        }
56    }
57
58    /// return all dead cells
59    pub fn dead_cells(&self) -> impl Iterator<Item = &OutPoint> {
60        self.dead_cells.iter()
61    }
62}
63
64/// Pool service
65#[derive(Clone)]
66pub struct PoolService {
67    pool: Option<Arc<RwLock<Pool>>>,
68    async_handle: Handle,
69    is_index_tx_pool_called: bool,
70}
71
72impl PoolService {
73    /// Construct new Pool service instance
74    pub fn new(index_tx_pool: bool, async_handle: Handle) -> Self {
75        let pool = if index_tx_pool {
76            Some(Arc::new(RwLock::new(Pool::default())))
77        } else {
78            None
79        };
80
81        Self {
82            pool,
83            async_handle,
84            is_index_tx_pool_called: false,
85        }
86    }
87
88    /// Get the inner pool
89    pub fn pool(&self) -> Option<Arc<RwLock<Pool>>> {
90        self.pool.clone()
91    }
92
93    /// Processes that handle index pool transaction and expect to be spawned to run in tokio runtime
94    pub fn index_tx_pool(
95        &mut self,
96        notify_controller: NotifyController,
97        check_index_tx_pool_ready: JoinHandle<()>,
98    ) {
99        if self.is_index_tx_pool_called {
100            return;
101        }
102        self.is_index_tx_pool_called = true;
103
104        let service = self.clone();
105        let stop: CancellationToken = new_tokio_exit_rx();
106
107        self.async_handle.spawn(async move {
108            let _check_index_tx_pool_ready = check_index_tx_pool_ready.await;
109            if stop.is_cancelled() {
110                info!("Indexer received exit signal, cancel subscribe_new_transaction task, exit now");
111                return;
112            }
113
114            info!("check_index_tx_pool_ready finished");
115
116            let mut new_transaction_receiver = notify_controller
117                .subscribe_new_transaction(SUBSCRIBER_NAME.to_string())
118                .await;
119            let mut reject_transaction_receiver = notify_controller
120                .subscribe_reject_transaction(SUBSCRIBER_NAME.to_string())
121                .await;
122
123            loop {
124                tokio::select! {
125                    Some(tx_entry) = new_transaction_receiver.recv() => {
126                        if let Some(pool) = service.pool.as_ref() {
127                            pool.write().expect("acquire lock").new_transaction(&tx_entry.transaction);
128                        }
129                    }
130                    Some((tx_entry, _reject)) = reject_transaction_receiver.recv() => {
131                        if let Some(pool) = service.pool.as_ref() {
132                            pool.write()
133                            .expect("acquire lock")
134                            .transaction_rejected(&tx_entry.transaction);
135                        }
136                    }
137                    _ = stop.cancelled() => {
138                        info!("index_tx_pool received exit signal, exit now");
139                        break
140                    },
141                    else => break,
142                }
143            }
144        });
145    }
146}