neve 0.1.1

A read-only cache for avalanchego's JSON-RPC frontend API, backed by a local block store.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
mod backfill;
mod bulk;
mod conn;
mod health;
mod metrics;
mod middleware;
mod rpc;
mod storage;
mod subscribe;

use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use std::time::Duration;

use anyhow::{Context, Result, anyhow, bail};
use clap::{Parser, ValueEnum};
use serde_json::Value;
use tokio::sync::{Notify, broadcast};
use tracing::{info, warn};

use crate::backfill::{BACKFILL_INTER_FETCH_MS, backfill_loop, summary_loop};
use crate::storage::Storage;
use crate::subscribe::{BROWSER_UA, fetch_chain_id, ingest};

// Which Avalanche network to talk to (selected via `--network`). Plain `//`
// comments are used on the enum and its variants on purpose: clap's
// `ValueEnum` derive can surface variant doc comments in `--help`.
#[derive(Debug, Clone, Copy, ValueEnum)]
#[clap(rename_all = "lower")]
enum Network {
    Mainnet,
    Testnet,
}

impl Network {
    /// Lowercase network name; matches the `--network` spelling.
    const fn as_str(self) -> &'static str {
        match self {
            Network::Mainnet => "mainnet",
            Network::Testnet => "testnet",
        }
    }

    /// Public C-chain WebSocket endpoint (used for the `newHeads` subscription).
    const fn ws_url(self) -> &'static str {
        match self {
            Network::Mainnet => "wss://api.avax.network/ext/bc/C/ws",
            Network::Testnet => "wss://api.avax-test.network/ext/bc/C/ws",
        }
    }

    /// Public C-chain HTTPS JSON-RPC endpoint (used for block fetches).
    const fn rpc_url(self) -> &'static str {
        match self {
            Network::Mainnet => "https://api.avax.network/ext/bc/C/rpc",
            Network::Testnet => "https://api.avax-test.network/ext/bc/C/rpc",
        }
    }

    /// Default `--data-dir`: `./blockstore-data-<network>`, relative to the
    /// working directory, so different networks never share a store.
    fn default_data_dir(self) -> PathBuf {
        PathBuf::from(["./blockstore-data-", self.as_str()].concat())
    }
}

/// Footer appended to `--help` output (wired in via `after_help` on the `Cli`
/// parser). The leading `\` + newline escape swallows the first line break, so
/// the text begins at `EXAMPLES:`; every following byte, including the
/// two-space indentation, is printed verbatim.
const CLI_EXAMPLES: &str = "\
EXAMPLES:
  # Dev quick start — use the permissive testnet endpoints.
  neve --network testnet

  # Bounded test run, debug logging, custom data dir.
  neve --network testnet --stop-time 30 --log-level debug --data-dir /tmp/bs

  # Backfill deep history into a fresh store (here: the whole chain from
  # genesis). Anchored at creation; stays throttled against the public endpoint.
  neve --backfill-floor 0
";

// Verbosity selectable via `--log-level`. Plain `//` comments only: clap's
// `ValueEnum` derive can turn variant doc comments into `--help` text.
#[derive(Debug, Clone, Copy, ValueEnum)]
#[clap(rename_all = "lower")]
enum LogLevel {
    Trace,
    Debug,
    Info,
    Warn,
    Error,
}

impl LogLevel {
    /// Lowercase level name, used as the default `EnvFilter` directive when
    /// `RUST_LOG` is not set.
    const fn as_str(self) -> &'static str {
        match self {
            LogLevel::Error => "error",
            LogLevel::Warn => "warn",
            LogLevel::Info => "info",
            LogLevel::Debug => "debug",
            LogLevel::Trace => "trace",
        }
    }
}

