// polynode 0.13.6
//
// Rust SDK for the PolyNode API — real-time Polymarket data.
// Documentation
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use tokio::sync::Notify;
use tokio::task::JoinHandle;

use crate::client::PolyNodeClient;
use crate::error::Error;
use super::storage::StorageBackend;
use super::types::*;

/// How many times a rate-limited page request (error mentioning "429" or "rate limit")
/// is retried before the entity is failed.
const MAX_RETRIES: u32 = 3;
/// Base backoff between rate-limit retries, in milliseconds; retry `n` waits `n * RETRY_BACKOFF_MS`.
const RETRY_BACKOFF_MS: u64 = 5_000;

/// Drives background backfills of queued wallets, markets and tokens into a
/// [`StorageBackend`].
///
/// Entities are queued with [`BackfillOrchestrator::queue_entity`] and processed by a
/// Tokio task spawned from `start`.
pub struct BackfillOrchestrator {
    /// API client used for every upstream fetch.
    client: Arc<PolyNodeClient>,
    /// Shared cache storage; also holds the persisted backfill state rows.
    storage: Arc<Mutex<Box<dyn StorageBackend>>>,
    /// Target upstream request rate (requests per second) used to pace fetches.
    rate_per_second: f64,
    /// Maximum pages fetched for one entity before it is marked complete.
    max_pages: u32,
    /// Page size requested from the upstream API.
    page_size: u32,
    /// Optional callback invoked with every progress/status update.
    on_progress: Option<Arc<dyn Fn(BackfillProgress) + Send + Sync>>,
    /// Stop flag observed by the background task (cleared to request a stop).
    running: Arc<AtomicBool>,
    /// Wakes the idle background task early (new work queued or stop requested).
    wake: Arc<Notify>,
    /// Handle of the spawned background task, once `start` has been called.
    handle: Option<JoinHandle<()>>,
}

impl BackfillOrchestrator {
    /// Creates a stopped orchestrator; nothing is fetched until [`start`](Self::start) is called.
    ///
    /// * `rate_per_second` – target upstream request rate used to pace fetches.
    /// * `max_pages` – maximum pages fetched for one entity before it is marked complete.
    /// * `page_size` – page size requested from the upstream API.
    /// * `on_progress` – optional callback receiving every progress/status update.
    pub fn new(
        client: Arc<PolyNodeClient>,
        storage: Arc<Mutex<Box<dyn StorageBackend>>>,
        rate_per_second: f64,
        max_pages: u32,
        page_size: u32,
        on_progress: Option<Arc<dyn Fn(BackfillProgress) + Send + Sync>>,
    ) -> Self {
        Self {
            client,
            storage,
            rate_per_second,
            max_pages,
            page_size,
            on_progress,
            running: Arc::new(AtomicBool::new(false)),
            wake: Arc::new(Notify::new()),
            handle: None,
        }
    }

    /// Spawns the background backfill task with `tokio::spawn`.
    ///
    /// Does nothing if the orchestrator is already running.
    ///
    /// # Panics
    /// Panics if called outside a Tokio runtime (a `tokio::spawn` requirement).
    pub fn start(&mut self) {
        if self.running.load(Ordering::Relaxed) {
            return;
        }

        // Give every run its own stop flag. A task from a previous run only notices
        // `stop()` between entities, so it may still be busy when `start()` is called
        // again. With a shared flag it would observe `true` once more and keep running
        // alongside the new task, processing the same pending entities twice. With a
        // fresh flag the old task keeps its cleared flag and exits after its current entity.
        let running = Arc::new(AtomicBool::new(true));
        self.running = Arc::clone(&running);

        let client = Arc::clone(&self.client);
        let storage = Arc::clone(&self.storage);
        let wake = Arc::clone(&self.wake);
        let rate = self.rate_per_second;
        let max_pages = self.max_pages;
        let page_size = self.page_size;
        let on_progress = self.on_progress.clone();

        self.handle = Some(tokio::spawn(async move {
            backfill_loop(client, storage, running, wake, rate, max_pages, page_size, on_progress).await;
        }));
    }

    /// Asks the background task to stop.
    ///
    /// An idle task is woken and exits immediately. A task that is backfilling
    /// exits after it finishes its current entity.
    pub fn stop(&mut self) {
        self.running.store(false, Ordering::Relaxed);
        self.wake.notify_one();
    }

