1use std::collections::BTreeMap;
8use std::sync::Arc;
9use std::time::{Duration, Instant};
10
11use anyhow::{Context, Result, anyhow};
12use bee::Client;
13use bee::manifest::{is_null_address, unmarshal};
14use bee::swarm::gsoc::proximity;
15use bee::swarm::{BatchId, Reference};
16use futures::stream::{FuturesUnordered, StreamExt};
17use serde::{Deserialize, Serialize};
18use tokio::sync::Semaphore;
19
/// Ceiling on how many chunk addresses `collect_chunk_addresses` gathers before it stops
/// walking the manifest.
const MAX_CHUNKS: usize = 1_000;
/// Batch TTL, in seconds, below which `check_stamp` raises a "TTL low" warning (one day).
const STAMP_LOW_TTL_SECS: i64 = 24 * 60 * 60;
24
/// Output encodings understood by [`render_report`].
#[derive(Clone, Copy, Debug)]
pub enum OutputFormat {
    /// Human-oriented plain-text summary.
    Text,
    /// Pretty-printed JSON followed by a trailing newline.
    Json,
}
30
31#[derive(Copy, Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
33#[serde(rename_all = "snake_case")]
34pub enum Status {
35 Retrievable,
36 Unretrievable,
37 Partial,
38 Error,
39}
40
41#[derive(Debug, Serialize, Deserialize)]
42pub struct Report {
43 pub reference: String,
44 pub status: Status,
45 pub vantages: Vec<VantageResult>,
46 #[serde(skip_serializing_if = "Option::is_none")]
48 pub chunks: Option<Vec<ChunkProbe>>,
49 pub spec_version: u32,
50}
51
52#[derive(Debug, Serialize, Deserialize)]
53pub struct VantageResult {
54 pub bee_url: String,
55 pub retrievable: Option<bool>,
57 pub elapsed_ms: u64,
58 #[serde(skip_serializing_if = "Option::is_none")]
59 pub error: Option<String>,
60 #[serde(skip_serializing_if = "Option::is_none")]
63 pub overlay: Option<String>,
64 #[serde(skip_serializing_if = "Option::is_none")]
66 pub bee_version: Option<String>,
67 #[serde(skip_serializing_if = "Option::is_none")]
71 pub proximity_to_root: Option<u32>,
72}
73
74#[derive(Debug, Serialize, Deserialize)]
75pub struct ChunkProbe {
76 pub address: String,
77 pub neighborhood: String,
79 pub per_vantage: BTreeMap<String, ChunkVantage>,
80}
81
82#[derive(Debug, Serialize, Deserialize)]
83pub struct ChunkVantage {
84 pub found: bool,
85 pub elapsed_ms: u64,
86 #[serde(skip_serializing_if = "Option::is_none")]
87 pub error: Option<String>,
88 #[serde(skip_serializing_if = "Option::is_none")]
91 pub proximity: Option<u32>,
92}
93
/// Parameters for [`reseed`]: which reference to re-upload, through which node, under which
/// postage batch.
///
/// Derives `Debug` and `Clone` so callers can log a request or reuse it for a retry.
#[derive(Debug, Clone)]
pub struct ReseedRequest {
    /// Hex-encoded reference to re-upload.
    pub reference: String,
    /// URL of the Bee node that performs the re-upload.
    pub bee_url: String,
    /// Hex-encoded postage batch id used to stamp the re-upload.
    pub batch_id: String,
    /// HTTP timeout applied to the client built for this request.
    pub timeout: Duration,
}
100
101#[derive(Debug, Serialize, Deserialize)]
105pub struct StampStatus {
106 pub batch_id: String,
107 pub exists: bool,
108 pub usable: bool,
109 pub batch_ttl: i64,
110 pub healthy: bool,
113 #[serde(skip_serializing_if = "Vec::is_empty")]
114 pub warnings: Vec<String>,
115}
116
/// Layout version stamped into every [`Report`] as `spec_version`.
const SPEC_VERSION: u32 = 1;
118
119fn parse_reference(s: &str) -> Result<Reference> {
120 Reference::from_hex(s).map_err(|e| anyhow!("invalid reference {s}: {e}"))
121}
122
123fn make_bee(url: &str, timeout: Duration) -> Result<Client> {
124 let http = reqwest::Client::builder()
125 .timeout(timeout)
126 .build()
127 .context("building http client")?;
128 Client::with_http_client(url, http).map_err(|e| anyhow!("invalid bee url {url}: {e}"))
129}
130
131pub async fn check_multi_vantage(
138 reference: &str,
139 bees: &[String],
140 timeout: Duration,
141) -> Result<Report> {
142 let r = parse_reference(reference)?;
143 let root_bytes = first_32(&r);
144
145 let mut futs = FuturesUnordered::new();
146 for bee_url in bees {
147 let bee_url = bee_url.clone();
148 let r = r.clone();
149 futs.push(async move {
150 let bee = match make_bee(&bee_url, timeout) {
151 Ok(b) => b,
152 Err(e) => {
153 return VantageResult {
154 bee_url,
155 retrievable: None,
156 elapsed_ms: 0,
157 error: Some(format!("{e:#}")),
158 overlay: None,
159 bee_version: None,
160 proximity_to_root: None,
161 };
162 }
163 };
164 let started = Instant::now();
165 let api = bee.api();
166 let debug = bee.debug();
167 let (stew_res, addr_res, health_res) = tokio::join!(
168 api.is_retrievable(&r),
169 debug.addresses(),
170 debug.health(),
171 );
172 let elapsed_ms = started.elapsed().as_millis() as u64;
173
174 let overlay = addr_res.ok().map(|a| a.overlay);
175 let bee_version = health_res.ok().map(|h| h.version);
176 let proximity_to_root = overlay
177 .as_deref()
178 .and_then(decode_overlay)
179 .map(|o| proximity(&o, &root_bytes));
180
181 match stew_res {
182 Ok(ok) => VantageResult {
183 bee_url,
184 retrievable: Some(ok),
185 elapsed_ms,
186 error: None,
187 overlay,
188 bee_version,
189 proximity_to_root,
190 },
191 Err(e) => VantageResult {
192 bee_url,
193 retrievable: None,
194 elapsed_ms,
195 error: Some(format!("{e}")),
196 overlay,
197 bee_version,
198 proximity_to_root,
199 },
200 }
201 });
202 }
203
204 let mut vantages = Vec::with_capacity(bees.len());
205 while let Some(v) = futs.next().await {
206 vantages.push(v);
207 }
208 vantages.sort_by(|a, b| a.bee_url.cmp(&b.bee_url));
209
210 let status = aggregate_status(&vantages);
211 Ok(Report {
212 reference: reference.to_string(),
213 status,
214 vantages,
215 chunks: None,
216 spec_version: SPEC_VERSION,
217 })
218}
219
/// Decodes the first 64 hex digits of `hex` (after an optional `0x` prefix) into 32 bytes.
///
/// Returns `None` when fewer than 64 characters remain after the prefix or when any of the
/// first 64 is not a hex digit. Anything after the 64th digit is ignored.
///
/// Decoding works on bytes rather than `str` slices, so non-ASCII input yields `None`
/// instead of panicking on a char boundary, and sign characters such as `+` are rejected.
fn decode_overlay(hex: &str) -> Option<[u8; 32]> {
    let s = hex.strip_prefix("0x").unwrap_or(hex);
    let digits = s.as_bytes().get(..64)?;
    let mut out = [0u8; 32];
    for (dst, pair) in out.iter_mut().zip(digits.chunks_exact(2)) {
        // A byte >= 0x80 maps to a non-ASCII char, which `to_digit` rejects.
        let hi = (pair[0] as char).to_digit(16)?;
        let lo = (pair[1] as char).to_digit(16)?;
        // Both nibbles are < 16, so the combined value always fits in a u8.
        *dst = (hi * 16 + lo) as u8;
    }
    Some(out)
}
235
236fn first_32(r: &Reference) -> [u8; 32] {
237 let mut out = [0u8; 32];
238 out.copy_from_slice(&r.as_bytes()[..32]);
239 out
240}
241
242fn aggregate_status(vantages: &[VantageResult]) -> Status {
243 let total = vantages.len();
244 if total == 0 {
245 return Status::Error;
246 }
247 let mut retr = 0usize;
248 let mut unret = 0usize;
249 let mut err = 0usize;
250 for v in vantages {
251 match v.retrievable {
252 Some(true) => retr += 1,
253 Some(false) => unret += 1,
254 None => err += 1,
255 }
256 }
257 if err == total {
258 Status::Error
259 } else if retr == total {
260 Status::Retrievable
261 } else if retr == 0 && unret + err == total {
262 Status::Unretrievable
263 } else {
264 Status::Partial
265 }
266}
267
268pub async fn drill_down(
274 mut report: Report,
275 bees: &[String],
276 timeout: Duration,
277 concurrency: usize,
278) -> Result<Report> {
279 let r = parse_reference(&report.reference)?;
280 let walker_bee = bees.first().context("no bee URL for drill-down")?;
282 let walker = make_bee(walker_bee, timeout)?;
283
284 let addresses = collect_chunk_addresses(&walker, &r).await?;
285
286 let overlays: BTreeMap<String, [u8; 32]> = report
289 .vantages
290 .iter()
291 .filter_map(|v| {
292 v.overlay
293 .as_deref()
294 .and_then(decode_overlay)
295 .map(|o| (v.bee_url.clone(), o))
296 })
297 .collect();
298
299 let clients: Vec<(String, Client)> = bees
300 .iter()
301 .map(|u| make_bee(u, timeout).map(|b| (u.clone(), b)))
302 .collect::<Result<_>>()?;
303
304 let sem = Arc::new(Semaphore::new(concurrency.max(1)));
305 let mut probes = Vec::with_capacity(addresses.len());
306
307 let mut futs = FuturesUnordered::new();
308 for addr in addresses {
309 let sem = sem.clone();
310 let clients = clients.clone();
311 let overlays = overlays.clone();
312 futs.push(async move {
313 let chunk_bytes = first_32_of_ref(&addr);
314 let mut per_vantage = BTreeMap::new();
315 for (url, bee) in &clients {
316 let _permit = sem.acquire().await.expect("semaphore not closed");
317 let started = Instant::now();
318 let res = bee.file().download_chunk(&addr, None).await;
319 let elapsed_ms = started.elapsed().as_millis() as u64;
320 let prox = overlays
321 .get(url)
322 .map(|o| proximity(o, &chunk_bytes));
323 let cv = match res {
324 Ok(_) => ChunkVantage {
325 found: true,
326 elapsed_ms,
327 error: None,
328 proximity: prox,
329 },
330 Err(e) => ChunkVantage {
331 found: false,
332 elapsed_ms,
333 error: Some(format!("{e}")),
334 proximity: prox,
335 },
336 };
337 per_vantage.insert(url.clone(), cv);
338 }
339 let hex = addr.to_hex();
340 let neighborhood = hex.chars().take(2).collect::<String>();
341 ChunkProbe {
342 address: hex,
343 neighborhood,
344 per_vantage,
345 }
346 });
347 }
348
349 while let Some(p) = futs.next().await {
350 probes.push(p);
351 }
352 probes.sort_by(|a, b| a.address.cmp(&b.address));
353 report.chunks = Some(probes);
354 Ok(report)
355}
356
357fn first_32_of_ref(r: &Reference) -> [u8; 32] {
358 let mut out = [0u8; 32];
359 out.copy_from_slice(&r.as_bytes()[..32]);
360 out
361}
362
363async fn collect_chunk_addresses(bee: &Client, root: &Reference) -> Result<Vec<Reference>> {
368 let mut addresses: Vec<Reference> = vec![root.clone()];
369 let mut seen: std::collections::HashSet<String> = std::collections::HashSet::new();
370 seen.insert(root.to_hex());
371
372 let mut queue: std::collections::VecDeque<Reference> = std::collections::VecDeque::new();
373 queue.push_back(root.clone());
374
375 while let Some(addr) = queue.pop_front() {
376 if addresses.len() >= MAX_CHUNKS {
377 break;
378 }
379 let bytes = match bee.file().download_chunk(&addr, None).await {
380 Ok(b) => b,
381 Err(_) => continue,
384 };
385 let Ok(node) = unmarshal(&bytes, addr.as_bytes()) else {
386 continue;
389 };
390 if !is_null_address(&node.target_address) {
392 if let Ok(r) = Reference::new(&node.target_address) {
393 if seen.insert(r.to_hex()) {
394 addresses.push(r);
395 }
396 }
397 }
398 for fork in node.forks.values() {
400 if let Some(sa) = fork.node.self_address {
401 if let Ok(r) = Reference::new(&sa) {
402 if seen.insert(r.to_hex()) {
403 addresses.push(r.clone());
404 queue.push_back(r);
405 }
406 }
407 }
408 }
409 }
410 Ok(addresses)
411}
412
413pub async fn reseed(req: ReseedRequest) -> Result<()> {
414 let bee = make_bee(&req.bee_url, req.timeout)?;
415 let r = parse_reference(&req.reference)?;
416 let batch = BatchId::from_hex(&req.batch_id)
417 .map_err(|e| anyhow!("invalid batch id {}: {e}", req.batch_id))?;
418 bee.api().reupload(&r, &batch).await?;
419 Ok(())
420}
421
422pub async fn check_stamp(
427 bee_url: &str,
428 batch_id: &str,
429 timeout: Duration,
430) -> Result<StampStatus> {
431 let bee = make_bee(bee_url, timeout)?;
432 let batch = BatchId::from_hex(batch_id)
433 .map_err(|e| anyhow!("invalid batch id {batch_id}: {e}"))?;
434 let pb = bee
435 .postage()
436 .get_postage_batch(&batch)
437 .await
438 .map_err(anyhow::Error::from)?;
439
440 let mut warnings = Vec::new();
441 if !pb.exists {
442 warnings.push("batch not known to this Bee".to_string());
443 }
444 if !pb.usable {
445 warnings.push("batch not usable yet (chain may be syncing)".to_string());
446 }
447 if pb.batch_ttl >= 0 && pb.batch_ttl < STAMP_LOW_TTL_SECS {
448 warnings.push(format!(
449 "batch TTL low: ~{} (re-seed may not outlive the batch)",
450 humanize_secs(pb.batch_ttl)
451 ));
452 }
453
454 let healthy = pb.exists && pb.usable && (pb.batch_ttl < 0 || pb.batch_ttl >= STAMP_LOW_TTL_SECS);
455
456 Ok(StampStatus {
457 batch_id: batch_id.to_string(),
458 exists: pb.exists,
459 usable: pb.usable,
460 batch_ttl: pb.batch_ttl,
461 healthy,
462 warnings,
463 })
464}
465
/// Renders a duration in seconds using its largest whole unit (days, hours, minutes or
/// seconds), rounding down. Negative input renders as `"unknown"`.
fn humanize_secs(s: i64) -> String {
    const MINUTE: i64 = 60;
    const HOUR: i64 = 60 * MINUTE;
    const DAY: i64 = 24 * HOUR;

    match s {
        negative if negative < 0 => "unknown".to_string(),
        secs if secs < MINUTE => format!("{}s", secs),
        secs if secs < HOUR => format!("{} min", secs / MINUTE),
        secs if secs < DAY => format!("{} hour(s)", secs / HOUR),
        secs => format!("{} day(s)", secs / DAY),
    }
}
481
482pub fn render_report(report: &Report, fmt: OutputFormat) -> String {
483 match fmt {
484 OutputFormat::Json => {
485 serde_json::to_string_pretty(report).expect("report serialization") + "\n"
486 }
487 OutputFormat::Text => render_text(report),
488 }
489}
490
491fn render_text(r: &Report) -> String {
492 use std::fmt::Write;
493 let mut out = String::new();
494 let _ = writeln!(out, "ref {}", r.reference);
495 let _ = writeln!(out, "status {:?}", r.status);
496 let _ = writeln!(out);
497 let _ = writeln!(out, "vantages:");
498 let url_w = r.vantages.iter().map(|v| v.bee_url.len()).max().unwrap_or(20);
499 for v in &r.vantages {
500 let state = match (v.retrievable, &v.error) {
501 (Some(true), _) => "retrievable",
502 (Some(false), _) => "unretrievable",
503 (None, Some(_)) => "error",
504 (None, None) => "unknown",
505 };
506 let meta = vantage_meta(v);
507 let _ = writeln!(
508 out,
509 " {:<url_w$} {:<14} {:>6} ms{}{}",
510 v.bee_url,
511 state,
512 v.elapsed_ms,
513 if meta.is_empty() { String::new() } else { format!(" {meta}") },
514 v.error
515 .as_deref()
516 .map(|e| format!(" ({e})"))
517 .unwrap_or_default(),
518 url_w = url_w
519 );
520 }
521 if let Some(chunks) = &r.chunks {
522 let _ = writeln!(out);
523 let _ = writeln!(out, "chunks: {} probed", chunks.len());
524 let mut missing = 0usize;
525 for c in chunks {
526 let missing_in: Vec<String> = c
527 .per_vantage
528 .iter()
529 .filter(|(_, cv)| !cv.found)
530 .map(|(u, cv)| match cv.proximity {
531 Some(p) => format!("{u} (PO {p})"),
532 None => u.clone(),
533 })
534 .collect();
535 if !missing_in.is_empty() {
536 missing += 1;
537 let _ = writeln!(
538 out,
539 " [{}] {} missing on: {}",
540 c.neighborhood,
541 short(&c.address),
542 missing_in.join(", ")
543 );
544 }
545 }
546 if missing == 0 {
547 let _ = writeln!(out, " all chunks present on all vantages");
548 } else {
549 let _ = writeln!(out, " {missing} chunk(s) missing on at least one vantage");
550 }
551 }
552 out
553}
554
555fn vantage_meta(v: &VantageResult) -> String {
559 let mut parts: Vec<String> = Vec::new();
560 if let Some(o) = &v.overlay {
561 let neigh = o.chars().take(2).collect::<String>();
562 let short_overlay = short_overlay(o);
563 parts.push(format!("overlay {short_overlay} (nb {neigh})"));
564 }
565 if let Some(p) = v.proximity_to_root {
566 parts.push(format!("PO {p}"));
567 }
568 if let Some(ver) = &v.bee_version {
569 parts.push(format!("v{ver}"));
570 }
571 if parts.is_empty() {
572 String::new()
573 } else {
574 format!("· {}", parts.join(" · "))
575 }
576}
577
/// Shortens a hex string for display: strips an optional `0x` prefix, then keeps the first
/// six and last four characters around an ellipsis when more than 12 bytes remain.
///
/// Uses checked slicing, so input whose cut points fall inside a multi-byte character is
/// returned unshortened instead of panicking.
fn short_overlay(hex: &str) -> String {
    let s = hex.strip_prefix("0x").unwrap_or(hex);
    if s.len() > 12 {
        // `s.len() - 4` cannot underflow because the length is above 12.
        if let (Some(head), Some(tail)) = (s.get(..6), s.get(s.len() - 4..)) {
            return format!("{head}…{tail}");
        }
    }
    s.to_string()
}
586
587pub fn render_stamp_status(s: &StampStatus) -> String {
590 use std::fmt::Write;
591 let mut out = String::new();
592 let ttl = if s.batch_ttl < 0 {
593 "unknown".to_string()
594 } else {
595 humanize_secs(s.batch_ttl)
596 };
597 let header = if s.healthy { "stamp OK" } else { "stamp warning" };
598 let _ = writeln!(
599 out,
600 "{header}: batch {} · usable={} · ttl={}",
601 short_overlay(&s.batch_id),
602 s.usable,
603 ttl,
604 );
605 for w in &s.warnings {
606 let _ = writeln!(out, " · {w}");
607 }
608 out
609}
610
/// Shortens a hex address for display: keeps the first eight and last four characters
/// around an ellipsis when the input is longer than 16 bytes.
///
/// Uses checked slicing, so input whose cut points fall inside a multi-byte character is
/// returned unshortened instead of panicking.
fn short(hex: &str) -> String {
    if hex.len() > 16 {
        // `hex.len() - 4` cannot underflow because the length is above 16.
        if let (Some(head), Some(tail)) = (hex.get(..8), hex.get(hex.len() - 4..)) {
            return format!("{head}…{tail}");
        }
    }
    hex.to_string()
}