1#![allow(dead_code)] use std::sync::Arc;
19use std::time::{Duration, Instant};
20
21use bee::api::Tag;
22use bee::debug::{
23 Addresses, ChainState, ChequebookBalance, LastCheque, RedistributionState, Settlements, Status,
24 Topology, TransactionInfo, Wallet,
25};
26use bee::postage::PostageBatch;
27use num_bigint::BigInt;
28use tokio::sync::watch;
29use tokio_util::sync::CancellationToken;
30
31use crate::api::ApiClient;
32
/// Latest result of the health poll, as published on the health watch channel.
///
/// The `Default` value (everything `None`) seeds the channel; later values
/// are produced by `collect_health`.
#[derive(Clone, Debug, Default)]
pub struct HealthSnapshot {
    /// Result of the `status` debug request, when it succeeded.
    pub status: Option<Status>,
    /// Result of the `chain_state` debug request, when it succeeded.
    pub chain_state: Option<ChainState>,
    /// Result of the `wallet` debug request, when it succeeded.
    pub wallet: Option<Wallet>,
    /// Result of the `redistribution_state` debug request, when it succeeded.
    pub redistribution: Option<RedistributionState>,
    /// Time the `health` probe took; `None` when the probe failed.
    pub last_ping: Option<Duration>,
    /// Messages of the failed data requests joined with `"; "`; `None` when
    /// all of them succeeded. A failed `health` probe is not recorded here.
    pub last_error: Option<String>,
    /// When the snapshot was assembled; `None` only in the default value.
    pub last_update: Option<Instant>,
}
52
53impl HealthSnapshot {
54 pub fn is_fully_loaded(&self) -> bool {
57 self.last_error.is_none()
58 && self.status.is_some()
59 && self.chain_state.is_some()
60 && self.wallet.is_some()
61 && self.redistribution.is_some()
62 }
63}
64
/// Latest result of the postage-batch poll (see `collect_stamps`).
#[derive(Clone, Debug, Default)]
pub struct StampsSnapshot {
    /// Batches returned by the last poll; empty when that poll failed.
    pub batches: Vec<PostageBatch>,
    /// `"stamps: <error>"` when the last poll failed, otherwise `None`.
    pub last_error: Option<String>,
    /// When the snapshot was assembled; `None` only in the default value.
    pub last_update: Option<Instant>,
}
74
75impl StampsSnapshot {
76 pub fn is_loaded(&self) -> bool {
77 self.last_update.is_some() && self.last_error.is_none()
78 }
79}
80
/// Latest result of the SWAP poll (see `collect_swap`), which issues four
/// independent debug requests and keeps whatever succeeded.
#[derive(Clone, Debug, Default)]
pub struct SwapSnapshot {
    /// Result of the `chequebook_balance` request, when it succeeded.
    pub chequebook: Option<ChequebookBalance>,
    /// Result of the `settlements` request, when it succeeded.
    pub settlements: Option<Settlements>,
    /// Result of the `time_settlements` request, when it succeeded.
    pub time_settlements: Option<Settlements>,
    /// Result of the `last_cheques` request; empty when that request failed.
    pub last_received: Vec<LastCheque>,
    /// Messages of the failed requests joined with `"; "`; `None` when all
    /// four succeeded.
    pub last_error: Option<String>,
    /// When the snapshot was assembled; `None` only in the default value.
    pub last_update: Option<Instant>,
}
94
95impl SwapSnapshot {
96 pub fn is_loaded(&self) -> bool {
97 self.last_update.is_some() && self.last_error.is_none()
98 }
99}
100
/// Latest result of the tags poll (see `collect_tags`).
#[derive(Clone, Debug, Default)]
pub struct TagsSnapshot {
    /// Tags returned by the last poll; empty when that poll failed.
    pub tags: Vec<Tag>,
    /// `"tags: <error>"` when the last poll failed, otherwise `None`.
    pub last_error: Option<String>,
    /// When the snapshot was assembled; `None` only in the default value.
    pub last_update: Option<Instant>,
}
113
114impl TagsSnapshot {
115 pub fn is_loaded(&self) -> bool {
116 self.last_update.is_some() && self.last_error.is_none()
117 }
118}
119
/// Latest result of the pending-transactions poll (see `collect_transactions`).
#[derive(Clone, Debug, Default)]
pub struct TransactionsSnapshot {
    /// Pending transactions returned by the last poll; empty when it failed.
    pub pending: Vec<TransactionInfo>,
    /// `"transactions: <error>"` when the last poll failed, otherwise `None`.
    pub last_error: Option<String>,
    /// When the snapshot was assembled; `None` only in the default value.
    pub last_update: Option<Instant>,
}
131
132impl TransactionsSnapshot {
133 pub fn is_loaded(&self) -> bool {
134 self.last_update.is_some() && self.last_error.is_none()
135 }
136}
137
/// Latest result of the addresses poll (see `collect_network`).
#[derive(Clone, Debug, Default)]
pub struct NetworkSnapshot {
    /// Result of the `addresses` debug request; `None` when it failed.
    pub addresses: Option<Addresses>,
    /// `"addresses: <error>"` when the last poll failed, otherwise `None`.
    pub last_error: Option<String>,
    /// When the snapshot was assembled; `None` only in the default value.
    pub last_update: Option<Instant>,
}
148
149impl NetworkSnapshot {
150 pub fn is_loaded(&self) -> bool {
151 self.addresses.is_some() && self.last_error.is_none()
152 }
153}
154
/// Latest result of the topology poll (see `collect_topology`).
#[derive(Clone, Debug, Default)]
pub struct TopologySnapshot {
    /// Result of the `topology` debug request; `None` when it failed.
    pub topology: Option<Topology>,
    /// `"topology: <error>"` when the last poll failed, otherwise `None`.
    pub last_error: Option<String>,
    /// When the snapshot was assembled; `None` only in the default value.
    pub last_update: Option<Instant>,
}
165
166impl TopologySnapshot {
167 pub fn is_loaded(&self) -> bool {
168 self.topology.is_some() && self.last_error.is_none()
169 }
170}
171
/// Latest result of the stake poll (see `collect_lottery`).
#[derive(Clone, Debug, Default)]
pub struct LotterySnapshot {
    /// Value returned by the `stake` debug request; `None` when it failed.
    pub staked: Option<BigInt>,
    /// `"stake: <error>"` when the last poll failed, otherwise `None`.
    pub last_error: Option<String>,
    /// When the snapshot was assembled; `None` only in the default value.
    pub last_update: Option<Instant>,
}
184
185impl LotterySnapshot {
186 pub fn is_loaded(&self) -> bool {
187 self.last_update.is_some() && self.last_error.is_none()
188 }
189}
190
/// Handle to the background pollers started by `BeeWatch::start`.
///
/// Holds one watch receiver per polled resource plus the cancellation token
/// shared by all pollers. Cloning the handle clones the receivers and the
/// token.
#[derive(Clone, Debug)]
pub struct BeeWatch {
    /// Receiver for health snapshots.
    health_rx: watch::Receiver<HealthSnapshot>,
    /// Receiver for postage-batch snapshots.
    stamps_rx: watch::Receiver<StampsSnapshot>,
    /// Receiver for SWAP snapshots.
    swap_rx: watch::Receiver<SwapSnapshot>,
    /// Receiver for stake snapshots.
    lottery_rx: watch::Receiver<LotterySnapshot>,
    /// Receiver for topology snapshots.
    topology_rx: watch::Receiver<TopologySnapshot>,
    /// Receiver for address snapshots.
    network_rx: watch::Receiver<NetworkSnapshot>,
    /// Receiver for pending-transaction snapshots.
    transactions_rx: watch::Receiver<TransactionsSnapshot>,
    /// Receiver for tag snapshots.
    tags_rx: watch::Receiver<TagsSnapshot>,
    /// Child of the token passed to `start`; cancelled by `shutdown`.
    cancel: CancellationToken,
}
206
207impl BeeWatch {
208 pub fn start(client: Arc<ApiClient>, parent_cancel: &CancellationToken) -> Self {
212 let cancel = parent_cancel.child_token();
213 let (health_tx, health_rx) = watch::channel(HealthSnapshot::default());
214 spawn_health_poller(
215 client.clone(),
216 health_tx,
217 cancel.clone(),
218 Duration::from_secs(2),
219 );
220 let (stamps_tx, stamps_rx) = watch::channel(StampsSnapshot::default());
221 spawn_stamps_poller(
222 client.clone(),
223 stamps_tx,
224 cancel.clone(),
225 Duration::from_secs(10),
226 );
227 let (swap_tx, swap_rx) = watch::channel(SwapSnapshot::default());
228 spawn_swap_poller(
229 client.clone(),
230 swap_tx,
231 cancel.clone(),
232 Duration::from_secs(30),
233 );
234 let (lottery_tx, lottery_rx) = watch::channel(LotterySnapshot::default());
235 spawn_lottery_poller(
236 client.clone(),
237 lottery_tx,
238 cancel.clone(),
239 Duration::from_secs(30),
240 );
241 let (topology_tx, topology_rx) = watch::channel(TopologySnapshot::default());
242 spawn_topology_poller(
243 client.clone(),
244 topology_tx,
245 cancel.clone(),
246 Duration::from_secs(5),
247 );
248 let (network_tx, network_rx) = watch::channel(NetworkSnapshot::default());
249 spawn_network_poller(
250 client.clone(),
251 network_tx,
252 cancel.clone(),
253 Duration::from_secs(60),
254 );
255 let (transactions_tx, transactions_rx) =
256 watch::channel(TransactionsSnapshot::default());
257 spawn_transactions_poller(
258 client.clone(),
259 transactions_tx,
260 cancel.clone(),
261 Duration::from_secs(30),
262 );
263 let (tags_tx, tags_rx) = watch::channel(TagsSnapshot::default());
264 spawn_tags_poller(client, tags_tx, cancel.clone(), Duration::from_secs(5));
265 Self {
266 health_rx,
267 stamps_rx,
268 swap_rx,
269 lottery_rx,
270 topology_rx,
271 network_rx,
272 transactions_rx,
273 tags_rx,
274 cancel,
275 }
276 }
277
278 pub fn health(&self) -> watch::Receiver<HealthSnapshot> {
281 self.health_rx.clone()
282 }
283
284 pub fn stamps(&self) -> watch::Receiver<StampsSnapshot> {
286 self.stamps_rx.clone()
287 }
288
289 pub fn swap(&self) -> watch::Receiver<SwapSnapshot> {
291 self.swap_rx.clone()
292 }
293
294 pub fn lottery(&self) -> watch::Receiver<LotterySnapshot> {
296 self.lottery_rx.clone()
297 }
298
299 pub fn topology(&self) -> watch::Receiver<TopologySnapshot> {
301 self.topology_rx.clone()
302 }
303
304 pub fn network(&self) -> watch::Receiver<NetworkSnapshot> {
306 self.network_rx.clone()
307 }
308
309 pub fn transactions(&self) -> watch::Receiver<TransactionsSnapshot> {
312 self.transactions_rx.clone()
313 }
314
315 pub fn tags(&self) -> watch::Receiver<TagsSnapshot> {
317 self.tags_rx.clone()
318 }
319
320 pub fn shutdown(&self) {
322 self.cancel.cancel();
323 }
324}
325
326fn spawn_health_poller(
329 client: Arc<ApiClient>,
330 tx: watch::Sender<HealthSnapshot>,
331 cancel: CancellationToken,
332 interval: Duration,
333) {
334 tokio::spawn(async move {
335 let mut tick = tokio::time::interval(interval);
336 tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
337 loop {
338 tokio::select! {
339 _ = cancel.cancelled() => break,
340 _ = tick.tick() => {
341 let snap = collect_health(&client).await;
342 if tx.send(snap).is_err() {
343 break; }
345 }
346 }
347 }
348 });
349}
350
351fn spawn_stamps_poller(
354 client: Arc<ApiClient>,
355 tx: watch::Sender<StampsSnapshot>,
356 cancel: CancellationToken,
357 interval: Duration,
358) {
359 tokio::spawn(async move {
360 let mut tick = tokio::time::interval(interval);
361 tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
362 loop {
363 tokio::select! {
364 _ = cancel.cancelled() => break,
365 _ = tick.tick() => {
366 let snap = collect_stamps(&client).await;
367 if tx.send(snap).is_err() {
368 break;
369 }
370 }
371 }
372 }
373 });
374}
375
376async fn collect_stamps(client: &ApiClient) -> StampsSnapshot {
377 match client.bee().postage().get_postage_batches().await {
378 Ok(batches) => StampsSnapshot {
379 batches,
380 last_error: None,
381 last_update: Some(Instant::now()),
382 },
383 Err(e) => StampsSnapshot {
384 batches: Vec::new(),
385 last_error: Some(format!("stamps: {e}")),
386 last_update: Some(Instant::now()),
387 },
388 }
389}
390
391fn spawn_swap_poller(
394 client: Arc<ApiClient>,
395 tx: watch::Sender<SwapSnapshot>,
396 cancel: CancellationToken,
397 interval: Duration,
398) {
399 tokio::spawn(async move {
400 let mut tick = tokio::time::interval(interval);
401 tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
402 loop {
403 tokio::select! {
404 _ = cancel.cancelled() => break,
405 _ = tick.tick() => {
406 let snap = collect_swap(&client).await;
407 if tx.send(snap).is_err() {
408 break;
409 }
410 }
411 }
412 }
413 });
414}
415
416async fn collect_swap(client: &ApiClient) -> SwapSnapshot {
417 let bee = client.bee();
418 let chequebook = bee.debug().chequebook_balance().await;
419 let settlements = bee.debug().settlements().await;
420 let time_settlements = bee.debug().time_settlements().await;
421 let last_received = bee.debug().last_cheques().await;
422
423 let mut snap = SwapSnapshot {
424 last_update: Some(Instant::now()),
425 ..Default::default()
426 };
427 let mut errors: Vec<String> = Vec::new();
428 match chequebook {
429 Ok(c) => snap.chequebook = Some(c),
430 Err(e) => errors.push(format!("chequebook: {e}")),
431 }
432 match settlements {
433 Ok(s) => snap.settlements = Some(s),
434 Err(e) => errors.push(format!("settlements: {e}")),
435 }
436 match time_settlements {
437 Ok(s) => snap.time_settlements = Some(s),
438 Err(e) => errors.push(format!("timesettlements: {e}")),
439 }
440 match last_received {
441 Ok(v) => snap.last_received = v,
442 Err(e) => errors.push(format!("cheques: {e}")),
443 }
444 if !errors.is_empty() {
445 snap.last_error = Some(errors.join("; "));
446 }
447 snap
448}
449
450fn spawn_lottery_poller(
453 client: Arc<ApiClient>,
454 tx: watch::Sender<LotterySnapshot>,
455 cancel: CancellationToken,
456 interval: Duration,
457) {
458 tokio::spawn(async move {
459 let mut tick = tokio::time::interval(interval);
460 tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
461 loop {
462 tokio::select! {
463 _ = cancel.cancelled() => break,
464 _ = tick.tick() => {
465 let snap = collect_lottery(&client).await;
466 if tx.send(snap).is_err() {
467 break;
468 }
469 }
470 }
471 }
472 });
473}
474
475async fn collect_lottery(client: &ApiClient) -> LotterySnapshot {
476 match client.bee().debug().stake().await {
477 Ok(staked) => LotterySnapshot {
478 staked: Some(staked),
479 last_error: None,
480 last_update: Some(Instant::now()),
481 },
482 Err(e) => LotterySnapshot {
483 staked: None,
484 last_error: Some(format!("stake: {e}")),
485 last_update: Some(Instant::now()),
486 },
487 }
488}
489
490fn spawn_topology_poller(
493 client: Arc<ApiClient>,
494 tx: watch::Sender<TopologySnapshot>,
495 cancel: CancellationToken,
496 interval: Duration,
497) {
498 tokio::spawn(async move {
499 let mut tick = tokio::time::interval(interval);
500 tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
501 loop {
502 tokio::select! {
503 _ = cancel.cancelled() => break,
504 _ = tick.tick() => {
505 let snap = collect_topology(&client).await;
506 if tx.send(snap).is_err() {
507 break;
508 }
509 }
510 }
511 }
512 });
513}
514
515async fn collect_topology(client: &ApiClient) -> TopologySnapshot {
516 match client.bee().debug().topology().await {
517 Ok(topology) => TopologySnapshot {
518 topology: Some(topology),
519 last_error: None,
520 last_update: Some(Instant::now()),
521 },
522 Err(e) => TopologySnapshot {
523 topology: None,
524 last_error: Some(format!("topology: {e}")),
525 last_update: Some(Instant::now()),
526 },
527 }
528}
529
530fn spawn_network_poller(
533 client: Arc<ApiClient>,
534 tx: watch::Sender<NetworkSnapshot>,
535 cancel: CancellationToken,
536 interval: Duration,
537) {
538 tokio::spawn(async move {
539 let mut tick = tokio::time::interval(interval);
540 tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
541 loop {
542 tokio::select! {
543 _ = cancel.cancelled() => break,
544 _ = tick.tick() => {
545 let snap = collect_network(&client).await;
546 if tx.send(snap).is_err() {
547 break;
548 }
549 }
550 }
551 }
552 });
553}
554
555async fn collect_network(client: &ApiClient) -> NetworkSnapshot {
556 match client.bee().debug().addresses().await {
557 Ok(addresses) => NetworkSnapshot {
558 addresses: Some(addresses),
559 last_error: None,
560 last_update: Some(Instant::now()),
561 },
562 Err(e) => NetworkSnapshot {
563 addresses: None,
564 last_error: Some(format!("addresses: {e}")),
565 last_update: Some(Instant::now()),
566 },
567 }
568}
569
570fn spawn_transactions_poller(
573 client: Arc<ApiClient>,
574 tx: watch::Sender<TransactionsSnapshot>,
575 cancel: CancellationToken,
576 interval: Duration,
577) {
578 tokio::spawn(async move {
579 let mut tick = tokio::time::interval(interval);
580 tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
581 loop {
582 tokio::select! {
583 _ = cancel.cancelled() => break,
584 _ = tick.tick() => {
585 let snap = collect_transactions(&client).await;
586 if tx.send(snap).is_err() {
587 break;
588 }
589 }
590 }
591 }
592 });
593}
594
595async fn collect_transactions(client: &ApiClient) -> TransactionsSnapshot {
596 match client.bee().debug().pending_transactions().await {
597 Ok(pending) => TransactionsSnapshot {
598 pending,
599 last_error: None,
600 last_update: Some(Instant::now()),
601 },
602 Err(e) => TransactionsSnapshot {
603 pending: Vec::new(),
604 last_error: Some(format!("transactions: {e}")),
605 last_update: Some(Instant::now()),
606 },
607 }
608}
609
610fn spawn_tags_poller(
613 client: Arc<ApiClient>,
614 tx: watch::Sender<TagsSnapshot>,
615 cancel: CancellationToken,
616 interval: Duration,
617) {
618 tokio::spawn(async move {
619 let mut tick = tokio::time::interval(interval);
620 tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
621 loop {
622 tokio::select! {
623 _ = cancel.cancelled() => break,
624 _ = tick.tick() => {
625 let snap = collect_tags(&client).await;
626 if tx.send(snap).is_err() {
627 break;
628 }
629 }
630 }
631 }
632 });
633}
634
635async fn collect_tags(client: &ApiClient) -> TagsSnapshot {
636 match client.bee().api().list_tags(None, None).await {
637 Ok(tags) => TagsSnapshot {
638 tags,
639 last_error: None,
640 last_update: Some(Instant::now()),
641 },
642 Err(e) => TagsSnapshot {
643 tags: Vec::new(),
644 last_error: Some(format!("tags: {e}")),
645 last_update: Some(Instant::now()),
646 },
647 }
648}
649
650async fn collect_health(client: &ApiClient) -> HealthSnapshot {
651 let bee = client.bee();
652
653 let ping_start = Instant::now();
656 let health_ok = bee.debug().health().await.is_ok();
657 let last_ping = health_ok.then(|| ping_start.elapsed());
658
659 let status = bee.debug().status().await;
660 let chain_state = bee.debug().chain_state().await;
661 let wallet = bee.debug().wallet().await;
662 let redistribution = bee.debug().redistribution_state().await;
663
664 let mut snap = HealthSnapshot {
665 last_ping,
666 last_update: Some(Instant::now()),
667 ..Default::default()
668 };
669 let mut errors: Vec<String> = Vec::new();
670 match status {
671 Ok(s) => snap.status = Some(s),
672 Err(e) => errors.push(format!("status: {e}")),
673 }
674 match chain_state {
675 Ok(c) => snap.chain_state = Some(c),
676 Err(e) => errors.push(format!("chainstate: {e}")),
677 }
678 match wallet {
679 Ok(w) => snap.wallet = Some(w),
680 Err(e) => errors.push(format!("wallet: {e}")),
681 }
682 match redistribution {
683 Ok(r) => snap.redistribution = Some(r),
684 Err(e) => errors.push(format!("redistributionstate: {e}")),
685 }
686 if !errors.is_empty() {
687 snap.last_error = Some(errors.join("; "));
688 }
689 snap
690}
691
#[cfg(test)]
mod tests {
    use super::*;

    /// A default snapshot carries no data, so it must not count as loaded.
    #[test]
    fn fully_loaded_default_is_false() {
        let empty = HealthSnapshot::default();
        assert!(!empty.is_fully_loaded());
    }

    /// With all four data fields present and no error the snapshot is
    /// loaded; recording an error flips the result.
    #[test]
    fn fully_loaded_requires_no_error_and_all_fields() {
        let chain_state: ChainState =
            serde_json::from_str(r#"{"block":0,"chainTip":0}"#).unwrap();
        let wallet: Wallet = serde_json::from_str(
            r#"{"chainID":1,"walletAddress":"0x0000000000000000000000000000000000000000"}"#,
        )
        .unwrap();

        let complete = HealthSnapshot {
            status: Some(Status::default()),
            chain_state: Some(chain_state),
            wallet: Some(wallet),
            redistribution: Some(RedistributionState::default()),
            ..Default::default()
        };
        assert!(complete.is_fully_loaded());

        let errored = HealthSnapshot {
            last_error: Some("boom".into()),
            ..complete
        };
        assert!(!errored.is_fully_loaded());
    }
}