1use chrono::{DateTime, Utc};
2use futures::Stream;
3use serde::{Deserialize, Serialize};
4use std::borrow::Cow;
5use std::pin::Pin;
6use std::sync::atomic::{AtomicU8, Ordering};
7use std::time::Duration;
8
9use crate::error::WebSocketError;
10use crate::models::{CryptoPrice, LiquidityRole, OrderbookUpdate, SportResult};
11
/// Ping interval constant: 20 seconds.
///
/// NOTE(review): presumably the keep-alive cadence for regular WebSocket
/// connections; the code that consumes it is outside this file.
pub const WS_PING_INTERVAL: Duration = Duration::from_secs(20);

/// Ping interval constant for the crypto feed: 5 seconds.
///
/// NOTE(review): presumably used by the crypto price stream, which pings more
/// often than the default interval — confirm against the consumer.
pub const WS_CRYPTO_PING_INTERVAL: Duration = Duration::from_secs(5);

/// Base reconnect delay: 3 seconds (3000 ms).
///
/// NOTE(review): looks like the starting delay of a reconnect backoff; the
/// backoff policy itself is not defined in this file.
pub const WS_RECONNECT_BASE_DELAY: Duration = Duration::from_secs(3);

/// Maximum reconnect delay: 60 seconds (60000 ms).
///
/// NOTE(review): presumably the cap applied to the reconnect backoff — confirm.
pub const WS_RECONNECT_MAX_DELAY: Duration = Duration::from_secs(60);

/// Maximum number of reconnect attempts: 10.
///
/// NOTE(review): what happens once the limit is reached is decided by the
/// consumer, which is outside this file.
pub const WS_MAX_RECONNECT_ATTEMPTS: u32 = 10;
18
/// Generic envelope that pairs a payload of type `T` with a sequence number
/// and two timestamps.
///
/// NOTE(review): the field descriptions below are inferred from the field
/// names and types; the code that fills them in is outside this file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WsMessage<T> {
    /// Sequence number attached to the message.
    /// NOTE(review): who assigns it (exchange vs. client) is not visible here.
    pub seq: u64,
    /// Optional UTC timestamp; presumably the time reported by the exchange,
    /// `None` when it did not supply one — TODO confirm.
    pub exchange_time: Option<DateTime<Utc>>,
    /// UTC timestamp; presumably when the message was received locally — TODO confirm.
    pub received_at: DateTime<Utc>,
    /// The wrapped payload.
    pub data: T,
}
33
/// Pinned, boxed, `Send` stream whose items are
/// `Result<WsMessage<OrderbookUpdate>, WebSocketError>`.
pub type OrderbookStream =
    Pin<Box<dyn Stream<Item = Result<WsMessage<OrderbookUpdate>, WebSocketError>> + Send>>;
/// Pinned, boxed, `Send` stream whose items are
/// `Result<WsMessage<ActivityEvent>, WebSocketError>`.
pub type ActivityStream =
    Pin<Box<dyn Stream<Item = Result<WsMessage<ActivityEvent>, WebSocketError>> + Send>>;
/// Pinned, boxed, `Send` stream whose items are `Result<SportResult, WebSocketError>`.
///
/// Unlike the orderbook and activity streams, the payload is not wrapped in `WsMessage`.
pub type SportsStream = Pin<Box<dyn Stream<Item = Result<SportResult, WebSocketError>> + Send>>;
/// Pinned, boxed, `Send` stream whose items are `Result<CryptoPrice, WebSocketError>`.
///
/// Unlike the orderbook and activity streams, the payload is not wrapped in `WsMessage`.
pub type CryptoPriceStream =
    Pin<Box<dyn Stream<Item = Result<CryptoPrice, WebSocketError>> + Send>>;
41
/// Connection state of a WebSocket client.
///
/// The enum is `#[repr(u8)]` with explicit discriminants, so a state can be
/// cast to a byte with `as u8` and decoded again with `from_u8`;
/// `AtomicWebSocketState` stores it in exactly that form.
///
/// NOTE(review): the allowed transitions between states are not defined in
/// this file; the variant names are the only description available here.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum WebSocketState {
    /// Discriminant `0`; also what `from_u8` returns for any unknown byte.
    Disconnected = 0,
    /// Discriminant `1`.
    Connecting = 1,
    /// Discriminant `2`.
    Connected = 2,
    /// Discriminant `3`.
    Reconnecting = 3,
    /// Discriminant `4`.
    Closed = 4,
}

