otx_pool_plugin_dust_collector/
lib.rs1mod helper;
2
3use helper::{OutputAmount, TxBuilder};
4
5use otx_format::jsonrpc_types::OpenTransaction;
6use otx_pool_config::built_in_plugins::DustCollectorConfig;
7use otx_pool_config::{CkbConfig, ScriptConfig};
8use otx_pool_plugin_protocol::{
9 HostServiceHandler, MessageFromHost, MessageFromPlugin, Plugin, PluginInfo, PluginMeta,
10};
11use otx_sdk::build_tx::OtxBuilder;
12
13use anyhow::{anyhow, Result};
14use ckb_sdk::rpc::ckb_indexer::{Order, ScriptType, SearchKey};
15use ckb_sdk::rpc::IndexerRpcClient;
16use ckb_sdk::types::{Address, HumanCapacity};
17use ckb_types::core::service::Request;
18use ckb_types::packed::Script;
19use ckb_types::{packed, H256};
20use dashmap::DashMap;
21
22use std::env;
23use std::path::PathBuf;
24
25pub const EVERY_INTERVALS: usize = 10;
26pub const MIN_PAYMENT: usize = 1_0000_0000;
27pub const DEFAULT_FEE: usize = 1000_0000;
28
29#[derive(Clone)]
30struct Context {
31 plugin_name: String,
32 otxs: DashMap<H256, OpenTransaction>,
33 default_address: Address,
34 ckb_config: CkbConfig,
35 script_config: ScriptConfig,
36 service_handler: HostServiceHandler,
37}
38
39impl Context {
40 fn new(
41 plugin_name: &str,
42 default_address: Address,
43 ckb_config: CkbConfig,
44 script_config: ScriptConfig,
45 service_handler: HostServiceHandler,
46 ) -> Self {
47 Context {
48 plugin_name: plugin_name.to_owned(),
49 otxs: DashMap::new(),
50 default_address,
51 ckb_config,
52 script_config,
53 service_handler,
54 }
55 }
56}
57
58pub struct DustCollector {
59 meta: PluginMeta,
60 info: PluginInfo,
61 context: Context,
62}
63
64impl DustCollector {
65 pub fn new(
66 service_handler: HostServiceHandler,
67 config: DustCollectorConfig,
68 ckb_config: CkbConfig,
69 script_config: ScriptConfig,
70 ) -> Result<DustCollector> {
71 let name = "dust collector";
72 let state = PluginMeta::new(PathBuf::default(), true, true);
73 let info = PluginInfo::new(
74 name,
75 "Collect micropayment otx and aggregate them into ckb tx.",
76 "1.0",
77 );
78 let address = env::var(config.get_env_default_address())?
79 .parse::<Address>()
80 .map_err(|e| anyhow!(e))?;
81 let context = Context::new(name, address, ckb_config, script_config, service_handler);
82 Ok(DustCollector {
83 meta: state,
84 info,
85 context,
86 })
87 }
88}
89
90impl Plugin for DustCollector {
91 fn get_name(&self) -> String {
92 self.info.name.clone()
93 }
94
95 fn get_info(&self) -> PluginInfo {
96 self.info.clone()
97 }
98
99 fn get_meta(&self) -> PluginMeta {
100 self.meta.clone()
101 }
102
103 fn on_new_otx(&self, otx: OpenTransaction) {
104 log::info!(
105 "on_new_open_tx, index otxs count: {:?}",
106 self.context.otxs.len()
107 );
108 if let Ok(aggregate_count) = otx.get_aggregate_count() {
109 log::info!("aggregate count: {:?}", aggregate_count);
110 if aggregate_count > 1 {
111 return;
112 }
113 }
114 if let Ok(payment_amount) = otx.get_payment_amount() {
115 log::info!("payment: {:?}", payment_amount);
116 if payment_amount.capacity < MIN_PAYMENT as i128
117 || !payment_amount.x_udt_amount.is_empty()
118 || !payment_amount.s_udt_amount.is_empty()
119 {
120 return;
121 }
122 } else {
123 return;
124 };
125 let otx_hash = otx.get_tx_hash().expect("get tx hash");
126 self.context.otxs.insert(otx_hash, otx);
127 }
128
129 fn on_commit_otx(&self, otx_hashes: Vec<H256>) {
130 log::info!(
131 "{} on commit open tx remove committed otx: {:?}",
132 self.context.plugin_name,
133 otx_hashes
134 .iter()
135 .map(|hash| hash.to_string())
136 .collect::<Vec<String>>()
137 );
138 otx_hashes.iter().for_each(|otx_hash| {
139 self.context.otxs.remove(otx_hash);
140 })
141 }
142
143 fn on_new_intervel(&self, elapsed: u64) {
144 if elapsed % EVERY_INTERVALS as u64 != 0 || self.context.otxs.len() <= 1 {
145 return;
146 }
147
148 log::info!(
149 "on new {} intervals otx set len: {:?}",
150 EVERY_INTERVALS,
151 self.context.otxs.len()
152 );
153
154 let otx_list: Vec<OpenTransaction> =
156 self.context.otxs.iter().map(|otx| otx.clone()).collect();
157 let otx_builder = OtxBuilder::new(
158 self.context.script_config.clone(),
159 self.context.ckb_config.clone(),
160 );
161 let merged_otx = if let Ok(merged_otx) = otx_builder.merge_otxs_single_acp(otx_list) {
162 log::debug!("otxs merge successfully.");
163 merged_otx
164 } else {
165 log::info!(
166 "Failed to merge otxs, all otxs staged by {} itself will be cleared.",
167 self.context.plugin_name
168 );
169 self.context.otxs.clear();
170 return;
171 };
172
173 let mut indexer = IndexerRpcClient::new(self.context.ckb_config.get_ckb_uri());
175 let lock_script: packed::Script = (&self.context.default_address).into();
176 let search_key = SearchKey {
177 script: lock_script.into(),
178 script_type: ScriptType::Lock,
179 script_search_mode: None,
180 filter: None,
181 with_data: None,
182 group_by_transaction: None,
183 };
184 let cell = if let Ok(cell) = indexer.get_cells(search_key, Order::Asc, 1.into(), None) {
185 let cell = &cell.objects[0];
186 log::info!(
187 "the broker identified an available cell: {:?}",
188 cell.out_point
189 );
190 cell.clone()
191 } else {
192 log::error!("broker has no cells available for input");
193 return;
194 };
195
196 let receive_ckb_capacity = merged_otx.get_payment_amount().unwrap().capacity;
198 let output_capacity =
199 receive_ckb_capacity as u64 + cell.output.capacity.value() - DEFAULT_FEE as u64;
200 let output_amount = OutputAmount {
201 capacity: HumanCapacity::from(output_capacity),
202 udt_amount: None,
203 };
204 let aggregator = TxBuilder::new(
205 self.context.ckb_config.clone(),
206 self.context.script_config.clone(),
207 );
208 let unsigned_otx = if let Ok(ckb_tx) = aggregator.add_input_and_output(
209 merged_otx,
210 cell.out_point,
211 &self.context.default_address,
212 output_amount,
213 Script::default(),
214 DEFAULT_FEE as u64,
215 ) {
216 ckb_tx
217 } else {
218 log::error!("failed to assemble final tx.");
219 return;
220 };
221
222 let hashes: Vec<H256> = self
224 .context
225 .otxs
226 .iter()
227 .map(|otx| otx.get_tx_hash().expect("get tx hash"))
228 .collect();
229 let message = MessageFromPlugin::NewMergedOtx((unsigned_otx, hashes));
230 if let Some(MessageFromHost::Ok) = Request::call(&self.context.service_handler, message) {
231 self.context.otxs.clear();
232 }
233 }
234}