Skip to main content

px_core/websocket/
events.rs

1//! Multiplexed WebSocket event types.
2//!
//! A single `WsUpdate` stream carries every per-market event — snapshots, deltas,
//! book clears, fills, and trades. Exchange-specific raw payloads are deliberately
//! kept off this stream (see the `WsUpdate` docs). Connection-level events
//! (reconnect, lag, book invalidation) are split into a separate `SessionEvent`
//! stream so a reconnect is one event, not 576.
7//!
8//! Timestamps are dual-clock by design:
9//! - `exchange_ts: Option<u64>` — exchange-authoritative millis since epoch for
10//!   cross-stream ordering and feed-lag measurement.
11//! - `local_ts: Instant` — captured the moment the socket read returned, before
12//!   any parse. Monotonic; correct under NTP adjustments. Skipped during
13//!   serialization since `Instant` has no portable representation.
14//! - `local_ts_ms: u64` — wall-clock millis captured alongside `local_ts`,
15//!   suitable for FFI / NDJSON archival when serialization is required.
16
17use std::sync::Arc;
18use std::time::{Duration, Instant};
19
20use serde::{Deserialize, Serialize};
21
22use crate::error::WebSocketError;
23use crate::models::{ChangeVec, Orderbook};
24use crate::websocket::traits::{ActivityFill, ActivityTrade};
25
26/// Every per-market event the WebSocket surface emits. Closed tagged union;
27/// no untyped escape hatch in the stable enum. If an exchange grows a
28/// payload we want to surface in raw form, add a separate `raw_events()`
29/// stream rather than another `WsUpdate` variant — keeps consumer
30/// `match` exhaustiveness honest.
31#[derive(Debug, Clone, Serialize)]
32#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
33#[serde(tag = "kind")]
34pub enum WsUpdate {
35    /// Full orderbook snapshot. Caller should replace any cached book keyed by
36    /// `(market_id, asset_id)`. `market_id` is the parent market on every
37    /// exchange; `asset_id` is the per-outcome identifier (Polymarket token,
38    /// Kalshi ticker, Opinion numeric market id). Emitted on initial subscribe
39    /// and after any `BookInvalidated` / `Clear` recovery path.
40    Snapshot {
41        market_id: String,
42        asset_id: String,
43        book: Arc<Orderbook>,
44        exchange_ts: Option<u64>,
45        #[serde(skip)]
46        #[cfg_attr(feature = "schema", schemars(skip))]
47        local_ts: Instant,
48        local_ts_ms: u64,
49        seq: u64,
50    },
51    /// Incremental change to an existing book. Apply in-place, or discard if
52    /// the caller has seen a matching `Clear` / `BookInvalidated` without a
53    /// follow-up `Snapshot` yet.
54    Delta {
55        market_id: String,
56        asset_id: String,
57        changes: ChangeVec,
58        exchange_ts: Option<u64>,
59        #[serde(skip)]
60        #[cfg_attr(feature = "schema", schemars(skip))]
61        local_ts: Instant,
62        local_ts_ms: u64,
63        seq: u64,
64    },
65    /// Book invalidation on the same stream as `Snapshot` / `Delta`, so a
66    /// consumer can say "seq N was Clear, drop anything with seq ≤ N, wait
67    /// for the next Snapshot" without merging with the session stream. Mirrors
68    /// `SessionEvent::BookInvalidated`, which stays as the connection-level
69    /// signal for global observability.
70    Clear {
71        market_id: String,
72        asset_id: String,
73        reason: InvalidationReason,
74        #[serde(skip)]
75        #[cfg_attr(feature = "schema", schemars(skip))]
76        local_ts: Instant,
77        local_ts_ms: u64,
78        seq: u64,
79    },
80    /// A public trade (any counterparty). Not tied to a local order.
81    Trade {
82        trade: ActivityTrade,
83        #[serde(skip)]
84        #[cfg_attr(feature = "schema", schemars(skip))]
85        local_ts: Instant,
86        local_ts_ms: u64,
87    },
88    /// A fill on one of the authenticated user's orders.
89    Fill {
90        fill: ActivityFill,
91        #[serde(skip)]
92        #[cfg_attr(feature = "schema", schemars(skip))]
93        local_ts: Instant,
94        local_ts_ms: u64,
95    },
96}
97
98/// Connection-level events, emitted on a channel separate from `WsUpdate` so
99/// a reconnect is observable as a single global signal rather than per-market.
100#[derive(Debug, Clone, Serialize)]
101#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
102#[serde(tag = "kind")]
103pub enum SessionEvent {
104    /// Initial socket establishment.
105    Connected,
106    /// Socket re-established after an observed outage. `gap_ms` is the
107    /// wall-clock interval between the last received message and this event.
108    /// Callers who maintain per-market books should discard them and wait for
109    /// the next `WsUpdate::Snapshot` for each subscribed market.
110    Reconnected { gap_ms: u64 },
111    /// Outbound dispatch channel overflowed — a slow consumer missed deltas.
112    /// Unlike `tokio::sync::broadcast` which silently skips ahead, openpx
113    /// raises this explicitly and invalidates every affected book, because
114    /// a missed delta corrupts book state in a way the caller cannot detect
115    /// from the stream alone.
116    Lagged {
117        dropped: u64,
118        first_seq: u64,
119        last_seq: u64,
120    },
121    /// A specific market's book is no longer trustworthy. Caller should
122    /// discard its cache for that `market_id` and wait for the next
123    /// `WsUpdate::Snapshot`.
124    BookInvalidated {
125        market_id: String,
126        reason: InvalidationReason,
127    },
128    /// A non-fatal error observed on the connection. The session continues;
129    /// the caller is informed in case they want to log or alert.
130    Error { message: String },
131}
132
133impl SessionEvent {
134    /// Construct a `Reconnected` event from a `Duration`-shaped gap. Saturating
135    /// cast at u64::MAX keeps the type stable for callers serializing to JSON.
136    #[inline]
137    pub fn reconnected(gap: Duration) -> Self {
138        Self::Reconnected {
139            gap_ms: u64::try_from(gap.as_millis()).unwrap_or(u64::MAX),
140        }
141    }
142
143    /// Convenience constructor that stringifies the upstream error.
144    #[inline]
145    pub fn error(err: WebSocketError) -> Self {
146        Self::Error {
147            message: err.to_string(),
148        }
149    }
150}
151
152/// Why a specific book was invalidated — handed to users so they can decide
153/// whether to alert, log, or handle it silently.
154#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
155#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
156pub enum InvalidationReason {
157    Reconnect,
158    Lag,
159    SequenceGap { expected: u64, received: u64 },
160    ExchangeReset,
161}
162
163impl WsUpdate {
164    /// Uniform accessor for the ingest-side monotonic timestamp. Use for
165    /// metrics and cross-update ordering; for per-market sequencing prefer
166    /// the `seq` field on `Snapshot` / `Delta`.
167    #[inline]
168    pub fn local_ts(&self) -> Instant {
169        match self {
170            Self::Snapshot { local_ts, .. }
171            | Self::Delta { local_ts, .. }
172            | Self::Clear { local_ts, .. }
173            | Self::Trade { local_ts, .. }
174            | Self::Fill { local_ts, .. } => *local_ts,
175        }
176    }
177
178    /// Wall-clock millis paired with `local_ts`. Use for serialization and
179    /// any cross-process correlation; not monotonic.
180    #[inline]
181    pub fn local_ts_ms(&self) -> u64 {
182        match self {
183            Self::Snapshot { local_ts_ms, .. }
184            | Self::Delta { local_ts_ms, .. }
185            | Self::Clear { local_ts_ms, .. }
186            | Self::Trade { local_ts_ms, .. }
187            | Self::Fill { local_ts_ms, .. } => *local_ts_ms,
188        }
189    }
190
191    /// Market ID for events scoped to a single market.
192    #[inline]
193    pub fn market_id(&self) -> Option<&str> {
194        match self {
195            Self::Snapshot { market_id, .. }
196            | Self::Delta { market_id, .. }
197            | Self::Clear { market_id, .. } => Some(market_id),
198            Self::Trade { trade, .. } => Some(&trade.market_id),
199            Self::Fill { fill, .. } => Some(&fill.market_id),
200        }
201    }
202
203    /// Asset ID for book-scoped events. Returns `None` for activity events
204    /// (`Trade` / `Fill`) where the asset is carried on the inner payload.
205    #[inline]
206    pub fn asset_id(&self) -> Option<&str> {
207        match self {
208            Self::Snapshot { asset_id, .. }
209            | Self::Delta { asset_id, .. }
210            | Self::Clear { asset_id, .. } => Some(asset_id),
211            Self::Trade { trade, .. } => Some(&trade.asset_id),
212            Self::Fill { fill, .. } => Some(&fill.asset_id),
213        }
214    }
215}
216
/// Capture both clocks at the same call site.
///
/// Call this at the socket-read boundary so every emitted update carries a
/// paired monotonic (`Instant`) and wall-clock (millis since the Unix epoch)
/// timestamp.
///
/// The wall-clock half never panics and never wraps: a system clock set
/// before the Unix epoch yields `0`, and a millisecond count that does not
/// fit in a `u64` saturates at `u64::MAX`.
#[inline]
pub fn now_pair() -> (Instant, u64) {
    // Read the monotonic clock first so it stays as close as possible to the
    // moment the socket read returned.
    let local_ts = Instant::now();
    // Fully qualified std paths keep this block independent of the file's
    // `use` list.
    let local_ts_ms = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map_or(0, |since_epoch| {
            u64::try_from(since_epoch.as_millis()).unwrap_or(u64::MAX)
        });
    (local_ts, local_ts_ms)
}