Skip to main content

bee_tui/
durability.rs

1//! `:durability-check <ref>` — the operator-facing answer to the
2//! single most-feared question: "is my data still alive?"
3//!
4//! The check walks the chunk graph rooted at `<ref>`:
5//!
6//! * Fetches the root chunk via `/chunks/{ref}`.
7//! * If the root parses as a Mantaray manifest, recursively fetches
8//!   each fork's `self_address`. Forks with `target_address` that
9//!   isn't NULL are counted as leaves but their target's BMT tree is
10//!   NOT walked (that's a v1.4 follow-up — bee-rs would need to
11//!   stream chunks through the file chunker for a complete answer).
12//! * If the root doesn't parse as a manifest, the single-chunk fetch
13//!   IS the durability answer.
14//!
//! Result is a [`DurabilityResult`] with `(chunks_total, chunks_lost,
//! chunks_errors, chunks_corrupt)`. The S13 Watchlist screen records
//! each invocation and surfaces the running history; `cmd_status_tx`
//! carries the one-line summary back to the command bar.
19//!
20//! Mirrors beekeeper's `pkg/check/datadurability` but for one
21//! operator's local node + one reference, without the cluster
22//! orchestration.
23
24use std::collections::HashSet;
25use std::sync::Arc;
26use std::time::{Duration, Instant, SystemTime};
27
28use bee::manifest::{MantarayNode, unmarshal};
29use bee::swarm::Reference;
30use bee::swarm::bmt::calculate_chunk_address;
31
32use crate::api::ApiClient;
33
/// Ceiling on how many chunks one durability-check will walk before
/// giving up. Operators with very large manifests (10⁵+ chunks) get
/// a partial answer rather than a stuck cockpit. Conservative
/// default; can be lifted via a future config knob.
///
/// The walk compares `chunks_total` against this value before each
/// child fetch; on reaching it the walk stops and the result is
/// flagged `truncated`.
const MAX_CHUNKS_PER_WALK: u64 = 10_000;
39
/// Outcome bucket for the running summary. We separate
/// `chunks_lost` (a 404 on `/chunks/{ref}`) from `chunks_errors`
/// (any other failure — timeout, 500, decode error) and from
/// `chunks_corrupt` (BMT hash of the returned content doesn't
/// match the requested reference). They have different operator
/// implications: lost = the network truly dropped your data;
/// errors = something flaky that needs a retry; corrupt = a peer
/// or local store returned different bytes than the address asked
/// for (bit-rot, swap-corrupted on-disk chunk, hostile peer).
#[derive(Debug, Clone)]
pub struct DurabilityResult {
    /// Root reference the walk was started from.
    pub reference: Reference,
    /// Wall-clock time captured when the check began.
    pub started_at: SystemTime,
    /// Elapsed time of the whole check in milliseconds, measured
    /// with a monotonic clock.
    pub duration_ms: u64,
    /// Number of chunks the walk attempted, root included. A chunk
    /// is counted before its fetch, so lost / errored / corrupt
    /// chunks are part of this total.
    pub chunks_total: u64,
    /// Count of fetches whose error text contained `404`.
    pub chunks_lost: u64,
    /// Count of fetches that failed for any other reason, plus fork
    /// addresses that could not be turned into a `Reference`.
    pub chunks_errors: u64,
    /// Count of chunks the network returned but whose content
    /// didn't BMT-hash to the requested reference. Populated only
    /// when `bmt_verify` was on — `0` otherwise (and the operator
    /// can't tell from the count alone whether 0 means "verified
    /// clean" or "verification skipped"; check `bmt_verified`).
    pub chunks_corrupt: u64,
    /// True iff the root chunk parsed as a Mantaray manifest. When
    /// false the rest of the counts come from a single raw-chunk
    /// fetch.
    pub root_is_manifest: bool,
    /// True when we hit `MAX_CHUNKS_PER_WALK` and stopped early.
    pub truncated: bool,
    /// True when each fetched chunk had its content BMT-hashed and
    /// compared against the requested reference. Default `true` for
    /// new walks; old `DurabilityResult` records persisted to disk
    /// before v1.5 deserialise as `false` (no `chunks_corrupt`
    /// information available).
    ///
    /// NOTE(review): no (de)serialisation derive is visible on this
    /// struct here — confirm where the persisted form is defined.
    pub bmt_verified: bool,
}
76
77impl DurabilityResult {
78    /// All checked chunks fetched cleanly + BMT-verified.
79    pub fn is_healthy(&self) -> bool {
80        self.chunks_lost == 0 && self.chunks_errors == 0 && self.chunks_corrupt == 0
81    }
82    /// Summary line shown on the command-status row + S13 detail.
83    pub fn summary(&self) -> String {
84        let kind = if self.root_is_manifest {
85            "manifest"
86        } else {
87            "raw chunk"
88        };
89        let trunc = if self.truncated { " (truncated)" } else { "" };
90        let verify = if self.bmt_verified { " · BMT" } else { "" };
91        if self.is_healthy() {
92            format!(
93                "durability-check OK in {}ms · {kind} · {} chunk{} retrievable{verify}{trunc}",
94                self.duration_ms,
95                self.chunks_total,
96                if self.chunks_total == 1 { "" } else { "s" },
97            )
98        } else {
99            format!(
100                "durability-check UNHEALTHY in {}ms · {kind} · total {} · lost {} · errors {} · corrupt {}{trunc}",
101                self.duration_ms,
102                self.chunks_total,
103                self.chunks_lost,
104                self.chunks_errors,
105                self.chunks_corrupt,
106            )
107        }
108    }
109}
110
111/// Walk the chunk graph rooted at `reference` and report the result.
112/// Times out per-chunk via reqwest's default; the surrounding `tokio`
113/// task can be cancelled by dropping its handle (the Watchlist
114/// screen owns the in-flight handle). BMT verification on by
115/// default — see [`check_with_options`].
116pub async fn check(api: Arc<ApiClient>, reference: Reference) -> DurabilityResult {
117    check_with_options(api, reference, CheckOptions { bmt_verify: true }).await
118}
119
/// Tunables for a durability walk. Only `bmt_verify` exists today;
/// later iterations may grow `concurrency`, `bytes_per_chunk_limit`,
/// and similar knobs.
#[derive(Debug, Clone, Copy)]
pub struct CheckOptions {
    /// When `true`, the content of every fetched chunk is BMT-hashed
    /// and compared with the reference that was requested. A mismatch
    /// is counted in `chunks_corrupt`, kept apart from `chunks_lost`
    /// and `chunks_errors`. On by default for new callers: it costs
    /// one keccak per chunk and buys a lot of correctness.
    pub bmt_verify: bool,
}

