Skip to main content

limitless/ws/
channel.rs

1//! WebSocket types — channels, config, state, and subscription options.
2//!
3//! These model the Limitless Exchange WebSocket API's subscription channels
4//! and payloads without depending on any Socket.IO protocol.
5
6use crate::prelude::*;
7use serde::{Deserialize, Serialize};
8use std::collections::BTreeMap;
9
10// ═══════════════════════════════════════════════════════════════════════════
11//  Connection state
12// ═══════════════════════════════════════════════════════════════════════════
13
14/// Tracks the lifecycle of a WebSocket connection.
15#[derive(Clone, Copy, Debug, PartialEq, Eq)]
16pub enum WebSocketState {
17    /// Not connected and not trying.
18    Disconnected,
19    /// Currently performing the initial handshake.
20    Connecting,
21    /// Connected and receiving events.
22    Connected,
23    /// Temporarily disconnected; attempting to re-establish.
24    Reconnecting,
25    /// Connection failed and will not be retried.
26    Error,
27}
28
29// ═══════════════════════════════════════════════════════════════════════════
30//  Subscription channels
31// ═══════════════════════════════════════════════════════════════════════════
32
33/// Identifies a WebSocket subscription target on the Limitless Exchange.
34///
35/// Variants prefixed with `Subscribe` / `Unsubscribe` represent client →
36/// server subscription requests. The non-prefixed variants are server-emitted
37/// event names used for dispatching incoming messages.
38#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
39pub enum SubscriptionChannel {
40    // ── Server → Client event names ──
41    /// CLOB orderbook snapshots (`orderbook`).
42    Orderbook,
43    /// Public trade feed (`trades`).
44    Trades,
45    /// Order status updates (`orders`).
46    Orders,
47    /// Fill notifications (`fills`).
48    Fills,
49    /// Market statistics (`markets`).
50    Markets,
51    /// Aggregated price feed (`prices`).
52    Prices,
53    /// Portfolio position updates (`positions`).
54    Positions,
55    /// Blockchain transaction events (`transactions`).
56    Transactions,
57    /// OME + settlement lifecycle events (`orderEvent`).
58    OrderEvents,
59    /// Live sports data (`liveSports`).
60    LiveSports,
61    /// Live esports data (`liveEsports`).
62    LiveEsports,
63    /// Market creation / resolution events (`marketLifecycle`).
64    MarketLifecycle,
65
66    // ── Client → Server subscription requests ──
67    /// Subscribe to AMM prices + CLOB orderbook.
68    SubscribeMarketPrices,
69    /// Subscribe to portfolio position updates (requires auth).
70    SubscribePositions,
71    /// Subscribe to blockchain transaction events (requires auth).
72    SubscribeTransactions,
73    /// Subscribe to OME + settlement lifecycle events (requires auth).
74    SubscribeOrderEvents,
75    /// Subscribe to live sports data.
76    SubscribeLiveSports,
77    /// Subscribe to live esports data.
78    SubscribeLiveEsports,
79    /// Subscribe to market creation / resolution events.
80    SubscribeMarketLifecycle,
81    /// Unsubscribe from market lifecycle events.
82    UnsubscribeMarketLifecycle,
83}
84
85impl SubscriptionChannel {
86    /// Returns the wire-protocol string for this channel.
87    pub fn as_str(&self) -> &'static str {
88        match self {
89            Self::Orderbook => "orderbook",
90            Self::Trades => "trades",
91            Self::Orders => "orders",
92            Self::Fills => "fills",
93            Self::Markets => "markets",
94            Self::Prices => "prices",
95            Self::Positions => "positions",
96            Self::Transactions => "transactions",
97            Self::OrderEvents => "orderEvent",
98            Self::LiveSports => "liveSports",
99            Self::LiveEsports => "liveEsports",
100            Self::MarketLifecycle => "marketLifecycle",
101            Self::SubscribeMarketPrices => "subscribe_market_prices",
102            Self::SubscribePositions => "subscribe_positions",
103            Self::SubscribeTransactions => "subscribe_transactions",
104            Self::SubscribeOrderEvents => "subscribe_order_events",
105            Self::SubscribeLiveSports => "subscribe_live_sports",
106            Self::SubscribeLiveEsports => "subscribe_live_esports",
107            Self::SubscribeMarketLifecycle => "subscribe_market_lifecycle",
108            Self::UnsubscribeMarketLifecycle => "unsubscribe_market_lifecycle",
109        }
110    }
111}
112
113// ═══════════════════════════════════════════════════════════════════════════
114//  Subscription options
115// ═══════════════════════════════════════════════════════════════════════════
116
117/// Parameters supplied when subscribing to a channel.
118#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
119pub struct SubscriptionOptions {
120    /// A single market slug (for channels that accept one).
121    #[serde(
122        rename = "marketSlug",
123        skip_serializing_if = "Option::is_none",
124        default
125    )]
126    pub market_slug: Option<String>,
127
128    /// One or more market slugs (for multi-market subscriptions).
129    #[serde(rename = "marketSlugs", skip_serializing_if = "Vec::is_empty", default)]
130    pub market_slugs: Vec<String>,
131
132    /// A single on-chain market address.
133    #[serde(
134        rename = "marketAddress",
135        skip_serializing_if = "Option::is_none",
136        default
137    )]
138    pub market_address: Option<String>,
139
140    /// One or more on-chain market addresses.
141    #[serde(
142        rename = "marketAddresses",
143        skip_serializing_if = "Vec::is_empty",
144        default
145    )]
146    pub market_addresses: Vec<String>,
147
148    /// Arbitrary server-side filters (channel-dependent).
149    #[serde(skip_serializing_if = "BTreeMap::is_empty", default)]
150    pub filters: BTreeMap<String, Value>,
151}
152
153// ═══════════════════════════════════════════════════════════════════════════
154//  WebSocket config
155// ═══════════════════════════════════════════════════════════════════════════
156
157/// Configuration for the Limitless WebSocket connection.
158#[derive(Clone, Debug)]
159pub struct WebSocketConfig {
160    /// The WebSocket endpoint URL.
161    pub url: String,
162    /// Optional API key / token ID for authenticated streams.
163    pub api_key: Option<String>,
164    /// Whether to automatically reconnect on disconnection.
165    pub auto_reconnect: bool,
166    /// Delay (in milliseconds) before each reconnection attempt.
167    pub reconnect_delay_ms: u64,
168    /// Maximum number of reconnection attempts (0 = unlimited).
169    pub max_reconnect_attempts: u32,
170    /// Connection and read timeout (in milliseconds).
171    pub timeout_ms: u64,
172}
173
174impl Default for WebSocketConfig {
175    fn default() -> Self {
176        Self {
177            url: "wss://ws.limitless.exchange/markets".to_string(),
178            api_key: std::env::var("LIMITLESS_API_KEY").ok(),
179            auto_reconnect: true,
180            reconnect_delay_ms: 1_000,
181            max_reconnect_attempts: 0,
182            timeout_ms: 10_000,
183        }
184    }
185}
186
187// ═══════════════════════════════════════════════════════════════════════════
188//  FlexFloat — handles string-encoded floats in WS payloads
189// ═══════════════════════════════════════════════════════════════════════════
190
191/// A flexible `f64` that deserializes from both JSON numbers and strings.
192///
193/// The Limitless WebSocket occasionally encodes numeric fields as strings
194/// (e.g., `"0.55"` instead of `0.55`). This wrapper handles both formats
195/// transparently.
196#[derive(Clone, Copy, Debug, PartialEq)]
197pub struct FlexFloat(pub f64);
198
199impl FlexFloat {
200    /// Extract the inner `f64`.
201    pub fn float64(self) -> f64 {
202        self.0
203    }
204}
205
206impl<'de> Deserialize<'de> for FlexFloat {
207    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
208    where
209        D: serde::Deserializer<'de>,
210    {
211        match Value::deserialize(deserializer)? {
212            Value::Number(n) => n
213                .as_f64()
214                .map(Self)
215                .ok_or_else(|| serde::de::Error::custom("expected f64-compatible number")),
216            Value::String(s) => s.parse::<f64>().map(Self).map_err(|err| {
217                serde::de::Error::custom(format!("cannot parse float '{s}': {err}"))
218            }),
219            other => Err(serde::de::Error::custom(format!(
220                "cannot deserialize FlexFloat from {other}"
221            ))),
222        }
223    }
224}
225
226impl Serialize for FlexFloat {
227    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
228    where
229        S: serde::Serializer,
230    {
231        serializer.serialize_f64(self.0)
232    }
233}
234
235// ═══════════════════════════════════════════════════════════════════════════
236//  WebSocket event payloads
237// ═══════════════════════════════════════════════════════════════════════════
238
239/// Generic WebSocket event — used as a fallback when the event type is
240/// not recognized by the typed dispatch.
241pub type WsEvent = Value;
242
243// ── Orderbook ────────────────────────────────────────────────────────────
244
245/// A single level in the orderbook (bid or ask).
246#[derive(Clone, Debug, Serialize, Deserialize)]
247pub struct OrderbookLevel {
248    pub price: f64,
249    pub size: f64,
250}
251
252/// Full CLOB orderbook snapshot for a market.
253#[derive(Clone, Debug, Serialize, Deserialize)]
254pub struct OrderbookData {
255    pub bids: Vec<OrderbookLevel>,
256    pub asks: Vec<OrderbookLevel>,
257    #[serde(rename = "tokenId")]
258    pub token_id: String,
259    #[serde(rename = "adjustedMidpoint")]
260    pub adjusted_midpoint: f64,
261    #[serde(rename = "maxSpread")]
262    pub max_spread: FlexFloat,
263    #[serde(rename = "minSize")]
264    pub min_size: FlexFloat,
265}
266
267/// Server-emitted orderbook update event.
268#[derive(Clone, Debug, Serialize, Deserialize)]
269pub struct OrderbookUpdate {
270    #[serde(rename = "marketSlug")]
271    pub market_slug: String,
272    pub orderbook: OrderbookData,
273    pub timestamp: Value,
274}
275
276// ── Trades ───────────────────────────────────────────────────────────────
277
278/// A single public trade.
279#[derive(Clone, Debug, Serialize, Deserialize)]
280pub struct TradeEvent {
281    #[serde(rename = "marketSlug")]
282    pub market_slug: String,
283    pub side: String,
284    pub price: f64,
285    pub size: f64,
286    pub timestamp: f64,
287    #[serde(rename = "tradeId")]
288    pub trade_id: String,
289}
290
291// ── Orders ───────────────────────────────────────────────────────────────
292
293/// An order status update emitted by the server.
294#[derive(Clone, Debug, Serialize, Deserialize)]
295pub struct OrderUpdate {
296    #[serde(rename = "orderId")]
297    pub order_id: String,
298    #[serde(rename = "marketSlug")]
299    pub market_slug: String,
300    pub side: String,
301    #[serde(default)]
302    pub price: Option<f64>,
303    pub size: f64,
304    pub filled: f64,
305    pub status: String,
306    pub timestamp: f64,
307}
308
309// ── Fills ────────────────────────────────────────────────────────────────
310
311/// A fill (matched trade) notification.
312#[derive(Clone, Debug, Serialize, Deserialize)]
313pub struct FillEvent {
314    #[serde(rename = "orderId")]
315    pub order_id: String,
316    #[serde(rename = "marketSlug")]
317    pub market_slug: String,
318    pub side: String,
319    pub price: f64,
320    pub size: f64,
321    pub timestamp: f64,
322    #[serde(rename = "fillId")]
323    pub fill_id: String,
324}
325
326// ── Market stats ─────────────────────────────────────────────────────────
327
328/// Periodic market-level statistics update.
329#[derive(Clone, Debug, Serialize, Deserialize)]
330pub struct MarketUpdateEvent {
331    #[serde(rename = "marketSlug")]
332    pub market_slug: String,
333    #[serde(rename = "lastPrice", default)]
334    pub last_price: Option<f64>,
335    #[serde(rename = "volume24h", default)]
336    pub volume_24h: Option<f64>,
337    #[serde(rename = "priceChange24h", default)]
338    pub price_change_24h: Option<f64>,
339    pub timestamp: f64,
340}
341
342// ── AMM prices ───────────────────────────────────────────────────────────
343
344/// A per-market AMM price entry within a `NewPriceData` payload.
345#[derive(Clone, Debug, Serialize, Deserialize)]
346pub struct AmmPriceEntry {
347    #[serde(rename = "marketId")]
348    pub market_id: i32,
349    #[serde(rename = "marketAddress")]
350    pub market_address: String,
351    #[serde(rename = "yesPrice")]
352    pub yes_price: f64,
353    #[serde(rename = "noPrice")]
354    pub no_price: f64,
355}
356
357/// Server-emitted AMM price update (the `newPriceData` event).
358#[derive(Clone, Debug, Serialize, Deserialize)]
359pub struct NewPriceData {
360    #[serde(rename = "marketAddress")]
361    pub market_address: String,
362    #[serde(rename = "updatedPrices")]
363    pub updated_prices: Vec<AmmPriceEntry>,
364    #[serde(rename = "blockNumber")]
365    pub block_number: i64,
366    pub timestamp: Value,
367}
368
369// ── Oracle prices ────────────────────────────────────────────────────────
370
371/// Oracle price data for a market.
372#[derive(Clone, Debug, Serialize, Deserialize)]
373pub struct OraclePriceData {
374    #[serde(rename = "marketAddress", default)]
375    pub market_address: Option<String>,
376    #[serde(rename = "marketSlug")]
377    pub market_slug: String,
378    pub timestamp: i64,
379    pub value: f64,
380}
381
382// ── Transactions ─────────────────────────────────────────────────────────
383
384/// On-chain transaction event (deposit, withdrawal, trade settlement).
385#[derive(Clone, Debug, Serialize, Deserialize)]
386pub struct TransactionEvent {
387    #[serde(rename = "userId", default)]
388    pub user_id: Option<i32>,
389    #[serde(rename = "txHash", default)]
390    pub tx_hash: Option<String>,
391    pub status: String,
392    pub source: String,
393    pub timestamp: String,
394    #[serde(rename = "marketAddress", default)]
395    pub market_address: Option<String>,
396    #[serde(rename = "marketSlug", default)]
397    pub market_slug: Option<String>,
398    #[serde(rename = "tokenId", default)]
399    pub token_id: Option<String>,
400    #[serde(rename = "conditionId", default)]
401    pub condition_id: Option<String>,
402    #[serde(rename = "amountContracts", default)]
403    pub amount_contracts: Option<String>,
404    #[serde(rename = "amountCollateral", default)]
405    pub amount_collateral: Option<String>,
406    #[serde(default)]
407    pub price: Option<String>,
408    #[serde(default)]
409    pub side: Option<String>,
410}
411
412// ── Market lifecycle ─────────────────────────────────────────────────────
413
414/// Emitted when a new market is created and funded.
415#[derive(Clone, Debug, Serialize, Deserialize)]
416pub struct MarketCreatedEvent {
417    pub slug: String,
418    pub title: String,
419    #[serde(rename = "type")]
420    pub market_type: String,
421    #[serde(rename = "groupSlug", default)]
422    pub group_slug: Option<String>,
423    #[serde(rename = "categoryIds", default)]
424    pub category_ids: Vec<i32>,
425    #[serde(rename = "createdAt")]
426    pub created_at: String,
427}
428
429/// Emitted when a market is resolved with a winning outcome.
430#[derive(Clone, Debug, Serialize, Deserialize)]
431pub struct MarketResolvedEvent {
432    pub slug: String,
433    #[serde(rename = "type")]
434    pub market_type: String,
435    #[serde(rename = "winningOutcome")]
436    pub winning_outcome: String,
437    #[serde(rename = "winningIndex")]
438    pub winning_index: i32,
439    #[serde(rename = "resolutionDate")]
440    pub resolution_date: String,
441}
442
443// ═══════════════════════════════════════════════════════════════════════════
444//  Helpers
445// ═══════════════════════════════════════════════════════════════════════════
446
447/// Normalize `SubscriptionOptions` by copying the singular `market_slug` /
448/// `market_address` into the plural vecs when the plural vecs are empty.
449///
450/// This mirrors the reference SDK behaviour so that callers can supply
451/// either form.
452pub fn normalize_subscription_options(opts: SubscriptionOptions) -> SubscriptionOptions {
453    let mut opts = opts;
454    if opts.market_slugs.is_empty() {
455        if let Some(ref slug) = opts.market_slug {
456            opts.market_slugs = vec![slug.clone()];
457        }
458    }
459    if opts.market_addresses.is_empty() {
460        if let Some(ref addr) = opts.market_address {
461            opts.market_addresses = vec![addr.clone()];
462        }
463    }
464    opts
465}
466
467/// Build a deterministic key for a `(channel, options)` pair, suitable for
468/// tracking active subscriptions.
469pub fn subscription_key(channel: SubscriptionChannel, opts: &SubscriptionOptions) -> String {
470    let slugs = if opts.market_slugs.is_empty() {
471        String::new()
472    } else {
473        let mut sorted: Vec<&str> = opts.market_slugs.iter().map(String::as_str).collect();
474        sorted.sort_unstable();
475        sorted.join(",")
476    };
477
478    let addresses = if opts.market_addresses.is_empty() {
479        String::new()
480    } else {
481        let mut sorted: Vec<&str> = opts.market_addresses.iter().map(String::as_str).collect();
482        sorted.sort_unstable();
483        sorted.join(",")
484    };
485
486    format!("{}|{}|{}", channel.as_str(), slugs, addresses)
487}
488
489/// Attempt to recover a `SubscriptionChannel` from its wire-protocol string.
490pub fn channel_from_key(key: &str) -> Option<SubscriptionChannel> {
491    // The key format is "channel|slugs|addresses" — extract the channel part.
492    let channel_str = key.split('|').next().unwrap_or(key);
493    match channel_str {
494        "orderbook" => Some(SubscriptionChannel::Orderbook),
495        "trades" => Some(SubscriptionChannel::Trades),
496        "orders" => Some(SubscriptionChannel::Orders),
497        "fills" => Some(SubscriptionChannel::Fills),
498        "markets" => Some(SubscriptionChannel::Markets),
499        "prices" => Some(SubscriptionChannel::Prices),
500        "positions" => Some(SubscriptionChannel::Positions),
501        "transactions" => Some(SubscriptionChannel::Transactions),
502        "orderEvent" => Some(SubscriptionChannel::OrderEvents),
503        "liveSports" => Some(SubscriptionChannel::LiveSports),
504        "liveEsports" => Some(SubscriptionChannel::LiveEsports),
505        "marketLifecycle" => Some(SubscriptionChannel::MarketLifecycle),
506        "subscribe_market_prices" => Some(SubscriptionChannel::SubscribeMarketPrices),
507        "subscribe_positions" => Some(SubscriptionChannel::SubscribePositions),
508        "subscribe_transactions" => Some(SubscriptionChannel::SubscribeTransactions),
509        "subscribe_order_events" => Some(SubscriptionChannel::SubscribeOrderEvents),
510        "subscribe_live_sports" => Some(SubscriptionChannel::SubscribeLiveSports),
511        "subscribe_live_esports" => Some(SubscriptionChannel::SubscribeLiveEsports),
512        "subscribe_market_lifecycle" => Some(SubscriptionChannel::SubscribeMarketLifecycle),
513        "unsubscribe_market_lifecycle" => Some(SubscriptionChannel::UnsubscribeMarketLifecycle),
514        _ => None,
515    }
516}
517
518/// Returns `true` when the given channel requires API-key authentication.
519pub fn requires_websocket_auth(channel: SubscriptionChannel) -> bool {
520    matches!(
521        channel,
522        SubscriptionChannel::SubscribePositions
523            | SubscriptionChannel::SubscribeTransactions
524            | SubscriptionChannel::SubscribeOrderEvents
525    )
526}
527
528// ═══════════════════════════════════════════════════════════════════════════
529//  Tests
530// ═══════════════════════════════════════════════════════════════════════════
531
532#[cfg(test)]
533mod tests {
534    use super::*;
535
536    #[test]
537    fn subscription_key_is_order_independent() {
538        let opts_a = SubscriptionOptions {
539            market_slugs: vec!["btc-above-100k".into(), "eth-merge".into()],
540            ..Default::default()
541        };
542        let opts_b = SubscriptionOptions {
543            market_slugs: vec!["eth-merge".into(), "btc-above-100k".into()],
544            ..Default::default()
545        };
546        assert_eq!(
547            subscription_key(SubscriptionChannel::SubscribeMarketPrices, &opts_a),
548            subscription_key(SubscriptionChannel::SubscribeMarketPrices, &opts_b),
549        );
550    }
551
552    #[test]
553    fn normalize_copies_singular_into_plural() {
554        let opts = SubscriptionOptions {
555            market_slug: Some("test-slug".into()),
556            market_address: Some("0xdead".into()),
557            ..Default::default()
558        };
559        let normalized = normalize_subscription_options(opts);
560        assert_eq!(normalized.market_slugs, vec!["test-slug"]);
561        assert_eq!(normalized.market_addresses, vec!["0xdead"]);
562    }
563
564    #[test]
565    fn normalize_preserves_existing_plurals() {
566        let opts = SubscriptionOptions {
567            market_slugs: vec!["existing".into()],
568            ..Default::default()
569        };
570        let normalized = normalize_subscription_options(opts);
571        assert_eq!(normalized.market_slugs, vec!["existing"]);
572    }
573
574    #[test]
575    fn channel_from_key_roundtrips() {
576        for channel in &[
577            SubscriptionChannel::Orderbook,
578            SubscriptionChannel::Trades,
579            SubscriptionChannel::SubscribeMarketPrices,
580            SubscriptionChannel::SubscribePositions,
581            SubscriptionChannel::OrderEvents,
582            SubscriptionChannel::MarketLifecycle,
583        ] {
584            let key = subscription_key(*channel, &SubscriptionOptions::default());
585            let recovered = channel_from_key(&key);
586            assert_eq!(
587                recovered,
588                Some(*channel),
589                "round-trip failed for {channel:?}"
590            );
591        }
592    }
593
594    #[test]
595    fn requires_auth_returns_true_for_private_channels() {
596        assert!(requires_websocket_auth(
597            SubscriptionChannel::SubscribePositions
598        ));
599        assert!(requires_websocket_auth(
600            SubscriptionChannel::SubscribeTransactions
601        ));
602        assert!(requires_websocket_auth(
603            SubscriptionChannel::SubscribeOrderEvents
604        ));
605    }
606
607    #[test]
608    fn requires_auth_returns_false_for_public_channels() {
609        assert!(!requires_websocket_auth(
610            SubscriptionChannel::SubscribeMarketPrices
611        ));
612        assert!(!requires_websocket_auth(
613            SubscriptionChannel::SubscribeMarketLifecycle
614        ));
615    }
616
617    #[test]
618    fn flexfloat_parses_number_and_string() {
619        let from_number: FlexFloat = serde_json::from_str("0.55").unwrap();
620        assert!((from_number.float64() - 0.55).abs() < f64::EPSILON);
621
622        let from_string: FlexFloat = serde_json::from_str(r#""0.55""#).unwrap();
623        assert!((from_string.float64() - 0.55).abs() < f64::EPSILON);
624    }
625
626    #[test]
627    fn websocket_channel_inventory_includes_all_server_events() {
628        // Ensure every server-emitted event name has a corresponding variant.
629        let server_events = [
630            "orderbook",
631            "trades",
632            "orders",
633            "fills",
634            "markets",
635            "prices",
636            "positions",
637            "transactions",
638            "orderEvent",
639            "liveSports",
640            "liveEsports",
641            "marketLifecycle",
642        ];
643        for &event in &server_events {
644            assert!(
645                channel_from_key(event).is_some(),
646                "missing channel variant for server event '{event}'"
647            );
648        }
649    }
650}