1use std::collections::HashMap;
2
3use anyhow::anyhow;
4use dashmap::DashMap;
5
6use ckb_jsonrpc_types as json_types;
7use ckb_types::{
8 bytes::Bytes,
9 core::{HeaderView, TransactionView},
10 packed::{Byte32, CellOutput, OutPoint, Transaction},
11 prelude::*,
12};
13
14use super::{offchain_impls::CollectResult, OffchainCellCollector};
15use crate::rpc::{
16 ckb_light_client::{FetchStatus, Order, SearchKey},
17 LightClientRpcAsyncClient,
18};
19use crate::traits::{
20 CellCollector, CellCollectorError, CellQueryOptions, HeaderDepResolver, LiveCell, QueryOrder,
21 TransactionDependencyError, TransactionDependencyProvider,
22};
23
24pub struct LightClientHeaderDepResolver {
25 client: LightClientRpcAsyncClient,
26 headers: DashMap<Byte32, Option<HeaderView>>,
28}
29
30impl LightClientHeaderDepResolver {
31 pub fn new(url: &str) -> LightClientHeaderDepResolver {
32 let client = LightClientRpcAsyncClient::new(url);
33 LightClientHeaderDepResolver {
34 client,
35 headers: DashMap::new(),
36 }
37 }
38
39 pub fn is_ready(&self) -> bool {
41 self.headers.is_empty() || self.headers.iter().all(|pair| pair.value().is_some())
42 }
43}
44
45#[cfg_attr(target_arch="wasm32", async_trait::async_trait(?Send))]
46#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
47impl HeaderDepResolver for LightClientHeaderDepResolver {
48 async fn resolve_by_tx_async(
49 &self,
50 tx_hash: &Byte32,
51 ) -> Result<Option<HeaderView>, anyhow::Error> {
52 if let Some(Some(header)) = self.headers.get(tx_hash).as_ref().map(|pair| pair.value()) {
53 return Ok(Some(header.clone()));
54 }
55 match self.client.fetch_transaction(tx_hash.unpack()).await? {
56 FetchStatus::Fetched { data } => {
57 if let Some(block_hash) = data.tx_status.block_hash {
58 match self.client.fetch_header(block_hash).await? {
59 FetchStatus::Fetched { data } => {
60 let header: HeaderView = data.into();
61 self.headers.insert(tx_hash.clone(), Some(header.clone()));
62 Ok(Some(header))
63 }
64 status => {
65 self.headers.insert(tx_hash.clone(), None);
66 Err(anyhow!("fetching header by transaction: {:?}", status))
67 }
68 }
69 } else {
70 self.headers.insert(tx_hash.clone(), None);
71 Err(anyhow!("fetching transaction: {:?}", data))
72 }
73 }
74 status => {
75 self.headers.insert(tx_hash.clone(), None);
76 Err(anyhow!("fetching header by transaction: {:?}", status))
77 }
78 }
79 }
80
81 async fn resolve_by_number_async(
82 &self,
83 number: u64,
84 ) -> Result<Option<HeaderView>, anyhow::Error> {
85 for pair in self.headers.iter() {
86 if let Some(header) = pair.value() {
87 if header.number() == number {
88 return Ok(Some(header.clone()));
89 }
90 }
91 }
92 Err(anyhow!(
93 "unable to resolver header by number directly when use light client as backend, you can call resolve_by_tx(tx_hash) to load the header first."
94 ))
95 }
96}
97
98pub struct LightClientTransactionDependencyProvider {
99 client: LightClientRpcAsyncClient,
100 headers: DashMap<Byte32, Option<HeaderView>>,
102 txs: DashMap<Byte32, Option<TransactionView>>,
104}
105
106impl LightClientTransactionDependencyProvider {
107 pub fn new(url: &str) -> LightClientTransactionDependencyProvider {
108 LightClientTransactionDependencyProvider {
109 client: LightClientRpcAsyncClient::new(url),
110 headers: DashMap::new(),
111 txs: DashMap::new(),
112 }
113 }
114
115 pub fn is_ready(&self) -> bool {
117 (self.headers.is_empty() && self.txs.is_empty())
118 || (self.headers.iter().all(|pair| pair.value().is_some())
119 && self.txs.iter().all(|pair| pair.value().is_some()))
120 }
121}
122
123#[cfg_attr(target_arch="wasm32", async_trait::async_trait(?Send))]
124#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
125impl TransactionDependencyProvider for LightClientTransactionDependencyProvider {
126 async fn get_transaction_async(
127 &self,
128 tx_hash: &Byte32,
129 ) -> Result<TransactionView, TransactionDependencyError> {
130 if let Some(Some(tx)) = self.txs.get(tx_hash).as_ref().map(|pair| pair.value()) {
131 return Ok(tx.clone());
132 }
133 match self
134 .client
135 .fetch_transaction(tx_hash.unpack())
136 .await
137 .map_err(|err| TransactionDependencyError::Other(anyhow!(err)))?
138 {
139 FetchStatus::Fetched { data } => {
140 if let Some(block_hash) = data.tx_status.block_hash {
141 match self
142 .client
143 .fetch_header(block_hash)
144 .await
145 .map_err(|err| TransactionDependencyError::Other(anyhow!(err)))?
146 {
147 FetchStatus::Fetched { data: header_view } => {
148 let header: HeaderView = header_view.into();
149 if let Some(transaction_view) = data.transaction {
150 let tx: TransactionView =
151 Transaction::from(transaction_view.inner).into_view();
152 self.headers.insert(header.hash(), Some(header));
153 self.txs.insert(tx_hash.clone(), Some(tx.clone()));
154 Ok(tx)
155 } else {
156 self.txs.insert(tx_hash.clone(), None);
157 Err(TransactionDependencyError::NotFound(format!(
158 "fetching transaction: {:?}",
159 header
160 )))
161 }
162 }
163 status => {
164 self.txs.insert(tx_hash.clone(), None);
165 Err(TransactionDependencyError::NotFound(format!(
166 "fetching transaction: {:?}",
167 status
168 )))
169 }
170 }
171 } else {
172 self.txs.insert(tx_hash.clone(), None);
173 Err(TransactionDependencyError::NotFound(format!(
174 "fetching transaction: {:?}",
175 data
176 )))
177 }
178 }
179 status => {
180 self.txs.insert(tx_hash.clone(), None);
181 Err(TransactionDependencyError::NotFound(format!(
182 "fetching transaction: {:?}",
183 status
184 )))
185 }
186 }
187 }
188
189 async fn get_cell_async(
190 &self,
191 out_point: &OutPoint,
192 ) -> Result<CellOutput, TransactionDependencyError> {
193 let tx = self.get_transaction_async(&out_point.tx_hash()).await?;
194 let output_index: u32 = out_point.index().unpack();
195 tx.outputs().get(output_index as usize).ok_or_else(|| {
196 TransactionDependencyError::NotFound(format!("invalid output index: {}", output_index))
197 })
198 }
199 async fn get_cell_data_async(
200 &self,
201 out_point: &OutPoint,
202 ) -> Result<Bytes, TransactionDependencyError> {
203 let tx = self.get_transaction_async(&out_point.tx_hash()).await?;
204 let output_index: u32 = out_point.index().unpack();
205 tx.outputs_data()
206 .get(output_index as usize)
207 .map(|packed_bytes| packed_bytes.raw_data())
208 .ok_or_else(|| {
209 TransactionDependencyError::NotFound(format!(
210 "invalid output index: {}",
211 output_index
212 ))
213 })
214 }
215 async fn get_header_async(
216 &self,
217 block_hash: &Byte32,
218 ) -> Result<HeaderView, TransactionDependencyError> {
219 if let Some(Some(header)) = self
220 .headers
221 .get(block_hash)
222 .as_ref()
223 .map(|pair| pair.value())
224 {
225 return Ok(header.clone());
226 }
227 match self
228 .client
229 .fetch_header(block_hash.unpack())
230 .await
231 .map_err(|err| TransactionDependencyError::Other(anyhow!(err)))?
232 {
233 FetchStatus::Fetched { data } => {
234 let header: HeaderView = data.into();
235 self.headers
236 .insert(block_hash.clone(), Some(header.clone()));
237 Ok(header)
238 }
239 status => {
240 self.headers.insert(block_hash.clone(), None);
241 Err(TransactionDependencyError::NotFound(format!(
242 "fetching header: {:?}",
243 status
244 )))
245 }
246 }
247 }
248
249 async fn get_block_extension_async(
250 &self,
251 _block_hash: &Byte32,
252 ) -> Result<Option<ckb_types::packed::Bytes>, TransactionDependencyError> {
253 Err(TransactionDependencyError::NotFound(
254 "get_block_extension not supported".to_string(),
255 ))
256 }
257}
258
259#[derive(Clone)]
260pub struct LightClientCellCollector {
261 light_client: LightClientRpcAsyncClient,
262 offchain: OffchainCellCollector,
263}
264
265impl LightClientCellCollector {
266 pub fn new(url: &str) -> LightClientCellCollector {
267 let light_client = LightClientRpcAsyncClient::new(url);
268 LightClientCellCollector {
269 light_client,
270 offchain: OffchainCellCollector::default(),
271 }
272 }
273}
274
275#[cfg_attr(target_arch="wasm32", async_trait::async_trait(?Send))]
276#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
277impl CellCollector for LightClientCellCollector {
278 async fn collect_live_cells_async(
279 &mut self,
280 query: &CellQueryOptions,
281 apply_changes: bool,
282 ) -> Result<(Vec<LiveCell>, u64), CellCollectorError> {
283 let max_mature_number = 0;
284 self.offchain.max_mature_number = max_mature_number;
285 let tip_num = self
286 .light_client
287 .get_tip_header()
288 .await
289 .map_err(|err| CellCollectorError::Internal(anyhow!(err)))?
290 .inner
291 .number
292 .value();
293 let CollectResult {
294 cells,
295 rest_cells,
296 mut total_capacity,
297 } = self.offchain.collect(query, tip_num);
298 let mut cells: Vec<_> = cells.into_iter().map(|c| c.0).collect();
299
300 if total_capacity < query.min_total_capacity {
301 let order = match query.order {
302 QueryOrder::Asc => Order::Asc,
303 QueryOrder::Desc => Order::Desc,
304 };
305 let mut ret_cells: HashMap<_, _> = cells
306 .into_iter()
307 .map(|c| (c.out_point.clone(), c))
308 .collect();
309 let locked_cells = self.offchain.locked_cells.clone();
310 let search_key = SearchKey::from(query.clone());
311 const MAX_LIMIT: u32 = 4096;
312 let mut limit: u32 = query.limit.unwrap_or(16);
313 let mut last_cursor: Option<json_types::JsonBytes> = None;
314 while total_capacity < query.min_total_capacity {
315 let page = self
316 .light_client
317 .get_cells(search_key.clone(), order.clone(), limit.into(), last_cursor)
318 .await
319 .map_err(|err| CellCollectorError::Internal(err.into()))?;
320 if page.objects.is_empty() {
321 break;
322 }
323 for cell in page.objects {
324 let live_cell = LiveCell::from(cell);
325 if !query.match_cell(&live_cell, max_mature_number)
326 || locked_cells.contains_key(&(
327 live_cell.out_point.tx_hash().unpack(),
328 live_cell.out_point.index().unpack(),
329 ))
330 {
331 continue;
332 }
333 let capacity: u64 = live_cell.output.capacity().unpack();
334 if ret_cells
335 .insert(live_cell.out_point.clone(), live_cell)
336 .is_none()
337 {
338 total_capacity += capacity;
339 }
340 if total_capacity >= query.min_total_capacity {
341 break;
342 }
343 }
344 last_cursor = Some(page.last_cursor);
345 if limit < MAX_LIMIT {
346 limit *= 2;
347 }
348 }
349 cells = ret_cells.into_values().collect();
350 }
351 if apply_changes {
352 self.offchain.live_cells = rest_cells;
353 for cell in &cells {
354 self.lock_cell(cell.out_point.clone(), tip_num)?;
355 }
356 }
357 Ok((cells, total_capacity))
358 }
359
360 fn lock_cell(
361 &mut self,
362 out_point: OutPoint,
363 tip_number: u64,
364 ) -> Result<(), CellCollectorError> {
365 self.offchain.lock_cell(out_point, tip_number)
366 }
367 fn apply_tx(&mut self, tx: Transaction, tip_number: u64) -> Result<(), CellCollectorError> {
368 self.offchain.apply_tx(tx, tip_number)
369 }
370 fn reset(&mut self) {
371 self.offchain.reset();
372 }
373}