1use crate::blocks::Tipset;
5use crate::cid_collections::{CidHashSet, CidHashSetLike};
6use crate::ipld::Ipld;
7use crate::shim::clock::ChainEpoch;
8use crate::shim::executor::Receipt;
9use crate::utils::db::car_stream::CarBlock;
10use crate::utils::encoding::extract_cids;
11use crate::utils::multihash::prelude::*;
12use bytes::Bytes;
13use chrono::{DateTime, Utc};
14use cid::Cid;
15use futures::Stream;
16use fvm_ipld_blockstore::Blockstore;
17use parking_lot::Mutex;
18use pin_project_lite::pin_project;
19use std::borrow::Borrow;
20use std::collections::VecDeque;
21use std::pin::Pin;
22use std::sync::LazyLock;
23use std::task::{Context, Poll};
24
25#[derive(Default)]
26pub struct ExportStatus {
27 pub epoch: i64,
28 pub initial_epoch: i64,
29 pub exporting: bool,
30 pub cancelled: bool,
31 pub start_time: Option<DateTime<Utc>>,
32}
33
34pub static CHAIN_EXPORT_STATUS: LazyLock<Mutex<ExportStatus>> =
35 LazyLock::new(|| ExportStatus::default().into());
36
37fn update_epoch(new_value: i64) {
38 let mut mutex = CHAIN_EXPORT_STATUS.lock();
39 mutex.epoch = new_value;
40 if mutex.initial_epoch == 0 {
41 mutex.initial_epoch = new_value;
42 }
43}
44
45pub fn start_export() {
46 let mut mutex = CHAIN_EXPORT_STATUS.lock();
47 mutex.epoch = 0;
48 mutex.initial_epoch = 0;
49 mutex.exporting = true;
50 mutex.cancelled = false;
51 mutex.start_time = Some(Utc::now());
52}
53
54pub fn end_export() {
55 let mut mutex = CHAIN_EXPORT_STATUS.lock();
56 mutex.exporting = false;
57}
58
59pub fn cancel_export() {
60 let mut mutex = CHAIN_EXPORT_STATUS.lock();
61 mutex.exporting = false;
62 mutex.cancelled = true;
63}
64
65fn should_save_block_to_snapshot(cid: Cid) -> bool {
66 if cid.hash().code() == u64::from(MultihashCode::Identity) {
70 false
71 } else {
72 matches!(
73 cid.codec(),
74 crate::shim::crypto::IPLD_RAW | fvm_ipld_encoding::DAG_CBOR
75 )
76 }
77}
78
79pub struct DfsIter {
104 dfs: Vec<Ipld>,
105}
106
107impl DfsIter {
108 pub fn new(root: Ipld) -> Self {
109 DfsIter { dfs: vec![root] }
110 }
111}
112
113impl From<Cid> for DfsIter {
114 fn from(cid: Cid) -> Self {
115 DfsIter::new(Ipld::Link(cid))
116 }
117}
118
119impl Iterator for DfsIter {
120 type Item = Ipld;
121
122 fn next(&mut self) -> Option<Self::Item> {
123 while let Some(ipld) = self.dfs.pop() {
124 match ipld {
125 Ipld::List(list) => self.dfs.extend(list.into_iter().rev()),
126 Ipld::Map(map) => self.dfs.extend(map.into_values().rev()),
127 other => return Some(other),
128 }
129 }
130 None
131 }
132}
133
134enum IterateType {
135 Message(Cid),
136 MessageReceipts(Cid),
137 StateRoot(Cid),
138 EventsRoot(Cid),
139}
140
141enum Task {
142 Emit(Cid, Option<Bytes>),
144 Iterate(ChainEpoch, Cid, IterateType, Vec<Cid>),
146}
147
pin_project! {
    /// Stream of [`CarBlock`]s for a chain export.
    ///
    /// It walks the tipsets yielded by `tipset_iter` and the DAGs referenced by
    /// their block headers. Construct it with [`stream_chain`] or
    /// [`stream_graph`] and configure it with the builder methods.
    pub struct ChainStream<DB, T, S = CidHashSet> {
        // Source of tipsets to export, pulled one at a time when the task
        // queue runs dry.
        tipset_iter: T,
        // Blockstore that block data is loaded from.
        db: DB,
        // Queue of pending tasks, processed front to back.
        dfs: VecDeque<Task>,
        // CIDs already visited, used to skip duplicates.
        seen: S,
        // Messages (plus optional receipts/events) and state roots are only
        // walked for blocks with an epoch strictly greater than this value.
        // The genesis state root is walked as well.
        stateroot_limit_exclusive: ChainEpoch,
        // When true, a missing block is reported as an error item instead of
        // being skipped.
        fail_on_dead_links: bool,
        // Also walk message-receipt trees.
        message_receipts: bool,
        // Also walk event trees referenced from message receipts.
        events: bool,
        // Also emit the tipset-key block of every tipset.
        tipset_keys: bool,
        // Report the epoch being processed via `update_epoch`.
        track_progress: bool,
    }
}
162
163impl<DB, T, S> ChainStream<DB, T, S> {
164 pub fn fail_on_dead_links(mut self, fail_on_dead_links: bool) -> Self {
165 self.fail_on_dead_links = fail_on_dead_links;
166 self
167 }
168
169 pub fn track_progress(mut self, track_progress: bool) -> Self {
170 self.track_progress = track_progress;
171 self
172 }
173
174 pub fn with_message_receipts(mut self, message_receipts: bool) -> Self {
176 self.message_receipts = message_receipts;
177 self
178 }
179
180 pub fn with_events(mut self, events: bool) -> Self {
183 self.events = events;
184 self
185 }
186
187 pub fn with_tipset_keys(mut self, tipset_keys: bool) -> Self {
189 self.tipset_keys = tipset_keys;
190 self
191 }
192
193 pub fn into_seen(self) -> S {
194 self.seen
195 }
196}
197
198pub fn stream_chain<
210 DB: Blockstore,
211 T: Borrow<Tipset>,
212 ITER: Iterator<Item = T> + Unpin,
213 S: CidHashSetLike,
214>(
215 db: DB,
216 tipset_iter: ITER,
217 stateroot_limit_exclusive: ChainEpoch,
218 seen: S,
219) -> ChainStream<DB, ITER, S> {
220 ChainStream {
221 tipset_iter,
222 db,
223 dfs: VecDeque::new(),
224 seen,
225 stateroot_limit_exclusive,
226 fail_on_dead_links: true,
227 message_receipts: false,
228 events: false,
229 tipset_keys: false,
230 track_progress: false,
231 }
232}
233
234pub fn stream_graph<
237 DB: Blockstore,
238 T: Borrow<Tipset>,
239 ITER: Iterator<Item = T> + Unpin,
240 S: CidHashSetLike,
241>(
242 db: DB,
243 tipset_iter: ITER,
244 stateroot_limit_exclusive: ChainEpoch,
245 seen: S,
246) -> ChainStream<DB, ITER, S> {
247 stream_chain(db, tipset_iter, stateroot_limit_exclusive, seen).fail_on_dead_links(false)
248}
249
impl<DB: Blockstore, T: Borrow<Tipset>, ITER: Iterator<Item = T> + Unpin, S: CidHashSetLike> Stream
    for ChainStream<DB, ITER, S>
{
    type Item = anyhow::Result<CarBlock>;

    /// Yields the next block of the export.
    ///
    /// Queued tasks are drained first. Once the queue is empty, the next tipset
    /// is pulled from `tipset_iter` and turned into new tasks. Every return path
    /// is `Poll::Ready`, so the stream never returns `Poll::Pending`. It ends
    /// with `None` when `tipset_iter` is exhausted and no tasks remain.
    fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        use Task::*;

        // Read these flags up front; `self` is consumed by `project()` below.
        let export_tipset_keys = self.tipset_keys;
        let fail_on_dead_links = self.fail_on_dead_links;
        let stateroot_limit_exclusive = self.stateroot_limit_exclusive;
        let this = self.project();

        loop {
            // Drain queued tasks, front to back.
            while let Some(task) = this.dfs.front_mut() {
                match task {
                    Emit(_, _) => {
                        if let Some(Emit(cid, data)) = this.dfs.pop_front() {
                            if let Some(data) = data {
                                // Bytes were supplied when the task was queued.
                                return Poll::Ready(Some(Ok(CarBlock { cid, data })));
                            } else if let Some(data) = this.db.get(&cid)? {
                                return Poll::Ready(Some(Ok(CarBlock {
                                    cid,
                                    data: data.into(),
                                })));
                            } else if fail_on_dead_links {
                                return Poll::Ready(Some(Err(anyhow::anyhow!(
                                    "[Emit] missing key: {cid}"
                                ))));
                            };
                            // Missing block in lenient mode: skip it silently.
                        }
                    }
                    Iterate(epoch, block_cid, _type, cid_vec) => {
                        if *this.track_progress {
                            update_epoch(*epoch);
                        }
                        // `cid_vec` is used as a stack. Children are pushed in
                        // reverse so they pop in link order. The task stays at
                        // the front of the queue across polls until its stack
                        // is empty.
                        while let Some(cid) = cid_vec.pop() {
                            if should_save_block_to_snapshot(cid) && this.seen.insert(cid)? {
                                if let Some(data) = this.db.get(&cid)? {
                                    // Only DAG-CBOR blocks are scanned for further links.
                                    if cid.codec() == fvm_ipld_encoding::DAG_CBOR {
                                        let new_values = extract_cids(&data)?;
                                        cid_vec.extend(new_values.into_iter().rev());
                                    }
                                    return Poll::Ready(Some(Ok(CarBlock {
                                        cid,
                                        data: data.into(),
                                    })));
                                } else if fail_on_dead_links {
                                    let type_display = match _type {
                                        IterateType::Message(c) => {
                                            format!("message {c}")
                                        }
                                        IterateType::StateRoot(c) => {
                                            format!("state root {c}")
                                        }
                                        // Missing receipt/event blocks are tolerated even
                                        // in strict mode: trace and move on to the next CID.
                                        IterateType::MessageReceipts(c) => {
                                            tracing::trace!(
                                                "[Iterate] missing key: {cid} from message receipts {c} in block {block_cid} at epoch {epoch}"
                                            );
                                            continue;
                                        }
                                        IterateType::EventsRoot(c) => {
                                            tracing::trace!(
                                                "[Iterate] missing key: {cid} from events root {c} in block {block_cid} at epoch {epoch}"
                                            );
                                            continue;
                                        }
                                    };
                                    return Poll::Ready(Some(Err(anyhow::anyhow!(
                                        "[Iterate] missing key: {cid} from {type_display} in block {block_cid} at epoch {epoch}"
                                    ))));
                                }
                            }
                        }
                        // Stack exhausted: this traversal is complete.
                        this.dfs.pop_front();
                    }
                }
            }

            // Queue is empty: expand the next tipset into tasks.
            if let Some(tipset) = this.tipset_iter.next() {
                // Optionally emit the tipset-key block; failure to build it is ignored.
                if export_tipset_keys
                    && let Ok(CarBlock { cid, data }) = tipset.borrow().key().car_block()
                {
                    this.dfs.push_back(Emit(cid, Some(data)));
                }

                for block in tipset.borrow().block_headers() {
                    let (cid, data) = block.car_block()?;
                    // Headers already seen (e.g. shared with another tipset) are skipped.
                    if this.seen.insert(cid)? {
                        if *this.track_progress {
                            update_epoch(block.epoch);
                        }
                        this.dfs.push_back(Emit(cid, Some(data.into())));

                        // Genesis: also emit the blocks listed as its parents,
                        // loading their data from the blockstore.
                        if block.epoch == 0 {
                            for p in &block.parents {
                                this.dfs.push_back(Emit(p, None));
                            }
                        }

                        // Recent blocks: walk messages, plus optional receipts and
                        // events. A `DfsIter` over a single link yields just that
                        // CID, so each stack starts with the root CID.
                        if block.epoch > stateroot_limit_exclusive {
                            this.dfs.push_back(Iterate(
                                block.epoch,
                                *block.cid(),
                                IterateType::Message(block.messages),
                                DfsIter::from(block.messages)
                                    .filter_map(ipld_to_cid)
                                    .collect(),
                            ));
                            if *this.message_receipts {
                                this.dfs.push_back(Iterate(
                                    block.epoch,
                                    *block.cid(),
                                    IterateType::MessageReceipts(block.message_receipts),
                                    DfsIter::from(block.message_receipts)
                                        .filter_map(ipld_to_cid)
                                        .collect(),
                                ));
                            }
                            // Events are best-effort: if the receipts cannot be
                            // loaded, no event roots are queued for this block.
                            if *this.events
                                && let Ok(receipts) =
                                    Receipt::get_receipts(this.db, block.message_receipts)
                            {
                                for receipt in receipts {
                                    if let Some(events_root) = receipt.events_root() {
                                        this.dfs.push_back(Iterate(
                                            block.epoch,
                                            *block.cid(),
                                            IterateType::EventsRoot(events_root),
                                            DfsIter::from(events_root)
                                                .filter_map(ipld_to_cid)
                                                .collect(),
                                        ));
                                    }
                                }
                            }
                        }

                        // State trees: genesis and recent blocks only.
                        if block.epoch == 0 || block.epoch > stateroot_limit_exclusive {
                            this.dfs.push_back(Iterate(
                                block.epoch,
                                *block.cid(),
                                IterateType::StateRoot(block.state_root),
                                DfsIter::from(block.state_root)
                                    .filter_map(ipld_to_cid)
                                    .collect(),
                            ));
                        }
                    }
                }
            } else {
                // No tasks left and no more tipsets: the stream is finished.
                return Poll::Ready(None);
            }
        }
    }
}
426
pin_project! {
    /// Stream of [`CarBlock`]s for the blocks reachable from a set of root CIDs.
    ///
    /// Links are followed through DAG-CBOR blocks. A block missing from the
    /// store is always reported as an error; there is no lenient mode.
    pub struct IpldStream<DB, S> {
        // Blockstore that block data is loaded from.
        db: DB,
        // Stack of CIDs still to visit; the next one is popped from the end.
        cid_vec: Vec<Cid>,
        // CIDs already visited, used to skip duplicates.
        seen: S,
    }
}
434
435impl<DB, S> IpldStream<DB, S> {
436 pub fn new(db: DB, roots: Vec<Cid>, seen: S) -> Self {
437 Self {
438 db,
439 cid_vec: roots,
440 seen,
441 }
442 }
443}
444
445impl<DB: Blockstore, S: CidHashSetLike> Stream for IpldStream<DB, S> {
446 type Item = anyhow::Result<CarBlock>;
447
448 fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
449 let this = self.project();
450 while let Some(cid) = this.cid_vec.pop() {
451 if should_save_block_to_snapshot(cid) && this.seen.insert(cid)? {
452 if let Some(data) = this.db.get(&cid)? {
453 if cid.codec() == fvm_ipld_encoding::DAG_CBOR {
454 let new_cids = extract_cids(&data)?;
455 this.cid_vec.extend(new_cids);
456 }
457 return Poll::Ready(Some(Ok(CarBlock {
458 cid,
459 data: data.into(),
460 })));
461 } else {
462 return Poll::Ready(Some(Err(anyhow::anyhow!("missing key: {cid}"))));
463 }
464 }
465 }
466 Poll::Ready(None)
468 }
469}
470
471fn ipld_to_cid(ipld: Ipld) -> Option<Cid> {
472 if let Ipld::Link(cid) = ipld {
473 Some(cid)
474 } else {
475 None
476 }
477}