Skip to main content

px_core/websocket/
events.rs

1//! Multiplexed WebSocket event types.
2//!
3//! A single `WsUpdate` stream carries every per-market event — snapshots, deltas,
4//! fills, trades, and exchange-specific escape-hatch payloads. Connection-level
5//! events (reconnect, lag, book invalidation) are split into a separate
6//! `SessionEvent` stream so a reconnect is one event, not 576.
7//!
8//! Timestamps are dual-clock by design:
9//! - `exchange_ts: Option<u64>` — exchange-authoritative millis since epoch for
10//!   cross-stream ordering and feed-lag measurement.
11//! - `local_ts: Instant` — captured the moment the socket read returned, before
12//!   any parse. Monotonic; correct under NTP adjustments. Skipped during
13//!   serialization since `Instant` has no portable representation.
14//! - `local_ts_ms: u64` — wall-clock millis captured alongside `local_ts`,
15//!   suitable for FFI / NDJSON archival when serialization is required.
16
17use std::sync::Arc;
18use std::time::{Duration, Instant};
19
20use serde::{Deserialize, Serialize};
21
22use crate::error::WebSocketError;
23use crate::models::{ChangeVec, Orderbook};
24use crate::websocket::traits::{ActivityFill, ActivityTrade};
25
26/// Every per-market event the WebSocket surface emits. Closed tagged union;
27/// no untyped escape hatch in the stable enum. If an exchange grows a
28/// payload we want to surface in raw form, add a separate `raw_events()`
29/// stream rather than another `WsUpdate` variant — keeps consumer
30/// `match` exhaustiveness honest.
31#[derive(Debug, Clone, Serialize)]
32#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
33#[serde(tag = "kind")]
34pub enum WsUpdate {
35    /// Full orderbook snapshot — replace any cached book for `(market_id, asset_id)`.
36    Snapshot {
37        market_id: String,
38        asset_id: String,
39        book: Arc<Orderbook>,
40        exchange_ts: Option<u64>,
41        #[serde(skip)]
42        #[cfg_attr(feature = "schema", schemars(skip))]
43        local_ts: Instant,
44        local_ts_ms: u64,
45        seq: u64,
46    },
47    /// Incremental change to an existing book — apply in-place.
48    Delta {
49        market_id: String,
50        asset_id: String,
51        changes: ChangeVec,
52        exchange_ts: Option<u64>,
53        #[serde(skip)]
54        #[cfg_attr(feature = "schema", schemars(skip))]
55        local_ts: Instant,
56        local_ts_ms: u64,
57        seq: u64,
58    },
59    /// Book invalidation — drop the cached book and wait for the next `Snapshot`.
60    Clear {
61        market_id: String,
62        asset_id: String,
63        reason: InvalidationReason,
64        #[serde(skip)]
65        #[cfg_attr(feature = "schema", schemars(skip))]
66        local_ts: Instant,
67        local_ts_ms: u64,
68        seq: u64,
69    },
70    /// A public trade (any counterparty), not tied to a local order.
71    Trade {
72        trade: ActivityTrade,
73        #[serde(skip)]
74        #[cfg_attr(feature = "schema", schemars(skip))]
75        local_ts: Instant,
76        local_ts_ms: u64,
77    },
78    /// A fill on one of the authenticated user's orders.
79    Fill {
80        fill: ActivityFill,
81        #[serde(skip)]
82        #[cfg_attr(feature = "schema", schemars(skip))]
83        local_ts: Instant,
84        local_ts_ms: u64,
85    },
86}
87
88/// Connection-level events, emitted on a channel separate from `WsUpdate` so
89/// a reconnect is observable as a single global signal rather than per-market.
90#[derive(Debug, Clone, Serialize)]
91#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
92#[serde(tag = "kind")]
93pub enum SessionEvent {
94    /// Initial socket establishment.
95    Connected,
96    /// Socket re-established after an outage — `gap_ms` is the wall-clock interval since the last received message.
97    Reconnected { gap_ms: u64 },
98    /// Slow consumer dropped messages — every affected book is invalidated and must be rebuilt from the next `Snapshot`.
99    Lagged {
100        dropped: u64,
101        first_seq: u64,
102        last_seq: u64,
103    },
104    /// A specific market's book is no longer trustworthy — drop cache and wait for next `Snapshot` (reasons: `Reconnect`, `Lag`, `SequenceGap`, `ExchangeReset`).
105    BookInvalidated {
106        market_id: String,
107        reason: InvalidationReason,
108    },
109    /// A non-fatal error observed on the connection (session continues).
110    Error { message: String },
111}
112
113impl SessionEvent {
114    /// Construct a `Reconnected` event from a `Duration`-shaped gap. Saturating
115    /// cast at u64::MAX keeps the type stable for callers serializing to JSON.
116    #[inline]
117    pub fn reconnected(gap: Duration) -> Self {
118        Self::Reconnected {
119            gap_ms: u64::try_from(gap.as_millis()).unwrap_or(u64::MAX),
120        }
121    }
122
123    /// Convenience constructor that stringifies the upstream error.
124    #[inline]
125    pub fn error(err: WebSocketError) -> Self {
126        Self::Error {
127            message: err.to_string(),
128        }
129    }
130}
131
132/// Why a specific book was invalidated — handed to users so they can decide
133/// whether to alert, log, or handle it silently.
134#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
135#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
136pub enum InvalidationReason {
137    Reconnect,
138    Lag,
139    SequenceGap { expected: u64, received: u64 },
140    ExchangeReset,
141}
142
143impl WsUpdate {
144    /// Uniform accessor for the ingest-side monotonic timestamp. Use for
145    /// metrics and cross-update ordering; for per-market sequencing prefer
146    /// the `seq` field on `Snapshot` / `Delta`.
147    #[inline]
148    pub fn local_ts(&self) -> Instant {
149        match self {
150            Self::Snapshot { local_ts, .. }
151            | Self::Delta { local_ts, .. }
152            | Self::Clear { local_ts, .. }
153            | Self::Trade { local_ts, .. }
154            | Self::Fill { local_ts, .. } => *local_ts,
155        }
156    }
157
158    /// Wall-clock millis paired with `local_ts`. Use for serialization and
159    /// any cross-process correlation; not monotonic.
160    #[inline]
161    pub fn local_ts_ms(&self) -> u64 {
162        match self {
163            Self::Snapshot { local_ts_ms, .. }
164            | Self::Delta { local_ts_ms, .. }
165            | Self::Clear { local_ts_ms, .. }
166            | Self::Trade { local_ts_ms, .. }
167            | Self::Fill { local_ts_ms, .. } => *local_ts_ms,
168        }
169    }
170
171    /// Market ID for events scoped to a single market.
172    #[inline]
173    pub fn market_id(&self) -> Option<&str> {
174        match self {
175            Self::Snapshot { market_id, .. }
176            | Self::Delta { market_id, .. }
177            | Self::Clear { market_id, .. } => Some(market_id),
178            Self::Trade { trade, .. } => Some(&trade.market_id),
179            Self::Fill { fill, .. } => Some(&fill.market_id),
180        }
181    }
182
183    /// Asset ID for book-scoped events. Returns `None` for activity events
184    /// (`Trade` / `Fill`) where the asset is carried on the inner payload.
185    #[inline]
186    pub fn asset_id(&self) -> Option<&str> {
187        match self {
188            Self::Snapshot { asset_id, .. }
189            | Self::Delta { asset_id, .. }
190            | Self::Clear { asset_id, .. } => Some(asset_id),
191            Self::Trade { trade, .. } => Some(&trade.asset_id),
192            Self::Fill { fill, .. } => Some(&fill.asset_id),
193        }
194    }
195}
196
197/// Capture both clocks at the same call site. Use this at the socket-read
198/// boundary so every emitted update carries paired monotonic + wall-clock
199/// timestamps.
200#[inline]
201pub fn now_pair() -> (Instant, u64) {
202    let local_ts = Instant::now();
203    let local_ts_ms = chrono::Utc::now().timestamp_millis() as u64;
204    (local_ts, local_ts_ms)
205}