forest/ipld/util.rs

1// Copyright 2019-2025 ChainSafe Systems
2// SPDX-License-Identifier: Apache-2.0, MIT
3
4use crate::blocks::Tipset;
5use crate::cid_collections::CidHashSet;
6use crate::ipld::Ipld;
7use crate::shim::clock::ChainEpoch;
8use crate::utils::db::car_stream::CarBlock;
9use crate::utils::encoding::extract_cids;
10use crate::utils::multihash::prelude::*;
11use chrono::{DateTime, Utc};
12use cid::Cid;
13use futures::Stream;
14use fvm_ipld_blockstore::Blockstore;
15use parking_lot::Mutex;
16use pin_project_lite::pin_project;
17use std::borrow::Borrow;
18use std::collections::VecDeque;
19use std::pin::Pin;
20use std::sync::LazyLock;
21use std::task::{Context, Poll};
22
23#[derive(Default)]
24pub struct ExportStatus {
25    pub epoch: i64,
26    pub initial_epoch: i64,
27    pub exporting: bool,
28    pub cancelled: bool,
29    pub start_time: Option<DateTime<Utc>>,
30}
31
32pub static CHAIN_EXPORT_STATUS: LazyLock<Mutex<ExportStatus>> =
33    LazyLock::new(|| ExportStatus::default().into());
34
35fn update_epoch(new_value: i64) {
36    let mut mutex = CHAIN_EXPORT_STATUS.lock();
37    mutex.epoch = new_value;
38    if mutex.initial_epoch == 0 {
39        mutex.initial_epoch = new_value;
40    }
41}
42
43pub fn start_export() {
44    let mut mutex = CHAIN_EXPORT_STATUS.lock();
45    mutex.epoch = 0;
46    mutex.initial_epoch = 0;
47    mutex.exporting = true;
48    mutex.cancelled = false;
49    mutex.start_time = Some(Utc::now());
50}
51
52pub fn end_export() {
53    let mut mutex = CHAIN_EXPORT_STATUS.lock();
54    mutex.exporting = false;
55}
56
57pub fn cancel_export() {
58    let mut mutex = CHAIN_EXPORT_STATUS.lock();
59    mutex.exporting = false;
60    mutex.cancelled = true;
61}
62
63fn should_save_block_to_snapshot(cid: Cid) -> bool {
64    // Don't include identity CIDs.
65    // We only include raw and dagcbor, for now.
66    // Raw for "code" CIDs.
67    if cid.hash().code() == u64::from(MultihashCode::Identity) {
68        false
69    } else {
70        matches!(
71            cid.codec(),
72            crate::shim::crypto::IPLD_RAW | fvm_ipld_encoding::DAG_CBOR
73        )
74    }
75}
76
77/// Depth-first-search iterator for `ipld` leaf nodes.
78///
79/// This iterator consumes the given `ipld` structure and returns leaf nodes (i.e.,
80/// no list or map) in depth-first order. The iterator can be extended at any
81/// point by the caller.
82///
83/// Consider walking this `ipld` graph:
84/// ```text
85/// List
86///  ├ Integer(5)
87///  ├ Link(Y)
88///  └ String("string")
89///
90/// Link(Y):
91/// Map
92///  ├ "key1" => Bool(true)
93///  └ "key2" => Float(3.14)
94/// ```
95///
96/// If we walk the above `ipld` graph (replacing `Link(Y)` when it is encountered), the leaf nodes will be seen in this order:
97/// 1. `Integer(5)`
98/// 2. `Bool(true)`
99/// 3. `Float(3.14)`
100/// 4. `String("string")`
101pub struct DfsIter {
102    dfs: VecDeque<Ipld>,
103}
104
105impl DfsIter {
106    pub fn new(root: Ipld) -> Self {
107        DfsIter {
108            dfs: VecDeque::from([root]),
109        }
110    }
111
112    pub fn walk_next(&mut self, ipld: Ipld) {
113        self.dfs.push_front(ipld)
114    }
115}
116
117impl From<Cid> for DfsIter {
118    fn from(cid: Cid) -> Self {
119        DfsIter::new(Ipld::Link(cid))
120    }
121}
122
123impl Iterator for DfsIter {
124    type Item = Ipld;
125
126    fn next(&mut self) -> Option<Self::Item> {
127        while let Some(ipld) = self.dfs.pop_front() {
128            match ipld {
129                Ipld::List(list) => list.into_iter().rev().for_each(|elt| self.walk_next(elt)),
130                Ipld::Map(map) => map.into_values().rev().for_each(|elt| self.walk_next(elt)),
131                other => return Some(other),
132            }
133        }
134        None
135    }
136}
137
138enum IterateType {
139    Message(Cid),
140    StateRoot(Cid),
141}
142
143enum Task {
144    // Yield the block, don't visit it.
145    Emit(Cid, Option<Vec<u8>>),
146    // Visit all the elements, recursively.
147    Iterate(ChainEpoch, Cid, IterateType, VecDeque<Cid>),
148}
149
150pin_project! {
151    pub struct ChainStream<DB, T> {
152        tipset_iter: T,
153        db: DB,
154        dfs: VecDeque<Task>, // Depth-first work queue.
155        seen: CidHashSet,
156        stateroot_limit_exclusive: ChainEpoch,
157        fail_on_dead_links: bool,
158        track_progress: bool,
159    }
160}
161
162impl<DB, T> ChainStream<DB, T> {
163    pub fn with_seen(mut self, seen: CidHashSet) -> Self {
164        self.seen = seen;
165        self
166    }
167
168    pub fn fail_on_dead_links(mut self, fail_on_dead_links: bool) -> Self {
169        self.fail_on_dead_links = fail_on_dead_links;
170        self
171    }
172
173    pub fn track_progress(mut self, track_progress: bool) -> Self {
174        self.track_progress = track_progress;
175        self
176    }
177
178    #[allow(dead_code)]
179    pub fn into_seen(self) -> CidHashSet {
180        self.seen
181    }
182}
183
184/// Stream all blocks that are reachable before the `stateroot_limit` epoch in a depth-first
185/// fashion.
186/// After this limit, only block headers are streamed. Any dead links are reported as errors.
187///
188/// # Arguments
189///
190/// * `db` - A database that implements [`Blockstore`] interface.
191/// * `tipset_iter` - An iterator of [`Tipset`], descending order `$child -> $parent`.
192/// * `stateroot_limit` - An epoch that signifies how far back (exclusive) we need to inspect tipsets,
193///   in-depth. This has to be pre-calculated using this formula: `$cur_epoch - $depth`, where `$depth`
194///   is the number of `[`Tipset`]` that needs inspection.
195pub fn stream_chain<DB: Blockstore, T: Borrow<Tipset>, ITER: Iterator<Item = T> + Unpin>(
196    db: DB,
197    tipset_iter: ITER,
198    stateroot_limit_exclusive: ChainEpoch,
199) -> ChainStream<DB, ITER> {
200    ChainStream {
201        tipset_iter,
202        db,
203        dfs: VecDeque::new(),
204        seen: CidHashSet::default(),
205        stateroot_limit_exclusive,
206        fail_on_dead_links: true,
207        track_progress: false,
208    }
209}
210
211// Stream available graph in a depth-first search. All reachable nodes are touched and dead-links
212// are ignored.
213pub fn stream_graph<DB: Blockstore, T: Borrow<Tipset>, ITER: Iterator<Item = T> + Unpin>(
214    db: DB,
215    tipset_iter: ITER,
216    stateroot_limit_exclusive: ChainEpoch,
217) -> ChainStream<DB, ITER> {
218    stream_chain(db, tipset_iter, stateroot_limit_exclusive).fail_on_dead_links(false)
219}
220
221impl<DB: Blockstore, T: Borrow<Tipset>, ITER: Iterator<Item = T> + Unpin> Stream
222    for ChainStream<DB, ITER>
223{
224    type Item = anyhow::Result<CarBlock>;
225
226    fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
227        use Task::*;
228
229        let fail_on_dead_links = self.fail_on_dead_links;
230        let stateroot_limit_exclusive = self.stateroot_limit_exclusive;
231        let this = self.project();
232
233        loop {
234            while let Some(task) = this.dfs.front_mut() {
235                match task {
236                    Emit(_, _) => {
237                        if let Some(Emit(cid, data)) = this.dfs.pop_front() {
238                            if let Some(data) = data {
239                                return Poll::Ready(Some(Ok(CarBlock { cid, data })));
240                            } else if let Some(data) = this.db.get(&cid)? {
241                                return Poll::Ready(Some(Ok(CarBlock { cid, data })));
242                            } else if fail_on_dead_links {
243                                return Poll::Ready(Some(Err(anyhow::anyhow!(
244                                    "[Emit] missing key: {cid}"
245                                ))));
246                            };
247                        }
248                    }
249                    Iterate(epoch, block_cid, _type, cid_vec) => {
250                        if *this.track_progress {
251                            update_epoch(*epoch);
252                        }
253                        while let Some(cid) = cid_vec.pop_front() {
254                            // The link traversal implementation assumes there are three types of encoding:
255                            // 1. DAG_CBOR: needs to be reachable, so we add it to the queue and load.
256                            // 2. IPLD_RAW: WASM blocks, for example. Need to be loaded, but not traversed.
257                            // 3. _: ignore all other links
258                            // Don't revisit what's already been visited.
259                            if should_save_block_to_snapshot(cid) && this.seen.insert(cid) {
260                                if let Some(data) = this.db.get(&cid)? {
261                                    if cid.codec() == fvm_ipld_encoding::DAG_CBOR {
262                                        let new_values = extract_cids(&data)?;
263                                        if !new_values.is_empty() {
264                                            cid_vec.reserve(new_values.len());
265                                            for v in new_values.into_iter().rev() {
266                                                cid_vec.push_front(v)
267                                            }
268                                        }
269                                    }
270                                    return Poll::Ready(Some(Ok(CarBlock { cid, data })));
271                                } else if fail_on_dead_links {
272                                    let type_display = match _type {
273                                        IterateType::Message(c) => {
274                                            format!("message {c}")
275                                        }
276                                        IterateType::StateRoot(c) => {
277                                            format!("state root {c}")
278                                        }
279                                    };
280                                    return Poll::Ready(Some(Err(anyhow::anyhow!(
281                                        "[Iterate] missing key: {cid} from {type_display} in block {block_cid} at epoch {epoch}"
282                                    ))));
283                                }
284                            }
285                        }
286                        this.dfs.pop_front();
287                    }
288                }
289            }
290
291            // This consumes a [`Tipset`] from the iterator one at a time. The next iteration of the
292            // enclosing loop is processing the queue. Once the desired depth has been reached -
293            // yield the block without walking the graph it represents.
294            if let Some(tipset) = this.tipset_iter.next() {
295                for block in tipset.borrow().block_headers() {
296                    let (cid, data) = block.car_block()?;
297                    if this.seen.insert(cid) {
298                        if *this.track_progress {
299                            update_epoch(block.epoch);
300                        }
301                        // Make sure we always yield a block otherwise.
302                        this.dfs.push_back(Emit(cid, Some(data)));
303
304                        if block.epoch == 0 {
305                            // The genesis block has some kind of dummy parent that needs to be emitted.
306                            for p in &block.parents {
307                                this.dfs.push_back(Emit(p, None));
308                            }
309                        }
310
311                        // Process block messages.
312                        if block.epoch > stateroot_limit_exclusive {
313                            this.dfs.push_back(Iterate(
314                                block.epoch,
315                                *block.cid(),
316                                IterateType::Message(block.messages),
317                                DfsIter::from(block.messages)
318                                    .filter_map(ipld_to_cid)
319                                    .collect(),
320                            ));
321                        }
322
323                        // Visit the block if it's within required depth. And a special case for `0`
324                        // epoch to match Lotus' implementation.
325                        if block.epoch == 0 || block.epoch > stateroot_limit_exclusive {
326                            // NOTE: In the original `walk_snapshot` implementation we walk the dag
327                            // immediately. Which is what we do here as well, but using a queue.
328                            this.dfs.push_back(Iterate(
329                                block.epoch,
330                                *block.cid(),
331                                IterateType::StateRoot(block.state_root),
332                                DfsIter::from(block.state_root)
333                                    .filter_map(ipld_to_cid)
334                                    .collect(),
335                            ));
336                        }
337                    }
338                }
339            } else {
340                // That's it, nothing else to do. End of stream.
341                return Poll::Ready(None);
342            }
343        }
344    }
345}
346
347fn ipld_to_cid(ipld: Ipld) -> Option<Cid> {
348    if let Ipld::Link(cid) = ipld {
349        Some(cid)
350    } else {
351        None
352    }
353}