Skip to main content

px_core/websocket/
traits.rs

1use chrono::{DateTime, Utc};
2use futures::Stream;
3use serde::{Deserialize, Serialize};
4use std::borrow::Cow;
5use std::pin::Pin;
6use std::sync::atomic::{AtomicU8, Ordering};
7use std::sync::Arc;
8use std::time::Duration;
9use tokio::sync::RwLock;
10use tokio::time::interval;
11
12use crate::error::WebSocketError;
13use crate::models::{CryptoPrice, LiquidityRole, SportResult};
14use crate::websocket::stream::{SessionStream, UpdateStream};
15
/// Shared WebSocket reconnect/keepalive constants for all exchange implementations.
///
/// Keepalive ping cadence for exchange sockets.
pub const WS_PING_INTERVAL: Duration = Duration::from_secs(20);
/// Tighter ping cadence; presumably used by the crypto price feeds — confirm
/// against callers.
pub const WS_CRYPTO_PING_INTERVAL: Duration = Duration::from_secs(5);
/// Initial reconnect delay (3s). Presumably grown toward
/// [`WS_RECONNECT_MAX_DELAY`] on repeated failures — confirm in callers.
pub const WS_RECONNECT_BASE_DELAY: Duration = Duration::from_secs(3);
/// Upper bound on a single reconnect delay (60s).
pub const WS_RECONNECT_MAX_DELAY: Duration = Duration::from_secs(60);
/// Upper bound on consecutive reconnect attempts.
pub const WS_MAX_RECONNECT_ATTEMPTS: u32 = 10;

/// Force a reconnect if no message has been received for this long.
///
/// Sized for the quietest live venue: a 50-market Kalshi subscription at
/// ~0.5 msg/s hit a 40s max-quiet window in smoke testing, so 60s left only
/// 50% headroom over the observed worst case. 90s gives ~2.25× headroom
/// while still shrinking macOS's ~30-minute silent-death window by ~20×.
pub const WS_STALL_TIMEOUT: Duration = Duration::from_secs(90);
/// How often `stall_watchdog` samples the last-message timestamp.
pub const WS_STALL_CHECK_INTERVAL: Duration = Duration::from_secs(10);

// Compile-time sanity checks on the tuning above: the watchdog must sample
// more often than the stall threshold (otherwise a stall can go unnoticed for
// up to a whole extra interval), and the base reconnect delay must not exceed
// the max delay.
const _: () = assert!(WS_STALL_CHECK_INTERVAL.as_millis() < WS_STALL_TIMEOUT.as_millis());
const _: () = assert!(WS_RECONNECT_BASE_DELAY.as_millis() <= WS_RECONNECT_MAX_DELAY.as_millis());
30
31/// Returns once `last_message_at` is older than `WS_STALL_TIMEOUT`.
32///
33/// Drop into a `tokio::select!` alongside the read loop to break out when
34/// the socket goes silent (half-open connection, server-side hang, NAT
35/// timeout). The existing reconnect path then handles it. Without this,
36/// a dead socket can sit there for 30+ minutes before OS keepalive
37/// surfaces an error — observed in production smoke tests.
38pub async fn stall_watchdog(last_message_at: Arc<RwLock<Option<DateTime<Utc>>>>) {
39    let mut tick = interval(WS_STALL_CHECK_INTERVAL);
40    tick.tick().await; // first tick fires immediately; skip
41    loop {
42        tick.tick().await;
43        let last = *last_message_at.read().await;
44        if let Some(last) = last {
45            let age = Utc::now() - last;
46            if age.to_std().is_ok_and(|d| d > WS_STALL_TIMEOUT) {
47                tracing::warn!(
48                    stall_secs = age.num_seconds(),
49                    "no messages for >{}s; forcing reconnect",
50                    WS_STALL_TIMEOUT.as_secs()
51                );
52                return;
53            }
54        }
55    }
56}
57
58pub type SportsStream = Pin<Box<dyn Stream<Item = Result<SportResult, WebSocketError>> + Send>>;
59pub type CryptoPriceStream =
60    Pin<Box<dyn Stream<Item = Result<CryptoPrice, WebSocketError>> + Send>>;
61
/// Lifecycle state of an exchange WebSocket connection.
///
/// `#[repr(u8)]` with explicit discriminants lets the state live in a single
/// atomic byte (see [`AtomicWebSocketState`]).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[repr(u8)]
pub enum WebSocketState {
    Disconnected = 0,
    Connecting = 1,
    Connected = 2,
    Reconnecting = 3,
    Closed = 4,
}