    /// Queues an entity for backfill and wakes the background task.
    ///
    /// Entities whose stored status is `"complete"` or `"in_progress"` are left untouched.
    /// Any other entity (unknown, or with any other status) is written as `"pending"`,
    /// with offset and fetched count reset to 0. Storage errors are ignored.
    pub fn queue_entity(&self, entity_type: &str, entity_id: &str, label: &str) {
        let storage = self.storage.lock().unwrap();
        if let Ok(Some(existing)) = storage.get_backfill_state(entity_type, entity_id) {
            if existing.status == "complete" || existing.status == "in_progress" {
                return;
            }
        }
        let now = now_secs();
        let _ = storage.set_backfill_state(&BackfillStateRow {
            entity_type: entity_type.into(),
            entity_id: entity_id.into(),
            label: label.into(),
            status: "pending".into(),
            last_offset: 0,
            fetched: 0,
            last_error: None,
            started_at: now,
            updated_at: now,
        });
        // Release the storage lock before waking the worker, which will lock it next.
        drop(storage);
        self.wake.notify_one();
    }
}

/// Background worker: keeps draining pending backfills from storage until `running` is cleared.
///
/// When nothing is pending it idles until `wake` is notified or 60 seconds pass,
/// whichever comes first. The stop flag is checked before each entity. A stop
/// requested mid-batch returns right away and skips the batch's `analyze()` call.
/// An entity that is already in flight is always allowed to finish.
async fn backfill_loop(
    client: Arc<PolyNodeClient>,
    storage: Arc<Mutex<Box<dyn StorageBackend>>>,
    running: Arc<AtomicBool>,
    wake: Arc<Notify>,
    rate: f64,
    max_pages: u32,
    page_size: u32,
    on_progress: Option<Arc<dyn Fn(BackfillProgress) + Send + Sync>>,
) {
    let is_running = || running.load(Ordering::Relaxed);

    while is_running() {
        // Take the lock only long enough to read the queue; never hold it across an await.
        let queue = {
            let guard = storage.lock().unwrap();
            guard.get_pending_backfills().unwrap_or_default()
        };

        if queue.is_empty() {
            tokio::select! {
                _ = wake.notified() => {},
                _ = tokio::time::sleep(std::time::Duration::from_secs(60)) => {},
            }
            continue;
        }

        for entity in queue.iter() {
            if !is_running() {
                return;
            }
            backfill_entity(&client, &storage, entity, rate, max_pages, page_size, &on_progress).await;
        }

        // Run the backend's ANALYZE once per completed batch; failures are ignored.
        let _ = storage.lock().unwrap().analyze();
    }
}

