pub mod messages;
pub mod types;
use {
anyhow::{Context, Result},
futures_util::{SinkExt, StreamExt},
serde::{Deserialize, Serialize},
std::collections::HashMap,
tokio_tungstenite::{connect_async, tungstenite::Message},
};
#[cfg(feature = "tracing")]
use tracing::{error, warn};
pub use {
messages::{Auth, SubscribedMessage, SubscriptionMessage, UpdateSubscriptionMessage},
types::{ErrorMessage, OrderUpdate, OrderbookUpdate, PriceLevel, PriceUpdate, TradeUpdate},
};
/// WebSocket endpoint URL that `PolymarketWebSocket::connect_and_listen` connects to.
const WS_URL: &str = "wss://ws-subscriptions-clob.polymarket.com/ws/market";
/// A decoded message from the WebSocket feed, as handed to the `on_update`
/// callback of `PolymarketWebSocket::connect_and_listen`.
///
/// The enum is internally tagged for serde: the JSON `type` field selects the
/// variant and the rest of the object is decoded as that variant's payload.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum WebSocketMessage {
/// Payload of a message whose `type` is `"orderbook"`.
#[serde(rename = "orderbook")]
Orderbook(OrderbookUpdate),
/// Payload of a message whose `type` is `"trade"`.
#[serde(rename = "trade")]
Trade(TradeUpdate),
/// Payload of a message whose `type` is `"order"`.
#[serde(rename = "order")]
Order(OrderUpdate),
/// Payload of a message whose `type` is `"price"`.
#[serde(rename = "price")]
Price(PriceUpdate),
/// Payload of a message whose `type` is `"error"`.
#[serde(rename = "error")]
Error(ErrorMessage),
/// Payload of a message whose `type` is `"subscribed"`.
#[serde(rename = "subscribed")]
Subscribed(SubscribedMessage),
/// Catch-all selected when `type` matches none of the variants above;
/// carries no payload.
#[serde(other)]
Unknown,
}
/// Client state for the WebSocket feed: the asset ids to subscribe to plus a
/// caller-maintained cache of market info.
pub struct PolymarketWebSocket {
/// Asset ids sent in the subscription message by `connect_and_listen`.
pub(crate) asset_ids: Vec<String>,
/// Market info keyed by asset id; written by `update_market_info` and read
/// by `get_market_info`.
market_info_cache: HashMap<String, crate::gamma::MarketInfo>,
}
impl PolymarketWebSocket {
pub fn new(asset_ids: Vec<String>) -> Self {
Self {
asset_ids,
market_info_cache: HashMap::new(),
}
}
pub async fn connect_and_listen<F>(&mut self, mut on_update: F) -> Result<()>
where
F: FnMut(WebSocketMessage) + Send,
{
let (ws_stream, _) = connect_async(WS_URL)
.await
.context("Failed to connect to WebSocket")?;
let (mut write, mut read) = ws_stream.split();
let subscribe_msg = SubscriptionMessage {
auth: None, markets: None,
assets_ids: Some(self.asset_ids.clone()),
channel_type: "market".to_string(), custom_feature_enabled: None,
};
let subscribe_json = serde_json::to_string(&subscribe_msg)?;
write
.send(Message::Text(subscribe_json))
.await
.context("Failed to send subscription message")?;
while let Some(msg) = read.next().await {
match msg {
Ok(Message::Text(text)) => {
if let Ok(ws_msg) = serde_json::from_str::<WebSocketMessage>(&text) {
on_update(ws_msg);
} else if let Ok(subscribed) = serde_json::from_str::<SubscribedMessage>(&text)
{
on_update(WebSocketMessage::Subscribed(subscribed));
} else if let Ok(err) = serde_json::from_str::<ErrorMessage>(&text) {
on_update(WebSocketMessage::Error(err));
} else {
if let Ok(json) = serde_json::from_str::<serde_json::Value>(&text)
&& let Some(msg_type) = json.get("type").and_then(|v| v.as_str())
{
match msg_type {
"orderbook" => {
if let Ok(update) =
serde_json::from_value::<OrderbookUpdate>(json)
{
on_update(WebSocketMessage::Orderbook(update));
}
},
"trade" => {
if let Ok(update) = serde_json::from_value::<TradeUpdate>(json)
{
on_update(WebSocketMessage::Trade(update));
}
},
"order" => {
if let Ok(update) = serde_json::from_value::<OrderUpdate>(json)
{
on_update(WebSocketMessage::Order(update));
}
},
"price" => {
if let Ok(update) = serde_json::from_value::<PriceUpdate>(json)
{
on_update(WebSocketMessage::Price(update));
}
},
_ => {
#[cfg(feature = "tracing")]
warn!("Unknown message type: {}", text);
#[cfg(not(feature = "tracing"))]
eprintln!("Unknown message type: {}", text);
},
}
}
}
},
Ok(Message::Ping(data)) => {
if let Err(e) = write.send(Message::Pong(data)).await {
#[cfg(feature = "tracing")]
error!("Failed to send pong: {}", e);
#[cfg(not(feature = "tracing"))]
eprintln!("Failed to send pong: {}", e);
break;
}
},
Ok(Message::Close(_)) => {
break;
},
Err(e) => {
#[cfg(feature = "tracing")]
error!("WebSocket error: {}", e);
#[cfg(not(feature = "tracing"))]
eprintln!("WebSocket error: {}", e);
break;
},
_ => {},
}
}
Ok(())
}
pub fn update_market_info(&mut self, asset_id: String, info: crate::gamma::MarketInfo) {
self.market_info_cache.insert(asset_id, info);
}
pub fn get_market_info(&self, asset_id: &str) -> Option<&crate::gamma::MarketInfo> {
self.market_info_cache.get(asset_id)
}
}