use std::sync::Arc;
use futures::Stream;
use futures::StreamExt as _;
use super::subscription::{SimpleParser, SubscriptionManager, TopicType};
use super::types::request::Subscription;
use super::types::response::{ChainlinkPrice, Comment, CommentType, CryptoPrice, RtdsMessage};
use crate::Result;
use crate::auth::state::{Authenticated, State, Unauthenticated};
use crate::auth::{Credentials, Normal};
use crate::error::Error;
use crate::types::Address;
use crate::ws::ConnectionManager;
use crate::ws::config::Config;
use crate::ws::connection::ConnectionState;
#[derive(Clone)]
pub struct Client<S: State = Unauthenticated> {
inner: Arc<ClientInner<S>>,
}
impl Default for Client<Unauthenticated> {
fn default() -> Self {
Self::new("wss://ws-live-data.polymarket.com", Config::default())
.expect("RTDS client with default endpoint should succeed")
}
}
struct ClientInner<S: State> {
state: S,
config: Config,
endpoint: String,
connection: ConnectionManager<RtdsMessage, SimpleParser>,
subscriptions: Arc<SubscriptionManager>,
}
impl Client<Unauthenticated> {
pub fn new(endpoint: &str, config: Config) -> Result<Self> {
let connection = ConnectionManager::new(endpoint.to_owned(), config.clone(), SimpleParser)?;
let subscriptions = Arc::new(SubscriptionManager::new(connection.clone()));
subscriptions.start_reconnection_handler();
Ok(Self {
inner: Arc::new(ClientInner {
state: Unauthenticated,
config,
endpoint: endpoint.to_owned(),
connection,
subscriptions,
}),
})
}
pub fn authenticate(
self,
address: Address,
credentials: Credentials,
) -> Result<Client<Authenticated<Normal>>> {
let inner = Arc::into_inner(self.inner).ok_or(Error::validation(
"Cannot authenticate while other references to this client exist",
))?;
Ok(Client {
inner: Arc::new(ClientInner {
state: Authenticated {
address,
credentials,
kind: Normal,
},
config: inner.config,
endpoint: inner.endpoint,
connection: inner.connection,
subscriptions: inner.subscriptions,
}),
})
}
pub fn subscribe_comments(
&self,
comment_type: Option<CommentType>,
) -> Result<impl Stream<Item = Result<Comment>>> {
let subscription = Subscription::comments(comment_type);
let stream = self.inner.subscriptions.subscribe(subscription)?;
Ok(stream.filter_map(|msg_result| async move {
match msg_result {
Ok(msg) => msg.as_comment().map(Ok),
Err(e) => Some(Err(e)),
}
}))
}
}
impl<S: State> Client<S> {
pub fn subscribe_crypto_prices(
&self,
symbols: Option<Vec<String>>,
) -> Result<impl Stream<Item = Result<CryptoPrice>>> {
let subscription = Subscription::crypto_prices(symbols);
let stream = self.inner.subscriptions.subscribe(subscription)?;
Ok(stream.filter_map(|msg_result| async move {
match msg_result {
Ok(msg) => msg.as_crypto_price().map(Ok),
Err(e) => Some(Err(e)),
}
}))
}
pub fn subscribe_chainlink_prices(
&self,
symbol: Option<String>,
) -> Result<impl Stream<Item = Result<ChainlinkPrice>>> {
let subscription = Subscription::chainlink_prices(symbol);
let stream = self.inner.subscriptions.subscribe(subscription)?;
Ok(stream.filter_map(|msg_result| async move {
match msg_result {
Ok(msg) => msg.as_chainlink_price().map(Ok),
Err(e) => Some(Err(e)),
}
}))
}
pub fn subscribe_raw(
&self,
subscription: Subscription,
) -> Result<impl Stream<Item = Result<RtdsMessage>>> {
self.inner.subscriptions.subscribe(subscription)
}
#[must_use]
pub fn connection_state(&self) -> ConnectionState {
self.inner.connection.state()
}
#[must_use]
pub fn subscription_count(&self) -> usize {
self.inner.subscriptions.subscription_count()
}
pub fn unsubscribe_crypto_prices(&self) -> Result<()> {
let topic = TopicType::new("crypto_prices".to_owned(), "update".to_owned());
self.inner.subscriptions.unsubscribe(&[topic])
}
pub fn unsubscribe_chainlink_prices(&self) -> Result<()> {
let topic = TopicType::new("crypto_prices_chainlink".to_owned(), "*".to_owned());
self.inner.subscriptions.unsubscribe(&[topic])
}
pub fn unsubscribe_comments(&self, comment_type: Option<CommentType>) -> Result<()> {
let msg_type = comment_type.map_or("*".to_owned(), |t| {
serde_json::to_string(&t)
.ok()
.and_then(|s| s.trim_matches('"').to_owned().into())
.unwrap_or_else(|| "*".to_owned())
});
let topic = TopicType::new("comments".to_owned(), msg_type);
self.inner.subscriptions.unsubscribe(&[topic])
}
}
impl Client<Authenticated<Normal>> {
pub fn subscribe_comments(
&self,
comment_type: Option<CommentType>,
) -> Result<impl Stream<Item = Result<Comment>>> {
let subscription = Subscription::comments(comment_type)
.with_clob_auth(self.inner.state.credentials.clone());
let stream = self.inner.subscriptions.subscribe(subscription)?;
Ok(stream.filter_map(|msg_result| async move {
match msg_result {
Ok(msg) => msg.as_comment().map(Ok),
Err(e) => Some(Err(e)),
}
}))
}
pub fn deauthenticate(self) -> Result<Client<Unauthenticated>> {
let inner = Arc::into_inner(self.inner).ok_or(Error::validation(
"Cannot deauthenticate while other references to this client exist",
))?;
Ok(Client {
inner: Arc::new(ClientInner {
state: Unauthenticated,
config: inner.config,
endpoint: inner.endpoint,
connection: inner.connection,
subscriptions: inner.subscriptions,
}),
})
}
}