pub mod types;
pub mod storage;
pub mod sqlite_backend;
pub mod backfill;
pub mod watchlist;
pub mod estimator;
pub mod pnl;
pub use types::*;
pub use storage::StorageBackend;
pub use sqlite_backend::SqliteBackend;
pub use backfill::BackfillOrchestrator;
pub use watchlist::WatchlistManager;
pub use estimator::estimate_storage;
pub use pnl::compute_realized_pnl;
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use crate::client::PolyNodeClient;
use crate::error::Result;
use crate::types::events::PolyNodeEvent;
use crate::types::ws_messages::WsMessage;
use crate::ws::stream::StreamOptions;
use crate::ws::subscription::{Subscription, SubscriptionType};
/// Number of seconds the background prune task sleeps between prune passes (3600 s = one hour).
const PRUNE_INTERVAL_SECS: u64 = 3600;
/// Coordinates a storage backend, a watchlist, a backfill orchestrator and the
/// background workers (WebSocket ingest, periodic pruning, watchlist file watcher).
///
/// Instances are created through [`PolyNodeCache::builder`]; background work only
/// runs between a successful `start` and the following `stop`.
pub struct PolyNodeCache {
/// API client; cloned into the backfill orchestrator and the WebSocket task.
client: Arc<PolyNodeClient>,
/// Storage backend behind a mutex so background tasks and query methods can share it.
storage: Arc<Mutex<Box<dyn StorageBackend>>>,
/// Loads and edits the watchlist file and converts it to snapshot rows.
watchlist_mgr: WatchlistManager,
/// Settings collected by [`CacheBuilder`].
config: CacheConfig,
/// Backfill orchestrator; `Some` only while the cache is started.
backfill: Option<BackfillOrchestrator>,
/// Flag polled by the background loops; set in `start`, cleared in `stop`.
running: Arc<AtomicBool>,
/// Handle of the tokio task that ingests WebSocket events, if one was spawned.
ws_handle: Option<tokio::task::JoinHandle<()>>,
/// Handle of the tokio task that periodically prunes storage.
prune_handle: Option<tokio::task::JoinHandle<()>>,
/// Handle of the OS thread that polls the watchlist file for changes.
watcher_handle: Option<std::thread::JoinHandle<()>>,
/// Guards `start`/`stop` against repeated calls.
started: bool,
}
impl PolyNodeCache {
/// Returns a [`CacheBuilder`] that will build a cache using `client`.
///
/// The builder starts from `CacheConfig::default()`; adjust it with the
/// builder's setter methods before calling `build`.
pub fn builder(client: Arc<PolyNodeClient>) -> CacheBuilder {
    CacheBuilder::new(client)
}
/// Starts the cache: opens storage, syncs the watchlist snapshot and launches
/// the background workers.
///
/// In order, this method:
/// 1. opens the storage backend;
/// 2. loads the watchlist file and stores its rows as the current snapshot;
/// 3. queues every entry flagged `backfill` on a new [`BackfillOrchestrator`]
///    and starts it;
/// 4. spawns the WebSocket ingest task, the periodic prune task and the
///    watchlist file watcher.
///
/// Calling `start` on a cache that is already started is a no-op.
///
/// # Errors
/// Returns an error if the storage backend cannot be opened, the watchlist
/// cannot be loaded, or the watchlist snapshot cannot be read or written. In
/// the latter cases the storage backend is closed again and the cache is left
/// in the "not started" state, so a later `start` call re-attempts
/// initialisation instead of silently returning `Ok(())`.
///
/// # Panics
/// Panics if the storage mutex has been poisoned.
pub async fn start(&mut self) -> Result<()> {
    if self.started {
        return Ok(());
    }

    // Fallible initialisation happens before any state flag is flipped.
    {
        let mut s = self.storage.lock().unwrap();
        s.open()?;
    }

    // Everything that can fail after `open` runs inside one closure so that
    // a failure can be rolled back in a single place.
    let prepared = (|| -> Result<_> {
        let watchlist_file = self.watchlist_mgr.load()?;
        let current_entries = self.watchlist_mgr.to_snapshot_rows(&watchlist_file);
        // One guard for read + write keeps the snapshot update atomic.
        let s = self.storage.lock().unwrap();
        let stored_entries = s.get_watchlist_snapshot()?;
        let _diff = self.watchlist_mgr.diff(&stored_entries, &current_entries);
        s.set_watchlist_snapshot(&current_entries)?;
        Ok(current_entries)
    })();
    let current_entries = match prepared {
        Ok(entries) => entries,
        Err(e) => {
            // Undo the successful `open` so the backend is not left open
            // while the cache reports itself as stopped.
            self.storage.lock().unwrap().close();
            return Err(e);
        }
    };

    // Initialisation succeeded: from here on the cache counts as running.
    self.started = true;
    self.running.store(true, Ordering::Relaxed);

    let entity_count = current_entries.iter().filter(|e| e.backfill).count();
    if entity_count > 0 {
        // Widened to u64 (saturating) so a large watchlist cannot overflow
        // the request estimate; the value is only used for the log line.
        let total_requests =
            (entity_count as u64).saturating_mul(u64::from(self.config.backfill_pages));
        let eta_seconds =
            (total_requests as f64 / self.config.backfill_rate_per_second).ceil() as u64;
        tracing::info!(
            "[PolyNodeCache] Backfilling {} entities ({} page{} of {} each) — ETA: ~{}s",
            entity_count,
            self.config.backfill_pages,
            if self.config.backfill_pages > 1 { "s" } else { "" },
            self.config.backfill_page_size,
            eta_seconds,
        );
    }

    let mut backfill = BackfillOrchestrator::new(
        self.client.clone(),
        self.storage.clone(),
        self.config.backfill_rate_per_second,
        self.config.backfill_pages,
        self.config.backfill_page_size,
        self.config.on_backfill_progress.clone(),
    );
    for entry in &current_entries {
        if entry.backfill {
            backfill.queue_entity(&entry.entity_type, &entry.entity_id, &entry.label);
        }
    }
    backfill.start();
    self.backfill = Some(backfill);

    self.start_ws_stream(&current_entries).await;

    // Periodic prune task: sleep first, then prune, until `running` is cleared
    // (or the task is aborted by `stop`).
    let storage_clone = self.storage.clone();
    let running_clone = self.running.clone();
    let ttl = self.config.ttl_seconds;
    self.prune_handle = Some(tokio::spawn(async move {
        while running_clone.load(Ordering::Relaxed) {
            tokio::time::sleep(std::time::Duration::from_secs(PRUNE_INTERVAL_SECS)).await;
            if !running_clone.load(Ordering::Relaxed) {
                break;
            }
            // Best-effort maintenance: a poisoned lock or a failed prune is skipped.
            if let Ok(s) = storage_clone.lock() {
                let _ = s.prune(ttl);
                let _ = s.analyze();
            }
        }
    }));

    self.start_file_watcher();
    Ok(())
}
pub async fn stop(&mut self) -> Result<()> {
if !self.started { return Ok(()); }
self.started = false;
self.running.store(false, Ordering::Relaxed);
if let Some(ref mut bf) = self.backfill {
bf.stop();
}
self.backfill = None;
if let Some(h) = self.ws_handle.take() { h.abort(); }
if let Some(h) = self.prune_handle.take() { h.abort(); }
if let Some(h) = self.watcher_handle.take() { let _ = h.join(); }
self.storage.lock().unwrap().close();
Ok(())
}
/// Forwards to the storage backend's `wallet_trades` query for `wallet`,
/// passing `opts` through unchanged.
///
/// # Panics
/// Panics if the storage mutex has been poisoned.
pub fn wallet_trades(&self, wallet: &str, opts: &QueryOptions) -> Result<Vec<TradeRow>> {
    let backend = self.storage.lock().unwrap();
    backend.wallet_trades(wallet, opts)
}
/// Forwards to the storage backend's `wallet_positions` query for `wallet`.
///
/// # Panics
/// Panics if the storage mutex has been poisoned.
pub fn wallet_positions(&self, wallet: &str) -> Result<Vec<PositionSummary>> {
    let backend = self.storage.lock().unwrap();
    backend.wallet_positions(wallet)
}
/// Forwards to the storage backend's `multi_wallet_positions` query for all
/// `wallets`, returning the backend's map of position summaries.
///
/// # Panics
/// Panics if the storage mutex has been poisoned.
pub fn multi_wallet_positions(
    &self,
    wallets: &[String],
) -> Result<HashMap<String, Vec<PositionSummary>>> {
    let backend = self.storage.lock().unwrap();
    backend.multi_wallet_positions(wallets)
}
/// Forwards to the storage backend's `market_trades` query for `condition_id`,
/// passing `opts` through unchanged.
///
/// # Panics
/// Panics if the storage mutex has been poisoned.
pub fn market_trades(&self, condition_id: &str, opts: &QueryOptions) -> Result<Vec<TradeRow>> {
    let backend = self.storage.lock().unwrap();
    backend.market_trades(condition_id, opts)
}
/// Forwards to the storage backend's `market_positions` query for `condition_id`.
///
/// # Panics
/// Panics if the storage mutex has been poisoned.
pub fn market_positions(&self, condition_id: &str) -> Result<Vec<PositionSummary>> {
    let backend = self.storage.lock().unwrap();
    backend.market_positions(condition_id)
}
/// Forwards to the storage backend's `token_trades` query for `token_id`,
/// passing `opts` through unchanged.
///
/// # Panics
/// Panics if the storage mutex has been poisoned.
pub fn token_trades(&self, token_id: &str, opts: &QueryOptions) -> Result<Vec<TradeRow>> {
    let backend = self.storage.lock().unwrap();
    backend.token_trades(token_id, opts)
}
/// Forwards to the storage backend's `wallet_settlements` query for `wallet`,
/// passing `opts` through unchanged.
///
/// # Panics
/// Panics if the storage mutex has been poisoned.
pub fn wallet_settlements(&self, wallet: &str, opts: &QueryOptions) -> Result<Vec<SettlementRow>> {
    let backend = self.storage.lock().unwrap();
    backend.wallet_settlements(wallet, opts)
}
/// Forwards to the storage backend's `trade_by_tx_hash` lookup for `tx_hash`.
///
/// # Panics
/// Panics if the storage mutex has been poisoned.
pub fn trade_by_tx_hash(&self, tx_hash: &str) -> Result<Vec<TradeRow>> {
    let backend = self.storage.lock().unwrap();
    backend.trade_by_tx_hash(tx_hash)
}
/// Returns the [`CacheStats`] reported by the storage backend.
///
/// # Panics
/// Panics if the storage mutex has been poisoned.
pub fn stats(&self) -> Result<CacheStats> {
    let backend = self.storage.lock().unwrap();
    backend.stats()
}
/// Runs [`pnl::compute_realized_pnl`] for `wallet` against the storage backend.
///
/// The storage lock is held for the duration of the computation. The result
/// is always wrapped in `Ok`; this method does not itself produce errors.
///
/// # Panics
/// Panics if the storage mutex has been poisoned.
pub fn wallet_realized_pnl(&self, wallet: &str) -> Result<RealizedPnlResult> {
    let guard = self.storage.lock().unwrap();
    let result = pnl::compute_realized_pnl(guard.as_ref(), wallet);
    Ok(result)
}
/// Prunes storage using the configured `ttl_seconds` and returns the count
/// reported by the backend.
///
/// When the count is non-zero the backend's `analyze` is run afterwards; its
/// outcome is ignored.
///
/// # Errors
/// Returns the backend's error if pruning fails.
///
/// # Panics
/// Panics if the storage mutex has been poisoned.
pub fn prune(&self) -> Result<usize> {
    let backend = self.storage.lock().unwrap();
    let removed = backend.prune(self.config.ttl_seconds)?;
    if removed != 0 {
        // Best-effort follow-up; a failed analyze does not fail the prune.
        let _ = backend.analyze();
    }
    Ok(removed)
}
/// Adds `entries` to the watchlist and records them in the stored snapshot.
///
/// `entries` is handed to `WatchlistManager::add_entries`; the rows it reports
/// as added are appended to the stored watchlist snapshot. If a backfill
/// orchestrator is active, every added row flagged `backfill` is queued on it.
/// Nothing else happens when no row was added.
///
/// # Errors
/// Returns an error if the watchlist manager fails or the snapshot cannot be
/// read or written.
///
/// # Panics
/// Panics if the storage mutex has been poisoned.
pub fn add_to_watchlist(&self, entries: &[(EntityType, String, String, bool)]) -> Result<()> {
    let added = self.watchlist_mgr.add_entries(entries)?;
    if added.is_empty() {
        return Ok(());
    }

    {
        // Hold one guard across read + write so the watcher thread cannot
        // replace the snapshot between the two steps.
        let storage = self.storage.lock().unwrap();
        let mut current = storage.get_watchlist_snapshot()?;
        current.extend(added.iter().cloned());
        storage.set_watchlist_snapshot(&current)?;
    } // guard released before touching the backfill orchestrator

    if let Some(ref bf) = self.backfill {
        for entry in added.iter().filter(|e| e.backfill) {
            bf.queue_entity(&entry.entity_type, &entry.entity_id, &entry.label);
        }
    }
    Ok(())
}
/// Removes `entries` from the watchlist and refreshes the stored snapshot.
///
/// `entries` is handed to `WatchlistManager::remove_entries`. When the
/// `purge_on_remove` option is set, `purge_entity` is called on storage for
/// every removed entity (failures are ignored). The watchlist is then reloaded
/// and written as the new snapshot. Nothing else happens when no entry was
/// removed.
///
/// # Errors
/// Returns an error if the watchlist manager fails, the watchlist cannot be
/// reloaded, or the snapshot cannot be written.
///
/// # Panics
/// Panics if the storage mutex has been poisoned.
pub fn remove_from_watchlist(&self, entries: &[(EntityType, String)]) -> Result<()> {
    let removed = self.watchlist_mgr.remove_entries(entries)?;
    if removed.is_empty() {
        return Ok(());
    }

    if self.config.purge_on_remove {
        let s = self.storage.lock().unwrap();
        for (et, id) in &removed {
            // Best-effort: one failed purge must not block the rest.
            let _ = s.purge_entity(&et.to_string(), id);
        }
    }

    let watchlist_file = self.watchlist_mgr.load()?;
    let current_entries = self.watchlist_mgr.to_snapshot_rows(&watchlist_file);
    self.storage.lock().unwrap().set_watchlist_snapshot(&current_entries)?;
    Ok(())
}
async fn start_ws_stream(&mut self, entries: &[WatchlistSnapshotRow]) {
let wallets: Vec<String> = entries.iter().filter(|e| e.entity_type == "wallet").map(|e| e.entity_id.clone()).collect();
let tokens: Vec<String> = entries.iter().filter(|e| e.entity_type == "token").map(|e| e.entity_id.clone()).collect();
let condition_ids: Vec<String> = entries.iter().filter(|e| e.entity_type == "market").map(|e| e.entity_id.clone()).collect();
if wallets.is_empty() && tokens.is_empty() && condition_ids.is_empty() { return; }
let storage = self.storage.clone();
let running = self.running.clone();
let client = self.client.clone();
self.ws_handle = Some(tokio::spawn(async move {
let opts = StreamOptions::default();
let mut stream = match client.stream(opts).await {
Ok(s) => s,
Err(e) => { tracing::error!("Cache WS connect failed: {e}"); return; }
};
let mut sub = Subscription::new(SubscriptionType::Settlements);
sub.filters.status = Some("all".into());
if !wallets.is_empty() { sub.filters.wallets = Some(wallets.clone()); }
if !tokens.is_empty() { sub.filters.tokens = Some(tokens.clone()); }
if !condition_ids.is_empty() { sub.filters.condition_ids = Some(condition_ids.clone()); }
let _ = stream.subscribe(sub).await;
let mut trade_sub = Subscription::new(SubscriptionType::Trades);
trade_sub.filters.status = Some("all".into());
if !wallets.is_empty() { trade_sub.filters.wallets = Some(wallets); }
if !tokens.is_empty() { trade_sub.filters.tokens = Some(tokens); }
if !condition_ids.is_empty() { trade_sub.filters.condition_ids = Some(condition_ids); }
let _ = stream.subscribe(trade_sub).await;
while running.load(Ordering::Relaxed) {
match stream.next().await {
Some(Ok(WsMessage::Event(event))) => {
let s = storage.lock().unwrap();
match event {
PolyNodeEvent::Settlement(ref settlement) => {
let _ = s.upsert_settlement(settlement);
}
PolyNodeEvent::Trade(ref trade) => {
let now = now_secs();
let row = TradeRow {
tx_hash: trade.tx_hash.clone(),
log_index: trade.log_index as i64,
block_number: Some(trade.block_number as i64),
timestamp: normalize_f64(trade.timestamp as f64),
maker: trade.maker.to_lowercase(),
taker: trade.taker.to_lowercase(),
token_id: trade.token_id.clone(),
condition_id: trade.condition_id.clone().unwrap_or_default(),
market_title: trade.market_title.clone().unwrap_or_default(),
market_slug: trade.market_slug.clone().unwrap_or_default(),
outcome: trade.outcome.clone().unwrap_or_default(),
side: format!("{}", trade.side),
price: trade.price,
size: trade.size,
maker_amount: trade.maker_amount.clone(),
taker_amount: trade.taker_amount.clone(),
fee: trade.fee,
source: "trade_event".into(),
raw_json: None,
cached_at: now,
};
let _ = s.upsert_trade(&row);
}
_ => {}
}
}
Some(Ok(_)) => {} Some(Err(e)) => { tracing::debug!("Cache WS error: {e}"); }
None => break,
}
}
}));
}
fn start_file_watcher(&mut self) {
let path = self.watchlist_mgr.path().to_owned();
let running = self.running.clone();
let storage = self.storage.clone();
let watchlist_path = self.config.watchlist_path.clone();
let purge_on_remove = self.config.purge_on_remove;
self.watcher_handle = Some(std::thread::spawn(move || {
let mut last_modified = std::fs::metadata(&path).ok().and_then(|m| m.modified().ok());
while running.load(Ordering::Relaxed) {
std::thread::sleep(std::time::Duration::from_secs(2));
if !running.load(Ordering::Relaxed) { break; }
let current_modified = std::fs::metadata(&path).ok().and_then(|m| m.modified().ok());
if current_modified != last_modified {
last_modified = current_modified;
let mgr = WatchlistManager::new(&watchlist_path);
if let Ok(wl) = mgr.load() {
let current = mgr.to_snapshot_rows(&wl);
if let Ok(s) = storage.lock() {
if let Ok(stored) = s.get_watchlist_snapshot() {
let diff = mgr.diff(&stored, ¤t);
if !diff.added.is_empty() || !diff.removed.is_empty() {
let _ = s.set_watchlist_snapshot(&diff.current);
if purge_on_remove {
for entry in &diff.removed {
let _ = s.purge_entity(&entry.entity_type, &entry.entity_id);
}
}
}
}
}
}
}
}
}));
}
}
/// Builder for [`PolyNodeCache`]; obtained from [`PolyNodeCache::builder`].
pub struct CacheBuilder {
/// API client handed to the cache being built.
client: Arc<PolyNodeClient>,
/// Configuration being assembled; starts as `CacheConfig::default()`.
config: CacheConfig,
}
impl CacheBuilder {
    /// Creates a builder for `client` with the default configuration.
    fn new(client: Arc<PolyNodeClient>) -> Self {
        let config = CacheConfig::default();
        Self { client, config }
    }

