1use crate::blocks::Tipset;
5use crate::cid_collections::CidHashSet;
6use crate::ipld::Ipld;
7use crate::shim::clock::ChainEpoch;
8use crate::utils::db::car_stream::CarBlock;
9use crate::utils::encoding::extract_cids;
10use crate::utils::multihash::prelude::*;
11use chrono::{DateTime, Utc};
12use cid::Cid;
13use futures::Stream;
14use fvm_ipld_blockstore::Blockstore;
15use parking_lot::Mutex;
16use pin_project_lite::pin_project;
17use std::borrow::Borrow;
18use std::collections::VecDeque;
19use std::pin::Pin;
20use std::sync::LazyLock;
21use std::task::{Context, Poll};
22
23#[derive(Default)]
24pub struct ExportStatus {
25 pub epoch: i64,
26 pub initial_epoch: i64,
27 pub exporting: bool,
28 pub cancelled: bool,
29 pub start_time: Option<DateTime<Utc>>,
30}
31
32pub static CHAIN_EXPORT_STATUS: LazyLock<Mutex<ExportStatus>> =
33 LazyLock::new(|| ExportStatus::default().into());
34
35fn update_epoch(new_value: i64) {
36 let mut mutex = CHAIN_EXPORT_STATUS.lock();
37 mutex.epoch = new_value;
38 if mutex.initial_epoch == 0 {
39 mutex.initial_epoch = new_value;
40 }
41}
42
43pub fn start_export() {
44 let mut mutex = CHAIN_EXPORT_STATUS.lock();
45 mutex.epoch = 0;
46 mutex.initial_epoch = 0;
47 mutex.exporting = true;
48 mutex.cancelled = false;
49 mutex.start_time = Some(Utc::now());
50}
51
52pub fn end_export() {
53 let mut mutex = CHAIN_EXPORT_STATUS.lock();
54 mutex.exporting = false;
55}
56
57pub fn cancel_export() {
58 let mut mutex = CHAIN_EXPORT_STATUS.lock();
59 mutex.exporting = false;
60 mutex.cancelled = true;
61}
62
63fn should_save_block_to_snapshot(cid: Cid) -> bool {
64 if cid.hash().code() == u64::from(MultihashCode::Identity) {
68 false
69 } else {
70 matches!(
71 cid.codec(),
72 crate::shim::crypto::IPLD_RAW | fvm_ipld_encoding::DAG_CBOR
73 )
74 }
75}
76
77pub struct DfsIter {
102 dfs: VecDeque<Ipld>,
103}
104
105impl DfsIter {
106 pub fn new(root: Ipld) -> Self {
107 DfsIter {
108 dfs: VecDeque::from([root]),
109 }
110 }
111
112 pub fn walk_next(&mut self, ipld: Ipld) {
113 self.dfs.push_front(ipld)
114 }
115}
116
117impl From<Cid> for DfsIter {
118 fn from(cid: Cid) -> Self {
119 DfsIter::new(Ipld::Link(cid))
120 }
121}
122
123impl Iterator for DfsIter {
124 type Item = Ipld;
125
126 fn next(&mut self) -> Option<Self::Item> {
127 while let Some(ipld) = self.dfs.pop_front() {
128 match ipld {
129 Ipld::List(list) => list.into_iter().rev().for_each(|elt| self.walk_next(elt)),
130 Ipld::Map(map) => map.into_values().rev().for_each(|elt| self.walk_next(elt)),
131 other => return Some(other),
132 }
133 }
134 None
135 }
136}
137
138enum IterateType {
139 Message(Cid),
140 StateRoot(Cid),
141}
142
143enum Task {
144 Emit(Cid, Option<Vec<u8>>),
146 Iterate(ChainEpoch, Cid, IterateType, VecDeque<Cid>),
148}
149
pin_project! {
    /// Stream of [`CarBlock`]s produced by walking a sequence of tipsets; see [`stream_chain`].
    pub struct ChainStream<DB, T> {
        // Source of tipsets, pulled one at a time once `dfs` is empty.
        tipset_iter: T,
        // Blockstore that block data is loaded from.
        db: DB,
        // Queue of pending work; the front task is processed first.
        dfs: VecDeque<Task>,
        // CIDs already visited; these are neither emitted again nor descended into.
        seen: CidHashSet,
        // Messages are walked only for blocks with epoch above this value; state roots are
        // walked for those blocks and additionally for epoch 0.
        stateroot_limit_exclusive: ChainEpoch,
        // When true, a block missing from `db` is yielded as an error instead of skipped.
        fail_on_dead_links: bool,
        // When true, the global export status is updated with the epoch being processed.
        track_progress: bool,
    }
}
161
162impl<DB, T> ChainStream<DB, T> {
163 pub fn with_seen(mut self, seen: CidHashSet) -> Self {
164 self.seen = seen;
165 self
166 }
167
168 pub fn fail_on_dead_links(mut self, fail_on_dead_links: bool) -> Self {
169 self.fail_on_dead_links = fail_on_dead_links;
170 self
171 }
172
173 pub fn track_progress(mut self, track_progress: bool) -> Self {
174 self.track_progress = track_progress;
175 self
176 }
177
178 #[allow(dead_code)]
179 pub fn into_seen(self) -> CidHashSet {
180 self.seen
181 }
182}
183
184pub fn stream_chain<DB: Blockstore, T: Borrow<Tipset>, ITER: Iterator<Item = T> + Unpin>(
196 db: DB,
197 tipset_iter: ITER,
198 stateroot_limit_exclusive: ChainEpoch,
199) -> ChainStream<DB, ITER> {
200 ChainStream {
201 tipset_iter,
202 db,
203 dfs: VecDeque::new(),
204 seen: CidHashSet::default(),
205 stateroot_limit_exclusive,
206 fail_on_dead_links: true,
207 track_progress: false,
208 }
209}
210
211pub fn stream_graph<DB: Blockstore, T: Borrow<Tipset>, ITER: Iterator<Item = T> + Unpin>(
214 db: DB,
215 tipset_iter: ITER,
216 stateroot_limit_exclusive: ChainEpoch,
217) -> ChainStream<DB, ITER> {
218 stream_chain(db, tipset_iter, stateroot_limit_exclusive).fail_on_dead_links(false)
219}
220
221impl<DB: Blockstore, T: Borrow<Tipset>, ITER: Iterator<Item = T> + Unpin> Stream
222 for ChainStream<DB, ITER>
223{
224 type Item = anyhow::Result<CarBlock>;
225
226 fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
227 use Task::*;
228
229 let fail_on_dead_links = self.fail_on_dead_links;
230 let stateroot_limit_exclusive = self.stateroot_limit_exclusive;
231 let this = self.project();
232
233 loop {
234 while let Some(task) = this.dfs.front_mut() {
235 match task {
236 Emit(_, _) => {
237 if let Some(Emit(cid, data)) = this.dfs.pop_front() {
238 if let Some(data) = data {
239 return Poll::Ready(Some(Ok(CarBlock { cid, data })));
240 } else if let Some(data) = this.db.get(&cid)? {
241 return Poll::Ready(Some(Ok(CarBlock { cid, data })));
242 } else if fail_on_dead_links {
243 return Poll::Ready(Some(Err(anyhow::anyhow!(
244 "[Emit] missing key: {cid}"
245 ))));
246 };
247 }
248 }
249 Iterate(epoch, block_cid, _type, cid_vec) => {
250 if *this.track_progress {
251 update_epoch(*epoch);
252 }
253 while let Some(cid) = cid_vec.pop_front() {
254 if should_save_block_to_snapshot(cid) && this.seen.insert(cid) {
260 if let Some(data) = this.db.get(&cid)? {
261 if cid.codec() == fvm_ipld_encoding::DAG_CBOR {
262 let new_values = extract_cids(&data)?;
263 if !new_values.is_empty() {
264 cid_vec.reserve(new_values.len());
265 for v in new_values.into_iter().rev() {
266 cid_vec.push_front(v)
267 }
268 }
269 }
270 return Poll::Ready(Some(Ok(CarBlock { cid, data })));
271 } else if fail_on_dead_links {
272 let type_display = match _type {
273 IterateType::Message(c) => {
274 format!("message {c}")
275 }
276 IterateType::StateRoot(c) => {
277 format!("state root {c}")
278 }
279 };
280 return Poll::Ready(Some(Err(anyhow::anyhow!(
281 "[Iterate] missing key: {cid} from {type_display} in block {block_cid} at epoch {epoch}"
282 ))));
283 }
284 }
285 }
286 this.dfs.pop_front();
287 }
288 }
289 }
290
291 if let Some(tipset) = this.tipset_iter.next() {
295 for block in tipset.borrow().block_headers() {
296 let (cid, data) = block.car_block()?;
297 if this.seen.insert(cid) {
298 if *this.track_progress {
299 update_epoch(block.epoch);
300 }
301 this.dfs.push_back(Emit(cid, Some(data)));
303
304 if block.epoch == 0 {
305 for p in &block.parents {
307 this.dfs.push_back(Emit(p, None));
308 }
309 }
310
311 if block.epoch > stateroot_limit_exclusive {
313 this.dfs.push_back(Iterate(
314 block.epoch,
315 *block.cid(),
316 IterateType::Message(block.messages),
317 DfsIter::from(block.messages)
318 .filter_map(ipld_to_cid)
319 .collect(),
320 ));
321 }
322
323 if block.epoch == 0 || block.epoch > stateroot_limit_exclusive {
326 this.dfs.push_back(Iterate(
329 block.epoch,
330 *block.cid(),
331 IterateType::StateRoot(block.state_root),
332 DfsIter::from(block.state_root)
333 .filter_map(ipld_to_cid)
334 .collect(),
335 ));
336 }
337 }
338 }
339 } else {
340 return Poll::Ready(None);
342 }
343 }
344 }
345}
346
347fn ipld_to_cid(ipld: Ipld) -> Option<Cid> {
348 if let Ipld::Link(cid) = ipld {
349 Some(cid)
350 } else {
351 None
352 }
353}