1pub mod types;
5use enumflags2::{BitFlags, make_bitflags};
6use types::*;
7
8#[cfg(test)]
9use crate::blocks::RawBlockHeader;
10use crate::blocks::{Block, CachingBlockHeader, Tipset, TipsetKey};
11use crate::chain::index::ResolveNullTipset;
12use crate::chain::{ChainStore, ExportOptions, FilecoinSnapshotVersion, HeadChange};
13use crate::chain_sync::{get_full_tipset, load_full_tipset};
14use crate::cid_collections::CidHashSet;
15use crate::ipld::DfsIter;
16use crate::ipld::{CHAIN_EXPORT_STATUS, cancel_export, end_export, start_export};
17use crate::lotus_json::{HasLotusJson, LotusJson, lotus_json_with_self};
18#[cfg(test)]
19use crate::lotus_json::{assert_all_snapshots, assert_unchanged_via_json};
20use crate::message::{ChainMessage, SignedMessage};
21use crate::rpc::eth::Block as EthBlock;
22use crate::rpc::eth::{
23 EthLog, TxInfo, eth_logs_with_filter, types::ApiHeaders, types::EthFilterSpec,
24};
25use crate::rpc::f3::F3ExportLatestSnapshot;
26use crate::rpc::types::*;
27use crate::rpc::{ApiPaths, Ctx, EthEventHandler, Permission, RpcMethod, ServerError};
28use crate::shim::clock::ChainEpoch;
29use crate::shim::error::ExitCode;
30use crate::shim::executor::Receipt;
31use crate::shim::message::Message;
32use crate::utils::db::CborStoreExt as _;
33use crate::utils::io::VoidAsyncWriter;
34use crate::utils::misc::env::is_env_truthy;
35use anyhow::{Context as _, Result};
36use cid::Cid;
37use fvm_ipld_blockstore::Blockstore;
38use fvm_ipld_encoding::{CborStore, RawBytes};
39use hex::ToHex;
40use ipld_core::ipld::Ipld;
41use jsonrpsee::types::Params;
42use jsonrpsee::types::error::ErrorObjectOwned;
43use num::BigInt;
44use schemars::JsonSchema;
45use serde::{Deserialize, Serialize};
46use sha2::Sha256;
47use std::fs::File;
48use std::{collections::VecDeque, path::PathBuf, sync::LazyLock};
49use tokio::sync::{
50 Mutex,
51 broadcast::{self, Receiver as Subscriber},
52};
53use tokio::task::JoinHandle;
54use tokio_util::sync::CancellationToken;
55
/// Capacity of the broadcast channels created by [`new_heads`] and [`logs`].
const HEAD_CHANNEL_CAPACITY: usize = 10;

/// Number of epochs subtracted from the head epoch to obtain the "safe" height
/// used by [`ChainGetTipSetV2::get_latest_safe_tipset`].
const SAFE_HEIGHT_DISTANCE: ChainEpoch = 200;

/// Cancellation token of the chain export currently in flight, if any.
/// [`ForestChainExport`] stores a token here while it runs and resets it to
/// `None` afterwards; [`ForestChainExportCancel`] cancels the stored token.
static CHAIN_EXPORT_LOCK: LazyLock<Mutex<Option<CancellationToken>>> =
    LazyLock::new(|| Mutex::new(None));
