1use crate::blocks::Tipset;
5use crate::cid_collections::{CidHashSet, CidHashSetLike};
6use crate::ipld::Ipld;
7use crate::shim::clock::ChainEpoch;
8use crate::shim::executor::Receipt;
9use crate::utils::db::car_stream::CarBlock;
10use crate::utils::encoding::extract_cids;
11use crate::utils::multihash::prelude::*;
12use chrono::{DateTime, Utc};
13use cid::Cid;
14use futures::Stream;
15use fvm_ipld_blockstore::Blockstore;
16use parking_lot::Mutex;
17use pin_project_lite::pin_project;
18use std::borrow::Borrow;
19use std::collections::VecDeque;
20use std::pin::Pin;
21use std::sync::LazyLock;
22use std::task::{Context, Poll};
23
24#[derive(Default)]
25pub struct ExportStatus {
26 pub epoch: i64,
27 pub initial_epoch: i64,
28 pub exporting: bool,
29 pub cancelled: bool,
30 pub start_time: Option<DateTime<Utc>>,
31}
32
33pub static CHAIN_EXPORT_STATUS: LazyLock<Mutex<ExportStatus>> =
34 LazyLock::new(|| ExportStatus::default().into());
35
36fn update_epoch(new_value: i64) {
37 let mut mutex = CHAIN_EXPORT_STATUS.lock();
38 mutex.epoch = new_value;
39 if mutex.initial_epoch == 0 {
40 mutex.initial_epoch = new_value;
41 }
42}
43
44pub fn start_export() {
45 let mut mutex = CHAIN_EXPORT_STATUS.lock();
46 mutex.epoch = 0;
47 mutex.initial_epoch = 0;
48 mutex.exporting = true;
49 mutex.cancelled = false;
50 mutex.start_time = Some(Utc::now());
51}
52
53pub fn end_export() {
54 let mut mutex = CHAIN_EXPORT_STATUS.lock();
55 mutex.exporting = false;
56}
57
58pub fn cancel_export() {
59 let mut mutex = CHAIN_EXPORT_STATUS.lock();
60 mutex.exporting = false;
61 mutex.cancelled = true;
62}
63
64fn should_save_block_to_snapshot(cid: Cid) -> bool {
65 if cid.hash().code() == u64::from(MultihashCode::Identity) {
69 false
70 } else {
71 matches!(
72 cid.codec(),
73 crate::shim::crypto::IPLD_RAW | fvm_ipld_encoding::DAG_CBOR
74 )
75 }
76}
77
78pub struct DfsIter {
103 dfs: Vec<Ipld>,
104}
105
106impl DfsIter {
107 pub fn new(root: Ipld) -> Self {
108 DfsIter { dfs: vec![root] }
109 }
110}
111
112impl From<Cid> for DfsIter {
113 fn from(cid: Cid) -> Self {
114 DfsIter::new(Ipld::Link(cid))
115 }
116}
117
118impl Iterator for DfsIter {
119 type Item = Ipld;
120
121 fn next(&mut self) -> Option<Self::Item> {
122 while let Some(ipld) = self.dfs.pop() {
123 match ipld {
124 Ipld::List(list) => self.dfs.extend(list.into_iter().rev()),
125 Ipld::Map(map) => self.dfs.extend(map.into_values().rev()),
126 other => return Some(other),
127 }
128 }
129 None
130 }
131}
132
/// Identifies which DAG an `Iterate` task is walking, together with that DAG's
/// root CID. In this file it is only used to label missing-key diagnostics
/// (errors or trace logs) in `ChainStream::poll_next`.
enum IterateType {
    /// Messages root of a block header.
    Message(Cid),
    /// Message-receipts root of a block header.
    MessageReceipts(Cid),
    /// State root of a block header.
    StateRoot(Cid),
    /// Events root taken from one of the block's message receipts.
    EventsRoot(Cid),
}
139
/// A unit of pending work in `ChainStream`'s task queue.
enum Task {
    /// Yield exactly one block. `Some(bytes)` carries already-encoded data;
    /// `None` means the block is fetched from the blockstore when the task runs.
    /// These tasks do not consult the `seen` set.
    Emit(Cid, Option<Vec<u8>>),
    /// Walk a DAG one block per poll. Fields are: epoch of the owning block,
    /// CID of the owning block header, which DAG is walked (for diagnostics),
    /// and the stack of CIDs still to visit.
    Iterate(ChainEpoch, Cid, IterateType, Vec<Cid>),
}
146
pin_project! {
    /// Stream of `CarBlock`s for a chain export.
    ///
    /// Built by `stream_chain` / `stream_graph` and configured via the builder
    /// methods on this type.
    pub struct ChainStream<DB, T, S = CidHashSet> {
        // Source of tipsets; the next tipset is pulled only once `dfs` is empty.
        tipset_iter: T,
        // Blockstore that block data is read from.
        db: DB,
        // FIFO queue of pending tasks (processed from the front).
        dfs: VecDeque<Task>,
        // CIDs already visited; consulted for block headers and DAG nodes,
        // but not for `Emit` tasks.
        seen: S,
        // Messages (and receipts/events) are walked only for blocks with an
        // epoch strictly greater than this; state roots likewise, plus epoch 0.
        stateroot_limit_exclusive: ChainEpoch,
        // When `true`, a missing message/state block yields an error item;
        // when `false`, it is skipped.
        fail_on_dead_links: bool,
        // Also walk each block's message-receipts DAG.
        message_receipts: bool,
        // Also walk the events DAGs referenced by message receipts.
        events: bool,
        // Emit the encoded tipset key of every tipset.
        tipset_keys:bool,
        // Publish the current epoch to `CHAIN_EXPORT_STATUS` while streaming.
        track_progress: bool,
    }
}
161
162impl<DB, T, S> ChainStream<DB, T, S> {
163 pub fn fail_on_dead_links(mut self, fail_on_dead_links: bool) -> Self {
164 self.fail_on_dead_links = fail_on_dead_links;
165 self
166 }
167
168 pub fn track_progress(mut self, track_progress: bool) -> Self {
169 self.track_progress = track_progress;
170 self
171 }
172
173 pub fn with_message_receipts(mut self, message_receipts: bool) -> Self {
175 self.message_receipts = message_receipts;
176 self
177 }
178
179 pub fn with_events(mut self, events: bool) -> Self {
182 self.events = events;
183 self
184 }
185
186 pub fn with_tipset_keys(mut self, tipset_keys: bool) -> Self {
188 self.tipset_keys = tipset_keys;
189 self
190 }
191
192 pub fn into_seen(self) -> S {
193 self.seen
194 }
195}
196
197pub fn stream_chain<
209 DB: Blockstore,
210 T: Borrow<Tipset>,
211 ITER: Iterator<Item = T> + Unpin,
212 S: CidHashSetLike,
213>(
214 db: DB,
215 tipset_iter: ITER,
216 stateroot_limit_exclusive: ChainEpoch,
217 seen: S,
218) -> ChainStream<DB, ITER, S> {
219 ChainStream {
220 tipset_iter,
221 db,
222 dfs: VecDeque::new(),
223 seen,
224 stateroot_limit_exclusive,
225 fail_on_dead_links: true,
226 message_receipts: false,
227 events: false,
228 tipset_keys: false,
229 track_progress: false,
230 }
231}
232
233pub fn stream_graph<
236 DB: Blockstore,
237 T: Borrow<Tipset>,
238 ITER: Iterator<Item = T> + Unpin,
239 S: CidHashSetLike,
240>(
241 db: DB,
242 tipset_iter: ITER,
243 stateroot_limit_exclusive: ChainEpoch,
244 seen: S,
245) -> ChainStream<DB, ITER, S> {
246 stream_chain(db, tipset_iter, stateroot_limit_exclusive, seen).fail_on_dead_links(false)
247}
248
/// Drives the export. Each poll yields the next block to write, or an error
/// item for a missing key when dead links are not tolerated.
///
/// Pending work lives in the FIFO queue `dfs`, whose front task is fully
/// drained before the next one starts. Once the queue is empty, the next
/// tipset is taken from `tipset_iter` and expanded into new tasks. The stream
/// ends when the tipset iterator is exhausted and the queue is empty. This
/// implementation never returns `Poll::Pending`.
impl<DB: Blockstore, T: Borrow<Tipset>, ITER: Iterator<Item = T> + Unpin, S: CidHashSetLike> Stream
    for ChainStream<DB, ITER, S>
{
    type Item = anyhow::Result<CarBlock>;

    fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        use Task::*;

        // Read these settings as plain values up front; `project()` below consumes `self`.
        let export_tipset_keys = self.tipset_keys;
        let fail_on_dead_links = self.fail_on_dead_links;
        let stateroot_limit_exclusive = self.stateroot_limit_exclusive;
        let this = self.project();

        loop {
            // Work through queued tasks before touching the next tipset.
            while let Some(task) = this.dfs.front_mut() {
                match task {
                    // Emit one block: use the pre-encoded bytes when present,
                    // otherwise look the CID up in the blockstore.
                    Emit(_, _) => {
                        if let Some(Emit(cid, data)) = this.dfs.pop_front() {
                            if let Some(data) = data {
                                return Poll::Ready(Some(Ok(CarBlock { cid, data })));
                            } else if let Some(data) = this.db.get(&cid)? {
                                return Poll::Ready(Some(Ok(CarBlock { cid, data })));
                            } else if fail_on_dead_links {
                                return Poll::Ready(Some(Err(anyhow::anyhow!(
                                    "[Emit] missing key: {cid}"
                                ))));
                            };
                            // Missing but dead links are tolerated: drop it and move on.
                        }
                    }
                    // Walk a DAG using `cid_vec` as a stack. One block is returned per
                    // poll, and the task stays at the front until its stack is empty.
                    Iterate(epoch, block_cid, _type, cid_vec) => {
                        if *this.track_progress {
                            update_epoch(*epoch);
                        }
                        while let Some(cid) = cid_vec.pop() {
                            // Presumably `insert` returns `true` on first insertion (like
                            // `HashSet::insert`), so already-visited CIDs are skipped.
                            if should_save_block_to_snapshot(cid) && this.seen.insert(cid)? {
                                if let Some(data) = this.db.get(&cid)? {
                                    if cid.codec() == fvm_ipld_encoding::DAG_CBOR {
                                        let new_values = extract_cids(&data)?;
                                        // Reversed so the first link is popped (visited) first.
                                        cid_vec.extend(new_values.into_iter().rev());
                                    }
                                    return Poll::Ready(Some(Ok(CarBlock { cid, data })));
                                } else if fail_on_dead_links {
                                    let type_display = match _type {
                                        IterateType::Message(c) => {
                                            format!("message {c}")
                                        }
                                        IterateType::StateRoot(c) => {
                                            format!("state root {c}")
                                        }
                                        // Missing receipt and event blocks are only traced,
                                        // never reported as errors.
                                        IterateType::MessageReceipts(c) => {
                                            tracing::trace!(
                                                "[Iterate] missing key: {cid} from message receipts {c} in block {block_cid} at epoch {epoch}"
                                            );
                                            continue;
                                        }
                                        IterateType::EventsRoot(c) => {
                                            tracing::trace!(
                                                "[Iterate] missing key: {cid} from events root {c} in block {block_cid} at epoch {epoch}"
                                            );
                                            continue;
                                        }
                                    };
                                    return Poll::Ready(Some(Err(anyhow::anyhow!(
                                        "[Iterate] missing key: {cid} from {type_display} in block {block_cid} at epoch {epoch}"
                                    ))));
                                }
                            }
                        }
                        // Stack exhausted: this DAG walk is complete.
                        this.dfs.pop_front();
                    }
                }
            }

            // Queue is empty: expand the next tipset into tasks, or finish.
            if let Some(tipset) = this.tipset_iter.next() {
                // Optional tipset-key block, queued ahead of the headers. A failure
                // to encode the key is ignored.
                if export_tipset_keys
                    && let Ok(CarBlock { cid, data }) = tipset.borrow().key().car_block()
                {
                    this.dfs.push_back(Emit(cid, Some(data)));
                }

                for block in tipset.borrow().block_headers() {
                    let (cid, data) = block.car_block()?;
                    // Headers already in `seen` are skipped along with all their DAGs.
                    if this.seen.insert(cid)? {
                        if *this.track_progress {
                            update_epoch(block.epoch);
                        }
                        this.dfs.push_back(Emit(cid, Some(data)));

                        // Epoch 0 (genesis): also emit its parent blocks, fetched from the store.
                        if block.epoch == 0 {
                            for p in &block.parents {
                                this.dfs.push_back(Emit(p, None));
                            }
                        }

                        // Recent blocks: walk messages, and optionally receipts and events.
                        if block.epoch > stateroot_limit_exclusive {
                            this.dfs.push_back(Iterate(
                                block.epoch,
                                *block.cid(),
                                IterateType::Message(block.messages),
                                DfsIter::from(block.messages)
                                    .filter_map(ipld_to_cid)
                                    .collect(),
                            ));
                            if *this.message_receipts {
                                this.dfs.push_back(Iterate(
                                    block.epoch,
                                    *block.cid(),
                                    IterateType::MessageReceipts(block.message_receipts),
                                    DfsIter::from(block.message_receipts)
                                        .filter_map(ipld_to_cid)
                                        .collect(),
                                ));
                            }
                            // Receipts that fail to load are skipped silently (no events queued).
                            if *this.events
                                && let Ok(receipts) =
                                    Receipt::get_receipts(this.db, block.message_receipts)
                            {
                                for receipt in receipts {
                                    if let Some(events_root) = receipt.events_root() {
                                        this.dfs.push_back(Iterate(
                                            block.epoch,
                                            *block.cid(),
                                            IterateType::EventsRoot(events_root),
                                            DfsIter::from(events_root)
                                                .filter_map(ipld_to_cid)
                                                .collect(),
                                        ));
                                    }
                                }
                            }
                        }

                        // State trees are walked for genesis and for recent blocks.
                        if block.epoch == 0 || block.epoch > stateroot_limit_exclusive {
                            this.dfs.push_back(Iterate(
                                block.epoch,
                                *block.cid(),
                                IterateType::StateRoot(block.state_root),
                                DfsIter::from(block.state_root)
                                    .filter_map(ipld_to_cid)
                                    .collect(),
                            ));
                        }
                    }
                }
            } else {
                // No more tipsets and no queued work: end of stream.
                return Poll::Ready(None);
            }
        }
    }
}
419
pin_project! {
    /// Stream of `CarBlock`s for every block reachable from a set of root
    /// CIDs, following DAG-CBOR links and skipping CIDs already in `seen`.
    pub struct IpldStream<DB, S> {
        // Blockstore that block data is read from.
        db: DB,
        // Stack of CIDs still to visit (popped from the end).
        cid_vec: Vec<Cid>,
        // CIDs already visited.
        seen: S,
    }
}
427
428impl<DB, S> IpldStream<DB, S> {
429 pub fn new(db: DB, roots: Vec<Cid>, seen: S) -> Self {
430 Self {
431 db,
432 cid_vec: roots,
433 seen,
434 }
435 }
436}
437
438impl<DB: Blockstore, S: CidHashSetLike> Stream for IpldStream<DB, S> {
439 type Item = anyhow::Result<CarBlock>;
440
441 fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
442 let this = self.project();
443 while let Some(cid) = this.cid_vec.pop() {
444 if should_save_block_to_snapshot(cid) && this.seen.insert(cid)? {
445 if let Some(data) = this.db.get(&cid)? {
446 if cid.codec() == fvm_ipld_encoding::DAG_CBOR {
447 let new_cids = extract_cids(&data)?;
448 this.cid_vec.extend(new_cids);
449 }
450 return Poll::Ready(Some(Ok(CarBlock { cid, data })));
451 } else {
452 return Poll::Ready(Some(Err(anyhow::anyhow!("missing key: {cid}"))));
453 }
454 }
455 }
456 Poll::Ready(None)
458 }
459}
460
461fn ipld_to_cid(ipld: Ipld) -> Option<Cid> {
462 if let Ipld::Link(cid) = ipld {
463 Some(cid)
464 } else {
465 None
466 }
467}