1use std::collections::BTreeMap;
8use std::sync::Arc;
9use std::time::{Duration, Instant};
10
11use anyhow::{Context, Result, anyhow};
12use bee::Client;
13use bee::manifest::{is_null_address, unmarshal};
14use bee::swarm::gsoc::proximity;
15use bee::swarm::{BatchId, EthAddress, Reference, Topic};
16use futures::stream::{FuturesUnordered, StreamExt};
17use reqwest::Method;
18use serde::{Deserialize, Serialize};
19use tokio::sync::Semaphore;
20use tracing::{debug, info, trace};
21
22pub const DEFAULT_GATEWAY: &str = "https://api.gateway.ethswarm.org";
28
29const MAX_CHUNKS: usize = 1000;
30const STAMP_LOW_TTL_SECS: i64 = 86_400;
33
34#[derive(Copy, Clone, Debug)]
35pub enum OutputFormat {
36 Text,
37 Json,
38}
39
40#[derive(Copy, Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
42#[serde(rename_all = "snake_case")]
43pub enum Status {
44 Retrievable,
45 Unretrievable,
46 Partial,
47 Error,
48}
49
50#[derive(Debug, Serialize, Deserialize)]
51pub struct Report {
52 pub reference: String,
53 pub status: Status,
54 pub vantages: Vec<VantageResult>,
55 #[serde(default, skip_serializing_if = "Vec::is_empty")]
58 pub gateways: Vec<GatewayResult>,
59 #[serde(default, skip_serializing_if = "Option::is_none")]
62 pub resolution: Option<Resolution>,
63 #[serde(skip_serializing_if = "Option::is_none")]
65 pub chunks: Option<Vec<ChunkProbe>>,
66 #[serde(skip_serializing_if = "Option::is_none")]
69 pub chunk_stats: Option<ChunkStats>,
70 #[serde(default, skip_serializing_if = "Vec::is_empty")]
75 pub cold_downloads: Vec<ColdDownloadResult>,
76 pub spec_version: u32,
77}
78
79#[derive(Debug, Serialize, Deserialize)]
83pub struct ChunkStats {
84 pub per_vantage: BTreeMap<String, ChunkStatRow>,
85 pub per_neighborhood: BTreeMap<String, ChunkStatRow>,
87}
88
89#[derive(Debug, Serialize, Deserialize, Clone)]
90pub struct ChunkStatRow {
91 pub total: usize,
92 pub found: usize,
93 pub missing: usize,
94 #[serde(skip_serializing_if = "Option::is_none")]
97 pub elapsed_p50_ms: Option<u64>,
98 #[serde(skip_serializing_if = "Option::is_none")]
99 pub elapsed_p95_ms: Option<u64>,
100 #[serde(skip_serializing_if = "Option::is_none")]
101 pub elapsed_max_ms: Option<u64>,
102}
103
104#[derive(Debug, Serialize, Deserialize)]
109pub struct GatewayResult {
110 pub url: String,
111 pub retrievable: Option<bool>,
114 pub elapsed_ms: u64,
115 #[serde(skip_serializing_if = "Option::is_none")]
117 pub status_code: Option<u16>,
118 #[serde(skip_serializing_if = "Option::is_none")]
119 pub error: Option<String>,
120}
121
122#[derive(Debug, Serialize, Deserialize)]
125#[serde(tag = "kind", rename_all = "snake_case")]
126pub enum Resolution {
127 Feed {
130 owner: String,
131 topic: String,
132 resolved_reference: String,
134 },
135}
136
137#[derive(Debug, Serialize, Deserialize)]
138pub struct VantageResult {
139 pub bee_url: String,
140 pub retrievable: Option<bool>,
142 pub elapsed_ms: u64,
143 #[serde(skip_serializing_if = "Option::is_none")]
144 pub error: Option<String>,
145 #[serde(skip_serializing_if = "Option::is_none")]
148 pub overlay: Option<String>,
149 #[serde(skip_serializing_if = "Option::is_none")]
151 pub bee_version: Option<String>,
152 #[serde(skip_serializing_if = "Option::is_none")]
156 pub proximity_to_root: Option<u32>,
157 #[serde(skip_serializing_if = "Option::is_none")]
161 pub target_proximity: Option<u32>,
162}
163
164#[derive(Debug, Serialize, Deserialize)]
165pub struct ChunkProbe {
166 pub address: String,
167 pub neighborhood: String,
169 pub per_vantage: BTreeMap<String, ChunkVantage>,
170}
171
172#[derive(Debug, Serialize, Deserialize)]
173pub struct ChunkVantage {
174 pub found: bool,
175 pub elapsed_ms: u64,
176 #[serde(skip_serializing_if = "Option::is_none")]
177 pub error: Option<String>,
178 #[serde(skip_serializing_if = "Option::is_none")]
181 pub proximity: Option<u32>,
182}
183
184#[derive(Debug, Serialize, Deserialize)]
190pub struct ColdDownloadResult {
191 pub url: String,
192 pub endpoint: String,
195 pub success: bool,
198 pub bytes_downloaded: u64,
200 pub elapsed_ms: u64,
201 #[serde(skip_serializing_if = "Option::is_none")]
202 pub status_code: Option<u16>,
203 #[serde(skip_serializing_if = "Option::is_none")]
204 pub error: Option<String>,
205}
206
207pub struct ReseedRequest {
208 pub reference: String,
209 pub bee_url: String,
210 pub batch_id: String,
211 pub timeout: Duration,
212}
213
214#[derive(Debug, Serialize, Deserialize)]
218pub struct StampStatus {
219 pub batch_id: String,
220 pub exists: bool,
221 pub usable: bool,
222 pub batch_ttl: i64,
223 pub healthy: bool,
226 #[serde(skip_serializing_if = "Vec::is_empty")]
227 pub warnings: Vec<String>,
228}
229
230const SPEC_VERSION: u32 = 1;
231
232fn parse_reference(s: &str) -> Result<Reference> {
233 Reference::from_hex(s).map_err(|e| anyhow!("invalid reference {s}: {e}"))
234}
235
236fn make_bee(url: &str, timeout: Duration) -> Result<Client> {
237 let http = reqwest::Client::builder()
238 .timeout(timeout)
239 .build()
240 .context("building http client")?;
241 Client::with_http_client(url, http).map_err(|e| anyhow!("invalid bee url {url}: {e}"))
242}
243
244pub async fn check_multi_vantage(
251 reference: &str,
252 bees: &[String],
253 timeout: Duration,
254) -> Result<Report> {
255 let r = parse_reference(reference)?;
256 let root_bytes = first_32(&r);
257 info!(
258 reference = reference,
259 vantages = bees.len(),
260 timeout_secs = timeout.as_secs(),
261 "starting multi-vantage probe",
262 );
263
264 let mut futs = FuturesUnordered::new();
265 for bee_url in bees {
266 let bee_url = bee_url.clone();
267 let r = r.clone();
268 futs.push(async move {
269 debug!(bee_url = %bee_url, "probing vantage");
270 let bee = match make_bee(&bee_url, timeout) {
271 Ok(b) => b,
272 Err(e) => {
273 return VantageResult {
274 bee_url,
275 retrievable: None,
276 elapsed_ms: 0,
277 error: Some(format!("{e:#}")),
278 overlay: None,
279 bee_version: None,
280 proximity_to_root: None,
281 target_proximity: None,
282 };
283 }
284 };
285 let started = Instant::now();
286 let api = bee.api();
287 let debug = bee.debug();
288 let (stew_res, addr_res, health_res) = tokio::join!(
289 api.is_retrievable(&r),
290 debug.addresses(),
291 debug.health(),
292 );
293 let elapsed_ms = started.elapsed().as_millis() as u64;
294
295 let overlay = addr_res.ok().map(|a| a.overlay);
296 let bee_version = health_res.ok().map(|h| h.version);
297 let proximity_to_root = overlay
298 .as_deref()
299 .and_then(decode_overlay)
300 .map(|o| proximity(&o, &root_bytes));
301
302 match stew_res {
303 Ok(ok) => {
304 debug!(
305 bee_url = %bee_url,
306 retrievable = ok,
307 elapsed_ms,
308 overlay = overlay.as_deref().unwrap_or("?"),
309 "vantage done",
310 );
311 VantageResult {
312 bee_url,
313 retrievable: Some(ok),
314 elapsed_ms,
315 error: None,
316 overlay,
317 bee_version,
318 proximity_to_root,
319 target_proximity: None,
320 }
321 }
322 Err(e) => {
323 debug!(bee_url = %bee_url, error = %e, "vantage errored");
324 VantageResult {
325 bee_url,
326 retrievable: None,
327 elapsed_ms,
328 error: Some(format!("{e}")),
329 overlay,
330 bee_version,
331 proximity_to_root,
332 target_proximity: None,
333 }
334 }
335 }
336 });
337 }
338
339 let mut vantages = Vec::with_capacity(bees.len());
340 while let Some(v) = futs.next().await {
341 vantages.push(v);
342 }
343 vantages.sort_by(|a, b| a.bee_url.cmp(&b.bee_url));
344
345 let status = aggregate_status(&vantages, &[]);
346 Ok(Report {
347 reference: reference.to_string(),
348 status,
349 vantages,
350 gateways: Vec::new(),
351 resolution: None,
352 chunks: None,
353 chunk_stats: None,
354 cold_downloads: Vec::new(),
355 spec_version: SPEC_VERSION,
356 })
357}
358
359pub async fn check_gateways(
364 reference: &str,
365 gateway_urls: &[String],
366 timeout: Duration,
367) -> Result<Vec<GatewayResult>> {
368 let _ = parse_reference(reference)?;
371 if !gateway_urls.is_empty() {
372 info!(count = gateway_urls.len(), "probing gateways");
373 }
374
375 let http = reqwest::Client::builder()
376 .timeout(timeout)
377 .build()
378 .context("building http client for gateway probes")?;
379
380 let mut futs = FuturesUnordered::new();
381 for base in gateway_urls {
382 let base = base.clone();
383 let reference = reference.to_string();
384 let http = http.clone();
385 futs.push(async move {
386 let url = build_gateway_url(&base, &reference);
387 let started = Instant::now();
388 let res = http.request(Method::HEAD, &url).send().await;
389 let elapsed_ms = started.elapsed().as_millis() as u64;
390 match res {
391 Ok(resp) => {
392 let status = resp.status().as_u16();
393 GatewayResult {
394 url: base,
395 retrievable: Some(resp.status().is_success()),
396 elapsed_ms,
397 status_code: Some(status),
398 error: None,
399 }
400 }
401 Err(e) => GatewayResult {
402 url: base,
403 retrievable: None,
404 elapsed_ms,
405 status_code: None,
406 error: Some(format!("{e}")),
407 },
408 }
409 });
410 }
411 let mut out = Vec::with_capacity(gateway_urls.len());
412 while let Some(g) = futs.next().await {
413 out.push(g);
414 }
415 out.sort_by(|a, b| a.url.cmp(&b.url));
416 Ok(out)
417}
418
419pub fn merge_gateways(mut report: Report, gateways: Vec<GatewayResult>) -> Report {
423 report.gateways = gateways;
424 report.status = aggregate_status(&report.vantages, &report.gateways);
425 report
426}
427
428fn build_gateway_url(base: &str, reference: &str) -> String {
429 let trimmed = base.trim_end_matches('/');
430 format!("{trimmed}/bzz/{reference}/")
431}
432
433fn build_bytes_url(base: &str, reference: &str) -> String {
434 let trimmed = base.trim_end_matches('/');
435 format!("{trimmed}/bytes/{reference}")
436}
437
438pub async fn cold_download_all(
446 bee_urls: &[String],
447 gateway_urls: &[String],
448 reference: &str,
449 timeout: Duration,
450) -> Result<Vec<ColdDownloadResult>> {
451 let _ = parse_reference(reference)?;
452 info!(
453 bee_count = bee_urls.len(),
454 gateway_count = gateway_urls.len(),
455 "starting cold-download probes",
456 );
457
458 let http = reqwest::Client::builder()
459 .timeout(timeout)
460 .build()
461 .context("building http client for cold-download probes")?;
462
463 let mut targets: Vec<(String, String, String)> =
465 Vec::with_capacity(bee_urls.len() + gateway_urls.len());
466 for url in bee_urls {
467 targets.push((
468 url.clone(),
469 "/bytes/{ref}".to_string(),
470 build_bytes_url(url, reference),
471 ));
472 }
473 for url in gateway_urls {
474 targets.push((
475 url.clone(),
476 "/bzz/{ref}/".to_string(),
477 build_gateway_url(url, reference),
478 ));
479 }
480
481 let mut futs = FuturesUnordered::new();
482 for (base, endpoint, probe_url) in targets {
483 let http = http.clone();
484 futs.push(async move { cold_probe(&http, base, endpoint, probe_url).await });
485 }
486
487 let mut out = Vec::with_capacity(bee_urls.len() + gateway_urls.len());
488 while let Some(c) = futs.next().await {
489 out.push(c);
490 }
491 out.sort_by(|a, b| a.url.cmp(&b.url));
492 Ok(out)
493}
494
495async fn cold_probe(
496 http: &reqwest::Client,
497 base: String,
498 endpoint: String,
499 probe_url: String,
500) -> ColdDownloadResult {
501 debug!(url = %probe_url, "cold-download GET");
502 let started = Instant::now();
503 let res = http.get(&probe_url).send().await;
504 match res {
505 Ok(mut resp) => {
506 let status = resp.status();
507 let status_code = status.as_u16();
508 if !status.is_success() {
509 return ColdDownloadResult {
510 url: base,
511 endpoint,
512 success: false,
513 bytes_downloaded: 0,
514 elapsed_ms: started.elapsed().as_millis() as u64,
515 status_code: Some(status_code),
516 error: Some(format!("HTTP {status_code}")),
517 };
518 }
519 let mut bytes: u64 = 0;
520 let mut err: Option<String> = None;
521 loop {
522 match resp.chunk().await {
523 Ok(Some(chunk)) => bytes += chunk.len() as u64,
524 Ok(None) => break,
525 Err(e) => {
526 err = Some(format!("stream error after {bytes} bytes: {e}"));
527 break;
528 }
529 }
530 }
531 ColdDownloadResult {
532 url: base,
533 endpoint,
534 success: err.is_none(),
535 bytes_downloaded: bytes,
536 elapsed_ms: started.elapsed().as_millis() as u64,
537 status_code: Some(status_code),
538 error: err,
539 }
540 }
541 Err(e) => ColdDownloadResult {
542 url: base,
543 endpoint,
544 success: false,
545 bytes_downloaded: 0,
546 elapsed_ms: started.elapsed().as_millis() as u64,
547 status_code: None,
548 error: Some(format!("{e}")),
549 },
550 }
551}
552
553pub async fn resolve_feed(
558 bee_url: &str,
559 owner_hex: &str,
560 topic_hex: &str,
561 timeout: Duration,
562) -> Result<(String, Resolution)> {
563 let bee = make_bee(bee_url, timeout)?;
564 let owner = EthAddress::from_hex(owner_hex)
565 .map_err(|e| anyhow!("invalid feed owner {owner_hex}: {e}"))?;
566 let topic = Topic::from_hex(topic_hex)
567 .map_err(|e| anyhow!("invalid feed topic {topic_hex}: {e}"))?;
568 let reference = bee
569 .file()
570 .get_feed_lookup(&owner, &topic)
571 .await
572 .map_err(anyhow::Error::from)?;
573 let r_hex = reference.to_hex();
574 Ok((
575 r_hex.clone(),
576 Resolution::Feed {
577 owner: owner.to_hex(),
578 topic: topic.to_hex(),
579 resolved_reference: r_hex,
580 },
581 ))
582}
583
584pub fn parse_input(input: &str) -> ParsedInput {
590 if let Some(rest) = input.strip_prefix("feed:") {
591 let parts: Vec<&str> = rest.splitn(2, [':', '/']).collect();
593 if parts.len() == 2 {
594 return ParsedInput::Feed {
595 owner: parts[0].to_string(),
596 topic: parts[1].to_string(),
597 };
598 }
599 }
600 ParsedInput::Reference(input.to_string())
601}
602
603pub enum ParsedInput {
605 Reference(String),
608 Feed { owner: String, topic: String },
611}
612
613fn decode_overlay(hex: &str) -> Option<[u8; 32]> {
617 let s = hex.strip_prefix("0x").unwrap_or(hex);
618 if s.len() < 64 {
619 return None;
620 }
621 let mut out = [0u8; 32];
622 for (i, b) in out.iter_mut().enumerate() {
623 let h = u8::from_str_radix(&s[i * 2..i * 2 + 2], 16).ok()?;
624 *b = h;
625 }
626 Some(out)
627}
628
629fn first_32(r: &Reference) -> [u8; 32] {
630 let mut out = [0u8; 32];
631 out.copy_from_slice(&r.as_bytes()[..32]);
632 out
633}
634
635fn aggregate_status(vantages: &[VantageResult], gateways: &[GatewayResult]) -> Status {
636 let outcomes: Vec<Option<bool>> = vantages
637 .iter()
638 .map(|v| v.retrievable)
639 .chain(gateways.iter().map(|g| g.retrievable))
640 .collect();
641 let total = outcomes.len();
642 if total == 0 {
643 return Status::Error;
644 }
645 let retr = outcomes.iter().filter(|o| **o == Some(true)).count();
646 let unret = outcomes.iter().filter(|o| **o == Some(false)).count();
647 let err = outcomes.iter().filter(|o| o.is_none()).count();
648 if err == total {
649 Status::Error
650 } else if retr == total {
651 Status::Retrievable
652 } else if retr == 0 && unret + err == total {
653 Status::Unretrievable
654 } else {
655 Status::Partial
656 }
657}
658
659pub async fn drill_down(
665 mut report: Report,
666 bees: &[String],
667 timeout: Duration,
668 concurrency: usize,
669) -> Result<Report> {
670 let r = parse_reference(&report.reference)?;
671 let walker_bee = bees.first().context("no bee URL for drill-down")?;
673 let walker = make_bee(walker_bee, timeout)?;
674
675 let addresses = collect_chunk_addresses(&walker, &r).await?;
676 info!(chunks = addresses.len(), concurrency, "drill-down probing chunks");
677
678 let overlays: BTreeMap<String, [u8; 32]> = report
681 .vantages
682 .iter()
683 .filter_map(|v| {
684 v.overlay
685 .as_deref()
686 .and_then(decode_overlay)
687 .map(|o| (v.bee_url.clone(), o))
688 })
689 .collect();
690
691 let clients: Vec<(String, Client)> = bees
692 .iter()
693 .map(|u| make_bee(u, timeout).map(|b| (u.clone(), b)))
694 .collect::<Result<_>>()?;
695
696 let sem = Arc::new(Semaphore::new(concurrency.max(1)));
697 let mut probes = Vec::with_capacity(addresses.len());
698
699 let mut futs = FuturesUnordered::new();
700 for addr in addresses {
701 let sem = sem.clone();
702 let clients = clients.clone();
703 let overlays = overlays.clone();
704 futs.push(async move {
705 let chunk_bytes = first_32_of_ref(&addr);
706 let mut per_vantage = BTreeMap::new();
707 for (url, bee) in &clients {
708 let _permit = sem.acquire().await.expect("semaphore not closed");
709 let started = Instant::now();
710 let res = bee.file().download_chunk(&addr, None).await;
711 let elapsed_ms = started.elapsed().as_millis() as u64;
712 let prox = overlays
713 .get(url)
714 .map(|o| proximity(o, &chunk_bytes));
715 trace!(
716 chunk = %addr.to_hex(),
717 bee_url = %url,
718 found = res.is_ok(),
719 elapsed_ms,
720 "chunk probe",
721 );
722 let cv = match res {
723 Ok(_) => ChunkVantage {
724 found: true,
725 elapsed_ms,
726 error: None,
727 proximity: prox,
728 },
729 Err(e) => ChunkVantage {
730 found: false,
731 elapsed_ms,
732 error: Some(format!("{e}")),
733 proximity: prox,
734 },
735 };
736 per_vantage.insert(url.clone(), cv);
737 }
738 let hex = addr.to_hex();
739 let neighborhood = hex.chars().take(2).collect::<String>();
740 ChunkProbe {
741 address: hex,
742 neighborhood,
743 per_vantage,
744 }
745 });
746 }
747
748 while let Some(p) = futs.next().await {
749 probes.push(p);
750 }
751 probes.sort_by(|a, b| a.address.cmp(&b.address));
752 report.chunk_stats = Some(compute_chunk_stats(&probes));
753 report.chunks = Some(probes);
754 Ok(report)
755}
756
757pub fn compute_chunk_stats(probes: &[ChunkProbe]) -> ChunkStats {
761 let mut per_vantage: BTreeMap<String, (Vec<u64>, usize, usize)> = BTreeMap::new();
763 let mut per_neighborhood: BTreeMap<String, (Vec<u64>, usize, usize)> = BTreeMap::new();
764
765 for p in probes {
766 for (url, cv) in &p.per_vantage {
767 let entry = per_vantage.entry(url.clone()).or_default();
768 if cv.found {
769 entry.0.push(cv.elapsed_ms);
770 entry.1 += 1;
771 } else {
772 entry.2 += 1;
773 }
774 let n = per_neighborhood.entry(p.neighborhood.clone()).or_default();
775 if cv.found {
776 n.0.push(cv.elapsed_ms);
777 n.1 += 1;
778 } else {
779 n.2 += 1;
780 }
781 }
782 }
783
784 let to_row = |(latencies, found, missing): (Vec<u64>, usize, usize)| -> ChunkStatRow {
785 let mut sorted = latencies;
786 sorted.sort_unstable();
787 let p = |q: f64| -> Option<u64> {
788 if sorted.is_empty() {
789 None
790 } else {
791 let idx = ((sorted.len() as f64 - 1.0) * q).round() as usize;
792 Some(sorted[idx])
793 }
794 };
795 ChunkStatRow {
796 total: found + missing,
797 found,
798 missing,
799 elapsed_p50_ms: p(0.50),
800 elapsed_p95_ms: p(0.95),
801 elapsed_max_ms: sorted.last().copied(),
802 }
803 };
804
805 ChunkStats {
806 per_vantage: per_vantage.into_iter().map(|(k, v)| (k, to_row(v))).collect(),
807 per_neighborhood: per_neighborhood
808 .into_iter()
809 .map(|(k, v)| (k, to_row(v)))
810 .collect(),
811 }
812}
813
814pub fn annotate_target_overlay(mut report: Report, target_overlay_hex: &str) -> Report {
819 let Some(target) = decode_overlay(target_overlay_hex) else {
820 return report;
821 };
822 for v in &mut report.vantages {
823 if let Some(o) = v.overlay.as_deref().and_then(decode_overlay) {
824 v.target_proximity = Some(proximity(&o, &target));
825 }
826 }
827 report
828 .vantages
829 .sort_by(|a, b| b.target_proximity.cmp(&a.target_proximity));
830 report
831}
832
833fn first_32_of_ref(r: &Reference) -> [u8; 32] {
834 let mut out = [0u8; 32];
835 out.copy_from_slice(&r.as_bytes()[..32]);
836 out
837}
838
839async fn collect_chunk_addresses(bee: &Client, root: &Reference) -> Result<Vec<Reference>> {
844 let mut addresses: Vec<Reference> = vec![root.clone()];
845 let mut seen: std::collections::HashSet<String> = std::collections::HashSet::new();
846 seen.insert(root.to_hex());
847
848 let mut queue: std::collections::VecDeque<Reference> = std::collections::VecDeque::new();
849 queue.push_back(root.clone());
850
851 while let Some(addr) = queue.pop_front() {
852 if addresses.len() >= MAX_CHUNKS {
853 break;
854 }
855 let bytes = match bee.file().download_chunk(&addr, None).await {
856 Ok(b) => b,
857 Err(_) => continue,
860 };
861 let Ok(node) = unmarshal(&bytes, addr.as_bytes()) else {
862 continue;
865 };
866 if !is_null_address(&node.target_address) {
868 if let Ok(r) = Reference::new(&node.target_address) {
869 if seen.insert(r.to_hex()) {
870 addresses.push(r);
871 }
872 }
873 }
874 for fork in node.forks.values() {
876 if let Some(sa) = fork.node.self_address {
877 if let Ok(r) = Reference::new(&sa) {
878 if seen.insert(r.to_hex()) {
879 addresses.push(r.clone());
880 queue.push_back(r);
881 }
882 }
883 }
884 }
885 }
886 Ok(addresses)
887}
888
889pub async fn reseed(req: ReseedRequest) -> Result<()> {
890 let bee = make_bee(&req.bee_url, req.timeout)?;
891 let r = parse_reference(&req.reference)?;
892 let batch = BatchId::from_hex(&req.batch_id)
893 .map_err(|e| anyhow!("invalid batch id {}: {e}", req.batch_id))?;
894 info!(
895 bee_url = %req.bee_url,
896 batch = %req.batch_id,
897 "re-uploading via PUT /stewardship/{{ref}}",
898 );
899 bee.api().reupload(&r, &batch).await?;
900 Ok(())
901}
902
903pub async fn check_stamp(
908 bee_url: &str,
909 batch_id: &str,
910 timeout: Duration,
911) -> Result<StampStatus> {
912 let bee = make_bee(bee_url, timeout)?;
913 let batch = BatchId::from_hex(batch_id)
914 .map_err(|e| anyhow!("invalid batch id {batch_id}: {e}"))?;
915 let pb = bee
916 .postage()
917 .get_postage_batch(&batch)
918 .await
919 .map_err(anyhow::Error::from)?;
920
921 let mut warnings = Vec::new();
922 if !pb.exists {
923 warnings.push("batch not known to this Bee".to_string());
924 }
925 if !pb.usable {
926 warnings.push("batch not usable yet (chain may be syncing)".to_string());
927 }
928 if pb.batch_ttl >= 0 && pb.batch_ttl < STAMP_LOW_TTL_SECS {
929 warnings.push(format!(
930 "batch TTL low: ~{} (re-seed may not outlive the batch)",
931 humanize_secs(pb.batch_ttl)
932 ));
933 }
934
935 let healthy = pb.exists && pb.usable && (pb.batch_ttl < 0 || pb.batch_ttl >= STAMP_LOW_TTL_SECS);
936
937 Ok(StampStatus {
938 batch_id: batch_id.to_string(),
939 exists: pb.exists,
940 usable: pb.usable,
941 batch_ttl: pb.batch_ttl,
942 healthy,
943 warnings,
944 })
945}
946
947fn human_bytes(b: u64) -> String {
948 const KIB: u64 = 1024;
949 const MIB: u64 = KIB * 1024;
950 const GIB: u64 = MIB * 1024;
951 if b >= GIB {
952 format!("{:.2} GiB", b as f64 / GIB as f64)
953 } else if b >= MIB {
954 format!("{:.2} MiB", b as f64 / MIB as f64)
955 } else if b >= KIB {
956 format!("{:.2} KiB", b as f64 / KIB as f64)
957 } else {
958 format!("{b} B")
959 }
960}
961
962fn humanize_secs(s: i64) -> String {
963 if s < 0 {
964 return "unknown".to_string();
965 }
966 let s = s as u64;
967 if s >= 86_400 {
968 format!("{} day(s)", s / 86_400)
969 } else if s >= 3_600 {
970 format!("{} hour(s)", s / 3_600)
971 } else if s >= 60 {
972 format!("{} min", s / 60)
973 } else {
974 format!("{}s", s)
975 }
976}
977
978pub fn render_report(report: &Report, fmt: OutputFormat) -> String {
979 match fmt {
980 OutputFormat::Json => {
981 serde_json::to_string_pretty(report).expect("report serialization") + "\n"
982 }
983 OutputFormat::Text => render_text(report),
984 }
985}
986
987fn render_text(r: &Report) -> String {
988 use std::fmt::Write;
989 let mut out = String::new();
990 let _ = writeln!(out, "ref {}", r.reference);
991 let _ = writeln!(out, "status {:?}", r.status);
992 let _ = writeln!(out);
993 let _ = writeln!(out, "vantages:");
994 let url_w = r.vantages.iter().map(|v| v.bee_url.len()).max().unwrap_or(20);
995 for v in &r.vantages {
996 let state = match (v.retrievable, &v.error) {
997 (Some(true), _) => "retrievable",
998 (Some(false), _) => "unretrievable",
999 (None, Some(_)) => "error",
1000 (None, None) => "unknown",
1001 };
1002 let meta = vantage_meta(v);
1003 let _ = writeln!(
1004 out,
1005 " {:<url_w$} {:<14} {:>6} ms{}{}",
1006 v.bee_url,
1007 state,
1008 v.elapsed_ms,
1009 if meta.is_empty() { String::new() } else { format!(" {meta}") },
1010 v.error
1011 .as_deref()
1012 .map(|e| format!(" ({e})"))
1013 .unwrap_or_default(),
1014 url_w = url_w
1015 );
1016 }
1017 if !r.gateways.is_empty() {
1018 let _ = writeln!(out);
1019 let _ = writeln!(out, "gateways:");
1020 let url_w = r.gateways.iter().map(|g| g.url.len()).max().unwrap_or(20);
1021 for g in &r.gateways {
1022 let state = match (g.retrievable, &g.error) {
1023 (Some(true), _) => "retrievable",
1024 (Some(false), _) => "unretrievable",
1025 (None, _) => "error",
1026 };
1027 let code = g
1028 .status_code
1029 .map(|c| format!(" HTTP {c}"))
1030 .unwrap_or_default();
1031 let _ = writeln!(
1032 out,
1033 " {:<url_w$} {:<14} {:>6} ms{}{}",
1034 g.url,
1035 state,
1036 g.elapsed_ms,
1037 code,
1038 g.error
1039 .as_deref()
1040 .map(|e| format!(" ({e})"))
1041 .unwrap_or_default(),
1042 url_w = url_w
1043 );
1044 }
1045 }
1046 if !r.cold_downloads.is_empty() {
1047 let _ = writeln!(out);
1048 let _ = writeln!(out, "cold downloads:");
1049 let url_w = r.cold_downloads.iter().map(|c| c.url.len()).max().unwrap_or(20);
1050 for c in &r.cold_downloads {
1051 let state = if c.success { "ok" } else { "fail" };
1052 let bytes = human_bytes(c.bytes_downloaded);
1053 let code = c
1054 .status_code
1055 .map(|s| format!(" HTTP {s}"))
1056 .unwrap_or_default();
1057 let _ = writeln!(
1058 out,
1059 " {:<url_w$} {:<4} {:>6} ms {:>9}{}{}",
1060 c.url,
1061 state,
1062 c.elapsed_ms,
1063 bytes,
1064 code,
1065 c.error
1066 .as_deref()
1067 .map(|e| format!(" ({e})"))
1068 .unwrap_or_default(),
1069 url_w = url_w
1070 );
1071 }
1072 }
1073 if let Some(res) = &r.resolution {
1074 let _ = writeln!(out);
1075 match res {
1076 Resolution::Feed { owner, topic, resolved_reference } => {
1077 let _ = writeln!(
1078 out,
1079 "resolved feed owner={owner} topic={topic} -> {resolved_reference}",
1080 );
1081 }
1082 }
1083 }
1084 if let Some(stats) = &r.chunk_stats {
1085 let _ = writeln!(out);
1086 let _ = writeln!(out, "chunk stats per vantage:");
1087 let url_w = stats
1088 .per_vantage
1089 .keys()
1090 .map(|k| k.len())
1091 .max()
1092 .unwrap_or(20);
1093 for (url, row) in &stats.per_vantage {
1094 let _ = writeln!(
1095 out,
1096 " {:<url_w$} found {:>3}/{:<3} p50 {:>5} ms · p95 {:>5} ms · max {:>5} ms",
1097 url,
1098 row.found,
1099 row.total,
1100 fmt_ms(row.elapsed_p50_ms),
1101 fmt_ms(row.elapsed_p95_ms),
1102 fmt_ms(row.elapsed_max_ms),
1103 url_w = url_w
1104 );
1105 }
1106 if !stats.per_neighborhood.is_empty() {
1107 let _ = writeln!(out);
1108 let _ = writeln!(out, "chunk stats per neighborhood:");
1109 let mut rows: Vec<(&String, &ChunkStatRow)> =
1110 stats.per_neighborhood.iter().collect();
1111 rows.sort_by(|a, b| {
1112 b.1.elapsed_p95_ms
1113 .unwrap_or(0)
1114 .cmp(&a.1.elapsed_p95_ms.unwrap_or(0))
1115 });
1116 for (nb, row) in rows.iter().take(10) {
1117 let _ = writeln!(
1118 out,
1119 " nb {} found {:>3}/{:<3} p50 {:>5} ms · p95 {:>5} ms",
1120 nb,
1121 row.found,
1122 row.total,
1123 fmt_ms(row.elapsed_p50_ms),
1124 fmt_ms(row.elapsed_p95_ms),
1125 );
1126 }
1127 if rows.len() > 10 {
1128 let _ = writeln!(out, " ... {} more neighborhoods", rows.len() - 10);
1129 }
1130 }
1131 }
1132 if let Some(chunks) = &r.chunks {
1133 let _ = writeln!(out);
1134 let _ = writeln!(out, "chunks: {} probed", chunks.len());
1135 let mut missing = 0usize;
1136 for c in chunks {
1137 let missing_in: Vec<String> = c
1138 .per_vantage
1139 .iter()
1140 .filter(|(_, cv)| !cv.found)
1141 .map(|(u, cv)| match cv.proximity {
1142 Some(p) => format!("{u} (PO {p})"),
1143 None => u.clone(),
1144 })
1145 .collect();
1146 if !missing_in.is_empty() {
1147 missing += 1;
1148 let _ = writeln!(
1149 out,
1150 " [{}] {} missing on: {}",
1151 c.neighborhood,
1152 short(&c.address),
1153 missing_in.join(", ")
1154 );
1155 }
1156 }
1157 if missing == 0 {
1158 let _ = writeln!(out, " all chunks present on all vantages");
1159 } else {
1160 let _ = writeln!(out, " {missing} chunk(s) missing on at least one vantage");
1161 }
1162 }
1163 out
1164}
1165
1166fn vantage_meta(v: &VantageResult) -> String {
1171 let mut parts: Vec<String> = Vec::new();
1172 if let Some(o) = &v.overlay {
1173 let neigh = o.chars().take(2).collect::<String>();
1174 let short_overlay = short_overlay(o);
1175 parts.push(format!("overlay {short_overlay} (nb {neigh})"));
1176 }
1177 if let Some(p) = v.proximity_to_root {
1178 parts.push(format!("PO {p}"));
1179 }
1180 if let Some(p) = v.target_proximity {
1181 parts.push(format!("tgtPO {p}"));
1182 }
1183 if let Some(ver) = &v.bee_version {
1184 parts.push(format!("v{ver}"));
1185 }
1186 if parts.is_empty() {
1187 String::new()
1188 } else {
1189 format!("· {}", parts.join(" · "))
1190 }
1191}
1192
1193fn fmt_ms(v: Option<u64>) -> String {
1194 match v {
1195 Some(ms) => format!("{ms}"),
1196 None => "—".to_string(),
1197 }
1198}
1199
1200fn short_overlay(hex: &str) -> String {
1201 let s = hex.strip_prefix("0x").unwrap_or(hex);
1202 if s.len() > 12 {
1203 format!("{}…{}", &s[..6], &s[s.len() - 4..])
1204 } else {
1205 s.to_string()
1206 }
1207}
1208
1209pub fn render_stamp_status(s: &StampStatus) -> String {
1212 use std::fmt::Write;
1213 let mut out = String::new();
1214 let ttl = if s.batch_ttl < 0 {
1215 "unknown".to_string()
1216 } else {
1217 humanize_secs(s.batch_ttl)
1218 };
1219 let header = if s.healthy { "stamp OK" } else { "stamp warning" };
1220 let _ = writeln!(
1221 out,
1222 "{header}: batch {} · usable={} · ttl={}",
1223 short_overlay(&s.batch_id),
1224 s.usable,
1225 ttl,
1226 );
1227 for w in &s.warnings {
1228 let _ = writeln!(out, " · {w}");
1229 }
1230 out
1231}
1232
1233fn short(hex: &str) -> String {
1234 if hex.len() > 16 {
1235 format!("{}…{}", &hex[..8], &hex[hex.len() - 4..])
1236 } else {
1237 hex.to_string()
1238 }
1239}