polymarket_rs/websocket/
user.rs

1use futures_util::{SinkExt, Stream, StreamExt};
2use std::pin::Pin;
3use tokio_tungstenite::{connect_async, tungstenite::Message};
4
5use crate::error::{Error, Result};
6use crate::types::{ApiCreds, UserAuthentication, UserWsEvent};
7
/// Streaming client for the authenticated Polymarket CLOB user channel.
///
/// A `UserWsClient` only stores the endpoint URL; every call to
/// [`UserWsClient::subscribe`] or [`UserWsClient::subscribe_with_creds`] opens a
/// fresh WebSocket connection and returns a stream of the user's trade and
/// order updates.
///
/// # Connection Management
///
/// The Polymarket WebSocket server drops idle connections after roughly 1-2 minutes.
/// In production, wrap the subscription in
/// [`ReconnectingStream`](crate::websocket::ReconnectingStream) so that
/// disconnections are handled automatically with exponential backoff.
///
/// # Example with Auto-Reconnect
///
/// ```no_run
/// use polymarket_rs::websocket::{UserWsClient, ReconnectConfig, ReconnectingStream};
/// use polymarket_rs::types::ApiCreds;
/// use futures_util::StreamExt;
/// use std::time::Duration;
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
///     let creds = ApiCreds::new(
///         "api_key".to_string(),
///         "api_secret".to_string(),
///         "api_passphrase".to_string(),
///     );
///
///     let client = UserWsClient::new();
///
///     let config = ReconnectConfig {
///         initial_delay: Duration::from_secs(1),
///         max_delay: Duration::from_secs(30),
///         multiplier: 2.0,
///         max_attempts: None,
///     };
///
///     let creds_clone = creds.clone();
///     let mut stream = ReconnectingStream::new(config, move || {
///         let client = client.clone();
///         let creds = creds_clone.clone();
///         async move { client.subscribe_with_creds(&creds).await }
///     });
///
///     while let Some(event) = stream.next().await {
///         match event {
///             Ok(evt) => println!("Event: {:?}", evt),
///             Err(_) => continue, // Will auto-reconnect
///         }
///     }
///     Ok(())
/// }
/// ```
#[derive(Clone, Debug)]
pub struct UserWsClient {
    /// Endpoint URL that each subscription connects to.
    ws_url: String,
}
64
65impl UserWsClient {
66    /// Default WebSocket URL for user events
67    const DEFAULT_WS_URL: &'static str = "wss://ws-subscriptions-clob.polymarket.com/ws/user";
68
69    /// Create a new user WebSocket client with the default endpoint
70    pub fn new() -> Self {
71        Self {
72            ws_url: Self::DEFAULT_WS_URL.to_string(),
73        }
74    }
75
76    /// Create a new user WebSocket client with a custom endpoint
77    pub fn with_url(ws_url: impl Into<String>) -> Self {
78        Self {
79            ws_url: ws_url.into(),
80        }
81    }
82
83    /// Subscribe to user events with API credentials
84    ///
85    /// Returns a stream of [`UserWsEvent`] items. The stream will yield events as they
86    /// are received from the WebSocket connection.
87    ///
88    /// # Arguments
89    ///
90    /// * `creds` - API credentials (api_key, secret, passphrase)
91    ///
92    /// # Events
93    ///
94    /// The stream will yield two types of events:
95    /// - [`UserWsEvent::Trade`]: Trade execution updates
96    /// - [`UserWsEvent::Order`]: Order status updates
97    ///
98    /// # Errors
99    ///
100    /// Returns an error if:
101    /// - The WebSocket connection fails
102    /// - The authentication message cannot be sent
103    /// - Authentication fails (server will close the connection)
104    ///
105    /// # Example
106    ///
107    /// ```no_run
108    /// # use polymarket_rs::websocket::UserWsClient;
109    /// # use polymarket_rs::types::ApiCreds;
110    /// # use futures_util::StreamExt;
111    /// # #[tokio::main]
112    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
113    /// let creds = ApiCreds::new(
114    ///     "your_api_key".to_string(),
115    ///     "your_api_secret".to_string(),
116    ///     "your_api_passphrase".to_string(),
117    /// );
118    ///
119    /// let client = UserWsClient::new();
120    /// let mut stream = client.subscribe_with_creds(&creds).await?;
121    ///
122    /// while let Some(event) = stream.next().await {
123    ///     println!("Event: {:?}", event?);
124    /// }
125    /// # Ok(())
126    /// # }
127    /// ```
128    pub async fn subscribe_with_creds(
129        &self,
130        creds: &ApiCreds,
131    ) -> Result<Pin<Box<dyn Stream<Item = Result<UserWsEvent>> + Send>>> {
132        self.subscribe(
133            creds.api_key.clone(),
134            creds.secret.clone(),
135            creds.passphrase.clone(),
136        )
137        .await
138    }
139
140    /// Subscribe to user events with authentication
141    ///
142    /// Returns a stream of [`UserWsEvent`] items. The stream will yield events as they
143    /// are received from the WebSocket connection.
144    ///
145    /// # Arguments
146    ///
147    /// * `api_key` - API key for authentication
148    /// * `api_secret` - API secret for authentication
149    /// * `api_passphrase` - API passphrase for authentication
150    ///
151    /// # Events
152    ///
153    /// The stream will yield two types of events:
154    /// - [`UserWsEvent::Trade`]: Trade execution updates
155    /// - [`UserWsEvent::Order`]: Order status updates
156    ///
157    /// # Errors
158    ///
159    /// Returns an error if:
160    /// - The WebSocket connection fails
161    /// - The authentication message cannot be sent
162    /// - Authentication fails (server will close the connection)
163    pub async fn subscribe(
164        &self,
165        api_key: String,
166        api_secret: String,
167        api_passphrase: String,
168    ) -> Result<Pin<Box<dyn Stream<Item = Result<UserWsEvent>> + Send>>> {
169        // Connect to the WebSocket endpoint
170        let (ws_stream, _) = connect_async(&self.ws_url).await?;
171
172        let (mut write, read) = ws_stream.split();
173
174        // Create authentication message
175        let auth = UserAuthentication::new(api_key, api_secret, api_passphrase);
176
177        let auth_msg = serde_json::to_string(&auth)?;
178
179        // Send authentication message
180        write
181            .send(Message::Text(auth_msg))
182            .await
183            .map_err(|e| Error::WebSocket(e.to_string()))?;
184
185        // Return stream that parses events
186        let stream = read.filter_map(|msg| async move {
187            match msg {
188                Ok(Message::Text(text)) => {
189                    // The server can send either a single object or an array
190                    // Try to parse as array first
191                    if let Ok(events) = serde_json::from_str::<Vec<serde_json::Value>>(&text) {
192                        // Got an array, take the first event
193                        if let Some(first) = events.first() {
194                            match serde_json::from_value::<UserWsEvent>(first.clone()) {
195                                Ok(event) => return Some(Ok(event)),
196                                Err(e) => return Some(Err(Error::Json(e))),
197                            }
198                        } else {
199                            // Empty array, ignore
200                            return None;
201                        }
202                    }
203
204                    // Try parsing as single object
205                    match serde_json::from_str::<UserWsEvent>(&text) {
206                        Ok(event) => Some(Ok(event)),
207                        Err(e) => Some(Err(Error::Json(e))),
208                    }
209                }
210                Ok(Message::Close(close_frame)) => {
211                    // Connection closed - may indicate auth failure
212                    if let Some(frame) = close_frame {
213                        Some(Err(Error::WebSocket(format!(
214                            "Connection closed: code={}, reason={}",
215                            frame.code, frame.reason
216                        ))))
217                    } else {
218                        Some(Err(Error::ConnectionClosed))
219                    }
220                }
221                Ok(Message::Ping(_)) | Ok(Message::Pong(_)) => {
222                    // Ignore ping/pong frames (handled automatically)
223                    None
224                }
225                Ok(Message::Binary(_)) => {
226                    // Unexpected binary message
227                    Some(Err(Error::WebSocket(
228                        "Unexpected binary message".to_string(),
229                    )))
230                }
231                Ok(Message::Frame(_)) => {
232                    // Raw frame (shouldn't happen)
233                    None
234                }
235                Err(e) => {
236                    // WebSocket error
237                    Some(Err(Error::WebSocket(e.to_string())))
238                }
239            }
240        });
241
242        Ok(Box::pin(stream))
243    }
244}
245
246impl Default for UserWsClient {
247    fn default() -> Self {
248        Self::new()
249    }
250}
251
#[cfg(test)]
mod tests {
    use super::UserWsClient;

    /// A client built with `new()` must target the default user endpoint.
    #[test]
    fn test_client_creation() {
        let default_client = UserWsClient::new();
        assert_eq!(default_client.ws_url, UserWsClient::DEFAULT_WS_URL);
    }
}