polynode 0.7.3

Rust SDK for the PolyNode API — real-time Polymarket data
Documentation
//! SQLite storage for credentials, market metadata, and order history.

use rusqlite::{params, Connection};

use crate::error::{Error, Result};
use super::constants::META_TTL_SECONDS;
use super::types::*;

const SCHEMA_VERSION: i64 = 2;

const SCHEMA_SQL: &str = r#"
CREATE TABLE IF NOT EXISTS _trading_meta (
  key TEXT PRIMARY KEY,
  value TEXT NOT NULL
);

CREATE TABLE IF NOT EXISTS clob_credentials (
  wallet_address TEXT PRIMARY KEY,
  funder_address TEXT,
  api_key TEXT NOT NULL,
  api_secret TEXT NOT NULL,
  api_passphrase TEXT NOT NULL,
  signature_type INTEGER NOT NULL DEFAULT 2,
  safe_deployed INTEGER NOT NULL DEFAULT 0,
  approvals_set INTEGER NOT NULL DEFAULT 0,
  created_at REAL NOT NULL,
  updated_at REAL NOT NULL
);

CREATE TABLE IF NOT EXISTS market_meta (
  token_id TEXT PRIMARY KEY,
  tick_size TEXT NOT NULL,
  fee_rate_bps INTEGER NOT NULL,
  neg_risk INTEGER NOT NULL DEFAULT 0,
  fetched_at REAL NOT NULL
);

CREATE TABLE IF NOT EXISTS order_history (
  id INTEGER PRIMARY KEY AUTOINCREMENT,
  wallet_address TEXT NOT NULL,
  order_id TEXT,
  token_id TEXT NOT NULL,
  side TEXT NOT NULL,
  price REAL NOT NULL,
  size REAL NOT NULL,
  order_type TEXT NOT NULL,
  status TEXT NOT NULL DEFAULT 'submitted',
  error_msg TEXT,
  response_json TEXT,
  created_at REAL NOT NULL,
  fee_amount_raw TEXT,
  escrow_order_id TEXT,
  fee_escrow_tx_hash TEXT
);

CREATE INDEX IF NOT EXISTS idx_oh_wallet ON order_history(wallet_address);
CREATE INDEX IF NOT EXISTS idx_oh_token ON order_history(token_id);
"#;

pub struct TradingSqliteBackend {
    conn: Connection,
}

impl TradingSqliteBackend {
    pub fn open(db_path: &str) -> Result<Self> {
        let conn = Connection::open(db_path)
            .map_err(Error::Sqlite)?;
        conn.execute_batch("PRAGMA journal_mode = WAL; PRAGMA synchronous = NORMAL;")
            .map_err(Error::Sqlite)?;
        conn.execute_batch(SCHEMA_SQL)
            .map_err(Error::Sqlite)?;

        // Check schema version
        let version: Option<String> = conn
            .query_row(
                "SELECT value FROM _trading_meta WHERE key = 'schema_version'",
                [],
                |row| row.get(0),
            )
            .ok();

        if version.is_none() {
            conn.execute(
                "INSERT INTO _trading_meta (key, value) VALUES ('schema_version', ?1)",
                params![SCHEMA_VERSION.to_string()],
            ).map_err(Error::Sqlite)?;
        } else {
            let ver: i64 = version.unwrap_or_default().parse().unwrap_or(1);
            if ver < 2 {
                // v1 → v2: add fee escrow columns to order_history
                conn.execute_batch(
                    "ALTER TABLE order_history ADD COLUMN fee_amount_raw TEXT;
                     ALTER TABLE order_history ADD COLUMN escrow_order_id TEXT;
                     ALTER TABLE order_history ADD COLUMN fee_escrow_tx_hash TEXT;"
                ).map_err(Error::Sqlite)?;
                conn.execute(
                    "UPDATE _trading_meta SET value = ?1 WHERE key = 'schema_version'",
                    params![SCHEMA_VERSION.to_string()],
                ).map_err(Error::Sqlite)?;
            }
        }

        Ok(Self { conn })
    }

    // ── Credentials ──

    pub fn get_credentials(&self, wallet_address: &str) -> Result<Option<StoredCredentials>> {
        let mut stmt = self.conn.prepare(
            "SELECT * FROM clob_credentials WHERE wallet_address = ?1"
        ).map_err(Error::Sqlite)?;

        let result = stmt.query_row(params![wallet_address.to_lowercase()], |row| {
            Ok(StoredCredentials {
                wallet_address: row.get("wallet_address")?,
                funder_address: row.get("funder_address")?,
                api_key: row.get("api_key")?,
                api_secret: row.get("api_secret")?,
                api_passphrase: row.get("api_passphrase")?,
                signature_type: SignatureType::from_u8(row.get::<_, u8>("signature_type")?)
                    .unwrap_or(SignatureType::PolyGnosisSafe),
                safe_deployed: row.get::<_, i32>("safe_deployed")? != 0,
                approvals_set: row.get::<_, i32>("approvals_set")? != 0,
                created_at: row.get("created_at")?,
                updated_at: row.get("updated_at")?,
            })
        });

        match result {
            Ok(creds) => Ok(Some(creds)),
            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
            Err(e) => Err(Error::Sqlite(e)),
        }
    }

