polymarket_api/
rtds.rs

1use {
2    crate::error::{PolymarketError, Result},
3    futures_util::{SinkExt, StreamExt},
4    serde::{Deserialize, Serialize},
5    std::sync::Arc,
6    tokio::sync::Mutex,
7    tokio_tungstenite::{connect_async, tungstenite::Message},
8};
9
10#[cfg(feature = "tracing")]
11use tracing::{debug, error, warn};
12
/// WebSocket endpoint that `RTDSClient::connect_and_listen` connects to.
const RTDS_WS_URL: &str = "wss://ws-live-data.polymarket.com/";
14
15#[derive(Debug, Clone, Serialize, Deserialize)]
16pub struct RTDSSubscription {
17    pub action: String, // "subscribe" or "unsubscribe"
18    pub subscriptions: Vec<SubscriptionTopic>,
19}
20
21#[derive(Debug, Clone, Serialize, Deserialize)]
22pub struct SubscriptionTopic {
23    pub topic: String, // "activity", "comments", etc.
24    #[serde(rename = "type")]
25    pub topic_type: String, // "*", "orders_matched", etc.
26    pub filters: String, // JSON string with filters
27    #[serde(skip_serializing_if = "Option::is_none")]
28    pub clob_auth: Option<ClobAuth>,
29    #[serde(skip_serializing_if = "Option::is_none")]
30    pub gamma_auth: Option<GammaAuth>,
31}
32
33#[derive(Debug, Clone, Serialize, Deserialize)]
34pub struct ClobAuth {
35    pub key: String,
36    pub secret: String,
37    pub passphrase: String,
38}
39
40#[derive(Debug, Clone, Serialize, Deserialize)]
41pub struct GammaAuth {
42    pub address: String,
43}
44
45#[derive(Debug, Clone, Serialize, Deserialize)]
46pub struct RTDSMessage {
47    #[serde(rename = "connection_id")]
48    pub connection_id: Option<String>,
49    pub payload: ActivityPayload,
50    pub timestamp: i64,
51    pub topic: String,
52    #[serde(rename = "type")]
53    pub message_type: String,
54}
55
56#[derive(Debug, Clone, Serialize, Deserialize)]
57pub struct ActivityPayload {
58    pub asset: String,
59    pub side: String, // "BUY" or "SELL"
60    pub price: f64,
61    pub size: f64,
62    pub timestamp: i64,
63    pub title: String,
64    pub slug: String,
65    #[serde(rename = "eventSlug")]
66    pub event_slug: String,
67    pub outcome: String, // "Yes" or "No"
68    #[serde(rename = "outcomeIndex")]
69    pub outcome_index: i32,
70    pub name: String,
71    pub pseudonym: String,
72    #[serde(rename = "proxyWallet")]
73    pub proxy_wallet: String,
74    #[serde(rename = "transactionHash")]
75    pub transaction_hash: String,
76    #[serde(rename = "conditionId")]
77    pub condition_id: Option<String>,
78    pub bio: Option<String>,
79    pub icon: Option<String>,
80    pub profile_image: Option<String>,
81}
82
/// Client for the RTDS WebSocket feed at `RTDS_WS_URL`.
///
/// Configure it with the `with_*` builder methods, then call
/// `connect_and_listen` to subscribe and receive messages.
pub struct RTDSClient {
    /// Event slug used as the `event_slug` filter of the activity subscription.
    event_slug: Option<String>,
    /// Event id used as the `parentEntityID` filter of the comments subscription.
    event_id: Option<u64>,
    /// CLOB credentials. Stored by the constructor/builder; the subscriptions
    /// built by `connect_and_listen` currently send `clob_auth: None`.
    clob_auth: Option<ClobAuth>,
    /// Gamma credentials, attached to every subscription that is sent.
    gamma_auth: Option<GammaAuth>,
}
89
90impl RTDSClient {
91    pub fn new() -> Self {
92        // Try to load authentication from environment variables
93        // Note: Activity subscriptions (orders_matched) typically don't require auth
94        // Only include auth if explicitly needed for protected subscriptions
95        let clob_auth = match (
96            std::env::var("api_key"),
97            std::env::var("secret"),
98            std::env::var("passphrase"),
99        ) {
100            (Ok(key), Ok(secret), Ok(passphrase)) => {
101                // Validate secret length (must be 44 characters as per API)
102                if secret.len() != 44 {
103                    #[cfg(feature = "tracing")]
104                    tracing::warn!(
105                        "CLOB secret length is {} (expected 44). Authentication may fail.",
106                        secret.len()
107                    );
108                    #[cfg(not(feature = "tracing"))]
109                    eprintln!(
110                        "Warning: CLOB secret length is {} (expected 44). Authentication may fail.",
111                        secret.len()
112                    );
113                }
114                Some(ClobAuth {
115                    key,
116                    secret,
117                    passphrase,
118                })
119            },
120            _ => None,
121        };
122
123        let gamma_auth = std::env::var("gamma_address")
124            .ok()
125            .map(|address| GammaAuth { address });
126
127        Self {
128            event_slug: None,
129            event_id: None,
130            clob_auth,
131            gamma_auth,
132        }
133    }
134
135    pub fn with_event_slug(mut self, event_slug: String) -> Self {
136        self.event_slug = Some(event_slug);
137        self
138    }
139
140    pub fn with_event_id(mut self, event_id: u64) -> Self {
141        self.event_id = Some(event_id);
142        self
143    }
144
145    pub fn with_clob_auth(mut self, key: String, secret: String, passphrase: String) -> Self {
146        self.clob_auth = Some(ClobAuth {
147            key,
148            secret,
149            passphrase,
150        });
151        self
152    }
153
154    pub fn with_gamma_auth(mut self, address: String) -> Self {
155        self.gamma_auth = Some(GammaAuth { address });
156        self
157    }
158
159    pub async fn connect_and_listen<F>(&self, mut on_update: F) -> Result<()>
160    where
161        F: FnMut(RTDSMessage) + Send,
162    {
163        #[cfg(feature = "tracing")]
164        debug!("Connecting to RTDS WebSocket: {}", RTDS_WS_URL);
165
166        let (ws_stream, _) = connect_async(RTDS_WS_URL).await.map_err(|e| {
167            PolymarketError::WebSocket(format!("Failed to connect to RTDS WebSocket: {}", e))
168        })?;
169
170        #[cfg(feature = "tracing")]
171        debug!("Connected to RTDS WebSocket");
172
173        let (write, mut read) = ws_stream.split();
174        let write = Arc::new(Mutex::new(write));
175
176        // Build subscription message
177        let mut subscriptions = Vec::new();
178
179        // Subscribe to activity (trades/orders_matched)
180        // Note: Activity subscriptions are public and typically don't require CLOB auth
181        // Only include auth if explicitly needed (some endpoints may require it)
182        if let Some(ref event_slug) = self.event_slug {
183            let filters = serde_json::json!({
184                "event_slug": event_slug
185            });
186            subscriptions.push(SubscriptionTopic {
187                topic: "activity".to_string(),
188                topic_type: "orders_matched".to_string(),
189                filters: serde_json::to_string(&filters).map_err(PolymarketError::Serialization)?,
190                clob_auth: None, // Activity subscriptions don't require CLOB auth (public data)
191                gamma_auth: self.gamma_auth.clone(),
192            });
193        }
194
195        // Subscribe to comments if event_id is provided
196        if let Some(event_id) = self.event_id {
197            let filters = serde_json::json!({
198                "parentEntityID": event_id,
199                "parentEntityType": "Event"
200            });
201            subscriptions.push(SubscriptionTopic {
202                topic: "comments".to_string(),
203                topic_type: "*".to_string(),
204                filters: serde_json::to_string(&filters).map_err(PolymarketError::Serialization)?,
205                clob_auth: None,                     // Comments don't need CLOB auth
206                gamma_auth: self.gamma_auth.clone(), // Comments might need gamma auth
207            });
208        }
209
210        if subscriptions.is_empty() {
211            return Err(crate::error::PolymarketError::InvalidData(
212                "No subscriptions configured. Provide event_slug or event_id.".to_string(),
213            ));
214        }
215
216        let subscribe_msg = RTDSSubscription {
217            action: "subscribe".to_string(),
218            subscriptions,
219        };
220
221        let subscribe_json =
222            serde_json::to_string(&subscribe_msg).map_err(PolymarketError::Serialization)?;
223
224        #[cfg(feature = "tracing")]
225        debug!("Sending RTDS subscription: {}", subscribe_json);
226
227        {
228            let mut w = write.lock().await;
229            w.send(Message::Text(subscribe_json.clone()))
230                .await
231                .map_err(|e| {
232                    PolymarketError::WebSocket(format!(
233                        "Failed to send RTDS subscription message: {}",
234                        e
235                    ))
236                })?;
237        }
238
239        #[cfg(feature = "tracing")]
240        debug!("RTDS subscription sent successfully");
241
242        // Start PING task (send PING every 5 seconds as per RTDS docs)
243        let write_ping = Arc::clone(&write);
244        let ping_handle = tokio::spawn(async move {
245            let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(5));
246            loop {
247                interval.tick().await;
248                let mut w = write_ping.lock().await;
249                if let Err(e) = w.send(Message::Text("PING".to_string())).await {
250                    #[cfg(feature = "tracing")]
251                    error!("Failed to send PING: {}", e);
252                    #[cfg(not(feature = "tracing"))]
253                    eprintln!("Failed to send PING: {}", e);
254                    break;
255                }
256            }
257        });
258
259        // Listen for messages
260        while let Some(msg) = read.next().await {
261            match msg {
262                Ok(Message::Text(text)) => {
263                    // Skip empty messages
264                    if text.trim().is_empty() {
265                        continue;
266                    }
267
268                    // Try to parse as RTDS message
269                    if let Ok(rtds_msg) = serde_json::from_str::<RTDSMessage>(&text) {
270                        #[cfg(feature = "tracing")]
271                        debug!(
272                            "Received RTDS message: topic={}, type={}",
273                            rtds_msg.topic, rtds_msg.message_type
274                        );
275                        on_update(rtds_msg);
276                        continue; // Successfully handled, move to next message
277                    }
278
279                    // Handle PING/PONG
280                    if text.as_str() == "PING" {
281                        // Respond to ping (though we're the ones sending PING, server might send it too)
282                        let mut w = write.lock().await;
283                        if let Err(e) = w.send(Message::Text("PONG".to_string())).await {
284                            #[cfg(feature = "tracing")]
285                            error!("Failed to send PONG: {}", e);
286                            #[cfg(not(feature = "tracing"))]
287                            eprintln!("Failed to send PONG: {}", e);
288                            break;
289                        }
290                    } else if text == "PONG" {
291                        // Server responded to our PING, just continue silently
292                        continue;
293                    } else {
294                        // Try to parse as error message
295                        if let Ok(error_json) = serde_json::from_str::<serde_json::Value>(&text)
296                            && let Some(body) = error_json.get("body")
297                            && let Some(message) = body.get("message").and_then(|m| m.as_str())
298                        {
299                            #[cfg(feature = "tracing")]
300                            {
301                                error!("RTDS error: {}", message);
302                            }
303                            #[cfg(not(feature = "tracing"))]
304                            {
305                                eprintln!("RTDS error: {}", message);
306                            }
307                            // If it's an authentication error, break the connection
308                            if message.contains("validation") || message.contains("auth") {
309                                break;
310                            }
311                            continue;
312                        }
313
314                        // If we get here, it's a truly unknown message format
315                        #[cfg(feature = "tracing")]
316                        {
317                            warn!(
318                                "Unknown RTDS message format: {}",
319                                if text.len() > 200 {
320                                    &text[..200]
321                                } else {
322                                    &text
323                                }
324                            );
325                        }
326                        #[cfg(not(feature = "tracing"))]
327                        {
328                            eprintln!(
329                                "Unknown RTDS message format: {}",
330                                if text.len() > 200 {
331                                    &text[..200]
332                                } else {
333                                    &text
334                                }
335                            );
336                        }
337                    }
338                },
339                Ok(Message::Ping(data)) => {
340                    // Respond to ping with pong
341                    let mut w = write.lock().await;
342                    if let Err(e) = w.send(Message::Pong(data)).await {
343                        #[cfg(feature = "tracing")]
344                        error!("Failed to send pong: {}", e);
345                        #[cfg(not(feature = "tracing"))]
346                        eprintln!("Failed to send pong: {}", e);
347                        break;
348                    }
349                },
350                Ok(Message::Close(_)) => {
351                    break;
352                },
353                Err(e) => {
354                    #[cfg(feature = "tracing")]
355                    error!("RTDS WebSocket error: {}", e);
356                    #[cfg(not(feature = "tracing"))]
357                    eprintln!("RTDS WebSocket error: {}", e);
358                    break;
359                },
360                _ => {},
361            }
362        }
363
364        // Cancel PING task
365        ping_handle.abort();
366
367        Ok(())
368    }
369}
370
371impl Default for RTDSClient {
372    fn default() -> Self {
373        Self::new()
374    }
375}