Skip to main content

bee_check/
lib.rs

1//! Core check engine for `bee-check`.
2//!
3//! The CLI in `main.rs` and (in future) the `bee-check-web` SPA both
4//! consume the same [`Report`] shape produced here. See `SPEC.md` for
5//! the JSON shape.
6
7use std::collections::BTreeMap;
8use std::sync::Arc;
9use std::time::{Duration, Instant};
10
11use anyhow::{Context, Result, anyhow};
12use bee::Client;
13use bee::manifest::{is_null_address, unmarshal};
14use bee::swarm::gsoc::proximity;
15use bee::swarm::{BatchId, EthAddress, Reference, Topic};
16use futures::stream::{FuturesUnordered, StreamExt};
17use reqwest::Method;
18use serde::{Deserialize, Serialize};
19use tokio::sync::Semaphore;
20use tracing::{debug, info, trace, warn};
21
/// Gateway probed when the caller does not pass `--gateway`.
///
/// `api.gateway.ethswarm.org` is the Foundation-operated forwarding
/// gateway; it answers unknown references with a proper 404, whereas
/// `gateway.ethswarm.org` currently fronts a static page that returns
/// 200 for everything.
pub const DEFAULT_GATEWAY: &str = "https://api.gateway.ethswarm.org";
28
/// Upper bound on the number of chunk addresses gathered by
/// `collect_chunk_addresses` while walking a manifest.
const MAX_CHUNKS: usize = 1_000;
/// Threshold in seconds (~24h) below which a stamp counts as "low TTL".
/// A re-seed against a stamp under this threshold may not outlive the
/// batch.
const STAMP_LOW_TTL_SECS: i64 = 24 * 60 * 60;
33
/// Output format selector for a finished check.
///
/// Comparable (`PartialEq`/`Eq`) so callers can branch with `==` as
/// well as `match`.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum OutputFormat {
    /// Plain-text output.
    Text,
    /// JSON output (the [`Report`] shape).
    Json,
}
39
40/// Outcome enum reported per vantage and aggregated for the whole check.
41#[derive(Copy, Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
42#[serde(rename_all = "snake_case")]
43pub enum Status {
44    Retrievable,
45    Unretrievable,
46    Partial,
47    Error,
48}
49
50#[derive(Debug, Serialize, Deserialize)]
51pub struct Report {
52    pub reference: String,
53    pub status: Status,
54    pub vantages: Vec<VantageResult>,
55    /// Public-gateway HEAD probes. Empty when `--no-gateway` was used
56    /// or no gateways were probed. Added in 0.3 — additive.
57    #[serde(default, skip_serializing_if = "Vec::is_empty")]
58    pub gateways: Vec<GatewayResult>,
59    /// Set when the user-provided input was a non-bare-reference (a
60    /// feed reference) that resolved to `reference`. Added in 0.3.
61    #[serde(default, skip_serializing_if = "Option::is_none")]
62    pub resolution: Option<Resolution>,
63    /// Populated only when `--per-chunk` was requested.
64    #[serde(skip_serializing_if = "Option::is_none")]
65    pub chunks: Option<Vec<ChunkProbe>>,
66    /// Roll-ups computed from `chunks`. Populated alongside `chunks`
67    /// (i.e. when `--per-chunk` was used). Added in 0.4 — additive.
68    #[serde(skip_serializing_if = "Option::is_none")]
69    pub chunk_stats: Option<ChunkStats>,
70    /// Cold-content end-to-end download probes. One per Bee URL
71    /// (probing `GET /bytes/{ref}`) and one per gateway URL
72    /// (probing `GET /bzz/{ref}/`). Populated only when `--cold` was
73    /// requested. Added in 0.5 — additive on spec_version 1.
74    #[serde(default, skip_serializing_if = "Vec::is_empty")]
75    pub cold_downloads: Vec<ColdDownloadResult>,
76    pub spec_version: u32,
77}
78
79/// Aggregate statistics over the per-chunk probe data. Per-vantage
80/// stats surface "fast on bee-A, slow on bee-B"; per-neighborhood
81/// stats surface "chunks in nb 0x1a are slow no matter which vantage".
82#[derive(Debug, Serialize, Deserialize)]
83pub struct ChunkStats {
84    pub per_vantage: BTreeMap<String, ChunkStatRow>,
85    /// Keyed by neighborhood (first 2 hex chars of the chunk address).
86    pub per_neighborhood: BTreeMap<String, ChunkStatRow>,
87}
88
89#[derive(Debug, Serialize, Deserialize, Clone)]
90pub struct ChunkStatRow {
91    pub total: usize,
92    pub found: usize,
93    pub missing: usize,
94    /// Latency percentiles in ms, computed over chunks where
95    /// `found == true`. Absent when no chunk was found.
96    #[serde(skip_serializing_if = "Option::is_none")]
97    pub elapsed_p50_ms: Option<u64>,
98    #[serde(skip_serializing_if = "Option::is_none")]
99    pub elapsed_p95_ms: Option<u64>,
100    #[serde(skip_serializing_if = "Option::is_none")]
101    pub elapsed_max_ms: Option<u64>,
102}
103
104/// HEAD-probe of a public Swarm gateway. The gateway resolves the
105/// reference end-to-end through forwarding Kademlia; a 2xx means
106/// "anyone could retrieve this through this gateway right now",
107/// independent of any specific Bee node the user controls.
108#[derive(Debug, Serialize, Deserialize)]
109pub struct GatewayResult {
110    pub url: String,
111    /// `true` for 2xx, `false` for 4xx/5xx, `None` when the call
112    /// errored at the network layer.
113    pub retrievable: Option<bool>,
114    pub elapsed_ms: u64,
115    /// HTTP status code returned (when the call completed).
116    #[serde(skip_serializing_if = "Option::is_none")]
117    pub status_code: Option<u16>,
118    #[serde(skip_serializing_if = "Option::is_none")]
119    pub error: Option<String>,
120}
121
122/// Records the input → reference resolution when the user supplied a
123/// mutable handle (currently: Swarm feeds). Added in 0.3.
124#[derive(Debug, Serialize, Deserialize)]
125#[serde(tag = "kind", rename_all = "snake_case")]
126pub enum Resolution {
127    /// Feed lookup via `GET /feeds/{owner}/{topic}` on the first
128    /// `--bee` vantage.
129    Feed {
130        owner: String,
131        topic: String,
132        /// Hex of the resolved chunk reference.
133        resolved_reference: String,
134    },
135}
136
137#[derive(Debug, Serialize, Deserialize)]
138pub struct VantageResult {
139    pub bee_url: String,
140    /// `None` means the call errored (see `error`).
141    pub retrievable: Option<bool>,
142    pub elapsed_ms: u64,
143    #[serde(skip_serializing_if = "Option::is_none")]
144    pub error: Option<String>,
145    /// Hex overlay address of the probed Bee, from `GET /addresses`. The
146    /// first 2 hex chars are the neighborhood the node sits in.
147    #[serde(skip_serializing_if = "Option::is_none")]
148    pub overlay: Option<String>,
149    /// Bee semver, from `GET /health` (Bee's `version` field).
150    #[serde(skip_serializing_if = "Option::is_none")]
151    pub bee_version: Option<String>,
152    /// Proximity order between the probe node's overlay and the target
153    /// reference. Higher = closer to where the chunk is stored. PO 0
154    /// means the probe is not in the chunk's neighborhood at all.
155    #[serde(skip_serializing_if = "Option::is_none")]
156    pub proximity_to_root: Option<u32>,
157    /// PO between this vantage's overlay and `--target-overlay`, when
158    /// that flag was supplied. Added in 0.4 — used to surface "this
159    /// is the closest vantage to your target neighborhood".
160    #[serde(skip_serializing_if = "Option::is_none")]
161    pub target_proximity: Option<u32>,
162}
163
164#[derive(Debug, Serialize, Deserialize)]
165pub struct ChunkProbe {
166    pub address: String,
167    /// First 2 hex chars — neighborhood the chunk should land in.
168    pub neighborhood: String,
169    pub per_vantage: BTreeMap<String, ChunkVantage>,
170}
171
172#[derive(Debug, Serialize, Deserialize)]
173pub struct ChunkVantage {
174    pub found: bool,
175    pub elapsed_ms: u64,
176    #[serde(skip_serializing_if = "Option::is_none")]
177    pub error: Option<String>,
178    /// Proximity between this vantage's overlay and the chunk address.
179    /// Unset when the vantage's overlay couldn't be fetched.
180    #[serde(skip_serializing_if = "Option::is_none")]
181    pub proximity: Option<u32>,
182}
183
184/// Cold end-to-end download probe. Pulls the full byte stream from
185/// `/bytes/{ref}` (for Bee vantages) or `/bzz/{ref}/` (for gateways)
186/// and times the entire transfer. Distinct from the stewardship
187/// probe, which only walks the chunk graph — cold-download exercises
188/// the HTTP body transport too. Added in 0.5.
189#[derive(Debug, Serialize, Deserialize)]
190pub struct ColdDownloadResult {
191    pub url: String,
192    /// Path probed on this URL — `/bytes/{ref}` for vantages,
193    /// `/bzz/{ref}/` for gateways.
194    pub endpoint: String,
195    /// `true` if HTTP status was 2xx and the body fully streamed.
196    /// `false` for 4xx/5xx or transport errors during body read.
197    pub success: bool,
198    /// Total bytes streamed before the response ended (or errored).
199    pub bytes_downloaded: u64,
200    pub elapsed_ms: u64,
201    #[serde(skip_serializing_if = "Option::is_none")]
202    pub status_code: Option<u16>,
203    #[serde(skip_serializing_if = "Option::is_none")]
204    pub error: Option<String>,
205}
206
/// Inputs for a re-seed operation.
///
/// NOTE(review): the code that consumes this struct is outside the
/// visible part of the file; the field descriptions below are inferred
/// from the field names and should be confirmed against that code.
#[derive(Debug, Clone)]
pub struct ReseedRequest {
    /// Reference to re-seed (presumably hex, as elsewhere in this file).
    pub reference: String,
    /// URL of the Bee node that performs the re-seed.
    pub bee_url: String,
    /// Postage batch (stamp) id to use.
    pub batch_id: String,
    /// Timeout for the operation's HTTP calls.
    pub timeout: Duration,
}
213
214/// Result of the `--reseed --stamp <id>` pre-flight check. Mirrors
215/// ipfs-check's "stale records" hint in spirit: surface upstream-data
216/// problems before doing the operation.
217#[derive(Debug, Serialize, Deserialize)]
218pub struct StampStatus {
219    pub batch_id: String,
220    pub exists: bool,
221    pub usable: bool,
222    pub batch_ttl: i64,
223    /// `usable && exists && batch_ttl >= STAMP_LOW_TTL_SECS` (or
224    /// `batch_ttl < 0`, meaning unknown/infinite).
225    pub healthy: bool,
226    #[serde(skip_serializing_if = "Vec::is_empty")]
227    pub warnings: Vec<String>,
228}
229
/// Value written to [`Report::spec_version`] by this module.
const SPEC_VERSION: u32 = 1;
231
232fn parse_reference(s: &str) -> Result<Reference> {
233    Reference::from_hex(s).map_err(|e| anyhow!("invalid reference {s}: {e}"))
234}
235
236fn make_bee(url: &str, timeout: Duration) -> Result<Client> {
237    let http = reqwest::Client::builder()
238        .timeout(timeout)
239        .build()
240        .context("building http client")?;
241    Client::with_http_client(url, http).map_err(|e| anyhow!("invalid bee url {url}: {e}"))
242}
243
244/// Probe `/stewardship/{ref}` across all vantages in parallel. Alongside
245/// the retrievability call, fetches `/addresses` (overlay) and
246/// `/health` (Bee version) so the rendered report can surface which
247/// neighborhood the probe came from. These ancillary calls are
248/// best-effort: failure leaves `overlay` / `bee_version` unset rather
249/// than failing the whole vantage.
250pub async fn check_multi_vantage(
251    reference: &str,
252    bees: &[String],
253    timeout: Duration,
254) -> Result<Report> {
255    let r = parse_reference(reference)?;
256    let root_bytes = first_32(&r);
257    info!(
258        reference = reference,
259        vantages = bees.len(),
260        timeout_secs = timeout.as_secs(),
261        "starting multi-vantage probe",
262    );
263
264    let mut futs = FuturesUnordered::new();
265    for bee_url in bees {
266        let bee_url = bee_url.clone();
267        let r = r.clone();
268        futs.push(async move {
269            debug!(bee_url = %bee_url, "probing vantage");
270            let bee = match make_bee(&bee_url, timeout) {
271                Ok(b) => b,
272                Err(e) => {
273                    return VantageResult {
274                        bee_url,
275                        retrievable: None,
276                        elapsed_ms: 0,
277                        error: Some(format!("{e:#}")),
278                        overlay: None,
279                        bee_version: None,
280                        proximity_to_root: None,
281                        target_proximity: None,
282                    };
283                }
284            };
285            let started = Instant::now();
286            let api = bee.api();
287            let debug = bee.debug();
288            let (stew_res, addr_res, health_res) = tokio::join!(
289                api.is_retrievable(&r),
290                debug.addresses(),
291                debug.health(),
292            );
293            let elapsed_ms = started.elapsed().as_millis() as u64;
294
295            let overlay = addr_res.ok().map(|a| a.overlay);
296            let bee_version = health_res.ok().map(|h| h.version);
297            let proximity_to_root = overlay
298                .as_deref()
299                .and_then(decode_overlay)
300                .map(|o| proximity(&o, &root_bytes));
301
302            match stew_res {
303                Ok(ok) => {
304                    debug!(
305                        bee_url = %bee_url,
306                        retrievable = ok,
307                        elapsed_ms,
308                        overlay = overlay.as_deref().unwrap_or("?"),
309                        "vantage done",
310                    );
311                    VantageResult {
312                        bee_url,
313                        retrievable: Some(ok),
314                        elapsed_ms,
315                        error: None,
316                        overlay,
317                        bee_version,
318                        proximity_to_root,
319                        target_proximity: None,
320                    }
321                }
322                Err(e) => {
323                    debug!(bee_url = %bee_url, error = %e, "vantage errored");
324                    VantageResult {
325                        bee_url,
326                        retrievable: None,
327                        elapsed_ms,
328                        error: Some(format!("{e}")),
329                        overlay,
330                        bee_version,
331                        proximity_to_root,
332                        target_proximity: None,
333                    }
334                }
335            }
336        });
337    }
338
339    let mut vantages = Vec::with_capacity(bees.len());
340    while let Some(v) = futs.next().await {
341        vantages.push(v);
342    }
343    vantages.sort_by(|a, b| a.bee_url.cmp(&b.bee_url));
344
345    let status = aggregate_status(&vantages, &[]);
346    Ok(Report {
347        reference: reference.to_string(),
348        status,
349        vantages,
350        gateways: Vec::new(),
351        resolution: None,
352        chunks: None,
353        chunk_stats: None,
354        cold_downloads: Vec::new(),
355        spec_version: SPEC_VERSION,
356    })
357}
358
359/// Probe public Swarm gateways via `HEAD {gateway}/bzz/{ref}` in
360/// parallel. Mirrors the BYO-Bee philosophy: gateway results
361/// complement (don't replace) the vantage probes and aggregate into
362/// the same top-level `status`.
363pub async fn check_gateways(
364    reference: &str,
365    gateway_urls: &[String],
366    timeout: Duration,
367) -> Result<Vec<GatewayResult>> {
368    // Reference is validated up front so an invalid hex doesn't waste
369    // network calls.
370    let _ = parse_reference(reference)?;
371    if !gateway_urls.is_empty() {
372        info!(count = gateway_urls.len(), "probing gateways");
373    }
374
375    let http = reqwest::Client::builder()
376        .timeout(timeout)
377        .build()
378        .context("building http client for gateway probes")?;
379
380    let mut futs = FuturesUnordered::new();
381    for base in gateway_urls {
382        let base = base.clone();
383        let reference = reference.to_string();
384        let http = http.clone();
385        futs.push(async move {
386            let url = build_gateway_url(&base, &reference);
387            let started = Instant::now();
388            let res = http.request(Method::HEAD, &url).send().await;
389            let elapsed_ms = started.elapsed().as_millis() as u64;
390            match res {
391                Ok(resp) => {
392                    let status = resp.status().as_u16();
393                    GatewayResult {
394                        url: base,
395                        retrievable: Some(resp.status().is_success()),
396                        elapsed_ms,
397                        status_code: Some(status),
398                        error: None,
399                    }
400                }
401                Err(e) => GatewayResult {
402                    url: base,
403                    retrievable: None,
404                    elapsed_ms,
405                    status_code: None,
406                    error: Some(format!("{e}")),
407                },
408            }
409        });
410    }
411    let mut out = Vec::with_capacity(gateway_urls.len());
412    while let Some(g) = futs.next().await {
413        out.push(g);
414    }
415    out.sort_by(|a, b| a.url.cmp(&b.url));
416    Ok(out)
417}
418
419/// Append gateway results to an existing report and re-aggregate the
420/// top-level status. Splits the gateway probe from the main check so
421/// the SPA can show partial progress.
422pub fn merge_gateways(mut report: Report, gateways: Vec<GatewayResult>) -> Report {
423    report.gateways = gateways;
424    report.status = aggregate_status(&report.vantages, &report.gateways);
425    report
426}
427
/// Join `base` and `reference` into `{base}/bzz/{reference}/`, first
/// dropping any trailing slashes from `base`.
fn build_gateway_url(base: &str, reference: &str) -> String {
    [base.trim_end_matches('/'), "/bzz/", reference, "/"].concat()
}
432
/// Join `base` and `reference` into `{base}/bytes/{reference}`, first
/// dropping any trailing slashes from `base`.
fn build_bytes_url(base: &str, reference: &str) -> String {
    [base.trim_end_matches('/'), "/bytes/", reference].concat()
}
437
438/// Cold end-to-end download probe. For each Bee vantage, issues
439/// `GET /bytes/{ref}` and streams the body to EOF, counting bytes
440/// and timing the full transfer. For each gateway, issues
441/// `GET /bzz/{ref}/` instead (the canonical gateway path). Records
442/// `bytes_downloaded` + `elapsed_ms` per URL. Distinct from the
443/// stewardship probe: this exercises HTTP body transport, not just
444/// chunk-graph walking.
445pub async fn cold_download_all(
446    bee_urls: &[String],
447    gateway_urls: &[String],
448    reference: &str,
449    timeout: Duration,
450) -> Result<Vec<ColdDownloadResult>> {
451    let _ = parse_reference(reference)?;
452    info!(
453        bee_count = bee_urls.len(),
454        gateway_count = gateway_urls.len(),
455        "starting cold-download probes",
456    );
457
458    let http = reqwest::Client::builder()
459        .timeout(timeout)
460        .build()
461        .context("building http client for cold-download probes")?;
462
463    // (base_url, endpoint_label, full_probe_url)
464    let mut targets: Vec<(String, String, String)> =
465        Vec::with_capacity(bee_urls.len() + gateway_urls.len());
466    for url in bee_urls {
467        targets.push((
468            url.clone(),
469            "/bytes/{ref}".to_string(),
470            build_bytes_url(url, reference),
471        ));
472    }
473    for url in gateway_urls {
474        targets.push((
475            url.clone(),
476            "/bzz/{ref}/".to_string(),
477            build_gateway_url(url, reference),
478        ));
479    }
480
481    let mut futs = FuturesUnordered::new();
482    for (base, endpoint, probe_url) in targets {
483        let http = http.clone();
484        futs.push(async move { cold_probe(&http, base, endpoint, probe_url).await });
485    }
486
487    let mut out = Vec::with_capacity(bee_urls.len() + gateway_urls.len());
488    while let Some(c) = futs.next().await {
489        out.push(c);
490    }
491    out.sort_by(|a, b| a.url.cmp(&b.url));
492    Ok(out)
493}
494
495async fn cold_probe(
496    http: &reqwest::Client,
497    base: String,
498    endpoint: String,
499    probe_url: String,
500) -> ColdDownloadResult {
501    debug!(url = %probe_url, "cold-download GET");
502    let started = Instant::now();
503    let res = http.get(&probe_url).send().await;
504    match res {
505        Ok(mut resp) => {
506            let status = resp.status();
507            let status_code = status.as_u16();
508            if !status.is_success() {
509                return ColdDownloadResult {
510                    url: base,
511                    endpoint,
512                    success: false,
513                    bytes_downloaded: 0,
514                    elapsed_ms: started.elapsed().as_millis() as u64,
515                    status_code: Some(status_code),
516                    error: Some(format!("HTTP {status_code}")),
517                };
518            }
519            let mut bytes: u64 = 0;
520            let mut err: Option<String> = None;
521            loop {
522                match resp.chunk().await {
523                    Ok(Some(chunk)) => bytes += chunk.len() as u64,
524                    Ok(None) => break,
525                    Err(e) => {
526                        err = Some(format!("stream error after {bytes} bytes: {e}"));
527                        break;
528                    }
529                }
530            }
531            ColdDownloadResult {
532                url: base,
533                endpoint,
534                success: err.is_none(),
535                bytes_downloaded: bytes,
536                elapsed_ms: started.elapsed().as_millis() as u64,
537                status_code: Some(status_code),
538                error: err,
539            }
540        }
541        Err(e) => ColdDownloadResult {
542            url: base,
543            endpoint,
544            success: false,
545            bytes_downloaded: 0,
546            elapsed_ms: started.elapsed().as_millis() as u64,
547            status_code: None,
548            error: Some(format!("{e}")),
549        },
550    }
551}
552
553/// Resolve a `feed:OWNER:TOPIC` input to its current chunk reference.
554/// `bee_url` is the first Bee vantage; the feed lookup goes through
555/// it. Returns the resolved reference and a [`Resolution`] record
556/// describing what was done.
557pub async fn resolve_feed(
558    bee_url: &str,
559    owner_hex: &str,
560    topic_hex: &str,
561    timeout: Duration,
562) -> Result<(String, Resolution)> {
563    let bee = make_bee(bee_url, timeout)?;
564    let owner = EthAddress::from_hex(owner_hex)
565        .map_err(|e| anyhow!("invalid feed owner {owner_hex}: {e}"))?;
566    let topic = Topic::from_hex(topic_hex)
567        .map_err(|e| anyhow!("invalid feed topic {topic_hex}: {e}"))?;
568    let reference = bee
569        .file()
570        .get_feed_lookup(&owner, &topic)
571        .await
572        .map_err(anyhow::Error::from)?;
573    let r_hex = reference.to_hex();
574    Ok((
575        r_hex.clone(),
576        Resolution::Feed {
577            owner: owner.to_hex(),
578            topic: topic.to_hex(),
579            resolved_reference: r_hex,
580        },
581    ))
582}
583
/// Classify a positional input.
///
/// - `feed:OWNER:TOPIC` or `feed:OWNER/TOPIC` becomes
///   [`ParsedInput::Feed`]. The split happens at the first `:` or `/`
///   after the `feed:` prefix, so everything after that separator is the
///   topic. The caller should then call [`resolve_feed`] to turn it
///   into a reference.
/// - Anything else — including a `feed:` input with no separator — is
///   returned unchanged as [`ParsedInput::Reference`].
///
/// No hex or length validation happens here; a malformed reference,
/// owner or topic is rejected later, when it is actually parsed.
pub fn parse_input(input: &str) -> ParsedInput {
    let feed_parts = input
        .strip_prefix("feed:")
        .and_then(|rest| rest.split_once([':', '/']));
    match feed_parts {
        Some((owner, topic)) => ParsedInput::Feed {
            owner: owner.to_string(),
            topic: topic.to_string(),
        },
        None => ParsedInput::Reference(input.to_string()),
    }
}