impl WebSocketState {
    /// Decodes a byte previously produced by `state as u8`.
    ///
    /// In this file the only callers decode bytes stored by
    /// [`AtomicWebSocketState`], which only ever writes valid discriminants.
    /// The catch-all arm is therefore defensive: an unknown byte decodes as
    /// `Disconnected` rather than panicking.
    fn from_u8(v: u8) -> Self {
        match v {
            1 => Self::Connecting,
            2 => Self::Connected,
            3 => Self::Reconnecting,
            4 => Self::Closed,
            _ => Self::Disconnected,
        }
    }

    /// Stable string label, matching the `Display` impl. Bindings should use
    /// this rather than `Debug` formatting, which is not a stability guarantee.
    #[inline]
    pub const fn as_str(self) -> &'static str {
        match self {
            Self::Disconnected => "Disconnected",
            Self::Connecting => "Connecting",
            Self::Connected => "Connected",
            Self::Reconnecting => "Reconnecting",
            Self::Closed => "Closed",
        }
    }
}

impl std::fmt::Display for WebSocketState {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.as_str())
    }
}

/// Lock-free atomic cell holding a [`WebSocketState`].
///
/// A read is a single atomic load with no async lock, so the state can be
/// queried from synchronous code. `store` uses `Release` and `load` uses
/// `Acquire`. As a result, writes made before a `store` are visible to any
/// thread whose `load` observes that stored state.
pub struct AtomicWebSocketState(AtomicU8);

impl AtomicWebSocketState {
    /// Creates a cell initialised to `state`. `const` so it can back a
    /// `static`.
    pub const fn new(state: WebSocketState) -> Self {
        Self(AtomicU8::new(state as u8))
    }

    /// Returns the current state.
    #[inline]
    pub fn load(&self) -> WebSocketState {
        WebSocketState::from_u8(self.0.load(Ordering::Acquire))
    }

    /// Overwrites the current state.
    #[inline]
    pub fn store(&self, state: WebSocketState) {
        self.0.store(state as u8, Ordering::Release);
    }

    /// Stores `state` and returns the previous state in one atomic step.
    ///
    /// Lets a caller detect a transition without a separate `load`, which
    /// could race with another writer between the read and the write.
    #[inline]
    pub fn swap(&self, state: WebSocketState) -> WebSocketState {
        WebSocketState::from_u8(self.0.swap(state as u8, Ordering::AcqRel))
    }
}

impl Default for AtomicWebSocketState {
    /// Starts in `Disconnected`, the zero discriminant.
    fn default() -> Self {
        Self::new(WebSocketState::Disconnected)
    }
}

