1use {
2 crate::error::{PolymarketError, Result},
3 futures_util::{SinkExt, StreamExt},
4 serde::{Deserialize, Serialize},
5 std::sync::Arc,
6 tokio::sync::Mutex,
7 tokio_tungstenite::{connect_async, tungstenite::Message},
8};
9
10#[cfg(feature = "tracing")]
11use tracing::{debug, error, warn};
12
13const RTDS_WS_URL: &str = "wss://ws-live-data.polymarket.com/";
14
15#[derive(Debug, Clone, Serialize, Deserialize)]
16pub struct RTDSSubscription {
17 pub action: String, pub subscriptions: Vec<SubscriptionTopic>,
19}
20
21#[derive(Debug, Clone, Serialize, Deserialize)]
22pub struct SubscriptionTopic {
23 pub topic: String, #[serde(rename = "type")]
25 pub topic_type: String, pub filters: String, #[serde(skip_serializing_if = "Option::is_none")]
28 pub clob_auth: Option<ClobAuth>,
29 #[serde(skip_serializing_if = "Option::is_none")]
30 pub gamma_auth: Option<GammaAuth>,
31}
32
33#[derive(Debug, Clone, Serialize, Deserialize)]
34pub struct ClobAuth {
35 pub key: String,
36 pub secret: String,
37 pub passphrase: String,
38}
39
40#[derive(Debug, Clone, Serialize, Deserialize)]
41pub struct GammaAuth {
42 pub address: String,
43}
44
45#[derive(Debug, Clone, Serialize, Deserialize)]
46pub struct RTDSMessage {
47 #[serde(rename = "connection_id")]
48 pub connection_id: Option<String>,
49 pub payload: ActivityPayload,
50 pub timestamp: i64,
51 pub topic: String,
52 #[serde(rename = "type")]
53 pub message_type: String,
54}
55
56#[derive(Debug, Clone, Serialize, Deserialize)]
57pub struct ActivityPayload {
58 pub asset: String,
59 pub side: String, pub price: f64,
61 pub size: f64,
62 pub timestamp: i64,
63 pub title: String,
64 pub slug: String,
65 #[serde(rename = "eventSlug")]
66 pub event_slug: String,
67 pub outcome: String, #[serde(rename = "outcomeIndex")]
69 pub outcome_index: i32,
70 pub name: String,
71 pub pseudonym: String,
72 #[serde(rename = "proxyWallet")]
73 pub proxy_wallet: String,
74 #[serde(rename = "transactionHash")]
75 pub transaction_hash: String,
76 #[serde(rename = "conditionId")]
77 pub condition_id: Option<String>,
78 pub bio: Option<String>,
79 pub icon: Option<String>,
80 pub profile_image: Option<String>,
81}
82
83pub struct RTDSClient {
84 event_slug: Option<String>,
85 event_id: Option<u64>,
86 clob_auth: Option<ClobAuth>,
87 gamma_auth: Option<GammaAuth>,
88}
89
90impl RTDSClient {
91 pub fn new() -> Self {
92 let clob_auth = match (
96 std::env::var("api_key"),
97 std::env::var("secret"),
98 std::env::var("passphrase"),
99 ) {
100 (Ok(key), Ok(secret), Ok(passphrase)) => {
101 if secret.len() != 44 {
103 #[cfg(feature = "tracing")]
104 tracing::warn!(
105 "CLOB secret length is {} (expected 44). Authentication may fail.",
106 secret.len()
107 );
108 #[cfg(not(feature = "tracing"))]
109 eprintln!(
110 "Warning: CLOB secret length is {} (expected 44). Authentication may fail.",
111 secret.len()
112 );
113 }
114 Some(ClobAuth {
115 key,
116 secret,
117 passphrase,
118 })
119 },
120 _ => None,
121 };
122
123 let gamma_auth = std::env::var("gamma_address")
124 .ok()
125 .map(|address| GammaAuth { address });
126
127 Self {
128 event_slug: None,
129 event_id: None,
130 clob_auth,
131 gamma_auth,
132 }
133 }
134
135 pub fn with_event_slug(mut self, event_slug: String) -> Self {
136 self.event_slug = Some(event_slug);
137 self
138 }
139
140 pub fn with_event_id(mut self, event_id: u64) -> Self {
141 self.event_id = Some(event_id);
142 self
143 }
144
145 pub fn with_clob_auth(mut self, key: String, secret: String, passphrase: String) -> Self {
146 self.clob_auth = Some(ClobAuth {
147 key,
148 secret,
149 passphrase,
150 });
151 self
152 }
153
154 pub fn with_gamma_auth(mut self, address: String) -> Self {
155 self.gamma_auth = Some(GammaAuth { address });
156 self
157 }
158
159 pub async fn connect_and_listen<F>(&self, mut on_update: F) -> Result<()>
160 where
161 F: FnMut(RTDSMessage) + Send,
162 {
163 #[cfg(feature = "tracing")]
164 debug!("Connecting to RTDS WebSocket: {}", RTDS_WS_URL);
165
166 let (ws_stream, _) = connect_async(RTDS_WS_URL).await.map_err(|e| {
167 PolymarketError::WebSocket(format!("Failed to connect to RTDS WebSocket: {}", e))
168 })?;
169
170 #[cfg(feature = "tracing")]
171 debug!("Connected to RTDS WebSocket");
172
173 let (write, mut read) = ws_stream.split();
174 let write = Arc::new(Mutex::new(write));
175
176 let mut subscriptions = Vec::new();
178
179 if let Some(ref event_slug) = self.event_slug {
183 let filters = serde_json::json!({
184 "event_slug": event_slug
185 });
186 subscriptions.push(SubscriptionTopic {
187 topic: "activity".to_string(),
188 topic_type: "orders_matched".to_string(),
189 filters: serde_json::to_string(&filters).map_err(PolymarketError::Serialization)?,
190 clob_auth: None, gamma_auth: self.gamma_auth.clone(),
192 });
193 }
194
195 if let Some(event_id) = self.event_id {
197 let filters = serde_json::json!({
198 "parentEntityID": event_id,
199 "parentEntityType": "Event"
200 });
201 subscriptions.push(SubscriptionTopic {
202 topic: "comments".to_string(),
203 topic_type: "*".to_string(),
204 filters: serde_json::to_string(&filters).map_err(PolymarketError::Serialization)?,
205 clob_auth: None, gamma_auth: self.gamma_auth.clone(), });
208 }
209
210 if subscriptions.is_empty() {
211 return Err(crate::error::PolymarketError::InvalidData(
212 "No subscriptions configured. Provide event_slug or event_id.".to_string(),
213 ));
214 }
215
216 let subscribe_msg = RTDSSubscription {
217 action: "subscribe".to_string(),
218 subscriptions,
219 };
220
221 let subscribe_json =
222 serde_json::to_string(&subscribe_msg).map_err(PolymarketError::Serialization)?;
223
224 #[cfg(feature = "tracing")]
225 debug!("Sending RTDS subscription: {}", subscribe_json);
226
227 {
228 let mut w = write.lock().await;
229 w.send(Message::Text(subscribe_json.clone()))
230 .await
231 .map_err(|e| {
232 PolymarketError::WebSocket(format!(
233 "Failed to send RTDS subscription message: {}",
234 e
235 ))
236 })?;
237 }
238
239 #[cfg(feature = "tracing")]
240 debug!("RTDS subscription sent successfully");
241
242 let write_ping = Arc::clone(&write);
244 let ping_handle = tokio::spawn(async move {
245 let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(5));
246 loop {
247 interval.tick().await;
248 let mut w = write_ping.lock().await;
249 if let Err(e) = w.send(Message::Text("PING".to_string())).await {
250 #[cfg(feature = "tracing")]
251 error!("Failed to send PING: {}", e);
252 #[cfg(not(feature = "tracing"))]
253 eprintln!("Failed to send PING: {}", e);
254 break;
255 }
256 }
257 });
258
259 while let Some(msg) = read.next().await {
261 match msg {
262 Ok(Message::Text(text)) => {
263 if text.trim().is_empty() {
265 continue;
266 }
267
268 if let Ok(rtds_msg) = serde_json::from_str::<RTDSMessage>(&text) {
270 #[cfg(feature = "tracing")]
271 debug!(
272 "Received RTDS message: topic={}, type={}",
273 rtds_msg.topic, rtds_msg.message_type
274 );
275 on_update(rtds_msg);
276 continue; }
278
279 if text.as_str() == "PING" {
281 let mut w = write.lock().await;
283 if let Err(e) = w.send(Message::Text("PONG".to_string())).await {
284 #[cfg(feature = "tracing")]
285 error!("Failed to send PONG: {}", e);
286 #[cfg(not(feature = "tracing"))]
287 eprintln!("Failed to send PONG: {}", e);
288 break;
289 }
290 } else if text == "PONG" {
291 continue;
293 } else {
294 if let Ok(error_json) = serde_json::from_str::<serde_json::Value>(&text)
296 && let Some(body) = error_json.get("body")
297 && let Some(message) = body.get("message").and_then(|m| m.as_str())
298 {
299 #[cfg(feature = "tracing")]
300 {
301 error!("RTDS error: {}", message);
302 }
303 #[cfg(not(feature = "tracing"))]
304 {
305 eprintln!("RTDS error: {}", message);
306 }
307 if message.contains("validation") || message.contains("auth") {
309 break;
310 }
311 continue;
312 }
313
314 #[cfg(feature = "tracing")]
316 {
317 warn!(
318 "Unknown RTDS message format: {}",
319 if text.len() > 200 {
320 &text[..200]
321 } else {
322 &text
323 }
324 );
325 }
326 #[cfg(not(feature = "tracing"))]
327 {
328 eprintln!(
329 "Unknown RTDS message format: {}",
330 if text.len() > 200 {
331 &text[..200]
332 } else {
333 &text
334 }
335 );
336 }
337 }
338 },
339 Ok(Message::Ping(data)) => {
340 let mut w = write.lock().await;
342 if let Err(e) = w.send(Message::Pong(data)).await {
343 #[cfg(feature = "tracing")]
344 error!("Failed to send pong: {}", e);
345 #[cfg(not(feature = "tracing"))]
346 eprintln!("Failed to send pong: {}", e);
347 break;
348 }
349 },
350 Ok(Message::Close(_)) => {
351 break;
352 },
353 Err(e) => {
354 #[cfg(feature = "tracing")]
355 error!("RTDS WebSocket error: {}", e);
356 #[cfg(not(feature = "tracing"))]
357 eprintln!("RTDS WebSocket error: {}", e);
358 break;
359 },
360 _ => {},
361 }
362 }
363
364 ping_handle.abort();
366
367 Ok(())
368 }
369}
370
371impl Default for RTDSClient {
372 fn default() -> Self {
373 Self::new()
374 }
375}