1use std::collections::HashSet;
25use std::sync::Arc;
26use std::time::{Duration, Instant, SystemTime};
27
28use bee::manifest::{MantarayNode, unmarshal};
29use bee::swarm::Reference;
30use bee::swarm::bmt::calculate_chunk_address;
31
32use crate::api::ApiClient;
33
/// Upper bound on `DurabilityResult::chunks_total` for a single walk.
///
/// Once the walk in `check_with_options` has counted this many chunks and
/// discovers another unvisited address, it stops and marks the result as
/// `truncated` instead of fetching further chunks.
const MAX_CHUNKS_PER_WALK: u64 = 10_000;
39
/// Outcome of one durability check, as produced by `check_with_options`.
#[derive(Debug, Clone)]
pub struct DurabilityResult {
    /// The reference the check was started with.
    pub reference: Reference,
    /// Wall-clock time captured when the check began.
    pub started_at: SystemTime,
    /// How long the check took, in milliseconds (measured with a monotonic `Instant`).
    pub duration_ms: u64,
    /// Number of chunks the check attempted, including the root chunk.
    pub chunks_total: u64,
    /// Chunks whose download failed with an error message containing `"404"`.
    pub chunks_lost: u64,
    /// Chunks whose download failed for any other reason, plus fork addresses
    /// that could not be turned into a `Reference`.
    pub chunks_errors: u64,
    /// Chunks whose recomputed BMT address did not match the requested address.
    /// Only ever incremented when BMT verification is enabled.
    pub chunks_corrupt: u64,
    /// `true` when the root chunk could be unmarshalled as a Mantaray manifest node.
    pub root_is_manifest: bool,
    /// `true` when the walk stopped early because `MAX_CHUNKS_PER_WALK` was reached.
    pub truncated: bool,
    /// Whether BMT verification was requested for this check (copied from the options).
    pub bmt_verified: bool,
}
76
77impl DurabilityResult {
78 pub fn is_healthy(&self) -> bool {
80 self.chunks_lost == 0 && self.chunks_errors == 0 && self.chunks_corrupt == 0
81 }
82 pub fn summary(&self) -> String {
84 let kind = if self.root_is_manifest {
85 "manifest"
86 } else {
87 "raw chunk"
88 };
89 let trunc = if self.truncated { " (truncated)" } else { "" };
90 let verify = if self.bmt_verified { " · BMT" } else { "" };
91 if self.is_healthy() {
92 format!(
93 "durability-check OK in {}ms · {kind} · {} chunk{} retrievable{verify}{trunc}",
94 self.duration_ms,
95 self.chunks_total,
96 if self.chunks_total == 1 { "" } else { "s" },
97 )
98 } else {
99 format!(
100 "durability-check UNHEALTHY in {}ms · {kind} · total {} · lost {} · errors {} · corrupt {}{trunc}",
101 self.duration_ms,
102 self.chunks_total,
103 self.chunks_lost,
104 self.chunks_errors,
105 self.chunks_corrupt,
106 )
107 }
108 }
109}
110
111pub async fn check(api: Arc<ApiClient>, reference: Reference) -> DurabilityResult {
117 check_with_options(api, reference, CheckOptions { bmt_verify: true }).await
118}
119
/// Tunables for `check_with_options`.
#[derive(Debug, Clone, Copy)]
pub struct CheckOptions {
    /// When `true`, every downloaded chunk has its BMT address recomputed and
    /// compared with the address it was requested by; mismatches are counted
    /// as corrupt.
    pub bmt_verify: bool,
}
132
133impl Default for CheckOptions {
134 fn default() -> Self {
135 Self { bmt_verify: true }
136 }
137}
138
139pub async fn check_with_options(
143 api: Arc<ApiClient>,
144 reference: Reference,
145 opts: CheckOptions,
146) -> DurabilityResult {
147 let started = Instant::now();
148 let started_at = SystemTime::now();
149 let mut result = DurabilityResult {
150 reference: reference.clone(),
151 started_at,
152 duration_ms: 0,
153 chunks_total: 0,
154 chunks_lost: 0,
155 chunks_errors: 0,
156 chunks_corrupt: 0,
157 root_is_manifest: false,
158 truncated: false,
159 bmt_verified: opts.bmt_verify,
160 };
161
162 let root_bytes = match api.bee().file().download_chunk(&reference, None).await {
164 Ok(b) => b,
165 Err(e) => {
166 let s = e.to_string();
171 if s.contains("404") {
172 result.chunks_lost = 1;
173 } else {
174 result.chunks_errors = 1;
175 }
176 result.chunks_total = 1;
177 result.duration_ms = elapsed_ms(started);
178 return result;
179 }
180 };
181 result.chunks_total = 1;
182 if opts.bmt_verify && !bmt_matches(&root_bytes, reference.as_bytes()) {
183 result.chunks_corrupt += 1;
188 }
189
190 let root_node = match unmarshal(&root_bytes, reference.as_bytes()) {
193 Ok(n) => n,
194 Err(_) => {
195 result.duration_ms = elapsed_ms(started);
196 return result;
197 }
198 };
199 result.root_is_manifest = true;
200
201 let mut visited: HashSet<[u8; 32]> = HashSet::new();
204 let mut queue: Vec<MantarayNode> = vec![root_node];
205
206 while let Some(node) = queue.pop() {
207 for fork in node.forks.values() {
208 let Some(addr) = fork.node.self_address else {
209 continue;
210 };
211 if !visited.insert(addr) {
212 continue;
213 }
214 if result.chunks_total >= MAX_CHUNKS_PER_WALK {
215 result.truncated = true;
216 result.duration_ms = elapsed_ms(started);
217 return result;
218 }
219 result.chunks_total += 1;
220 let child_ref = match Reference::new(&addr) {
221 Ok(r) => r,
222 Err(_) => {
223 result.chunks_errors += 1;
224 continue;
225 }
226 };
227 match api.bee().file().download_chunk(&child_ref, None).await {
228 Ok(child_bytes) => {
229 if opts.bmt_verify && !bmt_matches(&child_bytes, child_ref.as_bytes()) {
230 result.chunks_corrupt += 1;
233 continue;
234 }
235 if let Ok(child_node) = unmarshal(&child_bytes, child_ref.as_bytes()) {
238 queue.push(child_node);
239 }
240 }
241 Err(e) => {
242 if e.to_string().contains("404") {
243 result.chunks_lost += 1;
244 } else {
245 result.chunks_errors += 1;
246 }
247 }
248 }
249 }
250 }
251 result.duration_ms = elapsed_ms(started);
252 result
253}
254
255fn bmt_matches(bytes: &[u8], expected: &[u8]) -> bool {
259 match calculate_chunk_address(bytes) {
260 Ok(a) => a.as_slice() == expected,
261 Err(_) => false,
262 }
263}
264
/// Milliseconds elapsed since `started`, saturating at `u64::MAX`.
fn elapsed_ms(started: Instant) -> u64 {
    let millis: u128 = Duration::as_millis(&started.elapsed());
    if millis > u128::from(u64::MAX) {
        u64::MAX
    } else {
        // Range-checked above, so this narrowing cast cannot truncate.
        millis as u64
    }
}
269
#[cfg(test)]
mod tests {
    use super::*;