impl std::fmt::Debug for AtomicWebSocketState {
    // Show the decoded state rather than the raw byte, so containing structs
    // can `#[derive(Debug)]` and still print something meaningful.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_tuple("AtomicWebSocketState")
            .field(&self.load())
            .finish()
    }
}
122
/// Activity events still carried inside `WsUpdate::Trade` / `WsUpdate::Fill`.
/// Retained as a typed payload alongside the stream rather than as a separate
/// per-token surface.
///
/// `exchange_ts_ms` is exchange-authoritative millis since epoch — uniform
/// with `WsUpdate::{Snapshot, Delta}::exchange_ts`, so every timestamp on
/// the WS surface is the same type (`u64` millis). `chrono::DateTime` was
/// more expressive but cost a representation mismatch at every FFI boundary.
//
// Review notes are plain `//` comments on purpose. With the `schema` feature,
// `///` text is emitted as the schemars `description`, so editing doc
// comments here would change the generated JSON Schema.
// - Field declaration order is the serde (de)serialization order and the
//   derived `Debug` order.
// - Optional fields WITHOUT `skip_serializing_if` (`trade_id`, `side`,
//   `aggressor_side`, `outcome`) serialize as `null` when `None`.
//   `fee_rate_bps` and `exchange_ts_ms` are omitted entirely when `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
pub struct ActivityTrade {
    pub market_id: String,
    pub asset_id: String,
    pub trade_id: Option<String>,
    // NOTE(review): units/scale of `price` and `size` are not visible here —
    // confirm per exchange before comparing across venues.
    pub price: f64,
    pub size: f64,
    // NOTE(review): `side` / `aggressor_side` are free-form strings; the
    // accepted vocabulary is exchange-specific — confirm per venue.
    pub side: Option<String>,
    pub aggressor_side: Option<String>,
    pub outcome: Option<String>,
    /// Fee rate in basis points (e.g. 0 = no fee, 200 = 2%). Polymarket
    /// `last_trade_price` events populate this.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub fee_rate_bps: Option<u32>,
    /// Exchange-authoritative timestamp (millis since epoch).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub exchange_ts_ms: Option<u64>,
    // `Cow<'static, str>` lets producers pass a `&'static str` channel name
    // without allocating. serde's `Deserialize` for `Cow` always yields
    // `Cow::Owned`.
    pub source_channel: Cow<'static, str>,
}
151
// Typed fill payload; `ActivityTrade`'s doc describes the shared timestamp
// convention (`exchange_ts_ms` = exchange-authoritative millis since epoch).
//
// Review notes are plain `//` comments on purpose. With the `schema` feature,
// `///` text is emitted as the schemars `description`, so adding or editing
// doc comments here would change the generated JSON Schema.
// - Field declaration order is the serde (de)serialization order and the
//   derived `Debug` order.
// - Optional fields WITHOUT `skip_serializing_if` (`fill_id`, `order_id`,
//   `side`, `outcome`, `liquidity_role`) serialize as `null` when `None`.
//   `tx_hash`, `fee` and `exchange_ts_ms` are omitted entirely when `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
pub struct ActivityFill {
    pub market_id: String,
    pub asset_id: String,
    pub fill_id: Option<String>,
    pub order_id: Option<String>,
    // NOTE(review): units/scale of `price` and `size` are not visible here —
    // confirm per exchange.
    pub price: f64,
    pub size: f64,
    // NOTE(review): free-form string; vocabulary is exchange-specific.
    pub side: Option<String>,
    pub outcome: Option<String>,
    /// On-chain transaction hash, when the exchange publishes one.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub tx_hash: Option<String>,
    /// Fee charged for this fill, when the exchange publishes one.
    // NOTE(review): currency/denomination of `fee` is not visible here —
    // confirm per exchange.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub fee: Option<f64>,
    /// Exchange-authoritative timestamp (millis since epoch).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub exchange_ts_ms: Option<u64>,
    // `Cow<'static, str>` lets producers pass a `&'static str` channel name
    // without allocating. serde's `Deserialize` for `Cow` always yields
    // `Cow::Owned`.
    pub source_channel: Cow<'static, str>,
    pub liquidity_role: Option<LiquidityRole>,
}
175
176/// WebSocket driver trait. Surface is deliberately small: connect/disconnect,
177/// subscribe/unsubscribe per market, and hand out two multiplexed streams.
178///
179/// `updates()` carries per-market book and activity events; `session_events()`
180/// carries connection-level signals that a consumer wants to observe exactly
181/// once regardless of how many markets are subscribed.
182///
183/// Both stream methods are take-once: subsequent calls return `None`. The
184/// underlying channel is single-consumer by contract — handing out cloned
185/// receivers would split messages silently between holders, so a second
186/// "debug sidecar" consumer is rejected at the call site instead. For
187/// fan-out, run one consumer that re-dispatches.
188#[allow(async_fn_in_trait)]
189pub trait OrderBookWebSocket: Send + Sync {
190    async fn connect(&mut self) -> Result<(), WebSocketError>;
191    async fn disconnect(&mut self) -> Result<(), WebSocketError>;
192    async fn subscribe(&mut self, market_ticker: &str) -> Result<(), WebSocketError>;
193    async fn unsubscribe(&mut self, market_ticker: &str) -> Result<(), WebSocketError>;
194    fn state(&self) -> WebSocketState;
195    /// Take ownership of the multiplexed update stream. Returns `None` if
196    /// already taken.
197    fn updates(&self) -> Option<UpdateStream>;
198    /// Take ownership of the connection-level session event stream. Returns
199    /// `None` if already taken.
200    fn session_events(&self) -> Option<SessionStream>;
201}
202
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::time::{sleep, Duration as TDuration};

    // These tests run on tokio's real clock. The watchdog judges staleness
    // with wall-clock `Utc::now()`, which a paused tokio clock would not
    // advance. "Old" timestamps are therefore simulated by back-dating them,
    // and each test takes up to ~15s of real time.

    /// A timestamp already 5s past `WS_STALL_TIMEOUT`, so the watchdog's very
    /// first check treats it as stale.
    fn stale_timestamp() -> DateTime<Utc> {
        Utc::now()
            - chrono::Duration::from_std(WS_STALL_TIMEOUT).unwrap()
            - chrono::Duration::seconds(5)
    }

    /// `None` means nothing has been received yet, and the watchdog must not
    /// fire in that state. A fresh connection gets a grace period before its
    /// first message arrives.
    #[tokio::test]
    async fn watchdog_does_not_fire_when_last_message_is_none() {
        let last = Arc::new(RwLock::new(None::<DateTime<Utc>>));
        // Race the watchdog against a 1.5x check-interval timeout — the
        // watchdog must NOT return in that window.
        let result = tokio::time::timeout(
            WS_STALL_CHECK_INTERVAL + TDuration::from_secs(5),
            stall_watchdog(last),
        )
        .await;
        assert!(
            result.is_err(),
            "watchdog returned {result:?} despite last_message_at being None"
        );
    }

    /// If a recent message has been recorded, watchdog stays asleep.
    #[tokio::test]
    async fn watchdog_does_not_fire_with_recent_activity() {
        let last = Arc::new(RwLock::new(Some(Utc::now())));
        let result = tokio::time::timeout(
            WS_STALL_CHECK_INTERVAL + TDuration::from_secs(5),
            stall_watchdog(last),
        )
        .await;
        assert!(result.is_err(), "watchdog fired despite fresh activity");
    }

    /// If `last_message_at` is older than the threshold at the next check
    /// tick, the watchdog must return promptly. The timestamp is pre-aged by
    /// `WS_STALL_TIMEOUT + 5s`, so the very first post-`tick` read sees it as
    /// stale.
    #[tokio::test]
    async fn watchdog_fires_when_last_message_is_stale() {
        let last = Arc::new(RwLock::new(Some(stale_timestamp())));
        // Should fire within one check interval (10s) plus slack.
        let result = tokio::time::timeout(
            WS_STALL_CHECK_INTERVAL + TDuration::from_secs(2),
            stall_watchdog(last),
        )
        .await;
        assert!(result.is_ok(), "watchdog did not fire on stale timestamp");
    }

    /// Continuous activity must keep the watchdog asleep — the "active
    /// market" scenario.
    ///
    /// The timestamp starts out stale, so without the refresher the first
    /// check (one `WS_STALL_CHECK_INTERVAL` in) would fire. The refresher
    /// overwrites it with `Utc::now()` every 1s from t≈1s. Passing therefore
    /// proves the watchdog re-reads the shared timestamp on every tick.
    ///
    /// Starting from a fresh timestamp would make this test vacuous: 15s is
    /// far below the stall threshold, so it would pass with no refresher at
    /// all.
    #[tokio::test]
    async fn watchdog_stays_asleep_under_continuous_activity() {
        let last = Arc::new(RwLock::new(Some(stale_timestamp())));
        let last_clone = last.clone();
        let refresher = tokio::spawn(async move {
            for _ in 0..15 {
                sleep(TDuration::from_secs(1)).await;
                *last_clone.write().await = Some(Utc::now());
            }
        });
        let result = tokio::time::timeout(TDuration::from_secs(15), stall_watchdog(last)).await;
        refresher.abort();
        assert!(
            result.is_err(),
            "watchdog fired despite messages every 1s for 15s"
        );
    }
}