/// Output of [`parse_input`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ParsedInput {
    /// A direct Swarm reference (caller should pass it straight to
    /// [`check_multi_vantage`]).
    Reference(String),
    /// A feed reference (caller should call [`resolve_feed`] first to
    /// get the current chunk reference, then probe that).
    Feed {
        /// Owner part of the input, not validated.
        owner: String,
        /// Topic part of the input, not validated.
        topic: String,
    },
}
612
/// Strip the optional `0x` prefix and decode the first 64 hex digits
/// into 32 bytes (overlay/reference length). Anything after the first
/// 64 digits is ignored.
///
/// Returns `None` when fewer than 64 characters remain after the prefix
/// or when any of the first 64 is not an ASCII hex digit. The input is
/// handled as bytes, so non-ASCII text is rejected rather than sliced
/// in the middle of a character, and sign characters such as `+` are
/// not accepted as part of a digit pair.
fn decode_overlay(hex: &str) -> Option<[u8; 32]> {
    let digits = hex.strip_prefix("0x").unwrap_or(hex).as_bytes();
    if digits.len() < 64 {
        return None;
    }
    let mut out = [0u8; 32];
    for (byte, pair) in out.iter_mut().zip(digits.chunks_exact(2)) {
        let hi = char::from(pair[0]).to_digit(16)?;
        let lo = char::from(pair[1]).to_digit(16)?;
        // Both nibbles are < 16, so the combined value always fits a u8.
        *byte = ((hi << 4) | lo) as u8;
    }
    Some(out)
}
628
629fn first_32(r: &Reference) -> [u8; 32] {
630    let mut out = [0u8; 32];
631    out.copy_from_slice(&r.as_bytes()[..32]);
632    out
633}
634
635fn aggregate_status(vantages: &[VantageResult], gateways: &[GatewayResult]) -> Status {
636    let outcomes: Vec<Option<bool>> = vantages
637        .iter()
638        .map(|v| v.retrievable)
639        .chain(gateways.iter().map(|g| g.retrievable))
640        .collect();
641    let total = outcomes.len();
642    if total == 0 {
643        return Status::Error;
644    }
645    let retr = outcomes.iter().filter(|o| **o == Some(true)).count();
646    let unret = outcomes.iter().filter(|o| **o == Some(false)).count();
647    let err = outcomes.iter().filter(|o| o.is_none()).count();
648    if err == total {
649        Status::Error
650    } else if retr == total {
651        Status::Retrievable
652    } else if retr == 0 && unret + err == total {
653        Status::Unretrievable
654    } else {
655        Status::Partial
656    }
657}
658
659/// Walk the manifest at `report.reference`, probe each leaf chunk via
660/// `GET /chunks/{addr}` from every vantage, and attach the result to
661/// the report. When the per-vantage overlay is known (already fetched
662/// in [`check_multi_vantage`]), each [`ChunkVantage`] also carries the
663/// proximity between that vantage and the chunk.
664pub async fn drill_down(
665    mut report: Report,
666    bees: &[String],
667    timeout: Duration,
668    concurrency: usize,
669) -> Result<Report> {
670    let r = parse_reference(&report.reference)?;
671    // Use the first vantage as the source-of-truth for manifest walking.
672    let walker_bee = bees.first().context("no bee URL for drill-down")?;
673    let walker = make_bee(walker_bee, timeout)?;
674
675    let addresses = collect_chunk_addresses(&walker, &r).await?;
676    info!(chunks = addresses.len(), concurrency, "drill-down probing chunks");
677
678    // Map vantage URL → its (already-fetched) overlay bytes, for
679    // per-chunk proximity tagging without re-hitting `/addresses`.
680    let overlays: BTreeMap<String, [u8; 32]> = report
681        .vantages
682        .iter()
683        .filter_map(|v| {
684            v.overlay
685                .as_deref()
686                .and_then(decode_overlay)
687                .map(|o| (v.bee_url.clone(), o))
688        })
689        .collect();
690
691    let clients: Vec<(String, Client)> = bees
692        .iter()
693        .map(|u| make_bee(u, timeout).map(|b| (u.clone(), b)))
694        .collect::<Result<_>>()?;
695
696    let sem = Arc::new(Semaphore::new(concurrency.max(1)));
697    let mut probes = Vec::with_capacity(addresses.len());
698
699    let mut futs = FuturesUnordered::new();
700    for addr in addresses {
701        let sem = sem.clone();
702        let clients = clients.clone();
703        let overlays = overlays.clone();
704        futs.push(async move {
705            let chunk_bytes = first_32_of_ref(&addr);
706            let mut per_vantage = BTreeMap::new();
707            for (url, bee) in &clients {
708                let _permit = sem.acquire().await.expect("semaphore not closed");
709                let started = Instant::now();
710                let res = bee.file().download_chunk(&addr, None).await;
711                let elapsed_ms = started.elapsed().as_millis() as u64;
712                let prox = overlays
713                    .get(url)
714                    .map(|o| proximity(o, &chunk_bytes));
715                trace!(
716                    chunk = %addr.to_hex(),
717                    bee_url = %url,
718                    found = res.is_ok(),
719                    elapsed_ms,
720                    "chunk probe",
721                );
722                let cv = match res {
723                    Ok(_) => ChunkVantage {
724                        found: true,
725                        elapsed_ms,
726                        error: None,
727                        proximity: prox,
728                    },
729                    Err(e) => ChunkVantage {
730                        found: false,
731                        elapsed_ms,
732                        error: Some(format!("{e}")),
733                        proximity: prox,
734                    },
735                };
736                per_vantage.insert(url.clone(), cv);
737            }
738            let hex = addr.to_hex();
739            let neighborhood = hex.chars().take(2).collect::<String>();
740            ChunkProbe {
741                address: hex,
742                neighborhood,
743                per_vantage,
744            }
745        });
746    }
747
748    while let Some(p) = futs.next().await {
749        probes.push(p);
750    }
751    probes.sort_by(|a, b| a.address.cmp(&b.address));
752    report.chunk_stats = Some(compute_chunk_stats(&probes));
753    report.chunks = Some(probes);
754    Ok(report)
755}
756
757/// Roll up per-vantage + per-neighborhood timing statistics from a
758/// flat list of [`ChunkProbe`]s. Pure function; runs in milliseconds
759/// even for the 1000-chunk cap.
760pub fn compute_chunk_stats(probes: &[ChunkProbe]) -> ChunkStats {
761    // (url → Vec<elapsed_ms for found chunks>, found_count, missing_count)
762    let mut per_vantage: BTreeMap<String, (Vec<u64>, usize, usize)> = BTreeMap::new();
763    let mut per_neighborhood: BTreeMap<String, (Vec<u64>, usize, usize)> = BTreeMap::new();
764
765    for p in probes {
766        for (url, cv) in &p.per_vantage {
767            let entry = per_vantage.entry(url.clone()).or_default();
768            if cv.found {
769                entry.0.push(cv.elapsed_ms);
770                entry.1 += 1;
771            } else {
772                entry.2 += 1;
773            }
774            let n = per_neighborhood.entry(p.neighborhood.clone()).or_default();
775            if cv.found {
776                n.0.push(cv.elapsed_ms);
777                n.1 += 1;
778            } else {
779                n.2 += 1;
780            }
781        }
782    }
783
784    let to_row = |(latencies, found, missing): (Vec<u64>, usize, usize)| -> ChunkStatRow {
785        let mut sorted = latencies;
786        sorted.sort_unstable();
787        let p = |q: f64| -> Option<u64> {
788            if sorted.is_empty() {
789                None
790            } else {
791                let idx = ((sorted.len() as f64 - 1.0) * q).round() as usize;
792                Some(sorted[idx])
793            }
794        };
795        ChunkStatRow {
796            total: found + missing,
797            found,
798            missing,
799            elapsed_p50_ms: p(0.50),
800            elapsed_p95_ms: p(0.95),
801            elapsed_max_ms: sorted.last().copied(),
802        }
803    };
804
805    ChunkStats {
806        per_vantage: per_vantage.into_iter().map(|(k, v)| (k, to_row(v))).collect(),
807        per_neighborhood: per_neighborhood
808            .into_iter()
809            .map(|(k, v)| (k, to_row(v)))
810            .collect(),
811    }
812}
813
814/// Annotate each vantage with `target_proximity` (PO between its
815/// overlay and `target_overlay_hex`) and sort the vantages array by
816/// that proximity in descending order — closest first. No-op when
817/// `target_overlay_hex` doesn't decode. Added in 0.4.
818pub fn annotate_target_overlay(mut report: Report, target_overlay_hex: &str) -> Report {
819    let Some(target) = decode_overlay(target_overlay_hex) else {
820        // Silently no-op'ing here meant users could mis-type --target-overlay
821        // and never realize the flag was ignored. Warn so a `-v`-less run
822        // still surfaces the problem on stderr (warn is the default level).
823        warn!(
824            target = target_overlay_hex,
825            "--target-overlay value is not valid 64-hex overlay; flag ignored",
826        );
827        return report;
828    };
829    for v in &mut report.vantages {
830        if let Some(o) = v.overlay.as_deref().and_then(decode_overlay) {
831            v.target_proximity = Some(proximity(&o, &target));
832        }
833    }
834    report
835        .vantages
836        .sort_by(|a, b| b.target_proximity.cmp(&a.target_proximity));
837    report
838}
839
840fn first_32_of_ref(r: &Reference) -> [u8; 32] {
841    let mut out = [0u8; 32];
842    out.copy_from_slice(&r.as_bytes()[..32]);
843    out
844}
845
846/// BFS-walk the manifest starting at `root`, collecting both child
847/// manifest chunk addresses and content (`target_address`) addresses.
848/// Capped at [`MAX_CHUNKS`] to bound work for pathological cases. If
849/// `root` isn't a manifest, returns just `[root]`.
850async fn collect_chunk_addresses(bee: &Client, root: &Reference) -> Result<Vec<Reference>> {
851    let mut addresses: Vec<Reference> = vec![root.clone()];
852    let mut seen: std::collections::HashSet<String> = std::collections::HashSet::new();
853    seen.insert(root.to_hex());
854
855    let mut queue: std::collections::VecDeque<Reference> = std::collections::VecDeque::new();
856    queue.push_back(root.clone());
857
858    while let Some(addr) = queue.pop_front() {
859        if addresses.len() >= MAX_CHUNKS {
860            break;
861        }
862        let bytes = match bee.file().download_chunk(&addr, None).await {
863            Ok(b) => b,
864            // If we can't fetch a manifest node, we can't walk deeper. The
865            // outer probe loop will still record the chunk-level miss.
866            Err(_) => continue,
867        };
868        let Ok(node) = unmarshal(&bytes, addr.as_bytes()) else {
869            // Root might be raw content; deeper nodes that don't parse
870            // are leaves — both fine to skip.
871            continue;
872        };
873        // Content reference at this node — probe it but don't recurse.
874        if !is_null_address(&node.target_address) {
875            if let Ok(r) = Reference::new(&node.target_address) {
876                if seen.insert(r.to_hex()) {
877                    addresses.push(r);
878                }
879            }
880        }
881        // Child manifest chunks — probe and recurse.
882        for fork in node.forks.values() {
883            if let Some(sa) = fork.node.self_address {
884                if let Ok(r) = Reference::new(&sa) {
885                    if seen.insert(r.to_hex()) {
886                        addresses.push(r.clone());
887                        queue.push_back(r);
888                    }
889                }
890            }
891        }
892    }
893    Ok(addresses)
894}
895
896pub async fn reseed(req: ReseedRequest) -> Result<()> {
897    let bee = make_bee(&req.bee_url, req.timeout)?;
898    let r = parse_reference(&req.reference)?;
899    let batch = BatchId::from_hex(&req.batch_id)
900        .map_err(|e| anyhow!("invalid batch id {}: {e}", req.batch_id))?;
901    info!(
902        bee_url = %req.bee_url,
903        batch = %req.batch_id,
904        "re-uploading via PUT /stewardship/{{ref}}",
905    );
906    bee.api().reupload(&r, &batch).await?;
907    Ok(())
908}
909
910/// Pre-flight check before `--reseed`: look up `GET /stamps/{id}` on the
911/// target Bee and surface usable/expiry concerns. Mirrors the spirit of
912/// ipfs-check's "stale records" UX hint — flag freshness problems
913/// before doing the operation.
914pub async fn check_stamp(
915    bee_url: &str,
916    batch_id: &str,
917    timeout: Duration,
918) -> Result<StampStatus> {
919    let bee = make_bee(bee_url, timeout)?;
920    let batch = BatchId::from_hex(batch_id)
921        .map_err(|e| anyhow!("invalid batch id {batch_id}: {e}"))?;
922    let pb = bee
923        .postage()
924        .get_postage_batch(&batch)
925        .await
926        .map_err(anyhow::Error::from)?;
927
928    let mut warnings = Vec::new();
929    if !pb.exists {
930        warnings.push("batch not known to this Bee".to_string());
931    }
932    if !pb.usable {
933        warnings.push("batch not usable yet (chain may be syncing)".to_string());
934    }
935    if pb.batch_ttl >= 0 && pb.batch_ttl < STAMP_LOW_TTL_SECS {
936        warnings.push(format!(
937            "batch TTL low: ~{} (re-seed may not outlive the batch)",
938            humanize_secs(pb.batch_ttl)
939        ));
940    }
941
942    let healthy = pb.exists && pb.usable && (pb.batch_ttl < 0 || pb.batch_ttl >= STAMP_LOW_TTL_SECS);
943
944    Ok(StampStatus {
945        batch_id: batch_id.to_string(),
946        exists: pb.exists,
947        usable: pb.usable,
948        batch_ttl: pb.batch_ttl,
949        healthy,
950        warnings,
951    })
952}
953
/// Format a byte count using binary units: plain bytes below 1 KiB,
/// otherwise KiB / MiB / GiB with two decimal places.
fn human_bytes(b: u64) -> String {
    const KIB: u64 = 1 << 10;
    const MIB: u64 = 1 << 20;
    const GIB: u64 = 1 << 30;

    // `b` expressed as a fractional multiple of `unit`.
    let scaled = |unit: u64| b as f64 / unit as f64;

    if b < KIB {
        return format!("{b} B");
    }
    if b < MIB {
        return format!("{:.2} KiB", scaled(KIB));
    }
    if b < GIB {
        return format!("{:.2} MiB", scaled(MIB));
    }
    format!("{:.2} GiB", scaled(GIB))
}
968
/// Render a duration in seconds using its largest whole unit (days,
/// hours, minutes, or seconds), rounding down. Negative input yields
/// `"unknown"`.
fn humanize_secs(s: i64) -> String {
    match s {
        n if n < 0 => "unknown".to_string(),
        0..=59 => format!("{}s", s),
        60..=3_599 => format!("{} min", s / 60),
        3_600..=86_399 => format!("{} hour(s)", s / 3_600),
        _ => format!("{} day(s)", s / 86_400),
    }
}
984
985pub fn render_report(report: &Report, fmt: OutputFormat) -> String {
986    match fmt {
987        OutputFormat::Json => {
988            serde_json::to_string_pretty(report).expect("report serialization") + "\n"
989        }
990        OutputFormat::Text => render_text(report),
991    }
992}
993
994fn render_text(r: &Report) -> String {
995    use std::fmt::Write;
996    let mut out = String::new();
997    let _ = writeln!(out, "ref     {}", r.reference);
998    let _ = writeln!(out, "status  {:?}", r.status);
999    let _ = writeln!(out);
1000    let _ = writeln!(out, "vantages:");
1001    let url_w = r.vantages.iter().map(|v| v.bee_url.len()).max().unwrap_or(20);
1002    for v in &r.vantages {
1003        let state = match (v.retrievable, &v.error) {
1004            (Some(true), _) => "retrievable",
1005            (Some(false), _) => "unretrievable",
1006            (None, Some(_)) => "error",
1007            (None, None) => "unknown",
1008        };
1009        let meta = vantage_meta(v);
1010        let _ = writeln!(
1011            out,
1012            "  {:<url_w$}  {:<14} {:>6} ms{}{}",
1013            v.bee_url,
1014            state,
1015            v.elapsed_ms,
1016            if meta.is_empty() { String::new() } else { format!("  {meta}") },
1017            v.error
1018                .as_deref()
1019                .map(|e| format!("  ({e})"))
1020                .unwrap_or_default(),
1021            url_w = url_w
1022        );
1023    }
1024    if !r.gateways.is_empty() {
1025        let _ = writeln!(out);
1026        let _ = writeln!(out, "gateways:");
1027        let url_w = r.gateways.iter().map(|g| g.url.len()).max().unwrap_or(20);
1028        for g in &r.gateways {
1029            let state = match (g.retrievable, &g.error) {
1030                (Some(true), _) => "retrievable",
1031                (Some(false), _) => "unretrievable",
1032                (None, _) => "error",
1033            };
1034            let code = g
1035                .status_code
1036                .map(|c| format!("  HTTP {c}"))
1037                .unwrap_or_default();
1038            let _ = writeln!(
1039                out,
1040                "  {:<url_w$}  {:<14} {:>6} ms{}{}",
1041                g.url,
1042                state,
1043                g.elapsed_ms,
1044                code,
1045                g.error
1046                    .as_deref()
1047                    .map(|e| format!("  ({e})"))
1048                    .unwrap_or_default(),
1049                url_w = url_w
1050            );
1051        }
1052    }
1053    if !r.cold_downloads.is_empty() {
1054        let _ = writeln!(out);
1055        let _ = writeln!(out, "cold downloads:");
1056        let url_w = r.cold_downloads.iter().map(|c| c.url.len()).max().unwrap_or(20);
1057        for c in &r.cold_downloads {
1058            let state = if c.success { "ok" } else { "fail" };
1059            let bytes = human_bytes(c.bytes_downloaded);
1060            let code = c
1061                .status_code
1062                .map(|s| format!("  HTTP {s}"))
1063                .unwrap_or_default();
1064            let _ = writeln!(
1065                out,
1066                "  {:<url_w$}  {:<4} {:>6} ms  {:>9}{}{}",
1067                c.url,
1068                state,
1069                c.elapsed_ms,
1070                bytes,
1071                code,
1072                c.error
1073                    .as_deref()
1074                    .map(|e| format!("  ({e})"))
1075                    .unwrap_or_default(),
1076                url_w = url_w
1077            );
1078        }
1079    }
1080    if let Some(res) = &r.resolution {
1081        let _ = writeln!(out);
1082        match res {
1083            Resolution::Feed { owner, topic, resolved_reference } => {
1084                let _ = writeln!(
1085                    out,
1086                    "resolved feed owner={owner} topic={topic} -> {resolved_reference}",
1087                );
1088            }
1089        }
1090    }
1091    if let Some(stats) = &r.chunk_stats {
1092        let _ = writeln!(out);
1093        let _ = writeln!(out, "chunk stats per vantage:");
1094        let url_w = stats
1095            .per_vantage
1096            .keys()
1097            .map(|k| k.len())
1098            .max()
1099            .unwrap_or(20);
1100        for (url, row) in &stats.per_vantage {
1101            let _ = writeln!(
1102                out,
1103                "  {:<url_w$}  found {:>3}/{:<3}  p50 {:>5} ms · p95 {:>5} ms · max {:>5} ms",
1104                url,
1105                row.found,
1106                row.total,
1107                fmt_ms(row.elapsed_p50_ms),
1108                fmt_ms(row.elapsed_p95_ms),
1109                fmt_ms(row.elapsed_max_ms),
1110                url_w = url_w
1111            );
1112        }
1113        if !stats.per_neighborhood.is_empty() {
1114            let _ = writeln!(out);
1115            let _ = writeln!(out, "chunk stats per neighborhood:");
1116            let mut rows: Vec<(&String, &ChunkStatRow)> =
1117                stats.per_neighborhood.iter().collect();
1118            rows.sort_by(|a, b| {
1119                b.1.elapsed_p95_ms
1120                    .unwrap_or(0)
1121                    .cmp(&a.1.elapsed_p95_ms.unwrap_or(0))
1122            });
1123            for (nb, row) in rows.iter().take(10) {
1124                let _ = writeln!(
1125                    out,
1126                    "  nb {}  found {:>3}/{:<3}  p50 {:>5} ms · p95 {:>5} ms",
1127                    nb,
1128                    row.found,
1129                    row.total,
1130                    fmt_ms(row.elapsed_p50_ms),
1131                    fmt_ms(row.elapsed_p95_ms),
1132                );
1133            }
1134            if rows.len() > 10 {
1135                let _ = writeln!(out, "  ... {} more neighborhoods", rows.len() - 10);
1136            }
1137        }
1138    }
1139    if let Some(chunks) = &r.chunks {
1140        let _ = writeln!(out);
1141        let _ = writeln!(out, "chunks: {} probed", chunks.len());
1142        let mut missing = 0usize;
1143        for c in chunks {
1144            let missing_in: Vec<String> = c
1145                .per_vantage
1146                .iter()
1147                .filter(|(_, cv)| !cv.found)
1148                .map(|(u, cv)| match cv.proximity {
1149                    Some(p) => format!("{u} (PO {p})"),
1150                    None => u.clone(),
1151                })
1152                .collect();
1153            if !missing_in.is_empty() {
1154                missing += 1;
1155                let _ = writeln!(
1156                    out,
1157                    "  [{}] {}  missing on: {}",
1158                    c.neighborhood,
1159                    short(&c.address),
1160                    missing_in.join(", ")
1161                );
1162            }
1163        }
1164        if missing == 0 {
1165            let _ = writeln!(out, "  all chunks present on all vantages");
1166        } else {
1167            let _ = writeln!(out, "  {missing} chunk(s) missing on at least one vantage");
1168        }
1169    }
1170    out
1171}
1172
1173/// Single-line metadata trailer for a vantage: overlay neighborhood,
1174/// proximity to root, target-proximity (when `--target-overlay` was
1175/// set), Bee version. Compactly formatted; pieces that weren't
1176/// fetched are silently dropped.
1177fn vantage_meta(v: &VantageResult) -> String {
1178    let mut parts: Vec<String> = Vec::new();
1179    if let Some(o) = &v.overlay {
1180        let neigh = o.chars().take(2).collect::<String>();
1181        let short_overlay = short_overlay(o);
1182        parts.push(format!("overlay {short_overlay} (nb {neigh})"));
1183    }
1184    if let Some(p) = v.proximity_to_root {
1185        parts.push(format!("PO {p}"));
1186    }
1187    if let Some(p) = v.target_proximity {
1188        parts.push(format!("tgtPO {p}"));
1189    }
1190    if let Some(ver) = &v.bee_version {
1191        parts.push(format!("v{ver}"));
1192    }
1193    if parts.is_empty() {
1194        String::new()
1195    } else {
1196        format!("· {}", parts.join(" · "))
1197    }
1198}
1199
/// Format an optional millisecond value for table output: the bare
/// number when present, an em dash placeholder when absent.
fn fmt_ms(v: Option<u64>) -> String {
    v.map_or_else(|| "—".to_string(), |ms| ms.to_string())
}
1206
/// Abbreviate a hex identifier for display: drop an optional `0x`
/// prefix and, when more than 12 bytes remain, keep the first 6 and
/// last 4 joined by `…`.
///
/// The cuts are byte offsets. If either one would fall inside a
/// multi-byte character (input that isn't plain ASCII hex), the
/// prefix-stripped string is returned unshortened instead of panicking.
fn short_overlay(hex: &str) -> String {
    let s = hex.strip_prefix("0x").unwrap_or(hex);
    if s.len() > 12 {
        // `str::get` yields `None` for a range that isn't on char
        // boundaries, where direct slicing would panic.
        if let (Some(head), Some(tail)) = (s.get(..6), s.get(s.len() - 4..)) {
            return format!("{head}…{tail}");
        }
    }
    s.to_string()
}
1215
1216/// Human-readable summary of a stamp pre-flight check, suitable for
1217/// stderr before a `--reseed` operation.
1218pub fn render_stamp_status(s: &StampStatus) -> String {
1219    use std::fmt::Write;
1220    let mut out = String::new();
1221    let ttl = if s.batch_ttl < 0 {
1222        "unknown".to_string()
1223    } else {
1224        humanize_secs(s.batch_ttl)
1225    };
1226    let header = if s.healthy { "stamp OK" } else { "stamp warning" };
1227    let _ = writeln!(
1228        out,
1229        "{header}: batch {} · usable={} · ttl={}",
1230        short_overlay(&s.batch_id),
1231        s.usable,
1232        ttl,
1233    );
1234    for w in &s.warnings {
1235        let _ = writeln!(out, "  · {w}");
1236    }
1237    out
1238}
1239
/// Abbreviate a long hex string for display: when it is longer than
/// 16 bytes, keep the first 8 and last 4 joined by `…`; otherwise
/// return it unchanged.
///
/// The cuts are byte offsets. If either one would fall inside a
/// multi-byte character (input that isn't plain ASCII hex), the
/// string is returned unshortened instead of panicking.
fn short(hex: &str) -> String {
    if hex.len() > 16 {
        // `str::get` yields `None` for a range that isn't on char
        // boundaries, where direct slicing would panic.
        if let (Some(head), Some(tail)) = (hex.get(..8), hex.get(hex.len() - 4..)) {
            return format!("{head}…{tail}");
        }
    }
    hex.to_string()
}