Skip to main content

bee_check/
lib.rs

1//! Core check engine for `bee-check`.
2//!
3//! The CLI in `main.rs` and (in future) the `bee-check-web` SPA both
4//! consume the same [`Report`] shape produced here. See `SPEC.md` for
5//! the JSON shape.
6
7use std::collections::BTreeMap;
8use std::sync::Arc;
9use std::time::{Duration, Instant};
10
11use anyhow::{Context, Result, anyhow};
12use bee::Client;
13use bee::manifest::{is_null_address, unmarshal};
14use bee::swarm::gsoc::proximity;
15use bee::swarm::{BatchId, Reference};
16use futures::stream::{FuturesUnordered, StreamExt};
17use serde::{Deserialize, Serialize};
18use tokio::sync::Semaphore;
19
/// Address budget for a manifest walk: `collect_chunk_addresses` stops
/// dequeuing further manifest nodes once it has gathered this many
/// chunk addresses.
const MAX_CHUNKS: usize = 1000;
/// Stamp is "low TTL" below this many seconds (~24h). A re-seed
/// against a stamp below this threshold may not outlive the batch.
const STAMP_LOW_TTL_SECS: i64 = 86_400;
24
/// Output flavour selected by the caller of [`render_report`].
#[derive(Copy, Clone, Debug)]
pub enum OutputFormat {
    /// Human-readable layout produced by `render_text`.
    Text,
    /// Pretty-printed JSON of the [`Report`], followed by a newline.
    Json,
}
30
/// Outcome enum reported per vantage and aggregated for the whole check.
///
/// Serialized in `snake_case`. The variant notes below describe how
/// `aggregate_status` assigns the overall value.
#[derive(Copy, Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum Status {
    /// Every vantage answered "retrievable".
    Retrievable,
    /// No vantage answered "retrievable", and at least one vantage
    /// returned a definite answer (i.e. not all of them errored).
    Unretrievable,
    /// Some, but not all, vantages answered "retrievable".
    Partial,
    /// Every vantage errored, or there were no vantages at all.
    Error,
}
40
/// Top-level result of a check, built by [`check_multi_vantage`] and
/// optionally extended with per-chunk data by [`drill_down`].
#[derive(Debug, Serialize, Deserialize)]
pub struct Report {
    /// The reference string exactly as the caller supplied it.
    pub reference: String,
    /// Aggregate outcome across all vantages.
    pub status: Status,
    /// One entry per probed Bee URL, sorted by `bee_url`.
    pub vantages: Vec<VantageResult>,
    /// Populated only when `--per-chunk` was requested.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub chunks: Option<Vec<ChunkProbe>>,
    /// Set to `SPEC_VERSION` when the report is built.
    pub spec_version: u32,
}
51
/// Result of probing one Bee node (a "vantage") for the reference.
#[derive(Debug, Serialize, Deserialize)]
pub struct VantageResult {
    /// URL of the probed Bee, as passed in by the caller.
    pub bee_url: String,
    /// `None` means the call errored (see `error`).
    pub retrievable: Option<bool>,
    /// Wall-clock milliseconds for the concurrently-run retrievability,
    /// addresses and health calls; `0` when the client could not even
    /// be constructed.
    pub elapsed_ms: u64,
    /// Error text from client construction or the retrievability call.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<String>,
    /// Hex overlay address of the probed Bee, from `GET /addresses`. The
    /// first 2 hex chars are the neighborhood the node sits in.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub overlay: Option<String>,
    /// Bee semver, from `GET /health` (Bee's `version` field).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub bee_version: Option<String>,
    /// Proximity order between the probe node's overlay and the target
    /// reference. Higher = closer to where the chunk is stored. PO 0
    /// means the probe is not in the chunk's neighborhood at all.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub proximity_to_root: Option<u32>,
}
73
/// Per-chunk probe outcome produced by [`drill_down`].
#[derive(Debug, Serialize, Deserialize)]
pub struct ChunkProbe {
    /// Hex form of the chunk address.
    pub address: String,
    /// First 2 hex chars — neighborhood the chunk should land in.
    pub neighborhood: String,
    /// Probe result for this chunk, keyed by Bee URL.
    pub per_vantage: BTreeMap<String, ChunkVantage>,
}
81
/// Outcome of fetching a single chunk from a single vantage.
#[derive(Debug, Serialize, Deserialize)]
pub struct ChunkVantage {
    /// `true` when the chunk download succeeded.
    pub found: bool,
    /// Wall-clock milliseconds spent in the chunk download call.
    pub elapsed_ms: u64,
    /// Error text from the download call when it failed.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<String>,
    /// Proximity between this vantage's overlay and the chunk address.
    /// Unset when the vantage's overlay couldn't be fetched.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub proximity: Option<u32>,
}
93
/// Inputs for [`reseed`].
pub struct ReseedRequest {
    /// Hex reference to re-upload; parsed by `parse_reference`.
    pub reference: String,
    /// URL of the Bee node that performs the re-upload.
    pub bee_url: String,
    /// Hex postage batch id used to stamp the re-upload.
    pub batch_id: String,
    /// Timeout applied to the HTTP client built for this request.
    pub timeout: Duration,
}
100
/// Result of the `--reseed --stamp <id>` pre-flight check. Mirrors
/// ipfs-check's "stale records" hint in spirit: surface upstream-data
/// problems before doing the operation.
#[derive(Debug, Serialize, Deserialize)]
pub struct StampStatus {
    /// Batch id string as supplied to [`check_stamp`].
    pub batch_id: String,
    /// `exists` flag copied from the postage batch lookup.
    pub exists: bool,
    /// `usable` flag copied from the postage batch lookup.
    pub usable: bool,
    /// Batch TTL copied from the postage batch lookup; compared against
    /// `STAMP_LOW_TTL_SECS` (seconds). Negative is treated as unknown.
    pub batch_ttl: i64,
    /// `usable && exists && batch_ttl >= STAMP_LOW_TTL_SECS` (or
    /// `batch_ttl < 0`, meaning unknown/infinite).
    pub healthy: bool,
    /// Human-readable concerns collected during the check; omitted
    /// from serialized output when empty.
    #[serde(skip_serializing_if = "Vec::is_empty")]
    pub warnings: Vec<String>,
}
116
/// Value stamped into `Report::spec_version` by `check_multi_vantage`.
const SPEC_VERSION: u32 = 1;
118
119fn parse_reference(s: &str) -> Result<Reference> {
120    Reference::from_hex(s).map_err(|e| anyhow!("invalid reference {s}: {e}"))
121}
122
123fn make_bee(url: &str, timeout: Duration) -> Result<Client> {
124    let http = reqwest::Client::builder()
125        .timeout(timeout)
126        .build()
127        .context("building http client")?;
128    Client::with_http_client(url, http).map_err(|e| anyhow!("invalid bee url {url}: {e}"))
129}
130
131/// Probe `/stewardship/{ref}` across all vantages in parallel. Alongside
132/// the retrievability call, fetches `/addresses` (overlay) and
133/// `/health` (Bee version) so the rendered report can surface which
134/// neighborhood the probe came from. These ancillary calls are
135/// best-effort: failure leaves `overlay` / `bee_version` unset rather
136/// than failing the whole vantage.
137pub async fn check_multi_vantage(
138    reference: &str,
139    bees: &[String],
140    timeout: Duration,
141) -> Result<Report> {
142    let r = parse_reference(reference)?;
143    let root_bytes = first_32(&r);
144
145    let mut futs = FuturesUnordered::new();
146    for bee_url in bees {
147        let bee_url = bee_url.clone();
148        let r = r.clone();
149        futs.push(async move {
150            let bee = match make_bee(&bee_url, timeout) {
151                Ok(b) => b,
152                Err(e) => {
153                    return VantageResult {
154                        bee_url,
155                        retrievable: None,
156                        elapsed_ms: 0,
157                        error: Some(format!("{e:#}")),
158                        overlay: None,
159                        bee_version: None,
160                        proximity_to_root: None,
161                    };
162                }
163            };
164            let started = Instant::now();
165            let api = bee.api();
166            let debug = bee.debug();
167            let (stew_res, addr_res, health_res) = tokio::join!(
168                api.is_retrievable(&r),
169                debug.addresses(),
170                debug.health(),
171            );
172            let elapsed_ms = started.elapsed().as_millis() as u64;
173
174            let overlay = addr_res.ok().map(|a| a.overlay);
175            let bee_version = health_res.ok().map(|h| h.version);
176            let proximity_to_root = overlay
177                .as_deref()
178                .and_then(decode_overlay)
179                .map(|o| proximity(&o, &root_bytes));
180
181            match stew_res {
182                Ok(ok) => VantageResult {
183                    bee_url,
184                    retrievable: Some(ok),
185                    elapsed_ms,
186                    error: None,
187                    overlay,
188                    bee_version,
189                    proximity_to_root,
190                },
191                Err(e) => VantageResult {
192                    bee_url,
193                    retrievable: None,
194                    elapsed_ms,
195                    error: Some(format!("{e}")),
196                    overlay,
197                    bee_version,
198                    proximity_to_root,
199                },
200            }
201        });
202    }
203
204    let mut vantages = Vec::with_capacity(bees.len());
205    while let Some(v) = futs.next().await {
206        vantages.push(v);
207    }
208    vantages.sort_by(|a, b| a.bee_url.cmp(&b.bee_url));
209
210    let status = aggregate_status(&vantages);
211    Ok(Report {
212        reference: reference.to_string(),
213        status,
214        vantages,
215        chunks: None,
216        spec_version: SPEC_VERSION,
217    })
218}
219
/// Strip the optional `0x` prefix, decode hex, return the first 32
/// bytes (overlay/reference length). Returns `None` on malformed hex
/// or short input.
///
/// Only the first 64 hex digits are inspected; anything after them is
/// ignored. Decoding works on bytes rather than `str` slices so that
/// non-ASCII input is rejected instead of panicking on a char boundary,
/// and each digit is validated individually so sign characters (which
/// `u8::from_str_radix` would accept) are rejected too.
fn decode_overlay(hex: &str) -> Option<[u8; 32]> {
    /// Value of one ASCII hex digit, or `None` for any other byte.
    fn nibble(byte: u8) -> Option<u8> {
        // `to_digit(16)` yields 0..=15, so the narrowing cast is lossless.
        char::from(byte).to_digit(16).map(|d| d as u8)
    }

    let digits = hex.strip_prefix("0x").unwrap_or(hex).as_bytes();
    if digits.len() < 64 {
        return None;
    }
    let mut out = [0u8; 32];
    for (slot, pair) in out.iter_mut().zip(digits.chunks_exact(2)) {
        *slot = (nibble(pair[0])? << 4) | nibble(pair[1])?;
    }
    Some(out)
}
235
236fn first_32(r: &Reference) -> [u8; 32] {
237    let mut out = [0u8; 32];
238    out.copy_from_slice(&r.as_bytes()[..32]);
239    out
240}
241
242fn aggregate_status(vantages: &[VantageResult]) -> Status {
243    let total = vantages.len();
244    if total == 0 {
245        return Status::Error;
246    }
247    let mut retr = 0usize;
248    let mut unret = 0usize;
249    let mut err = 0usize;
250    for v in vantages {
251        match v.retrievable {
252            Some(true) => retr += 1,
253            Some(false) => unret += 1,
254            None => err += 1,
255        }
256    }
257    if err == total {
258        Status::Error
259    } else if retr == total {
260        Status::Retrievable
261    } else if retr == 0 && unret + err == total {
262        Status::Unretrievable
263    } else {
264        Status::Partial
265    }
266}
267
268/// Walk the manifest at `report.reference`, probe each leaf chunk via
269/// `GET /chunks/{addr}` from every vantage, and attach the result to
270/// the report. When the per-vantage overlay is known (already fetched
271/// in [`check_multi_vantage`]), each [`ChunkVantage`] also carries the
272/// proximity between that vantage and the chunk.
273pub async fn drill_down(
274    mut report: Report,
275    bees: &[String],
276    timeout: Duration,
277    concurrency: usize,
278) -> Result<Report> {
279    let r = parse_reference(&report.reference)?;
280    // Use the first vantage as the source-of-truth for manifest walking.
281    let walker_bee = bees.first().context("no bee URL for drill-down")?;
282    let walker = make_bee(walker_bee, timeout)?;
283
284    let addresses = collect_chunk_addresses(&walker, &r).await?;
285
286    // Map vantage URL → its (already-fetched) overlay bytes, for
287    // per-chunk proximity tagging without re-hitting `/addresses`.
288    let overlays: BTreeMap<String, [u8; 32]> = report
289        .vantages
290        .iter()
291        .filter_map(|v| {
292            v.overlay
293                .as_deref()
294                .and_then(decode_overlay)
295                .map(|o| (v.bee_url.clone(), o))
296        })
297        .collect();
298
299    let clients: Vec<(String, Client)> = bees
300        .iter()
301        .map(|u| make_bee(u, timeout).map(|b| (u.clone(), b)))
302        .collect::<Result<_>>()?;
303
304    let sem = Arc::new(Semaphore::new(concurrency.max(1)));
305    let mut probes = Vec::with_capacity(addresses.len());
306
307    let mut futs = FuturesUnordered::new();
308    for addr in addresses {
309        let sem = sem.clone();
310        let clients = clients.clone();
311        let overlays = overlays.clone();
312        futs.push(async move {
313            let chunk_bytes = first_32_of_ref(&addr);
314            let mut per_vantage = BTreeMap::new();
315            for (url, bee) in &clients {
316                let _permit = sem.acquire().await.expect("semaphore not closed");
317                let started = Instant::now();
318                let res = bee.file().download_chunk(&addr, None).await;
319                let elapsed_ms = started.elapsed().as_millis() as u64;
320                let prox = overlays
321                    .get(url)
322                    .map(|o| proximity(o, &chunk_bytes));
323                let cv = match res {
324                    Ok(_) => ChunkVantage {
325                        found: true,
326                        elapsed_ms,
327                        error: None,
328                        proximity: prox,
329                    },
330                    Err(e) => ChunkVantage {
331                        found: false,
332                        elapsed_ms,
333                        error: Some(format!("{e}")),
334                        proximity: prox,
335                    },
336                };
337                per_vantage.insert(url.clone(), cv);
338            }
339            let hex = addr.to_hex();
340            let neighborhood = hex.chars().take(2).collect::<String>();
341            ChunkProbe {
342                address: hex,
343                neighborhood,
344                per_vantage,
345            }
346        });
347    }
348
349    while let Some(p) = futs.next().await {
350        probes.push(p);
351    }
352    probes.sort_by(|a, b| a.address.cmp(&b.address));
353    report.chunks = Some(probes);
354    Ok(report)
355}
356
357fn first_32_of_ref(r: &Reference) -> [u8; 32] {
358    let mut out = [0u8; 32];
359    out.copy_from_slice(&r.as_bytes()[..32]);
360    out
361}
362
363/// BFS-walk the manifest starting at `root`, collecting both child
364/// manifest chunk addresses and content (`target_address`) addresses.
365/// Capped at [`MAX_CHUNKS`] to bound work for pathological cases. If
366/// `root` isn't a manifest, returns just `[root]`.
367async fn collect_chunk_addresses(bee: &Client, root: &Reference) -> Result<Vec<Reference>> {
368    let mut addresses: Vec<Reference> = vec![root.clone()];
369    let mut seen: std::collections::HashSet<String> = std::collections::HashSet::new();
370    seen.insert(root.to_hex());
371
372    let mut queue: std::collections::VecDeque<Reference> = std::collections::VecDeque::new();
373    queue.push_back(root.clone());
374
375    while let Some(addr) = queue.pop_front() {
376        if addresses.len() >= MAX_CHUNKS {
377            break;
378        }
379        let bytes = match bee.file().download_chunk(&addr, None).await {
380            Ok(b) => b,
381            // If we can't fetch a manifest node, we can't walk deeper. The
382            // outer probe loop will still record the chunk-level miss.
383            Err(_) => continue,
384        };
385        let Ok(node) = unmarshal(&bytes, addr.as_bytes()) else {
386            // Root might be raw content; deeper nodes that don't parse
387            // are leaves — both fine to skip.
388            continue;
389        };
390        // Content reference at this node — probe it but don't recurse.
391        if !is_null_address(&node.target_address) {
392            if let Ok(r) = Reference::new(&node.target_address) {
393                if seen.insert(r.to_hex()) {
394                    addresses.push(r);
395                }
396            }
397        }
398        // Child manifest chunks — probe and recurse.
399        for fork in node.forks.values() {
400            if let Some(sa) = fork.node.self_address {
401                if let Ok(r) = Reference::new(&sa) {
402                    if seen.insert(r.to_hex()) {
403                        addresses.push(r.clone());
404                        queue.push_back(r);
405                    }
406                }
407            }
408        }
409    }
410    Ok(addresses)
411}
412
413pub async fn reseed(req: ReseedRequest) -> Result<()> {
414    let bee = make_bee(&req.bee_url, req.timeout)?;
415    let r = parse_reference(&req.reference)?;
416    let batch = BatchId::from_hex(&req.batch_id)
417        .map_err(|e| anyhow!("invalid batch id {}: {e}", req.batch_id))?;
418    bee.api().reupload(&r, &batch).await?;
419    Ok(())
420}
421
422/// Pre-flight check before `--reseed`: look up `GET /stamps/{id}` on the
423/// target Bee and surface usable/expiry concerns. Mirrors the spirit of
424/// ipfs-check's "stale records" UX hint — flag freshness problems
425/// before doing the operation.
426pub async fn check_stamp(
427    bee_url: &str,
428    batch_id: &str,
429    timeout: Duration,
430) -> Result<StampStatus> {
431    let bee = make_bee(bee_url, timeout)?;
432    let batch = BatchId::from_hex(batch_id)
433        .map_err(|e| anyhow!("invalid batch id {batch_id}: {e}"))?;
434    let pb = bee
435        .postage()
436        .get_postage_batch(&batch)
437        .await
438        .map_err(anyhow::Error::from)?;
439
440    let mut warnings = Vec::new();
441    if !pb.exists {
442        warnings.push("batch not known to this Bee".to_string());
443    }
444    if !pb.usable {
445        warnings.push("batch not usable yet (chain may be syncing)".to_string());
446    }
447    if pb.batch_ttl >= 0 && pb.batch_ttl < STAMP_LOW_TTL_SECS {
448        warnings.push(format!(
449            "batch TTL low: ~{} (re-seed may not outlive the batch)",
450            humanize_secs(pb.batch_ttl)
451        ));
452    }
453
454    let healthy = pb.exists && pb.usable && (pb.batch_ttl < 0 || pb.batch_ttl >= STAMP_LOW_TTL_SECS);
455
456    Ok(StampStatus {
457        batch_id: batch_id.to_string(),
458        exists: pb.exists,
459        usable: pb.usable,
460        batch_ttl: pb.batch_ttl,
461        healthy,
462        warnings,
463    })
464}
465
/// Render a duration in seconds using its largest whole unit
/// (days, hours, minutes, or seconds), rounding down.
///
/// Negative input is rendered as `"unknown"`.
fn humanize_secs(s: i64) -> String {
    const MINUTE: u64 = 60;
    const HOUR: u64 = 3_600;
    const DAY: u64 = 86_400;

    if s < 0 {
        return String::from("unknown");
    }
    // Non-negative here, so the cast cannot change the value.
    match s as u64 {
        secs if secs >= DAY => format!("{} day(s)", secs / DAY),
        secs if secs >= HOUR => format!("{} hour(s)", secs / HOUR),
        secs if secs >= MINUTE => format!("{} min", secs / MINUTE),
        secs => format!("{}s", secs),
    }
}
481
482pub fn render_report(report: &Report, fmt: OutputFormat) -> String {
483    match fmt {
484        OutputFormat::Json => {
485            serde_json::to_string_pretty(report).expect("report serialization") + "\n"
486        }
487        OutputFormat::Text => render_text(report),
488    }
489}
490
491fn render_text(r: &Report) -> String {
492    use std::fmt::Write;
493    let mut out = String::new();
494    let _ = writeln!(out, "ref     {}", r.reference);
495    let _ = writeln!(out, "status  {:?}", r.status);
496    let _ = writeln!(out);
497    let _ = writeln!(out, "vantages:");
498    let url_w = r.vantages.iter().map(|v| v.bee_url.len()).max().unwrap_or(20);
499    for v in &r.vantages {
500        let state = match (v.retrievable, &v.error) {
501            (Some(true), _) => "retrievable",
502            (Some(false), _) => "unretrievable",
503            (None, Some(_)) => "error",
504            (None, None) => "unknown",
505        };
506        let meta = vantage_meta(v);
507        let _ = writeln!(
508            out,
509            "  {:<url_w$}  {:<14} {:>6} ms{}{}",
510            v.bee_url,
511            state,
512            v.elapsed_ms,
513            if meta.is_empty() { String::new() } else { format!("  {meta}") },
514            v.error
515                .as_deref()
516                .map(|e| format!("  ({e})"))
517                .unwrap_or_default(),
518            url_w = url_w
519        );
520    }
521    if let Some(chunks) = &r.chunks {
522        let _ = writeln!(out);
523        let _ = writeln!(out, "chunks: {} probed", chunks.len());
524        let mut missing = 0usize;
525        for c in chunks {
526            let missing_in: Vec<String> = c
527                .per_vantage
528                .iter()
529                .filter(|(_, cv)| !cv.found)
530                .map(|(u, cv)| match cv.proximity {
531                    Some(p) => format!("{u} (PO {p})"),
532                    None => u.clone(),
533                })
534                .collect();
535            if !missing_in.is_empty() {
536                missing += 1;
537                let _ = writeln!(
538                    out,
539                    "  [{}] {}  missing on: {}",
540                    c.neighborhood,
541                    short(&c.address),
542                    missing_in.join(", ")
543                );
544            }
545        }
546        if missing == 0 {
547            let _ = writeln!(out, "  all chunks present on all vantages");
548        } else {
549            let _ = writeln!(out, "  {missing} chunk(s) missing on at least one vantage");
550        }
551    }
552    out
553}
554
555/// Single-line metadata trailer for a vantage: overlay neighborhood,
556/// proximity to root, Bee version. Compactly formatted; pieces that
557/// weren't fetched are silently dropped.
558fn vantage_meta(v: &VantageResult) -> String {
559    let mut parts: Vec<String> = Vec::new();
560    if let Some(o) = &v.overlay {
561        let neigh = o.chars().take(2).collect::<String>();
562        let short_overlay = short_overlay(o);
563        parts.push(format!("overlay {short_overlay} (nb {neigh})"));
564    }
565    if let Some(p) = v.proximity_to_root {
566        parts.push(format!("PO {p}"));
567    }
568    if let Some(ver) = &v.bee_version {
569        parts.push(format!("v{ver}"));
570    }
571    if parts.is_empty() {
572        String::new()
573    } else {
574        format!("· {}", parts.join(" · "))
575    }
576}
577
/// Abbreviate a hex identifier for display: drop an optional `0x`
/// prefix and, when more than 12 characters remain, keep the first 6
/// and last 4 joined by `…`.
///
/// Lengths are measured in characters rather than bytes so that
/// unexpected non-ASCII input is abbreviated instead of panicking on a
/// char boundary.
fn short_overlay(hex: &str) -> String {
    let s = hex.strip_prefix("0x").unwrap_or(hex);
    let char_count = s.chars().count();
    if char_count > 12 {
        let head: String = s.chars().take(6).collect();
        let tail: String = s.chars().skip(char_count - 4).collect();
        format!("{head}…{tail}")
    } else {
        s.to_string()
    }
}
586
587/// Human-readable summary of a stamp pre-flight check, suitable for
588/// stderr before a `--reseed` operation.
589pub fn render_stamp_status(s: &StampStatus) -> String {
590    use std::fmt::Write;
591    let mut out = String::new();
592    let ttl = if s.batch_ttl < 0 {
593        "unknown".to_string()
594    } else {
595        humanize_secs(s.batch_ttl)
596    };
597    let header = if s.healthy { "stamp OK" } else { "stamp warning" };
598    let _ = writeln!(
599        out,
600        "{header}: batch {} · usable={} · ttl={}",
601        short_overlay(&s.batch_id),
602        s.usable,
603        ttl,
604    );
605    for w in &s.warnings {
606        let _ = writeln!(out, "  · {w}");
607    }
608    out
609}
610
/// Abbreviate a hex string for display: when it is longer than 16
/// characters, keep the first 8 and last 4 joined by `…`; otherwise
/// return it unchanged.
///
/// Lengths are measured in characters rather than bytes so that
/// non-ASCII input (e.g. from a hand-edited, deserialized report) is
/// abbreviated instead of panicking on a char boundary.
fn short(hex: &str) -> String {
    let char_count = hex.chars().count();
    if char_count > 16 {
        let head: String = hex.chars().take(8).collect();
        let tail: String = hex.chars().skip(char_count - 4).collect();
        format!("{head}…{tail}")
    } else {
        hex.to_string()
    }
}