mod backfill;
mod bulk;
mod conn;
mod health;
mod metrics;
mod middleware;
mod rpc;
mod storage;
mod subscribe;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use std::time::Duration;
use anyhow::{Context, Result, anyhow, bail};
use clap::{Parser, ValueEnum};
use serde_json::Value;
use tokio::sync::{Notify, broadcast};
use tracing::{info, warn};
use crate::backfill::{BACKFILL_INTER_FETCH_MS, backfill_loop, summary_loop};
use crate::storage::Storage;
use crate::subscribe::{BROWSER_UA, fetch_chain_id, ingest};
// Which Avalanche C-chain network to follow. The choice selects the built-in
// default WS/RPC endpoints and the default data directory name.
// (Plain `//` comments on purpose: clap's derive turns `///` doc comments on
// value-enum variants into help text, which would change `--help` output.)
#[derive(Debug, Clone, Copy, ValueEnum)]
#[clap(rename_all = "lower")]
enum Network {
// Defaults to the api.avax.network endpoints.
Mainnet,
// Defaults to the api.avax-test.network endpoints.
Testnet,
}
impl Network {
const fn ws_url(self) -> &'static str {
match self {
Self::Mainnet => "wss://api.avax.network/ext/bc/C/ws",
Self::Testnet => "wss://api.avax-test.network/ext/bc/C/ws",
}
}
const fn rpc_url(self) -> &'static str {
match self {
Self::Mainnet => "https://api.avax.network/ext/bc/C/rpc",
Self::Testnet => "https://api.avax-test.network/ext/bc/C/rpc",
}
}
const fn as_str(self) -> &'static str {
match self {
Self::Mainnet => "mainnet",
Self::Testnet => "testnet",
}
}
fn default_data_dir(self) -> PathBuf {
PathBuf::from(format!("./blockstore-data-{}", self.as_str()))
}
}
/// Usage examples shown after the generated `--help` text; wired in through
/// the `after_help` attribute on `Cli`. The leading `\` suppresses the newline
/// right after the opening quote so the text starts at `EXAMPLES:`.
const CLI_EXAMPLES: &str = "\
EXAMPLES:
# Dev quick start — use the permissive testnet endpoints.
neve --network testnet
# Bounded test run, debug logging, custom data dir.
neve --network testnet --stop-time 30 --log-level debug --data-dir /tmp/bs
# Backfill deep history into a fresh store (here: the whole chain from
# genesis). Anchored at creation; stays throttled against the public endpoint.
neve --backfill-floor 0
";
// Default log verbosity selectable with `--log-level`. The chosen value is
// handed to `init_tracing` as the fallback filter directive.
// (Plain `//` comments on purpose: clap's derive would turn `///` doc
// comments on these variants into help text.)
#[derive(Debug, Clone, Copy, ValueEnum)]
#[clap(rename_all = "lower")]
enum LogLevel {
Trace,
Debug,
Info,
Warn,
Error,
}
impl LogLevel {
const fn as_str(self) -> &'static str {
match self {
Self::Trace => "trace",
Self::Debug => "debug",
Self::Info => "info",
Self::Warn => "warn",
Self::Error => "error",
}
}
}
// Command-line options for the binary.
//
// NOTE: comments here are deliberately plain `//` rather than `///`: clap's
// derive uses doc comments as help text, so `///` would alter `--help` output.
//
// All duration flags go through `parse_human_duration`: a bare integer is
// taken as seconds, anything else is handed to the `parse_duration` crate.
#[derive(Debug, Parser)]
#[command(
version,
about = "Avalanche C-chain block streamer + JSON-RPC mirror",
after_help = CLI_EXAMPLES,
)]
struct Cli {
// Fallback log filter; `init_tracing` only uses it when no filter can be
// loaded from the environment.
#[arg(long, value_enum, default_value_t = LogLevel::Info)]
log_level: LogLevel,
// When set, the process shuts down cleanly after this long (see
// `run_until_shutdown`). Unset means run until a signal or fatal error.
#[arg(long, value_parser = parse_human_duration)]
stop_time: Option<Duration>,
// Passed to `fetch_chain_id` at startup and copied into `IngestCfg::max_wait`.
#[arg(long, value_parser = parse_human_duration, default_value = "10m")]
max_wait: Duration,
// Copied into `IngestCfg::ws_idle_timeout` for the upstream WebSocket.
#[arg(long, value_parser = parse_human_duration, default_value = "2m")]
ws_idle_timeout: Duration,
// Overrides the network's default WebSocket endpoint. Not consulted when
// `--mirror-from` is given (see `resolve_endpoints`).
#[arg(long)]
ws_url: Option<String>,
// Overrides the network's default HTTP RPC endpoint. Not consulted when
// `--mirror-from` is given (see `resolve_endpoints`).
#[arg(long)]
rpc_url: Option<String>,
// Selects the default endpoints and the default data directory name.
#[arg(long, value_enum, default_value_t = Network::Mainnet)]
network: Network,
// Base URL of an upstream to mirror. Both endpoints are derived from it,
// backfill pacing drops to zero, and on a fresh store the backfill floor is
// taken from the upstream's `/health` response.
#[arg(long, value_name = "URL")]
mirror_from: Option<String>,
// Store location; defaults to `Network::default_data_dir()`.
#[arg(long)]
data_dir: Option<PathBuf>,
// Listen address handed to `rpc::serve`.
#[arg(long, default_value = "127.0.0.1:8545")]
rpc_addr: std::net::SocketAddr,
// Forwarded to `rpc::ServeConfig::max_connections`.
#[arg(long, default_value_t = 1024)]
max_connections: u32,
// Period passed to `summary_loop`.
#[arg(long, value_parser = parse_human_duration, default_value = "5m")]
summary_period: Duration,
// Lowest height to backfill down to. Only honoured when the store does not
// exist yet; an existing store keeps its own floor (see `resolve_anchor_floor`).
#[arg(long, value_name = "HEIGHT")]
backfill_floor: Option<u64>,
// Forwarded to `rpc::ServeConfig::idle_timeout`; a zero duration is turned
// into `None` in `main`.
#[arg(long, value_parser = parse_human_duration, default_value = "60s")]
idle_timeout: Duration,
// Forwarded to `rpc::ServeConfig::max_blocks_per_request`.
#[arg(long, default_value_t = 10_000)]
max_blocks_per_request: u64,
}
/// Settings and shared handles passed to the ingest future and the backfill
/// loop. Built once in `main` via `IngestCfg::new` and cloned for the backfill task.
#[derive(Clone)]
struct IngestCfg {
/// Copied from `--max-wait`.
max_wait: Duration,
/// Copied from `--ws-idle-timeout`.
ws_idle_timeout: Duration,
/// Resolved upstream WebSocket endpoint.
ws_url: String,
/// Resolved upstream HTTP RPC endpoint.
rpc_url: String,
/// Sender side of the block broadcast channel; `main` hands a clone of the
/// same sender to the RPC server.
blocks: broadcast::Sender<Value>,
/// Set to `true` exactly when `--mirror-from` was given.
subscribe_blocks: bool,
/// Pause between backfill fetches: zero in mirror mode, otherwise
/// `BACKFILL_INTER_FETCH_MS` milliseconds.
backfill_inter_fetch: Duration,
/// Backfill floor anchored at store creation, if any (same value that is
/// passed to `Storage::open`).
backfill_floor: Option<u64>,
/// Notified to request shutdown with an error; `run_until_shutdown` waits on it.
fatal: Arc<Notify>,
/// NOTE(review): not used in this file — presumably signalled once the
/// initial bootstrap finishes; confirm against the ingest/backfill modules.
bootstrap_done: Arc<Notify>,
}
impl IngestCfg {
    /// Assembles the ingest configuration from the parsed CLI plus the values
    /// `main` has already resolved (endpoints, broadcast sender, anchored floor).
    ///
    /// Mirror mode (`--mirror-from` present) turns on `subscribe_blocks` and
    /// removes the pause between backfill fetches. Fresh `fatal` and
    /// `bootstrap_done` notifiers are created for every call.
    fn new(
        cli: &Cli,
        ws_url: String,
        rpc_url: String,
        blocks: broadcast::Sender<Value>,
        backfill_floor: Option<u64>,
    ) -> Self {
        let mirror_mode = cli.mirror_from.is_some();
        // No throttling against a mirror upstream; otherwise use the standard pacing.
        let inter_fetch_ms = if mirror_mode {
            0
        } else {
            BACKFILL_INTER_FETCH_MS
        };

        Self {
            max_wait: cli.max_wait,
            ws_idle_timeout: cli.ws_idle_timeout,
            ws_url,
            rpc_url,
            blocks,
            subscribe_blocks: mirror_mode,
            backfill_inter_fetch: Duration::from_millis(inter_fetch_ms),
            backfill_floor,
            fatal: Arc::new(Notify::new()),
            bootstrap_done: Arc::new(Notify::new()),
        }
    }
}
/// clap value parser for duration flags.
///
/// A bare unsigned integer is interpreted as a number of seconds; any other
/// input is delegated to `parse_duration::parse`, whose error is returned as
/// a string for clap to display.
fn parse_human_duration(s: &str) -> Result<Duration, String> {
    match s.parse::<u64>() {
        Ok(whole_secs) => Ok(Duration::from_secs(whole_secs)),
        Err(_) => parse_duration::parse(s).map_err(|err| err.to_string()),
    }
}
/// Installs the global tracing subscriber.
///
/// The filter is loaded from the environment when possible and falls back to
/// `default_level` otherwise. ANSI colours are enabled only when stdout is a
/// terminal; when it is not, timestamps are omitted from the output as well.
fn init_tracing(default_level: &str) {
    use std::io::IsTerminal;

    let is_tty = std::io::stdout().is_terminal();
    let filter = tracing_subscriber::EnvFilter::try_from_default_env()
        .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new(default_level));
    let subscriber = tracing_subscriber::fmt()
        .with_ansi(is_tty)
        .with_env_filter(filter);

    // `without_time()` changes the builder's type, so the two arms cannot be
    // merged into one `init()` call.
    if is_tty {
        subscriber.init();
    } else {
        subscriber.without_time().init();
    }
}
/// Spawns a detached background task that, every five seconds, runs upkeep on
/// the Prometheus recorder and refreshes the process-level collector.
fn spawn_metrics_upkeep(handle: metrics_exporter_prometheus::PrometheusHandle) {
    let upkeep_task = async move {
        let process_stats = metrics::process_collector();
        let mut ticker = tokio::time::interval(Duration::from_secs(5));
        loop {
            ticker.tick().await;
            handle.run_upkeep();
            process_stats.collect();
        }
    };
    // The join handle is intentionally dropped: the task runs for the life of the runtime.
    tokio::spawn(upkeep_task);
}
/// Process entry point: parses the CLI, brings up logging and metrics, opens
/// the block store, starts the RPC server and the background loops, then runs
/// ingest until a stop condition fires (see `run_until_shutdown`).
///
/// Any failure during startup is propagated with `?` and terminates the process.
#[tokio::main]
async fn main() -> Result<()> {
let cli = Cli::parse();
// Install ring as the rustls crypto provider before any TLS client is built.
rustls::crypto::ring::default_provider()
.install_default()
.map_err(|_| anyhow!("install rustls crypto provider"))?;
init_tracing(cli.log_level.as_str());
let metrics_handle = metrics::install()?;
spawn_metrics_upkeep(metrics_handle.clone());
// One shared HTTP client; it is cloned into the backfill task and moved into ingest.
let http = reqwest::Client::builder().user_agent(BROWSER_UA).build()?;
let (ws_url, rpc_url) = resolve_endpoints(&cli)?;
// Ask the upstream for its chain id up front; startup aborts if this fails.
let chain_id = fetch_chain_id(&http, &rpc_url, cli.max_wait).await?;
info!(chain_id, rpc_url = %rpc_url, "queried upstream chain_id");
let data_dir = cli
.data_dir
.clone()
.unwrap_or_else(|| cli.network.default_data_dir());
std::fs::create_dir_all(&data_dir)?;
// Must run before `Storage::open`: it checks whether the store's index file
// already exists on disk to decide if a new floor may be anchored.
let anchor_floor = resolve_anchor_floor(&http, &cli, &data_dir).await;
let storage = Storage::open(&data_dir, chain_id, anchor_floor)?;
info!(
path = %data_dir.display(),
chain_id,
high_water = storage.high_water().await,
"storage opened",
);
// Counters shared across tasks: `backfill_count` goes to the backfill and
// summary loops, `behind_tip` to the backfill loop and the RPC server.
let backfill_count = Arc::new(AtomicU64::new(0));
let behind_tip = Arc::new(AtomicU64::new(0));
// Broadcast channel with capacity 1024. The initial receiver is dropped
// here; only the sender is handed out (to the RPC server and to ingest).
let (block_tx, _) = broadcast::channel::<Value>(1024);
// `--idle-timeout 0` means "no idle timeout".
let idle_timeout = (cli.idle_timeout > Duration::ZERO).then_some(cli.idle_timeout);
let serve_cfg = rpc::ServeConfig {
addr: cli.rpc_addr,
max_connections: cli.max_connections,
idle_timeout,
max_blocks_per_request: cli.max_blocks_per_request,
};
// Bound to a named `_rpc_handle` (not `_`) so the handle lives until the
// end of `main` instead of being dropped immediately.
let _rpc_handle = rpc::serve(
serve_cfg,
storage.clone(),
data_dir.clone(),
chain_id,
behind_tip.clone(),
block_tx.clone(),
metrics_handle,
)
.await?;
let cfg = IngestCfg::new(&cli, ws_url, rpc_url, block_tx, anchor_floor);
info!(
max_wait_secs = cfg.max_wait.as_secs(),
ws_idle_timeout_secs = cfg.ws_idle_timeout.as_secs(),
ws_url = %cfg.ws_url,
rpc_url = %cfg.rpc_url,
"ingest config",
);
// Background loops are detached; their join handles are not awaited.
tokio::spawn(backfill_loop(
storage.clone(),
http.clone(),
cfg.clone(),
backfill_count.clone(),
behind_tip.clone(),
));
tokio::spawn(summary_loop(
storage.clone(),
cli.summary_period,
backfill_count,
));
// Grab what shutdown needs before `storage` and `cfg` are moved into ingest.
let fatal = cfg.fatal.clone();
let storage_close = storage.clone();
let ingest_fut = ingest(storage, http, cfg);
if let Some(stop) = cli.stop_time {
info!(?stop, "stop-time set, will exit after this duration");
}
// NOTE(review): the future is boxed before awaiting — presumably to keep a
// large future off `main`'s own state; confirm the intent.
Box::pin(run_until_shutdown(
ingest_fut,
fatal,
cli.stop_time,
storage_close,
))
.await
}
/// Drives `ingest_fut` until the first of four things happens: ingest itself
/// returns, the optional stop-time elapses, a termination signal arrives, or
/// `fatal` is notified.
///
/// Whatever the cause, storage is flushed before returning; a failed flush is
/// logged as a warning and does not replace the outcome. Stop-time and signal
/// exits return `Ok(())`, a fatal notification returns an error, and ingest's
/// own result is passed through unchanged.
async fn run_until_shutdown(
    ingest_fut: impl std::future::Future<Output = Result<()>>,
    fatal: Arc<Notify>,
    stop_time: Option<Duration>,
    storage_close: Storage,
) -> Result<()> {
    let exit = tokio::select! {
        ingest_result = ingest_fut => ingest_result,
        () = sleep_or_pending(stop_time) => {
            info!("stop-time reached, shutting down");
            Ok(())
        }
        signal_name = wait_for_signal() => {
            info!(signal = signal_name, "signal received, shutting down");
            Ok(())
        }
        () = fatal.notified() => {
            Err(anyhow!("fatal upstream condition; see prior ERROR log"))
        }
    };

    info!("flushing storage to disk");
    // Best effort: report a failed flush but still return the original outcome.
    if let Some(e) = storage_close.persist().await.err() {
        warn!(error = %e, "storage flush on shutdown failed");
    }

    exit
}
/// Completes after `stop` has elapsed, or never completes when `stop` is
/// `None`, so it can sit in a `select!` as an optional deadline.
async fn sleep_or_pending(stop: Option<Duration>) {
    if let Some(delay) = stop {
        tokio::time::sleep(delay).await;
    } else {
        std::future::pending::<()>().await;
    }
}
/// Waits for the first of SIGINT, SIGTERM or SIGQUIT and returns its name.
///
/// # Panics
/// Panics if any of the three Unix signal handlers cannot be installed.
async fn wait_for_signal() -> &'static str {
    use tokio::signal::unix::{SignalKind, signal};

    // Register a listener, aborting with the given message if that fails.
    let install = |kind: SignalKind, failure: &str| signal(kind).expect(failure);

    let mut interrupt = install(SignalKind::interrupt(), "install SIGINT handler");
    let mut terminate = install(SignalKind::terminate(), "install SIGTERM handler");
    let mut quit = install(SignalKind::quit(), "install SIGQUIT handler");

    tokio::select! {
        _ = interrupt.recv() => "SIGINT",
        _ = terminate.recv() => "SIGTERM",
        _ = quit.recv() => "SIGQUIT",
    }
}
/// Resolves the upstream `(ws_url, rpc_url)` pair.
///
/// In mirror mode (`--mirror-from` given) both endpoints are derived from the
/// mirror base URL, with any trailing `/` removed; `--ws-url` and `--rpc-url`
/// are not consulted. Otherwise each explicit flag wins over the selected
/// network's built-in default.
///
/// The RPC endpoint is always returned with an `http(s)://` scheme. The mirror
/// base may legitimately be written as `ws(s)://` (`derive_ws_url` accepts it),
/// but the RPC URL is used with the HTTP client, so such a base is mapped to
/// its `http(s)://` twin instead of being passed through verbatim.
///
/// # Errors
/// Fails when the mirror base has none of the schemes accepted by
/// `derive_ws_url`.
fn resolve_endpoints(cli: &Cli) -> Result<(String, String)> {
    if let Some(base) = cli.mirror_from.as_deref() {
        let base = base.trim_end_matches('/');
        let ws = derive_ws_url(base)?;
        let rpc = mirror_rpc_url(base);
        info!(rpc = %rpc, ws = %ws, "mirror mode: derived endpoints from --mirror-from");
        return Ok((ws, rpc));
    }
    Ok((
        cli.ws_url
            .clone()
            .unwrap_or_else(|| cli.network.ws_url().to_owned()),
        cli.rpc_url
            .clone()
            .unwrap_or_else(|| cli.network.rpc_url().to_owned()),
    ))
}

