// px_core/websocket/traits.rs

use chrono::{DateTime, Utc};
use futures::Stream;
use serde::{Deserialize, Serialize};
use std::borrow::Cow;
use std::fmt;
use std::pin::Pin;
use std::sync::atomic::{AtomicU8, Ordering};
use std::time::Duration;

use crate::error::WebSocketError;
use crate::models::{CryptoPrice, LiquidityRole, OrderbookUpdate, SportResult};
11
// Shared WebSocket reconnect/keepalive constants for all exchange implementations.

/// Keepalive ping interval (20 s).
pub const WS_PING_INTERVAL: Duration = Duration::from_secs(20);

/// Keepalive ping interval for the crypto connection (5 s).
// NOTE(review): presumably paired with `CryptoPriceStream`; confirm against the implementations.
pub const WS_CRYPTO_PING_INTERVAL: Duration = Duration::from_secs(5);

/// Base delay between reconnect attempts (3 s).
pub const WS_RECONNECT_BASE_DELAY: Duration = Duration::from_secs(3);

/// Upper bound on the delay between reconnect attempts (60 s).
pub const WS_RECONNECT_MAX_DELAY: Duration = Duration::from_secs(60);

/// Maximum number of reconnect attempts.
pub const WS_MAX_RECONNECT_ATTEMPTS: u32 = 10;
18
19pub type OrderbookStream =
20    Pin<Box<dyn Stream<Item = Result<OrderbookUpdate, WebSocketError>> + Send>>;
21pub type ActivityStream = Pin<Box<dyn Stream<Item = Result<ActivityEvent, WebSocketError>> + Send>>;
22pub type SportsStream = Pin<Box<dyn Stream<Item = Result<SportResult, WebSocketError>> + Send>>;
23pub type CryptoPriceStream =
24    Pin<Box<dyn Stream<Item = Result<CryptoPrice, WebSocketError>> + Send>>;
25
/// Connection state of a WebSocket client.
///
/// The enum is `#[repr(u8)]` with explicit discriminants so a value can be cast to `u8`
/// (`state as u8`) and decoded again with the private `from_u8` helper.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum WebSocketState {
    Disconnected = 0,
    Connecting = 1,
    Connected = 2,
    Reconnecting = 3,
    Closed = 4,
}

impl WebSocketState {
    /// Decodes a raw discriminant back into a state.
    ///
    /// `0` and every value outside `1..=4` decode to [`WebSocketState::Disconnected`], so the
    /// conversion is total and never panics.
    fn from_u8(v: u8) -> Self {
        match v {
            1 => Self::Connecting,
            2 => Self::Connected,
            3 => Self::Reconnecting,
            4 => Self::Closed,
            // Covers the real `0` discriminant as well as any unknown byte.
            _ => Self::Disconnected,
        }
    }
}
47
48/// Lock-free atomic wrapper for WebSocketState.
49/// Enables O(1) reads without acquiring any async lock.
50pub struct AtomicWebSocketState(AtomicU8);
51
52impl AtomicWebSocketState {
53    pub fn new(state: WebSocketState) -> Self {
54        Self(AtomicU8::new(state as u8))
55    }
56
57    #[inline]
58    pub fn load(&self) -> WebSocketState {
59        WebSocketState::from_u8(self.0.load(Ordering::Acquire))
60    }
61
62    #[inline]
63    pub fn store(&self, state: WebSocketState) {
64        self.0.store(state as u8, Ordering::Release);
65    }
66}
67
68#[derive(Debug, Clone, Serialize, Deserialize)]
69#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
70pub enum ActivityEvent {
71    Trade(ActivityTrade),
72    Fill(ActivityFill),
73}
74
75#[derive(Debug, Clone, Serialize, Deserialize)]
76#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
77pub struct ActivityTrade {
78    pub market_id: String,
79    pub asset_id: String,
80    pub trade_id: Option<String>,
81    pub price: f64,
82    pub size: f64,
83    pub side: Option<String>,
84    pub aggressor_side: Option<String>,
85    pub outcome: Option<String>,
86    pub timestamp: Option<DateTime<Utc>>,
87    pub source_channel: Cow<'static, str>,
88}
89
90#[derive(Debug, Clone, Serialize, Deserialize)]
91#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
92pub struct ActivityFill {
93    pub market_id: String,
94    pub asset_id: String,
95    pub fill_id: Option<String>,
96    pub order_id: Option<String>,
97    pub price: f64,
98    pub size: f64,
99    pub side: Option<String>,
100    pub outcome: Option<String>,
101    pub timestamp: Option<DateTime<Utc>>,
102    pub source_channel: Cow<'static, str>,
103    pub liquidity_role: Option<LiquidityRole>,
104}
105
106#[allow(async_fn_in_trait)]
107pub trait OrderBookWebSocket: Send + Sync {
108    async fn connect(&mut self) -> Result<(), WebSocketError>;
109
110    async fn disconnect(&mut self) -> Result<(), WebSocketError>;
111
112    async fn subscribe(&mut self, market_id: &str) -> Result<(), WebSocketError>;
113
114    async fn unsubscribe(&mut self, market_id: &str) -> Result<(), WebSocketError>;
115
116    fn state(&self) -> WebSocketState;
117
118    async fn orderbook_stream(
119        &mut self,
120        market_id: &str,
121    ) -> Result<OrderbookStream, WebSocketError>;
122
123    async fn activity_stream(
124        &mut self,
125        _market_id: &str,
126    ) -> Result<ActivityStream, WebSocketError> {
127        Err(WebSocketError::Subscription(
128            "activity stream not supported".to_string(),
129        ))
130    }
131}