1pub mod types;
5use enumflags2::{BitFlags, make_bitflags};
6use types::*;
7
8#[cfg(test)]
9use crate::blocks::RawBlockHeader;
10use crate::blocks::{Block, CachingBlockHeader, Tipset, TipsetKey};
11use crate::chain::index::ResolveNullTipset;
12use crate::chain::{ChainStore, ExportOptions, FilecoinSnapshotVersion, HeadChange};
13use crate::chain_sync::{get_full_tipset, load_full_tipset};
14use crate::cid_collections::CidHashSet;
15use crate::ipld::DfsIter;
16use crate::ipld::{CHAIN_EXPORT_STATUS, cancel_export, end_export, start_export};
17use crate::lotus_json::{HasLotusJson, LotusJson, lotus_json_with_self};
18#[cfg(test)]
19use crate::lotus_json::{assert_all_snapshots, assert_unchanged_via_json};
20use crate::message::{ChainMessage, SignedMessage};
21use crate::rpc::eth::Block as EthBlock;
22use crate::rpc::eth::{
23 EthLog, TxInfo, eth_logs_with_filter, types::ApiHeaders, types::EthFilterSpec,
24};
25use crate::rpc::f3::F3ExportLatestSnapshot;
26use crate::rpc::types::*;
27use crate::rpc::{ApiPaths, Ctx, EthEventHandler, Permission, RpcMethod, ServerError};
28use crate::shim::clock::ChainEpoch;
29use crate::shim::error::ExitCode;
30use crate::shim::executor::Receipt;
31use crate::shim::message::Message;
32use crate::utils::db::CborStoreExt as _;
33use crate::utils::io::VoidAsyncWriter;
34use crate::utils::misc::env::is_env_truthy;
35use anyhow::{Context as _, Result};
36use cid::Cid;
37use fvm_ipld_blockstore::Blockstore;
38use fvm_ipld_encoding::{CborStore, RawBytes};
39use hex::ToHex;
40use ipld_core::ipld::Ipld;
41use jsonrpsee::types::Params;
42use jsonrpsee::types::error::ErrorObjectOwned;
43use num::BigInt;
44use schemars::JsonSchema;
45use serde::{Deserialize, Serialize};
46use sha2::Sha256;
47use std::fs::File;
48use std::{collections::VecDeque, path::PathBuf, sync::LazyLock};
49use tokio::sync::{
50 Mutex,
51 broadcast::{self, Receiver as Subscriber},
52};
53use tokio::task::JoinHandle;
54use tokio_util::sync::CancellationToken;
55
56const HEAD_CHANNEL_CAPACITY: usize = 10;
57
58const SAFE_HEIGHT_DISTANCE: ChainEpoch = 200;
72
73static CHAIN_EXPORT_LOCK: LazyLock<Mutex<Option<CancellationToken>>> =
74 LazyLock::new(|| Mutex::new(None));
75
76pub(crate) fn new_heads<DB: Blockstore + Send + Sync + 'static>(
83 data: Ctx<DB>,
84) -> (Subscriber<ApiHeaders>, JoinHandle<()>) {
85 let (sender, receiver) = broadcast::channel(HEAD_CHANNEL_CAPACITY);
86
87 let mut subscriber = data.chain_store().publisher().subscribe();
88
89 let handle = tokio::spawn(async move {
90 while let Ok(v) = subscriber.recv().await {
91 let headers = match v {
92 HeadChange::Apply(ts) => {
93 match EthBlock::from_filecoin_tipset(data.clone(), ts, TxInfo::Full).await {
96 Ok(block) => ApiHeaders(block),
97 Err(e) => {
98 tracing::error!("Failed to convert tipset to eth block: {}", e);
99 continue;
100 }
101 }
102 }
103 };
104 if let Err(e) = sender.send(headers) {
105 tracing::error!("Failed to send headers: {}", e);
106 break;
107 }
108 }
109 });
110
111 (receiver, handle)
112}
113
114pub(crate) fn logs<DB: Blockstore + Sync + Send + 'static>(
121 ctx: &Ctx<DB>,
122 filter: Option<EthFilterSpec>,
123) -> (Subscriber<Vec<EthLog>>, JoinHandle<()>) {
124 let (sender, receiver) = broadcast::channel(HEAD_CHANNEL_CAPACITY);
125
126 let mut subscriber = ctx.chain_store().publisher().subscribe();
127
128 let ctx = ctx.clone();
129
130 let handle = tokio::spawn(async move {
131 while let Ok(v) = subscriber.recv().await {
132 match v {
133 HeadChange::Apply(ts) => {
134 match eth_logs_with_filter(&ctx, &ts, filter.clone(), None).await {
135 Ok(logs) => {
136 if !logs.is_empty()
137 && let Err(e) = sender.send(logs)
138 {
139 tracing::error!(
140 "Failed to send logs for tipset {}: {}",
141 ts.key(),
142 e
143 );
144 break;
145 }
146 }
147 Err(e) => {
148 tracing::error!("Failed to fetch logs for tipset {}: {}", ts.key(), e);
149 }
150 }
151 }
152 }
153 }
154 });
155
156 (receiver, handle)
157}
158
159pub enum ChainGetFinalizedTipset {}
160impl RpcMethod<0> for ChainGetFinalizedTipset {
161 const NAME: &'static str = "Filecoin.ChainGetFinalizedTipSet";
162 const PARAM_NAMES: [&'static str; 0] = [];
163 const API_PATHS: BitFlags<ApiPaths> = make_bitflags!(ApiPaths::V1);
164 const PERMISSION: Permission = Permission::Read;
165 const DESCRIPTION: Option<&'static str> = Some(
166 "Returns the latest F3 finalized tipset, or falls back to EC finality if F3 is not operational on the node or if the F3 finalized tipset is further back than EC finalized tipset.",
167 );
168
169 type Params = ();
170 type Ok = Tipset;
171
172 async fn handle(
173 ctx: Ctx<impl Blockstore + Send + Sync + 'static>,
174 (): Self::Params,
175 _: &http::Extensions,
176 ) -> Result<Self::Ok, ServerError> {
177 let head = ctx.chain_store().heaviest_tipset();
178 let ec_finality_epoch = (head.epoch() - ctx.chain_config().policy.chain_finality).max(0);
179
180 match get_f3_finality_tipset(&ctx, ec_finality_epoch).await {
182 Ok(f3_tipset) => {
183 tracing::debug!("Using F3 finalized tipset at epoch {}", f3_tipset.epoch());
184 Ok(f3_tipset)
185 }
186 Err(_) => {
187 tracing::warn!("F3 finalization unavailable, falling back to EC finality");
189 let ec_tipset = ctx.chain_index().tipset_by_height(
190 ec_finality_epoch,
191 head,
192 ResolveNullTipset::TakeOlder,
193 )?;
194 Ok(ec_tipset)
195 }
196 }
197 }
198}
199
200async fn get_f3_finality_tipset<DB: Blockstore + Sync + Send + 'static>(
202 ctx: &Ctx<DB>,
203 ec_finality_epoch: ChainEpoch,
204) -> Result<Tipset> {
205 let f3_finalized_cert = crate::rpc::f3::F3GetLatestCertificate::get()
206 .await
207 .map_err(|e| anyhow::anyhow!("Failed to get F3 certificate: {}", e))?;
208
209 let f3_finalized_head = f3_finalized_cert.chain_head();
210 if f3_finalized_head.epoch < ec_finality_epoch {
211 return Err(anyhow::anyhow!(
212 "F3 finalized tipset epoch {} is further back than EC finalized tipset epoch {}",
213 f3_finalized_head.epoch,
214 ec_finality_epoch
215 ));
216 }
217
218 ctx.chain_index()
219 .load_required_tipset(&f3_finalized_head.key)
220 .map_err(|e| {
221 anyhow::anyhow!(
222 "Failed to load F3 finalized tipset at epoch {}: {}",
223 f3_finalized_head.epoch,
224 e
225 )
226 })
227}
228
229pub enum ChainGetMessage {}
230impl RpcMethod<1> for ChainGetMessage {
231 const NAME: &'static str = "Filecoin.ChainGetMessage";
232 const PARAM_NAMES: [&'static str; 1] = ["messageCid"];
233 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
234 const PERMISSION: Permission = Permission::Read;
235 const DESCRIPTION: Option<&'static str> = Some("Returns the message with the specified CID.");
236
237 type Params = (Cid,);
238 type Ok = FlattenedApiMessage;
239
240 async fn handle(
241 ctx: Ctx<impl Blockstore>,
242 (message_cid,): Self::Params,
243 _: &http::Extensions,
244 ) -> Result<Self::Ok, ServerError> {
245 let chain_message: ChainMessage = ctx
246 .store()
247 .get_cbor(&message_cid)?
248 .with_context(|| format!("can't find message with cid {message_cid}"))?;
249 let message = match chain_message {
250 ChainMessage::Signed(m) => m.into_message(),
251 ChainMessage::Unsigned(m) => m,
252 };
253
254 let cid = message.cid();
255 Ok(FlattenedApiMessage { message, cid })
256 }
257}
258
259pub enum ChainGetEvents {}
262impl RpcMethod<1> for ChainGetEvents {
263 const NAME: &'static str = "Filecoin.ChainGetEvents";
264 const PARAM_NAMES: [&'static str; 1] = ["rootCid"];
265 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
266 const PERMISSION: Permission = Permission::Read;
267 const DESCRIPTION: Option<&'static str> =
268 Some("Returns the events under the given event AMT root CID.");
269
270 type Params = (Cid,);
271 type Ok = Vec<Event>;
272 async fn handle(
273 ctx: Ctx<impl Blockstore + Send + Sync + 'static>,
274 (root_cid,): Self::Params,
275 _: &http::Extensions,
276 ) -> Result<Self::Ok, ServerError> {
277 let events = EthEventHandler::get_events_by_event_root(&ctx, &root_cid)?;
278 Ok(events)
279 }
280}
281
282pub enum ChainGetParentMessages {}
283impl RpcMethod<1> for ChainGetParentMessages {
284 const NAME: &'static str = "Filecoin.ChainGetParentMessages";
285 const PARAM_NAMES: [&'static str; 1] = ["blockCid"];
286 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
287 const PERMISSION: Permission = Permission::Read;
288 const DESCRIPTION: Option<&'static str> =
289 Some("Returns the messages included in the blocks of the parent tipset.");
290
291 type Params = (Cid,);
292 type Ok = Vec<ApiMessage>;
293
294 async fn handle(
295 ctx: Ctx<impl Blockstore + Send + Sync + 'static>,
296 (block_cid,): Self::Params,
297 _: &http::Extensions,
298 ) -> Result<Self::Ok, ServerError> {
299 let store = ctx.store();
300 let block_header: CachingBlockHeader = store
301 .get_cbor(&block_cid)?
302 .with_context(|| format!("can't find block header with cid {block_cid}"))?;
303 if block_header.epoch == 0 {
304 Ok(vec![])
305 } else {
306 let parent_tipset = Tipset::load_required(store, &block_header.parents)?;
307 load_api_messages_from_tipset(&ctx, parent_tipset.key()).await
308 }
309 }
310}
311
312pub enum ChainGetParentReceipts {}
313impl RpcMethod<1> for ChainGetParentReceipts {
314 const NAME: &'static str = "Filecoin.ChainGetParentReceipts";
315 const PARAM_NAMES: [&'static str; 1] = ["blockCid"];
316 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
317 const PERMISSION: Permission = Permission::Read;
318 const DESCRIPTION: Option<&'static str> =
319 Some("Returns the message receipts included in the blocks of the parent tipset.");
320
321 type Params = (Cid,);
322 type Ok = Vec<ApiReceipt>;
323
324 async fn handle(
325 ctx: Ctx<impl Blockstore>,
326 (block_cid,): Self::Params,
327 _: &http::Extensions,
328 ) -> Result<Self::Ok, ServerError> {
329 let store = ctx.store();
330 let block_header: CachingBlockHeader = store
331 .get_cbor(&block_cid)?
332 .with_context(|| format!("can't find block header with cid {block_cid}"))?;
333 if block_header.epoch == 0 {
334 return Ok(vec![]);
335 }
336 let receipts = Receipt::get_receipts(store, block_header.message_receipts)
337 .map_err(|_| {
338 ErrorObjectOwned::owned::<()>(
339 1,
340 format!(
341 "failed to root: ipld: could not find {}",
342 block_header.message_receipts
343 ),
344 None,
345 )
346 })?
347 .iter()
348 .map(|r| ApiReceipt {
349 exit_code: r.exit_code().into(),
350 return_data: r.return_data(),
351 gas_used: r.gas_used(),
352 events_root: r.events_root(),
353 })
354 .collect();
355
356 Ok(receipts)
357 }
358}
359
360pub enum ChainGetMessagesInTipset {}
361impl RpcMethod<1> for ChainGetMessagesInTipset {
362 const NAME: &'static str = "Filecoin.ChainGetMessagesInTipset";
363 const PARAM_NAMES: [&'static str; 1] = ["tipsetKey"];
364 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
365 const PERMISSION: Permission = Permission::Read;
366
367 type Params = (ApiTipsetKey,);
368 type Ok = Vec<ApiMessage>;
369
370 async fn handle(
371 ctx: Ctx<impl Blockstore + Send + Sync + 'static>,
372 (ApiTipsetKey(tipset_key),): Self::Params,
373 _: &http::Extensions,
374 ) -> Result<Self::Ok, ServerError> {
375 let tipset = ctx
376 .chain_store()
377 .load_required_tipset_or_heaviest(&tipset_key)?;
378 load_api_messages_from_tipset(&ctx, tipset.key()).await
379 }
380}
381
382pub enum ChainPruneSnapshot {}
383impl RpcMethod<1> for ChainPruneSnapshot {
384 const NAME: &'static str = "Forest.SnapshotGC";
385 const PARAM_NAMES: [&'static str; 1] = ["blocking"];
386 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
387 const PERMISSION: Permission = Permission::Admin;
388
389 type Params = (bool,);
390 type Ok = ();
391
392 async fn handle(
393 _ctx: Ctx<impl Blockstore + Send + Sync + 'static>,
394 (blocking,): Self::Params,
395 _: &http::Extensions,
396 ) -> Result<Self::Ok, ServerError> {
397 if let Some(gc) = crate::daemon::GLOBAL_SNAPSHOT_GC.get() {
398 let progress_rx = gc.trigger()?;
399 while blocking && progress_rx.recv_async().await.is_ok() {}
400 Ok(())
401 } else {
402 Err(anyhow::anyhow!("snapshot gc is not enabled").into())
403 }
404 }
405}
406
407pub enum ForestChainExport {}
408impl RpcMethod<1> for ForestChainExport {
409 const NAME: &'static str = "Forest.ChainExport";
410 const PARAM_NAMES: [&'static str; 1] = ["params"];
411 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all_with_v2();
412 const PERMISSION: Permission = Permission::Read;
413
414 type Params = (ForestChainExportParams,);
415 type Ok = ApiExportResult;
416
417 async fn handle(
418 ctx: Ctx<impl Blockstore + Send + Sync + 'static>,
419 (params,): Self::Params,
420 _: &http::Extensions,
421 ) -> Result<Self::Ok, ServerError> {
422 let ForestChainExportParams {
423 version,
424 epoch,
425 recent_roots,
426 output_path,
427 tipset_keys: ApiTipsetKey(tsk),
428 include_receipts,
429 include_events,
430 skip_checksum,
431 dry_run,
432 } = params;
433
434 let token = CancellationToken::new();
435 {
436 let mut guard = CHAIN_EXPORT_LOCK.lock().await;
437 if guard.is_some() {
438 return Err(
439 anyhow::anyhow!("A chain export is still in progress. Cancel it with the export-cancel subcommand if needed.").into(),
440 );
441 }
442 *guard = Some(token.clone());
443 }
444 start_export();
445
446 let head = ctx.chain_store().load_required_tipset_or_heaviest(&tsk)?;
447 let start_ts =
448 ctx.chain_index()
449 .tipset_by_height(epoch, head, ResolveNullTipset::TakeOlder)?;
450
451 let options = Some(ExportOptions {
452 skip_checksum,
453 include_receipts,
454 include_events,
455 seen: Default::default(),
456 });
457 let writer = if dry_run {
458 tokio_util::either::Either::Left(VoidAsyncWriter)
459 } else {
460 tokio_util::either::Either::Right(tokio::fs::File::create(&output_path).await?)
461 };
462 let result = match version {
463 FilecoinSnapshotVersion::V1 => {
464 let db = ctx.store_owned();
465
466 let chain_export =
467 crate::chain::export::<Sha256>(&db, &start_ts, recent_roots, writer, options);
468
469 tokio::select! {
470 result = chain_export => {
471 result.map(|checksum_opt| ApiExportResult::Done(checksum_opt.map(|hash| hash.encode_hex())))
472 },
473 _ = token.cancelled() => {
474 cancel_export();
475 tracing::warn!("Snapshot export was cancelled");
476 Ok(ApiExportResult::Cancelled)
477 },
478 }
479 }
480 FilecoinSnapshotVersion::V2 => {
481 let db = ctx.store_owned();
482
483 let f3_snap_tmp_path = {
484 let mut f3_snap_dir = output_path.clone();
485 let mut builder = tempfile::Builder::new();
486 let with_suffix = builder.suffix(".f3snap.bin");
487 if f3_snap_dir.pop() {
488 with_suffix.tempfile_in(&f3_snap_dir)
489 } else {
490 with_suffix.tempfile_in(".")
491 }?
492 .into_temp_path()
493 };
494 let f3_snap = {
495 match F3ExportLatestSnapshot::run(f3_snap_tmp_path.display().to_string()).await
496 {
497 Ok(cid) => Some((cid, File::open(&f3_snap_tmp_path)?)),
498 Err(e) => {
499 tracing::error!("Failed to export F3 snapshot: {e}");
500 None
501 }
502 }
503 };
504
505 let chain_export = crate::chain::export_v2::<Sha256, _>(
506 &db,
507 f3_snap,
508 &start_ts,
509 recent_roots,
510 writer,
511 options,
512 );
513
514 tokio::select! {
515 result = chain_export => {
516 result.map(|checksum_opt| ApiExportResult::Done(checksum_opt.map(|hash| hash.encode_hex())))
517 },
518 _ = token.cancelled() => {
519 cancel_export();
520 tracing::warn!("Snapshot export was cancelled");
521 Ok(ApiExportResult::Cancelled)
522 },
523 }
524 }
525 };
526 end_export();
527 let mut guard = CHAIN_EXPORT_LOCK.lock().await;
529 *guard = None;
530 match result {
531 Ok(export_result) => Ok(export_result),
532 Err(e) => Err(anyhow::anyhow!(e).into()),
533 }
534 }
535}
536
537pub enum ForestChainExportStatus {}
538impl RpcMethod<0> for ForestChainExportStatus {
539 const NAME: &'static str = "Forest.ChainExportStatus";
540 const PARAM_NAMES: [&'static str; 0] = [];
541 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all_with_v2();
542 const PERMISSION: Permission = Permission::Read;
543
544 type Params = ();
545 type Ok = ApiExportStatus;
546
547 async fn handle(
548 _ctx: Ctx<impl Blockstore>,
549 (): Self::Params,
550 _: &http::Extensions,
551 ) -> Result<Self::Ok, ServerError> {
552 let mutex = CHAIN_EXPORT_STATUS.lock();
553
554 let progress = if mutex.initial_epoch == 0 {
555 0.0
556 } else {
557 let p = 1.0 - ((mutex.epoch as f64) / (mutex.initial_epoch as f64));
558 if p.is_finite() {
559 p.clamp(0.0, 1.0)
560 } else {
561 0.0
562 }
563 };
564 let progress = (progress * 100.0).round() / 100.0;
566
567 let status = ApiExportStatus {
568 progress,
569 exporting: mutex.exporting,
570 cancelled: mutex.cancelled,
571 start_time: mutex.start_time,
572 };
573
574 Ok(status)
575 }
576}
577
578pub enum ForestChainExportCancel {}
579impl RpcMethod<0> for ForestChainExportCancel {
580 const NAME: &'static str = "Forest.ChainExportCancel";
581 const PARAM_NAMES: [&'static str; 0] = [];
582 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all_with_v2();
583 const PERMISSION: Permission = Permission::Read;
584
585 type Params = ();
586 type Ok = bool;
587
588 async fn handle(
589 _ctx: Ctx<impl Blockstore>,
590 (): Self::Params,
591 _: &http::Extensions,
592 ) -> Result<Self::Ok, ServerError> {
593 if let Some(token) = CHAIN_EXPORT_LOCK.lock().await.as_ref() {
594 token.cancel();
595 return Ok(true);
596 }
597
598 Ok(false)
599 }
600}
601
602pub enum ForestChainExportDiff {}
603impl RpcMethod<1> for ForestChainExportDiff {
604 const NAME: &'static str = "Forest.ChainExportDiff";
605 const PARAM_NAMES: [&'static str; 1] = ["params"];
606 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all_with_v2();
607 const PERMISSION: Permission = Permission::Read;
608
609 type Params = (ForestChainExportDiffParams,);
610 type Ok = ();
611
612 async fn handle(
613 ctx: Ctx<impl Blockstore + Send + Sync + 'static>,
614 (params,): Self::Params,
615 _: &http::Extensions,
616 ) -> Result<Self::Ok, ServerError> {
617 let ForestChainExportDiffParams {
618 from,
619 to,
620 depth,
621 output_path,
622 } = params;
623
624 let _locked = CHAIN_EXPORT_LOCK.try_lock();
625 if _locked.is_err() {
626 return Err(
627 anyhow::anyhow!("Another chain export diff job is still in progress").into(),
628 );
629 }
630
631 let chain_finality = ctx.chain_config().policy.chain_finality;
632 if depth < chain_finality {
633 return Err(
634 anyhow::anyhow!(format!("depth must be greater than {chain_finality}")).into(),
635 );
636 }
637
638 let head = ctx.chain_store().heaviest_tipset();
639 let start_ts =
640 ctx.chain_index()
641 .tipset_by_height(from, head, ResolveNullTipset::TakeOlder)?;
642
643 crate::tool::subcommands::archive_cmd::do_export(
644 &ctx.store_owned(),
645 start_ts,
646 output_path,
647 None,
648 depth,
649 Some(to),
650 Some(chain_finality),
651 true,
652 )
653 .await?;
654
655 Ok(())
656 }
657}
658
659pub enum ChainExport {}
660impl RpcMethod<1> for ChainExport {
661 const NAME: &'static str = "Filecoin.ChainExport";
662 const PARAM_NAMES: [&'static str; 1] = ["params"];
663 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
664 const PERMISSION: Permission = Permission::Read;
665
666 type Params = (ChainExportParams,);
667 type Ok = ApiExportResult;
668
669 async fn handle(
670 ctx: Ctx<impl Blockstore + Send + Sync + 'static>,
671 (ChainExportParams {
672 epoch,
673 recent_roots,
674 output_path,
675 tipset_keys,
676 skip_checksum,
677 dry_run,
678 },): Self::Params,
679 ext: &http::Extensions,
680 ) -> Result<Self::Ok, ServerError> {
681 ForestChainExport::handle(
682 ctx,
683 (ForestChainExportParams {
684 version: FilecoinSnapshotVersion::V1,
685 epoch,
686 recent_roots,
687 output_path,
688 tipset_keys,
689 include_receipts: false,
690 include_events: false,
691 skip_checksum,
692 dry_run,
693 },),
694 ext,
695 )
696 .await
697 }
698}
699
700pub enum ChainReadObj {}
701impl RpcMethod<1> for ChainReadObj {
702 const NAME: &'static str = "Filecoin.ChainReadObj";
703 const PARAM_NAMES: [&'static str; 1] = ["cid"];
704 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
705 const PERMISSION: Permission = Permission::Read;
706 const DESCRIPTION: Option<&'static str> = Some(
707 "Reads IPLD nodes referenced by the specified CID from the chain blockstore and returns raw bytes.",
708 );
709
710 type Params = (Cid,);
711 type Ok = Vec<u8>;
712
713 async fn handle(
714 ctx: Ctx<impl Blockstore>,
715 (cid,): Self::Params,
716 _: &http::Extensions,
717 ) -> Result<Self::Ok, ServerError> {
718 let bytes = ctx
719 .store()
720 .get(&cid)?
721 .with_context(|| format!("can't find object with cid={cid}"))?;
722 Ok(bytes)
723 }
724}
725
726pub enum ChainHasObj {}
727impl RpcMethod<1> for ChainHasObj {
728 const NAME: &'static str = "Filecoin.ChainHasObj";
729 const PARAM_NAMES: [&'static str; 1] = ["cid"];
730 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
731 const PERMISSION: Permission = Permission::Read;
732 const DESCRIPTION: Option<&'static str> =
733 Some("Checks if a given CID exists in the chain blockstore.");
734
735 type Params = (Cid,);
736 type Ok = bool;
737
738 async fn handle(
739 ctx: Ctx<impl Blockstore>,
740 (cid,): Self::Params,
741 _: &http::Extensions,
742 ) -> Result<Self::Ok, ServerError> {
743 Ok(ctx.store().get(&cid)?.is_some())
744 }
745}
746
747pub enum ChainStatObj {}
750impl RpcMethod<2> for ChainStatObj {
751 const NAME: &'static str = "Filecoin.ChainStatObj";
752 const PARAM_NAMES: [&'static str; 2] = ["obj_cid", "base_cid"];
753 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
754 const PERMISSION: Permission = Permission::Read;
755
756 type Params = (Cid, Option<Cid>);
757 type Ok = ObjStat;
758
759 async fn handle(
760 ctx: Ctx<impl Blockstore>,
761 (obj_cid, base_cid): Self::Params,
762 _: &http::Extensions,
763 ) -> Result<Self::Ok, ServerError> {
764 let mut stats = ObjStat::default();
765 let mut seen = CidHashSet::default();
766 let mut walk = |cid, collect| {
767 let mut queue = VecDeque::new();
768 queue.push_back(cid);
769 while let Some(link_cid) = queue.pop_front() {
770 if !seen.insert(link_cid) {
771 continue;
772 }
773 let data = ctx.store().get(&link_cid)?;
774 if let Some(data) = data {
775 if collect {
776 stats.links += 1;
777 stats.size += data.len();
778 }
779 if matches!(link_cid.codec(), fvm_ipld_encoding::DAG_CBOR)
780 && let Ok(ipld) =
781 crate::utils::encoding::from_slice_with_fallback::<Ipld>(&data)
782 {
783 for ipld in DfsIter::new(ipld) {
784 if let Ipld::Link(cid) = ipld {
785 queue.push_back(cid);
786 }
787 }
788 }
789 }
790 }
791 anyhow::Ok(())
792 };
793 if let Some(base_cid) = base_cid {
794 walk(base_cid, false)?;
795 }
796 walk(obj_cid, true)?;
797 Ok(stats)
798 }
799}
800
801pub enum ChainGetBlockMessages {}
802impl RpcMethod<1> for ChainGetBlockMessages {
803 const NAME: &'static str = "Filecoin.ChainGetBlockMessages";
804 const PARAM_NAMES: [&'static str; 1] = ["blockCid"];
805 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
806 const PERMISSION: Permission = Permission::Read;
807 const DESCRIPTION: Option<&'static str> =
808 Some("Returns all messages from the specified block.");
809
810 type Params = (Cid,);
811 type Ok = BlockMessages;
812
813 async fn handle(
814 ctx: Ctx<impl Blockstore>,
815 (block_cid,): Self::Params,
816 _: &http::Extensions,
817 ) -> Result<Self::Ok, ServerError> {
818 let blk: CachingBlockHeader = ctx.store().get_cbor_required(&block_cid)?;
819 let (unsigned_cids, signed_cids) = crate::chain::read_msg_cids(ctx.store(), &blk)?;
820 let (bls_msg, secp_msg) =
821 crate::chain::block_messages_from_cids(ctx.store(), &unsigned_cids, &signed_cids)?;
822 let cids = unsigned_cids.into_iter().chain(signed_cids).collect();
823
824 let ret = BlockMessages {
825 bls_msg,
826 secp_msg,
827 cids,
828 };
829 Ok(ret)
830 }
831}
832
833pub enum ChainGetPath {}
834impl RpcMethod<2> for ChainGetPath {
835 const NAME: &'static str = "Filecoin.ChainGetPath";
836 const PARAM_NAMES: [&'static str; 2] = ["from", "to"];
837 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
838 const PERMISSION: Permission = Permission::Read;
839 const DESCRIPTION: Option<&'static str> =
840 Some("Returns the path between the two specified tipsets.");
841
842 type Params = (TipsetKey, TipsetKey);
843 type Ok = Vec<PathChange>;
844
845 async fn handle(
846 ctx: Ctx<impl Blockstore>,
847 (from, to): Self::Params,
848 _: &http::Extensions,
849 ) -> Result<Self::Ok, ServerError> {
850 impl_chain_get_path(ctx.chain_store(), &from, &to).map_err(Into::into)
851 }
852}
853
854fn impl_chain_get_path(
872 chain_store: &ChainStore<impl Blockstore>,
873 from: &TipsetKey,
874 to: &TipsetKey,
875) -> anyhow::Result<Vec<PathChange>> {
876 let mut to_revert = chain_store
877 .load_required_tipset_or_heaviest(from)
878 .context("couldn't load `from`")?;
879 let mut to_apply = chain_store
880 .load_required_tipset_or_heaviest(to)
881 .context("couldn't load `to`")?;
882
883 let mut all_reverts = vec![];
884 let mut all_applies = vec![];
885
886 while to_revert != to_apply {
889 if to_revert.epoch() > to_apply.epoch() {
890 let next = chain_store
891 .load_required_tipset_or_heaviest(to_revert.parents())
892 .context("couldn't load ancestor of `from`")?;
893 all_reverts.push(to_revert);
894 to_revert = next;
895 } else {
896 let next = chain_store
897 .load_required_tipset_or_heaviest(to_apply.parents())
898 .context("couldn't load ancestor of `to`")?;
899 all_applies.push(to_apply);
900 to_apply = next;
901 }
902 }
903 Ok(all_reverts
904 .into_iter()
905 .map(PathChange::Revert)
906 .chain(all_applies.into_iter().rev().map(PathChange::Apply))
907 .collect())
908}
909
910pub enum ChainGetTipSetByHeight {}
914impl RpcMethod<2> for ChainGetTipSetByHeight {
915 const NAME: &'static str = "Filecoin.ChainGetTipSetByHeight";
916 const PARAM_NAMES: [&'static str; 2] = ["height", "tipsetKey"];
917 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
918 const PERMISSION: Permission = Permission::Read;
919 const DESCRIPTION: Option<&'static str> = Some("Returns the tipset at the specified height.");
920
921 type Params = (ChainEpoch, ApiTipsetKey);
922 type Ok = Tipset;
923
924 async fn handle(
925 ctx: Ctx<impl Blockstore>,
926 (height, ApiTipsetKey(tipset_key)): Self::Params,
927 _: &http::Extensions,
928 ) -> Result<Self::Ok, ServerError> {
929 let ts = ctx
930 .chain_store()
931 .load_required_tipset_or_heaviest(&tipset_key)?;
932 let tss = ctx
933 .chain_index()
934 .tipset_by_height(height, ts, ResolveNullTipset::TakeOlder)?;
935 Ok(tss)
936 }
937}
938
939pub enum ChainGetTipSetAfterHeight {}
940impl RpcMethod<2> for ChainGetTipSetAfterHeight {
941 const NAME: &'static str = "Filecoin.ChainGetTipSetAfterHeight";
942 const PARAM_NAMES: [&'static str; 2] = ["height", "tipsetKey"];
943 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
944 const PERMISSION: Permission = Permission::Read;
945 const DESCRIPTION: Option<&'static str> = Some(
946 "Looks back and returns the tipset at the specified epoch.
947 If there are no blocks at the given epoch,
948 returns the first non-nil tipset at a later epoch.",
949 );
950
951 type Params = (ChainEpoch, ApiTipsetKey);
952 type Ok = Tipset;
953
954 async fn handle(
955 ctx: Ctx<impl Blockstore>,
956 (height, ApiTipsetKey(tipset_key)): Self::Params,
957 _: &http::Extensions,
958 ) -> Result<Self::Ok, ServerError> {
959 let ts = ctx
960 .chain_store()
961 .load_required_tipset_or_heaviest(&tipset_key)?;
962 let tss = ctx
963 .chain_index()
964 .tipset_by_height(height, ts, ResolveNullTipset::TakeNewer)?;
965 Ok(tss)
966 }
967}
968
969pub enum ChainGetGenesis {}
970impl RpcMethod<0> for ChainGetGenesis {
971 const NAME: &'static str = "Filecoin.ChainGetGenesis";
972 const PARAM_NAMES: [&'static str; 0] = [];
973 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
974 const PERMISSION: Permission = Permission::Read;
975
976 type Params = ();
977 type Ok = Option<Tipset>;
978
979 async fn handle(
980 ctx: Ctx<impl Blockstore>,
981 (): Self::Params,
982 _: &http::Extensions,
983 ) -> Result<Self::Ok, ServerError> {
984 let genesis = ctx.chain_store().genesis_block_header();
985 Ok(Some(Tipset::from(genesis)))
986 }
987}
988
989pub enum ChainHead {}
990impl RpcMethod<0> for ChainHead {
991 const NAME: &'static str = "Filecoin.ChainHead";
992 const PARAM_NAMES: [&'static str; 0] = [];
993 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
994 const PERMISSION: Permission = Permission::Read;
995 const DESCRIPTION: Option<&'static str> = Some("Returns the chain head (heaviest tipset).");
996
997 type Params = ();
998 type Ok = Tipset;
999
1000 async fn handle(
1001 ctx: Ctx<impl Blockstore>,
1002 (): Self::Params,
1003 _: &http::Extensions,
1004 ) -> Result<Self::Ok, ServerError> {
1005 let heaviest = ctx.chain_store().heaviest_tipset();
1006 Ok(heaviest)
1007 }
1008}
1009
1010pub enum ChainGetBlock {}
1011impl RpcMethod<1> for ChainGetBlock {
1012 const NAME: &'static str = "Filecoin.ChainGetBlock";
1013 const PARAM_NAMES: [&'static str; 1] = ["blockCid"];
1014 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
1015 const PERMISSION: Permission = Permission::Read;
1016 const DESCRIPTION: Option<&'static str> = Some("Returns the block with the specified CID.");
1017
1018 type Params = (Cid,);
1019 type Ok = CachingBlockHeader;
1020
1021 async fn handle(
1022 ctx: Ctx<impl Blockstore>,
1023 (block_cid,): Self::Params,
1024 _: &http::Extensions,
1025 ) -> Result<Self::Ok, ServerError> {
1026 let blk: CachingBlockHeader = ctx.store().get_cbor_required(&block_cid)?;
1027 Ok(blk)
1028 }
1029}
1030
1031pub enum ChainGetTipSet {}
1032impl RpcMethod<1> for ChainGetTipSet {
1033 const NAME: &'static str = "Filecoin.ChainGetTipSet";
1034 const PARAM_NAMES: [&'static str; 1] = ["tipsetKey"];
1035 const API_PATHS: BitFlags<ApiPaths> = make_bitflags!(ApiPaths::{ V0 | V1 });
1036 const PERMISSION: Permission = Permission::Read;
1037 const DESCRIPTION: Option<&'static str> = Some("Returns the tipset with the specified CID.");
1038
1039 type Params = (ApiTipsetKey,);
1040 type Ok = Tipset;
1041
1042 async fn handle(
1043 ctx: Ctx<impl Blockstore>,
1044 (ApiTipsetKey(tsk),): Self::Params,
1045 _: &http::Extensions,
1046 ) -> Result<Self::Ok, ServerError> {
1047 if let Some(tsk) = &tsk {
1048 let ts = ctx.chain_index().load_required_tipset(tsk)?;
1049 Ok(ts)
1050 } else {
1051 Err(anyhow::anyhow!(
1053 "TipsetKey cannot be empty (NewTipSet called with zero length array of blocks)"
1054 )
1055 .into())
1056 }
1057 }
1058}
1059
1060pub enum ChainGetTipSetV2 {}
1061
1062impl ChainGetTipSetV2 {
1063 pub async fn get_tipset_by_anchor(
1064 ctx: &Ctx<impl Blockstore + Send + Sync + 'static>,
1065 anchor: &Option<TipsetAnchor>,
1066 ) -> anyhow::Result<Option<Tipset>> {
1067 if let Some(anchor) = anchor {
1068 match (&anchor.key.0, &anchor.tag) {
1069 (None, None) => Ok(Some(ctx.state_manager.heaviest_tipset())),
1071 (Some(tsk), None) => Ok(Some(ctx.chain_index().load_required_tipset(tsk)?)),
1073 (None, Some(tag)) => Self::get_tipset_by_tag(ctx, *tag).await,
1074 _ => {
1075 anyhow::bail!("invalid anchor")
1076 }
1077 }
1078 } else {
1079 Self::get_tipset_by_tag(ctx, TipsetTag::Finalized).await
1081 }
1082 }
1083
1084 pub async fn get_tipset_by_tag(
1085 ctx: &Ctx<impl Blockstore + Send + Sync + 'static>,
1086 tag: TipsetTag,
1087 ) -> anyhow::Result<Option<Tipset>> {
1088 match tag {
1089 TipsetTag::Latest => Ok(Some(ctx.state_manager.heaviest_tipset())),
1090 TipsetTag::Finalized => Self::get_latest_finalized_tipset(ctx).await,
1091 TipsetTag::Safe => Some(Self::get_latest_safe_tipset(ctx).await).transpose(),
1092 }
1093 }
1094
1095 pub async fn get_latest_safe_tipset(
1096 ctx: &Ctx<impl Blockstore + Send + Sync + 'static>,
1097 ) -> anyhow::Result<Tipset> {
1098 let finalized = Self::get_latest_finalized_tipset(ctx).await?;
1099 let head = ctx.chain_store().heaviest_tipset();
1100 let safe_height = (head.epoch() - SAFE_HEIGHT_DISTANCE).max(0);
1101 if let Some(finalized) = finalized
1102 && finalized.epoch() >= safe_height
1103 {
1104 Ok(finalized)
1105 } else {
1106 Ok(ctx.chain_index().tipset_by_height(
1107 safe_height,
1108 head,
1109 ResolveNullTipset::TakeOlder,
1110 )?)
1111 }
1112 }
1113
1114 pub async fn get_latest_finalized_tipset(
1115 ctx: &Ctx<impl Blockstore + Send + Sync + 'static>,
1116 ) -> anyhow::Result<Option<Tipset>> {
1117 let Ok(f3_finalized_cert) = crate::rpc::f3::F3GetLatestCertificate::get().await else {
1118 return Self::get_ec_finalized_tipset(ctx);
1119 };
1120
1121 let f3_finalized_head = f3_finalized_cert.chain_head();
1122 let head = ctx.chain_store().heaviest_tipset();
1123 if head.epoch() > f3_finalized_head.epoch + ctx.chain_config().policy.chain_finality {
1125 return Self::get_ec_finalized_tipset(ctx);
1126 }
1127
1128 let ts = ctx
1129 .chain_index()
1130 .load_required_tipset(&f3_finalized_head.key)
1131 .map_err(|e| {
1132 anyhow::anyhow!(
1133 "Failed to load F3 finalized tipset at epoch {} with key {}: {e}",
1134 f3_finalized_head.epoch,
1135 f3_finalized_head.key,
1136 )
1137 })?;
1138 Ok(Some(ts))
1139 }
1140
1141 pub fn get_ec_finalized_tipset(ctx: &Ctx<impl Blockstore>) -> anyhow::Result<Option<Tipset>> {
1142 let head = ctx.chain_store().heaviest_tipset();
1143 let ec_finality_epoch = head.epoch() - ctx.chain_config().policy.chain_finality;
1144 if ec_finality_epoch >= 0 {
1145 let ts = ctx.chain_index().tipset_by_height(
1146 ec_finality_epoch,
1147 head,
1148 ResolveNullTipset::TakeOlder,
1149 )?;
1150 Ok(Some(ts))
1151 } else {
1152 Ok(None)
1153 }
1154 }
1155
1156 pub async fn get_tipset(
1157 ctx: &Ctx<impl Blockstore + Send + Sync + 'static>,
1158 selector: &TipsetSelector,
1159 ) -> anyhow::Result<Option<Tipset>> {
1160 selector.validate()?;
1161 if let ApiTipsetKey(Some(tsk)) = &selector.key {
1163 let ts = ctx.chain_index().load_required_tipset(tsk)?;
1164 return Ok(Some(ts));
1165 }
1166 if let Some(height) = &selector.height {
1168 let anchor = Self::get_tipset_by_anchor(ctx, &height.anchor).await?;
1169 let ts = ctx.chain_index().tipset_by_height(
1170 height.at,
1171 anchor.unwrap_or_else(|| ctx.chain_store().heaviest_tipset()),
1172 height.resolve_null_tipset_policy(),
1173 )?;
1174 return Ok(Some(ts));
1175 }
1176 if let Some(tag) = &selector.tag {
1178 let ts = Self::get_tipset_by_tag(ctx, *tag).await?;
1179 return Ok(ts);
1180 }
1181 anyhow::bail!("no tipset found for selector")
1182 }
1183
1184 pub async fn get_required_tipset(
1185 ctx: &Ctx<impl Blockstore + Send + Sync + 'static>,
1186 selector: &TipsetSelector,
1187 ) -> anyhow::Result<Tipset> {
1188 Self::get_tipset(ctx, selector)
1189 .await?
1190 .context("failed to select a tipset")
1191 }
1192}
1193
1194impl RpcMethod<1> for ChainGetTipSetV2 {
1195 const NAME: &'static str = "Filecoin.ChainGetTipSet";
1196 const PARAM_NAMES: [&'static str; 1] = ["tipsetSelector"];
1197 const API_PATHS: BitFlags<ApiPaths> = make_bitflags!(ApiPaths::{ V2 });
1198 const PERMISSION: Permission = Permission::Read;
1199 const DESCRIPTION: Option<&'static str> = Some("Returns the tipset with the specified CID.");
1200
1201 type Params = (TipsetSelector,);
1202 type Ok = Option<Tipset>;
1203
1204 async fn handle(
1205 ctx: Ctx<impl Blockstore + Send + Sync + 'static>,
1206 (selector,): Self::Params,
1207 _: &http::Extensions,
1208 ) -> Result<Self::Ok, ServerError> {
1209 Ok(Self::get_tipset(&ctx, &selector).await?)
1210 }
1211}
1212
1213pub enum ChainSetHead {}
1214impl RpcMethod<1> for ChainSetHead {
1215 const NAME: &'static str = "Filecoin.ChainSetHead";
1216 const PARAM_NAMES: [&'static str; 1] = ["tsk"];
1217 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
1218 const PERMISSION: Permission = Permission::Admin;
1219
1220 type Params = (TipsetKey,);
1221 type Ok = ();
1222
1223 async fn handle(
1224 ctx: Ctx<impl Blockstore>,
1225 (tsk,): Self::Params,
1226 _: &http::Extensions,
1227 ) -> Result<Self::Ok, ServerError> {
1228 let new_head = ctx.chain_index().load_required_tipset(&tsk)?;
1232 let mut current = ctx.chain_store().heaviest_tipset();
1233 while current.epoch() >= new_head.epoch() {
1234 for cid in current.key().to_cids() {
1235 ctx.chain_store().unmark_block_as_validated(&cid);
1236 }
1237 let parents = ¤t.block_headers().first().parents;
1238 current = ctx.chain_index().load_required_tipset(parents)?;
1239 }
1240 ctx.chain_store()
1241 .set_heaviest_tipset(new_head)
1242 .map_err(Into::into)
1243 }
1244}
1245
1246pub enum ChainGetMinBaseFee {}
1247impl RpcMethod<1> for ChainGetMinBaseFee {
1248 const NAME: &'static str = "Forest.ChainGetMinBaseFee";
1249 const PARAM_NAMES: [&'static str; 1] = ["lookback"];
1250 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
1251 const PERMISSION: Permission = Permission::Read;
1252
1253 type Params = (u32,);
1254 type Ok = String;
1255
1256 async fn handle(
1257 ctx: Ctx<impl Blockstore>,
1258 (lookback,): Self::Params,
1259 _: &http::Extensions,
1260 ) -> Result<Self::Ok, ServerError> {
1261 let mut current = ctx.chain_store().heaviest_tipset();
1262 let mut min_base_fee = current.block_headers().first().parent_base_fee.clone();
1263
1264 for _ in 0..lookback {
1265 let parents = ¤t.block_headers().first().parents;
1266 current = ctx.chain_index().load_required_tipset(parents)?;
1267
1268 min_base_fee =
1269 min_base_fee.min(current.block_headers().first().parent_base_fee.to_owned());
1270 }
1271
1272 Ok(min_base_fee.atto().to_string())
1273 }
1274}
1275
1276pub enum ChainTipSetWeight {}
1277impl RpcMethod<1> for ChainTipSetWeight {
1278 const NAME: &'static str = "Filecoin.ChainTipSetWeight";
1279 const PARAM_NAMES: [&'static str; 1] = ["tipsetKey"];
1280 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
1281 const PERMISSION: Permission = Permission::Read;
1282 const DESCRIPTION: Option<&'static str> = Some("Returns the weight of the specified tipset.");
1283
1284 type Params = (ApiTipsetKey,);
1285 type Ok = BigInt;
1286
1287 async fn handle(
1288 ctx: Ctx<impl Blockstore>,
1289 (ApiTipsetKey(tipset_key),): Self::Params,
1290 _: &http::Extensions,
1291 ) -> Result<Self::Ok, ServerError> {
1292 let ts = ctx
1293 .chain_store()
1294 .load_required_tipset_or_heaviest(&tipset_key)?;
1295 let weight = crate::fil_cns::weight(ctx.store(), &ts)?;
1296 Ok(weight)
1297 }
1298}
1299
1300pub enum ChainGetTipsetByParentState {}
1301impl RpcMethod<1> for ChainGetTipsetByParentState {
1302 const NAME: &'static str = "Forest.ChainGetTipsetByParentState";
1303 const PARAM_NAMES: [&'static str; 1] = ["parentState"];
1304 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
1305 const PERMISSION: Permission = Permission::Read;
1306
1307 type Params = (Cid,);
1308 type Ok = Option<Tipset>;
1309
1310 async fn handle(
1311 ctx: Ctx<impl Blockstore>,
1312 (parent_state,): Self::Params,
1313 _: &http::Extensions,
1314 ) -> Result<Self::Ok, ServerError> {
1315 Ok(ctx
1316 .chain_store()
1317 .heaviest_tipset()
1318 .chain(ctx.store())
1319 .find(|ts| ts.parent_state() == &parent_state)
1320 .clone())
1321 }
1322}
1323
1324pub const CHAIN_NOTIFY: &str = "Filecoin.ChainNotify";
1325pub(crate) fn chain_notify<DB: Blockstore>(
1326 _params: Params<'_>,
1327 data: &crate::rpc::RPCState<DB>,
1328) -> Subscriber<Vec<ApiHeadChange>> {
1329 let (sender, receiver) = broadcast::channel(HEAD_CHANNEL_CAPACITY);
1330
1331 let current = data.chain_store().heaviest_tipset();
1333 let (change, tipset) = ("current".into(), current);
1334 sender
1335 .send(vec![ApiHeadChange { change, tipset }])
1336 .expect("receiver is not dropped");
1337
1338 let mut subscriber = data.chain_store().publisher().subscribe();
1339
1340 tokio::spawn(async move {
1341 let _ = subscriber.recv().await;
1343
1344 while let Ok(v) = subscriber.recv().await {
1345 let (change, tipset) = match v {
1346 HeadChange::Apply(ts) => ("apply".into(), ts),
1347 };
1348
1349 if sender.send(vec![ApiHeadChange { change, tipset }]).is_err() {
1350 break;
1351 }
1352 }
1353 });
1354 receiver
1355}
1356
1357async fn load_api_messages_from_tipset<DB: Blockstore + Send + Sync + 'static>(
1358 ctx: &crate::rpc::RPCState<DB>,
1359 tipset_keys: &TipsetKey,
1360) -> Result<Vec<ApiMessage>, ServerError> {
1361 static SHOULD_BACKFILL: LazyLock<bool> = LazyLock::new(|| {
1362 let enabled = is_env_truthy("FOREST_RPC_BACKFILL_FULL_TIPSET_FROM_NETWORK");
1363 if enabled {
1364 tracing::warn!(
1365 "Full tipset backfilling from network is enabled via FOREST_RPC_BACKFILL_FULL_TIPSET_FROM_NETWORK, excessive disk and bandwidth usage is expected."
1366 );
1367 }
1368 enabled
1369 });
1370 let full_tipset = if *SHOULD_BACKFILL {
1371 get_full_tipset(
1372 &ctx.sync_network_context,
1373 ctx.chain_store(),
1374 None,
1375 tipset_keys,
1376 )
1377 .await?
1378 } else {
1379 load_full_tipset(ctx.chain_store(), tipset_keys)?
1380 };
1381 let blocks = full_tipset.into_blocks();
1382 let mut messages = vec![];
1383 let mut seen = CidHashSet::default();
1384 for Block {
1385 bls_messages,
1386 secp_messages,
1387 ..
1388 } in blocks
1389 {
1390 for message in bls_messages {
1391 let cid = message.cid();
1392 if seen.insert(cid) {
1393 messages.push(ApiMessage { cid, message });
1394 }
1395 }
1396
1397 for msg in secp_messages {
1398 let cid = msg.cid();
1399 if seen.insert(cid) {
1400 messages.push(ApiMessage {
1401 cid,
1402 message: msg.message,
1403 });
1404 }
1405 }
1406 }
1407
1408 Ok(messages)
1409}
1410
1411#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, JsonSchema)]
1412pub struct BlockMessages {
1413 #[serde(rename = "BlsMessages", with = "crate::lotus_json")]
1414 #[schemars(with = "LotusJson<Vec<Message>>")]
1415 pub bls_msg: Vec<Message>,
1416 #[serde(rename = "SecpkMessages", with = "crate::lotus_json")]
1417 #[schemars(with = "LotusJson<Vec<SignedMessage>>")]
1418 pub secp_msg: Vec<SignedMessage>,
1419 #[serde(rename = "Cids", with = "crate::lotus_json")]
1420 #[schemars(with = "LotusJson<Vec<Cid>>")]
1421 pub cids: Vec<Cid>,
1422}
1423lotus_json_with_self!(BlockMessages);
1424
1425#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, JsonSchema)]
1426#[serde(rename_all = "PascalCase")]
1427pub struct ApiReceipt {
1428 pub exit_code: ExitCode,
1430 #[serde(rename = "Return", with = "crate::lotus_json")]
1432 #[schemars(with = "LotusJson<RawBytes>")]
1433 pub return_data: RawBytes,
1434 pub gas_used: u64,
1436 #[serde(with = "crate::lotus_json")]
1437 #[schemars(with = "LotusJson<Option<Cid>>")]
1438 pub events_root: Option<Cid>,
1439}
1440
1441lotus_json_with_self!(ApiReceipt);
1442
1443#[derive(Serialize, Deserialize, JsonSchema, Clone, Debug, Eq, PartialEq)]
1444#[serde(rename_all = "PascalCase")]
1445pub struct ApiMessage {
1446 #[serde(with = "crate::lotus_json")]
1447 #[schemars(with = "LotusJson<Cid>")]
1448 pub cid: Cid,
1449 #[serde(with = "crate::lotus_json")]
1450 #[schemars(with = "LotusJson<Message>")]
1451 pub message: Message,
1452}
1453
1454lotus_json_with_self!(ApiMessage);
1455
1456#[derive(Serialize, Deserialize, JsonSchema, Clone, Debug, Eq, PartialEq)]
1457pub struct FlattenedApiMessage {
1458 #[serde(flatten, with = "crate::lotus_json")]
1459 #[schemars(with = "LotusJson<Message>")]
1460 pub message: Message,
1461 #[serde(rename = "CID", with = "crate::lotus_json")]
1462 #[schemars(with = "LotusJson<Cid>")]
1463 pub cid: Cid,
1464}
1465
1466lotus_json_with_self!(FlattenedApiMessage);
1467
1468#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
1469pub struct ForestChainExportParams {
1470 pub version: FilecoinSnapshotVersion,
1471 pub epoch: ChainEpoch,
1472 pub recent_roots: i64,
1473 pub output_path: PathBuf,
1474 #[schemars(with = "LotusJson<ApiTipsetKey>")]
1475 #[serde(with = "crate::lotus_json")]
1476 pub tipset_keys: ApiTipsetKey,
1477 #[serde(default)]
1478 pub include_receipts: bool,
1479 #[serde(default)]
1480 pub include_events: bool,
1481 pub skip_checksum: bool,
1482 pub dry_run: bool,
1483}
1484lotus_json_with_self!(ForestChainExportParams);
1485
1486#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
1487pub struct ForestChainExportDiffParams {
1488 pub from: ChainEpoch,
1489 pub to: ChainEpoch,
1490 pub depth: i64,
1491 pub output_path: PathBuf,
1492}
1493lotus_json_with_self!(ForestChainExportDiffParams);
1494
1495#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
1496pub struct ChainExportParams {
1497 pub epoch: ChainEpoch,
1498 pub recent_roots: i64,
1499 pub output_path: PathBuf,
1500 #[schemars(with = "LotusJson<ApiTipsetKey>")]
1501 #[serde(with = "crate::lotus_json")]
1502 pub tipset_keys: ApiTipsetKey,
1503 pub skip_checksum: bool,
1504 pub dry_run: bool,
1505}
1506lotus_json_with_self!(ChainExportParams);
1507
1508#[derive(PartialEq, Debug, Serialize, Deserialize, Clone, JsonSchema)]
1509#[serde(rename_all = "PascalCase")]
1510pub struct ApiHeadChange {
1511 #[serde(rename = "Type")]
1512 pub change: String,
1513 #[serde(rename = "Val", with = "crate::lotus_json")]
1514 #[schemars(with = "LotusJson<Tipset>")]
1515 pub tipset: Tipset,
1516}
1517lotus_json_with_self!(ApiHeadChange);
1518
1519#[derive(PartialEq, Debug, Serialize, Deserialize, Clone, JsonSchema)]
1520#[serde(tag = "Type", content = "Val", rename_all = "snake_case")]
1521pub enum PathChange<T = Tipset> {
1522 Revert(T),
1523 Apply(T),
1524}
1525impl HasLotusJson for PathChange {
1526 type LotusJson = PathChange<<Tipset as HasLotusJson>::LotusJson>;
1527
1528 #[cfg(test)]
1529 fn snapshots() -> Vec<(serde_json::Value, Self)> {
1530 use serde_json::json;
1531 vec![(
1532 json!({
1533 "Type": "revert",
1534 "Val": {
1535 "Blocks": [
1536 {
1537 "BeaconEntries": null,
1538 "ForkSignaling": 0,
1539 "Height": 0,
1540 "Messages": { "/": "baeaaaaa" },
1541 "Miner": "f00",
1542 "ParentBaseFee": "0",
1543 "ParentMessageReceipts": { "/": "baeaaaaa" },
1544 "ParentStateRoot": { "/":"baeaaaaa" },
1545 "ParentWeight": "0",
1546 "Parents": [{"/":"bafyreiaqpwbbyjo4a42saasj36kkrpv4tsherf2e7bvezkert2a7dhonoi"}],
1547 "Timestamp": 0,
1548 "WinPoStProof": null
1549 }
1550 ],
1551 "Cids": [
1552 { "/": "bafy2bzaceag62hjj3o43lf6oyeox3fvg5aqkgl5zagbwpjje3ajwg6yw4iixk" }
1553 ],
1554 "Height": 0
1555 }
1556 }),
1557 Self::Revert(RawBlockHeader::default().into()),
1558 )]
1559 }
1560
1561 fn into_lotus_json(self) -> Self::LotusJson {
1562 match self {
1563 PathChange::Revert(it) => PathChange::Revert(it.into_lotus_json()),
1564 PathChange::Apply(it) => PathChange::Apply(it.into_lotus_json()),
1565 }
1566 }
1567
1568 fn from_lotus_json(lotus_json: Self::LotusJson) -> Self {
1569 match lotus_json {
1570 PathChange::Revert(it) => PathChange::Revert(Tipset::from_lotus_json(it)),
1571 PathChange::Apply(it) => PathChange::Apply(Tipset::from_lotus_json(it)),
1572 }
1573 }
1574}
1575
1576#[cfg(test)]
1577impl<T> quickcheck::Arbitrary for PathChange<T>
1578where
1579 T: quickcheck::Arbitrary,
1580{
1581 fn arbitrary(g: &mut quickcheck::Gen) -> Self {
1582 let inner = T::arbitrary(g);
1583 g.choose(&[PathChange::Apply(inner.clone()), PathChange::Revert(inner)])
1584 .unwrap()
1585 .clone()
1586 }
1587}
1588
1589#[test]
1590fn snapshots() {
1591 assert_all_snapshots::<PathChange>()
1592}
1593
1594#[cfg(test)]
1595quickcheck::quickcheck! {
1596 fn quickcheck(val: PathChange) -> () {
1597 assert_unchanged_via_json(val)
1598 }
1599}
1600
1601#[cfg(test)]
1602mod tests {
1603 use super::*;
1604 use crate::{
1605 blocks::{Chain4U, RawBlockHeader, chain4u},
1606 db::{
1607 MemoryDB,
1608 car::{AnyCar, ManyCar},
1609 },
1610 networks::{self, ChainConfig},
1611 };
1612 use PathChange::{Apply, Revert};
1613 use itertools::Itertools as _;
1614 use std::sync::Arc;
1615
1616 #[test]
1617 fn revert_to_ancestor_linear() {
1618 let store = ChainStore::calibnet();
1619 chain4u! {
1620 in store.blockstore();
1621 [_genesis = store.genesis_block_header()]
1622 -> [a] -> [b] -> [c, d] -> [e]
1623 };
1624
1625 assert_path_change(&store, b, a, [Revert(&[b])]);
1627
1628 assert_path_change(&store, [c, d], a, [Revert(&[c, d][..]), Revert(&[b])]);
1630
1631 assert_path_change(&store, e, [c, d], [Revert(e)]);
1633
1634 assert_path_change(&store, e, b, [Revert(&[e][..]), Revert(&[c, d])]);
1636 }
1637
1638 #[test]
1641 fn incomplete_tipsets() {
1642 let store = ChainStore::calibnet();
1643 chain4u! {
1644 in store.blockstore();
1645 [_genesis = store.genesis_block_header()]
1646 -> [a, b] -> [c] -> [d, _e] };
1648
1649 assert_path_change(
1651 &store,
1652 a,
1653 c,
1654 [
1655 Revert(&[a][..]), Apply(&[a, b]), Apply(&[c]), ],
1659 );
1660
1661 assert_path_change(&store, c, d, [Apply(d)]);
1663
1664 assert_path_change(&store, d, c, [Revert(d)]);
1666
1667 assert_path_change(
1669 &store,
1670 c,
1671 a,
1672 [
1673 Revert(&[c][..]),
1674 Revert(&[a, b]), Apply(&[a]), ],
1677 );
1678 }
1679
1680 #[test]
1681 fn apply_to_descendant_linear() {
1682 let store = ChainStore::calibnet();
1683 chain4u! {
1684 in store.blockstore();
1685 [_genesis = store.genesis_block_header()]
1686 -> [a] -> [b] -> [c, d] -> [e]
1687 };
1688
1689 assert_path_change(&store, a, b, [Apply(&[b])]);
1691
1692 assert_path_change(&store, [c, d], e, [Apply(e)]);
1694
1695 assert_path_change(&store, b, [c, d], [Apply([c, d])]);
1697
1698 assert_path_change(&store, b, e, [Apply(&[c, d][..]), Apply(&[e])]);
1700 }
1701
1702 #[test]
1703 fn cross_fork_simple() {
1704 let store = ChainStore::calibnet();
1705 chain4u! {
1706 in store.blockstore();
1707 [_genesis = store.genesis_block_header()]
1708 -> [a] -> [b1] -> [c1]
1709 };
1710 chain4u! {
1711 from [a] in store.blockstore();
1712 [b2] -> [c2]
1713 };
1714
1715 assert_path_change(&store, b1, b2, [Revert(b1), Apply(b2)]);
1717
1718 assert_path_change(&store, b1, c2, [Revert(b1), Apply(b2), Apply(c2)]);
1720
1721 let _ = (a, c1);
1722 }
1723
1724 impl ChainStore<Chain4U<ManyCar>> {
1725 fn _load(genesis_car: &'static [u8], genesis_cid: Cid) -> Self {
1726 let db = Arc::new(Chain4U::with_blockstore(
1727 ManyCar::new(MemoryDB::default())
1728 .with_read_only(AnyCar::new(genesis_car).unwrap())
1729 .unwrap(),
1730 ));
1731 let genesis_block_header = db.get_cbor(&genesis_cid).unwrap().unwrap();
1732 ChainStore::new(
1733 db,
1734 Arc::new(MemoryDB::default()),
1735 Arc::new(MemoryDB::default()),
1736 Arc::new(ChainConfig::calibnet()),
1737 genesis_block_header,
1738 )
1739 .unwrap()
1740 }
1741 pub fn calibnet() -> Self {
1742 Self::_load(
1743 networks::calibnet::DEFAULT_GENESIS,
1744 *networks::calibnet::GENESIS_CID,
1745 )
1746 }
1747 }
1748
1749 trait MakeTipset {
1751 fn make_tipset(self) -> Tipset;
1752 }
1753
1754 impl MakeTipset for &RawBlockHeader {
1755 fn make_tipset(self) -> Tipset {
1756 Tipset::from(CachingBlockHeader::new(self.clone()))
1757 }
1758 }
1759
1760 impl<const N: usize> MakeTipset for [&RawBlockHeader; N] {
1761 fn make_tipset(self) -> Tipset {
1762 self.as_slice().make_tipset()
1763 }
1764 }
1765
1766 impl<const N: usize> MakeTipset for &[&RawBlockHeader; N] {
1767 fn make_tipset(self) -> Tipset {
1768 self.as_slice().make_tipset()
1769 }
1770 }
1771
1772 impl MakeTipset for &[&RawBlockHeader] {
1773 fn make_tipset(self) -> Tipset {
1774 Tipset::new(self.iter().cloned().cloned()).unwrap()
1775 }
1776 }
1777
1778 #[track_caller]
1779 fn assert_path_change<T: MakeTipset>(
1780 store: &ChainStore<impl Blockstore>,
1781 from: impl MakeTipset,
1782 to: impl MakeTipset,
1783 expected: impl IntoIterator<Item = PathChange<T>>,
1784 ) {
1785 fn print(path_change: &PathChange) {
1786 let it = match path_change {
1787 Revert(it) => {
1788 print!("Revert(");
1789 it
1790 }
1791 Apply(it) => {
1792 print!(" Apply(");
1793 it
1794 }
1795 };
1796 println!(
1797 "epoch = {}, key.cid = {})",
1798 it.epoch(),
1799 it.key().cid().unwrap()
1800 )
1801 }
1802
1803 let actual =
1804 impl_chain_get_path(store, from.make_tipset().key(), to.make_tipset().key()).unwrap();
1805 let expected = expected
1806 .into_iter()
1807 .map(|change| match change {
1808 PathChange::Revert(it) => PathChange::Revert(it.make_tipset()),
1809 PathChange::Apply(it) => PathChange::Apply(it.make_tipset()),
1810 })
1811 .collect_vec();
1812 if expected != actual {
1813 println!("SUMMARY");
1814 println!("=======");
1815 println!("expected:");
1816 for it in &expected {
1817 print(it)
1818 }
1819 println!();
1820 println!("actual:");
1821 for it in &actual {
1822 print(it)
1823 }
1824 println!("=======\n")
1825 }
1826 assert_eq!(
1827 expected, actual,
1828 "expected change (left) does not match actual change (right)"
1829 )
1830 }
1831}