otx_pool/pool/
mod.rs

1use crate::error::InnerResult;
2use crate::notify::NotifyController;
3
4use otx_format::{
5    jsonrpc_types::OpenTransaction,
6    types::{packed, OpenTxStatus, OpenTxWithStatus},
7};
8
9use ckb_jsonrpc_types::JsonBytes;
10use ckb_types::{prelude::Entity, H256};
11use dashmap::mapref::entry::Entry;
12use dashmap::DashMap;
13
14pub struct OtxPool {
15    raw_otxs: DashMap<H256, OpenTxWithStatus>,
16    sent_txs: DashMap<H256, Vec<H256>>,
17    notify_ctrl: NotifyController,
18}
19
20impl OtxPool {
21    pub fn new(notify_ctrl: NotifyController) -> Self {
22        let raw_otxs = DashMap::new();
23        let sent_txs = DashMap::new();
24        OtxPool {
25            raw_otxs,
26            sent_txs,
27            notify_ctrl,
28        }
29    }
30
31    pub fn insert(&self, mut otx: OpenTransaction) -> InnerResult<H256> {
32        let tx_hash = otx.get_or_insert_otx_id()?;
33        match self.raw_otxs.entry(tx_hash.clone()) {
34            Entry::Vacant(entry) => {
35                entry.insert(OpenTxWithStatus::new(otx.clone()));
36                self.notify_ctrl.notify_new_open_tx(otx)
37            }
38            Entry::Occupied(_) => {}
39        };
40        Ok(tx_hash)
41    }
42
43    pub fn get_otx_by_id(&self, id: H256) -> Option<OpenTxWithStatus> {
44        self.raw_otxs.get(&id).map(|pair| pair.value().clone())
45    }
46
47    pub fn update_otx_status(&self, id: &H256, status: OpenTxStatus) {
48        if let Some(mut otx) = self.raw_otxs.get_mut(id) {
49            otx.status = status;
50        }
51    }
52
53    pub fn insert_sent_tx(&self, tx_hash: H256, otx_hashes: Vec<H256>) {
54        self.sent_txs.insert(tx_hash, otx_hashes);
55    }
56
57    pub fn get_otxs_by_merged_otx_id(&self, id: &H256) -> Vec<OpenTxWithStatus> {
58        self.raw_otxs
59            .iter()
60            .filter(|pair| {
61                if let OpenTxStatus::Merged(merged_otx_id) = &pair.value().status {
62                    merged_otx_id == id
63                } else {
64                    false
65                }
66            })
67            .map(|pair| pair.value().clone())
68            .collect()
69    }
70}
71
72fn _parse_otx(otx: JsonBytes) -> InnerResult<OpenTransaction> {
73    let r = packed::OpenTransaction::from_slice(otx.as_bytes());
74    r.map(Into::into).map_err(Into::into)
75}