1use anyhow::Result;
2use chrono::TimeZone;
3use rusqlite::{params, Connection};
4use std::collections::HashMap;
5
6use crate::binance::types::{BinanceAllOrder, BinanceMyTrade};
7
8#[derive(Debug, Clone)]
9pub struct PersistedTrade {
10 pub trade: BinanceMyTrade,
11 pub source: String,
12}
13
/// Realized return of one symbol within one time bucket.
#[derive(Clone, Debug)]
pub struct DailyRealizedReturn {
    /// Symbol the return belongs to.
    pub symbol: String,
    /// Label of the time bucket (despite the field name, the label may also
    /// denote an hour or a month, depending on the bucket that was requested).
    pub date: String,
    /// Realized PnL divided by the realized basis, expressed in percent.
    pub realized_return_pct: f64,
}
20
/// Aggregated trade statistics for a single source label; one instance is
/// stored per `(symbol, source)` row of the `strategy_symbol_stats` table.
#[derive(Clone, Debug)]
pub struct StrategyScopedStats {
    /// Total number of trades counted.
    pub trade_count: u32,
    /// Number of trades counted as wins.
    pub win_count: u32,
    /// Number of trades counted as losses.
    pub lose_count: u32,
    /// Accumulated realized profit and loss.
    pub realized_pnl: f64,
}
28
29fn ensure_trade_schema(conn: &Connection) -> Result<()> {
30 for ddl in [
31 "ALTER TABLE order_history_trades ADD COLUMN commission REAL NOT NULL DEFAULT 0.0",
32 "ALTER TABLE order_history_trades ADD COLUMN commission_asset TEXT NOT NULL DEFAULT ''",
33 "ALTER TABLE order_history_trades ADD COLUMN realized_pnl REAL NOT NULL DEFAULT 0.0",
34 ] {
35 if let Err(e) = conn.execute(ddl, []) {
36 let msg = e.to_string();
37 if !msg.contains("duplicate column name") {
38 return Err(e.into());
39 }
40 }
41 }
42 Ok(())
43}
44
45fn ensure_strategy_stats_schema(conn: &Connection) -> Result<()> {
46 conn.execute_batch(
47 r#"
48 CREATE TABLE IF NOT EXISTS strategy_symbol_stats (
49 symbol TEXT NOT NULL,
50 source TEXT NOT NULL,
51 trade_count INTEGER NOT NULL,
52 win_count INTEGER NOT NULL,
53 lose_count INTEGER NOT NULL,
54 realized_pnl REAL NOT NULL,
55 updated_at_ms INTEGER NOT NULL,
56 PRIMARY KEY(symbol, source)
57 );
58 "#,
59 )?;
60 Ok(())
61}
62
63pub fn persist_strategy_symbol_stats(
64 symbol: &str,
65 stats: &HashMap<String, StrategyScopedStats>,
66) -> Result<()> {
67 std::fs::create_dir_all("data")?;
68 let mut conn = Connection::open("data/strategy_stats.sqlite")?;
69 ensure_strategy_stats_schema(&conn)?;
70 let tx = conn.transaction()?;
71 let now_ms = chrono::Utc::now().timestamp_millis();
72
73 for (source, row) in stats {
74 tx.execute(
75 r#"
76 INSERT INTO strategy_symbol_stats (
77 symbol, source, trade_count, win_count, lose_count, realized_pnl, updated_at_ms
78 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)
79 ON CONFLICT(symbol, source) DO UPDATE SET
80 trade_count = excluded.trade_count,
81 win_count = excluded.win_count,
82 lose_count = excluded.lose_count,
83 realized_pnl = excluded.realized_pnl,
84 updated_at_ms = excluded.updated_at_ms
85 "#,
86 params![
87 symbol,
88 source,
89 row.trade_count as i64,
90 row.win_count as i64,
91 row.lose_count as i64,
92 row.realized_pnl,
93 now_ms,
94 ],
95 )?;
96 }
97 tx.commit()?;
98 Ok(())
99}
100
101pub fn load_strategy_symbol_stats(symbol: &str) -> Result<HashMap<String, StrategyScopedStats>> {
102 std::fs::create_dir_all("data")?;
103 let conn = Connection::open("data/strategy_stats.sqlite")?;
104 ensure_strategy_stats_schema(&conn)?;
105 let mut stmt = conn.prepare(
106 r#"
107 SELECT source, trade_count, win_count, lose_count, realized_pnl
108 FROM strategy_symbol_stats
109 WHERE symbol = ?1
110 "#,
111 )?;
112
113 let rows = stmt.query_map([symbol], |row| {
114 Ok((
115 row.get::<_, String>(0)?,
116 StrategyScopedStats {
117 trade_count: row.get::<_, i64>(1)?.max(0) as u32,
118 win_count: row.get::<_, i64>(2)?.max(0) as u32,
119 lose_count: row.get::<_, i64>(3)?.max(0) as u32,
120 realized_pnl: row.get::<_, f64>(4)?,
121 },
122 ))
123 })?;
124
125 let mut out = HashMap::new();
126 for row in rows {
127 let (source, stats) = row?;
128 out.insert(source, stats);
129 }
130 Ok(out)
131}
132
/// Granularity used when grouping realized returns over time.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum HistoryBucket {
    /// One bucket per calendar day.
    Day,
    /// One bucket per hour.
    Hour,
    /// One bucket per calendar month.
    Month,
}
139
140pub fn persist_order_snapshot(
141 symbol: &str,
142 orders: &[BinanceAllOrder],
143 trades: &[BinanceMyTrade],
144) -> Result<()> {
145 std::fs::create_dir_all("data")?;
146 let mut conn = Connection::open("data/order_history.sqlite")?;
147 conn.execute_batch(
148 r#"
149 CREATE TABLE IF NOT EXISTS order_history_orders (
150 symbol TEXT NOT NULL,
151 order_id INTEGER NOT NULL,
152 client_order_id TEXT NOT NULL,
153 status TEXT NOT NULL,
154 side TEXT NOT NULL,
155 orig_qty REAL NOT NULL,
156 executed_qty REAL NOT NULL,
157 avg_price REAL NOT NULL,
158 event_time_ms INTEGER NOT NULL,
159 source TEXT NOT NULL,
160 updated_at_ms INTEGER NOT NULL,
161 PRIMARY KEY(symbol, order_id)
162 );
163
164 CREATE TABLE IF NOT EXISTS order_history_trades (
165 symbol TEXT NOT NULL,
166 trade_id INTEGER NOT NULL,
167 order_id INTEGER NOT NULL,
168 side TEXT NOT NULL,
169 qty REAL NOT NULL,
170 price REAL NOT NULL,
171 commission REAL NOT NULL DEFAULT 0.0,
172 commission_asset TEXT NOT NULL DEFAULT '',
173 event_time_ms INTEGER NOT NULL,
174 realized_pnl REAL NOT NULL DEFAULT 0.0,
175 source TEXT NOT NULL,
176 updated_at_ms INTEGER NOT NULL,
177 PRIMARY KEY(symbol, trade_id)
178 );
179 "#,
180 )?;
181 ensure_trade_schema(&conn)?;
182
183 let now_ms = chrono::Utc::now().timestamp_millis();
184 let tx = conn.transaction()?;
185 let mut source_by_order_id = std::collections::HashMap::new();
186
187 for o in orders {
188 let avg_price = if o.executed_qty > 0.0 {
189 o.cummulative_quote_qty / o.executed_qty
190 } else {
191 o.price
192 };
193 tx.execute(
194 r#"
195 INSERT INTO order_history_orders (
196 symbol, order_id, client_order_id, status, side,
197 orig_qty, executed_qty, avg_price, event_time_ms, source, updated_at_ms
198 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)
199 ON CONFLICT(symbol, order_id) DO UPDATE SET
200 client_order_id = excluded.client_order_id,
201 status = excluded.status,
202 side = excluded.side,
203 orig_qty = excluded.orig_qty,
204 executed_qty = excluded.executed_qty,
205 avg_price = excluded.avg_price,
206 event_time_ms = excluded.event_time_ms,
207 source = excluded.source,
208 updated_at_ms = excluded.updated_at_ms
209 "#,
210 params![
211 symbol,
212 o.order_id as i64,
213 o.client_order_id,
214 o.status,
215 o.side,
216 o.orig_qty,
217 o.executed_qty,
218 avg_price,
219 o.update_time.max(o.time) as i64,
220 source_label_from_client_order_id(&o.client_order_id),
221 now_ms,
222 ],
223 )?;
224 source_by_order_id.insert(
225 o.order_id,
226 source_label_from_client_order_id(&o.client_order_id),
227 );
228 }
229
230 for t in trades {
231 tx.execute(
232 r#"
233 INSERT INTO order_history_trades (
234 symbol, trade_id, order_id, side, qty, price, commission, commission_asset, event_time_ms, realized_pnl, source, updated_at_ms
235 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)
236 ON CONFLICT(symbol, trade_id) DO UPDATE SET
237 order_id = excluded.order_id,
238 side = excluded.side,
239 qty = excluded.qty,
240 price = excluded.price,
241 commission = excluded.commission,
242 commission_asset = excluded.commission_asset,
243 event_time_ms = excluded.event_time_ms,
244 realized_pnl = excluded.realized_pnl,
245 source = excluded.source,
246 updated_at_ms = excluded.updated_at_ms
247 "#,
248 params![
249 symbol,
250 t.id as i64,
251 t.order_id as i64,
252 if t.is_buyer { "BUY" } else { "SELL" },
253 t.qty,
254 t.price,
255 t.commission,
256 t.commission_asset,
257 t.time as i64,
258 t.realized_pnl,
259 source_by_order_id
260 .get(&t.order_id)
261 .map(String::as_str)
262 .unwrap_or("UNKNOWN"),
263 now_ms,
264 ],
265 )?;
266 }
267
268 tx.commit()?;
269 Ok(())
270}
271
272fn source_label_from_client_order_id(client_order_id: &str) -> String {
273 if client_order_id.contains("-mnl-") {
274 "MANUAL".to_string()
275 } else if client_order_id.contains("-cfg-") {
276 "MA(Config)".to_string()
277 } else if client_order_id.contains("-fst-") {
278 "MA(Fast 5/20)".to_string()
279 } else if client_order_id.contains("-slw-") {
280 "MA(Slow 20/60)".to_string()
281 } else if let Some(source_tag) = parse_source_tag_from_client_order_id(client_order_id) {
282 source_tag.to_ascii_lowercase()
283 } else {
284 "UNKNOWN".to_string()
285 }
286}
287
/// Extracts the tag between the `sq-` prefix and the next `-` of a client
/// order id.
///
/// Returns `None` when the prefix is missing, when no `-` follows the prefix,
/// or when the tag would be empty.
fn parse_source_tag_from_client_order_id(client_order_id: &str) -> Option<&str> {
    client_order_id
        .strip_prefix("sq-")?
        .split_once('-')
        .map(|(tag, _rest)| tag)
        .filter(|tag| !tag.is_empty())
}
297
298pub fn load_persisted_trades(symbol: &str) -> Result<Vec<PersistedTrade>> {
299 std::fs::create_dir_all("data")?;
300 let conn = Connection::open("data/order_history.sqlite")?;
301 ensure_trade_schema(&conn)?;
302 let mut stmt = conn.prepare(
303 r#"
304 SELECT trade_id, order_id, side, qty, price, commission, commission_asset, event_time_ms, realized_pnl, source
305 FROM order_history_trades
306 WHERE symbol = ?1
307 ORDER BY event_time_ms ASC, trade_id ASC
308 "#,
309 )?;
310
311 let rows = stmt.query_map([symbol], |row| {
312 let side: String = row.get(2)?;
313 let is_buyer = side.eq_ignore_ascii_case("BUY");
314 let trade = BinanceMyTrade {
315 symbol: symbol.to_string(),
316 id: row.get::<_, i64>(0)? as u64,
317 order_id: row.get::<_, i64>(1)? as u64,
318 price: row.get(4)?,
319 qty: row.get(3)?,
320 commission: row.get(5)?,
321 commission_asset: row.get(6)?,
322 time: row.get::<_, i64>(7)? as u64,
323 realized_pnl: row.get(8)?,
324 is_buyer,
325 is_maker: false,
326 };
327 Ok(PersistedTrade {
328 trade,
329 source: row.get(9)?,
330 })
331 })?;
332
333 let mut trades = Vec::new();
334 for row in rows {
335 trades.push(row?);
336 }
337 Ok(trades)
338}
339
340pub fn load_recent_persisted_trades_filtered(
341 symbol: Option<&str>,
342 source: Option<&str>,
343 limit: usize,
344) -> Result<Vec<PersistedTrade>> {
345 std::fs::create_dir_all("data")?;
346 let conn = Connection::open("data/order_history.sqlite")?;
347 ensure_trade_schema(&conn)?;
348
349 let limit = limit.max(1) as i64;
350 let (sql, bind_symbol, bind_source) = match (symbol, source) {
351 (Some(_), Some(_)) => (
352 r#"
353 SELECT symbol, trade_id, order_id, side, qty, price, commission, commission_asset, event_time_ms, realized_pnl, source
354 FROM order_history_trades
355 WHERE symbol = ?1 AND LOWER(source) = LOWER(?2)
356 ORDER BY event_time_ms DESC, trade_id DESC
357 LIMIT ?3
358 "#,
359 true,
360 true,
361 ),
362 (Some(_), None) => (
363 r#"
364 SELECT symbol, trade_id, order_id, side, qty, price, commission, commission_asset, event_time_ms, realized_pnl, source
365 FROM order_history_trades
366 WHERE symbol = ?1
367 ORDER BY event_time_ms DESC, trade_id DESC
368 LIMIT ?2
369 "#,
370 true,
371 false,
372 ),
373 (None, Some(_)) => (
374 r#"
375 SELECT symbol, trade_id, order_id, side, qty, price, commission, commission_asset, event_time_ms, realized_pnl, source
376 FROM order_history_trades
377 WHERE LOWER(source) = LOWER(?1)
378 ORDER BY event_time_ms DESC, trade_id DESC
379 LIMIT ?2
380 "#,
381 false,
382 true,
383 ),
384 (None, None) => (
385 r#"
386 SELECT symbol, trade_id, order_id, side, qty, price, commission, commission_asset, event_time_ms, realized_pnl, source
387 FROM order_history_trades
388 ORDER BY event_time_ms DESC, trade_id DESC
389 LIMIT ?1
390 "#,
391 false,
392 false,
393 ),
394 };
395
396 let mut stmt = conn.prepare(sql)?;
397 let mut out = Vec::new();
398 match (bind_symbol, bind_source) {
399 (true, true) => {
400 let rows = stmt.query_map(
401 params![symbol.unwrap_or_default(), source.unwrap_or_default(), limit],
402 map_persisted_trade_row,
403 )?;
404 for row in rows {
405 out.push(row?);
406 }
407 }
408 (true, false) => {
409 let rows = stmt.query_map(
410 params![symbol.unwrap_or_default(), limit],
411 map_persisted_trade_row,
412 )?;
413 for row in rows {
414 out.push(row?);
415 }
416 }
417 (false, true) => {
418 let rows = stmt.query_map(
419 params![source.unwrap_or_default(), limit],
420 map_persisted_trade_row,
421 )?;
422 for row in rows {
423 out.push(row?);
424 }
425 }
426 (false, false) => {
427 let rows = stmt.query_map(params![limit], map_persisted_trade_row)?;
428 for row in rows {
429 out.push(row?);
430 }
431 }
432 }
433 Ok(out)
434}
435
436fn map_persisted_trade_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<PersistedTrade> {
437 Ok(PersistedTrade {
438 trade: BinanceMyTrade {
439 symbol: row.get::<_, String>(0)?,
440 id: row.get::<_, i64>(1)? as u64,
441 order_id: row.get::<_, i64>(2)? as u64,
442 price: row.get(5)?,
443 qty: row.get(4)?,
444 commission: row.get(6)?,
445 commission_asset: row.get(7)?,
446 time: row.get::<_, i64>(8)? as u64,
447 realized_pnl: row.get(9)?,
448 is_buyer: row.get::<_, String>(3)?.eq_ignore_ascii_case("BUY"),
449 is_maker: false,
450 },
451 source: row.get(10)?,
452 })
453}
454
455pub fn load_last_trade_id(symbol: &str) -> Result<Option<u64>> {
456 std::fs::create_dir_all("data")?;
457 let conn = Connection::open("data/order_history.sqlite")?;
458 let mut stmt = conn.prepare(
459 r#"
460 SELECT MAX(trade_id)
461 FROM order_history_trades
462 WHERE symbol = ?1
463 "#,
464 )?;
465 let max_id = stmt.query_row([symbol], |row| row.get::<_, Option<i64>>(0))?;
466 Ok(max_id.map(|v| v as u64))
467}
468
469pub fn load_trade_count(symbol: &str) -> Result<usize> {
470 std::fs::create_dir_all("data")?;
471 let conn = Connection::open("data/order_history.sqlite")?;
472 let mut stmt = conn.prepare(
473 r#"
474 SELECT COUNT(*)
475 FROM order_history_trades
476 WHERE symbol = ?1
477 "#,
478 )?;
479 let count = stmt.query_row([symbol], |row| row.get::<_, i64>(0))?;
480 Ok(count.max(0) as usize)
481}
482
/// Running long position of one symbol while replaying its trades.
#[derive(Default, Clone, Copy)]
struct LongPos {
    /// Quantity currently held.
    qty: f64,
    /// Quote-denominated cost of the held quantity.
    cost_quote: f64,
}
488
/// Accumulator for one `(symbol, time bucket)` pair.
#[derive(Default, Clone, Copy)]
struct DailyBucket {
    /// Sum of realized profit and loss.
    pnl: f64,
    /// Sum of the basis the realized PnL is measured against.
    basis: f64,
}
494
495pub fn load_realized_returns_by_bucket(
496 bucket: HistoryBucket,
497 limit: usize,
498) -> Result<Vec<DailyRealizedReturn>> {
499 std::fs::create_dir_all("data")?;
500 let conn = Connection::open("data/order_history.sqlite")?;
501 ensure_trade_schema(&conn)?;
502 let mut stmt = conn.prepare(
503 r#"
504 SELECT symbol, trade_id, order_id, side, qty, price, commission, commission_asset, event_time_ms, realized_pnl
505 FROM order_history_trades
506 ORDER BY symbol ASC, event_time_ms ASC, trade_id ASC
507 "#,
508 )?;
509
510 let rows = stmt.query_map([], |row| {
511 Ok((
512 row.get::<_, String>(0)?,
513 row.get::<_, i64>(1)? as u64,
514 row.get::<_, i64>(2)? as u64,
515 row.get::<_, String>(3)?,
516 row.get::<_, f64>(4)?,
517 row.get::<_, f64>(5)?,
518 row.get::<_, f64>(6)?,
519 row.get::<_, String>(7)?,
520 row.get::<_, i64>(8)? as u64,
521 row.get::<_, f64>(9)?,
522 ))
523 })?;
524
525 let mut pos_by_symbol: HashMap<String, LongPos> = HashMap::new();
526 let mut daily_by_key: HashMap<(String, String), DailyBucket> = HashMap::new();
527
528 for row in rows {
529 let (
530 symbol,
531 _trade_id,
532 _order_id,
533 side,
534 qty_raw,
535 price,
536 commission,
537 commission_asset,
538 event_time_ms,
539 realized_pnl,
540 ) = row?;
541 let qty = qty_raw.max(0.0);
542 if qty <= f64::EPSILON {
543 continue;
544 }
545
546 let (base_asset, quote_asset) = split_symbol_assets(&symbol);
547 let fee_is_base =
548 !base_asset.is_empty() && commission_asset.eq_ignore_ascii_case(&base_asset);
549 let fee_is_quote =
550 !quote_asset.is_empty() && commission_asset.eq_ignore_ascii_case("e_asset);
551 let pos = pos_by_symbol.entry(symbol.clone()).or_default();
552
553 let date = chrono::Utc
554 .timestamp_millis_opt(event_time_ms as i64)
555 .single()
556 .map(|dt| dt.with_timezone(&chrono::Local))
557 .map(|dt| match bucket {
558 HistoryBucket::Day => dt.format("%Y-%m-%d").to_string(),
559 HistoryBucket::Hour => dt.format("%Y-%m-%d %H:00").to_string(),
560 HistoryBucket::Month => dt.format("%Y-%m").to_string(),
561 })
562 .unwrap_or_else(|| "unknown".to_string());
563
564 if symbol.ends_with("#FUT") {
566 let basis = (qty * price).abs();
567 let bucket = daily_by_key.entry((symbol.clone(), date)).or_default();
568 bucket.pnl += realized_pnl;
569 bucket.basis += basis;
570 continue;
571 }
572
573 if side.eq_ignore_ascii_case("BUY") {
574 let net_qty = (qty
575 - if fee_is_base {
576 commission.max(0.0)
577 } else {
578 0.0
579 })
580 .max(0.0);
581 if net_qty <= f64::EPSILON {
582 continue;
583 }
584 let fee_quote = if fee_is_quote {
585 commission.max(0.0)
586 } else {
587 0.0
588 };
589 pos.qty += net_qty;
590 pos.cost_quote += qty * price + fee_quote;
591 continue;
592 }
593
594 if pos.qty <= f64::EPSILON {
595 continue;
596 }
597 let close_qty = qty.min(pos.qty);
598 if close_qty <= f64::EPSILON {
599 continue;
600 }
601 let avg_cost = pos.cost_quote / pos.qty.max(f64::EPSILON);
602 let fee_quote_total = if fee_is_quote {
603 commission.max(0.0)
604 } else if fee_is_base {
605 commission.max(0.0) * price
606 } else {
607 0.0
608 };
609 let fee_quote = fee_quote_total * (close_qty / qty.max(f64::EPSILON));
610 let realized_pnl = (close_qty * price - fee_quote) - (avg_cost * close_qty);
611 let realized_basis = avg_cost * close_qty;
612
613 let bucket = daily_by_key.entry((symbol.clone(), date)).or_default();
614 bucket.pnl += realized_pnl;
615 bucket.basis += realized_basis;
616
617 pos.qty -= close_qty;
618 pos.cost_quote -= realized_basis;
619 if pos.qty <= f64::EPSILON {
620 pos.qty = 0.0;
621 pos.cost_quote = 0.0;
622 }
623 }
624
625 let mut out: Vec<DailyRealizedReturn> = daily_by_key
626 .into_iter()
627 .map(|((symbol, date), b)| DailyRealizedReturn {
628 symbol,
629 date,
630 realized_return_pct: if b.basis.abs() > f64::EPSILON {
631 (b.pnl / b.basis) * 100.0
632 } else {
633 0.0
634 },
635 })
636 .collect();
637
638 out.sort_by(|a, b| b.date.cmp(&a.date).then_with(|| a.symbol.cmp(&b.symbol)));
639 if out.len() > limit {
640 out.truncate(limit);
641 }
642 Ok(out)
643}
644
645pub fn load_daily_realized_returns(limit: usize) -> Result<Vec<DailyRealizedReturn>> {
646 load_realized_returns_by_bucket(HistoryBucket::Day, limit)
647}
648
/// Splits a symbol into `(base, quote)` using a fixed list of quote suffixes.
///
/// Suffixes are tried in the listed order and the first one that leaves a
/// non-empty base wins. When none applies, the whole symbol is returned as the
/// base together with an empty quote.
fn split_symbol_assets(symbol: &str) -> (String, String) {
    const QUOTE_SUFFIXES: [&str; 10] = [
        "USDT", "USDC", "FDUSD", "BUSD", "TUSD", "TRY", "EUR", "BTC", "ETH", "BNB",
    ];

    QUOTE_SUFFIXES
        .iter()
        .copied()
        .find_map(|quote| {
            symbol
                .strip_suffix(quote)
                .filter(|base| !base.is_empty())
                .map(|base| (base.to_owned(), quote.to_owned()))
        })
        .unwrap_or_else(|| (symbol.to_owned(), String::new()))
}