1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
use tokio::sync::{broadcast, watch};
use bat_markets_core::{ErrorKind, HealthNotification, HealthReport, MarketError, Result};
use crate::client::BatMarkets;
/// Lightweight accessor for the health state of a [`BatMarkets`] client.
///
/// The accessor only holds a shared borrow of the client, so it is tied to
/// the client's lifetime. It is intended for applications and automation
/// that need health snapshots or change subscriptions.
pub struct HealthClient<'a> {
    /// Client whose shared state answers every health query.
    inner: &'a BatMarkets,
}
impl<'a> HealthClient<'a> {
pub(crate) const fn new(inner: &'a BatMarkets) -> Self {
Self { inner }
}
/// Return the current health snapshot.
#[must_use]
pub fn snapshot(&self) -> HealthReport {
self.inner.shared.health_snapshot()
}
/// Subscribe to snapshot-style health changes.
///
/// ```no_run
/// use bat_markets::{BatMarkets, errors::Result, types::{Product, Venue}};
///
/// # #[tokio::main]
/// # async fn main() -> Result<()> {
/// let client = BatMarkets::builder()
/// .venue(Venue::Bybit)
/// .product(Product::LinearUsdt)
/// .build_live()
/// .await?;
/// let receiver = client.health().subscribe();
/// let _initial = receiver.borrow().clone();
/// # Ok(())
/// # }
/// ```
pub fn subscribe(&self) -> watch::Receiver<HealthReport> {
self.inner.shared.subscribe_health()
}
/// Subscribe to transition-style health notifications.
///
/// Notifications are emitted for structural state changes, not every market-data tick.
pub fn notifications(&self) -> broadcast::Receiver<HealthNotification> {
self.inner.shared.subscribe_health_notifications()
}
}
/// Owned handle that observes runtime status changes.
///
/// The handle owns its watch receiver, so the local subscription ends when
/// the handle is dropped. No global `un_watch` method exists: ownership of
/// the handle already bounds the lifetime of the subscription.
pub struct StatusWatch {
    /// Receiving side of the health watch channel backing this handle.
    receiver: watch::Receiver<HealthReport>,
}
impl StatusWatch {
    /// Wrap an existing health watch receiver (crate-internal).
    pub(crate) const fn new(receiver: watch::Receiver<HealthReport>) -> Self {
        Self { receiver }
    }

    /// Return the current cached status snapshot without waiting.
    ///
    /// This is a read-only peek: it does not mark the value as seen, so a
    /// change that has not yet been consumed through `recv` is still
    /// reported by the next `recv` call.
    #[must_use]
    pub fn current(&self) -> HealthReport {
        self.receiver.borrow().clone()
    }

    /// Wait for the next status change and return the latest snapshot.
    ///
    /// The returned snapshot is marked as seen, so the following call waits
    /// for a genuinely newer report instead of yielding the same one again.
    ///
    /// # Errors
    ///
    /// Returns a `MarketError` of kind `ErrorKind::TransportError` when
    /// waiting on the underlying watch channel fails (tokio reports this
    /// once the sending side has been dropped).
    pub async fn recv(&mut self) -> Result<HealthReport> {
        self.receiver.changed().await.map_err(|error| {
            MarketError::new(
                ErrorKind::TransportError,
                format!("status watch receive failed: {error}"),
            )
        })?;
        // `changed()` marks only the version it observed as seen. If the
        // sender publishes again before we read, a plain `borrow()` would
        // hand out that newer value while leaving it flagged as unseen, and
        // the next `recv` would resolve immediately with a duplicate.
        // `borrow_and_update()` reads and acknowledges in one step.
        Ok(self.receiver.borrow_and_update().clone())
    }

    /// Explicitly end the watch.
    ///
    /// This mirrors other watch handles. Dropping the handle is equivalent for
    /// the local health subscription; the method consumes the handle and
    /// always returns `Ok(())`.
    pub async fn shutdown(self) -> Result<()> {
        Ok(())
    }
}