use std::{
sync::atomic::{AtomicU64, Ordering},
time::Duration,
};
use crate::client::BatMarkets;
/// Plain-value copy of the counters kept for one lock.
///
/// All durations are expressed in nanoseconds.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct LockDiagnosticsSnapshot {
    /// Number of observations recorded.
    pub operations: u64,
    /// Sum of every reported `wait` duration.
    pub wait_total_ns: u64,
    /// Largest single `wait` duration reported.
    pub wait_max_ns: u64,
    /// Sum of every reported `hold` duration.
    pub hold_total_ns: u64,
    /// Largest single `hold` duration reported.
    pub hold_max_ns: u64,
}

impl LockDiagnosticsSnapshot {
    /// Mean wait per operation (integer division), or `0` when nothing was recorded.
    #[must_use]
    pub const fn average_wait_ns(&self) -> u64 {
        Self::mean(self.wait_total_ns, self.operations)
    }

    /// Mean hold per operation (integer division), or `0` when nothing was recorded.
    #[must_use]
    pub const fn average_hold_ns(&self) -> u64 {
        Self::mean(self.hold_total_ns, self.operations)
    }

    /// Divides `total_ns` by `operations`, treating an empty sample set as zero.
    const fn mean(total_ns: u64, operations: u64) -> u64 {
        match operations {
            0 => 0,
            count => total_ns / count,
        }
    }
}
/// Plain-value copy of the latency counters kept for one operation kind.
///
/// All durations are expressed in nanoseconds.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct LatencyDiagnosticsSnapshot {
    /// Number of samples recorded.
    pub operations: u64,
    /// Sum of every recorded sample.
    pub total_ns: u64,
    /// Largest single recorded sample.
    pub max_ns: u64,
}

