Skip to main content

forest/rpc/methods/
chain.rs

1// Copyright 2019-2026 ChainSafe Systems
2// SPDX-License-Identifier: Apache-2.0, MIT
3
4pub mod types;
5use enumflags2::{BitFlags, make_bitflags};
6use types::*;
7
8#[cfg(test)]
9use crate::blocks::RawBlockHeader;
10use crate::blocks::{Block, CachingBlockHeader, Tipset, TipsetKey};
11use crate::chain::index::ResolveNullTipset;
12use crate::chain::{ChainStore, ExportOptions, FilecoinSnapshotVersion, HeadChange};
13use crate::chain_sync::{get_full_tipset, load_full_tipset};
14use crate::cid_collections::CidHashSet;
15use crate::ipld::DfsIter;
16use crate::ipld::{CHAIN_EXPORT_STATUS, cancel_export, end_export, start_export};
17use crate::lotus_json::{HasLotusJson, LotusJson, lotus_json_with_self};
18#[cfg(test)]
19use crate::lotus_json::{assert_all_snapshots, assert_unchanged_via_json};
20use crate::message::{ChainMessage, SignedMessage};
21use crate::rpc::eth::Block as EthBlock;
22use crate::rpc::eth::{
23    EthLog, TxInfo, eth_logs_with_filter, types::ApiHeaders, types::EthFilterSpec,
24};
25use crate::rpc::f3::F3ExportLatestSnapshot;
26use crate::rpc::types::*;
27use crate::rpc::{ApiPaths, Ctx, EthEventHandler, Permission, RpcMethod, ServerError};
28use crate::shim::clock::ChainEpoch;
29use crate::shim::error::ExitCode;
30use crate::shim::executor::Receipt;
31use crate::shim::message::Message;
32use crate::utils::db::CborStoreExt as _;
33use crate::utils::io::VoidAsyncWriter;
34use crate::utils::misc::env::is_env_truthy;
35use anyhow::{Context as _, Result};
36use cid::Cid;
37use fvm_ipld_blockstore::Blockstore;
38use fvm_ipld_encoding::{CborStore, RawBytes};
39use hex::ToHex;
40use ipld_core::ipld::Ipld;
41use jsonrpsee::types::Params;
42use jsonrpsee::types::error::ErrorObjectOwned;
43use num::BigInt;
44use schemars::JsonSchema;
45use serde::{Deserialize, Serialize};
46use sha2::Sha256;
47use std::fs::File;
48use std::{collections::VecDeque, path::PathBuf, sync::LazyLock};
49use tokio::sync::{
50    Mutex,
51    broadcast::{self, Receiver as Subscriber},
52};
53use tokio::task::JoinHandle;
54use tokio_util::sync::CancellationToken;
55
56const HEAD_CHANNEL_CAPACITY: usize = 10;
57
58/// [`SAFE_HEIGHT_DISTANCE`] is the distance from the latest tipset, i.e. "heaviest", that
59/// is considered to be safe from re-orgs at an increasingly diminishing
60/// probability.
61///
62/// This is used to determine the safe tipset when using the "safe" tag in
63/// [`TipsetSelector`] or via Eth JSON-RPC APIs. Note that "safe" doesn't guarantee
64/// finality, but rather a high probability of not being reverted. For guaranteed
65/// finality, use the "finalized" tag.
66///
67/// This constant is experimental and may change in the future.
68/// Discussion on this current value and a tracking item to document the
69/// probabilistic impact of various values is in
70/// https://github.com/filecoin-project/go-f3/issues/944
71pub const SAFE_HEIGHT_DISTANCE: ChainEpoch = 200;
72
73static CHAIN_EXPORT_LOCK: LazyLock<Mutex<Option<CancellationToken>>> =
74    LazyLock::new(|| Mutex::new(None));
75
76/// Subscribes to head changes from the chain store and broadcasts new blocks.
77///
78/// # Notes
79///
80/// Spawns an internal `tokio` task that can be aborted anytime via the returned `JoinHandle`,
81/// allowing manual cleanup if needed.
82pub(crate) fn new_heads<DB: Blockstore + Send + Sync + 'static>(
83    data: Ctx<DB>,
84) -> (Subscriber<ApiHeaders>, JoinHandle<()>) {
85    let (sender, receiver) = broadcast::channel(HEAD_CHANNEL_CAPACITY);
86
87    let mut subscriber = data.chain_store().publisher().subscribe();
88
89    let handle = tokio::spawn(async move {
90        while let Ok(v) = subscriber.recv().await {
91            let headers = match v {
92                HeadChange::Apply(ts) => {
93                    // Convert the tipset to an Ethereum block with full transaction info
94                    // Note: In Filecoin's Eth RPC, a tipset maps to a single Ethereum block
95                    match EthBlock::from_filecoin_tipset(data.clone(), ts, TxInfo::Full).await {
96                        Ok(block) => ApiHeaders(block),
97                        Err(e) => {
98                            tracing::error!("Failed to convert tipset to eth block: {}", e);
99                            continue;
100                        }
101                    }
102                }
103            };
104            if let Err(e) = sender.send(headers) {
105                tracing::error!("Failed to send headers: {}", e);
106                break;
107            }
108        }
109    });
110
111    (receiver, handle)
112}
113
114/// Subscribes to head changes from the chain store and broadcasts new `Ethereum` logs.
115///
116/// # Notes
117///
118/// Spawns an internal `tokio` task that can be aborted anytime via the returned `JoinHandle`,
119/// allowing manual cleanup if needed.
120pub(crate) fn logs<DB: Blockstore + Sync + Send + 'static>(
121    ctx: &Ctx<DB>,
122    filter: Option<EthFilterSpec>,
123) -> (Subscriber<Vec<EthLog>>, JoinHandle<()>) {
124    let (sender, receiver) = broadcast::channel(HEAD_CHANNEL_CAPACITY);
125
126    let mut subscriber = ctx.chain_store().publisher().subscribe();
127
128    let ctx = ctx.clone();
129
130    let handle = tokio::spawn(async move {
131        while let Ok(v) = subscriber.recv().await {
132            match v {
133                HeadChange::Apply(ts) => {
134                    match eth_logs_with_filter(&ctx, &ts, filter.clone(), None).await {
135                        Ok(logs) => {
136                            if !logs.is_empty()
137                                && let Err(e) = sender.send(logs)
138                            {
139                                tracing::error!(
140                                    "Failed to send logs for tipset {}: {}",
141                                    ts.key(),
142                                    e
143                                );
144                                break;
145                            }
146                        }
147                        Err(e) => {
148                            tracing::error!("Failed to fetch logs for tipset {}: {}", ts.key(), e);
149                        }
150                    }
151                }
152            }
153        }
154    });
155
156    (receiver, handle)
157}
158
159pub enum ChainGetFinalizedTipset {}
160impl RpcMethod<0> for ChainGetFinalizedTipset {
161    const NAME: &'static str = "Filecoin.ChainGetFinalizedTipSet";
162    const PARAM_NAMES: [&'static str; 0] = [];
163    const API_PATHS: BitFlags<ApiPaths> = make_bitflags!(ApiPaths::V1);
164    const PERMISSION: Permission = Permission::Read;
165    const DESCRIPTION: Option<&'static str> = Some(
166        "Returns the latest F3 finalized tipset, or falls back to EC finality if F3 is not operational on the node or if the F3 finalized tipset is further back than EC finalized tipset.",
167    );
168
169    type Params = ();
170    type Ok = Tipset;
171
172    async fn handle(
173        ctx: Ctx<impl Blockstore + Send + Sync + 'static>,
174        (): Self::Params,
175        _: &http::Extensions,
176    ) -> Result<Self::Ok, ServerError> {
177        let head = ctx.chain_store().heaviest_tipset();
178        let ec_finality_epoch = (head.epoch() - ctx.chain_config().policy.chain_finality).max(0);
179
180        // Either get the f3 finalized tipset or the ec finalized tipset
181        match get_f3_finality_tipset(&ctx, ec_finality_epoch).await {
182            Ok(f3_tipset) => {
183                tracing::debug!("Using F3 finalized tipset at epoch {}", f3_tipset.epoch());
184                Ok(f3_tipset)
185            }
186            Err(_) => {
187                // fallback to ec finality
188                tracing::warn!("F3 finalization unavailable, falling back to EC finality");
189                let ec_tipset = ctx.chain_index().tipset_by_height(
190                    ec_finality_epoch,
191                    head,
192                    ResolveNullTipset::TakeOlder,
193                )?;
194                Ok(ec_tipset)
195            }
196        }
197    }
198}
199
200// get f3 finalized tipset based on ec finality epoch
201async fn get_f3_finality_tipset<DB: Blockstore + Sync + Send + 'static>(
202    ctx: &Ctx<DB>,
203    ec_finality_epoch: ChainEpoch,
204) -> Result<Tipset> {
205    let f3_finalized_cert = crate::rpc::f3::F3GetLatestCertificate::get()
206        .await
207        .map_err(|e| anyhow::anyhow!("Failed to get F3 certificate: {}", e))?;
208
209    let f3_finalized_head = f3_finalized_cert.chain_head();
210    if f3_finalized_head.epoch < ec_finality_epoch {
211        return Err(anyhow::anyhow!(
212            "F3 finalized tipset epoch {} is further back than EC finalized tipset epoch {}",
213            f3_finalized_head.epoch,
214            ec_finality_epoch
215        ));
216    }
217
218    ctx.chain_index()
219        .load_required_tipset(&f3_finalized_head.key)
220        .map_err(|e| {
221            anyhow::anyhow!(
222                "Failed to load F3 finalized tipset at epoch {}: {}",
223                f3_finalized_head.epoch,
224                e
225            )
226        })
227}
228
229pub enum ChainGetMessage {}
230impl RpcMethod<1> for ChainGetMessage {
231    const NAME: &'static str = "Filecoin.ChainGetMessage";
232    const PARAM_NAMES: [&'static str; 1] = ["messageCid"];
233    const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
234    const PERMISSION: Permission = Permission::Read;
235    const DESCRIPTION: Option<&'static str> = Some("Returns the message with the specified CID.");
236
237    type Params = (Cid,);
238    type Ok = FlattenedApiMessage;
239
240    async fn handle(
241        ctx: Ctx<impl Blockstore>,
242        (message_cid,): Self::Params,
243        _: &http::Extensions,
244    ) -> Result<Self::Ok, ServerError> {
245        let chain_message: ChainMessage = ctx
246            .store()
247            .get_cbor(&message_cid)?
248            .with_context(|| format!("can't find message with cid {message_cid}"))?;
249        let message = match chain_message {
250            ChainMessage::Signed(m) => m.into_message(),
251            ChainMessage::Unsigned(m) => m,
252        };
253
254        let cid = message.cid();
255        Ok(FlattenedApiMessage { message, cid })
256    }
257}
258
259/// Returns the events stored under the given event AMT root CID.
260/// Errors if the root CID cannot be found in the blockstore.
261pub enum ChainGetEvents {}
262impl RpcMethod<1> for ChainGetEvents {
263    const NAME: &'static str = "Filecoin.ChainGetEvents";
264    const PARAM_NAMES: [&'static str; 1] = ["rootCid"];
265    const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
266    const PERMISSION: Permission = Permission::Read;
267    const DESCRIPTION: Option<&'static str> =
268        Some("Returns the events under the given event AMT root CID.");
269
270    type Params = (Cid,);
271    type Ok = Vec<Event>;
272    async fn handle(
273        ctx: Ctx<impl Blockstore + Send + Sync + 'static>,
274        (root_cid,): Self::Params,
275        _: &http::Extensions,
276    ) -> Result<Self::Ok, ServerError> {
277        let events = EthEventHandler::get_events_by_event_root(&ctx, &root_cid)?;
278        Ok(events)
279    }
280}
281
282pub enum ChainGetParentMessages {}
283impl RpcMethod<1> for ChainGetParentMessages {
284    const NAME: &'static str = "Filecoin.ChainGetParentMessages";
285    const PARAM_NAMES: [&'static str; 1] = ["blockCid"];
286    const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
287    const PERMISSION: Permission = Permission::Read;
288    const DESCRIPTION: Option<&'static str> =
289        Some("Returns the messages included in the blocks of the parent tipset.");
290
291    type Params = (Cid,);
292    type Ok = Vec<ApiMessage>;
293
294    async fn handle(
295        ctx: Ctx<impl Blockstore + Send + Sync + 'static>,
296        (block_cid,): Self::Params,
297        _: &http::Extensions,
298    ) -> Result<Self::Ok, ServerError> {
299        let store = ctx.store();
300        let block_header: CachingBlockHeader = store
301            .get_cbor(&block_cid)?
302            .with_context(|| format!("can't find block header with cid {block_cid}"))?;
303        if block_header.epoch == 0 {
304            Ok(vec![])
305        } else {
306            let parent_tipset = Tipset::load_required(store, &block_header.parents)?;
307            load_api_messages_from_tipset(&ctx, parent_tipset.key()).await
308        }
309    }
310}
311
312pub enum ChainGetParentReceipts {}
313impl RpcMethod<1> for ChainGetParentReceipts {
314    const NAME: &'static str = "Filecoin.ChainGetParentReceipts";
315    const PARAM_NAMES: [&'static str; 1] = ["blockCid"];
316    const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
317    const PERMISSION: Permission = Permission::Read;
318    const DESCRIPTION: Option<&'static str> =
319        Some("Returns the message receipts included in the blocks of the parent tipset.");
320
321    type Params = (Cid,);
322    type Ok = Vec<ApiReceipt>;
323
324    async fn handle(
325        ctx: Ctx<impl Blockstore>,
326        (block_cid,): Self::Params,
327        _: &http::Extensions,
328    ) -> Result<Self::Ok, ServerError> {
329        let store = ctx.store();
330        let block_header: CachingBlockHeader = store
331            .get_cbor(&block_cid)?
332            .with_context(|| format!("can't find block header with cid {block_cid}"))?;
333        if block_header.epoch == 0 {
334            return Ok(vec![]);
335        }
336        let receipts = Receipt::get_receipts(store, block_header.message_receipts)
337            .map_err(|_| {
338                ErrorObjectOwned::owned::<()>(
339                    1,
340                    format!(
341                        "failed to root: ipld: could not find {}",
342                        block_header.message_receipts
343                    ),
344                    None,
345                )
346            })?
347            .iter()
348            .map(|r| ApiReceipt {
349                exit_code: r.exit_code().into(),
350                return_data: r.return_data(),
351                gas_used: r.gas_used(),
352                events_root: r.events_root(),
353            })
354            .collect();
355
356        Ok(receipts)
357    }
358}
359
360pub enum ChainGetMessagesInTipset {}
361impl RpcMethod<1> for ChainGetMessagesInTipset {
362    const NAME: &'static str = "Filecoin.ChainGetMessagesInTipset";
363    const PARAM_NAMES: [&'static str; 1] = ["tipsetKey"];
364    const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
365    const PERMISSION: Permission = Permission::Read;
366
367    type Params = (ApiTipsetKey,);
368    type Ok = Vec<ApiMessage>;
369
370    async fn handle(
371        ctx: Ctx<impl Blockstore + Send + Sync + 'static>,
372        (ApiTipsetKey(tipset_key),): Self::Params,
373        _: &http::Extensions,
374    ) -> Result<Self::Ok, ServerError> {
375        let tipset = ctx
376            .chain_store()
377            .load_required_tipset_or_heaviest(&tipset_key)?;
378        load_api_messages_from_tipset(&ctx, tipset.key()).await
379    }
380}
381
382pub enum ChainPruneSnapshot {}
383impl RpcMethod<1> for ChainPruneSnapshot {
384    const NAME: &'static str = "Forest.SnapshotGC";
385    const PARAM_NAMES: [&'static str; 1] = ["blocking"];
386    const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
387    const PERMISSION: Permission = Permission::Admin;
388
389    type Params = (bool,);
390    type Ok = ();
391
392    async fn handle(
393        _ctx: Ctx<impl Blockstore + Send + Sync + 'static>,
394        (blocking,): Self::Params,
395        _: &http::Extensions,
396    ) -> Result<Self::Ok, ServerError> {
397        if let Some(gc) = crate::daemon::GLOBAL_SNAPSHOT_GC.get() {
398            let progress_rx = gc.trigger()?;
399            while blocking && progress_rx.recv_async().await.is_ok() {}
400            Ok(())
401        } else {
402            Err(anyhow::anyhow!("snapshot gc is not enabled").into())
403        }
404    }
405}
406
407pub enum ForestChainExport {}
408impl RpcMethod<1> for ForestChainExport {
409    const NAME: &'static str = "Forest.ChainExport";
410    const PARAM_NAMES: [&'static str; 1] = ["params"];
411    const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all_with_v2();
412    const PERMISSION: Permission = Permission::Read;
413
414    type Params = (ForestChainExportParams,);
415    type Ok = ApiExportResult;
416
417    async fn handle(
418        ctx: Ctx<impl Blockstore + Send + Sync + 'static>,
419        (params,): Self::Params,
420        _: &http::Extensions,
421    ) -> Result<Self::Ok, ServerError> {
422        let ForestChainExportParams {
423            version,
424            epoch,
425            recent_roots,
426            output_path,
427            tipset_keys: ApiTipsetKey(tsk),
428            include_receipts,
429            include_events,
430            skip_checksum,
431            dry_run,
432        } = params;
433
434        let token = CancellationToken::new();
435        {
436            let mut guard = CHAIN_EXPORT_LOCK.lock().await;
437            if guard.is_some() {
438                return Err(
439                    anyhow::anyhow!("A chain export is still in progress. Cancel it with the export-cancel subcommand if needed.").into(),
440                );
441            }
442            *guard = Some(token.clone());
443        }
444        start_export();
445
446        let head = ctx.chain_store().load_required_tipset_or_heaviest(&tsk)?;
447        let start_ts =
448            ctx.chain_index()
449                .tipset_by_height(epoch, head, ResolveNullTipset::TakeOlder)?;
450
451        let options = Some(ExportOptions {
452            skip_checksum,
453            include_receipts,
454            include_events,
455            seen: Default::default(),
456        });
457        let writer = if dry_run {
458            tokio_util::either::Either::Left(VoidAsyncWriter)
459        } else {
460            tokio_util::either::Either::Right(tokio::fs::File::create(&output_path).await?)
461        };
462        let result = match version {
463            FilecoinSnapshotVersion::V1 => {
464                let db = ctx.store_owned();
465
466                let chain_export =
467                    crate::chain::export::<Sha256>(&db, &start_ts, recent_roots, writer, options);
468
469                tokio::select! {
470                    result = chain_export => {
471                        result.map(|checksum_opt| ApiExportResult::Done(checksum_opt.map(|hash| hash.encode_hex())))
472                    },
473                    _ = token.cancelled() => {
474                        cancel_export();
475                        tracing::warn!("Snapshot export was cancelled");
476                        Ok(ApiExportResult::Cancelled)
477                    },
478                }
479            }
480            FilecoinSnapshotVersion::V2 => {
481                let db = ctx.store_owned();
482
483                let f3_snap_tmp_path = {
484                    let mut f3_snap_dir = output_path.clone();
485                    let mut builder = tempfile::Builder::new();
486                    let with_suffix = builder.suffix(".f3snap.bin");
487                    if f3_snap_dir.pop() {
488                        with_suffix.tempfile_in(&f3_snap_dir)
489                    } else {
490                        with_suffix.tempfile_in(".")
491                    }?
492                    .into_temp_path()
493                };
494                let f3_snap = {
495                    match F3ExportLatestSnapshot::run(f3_snap_tmp_path.display().to_string()).await
496                    {
497                        Ok(cid) => Some((cid, File::open(&f3_snap_tmp_path)?)),
498                        Err(e) => {
499                            tracing::error!("Failed to export F3 snapshot: {e}");
500                            None
501                        }
502                    }
503                };
504
505                let chain_export = crate::chain::export_v2::<Sha256, _>(
506                    &db,
507                    f3_snap,
508                    &start_ts,
509                    recent_roots,
510                    writer,
511                    options,
512                );
513
514                tokio::select! {
515                    result = chain_export => {
516                        result.map(|checksum_opt| ApiExportResult::Done(checksum_opt.map(|hash| hash.encode_hex())))
517                    },
518                    _ = token.cancelled() => {
519                        cancel_export();
520                        tracing::warn!("Snapshot export was cancelled");
521                        Ok(ApiExportResult::Cancelled)
522                    },
523                }
524            }
525        };
526        end_export();
527        // Clean up token
528        let mut guard = CHAIN_EXPORT_LOCK.lock().await;
529        *guard = None;
530        match result {
531            Ok(export_result) => Ok(export_result),
532            Err(e) => Err(anyhow::anyhow!(e).into()),
533        }
534    }
535}
536
537pub enum ForestChainExportStatus {}
538impl RpcMethod<0> for ForestChainExportStatus {
539    const NAME: &'static str = "Forest.ChainExportStatus";
540    const PARAM_NAMES: [&'static str; 0] = [];
541    const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all_with_v2();
542    const PERMISSION: Permission = Permission::Read;
543
544    type Params = ();
545    type Ok = ApiExportStatus;
546
547    async fn handle(
548        _ctx: Ctx<impl Blockstore>,
549        (): Self::Params,
550        _: &http::Extensions,
551    ) -> Result<Self::Ok, ServerError> {
552        let mutex = CHAIN_EXPORT_STATUS.lock();
553
554        let progress = if mutex.initial_epoch == 0 {
555            0.0
556        } else {
557            let p = 1.0 - ((mutex.epoch as f64) / (mutex.initial_epoch as f64));
558            if p.is_finite() {
559                p.clamp(0.0, 1.0)
560            } else {
561                0.0
562            }
563        };
564        // only two decimal places
565        let progress = (progress * 100.0).round() / 100.0;
566
567        let status = ApiExportStatus {
568            progress,
569            exporting: mutex.exporting,
570            cancelled: mutex.cancelled,
571            start_time: mutex.start_time,
572        };
573
574        Ok(status)
575    }
576}
577
578pub enum ForestChainExportCancel {}
579impl RpcMethod<0> for ForestChainExportCancel {
580    const NAME: &'static str = "Forest.ChainExportCancel";
581    const PARAM_NAMES: [&'static str; 0] = [];
582    const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all_with_v2();
583    const PERMISSION: Permission = Permission::Read;
584
585    type Params = ();
586    type Ok = bool;
587
588    async fn handle(
589        _ctx: Ctx<impl Blockstore>,
590        (): Self::Params,
591        _: &http::Extensions,
592    ) -> Result<Self::Ok, ServerError> {
593        if let Some(token) = CHAIN_EXPORT_LOCK.lock().await.as_ref() {
594            token.cancel();
595            return Ok(true);
596        }
597
598        Ok(false)
599    }
600}
601
602pub enum ForestChainExportDiff {}
603impl RpcMethod<1> for ForestChainExportDiff {
604    const NAME: &'static str = "Forest.ChainExportDiff";
605    const PARAM_NAMES: [&'static str; 1] = ["params"];
606    const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all_with_v2();
607    const PERMISSION: Permission = Permission::Read;
608
609    type Params = (ForestChainExportDiffParams,);
610    type Ok = ();
611
612    async fn handle(
613        ctx: Ctx<impl Blockstore + Send + Sync + 'static>,
614        (params,): Self::Params,
615        _: &http::Extensions,
616    ) -> Result<Self::Ok, ServerError> {
617        let ForestChainExportDiffParams {
618            from,
619            to,
620            depth,
621            output_path,
622        } = params;
623
624        let _locked = CHAIN_EXPORT_LOCK.try_lock();
625        if _locked.is_err() {
626            return Err(
627                anyhow::anyhow!("Another chain export diff job is still in progress").into(),
628            );
629        }
630
631        let chain_finality = ctx.chain_config().policy.chain_finality;
632        if depth < chain_finality {
633            return Err(
634                anyhow::anyhow!(format!("depth must be greater than {chain_finality}")).into(),
635            );
636        }
637
638        let head = ctx.chain_store().heaviest_tipset();
639        let start_ts =
640            ctx.chain_index()
641                .tipset_by_height(from, head, ResolveNullTipset::TakeOlder)?;
642
643        crate::tool::subcommands::archive_cmd::do_export(
644            &ctx.store_owned(),
645            start_ts,
646            output_path,
647            None,
648            depth,
649            Some(to),
650            Some(chain_finality),
651            true,
652        )
653        .await?;
654
655        Ok(())
656    }
657}
658
659pub enum ChainExport {}
660impl RpcMethod<1> for ChainExport {
661    const NAME: &'static str = "Filecoin.ChainExport";
662    const PARAM_NAMES: [&'static str; 1] = ["params"];
663    const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
664    const PERMISSION: Permission = Permission::Read;
665
666    type Params = (ChainExportParams,);
667    type Ok = ApiExportResult;
668
669    async fn handle(
670        ctx: Ctx<impl Blockstore + Send + Sync + 'static>,
671        (ChainExportParams {
672            epoch,
673            recent_roots,
674            output_path,
675            tipset_keys,
676            skip_checksum,
677            dry_run,
678        },): Self::Params,
679        ext: &http::Extensions,
680    ) -> Result<Self::Ok, ServerError> {
681        ForestChainExport::handle(
682            ctx,
683            (ForestChainExportParams {
684                version: FilecoinSnapshotVersion::V1,
685                epoch,
686                recent_roots,
687                output_path,
688                tipset_keys,
689                include_receipts: false,
690                include_events: false,
691                skip_checksum,
692                dry_run,
693            },),
694            ext,
695        )
696        .await
697    }
698}
699
700pub enum ChainReadObj {}
701impl RpcMethod<1> for ChainReadObj {
702    const NAME: &'static str = "Filecoin.ChainReadObj";
703    const PARAM_NAMES: [&'static str; 1] = ["cid"];
704    const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
705    const PERMISSION: Permission = Permission::Read;
706    const DESCRIPTION: Option<&'static str> = Some(
707        "Reads IPLD nodes referenced by the specified CID from the chain blockstore and returns raw bytes.",
708    );
709
710    type Params = (Cid,);
711    type Ok = Vec<u8>;
712
713    async fn handle(
714        ctx: Ctx<impl Blockstore>,
715        (cid,): Self::Params,
716        _: &http::Extensions,
717    ) -> Result<Self::Ok, ServerError> {
718        let bytes = ctx
719            .store()
720            .get(&cid)?
721            .with_context(|| format!("can't find object with cid={cid}"))?;
722        Ok(bytes)
723    }
724}
725
726pub enum ChainHasObj {}
727impl RpcMethod<1> for ChainHasObj {
728    const NAME: &'static str = "Filecoin.ChainHasObj";
729    const PARAM_NAMES: [&'static str; 1] = ["cid"];
730    const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
731    const PERMISSION: Permission = Permission::Read;
732    const DESCRIPTION: Option<&'static str> =
733        Some("Checks if a given CID exists in the chain blockstore.");
734
735    type Params = (Cid,);
736    type Ok = bool;
737
738    async fn handle(
739        ctx: Ctx<impl Blockstore>,
740        (cid,): Self::Params,
741        _: &http::Extensions,
742    ) -> Result<Self::Ok, ServerError> {
743        Ok(ctx.store().get(&cid)?.is_some())
744    }
745}
746
747/// Returns statistics about the graph referenced by 'obj'.
748/// If 'base' is also specified, then the returned stat will be a diff between the two objects.
749pub enum ChainStatObj {}
750impl RpcMethod<2> for ChainStatObj {
751    const NAME: &'static str = "Filecoin.ChainStatObj";
752    const PARAM_NAMES: [&'static str; 2] = ["obj_cid", "base_cid"];
753    const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
754    const PERMISSION: Permission = Permission::Read;
755
756    type Params = (Cid, Option<Cid>);
757    type Ok = ObjStat;
758
759    async fn handle(
760        ctx: Ctx<impl Blockstore>,
761        (obj_cid, base_cid): Self::Params,
762        _: &http::Extensions,
763    ) -> Result<Self::Ok, ServerError> {
764        let mut stats = ObjStat::default();
765        let mut seen = CidHashSet::default();
766        let mut walk = |cid, collect| {
767            let mut queue = VecDeque::new();
768            queue.push_back(cid);
769            while let Some(link_cid) = queue.pop_front() {
770                if !seen.insert(link_cid) {
771                    continue;
772                }
773                let data = ctx.store().get(&link_cid)?;
774                if let Some(data) = data {
775                    if collect {
776                        stats.links += 1;
777                        stats.size += data.len();
778                    }
779                    if matches!(link_cid.codec(), fvm_ipld_encoding::DAG_CBOR)
780                        && let Ok(ipld) =
781                            crate::utils::encoding::from_slice_with_fallback::<Ipld>(&data)
782                    {
783                        for ipld in DfsIter::new(ipld) {
784                            if let Ipld::Link(cid) = ipld {
785                                queue.push_back(cid);
786                            }
787                        }
788                    }
789                }
790            }
791            anyhow::Ok(())
792        };
793        if let Some(base_cid) = base_cid {
794            walk(base_cid, false)?;
795        }
796        walk(obj_cid, true)?;
797        Ok(stats)
798    }
799}
800
801pub enum ChainGetBlockMessages {}
802impl RpcMethod<1> for ChainGetBlockMessages {
803    const NAME: &'static str = "Filecoin.ChainGetBlockMessages";
804    const PARAM_NAMES: [&'static str; 1] = ["blockCid"];
805    const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
806    const PERMISSION: Permission = Permission::Read;
807    const DESCRIPTION: Option<&'static str> =
808        Some("Returns all messages from the specified block.");
809
810    type Params = (Cid,);
811    type Ok = BlockMessages;
812
813    async fn handle(
814        ctx: Ctx<impl Blockstore>,
815        (block_cid,): Self::Params,
816        _: &http::Extensions,
817    ) -> Result<Self::Ok, ServerError> {
818        let blk: CachingBlockHeader = ctx.store().get_cbor_required(&block_cid)?;
819        let (unsigned_cids, signed_cids) = crate::chain::read_msg_cids(ctx.store(), &blk)?;
820        let (bls_msg, secp_msg) =
821            crate::chain::block_messages_from_cids(ctx.store(), &unsigned_cids, &signed_cids)?;
822        let cids = unsigned_cids.into_iter().chain(signed_cids).collect();
823
824        let ret = BlockMessages {
825            bls_msg,
826            secp_msg,
827            cids,
828        };
829        Ok(ret)
830    }
831}
832
833pub enum ChainGetPath {}
834impl RpcMethod<2> for ChainGetPath {
835    const NAME: &'static str = "Filecoin.ChainGetPath";
836    const PARAM_NAMES: [&'static str; 2] = ["from", "to"];
837    const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
838    const PERMISSION: Permission = Permission::Read;
839    const DESCRIPTION: Option<&'static str> =
840        Some("Returns the path between the two specified tipsets.");
841
842    type Params = (TipsetKey, TipsetKey);
843    type Ok = Vec<PathChange>;
844
845    async fn handle(
846        ctx: Ctx<impl Blockstore>,
847        (from, to): Self::Params,
848        _: &http::Extensions,
849    ) -> Result<Self::Ok, ServerError> {
850        impl_chain_get_path(ctx.chain_store(), &from, &to).map_err(Into::into)
851    }
852}
853
854/// Find the path between two tipsets, as a series of [`PathChange`]s.
855///
856/// ```text
857/// 0 - A - B - C - D
858///     ^~~~~~~~> apply B, C
859///
860/// 0 - A - B - C - D
861///     <~~~~~~~^ revert C, B
862///
863///     <~~~~~~~~ revert C, B
864/// 0 - A - B  - C
865///     |
866///      -- B' - C'
867///      ~~~~~~~~> then apply B', C'
868/// ```
869///
870/// Exposes errors from the [`Blockstore`], and returns an error if there is no common ancestor.
871fn impl_chain_get_path(
872    chain_store: &ChainStore<impl Blockstore>,
873    from: &TipsetKey,
874    to: &TipsetKey,
875) -> anyhow::Result<Vec<PathChange>> {
876    let mut to_revert = chain_store
877        .load_required_tipset_or_heaviest(from)
878        .context("couldn't load `from`")?;
879    let mut to_apply = chain_store
880        .load_required_tipset_or_heaviest(to)
881        .context("couldn't load `to`")?;
882
883    let mut all_reverts = vec![];
884    let mut all_applies = vec![];
885
886    // This loop is guaranteed to terminate if the blockstore contain no cycles.
887    // This is currently computationally infeasible.
888    while to_revert != to_apply {
889        if to_revert.epoch() > to_apply.epoch() {
890            let next = chain_store
891                .load_required_tipset_or_heaviest(to_revert.parents())
892                .context("couldn't load ancestor of `from`")?;
893            all_reverts.push(to_revert);
894            to_revert = next;
895        } else {
896            let next = chain_store
897                .load_required_tipset_or_heaviest(to_apply.parents())
898                .context("couldn't load ancestor of `to`")?;
899            all_applies.push(to_apply);
900            to_apply = next;
901        }
902    }
903    Ok(all_reverts
904        .into_iter()
905        .map(PathChange::Revert)
906        .chain(all_applies.into_iter().rev().map(PathChange::Apply))
907        .collect())
908}
909
910/// Get tipset at epoch. Pick younger tipset if epoch points to a
911/// null-tipset. Only tipsets below the given `head` are searched. If `head`
912/// is null, the node will use the heaviest tipset.
913pub enum ChainGetTipSetByHeight {}
914impl RpcMethod<2> for ChainGetTipSetByHeight {
915    const NAME: &'static str = "Filecoin.ChainGetTipSetByHeight";
916    const PARAM_NAMES: [&'static str; 2] = ["height", "tipsetKey"];
917    const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
918    const PERMISSION: Permission = Permission::Read;
919    const DESCRIPTION: Option<&'static str> = Some("Returns the tipset at the specified height.");
920
921    type Params = (ChainEpoch, ApiTipsetKey);
922    type Ok = Tipset;
923
924    async fn handle(
925        ctx: Ctx<impl Blockstore>,
926        (height, ApiTipsetKey(tipset_key)): Self::Params,
927        _: &http::Extensions,
928    ) -> Result<Self::Ok, ServerError> {
929        let ts = ctx
930            .chain_store()
931            .load_required_tipset_or_heaviest(&tipset_key)?;
932        let tss = ctx
933            .chain_index()
934            .tipset_by_height(height, ts, ResolveNullTipset::TakeOlder)?;
935        Ok(tss)
936    }
937}
938
939pub enum ChainGetTipSetAfterHeight {}
940impl RpcMethod<2> for ChainGetTipSetAfterHeight {
941    const NAME: &'static str = "Filecoin.ChainGetTipSetAfterHeight";
942    const PARAM_NAMES: [&'static str; 2] = ["height", "tipsetKey"];
943    const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
944    const PERMISSION: Permission = Permission::Read;
945    const DESCRIPTION: Option<&'static str> = Some(
946        "Looks back and returns the tipset at the specified epoch.
947    If there are no blocks at the given epoch,
948    returns the first non-nil tipset at a later epoch.",
949    );
950
951    type Params = (ChainEpoch, ApiTipsetKey);
952    type Ok = Tipset;
953
954    async fn handle(
955        ctx: Ctx<impl Blockstore>,
956        (height, ApiTipsetKey(tipset_key)): Self::Params,
957        _: &http::Extensions,
958    ) -> Result<Self::Ok, ServerError> {
959        let ts = ctx
960            .chain_store()
961            .load_required_tipset_or_heaviest(&tipset_key)?;
962        let tss = ctx
963            .chain_index()
964            .tipset_by_height(height, ts, ResolveNullTipset::TakeNewer)?;
965        Ok(tss)
966    }
967}
968
969pub enum ChainGetGenesis {}
970impl RpcMethod<0> for ChainGetGenesis {
971    const NAME: &'static str = "Filecoin.ChainGetGenesis";
972    const PARAM_NAMES: [&'static str; 0] = [];
973    const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
974    const PERMISSION: Permission = Permission::Read;
975
976    type Params = ();
977    type Ok = Option<Tipset>;
978
979    async fn handle(
980        ctx: Ctx<impl Blockstore>,
981        (): Self::Params,
982        _: &http::Extensions,
983    ) -> Result<Self::Ok, ServerError> {
984        let genesis = ctx.chain_store().genesis_block_header();
985        Ok(Some(Tipset::from(genesis)))
986    }
987}
988
989pub enum ChainHead {}
990impl RpcMethod<0> for ChainHead {
991    const NAME: &'static str = "Filecoin.ChainHead";
992    const PARAM_NAMES: [&'static str; 0] = [];
993    const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
994    const PERMISSION: Permission = Permission::Read;
995    const DESCRIPTION: Option<&'static str> = Some("Returns the chain head (heaviest tipset).");
996
997    type Params = ();
998    type Ok = Tipset;
999
1000    async fn handle(
1001        ctx: Ctx<impl Blockstore>,
1002        (): Self::Params,
1003        _: &http::Extensions,
1004    ) -> Result<Self::Ok, ServerError> {
1005        let heaviest = ctx.chain_store().heaviest_tipset();
1006        Ok(heaviest)
1007    }
1008}
1009
1010pub enum ChainGetBlock {}
1011impl RpcMethod<1> for ChainGetBlock {
1012    const NAME: &'static str = "Filecoin.ChainGetBlock";
1013    const PARAM_NAMES: [&'static str; 1] = ["blockCid"];
1014    const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
1015    const PERMISSION: Permission = Permission::Read;
1016    const DESCRIPTION: Option<&'static str> = Some("Returns the block with the specified CID.");
1017
1018    type Params = (Cid,);
1019    type Ok = CachingBlockHeader;
1020
1021    async fn handle(
1022        ctx: Ctx<impl Blockstore>,
1023        (block_cid,): Self::Params,
1024        _: &http::Extensions,
1025    ) -> Result<Self::Ok, ServerError> {
1026        let blk: CachingBlockHeader = ctx.store().get_cbor_required(&block_cid)?;
1027        Ok(blk)
1028    }
1029}
1030
1031pub enum ChainGetTipSet {}
1032
1033impl RpcMethod<1> for ChainGetTipSet {
1034    const NAME: &'static str = "Filecoin.ChainGetTipSet";
1035    const PARAM_NAMES: [&'static str; 1] = ["tipsetKey"];
1036    const API_PATHS: BitFlags<ApiPaths> = make_bitflags!(ApiPaths::{ V0 | V1 });
1037    const PERMISSION: Permission = Permission::Read;
1038    const DESCRIPTION: Option<&'static str> = Some("Returns the tipset with the specified CID.");
1039
1040    type Params = (ApiTipsetKey,);
1041    type Ok = Tipset;
1042
1043    async fn handle(
1044        ctx: Ctx<impl Blockstore>,
1045        (ApiTipsetKey(tsk),): Self::Params,
1046        _: &http::Extensions,
1047    ) -> Result<Self::Ok, ServerError> {
1048        if let Some(tsk) = &tsk {
1049            let ts = ctx.chain_index().load_required_tipset(tsk)?;
1050            Ok(ts)
1051        } else {
1052            // It contains Lotus error message `NewTipSet called with zero length array of blocks` for parity tests
1053            Err(anyhow::anyhow!(
1054                "TipsetKey cannot be empty (NewTipSet called with zero length array of blocks)"
1055            )
1056            .into())
1057        }
1058    }
1059}
1060
1061pub enum ChainGetTipSetV2 {}
1062
1063impl ChainGetTipSetV2 {
1064    pub async fn get_tipset_by_anchor(
1065        ctx: &Ctx<impl Blockstore + Send + Sync + 'static>,
1066        anchor: &Option<TipsetAnchor>,
1067    ) -> anyhow::Result<Tipset> {
1068        if let Some(anchor) = anchor {
1069            match (&anchor.key.0, &anchor.tag) {
1070                // Anchor is zero-valued. Fall back to heaviest tipset.
1071                (None, None) => Ok(ctx.state_manager.heaviest_tipset()),
1072                // Get tipset at the specified key.
1073                (Some(tsk), None) => Ok(ctx.chain_index().load_required_tipset(tsk)?),
1074                (None, Some(tag)) => Self::get_tipset_by_tag(ctx, *tag).await,
1075                _ => {
1076                    anyhow::bail!("invalid anchor")
1077                }
1078            }
1079        } else {
1080            // No anchor specified. Fall back to finalized tipset.
1081            Self::get_tipset_by_tag(ctx, TipsetTag::Finalized).await
1082        }
1083    }
1084
1085    pub async fn get_tipset_by_tag(
1086        ctx: &Ctx<impl Blockstore + Send + Sync + 'static>,
1087        tag: TipsetTag,
1088    ) -> anyhow::Result<Tipset> {
1089        match tag {
1090            TipsetTag::Latest => Ok(ctx.state_manager.heaviest_tipset()),
1091            TipsetTag::Finalized => Self::get_latest_finalized_tipset(ctx).await,
1092            TipsetTag::Safe => Self::get_latest_safe_tipset(ctx).await,
1093        }
1094    }
1095
1096    pub async fn get_latest_safe_tipset(
1097        ctx: &Ctx<impl Blockstore + Send + Sync + 'static>,
1098    ) -> anyhow::Result<Tipset> {
1099        let finalized = Self::get_latest_finalized_tipset(ctx).await?;
1100        let head = ctx.chain_store().heaviest_tipset();
1101        let safe_height = (head.epoch() - SAFE_HEIGHT_DISTANCE).max(0);
1102        if finalized.epoch() >= safe_height {
1103            Ok(finalized)
1104        } else {
1105            Ok(ctx.chain_index().tipset_by_height(
1106                safe_height,
1107                head,
1108                ResolveNullTipset::TakeOlder,
1109            )?)
1110        }
1111    }
1112
1113    pub async fn get_latest_finalized_tipset(
1114        ctx: &Ctx<impl Blockstore + Send + Sync + 'static>,
1115    ) -> anyhow::Result<Tipset> {
1116        let Ok(f3_finalized_cert) = crate::rpc::f3::F3GetLatestCertificate::get().await else {
1117            return Self::get_ec_finalized_tipset(ctx);
1118        };
1119
1120        let f3_finalized_head = f3_finalized_cert.chain_head();
1121        let head = ctx.chain_store().heaviest_tipset();
1122        // Latest F3 finalized tipset is older than EC finality, falling back to EC finality
1123        if head.epoch() > f3_finalized_head.epoch + ctx.chain_config().policy.chain_finality {
1124            return Self::get_ec_finalized_tipset(ctx);
1125        }
1126
1127        let ts = ctx
1128            .chain_index()
1129            .load_required_tipset(&f3_finalized_head.key)
1130            .map_err(|e| {
1131                anyhow::anyhow!(
1132                    "Failed to load F3 finalized tipset at epoch {} with key {}: {e}",
1133                    f3_finalized_head.epoch,
1134                    f3_finalized_head.key,
1135                )
1136            })?;
1137        Ok(ts)
1138    }
1139
1140    pub fn get_ec_finalized_tipset(ctx: &Ctx<impl Blockstore>) -> anyhow::Result<Tipset> {
1141        let head = ctx.chain_store().heaviest_tipset();
1142        let ec_finality_epoch = (head.epoch() - ctx.chain_config().policy.chain_finality).max(0);
1143        Ok(ctx.chain_index().tipset_by_height(
1144            ec_finality_epoch,
1145            head,
1146            ResolveNullTipset::TakeOlder,
1147        )?)
1148    }
1149
1150    pub async fn get_tipset(
1151        ctx: &Ctx<impl Blockstore + Send + Sync + 'static>,
1152        selector: &TipsetSelector,
1153    ) -> anyhow::Result<Tipset> {
1154        selector.validate()?;
1155        // Get tipset by key.
1156        if let ApiTipsetKey(Some(tsk)) = &selector.key {
1157            let ts = ctx.chain_index().load_required_tipset(tsk)?;
1158            return Ok(ts);
1159        }
1160        // Get tipset by height.
1161        if let Some(height) = &selector.height {
1162            let anchor = Self::get_tipset_by_anchor(ctx, &height.anchor).await?;
1163            let ts = ctx.chain_index().tipset_by_height(
1164                height.at,
1165                anchor,
1166                height.resolve_null_tipset_policy(),
1167            )?;
1168            return Ok(ts);
1169        }
1170        // Get tipset by tag, either latest or finalized.
1171        if let Some(tag) = &selector.tag {
1172            let ts = Self::get_tipset_by_tag(ctx, *tag).await?;
1173            return Ok(ts);
1174        }
1175        anyhow::bail!("no tipset found for selector")
1176    }
1177}
1178
1179impl RpcMethod<1> for ChainGetTipSetV2 {
1180    const NAME: &'static str = "Filecoin.ChainGetTipSet";
1181    const PARAM_NAMES: [&'static str; 1] = ["tipsetSelector"];
1182    const API_PATHS: BitFlags<ApiPaths> = make_bitflags!(ApiPaths::{ V2 });
1183    const PERMISSION: Permission = Permission::Read;
1184    const DESCRIPTION: Option<&'static str> = Some("Returns the tipset with the specified CID.");
1185
1186    type Params = (TipsetSelector,);
1187    type Ok = Tipset;
1188
1189    async fn handle(
1190        ctx: Ctx<impl Blockstore + Send + Sync + 'static>,
1191        (selector,): Self::Params,
1192        _: &http::Extensions,
1193    ) -> Result<Self::Ok, ServerError> {
1194        Ok(Self::get_tipset(&ctx, &selector).await?)
1195    }
1196}
1197
1198pub enum ChainSetHead {}
1199impl RpcMethod<1> for ChainSetHead {
1200    const NAME: &'static str = "Filecoin.ChainSetHead";
1201    const PARAM_NAMES: [&'static str; 1] = ["tsk"];
1202    const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
1203    const PERMISSION: Permission = Permission::Admin;
1204
1205    type Params = (TipsetKey,);
1206    type Ok = ();
1207
1208    async fn handle(
1209        ctx: Ctx<impl Blockstore>,
1210        (tsk,): Self::Params,
1211        _: &http::Extensions,
1212    ) -> Result<Self::Ok, ServerError> {
1213        // This is basically a port of the reference implementation at
1214        // https://github.com/filecoin-project/lotus/blob/v1.23.0/node/impl/full/chain.go#L321
1215
1216        let new_head = ctx.chain_index().load_required_tipset(&tsk)?;
1217        let mut current = ctx.chain_store().heaviest_tipset();
1218        while current.epoch() >= new_head.epoch() {
1219            for cid in current.key().to_cids() {
1220                ctx.chain_store().unmark_block_as_validated(&cid);
1221            }
1222            let parents = &current.block_headers().first().parents;
1223            current = ctx.chain_index().load_required_tipset(parents)?;
1224        }
1225        ctx.chain_store()
1226            .set_heaviest_tipset(new_head)
1227            .map_err(Into::into)
1228    }
1229}
1230
1231pub enum ChainGetMinBaseFee {}
1232impl RpcMethod<1> for ChainGetMinBaseFee {
1233    const NAME: &'static str = "Forest.ChainGetMinBaseFee";
1234    const PARAM_NAMES: [&'static str; 1] = ["lookback"];
1235    const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
1236    const PERMISSION: Permission = Permission::Read;
1237
1238    type Params = (u32,);
1239    type Ok = String;
1240
1241    async fn handle(
1242        ctx: Ctx<impl Blockstore>,
1243        (lookback,): Self::Params,
1244        _: &http::Extensions,
1245    ) -> Result<Self::Ok, ServerError> {
1246        let mut current = ctx.chain_store().heaviest_tipset();
1247        let mut min_base_fee = current.block_headers().first().parent_base_fee.clone();
1248
1249        for _ in 0..lookback {
1250            let parents = &current.block_headers().first().parents;
1251            current = ctx.chain_index().load_required_tipset(parents)?;
1252
1253            min_base_fee =
1254                min_base_fee.min(current.block_headers().first().parent_base_fee.to_owned());
1255        }
1256
1257        Ok(min_base_fee.atto().to_string())
1258    }
1259}
1260
1261pub enum ChainTipSetWeight {}
1262impl RpcMethod<1> for ChainTipSetWeight {
1263    const NAME: &'static str = "Filecoin.ChainTipSetWeight";
1264    const PARAM_NAMES: [&'static str; 1] = ["tipsetKey"];
1265    const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
1266    const PERMISSION: Permission = Permission::Read;
1267    const DESCRIPTION: Option<&'static str> = Some("Returns the weight of the specified tipset.");
1268
1269    type Params = (ApiTipsetKey,);
1270    type Ok = BigInt;
1271
1272    async fn handle(
1273        ctx: Ctx<impl Blockstore>,
1274        (ApiTipsetKey(tipset_key),): Self::Params,
1275        _: &http::Extensions,
1276    ) -> Result<Self::Ok, ServerError> {
1277        let ts = ctx
1278            .chain_store()
1279            .load_required_tipset_or_heaviest(&tipset_key)?;
1280        let weight = crate::fil_cns::weight(ctx.store(), &ts)?;
1281        Ok(weight)
1282    }
1283}
1284
1285pub enum ChainGetTipsetByParentState {}
1286impl RpcMethod<1> for ChainGetTipsetByParentState {
1287    const NAME: &'static str = "Forest.ChainGetTipsetByParentState";
1288    const PARAM_NAMES: [&'static str; 1] = ["parentState"];
1289    const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
1290    const PERMISSION: Permission = Permission::Read;
1291
1292    type Params = (Cid,);
1293    type Ok = Option<Tipset>;
1294
1295    async fn handle(
1296        ctx: Ctx<impl Blockstore>,
1297        (parent_state,): Self::Params,
1298        _: &http::Extensions,
1299    ) -> Result<Self::Ok, ServerError> {
1300        Ok(ctx
1301            .chain_store()
1302            .heaviest_tipset()
1303            .chain(ctx.store())
1304            .find(|ts| ts.parent_state() == &parent_state)
1305            .clone())
1306    }
1307}
1308
1309pub const CHAIN_NOTIFY: &str = "Filecoin.ChainNotify";
1310pub(crate) fn chain_notify<DB: Blockstore>(
1311    _params: Params<'_>,
1312    data: &crate::rpc::RPCState<DB>,
1313) -> Subscriber<Vec<ApiHeadChange>> {
1314    let (sender, receiver) = broadcast::channel(HEAD_CHANNEL_CAPACITY);
1315
1316    // As soon as the channel is created, send the current tipset
1317    let current = data.chain_store().heaviest_tipset();
1318    let (change, tipset) = ("current".into(), current);
1319    sender
1320        .send(vec![ApiHeadChange { change, tipset }])
1321        .expect("receiver is not dropped");
1322
1323    let mut subscriber = data.chain_store().publisher().subscribe();
1324
1325    tokio::spawn(async move {
1326        // Skip first message
1327        let _ = subscriber.recv().await;
1328
1329        while let Ok(v) = subscriber.recv().await {
1330            let (change, tipset) = match v {
1331                HeadChange::Apply(ts) => ("apply".into(), ts),
1332            };
1333
1334            if sender.send(vec![ApiHeadChange { change, tipset }]).is_err() {
1335                break;
1336            }
1337        }
1338    });
1339    receiver
1340}
1341
1342async fn load_api_messages_from_tipset<DB: Blockstore + Send + Sync + 'static>(
1343    ctx: &crate::rpc::RPCState<DB>,
1344    tipset_keys: &TipsetKey,
1345) -> Result<Vec<ApiMessage>, ServerError> {
1346    static SHOULD_BACKFILL: LazyLock<bool> = LazyLock::new(|| {
1347        let enabled = is_env_truthy("FOREST_RPC_BACKFILL_FULL_TIPSET_FROM_NETWORK");
1348        if enabled {
1349            tracing::warn!(
1350                "Full tipset backfilling from network is enabled via FOREST_RPC_BACKFILL_FULL_TIPSET_FROM_NETWORK, excessive disk and bandwidth usage is expected."
1351            );
1352        }
1353        enabled
1354    });
1355    let full_tipset = if *SHOULD_BACKFILL {
1356        get_full_tipset(
1357            &ctx.sync_network_context,
1358            ctx.chain_store(),
1359            None,
1360            tipset_keys,
1361        )
1362        .await?
1363    } else {
1364        load_full_tipset(ctx.chain_store(), tipset_keys)?
1365    };
1366    let blocks = full_tipset.into_blocks();
1367    let mut messages = vec![];
1368    let mut seen = CidHashSet::default();
1369    for Block {
1370        bls_messages,
1371        secp_messages,
1372        ..
1373    } in blocks
1374    {
1375        for message in bls_messages {
1376            let cid = message.cid();
1377            if seen.insert(cid) {
1378                messages.push(ApiMessage { cid, message });
1379            }
1380        }
1381
1382        for msg in secp_messages {
1383            let cid = msg.cid();
1384            if seen.insert(cid) {
1385                messages.push(ApiMessage {
1386                    cid,
1387                    message: msg.message,
1388                });
1389            }
1390        }
1391    }
1392
1393    Ok(messages)
1394}
1395
1396#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, JsonSchema)]
1397pub struct BlockMessages {
1398    #[serde(rename = "BlsMessages", with = "crate::lotus_json")]
1399    #[schemars(with = "LotusJson<Vec<Message>>")]
1400    pub bls_msg: Vec<Message>,
1401    #[serde(rename = "SecpkMessages", with = "crate::lotus_json")]
1402    #[schemars(with = "LotusJson<Vec<SignedMessage>>")]
1403    pub secp_msg: Vec<SignedMessage>,
1404    #[serde(rename = "Cids", with = "crate::lotus_json")]
1405    #[schemars(with = "LotusJson<Vec<Cid>>")]
1406    pub cids: Vec<Cid>,
1407}
1408lotus_json_with_self!(BlockMessages);
1409
1410#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, JsonSchema)]
1411#[serde(rename_all = "PascalCase")]
1412pub struct ApiReceipt {
1413    // Exit status of message execution
1414    pub exit_code: ExitCode,
1415    // `Return` value if the exit code is zero
1416    #[serde(rename = "Return", with = "crate::lotus_json")]
1417    #[schemars(with = "LotusJson<RawBytes>")]
1418    pub return_data: RawBytes,
1419    // Non-negative value of GasUsed
1420    pub gas_used: u64,
1421    #[serde(with = "crate::lotus_json")]
1422    #[schemars(with = "LotusJson<Option<Cid>>")]
1423    pub events_root: Option<Cid>,
1424}
1425
1426lotus_json_with_self!(ApiReceipt);
1427
1428#[derive(Serialize, Deserialize, JsonSchema, Clone, Debug, Eq, PartialEq)]
1429#[serde(rename_all = "PascalCase")]
1430pub struct ApiMessage {
1431    #[serde(with = "crate::lotus_json")]
1432    #[schemars(with = "LotusJson<Cid>")]
1433    pub cid: Cid,
1434    #[serde(with = "crate::lotus_json")]
1435    #[schemars(with = "LotusJson<Message>")]
1436    pub message: Message,
1437}
1438
1439lotus_json_with_self!(ApiMessage);
1440
1441#[derive(Serialize, Deserialize, JsonSchema, Clone, Debug, Eq, PartialEq)]
1442pub struct FlattenedApiMessage {
1443    #[serde(flatten, with = "crate::lotus_json")]
1444    #[schemars(with = "LotusJson<Message>")]
1445    pub message: Message,
1446    #[serde(rename = "CID", with = "crate::lotus_json")]
1447    #[schemars(with = "LotusJson<Cid>")]
1448    pub cid: Cid,
1449}
1450
1451lotus_json_with_self!(FlattenedApiMessage);
1452
1453#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
1454pub struct ForestChainExportParams {
1455    pub version: FilecoinSnapshotVersion,
1456    pub epoch: ChainEpoch,
1457    pub recent_roots: i64,
1458    pub output_path: PathBuf,
1459    #[schemars(with = "LotusJson<ApiTipsetKey>")]
1460    #[serde(with = "crate::lotus_json")]
1461    pub tipset_keys: ApiTipsetKey,
1462    #[serde(default)]
1463    pub include_receipts: bool,
1464    #[serde(default)]
1465    pub include_events: bool,
1466    pub skip_checksum: bool,
1467    pub dry_run: bool,
1468}
1469lotus_json_with_self!(ForestChainExportParams);
1470
1471#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
1472pub struct ForestChainExportDiffParams {
1473    pub from: ChainEpoch,
1474    pub to: ChainEpoch,
1475    pub depth: i64,
1476    pub output_path: PathBuf,
1477}
1478lotus_json_with_self!(ForestChainExportDiffParams);
1479
1480#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
1481pub struct ChainExportParams {
1482    pub epoch: ChainEpoch,
1483    pub recent_roots: i64,
1484    pub output_path: PathBuf,
1485    #[schemars(with = "LotusJson<ApiTipsetKey>")]
1486    #[serde(with = "crate::lotus_json")]
1487    pub tipset_keys: ApiTipsetKey,
1488    pub skip_checksum: bool,
1489    pub dry_run: bool,
1490}
1491lotus_json_with_self!(ChainExportParams);
1492
1493#[derive(PartialEq, Debug, Serialize, Deserialize, Clone, JsonSchema)]
1494#[serde(rename_all = "PascalCase")]
1495pub struct ApiHeadChange {
1496    #[serde(rename = "Type")]
1497    pub change: String,
1498    #[serde(rename = "Val", with = "crate::lotus_json")]
1499    #[schemars(with = "LotusJson<Tipset>")]
1500    pub tipset: Tipset,
1501}
1502lotus_json_with_self!(ApiHeadChange);
1503
1504#[derive(PartialEq, Debug, Serialize, Deserialize, Clone, JsonSchema)]
1505#[serde(tag = "Type", content = "Val", rename_all = "snake_case")]
1506pub enum PathChange<T = Tipset> {
1507    Revert(T),
1508    Apply(T),
1509}
1510impl HasLotusJson for PathChange {
1511    type LotusJson = PathChange<<Tipset as HasLotusJson>::LotusJson>;
1512
1513    #[cfg(test)]
1514    fn snapshots() -> Vec<(serde_json::Value, Self)> {
1515        use serde_json::json;
1516        vec![(
1517            json!({
1518                "Type": "revert",
1519                "Val": {
1520                    "Blocks": [
1521                        {
1522                            "BeaconEntries": null,
1523                            "ForkSignaling": 0,
1524                            "Height": 0,
1525                            "Messages": { "/": "baeaaaaa" },
1526                            "Miner": "f00",
1527                            "ParentBaseFee": "0",
1528                            "ParentMessageReceipts": { "/": "baeaaaaa" },
1529                            "ParentStateRoot": { "/":"baeaaaaa" },
1530                            "ParentWeight": "0",
1531                            "Parents": [{"/":"bafyreiaqpwbbyjo4a42saasj36kkrpv4tsherf2e7bvezkert2a7dhonoi"}],
1532                            "Timestamp": 0,
1533                            "WinPoStProof": null
1534                        }
1535                    ],
1536                    "Cids": [
1537                        { "/": "bafy2bzaceag62hjj3o43lf6oyeox3fvg5aqkgl5zagbwpjje3ajwg6yw4iixk" }
1538                    ],
1539                    "Height": 0
1540                }
1541            }),
1542            Self::Revert(RawBlockHeader::default().into()),
1543        )]
1544    }
1545
1546    fn into_lotus_json(self) -> Self::LotusJson {
1547        match self {
1548            PathChange::Revert(it) => PathChange::Revert(it.into_lotus_json()),
1549            PathChange::Apply(it) => PathChange::Apply(it.into_lotus_json()),
1550        }
1551    }
1552
1553    fn from_lotus_json(lotus_json: Self::LotusJson) -> Self {
1554        match lotus_json {
1555            PathChange::Revert(it) => PathChange::Revert(Tipset::from_lotus_json(it)),
1556            PathChange::Apply(it) => PathChange::Apply(Tipset::from_lotus_json(it)),
1557        }
1558    }
1559}
1560
1561#[cfg(test)]
1562impl<T> quickcheck::Arbitrary for PathChange<T>
1563where
1564    T: quickcheck::Arbitrary,
1565{
1566    fn arbitrary(g: &mut quickcheck::Gen) -> Self {
1567        let inner = T::arbitrary(g);
1568        g.choose(&[PathChange::Apply(inner.clone()), PathChange::Revert(inner)])
1569            .unwrap()
1570            .clone()
1571    }
1572}
1573
1574#[test]
1575fn snapshots() {
1576    assert_all_snapshots::<PathChange>()
1577}
1578
1579#[cfg(test)]
1580quickcheck::quickcheck! {
1581    fn quickcheck(val: PathChange) -> () {
1582        assert_unchanged_via_json(val)
1583    }
1584}
1585
1586#[cfg(test)]
1587mod tests {
1588    use super::*;
1589    use crate::{
1590        blocks::{Chain4U, RawBlockHeader, chain4u},
1591        db::{
1592            MemoryDB,
1593            car::{AnyCar, ManyCar},
1594        },
1595        networks::{self, ChainConfig},
1596    };
1597    use PathChange::{Apply, Revert};
1598    use itertools::Itertools as _;
1599    use std::sync::Arc;
1600
1601    #[test]
1602    fn revert_to_ancestor_linear() {
1603        let store = ChainStore::calibnet();
1604        chain4u! {
1605            in store.blockstore();
1606            [_genesis = store.genesis_block_header()]
1607            -> [a] -> [b] -> [c, d] -> [e]
1608        };
1609
1610        // simple
1611        assert_path_change(&store, b, a, [Revert(&[b])]);
1612
1613        // from multi-member tipset
1614        assert_path_change(&store, [c, d], a, [Revert(&[c, d][..]), Revert(&[b])]);
1615
1616        // to multi-member tipset
1617        assert_path_change(&store, e, [c, d], [Revert(e)]);
1618
1619        // over multi-member tipset
1620        assert_path_change(&store, e, b, [Revert(&[e][..]), Revert(&[c, d])]);
1621    }
1622
1623    /// Mirror how lotus handles passing an incomplete `TipsetKey`s.
1624    /// Tested on lotus `1.23.2`
1625    #[test]
1626    fn incomplete_tipsets() {
1627        let store = ChainStore::calibnet();
1628        chain4u! {
1629            in store.blockstore();
1630            [_genesis = store.genesis_block_header()]
1631            -> [a, b] -> [c] -> [d, _e] // this pattern 2 -> 1 -> 2 can be found at calibnet epoch 1369126
1632        };
1633
1634        // apply to descendant with incomplete `from`
1635        assert_path_change(
1636            &store,
1637            a,
1638            c,
1639            [
1640                Revert(&[a][..]), // revert the incomplete tipset
1641                Apply(&[a, b]),   // apply the complete one
1642                Apply(&[c]),      // apply the destination
1643            ],
1644        );
1645
1646        // apply to descendant with incomplete `to`
1647        assert_path_change(&store, c, d, [Apply(d)]);
1648
1649        // revert to ancestor with incomplete `from`
1650        assert_path_change(&store, d, c, [Revert(d)]);
1651
1652        // revert to ancestor with incomplete `to`
1653        assert_path_change(
1654            &store,
1655            c,
1656            a,
1657            [
1658                Revert(&[c][..]),
1659                Revert(&[a, b]), // revert the complete tipset
1660                Apply(&[a]),     // apply the incomplete one
1661            ],
1662        );
1663    }
1664
1665    #[test]
1666    fn apply_to_descendant_linear() {
1667        let store = ChainStore::calibnet();
1668        chain4u! {
1669            in store.blockstore();
1670            [_genesis = store.genesis_block_header()]
1671            -> [a] -> [b] -> [c, d] -> [e]
1672        };
1673
1674        // simple
1675        assert_path_change(&store, a, b, [Apply(&[b])]);
1676
1677        // from multi-member tipset
1678        assert_path_change(&store, [c, d], e, [Apply(e)]);
1679
1680        // to multi-member tipset
1681        assert_path_change(&store, b, [c, d], [Apply([c, d])]);
1682
1683        // over multi-member tipset
1684        assert_path_change(&store, b, e, [Apply(&[c, d][..]), Apply(&[e])]);
1685    }
1686
1687    #[test]
1688    fn cross_fork_simple() {
1689        let store = ChainStore::calibnet();
1690        chain4u! {
1691            in store.blockstore();
1692            [_genesis = store.genesis_block_header()]
1693            -> [a] -> [b1] -> [c1]
1694        };
1695        chain4u! {
1696            from [a] in store.blockstore();
1697            [b2] -> [c2]
1698        };
1699
1700        // same height
1701        assert_path_change(&store, b1, b2, [Revert(b1), Apply(b2)]);
1702
1703        // different height
1704        assert_path_change(&store, b1, c2, [Revert(b1), Apply(b2), Apply(c2)]);
1705
1706        let _ = (a, c1);
1707    }
1708
1709    impl ChainStore<Chain4U<ManyCar>> {
1710        fn _load(genesis_car: &'static [u8], genesis_cid: Cid) -> Self {
1711            let db = Arc::new(Chain4U::with_blockstore(
1712                ManyCar::new(MemoryDB::default())
1713                    .with_read_only(AnyCar::new(genesis_car).unwrap())
1714                    .unwrap(),
1715            ));
1716            let genesis_block_header = db.get_cbor(&genesis_cid).unwrap().unwrap();
1717            ChainStore::new(
1718                db,
1719                Arc::new(MemoryDB::default()),
1720                Arc::new(MemoryDB::default()),
1721                Arc::new(ChainConfig::calibnet()),
1722                genesis_block_header,
1723            )
1724            .unwrap()
1725        }
1726        pub fn calibnet() -> Self {
1727            Self::_load(
1728                networks::calibnet::DEFAULT_GENESIS,
1729                *networks::calibnet::GENESIS_CID,
1730            )
1731        }
1732    }
1733
1734    /// Utility for writing ergonomic tests
1735    trait MakeTipset {
1736        fn make_tipset(self) -> Tipset;
1737    }
1738
1739    impl MakeTipset for &RawBlockHeader {
1740        fn make_tipset(self) -> Tipset {
1741            Tipset::from(CachingBlockHeader::new(self.clone()))
1742        }
1743    }
1744
1745    impl<const N: usize> MakeTipset for [&RawBlockHeader; N] {
1746        fn make_tipset(self) -> Tipset {
1747            self.as_slice().make_tipset()
1748        }
1749    }
1750
1751    impl<const N: usize> MakeTipset for &[&RawBlockHeader; N] {
1752        fn make_tipset(self) -> Tipset {
1753            self.as_slice().make_tipset()
1754        }
1755    }
1756
1757    impl MakeTipset for &[&RawBlockHeader] {
1758        fn make_tipset(self) -> Tipset {
1759            Tipset::new(self.iter().cloned().cloned()).unwrap()
1760        }
1761    }
1762
1763    #[track_caller]
1764    fn assert_path_change<T: MakeTipset>(
1765        store: &ChainStore<impl Blockstore>,
1766        from: impl MakeTipset,
1767        to: impl MakeTipset,
1768        expected: impl IntoIterator<Item = PathChange<T>>,
1769    ) {
1770        fn print(path_change: &PathChange) {
1771            let it = match path_change {
1772                Revert(it) => {
1773                    print!("Revert(");
1774                    it
1775                }
1776                Apply(it) => {
1777                    print!(" Apply(");
1778                    it
1779                }
1780            };
1781            println!(
1782                "epoch = {}, key.cid = {})",
1783                it.epoch(),
1784                it.key().cid().unwrap()
1785            )
1786        }
1787
1788        let actual =
1789            impl_chain_get_path(store, from.make_tipset().key(), to.make_tipset().key()).unwrap();
1790        let expected = expected
1791            .into_iter()
1792            .map(|change| match change {
1793                PathChange::Revert(it) => PathChange::Revert(it.make_tipset()),
1794                PathChange::Apply(it) => PathChange::Apply(it.make_tipset()),
1795            })
1796            .collect_vec();
1797        if expected != actual {
1798            println!("SUMMARY");
1799            println!("=======");
1800            println!("expected:");
1801            for it in &expected {
1802                print(it)
1803            }
1804            println!();
1805            println!("actual:");
1806            for it in &actual {
1807                print(it)
1808            }
1809            println!("=======\n")
1810        }
1811        assert_eq!(
1812            expected, actual,
1813            "expected change (left) does not match actual change (right)"
1814        )
1815    }
1816}