impl Default for CheckOptions {
    /// Verification is enabled unless a caller opts out.
    fn default() -> Self {
        let bmt_verify = true;
        CheckOptions { bmt_verify }
    }
}
138
139/// `check` with explicit options. Exposed so a future
140/// `[durability].bmt_verify = false` config knob (or a CLI flag)
141/// can opt out for very large walks where the keccak cost adds up.
142pub async fn check_with_options(
143    api: Arc<ApiClient>,
144    reference: Reference,
145    opts: CheckOptions,
146) -> DurabilityResult {
147    let started = Instant::now();
148    let started_at = SystemTime::now();
149    let mut result = DurabilityResult {
150        reference: reference.clone(),
151        started_at,
152        duration_ms: 0,
153        chunks_total: 0,
154        chunks_lost: 0,
155        chunks_errors: 0,
156        chunks_corrupt: 0,
157        root_is_manifest: false,
158        truncated: false,
159        bmt_verified: opts.bmt_verify,
160    };
161
162    // Root fetch.
163    let root_bytes = match api.bee().file().download_chunk(&reference, None).await {
164        Ok(b) => b,
165        Err(e) => {
166            // Distinguish 404 (chunk genuinely not found) from other
167            // failures by looking at the error string. bee-rs doesn't
168            // expose a structured-error path here; we lean on the
169            // text format the api client emits.
170            let s = e.to_string();
171            if s.contains("404") {
172                result.chunks_lost = 1;
173            } else {
174                result.chunks_errors = 1;
175            }
176            result.chunks_total = 1;
177            result.duration_ms = elapsed_ms(started);
178            return result;
179        }
180    };
181    result.chunks_total = 1;
182    if opts.bmt_verify && !bmt_matches(&root_bytes, reference.as_bytes()) {
183        // Root content doesn't hash to the requested reference —
184        // count as corrupt, but still try to parse as a manifest
185        // (operator gets a more useful "what was retrieved looked
186        // like a manifest, but the bytes were wrong" signal).
187        result.chunks_corrupt += 1;
188    }
189
190    // Try to parse as manifest. If not, we're done — single chunk
191    // fetch was the answer.
192    let root_node = match unmarshal(&root_bytes, reference.as_bytes()) {
193        Ok(n) => n,
194        Err(_) => {
195            result.duration_ms = elapsed_ms(started);
196            return result;
197        }
198    };
199    result.root_is_manifest = true;
200
201    // BFS over fork tree. Track visited self-addresses to short-circuit
202    // cycles (shouldn't happen in a real manifest but cheap insurance).
203    let mut visited: HashSet<[u8; 32]> = HashSet::new();
204    let mut queue: Vec<MantarayNode> = vec![root_node];
205
206    while let Some(node) = queue.pop() {
207        for fork in node.forks.values() {
208            let Some(addr) = fork.node.self_address else {
209                continue;
210            };
211            if !visited.insert(addr) {
212                continue;
213            }
214            if result.chunks_total >= MAX_CHUNKS_PER_WALK {
215                result.truncated = true;
216                result.duration_ms = elapsed_ms(started);
217                return result;
218            }
219            result.chunks_total += 1;
220            let child_ref = match Reference::new(&addr) {
221                Ok(r) => r,
222                Err(_) => {
223                    result.chunks_errors += 1;
224                    continue;
225                }
226            };
227            match api.bee().file().download_chunk(&child_ref, None).await {
228                Ok(child_bytes) => {
229                    if opts.bmt_verify && !bmt_matches(&child_bytes, child_ref.as_bytes()) {
230                        // Don't descend into corrupt chunks — their
231                        // unmarshal output is untrustworthy.
232                        result.chunks_corrupt += 1;
233                        continue;
234                    }
235                    // Try to keep walking — if this fork is itself a
236                    // sub-manifest its forks reach further leaves.
237                    if let Ok(child_node) = unmarshal(&child_bytes, child_ref.as_bytes()) {
238                        queue.push(child_node);
239                    }
240                }
241                Err(e) => {
242                    if e.to_string().contains("404") {
243                        result.chunks_lost += 1;
244                    } else {
245                        result.chunks_errors += 1;
246                    }
247                }
248            }
249        }
250    }
251    result.duration_ms = elapsed_ms(started);
252    result
253}
254
255/// True when `bytes` BMT-hashes to `expected`. Returns `false` on
256/// any error (e.g. payload exceeds `CHUNK_SIZE`) — caller treats
257/// that as "didn't verify cleanly", which lands in `chunks_corrupt`.
258fn bmt_matches(bytes: &[u8], expected: &[u8]) -> bool {
259    match calculate_chunk_address(bytes) {
260        Ok(a) => a.as_slice() == expected,
261        Err(_) => false,
262    }
263}
264
/// Whole milliseconds elapsed since `started`, saturating at
/// `u64::MAX` instead of wrapping.
fn elapsed_ms(started: Instant) -> u64 {
    let elapsed: Duration = started.elapsed();
    u64::try_from(elapsed.as_millis()).unwrap_or(u64::MAX)
}
269
#[cfg(test)]
mod tests {
    use super::*;

