Skip to main content

bee_tui/
durability.rs

1//! `:durability-check <ref>` — the operator-facing answer to the
2//! single most-feared question: "is my data still alive?"
3//!
4//! The check walks the chunk graph rooted at `<ref>`:
5//!
6//! * Fetches the root chunk via `/chunks/{ref}`.
7//! * If the root parses as a Mantaray manifest, recursively fetches
8//!   each fork's `self_address`. Forks with `target_address` that
9//!   isn't NULL are counted as leaves but their target's BMT tree is
10//!   NOT walked (that's a v1.4 follow-up — bee-rs would need to
11//!   stream chunks through the file chunker for a complete answer).
12//! * If the root doesn't parse as a manifest, the single-chunk fetch
13//!   IS the durability answer.
14//!
//! Result is a [`DurabilityResult`] with `(chunks_total, chunks_lost,
//! chunks_errors, chunks_corrupt)`. The S13 Watchlist screen records
//! each invocation and surfaces the running history; `cmd_status_tx`
//! carries the one-line summary back to the command bar.
19//!
20//! Mirrors beekeeper's `pkg/check/datadurability` but for one
21//! operator's local node + one reference, without the cluster
22//! orchestration.
23
24use std::collections::HashSet;
25use std::sync::Arc;
26use std::time::{Duration, Instant, SystemTime};
27
28use bee::manifest::{MantarayNode, unmarshal};
29use bee::swarm::Reference;
30use bee::swarm::bmt::calculate_chunk_address;
31
32use crate::api::ApiClient;
33
/// Upper bound on the number of chunks a single durability walk will
/// count before it stops and flags the result as truncated. Very large
/// manifests therefore yield a partial answer instead of an unbounded
/// walk. Deliberately conservative; a config knob may raise it later.
const MAX_CHUNKS_PER_WALK: u64 = 10_000;
39
40/// Outcome bucket for the running summary. We separate
41/// `chunks_lost` (a 404 on `/chunks/{ref}`) from `chunks_errors`
42/// (any other failure — timeout, 500, decode error) and from
43/// `chunks_corrupt` (BMT hash of the returned content doesn't
44/// match the requested reference). They have different operator
45/// implications: lost = the network truly dropped your data;
46/// errors = something flaky that needs a retry; corrupt = a peer
47/// or local store returned different bytes than the address asked
48/// for (bit-rot, swap-corrupted on-disk chunk, hostile peer).
49#[derive(Debug, Clone)]
50pub struct DurabilityResult {
51    pub reference: Reference,
52    pub started_at: SystemTime,
53    pub duration_ms: u64,
54    pub chunks_total: u64,
55    pub chunks_lost: u64,
56    pub chunks_errors: u64,
57    /// Count of chunks the network returned but whose content
58    /// didn't BMT-hash to the requested reference. Populated only
59    /// when `bmt_verify` was on — `0` otherwise (and the operator
60    /// can't tell from the count alone whether 0 means "verified
61    /// clean" or "verification skipped"; check `bmt_verified`).
62    pub chunks_corrupt: u64,
63    /// True iff the root chunk parsed as a Mantaray manifest. When
64    /// false the rest of the counts come from a single raw-chunk
65    /// fetch.
66    pub root_is_manifest: bool,
67    /// True when we hit `MAX_CHUNKS_PER_WALK` and stopped early.
68    pub truncated: bool,
69    /// True when each fetched chunk had its content BMT-hashed and
70    /// compared against the requested reference. Default `true` for
71    /// new walks; old `DurabilityResult` records persisted to disk
72    /// before v1.5 deserialise as `false` (no `chunks_corrupt`
73    /// information available).
74    pub bmt_verified: bool,
75    /// Independent network-side answer from a swarmscan-style
76    /// indexer probe. `Some(true)` = indexer sees the ref;
77    /// `Some(false)` = indexer returned 404; `None` = probe was
78    /// skipped (config off) or errored (timeout, non-200/404).
79    pub swarmscan_seen: Option<bool>,
80}
81
82impl DurabilityResult {
83    /// All checked chunks fetched cleanly + BMT-verified.
84    pub fn is_healthy(&self) -> bool {
85        self.chunks_lost == 0 && self.chunks_errors == 0 && self.chunks_corrupt == 0
86    }
87    /// Summary line shown on the command-status row + S13 detail.
88    pub fn summary(&self) -> String {
89        let kind = if self.root_is_manifest {
90            "manifest"
91        } else {
92            "raw chunk"
93        };
94        let trunc = if self.truncated { " (truncated)" } else { "" };
95        let verify = if self.bmt_verified { " · BMT" } else { "" };
96        let swarmscan = match self.swarmscan_seen {
97            Some(true) => " · swarmscan: seen",
98            Some(false) => " · swarmscan: NOT seen",
99            None => "",
100        };
101        if self.is_healthy() {
102            format!(
103                "durability-check OK in {}ms · {kind} · {} chunk{} retrievable{verify}{swarmscan}{trunc}",
104                self.duration_ms,
105                self.chunks_total,
106                if self.chunks_total == 1 { "" } else { "s" },
107            )
108        } else {
109            format!(
110                "durability-check UNHEALTHY in {}ms · {kind} · total {} · lost {} · errors {} · corrupt {}{swarmscan}{trunc}",
111                self.duration_ms,
112                self.chunks_total,
113                self.chunks_lost,
114                self.chunks_errors,
115                self.chunks_corrupt,
116            )
117        }
118    }
119}
120
121/// Walk the chunk graph rooted at `reference` and report the result.
122/// Times out per-chunk via reqwest's default; the surrounding `tokio`
123/// task can be cancelled by dropping its handle (the Watchlist
124/// screen owns the in-flight handle). BMT verification on by
125/// default; swarmscan probe off — see [`check_with_options`].
126pub async fn check(api: Arc<ApiClient>, reference: Reference) -> DurabilityResult {
127    check_with_options(api, reference, CheckOptions::default()).await
128}
129
/// Tunables for a single durability walk.
#[derive(Debug, Clone)]
pub struct CheckOptions {
    /// Whether each fetched chunk's content is BMT-hashed and compared
    /// against the reference it was requested by. A mismatch is
    /// counted in `chunks_corrupt`, separately from `chunks_lost` and
    /// `chunks_errors`. On by default: one keccak per chunk is cheap
    /// next to the correctness it buys.
    pub bmt_verify: bool,
    /// Optional indexer URL template. When `Some`, the check queries
    /// that URL — with `{ref}` replaced by the hex-encoded reference —
    /// once the local walk is done, and stores the outcome in
    /// `DurabilityResult.swarmscan_seen`. `None` disables the probe.
    /// Kept as a template so a changed swarmscan API URL, or a
    /// different indexer, needs configuration rather than code.
    pub swarmscan_url: Option<String>,
}