impl LatencyDiagnosticsSnapshot {
    /// Mean latency per operation (integer division), or `0` when nothing was recorded.
    #[must_use]
    pub const fn average_ns(&self) -> u64 {
        match self.operations {
            0 => 0,
            count => self.total_ns / count,
        }
    }
}
/// Aggregated diagnostics handed to callers: lock counters for shared-state
/// reads and writes, plus one latency section per runtime operation.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct RuntimeDiagnosticsSnapshot {
/// Lock counters for shared-state reads. `RuntimeDiagnosticsState::snapshot`
/// leaves this at its default; `DiagnosticsClient::snapshot` fills it in.
pub state_reads: LockDiagnosticsSnapshot,
/// Lock counters for shared-state writes; populated the same way as
/// `state_reads`.
pub state_writes: LockDiagnosticsSnapshot,
// Each remaining field holds the latency counters recorded for the
// `RuntimeOperation` variant of the same name.
pub refresh_metadata: LatencyDiagnosticsSnapshot,
pub fetch_ticker: LatencyDiagnosticsSnapshot,
pub fetch_mark_price: LatencyDiagnosticsSnapshot,
pub fetch_funding_rate: LatencyDiagnosticsSnapshot,
pub fetch_trades: LatencyDiagnosticsSnapshot,
pub fetch_book_top: LatencyDiagnosticsSnapshot,
pub fetch_order_book: LatencyDiagnosticsSnapshot,
pub fetch_liquidations: LatencyDiagnosticsSnapshot,
pub fetch_ohlcv: LatencyDiagnosticsSnapshot,
pub refresh_account: LatencyDiagnosticsSnapshot,
pub refresh_positions: LatencyDiagnosticsSnapshot,
pub refresh_open_orders: LatencyDiagnosticsSnapshot,
pub refresh_executions: LatencyDiagnosticsSnapshot,
pub get_order: LatencyDiagnosticsSnapshot,
pub create_order: LatencyDiagnosticsSnapshot,
pub create_orders: LatencyDiagnosticsSnapshot,
pub amend_order: LatencyDiagnosticsSnapshot,
pub amend_orders: LatencyDiagnosticsSnapshot,
pub cancel_order: LatencyDiagnosticsSnapshot,
pub cancel_orders: LatencyDiagnosticsSnapshot,
pub cancel_all_orders: LatencyDiagnosticsSnapshot,
pub close_position: LatencyDiagnosticsSnapshot,
pub validate_order: LatencyDiagnosticsSnapshot,
pub set_leverage: LatencyDiagnosticsSnapshot,
pub set_margin_mode: LatencyDiagnosticsSnapshot,
pub set_position_mode: LatencyDiagnosticsSnapshot,
pub reconcile_private: LatencyDiagnosticsSnapshot,
pub refresh_open_interest: LatencyDiagnosticsSnapshot,
}
pub struct DiagnosticsClient<'a> {
inner: &'a BatMarkets,
}
impl<'a> DiagnosticsClient<'a> {
pub(crate) const fn new(inner: &'a BatMarkets) -> Self {
Self { inner }
}
#[must_use]
pub fn snapshot(&self) -> RuntimeDiagnosticsSnapshot {
let mut snapshot = self.inner.runtime_state.diagnostics.snapshot();
snapshot.state_reads = self.inner.shared.read_diagnostics();
snapshot.state_writes = self.inner.shared.write_diagnostics();
snapshot
}
}
#[derive(Debug, Default)]
pub(crate) struct SharedStateDiagnostics {
reads: AtomicLockDiagnostics,
writes: AtomicLockDiagnostics,
}
impl SharedStateDiagnostics {
pub(crate) fn observe_read(&self, wait: Duration, hold: Duration) {
self.reads.observe(wait, hold);
}
pub(crate) fn observe_write(&self, wait: Duration, hold: Duration) {
self.writes.observe(wait, hold);
}
pub(crate) fn read_snapshot(&self) -> LockDiagnosticsSnapshot {
self.reads.snapshot()
}
pub(crate) fn write_snapshot(&self) -> LockDiagnosticsSnapshot {
self.writes.snapshot()
}
}
/// Latency accumulators for runtime operations, one field per
/// `RuntimeOperation` variant.
///
/// Samples are added through `observe`; `snapshot` copies the current values
/// out into a `RuntimeDiagnosticsSnapshot`.
#[derive(Debug, Default)]
pub(crate) struct RuntimeDiagnosticsState {
refresh_metadata: AtomicLatencyDiagnostics,
fetch_ticker: AtomicLatencyDiagnostics,
fetch_mark_price: AtomicLatencyDiagnostics,
fetch_funding_rate: AtomicLatencyDiagnostics,
fetch_trades: AtomicLatencyDiagnostics,
fetch_book_top: AtomicLatencyDiagnostics,
fetch_order_book: AtomicLatencyDiagnostics,
fetch_liquidations: AtomicLatencyDiagnostics,
fetch_ohlcv: AtomicLatencyDiagnostics,
refresh_account: AtomicLatencyDiagnostics,
refresh_positions: AtomicLatencyDiagnostics,
refresh_open_orders: AtomicLatencyDiagnostics,
refresh_executions: AtomicLatencyDiagnostics,
get_order: AtomicLatencyDiagnostics,
create_order: AtomicLatencyDiagnostics,
create_orders: AtomicLatencyDiagnostics,
amend_order: AtomicLatencyDiagnostics,
amend_orders: AtomicLatencyDiagnostics,
cancel_order: AtomicLatencyDiagnostics,
cancel_orders: AtomicLatencyDiagnostics,
cancel_all_orders: AtomicLatencyDiagnostics,
close_position: AtomicLatencyDiagnostics,
validate_order: AtomicLatencyDiagnostics,
set_leverage: AtomicLatencyDiagnostics,
set_margin_mode: AtomicLatencyDiagnostics,
set_position_mode: AtomicLatencyDiagnostics,
reconcile_private: AtomicLatencyDiagnostics,
refresh_open_interest: AtomicLatencyDiagnostics,
}
impl RuntimeDiagnosticsState {
pub(crate) fn observe(&self, operation: RuntimeOperation, elapsed: Duration) {
let diagnostics = match operation {
RuntimeOperation::RefreshMetadata => &self.refresh_metadata,
RuntimeOperation::FetchTicker => &self.fetch_ticker,
RuntimeOperation::FetchMarkPrice => &self.fetch_mark_price,
RuntimeOperation::FetchFundingRate => &self.fetch_funding_rate,
RuntimeOperation::FetchTrades => &self.fetch_trades,
RuntimeOperation::FetchBookTop => &self.fetch_book_top,
RuntimeOperation::FetchOrderBook => &self.fetch_order_book,
RuntimeOperation::FetchLiquidations => &self.fetch_liquidations,
RuntimeOperation::FetchOhlcv => &self.fetch_ohlcv,
RuntimeOperation::RefreshAccount => &self.refresh_account,
RuntimeOperation::RefreshPositions => &self.refresh_positions,
RuntimeOperation::RefreshOpenOrders => &self.refresh_open_orders,
RuntimeOperation::RefreshExecutions => &self.refresh_executions,
RuntimeOperation::GetOrder => &self.get_order,
RuntimeOperation::CreateOrder => &self.create_order,
RuntimeOperation::CreateOrders => &self.create_orders,
RuntimeOperation::AmendOrder => &self.amend_order,
RuntimeOperation::AmendOrders => &self.amend_orders,
RuntimeOperation::CancelOrder => &self.cancel_order,
RuntimeOperation::CancelOrders => &self.cancel_orders,
RuntimeOperation::CancelAllOrders => &self.cancel_all_orders,
RuntimeOperation::ClosePosition => &self.close_position,
RuntimeOperation::ValidateOrder => &self.validate_order,
RuntimeOperation::SetLeverage => &self.set_leverage,
RuntimeOperation::SetMarginMode => &self.set_margin_mode,
RuntimeOperation::SetPositionMode => &self.set_position_mode,
RuntimeOperation::ReconcilePrivate => &self.reconcile_private,
RuntimeOperation::RefreshOpenInterest => &self.refresh_open_interest,
};
diagnostics.observe(elapsed);
}
pub(crate) fn snapshot(&self) -> RuntimeDiagnosticsSnapshot {
RuntimeDiagnosticsSnapshot {
state_reads: LockDiagnosticsSnapshot::default(),
state_writes: LockDiagnosticsSnapshot::default(),
refresh_metadata: self.refresh_metadata.snapshot(),
fetch_ticker: self.fetch_ticker.snapshot(),
fetch_mark_price: self.fetch_mark_price.snapshot(),
fetch_funding_rate: self.fetch_funding_rate.snapshot(),
fetch_trades: self.fetch_trades.snapshot(),
fetch_book_top: self.fetch_book_top.snapshot(),
fetch_order_book: self.fetch_order_book.snapshot(),
fetch_liquidations: self.fetch_liquidations.snapshot(),
fetch_ohlcv: self.fetch_ohlcv.snapshot(),
refresh_account: self.refresh_account.snapshot(),
refresh_positions: self.refresh_positions.snapshot(),
refresh_open_orders: self.refresh_open_orders.snapshot(),
refresh_executions: self.refresh_executions.snapshot(),
get_order: self.get_order.snapshot(),
create_order: self.create_order.snapshot(),
create_orders: self.create_orders.snapshot(),
amend_order: self.amend_order.snapshot(),
amend_orders: self.amend_orders.snapshot(),
cancel_order: self.cancel_order.snapshot(),
cancel_orders: self.cancel_orders.snapshot(),
cancel_all_orders: self.cancel_all_orders.snapshot(),
close_position: self.close_position.snapshot(),
validate_order: self.validate_order.snapshot(),
set_leverage: self.set_leverage.snapshot(),
set_margin_mode: self.set_margin_mode.snapshot(),
set_position_mode: self.set_position_mode.snapshot(),
reconcile_private: self.reconcile_private.snapshot(),
refresh_open_interest: self.refresh_open_interest.snapshot(),
}
}
}
/// Selects which latency accumulator `RuntimeDiagnosticsState::observe` updates.
///
/// Each variant maps to the `RuntimeDiagnosticsState` field with the matching
/// snake_case name.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum RuntimeOperation {
RefreshMetadata,
FetchTicker,
FetchMarkPrice,
FetchFundingRate,
FetchTrades,
FetchBookTop,
FetchOrderBook,
FetchLiquidations,
FetchOhlcv,
RefreshAccount,
RefreshPositions,
RefreshOpenOrders,
RefreshExecutions,
GetOrder,
CreateOrder,
CreateOrders,
AmendOrder,
AmendOrders,
CancelOrder,
CancelOrders,
CancelAllOrders,
ClosePosition,
ValidateOrder,
SetLeverage,
SetMarginMode,
SetPositionMode,
ReconcilePrivate,
RefreshOpenInterest,
}
/// Atomic counters backing a [`LockDiagnosticsSnapshot`]; updated through `&self`.
#[derive(Debug, Default)]
struct AtomicLockDiagnostics {
    operations: AtomicU64,
    wait_total_ns: AtomicU64,
    wait_max_ns: AtomicU64,
    hold_total_ns: AtomicU64,
    hold_max_ns: AtomicU64,
}