// Command-line interface. NOTE: the `///` doc comments on the fields below are
// not only rustdoc — clap's `Parser` derive renders them as `--help` text, so
// editing them changes user-visible output. Maintainer notes use plain `//`.
#[derive(Debug, Parser)]
#[command(
    version,
    about = "Avalanche C-chain block streamer + JSON-RPC mirror",
    after_help = CLI_EXAMPLES,
)]
struct Cli {
    /// Logging verbosity. Overridden by `RUST_LOG` if set.
    #[arg(long, value_enum, default_value_t = LogLevel::Info)]
    log_level: LogLevel,

    // `parse_human_duration` also accepts a bare integer, read as seconds.
    /// Stop after the given duration (e.g. `30s`, `5m`, `1h`). Parsed via
    /// the `parse_duration` crate. Useful for short test runs.
    #[arg(long, value_parser = parse_human_duration)]
    stop_time: Option<Duration>,

    /// Maximum time to wait when upstream sends `Retry-After` (e.g. `30s`,
    /// `10m`, `1h`). If the server asks us to wait longer than this, we log
    /// an error and shut down rather than silently sleep. Default: 10m.
    #[arg(long, value_parser = parse_human_duration, default_value = "10m")]
    max_wait: Duration,

    /// Drop and reconnect the WebSocket if no `newHeads` arrive within this
    /// window (e.g. `30s`, `2m`). Guards against a silently-dead socket — a
    /// half-open TCP connection or a stalled subscription that never errors,
    /// where the read would otherwise block forever. Default: 2m.
    #[arg(long, value_parser = parse_human_duration, default_value = "2m")]
    ws_idle_timeout: Duration,

    /// WebSocket endpoint for `newHeads` subscription. Defaults to the URL
    /// for the configured `--network`. An explicit `--ws-url` wins.
    #[arg(long)]
    ws_url: Option<String>,

    /// HTTPS JSON-RPC endpoint for block fetches. Defaults to the
    /// URL for the configured `--network`. An explicit `--rpc-url` wins.
    #[arg(long)]
    rpc_url: Option<String>,

    /// Which Avalanche network to target. Picks the default WS / RPC URLs
    /// and the default `--data-dir`. Testnet has much more permissive rate
    /// limits and is recommended for dev work.
    #[arg(long, value_enum, default_value_t = Network::Mainnet)]
    network: Network,

    // Consumed by `resolve_endpoints` and `resolve_anchor_floor`; its presence
    // also makes `IngestCfg::new` select unthrottled backfill and `newBlocks`.
    /// Mirror another neve instance from a single endpoint. neve serves
    /// JSON-RPC, the `newHeads` WebSocket, and `/health` on one socket, so
    /// this one URL yields all three: the WS and RPC endpoints are derived
    /// from it (`http`→`ws`, `https`→`wss`), overriding `--network` /
    /// `--ws-url` / `--rpc-url`. When the local store is empty, the
    /// upstream's `/health` is queried for its earliest retained block and
    /// the store is anchored there so backfill reproduces the upstream's
    /// whole retained range (not just forward from the tip). Backfill runs
    /// unthrottled in this mode — there's no public-endpoint rate limit to
    /// be polite to. Example: `--mirror-from http://10.0.0.5:8545`.
    #[arg(long, value_name = "URL")]
    mirror_from: Option<String>,

    /// Directory holding the blockstore + fjall index. Created if missing.
    /// Defaults to `./blockstore-data-<network>` so swapping networks
    /// doesn't cross-pollinate stores. A `NETWORK` stamp file is written
    /// on first open and verified on subsequent opens.
    #[arg(long)]
    data_dir: Option<PathBuf>,

    /// Socket address for the JSON-RPC server.
    #[arg(long, default_value = "127.0.0.1:8545")]
    rpc_addr: std::net::SocketAddr,

    /// Maximum concurrent JSON-RPC connections. Excess connections are
    /// rejected with HTTP 429. jsonrpsee's own default is only 100, which a
    /// public/wallet-facing endpoint blows past easily.
    #[arg(long, default_value_t = 1024)]
    max_connections: u32,

