Skip to main content

sandbox_quant/
order_store.rs

1use anyhow::Result;
2use chrono::TimeZone;
3use rusqlite::{params, Connection};
4use std::collections::HashMap;
5
6use crate::binance::types::{BinanceAllOrder, BinanceMyTrade};
7
8#[derive(Debug, Clone)]
9pub struct PersistedTrade {
10    pub trade: BinanceMyTrade,
11    pub source: String,
12}
13
14#[derive(Debug, Clone)]
15pub struct DailyRealizedReturn {
16    pub symbol: String,
17    pub date: String,
18    pub realized_return_pct: f64,
19}
20
21fn ensure_trade_schema(conn: &Connection) -> Result<()> {
22    for ddl in [
23        "ALTER TABLE order_history_trades ADD COLUMN commission REAL NOT NULL DEFAULT 0.0",
24        "ALTER TABLE order_history_trades ADD COLUMN commission_asset TEXT NOT NULL DEFAULT ''",
25        "ALTER TABLE order_history_trades ADD COLUMN realized_pnl REAL NOT NULL DEFAULT 0.0",
26    ] {
27        if let Err(e) = conn.execute(ddl, []) {
28            let msg = e.to_string();
29            if !msg.contains("duplicate column name") {
30                return Err(e.into());
31            }
32        }
33    }
34    Ok(())
35}
36
37#[derive(Debug, Clone, Copy, PartialEq, Eq)]
38pub enum HistoryBucket {
39    Day,
40    Hour,
41    Month,
42}
43
44pub fn persist_order_snapshot(
45    symbol: &str,
46    orders: &[BinanceAllOrder],
47    trades: &[BinanceMyTrade],
48) -> Result<()> {
49    std::fs::create_dir_all("data")?;
50    let mut conn = Connection::open("data/order_history.sqlite")?;
51    conn.execute_batch(
52        r#"
53        CREATE TABLE IF NOT EXISTS order_history_orders (
54            symbol TEXT NOT NULL,
55            order_id INTEGER NOT NULL,
56            client_order_id TEXT NOT NULL,
57            status TEXT NOT NULL,
58            side TEXT NOT NULL,
59            orig_qty REAL NOT NULL,
60            executed_qty REAL NOT NULL,
61            avg_price REAL NOT NULL,
62            event_time_ms INTEGER NOT NULL,
63            source TEXT NOT NULL,
64            updated_at_ms INTEGER NOT NULL,
65            PRIMARY KEY(symbol, order_id)
66        );
67
68        CREATE TABLE IF NOT EXISTS order_history_trades (
69            symbol TEXT NOT NULL,
70            trade_id INTEGER NOT NULL,
71            order_id INTEGER NOT NULL,
72            side TEXT NOT NULL,
73            qty REAL NOT NULL,
74            price REAL NOT NULL,
75            commission REAL NOT NULL DEFAULT 0.0,
76            commission_asset TEXT NOT NULL DEFAULT '',
77            event_time_ms INTEGER NOT NULL,
78            realized_pnl REAL NOT NULL DEFAULT 0.0,
79            source TEXT NOT NULL,
80            updated_at_ms INTEGER NOT NULL,
81            PRIMARY KEY(symbol, trade_id)
82        );
83        "#,
84    )?;
85    ensure_trade_schema(&conn)?;
86
87    let now_ms = chrono::Utc::now().timestamp_millis();
88    let tx = conn.transaction()?;
89    let mut source_by_order_id = std::collections::HashMap::new();
90
91    for o in orders {
92        let avg_price = if o.executed_qty > 0.0 {
93            o.cummulative_quote_qty / o.executed_qty
94        } else {
95            o.price
96        };
97        tx.execute(
98            r#"
99            INSERT INTO order_history_orders (
100                symbol, order_id, client_order_id, status, side,
101                orig_qty, executed_qty, avg_price, event_time_ms, source, updated_at_ms
102            ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)
103            ON CONFLICT(symbol, order_id) DO UPDATE SET
104                client_order_id = excluded.client_order_id,
105                status = excluded.status,
106                side = excluded.side,
107                orig_qty = excluded.orig_qty,
108                executed_qty = excluded.executed_qty,
109                avg_price = excluded.avg_price,
110                event_time_ms = excluded.event_time_ms,
111                source = excluded.source,
112                updated_at_ms = excluded.updated_at_ms
113            "#,
114            params![
115                symbol,
116                o.order_id as i64,
117                o.client_order_id,
118                o.status,
119                o.side,
120                o.orig_qty,
121                o.executed_qty,
122                avg_price,
123                o.update_time.max(o.time) as i64,
124                source_label_from_client_order_id(&o.client_order_id),
125                now_ms,
126            ],
127        )?;
128        source_by_order_id.insert(
129            o.order_id,
130            source_label_from_client_order_id(&o.client_order_id).to_string(),
131        );
132    }
133
134    for t in trades {
135        tx.execute(
136            r#"
137            INSERT INTO order_history_trades (
138                symbol, trade_id, order_id, side, qty, price, commission, commission_asset, event_time_ms, realized_pnl, source, updated_at_ms
139            ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)
140            ON CONFLICT(symbol, trade_id) DO UPDATE SET
141                order_id = excluded.order_id,
142                side = excluded.side,
143                qty = excluded.qty,
144                price = excluded.price,
145                commission = excluded.commission,
146                commission_asset = excluded.commission_asset,
147                event_time_ms = excluded.event_time_ms,
148                realized_pnl = excluded.realized_pnl,
149                source = excluded.source,
150                updated_at_ms = excluded.updated_at_ms
151            "#,
152            params![
153                symbol,
154                t.id as i64,
155                t.order_id as i64,
156                if t.is_buyer { "BUY" } else { "SELL" },
157                t.qty,
158                t.price,
159                t.commission,
160                t.commission_asset,
161                t.time as i64,
162                t.realized_pnl,
163                source_by_order_id
164                    .get(&t.order_id)
165                    .map(String::as_str)
166                    .unwrap_or("UNKNOWN"),
167                now_ms,
168            ],
169        )?;
170    }
171
172    tx.commit()?;
173    Ok(())
174}
175
176fn source_label_from_client_order_id(client_order_id: &str) -> &'static str {
177    if client_order_id.contains("-mnl-") {
178        "MANUAL"
179    } else if client_order_id.contains("-cfg-") {
180        "MA(Config)"
181    } else if client_order_id.contains("-fst-") {
182        "MA(Fast 5/20)"
183    } else if client_order_id.contains("-slw-") {
184        "MA(Slow 20/60)"
185    } else {
186        "UNKNOWN"
187    }
188}
189
190pub fn load_persisted_trades(symbol: &str) -> Result<Vec<PersistedTrade>> {
191    std::fs::create_dir_all("data")?;
192    let conn = Connection::open("data/order_history.sqlite")?;
193    ensure_trade_schema(&conn)?;
194    let mut stmt = conn.prepare(
195        r#"
196        SELECT trade_id, order_id, side, qty, price, commission, commission_asset, event_time_ms, realized_pnl, source
197        FROM order_history_trades
198        WHERE symbol = ?1
199        ORDER BY event_time_ms ASC, trade_id ASC
200        "#,
201    )?;
202
203    let rows = stmt.query_map([symbol], |row| {
204        let side: String = row.get(2)?;
205        let is_buyer = side.eq_ignore_ascii_case("BUY");
206        let trade = BinanceMyTrade {
207            symbol: symbol.to_string(),
208            id: row.get::<_, i64>(0)? as u64,
209            order_id: row.get::<_, i64>(1)? as u64,
210            price: row.get(4)?,
211            qty: row.get(3)?,
212            commission: row.get(5)?,
213            commission_asset: row.get(6)?,
214            time: row.get::<_, i64>(7)? as u64,
215            realized_pnl: row.get(8)?,
216            is_buyer,
217            is_maker: false,
218        };
219        Ok(PersistedTrade {
220            trade,
221            source: row.get(9)?,
222        })
223    })?;
224
225    let mut trades = Vec::new();
226    for row in rows {
227        trades.push(row?);
228    }
229    Ok(trades)
230}
231
232pub fn load_last_trade_id(symbol: &str) -> Result<Option<u64>> {
233    std::fs::create_dir_all("data")?;
234    let conn = Connection::open("data/order_history.sqlite")?;
235    let mut stmt = conn.prepare(
236        r#"
237        SELECT MAX(trade_id)
238        FROM order_history_trades
239        WHERE symbol = ?1
240        "#,
241    )?;
242    let max_id = stmt.query_row([symbol], |row| row.get::<_, Option<i64>>(0))?;
243    Ok(max_id.map(|v| v as u64))
244}
245
246pub fn load_trade_count(symbol: &str) -> Result<usize> {
247    std::fs::create_dir_all("data")?;
248    let conn = Connection::open("data/order_history.sqlite")?;
249    let mut stmt = conn.prepare(
250        r#"
251        SELECT COUNT(*)
252        FROM order_history_trades
253        WHERE symbol = ?1
254        "#,
255    )?;
256    let count = stmt.query_row([symbol], |row| row.get::<_, i64>(0))?;
257    Ok(count.max(0) as usize)
258}
259
260#[derive(Clone, Copy, Default)]
261struct LongPos {
262    qty: f64,
263    cost_quote: f64,
264}
265
266#[derive(Clone, Copy, Default)]
267struct DailyBucket {
268    pnl: f64,
269    basis: f64,
270}
271
272pub fn load_realized_returns_by_bucket(
273    bucket: HistoryBucket,
274    limit: usize,
275) -> Result<Vec<DailyRealizedReturn>> {
276    std::fs::create_dir_all("data")?;
277    let conn = Connection::open("data/order_history.sqlite")?;
278    ensure_trade_schema(&conn)?;
279    let mut stmt = conn.prepare(
280        r#"
281        SELECT symbol, trade_id, order_id, side, qty, price, commission, commission_asset, event_time_ms, realized_pnl
282        FROM order_history_trades
283        ORDER BY symbol ASC, event_time_ms ASC, trade_id ASC
284        "#,
285    )?;
286
287    let rows = stmt.query_map([], |row| {
288        Ok((
289            row.get::<_, String>(0)?,
290            row.get::<_, i64>(1)? as u64,
291            row.get::<_, i64>(2)? as u64,
292            row.get::<_, String>(3)?,
293            row.get::<_, f64>(4)?,
294            row.get::<_, f64>(5)?,
295            row.get::<_, f64>(6)?,
296            row.get::<_, String>(7)?,
297            row.get::<_, i64>(8)? as u64,
298            row.get::<_, f64>(9)?,
299        ))
300    })?;
301
302    let mut pos_by_symbol: HashMap<String, LongPos> = HashMap::new();
303    let mut daily_by_key: HashMap<(String, String), DailyBucket> = HashMap::new();
304
305    for row in rows {
306        let (
307            symbol,
308            _trade_id,
309            _order_id,
310            side,
311            qty_raw,
312            price,
313            commission,
314            commission_asset,
315            event_time_ms,
316            realized_pnl,
317        ) = row?;
318        let qty = qty_raw.max(0.0);
319        if qty <= f64::EPSILON {
320            continue;
321        }
322
323        let (base_asset, quote_asset) = split_symbol_assets(&symbol);
324        let fee_is_base =
325            !base_asset.is_empty() && commission_asset.eq_ignore_ascii_case(&base_asset);
326        let fee_is_quote =
327            !quote_asset.is_empty() && commission_asset.eq_ignore_ascii_case(&quote_asset);
328        let pos = pos_by_symbol.entry(symbol.clone()).or_default();
329
330        let date = chrono::Utc
331            .timestamp_millis_opt(event_time_ms as i64)
332            .single()
333            .map(|dt| dt.with_timezone(&chrono::Local))
334            .map(|dt| match bucket {
335                HistoryBucket::Day => dt.format("%Y-%m-%d").to_string(),
336                HistoryBucket::Hour => dt.format("%Y-%m-%d %H:00").to_string(),
337                HistoryBucket::Month => dt.format("%Y-%m").to_string(),
338            })
339            .unwrap_or_else(|| "unknown".to_string());
340
341        // Futures: realized_pnl is provided directly by exchange, do not apply spot long inventory logic.
342        if symbol.ends_with("#FUT") {
343            let basis = (qty * price).abs();
344            let bucket = daily_by_key.entry((symbol.clone(), date)).or_default();
345            bucket.pnl += realized_pnl;
346            bucket.basis += basis;
347            continue;
348        }
349
350        if side.eq_ignore_ascii_case("BUY") {
351            let net_qty = (qty
352                - if fee_is_base {
353                    commission.max(0.0)
354                } else {
355                    0.0
356                })
357            .max(0.0);
358            if net_qty <= f64::EPSILON {
359                continue;
360            }
361            let fee_quote = if fee_is_quote {
362                commission.max(0.0)
363            } else {
364                0.0
365            };
366            pos.qty += net_qty;
367            pos.cost_quote += qty * price + fee_quote;
368            continue;
369        }
370
371        if pos.qty <= f64::EPSILON {
372            continue;
373        }
374        let close_qty = qty.min(pos.qty);
375        if close_qty <= f64::EPSILON {
376            continue;
377        }
378        let avg_cost = pos.cost_quote / pos.qty.max(f64::EPSILON);
379        let fee_quote_total = if fee_is_quote {
380            commission.max(0.0)
381        } else if fee_is_base {
382            commission.max(0.0) * price
383        } else {
384            0.0
385        };
386        let fee_quote = fee_quote_total * (close_qty / qty.max(f64::EPSILON));
387        let realized_pnl = (close_qty * price - fee_quote) - (avg_cost * close_qty);
388        let realized_basis = avg_cost * close_qty;
389
390        let bucket = daily_by_key.entry((symbol.clone(), date)).or_default();
391        bucket.pnl += realized_pnl;
392        bucket.basis += realized_basis;
393
394        pos.qty -= close_qty;
395        pos.cost_quote -= realized_basis;
396        if pos.qty <= f64::EPSILON {
397            pos.qty = 0.0;
398            pos.cost_quote = 0.0;
399        }
400    }
401
402    let mut out: Vec<DailyRealizedReturn> = daily_by_key
403        .into_iter()
404        .map(|((symbol, date), b)| DailyRealizedReturn {
405            symbol,
406            date,
407            realized_return_pct: if b.basis.abs() > f64::EPSILON {
408                (b.pnl / b.basis) * 100.0
409            } else {
410                0.0
411            },
412        })
413        .collect();
414
415    out.sort_by(|a, b| b.date.cmp(&a.date).then_with(|| a.symbol.cmp(&b.symbol)));
416    if out.len() > limit {
417        out.truncate(limit);
418    }
419    Ok(out)
420}
421
422pub fn load_daily_realized_returns(limit: usize) -> Result<Vec<DailyRealizedReturn>> {
423    load_realized_returns_by_bucket(HistoryBucket::Day, limit)
424}
425
426fn split_symbol_assets(symbol: &str) -> (String, String) {
427    const QUOTE_SUFFIXES: [&str; 10] = [
428        "USDT", "USDC", "FDUSD", "BUSD", "TUSD", "TRY", "EUR", "BTC", "ETH", "BNB",
429    ];
430    for q in QUOTE_SUFFIXES {
431        if let Some(base) = symbol.strip_suffix(q) {
432            if !base.is_empty() {
433                return (base.to_string(), q.to_string());
434            }
435        }
436    }
437    (symbol.to_string(), String::new())
438}