px_core/websocket/events.rs
1//! Multiplexed WebSocket event types.
2//!
//! A single `WsUpdate` stream carries every per-market event — snapshots, deltas,
//! fills, and trades. Exchange-specific raw payloads are deliberately kept out of
//! `WsUpdate`; they belong on a separate raw-event stream. Connection-level
//! events (reconnect, lag, book invalidation) are split into a separate
//! `SessionEvent` stream so a reconnect is one event, not 576 per-market copies.
7//!
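//! Timestamps are dual-clock by design — an exchange clock plus a local clock,
//! the latter recorded in both monotonic and wall-clock form: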
9//! - `exchange_ts: Option<u64>` — exchange-authoritative millis since epoch for
10//! cross-stream ordering and feed-lag measurement.
11//! - `local_ts: Instant` — captured the moment the socket read returned, before
12//! any parse. Monotonic; correct under NTP adjustments. Skipped during
13//! serialization since `Instant` has no portable representation.
14//! - `local_ts_ms: u64` — wall-clock millis captured alongside `local_ts`,
15//! suitable for FFI / NDJSON archival when serialization is required.
16
17use std::sync::Arc;
18use std::time::{Duration, Instant};
19
20use serde::{Deserialize, Serialize};
21
22use crate::error::WebSocketError;
23use crate::models::{ChangeVec, Orderbook};
24use crate::websocket::traits::{ActivityFill, ActivityTrade};
25
26/// Every per-market event the WebSocket surface emits. Closed tagged union;
27/// no untyped escape hatch in the stable enum. If an exchange grows a
28/// payload we want to surface in raw form, add a separate `raw_events()`
29/// stream rather than another `WsUpdate` variant — keeps consumer
30/// `match` exhaustiveness honest.
31#[derive(Debug, Clone, Serialize)]
32#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
33#[serde(tag = "kind")]
34pub enum WsUpdate {
35 /// Full orderbook snapshot. Caller should replace any cached book for this
36 /// `market_id`. Emitted on initial subscribe and after any
37 /// `BookInvalidated` recovery path.
38 Snapshot {
39 market_id: String,
40 book: Arc<Orderbook>,
41 exchange_ts: Option<u64>,
42 #[serde(skip)]
43 #[cfg_attr(feature = "schema", schemars(skip))]
44 local_ts: Instant,
45 local_ts_ms: u64,
46 seq: u64,
47 },
48 /// Incremental change to an existing book. Apply in-place, or discard if
49 /// the caller has seen a matching `BookInvalidated` without a follow-up
50 /// `Snapshot` yet.
51 Delta {
52 market_id: String,
53 changes: ChangeVec,
54 exchange_ts: Option<u64>,
55 #[serde(skip)]
56 #[cfg_attr(feature = "schema", schemars(skip))]
57 local_ts: Instant,
58 local_ts_ms: u64,
59 seq: u64,
60 },
61 /// A public trade (any counterparty). Not tied to a local order.
62 Trade {
63 trade: ActivityTrade,
64 #[serde(skip)]
65 #[cfg_attr(feature = "schema", schemars(skip))]
66 local_ts: Instant,
67 local_ts_ms: u64,
68 },
69 /// A fill on one of the authenticated user's orders.
70 Fill {
71 fill: ActivityFill,
72 #[serde(skip)]
73 #[cfg_attr(feature = "schema", schemars(skip))]
74 local_ts: Instant,
75 local_ts_ms: u64,
76 },
77}
78
79/// Connection-level events, emitted on a channel separate from `WsUpdate` so
80/// a reconnect is observable as a single global signal rather than per-market.
81#[derive(Debug, Clone, Serialize)]
82#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
83#[serde(tag = "kind")]
84pub enum SessionEvent {
85 /// Initial socket establishment.
86 Connected,
87 /// Socket re-established after an observed outage. `gap_ms` is the
88 /// wall-clock interval between the last received message and this event.
89 /// Callers who maintain per-market books should discard them and wait for
90 /// the next `WsUpdate::Snapshot` for each subscribed market.
91 Reconnected { gap_ms: u64 },
92 /// Outbound dispatch channel overflowed — a slow consumer missed deltas.
93 /// Unlike `tokio::sync::broadcast` which silently skips ahead, openpx
94 /// raises this explicitly and invalidates every affected book, because
95 /// a missed delta corrupts book state in a way the caller cannot detect
96 /// from the stream alone.
97 Lagged {
98 dropped: u64,
99 first_seq: u64,
100 last_seq: u64,
101 },
102 /// A specific market's book is no longer trustworthy. Caller should
103 /// discard its cache for that `market_id` and wait for the next
104 /// `WsUpdate::Snapshot`.
105 BookInvalidated {
106 market_id: String,
107 reason: InvalidationReason,
108 },
109 /// A non-fatal error observed on the connection. The session continues;
110 /// the caller is informed in case they want to log or alert.
111 Error { message: String },
112}
113
114impl SessionEvent {
115 /// Construct a `Reconnected` event from a `Duration`-shaped gap. Saturating
116 /// cast at u64::MAX keeps the type stable for callers serializing to JSON.
117 #[inline]
118 pub fn reconnected(gap: Duration) -> Self {
119 Self::Reconnected {
120 gap_ms: u64::try_from(gap.as_millis()).unwrap_or(u64::MAX),
121 }
122 }
123
124 /// Convenience constructor that stringifies the upstream error.
125 #[inline]
126 pub fn error(err: WebSocketError) -> Self {
127 Self::Error {
128 message: err.to_string(),
129 }
130 }
131}
132
133/// Why a specific book was invalidated — handed to users so they can decide
134/// whether to alert, log, or handle it silently.
135#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
136#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
137pub enum InvalidationReason {
138 Reconnect,
139 Lag,
140 SequenceGap { expected: u64, received: u64 },
141 ExchangeReset,
142}
143
144impl WsUpdate {
145 /// Uniform accessor for the ingest-side monotonic timestamp. Use for
146 /// metrics and cross-update ordering; for per-market sequencing prefer
147 /// the `seq` field on `Snapshot` / `Delta`.
148 #[inline]
149 pub fn local_ts(&self) -> Instant {
150 match self {
151 Self::Snapshot { local_ts, .. }
152 | Self::Delta { local_ts, .. }
153 | Self::Trade { local_ts, .. }
154 | Self::Fill { local_ts, .. } => *local_ts,
155 }
156 }
157
158 /// Wall-clock millis paired with `local_ts`. Use for serialization and
159 /// any cross-process correlation; not monotonic.
160 #[inline]
161 pub fn local_ts_ms(&self) -> u64 {
162 match self {
163 Self::Snapshot { local_ts_ms, .. }
164 | Self::Delta { local_ts_ms, .. }
165 | Self::Trade { local_ts_ms, .. }
166 | Self::Fill { local_ts_ms, .. } => *local_ts_ms,
167 }
168 }
169
170 /// Market ID for events scoped to a single market.
171 #[inline]
172 pub fn market_id(&self) -> Option<&str> {
173 match self {
174 Self::Snapshot { market_id, .. } | Self::Delta { market_id, .. } => Some(market_id),
175 Self::Trade { trade, .. } => Some(&trade.market_id),
176 Self::Fill { fill, .. } => Some(&fill.market_id),
177 }
178 }
179}
180
/// Capture both clocks at the same call site. Use this at the socket-read
/// boundary so every emitted update carries paired monotonic + wall-clock
/// timestamps.
///
/// Returns `(local_ts, local_ts_ms)`: the monotonic [`Instant`] is read
/// first, then the wall clock as whole milliseconds since the Unix epoch.
///
/// The wall-clock value is converted without a lossy `as` cast. A system
/// clock set before the Unix epoch yields `0` rather than a wrapped, huge
/// timestamp, and a millisecond count that does not fit in `u64` saturates at
/// `u64::MAX`.
#[inline]
pub fn now_pair() -> (Instant, u64) {
    let local_ts = Instant::now();
    // `duration_since` fails only when the clock reads earlier than the
    // epoch; clamp that case to 0 so the result stays a valid timestamp.
    let local_ts_ms = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map_or(0, |since_epoch| {
            u64::try_from(since_epoch.as_millis()).unwrap_or(u64::MAX)
        });
    (local_ts, local_ts_ms)
}