1use std::collections::BTreeMap;
8use std::sync::Arc;
9use std::time::{Duration, Instant};
10
11use anyhow::{Context, Result, anyhow};
12use bee::Client;
13use bee::manifest::{is_null_address, unmarshal};
14use bee::swarm::gsoc::proximity;
15use bee::swarm::{BatchId, EthAddress, Reference, Topic};
16use futures::stream::{FuturesUnordered, StreamExt};
17use reqwest::Method;
18use serde::{Deserialize, Serialize};
19use tokio::sync::Semaphore;
20use tracing::{debug, info, trace, warn};
21
22pub const DEFAULT_GATEWAY: &str = "https://api.gateway.ethswarm.org";
28
29const MAX_CHUNKS: usize = 1000;
30const STAMP_LOW_TTL_SECS: i64 = 86_400;
33
34#[derive(Copy, Clone, Debug)]
35pub enum OutputFormat {
36 Text,
37 Json,
38}
39
40#[derive(Copy, Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
42#[serde(rename_all = "snake_case")]
43pub enum Status {
44 Retrievable,
45 Unretrievable,
46 Partial,
47 Error,
48}
49
50#[derive(Debug, Serialize, Deserialize)]
51pub struct Report {
52 pub reference: String,
53 pub status: Status,
54 pub vantages: Vec<VantageResult>,
55 #[serde(default, skip_serializing_if = "Vec::is_empty")]
58 pub gateways: Vec<GatewayResult>,
59 #[serde(default, skip_serializing_if = "Option::is_none")]
62 pub resolution: Option<Resolution>,
63 #[serde(skip_serializing_if = "Option::is_none")]
65 pub chunks: Option<Vec<ChunkProbe>>,
66 #[serde(skip_serializing_if = "Option::is_none")]
69 pub chunk_stats: Option<ChunkStats>,
70 #[serde(default, skip_serializing_if = "Vec::is_empty")]
75 pub cold_downloads: Vec<ColdDownloadResult>,
76 pub spec_version: u32,
77}
78
79#[derive(Debug, Serialize, Deserialize)]
83pub struct ChunkStats {
84 pub per_vantage: BTreeMap<String, ChunkStatRow>,
85 pub per_neighborhood: BTreeMap<String, ChunkStatRow>,
87}
88
89#[derive(Debug, Serialize, Deserialize, Clone)]
90pub struct ChunkStatRow {
91 pub total: usize,
92 pub found: usize,
93 pub missing: usize,
94 #[serde(skip_serializing_if = "Option::is_none")]
97 pub elapsed_p50_ms: Option<u64>,
98 #[serde(skip_serializing_if = "Option::is_none")]
99 pub elapsed_p95_ms: Option<u64>,
100 #[serde(skip_serializing_if = "Option::is_none")]
101 pub elapsed_max_ms: Option<u64>,
102}
103
104#[derive(Debug, Serialize, Deserialize)]
109pub struct GatewayResult {
110 pub url: String,
111 pub retrievable: Option<bool>,
114 pub elapsed_ms: u64,
115 #[serde(skip_serializing_if = "Option::is_none")]
117 pub status_code: Option<u16>,
118 #[serde(skip_serializing_if = "Option::is_none")]
119 pub error: Option<String>,
120}
121
122#[derive(Debug, Serialize, Deserialize)]
125#[serde(tag = "kind", rename_all = "snake_case")]
126pub enum Resolution {
127 Feed {
130 owner: String,
131 topic: String,
132 resolved_reference: String,
134 },
135}
136
137#[derive(Debug, Serialize, Deserialize)]
138pub struct VantageResult {
139 pub bee_url: String,
140 pub retrievable: Option<bool>,
142 pub elapsed_ms: u64,
143 #[serde(skip_serializing_if = "Option::is_none")]
144 pub error: Option<String>,
145 #[serde(skip_serializing_if = "Option::is_none")]
148 pub overlay: Option<String>,
149 #[serde(skip_serializing_if = "Option::is_none")]
151 pub bee_version: Option<String>,
152 #[serde(skip_serializing_if = "Option::is_none")]
156 pub proximity_to_root: Option<u32>,
157 #[serde(skip_serializing_if = "Option::is_none")]
161 pub target_proximity: Option<u32>,
162}
163
164#[derive(Debug, Serialize, Deserialize)]
165pub struct ChunkProbe {
166 pub address: String,
167 pub neighborhood: String,
169 pub per_vantage: BTreeMap<String, ChunkVantage>,
170}
171
172#[derive(Debug, Serialize, Deserialize)]
173pub struct ChunkVantage {
174 pub found: bool,
175 pub elapsed_ms: u64,
176 #[serde(skip_serializing_if = "Option::is_none")]
177 pub error: Option<String>,
178 #[serde(skip_serializing_if = "Option::is_none")]
181 pub proximity: Option<u32>,
182}
183
184#[derive(Debug, Serialize, Deserialize)]
190pub struct ColdDownloadResult {
191 pub url: String,
192 pub endpoint: String,
195 pub success: bool,
198 pub bytes_downloaded: u64,
200 pub elapsed_ms: u64,
201 #[serde(skip_serializing_if = "Option::is_none")]
202 pub status_code: Option<u16>,
203 #[serde(skip_serializing_if = "Option::is_none")]
204 pub error: Option<String>,
205}
206
207pub struct ReseedRequest {
208 pub reference: String,
209 pub bee_url: String,
210 pub batch_id: String,
211 pub timeout: Duration,
212}
213
214#[derive(Debug, Serialize, Deserialize)]
218pub struct StampStatus {
219 pub batch_id: String,
220 pub exists: bool,
221 pub usable: bool,
222 pub batch_ttl: i64,
223 pub healthy: bool,
226 #[serde(skip_serializing_if = "Vec::is_empty")]
227 pub warnings: Vec<String>,
228}
229
230const SPEC_VERSION: u32 = 1;
231
232fn parse_reference(s: &str) -> Result<Reference> {
233 Reference::from_hex(s).map_err(|e| anyhow!("invalid reference {s}: {e}"))
234}
235
236fn make_bee(url: &str, timeout: Duration) -> Result<Client> {
237 let http = reqwest::Client::builder()
238 .timeout(timeout)
239 .build()
240 .context("building http client")?;
241 Client::with_http_client(url, http).map_err(|e| anyhow!("invalid bee url {url}: {e}"))
242}
243
244pub async fn check_multi_vantage(
251 reference: &str,
252 bees: &[String],
253 timeout: Duration,
254) -> Result<Report> {
255 let r = parse_reference(reference)?;
256 let root_bytes = first_32(&r);
257 info!(
258 reference = reference,
259 vantages = bees.len(),
260 timeout_secs = timeout.as_secs(),
261 "starting multi-vantage probe",
262 );
263
264 let mut futs = FuturesUnordered::new();
265 for bee_url in bees {
266 let bee_url = bee_url.clone();
267 let r = r.clone();
268 futs.push(async move {
269 debug!(bee_url = %bee_url, "probing vantage");
270 let bee = match make_bee(&bee_url, timeout) {
271 Ok(b) => b,
272 Err(e) => {
273 return VantageResult {
274 bee_url,
275 retrievable: None,
276 elapsed_ms: 0,
277 error: Some(format!("{e:#}")),
278 overlay: None,
279 bee_version: None,
280 proximity_to_root: None,
281 target_proximity: None,
282 };
283 }
284 };
285 let started = Instant::now();
286 let api = bee.api();
287 let debug = bee.debug();
288 let (stew_res, addr_res, health_res) = tokio::join!(
289 api.is_retrievable(&r),
290 debug.addresses(),
291 debug.health(),
292 );
293 let elapsed_ms = started.elapsed().as_millis() as u64;
294
295 let overlay = addr_res.ok().map(|a| a.overlay);
296 let bee_version = health_res.ok().map(|h| h.version);
297 let proximity_to_root = overlay
298 .as_deref()
299 .and_then(decode_overlay)
300 .map(|o| proximity(&o, &root_bytes));
301
302 match stew_res {
303 Ok(ok) => {
304 debug!(
305 bee_url = %bee_url,
306 retrievable = ok,
307 elapsed_ms,
308 overlay = overlay.as_deref().unwrap_or("?"),
309 "vantage done",
310 );
311 VantageResult {
312 bee_url,
313 retrievable: Some(ok),
314 elapsed_ms,
315 error: None,
316 overlay,
317 bee_version,
318 proximity_to_root,
319 target_proximity: None,
320 }
321 }
322 Err(e) => {
323 debug!(bee_url = %bee_url, error = %e, "vantage errored");
324 VantageResult {
325 bee_url,
326 retrievable: None,
327 elapsed_ms,
328 error: Some(format!("{e}")),
329 overlay,
330 bee_version,
331 proximity_to_root,
332 target_proximity: None,
333 }
334 }
335 }
336 });
337 }
338
339 let mut vantages = Vec::with_capacity(bees.len());
340 while let Some(v) = futs.next().await {
341 vantages.push(v);
342 }
343 vantages.sort_by(|a, b| a.bee_url.cmp(&b.bee_url));
344
345 let status = aggregate_status(&vantages, &[]);
346 Ok(Report {
347 reference: reference.to_string(),
348 status,
349 vantages,
350 gateways: Vec::new(),
351 resolution: None,
352 chunks: None,
353 chunk_stats: None,
354 cold_downloads: Vec::new(),
355 spec_version: SPEC_VERSION,
356 })
357}
358
359pub async fn check_gateways(
364 reference: &str,
365 gateway_urls: &[String],
366 timeout: Duration,
367) -> Result<Vec<GatewayResult>> {
368 let _ = parse_reference(reference)?;
371 if !gateway_urls.is_empty() {
372 info!(count = gateway_urls.len(), "probing gateways");
373 }
374
375 let http = reqwest::Client::builder()
376 .timeout(timeout)
377 .build()
378 .context("building http client for gateway probes")?;
379
380 let mut futs = FuturesUnordered::new();
381 for base in gateway_urls {
382 let base = base.clone();
383 let reference = reference.to_string();
384 let http = http.clone();
385 futs.push(async move {
386 let url = build_gateway_url(&base, &reference);
387 let started = Instant::now();
388 let res = http.request(Method::HEAD, &url).send().await;
389 let elapsed_ms = started.elapsed().as_millis() as u64;
390 match res {
391 Ok(resp) => {
392 let status = resp.status().as_u16();
393 GatewayResult {
394 url: base,
395 retrievable: Some(resp.status().is_success()),
396 elapsed_ms,
397 status_code: Some(status),
398 error: None,
399 }
400 }
401 Err(e) => GatewayResult {
402 url: base,
403 retrievable: None,
404 elapsed_ms,
405 status_code: None,
406 error: Some(format!("{e}")),
407 },
408 }
409 });
410 }
411 let mut out = Vec::with_capacity(gateway_urls.len());
412 while let Some(g) = futs.next().await {
413 out.push(g);
414 }
415 out.sort_by(|a, b| a.url.cmp(&b.url));
416 Ok(out)
417}
418
419pub fn merge_gateways(mut report: Report, gateways: Vec<GatewayResult>) -> Report {
423 report.gateways = gateways;
424 report.status = aggregate_status(&report.vantages, &report.gateways);
425 report
426}
427
428fn build_gateway_url(base: &str, reference: &str) -> String {
429 let trimmed = base.trim_end_matches('/');
430 format!("{trimmed}/bzz/{reference}/")
431}
432
433fn build_bytes_url(base: &str, reference: &str) -> String {
434 let trimmed = base.trim_end_matches('/');
435 format!("{trimmed}/bytes/{reference}")
436}
437
438pub async fn cold_download_all(
446 bee_urls: &[String],
447 gateway_urls: &[String],
448 reference: &str,
449 timeout: Duration,
450) -> Result<Vec<ColdDownloadResult>> {
451 let _ = parse_reference(reference)?;
452 info!(
453 bee_count = bee_urls.len(),
454 gateway_count = gateway_urls.len(),
455 "starting cold-download probes",
456 );
457
458 let http = reqwest::Client::builder()
459 .timeout(timeout)
460 .build()
461 .context("building http client for cold-download probes")?;
462
463 let mut targets: Vec<(String, String, String)> =
465 Vec::with_capacity(bee_urls.len() + gateway_urls.len());
466 for url in bee_urls {
467 targets.push((
468 url.clone(),
469 "/bytes/{ref}".to_string(),
470 build_bytes_url(url, reference),
471 ));
472 }
473 for url in gateway_urls {
474 targets.push((
475 url.clone(),
476 "/bzz/{ref}/".to_string(),
477 build_gateway_url(url, reference),
478 ));
479 }
480
481 let mut futs = FuturesUnordered::new();
482 for (base, endpoint, probe_url) in targets {
483 let http = http.clone();
484 futs.push(async move { cold_probe(&http, base, endpoint, probe_url).await });
485 }
486
487 let mut out = Vec::with_capacity(bee_urls.len() + gateway_urls.len());
488 while let Some(c) = futs.next().await {
489 out.push(c);
490 }
491 out.sort_by(|a, b| a.url.cmp(&b.url));
492 Ok(out)
493}
494
495async fn cold_probe(
496 http: &reqwest::Client,
497 base: String,
498 endpoint: String,
499 probe_url: String,
500) -> ColdDownloadResult {
501 debug!(url = %probe_url, "cold-download GET");
502 let started = Instant::now();
503 let res = http.get(&probe_url).send().await;
504 match res {
505 Ok(mut resp) => {
506 let status = resp.status();
507 let status_code = status.as_u16();
508 if !status.is_success() {
509 return ColdDownloadResult {
510 url: base,
511 endpoint,
512 success: false,
513 bytes_downloaded: 0,
514 elapsed_ms: started.elapsed().as_millis() as u64,
515 status_code: Some(status_code),
516 error: Some(format!("HTTP {status_code}")),
517 };
518 }
519 let mut bytes: u64 = 0;
520 let mut err: Option<String> = None;
521 loop {
522 match resp.chunk().await {
523 Ok(Some(chunk)) => bytes += chunk.len() as u64,
524 Ok(None) => break,
525 Err(e) => {
526 err = Some(format!("stream error after {bytes} bytes: {e}"));
527 break;
528 }
529 }
530 }
531 ColdDownloadResult {
532 url: base,
533 endpoint,
534 success: err.is_none(),
535 bytes_downloaded: bytes,
536 elapsed_ms: started.elapsed().as_millis() as u64,
537 status_code: Some(status_code),
538 error: err,
539 }
540 }
541 Err(e) => ColdDownloadResult {
542 url: base,
543 endpoint,
544 success: false,
545 bytes_downloaded: 0,
546 elapsed_ms: started.elapsed().as_millis() as u64,
547 status_code: None,
548 error: Some(format!("{e}")),
549 },
550 }
551}
552
553pub async fn resolve_feed(
558 bee_url: &str,
559 owner_hex: &str,
560 topic_hex: &str,
561 timeout: Duration,
562) -> Result<(String, Resolution)> {
563 let bee = make_bee(bee_url, timeout)?;
564 let owner = EthAddress::from_hex(owner_hex)
565 .map_err(|e| anyhow!("invalid feed owner {owner_hex}: {e}"))?;
566 let topic = Topic::from_hex(topic_hex)
567 .map_err(|e| anyhow!("invalid feed topic {topic_hex}: {e}"))?;
568 let reference = bee
569 .file()
570 .get_feed_lookup(&owner, &topic)
571 .await
572 .map_err(anyhow::Error::from)?;
573 let r_hex = reference.to_hex();
574 Ok((
575 r_hex.clone(),
576 Resolution::Feed {
577 owner: owner.to_hex(),
578 topic: topic.to_hex(),
579 resolved_reference: r_hex,
580 },
581 ))
582}
583
584pub fn parse_input(input: &str) -> ParsedInput {
590 if let Some(rest) = input.strip_prefix("feed:") {
591 let parts: Vec<&str> = rest.splitn(2, [':', '/']).collect();
593 if parts.len() == 2 {
594 return ParsedInput::Feed {
595 owner: parts[0].to_string(),
596 topic: parts[1].to_string(),
597 };
598 }
599 }
600 ParsedInput::Reference(input.to_string())
601}
602
603pub enum ParsedInput {
605 Reference(String),
608 Feed { owner: String, topic: String },
611}
612
613fn decode_overlay(hex: &str) -> Option<[u8; 32]> {
617 let s = hex.strip_prefix("0x").unwrap_or(hex);
618 if s.len() < 64 {
619 return None;
620 }
621 let mut out = [0u8; 32];
622 for (i, b) in out.iter_mut().enumerate() {
623 let h = u8::from_str_radix(&s[i * 2..i * 2 + 2], 16).ok()?;
624 *b = h;
625 }
626 Some(out)
627}
628
629fn first_32(r: &Reference) -> [u8; 32] {
630 let mut out = [0u8; 32];
631 out.copy_from_slice(&r.as_bytes()[..32]);
632 out
633}
634
635fn aggregate_status(vantages: &[VantageResult], gateways: &[GatewayResult]) -> Status {
636 let outcomes: Vec<Option<bool>> = vantages
637 .iter()
638 .map(|v| v.retrievable)
639 .chain(gateways.iter().map(|g| g.retrievable))
640 .collect();
641 let total = outcomes.len();
642 if total == 0 {
643 return Status::Error;
644 }
645 let retr = outcomes.iter().filter(|o| **o == Some(true)).count();
646 let unret = outcomes.iter().filter(|o| **o == Some(false)).count();
647 let err = outcomes.iter().filter(|o| o.is_none()).count();
648 if err == total {
649 Status::Error
650 } else if retr == total {
651 Status::Retrievable
652 } else if retr == 0 && unret + err == total {
653 Status::Unretrievable
654 } else {
655 Status::Partial
656 }
657}
658
659pub async fn drill_down(
665 mut report: Report,
666 bees: &[String],
667 timeout: Duration,
668 concurrency: usize,
669) -> Result<Report> {
670 let r = parse_reference(&report.reference)?;
671 let walker_bee = bees.first().context("no bee URL for drill-down")?;
673 let walker = make_bee(walker_bee, timeout)?;
674
675 let addresses = collect_chunk_addresses(&walker, &r).await?;
676 info!(chunks = addresses.len(), concurrency, "drill-down probing chunks");
677
678 let overlays: BTreeMap<String, [u8; 32]> = report
681 .vantages
682 .iter()
683 .filter_map(|v| {
684 v.overlay
685 .as_deref()
686 .and_then(decode_overlay)
687 .map(|o| (v.bee_url.clone(), o))
688 })
689 .collect();
690
691 let clients: Vec<(String, Client)> = bees
692 .iter()
693 .map(|u| make_bee(u, timeout).map(|b| (u.clone(), b)))
694 .collect::<Result<_>>()?;
695
696 let sem = Arc::new(Semaphore::new(concurrency.max(1)));
697 let mut probes = Vec::with_capacity(addresses.len());
698
699 let mut futs = FuturesUnordered::new();
700 for addr in addresses {
701 let sem = sem.clone();
702 let clients = clients.clone();
703 let overlays = overlays.clone();
704 futs.push(async move {
705 let chunk_bytes = first_32_of_ref(&addr);
706 let mut per_vantage = BTreeMap::new();
707 for (url, bee) in &clients {
708 let _permit = sem.acquire().await.expect("semaphore not closed");
709 let started = Instant::now();
710 let res = bee.file().download_chunk(&addr, None).await;
711 let elapsed_ms = started.elapsed().as_millis() as u64;
712 let prox = overlays
713 .get(url)
714 .map(|o| proximity(o, &chunk_bytes));
715 trace!(
716 chunk = %addr.to_hex(),
717 bee_url = %url,
718 found = res.is_ok(),
719 elapsed_ms,
720 "chunk probe",
721 );
722 let cv = match res {
723 Ok(_) => ChunkVantage {
724 found: true,
725 elapsed_ms,
726 error: None,
727 proximity: prox,
728 },
729 Err(e) => ChunkVantage {
730 found: false,
731 elapsed_ms,
732 error: Some(format!("{e}")),
733 proximity: prox,
734 },
735 };
736 per_vantage.insert(url.clone(), cv);
737 }
738 let hex = addr.to_hex();
739 let neighborhood = hex.chars().take(2).collect::<String>();
740 ChunkProbe {
741 address: hex,
742 neighborhood,
743 per_vantage,
744 }
745 });
746 }
747
748 while let Some(p) = futs.next().await {
749 probes.push(p);
750 }
751 probes.sort_by(|a, b| a.address.cmp(&b.address));
752 report.chunk_stats = Some(compute_chunk_stats(&probes));
753 report.chunks = Some(probes);
754 Ok(report)
755}
756
757pub fn compute_chunk_stats(probes: &[ChunkProbe]) -> ChunkStats {
761 let mut per_vantage: BTreeMap<String, (Vec<u64>, usize, usize)> = BTreeMap::new();
763 let mut per_neighborhood: BTreeMap<String, (Vec<u64>, usize, usize)> = BTreeMap::new();
764
765 for p in probes {
766 for (url, cv) in &p.per_vantage {
767 let entry = per_vantage.entry(url.clone()).or_default();
768 if cv.found {
769 entry.0.push(cv.elapsed_ms);
770 entry.1 += 1;
771 } else {
772 entry.2 += 1;
773 }
774 let n = per_neighborhood.entry(p.neighborhood.clone()).or_default();
775 if cv.found {
776 n.0.push(cv.elapsed_ms);
777 n.1 += 1;
778 } else {
779 n.2 += 1;
780 }
781 }
782 }
783
784 let to_row = |(latencies, found, missing): (Vec<u64>, usize, usize)| -> ChunkStatRow {
785 let mut sorted = latencies;
786 sorted.sort_unstable();
787 let p = |q: f64| -> Option<u64> {
788 if sorted.is_empty() {
789 None
790 } else {
791 let idx = ((sorted.len() as f64 - 1.0) * q).round() as usize;
792 Some(sorted[idx])
793 }
794 };
795 ChunkStatRow {
796 total: found + missing,
797 found,
798 missing,
799 elapsed_p50_ms: p(0.50),
800 elapsed_p95_ms: p(0.95),
801 elapsed_max_ms: sorted.last().copied(),
802 }
803 };
804
805 ChunkStats {
806 per_vantage: per_vantage.into_iter().map(|(k, v)| (k, to_row(v))).collect(),
807 per_neighborhood: per_neighborhood
808 .into_iter()
809 .map(|(k, v)| (k, to_row(v)))
810 .collect(),
811 }
812}
813
814pub fn annotate_target_overlay(mut report: Report, target_overlay_hex: &str) -> Report {
819 let Some(target) = decode_overlay(target_overlay_hex) else {
820 warn!(
824 target = target_overlay_hex,
825 "--target-overlay value is not valid 64-hex overlay; flag ignored",
826 );
827 return report;
828 };
829 for v in &mut report.vantages {
830 if let Some(o) = v.overlay.as_deref().and_then(decode_overlay) {
831 v.target_proximity = Some(proximity(&o, &target));
832 }
833 }
834 report
835 .vantages
836 .sort_by(|a, b| b.target_proximity.cmp(&a.target_proximity));
837 report
838}
839
840fn first_32_of_ref(r: &Reference) -> [u8; 32] {
841 let mut out = [0u8; 32];
842 out.copy_from_slice(&r.as_bytes()[..32]);
843 out
844}
845
846async fn collect_chunk_addresses(bee: &Client, root: &Reference) -> Result<Vec<Reference>> {
851 let mut addresses: Vec<Reference> = vec![root.clone()];
852 let mut seen: std::collections::HashSet<String> = std::collections::HashSet::new();
853 seen.insert(root.to_hex());
854
855 let mut queue: std::collections::VecDeque<Reference> = std::collections::VecDeque::new();
856 queue.push_back(root.clone());
857
858 while let Some(addr) = queue.pop_front() {
859 if addresses.len() >= MAX_CHUNKS {
860 break;
861 }
862 let bytes = match bee.file().download_chunk(&addr, None).await {
863 Ok(b) => b,
864 Err(_) => continue,
867 };
868 let Ok(node) = unmarshal(&bytes, addr.as_bytes()) else {
869 continue;
872 };
873 if !is_null_address(&node.target_address) {
875 if let Ok(r) = Reference::new(&node.target_address) {
876 if seen.insert(r.to_hex()) {
877 addresses.push(r);
878 }
879 }
880 }
881 for fork in node.forks.values() {
883 if let Some(sa) = fork.node.self_address {
884 if let Ok(r) = Reference::new(&sa) {
885 if seen.insert(r.to_hex()) {
886 addresses.push(r.clone());
887 queue.push_back(r);
888 }
889 }
890 }
891 }
892 }
893 Ok(addresses)
894}
895
896pub async fn reseed(req: ReseedRequest) -> Result<()> {
897 let bee = make_bee(&req.bee_url, req.timeout)?;
898 let r = parse_reference(&req.reference)?;
899 let batch = BatchId::from_hex(&req.batch_id)
900 .map_err(|e| anyhow!("invalid batch id {}: {e}", req.batch_id))?;
901 info!(
902 bee_url = %req.bee_url,
903 batch = %req.batch_id,
904 "re-uploading via PUT /stewardship/{{ref}}",
905 );
906 bee.api().reupload(&r, &batch).await?;
907 Ok(())
908}
909
910pub async fn check_stamp(
915 bee_url: &str,
916 batch_id: &str,
917 timeout: Duration,
918) -> Result<StampStatus> {
919 let bee = make_bee(bee_url, timeout)?;
920 let batch = BatchId::from_hex(batch_id)
921 .map_err(|e| anyhow!("invalid batch id {batch_id}: {e}"))?;
922 let pb = bee
923 .postage()
924 .get_postage_batch(&batch)
925 .await
926 .map_err(anyhow::Error::from)?;
927
928 let mut warnings = Vec::new();
929 if !pb.exists {
930 warnings.push("batch not known to this Bee".to_string());
931 }
932 if !pb.usable {
933 warnings.push("batch not usable yet (chain may be syncing)".to_string());
934 }
935 if pb.batch_ttl >= 0 && pb.batch_ttl < STAMP_LOW_TTL_SECS {
936 warnings.push(format!(
937 "batch TTL low: ~{} (re-seed may not outlive the batch)",
938 humanize_secs(pb.batch_ttl)
939 ));
940 }
941
942 let healthy = pb.exists && pb.usable && (pb.batch_ttl < 0 || pb.batch_ttl >= STAMP_LOW_TTL_SECS);
943
944 Ok(StampStatus {
945 batch_id: batch_id.to_string(),
946 exists: pb.exists,
947 usable: pb.usable,
948 batch_ttl: pb.batch_ttl,
949 healthy,
950 warnings,
951 })
952}
953
954fn human_bytes(b: u64) -> String {
955 const KIB: u64 = 1024;
956 const MIB: u64 = KIB * 1024;
957 const GIB: u64 = MIB * 1024;
958 if b >= GIB {
959 format!("{:.2} GiB", b as f64 / GIB as f64)
960 } else if b >= MIB {
961 format!("{:.2} MiB", b as f64 / MIB as f64)
962 } else if b >= KIB {
963 format!("{:.2} KiB", b as f64 / KIB as f64)
964 } else {
965 format!("{b} B")
966 }
967}
968
969fn humanize_secs(s: i64) -> String {
970 if s < 0 {
971 return "unknown".to_string();
972 }
973 let s = s as u64;
974 if s >= 86_400 {
975 format!("{} day(s)", s / 86_400)
976 } else if s >= 3_600 {
977 format!("{} hour(s)", s / 3_600)
978 } else if s >= 60 {
979 format!("{} min", s / 60)
980 } else {
981 format!("{}s", s)
982 }
983}
984
985pub fn render_report(report: &Report, fmt: OutputFormat) -> String {
986 match fmt {
987 OutputFormat::Json => {
988 serde_json::to_string_pretty(report).expect("report serialization") + "\n"
989 }
990 OutputFormat::Text => render_text(report),
991 }
992}
993
994fn render_text(r: &Report) -> String {
995 use std::fmt::Write;
996 let mut out = String::new();
997 let _ = writeln!(out, "ref {}", r.reference);
998 let _ = writeln!(out, "status {:?}", r.status);
999 let _ = writeln!(out);
1000 let _ = writeln!(out, "vantages:");
1001 let url_w = r.vantages.iter().map(|v| v.bee_url.len()).max().unwrap_or(20);
1002 for v in &r.vantages {
1003 let state = match (v.retrievable, &v.error) {
1004 (Some(true), _) => "retrievable",
1005 (Some(false), _) => "unretrievable",
1006 (None, Some(_)) => "error",
1007 (None, None) => "unknown",
1008 };
1009 let meta = vantage_meta(v);
1010 let _ = writeln!(
1011 out,
1012 " {:<url_w$} {:<14} {:>6} ms{}{}",
1013 v.bee_url,
1014 state,
1015 v.elapsed_ms,
1016 if meta.is_empty() { String::new() } else { format!(" {meta}") },
1017 v.error
1018 .as_deref()
1019 .map(|e| format!(" ({e})"))
1020 .unwrap_or_default(),
1021 url_w = url_w
1022 );
1023 }
1024 if !r.gateways.is_empty() {
1025 let _ = writeln!(out);
1026 let _ = writeln!(out, "gateways:");
1027 let url_w = r.gateways.iter().map(|g| g.url.len()).max().unwrap_or(20);
1028 for g in &r.gateways {
1029 let state = match (g.retrievable, &g.error) {
1030 (Some(true), _) => "retrievable",
1031 (Some(false), _) => "unretrievable",
1032 (None, _) => "error",
1033 };
1034 let code = g
1035 .status_code
1036 .map(|c| format!(" HTTP {c}"))
1037 .unwrap_or_default();
1038 let _ = writeln!(
1039 out,
1040 " {:<url_w$} {:<14} {:>6} ms{}{}",
1041 g.url,
1042 state,
1043 g.elapsed_ms,
1044 code,
1045 g.error
1046 .as_deref()
1047 .map(|e| format!(" ({e})"))
1048 .unwrap_or_default(),
1049 url_w = url_w
1050 );
1051 }
1052 }
1053 if !r.cold_downloads.is_empty() {
1054 let _ = writeln!(out);
1055 let _ = writeln!(out, "cold downloads:");
1056 let url_w = r.cold_downloads.iter().map(|c| c.url.len()).max().unwrap_or(20);
1057 for c in &r.cold_downloads {
1058 let state = if c.success { "ok" } else { "fail" };
1059 let bytes = human_bytes(c.bytes_downloaded);
1060 let code = c
1061 .status_code
1062 .map(|s| format!(" HTTP {s}"))
1063 .unwrap_or_default();
1064 let _ = writeln!(
1065 out,
1066 " {:<url_w$} {:<4} {:>6} ms {:>9}{}{}",
1067 c.url,
1068 state,
1069 c.elapsed_ms,
1070 bytes,
1071 code,
1072 c.error
1073 .as_deref()
1074 .map(|e| format!(" ({e})"))
1075 .unwrap_or_default(),
1076 url_w = url_w
1077 );
1078 }
1079 }
1080 if let Some(res) = &r.resolution {
1081 let _ = writeln!(out);
1082 match res {
1083 Resolution::Feed { owner, topic, resolved_reference } => {
1084 let _ = writeln!(
1085 out,
1086 "resolved feed owner={owner} topic={topic} -> {resolved_reference}",
1087 );
1088 }
1089 }
1090 }
1091 if let Some(stats) = &r.chunk_stats {
1092 let _ = writeln!(out);
1093 let _ = writeln!(out, "chunk stats per vantage:");
1094 let url_w = stats
1095 .per_vantage
1096 .keys()
1097 .map(|k| k.len())
1098 .max()
1099 .unwrap_or(20);
1100 for (url, row) in &stats.per_vantage {
1101 let _ = writeln!(
1102 out,
1103 " {:<url_w$} found {:>3}/{:<3} p50 {:>5} ms · p95 {:>5} ms · max {:>5} ms",
1104 url,
1105 row.found,
1106 row.total,
1107 fmt_ms(row.elapsed_p50_ms),
1108 fmt_ms(row.elapsed_p95_ms),
1109 fmt_ms(row.elapsed_max_ms),
1110 url_w = url_w
1111 );
1112 }
1113 if !stats.per_neighborhood.is_empty() {
1114 let _ = writeln!(out);
1115 let _ = writeln!(out, "chunk stats per neighborhood:");
1116 let mut rows: Vec<(&String, &ChunkStatRow)> =
1117 stats.per_neighborhood.iter().collect();
1118 rows.sort_by(|a, b| {
1119 b.1.elapsed_p95_ms
1120 .unwrap_or(0)
1121 .cmp(&a.1.elapsed_p95_ms.unwrap_or(0))
1122 });
1123 for (nb, row) in rows.iter().take(10) {
1124 let _ = writeln!(
1125 out,
1126 " nb {} found {:>3}/{:<3} p50 {:>5} ms · p95 {:>5} ms",
1127 nb,
1128 row.found,
1129 row.total,
1130 fmt_ms(row.elapsed_p50_ms),
1131 fmt_ms(row.elapsed_p95_ms),
1132 );
1133 }
1134 if rows.len() > 10 {
1135 let _ = writeln!(out, " ... {} more neighborhoods", rows.len() - 10);
1136 }
1137 }
1138 }
1139 if let Some(chunks) = &r.chunks {
1140 let _ = writeln!(out);
1141 let _ = writeln!(out, "chunks: {} probed", chunks.len());
1142 let mut missing = 0usize;
1143 for c in chunks {
1144 let missing_in: Vec<String> = c
1145 .per_vantage
1146 .iter()
1147 .filter(|(_, cv)| !cv.found)
1148 .map(|(u, cv)| match cv.proximity {
1149 Some(p) => format!("{u} (PO {p})"),
1150 None => u.clone(),
1151 })
1152 .collect();
1153 if !missing_in.is_empty() {
1154 missing += 1;
1155 let _ = writeln!(
1156 out,
1157 " [{}] {} missing on: {}",
1158 c.neighborhood,
1159 short(&c.address),
1160 missing_in.join(", ")
1161 );
1162 }
1163 }
1164 if missing == 0 {
1165 let _ = writeln!(out, " all chunks present on all vantages");
1166 } else {
1167 let _ = writeln!(out, " {missing} chunk(s) missing on at least one vantage");
1168 }
1169 }
1170 out
1171}
1172
1173fn vantage_meta(v: &VantageResult) -> String {
1178 let mut parts: Vec<String> = Vec::new();
1179 if let Some(o) = &v.overlay {
1180 let neigh = o.chars().take(2).collect::<String>();
1181 let short_overlay = short_overlay(o);
1182 parts.push(format!("overlay {short_overlay} (nb {neigh})"));
1183 }
1184 if let Some(p) = v.proximity_to_root {
1185 parts.push(format!("PO {p}"));
1186 }
1187 if let Some(p) = v.target_proximity {
1188 parts.push(format!("tgtPO {p}"));
1189 }
1190 if let Some(ver) = &v.bee_version {
1191 parts.push(format!("v{ver}"));
1192 }
1193 if parts.is_empty() {
1194 String::new()
1195 } else {
1196 format!("· {}", parts.join(" · "))
1197 }
1198}
1199
1200fn fmt_ms(v: Option<u64>) -> String {
1201 match v {
1202 Some(ms) => format!("{ms}"),
1203 None => "—".to_string(),
1204 }
1205}
1206
1207fn short_overlay(hex: &str) -> String {
1208 let s = hex.strip_prefix("0x").unwrap_or(hex);
1209 if s.len() > 12 {
1210 format!("{}…{}", &s[..6], &s[s.len() - 4..])
1211 } else {
1212 s.to_string()
1213 }
1214}
1215
1216pub fn render_stamp_status(s: &StampStatus) -> String {
1219 use std::fmt::Write;
1220 let mut out = String::new();
1221 let ttl = if s.batch_ttl < 0 {
1222 "unknown".to_string()
1223 } else {
1224 humanize_secs(s.batch_ttl)
1225 };
1226 let header = if s.healthy { "stamp OK" } else { "stamp warning" };
1227 let _ = writeln!(
1228 out,
1229 "{header}: batch {} · usable={} · ttl={}",
1230 short_overlay(&s.batch_id),
1231 s.usable,
1232 ttl,
1233 );
1234 for w in &s.warnings {
1235 let _ = writeln!(out, " · {w}");
1236 }
1237 out
1238}
1239
1240fn short(hex: &str) -> String {
1241 if hex.len() > 16 {
1242 format!("{}…{}", &hex[..8], &hex[hex.len() - 4..])
1243 } else {
1244 hex.to_string()
1245 }
1246}