// sandbox_quant/dataset/query.rs — dataset query layer

1use std::path::Path;
2
3use duckdb::{params, AccessMode, Config, Connection};
4
5use crate::app::bootstrap::BinanceMode;
6use crate::backtest_app::runner::{BacktestExitReason, BacktestReport, BacktestTrade};
7use crate::dataset::types::{
8    BacktestDatasetSummary, BacktestRunSummaryRow, BookTickerRow, DerivedKlineRow,
9    LiquidationEventRow, RecorderMetrics,
10};
11use crate::error::storage_error::StorageError;
12use crate::strategy::model::StrategyTemplate;
13
14fn open_dataset_connection_read_only(db_path: &Path) -> Result<Connection, StorageError> {
15    let config = Config::default()
16        .access_mode(AccessMode::ReadOnly)
17        .map_err(storage_err)?;
18    Connection::open_with_flags(db_path, config).map_err(|error| StorageError::DatabaseInitFailed {
19        path: db_path.display().to_string(),
20        message: error.to_string(),
21    })
22}
23
24fn open_dataset_connection_read_write(db_path: &Path) -> Result<Connection, StorageError> {
25    let config = Config::default()
26        .access_mode(AccessMode::ReadWrite)
27        .map_err(storage_err)?;
28    Connection::open_with_flags(db_path, config).map_err(|error| StorageError::DatabaseInitFailed {
29        path: db_path.display().to_string(),
30        message: error.to_string(),
31    })
32}
33
34pub fn metrics_for_path(db_path: &Path) -> Result<RecorderMetrics, StorageError> {
35    if !db_path.exists() {
36        return Ok(RecorderMetrics::default());
37    }
38    let connection = open_dataset_connection_read_only(db_path)?;
39
40    Ok(RecorderMetrics {
41        liquidation_events: query_count(&connection, "raw_liquidation_events")?,
42        book_ticker_events: query_count(&connection, "raw_book_ticker")?,
43        agg_trade_events: query_count(&connection, "raw_agg_trades")?,
44        derived_kline_1s_bars: query_count(&connection, "derived_kline_1s")?,
45        schema_version: query_schema_version(&connection)?,
46        last_liquidation_event_time: query_latest_timestamp(
47            &connection,
48            "raw_liquidation_events",
49            "event_time",
50        )?,
51        last_book_ticker_event_time: query_latest_timestamp(
52            &connection,
53            "raw_book_ticker",
54            "event_time",
55        )?,
56        last_agg_trade_event_time: query_latest_timestamp(
57            &connection,
58            "raw_agg_trades",
59            "event_time",
60        )?,
61        top_liquidation_symbols: query_top_symbols(&connection, "raw_liquidation_events")?,
62        top_book_ticker_symbols: query_top_symbols(&connection, "raw_book_ticker")?,
63        top_agg_trade_symbols: query_top_symbols(&connection, "raw_agg_trades")?,
64    })
65}
66
67pub fn backtest_summary_for_path(
68    db_path: &Path,
69    mode: BinanceMode,
70    symbol: &str,
71    from: chrono::NaiveDate,
72    to: chrono::NaiveDate,
73) -> Result<BacktestDatasetSummary, StorageError> {
74    if !db_path.exists() {
75        return Ok(BacktestDatasetSummary {
76            mode,
77            symbol: symbol.to_string(),
78            symbol_found: false,
79            from: from.to_string(),
80            to: to.to_string(),
81            liquidation_events: 0,
82            book_ticker_events: 0,
83            agg_trade_events: 0,
84            derived_kline_1s_bars: 0,
85        });
86    }
87    let connection = open_dataset_connection_read_only(db_path)?;
88    let symbol_found = market_data_symbol_exists(&connection, symbol)?;
89    let from_ts = format!("{from} 00:00:00");
90    let to_ts = format!("{to} 23:59:59");
91    Ok(BacktestDatasetSummary {
92        mode,
93        symbol: symbol.to_string(),
94        symbol_found,
95        from: from.to_string(),
96        to: to.to_string(),
97        liquidation_events: query_count_in_range(
98            &connection,
99            "raw_liquidation_events",
100            "event_time",
101            symbol,
102            &from_ts,
103            &to_ts,
104        )?,
105        book_ticker_events: query_count_in_range(
106            &connection,
107            "raw_book_ticker",
108            "event_time",
109            symbol,
110            &from_ts,
111            &to_ts,
112        )?,
113        agg_trade_events: query_count_in_range(
114            &connection,
115            "raw_agg_trades",
116            "event_time",
117            symbol,
118            &from_ts,
119            &to_ts,
120        )?,
121        derived_kline_1s_bars: query_count_in_range(
122            &connection,
123            "derived_kline_1s",
124            "open_time",
125            symbol,
126            &from_ts,
127            &to_ts,
128        )?,
129    })
130}
131
132pub fn load_liquidation_events_for_path(
133    db_path: &Path,
134    symbol: &str,
135    from: chrono::NaiveDate,
136    to: chrono::NaiveDate,
137) -> Result<Vec<LiquidationEventRow>, StorageError> {
138    if !db_path.exists() {
139        return Ok(Vec::new());
140    }
141    let connection = open_dataset_connection_read_only(db_path)?;
142    let from_ts = format!("{from} 00:00:00");
143    let to_ts = format!("{to} 23:59:59");
144    let mut statement = connection
145        .prepare(
146            "SELECT epoch_ms(event_time), force_side, price, qty, notional
147             FROM raw_liquidation_events
148             WHERE symbol = ? AND event_time >= CAST(? AS TIMESTAMP) AND event_time <= CAST(? AS TIMESTAMP)
149             ORDER BY event_time ASC",
150        )
151        .map_err(|error| StorageError::WriteFailedWithContext {
152            message: error.to_string(),
153        })?;
154    let mut rows = statement
155        .query(params![symbol, from_ts, to_ts])
156        .map_err(|error| StorageError::WriteFailedWithContext {
157            message: error.to_string(),
158        })?;
159    let mut result = Vec::new();
160    while let Some(row) = rows
161        .next()
162        .map_err(|error| StorageError::WriteFailedWithContext {
163            message: error.to_string(),
164        })?
165    {
166        result.push(LiquidationEventRow {
167            event_time_ms: row
168                .get(0)
169                .map_err(|error| StorageError::WriteFailedWithContext {
170                    message: error.to_string(),
171                })?,
172            force_side: row
173                .get(1)
174                .map_err(|error| StorageError::WriteFailedWithContext {
175                    message: error.to_string(),
176                })?,
177            price: row
178                .get(2)
179                .map_err(|error| StorageError::WriteFailedWithContext {
180                    message: error.to_string(),
181                })?,
182            qty: row
183                .get(3)
184                .map_err(|error| StorageError::WriteFailedWithContext {
185                    message: error.to_string(),
186                })?,
187            notional: row
188                .get(4)
189                .map_err(|error| StorageError::WriteFailedWithContext {
190                    message: error.to_string(),
191                })?,
192        });
193    }
194    Ok(result)
195}
196
197pub fn load_book_ticker_rows_for_path(
198    db_path: &Path,
199    symbol: &str,
200    from: chrono::NaiveDate,
201    to: chrono::NaiveDate,
202) -> Result<Vec<BookTickerRow>, StorageError> {
203    if !db_path.exists() {
204        return Ok(Vec::new());
205    }
206    let connection = open_dataset_connection_read_only(db_path)?;
207    let from_ts = format!("{from} 00:00:00");
208    let to_ts = format!("{to} 23:59:59");
209    let mut statement = connection
210        .prepare(
211            "SELECT epoch_ms(event_time), bid, ask
212             FROM raw_book_ticker
213             WHERE symbol = ? AND event_time >= CAST(? AS TIMESTAMP) AND event_time <= CAST(? AS TIMESTAMP)
214             ORDER BY event_time ASC",
215        )
216        .map_err(|error| StorageError::WriteFailedWithContext {
217            message: error.to_string(),
218        })?;
219    let mut rows = statement
220        .query(params![symbol, from_ts, to_ts])
221        .map_err(|error| StorageError::WriteFailedWithContext {
222            message: error.to_string(),
223        })?;
224    let mut result = Vec::new();
225    while let Some(row) = rows
226        .next()
227        .map_err(|error| StorageError::WriteFailedWithContext {
228            message: error.to_string(),
229        })?
230    {
231        result.push(BookTickerRow {
232            event_time_ms: row
233                .get(0)
234                .map_err(|error| StorageError::WriteFailedWithContext {
235                    message: error.to_string(),
236                })?,
237            bid: row
238                .get(1)
239                .map_err(|error| StorageError::WriteFailedWithContext {
240                    message: error.to_string(),
241                })?,
242            ask: row
243                .get(2)
244                .map_err(|error| StorageError::WriteFailedWithContext {
245                    message: error.to_string(),
246                })?,
247        });
248    }
249    Ok(result)
250}
251
252pub fn load_derived_kline_rows_for_path(
253    db_path: &Path,
254    symbol: &str,
255    from: chrono::NaiveDate,
256    to: chrono::NaiveDate,
257) -> Result<Vec<DerivedKlineRow>, StorageError> {
258    if !db_path.exists() {
259        return Ok(Vec::new());
260    }
261    let connection = open_dataset_connection_read_only(db_path)?;
262    let from_ts = format!("{from} 00:00:00");
263    let to_ts = format!("{to} 23:59:59");
264    let mut statement = connection
265        .prepare(
266            "SELECT epoch_ms(open_time), epoch_ms(close_time), open, high, low, close, volume, quote_volume, trade_count
267             FROM derived_kline_1s
268             WHERE symbol = ? AND open_time >= CAST(? AS TIMESTAMP) AND open_time <= CAST(? AS TIMESTAMP)
269             ORDER BY open_time ASC",
270        )
271        .map_err(|error| StorageError::WriteFailedWithContext {
272            message: error.to_string(),
273        })?;
274    let mut rows = statement
275        .query(params![symbol, from_ts, to_ts])
276        .map_err(|error| StorageError::WriteFailedWithContext {
277            message: error.to_string(),
278        })?;
279    let mut result = Vec::new();
280    while let Some(row) = rows
281        .next()
282        .map_err(|error| StorageError::WriteFailedWithContext {
283            message: error.to_string(),
284        })?
285    {
286        result.push(DerivedKlineRow {
287            open_time_ms: row.get(0).map_err(storage_err)?,
288            close_time_ms: row.get(1).map_err(storage_err)?,
289            open: row.get(2).map_err(storage_err)?,
290            high: row.get(3).map_err(storage_err)?,
291            low: row.get(4).map_err(storage_err)?,
292            close: row.get(5).map_err(storage_err)?,
293            volume: row.get(6).map_err(storage_err)?,
294            quote_volume: row.get(7).map_err(storage_err)?,
295            trade_count: positive_i64_to_u64(row.get::<_, i64>(8).map_err(storage_err)?),
296        });
297    }
298    Ok(result)
299}
300
301pub fn load_raw_kline_rows_for_path(
302    db_path: &Path,
303    symbol: &str,
304    from: chrono::NaiveDate,
305    to: chrono::NaiveDate,
306) -> Result<Option<(String, Vec<DerivedKlineRow>)>, StorageError> {
307    if !db_path.exists() {
308        return Ok(None);
309    }
310    let connection = open_dataset_connection_read_only(db_path)?;
311    let from_ts = format!("{from} 00:00:00");
312    let to_ts = format!("{to} 23:59:59");
313    let interval = preferred_raw_kline_interval(&connection, symbol, &from_ts, &to_ts)?;
314    let Some(interval) = interval else {
315        return Ok(None);
316    };
317    let mut statement = connection
318        .prepare(
319            "SELECT epoch_ms(open_time), epoch_ms(close_time), open, high, low, close, volume, quote_volume, trade_count
320             FROM raw_klines
321             WHERE symbol = ? AND interval = ? AND open_time >= CAST(? AS TIMESTAMP) AND open_time <= CAST(? AS TIMESTAMP)
322             ORDER BY open_time ASC",
323        )
324        .map_err(storage_err)?;
325    let mut rows = statement
326        .query(params![symbol, interval.as_str(), from_ts, to_ts])
327        .map_err(storage_err)?;
328    let mut result = Vec::new();
329    while let Some(row) = rows.next().map_err(storage_err)? {
330        result.push(DerivedKlineRow {
331            open_time_ms: row.get(0).map_err(storage_err)?,
332            close_time_ms: row.get(1).map_err(storage_err)?,
333            open: row.get(2).map_err(storage_err)?,
334            high: row.get(3).map_err(storage_err)?,
335            low: row.get(4).map_err(storage_err)?,
336            close: row.get(5).map_err(storage_err)?,
337            volume: row.get(6).map_err(storage_err)?,
338            quote_volume: row.get(7).map_err(storage_err)?,
339            trade_count: positive_i64_to_u64(row.get::<_, i64>(8).map_err(storage_err)?),
340        });
341    }
342    Ok(Some((interval, result)))
343}
344
345fn preferred_raw_kline_interval(
346    connection: &Connection,
347    symbol: &str,
348    from_ts: &str,
349    to_ts: &str,
350) -> Result<Option<String>, StorageError> {
351    let mut statement = connection
352        .prepare(
353            "SELECT DISTINCT interval
354             FROM raw_klines
355             WHERE symbol = ? AND open_time >= CAST(? AS TIMESTAMP) AND open_time <= CAST(? AS TIMESTAMP)",
356        )
357        .map_err(storage_err)?;
358    let mut rows = statement
359        .query(params![symbol, from_ts, to_ts])
360        .map_err(storage_err)?;
361    let mut intervals = Vec::new();
362    while let Some(row) = rows.next().map_err(storage_err)? {
363        intervals.push(row.get::<_, String>(0).map_err(storage_err)?);
364    }
365    Ok(intervals
366        .into_iter()
367        .min_by_key(|interval| raw_kline_interval_rank(interval)))
368}
369
/// Ranks a kline interval for preference ordering: finer intervals rank
/// lower (better). Unknown intervals rank last (`usize::MAX`).
fn raw_kline_interval_rank(interval: &str) -> usize {
    // Preference order, finest to coarsest.
    const PREFERENCE: [&str; 10] = [
        "1m", "3m", "5m", "15m", "30m", "1h", "4h", "1d", "1w", "1mo",
    ];
    PREFERENCE
        .iter()
        .position(|known| *known == interval)
        .unwrap_or(usize::MAX)
}
385
386pub fn load_recorded_symbols_for_path(
387    db_path: &Path,
388    limit: usize,
389) -> Result<Vec<String>, StorageError> {
390    if !db_path.exists() {
391        return Ok(Vec::new());
392    }
393    let connection = open_dataset_connection_read_only(db_path)?;
394    let mut statement = connection
395        .prepare(
396            "SELECT symbol
397             FROM (
398                SELECT symbol FROM raw_liquidation_events
399                UNION
400                SELECT symbol FROM raw_book_ticker
401                UNION
402                SELECT symbol FROM raw_agg_trades
403                UNION
404                SELECT symbol FROM raw_klines
405                UNION
406                SELECT instrument AS symbol FROM backtest_runs
407             )
408             ORDER BY symbol ASC
409             LIMIT ?",
410        )
411        .map_err(storage_err)?;
412    let mut rows = statement
413        .query(params![limit as i64])
414        .map_err(storage_err)?;
415    let mut result = Vec::new();
416    while let Some(row) = rows.next().map_err(storage_err)? {
417        result.push(row.get(0).map_err(storage_err)?);
418    }
419    Ok(result)
420}
421
422pub fn latest_market_data_day_for_path(
423    db_path: &Path,
424    symbol: &str,
425) -> Result<Option<chrono::NaiveDate>, StorageError> {
426    if !db_path.exists() {
427        return Ok(None);
428    }
429    let connection = open_dataset_connection_read_only(db_path)?;
430    let timestamps = [
431        latest_symbol_timestamp(
432            &connection,
433            "raw_book_ticker",
434            "event_time",
435            "symbol",
436            symbol,
437        )?,
438        latest_symbol_timestamp(
439            &connection,
440            "raw_agg_trades",
441            "event_time",
442            "symbol",
443            symbol,
444        )?,
445        latest_symbol_timestamp(
446            &connection,
447            "raw_liquidation_events",
448            "event_time",
449            "symbol",
450            symbol,
451        )?,
452        latest_symbol_timestamp(&connection, "raw_klines", "open_time", "symbol", symbol)?,
453    ];
454    Ok(timestamps
455        .into_iter()
456        .flatten()
457        .max()
458        .map(|value| value.date_naive()))
459}
460
/// Persists a completed backtest run into the dataset database — one header
/// row in `backtest_runs` plus one row per trade in `backtest_trades` — and
/// returns the allocated run id.
///
/// NOTE(review): run-id allocation and the inserts are not wrapped in a
/// transaction, so concurrent writers could race on `run_id` — confirm this
/// is only ever called from a single writer.
pub fn persist_backtest_report(
    db_path: &Path,
    report: &BacktestReport,
) -> Result<i64, StorageError> {
    let connection = open_dataset_connection_read_write(db_path)?;
    let run_id = next_backtest_run_id(&connection)?;
    // A trade counts as closed once its net PnL has been realized.
    let closed_trades = report
        .trades
        .iter()
        .filter(|trade| trade.net_pnl.is_some())
        .count() as i64;
    // Positional parameters: order here must match the column list exactly.
    connection
        .execute(
            "INSERT INTO backtest_runs (
                run_id, created_at, mode, template, instrument, from_date, to_date, db_path,
                liquidation_events, book_ticker_events, agg_trade_events, derived_kline_1s_bars,
                trigger_count, closed_trades, open_trades, wins, losses, skipped_triggers,
                starting_equity, ending_equity, net_pnl, observed_win_rate, average_net_pnl,
                configured_expected_value, risk_pct, win_rate_assumption, r_multiple,
                max_entry_slippage_pct, stop_distance_pct
             ) VALUES (
                ?, CAST(? AS TIMESTAMP), ?, ?, ?, CAST(? AS DATE), CAST(? AS DATE), ?,
                ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?
             )",
            params![
                run_id,
                chrono::Utc::now().to_rfc3339(),
                report.mode.as_str(),
                report.template.slug(),
                report.instrument,
                report.from.to_string(),
                report.to.to_string(),
                report.db_path.display().to_string(),
                // Unsigned counters are stored as signed integers in DuckDB.
                report.dataset.liquidation_events as i64,
                report.dataset.book_ticker_events as i64,
                report.dataset.agg_trade_events as i64,
                report.dataset.derived_kline_1s_bars as i64,
                report.trigger_count as i64,
                closed_trades,
                report.open_trades as i64,
                report.wins as i64,
                report.losses as i64,
                report.skipped_triggers as i64,
                report.starting_equity,
                report.ending_equity,
                report.net_pnl,
                report.observed_win_rate,
                report.average_net_pnl,
                report.configured_expected_value,
                report.config.risk_pct,
                report.config.win_rate_assumption,
                report.config.r_multiple,
                report.config.max_entry_slippage_pct,
                report.config.stop_distance_pct,
            ],
        )
        .map_err(storage_err)?;
    for trade in &report.trades {
        insert_backtest_trade(&connection, run_id, trade)?;
    }
    Ok(run_id)
}
523
524pub fn load_backtest_run_summaries(
525    db_path: &Path,
526    limit: usize,
527) -> Result<Vec<BacktestRunSummaryRow>, StorageError> {
528    if !db_path.exists() {
529        return Ok(Vec::new());
530    }
531    let connection = open_dataset_connection_read_only(db_path)?;
532    let mut statement = connection
533        .prepare(
534            "SELECT run_id, CAST(created_at AS VARCHAR), mode, template, instrument,
535                    CAST(from_date AS VARCHAR), CAST(to_date AS VARCHAR),
536                    trigger_count, closed_trades, open_trades, wins, losses, net_pnl, ending_equity
537             FROM backtest_runs
538             ORDER BY run_id DESC
539             LIMIT ?",
540        )
541        .map_err(storage_err)?;
542    let mut rows = statement
543        .query(params![limit as i64])
544        .map_err(storage_err)?;
545    let mut result = Vec::new();
546    while let Some(row) = rows.next().map_err(storage_err)? {
547        let mode_raw: String = row.get(2).map_err(storage_err)?;
548        result.push(BacktestRunSummaryRow {
549            run_id: row.get(0).map_err(storage_err)?,
550            created_at: row.get(1).map_err(storage_err)?,
551            mode: parse_mode(&mode_raw)?,
552            template: row.get(3).map_err(storage_err)?,
553            instrument: row.get(4).map_err(storage_err)?,
554            from: row.get(5).map_err(storage_err)?,
555            to: row.get(6).map_err(storage_err)?,
556            trigger_count: positive_i64_to_u64(row.get::<_, i64>(7).map_err(storage_err)?),
557            closed_trades: positive_i64_to_u64(row.get::<_, i64>(8).map_err(storage_err)?),
558            open_trades: positive_i64_to_u64(row.get::<_, i64>(9).map_err(storage_err)?),
559            wins: positive_i64_to_u64(row.get::<_, i64>(10).map_err(storage_err)?),
560            losses: positive_i64_to_u64(row.get::<_, i64>(11).map_err(storage_err)?),
561            net_pnl: row.get(12).map_err(storage_err)?,
562            ending_equity: row.get(13).map_err(storage_err)?,
563        });
564    }
565    Ok(result)
566}
567
/// Loads a persisted backtest run (header plus its trades) and reconstructs
/// a `BacktestReport`.
///
/// With `requested_run_id = Some(id)` the exact run is fetched; with `None`
/// the most recent run (highest `run_id`) is used. Returns `Ok(None)` when
/// the database file or the requested row does not exist.
pub fn load_backtest_report(
    db_path: &Path,
    requested_run_id: Option<i64>,
) -> Result<Option<BacktestReport>, StorageError> {
    if !db_path.exists() {
        return Ok(None);
    }
    let connection = open_dataset_connection_read_only(db_path)?;
    // Same column list in both branches; only WHERE vs ORDER BY/LIMIT differs.
    // Column indices below (0-26) refer to this SELECT order.
    let mut statement = match requested_run_id {
        Some(_) => connection.prepare(
            "SELECT run_id, mode, template, instrument, CAST(from_date AS VARCHAR), CAST(to_date AS VARCHAR),
                    db_path, liquidation_events, book_ticker_events, agg_trade_events, derived_kline_1s_bars,
                    trigger_count, open_trades, wins, losses, skipped_triggers, starting_equity,
                    ending_equity, net_pnl, observed_win_rate, average_net_pnl, configured_expected_value,
                    risk_pct, win_rate_assumption, r_multiple, max_entry_slippage_pct, stop_distance_pct
             FROM backtest_runs WHERE run_id = ?",
        ),
        None => connection.prepare(
            "SELECT run_id, mode, template, instrument, CAST(from_date AS VARCHAR), CAST(to_date AS VARCHAR),
                    db_path, liquidation_events, book_ticker_events, agg_trade_events, derived_kline_1s_bars,
                    trigger_count, open_trades, wins, losses, skipped_triggers, starting_equity,
                    ending_equity, net_pnl, observed_win_rate, average_net_pnl, configured_expected_value,
                    risk_pct, win_rate_assumption, r_multiple, max_entry_slippage_pct, stop_distance_pct
             FROM backtest_runs ORDER BY run_id DESC LIMIT 1",
        ),
    }
    .map_err(storage_err)?;
    let mut rows = match requested_run_id {
        Some(run_id) => statement.query(params![run_id]).map_err(storage_err)?,
        None => statement.query([]).map_err(storage_err)?,
    };
    let Some(row) = rows.next().map_err(storage_err)? else {
        return Ok(None);
    };
    let run_id: i64 = row.get(0).map_err(storage_err)?;
    let mode_raw: String = row.get(1).map_err(storage_err)?;
    let template_raw: String = row.get(2).map_err(storage_err)?;
    let from_raw: String = row.get(4).map_err(storage_err)?;
    let to_raw: String = row.get(5).map_err(storage_err)?;
    let trades = load_backtest_trades(&connection, run_id)?;
    Ok(Some(BacktestReport {
        run_id: Some(run_id),
        template: parse_template(&template_raw)?,
        instrument: row.get(3).map_err(storage_err)?,
        mode: parse_mode(&mode_raw)?,
        // Dates were stored as DATE and cast back to "%Y-%m-%d" strings above.
        from: chrono::NaiveDate::parse_from_str(&from_raw, "%Y-%m-%d").map_err(|error| {
            StorageError::WriteFailedWithContext {
                message: error.to_string(),
            }
        })?,
        to: chrono::NaiveDate::parse_from_str(&to_raw, "%Y-%m-%d").map_err(|error| {
            StorageError::WriteFailedWithContext {
                message: error.to_string(),
            }
        })?,
        db_path: Path::new(&row.get::<_, String>(6).map_err(storage_err)?).to_path_buf(),
        dataset: BacktestDatasetSummary {
            mode: parse_mode(&mode_raw)?,
            symbol: row.get(3).map_err(storage_err)?,
            // The run was executed, so the symbol necessarily had data.
            symbol_found: true,
            from: from_raw,
            to: to_raw,
            // Columns 7-10: stored dataset counters (signed in DuckDB).
            liquidation_events: positive_i64_to_u64(row.get::<_, i64>(7).map_err(storage_err)?),
            book_ticker_events: positive_i64_to_u64(row.get::<_, i64>(8).map_err(storage_err)?),
            agg_trade_events: positive_i64_to_u64(row.get::<_, i64>(9).map_err(storage_err)?),
            derived_kline_1s_bars: positive_i64_to_u64(row.get::<_, i64>(10).map_err(storage_err)?),
        },
        config: crate::backtest_app::runner::BacktestConfig {
            starting_equity: row.get(16).map_err(storage_err)?,
            risk_pct: row.get(22).map_err(storage_err)?,
            win_rate_assumption: row.get(23).map_err(storage_err)?,
            r_multiple: row.get(24).map_err(storage_err)?,
            max_entry_slippage_pct: row.get(25).map_err(storage_err)?,
            stop_distance_pct: row.get(26).map_err(storage_err)?,
            // Config knobs that were not persisted fall back to defaults.
            ..Default::default()
        },
        trigger_count: positive_i64_to_u64(row.get::<_, i64>(11).map_err(storage_err)?) as usize,
        trades,
        wins: positive_i64_to_u64(row.get::<_, i64>(13).map_err(storage_err)?) as usize,
        losses: positive_i64_to_u64(row.get::<_, i64>(14).map_err(storage_err)?) as usize,
        open_trades: positive_i64_to_u64(row.get::<_, i64>(12).map_err(storage_err)?) as usize,
        skipped_triggers: positive_i64_to_u64(row.get::<_, i64>(15).map_err(storage_err)?) as usize,
        starting_equity: row.get(16).map_err(storage_err)?,
        ending_equity: row.get(17).map_err(storage_err)?,
        net_pnl: row.get(18).map_err(storage_err)?,
        observed_win_rate: row.get(19).map_err(storage_err)?,
        average_net_pnl: row.get(20).map_err(storage_err)?,
        configured_expected_value: row.get(21).map_err(storage_err)?,
    }))
}
658
659fn query_count(connection: &Connection, table: &str) -> Result<u64, StorageError> {
660    let sql = format!("SELECT COUNT(*) FROM {table}");
661    let mut statement =
662        connection
663            .prepare(&sql)
664            .map_err(|error| StorageError::WriteFailedWithContext {
665                message: error.to_string(),
666            })?;
667    let count: i64 = statement.query_row([], |row| row.get(0)).map_err(|error| {
668        StorageError::WriteFailedWithContext {
669            message: error.to_string(),
670        }
671    })?;
672    Ok(count.max(0) as u64)
673}
674
675fn next_backtest_run_id(connection: &Connection) -> Result<i64, StorageError> {
676    let mut statement = connection
677        .prepare("SELECT COALESCE(MAX(run_id), 0) + 1 FROM backtest_runs")
678        .map_err(storage_err)?;
679    statement
680        .query_row([], |row| row.get(0))
681        .map_err(storage_err)
682}
683
/// Inserts a single trade row for `run_id` into `backtest_trades`.
///
/// Exit fields (`exit_time`, `exit_price`, `exit_reason`, PnL columns) are
/// optional and stored as NULL for trades still open at the end of the run.
fn insert_backtest_trade(
    connection: &Connection,
    run_id: i64,
    trade: &BacktestTrade,
) -> Result<(), StorageError> {
    // Positional parameters: order must match the column list exactly.
    connection
        .execute(
            "INSERT INTO backtest_trades (
                run_id, trade_id, trigger_time, entry_time, entry_price, stop_price,
                take_profit_price, qty, exit_time, exit_price, exit_reason, gross_pnl, fees, net_pnl
             ) VALUES (
                ?, ?, CAST(? AS TIMESTAMP), CAST(? AS TIMESTAMP), ?, ?, ?, ?, CAST(? AS TIMESTAMP), ?, ?, ?, ?, ?
             )",
            params![
                run_id,
                trade.trade_id as i64,
                // Timestamps are serialized as RFC 3339 and cast in SQL.
                trade.trigger_time.to_rfc3339(),
                trade.entry_time.to_rfc3339(),
                trade.entry_price,
                trade.stop_price,
                trade.take_profit_price,
                trade.qty,
                trade.exit_time.map(|value| value.to_rfc3339()),
                trade.exit_price,
                trade.exit_reason.as_ref().map(|reason| reason.as_str()),
                trade.gross_pnl,
                trade.fees,
                trade.net_pnl,
            ],
        )
        .map_err(storage_err)?;
    Ok(())
}
717
/// Loads all trades belonging to `run_id`, ordered by trade id.
///
/// Timestamps come back as DuckDB VARCHAR casts and are re-parsed via
/// `parse_timestamp_string`; exit-related columns may be NULL for trades
/// that were still open when the run ended.
fn load_backtest_trades(
    connection: &Connection,
    run_id: i64,
) -> Result<Vec<BacktestTrade>, StorageError> {
    let mut statement = connection
        .prepare(
            "SELECT trade_id, CAST(trigger_time AS VARCHAR), CAST(entry_time AS VARCHAR), entry_price, stop_price,
                    take_profit_price, qty, CAST(exit_time AS VARCHAR), exit_price, exit_reason, gross_pnl, fees, net_pnl
             FROM backtest_trades
             WHERE run_id = ?
             ORDER BY trade_id ASC",
        )
        .map_err(storage_err)?;
    let mut rows = statement.query(params![run_id]).map_err(storage_err)?;
    let mut result = Vec::new();
    while let Some(row) = rows.next().map_err(storage_err)? {
        let trigger_time_raw: String = row.get(1).map_err(storage_err)?;
        let entry_time_raw: String = row.get(2).map_err(storage_err)?;
        let exit_reason_raw: Option<String> = row.get(9).map_err(storage_err)?;
        result.push(BacktestTrade {
            // trade_id is stored signed; clamp and narrow back to usize.
            trade_id: positive_i64_to_u64(row.get::<_, i64>(0).map_err(storage_err)?) as usize,
            trigger_time: parse_timestamp_string(&trigger_time_raw)?,
            entry_time: parse_timestamp_string(&entry_time_raw)?,
            entry_price: row.get(3).map_err(storage_err)?,
            stop_price: row.get(4).map_err(storage_err)?,
            take_profit_price: row.get(5).map_err(storage_err)?,
            qty: row.get(6).map_err(storage_err)?,
            // NULL exit_time means the trade never closed.
            exit_time: row
                .get::<_, Option<String>>(7)
                .map_err(storage_err)?
                .map(|value| parse_timestamp_string(&value))
                .transpose()?,
            exit_price: row.get(8).map_err(storage_err)?,
            exit_reason: exit_reason_raw
                .map(|value| parse_exit_reason(&value))
                .transpose()?,
            gross_pnl: row.get(10).map_err(storage_err)?,
            fees: row.get(11).map_err(storage_err)?,
            net_pnl: row.get(12).map_err(storage_err)?,
        });
    }
    Ok(result)
}
761
762fn parse_mode(raw: &str) -> Result<BinanceMode, StorageError> {
763    match raw {
764        "demo" => Ok(BinanceMode::Demo),
765        "real" => Ok(BinanceMode::Real),
766        other => Err(StorageError::WriteFailedWithContext {
767            message: format!("unsupported mode in backtest row: {other}"),
768        }),
769    }
770}
771
772fn parse_template(raw: &str) -> Result<StrategyTemplate, StorageError> {
773    match raw {
774        "liquidation-breakdown-short" => Ok(StrategyTemplate::LiquidationBreakdownShort),
775        "price-sma-cross-long" => Ok(StrategyTemplate::PriceSmaCrossLong),
776        other => Err(StorageError::WriteFailedWithContext {
777            message: format!("unsupported backtest template: {other}"),
778        }),
779    }
780}
781
782fn parse_exit_reason(raw: &str) -> Result<BacktestExitReason, StorageError> {
783    match raw {
784        "take_profit" => Ok(BacktestExitReason::TakeProfit),
785        "stop_loss" => Ok(BacktestExitReason::StopLoss),
786        "open_at_end" => Ok(BacktestExitReason::OpenAtEnd),
787        "signal_exit" => Ok(BacktestExitReason::SignalExit),
788        other => Err(StorageError::WriteFailedWithContext {
789            message: format!("unsupported backtest exit reason: {other}"),
790        }),
791    }
792}
793
/// Converts a database `i64` to `u64`, clamping negative values to zero.
fn positive_i64_to_u64(value: i64) -> u64 {
    // `try_from` fails exactly for negative inputs, which clamp to 0.
    u64::try_from(value).unwrap_or(0)
}
797
798fn storage_err(error: duckdb::Error) -> StorageError {
799    StorageError::WriteFailedWithContext {
800        message: error.to_string(),
801    }
802}
803
804fn parse_timestamp_string(value: &str) -> Result<chrono::DateTime<chrono::Utc>, StorageError> {
805    if let Ok(parsed) = chrono::DateTime::parse_from_rfc3339(value) {
806        return Ok(parsed.with_timezone(&chrono::Utc));
807    }
808    let naive =
809        chrono::NaiveDateTime::parse_from_str(value, "%Y-%m-%d %H:%M:%S%.f").map_err(|error| {
810            StorageError::WriteFailedWithContext {
811                message: error.to_string(),
812            }
813        })?;
814    Ok(chrono::DateTime::<chrono::Utc>::from_naive_utc_and_offset(
815        naive,
816        chrono::Utc,
817    ))
818}
819
820fn query_schema_version(connection: &Connection) -> Result<Option<String>, StorageError> {
821    let mut statement = connection
822        .prepare("SELECT value FROM schema_metadata WHERE key = 'market_data_schema_version'")
823        .map_err(|error| StorageError::WriteFailedWithContext {
824            message: error.to_string(),
825        })?;
826    let value: Option<String> = statement.query_row([], |row| row.get(0)).map_err(|error| {
827        StorageError::WriteFailedWithContext {
828            message: error.to_string(),
829        }
830    })?;
831    Ok(value)
832}
833
834fn query_latest_timestamp(
835    connection: &Connection,
836    table: &str,
837    column: &str,
838) -> Result<Option<String>, StorageError> {
839    let sql = format!("SELECT CAST(MAX({column}) AS VARCHAR) FROM {table}");
840    let mut statement =
841        connection
842            .prepare(&sql)
843            .map_err(|error| StorageError::WriteFailedWithContext {
844                message: error.to_string(),
845            })?;
846    let value: Option<String> = statement.query_row([], |row| row.get(0)).map_err(|error| {
847        StorageError::WriteFailedWithContext {
848            message: error.to_string(),
849        }
850    })?;
851    Ok(value)
852}
853
854fn query_top_symbols(connection: &Connection, table: &str) -> Result<Vec<String>, StorageError> {
855    let sql = format!(
856        "SELECT symbol, COUNT(*) AS row_count FROM {table} GROUP BY symbol ORDER BY row_count DESC, symbol ASC LIMIT 5"
857    );
858    let mut statement =
859        connection
860            .prepare(&sql)
861            .map_err(|error| StorageError::WriteFailedWithContext {
862                message: error.to_string(),
863            })?;
864    let mut rows = statement
865        .query([])
866        .map_err(|error| StorageError::WriteFailedWithContext {
867            message: error.to_string(),
868        })?;
869    let mut result = Vec::new();
870    while let Some(row) = rows
871        .next()
872        .map_err(|error| StorageError::WriteFailedWithContext {
873            message: error.to_string(),
874        })?
875    {
876        let symbol: String = row
877            .get(0)
878            .map_err(|error| StorageError::WriteFailedWithContext {
879                message: error.to_string(),
880            })?;
881        let row_count: i64 = row
882            .get(1)
883            .map_err(|error| StorageError::WriteFailedWithContext {
884                message: error.to_string(),
885            })?;
886        result.push(format!("{symbol}:{row_count}"));
887    }
888    Ok(result)
889}
890
891fn latest_symbol_timestamp(
892    connection: &Connection,
893    table: &str,
894    time_column: &str,
895    symbol_column: &str,
896    symbol: &str,
897) -> Result<Option<chrono::DateTime<chrono::Utc>>, StorageError> {
898    let sql = format!(
899        "SELECT CAST(MAX({time_column}) AS VARCHAR) FROM {table} WHERE {symbol_column} = ?"
900    );
901    let mut statement = connection.prepare(&sql).map_err(storage_err)?;
902    let value: Option<String> = statement
903        .query_row(params![symbol], |row| row.get(0))
904        .map_err(storage_err)?;
905    value.map(|raw| parse_timestamp_string(&raw)).transpose()
906}
907
908fn query_count_in_range(
909    connection: &Connection,
910    table: &str,
911    time_column: &str,
912    symbol: &str,
913    from_ts: &str,
914    to_ts: &str,
915) -> Result<u64, StorageError> {
916    let sql = format!(
917        "SELECT COUNT(*) FROM {table} WHERE symbol = ? AND {time_column} >= CAST(? AS TIMESTAMP) AND {time_column} <= CAST(? AS TIMESTAMP)"
918    );
919    let mut statement =
920        connection
921            .prepare(&sql)
922            .map_err(|error| StorageError::WriteFailedWithContext {
923                message: error.to_string(),
924            })?;
925    let count: i64 = statement
926        .query_row(params![symbol, from_ts, to_ts], |row| row.get(0))
927        .map_err(|error| StorageError::WriteFailedWithContext {
928            message: error.to_string(),
929        })?;
930    Ok(count.max(0) as u64)
931}
932
933fn market_data_symbol_exists(connection: &Connection, symbol: &str) -> Result<bool, StorageError> {
934    let mut statement = connection
935        .prepare(
936            "SELECT EXISTS(
937                SELECT 1 FROM (
938                    SELECT symbol FROM raw_liquidation_events WHERE symbol = ?
939                    UNION
940                    SELECT symbol FROM raw_book_ticker WHERE symbol = ?
941                    UNION
942                    SELECT symbol FROM raw_agg_trades WHERE symbol = ?
943                    UNION
944                    SELECT symbol FROM raw_klines WHERE symbol = ?
945                    UNION
946                    SELECT symbol FROM derived_kline_1s WHERE symbol = ?
947                )
948            )",
949        )
950        .map_err(storage_err)?;
951    let exists = statement
952        .query_row(params![symbol, symbol, symbol, symbol, symbol], |row| {
953            row.get::<_, bool>(0)
954        })
955        .map_err(storage_err)?;
956    Ok(exists)
957}
958
#[cfg(test)]
mod tests {
    use super::*;
    use crate::dataset::schema::init_schema_for_path;

    /// A held read-write connection must not prevent the read-only
    /// backtest query paths from opening and reading the same database.
    #[test]
    fn read_only_backtest_queries_work_while_write_connection_is_held() {
        // Unique temp file per run so parallel tests never collide.
        let dataset_path = std::env::temp_dir().join(format!(
            "sandbox-quant-query-{}.duckdb",
            uuid::Uuid::new_v4()
        ));
        init_schema_for_path(&dataset_path).expect("init schema");

        let writer = open_dataset_connection_read_write(&dataset_path).expect("write connection");

        // Both read paths open their own read-only connections internally.
        let summaries = load_backtest_run_summaries(&dataset_path, 20)
            .expect("read summaries with writer held");
        let latest_report = load_backtest_report(&dataset_path, None)
            .expect("read latest report with writer held");

        // Fresh schema: nothing recorded yet.
        assert!(summaries.is_empty());
        assert!(latest_report.is_none());

        drop(writer);
        let _ = std::fs::remove_file(&dataset_path);
    }
}