Skip to main content

forest/rpc/methods/
chain.rs

1// Copyright 2019-2026 ChainSafe Systems
2// SPDX-License-Identifier: Apache-2.0, MIT
3
4pub mod types;
5use enumflags2::{BitFlags, make_bitflags};
6use types::*;
7
8#[cfg(test)]
9use crate::blocks::RawBlockHeader;
10use crate::blocks::{Block, CachingBlockHeader, Tipset, TipsetKey};
11use crate::chain::index::ResolveNullTipset;
12use crate::chain::{ChainStore, ExportOptions, FilecoinSnapshotVersion, HeadChange};
13use crate::chain_sync::{get_full_tipset, load_full_tipset};
14use crate::cid_collections::CidHashSet;
15use crate::ipld::DfsIter;
16use crate::ipld::{CHAIN_EXPORT_STATUS, cancel_export, end_export, start_export};
17use crate::lotus_json::{HasLotusJson, LotusJson, lotus_json_with_self};
18#[cfg(test)]
19use crate::lotus_json::{assert_all_snapshots, assert_unchanged_via_json};
20use crate::message::{ChainMessage, SignedMessage};
21use crate::rpc::eth::Block as EthBlock;
22use crate::rpc::eth::{
23    EthLog, TxInfo, eth_logs_with_filter, types::ApiHeaders, types::EthFilterSpec,
24};
25use crate::rpc::f3::F3ExportLatestSnapshot;
26use crate::rpc::types::*;
27use crate::rpc::{ApiPaths, Ctx, EthEventHandler, Permission, RpcMethod, ServerError};
28use crate::shim::clock::ChainEpoch;
29use crate::shim::error::ExitCode;
30use crate::shim::executor::Receipt;
31use crate::shim::message::Message;
32use crate::utils::db::CborStoreExt as _;
33use crate::utils::io::VoidAsyncWriter;
34use crate::utils::misc::env::is_env_truthy;
35use anyhow::{Context as _, Result};
36use cid::Cid;
37use fvm_ipld_blockstore::Blockstore;
38use fvm_ipld_encoding::{CborStore, RawBytes};
39use hex::ToHex;
40use ipld_core::ipld::Ipld;
41use jsonrpsee::types::Params;
42use jsonrpsee::types::error::ErrorObjectOwned;
43use num::BigInt;
44use schemars::JsonSchema;
45use serde::{Deserialize, Serialize};
46use sha2::Sha256;
47use std::fs::File;
48use std::{collections::VecDeque, path::PathBuf, sync::LazyLock};
49use tokio::sync::{
50    Mutex,
51    broadcast::{self, Receiver as Subscriber},
52};
53use tokio::task::JoinHandle;
54use tokio_util::sync::CancellationToken;
55
56const HEAD_CHANNEL_CAPACITY: usize = 10;
57
58/// [`SAFE_HEIGHT_DISTANCE`] is the distance from the latest tipset, i.e. "heaviest", that
59/// is considered to be safe from re-orgs at an increasingly diminishing
60/// probability.
61///
62/// This is used to determine the safe tipset when using the "safe" tag in
63/// [`TipsetSelector`] or via Eth JSON-RPC APIs. Note that "safe" doesn't guarantee
64/// finality, but rather a high probability of not being reverted. For guaranteed
65/// finality, use the "finalized" tag.
66///
67/// This constant is experimental and may change in the future.
68/// Discussion on this current value and a tracking item to document the
69/// probabilistic impact of various values is in
70/// https://github.com/filecoin-project/go-f3/issues/944
71const SAFE_HEIGHT_DISTANCE: ChainEpoch = 200;
72
73static CHAIN_EXPORT_LOCK: LazyLock<Mutex<Option<CancellationToken>>> =
74    LazyLock::new(|| Mutex::new(None));
75
76/// Subscribes to head changes from the chain store and broadcasts new blocks.
77///
78/// # Notes
79///
80/// Spawns an internal `tokio` task that can be aborted anytime via the returned `JoinHandle`,
81/// allowing manual cleanup if needed.
82pub(crate) fn new_heads<DB: Blockstore + Send + Sync + 'static>(
83    data: Ctx<DB>,
84) -> (Subscriber<ApiHeaders>, JoinHandle<()>) {
85    let (sender, receiver) = broadcast::channel(HEAD_CHANNEL_CAPACITY);
86
87    let mut subscriber = data.chain_store().publisher().subscribe();
88
89    let handle = tokio::spawn(async move {
90        while let Ok(v) = subscriber.recv().await {
91            let headers = match v {
92                HeadChange::Apply(ts) => {
93                    // Convert the tipset to an Ethereum block with full transaction info
94                    // Note: In Filecoin's Eth RPC, a tipset maps to a single Ethereum block
95                    match EthBlock::from_filecoin_tipset(data.clone(), ts, TxInfo::Full).await {
96                        Ok(block) => ApiHeaders(block),
97                        Err(e) => {
98                            tracing::error!("Failed to convert tipset to eth block: {}", e);
99                            continue;
100                        }
101                    }
102                }
103            };
104            if let Err(e) = sender.send(headers) {
105                tracing::error!("Failed to send headers: {}", e);
106                break;
107            }
108        }
109    });
110
111    (receiver, handle)
112}
113
114/// Subscribes to head changes from the chain store and broadcasts new `Ethereum` logs.
115///
116/// # Notes
117///
118/// Spawns an internal `tokio` task that can be aborted anytime via the returned `JoinHandle`,
119/// allowing manual cleanup if needed.
120pub(crate) fn logs<DB: Blockstore + Sync + Send + 'static>(
121    ctx: &Ctx<DB>,
122    filter: Option<EthFilterSpec>,
123) -> (Subscriber<Vec<EthLog>>, JoinHandle<()>) {
124    let (sender, receiver) = broadcast::channel(HEAD_CHANNEL_CAPACITY);
125
126    let mut subscriber = ctx.chain_store().publisher().subscribe();
127
128    let ctx = ctx.clone();
129
130    let handle = tokio::spawn(async move {
131        while let Ok(v) = subscriber.recv().await {
132            match v {
133                HeadChange::Apply(ts) => {
134                    match eth_logs_with_filter(&ctx, &ts, filter.clone(), None).await {
135                        Ok(logs) => {
136                            if !logs.is_empty()
137                                && let Err(e) = sender.send(logs)
138                            {
139                                tracing::error!(
140                                    "Failed to send logs for tipset {}: {}",
141                                    ts.key(),
142                                    e
143                                );
144                                break;
145                            }
146                        }
147                        Err(e) => {
148                            tracing::error!("Failed to fetch logs for tipset {}: {}", ts.key(), e);
149                        }
150                    }
151                }
152            }
153        }
154    });
155
156    (receiver, handle)
157}
158
159pub enum ChainGetFinalizedTipset {}
160impl RpcMethod<0> for ChainGetFinalizedTipset {
161    const NAME: &'static str = "Filecoin.ChainGetFinalizedTipSet";
162    const PARAM_NAMES: [&'static str; 0] = [];
163    const API_PATHS: BitFlags<ApiPaths> = make_bitflags!(ApiPaths::V1);
164    const PERMISSION: Permission = Permission::Read;
165    const DESCRIPTION: Option<&'static str> = Some(
166        "Returns the latest F3 finalized tipset, or falls back to EC finality if F3 is not operational on the node or if the F3 finalized tipset is further back than EC finalized tipset.",
167    );
168
169    type Params = ();
170    type Ok = Tipset;
171
172    async fn handle(
173        ctx: Ctx<impl Blockstore + Send + Sync + 'static>,
174        (): Self::Params,
175    ) -> Result<Self::Ok, ServerError> {
176        let head = ctx.chain_store().heaviest_tipset();
177        let ec_finality_epoch = (head.epoch() - ctx.chain_config().policy.chain_finality).max(0);
178
179        // Either get the f3 finalized tipset or the ec finalized tipset
180        match get_f3_finality_tipset(&ctx, ec_finality_epoch).await {
181            Ok(f3_tipset) => {
182                tracing::debug!("Using F3 finalized tipset at epoch {}", f3_tipset.epoch());
183                Ok(f3_tipset)
184            }
185            Err(_) => {
186                // fallback to ec finality
187                tracing::warn!("F3 finalization unavailable, falling back to EC finality");
188                let ec_tipset = ctx.chain_index().tipset_by_height(
189                    ec_finality_epoch,
190                    head,
191                    ResolveNullTipset::TakeOlder,
192                )?;
193                Ok(ec_tipset)
194            }
195        }
196    }
197}
198
199// get f3 finalized tipset based on ec finality epoch
200async fn get_f3_finality_tipset<DB: Blockstore + Sync + Send + 'static>(
201    ctx: &Ctx<DB>,
202    ec_finality_epoch: ChainEpoch,
203) -> Result<Tipset> {
204    let f3_finalized_cert = crate::rpc::f3::F3GetLatestCertificate::handle(ctx.clone(), ())
205        .await
206        .map_err(|e| anyhow::anyhow!("Failed to get F3 certificate: {}", e))?;
207
208    let f3_finalized_head = f3_finalized_cert.chain_head();
209    if f3_finalized_head.epoch < ec_finality_epoch {
210        return Err(anyhow::anyhow!(
211            "F3 finalized tipset epoch {} is further back than EC finalized tipset epoch {}",
212            f3_finalized_head.epoch,
213            ec_finality_epoch
214        ));
215    }
216
217    ctx.chain_index()
218        .load_required_tipset(&f3_finalized_head.key)
219        .map_err(|e| {
220            anyhow::anyhow!(
221                "Failed to load F3 finalized tipset at epoch {}: {}",
222                f3_finalized_head.epoch,
223                e
224            )
225        })
226}
227
228pub enum ChainGetMessage {}
229impl RpcMethod<1> for ChainGetMessage {
230    const NAME: &'static str = "Filecoin.ChainGetMessage";
231    const PARAM_NAMES: [&'static str; 1] = ["messageCid"];
232    const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
233    const PERMISSION: Permission = Permission::Read;
234    const DESCRIPTION: Option<&'static str> = Some("Returns the message with the specified CID.");
235
236    type Params = (Cid,);
237    type Ok = FlattenedApiMessage;
238
239    async fn handle(
240        ctx: Ctx<impl Blockstore>,
241        (message_cid,): Self::Params,
242    ) -> Result<Self::Ok, ServerError> {
243        let chain_message: ChainMessage = ctx
244            .store()
245            .get_cbor(&message_cid)?
246            .with_context(|| format!("can't find message with cid {message_cid}"))?;
247        let message = match chain_message {
248            ChainMessage::Signed(m) => m.into_message(),
249            ChainMessage::Unsigned(m) => m,
250        };
251
252        let cid = message.cid();
253        Ok(FlattenedApiMessage { message, cid })
254    }
255}
256
257/// Returns the events stored under the given event AMT root CID.
258/// Errors if the root CID cannot be found in the blockstore.
259pub enum ChainGetEvents {}
260impl RpcMethod<1> for ChainGetEvents {
261    const NAME: &'static str = "Filecoin.ChainGetEvents";
262    const PARAM_NAMES: [&'static str; 1] = ["rootCid"];
263    const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
264    const PERMISSION: Permission = Permission::Read;
265    const DESCRIPTION: Option<&'static str> =
266        Some("Returns the events under the given event AMT root CID.");
267
268    type Params = (Cid,);
269    type Ok = Vec<Event>;
270    async fn handle(
271        ctx: Ctx<impl Blockstore + Send + Sync + 'static>,
272        (root_cid,): Self::Params,
273    ) -> Result<Self::Ok, ServerError> {
274        let events = EthEventHandler::get_events_by_event_root(&ctx, &root_cid)?;
275        Ok(events)
276    }
277}
278
279pub enum ChainGetParentMessages {}
280impl RpcMethod<1> for ChainGetParentMessages {
281    const NAME: &'static str = "Filecoin.ChainGetParentMessages";
282    const PARAM_NAMES: [&'static str; 1] = ["blockCid"];
283    const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
284    const PERMISSION: Permission = Permission::Read;
285    const DESCRIPTION: Option<&'static str> =
286        Some("Returns the messages included in the blocks of the parent tipset.");
287
288    type Params = (Cid,);
289    type Ok = Vec<ApiMessage>;
290
291    async fn handle(
292        ctx: Ctx<impl Blockstore + Send + Sync + 'static>,
293        (block_cid,): Self::Params,
294    ) -> Result<Self::Ok, ServerError> {
295        let store = ctx.store();
296        let block_header: CachingBlockHeader = store
297            .get_cbor(&block_cid)?
298            .with_context(|| format!("can't find block header with cid {block_cid}"))?;
299        if block_header.epoch == 0 {
300            Ok(vec![])
301        } else {
302            let parent_tipset = Tipset::load_required(store, &block_header.parents)?;
303            load_api_messages_from_tipset(&ctx, parent_tipset.key()).await
304        }
305    }
306}
307
308pub enum ChainGetParentReceipts {}
309impl RpcMethod<1> for ChainGetParentReceipts {
310    const NAME: &'static str = "Filecoin.ChainGetParentReceipts";
311    const PARAM_NAMES: [&'static str; 1] = ["blockCid"];
312    const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
313    const PERMISSION: Permission = Permission::Read;
314    const DESCRIPTION: Option<&'static str> =
315        Some("Returns the message receipts included in the blocks of the parent tipset.");
316
317    type Params = (Cid,);
318    type Ok = Vec<ApiReceipt>;
319
320    async fn handle(
321        ctx: Ctx<impl Blockstore>,
322        (block_cid,): Self::Params,
323    ) -> Result<Self::Ok, ServerError> {
324        let store = ctx.store();
325        let block_header: CachingBlockHeader = store
326            .get_cbor(&block_cid)?
327            .with_context(|| format!("can't find block header with cid {block_cid}"))?;
328        if block_header.epoch == 0 {
329            return Ok(vec![]);
330        }
331        let receipts = Receipt::get_receipts(store, block_header.message_receipts)
332            .map_err(|_| {
333                ErrorObjectOwned::owned::<()>(
334                    1,
335                    format!(
336                        "failed to root: ipld: could not find {}",
337                        block_header.message_receipts
338                    ),
339                    None,
340                )
341            })?
342            .iter()
343            .map(|r| ApiReceipt {
344                exit_code: r.exit_code().into(),
345                return_data: r.return_data(),
346                gas_used: r.gas_used(),
347                events_root: r.events_root(),
348            })
349            .collect();
350
351        Ok(receipts)
352    }
353}
354
355pub enum ChainGetMessagesInTipset {}
356impl RpcMethod<1> for ChainGetMessagesInTipset {
357    const NAME: &'static str = "Filecoin.ChainGetMessagesInTipset";
358    const PARAM_NAMES: [&'static str; 1] = ["tipsetKey"];
359    const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
360    const PERMISSION: Permission = Permission::Read;
361
362    type Params = (ApiTipsetKey,);
363    type Ok = Vec<ApiMessage>;
364
365    async fn handle(
366        ctx: Ctx<impl Blockstore + Send + Sync + 'static>,
367        (ApiTipsetKey(tipset_key),): Self::Params,
368    ) -> Result<Self::Ok, ServerError> {
369        let tipset = ctx
370            .chain_store()
371            .load_required_tipset_or_heaviest(&tipset_key)?;
372        load_api_messages_from_tipset(&ctx, tipset.key()).await
373    }
374}
375
376pub enum ChainPruneSnapshot {}
377impl RpcMethod<1> for ChainPruneSnapshot {
378    const NAME: &'static str = "Forest.SnapshotGC";
379    const PARAM_NAMES: [&'static str; 1] = ["blocking"];
380    const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
381    const PERMISSION: Permission = Permission::Admin;
382
383    type Params = (bool,);
384    type Ok = ();
385
386    async fn handle(
387        _ctx: Ctx<impl Blockstore + Send + Sync + 'static>,
388        (blocking,): Self::Params,
389    ) -> Result<Self::Ok, ServerError> {
390        if let Some(gc) = crate::daemon::GLOBAL_SNAPSHOT_GC.get() {
391            let progress_rx = gc.trigger()?;
392            while blocking && progress_rx.recv_async().await.is_ok() {}
393            Ok(())
394        } else {
395            Err(anyhow::anyhow!("snapshot gc is not enabled").into())
396        }
397    }
398}
399
400pub enum ForestChainExport {}
401impl RpcMethod<1> for ForestChainExport {
402    const NAME: &'static str = "Forest.ChainExport";
403    const PARAM_NAMES: [&'static str; 1] = ["params"];
404    const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
405    const PERMISSION: Permission = Permission::Read;
406
407    type Params = (ForestChainExportParams,);
408    type Ok = ApiExportResult;
409
410    async fn handle(
411        ctx: Ctx<impl Blockstore + Send + Sync + 'static>,
412        (params,): Self::Params,
413    ) -> Result<Self::Ok, ServerError> {
414        let ForestChainExportParams {
415            version,
416            epoch,
417            recent_roots,
418            output_path,
419            tipset_keys: ApiTipsetKey(tsk),
420            skip_checksum,
421            dry_run,
422        } = params;
423
424        let token = CancellationToken::new();
425        {
426            let mut guard = CHAIN_EXPORT_LOCK.lock().await;
427            if guard.is_some() {
428                return Err(
429                    anyhow::anyhow!("A chain export is still in progress. Cancel it with the export-cancel subcommand if needed.").into(),
430                );
431            }
432            *guard = Some(token.clone());
433        }
434        start_export();
435
436        let head = ctx.chain_store().load_required_tipset_or_heaviest(&tsk)?;
437        let start_ts =
438            ctx.chain_index()
439                .tipset_by_height(epoch, head, ResolveNullTipset::TakeOlder)?;
440
441        let options = Some(ExportOptions {
442            skip_checksum,
443            seen: Default::default(),
444        });
445        let writer = if dry_run {
446            tokio_util::either::Either::Left(VoidAsyncWriter)
447        } else {
448            tokio_util::either::Either::Right(tokio::fs::File::create(&output_path).await?)
449        };
450        let result = match version {
451            FilecoinSnapshotVersion::V1 => {
452                let db = ctx.store_owned();
453
454                let chain_export =
455                    crate::chain::export::<Sha256>(&db, &start_ts, recent_roots, writer, options);
456
457                tokio::select! {
458                    result = chain_export => {
459                        result.map(|checksum_opt| ApiExportResult::Done(checksum_opt.map(|hash| hash.encode_hex())))
460                    },
461                    _ = token.cancelled() => {
462                        cancel_export();
463                        tracing::warn!("Snapshot export was cancelled");
464                        Ok(ApiExportResult::Cancelled)
465                    },
466                }
467            }
468            FilecoinSnapshotVersion::V2 => {
469                let db = ctx.store_owned();
470
471                let f3_snap_tmp_path = {
472                    let mut f3_snap_dir = output_path.clone();
473                    let mut builder = tempfile::Builder::new();
474                    let with_suffix = builder.suffix(".f3snap.bin");
475                    if f3_snap_dir.pop() {
476                        with_suffix.tempfile_in(&f3_snap_dir)
477                    } else {
478                        with_suffix.tempfile_in(".")
479                    }?
480                    .into_temp_path()
481                };
482                let f3_snap = {
483                    match F3ExportLatestSnapshot::run(f3_snap_tmp_path.display().to_string()).await
484                    {
485                        Ok(cid) => Some((cid, File::open(&f3_snap_tmp_path)?)),
486                        Err(e) => {
487                            tracing::error!("Failed to export F3 snapshot: {e}");
488                            None
489                        }
490                    }
491                };
492
493                let chain_export = crate::chain::export_v2::<Sha256, _>(
494                    &db,
495                    f3_snap,
496                    &start_ts,
497                    recent_roots,
498                    writer,
499                    options,
500                );
501
502                tokio::select! {
503                    result = chain_export => {
504                        result.map(|checksum_opt| ApiExportResult::Done(checksum_opt.map(|hash| hash.encode_hex())))
505                    },
506                    _ = token.cancelled() => {
507                        cancel_export();
508                        tracing::warn!("Snapshot export was cancelled");
509                        Ok(ApiExportResult::Cancelled)
510                    },
511                }
512            }
513        };
514        end_export();
515        // Clean up token
516        let mut guard = CHAIN_EXPORT_LOCK.lock().await;
517        *guard = None;
518        match result {
519            Ok(export_result) => Ok(export_result),
520            Err(e) => Err(anyhow::anyhow!(e).into()),
521        }
522    }
523}
524
525pub enum ForestChainExportStatus {}
526impl RpcMethod<0> for ForestChainExportStatus {
527    const NAME: &'static str = "Forest.ChainExportStatus";
528    const PARAM_NAMES: [&'static str; 0] = [];
529    const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
530    const PERMISSION: Permission = Permission::Read;
531
532    type Params = ();
533    type Ok = ApiExportStatus;
534
535    async fn handle(_ctx: Ctx<impl Blockstore>, (): Self::Params) -> Result<Self::Ok, ServerError> {
536        let mutex = CHAIN_EXPORT_STATUS.lock();
537
538        let progress = if mutex.initial_epoch == 0 {
539            0.0
540        } else {
541            let p = 1.0 - ((mutex.epoch as f64) / (mutex.initial_epoch as f64));
542            if p.is_finite() {
543                p.clamp(0.0, 1.0)
544            } else {
545                0.0
546            }
547        };
548        // only two decimal places
549        let progress = (progress * 100.0).round() / 100.0;
550
551        let status = ApiExportStatus {
552            progress,
553            exporting: mutex.exporting,
554            cancelled: mutex.cancelled,
555            start_time: mutex.start_time,
556        };
557
558        Ok(status)
559    }
560}
561
562pub enum ForestChainExportCancel {}
563impl RpcMethod<0> for ForestChainExportCancel {
564    const NAME: &'static str = "Forest.ChainExportCancel";
565    const PARAM_NAMES: [&'static str; 0] = [];
566    const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
567    const PERMISSION: Permission = Permission::Read;
568
569    type Params = ();
570    type Ok = bool;
571
572    async fn handle(_ctx: Ctx<impl Blockstore>, (): Self::Params) -> Result<Self::Ok, ServerError> {
573        if let Some(token) = CHAIN_EXPORT_LOCK.lock().await.as_ref() {
574            token.cancel();
575            return Ok(true);
576        }
577
578        Ok(false)
579    }
580}
581
582pub enum ForestChainExportDiff {}
583impl RpcMethod<1> for ForestChainExportDiff {
584    const NAME: &'static str = "Forest.ChainExportDiff";
585    const PARAM_NAMES: [&'static str; 1] = ["params"];
586    const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
587    const PERMISSION: Permission = Permission::Read;
588
589    type Params = (ForestChainExportDiffParams,);
590    type Ok = ();
591
592    async fn handle(
593        ctx: Ctx<impl Blockstore + Send + Sync + 'static>,
594        (params,): Self::Params,
595    ) -> Result<Self::Ok, ServerError> {
596        let ForestChainExportDiffParams {
597            from,
598            to,
599            depth,
600            output_path,
601        } = params;
602
603        let _locked = CHAIN_EXPORT_LOCK.try_lock();
604        if _locked.is_err() {
605            return Err(
606                anyhow::anyhow!("Another chain export diff job is still in progress").into(),
607            );
608        }
609
610        let chain_finality = ctx.chain_config().policy.chain_finality;
611        if depth < chain_finality {
612            return Err(
613                anyhow::anyhow!(format!("depth must be greater than {chain_finality}")).into(),
614            );
615        }
616
617        let head = ctx.chain_store().heaviest_tipset();
618        let start_ts =
619            ctx.chain_index()
620                .tipset_by_height(from, head, ResolveNullTipset::TakeOlder)?;
621
622        crate::tool::subcommands::archive_cmd::do_export(
623            &ctx.store_owned(),
624            start_ts,
625            output_path,
626            None,
627            depth,
628            Some(to),
629            Some(chain_finality),
630            true,
631        )
632        .await?;
633
634        Ok(())
635    }
636}
637
638pub enum ChainExport {}
639impl RpcMethod<1> for ChainExport {
640    const NAME: &'static str = "Filecoin.ChainExport";
641    const PARAM_NAMES: [&'static str; 1] = ["params"];
642    const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
643    const PERMISSION: Permission = Permission::Read;
644
645    type Params = (ChainExportParams,);
646    type Ok = ApiExportResult;
647
648    async fn handle(
649        ctx: Ctx<impl Blockstore + Send + Sync + 'static>,
650        (ChainExportParams {
651            epoch,
652            recent_roots,
653            output_path,
654            tipset_keys,
655            skip_checksum,
656            dry_run,
657        },): Self::Params,
658    ) -> Result<Self::Ok, ServerError> {
659        ForestChainExport::handle(
660            ctx,
661            (ForestChainExportParams {
662                version: FilecoinSnapshotVersion::V1,
663                epoch,
664                recent_roots,
665                output_path,
666                tipset_keys,
667                skip_checksum,
668                dry_run,
669            },),
670        )
671        .await
672    }
673}
674
675pub enum ChainReadObj {}
676impl RpcMethod<1> for ChainReadObj {
677    const NAME: &'static str = "Filecoin.ChainReadObj";
678    const PARAM_NAMES: [&'static str; 1] = ["cid"];
679    const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
680    const PERMISSION: Permission = Permission::Read;
681    const DESCRIPTION: Option<&'static str> = Some(
682        "Reads IPLD nodes referenced by the specified CID from the chain blockstore and returns raw bytes.",
683    );
684
685    type Params = (Cid,);
686    type Ok = Vec<u8>;
687
688    async fn handle(
689        ctx: Ctx<impl Blockstore>,
690        (cid,): Self::Params,
691    ) -> Result<Self::Ok, ServerError> {
692        let bytes = ctx
693            .store()
694            .get(&cid)?
695            .with_context(|| format!("can't find object with cid={cid}"))?;
696        Ok(bytes)
697    }
698}
699
700pub enum ChainHasObj {}
701impl RpcMethod<1> for ChainHasObj {
702    const NAME: &'static str = "Filecoin.ChainHasObj";
703    const PARAM_NAMES: [&'static str; 1] = ["cid"];
704    const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
705    const PERMISSION: Permission = Permission::Read;
706    const DESCRIPTION: Option<&'static str> =
707        Some("Checks if a given CID exists in the chain blockstore.");
708
709    type Params = (Cid,);
710    type Ok = bool;
711
712    async fn handle(
713        ctx: Ctx<impl Blockstore>,
714        (cid,): Self::Params,
715    ) -> Result<Self::Ok, ServerError> {
716        Ok(ctx.store().get(&cid)?.is_some())
717    }
718}
719
720/// Returns statistics about the graph referenced by 'obj'.
721/// If 'base' is also specified, then the returned stat will be a diff between the two objects.
722pub enum ChainStatObj {}
723impl RpcMethod<2> for ChainStatObj {
724    const NAME: &'static str = "Filecoin.ChainStatObj";
725    const PARAM_NAMES: [&'static str; 2] = ["obj_cid", "base_cid"];
726    const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
727    const PERMISSION: Permission = Permission::Read;
728
729    type Params = (Cid, Option<Cid>);
730    type Ok = ObjStat;
731
732    async fn handle(
733        ctx: Ctx<impl Blockstore>,
734        (obj_cid, base_cid): Self::Params,
735    ) -> Result<Self::Ok, ServerError> {
736        let mut stats = ObjStat::default();
737        let mut seen = CidHashSet::default();
738        let mut walk = |cid, collect| {
739            let mut queue = VecDeque::new();
740            queue.push_back(cid);
741            while let Some(link_cid) = queue.pop_front() {
742                if !seen.insert(link_cid) {
743                    continue;
744                }
745                let data = ctx.store().get(&link_cid)?;
746                if let Some(data) = data {
747                    if collect {
748                        stats.links += 1;
749                        stats.size += data.len();
750                    }
751                    if matches!(link_cid.codec(), fvm_ipld_encoding::DAG_CBOR)
752                        && let Ok(ipld) =
753                            crate::utils::encoding::from_slice_with_fallback::<Ipld>(&data)
754                    {
755                        for ipld in DfsIter::new(ipld) {
756                            if let Ipld::Link(cid) = ipld {
757                                queue.push_back(cid);
758                            }
759                        }
760                    }
761                }
762            }
763            anyhow::Ok(())
764        };
765        if let Some(base_cid) = base_cid {
766            walk(base_cid, false)?;
767        }
768        walk(obj_cid, true)?;
769        Ok(stats)
770    }
771}
772
773pub enum ChainGetBlockMessages {}
774impl RpcMethod<1> for ChainGetBlockMessages {
775    const NAME: &'static str = "Filecoin.ChainGetBlockMessages";
776    const PARAM_NAMES: [&'static str; 1] = ["blockCid"];
777    const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
778    const PERMISSION: Permission = Permission::Read;
779    const DESCRIPTION: Option<&'static str> =
780        Some("Returns all messages from the specified block.");
781
782    type Params = (Cid,);
783    type Ok = BlockMessages;
784
785    async fn handle(
786        ctx: Ctx<impl Blockstore>,
787        (block_cid,): Self::Params,
788    ) -> Result<Self::Ok, ServerError> {
789        let blk: CachingBlockHeader = ctx.store().get_cbor_required(&block_cid)?;
790        let (unsigned_cids, signed_cids) = crate::chain::read_msg_cids(ctx.store(), &blk)?;
791        let (bls_msg, secp_msg) =
792            crate::chain::block_messages_from_cids(ctx.store(), &unsigned_cids, &signed_cids)?;
793        let cids = unsigned_cids.into_iter().chain(signed_cids).collect();
794
795        let ret = BlockMessages {
796            bls_msg,
797            secp_msg,
798            cids,
799        };
800        Ok(ret)
801    }
802}
803
804pub enum ChainGetPath {}
805impl RpcMethod<2> for ChainGetPath {
806    const NAME: &'static str = "Filecoin.ChainGetPath";
807    const PARAM_NAMES: [&'static str; 2] = ["from", "to"];
808    const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
809    const PERMISSION: Permission = Permission::Read;
810    const DESCRIPTION: Option<&'static str> =
811        Some("Returns the path between the two specified tipsets.");
812
813    type Params = (TipsetKey, TipsetKey);
814    type Ok = Vec<PathChange>;
815
816    async fn handle(
817        ctx: Ctx<impl Blockstore>,
818        (from, to): Self::Params,
819    ) -> Result<Self::Ok, ServerError> {
820        impl_chain_get_path(ctx.chain_store(), &from, &to).map_err(Into::into)
821    }
822}
823
824/// Find the path between two tipsets, as a series of [`PathChange`]s.
825///
826/// ```text
827/// 0 - A - B - C - D
828///     ^~~~~~~~> apply B, C
829///
830/// 0 - A - B - C - D
831///     <~~~~~~~^ revert C, B
832///
833///     <~~~~~~~~ revert C, B
834/// 0 - A - B  - C
835///     |
836///      -- B' - C'
837///      ~~~~~~~~> then apply B', C'
838/// ```
839///
840/// Exposes errors from the [`Blockstore`], and returns an error if there is no common ancestor.
841fn impl_chain_get_path(
842    chain_store: &ChainStore<impl Blockstore>,
843    from: &TipsetKey,
844    to: &TipsetKey,
845) -> anyhow::Result<Vec<PathChange>> {
846    let mut to_revert = chain_store
847        .load_required_tipset_or_heaviest(from)
848        .context("couldn't load `from`")?;
849    let mut to_apply = chain_store
850        .load_required_tipset_or_heaviest(to)
851        .context("couldn't load `to`")?;
852
853    let mut all_reverts = vec![];
854    let mut all_applies = vec![];
855
856    // This loop is guaranteed to terminate if the blockstore contain no cycles.
857    // This is currently computationally infeasible.
858    while to_revert != to_apply {
859        if to_revert.epoch() > to_apply.epoch() {
860            let next = chain_store
861                .load_required_tipset_or_heaviest(to_revert.parents())
862                .context("couldn't load ancestor of `from`")?;
863            all_reverts.push(to_revert);
864            to_revert = next;
865        } else {
866            let next = chain_store
867                .load_required_tipset_or_heaviest(to_apply.parents())
868                .context("couldn't load ancestor of `to`")?;
869            all_applies.push(to_apply);
870            to_apply = next;
871        }
872    }
873    Ok(all_reverts
874        .into_iter()
875        .map(PathChange::Revert)
876        .chain(all_applies.into_iter().rev().map(PathChange::Apply))
877        .collect())
878}
879
880/// Get tipset at epoch. Pick younger tipset if epoch points to a
881/// null-tipset. Only tipsets below the given `head` are searched. If `head`
882/// is null, the node will use the heaviest tipset.
883pub enum ChainGetTipSetByHeight {}
884impl RpcMethod<2> for ChainGetTipSetByHeight {
885    const NAME: &'static str = "Filecoin.ChainGetTipSetByHeight";
886    const PARAM_NAMES: [&'static str; 2] = ["height", "tipsetKey"];
887    const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
888    const PERMISSION: Permission = Permission::Read;
889    const DESCRIPTION: Option<&'static str> = Some("Returns the tipset at the specified height.");
890
891    type Params = (ChainEpoch, ApiTipsetKey);
892    type Ok = Tipset;
893
894    async fn handle(
895        ctx: Ctx<impl Blockstore>,
896        (height, ApiTipsetKey(tipset_key)): Self::Params,
897    ) -> Result<Self::Ok, ServerError> {
898        let ts = ctx
899            .chain_store()
900            .load_required_tipset_or_heaviest(&tipset_key)?;
901        let tss = ctx
902            .chain_index()
903            .tipset_by_height(height, ts, ResolveNullTipset::TakeOlder)?;
904        Ok(tss)
905    }
906}
907
908pub enum ChainGetTipSetAfterHeight {}
909impl RpcMethod<2> for ChainGetTipSetAfterHeight {
910    const NAME: &'static str = "Filecoin.ChainGetTipSetAfterHeight";
911    const PARAM_NAMES: [&'static str; 2] = ["height", "tipsetKey"];
912    const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
913    const PERMISSION: Permission = Permission::Read;
914    const DESCRIPTION: Option<&'static str> = Some(
915        "Looks back and returns the tipset at the specified epoch.
916    If there are no blocks at the given epoch,
917    returns the first non-nil tipset at a later epoch.",
918    );
919
920    type Params = (ChainEpoch, ApiTipsetKey);
921    type Ok = Tipset;
922
923    async fn handle(
924        ctx: Ctx<impl Blockstore>,
925        (height, ApiTipsetKey(tipset_key)): Self::Params,
926    ) -> Result<Self::Ok, ServerError> {
927        let ts = ctx
928            .chain_store()
929            .load_required_tipset_or_heaviest(&tipset_key)?;
930        let tss = ctx
931            .chain_index()
932            .tipset_by_height(height, ts, ResolveNullTipset::TakeNewer)?;
933        Ok(tss)
934    }
935}
936
937pub enum ChainGetGenesis {}
938impl RpcMethod<0> for ChainGetGenesis {
939    const NAME: &'static str = "Filecoin.ChainGetGenesis";
940    const PARAM_NAMES: [&'static str; 0] = [];
941    const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
942    const PERMISSION: Permission = Permission::Read;
943
944    type Params = ();
945    type Ok = Option<Tipset>;
946
947    async fn handle(ctx: Ctx<impl Blockstore>, (): Self::Params) -> Result<Self::Ok, ServerError> {
948        let genesis = ctx.chain_store().genesis_block_header();
949        Ok(Some(Tipset::from(genesis)))
950    }
951}
952
953pub enum ChainHead {}
954impl RpcMethod<0> for ChainHead {
955    const NAME: &'static str = "Filecoin.ChainHead";
956    const PARAM_NAMES: [&'static str; 0] = [];
957    const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
958    const PERMISSION: Permission = Permission::Read;
959    const DESCRIPTION: Option<&'static str> = Some("Returns the chain head (heaviest tipset).");
960
961    type Params = ();
962    type Ok = Tipset;
963
964    async fn handle(ctx: Ctx<impl Blockstore>, (): Self::Params) -> Result<Self::Ok, ServerError> {
965        let heaviest = ctx.chain_store().heaviest_tipset();
966        Ok(heaviest)
967    }
968}
969
970pub enum ChainGetBlock {}
971impl RpcMethod<1> for ChainGetBlock {
972    const NAME: &'static str = "Filecoin.ChainGetBlock";
973    const PARAM_NAMES: [&'static str; 1] = ["blockCid"];
974    const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
975    const PERMISSION: Permission = Permission::Read;
976    const DESCRIPTION: Option<&'static str> = Some("Returns the block with the specified CID.");
977
978    type Params = (Cid,);
979    type Ok = CachingBlockHeader;
980
981    async fn handle(
982        ctx: Ctx<impl Blockstore>,
983        (block_cid,): Self::Params,
984    ) -> Result<Self::Ok, ServerError> {
985        let blk: CachingBlockHeader = ctx.store().get_cbor_required(&block_cid)?;
986        Ok(blk)
987    }
988}
989
990pub enum ChainGetTipSet {}
991impl RpcMethod<1> for ChainGetTipSet {
992    const NAME: &'static str = "Filecoin.ChainGetTipSet";
993    const PARAM_NAMES: [&'static str; 1] = ["tipsetKey"];
994    const API_PATHS: BitFlags<ApiPaths> = make_bitflags!(ApiPaths::{ V0 | V1 });
995    const PERMISSION: Permission = Permission::Read;
996    const DESCRIPTION: Option<&'static str> = Some("Returns the tipset with the specified CID.");
997
998    type Params = (ApiTipsetKey,);
999    type Ok = Tipset;
1000
1001    async fn handle(
1002        ctx: Ctx<impl Blockstore>,
1003        (ApiTipsetKey(tsk),): Self::Params,
1004    ) -> Result<Self::Ok, ServerError> {
1005        if let Some(tsk) = &tsk {
1006            let ts = ctx.chain_index().load_required_tipset(tsk)?;
1007            Ok(ts)
1008        } else {
1009            // It contains Lotus error message `NewTipSet called with zero length array of blocks` for parity tests
1010            Err(anyhow::anyhow!(
1011                "TipsetKey cannot be empty (NewTipSet called with zero length array of blocks)"
1012            )
1013            .into())
1014        }
1015    }
1016}
1017
1018pub enum ChainGetTipSetV2 {}
1019
1020impl ChainGetTipSetV2 {
1021    pub async fn get_tipset_by_anchor(
1022        ctx: &Ctx<impl Blockstore + Send + Sync + 'static>,
1023        anchor: &Option<TipsetAnchor>,
1024    ) -> anyhow::Result<Option<Tipset>> {
1025        if let Some(anchor) = anchor {
1026            match (&anchor.key.0, &anchor.tag) {
1027                // Anchor is zero-valued. Fall back to heaviest tipset.
1028                (None, None) => Ok(Some(ctx.state_manager.heaviest_tipset())),
1029                // Get tipset at the specified key.
1030                (Some(tsk), None) => Ok(Some(ctx.chain_index().load_required_tipset(tsk)?)),
1031                (None, Some(tag)) => Self::get_tipset_by_tag(ctx, *tag).await,
1032                _ => {
1033                    anyhow::bail!("invalid anchor")
1034                }
1035            }
1036        } else {
1037            // No anchor specified. Fall back to finalized tipset.
1038            Self::get_tipset_by_tag(ctx, TipsetTag::Finalized).await
1039        }
1040    }
1041
1042    pub async fn get_tipset_by_tag(
1043        ctx: &Ctx<impl Blockstore + Send + Sync + 'static>,
1044        tag: TipsetTag,
1045    ) -> anyhow::Result<Option<Tipset>> {
1046        match tag {
1047            TipsetTag::Latest => Ok(Some(ctx.state_manager.heaviest_tipset())),
1048            TipsetTag::Finalized => Self::get_latest_finalized_tipset(ctx).await,
1049            TipsetTag::Safe => Some(Self::get_latest_safe_tipset(ctx).await).transpose(),
1050        }
1051    }
1052
1053    pub async fn get_latest_safe_tipset(
1054        ctx: &Ctx<impl Blockstore + Send + Sync + 'static>,
1055    ) -> anyhow::Result<Tipset> {
1056        let finalized = Self::get_latest_finalized_tipset(ctx).await?;
1057        let head = ctx.chain_store().heaviest_tipset();
1058        let safe_height = (head.epoch() - SAFE_HEIGHT_DISTANCE).max(0);
1059        if let Some(finalized) = finalized
1060            && finalized.epoch() >= safe_height
1061        {
1062            Ok(finalized)
1063        } else {
1064            Ok(ctx.chain_index().tipset_by_height(
1065                safe_height,
1066                head,
1067                ResolveNullTipset::TakeOlder,
1068            )?)
1069        }
1070    }
1071
1072    pub async fn get_latest_finalized_tipset(
1073        ctx: &Ctx<impl Blockstore + Send + Sync + 'static>,
1074    ) -> anyhow::Result<Option<Tipset>> {
1075        let Ok(f3_finalized_cert) =
1076            crate::rpc::f3::F3GetLatestCertificate::handle(ctx.clone(), ()).await
1077        else {
1078            return Self::get_ec_finalized_tipset(ctx);
1079        };
1080
1081        let f3_finalized_head = f3_finalized_cert.chain_head();
1082        let head = ctx.chain_store().heaviest_tipset();
1083        // Latest F3 finalized tipset is older than EC finality, falling back to EC finality
1084        if head.epoch() > f3_finalized_head.epoch + ctx.chain_config().policy.chain_finality {
1085            return Self::get_ec_finalized_tipset(ctx);
1086        }
1087
1088        let ts = ctx
1089            .chain_index()
1090            .load_required_tipset(&f3_finalized_head.key)
1091            .map_err(|e| {
1092                anyhow::anyhow!(
1093                    "Failed to load F3 finalized tipset at epoch {} with key {}: {e}",
1094                    f3_finalized_head.epoch,
1095                    f3_finalized_head.key,
1096                )
1097            })?;
1098        Ok(Some(ts))
1099    }
1100
1101    pub fn get_ec_finalized_tipset(ctx: &Ctx<impl Blockstore>) -> anyhow::Result<Option<Tipset>> {
1102        let head = ctx.chain_store().heaviest_tipset();
1103        let ec_finality_epoch = head.epoch() - ctx.chain_config().policy.chain_finality;
1104        if ec_finality_epoch >= 0 {
1105            let ts = ctx.chain_index().tipset_by_height(
1106                ec_finality_epoch,
1107                head,
1108                ResolveNullTipset::TakeOlder,
1109            )?;
1110            Ok(Some(ts))
1111        } else {
1112            Ok(None)
1113        }
1114    }
1115
1116    pub async fn get_tipset(
1117        ctx: &Ctx<impl Blockstore + Send + Sync + 'static>,
1118        selector: &TipsetSelector,
1119    ) -> anyhow::Result<Option<Tipset>> {
1120        selector.validate()?;
1121        // Get tipset by key.
1122        if let ApiTipsetKey(Some(tsk)) = &selector.key {
1123            let ts = ctx.chain_index().load_required_tipset(tsk)?;
1124            return Ok(Some(ts));
1125        }
1126        // Get tipset by height.
1127        if let Some(height) = &selector.height {
1128            let anchor = Self::get_tipset_by_anchor(ctx, &height.anchor).await?;
1129            let ts = ctx.chain_index().tipset_by_height(
1130                height.at,
1131                anchor.unwrap_or_else(|| ctx.chain_store().heaviest_tipset()),
1132                height.resolve_null_tipset_policy(),
1133            )?;
1134            return Ok(Some(ts));
1135        }
1136        // Get tipset by tag, either latest or finalized.
1137        if let Some(tag) = &selector.tag {
1138            let ts = Self::get_tipset_by_tag(ctx, *tag).await?;
1139            return Ok(ts);
1140        }
1141        anyhow::bail!("no tipset found for selector")
1142    }
1143
1144    pub async fn get_required_tipset(
1145        ctx: &Ctx<impl Blockstore + Send + Sync + 'static>,
1146        selector: &TipsetSelector,
1147    ) -> anyhow::Result<Tipset> {
1148        Self::get_tipset(ctx, selector)
1149            .await?
1150            .context("failed to select a tipset")
1151    }
1152}
1153
1154impl RpcMethod<1> for ChainGetTipSetV2 {
1155    const NAME: &'static str = "Filecoin.ChainGetTipSet";
1156    const PARAM_NAMES: [&'static str; 1] = ["tipsetSelector"];
1157    const API_PATHS: BitFlags<ApiPaths> = make_bitflags!(ApiPaths::{ V2 });
1158    const PERMISSION: Permission = Permission::Read;
1159    const DESCRIPTION: Option<&'static str> = Some("Returns the tipset with the specified CID.");
1160
1161    type Params = (TipsetSelector,);
1162    type Ok = Option<Tipset>;
1163
1164    async fn handle(
1165        ctx: Ctx<impl Blockstore + Send + Sync + 'static>,
1166        (selector,): Self::Params,
1167    ) -> Result<Self::Ok, ServerError> {
1168        Ok(Self::get_tipset(&ctx, &selector).await?)
1169    }
1170}
1171
1172pub enum ChainSetHead {}
1173impl RpcMethod<1> for ChainSetHead {
1174    const NAME: &'static str = "Filecoin.ChainSetHead";
1175    const PARAM_NAMES: [&'static str; 1] = ["tsk"];
1176    const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
1177    const PERMISSION: Permission = Permission::Admin;
1178
1179    type Params = (TipsetKey,);
1180    type Ok = ();
1181
1182    async fn handle(
1183        ctx: Ctx<impl Blockstore>,
1184        (tsk,): Self::Params,
1185    ) -> Result<Self::Ok, ServerError> {
1186        // This is basically a port of the reference implementation at
1187        // https://github.com/filecoin-project/lotus/blob/v1.23.0/node/impl/full/chain.go#L321
1188
1189        let new_head = ctx.chain_index().load_required_tipset(&tsk)?;
1190        let mut current = ctx.chain_store().heaviest_tipset();
1191        while current.epoch() >= new_head.epoch() {
1192            for cid in current.key().to_cids() {
1193                ctx.chain_store().unmark_block_as_validated(&cid);
1194            }
1195            let parents = &current.block_headers().first().parents;
1196            current = ctx.chain_index().load_required_tipset(parents)?;
1197        }
1198        ctx.chain_store()
1199            .set_heaviest_tipset(new_head)
1200            .map_err(Into::into)
1201    }
1202}
1203
1204pub enum ChainGetMinBaseFee {}
1205impl RpcMethod<1> for ChainGetMinBaseFee {
1206    const NAME: &'static str = "Forest.ChainGetMinBaseFee";
1207    const PARAM_NAMES: [&'static str; 1] = ["lookback"];
1208    const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
1209    const PERMISSION: Permission = Permission::Read;
1210
1211    type Params = (u32,);
1212    type Ok = String;
1213
1214    async fn handle(
1215        ctx: Ctx<impl Blockstore>,
1216        (lookback,): Self::Params,
1217    ) -> Result<Self::Ok, ServerError> {
1218        let mut current = ctx.chain_store().heaviest_tipset();
1219        let mut min_base_fee = current.block_headers().first().parent_base_fee.clone();
1220
1221        for _ in 0..lookback {
1222            let parents = &current.block_headers().first().parents;
1223            current = ctx.chain_index().load_required_tipset(parents)?;
1224
1225            min_base_fee =
1226                min_base_fee.min(current.block_headers().first().parent_base_fee.to_owned());
1227        }
1228
1229        Ok(min_base_fee.atto().to_string())
1230    }
1231}
1232
1233pub enum ChainTipSetWeight {}
1234impl RpcMethod<1> for ChainTipSetWeight {
1235    const NAME: &'static str = "Filecoin.ChainTipSetWeight";
1236    const PARAM_NAMES: [&'static str; 1] = ["tipsetKey"];
1237    const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
1238    const PERMISSION: Permission = Permission::Read;
1239    const DESCRIPTION: Option<&'static str> = Some("Returns the weight of the specified tipset.");
1240
1241    type Params = (ApiTipsetKey,);
1242    type Ok = BigInt;
1243
1244    async fn handle(
1245        ctx: Ctx<impl Blockstore>,
1246        (ApiTipsetKey(tipset_key),): Self::Params,
1247    ) -> Result<Self::Ok, ServerError> {
1248        let ts = ctx
1249            .chain_store()
1250            .load_required_tipset_or_heaviest(&tipset_key)?;
1251        let weight = crate::fil_cns::weight(ctx.store(), &ts)?;
1252        Ok(weight)
1253    }
1254}
1255
1256pub enum ChainGetTipsetByParentState {}
1257impl RpcMethod<1> for ChainGetTipsetByParentState {
1258    const NAME: &'static str = "Forest.ChainGetTipsetByParentState";
1259    const PARAM_NAMES: [&'static str; 1] = ["parentState"];
1260    const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
1261    const PERMISSION: Permission = Permission::Read;
1262
1263    type Params = (Cid,);
1264    type Ok = Option<Tipset>;
1265
1266    async fn handle(
1267        ctx: Ctx<impl Blockstore>,
1268        (parent_state,): Self::Params,
1269    ) -> Result<Self::Ok, ServerError> {
1270        Ok(ctx
1271            .chain_store()
1272            .heaviest_tipset()
1273            .chain(ctx.store())
1274            .find(|ts| ts.parent_state() == &parent_state)
1275            .clone())
1276    }
1277}
1278
1279pub const CHAIN_NOTIFY: &str = "Filecoin.ChainNotify";
1280pub(crate) fn chain_notify<DB: Blockstore>(
1281    _params: Params<'_>,
1282    data: &crate::rpc::RPCState<DB>,
1283) -> Subscriber<Vec<ApiHeadChange>> {
1284    let (sender, receiver) = broadcast::channel(HEAD_CHANNEL_CAPACITY);
1285
1286    // As soon as the channel is created, send the current tipset
1287    let current = data.chain_store().heaviest_tipset();
1288    let (change, tipset) = ("current".into(), current);
1289    sender
1290        .send(vec![ApiHeadChange { change, tipset }])
1291        .expect("receiver is not dropped");
1292
1293    let mut subscriber = data.chain_store().publisher().subscribe();
1294
1295    tokio::spawn(async move {
1296        // Skip first message
1297        let _ = subscriber.recv().await;
1298
1299        while let Ok(v) = subscriber.recv().await {
1300            let (change, tipset) = match v {
1301                HeadChange::Apply(ts) => ("apply".into(), ts),
1302            };
1303
1304            if sender.send(vec![ApiHeadChange { change, tipset }]).is_err() {
1305                break;
1306            }
1307        }
1308    });
1309    receiver
1310}
1311
1312async fn load_api_messages_from_tipset<DB: Blockstore + Send + Sync + 'static>(
1313    ctx: &crate::rpc::RPCState<DB>,
1314    tipset_keys: &TipsetKey,
1315) -> Result<Vec<ApiMessage>, ServerError> {
1316    static SHOULD_BACKFILL: LazyLock<bool> = LazyLock::new(|| {
1317        let enabled = is_env_truthy("FOREST_RPC_BACKFILL_FULL_TIPSET_FROM_NETWORK");
1318        if enabled {
1319            tracing::warn!(
1320                "Full tipset backfilling from network is enabled via FOREST_RPC_BACKFILL_FULL_TIPSET_FROM_NETWORK, excessive disk and bandwidth usage is expected."
1321            );
1322        }
1323        enabled
1324    });
1325    let full_tipset = if *SHOULD_BACKFILL {
1326        get_full_tipset(
1327            &ctx.sync_network_context,
1328            ctx.chain_store(),
1329            None,
1330            tipset_keys,
1331        )
1332        .await?
1333    } else {
1334        load_full_tipset(ctx.chain_store(), tipset_keys)?
1335    };
1336    let blocks = full_tipset.into_blocks();
1337    let mut messages = vec![];
1338    let mut seen = CidHashSet::default();
1339    for Block {
1340        bls_messages,
1341        secp_messages,
1342        ..
1343    } in blocks
1344    {
1345        for message in bls_messages {
1346            let cid = message.cid();
1347            if seen.insert(cid) {
1348                messages.push(ApiMessage { cid, message });
1349            }
1350        }
1351
1352        for msg in secp_messages {
1353            let cid = msg.cid();
1354            if seen.insert(cid) {
1355                messages.push(ApiMessage {
1356                    cid,
1357                    message: msg.message,
1358                });
1359            }
1360        }
1361    }
1362
1363    Ok(messages)
1364}
1365
1366#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, JsonSchema)]
1367pub struct BlockMessages {
1368    #[serde(rename = "BlsMessages", with = "crate::lotus_json")]
1369    #[schemars(with = "LotusJson<Vec<Message>>")]
1370    pub bls_msg: Vec<Message>,
1371    #[serde(rename = "SecpkMessages", with = "crate::lotus_json")]
1372    #[schemars(with = "LotusJson<Vec<SignedMessage>>")]
1373    pub secp_msg: Vec<SignedMessage>,
1374    #[serde(rename = "Cids", with = "crate::lotus_json")]
1375    #[schemars(with = "LotusJson<Vec<Cid>>")]
1376    pub cids: Vec<Cid>,
1377}
1378lotus_json_with_self!(BlockMessages);
1379
1380#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, JsonSchema)]
1381#[serde(rename_all = "PascalCase")]
1382pub struct ApiReceipt {
1383    // Exit status of message execution
1384    pub exit_code: ExitCode,
1385    // `Return` value if the exit code is zero
1386    #[serde(rename = "Return", with = "crate::lotus_json")]
1387    #[schemars(with = "LotusJson<RawBytes>")]
1388    pub return_data: RawBytes,
1389    // Non-negative value of GasUsed
1390    pub gas_used: u64,
1391    #[serde(with = "crate::lotus_json")]
1392    #[schemars(with = "LotusJson<Option<Cid>>")]
1393    pub events_root: Option<Cid>,
1394}
1395
1396lotus_json_with_self!(ApiReceipt);
1397
1398#[derive(Serialize, Deserialize, JsonSchema, Clone, Debug, Eq, PartialEq)]
1399#[serde(rename_all = "PascalCase")]
1400pub struct ApiMessage {
1401    #[serde(with = "crate::lotus_json")]
1402    #[schemars(with = "LotusJson<Cid>")]
1403    pub cid: Cid,
1404    #[serde(with = "crate::lotus_json")]
1405    #[schemars(with = "LotusJson<Message>")]
1406    pub message: Message,
1407}
1408
1409lotus_json_with_self!(ApiMessage);
1410
1411#[derive(Serialize, Deserialize, JsonSchema, Clone, Debug, Eq, PartialEq)]
1412pub struct FlattenedApiMessage {
1413    #[serde(flatten, with = "crate::lotus_json")]
1414    #[schemars(with = "LotusJson<Message>")]
1415    pub message: Message,
1416    #[serde(rename = "CID", with = "crate::lotus_json")]
1417    #[schemars(with = "LotusJson<Cid>")]
1418    pub cid: Cid,
1419}
1420
1421lotus_json_with_self!(FlattenedApiMessage);
1422
1423#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
1424pub struct ForestChainExportParams {
1425    pub version: FilecoinSnapshotVersion,
1426    pub epoch: ChainEpoch,
1427    pub recent_roots: i64,
1428    pub output_path: PathBuf,
1429    #[schemars(with = "LotusJson<ApiTipsetKey>")]
1430    #[serde(with = "crate::lotus_json")]
1431    pub tipset_keys: ApiTipsetKey,
1432    pub skip_checksum: bool,
1433    pub dry_run: bool,
1434}
1435lotus_json_with_self!(ForestChainExportParams);
1436
1437#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
1438pub struct ForestChainExportDiffParams {
1439    pub from: ChainEpoch,
1440    pub to: ChainEpoch,
1441    pub depth: i64,
1442    pub output_path: PathBuf,
1443}
1444lotus_json_with_self!(ForestChainExportDiffParams);
1445
1446#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
1447pub struct ChainExportParams {
1448    pub epoch: ChainEpoch,
1449    pub recent_roots: i64,
1450    pub output_path: PathBuf,
1451    #[schemars(with = "LotusJson<ApiTipsetKey>")]
1452    #[serde(with = "crate::lotus_json")]
1453    pub tipset_keys: ApiTipsetKey,
1454    pub skip_checksum: bool,
1455    pub dry_run: bool,
1456}
1457lotus_json_with_self!(ChainExportParams);
1458
1459#[derive(PartialEq, Debug, Serialize, Deserialize, Clone, JsonSchema)]
1460#[serde(rename_all = "PascalCase")]
1461pub struct ApiHeadChange {
1462    #[serde(rename = "Type")]
1463    pub change: String,
1464    #[serde(rename = "Val", with = "crate::lotus_json")]
1465    #[schemars(with = "LotusJson<Tipset>")]
1466    pub tipset: Tipset,
1467}
1468lotus_json_with_self!(ApiHeadChange);
1469
1470#[derive(PartialEq, Debug, Serialize, Deserialize, Clone, JsonSchema)]
1471#[serde(tag = "Type", content = "Val", rename_all = "snake_case")]
1472pub enum PathChange<T = Tipset> {
1473    Revert(T),
1474    Apply(T),
1475}
1476impl HasLotusJson for PathChange {
1477    type LotusJson = PathChange<<Tipset as HasLotusJson>::LotusJson>;
1478
1479    #[cfg(test)]
1480    fn snapshots() -> Vec<(serde_json::Value, Self)> {
1481        use serde_json::json;
1482        vec![(
1483            json!({
1484                "Type": "revert",
1485                "Val": {
1486                    "Blocks": [
1487                        {
1488                            "BeaconEntries": null,
1489                            "ForkSignaling": 0,
1490                            "Height": 0,
1491                            "Messages": { "/": "baeaaaaa" },
1492                            "Miner": "f00",
1493                            "ParentBaseFee": "0",
1494                            "ParentMessageReceipts": { "/": "baeaaaaa" },
1495                            "ParentStateRoot": { "/":"baeaaaaa" },
1496                            "ParentWeight": "0",
1497                            "Parents": [{"/":"bafyreiaqpwbbyjo4a42saasj36kkrpv4tsherf2e7bvezkert2a7dhonoi"}],
1498                            "Timestamp": 0,
1499                            "WinPoStProof": null
1500                        }
1501                    ],
1502                    "Cids": [
1503                        { "/": "bafy2bzaceag62hjj3o43lf6oyeox3fvg5aqkgl5zagbwpjje3ajwg6yw4iixk" }
1504                    ],
1505                    "Height": 0
1506                }
1507            }),
1508            Self::Revert(RawBlockHeader::default().into()),
1509        )]
1510    }
1511
1512    fn into_lotus_json(self) -> Self::LotusJson {
1513        match self {
1514            PathChange::Revert(it) => PathChange::Revert(it.into_lotus_json()),
1515            PathChange::Apply(it) => PathChange::Apply(it.into_lotus_json()),
1516        }
1517    }
1518
1519    fn from_lotus_json(lotus_json: Self::LotusJson) -> Self {
1520        match lotus_json {
1521            PathChange::Revert(it) => PathChange::Revert(Tipset::from_lotus_json(it)),
1522            PathChange::Apply(it) => PathChange::Apply(Tipset::from_lotus_json(it)),
1523        }
1524    }
1525}
1526
/// Test-only generator: draws one arbitrary payload and wraps it in a variant chosen by
/// the quickcheck generator.
#[cfg(test)]
impl<T> quickcheck::Arbitrary for PathChange<T>
where
    T: quickcheck::Arbitrary,
{
    fn arbitrary(g: &mut quickcheck::Gen) -> Self {
        let payload = T::arbitrary(g);
        let candidates = [PathChange::Apply(payload.clone()), PathChange::Revert(payload)];
        // `choose` only yields `None` for an empty slice, and `candidates` has two entries.
        g.choose(&candidates).cloned().unwrap()
    }
}
1539
/// Checks every `PathChange` Lotus-JSON snapshot.
#[test]
fn snapshots() {
    assert_all_snapshots::<PathChange>();
}
1544
// Property: an arbitrary `PathChange` is unchanged by a trip through its JSON form.
#[cfg(test)]
quickcheck::quickcheck! {
    fn quickcheck(change: PathChange) -> () {
        assert_unchanged_via_json(change)
    }
}
1551
1552#[cfg(test)]
1553mod tests {
1554    use super::*;
1555    use crate::{
1556        blocks::{Chain4U, RawBlockHeader, chain4u},
1557        db::{
1558            MemoryDB,
1559            car::{AnyCar, ManyCar},
1560        },
1561        networks::{self, ChainConfig},
1562    };
1563    use PathChange::{Apply, Revert};
1564    use itertools::Itertools as _;
1565    use std::sync::Arc;
1566
1567    #[test]
1568    fn revert_to_ancestor_linear() {
1569        let store = ChainStore::calibnet();
1570        chain4u! {
1571            in store.blockstore();
1572            [_genesis = store.genesis_block_header()]
1573            -> [a] -> [b] -> [c, d] -> [e]
1574        };
1575
1576        // simple
1577        assert_path_change(&store, b, a, [Revert(&[b])]);
1578
1579        // from multi-member tipset
1580        assert_path_change(&store, [c, d], a, [Revert(&[c, d][..]), Revert(&[b])]);
1581
1582        // to multi-member tipset
1583        assert_path_change(&store, e, [c, d], [Revert(e)]);
1584
1585        // over multi-member tipset
1586        assert_path_change(&store, e, b, [Revert(&[e][..]), Revert(&[c, d])]);
1587    }
1588
1589    /// Mirror how lotus handles passing an incomplete `TipsetKey`s.
1590    /// Tested on lotus `1.23.2`
1591    #[test]
1592    fn incomplete_tipsets() {
1593        let store = ChainStore::calibnet();
1594        chain4u! {
1595            in store.blockstore();
1596            [_genesis = store.genesis_block_header()]
1597            -> [a, b] -> [c] -> [d, _e] // this pattern 2 -> 1 -> 2 can be found at calibnet epoch 1369126
1598        };
1599
1600        // apply to descendant with incomplete `from`
1601        assert_path_change(
1602            &store,
1603            a,
1604            c,
1605            [
1606                Revert(&[a][..]), // revert the incomplete tipset
1607                Apply(&[a, b]),   // apply the complete one
1608                Apply(&[c]),      // apply the destination
1609            ],
1610        );
1611
1612        // apply to descendant with incomplete `to`
1613        assert_path_change(&store, c, d, [Apply(d)]);
1614
1615        // revert to ancestor with incomplete `from`
1616        assert_path_change(&store, d, c, [Revert(d)]);
1617
1618        // revert to ancestor with incomplete `to`
1619        assert_path_change(
1620            &store,
1621            c,
1622            a,
1623            [
1624                Revert(&[c][..]),
1625                Revert(&[a, b]), // revert the complete tipset
1626                Apply(&[a]),     // apply the incomplete one
1627            ],
1628        );
1629    }
1630
1631    #[test]
1632    fn apply_to_descendant_linear() {
1633        let store = ChainStore::calibnet();
1634        chain4u! {
1635            in store.blockstore();
1636            [_genesis = store.genesis_block_header()]
1637            -> [a] -> [b] -> [c, d] -> [e]
1638        };
1639
1640        // simple
1641        assert_path_change(&store, a, b, [Apply(&[b])]);
1642
1643        // from multi-member tipset
1644        assert_path_change(&store, [c, d], e, [Apply(e)]);
1645
1646        // to multi-member tipset
1647        assert_path_change(&store, b, [c, d], [Apply([c, d])]);
1648
1649        // over multi-member tipset
1650        assert_path_change(&store, b, e, [Apply(&[c, d][..]), Apply(&[e])]);
1651    }
1652
1653    #[test]
1654    fn cross_fork_simple() {
1655        let store = ChainStore::calibnet();
1656        chain4u! {
1657            in store.blockstore();
1658            [_genesis = store.genesis_block_header()]
1659            -> [a] -> [b1] -> [c1]
1660        };
1661        chain4u! {
1662            from [a] in store.blockstore();
1663            [b2] -> [c2]
1664        };
1665
1666        // same height
1667        assert_path_change(&store, b1, b2, [Revert(b1), Apply(b2)]);
1668
1669        // different height
1670        assert_path_change(&store, b1, c2, [Revert(b1), Apply(b2), Apply(c2)]);
1671
1672        let _ = (a, c1);
1673    }
1674
1675    impl ChainStore<Chain4U<ManyCar>> {
1676        fn _load(genesis_car: &'static [u8], genesis_cid: Cid) -> Self {
1677            let db = Arc::new(Chain4U::with_blockstore(
1678                ManyCar::new(MemoryDB::default())
1679                    .with_read_only(AnyCar::new(genesis_car).unwrap())
1680                    .unwrap(),
1681            ));
1682            let genesis_block_header = db.get_cbor(&genesis_cid).unwrap().unwrap();
1683            ChainStore::new(
1684                db,
1685                Arc::new(MemoryDB::default()),
1686                Arc::new(MemoryDB::default()),
1687                Arc::new(ChainConfig::calibnet()),
1688                genesis_block_header,
1689            )
1690            .unwrap()
1691        }
1692        pub fn calibnet() -> Self {
1693            Self::_load(
1694                networks::calibnet::DEFAULT_GENESIS,
1695                *networks::calibnet::GENESIS_CID,
1696            )
1697        }
1698    }
1699
1700    /// Utility for writing ergonomic tests
1701    trait MakeTipset {
1702        fn make_tipset(self) -> Tipset;
1703    }
1704
1705    impl MakeTipset for &RawBlockHeader {
1706        fn make_tipset(self) -> Tipset {
1707            Tipset::from(CachingBlockHeader::new(self.clone()))
1708        }
1709    }
1710
1711    impl<const N: usize> MakeTipset for [&RawBlockHeader; N] {
1712        fn make_tipset(self) -> Tipset {
1713            self.as_slice().make_tipset()
1714        }
1715    }
1716
1717    impl<const N: usize> MakeTipset for &[&RawBlockHeader; N] {
1718        fn make_tipset(self) -> Tipset {
1719            self.as_slice().make_tipset()
1720        }
1721    }
1722
1723    impl MakeTipset for &[&RawBlockHeader] {
1724        fn make_tipset(self) -> Tipset {
1725            Tipset::new(self.iter().cloned().cloned()).unwrap()
1726        }
1727    }
1728
1729    #[track_caller]
1730    fn assert_path_change<T: MakeTipset>(
1731        store: &ChainStore<impl Blockstore>,
1732        from: impl MakeTipset,
1733        to: impl MakeTipset,
1734        expected: impl IntoIterator<Item = PathChange<T>>,
1735    ) {
1736        fn print(path_change: &PathChange) {
1737            let it = match path_change {
1738                Revert(it) => {
1739                    print!("Revert(");
1740                    it
1741                }
1742                Apply(it) => {
1743                    print!(" Apply(");
1744                    it
1745                }
1746            };
1747            println!(
1748                "epoch = {}, key.cid = {})",
1749                it.epoch(),
1750                it.key().cid().unwrap()
1751            )
1752        }
1753
1754        let actual =
1755            impl_chain_get_path(store, from.make_tipset().key(), to.make_tipset().key()).unwrap();
1756        let expected = expected
1757            .into_iter()
1758            .map(|change| match change {
1759                PathChange::Revert(it) => PathChange::Revert(it.make_tipset()),
1760                PathChange::Apply(it) => PathChange::Apply(it.make_tipset()),
1761            })
1762            .collect_vec();
1763        if expected != actual {
1764            println!("SUMMARY");
1765            println!("=======");
1766            println!("expected:");
1767            for it in &expected {
1768                print(it)
1769            }
1770            println!();
1771            println!("actual:");
1772            for it in &actual {
1773                print(it)
1774            }
1775            println!("=======\n")
1776        }
1777        assert_eq!(
1778            expected, actual,
1779            "expected change (left) does not match actual change (right)"
1780        )
1781    }
1782}