use futures::stream::{FuturesUnordered, StreamExt};
use px_core::{
manifests::POLYMARKET_MANIFEST, CheckpointCallback, FetchResult, MarketFetcher, OpenPxError,
};
use reqwest::Client;
use std::collections::BTreeMap;
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Semaphore;
use tracing::info;
use crate::error::PolymarketError;
/// Number of page-fetch streams started by the fetcher.
///
/// The same value sizes the semaphore that gates page requests and the
/// HTTP client's per-host idle connection pool.
const CONCURRENCY: usize = 15;
/// Boxed, pinned, `Send` future produced for a single page fetch.
///
/// On success it yields `(stream_id, offset, markets)`: the id of the stream
/// that issued the request, the offset that was requested, and the raw JSON
/// markets returned for that page.
type FetchFuture = Pin<
Box<dyn Future<Output = Result<(usize, usize, Vec<serde_json::Value>), OpenPxError>> + Send>,
>;
/// Fetches raw market JSON from the Polymarket `/markets` endpoint.
///
/// The fetcher only stores the API base URL; an HTTP client is built for each
/// fetch run.
#[derive(Debug, Clone)]
pub struct PolymarketMarketFetcher {
    /// Base URL that the `/markets` path is appended to.
    base_url: String,
}
impl PolymarketMarketFetcher {
    /// Creates a fetcher that targets the base URL declared in `POLYMARKET_MANIFEST`.
    ///
    /// # Errors
    ///
    /// The current implementation always returns `Ok`; the `Result` is part of
    /// the public signature and is kept so callers do not have to change.
    pub fn new() -> Result<Self, PolymarketError> {
        let base_url = POLYMARKET_MANIFEST.base_url.to_string();
        Ok(Self { base_url })
    }

    /// Builds the HTTP client used for one fetch run.
    ///
    /// Starts from `px_core::http::tuned_client_builder()`, keeps up to
    /// `CONCURRENCY` idle connections per host and applies a 30 second timeout.
    ///
    /// # Errors
    ///
    /// Returns the builder failure converted into a `PolymarketError`.
    fn create_client() -> Result<Client, PolymarketError> {
        px_core::http::tuned_client_builder()
            .pool_max_idle_per_host(CONCURRENCY)
            .timeout(Duration::from_secs(30))
            .build()
            .map_err(PolymarketError::from)
    }

    /// Requests one page of markets with `limit = page_size` and the given `offset`.
    ///
    /// Returns `(stream_id, offset, markets)` so the caller can tell which
    /// stream finished and where the page belongs.
    ///
    /// # Errors
    ///
    /// * Transport and JSON decoding failures are wrapped in `OpenPxError::Exchange`.
    /// * A non-success HTTP status becomes `PolymarketError::Api` carrying the
    ///   status line followed by the response body.
    async fn fetch_page(
        client: Client,
        base_url: String,
        offset: usize,
        page_size: usize,
        stream_id: usize,
    ) -> Result<(usize, usize, Vec<serde_json::Value>), OpenPxError> {
        info!(
            exchange = "polymarket",
            stream_id, offset, "Stream fetching page"
        );

        let url = format!("{}/markets?limit={}&offset={}", base_url, page_size, offset);
        let response = client
            .get(&url)
            .send()
            .await
            .map_err(|e| OpenPxError::Exchange(PolymarketError::from(e).into()))?;

        let status = response.status();
        if !status.is_success() {
            // The body may be empty or unreadable, so always include the status;
            // otherwise the resulting error can carry no information at all.
            let body = response.text().await.unwrap_or_default();
            let message = format!("HTTP {}: {}", status, body);
            return Err(OpenPxError::Exchange(PolymarketError::Api(message).into()));
        }

        let markets: Vec<serde_json::Value> = response
            .json()
            .await
            .map_err(|e| OpenPxError::Exchange(PolymarketError::from(e).into()))?;

        Ok((stream_id, offset, markets))
    }

