Skip to main content

forest/ipld/
util.rs

1// Copyright 2019-2026 ChainSafe Systems
2// SPDX-License-Identifier: Apache-2.0, MIT
3
4use crate::blocks::Tipset;
5use crate::cid_collections::{CidHashSet, CidHashSetLike};
6use crate::ipld::Ipld;
7use crate::shim::clock::ChainEpoch;
8use crate::shim::executor::Receipt;
9use crate::utils::db::car_stream::CarBlock;
10use crate::utils::encoding::extract_cids;
11use crate::utils::multihash::prelude::*;
12use chrono::{DateTime, Utc};
13use cid::Cid;
14use futures::Stream;
15use fvm_ipld_blockstore::Blockstore;
16use parking_lot::Mutex;
17use pin_project_lite::pin_project;
18use std::borrow::Borrow;
19use std::collections::VecDeque;
20use std::pin::Pin;
21use std::sync::LazyLock;
22use std::task::{Context, Poll};
23
24#[derive(Default)]
25pub struct ExportStatus {
26    pub epoch: i64,
27    pub initial_epoch: i64,
28    pub exporting: bool,
29    pub cancelled: bool,
30    pub start_time: Option<DateTime<Utc>>,
31}
32
33pub static CHAIN_EXPORT_STATUS: LazyLock<Mutex<ExportStatus>> =
34    LazyLock::new(|| ExportStatus::default().into());
35
36fn update_epoch(new_value: i64) {
37    let mut mutex = CHAIN_EXPORT_STATUS.lock();
38    mutex.epoch = new_value;
39    if mutex.initial_epoch == 0 {
40        mutex.initial_epoch = new_value;
41    }
42}
43
44pub fn start_export() {
45    let mut mutex = CHAIN_EXPORT_STATUS.lock();
46    mutex.epoch = 0;
47    mutex.initial_epoch = 0;
48    mutex.exporting = true;
49    mutex.cancelled = false;
50    mutex.start_time = Some(Utc::now());
51}
52
53pub fn end_export() {
54    let mut mutex = CHAIN_EXPORT_STATUS.lock();
55    mutex.exporting = false;
56}
57
58pub fn cancel_export() {
59    let mut mutex = CHAIN_EXPORT_STATUS.lock();
60    mutex.exporting = false;
61    mutex.cancelled = true;
62}
63
64fn should_save_block_to_snapshot(cid: Cid) -> bool {
65    // Don't include identity CIDs.
66    // We only include raw and dagcbor, for now.
67    // Raw for "code" CIDs.
68    if cid.hash().code() == u64::from(MultihashCode::Identity) {
69        false
70    } else {
71        matches!(
72            cid.codec(),
73            crate::shim::crypto::IPLD_RAW | fvm_ipld_encoding::DAG_CBOR
74        )
75    }
76}
77
78/// Depth-first-search iterator for `ipld` leaf nodes.
79///
80/// This iterator consumes the given `ipld` structure and returns leaf nodes (i.e.,
81/// no list or map) in depth-first order. The iterator can be extended at any
82/// point by the caller.
83///
84/// Consider walking this `ipld` graph:
85/// ```text
86/// List
87///  ├ Integer(5)
88///  ├ Link(Y)
89///  └ String("string")
90///
91/// Link(Y):
92/// Map
93///  ├ "key1" => Bool(true)
94///  └ "key2" => Float(3.14)
95/// ```
96///
97/// If we walk the above `ipld` graph (replacing `Link(Y)` when it is encountered), the leaf nodes will be seen in this order:
98/// 1. `Integer(5)`
99/// 2. `Bool(true)`
100/// 3. `Float(3.14)`
101/// 4. `String("string")`
102pub struct DfsIter {
103    dfs: Vec<Ipld>,
104}
105
106impl DfsIter {
107    pub fn new(root: Ipld) -> Self {
108        DfsIter { dfs: vec![root] }
109    }
110}
111
112impl From<Cid> for DfsIter {
113    fn from(cid: Cid) -> Self {
114        DfsIter::new(Ipld::Link(cid))
115    }
116}
117
118impl Iterator for DfsIter {
119    type Item = Ipld;
120
121    fn next(&mut self) -> Option<Self::Item> {
122        while let Some(ipld) = self.dfs.pop() {
123            match ipld {
124                Ipld::List(list) => self.dfs.extend(list.into_iter().rev()),
125                Ipld::Map(map) => self.dfs.extend(map.into_values().rev()),
126                other => return Some(other),
127            }
128        }
129        None
130    }
131}
132
133enum IterateType {
134    Message(Cid),
135    MessageReceipts(Cid),
136    StateRoot(Cid),
137    EventsRoot(Cid),
138}
139
140enum Task {
141    // Yield the block, don't visit it.
142    Emit(Cid, Option<Vec<u8>>),
143    // Visit all the elements, recursively.
144    Iterate(ChainEpoch, Cid, IterateType, Vec<Cid>),
145}
146
147pin_project! {
148    pub struct ChainStream<DB, T, S = CidHashSet> {
149        tipset_iter: T,
150        db: DB,
151        dfs: VecDeque<Task>, // Depth-first work queue.
152        seen: S,
153        stateroot_limit_exclusive: ChainEpoch,
154        fail_on_dead_links: bool,
155        message_receipts: bool,
156        events: bool,
157        tipset_keys:bool,
158        track_progress: bool,
159    }
160}
161
162impl<DB, T, S> ChainStream<DB, T, S> {
163    pub fn fail_on_dead_links(mut self, fail_on_dead_links: bool) -> Self {
164        self.fail_on_dead_links = fail_on_dead_links;
165        self
166    }
167
168    pub fn track_progress(mut self, track_progress: bool) -> Self {
169        self.track_progress = track_progress;
170        self
171    }
172
173    /// Whether to enable traversal of message receipt roots during chain export.
174    pub fn with_message_receipts(mut self, message_receipts: bool) -> Self {
175        self.message_receipts = message_receipts;
176        self
177    }
178
179    /// Whether to enable traversal of events roots during chain export.
180    /// Requires message receipts to be enabled as well.
181    pub fn with_events(mut self, events: bool) -> Self {
182        self.events = events;
183        self
184    }
185
186    /// Whether to export tipset keys.
187    pub fn with_tipset_keys(mut self, tipset_keys: bool) -> Self {
188        self.tipset_keys = tipset_keys;
189        self
190    }
191
192    pub fn into_seen(self) -> S {
193        self.seen
194    }
195}
196
197/// Stream all blocks that are reachable before the `stateroot_limit` epoch in a depth-first
198/// fashion.
199/// After this limit, only block headers are streamed. Any dead links are reported as errors.
200///
201/// # Arguments
202///
203/// * `db` - A database that implements [`Blockstore`] interface.
204/// * `tipset_iter` - An iterator of [`Tipset`], descending order `$child -> $parent`.
205/// * `stateroot_limit` - An epoch that signifies how far back (exclusive) we need to inspect tipsets,
206///   in-depth. This has to be pre-calculated using this formula: `$cur_epoch - $depth`, where `$depth`
207///   is the number of `[`Tipset`]` that needs inspection.
208pub fn stream_chain<
209    DB: Blockstore,
210    T: Borrow<Tipset>,
211    ITER: Iterator<Item = T> + Unpin,
212    S: CidHashSetLike,
213>(
214    db: DB,
215    tipset_iter: ITER,
216    stateroot_limit_exclusive: ChainEpoch,
217    seen: S,
218) -> ChainStream<DB, ITER, S> {
219    ChainStream {
220        tipset_iter,
221        db,
222        dfs: VecDeque::new(),
223        seen,
224        stateroot_limit_exclusive,
225        fail_on_dead_links: true,
226        message_receipts: false,
227        events: false,
228        tipset_keys: false,
229        track_progress: false,
230    }
231}
232
233// Stream available graph in a depth-first search. All reachable nodes are touched and dead-links
234// are ignored.
235pub fn stream_graph<
236    DB: Blockstore,
237    T: Borrow<Tipset>,
238    ITER: Iterator<Item = T> + Unpin,
239    S: CidHashSetLike,
240>(
241    db: DB,
242    tipset_iter: ITER,
243    stateroot_limit_exclusive: ChainEpoch,
244    seen: S,
245) -> ChainStream<DB, ITER, S> {
246    stream_chain(db, tipset_iter, stateroot_limit_exclusive, seen).fail_on_dead_links(false)
247}
248
/// Drives the chain traversal.
///
/// Each poll first drains the task queue and returns as soon as a block is
/// produced or an error occurs. When the queue is empty, the next tipset is
/// taken from the iterator and its work is enqueued. The stream ends once the
/// queue and the tipset iterator are both exhausted. Every return path is
/// `Poll::Ready`; database reads happen inline.
impl<DB: Blockstore, T: Borrow<Tipset>, ITER: Iterator<Item = T> + Unpin, S: CidHashSetLike> Stream
    for ChainStream<DB, ITER, S>
{
    type Item = anyhow::Result<CarBlock>;

    fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        use Task::*;

        // Copy the plain configuration values out before projecting the struct.
        let export_tipset_keys = self.tipset_keys;
        let fail_on_dead_links = self.fail_on_dead_links;
        let stateroot_limit_exclusive = self.stateroot_limit_exclusive;
        let this = self.project();

        loop {
            // Work through the queued tasks before pulling another tipset.
            while let Some(task) = this.dfs.front_mut() {
                match task {
                    Emit(_, _) => {
                        if let Some(Emit(cid, data)) = this.dfs.pop_front() {
                            if let Some(data) = data {
                                return Poll::Ready(Some(Ok(CarBlock { cid, data })));
                            } else if let Some(data) = this.db.get(&cid)? {
                                return Poll::Ready(Some(Ok(CarBlock { cid, data })));
                            } else if fail_on_dead_links {
                                return Poll::Ready(Some(Err(anyhow::anyhow!(
                                    "[Emit] missing key: {cid}"
                                ))));
                            };
                            // Missing block while dead links are tolerated: skip it.
                        }
                    }
                    Iterate(epoch, block_cid, _type, cid_vec) => {
                        // Runs on every poll that resumes this task.
                        if *this.track_progress {
                            update_epoch(*epoch);
                        }
                        // `cid_vec` is this task's DFS stack. It is drained
                        // incrementally across polls: each yielded block returns
                        // early and leaves the rest of the stack in place.
                        while let Some(cid) = cid_vec.pop() {
                            // The link traversal implementation assumes there are three types of encoding:
                            // 1. DAG_CBOR: needs to be reachable, so we add it to the queue and load.
                            // 2. IPLD_RAW: WASM blocks, for example. Need to be loaded, but not traversed.
                            // 3. _: ignore all other links
                            // Don't revisit what's already been visited.
                            if should_save_block_to_snapshot(cid) && this.seen.insert(cid)? {
                                if let Some(data) = this.db.get(&cid)? {
                                    if cid.codec() == fvm_ipld_encoding::DAG_CBOR {
                                        let new_values = extract_cids(&data)?;
                                        // Push in reverse so the first extracted CID is popped next.
                                        cid_vec.extend(new_values.into_iter().rev());
                                    }
                                    return Poll::Ready(Some(Ok(CarBlock { cid, data })));
                                } else if fail_on_dead_links {
                                    let type_display = match _type {
                                        IterateType::Message(c) => {
                                            format!("message {c}")
                                        }
                                        IterateType::StateRoot(c) => {
                                            format!("state root {c}")
                                        }
                                        IterateType::MessageReceipts(c) => {
                                            // Forgive message receipts: trace and move on to the next CID.
                                            tracing::trace!(
                                                "[Iterate] missing key: {cid} from message receipts {c} in block {block_cid} at epoch {epoch}"
                                            );
                                            continue;
                                        }
                                        IterateType::EventsRoot(c) => {
                                            // Forgive events: trace and move on to the next CID.
                                            tracing::trace!(
                                                "[Iterate] missing key: {cid} from events root {c} in block {block_cid} at epoch {epoch}"
                                            );
                                            continue;
                                        }
                                    };
                                    // The missing CID has already been popped and inserted
                                    // into `seen`, so a later poll resumes with the rest of
                                    // this task.
                                    return Poll::Ready(Some(Err(anyhow::anyhow!(
                                        "[Iterate] missing key: {cid} from {type_display} in block {block_cid} at epoch {epoch}"
                                    ))));
                                }
                            }
                        }
                        // This task's stack is exhausted.
                        this.dfs.pop_front();
                    }
                }
            }

            // The queue is empty: consume the next [`Tipset`] from the iterator. The next
            // iteration of the enclosing loop processes the tasks queued here. Once the
            // desired depth has been reached, only the block headers are yielded, without
            // walking the graphs they reference.
            if let Some(tipset) = this.tipset_iter.next() {
                // A tipset key CID can be converted to and from an eth hash, which is useful for Eth APIs.
                // Note: a failure to build the key block is silently ignored.
                if export_tipset_keys
                    && let Ok(CarBlock { cid, data }) = tipset.borrow().key().car_block()
                {
                    this.dfs.push_back(Emit(cid, Some(data)));
                }

                for block in tipset.borrow().block_headers() {
                    let (cid, data) = block.car_block()?;
                    if this.seen.insert(cid)? {
                        if *this.track_progress {
                            update_epoch(block.epoch);
                        }
                        // Make sure we always yield a block otherwise.
                        this.dfs.push_back(Emit(cid, Some(data)));

                        if block.epoch == 0 {
                            // The genesis block has some kind of dummy parent that needs to be emitted.
                            // Its data is loaded from the database when the task is emitted.
                            for p in &block.parents {
                                this.dfs.push_back(Emit(p, None));
                            }
                        }

                        // Process block messages.
                        // `DfsIter::from(cid)` yields just that link, so each `Iterate` task
                        // below starts with a single root CID.
                        if block.epoch > stateroot_limit_exclusive {
                            this.dfs.push_back(Iterate(
                                block.epoch,
                                *block.cid(),
                                IterateType::Message(block.messages),
                                DfsIter::from(block.messages)
                                    .filter_map(ipld_to_cid)
                                    .collect(),
                            ));
                            if *this.message_receipts {
                                this.dfs.push_back(Iterate(
                                    block.epoch,
                                    *block.cid(),
                                    IterateType::MessageReceipts(block.message_receipts),
                                    DfsIter::from(block.message_receipts)
                                        .filter_map(ipld_to_cid)
                                        .collect(),
                                ));
                            }
                            // ignore failure as receipts are not required by a lite snapshot
                            if *this.events
                                && let Ok(receipts) =
                                    Receipt::get_receipts(this.db, block.message_receipts)
                            {
                                for receipt in receipts {
                                    if let Some(events_root) = receipt.events_root() {
                                        this.dfs.push_back(Iterate(
                                            block.epoch,
                                            *block.cid(),
                                            IterateType::EventsRoot(events_root),
                                            DfsIter::from(events_root)
                                                .filter_map(ipld_to_cid)
                                                .collect(),
                                        ));
                                    }
                                }
                            }
                        }

                        // Visit the block if it's within required depth. And a special case for `0`
                        // epoch to match Lotus' implementation.
                        if block.epoch == 0 || block.epoch > stateroot_limit_exclusive {
                            // NOTE: In the original `walk_snapshot` implementation we walk the dag
                            // immediately. Which is what we do here as well, but using a queue.
                            this.dfs.push_back(Iterate(
                                block.epoch,
                                *block.cid(),
                                IterateType::StateRoot(block.state_root),
                                DfsIter::from(block.state_root)
                                    .filter_map(ipld_to_cid)
                                    .collect(),
                            ));
                        }
                    }
                }
            } else {
                // That's it, nothing else to do. End of stream.
                return Poll::Ready(None);
            }
        }
    }
}
419
420pin_project! {
421    pub struct IpldStream<DB, S> {
422        db: DB,
423        cid_vec: Vec<Cid>,
424        seen: S,
425    }
426}
427
428impl<DB, S> IpldStream<DB, S> {
429    pub fn new(db: DB, roots: Vec<Cid>, seen: S) -> Self {
430        Self {
431            db,
432            cid_vec: roots,
433            seen,
434        }
435    }
436}
437
438impl<DB: Blockstore, S: CidHashSetLike> Stream for IpldStream<DB, S> {
439    type Item = anyhow::Result<CarBlock>;
440
441    fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
442        let this = self.project();
443        while let Some(cid) = this.cid_vec.pop() {
444            if should_save_block_to_snapshot(cid) && this.seen.insert(cid)? {
445                if let Some(data) = this.db.get(&cid)? {
446                    if cid.codec() == fvm_ipld_encoding::DAG_CBOR {
447                        let new_cids = extract_cids(&data)?;
448                        this.cid_vec.extend(new_cids);
449                    }
450                    return Poll::Ready(Some(Ok(CarBlock { cid, data })));
451                } else {
452                    return Poll::Ready(Some(Err(anyhow::anyhow!("missing key: {cid}"))));
453                }
454            }
455        }
456        // That's it, nothing else to do. End of stream.
457        Poll::Ready(None)
458    }
459}
460
461fn ipld_to_cid(ipld: Ipld) -> Option<Cid> {
462    if let Ipld::Link(cid) = ipld {
463        Some(cid)
464    } else {
465        None
466    }
467}