    /// Cadence for the periodic `summary` INFO log line (e.g. `30s`, `5m`,
    /// `1h`). The first summary fires shortly after startup regardless.
    #[arg(long, value_parser = parse_human_duration, default_value = "5m")]
    summary_period: Duration,

    // "Store already exists" is decided in `resolve_anchor_floor` by probing
    // for `<data-dir>/blocks/blockdb.idx`.
    /// Lowest block height backfill should fill down to, anchored when the
    /// store is first created. Without it, neve anchors at the first `newHead`
    /// it receives and only fills *forward* from there; set it to retain deep
    /// history — e.g. `--backfill-floor 0` to mirror the whole chain. Ignored
    /// if the store already exists (the floor is baked in at creation; truncate
    /// the data dir to re-anchor). Against the public endpoint backfill stays
    /// throttled to ~25 req/s, so a deep floor takes a long time to fill.
    /// Overrides the `--mirror-from` auto-floor when both are given.
    #[arg(long, value_name = "HEIGHT")]
    backfill_floor: Option<u64>,

    // `main` maps a zero duration to `None` before handing it to the server.
    /// Close a JSON-RPC connection that has had no read or write activity for
    /// this long (e.g. `60s`, `2m`). Defends against slowloris and the leaked
    /// idle-keep-alive fd growth jsonrpsee can't reap on its own. `0` disables
    /// the reaping entirely (connections may then linger until `--max-connections`).
    #[arg(long, value_parser = parse_human_duration, default_value = "60s")]
    idle_timeout: Duration,

    /// Maximum number of blocks a single `GET /blocks?from=&to=` bulk-export
    /// request may return; larger ranges are rejected with HTTP 400. Split a
    /// bigger download into successive windows.
    #[arg(long, default_value_t = 10_000)]
    max_blocks_per_request: u64,
}

/// Configuration threaded through the WebSocket ingest and backfill tasks.
/// Cheap to clone: the heavy pieces are channel / `Arc` handles.
#[derive(Clone)]
struct IngestCfg {
    /// Longest upstream `Retry-After` we will honor (`--max-wait`); anything
    /// longer is treated as fatal instead of being slept through.
    max_wait: Duration,
    /// If no `newHeads` arrive within this window, the WebSocket is dropped
    /// and reconnected (`--ws-idle-timeout`).
    ws_idle_timeout: Duration,
    /// WebSocket endpoint for the head/block subscription.
    ws_url: String,
    /// HTTP(S) JSON-RPC endpoint used for block fetches.
    rpc_url: String,
    /// Fan-out handle for freshly persisted **full** blocks, which feed the
    /// `newHeads` and `newBlocks` subscriptions. Only the WebSocket-driven
    /// path publishes here; backfilled blocks are historical, not "new".
    blocks: broadcast::Sender<Value>,
    /// `true`: subscribe to `newBlocks` (whole block, no follow-up fetch).
    /// `false`: subscribe to `newHeads` and fetch each block afterwards.
    /// Set in `--mirror-from` mode, where the upstream is a neve offering the
    /// extension; the public endpoint only offers `newHeads`.
    subscribe_blocks: bool,
    /// Pause between backfill block fetches: `BACKFILL_INTER_FETCH_MS`
    /// (~25 req/s, to stay under the public endpoint's Cloudflare limits) by
    /// default, zero in `--mirror-from` mode.
    backfill_inter_fetch: Duration,
    /// Height backfill fills down to. `Some(floor)` lets backfill start from
    /// `floor` without first waiting for a `newHead` to anchor the store;
    /// `None` keeps the "anchor at first newHead, fill forward only" behavior.
    backfill_floor: Option<u64>,
    /// Fired on unrecoverable conditions (e.g. upstream throttling longer than
    /// `--max-wait`); main's shutdown `select!` turns it into an error exit.
    fatal: Arc<Notify>,
    /// Fired once the mirror's `oldBlocks` bootstrap has finished streaming
    /// history (or given up). In mirror mode backfill waits on it so it does
    /// not duplicate the bootstrap's work with HTTPS fetches; it is never
    /// awaited otherwise.
    bootstrap_done: Arc<Notify>,
}

