bee-check 0.1.0

Retrievability checker for Ethereum Swarm references. Multi-vantage stewardship probes, per-chunk drill-down, and one-shot re-seed.
Documentation
//! Core check engine for `bee-check`.
//!
//! The CLI in `main.rs` and (in future) the `bee-check-web` SPA both
//! consume the same [`Report`] shape produced here. See `SPEC.md` for
//! the JSON shape.

use std::collections::BTreeMap;
use std::sync::Arc;
use std::time::{Duration, Instant};

use anyhow::{Context, Result, anyhow};
use bee::Client;
use bee::manifest::{is_null_address, unmarshal};
use bee::swarm::{BatchId, Reference};
use futures::stream::{FuturesUnordered, StreamExt};
use serde::{Deserialize, Serialize};
use tokio::sync::Semaphore;

/// Upper bound on how many chunk addresses the manifest walk in
/// `collect_chunk_addresses` gathers before it stops expanding further
/// manifest nodes.
const MAX_CHUNKS: usize = 1000;

/// How [`render_report`] should serialize a [`Report`].
#[derive(Copy, Clone, Debug)]
pub enum OutputFormat {
    /// Human-readable, line-oriented text.
    Text,
    /// Pretty-printed JSON followed by a trailing newline.
    Json,
}

/// Outcome enum reported per vantage and aggregated for the whole check.
///
/// Serialized in `snake_case` (e.g. `"retrievable"`). The variant notes
/// below describe the aggregate meaning as computed by `aggregate_status`.
#[derive(Copy, Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum Status {
    /// Every vantage reported the reference as retrievable.
    Retrievable,
    /// No vantage reported the reference as retrievable, and at least one
    /// vantage answered without an error.
    Unretrievable,
    /// Some, but not all, vantages reported the reference as retrievable.
    Partial,
    /// Every vantage errored, or there were no vantages at all.
    Error,
}

/// Result of one check run: per-vantage stewardship outcomes plus the
/// optional per-chunk drill-down. This is the value that gets rendered as
/// text or JSON.
#[derive(Debug, Serialize, Deserialize)]
pub struct Report {
    /// The reference string as supplied by the caller.
    pub reference: String,
    /// Aggregate outcome over `vantages` (see `aggregate_status`).
    pub status: Status,
    /// One entry per probed Bee node; `check_multi_vantage` sorts them by
    /// `bee_url`.
    pub vantages: Vec<VantageResult>,
    /// Populated only when `--per-chunk` was requested.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub chunks: Option<Vec<ChunkProbe>>,
    /// Set to `SPEC_VERSION` when the report is built.
    pub spec_version: u32,
}

/// Outcome of the stewardship probe against a single Bee node.
#[derive(Debug, Serialize, Deserialize)]
pub struct VantageResult {
    /// URL of the Bee node that was probed.
    pub bee_url: String,
    /// `None` means the call errored (see `error`).
    pub retrievable: Option<bool>,
    /// Wall-clock time of the probe in milliseconds; as measured by
    /// `check_multi_vantage` this includes building the HTTP client.
    pub elapsed_ms: u64,
    /// Formatted error message when the probe failed; omitted from the
    /// serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<String>,
}

/// Drill-down result for one chunk address, probed from every vantage.
#[derive(Debug, Serialize, Deserialize)]
pub struct ChunkProbe {
    /// Hex encoding of the chunk address.
    pub address: String,
    /// First 2 hex chars — neighborhood the chunk should land in.
    pub neighborhood: String,
    /// Per-vantage outcome, keyed by the vantage's Bee URL.
    pub per_vantage: BTreeMap<String, ChunkVantage>,
}

/// Outcome of fetching one chunk from one vantage.
#[derive(Debug, Serialize, Deserialize)]
pub struct ChunkVantage {
    /// `true` when the chunk download succeeded; `false` on any error.
    pub found: bool,
    /// Wall-clock time of the download attempt in milliseconds.
    pub elapsed_ms: u64,
    /// Error message when the download failed; omitted from the serialized
    /// output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<String>,
}

