use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use tokio::sync::Notify;
use tokio::task::JoinHandle;
use crate::client::PolyNodeClient;
use crate::error::Error;
use super::storage::StorageBackend;
use super::types::*;
/// Maximum number of consecutive rate-limit retries for a single page request. Once exhausted,
/// the error is handled like any other non-retryable error.
const MAX_RETRIES: u32 = 3;
/// Base backoff for rate-limit retries, in milliseconds. The wait grows linearly:
/// `RETRY_BACKOFF_MS * attempt`.
const RETRY_BACKOFF_MS: u64 = 5000;
/// Background worker that backfills historical data for queued entities.
///
/// Entities are queued with `queue_entity` and processed by a Tokio task spawned from `start`.
/// The task keeps looping until it observes the running flag cleared by `stop`.
pub struct BackfillOrchestrator {
/// API client used for every upstream request.
client: Arc<PolyNodeClient>,
/// Shared storage backend, locked only for short synchronous calls.
storage: Arc<Mutex<Box<dyn StorageBackend>>>,
/// Request pacing: the worker sleeps `1 / rate_per_second` seconds between upstream calls.
rate_per_second: f64,
/// Maximum number of trade pages fetched for one entity in a single pass.
max_pages: u32,
/// Number of records requested per page.
page_size: u32,
/// Optional callback receiving every progress event.
on_progress: Option<Arc<dyn Fn(BackfillProgress) + Send + Sync>>,
/// Set by `start`, cleared by `stop`; polled by the worker loop.
running: Arc<AtomicBool>,
/// Wakes an idle worker (new entity queued, or stop requested).
wake: Arc<Notify>,
/// Handle of the spawned worker task, if one has been started.
handle: Option<JoinHandle<()>>,
}
impl BackfillOrchestrator {
    /// Creates a stopped orchestrator; call [`start`](Self::start) to spawn the worker.
    ///
    /// * `rate_per_second` — pacing: the worker sleeps `1 / rate_per_second` seconds between
    ///   upstream requests.
    /// * `max_pages` — how many trade pages are fetched for one entity in a single pass.
    /// * `page_size` — records requested per page.
    /// * `on_progress` — optional callback receiving every progress event.
    pub fn new(
        client: Arc<PolyNodeClient>,
        storage: Arc<Mutex<Box<dyn StorageBackend>>>,
        rate_per_second: f64,
        max_pages: u32,
        page_size: u32,
        on_progress: Option<Arc<dyn Fn(BackfillProgress) + Send + Sync>>,
    ) -> Self {
        Self {
            client,
            storage,
            rate_per_second,
            max_pages,
            page_size,
            on_progress,
            running: Arc::new(AtomicBool::new(false)),
            wake: Arc::new(Notify::new()),
            handle: None,
        }
    }

    /// Spawns the background worker unless a live one is already running.
    ///
    /// [`stop`](Self::stop) only clears the shared running flag. After a stop, the previous
    /// worker may therefore still be busy with its current entity. Simply setting the flag
    /// again would let that old worker carry on alongside a newly spawned one, and both would
    /// process the same pending entities. Any leftover worker is therefore aborted before a new
    /// one is spawned.
    ///
    /// Aborting is safe with respect to storage: the worker only touches storage through a
    /// synchronous lock that is never held across an `.await`, and `.await` points are the only
    /// places a task can be cancelled. A worker that died while the flag was still set (e.g. it
    /// panicked) is also replaced. Otherwise the orchestrator would report "running" with no
    /// task behind it.
    ///
    /// # Panics
    /// Panics when called outside a Tokio runtime (requirement of `tokio::spawn`).
    pub fn start(&mut self) {
        let worker_alive = self
            .handle
            .as_ref()
            .map_or(false, |handle| !handle.is_finished());
        if self.running.load(Ordering::Relaxed) && worker_alive {
            return;
        }
        // Cancel a worker left over from a previous start/stop cycle (no-op if it has finished).
        if let Some(previous) = self.handle.take() {
            previous.abort();
        }
        self.running.store(true, Ordering::Relaxed);

        let client = self.client.clone();
        let storage = self.storage.clone();
        let running = self.running.clone();
        let wake = self.wake.clone();
        let rate = self.rate_per_second;
        let max_pages = self.max_pages;
        let page_size = self.page_size;
        let on_progress = self.on_progress.clone();
        self.handle = Some(tokio::spawn(async move {
            backfill_loop(client, storage, running, wake, rate, max_pages, page_size, on_progress)
                .await;
        }));
    }

