1#![allow(dead_code)] use std::sync::Arc;
19use std::time::{Duration, Instant};
20
21use bee::debug::{ChainState, RedistributionState, Status, Wallet};
22use bee::postage::PostageBatch;
23use tokio::sync::watch;
24use tokio_util::sync::CancellationToken;
25
26use crate::api::ApiClient;
27
/// Point-in-time view of the node's health as assembled by `collect_health`.
///
/// Every field defaults to `None` (via `Default`), which is also the value the
/// health watch channel is seeded with before the first poll completes.
#[derive(Clone, Debug, Default)]
pub struct HealthSnapshot {
    /// Payload of the debug `status()` call; `None` when that call failed.
    pub status: Option<Status>,
    /// Payload of the debug `chain_state()` call; `None` when that call failed.
    pub chain_state: Option<ChainState>,
    /// Payload of the debug `wallet()` call; `None` when that call failed.
    pub wallet: Option<Wallet>,
    /// Payload of the debug `redistribution_state()` call; `None` when that call failed.
    pub redistribution: Option<RedistributionState>,
    /// Time the debug `health()` call took; `None` when that call failed.
    pub last_ping: Option<Duration>,
    /// Failures of the status / chain-state / wallet / redistribution calls,
    /// each formatted as `"<label>: <error>"` and joined with `"; "`.
    /// `None` when all four succeeded.
    pub last_error: Option<String>,
    /// When the snapshot was assembled.
    pub last_update: Option<Instant>,
}
47
48impl HealthSnapshot {
49 pub fn is_fully_loaded(&self) -> bool {
52 self.last_error.is_none()
53 && self.status.is_some()
54 && self.chain_state.is_some()
55 && self.wallet.is_some()
56 && self.redistribution.is_some()
57 }
58}
59
/// Point-in-time view of the node's postage batches as assembled by
/// `collect_stamps`.
///
/// The `Default` value (empty list, no error, no update time) is what the
/// stamps watch channel is seeded with before the first poll completes.
#[derive(Clone, Debug, Default)]
pub struct StampsSnapshot {
    /// Batches returned by `get_postage_batches()`; empty when the call failed.
    pub batches: Vec<PostageBatch>,
    /// `"stamps: <error>"` when the call failed, `None` when it succeeded.
    pub last_error: Option<String>,
    /// When the poll finished; set on success and on failure alike.
    pub last_update: Option<Instant>,
}
69
70impl StampsSnapshot {
71 pub fn is_loaded(&self) -> bool {
72 self.last_update.is_some() && self.last_error.is_none()
73 }
74}
75
/// Handle to the background pollers started by `BeeWatch::start`.
///
/// It keeps one receiver per polled snapshot plus the cancellation token the
/// pollers were spawned with.
#[derive(Clone, Debug)]
pub struct BeeWatch {
    /// Receiving side of the health snapshot channel.
    health_rx: watch::Receiver<HealthSnapshot>,
    /// Receiving side of the postage-batch snapshot channel.
    stamps_rx: watch::Receiver<StampsSnapshot>,
    /// Child of the token passed to `start`; cancelled by `shutdown`.
    cancel: CancellationToken,
}
84
85impl BeeWatch {
86 pub fn start(client: Arc<ApiClient>, parent_cancel: &CancellationToken) -> Self {
90 let cancel = parent_cancel.child_token();
91 let (health_tx, health_rx) = watch::channel(HealthSnapshot::default());
92 spawn_health_poller(
93 client.clone(),
94 health_tx,
95 cancel.clone(),
96 Duration::from_secs(2),
97 );
98 let (stamps_tx, stamps_rx) = watch::channel(StampsSnapshot::default());
99 spawn_stamps_poller(client, stamps_tx, cancel.clone(), Duration::from_secs(10));
100 Self {
101 health_rx,
102 stamps_rx,
103 cancel,
104 }
105 }
106
107 pub fn health(&self) -> watch::Receiver<HealthSnapshot> {
110 self.health_rx.clone()
111 }
112
113 pub fn stamps(&self) -> watch::Receiver<StampsSnapshot> {
115 self.stamps_rx.clone()
116 }
117
118 pub fn shutdown(&self) {
120 self.cancel.cancel();
121 }
122}
123
124fn spawn_health_poller(
127 client: Arc<ApiClient>,
128 tx: watch::Sender<HealthSnapshot>,
129 cancel: CancellationToken,
130 interval: Duration,
131) {
132 tokio::spawn(async move {
133 let mut tick = tokio::time::interval(interval);
134 tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
135 loop {
136 tokio::select! {
137 _ = cancel.cancelled() => break,
138 _ = tick.tick() => {
139 let snap = collect_health(&client).await;
140 if tx.send(snap).is_err() {
141 break; }
143 }
144 }
145 }
146 });
147}
148
149fn spawn_stamps_poller(
152 client: Arc<ApiClient>,
153 tx: watch::Sender<StampsSnapshot>,
154 cancel: CancellationToken,
155 interval: Duration,
156) {
157 tokio::spawn(async move {
158 let mut tick = tokio::time::interval(interval);
159 tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
160 loop {
161 tokio::select! {
162 _ = cancel.cancelled() => break,
163 _ = tick.tick() => {
164 let snap = collect_stamps(&client).await;
165 if tx.send(snap).is_err() {
166 break;
167 }
168 }
169 }
170 }
171 });
172}
173
174async fn collect_stamps(client: &ApiClient) -> StampsSnapshot {
175 match client.bee().postage().get_postage_batches().await {
176 Ok(batches) => StampsSnapshot {
177 batches,
178 last_error: None,
179 last_update: Some(Instant::now()),
180 },
181 Err(e) => StampsSnapshot {
182 batches: Vec::new(),
183 last_error: Some(format!("stamps: {e}")),
184 last_update: Some(Instant::now()),
185 },
186 }
187}
188
189async fn collect_health(client: &ApiClient) -> HealthSnapshot {
190 let bee = client.bee();
191
192 let ping_start = Instant::now();
195 let health_ok = bee.debug().health().await.is_ok();
196 let last_ping = health_ok.then(|| ping_start.elapsed());
197
198 let status = bee.debug().status().await;
199 let chain_state = bee.debug().chain_state().await;
200 let wallet = bee.debug().wallet().await;
201 let redistribution = bee.debug().redistribution_state().await;
202
203 let mut snap = HealthSnapshot {
204 last_ping,
205 last_update: Some(Instant::now()),
206 ..Default::default()
207 };
208 let mut errors: Vec<String> = Vec::new();
209 match status {
210 Ok(s) => snap.status = Some(s),
211 Err(e) => errors.push(format!("status: {e}")),
212 }
213 match chain_state {
214 Ok(c) => snap.chain_state = Some(c),
215 Err(e) => errors.push(format!("chainstate: {e}")),
216 }
217 match wallet {
218 Ok(w) => snap.wallet = Some(w),
219 Err(e) => errors.push(format!("wallet: {e}")),
220 }
221 match redistribution {
222 Ok(r) => snap.redistribution = Some(r),
223 Err(e) => errors.push(format!("redistributionstate: {e}")),
224 }
225 if !errors.is_empty() {
226 snap.last_error = Some(errors.join("; "));
227 }
228 snap
229}
230
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a snapshot with all four payloads present and no error recorded.
    fn populated_snapshot() -> HealthSnapshot {
        let chain_state: ChainState =
            serde_json::from_str(r#"{"block":0,"chainTip":0}"#).unwrap();
        let wallet: Wallet = serde_json::from_str(
            r#"{"chainID":1,"walletAddress":"0x0000000000000000000000000000000000000000"}"#,
        )
        .unwrap();

        HealthSnapshot {
            status: Some(Status::default()),
            chain_state: Some(chain_state),
            wallet: Some(wallet),
            redistribution: Some(RedistributionState::default()),
            ..Default::default()
        }
    }

    /// A default snapshot has no payloads, so it must not count as loaded.
    #[test]
    fn fully_loaded_default_is_false() {
        let empty = HealthSnapshot::default();
        assert!(!empty.is_fully_loaded());
    }

    /// All payloads present counts as loaded; adding an error revokes that.
    #[test]
    fn fully_loaded_requires_no_error_and_all_fields() {
        let good = populated_snapshot();
        assert!(good.is_fully_loaded());

        let bad = HealthSnapshot {
            last_error: Some("boom".into()),
            ..good
        };
        assert!(!bad.is_fully_loaded());
    }
}