impl AtomicLockDiagnostics {
    /// Adds one observation: bumps the operation count, accumulates both
    /// totals, then raises each maximum if the new sample exceeds it.
    fn observe(&self, wait: Duration, hold: Duration) {
        let Self {
            operations,
            wait_total_ns,
            wait_max_ns,
            hold_total_ns,
            hold_max_ns,
        } = self;
        let (waited_ns, held_ns) = (duration_ns(wait), duration_ns(hold));

        operations.fetch_add(1, Ordering::Relaxed);
        wait_total_ns.fetch_add(waited_ns, Ordering::Relaxed);
        hold_total_ns.fetch_add(held_ns, Ordering::Relaxed);
        update_max(wait_max_ns, waited_ns);
        update_max(hold_max_ns, held_ns);
    }

    /// Reads every counter into a plain snapshot value.
    ///
    /// Each field is loaded separately with relaxed ordering, so the fields are
    /// not captured as one atomic unit.
    fn snapshot(&self) -> LockDiagnosticsSnapshot {
        let read = |counter: &AtomicU64| counter.load(Ordering::Relaxed);
        LockDiagnosticsSnapshot {
            operations: read(&self.operations),
            wait_total_ns: read(&self.wait_total_ns),
            wait_max_ns: read(&self.wait_max_ns),
            hold_total_ns: read(&self.hold_total_ns),
            hold_max_ns: read(&self.hold_max_ns),
        }
    }
}
/// Atomic counters backing a [`LatencyDiagnosticsSnapshot`]; updated through `&self`.
#[derive(Debug, Default)]
struct AtomicLatencyDiagnostics {
    operations: AtomicU64,
    total_ns: AtomicU64,
    max_ns: AtomicU64,
}

