use crate::blocks::Tipset;
use crate::cid_collections::{CidHashSet, CidHashSetLike};
use crate::ipld::Ipld;
use crate::shim::clock::ChainEpoch;
use crate::shim::executor::Receipt;
use crate::utils::db::car_stream::CarBlock;
use crate::utils::encoding::extract_cids;
use crate::utils::multihash::prelude::*;
use chrono::{DateTime, Utc};
use cid::Cid;
use futures::Stream;
use fvm_ipld_blockstore::Blockstore;
use parking_lot::Mutex;
use pin_project_lite::pin_project;
use std::borrow::Borrow;
use std::collections::VecDeque;
use std::pin::Pin;
use std::sync::LazyLock;
use std::task::{Context, Poll};
/// Progress of a chain export, shared process-wide through [`CHAIN_EXPORT_STATUS`].
///
/// `Debug` and `Clone` are derived so the status can be logged, or copied out of
/// the mutex, without holding the lock.
#[derive(Clone, Debug, Default)]
pub struct ExportStatus {
    /// Epoch most recently reported by the exporter.
    pub epoch: i64,
    /// First epoch reported after the export started; `0` means "not recorded yet".
    pub initial_epoch: i64,
    /// Whether an export is currently running.
    pub exporting: bool,
    /// Whether the most recent export was cancelled.
    pub cancelled: bool,
    /// When the current (or most recent) export was started.
    pub start_time: Option<DateTime<Utc>>,
}
pub static CHAIN_EXPORT_STATUS: LazyLock<Mutex<ExportStatus>> =
LazyLock::new(|| ExportStatus::default().into());
/// Records `new_value` as the epoch the export has currently reached.
///
/// While `initial_epoch` is still `0` (as left by [`start_export`]), the same value
/// is also captured as the initial epoch, so `0` acts as the "unset" marker.
fn update_epoch(new_value: i64) {
    let mut status = CHAIN_EXPORT_STATUS.lock();
    if status.initial_epoch == 0 {
        status.initial_epoch = new_value;
    }
    status.epoch = new_value;
}
pub fn start_export() {
let mut mutex = CHAIN_EXPORT_STATUS.lock();
mutex.epoch = 0;
mutex.initial_epoch = 0;
mutex.exporting = true;
mutex.cancelled = false;
mutex.start_time = Some(Utc::now());
}
/// Marks the export as no longer running. Epochs, start time and the cancelled
/// flag are left untouched.
pub fn end_export() {
    CHAIN_EXPORT_STATUS.lock().exporting = false;
}
/// Marks the export as cancelled and no longer running.
///
/// NOTE(review): nothing in this module reads `cancelled`; presumably the export
/// driver polls it to stop early — verify against callers.
pub fn cancel_export() {
    let mut status = CHAIN_EXPORT_STATUS.lock();
    status.cancelled = true;
    status.exporting = false;
}
/// Decides whether the block addressed by `cid` belongs in a snapshot.
///
/// CIDs using the identity multihash are rejected, because their content is
/// embedded in the CID itself. Otherwise only raw (`IPLD_RAW`) and DAG-CBOR
/// blocks are kept.
fn should_save_block_to_snapshot(cid: Cid) -> bool {
    let is_identity = cid.hash().code() == u64::from(MultihashCode::Identity);
    !is_identity
        && matches!(
            cid.codec(),
            crate::shim::crypto::IPLD_RAW | fvm_ipld_encoding::DAG_CBOR
        )
}
/// Depth-first iterator over an [`Ipld`] value.
///
/// Lists and maps are expanded in place. Every other node (links, scalars, ...)
/// is yielded. Links are yielded as-is and are never resolved. List elements are
/// visited front to back; map values are visited in the order `into_values`
/// produces them.
pub struct DfsIter {
    /// Pending nodes; the next one to visit sits at the end.
    dfs: Vec<Ipld>,
}

