use std::sync::{Arc, Mutex};
use std::time::Duration;
use async_trait::async_trait;
use reqwest::Client;
use crate::core::{
AccountType, ExchangeError, ExchangeId, ExchangeResult, ExchangeType,
Kline, OrderBook, Price, Symbol, SymbolInfo, Ticker,
};
use crate::core::traits::{ExchangeIdentity, MarketData};
use crate::core::types::MarketDataCapabilities;
use crate::core::types::{ConnectorStats, RateLimitCapabilities, LimitModel, RestLimitPool, WsLimits, OrderbookCapabilities};
use crate::core::utils::{RuntimeLimiter, RateLimitMonitor, RateLimitPressure};
use super::auth::{PolymarketAuth, PolymarketCredentials};
use super::endpoints::{
PolymarketEndpoints, map_interval, get_fidelity,
};
use super::parser::{
PolymarketParser, ClobMarket, PolyMarket, PolyOrderBook, PolyMidpoint,
PolyEvent,
clob_market_to_symbol_info, clob_market_to_ticker,
poly_market_to_symbol_info,
price_history_to_klines, poly_orderbook_to_v5,
interval_to_ms,
};
static POLYMARKET_RATE_POOLS: &[RestLimitPool] = &[RestLimitPool {
name: "default",
max_budget: 500,
window_seconds: 60,
is_weight: false,
has_server_headers: false,
server_header: None,
header_reports_used: false,
}];
static POLYMARKET_RATE_CAPS: RateLimitCapabilities = RateLimitCapabilities {
model: LimitModel::Simple,
rest_pools: POLYMARKET_RATE_POOLS,
decaying: None,
endpoint_weights: &[],
ws: WsLimits {
max_connections: None,
max_subs_per_conn: None,
max_msg_per_sec: None,
max_streams_per_conn: None,
},
};
pub struct PolymarketConnector {
client: Client,
_auth: PolymarketAuth,
endpoints: PolymarketEndpoints,
limiter: Arc<Mutex<RuntimeLimiter>>,
monitor: Arc<Mutex<RateLimitMonitor>>,
}
impl PolymarketConnector {
pub fn public() -> Self {
Self {
client: Client::builder()
.timeout(Duration::from_secs(30))
.build()
.unwrap_or_default(),
_auth: PolymarketAuth::new(),
endpoints: PolymarketEndpoints::default(),
limiter: Arc::new(Mutex::new(RuntimeLimiter::from_caps(&POLYMARKET_RATE_CAPS))),
monitor: Arc::new(Mutex::new(RateLimitMonitor::new("Polymarket"))),
}
}
pub fn authenticated(creds: PolymarketCredentials) -> Self {
Self {
client: Client::builder()
.timeout(Duration::from_secs(30))
.build()
.unwrap_or_default(),
_auth: PolymarketAuth::with_credentials(creds),
endpoints: PolymarketEndpoints::default(),
limiter: Arc::new(Mutex::new(RuntimeLimiter::from_caps(&POLYMARKET_RATE_CAPS))),
monitor: Arc::new(Mutex::new(RateLimitMonitor::new("Polymarket"))),
}
}
pub fn from_env() -> Self {
Self {
client: Client::builder()
.timeout(Duration::from_secs(30))
.build()
.unwrap_or_default(),
_auth: PolymarketAuth::from_env(),
endpoints: PolymarketEndpoints::default(),
limiter: Arc::new(Mutex::new(RuntimeLimiter::from_caps(&POLYMARKET_RATE_CAPS))),
monitor: Arc::new(Mutex::new(RateLimitMonitor::new("Polymarket"))),
}
}
async fn rate_limit_wait(&self, weight: u32, essential: bool) -> bool {
loop {
let wait_time = {
let mut limiter = self.limiter.lock()
.expect("rate limiter mutex poisoned");
let pressure = self.monitor.lock()
.expect("rate monitor mutex poisoned")
.check(&mut limiter);
if pressure >= RateLimitPressure::Cutoff && !essential {
return false;
}
if limiter.try_acquire("default", weight) {
return true;
}
limiter.time_until_ready("default", weight)
};
if wait_time > Duration::ZERO {
tokio::time::sleep(wait_time).await;
}
}
}
async fn get_url(&self, url: &str) -> ExchangeResult<serde_json::Value> {
if !self.rate_limit_wait(1, false).await {
return Err(ExchangeError::RateLimitExceeded {
retry_after: None,
message: "Rate limit budget >= 90% used; non-essential market data request dropped".to_string(),
});
}
let response = self
.client
.get(url)
.send()
.await
.map_err(|e| ExchangeError::Network(format!("Request failed: {}", e)))?;
let status = response.status();
if !status.is_success() {
let body = response.text().await.unwrap_or_default();
return Err(ExchangeError::Api {
code: status.as_u16() as i32,
message: format!("HTTP {}: {}", status, body.chars().take(200).collect::<String>()),
});
}
let json: serde_json::Value = response
.json()
.await
.map_err(|e| ExchangeError::Parse(format!("JSON parse error: {}", e)))?;
PolymarketParser::check_error(&json)?;
Ok(json)
}
async fn get_clob(
&self,
path: &str,
params: &[(&str, &str)],
) -> ExchangeResult<serde_json::Value> {
let base_url = format!("{}{}", self.endpoints.clob_base, path);
let url = if params.is_empty() {
base_url
} else {
let qs: String = params
.iter()
.map(|(k, v)| format!("{}={}", k, v))
.collect::<Vec<_>>()
.join("&");
format!("{}?{}", base_url, qs)
};
self.get_url(&url).await
}
async fn get_gamma(
&self,
path: &str,
params: &[(&str, &str)],
) -> ExchangeResult<serde_json::Value> {
let base_url = format!("{}{}", self.endpoints.gamma_base, path);
let url = if params.is_empty() {
base_url
} else {
let qs: String = params
.iter()
.map(|(k, v)| format!("{}={}", k, v))
.collect::<Vec<_>>()
.join("&");
format!("{}?{}", base_url, qs)
};
self.get_url(&url).await
}
pub async fn get_markets(
&self,
limit: Option<u32>,
next_cursor: Option<&str>,
) -> ExchangeResult<(Vec<ClobMarket>, Option<String>)> {
let limit_str;
let cursor_str;
let mut params: Vec<(&str, &str)> = Vec::new();
if let Some(l) = limit {
limit_str = l.to_string();
params.push(("limit", &limit_str));
}
if let Some(c) = next_cursor {
cursor_str = c.to_string();
params.push(("next_cursor", &cursor_str));
}
let response = self.get_clob("/markets", ¶ms).await?;
let markets = PolymarketParser::parse_clob_markets(&response)?;
let next = PolymarketParser::get_next_cursor(&response);
Ok((markets, next))
}
pub async fn get_market(&self, condition_id: &str) -> ExchangeResult<ClobMarket> {
let url = format!("{}/markets/{}", self.endpoints.clob_base, condition_id);
let response = self.get_url(&url).await?;
PolymarketParser::parse_clob_market(&response)
}
pub async fn get_order_book(&self, token_id: &str) -> ExchangeResult<PolyOrderBook> {
let response = self.get_clob("/book", &[("token_id", token_id)]).await?;
PolymarketParser::parse_order_book(&response)
}
pub async fn get_midpoint(&self, token_id: &str) -> ExchangeResult<PolyMidpoint> {
let response = self.get_clob("/midpoint", &[("token_id", token_id)]).await?;
PolymarketParser::parse_midpoint(&response)
}
pub async fn get_last_trade_price(&self, token_id: &str) -> ExchangeResult<f64> {
let response = self.get_clob("/last-trade-price", &[("token_id", token_id)]).await?;
PolymarketParser::parse_price(&response)
}
pub async fn get_active_markets(&self, limit: Option<u32>) -> ExchangeResult<Vec<ClobMarket>> {
let limit_str;
let mut params: Vec<(&str, &str)> = vec![("active", "true")];
if let Some(l) = limit {
limit_str = l.to_string();
params.push(("limit", &limit_str));
}
let response = self.get_clob("/markets", ¶ms).await?;
PolymarketParser::parse_clob_markets(&response)
}
pub async fn get_events(&self, limit: Option<u32>) -> ExchangeResult<Vec<PolyEvent>> {
let limit_str;
let mut params: Vec<(&str, &str)> = Vec::new();
if let Some(l) = limit {
limit_str = l.to_string();
params.push(("limit", &limit_str));
}
let response = self.get_gamma("/events", ¶ms).await?;
PolymarketParser::parse_events(&response)
}
pub async fn get_event(&self, event_id: &str) -> ExchangeResult<PolyEvent> {
let url = format!("{}/events/{}", self.endpoints.gamma_base, event_id);
let response = self.get_url(&url).await?;
PolymarketParser::parse_event(&response)
}
pub async fn get_gamma_markets(&self, limit: Option<u32>) -> ExchangeResult<Vec<PolyMarket>> {
let limit_str;
let mut params: Vec<(&str, &str)> = Vec::new();
if let Some(l) = limit {
limit_str = l.to_string();
params.push(("limit", &limit_str));
}
let response = self.get_gamma("/markets", ¶ms).await?;
PolymarketParser::parse_gamma_markets(&response)
}
async fn get_yes_token_id(&self, condition_id: &str) -> ExchangeResult<String> {
let market = self.get_market(condition_id).await?;
let token = market
.tokens
.iter()
.find(|t| t.outcome == "Yes")
.or_else(|| market.tokens.first())
.ok_or_else(|| {
ExchangeError::Parse(format!(
"No tokens found for condition_id {}",
condition_id
))
})?;
Ok(token.token_id.clone())
}
fn symbol_id<'a>(&self, symbol: &'a Symbol) -> std::borrow::Cow<'a, str> {
let s = symbol.raw().unwrap_or(&symbol.base);
if s.chars().any(|c| c.is_ascii_uppercase()) {
std::borrow::Cow::Owned(s.to_lowercase())
} else {
std::borrow::Cow::Borrowed(s)
}
}
}
impl Default for PolymarketConnector {
fn default() -> Self {
Self::public()
}
}
impl ExchangeIdentity for PolymarketConnector {
fn exchange_id(&self) -> ExchangeId {
ExchangeId::Polymarket
}
fn is_testnet(&self) -> bool {
false
}
fn supported_account_types(&self) -> Vec<AccountType> {
vec![AccountType::Spot]
}
fn exchange_type(&self) -> ExchangeType {
ExchangeType::DataProvider
}
fn metrics(&self) -> ConnectorStats {
let (rate_used, rate_max) = if let Ok(mut limiter) = self.limiter.lock() {
limiter.primary_stats()
} else {
(0, 0)
};
ConnectorStats {
http_requests: 0,
http_errors: 0,
last_latency_ms: 0,
rate_used,
rate_max,
rate_groups: Vec::new(),
ws_ping_rtt_ms: 0,
}
}
fn rate_limit_capabilities(&self) -> RateLimitCapabilities {
POLYMARKET_RATE_CAPS
}
fn orderbook_capabilities(&self, _account_type: AccountType) -> OrderbookCapabilities {
OrderbookCapabilities {
ws_depths: &[],
ws_default_depth: None,
rest_max_depth: None,
rest_depth_values: &[],
supports_snapshot: true,
supports_delta: true,
update_speeds_ms: &[],
default_speed_ms: None,
ws_channels: &[],
checksum: None,
has_sequence: false,
has_prev_sequence: false,
supports_aggregation: false,
aggregation_levels: &[],
}
}
}
#[async_trait]
impl MarketData for PolymarketConnector {
fn market_data_capabilities(&self, _account_type: AccountType) -> MarketDataCapabilities {
MarketDataCapabilities {
has_ping: true,
has_price: true,
has_ticker: true,
has_orderbook: true,
has_klines: true,
has_exchange_info: true,
has_recent_trades: false,
supported_intervals: &["1m", "1h", "6h", "1d", "1w"],
max_kline_limit: Some(1000),
has_ws_orderbook: true,
has_ws_trades: true,
has_ws_klines: false,
has_ws_ticker: false,
}
}
async fn ping(&self) -> ExchangeResult<()> {
self.get_clob("/time", &[]).await?;
Ok(())
}
async fn get_price(
&self,
symbol: Symbol,
_account_type: AccountType,
) -> ExchangeResult<Price> {
let condition_id = self.symbol_id(&symbol);
let condition_id = condition_id.as_ref();
let market = self.get_market(condition_id).await?;
let yes_token = market
.tokens
.iter()
.find(|t| t.outcome == "Yes")
.or_else(|| market.tokens.first())
.ok_or_else(|| {
ExchangeError::Parse(format!(
"No tokens found for market {}",
condition_id,
))
})?;
match self.get_last_trade_price(&yes_token.token_id).await {
Ok(price) => Ok(price),
Err(_) => {
let midpoint = self.get_midpoint(&yes_token.token_id).await?;
Ok(midpoint.mid)
}
}
}
async fn get_orderbook(
&self,
symbol: Symbol,
_depth: Option<u16>,
_account_type: AccountType,
) -> ExchangeResult<OrderBook> {
let condition_id = self.symbol_id(&symbol);
let condition_id = condition_id.as_ref();
let yes_token_id = self.get_yes_token_id(condition_id).await?;
let poly_book = self.get_order_book(&yes_token_id).await?;
Ok(poly_orderbook_to_v5(&poly_book))
}
async fn get_klines(
&self,
symbol: Symbol,
interval: &str,
limit: Option<u16>,
_account_type: AccountType,
_end_time: Option<i64>,
) -> ExchangeResult<Vec<Kline>> {
let input_cow = self.symbol_id(&symbol);
let input = input_cow.as_ref();
let token_id = if (input.starts_with("0x") || input.starts_with("0X")) && input.len() == 66
{
self.get_yes_token_id(input).await?
} else {
input.to_string()
};
let poly_interval = map_interval(interval);
let fidelity = get_fidelity(limit);
let fidelity_str = fidelity.to_string();
let response = self
.get_clob(
"/prices-history",
&[
("market", token_id.as_str()),
("interval", poly_interval),
("fidelity", fidelity_str.as_str()),
],
)
.await?;
let history = PolymarketParser::parse_price_history(&response)?;
let interval_ms = interval_to_ms(poly_interval);
let klines = price_history_to_klines(history, interval_ms);
Ok(klines)
}
async fn get_ticker(
&self,
symbol: Symbol,
_account_type: AccountType,
) -> ExchangeResult<Ticker> {
let condition_id = self.symbol_id(&symbol);
let condition_id = condition_id.as_ref();
let market = self.get_market(condition_id).await?;
clob_market_to_ticker(&market).ok_or_else(|| {
ExchangeError::Parse(format!(
"No ticker data available for market {}",
condition_id
))
})
}
async fn get_exchange_info(
&self,
account_type: AccountType,
) -> ExchangeResult<Vec<SymbolInfo>> {
let gamma_result = self
.get_gamma(
"/markets",
&[("active", "true"), ("closed", "false"), ("limit", "500")],
)
.await;
match gamma_result {
Ok(response) => {
let markets = PolymarketParser::parse_gamma_markets(&response)?;
let symbols = markets
.iter()
.map(|m| poly_market_to_symbol_info(m, account_type))
.collect();
Ok(symbols)
}
Err(_) => {
let markets = self.get_active_markets(Some(500)).await?;
let symbols = markets
.iter()
.map(|m| clob_market_to_symbol_info(m, account_type))
.collect();
Ok(symbols)
}
}
}
}