impl IngestCfg {
    /// Build the ingest configuration from the parsed CLI and the endpoints
    /// already resolved by the caller. Both notifiers (`fatal`,
    /// `bootstrap_done`) start out fresh.
    fn new(
        cli: &Cli,
        ws_url: String,
        rpc_url: String,
        blocks: broadcast::Sender<Value>,
        backfill_floor: Option<u64>,
    ) -> Self {
        // Mirroring another neve: no public rate limit to respect, and the
        // upstream supports `newBlocks`, so skip the per-block fetch.
        let mirror_mode = cli.mirror_from.is_some();
        Self {
            max_wait: cli.max_wait,
            ws_idle_timeout: cli.ws_idle_timeout,
            ws_url,
            rpc_url,
            blocks,
            subscribe_blocks: mirror_mode,
            backfill_inter_fetch: if mirror_mode {
                Duration::ZERO
            } else {
                Duration::from_millis(BACKFILL_INTER_FETCH_MS)
            },
            backfill_floor,
            fatal: Arc::default(),
            bootstrap_done: Arc::default(),
        }
    }
}

/// `clap` value parser for duration flags.
///
/// A bare integer is read as seconds (so `--stop-time 6` needs no unit);
/// anything else is delegated to `parse_duration` (`30s`, `5m`, `1h`, ...).
/// Errors are stringified for clap's diagnostics.
fn parse_human_duration(s: &str) -> Result<Duration, String> {
    match s.parse::<u64>() {
        Ok(seconds) => Ok(Duration::from_secs(seconds)),
        Err(_) => parse_duration::parse(s).map_err(|err| err.to_string()),
    }
}

/// Install the global `tracing` subscriber.
///
/// `RUST_LOG` takes precedence; otherwise `default_level` is the filter. When
/// stdout is a terminal, output is colored and timestamped. Without a TTY
/// (e.g. under systemd/journald) both are dropped: ANSI escapes would be
/// stored literally, and journald already timestamps every line.
fn init_tracing(default_level: &str) {
    use std::io::IsTerminal as _;

    let is_tty = std::io::stdout().is_terminal();
    let filter = tracing_subscriber::EnvFilter::try_from_default_env()
        .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new(default_level));
    let subscriber = tracing_subscriber::fmt()
        .with_ansi(is_tty)
        .with_env_filter(filter);

    if !is_tty {
        subscriber.without_time().init();
        return;
    }
    subscriber.init();
}

/// Spawn a detached background task that, every 5 seconds, runs
/// `PrometheusHandle::run_upkeep` and refreshes the `process_*` collector.
///
/// Upkeep drains histogram buckets and clears idle metrics, keeping the
/// renderer's memory bounded on long runs; the collector re-reads process
/// CPU/memory/fd stats. The cadence is cheap enough to be unmeasurable.
fn spawn_metrics_upkeep(handle: metrics_exporter_prometheus::PrometheusHandle) {
    const UPKEEP_PERIOD: Duration = Duration::from_secs(5);

    let upkeep = async move {
        let process_stats = metrics::process_collector();
        let mut ticker = tokio::time::interval(UPKEEP_PERIOD);
        loop {
            ticker.tick().await;
            handle.run_upkeep();
            process_stats.collect();
        }
    };
    tokio::spawn(upkeep);
}

