use std::collections::BTreeMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use anyhow::{Context, Result, anyhow};
use bee::Client;
use bee::manifest::{is_null_address, unmarshal};
use bee::swarm::gsoc::proximity;
use bee::swarm::{BatchId, EthAddress, Reference, Topic};
use futures::stream::{FuturesUnordered, StreamExt};
use reqwest::Method;
use serde::{Deserialize, Serialize};
use tokio::sync::Semaphore;
pub const DEFAULT_GATEWAY: &str = "https://api.gateway.ethswarm.org";
const MAX_CHUNKS: usize = 1000;
const STAMP_LOW_TTL_SECS: i64 = 86_400;
#[derive(Copy, Clone, Debug)]
pub enum OutputFormat {
Text,
Json,
}
#[derive(Copy, Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum Status {
Retrievable,
Unretrievable,
Partial,
Error,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct Report {
pub reference: String,
pub status: Status,
pub vantages: Vec<VantageResult>,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub gateways: Vec<GatewayResult>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub resolution: Option<Resolution>,
#[serde(skip_serializing_if = "Option::is_none")]
pub chunks: Option<Vec<ChunkProbe>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub chunk_stats: Option<ChunkStats>,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub cold_downloads: Vec<ColdDownloadResult>,
pub spec_version: u32,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct ChunkStats {
pub per_vantage: BTreeMap<String, ChunkStatRow>,
pub per_neighborhood: BTreeMap<String, ChunkStatRow>,
}
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ChunkStatRow {
pub total: usize,
pub found: usize,
pub missing: usize,
#[serde(skip_serializing_if = "Option::is_none")]
pub elapsed_p50_ms: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub elapsed_p95_ms: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub elapsed_max_ms: Option<u64>,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct GatewayResult {
pub url: String,
pub retrievable: Option<bool>,
pub elapsed_ms: u64,
#[serde(skip_serializing_if = "Option::is_none")]
pub status_code: Option<u16>,
#[serde(skip_serializing_if = "Option::is_none")]
pub error: Option<String>,
}
#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum Resolution {
Feed {
owner: String,
topic: String,
resolved_reference: String,
},
}
#[derive(Debug, Serialize, Deserialize)]
pub struct VantageResult {
pub bee_url: String,
pub retrievable: Option<bool>,
pub elapsed_ms: u64,
#[serde(skip_serializing_if = "Option::is_none")]
pub error: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub overlay: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub bee_version: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub proximity_to_root: Option<u32>,
#[serde(skip_serializing_if = "Option::is_none")]
pub target_proximity: Option<u32>,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct ChunkProbe {
pub address: String,
pub neighborhood: String,
pub per_vantage: BTreeMap<String, ChunkVantage>,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct ChunkVantage {
pub found: bool,
pub elapsed_ms: u64,
#[serde(skip_serializing_if = "Option::is_none")]
pub error: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub proximity: Option<u32>,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct ColdDownloadResult {
pub url: String,
pub endpoint: String,
pub success: bool,
pub bytes_downloaded: u64,
pub elapsed_ms: u64,
#[serde(skip_serializing_if = "Option::is_none")]
pub status_code: Option<u16>,
#[serde(skip_serializing_if = "Option::is_none")]
pub error: Option<String>,
}
pub struct ReseedRequest {
pub reference: String,
pub bee_url: String,
pub batch_id: String,
pub timeout: Duration,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct StampStatus {
pub batch_id: String,
pub exists: bool,
pub usable: bool,
pub batch_ttl: i64,
pub healthy: bool,
#[serde(skip_serializing_if = "Vec::is_empty")]
pub warnings: Vec<String>,
}
const SPEC_VERSION: u32 = 1;
fn parse_reference(s: &str) -> Result<Reference> {
Reference::from_hex(s).map_err(|e| anyhow!("invalid reference {s}: {e}"))
}
fn make_bee(url: &str, timeout: Duration) -> Result<Client> {
let http = reqwest::Client::builder()
.timeout(timeout)
.build()
.context("building http client")?;
Client::with_http_client(url, http).map_err(|e| anyhow!("invalid bee url {url}: {e}"))
}
pub async fn check_multi_vantage(
reference: &str,
bees: &[String],
timeout: Duration,
) -> Result<Report> {
let r = parse_reference(reference)?;
let root_bytes = first_32(&r);
let mut futs = FuturesUnordered::new();
for bee_url in bees {
let bee_url = bee_url.clone();
let r = r.clone();
futs.push(async move {
let bee = match make_bee(&bee_url, timeout) {
Ok(b) => b,
Err(e) => {
return VantageResult {
bee_url,
retrievable: None,
elapsed_ms: 0,
error: Some(format!("{e:#}")),
overlay: None,
bee_version: None,
proximity_to_root: None,
target_proximity: None,
};
}
};
let started = Instant::now();
let api = bee.api();
let debug = bee.debug();
let (stew_res, addr_res, health_res) = tokio::join!(
api.is_retrievable(&r),
debug.addresses(),
debug.health(),
);
let elapsed_ms = started.elapsed().as_millis() as u64;
let overlay = addr_res.ok().map(|a| a.overlay);
let bee_version = health_res.ok().map(|h| h.version);
let proximity_to_root = overlay
.as_deref()
.and_then(decode_overlay)
.map(|o| proximity(&o, &root_bytes));
match stew_res {
Ok(ok) => VantageResult {
bee_url,
retrievable: Some(ok),
elapsed_ms,
error: None,
overlay,
bee_version,
proximity_to_root,
target_proximity: None,
},
Err(e) => VantageResult {
bee_url,
retrievable: None,
elapsed_ms,
error: Some(format!("{e}")),
overlay,
bee_version,
proximity_to_root,
target_proximity: None,
},
}
});
}
let mut vantages = Vec::with_capacity(bees.len());
while let Some(v) = futs.next().await {
vantages.push(v);
}
vantages.sort_by(|a, b| a.bee_url.cmp(&b.bee_url));
let status = aggregate_status(&vantages, &[]);
Ok(Report {
reference: reference.to_string(),
status,
vantages,
gateways: Vec::new(),
resolution: None,
chunks: None,
chunk_stats: None,
cold_downloads: Vec::new(),
spec_version: SPEC_VERSION,
})
}
pub async fn check_gateways(
reference: &str,
gateway_urls: &[String],
timeout: Duration,
) -> Result<Vec<GatewayResult>> {
let _ = parse_reference(reference)?;
let http = reqwest::Client::builder()
.timeout(timeout)
.build()
.context("building http client for gateway probes")?;
let mut futs = FuturesUnordered::new();
for base in gateway_urls {
let base = base.clone();
let reference = reference.to_string();
let http = http.clone();
futs.push(async move {
let url = build_gateway_url(&base, &reference);
let started = Instant::now();
let res = http.request(Method::HEAD, &url).send().await;
let elapsed_ms = started.elapsed().as_millis() as u64;
match res {
Ok(resp) => {
let status = resp.status().as_u16();
GatewayResult {
url: base,
retrievable: Some(resp.status().is_success()),
elapsed_ms,
status_code: Some(status),
error: None,
}
}
Err(e) => GatewayResult {
url: base,
retrievable: None,
elapsed_ms,
status_code: None,
error: Some(format!("{e}")),
},
}
});
}
let mut out = Vec::with_capacity(gateway_urls.len());
while let Some(g) = futs.next().await {
out.push(g);
}
out.sort_by(|a, b| a.url.cmp(&b.url));
Ok(out)
}
pub fn merge_gateways(mut report: Report, gateways: Vec<GatewayResult>) -> Report {
report.gateways = gateways;
report.status = aggregate_status(&report.vantages, &report.gateways);
report
}
fn build_gateway_url(base: &str, reference: &str) -> String {
let trimmed = base.trim_end_matches('/');
format!("{trimmed}/bzz/{reference}/")
}
fn build_bytes_url(base: &str, reference: &str) -> String {
let trimmed = base.trim_end_matches('/');
format!("{trimmed}/bytes/{reference}")
}
pub async fn cold_download_all(
bee_urls: &[String],
gateway_urls: &[String],
reference: &str,
timeout: Duration,
) -> Result<Vec<ColdDownloadResult>> {
let _ = parse_reference(reference)?;
let http = reqwest::Client::builder()
.timeout(timeout)
.build()
.context("building http client for cold-download probes")?;
let mut targets: Vec<(String, String, String)> =
Vec::with_capacity(bee_urls.len() + gateway_urls.len());
for url in bee_urls {
targets.push((
url.clone(),
"/bytes/{ref}".to_string(),
build_bytes_url(url, reference),
));
}
for url in gateway_urls {
targets.push((
url.clone(),
"/bzz/{ref}/".to_string(),
build_gateway_url(url, reference),
));
}
let mut futs = FuturesUnordered::new();
for (base, endpoint, probe_url) in targets {
let http = http.clone();
futs.push(async move { cold_probe(&http, base, endpoint, probe_url).await });
}
let mut out = Vec::with_capacity(bee_urls.len() + gateway_urls.len());
while let Some(c) = futs.next().await {
out.push(c);
}
out.sort_by(|a, b| a.url.cmp(&b.url));
Ok(out)
}
async fn cold_probe(
http: &reqwest::Client,
base: String,
endpoint: String,
probe_url: String,
) -> ColdDownloadResult {
let started = Instant::now();
let res = http.get(&probe_url).send().await;
match res {
Ok(mut resp) => {
let status = resp.status();
let status_code = status.as_u16();
if !status.is_success() {
return ColdDownloadResult {
url: base,
endpoint,
success: false,
bytes_downloaded: 0,
elapsed_ms: started.elapsed().as_millis() as u64,
status_code: Some(status_code),
error: Some(format!("HTTP {status_code}")),
};
}
let mut bytes: u64 = 0;
let mut err: Option<String> = None;
loop {
match resp.chunk().await {
Ok(Some(chunk)) => bytes += chunk.len() as u64,
Ok(None) => break,
Err(e) => {
err = Some(format!("stream error after {bytes} bytes: {e}"));
break;
}
}
}
ColdDownloadResult {
url: base,
endpoint,
success: err.is_none(),
bytes_downloaded: bytes,
elapsed_ms: started.elapsed().as_millis() as u64,
status_code: Some(status_code),
error: err,
}
}
Err(e) => ColdDownloadResult {
url: base,
endpoint,
success: false,
bytes_downloaded: 0,
elapsed_ms: started.elapsed().as_millis() as u64,
status_code: None,
error: Some(format!("{e}")),
},
}
}
pub async fn resolve_feed(
bee_url: &str,
owner_hex: &str,
topic_hex: &str,
timeout: Duration,
) -> Result<(String, Resolution)> {
let bee = make_bee(bee_url, timeout)?;
let owner = EthAddress::from_hex(owner_hex)
.map_err(|e| anyhow!("invalid feed owner {owner_hex}: {e}"))?;
let topic = Topic::from_hex(topic_hex)
.map_err(|e| anyhow!("invalid feed topic {topic_hex}: {e}"))?;
let reference = bee
.file()
.get_feed_lookup(&owner, &topic)
.await
.map_err(anyhow::Error::from)?;
let r_hex = reference.to_hex();
Ok((
r_hex.clone(),
Resolution::Feed {
owner: owner.to_hex(),
topic: topic.to_hex(),
resolved_reference: r_hex,
},
))
}
pub fn parse_input(input: &str) -> ParsedInput {
if let Some(rest) = input.strip_prefix("feed:") {
let parts: Vec<&str> = rest.splitn(2, [':', '/']).collect();
if parts.len() == 2 {
return ParsedInput::Feed {
owner: parts[0].to_string(),
topic: parts[1].to_string(),
};
}
}
ParsedInput::Reference(input.to_string())
}
pub enum ParsedInput {
Reference(String),
Feed { owner: String, topic: String },
}
fn decode_overlay(hex: &str) -> Option<[u8; 32]> {
let s = hex.strip_prefix("0x").unwrap_or(hex);
if s.len() < 64 {
return None;
}
let mut out = [0u8; 32];
for (i, b) in out.iter_mut().enumerate() {
let h = u8::from_str_radix(&s[i * 2..i * 2 + 2], 16).ok()?;
*b = h;
}
Some(out)
}
fn first_32(r: &Reference) -> [u8; 32] {
let mut out = [0u8; 32];
out.copy_from_slice(&r.as_bytes()[..32]);
out
}
fn aggregate_status(vantages: &[VantageResult], gateways: &[GatewayResult]) -> Status {
let outcomes: Vec<Option<bool>> = vantages
.iter()
.map(|v| v.retrievable)
.chain(gateways.iter().map(|g| g.retrievable))
.collect();
let total = outcomes.len();
if total == 0 {
return Status::Error;
}
let retr = outcomes.iter().filter(|o| **o == Some(true)).count();
let unret = outcomes.iter().filter(|o| **o == Some(false)).count();
let err = outcomes.iter().filter(|o| o.is_none()).count();
if err == total {
Status::Error
} else if retr == total {
Status::Retrievable
} else if retr == 0 && unret + err == total {
Status::Unretrievable
} else {
Status::Partial
}
}
pub async fn drill_down(
mut report: Report,
bees: &[String],
timeout: Duration,
concurrency: usize,
) -> Result<Report> {
let r = parse_reference(&report.reference)?;
let walker_bee = bees.first().context("no bee URL for drill-down")?;
let walker = make_bee(walker_bee, timeout)?;
let addresses = collect_chunk_addresses(&walker, &r).await?;
let overlays: BTreeMap<String, [u8; 32]> = report
.vantages
.iter()
.filter_map(|v| {
v.overlay
.as_deref()
.and_then(decode_overlay)
.map(|o| (v.bee_url.clone(), o))
})
.collect();
let clients: Vec<(String, Client)> = bees
.iter()
.map(|u| make_bee(u, timeout).map(|b| (u.clone(), b)))
.collect::<Result<_>>()?;
let sem = Arc::new(Semaphore::new(concurrency.max(1)));
let mut probes = Vec::with_capacity(addresses.len());
let mut futs = FuturesUnordered::new();
for addr in addresses {
let sem = sem.clone();
let clients = clients.clone();
let overlays = overlays.clone();
futs.push(async move {
let chunk_bytes = first_32_of_ref(&addr);
let mut per_vantage = BTreeMap::new();
for (url, bee) in &clients {
let _permit = sem.acquire().await.expect("semaphore not closed");
let started = Instant::now();
let res = bee.file().download_chunk(&addr, None).await;
let elapsed_ms = started.elapsed().as_millis() as u64;
let prox = overlays
.get(url)
.map(|o| proximity(o, &chunk_bytes));
let cv = match res {
Ok(_) => ChunkVantage {
found: true,
elapsed_ms,
error: None,
proximity: prox,
},
Err(e) => ChunkVantage {
found: false,
elapsed_ms,
error: Some(format!("{e}")),
proximity: prox,
},
};
per_vantage.insert(url.clone(), cv);
}
let hex = addr.to_hex();
let neighborhood = hex.chars().take(2).collect::<String>();
ChunkProbe {
address: hex,
neighborhood,
per_vantage,
}
});
}
while let Some(p) = futs.next().await {
probes.push(p);
}
probes.sort_by(|a, b| a.address.cmp(&b.address));
report.chunk_stats = Some(compute_chunk_stats(&probes));
report.chunks = Some(probes);
Ok(report)
}
pub fn compute_chunk_stats(probes: &[ChunkProbe]) -> ChunkStats {
let mut per_vantage: BTreeMap<String, (Vec<u64>, usize, usize)> = BTreeMap::new();
let mut per_neighborhood: BTreeMap<String, (Vec<u64>, usize, usize)> = BTreeMap::new();
for p in probes {
for (url, cv) in &p.per_vantage {
let entry = per_vantage.entry(url.clone()).or_default();
if cv.found {
entry.0.push(cv.elapsed_ms);
entry.1 += 1;
} else {
entry.2 += 1;
}
let n = per_neighborhood.entry(p.neighborhood.clone()).or_default();
if cv.found {
n.0.push(cv.elapsed_ms);
n.1 += 1;
} else {
n.2 += 1;
}
}
}
let to_row = |(latencies, found, missing): (Vec<u64>, usize, usize)| -> ChunkStatRow {
let mut sorted = latencies;
sorted.sort_unstable();
let p = |q: f64| -> Option<u64> {
if sorted.is_empty() {
None
} else {
let idx = ((sorted.len() as f64 - 1.0) * q).round() as usize;
Some(sorted[idx])
}
};
ChunkStatRow {
total: found + missing,
found,
missing,
elapsed_p50_ms: p(0.50),
elapsed_p95_ms: p(0.95),
elapsed_max_ms: sorted.last().copied(),
}
};
ChunkStats {
per_vantage: per_vantage.into_iter().map(|(k, v)| (k, to_row(v))).collect(),
per_neighborhood: per_neighborhood
.into_iter()
.map(|(k, v)| (k, to_row(v)))
.collect(),
}
}
pub fn annotate_target_overlay(mut report: Report, target_overlay_hex: &str) -> Report {
let Some(target) = decode_overlay(target_overlay_hex) else {
return report;
};
for v in &mut report.vantages {
if let Some(o) = v.overlay.as_deref().and_then(decode_overlay) {
v.target_proximity = Some(proximity(&o, &target));
}
}
report
.vantages
.sort_by(|a, b| b.target_proximity.cmp(&a.target_proximity));
report
}
fn first_32_of_ref(r: &Reference) -> [u8; 32] {
let mut out = [0u8; 32];
out.copy_from_slice(&r.as_bytes()[..32]);
out
}
async fn collect_chunk_addresses(bee: &Client, root: &Reference) -> Result<Vec<Reference>> {
let mut addresses: Vec<Reference> = vec![root.clone()];
let mut seen: std::collections::HashSet<String> = std::collections::HashSet::new();
seen.insert(root.to_hex());
let mut queue: std::collections::VecDeque<Reference> = std::collections::VecDeque::new();
queue.push_back(root.clone());
while let Some(addr) = queue.pop_front() {
if addresses.len() >= MAX_CHUNKS {
break;
}
let bytes = match bee.file().download_chunk(&addr, None).await {
Ok(b) => b,
Err(_) => continue,
};
let Ok(node) = unmarshal(&bytes, addr.as_bytes()) else {
continue;
};
if !is_null_address(&node.target_address) {
if let Ok(r) = Reference::new(&node.target_address) {
if seen.insert(r.to_hex()) {
addresses.push(r);
}
}
}
for fork in node.forks.values() {
if let Some(sa) = fork.node.self_address {
if let Ok(r) = Reference::new(&sa) {
if seen.insert(r.to_hex()) {
addresses.push(r.clone());
queue.push_back(r);
}
}
}
}
}
Ok(addresses)
}
pub async fn reseed(req: ReseedRequest) -> Result<()> {
let bee = make_bee(&req.bee_url, req.timeout)?;
let r = parse_reference(&req.reference)?;
let batch = BatchId::from_hex(&req.batch_id)
.map_err(|e| anyhow!("invalid batch id {}: {e}", req.batch_id))?;
bee.api().reupload(&r, &batch).await?;
Ok(())
}
pub async fn check_stamp(
bee_url: &str,
batch_id: &str,
timeout: Duration,
) -> Result<StampStatus> {
let bee = make_bee(bee_url, timeout)?;
let batch = BatchId::from_hex(batch_id)
.map_err(|e| anyhow!("invalid batch id {batch_id}: {e}"))?;
let pb = bee
.postage()
.get_postage_batch(&batch)
.await
.map_err(anyhow::Error::from)?;
let mut warnings = Vec::new();
if !pb.exists {
warnings.push("batch not known to this Bee".to_string());
}
if !pb.usable {
warnings.push("batch not usable yet (chain may be syncing)".to_string());
}
if pb.batch_ttl >= 0 && pb.batch_ttl < STAMP_LOW_TTL_SECS {
warnings.push(format!(
"batch TTL low: ~{} (re-seed may not outlive the batch)",
humanize_secs(pb.batch_ttl)
));
}
let healthy = pb.exists && pb.usable && (pb.batch_ttl < 0 || pb.batch_ttl >= STAMP_LOW_TTL_SECS);
Ok(StampStatus {
batch_id: batch_id.to_string(),
exists: pb.exists,
usable: pb.usable,
batch_ttl: pb.batch_ttl,
healthy,
warnings,
})
}
fn human_bytes(b: u64) -> String {
const KIB: u64 = 1024;
const MIB: u64 = KIB * 1024;
const GIB: u64 = MIB * 1024;
if b >= GIB {
format!("{:.2} GiB", b as f64 / GIB as f64)
} else if b >= MIB {
format!("{:.2} MiB", b as f64 / MIB as f64)
} else if b >= KIB {
format!("{:.2} KiB", b as f64 / KIB as f64)
} else {
format!("{b} B")
}
}
fn humanize_secs(s: i64) -> String {
if s < 0 {
return "unknown".to_string();
}
let s = s as u64;
if s >= 86_400 {
format!("{} day(s)", s / 86_400)
} else if s >= 3_600 {
format!("{} hour(s)", s / 3_600)
} else if s >= 60 {
format!("{} min", s / 60)
} else {
format!("{}s", s)
}
}
pub fn render_report(report: &Report, fmt: OutputFormat) -> String {
match fmt {
OutputFormat::Json => {
serde_json::to_string_pretty(report).expect("report serialization") + "\n"
}
OutputFormat::Text => render_text(report),
}
}
fn render_text(r: &Report) -> String {
use std::fmt::Write;
let mut out = String::new();
let _ = writeln!(out, "ref {}", r.reference);
let _ = writeln!(out, "status {:?}", r.status);
let _ = writeln!(out);
let _ = writeln!(out, "vantages:");
let url_w = r.vantages.iter().map(|v| v.bee_url.len()).max().unwrap_or(20);
for v in &r.vantages {
let state = match (v.retrievable, &v.error) {
(Some(true), _) => "retrievable",
(Some(false), _) => "unretrievable",
(None, Some(_)) => "error",
(None, None) => "unknown",
};
let meta = vantage_meta(v);
let _ = writeln!(
out,
" {:<url_w$} {:<14} {:>6} ms{}{}",
v.bee_url,
state,
v.elapsed_ms,
if meta.is_empty() { String::new() } else { format!(" {meta}") },
v.error
.as_deref()
.map(|e| format!(" ({e})"))
.unwrap_or_default(),
url_w = url_w
);
}
if !r.gateways.is_empty() {
let _ = writeln!(out);
let _ = writeln!(out, "gateways:");
let url_w = r.gateways.iter().map(|g| g.url.len()).max().unwrap_or(20);
for g in &r.gateways {
let state = match (g.retrievable, &g.error) {
(Some(true), _) => "retrievable",
(Some(false), _) => "unretrievable",
(None, _) => "error",
};
let code = g
.status_code
.map(|c| format!(" HTTP {c}"))
.unwrap_or_default();
let _ = writeln!(
out,
" {:<url_w$} {:<14} {:>6} ms{}{}",
g.url,
state,
g.elapsed_ms,
code,
g.error
.as_deref()
.map(|e| format!(" ({e})"))
.unwrap_or_default(),
url_w = url_w
);
}
}
if !r.cold_downloads.is_empty() {
let _ = writeln!(out);
let _ = writeln!(out, "cold downloads:");
let url_w = r.cold_downloads.iter().map(|c| c.url.len()).max().unwrap_or(20);
for c in &r.cold_downloads {
let state = if c.success { "ok" } else { "fail" };
let bytes = human_bytes(c.bytes_downloaded);
let code = c
.status_code
.map(|s| format!(" HTTP {s}"))
.unwrap_or_default();
let _ = writeln!(
out,
" {:<url_w$} {:<4} {:>6} ms {:>9}{}{}",
c.url,
state,
c.elapsed_ms,
bytes,
code,
c.error
.as_deref()
.map(|e| format!(" ({e})"))
.unwrap_or_default(),
url_w = url_w
);
}
}
if let Some(res) = &r.resolution {
let _ = writeln!(out);
match res {
Resolution::Feed { owner, topic, resolved_reference } => {
let _ = writeln!(
out,
"resolved feed owner={owner} topic={topic} -> {resolved_reference}",
);
}
}
}
if let Some(stats) = &r.chunk_stats {
let _ = writeln!(out);
let _ = writeln!(out, "chunk stats per vantage:");
let url_w = stats
.per_vantage
.keys()
.map(|k| k.len())
.max()
.unwrap_or(20);
for (url, row) in &stats.per_vantage {
let _ = writeln!(
out,
" {:<url_w$} found {:>3}/{:<3} p50 {:>5} ms · p95 {:>5} ms · max {:>5} ms",
url,
row.found,
row.total,
fmt_ms(row.elapsed_p50_ms),
fmt_ms(row.elapsed_p95_ms),
fmt_ms(row.elapsed_max_ms),
url_w = url_w
);
}
if !stats.per_neighborhood.is_empty() {
let _ = writeln!(out);
let _ = writeln!(out, "chunk stats per neighborhood:");
let mut rows: Vec<(&String, &ChunkStatRow)> =
stats.per_neighborhood.iter().collect();
rows.sort_by(|a, b| {
b.1.elapsed_p95_ms
.unwrap_or(0)
.cmp(&a.1.elapsed_p95_ms.unwrap_or(0))
});
for (nb, row) in rows.iter().take(10) {
let _ = writeln!(
out,
" nb {} found {:>3}/{:<3} p50 {:>5} ms · p95 {:>5} ms",
nb,
row.found,
row.total,
fmt_ms(row.elapsed_p50_ms),
fmt_ms(row.elapsed_p95_ms),
);
}
if rows.len() > 10 {
let _ = writeln!(out, " ... {} more neighborhoods", rows.len() - 10);
}
}
}
if let Some(chunks) = &r.chunks {
let _ = writeln!(out);
let _ = writeln!(out, "chunks: {} probed", chunks.len());
let mut missing = 0usize;
for c in chunks {
let missing_in: Vec<String> = c
.per_vantage
.iter()
.filter(|(_, cv)| !cv.found)
.map(|(u, cv)| match cv.proximity {
Some(p) => format!("{u} (PO {p})"),
None => u.clone(),
})
.collect();
if !missing_in.is_empty() {
missing += 1;
let _ = writeln!(
out,
" [{}] {} missing on: {}",
c.neighborhood,
short(&c.address),
missing_in.join(", ")
);
}
}
if missing == 0 {
let _ = writeln!(out, " all chunks present on all vantages");
} else {
let _ = writeln!(out, " {missing} chunk(s) missing on at least one vantage");
}
}
out
}
fn vantage_meta(v: &VantageResult) -> String {
let mut parts: Vec<String> = Vec::new();
if let Some(o) = &v.overlay {
let neigh = o.chars().take(2).collect::<String>();
let short_overlay = short_overlay(o);
parts.push(format!("overlay {short_overlay} (nb {neigh})"));
}
if let Some(p) = v.proximity_to_root {
parts.push(format!("PO {p}"));
}
if let Some(p) = v.target_proximity {
parts.push(format!("tgtPO {p}"));
}
if let Some(ver) = &v.bee_version {
parts.push(format!("v{ver}"));
}
if parts.is_empty() {
String::new()
} else {
format!("· {}", parts.join(" · "))
}
}
fn fmt_ms(v: Option<u64>) -> String {
match v {
Some(ms) => format!("{ms}"),
None => "—".to_string(),
}
}
fn short_overlay(hex: &str) -> String {
let s = hex.strip_prefix("0x").unwrap_or(hex);
if s.len() > 12 {
format!("{}…{}", &s[..6], &s[s.len() - 4..])
} else {
s.to_string()
}
}
pub fn render_stamp_status(s: &StampStatus) -> String {
use std::fmt::Write;
let mut out = String::new();
let ttl = if s.batch_ttl < 0 {
"unknown".to_string()
} else {
humanize_secs(s.batch_ttl)
};
let header = if s.healthy { "stamp OK" } else { "stamp warning" };
let _ = writeln!(
out,
"{header}: batch {} · usable={} · ttl={}",
short_overlay(&s.batch_id),
s.usable,
ttl,
);
for w in &s.warnings {
let _ = writeln!(out, " · {w}");
}
out
}
fn short(hex: &str) -> String {
if hex.len() > 16 {
format!("{}…{}", &hex[..8], &hex[hex.len() - 4..])
} else {
hex.to_string()
}
}