    fn fake_ref() -> Reference {
        Reference::from_hex(&"a".repeat(64)).unwrap()
    }

    /// A healthy, BMT-verified, non-truncated manifest result. Each
    /// test overrides only the fields it is about.
    fn baseline() -> DurabilityResult {
        DurabilityResult {
            reference: fake_ref(),
            started_at: SystemTime::now(),
            duration_ms: 100,
            chunks_total: 3,
            chunks_lost: 0,
            chunks_errors: 0,
            chunks_corrupt: 0,
            root_is_manifest: true,
            truncated: false,
            bmt_verified: true,
        }
    }

    #[test]
    fn summary_renders_healthy_message() {
        let line = DurabilityResult {
            duration_ms: 123,
            chunks_total: 4,
            ..baseline()
        }
        .summary();
        for needle in ["OK", "4 chunks retrievable", "manifest"] {
            assert!(line.contains(needle), "{line}");
        }
    }

    #[test]
    fn summary_renders_unhealthy_breakdown() {
        let line = DurabilityResult {
            duration_ms: 990,
            chunks_total: 8,
            chunks_lost: 2,
            chunks_errors: 1,
            ..baseline()
        }
        .summary();
        for needle in ["UNHEALTHY", "lost 2", "errors 1"] {
            assert!(line.contains(needle), "{line}");
        }
    }

