#![allow(dead_code)]
use std::sync::Arc;
use std::time::{Duration, Instant};
use bee::api::Tag;
use bee::debug::{
Addresses, ChainState, ChequebookBalance, LastCheque, RedistributionState, Settlements, Status,
Topology, TransactionInfo, Wallet,
};
use bee::postage::PostageBatch;
use bee::swarm::Reference;
use num_bigint::BigInt;
use tokio::sync::watch;
use tokio_util::sync::CancellationToken;
use crate::api::ApiClient;
/// Result of one health polling round, as assembled by `collect_health`.
///
/// `Default` yields a snapshot with every field set to `None`.
#[derive(Debug, Clone, Default)]
pub struct HealthSnapshot {
    /// Response of the `status` debug call; `None` if it failed or has not run yet.
    pub status: Option<Status>,
    /// Response of the `chain_state` debug call; `None` if it failed or has not run yet.
    pub chain_state: Option<ChainState>,
    /// Response of the `wallet` debug call; `None` if it failed or has not run yet.
    pub wallet: Option<Wallet>,
    /// Response of the `redistribution_state` debug call; `None` if it failed or has not run yet.
    pub redistribution: Option<RedistributionState>,
    /// Time the `health` probe took; `None` when the probe did not succeed.
    pub last_ping: Option<Duration>,
    /// Messages of the failed calls joined with `"; "`; `None` when none failed.
    pub last_error: Option<String>,
    /// Moment the snapshot was assembled; `None` for a default snapshot.
    pub last_update: Option<Instant>,
}

impl HealthSnapshot {
    /// Returns `true` only when no error is recorded and `status`,
    /// `chain_state`, `wallet` and `redistribution` are all populated.
    pub fn is_fully_loaded(&self) -> bool {
        if self.last_error.is_some() {
            return false;
        }
        let populated = [
            self.status.is_some(),
            self.chain_state.is_some(),
            self.wallet.is_some(),
            self.redistribution.is_some(),
        ];
        populated.iter().all(|&present| present)
    }
}
/// Result of one postage-batch polling round, as assembled by `collect_stamps`.
#[derive(Debug, Clone, Default)]
pub struct StampsSnapshot {
    /// Batches returned by the node; empty when the request failed or has not run yet.
    pub batches: Vec<PostageBatch>,
    /// Error message of the failed request; `None` on success.
    pub last_error: Option<String>,
    /// Moment the snapshot was assembled; `None` for a default snapshot.
    pub last_update: Option<Instant>,
}

impl StampsSnapshot {
    /// Returns `true` when a polling round has completed (`last_update` is set)
    /// and it recorded no error.
    pub fn is_loaded(&self) -> bool {
        matches!((&self.last_update, &self.last_error), (Some(_), None))
    }
}
/// Result of one SWAP/chequebook polling round, as assembled by `collect_swap`.
#[derive(Debug, Clone, Default)]
pub struct SwapSnapshot {
    /// Response of the `chequebook_balance` call; `None` if it failed or has not run yet.
    pub chequebook: Option<ChequebookBalance>,
    /// Response of the `chequebook_address` call; `None` if it failed or has not run yet.
    pub chequebook_address: Option<String>,
    /// Response of the `settlements` call; `None` if it failed or has not run yet.
    pub settlements: Option<Settlements>,
    /// Response of the `time_settlements` call; `None` if it failed or has not run yet.
    pub time_settlements: Option<Settlements>,
    /// Response of the `last_cheques` call; empty if it failed or has not run yet.
    pub last_received: Vec<LastCheque>,
    /// Messages of the failed calls joined with `"; "`; `None` when none was recorded.
    pub last_error: Option<String>,
    /// Moment the snapshot was assembled; `None` for a default snapshot.
    pub last_update: Option<Instant>,
}