#[tokio::main]
async fn main() -> Result<()> {
    let cli = Cli::parse();
    rustls::crypto::ring::default_provider()
        .install_default()
        .map_err(|_| anyhow!("install rustls crypto provider"))?;
    init_tracing(cli.log_level.as_str());

    // Install the global metrics recorder before anything records; the handle
    // renders the `/metrics` payload and drives periodic upkeep.
    let metrics_handle = metrics::install()?;
    spawn_metrics_upkeep(metrics_handle.clone());

    let http = reqwest::Client::builder().user_agent(BROWSER_UA).build()?;
    let (ws_url, rpc_url) = resolve_endpoints(&cli)?;
    let chain_id = fetch_chain_id(&http, &rpc_url, cli.max_wait).await?;
    info!(chain_id, rpc_url = %rpc_url, "queried upstream chain_id");

    let data_dir = cli
        .data_dir
        .clone()
        .unwrap_or_else(|| cli.network.default_data_dir());
    std::fs::create_dir_all(&data_dir)?;

    let anchor_floor = resolve_anchor_floor(&http, &cli, &data_dir).await;
    let storage = Storage::open(&data_dir, chain_id, anchor_floor)?;
    info!(
        path = %data_dir.display(),
        chain_id,
        high_water = storage.high_water().await,
        "storage opened",
    );

    let backfill_count = Arc::new(AtomicU64::new(0));
    let behind_tip = Arc::new(AtomicU64::new(0));
    // Full-block fan-out for eth_subscribe (newHeads / newBlocks). Capacity
    // 1024 ≈ minutes of tail at C-chain block rate; a subscriber slower than
    // that gets Lagged and resumes from the tip rather than back-pressuring
    // ingest.
    let (block_tx, _) = broadcast::channel::<Value>(1024);

    // `--idle-timeout 0` disables the connection reaper; a positive value enables
    // it. (`Option` rather than a magic-zero `Duration` past this boundary.)
    let idle_timeout = (cli.idle_timeout > Duration::ZERO).then_some(cli.idle_timeout);
    let serve_cfg = rpc::ServeConfig {
        addr: cli.rpc_addr,
        max_connections: cli.max_connections,
        idle_timeout,
        max_blocks_per_request: cli.max_blocks_per_request,
    };
    let _rpc_handle = rpc::serve(
        serve_cfg,
        storage.clone(),
        data_dir.clone(),
        chain_id,
        behind_tip.clone(),
        block_tx.clone(),
        metrics_handle,
    )
    .await?;
    let cfg = IngestCfg::new(&cli, ws_url, rpc_url, block_tx, anchor_floor);
    info!(
        max_wait_secs = cfg.max_wait.as_secs(),
        ws_idle_timeout_secs = cfg.ws_idle_timeout.as_secs(),
        ws_url = %cfg.ws_url,
        rpc_url = %cfg.rpc_url,
        "ingest config",
    );
    tokio::spawn(backfill_loop(
        storage.clone(),
        http.clone(),
        cfg.clone(),
        backfill_count.clone(),
        behind_tip.clone(),
    ));
    tokio::spawn(summary_loop(
        storage.clone(),
        cli.summary_period,
        backfill_count,
    ));

    let fatal = cfg.fatal.clone();
    let storage_close = storage.clone();
    let ingest_fut = ingest(storage, http, cfg);
    if let Some(stop) = cli.stop_time {
        info!(?stop, "stop-time set, will exit after this duration");
    }
    // Box::pin: this future transitively holds the large `ingest` state machine.
    Box::pin(run_until_shutdown(
        ingest_fut,
        fatal,
        cli.stop_time,
        storage_close,
    ))
    .await
}

