otx_pool_plugin_atomic_swap/
lib.rs

1pub mod rpc;
2
3use otx_format::jsonrpc_types::OpenTransaction;
4use otx_format::types::PaymentAmount;
5use otx_pool_config::{CkbConfig, ScriptConfig};
6use otx_pool_plugin_protocol::{
7    HostServiceHandler, MessageFromHost, MessageFromPlugin, Plugin, PluginInfo, PluginMeta,
8};
9use otx_sdk::build_tx::send_tx;
10use otx_sdk::build_tx::OtxBuilder;
11
12use ckb_jsonrpc_types::Script;
13use ckb_types::core::service::Request;
14use ckb_types::H256;
15use dashmap::DashMap;
16use serde::{Deserialize, Serialize};
17
18use std::collections::HashSet;
19use std::path::PathBuf;
20
/// Stride of the periodic hook: `on_new_intervel` only proceeds when the elapsed
/// counter is a multiple of this value.
pub const EVERY_INTERVALS: usize = 10;
/// Minimum combined fee that two paired swap proposals must offer to be matched.
// NOTE(review): presumably expressed in shannons (i.e. 1 CKB) — confirm against callers.
pub const MIN_FEE: u64 = 100_000_000;
23
24#[derive(Clone)]
25struct Context {
26    plugin_name: String,
27    ckb_config: CkbConfig,
28    script_config: ScriptConfig,
29    service_handler: HostServiceHandler,
30
31    otxs: DashMap<H256, (OpenTransaction, SwapProposal)>,
32    proposals: DashMap<SwapProposal, HashSet<H256>>,
33}
34
35impl Context {
36    fn new(
37        plugin_name: &str,
38        ckb_config: CkbConfig,
39        script_config: ScriptConfig,
40        service_handler: HostServiceHandler,
41    ) -> Self {
42        Context {
43            plugin_name: plugin_name.to_owned(),
44            ckb_config,
45            script_config,
46            service_handler,
47            otxs: DashMap::new(),
48            proposals: DashMap::new(),
49        }
50    }
51}
52
53#[derive(Debug, Eq, PartialEq, Hash, Default, Clone, Serialize, Deserialize)]
54pub struct SwapProposal {
55    pub sell_udt: Script,
56    pub sell_amount: u64,
57    pub buy_udt: Script,
58    pub buy_amount: u64,
59    pub pay_fee: u64,
60}
61
62impl SwapProposal {
63    fn pair_proposal(&self) -> SwapProposal {
64        SwapProposal {
65            sell_udt: self.buy_udt.clone(),
66            sell_amount: self.buy_amount,
67            buy_udt: self.sell_udt.clone(),
68            buy_amount: self.sell_amount,
69            pay_fee: 0, // no need to consider the fee when doing a pair
70        }
71    }
72
73    fn cap_match(&self, swap_proposal: SwapProposal) -> bool {
74        self.pay_fee + swap_proposal.pay_fee >= MIN_FEE
75            && self.buy_udt == swap_proposal.sell_udt
76            && self.sell_udt == swap_proposal.buy_udt
77            && self.buy_amount == swap_proposal.sell_amount
78            && self.sell_amount == swap_proposal.buy_amount
79    }
80}
81
82impl TryFrom<PaymentAmount> for SwapProposal {
83    type Error = String;
84    fn try_from(payment_amount: PaymentAmount) -> Result<Self, Self::Error> {
85        let asset_types_number = payment_amount.s_udt_amount.len()
86            + payment_amount.x_udt_amount.len()
87            + usize::from(payment_amount.capacity - payment_amount.fee as i128 != 0);
88        if asset_types_number != 2 {
89            return Err(format!(
90                "The number of asset types must be 2, but got {}",
91                asset_types_number
92            ));
93        }
94        let mut swap_proposal = SwapProposal {
95            pay_fee: payment_amount.fee,
96            ..Default::default()
97        };
98        if payment_amount.capacity - payment_amount.fee as i128 > 0 {
99            swap_proposal.sell_udt = Script::default();
100            swap_proposal.sell_amount = payment_amount.capacity as u64 - payment_amount.fee;
101        } else {
102            swap_proposal.buy_udt = Script::default();
103            swap_proposal.buy_amount =
104                (-(payment_amount.capacity - payment_amount.fee as i128)) as u64;
105        }
106        for (type_script, udt_amount) in payment_amount.s_udt_amount {
107            if udt_amount > 0 {
108                swap_proposal.sell_udt = type_script;
109                swap_proposal.sell_amount = udt_amount as u64;
110            } else {
111                swap_proposal.buy_udt = type_script;
112                swap_proposal.buy_amount = (-udt_amount) as u64;
113            }
114        }
115        for (type_script, udt_amount) in payment_amount.x_udt_amount {
116            if udt_amount > 0 {
117                swap_proposal.sell_udt = type_script;
118                swap_proposal.sell_amount = udt_amount as u64;
119            } else {
120                swap_proposal.buy_udt = type_script;
121                swap_proposal.buy_amount = (-udt_amount) as u64;
122            }
123        }
124        if swap_proposal.sell_amount == 0 || swap_proposal.buy_amount == 0 {
125            return Err("The amount of sell and buy must be greater than 0".to_owned());
126        }
127        Ok(swap_proposal)
128    }
129}
130
131#[derive(Debug, Eq, PartialEq, Hash, Default, Clone, Serialize, Deserialize)]
132pub struct SwapProposalWithOtxId {
133    pub swap_proposal: SwapProposal,
134    pub otx_id: H256,
135}
136
137impl SwapProposalWithOtxId {
138    pub fn new(swap_proposal: SwapProposal, otx_id: H256) -> Self {
139        Self {
140            swap_proposal,
141            otx_id,
142        }
143    }
144}
145
146pub struct AtomicSwap {
147    meta: PluginMeta,
148    info: PluginInfo,
149    context: Context,
150}
151
152impl AtomicSwap {
153    pub fn new(
154        service_handler: HostServiceHandler,
155        ckb_config: CkbConfig,
156        script_config: ScriptConfig,
157    ) -> Result<AtomicSwap, String> {
158        let name = "atomic swap";
159        let state = PluginMeta::new(PathBuf::default(), true, true);
160        let info = PluginInfo::new(
161            name,
162            "One kind of UDT can be used to swap another kind of UDT.",
163            "1.0",
164        );
165        let context = Context::new(name, ckb_config, script_config, service_handler);
166        Ok(AtomicSwap {
167            meta: state,
168            info,
169            context,
170        })
171    }
172}
173
174impl Plugin for AtomicSwap {
175    fn get_name(&self) -> String {
176        self.info.name.clone()
177    }
178
179    fn get_info(&self) -> PluginInfo {
180        self.info.clone()
181    }
182
183    fn get_meta(&self) -> PluginMeta {
184        self.meta.clone()
185    }
186
187    fn on_new_otx(&self, otx: OpenTransaction) {
188        log::info!(
189            "on_new_open_tx, index otxs count: {:?}",
190            self.context.otxs.len()
191        );
192        if let Ok(aggregate_count) = otx.get_aggregate_count() {
193            log::info!("aggregate count: {:?}", aggregate_count);
194            if aggregate_count > 1 {
195                return;
196            }
197        }
198        let payment_amount = if let Ok(payment_amount) = otx.get_payment_amount() {
199            payment_amount
200        } else {
201            return;
202        };
203        let mut swap_proposal: SwapProposal = match payment_amount.try_into() {
204            Ok(swap_proposal) => swap_proposal,
205            Err(err) => {
206                log::info!("parse payment amount error: {:?}", err);
207                return;
208            }
209        };
210        log::info!("swap proposal {:?}", swap_proposal);
211
212        let otx_hash = otx.get_tx_hash().expect("get otx tx hash");
213        let item = match self.context.proposals.get(&swap_proposal.pair_proposal()) {
214            Some(item) => item,
215            None => {
216                self.context
217                    .otxs
218                    .insert(otx_hash.clone(), (otx, swap_proposal.clone()));
219                swap_proposal.pay_fee = 0;
220                self.context
221                    .proposals
222                    .entry(swap_proposal)
223                    .or_insert_with(HashSet::new)
224                    .insert(otx_hash);
225                return;
226            }
227        };
228
229        for pair_otx_hash in item.value() {
230            let pair_otx = self
231                .context
232                .otxs
233                .get(pair_otx_hash)
234                .expect("get pair otx from otxs")
235                .value()
236                .clone();
237            if !swap_proposal.cap_match(pair_otx.1) {
238                continue;
239            }
240            log::info!("matched tx: {:#x}", pair_otx_hash);
241
242            // merge_otx
243            let builder = OtxBuilder::new(
244                self.context.script_config.clone(),
245                self.context.ckb_config.clone(),
246            );
247            let otx_list = vec![otx.clone(), pair_otx.0];
248            let merged_otx = if let Ok(merged_otx) = builder.merge_otxs_single_acp(otx_list) {
249                log::debug!("otxs merge successfully.");
250                merged_otx
251            } else {
252                log::error!("{} failed to merge otxs.", self.context.plugin_name);
253                continue;
254            };
255
256            // to final tx
257            let tx = if let Ok(tx) = merged_otx.try_into() {
258                tx
259            } else {
260                log::error!("failed to generate final tx.");
261                continue;
262            };
263
264            // send_ckb
265            let tx_hash = match send_tx(self.context.ckb_config.get_ckb_uri(), tx) {
266                Ok(tx_hash) => tx_hash,
267                Err(err) => {
268                    log::error!("failed to send final tx: {}", err);
269                    continue;
270                }
271            };
272            log::info!("commit final Ckb tx: {:?}", tx_hash.to_string());
273
274            // call host service
275            let message = MessageFromPlugin::MergeOtxsAndSentToCkb((
276                vec![pair_otx_hash.to_owned(), otx_hash],
277                tx_hash,
278            ));
279            if let Some(MessageFromHost::Ok) = Request::call(&self.context.service_handler, message)
280            {
281                self.context.otxs.remove(pair_otx_hash);
282                self.context.proposals.retain(|_, hashes| {
283                    hashes.remove(pair_otx_hash);
284                    !hashes.is_empty()
285                });
286            }
287
288            break;
289        }
290    }
291
292    fn on_commit_otx(&self, otx_hashes: Vec<H256>) {
293        log::info!(
294            "{} on commit open tx remove committed otx: {:?}",
295            self.context.plugin_name,
296            otx_hashes
297                .iter()
298                .map(|hash| hash.to_string())
299                .collect::<Vec<String>>()
300        );
301        otx_hashes.iter().for_each(|otx_hash| {
302            self.context.otxs.remove(otx_hash);
303            self.context.proposals.retain(|_, hashes| {
304                hashes.remove(otx_hash);
305                !hashes.is_empty()
306            });
307        })
308    }
309
310    fn on_new_intervel(&self, elapsed: u64) {
311        if elapsed % EVERY_INTERVALS as u64 != 0 || self.context.otxs.len() <= 1 {
312            return;
313        }
314
315        log::info!(
316            "on new {} intervals otx set len: {:?}",
317            EVERY_INTERVALS,
318            self.context.otxs.len()
319        );
320    }
321}