1use std::collections::HashMap;
2
3use anyhow::anyhow;
4use dashmap::DashMap;
5
6use ckb_jsonrpc_types as json_types;
7use ckb_types::{
8 bytes::Bytes,
9 core::{HeaderView, TransactionView},
10 packed::{Byte32, CellOutput, OutPoint, Transaction},
11 prelude::*,
12};
13
14use super::{offchain_impls::CollectResult, OffchainCellCollector};
15use crate::rpc::{
16 ckb_light_client::{FetchStatus, Order, SearchKey},
17 LightClientRpcAsyncClient,
18};
19use crate::traits::{
20 CellCollector, CellCollectorError, CellQueryOptions, HeaderDepResolver, LiveCell, QueryOrder,
21 TransactionDependencyError, TransactionDependencyProvider,
22};
23
24pub struct LightClientHeaderDepResolver {
25 client: LightClientRpcAsyncClient,
26 headers: DashMap<Byte32, Option<HeaderView>>,
28}
29
30impl LightClientHeaderDepResolver {
31 pub fn new(url: &str) -> LightClientHeaderDepResolver {
32 let client = LightClientRpcAsyncClient::new(url);
33 LightClientHeaderDepResolver {
34 client,
35 headers: DashMap::new(),
36 }
37 }
38
39 pub fn is_ready(&self) -> bool {
41 self.headers.is_empty() || self.headers.iter().all(|pair| pair.value().is_some())
42 }
43}
44
45#[async_trait::async_trait]
46impl HeaderDepResolver for LightClientHeaderDepResolver {
47 async fn resolve_by_tx_async(
48 &self,
49 tx_hash: &Byte32,
50 ) -> Result<Option<HeaderView>, anyhow::Error> {
51 if let Some(Some(header)) = self.headers.get(tx_hash).as_ref().map(|pair| pair.value()) {
52 return Ok(Some(header.clone()));
53 }
54 match self.client.fetch_transaction(tx_hash.unpack()).await? {
55 FetchStatus::Fetched { data } => {
56 if let Some(block_hash) = data.tx_status.block_hash {
57 match self.client.fetch_header(block_hash).await? {
58 FetchStatus::Fetched { data } => {
59 let header: HeaderView = data.into();
60 self.headers.insert(tx_hash.clone(), Some(header.clone()));
61 Ok(Some(header))
62 }
63 status => {
64 self.headers.insert(tx_hash.clone(), None);
65 Err(anyhow!("fetching header by transaction: {:?}", status))
66 }
67 }
68 } else {
69 self.headers.insert(tx_hash.clone(), None);
70 Err(anyhow!("fetching transaction: {:?}", data))
71 }
72 }
73 status => {
74 self.headers.insert(tx_hash.clone(), None);
75 Err(anyhow!("fetching header by transaction: {:?}", status))
76 }
77 }
78 }
79
80 async fn resolve_by_number_async(
81 &self,
82 number: u64,
83 ) -> Result<Option<HeaderView>, anyhow::Error> {
84 for pair in self.headers.iter() {
85 if let Some(header) = pair.value() {
86 if header.number() == number {
87 return Ok(Some(header.clone()));
88 }
89 }
90 }
91 Err(anyhow!(
92 "unable to resolver header by number directly when use light client as backend, you can call resolve_by_tx(tx_hash) to load the header first."
93 ))
94 }
95}
96
97pub struct LightClientTransactionDependencyProvider {
98 client: LightClientRpcAsyncClient,
99 headers: DashMap<Byte32, Option<HeaderView>>,
101 txs: DashMap<Byte32, Option<TransactionView>>,
103}
104
105impl LightClientTransactionDependencyProvider {
106 pub fn new(url: &str) -> LightClientTransactionDependencyProvider {
107 LightClientTransactionDependencyProvider {
108 client: LightClientRpcAsyncClient::new(url),
109 headers: DashMap::new(),
110 txs: DashMap::new(),
111 }
112 }
113
114 pub fn is_ready(&self) -> bool {
116 (self.headers.is_empty() && self.txs.is_empty())
117 || (self.headers.iter().all(|pair| pair.value().is_some())
118 && self.txs.iter().all(|pair| pair.value().is_some()))
119 }
120}
121
122#[async_trait::async_trait]
123impl TransactionDependencyProvider for LightClientTransactionDependencyProvider {
124 async fn get_transaction_async(
125 &self,
126 tx_hash: &Byte32,
127 ) -> Result<TransactionView, TransactionDependencyError> {
128 if let Some(Some(tx)) = self.txs.get(tx_hash).as_ref().map(|pair| pair.value()) {
129 return Ok(tx.clone());
130 }
131 match self
132 .client
133 .fetch_transaction(tx_hash.unpack())
134 .await
135 .map_err(|err| TransactionDependencyError::Other(anyhow!(err)))?
136 {
137 FetchStatus::Fetched { data } => {
138 if let Some(block_hash) = data.tx_status.block_hash {
139 match self
140 .client
141 .fetch_header(block_hash)
142 .await
143 .map_err(|err| TransactionDependencyError::Other(anyhow!(err)))?
144 {
145 FetchStatus::Fetched { data: header_view } => {
146 let header: HeaderView = header_view.into();
147 if let Some(transaction_view) = data.transaction {
148 let tx: TransactionView =
149 Transaction::from(transaction_view.inner).into_view();
150 self.headers.insert(header.hash(), Some(header));
151 self.txs.insert(tx_hash.clone(), Some(tx.clone()));
152 Ok(tx)
153 } else {
154 self.txs.insert(tx_hash.clone(), None);
155 Err(TransactionDependencyError::NotFound(format!(
156 "fetching transaction: {:?}",
157 header
158 )))
159 }
160 }
161 status => {
162 self.txs.insert(tx_hash.clone(), None);
163 Err(TransactionDependencyError::NotFound(format!(
164 "fetching transaction: {:?}",
165 status
166 )))
167 }
168 }
169 } else {
170 self.txs.insert(tx_hash.clone(), None);
171 Err(TransactionDependencyError::NotFound(format!(
172 "fetching transaction: {:?}",
173 data
174 )))
175 }
176 }
177 status => {
178 self.txs.insert(tx_hash.clone(), None);
179 Err(TransactionDependencyError::NotFound(format!(
180 "fetching transaction: {:?}",
181 status
182 )))
183 }
184 }
185 }
186
187 async fn get_cell_async(
188 &self,
189 out_point: &OutPoint,
190 ) -> Result<CellOutput, TransactionDependencyError> {
191 let tx = self.get_transaction_async(&out_point.tx_hash()).await?;
192 let output_index: u32 = out_point.index().unpack();
193 tx.outputs().get(output_index as usize).ok_or_else(|| {
194 TransactionDependencyError::NotFound(format!("invalid output index: {}", output_index))
195 })
196 }
197 async fn get_cell_data_async(
198 &self,
199 out_point: &OutPoint,
200 ) -> Result<Bytes, TransactionDependencyError> {
201 let tx = self.get_transaction_async(&out_point.tx_hash()).await?;
202 let output_index: u32 = out_point.index().unpack();
203 tx.outputs_data()
204 .get(output_index as usize)
205 .map(|packed_bytes| packed_bytes.raw_data())
206 .ok_or_else(|| {
207 TransactionDependencyError::NotFound(format!(
208 "invalid output index: {}",
209 output_index
210 ))
211 })
212 }
213 async fn get_header_async(
214 &self,
215 block_hash: &Byte32,
216 ) -> Result<HeaderView, TransactionDependencyError> {
217 if let Some(Some(header)) = self
218 .headers
219 .get(block_hash)
220 .as_ref()
221 .map(|pair| pair.value())
222 {
223 return Ok(header.clone());
224 }
225 match self
226 .client
227 .fetch_header(block_hash.unpack())
228 .await
229 .map_err(|err| TransactionDependencyError::Other(anyhow!(err)))?
230 {
231 FetchStatus::Fetched { data } => {
232 let header: HeaderView = data.into();
233 self.headers
234 .insert(block_hash.clone(), Some(header.clone()));
235 Ok(header)
236 }
237 status => {
238 self.headers.insert(block_hash.clone(), None);
239 Err(TransactionDependencyError::NotFound(format!(
240 "fetching header: {:?}",
241 status
242 )))
243 }
244 }
245 }
246
247 async fn get_block_extension_async(
248 &self,
249 _block_hash: &Byte32,
250 ) -> Result<Option<ckb_types::packed::Bytes>, TransactionDependencyError> {
251 Err(TransactionDependencyError::NotFound(
252 "get_block_extension not supported".to_string(),
253 ))
254 }
255}
256
257#[derive(Clone)]
258pub struct LightClientCellCollector {
259 light_client: LightClientRpcAsyncClient,
260 offchain: OffchainCellCollector,
261}
262
263impl LightClientCellCollector {
264 pub fn new(url: &str) -> LightClientCellCollector {
265 let light_client = LightClientRpcAsyncClient::new(url);
266 LightClientCellCollector {
267 light_client,
268 offchain: OffchainCellCollector::default(),
269 }
270 }
271}
272
273#[async_trait::async_trait]
274impl CellCollector for LightClientCellCollector {
275 async fn collect_live_cells_async(
276 &mut self,
277 query: &CellQueryOptions,
278 apply_changes: bool,
279 ) -> Result<(Vec<LiveCell>, u64), CellCollectorError> {
280 let max_mature_number = 0;
281 self.offchain.max_mature_number = max_mature_number;
282 let tip_num = self
283 .light_client
284 .get_tip_header()
285 .await
286 .map_err(|err| CellCollectorError::Internal(anyhow!(err)))?
287 .inner
288 .number
289 .value();
290 let CollectResult {
291 cells,
292 rest_cells,
293 mut total_capacity,
294 } = self.offchain.collect(query, tip_num);
295 let mut cells: Vec<_> = cells.into_iter().map(|c| c.0).collect();
296
297 if total_capacity < query.min_total_capacity {
298 let order = match query.order {
299 QueryOrder::Asc => Order::Asc,
300 QueryOrder::Desc => Order::Desc,
301 };
302 let mut ret_cells: HashMap<_, _> = cells
303 .into_iter()
304 .map(|c| (c.out_point.clone(), c))
305 .collect();
306 let locked_cells = self.offchain.locked_cells.clone();
307 let search_key = SearchKey::from(query.clone());
308 const MAX_LIMIT: u32 = 4096;
309 let mut limit: u32 = query.limit.unwrap_or(16);
310 let mut last_cursor: Option<json_types::JsonBytes> = None;
311 while total_capacity < query.min_total_capacity {
312 let page = self
313 .light_client
314 .get_cells(search_key.clone(), order.clone(), limit.into(), last_cursor)
315 .await
316 .map_err(|err| CellCollectorError::Internal(err.into()))?;
317 if page.objects.is_empty() {
318 break;
319 }
320 for cell in page.objects {
321 let live_cell = LiveCell::from(cell);
322 if !query.match_cell(&live_cell, max_mature_number)
323 || locked_cells.contains_key(&(
324 live_cell.out_point.tx_hash().unpack(),
325 live_cell.out_point.index().unpack(),
326 ))
327 {
328 continue;
329 }
330 let capacity: u64 = live_cell.output.capacity().unpack();
331 if ret_cells
332 .insert(live_cell.out_point.clone(), live_cell)
333 .is_none()
334 {
335 total_capacity += capacity;
336 }
337 if total_capacity >= query.min_total_capacity {
338 break;
339 }
340 }
341 last_cursor = Some(page.last_cursor);
342 if limit < MAX_LIMIT {
343 limit *= 2;
344 }
345 }
346 cells = ret_cells.into_values().collect();
347 }
348 if apply_changes {
349 self.offchain.live_cells = rest_cells;
350 for cell in &cells {
351 self.lock_cell(cell.out_point.clone(), tip_num)?;
352 }
353 }
354 Ok((cells, total_capacity))
355 }
356
357 fn lock_cell(
358 &mut self,
359 out_point: OutPoint,
360 tip_number: u64,
361 ) -> Result<(), CellCollectorError> {
362 self.offchain.lock_cell(out_point, tip_number)
363 }
364 fn apply_tx(&mut self, tx: Transaction, tip_number: u64) -> Result<(), CellCollectorError> {
365 self.offchain.apply_tx(tx, tip_number)
366 }
367 fn reset(&mut self) {
368 self.offchain.reset();
369 }
370}