    /// Wraps a page fetch in a boxed future gated by `semaphore`.
    ///
    /// The future first waits for a permit. If `done` is already set when the
    /// permit is obtained, it resolves to an empty page for `offset` without
    /// issuing a request; otherwise it delegates to `fetch_page`.
    ///
    /// # Errors
    ///
    /// Resolves to `OpenPxError::Other("semaphore closed")` when the semaphore
    /// has been closed, or to whatever `fetch_page` returns.
    fn create_fetch_task(
        client: Client,
        base_url: String,
        offset: usize,
        page_size: usize,
        semaphore: Arc<Semaphore>,
        done: Arc<AtomicBool>,
        stream_id: usize,
    ) -> FetchFuture {
        Box::pin(async move {
            // Held until the end of the future so the permit covers the whole request.
            let _permit = semaphore
                .acquire()
                .await
                .map_err(|_| OpenPxError::Other("semaphore closed".into()))?;

            if done.load(Ordering::SeqCst) {
                return Ok((stream_id, offset, vec![]));
            }

            Self::fetch_page(client, base_url, offset, page_size, stream_id).await
        })
    }
}
impl MarketFetcher for PolymarketMarketFetcher {
fn exchange_id(&self) -> &'static str {
"polymarket"
}
async fn fetch_markets(&self) -> Result<Vec<serde_json::Value>, OpenPxError> {
let client = Self::create_client().map_err(|e| OpenPxError::Exchange(e.into()))?;
let max_page_size = POLYMARKET_MANIFEST.pagination.max_page_size;
let next_offset = Arc::new(AtomicUsize::new(0));
let done = Arc::new(AtomicBool::new(false));
let semaphore = Arc::new(Semaphore::new(CONCURRENCY));
let mut results: BTreeMap<usize, Vec<serde_json::Value>> = BTreeMap::new();
let mut futures: FuturesUnordered<FetchFuture> = FuturesUnordered::new();
for stream_id in 0..CONCURRENCY {
let offset = next_offset.fetch_add(max_page_size, Ordering::SeqCst);
futures.push(Self::create_fetch_task(
client.clone(),
self.base_url.clone(),
offset,
max_page_size,
Arc::clone(&semaphore),
Arc::clone(&done),
stream_id,
));
}
let mut total_fetched = 0;
let mut highest_complete_offset = 0;
while let Some(result) = futures.next().await {
let (stream_id, offset, markets) = result?;
let count = markets.len();
if count > 0 {
results.insert(offset, markets);
total_fetched += count;
}
if count < max_page_size {
done.store(true, Ordering::SeqCst);
}
if !done.load(Ordering::SeqCst) {
let next = next_offset.fetch_add(max_page_size, Ordering::SeqCst);
futures.push(Self::create_fetch_task(
client.clone(),
self.base_url.clone(),
next,
max_page_size,
Arc::clone(&semaphore),
Arc::clone(&done),
stream_id,
));
}
while results.contains_key(&highest_complete_offset) {
let page_count = results
.get(&highest_complete_offset)
.map(|v| v.len())
.unwrap_or(0);
info!(
exchange = "polymarket",
offset = highest_complete_offset,
page_count,
total = total_fetched,
"Fetched page"
);
highest_complete_offset += max_page_size;
}
}
let mut all_markets = Vec::with_capacity(total_fetched);
for (_, markets) in results {
all_markets.extend(markets);
}
drop(client);
Ok(all_markets)
}
fn extract_status(&self, raw: &serde_json::Value) -> String {
let closed = raw.get("closed").and_then(|v| v.as_bool()).unwrap_or(false);
let active = raw.get("active").and_then(|v| v.as_bool()).unwrap_or(true);
if closed || !active {
"resolved".to_string()
} else {
"active".to_string()
}
}
async fn fetch_markets_with_checkpoints(
&self,
start_cursor: Option<String>,
checkpoint_interval: usize,
on_checkpoint: CheckpointCallback,
) -> Result<FetchResult, OpenPxError> {
let client = Self::create_client().map_err(|e| OpenPxError::Exchange(e.into()))?;
let max_page_size = POLYMARKET_MANIFEST.pagination.max_page_size;
let start_offset: usize = start_cursor
.as_ref()
.and_then(|s| s.parse().ok())
.unwrap_or(0);
let next_offset = Arc::new(AtomicUsize::new(start_offset));
let done = Arc::new(AtomicBool::new(false));
let semaphore = Arc::new(Semaphore::new(CONCURRENCY));
let mut pending_results: BTreeMap<usize, Vec<serde_json::Value>> = BTreeMap::new();
let mut futures: FuturesUnordered<FetchFuture> = FuturesUnordered::new();
let mut buffer: Vec<serde_json::Value> = Vec::new();
let mut next_expected_offset = start_offset;
let mut total_fetched = 0;
for stream_id in 0..CONCURRENCY {
let offset = next_offset.fetch_add(max_page_size, Ordering::SeqCst);
futures.push(Self::create_fetch_task(
client.clone(),
self.base_url.clone(),
offset,
max_page_size,
Arc::clone(&semaphore),
Arc::clone(&done),
stream_id,
));
}
while let Some(result) = futures.next().await {
let (stream_id, offset, markets) = result?;
let count = markets.len();
if count > 0 {
pending_results.insert(offset, markets);
}
if count < max_page_size {
done.store(true, Ordering::SeqCst);
}
if !done.load(Ordering::SeqCst) {
let next = next_offset.fetch_add(max_page_size, Ordering::SeqCst);
futures.push(Self::create_fetch_task(
client.clone(),
self.base_url.clone(),
next,
max_page_size,
Arc::clone(&semaphore),
Arc::clone(&done),
stream_id,
));
}
while let Some(markets) = pending_results.remove(&next_expected_offset) {
let page_count = markets.len();
total_fetched += page_count;
info!(
exchange = "polymarket",
offset = next_expected_offset,
page_count,
buffer_size = buffer.len(),
total = total_fetched,
"Fetched page"
);
buffer.extend(markets);
next_expected_offset += max_page_size;
while buffer.len() >= checkpoint_interval {
let checkpoint_data: Vec<_> = buffer.drain(..checkpoint_interval).collect();
let cursor_str = next_expected_offset.to_string();
on_checkpoint(&checkpoint_data, &cursor_str).await?;
buffer.shrink_to_fit();
}
}
}
let final_offset = next_expected_offset;
drop(client);
buffer.shrink_to_fit();
Ok(FetchResult {
markets: buffer,
final_cursor: Some(final_offset.to_string()),
total_fetched,
})
}
}