impl WebSocketState {
    /// Decodes a raw byte into a state.
    ///
    /// Bytes `1` through `4` map to the variant with that discriminant; every
    /// other value, including `0`, decodes to `Disconnected`.
    fn from_u8(v: u8) -> Self {
        // Position `i` holds the variant whose discriminant is `i`.
        let by_code = [
            Self::Disconnected,
            Self::Connecting,
            Self::Connected,
            Self::Reconnecting,
            Self::Closed,
        ];
        by_code
            .get(usize::from(v))
            .copied()
            .unwrap_or(Self::Disconnected)
    }
}
63
/// Atomic cell holding a `WebSocketState`, stored as the state's `u8`
/// discriminant inside an `AtomicU8`.
///
/// Reads and writes go through `&self`, so the cell can be updated without a
/// mutable borrow.
///
/// `Debug` is derived so that types embedding this cell can derive `Debug`
/// themselves, matching every other public type in this file. The output shows
/// the raw discriminant (for example `AtomicWebSocketState(2)`), not the
/// variant name.
#[derive(Debug)]
pub struct AtomicWebSocketState(AtomicU8);
67
68impl AtomicWebSocketState {
69 pub fn new(state: WebSocketState) -> Self {
70 Self(AtomicU8::new(state as u8))
71 }
72
73 #[inline]
74 pub fn load(&self) -> WebSocketState {
75 WebSocketState::from_u8(self.0.load(Ordering::Acquire))
76 }
77
78 #[inline]
79 pub fn store(&self, state: WebSocketState) {
80 self.0.store(state as u8, Ordering::Release);
81 }
82}
83
/// An activity item: either a trade or a fill.
///
/// This is the payload type carried by `ActivityStream` (wrapped in
/// `WsMessage`). No `#[serde(...)]` representation attribute is set, so
/// serde's default (externally tagged) enum representation applies. With the
/// `schema` feature enabled the type also derives `schemars::JsonSchema`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
pub enum ActivityEvent {
    /// A trade, described by `ActivityTrade`.
    Trade(ActivityTrade),
    /// A fill, described by `ActivityFill`.
    Fill(ActivityFill),
}
90
/// Payload of `ActivityEvent::Trade`.
///
/// With the `schema` feature enabled the type also derives `schemars::JsonSchema`.
///
/// NOTE(review): field meanings are inferred from names and types only; the
/// code that populates this struct is outside this file. Units and valid
/// ranges for `price` and `size` are not established here.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
pub struct ActivityTrade {
    /// Identifier of the market (presumably the one the trade occurred in).
    pub market_id: String,
    /// Identifier of the traded asset.
    pub asset_id: String,
    /// Trade identifier, if one is available.
    pub trade_id: Option<String>,
    /// Trade price.
    pub price: f64,
    /// Trade size.
    pub size: f64,
    /// Trade side as a free-form string, if available.
    /// NOTE(review): the accepted values are not defined in this file.
    pub side: Option<String>,
    /// Aggressor side as a free-form string, if available.
    /// NOTE(review): the accepted values are not defined in this file.
    pub aggressor_side: Option<String>,
    /// Outcome label, if available.
    pub outcome: Option<String>,
    /// Fee rate; presumably in basis points, going by the `_bps` suffix — TODO confirm.
    ///
    /// Deserializes to `None` when the key is absent and is omitted from
    /// serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub fee_rate_bps: Option<u32>,
    /// UTC timestamp of the trade, if available.
    pub timestamp: Option<DateTime<Utc>>,
    /// Name of the channel this record came from; held as either a borrowed
    /// `'static` string or an owned `String`.
    pub source_channel: Cow<'static, str>,
}
109
/// Payload of `ActivityEvent::Fill`.
///
/// With the `schema` feature enabled the type also derives `schemars::JsonSchema`.
///
/// NOTE(review): field meanings are inferred from names and types only; the
/// code that populates this struct is outside this file. Units and valid
/// ranges for `price`, `size` and `fee` are not established here.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
pub struct ActivityFill {
    /// Identifier of the market (presumably the one the fill occurred in).
    pub market_id: String,
    /// Identifier of the filled asset.
    pub asset_id: String,
    /// Fill identifier, if one is available.
    pub fill_id: Option<String>,
    /// Identifier of the order that was filled, if available.
    pub order_id: Option<String>,
    /// Fill price.
    pub price: f64,
    /// Fill size.
    pub size: f64,
    /// Fill side as a free-form string, if available.
    /// NOTE(review): the accepted values are not defined in this file.
    pub side: Option<String>,
    /// Outcome label, if available.
    pub outcome: Option<String>,
    /// Transaction hash, if available.
    ///
    /// Deserializes to `None` when the key is absent and is omitted from
    /// serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub tx_hash: Option<String>,
    /// Fee amount, if available.
    ///
    /// Deserializes to `None` when the key is absent and is omitted from
    /// serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub fee: Option<f64>,
    /// UTC timestamp of the fill, if available.
    pub timestamp: Option<DateTime<Utc>>,
    /// Name of the channel this record came from; held as either a borrowed
    /// `'static` string or an owned `String`.
    pub source_channel: Cow<'static, str>,
    /// Liquidity role of the fill, if known (`LiquidityRole` is defined in `crate::models`).
    pub liquidity_role: Option<LiquidityRole>,
}
131
132#[allow(async_fn_in_trait)]
133pub trait OrderBookWebSocket: Send + Sync {
134 async fn connect(&mut self) -> Result<(), WebSocketError>;
135
136 async fn disconnect(&mut self) -> Result<(), WebSocketError>;
137
138 async fn subscribe(&mut self, market_id: &str) -> Result<(), WebSocketError>;
139
140 async fn unsubscribe(&mut self, market_id: &str) -> Result<(), WebSocketError>;
141
142 fn state(&self) -> WebSocketState;
143
144 async fn orderbook_stream(
145 &mut self,
146 market_id: &str,
147 ) -> Result<OrderbookStream, WebSocketError>;
148
149 async fn activity_stream(
150 &mut self,
151 _market_id: &str,
152 ) -> Result<ActivityStream, WebSocketError> {
153 Err(WebSocketError::Subscription(
154 "activity stream not supported".to_string(),
155 ))
156 }
157}