px_core/websocket/events.rs
1//! Multiplexed WebSocket event types.
2//!
//! A single `WsUpdate` stream carries every per-market event — snapshots, deltas,
//! book clears, trades, and fills. There is deliberately no untyped escape-hatch
//! variant. Connection-level events (reconnect, lag, book invalidation) are split
//! into a separate `SessionEvent` stream so a reconnect is one event, not 576.
7//!
8//! Timestamps are dual-clock by design:
9//! - `exchange_ts: Option<u64>` — exchange-authoritative millis since epoch for
10//! cross-stream ordering and feed-lag measurement.
11//! - `local_ts: Instant` — captured the moment the socket read returned, before
12//! any parse. Monotonic; correct under NTP adjustments. Skipped during
13//! serialization since `Instant` has no portable representation.
14//! - `local_ts_ms: u64` — wall-clock millis captured alongside `local_ts`,
15//! suitable for FFI / NDJSON archival when serialization is required.
16
17use std::sync::Arc;
18use std::time::{Duration, Instant};
19
20use serde::{Deserialize, Serialize};
21
22use crate::error::WebSocketError;
23use crate::models::{ChangeVec, Orderbook};
24use crate::websocket::traits::{ActivityFill, ActivityTrade};
25
/// Every per-market event the WebSocket surface emits. Closed tagged union;
/// no untyped escape hatch in the stable enum. If an exchange grows a
/// payload we want to surface in raw form, add a separate `raw_events()`
/// stream rather than another `WsUpdate` variant — keeps consumer
/// `match` exhaustiveness honest.
///
/// Serialization notes:
/// - The enum is internally tagged: each serialized value carries the variant
///   name under the `kind` key alongside the variant's own fields.
/// - Only `Serialize` is derived; the `local_ts: Instant` fields are skipped
///   on output (and in the generated schema when the `schema` feature is on),
///   so `local_ts_ms` is the timestamp that survives serialization.
#[derive(Debug, Clone, Serialize)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(tag = "kind")]
pub enum WsUpdate {
    /// Full orderbook snapshot. Caller should replace any cached book keyed by
    /// `(market_id, asset_id)`. `market_id` is the parent market on every
    /// exchange; `asset_id` is the per-outcome identifier (Polymarket token,
    /// Kalshi ticker, Opinion numeric market id). Emitted on initial subscribe
    /// and after any `BookInvalidated` / `Clear` recovery path.
    Snapshot {
        /// Parent market identifier.
        market_id: String,
        /// Per-outcome identifier within the market.
        asset_id: String,
        /// The complete book. Held behind an `Arc`, so cloning the update
        /// shares the book instead of deep-copying it.
        book: Arc<Orderbook>,
        /// Exchange-reported millis since epoch. `None` presumably means the
        /// exchange supplied no timestamp — confirm per exchange.
        exchange_ts: Option<u64>,
        /// Monotonic ingest timestamp; never serialized.
        #[serde(skip)]
        #[cfg_attr(feature = "schema", schemars(skip))]
        local_ts: Instant,
        /// Wall-clock millis captured alongside `local_ts`.
        local_ts_ms: u64,
        /// Sequence number used to order this event against other book
        /// events (`Snapshot` / `Delta` / `Clear`).
        seq: u64,
    },
    /// Incremental change to an existing book. Apply in-place, or discard if
    /// the caller has seen a matching `Clear` / `BookInvalidated` without a
    /// follow-up `Snapshot` yet.
    Delta {
        /// Parent market identifier.
        market_id: String,
        /// Per-outcome identifier within the market.
        asset_id: String,
        /// The set of changes to apply to the cached book.
        changes: ChangeVec,
        /// Exchange-reported millis since epoch. `None` presumably means the
        /// exchange supplied no timestamp — confirm per exchange.
        exchange_ts: Option<u64>,
        /// Monotonic ingest timestamp; never serialized.
        #[serde(skip)]
        #[cfg_attr(feature = "schema", schemars(skip))]
        local_ts: Instant,
        /// Wall-clock millis captured alongside `local_ts`.
        local_ts_ms: u64,
        /// Sequence number used to order this event against other book
        /// events (`Snapshot` / `Delta` / `Clear`).
        seq: u64,
    },
    /// Book invalidation on the same stream as `Snapshot` / `Delta`, so a
    /// consumer can say "seq N was Clear, drop anything with seq ≤ N, wait
    /// for the next Snapshot" without merging with the session stream. Mirrors
    /// `SessionEvent::BookInvalidated`, which stays as the connection-level
    /// signal for global observability.
    Clear {
        /// Parent market identifier.
        market_id: String,
        /// Per-outcome identifier within the market.
        asset_id: String,
        /// Why the book can no longer be trusted.
        reason: InvalidationReason,
        /// Monotonic ingest timestamp; never serialized.
        #[serde(skip)]
        #[cfg_attr(feature = "schema", schemars(skip))]
        local_ts: Instant,
        /// Wall-clock millis captured alongside `local_ts`.
        local_ts_ms: u64,
        /// Sequence number of the invalidation itself; book events with a
        /// `seq` at or below it are stale.
        seq: u64,
    },
    /// A public trade (any counterparty). Not tied to a local order.
    Trade {
        /// The trade payload; market and asset identifiers live on it.
        trade: ActivityTrade,
        /// Monotonic ingest timestamp; never serialized.
        #[serde(skip)]
        #[cfg_attr(feature = "schema", schemars(skip))]
        local_ts: Instant,
        /// Wall-clock millis captured alongside `local_ts`.
        local_ts_ms: u64,
    },
    /// A fill on one of the authenticated user's orders.
    Fill {
        /// The fill payload; market and asset identifiers live on it.
        fill: ActivityFill,
        /// Monotonic ingest timestamp; never serialized.
        #[serde(skip)]
        #[cfg_attr(feature = "schema", schemars(skip))]
        local_ts: Instant,
        /// Wall-clock millis captured alongside `local_ts`.
        local_ts_ms: u64,
    },
}
97
/// Connection-level events, emitted on a channel separate from `WsUpdate` so
/// a reconnect is observable as a single global signal rather than per-market.
///
/// Serialized as an internally tagged object: the variant name appears under
/// the `kind` key. Only `Serialize` is derived.
#[derive(Debug, Clone, Serialize)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(tag = "kind")]
pub enum SessionEvent {
    /// Initial socket establishment.
    Connected,
    /// Socket re-established after an observed outage. `gap_ms` is the
    /// wall-clock interval between the last received message and this event.
    /// Callers who maintain per-market books should discard them and wait for
    /// the next `WsUpdate::Snapshot` for each subscribed market.
    ///
    /// Prefer building this through `SessionEvent::reconnected`, which clamps
    /// an oversized `Duration` to `u64::MAX` milliseconds.
    Reconnected { gap_ms: u64 },
    /// Outbound dispatch channel overflowed — a slow consumer missed deltas.
    /// Unlike `tokio::sync::broadcast` which silently skips ahead, openpx
    /// raises this explicitly and invalidates every affected book, because
    /// a missed delta corrupts book state in a way the caller cannot detect
    /// from the stream alone.
    Lagged {
        /// Presumably the number of updates that were dropped — confirm
        /// against the emitter.
        dropped: u64,
        /// Presumably the `seq` of the first dropped update — confirm
        /// against the emitter (inclusive vs. exclusive is not shown here).
        first_seq: u64,
        /// Presumably the `seq` of the last dropped update — confirm
        /// against the emitter.
        last_seq: u64,
    },
    /// A specific market's book is no longer trustworthy. Caller should
    /// discard its cache for that `market_id` and wait for the next
    /// `WsUpdate::Snapshot`.
    ///
    /// Note this is keyed by `market_id` only, whereas the mirrored
    /// `WsUpdate::Clear` also carries an `asset_id`.
    BookInvalidated {
        /// Market whose cached book(s) must be dropped.
        market_id: String,
        /// Why the book can no longer be trusted.
        reason: InvalidationReason,
    },
    /// A non-fatal error observed on the connection. The session continues;
    /// the caller is informed in case they want to log or alert.
    Error {
        /// Human-readable error text; `SessionEvent::error` fills it from the
        /// upstream error's `to_string()`.
        message: String,
    },
}
132
133impl SessionEvent {
134 /// Construct a `Reconnected` event from a `Duration`-shaped gap. Saturating
135 /// cast at u64::MAX keeps the type stable for callers serializing to JSON.
136 #[inline]
137 pub fn reconnected(gap: Duration) -> Self {
138 Self::Reconnected {
139 gap_ms: u64::try_from(gap.as_millis()).unwrap_or(u64::MAX),
140 }
141 }
142
143 /// Convenience constructor that stringifies the upstream error.
144 #[inline]
145 pub fn error(err: WebSocketError) -> Self {
146 Self::Error {
147 message: err.to_string(),
148 }
149 }
150}
151
/// Why a specific book was invalidated — handed to users so they can decide
/// whether to alert, log, or handle it silently.
///
/// No serde representation attribute is set, so the derive's default
/// (externally tagged) form applies: unit variants serialize as their name,
/// and `SequenceGap` as an object keyed by the variant name.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
pub enum InvalidationReason {
    /// The book was dropped because the connection was re-established
    /// (presumably paired with `SessionEvent::Reconnected` — confirm).
    Reconnect,
    /// The book was dropped because updates were lost to a slow consumer
    /// (presumably paired with `SessionEvent::Lagged` — confirm).
    Lag,
    /// A sequence number other than the expected one arrived: `expected` is
    /// the value the stream should have carried, `received` the value seen.
    SequenceGap { expected: u64, received: u64 },
    /// Presumably the exchange itself signalled a book reset — confirm
    /// against the per-exchange adapters.
    ExchangeReset,
}
162
163impl WsUpdate {
164 /// Uniform accessor for the ingest-side monotonic timestamp. Use for
165 /// metrics and cross-update ordering; for per-market sequencing prefer
166 /// the `seq` field on `Snapshot` / `Delta`.
167 #[inline]
168 pub fn local_ts(&self) -> Instant {
169 match self {
170 Self::Snapshot { local_ts, .. }
171 | Self::Delta { local_ts, .. }
172 | Self::Clear { local_ts, .. }
173 | Self::Trade { local_ts, .. }
174 | Self::Fill { local_ts, .. } => *local_ts,
175 }
176 }
177
178 /// Wall-clock millis paired with `local_ts`. Use for serialization and
179 /// any cross-process correlation; not monotonic.
180 #[inline]
181 pub fn local_ts_ms(&self) -> u64 {
182 match self {
183 Self::Snapshot { local_ts_ms, .. }
184 | Self::Delta { local_ts_ms, .. }
185 | Self::Clear { local_ts_ms, .. }
186 | Self::Trade { local_ts_ms, .. }
187 | Self::Fill { local_ts_ms, .. } => *local_ts_ms,
188 }
189 }
190
191 /// Market ID for events scoped to a single market.
192 #[inline]
193 pub fn market_id(&self) -> Option<&str> {
194 match self {
195 Self::Snapshot { market_id, .. }
196 | Self::Delta { market_id, .. }
197 | Self::Clear { market_id, .. } => Some(market_id),
198 Self::Trade { trade, .. } => Some(&trade.market_id),
199 Self::Fill { fill, .. } => Some(&fill.market_id),
200 }
201 }
202
203 /// Asset ID for book-scoped events. Returns `None` for activity events
204 /// (`Trade` / `Fill`) where the asset is carried on the inner payload.
205 #[inline]
206 pub fn asset_id(&self) -> Option<&str> {
207 match self {
208 Self::Snapshot { asset_id, .. }
209 | Self::Delta { asset_id, .. }
210 | Self::Clear { asset_id, .. } => Some(asset_id),
211 Self::Trade { trade, .. } => Some(&trade.asset_id),
212 Self::Fill { fill, .. } => Some(&fill.asset_id),
213 }
214 }
215}
216
/// Capture both clocks at the same call site. Use this at the socket-read
/// boundary so every emitted update carries paired monotonic + wall-clock
/// timestamps.
///
/// Returns `(local_ts, local_ts_ms)`:
/// - `local_ts` — monotonic `Instant`, read first.
/// - `local_ts_ms` — wall-clock milliseconds since the Unix epoch.
///
/// The wall-clock value never wraps: a system clock set before the epoch
/// yields `0`, and a value too large for `u64` saturates at `u64::MAX`.
#[inline]
pub fn now_pair() -> (Instant, u64) {
    let local_ts = Instant::now();
    // `duration_since` fails only when the clock reads earlier than the
    // epoch; clamp that to 0 rather than casting a negative count to u64.
    let local_ts_ms = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map_or(0, |since_epoch| {
            u64::try_from(since_epoch.as_millis()).unwrap_or(u64::MAX)
        });
    (local_ts, local_ts_ms)
}