Skip to main content

sandbox_quant/
order_store.rs

1use anyhow::Result;
2use chrono::TimeZone;
3use rusqlite::{params, Connection};
4use std::collections::HashMap;
5
6use crate::binance::types::{BinanceAllOrder, BinanceMyTrade};
7
/// A trade row read back from the `order_history_trades` table together with
/// the strategy label that was stored next to it.
#[derive(Debug, Clone)]
pub struct PersistedTrade {
    /// Trade fields rebuilt from the stored columns.
    pub trade: BinanceMyTrade,
    /// Value of the row's `source` column (for example `"MANUAL"` or `"UNKNOWN"`).
    pub source: String,
}
13
/// Realized return of one symbol within one time bucket, as produced by
/// `load_realized_returns_by_bucket`.
#[derive(Debug, Clone)]
pub struct DailyRealizedReturn {
    /// Symbol exactly as stored in the trade history table.
    pub symbol: String,
    /// Bucket label formatted in local time (`%Y-%m-%d`, `%Y-%m-%d %H:00` or
    /// `%Y-%m` depending on the requested bucket), or `"unknown"` when the
    /// stored timestamp cannot be converted.
    pub date: String,
    /// Bucket PnL divided by bucket cost basis, times 100; `0.0` when the
    /// basis is effectively zero.
    pub realized_return_pct: f64,
}
20
/// Aggregated counters for one `(symbol, source)` pair, mirrored in the
/// `strategy_symbol_stats` table.
#[derive(Debug, Clone)]
pub struct StrategyScopedStats {
    /// Stored in the `trade_count` column.
    pub trade_count: u32,
    /// Stored in the `win_count` column.
    pub win_count: u32,
    /// Stored in the `lose_count` column.
    pub lose_count: u32,
    /// Stored in the `realized_pnl` column.
    pub realized_pnl: f64,
}
28
29fn ensure_trade_schema(conn: &Connection) -> Result<()> {
30    for ddl in [
31        "ALTER TABLE order_history_trades ADD COLUMN commission REAL NOT NULL DEFAULT 0.0",
32        "ALTER TABLE order_history_trades ADD COLUMN commission_asset TEXT NOT NULL DEFAULT ''",
33        "ALTER TABLE order_history_trades ADD COLUMN realized_pnl REAL NOT NULL DEFAULT 0.0",
34    ] {
35        if let Err(e) = conn.execute(ddl, []) {
36            let msg = e.to_string();
37            if !msg.contains("duplicate column name") {
38                return Err(e.into());
39            }
40        }
41    }
42    Ok(())
43}
44
45fn ensure_strategy_stats_schema(conn: &Connection) -> Result<()> {
46    conn.execute_batch(
47        r#"
48        CREATE TABLE IF NOT EXISTS strategy_symbol_stats (
49            symbol TEXT NOT NULL,
50            source TEXT NOT NULL,
51            trade_count INTEGER NOT NULL,
52            win_count INTEGER NOT NULL,
53            lose_count INTEGER NOT NULL,
54            realized_pnl REAL NOT NULL,
55            updated_at_ms INTEGER NOT NULL,
56            PRIMARY KEY(symbol, source)
57        );
58        "#,
59    )?;
60    Ok(())
61}
62
63pub fn persist_strategy_symbol_stats(
64    symbol: &str,
65    stats: &HashMap<String, StrategyScopedStats>,
66) -> Result<()> {
67    std::fs::create_dir_all("data")?;
68    let mut conn = Connection::open("data/strategy_stats.sqlite")?;
69    ensure_strategy_stats_schema(&conn)?;
70    let tx = conn.transaction()?;
71    let now_ms = chrono::Utc::now().timestamp_millis();
72
73    for (source, row) in stats {
74        tx.execute(
75            r#"
76            INSERT INTO strategy_symbol_stats (
77                symbol, source, trade_count, win_count, lose_count, realized_pnl, updated_at_ms
78            ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)
79            ON CONFLICT(symbol, source) DO UPDATE SET
80                trade_count = excluded.trade_count,
81                win_count = excluded.win_count,
82                lose_count = excluded.lose_count,
83                realized_pnl = excluded.realized_pnl,
84                updated_at_ms = excluded.updated_at_ms
85            "#,
86            params![
87                symbol,
88                source,
89                row.trade_count as i64,
90                row.win_count as i64,
91                row.lose_count as i64,
92                row.realized_pnl,
93                now_ms,
94            ],
95        )?;
96    }
97    tx.commit()?;
98    Ok(())
99}
100
101pub fn load_strategy_symbol_stats(symbol: &str) -> Result<HashMap<String, StrategyScopedStats>> {
102    std::fs::create_dir_all("data")?;
103    let conn = Connection::open("data/strategy_stats.sqlite")?;
104    ensure_strategy_stats_schema(&conn)?;
105    let mut stmt = conn.prepare(
106        r#"
107        SELECT source, trade_count, win_count, lose_count, realized_pnl
108        FROM strategy_symbol_stats
109        WHERE symbol = ?1
110        "#,
111    )?;
112
113    let rows = stmt.query_map([symbol], |row| {
114        Ok((
115            row.get::<_, String>(0)?,
116            StrategyScopedStats {
117                trade_count: row.get::<_, i64>(1)?.max(0) as u32,
118                win_count: row.get::<_, i64>(2)?.max(0) as u32,
119                lose_count: row.get::<_, i64>(3)?.max(0) as u32,
120                realized_pnl: row.get::<_, f64>(4)?,
121            },
122        ))
123    })?;
124
125    let mut out = HashMap::new();
126    for row in rows {
127        let (source, stats) = row?;
128        out.insert(source, stats);
129    }
130    Ok(out)
131}
132
/// Time granularity used to group realized returns.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HistoryBucket {
    /// One bucket per local calendar day (label `%Y-%m-%d`).
    Day,
    /// One bucket per local hour (label `%Y-%m-%d %H:00`).
    Hour,
    /// One bucket per local calendar month (label `%Y-%m`).
    Month,
}
139
140pub fn persist_order_snapshot(
141    symbol: &str,
142    orders: &[BinanceAllOrder],
143    trades: &[BinanceMyTrade],
144) -> Result<()> {
145    std::fs::create_dir_all("data")?;
146    let mut conn = Connection::open("data/order_history.sqlite")?;
147    conn.execute_batch(
148        r#"
149        CREATE TABLE IF NOT EXISTS order_history_orders (
150            symbol TEXT NOT NULL,
151            order_id INTEGER NOT NULL,
152            client_order_id TEXT NOT NULL,
153            status TEXT NOT NULL,
154            side TEXT NOT NULL,
155            orig_qty REAL NOT NULL,
156            executed_qty REAL NOT NULL,
157            avg_price REAL NOT NULL,
158            event_time_ms INTEGER NOT NULL,
159            source TEXT NOT NULL,
160            updated_at_ms INTEGER NOT NULL,
161            PRIMARY KEY(symbol, order_id)
162        );
163
164        CREATE TABLE IF NOT EXISTS order_history_trades (
165            symbol TEXT NOT NULL,
166            trade_id INTEGER NOT NULL,
167            order_id INTEGER NOT NULL,
168            side TEXT NOT NULL,
169            qty REAL NOT NULL,
170            price REAL NOT NULL,
171            commission REAL NOT NULL DEFAULT 0.0,
172            commission_asset TEXT NOT NULL DEFAULT '',
173            event_time_ms INTEGER NOT NULL,
174            realized_pnl REAL NOT NULL DEFAULT 0.0,
175            source TEXT NOT NULL,
176            updated_at_ms INTEGER NOT NULL,
177            PRIMARY KEY(symbol, trade_id)
178        );
179        "#,
180    )?;
181    ensure_trade_schema(&conn)?;
182
183    let now_ms = chrono::Utc::now().timestamp_millis();
184    let tx = conn.transaction()?;
185    let mut source_by_order_id = std::collections::HashMap::new();
186
187    for o in orders {
188        let avg_price = if o.executed_qty > 0.0 {
189            o.cummulative_quote_qty / o.executed_qty
190        } else {
191            o.price
192        };
193        tx.execute(
194            r#"
195            INSERT INTO order_history_orders (
196                symbol, order_id, client_order_id, status, side,
197                orig_qty, executed_qty, avg_price, event_time_ms, source, updated_at_ms
198            ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)
199            ON CONFLICT(symbol, order_id) DO UPDATE SET
200                client_order_id = excluded.client_order_id,
201                status = excluded.status,
202                side = excluded.side,
203                orig_qty = excluded.orig_qty,
204                executed_qty = excluded.executed_qty,
205                avg_price = excluded.avg_price,
206                event_time_ms = excluded.event_time_ms,
207                source = excluded.source,
208                updated_at_ms = excluded.updated_at_ms
209            "#,
210            params![
211                symbol,
212                o.order_id as i64,
213                o.client_order_id,
214                o.status,
215                o.side,
216                o.orig_qty,
217                o.executed_qty,
218                avg_price,
219                o.update_time.max(o.time) as i64,
220                source_label_from_client_order_id(&o.client_order_id),
221                now_ms,
222            ],
223        )?;
224        source_by_order_id.insert(
225            o.order_id,
226            source_label_from_client_order_id(&o.client_order_id).to_string(),
227        );
228    }
229
230    for t in trades {
231        tx.execute(
232            r#"
233            INSERT INTO order_history_trades (
234                symbol, trade_id, order_id, side, qty, price, commission, commission_asset, event_time_ms, realized_pnl, source, updated_at_ms
235            ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)
236            ON CONFLICT(symbol, trade_id) DO UPDATE SET
237                order_id = excluded.order_id,
238                side = excluded.side,
239                qty = excluded.qty,
240                price = excluded.price,
241                commission = excluded.commission,
242                commission_asset = excluded.commission_asset,
243                event_time_ms = excluded.event_time_ms,
244                realized_pnl = excluded.realized_pnl,
245                source = excluded.source,
246                updated_at_ms = excluded.updated_at_ms
247            "#,
248            params![
249                symbol,
250                t.id as i64,
251                t.order_id as i64,
252                if t.is_buyer { "BUY" } else { "SELL" },
253                t.qty,
254                t.price,
255                t.commission,
256                t.commission_asset,
257                t.time as i64,
258                t.realized_pnl,
259                source_by_order_id
260                    .get(&t.order_id)
261                    .map(String::as_str)
262                    .unwrap_or("UNKNOWN"),
263                now_ms,
264            ],
265        )?;
266    }
267
268    tx.commit()?;
269    Ok(())
270}
271
/// Maps a client order id to the label of the strategy that placed it.
///
/// The id is searched for the markers `-mnl-`, `-cfg-`, `-fst-` and `-slw-`
/// in that priority order; the first marker found decides the label. Ids
/// without any marker yield `"UNKNOWN"`.
fn source_label_from_client_order_id(client_order_id: &str) -> &'static str {
    // (marker, label) pairs, highest priority first.
    const MARKER_LABELS: [(&str, &str); 4] = [
        ("-mnl-", "MANUAL"),
        ("-cfg-", "MA(Config)"),
        ("-fst-", "MA(Fast 5/20)"),
        ("-slw-", "MA(Slow 20/60)"),
    ];

    MARKER_LABELS
        .iter()
        .find(|(marker, _)| client_order_id.contains(*marker))
        .map_or("UNKNOWN", |(_, label)| *label)
}
285
286pub fn load_persisted_trades(symbol: &str) -> Result<Vec<PersistedTrade>> {
287    std::fs::create_dir_all("data")?;
288    let conn = Connection::open("data/order_history.sqlite")?;
289    ensure_trade_schema(&conn)?;
290    let mut stmt = conn.prepare(
291        r#"
292        SELECT trade_id, order_id, side, qty, price, commission, commission_asset, event_time_ms, realized_pnl, source
293        FROM order_history_trades
294        WHERE symbol = ?1
295        ORDER BY event_time_ms ASC, trade_id ASC
296        "#,
297    )?;
298
299    let rows = stmt.query_map([symbol], |row| {
300        let side: String = row.get(2)?;
301        let is_buyer = side.eq_ignore_ascii_case("BUY");
302        let trade = BinanceMyTrade {
303            symbol: symbol.to_string(),
304            id: row.get::<_, i64>(0)? as u64,
305            order_id: row.get::<_, i64>(1)? as u64,
306            price: row.get(4)?,
307            qty: row.get(3)?,
308            commission: row.get(5)?,
309            commission_asset: row.get(6)?,
310            time: row.get::<_, i64>(7)? as u64,
311            realized_pnl: row.get(8)?,
312            is_buyer,
313            is_maker: false,
314        };
315        Ok(PersistedTrade {
316            trade,
317            source: row.get(9)?,
318        })
319    })?;
320
321    let mut trades = Vec::new();
322    for row in rows {
323        trades.push(row?);
324    }
325    Ok(trades)
326}
327
328pub fn load_last_trade_id(symbol: &str) -> Result<Option<u64>> {
329    std::fs::create_dir_all("data")?;
330    let conn = Connection::open("data/order_history.sqlite")?;
331    let mut stmt = conn.prepare(
332        r#"
333        SELECT MAX(trade_id)
334        FROM order_history_trades
335        WHERE symbol = ?1
336        "#,
337    )?;
338    let max_id = stmt.query_row([symbol], |row| row.get::<_, Option<i64>>(0))?;
339    Ok(max_id.map(|v| v as u64))
340}
341
342pub fn load_trade_count(symbol: &str) -> Result<usize> {
343    std::fs::create_dir_all("data")?;
344    let conn = Connection::open("data/order_history.sqlite")?;
345    let mut stmt = conn.prepare(
346        r#"
347        SELECT COUNT(*)
348        FROM order_history_trades
349        WHERE symbol = ?1
350        "#,
351    )?;
352    let count = stmt.query_row([symbol], |row| row.get::<_, i64>(0))?;
353    Ok(count.max(0) as usize)
354}
355
/// Running long inventory of one symbol while trades are replayed.
#[derive(Clone, Copy, Default)]
struct LongPos {
    /// Quantity currently held (grows on buys, shrinks on sells).
    qty: f64,
    /// Total cost of the held quantity, in quote currency.
    cost_quote: f64,
}
361
/// Accumulator for one `(symbol, bucket label)` pair; despite the name it is
/// used for every bucket granularity.
#[derive(Clone, Copy, Default)]
struct DailyBucket {
    /// Sum of realized PnL booked into this bucket.
    pnl: f64,
    /// Sum of the cost basis (or notional, for futures rows) behind that PnL.
    basis: f64,
}
367
368pub fn load_realized_returns_by_bucket(
369    bucket: HistoryBucket,
370    limit: usize,
371) -> Result<Vec<DailyRealizedReturn>> {
372    std::fs::create_dir_all("data")?;
373    let conn = Connection::open("data/order_history.sqlite")?;
374    ensure_trade_schema(&conn)?;
375    let mut stmt = conn.prepare(
376        r#"
377        SELECT symbol, trade_id, order_id, side, qty, price, commission, commission_asset, event_time_ms, realized_pnl
378        FROM order_history_trades
379        ORDER BY symbol ASC, event_time_ms ASC, trade_id ASC
380        "#,
381    )?;
382
383    let rows = stmt.query_map([], |row| {
384        Ok((
385            row.get::<_, String>(0)?,
386            row.get::<_, i64>(1)? as u64,
387            row.get::<_, i64>(2)? as u64,
388            row.get::<_, String>(3)?,
389            row.get::<_, f64>(4)?,
390            row.get::<_, f64>(5)?,
391            row.get::<_, f64>(6)?,
392            row.get::<_, String>(7)?,
393            row.get::<_, i64>(8)? as u64,
394            row.get::<_, f64>(9)?,
395        ))
396    })?;
397
398    let mut pos_by_symbol: HashMap<String, LongPos> = HashMap::new();
399    let mut daily_by_key: HashMap<(String, String), DailyBucket> = HashMap::new();
400
401    for row in rows {
402        let (
403            symbol,
404            _trade_id,
405            _order_id,
406            side,
407            qty_raw,
408            price,
409            commission,
410            commission_asset,
411            event_time_ms,
412            realized_pnl,
413        ) = row?;
414        let qty = qty_raw.max(0.0);
415        if qty <= f64::EPSILON {
416            continue;
417        }
418
419        let (base_asset, quote_asset) = split_symbol_assets(&symbol);
420        let fee_is_base =
421            !base_asset.is_empty() && commission_asset.eq_ignore_ascii_case(&base_asset);
422        let fee_is_quote =
423            !quote_asset.is_empty() && commission_asset.eq_ignore_ascii_case(&quote_asset);
424        let pos = pos_by_symbol.entry(symbol.clone()).or_default();
425
426        let date = chrono::Utc
427            .timestamp_millis_opt(event_time_ms as i64)
428            .single()
429            .map(|dt| dt.with_timezone(&chrono::Local))
430            .map(|dt| match bucket {
431                HistoryBucket::Day => dt.format("%Y-%m-%d").to_string(),
432                HistoryBucket::Hour => dt.format("%Y-%m-%d %H:00").to_string(),
433                HistoryBucket::Month => dt.format("%Y-%m").to_string(),
434            })
435            .unwrap_or_else(|| "unknown".to_string());
436
437        // Futures: realized_pnl is provided directly by exchange, do not apply spot long inventory logic.
438        if symbol.ends_with("#FUT") {
439            let basis = (qty * price).abs();
440            let bucket = daily_by_key.entry((symbol.clone(), date)).or_default();
441            bucket.pnl += realized_pnl;
442            bucket.basis += basis;
443            continue;
444        }
445
446        if side.eq_ignore_ascii_case("BUY") {
447            let net_qty = (qty
448                - if fee_is_base {
449                    commission.max(0.0)
450                } else {
451                    0.0
452                })
453            .max(0.0);
454            if net_qty <= f64::EPSILON {
455                continue;
456            }
457            let fee_quote = if fee_is_quote {
458                commission.max(0.0)
459            } else {
460                0.0
461            };
462            pos.qty += net_qty;
463            pos.cost_quote += qty * price + fee_quote;
464            continue;
465        }
466
467        if pos.qty <= f64::EPSILON {
468            continue;
469        }
470        let close_qty = qty.min(pos.qty);
471        if close_qty <= f64::EPSILON {
472            continue;
473        }
474        let avg_cost = pos.cost_quote / pos.qty.max(f64::EPSILON);
475        let fee_quote_total = if fee_is_quote {
476            commission.max(0.0)
477        } else if fee_is_base {
478            commission.max(0.0) * price
479        } else {
480            0.0
481        };
482        let fee_quote = fee_quote_total * (close_qty / qty.max(f64::EPSILON));
483        let realized_pnl = (close_qty * price - fee_quote) - (avg_cost * close_qty);
484        let realized_basis = avg_cost * close_qty;
485
486        let bucket = daily_by_key.entry((symbol.clone(), date)).or_default();
487        bucket.pnl += realized_pnl;
488        bucket.basis += realized_basis;
489
490        pos.qty -= close_qty;
491        pos.cost_quote -= realized_basis;
492        if pos.qty <= f64::EPSILON {
493            pos.qty = 0.0;
494            pos.cost_quote = 0.0;
495        }
496    }
497
498    let mut out: Vec<DailyRealizedReturn> = daily_by_key
499        .into_iter()
500        .map(|((symbol, date), b)| DailyRealizedReturn {
501            symbol,
502            date,
503            realized_return_pct: if b.basis.abs() > f64::EPSILON {
504                (b.pnl / b.basis) * 100.0
505            } else {
506                0.0
507            },
508        })
509        .collect();
510
511    out.sort_by(|a, b| b.date.cmp(&a.date).then_with(|| a.symbol.cmp(&b.symbol)));
512    if out.len() > limit {
513        out.truncate(limit);
514    }
515    Ok(out)
516}
517
/// Convenience wrapper: calls `load_realized_returns_by_bucket` with
/// `HistoryBucket::Day` and forwards `limit` and the result unchanged.
pub fn load_daily_realized_returns(limit: usize) -> Result<Vec<DailyRealizedReturn>> {
    load_realized_returns_by_bucket(HistoryBucket::Day, limit)
}
521
/// Splits a trading symbol into `(base, quote)` by matching a known quote
/// suffix.
///
/// Suffixes are tried in the listed order and the first one that leaves a
/// non-empty base wins. Matching is case-sensitive. When nothing matches, the
/// whole symbol is returned as the base and the quote is empty.
fn split_symbol_assets(symbol: &str) -> (String, String) {
    const QUOTE_SUFFIXES: [&str; 10] = [
        "USDT", "USDC", "FDUSD", "BUSD", "TUSD", "TRY", "EUR", "BTC", "ETH", "BNB",
    ];

    QUOTE_SUFFIXES
        .iter()
        .find_map(|quote| {
            symbol
                .strip_suffix(*quote)
                .filter(|base| !base.is_empty())
                .map(|base| (base.to_string(), quote.to_string()))
        })
        .unwrap_or_else(|| (symbol.to_string(), String::new()))
}
534}