pub mod bundle;
mod context;
pub mod db_util;
pub mod main;
use crate::blocks::Tipset;
use crate::chain::ChainStore;
use crate::chain::index::ResolveNullTipset;
use crate::chain_sync::ChainFollower;
use crate::chain_sync::network_context::SyncNetworkContext;
use crate::cli_shared::snapshot;
use crate::cli_shared::{
chain_path,
cli::{CliOpts, Config},
};
use crate::daemon::{
context::{AppContext, DbType},
db_util::import_chain_as_forest_car,
};
use crate::db::gc::SnapshotGarbageCollector;
use crate::db::ttl::EthMappingCollector;
use crate::libp2p::{Libp2pService, PeerManager};
use crate::message_pool::{MessagePool, MpoolConfig, MpoolLocker, NonceTracker};
use crate::networks::{self, ChainConfig};
use crate::rpc::RPCState;
use crate::rpc::eth::filter::EthEventHandler;
use crate::rpc::start_rpc;
use crate::shim::clock::ChainEpoch;
use crate::shim::state_tree::StateTree;
use crate::shim::version::NetworkVersion;
use crate::utils::misc::env::is_env_truthy;
use crate::utils::{self, ShallowClone as _};
use crate::utils::{proofs_api::ensure_proof_params_downloaded, version::FOREST_VERSION_STRING};
use anyhow::{Context as _, bail};
use backon::{ExponentialBuilder, Retryable};
use dialoguer::theme::ColorfulTheme;
use futures::{Future, FutureExt};
use std::path::Path;
use std::sync::Arc;
use std::sync::OnceLock;
use std::time::{Duration, Instant};
use tokio::{
net::TcpListener,
signal::{
ctrl_c,
unix::{SignalKind, signal},
},
sync::mpsc,
task::JoinSet,
};
use tracing::{debug, info, warn};
/// Global handle to the snapshot garbage collector. It is set exactly once in
/// [`maybe_start_gc_service`] during daemon startup (setting it twice is an error).
pub static GLOBAL_SNAPSHOT_GC: OnceLock<Arc<SnapshotGarbageCollector<DbType>>> = OnceLock::new();
/// Tries to raise the process soft limit on open file descriptors to a sane
/// minimum, logging the outcome. Errors from querying or raising the limit
/// are propagated to the caller; a limit that remains too low only warns.
fn maybe_increase_fd_limit() -> anyhow::Result<()> {
    // Minimum number of file descriptors the daemon is comfortable with.
    static DESIRED_SOFT_LIMIT: u64 = 8192;
    let soft_before = rlimit::Resource::NOFILE.get()?.0;
    let soft_after = rlimit::increase_nofile_limit(DESIRED_SOFT_LIMIT)?;
    if soft_before < soft_after {
        debug!("Increased file descriptor limit from {soft_before} to {soft_after}");
    }
    // The OS hard limit may cap us below the desired value; warn so the
    // operator knows why "too many open files" errors might appear later.
    if soft_after < DESIRED_SOFT_LIMIT {
        warn!(
            "File descriptor limit is too low: {soft_after} < {DESIRED_SOFT_LIMIT}. \
You may encounter 'too many open files' errors.",
        );
    }
    Ok(())
}
/// Runs the daemon until it completes on its own or an external stop
/// condition fires, whichever happens first.
///
/// The daemon future from [`start`] races against:
/// - `Ctrl-C` (SIGINT),
/// - `SIGTERM`,
/// - a shutdown request sent over the internal channel (the sender half is
///   handed to [`start`], e.g. for RPC-initiated shutdown).
///
/// Whichever branch wins, the RPC server is asked to stop and the terminal
/// state is cleaned up before the result is returned.
pub async fn start_interruptable(opts: CliOpts, config: Config) -> anyhow::Result<()> {
    let start_time = chrono::Utc::now();
    let mut terminate = signal(SignalKind::terminate())?;
    // Capacity 1: a single shutdown request is all that is ever needed.
    let (shutdown_send, mut shutdown_recv) = mpsc::channel(1);
    let (rpc_stop_handle, rpc_server_handle) = jsonrpsee::server::stop_channel();
    let result = tokio::select! {
        ret = start(start_time, opts, config, shutdown_send, rpc_stop_handle) => ret,
        _ = ctrl_c() => {
            info!("Keyboard interrupt.");
            Ok(())
        },
        _ = terminate.recv() => {
            info!("Received SIGTERM.");
            Ok(())
        },
        _ = shutdown_recv.recv() => {
            info!("Client requested a shutdown.");
            Ok(())
        },
    };
    // Best-effort stop of the RPC server; the result is deliberately discarded.
    _ = rpc_server_handle.stop();
    crate::utils::io::terminal_cleanup();
    result
}
/// One-time process-level initialization performed before any service starts:
/// raises the file-descriptor limit, points the proofs parameter cache at the
/// configured data directory, and logs the daemon version.
fn startup_init(config: &Config) -> anyhow::Result<()> {
    maybe_increase_fd_limit()?;
    crate::utils::proofs_api::maybe_set_proofs_parameter_cache_dir_env(&config.client.data_dir);
    info!(
        "Starting Forest daemon, version {}",
        FOREST_VERSION_STRING.as_str()
    );
    Ok(())
}
/// Imports a chain snapshot into the database when one is configured (or can
/// be resolved automatically), then optionally validates a range of epochs.
///
/// Steps, each skipped under the relevant stateless/skip option:
/// 1. Resolve a snapshot path/URL if none is configured
///    (via [`maybe_set_snapshot_path`]).
/// 2. Import the snapshot as a forest.car database and move the chain head to
///    the snapshot's tipset.
/// 3. If `snapshot_height` is set, validate epochs from that height up to the
///    current head on a blocking task.
async fn maybe_import_snapshot(
    opts: &CliOpts,
    config: &mut Config,
    ctx: &AppContext,
) -> anyhow::Result<()> {
    let chain_config = ctx.state_manager.chain_config();
    // Only resolve a snapshot source when the user has not provided one and
    // the node is expected to hold state.
    if config.client.snapshot_path.is_none() && !opts.stateless {
        maybe_set_snapshot_path(
            config,
            chain_config,
            ctx.state_manager.chain_store().heaviest_tipset().epoch(),
            opts.auto_download_snapshot,
            &ctx.db_meta_data.get_root_dir(),
        )
        .await?;
    }
    let snapshot_tracker = ctx.snapshot_progress_tracker.clone();
    if !opts.skip_load.unwrap_or_default()
        && let Some(path) = &config.client.snapshot_path
    {
        let (car_db_path, ts) = import_chain_as_forest_car(
            path,
            &ctx.db_meta_data.get_forest_car_db_dir(),
            config.client.import_mode,
            config.client.rpc_v1_endpoint()?,
            &crate::f3::get_f3_root(config),
            ctx.chain_config(),
            &snapshot_tracker,
        )
        .await?;
        // Register the freshly imported CAR file as a read-only store, then
        // make the snapshot's tipset the new chain head.
        ctx.db
            .read_only_files(std::iter::once(car_db_path.clone()))?;
        let ts_epoch = ts.epoch();
        ctx.state_manager.chain_store().set_heaviest_tipset(ts)?;
        debug!(
            "Loaded car DB at {} and set current head to epoch {ts_epoch}",
            car_db_path.display(),
        );
    }
    // If no import happened, mark the tracker so consumers don't keep waiting
    // for a download that will never start.
    if !snapshot_tracker.is_completed() {
        snapshot_tracker.not_required();
    }
    if let Some(validate_from) = config.client.snapshot_height {
        // Proof parameters are needed before tipsets can be validated.
        ensure_proof_params_downloaded().await?;
        let current_height = config
            .client
            .snapshot_head
            .unwrap_or_else(|| ctx.state_manager.chain_store().heaviest_tipset().epoch());
        let validation_range = validation_range(current_height, validate_from)?;
        let state_manager = ctx.state_manager.clone();
        // Validation is CPU-heavy; keep it off the async executor.
        tokio::task::spawn_blocking(move || state_manager.validate_range(validation_range))
            .await??;
    }
    Ok(())
}
/// Computes the inclusive epoch range `[start, current]` to validate.
///
/// A negative `from` is interpreted as an offset counted backwards from the
/// current head, clamped at epoch 0.
///
/// # Errors
/// Fails when the head epoch is not positive, or when the resolved start
/// epoch lies beyond the head.
fn validation_range(
    current: ChainEpoch,
    from: ChainEpoch,
) -> anyhow::Result<std::ops::RangeInclusive<ChainEpoch>> {
    anyhow::ensure!(
        current.is_positive(),
        "current head epoch {current} is invalid"
    );
    // Negative offsets count down from the head but never go below genesis.
    let start = match from.is_negative() {
        true => current.saturating_add(from).max(0),
        false => from,
    };
    anyhow::ensure!(
        start <= current,
        "requested validation start epoch {start} is beyond the current head at epoch {current}",
    );
    Ok(start..=current)
}
/// Starts the Prometheus metrics endpoint when it is enabled in the
/// configuration, and registers a network-height collector.
async fn maybe_start_metrics_service(
    services: &mut JoinSet<anyhow::Result<()>>,
    config: &Config,
    ctx: &AppContext,
) -> anyhow::Result<()> {
    if config.client.enable_metrics_endpoint {
        let prometheus_listener = TcpListener::bind(config.client.metrics_address)
            .await
            .with_context(|| format!("could not bind to {}", config.client.metrics_address))?;
        info!(
            "Prometheus server started at {}",
            config.client.metrics_address
        );
        let db_directory = crate::db::db_engine::db_root(&chain_path(config))?;
        let db = ctx.db.writer().clone();
        // Both metric closures hold only a weak reference to the chain store
        // so the metrics service does not keep it alive; they fall back to a
        // default value once the store is gone.
        let get_chain_head_height = Arc::new({
            let chain_store = Arc::downgrade(ctx.state_manager.chain_store());
            move || {
                chain_store
                    .upgrade()
                    .map(|cs| cs.heaviest_tipset().epoch())
                    .unwrap_or_default()
            }
        });
        let get_chain_head_actor_version = Arc::new({
            let chain_store = Arc::downgrade(ctx.state_manager.chain_store());
            move || {
                // Report the actor bundle's major version at the current head,
                // or 0 if any step of the lookup fails.
                if let Some(cs) = chain_store.upgrade()
                    && let Ok(state) = StateTree::new_from_root(
                        cs.blockstore().clone(),
                        cs.heaviest_tipset().parent_state(),
                    )
                    && let Ok(bundle_meta) = state.get_actor_bundle_metadata()
                    && let Ok(actor_version) = bundle_meta.actor_major_version()
                {
                    return actor_version;
                }
                0
            }
        });
        services.spawn({
            let chain_config = ctx.chain_config().clone();
            let get_chain_head_height = get_chain_head_height.clone();
            async {
                crate::metrics::init_prometheus(
                    prometheus_listener,
                    db_directory,
                    db,
                    chain_config,
                    get_chain_head_height,
                    get_chain_head_actor_version,
                )
                .await
                .context("Failed to initiate prometheus server")
            }
        });
        crate::metrics::register_collector(Box::new(
            networks::metrics::NetworkHeightCollector::new(
                ctx.state_manager.chain_config().block_delay_secs,
                ctx.state_manager
                    .chain_store()
                    .genesis_block_header()
                    .timestamp,
                get_chain_head_height,
            ),
        ));
    }
    Ok(())
}
/// Builds the libp2p networking service and spawns its peer-manager event
/// loop onto `services`. When the configuration lists no bootstrap peers, the
/// chain's built-in bootstrap peers are used instead.
async fn create_p2p_service(
    services: &mut JoinSet<anyhow::Result<()>>,
    config: &mut Config,
    ctx: &AppContext,
) -> anyhow::Result<Libp2pService<DbType>> {
    // Fall back to the chain's well-known bootstrap peers if none were supplied.
    if config.network.bootstrap_peers.is_empty() {
        config.network.bootstrap_peers = ctx.state_manager.chain_config().bootstrap_peers.clone();
    }
    let peer_manager = Arc::new(PeerManager::default());
    services.spawn(peer_manager.clone().peer_operation_event_loop_task());
    let genesis_header = ctx.state_manager.chain_store().genesis_block_header();
    let p2p_service = Libp2pService::new(
        config.network.clone(),
        Arc::clone(ctx.state_manager.chain_store()),
        peer_manager,
        ctx.net_keypair.clone(),
        config.chain.genesis_name(),
        *genesis_header.cid(),
    )
    .await?;
    Ok(p2p_service)
}
/// Constructs the message pool, backed by the chain store and wired to the
/// p2p network sender, and returns it behind an `Arc`.
fn create_mpool(
    services: &mut JoinSet<anyhow::Result<()>>,
    p2p_service: &Libp2pService<DbType>,
    ctx: &AppContext,
) -> anyhow::Result<Arc<MessagePool<Arc<ChainStore<DbType>>>>> {
    // Pool configuration is persisted in the database.
    let mpool_config = MpoolConfig::load_config(ctx.db.writer().as_ref())?;
    let pool = MessagePool::new(
        ctx.state_manager.chain_store().clone(),
        p2p_service.network_sender().clone(),
        mpool_config,
        ctx.state_manager.chain_config().clone(),
        services,
    )?;
    Ok(Arc::new(pool))
}
/// Builds the chain follower from the p2p service's channels, the message
/// pool, and the genesis tipset. The follower is returned (not yet running);
/// see [`start_chain_follower_service`].
fn create_chain_follower(
    opts: &CliOpts,
    p2p_service: &Libp2pService<DbType>,
    mpool: Arc<MessagePool<Arc<ChainStore<DbType>>>>,
    ctx: &AppContext,
) -> anyhow::Result<Arc<ChainFollower<DbType>>> {
    let sync_network = SyncNetworkContext::new(
        p2p_service.network_sender().clone(),
        p2p_service.peer_manager().clone(),
        ctx.db.clone(),
    );
    let genesis_tipset = Tipset::from(ctx.state_manager.chain_store().genesis_block_header());
    let follower = ChainFollower::new(
        ctx.state_manager.clone(),
        sync_network,
        genesis_tipset,
        p2p_service.network_receiver(),
        opts.stateless,
        mpool,
    );
    Ok(Arc::new(follower))
}
/// Spawns the chain follower's main loop onto the service set.
fn start_chain_follower_service(
    services: &mut JoinSet<anyhow::Result<()>>,
    chain_follower: Arc<ChainFollower<DbType>>,
) {
    services.spawn(async move { chain_follower.run().await });
}
/// Starts the health-check HTTP endpoint when it is enabled in the
/// configuration; otherwise just logs that it is disabled.
async fn maybe_start_health_check_service(
    services: &mut JoinSet<anyhow::Result<()>>,
    config: &Config,
    p2p_service: &Libp2pService<DbType>,
    chain_follower: &ChainFollower<DbType>,
    ctx: &AppContext,
) -> anyhow::Result<()> {
    if !config.client.enable_health_check {
        info!("Healthcheck service is disabled");
        return Ok(());
    }
    // Snapshot of everything the health endpoints need to answer probes.
    let forest_state = crate::health::ForestState {
        config: config.clone(),
        chain_config: ctx.state_manager.chain_config().clone(),
        genesis_timestamp: ctx
            .state_manager
            .chain_store()
            .genesis_block_header()
            .timestamp,
        sync_status: chain_follower.sync_status.clone(),
        peer_manager: p2p_service.peer_manager().clone(),
    };
    let healthcheck_address = forest_state.config.client.healthcheck_address;
    info!("Healthcheck endpoint will listen at {healthcheck_address}");
    let listener = tokio::net::TcpListener::bind(healthcheck_address).await?;
    services.spawn(async move {
        crate::health::init_healthcheck_server(forest_state, listener)
            .await
            .context("Failed to initiate healthcheck server")
    });
    Ok(())
}
/// Creates the snapshot garbage collector, publishes it through
/// [`GLOBAL_SNAPSHOT_GC`], and spawns its event loop. The periodic scheduler
/// loop is additionally spawned unless `--no-gc` was given. Stateless nodes
/// skip GC entirely.
fn maybe_start_gc_service(
    services: &mut JoinSet<anyhow::Result<()>>,
    opts: &CliOpts,
    config: &Config,
    chain_follower: Arc<ChainFollower<DbType>>,
) -> anyhow::Result<()> {
    if opts.stateless {
        return Ok(());
    }
    let snap_gc = Arc::new(SnapshotGarbageCollector::new(chain_follower, config)?);
    // Publish the collector globally; `set` failing means it was already set.
    GLOBAL_SNAPSHOT_GC
        .set(snap_gc.clone())
        .ok()
        .context("failed to set GLOBAL_SNAPSHOT_GC")?;
    {
        let snap_gc = snap_gc.clone();
        services.spawn(async move {
            snap_gc.event_loop().await;
            Ok(())
        });
    }
    if !opts.no_gc {
        // Last use of `snap_gc`; move it straight into the task.
        services.spawn(async move {
            snap_gc.scheduler_loop().await;
            Ok(())
        });
    }
    Ok(())
}
/// Spawns the JSON-RPC server onto `services` when RPC is enabled in the
/// configuration; otherwise does nothing but log.
///
/// The server shares daemon state (state manager, message pool, sync status,
/// keystore, ...) through [`RPCState`] and can request a daemon-wide shutdown
/// via the `shutdown` channel.
#[allow(clippy::too_many_arguments)]
fn maybe_start_rpc_service(
    services: &mut JoinSet<anyhow::Result<()>>,
    config: &Config,
    mpool: Arc<MessagePool<Arc<ChainStore<DbType>>>>,
    chain_follower: &ChainFollower<DbType>,
    start_time: chrono::DateTime<chrono::Utc>,
    shutdown: mpsc::Sender<()>,
    rpc_stop_handle: jsonrpsee::server::StopHandle,
    ctx: &AppContext,
) -> anyhow::Result<()> {
    if config.client.enable_rpc {
        let rpc_address = config.client.rpc_address;
        // Optional method filter list loaded from a file.
        let filter_list = config
            .client
            .rpc_filter_list
            .as_ref()
            .map(|path| crate::rpc::FilterList::new_from_file(path))
            .transpose()?;
        info!("JSON-RPC endpoint will listen at {rpc_address}");
        let eth_event_handler = Arc::new(EthEventHandler::from_config(&config.events));
        if is_env_truthy("FOREST_JWT_DISABLE_EXP_VALIDATION") {
            warn!(
                "JWT expiration validation is disabled; this significantly weakens security and should only be used in tightly controlled environments"
            );
        }
        services.spawn({
            let state_manager = ctx.state_manager.shallow_clone();
            let bad_blocks = chain_follower.bad_blocks.shallow_clone();
            let sync_status = chain_follower.sync_status.shallow_clone();
            let sync_network_context = chain_follower.network.shallow_clone();
            let tipset_send = chain_follower.tipset_sender.clone();
            let keystore = ctx.keystore.shallow_clone();
            let snapshot_progress_tracker = ctx.snapshot_progress_tracker.clone();
            let nonce_tracker = NonceTracker::new();
            let mpool_locker = MpoolLocker::new();
            let temp_dir = Arc::new(ctx.temp_dir.clone());
            async move {
                // Propagate a bind failure into the `JoinSet` (surfaced by
                // `propagate_error`) instead of panicking the whole daemon.
                let rpc_listener = tokio::net::TcpListener::bind(rpc_address)
                    .await
                    .map_err(|e| {
                        anyhow::anyhow!("Unable to listen on RPC endpoint {rpc_address}: {e}")
                    })?;
                start_rpc(
                    RPCState {
                        state_manager,
                        keystore,
                        mpool,
                        bad_blocks,
                        sync_status,
                        eth_event_handler,
                        sync_network_context,
                        start_time,
                        shutdown,
                        tipset_send,
                        snapshot_progress_tracker,
                        mpool_locker,
                        nonce_tracker,
                        temp_dir,
                    },
                    rpc_listener,
                    rpc_stop_handle,
                    filter_list,
                )
                .await
            }
        });
    } else {
        debug!("RPC disabled.");
    };
    Ok(())
}
/// Starts the F3 sidecar and related bookkeeping on a regular daemon
/// (RPC enabled, not `--halt-after-import`, not stateless).
///
/// Idempotent: if the global F3 lease manager is already set, the function
/// returns immediately.
fn maybe_start_f3_service(opts: &CliOpts, config: &Config, ctx: &AppContext) -> anyhow::Result<()> {
    // Already initialized by a previous invocation — nothing to do.
    if crate::rpc::f3::F3_LEASE_MANAGER.get().is_some() {
        return Ok(());
    }
    // The sidecar communicates over RPC, so it cannot run without it.
    if !config.client.enable_rpc {
        if crate::f3::is_sidecar_ffi_enabled(ctx.state_manager.chain_config()) {
            tracing::warn!("F3 sidecar is enabled but not run because RPC is disabled. ")
        }
        return Ok(());
    }
    if !opts.halt_after_import && !opts.stateless {
        let rpc_endpoint = config.client.rpc_v1_endpoint()?;
        let state_manager = &ctx.state_manager;
        let p2p_peer_id = ctx.p2p_peer_id;
        let admin_jwt = ctx.admin_jwt.clone();
        tokio::task::spawn_blocking({
            // NOTE: everything before the `move ||` closure below runs
            // synchronously on the *current* thread when this expression is
            // evaluated — including the one-time initialization of the global
            // lease manager. Only the closure itself runs on the blocking pool.
            crate::rpc::f3::F3_LEASE_MANAGER
                .set(crate::rpc::f3::F3LeaseManager::new(
                    state_manager.chain_config().network.clone(),
                    p2p_peer_id,
                ))
                .expect("F3 lease manager should not have been initialized before");
            let chain_config = state_manager.chain_config().clone();
            let f3_root = crate::f3::get_f3_root(config);
            let crate::f3::F3Options {
                chain_finality,
                bootstrap_epoch,
                initial_power_table,
            } = crate::f3::get_f3_sidecar_params(&chain_config);
            move || {
                crate::f3::run_f3_sidecar_if_enabled(
                    &chain_config,
                    rpc_endpoint.to_string(),
                    admin_jwt,
                    crate::rpc::f3::get_f3_rpc_endpoint().to_string(),
                    initial_power_table
                        .map(|i| i.to_string())
                        .unwrap_or_default(),
                    bootstrap_epoch,
                    chain_finality,
                    f3_root.display().to_string(),
                );
            }
        });
        // In the background: fetch the latest F3 certificate (with retries)
        // and pin the corresponding tipset as F3-finalized in the chain store.
        tokio::task::spawn({
            let chain_store = ctx.chain_store().clone();
            async move {
                // Give the sidecar a moment to come up before the first attempt.
                tokio::time::sleep(Duration::from_secs(1)).await;
                match (|| crate::rpc::f3::F3GetLatestCertificate::get())
                    .retry(ExponentialBuilder::default())
                    .await
                {
                    Ok(f3_finalized_cert) => {
                        let f3_finalized_head = f3_finalized_cert.chain_head();
                        match chain_store
                            .chain_index()
                            .load_required_tipset(&f3_finalized_head.key)
                        {
                            Ok(ts) => {
                                chain_store.set_f3_finalized_tipset(ts);
                                tracing::info!(
                                    "Set F3 finalized tipset to epoch {} and key {}",
                                    f3_finalized_head.epoch,
                                    f3_finalized_head.key,
                                );
                            }
                            Err(e) => {
                                tracing::error!(
                                    "Failed to get F3 finalized tipset epoch {} and key {}: {e}",
                                    f3_finalized_head.epoch,
                                    f3_finalized_head.key
                                );
                            }
                        }
                    }
                    Err(e) => {
                        tracing::error!("Failed to get F3 latest certificate: {e:#}");
                    }
                }
            }
        });
    }
    Ok(())
}
/// Starts the chain indexer service (and, when a retention window is
/// configured, the eth_mappings TTL collector) when indexing is enabled.
/// Disabled for stateless nodes and on devnets.
fn maybe_start_indexer_service(
    services: &mut JoinSet<anyhow::Result<()>>,
    opts: &CliOpts,
    config: &Config,
    ctx: &AppContext,
) {
    if config.chain_indexer.enable_indexer
        && !opts.stateless
        && !ctx.state_manager.chain_config().is_devnet()
    {
        let mut head_changes_rx = ctx.state_manager.chain_store().subscribe_head_changes();
        let chain_store = ctx.state_manager.chain_store().clone();
        services.spawn(async move {
            tracing::info!("Starting indexer service");
            // Process delegated messages for every tipset applied to the head.
            loop {
                for ts in head_changes_rx.recv().await?.applies {
                    tracing::debug!("Indexing tipset {}", ts.key());
                    let delegated_messages =
                        chain_store.headers_delegated_messages(ts.block_headers().iter())?;
                    chain_store.process_signed_messages(&delegated_messages)?;
                }
            }
        });
        // Periodically prune eth_mappings entries older than the retention window.
        if let Some(retention_epochs) = config.chain_indexer.gc_retention_epochs {
            let chain_store = ctx.state_manager.chain_store().clone();
            let chain_config = ctx.state_manager.chain_config().clone();
            services.spawn(async move {
                tracing::info!("Starting collector for eth_mappings");
                let mut collector = EthMappingCollector::new(
                    chain_store.blockstore().clone(),
                    chain_config.eth_chain_id,
                    retention_epochs.into(),
                );
                collector.run().await
            });
        }
    }
}
/// Entry point for the daemon proper: performs one-time startup
/// initialization and then runs all daemon services to completion.
pub(super) async fn start(
    start_time: chrono::DateTime<chrono::Utc>,
    opts: CliOpts,
    config: Config,
    shutdown_send: mpsc::Sender<()>,
    rpc_stop_handle: jsonrpsee::server::StopHandle,
) -> anyhow::Result<()> {
    startup_init(&config)?;
    // `config` and `shutdown_send` are owned and not used again here, so move
    // them into `start_services` directly rather than cloning and dropping.
    start_services(start_time, &opts, config, shutdown_send, rpc_stop_handle).await
}
/// Wires up and runs every daemon service: p2p networking, the message pool,
/// the chain follower, RPC, snapshot import, garbage collection, metrics, F3,
/// health checks and the indexer.
///
/// Returns early only for `--exit-after-init` / `--halt-after-import`;
/// otherwise it blocks until a service fails (see [`propagate_error`]).
pub(super) async fn start_services(
    start_time: chrono::DateTime<chrono::Utc>,
    opts: &CliOpts,
    mut config: Config,
    shutdown_send: mpsc::Sender<()>,
    rpc_stop_handle: jsonrpsee::server::StopHandle,
) -> anyhow::Result<()> {
    crate::metrics::reset_collector_registry();
    let mut services = JoinSet::new();
    let network = config.chain();
    let ctx = AppContext::init(opts, &config).await?;
    info!("Using network :: {network}");
    utils::misc::display_chain_logo(config.chain());
    if opts.exit_after_init {
        return Ok(());
    }
    // A failed head rewind is non-fatal; log and continue.
    if !opts.stateless
        && !opts.skip_load_actors
        && let Err(e) = ctx.state_manager.maybe_rewind_heaviest_tipset()
    {
        tracing::warn!("error in maybe_rewind_heaviest_tipset: {e:#}");
    }
    let p2p_service = create_p2p_service(&mut services, &mut config, &ctx).await?;
    let mpool = create_mpool(&mut services, &p2p_service, &ctx)?;
    let chain_follower = create_chain_follower(opts, &p2p_service, mpool.clone(), &ctx)?;
    // RPC is started before the snapshot import; the RPC state carries the
    // snapshot progress tracker.
    maybe_start_rpc_service(
        &mut services,
        &config,
        mpool.clone(),
        &chain_follower,
        start_time,
        shutdown_send.clone(),
        rpc_stop_handle,
        &ctx,
    )?;
    maybe_import_snapshot(opts, &mut config, &ctx).await?;
    if opts.halt_after_import {
        services.shutdown().await;
        return Ok(());
    }
    warmup_in_background(&ctx);
    maybe_start_gc_service(&mut services, opts, &config, chain_follower.clone())?;
    maybe_start_metrics_service(&mut services, &config, &ctx).await?;
    maybe_start_f3_service(opts, &config, &ctx)?;
    maybe_start_health_check_service(&mut services, &config, &p2p_service, &chain_follower, &ctx)
        .await?;
    maybe_start_indexer_service(&mut services, opts, &config, &ctx);
    if !opts.stateless {
        ensure_proof_params_downloaded().await?;
    }
    services.spawn(p2p_service.run());
    start_chain_follower_service(&mut services, chain_follower);
    // Blocks until some service returns an error; `propagate_error` never
    // resolves with a success value.
    propagate_error(&mut services)
        .await
        .context("services failure")
        .map(|_| {})
}
/// Kicks off a background blocking task that resolves the tipset at epoch 1
/// starting from the current head, which populates the `tipset_by_height`
/// cache. Failures are logged and otherwise ignored.
fn warmup_in_background(ctx: &AppContext) {
    let cs = ctx.chain_store().clone();
    tokio::task::spawn_blocking(move || {
        let start = Instant::now();
        let outcome = cs.chain_index().tipset_by_height(
            1,
            cs.heaviest_tipset(),
            ResolveNullTipset::TakeOlder,
        );
        match outcome {
            Ok(_) => tracing::info!(
                "Successfully populated tipset_by_height cache, took {}",
                humantime::format_duration(start.elapsed())
            ),
            Err(e) => tracing::warn!("Failed to populate tipset_by_height cache: {e}"),
        }
    });
}
/// Ensures `config.client.snapshot_path` points at a usable snapshot when one
/// is required for syncing.
///
/// A snapshot is required when the network version at the head epoch is older
/// than [`NetworkVersion::V16`]. When required and missing:
/// - with `auto_download_snapshot`, the path is taken from the
///   `FOREST_AUTO_DOWNLOAD_SNAPSHOT_PATH` environment variable if non-empty,
///   otherwise resolved from the default trusted vendor;
/// - without it, the user is prompted interactively and the function bails if
///   they decline.
///
/// # Errors
/// Fails when `download_directory` does not exist, when the vendor lookup
/// fails, or when the user denies the interactive download prompt.
async fn maybe_set_snapshot_path(
    config: &mut Config,
    chain_config: &ChainConfig,
    epoch: ChainEpoch,
    auto_download_snapshot: bool,
    download_directory: &Path,
) -> anyhow::Result<()> {
    if !download_directory.is_dir() {
        anyhow::bail!(
            "`download_directory` does not exist: {}",
            download_directory.display()
        );
    }
    let vendor = snapshot::TrustedVendor::default();
    let chain = config.chain();
    // Running under a network version older than V16 requires a snapshot.
    let network_version = chain_config.network_version(epoch);
    let network_version_is_small = network_version < NetworkVersion::V16;
    let require_a_snapshot = network_version_is_small;
    let have_a_snapshot = config.client.snapshot_path.is_some();
    match (require_a_snapshot, have_a_snapshot, auto_download_snapshot) {
        (false, _, _) => {}   // no snapshot required
        (true, true, _) => {} // a snapshot is already configured
        (true, false, true) => {
            // Automatic mode: prefer an explicitly provided local path.
            const AUTO_SNAPSHOT_PATH_ENV_KEY: &str = "FOREST_AUTO_DOWNLOAD_SNAPSHOT_PATH";
            match std::env::var(AUTO_SNAPSHOT_PATH_ENV_KEY) {
                Ok(path) if !path.is_empty() => {
                    tracing::info!(
                        "importing snapshot from {path} set by `{AUTO_SNAPSHOT_PATH_ENV_KEY}`"
                    );
                    config.client.snapshot_path = Some(path.into());
                }
                _ => {
                    let (resolved_url, _num_bytes, filename) =
                        crate::cli_shared::snapshot::peek(vendor, chain).await?;
                    // Log the actual snapshot being fetched instead of the
                    // former "(unknown)" placeholder that left `filename` unused.
                    tracing::info!("Downloading snapshot: {filename}");
                    config.client.snapshot_path = Some(resolved_url.to_string().into());
                }
            }
        }
        (true, false, false) => {
            let (url, num_bytes, filename) = crate::cli_shared::snapshot::peek(vendor, chain)
                .await
                .context("couldn't get snapshot size")?;
            println!(
                "Forest requires a snapshot to sync with the network, but automatic fetching is disabled."
            );
            let message = format!(
                "Fetch a {} snapshot? (denying will exit the program). ",
                indicatif::HumanBytes(num_bytes)
            );
            // The interactive prompt blocks, so run it off the async executor.
            let have_permission = asyncify(|| {
                dialoguer::Confirm::with_theme(&ColorfulTheme::default())
                    .with_prompt(message)
                    .default(false)
                    .interact()
                    .unwrap_or(false)
            })
            .await;
            if !have_permission {
                bail!(
                    "Forest requires a snapshot to sync with the network, but automatic fetching is disabled."
                )
            }
            tracing::info!("Downloading snapshot: {filename}");
            config.client.snapshot_path = Some(url.to_string().into());
        }
    };
    Ok(())
}
/// Waits on the service set and returns the first service-level error.
/// Panicked/cancelled tasks (a `JoinError`) are skipped, and if every service
/// finishes cleanly this future never resolves.
async fn propagate_error(
    services: &mut JoinSet<anyhow::Result<()>>,
) -> anyhow::Result<std::convert::Infallible> {
    loop {
        match services.join_next().await {
            // A service ran to completion but reported a failure.
            Some(Ok(Err(error_message))) => return Err(error_message),
            // Clean exit or join error — keep draining.
            Some(_) => continue,
            // All services done; park forever.
            None => break,
        }
    }
    std::future::pending().await
}
/// Runs the blocking closure `f` on the blocking thread pool and resolves
/// with its result. Panics if the spawned task itself panics.
fn asyncify<T>(f: impl FnOnce() -> T + Send + 'static) -> impl Future<Output = T>
where
    T: Send + 'static,
{
    // Spawn eagerly (before the returned future is polled), matching the
    // scheduling behavior of `spawn_blocking` itself.
    let task = tokio::task::spawn_blocking(f);
    async move { task.await.expect("spawned task panicked") }
}
#[cfg(test)]
mod tests {
    use rstest::rstest;
    use super::*;
    /// Table-driven checks for [`validation_range`]: invalid heads,
    /// out-of-range starts, and positive/zero/negative `from` offsets.
    ///
    /// Fixes from review: the negative-head case previously carried the wrong
    /// expected message ("epoch 0" for `current = -1`) and reused the case
    /// name `current_non_positive`; the Err arm now compares the actual error
    /// text instead of only checking that *some* error occurred.
    #[rstest]
    #[case::current_non_positive(0, 1, anyhow::Result::Err(anyhow::anyhow!(
        "current head epoch 0 is invalid"
    )))]
    #[case::current_negative(-1, 1, anyhow::Result::Err(anyhow::anyhow!(
        "current head epoch -1 is invalid"
    )))]
    #[case::from_positive_beyond_head(10, 11, anyhow::Result::Err(anyhow::anyhow!(
        "requested validation start epoch 11 is beyond the current head at epoch 10"
    )))]
    #[case::from_positive_within_range(10, 5, anyhow::Result::Ok(5..=10))]
    #[case::from_zero(10, 0, anyhow::Result::Ok(0..=10))]
    #[case::from_negative_within_range(10, -5, anyhow::Result::Ok(5..=10))]
    #[case::from_negative_beyond_range(10, -15, anyhow::Result::Ok(0..=10))]
    fn test_validation_range(
        #[case] current: ChainEpoch,
        #[case] from: ChainEpoch,
        #[case] expected: anyhow::Result<std::ops::RangeInclusive<ChainEpoch>>,
    ) {
        let result = validation_range(current, from);
        match expected {
            Ok(expected_range) => {
                assert_eq!(result.unwrap(), expected_range);
            }
            Err(expected_err) => {
                // Compare messages so a wrong error text fails the test.
                assert_eq!(result.unwrap_err().to_string(), expected_err.to_string());
            }
        }
    }
}