//! sandbox_quant/storage/postgres_market_data.rs
//!
//! PostgreSQL-backed market-data storage: schema DDL, raw-event inserts,
//! summaries, and export of snapshots and backtest reports.

1use chrono::{NaiveDate, TimeZone, Utc};
2use duckdb::{params, Connection as DuckConnection};
3use postgres::{Client, NoTls};
4
5use crate::app::bootstrap::BinanceMode;
6use crate::backtest_app::runner::{BacktestReport, BacktestTrade};
7use crate::dataset::schema::{init_schema_for_path, MARKET_DATA_SCHEMA_VERSION};
8use crate::error::storage_error::StorageError;
9use crate::record::coordination::RecorderCoordination;
10
/// Version string recorded in `schema_metadata`; reuses the shared
/// market-data schema version constant so the Postgres and DuckDB schemas
/// are stamped with the same version.
pub const POSTGRES_MARKET_DATA_SCHEMA_VERSION: &str = MARKET_DATA_SCHEMA_VERSION;
12
/// Idempotent DDL applied by `init_schema` (`CREATE ... IF NOT EXISTS` throughout).
///
/// Tables:
/// - `schema_metadata` — key/value store; holds `market_data_schema_version`.
/// - `raw_liquidation_events`, `raw_book_ticker`, `raw_agg_trades`,
///   `raw_klines` — raw market streams, each with a UNIQUE natural-key index
///   that the `insert_*` helpers rely on for `ON CONFLICT ... DO NOTHING`.
/// - `backtest_runs` / `backtest_trades` / `backtest_equity_points` —
///   exported backtest results; child tables cascade-delete with their run.
const POSTGRES_MARKET_DATA_SCHEMA_SQL: &str = r#"
CREATE TABLE IF NOT EXISTS schema_metadata (
  key TEXT PRIMARY KEY,
  value TEXT NOT NULL,
  updated_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP
);

CREATE TABLE IF NOT EXISTS raw_liquidation_events (
  event_id BIGSERIAL PRIMARY KEY,
  mode TEXT NOT NULL,
  product TEXT NOT NULL,
  symbol TEXT NOT NULL,
  event_time TIMESTAMPTZ NOT NULL,
  receive_time TIMESTAMPTZ NOT NULL,
  force_side TEXT NOT NULL,
  price DOUBLE PRECISION NOT NULL,
  qty DOUBLE PRECISION NOT NULL,
  notional DOUBLE PRECISION NOT NULL,
  raw_payload TEXT NOT NULL
);

CREATE UNIQUE INDEX IF NOT EXISTS raw_liquidation_events_natural_idx
ON raw_liquidation_events (mode, product, symbol, event_time, force_side, price, qty);

CREATE TABLE IF NOT EXISTS raw_book_ticker (
  tick_id BIGSERIAL PRIMARY KEY,
  mode TEXT NOT NULL,
  symbol TEXT NOT NULL,
  event_time TIMESTAMPTZ NOT NULL,
  receive_time TIMESTAMPTZ NOT NULL,
  bid DOUBLE PRECISION NOT NULL,
  bid_qty DOUBLE PRECISION NOT NULL,
  ask DOUBLE PRECISION NOT NULL,
  ask_qty DOUBLE PRECISION NOT NULL
);

CREATE UNIQUE INDEX IF NOT EXISTS raw_book_ticker_natural_idx
ON raw_book_ticker (mode, symbol, event_time, bid, ask, bid_qty, ask_qty);

CREATE TABLE IF NOT EXISTS raw_agg_trades (
  trade_id BIGSERIAL PRIMARY KEY,
  mode TEXT NOT NULL,
  symbol TEXT NOT NULL,
  event_time TIMESTAMPTZ NOT NULL,
  receive_time TIMESTAMPTZ NOT NULL,
  price DOUBLE PRECISION NOT NULL,
  qty DOUBLE PRECISION NOT NULL,
  is_buyer_maker BOOLEAN NOT NULL
);

CREATE UNIQUE INDEX IF NOT EXISTS raw_agg_trades_natural_idx
ON raw_agg_trades (mode, symbol, event_time, price, qty, is_buyer_maker);

CREATE TABLE IF NOT EXISTS raw_klines (
  kline_id BIGSERIAL PRIMARY KEY,
  mode TEXT NOT NULL,
  product TEXT NOT NULL,
  symbol TEXT NOT NULL,
  interval_name TEXT NOT NULL,
  open_time TIMESTAMPTZ NOT NULL,
  close_time TIMESTAMPTZ NOT NULL,
  open DOUBLE PRECISION NOT NULL,
  high DOUBLE PRECISION NOT NULL,
  low DOUBLE PRECISION NOT NULL,
  close DOUBLE PRECISION NOT NULL,
  volume DOUBLE PRECISION NOT NULL,
  quote_volume DOUBLE PRECISION NOT NULL,
  trade_count BIGINT NOT NULL,
  taker_buy_base_volume DOUBLE PRECISION,
  taker_buy_quote_volume DOUBLE PRECISION,
  raw_payload TEXT NOT NULL
);

CREATE UNIQUE INDEX IF NOT EXISTS raw_klines_natural_idx
ON raw_klines (mode, product, symbol, interval_name, open_time);

CREATE TABLE IF NOT EXISTS backtest_runs (
  export_run_id BIGSERIAL PRIMARY KEY,
  source_db_path TEXT NOT NULL,
  source_run_id BIGINT NOT NULL,
  exported_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
  mode TEXT NOT NULL,
  template TEXT NOT NULL,
  instrument TEXT NOT NULL,
  from_date DATE NOT NULL,
  to_date DATE NOT NULL,
  liquidation_events BIGINT NOT NULL,
  book_ticker_events BIGINT NOT NULL,
  agg_trade_events BIGINT NOT NULL,
  derived_kline_1s_bars BIGINT NOT NULL,
  trigger_count BIGINT NOT NULL,
  closed_trades BIGINT NOT NULL,
  open_trades BIGINT NOT NULL,
  wins BIGINT NOT NULL,
  losses BIGINT NOT NULL,
  skipped_triggers BIGINT NOT NULL,
  starting_equity DOUBLE PRECISION NOT NULL,
  ending_equity DOUBLE PRECISION NOT NULL,
  net_pnl DOUBLE PRECISION NOT NULL,
  observed_win_rate DOUBLE PRECISION NOT NULL,
  average_net_pnl DOUBLE PRECISION NOT NULL,
  configured_expected_value DOUBLE PRECISION NOT NULL,
  risk_pct DOUBLE PRECISION NOT NULL,
  win_rate_assumption DOUBLE PRECISION NOT NULL,
  r_multiple DOUBLE PRECISION NOT NULL,
  max_entry_slippage_pct DOUBLE PRECISION NOT NULL,
  stop_distance_pct DOUBLE PRECISION NOT NULL,
  UNIQUE (source_db_path, source_run_id)
);

