// ckb_sdk/traits/light_client_impls.rs

1use std::collections::HashMap;
2
3use anyhow::anyhow;
4use dashmap::DashMap;
5
6use ckb_jsonrpc_types as json_types;
7use ckb_types::{
8    bytes::Bytes,
9    core::{HeaderView, TransactionView},
10    packed::{Byte32, CellOutput, OutPoint, Transaction},
11    prelude::*,
12};
13
14use super::{offchain_impls::CollectResult, OffchainCellCollector};
15use crate::rpc::{
16    ckb_light_client::{FetchStatus, Order, SearchKey},
17    LightClientRpcAsyncClient,
18};
19use crate::traits::{
20    CellCollector, CellCollectorError, CellQueryOptions, HeaderDepResolver, LiveCell, QueryOrder,
21    TransactionDependencyError, TransactionDependencyProvider,
22};
23
/// Header-dependency resolver that talks to a light client over RPC.
///
/// Headers resolved through `resolve_by_tx_async` are remembered in `headers`
/// so later lookups (by transaction hash or by block number) can be answered
/// from memory.
pub struct LightClientHeaderDepResolver {
    /// Async RPC client used for the `fetch_transaction` / `fetch_header` calls.
    client: LightClientRpcAsyncClient,
    /// tx_hash => header of the block reported in that transaction's status.
    /// A `None` value marks a lookup that has not produced a header yet.
    headers: DashMap<Byte32, Option<HeaderView>>,
}
29
30impl LightClientHeaderDepResolver {
31    pub fn new(url: &str) -> LightClientHeaderDepResolver {
32        let client = LightClientRpcAsyncClient::new(url);
33        LightClientHeaderDepResolver {
34            client,
35            headers: DashMap::new(),
36        }
37    }
38
39    /// Check if headers all fetched
40    pub fn is_ready(&self) -> bool {
41        self.headers.is_empty() || self.headers.iter().all(|pair| pair.value().is_some())
42    }
43}
44
45#[cfg_attr(target_arch="wasm32", async_trait::async_trait(?Send))]
46#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
47impl HeaderDepResolver for LightClientHeaderDepResolver {
48    async fn resolve_by_tx_async(
49        &self,
50        tx_hash: &Byte32,
51    ) -> Result<Option<HeaderView>, anyhow::Error> {
52        if let Some(Some(header)) = self.headers.get(tx_hash).as_ref().map(|pair| pair.value()) {
53            return Ok(Some(header.clone()));
54        }
55        match self.client.fetch_transaction(tx_hash.unpack()).await? {
56            FetchStatus::Fetched { data } => {
57                if let Some(block_hash) = data.tx_status.block_hash {
58                    match self.client.fetch_header(block_hash).await? {
59                        FetchStatus::Fetched { data } => {
60                            let header: HeaderView = data.into();
61                            self.headers.insert(tx_hash.clone(), Some(header.clone()));
62                            Ok(Some(header))
63                        }
64                        status => {
65                            self.headers.insert(tx_hash.clone(), None);
66                            Err(anyhow!("fetching header by transaction: {:?}", status))
67                        }
68                    }
69                } else {
70                    self.headers.insert(tx_hash.clone(), None);
71                    Err(anyhow!("fetching transaction: {:?}", data))
72                }
73            }
74            status => {
75                self.headers.insert(tx_hash.clone(), None);
76                Err(anyhow!("fetching header by transaction: {:?}", status))
77            }
78        }
79    }
80
81    async fn resolve_by_number_async(
82        &self,
83        number: u64,
84    ) -> Result<Option<HeaderView>, anyhow::Error> {
85        for pair in self.headers.iter() {
86            if let Some(header) = pair.value() {
87                if header.number() == number {
88                    return Ok(Some(header.clone()));
89                }
90            }
91        }
92        Err(anyhow!(
93                "unable to resolver header by number directly when use light client as backend, you can call resolve_by_tx(tx_hash) to load the header first."
94            ))
95    }
96}
97
/// Transaction dependency provider that talks to a light client over RPC.
///
/// Fetched headers and transactions are remembered so repeated lookups do not
/// hit the RPC endpoint again.
pub struct LightClientTransactionDependencyProvider {
    /// Async RPC client used for the `fetch_transaction` / `fetch_header` calls.
    client: LightClientRpcAsyncClient,
    /// Headers to load, keyed by header/block hash. A `None` value marks a
    /// header whose fetch did not report `Fetched`.
    headers: DashMap<Byte32, Option<HeaderView>>,
    /// Transactions to load, keyed by transaction hash. A `None` value marks a
    /// transaction that could not be fully fetched yet.
    txs: DashMap<Byte32, Option<TransactionView>>,
}
105
106impl LightClientTransactionDependencyProvider {
107    pub fn new(url: &str) -> LightClientTransactionDependencyProvider {
108        LightClientTransactionDependencyProvider {
109            client: LightClientRpcAsyncClient::new(url),
110            headers: DashMap::new(),
111            txs: DashMap::new(),
112        }
113    }
114
115    /// Check if headers and transactions all fetched
116    pub fn is_ready(&self) -> bool {
117        (self.headers.is_empty() && self.txs.is_empty())
118            || (self.headers.iter().all(|pair| pair.value().is_some())
119                && self.txs.iter().all(|pair| pair.value().is_some()))
120    }
121}
122
123#[cfg_attr(target_arch="wasm32", async_trait::async_trait(?Send))]
124#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
125impl TransactionDependencyProvider for LightClientTransactionDependencyProvider {
126    async fn get_transaction_async(
127        &self,
128        tx_hash: &Byte32,
129    ) -> Result<TransactionView, TransactionDependencyError> {
130        if let Some(Some(tx)) = self.txs.get(tx_hash).as_ref().map(|pair| pair.value()) {
131            return Ok(tx.clone());
132        }
133        match self
134            .client
135            .fetch_transaction(tx_hash.unpack())
136            .await
137            .map_err(|err| TransactionDependencyError::Other(anyhow!(err)))?
138        {
139            FetchStatus::Fetched { data } => {
140                if let Some(block_hash) = data.tx_status.block_hash {
141                    match self
142                        .client
143                        .fetch_header(block_hash)
144                        .await
145                        .map_err(|err| TransactionDependencyError::Other(anyhow!(err)))?
146                    {
147                        FetchStatus::Fetched { data: header_view } => {
148                            let header: HeaderView = header_view.into();
149                            if let Some(transaction_view) = data.transaction {
150                                let tx: TransactionView =
151                                    Transaction::from(transaction_view.inner).into_view();
152                                self.headers.insert(header.hash(), Some(header));
153                                self.txs.insert(tx_hash.clone(), Some(tx.clone()));
154                                Ok(tx)
155                            } else {
156                                self.txs.insert(tx_hash.clone(), None);
157                                Err(TransactionDependencyError::NotFound(format!(
158                                    "fetching transaction: {:?}",
159                                    header
160                                )))
161                            }
162                        }
163                        status => {
164                            self.txs.insert(tx_hash.clone(), None);
165                            Err(TransactionDependencyError::NotFound(format!(
166                                "fetching transaction: {:?}",
167                                status
168                            )))
169                        }
170                    }
171                } else {
172                    self.txs.insert(tx_hash.clone(), None);
173                    Err(TransactionDependencyError::NotFound(format!(
174                        "fetching transaction: {:?}",
175                        data
176                    )))
177                }
178            }
179            status => {
180                self.txs.insert(tx_hash.clone(), None);
181                Err(TransactionDependencyError::NotFound(format!(
182                    "fetching transaction: {:?}",
183                    status
184                )))
185            }
186        }
187    }
188
189    async fn get_cell_async(
190        &self,
191        out_point: &OutPoint,
192    ) -> Result<CellOutput, TransactionDependencyError> {
193        let tx = self.get_transaction_async(&out_point.tx_hash()).await?;
194        let output_index: u32 = out_point.index().unpack();
195        tx.outputs().get(output_index as usize).ok_or_else(|| {
196            TransactionDependencyError::NotFound(format!("invalid output index: {}", output_index))
197        })
198    }
199    async fn get_cell_data_async(
200        &self,
201        out_point: &OutPoint,
202    ) -> Result<Bytes, TransactionDependencyError> {
203        let tx = self.get_transaction_async(&out_point.tx_hash()).await?;
204        let output_index: u32 = out_point.index().unpack();
205        tx.outputs_data()
206            .get(output_index as usize)
207            .map(|packed_bytes| packed_bytes.raw_data())
208            .ok_or_else(|| {
209                TransactionDependencyError::NotFound(format!(
210                    "invalid output index: {}",
211                    output_index
212                ))
213            })
214    }
215    async fn get_header_async(
216        &self,
217        block_hash: &Byte32,
218    ) -> Result<HeaderView, TransactionDependencyError> {
219        if let Some(Some(header)) = self
220            .headers
221            .get(block_hash)
222            .as_ref()
223            .map(|pair| pair.value())
224        {
225            return Ok(header.clone());
226        }
227        match self
228            .client
229            .fetch_header(block_hash.unpack())
230            .await
231            .map_err(|err| TransactionDependencyError::Other(anyhow!(err)))?
232        {
233            FetchStatus::Fetched { data } => {
234                let header: HeaderView = data.into();
235                self.headers
236                    .insert(block_hash.clone(), Some(header.clone()));
237                Ok(header)
238            }
239            status => {
240                self.headers.insert(block_hash.clone(), None);
241                Err(TransactionDependencyError::NotFound(format!(
242                    "fetching header: {:?}",
243                    status
244                )))
245            }
246        }
247    }
248
249    async fn get_block_extension_async(
250        &self,
251        _block_hash: &Byte32,
252    ) -> Result<Option<ckb_types::packed::Bytes>, TransactionDependencyError> {
253        Err(TransactionDependencyError::NotFound(
254            "get_block_extension not supported".to_string(),
255        ))
256    }
257}
258
/// Cell collector that combines locally tracked off-chain cells with live
/// cells queried from a light client.
#[derive(Clone)]
pub struct LightClientCellCollector {
    /// Async RPC client used for `get_tip_header` and `get_cells`.
    light_client: LightClientRpcAsyncClient,
    /// Off-chain bookkeeping: locally known live cells and locked cells.
    offchain: OffchainCellCollector,
}
264
265impl LightClientCellCollector {
266    pub fn new(url: &str) -> LightClientCellCollector {
267        let light_client = LightClientRpcAsyncClient::new(url);
268        LightClientCellCollector {
269            light_client,
270            offchain: OffchainCellCollector::default(),
271        }
272    }
273}
274
275#[cfg_attr(target_arch="wasm32", async_trait::async_trait(?Send))]
276#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
277impl CellCollector for LightClientCellCollector {
278    async fn collect_live_cells_async(
279        &mut self,
280        query: &CellQueryOptions,
281        apply_changes: bool,
282    ) -> Result<(Vec<LiveCell>, u64), CellCollectorError> {
283        let max_mature_number = 0;
284        self.offchain.max_mature_number = max_mature_number;
285        let tip_num = self
286            .light_client
287            .get_tip_header()
288            .await
289            .map_err(|err| CellCollectorError::Internal(anyhow!(err)))?
290            .inner
291            .number
292            .value();
293        let CollectResult {
294            cells,
295            rest_cells,
296            mut total_capacity,
297        } = self.offchain.collect(query, tip_num);
298        let mut cells: Vec<_> = cells.into_iter().map(|c| c.0).collect();
299
300        if total_capacity < query.min_total_capacity {
301            let order = match query.order {
302                QueryOrder::Asc => Order::Asc,
303                QueryOrder::Desc => Order::Desc,
304            };
305            let mut ret_cells: HashMap<_, _> = cells
306                .into_iter()
307                .map(|c| (c.out_point.clone(), c))
308                .collect();
309            let locked_cells = self.offchain.locked_cells.clone();
310            let search_key = SearchKey::from(query.clone());
311            const MAX_LIMIT: u32 = 4096;
312            let mut limit: u32 = query.limit.unwrap_or(16);
313            let mut last_cursor: Option<json_types::JsonBytes> = None;
314            while total_capacity < query.min_total_capacity {
315                let page = self
316                    .light_client
317                    .get_cells(search_key.clone(), order.clone(), limit.into(), last_cursor)
318                    .await
319                    .map_err(|err| CellCollectorError::Internal(err.into()))?;
320                if page.objects.is_empty() {
321                    break;
322                }
323                for cell in page.objects {
324                    let live_cell = LiveCell::from(cell);
325                    if !query.match_cell(&live_cell, max_mature_number)
326                        || locked_cells.contains_key(&(
327                            live_cell.out_point.tx_hash().unpack(),
328                            live_cell.out_point.index().unpack(),
329                        ))
330                    {
331                        continue;
332                    }
333                    let capacity: u64 = live_cell.output.capacity().unpack();
334                    if ret_cells
335                        .insert(live_cell.out_point.clone(), live_cell)
336                        .is_none()
337                    {
338                        total_capacity += capacity;
339                    }
340                    if total_capacity >= query.min_total_capacity {
341                        break;
342                    }
343                }
344                last_cursor = Some(page.last_cursor);
345                if limit < MAX_LIMIT {
346                    limit *= 2;
347                }
348            }
349            cells = ret_cells.into_values().collect();
350        }
351        if apply_changes {
352            self.offchain.live_cells = rest_cells;
353            for cell in &cells {
354                self.lock_cell(cell.out_point.clone(), tip_num)?;
355            }
356        }
357        Ok((cells, total_capacity))
358    }
359
360    fn lock_cell(
361        &mut self,
362        out_point: OutPoint,
363        tip_number: u64,
364    ) -> Result<(), CellCollectorError> {
365        self.offchain.lock_cell(out_point, tip_number)
366    }
367    fn apply_tx(&mut self, tx: Transaction, tip_number: u64) -> Result<(), CellCollectorError> {
368        self.offchain.apply_tx(tx, tip_number)
369    }
370    fn reset(&mut self) {
371        self.offchain.reset();
372    }
373}