drm_exchange_limitless/
websocket.rs

1use async_trait::async_trait;
2use futures::{FutureExt, StreamExt};
3use rust_socketio::{
4    asynchronous::{Client, ClientBuilder},
5    Payload,
6};
7use serde::{Deserialize, Serialize};
8use std::collections::HashMap;
9use std::sync::Arc;
10use tokio::sync::{broadcast, RwLock};
11
12use drm_core::{
13    OrderBookWebSocket, Orderbook, OrderbookStream, PriceLevel, WebSocketError, WebSocketState,
14};
15
16const WS_URL: &str = "wss://ws.limitless.exchange";
17const NAMESPACE: &str = "/markets";
18
19#[derive(Debug, Clone, Serialize)]
20struct SubscribePayload {
21    #[serde(rename = "marketSlugs", skip_serializing_if = "Vec::is_empty")]
22    market_slugs: Vec<String>,
23    #[serde(rename = "marketAddresses", skip_serializing_if = "Vec::is_empty")]
24    market_addresses: Vec<String>,
25}
26
27#[derive(Debug, Clone, Deserialize)]
28struct OrderbookUpdateData {
29    #[serde(rename = "marketSlug", alias = "slug")]
30    market_slug: Option<String>,
31    orderbook: Option<OrderbookData>,
32    bids: Option<Vec<PriceLevelData>>,
33    asks: Option<Vec<PriceLevelData>>,
34    #[allow(dead_code)]
35    timestamp: Option<serde_json::Value>,
36}
37
38#[derive(Debug, Clone, Deserialize)]
39struct OrderbookData {
40    bids: Option<Vec<PriceLevelData>>,
41    asks: Option<Vec<PriceLevelData>>,
42}
43
44#[derive(Debug, Clone, Deserialize)]
45struct PriceLevelData {
46    price: serde_json::Value,
47    size: serde_json::Value,
48}
49
50#[derive(Debug, Clone, Deserialize)]
51struct PriceUpdateData {
52    #[serde(rename = "marketAddress")]
53    market_address: Option<String>,
54    #[serde(rename = "updatedPrices")]
55    updated_prices: Option<PriceData>,
56    #[serde(rename = "blockNumber")]
57    #[allow(dead_code)]
58    block_number: Option<i64>,
59}
60
61#[derive(Debug, Clone, Deserialize)]
62struct PriceData {
63    yes: Option<f64>,
64    no: Option<f64>,
65}
66
67type OrderbookSender = broadcast::Sender<Result<Orderbook, WebSocketError>>;
68
69struct SharedState {
70    ws_state: WebSocketState,
71    subscribed_slugs: Vec<String>,
72    subscribed_addresses: Vec<String>,
73    orderbook_senders: HashMap<String, OrderbookSender>,
74    orderbooks: HashMap<String, Orderbook>,
75}
76
77impl SharedState {
78    fn new() -> Self {
79        Self {
80            ws_state: WebSocketState::Disconnected,
81            subscribed_slugs: Vec::new(),
82            subscribed_addresses: Vec::new(),
83            orderbook_senders: HashMap::new(),
84            orderbooks: HashMap::new(),
85        }
86    }
87}
88
89pub struct LimitlessWebSocket {
90    shared: Arc<RwLock<SharedState>>,
91    client: Arc<RwLock<Option<Client>>>,
92    #[allow(dead_code)]
93    auto_reconnect: bool,
94}
95
96impl LimitlessWebSocket {
97    pub fn new() -> Self {
98        Self::with_config(true)
99    }
100
101    pub fn with_config(auto_reconnect: bool) -> Self {
102        Self {
103            shared: Arc::new(RwLock::new(SharedState::new())),
104            client: Arc::new(RwLock::new(None)),
105            auto_reconnect,
106        }
107    }
108
109    async fn set_state(&self, new_state: WebSocketState) {
110        let mut shared = self.shared.write().await;
111        shared.ws_state = new_state;
112    }
113
114    async fn send_subscription(&self) -> Result<(), WebSocketError> {
115        let client_guard = self.client.read().await;
116        let client = client_guard
117            .as_ref()
118            .ok_or_else(|| WebSocketError::Connection("not connected".into()))?;
119
120        let shared = self.shared.read().await;
121        if shared.subscribed_slugs.is_empty() && shared.subscribed_addresses.is_empty() {
122            return Ok(());
123        }
124
125        let payload = SubscribePayload {
126            market_slugs: shared.subscribed_slugs.clone(),
127            market_addresses: shared.subscribed_addresses.clone(),
128        };
129
130        let json =
131            serde_json::to_value(&payload).map_err(|e| WebSocketError::Protocol(e.to_string()))?;
132
133        client
134            .emit("subscribe_market_prices", json)
135            .await
136            .map_err(|e| WebSocketError::Connection(e.to_string()))?;
137
138        Ok(())
139    }
140
141    fn parse_price_level(data: &PriceLevelData) -> Option<PriceLevel> {
142        let price = match &data.price {
143            serde_json::Value::Number(n) => n.as_f64()?,
144            serde_json::Value::String(s) => s.parse::<f64>().ok()?,
145            _ => return None,
146        };
147
148        let size = match &data.size {
149            serde_json::Value::Number(n) => n.as_f64()?,
150            serde_json::Value::String(s) => s.parse::<f64>().ok()?,
151            _ => return None,
152        };
153
154        if price > 0.0 && size > 0.0 {
155            Some(PriceLevel::new(price, size))
156        } else {
157            None
158        }
159    }
160
161    async fn handle_orderbook_update(shared: Arc<RwLock<SharedState>>, data: OrderbookUpdateData) {
162        let market_slug = match data.market_slug {
163            Some(s) => s,
164            None => return,
165        };
166
167        let (raw_bids, raw_asks) = if let Some(ob) = data.orderbook {
168            (ob.bids.unwrap_or_default(), ob.asks.unwrap_or_default())
169        } else {
170            (data.bids.unwrap_or_default(), data.asks.unwrap_or_default())
171        };
172
173        let mut bids: Vec<PriceLevel> = raw_bids
174            .iter()
175            .filter_map(Self::parse_price_level)
176            .collect();
177        let mut asks: Vec<PriceLevel> = raw_asks
178            .iter()
179            .filter_map(Self::parse_price_level)
180            .collect();
181
182        bids.sort_by(|a, b| {
183            b.price
184                .partial_cmp(&a.price)
185                .unwrap_or(std::cmp::Ordering::Equal)
186        });
187        asks.sort_by(|a, b| {
188            a.price
189                .partial_cmp(&b.price)
190                .unwrap_or(std::cmp::Ordering::Equal)
191        });
192
193        let orderbook = Orderbook {
194            market_id: market_slug.clone(),
195            asset_id: market_slug.clone(),
196            bids,
197            asks,
198            last_update_id: None,
199            timestamp: Some(chrono::Utc::now()),
200        };
201
202        let mut shared = shared.write().await;
203        shared
204            .orderbooks
205            .insert(market_slug.clone(), orderbook.clone());
206
207        if let Some(sender) = shared.orderbook_senders.get(&market_slug) {
208            let _ = sender.send(Ok(orderbook));
209        }
210    }
211
212    async fn handle_price_update(shared: Arc<RwLock<SharedState>>, data: PriceUpdateData) {
213        let market_address = match data.market_address {
214            Some(a) => a,
215            None => return,
216        };
217
218        let prices = match data.updated_prices {
219            Some(p) => p,
220            None => return,
221        };
222
223        let yes_price = prices.yes.unwrap_or(0.0);
224        let no_price = prices.no.unwrap_or(0.0);
225
226        if yes_price <= 0.0 && no_price <= 0.0 {
227            return;
228        }
229
230        let mut bids = Vec::new();
231        let mut asks = Vec::new();
232
233        if yes_price > 0.0 {
234            bids.push(PriceLevel::new(yes_price, 1.0));
235        }
236        if no_price > 0.0 {
237            asks.push(PriceLevel::new(1.0 - no_price, 1.0));
238        }
239
240        let orderbook = Orderbook {
241            market_id: market_address.clone(),
242            asset_id: market_address.clone(),
243            bids,
244            asks,
245            last_update_id: None,
246            timestamp: Some(chrono::Utc::now()),
247        };
248
249        let mut shared = shared.write().await;
250        shared
251            .orderbooks
252            .insert(market_address.clone(), orderbook.clone());
253
254        if let Some(sender) = shared.orderbook_senders.get(&market_address) {
255            let _ = sender.send(Ok(orderbook));
256        }
257    }
258}
259
260impl Default for LimitlessWebSocket {
261    fn default() -> Self {
262        Self::new()
263    }
264}
265
266#[async_trait]
267impl OrderBookWebSocket for LimitlessWebSocket {
268    async fn connect(&mut self) -> Result<(), WebSocketError> {
269        self.set_state(WebSocketState::Connecting).await;
270
271        let shared = self.shared.clone();
272        let shared_orderbook = self.shared.clone();
273        let shared_price = self.shared.clone();
274        let shared_connect = self.shared.clone();
275        let shared_disconnect = self.shared.clone();
276
277        let client = ClientBuilder::new(WS_URL)
278            .namespace(NAMESPACE)
279            .on("connect", move |_, _| {
280                let shared = shared_connect.clone();
281                async move {
282                    let mut s = shared.write().await;
283                    s.ws_state = WebSocketState::Connected;
284                    tracing::debug!("Connected to Limitless WebSocket");
285                }
286                .boxed()
287            })
288            .on("disconnect", move |_, _| {
289                let shared = shared_disconnect.clone();
290                async move {
291                    let mut s = shared.write().await;
292                    s.ws_state = WebSocketState::Disconnected;
293                    tracing::debug!("Disconnected from Limitless WebSocket");
294                }
295                .boxed()
296            })
297            .on("orderbookUpdate", move |payload, _| {
298                let shared = shared_orderbook.clone();
299                async move {
300                    if let Payload::Text(values) = payload {
301                        for value in values {
302                            if let Ok(data) = serde_json::from_value::<OrderbookUpdateData>(value) {
303                                Self::handle_orderbook_update(shared.clone(), data).await;
304                            }
305                        }
306                    }
307                }
308                .boxed()
309            })
310            .on("newPriceData", move |payload, _| {
311                let shared = shared_price.clone();
312                async move {
313                    if let Payload::Text(values) = payload {
314                        for value in values {
315                            if let Ok(data) = serde_json::from_value::<PriceUpdateData>(value) {
316                                Self::handle_price_update(shared.clone(), data).await;
317                            }
318                        }
319                    }
320                }
321                .boxed()
322            })
323            .on("exception", |payload, _| {
324                async move {
325                    tracing::warn!("Limitless WebSocket exception: {:?}", payload);
326                }
327                .boxed()
328            })
329            .connect()
330            .await
331            .map_err(|e| WebSocketError::Connection(e.to_string()))?;
332
333        {
334            let mut client_guard = self.client.write().await;
335            *client_guard = Some(client);
336        }
337
338        {
339            let mut s = shared.write().await;
340            s.ws_state = WebSocketState::Connected;
341        }
342
343        self.send_subscription().await?;
344
345        Ok(())
346    }
347
348    async fn disconnect(&mut self) -> Result<(), WebSocketError> {
349        self.set_state(WebSocketState::Closed).await;
350
351        let mut client_guard = self.client.write().await;
352        if let Some(client) = client_guard.take() {
353            client
354                .disconnect()
355                .await
356                .map_err(|e| WebSocketError::Connection(e.to_string()))?;
357        }
358
359        Ok(())
360    }
361
362    async fn subscribe(&mut self, market_id: &str) -> Result<(), WebSocketError> {
363        {
364            let mut shared = self.shared.write().await;
365            if !shared.subscribed_slugs.contains(&market_id.to_string()) {
366                shared.subscribed_slugs.push(market_id.to_string());
367            }
368            if !shared.orderbook_senders.contains_key(market_id) {
369                let (tx, _) = broadcast::channel(100);
370                shared.orderbook_senders.insert(market_id.to_string(), tx);
371            }
372        }
373
374        let state = {
375            let shared = self.shared.read().await;
376            shared.ws_state
377        };
378
379        if state == WebSocketState::Connected {
380            self.send_subscription().await?;
381        }
382
383        Ok(())
384    }
385
386    async fn unsubscribe(&mut self, market_id: &str) -> Result<(), WebSocketError> {
387        {
388            let mut shared = self.shared.write().await;
389            shared.subscribed_slugs.retain(|s| s != market_id);
390            shared.subscribed_addresses.retain(|s| s != market_id);
391            shared.orderbook_senders.remove(market_id);
392            shared.orderbooks.remove(market_id);
393        }
394
395        let state = {
396            let shared = self.shared.read().await;
397            shared.ws_state
398        };
399
400        if state == WebSocketState::Connected {
401            self.send_subscription().await?;
402        }
403
404        Ok(())
405    }
406
407    fn state(&self) -> WebSocketState {
408        futures::executor::block_on(async {
409            let shared = self.shared.read().await;
410            shared.ws_state
411        })
412    }
413
414    async fn orderbook_stream(
415        &mut self,
416        market_id: &str,
417    ) -> Result<OrderbookStream, WebSocketError> {
418        let shared = self.shared.read().await;
419        let sender = shared.orderbook_senders.get(market_id).ok_or_else(|| {
420            WebSocketError::Subscription(format!("not subscribed to {market_id}"))
421        })?;
422
423        let rx = sender.subscribe();
424
425        Ok(Box::pin(
426            tokio_stream::wrappers::BroadcastStream::new(rx)
427                .filter_map(|result| async move { result.ok() }),
428        ))
429    }
430}
431
432impl LimitlessWebSocket {
433    pub async fn subscribe_market_address(
434        &mut self,
435        market_address: &str,
436    ) -> Result<(), WebSocketError> {
437        {
438            let mut shared = self.shared.write().await;
439            if !shared
440                .subscribed_addresses
441                .contains(&market_address.to_string())
442            {
443                shared.subscribed_addresses.push(market_address.to_string());
444            }
445            if !shared.orderbook_senders.contains_key(market_address) {
446                let (tx, _) = broadcast::channel(100);
447                shared
448                    .orderbook_senders
449                    .insert(market_address.to_string(), tx);
450            }
451        }
452
453        let state = {
454            let shared = self.shared.read().await;
455            shared.ws_state
456        };
457
458        if state == WebSocketState::Connected {
459            self.send_subscription().await?;
460        }
461
462        Ok(())
463    }
464
465    pub async fn get_orderbook(&self, market_id: &str) -> Option<Orderbook> {
466        let shared = self.shared.read().await;
467        shared.orderbooks.get(market_id).cloned()
468    }
469}