/// Backfills one queued entity (`"wallet"`, `"market"` or `"token"`) into storage.
///
/// Wallets first get a best-effort prefetch of their positions (see
/// `prefetch_wallet_positions`). Trades are then paged from `entity.last_offset` in
/// `page_size` steps, fetching at most `max_pages` pages. Consecutive requests are
/// spaced by `1 / rate` seconds. After each page the new offset and running fetched
/// count are persisted via `update_backfill_progress`.
///
/// The entity is marked complete in any of these cases:
/// * a page comes back empty or shorter than `page_size`;
/// * the upstream error looks like an offset/history limit;
/// * the `max_pages` budget is used up.
///
/// Errors mentioning "429" or "rate limit" are retried up to `MAX_RETRIES` times,
/// with linearly growing backoff. Any other error marks the entity failed.
/// Storage write errors are ignored (best-effort).
async fn backfill_entity(
    client: &PolyNodeClient,
    storage: &Arc<Mutex<Box<dyn StorageBackend>>>,
    entity: &BackfillStateRow,
    rate: f64,
    max_pages: u32,
    page_size: u32,
    on_progress: &Option<Arc<dyn Fn(BackfillProgress) + Send + Sync>>,
) {
    let delay = pace_delay(rate);
    let mut offset = entity.last_offset;
    let mut fetched = entity.fetched;
    let mut pages_used = 0u32;
    let mut retries = 0u32;

    emit_progress(on_progress, entity, "in_progress", fetched, offset, None);

    if entity.entity_type == "wallet" {
        prefetch_wallet_positions(client, storage, entity, delay).await;
    }

    while pages_used < max_pages {
        match fetch_page(client, &entity.entity_type, &entity.entity_id, offset, page_size).await {
            Ok(trades) => {
                retries = 0;

                if trades.is_empty() {
                    let _ = storage.lock().unwrap().complete_backfill(&entity.entity_type, &entity.entity_id);
                    emit_progress(on_progress, entity, "complete", fetched, offset, None);
                    return;
                }

                let now = now_secs();
                let rows: Vec<TradeRow> = trades
                    .iter()
                    .enumerate()
                    .map(|(i, t)| TradeRow {
                        tx_hash: json_str(t, &["transactionHash", "tx_hash"]),
                        // Position within the page is the fallback when no log index is present.
                        log_index: json_i64(t, &["logIndex", "log_index"]).unwrap_or(i as i64),
                        block_number: json_i64(t, &["blockNumber", "block_number"]),
                        timestamp: normalize_timestamp(
                            t.get("timestamp")
                                .or_else(|| t.get("matchTime"))
                                .or_else(|| t.get("created_at"))
                                .unwrap_or(&serde_json::Value::Null),
                        ),
                        maker: json_str(t, &["maker"]).to_lowercase(),
                        taker: json_str_first(t, &["taker", "proxyWallet", "user"]).to_lowercase(),
                        token_id: json_str(t, &["asset", "token_id"]),
                        condition_id: json_str(t, &["conditionId", "market", "condition_id"]),
                        market_title: json_str(t, &["market_title", "title"]),
                        market_slug: json_str(t, &["market_slug", "slug"]),
                        outcome: json_str(t, &["outcome"]),
                        side: json_str(t, &["side"]).to_uppercase(),
                        price: t.get("price").and_then(lenient_f64).unwrap_or(0.0),
                        size: t.get("size").or_else(|| t.get("amount")).and_then(lenient_f64).unwrap_or(0.0),
                        maker_amount: json_str(t, &["makerAmount", "maker_amount"]),
                        taker_amount: json_str(t, &["takerAmount", "taker_amount"]),
                        // Parsed like price/size so string-encoded fees are not silently dropped.
                        fee: t.get("fee").and_then(lenient_f64),
                        source: "backfill".into(),
                        raw_json: Some(t.to_string()),
                        cached_at: now,
                    })
                    .collect();

                let count = rows.len() as i64;
                {
                    let s = storage.lock().unwrap();
                    let _ = s.upsert_trades(&rows);
                }
                fetched += count;
                offset += page_size as i64;
                pages_used += 1;

                {
                    let s = storage.lock().unwrap();
                    let _ = s.update_backfill_progress(&entity.entity_type, &entity.entity_id, offset, fetched);
                }
                emit_progress(on_progress, entity, "in_progress", fetched, offset, None);

                // A short page means the upstream has no more data.
                if count < page_size as i64 {
                    let _ = storage.lock().unwrap().complete_backfill(&entity.entity_type, &entity.entity_id);
                    emit_progress(on_progress, entity, "complete", fetched, offset, None);
                    return;
                }

                if pages_used < max_pages {
                    tokio::time::sleep(delay).await;
                }
            }
            Err(err) => {
                let msg = err.to_string();
                let lower = msg.to_lowercase();
                let is_rate_limit = msg.contains("429") || lower.contains("rate limit");

                if is_rate_limit && retries < MAX_RETRIES {
                    retries += 1;
                    let backoff = RETRY_BACKOFF_MS * u64::from(retries);
                    emit_progress(on_progress, entity, "in_progress", fetched, offset, Some(format!("rate limited, retry {retries}/{MAX_RETRIES}")));
                    tokio::time::sleep(std::time::Duration::from_millis(backoff)).await;
                    continue;
                }

                // Upstream offset cap: treat as the natural end of the history, not a failure.
                // Matched case-insensitively, like the rate-limit check above.
                let is_offset_limit = lower.contains("offset")
                    || lower.contains("max historical")
                    || (msg.contains("400") && offset >= 3000);
                if is_offset_limit {
                    let _ = storage.lock().unwrap().complete_backfill(&entity.entity_type, &entity.entity_id);
                    emit_progress(on_progress, entity, "complete", fetched, offset, Some("reached upstream limit".into()));
                    return;
                }

                let _ = storage.lock().unwrap().fail_backfill(&entity.entity_type, &entity.entity_id, &msg);
                emit_progress(on_progress, entity, "failed", fetched, offset, Some(msg));
                return;
            }
        }
    }

    // Hit max_pages — mark complete
    let _ = storage.lock().unwrap().complete_backfill(&entity.entity_type, &entity.entity_id);
    emit_progress(on_progress, entity, "complete", fetched, offset, None);
}

