1use anyhow::Result;
2use chrono::TimeZone;
3use rusqlite::{params, Connection};
4use std::collections::HashMap;
5
6use crate::binance::types::{BinanceAllOrder, BinanceMyTrade};
7
8#[derive(Debug, Clone)]
9pub struct PersistedTrade {
10 pub trade: BinanceMyTrade,
11 pub source: String,
12}
13
14#[derive(Debug, Clone)]
15pub struct DailyRealizedReturn {
16 pub symbol: String,
17 pub date: String,
18 pub realized_return_pct: f64,
19}
20
21fn ensure_trade_schema(conn: &Connection) -> Result<()> {
22 for ddl in [
23 "ALTER TABLE order_history_trades ADD COLUMN commission REAL NOT NULL DEFAULT 0.0",
24 "ALTER TABLE order_history_trades ADD COLUMN commission_asset TEXT NOT NULL DEFAULT ''",
25 "ALTER TABLE order_history_trades ADD COLUMN realized_pnl REAL NOT NULL DEFAULT 0.0",
26 ] {
27 if let Err(e) = conn.execute(ddl, []) {
28 let msg = e.to_string();
29 if !msg.contains("duplicate column name") {
30 return Err(e.into());
31 }
32 }
33 }
34 Ok(())
35}
36
37#[derive(Debug, Clone, Copy, PartialEq, Eq)]
38pub enum HistoryBucket {
39 Day,
40 Hour,
41 Month,
42}
43
44pub fn persist_order_snapshot(
45 symbol: &str,
46 orders: &[BinanceAllOrder],
47 trades: &[BinanceMyTrade],
48) -> Result<()> {
49 std::fs::create_dir_all("data")?;
50 let mut conn = Connection::open("data/order_history.sqlite")?;
51 conn.execute_batch(
52 r#"
53 CREATE TABLE IF NOT EXISTS order_history_orders (
54 symbol TEXT NOT NULL,
55 order_id INTEGER NOT NULL,
56 client_order_id TEXT NOT NULL,
57 status TEXT NOT NULL,
58 side TEXT NOT NULL,
59 orig_qty REAL NOT NULL,
60 executed_qty REAL NOT NULL,
61 avg_price REAL NOT NULL,
62 event_time_ms INTEGER NOT NULL,
63 source TEXT NOT NULL,
64 updated_at_ms INTEGER NOT NULL,
65 PRIMARY KEY(symbol, order_id)
66 );
67
68 CREATE TABLE IF NOT EXISTS order_history_trades (
69 symbol TEXT NOT NULL,
70 trade_id INTEGER NOT NULL,
71 order_id INTEGER NOT NULL,
72 side TEXT NOT NULL,
73 qty REAL NOT NULL,
74 price REAL NOT NULL,
75 commission REAL NOT NULL DEFAULT 0.0,
76 commission_asset TEXT NOT NULL DEFAULT '',
77 event_time_ms INTEGER NOT NULL,
78 realized_pnl REAL NOT NULL DEFAULT 0.0,
79 source TEXT NOT NULL,
80 updated_at_ms INTEGER NOT NULL,
81 PRIMARY KEY(symbol, trade_id)
82 );
83 "#,
84 )?;
85 ensure_trade_schema(&conn)?;
86
87 let now_ms = chrono::Utc::now().timestamp_millis();
88 let tx = conn.transaction()?;
89 let mut source_by_order_id = std::collections::HashMap::new();
90
91 for o in orders {
92 let avg_price = if o.executed_qty > 0.0 {
93 o.cummulative_quote_qty / o.executed_qty
94 } else {
95 o.price
96 };
97 tx.execute(
98 r#"
99 INSERT INTO order_history_orders (
100 symbol, order_id, client_order_id, status, side,
101 orig_qty, executed_qty, avg_price, event_time_ms, source, updated_at_ms
102 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)
103 ON CONFLICT(symbol, order_id) DO UPDATE SET
104 client_order_id = excluded.client_order_id,
105 status = excluded.status,
106 side = excluded.side,
107 orig_qty = excluded.orig_qty,
108 executed_qty = excluded.executed_qty,
109 avg_price = excluded.avg_price,
110 event_time_ms = excluded.event_time_ms,
111 source = excluded.source,
112 updated_at_ms = excluded.updated_at_ms
113 "#,
114 params![
115 symbol,
116 o.order_id as i64,
117 o.client_order_id,
118 o.status,
119 o.side,
120 o.orig_qty,
121 o.executed_qty,
122 avg_price,
123 o.update_time.max(o.time) as i64,
124 source_label_from_client_order_id(&o.client_order_id),
125 now_ms,
126 ],
127 )?;
128 source_by_order_id.insert(
129 o.order_id,
130 source_label_from_client_order_id(&o.client_order_id).to_string(),
131 );
132 }
133
134 for t in trades {
135 tx.execute(
136 r#"
137 INSERT INTO order_history_trades (
138 symbol, trade_id, order_id, side, qty, price, commission, commission_asset, event_time_ms, realized_pnl, source, updated_at_ms
139 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)
140 ON CONFLICT(symbol, trade_id) DO UPDATE SET
141 order_id = excluded.order_id,
142 side = excluded.side,
143 qty = excluded.qty,
144 price = excluded.price,
145 commission = excluded.commission,
146 commission_asset = excluded.commission_asset,
147 event_time_ms = excluded.event_time_ms,
148 realized_pnl = excluded.realized_pnl,
149 source = excluded.source,
150 updated_at_ms = excluded.updated_at_ms
151 "#,
152 params![
153 symbol,
154 t.id as i64,
155 t.order_id as i64,
156 if t.is_buyer { "BUY" } else { "SELL" },
157 t.qty,
158 t.price,
159 t.commission,
160 t.commission_asset,
161 t.time as i64,
162 t.realized_pnl,
163 source_by_order_id
164 .get(&t.order_id)
165 .map(String::as_str)
166 .unwrap_or("UNKNOWN"),
167 now_ms,
168 ],
169 )?;
170 }
171
172 tx.commit()?;
173 Ok(())
174}
175
176fn source_label_from_client_order_id(client_order_id: &str) -> &'static str {
177 if client_order_id.contains("-mnl-") {
178 "MANUAL"
179 } else if client_order_id.contains("-cfg-") {
180 "MA(Config)"
181 } else if client_order_id.contains("-fst-") {
182 "MA(Fast 5/20)"
183 } else if client_order_id.contains("-slw-") {
184 "MA(Slow 20/60)"
185 } else {
186 "UNKNOWN"
187 }
188}
189
190pub fn load_persisted_trades(symbol: &str) -> Result<Vec<PersistedTrade>> {
191 std::fs::create_dir_all("data")?;
192 let conn = Connection::open("data/order_history.sqlite")?;
193 ensure_trade_schema(&conn)?;
194 let mut stmt = conn.prepare(
195 r#"
196 SELECT trade_id, order_id, side, qty, price, commission, commission_asset, event_time_ms, realized_pnl, source
197 FROM order_history_trades
198 WHERE symbol = ?1
199 ORDER BY event_time_ms ASC, trade_id ASC
200 "#,
201 )?;
202
203 let rows = stmt.query_map([symbol], |row| {
204 let side: String = row.get(2)?;
205 let is_buyer = side.eq_ignore_ascii_case("BUY");
206 let trade = BinanceMyTrade {
207 symbol: symbol.to_string(),
208 id: row.get::<_, i64>(0)? as u64,
209 order_id: row.get::<_, i64>(1)? as u64,
210 price: row.get(4)?,
211 qty: row.get(3)?,
212 commission: row.get(5)?,
213 commission_asset: row.get(6)?,
214 time: row.get::<_, i64>(7)? as u64,
215 realized_pnl: row.get(8)?,
216 is_buyer,
217 is_maker: false,
218 };
219 Ok(PersistedTrade {
220 trade,
221 source: row.get(9)?,
222 })
223 })?;
224
225 let mut trades = Vec::new();
226 for row in rows {
227 trades.push(row?);
228 }
229 Ok(trades)
230}
231
232pub fn load_last_trade_id(symbol: &str) -> Result<Option<u64>> {
233 std::fs::create_dir_all("data")?;
234 let conn = Connection::open("data/order_history.sqlite")?;
235 let mut stmt = conn.prepare(
236 r#"
237 SELECT MAX(trade_id)
238 FROM order_history_trades
239 WHERE symbol = ?1
240 "#,
241 )?;
242 let max_id = stmt.query_row([symbol], |row| row.get::<_, Option<i64>>(0))?;
243 Ok(max_id.map(|v| v as u64))
244}
245
246pub fn load_trade_count(symbol: &str) -> Result<usize> {
247 std::fs::create_dir_all("data")?;
248 let conn = Connection::open("data/order_history.sqlite")?;
249 let mut stmt = conn.prepare(
250 r#"
251 SELECT COUNT(*)
252 FROM order_history_trades
253 WHERE symbol = ?1
254 "#,
255 )?;
256 let count = stmt.query_row([symbol], |row| row.get::<_, i64>(0))?;
257 Ok(count.max(0) as usize)
258}
259
260#[derive(Clone, Copy, Default)]
261struct LongPos {
262 qty: f64,
263 cost_quote: f64,
264}
265
266#[derive(Clone, Copy, Default)]
267struct DailyBucket {
268 pnl: f64,
269 basis: f64,
270}
271
272pub fn load_realized_returns_by_bucket(
273 bucket: HistoryBucket,
274 limit: usize,
275) -> Result<Vec<DailyRealizedReturn>> {
276 std::fs::create_dir_all("data")?;
277 let conn = Connection::open("data/order_history.sqlite")?;
278 ensure_trade_schema(&conn)?;
279 let mut stmt = conn.prepare(
280 r#"
281 SELECT symbol, trade_id, order_id, side, qty, price, commission, commission_asset, event_time_ms, realized_pnl
282 FROM order_history_trades
283 ORDER BY symbol ASC, event_time_ms ASC, trade_id ASC
284 "#,
285 )?;
286
287 let rows = stmt.query_map([], |row| {
288 Ok((
289 row.get::<_, String>(0)?,
290 row.get::<_, i64>(1)? as u64,
291 row.get::<_, i64>(2)? as u64,
292 row.get::<_, String>(3)?,
293 row.get::<_, f64>(4)?,
294 row.get::<_, f64>(5)?,
295 row.get::<_, f64>(6)?,
296 row.get::<_, String>(7)?,
297 row.get::<_, i64>(8)? as u64,
298 row.get::<_, f64>(9)?,
299 ))
300 })?;
301
302 let mut pos_by_symbol: HashMap<String, LongPos> = HashMap::new();
303 let mut daily_by_key: HashMap<(String, String), DailyBucket> = HashMap::new();
304
305 for row in rows {
306 let (
307 symbol,
308 _trade_id,
309 _order_id,
310 side,
311 qty_raw,
312 price,
313 commission,
314 commission_asset,
315 event_time_ms,
316 realized_pnl,
317 ) = row?;
318 let qty = qty_raw.max(0.0);
319 if qty <= f64::EPSILON {
320 continue;
321 }
322
323 let (base_asset, quote_asset) = split_symbol_assets(&symbol);
324 let fee_is_base =
325 !base_asset.is_empty() && commission_asset.eq_ignore_ascii_case(&base_asset);
326 let fee_is_quote =
327 !quote_asset.is_empty() && commission_asset.eq_ignore_ascii_case("e_asset);
328 let pos = pos_by_symbol.entry(symbol.clone()).or_default();
329
330 let date = chrono::Utc
331 .timestamp_millis_opt(event_time_ms as i64)
332 .single()
333 .map(|dt| dt.with_timezone(&chrono::Local))
334 .map(|dt| match bucket {
335 HistoryBucket::Day => dt.format("%Y-%m-%d").to_string(),
336 HistoryBucket::Hour => dt.format("%Y-%m-%d %H:00").to_string(),
337 HistoryBucket::Month => dt.format("%Y-%m").to_string(),
338 })
339 .unwrap_or_else(|| "unknown".to_string());
340
341 if symbol.ends_with("#FUT") {
343 let basis = (qty * price).abs();
344 let bucket = daily_by_key.entry((symbol.clone(), date)).or_default();
345 bucket.pnl += realized_pnl;
346 bucket.basis += basis;
347 continue;
348 }
349
350 if side.eq_ignore_ascii_case("BUY") {
351 let net_qty = (qty
352 - if fee_is_base {
353 commission.max(0.0)
354 } else {
355 0.0
356 })
357 .max(0.0);
358 if net_qty <= f64::EPSILON {
359 continue;
360 }
361 let fee_quote = if fee_is_quote {
362 commission.max(0.0)
363 } else {
364 0.0
365 };
366 pos.qty += net_qty;
367 pos.cost_quote += qty * price + fee_quote;
368 continue;
369 }
370
371 if pos.qty <= f64::EPSILON {
372 continue;
373 }
374 let close_qty = qty.min(pos.qty);
375 if close_qty <= f64::EPSILON {
376 continue;
377 }
378 let avg_cost = pos.cost_quote / pos.qty.max(f64::EPSILON);
379 let fee_quote_total = if fee_is_quote {
380 commission.max(0.0)
381 } else if fee_is_base {
382 commission.max(0.0) * price
383 } else {
384 0.0
385 };
386 let fee_quote = fee_quote_total * (close_qty / qty.max(f64::EPSILON));
387 let realized_pnl = (close_qty * price - fee_quote) - (avg_cost * close_qty);
388 let realized_basis = avg_cost * close_qty;
389
390 let bucket = daily_by_key.entry((symbol.clone(), date)).or_default();
391 bucket.pnl += realized_pnl;
392 bucket.basis += realized_basis;
393
394 pos.qty -= close_qty;
395 pos.cost_quote -= realized_basis;
396 if pos.qty <= f64::EPSILON {
397 pos.qty = 0.0;
398 pos.cost_quote = 0.0;
399 }
400 }
401
402 let mut out: Vec<DailyRealizedReturn> = daily_by_key
403 .into_iter()
404 .map(|((symbol, date), b)| DailyRealizedReturn {
405 symbol,
406 date,
407 realized_return_pct: if b.basis.abs() > f64::EPSILON {
408 (b.pnl / b.basis) * 100.0
409 } else {
410 0.0
411 },
412 })
413 .collect();
414
415 out.sort_by(|a, b| b.date.cmp(&a.date).then_with(|| a.symbol.cmp(&b.symbol)));
416 if out.len() > limit {
417 out.truncate(limit);
418 }
419 Ok(out)
420}
421
422pub fn load_daily_realized_returns(limit: usize) -> Result<Vec<DailyRealizedReturn>> {
423 load_realized_returns_by_bucket(HistoryBucket::Day, limit)
424}
425
426fn split_symbol_assets(symbol: &str) -> (String, String) {
427 const QUOTE_SUFFIXES: [&str; 10] = [
428 "USDT", "USDC", "FDUSD", "BUSD", "TUSD", "TRY", "EUR", "BTC", "ETH", "BNB",
429 ];
430 for q in QUOTE_SUFFIXES {
431 if let Some(base) = symbol.strip_suffix(q) {
432 if !base.is_empty() {
433 return (base.to_string(), q.to_string());
434 }
435 }
436 }
437 (symbol.to_string(), String::new())
438}