    #[test]
    fn summary_includes_corrupt_when_bmt_finds_mismatch() {
        let outcome = DurabilityResult {
            chunks_total: 5,
            chunks_corrupt: 2,
            ..baseline()
        };
        let line = outcome.summary();
        assert!(!outcome.is_healthy());
        assert!(line.contains("UNHEALTHY"), "{line}");
        assert!(line.contains("corrupt 2"), "{line}");
    }

    #[test]
    fn summary_includes_bmt_marker_when_verified() {
        let line = baseline().summary();
        assert!(line.contains("BMT"), "{line}");
    }

    #[test]
    fn summary_omits_bmt_marker_when_skipped() {
        let line = DurabilityResult {
            bmt_verified: false,
            ..baseline()
        }
        .summary();
        assert!(!line.contains("BMT"), "{line}");
    }

    #[test]
    fn truncated_flag_surfaces_in_summary() {
        let line = DurabilityResult {
            duration_ms: 1,
            chunks_total: 10_000,
            truncated: true,
            ..baseline()
        }
        .summary();
        assert!(line.contains("truncated"), "{line}");
    }

    #[test]
    fn is_healthy_requires_zero_lost_errors_and_corrupt() {
        let mut outcome = DurabilityResult {
            duration_ms: 1,
            chunks_total: 5,
            ..baseline()
        };
        assert!(outcome.is_healthy());

        outcome.chunks_lost = 1;
        assert!(!outcome.is_healthy());

        outcome.chunks_lost = 0;
        outcome.chunks_errors = 1;
        assert!(!outcome.is_healthy());

        outcome.chunks_errors = 0;
        outcome.chunks_corrupt = 1;
        assert!(!outcome.is_healthy());
    }

    #[test]
    fn bmt_matches_verifies_real_chunk() {
        // Assemble span + payload, hash it, and check that
        // bmt_matches() accepts the chunk against its own computed
        // address. Guards the calculate_chunk_address contract from
        // bee-rs: the durability walk's correctness rests on this
        // round-trip.
        use bee::swarm::bmt::calculate_chunk_address;
        let payload: &[u8] = b"some chunk content";
        let span = (payload.len() as u64).to_le_bytes();
        let chunk: Vec<u8> = [&span[..], payload].concat();
        let address = calculate_chunk_address(&chunk).expect("hash ok");
        assert!(bmt_matches(&chunk, address.as_slice()));

        // One flipped byte must break the match.
        let mut tampered = chunk.clone();
        tampered[10] ^= 0xff;
        assert!(!bmt_matches(&tampered, address.as_slice()));
    }
}