1use chrono::{DateTime, Utc};
2use futures::Stream;
3use serde::{Deserialize, Serialize};
4use std::borrow::Cow;
5use std::pin::Pin;
6use std::sync::atomic::{AtomicU8, Ordering};
7use std::sync::Arc;
8use std::time::Duration;
9use tokio::sync::RwLock;
10use tokio::time::interval;
11
12use crate::error::WebSocketError;
13use crate::models::{CryptoPrice, LiquidityRole, SportResult};
14use crate::websocket::stream::{SessionStream, UpdateStream};
15
16pub const WS_PING_INTERVAL: Duration = Duration::from_secs(20);
18pub const WS_CRYPTO_PING_INTERVAL: Duration = Duration::from_secs(5);
19pub const WS_RECONNECT_BASE_DELAY: Duration = Duration::from_millis(3000);
20pub const WS_RECONNECT_MAX_DELAY: Duration = Duration::from_millis(60000);
21pub const WS_MAX_RECONNECT_ATTEMPTS: u32 = 10;
22
23pub const WS_STALL_TIMEOUT: Duration = Duration::from_secs(90);
29pub const WS_STALL_CHECK_INTERVAL: Duration = Duration::from_secs(10);
30
31pub async fn stall_watchdog(last_message_at: Arc<RwLock<Option<DateTime<Utc>>>>) {
39 let mut tick = interval(WS_STALL_CHECK_INTERVAL);
40 tick.tick().await; loop {
42 tick.tick().await;
43 let last = *last_message_at.read().await;
44 if let Some(last) = last {
45 let age = Utc::now() - last;
46 if age.to_std().is_ok_and(|d| d > WS_STALL_TIMEOUT) {
47 tracing::warn!(
48 stall_secs = age.num_seconds(),
49 "no messages for >{}s; forcing reconnect",
50 WS_STALL_TIMEOUT.as_secs()
51 );
52 return;
53 }
54 }
55 }
56}
57
58pub type SportsStream = Pin<Box<dyn Stream<Item = Result<SportResult, WebSocketError>> + Send>>;
59pub type CryptoPriceStream =
60 Pin<Box<dyn Stream<Item = Result<CryptoPrice, WebSocketError>> + Send>>;
61
62#[derive(Debug, Clone, Copy, PartialEq, Eq)]
63#[repr(u8)]
64pub enum WebSocketState {
65 Disconnected = 0,
66 Connecting = 1,
67 Connected = 2,
68 Reconnecting = 3,
69 Closed = 4,
70}
71
72impl WebSocketState {
73 fn from_u8(v: u8) -> Self {
74 match v {
75 1 => Self::Connecting,
76 2 => Self::Connected,
77 3 => Self::Reconnecting,
78 4 => Self::Closed,
79 _ => Self::Disconnected,
80 }
81 }
82
83 #[inline]
86 pub const fn as_str(self) -> &'static str {
87 match self {
88 Self::Disconnected => "Disconnected",
89 Self::Connecting => "Connecting",
90 Self::Connected => "Connected",
91 Self::Reconnecting => "Reconnecting",
92 Self::Closed => "Closed",
93 }
94 }
95}
96
97impl std::fmt::Display for WebSocketState {
98 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
99 f.write_str(self.as_str())
100 }
101}
102
103pub struct AtomicWebSocketState(AtomicU8);
106
107impl AtomicWebSocketState {
108 pub fn new(state: WebSocketState) -> Self {
109 Self(AtomicU8::new(state as u8))
110 }
111
112 #[inline]
113 pub fn load(&self) -> WebSocketState {
114 WebSocketState::from_u8(self.0.load(Ordering::Acquire))
115 }
116
117 #[inline]
118 pub fn store(&self, state: WebSocketState) {
119 self.0.store(state as u8, Ordering::Release);
120 }
121}
122
123#[derive(Debug, Clone, Serialize, Deserialize)]
132#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
133pub struct ActivityTrade {
134 pub market_id: String,
135 pub asset_id: String,
136 pub trade_id: Option<String>,
137 pub price: f64,
138 pub size: f64,
139 pub side: Option<String>,
140 pub aggressor_side: Option<String>,
141 pub outcome: Option<String>,
142 #[serde(default, skip_serializing_if = "Option::is_none")]
145 pub fee_rate_bps: Option<u32>,
146 #[serde(default, skip_serializing_if = "Option::is_none")]
148 pub exchange_ts_ms: Option<u64>,
149 pub source_channel: Cow<'static, str>,
150}
151
152#[derive(Debug, Clone, Serialize, Deserialize)]
153#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
154pub struct ActivityFill {
155 pub market_id: String,
156 pub asset_id: String,
157 pub fill_id: Option<String>,
158 pub order_id: Option<String>,
159 pub price: f64,
160 pub size: f64,
161 pub side: Option<String>,
162 pub outcome: Option<String>,
163 #[serde(default, skip_serializing_if = "Option::is_none")]
165 pub tx_hash: Option<String>,
166 #[serde(default, skip_serializing_if = "Option::is_none")]
168 pub fee: Option<f64>,
169 #[serde(default, skip_serializing_if = "Option::is_none")]
171 pub exchange_ts_ms: Option<u64>,
172 pub source_channel: Cow<'static, str>,
173 pub liquidity_role: Option<LiquidityRole>,
174}
175
176#[allow(async_fn_in_trait)]
189pub trait OrderBookWebSocket: Send + Sync {
190 async fn connect(&mut self) -> Result<(), WebSocketError>;
191 async fn disconnect(&mut self) -> Result<(), WebSocketError>;
192 async fn subscribe(&mut self, market_id: &str) -> Result<(), WebSocketError>;
193 async fn unsubscribe(&mut self, market_id: &str) -> Result<(), WebSocketError>;
194 fn state(&self) -> WebSocketState;
195 fn updates(&self) -> Option<UpdateStream>;
198 fn session_events(&self) -> Option<SessionStream>;
201}
202
203#[cfg(test)]
204mod tests {
205 use super::*;
206 use tokio::time::{sleep, Duration as TDuration};
207
208 #[tokio::test]
213 async fn watchdog_does_not_fire_when_last_message_is_none() {
214 let last = Arc::new(RwLock::new(None::<DateTime<Utc>>));
215 let result = tokio::time::timeout(
218 WS_STALL_CHECK_INTERVAL + TDuration::from_secs(5),
219 stall_watchdog(last),
220 )
221 .await;
222 assert!(
223 result.is_err(),
224 "watchdog returned {result:?} despite last_message_at being None"
225 );
226 }
227
228 #[tokio::test]
230 async fn watchdog_does_not_fire_with_recent_activity() {
231 let last = Arc::new(RwLock::new(Some(Utc::now())));
232 let result = tokio::time::timeout(
233 WS_STALL_CHECK_INTERVAL + TDuration::from_secs(5),
234 stall_watchdog(last),
235 )
236 .await;
237 assert!(result.is_err(), "watchdog fired despite fresh activity");
238 }
239
240 #[tokio::test]
245 async fn watchdog_fires_when_last_message_is_stale() {
246 let stale = Utc::now()
247 - chrono::Duration::from_std(WS_STALL_TIMEOUT).unwrap()
248 - chrono::Duration::seconds(5);
249 let last = Arc::new(RwLock::new(Some(stale)));
250 let result = tokio::time::timeout(
252 WS_STALL_CHECK_INTERVAL + TDuration::from_secs(2),
253 stall_watchdog(last),
254 )
255 .await;
256 assert!(result.is_ok(), "watchdog did not fire on stale timestamp");
257 }
258
259 #[tokio::test]
263 async fn watchdog_stays_asleep_under_continuous_activity() {
264 let last = Arc::new(RwLock::new(Some(Utc::now())));
265 let last_clone = last.clone();
266 let refresher = tokio::spawn(async move {
267 for _ in 0..15 {
268 sleep(TDuration::from_secs(1)).await;
269 *last_clone.write().await = Some(Utc::now());
270 }
271 });
272 let result = tokio::time::timeout(TDuration::from_secs(15), stall_watchdog(last)).await;
273 refresher.abort();
274 assert!(
275 result.is_err(),
276 "watchdog fired despite messages every 1s for 15s"
277 );
278 }
279}