use std::collections::BTreeMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use anyhow::{Context, Result, anyhow};
use bee::Client;
use bee::manifest::{is_null_address, unmarshal};
use bee::swarm::{BatchId, Reference};
use futures::stream::{FuturesUnordered, StreamExt};
use serde::{Deserialize, Serialize};
use tokio::sync::Semaphore;
/// Limit on how many chunk addresses the drill-down walk gathers; once the
/// collected list reaches this size, `collect_chunk_addresses` stops fetching
/// further chunks.
const MAX_CHUNKS: usize = 1000;
/// Output encoding selected by the caller of [`render_report`].
#[derive(Copy, Clone, Debug)]
pub enum OutputFormat {
/// Human-readable listing produced by `render_text`.
Text,
/// Pretty-printed JSON followed by a trailing newline.
Json,
}
/// Overall verdict for a reference, combined from all vantage results by
/// `aggregate_status`. Serialized in `snake_case`.
#[derive(Copy, Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum Status {
/// Every vantage answered that the reference is retrievable.
Retrievable,
/// No vantage answered "retrievable" and at least one vantage produced an
/// answer (the remaining ones may have errored).
Unretrievable,
/// At least one vantage answered "retrievable", but not all of them did.
Partial,
/// There were no vantages at all, or every vantage failed to answer.
Error,
}
/// Result of a retrievability check for one reference.
#[derive(Debug, Serialize, Deserialize)]
pub struct Report {
/// The reference string exactly as the caller supplied it.
pub reference: String,
/// Verdict aggregated over `vantages`.
pub status: Status,
/// One entry per probed Bee node; `check_multi_vantage` orders them by URL.
pub vantages: Vec<VantageResult>,
/// Per-chunk probe results. `check_multi_vantage` leaves this `None`;
/// `drill_down` fills it in. Omitted from serialized output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub chunks: Option<Vec<ChunkProbe>>,
/// Version number of the report layout (`SPEC_VERSION` when produced here).
pub spec_version: u32,
}
/// Outcome of asking a single Bee node whether the reference is retrievable.
#[derive(Debug, Serialize, Deserialize)]
pub struct VantageResult {
/// URL of the Bee node that was asked.
pub bee_url: String,
/// `Some(answer)` when the node answered, `None` when the probe failed.
pub retrievable: Option<bool>,
/// Wall-clock duration of the probe in milliseconds; in
/// `check_multi_vantage` this includes building the HTTP client.
pub elapsed_ms: u64,
/// Formatted error for a failed probe. Omitted from serialized output when
/// `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub error: Option<String>,
}
/// Probe results for one chunk across all vantages.
#[derive(Debug, Serialize, Deserialize)]
pub struct ChunkProbe {
/// Chunk address as a hex string.
pub address: String,
/// Neighborhood label; `drill_down` sets it to the first two characters of
/// `address`.
pub neighborhood: String,
/// Result per Bee node, keyed by the node's URL.
pub per_vantage: BTreeMap<String, ChunkVantage>,
}
/// Outcome of downloading one chunk from one Bee node.
#[derive(Debug, Serialize, Deserialize)]
pub struct ChunkVantage {
/// `true` when the chunk download succeeded, `false` when it returned an
/// error of any kind.
pub found: bool,
/// Duration of the download attempt in milliseconds.
pub elapsed_ms: u64,
/// Error text for a failed download. Omitted from serialized output when
/// `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub error: Option<String>,
}
/// Parameters for [`reseed`].
///
/// Derives `Debug` and `Clone` so requests can be logged and reused, in line
/// with the other public types in this module.
#[derive(Debug, Clone)]
pub struct ReseedRequest {
    /// Hex-encoded reference of the content to re-upload.
    pub reference: String,
    /// URL of the Bee node that performs the re-upload.
    pub bee_url: String,
    /// Hex-encoded id of the postage batch to use.
    pub batch_id: String,
    /// Timeout applied to the HTTP client used for the request.
    pub timeout: Duration,
}
/// Value written to `Report::spec_version` for reports built by
/// `check_multi_vantage`.
const SPEC_VERSION: u32 = 1;
/// Decodes a hex string into a [`Reference`].
///
/// # Errors
///
/// Returns an error naming the offending input when `Reference::from_hex`
/// rejects it.
fn parse_reference(s: &str) -> Result<Reference> {
    let decoded = Reference::from_hex(s);
    decoded.map_err(|e| anyhow!("invalid reference {s}: {e}"))
}
/// Builds a Bee [`Client`] for `url` backed by an HTTP client that uses
/// `timeout` as its request timeout.
///
/// # Errors
///
/// Fails when the HTTP client cannot be built or when the Bee client rejects
/// `url`.
fn make_bee(url: &str, timeout: Duration) -> Result<Client> {
    let builder = reqwest::Client::builder().timeout(timeout);
    let http = builder.build().context("building http client")?;
    let client = Client::with_http_client(url, http);
    client.map_err(|e| anyhow!("invalid bee url {url}: {e}"))
}
pub async fn check_multi_vantage(
reference: &str,
bees: &[String],
timeout: Duration,
) -> Result<Report> {
let r = parse_reference(reference)?;
let mut futs = FuturesUnordered::new();
for bee_url in bees {
let bee_url = bee_url.clone();
let r = r.clone();
futs.push(async move {
let started = Instant::now();
let result = async {
let bee = make_bee(&bee_url, timeout)?;
let ok = bee.api().is_retrievable(&r).await.map_err(anyhow::Error::from)?;
Ok::<_, anyhow::Error>(ok)
}
.await;
let elapsed_ms = started.elapsed().as_millis() as u64;
match result {
Ok(ok) => VantageResult {
bee_url,
retrievable: Some(ok),
elapsed_ms,
error: None,
},
Err(e) => VantageResult {
bee_url,
retrievable: None,
elapsed_ms,
error: Some(format!("{e:#}")),
},
}
});
}
let mut vantages = Vec::with_capacity(bees.len());
while let Some(v) = futs.next().await {
vantages.push(v);
}
vantages.sort_by(|a, b| a.bee_url.cmp(&b.bee_url));
let status = aggregate_status(&vantages);
Ok(Report {
reference: reference.to_string(),
status,
vantages,
chunks: None,
spec_version: SPEC_VERSION,
})
}
fn aggregate_status(vantages: &[VantageResult]) -> Status {
let total = vantages.len();
if total == 0 {
return Status::Error;
}
let mut retr = 0usize;
let mut unret = 0usize;
let mut err = 0usize;
for v in vantages {
match v.retrievable {
Some(true) => retr += 1,
Some(false) => unret += 1,
None => err += 1,
}
}
if err == total {
Status::Error
} else if retr == total {
Status::Retrievable
} else if retr == 0 && unret + err == total {
Status::Unretrievable
} else {
Status::Partial
}
}
pub async fn drill_down(
mut report: Report,
bees: &[String],
timeout: Duration,
concurrency: usize,
) -> Result<Report> {
let r = parse_reference(&report.reference)?;
let walker_bee = bees.first().context("no bee URL for drill-down")?;
let walker = make_bee(walker_bee, timeout)?;
let addresses = collect_chunk_addresses(&walker, &r).await?;
let clients: Vec<(String, Client)> = bees
.iter()
.map(|u| make_bee(u, timeout).map(|b| (u.clone(), b)))
.collect::<Result<_>>()?;
let sem = Arc::new(Semaphore::new(concurrency.max(1)));
let mut probes = Vec::with_capacity(addresses.len());
let mut futs = FuturesUnordered::new();
for addr in addresses {
let sem = sem.clone();
let clients = clients.clone();
futs.push(async move {
let mut per_vantage = BTreeMap::new();
for (url, bee) in &clients {
let _permit = sem.acquire().await.expect("semaphore not closed");
let started = Instant::now();
let res = bee.file().download_chunk(&addr, None).await;
let elapsed_ms = started.elapsed().as_millis() as u64;
let cv = match res {
Ok(_) => ChunkVantage { found: true, elapsed_ms, error: None },
Err(e) => ChunkVantage {
found: false,
elapsed_ms,
error: Some(format!("{e}")),
},
};
per_vantage.insert(url.clone(), cv);
}
let hex = addr.to_hex();
let neighborhood = hex.chars().take(2).collect::<String>();
ChunkProbe {
address: hex,
neighborhood,
per_vantage,
}
});
}
while let Some(p) = futs.next().await {
probes.push(p);
}
probes.sort_by(|a, b| a.address.cmp(&b.address));
report.chunks = Some(probes);
Ok(report)
}
/// Walks the manifest rooted at `root` breadth-first and returns the chunk
/// addresses it finds, starting with `root` itself.
///
/// For every node that can be downloaded through `bee` and unmarshalled:
/// * a non-null `target_address` is recorded (but not walked further);
/// * each fork's `self_address` is recorded and queued for walking.
///
/// Addresses are de-duplicated. The walk is best-effort: a chunk that cannot
/// be downloaded or does not unmarshal as a manifest node is skipped, so this
/// function currently always returns `Ok`.
///
/// At most `MAX_CHUNKS` addresses are returned, in discovery order.
async fn collect_chunk_addresses(bee: &Client, root: &Reference) -> Result<Vec<Reference>> {
    let mut addresses: Vec<Reference> = vec![root.clone()];
    let mut seen: std::collections::HashSet<String> = std::collections::HashSet::new();
    seen.insert(root.to_hex());
    let mut queue: std::collections::VecDeque<Reference> = std::collections::VecDeque::new();
    queue.push_back(root.clone());

    while let Some(addr) = queue.pop_front() {
        // Stop fetching once enough addresses have been gathered.
        if addresses.len() >= MAX_CHUNKS {
            break;
        }
        let bytes = match bee.file().download_chunk(&addr, None).await {
            Ok(b) => b,
            // Best-effort: an unreachable chunk simply ends this branch.
            Err(_) => continue,
        };
        let Ok(node) = unmarshal(&bytes, addr.as_bytes()) else {
            // Not a manifest node; nothing to expand.
            continue;
        };
        if !is_null_address(&node.target_address) {
            if let Ok(r) = Reference::new(&node.target_address) {
                if seen.insert(r.to_hex()) {
                    addresses.push(r);
                }
            }
        }
        for fork in node.forks.values() {
            if let Some(sa) = fork.node.self_address {
                if let Ok(r) = Reference::new(&sa) {
                    if seen.insert(r.to_hex()) {
                        addresses.push(r.clone());
                        queue.push_back(r);
                    }
                }
            }
        }
    }

    // The limit is only checked once per dequeued node, and a single node can
    // add many addresses; trim the overshoot so the cap actually holds.
    addresses.truncate(MAX_CHUNKS);
    Ok(addresses)
}
pub async fn reseed(req: ReseedRequest) -> Result<()> {
let bee = make_bee(&req.bee_url, req.timeout)?;
let r = parse_reference(&req.reference)?;
let batch = BatchId::from_hex(&req.batch_id)
.map_err(|e| anyhow!("invalid batch id {}: {e}", req.batch_id))?;
bee.api().reupload(&r, &batch).await?;
Ok(())
}
/// Renders `report` in the requested format.
///
/// `Text` delegates to `render_text`; `Json` yields pretty-printed JSON with
/// a trailing newline.
///
/// # Panics
///
/// Panics if the report cannot be serialized to JSON.
pub fn render_report(report: &Report, fmt: OutputFormat) -> String {
    match fmt {
        OutputFormat::Text => render_text(report),
        OutputFormat::Json => {
            let mut json =
                serde_json::to_string_pretty(report).expect("report serialization");
            json.push('\n');
            json
        }
    }
}
/// Formats a report as plain text.
///
/// The output lists the reference and status, then one aligned row per
/// vantage (URL, state, elapsed time, optional error). When chunk probes are
/// present, it adds the number of chunks probed, one line per chunk that was
/// not found on at least one vantage, and a closing summary line.
fn render_text(r: &Report) -> String {
    use std::fmt::Write;

    // Writing into a `String` does not fail, so the results are discarded.
    let mut out = String::new();
    let _ = writeln!(out, "ref {}", r.reference);
    let _ = writeln!(out, "status {:?}", r.status);
    let _ = writeln!(out);
    let _ = writeln!(out, "vantages:");

    // Pad the URL column to the longest URL.
    let url_w = r.vantages.iter().map(|v| v.bee_url.len()).max().unwrap_or(20);
    for vantage in &r.vantages {
        let state = match vantage.retrievable {
            Some(true) => "retrievable",
            Some(false) => "unretrievable",
            None if vantage.error.is_some() => "error",
            None => "unknown",
        };
        let detail = match &vantage.error {
            Some(e) => format!(" ({e})"),
            None => String::new(),
        };
        let _ = writeln!(
            out,
            " {:<url_w$} {:<14} {:>6} ms{}",
            vantage.bee_url,
            state,
            vantage.elapsed_ms,
            detail,
            url_w = url_w
        );
    }

    let Some(chunks) = &r.chunks else {
        return out;
    };

    let _ = writeln!(out);
    let _ = writeln!(out, "chunks: {} probed", chunks.len());

    let mut missing = 0usize;
    for chunk in chunks {
        let absent_on: Vec<&str> = chunk
            .per_vantage
            .iter()
            .filter(|(_, cv)| !cv.found)
            .map(|(u, _)| u.as_str())
            .collect();
        if absent_on.is_empty() {
            continue;
        }
        missing += 1;
        let _ = writeln!(
            out,
            " [{}] {} missing on: {}",
            chunk.neighborhood,
            short(&chunk.address),
            absent_on.join(", ")
        );
    }

    let _ = if missing == 0 {
        writeln!(out, " all chunks present on all vantages")
    } else {
        writeln!(out, " {missing} chunk(s) missing on at least one vantage")
    };
    out
}
/// Abbreviates a long hex string to its first 8 and last 4 bytes joined by an
/// ellipsis, e.g. `01234567…0123`.
///
/// Strings of 16 bytes or fewer are returned unchanged. So is any string where
/// either cut would land inside a multi-byte character: addresses can come
/// from deserialized input, so non-ASCII text must not cause a panic.
fn short(hex: &str) -> String {
    if hex.len() <= 16 {
        return hex.to_string();
    }
    // `str::get` yields `None` instead of panicking on a non-boundary index.
    match (hex.get(..8), hex.get(hex.len() - 4..)) {
        (Some(head), Some(tail)) => format!("{head}…{tail}"),
        _ => hex.to_string(),
    }
}