impl SwapSnapshot {
    /// Returns `true` when a polling round has completed (`last_update` is set)
    /// and it recorded no error.
    pub fn is_loaded(&self) -> bool {
        matches!((&self.last_update, &self.last_error), (Some(_), None))
    }
}
/// Result of one tag-list polling round, as assembled by `collect_tags`.
#[derive(Debug, Clone, Default)]
pub struct TagsSnapshot {
    /// Tags returned by the node; empty when the request failed or has not run yet.
    pub tags: Vec<Tag>,
    /// Error message of the failed request; `None` on success.
    pub last_error: Option<String>,
    /// Moment the snapshot was assembled; `None` for a default snapshot.
    pub last_update: Option<Instant>,
}

impl TagsSnapshot {
    /// Returns `true` when a polling round has completed (`last_update` is set)
    /// and it recorded no error.
    pub fn is_loaded(&self) -> bool {
        matches!((&self.last_update, &self.last_error), (Some(_), None))
    }
}
/// Result of one pin-list polling round, as assembled by `collect_pins`.
#[derive(Debug, Clone, Default)]
pub struct PinsSnapshot {
    /// Pinned references returned by the node; empty when the request failed
    /// or has not run yet.
    pub pins: Vec<Reference>,
    /// Error message of the failed request; `None` on success.
    pub last_error: Option<String>,
    /// Moment the snapshot was assembled; `None` for a default snapshot.
    pub last_update: Option<Instant>,
}

impl PinsSnapshot {
    /// Returns `true` when a polling round has completed (`last_update` is set)
    /// and it recorded no error.
    pub fn is_loaded(&self) -> bool {
        matches!((&self.last_update, &self.last_error), (Some(_), None))
    }
}
/// Result of one pending-transaction polling round, as assembled by
/// `collect_transactions`.
#[derive(Debug, Clone, Default)]
pub struct TransactionsSnapshot {
    /// Pending transactions returned by the node; empty when the request
    /// failed or has not run yet.
    pub pending: Vec<TransactionInfo>,
    /// Error message of the failed request; `None` on success.
    pub last_error: Option<String>,
    /// Moment the snapshot was assembled; `None` for a default snapshot.
    pub last_update: Option<Instant>,
}

impl TransactionsSnapshot {
    /// Returns `true` when a polling round has completed (`last_update` is set)
    /// and it recorded no error.
    pub fn is_loaded(&self) -> bool {
        matches!((&self.last_update, &self.last_error), (Some(_), None))
    }
}
/// Result of one address polling round, as assembled by `collect_network`.
#[derive(Debug, Clone, Default)]
pub struct NetworkSnapshot {
    /// Response of the `addresses` debug call; `None` if it failed or has not run yet.
    pub addresses: Option<Addresses>,
    /// Error message of the failed request; `None` on success.
    pub last_error: Option<String>,
    /// Moment the snapshot was assembled; `None` for a default snapshot.
    pub last_update: Option<Instant>,
}

impl NetworkSnapshot {
    /// Returns `true` when `addresses` is populated and no error is recorded.
    pub fn is_loaded(&self) -> bool {
        matches!((&self.addresses, &self.last_error), (Some(_), None))
    }
}
/// Result of one topology polling round, as assembled by `collect_topology`.
#[derive(Debug, Clone, Default)]
pub struct TopologySnapshot {
    /// Response of the `topology` debug call; `None` if it failed or has not run yet.
    pub topology: Option<Topology>,
    /// Error message of the failed request; `None` on success.
    pub last_error: Option<String>,
    /// Moment the snapshot was assembled; `None` for a default snapshot.
    pub last_update: Option<Instant>,
}

impl TopologySnapshot {
    /// Returns `true` when `topology` is populated and no error is recorded.
    pub fn is_loaded(&self) -> bool {
        matches!((&self.topology, &self.last_error), (Some(_), None))
    }
}
/// Result of one stake polling round, as assembled by `collect_lottery`.
#[derive(Debug, Clone, Default)]
pub struct LotterySnapshot {
    /// Value returned by the `stake` debug call; `None` if it failed or has not run yet.
    pub staked: Option<BigInt>,
    /// Error message of the failed request; `None` on success.
    pub last_error: Option<String>,
    /// Moment the snapshot was assembled; `None` for a default snapshot.
    pub last_update: Option<Instant>,
}

