polymarket_client/subscriptions/
mod.rs1#![allow(clippy::unnecessary_wraps)]
4
5mod types;
6
7pub use types::{
8 CommentsSubscription, CryptoPricesSubscription, EquityPricesSubscription, MarketStreamEvent,
9 MarketSubscription, SportsStreamEvent, StreamEvent, SubscribeError, SubscriptionHandle,
10 SubscriptionSpec, UserStreamEvent, UserSubscription,
11};
12
13pub use types::merge_streams;
14
15use std::str::FromStr as _;
16
17use futures::StreamExt as _;
18use polymarket_client_sdk_v2::clob::ws::types::response::WsMessage;
19use polymarket_client_sdk_v2::clob::ws::Client as ClobWsClient;
20use polymarket_client_sdk_v2::rtds::Client as RtdsClient;
21use polymarket_client_sdk_v2::types::{B256, U256};
22use polymarket_client_sdk_v2::ws::config::Config as WsConfig;
23
24use crate::environment::Environment;
25use crate::error::user_input;
26use crate::public_client::PublicClient;
27use types::{EventStream, RtdsStreamEvent};
28
29#[derive(Clone)]
30pub struct WebSocketClients {
31 pub clob: ClobWsClient,
32 pub rtds: RtdsClient,
33 #[cfg(feature = "secure")]
34 pub clob_ws_url: String,
35 pub sports_url: String,
36}
37
38impl WebSocketClients {
39 pub fn new(environment: &Environment) -> Result<Self, SubscribeError> {
40 ensure_rustls_crypto_provider();
41 Ok(Self {
42 clob: ClobWsClient::new(environment.clob_ws, WsConfig::default())
43 .map_err(|e| SubscribeError::Transport(e.to_string()))?,
44 rtds: RtdsClient::new(environment.rtds_ws, WsConfig::default())
45 .map_err(|e| SubscribeError::Transport(e.to_string()))?,
46 #[cfg(feature = "secure")]
47 clob_ws_url: environment.clob_ws.to_string(),
48 sports_url: environment.sports_ws.to_string(),
49 })
50 }
51}
52
53impl PublicClient {
54 pub fn subscribe(
56 &self,
57 specs: Vec<SubscriptionSpec>,
58 ) -> Result<SubscriptionHandle, SubscribeError> {
59 if specs.is_empty() {
60 return Err(SubscribeError::UserInput(user_input(
61 "at least one subscription spec is required",
62 )));
63 }
64
65 let ws = &self.ws;
66
67 let mut streams = Vec::with_capacity(specs.len());
68 for spec in specs {
69 streams.push(subscribe_one(ws, spec)?);
70 }
71
72 Ok(SubscriptionHandle::new(merge_streams(streams), None))
73 }
74}
75
76pub fn subscribe_one(
77 ws: &WebSocketClients,
78 spec: SubscriptionSpec,
79) -> Result<EventStream, SubscribeError> {
80 match spec {
81 SubscriptionSpec::Market(market) => subscribe_market(ws, market),
82 SubscriptionSpec::Sports => subscribe_sports(ws),
83 SubscriptionSpec::Comments(comments) => subscribe_comments(ws, comments),
84 SubscriptionSpec::CryptoPricesBinance(crypto) => subscribe_crypto_binance(ws, crypto),
85 SubscriptionSpec::CryptoPricesChainlink(crypto) => subscribe_crypto_chainlink(ws, crypto),
86 SubscriptionSpec::EquityPrices(equity) => subscribe_equity(ws, equity),
87 SubscriptionSpec::User(_) => Err(SubscribeError::UserInput(user_input(
88 "user subscriptions require SecureClient",
89 ))),
90 }
91}
92
93fn subscribe_market(
94 ws: &WebSocketClients,
95 spec: MarketSubscription,
96) -> Result<EventStream, SubscribeError> {
97 if spec.token_ids.is_empty() {
98 return Err(SubscribeError::UserInput(user_input(
99 "market subscription requires at least one token_id",
100 )));
101 }
102
103 let asset_ids = parse_token_ids(&spec.token_ids)?;
104 let clob = ws.clob.clone();
105
106 let book = clob
107 .subscribe_orderbook(asset_ids.clone())
108 .map_err(map_ws_err)?;
109 let prices = clob
110 .subscribe_prices(asset_ids.clone())
111 .map_err(map_ws_err)?;
112 let trades = clob
113 .subscribe_last_trade_price(asset_ids.clone())
114 .map_err(map_ws_err)?;
115
116 let mut streams: Vec<EventStream> = vec![
117 Box::pin(book.map(map_market_book)),
118 Box::pin(prices.map(map_market_price)),
119 Box::pin(trades.map(map_market_last_trade)),
120 ];
121
122 if spec.custom_feature_enabled {
123 let bba = clob.subscribe_best_bid_ask(asset_ids).map_err(map_ws_err)?;
124 streams.push(Box::pin(bba.map(map_market_bba)));
125 }
126
127 Ok(merge_streams(streams))
128}
129
130fn subscribe_sports(ws: &WebSocketClients) -> Result<EventStream, SubscribeError> {
131 let url = ws.sports_url.clone();
132 Ok(Box::pin(async_stream::try_stream! {
133 use futures::SinkExt as _;
134 use tokio_tungstenite::connect_async;
135 use tokio_tungstenite::tungstenite::Message;
136
137 let (mut socket, _) = connect_async(&url)
138 .await
139 .map_err(|e| SubscribeError::Transport(e.to_string()))?;
140
141 while let Some(message) = socket.next().await {
142 let message = message.map_err(|e| SubscribeError::Transport(e.to_string()))?;
143 match message {
144 Message::Text(text) => {
145 if text == "ping" {
146 socket
147 .send(Message::Text("pong".into()))
148 .await
149 .map_err(|e| SubscribeError::Transport(e.to_string()))?;
150 continue;
151 }
152 if let Ok(value) = serde_json::from_str::<serde_json::Value>(&text) {
153 if let Some(event) = parse_sports_event(&value) {
154 yield StreamEvent::Sports(event);
155 }
156 }
157 }
158 Message::Close(_) => break,
159 _ => {}
160 }
161 }
162 }))
163}
164
165fn subscribe_comments(
166 ws: &WebSocketClients,
167 _spec: CommentsSubscription,
168) -> Result<EventStream, SubscribeError> {
169 let rtds = ws.rtds.clone();
170 Ok(Box::pin(async_stream::try_stream! {
171 let subscribed = match rtds.subscribe_comments(None) {
172 Ok(stream) => stream,
173 Err(error) => Err(map_ws_err(error))?,
174 };
175 let mut stream = std::pin::pin!(subscribed);
176 while let Some(result) = stream.next().await {
177 match result {
178 Ok(comment) => {
179 yield StreamEvent::Rtds(RtdsStreamEvent::Comment {
180 entity_id: comment.parent_entity_id.to_string(),
181 body: comment.body,
182 });
183 }
184 Err(error) => Err(map_ws_err(error))?,
185 }
186 }
187 }))
188}
189
190fn subscribe_crypto_binance(
191 ws: &WebSocketClients,
192 spec: CryptoPricesSubscription,
193) -> Result<EventStream, SubscribeError> {
194 let rtds = ws.rtds.clone();
195 let symbols = if spec.symbols.is_empty() {
196 None
197 } else {
198 Some(spec.symbols)
199 };
200 Ok(Box::pin(async_stream::try_stream! {
201 let subscribed = match rtds.subscribe_crypto_prices(symbols) {
202 Ok(stream) => stream,
203 Err(error) => Err(map_ws_err(error))?,
204 };
205 let mut stream = std::pin::pin!(subscribed);
206 while let Some(result) = stream.next().await {
207 match result {
208 Ok(price) => {
209 yield StreamEvent::Rtds(RtdsStreamEvent::CryptoPrice {
210 source: "binance".into(),
211 symbol: price.symbol,
212 price: price.value.to_string(),
213 });
214 }
215 Err(error) => Err(map_ws_err(error))?,
216 }
217 }
218 }))
219}
220
221fn subscribe_crypto_chainlink(
222 ws: &WebSocketClients,
223 spec: CryptoPricesSubscription,
224) -> Result<EventStream, SubscribeError> {
225 let rtds = ws.rtds.clone();
226 if spec.symbols.len() <= 1 {
227 let symbol = spec.symbols.into_iter().next();
228 return Ok(Box::pin(async_stream::try_stream! {
229 let subscribed = match rtds.subscribe_chainlink_prices(symbol) {
230 Ok(stream) => stream,
231 Err(error) => Err(map_ws_err(error))?,
232 };
233 let mut stream = std::pin::pin!(subscribed);
234 while let Some(result) = stream.next().await {
235 match result {
236 Ok(price) => {
237 yield StreamEvent::Rtds(RtdsStreamEvent::CryptoPrice {
238 source: "chainlink".into(),
239 symbol: price.symbol,
240 price: price.value.to_string(),
241 });
242 }
243 Err(error) => Err(map_ws_err(error))?,
244 }
245 }
246 }));
247 }
248
249 let mut streams: Vec<EventStream> = Vec::with_capacity(spec.symbols.len());
250 for symbol in spec.symbols {
251 let rtds = ws.rtds.clone();
252 streams.push(Box::pin(async_stream::try_stream! {
253 let subscribed = match rtds.subscribe_chainlink_prices(Some(symbol)) {
254 Ok(stream) => stream,
255 Err(error) => Err(map_ws_err(error))?,
256 };
257 let mut stream = std::pin::pin!(subscribed);
258 while let Some(result) = stream.next().await {
259 match result {
260 Ok(price) => {
261 yield StreamEvent::Rtds(RtdsStreamEvent::CryptoPrice {
262 source: "chainlink".into(),
263 symbol: price.symbol,
264 price: price.value.to_string(),
265 });
266 }
267 Err(error) => Err(map_ws_err(error))?,
268 }
269 }
270 }));
271 }
272 Ok(merge_streams(streams))
273}
274
275fn subscribe_equity(
276 _ws: &WebSocketClients,
277 _spec: EquityPricesSubscription,
278) -> Result<EventStream, SubscribeError> {
279 Err(SubscribeError::Transport(
280 "equity price subscriptions are not yet supported in the Rust SDK".into(),
281 ))
282}
283
284#[cfg(feature = "secure")]
285pub fn subscribe_user(
286 ws: &WebSocketClients,
287 credentials: polymarket_client_sdk_v2::auth::Credentials,
288 address: polymarket_client_sdk_v2::types::Address,
289 spec: UserSubscription,
290) -> Result<EventStream, SubscribeError> {
291 let markets = if spec.markets.is_empty() {
292 Vec::new()
293 } else {
294 spec.markets
295 .iter()
296 .map(|market| {
297 B256::from_str(market).map_err(|e| {
298 SubscribeError::UserInput(user_input(format!("invalid market: {e}")))
299 })
300 })
301 .collect::<Result<Vec<_>, _>>()?
302 };
303
304 let clob = ClobWsClient::new(&ws.clob_ws_url, WsConfig::default()).map_err(map_ws_err)?;
305 let auth = clob
306 .authenticate(credentials, address)
307 .map_err(map_ws_err)?;
308
309 let stream = auth.subscribe_user_events(markets).map_err(map_ws_err)?;
310
311 Ok(Box::pin(stream.filter_map(|result| async move {
312 match result {
313 Ok(message) => map_user_message(message).map(Ok),
314 Err(error) => Some(Err(map_ws_err(error))),
315 }
316 })))
317}
318
319fn parse_token_ids(token_ids: &[String]) -> Result<Vec<U256>, SubscribeError> {
320 token_ids
321 .iter()
322 .map(|token_id| {
323 U256::from_str(token_id).map_err(|e| {
324 SubscribeError::UserInput(user_input(format!("invalid token_id: {e}")))
325 })
326 })
327 .collect()
328}
329
330fn map_ws_err(error: impl std::fmt::Display) -> SubscribeError {
331 SubscribeError::Transport(error.to_string())
332}
333
334fn map_market_book(
335 result: Result<
336 polymarket_client_sdk_v2::clob::ws::types::response::BookUpdate,
337 impl std::fmt::Display,
338 >,
339) -> Result<StreamEvent, SubscribeError> {
340 let book = result.map_err(map_ws_err)?;
341 Ok(StreamEvent::Market(MarketStreamEvent::OrderBook {
342 token_id: book.asset_id.to_string(),
343 market: book.market.to_string(),
344 timestamp: book.timestamp,
345 bid_levels: book.bids.len(),
346 ask_levels: book.asks.len(),
347 }))
348}
349
350fn map_market_price(
351 result: Result<
352 polymarket_client_sdk_v2::clob::ws::types::response::PriceChange,
353 impl std::fmt::Display,
354 >,
355) -> Result<StreamEvent, SubscribeError> {
356 let change = result.map_err(map_ws_err)?;
357 let entry =
358 change.price_changes.into_iter().next().ok_or_else(|| {
359 SubscribeError::Transport("price change event missing entries".into())
360 })?;
361 Ok(StreamEvent::Market(MarketStreamEvent::PriceChange {
362 token_id: entry.asset_id.to_string(),
363 market: change.market.to_string(),
364 price: entry.price.to_string(),
365 side: format!("{:?}", entry.side),
366 }))
367}
368
369fn map_market_last_trade(
370 result: Result<
371 polymarket_client_sdk_v2::clob::ws::types::response::LastTradePrice,
372 impl std::fmt::Display,
373 >,
374) -> Result<StreamEvent, SubscribeError> {
375 let trade = result.map_err(map_ws_err)?;
376 Ok(StreamEvent::Market(MarketStreamEvent::LastTradePrice {
377 token_id: trade.asset_id.to_string(),
378 market: trade.market.to_string(),
379 price: trade.price.to_string(),
380 }))
381}
382
383fn map_market_bba(
384 result: Result<
385 polymarket_client_sdk_v2::clob::ws::types::response::BestBidAsk,
386 impl std::fmt::Display,
387 >,
388) -> Result<StreamEvent, SubscribeError> {
389 let bba = result.map_err(map_ws_err)?;
390 Ok(StreamEvent::Market(MarketStreamEvent::BestBidAsk {
391 token_id: bba.asset_id.to_string(),
392 best_bid: bba.best_bid.to_string(),
393 best_ask: bba.best_ask.to_string(),
394 }))
395}
396
397#[cfg(feature = "secure")]
398fn map_user_message(message: WsMessage) -> Option<StreamEvent> {
399 match message {
400 WsMessage::Order(order) => Some(StreamEvent::User(UserStreamEvent::Order {
401 order_id: order.id,
402 token_id: order.asset_id.to_string(),
403 side: format!("{:?}", order.side),
404 status: order
405 .status
406 .map_or_else(|| "UNKNOWN".into(), |status| format!("{status:?}")),
407 })),
408 WsMessage::Trade(trade) => Some(StreamEvent::User(UserStreamEvent::Trade {
409 trade_id: trade.id,
410 token_id: trade.asset_id.to_string(),
411 side: format!("{:?}", trade.side),
412 price: trade.price.to_string(),
413 size: trade.size.to_string(),
414 })),
415 _ => None,
416 }
417}
418
419fn parse_sports_event(value: &serde_json::Value) -> Option<SportsStreamEvent> {
420 Some(SportsStreamEvent {
421 game_id: value.get("gameId")?.as_i64()?,
422 league: value
423 .get("leagueAbbreviation")
424 .and_then(|v| v.as_str())
425 .unwrap_or("")
426 .to_string(),
427 status: value
428 .get("status")
429 .and_then(|v| v.as_str())
430 .unwrap_or("")
431 .to_string(),
432 score: value
433 .get("score")
434 .and_then(|v| v.as_str())
435 .unwrap_or("")
436 .to_string(),
437 live: value
438 .get("live")
439 .and_then(serde_json::Value::as_bool)
440 .unwrap_or(false),
441 })
442}
443
444fn ensure_rustls_crypto_provider() {
445 static INIT: std::sync::Once = std::sync::Once::new();
446 INIT.call_once(|| {
447 let _ = rustls::crypto::ring::default_provider().install_default();
448 });
449}