    pub fn upsert_credentials(&self, creds: &StoredCredentials) -> Result<()> {
        self.conn.execute(
            "INSERT INTO clob_credentials (wallet_address, funder_address, api_key, api_secret, api_passphrase, signature_type, safe_deployed, approvals_set, created_at, updated_at)
             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)
             ON CONFLICT(wallet_address) DO UPDATE SET
               funder_address = excluded.funder_address,
               api_key = excluded.api_key,
               api_secret = excluded.api_secret,
               api_passphrase = excluded.api_passphrase,
               signature_type = excluded.signature_type,
               safe_deployed = excluded.safe_deployed,
               approvals_set = excluded.approvals_set,
               updated_at = excluded.updated_at",
            params![
                creds.wallet_address.to_lowercase(),
                creds.funder_address.as_ref().map(|a| a.to_lowercase()),
                creds.api_key,
                creds.api_secret,
                creds.api_passphrase,
                creds.signature_type.as_u8(),
                creds.safe_deployed as i32,
                creds.approvals_set as i32,
                creds.created_at,
                creds.updated_at,
            ],
        ).map_err(Error::Sqlite)?;
        Ok(())
    }

    pub fn delete_credentials(&self, wallet_address: &str) -> Result<()> {
        self.conn.execute(
            "DELETE FROM clob_credentials WHERE wallet_address = ?1",
            params![wallet_address.to_lowercase()],
        ).map_err(Error::Sqlite)?;
        Ok(())
    }

    pub fn get_all_credentials(&self) -> Result<Vec<StoredCredentials>> {
        let mut stmt = self.conn.prepare(
            "SELECT * FROM clob_credentials ORDER BY created_at DESC"
        ).map_err(Error::Sqlite)?;

        let rows = stmt.query_map([], |row| {
            Ok(StoredCredentials {
                wallet_address: row.get("wallet_address")?,
                funder_address: row.get("funder_address")?,
                api_key: row.get("api_key")?,
                api_secret: row.get("api_secret")?,
                api_passphrase: row.get("api_passphrase")?,
                signature_type: SignatureType::from_u8(row.get::<_, u8>("signature_type")?)
                    .unwrap_or(SignatureType::PolyGnosisSafe),
                safe_deployed: row.get::<_, i32>("safe_deployed")? != 0,
                approvals_set: row.get::<_, i32>("approvals_set")? != 0,
                created_at: row.get("created_at")?,
                updated_at: row.get("updated_at")?,
            })
        }).map_err(Error::Sqlite)?;

        let mut result = Vec::new();
        for row in rows {
            result.push(row.map_err(Error::Sqlite)?);
        }
        Ok(result)
    }

    // ── Market Metadata ──

    pub fn get_market_meta(&self, token_id: &str) -> Result<Option<MarketMeta>> {
        let result = self.conn.query_row(
            "SELECT * FROM market_meta WHERE token_id = ?1",
            params![token_id],
            |row| {
                let fetched_at: f64 = row.get("fetched_at")?;
                Ok(MarketMeta {
                    token_id: row.get("token_id")?,
                    tick_size: row.get("tick_size")?,
                    fee_rate_bps: row.get("fee_rate_bps")?,
                    neg_risk: row.get::<_, i32>("neg_risk")? != 0,
                    fetched_at,
                })
            },
        );

        match result {
            Ok(meta) => {
                let now = std::time::SystemTime::now()
                    .duration_since(std::time::UNIX_EPOCH)
                    .unwrap()
                    .as_secs_f64();
                if now - meta.fetched_at > META_TTL_SECONDS {
                    Ok(None) // expired
                } else {
                    Ok(Some(meta))
                }
            }
            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
            Err(e) => Err(Error::Sqlite(e)),
        }
    }

    pub fn upsert_market_meta(&self, meta: &MarketMeta) -> Result<()> {
        self.conn.execute(
            "INSERT INTO market_meta (token_id, tick_size, fee_rate_bps, neg_risk, fetched_at)
             VALUES (?1, ?2, ?3, ?4, ?5)
             ON CONFLICT(token_id) DO UPDATE SET
               tick_size = excluded.tick_size,
               fee_rate_bps = excluded.fee_rate_bps,
               neg_risk = excluded.neg_risk,
               fetched_at = excluded.fetched_at",
            params![meta.token_id, meta.tick_size, meta.fee_rate_bps, meta.neg_risk as i32, meta.fetched_at],
        ).map_err(Error::Sqlite)?;
        Ok(())
    }

    // ── Order History ──

    pub fn insert_order(&self, wallet_address: &str, order: &OrderHistoryInsert) -> Result<i64> {
        self.conn.execute(
            "INSERT INTO order_history (wallet_address, order_id, token_id, side, price, size, order_type, status, error_msg, response_json, created_at, fee_amount_raw, escrow_order_id, fee_escrow_tx_hash)
             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14)",
            params![
                wallet_address.to_lowercase(),
                order.order_id,
                order.token_id,
                order.side,
                order.price,
                order.size,
                order.order_type,
                order.status,
                order.error_msg,
                order.response_json,
                order.created_at,
                order.fee_amount_raw,
                order.escrow_order_id,
                order.fee_escrow_tx_hash,
            ],
        ).map_err(Error::Sqlite)?;
        Ok(self.conn.last_insert_rowid())
    }

    pub fn update_order_status(
        &self,
        id: i64,
        status: &str,
        order_id: Option<&str>,
        error_msg: Option<&str>,
        response_json: Option<&str>,
        fee_amount_raw: Option<&str>,
        escrow_order_id: Option<&str>,
        fee_escrow_tx_hash: Option<&str>,
    ) -> Result<()> {
        self.conn.execute(
            "UPDATE order_history SET status = ?1, order_id = COALESCE(?2, order_id), error_msg = COALESCE(?3, error_msg), response_json = COALESCE(?4, response_json), fee_amount_raw = COALESCE(?5, fee_amount_raw), escrow_order_id = COALESCE(?6, escrow_order_id), fee_escrow_tx_hash = COALESCE(?7, fee_escrow_tx_hash) WHERE id = ?8",
            params![status, order_id, error_msg, response_json, fee_amount_raw, escrow_order_id, fee_escrow_tx_hash, id],
        ).map_err(Error::Sqlite)?;
        Ok(())
    }

    pub fn get_order_history(&self, wallet: &str, params: &HistoryParams) -> Result<Vec<OrderHistoryRow>> {
        let mut sql = "SELECT * FROM order_history WHERE wallet_address = ?1".to_string();
        let mut args: Vec<Box<dyn rusqlite::types::ToSql>> = vec![Box::new(wallet.to_lowercase())];

        if let Some(ref token_id) = params.token_id {
            sql.push_str(" AND token_id = ?");
            args.push(Box::new(token_id.clone()));
        }
        if let Some(side) = params.side {
            sql.push_str(" AND side = ?");
            args.push(Box::new(side.to_string()));
        }

        sql.push_str(" ORDER BY created_at DESC");

        if let Some(limit) = params.limit {
            sql.push_str(&format!(" LIMIT {}", limit));
        }
        if let Some(offset) = params.offset {
            sql.push_str(&format!(" OFFSET {}", offset));
        }

        let params_refs: Vec<&dyn rusqlite::types::ToSql> = args.iter().map(|b| &**b).collect();
        let mut stmt = self.conn.prepare(&sql).map_err(Error::Sqlite)?;
        let rows = stmt.query_map(params_refs.as_slice(), |row| {
            Ok(OrderHistoryRow {
                id: row.get("id")?,
                wallet_address: row.get("wallet_address")?,
                order_id: row.get("order_id")?,
                token_id: row.get("token_id")?,
                side: row.get("side")?,
                price: row.get("price")?,
                size: row.get("size")?,
                order_type: row.get("order_type")?,
                status: row.get("status")?,
                error_msg: row.get("error_msg")?,
                response_json: row.get("response_json")?,
                created_at: row.get("created_at")?,
                fee_amount_raw: row.get("fee_amount_raw").unwrap_or(None),
                escrow_order_id: row.get("escrow_order_id").unwrap_or(None),
                fee_escrow_tx_hash: row.get("fee_escrow_tx_hash").unwrap_or(None),
            })
        }).map_err(Error::Sqlite)?;

        let mut result = Vec::new();
        for row in rows {
            result.push(row.map_err(Error::Sqlite)?);
        }
        Ok(result)
    }

    pub fn close(self) {
        drop(self.conn);
    }
}

/// Insert data for a new order (without the auto-generated id).
pub struct OrderHistoryInsert {
    pub order_id: Option<String>,
    pub token_id: String,
    pub side: String,
    pub price: f64,
    pub size: f64,
    pub order_type: String,
    pub status: String,
    pub error_msg: Option<String>,
    pub response_json: Option<String>,
    pub created_at: f64,
    pub fee_amount_raw: Option<String>,
    pub escrow_order_id: Option<String>,
    pub fee_escrow_tx_hash: Option<String>,
}