Skip to main content

polymarket_client/subscriptions/
mod.rs

1//! WebSocket client wiring and subscription routing.
2
3#![allow(clippy::unnecessary_wraps)]
4
5mod types;
6
7pub use types::{
8    CommentsSubscription, CryptoPricesSubscription, EquityPricesSubscription, MarketStreamEvent,
9    MarketSubscription, SportsStreamEvent, StreamEvent, SubscribeError, SubscriptionHandle,
10    SubscriptionSpec, UserStreamEvent, UserSubscription,
11};
12
13pub use types::merge_streams;
14
15use std::str::FromStr as _;
16
17use futures::StreamExt as _;
18use polymarket_client_sdk_v2::clob::ws::types::response::WsMessage;
19use polymarket_client_sdk_v2::clob::ws::Client as ClobWsClient;
20use polymarket_client_sdk_v2::rtds::Client as RtdsClient;
21use polymarket_client_sdk_v2::types::{B256, U256};
22use polymarket_client_sdk_v2::ws::config::Config as WsConfig;
23
24use crate::environment::Environment;
25use crate::error::user_input;
26use crate::public_client::PublicClient;
27use types::{EventStream, RtdsStreamEvent};
28
29#[derive(Clone)]
30pub struct WebSocketClients {
31    pub clob: ClobWsClient,
32    pub rtds: RtdsClient,
33    #[cfg(feature = "secure")]
34    pub clob_ws_url: String,
35    pub sports_url: String,
36}
37
38impl WebSocketClients {
39    pub fn new(environment: &Environment) -> Result<Self, SubscribeError> {
40        ensure_rustls_crypto_provider();
41        Ok(Self {
42            clob: ClobWsClient::new(environment.clob_ws, WsConfig::default())
43                .map_err(|e| SubscribeError::Transport(e.to_string()))?,
44            rtds: RtdsClient::new(environment.rtds_ws, WsConfig::default())
45                .map_err(|e| SubscribeError::Transport(e.to_string()))?,
46            #[cfg(feature = "secure")]
47            clob_ws_url: environment.clob_ws.to_string(),
48            sports_url: environment.sports_ws.to_string(),
49        })
50    }
51}
52
53impl PublicClient {
54    /// Subscribe to one or more realtime channels.
55    pub fn subscribe(
56        &self,
57        specs: Vec<SubscriptionSpec>,
58    ) -> Result<SubscriptionHandle, SubscribeError> {
59        if specs.is_empty() {
60            return Err(SubscribeError::UserInput(user_input(
61                "at least one subscription spec is required",
62            )));
63        }
64
65        let ws = &self.ws;
66
67        let mut streams = Vec::with_capacity(specs.len());
68        for spec in specs {
69            streams.push(subscribe_one(ws, spec)?);
70        }
71
72        Ok(SubscriptionHandle::new(merge_streams(streams), None))
73    }
74}
75
76pub fn subscribe_one(
77    ws: &WebSocketClients,
78    spec: SubscriptionSpec,
79) -> Result<EventStream, SubscribeError> {
80    match spec {
81        SubscriptionSpec::Market(market) => subscribe_market(ws, market),
82        SubscriptionSpec::Sports => subscribe_sports(ws),
83        SubscriptionSpec::Comments(comments) => subscribe_comments(ws, comments),
84        SubscriptionSpec::CryptoPricesBinance(crypto) => subscribe_crypto_binance(ws, crypto),
85        SubscriptionSpec::CryptoPricesChainlink(crypto) => subscribe_crypto_chainlink(ws, crypto),
86        SubscriptionSpec::EquityPrices(equity) => subscribe_equity(ws, equity),
87        SubscriptionSpec::User(_) => Err(SubscribeError::UserInput(user_input(
88            "user subscriptions require SecureClient",
89        ))),
90    }
91}
92
93fn subscribe_market(
94    ws: &WebSocketClients,
95    spec: MarketSubscription,
96) -> Result<EventStream, SubscribeError> {
97    if spec.token_ids.is_empty() {
98        return Err(SubscribeError::UserInput(user_input(
99            "market subscription requires at least one token_id",
100        )));
101    }
102
103    let asset_ids = parse_token_ids(&spec.token_ids)?;
104    let clob = ws.clob.clone();
105
106    let book = clob
107        .subscribe_orderbook(asset_ids.clone())
108        .map_err(map_ws_err)?;
109    let prices = clob
110        .subscribe_prices(asset_ids.clone())
111        .map_err(map_ws_err)?;
112    let trades = clob
113        .subscribe_last_trade_price(asset_ids.clone())
114        .map_err(map_ws_err)?;
115
116    let mut streams: Vec<EventStream> = vec![
117        Box::pin(book.map(map_market_book)),
118        Box::pin(prices.map(map_market_price)),
119        Box::pin(trades.map(map_market_last_trade)),
120    ];
121
122    if spec.custom_feature_enabled {
123        let bba = clob.subscribe_best_bid_ask(asset_ids).map_err(map_ws_err)?;
124        streams.push(Box::pin(bba.map(map_market_bba)));
125    }
126
127    Ok(merge_streams(streams))
128}
129
130fn subscribe_sports(ws: &WebSocketClients) -> Result<EventStream, SubscribeError> {
131    let url = ws.sports_url.clone();
132    Ok(Box::pin(async_stream::try_stream! {
133        use futures::SinkExt as _;
134        use tokio_tungstenite::connect_async;
135        use tokio_tungstenite::tungstenite::Message;
136
137        let (mut socket, _) = connect_async(&url)
138            .await
139            .map_err(|e| SubscribeError::Transport(e.to_string()))?;
140
141        while let Some(message) = socket.next().await {
142            let message = message.map_err(|e| SubscribeError::Transport(e.to_string()))?;
143            match message {
144                Message::Text(text) => {
145                    if text == "ping" {
146                        socket
147                            .send(Message::Text("pong".into()))
148                            .await
149                            .map_err(|e| SubscribeError::Transport(e.to_string()))?;
150                        continue;
151                    }
152                    if let Ok(value) = serde_json::from_str::<serde_json::Value>(&text) {
153                        if let Some(event) = parse_sports_event(&value) {
154                            yield StreamEvent::Sports(event);
155                        }
156                    }
157                }
158                Message::Close(_) => break,
159                _ => {}
160            }
161        }
162    }))
163}
164
165fn subscribe_comments(
166    ws: &WebSocketClients,
167    _spec: CommentsSubscription,
168) -> Result<EventStream, SubscribeError> {
169    let rtds = ws.rtds.clone();
170    Ok(Box::pin(async_stream::try_stream! {
171        let subscribed = match rtds.subscribe_comments(None) {
172            Ok(stream) => stream,
173            Err(error) => Err(map_ws_err(error))?,
174        };
175        let mut stream = std::pin::pin!(subscribed);
176        while let Some(result) = stream.next().await {
177            match result {
178                Ok(comment) => {
179                    yield StreamEvent::Rtds(RtdsStreamEvent::Comment {
180                        entity_id: comment.parent_entity_id.to_string(),
181                        body: comment.body,
182                    });
183                }
184                Err(error) => Err(map_ws_err(error))?,
185            }
186        }
187    }))
188}
189
190fn subscribe_crypto_binance(
191    ws: &WebSocketClients,
192    spec: CryptoPricesSubscription,
193) -> Result<EventStream, SubscribeError> {
194    let rtds = ws.rtds.clone();
195    let symbols = if spec.symbols.is_empty() {
196        None
197    } else {
198        Some(spec.symbols)
199    };
200    Ok(Box::pin(async_stream::try_stream! {
201        let subscribed = match rtds.subscribe_crypto_prices(symbols) {
202            Ok(stream) => stream,
203            Err(error) => Err(map_ws_err(error))?,
204        };
205        let mut stream = std::pin::pin!(subscribed);
206        while let Some(result) = stream.next().await {
207            match result {
208                Ok(price) => {
209                    yield StreamEvent::Rtds(RtdsStreamEvent::CryptoPrice {
210                        source: "binance".into(),
211                        symbol: price.symbol,
212                        price: price.value.to_string(),
213                    });
214                }
215                Err(error) => Err(map_ws_err(error))?,
216            }
217        }
218    }))
219}
220
221fn subscribe_crypto_chainlink(
222    ws: &WebSocketClients,
223    spec: CryptoPricesSubscription,
224) -> Result<EventStream, SubscribeError> {
225    let rtds = ws.rtds.clone();
226    if spec.symbols.len() <= 1 {
227        let symbol = spec.symbols.into_iter().next();
228        return Ok(Box::pin(async_stream::try_stream! {
229            let subscribed = match rtds.subscribe_chainlink_prices(symbol) {
230                Ok(stream) => stream,
231                Err(error) => Err(map_ws_err(error))?,
232            };
233            let mut stream = std::pin::pin!(subscribed);
234            while let Some(result) = stream.next().await {
235                match result {
236                    Ok(price) => {
237                        yield StreamEvent::Rtds(RtdsStreamEvent::CryptoPrice {
238                            source: "chainlink".into(),
239                            symbol: price.symbol,
240                            price: price.value.to_string(),
241                        });
242                    }
243                    Err(error) => Err(map_ws_err(error))?,
244                }
245            }
246        }));
247    }
248
249    let mut streams: Vec<EventStream> = Vec::with_capacity(spec.symbols.len());
250    for symbol in spec.symbols {
251        let rtds = ws.rtds.clone();
252        streams.push(Box::pin(async_stream::try_stream! {
253            let subscribed = match rtds.subscribe_chainlink_prices(Some(symbol)) {
254                Ok(stream) => stream,
255                Err(error) => Err(map_ws_err(error))?,
256            };
257            let mut stream = std::pin::pin!(subscribed);
258            while let Some(result) = stream.next().await {
259                match result {
260                    Ok(price) => {
261                        yield StreamEvent::Rtds(RtdsStreamEvent::CryptoPrice {
262                            source: "chainlink".into(),
263                            symbol: price.symbol,
264                            price: price.value.to_string(),
265                        });
266                    }
267                    Err(error) => Err(map_ws_err(error))?,
268                }
269            }
270        }));
271    }
272    Ok(merge_streams(streams))
273}
274
275fn subscribe_equity(
276    _ws: &WebSocketClients,
277    _spec: EquityPricesSubscription,
278) -> Result<EventStream, SubscribeError> {
279    Err(SubscribeError::Transport(
280        "equity price subscriptions are not yet supported in the Rust SDK".into(),
281    ))
282}
283
/// Opens an authenticated user-event stream on the CLOB WebSocket.
///
/// A new CLOB client is built from `ws.clob_ws_url` and authenticated with
/// `credentials` and `address`. `spec.markets` is parsed into `B256` values
/// and passed to the user-events subscription; an empty list is passed
/// through as an empty list.
///
/// Only messages that `map_user_message` recognises are emitted; all other
/// messages are dropped from the stream.
///
/// # Errors
///
/// Returns [`SubscribeError::UserInput`] when a market id does not parse, and
/// [`SubscribeError::Transport`] when the client cannot be created,
/// authentication fails, or the subscription cannot be opened. Errors
/// arriving on the stream are forwarded as `Transport` items.
#[cfg(feature = "secure")]
pub fn subscribe_user(
    ws: &WebSocketClients,
    credentials: polymarket_client_sdk_v2::auth::Credentials,
    address: polymarket_client_sdk_v2::types::Address,
    spec: UserSubscription,
) -> Result<EventStream, SubscribeError> {
    // Collecting an empty list simply yields an empty Vec.
    let markets = spec
        .markets
        .iter()
        .map(|raw| {
            B256::from_str(raw).map_err(|e| {
                SubscribeError::UserInput(user_input(format!("invalid market: {e}")))
            })
        })
        .collect::<Result<Vec<_>, _>>()?;

    let user_clob =
        ClobWsClient::new(&ws.clob_ws_url, WsConfig::default()).map_err(map_ws_err)?;
    let authenticated = user_clob
        .authenticate(credentials, address)
        .map_err(map_ws_err)?;
    let user_events = authenticated
        .subscribe_user_events(markets)
        .map_err(map_ws_err)?;

    Ok(Box::pin(user_events.filter_map(|item| async move {
        match item {
            Err(error) => Some(Err(map_ws_err(error))),
            Ok(message) => map_user_message(message).map(Ok),
        }
    })))
}
318
319fn parse_token_ids(token_ids: &[String]) -> Result<Vec<U256>, SubscribeError> {
320    token_ids
321        .iter()
322        .map(|token_id| {
323            U256::from_str(token_id).map_err(|e| {
324                SubscribeError::UserInput(user_input(format!("invalid token_id: {e}")))
325            })
326        })
327        .collect()
328}
329
330fn map_ws_err(error: impl std::fmt::Display) -> SubscribeError {
331    SubscribeError::Transport(error.to_string())
332}
333
334fn map_market_book(
335    result: Result<
336        polymarket_client_sdk_v2::clob::ws::types::response::BookUpdate,
337        impl std::fmt::Display,
338    >,
339) -> Result<StreamEvent, SubscribeError> {
340    let book = result.map_err(map_ws_err)?;
341    Ok(StreamEvent::Market(MarketStreamEvent::OrderBook {
342        token_id: book.asset_id.to_string(),
343        market: book.market.to_string(),
344        timestamp: book.timestamp,
345        bid_levels: book.bids.len(),
346        ask_levels: book.asks.len(),
347    }))
348}
349
350fn map_market_price(
351    result: Result<
352        polymarket_client_sdk_v2::clob::ws::types::response::PriceChange,
353        impl std::fmt::Display,
354    >,
355) -> Result<StreamEvent, SubscribeError> {
356    let change = result.map_err(map_ws_err)?;
357    let entry =
358        change.price_changes.into_iter().next().ok_or_else(|| {
359            SubscribeError::Transport("price change event missing entries".into())
360        })?;
361    Ok(StreamEvent::Market(MarketStreamEvent::PriceChange {
362        token_id: entry.asset_id.to_string(),
363        market: change.market.to_string(),
364        price: entry.price.to_string(),
365        side: format!("{:?}", entry.side),
366    }))
367}
368
369fn map_market_last_trade(
370    result: Result<
371        polymarket_client_sdk_v2::clob::ws::types::response::LastTradePrice,
372        impl std::fmt::Display,
373    >,
374) -> Result<StreamEvent, SubscribeError> {
375    let trade = result.map_err(map_ws_err)?;
376    Ok(StreamEvent::Market(MarketStreamEvent::LastTradePrice {
377        token_id: trade.asset_id.to_string(),
378        market: trade.market.to_string(),
379        price: trade.price.to_string(),
380    }))
381}
382
383fn map_market_bba(
384    result: Result<
385        polymarket_client_sdk_v2::clob::ws::types::response::BestBidAsk,
386        impl std::fmt::Display,
387    >,
388) -> Result<StreamEvent, SubscribeError> {
389    let bba = result.map_err(map_ws_err)?;
390    Ok(StreamEvent::Market(MarketStreamEvent::BestBidAsk {
391        token_id: bba.asset_id.to_string(),
392        best_bid: bba.best_bid.to_string(),
393        best_ask: bba.best_ask.to_string(),
394    }))
395}
396
/// Converts a CLOB WebSocket message into a user stream event.
///
/// Order and trade messages are mapped; every other message kind yields
/// `None`. An order without a status is reported with status `"UNKNOWN"`.
/// Side and status are rendered with their `Debug` formatting.
#[cfg(feature = "secure")]
fn map_user_message(message: WsMessage) -> Option<StreamEvent> {
    let event = match message {
        WsMessage::Order(order) => UserStreamEvent::Order {
            order_id: order.id,
            token_id: order.asset_id.to_string(),
            side: format!("{:?}", order.side),
            status: match order.status {
                Some(status) => format!("{status:?}"),
                None => "UNKNOWN".into(),
            },
        },
        WsMessage::Trade(trade) => UserStreamEvent::Trade {
            trade_id: trade.id,
            token_id: trade.asset_id.to_string(),
            side: format!("{:?}", trade.side),
            price: trade.price.to_string(),
            size: trade.size.to_string(),
        },
        _ => return None,
    };
    Some(StreamEvent::User(event))
}
418
419fn parse_sports_event(value: &serde_json::Value) -> Option<SportsStreamEvent> {
420    Some(SportsStreamEvent {
421        game_id: value.get("gameId")?.as_i64()?,
422        league: value
423            .get("leagueAbbreviation")
424            .and_then(|v| v.as_str())
425            .unwrap_or("")
426            .to_string(),
427        status: value
428            .get("status")
429            .and_then(|v| v.as_str())
430            .unwrap_or("")
431            .to_string(),
432        score: value
433            .get("score")
434            .and_then(|v| v.as_str())
435            .unwrap_or("")
436            .to_string(),
437        live: value
438            .get("live")
439            .and_then(serde_json::Value::as_bool)
440            .unwrap_or(false),
441    })
442}
443
444fn ensure_rustls_crypto_provider() {
445    static INIT: std::sync::Once = std::sync::Once::new();
446    INIT.call_once(|| {
447        let _ = rustls::crypto::ring::default_provider().install_default();
448    });
449}