impl LotterySnapshot {
    /// Returns `true` when a polling round has completed (`last_update` is set)
    /// and it recorded no error.
    pub fn is_loaded(&self) -> bool {
        matches!((&self.last_update, &self.last_error), (Some(_), None))
    }
}
/// Preset that selects the polling interval of every poller.
///
/// The concrete per-poller durations are defined by the methods on this type.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum RefreshProfile {
    /// Preset with the shortest intervals.
    Live,
    /// Standard preset; also the fallback for unrecognised configuration values.
    Default,
    /// Preset with the longest intervals.
    Slow,
}
impl RefreshProfile {
    /// Parses the `[ui].refresh` configuration value.
    ///
    /// Recognised values are `live`, `default` and `slow`. Surrounding
    /// whitespace and ASCII case are ignored, so `"Live"` or `" slow "` select
    /// the expected profile instead of silently degrading to the default.
    /// Any other value logs a warning and falls back to [`Self::Default`].
    pub fn from_config(s: &str) -> Self {
        // Config values are typed by hand; tolerate padding and capitalisation.
        let key = s.trim();
        if key.eq_ignore_ascii_case("live") {
            Self::Live
        } else if key.eq_ignore_ascii_case("default") {
            Self::Default
        } else if key.eq_ignore_ascii_case("slow") {
            Self::Slow
        } else {
            // Report the raw, untrimmed value so the user sees exactly what was configured.
            let other = s;
            tracing::warn!(
                "unknown [ui].refresh value {other:?}; falling back to \"default\". \
                 Recognised: live | default | slow."
            );
            Self::Default
        }
    }

    /// Polling interval for the health poller: 2 s / 4 s / 8 s.
    pub fn health(self) -> Duration {
        match self {
            Self::Live => Duration::from_secs(2),
            Self::Default => Duration::from_secs(4),
            Self::Slow => Duration::from_secs(8),
        }
    }

    /// Polling interval for the topology poller: 5 s / 10 s / 20 s.
    pub fn topology(self) -> Duration {
        match self {
            Self::Live => Duration::from_secs(5),
            Self::Default => Duration::from_secs(10),
            Self::Slow => Duration::from_secs(20),
        }
    }

    /// Polling interval for the stamps poller: 10 s (live and default) / 20 s (slow).
    pub fn stamps(self) -> Duration {
        match self {
            Self::Live | Self::Default => Duration::from_secs(10),
            Self::Slow => Duration::from_secs(20),
        }
    }

    /// Polling interval for the tags poller: 5 s / 10 s / 20 s.
    pub fn tags(self) -> Duration {
        match self {
            Self::Live => Duration::from_secs(5),
            Self::Default => Duration::from_secs(10),
            Self::Slow => Duration::from_secs(20),
        }
    }

    /// Polling interval for the swap poller: 30 s (live and default) / 60 s (slow).
    pub fn swap(self) -> Duration {
        match self {
            Self::Live | Self::Default => Duration::from_secs(30),
            Self::Slow => Duration::from_secs(60),
        }
    }

    /// Polling interval for the lottery poller: 30 s (live and default) / 60 s (slow).
    pub fn lottery(self) -> Duration {
        match self {
            Self::Live | Self::Default => Duration::from_secs(30),
            Self::Slow => Duration::from_secs(60),
        }
    }

    /// Polling interval for the transactions poller: 30 s (live and default) / 60 s (slow).
    pub fn transactions(self) -> Duration {
        match self {
            Self::Live | Self::Default => Duration::from_secs(30),
            Self::Slow => Duration::from_secs(60),
        }
    }

    /// Polling interval for the network poller: 60 s (live and default) / 120 s (slow).
    pub fn network(self) -> Duration {
        match self {
            Self::Live | Self::Default => Duration::from_secs(60),
            Self::Slow => Duration::from_secs(120),
        }
    }

