Skip to main content

bee_tui/watch/
mod.rs

1#![allow(dead_code)] // wired into App + Health screen in the next commits.
2
3//! k9s-style watch / informer layer.
4//!
5//! One [`BeeWatch`] hub spawns a polling task per resource group;
6//! each task pushes fresh snapshots into a [`tokio::sync::watch`]
7//! channel. Screens subscribe via [`watch::Receiver`] handles and
8//! render the latest snapshot — they never poll directly.
9//!
10//! The cancellation tree mirrors `docs/PLAN.md` § 6: every poller's
11//! token is a child of the hub's, which is a child of the App's
12//! root. Quitting cancels the root and unwinds everything; switching
13//! profile (v0.4) drops one hub and starts another.
14//!
15//! Refresh policy is per resource group, not global — `tig`-style
16//! (`docs/PLAN.md` § 3 principle 7).
17
18use std::sync::Arc;
19use std::time::{Duration, Instant};
20
21use bee::debug::{ChainState, RedistributionState, Status, Wallet};
22use bee::postage::PostageBatch;
23use tokio::sync::watch;
24use tokio_util::sync::CancellationToken;
25
26use crate::api::ApiClient;
27
28/// Snapshot fed to the Health screen and the connection-status bar.
29/// Updated together because the gates need a coherent view across
30/// `/status`, `/chainstate`, `/wallet`, and `/redistributionstate`.
31#[derive(Clone, Debug, Default)]
32pub struct HealthSnapshot {
33    pub status: Option<Status>,
34    pub chain_state: Option<ChainState>,
35    pub wallet: Option<Wallet>,
36    pub redistribution: Option<RedistributionState>,
37    /// Round-trip time of the last `/health` ping; `None` until the
38    /// first poll completes or after a transport failure.
39    pub last_ping: Option<Duration>,
40    /// One-line description of the most recent fetch error, if any.
41    /// Cleared on every successful refresh.
42    pub last_error: Option<String>,
43    /// Wall-clock instant of the last successful poll. Used to grey
44    /// out stale data when the link drops.
45    pub last_update: Option<Instant>,
46}
47
48impl HealthSnapshot {
49    /// True iff every required field is populated and there is no
50    /// recorded error. Used by the connection-status indicator.
51    pub fn is_fully_loaded(&self) -> bool {
52        self.last_error.is_none()
53            && self.status.is_some()
54            && self.chain_state.is_some()
55            && self.wallet.is_some()
56            && self.redistribution.is_some()
57    }
58}
59
60/// Snapshot fed to the S2 Stamps screen. `/stamps` polled at the
61/// slower 10 s cadence per `docs/PLAN.md` § 9 — postage state is
62/// updated on chain, not at request rate.
63#[derive(Clone, Debug, Default)]
64pub struct StampsSnapshot {
65    pub batches: Vec<PostageBatch>,
66    pub last_error: Option<String>,
67    pub last_update: Option<Instant>,
68}
69
70impl StampsSnapshot {
71    pub fn is_loaded(&self) -> bool {
72        self.last_update.is_some() && self.last_error.is_none()
73    }
74}
75
76/// Watch-channel hub. Owns one [`watch::Sender`] per resource group;
77/// hands out clones of the receiver via `health()` / `stamps()` etc.
78#[derive(Clone, Debug)]
79pub struct BeeWatch {
80    health_rx: watch::Receiver<HealthSnapshot>,
81    stamps_rx: watch::Receiver<StampsSnapshot>,
82    cancel: CancellationToken,
83}
84
85impl BeeWatch {
86    /// Spawn the polling tasks. The returned hub stays alive (and
87    /// pollers keep running) until `shutdown()` is called or `cancel`
88    /// is cancelled by the caller's parent.
89    pub fn start(client: Arc<ApiClient>, parent_cancel: &CancellationToken) -> Self {
90        let cancel = parent_cancel.child_token();
91        let (health_tx, health_rx) = watch::channel(HealthSnapshot::default());
92        spawn_health_poller(
93            client.clone(),
94            health_tx,
95            cancel.clone(),
96            Duration::from_secs(2),
97        );
98        let (stamps_tx, stamps_rx) = watch::channel(StampsSnapshot::default());
99        spawn_stamps_poller(client, stamps_tx, cancel.clone(), Duration::from_secs(10));
100        Self {
101            health_rx,
102            stamps_rx,
103            cancel,
104        }
105    }
106
107    /// Subscribe to the health snapshot stream. Cheap; cloning the
108    /// receiver does not start a new poller.
109    pub fn health(&self) -> watch::Receiver<HealthSnapshot> {
110        self.health_rx.clone()
111    }
112
113    /// Subscribe to the stamps snapshot stream.
114    pub fn stamps(&self) -> watch::Receiver<StampsSnapshot> {
115        self.stamps_rx.clone()
116    }
117
118    /// Cancel every polling task this hub owns. Idempotent.
119    pub fn shutdown(&self) {
120        self.cancel.cancel();
121    }
122}
123
124/// Poll `/status` + `/chainstate` + `/wallet` + `/redistributionstate`
125/// every `interval` and broadcast a coherent [`HealthSnapshot`].
126fn spawn_health_poller(
127    client: Arc<ApiClient>,
128    tx: watch::Sender<HealthSnapshot>,
129    cancel: CancellationToken,
130    interval: Duration,
131) {
132    tokio::spawn(async move {
133        let mut tick = tokio::time::interval(interval);
134        tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
135        loop {
136            tokio::select! {
137                _ = cancel.cancelled() => break,
138                _ = tick.tick() => {
139                    let snap = collect_health(&client).await;
140                    if tx.send(snap).is_err() {
141                        break; // no receivers; nobody cares anymore
142                    }
143                }
144            }
145        }
146    });
147}
148
149/// Poll `/stamps` every `interval` and broadcast a fresh
150/// [`StampsSnapshot`].
151fn spawn_stamps_poller(
152    client: Arc<ApiClient>,
153    tx: watch::Sender<StampsSnapshot>,
154    cancel: CancellationToken,
155    interval: Duration,
156) {
157    tokio::spawn(async move {
158        let mut tick = tokio::time::interval(interval);
159        tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
160        loop {
161            tokio::select! {
162                _ = cancel.cancelled() => break,
163                _ = tick.tick() => {
164                    let snap = collect_stamps(&client).await;
165                    if tx.send(snap).is_err() {
166                        break;
167                    }
168                }
169            }
170        }
171    });
172}
173
174async fn collect_stamps(client: &ApiClient) -> StampsSnapshot {
175    match client.bee().postage().get_postage_batches().await {
176        Ok(batches) => StampsSnapshot {
177            batches,
178            last_error: None,
179            last_update: Some(Instant::now()),
180        },
181        Err(e) => StampsSnapshot {
182            batches: Vec::new(),
183            last_error: Some(format!("stamps: {e}")),
184            last_update: Some(Instant::now()),
185        },
186    }
187}
188
189async fn collect_health(client: &ApiClient) -> HealthSnapshot {
190    let bee = client.bee();
191
192    // Time the cheap /health probe alongside the rest so the header
193    // bar can show a single representative latency.
194    let ping_start = Instant::now();
195    let health_ok = bee.debug().health().await.is_ok();
196    let last_ping = health_ok.then(|| ping_start.elapsed());
197
198    let status = bee.debug().status().await;
199    let chain_state = bee.debug().chain_state().await;
200    let wallet = bee.debug().wallet().await;
201    let redistribution = bee.debug().redistribution_state().await;
202
203    let mut snap = HealthSnapshot {
204        last_ping,
205        last_update: Some(Instant::now()),
206        ..Default::default()
207    };
208    let mut errors: Vec<String> = Vec::new();
209    match status {
210        Ok(s) => snap.status = Some(s),
211        Err(e) => errors.push(format!("status: {e}")),
212    }
213    match chain_state {
214        Ok(c) => snap.chain_state = Some(c),
215        Err(e) => errors.push(format!("chainstate: {e}")),
216    }
217    match wallet {
218        Ok(w) => snap.wallet = Some(w),
219        Err(e) => errors.push(format!("wallet: {e}")),
220    }
221    match redistribution {
222        Ok(r) => snap.redistribution = Some(r),
223        Err(e) => errors.push(format!("redistributionstate: {e}")),
224    }
225    if !errors.is_empty() {
226        snap.last_error = Some(errors.join("; "));
227    }
228    snap
229}
230
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly defaulted snapshot has no payloads, so it must not
    /// count as loaded.
    #[test]
    fn fully_loaded_default_is_false() {
        let empty = HealthSnapshot::default();
        assert!(!empty.is_fully_loaded());
    }

    /// All four payloads present and no error => loaded; recording an
    /// error on the same data flips the result.
    #[test]
    fn fully_loaded_requires_no_error_and_all_fields() {
        // ChainState and Wallet don't implement Default; build empty
        // instances via JSON to keep the test self-contained.
        let chain_state: ChainState =
            serde_json::from_str(r#"{"block":0,"chainTip":0}"#).unwrap();
        let wallet: Wallet = serde_json::from_str(
            r#"{"chainID":1,"walletAddress":"0x0000000000000000000000000000000000000000"}"#,
        )
        .unwrap();

        let complete = HealthSnapshot {
            status: Some(Status::default()),
            chain_state: Some(chain_state),
            wallet: Some(wallet),
            redistribution: Some(RedistributionState::default()),
            ..Default::default()
        };
        assert!(complete.is_fully_loaded());

        let failed = HealthSnapshot {
            last_error: Some("boom".into()),
            ..complete
        };
        assert!(!failed.is_fully_loaded());
    }
}