px_core/websocket/
traits.rs1use chrono::{DateTime, Utc};
2use futures::Stream;
3use serde::{Deserialize, Serialize};
4use std::borrow::Cow;
5use std::pin::Pin;
6use std::sync::atomic::{AtomicU8, Ordering};
7use std::time::Duration;
8
9use crate::error::WebSocketError;
10use crate::models::{CryptoPrice, LiquidityRole, OrderbookUpdate, SportResult};
11
/// Ping interval for WebSocket connections: 20 seconds.
pub const WS_PING_INTERVAL: Duration = Duration::from_secs(20);

/// Ping interval for the crypto feed: 5 seconds.
pub const WS_CRYPTO_PING_INTERVAL: Duration = Duration::from_secs(5);

/// Base delay between reconnect attempts: 3 seconds.
///
/// NOTE(review): together with [`WS_RECONNECT_MAX_DELAY`] this presumably
/// bounds a backoff schedule; the reconnect logic is not in this file, so
/// confirm against the caller.
pub const WS_RECONNECT_BASE_DELAY: Duration = Duration::from_secs(3);

/// Upper bound on the reconnect delay: 60 seconds.
pub const WS_RECONNECT_MAX_DELAY: Duration = Duration::from_secs(60);

/// Maximum number of reconnect attempts.
pub const WS_MAX_RECONNECT_ATTEMPTS: u32 = 10;
18
19pub type OrderbookStream =
20 Pin<Box<dyn Stream<Item = Result<OrderbookUpdate, WebSocketError>> + Send>>;
21pub type ActivityStream = Pin<Box<dyn Stream<Item = Result<ActivityEvent, WebSocketError>> + Send>>;
22pub type SportsStream = Pin<Box<dyn Stream<Item = Result<SportResult, WebSocketError>> + Send>>;
23pub type CryptoPriceStream =
24 Pin<Box<dyn Stream<Item = Result<CryptoPrice, WebSocketError>> + Send>>;
25
/// Connection state of a WebSocket client.
///
/// The enum is `#[repr(u8)]` with explicit discriminants so that a value can
/// be cast to `u8` and decoded again with `from_u8` (this is how
/// `AtomicWebSocketState` stores it).
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
#[repr(u8)]
pub enum WebSocketState {
    Disconnected = 0,
    Connecting = 1,
    Connected = 2,
    Reconnecting = 3,
    Closed = 4,
}

impl WebSocketState {
    /// Decodes a raw `u8` discriminant into a state.
    ///
    /// The discriminants of `Connecting`, `Connected`, `Reconnecting` and
    /// `Closed` map to their variants; every other byte (including `0`)
    /// yields `Disconnected`.
    fn from_u8(v: u8) -> Self {
        // Derive the match constants from the enum itself so the decoder
        // cannot drift from the declared discriminants.
        const CONNECTING: u8 = WebSocketState::Connecting as u8;
        const CONNECTED: u8 = WebSocketState::Connected as u8;
        const RECONNECTING: u8 = WebSocketState::Reconnecting as u8;
        const CLOSED: u8 = WebSocketState::Closed as u8;

        match v {
            CONNECTING => Self::Connecting,
            CONNECTED => Self::Connected,
            RECONNECTING => Self::Reconnecting,
            CLOSED => Self::Closed,
            _ => Self::Disconnected,
        }
    }
}
47
48pub struct AtomicWebSocketState(AtomicU8);
51
52impl AtomicWebSocketState {
53 pub fn new(state: WebSocketState) -> Self {
54 Self(AtomicU8::new(state as u8))
55 }
56
57 #[inline]
58 pub fn load(&self) -> WebSocketState {
59 WebSocketState::from_u8(self.0.load(Ordering::Acquire))
60 }
61
62 #[inline]
63 pub fn store(&self, state: WebSocketState) {
64 self.0.store(state as u8, Ordering::Release);
65 }
66}
67
68#[derive(Debug, Clone, Serialize, Deserialize)]
69#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
70pub enum ActivityEvent {
71 Trade(ActivityTrade),
72 Fill(ActivityFill),
73}
74
75#[derive(Debug, Clone, Serialize, Deserialize)]
76#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
77pub struct ActivityTrade {
78 pub market_id: String,
79 pub asset_id: String,
80 pub trade_id: Option<String>,
81 pub price: f64,
82 pub size: f64,
83 pub side: Option<String>,
84 pub aggressor_side: Option<String>,
85 pub outcome: Option<String>,
86 pub timestamp: Option<DateTime<Utc>>,
87 pub source_channel: Cow<'static, str>,
88}
89
90#[derive(Debug, Clone, Serialize, Deserialize)]
91#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
92pub struct ActivityFill {
93 pub market_id: String,
94 pub asset_id: String,
95 pub fill_id: Option<String>,
96 pub order_id: Option<String>,
97 pub price: f64,
98 pub size: f64,
99 pub side: Option<String>,
100 pub outcome: Option<String>,
101 pub timestamp: Option<DateTime<Utc>>,
102 pub source_channel: Cow<'static, str>,
103 pub liquidity_role: Option<LiquidityRole>,
104}
105
106#[allow(async_fn_in_trait)]
107pub trait OrderBookWebSocket: Send + Sync {
108 async fn connect(&mut self) -> Result<(), WebSocketError>;
109
110 async fn disconnect(&mut self) -> Result<(), WebSocketError>;
111
112 async fn subscribe(&mut self, market_id: &str) -> Result<(), WebSocketError>;
113
114 async fn unsubscribe(&mut self, market_id: &str) -> Result<(), WebSocketError>;
115
116 fn state(&self) -> WebSocketState;
117
118 async fn orderbook_stream(
119 &mut self,
120 market_id: &str,
121 ) -> Result<OrderbookStream, WebSocketError>;
122
123 async fn activity_stream(
124 &mut self,
125 _market_id: &str,
126 ) -> Result<ActivityStream, WebSocketError> {
127 Err(WebSocketError::Subscription(
128 "activity stream not supported".to_string(),
129 ))
130 }
131}