    /// Polling interval for the pins poller: 30 s (live and default) / 60 s (slow).
    pub fn pins(self) -> Duration {
        match self {
            Self::Live | Self::Default => Duration::from_secs(30),
            Self::Slow => Duration::from_secs(60),
        }
    }
}
/// Handle to the set of background pollers.
///
/// Holds one `watch` receiver per snapshot type plus the cancellation token
/// that is passed to every poller task.
#[derive(Debug, Clone)]
pub struct BeeWatch {
    /// Receives `HealthSnapshot` updates.
    health_rx: watch::Receiver<HealthSnapshot>,
    /// Receives `StampsSnapshot` updates.
    stamps_rx: watch::Receiver<StampsSnapshot>,
    /// Receives `SwapSnapshot` updates.
    swap_rx: watch::Receiver<SwapSnapshot>,
    /// Receives `LotterySnapshot` updates.
    lottery_rx: watch::Receiver<LotterySnapshot>,
    /// Receives `TopologySnapshot` updates.
    topology_rx: watch::Receiver<TopologySnapshot>,
    /// Receives `NetworkSnapshot` updates.
    network_rx: watch::Receiver<NetworkSnapshot>,
    /// Receives `TransactionsSnapshot` updates.
    transactions_rx: watch::Receiver<TransactionsSnapshot>,
    /// Receives `TagsSnapshot` updates.
    tags_rx: watch::Receiver<TagsSnapshot>,
    /// Receives `PinsSnapshot` updates.
    pins_rx: watch::Receiver<PinsSnapshot>,
    /// Child of the caller's token; cancelling it stops the pollers.
    cancel: CancellationToken,
}
impl BeeWatch {
    /// Starts all pollers with [`RefreshProfile::Default`] intervals.
    ///
    /// See [`BeeWatch::start_with_profile`] for details.
    pub fn start(client: Arc<ApiClient>, parent_cancel: &CancellationToken) -> Self {
        Self::start_with_profile(client, parent_cancel, RefreshProfile::Default)
    }

    /// Starts one poller task per snapshot type, using the intervals `profile` prescribes.
    ///
    /// Every watch channel is seeded with the snapshot type's `Default` value.
    /// All pollers share a child token of `parent_cancel`, so they stop when
    /// either the parent is cancelled or [`BeeWatch::shutdown`] is called.
    /// The pollers are spawned with `tokio::spawn`, so this must run inside a
    /// Tokio runtime.
    pub fn start_with_profile(
        client: Arc<ApiClient>,
        parent_cancel: &CancellationToken,
        profile: RefreshProfile,
    ) -> Self {
        let cancel = parent_cancel.child_token();

        // Build every channel first; each starts with an empty default snapshot.
        let (health_tx, health_rx) = watch::channel(HealthSnapshot::default());
        let (stamps_tx, stamps_rx) = watch::channel(StampsSnapshot::default());
        let (swap_tx, swap_rx) = watch::channel(SwapSnapshot::default());
        let (lottery_tx, lottery_rx) = watch::channel(LotterySnapshot::default());
        let (topology_tx, topology_rx) = watch::channel(TopologySnapshot::default());
        let (network_tx, network_rx) = watch::channel(NetworkSnapshot::default());
        let (transactions_tx, transactions_rx) = watch::channel(TransactionsSnapshot::default());
        let (tags_tx, tags_rx) = watch::channel(TagsSnapshot::default());
        let (pins_tx, pins_rx) = watch::channel(PinsSnapshot::default());

        // Hand each sender to its poller; the last spawn takes ownership of `client`.
        spawn_health_poller(
            Arc::clone(&client),
            health_tx,
            cancel.clone(),
            profile.health(),
        );
        spawn_stamps_poller(
            Arc::clone(&client),
            stamps_tx,
            cancel.clone(),
            profile.stamps(),
        );
        spawn_swap_poller(
            Arc::clone(&client),
            swap_tx,
            cancel.clone(),
            profile.swap(),
        );
        spawn_lottery_poller(
            Arc::clone(&client),
            lottery_tx,
            cancel.clone(),
            profile.lottery(),
        );
        spawn_topology_poller(
            Arc::clone(&client),
            topology_tx,
            cancel.clone(),
            profile.topology(),
        );
        spawn_network_poller(
            Arc::clone(&client),
            network_tx,
            cancel.clone(),
            profile.network(),
        );
        spawn_transactions_poller(
            Arc::clone(&client),
            transactions_tx,
            cancel.clone(),
            profile.transactions(),
        );
        spawn_tags_poller(
            Arc::clone(&client),
            tags_tx,
            cancel.clone(),
            profile.tags(),
        );
        spawn_pins_poller(client, pins_tx, cancel.clone(), profile.pins());

        Self {
            health_rx,
            stamps_rx,
            swap_rx,
            lottery_rx,
            topology_rx,
            network_rx,
            transactions_rx,
            tags_rx,
            pins_rx,
            cancel,
        }
    }