/// Best-effort prefetch of a wallet's positions before its trades are paged.
///
/// Stores current positions (`wallet_positions_data`, limit 500) and onchain positions
/// (`wallet_onchain_positions`, from the PNL subgraph: realized P&L for open and closed
/// positions). Fetch and storage errors are ignored. After each successful fetch it
/// sleeps for `delay` to respect request pacing.
async fn prefetch_wallet_positions(
    client: &PolyNodeClient,
    storage: &Arc<Mutex<Box<dyn StorageBackend>>>,
    entity: &BackfillStateRow,
    delay: std::time::Duration,
) {
    if let Ok(resp) = client.wallet_positions_data(&entity.entity_id, Some(500), None).await {
        if !resp.positions.is_empty() {
            let s = storage.lock().unwrap();
            let _ = s.upsert_positions(&entity.entity_id, &resp.positions);
        }
        tokio::time::sleep(delay).await;
    }

    if let Ok(resp) = client.wallet_onchain_positions(&entity.entity_id).await {
        if !resp.positions.is_empty() {
            let s = storage.lock().unwrap();
            let _ = s.upsert_onchain_positions(&entity.entity_id, &resp.positions);
        }
        tokio::time::sleep(delay).await;
    }
}

/// Pause between consecutive upstream requests for a target of `rate` requests per second.
///
/// `Duration::from_secs_f64` panics on negative, infinite or NaN input. That would kill
/// the backfill task for `rate <= 0` or NaN, so those rates mean "no throttling" (zero
/// delay). A positive rate so small that `1 / rate` overflows a `Duration` saturates to
/// `Duration::MAX`.
fn pace_delay(rate: f64) -> std::time::Duration {
    if rate.is_nan() || rate <= 0.0 {
        return std::time::Duration::ZERO;
    }
    let secs = 1.0 / rate;
    if secs < u64::MAX as f64 {
        std::time::Duration::from_secs_f64(secs)
    } else {
        std::time::Duration::MAX
    }
}

/// Reads a JSON value as `f64`, accepting either a JSON number or a numeric string.
fn lenient_f64(v: &serde_json::Value) -> Option<f64> {
    v.as_f64().or_else(|| v.as_str().and_then(|s| s.parse().ok()))
}

/// Fetches one page of raw trade-like records for an entity, as JSON values.
///
/// * `"wallet"` → `wallet_trades`, paged by `offset`.
/// * `"market"` → `market_trades`, paged by `offset`.
/// * `"token"` → `token_settlements`. The typed settlements are re-serialized to JSON
///   for the generic cache pipeline, and entries that fail to serialize are dropped.
///   NOTE(review): `offset` is not forwarded on this path. Unless the client paginates
///   internally, every page request returns the same records. Confirm whether
///   `token_settlements` supports an offset.
/// * Any other entity type yields an empty page.
async fn fetch_page(
    client: &PolyNodeClient,
    entity_type: &str,
    entity_id: &str,
    offset: i64,
    page_size: u32,
) -> Result<Vec<serde_json::Value>, Error> {
    let limit = Some(page_size as u64);
    let start = Some(offset as u64);

    let page: Vec<serde_json::Value> = match entity_type {
        "wallet" => client.wallet_trades(entity_id, limit, start).await?.trades,
        "market" => client.market_trades(entity_id, limit, start, None, None).await?.trades,
        "token" => client
            .token_settlements(entity_id, limit)
            .await?
            .settlements
            .into_iter()
            .filter_map(|settlement| serde_json::to_value(settlement).ok())
            .collect(),
        _ => Vec::new(),
    };

    Ok(page)
}

/// Reports a progress update for `entity` to the registered callback, if there is one.
///
/// The entity's type, id and label are copied into the event. `status`, `fetched`,
/// `offset` and the optional `message` describe the current state.
fn emit_progress(
    on_progress: &Option<Arc<dyn Fn(BackfillProgress) + Send + Sync>>,
    entity: &BackfillStateRow,
    status: &str,
    fetched: i64,
    offset: i64,
    message: Option<String>,
) {
    let callback = match on_progress {
        Some(callback) => callback,
        None => return,
    };

    let event = BackfillProgress {
        entity_type: entity.entity_type.clone(),
        entity_id: entity.entity_id.clone(),
        label: entity.label.clone(),
        status: status.to_string(),
        fetched,
        offset,
        message,
    };
    callback(event);
}

// JSON field extraction helpers

fn json_str(v: &serde_json::Value, keys: &[&str]) -> String {
    for k in keys {
        if let Some(s) = v.get(k).and_then(|v| v.as_str()) {
            return s.to_string();
        }
    }
    String::new()
}

fn json_str_first(v: &serde_json::Value, keys: &[&str]) -> String {
    for k in keys {
        if let Some(s) = v.get(k).and_then(|v| v.as_str()) {
            if !s.is_empty() { return s.to_string(); }
        }
    }
    String::new()
}

fn json_i64(v: &serde_json::Value, keys: &[&str]) -> Option<i64> {
    for k in keys {
        if let Some(n) = v.get(k).and_then(|v| v.as_i64()) {
            return Some(n);
        }
    }
    None
}