Skip to main content

forest/ipld/
util.rs

1// Copyright 2019-2026 ChainSafe Systems
2// SPDX-License-Identifier: Apache-2.0, MIT
3
4use crate::blocks::Tipset;
5use crate::cid_collections::{CidHashSet, CidHashSetLike};
6use crate::ipld::Ipld;
7use crate::shim::clock::ChainEpoch;
8use crate::shim::executor::Receipt;
9use crate::utils::db::car_stream::CarBlock;
10use crate::utils::encoding::extract_cids;
11use crate::utils::multihash::prelude::*;
12use bytes::Bytes;
13use chrono::{DateTime, Utc};
14use cid::Cid;
15use futures::Stream;
16use fvm_ipld_blockstore::Blockstore;
17use parking_lot::Mutex;
18use pin_project_lite::pin_project;
19use std::borrow::Borrow;
20use std::collections::VecDeque;
21use std::pin::Pin;
22use std::sync::LazyLock;
23use std::task::{Context, Poll};
24
25#[derive(Default)]
26pub struct ExportStatus {
27    pub epoch: i64,
28    pub initial_epoch: i64,
29    pub exporting: bool,
30    pub cancelled: bool,
31    pub start_time: Option<DateTime<Utc>>,
32}
33
34pub static CHAIN_EXPORT_STATUS: LazyLock<Mutex<ExportStatus>> =
35    LazyLock::new(|| ExportStatus::default().into());
36
37fn update_epoch(new_value: i64) {
38    let mut mutex = CHAIN_EXPORT_STATUS.lock();
39    mutex.epoch = new_value;
40    if mutex.initial_epoch == 0 {
41        mutex.initial_epoch = new_value;
42    }
43}
44
45pub fn start_export() {
46    let mut mutex = CHAIN_EXPORT_STATUS.lock();
47    mutex.epoch = 0;
48    mutex.initial_epoch = 0;
49    mutex.exporting = true;
50    mutex.cancelled = false;
51    mutex.start_time = Some(Utc::now());
52}
53
54pub fn end_export() {
55    let mut mutex = CHAIN_EXPORT_STATUS.lock();
56    mutex.exporting = false;
57}
58
59pub fn cancel_export() {
60    let mut mutex = CHAIN_EXPORT_STATUS.lock();
61    mutex.exporting = false;
62    mutex.cancelled = true;
63}
64
65fn should_save_block_to_snapshot(cid: Cid) -> bool {
66    // Don't include identity CIDs.
67    // We only include raw and dagcbor, for now.
68    // Raw for "code" CIDs.
69    if cid.hash().code() == u64::from(MultihashCode::Identity) {
70        false
71    } else {
72        matches!(
73            cid.codec(),
74            crate::shim::crypto::IPLD_RAW | fvm_ipld_encoding::DAG_CBOR
75        )
76    }
77}
78
79/// Depth-first-search iterator for `ipld` leaf nodes.
80///
81/// This iterator consumes the given `ipld` structure and returns leaf nodes (i.e.,
82/// no list or map) in depth-first order. The iterator can be extended at any
83/// point by the caller.
84///
85/// Consider walking this `ipld` graph:
86/// ```text
87/// List
88///  ├ Integer(5)
89///  ├ Link(Y)
90///  └ String("string")
91///
92/// Link(Y):
93/// Map
94///  ├ "key1" => Bool(true)
95///  └ "key2" => Float(3.14)
96/// ```
97///
98/// If we walk the above `ipld` graph (replacing `Link(Y)` when it is encountered), the leaf nodes will be seen in this order:
99/// 1. `Integer(5)`
100/// 2. `Bool(true)`
101/// 3. `Float(3.14)`
102/// 4. `String("string")`
103pub struct DfsIter {
104    dfs: Vec<Ipld>,
105}
106
107impl DfsIter {
108    pub fn new(root: Ipld) -> Self {
109        DfsIter { dfs: vec![root] }
110    }
111}
112
113impl From<Cid> for DfsIter {
114    fn from(cid: Cid) -> Self {
115        DfsIter::new(Ipld::Link(cid))
116    }
117}
118
119impl Iterator for DfsIter {
120    type Item = Ipld;
121
122    fn next(&mut self) -> Option<Self::Item> {
123        while let Some(ipld) = self.dfs.pop() {
124            match ipld {
125                Ipld::List(list) => self.dfs.extend(list.into_iter().rev()),
126                Ipld::Map(map) => self.dfs.extend(map.into_values().rev()),
127                other => return Some(other),
128            }
129        }
130        None
131    }
132}
133
/// The part of a block that a [`Task::Iterate`] task is walking.
///
/// The wrapped CID is the root of that part; it is used to describe the source
/// of a missing block. The variant also decides how a missing block is treated
/// when dead links are fatal:
/// * `Message` / `StateRoot` produce an error;
/// * `MessageReceipts` / `EventsRoot` are only traced and skipped.
enum IterateType {
    Message(Cid),
    MessageReceipts(Cid),
    StateRoot(Cid),
    EventsRoot(Cid),
}
140
/// A unit of work in the [`ChainStream`] queue.
enum Task {
    /// Yield the block, don't visit it.
    ///
    /// When the payload is `None`, the data is loaded from the database at
    /// emit time.
    Emit(Cid, Option<Bytes>),
    /// Visit all the elements, recursively.
    ///
    /// Fields, in order:
    /// * epoch of the owning block;
    /// * CID of the owning block header;
    /// * which part of the block is being walked;
    /// * stack of CIDs still to visit.
    Iterate(ChainEpoch, Cid, IterateType, Vec<Cid>),
}
147
pin_project! {
    /// Stream of [`CarBlock`]s for a chain export, built by [`stream_chain`]
    /// or [`stream_graph`] and configured with the builder methods.
    pub struct ChainStream<DB, T, S = CidHashSet> {
        // Source of tipsets, consumed one at a time as the queue drains.
        tipset_iter: T,
        // Block store the exported data is read from.
        db: DB,
        dfs: VecDeque<Task>, // Depth-first work queue.
        // CIDs already visited; a CID is emitted only when inserting it succeeds.
        seen: S,
        // Blocks with an epoch strictly greater than this are walked in depth.
        stateroot_limit_exclusive: ChainEpoch,
        // Whether a block missing from `db` is yielded as an error.
        fail_on_dead_links: bool,
        // Whether message receipt trees are walked.
        message_receipts: bool,
        // Whether event trees (found through the receipts) are walked.
        events: bool,
        // Whether each tipset key block is emitted.
        tipset_keys:bool,
        // Whether processed epochs are recorded in `CHAIN_EXPORT_STATUS`.
        track_progress: bool,
    }
}
162
163impl<DB, T, S> ChainStream<DB, T, S> {
164    pub fn fail_on_dead_links(mut self, fail_on_dead_links: bool) -> Self {
165        self.fail_on_dead_links = fail_on_dead_links;
166        self
167    }
168
169    pub fn track_progress(mut self, track_progress: bool) -> Self {
170        self.track_progress = track_progress;
171        self
172    }
173
174    /// Whether to enable traversal of message receipt roots during chain export.
175    pub fn with_message_receipts(mut self, message_receipts: bool) -> Self {
176        self.message_receipts = message_receipts;
177        self
178    }
179
180    /// Whether to enable traversal of events roots during chain export.
181    /// Requires message receipts to be enabled as well.
182    pub fn with_events(mut self, events: bool) -> Self {
183        self.events = events;
184        self
185    }
186
187    /// Whether to export tipset keys.
188    pub fn with_tipset_keys(mut self, tipset_keys: bool) -> Self {
189        self.tipset_keys = tipset_keys;
190        self
191    }
192
193    pub fn into_seen(self) -> S {
194        self.seen
195    }
196}
197
198/// Stream all blocks that are reachable before the `stateroot_limit` epoch in a depth-first
199/// fashion.
200/// After this limit, only block headers are streamed. Any dead links are reported as errors.
201///
202/// # Arguments
203///
204/// * `db` - A database that implements [`Blockstore`] interface.
205/// * `tipset_iter` - An iterator of [`Tipset`], descending order `$child -> $parent`.
206/// * `stateroot_limit` - An epoch that signifies how far back (exclusive) we need to inspect tipsets,
207///   in-depth. This has to be pre-calculated using this formula: `$cur_epoch - $depth`, where `$depth`
208///   is the number of `[`Tipset`]` that needs inspection.
209pub fn stream_chain<
210    DB: Blockstore,
211    T: Borrow<Tipset>,
212    ITER: Iterator<Item = T> + Unpin,
213    S: CidHashSetLike,
214>(
215    db: DB,
216    tipset_iter: ITER,
217    stateroot_limit_exclusive: ChainEpoch,
218    seen: S,
219) -> ChainStream<DB, ITER, S> {
220    ChainStream {
221        tipset_iter,
222        db,
223        dfs: VecDeque::new(),
224        seen,
225        stateroot_limit_exclusive,
226        fail_on_dead_links: true,
227        message_receipts: false,
228        events: false,
229        tipset_keys: false,
230        track_progress: false,
231    }
232}
233
234// Stream available graph in a depth-first search. All reachable nodes are touched and dead-links
235// are ignored.
236pub fn stream_graph<
237    DB: Blockstore,
238    T: Borrow<Tipset>,
239    ITER: Iterator<Item = T> + Unpin,
240    S: CidHashSetLike,
241>(
242    db: DB,
243    tipset_iter: ITER,
244    stateroot_limit_exclusive: ChainEpoch,
245    seen: S,
246) -> ChainStream<DB, ITER, S> {
247    stream_chain(db, tipset_iter, stateroot_limit_exclusive, seen).fail_on_dead_links(false)
248}
249
impl<DB: Blockstore, T: Borrow<Tipset>, ITER: Iterator<Item = T> + Unpin, S: CidHashSetLike> Stream
    for ChainStream<DB, ITER, S>
{
    type Item = anyhow::Result<CarBlock>;

    /// Yields the next block of the export.
    ///
    /// Each call yields at most one item. Queued tasks are processed front-first:
    /// * `Emit` yields a single block;
    /// * `Iterate` yields one newly-seen block per call and stays at the front
    ///   of the queue until its CID stack is empty, so the next call resumes it.
    ///
    /// When the queue is empty, the next tipset is taken from `tipset_iter` and
    /// its work is queued. The stream ends when `tipset_iter` is exhausted.
    ///
    /// Database and decoding errors are yielded as `Err` items via `?`. Every
    /// exit returns `Poll::Ready`, so the task context is unused.
    fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        use Task::*;

        // Copy the flags out before `project()` consumes the pinned reference.
        let export_tipset_keys = self.tipset_keys;
        let fail_on_dead_links = self.fail_on_dead_links;
        let stateroot_limit_exclusive = self.stateroot_limit_exclusive;
        let this = self.project();

        loop {
            while let Some(task) = this.dfs.front_mut() {
                match task {
                    Emit(_, _) => {
                        if let Some(Emit(cid, data)) = this.dfs.pop_front() {
                            if let Some(data) = data {
                                return Poll::Ready(Some(Ok(CarBlock { cid, data })));
                            } else if let Some(data) = this.db.get(&cid)? {
                                return Poll::Ready(Some(Ok(CarBlock {
                                    cid,
                                    data: data.into(),
                                })));
                            } else if fail_on_dead_links {
                                return Poll::Ready(Some(Err(anyhow::anyhow!(
                                    "[Emit] missing key: {cid}"
                                ))));
                            };
                            // Missing block with dead links tolerated: drop it and
                            // move on to the next task.
                        }
                    }
                    Iterate(epoch, block_cid, _type, cid_vec) => {
                        if *this.track_progress {
                            update_epoch(*epoch);
                        }
                        // `cid_vec` is a stack: its initial entries are popped from
                        // the back, and children are pushed reversed so they are
                        // visited in the order they were encoded.
                        while let Some(cid) = cid_vec.pop() {
                            // The link traversal implementation assumes there are three types of encoding:
                            // 1. DAG_CBOR: needs to be reachable, so we add it to the queue and load.
                            // 2. IPLD_RAW: WASM blocks, for example. Need to be loaded, but not traversed.
                            // 3. _: ignore all other links
                            // Don't revisit what's already been visited.
                            // The CID is marked as seen before the lookup, so a missing
                            // block is reported at most once.
                            if should_save_block_to_snapshot(cid) && this.seen.insert(cid)? {
                                if let Some(data) = this.db.get(&cid)? {
                                    if cid.codec() == fvm_ipld_encoding::DAG_CBOR {
                                        let new_values = extract_cids(&data)?;
                                        cid_vec.extend(new_values.into_iter().rev());
                                    }
                                    return Poll::Ready(Some(Ok(CarBlock {
                                        cid,
                                        data: data.into(),
                                    })));
                                } else if fail_on_dead_links {
                                    let type_display = match _type {
                                        IterateType::Message(c) => {
                                            format!("message {c}")
                                        }
                                        IterateType::StateRoot(c) => {
                                            format!("state root {c}")
                                        }
                                        IterateType::MessageReceipts(c) => {
                                            // Forgive message receipts
                                            tracing::trace!(
                                                "[Iterate] missing key: {cid} from message receipts {c} in block {block_cid} at epoch {epoch}"
                                            );
                                            continue;
                                        }
                                        IterateType::EventsRoot(c) => {
                                            // Forgive events
                                            tracing::trace!(
                                                "[Iterate] missing key: {cid} from events root {c} in block {block_cid} at epoch {epoch}"
                                            );
                                            continue;
                                        }
                                    };
                                    return Poll::Ready(Some(Err(anyhow::anyhow!(
                                        "[Iterate] missing key: {cid} from {type_display} in block {block_cid} at epoch {epoch}"
                                    ))));
                                }
                            }
                        }
                        // Every CID of this task has been handled; retire it.
                        this.dfs.pop_front();
                    }
                }
            }

            // This consumes a [`Tipset`] from the iterator one at a time. The next iteration of the
            // enclosing loop is processing the queue. Once the desired depth has been reached -
            // yield the block without walking the graph it represents.
            if let Some(tipset) = this.tipset_iter.next() {
                // Tipset key cid can be converted from and to eth hash, which is useful for Eth APIs.
                // Failing to build the key block is ignored, and the key is not checked
                // against `seen`.
                if export_tipset_keys
                    && let Ok(CarBlock { cid, data }) = tipset.borrow().key().car_block()
                {
                    this.dfs.push_back(Emit(cid, Some(data)));
                }

                for block in tipset.borrow().block_headers() {
                    let (cid, data) = block.car_block()?;
                    if this.seen.insert(cid)? {
                        if *this.track_progress {
                            update_epoch(block.epoch);
                        }
                        // The header is always emitted, whatever the depth.
                        this.dfs.push_back(Emit(cid, Some(data.into())));

                        if block.epoch == 0 {
                            // The genesis block has some kind of dummy parent that needs to be emitted.
                            // The parent data is loaded from the database at emit time.
                            for p in &block.parents {
                                this.dfs.push_back(Emit(p, None));
                            }
                        }

                        // Process block messages.
                        if block.epoch > stateroot_limit_exclusive {
                            this.dfs.push_back(Iterate(
                                block.epoch,
                                *block.cid(),
                                IterateType::Message(block.messages),
                                DfsIter::from(block.messages)
                                    .filter_map(ipld_to_cid)
                                    .collect(),
                            ));
                            if *this.message_receipts {
                                this.dfs.push_back(Iterate(
                                    block.epoch,
                                    *block.cid(),
                                    IterateType::MessageReceipts(block.message_receipts),
                                    DfsIter::from(block.message_receipts)
                                        .filter_map(ipld_to_cid)
                                        .collect(),
                                ));
                            }
                            // ignore failure as receipts are not required by a lite snapshot
                            // Receipts are read from `db` here even when `message_receipts`
                            // is off; only their events roots are queued.
                            if *this.events
                                && let Ok(receipts) =
                                    Receipt::get_receipts(this.db, block.message_receipts)
                            {
                                for receipt in receipts {
                                    if let Some(events_root) = receipt.events_root() {
                                        this.dfs.push_back(Iterate(
                                            block.epoch,
                                            *block.cid(),
                                            IterateType::EventsRoot(events_root),
                                            DfsIter::from(events_root)
                                                .filter_map(ipld_to_cid)
                                                .collect(),
                                        ));
                                    }
                                }
                            }
                        }

                        // Visit the block if it's within required depth. And a special case for `0`
                        // epoch to match Lotus' implementation.
                        if block.epoch == 0 || block.epoch > stateroot_limit_exclusive {
                            // NOTE: In the original `walk_snapshot` implementation we walk the dag
                            // immediately. Which is what we do here as well, but using a queue.
                            this.dfs.push_back(Iterate(
                                block.epoch,
                                *block.cid(),
                                IterateType::StateRoot(block.state_root),
                                DfsIter::from(block.state_root)
                                    .filter_map(ipld_to_cid)
                                    .collect(),
                            ));
                        }
                    }
                }
            } else {
                // That's it, nothing else to do. End of stream.
                return Poll::Ready(None);
            }
        }
    }
}
426
pin_project! {
    /// Stream that walks the IPLD DAGs below a set of root CIDs and yields each
    /// snapshot-eligible block at most once.
    pub struct IpldStream<DB, S> {
        // Block store the data is read from.
        db: DB,
        // CIDs still to visit, used as a stack (popped from the back).
        cid_vec: Vec<Cid>,
        // Visited CIDs; a block is yielded only when inserting its CID succeeds.
        seen: S,
    }
}
434
435impl<DB, S> IpldStream<DB, S> {
436    pub fn new(db: DB, roots: Vec<Cid>, seen: S) -> Self {
437        Self {
438            db,
439            cid_vec: roots,
440            seen,
441        }
442    }
443}
444
445impl<DB: Blockstore, S: CidHashSetLike> Stream for IpldStream<DB, S> {
446    type Item = anyhow::Result<CarBlock>;
447
448    fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
449        let this = self.project();
450        while let Some(cid) = this.cid_vec.pop() {
451            if should_save_block_to_snapshot(cid) && this.seen.insert(cid)? {
452                if let Some(data) = this.db.get(&cid)? {
453                    if cid.codec() == fvm_ipld_encoding::DAG_CBOR {
454                        let new_cids = extract_cids(&data)?;
455                        this.cid_vec.extend(new_cids);
456                    }
457                    return Poll::Ready(Some(Ok(CarBlock {
458                        cid,
459                        data: data.into(),
460                    })));
461                } else {
462                    return Poll::Ready(Some(Err(anyhow::anyhow!("missing key: {cid}"))));
463                }
464            }
465        }
466        // That's it, nothing else to do. End of stream.
467        Poll::Ready(None)
468    }
469}
470
471fn ipld_to_cid(ipld: Ipld) -> Option<Cid> {
472    if let Ipld::Link(cid) = ipld {
473        Some(cid)
474    } else {
475        None
476    }
477}