use rusqlite::{params, Connection, OptionalExtension};
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Mutex;
use super::storage::StorageBackend;
use super::types::*;
use crate::error::{Error, Result};
use crate::types::events::SettlementEvent;
/// Schema version recorded in `_cache_meta` under the `schema_version` key
/// when `open` finds no version row in the database.
const SCHEMA_VERSION: i64 = 1;
/// DDL batch that `open` executes on every connection.
///
/// Creates the `_cache_meta`, `settlements`, `trades`, `positions`,
/// `backfill_state` and `watchlist_snapshot` tables together with their
/// indexes. Every statement is `CREATE ... IF NOT EXISTS`, so running the
/// batch against an already-initialised database leaves existing objects
/// untouched.
const SCHEMA_SQL: &str = r#"
CREATE TABLE IF NOT EXISTS _cache_meta (
key TEXT PRIMARY KEY,
value TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS settlements (
tx_hash TEXT NOT NULL,
status TEXT NOT NULL,
detected_at REAL NOT NULL,
block_number INTEGER,
taker_wallet TEXT NOT NULL,
taker_token TEXT NOT NULL,
taker_side TEXT NOT NULL,
taker_price REAL NOT NULL,
taker_size REAL NOT NULL,
condition_id TEXT NOT NULL DEFAULT '',
market_title TEXT NOT NULL DEFAULT '',
market_slug TEXT NOT NULL DEFAULT '',
outcome TEXT NOT NULL DEFAULT '',
trade_count INTEGER NOT NULL DEFAULT 0,
raw_json TEXT NOT NULL,
cached_at REAL NOT NULL,
PRIMARY KEY (tx_hash, status)
);
CREATE TABLE IF NOT EXISTS trades (
tx_hash TEXT NOT NULL,
log_index INTEGER NOT NULL DEFAULT 0,
block_number INTEGER,
timestamp REAL NOT NULL,
maker TEXT NOT NULL,
taker TEXT NOT NULL,
token_id TEXT NOT NULL,
condition_id TEXT NOT NULL DEFAULT '',
market_title TEXT NOT NULL DEFAULT '',
market_slug TEXT NOT NULL DEFAULT '',
outcome TEXT NOT NULL DEFAULT '',
side TEXT NOT NULL,
price REAL NOT NULL,
size REAL NOT NULL,
maker_amount TEXT NOT NULL DEFAULT '0',
taker_amount TEXT NOT NULL DEFAULT '0',
fee REAL,
source TEXT NOT NULL DEFAULT 'trade_event',
raw_json TEXT,
cached_at REAL NOT NULL,
UNIQUE(tx_hash, log_index, maker, taker, token_id)
);
CREATE TABLE IF NOT EXISTS positions (
wallet TEXT NOT NULL,
token_id TEXT NOT NULL,
condition_id TEXT NOT NULL DEFAULT '',
market_title TEXT NOT NULL DEFAULT '',
market_slug TEXT NOT NULL DEFAULT '',
outcome TEXT NOT NULL DEFAULT '',
size REAL NOT NULL DEFAULT 0,
avg_price REAL NOT NULL DEFAULT 0,
cur_price REAL,
current_value REAL,
initial_value REAL,
cash_pnl REAL,
percent_pnl REAL,
realized_pnl REAL,
total_bought REAL,
redeemable INTEGER NOT NULL DEFAULT 0,
end_date TEXT,
raw_json TEXT,
cached_at REAL NOT NULL,
PRIMARY KEY (wallet, token_id)
);
CREATE INDEX IF NOT EXISTS idx_positions_wallet ON positions(wallet);
CREATE INDEX IF NOT EXISTS idx_positions_condition ON positions(condition_id);
CREATE INDEX IF NOT EXISTS idx_positions_cached_at ON positions(cached_at);
CREATE TABLE IF NOT EXISTS backfill_state (
entity_type TEXT NOT NULL,
entity_id TEXT NOT NULL,
label TEXT NOT NULL DEFAULT '',
status TEXT NOT NULL DEFAULT 'pending',
last_offset INTEGER NOT NULL DEFAULT 0,
fetched INTEGER NOT NULL DEFAULT 0,
last_error TEXT,
started_at REAL NOT NULL,
updated_at REAL NOT NULL,
PRIMARY KEY (entity_type, entity_id)
);
CREATE TABLE IF NOT EXISTS watchlist_snapshot (
entity_type TEXT NOT NULL,
entity_id TEXT NOT NULL,
label TEXT NOT NULL DEFAULT '',
backfill INTEGER NOT NULL DEFAULT 1,
added_at REAL NOT NULL,
PRIMARY KEY (entity_type, entity_id)
);
CREATE INDEX IF NOT EXISTS idx_trades_maker ON trades(maker, timestamp DESC);
CREATE INDEX IF NOT EXISTS idx_trades_taker ON trades(taker, timestamp DESC);
CREATE INDEX IF NOT EXISTS idx_trades_token_id ON trades(token_id, timestamp DESC);
CREATE INDEX IF NOT EXISTS idx_trades_condition_id ON trades(condition_id, timestamp DESC);
CREATE INDEX IF NOT EXISTS idx_trades_tx_hash ON trades(tx_hash);
CREATE INDEX IF NOT EXISTS idx_trades_side ON trades(side, timestamp DESC);
CREATE INDEX IF NOT EXISTS idx_trades_timestamp ON trades(timestamp DESC);
CREATE INDEX IF NOT EXISTS idx_trades_cached_at ON trades(cached_at);
CREATE INDEX IF NOT EXISTS idx_trades_wallet_token ON trades(taker, token_id, timestamp DESC);
CREATE INDEX IF NOT EXISTS idx_trades_maker_token ON trades(maker, token_id, timestamp DESC);
CREATE INDEX IF NOT EXISTS idx_trades_condition_side ON trades(condition_id, side, timestamp DESC);
CREATE INDEX IF NOT EXISTS idx_settlements_taker_wallet ON settlements(taker_wallet, detected_at DESC);
CREATE INDEX IF NOT EXISTS idx_settlements_condition_id ON settlements(condition_id, detected_at DESC);
CREATE INDEX IF NOT EXISTS idx_settlements_cached_at ON settlements(cached_at);
"#;
/// SQLite-backed implementation of `StorageBackend`.
///
/// Constructed closed by `new`; `open` creates the connection and the schema,
/// and `close` drops the connection again.
pub struct SqliteBackend {
/// Database location as passed to `new`; the literal `":memory:"` selects an
/// in-memory database in `open`.
db_path: PathBuf,
/// `None` while the backend is closed; holds the mutex-guarded connection
/// once `open` has succeeded.
conn: Option<Mutex<Connection>>,
}
// SAFETY: the only field that is not plain data is the `Connection`, and it is
// reachable solely through the `Mutex` stored in `conn`.
// NOTE(review): `Mutex<T>` is already `Sync` whenever `T: Send`, so this manual
// impl is presumably redundant if rusqlite's `Connection` is `Send` — confirm
// and consider removing it.
unsafe impl Sync for SqliteBackend {}
impl SqliteBackend {
/// Creates a backend for the database at `db_path` without opening it.
///
/// No connection exists until `open` is called; until then every method that
/// goes through `conn()` fails with a "database not open" cache error.
pub fn new(db_path: impl Into<PathBuf>) -> Self {
    let db_path = db_path.into();
    Self { db_path, conn: None }
}
fn conn(&self) -> Result<std::sync::MutexGuard<'_, Connection>> {
self.conn
.as_ref()
.ok_or_else(|| Error::Cache("database not open".into()))?
.lock()
.map_err(|e| Error::Cache(format!("mutex poisoned: {e}")))
}
/// Builds the optional `AND ...` filters shared by the trade queries.
///
/// Returns the SQL fragment (empty, or starting with `" AND"`) together with
/// the values to bind to its `?` placeholders, in placeholder order:
/// `since`, `until`, then `side`.
fn build_query_where(opts: &QueryOptions) -> (String, Vec<Box<dyn rusqlite::types::ToSql>>) {
    let mut fragments = String::new();
    let mut bound: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
    {
        // Keeps each SQL fragment and its bound value in lock-step.
        let mut add = |fragment: &str, value: Box<dyn rusqlite::types::ToSql>| {
            fragments.push_str(fragment);
            bound.push(value);
        };
        if let Some(since) = opts.since {
            add(" AND timestamp >= ?", Box::new(since));
        }
        if let Some(until) = opts.until {
            add(" AND timestamp <= ?", Box::new(until));
        }
        if let Some(side) = opts.side.as_ref() {
            add(" AND side = ?", Box::new(side.clone()));
        }
    }
    (fragments, bound)
}
/// Maps one row of the `trades` table onto a `TradeRow`, reading every column
/// by name.
fn read_trade_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<TradeRow> {
    let tx_hash = row.get("tx_hash")?;
    let log_index = row.get("log_index")?;
    let block_number = row.get("block_number")?;
    let timestamp = row.get("timestamp")?;
    let maker = row.get("maker")?;
    let taker = row.get("taker")?;
    let token_id = row.get("token_id")?;
    let condition_id = row.get("condition_id")?;
    let market_title = row.get("market_title")?;
    let market_slug = row.get("market_slug")?;
    let outcome = row.get("outcome")?;
    let side = row.get("side")?;
    let price = row.get("price")?;
    let size = row.get("size")?;
    let maker_amount = row.get("maker_amount")?;
    let taker_amount = row.get("taker_amount")?;
    let fee = row.get("fee")?;
    let source = row.get("source")?;
    let raw_json = row.get("raw_json")?;
    let cached_at = row.get("cached_at")?;
    Ok(TradeRow {
        tx_hash,
        log_index,
        block_number,
        timestamp,
        maker,
        taker,
        token_id,
        condition_id,
        market_title,
        market_slug,
        outcome,
        side,
        price,
        size,
        maker_amount,
        taker_amount,
        fee,
        source,
        raw_json,
        cached_at,
    })
}
/// Lists the distinct token ids of all trades in which `wallet` appears as
/// taker or maker (compared case-insensitively).
///
/// Best effort: any failure (backend closed, prepare/query error) yields an
/// empty list, and rows that fail to decode are skipped.
pub fn wallet_token_ids(&self, wallet: &str) -> Vec<String> {
    if let Ok(db) = self.conn() {
        let needle = wallet.to_lowercase();
        if let Ok(mut query) = db.prepare(
            "SELECT DISTINCT token_id FROM trades WHERE LOWER(taker) = ?1 OR LOWER(maker) = ?1",
        ) {
            if let Ok(found) = query.query_map(params![needle], |row| row.get::<_, String>(0)) {
                return found.flatten().collect();
            }
        }
    }
    Vec::new()
}
/// Returns every trade of `token_id` in which `wallet` is taker or maker
/// (wallet compared case-insensitively), oldest first with `log_index` as the
/// tie-breaker.
///
/// Best effort: any failure (backend closed, prepare/query error) yields an
/// empty list, and rows that fail to decode are skipped.
pub fn wallet_token_trades(&self, wallet: &str, token_id: &str) -> Vec<TradeRow> {
    if let Ok(db) = self.conn() {
        let needle = wallet.to_lowercase();
        if let Ok(mut query) = db.prepare(
            "SELECT * FROM trades WHERE (LOWER(taker) = ?1 OR LOWER(maker) = ?1) AND token_id = ?2 ORDER BY timestamp ASC, log_index ASC"
        ) {
            if let Ok(found) = query.query_map(params![needle, token_id], Self::read_trade_row) {
                return found.flatten().collect();
            }
        }
    }
    Vec::new()
}
/// Reports whether the `backfill_state` row for `wallet` (entity type
/// `'wallet'`, id compared case-insensitively) has status `"complete"`.
///
/// Returns `false` when no such row exists or when anything fails along the
/// way (backend closed, prepare/query error).
pub fn wallet_backfill_complete(&self, wallet: &str) -> bool {
    let db = match self.conn() {
        Ok(db) => db,
        Err(_) => return false,
    };
    let needle = wallet.to_lowercase();
    let mut lookup = match db.prepare_cached(
        "SELECT status FROM backfill_state WHERE entity_type = 'wallet' AND LOWER(entity_id) = ?",
    ) {
        Ok(stmt) => stmt,
        Err(_) => return false,
    };
    let status = lookup
        .query_row(params![needle], |r| r.get::<_, String>(0))
        .optional();
    match status {
        Ok(Some(value)) => value == "complete",
        _ => false,
    }
}
/// Maps one row of the `settlements` table onto a `SettlementRow`, reading
/// every column by name.
fn read_settlement_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<SettlementRow> {
    let tx_hash = row.get("tx_hash")?;
    let status = row.get("status")?;
    let detected_at = row.get("detected_at")?;
    let block_number = row.get("block_number")?;
    let taker_wallet = row.get("taker_wallet")?;
    let taker_token = row.get("taker_token")?;
    let taker_side = row.get("taker_side")?;
    let taker_price = row.get("taker_price")?;
    let taker_size = row.get("taker_size")?;
    let condition_id = row.get("condition_id")?;
    let market_title = row.get("market_title")?;
    let market_slug = row.get("market_slug")?;
    let outcome = row.get("outcome")?;
    let trade_count = row.get("trade_count")?;
    let raw_json = row.get("raw_json")?;
    let cached_at = row.get("cached_at")?;
    Ok(SettlementRow {
        tx_hash,
        status,
        detected_at,
        block_number,
        taker_wallet,
        taker_token,
        taker_side,
        taker_price,
        taker_size,
        condition_id,
        market_title,
        market_slug,
        outcome,
        trade_count,
        raw_json,
        cached_at,
    })
}
}
impl StorageBackend for SqliteBackend {
/// Opens (creating if necessary) the database and prepares it for use.
///
/// For an on-disk path the parent directory is created when missing. The
/// connection is then configured (WAL journal, 5000 ms busy timeout,
/// `synchronous=NORMAL`), the schema batch is applied, and the schema version
/// is recorded in `_cache_meta` if no version row exists yet.
///
/// # Errors
///
/// Propagates filesystem and SQLite errors; on failure `self.conn` is left
/// unchanged.
fn open(&mut self) -> Result<()> {
    let in_memory = self.db_path.to_str() == Some(":memory:");
    if !in_memory {
        if let Some(dir) = self.db_path.parent() {
            if !dir.exists() {
                std::fs::create_dir_all(dir)?;
            }
        }
    }
    let db = match in_memory {
        true => Connection::open_in_memory()?,
        false => Connection::open(&self.db_path)?,
    };
    db.execute_batch(
        "PRAGMA journal_mode=WAL; PRAGMA busy_timeout=5000; PRAGMA synchronous=NORMAL;",
    )?;
    db.execute_batch(SCHEMA_SQL)?;
    let stored_version: Option<String> = {
        let mut lookup = db.prepare_cached("SELECT value FROM _cache_meta WHERE key = ?")?;
        let found = lookup
            .query_row(params!["schema_version"], |r| r.get(0))
            .optional()?;
        found
    };
    // Only a brand-new database gets stamped; an existing version row is kept.
    if stored_version.is_none() {
        db.execute(
            "INSERT INTO _cache_meta (key, value) VALUES (?, ?)",
            params!["schema_version", SCHEMA_VERSION.to_string()],
        )?;
    }
    self.conn = Some(Mutex::new(db));
    Ok(())
}
/// Drops the connection, if any, returning the backend to its unopened state.
fn close(&mut self) {
    drop(self.conn.take());
}
/// Stores a settlement event and the trades it carries.
///
/// The settlement row is written with `INSERT OR REPLACE` (keyed on
/// `(tx_hash, status)`); each trade is written with `INSERT OR IGNORE`, using
/// its position within `event.trades` as `log_index`, the event's
/// `detected_at` as timestamp and `"settlement"` as source.
///
/// Both writes happen inside a single transaction, so a failure while
/// inserting trades rolls back the settlement row as well instead of leaving a
/// settlement whose `trade_count` refers to trades that were never stored.
///
/// # Errors
///
/// Returns an error if the backend is not open or any SQLite operation fails;
/// in that case nothing from this call is persisted.
fn upsert_settlement(&self, event: &SettlementEvent) -> Result<()> {
    let conn = self.conn()?;
    let now = now_secs();
    let detected_at = normalize_f64(event.detected_at as f64);
    let block_number = event.block_number.map(|n| n as i64);
    let condition_id = event.condition_id.as_deref().unwrap_or("");
    let market_title = event.market_title.as_deref().unwrap_or("");
    let market_slug = event.market_slug.as_deref().unwrap_or("");
    let event_outcome = event.outcome.as_deref().unwrap_or("");

    // Dropping `tx` without committing (e.g. on an early `?` return) rolls
    // everything back.
    let tx = conn.unchecked_transaction()?;
    conn.prepare_cached(
        "INSERT OR REPLACE INTO settlements
(tx_hash, status, detected_at, block_number, taker_wallet, taker_token,
taker_side, taker_price, taker_size, condition_id, market_title,
market_slug, outcome, trade_count, raw_json, cached_at)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16)",
    )?
    .execute(params![
        event.tx_hash,
        event.status.to_string(),
        detected_at,
        block_number,
        event.taker_wallet,
        event.taker_token,
        event.taker_side,
        event.taker_price,
        event.taker_size,
        condition_id,
        market_title,
        market_slug,
        event_outcome,
        event.trades.len() as i64,
        // Best effort: a serialization failure stores an empty string.
        serde_json::to_string(event).unwrap_or_default(),
        now,
    ])?;
    if !event.trades.is_empty() {
        let mut stmt = conn.prepare_cached(
            "INSERT OR IGNORE INTO trades
(tx_hash, log_index, block_number, timestamp, maker, taker, token_id,
condition_id, market_title, market_slug, outcome, side, price, size,
maker_amount, taker_amount, fee, source, raw_json, cached_at)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17, ?18, ?19, ?20)"
        )?;
        for (i, t) in event.trades.iter().enumerate() {
            stmt.execute(params![
                event.tx_hash,
                i as i64,
                block_number,
                detected_at,
                t.maker,
                t.taker,
                t.token_id,
                condition_id,
                market_title,
                market_slug,
                // A trade-level outcome wins over the event-level one.
                t.outcome.as_deref().unwrap_or(event_outcome),
                t.side.to_string(),
                t.price,
                t.size,
                t.maker_amount,
                t.taker_amount,
                Option::<f64>::None,
                "settlement",
                Option::<String>::None,
                now,
            ])?;
        }
    }
    tx.commit()?;
    Ok(())
}
/// Inserts a single trade; a row that collides with an existing one on the
/// table's unique key is silently ignored (`INSERT OR IGNORE`).
fn upsert_trade(&self, trade: &TradeRow) -> Result<()> {
    const INSERT_TRADE: &str = "INSERT OR IGNORE INTO trades
(tx_hash, log_index, block_number, timestamp, maker, taker, token_id,
condition_id, market_title, market_slug, outcome, side, price, size,
maker_amount, taker_amount, fee, source, raw_json, cached_at)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17, ?18, ?19, ?20)";
    let db = self.conn()?;
    let mut insert = db.prepare_cached(INSERT_TRADE)?;
    insert.execute(params![
        trade.tx_hash,
        trade.log_index,
        trade.block_number,
        trade.timestamp,
        trade.maker,
        trade.taker,
        trade.token_id,
        trade.condition_id,
        trade.market_title,
        trade.market_slug,
        trade.outcome,
        trade.side,
        trade.price,
        trade.size,
        trade.maker_amount,
        trade.taker_amount,
        trade.fee,
        trade.source,
        trade.raw_json,
        trade.cached_at,
    ])?;
    Ok(())
}
/// Inserts a batch of trades inside one transaction.
///
/// Rows colliding with an existing trade on the table's unique key are
/// ignored. An empty slice is a no-op that does not touch the connection.
fn upsert_trades(&self, trades: &[TradeRow]) -> Result<()> {
    const INSERT_TRADE: &str = "INSERT OR IGNORE INTO trades
(tx_hash, log_index, block_number, timestamp, maker, taker, token_id,
condition_id, market_title, market_slug, outcome, side, price, size,
maker_amount, taker_amount, fee, source, raw_json, cached_at)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17, ?18, ?19, ?20)";
    if trades.is_empty() {
        return Ok(());
    }
    let db = self.conn()?;
    let batch = db.unchecked_transaction()?;
    {
        let mut insert = db.prepare_cached(INSERT_TRADE)?;
        trades.iter().try_for_each(|row| {
            insert
                .execute(params![
                    row.tx_hash,
                    row.log_index,
                    row.block_number,
                    row.timestamp,
                    row.maker,
                    row.taker,
                    row.token_id,
                    row.condition_id,
                    row.market_title,
                    row.market_slug,
                    row.outcome,
                    row.side,
                    row.price,
                    row.size,
                    row.maker_amount,
                    row.taker_amount,
                    row.fee,
                    row.source,
                    row.raw_json,
                    row.cached_at,
                ])
                .map(|_| ())
        })?;
    }
    batch.commit()?;
    Ok(())
}
/// Replaces the cached positions of `wallet` with the given JSON objects.
///
/// Each object is written with `INSERT OR REPLACE` keyed on
/// `(lower-cased wallet, token id)`, all inside one transaction. Fields are
/// looked up under the first present key of each alias list (for example
/// `asset` then `token_id`, `avgPrice` then `avg_price`); missing text fields
/// default to `""` and missing `size` / average price default to `0.0`.
///
/// Every numeric field accepts either a JSON number or a string holding a
/// number. Previously only `size` and the average price did, so
/// string-encoded `curPrice`, `currentValue`, `initialValue`, `cashPnl`,
/// `percentPnl`, `realizedPnl` and `totalBought` were silently stored as NULL.
///
/// An empty slice is a no-op that does not touch the connection.
fn upsert_positions(&self, wallet: &str, positions: &[serde_json::Value]) -> Result<()> {
    /// Returns the value under the first of `keys` that is present in `obj`.
    fn field<'a>(obj: &'a serde_json::Value, keys: &[&str]) -> Option<&'a serde_json::Value> {
        keys.iter().find_map(|key| obj.get(*key))
    }
    /// Like `field`, but only yields the value when it is a JSON string.
    fn text<'a>(obj: &'a serde_json::Value, keys: &[&str]) -> Option<&'a str> {
        field(obj, keys).and_then(|v| v.as_str())
    }
    /// Like `field`, but yields a float from a JSON number or numeric string.
    fn number(obj: &serde_json::Value, keys: &[&str]) -> Option<f64> {
        field(obj, keys).and_then(|v| {
            v.as_f64()
                .or_else(|| v.as_str().and_then(|s| s.parse().ok()))
        })
    }

    if positions.is_empty() {
        return Ok(());
    }
    let conn = self.conn()?;
    let w = wallet.to_lowercase();
    let now = now_secs();
    let tx = conn.unchecked_transaction()?;
    {
        let mut stmt = conn.prepare_cached(
            "INSERT OR REPLACE INTO positions
(wallet, token_id, condition_id, market_title, market_slug, outcome,
size, avg_price, cur_price, current_value, initial_value,
cash_pnl, percent_pnl, realized_pnl, total_bought,
redeemable, end_date, raw_json, cached_at)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17, ?18, ?19)"
        )?;
        for p in positions {
            let redeemable = p
                .get("redeemable")
                .and_then(|v| v.as_bool())
                .unwrap_or(false);
            stmt.execute(params![
                w,
                text(p, &["asset", "token_id"]).unwrap_or(""),
                text(p, &["conditionId", "condition_id"]).unwrap_or(""),
                text(p, &["market_title", "title"]).unwrap_or(""),
                text(p, &["market_slug", "slug"]).unwrap_or(""),
                text(p, &["outcome"]).unwrap_or(""),
                number(p, &["size"]).unwrap_or(0.0),
                number(p, &["avgPrice", "avg_price"]).unwrap_or(0.0),
                number(p, &["curPrice"]),
                number(p, &["currentValue"]),
                number(p, &["initialValue"]),
                number(p, &["cashPnl"]),
                number(p, &["percentPnl"]),
                number(p, &["realizedPnl"]),
                number(p, &["totalBought"]),
                i64::from(redeemable),
                text(p, &["endDate", "end_date"]),
                // The full object is kept verbatim for later inspection.
                p.to_string(),
                now,
            ])?;
        }
    }
    tx.commit()?;
    Ok(())
}
/// Merges on-chain position figures for `wallet` into the `positions` table.
///
/// New `(wallet, token_id)` pairs are inserted with empty market metadata.
/// For existing rows only `realized_pnl`, `total_bought`, `size`, `cached_at`
/// and — when the incoming value is positive — `avg_price` are overwritten;
/// the remaining columns are left as they were. Runs in one transaction; an
/// empty slice is a no-op that does not touch the connection.
fn upsert_onchain_positions(
    &self,
    wallet: &str,
    positions: &[crate::types::rest::OnchainPosition],
) -> Result<()> {
    const MERGE_POSITION: &str = "INSERT INTO positions
(wallet, token_id, condition_id, market_title, market_slug, outcome,
size, avg_price, cur_price, current_value, initial_value,
cash_pnl, percent_pnl, realized_pnl, total_bought,
redeemable, end_date, raw_json, cached_at)
VALUES (?1, ?2, '', '', '', '', ?3, ?4, NULL, NULL, NULL, NULL, NULL, ?5, ?6, 0, NULL, NULL, ?7)
ON CONFLICT(wallet, token_id) DO UPDATE SET
realized_pnl = excluded.realized_pnl,
total_bought = excluded.total_bought,
avg_price = CASE WHEN excluded.avg_price > 0 THEN excluded.avg_price ELSE avg_price END,
size = CASE WHEN excluded.size != size THEN excluded.size ELSE size END,
cached_at = excluded.cached_at";
    if positions.is_empty() {
        return Ok(());
    }
    let db = self.conn()?;
    let owner = wallet.to_lowercase();
    let stamp = now_secs();
    let batch = db.unchecked_transaction()?;
    {
        let mut merge = db.prepare_cached(MERGE_POSITION)?;
        positions.iter().try_for_each(|pos| {
            merge
                .execute(params![
                    owner,
                    pos.token_id,
                    pos.size,
                    pos.avg_price,
                    pos.realized_pnl,
                    pos.total_bought,
                    stamp
                ])
                .map(|_| ())
        })?;
    }
    batch.commit()?;
    Ok(())
}
/// Returns one page of trades in which `wallet` is taker or maker (compared
/// case-insensitively), filtered by the optional `since` / `until` / `side`
/// settings of `opts`.
///
/// Results are ordered by timestamp, descending unless `opts.order_by` is
/// `TimestampAsc`; `limit` defaults to 100 and `offset` to 0.
fn wallet_trades(&self, wallet: &str, opts: &QueryOptions) -> Result<Vec<TradeRow>> {
    let db = self.conn()?;
    let (filters, filter_values) = Self::build_query_where(opts);
    let direction = match opts.order_by {
        Some(OrderBy::TimestampAsc) => "ASC",
        _ => "DESC",
    };
    let page_size = opts.limit.unwrap_or(100);
    let skip = opts.offset.unwrap_or(0);
    let sql = format!(
        "SELECT * FROM trades WHERE (LOWER(taker) = ? OR LOWER(maker) = ?) {} ORDER BY timestamp {} LIMIT ? OFFSET ?",
        filters, direction
    );
    let mut query = db.prepare(&sql)?;
    let needle = wallet.to_lowercase();
    // Placeholder order: wallet (twice), optional filters, LIMIT, OFFSET.
    let mut bound: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
    bound.push(Box::new(needle.clone()));
    bound.push(Box::new(needle));
    bound.extend(filter_values);
    bound.push(Box::new(page_size as i64));
    bound.push(Box::new(skip as i64));
    let bound_refs: Vec<&dyn rusqlite::types::ToSql> =
        bound.iter().map(|value| value.as_ref()).collect();
    let trades = query
        .query_map(bound_refs.as_slice(), Self::read_trade_row)?
        .collect::<rusqlite::Result<Vec<TradeRow>>>()?;
    Ok(trades)
}
fn wallet_positions(&self, wallet: &str) -> Result<Vec<PositionSummary>> {
let conn = self.conn()?;
let w = wallet.to_lowercase();
let mut stmt = conn.prepare_cached(
"SELECT p.*,
(SELECT COUNT(*) FROM trades WHERE (LOWER(taker) = ?1 OR LOWER(maker) = ?1) AND token_id = p.token_id) as trade_count,
(SELECT MIN(timestamp) FROM trades WHERE (LOWER(taker) = ?1 OR LOWER(maker) = ?1) AND token_id = p.token_id) as first_trade_at,
(SELECT MAX(timestamp) FROM trades WHERE (LOWER(taker) = ?1 OR LOWER(maker) = ?1) AND token_id = p.token_id) as last_trade_at
FROM positions p
WHERE LOWER(p.wallet) = ?1"
)?;
let rows = stmt.query_map(params![w], |row| {
Ok(PositionSummary {
wallet: row.get("wallet")?,
token_id: row.get("token_id")?,
condition_id: row.get("condition_id")?,
market_title: row.get("market_title")?,
market_slug: row.get("market_slug")?,
outcome: row.get("outcome")?,
size: row.get("size")?,
avg_price: row.get("avg_price")?,
cur_price: row.get("cur_price")?,
current_value: row.get("current_value")?,
initial_value: row.get("initial_value")?,
cash_pnl: row.get("cash_pnl")?,
percent_pnl: row.get("percent_pnl")?,
realized_pnl: row.get("realized_pnl")?,
total_bought: row.get("total_bought")?,
redeemable: row.get::<_, i64>("redeemable")? != 0,
end_date: row.get("end_date")?,
trade_count: row.get("trade_count")?,
first_trade_at: row.get("first_trade_at")?,
last_trade_at: row.get("last_trade_at")?,
})
})?;
let mut result = Vec::new();
for row in rows {
result.push(row?);
}
Ok(result)
}
/// Looks up the positions of several wallets, keyed by the wallet string
/// exactly as supplied by the caller.
///
/// Stops at the first wallet whose lookup fails and returns that error.
fn multi_wallet_positions(
    &self,
    wallets: &[String],
) -> Result<HashMap<String, Vec<PositionSummary>>> {
    wallets
        .iter()
        .map(|wallet| {
            self.wallet_positions(wallet)
                .map(|positions| (wallet.clone(), positions))
        })
        .collect()
}
/// Returns one page of trades for the market `condition_id` (exact match),
/// filtered by the optional `since` / `until` / `side` settings of `opts`.
///
/// Results are ordered by timestamp, descending unless `opts.order_by` is
/// `TimestampAsc`; `limit` defaults to 100 and `offset` to 0.
fn market_trades(&self, condition_id: &str, opts: &QueryOptions) -> Result<Vec<TradeRow>> {
    let db = self.conn()?;
    let (filters, filter_values) = Self::build_query_where(opts);
    let direction = match opts.order_by {
        Some(OrderBy::TimestampAsc) => "ASC",
        _ => "DESC",
    };
    let page_size = opts.limit.unwrap_or(100);
    let skip = opts.offset.unwrap_or(0);
    let sql = format!(
        "SELECT * FROM trades WHERE condition_id = ? {} ORDER BY timestamp {} LIMIT ? OFFSET ?",
        filters, direction
    );
    let mut query = db.prepare(&sql)?;
    // Placeholder order: condition id, optional filters, LIMIT, OFFSET.
    let mut bound: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
    bound.push(Box::new(condition_id.to_string()));
    bound.extend(filter_values);
    bound.push(Box::new(page_size as i64));
    bound.push(Box::new(skip as i64));
    let bound_refs: Vec<&dyn rusqlite::types::ToSql> =
        bound.iter().map(|value| value.as_ref()).collect();
    let trades = query
        .query_map(bound_refs.as_slice(), Self::read_trade_row)?
        .collect::<rusqlite::Result<Vec<TradeRow>>>()?;
    Ok(trades)
}
fn market_positions(&self, condition_id: &str) -> Result<Vec<PositionSummary>> {
let conn = self.conn()?;
let mut stmt = conn.prepare_cached("SELECT * FROM positions WHERE condition_id = ?")?;
let rows = stmt.query_map(params![condition_id], |row| {
Ok(PositionSummary {
wallet: row.get("wallet")?,
token_id: row.get("token_id")?,
condition_id: row.get("condition_id")?,
market_title: row.get("market_title")?,
market_slug: row.get("market_slug")?,
outcome: row.get("outcome")?,
size: row.get("size")?,
avg_price: row.get("avg_price")?,
cur_price: row.get("cur_price")?,
current_value: row.get("current_value")?,
initial_value: row.get("initial_value")?,
cash_pnl: row.get("cash_pnl")?,
percent_pnl: row.get("percent_pnl")?,
realized_pnl: row.get("realized_pnl")?,
total_bought: row.get("total_bought")?,
redeemable: row.get::<_, i64>("redeemable")? != 0,
end_date: row.get("end_date")?,
trade_count: None,
first_trade_at: None,
last_trade_at: None,
})
})?;
let mut result = Vec::new();
for row in rows {
result.push(row?);
}
Ok(result)
}
/// Returns one page of trades for `token_id` (exact match), filtered by the
/// optional `since` / `until` / `side` settings of `opts`.
///
/// Results are ordered by timestamp, descending unless `opts.order_by` is
/// `TimestampAsc`; `limit` defaults to 100 and `offset` to 0.
fn token_trades(&self, token_id: &str, opts: &QueryOptions) -> Result<Vec<TradeRow>> {
    let db = self.conn()?;
    let (filters, filter_values) = Self::build_query_where(opts);
    let direction = match opts.order_by {
        Some(OrderBy::TimestampAsc) => "ASC",
        _ => "DESC",
    };
    let page_size = opts.limit.unwrap_or(100);
    let skip = opts.offset.unwrap_or(0);
    let sql = format!(
        "SELECT * FROM trades WHERE token_id = ? {} ORDER BY timestamp {} LIMIT ? OFFSET ?",
        filters, direction
    );
    let mut query = db.prepare(&sql)?;
    // Placeholder order: token id, optional filters, LIMIT, OFFSET.
    let mut bound: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
    bound.push(Box::new(token_id.to_string()));
    bound.extend(filter_values);
    bound.push(Box::new(page_size as i64));
    bound.push(Box::new(skip as i64));
    let bound_refs: Vec<&dyn rusqlite::types::ToSql> =
        bound.iter().map(|value| value.as_ref()).collect();
    let trades = query
        .query_map(bound_refs.as_slice(), Self::read_trade_row)?
        .collect::<rusqlite::Result<Vec<TradeRow>>>()?;
    Ok(trades)
}
/// Returns one page of settlements whose taker wallet is `wallet` (compared
/// case-insensitively), optionally bounded by `opts.since` / `opts.until` on
/// `detected_at`.
///
/// Results are ordered by `detected_at`, descending unless `opts.order_by` is
/// `TimestampAsc`; `limit` defaults to 100 and `offset` to 0. `opts.side` is
/// not consulted by this query.
fn wallet_settlements(&self, wallet: &str, opts: &QueryOptions) -> Result<Vec<SettlementRow>> {
    let db = self.conn()?;
    let needle = wallet.to_lowercase();
    let direction = match opts.order_by {
        Some(OrderBy::TimestampAsc) => "ASC",
        _ => "DESC",
    };
    let page_size = opts.limit.unwrap_or(100);
    let skip = opts.offset.unwrap_or(0);
    let mut filters = String::new();
    // Placeholder order: wallet, optional time bounds, LIMIT, OFFSET.
    let mut bound: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
    bound.push(Box::new(needle));
    if let Some(since) = opts.since {
        filters.push_str(" AND detected_at >= ?");
        bound.push(Box::new(since));
    }
    if let Some(until) = opts.until {
        filters.push_str(" AND detected_at <= ?");
        bound.push(Box::new(until));
    }
    bound.push(Box::new(page_size as i64));
    bound.push(Box::new(skip as i64));
    let sql = format!(
        "SELECT * FROM settlements WHERE LOWER(taker_wallet) = ? {} ORDER BY detected_at {} LIMIT ? OFFSET ?",
        filters, direction
    );
    let mut query = db.prepare(&sql)?;
    let bound_refs: Vec<&dyn rusqlite::types::ToSql> =
        bound.iter().map(|value| value.as_ref()).collect();
    let settlements = query
        .query_map(bound_refs.as_slice(), Self::read_settlement_row)?
        .collect::<rusqlite::Result<Vec<SettlementRow>>>()?;
    Ok(settlements)
}
fn trade_by_tx_hash(&self, tx_hash: &str) -> Result<Vec<TradeRow>> {
let conn = self.conn()?;
let mut stmt =
conn.prepare_cached("SELECT * FROM trades WHERE tx_hash = ? ORDER BY log_index ASC")?;
let rows = stmt.query_map(params![tx_hash], Self::read_trade_row)?;
let mut result = Vec::new();
for row in rows {
result.push(row?);
}
Ok(result)
}
/// Trait entry point for the inherent `SqliteBackend::wallet_token_ids`.
// The call is spelled as a type-qualified path, presumably so it targets the
// inherent method rather than recursing into this trait method.
fn wallet_token_ids(&self, wallet: &str) -> Vec<String> {
SqliteBackend::wallet_token_ids(self, wallet)
}
/// Trait entry point for the inherent `SqliteBackend::wallet_token_trades`.
// The call is spelled as a type-qualified path, presumably so it targets the
// inherent method rather than recursing into this trait method.
fn wallet_token_trades(&self, wallet: &str, token_id: &str) -> Vec<TradeRow> {
SqliteBackend::wallet_token_trades(self, wallet, token_id)
}
/// Trait entry point for the inherent `SqliteBackend::wallet_backfill_complete`.
// The call is spelled as a type-qualified path, presumably so it targets the
// inherent method rather than recursing into this trait method.
fn wallet_backfill_complete(&self, wallet: &str) -> bool {
SqliteBackend::wallet_backfill_complete(self, wallet)
}
fn get_backfill_state(
&self,
entity_type: &str,
entity_id: &str,
) -> Result<Option<BackfillStateRow>> {
let conn = self.conn()?;
let row = conn
.prepare_cached("SELECT * FROM backfill_state WHERE entity_type = ? AND entity_id = ?")?
.query_row(params![entity_type, entity_id], |row| {
Ok(BackfillStateRow {
entity_type: row.get("entity_type")?,
entity_id: row.get("entity_id")?,
label: row.get("label")?,
status: row.get("status")?,
last_offset: row.get("last_offset")?,
fetched: row.get("fetched")?,
last_error: row.get("last_error")?,
started_at: row.get("started_at")?,
updated_at: row.get("updated_at")?,
})
})
.optional()?;
Ok(row)
}
fn get_pending_backfills(&self) -> Result<Vec<BackfillStateRow>> {
let conn = self.conn()?;
let mut stmt = conn.prepare_cached(
"SELECT * FROM backfill_state WHERE status IN ('pending', 'in_progress') ORDER BY started_at ASC"
)?;
let rows = stmt.query_map([], |row| {
Ok(BackfillStateRow {
entity_type: row.get("entity_type")?,
entity_id: row.get("entity_id")?,
label: row.get("label")?,
status: row.get("status")?,
last_offset: row.get("last_offset")?,
fetched: row.get("fetched")?,
last_error: row.get("last_error")?,
started_at: row.get("started_at")?,
updated_at: row.get("updated_at")?,
})
})?;
let mut result = Vec::new();
for row in rows {
result.push(row?);
}
Ok(result)
}
/// Writes `state` to `backfill_state`, replacing any existing row with the
/// same `(entity_type, entity_id)`.
fn set_backfill_state(&self, state: &BackfillStateRow) -> Result<()> {
    const REPLACE_STATE: &str = "INSERT OR REPLACE INTO backfill_state
(entity_type, entity_id, label, status, last_offset, fetched, last_error, started_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)";
    let db = self.conn()?;
    let mut replace = db.prepare_cached(REPLACE_STATE)?;
    replace.execute(params![
        state.entity_type,
        state.entity_id,
        state.label,
        state.status,
        state.last_offset,
        state.fetched,
        state.last_error,
        state.started_at,
        state.updated_at,
    ])?;
    Ok(())
}
/// Marks the backfill for `(entity_type, entity_id)` as `'in_progress'` and
/// records the latest `offset`, `fetched` count and the current time.
///
/// Succeeds without effect when no matching row exists.
fn update_backfill_progress(
    &self,
    entity_type: &str,
    entity_id: &str,
    offset: i64,
    fetched: i64,
) -> Result<()> {
    let db = self.conn()?;
    let mut update = db.prepare_cached(
        "UPDATE backfill_state SET status = 'in_progress', last_offset = ?, fetched = ?, updated_at = ? WHERE entity_type = ? AND entity_id = ?"
    )?;
    update.execute(params![offset, fetched, now_secs(), entity_type, entity_id])?;
    Ok(())
}
/// Marks the backfill for `(entity_type, entity_id)` as `'complete'` and
/// refreshes `updated_at`.
///
/// Succeeds without effect when no matching row exists.
fn complete_backfill(&self, entity_type: &str, entity_id: &str) -> Result<()> {
    let db = self.conn()?;
    let mut update = db.prepare_cached(
        "UPDATE backfill_state SET status = 'complete', updated_at = ? WHERE entity_type = ? AND entity_id = ?"
    )?;
    update.execute(params![now_secs(), entity_type, entity_id])?;
    Ok(())
}
fn fail_backfill(&self, entity_type: &str, entity_id: &str, error: &str) -> Result<()> {
let conn = self.conn()?;
conn.prepare_cached(
"UPDATE backfill_state SET status = 'failed', last_error = ?, updated_at = ? WHERE entity_type = ? AND entity_id = ?"
)?.execute(params![error, now_secs(), entity_type, entity_id])?;
Ok(())
}
fn get_watchlist_snapshot(&self) -> Result<Vec<WatchlistSnapshotRow>> {
let conn = self.conn()?;
let mut stmt = conn.prepare_cached("SELECT * FROM watchlist_snapshot")?;
let rows = stmt.query_map([], |row| {
Ok(WatchlistSnapshotRow {
entity_type: row.get("entity_type")?,
entity_id: row.get("entity_id")?,
label: row.get("label")?,
backfill: row.get::<_, i64>("backfill")? != 0,
added_at: row.get("added_at")?,
})
})?;
let mut result = Vec::new();
for row in rows {
result.push(row?);
}
Ok(result)
}
/// Replaces the whole `watchlist_snapshot` table with `entries`.
///
/// The delete and the inserts share one transaction, so a failure leaves the
/// previous snapshot in place. An empty slice clears the table.
fn set_watchlist_snapshot(&self, entries: &[WatchlistSnapshotRow]) -> Result<()> {
    let db = self.conn()?;
    let swap = db.unchecked_transaction()?;
    db.execute("DELETE FROM watchlist_snapshot", [])?;
    {
        let mut insert = db.prepare_cached(
            "INSERT INTO watchlist_snapshot (entity_type, entity_id, label, backfill, added_at) VALUES (?, ?, ?, ?, ?)"
        )?;
        entries.iter().try_for_each(|entry| {
            insert
                .execute(params![
                    entry.entity_type,
                    entry.entity_id,
                    entry.label,
                    entry.backfill as i64,
                    entry.added_at
                ])
                .map(|_| ())
        })?;
    }
    swap.commit()?;
    Ok(())
}
/// Deletes trades and settlements whose `cached_at` is more than
/// `max_age_seconds` older than the current time.
///
/// Returns the combined number of deleted rows. Trades are deleted first,
/// then settlements.
fn prune(&self, max_age_seconds: u64) -> Result<usize> {
    let db = self.conn()?;
    let cutoff = now_secs() - max_age_seconds as f64;
    let mut removed = 0usize;
    for sql in [
        "DELETE FROM trades WHERE cached_at < ?",
        "DELETE FROM settlements WHERE cached_at < ?",
    ] {
        removed += db.execute(sql, params![cutoff])?;
    }
    Ok(removed)
}
/// Removes all cached data for one entity together with its backfill state.
///
/// * `"wallet"` — deletes trades where the wallet is taker or maker and
///   settlements where it is the taker wallet, compared case-insensitively.
/// * `"market"` — deletes trades and settlements with that `condition_id`.
/// * `"token"` — deletes trades with that `token_id`.
/// * any other type — deletes no data rows.
///
/// The matching `backfill_state` row is always removed. For wallets the id is
/// matched case-insensitively, in line with the data deletes above and with
/// `wallet_backfill_complete`; otherwise a state row stored with different
/// letter case would survive the purge and keep reporting the wallet as
/// backfilled.
///
/// All deletes run in one transaction, so a failure part-way through leaves
/// the cache untouched.
///
/// Returns the number of deleted trade and settlement rows (the backfill row
/// is not counted).
// NOTE(review): rows in `positions` are not purged here — confirm whether
// that is intentional.
fn purge_entity(&self, entity_type: &str, entity_id: &str) -> Result<usize> {
    let conn = self.conn()?;
    let id_lower = entity_id.to_lowercase();
    // Dropping `tx` on an early `?` return rolls back every delete below.
    let tx = conn.unchecked_transaction()?;
    let mut deleted = 0usize;
    match entity_type {
        "wallet" => {
            deleted += conn.execute(
                "DELETE FROM trades WHERE LOWER(taker) = ? OR LOWER(maker) = ?",
                params![id_lower, id_lower],
            )?;
            deleted += conn.execute(
                "DELETE FROM settlements WHERE LOWER(taker_wallet) = ?",
                params![id_lower],
            )?;
        }
        "market" => {
            deleted += conn.execute(
                "DELETE FROM trades WHERE condition_id = ?",
                params![entity_id],
            )?;
            deleted += conn.execute(
                "DELETE FROM settlements WHERE condition_id = ?",
                params![entity_id],
            )?;
        }
        "token" => {
            deleted +=
                conn.execute("DELETE FROM trades WHERE token_id = ?", params![entity_id])?;
        }
        _ => {}
    }
    if entity_type == "wallet" {
        conn.execute(
            "DELETE FROM backfill_state WHERE entity_type = ? AND LOWER(entity_id) = ?",
            params![entity_type, id_lower],
        )?;
    } else {
        conn.execute(
            "DELETE FROM backfill_state WHERE entity_type = ? AND entity_id = ?",
            params![entity_type, entity_id],
        )?;
    }
    tx.commit()?;
    Ok(deleted)
}
/// Runs SQLite's `ANALYZE` on the open database.
fn analyze(&self) -> Result<()> {
    self.conn()?.execute_batch("ANALYZE")?;
    Ok(())
}
fn stats(&self) -> Result<CacheStats> {
let conn = self.conn()?;
let trade_count: i64 = conn.query_row("SELECT COUNT(*) FROM trades", [], |r| r.get(0))?;
let settlement_count: i64 =
conn.query_row("SELECT COUNT(*) FROM settlements", [], |r| r.get(0))?;
let oldest: Option<f64> =
conn.query_row("SELECT MIN(timestamp) FROM trades", [], |r| r.get(0))?;
let newest: Option<f64> =
conn.query_row("SELECT MAX(timestamp) FROM trades", [], |r| r.get(0))?;
let total: i64 = conn.query_row("SELECT COUNT(*) FROM backfill_state", [], |r| r.get(0))?;
let complete: i64 = conn.query_row(
"SELECT COUNT(*) FROM backfill_state WHERE status = 'complete'",
[],
|r| r.get(0),
)?;
let pending: i64 = conn.query_row(
"SELECT COUNT(*) FROM backfill_state WHERE status = 'pending'",
[],
|r| r.get(0),
)?;
let in_progress: i64 = conn.query_row(
"SELECT COUNT(*) FROM backfill_state WHERE status = 'in_progress'",
[],
|r| r.get(0),
)?;
let failed: i64 = conn.query_row(
"SELECT COUNT(*) FROM backfill_state WHERE status = 'failed'",
[],
|r| r.get(0),
)?;
let mut db_size: u64 = 0;
if self.db_path.to_str() != Some(":memory:") {
if let Ok(meta) = std::fs::metadata(&self.db_path) {
db_size = meta.len();
}
let wal_path = format!("{}-wal", self.db_path.display());
if let Ok(meta) = std::fs::metadata(&wal_path) {
db_size += meta.len();
}
}
Ok(CacheStats {
trade_count,
settlement_count,
db_size_bytes: db_size,
oldest_trade_at: oldest,
newest_trade_at: newest,
backfill_entities: total,
backfill_complete: complete,
backfill_pending: pending,
backfill_in_progress: in_progress,
backfill_failed: failed,
})
}
}
#[cfg(test)]
mod tests {
use super::*;
/// Builds a sample trade with a random `tx_hash` and applies the given
/// `(field, value)` overrides in order; unknown field names are ignored.
fn make_trade(overrides: &[(&str, &str)]) -> TradeRow {
    let mut trade = TradeRow {
        tx_hash: format!("0x{:x}", rand_u64()),
        log_index: 0,
        block_number: Some(12345),
        timestamp: now_secs(),
        maker: "0xmaker".into(),
        taker: "0xtaker".into(),
        token_id: "token_abc".into(),
        condition_id: "0xcondition".into(),
        market_title: "Will BTC hit 100k?".into(),
        market_slug: "btc-100k".into(),
        outcome: "Yes".into(),
        side: "BUY".into(),
        price: 0.65,
        size: 100.0,
        maker_amount: "65000000".into(),
        taker_amount: "100000000".into(),
        fee: None,
        source: "trade_event".into(),
        raw_json: None,
        cached_at: now_secs(),
    };
    for &(field, value) in overrides {
        // Only the string fields listed here can be overridden.
        let target = match field {
            "tx_hash" => &mut trade.tx_hash,
            "taker" => &mut trade.taker,
            "maker" => &mut trade.maker,
            "side" => &mut trade.side,
            "token_id" => &mut trade.token_id,
            "condition_id" => &mut trade.condition_id,
            "outcome" => &mut trade.outcome,
            "source" => &mut trade.source,
            _ => continue,
        };
        *target = value.to_string();
    }
    trade
}
/// Produces a pseudo-random `u64` for building unique test identifiers.
///
/// Hashes the current wall-clock time, the calling thread's id and a
/// process-wide call counter. The counter guarantees that two calls made on
/// the same thread within one clock tick still hash different inputs;
/// without it such calls returned identical values, giving duplicate
/// `tx_hash`es that `INSERT OR IGNORE` would silently drop.
fn rand_u64() -> u64 {
    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};
    use std::sync::atomic::{AtomicU64, Ordering};
    static CALLS: AtomicU64 = AtomicU64::new(0);
    let mut h = DefaultHasher::new();
    std::time::SystemTime::now().hash(&mut h);
    std::thread::current().id().hash(&mut h);
    CALLS.fetch_add(1, Ordering::Relaxed).hash(&mut h);
    h.finish()
}
/// Returns a freshly opened in-memory backend; panics if opening fails.
fn open_memory() -> SqliteBackend {
    let mut backend = SqliteBackend::new(":memory:");
    backend.open().unwrap();
    backend
}
/// A freshly opened database reports zero trades and zero settlements.
#[test]
fn test_open_and_schema() {
    let backend = open_memory();
    let fresh = backend.stats().unwrap();
    assert_eq!(fresh.trade_count, 0);
    assert_eq!(fresh.settlement_count, 0);
}
/// A trade stored with `upsert_trade` is returned by `wallet_trades` for its
/// taker.
#[test]
fn test_insert_and_query_trade() {
    let backend = open_memory();
    backend
        .upsert_trade(&make_trade(&[("taker", "0xalice")]))
        .unwrap();
    let default_opts = QueryOptions::default();
    let found = backend.wallet_trades("0xalice", &default_opts).unwrap();
    assert_eq!(found.len(), 1);
    assert_eq!(found[0].taker, "0xalice");
}
/// Upserting the same trade twice leaves exactly one row for its tx hash.
#[test]
fn test_dedup() {
    let db = open_memory();
    let duplicate = make_trade(&[("tx_hash", "0xdup")]);
    // Write the identical row twice; the second write must not add a row.
    for _ in 0..2 {
        db.upsert_trade(&duplicate).unwrap();
    }
    let stored = db.trade_by_tx_hash("0xdup").unwrap();
    assert_eq!(stored.len(), 1);
}
/// Fifty trades with distinct tx hashes inserted in one batch are all stored.
#[test]
fn test_batch_insert() {
    let db = open_memory();

    let mut trades: Vec<TradeRow> = Vec::new();
    for i in 0..50 {
        let mut row = make_trade(&[("taker", "0xbatch")]);
        row.tx_hash = format!("0xbatch_{i}");
        trades.push(row);
    }
    db.upsert_trades(&trades).unwrap();

    // Ask for up to 100 rows so the query limit cannot hide any of the 50.
    let opts = QueryOptions {
        limit: Some(100),
        ..Default::default()
    };
    let results = db.wallet_trades("0xbatch", &opts).unwrap();
    assert_eq!(results.len(), 50);
}
/// Positions upserted from JSON are read back with their fields mapped.
#[test]
fn test_wallet_positions_from_table() {
    let db = open_memory();
    let payload = [
        serde_json::json!({
            "asset": "tok1",
            "conditionId": "cond1",
            "outcome": "Yes",
            "size": 200,
            "avgPrice": 0.65,
            "title": "BTC 100k",
            "cashPnl": 10.5
        }),
        serde_json::json!({
            "asset": "tok2",
            "conditionId": "cond2",
            "outcome": "No",
            "size": 50,
            "avgPrice": 0.30
        }),
    ];
    db.upsert_positions("0xalice", &payload).unwrap();

    let positions = db.wallet_positions("0xalice").unwrap();
    assert_eq!(positions.len(), 2);

    // The remaining checks look at the first returned position only.
    let first = &positions[0];
    assert_eq!(first.token_id, "tok1");
    assert!((first.size - 200.0).abs() < 0.01);
    assert!((first.avg_price - 0.65).abs() < 0.001);
    assert_eq!(first.wallet, "0xalice");
}
/// A position reports the count and first/last timestamps of its wallet's
/// trades on the same token.
#[test]
fn test_positions_with_trade_timestamps() {
    let db = open_memory();
    db.upsert_positions("0xalice", &[
        serde_json::json!({ "asset": "tok1", "conditionId": "cond1", "outcome": "Yes", "size": 100, "avgPrice": 0.50 }),
    ]).unwrap();
    // The trades table is unique on (tx_hash, log_index, maker, taker,
    // token_id) and these two rows agree on everything but tx_hash, so the
    // hashes are set explicitly rather than relying on generated defaults
    // happening to differ.
    let mut t1 = make_trade(&[
        ("tx_hash", "0xpos_ts_1"),
        ("taker", "0xalice"),
        ("token_id", "tok1"),
    ]);
    t1.timestamp = 1000.0;
    let mut t2 = make_trade(&[
        ("tx_hash", "0xpos_ts_2"),
        ("taker", "0xalice"),
        ("token_id", "tok1"),
    ]);
    t2.timestamp = 2000.0;
    db.upsert_trades(&[t1, t2]).unwrap();
    let positions = db.wallet_positions("0xalice").unwrap();
    assert_eq!(positions.len(), 1);
    assert_eq!(positions[0].trade_count, Some(2));
    assert_eq!(positions[0].first_trade_at, Some(1000.0));
    assert_eq!(positions[0].last_trade_at, Some(2000.0));
}
/// Filtering a wallet's trades by side returns only the matching trades.
#[test]
fn test_filter_by_side() {
    let db = open_memory();
    // The two BUY rows are identical apart from tx_hash; with the table's
    // uniqueness on (tx_hash, log_index, maker, taker, token_id) a shared
    // hash would collapse them into one row, so each trade gets an explicit
    // hash of its own.
    db.upsert_trades(&[
        make_trade(&[("tx_hash", "0xside_1"), ("taker", "0xalice"), ("side", "BUY")]),
        make_trade(&[("tx_hash", "0xside_2"), ("taker", "0xalice"), ("side", "SELL")]),
        make_trade(&[("tx_hash", "0xside_3"), ("taker", "0xalice"), ("side", "BUY")]),
    ])
    .unwrap();
    let buys = db
        .wallet_trades(
            "0xalice",
            &QueryOptions {
                side: Some("BUY".into()),
                limit: Some(100),
                ..Default::default()
            },
        )
        .unwrap();
    assert_eq!(buys.len(), 2);
}
/// Pruning removes the trade cached 100 seconds ago and keeps the fresh one.
#[test]
fn test_prune() {
    let db = open_memory();
    // Explicit hashes keep the two rows distinct under the table's
    // (tx_hash, log_index, maker, taker, token_id) uniqueness; otherwise they
    // differ only in cached_at and could be stored as a single row.
    let mut old = make_trade(&[("tx_hash", "0xprune_old"), ("taker", "0xalice")]);
    old.cached_at = now_secs() - 100.0;
    let recent = make_trade(&[("tx_hash", "0xprune_recent"), ("taker", "0xalice")]);
    db.upsert_trades(&[old, recent]).unwrap();
    // NOTE(review): the argument is presumably a maximum age in seconds — confirm
    // against `prune`'s definition.
    let pruned = db.prune(50).unwrap();
    assert_eq!(pruned, 1);
}
/// Walks one backfill record through pending -> in_progress -> complete.
#[test]
fn test_backfill_state() {
    let db = open_memory();
    let created = now_secs();
    let initial = BackfillStateRow {
        entity_type: "wallet".into(),
        entity_id: "0xalice".into(),
        label: "Alice".into(),
        status: "pending".into(),
        last_offset: 0,
        fetched: 0,
        last_error: None,
        started_at: created,
        updated_at: created,
    };
    db.set_backfill_state(&initial).unwrap();

    // Re-reads the record under test; panics if the lookup fails or finds nothing.
    let reload = || db.get_backfill_state("wallet", "0xalice").unwrap().unwrap();

    let pending = reload();
    assert_eq!(pending.status, "pending");

    db.update_backfill_progress("wallet", "0xalice", 100, 50)
        .unwrap();
    let running = reload();
    assert_eq!(running.status, "in_progress");
    assert_eq!(running.last_offset, 100);

    db.complete_backfill("wallet", "0xalice").unwrap();
    let finished = reload();
    assert_eq!(finished.status, "complete");
}
/// A trade stored with a mixed-case taker is found by upper- and lower-case lookups.
#[test]
fn test_case_insensitive_wallet() {
    let db = open_memory();
    let mixed_case = make_trade(&[("taker", "0xAbCdEf")]);
    db.upsert_trade(&mixed_case).unwrap();

    let opts = QueryOptions::default();
    for wallet in ["0XABCDEF", "0xabcdef"] {
        let matches = db.wallet_trades(wallet, &opts).unwrap();
        assert_eq!(matches.len(), 1);
    }
}
/// Stats report the trade count and the oldest/newest trade timestamps.
#[test]
fn test_stats() {
    let db = open_memory();

    let mut earlier = make_trade(&[("taker", "0xa")]);
    earlier.timestamp = 1000.0;
    let mut later = make_trade(&[("taker", "0xb")]);
    later.timestamp = 2000.0;
    let batch = [earlier, later];
    db.upsert_trades(&batch).unwrap();

    let summary = db.stats().unwrap();
    assert_eq!(summary.trade_count, 2);
    assert_eq!(summary.oldest_trade_at, Some(1000.0));
    assert_eq!(summary.newest_trade_at, Some(2000.0));
}
}