impl Default for CheckOptions {
    /// BMT verification enabled, no swarmscan probe.
    fn default() -> Self {
        let swarmscan_url = None;
        CheckOptions {
            bmt_verify: true,
            swarmscan_url,
        }
    }
}
156
157/// `check` with explicit options. Exposed so a future
158/// `[durability].bmt_verify = false` config knob (or a CLI flag)
159/// can opt out for very large walks where the keccak cost adds up.
160pub async fn check_with_options(
161    api: Arc<ApiClient>,
162    reference: Reference,
163    opts: CheckOptions,
164) -> DurabilityResult {
165    let started = Instant::now();
166    let started_at = SystemTime::now();
167    let mut result = DurabilityResult {
168        reference: reference.clone(),
169        started_at,
170        duration_ms: 0,
171        chunks_total: 0,
172        chunks_lost: 0,
173        chunks_errors: 0,
174        chunks_corrupt: 0,
175        root_is_manifest: false,
176        truncated: false,
177        bmt_verified: opts.bmt_verify,
178        swarmscan_seen: None,
179    };
180
181    // Root fetch.
182    let root_bytes = match api.bee().file().download_chunk(&reference, None).await {
183        Ok(b) => b,
184        Err(e) => {
185            // Distinguish 404 (chunk genuinely not found) from other
186            // failures by looking at the error string. bee-rs doesn't
187            // expose a structured-error path here; we lean on the
188            // text format the api client emits.
189            let s = e.to_string();
190            if s.contains("404") {
191                result.chunks_lost = 1;
192            } else {
193                result.chunks_errors = 1;
194            }
195            result.chunks_total = 1;
196            result.duration_ms = elapsed_ms(started);
197            return result;
198        }
199    };
200    result.chunks_total = 1;
201    if opts.bmt_verify && !bmt_matches(&root_bytes, reference.as_bytes()) {
202        // Root content doesn't hash to the requested reference —
203        // count as corrupt, but still try to parse as a manifest
204        // (operator gets a more useful "what was retrieved looked
205        // like a manifest, but the bytes were wrong" signal).
206        result.chunks_corrupt += 1;
207    }
208
209    // Try to parse as manifest. If not, we're done — single chunk
210    // fetch was the answer.
211    let root_node = match unmarshal(&root_bytes, reference.as_bytes()) {
212        Ok(n) => n,
213        Err(_) => {
214            result.duration_ms = elapsed_ms(started);
215            return result;
216        }
217    };
218    result.root_is_manifest = true;
219
220    // BFS over fork tree. Track visited self-addresses to short-circuit
221    // cycles (shouldn't happen in a real manifest but cheap insurance).
222    let mut visited: HashSet<[u8; 32]> = HashSet::new();
223    let mut queue: Vec<MantarayNode> = vec![root_node];
224
225    while let Some(node) = queue.pop() {
226        for fork in node.forks.values() {
227            let Some(addr) = fork.node.self_address else {
228                continue;
229            };
230            if !visited.insert(addr) {
231                continue;
232            }
233            if result.chunks_total >= MAX_CHUNKS_PER_WALK {
234                result.truncated = true;
235                result.duration_ms = elapsed_ms(started);
236                return result;
237            }
238            result.chunks_total += 1;
239            let child_ref = match Reference::new(&addr) {
240                Ok(r) => r,
241                Err(_) => {
242                    result.chunks_errors += 1;
243                    continue;
244                }
245            };
246            match api.bee().file().download_chunk(&child_ref, None).await {
247                Ok(child_bytes) => {
248                    if opts.bmt_verify && !bmt_matches(&child_bytes, child_ref.as_bytes()) {
249                        // Don't descend into corrupt chunks — their
250                        // unmarshal output is untrustworthy.
251                        result.chunks_corrupt += 1;
252                        continue;
253                    }
254                    // Try to keep walking — if this fork is itself a
255                    // sub-manifest its forks reach further leaves.
256                    if let Ok(child_node) = unmarshal(&child_bytes, child_ref.as_bytes()) {
257                        queue.push(child_node);
258                    }
259                }
260                Err(e) => {
261                    if e.to_string().contains("404") {
262                        result.chunks_lost += 1;
263                    } else {
264                        result.chunks_errors += 1;
265                    }
266                }
267            }
268        }
269    }
270    // Optional swarmscan cross-check. The probe URL is templated
271    // so the operator can point at any indexer with a similar
272    // shape (200 = seen, 404 = not seen).
273    if let Some(template) = opts.swarmscan_url.as_deref() {
274        result.swarmscan_seen = swarmscan_probe(template, &reference).await;
275    }
276    result.duration_ms = elapsed_ms(started);
277    result
278}
279
280/// HTTP GET against the swarmscan-style probe URL.
281/// `Some(true)` on 200, `Some(false)` on 404, `None` on anything
282/// else (timeout, 5xx, DNS failure, …) — the operator's S13 row
283/// then renders "?" rather than a misleading boolean.
284async fn swarmscan_probe(url_template: &str, reference: &Reference) -> Option<bool> {
285    let url = url_template.replace("{ref}", &reference.to_hex());
286    let client = reqwest::Client::builder()
287        .timeout(std::time::Duration::from_secs(5))
288        .user_agent(concat!("bee-tui/", env!("CARGO_PKG_VERSION")))
289        .build()
290        .ok()?;
291    match client.get(&url).send().await {
292        Ok(resp) => match resp.status().as_u16() {
293            200 => Some(true),
294            404 => Some(false),
295            _ => None,
296        },
297        Err(_) => None,
298    }
299}
300
301/// True when `bytes` BMT-hashes to `expected`. Returns `false` on
302/// any error (e.g. payload exceeds `CHUNK_SIZE`) — caller treats
303/// that as "didn't verify cleanly", which lands in `chunks_corrupt`.
304fn bmt_matches(bytes: &[u8], expected: &[u8]) -> bool {
305    match calculate_chunk_address(bytes) {
306        Ok(a) => a.as_slice() == expected,
307        Err(_) => false,
308    }
309}
310
/// Whole milliseconds elapsed since `started`, saturating at
/// `u64::MAX` instead of wrapping if the `u128` count does not fit.
fn elapsed_ms(started: Instant) -> u64 {
    let millis = Duration::as_millis(&started.elapsed());
    u64::try_from(millis).unwrap_or(u64::MAX)
}
315
#[cfg(test)]
mod tests {
    use super::*;

