1use anyhow::Result;
2use chrono::TimeZone;
3use rusqlite::{params, Connection};
4use std::collections::HashMap;
5
6use crate::binance::types::{BinanceAllOrder, BinanceMyTrade};
7
/// A trade read back from the `order_history_trades` table together with the
/// source label stored in the same row.
#[derive(Debug, Clone)]
pub struct PersistedTrade {
    /// Trade fields rebuilt from the stored row.
    pub trade: BinanceMyTrade,
    /// Value of the row's `source` column.
    pub source: String,
}
13
/// Realized return of one symbol within one time bucket.
///
/// Despite the name, the bucket is not necessarily a day: the same type is
/// returned by `load_realized_returns_by_bucket` for every `HistoryBucket`.
#[derive(Debug, Clone)]
pub struct DailyRealizedReturn {
    /// Symbol as stored in the trade history.
    pub symbol: String,
    /// Bucket label (for example `%Y-%m-%d` for day buckets), or `"unknown"`
    /// when the trade timestamp could not be converted.
    pub date: String,
    /// Accumulated PnL divided by accumulated basis, times 100; `0.0` when the
    /// basis is effectively zero.
    pub realized_return_pct: f64,
}
20
/// Statistics kept per `(symbol, source)` pair in the `strategy_symbol_stats`
/// table.
#[derive(Debug, Clone)]
pub struct StrategyScopedStats {
    /// Persisted as the `trade_count` column.
    pub trade_count: u32,
    /// Persisted as the `win_count` column.
    pub win_count: u32,
    /// Persisted as the `lose_count` column.
    pub lose_count: u32,
    /// Persisted as the `realized_pnl` column.
    pub realized_pnl: f64,
}
28
29fn ensure_trade_schema(conn: &Connection) -> Result<()> {
30 for ddl in [
31 "ALTER TABLE order_history_trades ADD COLUMN commission REAL NOT NULL DEFAULT 0.0",
32 "ALTER TABLE order_history_trades ADD COLUMN commission_asset TEXT NOT NULL DEFAULT ''",
33 "ALTER TABLE order_history_trades ADD COLUMN realized_pnl REAL NOT NULL DEFAULT 0.0",
34 ] {
35 if let Err(e) = conn.execute(ddl, []) {
36 let msg = e.to_string();
37 if !msg.contains("duplicate column name") {
38 return Err(e.into());
39 }
40 }
41 }
42 Ok(())
43}
44
45fn ensure_strategy_stats_schema(conn: &Connection) -> Result<()> {
46 conn.execute_batch(
47 r#"
48 CREATE TABLE IF NOT EXISTS strategy_symbol_stats (
49 symbol TEXT NOT NULL,
50 source TEXT NOT NULL,
51 trade_count INTEGER NOT NULL,
52 win_count INTEGER NOT NULL,
53 lose_count INTEGER NOT NULL,
54 realized_pnl REAL NOT NULL,
55 updated_at_ms INTEGER NOT NULL,
56 PRIMARY KEY(symbol, source)
57 );
58 "#,
59 )?;
60 Ok(())
61}
62
63pub fn persist_strategy_symbol_stats(
64 symbol: &str,
65 stats: &HashMap<String, StrategyScopedStats>,
66) -> Result<()> {
67 std::fs::create_dir_all("data")?;
68 let mut conn = Connection::open("data/strategy_stats.sqlite")?;
69 ensure_strategy_stats_schema(&conn)?;
70 let tx = conn.transaction()?;
71 let now_ms = chrono::Utc::now().timestamp_millis();
72
73 for (source, row) in stats {
74 tx.execute(
75 r#"
76 INSERT INTO strategy_symbol_stats (
77 symbol, source, trade_count, win_count, lose_count, realized_pnl, updated_at_ms
78 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)
79 ON CONFLICT(symbol, source) DO UPDATE SET
80 trade_count = excluded.trade_count,
81 win_count = excluded.win_count,
82 lose_count = excluded.lose_count,
83 realized_pnl = excluded.realized_pnl,
84 updated_at_ms = excluded.updated_at_ms
85 "#,
86 params![
87 symbol,
88 source,
89 row.trade_count as i64,
90 row.win_count as i64,
91 row.lose_count as i64,
92 row.realized_pnl,
93 now_ms,
94 ],
95 )?;
96 }
97 tx.commit()?;
98 Ok(())
99}
100
101pub fn load_strategy_symbol_stats(symbol: &str) -> Result<HashMap<String, StrategyScopedStats>> {
102 std::fs::create_dir_all("data")?;
103 let conn = Connection::open("data/strategy_stats.sqlite")?;
104 ensure_strategy_stats_schema(&conn)?;
105 let mut stmt = conn.prepare(
106 r#"
107 SELECT source, trade_count, win_count, lose_count, realized_pnl
108 FROM strategy_symbol_stats
109 WHERE symbol = ?1
110 "#,
111 )?;
112
113 let rows = stmt.query_map([symbol], |row| {
114 Ok((
115 row.get::<_, String>(0)?,
116 StrategyScopedStats {
117 trade_count: row.get::<_, i64>(1)?.max(0) as u32,
118 win_count: row.get::<_, i64>(2)?.max(0) as u32,
119 lose_count: row.get::<_, i64>(3)?.max(0) as u32,
120 realized_pnl: row.get::<_, f64>(4)?,
121 },
122 ))
123 })?;
124
125 let mut out = HashMap::new();
126 for row in rows {
127 let (source, stats) = row?;
128 out.insert(source, stats);
129 }
130 Ok(out)
131}
132
/// Time granularity used by `load_realized_returns_by_bucket` to group trades.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HistoryBucket {
    /// One bucket per local calendar day, labelled `%Y-%m-%d`.
    Day,
    /// One bucket per local hour, labelled `%Y-%m-%d %H:00`.
    Hour,
    /// One bucket per local calendar month, labelled `%Y-%m`.
    Month,
}
139
140pub fn persist_order_snapshot(
141 symbol: &str,
142 orders: &[BinanceAllOrder],
143 trades: &[BinanceMyTrade],
144) -> Result<()> {
145 std::fs::create_dir_all("data")?;
146 let mut conn = Connection::open("data/order_history.sqlite")?;
147 conn.execute_batch(
148 r#"
149 CREATE TABLE IF NOT EXISTS order_history_orders (
150 symbol TEXT NOT NULL,
151 order_id INTEGER NOT NULL,
152 client_order_id TEXT NOT NULL,
153 status TEXT NOT NULL,
154 side TEXT NOT NULL,
155 orig_qty REAL NOT NULL,
156 executed_qty REAL NOT NULL,
157 avg_price REAL NOT NULL,
158 event_time_ms INTEGER NOT NULL,
159 source TEXT NOT NULL,
160 updated_at_ms INTEGER NOT NULL,
161 PRIMARY KEY(symbol, order_id)
162 );
163
164 CREATE TABLE IF NOT EXISTS order_history_trades (
165 symbol TEXT NOT NULL,
166 trade_id INTEGER NOT NULL,
167 order_id INTEGER NOT NULL,
168 side TEXT NOT NULL,
169 qty REAL NOT NULL,
170 price REAL NOT NULL,
171 commission REAL NOT NULL DEFAULT 0.0,
172 commission_asset TEXT NOT NULL DEFAULT '',
173 event_time_ms INTEGER NOT NULL,
174 realized_pnl REAL NOT NULL DEFAULT 0.0,
175 source TEXT NOT NULL,
176 updated_at_ms INTEGER NOT NULL,
177 PRIMARY KEY(symbol, trade_id)
178 );
179 "#,
180 )?;
181 ensure_trade_schema(&conn)?;
182
183 let now_ms = chrono::Utc::now().timestamp_millis();
184 let tx = conn.transaction()?;
185 let mut source_by_order_id = std::collections::HashMap::new();
186
187 for o in orders {
188 let avg_price = if o.executed_qty > 0.0 {
189 o.cummulative_quote_qty / o.executed_qty
190 } else {
191 o.price
192 };
193 tx.execute(
194 r#"
195 INSERT INTO order_history_orders (
196 symbol, order_id, client_order_id, status, side,
197 orig_qty, executed_qty, avg_price, event_time_ms, source, updated_at_ms
198 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)
199 ON CONFLICT(symbol, order_id) DO UPDATE SET
200 client_order_id = excluded.client_order_id,
201 status = excluded.status,
202 side = excluded.side,
203 orig_qty = excluded.orig_qty,
204 executed_qty = excluded.executed_qty,
205 avg_price = excluded.avg_price,
206 event_time_ms = excluded.event_time_ms,
207 source = excluded.source,
208 updated_at_ms = excluded.updated_at_ms
209 "#,
210 params![
211 symbol,
212 o.order_id as i64,
213 o.client_order_id,
214 o.status,
215 o.side,
216 o.orig_qty,
217 o.executed_qty,
218 avg_price,
219 o.update_time.max(o.time) as i64,
220 source_label_from_client_order_id(&o.client_order_id),
221 now_ms,
222 ],
223 )?;
224 source_by_order_id.insert(
225 o.order_id,
226 source_label_from_client_order_id(&o.client_order_id),
227 );
228 }
229
230 for t in trades {
231 tx.execute(
232 r#"
233 INSERT INTO order_history_trades (
234 symbol, trade_id, order_id, side, qty, price, commission, commission_asset, event_time_ms, realized_pnl, source, updated_at_ms
235 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)
236 ON CONFLICT(symbol, trade_id) DO UPDATE SET
237 order_id = excluded.order_id,
238 side = excluded.side,
239 qty = excluded.qty,
240 price = excluded.price,
241 commission = excluded.commission,
242 commission_asset = excluded.commission_asset,
243 event_time_ms = excluded.event_time_ms,
244 realized_pnl = excluded.realized_pnl,
245 source = excluded.source,
246 updated_at_ms = excluded.updated_at_ms
247 "#,
248 params![
249 symbol,
250 t.id as i64,
251 t.order_id as i64,
252 if t.is_buyer { "BUY" } else { "SELL" },
253 t.qty,
254 t.price,
255 t.commission,
256 t.commission_asset,
257 t.time as i64,
258 t.realized_pnl,
259 source_by_order_id
260 .get(&t.order_id)
261 .map(String::as_str)
262 .unwrap_or("UNKNOWN"),
263 now_ms,
264 ],
265 )?;
266 }
267
268 tx.commit()?;
269 Ok(())
270}
271
272fn source_label_from_client_order_id(client_order_id: &str) -> String {
273 if client_order_id.contains("-mnl-") {
274 "MANUAL".to_string()
275 } else if client_order_id.contains("-cfg-") {
276 "MA(Config)".to_string()
277 } else if client_order_id.contains("-fst-") {
278 "MA(Fast 5/20)".to_string()
279 } else if client_order_id.contains("-slw-") {
280 "MA(Slow 20/60)".to_string()
281 } else if let Some(source_tag) = parse_source_tag_from_client_order_id(client_order_id) {
282 source_tag.to_ascii_lowercase()
283 } else {
284 "UNKNOWN".to_string()
285 }
286}
287
/// Extracts the source tag from a client order id shaped like
/// `sq-<tag>-<rest>`.
///
/// Returns `None` when the id does not start with `sq-`, when no `-` follows
/// the tag, or when the tag is empty.
fn parse_source_tag_from_client_order_id(client_order_id: &str) -> Option<&str> {
    client_order_id
        .strip_prefix("sq-")?
        .split_once('-')
        .map(|(tag, _rest)| tag)
        .filter(|tag| !tag.is_empty())
}
297
298pub fn load_persisted_trades(symbol: &str) -> Result<Vec<PersistedTrade>> {
299 std::fs::create_dir_all("data")?;
300 let conn = Connection::open("data/order_history.sqlite")?;
301 ensure_trade_schema(&conn)?;
302 let mut stmt = conn.prepare(
303 r#"
304 SELECT trade_id, order_id, side, qty, price, commission, commission_asset, event_time_ms, realized_pnl, source
305 FROM order_history_trades
306 WHERE symbol = ?1
307 ORDER BY event_time_ms ASC, trade_id ASC
308 "#,
309 )?;
310
311 let rows = stmt.query_map([symbol], |row| {
312 let side: String = row.get(2)?;
313 let is_buyer = side.eq_ignore_ascii_case("BUY");
314 let trade = BinanceMyTrade {
315 symbol: symbol.to_string(),
316 id: row.get::<_, i64>(0)? as u64,
317 order_id: row.get::<_, i64>(1)? as u64,
318 price: row.get(4)?,
319 qty: row.get(3)?,
320 commission: row.get(5)?,
321 commission_asset: row.get(6)?,
322 time: row.get::<_, i64>(7)? as u64,
323 realized_pnl: row.get(8)?,
324 is_buyer,
325 is_maker: false,
326 };
327 Ok(PersistedTrade {
328 trade,
329 source: row.get(9)?,
330 })
331 })?;
332
333 let mut trades = Vec::new();
334 for row in rows {
335 trades.push(row?);
336 }
337 Ok(trades)
338}
339
340pub fn load_last_trade_id(symbol: &str) -> Result<Option<u64>> {
341 std::fs::create_dir_all("data")?;
342 let conn = Connection::open("data/order_history.sqlite")?;
343 let mut stmt = conn.prepare(
344 r#"
345 SELECT MAX(trade_id)
346 FROM order_history_trades
347 WHERE symbol = ?1
348 "#,
349 )?;
350 let max_id = stmt.query_row([symbol], |row| row.get::<_, Option<i64>>(0))?;
351 Ok(max_id.map(|v| v as u64))
352}
353
354pub fn load_trade_count(symbol: &str) -> Result<usize> {
355 std::fs::create_dir_all("data")?;
356 let conn = Connection::open("data/order_history.sqlite")?;
357 let mut stmt = conn.prepare(
358 r#"
359 SELECT COUNT(*)
360 FROM order_history_trades
361 WHERE symbol = ?1
362 "#,
363 )?;
364 let count = stmt.query_row([symbol], |row| row.get::<_, i64>(0))?;
365 Ok(count.max(0) as usize)
366}
367
/// Running long position of one symbol while stored trades are replayed in
/// `load_realized_returns_by_bucket`.
#[derive(Clone, Copy, Default)]
struct LongPos {
    /// Quantity currently held; buys add to it (net of fees charged in the
    /// base asset) and sells reduce it.
    qty: f64,
    /// Quote-asset cost of the held quantity, including buy fees charged in
    /// the quote asset.
    cost_quote: f64,
}
373
/// Totals accumulated for one `(symbol, bucket label)` pair in
/// `load_realized_returns_by_bucket`.
#[derive(Clone, Copy, Default)]
struct DailyBucket {
    /// Sum of realized PnL.
    pnl: f64,
    /// Sum of the basis the PnL is measured against: cost basis of the closed
    /// quantity, or `|qty * price|` for symbols ending in `#FUT`.
    basis: f64,
}
379
380pub fn load_realized_returns_by_bucket(
381 bucket: HistoryBucket,
382 limit: usize,
383) -> Result<Vec<DailyRealizedReturn>> {
384 std::fs::create_dir_all("data")?;
385 let conn = Connection::open("data/order_history.sqlite")?;
386 ensure_trade_schema(&conn)?;
387 let mut stmt = conn.prepare(
388 r#"
389 SELECT symbol, trade_id, order_id, side, qty, price, commission, commission_asset, event_time_ms, realized_pnl
390 FROM order_history_trades
391 ORDER BY symbol ASC, event_time_ms ASC, trade_id ASC
392 "#,
393 )?;
394
395 let rows = stmt.query_map([], |row| {
396 Ok((
397 row.get::<_, String>(0)?,
398 row.get::<_, i64>(1)? as u64,
399 row.get::<_, i64>(2)? as u64,
400 row.get::<_, String>(3)?,
401 row.get::<_, f64>(4)?,
402 row.get::<_, f64>(5)?,
403 row.get::<_, f64>(6)?,
404 row.get::<_, String>(7)?,
405 row.get::<_, i64>(8)? as u64,
406 row.get::<_, f64>(9)?,
407 ))
408 })?;
409
410 let mut pos_by_symbol: HashMap<String, LongPos> = HashMap::new();
411 let mut daily_by_key: HashMap<(String, String), DailyBucket> = HashMap::new();
412
413 for row in rows {
414 let (
415 symbol,
416 _trade_id,
417 _order_id,
418 side,
419 qty_raw,
420 price,
421 commission,
422 commission_asset,
423 event_time_ms,
424 realized_pnl,
425 ) = row?;
426 let qty = qty_raw.max(0.0);
427 if qty <= f64::EPSILON {
428 continue;
429 }
430
431 let (base_asset, quote_asset) = split_symbol_assets(&symbol);
432 let fee_is_base =
433 !base_asset.is_empty() && commission_asset.eq_ignore_ascii_case(&base_asset);
434 let fee_is_quote =
435 !quote_asset.is_empty() && commission_asset.eq_ignore_ascii_case("e_asset);
436 let pos = pos_by_symbol.entry(symbol.clone()).or_default();
437
438 let date = chrono::Utc
439 .timestamp_millis_opt(event_time_ms as i64)
440 .single()
441 .map(|dt| dt.with_timezone(&chrono::Local))
442 .map(|dt| match bucket {
443 HistoryBucket::Day => dt.format("%Y-%m-%d").to_string(),
444 HistoryBucket::Hour => dt.format("%Y-%m-%d %H:00").to_string(),
445 HistoryBucket::Month => dt.format("%Y-%m").to_string(),
446 })
447 .unwrap_or_else(|| "unknown".to_string());
448
449 if symbol.ends_with("#FUT") {
451 let basis = (qty * price).abs();
452 let bucket = daily_by_key.entry((symbol.clone(), date)).or_default();
453 bucket.pnl += realized_pnl;
454 bucket.basis += basis;
455 continue;
456 }
457
458 if side.eq_ignore_ascii_case("BUY") {
459 let net_qty = (qty
460 - if fee_is_base {
461 commission.max(0.0)
462 } else {
463 0.0
464 })
465 .max(0.0);
466 if net_qty <= f64::EPSILON {
467 continue;
468 }
469 let fee_quote = if fee_is_quote {
470 commission.max(0.0)
471 } else {
472 0.0
473 };
474 pos.qty += net_qty;
475 pos.cost_quote += qty * price + fee_quote;
476 continue;
477 }
478
479 if pos.qty <= f64::EPSILON {
480 continue;
481 }
482 let close_qty = qty.min(pos.qty);
483 if close_qty <= f64::EPSILON {
484 continue;
485 }
486 let avg_cost = pos.cost_quote / pos.qty.max(f64::EPSILON);
487 let fee_quote_total = if fee_is_quote {
488 commission.max(0.0)
489 } else if fee_is_base {
490 commission.max(0.0) * price
491 } else {
492 0.0
493 };
494 let fee_quote = fee_quote_total * (close_qty / qty.max(f64::EPSILON));
495 let realized_pnl = (close_qty * price - fee_quote) - (avg_cost * close_qty);
496 let realized_basis = avg_cost * close_qty;
497
498 let bucket = daily_by_key.entry((symbol.clone(), date)).or_default();
499 bucket.pnl += realized_pnl;
500 bucket.basis += realized_basis;
501
502 pos.qty -= close_qty;
503 pos.cost_quote -= realized_basis;
504 if pos.qty <= f64::EPSILON {
505 pos.qty = 0.0;
506 pos.cost_quote = 0.0;
507 }
508 }
509
510 let mut out: Vec<DailyRealizedReturn> = daily_by_key
511 .into_iter()
512 .map(|((symbol, date), b)| DailyRealizedReturn {
513 symbol,
514 date,
515 realized_return_pct: if b.basis.abs() > f64::EPSILON {
516 (b.pnl / b.basis) * 100.0
517 } else {
518 0.0
519 },
520 })
521 .collect();
522
523 out.sort_by(|a, b| b.date.cmp(&a.date).then_with(|| a.symbol.cmp(&b.symbol)));
524 if out.len() > limit {
525 out.truncate(limit);
526 }
527 Ok(out)
528}
529
530pub fn load_daily_realized_returns(limit: usize) -> Result<Vec<DailyRealizedReturn>> {
531 load_realized_returns_by_bucket(HistoryBucket::Day, limit)
532}
533
/// Splits a symbol such as `BTCUSDT` into `(base, quote)` by matching a known
/// quote-asset suffix.
///
/// Suffixes are tried in a fixed order and the first one that leaves a
/// non-empty base wins. When none matches, the whole symbol is returned as
/// the base together with an empty quote.
fn split_symbol_assets(symbol: &str) -> (String, String) {
    const QUOTE_SUFFIXES: [&str; 10] = [
        "USDT", "USDC", "FDUSD", "BUSD", "TUSD", "TRY", "EUR", "BTC", "ETH", "BNB",
    ];

    QUOTE_SUFFIXES
        .iter()
        .find_map(|&quote| {
            symbol
                .strip_suffix(quote)
                .filter(|base| !base.is_empty())
                .map(|base| (base.to_string(), quote.to_string()))
        })
        .unwrap_or_else(|| (symbol.to_string(), String::new()))
}