    /// Returns a new receiver for health snapshots.
    pub fn health(&self) -> watch::Receiver<HealthSnapshot> {
        self.health_rx.clone()
    }

    /// Returns a new receiver for postage-stamp snapshots.
    pub fn stamps(&self) -> watch::Receiver<StampsSnapshot> {
        self.stamps_rx.clone()
    }

    /// Returns a new receiver for swap snapshots.
    pub fn swap(&self) -> watch::Receiver<SwapSnapshot> {
        self.swap_rx.clone()
    }

    /// Returns a new receiver for lottery snapshots.
    pub fn lottery(&self) -> watch::Receiver<LotterySnapshot> {
        self.lottery_rx.clone()
    }

    /// Returns a new receiver for topology snapshots.
    pub fn topology(&self) -> watch::Receiver<TopologySnapshot> {
        self.topology_rx.clone()
    }

    /// Returns a new receiver for network snapshots.
    pub fn network(&self) -> watch::Receiver<NetworkSnapshot> {
        self.network_rx.clone()
    }

    /// Returns a new receiver for pending-transaction snapshots.
    pub fn transactions(&self) -> watch::Receiver<TransactionsSnapshot> {
        self.transactions_rx.clone()
    }

    /// Returns a new receiver for tag snapshots.
    pub fn tags(&self) -> watch::Receiver<TagsSnapshot> {
        self.tags_rx.clone()
    }

    /// Returns a new receiver for pin snapshots.
    pub fn pins(&self) -> watch::Receiver<PinsSnapshot> {
        self.pins_rx.clone()
    }

