polymarket_rs/websocket/user.rs
1use futures_util::{SinkExt, Stream, StreamExt};
2use std::pin::Pin;
3use tokio_tungstenite::{connect_async, tungstenite::Message};
4
5use crate::error::{Error, Result};
6use crate::types::{ApiCreds, UserAuthentication, UserWsEvent};
7
8/// WebSocket client for streaming authenticated user events
9///
10/// This client connects to the Polymarket CLOB user WebSocket endpoint and streams
11/// real-time updates about the user's trades and orders.
12///
13/// # Connection Management
14///
15/// The Polymarket WebSocket server will disconnect idle connections after 1-2 minutes.
16/// For production use, it's recommended to use [`ReconnectingStream`](crate::websocket::ReconnectingStream)
17/// to automatically handle disconnections and reconnect with exponential backoff.
18///
19/// # Example with Auto-Reconnect
20///
21/// ```no_run
22/// use polymarket_rs::websocket::{UserWsClient, ReconnectConfig, ReconnectingStream};
23/// use polymarket_rs::types::ApiCreds;
24/// use futures_util::StreamExt;
25/// use std::time::Duration;
26///
27/// #[tokio::main]
28/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
29/// let creds = ApiCreds::new(
30/// "api_key".to_string(),
31/// "api_secret".to_string(),
32/// "api_passphrase".to_string(),
33/// );
34///
35/// let client = UserWsClient::new();
36///
37/// let config = ReconnectConfig {
38/// initial_delay: Duration::from_secs(1),
39/// max_delay: Duration::from_secs(30),
40/// multiplier: 2.0,
41/// max_attempts: None,
42/// };
43///
44/// let creds_clone = creds.clone();
45/// let mut stream = ReconnectingStream::new(config, move || {
46/// let client = client.clone();
47/// let creds = creds_clone.clone();
48/// async move { client.subscribe_with_creds(&creds).await }
49/// });
50///
51/// while let Some(event) = stream.next().await {
52/// match event {
53/// Ok(evt) => println!("Event: {:?}", evt),
54/// Err(_) => continue, // Will auto-reconnect
55/// }
56/// }
57/// Ok(())
58/// }
59/// ```
60#[derive(Debug, Clone)]
61pub struct UserWsClient {
62 ws_url: String,
63}
64
65impl UserWsClient {
66 /// Default WebSocket URL for user events
67 const DEFAULT_WS_URL: &'static str = "wss://ws-subscriptions-clob.polymarket.com/ws/user";
68
69 /// Create a new user WebSocket client with the default endpoint
70 pub fn new() -> Self {
71 Self {
72 ws_url: Self::DEFAULT_WS_URL.to_string(),
73 }
74 }
75
76 /// Create a new user WebSocket client with a custom endpoint
77 pub fn with_url(ws_url: impl Into<String>) -> Self {
78 Self {
79 ws_url: ws_url.into(),
80 }
81 }
82
83 /// Subscribe to user events with API credentials
84 ///
85 /// Returns a stream of [`UserWsEvent`] items. The stream will yield events as they
86 /// are received from the WebSocket connection.
87 ///
88 /// # Arguments
89 ///
90 /// * `creds` - API credentials (api_key, secret, passphrase)
91 ///
92 /// # Events
93 ///
94 /// The stream will yield two types of events:
95 /// - [`UserWsEvent::Trade`]: Trade execution updates
96 /// - [`UserWsEvent::Order`]: Order status updates
97 ///
98 /// # Errors
99 ///
100 /// Returns an error if:
101 /// - The WebSocket connection fails
102 /// - The authentication message cannot be sent
103 /// - Authentication fails (server will close the connection)
104 ///
105 /// # Example
106 ///
107 /// ```no_run
108 /// # use polymarket_rs::websocket::UserWsClient;
109 /// # use polymarket_rs::types::ApiCreds;
110 /// # use futures_util::StreamExt;
111 /// # #[tokio::main]
112 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
113 /// let creds = ApiCreds::new(
114 /// "your_api_key".to_string(),
115 /// "your_api_secret".to_string(),
116 /// "your_api_passphrase".to_string(),
117 /// );
118 ///
119 /// let client = UserWsClient::new();
120 /// let mut stream = client.subscribe_with_creds(&creds).await?;
121 ///
122 /// while let Some(event) = stream.next().await {
123 /// println!("Event: {:?}", event?);
124 /// }
125 /// # Ok(())
126 /// # }
127 /// ```
128 pub async fn subscribe_with_creds(
129 &self,
130 creds: &ApiCreds,
131 ) -> Result<Pin<Box<dyn Stream<Item = Result<UserWsEvent>> + Send>>> {
132 self.subscribe(
133 creds.api_key.clone(),
134 creds.secret.clone(),
135 creds.passphrase.clone(),
136 )
137 .await
138 }
139
140 /// Subscribe to user events with authentication
141 ///
142 /// Returns a stream of [`UserWsEvent`] items. The stream will yield events as they
143 /// are received from the WebSocket connection.
144 ///
145 /// # Arguments
146 ///
147 /// * `api_key` - API key for authentication
148 /// * `api_secret` - API secret for authentication
149 /// * `api_passphrase` - API passphrase for authentication
150 ///
151 /// # Events
152 ///
153 /// The stream will yield two types of events:
154 /// - [`UserWsEvent::Trade`]: Trade execution updates
155 /// - [`UserWsEvent::Order`]: Order status updates
156 ///
157 /// # Errors
158 ///
159 /// Returns an error if:
160 /// - The WebSocket connection fails
161 /// - The authentication message cannot be sent
162 /// - Authentication fails (server will close the connection)
163 pub async fn subscribe(
164 &self,
165 api_key: String,
166 api_secret: String,
167 api_passphrase: String,
168 ) -> Result<Pin<Box<dyn Stream<Item = Result<UserWsEvent>> + Send>>> {
169 // Connect to the WebSocket endpoint
170 let (ws_stream, _) = connect_async(&self.ws_url).await?;
171
172 let (mut write, read) = ws_stream.split();
173
174 // Create authentication message
175 let auth = UserAuthentication::new(api_key, api_secret, api_passphrase);
176
177 let auth_msg = serde_json::to_string(&auth)?;
178
179 // Send authentication message
180 write
181 .send(Message::Text(auth_msg))
182 .await
183 .map_err(|e| Error::WebSocket(e.to_string()))?;
184
185 // Return stream that parses events
186 let stream = read.filter_map(|msg| async move {
187 match msg {
188 Ok(Message::Text(text)) => {
189 // The server can send either a single object or an array
190 // Try to parse as array first
191 if let Ok(events) = serde_json::from_str::<Vec<serde_json::Value>>(&text) {
192 // Got an array, take the first event
193 if let Some(first) = events.first() {
194 match serde_json::from_value::<UserWsEvent>(first.clone()) {
195 Ok(event) => return Some(Ok(event)),
196 Err(e) => return Some(Err(Error::Json(e))),
197 }
198 } else {
199 // Empty array, ignore
200 return None;
201 }
202 }
203
204 // Try parsing as single object
205 match serde_json::from_str::<UserWsEvent>(&text) {
206 Ok(event) => Some(Ok(event)),
207 Err(e) => Some(Err(Error::Json(e))),
208 }
209 }
210 Ok(Message::Close(close_frame)) => {
211 // Connection closed - may indicate auth failure
212 if let Some(frame) = close_frame {
213 Some(Err(Error::WebSocket(format!(
214 "Connection closed: code={}, reason={}",
215 frame.code, frame.reason
216 ))))
217 } else {
218 Some(Err(Error::ConnectionClosed))
219 }
220 }
221 Ok(Message::Ping(_)) | Ok(Message::Pong(_)) => {
222 // Ignore ping/pong frames (handled automatically)
223 None
224 }
225 Ok(Message::Binary(_)) => {
226 // Unexpected binary message
227 Some(Err(Error::WebSocket(
228 "Unexpected binary message".to_string(),
229 )))
230 }
231 Ok(Message::Frame(_)) => {
232 // Raw frame (shouldn't happen)
233 None
234 }
235 Err(e) => {
236 // WebSocket error
237 Some(Err(Error::WebSocket(e.to_string())))
238 }
239 }
240 });
241
242 Ok(Box::pin(stream))
243 }
244}
245
246impl Default for UserWsClient {
247 fn default() -> Self {
248 Self::new()
249 }
250}
251
252#[cfg(test)]
253mod tests {
254 use super::*;
255
256 #[test]
257 fn test_client_creation() {
258 let client = UserWsClient::new();
259 assert_eq!(client.ws_url, UserWsClient::DEFAULT_WS_URL);
260 }
261}