1use anyhow::Result;
2use chrono::TimeZone;
3use rusqlite::{params, Connection};
4use std::collections::HashMap;
5
6use crate::binance::types::{BinanceAllOrder, BinanceMyTrade};
7
8#[derive(Debug, Clone)]
9pub struct PersistedTrade {
10 pub trade: BinanceMyTrade,
11 pub source: String,
12}
13
/// Realized return of one symbol within one time bucket, as produced by
/// `load_realized_returns_by_bucket`.
#[derive(Clone, Debug)]
pub struct DailyRealizedReturn {
    /// Symbol the return belongs to.
    pub symbol: String,
    /// Bucket key: a formatted local date/time string whose shape depends on
    /// the requested `HistoryBucket`, or `"unknown"` when the timestamp could
    /// not be converted.
    pub date: String,
    /// Accumulated PnL divided by accumulated basis, times 100; `0.0` when the
    /// basis is effectively zero.
    pub realized_return_pct: f64,
}
20
/// Aggregate trading statistics for one source label; persisted per
/// `(symbol, source)` pair in the `strategy_symbol_stats` table.
#[derive(Clone, Debug)]
pub struct StrategyScopedStats {
    /// Number of trades counted.
    pub trade_count: u32,
    /// Number of winning trades.
    pub win_count: u32,
    /// Number of losing trades.
    pub lose_count: u32,
    /// Accumulated realized profit and loss.
    pub realized_pnl: f64,
}
28
29fn ensure_trade_schema(conn: &Connection) -> Result<()> {
30 for ddl in [
31 "ALTER TABLE order_history_trades ADD COLUMN commission REAL NOT NULL DEFAULT 0.0",
32 "ALTER TABLE order_history_trades ADD COLUMN commission_asset TEXT NOT NULL DEFAULT ''",
33 "ALTER TABLE order_history_trades ADD COLUMN realized_pnl REAL NOT NULL DEFAULT 0.0",
34 ] {
35 if let Err(e) = conn.execute(ddl, []) {
36 let msg = e.to_string();
37 if !msg.contains("duplicate column name") {
38 return Err(e.into());
39 }
40 }
41 }
42 Ok(())
43}
44
45fn ensure_strategy_stats_schema(conn: &Connection) -> Result<()> {
46 conn.execute_batch(
47 r#"
48 CREATE TABLE IF NOT EXISTS strategy_symbol_stats (
49 symbol TEXT NOT NULL,
50 source TEXT NOT NULL,
51 trade_count INTEGER NOT NULL,
52 win_count INTEGER NOT NULL,
53 lose_count INTEGER NOT NULL,
54 realized_pnl REAL NOT NULL,
55 updated_at_ms INTEGER NOT NULL,
56 PRIMARY KEY(symbol, source)
57 );
58 "#,
59 )?;
60 Ok(())
61}
62
63pub fn persist_strategy_symbol_stats(
64 symbol: &str,
65 stats: &HashMap<String, StrategyScopedStats>,
66) -> Result<()> {
67 std::fs::create_dir_all("data")?;
68 let mut conn = Connection::open("data/strategy_stats.sqlite")?;
69 ensure_strategy_stats_schema(&conn)?;
70 let tx = conn.transaction()?;
71 let now_ms = chrono::Utc::now().timestamp_millis();
72
73 for (source, row) in stats {
74 tx.execute(
75 r#"
76 INSERT INTO strategy_symbol_stats (
77 symbol, source, trade_count, win_count, lose_count, realized_pnl, updated_at_ms
78 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)
79 ON CONFLICT(symbol, source) DO UPDATE SET
80 trade_count = excluded.trade_count,
81 win_count = excluded.win_count,
82 lose_count = excluded.lose_count,
83 realized_pnl = excluded.realized_pnl,
84 updated_at_ms = excluded.updated_at_ms
85 "#,
86 params![
87 symbol,
88 source,
89 row.trade_count as i64,
90 row.win_count as i64,
91 row.lose_count as i64,
92 row.realized_pnl,
93 now_ms,
94 ],
95 )?;
96 }
97 tx.commit()?;
98 Ok(())
99}
100
101pub fn load_strategy_symbol_stats(symbol: &str) -> Result<HashMap<String, StrategyScopedStats>> {
102 std::fs::create_dir_all("data")?;
103 let conn = Connection::open("data/strategy_stats.sqlite")?;
104 ensure_strategy_stats_schema(&conn)?;
105 let mut stmt = conn.prepare(
106 r#"
107 SELECT source, trade_count, win_count, lose_count, realized_pnl
108 FROM strategy_symbol_stats
109 WHERE symbol = ?1
110 "#,
111 )?;
112
113 let rows = stmt.query_map([symbol], |row| {
114 Ok((
115 row.get::<_, String>(0)?,
116 StrategyScopedStats {
117 trade_count: row.get::<_, i64>(1)?.max(0) as u32,
118 win_count: row.get::<_, i64>(2)?.max(0) as u32,
119 lose_count: row.get::<_, i64>(3)?.max(0) as u32,
120 realized_pnl: row.get::<_, f64>(4)?,
121 },
122 ))
123 })?;
124
125 let mut out = HashMap::new();
126 for row in rows {
127 let (source, stats) = row?;
128 out.insert(source, stats);
129 }
130 Ok(out)
131}
132
/// Time granularity used to group realized returns.
///
/// `load_realized_returns_by_bucket` turns each variant into a different
/// local-time key format.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum HistoryBucket {
    /// One bucket per calendar day (`%Y-%m-%d`).
    Day,
    /// One bucket per hour (`%Y-%m-%d %H:00`).
    Hour,
    /// One bucket per calendar month (`%Y-%m`).
    Month,
}
139
140pub fn persist_order_snapshot(
141 symbol: &str,
142 orders: &[BinanceAllOrder],
143 trades: &[BinanceMyTrade],
144) -> Result<()> {
145 std::fs::create_dir_all("data")?;
146 let mut conn = Connection::open("data/order_history.sqlite")?;
147 conn.execute_batch(
148 r#"
149 CREATE TABLE IF NOT EXISTS order_history_orders (
150 symbol TEXT NOT NULL,
151 order_id INTEGER NOT NULL,
152 client_order_id TEXT NOT NULL,
153 status TEXT NOT NULL,
154 side TEXT NOT NULL,
155 orig_qty REAL NOT NULL,
156 executed_qty REAL NOT NULL,
157 avg_price REAL NOT NULL,
158 event_time_ms INTEGER NOT NULL,
159 source TEXT NOT NULL,
160 updated_at_ms INTEGER NOT NULL,
161 PRIMARY KEY(symbol, order_id)
162 );
163
164 CREATE TABLE IF NOT EXISTS order_history_trades (
165 symbol TEXT NOT NULL,
166 trade_id INTEGER NOT NULL,
167 order_id INTEGER NOT NULL,
168 side TEXT NOT NULL,
169 qty REAL NOT NULL,
170 price REAL NOT NULL,
171 commission REAL NOT NULL DEFAULT 0.0,
172 commission_asset TEXT NOT NULL DEFAULT '',
173 event_time_ms INTEGER NOT NULL,
174 realized_pnl REAL NOT NULL DEFAULT 0.0,
175 source TEXT NOT NULL,
176 updated_at_ms INTEGER NOT NULL,
177 PRIMARY KEY(symbol, trade_id)
178 );
179 "#,
180 )?;
181 ensure_trade_schema(&conn)?;
182
183 let now_ms = chrono::Utc::now().timestamp_millis();
184 let tx = conn.transaction()?;
185 let mut source_by_order_id = std::collections::HashMap::new();
186
187 for o in orders {
188 let avg_price = if o.executed_qty > 0.0 {
189 o.cummulative_quote_qty / o.executed_qty
190 } else {
191 o.price
192 };
193 tx.execute(
194 r#"
195 INSERT INTO order_history_orders (
196 symbol, order_id, client_order_id, status, side,
197 orig_qty, executed_qty, avg_price, event_time_ms, source, updated_at_ms
198 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)
199 ON CONFLICT(symbol, order_id) DO UPDATE SET
200 client_order_id = excluded.client_order_id,
201 status = excluded.status,
202 side = excluded.side,
203 orig_qty = excluded.orig_qty,
204 executed_qty = excluded.executed_qty,
205 avg_price = excluded.avg_price,
206 event_time_ms = excluded.event_time_ms,
207 source = excluded.source,
208 updated_at_ms = excluded.updated_at_ms
209 "#,
210 params![
211 symbol,
212 o.order_id as i64,
213 o.client_order_id,
214 o.status,
215 o.side,
216 o.orig_qty,
217 o.executed_qty,
218 avg_price,
219 o.update_time.max(o.time) as i64,
220 source_label_from_client_order_id(&o.client_order_id),
221 now_ms,
222 ],
223 )?;
224 source_by_order_id.insert(
225 o.order_id,
226 source_label_from_client_order_id(&o.client_order_id).to_string(),
227 );
228 }
229
230 for t in trades {
231 tx.execute(
232 r#"
233 INSERT INTO order_history_trades (
234 symbol, trade_id, order_id, side, qty, price, commission, commission_asset, event_time_ms, realized_pnl, source, updated_at_ms
235 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)
236 ON CONFLICT(symbol, trade_id) DO UPDATE SET
237 order_id = excluded.order_id,
238 side = excluded.side,
239 qty = excluded.qty,
240 price = excluded.price,
241 commission = excluded.commission,
242 commission_asset = excluded.commission_asset,
243 event_time_ms = excluded.event_time_ms,
244 realized_pnl = excluded.realized_pnl,
245 source = excluded.source,
246 updated_at_ms = excluded.updated_at_ms
247 "#,
248 params![
249 symbol,
250 t.id as i64,
251 t.order_id as i64,
252 if t.is_buyer { "BUY" } else { "SELL" },
253 t.qty,
254 t.price,
255 t.commission,
256 t.commission_asset,
257 t.time as i64,
258 t.realized_pnl,
259 source_by_order_id
260 .get(&t.order_id)
261 .map(String::as_str)
262 .unwrap_or("UNKNOWN"),
263 now_ms,
264 ],
265 )?;
266 }
267
268 tx.commit()?;
269 Ok(())
270}
271
/// Maps a client order id to the label of the source that placed the order.
///
/// The id is searched for known markers in a fixed priority order; the first
/// marker found decides the label. Ids without any marker yield `"UNKNOWN"`.
fn source_label_from_client_order_id(client_order_id: &str) -> &'static str {
    // (marker, label) pairs, listed in priority order.
    const MARKERS: [(&str, &str); 4] = [
        ("-mnl-", "MANUAL"),
        ("-cfg-", "MA(Config)"),
        ("-fst-", "MA(Fast 5/20)"),
        ("-slw-", "MA(Slow 20/60)"),
    ];

    MARKERS
        .iter()
        .find(|&&(marker, _)| client_order_id.contains(marker))
        .map_or("UNKNOWN", |&(_, label)| label)
}
285
286pub fn load_persisted_trades(symbol: &str) -> Result<Vec<PersistedTrade>> {
287 std::fs::create_dir_all("data")?;
288 let conn = Connection::open("data/order_history.sqlite")?;
289 ensure_trade_schema(&conn)?;
290 let mut stmt = conn.prepare(
291 r#"
292 SELECT trade_id, order_id, side, qty, price, commission, commission_asset, event_time_ms, realized_pnl, source
293 FROM order_history_trades
294 WHERE symbol = ?1
295 ORDER BY event_time_ms ASC, trade_id ASC
296 "#,
297 )?;
298
299 let rows = stmt.query_map([symbol], |row| {
300 let side: String = row.get(2)?;
301 let is_buyer = side.eq_ignore_ascii_case("BUY");
302 let trade = BinanceMyTrade {
303 symbol: symbol.to_string(),
304 id: row.get::<_, i64>(0)? as u64,
305 order_id: row.get::<_, i64>(1)? as u64,
306 price: row.get(4)?,
307 qty: row.get(3)?,
308 commission: row.get(5)?,
309 commission_asset: row.get(6)?,
310 time: row.get::<_, i64>(7)? as u64,
311 realized_pnl: row.get(8)?,
312 is_buyer,
313 is_maker: false,
314 };
315 Ok(PersistedTrade {
316 trade,
317 source: row.get(9)?,
318 })
319 })?;
320
321 let mut trades = Vec::new();
322 for row in rows {
323 trades.push(row?);
324 }
325 Ok(trades)
326}
327
328pub fn load_last_trade_id(symbol: &str) -> Result<Option<u64>> {
329 std::fs::create_dir_all("data")?;
330 let conn = Connection::open("data/order_history.sqlite")?;
331 let mut stmt = conn.prepare(
332 r#"
333 SELECT MAX(trade_id)
334 FROM order_history_trades
335 WHERE symbol = ?1
336 "#,
337 )?;
338 let max_id = stmt.query_row([symbol], |row| row.get::<_, Option<i64>>(0))?;
339 Ok(max_id.map(|v| v as u64))
340}
341
342pub fn load_trade_count(symbol: &str) -> Result<usize> {
343 std::fs::create_dir_all("data")?;
344 let conn = Connection::open("data/order_history.sqlite")?;
345 let mut stmt = conn.prepare(
346 r#"
347 SELECT COUNT(*)
348 FROM order_history_trades
349 WHERE symbol = ?1
350 "#,
351 )?;
352 let count = stmt.query_row([symbol], |row| row.get::<_, i64>(0))?;
353 Ok(count.max(0) as usize)
354}
355
/// Running long position of one symbol while replaying its trades.
#[derive(Default, Clone, Copy)]
struct LongPos {
    /// Quantity currently held.
    qty: f64,
    /// Total quote-currency cost of the held quantity.
    cost_quote: f64,
}
361
/// Totals accumulated for one `(symbol, bucket key)` pair.
#[derive(Default, Clone, Copy)]
struct DailyBucket {
    /// Sum of realized profit and loss.
    pnl: f64,
    /// Sum of the basis the PnL is measured against.
    basis: f64,
}
367
368pub fn load_realized_returns_by_bucket(
369 bucket: HistoryBucket,
370 limit: usize,
371) -> Result<Vec<DailyRealizedReturn>> {
372 std::fs::create_dir_all("data")?;
373 let conn = Connection::open("data/order_history.sqlite")?;
374 ensure_trade_schema(&conn)?;
375 let mut stmt = conn.prepare(
376 r#"
377 SELECT symbol, trade_id, order_id, side, qty, price, commission, commission_asset, event_time_ms, realized_pnl
378 FROM order_history_trades
379 ORDER BY symbol ASC, event_time_ms ASC, trade_id ASC
380 "#,
381 )?;
382
383 let rows = stmt.query_map([], |row| {
384 Ok((
385 row.get::<_, String>(0)?,
386 row.get::<_, i64>(1)? as u64,
387 row.get::<_, i64>(2)? as u64,
388 row.get::<_, String>(3)?,
389 row.get::<_, f64>(4)?,
390 row.get::<_, f64>(5)?,
391 row.get::<_, f64>(6)?,
392 row.get::<_, String>(7)?,
393 row.get::<_, i64>(8)? as u64,
394 row.get::<_, f64>(9)?,
395 ))
396 })?;
397
398 let mut pos_by_symbol: HashMap<String, LongPos> = HashMap::new();
399 let mut daily_by_key: HashMap<(String, String), DailyBucket> = HashMap::new();
400
401 for row in rows {
402 let (
403 symbol,
404 _trade_id,
405 _order_id,
406 side,
407 qty_raw,
408 price,
409 commission,
410 commission_asset,
411 event_time_ms,
412 realized_pnl,
413 ) = row?;
414 let qty = qty_raw.max(0.0);
415 if qty <= f64::EPSILON {
416 continue;
417 }
418
419 let (base_asset, quote_asset) = split_symbol_assets(&symbol);
420 let fee_is_base =
421 !base_asset.is_empty() && commission_asset.eq_ignore_ascii_case(&base_asset);
422 let fee_is_quote =
423 !quote_asset.is_empty() && commission_asset.eq_ignore_ascii_case("e_asset);
424 let pos = pos_by_symbol.entry(symbol.clone()).or_default();
425
426 let date = chrono::Utc
427 .timestamp_millis_opt(event_time_ms as i64)
428 .single()
429 .map(|dt| dt.with_timezone(&chrono::Local))
430 .map(|dt| match bucket {
431 HistoryBucket::Day => dt.format("%Y-%m-%d").to_string(),
432 HistoryBucket::Hour => dt.format("%Y-%m-%d %H:00").to_string(),
433 HistoryBucket::Month => dt.format("%Y-%m").to_string(),
434 })
435 .unwrap_or_else(|| "unknown".to_string());
436
437 if symbol.ends_with("#FUT") {
439 let basis = (qty * price).abs();
440 let bucket = daily_by_key.entry((symbol.clone(), date)).or_default();
441 bucket.pnl += realized_pnl;
442 bucket.basis += basis;
443 continue;
444 }
445
446 if side.eq_ignore_ascii_case("BUY") {
447 let net_qty = (qty
448 - if fee_is_base {
449 commission.max(0.0)
450 } else {
451 0.0
452 })
453 .max(0.0);
454 if net_qty <= f64::EPSILON {
455 continue;
456 }
457 let fee_quote = if fee_is_quote {
458 commission.max(0.0)
459 } else {
460 0.0
461 };
462 pos.qty += net_qty;
463 pos.cost_quote += qty * price + fee_quote;
464 continue;
465 }
466
467 if pos.qty <= f64::EPSILON {
468 continue;
469 }
470 let close_qty = qty.min(pos.qty);
471 if close_qty <= f64::EPSILON {
472 continue;
473 }
474 let avg_cost = pos.cost_quote / pos.qty.max(f64::EPSILON);
475 let fee_quote_total = if fee_is_quote {
476 commission.max(0.0)
477 } else if fee_is_base {
478 commission.max(0.0) * price
479 } else {
480 0.0
481 };
482 let fee_quote = fee_quote_total * (close_qty / qty.max(f64::EPSILON));
483 let realized_pnl = (close_qty * price - fee_quote) - (avg_cost * close_qty);
484 let realized_basis = avg_cost * close_qty;
485
486 let bucket = daily_by_key.entry((symbol.clone(), date)).or_default();
487 bucket.pnl += realized_pnl;
488 bucket.basis += realized_basis;
489
490 pos.qty -= close_qty;
491 pos.cost_quote -= realized_basis;
492 if pos.qty <= f64::EPSILON {
493 pos.qty = 0.0;
494 pos.cost_quote = 0.0;
495 }
496 }
497
498 let mut out: Vec<DailyRealizedReturn> = daily_by_key
499 .into_iter()
500 .map(|((symbol, date), b)| DailyRealizedReturn {
501 symbol,
502 date,
503 realized_return_pct: if b.basis.abs() > f64::EPSILON {
504 (b.pnl / b.basis) * 100.0
505 } else {
506 0.0
507 },
508 })
509 .collect();
510
511 out.sort_by(|a, b| b.date.cmp(&a.date).then_with(|| a.symbol.cmp(&b.symbol)));
512 if out.len() > limit {
513 out.truncate(limit);
514 }
515 Ok(out)
516}
517
518pub fn load_daily_realized_returns(limit: usize) -> Result<Vec<DailyRealizedReturn>> {
519 load_realized_returns_by_bucket(HistoryBucket::Day, limit)
520}
521
/// Splits a trading symbol into `(base_asset, quote_asset)`.
///
/// Known quote suffixes are tried in a fixed order; the first one that the
/// symbol ends with, leaving a non-empty base, wins. When none matches, the
/// whole symbol is returned as the base and the quote is empty.
fn split_symbol_assets(symbol: &str) -> (String, String) {
    const QUOTE_SUFFIXES: [&str; 10] = [
        "USDT", "USDC", "FDUSD", "BUSD", "TUSD", "TRY", "EUR", "BTC", "ETH", "BNB",
    ];

    QUOTE_SUFFIXES
        .iter()
        .find_map(|&quote| {
            symbol
                .strip_suffix(quote)
                .filter(|base| !base.is_empty())
                .map(|base| (base.to_string(), quote.to_string()))
        })
        .unwrap_or_else(|| (symbol.to_string(), String::new()))
}