Skip to main content

bee_check/
lib.rs

1//! Core check engine for `bee-check`.
2//!
3//! The CLI in `main.rs` and (in future) the `bee-check-web` SPA both
4//! consume the same [`Report`] shape produced here. See `SPEC.md` for
5//! the JSON shape.
6
7use std::collections::BTreeMap;
8use std::sync::Arc;
9use std::time::{Duration, Instant};
10
11use anyhow::{Context, Result, anyhow};
12use bee::Client;
13use bee::manifest::{is_null_address, unmarshal};
14use bee::swarm::gsoc::proximity;
15use bee::swarm::{BatchId, EthAddress, Reference, Topic};
16use futures::stream::{FuturesUnordered, StreamExt};
17use reqwest::Method;
18use serde::{Deserialize, Serialize};
19use tokio::sync::Semaphore;
20
/// Default public gateway used when no `--gateway` flag is supplied.
/// `api.gateway.ethswarm.org` is the Foundation-operated forwarding
/// gateway that returns proper 404s for unknown references (unlike
/// `gateway.ethswarm.org`, which currently fronts a static page that
/// 200s everything).
pub const DEFAULT_GATEWAY: &str = "https://api.gateway.ethswarm.org";

/// Cap on the number of chunk addresses gathered by the manifest walk
/// in `collect_chunk_addresses`, bounding the per-chunk drill-down.
const MAX_CHUNKS: usize = 1000;
/// Stamp is "low TTL" below this many seconds (~24h). A re-seed
/// against a stamp below this threshold may not outlive the batch.
const STAMP_LOW_TTL_SECS: i64 = 86_400;
32
/// Selects how a finished report is rendered.
///
/// The rendering code itself is not part of this section of the file;
/// presumably the CLI picks a variant from a flag — confirm against
/// `main.rs`. `PartialEq`/`Eq` are derived so callers can compare
/// formats directly instead of pattern-matching.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum OutputFormat {
    /// Human-oriented text output.
    Text,
    /// Machine-readable JSON output.
    Json,
}
38
/// Outcome enum reported per vantage and aggregated for the whole check.
///
/// Serialized in `snake_case` (e.g. `"retrievable"`). The variant
/// descriptions below follow the rules in `aggregate_status`.
#[derive(Copy, Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum Status {
    /// Every probe (vantage or gateway) reported the content retrievable.
    Retrievable,
    /// No probe reported the content retrievable, and at least one
    /// probe completed without error.
    Unretrievable,
    /// At least one probe reported retrievable, but not all of them did.
    Partial,
    /// Every probe errored, or there were no probes at all.
    Error,
}
48
/// Top-level result of a check. Created by `check_multi_vantage` and
/// then optionally enriched by `merge_gateways`, `drill_down` and
/// `annotate_target_overlay`.
#[derive(Debug, Serialize, Deserialize)]
pub struct Report {
    /// The reference string that was probed, as passed to
    /// `check_multi_vantage`.
    pub reference: String,
    /// Aggregate over all vantage and gateway outcomes.
    pub status: Status,
    /// One entry per probed Bee URL.
    pub vantages: Vec<VantageResult>,
    /// Public-gateway HEAD probes. Empty when `--no-gateway` was used
    /// or no gateways were probed. Added in 0.3 — additive.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub gateways: Vec<GatewayResult>,
    /// Set when the user-provided input was a non-bare-reference (a
    /// feed reference) that resolved to `reference`. Added in 0.3.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub resolution: Option<Resolution>,
    /// Populated only when `--per-chunk` was requested.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub chunks: Option<Vec<ChunkProbe>>,
    /// Roll-ups computed from `chunks`. Populated alongside `chunks`
    /// (i.e. when `--per-chunk` was used). Added in 0.4 — additive.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub chunk_stats: Option<ChunkStats>,
    /// Cold-content end-to-end download probes. One per Bee URL
    /// (probing `GET /bytes/{ref}`) and one per gateway URL
    /// (probing `GET /bzz/{ref}/`). Populated only when `--cold` was
    /// requested. Added in 0.5 — additive on spec_version 1.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub cold_downloads: Vec<ColdDownloadResult>,
    /// Version of the JSON report shape; set from `SPEC_VERSION`.
    pub spec_version: u32,
}
77
/// Aggregate statistics over the per-chunk probe data. Per-vantage
/// stats surface "fast on bee-A, slow on bee-B"; per-neighborhood
/// stats surface "chunks in nb 0x1a are slow no matter which vantage".
#[derive(Debug, Serialize, Deserialize)]
pub struct ChunkStats {
    /// Keyed by vantage URL (the same keys as `ChunkProbe::per_vantage`).
    pub per_vantage: BTreeMap<String, ChunkStatRow>,
    /// Keyed by neighborhood (first 2 hex chars of the chunk address).
    pub per_neighborhood: BTreeMap<String, ChunkStatRow>,
}
87
/// One roll-up row of `ChunkStats`: probe counts plus latency
/// percentiles for a single vantage or a single neighborhood.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ChunkStatRow {
    /// `found + missing`.
    pub total: usize,
    /// Number of chunk probes that found the chunk.
    pub found: usize,
    /// Number of chunk probes that did not find the chunk.
    pub missing: usize,
    /// Latency percentiles in ms, computed over chunks where
    /// `found == true`. Absent when no chunk was found.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub elapsed_p50_ms: Option<u64>,
    /// 95th-percentile latency in ms; same population as `elapsed_p50_ms`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub elapsed_p95_ms: Option<u64>,
    /// Largest latency in ms among found chunks.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub elapsed_max_ms: Option<u64>,
}
102
/// HEAD-probe of a public Swarm gateway. The gateway resolves the
/// reference end-to-end through forwarding Kademlia; a 2xx means
/// "anyone could retrieve this through this gateway right now",
/// independent of any specific Bee node the user controls.
#[derive(Debug, Serialize, Deserialize)]
pub struct GatewayResult {
    /// Gateway base URL as supplied by the caller (not the full
    /// `/bzz/...` probe URL).
    pub url: String,
    /// `true` for 2xx, `false` for 4xx/5xx, `None` when the call
    /// errored at the network layer.
    pub retrievable: Option<bool>,
    /// Wall-clock milliseconds from sending the request until it
    /// completed or failed.
    pub elapsed_ms: u64,
    /// HTTP status code returned (when the call completed).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub status_code: Option<u16>,
    /// Display text of the transport error, when the call did not
    /// complete.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<String>,
}
120
/// Records the input → reference resolution when the user supplied a
/// mutable handle (currently: Swarm feeds). Added in 0.3.
///
/// Serialized as an internally tagged object: the variant name goes
/// into a `kind` field in `snake_case` (e.g. `"kind": "feed"`).
#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum Resolution {
    /// Feed lookup via `GET /feeds/{owner}/{topic}` on the first
    /// `--bee` vantage.
    Feed {
        /// Hex of the feed owner's address, re-encoded after parsing.
        owner: String,
        /// Hex of the feed topic, re-encoded after parsing.
        topic: String,
        /// Hex of the resolved chunk reference.
        resolved_reference: String,
    },
}
135
/// Result of probing one Bee node ("vantage") for the reference.
#[derive(Debug, Serialize, Deserialize)]
pub struct VantageResult {
    /// Bee API base URL as supplied by the caller.
    pub bee_url: String,
    /// `None` means the call errored (see `error`).
    pub retrievable: Option<bool>,
    /// Wall-clock milliseconds spent probing this vantage; `0` when
    /// the client could not even be constructed.
    pub elapsed_ms: u64,
    /// Display text of the failure, when `retrievable` is `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<String>,
    /// Hex overlay address of the probed Bee, from `GET /addresses`. The
    /// first 2 hex chars are the neighborhood the node sits in.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub overlay: Option<String>,
    /// Bee semver, from `GET /health` (Bee's `version` field).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub bee_version: Option<String>,
    /// Proximity order between the probe node's overlay and the target
    /// reference. Higher = closer to where the chunk is stored. PO 0
    /// means the probe is not in the chunk's neighborhood at all.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub proximity_to_root: Option<u32>,
    /// PO between this vantage's overlay and `--target-overlay`, when
    /// that flag was supplied. Added in 0.4 — used to surface "this
    /// is the closest vantage to your target neighborhood".
    #[serde(skip_serializing_if = "Option::is_none")]
    pub target_proximity: Option<u32>,
}
162
/// Per-chunk drill-down record: one chunk address and the outcome of
/// fetching it from every vantage.
#[derive(Debug, Serialize, Deserialize)]
pub struct ChunkProbe {
    /// Hex of the chunk address.
    pub address: String,
    /// First 2 hex chars — neighborhood the chunk should land in.
    pub neighborhood: String,
    /// Keyed by vantage URL.
    pub per_vantage: BTreeMap<String, ChunkVantage>,
}
170
/// Outcome of fetching a single chunk from a single vantage.
#[derive(Debug, Serialize, Deserialize)]
pub struct ChunkVantage {
    /// `true` when the chunk download succeeded, `false` when it
    /// returned an error (see `error`).
    pub found: bool,
    /// Wall-clock milliseconds the chunk download took.
    pub elapsed_ms: u64,
    /// Display text of the download error, when `found` is `false`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<String>,
    /// Proximity between this vantage's overlay and the chunk address.
    /// Unset when the vantage's overlay couldn't be fetched.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub proximity: Option<u32>,
}
182
/// Cold end-to-end download probe. Pulls the full byte stream from
/// `/bytes/{ref}` (for Bee vantages) or `/bzz/{ref}/` (for gateways)
/// and times the entire transfer. Distinct from the stewardship
/// probe, which only walks the chunk graph — cold-download exercises
/// the HTTP body transport too. Added in 0.5.
#[derive(Debug, Serialize, Deserialize)]
pub struct ColdDownloadResult {
    /// Base URL of the Bee node or gateway as supplied by the caller
    /// (not the full probe URL).
    pub url: String,
    /// Path probed on this URL — `/bytes/{ref}` for vantages,
    /// `/bzz/{ref}/` for gateways.
    pub endpoint: String,
    /// `true` if HTTP status was 2xx and the body fully streamed.
    /// `false` for 4xx/5xx or transport errors during body read.
    pub success: bool,
    /// Total bytes streamed before the response ended (or errored).
    pub bytes_downloaded: u64,
    /// Wall-clock milliseconds from sending the request until the
    /// probe finished, including the body transfer.
    pub elapsed_ms: u64,
    /// HTTP status code, when a response was received.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub status_code: Option<u16>,
    /// Failure description: `HTTP <code>` for non-2xx responses,
    /// otherwise the transport/stream error text.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<String>,
}
205
/// Parameters for [`reseed`]: which reference to re-upload, through
/// which Bee node, and with which postage batch.
///
/// Derives `Debug` and `Clone` so a request can be logged and reused
/// (e.g. for a retry) without being rebuilt field by field.
#[derive(Debug, Clone)]
pub struct ReseedRequest {
    /// Hex Swarm reference to re-upload.
    pub reference: String,
    /// Base URL of the Bee node that performs the re-upload.
    pub bee_url: String,
    /// Hex id of the postage batch used to stamp the re-upload.
    pub batch_id: String,
    /// Timeout applied to the HTTP client used for the call.
    pub timeout: Duration,
}
212
213/// Result of the `--reseed --stamp <id>` pre-flight check. Mirrors
214/// ipfs-check's "stale records" hint in spirit: surface upstream-data
215/// problems before doing the operation.
216#[derive(Debug, Serialize, Deserialize)]
217pub struct StampStatus {
218    pub batch_id: String,
219    pub exists: bool,
220    pub usable: bool,
221    pub batch_ttl: i64,
222    /// `usable && exists && batch_ttl >= STAMP_LOW_TTL_SECS` (or
223    /// `batch_ttl < 0`, meaning unknown/infinite).
224    pub healthy: bool,
225    #[serde(skip_serializing_if = "Vec::is_empty")]
226    pub warnings: Vec<String>,
227}
228
/// Value written to `Report::spec_version`; identifies the version of
/// the JSON report shape.
const SPEC_VERSION: u32 = 1;
230
231fn parse_reference(s: &str) -> Result<Reference> {
232    Reference::from_hex(s).map_err(|e| anyhow!("invalid reference {s}: {e}"))
233}
234
235fn make_bee(url: &str, timeout: Duration) -> Result<Client> {
236    let http = reqwest::Client::builder()
237        .timeout(timeout)
238        .build()
239        .context("building http client")?;
240    Client::with_http_client(url, http).map_err(|e| anyhow!("invalid bee url {url}: {e}"))
241}
242
243/// Probe `/stewardship/{ref}` across all vantages in parallel. Alongside
244/// the retrievability call, fetches `/addresses` (overlay) and
245/// `/health` (Bee version) so the rendered report can surface which
246/// neighborhood the probe came from. These ancillary calls are
247/// best-effort: failure leaves `overlay` / `bee_version` unset rather
248/// than failing the whole vantage.
249pub async fn check_multi_vantage(
250    reference: &str,
251    bees: &[String],
252    timeout: Duration,
253) -> Result<Report> {
254    let r = parse_reference(reference)?;
255    let root_bytes = first_32(&r);
256
257    let mut futs = FuturesUnordered::new();
258    for bee_url in bees {
259        let bee_url = bee_url.clone();
260        let r = r.clone();
261        futs.push(async move {
262            let bee = match make_bee(&bee_url, timeout) {
263                Ok(b) => b,
264                Err(e) => {
265                    return VantageResult {
266                        bee_url,
267                        retrievable: None,
268                        elapsed_ms: 0,
269                        error: Some(format!("{e:#}")),
270                        overlay: None,
271                        bee_version: None,
272                        proximity_to_root: None,
273                        target_proximity: None,
274                    };
275                }
276            };
277            let started = Instant::now();
278            let api = bee.api();
279            let debug = bee.debug();
280            let (stew_res, addr_res, health_res) = tokio::join!(
281                api.is_retrievable(&r),
282                debug.addresses(),
283                debug.health(),
284            );
285            let elapsed_ms = started.elapsed().as_millis() as u64;
286
287            let overlay = addr_res.ok().map(|a| a.overlay);
288            let bee_version = health_res.ok().map(|h| h.version);
289            let proximity_to_root = overlay
290                .as_deref()
291                .and_then(decode_overlay)
292                .map(|o| proximity(&o, &root_bytes));
293
294            match stew_res {
295                Ok(ok) => VantageResult {
296                    bee_url,
297                    retrievable: Some(ok),
298                    elapsed_ms,
299                    error: None,
300                    overlay,
301                    bee_version,
302                    proximity_to_root,
303                    target_proximity: None,
304                },
305                Err(e) => VantageResult {
306                    bee_url,
307                    retrievable: None,
308                    elapsed_ms,
309                    error: Some(format!("{e}")),
310                    overlay,
311                    bee_version,
312                    proximity_to_root,
313                    target_proximity: None,
314                },
315            }
316        });
317    }
318
319    let mut vantages = Vec::with_capacity(bees.len());
320    while let Some(v) = futs.next().await {
321        vantages.push(v);
322    }
323    vantages.sort_by(|a, b| a.bee_url.cmp(&b.bee_url));
324
325    let status = aggregate_status(&vantages, &[]);
326    Ok(Report {
327        reference: reference.to_string(),
328        status,
329        vantages,
330        gateways: Vec::new(),
331        resolution: None,
332        chunks: None,
333        chunk_stats: None,
334        cold_downloads: Vec::new(),
335        spec_version: SPEC_VERSION,
336    })
337}
338
339/// Probe public Swarm gateways via `HEAD {gateway}/bzz/{ref}` in
340/// parallel. Mirrors the BYO-Bee philosophy: gateway results
341/// complement (don't replace) the vantage probes and aggregate into
342/// the same top-level `status`.
343pub async fn check_gateways(
344    reference: &str,
345    gateway_urls: &[String],
346    timeout: Duration,
347) -> Result<Vec<GatewayResult>> {
348    // Reference is validated up front so an invalid hex doesn't waste
349    // network calls.
350    let _ = parse_reference(reference)?;
351
352    let http = reqwest::Client::builder()
353        .timeout(timeout)
354        .build()
355        .context("building http client for gateway probes")?;
356
357    let mut futs = FuturesUnordered::new();
358    for base in gateway_urls {
359        let base = base.clone();
360        let reference = reference.to_string();
361        let http = http.clone();
362        futs.push(async move {
363            let url = build_gateway_url(&base, &reference);
364            let started = Instant::now();
365            let res = http.request(Method::HEAD, &url).send().await;
366            let elapsed_ms = started.elapsed().as_millis() as u64;
367            match res {
368                Ok(resp) => {
369                    let status = resp.status().as_u16();
370                    GatewayResult {
371                        url: base,
372                        retrievable: Some(resp.status().is_success()),
373                        elapsed_ms,
374                        status_code: Some(status),
375                        error: None,
376                    }
377                }
378                Err(e) => GatewayResult {
379                    url: base,
380                    retrievable: None,
381                    elapsed_ms,
382                    status_code: None,
383                    error: Some(format!("{e}")),
384                },
385            }
386        });
387    }
388    let mut out = Vec::with_capacity(gateway_urls.len());
389    while let Some(g) = futs.next().await {
390        out.push(g);
391    }
392    out.sort_by(|a, b| a.url.cmp(&b.url));
393    Ok(out)
394}
395
396/// Append gateway results to an existing report and re-aggregate the
397/// top-level status. Splits the gateway probe from the main check so
398/// the SPA can show partial progress.
399pub fn merge_gateways(mut report: Report, gateways: Vec<GatewayResult>) -> Report {
400    report.gateways = gateways;
401    report.status = aggregate_status(&report.vantages, &report.gateways);
402    report
403}
404
/// Build the gateway probe URL `{base}/bzz/{reference}/`, dropping any
/// trailing slashes from `base` first so the path never contains `//`
/// right after the host part.
fn build_gateway_url(base: &str, reference: &str) -> String {
    let root = base.trim_end_matches('/');
    let mut url = String::with_capacity(root.len() + reference.len() + "/bzz//".len());
    url.push_str(root);
    url.push_str("/bzz/");
    url.push_str(reference);
    url.push('/');
    url
}
409
/// Build the Bee probe URL `{base}/bytes/{reference}`, dropping any
/// trailing slashes from `base` first.
fn build_bytes_url(base: &str, reference: &str) -> String {
    let root = base.trim_end_matches('/');
    [root, "/bytes/", reference].concat()
}
414
415/// Cold end-to-end download probe. For each Bee vantage, issues
416/// `GET /bytes/{ref}` and streams the body to EOF, counting bytes
417/// and timing the full transfer. For each gateway, issues
418/// `GET /bzz/{ref}/` instead (the canonical gateway path). Records
419/// `bytes_downloaded` + `elapsed_ms` per URL. Distinct from the
420/// stewardship probe: this exercises HTTP body transport, not just
421/// chunk-graph walking.
422pub async fn cold_download_all(
423    bee_urls: &[String],
424    gateway_urls: &[String],
425    reference: &str,
426    timeout: Duration,
427) -> Result<Vec<ColdDownloadResult>> {
428    let _ = parse_reference(reference)?;
429
430    let http = reqwest::Client::builder()
431        .timeout(timeout)
432        .build()
433        .context("building http client for cold-download probes")?;
434
435    // (base_url, endpoint_label, full_probe_url)
436    let mut targets: Vec<(String, String, String)> =
437        Vec::with_capacity(bee_urls.len() + gateway_urls.len());
438    for url in bee_urls {
439        targets.push((
440            url.clone(),
441            "/bytes/{ref}".to_string(),
442            build_bytes_url(url, reference),
443        ));
444    }
445    for url in gateway_urls {
446        targets.push((
447            url.clone(),
448            "/bzz/{ref}/".to_string(),
449            build_gateway_url(url, reference),
450        ));
451    }
452
453    let mut futs = FuturesUnordered::new();
454    for (base, endpoint, probe_url) in targets {
455        let http = http.clone();
456        futs.push(async move { cold_probe(&http, base, endpoint, probe_url).await });
457    }
458
459    let mut out = Vec::with_capacity(bee_urls.len() + gateway_urls.len());
460    while let Some(c) = futs.next().await {
461        out.push(c);
462    }
463    out.sort_by(|a, b| a.url.cmp(&b.url));
464    Ok(out)
465}
466
467async fn cold_probe(
468    http: &reqwest::Client,
469    base: String,
470    endpoint: String,
471    probe_url: String,
472) -> ColdDownloadResult {
473    let started = Instant::now();
474    let res = http.get(&probe_url).send().await;
475    match res {
476        Ok(mut resp) => {
477            let status = resp.status();
478            let status_code = status.as_u16();
479            if !status.is_success() {
480                return ColdDownloadResult {
481                    url: base,
482                    endpoint,
483                    success: false,
484                    bytes_downloaded: 0,
485                    elapsed_ms: started.elapsed().as_millis() as u64,
486                    status_code: Some(status_code),
487                    error: Some(format!("HTTP {status_code}")),
488                };
489            }
490            let mut bytes: u64 = 0;
491            let mut err: Option<String> = None;
492            loop {
493                match resp.chunk().await {
494                    Ok(Some(chunk)) => bytes += chunk.len() as u64,
495                    Ok(None) => break,
496                    Err(e) => {
497                        err = Some(format!("stream error after {bytes} bytes: {e}"));
498                        break;
499                    }
500                }
501            }
502            ColdDownloadResult {
503                url: base,
504                endpoint,
505                success: err.is_none(),
506                bytes_downloaded: bytes,
507                elapsed_ms: started.elapsed().as_millis() as u64,
508                status_code: Some(status_code),
509                error: err,
510            }
511        }
512        Err(e) => ColdDownloadResult {
513            url: base,
514            endpoint,
515            success: false,
516            bytes_downloaded: 0,
517            elapsed_ms: started.elapsed().as_millis() as u64,
518            status_code: None,
519            error: Some(format!("{e}")),
520        },
521    }
522}
523
524/// Resolve a `feed:OWNER:TOPIC` input to its current chunk reference.
525/// `bee_url` is the first Bee vantage; the feed lookup goes through
526/// it. Returns the resolved reference and a [`Resolution`] record
527/// describing what was done.
528pub async fn resolve_feed(
529    bee_url: &str,
530    owner_hex: &str,
531    topic_hex: &str,
532    timeout: Duration,
533) -> Result<(String, Resolution)> {
534    let bee = make_bee(bee_url, timeout)?;
535    let owner = EthAddress::from_hex(owner_hex)
536        .map_err(|e| anyhow!("invalid feed owner {owner_hex}: {e}"))?;
537    let topic = Topic::from_hex(topic_hex)
538        .map_err(|e| anyhow!("invalid feed topic {topic_hex}: {e}"))?;
539    let reference = bee
540        .file()
541        .get_feed_lookup(&owner, &topic)
542        .await
543        .map_err(anyhow::Error::from)?;
544    let r_hex = reference.to_hex();
545    Ok((
546        r_hex.clone(),
547        Resolution::Feed {
548            owner: owner.to_hex(),
549            topic: topic.to_hex(),
550            resolved_reference: r_hex,
551        },
552    ))
553}
554
555/// Parse a positional input. Accepts either:
556/// - A 64- or 128-hex Swarm reference, returned as-is with `None`
557///   resolution metadata.
558/// - `feed:OWNER:TOPIC` (40-hex owner, 64-hex topic) — caller should
559///   then call [`resolve_feed`] to turn it into a reference.
560pub fn parse_input(input: &str) -> ParsedInput {
561    if let Some(rest) = input.strip_prefix("feed:") {
562        // Accept `feed:owner:topic` and `feed:owner/topic`.
563        let parts: Vec<&str> = rest.splitn(2, [':', '/']).collect();
564        if parts.len() == 2 {
565            return ParsedInput::Feed {
566                owner: parts[0].to_string(),
567                topic: parts[1].to_string(),
568            };
569        }
570    }
571    ParsedInput::Reference(input.to_string())
572}
573
/// Output of [`parse_input`].
///
/// Derives `Debug`, `Clone`, `PartialEq` and `Eq` so parsed inputs can
/// be logged and compared directly (for example in tests).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ParsedInput {
    /// A direct Swarm reference (caller should pass it straight to
    /// [`check_multi_vantage`]).
    Reference(String),
    /// A feed reference (caller should call [`resolve_feed`] first to
    /// get the current chunk reference, then probe that).
    Feed {
        /// Feed owner exactly as typed by the user (not yet validated).
        owner: String,
        /// Feed topic exactly as typed by the user (not yet validated).
        topic: String,
    },
}
583
/// Strip the optional `0x` prefix, decode hex, return the first 32
/// bytes (overlay/reference length). Returns `None` on malformed hex
/// or short input.
///
/// Only the first 64 hex digits are examined; anything after them is
/// ignored. Decoding works on bytes and accepts nothing but ASCII hex
/// digits, so non-ASCII input yields `None` instead of panicking on a
/// char boundary, and sign-prefixed pairs such as `+f` are rejected.
fn decode_overlay(hex: &str) -> Option<[u8; 32]> {
    let digits = hex.strip_prefix("0x").unwrap_or(hex).as_bytes();
    if digits.len() < 64 {
        return None;
    }
    let mut out = [0u8; 32];
    for (byte, pair) in out.iter_mut().zip(digits.chunks_exact(2)) {
        // `to_digit(16)` accepts only 0-9, a-f and A-F; any other byte
        // (including the halves of multi-byte UTF-8 sequences) fails.
        let hi = char::from(pair[0]).to_digit(16)?;
        let lo = char::from(pair[1]).to_digit(16)?;
        // Both nibbles are < 16, so the combined value fits in a u8.
        *byte = ((hi << 4) | lo) as u8;
    }
    Some(out)
}
599
600fn first_32(r: &Reference) -> [u8; 32] {
601    let mut out = [0u8; 32];
602    out.copy_from_slice(&r.as_bytes()[..32]);
603    out
604}
605
606fn aggregate_status(vantages: &[VantageResult], gateways: &[GatewayResult]) -> Status {
607    let outcomes: Vec<Option<bool>> = vantages
608        .iter()
609        .map(|v| v.retrievable)
610        .chain(gateways.iter().map(|g| g.retrievable))
611        .collect();
612    let total = outcomes.len();
613    if total == 0 {
614        return Status::Error;
615    }
616    let retr = outcomes.iter().filter(|o| **o == Some(true)).count();
617    let unret = outcomes.iter().filter(|o| **o == Some(false)).count();
618    let err = outcomes.iter().filter(|o| o.is_none()).count();
619    if err == total {
620        Status::Error
621    } else if retr == total {
622        Status::Retrievable
623    } else if retr == 0 && unret + err == total {
624        Status::Unretrievable
625    } else {
626        Status::Partial
627    }
628}
629
630/// Walk the manifest at `report.reference`, probe each leaf chunk via
631/// `GET /chunks/{addr}` from every vantage, and attach the result to
632/// the report. When the per-vantage overlay is known (already fetched
633/// in [`check_multi_vantage`]), each [`ChunkVantage`] also carries the
634/// proximity between that vantage and the chunk.
635pub async fn drill_down(
636    mut report: Report,
637    bees: &[String],
638    timeout: Duration,
639    concurrency: usize,
640) -> Result<Report> {
641    let r = parse_reference(&report.reference)?;
642    // Use the first vantage as the source-of-truth for manifest walking.
643    let walker_bee = bees.first().context("no bee URL for drill-down")?;
644    let walker = make_bee(walker_bee, timeout)?;
645
646    let addresses = collect_chunk_addresses(&walker, &r).await?;
647
648    // Map vantage URL → its (already-fetched) overlay bytes, for
649    // per-chunk proximity tagging without re-hitting `/addresses`.
650    let overlays: BTreeMap<String, [u8; 32]> = report
651        .vantages
652        .iter()
653        .filter_map(|v| {
654            v.overlay
655                .as_deref()
656                .and_then(decode_overlay)
657                .map(|o| (v.bee_url.clone(), o))
658        })
659        .collect();
660
661    let clients: Vec<(String, Client)> = bees
662        .iter()
663        .map(|u| make_bee(u, timeout).map(|b| (u.clone(), b)))
664        .collect::<Result<_>>()?;
665
666    let sem = Arc::new(Semaphore::new(concurrency.max(1)));
667    let mut probes = Vec::with_capacity(addresses.len());
668
669    let mut futs = FuturesUnordered::new();
670    for addr in addresses {
671        let sem = sem.clone();
672        let clients = clients.clone();
673        let overlays = overlays.clone();
674        futs.push(async move {
675            let chunk_bytes = first_32_of_ref(&addr);
676            let mut per_vantage = BTreeMap::new();
677            for (url, bee) in &clients {
678                let _permit = sem.acquire().await.expect("semaphore not closed");
679                let started = Instant::now();
680                let res = bee.file().download_chunk(&addr, None).await;
681                let elapsed_ms = started.elapsed().as_millis() as u64;
682                let prox = overlays
683                    .get(url)
684                    .map(|o| proximity(o, &chunk_bytes));
685                let cv = match res {
686                    Ok(_) => ChunkVantage {
687                        found: true,
688                        elapsed_ms,
689                        error: None,
690                        proximity: prox,
691                    },
692                    Err(e) => ChunkVantage {
693                        found: false,
694                        elapsed_ms,
695                        error: Some(format!("{e}")),
696                        proximity: prox,
697                    },
698                };
699                per_vantage.insert(url.clone(), cv);
700            }
701            let hex = addr.to_hex();
702            let neighborhood = hex.chars().take(2).collect::<String>();
703            ChunkProbe {
704                address: hex,
705                neighborhood,
706                per_vantage,
707            }
708        });
709    }
710
711    while let Some(p) = futs.next().await {
712        probes.push(p);
713    }
714    probes.sort_by(|a, b| a.address.cmp(&b.address));
715    report.chunk_stats = Some(compute_chunk_stats(&probes));
716    report.chunks = Some(probes);
717    Ok(report)
718}
719
720/// Roll up per-vantage + per-neighborhood timing statistics from a
721/// flat list of [`ChunkProbe`]s. Pure function; runs in milliseconds
722/// even for the 1000-chunk cap.
723pub fn compute_chunk_stats(probes: &[ChunkProbe]) -> ChunkStats {
724    // (url → Vec<elapsed_ms for found chunks>, found_count, missing_count)
725    let mut per_vantage: BTreeMap<String, (Vec<u64>, usize, usize)> = BTreeMap::new();
726    let mut per_neighborhood: BTreeMap<String, (Vec<u64>, usize, usize)> = BTreeMap::new();
727
728    for p in probes {
729        for (url, cv) in &p.per_vantage {
730            let entry = per_vantage.entry(url.clone()).or_default();
731            if cv.found {
732                entry.0.push(cv.elapsed_ms);
733                entry.1 += 1;
734            } else {
735                entry.2 += 1;
736            }
737            let n = per_neighborhood.entry(p.neighborhood.clone()).or_default();
738            if cv.found {
739                n.0.push(cv.elapsed_ms);
740                n.1 += 1;
741            } else {
742                n.2 += 1;
743            }
744        }
745    }
746
747    let to_row = |(latencies, found, missing): (Vec<u64>, usize, usize)| -> ChunkStatRow {
748        let mut sorted = latencies;
749        sorted.sort_unstable();
750        let p = |q: f64| -> Option<u64> {
751            if sorted.is_empty() {
752                None
753            } else {
754                let idx = ((sorted.len() as f64 - 1.0) * q).round() as usize;
755                Some(sorted[idx])
756            }
757        };
758        ChunkStatRow {
759            total: found + missing,
760            found,
761            missing,
762            elapsed_p50_ms: p(0.50),
763            elapsed_p95_ms: p(0.95),
764            elapsed_max_ms: sorted.last().copied(),
765        }
766    };
767
768    ChunkStats {
769        per_vantage: per_vantage.into_iter().map(|(k, v)| (k, to_row(v))).collect(),
770        per_neighborhood: per_neighborhood
771            .into_iter()
772            .map(|(k, v)| (k, to_row(v)))
773            .collect(),
774    }
775}
776
777/// Annotate each vantage with `target_proximity` (PO between its
778/// overlay and `target_overlay_hex`) and sort the vantages array by
779/// that proximity in descending order — closest first. No-op when
780/// `target_overlay_hex` doesn't decode. Added in 0.4.
781pub fn annotate_target_overlay(mut report: Report, target_overlay_hex: &str) -> Report {
782    let Some(target) = decode_overlay(target_overlay_hex) else {
783        return report;
784    };
785    for v in &mut report.vantages {
786        if let Some(o) = v.overlay.as_deref().and_then(decode_overlay) {
787            v.target_proximity = Some(proximity(&o, &target));
788        }
789    }
790    report
791        .vantages
792        .sort_by(|a, b| b.target_proximity.cmp(&a.target_proximity));
793    report
794}
795
796fn first_32_of_ref(r: &Reference) -> [u8; 32] {
797    let mut out = [0u8; 32];
798    out.copy_from_slice(&r.as_bytes()[..32]);
799    out
800}
801
802/// BFS-walk the manifest starting at `root`, collecting both child
803/// manifest chunk addresses and content (`target_address`) addresses.
804/// Capped at [`MAX_CHUNKS`] to bound work for pathological cases. If
805/// `root` isn't a manifest, returns just `[root]`.
806async fn collect_chunk_addresses(bee: &Client, root: &Reference) -> Result<Vec<Reference>> {
807    let mut addresses: Vec<Reference> = vec![root.clone()];
808    let mut seen: std::collections::HashSet<String> = std::collections::HashSet::new();
809    seen.insert(root.to_hex());
810
811    let mut queue: std::collections::VecDeque<Reference> = std::collections::VecDeque::new();
812    queue.push_back(root.clone());
813
814    while let Some(addr) = queue.pop_front() {
815        if addresses.len() >= MAX_CHUNKS {
816            break;
817        }
818        let bytes = match bee.file().download_chunk(&addr, None).await {
819            Ok(b) => b,
820            // If we can't fetch a manifest node, we can't walk deeper. The
821            // outer probe loop will still record the chunk-level miss.
822            Err(_) => continue,
823        };
824        let Ok(node) = unmarshal(&bytes, addr.as_bytes()) else {
825            // Root might be raw content; deeper nodes that don't parse
826            // are leaves — both fine to skip.
827            continue;
828        };
829        // Content reference at this node — probe it but don't recurse.
830        if !is_null_address(&node.target_address) {
831            if let Ok(r) = Reference::new(&node.target_address) {
832                if seen.insert(r.to_hex()) {
833                    addresses.push(r);
834                }
835            }
836        }
837        // Child manifest chunks — probe and recurse.
838        for fork in node.forks.values() {
839            if let Some(sa) = fork.node.self_address {
840                if let Ok(r) = Reference::new(&sa) {
841                    if seen.insert(r.to_hex()) {
842                        addresses.push(r.clone());
843                        queue.push_back(r);
844                    }
845                }
846            }
847        }
848    }
849    Ok(addresses)
850}
851
852pub async fn reseed(req: ReseedRequest) -> Result<()> {
853    let bee = make_bee(&req.bee_url, req.timeout)?;
854    let r = parse_reference(&req.reference)?;
855    let batch = BatchId::from_hex(&req.batch_id)
856        .map_err(|e| anyhow!("invalid batch id {}: {e}", req.batch_id))?;
857    bee.api().reupload(&r, &batch).await?;
858    Ok(())
859}
860
861/// Pre-flight check before `--reseed`: look up `GET /stamps/{id}` on the
862/// target Bee and surface usable/expiry concerns. Mirrors the spirit of
863/// ipfs-check's "stale records" UX hint — flag freshness problems
864/// before doing the operation.
865pub async fn check_stamp(
866    bee_url: &str,
867    batch_id: &str,
868    timeout: Duration,
869) -> Result<StampStatus> {
870    let bee = make_bee(bee_url, timeout)?;
871    let batch = BatchId::from_hex(batch_id)
872        .map_err(|e| anyhow!("invalid batch id {batch_id}: {e}"))?;
873    let pb = bee
874        .postage()
875        .get_postage_batch(&batch)
876        .await
877        .map_err(anyhow::Error::from)?;
878
879    let mut warnings = Vec::new();
880    if !pb.exists {
881        warnings.push("batch not known to this Bee".to_string());
882    }
883    if !pb.usable {
884        warnings.push("batch not usable yet (chain may be syncing)".to_string());
885    }
886    if pb.batch_ttl >= 0 && pb.batch_ttl < STAMP_LOW_TTL_SECS {
887        warnings.push(format!(
888            "batch TTL low: ~{} (re-seed may not outlive the batch)",
889            humanize_secs(pb.batch_ttl)
890        ));
891    }
892
893    let healthy = pb.exists && pb.usable && (pb.batch_ttl < 0 || pb.batch_ttl >= STAMP_LOW_TTL_SECS);
894
895    Ok(StampStatus {
896        batch_id: batch_id.to_string(),
897        exists: pb.exists,
898        usable: pb.usable,
899        batch_ttl: pb.batch_ttl,
900        healthy,
901        warnings,
902    })
903}
904
/// Format a byte count with binary units.
///
/// Values of at least 1 KiB are shown with two decimals in the largest
/// of KiB / MiB / GiB that fits; smaller values are shown as a whole
/// number of bytes (`"{b} B"`).
fn human_bytes(b: u64) -> String {
    // Largest unit first so the first matching threshold wins.
    const UNITS: [(u64, &str); 3] = [
        (1024 * 1024 * 1024, "GiB"),
        (1024 * 1024, "MiB"),
        (1024, "KiB"),
    ];
    match UNITS.iter().find(|(threshold, _)| b >= *threshold) {
        Some((threshold, label)) => format!("{:.2} {}", b as f64 / *threshold as f64, label),
        None => format!("{b} B"),
    }
}
919
/// Render a duration in seconds as a coarse, single-unit string.
///
/// Negative input yields `"unknown"`. Otherwise the largest unit that
/// fits is used — days, hours, minutes, then seconds — with the value
/// truncated (integer division), not rounded.
fn humanize_secs(s: i64) -> String {
    const MINUTE: i64 = 60;
    const HOUR: i64 = 3_600;
    const DAY: i64 = 86_400;
    match s {
        negative if negative < 0 => String::from("unknown"),
        secs if secs >= DAY => format!("{} day(s)", secs / DAY),
        secs if secs >= HOUR => format!("{} hour(s)", secs / HOUR),
        secs if secs >= MINUTE => format!("{} min", secs / MINUTE),
        secs => format!("{}s", secs),
    }
}
935
936pub fn render_report(report: &Report, fmt: OutputFormat) -> String {
937    match fmt {
938        OutputFormat::Json => {
939            serde_json::to_string_pretty(report).expect("report serialization") + "\n"
940        }
941        OutputFormat::Text => render_text(report),
942    }
943}
944
945fn render_text(r: &Report) -> String {
946    use std::fmt::Write;
947    let mut out = String::new();
948    let _ = writeln!(out, "ref     {}", r.reference);
949    let _ = writeln!(out, "status  {:?}", r.status);
950    let _ = writeln!(out);
951    let _ = writeln!(out, "vantages:");
952    let url_w = r.vantages.iter().map(|v| v.bee_url.len()).max().unwrap_or(20);
953    for v in &r.vantages {
954        let state = match (v.retrievable, &v.error) {
955            (Some(true), _) => "retrievable",
956            (Some(false), _) => "unretrievable",
957            (None, Some(_)) => "error",
958            (None, None) => "unknown",
959        };
960        let meta = vantage_meta(v);
961        let _ = writeln!(
962            out,
963            "  {:<url_w$}  {:<14} {:>6} ms{}{}",
964            v.bee_url,
965            state,
966            v.elapsed_ms,
967            if meta.is_empty() { String::new() } else { format!("  {meta}") },
968            v.error
969                .as_deref()
970                .map(|e| format!("  ({e})"))
971                .unwrap_or_default(),
972            url_w = url_w
973        );
974    }
975    if !r.gateways.is_empty() {
976        let _ = writeln!(out);
977        let _ = writeln!(out, "gateways:");
978        let url_w = r.gateways.iter().map(|g| g.url.len()).max().unwrap_or(20);
979        for g in &r.gateways {
980            let state = match (g.retrievable, &g.error) {
981                (Some(true), _) => "retrievable",
982                (Some(false), _) => "unretrievable",
983                (None, _) => "error",
984            };
985            let code = g
986                .status_code
987                .map(|c| format!("  HTTP {c}"))
988                .unwrap_or_default();
989            let _ = writeln!(
990                out,
991                "  {:<url_w$}  {:<14} {:>6} ms{}{}",
992                g.url,
993                state,
994                g.elapsed_ms,
995                code,
996                g.error
997                    .as_deref()
998                    .map(|e| format!("  ({e})"))
999                    .unwrap_or_default(),
1000                url_w = url_w
1001            );
1002        }
1003    }
1004    if !r.cold_downloads.is_empty() {
1005        let _ = writeln!(out);
1006        let _ = writeln!(out, "cold downloads:");
1007        let url_w = r.cold_downloads.iter().map(|c| c.url.len()).max().unwrap_or(20);
1008        for c in &r.cold_downloads {
1009            let state = if c.success { "ok" } else { "fail" };
1010            let bytes = human_bytes(c.bytes_downloaded);
1011            let code = c
1012                .status_code
1013                .map(|s| format!("  HTTP {s}"))
1014                .unwrap_or_default();
1015            let _ = writeln!(
1016                out,
1017                "  {:<url_w$}  {:<4} {:>6} ms  {:>9}{}{}",
1018                c.url,
1019                state,
1020                c.elapsed_ms,
1021                bytes,
1022                code,
1023                c.error
1024                    .as_deref()
1025                    .map(|e| format!("  ({e})"))
1026                    .unwrap_or_default(),
1027                url_w = url_w
1028            );
1029        }
1030    }
1031    if let Some(res) = &r.resolution {
1032        let _ = writeln!(out);
1033        match res {
1034            Resolution::Feed { owner, topic, resolved_reference } => {
1035                let _ = writeln!(
1036                    out,
1037                    "resolved feed owner={owner} topic={topic} -> {resolved_reference}",
1038                );
1039            }
1040        }
1041    }
1042    if let Some(stats) = &r.chunk_stats {
1043        let _ = writeln!(out);
1044        let _ = writeln!(out, "chunk stats per vantage:");
1045        let url_w = stats
1046            .per_vantage
1047            .keys()
1048            .map(|k| k.len())
1049            .max()
1050            .unwrap_or(20);
1051        for (url, row) in &stats.per_vantage {
1052            let _ = writeln!(
1053                out,
1054                "  {:<url_w$}  found {:>3}/{:<3}  p50 {:>5} ms · p95 {:>5} ms · max {:>5} ms",
1055                url,
1056                row.found,
1057                row.total,
1058                fmt_ms(row.elapsed_p50_ms),
1059                fmt_ms(row.elapsed_p95_ms),
1060                fmt_ms(row.elapsed_max_ms),
1061                url_w = url_w
1062            );
1063        }
1064        if !stats.per_neighborhood.is_empty() {
1065            let _ = writeln!(out);
1066            let _ = writeln!(out, "chunk stats per neighborhood:");
1067            let mut rows: Vec<(&String, &ChunkStatRow)> =
1068                stats.per_neighborhood.iter().collect();
1069            rows.sort_by(|a, b| {
1070                b.1.elapsed_p95_ms
1071                    .unwrap_or(0)
1072                    .cmp(&a.1.elapsed_p95_ms.unwrap_or(0))
1073            });
1074            for (nb, row) in rows.iter().take(10) {
1075                let _ = writeln!(
1076                    out,
1077                    "  nb {}  found {:>3}/{:<3}  p50 {:>5} ms · p95 {:>5} ms",
1078                    nb,
1079                    row.found,
1080                    row.total,
1081                    fmt_ms(row.elapsed_p50_ms),
1082                    fmt_ms(row.elapsed_p95_ms),
1083                );
1084            }
1085            if rows.len() > 10 {
1086                let _ = writeln!(out, "  ... {} more neighborhoods", rows.len() - 10);
1087            }
1088        }
1089    }
1090    if let Some(chunks) = &r.chunks {
1091        let _ = writeln!(out);
1092        let _ = writeln!(out, "chunks: {} probed", chunks.len());
1093        let mut missing = 0usize;
1094        for c in chunks {
1095            let missing_in: Vec<String> = c
1096                .per_vantage
1097                .iter()
1098                .filter(|(_, cv)| !cv.found)
1099                .map(|(u, cv)| match cv.proximity {
1100                    Some(p) => format!("{u} (PO {p})"),
1101                    None => u.clone(),
1102                })
1103                .collect();
1104            if !missing_in.is_empty() {
1105                missing += 1;
1106                let _ = writeln!(
1107                    out,
1108                    "  [{}] {}  missing on: {}",
1109                    c.neighborhood,
1110                    short(&c.address),
1111                    missing_in.join(", ")
1112                );
1113            }
1114        }
1115        if missing == 0 {
1116            let _ = writeln!(out, "  all chunks present on all vantages");
1117        } else {
1118            let _ = writeln!(out, "  {missing} chunk(s) missing on at least one vantage");
1119        }
1120    }
1121    out
1122}
1123
1124/// Single-line metadata trailer for a vantage: overlay neighborhood,
1125/// proximity to root, target-proximity (when `--target-overlay` was
1126/// set), Bee version. Compactly formatted; pieces that weren't
1127/// fetched are silently dropped.
1128fn vantage_meta(v: &VantageResult) -> String {
1129    let mut parts: Vec<String> = Vec::new();
1130    if let Some(o) = &v.overlay {
1131        let neigh = o.chars().take(2).collect::<String>();
1132        let short_overlay = short_overlay(o);
1133        parts.push(format!("overlay {short_overlay} (nb {neigh})"));
1134    }
1135    if let Some(p) = v.proximity_to_root {
1136        parts.push(format!("PO {p}"));
1137    }
1138    if let Some(p) = v.target_proximity {
1139        parts.push(format!("tgtPO {p}"));
1140    }
1141    if let Some(ver) = &v.bee_version {
1142        parts.push(format!("v{ver}"));
1143    }
1144    if parts.is_empty() {
1145        String::new()
1146    } else {
1147        format!("· {}", parts.join(" · "))
1148    }
1149}
1150
/// Format an optional millisecond value for a table cell: the number
/// itself when present, an em dash (`—`) when absent.
fn fmt_ms(v: Option<u64>) -> String {
    v.map_or_else(|| String::from("—"), |ms| ms.to_string())
}
1157
/// Shorten a hex identifier for display.
///
/// An optional `0x` prefix is dropped first. If more than 12 characters
/// remain, the result is the first 6 and last 4 characters joined by
/// `…`; otherwise the remainder is returned unchanged.
///
/// Cuts are made on character boundaries, so unexpected non-ASCII input
/// is shortened rather than causing a panic. For ASCII input (the normal
/// hex case) characters and bytes coincide.
fn short_overlay(hex: &str) -> String {
    let s = hex.strip_prefix("0x").unwrap_or(hex);
    let char_count = s.chars().count();
    if char_count <= 12 {
        return s.to_string();
    }
    // Translate the character positions into byte offsets that are
    // guaranteed to be valid slice boundaries.
    let head_end = s.char_indices().nth(6).map_or(s.len(), |(i, _)| i);
    let tail_start = s
        .char_indices()
        .nth(char_count - 4)
        .map_or(s.len(), |(i, _)| i);
    format!("{}…{}", &s[..head_end], &s[tail_start..])
}
1166
1167/// Human-readable summary of a stamp pre-flight check, suitable for
1168/// stderr before a `--reseed` operation.
1169pub fn render_stamp_status(s: &StampStatus) -> String {
1170    use std::fmt::Write;
1171    let mut out = String::new();
1172    let ttl = if s.batch_ttl < 0 {
1173        "unknown".to_string()
1174    } else {
1175        humanize_secs(s.batch_ttl)
1176    };
1177    let header = if s.healthy { "stamp OK" } else { "stamp warning" };
1178    let _ = writeln!(
1179        out,
1180        "{header}: batch {} · usable={} · ttl={}",
1181        short_overlay(&s.batch_id),
1182        s.usable,
1183        ttl,
1184    );
1185    for w in &s.warnings {
1186        let _ = writeln!(out, "  · {w}");
1187    }
1188    out
1189}
1190
/// Shorten a long hex string for display.
///
/// Strings longer than 16 characters become their first 8 and last 4
/// characters joined by `…`; shorter strings are returned unchanged.
///
/// Cuts are made on character boundaries, so unexpected non-ASCII input
/// is shortened rather than causing a panic. For ASCII input (the normal
/// hex case) characters and bytes coincide.
fn short(hex: &str) -> String {
    let char_count = hex.chars().count();
    if char_count <= 16 {
        return hex.to_string();
    }
    // Translate the character positions into byte offsets that are
    // guaranteed to be valid slice boundaries.
    let head_end = hex.char_indices().nth(8).map_or(hex.len(), |(i, _)| i);
    let tail_start = hex
        .char_indices()
        .nth(char_count - 4)
        .map_or(hex.len(), |(i, _)| i);
    format!("{}…{}", &hex[..head_end], &hex[tail_start..])
}