Skip to main content

bee_tui/watch/
mod.rs

1#![allow(dead_code)] // wired into App + Health screen in the next commits.
2
3//! k9s-style watch / informer layer.
4//!
5//! One [`BeeWatch`] hub spawns a polling task per resource group;
6//! each task pushes fresh snapshots into a [`tokio::sync::watch`]
7//! channel. Screens subscribe via [`watch::Receiver`] handles and
8//! render the latest snapshot — they never poll directly.
9//!
10//! The cancellation tree mirrors `docs/PLAN.md` § 6: every poller's
11//! token is a child of the hub's, which is a child of the App's
12//! root. Quitting cancels the root and unwinds everything; switching
13//! profile (v0.4) drops one hub and starts another.
14//!
15//! Refresh policy is per resource group, not global — `tig`-style
16//! (`docs/PLAN.md` § 3 principle 7).
17
18use std::sync::Arc;
19use std::time::{Duration, Instant};
20
21use bee::api::Tag;
22use bee::debug::{
23    Addresses, ChainState, ChequebookBalance, LastCheque, RedistributionState, Settlements, Status,
24    Topology, TransactionInfo, Wallet,
25};
26use bee::postage::PostageBatch;
27use bee::swarm::Reference;
28use num_bigint::BigInt;
29use tokio::sync::watch;
30use tokio_util::sync::CancellationToken;
31
32use crate::api::ApiClient;
33
34/// Snapshot fed to the Health screen and the connection-status bar.
35/// Updated together because the gates need a coherent view across
36/// `/status`, `/chainstate`, `/wallet`, and `/redistributionstate`.
37#[derive(Clone, Debug, Default)]
38pub struct HealthSnapshot {
39    pub status: Option<Status>,
40    pub chain_state: Option<ChainState>,
41    pub wallet: Option<Wallet>,
42    pub redistribution: Option<RedistributionState>,
43    /// Round-trip time of the last `/health` ping; `None` until the
44    /// first poll completes or after a transport failure.
45    pub last_ping: Option<Duration>,
46    /// One-line description of the most recent fetch error, if any.
47    /// Cleared on every successful refresh.
48    pub last_error: Option<String>,
49    /// Wall-clock instant of the last successful poll. Used to grey
50    /// out stale data when the link drops.
51    pub last_update: Option<Instant>,
52}
53
54impl HealthSnapshot {
55    /// True iff every required field is populated and there is no
56    /// recorded error. Used by the connection-status indicator.
57    pub fn is_fully_loaded(&self) -> bool {
58        self.last_error.is_none()
59            && self.status.is_some()
60            && self.chain_state.is_some()
61            && self.wallet.is_some()
62            && self.redistribution.is_some()
63    }
64}
65
66/// Snapshot fed to the S2 Stamps screen. `/stamps` polled at the
67/// slower 10 s cadence per `docs/PLAN.md` § 9 — postage state is
68/// updated on chain, not at request rate.
69#[derive(Clone, Debug, Default)]
70pub struct StampsSnapshot {
71    pub batches: Vec<PostageBatch>,
72    pub last_error: Option<String>,
73    pub last_update: Option<Instant>,
74}
75
76impl StampsSnapshot {
77    pub fn is_loaded(&self) -> bool {
78        self.last_update.is_some() && self.last_error.is_none()
79    }
80}
81
82/// Snapshot fed to the S3 SWAP / cheques screen. `/chequebook/*` and
83/// `/settlements` are slow-changing — chain-rate at most — so the
84/// poll cadence is 30 s per `docs/PLAN.md` § 9.
85#[derive(Clone, Debug, Default)]
86pub struct SwapSnapshot {
87    pub chequebook: Option<ChequebookBalance>,
88    /// On-chain chequebook contract address from
89    /// `/chequebook/address`. Pasted onto the S3 header so operators
90    /// can jump straight to a block explorer without unpacking the
91    /// full `/wallet` response.
92    pub chequebook_address: Option<String>,
93    pub settlements: Option<Settlements>,
94    pub time_settlements: Option<Settlements>,
95    /// Last received cheque per peer (from `/chequebook/cheque`).
96    pub last_received: Vec<LastCheque>,
97    pub last_error: Option<String>,
98    pub last_update: Option<Instant>,
99}
100
101impl SwapSnapshot {
102    pub fn is_loaded(&self) -> bool {
103        self.last_update.is_some() && self.last_error.is_none()
104    }
105}
106
107/// Snapshot fed to the S9 Tags / uploads screen. `/tags` is polled
108/// at 5 s — slow enough to be cheap on a quiet node, quick enough
109/// that an in-progress upload's split / sent / synced columns visibly
110/// tick. PLAN proposes 1 s when uploads are active; bumping the
111/// cadence dynamically can land in a follow-up once we observe real
112/// usage.
113#[derive(Clone, Debug, Default)]
114pub struct TagsSnapshot {
115    pub tags: Vec<Tag>,
116    pub last_error: Option<String>,
117    pub last_update: Option<Instant>,
118}
119
120impl TagsSnapshot {
121    pub fn is_loaded(&self) -> bool {
122        self.last_update.is_some() && self.last_error.is_none()
123    }
124}
125
126/// Snapshot fed to the S11 Pins screen. `/pins` is operator-driven
127/// (it only changes when someone explicitly pins/unpins a reference)
128/// so the poll cadence is the slow tier. Per-pin integrity isn't
129/// auto-polled — `/pins/check` walks the entire chunk graph and is
130/// expensive — operators trigger it on demand from the screen.
131#[derive(Clone, Debug, Default)]
132pub struct PinsSnapshot {
133    /// References returned by `GET /pins`, in Bee's response order.
134    pub pins: Vec<Reference>,
135    pub last_error: Option<String>,
136    pub last_update: Option<Instant>,
137}
138
139impl PinsSnapshot {
140    pub fn is_loaded(&self) -> bool {
141        self.last_update.is_some() && self.last_error.is_none()
142    }
143}
144
145/// Snapshot fed to the S8 RPC / API health screen. `/transactions`
146/// only changes when the operator submits something (postage topup,
147/// stake deposit, withdrawal, etc.); 30 s cadence is the same tier
148/// as SWAP and Lottery — slow enough to be cheap, quick enough that
149/// a stuck pending TX shows up within a tick of submission.
150#[derive(Clone, Debug, Default)]
151pub struct TransactionsSnapshot {
152    pub pending: Vec<TransactionInfo>,
153    pub last_error: Option<String>,
154    pub last_update: Option<Instant>,
155}
156
157impl TransactionsSnapshot {
158    pub fn is_loaded(&self) -> bool {
159        self.last_update.is_some() && self.last_error.is_none()
160    }
161}
162
163/// Snapshot fed to the S7 Network/NAT screen. `/addresses` doesn't
164/// change unless the node restarts, so the cadence is 60 s — slow
165/// enough to be invisible in the command-log pane but quick enough
166/// to catch a restart-induced overlay change.
167#[derive(Clone, Debug, Default)]
168pub struct NetworkSnapshot {
169    pub addresses: Option<Addresses>,
170    pub last_error: Option<String>,
171    pub last_update: Option<Instant>,
172}
173
174impl NetworkSnapshot {
175    pub fn is_loaded(&self) -> bool {
176        self.addresses.is_some() && self.last_error.is_none()
177    }
178}
179
180/// Snapshot fed to the S6 Peers screen and the S1 bin-saturation
181/// gate. `/topology` is polled at 5 s — per-bin populations don't
182/// drift faster than peer churn, but the operator does want to see
183/// "bin 4 starving" go yellow within a few ticks of the issue.
184#[derive(Clone, Debug, Default)]
185pub struct TopologySnapshot {
186    pub topology: Option<Topology>,
187    pub last_error: Option<String>,
188    pub last_update: Option<Instant>,
189}
190
191impl TopologySnapshot {
192    pub fn is_loaded(&self) -> bool {
193        self.topology.is_some() && self.last_error.is_none()
194    }
195}
196
197/// Snapshot fed to the S4 Lottery screen. `/stake` is operator-driven
198/// (deposit / withdraw transactions only) so the cadence is 30 s per
199/// `docs/PLAN.md` § 9 — same as SWAP. The redistribution-state half of
200/// the screen is read off the existing 2 s [`HealthSnapshot`] feed; the
201/// Lottery component subscribes to both.
202#[derive(Clone, Debug, Default)]
203pub struct LotterySnapshot {
204    /// `/stake` — currently staked BZZ (PLUR).
205    pub staked: Option<BigInt>,
206    pub last_error: Option<String>,
207    pub last_update: Option<Instant>,
208}
209
210impl LotterySnapshot {
211    pub fn is_loaded(&self) -> bool {
212        self.last_update.is_some() && self.last_error.is_none()
213    }
214}
215
216/// Polling-cadence preset chosen by `[ui].refresh` in `config.toml`.
217/// Each variant maps every per-resource interval to a fixed value,
218/// trading freshness for HTTP volume.
219#[derive(Debug, Clone, Copy, PartialEq, Eq)]
220pub enum RefreshProfile {
221    /// Original cockpit cadence — 2 s health / 5 s topology+tags /
222    /// 30 s swap+lottery+transactions / 60 s network. Use when
223    /// actively diagnosing; floods the bottom Bee HTTP tab.
224    Live,
225    /// Halved-fast-tier cadence (4 s health, 10 s topology+tags,
226    /// otherwise the same as Live). Default for new installs since
227    /// the tabbed log pane shipped — keeps the Bee HTTP tab readable
228    /// without losing operator-relevant freshness.
229    Default,
230    /// Minimal-traffic cadence (8 s health, 20 s topology+tags,
231    /// 60 s mid tier, 120 s network). For "leave it open all day"
232    /// monitoring.
233    Slow,
234}
235
236impl RefreshProfile {
237    /// Parse from the `[ui].refresh` config value. Unknown strings
238    /// fall back to `Default` with a tracing warning so a typo
239    /// can't break startup.
240    pub fn from_config(s: &str) -> Self {
241        match s {
242            "live" => Self::Live,
243            "default" => Self::Default,
244            "slow" => Self::Slow,
245            other => {
246                tracing::warn!(
247                    "unknown [ui].refresh value {other:?}; falling back to \"default\". \
248                     Recognised: live | default | slow."
249                );
250                Self::Default
251            }
252        }
253    }
254
255    pub fn health(self) -> Duration {
256        match self {
257            Self::Live => Duration::from_secs(2),
258            Self::Default => Duration::from_secs(4),
259            Self::Slow => Duration::from_secs(8),
260        }
261    }
262    pub fn topology(self) -> Duration {
263        match self {
264            Self::Live => Duration::from_secs(5),
265            Self::Default => Duration::from_secs(10),
266            Self::Slow => Duration::from_secs(20),
267        }
268    }
269    pub fn stamps(self) -> Duration {
270        // Stamps were already 10s under Live; we slow them under
271        // Slow only. Utilization grows at upload rate, not poll rate.
272        match self {
273            Self::Live | Self::Default => Duration::from_secs(10),
274            Self::Slow => Duration::from_secs(20),
275        }
276    }
277    pub fn tags(self) -> Duration {
278        match self {
279            Self::Live => Duration::from_secs(5),
280            Self::Default => Duration::from_secs(10),
281            Self::Slow => Duration::from_secs(20),
282        }
283    }
284    pub fn swap(self) -> Duration {
285        match self {
286            Self::Live | Self::Default => Duration::from_secs(30),
287            Self::Slow => Duration::from_secs(60),
288        }
289    }
290    pub fn lottery(self) -> Duration {
291        match self {
292            Self::Live | Self::Default => Duration::from_secs(30),
293            Self::Slow => Duration::from_secs(60),
294        }
295    }
296    pub fn transactions(self) -> Duration {
297        match self {
298            Self::Live | Self::Default => Duration::from_secs(30),
299            Self::Slow => Duration::from_secs(60),
300        }
301    }
302    pub fn network(self) -> Duration {
303        match self {
304            Self::Live | Self::Default => Duration::from_secs(60),
305            Self::Slow => Duration::from_secs(120),
306        }
307    }
308    pub fn pins(self) -> Duration {
309        // /pins is a tiny list; cadence is set by how fast a pin set
310        // can change in practice (operator-driven). Same tier as
311        // swap/lottery/transactions.
312        match self {
313            Self::Live | Self::Default => Duration::from_secs(30),
314            Self::Slow => Duration::from_secs(60),
315        }
316    }
317}
318
319/// Watch-channel hub. Owns one [`watch::Sender`] per resource group;
320/// hands out clones of the receiver via `health()` / `stamps()` /
321/// `swap()` / `lottery()` / `topology()` / `network()` etc.
322#[derive(Clone, Debug)]
323pub struct BeeWatch {
324    health_rx: watch::Receiver<HealthSnapshot>,
325    stamps_rx: watch::Receiver<StampsSnapshot>,
326    swap_rx: watch::Receiver<SwapSnapshot>,
327    lottery_rx: watch::Receiver<LotterySnapshot>,
328    topology_rx: watch::Receiver<TopologySnapshot>,
329    network_rx: watch::Receiver<NetworkSnapshot>,
330    transactions_rx: watch::Receiver<TransactionsSnapshot>,
331    tags_rx: watch::Receiver<TagsSnapshot>,
332    pins_rx: watch::Receiver<PinsSnapshot>,
333    cancel: CancellationToken,
334}
335
336impl BeeWatch {
337    /// Spawn the polling tasks at [`RefreshProfile::Default`] cadence.
338    /// Use [`BeeWatch::start_with_profile`] to override.
339    pub fn start(client: Arc<ApiClient>, parent_cancel: &CancellationToken) -> Self {
340        Self::start_with_profile(client, parent_cancel, RefreshProfile::Default)
341    }
342
343    /// Spawn the polling tasks at the supplied cadence. The returned
344    /// hub stays alive (and pollers keep running) until `shutdown()`
345    /// is called or `cancel` is cancelled by the caller's parent.
346    pub fn start_with_profile(
347        client: Arc<ApiClient>,
348        parent_cancel: &CancellationToken,
349        profile: RefreshProfile,
350    ) -> Self {
351        let cancel = parent_cancel.child_token();
352        let (health_tx, health_rx) = watch::channel(HealthSnapshot::default());
353        spawn_health_poller(client.clone(), health_tx, cancel.clone(), profile.health());
354        let (stamps_tx, stamps_rx) = watch::channel(StampsSnapshot::default());
355        spawn_stamps_poller(client.clone(), stamps_tx, cancel.clone(), profile.stamps());
356        let (swap_tx, swap_rx) = watch::channel(SwapSnapshot::default());
357        spawn_swap_poller(client.clone(), swap_tx, cancel.clone(), profile.swap());
358        let (lottery_tx, lottery_rx) = watch::channel(LotterySnapshot::default());
359        spawn_lottery_poller(
360            client.clone(),
361            lottery_tx,
362            cancel.clone(),
363            profile.lottery(),
364        );
365        let (topology_tx, topology_rx) = watch::channel(TopologySnapshot::default());
366        spawn_topology_poller(
367            client.clone(),
368            topology_tx,
369            cancel.clone(),
370            profile.topology(),
371        );
372        let (network_tx, network_rx) = watch::channel(NetworkSnapshot::default());
373        spawn_network_poller(
374            client.clone(),
375            network_tx,
376            cancel.clone(),
377            profile.network(),
378        );
379        let (transactions_tx, transactions_rx) = watch::channel(TransactionsSnapshot::default());
380        spawn_transactions_poller(
381            client.clone(),
382            transactions_tx,
383            cancel.clone(),
384            profile.transactions(),
385        );
386        let (tags_tx, tags_rx) = watch::channel(TagsSnapshot::default());
387        spawn_tags_poller(client.clone(), tags_tx, cancel.clone(), profile.tags());
388        let (pins_tx, pins_rx) = watch::channel(PinsSnapshot::default());
389        spawn_pins_poller(client, pins_tx, cancel.clone(), profile.pins());
390        Self {
391            health_rx,
392            stamps_rx,
393            swap_rx,
394            lottery_rx,
395            topology_rx,
396            network_rx,
397            transactions_rx,
398            tags_rx,
399            pins_rx,
400            cancel,
401        }
402    }
403
404    /// Subscribe to the health snapshot stream. Cheap; cloning the
405    /// receiver does not start a new poller.
406    pub fn health(&self) -> watch::Receiver<HealthSnapshot> {
407        self.health_rx.clone()
408    }
409
410    /// Subscribe to the stamps snapshot stream.
411    pub fn stamps(&self) -> watch::Receiver<StampsSnapshot> {
412        self.stamps_rx.clone()
413    }
414
415    /// Subscribe to the swap snapshot stream.
416    pub fn swap(&self) -> watch::Receiver<SwapSnapshot> {
417        self.swap_rx.clone()
418    }
419
420    /// Subscribe to the lottery snapshot stream (`/stake`).
421    pub fn lottery(&self) -> watch::Receiver<LotterySnapshot> {
422        self.lottery_rx.clone()
423    }
424
425    /// Subscribe to the topology snapshot stream (`/topology`).
426    pub fn topology(&self) -> watch::Receiver<TopologySnapshot> {
427        self.topology_rx.clone()
428    }
429
430    /// Subscribe to the network snapshot stream (`/addresses`).
431    pub fn network(&self) -> watch::Receiver<NetworkSnapshot> {
432        self.network_rx.clone()
433    }
434
435    /// Subscribe to the pending-transactions snapshot stream
436    /// (`/transactions`).
437    pub fn transactions(&self) -> watch::Receiver<TransactionsSnapshot> {
438        self.transactions_rx.clone()
439    }
440
441    /// Subscribe to the tags snapshot stream (`/tags`).
442    pub fn tags(&self) -> watch::Receiver<TagsSnapshot> {
443        self.tags_rx.clone()
444    }
445
446    /// Subscribe to the pins snapshot stream (`/pins`). Per-pin
447    /// integrity (from `/pins/check`) isn't part of this stream —
448    /// the S11 component walks integrity on demand because the
449    /// underlying check is expensive.
450    pub fn pins(&self) -> watch::Receiver<PinsSnapshot> {
451        self.pins_rx.clone()
452    }
453
454    /// Cancel every polling task this hub owns. Idempotent.
455    pub fn shutdown(&self) {
456        self.cancel.cancel();
457    }
458}
459
460/// Poll `/status` + `/chainstate` + `/wallet` + `/redistributionstate`
461/// every `interval` and broadcast a coherent [`HealthSnapshot`].
462fn spawn_health_poller(
463    client: Arc<ApiClient>,
464    tx: watch::Sender<HealthSnapshot>,
465    cancel: CancellationToken,
466    interval: Duration,
467) {
468    tokio::spawn(async move {
469        let mut tick = tokio::time::interval(interval);
470        tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
471        loop {
472            tokio::select! {
473                _ = cancel.cancelled() => break,
474                _ = tick.tick() => {
475                    let snap = collect_health(&client).await;
476                    if tx.send(snap).is_err() {
477                        break; // no receivers; nobody cares anymore
478                    }
479                }
480            }
481        }
482    });
483}
484
485/// Poll `/stamps` every `interval` and broadcast a fresh
486/// [`StampsSnapshot`].
487fn spawn_stamps_poller(
488    client: Arc<ApiClient>,
489    tx: watch::Sender<StampsSnapshot>,
490    cancel: CancellationToken,
491    interval: Duration,
492) {
493    tokio::spawn(async move {
494        let mut tick = tokio::time::interval(interval);
495        tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
496        loop {
497            tokio::select! {
498                _ = cancel.cancelled() => break,
499                _ = tick.tick() => {
500                    let snap = collect_stamps(&client).await;
501                    if tx.send(snap).is_err() {
502                        break;
503                    }
504                }
505            }
506        }
507    });
508}
509
510async fn collect_stamps(client: &ApiClient) -> StampsSnapshot {
511    match client.bee().postage().get_postage_batches().await {
512        Ok(batches) => StampsSnapshot {
513            batches,
514            last_error: None,
515            last_update: Some(Instant::now()),
516        },
517        Err(e) => StampsSnapshot {
518            batches: Vec::new(),
519            last_error: Some(format!("stamps: {e}")),
520            last_update: Some(Instant::now()),
521        },
522    }
523}
524
525/// Poll the four `/chequebook` + `/settlement` endpoints every
526/// `interval` and broadcast a fresh [`SwapSnapshot`].
527fn spawn_swap_poller(
528    client: Arc<ApiClient>,
529    tx: watch::Sender<SwapSnapshot>,
530    cancel: CancellationToken,
531    interval: Duration,
532) {
533    tokio::spawn(async move {
534        let mut tick = tokio::time::interval(interval);
535        tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
536        loop {
537            tokio::select! {
538                _ = cancel.cancelled() => break,
539                _ = tick.tick() => {
540                    let snap = collect_swap(&client).await;
541                    if tx.send(snap).is_err() {
542                        break;
543                    }
544                }
545            }
546        }
547    });
548}
549
550async fn collect_swap(client: &ApiClient) -> SwapSnapshot {
551    let bee = client.bee();
552    let chequebook = bee.debug().chequebook_balance().await;
553    let chequebook_address = bee.debug().chequebook_address().await;
554    let settlements = bee.debug().settlements().await;
555    let time_settlements = bee.debug().time_settlements().await;
556    let last_received = bee.debug().last_cheques().await;
557
558    let mut snap = SwapSnapshot {
559        last_update: Some(Instant::now()),
560        ..Default::default()
561    };
562    let mut errors: Vec<String> = Vec::new();
563    match chequebook {
564        Ok(c) => snap.chequebook = Some(c),
565        Err(e) => errors.push(format!("chequebook: {e}")),
566    }
567    // Address-fetch failure is non-fatal — surfacing the contract
568    // address is a "nice to have" header decoration; the rest of the
569    // SWAP screen keeps working without it.
570    if let Ok(a) = chequebook_address {
571        snap.chequebook_address = Some(a);
572    }
573    match settlements {
574        Ok(s) => snap.settlements = Some(s),
575        Err(e) => errors.push(format!("settlements: {e}")),
576    }
577    match time_settlements {
578        Ok(s) => snap.time_settlements = Some(s),
579        Err(e) => errors.push(format!("timesettlements: {e}")),
580    }
581    match last_received {
582        Ok(v) => snap.last_received = v,
583        Err(e) => errors.push(format!("cheques: {e}")),
584    }
585    if !errors.is_empty() {
586        snap.last_error = Some(errors.join("; "));
587    }
588    snap
589}
590
591/// Poll `/stake` every `interval` and broadcast a fresh
592/// [`LotterySnapshot`].
593fn spawn_lottery_poller(
594    client: Arc<ApiClient>,
595    tx: watch::Sender<LotterySnapshot>,
596    cancel: CancellationToken,
597    interval: Duration,
598) {
599    tokio::spawn(async move {
600        let mut tick = tokio::time::interval(interval);
601        tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
602        loop {
603            tokio::select! {
604                _ = cancel.cancelled() => break,
605                _ = tick.tick() => {
606                    let snap = collect_lottery(&client).await;
607                    if tx.send(snap).is_err() {
608                        break;
609                    }
610                }
611            }
612        }
613    });
614}
615
616async fn collect_lottery(client: &ApiClient) -> LotterySnapshot {
617    match client.bee().debug().stake().await {
618        Ok(staked) => LotterySnapshot {
619            staked: Some(staked),
620            last_error: None,
621            last_update: Some(Instant::now()),
622        },
623        Err(e) => LotterySnapshot {
624            staked: None,
625            last_error: Some(format!("stake: {e}")),
626            last_update: Some(Instant::now()),
627        },
628    }
629}
630
631/// Poll `/topology` every `interval` and broadcast a fresh
632/// [`TopologySnapshot`].
633fn spawn_topology_poller(
634    client: Arc<ApiClient>,
635    tx: watch::Sender<TopologySnapshot>,
636    cancel: CancellationToken,
637    interval: Duration,
638) {
639    tokio::spawn(async move {
640        let mut tick = tokio::time::interval(interval);
641        tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
642        loop {
643            tokio::select! {
644                _ = cancel.cancelled() => break,
645                _ = tick.tick() => {
646                    let snap = collect_topology(&client).await;
647                    if tx.send(snap).is_err() {
648                        break;
649                    }
650                }
651            }
652        }
653    });
654}
655
656async fn collect_topology(client: &ApiClient) -> TopologySnapshot {
657    match client.bee().debug().topology().await {
658        Ok(topology) => TopologySnapshot {
659            topology: Some(topology),
660            last_error: None,
661            last_update: Some(Instant::now()),
662        },
663        Err(e) => TopologySnapshot {
664            topology: None,
665            last_error: Some(format!("topology: {e}")),
666            last_update: Some(Instant::now()),
667        },
668    }
669}
670
671/// Poll `/addresses` every `interval` and broadcast a fresh
672/// [`NetworkSnapshot`].
673fn spawn_network_poller(
674    client: Arc<ApiClient>,
675    tx: watch::Sender<NetworkSnapshot>,
676    cancel: CancellationToken,
677    interval: Duration,
678) {
679    tokio::spawn(async move {
680        let mut tick = tokio::time::interval(interval);
681        tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
682        loop {
683            tokio::select! {
684                _ = cancel.cancelled() => break,
685                _ = tick.tick() => {
686                    let snap = collect_network(&client).await;
687                    if tx.send(snap).is_err() {
688                        break;
689                    }
690                }
691            }
692        }
693    });
694}
695
696async fn collect_network(client: &ApiClient) -> NetworkSnapshot {
697    match client.bee().debug().addresses().await {
698        Ok(addresses) => NetworkSnapshot {
699            addresses: Some(addresses),
700            last_error: None,
701            last_update: Some(Instant::now()),
702        },
703        Err(e) => NetworkSnapshot {
704            addresses: None,
705            last_error: Some(format!("addresses: {e}")),
706            last_update: Some(Instant::now()),
707        },
708    }
709}
710
711/// Poll `/transactions` every `interval` and broadcast a fresh
712/// [`TransactionsSnapshot`].
713fn spawn_transactions_poller(
714    client: Arc<ApiClient>,
715    tx: watch::Sender<TransactionsSnapshot>,
716    cancel: CancellationToken,
717    interval: Duration,
718) {
719    tokio::spawn(async move {
720        let mut tick = tokio::time::interval(interval);
721        tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
722        loop {
723            tokio::select! {
724                _ = cancel.cancelled() => break,
725                _ = tick.tick() => {
726                    let snap = collect_transactions(&client).await;
727                    if tx.send(snap).is_err() {
728                        break;
729                    }
730                }
731            }
732        }
733    });
734}
735
736async fn collect_transactions(client: &ApiClient) -> TransactionsSnapshot {
737    match client.bee().debug().pending_transactions().await {
738        Ok(pending) => TransactionsSnapshot {
739            pending,
740            last_error: None,
741            last_update: Some(Instant::now()),
742        },
743        Err(e) => TransactionsSnapshot {
744            pending: Vec::new(),
745            last_error: Some(format!("transactions: {e}")),
746            last_update: Some(Instant::now()),
747        },
748    }
749}
750
751/// Poll `/tags` every `interval` and broadcast a fresh
752/// [`TagsSnapshot`].
753fn spawn_tags_poller(
754    client: Arc<ApiClient>,
755    tx: watch::Sender<TagsSnapshot>,
756    cancel: CancellationToken,
757    interval: Duration,
758) {
759    tokio::spawn(async move {
760        let mut tick = tokio::time::interval(interval);
761        tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
762        loop {
763            tokio::select! {
764                _ = cancel.cancelled() => break,
765                _ = tick.tick() => {
766                    let snap = collect_tags(&client).await;
767                    if tx.send(snap).is_err() {
768                        break;
769                    }
770                }
771            }
772        }
773    });
774}
775
776async fn collect_tags(client: &ApiClient) -> TagsSnapshot {
777    match client.bee().api().list_tags(None, None).await {
778        Ok(tags) => TagsSnapshot {
779            tags,
780            last_error: None,
781            last_update: Some(Instant::now()),
782        },
783        Err(e) => TagsSnapshot {
784            tags: Vec::new(),
785            last_error: Some(format!("tags: {e}")),
786            last_update: Some(Instant::now()),
787        },
788    }
789}
790
791/// Poll `GET /pins` and broadcast a fresh [`PinsSnapshot`]. Same
792/// shape as the other slow-tier pollers; integrity (`/pins/check`)
793/// is *not* called from here — operators trigger it from the S11
794/// component on demand.
795fn spawn_pins_poller(
796    client: Arc<ApiClient>,
797    tx: watch::Sender<PinsSnapshot>,
798    cancel: CancellationToken,
799    interval: Duration,
800) {
801    tokio::spawn(async move {
802        let mut tick = tokio::time::interval(interval);
803        tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
804        loop {
805            tokio::select! {
806                _ = cancel.cancelled() => break,
807                _ = tick.tick() => {
808                    let snap = collect_pins(&client).await;
809                    if tx.send(snap).is_err() {
810                        break;
811                    }
812                }
813            }
814        }
815    });
816}
817
818async fn collect_pins(client: &ApiClient) -> PinsSnapshot {
819    match client.bee().api().list_pins().await {
820        Ok(pins) => PinsSnapshot {
821            pins,
822            last_error: None,
823            last_update: Some(Instant::now()),
824        },
825        Err(e) => PinsSnapshot {
826            pins: Vec::new(),
827            last_error: Some(format!("pins: {e}")),
828            last_update: Some(Instant::now()),
829        },
830    }
831}
832
833async fn collect_health(client: &ApiClient) -> HealthSnapshot {
834    let bee = client.bee();
835
836    // Time the cheap /health probe alongside the rest so the header
837    // bar can show a single representative latency.
838    let ping_start = Instant::now();
839    let health_ok = bee.debug().health().await.is_ok();
840    let last_ping = health_ok.then(|| ping_start.elapsed());
841
842    let status = bee.debug().status().await;
843    let chain_state = bee.debug().chain_state().await;
844    let wallet = bee.debug().wallet().await;
845    let redistribution = bee.debug().redistribution_state().await;
846
847    let mut snap = HealthSnapshot {
848        last_ping,
849        last_update: Some(Instant::now()),
850        ..Default::default()
851    };
852    let mut errors: Vec<String> = Vec::new();
853    match status {
854        Ok(s) => snap.status = Some(s),
855        Err(e) => errors.push(format!("status: {e}")),
856    }
857    match chain_state {
858        Ok(c) => snap.chain_state = Some(c),
859        Err(e) => errors.push(format!("chainstate: {e}")),
860    }
861    match wallet {
862        Ok(w) => snap.wallet = Some(w),
863        Err(e) => errors.push(format!("wallet: {e}")),
864    }
865    match redistribution {
866        Ok(r) => snap.redistribution = Some(r),
867        Err(e) => errors.push(format!("redistributionstate: {e}")),
868    }
869    if !errors.is_empty() {
870        snap.last_error = Some(errors.join("; "));
871    }
872    snap
873}
874
875#[cfg(test)]
876mod tests {
877    use super::*;
878
879    #[test]
880    fn fully_loaded_default_is_false() {
881        assert!(!HealthSnapshot::default().is_fully_loaded());
882    }
883
884    #[test]
885    fn fully_loaded_requires_no_error_and_all_fields() {
886        // ChainState and Wallet don't implement Default; build empty
887        // instances via JSON to keep the test self-contained.
888        let snap = HealthSnapshot {
889            status: Some(Status::default()),
890            chain_state: Some(serde_json::from_str(r#"{"block":0,"chainTip":0}"#).unwrap()),
891            wallet: Some(
892                serde_json::from_str(
893                    r#"{"chainID":1,"walletAddress":"0x0000000000000000000000000000000000000000"}"#,
894                )
895                .unwrap(),
896            ),
897            redistribution: Some(RedistributionState::default()),
898            ..Default::default()
899        };
900        assert!(snap.is_fully_loaded());
901        let mut bad = snap;
902        bad.last_error = Some("boom".into());
903        assert!(!bad.is_fully_loaded());
904    }
905}