1use crate::error::InnerResult;
2use crate::notify::NotifyController;
3
4use otx_format::{
5 jsonrpc_types::OpenTransaction,
6 types::{packed, OpenTxStatus, OpenTxWithStatus},
7};
8
9use ckb_jsonrpc_types::JsonBytes;
10use ckb_types::{prelude::Entity, H256};
11use dashmap::mapref::entry::Entry;
12use dashmap::DashMap;
13
14pub struct OtxPool {
15 raw_otxs: DashMap<H256, OpenTxWithStatus>,
16 sent_txs: DashMap<H256, Vec<H256>>,
17 notify_ctrl: NotifyController,
18}
19
20impl OtxPool {
21 pub fn new(notify_ctrl: NotifyController) -> Self {
22 let raw_otxs = DashMap::new();
23 let sent_txs = DashMap::new();
24 OtxPool {
25 raw_otxs,
26 sent_txs,
27 notify_ctrl,
28 }
29 }
30
31 pub fn insert(&self, mut otx: OpenTransaction) -> InnerResult<H256> {
32 let tx_hash = otx.get_or_insert_otx_id()?;
33 match self.raw_otxs.entry(tx_hash.clone()) {
34 Entry::Vacant(entry) => {
35 entry.insert(OpenTxWithStatus::new(otx.clone()));
36 self.notify_ctrl.notify_new_open_tx(otx)
37 }
38 Entry::Occupied(_) => {}
39 };
40 Ok(tx_hash)
41 }
42
43 pub fn get_otx_by_id(&self, id: H256) -> Option<OpenTxWithStatus> {
44 self.raw_otxs.get(&id).map(|pair| pair.value().clone())
45 }
46
47 pub fn update_otx_status(&self, id: &H256, status: OpenTxStatus) {
48 if let Some(mut otx) = self.raw_otxs.get_mut(id) {
49 otx.status = status;
50 }
51 }
52
53 pub fn insert_sent_tx(&self, tx_hash: H256, otx_hashes: Vec<H256>) {
54 self.sent_txs.insert(tx_hash, otx_hashes);
55 }
56
57 pub fn get_otxs_by_merged_otx_id(&self, id: &H256) -> Vec<OpenTxWithStatus> {
58 self.raw_otxs
59 .iter()
60 .filter(|pair| {
61 if let OpenTxStatus::Merged(merged_otx_id) = &pair.value().status {
62 merged_otx_id == id
63 } else {
64 false
65 }
66 })
67 .map(|pair| pair.value().clone())
68 .collect()
69 }
70}
71
72fn _parse_otx(otx: JsonBytes) -> InnerResult<OpenTransaction> {
73 let r = packed::OpenTransaction::from_slice(otx.as_bytes());
74 r.map(Into::into).map_err(Into::into)
75}