// bee_tui/durability.rs
1//! `:durability-check <ref>` — the operator-facing answer to the
2//! single most-feared question: "is my data still alive?"
3//!
4//! The check walks the chunk graph rooted at `<ref>`:
5//!
6//! * Fetches the root chunk via `/chunks/{ref}`.
7//! * If the root parses as a Mantaray manifest, recursively fetches
8//!   each fork's `self_address`. Forks with `target_address` that
9//!   isn't NULL are counted as leaves but their target's BMT tree is
10//!   NOT walked (that's a v1.4 follow-up — bee-rs would need to
11//!   stream chunks through the file chunker for a complete answer).
12//! * If the root doesn't parse as a manifest, the single-chunk fetch
13//!   IS the durability answer.
14//!
15//! Result is a [`DurabilityResult`] with `(chunks_total, chunks_lost,
16//! chunks_errors)`. The S13 Watchlist screen records each invocation
17//! and surfaces the running history; `cmd_status_tx` carries the
18//! one-line summary back to the command bar.
19//!
20//! Mirrors beekeeper's `pkg/check/datadurability` but for one
21//! operator's local node + one reference, without the cluster
22//! orchestration.
23
24use std::collections::HashSet;
25use std::sync::Arc;
26use std::time::{Duration, Instant, SystemTime};
27
28use bee::manifest::{MantarayNode, unmarshal};
29use bee::swarm::Reference;
30
31use crate::api::ApiClient;
32
/// Ceiling on how many chunks one durability-check will walk before
/// giving up. Operators with very large manifests (10⁵+ chunks) get
/// a partial answer rather than a stuck cockpit. Conservative
/// default; can be lifted via a future config knob.
///
/// NOTE: `check` tests this *before* fetching the next chunk, so a
/// truncated walk reports exactly this many `chunks_total`.
const MAX_CHUNKS_PER_WALK: u64 = 10_000;
38
/// Outcome bucket for the running summary. We separate
/// `chunks_lost` (a 404 on `/chunks/{ref}`) from `chunks_errors`
/// (any other failure — timeout, 500, decode error) because they
/// have different operator implications: lost = the network truly
/// dropped your data; errors = something flaky that needs a retry.
#[derive(Debug, Clone)]
pub struct DurabilityResult {
    /// Root reference the walk started from.
    pub reference: Reference,
    /// Wall-clock time the check began (presumably shown by the S13
    /// history view — confirm against the Watchlist screen).
    pub started_at: SystemTime,
    /// Total wall time of the walk, in milliseconds (saturating).
    pub duration_ms: u64,
    /// Chunks attempted: the root plus every unique fork
    /// `self_address` fetched.
    pub chunks_total: u64,
    /// Fetches that came back 404 — the network dropped the chunk.
    pub chunks_lost: u64,
    /// Fetches that failed any other way (timeout, 5xx, decode error).
    pub chunks_errors: u64,
    /// True iff the root chunk parsed as a Mantaray manifest. When
    /// false the rest of the counts come from a single raw-chunk
    /// fetch.
    pub root_is_manifest: bool,
    /// True when we hit `MAX_CHUNKS_PER_WALK` and stopped early.
    pub truncated: bool,
}
59
60impl DurabilityResult {
61    /// All checked chunks fetched cleanly.
62    pub fn is_healthy(&self) -> bool {
63        self.chunks_lost == 0 && self.chunks_errors == 0
64    }
65    /// Summary line shown on the command-status row + S13 detail.
66    pub fn summary(&self) -> String {
67        let kind = if self.root_is_manifest {
68            "manifest"
69        } else {
70            "raw chunk"
71        };
72        let trunc = if self.truncated { " (truncated)" } else { "" };
73        if self.is_healthy() {
74            format!(
75                "durability-check OK in {}ms · {kind} · {} chunk{} retrievable{trunc}",
76                self.duration_ms,
77                self.chunks_total,
78                if self.chunks_total == 1 { "" } else { "s" },
79            )
80        } else {
81            format!(
82                "durability-check UNHEALTHY in {}ms · {kind} · total {} · lost {} · errors {}{trunc}",
83                self.duration_ms,
84                self.chunks_total,
85                self.chunks_lost,
86                self.chunks_errors,
87            )
88        }
89    }
90}
91
92/// Walk the chunk graph rooted at `reference` and report the result.
93/// Times out per-chunk via reqwest's default; the surrounding `tokio`
94/// task can be cancelled by dropping its handle (the Watchlist
95/// screen owns the in-flight handle).
96pub async fn check(api: Arc<ApiClient>, reference: Reference) -> DurabilityResult {
97    let started = Instant::now();
98    let started_at = SystemTime::now();
99    let mut result = DurabilityResult {
100        reference: reference.clone(),
101        started_at,
102        duration_ms: 0,
103        chunks_total: 0,
104        chunks_lost: 0,
105        chunks_errors: 0,
106        root_is_manifest: false,
107        truncated: false,
108    };
109
110    // Root fetch.
111    let root_bytes = match api.bee().file().download_chunk(&reference, None).await {
112        Ok(b) => b,
113        Err(e) => {
114            // Distinguish 404 (chunk genuinely not found) from other
115            // failures by looking at the error string. bee-rs doesn't
116            // expose a structured-error path here; we lean on the
117            // text format the api client emits.
118            let s = e.to_string();
119            if s.contains("404") {
120                result.chunks_lost = 1;
121            } else {
122                result.chunks_errors = 1;
123            }
124            result.chunks_total = 1;
125            result.duration_ms = elapsed_ms(started);
126            return result;
127        }
128    };
129    result.chunks_total = 1;
130
131    // Try to parse as manifest. If not, we're done — single chunk
132    // fetch was the answer.
133    let root_node = match unmarshal(&root_bytes, reference.as_bytes()) {
134        Ok(n) => n,
135        Err(_) => {
136            result.duration_ms = elapsed_ms(started);
137            return result;
138        }
139    };
140    result.root_is_manifest = true;
141
142    // BFS over fork tree. Track visited self-addresses to short-circuit
143    // cycles (shouldn't happen in a real manifest but cheap insurance).
144    let mut visited: HashSet<[u8; 32]> = HashSet::new();
145    let mut queue: Vec<MantarayNode> = vec![root_node];
146
147    while let Some(node) = queue.pop() {
148        for fork in node.forks.values() {
149            let Some(addr) = fork.node.self_address else {
150                continue;
151            };
152            if !visited.insert(addr) {
153                continue;
154            }
155            if result.chunks_total >= MAX_CHUNKS_PER_WALK {
156                result.truncated = true;
157                result.duration_ms = elapsed_ms(started);
158                return result;
159            }
160            result.chunks_total += 1;
161            let child_ref = match Reference::new(&addr) {
162                Ok(r) => r,
163                Err(_) => {
164                    result.chunks_errors += 1;
165                    continue;
166                }
167            };
168            match api.bee().file().download_chunk(&child_ref, None).await {
169                Ok(child_bytes) => {
170                    // Try to keep walking — if this fork is itself a
171                    // sub-manifest its forks reach further leaves.
172                    if let Ok(child_node) = unmarshal(&child_bytes, child_ref.as_bytes()) {
173                        queue.push(child_node);
174                    }
175                }
176                Err(e) => {
177                    if e.to_string().contains("404") {
178                        result.chunks_lost += 1;
179                    } else {
180                        result.chunks_errors += 1;
181                    }
182                }
183            }
184        }
185    }
186    result.duration_ms = elapsed_ms(started);
187    result
188}
189
/// Elapsed wall-clock time since `started` in milliseconds,
/// saturating at `u64::MAX` instead of truncating the `u128` count.
fn elapsed_ms(started: Instant) -> u64 {
    u64::try_from(started.elapsed().as_millis()).unwrap_or(u64::MAX)
}
194
195#[cfg(test)]
196mod tests {
197    use super::*;
198
199    fn fake_ref() -> Reference {
200        Reference::from_hex(&"a".repeat(64)).unwrap()
201    }
202
203    #[test]
204    fn summary_renders_healthy_message() {
205        let r = DurabilityResult {
206            reference: fake_ref(),
207            started_at: SystemTime::now(),
208            duration_ms: 123,
209            chunks_total: 4,
210            chunks_lost: 0,
211            chunks_errors: 0,
212            root_is_manifest: true,
213            truncated: false,
214        };
215        let s = r.summary();
216        assert!(s.contains("OK"), "{s}");
217        assert!(s.contains("4 chunks retrievable"), "{s}");
218        assert!(s.contains("manifest"), "{s}");
219    }
220
221    #[test]
222    fn summary_renders_unhealthy_breakdown() {
223        let r = DurabilityResult {
224            reference: fake_ref(),
225            started_at: SystemTime::now(),
226            duration_ms: 990,
227            chunks_total: 8,
228            chunks_lost: 2,
229            chunks_errors: 1,
230            root_is_manifest: true,
231            truncated: false,
232        };
233        let s = r.summary();
234        assert!(s.contains("UNHEALTHY"), "{s}");
235        assert!(s.contains("lost 2"), "{s}");
236        assert!(s.contains("errors 1"), "{s}");
237    }
238
239    #[test]
240    fn truncated_flag_surfaces_in_summary() {
241        let r = DurabilityResult {
242            reference: fake_ref(),
243            started_at: SystemTime::now(),
244            duration_ms: 1,
245            chunks_total: 10_000,
246            chunks_lost: 0,
247            chunks_errors: 0,
248            root_is_manifest: true,
249            truncated: true,
250        };
251        assert!(r.summary().contains("truncated"), "{}", r.summary());
252    }
253
254    #[test]
255    fn is_healthy_requires_zero_lost_and_zero_errors() {
256        let mut r = DurabilityResult {
257            reference: fake_ref(),
258            started_at: SystemTime::now(),
259            duration_ms: 1,
260            chunks_total: 5,
261            chunks_lost: 0,
262            chunks_errors: 0,
263            root_is_manifest: true,
264            truncated: false,
265        };
266        assert!(r.is_healthy());
267        r.chunks_lost = 1;
268        assert!(!r.is_healthy());
269        r.chunks_lost = 0;
270        r.chunks_errors = 1;
271        assert!(!r.is_healthy());
272    }
273}