1use crate::blocks::Tipset;
5use crate::cid_collections::CidHashSet;
6use crate::ipld::Ipld;
7use crate::shim::clock::ChainEpoch;
8use crate::shim::executor::Receipt;
9use crate::utils::db::car_stream::CarBlock;
10use crate::utils::encoding::extract_cids;
11use crate::utils::multihash::prelude::*;
12use chrono::{DateTime, Utc};
13use cid::Cid;
14use futures::Stream;
15use fvm_ipld_blockstore::Blockstore;
16use parking_lot::Mutex;
17use pin_project_lite::pin_project;
18use std::borrow::Borrow;
19use std::collections::VecDeque;
20use std::pin::Pin;
21use std::sync::LazyLock;
22use std::task::{Context, Poll};
23
24#[derive(Default)]
25pub struct ExportStatus {
26 pub epoch: i64,
27 pub initial_epoch: i64,
28 pub exporting: bool,
29 pub cancelled: bool,
30 pub start_time: Option<DateTime<Utc>>,
31}
32
33pub static CHAIN_EXPORT_STATUS: LazyLock<Mutex<ExportStatus>> =
34 LazyLock::new(|| ExportStatus::default().into());
35
36fn update_epoch(new_value: i64) {
37 let mut mutex = CHAIN_EXPORT_STATUS.lock();
38 mutex.epoch = new_value;
39 if mutex.initial_epoch == 0 {
40 mutex.initial_epoch = new_value;
41 }
42}
43
44pub fn start_export() {
45 let mut mutex = CHAIN_EXPORT_STATUS.lock();
46 mutex.epoch = 0;
47 mutex.initial_epoch = 0;
48 mutex.exporting = true;
49 mutex.cancelled = false;
50 mutex.start_time = Some(Utc::now());
51}
52
53pub fn end_export() {
54 let mut mutex = CHAIN_EXPORT_STATUS.lock();
55 mutex.exporting = false;
56}
57
58pub fn cancel_export() {
59 let mut mutex = CHAIN_EXPORT_STATUS.lock();
60 mutex.exporting = false;
61 mutex.cancelled = true;
62}
63
64fn should_save_block_to_snapshot(cid: Cid) -> bool {
65 if cid.hash().code() == u64::from(MultihashCode::Identity) {
69 false
70 } else {
71 matches!(
72 cid.codec(),
73 crate::shim::crypto::IPLD_RAW | fvm_ipld_encoding::DAG_CBOR
74 )
75 }
76}
77
/// Depth-first iterator over the leaves of an [`Ipld`] value.
///
/// Lists and maps are expanded in place and never yielded themselves; every
/// other node (links included) is yielded as-is.
pub struct DfsIter {
    // Pending nodes; the front of the deque is visited next.
    dfs: VecDeque<Ipld>,
}
105
106impl DfsIter {
107 pub fn new(root: Ipld) -> Self {
108 DfsIter {
109 dfs: VecDeque::from([root]),
110 }
111 }
112
113 pub fn walk_next(&mut self, ipld: Ipld) {
114 self.dfs.push_front(ipld)
115 }
116}
117
118impl From<Cid> for DfsIter {
119 fn from(cid: Cid) -> Self {
120 DfsIter::new(Ipld::Link(cid))
121 }
122}
123
124impl Iterator for DfsIter {
125 type Item = Ipld;
126
127 fn next(&mut self) -> Option<Self::Item> {
128 while let Some(ipld) = self.dfs.pop_front() {
129 match ipld {
130 Ipld::List(list) => list.into_iter().rev().for_each(|elt| self.walk_next(elt)),
131 Ipld::Map(map) => map.into_values().rev().for_each(|elt| self.walk_next(elt)),
132 other => return Some(other),
133 }
134 }
135 None
136 }
137}
138
/// Identifies which root an `Iterate` task was started from.
///
/// The wrapped [`Cid`] is the root itself. The stream uses the variant to
/// word its "missing key" diagnostics and to decide whether a missing block
/// is reported as an error or only traced.
enum IterateType {
    /// Traversal started from a block header's `messages` CID.
    Message(Cid),
    /// Traversal started from a block header's `message_receipts` CID.
    MessageReceipts(Cid),
    /// Traversal started from a block header's `state_root` CID.
    StateRoot(Cid),
    /// Traversal started from an events root taken from a receipt.
    EventsRoot(Cid),
}
145
/// Unit of work queued by [`ChainStream`].
enum Task {
    /// Yield the block for this CID. When the payload is `None`, the data is
    /// fetched from the database at the time the task is processed.
    Emit(Cid, Option<Vec<u8>>),
    /// Walk a sub-graph: the epoch and CID of the originating block, the kind
    /// of root being walked, and the CIDs still waiting to be visited.
    Iterate(ChainEpoch, Cid, IterateType, VecDeque<Cid>),
}
152
pin_project! {
    /// Stream of [`CarBlock`]s produced by walking tipsets from `tipset_iter`
    /// and the graphs reachable from their block headers.
    ///
    /// Build one with `stream_chain` or `stream_graph`, then adjust it with the
    /// builder-style methods on this type.
    pub struct ChainStream<DB, T> {
        // Source of tipsets; the next one is pulled when the task queue is empty.
        tipset_iter: T,
        // Store the block data is read from.
        db: DB,
        // Queue of pending tasks, processed from the front.
        dfs: VecDeque<Task>,
        // CIDs already handled; used to avoid visiting a block twice.
        seen: CidHashSet,
        // Messages, receipts and events are only walked for blocks whose epoch
        // is strictly greater than this value; state roots additionally at epoch 0.
        stateroot_limit_exclusive: ChainEpoch,
        // When true, a missing message or state-root block (or a missing
        // `Emit` block) ends up as an error item.
        fail_on_dead_links: bool,
        // Whether message receipts are walked.
        message_receipts: bool,
        // Whether events roots referenced by receipts are walked.
        events: bool,
        // Whether the global export status is updated with the current epoch.
        track_progress: bool,
    }
}
166
167impl<DB, T> ChainStream<DB, T> {
168 pub fn with_seen(mut self, seen: CidHashSet) -> Self {
169 self.seen = seen;
170 self
171 }
172
173 pub fn fail_on_dead_links(mut self, fail_on_dead_links: bool) -> Self {
174 self.fail_on_dead_links = fail_on_dead_links;
175 self
176 }
177
178 pub fn track_progress(mut self, track_progress: bool) -> Self {
179 self.track_progress = track_progress;
180 self
181 }
182
183 pub fn with_message_receipts(mut self, message_receipts: bool) -> Self {
185 self.message_receipts = message_receipts;
186 self
187 }
188
189 pub fn with_events(mut self, events: bool) -> Self {
192 self.events = events;
193 self
194 }
195
196 #[allow(dead_code)]
197 pub fn into_seen(self) -> CidHashSet {
198 self.seen
199 }
200}
201
202pub fn stream_chain<DB: Blockstore, T: Borrow<Tipset>, ITER: Iterator<Item = T> + Unpin>(
214 db: DB,
215 tipset_iter: ITER,
216 stateroot_limit_exclusive: ChainEpoch,
217) -> ChainStream<DB, ITER> {
218 ChainStream {
219 tipset_iter,
220 db,
221 dfs: VecDeque::new(),
222 seen: CidHashSet::default(),
223 stateroot_limit_exclusive,
224 fail_on_dead_links: true,
225 message_receipts: false,
226 events: false,
227 track_progress: false,
228 }
229}
230
231pub fn stream_graph<DB: Blockstore, T: Borrow<Tipset>, ITER: Iterator<Item = T> + Unpin>(
234 db: DB,
235 tipset_iter: ITER,
236 stateroot_limit_exclusive: ChainEpoch,
237) -> ChainStream<DB, ITER> {
238 stream_chain(db, tipset_iter, stateroot_limit_exclusive).fail_on_dead_links(false)
239}
240
impl<DB: Blockstore, T: Borrow<Tipset>, ITER: Iterator<Item = T> + Unpin> Stream
    for ChainStream<DB, ITER>
{
    type Item = anyhow::Result<CarBlock>;

    /// Produces the next block of the stream.
    ///
    /// The method alternates between two phases until it has something to
    /// return:
    ///
    /// 1. Drain the task queue from the front. `Emit` tasks yield one block;
    ///    `Iterate` tasks walk their pending CIDs and yield each newly seen
    ///    block found in the database.
    /// 2. When the queue is empty, pull the next tipset and queue tasks for
    ///    each of its block headers. When there are no tipsets left, the
    ///    stream ends.
    ///
    /// Every return is `Poll::Ready`; the task context is ignored. Database
    /// and decoding failures are propagated as `Err` items through `?`.
    fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        use Task::*;

        // Copy the plain settings out before `self` is consumed by `project()`.
        let fail_on_dead_links = self.fail_on_dead_links;
        let stateroot_limit_exclusive = self.stateroot_limit_exclusive;
        let this = self.project();

        loop {
            // Phase 1: work through the queued tasks.
            while let Some(task) = this.dfs.front_mut() {
                match task {
                    // Nothing is bound here so that the task can be popped and
                    // taken by value just below.
                    Emit(_, _) => {
                        if let Some(Emit(cid, data)) = this.dfs.pop_front() {
                            if let Some(data) = data {
                                // Payload was provided when the task was queued.
                                return Poll::Ready(Some(Ok(CarBlock { cid, data })));
                            } else if let Some(data) = this.db.get(&cid)? {
                                return Poll::Ready(Some(Ok(CarBlock { cid, data })));
                            } else if fail_on_dead_links {
                                return Poll::Ready(Some(Err(anyhow::anyhow!(
                                    "[Emit] missing key: {cid}"
                                ))));
                            };
                            // Missing block with dead links tolerated: move on
                            // to the next task.
                        }
                    }
                    Iterate(epoch, block_cid, _type, cid_vec) => {
                        if *this.track_progress {
                            update_epoch(*epoch);
                        }
                        while let Some(cid) = cid_vec.pop_front() {
                            // Skip CIDs that are filtered out or were already
                            // visited; `insert` returning true marks a first visit.
                            if should_save_block_to_snapshot(cid) && this.seen.insert(cid) {
                                if let Some(data) = this.db.get(&cid)? {
                                    // Only DAG-CBOR blocks are scanned for further links.
                                    if cid.codec() == fvm_ipld_encoding::DAG_CBOR {
                                        let new_values = extract_cids(&data)?;
                                        if !new_values.is_empty() {
                                            cid_vec.reserve(new_values.len());
                                            // Reverse + push_front keeps the children in
                                            // their original order at the head of the
                                            // queue, so the walk stays depth-first.
                                            for v in new_values.into_iter().rev() {
                                                cid_vec.push_front(v)
                                            }
                                        }
                                    }
                                    // The task stays at the front of the queue and is
                                    // resumed on the next poll.
                                    return Poll::Ready(Some(Ok(CarBlock { cid, data })));
                                } else if fail_on_dead_links {
                                    // Missing messages and state roots are errors; missing
                                    // receipts and events are only traced and skipped.
                                    let type_display = match _type {
                                        IterateType::Message(c) => {
                                            format!("message {c}")
                                        }
                                        IterateType::StateRoot(c) => {
                                            format!("state root {c}")
                                        }
                                        IterateType::MessageReceipts(c) => {
                                            tracing::trace!(
                                                "[Iterate] missing key: {cid} from message receipts {c} in block {block_cid} at epoch {epoch}"
                                            );
                                            continue;
                                        }
                                        IterateType::EventsRoot(c) => {
                                            tracing::trace!(
                                                "[Iterate] missing key: {cid} from events root {c} in block {block_cid} at epoch {epoch}"
                                            );
                                            continue;
                                        }
                                    };
                                    return Poll::Ready(Some(Err(anyhow::anyhow!(
                                        "[Iterate] missing key: {cid} from {type_display} in block {block_cid} at epoch {epoch}"
                                    ))));
                                }
                            }
                        }
                        // All pending CIDs of this task are done; drop the task.
                        this.dfs.pop_front();
                    }
                }
            }

            // Phase 2: the queue is empty, so queue work for the next tipset.
            if let Some(tipset) = this.tipset_iter.next() {
                for block in tipset.borrow().block_headers() {
                    let (cid, data) = block.car_block()?;
                    // Headers that were already visited queue no work at all.
                    if this.seen.insert(cid) {
                        if *this.track_progress {
                            update_epoch(block.epoch);
                        }
                        // The header itself is always emitted, with its data attached.
                        this.dfs.push_back(Emit(cid, Some(data)));

                        // At epoch 0 the block's parents are emitted too; their data
                        // is looked up in the database when the task is processed.
                        if block.epoch == 0 {
                            for p in &block.parents {
                                this.dfs.push_back(Emit(p, None));
                            }
                        }

                        // Messages (and optionally receipts and events) are walked only
                        // above the epoch limit.
                        if block.epoch > stateroot_limit_exclusive {
                            this.dfs.push_back(Iterate(
                                block.epoch,
                                *block.cid(),
                                IterateType::Message(block.messages),
                                DfsIter::from(block.messages)
                                    .filter_map(ipld_to_cid)
                                    .collect(),
                            ));
                            if *this.message_receipts {
                                this.dfs.push_back(Iterate(
                                    block.epoch,
                                    *block.cid(),
                                    IterateType::MessageReceipts(block.message_receipts),
                                    DfsIter::from(block.message_receipts)
                                        .filter_map(ipld_to_cid)
                                        .collect(),
                                ));
                            }
                            // Receipts that cannot be loaded are ignored here: events are
                            // queued only when `get_receipts` succeeds.
                            if *this.events
                                && let Ok(receipts) =
                                    Receipt::get_receipts(this.db, block.message_receipts)
                            {
                                for receipt in receipts {
                                    if let Some(events_root) = receipt.events_root() {
                                        this.dfs.push_back(Iterate(
                                            block.epoch,
                                            *block.cid(),
                                            IterateType::EventsRoot(events_root),
                                            DfsIter::from(events_root)
                                                .filter_map(ipld_to_cid)
                                                .collect(),
                                        ));
                                    }
                                }
                            }
                        }

                        // The state root is walked above the epoch limit and,
                        // additionally, for blocks at epoch 0.
                        if block.epoch == 0 || block.epoch > stateroot_limit_exclusive {
                            this.dfs.push_back(Iterate(
                                block.epoch,
                                *block.cid(),
                                IterateType::StateRoot(block.state_root),
                                DfsIter::from(block.state_root)
                                    .filter_map(ipld_to_cid)
                                    .collect(),
                            ));
                        }
                    }
                }
            } else {
                // No tasks and no tipsets left: the stream is finished.
                return Poll::Ready(None);
            }
        }
    }
}
408
409fn ipld_to_cid(ipld: Ipld) -> Option<Cid> {
410 if let Ipld::Link(cid) = ipld {
411 Some(cid)
412 } else {
413 None
414 }
415}