    /// Asks the worker to exit.
    ///
    /// The flag is checked at the top of each loop iteration and before each pending entity.
    /// An entity that is already being backfilled therefore finishes its pass first. An idle
    /// worker is woken through `wake` so that it notices promptly.
    pub fn stop(&mut self) {
        self.running.store(false, Ordering::Relaxed);
        self.wake.notify_one();
    }

    /// Queues an entity for backfill by writing a fresh `"pending"` state row, then wakes the
    /// worker.
    ///
    /// Entities whose stored status is `"complete"` or `"in_progress"` are left untouched. Any
    /// other existing row is overwritten with a reset row: offset and fetched counters go back
    /// to 0 and `started_at` is set to now. If the state lookup itself fails, the row is written
    /// anyway. Errors from the write are ignored.
    ///
    /// # Panics
    /// Panics if the storage mutex is poisoned.
    pub fn queue_entity(&self, entity_type: &str, entity_id: &str, label: &str) {
        let storage = self.storage.lock().unwrap();
        if let Ok(Some(existing)) = storage.get_backfill_state(entity_type, entity_id) {
            if existing.status == "complete" || existing.status == "in_progress" {
                return;
            }
        }
        let now = now_secs();
        let _ = storage.set_backfill_state(&BackfillStateRow {
            entity_type: entity_type.into(),
            entity_id: entity_id.into(),
            label: label.into(),
            status: "pending".into(),
            last_offset: 0,
            fetched: 0,
            last_error: None,
            started_at: now,
            updated_at: now,
        });
        // Release the storage lock before waking the worker, which will immediately lock it.
        drop(storage);
        self.wake.notify_one();
    }
}
/// Body of the task spawned by `BackfillOrchestrator::start`.
///
/// While `running` is set, it repeatedly pulls the pending backfills from storage:
/// * nothing pending: waits until `wake` is notified or 60 seconds pass, then re-checks;
/// * otherwise: backfills each entity in turn, then calls the backend's `analyze` (result
///   ignored). `running` is re-checked before every entity, and the function returns
///   immediately (skipping `analyze`) once it has been cleared.
///
/// A storage error while listing pending work is treated as "nothing pending".
async fn backfill_loop(
    client: Arc<PolyNodeClient>,
    storage: Arc<Mutex<Box<dyn StorageBackend>>>,
    running: Arc<AtomicBool>,
    wake: Arc<Notify>,
    rate: f64,
    max_pages: u32,
    page_size: u32,
    on_progress: Option<Arc<dyn Fn(BackfillProgress) + Send + Sync>>,
) {
    const IDLE_RECHECK: std::time::Duration = std::time::Duration::from_secs(60);

    while running.load(Ordering::Relaxed) {
        // Keep the lock scoped to this block so it is released before any `.await`.
        let queue = {
            let backend = storage.lock().unwrap();
            backend.get_pending_backfills().unwrap_or_default()
        };

        if queue.is_empty() {
            tokio::select! {
                _ = wake.notified() => {},
                _ = tokio::time::sleep(IDLE_RECHECK) => {},
            }
        } else {
            for row in queue.iter() {
                if !running.load(Ordering::Relaxed) {
                    return;
                }
                backfill_entity(&client, &storage, row, rate, max_pages, page_size, &on_progress)
                    .await;
            }
            let _ = storage.lock().unwrap().analyze();
        }
    }
}
async fn backfill_entity(
client: &PolyNodeClient,
storage: &Arc<Mutex<Box<dyn StorageBackend>>>,
entity: &BackfillStateRow,
rate: f64,
max_pages: u32,
page_size: u32,
on_progress: &Option<Arc<dyn Fn(BackfillProgress) + Send + Sync>>,
) {
let mut offset = entity.last_offset;
let mut fetched = entity.fetched;
let mut pages_used = 0u32;
let mut retries = 0u32;
emit_progress(on_progress, entity, "in_progress", fetched, offset, None);
if entity.entity_type == "wallet" {
match client.wallet_positions_data(&entity.entity_id, Some(500), None).await {
Ok(resp) => {
if !resp.positions.is_empty() {
let s = storage.lock().unwrap();
let _ = s.upsert_positions(&entity.entity_id, &resp.positions);
}
tokio::time::sleep(std::time::Duration::from_secs_f64(1.0 / rate)).await;
}
Err(_) => {} }
match client.wallet_onchain_positions(&entity.entity_id).await {
Ok(resp) => {
if !resp.positions.is_empty() {
let s = storage.lock().unwrap();
let _ = s.upsert_onchain_positions(&entity.entity_id, &resp.positions);
}
tokio::time::sleep(std::time::Duration::from_secs_f64(1.0 / rate)).await;
}
Err(_) => {} }
}
loop {
if pages_used >= max_pages { break; }
match fetch_page(client, &entity.entity_type, &entity.entity_id, offset, page_size).await {
Ok(trades) => {
retries = 0;
if trades.is_empty() {
let _ = storage.lock().unwrap().complete_backfill(&entity.entity_type, &entity.entity_id);
emit_progress(on_progress, entity, "complete", fetched, offset, None);
return;
}
let now = now_secs();
let rows: Vec<TradeRow> = trades.iter().enumerate().map(|(i, t)| {
TradeRow {
tx_hash: json_str(t, &["transactionHash", "tx_hash"]),
log_index: json_i64(t, &["logIndex", "log_index"]).unwrap_or(i as i64),
block_number: json_i64(t, &["blockNumber", "block_number"]),
timestamp: normalize_timestamp(t.get("timestamp").or_else(|| t.get("matchTime")).or_else(|| t.get("created_at")).unwrap_or(&serde_json::Value::Null)),
maker: json_str(t, &["maker"]).to_lowercase(),
taker: json_str_first(t, &["taker", "proxyWallet", "user"]).to_lowercase(),
token_id: json_str(t, &["asset", "token_id"]),
condition_id: json_str(t, &["conditionId", "market", "condition_id"]),
market_title: json_str(t, &["market_title", "title"]),
market_slug: json_str(t, &["market_slug", "slug"]),
outcome: json_str(t, &["outcome"]),
side: json_str(t, &["side"]).to_uppercase(),
price: t.get("price").and_then(|v| v.as_f64().or_else(|| v.as_str().and_then(|s| s.parse().ok()))).unwrap_or(0.0),
size: t.get("size").or_else(|| t.get("amount")).and_then(|v| v.as_f64().or_else(|| v.as_str().and_then(|s| s.parse().ok()))).unwrap_or(0.0),
maker_amount: json_str(t, &["makerAmount", "maker_amount"]),
taker_amount: json_str(t, &["takerAmount", "taker_amount"]),
fee: t.get("fee").and_then(|v| v.as_f64()),
source: "backfill".into(),
raw_json: Some(t.to_string()),
cached_at: now,
}
}).collect();
let count = rows.len() as i64;
{
let s = storage.lock().unwrap();
let _ = s.upsert_trades(&rows);
}
fetched += count;
offset += page_size as i64;
pages_used += 1;
{
let s = storage.lock().unwrap();
let _ = s.update_backfill_progress(&entity.entity_type, &entity.entity_id, offset, fetched);
}
emit_progress(on_progress, entity, "in_progress", fetched, offset, None);
if count < page_size as i64 {
let _ = storage.lock().unwrap().complete_backfill(&entity.entity_type, &entity.entity_id);
emit_progress(on_progress, entity, "complete", fetched, offset, None);
return;
}
if pages_used < max_pages {
tokio::time::sleep(std::time::Duration::from_secs_f64(1.0 / rate)).await;
}
}
Err(err) => {
let msg = err.to_string();
let is_rate_limit = msg.contains("429") || msg.to_lowercase().contains("rate limit");
if is_rate_limit && retries < MAX_RETRIES {
retries += 1;
let backoff = RETRY_BACKOFF_MS * retries as u64;
emit_progress(on_progress, entity, "in_progress", fetched, offset, Some(format!("rate limited, retry {retries}/{MAX_RETRIES}")));
tokio::time::sleep(std::time::Duration::from_millis(backoff)).await;
continue;
}
let is_offset_limit = msg.contains("offset") || msg.contains("max historical") || (msg.contains("400") && offset >= 3000);
if is_offset_limit {
let _ = storage.lock().unwrap().complete_backfill(&entity.entity_type, &entity.entity_id);
emit_progress(on_progress, entity, "complete", fetched, offset, Some("reached upstream limit".into()));
return;
}
let _ = storage.lock().unwrap().fail_backfill(&entity.entity_type, &entity.entity_id, &msg);
emit_progress(on_progress, entity, "failed", fetched, offset, Some(msg));
return;
}
}
}
let _ = storage.lock().unwrap().complete_backfill(&entity.entity_type, &entity.entity_id);
emit_progress(on_progress, entity, "complete", fetched, offset, None);
}
/// Fetches one page of raw records for an entity as JSON values.
///
/// * `"wallet"`: `wallet_trades(entity_id, limit, offset)`.
/// * `"market"`: `market_trades(entity_id, limit, offset, None, None)`.
/// * `"token"`: `token_settlements(entity_id, limit)`. This call takes no offset, so every
///   request returns the same first page. For `offset > 0` an empty page is returned instead.
///   That ends the caller's pagination rather than re-downloading the same settlements up to
///   `max_pages` times. Settlements that fail to serialize are skipped.
/// * any other type: an empty page.
///
/// A negative `offset` is clamped to 0 instead of wrapping to a huge `u64`.
///
/// # Errors
/// Returns the client's error unchanged when the upstream request fails.
async fn fetch_page(
    client: &PolyNodeClient,
    entity_type: &str,
    entity_id: &str,
    offset: i64,
    page_size: u32,
) -> Result<Vec<serde_json::Value>, Error> {
    let limit = Some(u64::from(page_size));
    let start = Some(offset.max(0) as u64);
    match entity_type {
        "wallet" => {
            let resp = client.wallet_trades(entity_id, limit, start).await?;
            Ok(resp.trades)
        }
        "market" => {
            let resp = client
                .market_trades(entity_id, limit, start, None, None)
                .await?;
            Ok(resp.trades)
        }
        // Settlements are not paginated: only the first page carries data.
        "token" if offset > 0 => Ok(Vec::new()),
        "token" => {
            let resp = client.token_settlements(entity_id, limit).await?;
            Ok(resp
                .settlements
                .into_iter()
                .filter_map(|s| serde_json::to_value(s).ok())
                .collect())
        }
        _ => Ok(Vec::new()),
    }
}
/// Forwards a progress event for `entity` to the optional callback.
///
/// The event is only built when a callback is registered; otherwise this is a no-op.
fn emit_progress(
    on_progress: &Option<Arc<dyn Fn(BackfillProgress) + Send + Sync>>,
    entity: &BackfillStateRow,
    status: &str,
    fetched: i64,
    offset: i64,
    message: Option<String>,
) {
    let callback = match on_progress {
        Some(cb) => cb,
        None => return,
    };
    let event = BackfillProgress {
        entity_type: entity.entity_type.clone(),
        entity_id: entity.entity_id.clone(),
        label: entity.label.clone(),
        status: status.into(),
        fetched,
        offset,
        message,
    };
    callback(event);
}
fn json_str(v: &serde_json::Value, keys: &[&str]) -> String {
for k in keys {
if let Some(s) = v.get(k).and_then(|v| v.as_str()) {
return s.to_string();
}
}
String::new()
}
fn json_str_first(v: &serde_json::Value, keys: &[&str]) -> String {
for k in keys {
if let Some(s) = v.get(k).and_then(|v| v.as_str()) {
if !s.is_empty() { return s.to_string(); }
}
}
String::new()
}
fn json_i64(v: &serde_json::Value, keys: &[&str]) -> Option<i64> {
for k in keys {
if let Some(n) = v.get(k).and_then(|v| v.as_i64()) {
return Some(n);
}
}
None
}