1use async_trait::async_trait;
2use futures::{FutureExt, StreamExt};
3use rust_socketio::{
4 asynchronous::{Client, ClientBuilder},
5 Payload,
6};
7use serde::{Deserialize, Serialize};
8use std::collections::HashMap;
9use std::sync::Arc;
10use tokio::sync::{broadcast, RwLock};
11
12use drm_core::{
13 OrderBookWebSocket, Orderbook, OrderbookStream, PriceLevel, WebSocketError, WebSocketState,
14};
15
16const WS_URL: &str = "wss://ws.limitless.exchange";
17const NAMESPACE: &str = "/markets";
18
19#[derive(Debug, Clone, Serialize)]
20struct SubscribePayload {
21 #[serde(rename = "marketSlugs", skip_serializing_if = "Vec::is_empty")]
22 market_slugs: Vec<String>,
23 #[serde(rename = "marketAddresses", skip_serializing_if = "Vec::is_empty")]
24 market_addresses: Vec<String>,
25}
26
27#[derive(Debug, Clone, Deserialize)]
28struct OrderbookUpdateData {
29 #[serde(rename = "marketSlug", alias = "slug")]
30 market_slug: Option<String>,
31 orderbook: Option<OrderbookData>,
32 bids: Option<Vec<PriceLevelData>>,
33 asks: Option<Vec<PriceLevelData>>,
34 #[allow(dead_code)]
35 timestamp: Option<serde_json::Value>,
36}
37
38#[derive(Debug, Clone, Deserialize)]
39struct OrderbookData {
40 bids: Option<Vec<PriceLevelData>>,
41 asks: Option<Vec<PriceLevelData>>,
42}
43
44#[derive(Debug, Clone, Deserialize)]
45struct PriceLevelData {
46 price: serde_json::Value,
47 size: serde_json::Value,
48}
49
50#[derive(Debug, Clone, Deserialize)]
51struct PriceUpdateData {
52 #[serde(rename = "marketAddress")]
53 market_address: Option<String>,
54 #[serde(rename = "updatedPrices")]
55 updated_prices: Option<PriceData>,
56 #[serde(rename = "blockNumber")]
57 #[allow(dead_code)]
58 block_number: Option<i64>,
59}
60
61#[derive(Debug, Clone, Deserialize)]
62struct PriceData {
63 yes: Option<f64>,
64 no: Option<f64>,
65}
66
67type OrderbookSender = broadcast::Sender<Result<Orderbook, WebSocketError>>;
68
69struct SharedState {
70 ws_state: WebSocketState,
71 subscribed_slugs: Vec<String>,
72 subscribed_addresses: Vec<String>,
73 orderbook_senders: HashMap<String, OrderbookSender>,
74 orderbooks: HashMap<String, Orderbook>,
75}
76
77impl SharedState {
78 fn new() -> Self {
79 Self {
80 ws_state: WebSocketState::Disconnected,
81 subscribed_slugs: Vec::new(),
82 subscribed_addresses: Vec::new(),
83 orderbook_senders: HashMap::new(),
84 orderbooks: HashMap::new(),
85 }
86 }
87}
88
89pub struct LimitlessWebSocket {
90 shared: Arc<RwLock<SharedState>>,
91 client: Arc<RwLock<Option<Client>>>,
92 #[allow(dead_code)]
93 auto_reconnect: bool,
94}
95
96impl LimitlessWebSocket {
97 pub fn new() -> Self {
98 Self::with_config(true)
99 }
100
101 pub fn with_config(auto_reconnect: bool) -> Self {
102 Self {
103 shared: Arc::new(RwLock::new(SharedState::new())),
104 client: Arc::new(RwLock::new(None)),
105 auto_reconnect,
106 }
107 }
108
109 async fn set_state(&self, new_state: WebSocketState) {
110 let mut shared = self.shared.write().await;
111 shared.ws_state = new_state;
112 }
113
114 async fn send_subscription(&self) -> Result<(), WebSocketError> {
115 let client_guard = self.client.read().await;
116 let client = client_guard
117 .as_ref()
118 .ok_or_else(|| WebSocketError::Connection("not connected".into()))?;
119
120 let shared = self.shared.read().await;
121 if shared.subscribed_slugs.is_empty() && shared.subscribed_addresses.is_empty() {
122 return Ok(());
123 }
124
125 let payload = SubscribePayload {
126 market_slugs: shared.subscribed_slugs.clone(),
127 market_addresses: shared.subscribed_addresses.clone(),
128 };
129
130 let json =
131 serde_json::to_value(&payload).map_err(|e| WebSocketError::Protocol(e.to_string()))?;
132
133 client
134 .emit("subscribe_market_prices", json)
135 .await
136 .map_err(|e| WebSocketError::Connection(e.to_string()))?;
137
138 Ok(())
139 }
140
141 fn parse_price_level(data: &PriceLevelData) -> Option<PriceLevel> {
142 let price = match &data.price {
143 serde_json::Value::Number(n) => n.as_f64()?,
144 serde_json::Value::String(s) => s.parse::<f64>().ok()?,
145 _ => return None,
146 };
147
148 let size = match &data.size {
149 serde_json::Value::Number(n) => n.as_f64()?,
150 serde_json::Value::String(s) => s.parse::<f64>().ok()?,
151 _ => return None,
152 };
153
154 if price > 0.0 && size > 0.0 {
155 Some(PriceLevel::new(price, size))
156 } else {
157 None
158 }
159 }
160
161 async fn handle_orderbook_update(shared: Arc<RwLock<SharedState>>, data: OrderbookUpdateData) {
162 let market_slug = match data.market_slug {
163 Some(s) => s,
164 None => return,
165 };
166
167 let (raw_bids, raw_asks) = if let Some(ob) = data.orderbook {
168 (ob.bids.unwrap_or_default(), ob.asks.unwrap_or_default())
169 } else {
170 (data.bids.unwrap_or_default(), data.asks.unwrap_or_default())
171 };
172
173 let mut bids: Vec<PriceLevel> = raw_bids
174 .iter()
175 .filter_map(Self::parse_price_level)
176 .collect();
177 let mut asks: Vec<PriceLevel> = raw_asks
178 .iter()
179 .filter_map(Self::parse_price_level)
180 .collect();
181
182 bids.sort_by(|a, b| {
183 b.price
184 .partial_cmp(&a.price)
185 .unwrap_or(std::cmp::Ordering::Equal)
186 });
187 asks.sort_by(|a, b| {
188 a.price
189 .partial_cmp(&b.price)
190 .unwrap_or(std::cmp::Ordering::Equal)
191 });
192
193 let orderbook = Orderbook {
194 market_id: market_slug.clone(),
195 asset_id: market_slug.clone(),
196 bids,
197 asks,
198 last_update_id: None,
199 timestamp: Some(chrono::Utc::now()),
200 };
201
202 let mut shared = shared.write().await;
203 shared
204 .orderbooks
205 .insert(market_slug.clone(), orderbook.clone());
206
207 if let Some(sender) = shared.orderbook_senders.get(&market_slug) {
208 let _ = sender.send(Ok(orderbook));
209 }
210 }
211
212 async fn handle_price_update(shared: Arc<RwLock<SharedState>>, data: PriceUpdateData) {
213 let market_address = match data.market_address {
214 Some(a) => a,
215 None => return,
216 };
217
218 let prices = match data.updated_prices {
219 Some(p) => p,
220 None => return,
221 };
222
223 let yes_price = prices.yes.unwrap_or(0.0);
224 let no_price = prices.no.unwrap_or(0.0);
225
226 if yes_price <= 0.0 && no_price <= 0.0 {
227 return;
228 }
229
230 let mut bids = Vec::new();
231 let mut asks = Vec::new();
232
233 if yes_price > 0.0 {
234 bids.push(PriceLevel::new(yes_price, 1.0));
235 }
236 if no_price > 0.0 {
237 asks.push(PriceLevel::new(1.0 - no_price, 1.0));
238 }
239
240 let orderbook = Orderbook {
241 market_id: market_address.clone(),
242 asset_id: market_address.clone(),
243 bids,
244 asks,
245 last_update_id: None,
246 timestamp: Some(chrono::Utc::now()),
247 };
248
249 let mut shared = shared.write().await;
250 shared
251 .orderbooks
252 .insert(market_address.clone(), orderbook.clone());
253
254 if let Some(sender) = shared.orderbook_senders.get(&market_address) {
255 let _ = sender.send(Ok(orderbook));
256 }
257 }
258}
259
260impl Default for LimitlessWebSocket {
261 fn default() -> Self {
262 Self::new()
263 }
264}
265
266#[async_trait]
267impl OrderBookWebSocket for LimitlessWebSocket {
268 async fn connect(&mut self) -> Result<(), WebSocketError> {
269 self.set_state(WebSocketState::Connecting).await;
270
271 let shared = self.shared.clone();
272 let shared_orderbook = self.shared.clone();
273 let shared_price = self.shared.clone();
274 let shared_connect = self.shared.clone();
275 let shared_disconnect = self.shared.clone();
276
277 let client = ClientBuilder::new(WS_URL)
278 .namespace(NAMESPACE)
279 .on("connect", move |_, _| {
280 let shared = shared_connect.clone();
281 async move {
282 let mut s = shared.write().await;
283 s.ws_state = WebSocketState::Connected;
284 tracing::debug!("Connected to Limitless WebSocket");
285 }
286 .boxed()
287 })
288 .on("disconnect", move |_, _| {
289 let shared = shared_disconnect.clone();
290 async move {
291 let mut s = shared.write().await;
292 s.ws_state = WebSocketState::Disconnected;
293 tracing::debug!("Disconnected from Limitless WebSocket");
294 }
295 .boxed()
296 })
297 .on("orderbookUpdate", move |payload, _| {
298 let shared = shared_orderbook.clone();
299 async move {
300 if let Payload::Text(values) = payload {
301 for value in values {
302 if let Ok(data) = serde_json::from_value::<OrderbookUpdateData>(value) {
303 Self::handle_orderbook_update(shared.clone(), data).await;
304 }
305 }
306 }
307 }
308 .boxed()
309 })
310 .on("newPriceData", move |payload, _| {
311 let shared = shared_price.clone();
312 async move {
313 if let Payload::Text(values) = payload {
314 for value in values {
315 if let Ok(data) = serde_json::from_value::<PriceUpdateData>(value) {
316 Self::handle_price_update(shared.clone(), data).await;
317 }
318 }
319 }
320 }
321 .boxed()
322 })
323 .on("exception", |payload, _| {
324 async move {
325 tracing::warn!("Limitless WebSocket exception: {:?}", payload);
326 }
327 .boxed()
328 })
329 .connect()
330 .await
331 .map_err(|e| WebSocketError::Connection(e.to_string()))?;
332
333 {
334 let mut client_guard = self.client.write().await;
335 *client_guard = Some(client);
336 }
337
338 {
339 let mut s = shared.write().await;
340 s.ws_state = WebSocketState::Connected;
341 }
342
343 self.send_subscription().await?;
344
345 Ok(())
346 }
347
348 async fn disconnect(&mut self) -> Result<(), WebSocketError> {
349 self.set_state(WebSocketState::Closed).await;
350
351 let mut client_guard = self.client.write().await;
352 if let Some(client) = client_guard.take() {
353 client
354 .disconnect()
355 .await
356 .map_err(|e| WebSocketError::Connection(e.to_string()))?;
357 }
358
359 Ok(())
360 }
361
362 async fn subscribe(&mut self, market_id: &str) -> Result<(), WebSocketError> {
363 {
364 let mut shared = self.shared.write().await;
365 if !shared.subscribed_slugs.contains(&market_id.to_string()) {
366 shared.subscribed_slugs.push(market_id.to_string());
367 }
368 if !shared.orderbook_senders.contains_key(market_id) {
369 let (tx, _) = broadcast::channel(100);
370 shared.orderbook_senders.insert(market_id.to_string(), tx);
371 }
372 }
373
374 let state = {
375 let shared = self.shared.read().await;
376 shared.ws_state
377 };
378
379 if state == WebSocketState::Connected {
380 self.send_subscription().await?;
381 }
382
383 Ok(())
384 }
385
386 async fn unsubscribe(&mut self, market_id: &str) -> Result<(), WebSocketError> {
387 {
388 let mut shared = self.shared.write().await;
389 shared.subscribed_slugs.retain(|s| s != market_id);
390 shared.subscribed_addresses.retain(|s| s != market_id);
391 shared.orderbook_senders.remove(market_id);
392 shared.orderbooks.remove(market_id);
393 }
394
395 let state = {
396 let shared = self.shared.read().await;
397 shared.ws_state
398 };
399
400 if state == WebSocketState::Connected {
401 self.send_subscription().await?;
402 }
403
404 Ok(())
405 }
406
407 fn state(&self) -> WebSocketState {
408 futures::executor::block_on(async {
409 let shared = self.shared.read().await;
410 shared.ws_state
411 })
412 }
413
414 async fn orderbook_stream(
415 &mut self,
416 market_id: &str,
417 ) -> Result<OrderbookStream, WebSocketError> {
418 let shared = self.shared.read().await;
419 let sender = shared.orderbook_senders.get(market_id).ok_or_else(|| {
420 WebSocketError::Subscription(format!("not subscribed to {market_id}"))
421 })?;
422
423 let rx = sender.subscribe();
424
425 Ok(Box::pin(
426 tokio_stream::wrappers::BroadcastStream::new(rx)
427 .filter_map(|result| async move { result.ok() }),
428 ))
429 }
430}
431
432impl LimitlessWebSocket {
433 pub async fn subscribe_market_address(
434 &mut self,
435 market_address: &str,
436 ) -> Result<(), WebSocketError> {
437 {
438 let mut shared = self.shared.write().await;
439 if !shared
440 .subscribed_addresses
441 .contains(&market_address.to_string())
442 {
443 shared.subscribed_addresses.push(market_address.to_string());
444 }
445 if !shared.orderbook_senders.contains_key(market_address) {
446 let (tx, _) = broadcast::channel(100);
447 shared
448 .orderbook_senders
449 .insert(market_address.to_string(), tx);
450 }
451 }
452
453 let state = {
454 let shared = self.shared.read().await;
455 shared.ws_state
456 };
457
458 if state == WebSocketState::Connected {
459 self.send_subscription().await?;
460 }
461
462 Ok(())
463 }
464
465 pub async fn get_orderbook(&self, market_id: &str) -> Option<Orderbook> {
466 let shared = self.shared.read().await;
467 shared.orderbooks.get(market_id).cloned()
468 }
469}