1use std::collections::BTreeMap;
8use std::sync::Arc;
9use std::time::{Duration, Instant};
10
11use anyhow::{Context, Result, anyhow};
12use bee::Client;
13use bee::manifest::{is_null_address, unmarshal};
14use bee::swarm::gsoc::proximity;
15use bee::swarm::{BatchId, EthAddress, Reference, Topic};
16use futures::stream::{FuturesUnordered, StreamExt};
17use reqwest::Method;
18use serde::{Deserialize, Serialize};
19use tokio::sync::Semaphore;
20
21pub const DEFAULT_GATEWAY: &str = "https://api.gateway.ethswarm.org";
27
28const MAX_CHUNKS: usize = 1000;
29const STAMP_LOW_TTL_SECS: i64 = 86_400;
32
33#[derive(Copy, Clone, Debug)]
34pub enum OutputFormat {
35 Text,
36 Json,
37}
38
39#[derive(Copy, Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
41#[serde(rename_all = "snake_case")]
42pub enum Status {
43 Retrievable,
44 Unretrievable,
45 Partial,
46 Error,
47}
48
49#[derive(Debug, Serialize, Deserialize)]
50pub struct Report {
51 pub reference: String,
52 pub status: Status,
53 pub vantages: Vec<VantageResult>,
54 #[serde(default, skip_serializing_if = "Vec::is_empty")]
57 pub gateways: Vec<GatewayResult>,
58 #[serde(default, skip_serializing_if = "Option::is_none")]
61 pub resolution: Option<Resolution>,
62 #[serde(skip_serializing_if = "Option::is_none")]
64 pub chunks: Option<Vec<ChunkProbe>>,
65 #[serde(skip_serializing_if = "Option::is_none")]
68 pub chunk_stats: Option<ChunkStats>,
69 #[serde(default, skip_serializing_if = "Vec::is_empty")]
74 pub cold_downloads: Vec<ColdDownloadResult>,
75 pub spec_version: u32,
76}
77
78#[derive(Debug, Serialize, Deserialize)]
82pub struct ChunkStats {
83 pub per_vantage: BTreeMap<String, ChunkStatRow>,
84 pub per_neighborhood: BTreeMap<String, ChunkStatRow>,
86}
87
88#[derive(Debug, Serialize, Deserialize, Clone)]
89pub struct ChunkStatRow {
90 pub total: usize,
91 pub found: usize,
92 pub missing: usize,
93 #[serde(skip_serializing_if = "Option::is_none")]
96 pub elapsed_p50_ms: Option<u64>,
97 #[serde(skip_serializing_if = "Option::is_none")]
98 pub elapsed_p95_ms: Option<u64>,
99 #[serde(skip_serializing_if = "Option::is_none")]
100 pub elapsed_max_ms: Option<u64>,
101}
102
103#[derive(Debug, Serialize, Deserialize)]
108pub struct GatewayResult {
109 pub url: String,
110 pub retrievable: Option<bool>,
113 pub elapsed_ms: u64,
114 #[serde(skip_serializing_if = "Option::is_none")]
116 pub status_code: Option<u16>,
117 #[serde(skip_serializing_if = "Option::is_none")]
118 pub error: Option<String>,
119}
120
121#[derive(Debug, Serialize, Deserialize)]
124#[serde(tag = "kind", rename_all = "snake_case")]
125pub enum Resolution {
126 Feed {
129 owner: String,
130 topic: String,
131 resolved_reference: String,
133 },
134}
135
136#[derive(Debug, Serialize, Deserialize)]
137pub struct VantageResult {
138 pub bee_url: String,
139 pub retrievable: Option<bool>,
141 pub elapsed_ms: u64,
142 #[serde(skip_serializing_if = "Option::is_none")]
143 pub error: Option<String>,
144 #[serde(skip_serializing_if = "Option::is_none")]
147 pub overlay: Option<String>,
148 #[serde(skip_serializing_if = "Option::is_none")]
150 pub bee_version: Option<String>,
151 #[serde(skip_serializing_if = "Option::is_none")]
155 pub proximity_to_root: Option<u32>,
156 #[serde(skip_serializing_if = "Option::is_none")]
160 pub target_proximity: Option<u32>,
161}
162
163#[derive(Debug, Serialize, Deserialize)]
164pub struct ChunkProbe {
165 pub address: String,
166 pub neighborhood: String,
168 pub per_vantage: BTreeMap<String, ChunkVantage>,
169}
170
171#[derive(Debug, Serialize, Deserialize)]
172pub struct ChunkVantage {
173 pub found: bool,
174 pub elapsed_ms: u64,
175 #[serde(skip_serializing_if = "Option::is_none")]
176 pub error: Option<String>,
177 #[serde(skip_serializing_if = "Option::is_none")]
180 pub proximity: Option<u32>,
181}
182
183#[derive(Debug, Serialize, Deserialize)]
189pub struct ColdDownloadResult {
190 pub url: String,
191 pub endpoint: String,
194 pub success: bool,
197 pub bytes_downloaded: u64,
199 pub elapsed_ms: u64,
200 #[serde(skip_serializing_if = "Option::is_none")]
201 pub status_code: Option<u16>,
202 #[serde(skip_serializing_if = "Option::is_none")]
203 pub error: Option<String>,
204}
205
206pub struct ReseedRequest {
207 pub reference: String,
208 pub bee_url: String,
209 pub batch_id: String,
210 pub timeout: Duration,
211}
212
213#[derive(Debug, Serialize, Deserialize)]
217pub struct StampStatus {
218 pub batch_id: String,
219 pub exists: bool,
220 pub usable: bool,
221 pub batch_ttl: i64,
222 pub healthy: bool,
225 #[serde(skip_serializing_if = "Vec::is_empty")]
226 pub warnings: Vec<String>,
227}
228
229const SPEC_VERSION: u32 = 1;
230
231fn parse_reference(s: &str) -> Result<Reference> {
232 Reference::from_hex(s).map_err(|e| anyhow!("invalid reference {s}: {e}"))
233}
234
235fn make_bee(url: &str, timeout: Duration) -> Result<Client> {
236 let http = reqwest::Client::builder()
237 .timeout(timeout)
238 .build()
239 .context("building http client")?;
240 Client::with_http_client(url, http).map_err(|e| anyhow!("invalid bee url {url}: {e}"))
241}
242
243pub async fn check_multi_vantage(
250 reference: &str,
251 bees: &[String],
252 timeout: Duration,
253) -> Result<Report> {
254 let r = parse_reference(reference)?;
255 let root_bytes = first_32(&r);
256
257 let mut futs = FuturesUnordered::new();
258 for bee_url in bees {
259 let bee_url = bee_url.clone();
260 let r = r.clone();
261 futs.push(async move {
262 let bee = match make_bee(&bee_url, timeout) {
263 Ok(b) => b,
264 Err(e) => {
265 return VantageResult {
266 bee_url,
267 retrievable: None,
268 elapsed_ms: 0,
269 error: Some(format!("{e:#}")),
270 overlay: None,
271 bee_version: None,
272 proximity_to_root: None,
273 target_proximity: None,
274 };
275 }
276 };
277 let started = Instant::now();
278 let api = bee.api();
279 let debug = bee.debug();
280 let (stew_res, addr_res, health_res) = tokio::join!(
281 api.is_retrievable(&r),
282 debug.addresses(),
283 debug.health(),
284 );
285 let elapsed_ms = started.elapsed().as_millis() as u64;
286
287 let overlay = addr_res.ok().map(|a| a.overlay);
288 let bee_version = health_res.ok().map(|h| h.version);
289 let proximity_to_root = overlay
290 .as_deref()
291 .and_then(decode_overlay)
292 .map(|o| proximity(&o, &root_bytes));
293
294 match stew_res {
295 Ok(ok) => VantageResult {
296 bee_url,
297 retrievable: Some(ok),
298 elapsed_ms,
299 error: None,
300 overlay,
301 bee_version,
302 proximity_to_root,
303 target_proximity: None,
304 },
305 Err(e) => VantageResult {
306 bee_url,
307 retrievable: None,
308 elapsed_ms,
309 error: Some(format!("{e}")),
310 overlay,
311 bee_version,
312 proximity_to_root,
313 target_proximity: None,
314 },
315 }
316 });
317 }
318
319 let mut vantages = Vec::with_capacity(bees.len());
320 while let Some(v) = futs.next().await {
321 vantages.push(v);
322 }
323 vantages.sort_by(|a, b| a.bee_url.cmp(&b.bee_url));
324
325 let status = aggregate_status(&vantages, &[]);
326 Ok(Report {
327 reference: reference.to_string(),
328 status,
329 vantages,
330 gateways: Vec::new(),
331 resolution: None,
332 chunks: None,
333 chunk_stats: None,
334 cold_downloads: Vec::new(),
335 spec_version: SPEC_VERSION,
336 })
337}
338
339pub async fn check_gateways(
344 reference: &str,
345 gateway_urls: &[String],
346 timeout: Duration,
347) -> Result<Vec<GatewayResult>> {
348 let _ = parse_reference(reference)?;
351
352 let http = reqwest::Client::builder()
353 .timeout(timeout)
354 .build()
355 .context("building http client for gateway probes")?;
356
357 let mut futs = FuturesUnordered::new();
358 for base in gateway_urls {
359 let base = base.clone();
360 let reference = reference.to_string();
361 let http = http.clone();
362 futs.push(async move {
363 let url = build_gateway_url(&base, &reference);
364 let started = Instant::now();
365 let res = http.request(Method::HEAD, &url).send().await;
366 let elapsed_ms = started.elapsed().as_millis() as u64;
367 match res {
368 Ok(resp) => {
369 let status = resp.status().as_u16();
370 GatewayResult {
371 url: base,
372 retrievable: Some(resp.status().is_success()),
373 elapsed_ms,
374 status_code: Some(status),
375 error: None,
376 }
377 }
378 Err(e) => GatewayResult {
379 url: base,
380 retrievable: None,
381 elapsed_ms,
382 status_code: None,
383 error: Some(format!("{e}")),
384 },
385 }
386 });
387 }
388 let mut out = Vec::with_capacity(gateway_urls.len());
389 while let Some(g) = futs.next().await {
390 out.push(g);
391 }
392 out.sort_by(|a, b| a.url.cmp(&b.url));
393 Ok(out)
394}
395
396pub fn merge_gateways(mut report: Report, gateways: Vec<GatewayResult>) -> Report {
400 report.gateways = gateways;
401 report.status = aggregate_status(&report.vantages, &report.gateways);
402 report
403}
404
405fn build_gateway_url(base: &str, reference: &str) -> String {
406 let trimmed = base.trim_end_matches('/');
407 format!("{trimmed}/bzz/{reference}/")
408}
409
410fn build_bytes_url(base: &str, reference: &str) -> String {
411 let trimmed = base.trim_end_matches('/');
412 format!("{trimmed}/bytes/{reference}")
413}
414
415pub async fn cold_download_all(
423 bee_urls: &[String],
424 gateway_urls: &[String],
425 reference: &str,
426 timeout: Duration,
427) -> Result<Vec<ColdDownloadResult>> {
428 let _ = parse_reference(reference)?;
429
430 let http = reqwest::Client::builder()
431 .timeout(timeout)
432 .build()
433 .context("building http client for cold-download probes")?;
434
435 let mut targets: Vec<(String, String, String)> =
437 Vec::with_capacity(bee_urls.len() + gateway_urls.len());
438 for url in bee_urls {
439 targets.push((
440 url.clone(),
441 "/bytes/{ref}".to_string(),
442 build_bytes_url(url, reference),
443 ));
444 }
445 for url in gateway_urls {
446 targets.push((
447 url.clone(),
448 "/bzz/{ref}/".to_string(),
449 build_gateway_url(url, reference),
450 ));
451 }
452
453 let mut futs = FuturesUnordered::new();
454 for (base, endpoint, probe_url) in targets {
455 let http = http.clone();
456 futs.push(async move { cold_probe(&http, base, endpoint, probe_url).await });
457 }
458
459 let mut out = Vec::with_capacity(bee_urls.len() + gateway_urls.len());
460 while let Some(c) = futs.next().await {
461 out.push(c);
462 }
463 out.sort_by(|a, b| a.url.cmp(&b.url));
464 Ok(out)
465}
466
467async fn cold_probe(
468 http: &reqwest::Client,
469 base: String,
470 endpoint: String,
471 probe_url: String,
472) -> ColdDownloadResult {
473 let started = Instant::now();
474 let res = http.get(&probe_url).send().await;
475 match res {
476 Ok(mut resp) => {
477 let status = resp.status();
478 let status_code = status.as_u16();
479 if !status.is_success() {
480 return ColdDownloadResult {
481 url: base,
482 endpoint,
483 success: false,
484 bytes_downloaded: 0,
485 elapsed_ms: started.elapsed().as_millis() as u64,
486 status_code: Some(status_code),
487 error: Some(format!("HTTP {status_code}")),
488 };
489 }
490 let mut bytes: u64 = 0;
491 let mut err: Option<String> = None;
492 loop {
493 match resp.chunk().await {
494 Ok(Some(chunk)) => bytes += chunk.len() as u64,
495 Ok(None) => break,
496 Err(e) => {
497 err = Some(format!("stream error after {bytes} bytes: {e}"));
498 break;
499 }
500 }
501 }
502 ColdDownloadResult {
503 url: base,
504 endpoint,
505 success: err.is_none(),
506 bytes_downloaded: bytes,
507 elapsed_ms: started.elapsed().as_millis() as u64,
508 status_code: Some(status_code),
509 error: err,
510 }
511 }
512 Err(e) => ColdDownloadResult {
513 url: base,
514 endpoint,
515 success: false,
516 bytes_downloaded: 0,
517 elapsed_ms: started.elapsed().as_millis() as u64,
518 status_code: None,
519 error: Some(format!("{e}")),
520 },
521 }
522}
523
524pub async fn resolve_feed(
529 bee_url: &str,
530 owner_hex: &str,
531 topic_hex: &str,
532 timeout: Duration,
533) -> Result<(String, Resolution)> {
534 let bee = make_bee(bee_url, timeout)?;
535 let owner = EthAddress::from_hex(owner_hex)
536 .map_err(|e| anyhow!("invalid feed owner {owner_hex}: {e}"))?;
537 let topic = Topic::from_hex(topic_hex)
538 .map_err(|e| anyhow!("invalid feed topic {topic_hex}: {e}"))?;
539 let reference = bee
540 .file()
541 .get_feed_lookup(&owner, &topic)
542 .await
543 .map_err(anyhow::Error::from)?;
544 let r_hex = reference.to_hex();
545 Ok((
546 r_hex.clone(),
547 Resolution::Feed {
548 owner: owner.to_hex(),
549 topic: topic.to_hex(),
550 resolved_reference: r_hex,
551 },
552 ))
553}
554
555pub fn parse_input(input: &str) -> ParsedInput {
561 if let Some(rest) = input.strip_prefix("feed:") {
562 let parts: Vec<&str> = rest.splitn(2, [':', '/']).collect();
564 if parts.len() == 2 {
565 return ParsedInput::Feed {
566 owner: parts[0].to_string(),
567 topic: parts[1].to_string(),
568 };
569 }
570 }
571 ParsedInput::Reference(input.to_string())
572}
573
574pub enum ParsedInput {
576 Reference(String),
579 Feed { owner: String, topic: String },
582}
583
584fn decode_overlay(hex: &str) -> Option<[u8; 32]> {
588 let s = hex.strip_prefix("0x").unwrap_or(hex);
589 if s.len() < 64 {
590 return None;
591 }
592 let mut out = [0u8; 32];
593 for (i, b) in out.iter_mut().enumerate() {
594 let h = u8::from_str_radix(&s[i * 2..i * 2 + 2], 16).ok()?;
595 *b = h;
596 }
597 Some(out)
598}
599
600fn first_32(r: &Reference) -> [u8; 32] {
601 let mut out = [0u8; 32];
602 out.copy_from_slice(&r.as_bytes()[..32]);
603 out
604}
605
606fn aggregate_status(vantages: &[VantageResult], gateways: &[GatewayResult]) -> Status {
607 let outcomes: Vec<Option<bool>> = vantages
608 .iter()
609 .map(|v| v.retrievable)
610 .chain(gateways.iter().map(|g| g.retrievable))
611 .collect();
612 let total = outcomes.len();
613 if total == 0 {
614 return Status::Error;
615 }
616 let retr = outcomes.iter().filter(|o| **o == Some(true)).count();
617 let unret = outcomes.iter().filter(|o| **o == Some(false)).count();
618 let err = outcomes.iter().filter(|o| o.is_none()).count();
619 if err == total {
620 Status::Error
621 } else if retr == total {
622 Status::Retrievable
623 } else if retr == 0 && unret + err == total {
624 Status::Unretrievable
625 } else {
626 Status::Partial
627 }
628}
629
630pub async fn drill_down(
636 mut report: Report,
637 bees: &[String],
638 timeout: Duration,
639 concurrency: usize,
640) -> Result<Report> {
641 let r = parse_reference(&report.reference)?;
642 let walker_bee = bees.first().context("no bee URL for drill-down")?;
644 let walker = make_bee(walker_bee, timeout)?;
645
646 let addresses = collect_chunk_addresses(&walker, &r).await?;
647
648 let overlays: BTreeMap<String, [u8; 32]> = report
651 .vantages
652 .iter()
653 .filter_map(|v| {
654 v.overlay
655 .as_deref()
656 .and_then(decode_overlay)
657 .map(|o| (v.bee_url.clone(), o))
658 })
659 .collect();
660
661 let clients: Vec<(String, Client)> = bees
662 .iter()
663 .map(|u| make_bee(u, timeout).map(|b| (u.clone(), b)))
664 .collect::<Result<_>>()?;
665
666 let sem = Arc::new(Semaphore::new(concurrency.max(1)));
667 let mut probes = Vec::with_capacity(addresses.len());
668
669 let mut futs = FuturesUnordered::new();
670 for addr in addresses {
671 let sem = sem.clone();
672 let clients = clients.clone();
673 let overlays = overlays.clone();
674 futs.push(async move {
675 let chunk_bytes = first_32_of_ref(&addr);
676 let mut per_vantage = BTreeMap::new();
677 for (url, bee) in &clients {
678 let _permit = sem.acquire().await.expect("semaphore not closed");
679 let started = Instant::now();
680 let res = bee.file().download_chunk(&addr, None).await;
681 let elapsed_ms = started.elapsed().as_millis() as u64;
682 let prox = overlays
683 .get(url)
684 .map(|o| proximity(o, &chunk_bytes));
685 let cv = match res {
686 Ok(_) => ChunkVantage {
687 found: true,
688 elapsed_ms,
689 error: None,
690 proximity: prox,
691 },
692 Err(e) => ChunkVantage {
693 found: false,
694 elapsed_ms,
695 error: Some(format!("{e}")),
696 proximity: prox,
697 },
698 };
699 per_vantage.insert(url.clone(), cv);
700 }
701 let hex = addr.to_hex();
702 let neighborhood = hex.chars().take(2).collect::<String>();
703 ChunkProbe {
704 address: hex,
705 neighborhood,
706 per_vantage,
707 }
708 });
709 }
710
711 while let Some(p) = futs.next().await {
712 probes.push(p);
713 }
714 probes.sort_by(|a, b| a.address.cmp(&b.address));
715 report.chunk_stats = Some(compute_chunk_stats(&probes));
716 report.chunks = Some(probes);
717 Ok(report)
718}
719
720pub fn compute_chunk_stats(probes: &[ChunkProbe]) -> ChunkStats {
724 let mut per_vantage: BTreeMap<String, (Vec<u64>, usize, usize)> = BTreeMap::new();
726 let mut per_neighborhood: BTreeMap<String, (Vec<u64>, usize, usize)> = BTreeMap::new();
727
728 for p in probes {
729 for (url, cv) in &p.per_vantage {
730 let entry = per_vantage.entry(url.clone()).or_default();
731 if cv.found {
732 entry.0.push(cv.elapsed_ms);
733 entry.1 += 1;
734 } else {
735 entry.2 += 1;
736 }
737 let n = per_neighborhood.entry(p.neighborhood.clone()).or_default();
738 if cv.found {
739 n.0.push(cv.elapsed_ms);
740 n.1 += 1;
741 } else {
742 n.2 += 1;
743 }
744 }
745 }
746
747 let to_row = |(latencies, found, missing): (Vec<u64>, usize, usize)| -> ChunkStatRow {
748 let mut sorted = latencies;
749 sorted.sort_unstable();
750 let p = |q: f64| -> Option<u64> {
751 if sorted.is_empty() {
752 None
753 } else {
754 let idx = ((sorted.len() as f64 - 1.0) * q).round() as usize;
755 Some(sorted[idx])
756 }
757 };
758 ChunkStatRow {
759 total: found + missing,
760 found,
761 missing,
762 elapsed_p50_ms: p(0.50),
763 elapsed_p95_ms: p(0.95),
764 elapsed_max_ms: sorted.last().copied(),
765 }
766 };
767
768 ChunkStats {
769 per_vantage: per_vantage.into_iter().map(|(k, v)| (k, to_row(v))).collect(),
770 per_neighborhood: per_neighborhood
771 .into_iter()
772 .map(|(k, v)| (k, to_row(v)))
773 .collect(),
774 }
775}
776
777pub fn annotate_target_overlay(mut report: Report, target_overlay_hex: &str) -> Report {
782 let Some(target) = decode_overlay(target_overlay_hex) else {
783 return report;
784 };
785 for v in &mut report.vantages {
786 if let Some(o) = v.overlay.as_deref().and_then(decode_overlay) {
787 v.target_proximity = Some(proximity(&o, &target));
788 }
789 }
790 report
791 .vantages
792 .sort_by(|a, b| b.target_proximity.cmp(&a.target_proximity));
793 report
794}
795
796fn first_32_of_ref(r: &Reference) -> [u8; 32] {
797 let mut out = [0u8; 32];
798 out.copy_from_slice(&r.as_bytes()[..32]);
799 out
800}
801
802async fn collect_chunk_addresses(bee: &Client, root: &Reference) -> Result<Vec<Reference>> {
807 let mut addresses: Vec<Reference> = vec![root.clone()];
808 let mut seen: std::collections::HashSet<String> = std::collections::HashSet::new();
809 seen.insert(root.to_hex());
810
811 let mut queue: std::collections::VecDeque<Reference> = std::collections::VecDeque::new();
812 queue.push_back(root.clone());
813
814 while let Some(addr) = queue.pop_front() {
815 if addresses.len() >= MAX_CHUNKS {
816 break;
817 }
818 let bytes = match bee.file().download_chunk(&addr, None).await {
819 Ok(b) => b,
820 Err(_) => continue,
823 };
824 let Ok(node) = unmarshal(&bytes, addr.as_bytes()) else {
825 continue;
828 };
829 if !is_null_address(&node.target_address) {
831 if let Ok(r) = Reference::new(&node.target_address) {
832 if seen.insert(r.to_hex()) {
833 addresses.push(r);
834 }
835 }
836 }
837 for fork in node.forks.values() {
839 if let Some(sa) = fork.node.self_address {
840 if let Ok(r) = Reference::new(&sa) {
841 if seen.insert(r.to_hex()) {
842 addresses.push(r.clone());
843 queue.push_back(r);
844 }
845 }
846 }
847 }
848 }
849 Ok(addresses)
850}
851
852pub async fn reseed(req: ReseedRequest) -> Result<()> {
853 let bee = make_bee(&req.bee_url, req.timeout)?;
854 let r = parse_reference(&req.reference)?;
855 let batch = BatchId::from_hex(&req.batch_id)
856 .map_err(|e| anyhow!("invalid batch id {}: {e}", req.batch_id))?;
857 bee.api().reupload(&r, &batch).await?;
858 Ok(())
859}
860
861pub async fn check_stamp(
866 bee_url: &str,
867 batch_id: &str,
868 timeout: Duration,
869) -> Result<StampStatus> {
870 let bee = make_bee(bee_url, timeout)?;
871 let batch = BatchId::from_hex(batch_id)
872 .map_err(|e| anyhow!("invalid batch id {batch_id}: {e}"))?;
873 let pb = bee
874 .postage()
875 .get_postage_batch(&batch)
876 .await
877 .map_err(anyhow::Error::from)?;
878
879 let mut warnings = Vec::new();
880 if !pb.exists {
881 warnings.push("batch not known to this Bee".to_string());
882 }
883 if !pb.usable {
884 warnings.push("batch not usable yet (chain may be syncing)".to_string());
885 }
886 if pb.batch_ttl >= 0 && pb.batch_ttl < STAMP_LOW_TTL_SECS {
887 warnings.push(format!(
888 "batch TTL low: ~{} (re-seed may not outlive the batch)",
889 humanize_secs(pb.batch_ttl)
890 ));
891 }
892
893 let healthy = pb.exists && pb.usable && (pb.batch_ttl < 0 || pb.batch_ttl >= STAMP_LOW_TTL_SECS);
894
895 Ok(StampStatus {
896 batch_id: batch_id.to_string(),
897 exists: pb.exists,
898 usable: pb.usable,
899 batch_ttl: pb.batch_ttl,
900 healthy,
901 warnings,
902 })
903}
904
905fn human_bytes(b: u64) -> String {
906 const KIB: u64 = 1024;
907 const MIB: u64 = KIB * 1024;
908 const GIB: u64 = MIB * 1024;
909 if b >= GIB {
910 format!("{:.2} GiB", b as f64 / GIB as f64)
911 } else if b >= MIB {
912 format!("{:.2} MiB", b as f64 / MIB as f64)
913 } else if b >= KIB {
914 format!("{:.2} KiB", b as f64 / KIB as f64)
915 } else {
916 format!("{b} B")
917 }
918}
919
920fn humanize_secs(s: i64) -> String {
921 if s < 0 {
922 return "unknown".to_string();
923 }
924 let s = s as u64;
925 if s >= 86_400 {
926 format!("{} day(s)", s / 86_400)
927 } else if s >= 3_600 {
928 format!("{} hour(s)", s / 3_600)
929 } else if s >= 60 {
930 format!("{} min", s / 60)
931 } else {
932 format!("{}s", s)
933 }
934}
935
936pub fn render_report(report: &Report, fmt: OutputFormat) -> String {
937 match fmt {
938 OutputFormat::Json => {
939 serde_json::to_string_pretty(report).expect("report serialization") + "\n"
940 }
941 OutputFormat::Text => render_text(report),
942 }
943}
944
945fn render_text(r: &Report) -> String {
946 use std::fmt::Write;
947 let mut out = String::new();
948 let _ = writeln!(out, "ref {}", r.reference);
949 let _ = writeln!(out, "status {:?}", r.status);
950 let _ = writeln!(out);
951 let _ = writeln!(out, "vantages:");
952 let url_w = r.vantages.iter().map(|v| v.bee_url.len()).max().unwrap_or(20);
953 for v in &r.vantages {
954 let state = match (v.retrievable, &v.error) {
955 (Some(true), _) => "retrievable",
956 (Some(false), _) => "unretrievable",
957 (None, Some(_)) => "error",
958 (None, None) => "unknown",
959 };
960 let meta = vantage_meta(v);
961 let _ = writeln!(
962 out,
963 " {:<url_w$} {:<14} {:>6} ms{}{}",
964 v.bee_url,
965 state,
966 v.elapsed_ms,
967 if meta.is_empty() { String::new() } else { format!(" {meta}") },
968 v.error
969 .as_deref()
970 .map(|e| format!(" ({e})"))
971 .unwrap_or_default(),
972 url_w = url_w
973 );
974 }
975 if !r.gateways.is_empty() {
976 let _ = writeln!(out);
977 let _ = writeln!(out, "gateways:");
978 let url_w = r.gateways.iter().map(|g| g.url.len()).max().unwrap_or(20);
979 for g in &r.gateways {
980 let state = match (g.retrievable, &g.error) {
981 (Some(true), _) => "retrievable",
982 (Some(false), _) => "unretrievable",
983 (None, _) => "error",
984 };
985 let code = g
986 .status_code
987 .map(|c| format!(" HTTP {c}"))
988 .unwrap_or_default();
989 let _ = writeln!(
990 out,
991 " {:<url_w$} {:<14} {:>6} ms{}{}",
992 g.url,
993 state,
994 g.elapsed_ms,
995 code,
996 g.error
997 .as_deref()
998 .map(|e| format!(" ({e})"))
999 .unwrap_or_default(),
1000 url_w = url_w
1001 );
1002 }
1003 }
1004 if !r.cold_downloads.is_empty() {
1005 let _ = writeln!(out);
1006 let _ = writeln!(out, "cold downloads:");
1007 let url_w = r.cold_downloads.iter().map(|c| c.url.len()).max().unwrap_or(20);
1008 for c in &r.cold_downloads {
1009 let state = if c.success { "ok" } else { "fail" };
1010 let bytes = human_bytes(c.bytes_downloaded);
1011 let code = c
1012 .status_code
1013 .map(|s| format!(" HTTP {s}"))
1014 .unwrap_or_default();
1015 let _ = writeln!(
1016 out,
1017 " {:<url_w$} {:<4} {:>6} ms {:>9}{}{}",
1018 c.url,
1019 state,
1020 c.elapsed_ms,
1021 bytes,
1022 code,
1023 c.error
1024 .as_deref()
1025 .map(|e| format!(" ({e})"))
1026 .unwrap_or_default(),
1027 url_w = url_w
1028 );
1029 }
1030 }
1031 if let Some(res) = &r.resolution {
1032 let _ = writeln!(out);
1033 match res {
1034 Resolution::Feed { owner, topic, resolved_reference } => {
1035 let _ = writeln!(
1036 out,
1037 "resolved feed owner={owner} topic={topic} -> {resolved_reference}",
1038 );
1039 }
1040 }
1041 }
1042 if let Some(stats) = &r.chunk_stats {
1043 let _ = writeln!(out);
1044 let _ = writeln!(out, "chunk stats per vantage:");
1045 let url_w = stats
1046 .per_vantage
1047 .keys()
1048 .map(|k| k.len())
1049 .max()
1050 .unwrap_or(20);
1051 for (url, row) in &stats.per_vantage {
1052 let _ = writeln!(
1053 out,
1054 " {:<url_w$} found {:>3}/{:<3} p50 {:>5} ms · p95 {:>5} ms · max {:>5} ms",
1055 url,
1056 row.found,
1057 row.total,
1058 fmt_ms(row.elapsed_p50_ms),
1059 fmt_ms(row.elapsed_p95_ms),
1060 fmt_ms(row.elapsed_max_ms),
1061 url_w = url_w
1062 );
1063 }
1064 if !stats.per_neighborhood.is_empty() {
1065 let _ = writeln!(out);
1066 let _ = writeln!(out, "chunk stats per neighborhood:");
1067 let mut rows: Vec<(&String, &ChunkStatRow)> =
1068 stats.per_neighborhood.iter().collect();
1069 rows.sort_by(|a, b| {
1070 b.1.elapsed_p95_ms
1071 .unwrap_or(0)
1072 .cmp(&a.1.elapsed_p95_ms.unwrap_or(0))
1073 });
1074 for (nb, row) in rows.iter().take(10) {
1075 let _ = writeln!(
1076 out,
1077 " nb {} found {:>3}/{:<3} p50 {:>5} ms · p95 {:>5} ms",
1078 nb,
1079 row.found,
1080 row.total,
1081 fmt_ms(row.elapsed_p50_ms),
1082 fmt_ms(row.elapsed_p95_ms),
1083 );
1084 }
1085 if rows.len() > 10 {
1086 let _ = writeln!(out, " ... {} more neighborhoods", rows.len() - 10);
1087 }
1088 }
1089 }
1090 if let Some(chunks) = &r.chunks {
1091 let _ = writeln!(out);
1092 let _ = writeln!(out, "chunks: {} probed", chunks.len());
1093 let mut missing = 0usize;
1094 for c in chunks {
1095 let missing_in: Vec<String> = c
1096 .per_vantage
1097 .iter()
1098 .filter(|(_, cv)| !cv.found)
1099 .map(|(u, cv)| match cv.proximity {
1100 Some(p) => format!("{u} (PO {p})"),
1101 None => u.clone(),
1102 })
1103 .collect();
1104 if !missing_in.is_empty() {
1105 missing += 1;
1106 let _ = writeln!(
1107 out,
1108 " [{}] {} missing on: {}",
1109 c.neighborhood,
1110 short(&c.address),
1111 missing_in.join(", ")
1112 );
1113 }
1114 }
1115 if missing == 0 {
1116 let _ = writeln!(out, " all chunks present on all vantages");
1117 } else {
1118 let _ = writeln!(out, " {missing} chunk(s) missing on at least one vantage");
1119 }
1120 }
1121 out
1122}
1123
1124fn vantage_meta(v: &VantageResult) -> String {
1129 let mut parts: Vec<String> = Vec::new();
1130 if let Some(o) = &v.overlay {
1131 let neigh = o.chars().take(2).collect::<String>();
1132 let short_overlay = short_overlay(o);
1133 parts.push(format!("overlay {short_overlay} (nb {neigh})"));
1134 }
1135 if let Some(p) = v.proximity_to_root {
1136 parts.push(format!("PO {p}"));
1137 }
1138 if let Some(p) = v.target_proximity {
1139 parts.push(format!("tgtPO {p}"));
1140 }
1141 if let Some(ver) = &v.bee_version {
1142 parts.push(format!("v{ver}"));
1143 }
1144 if parts.is_empty() {
1145 String::new()
1146 } else {
1147 format!("· {}", parts.join(" · "))
1148 }
1149}
1150
1151fn fmt_ms(v: Option<u64>) -> String {
1152 match v {
1153 Some(ms) => format!("{ms}"),
1154 None => "—".to_string(),
1155 }
1156}
1157
1158fn short_overlay(hex: &str) -> String {
1159 let s = hex.strip_prefix("0x").unwrap_or(hex);
1160 if s.len() > 12 {
1161 format!("{}…{}", &s[..6], &s[s.len() - 4..])
1162 } else {
1163 s.to_string()
1164 }
1165}
1166
1167pub fn render_stamp_status(s: &StampStatus) -> String {
1170 use std::fmt::Write;
1171 let mut out = String::new();
1172 let ttl = if s.batch_ttl < 0 {
1173 "unknown".to_string()
1174 } else {
1175 humanize_secs(s.batch_ttl)
1176 };
1177 let header = if s.healthy { "stamp OK" } else { "stamp warning" };
1178 let _ = writeln!(
1179 out,
1180 "{header}: batch {} · usable={} · ttl={}",
1181 short_overlay(&s.batch_id),
1182 s.usable,
1183 ttl,
1184 );
1185 for w in &s.warnings {
1186 let _ = writeln!(out, " · {w}");
1187 }
1188 out
1189}
1190
1191fn short(hex: &str) -> String {
1192 if hex.len() > 16 {
1193 format!("{}…{}", &hex[..8], &hex[hex.len() - 4..])
1194 } else {
1195 hex.to_string()
1196 }
1197}