use rusqlite::{params, Connection};
use super::constants::META_TTL_SECONDS;
use super::types::*;
use crate::error::{Error, Result};
/// Schema version this code writes to `_trading_meta` under the key
/// `schema_version`, both for a database with no recorded version and after a
/// migration has been applied in `TradingSqliteBackend::open`.
///
/// NOTE(review): the migration check in `open` compares against the literal
/// `2` rather than this constant; keep the two in sync when bumping it.
const SCHEMA_VERSION: i64 = 2;
/// DDL executed on every `TradingSqliteBackend::open`.
///
/// Every statement uses `IF NOT EXISTS`, so running it against an existing
/// database leaves already-present tables and indexes unchanged. It defines:
///
/// * `_trading_meta` — key/value rows; `open` stores `schema_version` here.
/// * `clob_credentials` — one row per wallet address (primary key).
/// * `market_meta` — per-token metadata stamped with `fetched_at`.
/// * `order_history` — an auto-incrementing log of orders, indexed by
///   wallet address and by token id.
///
/// The last three `order_history` columns (`fee_amount_raw`,
/// `escrow_order_id`, `fee_escrow_tx_hash`) are the ones `open` adds through
/// `ALTER TABLE` for databases whose recorded schema version is below 2.
const SCHEMA_SQL: &str = r#"
CREATE TABLE IF NOT EXISTS _trading_meta (
key TEXT PRIMARY KEY,
value TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS clob_credentials (
wallet_address TEXT PRIMARY KEY,
funder_address TEXT,
api_key TEXT NOT NULL,
api_secret TEXT NOT NULL,
api_passphrase TEXT NOT NULL,
signature_type INTEGER NOT NULL DEFAULT 2,
safe_deployed INTEGER NOT NULL DEFAULT 0,
approvals_set INTEGER NOT NULL DEFAULT 0,
created_at REAL NOT NULL,
updated_at REAL NOT NULL
);
CREATE TABLE IF NOT EXISTS market_meta (
token_id TEXT PRIMARY KEY,
tick_size TEXT NOT NULL,
fee_rate_bps INTEGER NOT NULL,
neg_risk INTEGER NOT NULL DEFAULT 0,
fetched_at REAL NOT NULL
);
CREATE TABLE IF NOT EXISTS order_history (
id INTEGER PRIMARY KEY AUTOINCREMENT,
wallet_address TEXT NOT NULL,
order_id TEXT,
token_id TEXT NOT NULL,
side TEXT NOT NULL,
price REAL NOT NULL,
size REAL NOT NULL,
order_type TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'submitted',
error_msg TEXT,
response_json TEXT,
created_at REAL NOT NULL,
fee_amount_raw TEXT,
escrow_order_id TEXT,
fee_escrow_tx_hash TEXT
);
CREATE INDEX IF NOT EXISTS idx_oh_wallet ON order_history(wallet_address);
CREATE INDEX IF NOT EXISTS idx_oh_token ON order_history(token_id);
"#;
/// SQLite-backed store for CLOB credentials, cached market metadata and
/// order history. Construct it with `TradingSqliteBackend::open`.
pub struct TradingSqliteBackend {
/// The single `rusqlite` connection every method issues its statements on.
conn: Connection,
}
impl TradingSqliteBackend {
pub fn open(db_path: &str) -> Result<Self> {
let conn = Connection::open(db_path).map_err(Error::Sqlite)?;
conn.execute_batch("PRAGMA journal_mode = WAL; PRAGMA synchronous = NORMAL;")
.map_err(Error::Sqlite)?;
conn.execute_batch(SCHEMA_SQL).map_err(Error::Sqlite)?;
let version: Option<String> = conn
.query_row(
"SELECT value FROM _trading_meta WHERE key = 'schema_version'",
[],
|row| row.get(0),
)
.ok();
if version.is_none() {
conn.execute(
"INSERT INTO _trading_meta (key, value) VALUES ('schema_version', ?1)",
params![SCHEMA_VERSION.to_string()],
)
.map_err(Error::Sqlite)?;
} else {
let ver: i64 = version.unwrap_or_default().parse().unwrap_or(1);
if ver < 2 {
conn.execute_batch(
"ALTER TABLE order_history ADD COLUMN fee_amount_raw TEXT;
ALTER TABLE order_history ADD COLUMN escrow_order_id TEXT;
ALTER TABLE order_history ADD COLUMN fee_escrow_tx_hash TEXT;",
)
.map_err(Error::Sqlite)?;
conn.execute(
"UPDATE _trading_meta SET value = ?1 WHERE key = 'schema_version'",
params![SCHEMA_VERSION.to_string()],
)
.map_err(Error::Sqlite)?;
}
}
Ok(Self { conn })
}
pub fn get_credentials(&self, wallet_address: &str) -> Result<Option<StoredCredentials>> {
let mut stmt = self
.conn
.prepare("SELECT * FROM clob_credentials WHERE wallet_address = ?1")
.map_err(Error::Sqlite)?;
let result = stmt.query_row(params![wallet_address.to_lowercase()], |row| {
Ok(StoredCredentials {
wallet_address: row.get("wallet_address")?,
funder_address: row.get("funder_address")?,
api_key: row.get("api_key")?,
api_secret: row.get("api_secret")?,
api_passphrase: row.get("api_passphrase")?,
signature_type: SignatureType::from_u8(row.get::<_, u8>("signature_type")?)
.unwrap_or(SignatureType::PolyGnosisSafe),
safe_deployed: row.get::<_, i32>("safe_deployed")? != 0,
approvals_set: row.get::<_, i32>("approvals_set")? != 0,
created_at: row.get("created_at")?,
updated_at: row.get("updated_at")?,
})
});
match result {
Ok(creds) => Ok(Some(creds)),
Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
Err(e) => Err(Error::Sqlite(e)),
}
}
pub fn upsert_credentials(&self, creds: &StoredCredentials) -> Result<()> {
self.conn.execute(
"INSERT INTO clob_credentials (wallet_address, funder_address, api_key, api_secret, api_passphrase, signature_type, safe_deployed, approvals_set, created_at, updated_at)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)
ON CONFLICT(wallet_address) DO UPDATE SET
funder_address = excluded.funder_address,
api_key = excluded.api_key,
api_secret = excluded.api_secret,
api_passphrase = excluded.api_passphrase,
signature_type = excluded.signature_type,
safe_deployed = excluded.safe_deployed,
approvals_set = excluded.approvals_set,
updated_at = excluded.updated_at",
params![
creds.wallet_address.to_lowercase(),
creds.funder_address.as_ref().map(|a| a.to_lowercase()),
creds.api_key,
creds.api_secret,
creds.api_passphrase,
creds.signature_type.as_u8(),
creds.safe_deployed as i32,
creds.approvals_set as i32,
creds.created_at,
creds.updated_at,
],
).map_err(Error::Sqlite)?;
Ok(())
}
pub fn delete_credentials(&self, wallet_address: &str) -> Result<()> {
self.conn
.execute(
"DELETE FROM clob_credentials WHERE wallet_address = ?1",
params![wallet_address.to_lowercase()],
)
.map_err(Error::Sqlite)?;
Ok(())
}
pub fn get_all_credentials(&self) -> Result<Vec<StoredCredentials>> {
let mut stmt = self
.conn
.prepare("SELECT * FROM clob_credentials ORDER BY created_at DESC")
.map_err(Error::Sqlite)?;
let rows = stmt
.query_map([], |row| {
Ok(StoredCredentials {
wallet_address: row.get("wallet_address")?,
funder_address: row.get("funder_address")?,
api_key: row.get("api_key")?,
api_secret: row.get("api_secret")?,
api_passphrase: row.get("api_passphrase")?,
signature_type: SignatureType::from_u8(row.get::<_, u8>("signature_type")?)
.unwrap_or(SignatureType::PolyGnosisSafe),
safe_deployed: row.get::<_, i32>("safe_deployed")? != 0,
approvals_set: row.get::<_, i32>("approvals_set")? != 0,
created_at: row.get("created_at")?,
updated_at: row.get("updated_at")?,
})
})
.map_err(Error::Sqlite)?;
let mut result = Vec::new();
for row in rows {
result.push(row.map_err(Error::Sqlite)?);
}
Ok(result)
}
pub fn get_market_meta(&self, token_id: &str) -> Result<Option<MarketMeta>> {
let result = self.conn.query_row(
"SELECT * FROM market_meta WHERE token_id = ?1",
params![token_id],
|row| {
let fetched_at: f64 = row.get("fetched_at")?;
Ok(MarketMeta {
token_id: row.get("token_id")?,
tick_size: row.get("tick_size")?,
fee_rate_bps: row.get("fee_rate_bps")?,
neg_risk: row.get::<_, i32>("neg_risk")? != 0,
fetched_at,
})
},
);
match result {
Ok(meta) => {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs_f64();
if now - meta.fetched_at > META_TTL_SECONDS {
Ok(None) } else {
Ok(Some(meta))
}
}
Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
Err(e) => Err(Error::Sqlite(e)),
}
}
pub fn upsert_market_meta(&self, meta: &MarketMeta) -> Result<()> {
self.conn
.execute(
"INSERT INTO market_meta (token_id, tick_size, fee_rate_bps, neg_risk, fetched_at)
VALUES (?1, ?2, ?3, ?4, ?5)
ON CONFLICT(token_id) DO UPDATE SET
tick_size = excluded.tick_size,
fee_rate_bps = excluded.fee_rate_bps,
neg_risk = excluded.neg_risk,
fetched_at = excluded.fetched_at",
params![
meta.token_id,
meta.tick_size,
meta.fee_rate_bps,
meta.neg_risk as i32,
meta.fetched_at
],
)
.map_err(Error::Sqlite)?;
Ok(())
}
pub fn insert_order(&self, wallet_address: &str, order: &OrderHistoryInsert) -> Result<i64> {
self.conn.execute(
"INSERT INTO order_history (wallet_address, order_id, token_id, side, price, size, order_type, status, error_msg, response_json, created_at, fee_amount_raw, escrow_order_id, fee_escrow_tx_hash)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14)",
params![
wallet_address.to_lowercase(),
order.order_id,
order.token_id,
order.side,
order.price,
order.size,
order.order_type,
order.status,
order.error_msg,
order.response_json,
order.created_at,
order.fee_amount_raw,
order.escrow_order_id,
order.fee_escrow_tx_hash,
],
).map_err(Error::Sqlite)?;
Ok(self.conn.last_insert_rowid())
}
pub fn update_order_status(
&self,
id: i64,
status: &str,
order_id: Option<&str>,
error_msg: Option<&str>,
response_json: Option<&str>,
fee_amount_raw: Option<&str>,
escrow_order_id: Option<&str>,
fee_escrow_tx_hash: Option<&str>,
) -> Result<()> {
self.conn.execute(
"UPDATE order_history SET status = ?1, order_id = COALESCE(?2, order_id), error_msg = COALESCE(?3, error_msg), response_json = COALESCE(?4, response_json), fee_amount_raw = COALESCE(?5, fee_amount_raw), escrow_order_id = COALESCE(?6, escrow_order_id), fee_escrow_tx_hash = COALESCE(?7, fee_escrow_tx_hash) WHERE id = ?8",
params![status, order_id, error_msg, response_json, fee_amount_raw, escrow_order_id, fee_escrow_tx_hash, id],
).map_err(Error::Sqlite)?;
Ok(())
}
pub fn get_order_history(
&self,
wallet: &str,
params: &HistoryParams,
) -> Result<Vec<OrderHistoryRow>> {
let mut sql = "SELECT * FROM order_history WHERE wallet_address = ?1".to_string();
let mut args: Vec<Box<dyn rusqlite::types::ToSql>> = vec![Box::new(wallet.to_lowercase())];
if let Some(ref token_id) = params.token_id {
sql.push_str(" AND token_id = ?");
args.push(Box::new(token_id.clone()));
}
if let Some(side) = params.side {
sql.push_str(" AND side = ?");
args.push(Box::new(side.to_string()));
}
sql.push_str(" ORDER BY created_at DESC");
if let Some(limit) = params.limit {
sql.push_str(&format!(" LIMIT {}", limit));
}
if let Some(offset) = params.offset {
sql.push_str(&format!(" OFFSET {}", offset));
}
let params_refs: Vec<&dyn rusqlite::types::ToSql> = args.iter().map(|b| &**b).collect();
let mut stmt = self.conn.prepare(&sql).map_err(Error::Sqlite)?;
let rows = stmt
.query_map(params_refs.as_slice(), |row| {
Ok(OrderHistoryRow {
id: row.get("id")?,
wallet_address: row.get("wallet_address")?,
order_id: row.get("order_id")?,
token_id: row.get("token_id")?,
side: row.get("side")?,
price: row.get("price")?,
size: row.get("size")?,
order_type: row.get("order_type")?,
status: row.get("status")?,
error_msg: row.get("error_msg")?,
response_json: row.get("response_json")?,
created_at: row.get("created_at")?,
fee_amount_raw: row.get("fee_amount_raw").unwrap_or(None),
escrow_order_id: row.get("escrow_order_id").unwrap_or(None),
fee_escrow_tx_hash: row.get("fee_escrow_tx_hash").unwrap_or(None),
})
})
.map_err(Error::Sqlite)?;
let mut result = Vec::new();
for row in rows {
result.push(row.map_err(Error::Sqlite)?);
}
Ok(result)
}
pub fn close(self) {
drop(self.conn);
}
}
/// Values for one new `order_history` row, as consumed by
/// `TradingSqliteBackend::insert_order`. Each field is bound to the column of
/// the same name; the wallet address is passed to `insert_order` separately.
pub struct OrderHistoryInsert {
/// Order identifier; nullable in the table.
pub order_id: Option<String>,
/// Token the order is for.
pub token_id: String,
/// Order side, stored as text.
pub side: String,
/// Order price, stored as REAL. NOTE(review): units are not visible here — confirm.
pub price: f64,
/// Order size, stored as REAL. NOTE(review): units are not visible here — confirm.
pub size: f64,
/// Order type, stored as text.
pub order_type: String,
/// Order status. `insert_order` always binds this value, so the column's
/// `'submitted'` default is not used on that path.
pub status: String,
/// Error message, if any.
pub error_msg: Option<String>,
/// Response payload stored as text, if any.
pub response_json: Option<String>,
/// Creation timestamp, stored as REAL. Presumably Unix seconds — TODO confirm.
pub created_at: f64,
/// Fee amount stored as text, if any.
pub fee_amount_raw: Option<String>,
/// Escrow order identifier, if any.
pub escrow_order_id: Option<String>,
/// Fee escrow transaction hash, if any.
pub fee_escrow_tx_hash: Option<String>,
}