/// Drive `ingest_fut` until the first shutdown trigger fires — ingest returning,
/// the optional stop-time elapsing, an OS signal, or a fatal upstream condition
/// — then flush storage to disk and return the run's outcome.
async fn run_until_shutdown(
    ingest_fut: impl std::future::Future<Output = Result<()>>,
    fatal: Arc<Notify>,
    stop_time: Option<Duration>,
    storage_close: Storage,
) -> Result<()> {
    let outcome = tokio::select! {
        r = ingest_fut => r,
        () = sleep_or_pending(stop_time) => {
            info!("stop-time reached, shutting down");
            Ok(())
        }
        sig = wait_for_signal() => {
            info!(signal = sig, "signal received, shutting down");
            Ok(())
        }
        () = fatal.notified() => {
            Err(anyhow!("fatal upstream condition; see prior ERROR log"))
        }
    };
    // Graceful flush. Returning drops the runtime, which cancels the spawned
    // tasks and drops Storage — the blockstore checkpoints in its own Drop, and
    // fjall's journal (a WAL) survives a clean exit in the page cache. But
    // steady-state writes use PersistMode::Buffer, so a *power failure* right
    // after exit could lose the un-synced tail; fsync it explicitly here. The
    // "Recovering keyspace" lines on the next startup are fjall's normal open
    // path (it always recovers when the marker file exists), not a dirty close.
    info!("flushing storage to disk");
    if let Err(e) = storage_close.persist().await {
        warn!(error = %e, "storage flush on shutdown failed");
    }
    outcome
}

/// Sleep for `stop` when it is `Some`; **never** resolve when it is `None`.
///
/// This lets the shutdown `select!` keep an unconditional stop-time arm: with
/// no `--stop-time`, the arm simply never fires. (The previous doc comment
/// said `None` resolves immediately, which is the opposite of what happens.)
async fn sleep_or_pending(stop: Option<Duration>) {
    match stop {
        Some(d) => tokio::time::sleep(d).await,
        None => std::future::pending::<()>().await,
    }
}

/// Resolve with the name of the first SIGINT, SIGTERM or SIGQUIT received.
///
/// Unix-only (`tokio::signal::unix`). Panics if a handler can't be registered.
async fn wait_for_signal() -> &'static str {
    use tokio::signal::unix::{SignalKind, signal};

    let mut interrupt = signal(SignalKind::interrupt()).expect("install SIGINT handler");
    let mut terminate = signal(SignalKind::terminate()).expect("install SIGTERM handler");
    let mut quit = signal(SignalKind::quit()).expect("install SIGQUIT handler");

    tokio::select! {
        _ = interrupt.recv() => "SIGINT",
        _ = terminate.recv() => "SIGTERM",
        _ = quit.recv() => "SIGQUIT",
    }
}

/// Resolve the `(ws_url, rpc_url)` pair.
///
/// With `--mirror-from <url>`, both are derived from that one neve endpoint
/// (neve serves RPC + WS + `/health` on one socket), overriding `--network` /
/// `--ws-url` / `--rpc-url`. A trailing `/` is dropped first. The RPC side is
/// always returned as `http(s)://`, even when `--mirror-from` was given as
/// `ws(s)://`. Without `--mirror-from`, an explicit `--ws-url` / `--rpc-url`
/// wins, falling back to the `--network` defaults.
///
/// # Errors
/// Fails if `--mirror-from` is not an `http(s)://` or `ws(s)://` URL.
fn resolve_endpoints(cli: &Cli) -> Result<(String, String)> {
    if let Some(base) = cli.mirror_from.as_deref() {
        let base = base.trim_end_matches('/');
        // Also validates the scheme (bails on anything but http(s)/ws(s)).
        let ws = derive_ws_url(base)?;
        // `derive_ws_url` passes ws(s):// input through, but block fetches go
        // through reqwest, which only accepts http(s) URLs. Returning the raw
        // ws:// base as the RPC URL made every fetch (starting with the
        // chain-id query) fail, so map the scheme back.
        let rpc = if let Some(rest) = base.strip_prefix("wss://") {
            format!("https://{rest}")
        } else if let Some(rest) = base.strip_prefix("ws://") {
            format!("http://{rest}")
        } else {
            base.to_owned()
        };
        info!(rpc = %rpc, ws = %ws, "mirror mode: derived endpoints from --mirror-from");
        return Ok((ws, rpc));
    }
    Ok((
        cli.ws_url
            .clone()
            .unwrap_or_else(|| cli.network.ws_url().to_owned()),
        cli.rpc_url
            .clone()
            .unwrap_or_else(|| cli.network.rpc_url().to_owned()),
    ))
}

