otx_pool_plugin_dust_collector/
lib.rs

1mod helper;
2
3use helper::{OutputAmount, TxBuilder};
4
5use otx_format::jsonrpc_types::OpenTransaction;
6use otx_pool_config::built_in_plugins::DustCollectorConfig;
7use otx_pool_config::{CkbConfig, ScriptConfig};
8use otx_pool_plugin_protocol::{
9    HostServiceHandler, MessageFromHost, MessageFromPlugin, Plugin, PluginInfo, PluginMeta,
10};
11use otx_sdk::build_tx::OtxBuilder;
12
13use anyhow::{anyhow, Result};
14use ckb_sdk::rpc::ckb_indexer::{Order, ScriptType, SearchKey};
15use ckb_sdk::rpc::IndexerRpcClient;
16use ckb_sdk::types::{Address, HumanCapacity};
17use ckb_types::core::service::Request;
18use ckb_types::packed::Script;
19use ckb_types::{packed, H256};
20use dashmap::DashMap;
21
22use std::env;
23use std::path::PathBuf;
24
25pub const EVERY_INTERVALS: usize = 10;
26pub const MIN_PAYMENT: usize = 1_0000_0000;
27pub const DEFAULT_FEE: usize = 1000_0000;
28
29#[derive(Clone)]
30struct Context {
31    plugin_name: String,
32    otxs: DashMap<H256, OpenTransaction>,
33    default_address: Address,
34    ckb_config: CkbConfig,
35    script_config: ScriptConfig,
36    service_handler: HostServiceHandler,
37}
38
39impl Context {
40    fn new(
41        plugin_name: &str,
42        default_address: Address,
43        ckb_config: CkbConfig,
44        script_config: ScriptConfig,
45        service_handler: HostServiceHandler,
46    ) -> Self {
47        Context {
48            plugin_name: plugin_name.to_owned(),
49            otxs: DashMap::new(),
50            default_address,
51            ckb_config,
52            script_config,
53            service_handler,
54        }
55    }
56}
57
58pub struct DustCollector {
59    meta: PluginMeta,
60    info: PluginInfo,
61    context: Context,
62}
63
64impl DustCollector {
65    pub fn new(
66        service_handler: HostServiceHandler,
67        config: DustCollectorConfig,
68        ckb_config: CkbConfig,
69        script_config: ScriptConfig,
70    ) -> Result<DustCollector> {
71        let name = "dust collector";
72        let state = PluginMeta::new(PathBuf::default(), true, true);
73        let info = PluginInfo::new(
74            name,
75            "Collect micropayment otx and aggregate them into ckb tx.",
76            "1.0",
77        );
78        let address = env::var(config.get_env_default_address())?
79            .parse::<Address>()
80            .map_err(|e| anyhow!(e))?;
81        let context = Context::new(name, address, ckb_config, script_config, service_handler);
82        Ok(DustCollector {
83            meta: state,
84            info,
85            context,
86        })
87    }
88}
89
90impl Plugin for DustCollector {
91    fn get_name(&self) -> String {
92        self.info.name.clone()
93    }
94
95    fn get_info(&self) -> PluginInfo {
96        self.info.clone()
97    }
98
99    fn get_meta(&self) -> PluginMeta {
100        self.meta.clone()
101    }
102
103    fn on_new_otx(&self, otx: OpenTransaction) {
104        log::info!(
105            "on_new_open_tx, index otxs count: {:?}",
106            self.context.otxs.len()
107        );
108        if let Ok(aggregate_count) = otx.get_aggregate_count() {
109            log::info!("aggregate count: {:?}", aggregate_count);
110            if aggregate_count > 1 {
111                return;
112            }
113        }
114        if let Ok(payment_amount) = otx.get_payment_amount() {
115            log::info!("payment: {:?}", payment_amount);
116            if payment_amount.capacity < MIN_PAYMENT as i128
117                || !payment_amount.x_udt_amount.is_empty()
118                || !payment_amount.s_udt_amount.is_empty()
119            {
120                return;
121            }
122        } else {
123            return;
124        };
125        let otx_hash = otx.get_tx_hash().expect("get tx hash");
126        self.context.otxs.insert(otx_hash, otx);
127    }
128
129    fn on_commit_otx(&self, otx_hashes: Vec<H256>) {
130        log::info!(
131            "{} on commit open tx remove committed otx: {:?}",
132            self.context.plugin_name,
133            otx_hashes
134                .iter()
135                .map(|hash| hash.to_string())
136                .collect::<Vec<String>>()
137        );
138        otx_hashes.iter().for_each(|otx_hash| {
139            self.context.otxs.remove(otx_hash);
140        })
141    }
142
143    fn on_new_intervel(&self, elapsed: u64) {
144        if elapsed % EVERY_INTERVALS as u64 != 0 || self.context.otxs.len() <= 1 {
145            return;
146        }
147
148        log::info!(
149            "on new {} intervals otx set len: {:?}",
150            EVERY_INTERVALS,
151            self.context.otxs.len()
152        );
153
154        // merge_otx
155        let otx_list: Vec<OpenTransaction> =
156            self.context.otxs.iter().map(|otx| otx.clone()).collect();
157        let otx_builder = OtxBuilder::new(
158            self.context.script_config.clone(),
159            self.context.ckb_config.clone(),
160        );
161        let merged_otx = if let Ok(merged_otx) = otx_builder.merge_otxs_single_acp(otx_list) {
162            log::debug!("otxs merge successfully.");
163            merged_otx
164        } else {
165            log::info!(
166                "Failed to merge otxs, all otxs staged by {} itself will be cleared.",
167                self.context.plugin_name
168            );
169            self.context.otxs.clear();
170            return;
171        };
172
173        // find a input cell to receive assets
174        let mut indexer = IndexerRpcClient::new(self.context.ckb_config.get_ckb_uri());
175        let lock_script: packed::Script = (&self.context.default_address).into();
176        let search_key = SearchKey {
177            script: lock_script.into(),
178            script_type: ScriptType::Lock,
179            script_search_mode: None,
180            filter: None,
181            with_data: None,
182            group_by_transaction: None,
183        };
184        let cell = if let Ok(cell) = indexer.get_cells(search_key, Order::Asc, 1.into(), None) {
185            let cell = &cell.objects[0];
186            log::info!(
187                "the broker identified an available cell: {:?}",
188                cell.out_point
189            );
190            cell.clone()
191        } else {
192            log::error!("broker has no cells available for input");
193            return;
194        };
195
196        // add input and output
197        let receive_ckb_capacity = merged_otx.get_payment_amount().unwrap().capacity;
198        let output_capacity =
199            receive_ckb_capacity as u64 + cell.output.capacity.value() - DEFAULT_FEE as u64;
200        let output_amount = OutputAmount {
201            capacity: HumanCapacity::from(output_capacity),
202            udt_amount: None,
203        };
204        let aggregator = TxBuilder::new(
205            self.context.ckb_config.clone(),
206            self.context.script_config.clone(),
207        );
208        let unsigned_otx = if let Ok(ckb_tx) = aggregator.add_input_and_output(
209            merged_otx,
210            cell.out_point,
211            &self.context.default_address,
212            output_amount,
213            Script::default(),
214            DEFAULT_FEE as u64,
215        ) {
216            ckb_tx
217        } else {
218            log::error!("failed to assemble final tx.");
219            return;
220        };
221
222        // call host service
223        let hashes: Vec<H256> = self
224            .context
225            .otxs
226            .iter()
227            .map(|otx| otx.get_tx_hash().expect("get tx hash"))
228            .collect();
229        let message = MessageFromPlugin::NewMergedOtx((unsigned_otx, hashes));
230        if let Some(MessageFromHost::Ok) = Request::call(&self.context.service_handler, message) {
231            self.context.otxs.clear();
232        }
233    }
234}