impl AtomicLatencyDiagnostics {
    /// Adds one latency sample: bumps the operation count, accumulates the
    /// total, then raises the maximum if the sample exceeds it.
    fn observe(&self, elapsed: Duration) {
        let Self {
            operations,
            total_ns,
            max_ns,
        } = self;
        let sample_ns = duration_ns(elapsed);

        operations.fetch_add(1, Ordering::Relaxed);
        total_ns.fetch_add(sample_ns, Ordering::Relaxed);
        update_max(max_ns, sample_ns);
    }

    /// Reads every counter into a plain snapshot value.
    ///
    /// Each field is loaded separately with relaxed ordering, so the fields are
    /// not captured as one atomic unit.
    fn snapshot(&self) -> LatencyDiagnosticsSnapshot {
        let read = |counter: &AtomicU64| counter.load(Ordering::Relaxed);
        LatencyDiagnosticsSnapshot {
            operations: read(&self.operations),
            total_ns: read(&self.total_ns),
            max_ns: read(&self.max_ns),
        }
    }
}
/// Converts `duration` to whole nanoseconds, saturating at `u64::MAX` when the
/// value does not fit in 64 bits.
fn duration_ns(duration: Duration) -> u64 {
    const NANOS_PER_SEC: u64 = 1_000_000_000;
    // Any overflow while rebuilding the nanosecond count means the duration
    // exceeds `u64::MAX` ns, which is exactly the case that must clamp.
    duration
        .as_secs()
        .checked_mul(NANOS_PER_SEC)
        .and_then(|whole| whole.checked_add(u64::from(duration.subsec_nanos())))
        .unwrap_or(u64::MAX)
}
/// Raises the value stored in `cell` to `sample` when `sample` is larger;
/// otherwise the stored value is left as it was.
///
/// Delegates to the standard library's atomic maximum instead of a hand-written
/// compare-exchange retry loop. Relaxed ordering matches the other counter
/// updates in this file.
fn update_max(cell: &AtomicU64, sample: u64) {
    // The previous value returned by `fetch_max` is not needed.
    cell.fetch_max(sample, Ordering::Relaxed);
}
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use bat_markets_core::{Product, Venue};

    use super::{RuntimeDiagnosticsState, RuntimeOperation};
    use crate::BatMarketsBuilder;

    /// A fresh client reports zero lock operations; after one market read and
    /// one state write, both lock counters are at least one.
    #[test]
    fn diagnostics_snapshot_tracks_state_lock_activity() {
        let client = BatMarketsBuilder::default()
            .venue(Venue::Binance)
            .product(Product::LinearUsdt)
            .build()
            .expect("fixture client should build");

        let baseline = client.diagnostics().snapshot();
        assert_eq!(baseline.state_reads.operations, 0);
        assert_eq!(baseline.state_writes.operations, 0);

        let _ = client.market().instrument_specs();
        client.write_state(|state| state.mark_rest_success(None));

        let observed = client.diagnostics().snapshot();
        assert!(observed.state_reads.operations >= 1);
        assert!(observed.state_writes.operations >= 1);
    }

    /// Two samples of 100µs and 300µs yield a count of 2, a mean of 200µs and a
    /// maximum of 300µs.
    #[test]
    fn runtime_latency_snapshot_accumulates_average_and_max() {
        let state = RuntimeDiagnosticsState::default();
        for micros in [100, 300] {
            state.observe(
                RuntimeOperation::ReconcilePrivate,
                Duration::from_micros(micros),
            );
        }

        let latency = state.snapshot().reconcile_private;
        assert_eq!(latency.operations, 2);
        assert_eq!(latency.average_ns(), 200_000);
        assert_eq!(latency.max_ns, 300_000);
    }
}