1use std::path::Path;
2
3use duckdb::{params, AccessMode, Config, Connection};
4
5use crate::app::bootstrap::BinanceMode;
6use crate::backtest_app::runner::{BacktestExitReason, BacktestReport, BacktestTrade};
7use crate::dataset::types::{
8 BacktestDatasetSummary, BacktestRunSummaryRow, BookTickerRow, DerivedKlineRow,
9 LiquidationEventRow, RecorderMetrics,
10};
11use crate::error::storage_error::StorageError;
12use crate::strategy::model::StrategyTemplate;
13
14fn open_dataset_connection_read_only(db_path: &Path) -> Result<Connection, StorageError> {
15 let config = Config::default()
16 .access_mode(AccessMode::ReadOnly)
17 .map_err(storage_err)?;
18 Connection::open_with_flags(db_path, config).map_err(|error| StorageError::DatabaseInitFailed {
19 path: db_path.display().to_string(),
20 message: error.to_string(),
21 })
22}
23
24fn open_dataset_connection_read_write(db_path: &Path) -> Result<Connection, StorageError> {
25 let config = Config::default()
26 .access_mode(AccessMode::ReadWrite)
27 .map_err(storage_err)?;
28 Connection::open_with_flags(db_path, config).map_err(|error| StorageError::DatabaseInitFailed {
29 path: db_path.display().to_string(),
30 message: error.to_string(),
31 })
32}
33
34pub fn metrics_for_path(db_path: &Path) -> Result<RecorderMetrics, StorageError> {
35 if !db_path.exists() {
36 return Ok(RecorderMetrics::default());
37 }
38 let connection = open_dataset_connection_read_only(db_path)?;
39
40 Ok(RecorderMetrics {
41 liquidation_events: query_count(&connection, "raw_liquidation_events")?,
42 book_ticker_events: query_count(&connection, "raw_book_ticker")?,
43 agg_trade_events: query_count(&connection, "raw_agg_trades")?,
44 derived_kline_1s_bars: query_count(&connection, "derived_kline_1s")?,
45 schema_version: query_schema_version(&connection)?,
46 last_liquidation_event_time: query_latest_timestamp(
47 &connection,
48 "raw_liquidation_events",
49 "event_time",
50 )?,
51 last_book_ticker_event_time: query_latest_timestamp(
52 &connection,
53 "raw_book_ticker",
54 "event_time",
55 )?,
56 last_agg_trade_event_time: query_latest_timestamp(
57 &connection,
58 "raw_agg_trades",
59 "event_time",
60 )?,
61 top_liquidation_symbols: query_top_symbols(&connection, "raw_liquidation_events")?,
62 top_book_ticker_symbols: query_top_symbols(&connection, "raw_book_ticker")?,
63 top_agg_trade_symbols: query_top_symbols(&connection, "raw_agg_trades")?,
64 })
65}
66
67pub fn backtest_summary_for_path(
68 db_path: &Path,
69 mode: BinanceMode,
70 symbol: &str,
71 from: chrono::NaiveDate,
72 to: chrono::NaiveDate,
73) -> Result<BacktestDatasetSummary, StorageError> {
74 if !db_path.exists() {
75 return Ok(BacktestDatasetSummary {
76 mode,
77 symbol: symbol.to_string(),
78 symbol_found: false,
79 from: from.to_string(),
80 to: to.to_string(),
81 liquidation_events: 0,
82 book_ticker_events: 0,
83 agg_trade_events: 0,
84 derived_kline_1s_bars: 0,
85 });
86 }
87 let connection = open_dataset_connection_read_only(db_path)?;
88 let symbol_found = market_data_symbol_exists(&connection, symbol)?;
89 let from_ts = format!("{from} 00:00:00");
90 let to_ts = format!("{to} 23:59:59");
91 Ok(BacktestDatasetSummary {
92 mode,
93 symbol: symbol.to_string(),
94 symbol_found,
95 from: from.to_string(),
96 to: to.to_string(),
97 liquidation_events: query_count_in_range(
98 &connection,
99 "raw_liquidation_events",
100 "event_time",
101 symbol,
102 &from_ts,
103 &to_ts,
104 )?,
105 book_ticker_events: query_count_in_range(
106 &connection,
107 "raw_book_ticker",
108 "event_time",
109 symbol,
110 &from_ts,
111 &to_ts,
112 )?,
113 agg_trade_events: query_count_in_range(
114 &connection,
115 "raw_agg_trades",
116 "event_time",
117 symbol,
118 &from_ts,
119 &to_ts,
120 )?,
121 derived_kline_1s_bars: query_count_in_range(
122 &connection,
123 "derived_kline_1s",
124 "open_time",
125 symbol,
126 &from_ts,
127 &to_ts,
128 )?,
129 })
130}
131
132pub fn load_liquidation_events_for_path(
133 db_path: &Path,
134 symbol: &str,
135 from: chrono::NaiveDate,
136 to: chrono::NaiveDate,
137) -> Result<Vec<LiquidationEventRow>, StorageError> {
138 if !db_path.exists() {
139 return Ok(Vec::new());
140 }
141 let connection = open_dataset_connection_read_only(db_path)?;
142 let from_ts = format!("{from} 00:00:00");
143 let to_ts = format!("{to} 23:59:59");
144 let mut statement = connection
145 .prepare(
146 "SELECT epoch_ms(event_time), force_side, price, qty, notional
147 FROM raw_liquidation_events
148 WHERE symbol = ? AND event_time >= CAST(? AS TIMESTAMP) AND event_time <= CAST(? AS TIMESTAMP)
149 ORDER BY event_time ASC",
150 )
151 .map_err(|error| StorageError::WriteFailedWithContext {
152 message: error.to_string(),
153 })?;
154 let mut rows = statement
155 .query(params![symbol, from_ts, to_ts])
156 .map_err(|error| StorageError::WriteFailedWithContext {
157 message: error.to_string(),
158 })?;
159 let mut result = Vec::new();
160 while let Some(row) = rows
161 .next()
162 .map_err(|error| StorageError::WriteFailedWithContext {
163 message: error.to_string(),
164 })?
165 {
166 result.push(LiquidationEventRow {
167 event_time_ms: row
168 .get(0)
169 .map_err(|error| StorageError::WriteFailedWithContext {
170 message: error.to_string(),
171 })?,
172 force_side: row
173 .get(1)
174 .map_err(|error| StorageError::WriteFailedWithContext {
175 message: error.to_string(),
176 })?,
177 price: row
178 .get(2)
179 .map_err(|error| StorageError::WriteFailedWithContext {
180 message: error.to_string(),
181 })?,
182 qty: row
183 .get(3)
184 .map_err(|error| StorageError::WriteFailedWithContext {
185 message: error.to_string(),
186 })?,
187 notional: row
188 .get(4)
189 .map_err(|error| StorageError::WriteFailedWithContext {
190 message: error.to_string(),
191 })?,
192 });
193 }
194 Ok(result)
195}
196
197pub fn load_book_ticker_rows_for_path(
198 db_path: &Path,
199 symbol: &str,
200 from: chrono::NaiveDate,
201 to: chrono::NaiveDate,
202) -> Result<Vec<BookTickerRow>, StorageError> {
203 if !db_path.exists() {
204 return Ok(Vec::new());
205 }
206 let connection = open_dataset_connection_read_only(db_path)?;
207 let from_ts = format!("{from} 00:00:00");
208 let to_ts = format!("{to} 23:59:59");
209 let mut statement = connection
210 .prepare(
211 "SELECT epoch_ms(event_time), bid, ask
212 FROM raw_book_ticker
213 WHERE symbol = ? AND event_time >= CAST(? AS TIMESTAMP) AND event_time <= CAST(? AS TIMESTAMP)
214 ORDER BY event_time ASC",
215 )
216 .map_err(|error| StorageError::WriteFailedWithContext {
217 message: error.to_string(),
218 })?;
219 let mut rows = statement
220 .query(params![symbol, from_ts, to_ts])
221 .map_err(|error| StorageError::WriteFailedWithContext {
222 message: error.to_string(),
223 })?;
224 let mut result = Vec::new();
225 while let Some(row) = rows
226 .next()
227 .map_err(|error| StorageError::WriteFailedWithContext {
228 message: error.to_string(),
229 })?
230 {
231 result.push(BookTickerRow {
232 event_time_ms: row
233 .get(0)
234 .map_err(|error| StorageError::WriteFailedWithContext {
235 message: error.to_string(),
236 })?,
237 bid: row
238 .get(1)
239 .map_err(|error| StorageError::WriteFailedWithContext {
240 message: error.to_string(),
241 })?,
242 ask: row
243 .get(2)
244 .map_err(|error| StorageError::WriteFailedWithContext {
245 message: error.to_string(),
246 })?,
247 });
248 }
249 Ok(result)
250}
251
252pub fn load_derived_kline_rows_for_path(
253 db_path: &Path,
254 symbol: &str,
255 from: chrono::NaiveDate,
256 to: chrono::NaiveDate,
257) -> Result<Vec<DerivedKlineRow>, StorageError> {
258 if !db_path.exists() {
259 return Ok(Vec::new());
260 }
261 let connection = open_dataset_connection_read_only(db_path)?;
262 let from_ts = format!("{from} 00:00:00");
263 let to_ts = format!("{to} 23:59:59");
264 let mut statement = connection
265 .prepare(
266 "SELECT epoch_ms(open_time), epoch_ms(close_time), open, high, low, close, volume, quote_volume, trade_count
267 FROM derived_kline_1s
268 WHERE symbol = ? AND open_time >= CAST(? AS TIMESTAMP) AND open_time <= CAST(? AS TIMESTAMP)
269 ORDER BY open_time ASC",
270 )
271 .map_err(|error| StorageError::WriteFailedWithContext {
272 message: error.to_string(),
273 })?;
274 let mut rows = statement
275 .query(params![symbol, from_ts, to_ts])
276 .map_err(|error| StorageError::WriteFailedWithContext {
277 message: error.to_string(),
278 })?;
279 let mut result = Vec::new();
280 while let Some(row) = rows
281 .next()
282 .map_err(|error| StorageError::WriteFailedWithContext {
283 message: error.to_string(),
284 })?
285 {
286 result.push(DerivedKlineRow {
287 open_time_ms: row.get(0).map_err(storage_err)?,
288 close_time_ms: row.get(1).map_err(storage_err)?,
289 open: row.get(2).map_err(storage_err)?,
290 high: row.get(3).map_err(storage_err)?,
291 low: row.get(4).map_err(storage_err)?,
292 close: row.get(5).map_err(storage_err)?,
293 volume: row.get(6).map_err(storage_err)?,
294 quote_volume: row.get(7).map_err(storage_err)?,
295 trade_count: positive_i64_to_u64(row.get::<_, i64>(8).map_err(storage_err)?),
296 });
297 }
298 Ok(result)
299}
300
301pub fn load_raw_kline_rows_for_path(
302 db_path: &Path,
303 symbol: &str,
304 from: chrono::NaiveDate,
305 to: chrono::NaiveDate,
306) -> Result<Option<(String, Vec<DerivedKlineRow>)>, StorageError> {
307 if !db_path.exists() {
308 return Ok(None);
309 }
310 let connection = open_dataset_connection_read_only(db_path)?;
311 let from_ts = format!("{from} 00:00:00");
312 let to_ts = format!("{to} 23:59:59");
313 let interval = preferred_raw_kline_interval(&connection, symbol, &from_ts, &to_ts)?;
314 let Some(interval) = interval else {
315 return Ok(None);
316 };
317 let mut statement = connection
318 .prepare(
319 "SELECT epoch_ms(open_time), epoch_ms(close_time), open, high, low, close, volume, quote_volume, trade_count
320 FROM raw_klines
321 WHERE symbol = ? AND interval = ? AND open_time >= CAST(? AS TIMESTAMP) AND open_time <= CAST(? AS TIMESTAMP)
322 ORDER BY open_time ASC",
323 )
324 .map_err(storage_err)?;
325 let mut rows = statement
326 .query(params![symbol, interval.as_str(), from_ts, to_ts])
327 .map_err(storage_err)?;
328 let mut result = Vec::new();
329 while let Some(row) = rows.next().map_err(storage_err)? {
330 result.push(DerivedKlineRow {
331 open_time_ms: row.get(0).map_err(storage_err)?,
332 close_time_ms: row.get(1).map_err(storage_err)?,
333 open: row.get(2).map_err(storage_err)?,
334 high: row.get(3).map_err(storage_err)?,
335 low: row.get(4).map_err(storage_err)?,
336 close: row.get(5).map_err(storage_err)?,
337 volume: row.get(6).map_err(storage_err)?,
338 quote_volume: row.get(7).map_err(storage_err)?,
339 trade_count: positive_i64_to_u64(row.get::<_, i64>(8).map_err(storage_err)?),
340 });
341 }
342 Ok(Some((interval, result)))
343}
344
345fn preferred_raw_kline_interval(
346 connection: &Connection,
347 symbol: &str,
348 from_ts: &str,
349 to_ts: &str,
350) -> Result<Option<String>, StorageError> {
351 let mut statement = connection
352 .prepare(
353 "SELECT DISTINCT interval
354 FROM raw_klines
355 WHERE symbol = ? AND open_time >= CAST(? AS TIMESTAMP) AND open_time <= CAST(? AS TIMESTAMP)",
356 )
357 .map_err(storage_err)?;
358 let mut rows = statement
359 .query(params![symbol, from_ts, to_ts])
360 .map_err(storage_err)?;
361 let mut intervals = Vec::new();
362 while let Some(row) = rows.next().map_err(storage_err)? {
363 intervals.push(row.get::<_, String>(0).map_err(storage_err)?);
364 }
365 Ok(intervals
366 .into_iter()
367 .min_by_key(|interval| raw_kline_interval_rank(interval)))
368}
369
370fn raw_kline_interval_rank(interval: &str) -> usize {
371 match interval {
372 "1m" => 0,
373 "3m" => 1,
374 "5m" => 2,
375 "15m" => 3,
376 "30m" => 4,
377 "1h" => 5,
378 "4h" => 6,
379 "1d" => 7,
380 "1w" => 8,
381 "1mo" => 9,
382 _ => usize::MAX,
383 }
384}
385
386pub fn load_recorded_symbols_for_path(
387 db_path: &Path,
388 limit: usize,
389) -> Result<Vec<String>, StorageError> {
390 if !db_path.exists() {
391 return Ok(Vec::new());
392 }
393 let connection = open_dataset_connection_read_only(db_path)?;
394 let mut statement = connection
395 .prepare(
396 "SELECT symbol
397 FROM (
398 SELECT symbol FROM raw_liquidation_events
399 UNION
400 SELECT symbol FROM raw_book_ticker
401 UNION
402 SELECT symbol FROM raw_agg_trades
403 UNION
404 SELECT symbol FROM raw_klines
405 UNION
406 SELECT instrument AS symbol FROM backtest_runs
407 )
408 ORDER BY symbol ASC
409 LIMIT ?",
410 )
411 .map_err(storage_err)?;
412 let mut rows = statement
413 .query(params![limit as i64])
414 .map_err(storage_err)?;
415 let mut result = Vec::new();
416 while let Some(row) = rows.next().map_err(storage_err)? {
417 result.push(row.get(0).map_err(storage_err)?);
418 }
419 Ok(result)
420}
421
422pub fn latest_market_data_day_for_path(
423 db_path: &Path,
424 symbol: &str,
425) -> Result<Option<chrono::NaiveDate>, StorageError> {
426 if !db_path.exists() {
427 return Ok(None);
428 }
429 let connection = open_dataset_connection_read_only(db_path)?;
430 let timestamps = [
431 latest_symbol_timestamp(
432 &connection,
433 "raw_book_ticker",
434 "event_time",
435 "symbol",
436 symbol,
437 )?,
438 latest_symbol_timestamp(
439 &connection,
440 "raw_agg_trades",
441 "event_time",
442 "symbol",
443 symbol,
444 )?,
445 latest_symbol_timestamp(
446 &connection,
447 "raw_liquidation_events",
448 "event_time",
449 "symbol",
450 symbol,
451 )?,
452 latest_symbol_timestamp(&connection, "raw_klines", "open_time", "symbol", symbol)?,
453 ];
454 Ok(timestamps
455 .into_iter()
456 .flatten()
457 .max()
458 .map(|value| value.date_naive()))
459}
460
461pub fn persist_backtest_report(
462 db_path: &Path,
463 report: &BacktestReport,
464) -> Result<i64, StorageError> {
465 let connection = open_dataset_connection_read_write(db_path)?;
466 let run_id = next_backtest_run_id(&connection)?;
467 let closed_trades = report
468 .trades
469 .iter()
470 .filter(|trade| trade.net_pnl.is_some())
471 .count() as i64;
472 connection
473 .execute(
474 "INSERT INTO backtest_runs (
475 run_id, created_at, mode, template, instrument, from_date, to_date, db_path,
476 liquidation_events, book_ticker_events, agg_trade_events, derived_kline_1s_bars,
477 trigger_count, closed_trades, open_trades, wins, losses, skipped_triggers,
478 starting_equity, ending_equity, net_pnl, observed_win_rate, average_net_pnl,
479 configured_expected_value, risk_pct, win_rate_assumption, r_multiple,
480 max_entry_slippage_pct, stop_distance_pct
481 ) VALUES (
482 ?, CAST(? AS TIMESTAMP), ?, ?, ?, CAST(? AS DATE), CAST(? AS DATE), ?,
483 ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?
484 )",
485 params![
486 run_id,
487 chrono::Utc::now().to_rfc3339(),
488 report.mode.as_str(),
489 report.template.slug(),
490 report.instrument,
491 report.from.to_string(),
492 report.to.to_string(),
493 report.db_path.display().to_string(),
494 report.dataset.liquidation_events as i64,
495 report.dataset.book_ticker_events as i64,
496 report.dataset.agg_trade_events as i64,
497 report.dataset.derived_kline_1s_bars as i64,
498 report.trigger_count as i64,
499 closed_trades,
500 report.open_trades as i64,
501 report.wins as i64,
502 report.losses as i64,
503 report.skipped_triggers as i64,
504 report.starting_equity,
505 report.ending_equity,
506 report.net_pnl,
507 report.observed_win_rate,
508 report.average_net_pnl,
509 report.configured_expected_value,
510 report.config.risk_pct,
511 report.config.win_rate_assumption,
512 report.config.r_multiple,
513 report.config.max_entry_slippage_pct,
514 report.config.stop_distance_pct,
515 ],
516 )
517 .map_err(storage_err)?;
518 for trade in &report.trades {
519 insert_backtest_trade(&connection, run_id, trade)?;
520 }
521 Ok(run_id)
522}
523
524pub fn load_backtest_run_summaries(
525 db_path: &Path,
526 limit: usize,
527) -> Result<Vec<BacktestRunSummaryRow>, StorageError> {
528 if !db_path.exists() {
529 return Ok(Vec::new());
530 }
531 let connection = open_dataset_connection_read_only(db_path)?;
532 let mut statement = connection
533 .prepare(
534 "SELECT run_id, CAST(created_at AS VARCHAR), mode, template, instrument,
535 CAST(from_date AS VARCHAR), CAST(to_date AS VARCHAR),
536 trigger_count, closed_trades, open_trades, wins, losses, net_pnl, ending_equity
537 FROM backtest_runs
538 ORDER BY run_id DESC
539 LIMIT ?",
540 )
541 .map_err(storage_err)?;
542 let mut rows = statement
543 .query(params![limit as i64])
544 .map_err(storage_err)?;
545 let mut result = Vec::new();
546 while let Some(row) = rows.next().map_err(storage_err)? {
547 let mode_raw: String = row.get(2).map_err(storage_err)?;
548 result.push(BacktestRunSummaryRow {
549 run_id: row.get(0).map_err(storage_err)?,
550 created_at: row.get(1).map_err(storage_err)?,
551 mode: parse_mode(&mode_raw)?,
552 template: row.get(3).map_err(storage_err)?,
553 instrument: row.get(4).map_err(storage_err)?,
554 from: row.get(5).map_err(storage_err)?,
555 to: row.get(6).map_err(storage_err)?,
556 trigger_count: positive_i64_to_u64(row.get::<_, i64>(7).map_err(storage_err)?),
557 closed_trades: positive_i64_to_u64(row.get::<_, i64>(8).map_err(storage_err)?),
558 open_trades: positive_i64_to_u64(row.get::<_, i64>(9).map_err(storage_err)?),
559 wins: positive_i64_to_u64(row.get::<_, i64>(10).map_err(storage_err)?),
560 losses: positive_i64_to_u64(row.get::<_, i64>(11).map_err(storage_err)?),
561 net_pnl: row.get(12).map_err(storage_err)?,
562 ending_equity: row.get(13).map_err(storage_err)?,
563 });
564 }
565 Ok(result)
566}
567
568pub fn load_backtest_report(
569 db_path: &Path,
570 requested_run_id: Option<i64>,
571) -> Result<Option<BacktestReport>, StorageError> {
572 if !db_path.exists() {
573 return Ok(None);
574 }
575 let connection = open_dataset_connection_read_only(db_path)?;
576 let mut statement = match requested_run_id {
577 Some(_) => connection.prepare(
578 "SELECT run_id, mode, template, instrument, CAST(from_date AS VARCHAR), CAST(to_date AS VARCHAR),
579 db_path, liquidation_events, book_ticker_events, agg_trade_events, derived_kline_1s_bars,
580 trigger_count, open_trades, wins, losses, skipped_triggers, starting_equity,
581 ending_equity, net_pnl, observed_win_rate, average_net_pnl, configured_expected_value,
582 risk_pct, win_rate_assumption, r_multiple, max_entry_slippage_pct, stop_distance_pct
583 FROM backtest_runs WHERE run_id = ?",
584 ),
585 None => connection.prepare(
586 "SELECT run_id, mode, template, instrument, CAST(from_date AS VARCHAR), CAST(to_date AS VARCHAR),
587 db_path, liquidation_events, book_ticker_events, agg_trade_events, derived_kline_1s_bars,
588 trigger_count, open_trades, wins, losses, skipped_triggers, starting_equity,
589 ending_equity, net_pnl, observed_win_rate, average_net_pnl, configured_expected_value,
590 risk_pct, win_rate_assumption, r_multiple, max_entry_slippage_pct, stop_distance_pct
591 FROM backtest_runs ORDER BY run_id DESC LIMIT 1",
592 ),
593 }
594 .map_err(storage_err)?;
595 let mut rows = match requested_run_id {
596 Some(run_id) => statement.query(params![run_id]).map_err(storage_err)?,
597 None => statement.query([]).map_err(storage_err)?,
598 };
599 let Some(row) = rows.next().map_err(storage_err)? else {
600 return Ok(None);
601 };
602 let run_id: i64 = row.get(0).map_err(storage_err)?;
603 let mode_raw: String = row.get(1).map_err(storage_err)?;
604 let template_raw: String = row.get(2).map_err(storage_err)?;
605 let from_raw: String = row.get(4).map_err(storage_err)?;
606 let to_raw: String = row.get(5).map_err(storage_err)?;
607 let trades = load_backtest_trades(&connection, run_id)?;
608 Ok(Some(BacktestReport {
609 run_id: Some(run_id),
610 template: parse_template(&template_raw)?,
611 instrument: row.get(3).map_err(storage_err)?,
612 mode: parse_mode(&mode_raw)?,
613 from: chrono::NaiveDate::parse_from_str(&from_raw, "%Y-%m-%d").map_err(|error| {
614 StorageError::WriteFailedWithContext {
615 message: error.to_string(),
616 }
617 })?,
618 to: chrono::NaiveDate::parse_from_str(&to_raw, "%Y-%m-%d").map_err(|error| {
619 StorageError::WriteFailedWithContext {
620 message: error.to_string(),
621 }
622 })?,
623 db_path: Path::new(&row.get::<_, String>(6).map_err(storage_err)?).to_path_buf(),
624 dataset: BacktestDatasetSummary {
625 mode: parse_mode(&mode_raw)?,
626 symbol: row.get(3).map_err(storage_err)?,
627 symbol_found: true,
628 from: from_raw,
629 to: to_raw,
630 liquidation_events: positive_i64_to_u64(row.get::<_, i64>(7).map_err(storage_err)?),
631 book_ticker_events: positive_i64_to_u64(row.get::<_, i64>(8).map_err(storage_err)?),
632 agg_trade_events: positive_i64_to_u64(row.get::<_, i64>(9).map_err(storage_err)?),
633 derived_kline_1s_bars: positive_i64_to_u64(row.get::<_, i64>(10).map_err(storage_err)?),
634 },
635 config: crate::backtest_app::runner::BacktestConfig {
636 starting_equity: row.get(16).map_err(storage_err)?,
637 risk_pct: row.get(22).map_err(storage_err)?,
638 win_rate_assumption: row.get(23).map_err(storage_err)?,
639 r_multiple: row.get(24).map_err(storage_err)?,
640 max_entry_slippage_pct: row.get(25).map_err(storage_err)?,
641 stop_distance_pct: row.get(26).map_err(storage_err)?,
642 ..Default::default()
643 },
644 trigger_count: positive_i64_to_u64(row.get::<_, i64>(11).map_err(storage_err)?) as usize,
645 trades,
646 wins: positive_i64_to_u64(row.get::<_, i64>(13).map_err(storage_err)?) as usize,
647 losses: positive_i64_to_u64(row.get::<_, i64>(14).map_err(storage_err)?) as usize,
648 open_trades: positive_i64_to_u64(row.get::<_, i64>(12).map_err(storage_err)?) as usize,
649 skipped_triggers: positive_i64_to_u64(row.get::<_, i64>(15).map_err(storage_err)?) as usize,
650 starting_equity: row.get(16).map_err(storage_err)?,
651 ending_equity: row.get(17).map_err(storage_err)?,
652 net_pnl: row.get(18).map_err(storage_err)?,
653 observed_win_rate: row.get(19).map_err(storage_err)?,
654 average_net_pnl: row.get(20).map_err(storage_err)?,
655 configured_expected_value: row.get(21).map_err(storage_err)?,
656 }))
657}
658
659fn query_count(connection: &Connection, table: &str) -> Result<u64, StorageError> {
660 let sql = format!("SELECT COUNT(*) FROM {table}");
661 let mut statement =
662 connection
663 .prepare(&sql)
664 .map_err(|error| StorageError::WriteFailedWithContext {
665 message: error.to_string(),
666 })?;
667 let count: i64 = statement.query_row([], |row| row.get(0)).map_err(|error| {
668 StorageError::WriteFailedWithContext {
669 message: error.to_string(),
670 }
671 })?;
672 Ok(count.max(0) as u64)
673}
674
675fn next_backtest_run_id(connection: &Connection) -> Result<i64, StorageError> {
676 let mut statement = connection
677 .prepare("SELECT COALESCE(MAX(run_id), 0) + 1 FROM backtest_runs")
678 .map_err(storage_err)?;
679 statement
680 .query_row([], |row| row.get(0))
681 .map_err(storage_err)
682}
683
684fn insert_backtest_trade(
685 connection: &Connection,
686 run_id: i64,
687 trade: &BacktestTrade,
688) -> Result<(), StorageError> {
689 connection
690 .execute(
691 "INSERT INTO backtest_trades (
692 run_id, trade_id, trigger_time, entry_time, entry_price, stop_price,
693 take_profit_price, qty, exit_time, exit_price, exit_reason, gross_pnl, fees, net_pnl
694 ) VALUES (
695 ?, ?, CAST(? AS TIMESTAMP), CAST(? AS TIMESTAMP), ?, ?, ?, ?, CAST(? AS TIMESTAMP), ?, ?, ?, ?, ?
696 )",
697 params![
698 run_id,
699 trade.trade_id as i64,
700 trade.trigger_time.to_rfc3339(),
701 trade.entry_time.to_rfc3339(),
702 trade.entry_price,
703 trade.stop_price,
704 trade.take_profit_price,
705 trade.qty,
706 trade.exit_time.map(|value| value.to_rfc3339()),
707 trade.exit_price,
708 trade.exit_reason.as_ref().map(|reason| reason.as_str()),
709 trade.gross_pnl,
710 trade.fees,
711 trade.net_pnl,
712 ],
713 )
714 .map_err(storage_err)?;
715 Ok(())
716}
717
718fn load_backtest_trades(
719 connection: &Connection,
720 run_id: i64,
721) -> Result<Vec<BacktestTrade>, StorageError> {
722 let mut statement = connection
723 .prepare(
724 "SELECT trade_id, CAST(trigger_time AS VARCHAR), CAST(entry_time AS VARCHAR), entry_price, stop_price,
725 take_profit_price, qty, CAST(exit_time AS VARCHAR), exit_price, exit_reason, gross_pnl, fees, net_pnl
726 FROM backtest_trades
727 WHERE run_id = ?
728 ORDER BY trade_id ASC",
729 )
730 .map_err(storage_err)?;
731 let mut rows = statement.query(params![run_id]).map_err(storage_err)?;
732 let mut result = Vec::new();
733 while let Some(row) = rows.next().map_err(storage_err)? {
734 let trigger_time_raw: String = row.get(1).map_err(storage_err)?;
735 let entry_time_raw: String = row.get(2).map_err(storage_err)?;
736 let exit_reason_raw: Option<String> = row.get(9).map_err(storage_err)?;
737 result.push(BacktestTrade {
738 trade_id: positive_i64_to_u64(row.get::<_, i64>(0).map_err(storage_err)?) as usize,
739 trigger_time: parse_timestamp_string(&trigger_time_raw)?,
740 entry_time: parse_timestamp_string(&entry_time_raw)?,
741 entry_price: row.get(3).map_err(storage_err)?,
742 stop_price: row.get(4).map_err(storage_err)?,
743 take_profit_price: row.get(5).map_err(storage_err)?,
744 qty: row.get(6).map_err(storage_err)?,
745 exit_time: row
746 .get::<_, Option<String>>(7)
747 .map_err(storage_err)?
748 .map(|value| parse_timestamp_string(&value))
749 .transpose()?,
750 exit_price: row.get(8).map_err(storage_err)?,
751 exit_reason: exit_reason_raw
752 .map(|value| parse_exit_reason(&value))
753 .transpose()?,
754 gross_pnl: row.get(10).map_err(storage_err)?,
755 fees: row.get(11).map_err(storage_err)?,
756 net_pnl: row.get(12).map_err(storage_err)?,
757 });
758 }
759 Ok(result)
760}
761
762fn parse_mode(raw: &str) -> Result<BinanceMode, StorageError> {
763 match raw {
764 "demo" => Ok(BinanceMode::Demo),
765 "real" => Ok(BinanceMode::Real),
766 other => Err(StorageError::WriteFailedWithContext {
767 message: format!("unsupported mode in backtest row: {other}"),
768 }),
769 }
770}
771
772fn parse_template(raw: &str) -> Result<StrategyTemplate, StorageError> {
773 match raw {
774 "liquidation-breakdown-short" => Ok(StrategyTemplate::LiquidationBreakdownShort),
775 "price-sma-cross-long" => Ok(StrategyTemplate::PriceSmaCrossLong),
776 other => Err(StorageError::WriteFailedWithContext {
777 message: format!("unsupported backtest template: {other}"),
778 }),
779 }
780}
781
782fn parse_exit_reason(raw: &str) -> Result<BacktestExitReason, StorageError> {
783 match raw {
784 "take_profit" => Ok(BacktestExitReason::TakeProfit),
785 "stop_loss" => Ok(BacktestExitReason::StopLoss),
786 "open_at_end" => Ok(BacktestExitReason::OpenAtEnd),
787 "signal_exit" => Ok(BacktestExitReason::SignalExit),
788 other => Err(StorageError::WriteFailedWithContext {
789 message: format!("unsupported backtest exit reason: {other}"),
790 }),
791 }
792}
793
794fn positive_i64_to_u64(value: i64) -> u64 {
795 value.max(0) as u64
796}
797
798fn storage_err(error: duckdb::Error) -> StorageError {
799 StorageError::WriteFailedWithContext {
800 message: error.to_string(),
801 }
802}
803
804fn parse_timestamp_string(value: &str) -> Result<chrono::DateTime<chrono::Utc>, StorageError> {
805 if let Ok(parsed) = chrono::DateTime::parse_from_rfc3339(value) {
806 return Ok(parsed.with_timezone(&chrono::Utc));
807 }
808 let naive =
809 chrono::NaiveDateTime::parse_from_str(value, "%Y-%m-%d %H:%M:%S%.f").map_err(|error| {
810 StorageError::WriteFailedWithContext {
811 message: error.to_string(),
812 }
813 })?;
814 Ok(chrono::DateTime::<chrono::Utc>::from_naive_utc_and_offset(
815 naive,
816 chrono::Utc,
817 ))
818}
819
820fn query_schema_version(connection: &Connection) -> Result<Option<String>, StorageError> {
821 let mut statement = connection
822 .prepare("SELECT value FROM schema_metadata WHERE key = 'market_data_schema_version'")
823 .map_err(|error| StorageError::WriteFailedWithContext {
824 message: error.to_string(),
825 })?;
826 let value: Option<String> = statement.query_row([], |row| row.get(0)).map_err(|error| {
827 StorageError::WriteFailedWithContext {
828 message: error.to_string(),
829 }
830 })?;
831 Ok(value)
832}
833
834fn query_latest_timestamp(
835 connection: &Connection,
836 table: &str,
837 column: &str,
838) -> Result<Option<String>, StorageError> {
839 let sql = format!("SELECT CAST(MAX({column}) AS VARCHAR) FROM {table}");
840 let mut statement =
841 connection
842 .prepare(&sql)
843 .map_err(|error| StorageError::WriteFailedWithContext {
844 message: error.to_string(),
845 })?;
846 let value: Option<String> = statement.query_row([], |row| row.get(0)).map_err(|error| {
847 StorageError::WriteFailedWithContext {
848 message: error.to_string(),
849 }
850 })?;
851 Ok(value)
852}
853
854fn query_top_symbols(connection: &Connection, table: &str) -> Result<Vec<String>, StorageError> {
855 let sql = format!(
856 "SELECT symbol, COUNT(*) AS row_count FROM {table} GROUP BY symbol ORDER BY row_count DESC, symbol ASC LIMIT 5"
857 );
858 let mut statement =
859 connection
860 .prepare(&sql)
861 .map_err(|error| StorageError::WriteFailedWithContext {
862 message: error.to_string(),
863 })?;
864 let mut rows = statement
865 .query([])
866 .map_err(|error| StorageError::WriteFailedWithContext {
867 message: error.to_string(),
868 })?;
869 let mut result = Vec::new();
870 while let Some(row) = rows
871 .next()
872 .map_err(|error| StorageError::WriteFailedWithContext {
873 message: error.to_string(),
874 })?
875 {
876 let symbol: String = row
877 .get(0)
878 .map_err(|error| StorageError::WriteFailedWithContext {
879 message: error.to_string(),
880 })?;
881 let row_count: i64 = row
882 .get(1)
883 .map_err(|error| StorageError::WriteFailedWithContext {
884 message: error.to_string(),
885 })?;
886 result.push(format!("{symbol}:{row_count}"));
887 }
888 Ok(result)
889}
890
891fn latest_symbol_timestamp(
892 connection: &Connection,
893 table: &str,
894 time_column: &str,
895 symbol_column: &str,
896 symbol: &str,
897) -> Result<Option<chrono::DateTime<chrono::Utc>>, StorageError> {
898 let sql = format!(
899 "SELECT CAST(MAX({time_column}) AS VARCHAR) FROM {table} WHERE {symbol_column} = ?"
900 );
901 let mut statement = connection.prepare(&sql).map_err(storage_err)?;
902 let value: Option<String> = statement
903 .query_row(params![symbol], |row| row.get(0))
904 .map_err(storage_err)?;
905 value.map(|raw| parse_timestamp_string(&raw)).transpose()
906}
907
908fn query_count_in_range(
909 connection: &Connection,
910 table: &str,
911 time_column: &str,
912 symbol: &str,
913 from_ts: &str,
914 to_ts: &str,
915) -> Result<u64, StorageError> {
916 let sql = format!(
917 "SELECT COUNT(*) FROM {table} WHERE symbol = ? AND {time_column} >= CAST(? AS TIMESTAMP) AND {time_column} <= CAST(? AS TIMESTAMP)"
918 );
919 let mut statement =
920 connection
921 .prepare(&sql)
922 .map_err(|error| StorageError::WriteFailedWithContext {
923 message: error.to_string(),
924 })?;
925 let count: i64 = statement
926 .query_row(params![symbol, from_ts, to_ts], |row| row.get(0))
927 .map_err(|error| StorageError::WriteFailedWithContext {
928 message: error.to_string(),
929 })?;
930 Ok(count.max(0) as u64)
931}
932
933fn market_data_symbol_exists(connection: &Connection, symbol: &str) -> Result<bool, StorageError> {
934 let mut statement = connection
935 .prepare(
936 "SELECT EXISTS(
937 SELECT 1 FROM (
938 SELECT symbol FROM raw_liquidation_events WHERE symbol = ?
939 UNION
940 SELECT symbol FROM raw_book_ticker WHERE symbol = ?
941 UNION
942 SELECT symbol FROM raw_agg_trades WHERE symbol = ?
943 UNION
944 SELECT symbol FROM raw_klines WHERE symbol = ?
945 UNION
946 SELECT symbol FROM derived_kline_1s WHERE symbol = ?
947 )
948 )",
949 )
950 .map_err(storage_err)?;
951 let exists = statement
952 .query_row(params![symbol, symbol, symbol, symbol, symbol], |row| {
953 row.get::<_, bool>(0)
954 })
955 .map_err(storage_err)?;
956 Ok(exists)
957}
958
959#[cfg(test)]
960mod tests {
961 use super::*;
962 use crate::dataset::schema::init_schema_for_path;
963
964 #[test]
965 fn read_only_backtest_queries_work_while_write_connection_is_held() {
966 let db_path = std::env::temp_dir().join(format!(
967 "sandbox-quant-query-{}.duckdb",
968 uuid::Uuid::new_v4()
969 ));
970 init_schema_for_path(&db_path).expect("init schema");
971
972 let write_connection =
973 open_dataset_connection_read_write(&db_path).expect("write connection");
974
975 let runs =
976 load_backtest_run_summaries(&db_path, 20).expect("read summaries with writer held");
977 let report =
978 load_backtest_report(&db_path, None).expect("read latest report with writer held");
979
980 assert!(runs.is_empty());
981 assert!(report.is_none());
982
983 drop(write_connection);
984 let _ = std::fs::remove_file(&db_path);
985 }
986}