1#![allow(dead_code)] use std::sync::Arc;
19use std::time::{Duration, Instant};
20
21use bee::api::Tag;
22use bee::debug::{
23 Addresses, ChainState, ChequebookBalance, LastCheque, RedistributionState, Settlements, Status,
24 Topology, TransactionInfo, Wallet,
25};
26use bee::postage::PostageBatch;
27use bee::swarm::Reference;
28use num_bigint::BigInt;
29use tokio::sync::watch;
30use tokio_util::sync::CancellationToken;
31
32use crate::api::ApiClient;
33
34#[derive(Clone, Debug, Default)]
38pub struct HealthSnapshot {
39 pub status: Option<Status>,
40 pub chain_state: Option<ChainState>,
41 pub wallet: Option<Wallet>,
42 pub redistribution: Option<RedistributionState>,
43 pub last_ping: Option<Duration>,
46 pub last_error: Option<String>,
49 pub last_update: Option<Instant>,
52}
53
54impl HealthSnapshot {
55 pub fn is_fully_loaded(&self) -> bool {
58 self.last_error.is_none()
59 && self.status.is_some()
60 && self.chain_state.is_some()
61 && self.wallet.is_some()
62 && self.redistribution.is_some()
63 }
64}
65
66#[derive(Clone, Debug, Default)]
70pub struct StampsSnapshot {
71 pub batches: Vec<PostageBatch>,
72 pub last_error: Option<String>,
73 pub last_update: Option<Instant>,
74}
75
76impl StampsSnapshot {
77 pub fn is_loaded(&self) -> bool {
78 self.last_update.is_some() && self.last_error.is_none()
79 }
80}
81
82#[derive(Clone, Debug, Default)]
86pub struct SwapSnapshot {
87 pub chequebook: Option<ChequebookBalance>,
88 pub chequebook_address: Option<String>,
93 pub settlements: Option<Settlements>,
94 pub time_settlements: Option<Settlements>,
95 pub last_received: Vec<LastCheque>,
97 pub last_error: Option<String>,
98 pub last_update: Option<Instant>,
99}
100
101impl SwapSnapshot {
102 pub fn is_loaded(&self) -> bool {
103 self.last_update.is_some() && self.last_error.is_none()
104 }
105}
106
107#[derive(Clone, Debug, Default)]
114pub struct TagsSnapshot {
115 pub tags: Vec<Tag>,
116 pub last_error: Option<String>,
117 pub last_update: Option<Instant>,
118}
119
120impl TagsSnapshot {
121 pub fn is_loaded(&self) -> bool {
122 self.last_update.is_some() && self.last_error.is_none()
123 }
124}
125
126#[derive(Clone, Debug, Default)]
132pub struct PinsSnapshot {
133 pub pins: Vec<Reference>,
135 pub last_error: Option<String>,
136 pub last_update: Option<Instant>,
137}
138
139impl PinsSnapshot {
140 pub fn is_loaded(&self) -> bool {
141 self.last_update.is_some() && self.last_error.is_none()
142 }
143}
144
145#[derive(Clone, Debug, Default)]
151pub struct TransactionsSnapshot {
152 pub pending: Vec<TransactionInfo>,
153 pub last_error: Option<String>,
154 pub last_update: Option<Instant>,
155}
156
157impl TransactionsSnapshot {
158 pub fn is_loaded(&self) -> bool {
159 self.last_update.is_some() && self.last_error.is_none()
160 }
161}
162
163#[derive(Clone, Debug, Default)]
168pub struct NetworkSnapshot {
169 pub addresses: Option<Addresses>,
170 pub last_error: Option<String>,
171 pub last_update: Option<Instant>,
172}
173
174impl NetworkSnapshot {
175 pub fn is_loaded(&self) -> bool {
176 self.addresses.is_some() && self.last_error.is_none()
177 }
178}
179
180#[derive(Clone, Debug, Default)]
185pub struct TopologySnapshot {
186 pub topology: Option<Topology>,
187 pub last_error: Option<String>,
188 pub last_update: Option<Instant>,
189}
190
191impl TopologySnapshot {
192 pub fn is_loaded(&self) -> bool {
193 self.topology.is_some() && self.last_error.is_none()
194 }
195}
196
197#[derive(Clone, Debug, Default)]
203pub struct LotterySnapshot {
204 pub staked: Option<BigInt>,
206 pub last_error: Option<String>,
207 pub last_update: Option<Instant>,
208}
209
210impl LotterySnapshot {
211 pub fn is_loaded(&self) -> bool {
212 self.last_update.is_some() && self.last_error.is_none()
213 }
214}
215
216#[derive(Debug, Clone, Copy, PartialEq, Eq)]
220pub enum RefreshProfile {
221 Live,
225 Default,
230 Slow,
234}
235
236impl RefreshProfile {
237 pub fn from_config(s: &str) -> Self {
241 match s {
242 "live" => Self::Live,
243 "default" => Self::Default,
244 "slow" => Self::Slow,
245 other => {
246 tracing::warn!(
247 "unknown [ui].refresh value {other:?}; falling back to \"default\". \
248 Recognised: live | default | slow."
249 );
250 Self::Default
251 }
252 }
253 }
254
255 pub fn health(self) -> Duration {
256 match self {
257 Self::Live => Duration::from_secs(2),
258 Self::Default => Duration::from_secs(4),
259 Self::Slow => Duration::from_secs(8),
260 }
261 }
262 pub fn topology(self) -> Duration {
263 match self {
264 Self::Live => Duration::from_secs(5),
265 Self::Default => Duration::from_secs(10),
266 Self::Slow => Duration::from_secs(20),
267 }
268 }
269 pub fn stamps(self) -> Duration {
270 match self {
273 Self::Live | Self::Default => Duration::from_secs(10),
274 Self::Slow => Duration::from_secs(20),
275 }
276 }
277 pub fn tags(self) -> Duration {
278 match self {
279 Self::Live => Duration::from_secs(5),
280 Self::Default => Duration::from_secs(10),
281 Self::Slow => Duration::from_secs(20),
282 }
283 }
284 pub fn swap(self) -> Duration {
285 match self {
286 Self::Live | Self::Default => Duration::from_secs(30),
287 Self::Slow => Duration::from_secs(60),
288 }
289 }
290 pub fn lottery(self) -> Duration {
291 match self {
292 Self::Live | Self::Default => Duration::from_secs(30),
293 Self::Slow => Duration::from_secs(60),
294 }
295 }
296 pub fn transactions(self) -> Duration {
297 match self {
298 Self::Live | Self::Default => Duration::from_secs(30),
299 Self::Slow => Duration::from_secs(60),
300 }
301 }
302 pub fn network(self) -> Duration {
303 match self {
304 Self::Live | Self::Default => Duration::from_secs(60),
305 Self::Slow => Duration::from_secs(120),
306 }
307 }
308 pub fn pins(self) -> Duration {
309 match self {
313 Self::Live | Self::Default => Duration::from_secs(30),
314 Self::Slow => Duration::from_secs(60),
315 }
316 }
317}
318
319#[derive(Clone, Debug)]
323pub struct BeeWatch {
324 health_rx: watch::Receiver<HealthSnapshot>,
325 stamps_rx: watch::Receiver<StampsSnapshot>,
326 swap_rx: watch::Receiver<SwapSnapshot>,
327 lottery_rx: watch::Receiver<LotterySnapshot>,
328 topology_rx: watch::Receiver<TopologySnapshot>,
329 network_rx: watch::Receiver<NetworkSnapshot>,
330 transactions_rx: watch::Receiver<TransactionsSnapshot>,
331 tags_rx: watch::Receiver<TagsSnapshot>,
332 pins_rx: watch::Receiver<PinsSnapshot>,
333 cancel: CancellationToken,
334}
335
336impl BeeWatch {
337 pub fn start(client: Arc<ApiClient>, parent_cancel: &CancellationToken) -> Self {
340 Self::start_with_profile(client, parent_cancel, RefreshProfile::Default)
341 }
342
343 pub fn start_with_profile(
347 client: Arc<ApiClient>,
348 parent_cancel: &CancellationToken,
349 profile: RefreshProfile,
350 ) -> Self {
351 let cancel = parent_cancel.child_token();
352 let (health_tx, health_rx) = watch::channel(HealthSnapshot::default());
353 spawn_health_poller(client.clone(), health_tx, cancel.clone(), profile.health());
354 let (stamps_tx, stamps_rx) = watch::channel(StampsSnapshot::default());
355 spawn_stamps_poller(client.clone(), stamps_tx, cancel.clone(), profile.stamps());
356 let (swap_tx, swap_rx) = watch::channel(SwapSnapshot::default());
357 spawn_swap_poller(client.clone(), swap_tx, cancel.clone(), profile.swap());
358 let (lottery_tx, lottery_rx) = watch::channel(LotterySnapshot::default());
359 spawn_lottery_poller(
360 client.clone(),
361 lottery_tx,
362 cancel.clone(),
363 profile.lottery(),
364 );
365 let (topology_tx, topology_rx) = watch::channel(TopologySnapshot::default());
366 spawn_topology_poller(
367 client.clone(),
368 topology_tx,
369 cancel.clone(),
370 profile.topology(),
371 );
372 let (network_tx, network_rx) = watch::channel(NetworkSnapshot::default());
373 spawn_network_poller(
374 client.clone(),
375 network_tx,
376 cancel.clone(),
377 profile.network(),
378 );
379 let (transactions_tx, transactions_rx) = watch::channel(TransactionsSnapshot::default());
380 spawn_transactions_poller(
381 client.clone(),
382 transactions_tx,
383 cancel.clone(),
384 profile.transactions(),
385 );
386 let (tags_tx, tags_rx) = watch::channel(TagsSnapshot::default());
387 spawn_tags_poller(client.clone(), tags_tx, cancel.clone(), profile.tags());
388 let (pins_tx, pins_rx) = watch::channel(PinsSnapshot::default());
389 spawn_pins_poller(client, pins_tx, cancel.clone(), profile.pins());
390 Self {
391 health_rx,
392 stamps_rx,
393 swap_rx,
394 lottery_rx,
395 topology_rx,
396 network_rx,
397 transactions_rx,
398 tags_rx,
399 pins_rx,
400 cancel,
401 }
402 }
403
404 pub fn health(&self) -> watch::Receiver<HealthSnapshot> {
407 self.health_rx.clone()
408 }
409
410 pub fn stamps(&self) -> watch::Receiver<StampsSnapshot> {
412 self.stamps_rx.clone()
413 }
414
415 pub fn swap(&self) -> watch::Receiver<SwapSnapshot> {
417 self.swap_rx.clone()
418 }
419
420 pub fn lottery(&self) -> watch::Receiver<LotterySnapshot> {
422 self.lottery_rx.clone()
423 }
424
425 pub fn topology(&self) -> watch::Receiver<TopologySnapshot> {
427 self.topology_rx.clone()
428 }
429
430 pub fn network(&self) -> watch::Receiver<NetworkSnapshot> {
432 self.network_rx.clone()
433 }
434
435 pub fn transactions(&self) -> watch::Receiver<TransactionsSnapshot> {
438 self.transactions_rx.clone()
439 }
440
441 pub fn tags(&self) -> watch::Receiver<TagsSnapshot> {
443 self.tags_rx.clone()
444 }
445
446 pub fn pins(&self) -> watch::Receiver<PinsSnapshot> {
451 self.pins_rx.clone()
452 }
453
454 pub fn shutdown(&self) {
456 self.cancel.cancel();
457 }
458}
459
460fn spawn_health_poller(
463 client: Arc<ApiClient>,
464 tx: watch::Sender<HealthSnapshot>,
465 cancel: CancellationToken,
466 interval: Duration,
467) {
468 tokio::spawn(async move {
469 let mut tick = tokio::time::interval(interval);
470 tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
471 loop {
472 tokio::select! {
473 _ = cancel.cancelled() => break,
474 _ = tick.tick() => {
475 let snap = collect_health(&client).await;
476 if tx.send(snap).is_err() {
477 break; }
479 }
480 }
481 }
482 });
483}
484
485fn spawn_stamps_poller(
488 client: Arc<ApiClient>,
489 tx: watch::Sender<StampsSnapshot>,
490 cancel: CancellationToken,
491 interval: Duration,
492) {
493 tokio::spawn(async move {
494 let mut tick = tokio::time::interval(interval);
495 tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
496 loop {
497 tokio::select! {
498 _ = cancel.cancelled() => break,
499 _ = tick.tick() => {
500 let snap = collect_stamps(&client).await;
501 if tx.send(snap).is_err() {
502 break;
503 }
504 }
505 }
506 }
507 });
508}
509
510async fn collect_stamps(client: &ApiClient) -> StampsSnapshot {
511 match client.bee().postage().get_postage_batches().await {
512 Ok(batches) => StampsSnapshot {
513 batches,
514 last_error: None,
515 last_update: Some(Instant::now()),
516 },
517 Err(e) => StampsSnapshot {
518 batches: Vec::new(),
519 last_error: Some(format!("stamps: {e}")),
520 last_update: Some(Instant::now()),
521 },
522 }
523}
524
525fn spawn_swap_poller(
528 client: Arc<ApiClient>,
529 tx: watch::Sender<SwapSnapshot>,
530 cancel: CancellationToken,
531 interval: Duration,
532) {
533 tokio::spawn(async move {
534 let mut tick = tokio::time::interval(interval);
535 tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
536 loop {
537 tokio::select! {
538 _ = cancel.cancelled() => break,
539 _ = tick.tick() => {
540 let snap = collect_swap(&client).await;
541 if tx.send(snap).is_err() {
542 break;
543 }
544 }
545 }
546 }
547 });
548}
549
550async fn collect_swap(client: &ApiClient) -> SwapSnapshot {
551 let bee = client.bee();
552 let chequebook = bee.debug().chequebook_balance().await;
553 let chequebook_address = bee.debug().chequebook_address().await;
554 let settlements = bee.debug().settlements().await;
555 let time_settlements = bee.debug().time_settlements().await;
556 let last_received = bee.debug().last_cheques().await;
557
558 let mut snap = SwapSnapshot {
559 last_update: Some(Instant::now()),
560 ..Default::default()
561 };
562 let mut errors: Vec<String> = Vec::new();
563 match chequebook {
564 Ok(c) => snap.chequebook = Some(c),
565 Err(e) => errors.push(format!("chequebook: {e}")),
566 }
567 if let Ok(a) = chequebook_address {
571 snap.chequebook_address = Some(a);
572 }
573 match settlements {
574 Ok(s) => snap.settlements = Some(s),
575 Err(e) => errors.push(format!("settlements: {e}")),
576 }
577 match time_settlements {
578 Ok(s) => snap.time_settlements = Some(s),
579 Err(e) => errors.push(format!("timesettlements: {e}")),
580 }
581 match last_received {
582 Ok(v) => snap.last_received = v,
583 Err(e) => errors.push(format!("cheques: {e}")),
584 }
585 if !errors.is_empty() {
586 snap.last_error = Some(errors.join("; "));
587 }
588 snap
589}
590
591fn spawn_lottery_poller(
594 client: Arc<ApiClient>,
595 tx: watch::Sender<LotterySnapshot>,
596 cancel: CancellationToken,
597 interval: Duration,
598) {
599 tokio::spawn(async move {
600 let mut tick = tokio::time::interval(interval);
601 tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
602 loop {
603 tokio::select! {
604 _ = cancel.cancelled() => break,
605 _ = tick.tick() => {
606 let snap = collect_lottery(&client).await;
607 if tx.send(snap).is_err() {
608 break;
609 }
610 }
611 }
612 }
613 });
614}
615
616async fn collect_lottery(client: &ApiClient) -> LotterySnapshot {
617 match client.bee().debug().stake().await {
618 Ok(staked) => LotterySnapshot {
619 staked: Some(staked),
620 last_error: None,
621 last_update: Some(Instant::now()),
622 },
623 Err(e) => LotterySnapshot {
624 staked: None,
625 last_error: Some(format!("stake: {e}")),
626 last_update: Some(Instant::now()),
627 },
628 }
629}
630
631fn spawn_topology_poller(
634 client: Arc<ApiClient>,
635 tx: watch::Sender<TopologySnapshot>,
636 cancel: CancellationToken,
637 interval: Duration,
638) {
639 tokio::spawn(async move {
640 let mut tick = tokio::time::interval(interval);
641 tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
642 loop {
643 tokio::select! {
644 _ = cancel.cancelled() => break,
645 _ = tick.tick() => {
646 let snap = collect_topology(&client).await;
647 if tx.send(snap).is_err() {
648 break;
649 }
650 }
651 }
652 }
653 });
654}
655
656async fn collect_topology(client: &ApiClient) -> TopologySnapshot {
657 match client.bee().debug().topology().await {
658 Ok(topology) => TopologySnapshot {
659 topology: Some(topology),
660 last_error: None,
661 last_update: Some(Instant::now()),
662 },
663 Err(e) => TopologySnapshot {
664 topology: None,
665 last_error: Some(format!("topology: {e}")),
666 last_update: Some(Instant::now()),
667 },
668 }
669}
670
671fn spawn_network_poller(
674 client: Arc<ApiClient>,
675 tx: watch::Sender<NetworkSnapshot>,
676 cancel: CancellationToken,
677 interval: Duration,
678) {
679 tokio::spawn(async move {
680 let mut tick = tokio::time::interval(interval);
681 tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
682 loop {
683 tokio::select! {
684 _ = cancel.cancelled() => break,
685 _ = tick.tick() => {
686 let snap = collect_network(&client).await;
687 if tx.send(snap).is_err() {
688 break;
689 }
690 }
691 }
692 }
693 });
694}
695
696async fn collect_network(client: &ApiClient) -> NetworkSnapshot {
697 match client.bee().debug().addresses().await {
698 Ok(addresses) => NetworkSnapshot {
699 addresses: Some(addresses),
700 last_error: None,
701 last_update: Some(Instant::now()),
702 },
703 Err(e) => NetworkSnapshot {
704 addresses: None,
705 last_error: Some(format!("addresses: {e}")),
706 last_update: Some(Instant::now()),
707 },
708 }
709}
710
711fn spawn_transactions_poller(
714 client: Arc<ApiClient>,
715 tx: watch::Sender<TransactionsSnapshot>,
716 cancel: CancellationToken,
717 interval: Duration,
718) {
719 tokio::spawn(async move {
720 let mut tick = tokio::time::interval(interval);
721 tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
722 loop {
723 tokio::select! {
724 _ = cancel.cancelled() => break,
725 _ = tick.tick() => {
726 let snap = collect_transactions(&client).await;
727 if tx.send(snap).is_err() {
728 break;
729 }
730 }
731 }
732 }
733 });
734}
735
736async fn collect_transactions(client: &ApiClient) -> TransactionsSnapshot {
737 match client.bee().debug().pending_transactions().await {
738 Ok(pending) => TransactionsSnapshot {
739 pending,
740 last_error: None,
741 last_update: Some(Instant::now()),
742 },
743 Err(e) => TransactionsSnapshot {
744 pending: Vec::new(),
745 last_error: Some(format!("transactions: {e}")),
746 last_update: Some(Instant::now()),
747 },
748 }
749}
750
751fn spawn_tags_poller(
754 client: Arc<ApiClient>,
755 tx: watch::Sender<TagsSnapshot>,
756 cancel: CancellationToken,
757 interval: Duration,
758) {
759 tokio::spawn(async move {
760 let mut tick = tokio::time::interval(interval);
761 tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
762 loop {
763 tokio::select! {
764 _ = cancel.cancelled() => break,
765 _ = tick.tick() => {
766 let snap = collect_tags(&client).await;
767 if tx.send(snap).is_err() {
768 break;
769 }
770 }
771 }
772 }
773 });
774}
775
776async fn collect_tags(client: &ApiClient) -> TagsSnapshot {
777 match client.bee().api().list_tags(None, None).await {
778 Ok(tags) => TagsSnapshot {
779 tags,
780 last_error: None,
781 last_update: Some(Instant::now()),
782 },
783 Err(e) => TagsSnapshot {
784 tags: Vec::new(),
785 last_error: Some(format!("tags: {e}")),
786 last_update: Some(Instant::now()),
787 },
788 }
789}
790
791fn spawn_pins_poller(
796 client: Arc<ApiClient>,
797 tx: watch::Sender<PinsSnapshot>,
798 cancel: CancellationToken,
799 interval: Duration,
800) {
801 tokio::spawn(async move {
802 let mut tick = tokio::time::interval(interval);
803 tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
804 loop {
805 tokio::select! {
806 _ = cancel.cancelled() => break,
807 _ = tick.tick() => {
808 let snap = collect_pins(&client).await;
809 if tx.send(snap).is_err() {
810 break;
811 }
812 }
813 }
814 }
815 });
816}
817
818async fn collect_pins(client: &ApiClient) -> PinsSnapshot {
819 match client.bee().api().list_pins().await {
820 Ok(pins) => PinsSnapshot {
821 pins,
822 last_error: None,
823 last_update: Some(Instant::now()),
824 },
825 Err(e) => PinsSnapshot {
826 pins: Vec::new(),
827 last_error: Some(format!("pins: {e}")),
828 last_update: Some(Instant::now()),
829 },
830 }
831}
832
833async fn collect_health(client: &ApiClient) -> HealthSnapshot {
834 let bee = client.bee();
835
836 let ping_start = Instant::now();
839 let health_ok = bee.debug().health().await.is_ok();
840 let last_ping = health_ok.then(|| ping_start.elapsed());
841
842 let status = bee.debug().status().await;
843 let chain_state = bee.debug().chain_state().await;
844 let wallet = bee.debug().wallet().await;
845 let redistribution = bee.debug().redistribution_state().await;
846
847 let mut snap = HealthSnapshot {
848 last_ping,
849 last_update: Some(Instant::now()),
850 ..Default::default()
851 };
852 let mut errors: Vec<String> = Vec::new();
853 match status {
854 Ok(s) => snap.status = Some(s),
855 Err(e) => errors.push(format!("status: {e}")),
856 }
857 match chain_state {
858 Ok(c) => snap.chain_state = Some(c),
859 Err(e) => errors.push(format!("chainstate: {e}")),
860 }
861 match wallet {
862 Ok(w) => snap.wallet = Some(w),
863 Err(e) => errors.push(format!("wallet: {e}")),
864 }
865 match redistribution {
866 Ok(r) => snap.redistribution = Some(r),
867 Err(e) => errors.push(format!("redistributionstate: {e}")),
868 }
869 if !errors.is_empty() {
870 snap.last_error = Some(errors.join("; "));
871 }
872 snap
873}
874
875#[cfg(test)]
876mod tests {
877 use super::*;
878
879 #[test]
880 fn fully_loaded_default_is_false() {
881 assert!(!HealthSnapshot::default().is_fully_loaded());
882 }
883
884 #[test]
885 fn fully_loaded_requires_no_error_and_all_fields() {
886 let snap = HealthSnapshot {
889 status: Some(Status::default()),
890 chain_state: Some(serde_json::from_str(r#"{"block":0,"chainTip":0}"#).unwrap()),
891 wallet: Some(
892 serde_json::from_str(
893 r#"{"chainID":1,"walletAddress":"0x0000000000000000000000000000000000000000"}"#,
894 )
895 .unwrap(),
896 ),
897 redistribution: Some(RedistributionState::default()),
898 ..Default::default()
899 };
900 assert!(snap.is_fully_loaded());
901 let mut bad = snap;
902 bad.last_error = Some("boom".into());
903 assert!(!bad.is_fully_loaded());
904 }
905}