Skip to main content

px_core/websocket/
traits.rs

1use chrono::{DateTime, Utc};
2use futures::Stream;
3use serde::{Deserialize, Serialize};
4use std::borrow::Cow;
5use std::pin::Pin;
6use std::sync::atomic::{AtomicU8, Ordering};
7use std::time::Duration;
8
9use crate::error::WebSocketError;
10use crate::models::{CryptoPrice, LiquidityRole, OrderbookUpdate, SportResult};
11
12/// Shared WebSocket reconnect/keepalive constants for all exchange implementations.
13pub const WS_PING_INTERVAL: Duration = Duration::from_secs(20);
14pub const WS_CRYPTO_PING_INTERVAL: Duration = Duration::from_secs(5);
15pub const WS_RECONNECT_BASE_DELAY: Duration = Duration::from_millis(3000);
16pub const WS_RECONNECT_MAX_DELAY: Duration = Duration::from_millis(60000);
17pub const WS_MAX_RECONNECT_ATTEMPTS: u32 = 10;
18
19/// Envelope wrapping every WebSocket stream item for HFT feed-latency measurement.
20///
21/// - `exchange_time` — server-authoritative UTC timestamp for event ordering.
22///   Always prefer this for trade sequencing and cross-exchange correlation.
23/// - `received_at` — local UTC timestamp captured at socket read for measuring
24///   wire-to-process latency. Compare `received_at - exchange_time` for feed lag.
25/// - `seq` — per-market monotonic sequence number for gap detection.
26#[derive(Debug, Clone, Serialize, Deserialize)]
27pub struct WsMessage<T> {
28    pub seq: u64,
29    pub exchange_time: Option<DateTime<Utc>>,
30    pub received_at: DateTime<Utc>,
31    pub data: T,
32}
33
34pub type OrderbookStream =
35    Pin<Box<dyn Stream<Item = Result<WsMessage<OrderbookUpdate>, WebSocketError>> + Send>>;
36pub type ActivityStream =
37    Pin<Box<dyn Stream<Item = Result<WsMessage<ActivityEvent>, WebSocketError>> + Send>>;
38pub type SportsStream = Pin<Box<dyn Stream<Item = Result<SportResult, WebSocketError>> + Send>>;
39pub type CryptoPriceStream =
40    Pin<Box<dyn Stream<Item = Result<CryptoPrice, WebSocketError>> + Send>>;
41
42#[derive(Debug, Clone, Copy, PartialEq, Eq)]
43#[repr(u8)]
44pub enum WebSocketState {
45    Disconnected = 0,
46    Connecting = 1,
47    Connected = 2,
48    Reconnecting = 3,
49    Closed = 4,
50}
51
52impl WebSocketState {
53    fn from_u8(v: u8) -> Self {
54        match v {
55            1 => Self::Connecting,
56            2 => Self::Connected,
57            3 => Self::Reconnecting,
58            4 => Self::Closed,
59            _ => Self::Disconnected,
60        }
61    }
62}
63
64/// Lock-free atomic wrapper for WebSocketState.
65/// Enables O(1) reads without acquiring any async lock.
66pub struct AtomicWebSocketState(AtomicU8);
67
68impl AtomicWebSocketState {
69    pub fn new(state: WebSocketState) -> Self {
70        Self(AtomicU8::new(state as u8))
71    }
72
73    #[inline]
74    pub fn load(&self) -> WebSocketState {
75        WebSocketState::from_u8(self.0.load(Ordering::Acquire))
76    }
77
78    #[inline]
79    pub fn store(&self, state: WebSocketState) {
80        self.0.store(state as u8, Ordering::Release);
81    }
82}
83
84#[derive(Debug, Clone, Serialize, Deserialize)]
85#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
86pub enum ActivityEvent {
87    Trade(ActivityTrade),
88    Fill(ActivityFill),
89}
90
91#[derive(Debug, Clone, Serialize, Deserialize)]
92#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
93pub struct ActivityTrade {
94    pub market_id: String,
95    pub asset_id: String,
96    pub trade_id: Option<String>,
97    pub price: f64,
98    pub size: f64,
99    pub side: Option<String>,
100    pub aggressor_side: Option<String>,
101    pub outcome: Option<String>,
102    /// Fee rate in basis points (e.g. 0 = no fee, 200 = 2%).
103    /// Polymarket: present on `last_trade_price` events.
104    #[serde(default, skip_serializing_if = "Option::is_none")]
105    pub fee_rate_bps: Option<u32>,
106    pub timestamp: Option<DateTime<Utc>>,
107    pub source_channel: Cow<'static, str>,
108}
109
110#[derive(Debug, Clone, Serialize, Deserialize)]
111#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
112pub struct ActivityFill {
113    pub market_id: String,
114    pub asset_id: String,
115    pub fill_id: Option<String>,
116    pub order_id: Option<String>,
117    pub price: f64,
118    pub size: f64,
119    pub side: Option<String>,
120    pub outcome: Option<String>,
121    /// On-chain transaction hash. Opinion: `txHash` from `trade.record.new`.
122    #[serde(default, skip_serializing_if = "Option::is_none")]
123    pub tx_hash: Option<String>,
124    /// Fee charged for this fill. Opinion: `fee` from `trade.record.new`.
125    #[serde(default, skip_serializing_if = "Option::is_none")]
126    pub fee: Option<f64>,
127    pub timestamp: Option<DateTime<Utc>>,
128    pub source_channel: Cow<'static, str>,
129    pub liquidity_role: Option<LiquidityRole>,
130}
131
132#[allow(async_fn_in_trait)]
133pub trait OrderBookWebSocket: Send + Sync {
134    async fn connect(&mut self) -> Result<(), WebSocketError>;
135
136    async fn disconnect(&mut self) -> Result<(), WebSocketError>;
137
138    async fn subscribe(&mut self, market_id: &str) -> Result<(), WebSocketError>;
139
140    async fn unsubscribe(&mut self, market_id: &str) -> Result<(), WebSocketError>;
141
142    fn state(&self) -> WebSocketState;
143
144    async fn orderbook_stream(
145        &mut self,
146        market_id: &str,
147    ) -> Result<OrderbookStream, WebSocketError>;
148
149    async fn activity_stream(
150        &mut self,
151        _market_id: &str,
152    ) -> Result<ActivityStream, WebSocketError> {
153        Err(WebSocketError::Subscription(
154            "activity stream not supported".to_string(),
155        ))
156    }
157}