    /// A fixed, well-formed reference (64 hex `a` characters) for fixtures.
    fn fake_ref() -> Reference {
        let hex = "a".repeat(64);
        Reference::from_hex(&hex).unwrap()
    }

    /// Baseline fixture: a healthy, BMT-verified, untruncated manifest walk.
    /// Individual tests override only the fields they care about.
    fn baseline() -> DurabilityResult {
        DurabilityResult {
            reference: fake_ref(),
            started_at: SystemTime::now(),
            duration_ms: 100,
            chunks_total: 3,
            chunks_lost: 0,
            chunks_errors: 0,
            chunks_corrupt: 0,
            root_is_manifest: true,
            truncated: false,
            bmt_verified: true,
        }
    }

    #[test]
    fn summary_renders_healthy_message() {
        let r = DurabilityResult {
            duration_ms: 123,
            chunks_total: 4,
            ..baseline()
        };
        let s = r.summary();
        assert!(s.contains("OK"), "{s}");
        assert!(s.contains("4 chunks retrievable"), "{s}");
        assert!(s.contains("manifest"), "{s}");
    }

    #[test]
    fn summary_renders_unhealthy_breakdown() {
        let r = DurabilityResult {
            duration_ms: 990,
            chunks_total: 8,
            chunks_lost: 2,
            chunks_errors: 1,
            ..baseline()
        };
        let s = r.summary();
        assert!(s.contains("UNHEALTHY"), "{s}");
        assert!(s.contains("lost 2"), "{s}");
        assert!(s.contains("errors 1"), "{s}");
    }

    #[test]
    fn summary_includes_corrupt_when_bmt_finds_mismatch() {
        let r = DurabilityResult {
            chunks_total: 5,
            chunks_corrupt: 2,
            ..baseline()
        };
        let s = r.summary();
        assert!(!r.is_healthy());
        assert!(s.contains("UNHEALTHY"), "{s}");
        assert!(s.contains("corrupt 2"), "{s}");
    }

    #[test]
    fn summary_includes_bmt_marker_when_verified() {
        let s = baseline().summary();
        assert!(s.contains("BMT"), "{s}");
    }

    #[test]
    fn summary_omits_bmt_marker_when_skipped() {
        let r = DurabilityResult {
            bmt_verified: false,
            ..baseline()
        };
        let s = r.summary();
        assert!(!s.contains("BMT"), "{s}");
    }

    #[test]
    fn truncated_flag_surfaces_in_summary() {
        let r = DurabilityResult {
            duration_ms: 1,
            chunks_total: 10_000,
            truncated: true,
            ..baseline()
        };
        let s = r.summary();
        assert!(s.contains("truncated"), "{s}");
    }

    #[test]
    fn is_healthy_requires_zero_lost_errors_and_corrupt() {
        let mut r = DurabilityResult {
            duration_ms: 1,
            chunks_total: 5,
            ..baseline()
        };
        assert!(r.is_healthy());

        // Each failure counter on its own must flip the verdict.
        r.chunks_lost = 1;
        assert!(!r.is_healthy());
        r.chunks_lost = 0;

        r.chunks_errors = 1;
        assert!(!r.is_healthy());
        r.chunks_errors = 0;

        r.chunks_corrupt = 1;
        assert!(!r.is_healthy());
    }

    #[test]
    fn bmt_matches_verifies_real_chunk() {
        use bee::swarm::bmt::calculate_chunk_address;

        // Chunk layout used here: 8-byte little-endian span, then the payload.
        let payload = b"some chunk content";
        let mut bytes = Vec::with_capacity(8 + payload.len());
        bytes.extend_from_slice(&(payload.len() as u64).to_le_bytes());
        bytes.extend_from_slice(payload);

        let addr = calculate_chunk_address(&bytes).expect("hash ok");
        assert!(bmt_matches(&bytes, addr.as_slice()));

        // Flipping a payload byte must change the computed address.
        let mut tampered = bytes.clone();
        tampered[10] ^= 0xff;
        assert!(!bmt_matches(&tampered, addr.as_slice()));
    }
}