/// Maps a `ws://` / `wss://` mirror base to `http://` / `https://`; any other
/// input is returned unchanged.
fn mirror_rpc_url(base: &str) -> String {
    if let Some(rest) = base.strip_prefix("wss://") {
        format!("https://{rest}")
    } else if let Some(rest) = base.strip_prefix("ws://") {
        format!("http://{rest}")
    } else {
        base.to_owned()
    }
}
/// Decides which backfill floor, if any, to anchor into the store.
///
/// Returns `Some(height)` only when the store does not exist yet and either
/// `--backfill-floor` was given (it takes precedence) or, in mirror mode, the
/// upstream's `/health` probe reported its earliest retained height. Returns
/// `None` when the store already exists, when neither flag applies, or when
/// the probe fails (logged as a warning).
async fn resolve_anchor_floor(
    http: &reqwest::Client,
    cli: &Cli,
    data_dir: &std::path::Path,
) -> Option<u64> {
    // The index file's presence is what marks an already-created store.
    let index_path = data_dir.join("blocks").join("blockdb.idx");
    let store_exists = index_path.exists();

    match (cli.backfill_floor, store_exists) {
        (Some(floor), true) => {
            info!(
                floor,
                "--backfill-floor ignored: store already exists, resuming with its baked-in floor",
            );
            return None;
        }
        (Some(floor), false) => {
            info!(floor, "anchoring backfill floor at --backfill-floor");
            return Some(floor);
        }
        (None, _) => {}
    }

    // From here on only mirror mode can produce a floor.
    let base = cli.mirror_from.as_deref()?;
    if store_exists {
        info!("mirror: local store already exists, resuming with its anchored floor");
        return None;
    }

    let probe = fetch_upstream_min_height(http, base).await;
    match probe {
        Err(e) => {
            warn!(error = %e, "mirror: /health probe failed; falling back to forward-only from tip");
            None
        }
        Ok(min_h) => {
            info!(
                min_height = min_h,
                "mirror: anchoring backfill floor at upstream's earliest retained block",
            );
            Some(min_h)
        }
    }
}
/// Turns a mirror base URL into the matching WebSocket URL by swapping the
/// scheme: `https://` becomes `wss://` and `http://` becomes `ws://`. A base
/// that already starts with `ws://` or `wss://` is returned unchanged.
///
/// # Errors
/// Fails when `base` starts with none of the four accepted schemes.
fn derive_ws_url(base: &str) -> Result<String> {
    const SCHEME_SWAPS: [(&str, &str); 2] = [("https://", "wss://"), ("http://", "ws://")];

    for (http_scheme, ws_scheme) in SCHEME_SWAPS {
        if let Some(rest) = base.strip_prefix(http_scheme) {
            return Ok(format!("{ws_scheme}{rest}"));
        }
    }

    let already_ws = ["ws://", "wss://"]
        .iter()
        .any(|scheme| base.starts_with(*scheme));
    if already_ws {
        return Ok(base.to_owned());
    }

    bail!("--mirror-from must be an http(s):// (or ws(s)://) URL, got: {base}")
}
/// Probes `<base>/health` on the mirror upstream and returns the
/// `blocks.min_height` value from its JSON body.
///
/// `base` may carry a trailing `/`, and may be written with a `ws(s)://`
/// scheme (which `--mirror-from` accepts). Because the probe is a plain HTTP
/// GET, a WebSocket scheme is first mapped to its `http(s)://` twin; without
/// that the request can never succeed and the caller silently degrades to
/// forward-only mode.
///
/// # Errors
/// Fails when the request cannot be sent, the response status is not a
/// success, the body is not valid JSON, or `blocks.min_height` is missing or
/// not an unsigned integer.
async fn fetch_upstream_min_height(http: &reqwest::Client, base: &str) -> Result<u64> {
    let base = base.trim_end_matches('/');
    // Normalise the scheme so the HTTP client can actually issue the request.
    let url = if let Some(rest) = base.strip_prefix("wss://") {
        format!("https://{rest}/health")
    } else if let Some(rest) = base.strip_prefix("ws://") {
        format!("http://{rest}/health")
    } else {
        format!("{base}/health")
    };

    let resp = http
        .get(&url)
        .send()
        .await
        .with_context(|| format!("GET {url}"))?;
    if !resp.status().is_success() {
        bail!("upstream /health returned HTTP {}", resp.status());
    }

    let v: Value = resp.json().await.context("decode /health body")?;
    v.get("blocks")
        .and_then(|b| b.get("min_height"))
        .and_then(Value::as_u64)
        .ok_or_else(|| anyhow!("/health missing blocks.min_height (is the upstream a neve?)"))
}