Skip to main content

sandbox_quant/
order_store.rs

1use anyhow::Result;
2use chrono::TimeZone;
3use rusqlite::{params, Connection};
4use std::collections::HashMap;
5
6use crate::binance::types::{BinanceAllOrder, BinanceMyTrade};
7
8#[derive(Debug, Clone)]
9pub struct PersistedTrade {
10    pub trade: BinanceMyTrade,
11    pub source: String,
12}
13
14#[derive(Debug, Clone)]
15pub struct DailyRealizedReturn {
16    pub symbol: String,
17    pub date: String,
18    pub realized_return_pct: f64,
19}
20
21#[derive(Debug, Clone)]
22pub struct StrategyScopedStats {
23    pub trade_count: u32,
24    pub win_count: u32,
25    pub lose_count: u32,
26    pub realized_pnl: f64,
27}
28
29fn ensure_trade_schema(conn: &Connection) -> Result<()> {
30    for ddl in [
31        "ALTER TABLE order_history_trades ADD COLUMN commission REAL NOT NULL DEFAULT 0.0",
32        "ALTER TABLE order_history_trades ADD COLUMN commission_asset TEXT NOT NULL DEFAULT ''",
33        "ALTER TABLE order_history_trades ADD COLUMN realized_pnl REAL NOT NULL DEFAULT 0.0",
34    ] {
35        if let Err(e) = conn.execute(ddl, []) {
36            let msg = e.to_string();
37            if !msg.contains("duplicate column name") {
38                return Err(e.into());
39            }
40        }
41    }
42    Ok(())
43}
44
45fn ensure_strategy_stats_schema(conn: &Connection) -> Result<()> {
46    conn.execute_batch(
47        r#"
48        CREATE TABLE IF NOT EXISTS strategy_symbol_stats (
49            symbol TEXT NOT NULL,
50            source TEXT NOT NULL,
51            trade_count INTEGER NOT NULL,
52            win_count INTEGER NOT NULL,
53            lose_count INTEGER NOT NULL,
54            realized_pnl REAL NOT NULL,
55            updated_at_ms INTEGER NOT NULL,
56            PRIMARY KEY(symbol, source)
57        );
58        "#,
59    )?;
60    Ok(())
61}
62
63pub fn persist_strategy_symbol_stats(
64    symbol: &str,
65    stats: &HashMap<String, StrategyScopedStats>,
66) -> Result<()> {
67    std::fs::create_dir_all("data")?;
68    let mut conn = Connection::open("data/strategy_stats.sqlite")?;
69    ensure_strategy_stats_schema(&conn)?;
70    let tx = conn.transaction()?;
71    let now_ms = chrono::Utc::now().timestamp_millis();
72
73    for (source, row) in stats {
74        tx.execute(
75            r#"
76            INSERT INTO strategy_symbol_stats (
77                symbol, source, trade_count, win_count, lose_count, realized_pnl, updated_at_ms
78            ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)
79            ON CONFLICT(symbol, source) DO UPDATE SET
80                trade_count = excluded.trade_count,
81                win_count = excluded.win_count,
82                lose_count = excluded.lose_count,
83                realized_pnl = excluded.realized_pnl,
84                updated_at_ms = excluded.updated_at_ms
85            "#,
86            params![
87                symbol,
88                source,
89                row.trade_count as i64,
90                row.win_count as i64,
91                row.lose_count as i64,
92                row.realized_pnl,
93                now_ms,
94            ],
95        )?;
96    }
97    tx.commit()?;
98    Ok(())
99}
100
101pub fn load_strategy_symbol_stats(symbol: &str) -> Result<HashMap<String, StrategyScopedStats>> {
102    std::fs::create_dir_all("data")?;
103    let conn = Connection::open("data/strategy_stats.sqlite")?;
104    ensure_strategy_stats_schema(&conn)?;
105    let mut stmt = conn.prepare(
106        r#"
107        SELECT source, trade_count, win_count, lose_count, realized_pnl
108        FROM strategy_symbol_stats
109        WHERE symbol = ?1
110        "#,
111    )?;
112
113    let rows = stmt.query_map([symbol], |row| {
114        Ok((
115            row.get::<_, String>(0)?,
116            StrategyScopedStats {
117                trade_count: row.get::<_, i64>(1)?.max(0) as u32,
118                win_count: row.get::<_, i64>(2)?.max(0) as u32,
119                lose_count: row.get::<_, i64>(3)?.max(0) as u32,
120                realized_pnl: row.get::<_, f64>(4)?,
121            },
122        ))
123    })?;
124
125    let mut out = HashMap::new();
126    for row in rows {
127        let (source, stats) = row?;
128        out.insert(source, stats);
129    }
130    Ok(out)
131}
132
133#[derive(Debug, Clone, Copy, PartialEq, Eq)]
134pub enum HistoryBucket {
135    Day,
136    Hour,
137    Month,
138}
139
140pub fn persist_order_snapshot(
141    symbol: &str,
142    orders: &[BinanceAllOrder],
143    trades: &[BinanceMyTrade],
144) -> Result<()> {
145    std::fs::create_dir_all("data")?;
146    let mut conn = Connection::open("data/order_history.sqlite")?;
147    conn.execute_batch(
148        r#"
149        CREATE TABLE IF NOT EXISTS order_history_orders (
150            symbol TEXT NOT NULL,
151            order_id INTEGER NOT NULL,
152            client_order_id TEXT NOT NULL,
153            status TEXT NOT NULL,
154            side TEXT NOT NULL,
155            orig_qty REAL NOT NULL,
156            executed_qty REAL NOT NULL,
157            avg_price REAL NOT NULL,
158            event_time_ms INTEGER NOT NULL,
159            source TEXT NOT NULL,
160            updated_at_ms INTEGER NOT NULL,
161            PRIMARY KEY(symbol, order_id)
162        );
163
164        CREATE TABLE IF NOT EXISTS order_history_trades (
165            symbol TEXT NOT NULL,
166            trade_id INTEGER NOT NULL,
167            order_id INTEGER NOT NULL,
168            side TEXT NOT NULL,
169            qty REAL NOT NULL,
170            price REAL NOT NULL,
171            commission REAL NOT NULL DEFAULT 0.0,
172            commission_asset TEXT NOT NULL DEFAULT '',
173            event_time_ms INTEGER NOT NULL,
174            realized_pnl REAL NOT NULL DEFAULT 0.0,
175            source TEXT NOT NULL,
176            updated_at_ms INTEGER NOT NULL,
177            PRIMARY KEY(symbol, trade_id)
178        );
179        "#,
180    )?;
181    ensure_trade_schema(&conn)?;
182
183    let now_ms = chrono::Utc::now().timestamp_millis();
184    let tx = conn.transaction()?;
185    let mut source_by_order_id = std::collections::HashMap::new();
186
187    for o in orders {
188        let avg_price = if o.executed_qty > 0.0 {
189            o.cummulative_quote_qty / o.executed_qty
190        } else {
191            o.price
192        };
193        tx.execute(
194            r#"
195            INSERT INTO order_history_orders (
196                symbol, order_id, client_order_id, status, side,
197                orig_qty, executed_qty, avg_price, event_time_ms, source, updated_at_ms
198            ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)
199            ON CONFLICT(symbol, order_id) DO UPDATE SET
200                client_order_id = excluded.client_order_id,
201                status = excluded.status,
202                side = excluded.side,
203                orig_qty = excluded.orig_qty,
204                executed_qty = excluded.executed_qty,
205                avg_price = excluded.avg_price,
206                event_time_ms = excluded.event_time_ms,
207                source = excluded.source,
208                updated_at_ms = excluded.updated_at_ms
209            "#,
210            params![
211                symbol,
212                o.order_id as i64,
213                o.client_order_id,
214                o.status,
215                o.side,
216                o.orig_qty,
217                o.executed_qty,
218                avg_price,
219                o.update_time.max(o.time) as i64,
220                source_label_from_client_order_id(&o.client_order_id),
221                now_ms,
222            ],
223        )?;
224        source_by_order_id.insert(
225            o.order_id,
226            source_label_from_client_order_id(&o.client_order_id),
227        );
228    }
229
230    for t in trades {
231        tx.execute(
232            r#"
233            INSERT INTO order_history_trades (
234                symbol, trade_id, order_id, side, qty, price, commission, commission_asset, event_time_ms, realized_pnl, source, updated_at_ms
235            ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)
236            ON CONFLICT(symbol, trade_id) DO UPDATE SET
237                order_id = excluded.order_id,
238                side = excluded.side,
239                qty = excluded.qty,
240                price = excluded.price,
241                commission = excluded.commission,
242                commission_asset = excluded.commission_asset,
243                event_time_ms = excluded.event_time_ms,
244                realized_pnl = excluded.realized_pnl,
245                source = excluded.source,
246                updated_at_ms = excluded.updated_at_ms
247            "#,
248            params![
249                symbol,
250                t.id as i64,
251                t.order_id as i64,
252                if t.is_buyer { "BUY" } else { "SELL" },
253                t.qty,
254                t.price,
255                t.commission,
256                t.commission_asset,
257                t.time as i64,
258                t.realized_pnl,
259                source_by_order_id
260                    .get(&t.order_id)
261                    .map(String::as_str)
262                    .unwrap_or("UNKNOWN"),
263                now_ms,
264            ],
265        )?;
266    }
267
268    tx.commit()?;
269    Ok(())
270}
271
272fn source_label_from_client_order_id(client_order_id: &str) -> String {
273    if client_order_id.contains("-mnl-") {
274        "MANUAL".to_string()
275    } else if client_order_id.contains("-cfg-") {
276        "MA(Config)".to_string()
277    } else if client_order_id.contains("-fst-") {
278        "MA(Fast 5/20)".to_string()
279    } else if client_order_id.contains("-slw-") {
280        "MA(Slow 20/60)".to_string()
281    } else if let Some(source_tag) = parse_source_tag_from_client_order_id(client_order_id) {
282        source_tag.to_ascii_lowercase()
283    } else {
284        "UNKNOWN".to_string()
285    }
286}
287
288fn parse_source_tag_from_client_order_id(client_order_id: &str) -> Option<&str> {
289    let body = client_order_id.strip_prefix("sq-")?;
290    let (source_tag, _) = body.split_once('-')?;
291    if source_tag.is_empty() {
292        None
293    } else {
294        Some(source_tag)
295    }
296}
297
298pub fn load_persisted_trades(symbol: &str) -> Result<Vec<PersistedTrade>> {
299    std::fs::create_dir_all("data")?;
300    let conn = Connection::open("data/order_history.sqlite")?;
301    ensure_trade_schema(&conn)?;
302    let mut stmt = conn.prepare(
303        r#"
304        SELECT trade_id, order_id, side, qty, price, commission, commission_asset, event_time_ms, realized_pnl, source
305        FROM order_history_trades
306        WHERE symbol = ?1
307        ORDER BY event_time_ms ASC, trade_id ASC
308        "#,
309    )?;
310
311    let rows = stmt.query_map([symbol], |row| {
312        let side: String = row.get(2)?;
313        let is_buyer = side.eq_ignore_ascii_case("BUY");
314        let trade = BinanceMyTrade {
315            symbol: symbol.to_string(),
316            id: row.get::<_, i64>(0)? as u64,
317            order_id: row.get::<_, i64>(1)? as u64,
318            price: row.get(4)?,
319            qty: row.get(3)?,
320            commission: row.get(5)?,
321            commission_asset: row.get(6)?,
322            time: row.get::<_, i64>(7)? as u64,
323            realized_pnl: row.get(8)?,
324            is_buyer,
325            is_maker: false,
326        };
327        Ok(PersistedTrade {
328            trade,
329            source: row.get(9)?,
330        })
331    })?;
332
333    let mut trades = Vec::new();
334    for row in rows {
335        trades.push(row?);
336    }
337    Ok(trades)
338}
339
340pub fn load_last_trade_id(symbol: &str) -> Result<Option<u64>> {
341    std::fs::create_dir_all("data")?;
342    let conn = Connection::open("data/order_history.sqlite")?;
343    let mut stmt = conn.prepare(
344        r#"
345        SELECT MAX(trade_id)
346        FROM order_history_trades
347        WHERE symbol = ?1
348        "#,
349    )?;
350    let max_id = stmt.query_row([symbol], |row| row.get::<_, Option<i64>>(0))?;
351    Ok(max_id.map(|v| v as u64))
352}
353
354pub fn load_trade_count(symbol: &str) -> Result<usize> {
355    std::fs::create_dir_all("data")?;
356    let conn = Connection::open("data/order_history.sqlite")?;
357    let mut stmt = conn.prepare(
358        r#"
359        SELECT COUNT(*)
360        FROM order_history_trades
361        WHERE symbol = ?1
362        "#,
363    )?;
364    let count = stmt.query_row([symbol], |row| row.get::<_, i64>(0))?;
365    Ok(count.max(0) as usize)
366}
367
368#[derive(Clone, Copy, Default)]
369struct LongPos {
370    qty: f64,
371    cost_quote: f64,
372}
373
374#[derive(Clone, Copy, Default)]
375struct DailyBucket {
376    pnl: f64,
377    basis: f64,
378}
379
380pub fn load_realized_returns_by_bucket(
381    bucket: HistoryBucket,
382    limit: usize,
383) -> Result<Vec<DailyRealizedReturn>> {
384    std::fs::create_dir_all("data")?;
385    let conn = Connection::open("data/order_history.sqlite")?;
386    ensure_trade_schema(&conn)?;
387    let mut stmt = conn.prepare(
388        r#"
389        SELECT symbol, trade_id, order_id, side, qty, price, commission, commission_asset, event_time_ms, realized_pnl
390        FROM order_history_trades
391        ORDER BY symbol ASC, event_time_ms ASC, trade_id ASC
392        "#,
393    )?;
394
395    let rows = stmt.query_map([], |row| {
396        Ok((
397            row.get::<_, String>(0)?,
398            row.get::<_, i64>(1)? as u64,
399            row.get::<_, i64>(2)? as u64,
400            row.get::<_, String>(3)?,
401            row.get::<_, f64>(4)?,
402            row.get::<_, f64>(5)?,
403            row.get::<_, f64>(6)?,
404            row.get::<_, String>(7)?,
405            row.get::<_, i64>(8)? as u64,
406            row.get::<_, f64>(9)?,
407        ))
408    })?;
409
410    let mut pos_by_symbol: HashMap<String, LongPos> = HashMap::new();
411    let mut daily_by_key: HashMap<(String, String), DailyBucket> = HashMap::new();
412
413    for row in rows {
414        let (
415            symbol,
416            _trade_id,
417            _order_id,
418            side,
419            qty_raw,
420            price,
421            commission,
422            commission_asset,
423            event_time_ms,
424            realized_pnl,
425        ) = row?;
426        let qty = qty_raw.max(0.0);
427        if qty <= f64::EPSILON {
428            continue;
429        }
430
431        let (base_asset, quote_asset) = split_symbol_assets(&symbol);
432        let fee_is_base =
433            !base_asset.is_empty() && commission_asset.eq_ignore_ascii_case(&base_asset);
434        let fee_is_quote =
435            !quote_asset.is_empty() && commission_asset.eq_ignore_ascii_case(&quote_asset);
436        let pos = pos_by_symbol.entry(symbol.clone()).or_default();
437
438        let date = chrono::Utc
439            .timestamp_millis_opt(event_time_ms as i64)
440            .single()
441            .map(|dt| dt.with_timezone(&chrono::Local))
442            .map(|dt| match bucket {
443                HistoryBucket::Day => dt.format("%Y-%m-%d").to_string(),
444                HistoryBucket::Hour => dt.format("%Y-%m-%d %H:00").to_string(),
445                HistoryBucket::Month => dt.format("%Y-%m").to_string(),
446            })
447            .unwrap_or_else(|| "unknown".to_string());
448
449        // Futures: realized_pnl is provided directly by exchange, do not apply spot long inventory logic.
450        if symbol.ends_with("#FUT") {
451            let basis = (qty * price).abs();
452            let bucket = daily_by_key.entry((symbol.clone(), date)).or_default();
453            bucket.pnl += realized_pnl;
454            bucket.basis += basis;
455            continue;
456        }
457
458        if side.eq_ignore_ascii_case("BUY") {
459            let net_qty = (qty
460                - if fee_is_base {
461                    commission.max(0.0)
462                } else {
463                    0.0
464                })
465            .max(0.0);
466            if net_qty <= f64::EPSILON {
467                continue;
468            }
469            let fee_quote = if fee_is_quote {
470                commission.max(0.0)
471            } else {
472                0.0
473            };
474            pos.qty += net_qty;
475            pos.cost_quote += qty * price + fee_quote;
476            continue;
477        }
478
479        if pos.qty <= f64::EPSILON {
480            continue;
481        }
482        let close_qty = qty.min(pos.qty);
483        if close_qty <= f64::EPSILON {
484            continue;
485        }
486        let avg_cost = pos.cost_quote / pos.qty.max(f64::EPSILON);
487        let fee_quote_total = if fee_is_quote {
488            commission.max(0.0)
489        } else if fee_is_base {
490            commission.max(0.0) * price
491        } else {
492            0.0
493        };
494        let fee_quote = fee_quote_total * (close_qty / qty.max(f64::EPSILON));
495        let realized_pnl = (close_qty * price - fee_quote) - (avg_cost * close_qty);
496        let realized_basis = avg_cost * close_qty;
497
498        let bucket = daily_by_key.entry((symbol.clone(), date)).or_default();
499        bucket.pnl += realized_pnl;
500        bucket.basis += realized_basis;
501
502        pos.qty -= close_qty;
503        pos.cost_quote -= realized_basis;
504        if pos.qty <= f64::EPSILON {
505            pos.qty = 0.0;
506            pos.cost_quote = 0.0;
507        }
508    }
509
510    let mut out: Vec<DailyRealizedReturn> = daily_by_key
511        .into_iter()
512        .map(|((symbol, date), b)| DailyRealizedReturn {
513            symbol,
514            date,
515            realized_return_pct: if b.basis.abs() > f64::EPSILON {
516                (b.pnl / b.basis) * 100.0
517            } else {
518                0.0
519            },
520        })
521        .collect();
522
523    out.sort_by(|a, b| b.date.cmp(&a.date).then_with(|| a.symbol.cmp(&b.symbol)));
524    if out.len() > limit {
525        out.truncate(limit);
526    }
527    Ok(out)
528}
529
530pub fn load_daily_realized_returns(limit: usize) -> Result<Vec<DailyRealizedReturn>> {
531    load_realized_returns_by_bucket(HistoryBucket::Day, limit)
532}
533
534fn split_symbol_assets(symbol: &str) -> (String, String) {
535    const QUOTE_SUFFIXES: [&str; 10] = [
536        "USDT", "USDC", "FDUSD", "BUSD", "TUSD", "TRY", "EUR", "BTC", "ETH", "BNB",
537    ];
538    for q in QUOTE_SUFFIXES {
539        if let Some(base) = symbol.strip_suffix(q) {
540            if !base.is_empty() {
541                return (base.to_string(), q.to_string());
542            }
543        }
544    }
545    (symbol.to_string(), String::new())
546}