Skip to main content

forest/ipld/
util.rs

1// Copyright 2019-2026 ChainSafe Systems
2// SPDX-License-Identifier: Apache-2.0, MIT
3
4use crate::blocks::Tipset;
5use crate::cid_collections::CidHashSet;
6use crate::ipld::Ipld;
7use crate::shim::clock::ChainEpoch;
8use crate::shim::executor::Receipt;
9use crate::utils::db::car_stream::CarBlock;
10use crate::utils::encoding::extract_cids;
11use crate::utils::multihash::prelude::*;
12use chrono::{DateTime, Utc};
13use cid::Cid;
14use futures::Stream;
15use fvm_ipld_blockstore::Blockstore;
16use parking_lot::Mutex;
17use pin_project_lite::pin_project;
18use std::borrow::Borrow;
19use std::collections::VecDeque;
20use std::pin::Pin;
21use std::sync::LazyLock;
22use std::task::{Context, Poll};
23
/// Snapshot of the progress of a chain export, shared process-wide through
/// [`CHAIN_EXPORT_STATUS`].
#[derive(Default)]
pub struct ExportStatus {
    /// Most recent epoch reported through `update_epoch`.
    pub epoch: i64,
    /// First non-zero epoch reported since the last `start_export`; `0` means
    /// "not recorded yet".
    pub initial_epoch: i64,
    /// Set by `start_export`, cleared by `end_export` and `cancel_export`.
    pub exporting: bool,
    /// Set by `cancel_export`, cleared by `start_export`.
    pub cancelled: bool,
    /// Wall-clock time captured by `start_export`; `None` until then.
    pub start_time: Option<DateTime<Utc>>,
}
32
33pub static CHAIN_EXPORT_STATUS: LazyLock<Mutex<ExportStatus>> =
34    LazyLock::new(|| ExportStatus::default().into());
35
36fn update_epoch(new_value: i64) {
37    let mut mutex = CHAIN_EXPORT_STATUS.lock();
38    mutex.epoch = new_value;
39    if mutex.initial_epoch == 0 {
40        mutex.initial_epoch = new_value;
41    }
42}
43
44pub fn start_export() {
45    let mut mutex = CHAIN_EXPORT_STATUS.lock();
46    mutex.epoch = 0;
47    mutex.initial_epoch = 0;
48    mutex.exporting = true;
49    mutex.cancelled = false;
50    mutex.start_time = Some(Utc::now());
51}
52
53pub fn end_export() {
54    let mut mutex = CHAIN_EXPORT_STATUS.lock();
55    mutex.exporting = false;
56}
57
58pub fn cancel_export() {
59    let mut mutex = CHAIN_EXPORT_STATUS.lock();
60    mutex.exporting = false;
61    mutex.cancelled = true;
62}
63
64fn should_save_block_to_snapshot(cid: Cid) -> bool {
65    // Don't include identity CIDs.
66    // We only include raw and dagcbor, for now.
67    // Raw for "code" CIDs.
68    if cid.hash().code() == u64::from(MultihashCode::Identity) {
69        false
70    } else {
71        matches!(
72            cid.codec(),
73            crate::shim::crypto::IPLD_RAW | fvm_ipld_encoding::DAG_CBOR
74        )
75    }
76}
77
/// Depth-first-search iterator for `ipld` leaf nodes.
///
/// This iterator consumes the given `ipld` structure and returns leaf nodes (i.e.,
/// anything that is neither a list nor a map) in depth-first order. The iterator can
/// be extended at any point by the caller through `walk_next`.
///
/// Consider walking this `ipld` graph:
/// ```text
/// List
///  ├ Integer(5)
///  ├ Link(Y)
///  └ String("string")
///
/// Link(Y):
/// Map
///  ├ "key1" => Bool(true)
///  └ "key2" => Float(3.14)
/// ```
///
/// If we walk the above `ipld` graph (replacing `Link(Y)` when it is encountered), the leaf nodes will be seen in this order:
/// 1. `Integer(5)`
/// 2. `Bool(true)`
/// 3. `Float(3.14)`
/// 4. `String("string")`
///
/// Links are not resolved by the iterator itself: a `Link` is returned as a leaf and
/// it is up to the caller to load it and feed the result back in.
pub struct DfsIter {
    // Pending nodes; the front is visited next.
    dfs: VecDeque<Ipld>,
}
105
106impl DfsIter {
107    pub fn new(root: Ipld) -> Self {
108        DfsIter {
109            dfs: VecDeque::from([root]),
110        }
111    }
112
113    pub fn walk_next(&mut self, ipld: Ipld) {
114        self.dfs.push_front(ipld)
115    }
116}
117
118impl From<Cid> for DfsIter {
119    fn from(cid: Cid) -> Self {
120        DfsIter::new(Ipld::Link(cid))
121    }
122}
123
124impl Iterator for DfsIter {
125    type Item = Ipld;
126
127    fn next(&mut self) -> Option<Self::Item> {
128        while let Some(ipld) = self.dfs.pop_front() {
129            match ipld {
130                Ipld::List(list) => list.into_iter().rev().for_each(|elt| self.walk_next(elt)),
131                Ipld::Map(map) => map.into_values().rev().for_each(|elt| self.walk_next(elt)),
132                other => return Some(other),
133            }
134        }
135        None
136    }
137}
138
/// What kind of root an `Iterate` task was started from.
///
/// The carried [`Cid`] is the root itself. `ChainStream::poll_next` uses the
/// variant to decide how a missing block is reported: as an error for messages
/// and state roots, as a trace log for message receipts and events.
enum IterateType {
    /// Traversal started at a block's `messages` root.
    Message(Cid),
    /// Traversal started at a block's `message_receipts` root.
    MessageReceipts(Cid),
    /// Traversal started at a block's `state_root`.
    StateRoot(Cid),
    /// Traversal started at the events root of a receipt.
    EventsRoot(Cid),
}
145
/// A unit of work in the `ChainStream` queue.
enum Task {
    // Yield the block, don't visit it. When the payload is `None` the data is
    // looked up in the database at emit time.
    Emit(Cid, Option<Vec<u8>>),
    // Visit all the elements, recursively. Fields: epoch and CID of the block
    // header the traversal belongs to, the kind of root it started from, and
    // the CIDs still waiting to be visited (front first).
    Iterate(ChainEpoch, Cid, IterateType, VecDeque<Cid>),
}
152
pin_project! {
    /// Stream of `CarBlock`s produced by walking tipsets and the data they reference.
    ///
    /// Built with `stream_chain` or `stream_graph` and tuned through the
    /// builder-style methods on this type.
    pub struct ChainStream<DB, T> {
        // Source of tipsets; one is consumed whenever the work queue runs dry.
        tipset_iter: T,
        // Store the block data is read from.
        db: DB,
        dfs: VecDeque<Task>, // Depth-first work queue.
        // CIDs already handled; used to avoid visiting a block twice.
        seen: CidHashSet,
        // Messages, receipts and events are only walked for blocks whose epoch is
        // strictly greater than this value; state roots also for epoch 0.
        stateroot_limit_exclusive: ChainEpoch,
        // Whether a missing block is turned into an error.
        fail_on_dead_links: bool,
        // Whether message receipt roots are walked.
        message_receipts: bool,
        // Whether events roots found in receipts are walked.
        events: bool,
        // Whether epochs are reported through `update_epoch`.
        track_progress: bool,
    }
}
166
167impl<DB, T> ChainStream<DB, T> {
168    pub fn with_seen(mut self, seen: CidHashSet) -> Self {
169        self.seen = seen;
170        self
171    }
172
173    pub fn fail_on_dead_links(mut self, fail_on_dead_links: bool) -> Self {
174        self.fail_on_dead_links = fail_on_dead_links;
175        self
176    }
177
178    pub fn track_progress(mut self, track_progress: bool) -> Self {
179        self.track_progress = track_progress;
180        self
181    }
182
183    /// Enable traversal of message receipt roots during chain export.
184    pub fn with_message_receipts(mut self, message_receipts: bool) -> Self {
185        self.message_receipts = message_receipts;
186        self
187    }
188
189    /// Enable traversal of events roots during chain export.
190    /// Requires message receipts to be enabled as well.
191    pub fn with_events(mut self, events: bool) -> Self {
192        self.events = events;
193        self
194    }
195
196    #[allow(dead_code)]
197    pub fn into_seen(self) -> CidHashSet {
198        self.seen
199    }
200}
201
202/// Stream all blocks that are reachable before the `stateroot_limit` epoch in a depth-first
203/// fashion.
204/// After this limit, only block headers are streamed. Any dead links are reported as errors.
205///
206/// # Arguments
207///
208/// * `db` - A database that implements [`Blockstore`] interface.
209/// * `tipset_iter` - An iterator of [`Tipset`], descending order `$child -> $parent`.
210/// * `stateroot_limit` - An epoch that signifies how far back (exclusive) we need to inspect tipsets,
211///   in-depth. This has to be pre-calculated using this formula: `$cur_epoch - $depth`, where `$depth`
212///   is the number of `[`Tipset`]` that needs inspection.
213pub fn stream_chain<DB: Blockstore, T: Borrow<Tipset>, ITER: Iterator<Item = T> + Unpin>(
214    db: DB,
215    tipset_iter: ITER,
216    stateroot_limit_exclusive: ChainEpoch,
217) -> ChainStream<DB, ITER> {
218    ChainStream {
219        tipset_iter,
220        db,
221        dfs: VecDeque::new(),
222        seen: CidHashSet::default(),
223        stateroot_limit_exclusive,
224        fail_on_dead_links: true,
225        message_receipts: false,
226        events: false,
227        track_progress: false,
228    }
229}
230
231// Stream available graph in a depth-first search. All reachable nodes are touched and dead-links
232// are ignored.
233pub fn stream_graph<DB: Blockstore, T: Borrow<Tipset>, ITER: Iterator<Item = T> + Unpin>(
234    db: DB,
235    tipset_iter: ITER,
236    stateroot_limit_exclusive: ChainEpoch,
237) -> ChainStream<DB, ITER> {
238    stream_chain(db, tipset_iter, stateroot_limit_exclusive).fail_on_dead_links(false)
239}
240
/// Drives the export: drains the work queue, refilling it from the tipset
/// iterator one tipset at a time.
impl<DB: Blockstore, T: Borrow<Tipset>, ITER: Iterator<Item = T> + Unpin> Stream
    for ChainStream<DB, ITER>
{
    type Item = anyhow::Result<CarBlock>;

    /// Produces the next block, an error, or the end of the stream.
    ///
    /// The task context is ignored and every return path is `Poll::Ready`, so
    /// this implementation never yields `Poll::Pending`. Errors from the
    /// database, from CID extraction and from header encoding are propagated
    /// through `?` as `Poll::Ready(Some(Err(..)))`.
    fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        use Task::*;

        // Copy the plain settings out before `project()` consumes the pinned reference.
        let fail_on_dead_links = self.fail_on_dead_links;
        let stateroot_limit_exclusive = self.stateroot_limit_exclusive;
        let this = self.project();

        loop {
            // Phase 1: work through the queue until a block can be yielded or it is empty.
            while let Some(task) = this.dfs.front_mut() {
                match task {
                    Emit(_, _) => {
                        // Take ownership of the task; it is finished whatever happens next.
                        if let Some(Emit(cid, data)) = this.dfs.pop_front() {
                            if let Some(data) = data {
                                // Data was supplied with the task, no lookup needed.
                                return Poll::Ready(Some(Ok(CarBlock { cid, data })));
                            } else if let Some(data) = this.db.get(&cid)? {
                                return Poll::Ready(Some(Ok(CarBlock { cid, data })));
                            } else if fail_on_dead_links {
                                return Poll::Ready(Some(Err(anyhow::anyhow!(
                                    "[Emit] missing key: {cid}"
                                ))));
                            };
                            // Missing and dead links tolerated: drop the task silently.
                        }
                    }
                    Iterate(epoch, block_cid, _type, cid_vec) => {
                        // The task stays at the front between polls, so progress is
                        // re-reported each time it is resumed.
                        if *this.track_progress {
                            update_epoch(*epoch);
                        }
                        while let Some(cid) = cid_vec.pop_front() {
                            // The link traversal implementation assumes there are three types of encoding:
                            // 1. DAG_CBOR: needs to be reachable, so we add it to the queue and load.
                            // 2. IPLD_RAW: WASM blocks, for example. Need to be loaded, but not traversed.
                            // 3. _: ignore all other links
                            // Don't revisit what's already been visited.
                            if should_save_block_to_snapshot(cid) && this.seen.insert(cid) {
                                if let Some(data) = this.db.get(&cid)? {
                                    if cid.codec() == fvm_ipld_encoding::DAG_CBOR {
                                        let new_values = extract_cids(&data)?;
                                        if !new_values.is_empty() {
                                            cid_vec.reserve(new_values.len());
                                            // Reverse + push_front keeps the children in their
                                            // original order at the head of the queue.
                                            for v in new_values.into_iter().rev() {
                                                cid_vec.push_front(v)
                                            }
                                        }
                                    }
                                    // Yield now; the remaining CIDs are handled on later polls.
                                    return Poll::Ready(Some(Ok(CarBlock { cid, data })));
                                } else if fail_on_dead_links {
                                    // Missing messages and state are errors; missing receipts
                                    // and events are only logged and then skipped.
                                    let type_display = match _type {
                                        IterateType::Message(c) => {
                                            format!("message {c}")
                                        }
                                        IterateType::StateRoot(c) => {
                                            format!("state root {c}")
                                        }
                                        IterateType::MessageReceipts(c) => {
                                            // Forgive message receipts
                                            tracing::trace!(
                                                "[Iterate] missing key: {cid} from message receipts {c} in block {block_cid} at epoch {epoch}"
                                            );
                                            continue;
                                        }
                                        IterateType::EventsRoot(c) => {
                                            // Forgive events
                                            tracing::trace!(
                                                "[Iterate] missing key: {cid} from events root {c} in block {block_cid} at epoch {epoch}"
                                            );
                                            continue;
                                        }
                                    };
                                    return Poll::Ready(Some(Err(anyhow::anyhow!(
                                        "[Iterate] missing key: {cid} from {type_display} in block {block_cid} at epoch {epoch}"
                                    ))));
                                }
                            }
                        }
                        // Nothing left to visit for this root: the task is done.
                        this.dfs.pop_front();
                    }
                }
            }

            // Phase 2: the queue is empty, refill it.
            // This consumes a [`Tipset`] from the iterator one at a time. The next iteration of the
            // enclosing loop is processing the queue. Once the desired depth has been reached -
            // yield the block without walking the graph it represents.
            if let Some(tipset) = this.tipset_iter.next() {
                for block in tipset.borrow().block_headers() {
                    let (cid, data) = block.car_block()?;
                    // Headers that were already seen contribute nothing.
                    if this.seen.insert(cid) {
                        if *this.track_progress {
                            update_epoch(block.epoch);
                        }
                        // Make sure we always yield a block otherwise.
                        this.dfs.push_back(Emit(cid, Some(data)));

                        if block.epoch == 0 {
                            // The genesis block has some kind of dummy parent that needs to be emitted.
                            for p in &block.parents {
                                this.dfs.push_back(Emit(p, None));
                            }
                        }

                        // Process block messages.
                        if block.epoch > stateroot_limit_exclusive {
                            this.dfs.push_back(Iterate(
                                block.epoch,
                                *block.cid(),
                                IterateType::Message(block.messages),
                                DfsIter::from(block.messages)
                                    .filter_map(ipld_to_cid)
                                    .collect(),
                            ));
                            // Receipts are only walked when explicitly requested.
                            if *this.message_receipts {
                                this.dfs.push_back(Iterate(
                                    block.epoch,
                                    *block.cid(),
                                    IterateType::MessageReceipts(block.message_receipts),
                                    DfsIter::from(block.message_receipts)
                                        .filter_map(ipld_to_cid)
                                        .collect(),
                                ));
                            }
                            // ignore failure as receipts are not required by a lite snapshot
                            if *this.events
                                && let Ok(receipts) =
                                    Receipt::get_receipts(this.db, block.message_receipts)
                            {
                                // One traversal per receipt that carries an events root.
                                for receipt in receipts {
                                    if let Some(events_root) = receipt.events_root() {
                                        this.dfs.push_back(Iterate(
                                            block.epoch,
                                            *block.cid(),
                                            IterateType::EventsRoot(events_root),
                                            DfsIter::from(events_root)
                                                .filter_map(ipld_to_cid)
                                                .collect(),
                                        ));
                                    }
                                }
                            }
                        }

                        // Visit the block if it's within required depth. And a special case for `0`
                        // epoch to match Lotus' implementation.
                        if block.epoch == 0 || block.epoch > stateroot_limit_exclusive {
                            // NOTE: In the original `walk_snapshot` implementation we walk the dag
                            // immediately. Which is what we do here as well, but using a queue.
                            this.dfs.push_back(Iterate(
                                block.epoch,
                                *block.cid(),
                                IterateType::StateRoot(block.state_root),
                                DfsIter::from(block.state_root)
                                    .filter_map(ipld_to_cid)
                                    .collect(),
                            ));
                        }
                    }
                }
            } else {
                // That's it, nothing else to do. End of stream.
                return Poll::Ready(None);
            }
        }
    }
}
408
409fn ipld_to_cid(ipld: Ipld) -> Option<Cid> {
410    if let Ipld::Link(cid) = ipld {
411        Some(cid)
412    } else {
413        None
414    }
415}