CREATE INDEX IF NOT EXISTS backtest_runs_lookup_idx
ON backtest_runs (mode, instrument, template, export_run_id DESC);

CREATE TABLE IF NOT EXISTS backtest_trades (
  export_run_id BIGINT NOT NULL REFERENCES backtest_runs(export_run_id) ON DELETE CASCADE,
  trade_id BIGINT NOT NULL,
  trigger_time TIMESTAMPTZ NOT NULL,
  entry_time TIMESTAMPTZ NOT NULL,
  entry_price DOUBLE PRECISION NOT NULL,
  stop_price DOUBLE PRECISION NOT NULL,
  take_profit_price DOUBLE PRECISION NOT NULL,
  qty DOUBLE PRECISION NOT NULL,
  exit_time TIMESTAMPTZ,
  exit_price DOUBLE PRECISION,
  exit_reason TEXT,
  gross_pnl DOUBLE PRECISION,
  fees DOUBLE PRECISION,
  net_pnl DOUBLE PRECISION,
  PRIMARY KEY (export_run_id, trade_id)
);

CREATE INDEX IF NOT EXISTS backtest_trades_exit_time_idx
ON backtest_trades (export_run_id, exit_time);

CREATE TABLE IF NOT EXISTS backtest_equity_points (
  export_run_id BIGINT NOT NULL REFERENCES backtest_runs(export_run_id) ON DELETE CASCADE,
  point_id BIGINT NOT NULL,
  event_time TIMESTAMPTZ NOT NULL,
  equity DOUBLE PRECISION NOT NULL,
  cumulative_net_pnl DOUBLE PRECISION NOT NULL,
  PRIMARY KEY (export_run_id, point_id)
);

CREATE INDEX IF NOT EXISTS backtest_equity_points_time_idx
ON backtest_equity_points (export_run_id, event_time);
"#;
159
/// Which storage backend the collector writes market data into.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CollectorStorageBackend {
    DuckDb,
    Postgres,
}