impl DfsIter {
    /// Starts a traversal rooted at `root`.
    pub fn new(root: Ipld) -> Self {
        Self {
            dfs: Vec::from([root]),
        }
    }
}

impl From<Cid> for DfsIter {
    /// Starts a traversal whose single root is a link to `cid`.
    fn from(cid: Cid) -> Self {
        Self::new(Ipld::Link(cid))
    }
}

impl Iterator for DfsIter {
    type Item = Ipld;

    fn next(&mut self) -> Option<Self::Item> {
        loop {
            match self.dfs.pop()? {
                // Children are pushed reversed so the first child is popped first.
                Ipld::List(items) => self.dfs.extend(items.into_iter().rev()),
                Ipld::Map(entries) => self.dfs.extend(entries.into_values().rev()),
                leaf => return Some(leaf),
            }
        }
    }
}
/// Which header field a `Task::Iterate` walk was started from. Each variant
/// carries that root CID.
///
/// It is used to word "missing key" diagnostics and to decide whether a missing
/// block is an error: `MessageReceipts` and `EventsRoot` misses are only traced.
enum IterateType {
    /// The block header's messages root.
    Message(Cid),
    /// The block header's message receipts root (missing blocks are only traced).
    MessageReceipts(Cid),
    /// The block header's state root.
    StateRoot(Cid),
    /// An events root taken from a message receipt (missing blocks are only traced).
    EventsRoot(Cid),
}
/// A unit of pending work in a [`ChainStream`]'s FIFO queue.
enum Task {
    /// Emit a single block. `Some(data)` is emitted as-is; `None` means the data
    /// is loaded from the blockstore.
    Emit(Cid, Option<Vec<u8>>),
    /// Depth-first walk. Fields: epoch of the owning block, CID of the owning
    /// block header, which header field the walk started from, and the stack of
    /// CIDs still to visit (popped from the end).
    Iterate(ChainEpoch, Cid, IterateType, Vec<Cid>),
}
pin_project! {
    /// Stream of [`CarBlock`]s produced by walking a sequence of tipsets.
    ///
    /// Construct it with [`stream_chain`] / [`stream_graph`] and tune it with the
    /// builder methods.
    pub struct ChainStream<DB, T, S = CidHashSet> {
        // Source of tipsets; the next one is pulled only once `dfs` is empty.
        tipset_iter: T,
        // Blockstore that non-inline blocks are read from.
        db: DB,
        // `dfs`: FIFO queue of pending emit/iterate tasks.
        // `seen`: CIDs already visited, so shared blocks are emitted only once.
        dfs: VecDeque<Task>,
        seen: S,
        // Message/receipt/event DAGs are walked only for blocks whose epoch is
        // strictly greater than this. The same holds for state roots, except that
        // genesis (epoch 0) is always walked.
        stateroot_limit_exclusive: ChainEpoch,
        // When set, missing blocks yield `Err` items (except receipt/event blocks,
        // which are only traced); otherwise they are skipped.
        fail_on_dead_links: bool,
        // Also walk each block's message-receipt DAG.
        message_receipts: bool,
        // Also walk event DAGs referenced by message receipts.
        events: bool,
        // Also emit each tipset's key block.
        tipset_keys: bool,
        // Report progress through `CHAIN_EXPORT_STATUS`.
        track_progress: bool,
    }
}
/// Builder-style options for [`ChainStream`].
///
/// Each setter consumes the stream and returns it with one option replaced. All
/// other state is carried over unchanged.
impl<DB, T, S> ChainStream<DB, T, S> {
    /// Whether a block missing from the blockstore produces an `Err` item (`true`)
    /// or is silently skipped (`false`). Missing message-receipt and event blocks
    /// are never reported as errors.
    pub fn fail_on_dead_links(self, fail_on_dead_links: bool) -> Self {
        Self {
            fail_on_dead_links,
            ..self
        }
    }

    /// Whether to publish the epoch being exported to [`CHAIN_EXPORT_STATUS`].
    pub fn track_progress(self, track_progress: bool) -> Self {
        Self {
            track_progress,
            ..self
        }
    }