/// Parameters for a one-shot [`reseed`] call.
///
/// Derives `Debug` and `Clone` like the other public types in this module
/// so callers can log a request or retry it without rebuilding it.
#[derive(Debug, Clone)]
pub struct ReseedRequest {
    /// Hex-encoded reference to re-upload.
    pub reference: String,
    /// URL of the Bee node that performs the re-upload.
    pub bee_url: String,
    /// Hex-encoded batch id passed to the re-upload call.
    pub batch_id: String,
    /// Timeout applied to the HTTP client used for the request.
    pub timeout: Duration,
}

/// Value written to [`Report::spec_version`] when a report is built.
// NOTE(review): presumably tracks the JSON shape described in `SPEC.md` — confirm.
const SPEC_VERSION: u32 = 1;

/// Parse a hex string into a [`Reference`].
///
/// # Errors
/// Returns an error naming the offending input when `Reference::from_hex`
/// rejects it.
fn parse_reference(s: &str) -> Result<Reference> {
    match Reference::from_hex(s) {
        Ok(reference) => Ok(reference),
        Err(e) => Err(anyhow!("invalid reference {s}: {e}")),
    }
}

/// Build a Bee [`Client`] for `url` whose HTTP client uses `timeout`.
///
/// # Errors
/// Fails when the HTTP client cannot be built or when the Bee client
/// rejects `url`.
fn make_bee(url: &str, timeout: Duration) -> Result<Client> {
    let builder = reqwest::Client::builder().timeout(timeout);
    let http = builder.build().context("building http client")?;
    match Client::with_http_client(url, http) {
        Ok(client) => Ok(client),
        Err(e) => Err(anyhow!("invalid bee url {url}: {e}")),
    }
}

/// Probe `/stewardship/{ref}` across all vantages in parallel.
pub async fn check_multi_vantage(
    reference: &str,
    bees: &[String],
    timeout: Duration,
) -> Result<Report> {
    let r = parse_reference(reference)?;

    let mut futs = FuturesUnordered::new();
    for bee_url in bees {
        let bee_url = bee_url.clone();
        let r = r.clone();
        futs.push(async move {
            let started = Instant::now();
            let result = async {
                let bee = make_bee(&bee_url, timeout)?;
                let ok = bee.api().is_retrievable(&r).await.map_err(anyhow::Error::from)?;
                Ok::<_, anyhow::Error>(ok)
            }
            .await;
            let elapsed_ms = started.elapsed().as_millis() as u64;
            match result {
                Ok(ok) => VantageResult {
                    bee_url,
                    retrievable: Some(ok),
                    elapsed_ms,
                    error: None,
                },
                Err(e) => VantageResult {
                    bee_url,
                    retrievable: None,
                    elapsed_ms,
                    error: Some(format!("{e:#}")),
                },
            }
        });
    }

    let mut vantages = Vec::with_capacity(bees.len());
    while let Some(v) = futs.next().await {
        vantages.push(v);
    }
    vantages.sort_by(|a, b| a.bee_url.cmp(&b.bee_url));

    let status = aggregate_status(&vantages);
    Ok(Report {
        reference: reference.to_string(),
        status,
        vantages,
        chunks: None,
        spec_version: SPEC_VERSION,
    })
}

fn aggregate_status(vantages: &[VantageResult]) -> Status {
    let total = vantages.len();
    if total == 0 {
        return Status::Error;
    }
    let mut retr = 0usize;
    let mut unret = 0usize;
    let mut err = 0usize;
    for v in vantages {
        match v.retrievable {
            Some(true) => retr += 1,
            Some(false) => unret += 1,
            None => err += 1,
        }
    }
    if err == total {
        Status::Error
    } else if retr == total {
        Status::Retrievable
    } else if retr == 0 && unret + err == total {
        Status::Unretrievable
    } else {
        Status::Partial
    }
}

