1#![allow(dead_code)] use std::sync::Arc;
19use std::time::{Duration, Instant};
20
21use bee::api::Tag;
22use bee::debug::{
23 Addresses, ChainState, ChequebookBalance, LastCheque, RedistributionState, Settlements, Status,
24 Topology, TransactionInfo, Wallet,
25};
26use bee::postage::PostageBatch;
27use num_bigint::BigInt;
28use tokio::sync::watch;
29use tokio_util::sync::CancellationToken;
30
31use crate::api::ApiClient;
32
33#[derive(Clone, Debug, Default)]
37pub struct HealthSnapshot {
38 pub status: Option<Status>,
39 pub chain_state: Option<ChainState>,
40 pub wallet: Option<Wallet>,
41 pub redistribution: Option<RedistributionState>,
42 pub last_ping: Option<Duration>,
45 pub last_error: Option<String>,
48 pub last_update: Option<Instant>,
51}
52
53impl HealthSnapshot {
54 pub fn is_fully_loaded(&self) -> bool {
57 self.last_error.is_none()
58 && self.status.is_some()
59 && self.chain_state.is_some()
60 && self.wallet.is_some()
61 && self.redistribution.is_some()
62 }
63}
64
65#[derive(Clone, Debug, Default)]
69pub struct StampsSnapshot {
70 pub batches: Vec<PostageBatch>,
71 pub last_error: Option<String>,
72 pub last_update: Option<Instant>,
73}
74
75impl StampsSnapshot {
76 pub fn is_loaded(&self) -> bool {
77 self.last_update.is_some() && self.last_error.is_none()
78 }
79}
80
81#[derive(Clone, Debug, Default)]
85pub struct SwapSnapshot {
86 pub chequebook: Option<ChequebookBalance>,
87 pub chequebook_address: Option<String>,
92 pub settlements: Option<Settlements>,
93 pub time_settlements: Option<Settlements>,
94 pub last_received: Vec<LastCheque>,
96 pub last_error: Option<String>,
97 pub last_update: Option<Instant>,
98}
99
100impl SwapSnapshot {
101 pub fn is_loaded(&self) -> bool {
102 self.last_update.is_some() && self.last_error.is_none()
103 }
104}
105
106#[derive(Clone, Debug, Default)]
113pub struct TagsSnapshot {
114 pub tags: Vec<Tag>,
115 pub last_error: Option<String>,
116 pub last_update: Option<Instant>,
117}
118
119impl TagsSnapshot {
120 pub fn is_loaded(&self) -> bool {
121 self.last_update.is_some() && self.last_error.is_none()
122 }
123}
124
125#[derive(Clone, Debug, Default)]
131pub struct TransactionsSnapshot {
132 pub pending: Vec<TransactionInfo>,
133 pub last_error: Option<String>,
134 pub last_update: Option<Instant>,
135}
136
137impl TransactionsSnapshot {
138 pub fn is_loaded(&self) -> bool {
139 self.last_update.is_some() && self.last_error.is_none()
140 }
141}
142
143#[derive(Clone, Debug, Default)]
148pub struct NetworkSnapshot {
149 pub addresses: Option<Addresses>,
150 pub last_error: Option<String>,
151 pub last_update: Option<Instant>,
152}
153
154impl NetworkSnapshot {
155 pub fn is_loaded(&self) -> bool {
156 self.addresses.is_some() && self.last_error.is_none()
157 }
158}
159
160#[derive(Clone, Debug, Default)]
165pub struct TopologySnapshot {
166 pub topology: Option<Topology>,
167 pub last_error: Option<String>,
168 pub last_update: Option<Instant>,
169}
170
171impl TopologySnapshot {
172 pub fn is_loaded(&self) -> bool {
173 self.topology.is_some() && self.last_error.is_none()
174 }
175}
176
177#[derive(Clone, Debug, Default)]
183pub struct LotterySnapshot {
184 pub staked: Option<BigInt>,
186 pub last_error: Option<String>,
187 pub last_update: Option<Instant>,
188}
189
190impl LotterySnapshot {
191 pub fn is_loaded(&self) -> bool {
192 self.last_update.is_some() && self.last_error.is_none()
193 }
194}
195
196#[derive(Clone, Debug)]
200pub struct BeeWatch {
201 health_rx: watch::Receiver<HealthSnapshot>,
202 stamps_rx: watch::Receiver<StampsSnapshot>,
203 swap_rx: watch::Receiver<SwapSnapshot>,
204 lottery_rx: watch::Receiver<LotterySnapshot>,
205 topology_rx: watch::Receiver<TopologySnapshot>,
206 network_rx: watch::Receiver<NetworkSnapshot>,
207 transactions_rx: watch::Receiver<TransactionsSnapshot>,
208 tags_rx: watch::Receiver<TagsSnapshot>,
209 cancel: CancellationToken,
210}
211
212impl BeeWatch {
213 pub fn start(client: Arc<ApiClient>, parent_cancel: &CancellationToken) -> Self {
217 let cancel = parent_cancel.child_token();
218 let (health_tx, health_rx) = watch::channel(HealthSnapshot::default());
219 spawn_health_poller(
220 client.clone(),
221 health_tx,
222 cancel.clone(),
223 Duration::from_secs(2),
224 );
225 let (stamps_tx, stamps_rx) = watch::channel(StampsSnapshot::default());
226 spawn_stamps_poller(
227 client.clone(),
228 stamps_tx,
229 cancel.clone(),
230 Duration::from_secs(10),
231 );
232 let (swap_tx, swap_rx) = watch::channel(SwapSnapshot::default());
233 spawn_swap_poller(
234 client.clone(),
235 swap_tx,
236 cancel.clone(),
237 Duration::from_secs(30),
238 );
239 let (lottery_tx, lottery_rx) = watch::channel(LotterySnapshot::default());
240 spawn_lottery_poller(
241 client.clone(),
242 lottery_tx,
243 cancel.clone(),
244 Duration::from_secs(30),
245 );
246 let (topology_tx, topology_rx) = watch::channel(TopologySnapshot::default());
247 spawn_topology_poller(
248 client.clone(),
249 topology_tx,
250 cancel.clone(),
251 Duration::from_secs(5),
252 );
253 let (network_tx, network_rx) = watch::channel(NetworkSnapshot::default());
254 spawn_network_poller(
255 client.clone(),
256 network_tx,
257 cancel.clone(),
258 Duration::from_secs(60),
259 );
260 let (transactions_tx, transactions_rx) = watch::channel(TransactionsSnapshot::default());
261 spawn_transactions_poller(
262 client.clone(),
263 transactions_tx,
264 cancel.clone(),
265 Duration::from_secs(30),
266 );
267 let (tags_tx, tags_rx) = watch::channel(TagsSnapshot::default());
268 spawn_tags_poller(client, tags_tx, cancel.clone(), Duration::from_secs(5));
269 Self {
270 health_rx,
271 stamps_rx,
272 swap_rx,
273 lottery_rx,
274 topology_rx,
275 network_rx,
276 transactions_rx,
277 tags_rx,
278 cancel,
279 }
280 }
281
282 pub fn health(&self) -> watch::Receiver<HealthSnapshot> {
285 self.health_rx.clone()
286 }
287
288 pub fn stamps(&self) -> watch::Receiver<StampsSnapshot> {
290 self.stamps_rx.clone()
291 }
292
293 pub fn swap(&self) -> watch::Receiver<SwapSnapshot> {
295 self.swap_rx.clone()
296 }
297
298 pub fn lottery(&self) -> watch::Receiver<LotterySnapshot> {
300 self.lottery_rx.clone()
301 }
302
303 pub fn topology(&self) -> watch::Receiver<TopologySnapshot> {
305 self.topology_rx.clone()
306 }
307
308 pub fn network(&self) -> watch::Receiver<NetworkSnapshot> {
310 self.network_rx.clone()
311 }
312
313 pub fn transactions(&self) -> watch::Receiver<TransactionsSnapshot> {
316 self.transactions_rx.clone()
317 }
318
319 pub fn tags(&self) -> watch::Receiver<TagsSnapshot> {
321 self.tags_rx.clone()
322 }
323
324 pub fn shutdown(&self) {
326 self.cancel.cancel();
327 }
328}
329
330fn spawn_health_poller(
333 client: Arc<ApiClient>,
334 tx: watch::Sender<HealthSnapshot>,
335 cancel: CancellationToken,
336 interval: Duration,
337) {
338 tokio::spawn(async move {
339 let mut tick = tokio::time::interval(interval);
340 tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
341 loop {
342 tokio::select! {
343 _ = cancel.cancelled() => break,
344 _ = tick.tick() => {
345 let snap = collect_health(&client).await;
346 if tx.send(snap).is_err() {
347 break; }
349 }
350 }
351 }
352 });
353}
354
355fn spawn_stamps_poller(
358 client: Arc<ApiClient>,
359 tx: watch::Sender<StampsSnapshot>,
360 cancel: CancellationToken,
361 interval: Duration,
362) {
363 tokio::spawn(async move {
364 let mut tick = tokio::time::interval(interval);
365 tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
366 loop {
367 tokio::select! {
368 _ = cancel.cancelled() => break,
369 _ = tick.tick() => {
370 let snap = collect_stamps(&client).await;
371 if tx.send(snap).is_err() {
372 break;
373 }
374 }
375 }
376 }
377 });
378}
379
380async fn collect_stamps(client: &ApiClient) -> StampsSnapshot {
381 match client.bee().postage().get_postage_batches().await {
382 Ok(batches) => StampsSnapshot {
383 batches,
384 last_error: None,
385 last_update: Some(Instant::now()),
386 },
387 Err(e) => StampsSnapshot {
388 batches: Vec::new(),
389 last_error: Some(format!("stamps: {e}")),
390 last_update: Some(Instant::now()),
391 },
392 }
393}
394
395fn spawn_swap_poller(
398 client: Arc<ApiClient>,
399 tx: watch::Sender<SwapSnapshot>,
400 cancel: CancellationToken,
401 interval: Duration,
402) {
403 tokio::spawn(async move {
404 let mut tick = tokio::time::interval(interval);
405 tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
406 loop {
407 tokio::select! {
408 _ = cancel.cancelled() => break,
409 _ = tick.tick() => {
410 let snap = collect_swap(&client).await;
411 if tx.send(snap).is_err() {
412 break;
413 }
414 }
415 }
416 }
417 });
418}
419
420async fn collect_swap(client: &ApiClient) -> SwapSnapshot {
421 let bee = client.bee();
422 let chequebook = bee.debug().chequebook_balance().await;
423 let chequebook_address = bee.debug().chequebook_address().await;
424 let settlements = bee.debug().settlements().await;
425 let time_settlements = bee.debug().time_settlements().await;
426 let last_received = bee.debug().last_cheques().await;
427
428 let mut snap = SwapSnapshot {
429 last_update: Some(Instant::now()),
430 ..Default::default()
431 };
432 let mut errors: Vec<String> = Vec::new();
433 match chequebook {
434 Ok(c) => snap.chequebook = Some(c),
435 Err(e) => errors.push(format!("chequebook: {e}")),
436 }
437 if let Ok(a) = chequebook_address {
441 snap.chequebook_address = Some(a);
442 }
443 match settlements {
444 Ok(s) => snap.settlements = Some(s),
445 Err(e) => errors.push(format!("settlements: {e}")),
446 }
447 match time_settlements {
448 Ok(s) => snap.time_settlements = Some(s),
449 Err(e) => errors.push(format!("timesettlements: {e}")),
450 }
451 match last_received {
452 Ok(v) => snap.last_received = v,
453 Err(e) => errors.push(format!("cheques: {e}")),
454 }
455 if !errors.is_empty() {
456 snap.last_error = Some(errors.join("; "));
457 }
458 snap
459}
460
461fn spawn_lottery_poller(
464 client: Arc<ApiClient>,
465 tx: watch::Sender<LotterySnapshot>,
466 cancel: CancellationToken,
467 interval: Duration,
468) {
469 tokio::spawn(async move {
470 let mut tick = tokio::time::interval(interval);
471 tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
472 loop {
473 tokio::select! {
474 _ = cancel.cancelled() => break,
475 _ = tick.tick() => {
476 let snap = collect_lottery(&client).await;
477 if tx.send(snap).is_err() {
478 break;
479 }
480 }
481 }
482 }
483 });
484}
485
486async fn collect_lottery(client: &ApiClient) -> LotterySnapshot {
487 match client.bee().debug().stake().await {
488 Ok(staked) => LotterySnapshot {
489 staked: Some(staked),
490 last_error: None,
491 last_update: Some(Instant::now()),
492 },
493 Err(e) => LotterySnapshot {
494 staked: None,
495 last_error: Some(format!("stake: {e}")),
496 last_update: Some(Instant::now()),
497 },
498 }
499}
500
501fn spawn_topology_poller(
504 client: Arc<ApiClient>,
505 tx: watch::Sender<TopologySnapshot>,
506 cancel: CancellationToken,
507 interval: Duration,
508) {
509 tokio::spawn(async move {
510 let mut tick = tokio::time::interval(interval);
511 tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
512 loop {
513 tokio::select! {
514 _ = cancel.cancelled() => break,
515 _ = tick.tick() => {
516 let snap = collect_topology(&client).await;
517 if tx.send(snap).is_err() {
518 break;
519 }
520 }
521 }
522 }
523 });
524}
525
526async fn collect_topology(client: &ApiClient) -> TopologySnapshot {
527 match client.bee().debug().topology().await {
528 Ok(topology) => TopologySnapshot {
529 topology: Some(topology),
530 last_error: None,
531 last_update: Some(Instant::now()),
532 },
533 Err(e) => TopologySnapshot {
534 topology: None,
535 last_error: Some(format!("topology: {e}")),
536 last_update: Some(Instant::now()),
537 },
538 }
539}
540
541fn spawn_network_poller(
544 client: Arc<ApiClient>,
545 tx: watch::Sender<NetworkSnapshot>,
546 cancel: CancellationToken,
547 interval: Duration,
548) {
549 tokio::spawn(async move {
550 let mut tick = tokio::time::interval(interval);
551 tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
552 loop {
553 tokio::select! {
554 _ = cancel.cancelled() => break,
555 _ = tick.tick() => {
556 let snap = collect_network(&client).await;
557 if tx.send(snap).is_err() {
558 break;
559 }
560 }
561 }
562 }
563 });
564}
565
566async fn collect_network(client: &ApiClient) -> NetworkSnapshot {
567 match client.bee().debug().addresses().await {
568 Ok(addresses) => NetworkSnapshot {
569 addresses: Some(addresses),
570 last_error: None,
571 last_update: Some(Instant::now()),
572 },
573 Err(e) => NetworkSnapshot {
574 addresses: None,
575 last_error: Some(format!("addresses: {e}")),
576 last_update: Some(Instant::now()),
577 },
578 }
579}
580
581fn spawn_transactions_poller(
584 client: Arc<ApiClient>,
585 tx: watch::Sender<TransactionsSnapshot>,
586 cancel: CancellationToken,
587 interval: Duration,
588) {
589 tokio::spawn(async move {
590 let mut tick = tokio::time::interval(interval);
591 tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
592 loop {
593 tokio::select! {
594 _ = cancel.cancelled() => break,
595 _ = tick.tick() => {
596 let snap = collect_transactions(&client).await;
597 if tx.send(snap).is_err() {
598 break;
599 }
600 }
601 }
602 }
603 });
604}
605
606async fn collect_transactions(client: &ApiClient) -> TransactionsSnapshot {
607 match client.bee().debug().pending_transactions().await {
608 Ok(pending) => TransactionsSnapshot {
609 pending,
610 last_error: None,
611 last_update: Some(Instant::now()),
612 },
613 Err(e) => TransactionsSnapshot {
614 pending: Vec::new(),
615 last_error: Some(format!("transactions: {e}")),
616 last_update: Some(Instant::now()),
617 },
618 }
619}
620
621fn spawn_tags_poller(
624 client: Arc<ApiClient>,
625 tx: watch::Sender<TagsSnapshot>,
626 cancel: CancellationToken,
627 interval: Duration,
628) {
629 tokio::spawn(async move {
630 let mut tick = tokio::time::interval(interval);
631 tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
632 loop {
633 tokio::select! {
634 _ = cancel.cancelled() => break,
635 _ = tick.tick() => {
636 let snap = collect_tags(&client).await;
637 if tx.send(snap).is_err() {
638 break;
639 }
640 }
641 }
642 }
643 });
644}
645
646async fn collect_tags(client: &ApiClient) -> TagsSnapshot {
647 match client.bee().api().list_tags(None, None).await {
648 Ok(tags) => TagsSnapshot {
649 tags,
650 last_error: None,
651 last_update: Some(Instant::now()),
652 },
653 Err(e) => TagsSnapshot {
654 tags: Vec::new(),
655 last_error: Some(format!("tags: {e}")),
656 last_update: Some(Instant::now()),
657 },
658 }
659}
660
661async fn collect_health(client: &ApiClient) -> HealthSnapshot {
662 let bee = client.bee();
663
664 let ping_start = Instant::now();
667 let health_ok = bee.debug().health().await.is_ok();
668 let last_ping = health_ok.then(|| ping_start.elapsed());
669
670 let status = bee.debug().status().await;
671 let chain_state = bee.debug().chain_state().await;
672 let wallet = bee.debug().wallet().await;
673 let redistribution = bee.debug().redistribution_state().await;
674
675 let mut snap = HealthSnapshot {
676 last_ping,
677 last_update: Some(Instant::now()),
678 ..Default::default()
679 };
680 let mut errors: Vec<String> = Vec::new();
681 match status {
682 Ok(s) => snap.status = Some(s),
683 Err(e) => errors.push(format!("status: {e}")),
684 }
685 match chain_state {
686 Ok(c) => snap.chain_state = Some(c),
687 Err(e) => errors.push(format!("chainstate: {e}")),
688 }
689 match wallet {
690 Ok(w) => snap.wallet = Some(w),
691 Err(e) => errors.push(format!("wallet: {e}")),
692 }
693 match redistribution {
694 Ok(r) => snap.redistribution = Some(r),
695 Err(e) => errors.push(format!("redistributionstate: {e}")),
696 }
697 if !errors.is_empty() {
698 snap.last_error = Some(errors.join("; "));
699 }
700 snap
701}
702
#[cfg(test)]
mod tests {
    use super::*;

    /// A defaulted snapshot has no sections, so it must not count as loaded.
    #[test]
    fn fully_loaded_default_is_false() {
        let empty = HealthSnapshot::default();
        assert!(!empty.is_fully_loaded());
    }

    /// All four sections present and no error means loaded; adding an error
    /// to the same snapshot flips the result.
    #[test]
    fn fully_loaded_requires_no_error_and_all_fields() {
        // These two values are built by deserializing minimal JSON payloads.
        let chain_state: ChainState =
            serde_json::from_str(r#"{"block":0,"chainTip":0}"#).unwrap();
        let wallet: Wallet = serde_json::from_str(
            r#"{"chainID":1,"walletAddress":"0x0000000000000000000000000000000000000000"}"#,
        )
        .unwrap();

        let complete = HealthSnapshot {
            status: Some(Status::default()),
            chain_state: Some(chain_state),
            wallet: Some(wallet),
            redistribution: Some(RedistributionState::default()),
            ..Default::default()
        };
        assert!(complete.is_fully_loaded());

        let errored = HealthSnapshot {
            last_error: Some("boom".into()),
            ..complete
        };
        assert!(!errored.is_fully_loaded());
    }
}
733}