/// Pick the floor height at which a *newly created* store is anchored, so
/// backfill fills downward to it instead of only forward from the first
/// `newHead`.
///
/// Priority order:
/// 1. `--backfill-floor <HEIGHT>`, in any mode (it beats the mirror probe).
/// 2. In `--mirror-from` mode, the upstream's earliest retained block, read
///    from its `/health`, so the mirror reproduces the upstream's range.
///
/// Returns `None` when:
/// - neither source applies;
/// - the mirror probe fails (logged as a warning; ingest then only goes
///   forward from the tip);
/// - the store already exists, detected via `<data_dir>/blocks/blockdb.idx`.
///   Its floor was fixed at creation and cannot be lowered here.
async fn resolve_anchor_floor(
    http: &reqwest::Client,
    cli: &Cli,
    data_dir: &std::path::Path,
) -> Option<u64> {
    let index_file = data_dir.join("blocks").join("blockdb.idx");
    let store_exists = index_file.exists();

    match (cli.backfill_floor, cli.mirror_from.as_deref()) {
        (Some(floor), _) if store_exists => {
            info!(
                floor,
                "--backfill-floor ignored: store already exists, resuming with its baked-in floor",
            );
            None
        }
        (Some(floor), _) => {
            info!(floor, "anchoring backfill floor at --backfill-floor");
            Some(floor)
        }
        (None, None) => None,
        (None, Some(_)) if store_exists => {
            info!("mirror: local store already exists, resuming with its anchored floor");
            None
        }
        (None, Some(upstream)) => match fetch_upstream_min_height(http, upstream).await {
            Ok(min_h) => {
                info!(
                    min_height = min_h,
                    "mirror: anchoring backfill floor at upstream's earliest retained block",
                );
                Some(min_h)
            }
            Err(e) => {
                warn!(error = %e, "mirror: /health probe failed; falling back to forward-only from tip");
                None
            }
        },
    }
}

/// Turn an HTTP(S) base URL into the matching WebSocket URL, keeping host,
/// port and path: `https://` becomes `wss://`, `http://` becomes `ws://`.
///
/// neve serves its WebSocket on the same socket as its HTTP JSON-RPC, so
/// mirroring needs only one endpoint. Inputs that are already `ws://` /
/// `wss://` are returned unchanged.
///
/// # Errors
/// Any other scheme (matched case-sensitively) is rejected.
fn derive_ws_url(base: &str) -> Result<String> {
    const HTTP_TO_WS: [(&str, &str); 2] = [("https://", "wss://"), ("http://", "ws://")];

    for (http_scheme, ws_scheme) in HTTP_TO_WS {
        if let Some(rest) = base.strip_prefix(http_scheme) {
            return Ok(format!("{ws_scheme}{rest}"));
        }
    }
    if base.starts_with("ws://") || base.starts_with("wss://") {
        return Ok(base.to_owned());
    }
    bail!("--mirror-from must be an http(s):// (or ws(s)://) URL, got: {base}")
}

/// Probe a neve upstream's `/health` for its earliest retained block height
/// (`blocks.min_height`). Used to anchor a fresh mirror store's floor so
/// backfill reproduces the upstream's whole retained range rather than only
/// growing forward from the current tip.
async fn fetch_upstream_min_height(http: &reqwest::Client, base: &str) -> Result<u64> {
    let url = format!("{}/health", base.trim_end_matches('/'));
    let resp = http
        .get(&url)
        .send()
        .await
        .with_context(|| format!("GET {url}"))?;
    if !resp.status().is_success() {
        bail!("upstream /health returned HTTP {}", resp.status());
    }
    let v: Value = resp.json().await.context("decode /health body")?;
    v.get("blocks")
        .and_then(|b| b.get("min_height"))
        .and_then(Value::as_u64)
        .ok_or_else(|| anyhow!("/health missing blocks.min_height (is the upstream a neve?)"))
}