    /// Sets the path handed to [`SqliteBackend::new`] when the cache is built.
    pub fn db_path(mut self, path: impl Into<PathBuf>) -> Self {
        self.config.db_path = path.into();
        self
    }

    /// Sets the path handed to [`WatchlistManager::new`] when the cache is built.
    pub fn watchlist_path(mut self, path: impl Into<PathBuf>) -> Self {
        self.config.watchlist_path = path.into();
        self
    }

    /// Sets the TTL, in seconds, passed to the storage backend when pruning.
    pub fn ttl_seconds(mut self, ttl: u64) -> Self {
        self.config.ttl_seconds = ttl;
        self
    }

    /// Sets the backfill rate (requests per second) given to the backfill orchestrator.
    pub fn backfill_rate(mut self, rate: f64) -> Self {
        self.config.backfill_rate_per_second = rate;
        self
    }

    /// Sets the number of pages given to the backfill orchestrator.
    pub fn backfill_pages(mut self, pages: u32) -> Self {
        self.config.backfill_pages = pages;
        self
    }

    /// Sets the page size given to the backfill orchestrator.
    pub fn backfill_page_size(mut self, size: u32) -> Self {
        self.config.backfill_page_size = size;
        self
    }

    /// Chooses whether entities removed from the watchlist are also purged from storage.
    pub fn purge_on_remove(mut self, purge: bool) -> Self {
        self.config.purge_on_remove = purge;
        self
    }

    /// Registers `cb` as the progress callback given to the backfill orchestrator.
    pub fn on_backfill_progress(
        mut self,
        cb: impl Fn(BackfillProgress) + Send + Sync + 'static,
    ) -> Self {
        self.config.on_backfill_progress = Some(Arc::new(cb));
        self
    }

    /// Consumes the builder and assembles a stopped [`PolyNodeCache`].
    ///
    /// A [`SqliteBackend`] is created for the configured database path and a
    /// [`WatchlistManager`] for the configured watchlist path. No background
    /// work is started; call `start` on the returned cache for that.
    ///
    /// # Errors
    /// Currently always returns `Ok`.
    pub fn build(self) -> Result<PolyNodeCache> {
        let Self { client, config } = self;
        let storage: Box<dyn StorageBackend> = Box::new(SqliteBackend::new(&config.db_path));
        let watchlist_mgr = WatchlistManager::new(&config.watchlist_path);
        let cache = PolyNodeCache {
            client,
            storage: Arc::new(Mutex::new(storage)),
            watchlist_mgr,
            config,
            backfill: None,
            running: Arc::new(AtomicBool::new(false)),
            ws_handle: None,
            prune_handle: None,
            watcher_handle: None,
            started: false,
        };
        Ok(cache)
    }
}