    fn fake_ref() -> Reference {
        Reference::from_hex(&"a".repeat(64)).unwrap()
    }

    /// A healthy, BMT-verified, non-truncated manifest result with no
    /// swarmscan answer. Each test overrides only the fields it is
    /// about via struct-update syntax.
    fn baseline(chunks_total: u64, duration_ms: u64) -> DurabilityResult {
        DurabilityResult {
            reference: fake_ref(),
            started_at: SystemTime::now(),
            duration_ms,
            chunks_total,
            chunks_lost: 0,
            chunks_errors: 0,
            chunks_corrupt: 0,
            root_is_manifest: true,
            truncated: false,
            bmt_verified: true,
            swarmscan_seen: None,
        }
    }

    #[test]
    fn summary_renders_healthy_message() {
        let s = baseline(4, 123).summary();
        assert!(s.contains("OK"), "{s}");
        assert!(s.contains("4 chunks retrievable"), "{s}");
        assert!(s.contains("manifest"), "{s}");
    }

    #[test]
    fn summary_renders_unhealthy_breakdown() {
        let r = DurabilityResult {
            chunks_lost: 2,
            chunks_errors: 1,
            ..baseline(8, 990)
        };
        let s = r.summary();
        assert!(s.contains("UNHEALTHY"), "{s}");
        assert!(s.contains("lost 2"), "{s}");
        assert!(s.contains("errors 1"), "{s}");
    }

