Skip to main content

ckb_sdk/traits/
light_client_impls.rs

1use std::collections::HashMap;
2
3use anyhow::anyhow;
4use dashmap::DashMap;
5
6use ckb_jsonrpc_types as json_types;
7use ckb_types::{
8    bytes::Bytes,
9    core::{HeaderView, TransactionView},
10    packed::{Byte32, CellOutput, OutPoint, Transaction},
11    prelude::*,
12};
13
14use super::{offchain_impls::CollectResult, OffchainCellCollector};
15use crate::rpc::{
16    ckb_light_client::{FetchStatus, Order, SearchKey},
17    LightClientRpcAsyncClient,
18};
19use crate::traits::{
20    CellCollector, CellCollectorError, CellQueryOptions, HeaderDepResolver, LiveCell, QueryOrder,
21    TransactionDependencyError, TransactionDependencyProvider,
22};
23
24pub struct LightClientHeaderDepResolver {
25    client: LightClientRpcAsyncClient,
26    // tx_hash => HeaderView
27    headers: DashMap<Byte32, Option<HeaderView>>,
28}
29
30impl LightClientHeaderDepResolver {
31    pub fn new(url: &str) -> LightClientHeaderDepResolver {
32        let client = LightClientRpcAsyncClient::new(url);
33        LightClientHeaderDepResolver {
34            client,
35            headers: DashMap::new(),
36        }
37    }
38
39    /// Check if headers all fetched
40    pub fn is_ready(&self) -> bool {
41        self.headers.is_empty() || self.headers.iter().all(|pair| pair.value().is_some())
42    }
43}
44
45#[async_trait::async_trait]
46impl HeaderDepResolver for LightClientHeaderDepResolver {
47    async fn resolve_by_tx_async(
48        &self,
49        tx_hash: &Byte32,
50    ) -> Result<Option<HeaderView>, anyhow::Error> {
51        if let Some(Some(header)) = self.headers.get(tx_hash).as_ref().map(|pair| pair.value()) {
52            return Ok(Some(header.clone()));
53        }
54        match self.client.fetch_transaction(tx_hash.unpack()).await? {
55            FetchStatus::Fetched { data } => {
56                if let Some(block_hash) = data.tx_status.block_hash {
57                    match self.client.fetch_header(block_hash).await? {
58                        FetchStatus::Fetched { data } => {
59                            let header: HeaderView = data.into();
60                            self.headers.insert(tx_hash.clone(), Some(header.clone()));
61                            Ok(Some(header))
62                        }
63                        status => {
64                            self.headers.insert(tx_hash.clone(), None);
65                            Err(anyhow!("fetching header by transaction: {:?}", status))
66                        }
67                    }
68                } else {
69                    self.headers.insert(tx_hash.clone(), None);
70                    Err(anyhow!("fetching transaction: {:?}", data))
71                }
72            }
73            status => {
74                self.headers.insert(tx_hash.clone(), None);
75                Err(anyhow!("fetching header by transaction: {:?}", status))
76            }
77        }
78    }
79
80    async fn resolve_by_number_async(
81        &self,
82        number: u64,
83    ) -> Result<Option<HeaderView>, anyhow::Error> {
84        for pair in self.headers.iter() {
85            if let Some(header) = pair.value() {
86                if header.number() == number {
87                    return Ok(Some(header.clone()));
88                }
89            }
90        }
91        Err(anyhow!(
92                "unable to resolver header by number directly when use light client as backend, you can call resolve_by_tx(tx_hash) to load the header first."
93            ))
94    }
95}
96
97pub struct LightClientTransactionDependencyProvider {
98    client: LightClientRpcAsyncClient,
99    // headers to load
100    headers: DashMap<Byte32, Option<HeaderView>>,
101    // transactions to load
102    txs: DashMap<Byte32, Option<TransactionView>>,
103}
104
105impl LightClientTransactionDependencyProvider {
106    pub fn new(url: &str) -> LightClientTransactionDependencyProvider {
107        LightClientTransactionDependencyProvider {
108            client: LightClientRpcAsyncClient::new(url),
109            headers: DashMap::new(),
110            txs: DashMap::new(),
111        }
112    }
113
114    /// Check if headers and transactions all fetched
115    pub fn is_ready(&self) -> bool {
116        (self.headers.is_empty() && self.txs.is_empty())
117            || (self.headers.iter().all(|pair| pair.value().is_some())
118                && self.txs.iter().all(|pair| pair.value().is_some()))
119    }
120}
121
122#[async_trait::async_trait]
123impl TransactionDependencyProvider for LightClientTransactionDependencyProvider {
124    async fn get_transaction_async(
125        &self,
126        tx_hash: &Byte32,
127    ) -> Result<TransactionView, TransactionDependencyError> {
128        if let Some(Some(tx)) = self.txs.get(tx_hash).as_ref().map(|pair| pair.value()) {
129            return Ok(tx.clone());
130        }
131        match self
132            .client
133            .fetch_transaction(tx_hash.unpack())
134            .await
135            .map_err(|err| TransactionDependencyError::Other(anyhow!(err)))?
136        {
137            FetchStatus::Fetched { data } => {
138                if let Some(block_hash) = data.tx_status.block_hash {
139                    match self
140                        .client
141                        .fetch_header(block_hash)
142                        .await
143                        .map_err(|err| TransactionDependencyError::Other(anyhow!(err)))?
144                    {
145                        FetchStatus::Fetched { data: header_view } => {
146                            let header: HeaderView = header_view.into();
147                            if let Some(transaction_view) = data.transaction {
148                                let tx: TransactionView =
149                                    Transaction::from(transaction_view.inner).into_view();
150                                self.headers.insert(header.hash(), Some(header));
151                                self.txs.insert(tx_hash.clone(), Some(tx.clone()));
152                                Ok(tx)
153                            } else {
154                                self.txs.insert(tx_hash.clone(), None);
155                                Err(TransactionDependencyError::NotFound(format!(
156                                    "fetching transaction: {:?}",
157                                    header
158                                )))
159                            }
160                        }
161                        status => {
162                            self.txs.insert(tx_hash.clone(), None);
163                            Err(TransactionDependencyError::NotFound(format!(
164                                "fetching transaction: {:?}",
165                                status
166                            )))
167                        }
168                    }
169                } else {
170                    self.txs.insert(tx_hash.clone(), None);
171                    Err(TransactionDependencyError::NotFound(format!(
172                        "fetching transaction: {:?}",
173                        data
174                    )))
175                }
176            }
177            status => {
178                self.txs.insert(tx_hash.clone(), None);
179                Err(TransactionDependencyError::NotFound(format!(
180                    "fetching transaction: {:?}",
181                    status
182                )))
183            }
184        }
185    }
186
187    async fn get_cell_async(
188        &self,
189        out_point: &OutPoint,
190    ) -> Result<CellOutput, TransactionDependencyError> {
191        let tx = self.get_transaction_async(&out_point.tx_hash()).await?;
192        let output_index: u32 = out_point.index().unpack();
193        tx.outputs().get(output_index as usize).ok_or_else(|| {
194            TransactionDependencyError::NotFound(format!("invalid output index: {}", output_index))
195        })
196    }
197    async fn get_cell_data_async(
198        &self,
199        out_point: &OutPoint,
200    ) -> Result<Bytes, TransactionDependencyError> {
201        let tx = self.get_transaction_async(&out_point.tx_hash()).await?;
202        let output_index: u32 = out_point.index().unpack();
203        tx.outputs_data()
204            .get(output_index as usize)
205            .map(|packed_bytes| packed_bytes.raw_data())
206            .ok_or_else(|| {
207                TransactionDependencyError::NotFound(format!(
208                    "invalid output index: {}",
209                    output_index
210                ))
211            })
212    }
213    async fn get_header_async(
214        &self,
215        block_hash: &Byte32,
216    ) -> Result<HeaderView, TransactionDependencyError> {
217        if let Some(Some(header)) = self
218            .headers
219            .get(block_hash)
220            .as_ref()
221            .map(|pair| pair.value())
222        {
223            return Ok(header.clone());
224        }
225        match self
226            .client
227            .fetch_header(block_hash.unpack())
228            .await
229            .map_err(|err| TransactionDependencyError::Other(anyhow!(err)))?
230        {
231            FetchStatus::Fetched { data } => {
232                let header: HeaderView = data.into();
233                self.headers
234                    .insert(block_hash.clone(), Some(header.clone()));
235                Ok(header)
236            }
237            status => {
238                self.headers.insert(block_hash.clone(), None);
239                Err(TransactionDependencyError::NotFound(format!(
240                    "fetching header: {:?}",
241                    status
242                )))
243            }
244        }
245    }
246
247    async fn get_block_extension_async(
248        &self,
249        _block_hash: &Byte32,
250    ) -> Result<Option<ckb_types::packed::Bytes>, TransactionDependencyError> {
251        Err(TransactionDependencyError::NotFound(
252            "get_block_extension not supported".to_string(),
253        ))
254    }
255}
256
257#[derive(Clone)]
258pub struct LightClientCellCollector {
259    light_client: LightClientRpcAsyncClient,
260    offchain: OffchainCellCollector,
261}
262
263impl LightClientCellCollector {
264    pub fn new(url: &str) -> LightClientCellCollector {
265        let light_client = LightClientRpcAsyncClient::new(url);
266        LightClientCellCollector {
267            light_client,
268            offchain: OffchainCellCollector::default(),
269        }
270    }
271}
272
273#[async_trait::async_trait]
274impl CellCollector for LightClientCellCollector {
275    async fn collect_live_cells_async(
276        &mut self,
277        query: &CellQueryOptions,
278        apply_changes: bool,
279    ) -> Result<(Vec<LiveCell>, u64), CellCollectorError> {
280        let max_mature_number = 0;
281        self.offchain.max_mature_number = max_mature_number;
282        let tip_num = self
283            .light_client
284            .get_tip_header()
285            .await
286            .map_err(|err| CellCollectorError::Internal(anyhow!(err)))?
287            .inner
288            .number
289            .value();
290        let CollectResult {
291            cells,
292            rest_cells,
293            mut total_capacity,
294        } = self.offchain.collect(query, tip_num);
295        let mut cells: Vec<_> = cells.into_iter().map(|c| c.0).collect();
296
297        if total_capacity < query.min_total_capacity {
298            let order = match query.order {
299                QueryOrder::Asc => Order::Asc,
300                QueryOrder::Desc => Order::Desc,
301            };
302            let mut ret_cells: HashMap<_, _> = cells
303                .into_iter()
304                .map(|c| (c.out_point.clone(), c))
305                .collect();
306            let locked_cells = self.offchain.locked_cells.clone();
307            let search_key = SearchKey::from(query.clone());
308            const MAX_LIMIT: u32 = 4096;
309            let mut limit: u32 = query.limit.unwrap_or(16);
310            let mut last_cursor: Option<json_types::JsonBytes> = None;
311            while total_capacity < query.min_total_capacity {
312                let page = self
313                    .light_client
314                    .get_cells(search_key.clone(), order.clone(), limit.into(), last_cursor)
315                    .await
316                    .map_err(|err| CellCollectorError::Internal(err.into()))?;
317                if page.objects.is_empty() {
318                    break;
319                }
320                for cell in page.objects {
321                    let live_cell = LiveCell::from(cell);
322                    if !query.match_cell(&live_cell, max_mature_number)
323                        || locked_cells.contains_key(&(
324                            live_cell.out_point.tx_hash().unpack(),
325                            live_cell.out_point.index().unpack(),
326                        ))
327                    {
328                        continue;
329                    }
330                    let capacity: u64 = live_cell.output.capacity().unpack();
331                    if ret_cells
332                        .insert(live_cell.out_point.clone(), live_cell)
333                        .is_none()
334                    {
335                        total_capacity += capacity;
336                    }
337                    if total_capacity >= query.min_total_capacity {
338                        break;
339                    }
340                }
341                last_cursor = Some(page.last_cursor);
342                if limit < MAX_LIMIT {
343                    limit *= 2;
344                }
345            }
346            cells = ret_cells.into_values().collect();
347        }
348        if apply_changes {
349            self.offchain.live_cells = rest_cells;
350            for cell in &cells {
351                self.lock_cell(cell.out_point.clone(), tip_num)?;
352            }
353        }
354        Ok((cells, total_capacity))
355    }
356
357    fn lock_cell(
358        &mut self,
359        out_point: OutPoint,
360        tip_number: u64,
361    ) -> Result<(), CellCollectorError> {
362        self.offchain.lock_cell(out_point, tip_number)
363    }
364    fn apply_tx(&mut self, tx: Transaction, tip_number: u64) -> Result<(), CellCollectorError> {
365        self.offchain.apply_tx(tx, tip_number)
366    }
367    fn reset(&mut self) {
368        self.offchain.reset();
369    }
370}