Skip to main content

bee_check/
lib.rs

1//! Core check engine for `bee-check`.
2//!
3//! The CLI in `main.rs` and (in future) the `bee-check-web` SPA both
4//! consume the same [`Report`] shape produced here. See `SPEC.md` for
5//! the JSON shape.
6
7use std::collections::BTreeMap;
8use std::sync::Arc;
9use std::time::{Duration, Instant};
10
11use anyhow::{Context, Result, anyhow};
12use bee::Client;
13use bee::manifest::{is_null_address, unmarshal};
14use bee::swarm::{BatchId, Reference};
15use futures::stream::{FuturesUnordered, StreamExt};
16use serde::{Deserialize, Serialize};
17use tokio::sync::Semaphore;
18
/// Limit on how many chunk addresses `collect_chunk_addresses` gathers
/// while walking a manifest, so pathological manifests cannot cause
/// unbounded work.
const MAX_CHUNKS: usize = 1000;
20
/// How [`render_report`] should serialize a report.
///
/// `PartialEq`/`Eq` are derived so callers and tests can compare formats
/// directly instead of pattern-matching.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum OutputFormat {
    /// Human-readable, column-aligned text.
    Text,
    /// Pretty-printed JSON followed by a trailing newline.
    Json,
}
26
/// Overall outcome of a check, aggregated across all vantages.
///
/// Serialized with `snake_case` variant names.
#[derive(Copy, Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum Status {
    /// Every vantage reported the reference as retrievable.
    Retrievable,
    /// No vantage reported the reference as retrievable, and at least one
    /// vantage answered without an error.
    Unretrievable,
    /// At least one vantage reported retrievable while others did not or
    /// errored.
    Partial,
    /// Every vantage call errored, or there were no vantages at all.
    Error,
}
36
/// Result of one check run: the aggregated status plus the per-vantage
/// (and optionally per-chunk) details it was derived from.
#[derive(Debug, Serialize, Deserialize)]
pub struct Report {
    /// The reference that was checked, as supplied by the caller.
    pub reference: String,
    /// Status aggregated over all entries in `vantages`.
    pub status: Status,
    /// One entry per probed Bee node.
    pub vantages: Vec<VantageResult>,
    /// Populated only when `--per-chunk` was requested.
    /// Omitted from the serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub chunks: Option<Vec<ChunkProbe>>,
    /// Version of the report shape; see `SPEC_VERSION`.
    pub spec_version: u32,
}
47
/// What a single Bee node (vantage) answered for the checked reference.
#[derive(Debug, Serialize, Deserialize)]
pub struct VantageResult {
    /// URL of the Bee node that was queried.
    pub bee_url: String,
    /// `None` means the call errored (see `error`).
    pub retrievable: Option<bool>,
    /// Wall-clock time spent on this vantage, in milliseconds.
    pub elapsed_ms: u64,
    /// Error description when the call failed; omitted from the serialized
    /// output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<String>,
}
57
/// Per-chunk probe result: one chunk address and how each vantage fared
/// when asked for it.
#[derive(Debug, Serialize, Deserialize)]
pub struct ChunkProbe {
    /// Hex-encoded chunk address.
    pub address: String,
    /// First 2 hex chars — neighborhood the chunk should land in.
    pub neighborhood: String,
    /// Probe outcome keyed by Bee URL.
    pub per_vantage: BTreeMap<String, ChunkVantage>,
}
65
/// Outcome of fetching one chunk from one vantage.
#[derive(Debug, Serialize, Deserialize)]
pub struct ChunkVantage {
    /// `true` when the chunk download succeeded.
    pub found: bool,
    /// Wall-clock time of the download attempt, in milliseconds.
    pub elapsed_ms: u64,
    /// Error description when the download failed; omitted from the
    /// serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<String>,
}
73
/// Parameters for [`reseed`].
///
/// `Debug` and `Clone` are derived so requests can be logged and retried
/// without rebuilding them by hand.
#[derive(Debug, Clone)]
pub struct ReseedRequest {
    /// Hex-encoded reference to re-upload.
    pub reference: String,
    /// URL of the Bee node that performs the re-upload.
    pub bee_url: String,
    /// Hex-encoded batch id passed to the re-upload call.
    pub batch_id: String,
    /// HTTP timeout applied to the Bee client.
    pub timeout: Duration,
}
80
/// Value written to `Report::spec_version` for reports built in this module.
const SPEC_VERSION: u32 = 1;
82
83fn parse_reference(s: &str) -> Result<Reference> {
84    Reference::from_hex(s).map_err(|e| anyhow!("invalid reference {s}: {e}"))
85}
86
87fn make_bee(url: &str, timeout: Duration) -> Result<Client> {
88    let http = reqwest::Client::builder()
89        .timeout(timeout)
90        .build()
91        .context("building http client")?;
92    Client::with_http_client(url, http).map_err(|e| anyhow!("invalid bee url {url}: {e}"))
93}
94
95/// Probe `/stewardship/{ref}` across all vantages in parallel.
96pub async fn check_multi_vantage(
97    reference: &str,
98    bees: &[String],
99    timeout: Duration,
100) -> Result<Report> {
101    let r = parse_reference(reference)?;
102
103    let mut futs = FuturesUnordered::new();
104    for bee_url in bees {
105        let bee_url = bee_url.clone();
106        let r = r.clone();
107        futs.push(async move {
108            let started = Instant::now();
109            let result = async {
110                let bee = make_bee(&bee_url, timeout)?;
111                let ok = bee.api().is_retrievable(&r).await.map_err(anyhow::Error::from)?;
112                Ok::<_, anyhow::Error>(ok)
113            }
114            .await;
115            let elapsed_ms = started.elapsed().as_millis() as u64;
116            match result {
117                Ok(ok) => VantageResult {
118                    bee_url,
119                    retrievable: Some(ok),
120                    elapsed_ms,
121                    error: None,
122                },
123                Err(e) => VantageResult {
124                    bee_url,
125                    retrievable: None,
126                    elapsed_ms,
127                    error: Some(format!("{e:#}")),
128                },
129            }
130        });
131    }
132
133    let mut vantages = Vec::with_capacity(bees.len());
134    while let Some(v) = futs.next().await {
135        vantages.push(v);
136    }
137    vantages.sort_by(|a, b| a.bee_url.cmp(&b.bee_url));
138
139    let status = aggregate_status(&vantages);
140    Ok(Report {
141        reference: reference.to_string(),
142        status,
143        vantages,
144        chunks: None,
145        spec_version: SPEC_VERSION,
146    })
147}
148
149fn aggregate_status(vantages: &[VantageResult]) -> Status {
150    let total = vantages.len();
151    if total == 0 {
152        return Status::Error;
153    }
154    let mut retr = 0usize;
155    let mut unret = 0usize;
156    let mut err = 0usize;
157    for v in vantages {
158        match v.retrievable {
159            Some(true) => retr += 1,
160            Some(false) => unret += 1,
161            None => err += 1,
162        }
163    }
164    if err == total {
165        Status::Error
166    } else if retr == total {
167        Status::Retrievable
168    } else if retr == 0 && unret + err == total {
169        Status::Unretrievable
170    } else {
171        Status::Partial
172    }
173}
174
175/// Walk the manifest at `report.reference`, probe each leaf chunk via
176/// `GET /chunks/{addr}` from every vantage, and attach the result to
177/// the report.
178pub async fn drill_down(
179    mut report: Report,
180    bees: &[String],
181    timeout: Duration,
182    concurrency: usize,
183) -> Result<Report> {
184    let r = parse_reference(&report.reference)?;
185    // Use the first vantage as the source-of-truth for manifest walking.
186    let walker_bee = bees.first().context("no bee URL for drill-down")?;
187    let walker = make_bee(walker_bee, timeout)?;
188
189    let addresses = collect_chunk_addresses(&walker, &r).await?;
190
191    // Build a per-vantage Bee client once.
192    let clients: Vec<(String, Client)> = bees
193        .iter()
194        .map(|u| make_bee(u, timeout).map(|b| (u.clone(), b)))
195        .collect::<Result<_>>()?;
196
197    let sem = Arc::new(Semaphore::new(concurrency.max(1)));
198    let mut probes = Vec::with_capacity(addresses.len());
199
200    let mut futs = FuturesUnordered::new();
201    for addr in addresses {
202        let sem = sem.clone();
203        let clients = clients.clone();
204        futs.push(async move {
205            let mut per_vantage = BTreeMap::new();
206            for (url, bee) in &clients {
207                let _permit = sem.acquire().await.expect("semaphore not closed");
208                let started = Instant::now();
209                let res = bee.file().download_chunk(&addr, None).await;
210                let elapsed_ms = started.elapsed().as_millis() as u64;
211                let cv = match res {
212                    Ok(_) => ChunkVantage { found: true, elapsed_ms, error: None },
213                    Err(e) => ChunkVantage {
214                        found: false,
215                        elapsed_ms,
216                        error: Some(format!("{e}")),
217                    },
218                };
219                per_vantage.insert(url.clone(), cv);
220            }
221            let hex = addr.to_hex();
222            let neighborhood = hex.chars().take(2).collect::<String>();
223            ChunkProbe {
224                address: hex,
225                neighborhood,
226                per_vantage,
227            }
228        });
229    }
230
231    while let Some(p) = futs.next().await {
232        probes.push(p);
233    }
234    probes.sort_by(|a, b| a.address.cmp(&b.address));
235    report.chunks = Some(probes);
236    Ok(report)
237}
238
239/// BFS-walk the manifest starting at `root`, collecting both child
240/// manifest chunk addresses and content (`target_address`) addresses.
241/// Capped at [`MAX_CHUNKS`] to bound work for pathological cases. If
242/// `root` isn't a manifest, returns just `[root]`.
243async fn collect_chunk_addresses(bee: &Client, root: &Reference) -> Result<Vec<Reference>> {
244    let mut addresses: Vec<Reference> = vec![root.clone()];
245    let mut seen: std::collections::HashSet<String> = std::collections::HashSet::new();
246    seen.insert(root.to_hex());
247
248    let mut queue: std::collections::VecDeque<Reference> = std::collections::VecDeque::new();
249    queue.push_back(root.clone());
250
251    while let Some(addr) = queue.pop_front() {
252        if addresses.len() >= MAX_CHUNKS {
253            break;
254        }
255        let bytes = match bee.file().download_chunk(&addr, None).await {
256            Ok(b) => b,
257            // If we can't fetch a manifest node, we can't walk deeper. The
258            // outer probe loop will still record the chunk-level miss.
259            Err(_) => continue,
260        };
261        let Ok(node) = unmarshal(&bytes, addr.as_bytes()) else {
262            // Root might be raw content; deeper nodes that don't parse
263            // are leaves — both fine to skip.
264            continue;
265        };
266        // Content reference at this node — probe it but don't recurse.
267        if !is_null_address(&node.target_address) {
268            if let Ok(r) = Reference::new(&node.target_address) {
269                if seen.insert(r.to_hex()) {
270                    addresses.push(r);
271                }
272            }
273        }
274        // Child manifest chunks — probe and recurse.
275        for fork in node.forks.values() {
276            if let Some(sa) = fork.node.self_address {
277                if let Ok(r) = Reference::new(&sa) {
278                    if seen.insert(r.to_hex()) {
279                        addresses.push(r.clone());
280                        queue.push_back(r);
281                    }
282                }
283            }
284        }
285    }
286    Ok(addresses)
287}
288
289pub async fn reseed(req: ReseedRequest) -> Result<()> {
290    let bee = make_bee(&req.bee_url, req.timeout)?;
291    let r = parse_reference(&req.reference)?;
292    let batch = BatchId::from_hex(&req.batch_id)
293        .map_err(|e| anyhow!("invalid batch id {}: {e}", req.batch_id))?;
294    bee.api().reupload(&r, &batch).await?;
295    Ok(())
296}
297
298pub fn render_report(report: &Report, fmt: OutputFormat) -> String {
299    match fmt {
300        OutputFormat::Json => {
301            serde_json::to_string_pretty(report).expect("report serialization") + "\n"
302        }
303        OutputFormat::Text => render_text(report),
304    }
305}
306
307fn render_text(r: &Report) -> String {
308    use std::fmt::Write;
309    let mut out = String::new();
310    let _ = writeln!(out, "ref     {}", r.reference);
311    let _ = writeln!(out, "status  {:?}", r.status);
312    let _ = writeln!(out);
313    let _ = writeln!(out, "vantages:");
314    let url_w = r.vantages.iter().map(|v| v.bee_url.len()).max().unwrap_or(20);
315    for v in &r.vantages {
316        let state = match (v.retrievable, &v.error) {
317            (Some(true), _) => "retrievable",
318            (Some(false), _) => "unretrievable",
319            (None, Some(_)) => "error",
320            (None, None) => "unknown",
321        };
322        let _ = writeln!(
323            out,
324            "  {:<url_w$}  {:<14} {:>6} ms{}",
325            v.bee_url,
326            state,
327            v.elapsed_ms,
328            v.error
329                .as_deref()
330                .map(|e| format!("  ({e})"))
331                .unwrap_or_default(),
332            url_w = url_w
333        );
334    }
335    if let Some(chunks) = &r.chunks {
336        let _ = writeln!(out);
337        let _ = writeln!(out, "chunks: {} probed", chunks.len());
338        let mut missing = 0usize;
339        for c in chunks {
340            let missing_in: Vec<&String> = c
341                .per_vantage
342                .iter()
343                .filter(|(_, cv)| !cv.found)
344                .map(|(u, _)| u)
345                .collect();
346            if !missing_in.is_empty() {
347                missing += 1;
348                let _ = writeln!(
349                    out,
350                    "  [{}] {}  missing on: {}",
351                    c.neighborhood,
352                    short(&c.address),
353                    missing_in
354                        .iter()
355                        .map(|s| s.as_str())
356                        .collect::<Vec<_>>()
357                        .join(", ")
358                );
359            }
360        }
361        if missing == 0 {
362            let _ = writeln!(out, "  all chunks present on all vantages");
363        } else {
364            let _ = writeln!(out, "  {missing} chunk(s) missing on at least one vantage");
365        }
366    }
367    out
368}
369
/// Abbreviate a long hex string as `<first 8 chars>…<last 4 chars>`.
///
/// Strings of 16 characters or fewer are returned unchanged. Length and
/// cut points are measured in characters, not bytes, so input containing
/// multi-byte characters (e.g. an address field from hand-edited JSON)
/// cannot cause a slicing panic. For ASCII input the output is identical
/// to byte-based slicing.
fn short(hex: &str) -> String {
    const HEAD: usize = 8;
    const TAIL: usize = 4;
    const MAX_FULL: usize = 16;

    let char_count = hex.chars().count();
    if char_count <= MAX_FULL {
        return hex.to_string();
    }

    // Translate character positions into byte offsets that are guaranteed
    // to fall on char boundaries.
    let byte_offset = |char_pos: usize| {
        hex.char_indices()
            .nth(char_pos)
            .map_or(hex.len(), |(idx, _)| idx)
    };
    let head_end = byte_offset(HEAD);
    let tail_start = byte_offset(char_count - TAIL);
    format!("{}…{}", &hex[..head_end], &hex[tail_start..])
}