1use std::collections::BTreeMap;
8use std::sync::Arc;
9use std::time::{Duration, Instant};
10
11use anyhow::{Context, Result, anyhow};
12use bee::Client;
13use bee::manifest::{is_null_address, unmarshal};
14use bee::swarm::{BatchId, Reference};
15use futures::stream::{FuturesUnordered, StreamExt};
16use serde::{Deserialize, Serialize};
17use tokio::sync::Semaphore;
18
19const MAX_CHUNKS: usize = 1000;
20
21#[derive(Copy, Clone, Debug)]
22pub enum OutputFormat {
23 Text,
24 Json,
25}
26
27#[derive(Copy, Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
29#[serde(rename_all = "snake_case")]
30pub enum Status {
31 Retrievable,
32 Unretrievable,
33 Partial,
34 Error,
35}
36
37#[derive(Debug, Serialize, Deserialize)]
38pub struct Report {
39 pub reference: String,
40 pub status: Status,
41 pub vantages: Vec<VantageResult>,
42 #[serde(skip_serializing_if = "Option::is_none")]
44 pub chunks: Option<Vec<ChunkProbe>>,
45 pub spec_version: u32,
46}
47
48#[derive(Debug, Serialize, Deserialize)]
49pub struct VantageResult {
50 pub bee_url: String,
51 pub retrievable: Option<bool>,
53 pub elapsed_ms: u64,
54 #[serde(skip_serializing_if = "Option::is_none")]
55 pub error: Option<String>,
56}
57
58#[derive(Debug, Serialize, Deserialize)]
59pub struct ChunkProbe {
60 pub address: String,
61 pub neighborhood: String,
63 pub per_vantage: BTreeMap<String, ChunkVantage>,
64}
65
66#[derive(Debug, Serialize, Deserialize)]
67pub struct ChunkVantage {
68 pub found: bool,
69 pub elapsed_ms: u64,
70 #[serde(skip_serializing_if = "Option::is_none")]
71 pub error: Option<String>,
72}
73
74pub struct ReseedRequest {
75 pub reference: String,
76 pub bee_url: String,
77 pub batch_id: String,
78 pub timeout: Duration,
79}
80
81const SPEC_VERSION: u32 = 1;
82
83fn parse_reference(s: &str) -> Result<Reference> {
84 Reference::from_hex(s).map_err(|e| anyhow!("invalid reference {s}: {e}"))
85}
86
87fn make_bee(url: &str, timeout: Duration) -> Result<Client> {
88 let http = reqwest::Client::builder()
89 .timeout(timeout)
90 .build()
91 .context("building http client")?;
92 Client::with_http_client(url, http).map_err(|e| anyhow!("invalid bee url {url}: {e}"))
93}
94
95pub async fn check_multi_vantage(
97 reference: &str,
98 bees: &[String],
99 timeout: Duration,
100) -> Result<Report> {
101 let r = parse_reference(reference)?;
102
103 let mut futs = FuturesUnordered::new();
104 for bee_url in bees {
105 let bee_url = bee_url.clone();
106 let r = r.clone();
107 futs.push(async move {
108 let started = Instant::now();
109 let result = async {
110 let bee = make_bee(&bee_url, timeout)?;
111 let ok = bee.api().is_retrievable(&r).await.map_err(anyhow::Error::from)?;
112 Ok::<_, anyhow::Error>(ok)
113 }
114 .await;
115 let elapsed_ms = started.elapsed().as_millis() as u64;
116 match result {
117 Ok(ok) => VantageResult {
118 bee_url,
119 retrievable: Some(ok),
120 elapsed_ms,
121 error: None,
122 },
123 Err(e) => VantageResult {
124 bee_url,
125 retrievable: None,
126 elapsed_ms,
127 error: Some(format!("{e:#}")),
128 },
129 }
130 });
131 }
132
133 let mut vantages = Vec::with_capacity(bees.len());
134 while let Some(v) = futs.next().await {
135 vantages.push(v);
136 }
137 vantages.sort_by(|a, b| a.bee_url.cmp(&b.bee_url));
138
139 let status = aggregate_status(&vantages);
140 Ok(Report {
141 reference: reference.to_string(),
142 status,
143 vantages,
144 chunks: None,
145 spec_version: SPEC_VERSION,
146 })
147}
148
149fn aggregate_status(vantages: &[VantageResult]) -> Status {
150 let total = vantages.len();
151 if total == 0 {
152 return Status::Error;
153 }
154 let mut retr = 0usize;
155 let mut unret = 0usize;
156 let mut err = 0usize;
157 for v in vantages {
158 match v.retrievable {
159 Some(true) => retr += 1,
160 Some(false) => unret += 1,
161 None => err += 1,
162 }
163 }
164 if err == total {
165 Status::Error
166 } else if retr == total {
167 Status::Retrievable
168 } else if retr == 0 && unret + err == total {
169 Status::Unretrievable
170 } else {
171 Status::Partial
172 }
173}
174
175pub async fn drill_down(
179 mut report: Report,
180 bees: &[String],
181 timeout: Duration,
182 concurrency: usize,
183) -> Result<Report> {
184 let r = parse_reference(&report.reference)?;
185 let walker_bee = bees.first().context("no bee URL for drill-down")?;
187 let walker = make_bee(walker_bee, timeout)?;
188
189 let addresses = collect_chunk_addresses(&walker, &r).await?;
190
191 let clients: Vec<(String, Client)> = bees
193 .iter()
194 .map(|u| make_bee(u, timeout).map(|b| (u.clone(), b)))
195 .collect::<Result<_>>()?;
196
197 let sem = Arc::new(Semaphore::new(concurrency.max(1)));
198 let mut probes = Vec::with_capacity(addresses.len());
199
200 let mut futs = FuturesUnordered::new();
201 for addr in addresses {
202 let sem = sem.clone();
203 let clients = clients.clone();
204 futs.push(async move {
205 let mut per_vantage = BTreeMap::new();
206 for (url, bee) in &clients {
207 let _permit = sem.acquire().await.expect("semaphore not closed");
208 let started = Instant::now();
209 let res = bee.file().download_chunk(&addr, None).await;
210 let elapsed_ms = started.elapsed().as_millis() as u64;
211 let cv = match res {
212 Ok(_) => ChunkVantage { found: true, elapsed_ms, error: None },
213 Err(e) => ChunkVantage {
214 found: false,
215 elapsed_ms,
216 error: Some(format!("{e}")),
217 },
218 };
219 per_vantage.insert(url.clone(), cv);
220 }
221 let hex = addr.to_hex();
222 let neighborhood = hex.chars().take(2).collect::<String>();
223 ChunkProbe {
224 address: hex,
225 neighborhood,
226 per_vantage,
227 }
228 });
229 }
230
231 while let Some(p) = futs.next().await {
232 probes.push(p);
233 }
234 probes.sort_by(|a, b| a.address.cmp(&b.address));
235 report.chunks = Some(probes);
236 Ok(report)
237}
238
239async fn collect_chunk_addresses(bee: &Client, root: &Reference) -> Result<Vec<Reference>> {
244 let mut addresses: Vec<Reference> = vec![root.clone()];
245 let mut seen: std::collections::HashSet<String> = std::collections::HashSet::new();
246 seen.insert(root.to_hex());
247
248 let mut queue: std::collections::VecDeque<Reference> = std::collections::VecDeque::new();
249 queue.push_back(root.clone());
250
251 while let Some(addr) = queue.pop_front() {
252 if addresses.len() >= MAX_CHUNKS {
253 break;
254 }
255 let bytes = match bee.file().download_chunk(&addr, None).await {
256 Ok(b) => b,
257 Err(_) => continue,
260 };
261 let Ok(node) = unmarshal(&bytes, addr.as_bytes()) else {
262 continue;
265 };
266 if !is_null_address(&node.target_address) {
268 if let Ok(r) = Reference::new(&node.target_address) {
269 if seen.insert(r.to_hex()) {
270 addresses.push(r);
271 }
272 }
273 }
274 for fork in node.forks.values() {
276 if let Some(sa) = fork.node.self_address {
277 if let Ok(r) = Reference::new(&sa) {
278 if seen.insert(r.to_hex()) {
279 addresses.push(r.clone());
280 queue.push_back(r);
281 }
282 }
283 }
284 }
285 }
286 Ok(addresses)
287}
288
289pub async fn reseed(req: ReseedRequest) -> Result<()> {
290 let bee = make_bee(&req.bee_url, req.timeout)?;
291 let r = parse_reference(&req.reference)?;
292 let batch = BatchId::from_hex(&req.batch_id)
293 .map_err(|e| anyhow!("invalid batch id {}: {e}", req.batch_id))?;
294 bee.api().reupload(&r, &batch).await?;
295 Ok(())
296}
297
298pub fn render_report(report: &Report, fmt: OutputFormat) -> String {
299 match fmt {
300 OutputFormat::Json => {
301 serde_json::to_string_pretty(report).expect("report serialization") + "\n"
302 }
303 OutputFormat::Text => render_text(report),
304 }
305}
306
307fn render_text(r: &Report) -> String {
308 use std::fmt::Write;
309 let mut out = String::new();
310 let _ = writeln!(out, "ref {}", r.reference);
311 let _ = writeln!(out, "status {:?}", r.status);
312 let _ = writeln!(out);
313 let _ = writeln!(out, "vantages:");
314 let url_w = r.vantages.iter().map(|v| v.bee_url.len()).max().unwrap_or(20);
315 for v in &r.vantages {
316 let state = match (v.retrievable, &v.error) {
317 (Some(true), _) => "retrievable",
318 (Some(false), _) => "unretrievable",
319 (None, Some(_)) => "error",
320 (None, None) => "unknown",
321 };
322 let _ = writeln!(
323 out,
324 " {:<url_w$} {:<14} {:>6} ms{}",
325 v.bee_url,
326 state,
327 v.elapsed_ms,
328 v.error
329 .as_deref()
330 .map(|e| format!(" ({e})"))
331 .unwrap_or_default(),
332 url_w = url_w
333 );
334 }
335 if let Some(chunks) = &r.chunks {
336 let _ = writeln!(out);
337 let _ = writeln!(out, "chunks: {} probed", chunks.len());
338 let mut missing = 0usize;
339 for c in chunks {
340 let missing_in: Vec<&String> = c
341 .per_vantage
342 .iter()
343 .filter(|(_, cv)| !cv.found)
344 .map(|(u, _)| u)
345 .collect();
346 if !missing_in.is_empty() {
347 missing += 1;
348 let _ = writeln!(
349 out,
350 " [{}] {} missing on: {}",
351 c.neighborhood,
352 short(&c.address),
353 missing_in
354 .iter()
355 .map(|s| s.as_str())
356 .collect::<Vec<_>>()
357 .join(", ")
358 );
359 }
360 }
361 if missing == 0 {
362 let _ = writeln!(out, " all chunks present on all vantages");
363 } else {
364 let _ = writeln!(out, " {missing} chunk(s) missing on at least one vantage");
365 }
366 }
367 out
368}
369
370fn short(hex: &str) -> String {
371 if hex.len() > 16 {
372 format!("{}…{}", &hex[..8], &hex[hex.len() - 4..])
373 } else {
374 hex.to_string()
375 }
376}