/// Walk the manifest at `report.reference`, probe each leaf chunk via
/// `GET /chunks/{addr}` from every vantage, and attach the result to
/// the report.
pub async fn drill_down(
    mut report: Report,
    bees: &[String],
    timeout: Duration,
    concurrency: usize,
) -> Result<Report> {
    let r = parse_reference(&report.reference)?;
    // Use the first vantage as the source-of-truth for manifest walking.
    let walker_bee = bees.first().context("no bee URL for drill-down")?;
    let walker = make_bee(walker_bee, timeout)?;

    let addresses = collect_chunk_addresses(&walker, &r).await?;

    // Build a per-vantage Bee client once.
    let clients: Vec<(String, Client)> = bees
        .iter()
        .map(|u| make_bee(u, timeout).map(|b| (u.clone(), b)))
        .collect::<Result<_>>()?;

    let sem = Arc::new(Semaphore::new(concurrency.max(1)));
    let mut probes = Vec::with_capacity(addresses.len());

    let mut futs = FuturesUnordered::new();
    for addr in addresses {
        let sem = sem.clone();
        let clients = clients.clone();
        futs.push(async move {
            let mut per_vantage = BTreeMap::new();
            for (url, bee) in &clients {
                let _permit = sem.acquire().await.expect("semaphore not closed");
                let started = Instant::now();
                let res = bee.file().download_chunk(&addr, None).await;
                let elapsed_ms = started.elapsed().as_millis() as u64;
                let cv = match res {
                    Ok(_) => ChunkVantage { found: true, elapsed_ms, error: None },
                    Err(e) => ChunkVantage {
                        found: false,
                        elapsed_ms,
                        error: Some(format!("{e}")),
                    },
                };
                per_vantage.insert(url.clone(), cv);
            }
            let hex = addr.to_hex();
            let neighborhood = hex.chars().take(2).collect::<String>();
            ChunkProbe {
                address: hex,
                neighborhood,
                per_vantage,
            }
        });
    }

    while let Some(p) = futs.next().await {
        probes.push(p);
    }
    probes.sort_by(|a, b| a.address.cmp(&b.address));
    report.chunks = Some(probes);
    Ok(report)
}

/// BFS-walk the manifest starting at `root`, collecting both child
/// manifest chunk addresses and content (`target_address`) addresses.
/// Capped at [`MAX_CHUNKS`] to bound work for pathological cases. If
/// `root` isn't a manifest, returns just `[root]`.
///
/// The cap is enforced on every insertion, so the returned list never
/// exceeds `MAX_CHUNKS` entries even when a single node has many forks.
/// Nodes that cannot be fetched or parsed are skipped silently; they still
/// appear in the result (they were added when discovered) so the caller's
/// per-chunk probe reports the miss.
async fn collect_chunk_addresses(bee: &Client, root: &Reference) -> Result<Vec<Reference>> {
    let mut addresses: Vec<Reference> = vec![root.clone()];
    let mut seen: std::collections::HashSet<String> = std::collections::HashSet::new();
    seen.insert(root.to_hex());

    let mut queue: std::collections::VecDeque<Reference> = std::collections::VecDeque::new();
    queue.push_back(root.clone());

    'walk: while addresses.len() < MAX_CHUNKS {
        let Some(addr) = queue.pop_front() else {
            break;
        };
        let bytes = match bee.file().download_chunk(&addr, None).await {
            Ok(b) => b,
            // If we can't fetch a manifest node, we can't walk deeper. The
            // outer probe loop will still record the chunk-level miss.
            Err(_) => continue,
        };
        let Ok(node) = unmarshal(&bytes, addr.as_bytes()) else {
            // Root might be raw content; deeper nodes that don't parse
            // are leaves — both fine to skip.
            continue;
        };
        // Content reference at this node — probe it but don't recurse.
        if !is_null_address(&node.target_address) {
            if let Ok(r) = Reference::new(&node.target_address) {
                if seen.insert(r.to_hex()) {
                    addresses.push(r);
                    if addresses.len() >= MAX_CHUNKS {
                        break 'walk;
                    }
                }
            }
        }
        // Child manifest chunks — probe and recurse.
        for fork in node.forks.values() {
            if let Some(sa) = fork.node.self_address {
                if let Ok(r) = Reference::new(&sa) {
                    if seen.insert(r.to_hex()) {
                        addresses.push(r.clone());
                        // Stop mid-node once the cap is hit; queueing more
                        // work would only be thrown away.
                        if addresses.len() >= MAX_CHUNKS {
                            break 'walk;
                        }
                        queue.push_back(r);
                    }
                }
            }
        }
    }
    Ok(addresses)
}

