Skip to main content

predict_fun_sdk/
ws.rs

1//! Predict.fun WebSocket client for real-time market data (BNB Chain).
2//!
3//! Connects to `wss://ws.predict.fun/ws` and provides:
4//! - Orderbook snapshots (`predictOrderbook/{marketId}`)
5//! - Asset price updates (`assetPriceUpdate/{feedId}`)
6//! - Cross-venue chance data (`polymarketChance/{marketId}`, `kalshiChance/{marketId}`)
7//! - Wallet event notifications (`predictWalletEvents/{jwt}`)
8//!
9//! # Protocol
10//!
11//! Custom JSON RPC over WebSocket (not graphql-ws):
12//! ```text
13//! → {"requestId": 0, "method": "subscribe", "params": ["predictOrderbook/123"]}
14//! ← {"type": "R", "requestId": 0, "success": true}
15//! ← {"type": "M", "topic": "predictOrderbook/123", "data": {...}}
16//! ```
17//!
18//! # Example
19//!
20//! ```rust,no_run
21//! use predict_fun_sdk::ws::{PredictWsClient, PredictWsMessage, Topic};
22//!
23//! # async fn example() -> anyhow::Result<()> {
24//! let (client, mut rx) = PredictWsClient::connect_mainnet().await?;
25//!
26//! client.subscribe(Topic::Orderbook { market_id: 45532 }).await?;
27//! client.subscribe(Topic::AssetPrice { feed_id: 1 }).await?;
28//!
29//! while let Some(msg) = rx.recv().await {
30//!     match msg {
31//!         PredictWsMessage::Orderbook(ob) => {
32//!             println!("OB market={}: {} bids, {} asks",
33//!                 ob.market_id, ob.bids.len(), ob.asks.len());
34//!         }
35//!         PredictWsMessage::AssetPrice(p) => {
36//!             println!("Price feed {}: ${:.2}", p.feed_id, p.price);
37//!         }
38//!         _ => {}
39//!     }
40//! }
41//! # Ok(())
42//! # }
43//! ```
44
45use std::collections::{HashMap, HashSet};
46use std::sync::atomic::{AtomicU64, Ordering};
47use std::sync::Arc;
48
49use anyhow::{anyhow, Context, Result};
50use futures_util::{SinkExt, StreamExt};
51use serde::{Deserialize, Serialize};
52use serde_json::Value;
53use tokio::sync::{mpsc, oneshot, Mutex};
54use tokio::time::{self, Duration};
55use tokio_tungstenite::tungstenite::Message as WsMessage;
56use tracing::{debug, error, info, warn};
57
58/// WebSocket endpoints.
59pub const PREDICT_WS_MAINNET: &str = "wss://ws.predict.fun/ws";
60pub const PREDICT_WS_TESTNET: &str = "wss://ws.bnb.predict.fail/ws";
61
62/// GraphQL endpoints (for reference / future use).
63pub const PREDICT_GQL_MAINNET: &str = "https://graphql.predict.fun/graphql";
64pub const PREDICT_GQL_TESTNET: &str = "https://graphql.bnb.predict.fail/graphql";
65
66// ── Topic ──
67
68/// Subscription topic for the predict.fun WebSocket feed.
69#[derive(Debug, Clone, PartialEq, Eq, Hash)]
70pub enum Topic {
71    /// Full orderbook snapshots on every change (sub-second).
72    Orderbook { market_id: i64 },
73    /// Oracle price updates (~3-5/sec per asset).
74    AssetPrice { feed_id: i64 },
75    /// Cross-venue Polymarket probability.
76    PolymarketChance { market_id: i64 },
77    /// Cross-venue Kalshi probability.
78    KalshiChance { market_id: i64 },
79    /// Wallet events (fills, settlements). Requires JWT.
80    WalletEvents { jwt: String },
81    /// Raw topic string for undocumented topics.
82    Raw(String),
83}
84
85impl Topic {
86    pub fn to_topic_string(&self) -> String {
87        match self {
88            Topic::Orderbook { market_id } => format!("predictOrderbook/{}", market_id),
89            Topic::AssetPrice { feed_id } => format!("assetPriceUpdate/{}", feed_id),
90            Topic::PolymarketChance { market_id } => format!("polymarketChance/{}", market_id),
91            Topic::KalshiChance { market_id } => format!("kalshiChance/{}", market_id),
92            Topic::WalletEvents { jwt } => format!("predictWalletEvents/{}", jwt),
93            Topic::Raw(s) => s.clone(),
94        }
95    }
96
97    pub fn from_topic_string(s: &str) -> Self {
98        let prefixes: &[(&str, fn(&str) -> Option<Topic>)] = &[
99            ("predictOrderbook/", |r| {
100                r.parse().ok().map(|id| Topic::Orderbook { market_id: id })
101            }),
102            ("assetPriceUpdate/", |r| {
103                r.parse().ok().map(|id| Topic::AssetPrice { feed_id: id })
104            }),
105            ("polymarketChance/", |r| {
106                r.parse()
107                    .ok()
108                    .map(|id| Topic::PolymarketChance { market_id: id })
109            }),
110            ("kalshiChance/", |r| {
111                r.parse()
112                    .ok()
113                    .map(|id| Topic::KalshiChance { market_id: id })
114            }),
115            ("predictWalletEvents/", |r| {
116                Some(Topic::WalletEvents {
117                    jwt: r.to_string(),
118                })
119            }),
120        ];
121
122        for (prefix, parse_fn) in prefixes {
123            if let Some(rest) = s.strip_prefix(prefix) {
124                if let Some(topic) = parse_fn(rest) {
125                    return topic;
126                }
127            }
128        }
129        Topic::Raw(s.to_string())
130    }
131}
132
133impl std::fmt::Display for Topic {
134    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
135        write!(f, "{}", self.to_topic_string())
136    }
137}
138
139// ── Parsed message types ──
140
141/// Orderbook level: `(price, size)`.
142pub type Level = (f64, f64);
143
144/// Last settled order info.
145#[derive(Debug, Clone, Serialize, Deserialize)]
146pub struct LastOrderSettled {
147    pub id: String,
148    pub kind: String,
149    #[serde(rename = "marketId")]
150    pub market_id: i64,
151    pub outcome: String,
152    pub price: String,
153    pub side: String,
154}
155
156/// Full orderbook snapshot pushed on every change.
157#[derive(Debug, Clone)]
158pub struct OrderbookSnapshot {
159    pub market_id: i64,
160    pub bids: Vec<Level>,
161    pub asks: Vec<Level>,
162    pub version: u64,
163    pub update_timestamp_ms: u64,
164    pub order_count: u64,
165    pub last_order_settled: Option<LastOrderSettled>,
166}
167
168impl OrderbookSnapshot {
169    pub fn best_bid(&self) -> Option<f64> {
170        self.bids.first().map(|(p, _)| *p)
171    }
172    pub fn best_ask(&self) -> Option<f64> {
173        self.asks.first().map(|(p, _)| *p)
174    }
175    pub fn mid(&self) -> Option<f64> {
176        match (self.best_bid(), self.best_ask()) {
177            (Some(b), Some(a)) => Some((b + a) / 2.0),
178            _ => None,
179        }
180    }
181    pub fn spread(&self) -> Option<f64> {
182        match (self.best_bid(), self.best_ask()) {
183            (Some(b), Some(a)) => Some(a - b),
184            _ => None,
185        }
186    }
187}
188
189/// Oracle price update.
190#[derive(Debug, Clone)]
191pub struct AssetPriceUpdate {
192    pub feed_id: i64,
193    pub price: f64,
194    pub publish_time: u64,
195    pub timestamp: u64,
196}
197
198/// Cross-venue chance data (Polymarket or Kalshi).
199#[derive(Debug, Clone)]
200pub struct CrossVenueChance {
201    pub source: CrossVenueSource,
202    pub market_id: i64,
203    pub data: Value,
204}
205
206#[derive(Debug, Clone, Copy, PartialEq, Eq)]
207pub enum CrossVenueSource {
208    Polymarket,
209    Kalshi,
210}
211
212/// Wallet event notification (fills, settlements).
213#[derive(Debug, Clone)]
214pub struct WalletEvent {
215    pub data: Value,
216}
217
218/// Parsed WebSocket message.
219#[derive(Debug, Clone)]
220pub enum PredictWsMessage {
221    Orderbook(OrderbookSnapshot),
222    AssetPrice(AssetPriceUpdate),
223    CrossVenueChance(CrossVenueChance),
224    WalletEvent(WalletEvent),
225    /// Unparsed message (unknown topic or parse failure).
226    Raw { topic: String, data: Value },
227}
228
229// ── Wire protocol ──
230
231#[derive(Serialize)]
232struct WsRequest {
233    #[serde(rename = "requestId")]
234    request_id: u64,
235    method: String,
236    #[serde(skip_serializing_if = "Option::is_none")]
237    params: Option<Vec<String>>,
238    #[serde(skip_serializing_if = "Option::is_none")]
239    data: Option<Value>,
240}
241
242#[derive(Deserialize)]
243struct WsRawMessage {
244    #[serde(rename = "type")]
245    msg_type: String,
246    #[serde(rename = "requestId")]
247    request_id: Option<i64>,
248    success: Option<bool>,
249    error: Option<WsError>,
250    topic: Option<String>,
251    data: Option<Value>,
252}
253
254#[derive(Deserialize, Debug)]
255struct WsError {
256    code: String,
257    message: Option<String>,
258}
259
260impl std::fmt::Display for WsError {
261    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
262        write!(f, "{}", self.code)?;
263        if let Some(msg) = &self.message {
264            write!(f, ": {}", msg)?;
265        }
266        Ok(())
267    }
268}
269
270// ── Client ──
271
272type WsSink = futures_util::stream::SplitSink<
273    tokio_tungstenite::WebSocketStream<
274        tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>,
275    >,
276    WsMessage,
277>;
278type WsStream = futures_util::stream::SplitStream<
279    tokio_tungstenite::WebSocketStream<
280        tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>,
281    >,
282>;
283
284type PendingResponse = oneshot::Sender<Result<()>>;
285
286/// WebSocket connection config.
287#[derive(Debug, Clone)]
288pub struct PredictWsConfig {
289    pub url: String,
290    pub channel_buffer: usize,
291    pub heartbeat_timeout_secs: u64,
292    pub max_reconnect_attempts: u32,
293    pub max_reconnect_backoff_secs: u64,
294}
295
296impl Default for PredictWsConfig {
297    fn default() -> Self {
298        Self {
299            url: PREDICT_WS_MAINNET.to_string(),
300            channel_buffer: 1024,
301            heartbeat_timeout_secs: 60,
302            max_reconnect_attempts: 0, // infinite
303            max_reconnect_backoff_secs: 15,
304        }
305    }
306}
307
308impl PredictWsConfig {
309    pub fn mainnet() -> Self {
310        Self::default()
311    }
312    pub fn testnet() -> Self {
313        Self {
314            url: PREDICT_WS_TESTNET.to_string(),
315            ..Self::default()
316        }
317    }
318}
319
320/// Handle for interacting with the WebSocket connection.
321///
322/// Messages are received on the `mpsc::Receiver` returned from `connect`.
323#[derive(Clone)]
324pub struct PredictWsClient {
325    sink: Arc<Mutex<WsSink>>,
326    request_id: Arc<AtomicU64>,
327    pending: Arc<Mutex<HashMap<u64, PendingResponse>>>,
328    active_topics: Arc<Mutex<HashSet<String>>>,
329    config: PredictWsConfig,
330}
331
332impl PredictWsClient {
333    pub async fn connect_mainnet() -> Result<(Self, mpsc::Receiver<PredictWsMessage>)> {
334        Self::connect(PredictWsConfig::mainnet()).await
335    }
336
337    pub async fn connect_testnet() -> Result<(Self, mpsc::Receiver<PredictWsMessage>)> {
338        Self::connect(PredictWsConfig::testnet()).await
339    }
340
341    pub async fn connect(
342        config: PredictWsConfig,
343    ) -> Result<(Self, mpsc::Receiver<PredictWsMessage>)> {
344        let (ws_stream, _) = tokio_tungstenite::connect_async(&config.url)
345            .await
346            .with_context(|| format!("failed to connect to {}", config.url))?;
347
348        info!("Connected to {}", config.url);
349
350        let (sink, stream) = ws_stream.split();
351        let (tx, rx) = mpsc::channel(config.channel_buffer);
352
353        let client = Self {
354            sink: Arc::new(Mutex::new(sink)),
355            request_id: Arc::new(AtomicU64::new(0)),
356            pending: Arc::new(Mutex::new(HashMap::new())),
357            active_topics: Arc::new(Mutex::new(HashSet::new())),
358            config,
359        };
360
361        let client_clone = client.clone();
362        tokio::spawn(async move {
363            client_clone.run_loop(stream, tx).await;
364        });
365
366        Ok((client, rx))
367    }
368
369    pub async fn subscribe(&self, topic: Topic) -> Result<()> {
370        let topic_str = topic.to_topic_string();
371        let result = self.send_and_wait("subscribe", &topic_str).await?;
372
373        self.active_topics.lock().await.insert(topic_str.clone());
374        info!("Subscribed to {}", topic_str);
375        Ok(result)
376    }
377
378    pub async fn unsubscribe(&self, topic: Topic) -> Result<()> {
379        let topic_str = topic.to_topic_string();
380        self.send_and_wait("unsubscribe", &topic_str).await?;
381
382        self.active_topics.lock().await.remove(&topic_str);
383        info!("Unsubscribed from {}", topic_str);
384        Ok(())
385    }
386
387    pub async fn active_topics(&self) -> Vec<String> {
388        self.active_topics.lock().await.iter().cloned().collect()
389    }
390
391    // ── Internal ──
392
393    async fn send_and_wait(&self, method: &str, topic: &str) -> Result<()> {
394        let request_id = self.request_id.fetch_add(1, Ordering::Relaxed);
395        let (resp_tx, resp_rx) = oneshot::channel();
396
397        self.pending.lock().await.insert(request_id, resp_tx);
398
399        let msg = WsRequest {
400            request_id,
401            method: method.to_string(),
402            params: Some(vec![topic.to_string()]),
403            data: None,
404        };
405        self.send_raw(&msg).await?;
406        debug!("{} {} (requestId={})", method, topic, request_id);
407
408        tokio::time::timeout(Duration::from_secs(10), resp_rx)
409            .await
410            .map_err(|_| anyhow!("{} timeout for {}", method, topic))?
411            .map_err(|_| anyhow!("{} channel closed for {}", method, topic))?
412    }
413
414    async fn send_heartbeat(&self, data: &Value) -> Result<()> {
415        let msg = WsRequest {
416            request_id: self.request_id.fetch_add(1, Ordering::Relaxed),
417            method: "heartbeat".to_string(),
418            params: None,
419            data: Some(data.clone()),
420        };
421        self.send_raw(&msg).await
422    }
423
424    async fn send_raw(&self, msg: &WsRequest) -> Result<()> {
425        let text = serde_json::to_string(msg).context("failed to serialize WS message")?;
426        self.sink
427            .lock()
428            .await
429            .send(WsMessage::Text(text))
430            .await
431            .context("failed to send WS message")
432    }
433
434    /// Main event loop: read messages, handle heartbeats, reconnect on disconnect.
435    /// Iterative (not recursive) — simpler and no stack growth.
436    async fn run_loop(&self, mut stream: WsStream, tx: mpsc::Sender<PredictWsMessage>) {
437        let heartbeat_timeout = Duration::from_secs(self.config.heartbeat_timeout_secs);
438        let mut attempt = 0u32;
439
440        loop {
441            // Read loop for current connection
442            let mut last_heartbeat = time::Instant::now();
443            let disconnected = loop {
444                tokio::select! {
445                    msg = stream.next() => {
446                        match msg {
447                            Some(Ok(WsMessage::Text(text))) => {
448                                if let Ok(raw) = serde_json::from_str::<WsRawMessage>(&text) {
449                                    self.handle_message(raw, &tx, &mut last_heartbeat).await;
450                                }
451                            }
452                            Some(Ok(WsMessage::Ping(data))) => {
453                                let _ = self.sink.lock().await.send(WsMessage::Pong(data)).await;
454                            }
455                            Some(Ok(WsMessage::Close(frame))) => {
456                                info!("WebSocket closed by server: {:?}", frame);
457                                break true;
458                            }
459                            Some(Err(e)) => {
460                                error!("WebSocket error: {}", e);
461                                break true;
462                            }
463                            None => {
464                                info!("WebSocket stream ended");
465                                break true;
466                            }
467                            _ => {} // Binary, Pong — ignore
468                        }
469                    }
470                    _ = time::sleep(heartbeat_timeout) => {
471                        if last_heartbeat.elapsed() > heartbeat_timeout {
472                            warn!("Heartbeat timeout ({}s)", self.config.heartbeat_timeout_secs);
473                            break true;
474                        }
475                    }
476                }
477            };
478
479            if !disconnected {
480                return;
481            }
482
483            // Reconnect with exponential backoff
484            let max = self.config.max_reconnect_attempts;
485            if max > 0 && attempt >= max {
486                error!("Max reconnect attempts ({}) reached", max);
487                return;
488            }
489
490            let backoff_secs = (2u64.pow(attempt.min(10))).min(self.config.max_reconnect_backoff_secs);
491            warn!("Reconnecting in {}s (attempt {})", backoff_secs, attempt + 1);
492            time::sleep(Duration::from_secs(backoff_secs)).await;
493            attempt += 1;
494
495            match tokio_tungstenite::connect_async(&self.config.url).await {
496                Ok((ws_stream, _)) => {
497                    info!("Reconnected to {}", self.config.url);
498                    let (new_sink, new_stream) = ws_stream.split();
499                    *self.sink.lock().await = new_sink;
500                    stream = new_stream;
501                    attempt = 0; // reset on success
502
503                    // Resubscribe to all active topics
504                    let topics: Vec<String> =
505                        self.active_topics.lock().await.iter().cloned().collect();
506                    for topic_str in &topics {
507                        let req_id = self.request_id.fetch_add(1, Ordering::Relaxed);
508                        let msg = WsRequest {
509                            request_id: req_id,
510                            method: "subscribe".to_string(),
511                            params: Some(vec![topic_str.clone()]),
512                            data: None,
513                        };
514                        if let Err(e) = self.send_raw(&msg).await {
515                            warn!("Failed to resubscribe to {}: {}", topic_str, e);
516                        }
517                    }
518                }
519                Err(e) => {
520                    error!("Reconnection failed: {}", e);
521                    // loop will retry
522                }
523            }
524        }
525    }
526
527    async fn handle_message(
528        &self,
529        raw: WsRawMessage,
530        tx: &mpsc::Sender<PredictWsMessage>,
531        last_heartbeat: &mut time::Instant,
532    ) {
533        match raw.msg_type.as_str() {
534            "R" => {
535                // Response to subscribe/unsubscribe
536                if let Some(req_id) = raw.request_id {
537                    if let Some(resp_tx) = self.pending.lock().await.remove(&(req_id as u64)) {
538                        let result = if raw.success.unwrap_or(false) {
539                            Ok(())
540                        } else {
541                            Err(anyhow!(
542                                "subscribe failed: {}",
543                                raw.error
544                                    .map(|e| e.to_string())
545                                    .unwrap_or_else(|| "unknown".into())
546                            ))
547                        };
548                        let _ = resp_tx.send(result);
549                    }
550                }
551            }
552            "M" => {
553                // Push message
554                let topic_str = match &raw.topic {
555                    Some(t) => t.as_str(),
556                    None => return,
557                };
558
559                // Heartbeat echo
560                if topic_str == "heartbeat" {
561                    *last_heartbeat = time::Instant::now();
562                    if let Some(data) = &raw.data {
563                        if let Err(e) = self.send_heartbeat(data).await {
564                            warn!("Heartbeat response failed: {}", e);
565                        }
566                    }
567                    return;
568                }
569
570                if let Some(data) = raw.data {
571                    let parsed = parse_push_message(topic_str, data);
572                    if tx.try_send(parsed).is_err() {
573                        warn!("Channel full, dropping message for {}", topic_str);
574                    }
575                }
576            }
577            _ => debug!("Unknown WS message type: {}", raw.msg_type),
578        }
579    }
580}
581
582// ── Message parsing ──
583
584fn parse_push_message(topic: &str, data: Value) -> PredictWsMessage {
585    if let Some(rest) = topic.strip_prefix("predictOrderbook/") {
586        if let Ok(market_id) = rest.parse::<i64>() {
587            if let Some(ob) = parse_orderbook(market_id, &data) {
588                return PredictWsMessage::Orderbook(ob);
589            }
590        }
591    }
592
593    if let Some(rest) = topic.strip_prefix("assetPriceUpdate/") {
594        if let Ok(feed_id) = rest.parse::<i64>() {
595            if let Some(price) = parse_asset_price(feed_id, &data) {
596                return PredictWsMessage::AssetPrice(price);
597            }
598        }
599    }
600
601    if let Some(rest) = topic.strip_prefix("polymarketChance/") {
602        if let Ok(id) = rest.parse::<i64>() {
603            return PredictWsMessage::CrossVenueChance(CrossVenueChance {
604                source: CrossVenueSource::Polymarket,
605                market_id: id,
606                data,
607            });
608        }
609    }
610
611    if let Some(rest) = topic.strip_prefix("kalshiChance/") {
612        if let Ok(id) = rest.parse::<i64>() {
613            return PredictWsMessage::CrossVenueChance(CrossVenueChance {
614                source: CrossVenueSource::Kalshi,
615                market_id: id,
616                data,
617            });
618        }
619    }
620
621    if topic.starts_with("predictWalletEvents/") {
622        return PredictWsMessage::WalletEvent(WalletEvent { data });
623    }
624
625    PredictWsMessage::Raw {
626        topic: topic.to_string(),
627        data,
628    }
629}
630
631fn parse_levels(val: &Value) -> Vec<Level> {
632    val.as_array()
633        .map(|arr| {
634            arr.iter()
635                .filter_map(|lvl| {
636                    let price = lvl.get(0)?.as_f64()?;
637                    let size = lvl.get(1)?.as_f64()?;
638                    Some((price, size))
639                })
640                .collect()
641        })
642        .unwrap_or_default()
643}
644
645fn parse_orderbook(market_id: i64, data: &Value) -> Option<OrderbookSnapshot> {
646    Some(OrderbookSnapshot {
647        market_id,
648        bids: parse_levels(data.get("bids")?),
649        asks: parse_levels(data.get("asks")?),
650        version: data.get("version")?.as_u64().unwrap_or(0),
651        update_timestamp_ms: data
652            .get("updateTimestampMs")
653            .and_then(|v| v.as_u64())
654            .unwrap_or(0),
655        order_count: data
656            .get("orderCount")
657            .and_then(|v| v.as_u64())
658            .unwrap_or(0),
659        last_order_settled: data
660            .get("lastOrderSettled")
661            .and_then(|v| serde_json::from_value(v.clone()).ok()),
662    })
663}
664
665fn parse_asset_price(feed_id: i64, data: &Value) -> Option<AssetPriceUpdate> {
666    Some(AssetPriceUpdate {
667        feed_id,
668        price: data.get("price")?.as_f64()?,
669        publish_time: data.get("publishTime").and_then(|v| v.as_u64()).unwrap_or(0),
670        timestamp: data.get("timestamp").and_then(|v| v.as_u64()).unwrap_or(0),
671    })
672}
673
674// ── Known asset feed IDs ──
675
676/// Known asset price feed IDs for `Topic::AssetPrice`.
677pub mod feeds {
678    pub const BTC: i64 = 1;
679    pub const ETH: i64 = 4;
680    /// Tentative — needs confirmation from predict.fun team.
681    pub const BNB: i64 = 2;
682}
683
684#[cfg(test)]
685mod tests {
686    use super::*;
687
688    #[test]
689    fn topic_roundtrip() {
690        let topics = vec![
691            Topic::Orderbook { market_id: 123 },
692            Topic::AssetPrice { feed_id: 1 },
693            Topic::PolymarketChance { market_id: 456 },
694            Topic::KalshiChance { market_id: 789 },
695            Topic::WalletEvents {
696                jwt: "abc123".to_string(),
697            },
698            Topic::Raw("custom/topic".to_string()),
699        ];
700        for topic in topics {
701            let s = topic.to_topic_string();
702            assert_eq!(topic, Topic::from_topic_string(&s), "Roundtrip failed: {}", s);
703        }
704    }
705
706    #[test]
707    fn topic_display() {
708        assert_eq!(Topic::Orderbook { market_id: 42 }.to_string(), "predictOrderbook/42");
709        assert_eq!(Topic::AssetPrice { feed_id: 1 }.to_string(), "assetPriceUpdate/1");
710    }
711
712    #[test]
713    fn parse_orderbook_snapshot() {
714        let data = serde_json::json!({
715            "asks": [[0.72, 15.0], [0.83, 5.88]],
716            "bids": [[0.57, 15.0], [0.38, 2.63]],
717            "version": 1,
718            "updateTimestampMs": 1772898630219u64,
719            "orderCount": 13,
720            "lastOrderSettled": {
721                "id": "20035648", "kind": "LIMIT", "marketId": 45532,
722                "outcome": "No", "price": "0.60", "side": "Bid"
723            }
724        });
725
726        let ob = parse_orderbook(45532, &data).unwrap();
727        assert_eq!(ob.market_id, 45532);
728        assert_eq!(ob.bids.len(), 2);
729        assert_eq!(ob.asks.len(), 2);
730        assert!((ob.best_bid().unwrap() - 0.57).abs() < 1e-10);
731        assert!((ob.best_ask().unwrap() - 0.72).abs() < 1e-10);
732        assert!((ob.mid().unwrap() - 0.645).abs() < 1e-10);
733        assert!((ob.spread().unwrap() - 0.15).abs() < 1e-10);
734        assert_eq!(ob.version, 1);
735        assert_eq!(ob.order_count, 13);
736        assert!(ob.last_order_settled.is_some());
737    }
738
739    #[test]
740    fn parse_asset_price_update() {
741        let data = serde_json::json!({
742            "price": 67853.57751504,
743            "publishTime": 1772898632u64,
744            "timestamp": 1772898633u64
745        });
746        let price = parse_asset_price(1, &data).unwrap();
747        assert_eq!(price.feed_id, 1);
748        assert!((price.price - 67853.577).abs() < 1.0);
749    }
750
751    #[test]
752    fn parse_push_dispatches() {
753        let ob = serde_json::json!({"asks": [], "bids": [], "version": 1, "updateTimestampMs": 0, "orderCount": 0});
754        assert!(matches!(parse_push_message("predictOrderbook/123", ob), PredictWsMessage::Orderbook(_)));
755
756        let p = serde_json::json!({"price": 100.0, "publishTime": 0, "timestamp": 0});
757        assert!(matches!(parse_push_message("assetPriceUpdate/1", p), PredictWsMessage::AssetPrice(_)));
758
759        let c = serde_json::json!({"chance": 0.5});
760        assert!(matches!(parse_push_message("polymarketChance/456", c), PredictWsMessage::CrossVenueChance(_)));
761
762        let k = serde_json::json!({"chance": 0.3});
763        assert!(matches!(parse_push_message("kalshiChance/789", k), PredictWsMessage::CrossVenueChance(_)));
764
765        let w = serde_json::json!({"event": "fill"});
766        assert!(matches!(parse_push_message("predictWalletEvents/jwt123", w), PredictWsMessage::WalletEvent(_)));
767
768        let u = serde_json::json!({"foo": "bar"});
769        assert!(matches!(parse_push_message("unknown/topic", u), PredictWsMessage::Raw { .. }));
770    }
771
772    #[test]
773    fn orderbook_helpers_empty() {
774        let ob = OrderbookSnapshot {
775            market_id: 1, bids: vec![], asks: vec![], version: 0,
776            update_timestamp_ms: 0, order_count: 0, last_order_settled: None,
777        };
778        assert!(ob.best_bid().is_none());
779        assert!(ob.mid().is_none());
780        assert!(ob.spread().is_none());
781    }
782
783    #[test]
784    fn feed_id_constants() {
785        assert_eq!(feeds::BTC, 1);
786        assert_eq!(feeds::ETH, 4);
787        assert_eq!(feeds::BNB, 2);
788    }
789
790    #[test]
791    fn config_defaults() {
792        let c = PredictWsConfig::default();
793        assert_eq!(c.url, PREDICT_WS_MAINNET);
794        assert_eq!(c.channel_buffer, 1024);
795        assert_eq!(c.heartbeat_timeout_secs, 60);
796    }
797
798    #[test]
799    fn config_testnet() {
800        assert_eq!(PredictWsConfig::testnet().url, PREDICT_WS_TESTNET);
801    }
802}