// otx_pool_plugin_atomic_swap/lib.rs
pub mod rpc;
2
3use otx_format::jsonrpc_types::OpenTransaction;
4use otx_format::types::PaymentAmount;
5use otx_pool_config::{CkbConfig, ScriptConfig};
6use otx_pool_plugin_protocol::{
7 HostServiceHandler, MessageFromHost, MessageFromPlugin, Plugin, PluginInfo, PluginMeta,
8};
9use otx_sdk::build_tx::send_tx;
10use otx_sdk::build_tx::OtxBuilder;
11
12use ckb_jsonrpc_types::Script;
13use ckb_types::core::service::Request;
14use ckb_types::H256;
15use dashmap::DashMap;
16use serde::{Deserialize, Serialize};
17
18use std::collections::HashSet;
19use std::path::PathBuf;
20
/// Tick divisor for `on_new_intervel`: the handler returns early unless the
/// elapsed count is a multiple of this value.
pub const EVERY_INTERVALS: usize = 10;

/// Minimum combined `pay_fee` that two proposals must reach for
/// `SwapProposal::cap_match` to accept the pair.
// NOTE(review): presumably denominated in shannons (10^8 = 1 CKB) — confirm.
pub const MIN_FEE: u64 = 100_000_000;
23
24#[derive(Clone)]
25struct Context {
26 plugin_name: String,
27 ckb_config: CkbConfig,
28 script_config: ScriptConfig,
29 service_handler: HostServiceHandler,
30
31 otxs: DashMap<H256, (OpenTransaction, SwapProposal)>,
32 proposals: DashMap<SwapProposal, HashSet<H256>>,
33}
34
35impl Context {
36 fn new(
37 plugin_name: &str,
38 ckb_config: CkbConfig,
39 script_config: ScriptConfig,
40 service_handler: HostServiceHandler,
41 ) -> Self {
42 Context {
43 plugin_name: plugin_name.to_owned(),
44 ckb_config,
45 script_config,
46 service_handler,
47 otxs: DashMap::new(),
48 proposals: DashMap::new(),
49 }
50 }
51}
52
53#[derive(Debug, Eq, PartialEq, Hash, Default, Clone, Serialize, Deserialize)]
54pub struct SwapProposal {
55 pub sell_udt: Script,
56 pub sell_amount: u64,
57 pub buy_udt: Script,
58 pub buy_amount: u64,
59 pub pay_fee: u64,
60}
61
62impl SwapProposal {
63 fn pair_proposal(&self) -> SwapProposal {
64 SwapProposal {
65 sell_udt: self.buy_udt.clone(),
66 sell_amount: self.buy_amount,
67 buy_udt: self.sell_udt.clone(),
68 buy_amount: self.sell_amount,
69 pay_fee: 0, }
71 }
72
73 fn cap_match(&self, swap_proposal: SwapProposal) -> bool {
74 self.pay_fee + swap_proposal.pay_fee >= MIN_FEE
75 && self.buy_udt == swap_proposal.sell_udt
76 && self.sell_udt == swap_proposal.buy_udt
77 && self.buy_amount == swap_proposal.sell_amount
78 && self.sell_amount == swap_proposal.buy_amount
79 }
80}
81
82impl TryFrom<PaymentAmount> for SwapProposal {
83 type Error = String;
84 fn try_from(payment_amount: PaymentAmount) -> Result<Self, Self::Error> {
85 let asset_types_number = payment_amount.s_udt_amount.len()
86 + payment_amount.x_udt_amount.len()
87 + usize::from(payment_amount.capacity - payment_amount.fee as i128 != 0);
88 if asset_types_number != 2 {
89 return Err(format!(
90 "The number of asset types must be 2, but got {}",
91 asset_types_number
92 ));
93 }
94 let mut swap_proposal = SwapProposal {
95 pay_fee: payment_amount.fee,
96 ..Default::default()
97 };
98 if payment_amount.capacity - payment_amount.fee as i128 > 0 {
99 swap_proposal.sell_udt = Script::default();
100 swap_proposal.sell_amount = payment_amount.capacity as u64 - payment_amount.fee;
101 } else {
102 swap_proposal.buy_udt = Script::default();
103 swap_proposal.buy_amount =
104 (-(payment_amount.capacity - payment_amount.fee as i128)) as u64;
105 }
106 for (type_script, udt_amount) in payment_amount.s_udt_amount {
107 if udt_amount > 0 {
108 swap_proposal.sell_udt = type_script;
109 swap_proposal.sell_amount = udt_amount as u64;
110 } else {
111 swap_proposal.buy_udt = type_script;
112 swap_proposal.buy_amount = (-udt_amount) as u64;
113 }
114 }
115 for (type_script, udt_amount) in payment_amount.x_udt_amount {
116 if udt_amount > 0 {
117 swap_proposal.sell_udt = type_script;
118 swap_proposal.sell_amount = udt_amount as u64;
119 } else {
120 swap_proposal.buy_udt = type_script;
121 swap_proposal.buy_amount = (-udt_amount) as u64;
122 }
123 }
124 if swap_proposal.sell_amount == 0 || swap_proposal.buy_amount == 0 {
125 return Err("The amount of sell and buy must be greater than 0".to_owned());
126 }
127 Ok(swap_proposal)
128 }
129}
130
131#[derive(Debug, Eq, PartialEq, Hash, Default, Clone, Serialize, Deserialize)]
132pub struct SwapProposalWithOtxId {
133 pub swap_proposal: SwapProposal,
134 pub otx_id: H256,
135}
136
137impl SwapProposalWithOtxId {
138 pub fn new(swap_proposal: SwapProposal, otx_id: H256) -> Self {
139 Self {
140 swap_proposal,
141 otx_id,
142 }
143 }
144}
145
146pub struct AtomicSwap {
147 meta: PluginMeta,
148 info: PluginInfo,
149 context: Context,
150}
151
152impl AtomicSwap {
153 pub fn new(
154 service_handler: HostServiceHandler,
155 ckb_config: CkbConfig,
156 script_config: ScriptConfig,
157 ) -> Result<AtomicSwap, String> {
158 let name = "atomic swap";
159 let state = PluginMeta::new(PathBuf::default(), true, true);
160 let info = PluginInfo::new(
161 name,
162 "One kind of UDT can be used to swap another kind of UDT.",
163 "1.0",
164 );
165 let context = Context::new(name, ckb_config, script_config, service_handler);
166 Ok(AtomicSwap {
167 meta: state,
168 info,
169 context,
170 })
171 }
172}
173
174impl Plugin for AtomicSwap {
175 fn get_name(&self) -> String {
176 self.info.name.clone()
177 }
178
179 fn get_info(&self) -> PluginInfo {
180 self.info.clone()
181 }
182
183 fn get_meta(&self) -> PluginMeta {
184 self.meta.clone()
185 }
186
187 fn on_new_otx(&self, otx: OpenTransaction) {
188 log::info!(
189 "on_new_open_tx, index otxs count: {:?}",
190 self.context.otxs.len()
191 );
192 if let Ok(aggregate_count) = otx.get_aggregate_count() {
193 log::info!("aggregate count: {:?}", aggregate_count);
194 if aggregate_count > 1 {
195 return;
196 }
197 }
198 let payment_amount = if let Ok(payment_amount) = otx.get_payment_amount() {
199 payment_amount
200 } else {
201 return;
202 };
203 let mut swap_proposal: SwapProposal = match payment_amount.try_into() {
204 Ok(swap_proposal) => swap_proposal,
205 Err(err) => {
206 log::info!("parse payment amount error: {:?}", err);
207 return;
208 }
209 };
210 log::info!("swap proposal {:?}", swap_proposal);
211
212 let otx_hash = otx.get_tx_hash().expect("get otx tx hash");
213 let item = match self.context.proposals.get(&swap_proposal.pair_proposal()) {
214 Some(item) => item,
215 None => {
216 self.context
217 .otxs
218 .insert(otx_hash.clone(), (otx, swap_proposal.clone()));
219 swap_proposal.pay_fee = 0;
220 self.context
221 .proposals
222 .entry(swap_proposal)
223 .or_insert_with(HashSet::new)
224 .insert(otx_hash);
225 return;
226 }
227 };
228
229 for pair_otx_hash in item.value() {
230 let pair_otx = self
231 .context
232 .otxs
233 .get(pair_otx_hash)
234 .expect("get pair otx from otxs")
235 .value()
236 .clone();
237 if !swap_proposal.cap_match(pair_otx.1) {
238 continue;
239 }
240 log::info!("matched tx: {:#x}", pair_otx_hash);
241
242 let builder = OtxBuilder::new(
244 self.context.script_config.clone(),
245 self.context.ckb_config.clone(),
246 );
247 let otx_list = vec![otx.clone(), pair_otx.0];
248 let merged_otx = if let Ok(merged_otx) = builder.merge_otxs_single_acp(otx_list) {
249 log::debug!("otxs merge successfully.");
250 merged_otx
251 } else {
252 log::error!("{} failed to merge otxs.", self.context.plugin_name);
253 continue;
254 };
255
256 let tx = if let Ok(tx) = merged_otx.try_into() {
258 tx
259 } else {
260 log::error!("failed to generate final tx.");
261 continue;
262 };
263
264 let tx_hash = match send_tx(self.context.ckb_config.get_ckb_uri(), tx) {
266 Ok(tx_hash) => tx_hash,
267 Err(err) => {
268 log::error!("failed to send final tx: {}", err);
269 continue;
270 }
271 };
272 log::info!("commit final Ckb tx: {:?}", tx_hash.to_string());
273
274 let message = MessageFromPlugin::MergeOtxsAndSentToCkb((
276 vec![pair_otx_hash.to_owned(), otx_hash],
277 tx_hash,
278 ));
279 if let Some(MessageFromHost::Ok) = Request::call(&self.context.service_handler, message)
280 {
281 self.context.otxs.remove(pair_otx_hash);
282 self.context.proposals.retain(|_, hashes| {
283 hashes.remove(pair_otx_hash);
284 !hashes.is_empty()
285 });
286 }
287
288 break;
289 }
290 }
291
292 fn on_commit_otx(&self, otx_hashes: Vec<H256>) {
293 log::info!(
294 "{} on commit open tx remove committed otx: {:?}",
295 self.context.plugin_name,
296 otx_hashes
297 .iter()
298 .map(|hash| hash.to_string())
299 .collect::<Vec<String>>()
300 );
301 otx_hashes.iter().for_each(|otx_hash| {
302 self.context.otxs.remove(otx_hash);
303 self.context.proposals.retain(|_, hashes| {
304 hashes.remove(otx_hash);
305 !hashes.is_empty()
306 });
307 })
308 }
309
310 fn on_new_intervel(&self, elapsed: u64) {
311 if elapsed % EVERY_INTERVALS as u64 != 0 || self.context.otxs.len() <= 1 {
312 return;
313 }
314
315 log::info!(
316 "on new {} intervals otx set len: {:?}",
317 EVERY_INTERVALS,
318 self.context.otxs.len()
319 );
320 }
321}