#![allow(dead_code)]
use std::sync::Arc;
use std::time::{Duration, Instant};
use bee::debug::{ChainState, RedistributionState, Status, Wallet};
use bee::postage::PostageBatch;
use tokio::sync::watch;
use tokio_util::sync::CancellationToken;
use crate::api::ApiClient;
#[derive(Clone, Debug, Default)]
pub struct HealthSnapshot {
pub status: Option<Status>,
pub chain_state: Option<ChainState>,
pub wallet: Option<Wallet>,
pub redistribution: Option<RedistributionState>,
pub last_ping: Option<Duration>,
pub last_error: Option<String>,
pub last_update: Option<Instant>,
}
impl HealthSnapshot {
pub fn is_fully_loaded(&self) -> bool {
self.last_error.is_none()
&& self.status.is_some()
&& self.chain_state.is_some()
&& self.wallet.is_some()
&& self.redistribution.is_some()
}
}
#[derive(Clone, Debug, Default)]
pub struct StampsSnapshot {
pub batches: Vec<PostageBatch>,
pub last_error: Option<String>,
pub last_update: Option<Instant>,
}
impl StampsSnapshot {
pub fn is_loaded(&self) -> bool {
self.last_update.is_some() && self.last_error.is_none()
}
}
#[derive(Clone, Debug)]
pub struct BeeWatch {
health_rx: watch::Receiver<HealthSnapshot>,
stamps_rx: watch::Receiver<StampsSnapshot>,
cancel: CancellationToken,
}
impl BeeWatch {
pub fn start(client: Arc<ApiClient>, parent_cancel: &CancellationToken) -> Self {
let cancel = parent_cancel.child_token();
let (health_tx, health_rx) = watch::channel(HealthSnapshot::default());
spawn_health_poller(
client.clone(),
health_tx,
cancel.clone(),
Duration::from_secs(2),
);
let (stamps_tx, stamps_rx) = watch::channel(StampsSnapshot::default());
spawn_stamps_poller(client, stamps_tx, cancel.clone(), Duration::from_secs(10));
Self {
health_rx,
stamps_rx,
cancel,
}
}
pub fn health(&self) -> watch::Receiver<HealthSnapshot> {
self.health_rx.clone()
}
pub fn stamps(&self) -> watch::Receiver<StampsSnapshot> {
self.stamps_rx.clone()
}
pub fn shutdown(&self) {
self.cancel.cancel();
}
}
fn spawn_health_poller(
client: Arc<ApiClient>,
tx: watch::Sender<HealthSnapshot>,
cancel: CancellationToken,
interval: Duration,
) {
tokio::spawn(async move {
let mut tick = tokio::time::interval(interval);
tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
loop {
tokio::select! {
_ = cancel.cancelled() => break,
_ = tick.tick() => {
let snap = collect_health(&client).await;
if tx.send(snap).is_err() {
break; }
}
}
}
});
}
fn spawn_stamps_poller(
client: Arc<ApiClient>,
tx: watch::Sender<StampsSnapshot>,
cancel: CancellationToken,
interval: Duration,
) {
tokio::spawn(async move {
let mut tick = tokio::time::interval(interval);
tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
loop {
tokio::select! {
_ = cancel.cancelled() => break,
_ = tick.tick() => {
let snap = collect_stamps(&client).await;
if tx.send(snap).is_err() {
break;
}
}
}
}
});
}
async fn collect_stamps(client: &ApiClient) -> StampsSnapshot {
match client.bee().postage().get_postage_batches().await {
Ok(batches) => StampsSnapshot {
batches,
last_error: None,
last_update: Some(Instant::now()),
},
Err(e) => StampsSnapshot {
batches: Vec::new(),
last_error: Some(format!("stamps: {e}")),
last_update: Some(Instant::now()),
},
}
}
async fn collect_health(client: &ApiClient) -> HealthSnapshot {
let bee = client.bee();
let ping_start = Instant::now();
let health_ok = bee.debug().health().await.is_ok();
let last_ping = health_ok.then(|| ping_start.elapsed());
let status = bee.debug().status().await;
let chain_state = bee.debug().chain_state().await;
let wallet = bee.debug().wallet().await;
let redistribution = bee.debug().redistribution_state().await;
let mut snap = HealthSnapshot {
last_ping,
last_update: Some(Instant::now()),
..Default::default()
};
let mut errors: Vec<String> = Vec::new();
match status {
Ok(s) => snap.status = Some(s),
Err(e) => errors.push(format!("status: {e}")),
}
match chain_state {
Ok(c) => snap.chain_state = Some(c),
Err(e) => errors.push(format!("chainstate: {e}")),
}
match wallet {
Ok(w) => snap.wallet = Some(w),
Err(e) => errors.push(format!("wallet: {e}")),
}
match redistribution {
Ok(r) => snap.redistribution = Some(r),
Err(e) => errors.push(format!("redistributionstate: {e}")),
}
if !errors.is_empty() {
snap.last_error = Some(errors.join("; "));
}
snap
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn fully_loaded_default_is_false() {
assert!(!HealthSnapshot::default().is_fully_loaded());
}
#[test]
fn fully_loaded_requires_no_error_and_all_fields() {
let snap = HealthSnapshot {
status: Some(Status::default()),
chain_state: Some(serde_json::from_str(r#"{"block":0,"chainTip":0}"#).unwrap()),
wallet: Some(
serde_json::from_str(
r#"{"chainID":1,"walletAddress":"0x0000000000000000000000000000000000000000"}"#,
)
.unwrap(),
),
redistribution: Some(RedistributionState::default()),
..Default::default()
};
assert!(snap.is_fully_loaded());
let mut bad = snap;
bad.last_error = Some("boom".into());
assert!(!bad.is_fully_loaded());
}
}