pub async fn reseed(req: ReseedRequest) -> Result<()> {
    let bee = make_bee(&req.bee_url, req.timeout)?;
    let r = parse_reference(&req.reference)?;
    let batch = BatchId::from_hex(&req.batch_id)
        .map_err(|e| anyhow!("invalid batch id {}: {e}", req.batch_id))?;
    bee.api().reupload(&r, &batch).await?;
    Ok(())
}

/// Render `report` in the requested format.
///
/// Text output comes from `render_text`; JSON output is pretty-printed and
/// terminated with a newline.
///
/// # Panics
/// Panics if the report cannot be serialized to JSON.
pub fn render_report(report: &Report, fmt: OutputFormat) -> String {
    match fmt {
        OutputFormat::Text => render_text(report),
        OutputFormat::Json => {
            let mut json = serde_json::to_string_pretty(report).expect("report serialization");
            json.push('\n');
            json
        }
    }
}

/// Format a report as human-readable text.
///
/// Layout: a two-line header (`ref`, `status`), a blank line, a `vantages:`
/// table with one aligned row per vantage, and — when chunk probes are
/// present — a `chunks:` section listing every chunk that is missing on at
/// least one vantage followed by a one-line summary. Every line, including
/// the last, ends with `\n`.
fn render_text(r: &Report) -> String {
    let mut lines: Vec<String> = vec![
        format!("ref     {}", r.reference),
        format!("status  {:?}", r.status),
        String::new(),
        "vantages:".to_string(),
    ];

    // Pad the URL column to the longest URL so the status column lines up.
    let url_w = r.vantages.iter().map(|v| v.bee_url.len()).max().unwrap_or(20);
    for v in &r.vantages {
        let state = match (v.retrievable, v.error.is_some()) {
            (Some(true), _) => "retrievable",
            (Some(false), _) => "unretrievable",
            (None, true) => "error",
            (None, false) => "unknown",
        };
        let detail = match &v.error {
            Some(e) => format!("  ({e})"),
            None => String::new(),
        };
        lines.push(format!(
            "  {:<url_w$}  {:<14} {:>6} ms{}",
            v.bee_url,
            state,
            v.elapsed_ms,
            detail,
            url_w = url_w
        ));
    }

    if let Some(chunks) = &r.chunks {
        lines.push(String::new());
        lines.push(format!("chunks: {} probed", chunks.len()));

        let mut missing = 0usize;
        for c in chunks {
            let absent: Vec<&str> = c
                .per_vantage
                .iter()
                .filter(|(_, cv)| !cv.found)
                .map(|(u, _)| u.as_str())
                .collect();
            if absent.is_empty() {
                continue;
            }
            missing += 1;
            lines.push(format!(
                "  [{}] {}  missing on: {}",
                c.neighborhood,
                short(&c.address),
                absent.join(", ")
            ));
        }

        lines.push(if missing == 0 {
            "  all chunks present on all vantages".to_string()
        } else {
            format!("  {missing} chunk(s) missing on at least one vantage")
        });
    }

    let mut out = lines.join("\n");
    out.push('\n');
    out
}

/// Abbreviate a long hex string for display: inputs longer than 16 bytes
/// become their first 8 bytes followed directly by their last 4; shorter
/// inputs are returned unchanged.
///
/// Addresses can arrive from deserialized reports, so the input is not
/// guaranteed to be ASCII. If either cut would fall inside a multi-byte
/// character the string is returned unabbreviated instead of panicking.
fn short(hex: &str) -> String {
    if hex.len() <= 16 {
        return hex.to_string();
    }
    // `str::get` yields `None` on a non-char-boundary, where `&hex[..]` panics.
    match (hex.get(..8), hex.get(hex.len() - 4..)) {
        (Some(head), Some(tail)) => format!("{head}{tail}"),
        _ => hex.to_string(),
    }
}