    /// Cancels the token shared by all pollers; each poller loop exits once it
    /// observes the cancellation.
    pub fn shutdown(&self) {
        self.cancel.cancel();
    }
}
fn spawn_health_poller(
client: Arc<ApiClient>,
tx: watch::Sender<HealthSnapshot>,
cancel: CancellationToken,
interval: Duration,
) {
tokio::spawn(async move {
let mut tick = tokio::time::interval(interval);
tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
loop {
tokio::select! {
_ = cancel.cancelled() => break,
_ = tick.tick() => {
let snap = collect_health(&client).await;
if tx.send(snap).is_err() {
break; }
}
}
}
});
}
fn spawn_stamps_poller(
client: Arc<ApiClient>,
tx: watch::Sender<StampsSnapshot>,
cancel: CancellationToken,
interval: Duration,
) {
tokio::spawn(async move {
let mut tick = tokio::time::interval(interval);
tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
loop {
tokio::select! {
_ = cancel.cancelled() => break,
_ = tick.tick() => {
let snap = collect_stamps(&client).await;
if tx.send(snap).is_err() {
break;
}
}
}
}
});
}
/// Fetches the postage batches once and wraps the outcome in a `StampsSnapshot`.
///
/// On failure the snapshot carries an empty batch list and a
/// `"stamps: <error>"` message; `last_update` is set in both cases.
async fn collect_stamps(client: &ApiClient) -> StampsSnapshot {
    let outcome = client.bee().postage().get_postage_batches().await;
    let (batches, last_error) = match outcome {
        Ok(batches) => (batches, None),
        Err(e) => (Vec::new(), Some(format!("stamps: {e}"))),
    };
    StampsSnapshot {
        batches,
        last_error,
        last_update: Some(Instant::now()),
    }
}
fn spawn_swap_poller(
client: Arc<ApiClient>,
tx: watch::Sender<SwapSnapshot>,
cancel: CancellationToken,
interval: Duration,
) {
tokio::spawn(async move {
let mut tick = tokio::time::interval(interval);
tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
loop {
tokio::select! {
_ = cancel.cancelled() => break,
_ = tick.tick() => {
let snap = collect_swap(&client).await;
if tx.send(snap).is_err() {
break;
}
}
}
}
});
}
/// Queries the chequebook/settlement debug endpoints one after another and
/// merges the outcomes into a single `SwapSnapshot`.
///
/// Each successful call fills its field; each failed call (except
/// `chequebook_address`, whose failure is deliberately not reported) adds a
/// prefixed message to `last_error`, joined with `"; "`.
async fn collect_swap(client: &ApiClient) -> SwapSnapshot {
    let bee = client.bee();
    let chequebook = bee.debug().chequebook_balance().await;
    let chequebook_address = bee.debug().chequebook_address().await;
    let settlements = bee.debug().settlements().await;
    let time_settlements = bee.debug().time_settlements().await;
    let last_received = bee.debug().last_cheques().await;

    // Timestamp is taken once all requests have returned.
    let last_update = Some(Instant::now());
    let mut errors: Vec<String> = Vec::new();

    let chequebook = chequebook
        .map_err(|e| errors.push(format!("chequebook: {e}")))
        .ok();
    // A missing chequebook address is tolerated without recording an error.
    let chequebook_address = chequebook_address.ok();
    let settlements = settlements
        .map_err(|e| errors.push(format!("settlements: {e}")))
        .ok();
    let time_settlements = time_settlements
        .map_err(|e| errors.push(format!("timesettlements: {e}")))
        .ok();
    let last_received = last_received
        .map_err(|e| errors.push(format!("cheques: {e}")))
        .unwrap_or_default();

    let last_error = if errors.is_empty() {
        None
    } else {
        Some(errors.join("; "))
    };

    SwapSnapshot {
        chequebook,
        chequebook_address,
        settlements,
        time_settlements,
        last_received,
        last_error,
        last_update,
    }
}
fn spawn_lottery_poller(
client: Arc<ApiClient>,
tx: watch::Sender<LotterySnapshot>,
cancel: CancellationToken,
interval: Duration,
) {
tokio::spawn(async move {
let mut tick = tokio::time::interval(interval);
tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
loop {
tokio::select! {
_ = cancel.cancelled() => break,
_ = tick.tick() => {
let snap = collect_lottery(&client).await;
if tx.send(snap).is_err() {
break;
}
}
}
}
});
}
/// Fetches the stake once and wraps the outcome in a `LotterySnapshot`.
///
/// On failure `staked` is `None` and `last_error` holds a
/// `"stake: <error>"` message; `last_update` is set in both cases.
async fn collect_lottery(client: &ApiClient) -> LotterySnapshot {
    let outcome = client.bee().debug().stake().await;
    let (staked, last_error) = match outcome {
        Ok(staked) => (Some(staked), None),
        Err(e) => (None, Some(format!("stake: {e}"))),
    };
    LotterySnapshot {
        staked,
        last_error,
        last_update: Some(Instant::now()),
    }
}
fn spawn_topology_poller(
client: Arc<ApiClient>,
tx: watch::Sender<TopologySnapshot>,
cancel: CancellationToken,
interval: Duration,
) {
tokio::spawn(async move {
let mut tick = tokio::time::interval(interval);
tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
loop {
tokio::select! {
_ = cancel.cancelled() => break,
_ = tick.tick() => {
let snap = collect_topology(&client).await;
if tx.send(snap).is_err() {
break;
}
}
}
}
});
}
/// Fetches the topology once and wraps the outcome in a `TopologySnapshot`.
///
/// On failure `topology` is `None` and `last_error` holds a
/// `"topology: <error>"` message; `last_update` is set in both cases.
async fn collect_topology(client: &ApiClient) -> TopologySnapshot {
    let outcome = client.bee().debug().topology().await;
    let (topology, last_error) = match outcome {
        Ok(topology) => (Some(topology), None),
        Err(e) => (None, Some(format!("topology: {e}"))),
    };
    TopologySnapshot {
        topology,
        last_error,
        last_update: Some(Instant::now()),
    }
}
fn spawn_network_poller(
client: Arc<ApiClient>,
tx: watch::Sender<NetworkSnapshot>,
cancel: CancellationToken,
interval: Duration,
) {
tokio::spawn(async move {
let mut tick = tokio::time::interval(interval);
tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
loop {
tokio::select! {
_ = cancel.cancelled() => break,
_ = tick.tick() => {
let snap = collect_network(&client).await;
if tx.send(snap).is_err() {
break;
}
}
}
}
});
}
/// Fetches the node addresses once and wraps the outcome in a `NetworkSnapshot`.
///
/// On failure `addresses` is `None` and `last_error` holds an
/// `"addresses: <error>"` message; `last_update` is set in both cases.
async fn collect_network(client: &ApiClient) -> NetworkSnapshot {
    let outcome = client.bee().debug().addresses().await;
    let (addresses, last_error) = match outcome {
        Ok(addresses) => (Some(addresses), None),
        Err(e) => (None, Some(format!("addresses: {e}"))),
    };
    NetworkSnapshot {
        addresses,
        last_error,
        last_update: Some(Instant::now()),
    }
}
fn spawn_transactions_poller(
client: Arc<ApiClient>,
tx: watch::Sender<TransactionsSnapshot>,
cancel: CancellationToken,
interval: Duration,
) {
tokio::spawn(async move {
let mut tick = tokio::time::interval(interval);
tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
loop {
tokio::select! {
_ = cancel.cancelled() => break,
_ = tick.tick() => {
let snap = collect_transactions(&client).await;
if tx.send(snap).is_err() {
break;
}
}
}
}
});
}
/// Fetches the pending transactions once and wraps the outcome in a
/// `TransactionsSnapshot`.
///
/// On failure `pending` is empty and `last_error` holds a
/// `"transactions: <error>"` message; `last_update` is set in both cases.
async fn collect_transactions(client: &ApiClient) -> TransactionsSnapshot {
    let outcome = client.bee().debug().pending_transactions().await;
    let (pending, last_error) = match outcome {
        Ok(pending) => (pending, None),
        Err(e) => (Vec::new(), Some(format!("transactions: {e}"))),
    };
    TransactionsSnapshot {
        pending,
        last_error,
        last_update: Some(Instant::now()),
    }
}
fn spawn_tags_poller(
client: Arc<ApiClient>,
tx: watch::Sender<TagsSnapshot>,
cancel: CancellationToken,
interval: Duration,
) {
tokio::spawn(async move {
let mut tick = tokio::time::interval(interval);
tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
loop {
tokio::select! {
_ = cancel.cancelled() => break,
_ = tick.tick() => {
let snap = collect_tags(&client).await;
if tx.send(snap).is_err() {
break;
}
}
}
}
});
}
/// Lists the tags once (passing `None` for both `list_tags` arguments) and
/// wraps the outcome in a `TagsSnapshot`.
///
/// On failure `tags` is empty and `last_error` holds a `"tags: <error>"`
/// message; `last_update` is set in both cases.
async fn collect_tags(client: &ApiClient) -> TagsSnapshot {
    let outcome = client.bee().api().list_tags(None, None).await;
    let (tags, last_error) = match outcome {
        Ok(tags) => (tags, None),
        Err(e) => (Vec::new(), Some(format!("tags: {e}"))),
    };
    TagsSnapshot {
        tags,
        last_error,
        last_update: Some(Instant::now()),
    }
}
fn spawn_pins_poller(
client: Arc<ApiClient>,
tx: watch::Sender<PinsSnapshot>,
cancel: CancellationToken,
interval: Duration,
) {
tokio::spawn(async move {
let mut tick = tokio::time::interval(interval);
tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
loop {
tokio::select! {
_ = cancel.cancelled() => break,
_ = tick.tick() => {
let snap = collect_pins(&client).await;
if tx.send(snap).is_err() {
break;
}
}
}
}
});
}
/// Lists the pinned references once and wraps the outcome in a `PinsSnapshot`.
///
/// On failure `pins` is empty and `last_error` holds a `"pins: <error>"`
/// message; `last_update` is set in both cases.
async fn collect_pins(client: &ApiClient) -> PinsSnapshot {
    let outcome = client.bee().api().list_pins().await;
    let (pins, last_error) = match outcome {
        Ok(pins) => (pins, None),
        Err(e) => (Vec::new(), Some(format!("pins: {e}"))),
    };
    PinsSnapshot {
        pins,
        last_error,
        last_update: Some(Instant::now()),
    }
}
/// Probes the node and queries its status endpoints one after another,
/// merging the outcomes into a single `HealthSnapshot`.
///
/// `last_ping` is the duration of the `health` call, recorded only when that
/// call succeeded; a failed probe is not added to `last_error`. Each of the
/// four remaining calls either fills its field or contributes a prefixed
/// message to `last_error` (joined with `"; "`).
async fn collect_health(client: &ApiClient) -> HealthSnapshot {
    let bee = client.bee();

    // Time only the health probe itself.
    let ping_start = Instant::now();
    let health_ok = bee.debug().health().await.is_ok();
    let last_ping = if health_ok {
        Some(ping_start.elapsed())
    } else {
        None
    };

    let status = bee.debug().status().await;
    let chain_state = bee.debug().chain_state().await;
    let wallet = bee.debug().wallet().await;
    let redistribution = bee.debug().redistribution_state().await;

    // Timestamp is taken once all requests have returned.
    let last_update = Some(Instant::now());
    let mut errors: Vec<String> = Vec::new();

    let status = status
        .map_err(|e| errors.push(format!("status: {e}")))
        .ok();
    let chain_state = chain_state
        .map_err(|e| errors.push(format!("chainstate: {e}")))
        .ok();
    let wallet = wallet
        .map_err(|e| errors.push(format!("wallet: {e}")))
        .ok();
    let redistribution = redistribution
        .map_err(|e| errors.push(format!("redistributionstate: {e}")))
        .ok();

    let last_error = if errors.is_empty() {
        None
    } else {
        Some(errors.join("; "))
    };

    HealthSnapshot {
        status,
        chain_state,
        wallet,
        redistribution,
        last_ping,
        last_error,
        last_update,
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A default snapshot has no fields populated, so it is not fully loaded.
    #[test]
    fn fully_loaded_default_is_false() {
        let empty = HealthSnapshot::default();
        assert!(!empty.is_fully_loaded());
    }

    /// With all four data fields set and no error the snapshot is fully
    /// loaded; recording an error flips the result.
    #[test]
    fn fully_loaded_requires_no_error_and_all_fields() {
        let chain_state: ChainState =
            serde_json::from_str(r#"{"block":0,"chainTip":0}"#).unwrap();
        let wallet: Wallet = serde_json::from_str(
            r#"{"chainID":1,"walletAddress":"0x0000000000000000000000000000000000000000"}"#,
        )
        .unwrap();

        let complete = HealthSnapshot {
            status: Some(Status::default()),
            chain_state: Some(chain_state),
            wallet: Some(wallet),
            redistribution: Some(RedistributionState::default()),
            ..Default::default()
        };
        assert!(complete.is_fully_loaded());

        let failed = HealthSnapshot {
            last_error: Some("boom".into()),
            ..complete
        };
        assert!(!failed.is_fully_loaded());
    }
}