    /// Whether to also walk the message-receipt DAG of block headers above the
    /// state-root limit.
    pub fn with_message_receipts(self, message_receipts: bool) -> Self {
        Self {
            message_receipts,
            ..self
        }
    }

    /// Whether to also walk the event DAGs referenced by each block's message
    /// receipts. This is skipped for a block whose receipts cannot be loaded.
    pub fn with_events(self, events: bool) -> Self {
        Self { events, ..self }
    }

    /// Whether to emit each tipset's key block ahead of its block headers.
    pub fn with_tipset_keys(self, tipset_keys: bool) -> Self {
        Self {
            tipset_keys,
            ..self
        }
    }

    /// Consumes the stream and returns its set of visited CIDs.
    pub fn into_seen(self) -> S {
        let Self { seen, .. } = self;
        seen
    }
}
/// Creates a [`ChainStream`] over `tipset_iter` that reports missing blocks as errors.
///
/// Message receipts, events, tipset keys and progress tracking start disabled
/// and can be enabled with the builder methods. `stateroot_limit_exclusive`
/// bounds which epochs get their messages and state trees walked. CIDs already
/// present in `seen` are treated as visited.
pub fn stream_chain<DB, T, ITER, S>(
    db: DB,
    tipset_iter: ITER,
    stateroot_limit_exclusive: ChainEpoch,
    seen: S,
) -> ChainStream<DB, ITER, S>
where
    DB: Blockstore,
    T: Borrow<Tipset>,
    ITER: Iterator<Item = T> + Unpin,
    S: CidHashSetLike,
{
    let dfs = VecDeque::new();
    ChainStream {
        db,
        tipset_iter,
        seen,
        dfs,
        stateroot_limit_exclusive,
        fail_on_dead_links: true,
        track_progress: false,
        message_receipts: false,
        events: false,
        tipset_keys: false,
    }
}
/// Same as [`stream_chain`], except that blocks missing from the blockstore are
/// skipped instead of being reported as errors.
pub fn stream_graph<DB, T, ITER, S>(
    db: DB,
    tipset_iter: ITER,
    stateroot_limit_exclusive: ChainEpoch,
    seen: S,
) -> ChainStream<DB, ITER, S>
where
    DB: Blockstore,
    T: Borrow<Tipset>,
    ITER: Iterator<Item = T> + Unpin,
    S: CidHashSetLike,
{
    let strict = stream_chain(db, tipset_iter, stateroot_limit_exclusive, seen);
    strict.fail_on_dead_links(false)
}
/// Yields the blocks reachable from each tipset of `tipset_iter`, one block per poll.
///
/// For every tipset, tasks are queued in this order:
/// 1. the tipset key block (only with `tipset_keys`, and only if it encodes);
/// 2. each block header whose CID is not yet in `seen`, followed by:
///    - the header's parents, for genesis (epoch 0);
///    - walks of the messages DAG, plus the receipts and event DAGs when enabled,
///      for epochs above `stateroot_limit_exclusive`;
///    - a walk of the state tree, for genesis or epochs above the limit.
///
/// Queued tasks are drained front to back before the next tipset is pulled. The
/// stream never returns `Poll::Pending`, and the `Context` is unused.
impl<DB: Blockstore, T: Borrow<Tipset>, ITER: Iterator<Item = T> + Unpin, S: CidHashSetLike> Stream
    for ChainStream<DB, ITER, S>
{
    type Item = anyhow::Result<CarBlock>;

    fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        use Task::*;

        // Copy the configuration flags out before `project()` consumes `self`.
        let export_tipset_keys = self.tipset_keys;
        let fail_on_dead_links = self.fail_on_dead_links;
        let stateroot_limit_exclusive = self.stateroot_limit_exclusive;
        let this = self.project();

        loop {
            // Drain queued work first; a new tipset is pulled only when the queue is empty.
            while let Some(task) = this.dfs.front_mut() {
                match task {
                    Emit(_, _) => {
                        // Emit one block: inline data if present, else look it up in `db`.
                        // A missing block is an error only with `fail_on_dead_links`;
                        // otherwise it is skipped and the loop moves on.
                        if let Some(Emit(cid, data)) = this.dfs.pop_front() {
                            if let Some(data) = data {
                                return Poll::Ready(Some(Ok(CarBlock { cid, data })));
                            } else if let Some(data) = this.db.get(&cid)? {
                                return Poll::Ready(Some(Ok(CarBlock { cid, data })));
                            } else if fail_on_dead_links {
                                return Poll::Ready(Some(Err(anyhow::anyhow!(
                                    "[Emit] missing key: {cid}"
                                ))));
                            };
                        }
                    }
                    Iterate(epoch, block_cid, _type, cid_vec) => {
                        // Runs on every poll while this task is at the front of the queue.
                        if *this.track_progress {
                            update_epoch(*epoch);
                        }
                        // Depth-first walk. One block is yielded per poll; the remaining
                        // work stays in `cid_vec` for the next poll.
                        while let Some(cid) = cid_vec.pop() {
                            if should_save_block_to_snapshot(cid) && this.seen.insert(cid)? {
                                if let Some(data) = this.db.get(&cid)? {
                                    if cid.codec() == fvm_ipld_encoding::DAG_CBOR {
                                        let new_values = extract_cids(&data)?;
                                        // Reversed so the first child link is visited first.
                                        cid_vec.extend(new_values.into_iter().rev());
                                    }
                                    return Poll::Ready(Some(Ok(CarBlock { cid, data })));
                                } else if fail_on_dead_links {
                                    let type_display = match _type {
                                        IterateType::Message(c) => {
                                            format!("message {c}")
                                        }
                                        IterateType::StateRoot(c) => {
                                            format!("state root {c}")
                                        }
                                        // Missing receipt/event data is tolerated even with
                                        // `fail_on_dead_links`: it is traced and the walk continues.
                                        IterateType::MessageReceipts(c) => {
                                            tracing::trace!(
                                                "[Iterate] missing key: {cid} from message receipts {c} in block {block_cid} at epoch {epoch}"
                                            );
                                            continue;
                                        }
                                        IterateType::EventsRoot(c) => {
                                            tracing::trace!(
                                                "[Iterate] missing key: {cid} from events root {c} in block {block_cid} at epoch {epoch}"
                                            );
                                            continue;
                                        }
                                    };
                                    return Poll::Ready(Some(Err(anyhow::anyhow!(
                                        "[Iterate] missing key: {cid} from {type_display} in block {block_cid} at epoch {epoch}"
                                    ))));
                                }
                            }
                        }
                        // Every CID of this walk has been handled; drop the task.
                        this.dfs.pop_front();
                    }
                }
            }

            // Queue is empty: enqueue the work for the next tipset, or end the stream.
            if let Some(tipset) = this.tipset_iter.next() {
                // Best-effort: the key block is skipped if it cannot be encoded.
                // It is also not checked against `seen`.
                if export_tipset_keys
                    && let Ok(CarBlock { cid, data }) = tipset.borrow().key().car_block()
                {
                    this.dfs.push_back(Emit(cid, Some(data)));
                }
                for block in tipset.borrow().block_headers() {
                    let (cid, data) = block.car_block()?;
                    // Headers already seen are skipped together with everything below them.
                    if this.seen.insert(cid)? {
                        if *this.track_progress {
                            update_epoch(block.epoch);
                        }
                        this.dfs.push_back(Emit(cid, Some(data)));
                        // Genesis parents are emitted from `db` directly, without a `seen` check.
                        if block.epoch == 0 {
                            for p in &block.parents {
                                this.dfs.push_back(Emit(p, None));
                            }
                        }
                        // Messages (and optionally receipts / events) only above the limit.
                        if block.epoch > stateroot_limit_exclusive {
                            this.dfs.push_back(Iterate(
                                block.epoch,
                                *block.cid(),
                                IterateType::Message(block.messages),
                                DfsIter::from(block.messages)
                                    .filter_map(ipld_to_cid)
                                    .collect(),
                            ));
                            if *this.message_receipts {
                                this.dfs.push_back(Iterate(
                                    block.epoch,
                                    *block.cid(),
                                    IterateType::MessageReceipts(block.message_receipts),
                                    DfsIter::from(block.message_receipts)
                                        .filter_map(ipld_to_cid)
                                        .collect(),
                                ));
                            }
                            // Best-effort: if receipts cannot be loaded, event roots are skipped.
                            if *this.events
                                && let Ok(receipts) =
                                    Receipt::get_receipts(this.db, block.message_receipts)
                            {
                                for receipt in receipts {
                                    if let Some(events_root) = receipt.events_root() {
                                        this.dfs.push_back(Iterate(
                                            block.epoch,
                                            *block.cid(),
                                            IterateType::EventsRoot(events_root),
                                            DfsIter::from(events_root)
                                                .filter_map(ipld_to_cid)
                                                .collect(),
                                        ));
                                    }
                                }
                            }
                        }
                        // State trees are walked for genesis and for epochs above the limit.
                        if block.epoch == 0 || block.epoch > stateroot_limit_exclusive {
                            this.dfs.push_back(Iterate(
                                block.epoch,
                                *block.cid(),
                                IterateType::StateRoot(block.state_root),
                                DfsIter::from(block.state_root)
                                    .filter_map(ipld_to_cid)
                                    .collect(),
                            ));
                        }
                    }
                }
            } else {
                return Poll::Ready(None);
            }
        }
    }
}
pin_project! {
    /// Stream of [`CarBlock`]s reachable from a set of root CIDs, walked depth-first
    /// through a blockstore.
    ///
    /// Unlike [`ChainStream`], any block missing from the blockstore is reported
    /// as an error.
    pub struct IpldStream<DB, S> {
        // Blockstore the blocks are read from.
        db: DB,
        // Stack of CIDs still to visit (popped from the end).
        cid_vec: Vec<Cid>,
        // CIDs already visited.
        seen: S,
    }
}
impl<DB, S> IpldStream<DB, S> {
    /// Creates a stream that walks the DAGs rooted at `roots`, reading from `db`
    /// and recording visited CIDs in `seen`.
    pub fn new(db: DB, roots: Vec<Cid>, seen: S) -> Self {
        let cid_vec = roots;
        Self { db, seen, cid_vec }
    }
}
impl<DB: Blockstore, S: CidHashSetLike> Stream for IpldStream<DB, S> {
    type Item = anyhow::Result<CarBlock>;

    /// Yields the next unseen block, depth-first, starting from the last root.
    ///
    /// Only non-identity raw / DAG-CBOR CIDs are exported, and links inside
    /// DAG-CBOR blocks are followed. A block missing from the blockstore yields an
    /// `Err` item. The stream never returns `Poll::Pending`.
    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.project();
        loop {
            let Some(cid) = this.cid_vec.pop() else {
                return Poll::Ready(None);
            };
            // `seen` is only consulted (and updated) for CIDs worth saving.
            if !should_save_block_to_snapshot(cid) || !this.seen.insert(cid)? {
                continue;
            }
            let Some(data) = this.db.get(&cid)? else {
                return Poll::Ready(Some(Err(anyhow::anyhow!("missing key: {cid}"))));
            };
            if cid.codec() == fvm_ipld_encoding::DAG_CBOR {
                this.cid_vec.extend(extract_cids(&data)?);
            }
            return Poll::Ready(Some(Ok(CarBlock { cid, data })));
        }
    }
}
/// Extracts the CID from an [`Ipld::Link`]; every other node kind maps to `None`.
fn ipld_to_cid(ipld: Ipld) -> Option<Cid> {
    match ipld {
        Ipld::Link(cid) => Some(cid),
        _ => None,
    }
}