impl CollectorStorageBackend {
    /// Stable lowercase label for this backend.
    pub fn as_str(self) -> &'static str {
        match self {
            CollectorStorageBackend::DuckDb => "duckdb",
            CollectorStorageBackend::Postgres => "postgres",
        }
    }
}
174
/// A single candlestick destined for the Postgres `raw_klines` table.
///
/// Timestamps are Unix epoch **milliseconds**; `insert_kline` converts them
/// server-side with `to_timestamp(ms / 1000.0)`.
#[derive(Debug, Clone, PartialEq)]
pub struct PostgresKlineRecord {
    // Natural key: (mode, product, symbol, interval_name, open_time).
    pub mode: String,
    pub product: String,
    pub symbol: String,
    pub interval_name: String,
    pub open_time_ms: i64,
    pub close_time_ms: i64,
    // OHLCV values for the interval.
    pub open: f64,
    pub high: f64,
    pub low: f64,
    pub close: f64,
    pub volume: f64,
    pub quote_volume: f64,
    pub trade_count: i64,
    // Nullable columns in the schema; None when the feed omits them.
    pub taker_buy_base_volume: Option<f64>,
    pub taker_buy_quote_volume: Option<f64>,
    // Presumably the verbatim upstream message — stored as-is in `raw_payload`.
    pub raw_payload: String,
}
194
/// A forced-liquidation event destined for `raw_liquidation_events`.
///
/// `*_time_ms` fields are epoch milliseconds, converted server-side on insert.
#[derive(Debug, Clone, PartialEq)]
pub struct PostgresLiquidationRecord {
    // Part of the natural key: (mode, product, symbol, event_time, force_side, price, qty).
    pub mode: String,
    pub product: String,
    pub symbol: String,
    pub event_time_ms: i64,
    // When this process received the event (not part of the dedupe key).
    pub receive_time_ms: i64,
    pub force_side: String,
    pub price: f64,
    pub qty: f64,
    pub notional: f64,
    // Original upstream message text, kept for replay/debugging.
    pub raw_payload: String,
}
208
/// A top-of-book quote update destined for `raw_book_ticker`.
#[derive(Debug, Clone, PartialEq)]
pub struct PostgresBookTickerRecord {
    // Part of the natural key: (mode, symbol, event_time, bid, ask, bid_qty, ask_qty).
    pub mode: String,
    pub symbol: String,
    // Epoch milliseconds; converted server-side on insert.
    pub event_time_ms: i64,
    pub receive_time_ms: i64,
    pub bid: f64,
    pub bid_qty: f64,
    pub ask: f64,
    pub ask_qty: f64,
}
220
/// An aggregated trade destined for `raw_agg_trades`.
#[derive(Debug, Clone, PartialEq)]
pub struct PostgresAggTradeRecord {
    // Part of the natural key: (mode, symbol, event_time, price, qty, is_buyer_maker).
    pub mode: String,
    pub symbol: String,
    // Epoch milliseconds; converted server-side on insert.
    pub event_time_ms: i64,
    pub receive_time_ms: i64,
    pub price: f64,
    pub qty: f64,
    pub is_buyer_maker: bool,
}
231
/// Per-(product, symbol, interval) aggregate produced by `load_summary`.
#[derive(Debug, Clone, PartialEq)]
pub struct PostgresKlineSummaryRow {
    pub product: String,
    pub symbol: String,
    pub interval_name: String,
    pub row_count: i64,
    // MIN(open_time)/MAX(close_time) as TEXT-cast timestamps; None when no rows.
    pub min_time: Option<String>,
    pub max_time: Option<String>,
}
241
/// Per-symbol aggregate over `raw_liquidation_events` produced by `load_summary`.
#[derive(Debug, Clone, PartialEq)]
pub struct PostgresLiquidationSummaryRow {
    pub symbol: String,
    pub row_count: i64,
    // MIN/MAX(event_time) as TEXT-cast timestamps; None when no rows.
    pub min_time: Option<String>,
    pub max_time: Option<String>,
}
249
/// Snapshot of what the Postgres database currently contains.
#[derive(Debug, Clone, PartialEq)]
pub struct PostgresSummary {
    // "missing" when the schema_metadata row was never written.
    pub schema_version: String,
    // Version found before the most recent `init_schema`, if any.
    pub previous_version: Option<String>,
    pub klines: Vec<PostgresKlineSummaryRow>,
    pub liquidations: Vec<PostgresLiquidationSummaryRow>,
}
257
/// Parameters for `export_snapshot_to_duckdb`.
#[derive(Debug, Clone, PartialEq)]
pub struct PostgresToDuckDbSnapshotConfig {
    // Source connection string; masked via mask_postgres_url before persisting.
    pub postgres_url: String,
    pub mode: BinanceMode,
    // Base directory handed to RecorderCoordination to locate the DuckDB file.
    pub base_dir: String,
    pub symbols: Vec<String>,
    // Inclusive calendar-date range, expanded to full days at export time.
    pub from: NaiveDate,
    pub to: NaiveDate,
    // Optional filters; None matches any product / any interval.
    pub product: Option<String>,
    pub interval_name: Option<String>,
    // Per-stream toggles.
    pub include_klines: bool,
    pub include_liquidations: bool,
    pub include_book_tickers: bool,
    pub include_agg_trades: bool,
    // When true, matching DuckDB rows are deleted before re-inserting.
    pub clear_duckdb_range: bool,
}
274
/// Result of `export_snapshot_to_duckdb`: where the data landed and how much.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PostgresToDuckDbSnapshotReport {
    // Row id of the audit entry written to DuckDB's snapshot_exports table.
    pub snapshot_export_id: i64,
    pub db_path: String,
    // Rows copied per stream across all requested symbols.
    pub kline_rows: usize,
    pub liquidation_rows: usize,
    pub book_ticker_rows: usize,
    pub agg_trade_rows: usize,
}
284
/// Result of `export_backtest_report_to_postgres`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PostgresBacktestExportReport {
    // Postgres-side id of the (upserted) backtest_runs row.
    pub export_run_id: i64,
    // The local run id the export was taken from.
    pub source_run_id: i64,
    pub trade_rows: usize,
    pub equity_point_rows: usize,
}
292
293pub fn postgres_url_from_env() -> Result<String, StorageError> {
294    std::env::var("SANDBOX_QUANT_POSTGRES_URL")
295        .or_else(|_| std::env::var("DATABASE_URL"))
296        .map_err(|_| StorageError::WriteFailedWithContext {
297            message: "missing PostgreSQL URL; set SANDBOX_QUANT_POSTGRES_URL or DATABASE_URL"
298                .to_string(),
299        })
300}
301
302pub fn connect(url: &str) -> Result<Client, StorageError> {
303    Client::connect(url, NoTls).map_err(|error| StorageError::DatabaseInitFailed {
304        path: mask_postgres_url(url),
305        message: error.to_string(),
306    })
307}
308
309pub fn init_schema(client: &mut Client, url: &str) -> Result<Option<String>, StorageError> {
310    let previous_version = existing_schema_version(client)?;
311    client
312        .batch_execute(POSTGRES_MARKET_DATA_SCHEMA_SQL)
313        .map_err(|error| StorageError::DatabaseInitFailed {
314            path: mask_postgres_url(url),
315            message: error.to_string(),
316        })?;
317    client
318        .execute(
319            "INSERT INTO schema_metadata (key, value, updated_at)
320             VALUES ('market_data_schema_version', $1, CURRENT_TIMESTAMP)
321             ON CONFLICT (key) DO UPDATE
322             SET value = EXCLUDED.value, updated_at = EXCLUDED.updated_at",
323            &[&POSTGRES_MARKET_DATA_SCHEMA_VERSION],
324        )
325        .map_err(|error| StorageError::DatabaseInitFailed {
326            path: mask_postgres_url(url),
327            message: error.to_string(),
328        })?;
329    Ok(previous_version)
330}
331
332pub fn insert_kline(client: &mut Client, record: &PostgresKlineRecord) -> Result<(), StorageError> {
333    client
334        .execute(
335            "INSERT INTO raw_klines (
336                mode, product, symbol, interval_name, open_time, close_time,
337                open, high, low, close, volume, quote_volume, trade_count,
338                taker_buy_base_volume, taker_buy_quote_volume, raw_payload
339             ) VALUES (
340                $1, $2, $3, $4, to_timestamp($5 / 1000.0), to_timestamp($6 / 1000.0),
341                $7, $8, $9, $10, $11, $12, $13, $14, $15, $16
342             )
343             ON CONFLICT (mode, product, symbol, interval_name, open_time) DO NOTHING",
344            &[
345                &record.mode,
346                &record.product,
347                &record.symbol,
348                &record.interval_name,
349                &record.open_time_ms,
350                &record.close_time_ms,
351                &record.open,
352                &record.high,
353                &record.low,
354                &record.close,
355                &record.volume,
356                &record.quote_volume,
357                &record.trade_count,
358                &record.taker_buy_base_volume,
359                &record.taker_buy_quote_volume,
360                &record.raw_payload,
361            ],
362        )
363        .map(|_| ())
364        .map_err(storage_err)
365}
366
367pub fn insert_liquidation(
368    client: &mut Client,
369    record: &PostgresLiquidationRecord,
370) -> Result<(), StorageError> {
371    client
372        .execute(
373            "INSERT INTO raw_liquidation_events (
374                mode, product, symbol, event_time, receive_time, force_side, price, qty, notional, raw_payload
375             ) VALUES (
376                $1, $2, $3, to_timestamp($4 / 1000.0), to_timestamp($5 / 1000.0), $6, $7, $8, $9, $10
377             )
378             ON CONFLICT (mode, product, symbol, event_time, force_side, price, qty) DO NOTHING",
379            &[
380                &record.mode,
381                &record.product,
382                &record.symbol,
383                &record.event_time_ms,
384                &record.receive_time_ms,
385                &record.force_side,
386                &record.price,
387                &record.qty,
388                &record.notional,
389                &record.raw_payload,
390            ],
391        )
392        .map(|_| ())
393        .map_err(storage_err)
394}
395
396pub fn insert_book_ticker(
397    client: &mut Client,
398    record: &PostgresBookTickerRecord,
399) -> Result<(), StorageError> {
400    client
401        .execute(
402            "INSERT INTO raw_book_ticker (
403                mode, symbol, event_time, receive_time, bid, bid_qty, ask, ask_qty
404             ) VALUES (
405                $1, $2, to_timestamp($3 / 1000.0), to_timestamp($4 / 1000.0), $5, $6, $7, $8
406             )
407             ON CONFLICT (mode, symbol, event_time, bid, ask, bid_qty, ask_qty) DO NOTHING",
408            &[
409                &record.mode,
410                &record.symbol,
411                &record.event_time_ms,
412                &record.receive_time_ms,
413                &record.bid,
414                &record.bid_qty,
415                &record.ask,
416                &record.ask_qty,
417            ],
418        )
419        .map(|_| ())
420        .map_err(storage_err)
421}
422
423pub fn insert_agg_trade(
424    client: &mut Client,
425    record: &PostgresAggTradeRecord,
426) -> Result<(), StorageError> {
427    client
428        .execute(
429            "INSERT INTO raw_agg_trades (
430                mode, symbol, event_time, receive_time, price, qty, is_buyer_maker
431             ) VALUES (
432                $1, $2, to_timestamp($3 / 1000.0), to_timestamp($4 / 1000.0), $5, $6, $7
433             )
434             ON CONFLICT (mode, symbol, event_time, price, qty, is_buyer_maker) DO NOTHING",
435            &[
436                &record.mode,
437                &record.symbol,
438                &record.event_time_ms,
439                &record.receive_time_ms,
440                &record.price,
441                &record.qty,
442                &record.is_buyer_maker,
443            ],
444        )
445        .map(|_| ())
446        .map_err(storage_err)
447}
448
449pub fn metrics_for_postgres_url(
450    url: &str,
451) -> Result<crate::dataset::types::RecorderMetrics, StorageError> {
452    let mut client = connect(url)?;
453    let _ = init_schema(&mut client, url)?;
454    Ok(crate::dataset::types::RecorderMetrics {
455        liquidation_events: query_count(&mut client, "raw_liquidation_events")?,
456        book_ticker_events: query_count(&mut client, "raw_book_ticker")?,
457        agg_trade_events: query_count(&mut client, "raw_agg_trades")?,
458        derived_kline_1s_bars: 0,
459        schema_version: existing_schema_version(&mut client)?,
460        last_liquidation_event_time: query_latest_timestamp(
461            &mut client,
462            "raw_liquidation_events",
463            "event_time",
464        )?,
465        last_book_ticker_event_time: query_latest_timestamp(
466            &mut client,
467            "raw_book_ticker",
468            "event_time",
469        )?,
470        last_agg_trade_event_time: query_latest_timestamp(
471            &mut client,
472            "raw_agg_trades",
473            "event_time",
474        )?,
475        top_liquidation_symbols: query_top_symbols(&mut client, "raw_liquidation_events")?,
476        top_book_ticker_symbols: query_top_symbols(&mut client, "raw_book_ticker")?,
477        top_agg_trade_symbols: query_top_symbols(&mut client, "raw_agg_trades")?,
478    })
479}
480
481pub fn load_summary(
482    client: &mut Client,
483    previous_version: Option<String>,
484) -> Result<PostgresSummary, StorageError> {
485    let schema_version = existing_schema_version(client)?.unwrap_or_else(|| "missing".to_string());
486
487    let klines = client
488        .query(
489            "SELECT product, symbol, interval_name, COUNT(*) AS row_count,
490                    CAST(MIN(open_time) AS TEXT), CAST(MAX(close_time) AS TEXT)
491             FROM raw_klines
492             GROUP BY product, symbol, interval_name
493             ORDER BY product, symbol, interval_name",
494            &[],
495        )
496        .map_err(storage_err)?
497        .into_iter()
498        .map(|row| PostgresKlineSummaryRow {
499            product: row.get(0),
500            symbol: row.get(1),
501            interval_name: row.get(2),
502            row_count: row.get(3),
503            min_time: row.get(4),
504            max_time: row.get(5),
505        })
506        .collect();
507
508    let liquidations = client
509        .query(
510            "SELECT symbol, COUNT(*) AS row_count,
511                    CAST(MIN(event_time) AS TEXT), CAST(MAX(event_time) AS TEXT)
512             FROM raw_liquidation_events
513             GROUP BY symbol
514             ORDER BY symbol",
515            &[],
516        )
517        .map_err(storage_err)?
518        .into_iter()
519        .map(|row| PostgresLiquidationSummaryRow {
520            symbol: row.get(0),
521            row_count: row.get(1),
522            min_time: row.get(2),
523            max_time: row.get(3),
524        })
525        .collect();
526
527    Ok(PostgresSummary {
528        schema_version,
529        previous_version,
530        klines,
531        liquidations,
532    })
533}
534
535pub fn export_snapshot_to_duckdb(
536    config: &PostgresToDuckDbSnapshotConfig,
537) -> Result<PostgresToDuckDbSnapshotReport, StorageError> {
538    let mut client = connect(&config.postgres_url)?;
539    init_schema(&mut client, &config.postgres_url)?;
540
541    let db_path = RecorderCoordination::new(config.base_dir.clone()).db_path(config.mode);
542    init_schema_for_path(&db_path)?;
543    let duck =
544        DuckConnection::open(&db_path).map_err(|error| StorageError::DatabaseInitFailed {
545            path: db_path.display().to_string(),
546            message: error.to_string(),
547        })?;
548
549    let from_ts = format!("{} 00:00:00", config.from);
550    let to_ts = format!("{} 23:59:59", config.to);
551
552    let mut kline_rows = 0usize;
553    let mut liquidation_rows = 0usize;
554    let mut book_ticker_rows = 0usize;
555    let mut agg_trade_rows = 0usize;
556
557    for symbol in &config.symbols {
558        if config.clear_duckdb_range {
559            if config.include_klines {
560                clear_duckdb_klines(
561                    &duck,
562                    config.mode,
563                    symbol,
564                    config.product.as_deref(),
565                    config.interval_name.as_deref(),
566                    &from_ts,
567                    &to_ts,
568                )?;
569            }
570            if config.include_liquidations {
571                clear_duckdb_liquidations(&duck, config.mode, symbol, &from_ts, &to_ts)?;
572            }
573            if config.include_book_tickers {
574                clear_duckdb_book_tickers(&duck, config.mode, symbol, &from_ts, &to_ts)?;
575            }
576            if config.include_agg_trades {
577                clear_duckdb_agg_trades(&duck, config.mode, symbol, &from_ts, &to_ts)?;
578            }
579        }
580
581        if config.include_klines {
582            let rows = client
583                .query(
584                    "SELECT product, symbol, interval_name,
585                            (EXTRACT(EPOCH FROM open_time) * 1000)::BIGINT AS open_time_ms,
586                            (EXTRACT(EPOCH FROM close_time) * 1000)::BIGINT AS close_time_ms,
587                            open, high, low, close, volume, quote_volume, trade_count,
588                            taker_buy_base_volume, taker_buy_quote_volume, raw_payload
589                     FROM raw_klines
590                     WHERE mode = $1
591                       AND symbol = $2
592                       AND open_time >= CAST($3 AS TIMESTAMPTZ)
593                       AND open_time <= CAST($4 AS TIMESTAMPTZ)
594                       AND ($5::TEXT IS NULL OR product = $5)
595                       AND ($6::TEXT IS NULL OR interval_name = $6)
596                     ORDER BY open_time ASC",
597                    &[
598                        &config.mode.as_str(),
599                        &symbol,
600                        &from_ts,
601                        &to_ts,
602                        &config.product,
603                        &config.interval_name,
604                    ],
605                )
606                .map_err(storage_err)?;
607
608            let mut next_id = next_duckdb_kline_id(&duck)?;
609            for row in rows {
610                duck.execute(
611                    "INSERT INTO raw_klines (
612                        kline_id, mode, product, symbol, interval, open_time, close_time,
613                        open, high, low, close, volume, quote_volume, trade_count,
614                        taker_buy_base_volume, taker_buy_quote_volume, raw_payload
615                     ) VALUES (
616                        ?, ?, ?, ?, ?, to_timestamp(? / 1000.0), to_timestamp(? / 1000.0),
617                        ?, ?, ?, ?, ?, ?, ?, ?, ?, ?
618                     )",
619                    params![
620                        next_id,
621                        config.mode.as_str(),
622                        row.get::<_, String>(0),
623                        row.get::<_, String>(1),
624                        row.get::<_, String>(2),
625                        row.get::<_, i64>(3),
626                        row.get::<_, i64>(4),
627                        row.get::<_, f64>(5),
628                        row.get::<_, f64>(6),
629                        row.get::<_, f64>(7),
630                        row.get::<_, f64>(8),
631                        row.get::<_, f64>(9),
632                        row.get::<_, f64>(10),
633                        row.get::<_, i64>(11),
634                        row.get::<_, Option<f64>>(12),
635                        row.get::<_, Option<f64>>(13),
636                        row.get::<_, String>(14),
637                    ],
638                )
639                .map_err(storage_err)?;
640                next_id += 1;
641                kline_rows += 1;
642            }
643        }
644
645        if config.include_liquidations {
646            let rows = client
647                .query(
648                    "SELECT symbol,
649                            (EXTRACT(EPOCH FROM event_time) * 1000)::BIGINT AS event_time_ms,
650                            (EXTRACT(EPOCH FROM receive_time) * 1000)::BIGINT AS receive_time_ms,
651                            force_side, price, qty, notional, raw_payload
652                     FROM raw_liquidation_events
653                     WHERE mode = $1
654                       AND symbol = $2
655                       AND event_time >= CAST($3 AS TIMESTAMPTZ)
656                       AND event_time <= CAST($4 AS TIMESTAMPTZ)
657                     ORDER BY event_time ASC",
658                    &[&config.mode.as_str(), &symbol, &from_ts, &to_ts],
659                )
660                .map_err(storage_err)?;
661            let mut next_id = next_duckdb_liquidation_event_id(&duck)?;
662            for row in rows {
663                duck.execute(
664                    "INSERT INTO raw_liquidation_events (
665                        event_id, mode, symbol, event_time, receive_time, force_side, price, qty, notional, raw_payload
666                     ) VALUES (
667                        ?, ?, ?, to_timestamp(? / 1000.0), to_timestamp(? / 1000.0), ?, ?, ?, ?, ?
668                     )",
669                    params![
670                        next_id,
671                        config.mode.as_str(),
672                        row.get::<_, String>(0),
673                        row.get::<_, i64>(1),
674                        row.get::<_, i64>(2),
675                        row.get::<_, String>(3),
676                        row.get::<_, f64>(4),
677                        row.get::<_, f64>(5),
678                        row.get::<_, f64>(6),
679                        row.get::<_, String>(7),
680                    ],
681                )
682                .map_err(storage_err)?;
683                next_id += 1;
684                liquidation_rows += 1;
685            }
686        }
687
688        if config.include_book_tickers {
689            let rows = client
690                .query(
691                    "SELECT symbol,
692                            (EXTRACT(EPOCH FROM event_time) * 1000)::BIGINT AS event_time_ms,
693                            (EXTRACT(EPOCH FROM receive_time) * 1000)::BIGINT AS receive_time_ms,
694                            bid, bid_qty, ask, ask_qty
695                     FROM raw_book_ticker
696                     WHERE mode = $1
697                       AND symbol = $2
698                       AND event_time >= CAST($3 AS TIMESTAMPTZ)
699                       AND event_time <= CAST($4 AS TIMESTAMPTZ)
700                     ORDER BY event_time ASC",
701                    &[&config.mode.as_str(), &symbol, &from_ts, &to_ts],
702                )
703                .map_err(storage_err)?;
704            let mut next_id = next_duckdb_book_ticker_id(&duck)?;
705            for row in rows {
706                duck.execute(
707                    "INSERT INTO raw_book_ticker (
708                        tick_id, mode, symbol, event_time, receive_time, bid, bid_qty, ask, ask_qty
709                     ) VALUES (
710                        ?, ?, ?, to_timestamp(? / 1000.0), to_timestamp(? / 1000.0), ?, ?, ?, ?
711                     )",
712                    params![
713                        next_id,
714                        config.mode.as_str(),
715                        row.get::<_, String>(0),
716                        row.get::<_, i64>(1),
717                        row.get::<_, i64>(2),
718                        row.get::<_, f64>(3),
719                        row.get::<_, f64>(4),
720                        row.get::<_, f64>(5),
721                        row.get::<_, f64>(6),
722                    ],
723                )
724                .map_err(storage_err)?;
725                next_id += 1;
726                book_ticker_rows += 1;
727            }
728        }
729
730        if config.include_agg_trades {
731            let rows = client
732                .query(
733                    "SELECT symbol,
734                            (EXTRACT(EPOCH FROM event_time) * 1000)::BIGINT AS event_time_ms,
735                            (EXTRACT(EPOCH FROM receive_time) * 1000)::BIGINT AS receive_time_ms,
736                            price, qty, is_buyer_maker
737                     FROM raw_agg_trades
738                     WHERE mode = $1
739                       AND symbol = $2
740                       AND event_time >= CAST($3 AS TIMESTAMPTZ)
741                       AND event_time <= CAST($4 AS TIMESTAMPTZ)
742                     ORDER BY event_time ASC",
743                    &[&config.mode.as_str(), &symbol, &from_ts, &to_ts],
744                )
745                .map_err(storage_err)?;
746            let mut next_id = next_duckdb_agg_trade_id(&duck)?;
747            for row in rows {
748                duck.execute(
749                    "INSERT INTO raw_agg_trades (
750                        trade_id, mode, symbol, event_time, receive_time, price, qty, is_buyer_maker
751                     ) VALUES (
752                        ?, ?, ?, to_timestamp(? / 1000.0), to_timestamp(? / 1000.0), ?, ?, ?
753                     )",
754                    params![
755                        next_id,
756                        config.mode.as_str(),
757                        row.get::<_, String>(0),
758                        row.get::<_, i64>(1),
759                        row.get::<_, i64>(2),
760                        row.get::<_, f64>(3),
761                        row.get::<_, f64>(4),
762                        row.get::<_, bool>(5),
763                    ],
764                )
765                .map_err(storage_err)?;
766                next_id += 1;
767                agg_trade_rows += 1;
768            }
769        }
770    }
771
772    let snapshot_export_id = next_duckdb_snapshot_export_id(&duck)?;
773    duck.execute(
774        "INSERT INTO snapshot_exports (
775            export_id, created_at, source_backend, source_target, mode, symbols_csv, from_date, to_date,
776            product, interval_name, include_klines, include_liquidations, include_book_tickers, include_agg_trades,
777            clear_duckdb_range, exported_kline_rows, exported_liquidation_rows, exported_book_ticker_rows, exported_agg_trade_rows
778         ) VALUES (
779            ?, CURRENT_TIMESTAMP, 'postgres', ?, ?, ?, CAST(? AS DATE), CAST(? AS DATE),
780            ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?
781         )",
782        params![
783            snapshot_export_id,
784            mask_postgres_url(&config.postgres_url),
785            config.mode.as_str(),
786            config.symbols.join(","),
787            config.from.to_string(),
788            config.to.to_string(),
789            config.product.as_deref(),
790            config.interval_name.as_deref(),
791            config.include_klines,
792            config.include_liquidations,
793            config.include_book_tickers,
794            config.include_agg_trades,
795            config.clear_duckdb_range,
796            kline_rows as i64,
797            liquidation_rows as i64,
798            book_ticker_rows as i64,
799            agg_trade_rows as i64,
800        ],
801    )
802    .map_err(storage_err)?;
803
804    Ok(PostgresToDuckDbSnapshotReport {
805        snapshot_export_id,
806        db_path: db_path.display().to_string(),
807        kline_rows,
808        liquidation_rows,
809        book_ticker_rows,
810        agg_trade_rows,
811    })
812}
813
/// Mirrors a completed backtest run (run header, per-trade rows, realized
/// equity curve) into PostgreSQL.
///
/// Requires `report.run_id` to be set (the run was persisted locally first);
/// errors with `WriteFailedWithContext` otherwise. Re-exports are idempotent:
/// the run row is upserted, then child rows are rebuilt (delete + insert).
///
/// # Errors
/// Returns `StorageError` on connection, schema, or write failure, or when
/// `report.from` is not a valid calendar start date.
pub fn export_backtest_report_to_postgres(
    postgres_url: &str,
    report: &BacktestReport,
) -> Result<PostgresBacktestExportReport, StorageError> {
    let source_run_id = report.run_id.ok_or_else(|| StorageError::WriteFailedWithContext {
        message: "backtest report export requires a persisted run_id".to_string(),
    })?;
    let mut client = connect(postgres_url)?;
    init_schema(&mut client, postgres_url)?;

    // Upsert the run header, then clear its child rows so the re-insert below
    // fully replaces any previous export of this run.
    let export_run_id = upsert_backtest_run(&mut client, report, source_run_id)?;
    client
        .execute(
            "DELETE FROM backtest_trades WHERE export_run_id = $1",
            &[&export_run_id],
        )
        .map_err(storage_err)?;
    client
        .execute(
            "DELETE FROM backtest_equity_points WHERE export_run_id = $1",
            &[&export_run_id],
        )
        .map_err(storage_err)?;

    let mut trade_rows = 0usize;
    for trade in &report.trades {
        insert_backtest_trade_row(&mut client, export_run_id, trade)?;
        trade_rows += 1;
    }

    // Baseline equity point (point_id 0): starting equity at 00:00:00 UTC of
    // the backtest start date, with zero cumulative PnL.
    let start_time = report
        .from
        .and_hms_opt(0, 0, 0)
        .ok_or_else(|| StorageError::WriteFailedWithContext {
            message: format!("invalid backtest start date: {}", report.from),
        })
        .map(|value| Utc.from_utc_datetime(&value))?;
    let mut equity_point_rows = 0usize;
    insert_backtest_equity_point(
        &mut client,
        export_run_id,
        0,
        start_time,
        report.starting_equity,
        0.0,
    )?;
    equity_point_rows += 1;

    // Only closed trades (both exit_time and net_pnl present) contribute
    // equity points, ordered by exit time and tie-broken by trade id so the
    // curve is deterministic.
    // NOTE(review): `trade.trade_id as i64` truncates if trade_id is wider
    // than i64 — confirm its source type.
    let mut realized = report
        .trades
        .iter()
        .filter_map(|trade| Some((trade.exit_time?, trade.net_pnl?, trade.trade_id as i64)))
        .collect::<Vec<_>>();
    realized.sort_by(|left, right| left.0.cmp(&right.0).then(left.2.cmp(&right.2)));

    let mut cumulative_net_pnl = 0.0f64;
    for (index, (exit_time, net_pnl, _)) in realized.into_iter().enumerate() {
        cumulative_net_pnl += net_pnl;
        insert_backtest_equity_point(
            &mut client,
            export_run_id,
            index as i64 + 1,
            exit_time,
            report.starting_equity + cumulative_net_pnl,
            cumulative_net_pnl,
        )?;
        equity_point_rows += 1;
    }

    Ok(PostgresBacktestExportReport {
        export_run_id,
        source_run_id,
        trade_rows,
        equity_point_rows,
    })
}
890
891fn existing_schema_version(client: &mut Client) -> Result<Option<String>, StorageError> {
892    let table_exists = client
893        .query_one(
894            "SELECT EXISTS (
895                SELECT 1 FROM information_schema.tables
896                WHERE table_schema = 'public' AND table_name = 'schema_metadata'
897            )",
898            &[],
899        )
900        .map_err(storage_err)?
901        .get::<_, bool>(0);
902    if !table_exists {
903        return Ok(None);
904    }
905    client
906        .query_opt(
907            "SELECT value FROM schema_metadata WHERE key = 'market_data_schema_version'",
908            &[],
909        )
910        .map_err(storage_err)
911        .map(|row| row.map(|row| row.get(0)))
912}
913
914fn query_count(client: &mut Client, table: &str) -> Result<u64, StorageError> {
915    client
916        .query_one(&format!("SELECT COUNT(*) FROM {table}"), &[])
917        .map_err(storage_err)
918        .map(|row| row.get::<_, i64>(0).max(0) as u64)
919}
920
921fn query_latest_timestamp(
922    client: &mut Client,
923    table: &str,
924    column: &str,
925) -> Result<Option<String>, StorageError> {
926    client
927        .query_one(
928            &format!("SELECT CAST(MAX({column}) AS TEXT) FROM {table}"),
929            &[],
930        )
931        .map_err(storage_err)
932        .map(|row| row.get(0))
933}
934
935fn query_top_symbols(client: &mut Client, table: &str) -> Result<Vec<String>, StorageError> {
936    client
937        .query(
938            &format!(
939                "SELECT symbol, COUNT(*) AS row_count FROM {table} GROUP BY symbol ORDER BY row_count DESC, symbol ASC LIMIT 5"
940            ),
941            &[],
942        )
943        .map_err(storage_err)
944        .map(|rows| {
945            rows.into_iter()
946                .map(|row| format!("{}:{}", row.get::<_, String>(0), row.get::<_, i64>(1)))
947                .collect()
948        })
949}
950
/// Inserts or refreshes the `backtest_runs` header row for this report.
///
/// The natural key is `(source_db_path, source_run_id)`; on conflict every
/// metric column is overwritten from the incoming report and `exported_at`
/// is bumped. Returns the `export_run_id` of the affected row.
///
/// NOTE(review): the bind list below is positional — its order must match
/// the `$1..$28` placeholders exactly; keep both in sync when editing.
fn upsert_backtest_run(
    client: &mut Client,
    report: &BacktestReport,
    source_run_id: i64,
) -> Result<i64, StorageError> {
    // A trade counts as closed once it has a realized net PnL.
    let closed_trades = report
        .trades
        .iter()
        .filter(|trade| trade.net_pnl.is_some())
        .count() as i64;
    // Materialize owned strings up front so the bind slice can borrow them.
    let mode = report.mode.as_str().to_string();
    let template = report.template.slug().to_string();
    let instrument = report.instrument.clone();
    let source_db_path = report.db_path.display().to_string();
    client
        .query_one(
            "INSERT INTO backtest_runs (
                source_db_path, source_run_id, mode, template, instrument, from_date, to_date,
                liquidation_events, book_ticker_events, agg_trade_events, derived_kline_1s_bars,
                trigger_count, closed_trades, open_trades, wins, losses, skipped_triggers,
                starting_equity, ending_equity, net_pnl, observed_win_rate, average_net_pnl,
                configured_expected_value, risk_pct, win_rate_assumption, r_multiple,
                max_entry_slippage_pct, stop_distance_pct
             ) VALUES (
                $1, $2, $3, $4, $5, $6, $7,
                $8, $9, $10, $11, $12, $13, $14, $15, $16, $17,
                $18, $19, $20, $21, $22, $23, $24, $25, $26, $27, $28
             )
             ON CONFLICT (source_db_path, source_run_id) DO UPDATE SET
                exported_at = CURRENT_TIMESTAMP,
                mode = EXCLUDED.mode,
                template = EXCLUDED.template,
                instrument = EXCLUDED.instrument,
                from_date = EXCLUDED.from_date,
                to_date = EXCLUDED.to_date,
                liquidation_events = EXCLUDED.liquidation_events,
                book_ticker_events = EXCLUDED.book_ticker_events,
                agg_trade_events = EXCLUDED.agg_trade_events,
                derived_kline_1s_bars = EXCLUDED.derived_kline_1s_bars,
                trigger_count = EXCLUDED.trigger_count,
                closed_trades = EXCLUDED.closed_trades,
                open_trades = EXCLUDED.open_trades,
                wins = EXCLUDED.wins,
                losses = EXCLUDED.losses,
                skipped_triggers = EXCLUDED.skipped_triggers,
                starting_equity = EXCLUDED.starting_equity,
                ending_equity = EXCLUDED.ending_equity,
                net_pnl = EXCLUDED.net_pnl,
                observed_win_rate = EXCLUDED.observed_win_rate,
                average_net_pnl = EXCLUDED.average_net_pnl,
                configured_expected_value = EXCLUDED.configured_expected_value,
                risk_pct = EXCLUDED.risk_pct,
                win_rate_assumption = EXCLUDED.win_rate_assumption,
                r_multiple = EXCLUDED.r_multiple,
                max_entry_slippage_pct = EXCLUDED.max_entry_slippage_pct,
                stop_distance_pct = EXCLUDED.stop_distance_pct
            RETURNING export_run_id",
            // Positional binds for $1..$28 — order mirrors the column list above.
            &[
                &source_db_path,
                &source_run_id,
                &mode,
                &template,
                &instrument,
                &report.from,
                &report.to,
                &(report.dataset.liquidation_events as i64),
                &(report.dataset.book_ticker_events as i64),
                &(report.dataset.agg_trade_events as i64),
                &(report.dataset.derived_kline_1s_bars as i64),
                &(report.trigger_count as i64),
                &closed_trades,
                &(report.open_trades as i64),
                &(report.wins as i64),
                &(report.losses as i64),
                &(report.skipped_triggers as i64),
                &report.starting_equity,
                &report.ending_equity,
                &report.net_pnl,
                &report.observed_win_rate,
                &report.average_net_pnl,
                &report.configured_expected_value,
                &report.config.risk_pct,
                &report.config.win_rate_assumption,
                &report.config.r_multiple,
                &report.config.max_entry_slippage_pct,
                &report.config.stop_distance_pct,
            ],
        )
        .map_err(|error| StorageError::WriteFailedWithContext {
            message: format!(
                "upsert backtest run failed: source_run_id={} instrument={} template={} message={error}",
                source_run_id,
                report.instrument,
                report.template.slug(),
            ),
        })
        .map(|row| row.get(0))
}
1049
1050fn insert_backtest_trade_row(
1051    client: &mut Client,
1052    export_run_id: i64,
1053    trade: &BacktestTrade,
1054) -> Result<(), StorageError> {
1055    client
1056        .execute(
1057            "INSERT INTO backtest_trades (
1058                export_run_id, trade_id, trigger_time, entry_time, entry_price, stop_price,
1059                take_profit_price, qty, exit_time, exit_price, exit_reason, gross_pnl, fees, net_pnl
1060             ) VALUES (
1061                $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14
1062             )",
1063            &[
1064                &export_run_id,
1065                &(trade.trade_id as i64),
1066                &trade.trigger_time,
1067                &trade.entry_time,
1068                &trade.entry_price,
1069                &trade.stop_price,
1070                &trade.take_profit_price,
1071                &trade.qty,
1072                &trade.exit_time,
1073                &trade.exit_price,
1074                &trade.exit_reason.as_ref().map(|reason| reason.as_str()),
1075                &trade.gross_pnl,
1076                &trade.fees,
1077                &trade.net_pnl,
1078            ],
1079        )
1080        .map(|_| ())
1081        .map_err(|error| StorageError::WriteFailedWithContext {
1082            message: format!(
1083                "insert backtest trade failed: trade_id={} message={error}",
1084                trade.trade_id
1085            ),
1086        })
1087}
1088
1089fn insert_backtest_equity_point(
1090    client: &mut Client,
1091    export_run_id: i64,
1092    point_id: i64,
1093    event_time: chrono::DateTime<Utc>,
1094    equity: f64,
1095    cumulative_net_pnl: f64,
1096) -> Result<(), StorageError> {
1097    client
1098        .execute(
1099            "INSERT INTO backtest_equity_points (
1100                export_run_id, point_id, event_time, equity, cumulative_net_pnl
1101             ) VALUES (
1102                $1, $2, $3, $4, $5
1103             )",
1104            &[&export_run_id, &point_id, &event_time, &equity, &cumulative_net_pnl],
1105        )
1106        .map(|_| ())
1107        .map_err(|error| StorageError::WriteFailedWithContext {
1108            message: format!(
1109                "insert backtest equity point failed: point_id={} message={error}",
1110                point_id
1111            ),
1112        })
1113}
1114
1115fn clear_duckdb_klines(
1116    duck: &DuckConnection,
1117    mode: BinanceMode,
1118    symbol: &str,
1119    product: Option<&str>,
1120    interval_name: Option<&str>,
1121    from_ts: &str,
1122    to_ts: &str,
1123) -> Result<(), StorageError> {
1124    duck.execute(
1125        "DELETE FROM raw_klines
1126         WHERE mode = ?
1127           AND symbol = ?
1128           AND open_time >= CAST(? AS TIMESTAMP)
1129           AND open_time <= CAST(? AS TIMESTAMP)
1130           AND (? IS NULL OR product = ?)
1131           AND (? IS NULL OR interval = ?)",
1132        params![
1133            mode.as_str(),
1134            symbol,
1135            from_ts,
1136            to_ts,
1137            product,
1138            product,
1139            interval_name,
1140            interval_name
1141        ],
1142    )
1143    .map(|_| ())
1144    .map_err(storage_err)
1145}
1146
1147fn clear_duckdb_liquidations(
1148    duck: &DuckConnection,
1149    mode: BinanceMode,
1150    symbol: &str,
1151    from_ts: &str,
1152    to_ts: &str,
1153) -> Result<(), StorageError> {
1154    duck.execute(
1155        "DELETE FROM raw_liquidation_events
1156         WHERE mode = ?
1157           AND symbol = ?
1158           AND event_time >= CAST(? AS TIMESTAMP)
1159           AND event_time <= CAST(? AS TIMESTAMP)",
1160        params![mode.as_str(), symbol, from_ts, to_ts],
1161    )
1162    .map(|_| ())
1163    .map_err(storage_err)
1164}
1165
1166fn clear_duckdb_book_tickers(
1167    duck: &DuckConnection,
1168    mode: BinanceMode,
1169    symbol: &str,
1170    from_ts: &str,
1171    to_ts: &str,
1172) -> Result<(), StorageError> {
1173    duck.execute(
1174        "DELETE FROM raw_book_ticker
1175         WHERE mode = ?
1176           AND symbol = ?
1177           AND event_time >= CAST(? AS TIMESTAMP)
1178           AND event_time <= CAST(? AS TIMESTAMP)",
1179        params![mode.as_str(), symbol, from_ts, to_ts],
1180    )
1181    .map(|_| ())
1182    .map_err(storage_err)
1183}
1184
1185fn clear_duckdb_agg_trades(
1186    duck: &DuckConnection,
1187    mode: BinanceMode,
1188    symbol: &str,
1189    from_ts: &str,
1190    to_ts: &str,
1191) -> Result<(), StorageError> {
1192    duck.execute(
1193        "DELETE FROM raw_agg_trades
1194         WHERE mode = ?
1195           AND symbol = ?
1196           AND event_time >= CAST(? AS TIMESTAMP)
1197           AND event_time <= CAST(? AS TIMESTAMP)",
1198        params![mode.as_str(), symbol, from_ts, to_ts],
1199    )
1200    .map(|_| ())
1201    .map_err(storage_err)
1202}
1203
1204fn next_duckdb_kline_id(connection: &DuckConnection) -> Result<i64, StorageError> {
1205    connection
1206        .prepare("SELECT COALESCE(MAX(kline_id), 0) + 1 FROM raw_klines")
1207        .and_then(|mut statement| statement.query_row([], |row| row.get(0)))
1208        .map_err(storage_err)
1209}
1210
1211fn next_duckdb_liquidation_event_id(connection: &DuckConnection) -> Result<i64, StorageError> {
1212    connection
1213        .prepare("SELECT COALESCE(MAX(event_id), 0) + 1 FROM raw_liquidation_events")
1214        .and_then(|mut statement| statement.query_row([], |row| row.get(0)))
1215        .map_err(storage_err)
1216}
1217
1218fn next_duckdb_book_ticker_id(connection: &DuckConnection) -> Result<i64, StorageError> {
1219    connection
1220        .prepare("SELECT COALESCE(MAX(tick_id), 0) + 1 FROM raw_book_ticker")
1221        .and_then(|mut statement| statement.query_row([], |row| row.get(0)))
1222        .map_err(storage_err)
1223}
1224
1225fn next_duckdb_agg_trade_id(connection: &DuckConnection) -> Result<i64, StorageError> {
1226    connection
1227        .prepare("SELECT COALESCE(MAX(trade_id), 0) + 1 FROM raw_agg_trades")
1228        .and_then(|mut statement| statement.query_row([], |row| row.get(0)))
1229        .map_err(storage_err)
1230}
1231
1232fn next_duckdb_snapshot_export_id(connection: &DuckConnection) -> Result<i64, StorageError> {
1233    connection
1234        .prepare("SELECT COALESCE(MAX(export_id), 0) + 1 FROM snapshot_exports")
1235        .and_then(|mut statement| statement.query_row([], |row| row.get(0)))
1236        .map_err(storage_err)
1237}
1238
/// Masks the credentials section of a connection URL so it is safe to persist
/// in export metadata.
///
/// `scheme://user:pass@host/db` becomes `scheme://***@host/db`; anything that
/// does not parse into a scheme plus an `@`-delimited host collapses to the
/// fully masked placeholder `postgres://***`.
pub fn mask_postgres_url(url: &str) -> String {
    url.split_once("://")
        .and_then(|(scheme, remainder)| {
            // rsplit keeps passwords containing '@' inside the masked section.
            remainder
                .rsplit_once('@')
                .map(|(_, host_part)| format!("{scheme}://***@{host_part}"))
        })
        .unwrap_or_else(|| "postgres://***".to_string())
}
1247
1248fn storage_err(error: impl std::fmt::Display) -> StorageError {
1249    StorageError::WriteFailedWithContext {
1250        message: error.to_string(),
1251    }
1252}