1#![allow(dead_code)] use std::sync::Arc;
19use std::time::{Duration, Instant};
20
21use bee::api::Tag;
22use bee::debug::{
23 Addresses, ChainState, ChequebookBalance, LastCheque, RedistributionState, Settlements, Status,
24 Topology, TransactionInfo, Wallet,
25};
26use bee::postage::PostageBatch;
27use num_bigint::BigInt;
28use tokio::sync::watch;
29use tokio_util::sync::CancellationToken;
30
31use crate::api::ApiClient;
32
33#[derive(Clone, Debug, Default)]
37pub struct HealthSnapshot {
38 pub status: Option<Status>,
39 pub chain_state: Option<ChainState>,
40 pub wallet: Option<Wallet>,
41 pub redistribution: Option<RedistributionState>,
42 pub last_ping: Option<Duration>,
45 pub last_error: Option<String>,
48 pub last_update: Option<Instant>,
51}
52
53impl HealthSnapshot {
54 pub fn is_fully_loaded(&self) -> bool {
57 self.last_error.is_none()
58 && self.status.is_some()
59 && self.chain_state.is_some()
60 && self.wallet.is_some()
61 && self.redistribution.is_some()
62 }
63}
64
65#[derive(Clone, Debug, Default)]
69pub struct StampsSnapshot {
70 pub batches: Vec<PostageBatch>,
71 pub last_error: Option<String>,
72 pub last_update: Option<Instant>,
73}
74
75impl StampsSnapshot {
76 pub fn is_loaded(&self) -> bool {
77 self.last_update.is_some() && self.last_error.is_none()
78 }
79}
80
81#[derive(Clone, Debug, Default)]
85pub struct SwapSnapshot {
86 pub chequebook: Option<ChequebookBalance>,
87 pub chequebook_address: Option<String>,
92 pub settlements: Option<Settlements>,
93 pub time_settlements: Option<Settlements>,
94 pub last_received: Vec<LastCheque>,
96 pub last_error: Option<String>,
97 pub last_update: Option<Instant>,
98}
99
100impl SwapSnapshot {
101 pub fn is_loaded(&self) -> bool {
102 self.last_update.is_some() && self.last_error.is_none()
103 }
104}
105
106#[derive(Clone, Debug, Default)]
113pub struct TagsSnapshot {
114 pub tags: Vec<Tag>,
115 pub last_error: Option<String>,
116 pub last_update: Option<Instant>,
117}
118
119impl TagsSnapshot {
120 pub fn is_loaded(&self) -> bool {
121 self.last_update.is_some() && self.last_error.is_none()
122 }
123}
124
125#[derive(Clone, Debug, Default)]
131pub struct TransactionsSnapshot {
132 pub pending: Vec<TransactionInfo>,
133 pub last_error: Option<String>,
134 pub last_update: Option<Instant>,
135}
136
137impl TransactionsSnapshot {
138 pub fn is_loaded(&self) -> bool {
139 self.last_update.is_some() && self.last_error.is_none()
140 }
141}
142
143#[derive(Clone, Debug, Default)]
148pub struct NetworkSnapshot {
149 pub addresses: Option<Addresses>,
150 pub last_error: Option<String>,
151 pub last_update: Option<Instant>,
152}
153
154impl NetworkSnapshot {
155 pub fn is_loaded(&self) -> bool {
156 self.addresses.is_some() && self.last_error.is_none()
157 }
158}
159
160#[derive(Clone, Debug, Default)]
165pub struct TopologySnapshot {
166 pub topology: Option<Topology>,
167 pub last_error: Option<String>,
168 pub last_update: Option<Instant>,
169}
170
171impl TopologySnapshot {
172 pub fn is_loaded(&self) -> bool {
173 self.topology.is_some() && self.last_error.is_none()
174 }
175}
176
177#[derive(Clone, Debug, Default)]
183pub struct LotterySnapshot {
184 pub staked: Option<BigInt>,
186 pub last_error: Option<String>,
187 pub last_update: Option<Instant>,
188}
189
190impl LotterySnapshot {
191 pub fn is_loaded(&self) -> bool {
192 self.last_update.is_some() && self.last_error.is_none()
193 }
194}
195
196#[derive(Clone, Debug)]
200pub struct BeeWatch {
201 health_rx: watch::Receiver<HealthSnapshot>,
202 stamps_rx: watch::Receiver<StampsSnapshot>,
203 swap_rx: watch::Receiver<SwapSnapshot>,
204 lottery_rx: watch::Receiver<LotterySnapshot>,
205 topology_rx: watch::Receiver<TopologySnapshot>,
206 network_rx: watch::Receiver<NetworkSnapshot>,
207 transactions_rx: watch::Receiver<TransactionsSnapshot>,
208 tags_rx: watch::Receiver<TagsSnapshot>,
209 cancel: CancellationToken,
210}
211
212impl BeeWatch {
213 pub fn start(client: Arc<ApiClient>, parent_cancel: &CancellationToken) -> Self {
217 let cancel = parent_cancel.child_token();
218 let (health_tx, health_rx) = watch::channel(HealthSnapshot::default());
219 spawn_health_poller(
220 client.clone(),
221 health_tx,
222 cancel.clone(),
223 Duration::from_secs(2),
224 );
225 let (stamps_tx, stamps_rx) = watch::channel(StampsSnapshot::default());
226 spawn_stamps_poller(
227 client.clone(),
228 stamps_tx,
229 cancel.clone(),
230 Duration::from_secs(10),
231 );
232 let (swap_tx, swap_rx) = watch::channel(SwapSnapshot::default());
233 spawn_swap_poller(
234 client.clone(),
235 swap_tx,
236 cancel.clone(),
237 Duration::from_secs(30),
238 );
239 let (lottery_tx, lottery_rx) = watch::channel(LotterySnapshot::default());
240 spawn_lottery_poller(
241 client.clone(),
242 lottery_tx,
243 cancel.clone(),
244 Duration::from_secs(30),
245 );
246 let (topology_tx, topology_rx) = watch::channel(TopologySnapshot::default());
247 spawn_topology_poller(
248 client.clone(),
249 topology_tx,
250 cancel.clone(),
251 Duration::from_secs(5),
252 );
253 let (network_tx, network_rx) = watch::channel(NetworkSnapshot::default());
254 spawn_network_poller(
255 client.clone(),
256 network_tx,
257 cancel.clone(),
258 Duration::from_secs(60),
259 );
260 let (transactions_tx, transactions_rx) =
261 watch::channel(TransactionsSnapshot::default());
262 spawn_transactions_poller(
263 client.clone(),
264 transactions_tx,
265 cancel.clone(),
266 Duration::from_secs(30),
267 );
268 let (tags_tx, tags_rx) = watch::channel(TagsSnapshot::default());
269 spawn_tags_poller(client, tags_tx, cancel.clone(), Duration::from_secs(5));
270 Self {
271 health_rx,
272 stamps_rx,
273 swap_rx,
274 lottery_rx,
275 topology_rx,
276 network_rx,
277 transactions_rx,
278 tags_rx,
279 cancel,
280 }
281 }
282
283 pub fn health(&self) -> watch::Receiver<HealthSnapshot> {
286 self.health_rx.clone()
287 }
288
289 pub fn stamps(&self) -> watch::Receiver<StampsSnapshot> {
291 self.stamps_rx.clone()
292 }
293
294 pub fn swap(&self) -> watch::Receiver<SwapSnapshot> {
296 self.swap_rx.clone()
297 }
298
299 pub fn lottery(&self) -> watch::Receiver<LotterySnapshot> {
301 self.lottery_rx.clone()
302 }
303
304 pub fn topology(&self) -> watch::Receiver<TopologySnapshot> {
306 self.topology_rx.clone()
307 }
308
309 pub fn network(&self) -> watch::Receiver<NetworkSnapshot> {
311 self.network_rx.clone()
312 }
313
314 pub fn transactions(&self) -> watch::Receiver<TransactionsSnapshot> {
317 self.transactions_rx.clone()
318 }
319
320 pub fn tags(&self) -> watch::Receiver<TagsSnapshot> {
322 self.tags_rx.clone()
323 }
324
325 pub fn shutdown(&self) {
327 self.cancel.cancel();
328 }
329}
330
331fn spawn_health_poller(
334 client: Arc<ApiClient>,
335 tx: watch::Sender<HealthSnapshot>,
336 cancel: CancellationToken,
337 interval: Duration,
338) {
339 tokio::spawn(async move {
340 let mut tick = tokio::time::interval(interval);
341 tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
342 loop {
343 tokio::select! {
344 _ = cancel.cancelled() => break,
345 _ = tick.tick() => {
346 let snap = collect_health(&client).await;
347 if tx.send(snap).is_err() {
348 break; }
350 }
351 }
352 }
353 });
354}
355
356fn spawn_stamps_poller(
359 client: Arc<ApiClient>,
360 tx: watch::Sender<StampsSnapshot>,
361 cancel: CancellationToken,
362 interval: Duration,
363) {
364 tokio::spawn(async move {
365 let mut tick = tokio::time::interval(interval);
366 tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
367 loop {
368 tokio::select! {
369 _ = cancel.cancelled() => break,
370 _ = tick.tick() => {
371 let snap = collect_stamps(&client).await;
372 if tx.send(snap).is_err() {
373 break;
374 }
375 }
376 }
377 }
378 });
379}
380
381async fn collect_stamps(client: &ApiClient) -> StampsSnapshot {
382 match client.bee().postage().get_postage_batches().await {
383 Ok(batches) => StampsSnapshot {
384 batches,
385 last_error: None,
386 last_update: Some(Instant::now()),
387 },
388 Err(e) => StampsSnapshot {
389 batches: Vec::new(),
390 last_error: Some(format!("stamps: {e}")),
391 last_update: Some(Instant::now()),
392 },
393 }
394}
395
396fn spawn_swap_poller(
399 client: Arc<ApiClient>,
400 tx: watch::Sender<SwapSnapshot>,
401 cancel: CancellationToken,
402 interval: Duration,
403) {
404 tokio::spawn(async move {
405 let mut tick = tokio::time::interval(interval);
406 tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
407 loop {
408 tokio::select! {
409 _ = cancel.cancelled() => break,
410 _ = tick.tick() => {
411 let snap = collect_swap(&client).await;
412 if tx.send(snap).is_err() {
413 break;
414 }
415 }
416 }
417 }
418 });
419}
420
421async fn collect_swap(client: &ApiClient) -> SwapSnapshot {
422 let bee = client.bee();
423 let chequebook = bee.debug().chequebook_balance().await;
424 let chequebook_address = bee.debug().chequebook_address().await;
425 let settlements = bee.debug().settlements().await;
426 let time_settlements = bee.debug().time_settlements().await;
427 let last_received = bee.debug().last_cheques().await;
428
429 let mut snap = SwapSnapshot {
430 last_update: Some(Instant::now()),
431 ..Default::default()
432 };
433 let mut errors: Vec<String> = Vec::new();
434 match chequebook {
435 Ok(c) => snap.chequebook = Some(c),
436 Err(e) => errors.push(format!("chequebook: {e}")),
437 }
438 if let Ok(a) = chequebook_address {
442 snap.chequebook_address = Some(a);
443 }
444 match settlements {
445 Ok(s) => snap.settlements = Some(s),
446 Err(e) => errors.push(format!("settlements: {e}")),
447 }
448 match time_settlements {
449 Ok(s) => snap.time_settlements = Some(s),
450 Err(e) => errors.push(format!("timesettlements: {e}")),
451 }
452 match last_received {
453 Ok(v) => snap.last_received = v,
454 Err(e) => errors.push(format!("cheques: {e}")),
455 }
456 if !errors.is_empty() {
457 snap.last_error = Some(errors.join("; "));
458 }
459 snap
460}
461
462fn spawn_lottery_poller(
465 client: Arc<ApiClient>,
466 tx: watch::Sender<LotterySnapshot>,
467 cancel: CancellationToken,
468 interval: Duration,
469) {
470 tokio::spawn(async move {
471 let mut tick = tokio::time::interval(interval);
472 tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
473 loop {
474 tokio::select! {
475 _ = cancel.cancelled() => break,
476 _ = tick.tick() => {
477 let snap = collect_lottery(&client).await;
478 if tx.send(snap).is_err() {
479 break;
480 }
481 }
482 }
483 }
484 });
485}
486
487async fn collect_lottery(client: &ApiClient) -> LotterySnapshot {
488 match client.bee().debug().stake().await {
489 Ok(staked) => LotterySnapshot {
490 staked: Some(staked),
491 last_error: None,
492 last_update: Some(Instant::now()),
493 },
494 Err(e) => LotterySnapshot {
495 staked: None,
496 last_error: Some(format!("stake: {e}")),
497 last_update: Some(Instant::now()),
498 },
499 }
500}
501
502fn spawn_topology_poller(
505 client: Arc<ApiClient>,
506 tx: watch::Sender<TopologySnapshot>,
507 cancel: CancellationToken,
508 interval: Duration,
509) {
510 tokio::spawn(async move {
511 let mut tick = tokio::time::interval(interval);
512 tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
513 loop {
514 tokio::select! {
515 _ = cancel.cancelled() => break,
516 _ = tick.tick() => {
517 let snap = collect_topology(&client).await;
518 if tx.send(snap).is_err() {
519 break;
520 }
521 }
522 }
523 }
524 });
525}
526
527async fn collect_topology(client: &ApiClient) -> TopologySnapshot {
528 match client.bee().debug().topology().await {
529 Ok(topology) => TopologySnapshot {
530 topology: Some(topology),
531 last_error: None,
532 last_update: Some(Instant::now()),
533 },
534 Err(e) => TopologySnapshot {
535 topology: None,
536 last_error: Some(format!("topology: {e}")),
537 last_update: Some(Instant::now()),
538 },
539 }
540}
541
542fn spawn_network_poller(
545 client: Arc<ApiClient>,
546 tx: watch::Sender<NetworkSnapshot>,
547 cancel: CancellationToken,
548 interval: Duration,
549) {
550 tokio::spawn(async move {
551 let mut tick = tokio::time::interval(interval);
552 tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
553 loop {
554 tokio::select! {
555 _ = cancel.cancelled() => break,
556 _ = tick.tick() => {
557 let snap = collect_network(&client).await;
558 if tx.send(snap).is_err() {
559 break;
560 }
561 }
562 }
563 }
564 });
565}
566
567async fn collect_network(client: &ApiClient) -> NetworkSnapshot {
568 match client.bee().debug().addresses().await {
569 Ok(addresses) => NetworkSnapshot {
570 addresses: Some(addresses),
571 last_error: None,
572 last_update: Some(Instant::now()),
573 },
574 Err(e) => NetworkSnapshot {
575 addresses: None,
576 last_error: Some(format!("addresses: {e}")),
577 last_update: Some(Instant::now()),
578 },
579 }
580}
581
582fn spawn_transactions_poller(
585 client: Arc<ApiClient>,
586 tx: watch::Sender<TransactionsSnapshot>,
587 cancel: CancellationToken,
588 interval: Duration,
589) {
590 tokio::spawn(async move {
591 let mut tick = tokio::time::interval(interval);
592 tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
593 loop {
594 tokio::select! {
595 _ = cancel.cancelled() => break,
596 _ = tick.tick() => {
597 let snap = collect_transactions(&client).await;
598 if tx.send(snap).is_err() {
599 break;
600 }
601 }
602 }
603 }
604 });
605}
606
607async fn collect_transactions(client: &ApiClient) -> TransactionsSnapshot {
608 match client.bee().debug().pending_transactions().await {
609 Ok(pending) => TransactionsSnapshot {
610 pending,
611 last_error: None,
612 last_update: Some(Instant::now()),
613 },
614 Err(e) => TransactionsSnapshot {
615 pending: Vec::new(),
616 last_error: Some(format!("transactions: {e}")),
617 last_update: Some(Instant::now()),
618 },
619 }
620}
621
622fn spawn_tags_poller(
625 client: Arc<ApiClient>,
626 tx: watch::Sender<TagsSnapshot>,
627 cancel: CancellationToken,
628 interval: Duration,
629) {
630 tokio::spawn(async move {
631 let mut tick = tokio::time::interval(interval);
632 tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
633 loop {
634 tokio::select! {
635 _ = cancel.cancelled() => break,
636 _ = tick.tick() => {
637 let snap = collect_tags(&client).await;
638 if tx.send(snap).is_err() {
639 break;
640 }
641 }
642 }
643 }
644 });
645}
646
647async fn collect_tags(client: &ApiClient) -> TagsSnapshot {
648 match client.bee().api().list_tags(None, None).await {
649 Ok(tags) => TagsSnapshot {
650 tags,
651 last_error: None,
652 last_update: Some(Instant::now()),
653 },
654 Err(e) => TagsSnapshot {
655 tags: Vec::new(),
656 last_error: Some(format!("tags: {e}")),
657 last_update: Some(Instant::now()),
658 },
659 }
660}
661
662async fn collect_health(client: &ApiClient) -> HealthSnapshot {
663 let bee = client.bee();
664
665 let ping_start = Instant::now();
668 let health_ok = bee.debug().health().await.is_ok();
669 let last_ping = health_ok.then(|| ping_start.elapsed());
670
671 let status = bee.debug().status().await;
672 let chain_state = bee.debug().chain_state().await;
673 let wallet = bee.debug().wallet().await;
674 let redistribution = bee.debug().redistribution_state().await;
675
676 let mut snap = HealthSnapshot {
677 last_ping,
678 last_update: Some(Instant::now()),
679 ..Default::default()
680 };
681 let mut errors: Vec<String> = Vec::new();
682 match status {
683 Ok(s) => snap.status = Some(s),
684 Err(e) => errors.push(format!("status: {e}")),
685 }
686 match chain_state {
687 Ok(c) => snap.chain_state = Some(c),
688 Err(e) => errors.push(format!("chainstate: {e}")),
689 }
690 match wallet {
691 Ok(w) => snap.wallet = Some(w),
692 Err(e) => errors.push(format!("wallet: {e}")),
693 }
694 match redistribution {
695 Ok(r) => snap.redistribution = Some(r),
696 Err(e) => errors.push(format!("redistributionstate: {e}")),
697 }
698 if !errors.is_empty() {
699 snap.last_error = Some(errors.join("; "));
700 }
701 snap
702}
703
704#[cfg(test)]
705mod tests {
706 use super::*;
707
708 #[test]
709 fn fully_loaded_default_is_false() {
710 assert!(!HealthSnapshot::default().is_fully_loaded());
711 }
712
713 #[test]
714 fn fully_loaded_requires_no_error_and_all_fields() {
715 let snap = HealthSnapshot {
718 status: Some(Status::default()),
719 chain_state: Some(serde_json::from_str(r#"{"block":0,"chainTip":0}"#).unwrap()),
720 wallet: Some(
721 serde_json::from_str(
722 r#"{"chainID":1,"walletAddress":"0x0000000000000000000000000000000000000000"}"#,
723 )
724 .unwrap(),
725 ),
726 redistribution: Some(RedistributionState::default()),
727 ..Default::default()
728 };
729 assert!(snap.is_fully_loaded());
730 let mut bad = snap;
731 bad.last_error = Some("boom".into());
732 assert!(!bad.is_fully_loaded());
733 }
734}