1use std::collections::BTreeMap;
8use std::sync::Arc;
9use std::time::{Duration, Instant};
10
11use anyhow::{Context, Result, anyhow};
12use bee::Client;
13use bee::manifest::{is_null_address, unmarshal};
14use bee::swarm::gsoc::proximity;
15use bee::swarm::{BatchId, EthAddress, Reference, Topic};
16use futures::stream::{FuturesUnordered, StreamExt};
17use reqwest::Method;
18use serde::{Deserialize, Serialize};
19use tokio::sync::Semaphore;
20
21pub const DEFAULT_GATEWAY: &str = "https://api.gateway.ethswarm.org";
27
28const MAX_CHUNKS: usize = 1000;
29const STAMP_LOW_TTL_SECS: i64 = 86_400;
32
33#[derive(Copy, Clone, Debug)]
34pub enum OutputFormat {
35 Text,
36 Json,
37}
38
39#[derive(Copy, Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
41#[serde(rename_all = "snake_case")]
42pub enum Status {
43 Retrievable,
44 Unretrievable,
45 Partial,
46 Error,
47}
48
49#[derive(Debug, Serialize, Deserialize)]
50pub struct Report {
51 pub reference: String,
52 pub status: Status,
53 pub vantages: Vec<VantageResult>,
54 #[serde(default, skip_serializing_if = "Vec::is_empty")]
57 pub gateways: Vec<GatewayResult>,
58 #[serde(default, skip_serializing_if = "Option::is_none")]
61 pub resolution: Option<Resolution>,
62 #[serde(skip_serializing_if = "Option::is_none")]
64 pub chunks: Option<Vec<ChunkProbe>>,
65 pub spec_version: u32,
66}
67
68#[derive(Debug, Serialize, Deserialize)]
73pub struct GatewayResult {
74 pub url: String,
75 pub retrievable: Option<bool>,
78 pub elapsed_ms: u64,
79 #[serde(skip_serializing_if = "Option::is_none")]
81 pub status_code: Option<u16>,
82 #[serde(skip_serializing_if = "Option::is_none")]
83 pub error: Option<String>,
84}
85
86#[derive(Debug, Serialize, Deserialize)]
89#[serde(tag = "kind", rename_all = "snake_case")]
90pub enum Resolution {
91 Feed {
94 owner: String,
95 topic: String,
96 resolved_reference: String,
98 },
99}
100
101#[derive(Debug, Serialize, Deserialize)]
102pub struct VantageResult {
103 pub bee_url: String,
104 pub retrievable: Option<bool>,
106 pub elapsed_ms: u64,
107 #[serde(skip_serializing_if = "Option::is_none")]
108 pub error: Option<String>,
109 #[serde(skip_serializing_if = "Option::is_none")]
112 pub overlay: Option<String>,
113 #[serde(skip_serializing_if = "Option::is_none")]
115 pub bee_version: Option<String>,
116 #[serde(skip_serializing_if = "Option::is_none")]
120 pub proximity_to_root: Option<u32>,
121}
122
123#[derive(Debug, Serialize, Deserialize)]
124pub struct ChunkProbe {
125 pub address: String,
126 pub neighborhood: String,
128 pub per_vantage: BTreeMap<String, ChunkVantage>,
129}
130
131#[derive(Debug, Serialize, Deserialize)]
132pub struct ChunkVantage {
133 pub found: bool,
134 pub elapsed_ms: u64,
135 #[serde(skip_serializing_if = "Option::is_none")]
136 pub error: Option<String>,
137 #[serde(skip_serializing_if = "Option::is_none")]
140 pub proximity: Option<u32>,
141}
142
143pub struct ReseedRequest {
144 pub reference: String,
145 pub bee_url: String,
146 pub batch_id: String,
147 pub timeout: Duration,
148}
149
150#[derive(Debug, Serialize, Deserialize)]
154pub struct StampStatus {
155 pub batch_id: String,
156 pub exists: bool,
157 pub usable: bool,
158 pub batch_ttl: i64,
159 pub healthy: bool,
162 #[serde(skip_serializing_if = "Vec::is_empty")]
163 pub warnings: Vec<String>,
164}
165
166const SPEC_VERSION: u32 = 1;
167
168fn parse_reference(s: &str) -> Result<Reference> {
169 Reference::from_hex(s).map_err(|e| anyhow!("invalid reference {s}: {e}"))
170}
171
172fn make_bee(url: &str, timeout: Duration) -> Result<Client> {
173 let http = reqwest::Client::builder()
174 .timeout(timeout)
175 .build()
176 .context("building http client")?;
177 Client::with_http_client(url, http).map_err(|e| anyhow!("invalid bee url {url}: {e}"))
178}
179
180pub async fn check_multi_vantage(
187 reference: &str,
188 bees: &[String],
189 timeout: Duration,
190) -> Result<Report> {
191 let r = parse_reference(reference)?;
192 let root_bytes = first_32(&r);
193
194 let mut futs = FuturesUnordered::new();
195 for bee_url in bees {
196 let bee_url = bee_url.clone();
197 let r = r.clone();
198 futs.push(async move {
199 let bee = match make_bee(&bee_url, timeout) {
200 Ok(b) => b,
201 Err(e) => {
202 return VantageResult {
203 bee_url,
204 retrievable: None,
205 elapsed_ms: 0,
206 error: Some(format!("{e:#}")),
207 overlay: None,
208 bee_version: None,
209 proximity_to_root: None,
210 };
211 }
212 };
213 let started = Instant::now();
214 let api = bee.api();
215 let debug = bee.debug();
216 let (stew_res, addr_res, health_res) = tokio::join!(
217 api.is_retrievable(&r),
218 debug.addresses(),
219 debug.health(),
220 );
221 let elapsed_ms = started.elapsed().as_millis() as u64;
222
223 let overlay = addr_res.ok().map(|a| a.overlay);
224 let bee_version = health_res.ok().map(|h| h.version);
225 let proximity_to_root = overlay
226 .as_deref()
227 .and_then(decode_overlay)
228 .map(|o| proximity(&o, &root_bytes));
229
230 match stew_res {
231 Ok(ok) => VantageResult {
232 bee_url,
233 retrievable: Some(ok),
234 elapsed_ms,
235 error: None,
236 overlay,
237 bee_version,
238 proximity_to_root,
239 },
240 Err(e) => VantageResult {
241 bee_url,
242 retrievable: None,
243 elapsed_ms,
244 error: Some(format!("{e}")),
245 overlay,
246 bee_version,
247 proximity_to_root,
248 },
249 }
250 });
251 }
252
253 let mut vantages = Vec::with_capacity(bees.len());
254 while let Some(v) = futs.next().await {
255 vantages.push(v);
256 }
257 vantages.sort_by(|a, b| a.bee_url.cmp(&b.bee_url));
258
259 let status = aggregate_status(&vantages, &[]);
260 Ok(Report {
261 reference: reference.to_string(),
262 status,
263 vantages,
264 gateways: Vec::new(),
265 resolution: None,
266 chunks: None,
267 spec_version: SPEC_VERSION,
268 })
269}
270
271pub async fn check_gateways(
276 reference: &str,
277 gateway_urls: &[String],
278 timeout: Duration,
279) -> Result<Vec<GatewayResult>> {
280 let _ = parse_reference(reference)?;
283
284 let http = reqwest::Client::builder()
285 .timeout(timeout)
286 .build()
287 .context("building http client for gateway probes")?;
288
289 let mut futs = FuturesUnordered::new();
290 for base in gateway_urls {
291 let base = base.clone();
292 let reference = reference.to_string();
293 let http = http.clone();
294 futs.push(async move {
295 let url = build_gateway_url(&base, &reference);
296 let started = Instant::now();
297 let res = http.request(Method::HEAD, &url).send().await;
298 let elapsed_ms = started.elapsed().as_millis() as u64;
299 match res {
300 Ok(resp) => {
301 let status = resp.status().as_u16();
302 GatewayResult {
303 url: base,
304 retrievable: Some(resp.status().is_success()),
305 elapsed_ms,
306 status_code: Some(status),
307 error: None,
308 }
309 }
310 Err(e) => GatewayResult {
311 url: base,
312 retrievable: None,
313 elapsed_ms,
314 status_code: None,
315 error: Some(format!("{e}")),
316 },
317 }
318 });
319 }
320 let mut out = Vec::with_capacity(gateway_urls.len());
321 while let Some(g) = futs.next().await {
322 out.push(g);
323 }
324 out.sort_by(|a, b| a.url.cmp(&b.url));
325 Ok(out)
326}
327
328pub fn merge_gateways(mut report: Report, gateways: Vec<GatewayResult>) -> Report {
332 report.gateways = gateways;
333 report.status = aggregate_status(&report.vantages, &report.gateways);
334 report
335}
336
337fn build_gateway_url(base: &str, reference: &str) -> String {
338 let trimmed = base.trim_end_matches('/');
339 format!("{trimmed}/bzz/{reference}/")
340}
341
342pub async fn resolve_feed(
347 bee_url: &str,
348 owner_hex: &str,
349 topic_hex: &str,
350 timeout: Duration,
351) -> Result<(String, Resolution)> {
352 let bee = make_bee(bee_url, timeout)?;
353 let owner = EthAddress::from_hex(owner_hex)
354 .map_err(|e| anyhow!("invalid feed owner {owner_hex}: {e}"))?;
355 let topic = Topic::from_hex(topic_hex)
356 .map_err(|e| anyhow!("invalid feed topic {topic_hex}: {e}"))?;
357 let reference = bee
358 .file()
359 .get_feed_lookup(&owner, &topic)
360 .await
361 .map_err(anyhow::Error::from)?;
362 let r_hex = reference.to_hex();
363 Ok((
364 r_hex.clone(),
365 Resolution::Feed {
366 owner: owner.to_hex(),
367 topic: topic.to_hex(),
368 resolved_reference: r_hex,
369 },
370 ))
371}
372
373pub fn parse_input(input: &str) -> ParsedInput {
379 if let Some(rest) = input.strip_prefix("feed:") {
380 let parts: Vec<&str> = rest.splitn(2, [':', '/']).collect();
382 if parts.len() == 2 {
383 return ParsedInput::Feed {
384 owner: parts[0].to_string(),
385 topic: parts[1].to_string(),
386 };
387 }
388 }
389 ParsedInput::Reference(input.to_string())
390}
391
392pub enum ParsedInput {
394 Reference(String),
397 Feed { owner: String, topic: String },
400}
401
402fn decode_overlay(hex: &str) -> Option<[u8; 32]> {
406 let s = hex.strip_prefix("0x").unwrap_or(hex);
407 if s.len() < 64 {
408 return None;
409 }
410 let mut out = [0u8; 32];
411 for (i, b) in out.iter_mut().enumerate() {
412 let h = u8::from_str_radix(&s[i * 2..i * 2 + 2], 16).ok()?;
413 *b = h;
414 }
415 Some(out)
416}
417
418fn first_32(r: &Reference) -> [u8; 32] {
419 let mut out = [0u8; 32];
420 out.copy_from_slice(&r.as_bytes()[..32]);
421 out
422}
423
424fn aggregate_status(vantages: &[VantageResult], gateways: &[GatewayResult]) -> Status {
425 let outcomes: Vec<Option<bool>> = vantages
426 .iter()
427 .map(|v| v.retrievable)
428 .chain(gateways.iter().map(|g| g.retrievable))
429 .collect();
430 let total = outcomes.len();
431 if total == 0 {
432 return Status::Error;
433 }
434 let retr = outcomes.iter().filter(|o| **o == Some(true)).count();
435 let unret = outcomes.iter().filter(|o| **o == Some(false)).count();
436 let err = outcomes.iter().filter(|o| o.is_none()).count();
437 if err == total {
438 Status::Error
439 } else if retr == total {
440 Status::Retrievable
441 } else if retr == 0 && unret + err == total {
442 Status::Unretrievable
443 } else {
444 Status::Partial
445 }
446}
447
448pub async fn drill_down(
454 mut report: Report,
455 bees: &[String],
456 timeout: Duration,
457 concurrency: usize,
458) -> Result<Report> {
459 let r = parse_reference(&report.reference)?;
460 let walker_bee = bees.first().context("no bee URL for drill-down")?;
462 let walker = make_bee(walker_bee, timeout)?;
463
464 let addresses = collect_chunk_addresses(&walker, &r).await?;
465
466 let overlays: BTreeMap<String, [u8; 32]> = report
469 .vantages
470 .iter()
471 .filter_map(|v| {
472 v.overlay
473 .as_deref()
474 .and_then(decode_overlay)
475 .map(|o| (v.bee_url.clone(), o))
476 })
477 .collect();
478
479 let clients: Vec<(String, Client)> = bees
480 .iter()
481 .map(|u| make_bee(u, timeout).map(|b| (u.clone(), b)))
482 .collect::<Result<_>>()?;
483
484 let sem = Arc::new(Semaphore::new(concurrency.max(1)));
485 let mut probes = Vec::with_capacity(addresses.len());
486
487 let mut futs = FuturesUnordered::new();
488 for addr in addresses {
489 let sem = sem.clone();
490 let clients = clients.clone();
491 let overlays = overlays.clone();
492 futs.push(async move {
493 let chunk_bytes = first_32_of_ref(&addr);
494 let mut per_vantage = BTreeMap::new();
495 for (url, bee) in &clients {
496 let _permit = sem.acquire().await.expect("semaphore not closed");
497 let started = Instant::now();
498 let res = bee.file().download_chunk(&addr, None).await;
499 let elapsed_ms = started.elapsed().as_millis() as u64;
500 let prox = overlays
501 .get(url)
502 .map(|o| proximity(o, &chunk_bytes));
503 let cv = match res {
504 Ok(_) => ChunkVantage {
505 found: true,
506 elapsed_ms,
507 error: None,
508 proximity: prox,
509 },
510 Err(e) => ChunkVantage {
511 found: false,
512 elapsed_ms,
513 error: Some(format!("{e}")),
514 proximity: prox,
515 },
516 };
517 per_vantage.insert(url.clone(), cv);
518 }
519 let hex = addr.to_hex();
520 let neighborhood = hex.chars().take(2).collect::<String>();
521 ChunkProbe {
522 address: hex,
523 neighborhood,
524 per_vantage,
525 }
526 });
527 }
528
529 while let Some(p) = futs.next().await {
530 probes.push(p);
531 }
532 probes.sort_by(|a, b| a.address.cmp(&b.address));
533 report.chunks = Some(probes);
534 Ok(report)
535}
536
537fn first_32_of_ref(r: &Reference) -> [u8; 32] {
538 let mut out = [0u8; 32];
539 out.copy_from_slice(&r.as_bytes()[..32]);
540 out
541}
542
543async fn collect_chunk_addresses(bee: &Client, root: &Reference) -> Result<Vec<Reference>> {
548 let mut addresses: Vec<Reference> = vec![root.clone()];
549 let mut seen: std::collections::HashSet<String> = std::collections::HashSet::new();
550 seen.insert(root.to_hex());
551
552 let mut queue: std::collections::VecDeque<Reference> = std::collections::VecDeque::new();
553 queue.push_back(root.clone());
554
555 while let Some(addr) = queue.pop_front() {
556 if addresses.len() >= MAX_CHUNKS {
557 break;
558 }
559 let bytes = match bee.file().download_chunk(&addr, None).await {
560 Ok(b) => b,
561 Err(_) => continue,
564 };
565 let Ok(node) = unmarshal(&bytes, addr.as_bytes()) else {
566 continue;
569 };
570 if !is_null_address(&node.target_address) {
572 if let Ok(r) = Reference::new(&node.target_address) {
573 if seen.insert(r.to_hex()) {
574 addresses.push(r);
575 }
576 }
577 }
578 for fork in node.forks.values() {
580 if let Some(sa) = fork.node.self_address {
581 if let Ok(r) = Reference::new(&sa) {
582 if seen.insert(r.to_hex()) {
583 addresses.push(r.clone());
584 queue.push_back(r);
585 }
586 }
587 }
588 }
589 }
590 Ok(addresses)
591}
592
593pub async fn reseed(req: ReseedRequest) -> Result<()> {
594 let bee = make_bee(&req.bee_url, req.timeout)?;
595 let r = parse_reference(&req.reference)?;
596 let batch = BatchId::from_hex(&req.batch_id)
597 .map_err(|e| anyhow!("invalid batch id {}: {e}", req.batch_id))?;
598 bee.api().reupload(&r, &batch).await?;
599 Ok(())
600}
601
602pub async fn check_stamp(
607 bee_url: &str,
608 batch_id: &str,
609 timeout: Duration,
610) -> Result<StampStatus> {
611 let bee = make_bee(bee_url, timeout)?;
612 let batch = BatchId::from_hex(batch_id)
613 .map_err(|e| anyhow!("invalid batch id {batch_id}: {e}"))?;
614 let pb = bee
615 .postage()
616 .get_postage_batch(&batch)
617 .await
618 .map_err(anyhow::Error::from)?;
619
620 let mut warnings = Vec::new();
621 if !pb.exists {
622 warnings.push("batch not known to this Bee".to_string());
623 }
624 if !pb.usable {
625 warnings.push("batch not usable yet (chain may be syncing)".to_string());
626 }
627 if pb.batch_ttl >= 0 && pb.batch_ttl < STAMP_LOW_TTL_SECS {
628 warnings.push(format!(
629 "batch TTL low: ~{} (re-seed may not outlive the batch)",
630 humanize_secs(pb.batch_ttl)
631 ));
632 }
633
634 let healthy = pb.exists && pb.usable && (pb.batch_ttl < 0 || pb.batch_ttl >= STAMP_LOW_TTL_SECS);
635
636 Ok(StampStatus {
637 batch_id: batch_id.to_string(),
638 exists: pb.exists,
639 usable: pb.usable,
640 batch_ttl: pb.batch_ttl,
641 healthy,
642 warnings,
643 })
644}
645
646fn humanize_secs(s: i64) -> String {
647 if s < 0 {
648 return "unknown".to_string();
649 }
650 let s = s as u64;
651 if s >= 86_400 {
652 format!("{} day(s)", s / 86_400)
653 } else if s >= 3_600 {
654 format!("{} hour(s)", s / 3_600)
655 } else if s >= 60 {
656 format!("{} min", s / 60)
657 } else {
658 format!("{}s", s)
659 }
660}
661
662pub fn render_report(report: &Report, fmt: OutputFormat) -> String {
663 match fmt {
664 OutputFormat::Json => {
665 serde_json::to_string_pretty(report).expect("report serialization") + "\n"
666 }
667 OutputFormat::Text => render_text(report),
668 }
669}
670
671fn render_text(r: &Report) -> String {
672 use std::fmt::Write;
673 let mut out = String::new();
674 let _ = writeln!(out, "ref {}", r.reference);
675 let _ = writeln!(out, "status {:?}", r.status);
676 let _ = writeln!(out);
677 let _ = writeln!(out, "vantages:");
678 let url_w = r.vantages.iter().map(|v| v.bee_url.len()).max().unwrap_or(20);
679 for v in &r.vantages {
680 let state = match (v.retrievable, &v.error) {
681 (Some(true), _) => "retrievable",
682 (Some(false), _) => "unretrievable",
683 (None, Some(_)) => "error",
684 (None, None) => "unknown",
685 };
686 let meta = vantage_meta(v);
687 let _ = writeln!(
688 out,
689 " {:<url_w$} {:<14} {:>6} ms{}{}",
690 v.bee_url,
691 state,
692 v.elapsed_ms,
693 if meta.is_empty() { String::new() } else { format!(" {meta}") },
694 v.error
695 .as_deref()
696 .map(|e| format!(" ({e})"))
697 .unwrap_or_default(),
698 url_w = url_w
699 );
700 }
701 if !r.gateways.is_empty() {
702 let _ = writeln!(out);
703 let _ = writeln!(out, "gateways:");
704 let url_w = r.gateways.iter().map(|g| g.url.len()).max().unwrap_or(20);
705 for g in &r.gateways {
706 let state = match (g.retrievable, &g.error) {
707 (Some(true), _) => "retrievable",
708 (Some(false), _) => "unretrievable",
709 (None, _) => "error",
710 };
711 let code = g
712 .status_code
713 .map(|c| format!(" HTTP {c}"))
714 .unwrap_or_default();
715 let _ = writeln!(
716 out,
717 " {:<url_w$} {:<14} {:>6} ms{}{}",
718 g.url,
719 state,
720 g.elapsed_ms,
721 code,
722 g.error
723 .as_deref()
724 .map(|e| format!(" ({e})"))
725 .unwrap_or_default(),
726 url_w = url_w
727 );
728 }
729 }
730 if let Some(res) = &r.resolution {
731 let _ = writeln!(out);
732 match res {
733 Resolution::Feed { owner, topic, resolved_reference } => {
734 let _ = writeln!(
735 out,
736 "resolved feed owner={owner} topic={topic} -> {resolved_reference}",
737 );
738 }
739 }
740 }
741 if let Some(chunks) = &r.chunks {
742 let _ = writeln!(out);
743 let _ = writeln!(out, "chunks: {} probed", chunks.len());
744 let mut missing = 0usize;
745 for c in chunks {
746 let missing_in: Vec<String> = c
747 .per_vantage
748 .iter()
749 .filter(|(_, cv)| !cv.found)
750 .map(|(u, cv)| match cv.proximity {
751 Some(p) => format!("{u} (PO {p})"),
752 None => u.clone(),
753 })
754 .collect();
755 if !missing_in.is_empty() {
756 missing += 1;
757 let _ = writeln!(
758 out,
759 " [{}] {} missing on: {}",
760 c.neighborhood,
761 short(&c.address),
762 missing_in.join(", ")
763 );
764 }
765 }
766 if missing == 0 {
767 let _ = writeln!(out, " all chunks present on all vantages");
768 } else {
769 let _ = writeln!(out, " {missing} chunk(s) missing on at least one vantage");
770 }
771 }
772 out
773}
774
775fn vantage_meta(v: &VantageResult) -> String {
779 let mut parts: Vec<String> = Vec::new();
780 if let Some(o) = &v.overlay {
781 let neigh = o.chars().take(2).collect::<String>();
782 let short_overlay = short_overlay(o);
783 parts.push(format!("overlay {short_overlay} (nb {neigh})"));
784 }
785 if let Some(p) = v.proximity_to_root {
786 parts.push(format!("PO {p}"));
787 }
788 if let Some(ver) = &v.bee_version {
789 parts.push(format!("v{ver}"));
790 }
791 if parts.is_empty() {
792 String::new()
793 } else {
794 format!("· {}", parts.join(" · "))
795 }
796}
797
798fn short_overlay(hex: &str) -> String {
799 let s = hex.strip_prefix("0x").unwrap_or(hex);
800 if s.len() > 12 {
801 format!("{}…{}", &s[..6], &s[s.len() - 4..])
802 } else {
803 s.to_string()
804 }
805}
806
807pub fn render_stamp_status(s: &StampStatus) -> String {
810 use std::fmt::Write;
811 let mut out = String::new();
812 let ttl = if s.batch_ttl < 0 {
813 "unknown".to_string()
814 } else {
815 humanize_secs(s.batch_ttl)
816 };
817 let header = if s.healthy { "stamp OK" } else { "stamp warning" };
818 let _ = writeln!(
819 out,
820 "{header}: batch {} · usable={} · ttl={}",
821 short_overlay(&s.batch_id),
822 s.usable,
823 ttl,
824 );
825 for w in &s.warnings {
826 let _ = writeln!(out, " · {w}");
827 }
828 out
829}
830
831fn short(hex: &str) -> String {
832 if hex.len() > 16 {
833 format!("{}…{}", &hex[..8], &hex[hex.len() - 4..])
834 } else {
835 hex.to_string()
836 }
837}