use futures_util::{SinkExt, Stream, StreamExt};
use std::pin::Pin;
use tokio_tungstenite::{connect_async, tungstenite::Message};
use crate::error::{Error, Result};
use crate::types::{ApiCreds, UserAuthentication, UserWsEvent};
#[derive(Debug, Clone)]
pub struct UserWsClient {
ws_url: String,
}
impl UserWsClient {
const DEFAULT_WS_URL: &'static str = "wss://ws-subscriptions-clob.polymarket.com/ws/user";
pub fn new() -> Self {
Self {
ws_url: Self::DEFAULT_WS_URL.to_string(),
}
}
pub fn with_url(ws_url: impl Into<String>) -> Self {
Self {
ws_url: ws_url.into(),
}
}
pub async fn subscribe_with_creds(
&self,
creds: &ApiCreds,
) -> Result<Pin<Box<dyn Stream<Item = Result<UserWsEvent>> + Send>>> {
self.subscribe(
creds.api_key.clone(),
creds.secret.clone(),
creds.passphrase.clone(),
)
.await
}
pub async fn subscribe(
&self,
api_key: String,
api_secret: String,
api_passphrase: String,
) -> Result<Pin<Box<dyn Stream<Item = Result<UserWsEvent>> + Send>>> {
let (ws_stream, _) = connect_async(&self.ws_url).await?;
let (mut write, read) = ws_stream.split();
let auth = UserAuthentication::new(api_key, api_secret, api_passphrase);
let auth_msg = serde_json::to_string(&auth)?;
write
.send(Message::Text(auth_msg))
.await
.map_err(|e| Error::WebSocket(e.to_string()))?;
let stream = read.filter_map(|msg| async move {
match msg {
Ok(Message::Text(text)) => {
if let Ok(events) = serde_json::from_str::<Vec<serde_json::Value>>(&text) {
if let Some(first) = events.first() {
match serde_json::from_value::<UserWsEvent>(first.clone()) {
Ok(event) => return Some(Ok(event)),
Err(e) => return Some(Err(Error::Json(e))),
}
} else {
return None;
}
}
match serde_json::from_str::<UserWsEvent>(&text) {
Ok(event) => Some(Ok(event)),
Err(e) => Some(Err(Error::Json(e))),
}
}
Ok(Message::Close(close_frame)) => {
if let Some(frame) = close_frame {
Some(Err(Error::WebSocket(format!(
"Connection closed: code={}, reason={}",
frame.code, frame.reason
))))
} else {
Some(Err(Error::ConnectionClosed))
}
}
Ok(Message::Ping(_)) | Ok(Message::Pong(_)) => {
None
}
Ok(Message::Binary(_)) => {
Some(Err(Error::WebSocket(
"Unexpected binary message".to_string(),
)))
}
Ok(Message::Frame(_)) => {
None
}
Err(e) => {
Some(Err(Error::WebSocket(e.to_string())))
}
}
});
Ok(Box::pin(stream))
}
}
impl Default for UserWsClient {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_client_creation() {
let client = UserWsClient::new();
assert_eq!(client.ws_url, UserWsClient::DEFAULT_WS_URL);
}
}