75
76pub(crate) fn new_heads<DB: Blockstore + Send + Sync + 'static>(
83 data: Ctx<DB>,
84) -> (Subscriber<ApiHeaders>, JoinHandle<()>) {
85 let (sender, receiver) = broadcast::channel(HEAD_CHANNEL_CAPACITY);
86
87 let mut subscriber = data.chain_store().publisher().subscribe();
88
89 let handle = tokio::spawn(async move {
90 while let Ok(v) = subscriber.recv().await {
91 let headers = match v {
92 HeadChange::Apply(ts) => {
93 match EthBlock::from_filecoin_tipset(data.clone(), ts, TxInfo::Full).await {
96 Ok(block) => ApiHeaders(block),
97 Err(e) => {
98 tracing::error!("Failed to convert tipset to eth block: {}", e);
99 continue;
100 }
101 }
102 }
103 };
104 if let Err(e) = sender.send(headers) {
105 tracing::error!("Failed to send headers: {}", e);
106 break;
107 }
108 }
109 });
110
111 (receiver, handle)
112}
113
114pub(crate) fn logs<DB: Blockstore + Sync + Send + 'static>(
121 ctx: &Ctx<DB>,
122 filter: Option<EthFilterSpec>,
123) -> (Subscriber<Vec<EthLog>>, JoinHandle<()>) {
124 let (sender, receiver) = broadcast::channel(HEAD_CHANNEL_CAPACITY);
125
126 let mut subscriber = ctx.chain_store().publisher().subscribe();
127
128 let ctx = ctx.clone();
129
130 let handle = tokio::spawn(async move {
131 while let Ok(v) = subscriber.recv().await {
132 match v {
133 HeadChange::Apply(ts) => {
134 match eth_logs_with_filter(&ctx, &ts, filter.clone(), None).await {
135 Ok(logs) => {
136 if !logs.is_empty()
137 && let Err(e) = sender.send(logs)
138 {
139 tracing::error!(
140 "Failed to send logs for tipset {}: {}",
141 ts.key(),
142 e
143 );
144 break;
145 }
146 }
147 Err(e) => {
148 tracing::error!("Failed to fetch logs for tipset {}: {}", ts.key(), e);
149 }
150 }
151 }
152 }
153 }
154 });
155
156 (receiver, handle)
157}
158
159pub enum ChainGetFinalizedTipset {}
160impl RpcMethod<0> for ChainGetFinalizedTipset {
161 const NAME: &'static str = "Filecoin.ChainGetFinalizedTipSet";
162 const PARAM_NAMES: [&'static str; 0] = [];
163 const API_PATHS: BitFlags<ApiPaths> = make_bitflags!(ApiPaths::V1);
164 const PERMISSION: Permission = Permission::Read;
165 const DESCRIPTION: Option<&'static str> = Some(
166 "Returns the latest F3 finalized tipset, or falls back to EC finality if F3 is not operational on the node or if the F3 finalized tipset is further back than EC finalized tipset.",
167 );
168
169 type Params = ();
170 type Ok = Tipset;
171
172 async fn handle(
173 ctx: Ctx<impl Blockstore + Send + Sync + 'static>,
174 (): Self::Params,
175 ) -> Result<Self::Ok, ServerError> {
176 let head = ctx.chain_store().heaviest_tipset();
177 let ec_finality_epoch = (head.epoch() - ctx.chain_config().policy.chain_finality).max(0);
178
179 match get_f3_finality_tipset(&ctx, ec_finality_epoch).await {
181 Ok(f3_tipset) => {
182 tracing::debug!("Using F3 finalized tipset at epoch {}", f3_tipset.epoch());
183 Ok(f3_tipset)
184 }
185 Err(_) => {
186 tracing::warn!("F3 finalization unavailable, falling back to EC finality");
188 let ec_tipset = ctx.chain_index().tipset_by_height(
189 ec_finality_epoch,
190 head,
191 ResolveNullTipset::TakeOlder,
192 )?;
193 Ok(ec_tipset)
194 }
195 }
196 }
197}
198
199async fn get_f3_finality_tipset<DB: Blockstore + Sync + Send + 'static>(
201 ctx: &Ctx<DB>,
202 ec_finality_epoch: ChainEpoch,
203) -> Result<Tipset> {
204 let f3_finalized_cert = crate::rpc::f3::F3GetLatestCertificate::handle(ctx.clone(), ())
205 .await
206 .map_err(|e| anyhow::anyhow!("Failed to get F3 certificate: {}", e))?;
207
208 let f3_finalized_head = f3_finalized_cert.chain_head();
209 if f3_finalized_head.epoch < ec_finality_epoch {
210 return Err(anyhow::anyhow!(
211 "F3 finalized tipset epoch {} is further back than EC finalized tipset epoch {}",
212 f3_finalized_head.epoch,
213 ec_finality_epoch
214 ));
215 }
216
217 ctx.chain_index()
218 .load_required_tipset(&f3_finalized_head.key)
219 .map_err(|e| {
220 anyhow::anyhow!(
221 "Failed to load F3 finalized tipset at epoch {}: {}",
222 f3_finalized_head.epoch,
223 e
224 )
225 })
226}
227
228pub enum ChainGetMessage {}
229impl RpcMethod<1> for ChainGetMessage {
230 const NAME: &'static str = "Filecoin.ChainGetMessage";
231 const PARAM_NAMES: [&'static str; 1] = ["messageCid"];
232 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
233 const PERMISSION: Permission = Permission::Read;
234 const DESCRIPTION: Option<&'static str> = Some("Returns the message with the specified CID.");
235
236 type Params = (Cid,);
237 type Ok = FlattenedApiMessage;
238
239 async fn handle(
240 ctx: Ctx<impl Blockstore>,
241 (message_cid,): Self::Params,
242 ) -> Result<Self::Ok, ServerError> {
243 let chain_message: ChainMessage = ctx
244 .store()
245 .get_cbor(&message_cid)?
246 .with_context(|| format!("can't find message with cid {message_cid}"))?;
247 let message = match chain_message {
248 ChainMessage::Signed(m) => m.into_message(),
249 ChainMessage::Unsigned(m) => m,
250 };
251
252 let cid = message.cid();
253 Ok(FlattenedApiMessage { message, cid })
254 }
255}
256
257pub enum ChainGetEvents {}
260impl RpcMethod<1> for ChainGetEvents {
261 const NAME: &'static str = "Filecoin.ChainGetEvents";
262 const PARAM_NAMES: [&'static str; 1] = ["rootCid"];
263 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
264 const PERMISSION: Permission = Permission::Read;
265 const DESCRIPTION: Option<&'static str> =
266 Some("Returns the events under the given event AMT root CID.");
267
268 type Params = (Cid,);
269 type Ok = Vec<Event>;
270 async fn handle(
271 ctx: Ctx<impl Blockstore + Send + Sync + 'static>,
272 (root_cid,): Self::Params,
273 ) -> Result<Self::Ok, ServerError> {
274 let events = EthEventHandler::get_events_by_event_root(&ctx, &root_cid)?;
275 Ok(events)
276 }
277}
278
279pub enum ChainGetParentMessages {}
280impl RpcMethod<1> for ChainGetParentMessages {
281 const NAME: &'static str = "Filecoin.ChainGetParentMessages";
282 const PARAM_NAMES: [&'static str; 1] = ["blockCid"];
283 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
284 const PERMISSION: Permission = Permission::Read;
285 const DESCRIPTION: Option<&'static str> =
286 Some("Returns the messages included in the blocks of the parent tipset.");
287
288 type Params = (Cid,);
289 type Ok = Vec<ApiMessage>;
290
291 async fn handle(
292 ctx: Ctx<impl Blockstore + Send + Sync + 'static>,
293 (block_cid,): Self::Params,
294 ) -> Result<Self::Ok, ServerError> {
295 let store = ctx.store();
296 let block_header: CachingBlockHeader = store
297 .get_cbor(&block_cid)?
298 .with_context(|| format!("can't find block header with cid {block_cid}"))?;
299 if block_header.epoch == 0 {
300 Ok(vec![])
301 } else {
302 let parent_tipset = Tipset::load_required(store, &block_header.parents)?;
303 load_api_messages_from_tipset(&ctx, parent_tipset.key()).await
304 }
305 }
306}
307
308pub enum ChainGetParentReceipts {}
309impl RpcMethod<1> for ChainGetParentReceipts {
310 const NAME: &'static str = "Filecoin.ChainGetParentReceipts";
311 const PARAM_NAMES: [&'static str; 1] = ["blockCid"];
312 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
313 const PERMISSION: Permission = Permission::Read;
314 const DESCRIPTION: Option<&'static str> =
315 Some("Returns the message receipts included in the blocks of the parent tipset.");
316
317 type Params = (Cid,);
318 type Ok = Vec<ApiReceipt>;
319
320 async fn handle(
321 ctx: Ctx<impl Blockstore>,
322 (block_cid,): Self::Params,
323 ) -> Result<Self::Ok, ServerError> {
324 let store = ctx.store();
325 let block_header: CachingBlockHeader = store
326 .get_cbor(&block_cid)?
327 .with_context(|| format!("can't find block header with cid {block_cid}"))?;
328 if block_header.epoch == 0 {
329 return Ok(vec![]);
330 }
331 let receipts = Receipt::get_receipts(store, block_header.message_receipts)
332 .map_err(|_| {
333 ErrorObjectOwned::owned::<()>(
334 1,
335 format!(
336 "failed to root: ipld: could not find {}",
337 block_header.message_receipts
338 ),
339 None,
340 )
341 })?
342 .iter()
343 .map(|r| ApiReceipt {
344 exit_code: r.exit_code().into(),
345 return_data: r.return_data(),
346 gas_used: r.gas_used(),
347 events_root: r.events_root(),
348 })
349 .collect();
350
351 Ok(receipts)
352 }
353}
354
355pub enum ChainGetMessagesInTipset {}
356impl RpcMethod<1> for ChainGetMessagesInTipset {
357 const NAME: &'static str = "Filecoin.ChainGetMessagesInTipset";
358 const PARAM_NAMES: [&'static str; 1] = ["tipsetKey"];
359 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
360 const PERMISSION: Permission = Permission::Read;
361
362 type Params = (ApiTipsetKey,);
363 type Ok = Vec<ApiMessage>;
364
365 async fn handle(
366 ctx: Ctx<impl Blockstore + Send + Sync + 'static>,
367 (ApiTipsetKey(tipset_key),): Self::Params,
368 ) -> Result<Self::Ok, ServerError> {
369 let tipset = ctx
370 .chain_store()
371 .load_required_tipset_or_heaviest(&tipset_key)?;
372 load_api_messages_from_tipset(&ctx, tipset.key()).await
373 }
374}
375
376pub enum ChainPruneSnapshot {}
377impl RpcMethod<1> for ChainPruneSnapshot {
378 const NAME: &'static str = "Forest.SnapshotGC";
379 const PARAM_NAMES: [&'static str; 1] = ["blocking"];
380 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
381 const PERMISSION: Permission = Permission::Admin;
382
383 type Params = (bool,);
384 type Ok = ();
385
386 async fn handle(
387 _ctx: Ctx<impl Blockstore + Send + Sync + 'static>,
388 (blocking,): Self::Params,
389 ) -> Result<Self::Ok, ServerError> {
390 if let Some(gc) = crate::daemon::GLOBAL_SNAPSHOT_GC.get() {
391 let progress_rx = gc.trigger()?;
392 while blocking && progress_rx.recv_async().await.is_ok() {}
393 Ok(())
394 } else {
395 Err(anyhow::anyhow!("snapshot gc is not enabled").into())
396 }
397 }
398}
399
400pub enum ForestChainExport {}
401impl RpcMethod<1> for ForestChainExport {
402 const NAME: &'static str = "Forest.ChainExport";
403 const PARAM_NAMES: [&'static str; 1] = ["params"];
404 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
405 const PERMISSION: Permission = Permission::Read;
406
407 type Params = (ForestChainExportParams,);
408 type Ok = ApiExportResult;
409
410 async fn handle(
411 ctx: Ctx<impl Blockstore + Send + Sync + 'static>,
412 (params,): Self::Params,
413 ) -> Result<Self::Ok, ServerError> {
414 let ForestChainExportParams {
415 version,
416 epoch,
417 recent_roots,
418 output_path,
419 tipset_keys: ApiTipsetKey(tsk),
420 skip_checksum,
421 dry_run,
422 } = params;
423
424 let token = CancellationToken::new();
425 {
426 let mut guard = CHAIN_EXPORT_LOCK.lock().await;
427 if guard.is_some() {
428 return Err(
429 anyhow::anyhow!("A chain export is still in progress. Cancel it with the export-cancel subcommand if needed.").into(),
430 );
431 }
432 *guard = Some(token.clone());
433 }
434 start_export();
435
436 let head = ctx.chain_store().load_required_tipset_or_heaviest(&tsk)?;
437 let start_ts =
438 ctx.chain_index()
439 .tipset_by_height(epoch, head, ResolveNullTipset::TakeOlder)?;
440
441 let options = Some(ExportOptions {
442 skip_checksum,
443 seen: Default::default(),
444 });
445 let writer = if dry_run {
446 tokio_util::either::Either::Left(VoidAsyncWriter)
447 } else {
448 tokio_util::either::Either::Right(tokio::fs::File::create(&output_path).await?)
449 };
450 let result = match version {
451 FilecoinSnapshotVersion::V1 => {
452 let db = ctx.store_owned();
453
454 let chain_export =
455 crate::chain::export::<Sha256>(&db, &start_ts, recent_roots, writer, options);
456
457 tokio::select! {
458 result = chain_export => {
459 result.map(|checksum_opt| ApiExportResult::Done(checksum_opt.map(|hash| hash.encode_hex())))
460 },
461 _ = token.cancelled() => {
462 cancel_export();
463 tracing::warn!("Snapshot export was cancelled");
464 Ok(ApiExportResult::Cancelled)
465 },
466 }
467 }
468 FilecoinSnapshotVersion::V2 => {
469 let db = ctx.store_owned();
470
471 let f3_snap_tmp_path = {
472 let mut f3_snap_dir = output_path.clone();
473 let mut builder = tempfile::Builder::new();
474 let with_suffix = builder.suffix(".f3snap.bin");
475 if f3_snap_dir.pop() {
476 with_suffix.tempfile_in(&f3_snap_dir)
477 } else {
478 with_suffix.tempfile_in(".")
479 }?
480 .into_temp_path()
481 };
482 let f3_snap = {
483 match F3ExportLatestSnapshot::run(f3_snap_tmp_path.display().to_string()).await
484 {
485 Ok(cid) => Some((cid, File::open(&f3_snap_tmp_path)?)),
486 Err(e) => {
487 tracing::error!("Failed to export F3 snapshot: {e}");
488 None
489 }
490 }
491 };
492
493 let chain_export = crate::chain::export_v2::<Sha256, _>(
494 &db,
495 f3_snap,
496 &start_ts,
497 recent_roots,
498 writer,
499 options,
500 );
501
502 tokio::select! {
503 result = chain_export => {
504 result.map(|checksum_opt| ApiExportResult::Done(checksum_opt.map(|hash| hash.encode_hex())))
505 },
506 _ = token.cancelled() => {
507 cancel_export();
508 tracing::warn!("Snapshot export was cancelled");
509 Ok(ApiExportResult::Cancelled)
510 },
511 }
512 }
513 };
514 end_export();
515 let mut guard = CHAIN_EXPORT_LOCK.lock().await;
517 *guard = None;
518 match result {
519 Ok(export_result) => Ok(export_result),
520 Err(e) => Err(anyhow::anyhow!(e).into()),
521 }
522 }
523}
524
525pub enum ForestChainExportStatus {}
526impl RpcMethod<0> for ForestChainExportStatus {
527 const NAME: &'static str = "Forest.ChainExportStatus";
528 const PARAM_NAMES: [&'static str; 0] = [];
529 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
530 const PERMISSION: Permission = Permission::Read;
531
532 type Params = ();
533 type Ok = ApiExportStatus;
534
535 async fn handle(_ctx: Ctx<impl Blockstore>, (): Self::Params) -> Result<Self::Ok, ServerError> {
536 let mutex = CHAIN_EXPORT_STATUS.lock();
537
538 let progress = if mutex.initial_epoch == 0 {
539 0.0
540 } else {
541 let p = 1.0 - ((mutex.epoch as f64) / (mutex.initial_epoch as f64));
542 if p.is_finite() {
543 p.clamp(0.0, 1.0)
544 } else {
545 0.0
546 }
547 };
548 let progress = (progress * 100.0).round() / 100.0;
550
551 let status = ApiExportStatus {
552 progress,
553 exporting: mutex.exporting,
554 cancelled: mutex.cancelled,
555 start_time: mutex.start_time,
556 };
557
558 Ok(status)
559 }
560}
561
562pub enum ForestChainExportCancel {}
563impl RpcMethod<0> for ForestChainExportCancel {
564 const NAME: &'static str = "Forest.ChainExportCancel";
565 const PARAM_NAMES: [&'static str; 0] = [];
566 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
567 const PERMISSION: Permission = Permission::Read;
568
569 type Params = ();
570 type Ok = bool;
571
572 async fn handle(_ctx: Ctx<impl Blockstore>, (): Self::Params) -> Result<Self::Ok, ServerError> {
573 if let Some(token) = CHAIN_EXPORT_LOCK.lock().await.as_ref() {
574 token.cancel();
575 return Ok(true);
576 }
577
578 Ok(false)
579 }
580}
581
582pub enum ForestChainExportDiff {}
583impl RpcMethod<1> for ForestChainExportDiff {
584 const NAME: &'static str = "Forest.ChainExportDiff";
585 const PARAM_NAMES: [&'static str; 1] = ["params"];
586 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
587 const PERMISSION: Permission = Permission::Read;
588
589 type Params = (ForestChainExportDiffParams,);
590 type Ok = ();
591
592 async fn handle(
593 ctx: Ctx<impl Blockstore + Send + Sync + 'static>,
594 (params,): Self::Params,
595 ) -> Result<Self::Ok, ServerError> {
596 let ForestChainExportDiffParams {
597 from,
598 to,
599 depth,
600 output_path,
601 } = params;
602
603 let _locked = CHAIN_EXPORT_LOCK.try_lock();
604 if _locked.is_err() {
605 return Err(
606 anyhow::anyhow!("Another chain export diff job is still in progress").into(),
607 );
608 }
609
610 let chain_finality = ctx.chain_config().policy.chain_finality;
611 if depth < chain_finality {
612 return Err(
613 anyhow::anyhow!(format!("depth must be greater than {chain_finality}")).into(),
614 );
615 }
616
617 let head = ctx.chain_store().heaviest_tipset();
618 let start_ts =
619 ctx.chain_index()
620 .tipset_by_height(from, head, ResolveNullTipset::TakeOlder)?;
621
622 crate::tool::subcommands::archive_cmd::do_export(
623 &ctx.store_owned(),
624 start_ts,
625 output_path,
626 None,
627 depth,
628 Some(to),
629 Some(chain_finality),
630 true,
631 )
632 .await?;
633
634 Ok(())
635 }
636}
637
638pub enum ChainExport {}
639impl RpcMethod<1> for ChainExport {
640 const NAME: &'static str = "Filecoin.ChainExport";
641 const PARAM_NAMES: [&'static str; 1] = ["params"];
642 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
643 const PERMISSION: Permission = Permission::Read;
644
645 type Params = (ChainExportParams,);
646 type Ok = ApiExportResult;
647
648 async fn handle(
649 ctx: Ctx<impl Blockstore + Send + Sync + 'static>,
650 (ChainExportParams {
651 epoch,
652 recent_roots,
653 output_path,
654 tipset_keys,
655 skip_checksum,
656 dry_run,
657 },): Self::Params,
658 ) -> Result<Self::Ok, ServerError> {
659 ForestChainExport::handle(
660 ctx,
661 (ForestChainExportParams {
662 version: FilecoinSnapshotVersion::V1,
663 epoch,
664 recent_roots,
665 output_path,
666 tipset_keys,
667 skip_checksum,
668 dry_run,
669 },),
670 )
671 .await
672 }
673}
674
675pub enum ChainReadObj {}
676impl RpcMethod<1> for ChainReadObj {
677 const NAME: &'static str = "Filecoin.ChainReadObj";
678 const PARAM_NAMES: [&'static str; 1] = ["cid"];
679 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
680 const PERMISSION: Permission = Permission::Read;
681 const DESCRIPTION: Option<&'static str> = Some(
682 "Reads IPLD nodes referenced by the specified CID from the chain blockstore and returns raw bytes.",
683 );
684
685 type Params = (Cid,);
686 type Ok = Vec<u8>;
687
688 async fn handle(
689 ctx: Ctx<impl Blockstore>,
690 (cid,): Self::Params,
691 ) -> Result<Self::Ok, ServerError> {
692 let bytes = ctx
693 .store()
694 .get(&cid)?
695 .with_context(|| format!("can't find object with cid={cid}"))?;
696 Ok(bytes)
697 }
698}
699
700pub enum ChainHasObj {}
701impl RpcMethod<1> for ChainHasObj {
702 const NAME: &'static str = "Filecoin.ChainHasObj";
703 const PARAM_NAMES: [&'static str; 1] = ["cid"];
704 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
705 const PERMISSION: Permission = Permission::Read;
706 const DESCRIPTION: Option<&'static str> =
707 Some("Checks if a given CID exists in the chain blockstore.");
708
709 type Params = (Cid,);
710 type Ok = bool;
711
712 async fn handle(
713 ctx: Ctx<impl Blockstore>,
714 (cid,): Self::Params,
715 ) -> Result<Self::Ok, ServerError> {
716 Ok(ctx.store().get(&cid)?.is_some())
717 }
718}
719
720pub enum ChainStatObj {}
723impl RpcMethod<2> for ChainStatObj {
724 const NAME: &'static str = "Filecoin.ChainStatObj";
725 const PARAM_NAMES: [&'static str; 2] = ["obj_cid", "base_cid"];
726 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
727 const PERMISSION: Permission = Permission::Read;
728
729 type Params = (Cid, Option<Cid>);
730 type Ok = ObjStat;
731
732 async fn handle(
733 ctx: Ctx<impl Blockstore>,
734 (obj_cid, base_cid): Self::Params,
735 ) -> Result<Self::Ok, ServerError> {
736 let mut stats = ObjStat::default();
737 let mut seen = CidHashSet::default();
738 let mut walk = |cid, collect| {
739 let mut queue = VecDeque::new();
740 queue.push_back(cid);
741 while let Some(link_cid) = queue.pop_front() {
742 if !seen.insert(link_cid) {
743 continue;
744 }
745 let data = ctx.store().get(&link_cid)?;
746 if let Some(data) = data {
747 if collect {
748 stats.links += 1;
749 stats.size += data.len();
750 }
751 if matches!(link_cid.codec(), fvm_ipld_encoding::DAG_CBOR)
752 && let Ok(ipld) =
753 crate::utils::encoding::from_slice_with_fallback::<Ipld>(&data)
754 {
755 for ipld in DfsIter::new(ipld) {
756 if let Ipld::Link(cid) = ipld {
757 queue.push_back(cid);
758 }
759 }
760 }
761 }
762 }
763 anyhow::Ok(())
764 };
765 if let Some(base_cid) = base_cid {
766 walk(base_cid, false)?;
767 }
768 walk(obj_cid, true)?;
769 Ok(stats)
770 }
771}
772
773pub enum ChainGetBlockMessages {}
774impl RpcMethod<1> for ChainGetBlockMessages {
775 const NAME: &'static str = "Filecoin.ChainGetBlockMessages";
776 const PARAM_NAMES: [&'static str; 1] = ["blockCid"];
777 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
778 const PERMISSION: Permission = Permission::Read;
779 const DESCRIPTION: Option<&'static str> =
780 Some("Returns all messages from the specified block.");
781
782 type Params = (Cid,);
783 type Ok = BlockMessages;
784
785 async fn handle(
786 ctx: Ctx<impl Blockstore>,
787 (block_cid,): Self::Params,
788 ) -> Result<Self::Ok, ServerError> {
789 let blk: CachingBlockHeader = ctx.store().get_cbor_required(&block_cid)?;
790 let (unsigned_cids, signed_cids) = crate::chain::read_msg_cids(ctx.store(), &blk)?;
791 let (bls_msg, secp_msg) =
792 crate::chain::block_messages_from_cids(ctx.store(), &unsigned_cids, &signed_cids)?;
793 let cids = unsigned_cids.into_iter().chain(signed_cids).collect();
794
795 let ret = BlockMessages {
796 bls_msg,
797 secp_msg,
798 cids,
799 };
800 Ok(ret)
801 }
802}
803
804pub enum ChainGetPath {}
805impl RpcMethod<2> for ChainGetPath {
806 const NAME: &'static str = "Filecoin.ChainGetPath";
807 const PARAM_NAMES: [&'static str; 2] = ["from", "to"];
808 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
809 const PERMISSION: Permission = Permission::Read;
810 const DESCRIPTION: Option<&'static str> =
811 Some("Returns the path between the two specified tipsets.");
812
813 type Params = (TipsetKey, TipsetKey);
814 type Ok = Vec<PathChange>;
815
816 async fn handle(
817 ctx: Ctx<impl Blockstore>,
818 (from, to): Self::Params,
819 ) -> Result<Self::Ok, ServerError> {
820 impl_chain_get_path(ctx.chain_store(), &from, &to).map_err(Into::into)
821 }
822}
823
824fn impl_chain_get_path(
842 chain_store: &ChainStore<impl Blockstore>,
843 from: &TipsetKey,
844 to: &TipsetKey,
845) -> anyhow::Result<Vec<PathChange>> {
846 let mut to_revert = chain_store
847 .load_required_tipset_or_heaviest(from)
848 .context("couldn't load `from`")?;
849 let mut to_apply = chain_store
850 .load_required_tipset_or_heaviest(to)
851 .context("couldn't load `to`")?;
852
853 let mut all_reverts = vec![];
854 let mut all_applies = vec![];
855
856 while to_revert != to_apply {
859 if to_revert.epoch() > to_apply.epoch() {
860 let next = chain_store
861 .load_required_tipset_or_heaviest(to_revert.parents())
862 .context("couldn't load ancestor of `from`")?;
863 all_reverts.push(to_revert);
864 to_revert = next;
865 } else {
866 let next = chain_store
867 .load_required_tipset_or_heaviest(to_apply.parents())
868 .context("couldn't load ancestor of `to`")?;
869 all_applies.push(to_apply);
870 to_apply = next;
871 }
872 }
873 Ok(all_reverts
874 .into_iter()
875 .map(PathChange::Revert)
876 .chain(all_applies.into_iter().rev().map(PathChange::Apply))
877 .collect())
878}
879
880pub enum ChainGetTipSetByHeight {}
884impl RpcMethod<2> for ChainGetTipSetByHeight {
885 const NAME: &'static str = "Filecoin.ChainGetTipSetByHeight";
886 const PARAM_NAMES: [&'static str; 2] = ["height", "tipsetKey"];
887 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
888 const PERMISSION: Permission = Permission::Read;
889 const DESCRIPTION: Option<&'static str> = Some("Returns the tipset at the specified height.");
890
891 type Params = (ChainEpoch, ApiTipsetKey);
892 type Ok = Tipset;
893
894 async fn handle(
895 ctx: Ctx<impl Blockstore>,
896 (height, ApiTipsetKey(tipset_key)): Self::Params,
897 ) -> Result<Self::Ok, ServerError> {
898 let ts = ctx
899 .chain_store()
900 .load_required_tipset_or_heaviest(&tipset_key)?;
901 let tss = ctx
902 .chain_index()
903 .tipset_by_height(height, ts, ResolveNullTipset::TakeOlder)?;
904 Ok(tss)
905 }
906}
907
908pub enum ChainGetTipSetAfterHeight {}
909impl RpcMethod<2> for ChainGetTipSetAfterHeight {
910 const NAME: &'static str = "Filecoin.ChainGetTipSetAfterHeight";
911 const PARAM_NAMES: [&'static str; 2] = ["height", "tipsetKey"];
912 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
913 const PERMISSION: Permission = Permission::Read;
914 const DESCRIPTION: Option<&'static str> = Some(
915 "Looks back and returns the tipset at the specified epoch.
916 If there are no blocks at the given epoch,
917 returns the first non-nil tipset at a later epoch.",
918 );
919
920 type Params = (ChainEpoch, ApiTipsetKey);
921 type Ok = Tipset;
922
923 async fn handle(
924 ctx: Ctx<impl Blockstore>,
925 (height, ApiTipsetKey(tipset_key)): Self::Params,
926 ) -> Result<Self::Ok, ServerError> {
927 let ts = ctx
928 .chain_store()
929 .load_required_tipset_or_heaviest(&tipset_key)?;
930 let tss = ctx
931 .chain_index()
932 .tipset_by_height(height, ts, ResolveNullTipset::TakeNewer)?;
933 Ok(tss)
934 }
935}
936
937pub enum ChainGetGenesis {}
938impl RpcMethod<0> for ChainGetGenesis {
939 const NAME: &'static str = "Filecoin.ChainGetGenesis";
940 const PARAM_NAMES: [&'static str; 0] = [];
941 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
942 const PERMISSION: Permission = Permission::Read;
943
944 type Params = ();
945 type Ok = Option<Tipset>;
946
947 async fn handle(ctx: Ctx<impl Blockstore>, (): Self::Params) -> Result<Self::Ok, ServerError> {
948 let genesis = ctx.chain_store().genesis_block_header();
949 Ok(Some(Tipset::from(genesis)))
950 }
951}
952
953pub enum ChainHead {}
954impl RpcMethod<0> for ChainHead {
955 const NAME: &'static str = "Filecoin.ChainHead";
956 const PARAM_NAMES: [&'static str; 0] = [];
957 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
958 const PERMISSION: Permission = Permission::Read;
959 const DESCRIPTION: Option<&'static str> = Some("Returns the chain head (heaviest tipset).");
960
961 type Params = ();
962 type Ok = Tipset;
963
964 async fn handle(ctx: Ctx<impl Blockstore>, (): Self::Params) -> Result<Self::Ok, ServerError> {
965 let heaviest = ctx.chain_store().heaviest_tipset();
966 Ok(heaviest)
967 }
968}
969
970pub enum ChainGetBlock {}
971impl RpcMethod<1> for ChainGetBlock {
972 const NAME: &'static str = "Filecoin.ChainGetBlock";
973 const PARAM_NAMES: [&'static str; 1] = ["blockCid"];
974 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
975 const PERMISSION: Permission = Permission::Read;
976 const DESCRIPTION: Option<&'static str> = Some("Returns the block with the specified CID.");
977
978 type Params = (Cid,);
979 type Ok = CachingBlockHeader;
980
981 async fn handle(
982 ctx: Ctx<impl Blockstore>,
983 (block_cid,): Self::Params,
984 ) -> Result<Self::Ok, ServerError> {
985 let blk: CachingBlockHeader = ctx.store().get_cbor_required(&block_cid)?;
986 Ok(blk)
987 }
988}
989
990pub enum ChainGetTipSet {}
991impl RpcMethod<1> for ChainGetTipSet {
992 const NAME: &'static str = "Filecoin.ChainGetTipSet";
993 const PARAM_NAMES: [&'static str; 1] = ["tipsetKey"];
994 const API_PATHS: BitFlags<ApiPaths> = make_bitflags!(ApiPaths::{ V0 | V1 });
995 const PERMISSION: Permission = Permission::Read;
996 const DESCRIPTION: Option<&'static str> = Some("Returns the tipset with the specified CID.");
997
998 type Params = (ApiTipsetKey,);
999 type Ok = Tipset;
1000
1001 async fn handle(
1002 ctx: Ctx<impl Blockstore>,
1003 (ApiTipsetKey(tsk),): Self::Params,
1004 ) -> Result<Self::Ok, ServerError> {
1005 if let Some(tsk) = &tsk {
1006 let ts = ctx.chain_index().load_required_tipset(tsk)?;
1007 Ok(ts)
1008 } else {
1009 Err(anyhow::anyhow!(
1011 "TipsetKey cannot be empty (NewTipSet called with zero length array of blocks)"
1012 )
1013 .into())
1014 }
1015 }
1016}
1017
1018pub enum ChainGetTipSetV2 {}
1019
1020impl ChainGetTipSetV2 {
1021 pub async fn get_tipset_by_anchor(
1022 ctx: &Ctx<impl Blockstore + Send + Sync + 'static>,
1023 anchor: &Option<TipsetAnchor>,
1024 ) -> anyhow::Result<Option<Tipset>> {
1025 if let Some(anchor) = anchor {
1026 match (&anchor.key.0, &anchor.tag) {
1027 (None, None) => Ok(Some(ctx.state_manager.heaviest_tipset())),
1029 (Some(tsk), None) => Ok(Some(ctx.chain_index().load_required_tipset(tsk)?)),
1031 (None, Some(tag)) => Self::get_tipset_by_tag(ctx, *tag).await,
1032 _ => {
1033 anyhow::bail!("invalid anchor")
1034 }
1035 }
1036 } else {
1037 Self::get_tipset_by_tag(ctx, TipsetTag::Finalized).await
1039 }
1040 }
1041
1042 pub async fn get_tipset_by_tag(
1043 ctx: &Ctx<impl Blockstore + Send + Sync + 'static>,
1044 tag: TipsetTag,
1045 ) -> anyhow::Result<Option<Tipset>> {
1046 match tag {
1047 TipsetTag::Latest => Ok(Some(ctx.state_manager.heaviest_tipset())),
1048 TipsetTag::Finalized => Self::get_latest_finalized_tipset(ctx).await,
1049 TipsetTag::Safe => Some(Self::get_latest_safe_tipset(ctx).await).transpose(),
1050 }
1051 }
1052
1053 pub async fn get_latest_safe_tipset(
1054 ctx: &Ctx<impl Blockstore + Send + Sync + 'static>,
1055 ) -> anyhow::Result<Tipset> {
1056 let finalized = Self::get_latest_finalized_tipset(ctx).await?;
1057 let head = ctx.chain_store().heaviest_tipset();
1058 let safe_height = (head.epoch() - SAFE_HEIGHT_DISTANCE).max(0);
1059 if let Some(finalized) = finalized
1060 && finalized.epoch() >= safe_height
1061 {
1062 Ok(finalized)
1063 } else {
1064 Ok(ctx.chain_index().tipset_by_height(
1065 safe_height,
1066 head,
1067 ResolveNullTipset::TakeOlder,
1068 )?)
1069 }
1070 }
1071
1072 pub async fn get_latest_finalized_tipset(
1073 ctx: &Ctx<impl Blockstore + Send + Sync + 'static>,
1074 ) -> anyhow::Result<Option<Tipset>> {
1075 let Ok(f3_finalized_cert) =
1076 crate::rpc::f3::F3GetLatestCertificate::handle(ctx.clone(), ()).await
1077 else {
1078 return Self::get_ec_finalized_tipset(ctx);
1079 };
1080
1081 let f3_finalized_head = f3_finalized_cert.chain_head();
1082 let head = ctx.chain_store().heaviest_tipset();
1083 if head.epoch() > f3_finalized_head.epoch + ctx.chain_config().policy.chain_finality {
1085 return Self::get_ec_finalized_tipset(ctx);
1086 }
1087
1088 let ts = ctx
1089 .chain_index()
1090 .load_required_tipset(&f3_finalized_head.key)
1091 .map_err(|e| {
1092 anyhow::anyhow!(
1093 "Failed to load F3 finalized tipset at epoch {} with key {}: {e}",
1094 f3_finalized_head.epoch,
1095 f3_finalized_head.key,
1096 )
1097 })?;
1098 Ok(Some(ts))
1099 }
1100
1101 pub fn get_ec_finalized_tipset(ctx: &Ctx<impl Blockstore>) -> anyhow::Result<Option<Tipset>> {
1102 let head = ctx.chain_store().heaviest_tipset();
1103 let ec_finality_epoch = head.epoch() - ctx.chain_config().policy.chain_finality;
1104 if ec_finality_epoch >= 0 {
1105 let ts = ctx.chain_index().tipset_by_height(
1106 ec_finality_epoch,
1107 head,
1108 ResolveNullTipset::TakeOlder,
1109 )?;
1110 Ok(Some(ts))
1111 } else {
1112 Ok(None)
1113 }
1114 }
1115
1116 pub async fn get_tipset(
1117 ctx: &Ctx<impl Blockstore + Send + Sync + 'static>,
1118 selector: &TipsetSelector,
1119 ) -> anyhow::Result<Option<Tipset>> {
1120 selector.validate()?;
1121 if let ApiTipsetKey(Some(tsk)) = &selector.key {
1123 let ts = ctx.chain_index().load_required_tipset(tsk)?;
1124 return Ok(Some(ts));
1125 }
1126 if let Some(height) = &selector.height {
1128 let anchor = Self::get_tipset_by_anchor(ctx, &height.anchor).await?;
1129 let ts = ctx.chain_index().tipset_by_height(
1130 height.at,
1131 anchor.unwrap_or_else(|| ctx.chain_store().heaviest_tipset()),
1132 height.resolve_null_tipset_policy(),
1133 )?;
1134 return Ok(Some(ts));
1135 }
1136 if let Some(tag) = &selector.tag {
1138 let ts = Self::get_tipset_by_tag(ctx, *tag).await?;
1139 return Ok(ts);
1140 }
1141 anyhow::bail!("no tipset found for selector")
1142 }
1143
1144 pub async fn get_required_tipset(
1145 ctx: &Ctx<impl Blockstore + Send + Sync + 'static>,
1146 selector: &TipsetSelector,
1147 ) -> anyhow::Result<Tipset> {
1148 Self::get_tipset(ctx, selector)
1149 .await?
1150 .context("failed to select a tipset")
1151 }
1152}
1153
1154impl RpcMethod<1> for ChainGetTipSetV2 {
1155 const NAME: &'static str = "Filecoin.ChainGetTipSet";
1156 const PARAM_NAMES: [&'static str; 1] = ["tipsetSelector"];
1157 const API_PATHS: BitFlags<ApiPaths> = make_bitflags!(ApiPaths::{ V2 });
1158 const PERMISSION: Permission = Permission::Read;
1159 const DESCRIPTION: Option<&'static str> = Some("Returns the tipset with the specified CID.");
1160
1161 type Params = (TipsetSelector,);
1162 type Ok = Option<Tipset>;
1163
1164 async fn handle(
1165 ctx: Ctx<impl Blockstore + Send + Sync + 'static>,
1166 (selector,): Self::Params,
1167 ) -> Result<Self::Ok, ServerError> {
1168 Ok(Self::get_tipset(&ctx, &selector).await?)
1169 }
1170}
1171
1172pub enum ChainSetHead {}
1173impl RpcMethod<1> for ChainSetHead {
1174 const NAME: &'static str = "Filecoin.ChainSetHead";
1175 const PARAM_NAMES: [&'static str; 1] = ["tsk"];
1176 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
1177 const PERMISSION: Permission = Permission::Admin;
1178
1179 type Params = (TipsetKey,);
1180 type Ok = ();
1181
1182 async fn handle(
1183 ctx: Ctx<impl Blockstore>,
1184 (tsk,): Self::Params,
1185 ) -> Result<Self::Ok, ServerError> {
1186 let new_head = ctx.chain_index().load_required_tipset(&tsk)?;
1190 let mut current = ctx.chain_store().heaviest_tipset();
1191 while current.epoch() >= new_head.epoch() {
1192 for cid in current.key().to_cids() {
1193 ctx.chain_store().unmark_block_as_validated(&cid);
1194 }
1195 let parents = ¤t.block_headers().first().parents;
1196 current = ctx.chain_index().load_required_tipset(parents)?;
1197 }
1198 ctx.chain_store()
1199 .set_heaviest_tipset(new_head)
1200 .map_err(Into::into)
1201 }
1202}
1203
1204pub enum ChainGetMinBaseFee {}
1205impl RpcMethod<1> for ChainGetMinBaseFee {
1206 const NAME: &'static str = "Forest.ChainGetMinBaseFee";
1207 const PARAM_NAMES: [&'static str; 1] = ["lookback"];
1208 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
1209 const PERMISSION: Permission = Permission::Read;
1210
1211 type Params = (u32,);
1212 type Ok = String;
1213
1214 async fn handle(
1215 ctx: Ctx<impl Blockstore>,
1216 (lookback,): Self::Params,
1217 ) -> Result<Self::Ok, ServerError> {
1218 let mut current = ctx.chain_store().heaviest_tipset();
1219 let mut min_base_fee = current.block_headers().first().parent_base_fee.clone();
1220
1221 for _ in 0..lookback {
1222 let parents = ¤t.block_headers().first().parents;
1223 current = ctx.chain_index().load_required_tipset(parents)?;
1224
1225 min_base_fee =
1226 min_base_fee.min(current.block_headers().first().parent_base_fee.to_owned());
1227 }
1228
1229 Ok(min_base_fee.atto().to_string())
1230 }
1231}
1232
1233pub enum ChainTipSetWeight {}
1234impl RpcMethod<1> for ChainTipSetWeight {
1235 const NAME: &'static str = "Filecoin.ChainTipSetWeight";
1236 const PARAM_NAMES: [&'static str; 1] = ["tipsetKey"];
1237 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
1238 const PERMISSION: Permission = Permission::Read;
1239 const DESCRIPTION: Option<&'static str> = Some("Returns the weight of the specified tipset.");
1240
1241 type Params = (ApiTipsetKey,);
1242 type Ok = BigInt;
1243
1244 async fn handle(
1245 ctx: Ctx<impl Blockstore>,
1246 (ApiTipsetKey(tipset_key),): Self::Params,
1247 ) -> Result<Self::Ok, ServerError> {
1248 let ts = ctx
1249 .chain_store()
1250 .load_required_tipset_or_heaviest(&tipset_key)?;
1251 let weight = crate::fil_cns::weight(ctx.store(), &ts)?;
1252 Ok(weight)
1253 }
1254}
1255
1256pub enum ChainGetTipsetByParentState {}
1257impl RpcMethod<1> for ChainGetTipsetByParentState {
1258 const NAME: &'static str = "Forest.ChainGetTipsetByParentState";
1259 const PARAM_NAMES: [&'static str; 1] = ["parentState"];
1260 const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
1261 const PERMISSION: Permission = Permission::Read;
1262
1263 type Params = (Cid,);
1264 type Ok = Option<Tipset>;
1265
1266 async fn handle(
1267 ctx: Ctx<impl Blockstore>,
1268 (parent_state,): Self::Params,
1269 ) -> Result<Self::Ok, ServerError> {
1270 Ok(ctx
1271 .chain_store()
1272 .heaviest_tipset()
1273 .chain(ctx.store())
1274 .find(|ts| ts.parent_state() == &parent_state)
1275 .clone())
1276 }
1277}
1278
/// JSON-RPC method name under which the head-change subscription is exposed.
pub const CHAIN_NOTIFY: &str = "Filecoin.ChainNotify";
/// Builds a head-change subscription that yields batches of [`ApiHeadChange`].
///
/// The returned receiver is pre-loaded with one `"current"` entry holding the heaviest
/// tipset at call time. A spawned task then forwards each [`HeadChange::Apply`] received
/// from the chain store's publisher as an `"apply"` entry.
///
/// `_params` is accepted but not used.
///
/// # Panics
///
/// Panics if the initial `"current"` message cannot be sent.
pub(crate) fn chain_notify<DB: Blockstore>(
    _params: Params<'_>,
    data: &crate::rpc::RPCState<DB>,
) -> Subscriber<Vec<ApiHeadChange>> {
    let (sender, receiver) = broadcast::channel(HEAD_CHANNEL_CAPACITY);

    // Queue the current head right away; `receiver` is still held here, which is what the
    // `expect` below relies on.
    let current = data.chain_store().heaviest_tipset();
    let (change, tipset) = ("current".into(), current);
    sender
        .send(vec![ApiHeadChange { change, tipset }])
        .expect("receiver is not dropped");

    let mut subscriber = data.chain_store().publisher().subscribe();

    tokio::spawn(async move {
        // The first message delivered by the publisher is discarded (its result, including
        // any error, is ignored).
        // NOTE(review): the reason for skipping it is not visible here — confirm intent.
        let _ = subscriber.recv().await;

        // Forward until the publisher side yields an error.
        while let Ok(v) = subscriber.recv().await {
            let (change, tipset) = match v {
                HeadChange::Apply(ts) => ("apply".into(), ts),
            };

            // A failed send ends the forwarding task.
            if sender.send(vec![ApiHeadChange { change, tipset }]).is_err() {
                break;
            }
        }
    });
    receiver
}
1311
1312async fn load_api_messages_from_tipset<DB: Blockstore + Send + Sync + 'static>(
1313 ctx: &crate::rpc::RPCState<DB>,
1314 tipset_keys: &TipsetKey,
1315) -> Result<Vec<ApiMessage>, ServerError> {
1316 static SHOULD_BACKFILL: LazyLock<bool> = LazyLock::new(|| {
1317 let enabled = is_env_truthy("FOREST_RPC_BACKFILL_FULL_TIPSET_FROM_NETWORK");
1318 if enabled {
1319 tracing::warn!(
1320 "Full tipset backfilling from network is enabled via FOREST_RPC_BACKFILL_FULL_TIPSET_FROM_NETWORK, excessive disk and bandwidth usage is expected."
1321 );
1322 }
1323 enabled
1324 });
1325 let full_tipset = if *SHOULD_BACKFILL {
1326 get_full_tipset(
1327 &ctx.sync_network_context,
1328 ctx.chain_store(),
1329 None,
1330 tipset_keys,
1331 )
1332 .await?
1333 } else {
1334 load_full_tipset(ctx.chain_store(), tipset_keys)?
1335 };
1336 let blocks = full_tipset.into_blocks();
1337 let mut messages = vec![];
1338 let mut seen = CidHashSet::default();
1339 for Block {
1340 bls_messages,
1341 secp_messages,
1342 ..
1343 } in blocks
1344 {
1345 for message in bls_messages {
1346 let cid = message.cid();
1347 if seen.insert(cid) {
1348 messages.push(ApiMessage { cid, message });
1349 }
1350 }
1351
1352 for msg in secp_messages {
1353 let cid = msg.cid();
1354 if seen.insert(cid) {
1355 messages.push(ApiMessage {
1356 cid,
1357 message: msg.message,
1358 });
1359 }
1360 }
1361 }
1362
1363 Ok(messages)
1364}
1365
// Bundles a list of unsigned messages, a list of signed messages and a list of CIDs.
// All three fields are (de)serialized through `crate::lotus_json` under the explicit
// key names given below.
// NOTE(review): `cids` presumably holds the CIDs of the messages above — confirm
// against the code that builds this struct.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, JsonSchema)]
pub struct BlockMessages {
    // JSON key: `BlsMessages`.
    #[serde(rename = "BlsMessages", with = "crate::lotus_json")]
    #[schemars(with = "LotusJson<Vec<Message>>")]
    pub bls_msg: Vec<Message>,
    // JSON key: `SecpkMessages`.
    #[serde(rename = "SecpkMessages", with = "crate::lotus_json")]
    #[schemars(with = "LotusJson<Vec<SignedMessage>>")]
    pub secp_msg: Vec<SignedMessage>,
    // JSON key: `Cids`.
    #[serde(rename = "Cids", with = "crate::lotus_json")]
    #[schemars(with = "LotusJson<Vec<Cid>>")]
    pub cids: Vec<Cid>,
}
lotus_json_with_self!(BlockMessages);
1379
// API representation of a message receipt.
// Field names are serialized in PascalCase, except `return_data` which is renamed
// to `Return`.
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, JsonSchema)]
#[serde(rename_all = "PascalCase")]
pub struct ApiReceipt {
    // JSON key: `ExitCode`.
    pub exit_code: ExitCode,
    // JSON key: `Return`; encoded via `crate::lotus_json`.
    #[serde(rename = "Return", with = "crate::lotus_json")]
    #[schemars(with = "LotusJson<RawBytes>")]
    pub return_data: RawBytes,
    // JSON key: `GasUsed`.
    pub gas_used: u64,
    // JSON key: `EventsRoot`; optional, encoded via `crate::lotus_json`.
    #[serde(with = "crate::lotus_json")]
    #[schemars(with = "LotusJson<Option<Cid>>")]
    pub events_root: Option<Cid>,
}

lotus_json_with_self!(ApiReceipt);
1397
// Pairs a message with a CID. Serialized as an object with the PascalCase keys
// `Cid` and `Message`, both encoded via `crate::lotus_json`.
#[derive(Serialize, Deserialize, JsonSchema, Clone, Debug, Eq, PartialEq)]
#[serde(rename_all = "PascalCase")]
pub struct ApiMessage {
    #[serde(with = "crate::lotus_json")]
    #[schemars(with = "LotusJson<Cid>")]
    pub cid: Cid,
    #[serde(with = "crate::lotus_json")]
    #[schemars(with = "LotusJson<Message>")]
    pub message: Message,
}

lotus_json_with_self!(ApiMessage);
1410
// Like `ApiMessage`, but the message's own fields are flattened into the top-level
// JSON object and the CID is stored alongside them under the key `CID`.
#[derive(Serialize, Deserialize, JsonSchema, Clone, Debug, Eq, PartialEq)]
pub struct FlattenedApiMessage {
    // Inlined into the enclosing object (`serde(flatten)`).
    #[serde(flatten, with = "crate::lotus_json")]
    #[schemars(with = "LotusJson<Message>")]
    pub message: Message,
    // JSON key: `CID`.
    #[serde(rename = "CID", with = "crate::lotus_json")]
    #[schemars(with = "LotusJson<Cid>")]
    pub cid: Cid,
}

lotus_json_with_self!(FlattenedApiMessage);
1422
// Parameter object for a chain export request. Carries the same fields as
// `ChainExportParams` plus a snapshot `version`. Keys are the field names as written
// (no serde renaming); only `tipset_keys` goes through `crate::lotus_json`.
// NOTE(review): the exact meaning of `epoch`, `recent_roots`, `skip_checksum` and
// `dry_run` is defined by the export handler, which is not visible here.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct ForestChainExportParams {
    pub version: FilecoinSnapshotVersion,
    pub epoch: ChainEpoch,
    pub recent_roots: i64,
    pub output_path: PathBuf,
    #[schemars(with = "LotusJson<ApiTipsetKey>")]
    #[serde(with = "crate::lotus_json")]
    pub tipset_keys: ApiTipsetKey,
    pub skip_checksum: bool,
    pub dry_run: bool,
}
lotus_json_with_self!(ForestChainExportParams);
1436
// Parameter object for a diff-style chain export: an epoch pair (`from`, `to`), a
// `depth` and the output file path. Keys are the field names as written.
// NOTE(review): how `from`, `to` and `depth` are interpreted is defined by the handler,
// which is not visible here.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct ForestChainExportDiffParams {
    pub from: ChainEpoch,
    pub to: ChainEpoch,
    pub depth: i64,
    pub output_path: PathBuf,
}
lotus_json_with_self!(ForestChainExportDiffParams);
1445
// Parameter object for a chain export request. Keys are the field names as written
// (no serde renaming); only `tipset_keys` goes through `crate::lotus_json`.
// NOTE(review): the exact meaning of `epoch`, `recent_roots`, `skip_checksum` and
// `dry_run` is defined by the export handler, which is not visible here.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct ChainExportParams {
    pub epoch: ChainEpoch,
    pub recent_roots: i64,
    pub output_path: PathBuf,
    #[schemars(with = "LotusJson<ApiTipsetKey>")]
    #[serde(with = "crate::lotus_json")]
    pub tipset_keys: ApiTipsetKey,
    pub skip_checksum: bool,
    pub dry_run: bool,
}
lotus_json_with_self!(ChainExportParams);
1458
// One head-change notification entry: a change kind plus the tipset it refers to.
// Serialized as `{"Type": <change>, "Val": <tipset>}`.
#[derive(PartialEq, Debug, Serialize, Deserialize, Clone, JsonSchema)]
#[serde(rename_all = "PascalCase")]
pub struct ApiHeadChange {
    // Change kind; `chain_notify` in this file emits "current" and "apply".
    #[serde(rename = "Type")]
    pub change: String,
    // Tipset the change refers to, encoded via `crate::lotus_json`.
    #[serde(rename = "Val", with = "crate::lotus_json")]
    #[schemars(with = "LotusJson<Tipset>")]
    pub tipset: Tipset,
}
lotus_json_with_self!(ApiHeadChange);
1469
// One step on a path between two tipsets: either revert a tipset or apply one.
// Serialized adjacently tagged with snake_case variant names, i.e.
// `{"Type": "revert", "Val": ...}` or `{"Type": "apply", "Val": ...}`.
// The payload type defaults to `Tipset`.
#[derive(PartialEq, Debug, Serialize, Deserialize, Clone, JsonSchema)]
#[serde(tag = "Type", content = "Val", rename_all = "snake_case")]
pub enum PathChange<T = Tipset> {
    Revert(T),
    Apply(T),
}
1476impl HasLotusJson for PathChange {
1477 type LotusJson = PathChange<<Tipset as HasLotusJson>::LotusJson>;
1478
1479 #[cfg(test)]
1480 fn snapshots() -> Vec<(serde_json::Value, Self)> {
1481 use serde_json::json;
1482 vec![(
1483 json!({
1484 "Type": "revert",
1485 "Val": {
1486 "Blocks": [
1487 {
1488 "BeaconEntries": null,
1489 "ForkSignaling": 0,
1490 "Height": 0,
1491 "Messages": { "/": "baeaaaaa" },
1492 "Miner": "f00",
1493 "ParentBaseFee": "0",
1494 "ParentMessageReceipts": { "/": "baeaaaaa" },
1495 "ParentStateRoot": { "/":"baeaaaaa" },
1496 "ParentWeight": "0",
1497 "Parents": [{"/":"bafyreiaqpwbbyjo4a42saasj36kkrpv4tsherf2e7bvezkert2a7dhonoi"}],
1498 "Timestamp": 0,
1499 "WinPoStProof": null
1500 }
1501 ],
1502 "Cids": [
1503 { "/": "bafy2bzaceag62hjj3o43lf6oyeox3fvg5aqkgl5zagbwpjje3ajwg6yw4iixk" }
1504 ],
1505 "Height": 0
1506 }
1507 }),
1508 Self::Revert(RawBlockHeader::default().into()),
1509 )]
1510 }
1511
1512 fn into_lotus_json(self) -> Self::LotusJson {
1513 match self {
1514 PathChange::Revert(it) => PathChange::Revert(it.into_lotus_json()),
1515 PathChange::Apply(it) => PathChange::Apply(it.into_lotus_json()),
1516 }
1517 }
1518
1519 fn from_lotus_json(lotus_json: Self::LotusJson) -> Self {
1520 match lotus_json {
1521 PathChange::Revert(it) => PathChange::Revert(Tipset::from_lotus_json(it)),
1522 PathChange::Apply(it) => PathChange::Apply(Tipset::from_lotus_json(it)),
1523 }
1524 }
1525}
1526
/// Generates an arbitrary payload first, then picks uniformly between wrapping it in
/// `Apply` or `Revert`.
#[cfg(test)]
impl<T> quickcheck::Arbitrary for PathChange<T>
where
    T: quickcheck::Arbitrary,
{
    fn arbitrary(g: &mut quickcheck::Gen) -> Self {
        let payload = T::arbitrary(g);
        let candidates = [PathChange::Apply(payload.clone()), PathChange::Revert(payload)];
        // `choose` only yields `None` for an empty slice, and this one has two entries.
        g.choose(&candidates).cloned().unwrap()
    }
}
1539
// Runs `assert_all_snapshots` over the pairs returned by `PathChange::snapshots()`.
#[test]
fn snapshots() {
    assert_all_snapshots::<PathChange>()
}
1544
// Property test: feeds arbitrary `PathChange` values to `assert_unchanged_via_json`.
#[cfg(test)]
quickcheck::quickcheck! {
    fn quickcheck(val: PathChange) -> () {
        assert_unchanged_via_json(val)
    }
}
1551
/// Tests for `impl_chain_get_path`, driven by small synthetic chains built with `chain4u!`
/// on top of the calibnet genesis.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        blocks::{Chain4U, RawBlockHeader, chain4u},
        db::{
            MemoryDB,
            car::{AnyCar, ManyCar},
        },
        networks::{self, ChainConfig},
    };
    use PathChange::{Apply, Revert};
    use itertools::Itertools as _;
    use std::sync::Arc;

    /// Moving from a tipset to one of its ancestors on a single chain yields only reverts.
    #[test]
    fn revert_to_ancestor_linear() {
        let store = ChainStore::calibnet();
        chain4u! {
            in store.blockstore();
            [_genesis = store.genesis_block_header()]
            -> [a] -> [b] -> [c, d] -> [e]
        };

        // single-block tipset to its direct parent
        assert_path_change(&store, b, a, [Revert(&[b])]);

        // starting from a two-block tipset, two steps back
        assert_path_change(&store, [c, d], a, [Revert(&[c, d][..]), Revert(&[b])]);

        // destination is a two-block tipset
        assert_path_change(&store, e, [c, d], [Revert(e)]);

        // stepping back across the two-block tipset
        assert_path_change(&store, e, b, [Revert(&[e][..]), Revert(&[c, d])]);
    }

    /// Paths that start or end at a tipset containing only some of the blocks of an epoch
    /// (`[a]` versus `[a, b]`, `[d]` versus `[d, _e]`).
    #[test]
    fn incomplete_tipsets() {
        let store = ChainStore::calibnet();
        chain4u! {
            in store.blockstore();
            [_genesis = store.genesis_block_header()]
            -> [a, b] -> [c] -> [d, _e]
        };

        // from the partial tipset [a] forward to [c]
        assert_path_change(
            &store,
            a,
            c,
            [
                // the partial tipset is reverted first
                Revert(&[a][..]),
                // then the full tipset of that epoch is applied
                Apply(&[a, b]),
                // then the destination
                Apply(&[c]),
            ],
        );

        // forward to a partial tipset
        assert_path_change(&store, c, d, [Apply(d)]);

        // back from a partial tipset
        assert_path_change(&store, d, c, [Revert(d)]);

        // from [c] back to the partial tipset [a]
        assert_path_change(
            &store,
            c,
            a,
            [
                Revert(&[c][..]),
                // the full tipset of that epoch is reverted
                Revert(&[a, b]),
                // and the partial one applied
                Apply(&[a]),
            ],
        );
    }

    /// Moving from a tipset to one of its descendants on a single chain yields only applies.
    #[test]
    fn apply_to_descendant_linear() {
        let store = ChainStore::calibnet();
        chain4u! {
            in store.blockstore();
            [_genesis = store.genesis_block_header()]
            -> [a] -> [b] -> [c, d] -> [e]
        };

        // single-block tipset to its direct child
        assert_path_change(&store, a, b, [Apply(&[b])]);

        // starting from a two-block tipset
        assert_path_change(&store, [c, d], e, [Apply(e)]);

        // destination is a two-block tipset
        assert_path_change(&store, b, [c, d], [Apply([c, d])]);

        // stepping forward across the two-block tipset
        assert_path_change(&store, b, e, [Apply(&[c, d][..]), Apply(&[e])]);
    }

    /// Paths between two forks that share the ancestor `a`.
    #[test]
    fn cross_fork_simple() {
        let store = ChainStore::calibnet();
        chain4u! {
            in store.blockstore();
            [_genesis = store.genesis_block_header()]
            -> [a] -> [b1] -> [c1]
        };
        chain4u! {
            from [a] in store.blockstore();
            [b2] -> [c2]
        };

        // same epoch, different fork
        assert_path_change(&store, b1, b2, [Revert(b1), Apply(b2)]);

        // different epoch, different fork
        assert_path_change(&store, b1, c2, [Revert(b1), Apply(b2), Apply(c2)]);

        // `a` and `c1` are otherwise unused bindings
        let _ = (a, c1);
    }

    impl ChainStore<Chain4U<ManyCar>> {
        /// Builds a chain store whose block store is a `Chain4U` wrapping an in-memory
        /// `ManyCar` with the given genesis CAR attached read-only. The genesis header is
        /// read from that store by `genesis_cid`.
        fn _load(genesis_car: &'static [u8], genesis_cid: Cid) -> Self {
            let db = Arc::new(Chain4U::with_blockstore(
                ManyCar::new(MemoryDB::default())
                    .with_read_only(AnyCar::new(genesis_car).unwrap())
                    .unwrap(),
            ));
            let genesis_block_header = db.get_cbor(&genesis_cid).unwrap().unwrap();
            ChainStore::new(
                db,
                Arc::new(MemoryDB::default()),
                Arc::new(MemoryDB::default()),
                Arc::new(ChainConfig::calibnet()),
                genesis_block_header,
            )
            .unwrap()
        }
        /// Chain store seeded with the default calibnet genesis.
        pub fn calibnet() -> Self {
            Self::_load(
                networks::calibnet::DEFAULT_GENESIS,
                *networks::calibnet::GENESIS_CID,
            )
        }
    }

    /// Lets the tests pass a single header, an array of headers or a slice of headers
    /// wherever a tipset is needed.
    trait MakeTipset {
        fn make_tipset(self) -> Tipset;
    }

    impl MakeTipset for &RawBlockHeader {
        fn make_tipset(self) -> Tipset {
            Tipset::from(CachingBlockHeader::new(self.clone()))
        }
    }

    impl<const N: usize> MakeTipset for [&RawBlockHeader; N] {
        fn make_tipset(self) -> Tipset {
            self.as_slice().make_tipset()
        }
    }

    impl<const N: usize> MakeTipset for &[&RawBlockHeader; N] {
        fn make_tipset(self) -> Tipset {
            self.as_slice().make_tipset()
        }
    }

    impl MakeTipset for &[&RawBlockHeader] {
        fn make_tipset(self) -> Tipset {
            // Panics (via `unwrap`) if the headers do not form a valid tipset.
            Tipset::new(self.iter().cloned().cloned()).unwrap()
        }
    }

    /// Asserts that `impl_chain_get_path(store, from, to)` equals `expected`.
    ///
    /// On mismatch, a summary of both sequences (epoch and key CID per step) is printed
    /// before the assertion fails.
    #[track_caller]
    fn assert_path_change<T: MakeTipset>(
        store: &ChainStore<impl Blockstore>,
        from: impl MakeTipset,
        to: impl MakeTipset,
        expected: impl IntoIterator<Item = PathChange<T>>,
    ) {
        // Prints one step on a single line; the leading space keeps " Apply(" aligned
        // with "Revert(".
        fn print(path_change: &PathChange) {
            let it = match path_change {
                Revert(it) => {
                    print!("Revert(");
                    it
                }
                Apply(it) => {
                    print!(" Apply(");
                    it
                }
            };
            println!(
                "epoch = {}, key.cid = {})",
                it.epoch(),
                it.key().cid().unwrap()
            )
        }

        let actual =
            impl_chain_get_path(store, from.make_tipset().key(), to.make_tipset().key()).unwrap();
        // Turn the expected header shorthands into real tipsets for comparison.
        let expected = expected
            .into_iter()
            .map(|change| match change {
                PathChange::Revert(it) => PathChange::Revert(it.make_tipset()),
                PathChange::Apply(it) => PathChange::Apply(it.make_tipset()),
            })
            .collect_vec();
        if expected != actual {
            println!("SUMMARY");
            println!("=======");
            println!("expected:");
            for it in &expected {
                print(it)
            }
            println!();
            println!("actual:");
            for it in &actual {
                print(it)
            }
            println!("=======\n")
        }
        assert_eq!(
            expected, actual,
            "expected change (left) does not match actual change (right)"
        )
    }
}