#![allow(dead_code)]
use std::sync::Arc;
use std::time::{Duration, Instant};
use bee::api::Tag;
use bee::debug::{
Addresses, ChainState, ChequebookBalance, LastCheque, RedistributionState, Settlements, Status,
Topology, TransactionInfo, Wallet,
};
use bee::postage::PostageBatch;
use num_bigint::BigInt;
use tokio::sync::watch;
use tokio_util::sync::CancellationToken;
use crate::api::ApiClient;
#[derive(Clone, Debug, Default)]
pub struct HealthSnapshot {
pub status: Option<Status>,
pub chain_state: Option<ChainState>,
pub wallet: Option<Wallet>,
pub redistribution: Option<RedistributionState>,
pub last_ping: Option<Duration>,
pub last_error: Option<String>,
pub last_update: Option<Instant>,
}
impl HealthSnapshot {
pub fn is_fully_loaded(&self) -> bool {
self.last_error.is_none()
&& self.status.is_some()
&& self.chain_state.is_some()
&& self.wallet.is_some()
&& self.redistribution.is_some()
}
}
#[derive(Clone, Debug, Default)]
pub struct StampsSnapshot {
pub batches: Vec<PostageBatch>,
pub last_error: Option<String>,
pub last_update: Option<Instant>,
}
impl StampsSnapshot {
pub fn is_loaded(&self) -> bool {
self.last_update.is_some() && self.last_error.is_none()
}
}
#[derive(Clone, Debug, Default)]
pub struct SwapSnapshot {
pub chequebook: Option<ChequebookBalance>,
pub chequebook_address: Option<String>,
pub settlements: Option<Settlements>,
pub time_settlements: Option<Settlements>,
pub last_received: Vec<LastCheque>,
pub last_error: Option<String>,
pub last_update: Option<Instant>,
}
impl SwapSnapshot {
pub fn is_loaded(&self) -> bool {
self.last_update.is_some() && self.last_error.is_none()
}
}
#[derive(Clone, Debug, Default)]
pub struct TagsSnapshot {
pub tags: Vec<Tag>,
pub last_error: Option<String>,
pub last_update: Option<Instant>,
}
impl TagsSnapshot {
pub fn is_loaded(&self) -> bool {
self.last_update.is_some() && self.last_error.is_none()
}
}
#[derive(Clone, Debug, Default)]
pub struct TransactionsSnapshot {
pub pending: Vec<TransactionInfo>,
pub last_error: Option<String>,
pub last_update: Option<Instant>,
}
impl TransactionsSnapshot {
pub fn is_loaded(&self) -> bool {
self.last_update.is_some() && self.last_error.is_none()
}
}
#[derive(Clone, Debug, Default)]
pub struct NetworkSnapshot {
pub addresses: Option<Addresses>,
pub last_error: Option<String>,
pub last_update: Option<Instant>,
}
impl NetworkSnapshot {
pub fn is_loaded(&self) -> bool {
self.addresses.is_some() && self.last_error.is_none()
}
}
#[derive(Clone, Debug, Default)]
pub struct TopologySnapshot {
pub topology: Option<Topology>,
pub last_error: Option<String>,
pub last_update: Option<Instant>,
}
impl TopologySnapshot {
pub fn is_loaded(&self) -> bool {
self.topology.is_some() && self.last_error.is_none()
}
}
#[derive(Clone, Debug, Default)]
pub struct LotterySnapshot {
pub staked: Option<BigInt>,
pub last_error: Option<String>,
pub last_update: Option<Instant>,
}
impl LotterySnapshot {
pub fn is_loaded(&self) -> bool {
self.last_update.is_some() && self.last_error.is_none()
}
}
#[derive(Clone, Debug)]
pub struct BeeWatch {
health_rx: watch::Receiver<HealthSnapshot>,
stamps_rx: watch::Receiver<StampsSnapshot>,
swap_rx: watch::Receiver<SwapSnapshot>,
lottery_rx: watch::Receiver<LotterySnapshot>,
topology_rx: watch::Receiver<TopologySnapshot>,
network_rx: watch::Receiver<NetworkSnapshot>,
transactions_rx: watch::Receiver<TransactionsSnapshot>,
tags_rx: watch::Receiver<TagsSnapshot>,
cancel: CancellationToken,
}
impl BeeWatch {
pub fn start(client: Arc<ApiClient>, parent_cancel: &CancellationToken) -> Self {
let cancel = parent_cancel.child_token();
let (health_tx, health_rx) = watch::channel(HealthSnapshot::default());
spawn_health_poller(
client.clone(),
health_tx,
cancel.clone(),
Duration::from_secs(2),
);
let (stamps_tx, stamps_rx) = watch::channel(StampsSnapshot::default());
spawn_stamps_poller(
client.clone(),
stamps_tx,
cancel.clone(),
Duration::from_secs(10),
);
let (swap_tx, swap_rx) = watch::channel(SwapSnapshot::default());
spawn_swap_poller(
client.clone(),
swap_tx,
cancel.clone(),
Duration::from_secs(30),
);
let (lottery_tx, lottery_rx) = watch::channel(LotterySnapshot::default());
spawn_lottery_poller(
client.clone(),
lottery_tx,
cancel.clone(),
Duration::from_secs(30),
);
let (topology_tx, topology_rx) = watch::channel(TopologySnapshot::default());
spawn_topology_poller(
client.clone(),
topology_tx,
cancel.clone(),
Duration::from_secs(5),
);
let (network_tx, network_rx) = watch::channel(NetworkSnapshot::default());
spawn_network_poller(
client.clone(),
network_tx,
cancel.clone(),
Duration::from_secs(60),
);
let (transactions_tx, transactions_rx) =
watch::channel(TransactionsSnapshot::default());
spawn_transactions_poller(
client.clone(),
transactions_tx,
cancel.clone(),
Duration::from_secs(30),
);
let (tags_tx, tags_rx) = watch::channel(TagsSnapshot::default());
spawn_tags_poller(client, tags_tx, cancel.clone(), Duration::from_secs(5));
Self {
health_rx,
stamps_rx,
swap_rx,
lottery_rx,
topology_rx,
network_rx,
transactions_rx,
tags_rx,
cancel,
}
}
pub fn health(&self) -> watch::Receiver<HealthSnapshot> {
self.health_rx.clone()
}
pub fn stamps(&self) -> watch::Receiver<StampsSnapshot> {
self.stamps_rx.clone()
}
pub fn swap(&self) -> watch::Receiver<SwapSnapshot> {
self.swap_rx.clone()
}
pub fn lottery(&self) -> watch::Receiver<LotterySnapshot> {
self.lottery_rx.clone()
}
pub fn topology(&self) -> watch::Receiver<TopologySnapshot> {
self.topology_rx.clone()
}
pub fn network(&self) -> watch::Receiver<NetworkSnapshot> {
self.network_rx.clone()
}
pub fn transactions(&self) -> watch::Receiver<TransactionsSnapshot> {
self.transactions_rx.clone()
}
pub fn tags(&self) -> watch::Receiver<TagsSnapshot> {
self.tags_rx.clone()
}
pub fn shutdown(&self) {
self.cancel.cancel();
}
}
fn spawn_health_poller(
client: Arc<ApiClient>,
tx: watch::Sender<HealthSnapshot>,
cancel: CancellationToken,
interval: Duration,
) {
tokio::spawn(async move {
let mut tick = tokio::time::interval(interval);
tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
loop {
tokio::select! {
_ = cancel.cancelled() => break,
_ = tick.tick() => {
let snap = collect_health(&client).await;
if tx.send(snap).is_err() {
break; }
}
}
}
});
}
fn spawn_stamps_poller(
client: Arc<ApiClient>,
tx: watch::Sender<StampsSnapshot>,
cancel: CancellationToken,
interval: Duration,
) {
tokio::spawn(async move {
let mut tick = tokio::time::interval(interval);
tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
loop {
tokio::select! {
_ = cancel.cancelled() => break,
_ = tick.tick() => {
let snap = collect_stamps(&client).await;
if tx.send(snap).is_err() {
break;
}
}
}
}
});
}
async fn collect_stamps(client: &ApiClient) -> StampsSnapshot {
match client.bee().postage().get_postage_batches().await {
Ok(batches) => StampsSnapshot {
batches,
last_error: None,
last_update: Some(Instant::now()),
},
Err(e) => StampsSnapshot {
batches: Vec::new(),
last_error: Some(format!("stamps: {e}")),
last_update: Some(Instant::now()),
},
}
}
fn spawn_swap_poller(
client: Arc<ApiClient>,
tx: watch::Sender<SwapSnapshot>,
cancel: CancellationToken,
interval: Duration,
) {
tokio::spawn(async move {
let mut tick = tokio::time::interval(interval);
tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
loop {
tokio::select! {
_ = cancel.cancelled() => break,
_ = tick.tick() => {
let snap = collect_swap(&client).await;
if tx.send(snap).is_err() {
break;
}
}
}
}
});
}
async fn collect_swap(client: &ApiClient) -> SwapSnapshot {
let bee = client.bee();
let chequebook = bee.debug().chequebook_balance().await;
let chequebook_address = bee.debug().chequebook_address().await;
let settlements = bee.debug().settlements().await;
let time_settlements = bee.debug().time_settlements().await;
let last_received = bee.debug().last_cheques().await;
let mut snap = SwapSnapshot {
last_update: Some(Instant::now()),
..Default::default()
};
let mut errors: Vec<String> = Vec::new();
match chequebook {
Ok(c) => snap.chequebook = Some(c),
Err(e) => errors.push(format!("chequebook: {e}")),
}
if let Ok(a) = chequebook_address {
snap.chequebook_address = Some(a);
}
match settlements {
Ok(s) => snap.settlements = Some(s),
Err(e) => errors.push(format!("settlements: {e}")),
}
match time_settlements {
Ok(s) => snap.time_settlements = Some(s),
Err(e) => errors.push(format!("timesettlements: {e}")),
}
match last_received {
Ok(v) => snap.last_received = v,
Err(e) => errors.push(format!("cheques: {e}")),
}
if !errors.is_empty() {
snap.last_error = Some(errors.join("; "));
}
snap
}
fn spawn_lottery_poller(
client: Arc<ApiClient>,
tx: watch::Sender<LotterySnapshot>,
cancel: CancellationToken,
interval: Duration,
) {
tokio::spawn(async move {
let mut tick = tokio::time::interval(interval);
tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
loop {
tokio::select! {
_ = cancel.cancelled() => break,
_ = tick.tick() => {
let snap = collect_lottery(&client).await;
if tx.send(snap).is_err() {
break;
}
}
}
}
});
}
async fn collect_lottery(client: &ApiClient) -> LotterySnapshot {
match client.bee().debug().stake().await {
Ok(staked) => LotterySnapshot {
staked: Some(staked),
last_error: None,
last_update: Some(Instant::now()),
},
Err(e) => LotterySnapshot {
staked: None,
last_error: Some(format!("stake: {e}")),
last_update: Some(Instant::now()),
},
}
}
fn spawn_topology_poller(
client: Arc<ApiClient>,
tx: watch::Sender<TopologySnapshot>,
cancel: CancellationToken,
interval: Duration,
) {
tokio::spawn(async move {
let mut tick = tokio::time::interval(interval);
tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
loop {
tokio::select! {
_ = cancel.cancelled() => break,
_ = tick.tick() => {
let snap = collect_topology(&client).await;
if tx.send(snap).is_err() {
break;
}
}
}
}
});
}
async fn collect_topology(client: &ApiClient) -> TopologySnapshot {
match client.bee().debug().topology().await {
Ok(topology) => TopologySnapshot {
topology: Some(topology),
last_error: None,
last_update: Some(Instant::now()),
},
Err(e) => TopologySnapshot {
topology: None,
last_error: Some(format!("topology: {e}")),
last_update: Some(Instant::now()),
},
}
}
fn spawn_network_poller(
client: Arc<ApiClient>,
tx: watch::Sender<NetworkSnapshot>,
cancel: CancellationToken,
interval: Duration,
) {
tokio::spawn(async move {
let mut tick = tokio::time::interval(interval);
tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
loop {
tokio::select! {
_ = cancel.cancelled() => break,
_ = tick.tick() => {
let snap = collect_network(&client).await;
if tx.send(snap).is_err() {
break;
}
}
}
}
});
}
async fn collect_network(client: &ApiClient) -> NetworkSnapshot {
match client.bee().debug().addresses().await {
Ok(addresses) => NetworkSnapshot {
addresses: Some(addresses),
last_error: None,
last_update: Some(Instant::now()),
},
Err(e) => NetworkSnapshot {
addresses: None,
last_error: Some(format!("addresses: {e}")),
last_update: Some(Instant::now()),
},
}
}
fn spawn_transactions_poller(
client: Arc<ApiClient>,
tx: watch::Sender<TransactionsSnapshot>,
cancel: CancellationToken,
interval: Duration,
) {
tokio::spawn(async move {
let mut tick = tokio::time::interval(interval);
tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
loop {
tokio::select! {
_ = cancel.cancelled() => break,
_ = tick.tick() => {
let snap = collect_transactions(&client).await;
if tx.send(snap).is_err() {
break;
}
}
}
}
});
}
async fn collect_transactions(client: &ApiClient) -> TransactionsSnapshot {
match client.bee().debug().pending_transactions().await {
Ok(pending) => TransactionsSnapshot {
pending,
last_error: None,
last_update: Some(Instant::now()),
},
Err(e) => TransactionsSnapshot {
pending: Vec::new(),
last_error: Some(format!("transactions: {e}")),
last_update: Some(Instant::now()),
},
}
}
fn spawn_tags_poller(
client: Arc<ApiClient>,
tx: watch::Sender<TagsSnapshot>,
cancel: CancellationToken,
interval: Duration,
) {
tokio::spawn(async move {
let mut tick = tokio::time::interval(interval);
tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
loop {
tokio::select! {
_ = cancel.cancelled() => break,
_ = tick.tick() => {
let snap = collect_tags(&client).await;
if tx.send(snap).is_err() {
break;
}
}
}
}
});
}
async fn collect_tags(client: &ApiClient) -> TagsSnapshot {
match client.bee().api().list_tags(None, None).await {
Ok(tags) => TagsSnapshot {
tags,
last_error: None,
last_update: Some(Instant::now()),
},
Err(e) => TagsSnapshot {
tags: Vec::new(),
last_error: Some(format!("tags: {e}")),
last_update: Some(Instant::now()),
},
}
}
async fn collect_health(client: &ApiClient) -> HealthSnapshot {
let bee = client.bee();
let ping_start = Instant::now();
let health_ok = bee.debug().health().await.is_ok();
let last_ping = health_ok.then(|| ping_start.elapsed());
let status = bee.debug().status().await;
let chain_state = bee.debug().chain_state().await;
let wallet = bee.debug().wallet().await;
let redistribution = bee.debug().redistribution_state().await;
let mut snap = HealthSnapshot {
last_ping,
last_update: Some(Instant::now()),
..Default::default()
};
let mut errors: Vec<String> = Vec::new();
match status {
Ok(s) => snap.status = Some(s),
Err(e) => errors.push(format!("status: {e}")),
}
match chain_state {
Ok(c) => snap.chain_state = Some(c),
Err(e) => errors.push(format!("chainstate: {e}")),
}
match wallet {
Ok(w) => snap.wallet = Some(w),
Err(e) => errors.push(format!("wallet: {e}")),
}
match redistribution {
Ok(r) => snap.redistribution = Some(r),
Err(e) => errors.push(format!("redistributionstate: {e}")),
}
if !errors.is_empty() {
snap.last_error = Some(errors.join("; "));
}
snap
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn fully_loaded_default_is_false() {
assert!(!HealthSnapshot::default().is_fully_loaded());
}
#[test]
fn fully_loaded_requires_no_error_and_all_fields() {
let snap = HealthSnapshot {
status: Some(Status::default()),
chain_state: Some(serde_json::from_str(r#"{"block":0,"chainTip":0}"#).unwrap()),
wallet: Some(
serde_json::from_str(
r#"{"chainID":1,"walletAddress":"0x0000000000000000000000000000000000000000"}"#,
)
.unwrap(),
),
redistribution: Some(RedistributionState::default()),
..Default::default()
};
assert!(snap.is_fully_loaded());
let mut bad = snap;
bad.last_error = Some("boom".into());
assert!(!bad.is_fully_loaded());
}
}