    #[test]
    fn summary_includes_corrupt_when_bmt_finds_mismatch() {
        let r = DurabilityResult {
            chunks_corrupt: 2,
            ..baseline(5, 100)
        };
        let s = r.summary();
        assert!(!r.is_healthy());
        assert!(s.contains("UNHEALTHY"), "{s}");
        assert!(s.contains("corrupt 2"), "{s}");
    }

    #[test]
    fn summary_includes_bmt_marker_when_verified() {
        let r = baseline(3, 100);
        assert!(r.summary().contains("BMT"), "{}", r.summary());
    }

    #[test]
    fn summary_omits_bmt_marker_when_skipped() {
        let r = DurabilityResult {
            bmt_verified: false,
            ..baseline(3, 100)
        };
        assert!(!r.summary().contains("BMT"), "{}", r.summary());
    }

    #[test]
    fn truncated_flag_surfaces_in_summary() {
        let r = DurabilityResult {
            truncated: true,
            ..baseline(10_000, 1)
        };
        assert!(r.summary().contains("truncated"), "{}", r.summary());
    }

    #[test]
    fn is_healthy_requires_zero_lost_errors_and_corrupt() {
        let mut r = baseline(5, 1);
        assert!(r.is_healthy());

        // Each failure counter, taken alone, must flip the verdict.
        r.chunks_lost = 1;
        assert!(!r.is_healthy());
        r.chunks_lost = 0;

        r.chunks_errors = 1;
        assert!(!r.is_healthy());
        r.chunks_errors = 0;

        r.chunks_corrupt = 1;
        assert!(!r.is_healthy());
    }

    #[test]
    fn summary_includes_swarmscan_seen() {
        let mut r = DurabilityResult {
            swarmscan_seen: Some(true),
            ..baseline(3, 100)
        };
        assert!(r.summary().contains("swarmscan: seen"), "{}", r.summary());

        r.swarmscan_seen = Some(false);
        // A "NOT seen" answer doesn't change is_healthy() — it's a
        // separate independent signal — but the summary surfaces it.
        assert!(
            r.summary().contains("swarmscan: NOT seen"),
            "{}",
            r.summary(),
        );

        r.swarmscan_seen = None;
        assert!(!r.summary().contains("swarmscan"), "{}", r.summary());
    }

    #[test]
    fn bmt_matches_verifies_real_chunk() {
        // Round-trip guard: hash a span+payload buffer with bee-rs,
        // then require bmt_matches() to accept that same buffer for
        // the computed address. The durability walk's correctness
        // depends on this contract, so a silent change in
        // calculate_chunk_address should fail here.
        use bee::swarm::bmt::calculate_chunk_address;
        let payload = b"some chunk content".to_vec();
        let span_len = (payload.len() as u64).to_le_bytes();
        let mut bytes = Vec::with_capacity(8 + payload.len());
        bytes.extend_from_slice(&span_len);
        bytes.extend_from_slice(&payload);
        let addr = calculate_chunk_address(&bytes).expect("hash ok");
        assert!(bmt_matches(&bytes, addr.as_slice()));

        // Flip one byte → no longer matches.
        let mut tampered = bytes.clone();
        tampered[10] ^= 0xff;
        assert!(!bmt_matches(&tampered, addr.as_slice()));
    }
}
519}