Skip to main content

bee_check/
lib.rs

1//! Core check engine for `bee-check`.
2//!
3//! The CLI in `main.rs` and (in future) the `bee-check-web` SPA both
4//! consume the same [`Report`] shape produced here. See `SPEC.md` for
5//! the JSON shape.
6
7use std::collections::BTreeMap;
8use std::sync::Arc;
9use std::time::{Duration, Instant};
10
11use anyhow::{Context, Result, anyhow};
12use bee::Client;
13use bee::manifest::{is_null_address, unmarshal};
14use bee::swarm::gsoc::proximity;
15use bee::swarm::{BatchId, EthAddress, Reference, Topic};
16use futures::stream::{FuturesUnordered, StreamExt};
17use reqwest::Method;
18use serde::{Deserialize, Serialize};
19use tokio::sync::Semaphore;
20
21/// Default public gateway used when no `--gateway` flag is supplied.
22/// `api.gateway.ethswarm.org` is the Foundation-operated forwarding
23/// gateway that returns proper 404s for unknown references (unlike
24/// `gateway.ethswarm.org`, which currently fronts a static page that
25/// 200s everything).
26pub const DEFAULT_GATEWAY: &str = "https://api.gateway.ethswarm.org";
27
28const MAX_CHUNKS: usize = 1000;
29/// Stamp is "low TTL" below this many seconds (~24h). A re-seed
30/// against a stamp below this threshold may not outlive the batch.
31const STAMP_LOW_TTL_SECS: i64 = 86_400;
32
33#[derive(Copy, Clone, Debug)]
34pub enum OutputFormat {
35    Text,
36    Json,
37}
38
39/// Outcome enum reported per vantage and aggregated for the whole check.
40#[derive(Copy, Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
41#[serde(rename_all = "snake_case")]
42pub enum Status {
43    Retrievable,
44    Unretrievable,
45    Partial,
46    Error,
47}
48
49#[derive(Debug, Serialize, Deserialize)]
50pub struct Report {
51    pub reference: String,
52    pub status: Status,
53    pub vantages: Vec<VantageResult>,
54    /// Public-gateway HEAD probes. Empty when `--no-gateway` was used
55    /// or no gateways were probed. Added in 0.3 — additive.
56    #[serde(default, skip_serializing_if = "Vec::is_empty")]
57    pub gateways: Vec<GatewayResult>,
58    /// Set when the user-provided input was a non-bare-reference (a
59    /// feed reference) that resolved to `reference`. Added in 0.3.
60    #[serde(default, skip_serializing_if = "Option::is_none")]
61    pub resolution: Option<Resolution>,
62    /// Populated only when `--per-chunk` was requested.
63    #[serde(skip_serializing_if = "Option::is_none")]
64    pub chunks: Option<Vec<ChunkProbe>>,
65    /// Roll-ups computed from `chunks`. Populated alongside `chunks`
66    /// (i.e. when `--per-chunk` was used). Added in 0.4 — additive.
67    #[serde(skip_serializing_if = "Option::is_none")]
68    pub chunk_stats: Option<ChunkStats>,
69    pub spec_version: u32,
70}
71
72/// Aggregate statistics over the per-chunk probe data. Per-vantage
73/// stats surface "fast on bee-A, slow on bee-B"; per-neighborhood
74/// stats surface "chunks in nb 0x1a are slow no matter which vantage".
75#[derive(Debug, Serialize, Deserialize)]
76pub struct ChunkStats {
77    pub per_vantage: BTreeMap<String, ChunkStatRow>,
78    /// Keyed by neighborhood (first 2 hex chars of the chunk address).
79    pub per_neighborhood: BTreeMap<String, ChunkStatRow>,
80}
81
82#[derive(Debug, Serialize, Deserialize, Clone)]
83pub struct ChunkStatRow {
84    pub total: usize,
85    pub found: usize,
86    pub missing: usize,
87    /// Latency percentiles in ms, computed over chunks where
88    /// `found == true`. Absent when no chunk was found.
89    #[serde(skip_serializing_if = "Option::is_none")]
90    pub elapsed_p50_ms: Option<u64>,
91    #[serde(skip_serializing_if = "Option::is_none")]
92    pub elapsed_p95_ms: Option<u64>,
93    #[serde(skip_serializing_if = "Option::is_none")]
94    pub elapsed_max_ms: Option<u64>,
95}
96
97/// HEAD-probe of a public Swarm gateway. The gateway resolves the
98/// reference end-to-end through forwarding Kademlia; a 2xx means
99/// "anyone could retrieve this through this gateway right now",
100/// independent of any specific Bee node the user controls.
101#[derive(Debug, Serialize, Deserialize)]
102pub struct GatewayResult {
103    pub url: String,
104    /// `true` for 2xx, `false` for 4xx/5xx, `None` when the call
105    /// errored at the network layer.
106    pub retrievable: Option<bool>,
107    pub elapsed_ms: u64,
108    /// HTTP status code returned (when the call completed).
109    #[serde(skip_serializing_if = "Option::is_none")]
110    pub status_code: Option<u16>,
111    #[serde(skip_serializing_if = "Option::is_none")]
112    pub error: Option<String>,
113}
114
115/// Records the input → reference resolution when the user supplied a
116/// mutable handle (currently: Swarm feeds). Added in 0.3.
117#[derive(Debug, Serialize, Deserialize)]
118#[serde(tag = "kind", rename_all = "snake_case")]
119pub enum Resolution {
120    /// Feed lookup via `GET /feeds/{owner}/{topic}` on the first
121    /// `--bee` vantage.
122    Feed {
123        owner: String,
124        topic: String,
125        /// Hex of the resolved chunk reference.
126        resolved_reference: String,
127    },
128}
129
130#[derive(Debug, Serialize, Deserialize)]
131pub struct VantageResult {
132    pub bee_url: String,
133    /// `None` means the call errored (see `error`).
134    pub retrievable: Option<bool>,
135    pub elapsed_ms: u64,
136    #[serde(skip_serializing_if = "Option::is_none")]
137    pub error: Option<String>,
138    /// Hex overlay address of the probed Bee, from `GET /addresses`. The
139    /// first 2 hex chars are the neighborhood the node sits in.
140    #[serde(skip_serializing_if = "Option::is_none")]
141    pub overlay: Option<String>,
142    /// Bee semver, from `GET /health` (Bee's `version` field).
143    #[serde(skip_serializing_if = "Option::is_none")]
144    pub bee_version: Option<String>,
145    /// Proximity order between the probe node's overlay and the target
146    /// reference. Higher = closer to where the chunk is stored. PO 0
147    /// means the probe is not in the chunk's neighborhood at all.
148    #[serde(skip_serializing_if = "Option::is_none")]
149    pub proximity_to_root: Option<u32>,
150    /// PO between this vantage's overlay and `--target-overlay`, when
151    /// that flag was supplied. Added in 0.4 — used to surface "this
152    /// is the closest vantage to your target neighborhood".
153    #[serde(skip_serializing_if = "Option::is_none")]
154    pub target_proximity: Option<u32>,
155}
156
157#[derive(Debug, Serialize, Deserialize)]
158pub struct ChunkProbe {
159    pub address: String,
160    /// First 2 hex chars — neighborhood the chunk should land in.
161    pub neighborhood: String,
162    pub per_vantage: BTreeMap<String, ChunkVantage>,
163}
164
165#[derive(Debug, Serialize, Deserialize)]
166pub struct ChunkVantage {
167    pub found: bool,
168    pub elapsed_ms: u64,
169    #[serde(skip_serializing_if = "Option::is_none")]
170    pub error: Option<String>,
171    /// Proximity between this vantage's overlay and the chunk address.
172    /// Unset when the vantage's overlay couldn't be fetched.
173    #[serde(skip_serializing_if = "Option::is_none")]
174    pub proximity: Option<u32>,
175}
176
177pub struct ReseedRequest {
178    pub reference: String,
179    pub bee_url: String,
180    pub batch_id: String,
181    pub timeout: Duration,
182}
183
184/// Result of the `--reseed --stamp <id>` pre-flight check. Mirrors
185/// ipfs-check's "stale records" hint in spirit: surface upstream-data
186/// problems before doing the operation.
187#[derive(Debug, Serialize, Deserialize)]
188pub struct StampStatus {
189    pub batch_id: String,
190    pub exists: bool,
191    pub usable: bool,
192    pub batch_ttl: i64,
193    /// `usable && exists && batch_ttl >= STAMP_LOW_TTL_SECS` (or
194    /// `batch_ttl < 0`, meaning unknown/infinite).
195    pub healthy: bool,
196    #[serde(skip_serializing_if = "Vec::is_empty")]
197    pub warnings: Vec<String>,
198}
199
200const SPEC_VERSION: u32 = 1;
201
202fn parse_reference(s: &str) -> Result<Reference> {
203    Reference::from_hex(s).map_err(|e| anyhow!("invalid reference {s}: {e}"))
204}
205
206fn make_bee(url: &str, timeout: Duration) -> Result<Client> {
207    let http = reqwest::Client::builder()
208        .timeout(timeout)
209        .build()
210        .context("building http client")?;
211    Client::with_http_client(url, http).map_err(|e| anyhow!("invalid bee url {url}: {e}"))
212}
213
214/// Probe `/stewardship/{ref}` across all vantages in parallel. Alongside
215/// the retrievability call, fetches `/addresses` (overlay) and
216/// `/health` (Bee version) so the rendered report can surface which
217/// neighborhood the probe came from. These ancillary calls are
218/// best-effort: failure leaves `overlay` / `bee_version` unset rather
219/// than failing the whole vantage.
220pub async fn check_multi_vantage(
221    reference: &str,
222    bees: &[String],
223    timeout: Duration,
224) -> Result<Report> {
225    let r = parse_reference(reference)?;
226    let root_bytes = first_32(&r);
227
228    let mut futs = FuturesUnordered::new();
229    for bee_url in bees {
230        let bee_url = bee_url.clone();
231        let r = r.clone();
232        futs.push(async move {
233            let bee = match make_bee(&bee_url, timeout) {
234                Ok(b) => b,
235                Err(e) => {
236                    return VantageResult {
237                        bee_url,
238                        retrievable: None,
239                        elapsed_ms: 0,
240                        error: Some(format!("{e:#}")),
241                        overlay: None,
242                        bee_version: None,
243                        proximity_to_root: None,
244                        target_proximity: None,
245                    };
246                }
247            };
248            let started = Instant::now();
249            let api = bee.api();
250            let debug = bee.debug();
251            let (stew_res, addr_res, health_res) = tokio::join!(
252                api.is_retrievable(&r),
253                debug.addresses(),
254                debug.health(),
255            );
256            let elapsed_ms = started.elapsed().as_millis() as u64;
257
258            let overlay = addr_res.ok().map(|a| a.overlay);
259            let bee_version = health_res.ok().map(|h| h.version);
260            let proximity_to_root = overlay
261                .as_deref()
262                .and_then(decode_overlay)
263                .map(|o| proximity(&o, &root_bytes));
264
265            match stew_res {
266                Ok(ok) => VantageResult {
267                    bee_url,
268                    retrievable: Some(ok),
269                    elapsed_ms,
270                    error: None,
271                    overlay,
272                    bee_version,
273                    proximity_to_root,
274                    target_proximity: None,
275                },
276                Err(e) => VantageResult {
277                    bee_url,
278                    retrievable: None,
279                    elapsed_ms,
280                    error: Some(format!("{e}")),
281                    overlay,
282                    bee_version,
283                    proximity_to_root,
284                    target_proximity: None,
285                },
286            }
287        });
288    }
289
290    let mut vantages = Vec::with_capacity(bees.len());
291    while let Some(v) = futs.next().await {
292        vantages.push(v);
293    }
294    vantages.sort_by(|a, b| a.bee_url.cmp(&b.bee_url));
295
296    let status = aggregate_status(&vantages, &[]);
297    Ok(Report {
298        reference: reference.to_string(),
299        status,
300        vantages,
301        gateways: Vec::new(),
302        resolution: None,
303        chunks: None,
304        chunk_stats: None,
305        spec_version: SPEC_VERSION,
306    })
307}
308
309/// Probe public Swarm gateways via `HEAD {gateway}/bzz/{ref}` in
310/// parallel. Mirrors the BYO-Bee philosophy: gateway results
311/// complement (don't replace) the vantage probes and aggregate into
312/// the same top-level `status`.
313pub async fn check_gateways(
314    reference: &str,
315    gateway_urls: &[String],
316    timeout: Duration,
317) -> Result<Vec<GatewayResult>> {
318    // Reference is validated up front so an invalid hex doesn't waste
319    // network calls.
320    let _ = parse_reference(reference)?;
321
322    let http = reqwest::Client::builder()
323        .timeout(timeout)
324        .build()
325        .context("building http client for gateway probes")?;
326
327    let mut futs = FuturesUnordered::new();
328    for base in gateway_urls {
329        let base = base.clone();
330        let reference = reference.to_string();
331        let http = http.clone();
332        futs.push(async move {
333            let url = build_gateway_url(&base, &reference);
334            let started = Instant::now();
335            let res = http.request(Method::HEAD, &url).send().await;
336            let elapsed_ms = started.elapsed().as_millis() as u64;
337            match res {
338                Ok(resp) => {
339                    let status = resp.status().as_u16();
340                    GatewayResult {
341                        url: base,
342                        retrievable: Some(resp.status().is_success()),
343                        elapsed_ms,
344                        status_code: Some(status),
345                        error: None,
346                    }
347                }
348                Err(e) => GatewayResult {
349                    url: base,
350                    retrievable: None,
351                    elapsed_ms,
352                    status_code: None,
353                    error: Some(format!("{e}")),
354                },
355            }
356        });
357    }
358    let mut out = Vec::with_capacity(gateway_urls.len());
359    while let Some(g) = futs.next().await {
360        out.push(g);
361    }
362    out.sort_by(|a, b| a.url.cmp(&b.url));
363    Ok(out)
364}
365
366/// Append gateway results to an existing report and re-aggregate the
367/// top-level status. Splits the gateway probe from the main check so
368/// the SPA can show partial progress.
369pub fn merge_gateways(mut report: Report, gateways: Vec<GatewayResult>) -> Report {
370    report.gateways = gateways;
371    report.status = aggregate_status(&report.vantages, &report.gateways);
372    report
373}
374
375fn build_gateway_url(base: &str, reference: &str) -> String {
376    let trimmed = base.trim_end_matches('/');
377    format!("{trimmed}/bzz/{reference}/")
378}
379
380/// Resolve a `feed:OWNER:TOPIC` input to its current chunk reference.
381/// `bee_url` is the first Bee vantage; the feed lookup goes through
382/// it. Returns the resolved reference and a [`Resolution`] record
383/// describing what was done.
384pub async fn resolve_feed(
385    bee_url: &str,
386    owner_hex: &str,
387    topic_hex: &str,
388    timeout: Duration,
389) -> Result<(String, Resolution)> {
390    let bee = make_bee(bee_url, timeout)?;
391    let owner = EthAddress::from_hex(owner_hex)
392        .map_err(|e| anyhow!("invalid feed owner {owner_hex}: {e}"))?;
393    let topic = Topic::from_hex(topic_hex)
394        .map_err(|e| anyhow!("invalid feed topic {topic_hex}: {e}"))?;
395    let reference = bee
396        .file()
397        .get_feed_lookup(&owner, &topic)
398        .await
399        .map_err(anyhow::Error::from)?;
400    let r_hex = reference.to_hex();
401    Ok((
402        r_hex.clone(),
403        Resolution::Feed {
404            owner: owner.to_hex(),
405            topic: topic.to_hex(),
406            resolved_reference: r_hex,
407        },
408    ))
409}
410
411/// Parse a positional input. Accepts either:
412/// - A 64- or 128-hex Swarm reference, returned as-is with `None`
413///   resolution metadata.
414/// - `feed:OWNER:TOPIC` (40-hex owner, 64-hex topic) — caller should
415///   then call [`resolve_feed`] to turn it into a reference.
416pub fn parse_input(input: &str) -> ParsedInput {
417    if let Some(rest) = input.strip_prefix("feed:") {
418        // Accept `feed:owner:topic` and `feed:owner/topic`.
419        let parts: Vec<&str> = rest.splitn(2, [':', '/']).collect();
420        if parts.len() == 2 {
421            return ParsedInput::Feed {
422                owner: parts[0].to_string(),
423                topic: parts[1].to_string(),
424            };
425        }
426    }
427    ParsedInput::Reference(input.to_string())
428}
429
430/// Output of [`parse_input`].
431pub enum ParsedInput {
432    /// A direct Swarm reference (caller should pass it straight to
433    /// [`check_multi_vantage`]).
434    Reference(String),
435    /// A feed reference (caller should call [`resolve_feed`] first to
436    /// get the current chunk reference, then probe that).
437    Feed { owner: String, topic: String },
438}
439
440/// Strip the optional `0x` prefix, decode hex, return the first 32
441/// bytes (overlay/reference length). Returns `None` on malformed hex
442/// or short input.
443fn decode_overlay(hex: &str) -> Option<[u8; 32]> {
444    let s = hex.strip_prefix("0x").unwrap_or(hex);
445    if s.len() < 64 {
446        return None;
447    }
448    let mut out = [0u8; 32];
449    for (i, b) in out.iter_mut().enumerate() {
450        let h = u8::from_str_radix(&s[i * 2..i * 2 + 2], 16).ok()?;
451        *b = h;
452    }
453    Some(out)
454}
455
456fn first_32(r: &Reference) -> [u8; 32] {
457    let mut out = [0u8; 32];
458    out.copy_from_slice(&r.as_bytes()[..32]);
459    out
460}
461
462fn aggregate_status(vantages: &[VantageResult], gateways: &[GatewayResult]) -> Status {
463    let outcomes: Vec<Option<bool>> = vantages
464        .iter()
465        .map(|v| v.retrievable)
466        .chain(gateways.iter().map(|g| g.retrievable))
467        .collect();
468    let total = outcomes.len();
469    if total == 0 {
470        return Status::Error;
471    }
472    let retr = outcomes.iter().filter(|o| **o == Some(true)).count();
473    let unret = outcomes.iter().filter(|o| **o == Some(false)).count();
474    let err = outcomes.iter().filter(|o| o.is_none()).count();
475    if err == total {
476        Status::Error
477    } else if retr == total {
478        Status::Retrievable
479    } else if retr == 0 && unret + err == total {
480        Status::Unretrievable
481    } else {
482        Status::Partial
483    }
484}
485
486/// Walk the manifest at `report.reference`, probe each leaf chunk via
487/// `GET /chunks/{addr}` from every vantage, and attach the result to
488/// the report. When the per-vantage overlay is known (already fetched
489/// in [`check_multi_vantage`]), each [`ChunkVantage`] also carries the
490/// proximity between that vantage and the chunk.
491pub async fn drill_down(
492    mut report: Report,
493    bees: &[String],
494    timeout: Duration,
495    concurrency: usize,
496) -> Result<Report> {
497    let r = parse_reference(&report.reference)?;
498    // Use the first vantage as the source-of-truth for manifest walking.
499    let walker_bee = bees.first().context("no bee URL for drill-down")?;
500    let walker = make_bee(walker_bee, timeout)?;
501
502    let addresses = collect_chunk_addresses(&walker, &r).await?;
503
504    // Map vantage URL → its (already-fetched) overlay bytes, for
505    // per-chunk proximity tagging without re-hitting `/addresses`.
506    let overlays: BTreeMap<String, [u8; 32]> = report
507        .vantages
508        .iter()
509        .filter_map(|v| {
510            v.overlay
511                .as_deref()
512                .and_then(decode_overlay)
513                .map(|o| (v.bee_url.clone(), o))
514        })
515        .collect();
516
517    let clients: Vec<(String, Client)> = bees
518        .iter()
519        .map(|u| make_bee(u, timeout).map(|b| (u.clone(), b)))
520        .collect::<Result<_>>()?;
521
522    let sem = Arc::new(Semaphore::new(concurrency.max(1)));
523    let mut probes = Vec::with_capacity(addresses.len());
524
525    let mut futs = FuturesUnordered::new();
526    for addr in addresses {
527        let sem = sem.clone();
528        let clients = clients.clone();
529        let overlays = overlays.clone();
530        futs.push(async move {
531            let chunk_bytes = first_32_of_ref(&addr);
532            let mut per_vantage = BTreeMap::new();
533            for (url, bee) in &clients {
534                let _permit = sem.acquire().await.expect("semaphore not closed");
535                let started = Instant::now();
536                let res = bee.file().download_chunk(&addr, None).await;
537                let elapsed_ms = started.elapsed().as_millis() as u64;
538                let prox = overlays
539                    .get(url)
540                    .map(|o| proximity(o, &chunk_bytes));
541                let cv = match res {
542                    Ok(_) => ChunkVantage {
543                        found: true,
544                        elapsed_ms,
545                        error: None,
546                        proximity: prox,
547                    },
548                    Err(e) => ChunkVantage {
549                        found: false,
550                        elapsed_ms,
551                        error: Some(format!("{e}")),
552                        proximity: prox,
553                    },
554                };
555                per_vantage.insert(url.clone(), cv);
556            }
557            let hex = addr.to_hex();
558            let neighborhood = hex.chars().take(2).collect::<String>();
559            ChunkProbe {
560                address: hex,
561                neighborhood,
562                per_vantage,
563            }
564        });
565    }
566
567    while let Some(p) = futs.next().await {
568        probes.push(p);
569    }
570    probes.sort_by(|a, b| a.address.cmp(&b.address));
571    report.chunk_stats = Some(compute_chunk_stats(&probes));
572    report.chunks = Some(probes);
573    Ok(report)
574}
575
576/// Roll up per-vantage + per-neighborhood timing statistics from a
577/// flat list of [`ChunkProbe`]s. Pure function; runs in milliseconds
578/// even for the 1000-chunk cap.
579pub fn compute_chunk_stats(probes: &[ChunkProbe]) -> ChunkStats {
580    // (url → Vec<elapsed_ms for found chunks>, found_count, missing_count)
581    let mut per_vantage: BTreeMap<String, (Vec<u64>, usize, usize)> = BTreeMap::new();
582    let mut per_neighborhood: BTreeMap<String, (Vec<u64>, usize, usize)> = BTreeMap::new();
583
584    for p in probes {
585        for (url, cv) in &p.per_vantage {
586            let entry = per_vantage.entry(url.clone()).or_default();
587            if cv.found {
588                entry.0.push(cv.elapsed_ms);
589                entry.1 += 1;
590            } else {
591                entry.2 += 1;
592            }
593            let n = per_neighborhood.entry(p.neighborhood.clone()).or_default();
594            if cv.found {
595                n.0.push(cv.elapsed_ms);
596                n.1 += 1;
597            } else {
598                n.2 += 1;
599            }
600        }
601    }
602
603    let to_row = |(latencies, found, missing): (Vec<u64>, usize, usize)| -> ChunkStatRow {
604        let mut sorted = latencies;
605        sorted.sort_unstable();
606        let p = |q: f64| -> Option<u64> {
607            if sorted.is_empty() {
608                None
609            } else {
610                let idx = ((sorted.len() as f64 - 1.0) * q).round() as usize;
611                Some(sorted[idx])
612            }
613        };
614        ChunkStatRow {
615            total: found + missing,
616            found,
617            missing,
618            elapsed_p50_ms: p(0.50),
619            elapsed_p95_ms: p(0.95),
620            elapsed_max_ms: sorted.last().copied(),
621        }
622    };
623
624    ChunkStats {
625        per_vantage: per_vantage.into_iter().map(|(k, v)| (k, to_row(v))).collect(),
626        per_neighborhood: per_neighborhood
627            .into_iter()
628            .map(|(k, v)| (k, to_row(v)))
629            .collect(),
630    }
631}
632
633/// Annotate each vantage with `target_proximity` (PO between its
634/// overlay and `target_overlay_hex`) and sort the vantages array by
635/// that proximity in descending order — closest first. No-op when
636/// `target_overlay_hex` doesn't decode. Added in 0.4.
637pub fn annotate_target_overlay(mut report: Report, target_overlay_hex: &str) -> Report {
638    let Some(target) = decode_overlay(target_overlay_hex) else {
639        return report;
640    };
641    for v in &mut report.vantages {
642        if let Some(o) = v.overlay.as_deref().and_then(decode_overlay) {
643            v.target_proximity = Some(proximity(&o, &target));
644        }
645    }
646    report
647        .vantages
648        .sort_by(|a, b| b.target_proximity.cmp(&a.target_proximity));
649    report
650}
651
652fn first_32_of_ref(r: &Reference) -> [u8; 32] {
653    let mut out = [0u8; 32];
654    out.copy_from_slice(&r.as_bytes()[..32]);
655    out
656}
657
658/// BFS-walk the manifest starting at `root`, collecting both child
659/// manifest chunk addresses and content (`target_address`) addresses.
660/// Capped at [`MAX_CHUNKS`] to bound work for pathological cases. If
661/// `root` isn't a manifest, returns just `[root]`.
662async fn collect_chunk_addresses(bee: &Client, root: &Reference) -> Result<Vec<Reference>> {
663    let mut addresses: Vec<Reference> = vec![root.clone()];
664    let mut seen: std::collections::HashSet<String> = std::collections::HashSet::new();
665    seen.insert(root.to_hex());
666
667    let mut queue: std::collections::VecDeque<Reference> = std::collections::VecDeque::new();
668    queue.push_back(root.clone());
669
670    while let Some(addr) = queue.pop_front() {
671        if addresses.len() >= MAX_CHUNKS {
672            break;
673        }
674        let bytes = match bee.file().download_chunk(&addr, None).await {
675            Ok(b) => b,
676            // If we can't fetch a manifest node, we can't walk deeper. The
677            // outer probe loop will still record the chunk-level miss.
678            Err(_) => continue,
679        };
680        let Ok(node) = unmarshal(&bytes, addr.as_bytes()) else {
681            // Root might be raw content; deeper nodes that don't parse
682            // are leaves — both fine to skip.
683            continue;
684        };
685        // Content reference at this node — probe it but don't recurse.
686        if !is_null_address(&node.target_address) {
687            if let Ok(r) = Reference::new(&node.target_address) {
688                if seen.insert(r.to_hex()) {
689                    addresses.push(r);
690                }
691            }
692        }
693        // Child manifest chunks — probe and recurse.
694        for fork in node.forks.values() {
695            if let Some(sa) = fork.node.self_address {
696                if let Ok(r) = Reference::new(&sa) {
697                    if seen.insert(r.to_hex()) {
698                        addresses.push(r.clone());
699                        queue.push_back(r);
700                    }
701                }
702            }
703        }
704    }
705    Ok(addresses)
706}
707
708pub async fn reseed(req: ReseedRequest) -> Result<()> {
709    let bee = make_bee(&req.bee_url, req.timeout)?;
710    let r = parse_reference(&req.reference)?;
711    let batch = BatchId::from_hex(&req.batch_id)
712        .map_err(|e| anyhow!("invalid batch id {}: {e}", req.batch_id))?;
713    bee.api().reupload(&r, &batch).await?;
714    Ok(())
715}
716
717/// Pre-flight check before `--reseed`: look up `GET /stamps/{id}` on the
718/// target Bee and surface usable/expiry concerns. Mirrors the spirit of
719/// ipfs-check's "stale records" UX hint — flag freshness problems
720/// before doing the operation.
721pub async fn check_stamp(
722    bee_url: &str,
723    batch_id: &str,
724    timeout: Duration,
725) -> Result<StampStatus> {
726    let bee = make_bee(bee_url, timeout)?;
727    let batch = BatchId::from_hex(batch_id)
728        .map_err(|e| anyhow!("invalid batch id {batch_id}: {e}"))?;
729    let pb = bee
730        .postage()
731        .get_postage_batch(&batch)
732        .await
733        .map_err(anyhow::Error::from)?;
734
735    let mut warnings = Vec::new();
736    if !pb.exists {
737        warnings.push("batch not known to this Bee".to_string());
738    }
739    if !pb.usable {
740        warnings.push("batch not usable yet (chain may be syncing)".to_string());
741    }
742    if pb.batch_ttl >= 0 && pb.batch_ttl < STAMP_LOW_TTL_SECS {
743        warnings.push(format!(
744            "batch TTL low: ~{} (re-seed may not outlive the batch)",
745            humanize_secs(pb.batch_ttl)
746        ));
747    }
748
749    let healthy = pb.exists && pb.usable && (pb.batch_ttl < 0 || pb.batch_ttl >= STAMP_LOW_TTL_SECS);
750
751    Ok(StampStatus {
752        batch_id: batch_id.to_string(),
753        exists: pb.exists,
754        usable: pb.usable,
755        batch_ttl: pb.batch_ttl,
756        healthy,
757        warnings,
758    })
759}
760
761fn humanize_secs(s: i64) -> String {
762    if s < 0 {
763        return "unknown".to_string();
764    }
765    let s = s as u64;
766    if s >= 86_400 {
767        format!("{} day(s)", s / 86_400)
768    } else if s >= 3_600 {
769        format!("{} hour(s)", s / 3_600)
770    } else if s >= 60 {
771        format!("{} min", s / 60)
772    } else {
773        format!("{}s", s)
774    }
775}
776
777pub fn render_report(report: &Report, fmt: OutputFormat) -> String {
778    match fmt {
779        OutputFormat::Json => {
780            serde_json::to_string_pretty(report).expect("report serialization") + "\n"
781        }
782        OutputFormat::Text => render_text(report),
783    }
784}
785
786fn render_text(r: &Report) -> String {
787    use std::fmt::Write;
788    let mut out = String::new();
789    let _ = writeln!(out, "ref     {}", r.reference);
790    let _ = writeln!(out, "status  {:?}", r.status);
791    let _ = writeln!(out);
792    let _ = writeln!(out, "vantages:");
793    let url_w = r.vantages.iter().map(|v| v.bee_url.len()).max().unwrap_or(20);
794    for v in &r.vantages {
795        let state = match (v.retrievable, &v.error) {
796            (Some(true), _) => "retrievable",
797            (Some(false), _) => "unretrievable",
798            (None, Some(_)) => "error",
799            (None, None) => "unknown",
800        };
801        let meta = vantage_meta(v);
802        let _ = writeln!(
803            out,
804            "  {:<url_w$}  {:<14} {:>6} ms{}{}",
805            v.bee_url,
806            state,
807            v.elapsed_ms,
808            if meta.is_empty() { String::new() } else { format!("  {meta}") },
809            v.error
810                .as_deref()
811                .map(|e| format!("  ({e})"))
812                .unwrap_or_default(),
813            url_w = url_w
814        );
815    }
816    if !r.gateways.is_empty() {
817        let _ = writeln!(out);
818        let _ = writeln!(out, "gateways:");
819        let url_w = r.gateways.iter().map(|g| g.url.len()).max().unwrap_or(20);
820        for g in &r.gateways {
821            let state = match (g.retrievable, &g.error) {
822                (Some(true), _) => "retrievable",
823                (Some(false), _) => "unretrievable",
824                (None, _) => "error",
825            };
826            let code = g
827                .status_code
828                .map(|c| format!("  HTTP {c}"))
829                .unwrap_or_default();
830            let _ = writeln!(
831                out,
832                "  {:<url_w$}  {:<14} {:>6} ms{}{}",
833                g.url,
834                state,
835                g.elapsed_ms,
836                code,
837                g.error
838                    .as_deref()
839                    .map(|e| format!("  ({e})"))
840                    .unwrap_or_default(),
841                url_w = url_w
842            );
843        }
844    }
845    if let Some(res) = &r.resolution {
846        let _ = writeln!(out);
847        match res {
848            Resolution::Feed { owner, topic, resolved_reference } => {
849                let _ = writeln!(
850                    out,
851                    "resolved feed owner={owner} topic={topic} -> {resolved_reference}",
852                );
853            }
854        }
855    }
856    if let Some(stats) = &r.chunk_stats {
857        let _ = writeln!(out);
858        let _ = writeln!(out, "chunk stats per vantage:");
859        let url_w = stats
860            .per_vantage
861            .keys()
862            .map(|k| k.len())
863            .max()
864            .unwrap_or(20);
865        for (url, row) in &stats.per_vantage {
866            let _ = writeln!(
867                out,
868                "  {:<url_w$}  found {:>3}/{:<3}  p50 {:>5} ms · p95 {:>5} ms · max {:>5} ms",
869                url,
870                row.found,
871                row.total,
872                fmt_ms(row.elapsed_p50_ms),
873                fmt_ms(row.elapsed_p95_ms),
874                fmt_ms(row.elapsed_max_ms),
875                url_w = url_w
876            );
877        }
878        if !stats.per_neighborhood.is_empty() {
879            let _ = writeln!(out);
880            let _ = writeln!(out, "chunk stats per neighborhood:");
881            let mut rows: Vec<(&String, &ChunkStatRow)> =
882                stats.per_neighborhood.iter().collect();
883            rows.sort_by(|a, b| {
884                b.1.elapsed_p95_ms
885                    .unwrap_or(0)
886                    .cmp(&a.1.elapsed_p95_ms.unwrap_or(0))
887            });
888            for (nb, row) in rows.iter().take(10) {
889                let _ = writeln!(
890                    out,
891                    "  nb {}  found {:>3}/{:<3}  p50 {:>5} ms · p95 {:>5} ms",
892                    nb,
893                    row.found,
894                    row.total,
895                    fmt_ms(row.elapsed_p50_ms),
896                    fmt_ms(row.elapsed_p95_ms),
897                );
898            }
899            if rows.len() > 10 {
900                let _ = writeln!(out, "  ... {} more neighborhoods", rows.len() - 10);
901            }
902        }
903    }
904    if let Some(chunks) = &r.chunks {
905        let _ = writeln!(out);
906        let _ = writeln!(out, "chunks: {} probed", chunks.len());
907        let mut missing = 0usize;
908        for c in chunks {
909            let missing_in: Vec<String> = c
910                .per_vantage
911                .iter()
912                .filter(|(_, cv)| !cv.found)
913                .map(|(u, cv)| match cv.proximity {
914                    Some(p) => format!("{u} (PO {p})"),
915                    None => u.clone(),
916                })
917                .collect();
918            if !missing_in.is_empty() {
919                missing += 1;
920                let _ = writeln!(
921                    out,
922                    "  [{}] {}  missing on: {}",
923                    c.neighborhood,
924                    short(&c.address),
925                    missing_in.join(", ")
926                );
927            }
928        }
929        if missing == 0 {
930            let _ = writeln!(out, "  all chunks present on all vantages");
931        } else {
932            let _ = writeln!(out, "  {missing} chunk(s) missing on at least one vantage");
933        }
934    }
935    out
936}
937
938/// Single-line metadata trailer for a vantage: overlay neighborhood,
939/// proximity to root, target-proximity (when `--target-overlay` was
940/// set), Bee version. Compactly formatted; pieces that weren't
941/// fetched are silently dropped.
942fn vantage_meta(v: &VantageResult) -> String {
943    let mut parts: Vec<String> = Vec::new();
944    if let Some(o) = &v.overlay {
945        let neigh = o.chars().take(2).collect::<String>();
946        let short_overlay = short_overlay(o);
947        parts.push(format!("overlay {short_overlay} (nb {neigh})"));
948    }
949    if let Some(p) = v.proximity_to_root {
950        parts.push(format!("PO {p}"));
951    }
952    if let Some(p) = v.target_proximity {
953        parts.push(format!("tgtPO {p}"));
954    }
955    if let Some(ver) = &v.bee_version {
956        parts.push(format!("v{ver}"));
957    }
958    if parts.is_empty() {
959        String::new()
960    } else {
961        format!("· {}", parts.join(" · "))
962    }
963}
964
965fn fmt_ms(v: Option<u64>) -> String {
966    match v {
967        Some(ms) => format!("{ms}"),
968        None => "—".to_string(),
969    }
970}
971
972fn short_overlay(hex: &str) -> String {
973    let s = hex.strip_prefix("0x").unwrap_or(hex);
974    if s.len() > 12 {
975        format!("{}…{}", &s[..6], &s[s.len() - 4..])
976    } else {
977        s.to_string()
978    }
979}
980
981/// Human-readable summary of a stamp pre-flight check, suitable for
982/// stderr before a `--reseed` operation.
983pub fn render_stamp_status(s: &StampStatus) -> String {
984    use std::fmt::Write;
985    let mut out = String::new();
986    let ttl = if s.batch_ttl < 0 {
987        "unknown".to_string()
988    } else {
989        humanize_secs(s.batch_ttl)
990    };
991    let header = if s.healthy { "stamp OK" } else { "stamp warning" };
992    let _ = writeln!(
993        out,
994        "{header}: batch {} · usable={} · ttl={}",
995        short_overlay(&s.batch_id),
996        s.usable,
997        ttl,
998    );
999    for w in &s.warnings {
1000        let _ = writeln!(out, "  · {w}");
1001    }
1002    out
1003}
1004
1005fn short(hex: &str) -> String {
1006    if hex.len() > 16 {
1007        format!("{}…{}", &hex[..8], &hex[hex.len() - 4..])
1008    } else {
1009        hex.to_string()
1010    }
1011}