1pub mod types;
5use enumflags2::{BitFlags, make_bitflags};
6use types::*;
7
8#[cfg(test)]
9use crate::blocks::RawBlockHeader;
10use crate::blocks::{Block, CachingBlockHeader, Tipset, TipsetKey};
11use crate::chain::index::ResolveNullTipset;
12use crate::chain::{ChainStore, ExportOptions, FilecoinSnapshotVersion, HeadChange};
13use crate::chain_sync::{get_full_tipset, load_full_tipset};
14use crate::cid_collections::CidHashSet;
15use crate::ipld::DfsIter;
16use crate::ipld::{CHAIN_EXPORT_STATUS, cancel_export, end_export, start_export};
17use crate::lotus_json::{HasLotusJson, LotusJson, lotus_json_with_self};
18#[cfg(test)]
19use crate::lotus_json::{assert_all_snapshots, assert_unchanged_via_json};
20use crate::message::{ChainMessage, SignedMessage};
21use crate::rpc::eth::Block as EthBlock;
22use crate::rpc::eth::{
23 EthLog, TxInfo, eth_logs_with_filter, types::ApiHeaders, types::EthFilterSpec,
24};
25use crate::rpc::f3::F3ExportLatestSnapshot;
26use crate::rpc::types::*;
27use crate::rpc::{ApiPaths, Ctx, EthEventHandler, Permission, RpcMethod, ServerError};
28use crate::shim::clock::ChainEpoch;
29use crate::shim::error::ExitCode;
30use crate::shim::executor::Receipt;
31use crate::shim::message::Message;
32use crate::utils::db::CborStoreExt as _;
33use crate::utils::io::VoidAsyncWriter;
34use crate::utils::misc::env::is_env_truthy;
35use anyhow::{Context as _, Result};
36use cid::Cid;
37use fvm_ipld_blockstore::Blockstore;
38use fvm_ipld_encoding::{CborStore, RawBytes};
39use hex::ToHex;
40use ipld_core::ipld::Ipld;
41use jsonrpsee::types::Params;
42use jsonrpsee::types::error::ErrorObjectOwned;
43use num::BigInt;
44use schemars::JsonSchema;
45use serde::{Deserialize, Serialize};
46use sha2::Sha256;
47use std::fs::File;
48use std::{collections::VecDeque, path::PathBuf, sync::LazyLock};
49use tokio::sync::{
50 Mutex,
51 broadcast::{self, Receiver as Subscriber},
52};
53use tokio::task::JoinHandle;
54use tokio_util::sync::CancellationToken;
55
56const HEAD_CHANNEL_CAPACITY: usize = 10;
57
58pub const SAFE_HEIGHT_DISTANCE: ChainEpoch = 200;
72
73static CHAIN_EXPORT_LOCK: LazyLock<Mutex<Option<CancellationToken>>> =
74 LazyLock::new(|| Mutex::new(None));
75
76pub(crate) fn new_heads<DB: Blockstore + Send + Sync + 'static>(
83 data: Ctx<DB>,
84) -> (Subscriber<ApiHeaders>, JoinHandle<()>) {
85 let (sender, receiver) = broadcast::channel(HEAD_CHANNEL_CAPACITY);
86
87 let mut subscriber = data.chain_store().publisher().subscribe();
88
89 let handle = tokio::spawn(async move {
90 while let Ok(v) = subscriber.recv().await {
91 let headers = match v {
92 HeadChange::Apply(ts) => {
93 match EthBlock::from_filecoin_tipset(data.clone(), ts, TxInfo::Full).await {
96 Ok(block) => ApiHeaders(block),
97 Err(e) => {
98 tracing::error!("Failed to convert tipset to eth block: {}", e);
99 continue;
100 }
101 }
102 }
103 };
104 if let Err(e) = sender.send(headers) {
105 tracing::error!("Failed to send headers: {}", e);
106 break;
107 }
108 }
109 });
110
111 (receiver, handle)
112}
113
114pub(crate) fn logs<DB: Blockstore + Sync + Send + 'static>(
121 ctx: &Ctx<DB>,
122 filter: Option<EthFilterSpec>,
123) -> (Subscriber<Vec<EthLog>>, JoinHandle<()>) {
124 let (sender, receiver) = broadcast::channel(HEAD_CHANNEL_CAPACITY);
125
126 let mut subscriber = ctx.chain_store().publisher().subscribe();
127
128 let ctx = ctx.clone();
129
130 let handle = tokio::spawn(async move {
131 while let Ok(v) = subscriber.recv().await {
132 match v {
133 HeadChange::Apply(ts) => {
134 match eth_logs_with_filter(&ctx, &ts, filter.clone(), None).await {
135 Ok(logs) => {
136 if !logs.is_empty()
137 && let Err(e) = sender.send(logs)
138 {
139 tracing::error!(
140 "Failed to send logs for tipset {}: {}",
141 ts.key(),
142 e
143 );
144 break;
145 }
146 }
147 Err(e) => {
148 tracing::error!("Failed to fetch logs for tipset {}: {}", ts.key(), e);
149 }
150 }
151 }
152 }
153 }
154 });
155
156 (receiver, handle)
157}
158
159pub enum ChainGetFinalizedTipset {}
160impl RpcMethod<0> for ChainGetFinalizedTipset {
161 const NAME: &'static str = "Filecoin.ChainGetFinalizedTipSet";
162 const PARAM_NAMES: [&'static str; 0] = [];
163 const API_PATHS: BitFlags<ApiPaths> = make_bitflags!(ApiPaths::V1);
164 const PERMISSION: Permission = Permission::Read;
165 const DESCRIPTION: Option<&'static str> = Some(
166 "Returns the latest F3 finalized tipset, or falls back to EC finality if F3 is not operational on the node or if the F3 finalized tipset is further back than EC finalized tipset.",
167 );
168
169 type Params = ();
170 type Ok = Tipset;
171
172 async fn handle(
173 ctx: Ctx<impl Blockstore + Send + Sync + 'static>,
174 (): Self::Params,
175 _: &http::Extensions,
176 ) -> Result<Self::Ok, ServerError> {
177 let head = ctx.chain_store().heaviest_tipset();
178 let ec_finality_epoch = (head.epoch() - ctx.chain_config().policy.chain_finality).max(0);
179
180 match get_f3_finality_tipset(&ctx, ec_finality_epoch).await {
182 Ok(f3_tipset) => {
183 tracing::debug!("Using F3 finalized tipset at epoch {}", f3_tipset.epoch());
184 Ok(f3_tipset)
185 }
186 Err(_) => {
187 tracing::warn!("F3 finalization unavailable, falling back to EC finality");
189 let ec_tipset = ctx.chain_index().tipset_by_height(
190 ec_finality_epoch,
191 head,
192 ResolveNullTipset::TakeOlder,
193 )?;
194 Ok(ec_tipset)
195 }
196 }
197 }
198}
199
200async fn get_f3_finality_tipset<DB: Blockstore + Sync + Send + 'static>(
202 ctx: &Ctx<DB>,
203 ec_finality_epoch: ChainEpoch,
204) -> Result<Tipset> {
205 let f3_finalized_cert = crate::rpc::f3::F3GetLatestCertificate::get()
206 .await
207 .map_err(|e| anyhow::anyhow!("Failed to get F3 certificate: {}", e))?;
208
209 let f3_finalized_head = f3_finalized_cert.chain_head();
210 if f3_finalized_head.epoch < ec_finality_epoch {
211 return Err(anyhow::anyhow!(
212 "F3 finalized tipset epoch {} is further back than EC finalized tipset epoch {}",
213 f3_finalized_head.epoch,
214 ec_finality_epoch
215 ));
216 }
217
218 ctx.chain_index()
219 .load_required_tipset(&f3_finalized_head.key)
220 .map_err(|e| {
221 anyhow::anyhow!(
222 "Failed to load F3 finalized tipset at epoch {}: {}",
223 f3_finalized_head.epoch,
224 e
225 )
226 })
227}
228
229pub enum ChainGetMessage {}
230impl RpcMethod<1> for ChainGetMessage {
231 const NAME: &'static str = "Filecoin.ChainGetMessage";
232 const PARAM_NAMES: [&'static str; 1] = ["messageCid"];
233 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
234 const PERMISSION: Permission = Permission::Read;
235 const DESCRIPTION: Option<&'static str> = Some("Returns the message with the specified CID.");
236
237 type Params = (Cid,);
238 type Ok = FlattenedApiMessage;
239
240 async fn handle(
241 ctx: Ctx<impl Blockstore>,
242 (message_cid,): Self::Params,
243 _: &http::Extensions,
244 ) -> Result<Self::Ok, ServerError> {
245 let chain_message: ChainMessage = ctx
246 .store()
247 .get_cbor(&message_cid)?
248 .with_context(|| format!("can't find message with cid {message_cid}"))?;
249 let message = match chain_message {
250 ChainMessage::Signed(m) => m.into_message(),
251 ChainMessage::Unsigned(m) => m,
252 };
253
254 let cid = message.cid();
255 Ok(FlattenedApiMessage { message, cid })
256 }
257}
258
259pub enum ChainGetEvents {}
262impl RpcMethod<1> for ChainGetEvents {
263 const NAME: &'static str = "Filecoin.ChainGetEvents";
264 const PARAM_NAMES: [&'static str; 1] = ["rootCid"];
265 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
266 const PERMISSION: Permission = Permission::Read;
267 const DESCRIPTION: Option<&'static str> =
268 Some("Returns the events under the given event AMT root CID.");
269
270 type Params = (Cid,);
271 type Ok = Vec<Event>;
272 async fn handle(
273 ctx: Ctx<impl Blockstore + Send + Sync + 'static>,
274 (root_cid,): Self::Params,
275 _: &http::Extensions,
276 ) -> Result<Self::Ok, ServerError> {
277 let events = EthEventHandler::get_events_by_event_root(&ctx, &root_cid)?;
278 Ok(events)
279 }
280}
281
282pub enum ChainGetParentMessages {}
283impl RpcMethod<1> for ChainGetParentMessages {
284 const NAME: &'static str = "Filecoin.ChainGetParentMessages";
285 const PARAM_NAMES: [&'static str; 1] = ["blockCid"];
286 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
287 const PERMISSION: Permission = Permission::Read;
288 const DESCRIPTION: Option<&'static str> =
289 Some("Returns the messages included in the blocks of the parent tipset.");
290
291 type Params = (Cid,);
292 type Ok = Vec<ApiMessage>;
293
294 async fn handle(
295 ctx: Ctx<impl Blockstore + Send + Sync + 'static>,
296 (block_cid,): Self::Params,
297 _: &http::Extensions,
298 ) -> Result<Self::Ok, ServerError> {
299 let store = ctx.store();
300 let block_header: CachingBlockHeader = store
301 .get_cbor(&block_cid)?
302 .with_context(|| format!("can't find block header with cid {block_cid}"))?;
303 if block_header.epoch == 0 {
304 Ok(vec![])
305 } else {
306 let parent_tipset = Tipset::load_required(store, &block_header.parents)?;
307 load_api_messages_from_tipset(&ctx, parent_tipset.key()).await
308 }
309 }
310}
311
312pub enum ChainGetParentReceipts {}
313impl RpcMethod<1> for ChainGetParentReceipts {
314 const NAME: &'static str = "Filecoin.ChainGetParentReceipts";
315 const PARAM_NAMES: [&'static str; 1] = ["blockCid"];
316 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
317 const PERMISSION: Permission = Permission::Read;
318 const DESCRIPTION: Option<&'static str> =
319 Some("Returns the message receipts included in the blocks of the parent tipset.");
320
321 type Params = (Cid,);
322 type Ok = Vec<ApiReceipt>;
323
324 async fn handle(
325 ctx: Ctx<impl Blockstore>,
326 (block_cid,): Self::Params,
327 _: &http::Extensions,
328 ) -> Result<Self::Ok, ServerError> {
329 let store = ctx.store();
330 let block_header: CachingBlockHeader = store
331 .get_cbor(&block_cid)?
332 .with_context(|| format!("can't find block header with cid {block_cid}"))?;
333 if block_header.epoch == 0 {
334 return Ok(vec![]);
335 }
336 let receipts = Receipt::get_receipts(store, block_header.message_receipts)
337 .map_err(|_| {
338 ErrorObjectOwned::owned::<()>(
339 1,
340 format!(
341 "failed to root: ipld: could not find {}",
342 block_header.message_receipts
343 ),
344 None,
345 )
346 })?
347 .iter()
348 .map(|r| ApiReceipt {
349 exit_code: r.exit_code().into(),
350 return_data: r.return_data(),
351 gas_used: r.gas_used(),
352 events_root: r.events_root(),
353 })
354 .collect();
355
356 Ok(receipts)
357 }
358}
359
360pub enum ChainGetMessagesInTipset {}
361impl RpcMethod<1> for ChainGetMessagesInTipset {
362 const NAME: &'static str = "Filecoin.ChainGetMessagesInTipset";
363 const PARAM_NAMES: [&'static str; 1] = ["tipsetKey"];
364 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
365 const PERMISSION: Permission = Permission::Read;
366
367 type Params = (ApiTipsetKey,);
368 type Ok = Vec<ApiMessage>;
369
370 async fn handle(
371 ctx: Ctx<impl Blockstore + Send + Sync + 'static>,
372 (ApiTipsetKey(tipset_key),): Self::Params,
373 _: &http::Extensions,
374 ) -> Result<Self::Ok, ServerError> {
375 let tipset = ctx
376 .chain_store()
377 .load_required_tipset_or_heaviest(&tipset_key)?;
378 load_api_messages_from_tipset(&ctx, tipset.key()).await
379 }
380}
381
382pub enum ChainPruneSnapshot {}
383impl RpcMethod<1> for ChainPruneSnapshot {
384 const NAME: &'static str = "Forest.SnapshotGC";
385 const PARAM_NAMES: [&'static str; 1] = ["blocking"];
386 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
387 const PERMISSION: Permission = Permission::Admin;
388
389 type Params = (bool,);
390 type Ok = ();
391
392 async fn handle(
393 _ctx: Ctx<impl Blockstore + Send + Sync + 'static>,
394 (blocking,): Self::Params,
395 _: &http::Extensions,
396 ) -> Result<Self::Ok, ServerError> {
397 if let Some(gc) = crate::daemon::GLOBAL_SNAPSHOT_GC.get() {
398 let progress_rx = gc.trigger()?;
399 while blocking && progress_rx.recv_async().await.is_ok() {}
400 Ok(())
401 } else {
402 Err(anyhow::anyhow!("snapshot gc is not enabled").into())
403 }
404 }
405}
406
407pub enum ForestChainExport {}
408impl RpcMethod<1> for ForestChainExport {
409 const NAME: &'static str = "Forest.ChainExport";
410 const PARAM_NAMES: [&'static str; 1] = ["params"];
411 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all_with_v2();
412 const PERMISSION: Permission = Permission::Read;
413
414 type Params = (ForestChainExportParams,);
415 type Ok = ApiExportResult;
416
417 async fn handle(
418 ctx: Ctx<impl Blockstore + Send + Sync + 'static>,
419 (params,): Self::Params,
420 _: &http::Extensions,
421 ) -> Result<Self::Ok, ServerError> {
422 let ForestChainExportParams {
423 version,
424 epoch,
425 recent_roots,
426 output_path,
427 tipset_keys: ApiTipsetKey(tsk),
428 include_receipts,
429 include_events,
430 skip_checksum,
431 dry_run,
432 } = params;
433
434 let token = CancellationToken::new();
435 {
436 let mut guard = CHAIN_EXPORT_LOCK.lock().await;
437 if guard.is_some() {
438 return Err(
439 anyhow::anyhow!("A chain export is still in progress. Cancel it with the export-cancel subcommand if needed.").into(),
440 );
441 }
442 *guard = Some(token.clone());
443 }
444 start_export();
445
446 let head = ctx.chain_store().load_required_tipset_or_heaviest(&tsk)?;
447 let start_ts =
448 ctx.chain_index()
449 .tipset_by_height(epoch, head, ResolveNullTipset::TakeOlder)?;
450
451 let options = Some(ExportOptions {
452 skip_checksum,
453 include_receipts,
454 include_events,
455 seen: Default::default(),
456 });
457 let writer = if dry_run {
458 tokio_util::either::Either::Left(VoidAsyncWriter)
459 } else {
460 tokio_util::either::Either::Right(tokio::fs::File::create(&output_path).await?)
461 };
462 let result = match version {
463 FilecoinSnapshotVersion::V1 => {
464 let db = ctx.store_owned();
465
466 let chain_export =
467 crate::chain::export::<Sha256>(&db, &start_ts, recent_roots, writer, options);
468
469 tokio::select! {
470 result = chain_export => {
471 result.map(|checksum_opt| ApiExportResult::Done(checksum_opt.map(|hash| hash.encode_hex())))
472 },
473 _ = token.cancelled() => {
474 cancel_export();
475 tracing::warn!("Snapshot export was cancelled");
476 Ok(ApiExportResult::Cancelled)
477 },
478 }
479 }
480 FilecoinSnapshotVersion::V2 => {
481 let db = ctx.store_owned();
482
483 let f3_snap_tmp_path = {
484 let mut f3_snap_dir = output_path.clone();
485 let mut builder = tempfile::Builder::new();
486 let with_suffix = builder.suffix(".f3snap.bin");
487 if f3_snap_dir.pop() {
488 with_suffix.tempfile_in(&f3_snap_dir)
489 } else {
490 with_suffix.tempfile_in(".")
491 }?
492 .into_temp_path()
493 };
494 let f3_snap = {
495 match F3ExportLatestSnapshot::run(f3_snap_tmp_path.display().to_string()).await
496 {
497 Ok(cid) => Some((cid, File::open(&f3_snap_tmp_path)?)),
498 Err(e) => {
499 tracing::error!("Failed to export F3 snapshot: {e}");
500 None
501 }
502 }
503 };
504
505 let chain_export = crate::chain::export_v2::<Sha256, _>(
506 &db,
507 f3_snap,
508 &start_ts,
509 recent_roots,
510 writer,
511 options,
512 );
513
514 tokio::select! {
515 result = chain_export => {
516 result.map(|checksum_opt| ApiExportResult::Done(checksum_opt.map(|hash| hash.encode_hex())))
517 },
518 _ = token.cancelled() => {
519 cancel_export();
520 tracing::warn!("Snapshot export was cancelled");
521 Ok(ApiExportResult::Cancelled)
522 },
523 }
524 }
525 };
526 end_export();
527 let mut guard = CHAIN_EXPORT_LOCK.lock().await;
529 *guard = None;
530 match result {
531 Ok(export_result) => Ok(export_result),
532 Err(e) => Err(anyhow::anyhow!(e).into()),
533 }
534 }
535}
536
537pub enum ForestChainExportStatus {}
538impl RpcMethod<0> for ForestChainExportStatus {
539 const NAME: &'static str = "Forest.ChainExportStatus";
540 const PARAM_NAMES: [&'static str; 0] = [];
541 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all_with_v2();
542 const PERMISSION: Permission = Permission::Read;
543
544 type Params = ();
545 type Ok = ApiExportStatus;
546
547 async fn handle(
548 _ctx: Ctx<impl Blockstore>,
549 (): Self::Params,
550 _: &http::Extensions,
551 ) -> Result<Self::Ok, ServerError> {
552 let mutex = CHAIN_EXPORT_STATUS.lock();
553
554 let progress = if mutex.initial_epoch == 0 {
555 0.0
556 } else {
557 let p = 1.0 - ((mutex.epoch as f64) / (mutex.initial_epoch as f64));
558 if p.is_finite() {
559 p.clamp(0.0, 1.0)
560 } else {
561 0.0
562 }
563 };
564 let progress = (progress * 100.0).round() / 100.0;
566
567 let status = ApiExportStatus {
568 progress,
569 exporting: mutex.exporting,
570 cancelled: mutex.cancelled,
571 start_time: mutex.start_time,
572 };
573
574 Ok(status)
575 }
576}
577
578pub enum ForestChainExportCancel {}
579impl RpcMethod<0> for ForestChainExportCancel {
580 const NAME: &'static str = "Forest.ChainExportCancel";
581 const PARAM_NAMES: [&'static str; 0] = [];
582 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all_with_v2();
583 const PERMISSION: Permission = Permission::Read;
584
585 type Params = ();
586 type Ok = bool;
587
588 async fn handle(
589 _ctx: Ctx<impl Blockstore>,
590 (): Self::Params,
591 _: &http::Extensions,
592 ) -> Result<Self::Ok, ServerError> {
593 if let Some(token) = CHAIN_EXPORT_LOCK.lock().await.as_ref() {
594 token.cancel();
595 return Ok(true);
596 }
597
598 Ok(false)
599 }
600}
601
602pub enum ForestChainExportDiff {}
603impl RpcMethod<1> for ForestChainExportDiff {
604 const NAME: &'static str = "Forest.ChainExportDiff";
605 const PARAM_NAMES: [&'static str; 1] = ["params"];
606 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all_with_v2();
607 const PERMISSION: Permission = Permission::Read;
608
609 type Params = (ForestChainExportDiffParams,);
610 type Ok = ();
611
612 async fn handle(
613 ctx: Ctx<impl Blockstore + Send + Sync + 'static>,
614 (params,): Self::Params,
615 _: &http::Extensions,
616 ) -> Result<Self::Ok, ServerError> {
617 let ForestChainExportDiffParams {
618 from,
619 to,
620 depth,
621 output_path,
622 } = params;
623
624 let _locked = CHAIN_EXPORT_LOCK.try_lock();
625 if _locked.is_err() {
626 return Err(
627 anyhow::anyhow!("Another chain export diff job is still in progress").into(),
628 );
629 }
630
631 let chain_finality = ctx.chain_config().policy.chain_finality;
632 if depth < chain_finality {
633 return Err(
634 anyhow::anyhow!(format!("depth must be greater than {chain_finality}")).into(),
635 );
636 }
637
638 let head = ctx.chain_store().heaviest_tipset();
639 let start_ts =
640 ctx.chain_index()
641 .tipset_by_height(from, head, ResolveNullTipset::TakeOlder)?;
642
643 crate::tool::subcommands::archive_cmd::do_export(
644 &ctx.store_owned(),
645 start_ts,
646 output_path,
647 None,
648 depth,
649 Some(to),
650 Some(chain_finality),
651 true,
652 )
653 .await?;
654
655 Ok(())
656 }
657}
658
659pub enum ChainExport {}
660impl RpcMethod<1> for ChainExport {
661 const NAME: &'static str = "Filecoin.ChainExport";
662 const PARAM_NAMES: [&'static str; 1] = ["params"];
663 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
664 const PERMISSION: Permission = Permission::Read;
665
666 type Params = (ChainExportParams,);
667 type Ok = ApiExportResult;
668
669 async fn handle(
670 ctx: Ctx<impl Blockstore + Send + Sync + 'static>,
671 (ChainExportParams {
672 epoch,
673 recent_roots,
674 output_path,
675 tipset_keys,
676 skip_checksum,
677 dry_run,
678 },): Self::Params,
679 ext: &http::Extensions,
680 ) -> Result<Self::Ok, ServerError> {
681 ForestChainExport::handle(
682 ctx,
683 (ForestChainExportParams {
684 version: FilecoinSnapshotVersion::V1,
685 epoch,
686 recent_roots,
687 output_path,
688 tipset_keys,
689 include_receipts: false,
690 include_events: false,
691 skip_checksum,
692 dry_run,
693 },),
694 ext,
695 )
696 .await
697 }
698}
699
700pub enum ChainReadObj {}
701impl RpcMethod<1> for ChainReadObj {
702 const NAME: &'static str = "Filecoin.ChainReadObj";
703 const PARAM_NAMES: [&'static str; 1] = ["cid"];
704 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
705 const PERMISSION: Permission = Permission::Read;
706 const DESCRIPTION: Option<&'static str> = Some(
707 "Reads IPLD nodes referenced by the specified CID from the chain blockstore and returns raw bytes.",
708 );
709
710 type Params = (Cid,);
711 type Ok = Vec<u8>;
712
713 async fn handle(
714 ctx: Ctx<impl Blockstore>,
715 (cid,): Self::Params,
716 _: &http::Extensions,
717 ) -> Result<Self::Ok, ServerError> {
718 let bytes = ctx
719 .store()
720 .get(&cid)?
721 .with_context(|| format!("can't find object with cid={cid}"))?;
722 Ok(bytes)
723 }
724}
725
726pub enum ChainHasObj {}
727impl RpcMethod<1> for ChainHasObj {
728 const NAME: &'static str = "Filecoin.ChainHasObj";
729 const PARAM_NAMES: [&'static str; 1] = ["cid"];
730 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
731 const PERMISSION: Permission = Permission::Read;
732 const DESCRIPTION: Option<&'static str> =
733 Some("Checks if a given CID exists in the chain blockstore.");
734
735 type Params = (Cid,);
736 type Ok = bool;
737
738 async fn handle(
739 ctx: Ctx<impl Blockstore>,
740 (cid,): Self::Params,
741 _: &http::Extensions,
742 ) -> Result<Self::Ok, ServerError> {
743 Ok(ctx.store().get(&cid)?.is_some())
744 }
745}
746
747pub enum ChainStatObj {}
750impl RpcMethod<2> for ChainStatObj {
751 const NAME: &'static str = "Filecoin.ChainStatObj";
752 const PARAM_NAMES: [&'static str; 2] = ["obj_cid", "base_cid"];
753 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
754 const PERMISSION: Permission = Permission::Read;
755
756 type Params = (Cid, Option<Cid>);
757 type Ok = ObjStat;
758
759 async fn handle(
760 ctx: Ctx<impl Blockstore>,
761 (obj_cid, base_cid): Self::Params,
762 _: &http::Extensions,
763 ) -> Result<Self::Ok, ServerError> {
764 let mut stats = ObjStat::default();
765 let mut seen = CidHashSet::default();
766 let mut walk = |cid, collect| {
767 let mut queue = VecDeque::new();
768 queue.push_back(cid);
769 while let Some(link_cid) = queue.pop_front() {
770 if !seen.insert(link_cid) {
771 continue;
772 }
773 let data = ctx.store().get(&link_cid)?;
774 if let Some(data) = data {
775 if collect {
776 stats.links += 1;
777 stats.size += data.len();
778 }
779 if matches!(link_cid.codec(), fvm_ipld_encoding::DAG_CBOR)
780 && let Ok(ipld) =
781 crate::utils::encoding::from_slice_with_fallback::<Ipld>(&data)
782 {
783 for ipld in DfsIter::new(ipld) {
784 if let Ipld::Link(cid) = ipld {
785 queue.push_back(cid);
786 }
787 }
788 }
789 }
790 }
791 anyhow::Ok(())
792 };
793 if let Some(base_cid) = base_cid {
794 walk(base_cid, false)?;
795 }
796 walk(obj_cid, true)?;
797 Ok(stats)
798 }
799}
800
801pub enum ChainGetBlockMessages {}
802impl RpcMethod<1> for ChainGetBlockMessages {
803 const NAME: &'static str = "Filecoin.ChainGetBlockMessages";
804 const PARAM_NAMES: [&'static str; 1] = ["blockCid"];
805 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
806 const PERMISSION: Permission = Permission::Read;
807 const DESCRIPTION: Option<&'static str> =
808 Some("Returns all messages from the specified block.");
809
810 type Params = (Cid,);
811 type Ok = BlockMessages;
812
813 async fn handle(
814 ctx: Ctx<impl Blockstore>,
815 (block_cid,): Self::Params,
816 _: &http::Extensions,
817 ) -> Result<Self::Ok, ServerError> {
818 let blk: CachingBlockHeader = ctx.store().get_cbor_required(&block_cid)?;
819 let (unsigned_cids, signed_cids) = crate::chain::read_msg_cids(ctx.store(), &blk)?;
820 let (bls_msg, secp_msg) =
821 crate::chain::block_messages_from_cids(ctx.store(), &unsigned_cids, &signed_cids)?;
822 let cids = unsigned_cids.into_iter().chain(signed_cids).collect();
823
824 let ret = BlockMessages {
825 bls_msg,
826 secp_msg,
827 cids,
828 };
829 Ok(ret)
830 }
831}
832
833pub enum ChainGetPath {}
834impl RpcMethod<2> for ChainGetPath {
835 const NAME: &'static str = "Filecoin.ChainGetPath";
836 const PARAM_NAMES: [&'static str; 2] = ["from", "to"];
837 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
838 const PERMISSION: Permission = Permission::Read;
839 const DESCRIPTION: Option<&'static str> =
840 Some("Returns the path between the two specified tipsets.");
841
842 type Params = (TipsetKey, TipsetKey);
843 type Ok = Vec<PathChange>;
844
845 async fn handle(
846 ctx: Ctx<impl Blockstore>,
847 (from, to): Self::Params,
848 _: &http::Extensions,
849 ) -> Result<Self::Ok, ServerError> {
850 impl_chain_get_path(ctx.chain_store(), &from, &to).map_err(Into::into)
851 }
852}
853
854fn impl_chain_get_path(
872 chain_store: &ChainStore<impl Blockstore>,
873 from: &TipsetKey,
874 to: &TipsetKey,
875) -> anyhow::Result<Vec<PathChange>> {
876 let mut to_revert = chain_store
877 .load_required_tipset_or_heaviest(from)
878 .context("couldn't load `from`")?;
879 let mut to_apply = chain_store
880 .load_required_tipset_or_heaviest(to)
881 .context("couldn't load `to`")?;
882
883 let mut all_reverts = vec![];
884 let mut all_applies = vec![];
885
886 while to_revert != to_apply {
889 if to_revert.epoch() > to_apply.epoch() {
890 let next = chain_store
891 .load_required_tipset_or_heaviest(to_revert.parents())
892 .context("couldn't load ancestor of `from`")?;
893 all_reverts.push(to_revert);
894 to_revert = next;
895 } else {
896 let next = chain_store
897 .load_required_tipset_or_heaviest(to_apply.parents())
898 .context("couldn't load ancestor of `to`")?;
899 all_applies.push(to_apply);
900 to_apply = next;
901 }
902 }
903 Ok(all_reverts
904 .into_iter()
905 .map(PathChange::Revert)
906 .chain(all_applies.into_iter().rev().map(PathChange::Apply))
907 .collect())
908}
909
910pub enum ChainGetTipSetByHeight {}
914impl RpcMethod<2> for ChainGetTipSetByHeight {
915 const NAME: &'static str = "Filecoin.ChainGetTipSetByHeight";
916 const PARAM_NAMES: [&'static str; 2] = ["height", "tipsetKey"];
917 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
918 const PERMISSION: Permission = Permission::Read;
919 const DESCRIPTION: Option<&'static str> = Some("Returns the tipset at the specified height.");
920
921 type Params = (ChainEpoch, ApiTipsetKey);
922 type Ok = Tipset;
923
924 async fn handle(
925 ctx: Ctx<impl Blockstore>,
926 (height, ApiTipsetKey(tipset_key)): Self::Params,
927 _: &http::Extensions,
928 ) -> Result<Self::Ok, ServerError> {
929 let ts = ctx
930 .chain_store()
931 .load_required_tipset_or_heaviest(&tipset_key)?;
932 let tss = ctx
933 .chain_index()
934 .tipset_by_height(height, ts, ResolveNullTipset::TakeOlder)?;
935 Ok(tss)
936 }
937}
938
939pub enum ChainGetTipSetAfterHeight {}
940impl RpcMethod<2> for ChainGetTipSetAfterHeight {
941 const NAME: &'static str = "Filecoin.ChainGetTipSetAfterHeight";
942 const PARAM_NAMES: [&'static str; 2] = ["height", "tipsetKey"];
943 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
944 const PERMISSION: Permission = Permission::Read;
945 const DESCRIPTION: Option<&'static str> = Some(
946 "Looks back and returns the tipset at the specified epoch.
947 If there are no blocks at the given epoch,
948 returns the first non-nil tipset at a later epoch.",
949 );
950
951 type Params = (ChainEpoch, ApiTipsetKey);
952 type Ok = Tipset;
953
954 async fn handle(
955 ctx: Ctx<impl Blockstore>,
956 (height, ApiTipsetKey(tipset_key)): Self::Params,
957 _: &http::Extensions,
958 ) -> Result<Self::Ok, ServerError> {
959 let ts = ctx
960 .chain_store()
961 .load_required_tipset_or_heaviest(&tipset_key)?;
962 let tss = ctx
963 .chain_index()
964 .tipset_by_height(height, ts, ResolveNullTipset::TakeNewer)?;
965 Ok(tss)
966 }
967}
968
969pub enum ChainGetGenesis {}
970impl RpcMethod<0> for ChainGetGenesis {
971 const NAME: &'static str = "Filecoin.ChainGetGenesis";
972 const PARAM_NAMES: [&'static str; 0] = [];
973 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
974 const PERMISSION: Permission = Permission::Read;
975
976 type Params = ();
977 type Ok = Option<Tipset>;
978
979 async fn handle(
980 ctx: Ctx<impl Blockstore>,
981 (): Self::Params,
982 _: &http::Extensions,
983 ) -> Result<Self::Ok, ServerError> {
984 let genesis = ctx.chain_store().genesis_block_header();
985 Ok(Some(Tipset::from(genesis)))
986 }
987}
988
989pub enum ChainHead {}
990impl RpcMethod<0> for ChainHead {
991 const NAME: &'static str = "Filecoin.ChainHead";
992 const PARAM_NAMES: [&'static str; 0] = [];
993 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
994 const PERMISSION: Permission = Permission::Read;
995 const DESCRIPTION: Option<&'static str> = Some("Returns the chain head (heaviest tipset).");
996
997 type Params = ();
998 type Ok = Tipset;
999
1000 async fn handle(
1001 ctx: Ctx<impl Blockstore>,
1002 (): Self::Params,
1003 _: &http::Extensions,
1004 ) -> Result<Self::Ok, ServerError> {
1005 let heaviest = ctx.chain_store().heaviest_tipset();
1006 Ok(heaviest)
1007 }
1008}
1009
1010pub enum ChainGetBlock {}
1011impl RpcMethod<1> for ChainGetBlock {
1012 const NAME: &'static str = "Filecoin.ChainGetBlock";
1013 const PARAM_NAMES: [&'static str; 1] = ["blockCid"];
1014 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
1015 const PERMISSION: Permission = Permission::Read;
1016 const DESCRIPTION: Option<&'static str> = Some("Returns the block with the specified CID.");
1017
1018 type Params = (Cid,);
1019 type Ok = CachingBlockHeader;
1020
1021 async fn handle(
1022 ctx: Ctx<impl Blockstore>,
1023 (block_cid,): Self::Params,
1024 _: &http::Extensions,
1025 ) -> Result<Self::Ok, ServerError> {
1026 let blk: CachingBlockHeader = ctx.store().get_cbor_required(&block_cid)?;
1027 Ok(blk)
1028 }
1029}
1030
1031pub enum ChainGetTipSet {}
1032
1033impl RpcMethod<1> for ChainGetTipSet {
1034 const NAME: &'static str = "Filecoin.ChainGetTipSet";
1035 const PARAM_NAMES: [&'static str; 1] = ["tipsetKey"];
1036 const API_PATHS: BitFlags<ApiPaths> = make_bitflags!(ApiPaths::{ V0 | V1 });
1037 const PERMISSION: Permission = Permission::Read;
1038 const DESCRIPTION: Option<&'static str> = Some("Returns the tipset with the specified CID.");
1039
1040 type Params = (ApiTipsetKey,);
1041 type Ok = Tipset;
1042
1043 async fn handle(
1044 ctx: Ctx<impl Blockstore>,
1045 (ApiTipsetKey(tsk),): Self::Params,
1046 _: &http::Extensions,
1047 ) -> Result<Self::Ok, ServerError> {
1048 if let Some(tsk) = &tsk {
1049 let ts = ctx.chain_index().load_required_tipset(tsk)?;
1050 Ok(ts)
1051 } else {
1052 Err(anyhow::anyhow!(
1054 "TipsetKey cannot be empty (NewTipSet called with zero length array of blocks)"
1055 )
1056 .into())
1057 }
1058 }
1059}
1060
1061pub enum ChainGetTipSetV2 {}
1062
1063impl ChainGetTipSetV2 {
1064 pub async fn get_tipset_by_anchor(
1065 ctx: &Ctx<impl Blockstore + Send + Sync + 'static>,
1066 anchor: &Option<TipsetAnchor>,
1067 ) -> anyhow::Result<Tipset> {
1068 if let Some(anchor) = anchor {
1069 match (&anchor.key.0, &anchor.tag) {
1070 (None, None) => Ok(ctx.state_manager.heaviest_tipset()),
1072 (Some(tsk), None) => Ok(ctx.chain_index().load_required_tipset(tsk)?),
1074 (None, Some(tag)) => Self::get_tipset_by_tag(ctx, *tag).await,
1075 _ => {
1076 anyhow::bail!("invalid anchor")
1077 }
1078 }
1079 } else {
1080 Self::get_tipset_by_tag(ctx, TipsetTag::Finalized).await
1082 }
1083 }
1084
1085 pub async fn get_tipset_by_tag(
1086 ctx: &Ctx<impl Blockstore + Send + Sync + 'static>,
1087 tag: TipsetTag,
1088 ) -> anyhow::Result<Tipset> {
1089 match tag {
1090 TipsetTag::Latest => Ok(ctx.state_manager.heaviest_tipset()),
1091 TipsetTag::Finalized => Self::get_latest_finalized_tipset(ctx).await,
1092 TipsetTag::Safe => Self::get_latest_safe_tipset(ctx).await,
1093 }
1094 }
1095
1096 pub async fn get_latest_safe_tipset(
1097 ctx: &Ctx<impl Blockstore + Send + Sync + 'static>,
1098 ) -> anyhow::Result<Tipset> {
1099 let finalized = Self::get_latest_finalized_tipset(ctx).await?;
1100 let head = ctx.chain_store().heaviest_tipset();
1101 let safe_height = (head.epoch() - SAFE_HEIGHT_DISTANCE).max(0);
1102 if finalized.epoch() >= safe_height {
1103 Ok(finalized)
1104 } else {
1105 Ok(ctx.chain_index().tipset_by_height(
1106 safe_height,
1107 head,
1108 ResolveNullTipset::TakeOlder,
1109 )?)
1110 }
1111 }
1112
1113 pub async fn get_latest_finalized_tipset(
1114 ctx: &Ctx<impl Blockstore + Send + Sync + 'static>,
1115 ) -> anyhow::Result<Tipset> {
1116 let Ok(f3_finalized_cert) = crate::rpc::f3::F3GetLatestCertificate::get().await else {
1117 return Self::get_ec_finalized_tipset(ctx);
1118 };
1119
1120 let f3_finalized_head = f3_finalized_cert.chain_head();
1121 let head = ctx.chain_store().heaviest_tipset();
1122 if head.epoch() > f3_finalized_head.epoch + ctx.chain_config().policy.chain_finality {
1124 return Self::get_ec_finalized_tipset(ctx);
1125 }
1126
1127 let ts = ctx
1128 .chain_index()
1129 .load_required_tipset(&f3_finalized_head.key)
1130 .map_err(|e| {
1131 anyhow::anyhow!(
1132 "Failed to load F3 finalized tipset at epoch {} with key {}: {e}",
1133 f3_finalized_head.epoch,
1134 f3_finalized_head.key,
1135 )
1136 })?;
1137 Ok(ts)
1138 }
1139
1140 pub fn get_ec_finalized_tipset(ctx: &Ctx<impl Blockstore>) -> anyhow::Result<Tipset> {
1141 let head = ctx.chain_store().heaviest_tipset();
1142 let ec_finality_epoch = (head.epoch() - ctx.chain_config().policy.chain_finality).max(0);
1143 Ok(ctx.chain_index().tipset_by_height(
1144 ec_finality_epoch,
1145 head,
1146 ResolveNullTipset::TakeOlder,
1147 )?)
1148 }
1149
1150 pub async fn get_tipset(
1151 ctx: &Ctx<impl Blockstore + Send + Sync + 'static>,
1152 selector: &TipsetSelector,
1153 ) -> anyhow::Result<Tipset> {
1154 selector.validate()?;
1155 if let ApiTipsetKey(Some(tsk)) = &selector.key {
1157 let ts = ctx.chain_index().load_required_tipset(tsk)?;
1158 return Ok(ts);
1159 }
1160 if let Some(height) = &selector.height {
1162 let anchor = Self::get_tipset_by_anchor(ctx, &height.anchor).await?;
1163 let ts = ctx.chain_index().tipset_by_height(
1164 height.at,
1165 anchor,
1166 height.resolve_null_tipset_policy(),
1167 )?;
1168 return Ok(ts);
1169 }
1170 if let Some(tag) = &selector.tag {
1172 let ts = Self::get_tipset_by_tag(ctx, *tag).await?;
1173 return Ok(ts);
1174 }
1175 anyhow::bail!("no tipset found for selector")
1176 }
1177}
1178
1179impl RpcMethod<1> for ChainGetTipSetV2 {
1180 const NAME: &'static str = "Filecoin.ChainGetTipSet";
1181 const PARAM_NAMES: [&'static str; 1] = ["tipsetSelector"];
1182 const API_PATHS: BitFlags<ApiPaths> = make_bitflags!(ApiPaths::{ V2 });
1183 const PERMISSION: Permission = Permission::Read;
1184 const DESCRIPTION: Option<&'static str> = Some("Returns the tipset with the specified CID.");
1185
1186 type Params = (TipsetSelector,);
1187 type Ok = Tipset;
1188
1189 async fn handle(
1190 ctx: Ctx<impl Blockstore + Send + Sync + 'static>,
1191 (selector,): Self::Params,
1192 _: &http::Extensions,
1193 ) -> Result<Self::Ok, ServerError> {
1194 Ok(Self::get_tipset(&ctx, &selector).await?)
1195 }
1196}
1197
1198pub enum ChainSetHead {}
1199impl RpcMethod<1> for ChainSetHead {
1200 const NAME: &'static str = "Filecoin.ChainSetHead";
1201 const PARAM_NAMES: [&'static str; 1] = ["tsk"];
1202 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
1203 const PERMISSION: Permission = Permission::Admin;
1204
1205 type Params = (TipsetKey,);
1206 type Ok = ();
1207
1208 async fn handle(
1209 ctx: Ctx<impl Blockstore>,
1210 (tsk,): Self::Params,
1211 _: &http::Extensions,
1212 ) -> Result<Self::Ok, ServerError> {
1213 let new_head = ctx.chain_index().load_required_tipset(&tsk)?;
1217 let mut current = ctx.chain_store().heaviest_tipset();
1218 while current.epoch() >= new_head.epoch() {
1219 for cid in current.key().to_cids() {
1220 ctx.chain_store().unmark_block_as_validated(&cid);
1221 }
1222 let parents = ¤t.block_headers().first().parents;
1223 current = ctx.chain_index().load_required_tipset(parents)?;
1224 }
1225 ctx.chain_store()
1226 .set_heaviest_tipset(new_head)
1227 .map_err(Into::into)
1228 }
1229}
1230
1231pub enum ChainGetMinBaseFee {}
1232impl RpcMethod<1> for ChainGetMinBaseFee {
1233 const NAME: &'static str = "Forest.ChainGetMinBaseFee";
1234 const PARAM_NAMES: [&'static str; 1] = ["lookback"];
1235 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
1236 const PERMISSION: Permission = Permission::Read;
1237
1238 type Params = (u32,);
1239 type Ok = String;
1240
1241 async fn handle(
1242 ctx: Ctx<impl Blockstore>,
1243 (lookback,): Self::Params,
1244 _: &http::Extensions,
1245 ) -> Result<Self::Ok, ServerError> {
1246 let mut current = ctx.chain_store().heaviest_tipset();
1247 let mut min_base_fee = current.block_headers().first().parent_base_fee.clone();
1248
1249 for _ in 0..lookback {
1250 let parents = ¤t.block_headers().first().parents;
1251 current = ctx.chain_index().load_required_tipset(parents)?;
1252
1253 min_base_fee =
1254 min_base_fee.min(current.block_headers().first().parent_base_fee.to_owned());
1255 }
1256
1257 Ok(min_base_fee.atto().to_string())
1258 }
1259}
1260
1261pub enum ChainTipSetWeight {}
1262impl RpcMethod<1> for ChainTipSetWeight {
1263 const NAME: &'static str = "Filecoin.ChainTipSetWeight";
1264 const PARAM_NAMES: [&'static str; 1] = ["tipsetKey"];
1265 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
1266 const PERMISSION: Permission = Permission::Read;
1267 const DESCRIPTION: Option<&'static str> = Some("Returns the weight of the specified tipset.");
1268
1269 type Params = (ApiTipsetKey,);
1270 type Ok = BigInt;
1271
1272 async fn handle(
1273 ctx: Ctx<impl Blockstore>,
1274 (ApiTipsetKey(tipset_key),): Self::Params,
1275 _: &http::Extensions,
1276 ) -> Result<Self::Ok, ServerError> {
1277 let ts = ctx
1278 .chain_store()
1279 .load_required_tipset_or_heaviest(&tipset_key)?;
1280 let weight = crate::fil_cns::weight(ctx.store(), &ts)?;
1281 Ok(weight)
1282 }
1283}
1284
1285pub enum ChainGetTipsetByParentState {}
1286impl RpcMethod<1> for ChainGetTipsetByParentState {
1287 const NAME: &'static str = "Forest.ChainGetTipsetByParentState";
1288 const PARAM_NAMES: [&'static str; 1] = ["parentState"];
1289 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
1290 const PERMISSION: Permission = Permission::Read;
1291
1292 type Params = (Cid,);
1293 type Ok = Option<Tipset>;
1294
1295 async fn handle(
1296 ctx: Ctx<impl Blockstore>,
1297 (parent_state,): Self::Params,
1298 _: &http::Extensions,
1299 ) -> Result<Self::Ok, ServerError> {
1300 Ok(ctx
1301 .chain_store()
1302 .heaviest_tipset()
1303 .chain(ctx.store())
1304 .find(|ts| ts.parent_state() == &parent_state)
1305 .clone())
1306 }
1307}
1308
1309pub const CHAIN_NOTIFY: &str = "Filecoin.ChainNotify";
1310pub(crate) fn chain_notify<DB: Blockstore>(
1311 _params: Params<'_>,
1312 data: &crate::rpc::RPCState<DB>,
1313) -> Subscriber<Vec<ApiHeadChange>> {
1314 let (sender, receiver) = broadcast::channel(HEAD_CHANNEL_CAPACITY);
1315
1316 let current = data.chain_store().heaviest_tipset();
1318 let (change, tipset) = ("current".into(), current);
1319 sender
1320 .send(vec![ApiHeadChange { change, tipset }])
1321 .expect("receiver is not dropped");
1322
1323 let mut subscriber = data.chain_store().publisher().subscribe();
1324
1325 tokio::spawn(async move {
1326 let _ = subscriber.recv().await;
1328
1329 while let Ok(v) = subscriber.recv().await {
1330 let (change, tipset) = match v {
1331 HeadChange::Apply(ts) => ("apply".into(), ts),
1332 };
1333
1334 if sender.send(vec![ApiHeadChange { change, tipset }]).is_err() {
1335 break;
1336 }
1337 }
1338 });
1339 receiver
1340}
1341
1342async fn load_api_messages_from_tipset<DB: Blockstore + Send + Sync + 'static>(
1343 ctx: &crate::rpc::RPCState<DB>,
1344 tipset_keys: &TipsetKey,
1345) -> Result<Vec<ApiMessage>, ServerError> {
1346 static SHOULD_BACKFILL: LazyLock<bool> = LazyLock::new(|| {
1347 let enabled = is_env_truthy("FOREST_RPC_BACKFILL_FULL_TIPSET_FROM_NETWORK");
1348 if enabled {
1349 tracing::warn!(
1350 "Full tipset backfilling from network is enabled via FOREST_RPC_BACKFILL_FULL_TIPSET_FROM_NETWORK, excessive disk and bandwidth usage is expected."
1351 );
1352 }
1353 enabled
1354 });
1355 let full_tipset = if *SHOULD_BACKFILL {
1356 get_full_tipset(
1357 &ctx.sync_network_context,
1358 ctx.chain_store(),
1359 None,
1360 tipset_keys,
1361 )
1362 .await?
1363 } else {
1364 load_full_tipset(ctx.chain_store(), tipset_keys)?
1365 };
1366 let blocks = full_tipset.into_blocks();
1367 let mut messages = vec![];
1368 let mut seen = CidHashSet::default();
1369 for Block {
1370 bls_messages,
1371 secp_messages,
1372 ..
1373 } in blocks
1374 {
1375 for message in bls_messages {
1376 let cid = message.cid();
1377 if seen.insert(cid) {
1378 messages.push(ApiMessage { cid, message });
1379 }
1380 }
1381
1382 for msg in secp_messages {
1383 let cid = msg.cid();
1384 if seen.insert(cid) {
1385 messages.push(ApiMessage {
1386 cid,
1387 message: msg.message,
1388 });
1389 }
1390 }
1391 }
1392
1393 Ok(messages)
1394}
1395
// Messages of a block split by kind, plus a list of CIDs.
// JSON field names are fixed by the `rename` attributes below
// (`BlsMessages`, `SecpkMessages`, `Cids`); each field is encoded through
// `crate::lotus_json`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, JsonSchema)]
pub struct BlockMessages {
    // `Message` values, serialized as `BlsMessages`.
    #[serde(rename = "BlsMessages", with = "crate::lotus_json")]
    #[schemars(with = "LotusJson<Vec<Message>>")]
    pub bls_msg: Vec<Message>,
    // `SignedMessage` values, serialized as `SecpkMessages`.
    #[serde(rename = "SecpkMessages", with = "crate::lotus_json")]
    #[schemars(with = "LotusJson<Vec<SignedMessage>>")]
    pub secp_msg: Vec<SignedMessage>,
    // Serialized as `Cids`; presumably the CIDs of the messages above — TODO confirm.
    #[serde(rename = "Cids", with = "crate::lotus_json")]
    #[schemars(with = "LotusJson<Vec<Cid>>")]
    pub cids: Vec<Cid>,
}
lotus_json_with_self!(BlockMessages);
1409
// API representation of a message receipt. Fields are serialized in
// PascalCase, except `return_data`, which is renamed to `Return`.
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, JsonSchema)]
#[serde(rename_all = "PascalCase")]
pub struct ApiReceipt {
    pub exit_code: ExitCode,
    // Raw bytes, encoded through `crate::lotus_json` under the key `Return`.
    #[serde(rename = "Return", with = "crate::lotus_json")]
    #[schemars(with = "LotusJson<RawBytes>")]
    pub return_data: RawBytes,
    pub gas_used: u64,
    // Optional CID, encoded through `crate::lotus_json`.
    #[serde(with = "crate::lotus_json")]
    #[schemars(with = "LotusJson<Option<Cid>>")]
    pub events_root: Option<Cid>,
}

lotus_json_with_self!(ApiReceipt);
1427
// A message paired with a CID, serialized as a two-field object with
// PascalCase keys (`Cid`, `Message`).
#[derive(Serialize, Deserialize, JsonSchema, Clone, Debug, Eq, PartialEq)]
#[serde(rename_all = "PascalCase")]
pub struct ApiMessage {
    #[serde(with = "crate::lotus_json")]
    #[schemars(with = "LotusJson<Cid>")]
    pub cid: Cid,
    #[serde(with = "crate::lotus_json")]
    #[schemars(with = "LotusJson<Message>")]
    pub message: Message,
}

lotus_json_with_self!(ApiMessage);
1440
// Like `ApiMessage`, but the message fields are flattened into the top-level
// JSON object and the CID is stored next to them under the key `CID`.
#[derive(Serialize, Deserialize, JsonSchema, Clone, Debug, Eq, PartialEq)]
pub struct FlattenedApiMessage {
    // `flatten` inlines the message's own fields into this object.
    #[serde(flatten, with = "crate::lotus_json")]
    #[schemars(with = "LotusJson<Message>")]
    pub message: Message,
    #[serde(rename = "CID", with = "crate::lotus_json")]
    #[schemars(with = "LotusJson<Cid>")]
    pub cid: Cid,
}

lotus_json_with_self!(FlattenedApiMessage);
1452
// Parameters of the Forest-specific chain export. Compared with
// `ChainExportParams` this adds `version`, `include_receipts` and
// `include_events`.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct ForestChainExportParams {
    pub version: FilecoinSnapshotVersion,
    pub epoch: ChainEpoch,
    pub recent_roots: i64,
    pub output_path: PathBuf,
    #[schemars(with = "LotusJson<ApiTipsetKey>")]
    #[serde(with = "crate::lotus_json")]
    pub tipset_keys: ApiTipsetKey,
    // May be omitted on input; defaults to `false`.
    #[serde(default)]
    pub include_receipts: bool,
    // May be omitted on input; defaults to `false`.
    #[serde(default)]
    pub include_events: bool,
    // No `serde(default)` here: the two flags below are required on input.
    pub skip_checksum: bool,
    pub dry_run: bool,
}
lotus_json_with_self!(ForestChainExportParams);
1470
// Parameters of the Forest-specific diff export: an epoch pair (`from`, `to`),
// a `depth`, and the path the output is written to.
// NOTE(review): the exact meaning of `depth` is not visible here — confirm
// against the export implementation.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct ForestChainExportDiffParams {
    pub from: ChainEpoch,
    pub to: ChainEpoch,
    pub depth: i64,
    pub output_path: PathBuf,
}
lotus_json_with_self!(ForestChainExportDiffParams);
1479
// Parameters of the chain export. All fields are required on input (none has
// `serde(default)`); `tipset_keys` is encoded through `crate::lotus_json`.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct ChainExportParams {
    pub epoch: ChainEpoch,
    pub recent_roots: i64,
    pub output_path: PathBuf,
    #[schemars(with = "LotusJson<ApiTipsetKey>")]
    #[serde(with = "crate::lotus_json")]
    pub tipset_keys: ApiTipsetKey,
    pub skip_checksum: bool,
    pub dry_run: bool,
}
lotus_json_with_self!(ChainExportParams);
1492
// One head-change notification entry, serialized as `{ "Type": …, "Val": … }`.
// `chain_notify` in this file emits the `change` values "current" and "apply".
#[derive(PartialEq, Debug, Serialize, Deserialize, Clone, JsonSchema)]
#[serde(rename_all = "PascalCase")]
pub struct ApiHeadChange {
    // Kind of change, serialized under the key `Type`.
    #[serde(rename = "Type")]
    pub change: String,
    // Tipset the change refers to, serialized under the key `Val`.
    #[serde(rename = "Val", with = "crate::lotus_json")]
    #[schemars(with = "LotusJson<Tipset>")]
    pub tipset: Tipset,
}
lotus_json_with_self!(ApiHeadChange);
1503
// One step on a path between two tipsets: either revert or apply a tipset.
// Serialized adjacently tagged as `{ "Type": "revert" | "apply", "Val": … }`.
// The payload type defaults to `Tipset`.
#[derive(PartialEq, Debug, Serialize, Deserialize, Clone, JsonSchema)]
#[serde(tag = "Type", content = "Val", rename_all = "snake_case")]
pub enum PathChange<T = Tipset> {
    Revert(T),
    Apply(T),
}
1510impl HasLotusJson for PathChange {
1511 type LotusJson = PathChange<<Tipset as HasLotusJson>::LotusJson>;
1512
1513 #[cfg(test)]
1514 fn snapshots() -> Vec<(serde_json::Value, Self)> {
1515 use serde_json::json;
1516 vec![(
1517 json!({
1518 "Type": "revert",
1519 "Val": {
1520 "Blocks": [
1521 {
1522 "BeaconEntries": null,
1523 "ForkSignaling": 0,
1524 "Height": 0,
1525 "Messages": { "/": "baeaaaaa" },
1526 "Miner": "f00",
1527 "ParentBaseFee": "0",
1528 "ParentMessageReceipts": { "/": "baeaaaaa" },
1529 "ParentStateRoot": { "/":"baeaaaaa" },
1530 "ParentWeight": "0",
1531 "Parents": [{"/":"bafyreiaqpwbbyjo4a42saasj36kkrpv4tsherf2e7bvezkert2a7dhonoi"}],
1532 "Timestamp": 0,
1533 "WinPoStProof": null
1534 }
1535 ],
1536 "Cids": [
1537 { "/": "bafy2bzaceag62hjj3o43lf6oyeox3fvg5aqkgl5zagbwpjje3ajwg6yw4iixk" }
1538 ],
1539 "Height": 0
1540 }
1541 }),
1542 Self::Revert(RawBlockHeader::default().into()),
1543 )]
1544 }
1545
1546 fn into_lotus_json(self) -> Self::LotusJson {
1547 match self {
1548 PathChange::Revert(it) => PathChange::Revert(it.into_lotus_json()),
1549 PathChange::Apply(it) => PathChange::Apply(it.into_lotus_json()),
1550 }
1551 }
1552
1553 fn from_lotus_json(lotus_json: Self::LotusJson) -> Self {
1554 match lotus_json {
1555 PathChange::Revert(it) => PathChange::Revert(Tipset::from_lotus_json(it)),
1556 PathChange::Apply(it) => PathChange::Apply(Tipset::from_lotus_json(it)),
1557 }
1558 }
1559}
1560
/// Test-only generator: draws one arbitrary payload and wraps it in a randomly
/// chosen variant.
#[cfg(test)]
impl<T> quickcheck::Arbitrary for PathChange<T>
where
    T: quickcheck::Arbitrary,
{
    fn arbitrary(g: &mut quickcheck::Gen) -> Self {
        let payload = T::arbitrary(g);
        // Both candidates carry the same payload; only the variant is random.
        let candidates = [
            PathChange::Apply(payload.clone()),
            PathChange::Revert(payload),
        ];
        g.choose(&candidates).cloned().unwrap()
    }
}
1573
/// Runs `assert_all_snapshots` over the `PathChange` snapshots.
#[test]
fn snapshots() {
    assert_all_snapshots::<PathChange>();
}
1578
// Property test: every generated `PathChange` value is handed to
// `assert_unchanged_via_json`.
#[cfg(test)]
quickcheck::quickcheck! {
    fn quickcheck(val: PathChange) -> () {
        assert_unchanged_via_json(val)
    }
}
1585
// Unit tests for `impl_chain_get_path`: each test builds a small chain with
// `chain4u!` on top of the calibnet genesis and compares the computed path
// against an expected list of `Revert`/`Apply` steps.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        blocks::{Chain4U, RawBlockHeader, chain4u},
        db::{
            MemoryDB,
            car::{AnyCar, ManyCar},
        },
        networks::{self, ChainConfig},
    };
    use PathChange::{Apply, Revert};
    use itertools::Itertools as _;
    use std::sync::Arc;

    /// Moving from a tipset to one of its ancestors yields only reverts.
    #[test]
    fn revert_to_ancestor_linear() {
        let store = ChainStore::calibnet();
        chain4u! {
            in store.blockstore();
            [_genesis = store.genesis_block_header()]
            -> [a] -> [b] -> [c, d] -> [e]
        };

        // single step back
        assert_path_change(&store, b, a, [Revert(&[b])]);

        // starting from a multi-block tipset
        assert_path_change(&store, [c, d], a, [Revert(&[c, d][..]), Revert(&[b])]);

        // ending at a multi-block tipset
        assert_path_change(&store, e, [c, d], [Revert(e)]);

        // passing over a multi-block tipset
        assert_path_change(&store, e, b, [Revert(&[e][..]), Revert(&[c, d])]);
    }

    /// Paths whose endpoints name only some of the blocks of a tipset
    /// (e.g. `a` out of `[a, b]`).
    #[test]
    fn incomplete_tipsets() {
        let store = ChainStore::calibnet();
        chain4u! {
            in store.blockstore();
            [_genesis = store.genesis_block_header()]
            -> [a, b] -> [c] -> [d, _e] };

        // from the partial tipset `[a]` forward to `c`
        assert_path_change(
            &store,
            a,
            c,
            [
                // revert the partial tipset, apply the full one, then the target
                Revert(&[a][..]), Apply(&[a, b]), Apply(&[c]), ],
        );

        // forward to `d`, a partial view of `[d, _e]`
        assert_path_change(&store, c, d, [Apply(d)]);

        // back from `d` to `c`
        assert_path_change(&store, d, c, [Revert(d)]);

        // from `c` back to the partial tipset `[a]`
        assert_path_change(
            &store,
            c,
            a,
            [
                Revert(&[c][..]),
                // revert the full tipset, then apply the partial one
                Revert(&[a, b]), Apply(&[a]), ],
        );
    }

    /// Moving from a tipset to one of its descendants yields only applies.
    #[test]
    fn apply_to_descendant_linear() {
        let store = ChainStore::calibnet();
        chain4u! {
            in store.blockstore();
            [_genesis = store.genesis_block_header()]
            -> [a] -> [b] -> [c, d] -> [e]
        };

        // single step forward
        assert_path_change(&store, a, b, [Apply(&[b])]);

        // starting from a multi-block tipset
        assert_path_change(&store, [c, d], e, [Apply(e)]);

        // ending at a multi-block tipset
        assert_path_change(&store, b, [c, d], [Apply([c, d])]);

        // passing over a multi-block tipset
        assert_path_change(&store, b, e, [Apply(&[c, d][..]), Apply(&[e])]);
    }

    /// Two branches forking from `a`: crossing over reverts on one branch and
    /// applies on the other.
    #[test]
    fn cross_fork_simple() {
        let store = ChainStore::calibnet();
        chain4u! {
            in store.blockstore();
            [_genesis = store.genesis_block_header()]
            -> [a] -> [b1] -> [c1]
        };
        chain4u! {
            from [a] in store.blockstore();
            [b2] -> [c2]
        };

        // the two endpoints are both direct children of `a`
        assert_path_change(&store, b1, b2, [Revert(b1), Apply(b2)]);

        // the target is one step further along the second branch
        assert_path_change(&store, b1, c2, [Revert(b1), Apply(b2), Apply(c2)]);

        // `a` and `c1` are otherwise unused; presumably this silences the
        // unused-variable warnings.
        let _ = (a, c1);
    }

    // Test-only constructors for a `ChainStore` backed by in-memory databases.
    impl ChainStore<Chain4U<ManyCar>> {
        // Builds a store whose blockstore is a `Chain4U` over a `ManyCar` that
        // layers an empty `MemoryDB` with the given genesis CAR (read-only).
        // Panics if the CAR cannot be read or the genesis header is missing.
        fn _load(genesis_car: &'static [u8], genesis_cid: Cid) -> Self {
            let db = Arc::new(Chain4U::with_blockstore(
                ManyCar::new(MemoryDB::default())
                    .with_read_only(AnyCar::new(genesis_car).unwrap())
                    .unwrap(),
            ));
            let genesis_block_header = db.get_cbor(&genesis_cid).unwrap().unwrap();
            ChainStore::new(
                db,
                Arc::new(MemoryDB::default()),
                Arc::new(MemoryDB::default()),
                Arc::new(ChainConfig::calibnet()),
                genesis_block_header,
            )
            .unwrap()
        }
        // Store initialised from the bundled calibnet genesis.
        pub fn calibnet() -> Self {
            Self::_load(
                networks::calibnet::DEFAULT_GENESIS,
                *networks::calibnet::GENESIS_CID,
            )
        }
    }

    // Lets the tests pass a header, an array of headers, or a slice of headers
    // wherever a tipset is expected.
    trait MakeTipset {
        fn make_tipset(self) -> Tipset;
    }

    // A single header becomes a one-block tipset.
    impl MakeTipset for &RawBlockHeader {
        fn make_tipset(self) -> Tipset {
            Tipset::from(CachingBlockHeader::new(self.clone()))
        }
    }

    // Arrays (by value or by reference) delegate to the slice implementation.
    impl<const N: usize> MakeTipset for [&RawBlockHeader; N] {
        fn make_tipset(self) -> Tipset {
            self.as_slice().make_tipset()
        }
    }

    impl<const N: usize> MakeTipset for &[&RawBlockHeader; N] {
        fn make_tipset(self) -> Tipset {
            self.as_slice().make_tipset()
        }
    }

    // Panics if `Tipset::new` rejects the headers.
    impl MakeTipset for &[&RawBlockHeader] {
        fn make_tipset(self) -> Tipset {
            Tipset::new(self.iter().cloned().cloned()).unwrap()
        }
    }

    // Computes the path between `from` and `to` with `impl_chain_get_path` and
    // asserts it equals `expected`. On mismatch, a readable summary of both
    // lists is printed before the assertion fails.
    #[track_caller]
    fn assert_path_change<T: MakeTipset>(
        store: &ChainStore<impl Blockstore>,
        from: impl MakeTipset,
        to: impl MakeTipset,
        expected: impl IntoIterator<Item = PathChange<T>>,
    ) {
        // Prints one step as `Revert(…)` / ` Apply(…)` with its epoch and key CID.
        fn print(path_change: &PathChange) {
            let it = match path_change {
                Revert(it) => {
                    print!("Revert(");
                    it
                }
                Apply(it) => {
                    print!(" Apply(");
                    it
                }
            };
            println!(
                "epoch = {}, key.cid = {})",
                it.epoch(),
                it.key().cid().unwrap()
            )
        }

        let actual =
            impl_chain_get_path(store, from.make_tipset().key(), to.make_tipset().key()).unwrap();
        // Turn the expected header shorthands into real tipsets for comparison.
        let expected = expected
            .into_iter()
            .map(|change| match change {
                PathChange::Revert(it) => PathChange::Revert(it.make_tipset()),
                PathChange::Apply(it) => PathChange::Apply(it.make_tipset()),
            })
            .collect_vec();
        if expected != actual {
            println!("SUMMARY");
            println!("=======");
            println!("expected:");
            for it in &expected {
                print(it)
            }
            println!();
            println!("actual:");
            for it in &actual {
                print(it)
            }
            println!("=======\n")
        }
        assert_eq!(
            expected, actual,
            "expected change (left) does not match actual change (right)"
        )
    }
}