use chrono::{NaiveDate, TimeZone, Utc};
use duckdb::{params, Connection as DuckConnection};
use postgres::{Client, NoTls};

use crate::app::bootstrap::BinanceMode;
use crate::backtest_app::runner::{BacktestReport, BacktestTrade};
use crate::dataset::schema::{init_schema_for_path, MARKET_DATA_SCHEMA_VERSION};
use crate::error::storage_error::StorageError;
use crate::record::coordination::RecorderCoordination;

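/// Schema version stamped into `schema_metadata`; reuses the DuckDB
/// market-data schema version so both backends report the same value.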
pub const POSTGRES_MARKET_DATA_SCHEMA_VERSION: &str = MARKET_DATA_SCHEMA_VERSION;

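/// Idempotent DDL for the PostgreSQL market-data store: raw capture tables
/// (liquidations, book tickers, aggregated trades, klines) deduplicated via
/// natural-key unique indexes, plus the backtest export tables.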
const POSTGRES_MARKET_DATA_SCHEMA_SQL: &str = r#"
CREATE TABLE IF NOT EXISTS schema_metadata (
    key TEXT PRIMARY KEY,
    value TEXT NOT NULL,
    updated_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP
);

CREATE TABLE IF NOT EXISTS raw_liquidation_events (
    event_id BIGSERIAL PRIMARY KEY,
    mode TEXT NOT NULL,
    product TEXT NOT NULL,
    symbol TEXT NOT NULL,
    event_time TIMESTAMPTZ NOT NULL,
    receive_time TIMESTAMPTZ NOT NULL,
    force_side TEXT NOT NULL,
    price DOUBLE PRECISION NOT NULL,
    qty DOUBLE PRECISION NOT NULL,
    notional DOUBLE PRECISION NOT NULL,
    raw_payload TEXT NOT NULL
);

CREATE UNIQUE INDEX IF NOT EXISTS raw_liquidation_events_natural_idx
ON raw_liquidation_events (mode, product, symbol, event_time, force_side, price, qty);

CREATE TABLE IF NOT EXISTS raw_book_ticker (
    tick_id BIGSERIAL PRIMARY KEY,
    mode TEXT NOT NULL,
    symbol TEXT NOT NULL,
    event_time TIMESTAMPTZ NOT NULL,
    receive_time TIMESTAMPTZ NOT NULL,
    bid DOUBLE PRECISION NOT NULL,
    bid_qty DOUBLE PRECISION NOT NULL,
    ask DOUBLE PRECISION NOT NULL,
    ask_qty DOUBLE PRECISION NOT NULL
);

CREATE UNIQUE INDEX IF NOT EXISTS raw_book_ticker_natural_idx
ON raw_book_ticker (mode, symbol, event_time, bid, ask, bid_qty, ask_qty);

CREATE TABLE IF NOT EXISTS raw_agg_trades (
    trade_id BIGSERIAL PRIMARY KEY,
    mode TEXT NOT NULL,
    symbol TEXT NOT NULL,
    event_time TIMESTAMPTZ NOT NULL,
    receive_time TIMESTAMPTZ NOT NULL,
    price DOUBLE PRECISION NOT NULL,
    qty DOUBLE PRECISION NOT NULL,
    is_buyer_maker BOOLEAN NOT NULL
);

CREATE UNIQUE INDEX IF NOT EXISTS raw_agg_trades_natural_idx
ON raw_agg_trades (mode, symbol, event_time, price, qty, is_buyer_maker);

CREATE TABLE IF NOT EXISTS raw_klines (
    kline_id BIGSERIAL PRIMARY KEY,
    mode TEXT NOT NULL,
    product TEXT NOT NULL,
    symbol TEXT NOT NULL,
    interval_name TEXT NOT NULL,
    open_time TIMESTAMPTZ NOT NULL,
    close_time TIMESTAMPTZ NOT NULL,
    open DOUBLE PRECISION NOT NULL,
    high DOUBLE PRECISION NOT NULL,
    low DOUBLE PRECISION NOT NULL,
    close DOUBLE PRECISION NOT NULL,
    volume DOUBLE PRECISION NOT NULL,
    quote_volume DOUBLE PRECISION NOT NULL,
    trade_count BIGINT NOT NULL,
    taker_buy_base_volume DOUBLE PRECISION,
    taker_buy_quote_volume DOUBLE PRECISION,
    raw_payload TEXT NOT NULL
);

CREATE UNIQUE INDEX IF NOT EXISTS raw_klines_natural_idx
ON raw_klines (mode, product, symbol, interval_name, open_time);

CREATE TABLE IF NOT EXISTS backtest_runs (
    export_run_id BIGSERIAL PRIMARY KEY,
    source_db_path TEXT NOT NULL,
    source_run_id BIGINT NOT NULL,
    exported_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
    mode TEXT NOT NULL,
    template TEXT NOT NULL,
    instrument TEXT NOT NULL,
    from_date DATE NOT NULL,
    to_date DATE NOT NULL,
    liquidation_events BIGINT NOT NULL,
    book_ticker_events BIGINT NOT NULL,
    agg_trade_events BIGINT NOT NULL,
    derived_kline_1s_bars BIGINT NOT NULL,
    trigger_count BIGINT NOT NULL,
    closed_trades BIGINT NOT NULL,
    open_trades BIGINT NOT NULL,
    wins BIGINT NOT NULL,
    losses BIGINT NOT NULL,
    skipped_triggers BIGINT NOT NULL,
    starting_equity DOUBLE PRECISION NOT NULL,
    ending_equity DOUBLE PRECISION NOT NULL,
    net_pnl DOUBLE PRECISION NOT NULL,
    observed_win_rate DOUBLE PRECISION NOT NULL,
    average_net_pnl DOUBLE PRECISION NOT NULL,
    configured_expected_value DOUBLE PRECISION NOT NULL,
    risk_pct DOUBLE PRECISION NOT NULL,
    win_rate_assumption DOUBLE PRECISION NOT NULL,
    r_multiple DOUBLE PRECISION NOT NULL,
    max_entry_slippage_pct DOUBLE PRECISION NOT NULL,
    stop_distance_pct DOUBLE PRECISION NOT NULL,
    UNIQUE (source_db_path, source_run_id)
);

CREATE INDEX IF NOT EXISTS backtest_runs_lookup_idx
ON backtest_runs (mode, instrument, template, export_run_id DESC);

CREATE TABLE IF NOT EXISTS backtest_trades (
    export_run_id BIGINT NOT NULL REFERENCES backtest_runs(export_run_id) ON DELETE CASCADE,
    trade_id BIGINT NOT NULL,
    trigger_time TIMESTAMPTZ NOT NULL,
    entry_time TIMESTAMPTZ NOT NULL,
    entry_price DOUBLE PRECISION NOT NULL,
    stop_price DOUBLE PRECISION NOT NULL,
    take_profit_price DOUBLE PRECISION NOT NULL,
    qty DOUBLE PRECISION NOT NULL,
    exit_time TIMESTAMPTZ,
    exit_price DOUBLE PRECISION,
    exit_reason TEXT,
    gross_pnl DOUBLE PRECISION,
    fees DOUBLE PRECISION,
    net_pnl DOUBLE PRECISION,
    PRIMARY KEY (export_run_id, trade_id)
);

CREATE INDEX IF NOT EXISTS backtest_trades_exit_time_idx
ON backtest_trades (export_run_id, exit_time);

CREATE TABLE IF NOT EXISTS backtest_equity_points (
    export_run_id BIGINT NOT NULL REFERENCES backtest_runs(export_run_id) ON DELETE CASCADE,
    point_id BIGINT NOT NULL,
    event_time TIMESTAMPTZ NOT NULL,
    equity DOUBLE PRECISION NOT NULL,
    cumulative_net_pnl DOUBLE PRECISION NOT NULL,
    PRIMARY KEY (export_run_id, point_id)
);

CREATE INDEX IF NOT EXISTS backtest_equity_points_time_idx
ON backtest_equity_points (export_run_id, event_time);
"#;

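/// Storage backend used for market-data collection: embedded DuckDB or an
/// external PostgreSQL instance.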
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CollectorStorageBackend {
    DuckDb,
    Postgres,
}

impl CollectorStorageBackend {
    pub fn as_str(self) -> &'static str {
        match self {
            Self::DuckDb => "duckdb",
            Self::Postgres => "postgres",
        }
    }
}

#[derive(Debug, Clone, PartialEq)]
pub struct PostgresKlineRecord {
    pub mode: String,
    pub product: String,
    pub symbol: String,
    pub interval_name: String,
    pub open_time_ms: i64,
    pub close_time_ms: i64,
    pub open: f64,
    pub high: f64,
    pub low: f64,
    pub close: f64,
    pub volume: f64,
    pub quote_volume: f64,
    pub trade_count: i64,
    pub taker_buy_base_volume: Option<f64>,
    pub taker_buy_quote_volume: Option<f64>,
    pub raw_payload: String,
}

#[derive(Debug, Clone, PartialEq)]
pub struct PostgresLiquidationRecord {
    pub mode: String,
    pub product: String,
    pub symbol: String,
    pub event_time_ms: i64,
    pub receive_time_ms: i64,
    pub force_side: String,
    pub price: f64,
    pub qty: f64,
    pub notional: f64,
    pub raw_payload: String,
}

#[derive(Debug, Clone, PartialEq)]
pub struct PostgresBookTickerRecord {
    pub mode: String,
    pub symbol: String,
    pub event_time_ms: i64,
    pub receive_time_ms: i64,
    pub bid: f64,
    pub bid_qty: f64,
    pub ask: f64,
    pub ask_qty: f64,
}

#[derive(Debug, Clone, PartialEq)]
pub struct PostgresAggTradeRecord {
    pub mode: String,
    pub symbol: String,
    pub event_time_ms: i64,
    pub receive_time_ms: i64,
    pub price: f64,
    pub qty: f64,
    pub is_buyer_maker: bool,
}

#[derive(Debug, Clone, PartialEq)]
pub struct PostgresKlineSummaryRow {
    pub product: String,
    pub symbol: String,
    pub interval_name: String,
    pub row_count: i64,
    pub min_time: Option<String>,
    pub max_time: Option<String>,
}

#[derive(Debug, Clone, PartialEq)]
pub struct PostgresLiquidationSummaryRow {
    pub symbol: String,
    pub row_count: i64,
    pub min_time: Option<String>,
    pub max_time: Option<String>,
}

#[derive(Debug, Clone, PartialEq)]
pub struct PostgresSummary {
    pub schema_version: String,
    pub previous_version: Option<String>,
    pub klines: Vec<PostgresKlineSummaryRow>,
    pub liquidations: Vec<PostgresLiquidationSummaryRow>,
}

#[derive(Debug, Clone, PartialEq)]
pub struct PostgresToDuckDbSnapshotConfig {
    pub postgres_url: String,
    pub mode: BinanceMode,
    pub base_dir: String,
    pub symbols: Vec<String>,
    pub from: NaiveDate,
    pub to: NaiveDate,
    pub product: Option<String>,
    pub interval_name: Option<String>,
    pub include_klines: bool,
    pub include_liquidations: bool,
    pub include_book_tickers: bool,
    pub include_agg_trades: bool,
    pub clear_duckdb_range: bool,
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PostgresToDuckDbSnapshotReport {
    pub snapshot_export_id: i64,
    pub db_path: String,
    pub kline_rows: usize,
    pub liquidation_rows: usize,
    pub book_ticker_rows: usize,
    pub agg_trade_rows: usize,
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PostgresBacktestExportReport {
    pub export_run_id: i64,
    pub source_run_id: i64,
    pub trade_rows: usize,
    pub equity_point_rows: usize,
}

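/// Resolves the PostgreSQL connection URL from `SANDBOX_QUANT_POSTGRES_URL`,
/// falling back to `DATABASE_URL`.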
pub fn postgres_url_from_env() -> Result<String, StorageError> {
    std::env::var("SANDBOX_QUANT_POSTGRES_URL")
        .or_else(|_| std::env::var("DATABASE_URL"))
        .map_err(|_| StorageError::WriteFailedWithContext {
            message: "missing PostgreSQL URL; set SANDBOX_QUANT_POSTGRES_URL or DATABASE_URL"
                .to_string(),
        })
}

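/// Opens a plaintext (`NoTls`) connection; any credentials in the URL are
/// masked before the URL is embedded in an error.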
pub fn connect(url: &str) -> Result<Client, StorageError> {
    Client::connect(url, NoTls).map_err(|error| StorageError::DatabaseInitFailed {
        path: mask_postgres_url(url),
        message: error.to_string(),
    })
}

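/// Applies the market-data DDL and upserts the current schema version into
/// `schema_metadata`, returning the previously stored version, if any.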
pub fn init_schema(client: &mut Client, url: &str) -> Result<Option<String>, StorageError> {
    let previous_version = existing_schema_version(client)?;
    client
        .batch_execute(POSTGRES_MARKET_DATA_SCHEMA_SQL)
        .map_err(|error| StorageError::DatabaseInitFailed {
            path: mask_postgres_url(url),
            message: error.to_string(),
        })?;
    client
        .execute(
            "INSERT INTO schema_metadata (key, value, updated_at)
             VALUES ('market_data_schema_version', $1, CURRENT_TIMESTAMP)
             ON CONFLICT (key) DO UPDATE
             SET value = EXCLUDED.value, updated_at = EXCLUDED.updated_at",
            &[&POSTGRES_MARKET_DATA_SCHEMA_VERSION],
        )
        .map_err(|error| StorageError::DatabaseInitFailed {
            path: mask_postgres_url(url),
            message: error.to_string(),
        })?;
    Ok(previous_version)
}

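/// Inserts a single kline row, converting millisecond epochs to `TIMESTAMPTZ`;
/// rows that collide on the natural key are silently skipped.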
pub fn insert_kline(client: &mut Client, record: &PostgresKlineRecord) -> Result<(), StorageError> {
    client
        .execute(
            "INSERT INTO raw_klines (
                 mode, product, symbol, interval_name, open_time, close_time,
                 open, high, low, close, volume, quote_volume, trade_count,
                 taker_buy_base_volume, taker_buy_quote_volume, raw_payload
             ) VALUES (
                 $1, $2, $3, $4, to_timestamp($5::BIGINT / 1000.0), to_timestamp($6::BIGINT / 1000.0),
                 $7, $8, $9, $10, $11, $12, $13, $14, $15, $16
             )
             ON CONFLICT (mode, product, symbol, interval_name, open_time) DO NOTHING",
            &[
                &record.mode,
                &record.product,
                &record.symbol,
                &record.interval_name,
                &record.open_time_ms,
                &record.close_time_ms,
                &record.open,
                &record.high,
                &record.low,
                &record.close,
                &record.volume,
                &record.quote_volume,
                &record.trade_count,
                &record.taker_buy_base_volume,
                &record.taker_buy_quote_volume,
                &record.raw_payload,
            ],
        )
        .map(|_| ())
        .map_err(storage_err)
}

pub fn insert_liquidation(
    client: &mut Client,
    record: &PostgresLiquidationRecord,
) -> Result<(), StorageError> {
    client
        .execute(
            "INSERT INTO raw_liquidation_events (
                 mode, product, symbol, event_time, receive_time, force_side, price, qty, notional, raw_payload
             ) VALUES (
                 $1, $2, $3, to_timestamp($4::BIGINT / 1000.0), to_timestamp($5::BIGINT / 1000.0), $6, $7, $8, $9, $10
             )
             ON CONFLICT (mode, product, symbol, event_time, force_side, price, qty) DO NOTHING",
            &[
                &record.mode,
                &record.product,
                &record.symbol,
                &record.event_time_ms,
                &record.receive_time_ms,
                &record.force_side,
                &record.price,
                &record.qty,
                &record.notional,
                &record.raw_payload,
            ],
        )
        .map(|_| ())
        .map_err(storage_err)
}

pub fn insert_book_ticker(
    client: &mut Client,
    record: &PostgresBookTickerRecord,
) -> Result<(), StorageError> {
    client
        .execute(
            "INSERT INTO raw_book_ticker (
                 mode, symbol, event_time, receive_time, bid, bid_qty, ask, ask_qty
             ) VALUES (
                 $1, $2, to_timestamp($3::BIGINT / 1000.0), to_timestamp($4::BIGINT / 1000.0), $5, $6, $7, $8
             )
             ON CONFLICT (mode, symbol, event_time, bid, ask, bid_qty, ask_qty) DO NOTHING",
            &[
                &record.mode,
                &record.symbol,
                &record.event_time_ms,
                &record.receive_time_ms,
                &record.bid,
                &record.bid_qty,
                &record.ask,
                &record.ask_qty,
            ],
        )
        .map(|_| ())
        .map_err(storage_err)
}

pub fn insert_agg_trade(
    client: &mut Client,
    record: &PostgresAggTradeRecord,
) -> Result<(), StorageError> {
    client
        .execute(
            "INSERT INTO raw_agg_trades (
                 mode, symbol, event_time, receive_time, price, qty, is_buyer_maker
             ) VALUES (
                 $1, $2, to_timestamp($3::BIGINT / 1000.0), to_timestamp($4::BIGINT / 1000.0), $5, $6, $7
             )
             ON CONFLICT (mode, symbol, event_time, price, qty, is_buyer_maker) DO NOTHING",
            &[
                &record.mode,
                &record.symbol,
                &record.event_time_ms,
                &record.receive_time_ms,
                &record.price,
                &record.qty,
                &record.is_buyer_maker,
            ],
        )
        .map(|_| ())
        .map_err(storage_err)
}

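/// Connects, ensures the schema exists, and aggregates row counts, latest
/// event times, and top symbols into `RecorderMetrics`.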
pub fn metrics_for_postgres_url(
    url: &str,
) -> Result<crate::dataset::types::RecorderMetrics, StorageError> {
    let mut client = connect(url)?;
    let _ = init_schema(&mut client, url)?;
    Ok(crate::dataset::types::RecorderMetrics {
        liquidation_events: query_count(&mut client, "raw_liquidation_events")?,
        book_ticker_events: query_count(&mut client, "raw_book_ticker")?,
        agg_trade_events: query_count(&mut client, "raw_agg_trades")?,
        derived_kline_1s_bars: 0,
        schema_version: existing_schema_version(&mut client)?,
        last_liquidation_event_time: query_latest_timestamp(
            &mut client,
            "raw_liquidation_events",
            "event_time",
        )?,
        last_book_ticker_event_time: query_latest_timestamp(
            &mut client,
            "raw_book_ticker",
            "event_time",
        )?,
        last_agg_trade_event_time: query_latest_timestamp(
            &mut client,
            "raw_agg_trades",
            "event_time",
        )?,
        top_liquidation_symbols: query_top_symbols(&mut client, "raw_liquidation_events")?,
        top_book_ticker_symbols: query_top_symbols(&mut client, "raw_book_ticker")?,
        top_agg_trade_symbols: query_top_symbols(&mut client, "raw_agg_trades")?,
    })
}

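/// Summarises stored klines (per product/symbol/interval) and liquidation
/// events (per symbol), together with the stored and previous schema versions.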
pub fn load_summary(
    client: &mut Client,
    previous_version: Option<String>,
) -> Result<PostgresSummary, StorageError> {
    let schema_version = existing_schema_version(client)?.unwrap_or_else(|| "missing".to_string());

    let klines = client
        .query(
            "SELECT product, symbol, interval_name, COUNT(*) AS row_count,
                    CAST(MIN(open_time) AS TEXT), CAST(MAX(close_time) AS TEXT)
             FROM raw_klines
             GROUP BY product, symbol, interval_name
             ORDER BY product, symbol, interval_name",
            &[],
        )
        .map_err(storage_err)?
        .into_iter()
        .map(|row| PostgresKlineSummaryRow {
            product: row.get(0),
            symbol: row.get(1),
            interval_name: row.get(2),
            row_count: row.get(3),
            min_time: row.get(4),
            max_time: row.get(5),
        })
        .collect();

    let liquidations = client
        .query(
            "SELECT symbol, COUNT(*) AS row_count,
                    CAST(MIN(event_time) AS TEXT), CAST(MAX(event_time) AS TEXT)
             FROM raw_liquidation_events
             GROUP BY symbol
             ORDER BY symbol",
            &[],
        )
        .map_err(storage_err)?
        .into_iter()
        .map(|row| PostgresLiquidationSummaryRow {
            symbol: row.get(0),
            row_count: row.get(1),
            min_time: row.get(2),
            max_time: row.get(3),
        })
        .collect();

    Ok(PostgresSummary {
        schema_version,
        previous_version,
        klines,
        liquidations,
    })
}

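/// Copies the selected raw-data ranges from PostgreSQL into the per-mode
/// DuckDB recorder database, optionally clearing the target range first, and
/// records an audit row in `snapshot_exports`.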
pub fn export_snapshot_to_duckdb(
    config: &PostgresToDuckDbSnapshotConfig,
) -> Result<PostgresToDuckDbSnapshotReport, StorageError> {
    let mut client = connect(&config.postgres_url)?;
    init_schema(&mut client, &config.postgres_url)?;

    let db_path = RecorderCoordination::new(config.base_dir.clone()).db_path(config.mode);
    init_schema_for_path(&db_path)?;
    let duck =
        DuckConnection::open(&db_path).map_err(|error| StorageError::DatabaseInitFailed {
            path: db_path.display().to_string(),
            message: error.to_string(),
        })?;

    let from_ts = format!("{} 00:00:00", config.from);
    let to_ts = format!("{} 23:59:59", config.to);

    let mut kline_rows = 0usize;
    let mut liquidation_rows = 0usize;
    let mut book_ticker_rows = 0usize;
    let mut agg_trade_rows = 0usize;

    for symbol in &config.symbols {
        if config.clear_duckdb_range {
            if config.include_klines {
                clear_duckdb_klines(
                    &duck,
                    config.mode,
                    symbol,
                    config.product.as_deref(),
                    config.interval_name.as_deref(),
                    &from_ts,
                    &to_ts,
                )?;
            }
            if config.include_liquidations {
                clear_duckdb_liquidations(&duck, config.mode, symbol, &from_ts, &to_ts)?;
            }
            if config.include_book_tickers {
                clear_duckdb_book_tickers(&duck, config.mode, symbol, &from_ts, &to_ts)?;
            }
            if config.include_agg_trades {
                clear_duckdb_agg_trades(&duck, config.mode, symbol, &from_ts, &to_ts)?;
            }
        }

        if config.include_klines {
            let rows = client
                .query(
                    "SELECT product, symbol, interval_name,
                            (EXTRACT(EPOCH FROM open_time) * 1000)::BIGINT AS open_time_ms,
                            (EXTRACT(EPOCH FROM close_time) * 1000)::BIGINT AS close_time_ms,
                            open, high, low, close, volume, quote_volume, trade_count,
                            taker_buy_base_volume, taker_buy_quote_volume, raw_payload
                     FROM raw_klines
                     WHERE mode = $1
                       AND symbol = $2
                       AND open_time >= CAST($3::TEXT AS TIMESTAMPTZ)
                       AND open_time <= CAST($4::TEXT AS TIMESTAMPTZ)
                       AND ($5::TEXT IS NULL OR product = $5)
                       AND ($6::TEXT IS NULL OR interval_name = $6)
                     ORDER BY open_time ASC",
                    &[
                        &config.mode.as_str(),
                        &symbol,
                        &from_ts,
                        &to_ts,
                        &config.product,
                        &config.interval_name,
                    ],
                )
                .map_err(storage_err)?;

            let mut next_id = next_duckdb_kline_id(&duck)?;
            for row in rows {
                duck.execute(
                    "INSERT INTO raw_klines (
                         kline_id, mode, product, symbol, interval, open_time, close_time,
                         open, high, low, close, volume, quote_volume, trade_count,
                         taker_buy_base_volume, taker_buy_quote_volume, raw_payload
                     ) VALUES (
                         ?, ?, ?, ?, ?, to_timestamp(? / 1000.0), to_timestamp(? / 1000.0),
                         ?, ?, ?, ?, ?, ?, ?, ?, ?, ?
                     )",
                    params![
                        next_id,
                        config.mode.as_str(),
                        row.get::<_, String>(0),
                        row.get::<_, String>(1),
                        row.get::<_, String>(2),
                        row.get::<_, i64>(3),
                        row.get::<_, i64>(4),
                        row.get::<_, f64>(5),
                        row.get::<_, f64>(6),
                        row.get::<_, f64>(7),
                        row.get::<_, f64>(8),
                        row.get::<_, f64>(9),
                        row.get::<_, f64>(10),
                        row.get::<_, i64>(11),
                        row.get::<_, Option<f64>>(12),
                        row.get::<_, Option<f64>>(13),
                        row.get::<_, String>(14),
                    ],
                )
                .map_err(storage_err)?;
                next_id += 1;
                kline_rows += 1;
            }
        }

        if config.include_liquidations {
            let rows = client
                .query(
                    "SELECT symbol,
                            (EXTRACT(EPOCH FROM event_time) * 1000)::BIGINT AS event_time_ms,
                            (EXTRACT(EPOCH FROM receive_time) * 1000)::BIGINT AS receive_time_ms,
                            force_side, price, qty, notional, raw_payload
                     FROM raw_liquidation_events
                     WHERE mode = $1
                       AND symbol = $2
                       AND event_time >= CAST($3::TEXT AS TIMESTAMPTZ)
                       AND event_time <= CAST($4::TEXT AS TIMESTAMPTZ)
                     ORDER BY event_time ASC",
                    &[&config.mode.as_str(), &symbol, &from_ts, &to_ts],
                )
                .map_err(storage_err)?;
            let mut next_id = next_duckdb_liquidation_event_id(&duck)?;
            for row in rows {
                duck.execute(
                    "INSERT INTO raw_liquidation_events (
                         event_id, mode, symbol, event_time, receive_time, force_side, price, qty, notional, raw_payload
                     ) VALUES (
                         ?, ?, ?, to_timestamp(? / 1000.0), to_timestamp(? / 1000.0), ?, ?, ?, ?, ?
                     )",
                    params![
                        next_id,
                        config.mode.as_str(),
                        row.get::<_, String>(0),
                        row.get::<_, i64>(1),
                        row.get::<_, i64>(2),
                        row.get::<_, String>(3),
                        row.get::<_, f64>(4),
                        row.get::<_, f64>(5),
                        row.get::<_, f64>(6),
                        row.get::<_, String>(7),
                    ],
                )
                .map_err(storage_err)?;
                next_id += 1;
                liquidation_rows += 1;
            }
        }

        if config.include_book_tickers {
            let rows = client
                .query(
                    "SELECT symbol,
                            (EXTRACT(EPOCH FROM event_time) * 1000)::BIGINT AS event_time_ms,
                            (EXTRACT(EPOCH FROM receive_time) * 1000)::BIGINT AS receive_time_ms,
                            bid, bid_qty, ask, ask_qty
                     FROM raw_book_ticker
                     WHERE mode = $1
                       AND symbol = $2
                       AND event_time >= CAST($3::TEXT AS TIMESTAMPTZ)
                       AND event_time <= CAST($4::TEXT AS TIMESTAMPTZ)
                     ORDER BY event_time ASC",
                    &[&config.mode.as_str(), &symbol, &from_ts, &to_ts],
                )
                .map_err(storage_err)?;
            let mut next_id = next_duckdb_book_ticker_id(&duck)?;
            for row in rows {
                duck.execute(
                    "INSERT INTO raw_book_ticker (
                         tick_id, mode, symbol, event_time, receive_time, bid, bid_qty, ask, ask_qty
                     ) VALUES (
                         ?, ?, ?, to_timestamp(? / 1000.0), to_timestamp(? / 1000.0), ?, ?, ?, ?
                     )",
                    params![
                        next_id,
                        config.mode.as_str(),
                        row.get::<_, String>(0),
                        row.get::<_, i64>(1),
                        row.get::<_, i64>(2),
                        row.get::<_, f64>(3),
                        row.get::<_, f64>(4),
                        row.get::<_, f64>(5),
                        row.get::<_, f64>(6),
                    ],
                )
                .map_err(storage_err)?;
                next_id += 1;
                book_ticker_rows += 1;
            }
        }

        if config.include_agg_trades {
            let rows = client
                .query(
                    "SELECT symbol,
                            (EXTRACT(EPOCH FROM event_time) * 1000)::BIGINT AS event_time_ms,
                            (EXTRACT(EPOCH FROM receive_time) * 1000)::BIGINT AS receive_time_ms,
                            price, qty, is_buyer_maker
                     FROM raw_agg_trades
                     WHERE mode = $1
                       AND symbol = $2
                       AND event_time >= CAST($3::TEXT AS TIMESTAMPTZ)
                       AND event_time <= CAST($4::TEXT AS TIMESTAMPTZ)
                     ORDER BY event_time ASC",
                    &[&config.mode.as_str(), &symbol, &from_ts, &to_ts],
                )
                .map_err(storage_err)?;
            let mut next_id = next_duckdb_agg_trade_id(&duck)?;
            for row in rows {
                duck.execute(
                    "INSERT INTO raw_agg_trades (
                         trade_id, mode, symbol, event_time, receive_time, price, qty, is_buyer_maker
                     ) VALUES (
                         ?, ?, ?, to_timestamp(? / 1000.0), to_timestamp(? / 1000.0), ?, ?, ?
                     )",
                    params![
                        next_id,
                        config.mode.as_str(),
                        row.get::<_, String>(0),
                        row.get::<_, i64>(1),
                        row.get::<_, i64>(2),
                        row.get::<_, f64>(3),
                        row.get::<_, f64>(4),
                        row.get::<_, bool>(5),
                    ],
                )
                .map_err(storage_err)?;
                next_id += 1;
                agg_trade_rows += 1;
            }
        }
    }

    let snapshot_export_id = next_duckdb_snapshot_export_id(&duck)?;
    duck.execute(
        "INSERT INTO snapshot_exports (
             export_id, created_at, source_backend, source_target, mode, symbols_csv, from_date, to_date,
             product, interval_name, include_klines, include_liquidations, include_book_tickers, include_agg_trades,
             clear_duckdb_range, exported_kline_rows, exported_liquidation_rows, exported_book_ticker_rows, exported_agg_trade_rows
         ) VALUES (
             ?, CURRENT_TIMESTAMP, 'postgres', ?, ?, ?, CAST(? AS DATE), CAST(? AS DATE),
             ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?
         )",
        params![
            snapshot_export_id,
            mask_postgres_url(&config.postgres_url),
            config.mode.as_str(),
            config.symbols.join(","),
            config.from.to_string(),
            config.to.to_string(),
            config.product.as_deref(),
            config.interval_name.as_deref(),
            config.include_klines,
            config.include_liquidations,
            config.include_book_tickers,
            config.include_agg_trades,
            config.clear_duckdb_range,
            kline_rows as i64,
            liquidation_rows as i64,
            book_ticker_rows as i64,
            agg_trade_rows as i64,
        ],
    )
    .map_err(storage_err)?;

    Ok(PostgresToDuckDbSnapshotReport {
        snapshot_export_id,
        db_path: db_path.display().to_string(),
        kline_rows,
        liquidation_rows,
        book_ticker_rows,
        agg_trade_rows,
    })
}

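/// Upserts a persisted backtest run into PostgreSQL and rebuilds its trade and
/// equity-curve rows, so re-exporting the same run replaces earlier child rows.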
pub fn export_backtest_report_to_postgres(
    postgres_url: &str,
    report: &BacktestReport,
) -> Result<PostgresBacktestExportReport, StorageError> {
    let source_run_id = report.run_id.ok_or_else(|| StorageError::WriteFailedWithContext {
        message: "backtest report export requires a persisted run_id".to_string(),
    })?;
    let mut client = connect(postgres_url)?;
    init_schema(&mut client, postgres_url)?;

    let export_run_id = upsert_backtest_run(&mut client, report, source_run_id)?;
    client
        .execute(
            "DELETE FROM backtest_trades WHERE export_run_id = $1",
            &[&export_run_id],
        )
        .map_err(storage_err)?;
    client
        .execute(
            "DELETE FROM backtest_equity_points WHERE export_run_id = $1",
            &[&export_run_id],
        )
        .map_err(storage_err)?;

    let mut trade_rows = 0usize;
    for trade in &report.trades {
        insert_backtest_trade_row(&mut client, export_run_id, trade)?;
        trade_rows += 1;
    }

    let start_time = report
        .from
        .and_hms_opt(0, 0, 0)
        .ok_or_else(|| StorageError::WriteFailedWithContext {
            message: format!("invalid backtest start date: {}", report.from),
        })
        .map(|value| Utc.from_utc_datetime(&value))?;
    let mut equity_point_rows = 0usize;
    insert_backtest_equity_point(
        &mut client,
        export_run_id,
        0,
        start_time,
        report.starting_equity,
        0.0,
    )?;
    equity_point_rows += 1;

    let mut realized = report
        .trades
        .iter()
        .filter_map(|trade| Some((trade.exit_time?, trade.net_pnl?, trade.trade_id as i64)))
        .collect::<Vec<_>>();
    realized.sort_by(|left, right| left.0.cmp(&right.0).then(left.2.cmp(&right.2)));

    let mut cumulative_net_pnl = 0.0f64;
    for (index, (exit_time, net_pnl, _)) in realized.into_iter().enumerate() {
        cumulative_net_pnl += net_pnl;
        insert_backtest_equity_point(
            &mut client,
            export_run_id,
            index as i64 + 1,
            exit_time,
            report.starting_equity + cumulative_net_pnl,
            cumulative_net_pnl,
        )?;
        equity_point_rows += 1;
    }

    Ok(PostgresBacktestExportReport {
        export_run_id,
        source_run_id,
        trade_rows,
        equity_point_rows,
    })
}

fn existing_schema_version(client: &mut Client) -> Result<Option<String>, StorageError> {
    let table_exists = client
        .query_one(
            "SELECT EXISTS (
                 SELECT 1 FROM information_schema.tables
                 WHERE table_schema = 'public' AND table_name = 'schema_metadata'
             )",
            &[],
        )
        .map_err(storage_err)?
        .get::<_, bool>(0);
    if !table_exists {
        return Ok(None);
    }
    client
        .query_opt(
            "SELECT value FROM schema_metadata WHERE key = 'market_data_schema_version'",
            &[],
        )
        .map_err(storage_err)
        .map(|row| row.map(|row| row.get(0)))
}

fn query_count(client: &mut Client, table: &str) -> Result<u64, StorageError> {
    client
        .query_one(&format!("SELECT COUNT(*) FROM {table}"), &[])
        .map_err(storage_err)
        .map(|row| row.get::<_, i64>(0).max(0) as u64)
}

fn query_latest_timestamp(
    client: &mut Client,
    table: &str,
    column: &str,
) -> Result<Option<String>, StorageError> {
    client
        .query_one(
            &format!("SELECT CAST(MAX({column}) AS TEXT) FROM {table}"),
            &[],
        )
        .map_err(storage_err)
        .map(|row| row.get(0))
}

fn query_top_symbols(client: &mut Client, table: &str) -> Result<Vec<String>, StorageError> {
    client
        .query(
            &format!(
                "SELECT symbol, COUNT(*) AS row_count FROM {table} GROUP BY symbol ORDER BY row_count DESC, symbol ASC LIMIT 5"
            ),
            &[],
        )
        .map_err(storage_err)
        .map(|rows| {
            rows.into_iter()
                .map(|row| format!("{}:{}", row.get::<_, String>(0), row.get::<_, i64>(1)))
                .collect()
        })
}

fn upsert_backtest_run(
    client: &mut Client,
    report: &BacktestReport,
    source_run_id: i64,
) -> Result<i64, StorageError> {
    let closed_trades = report
        .trades
        .iter()
        .filter(|trade| trade.net_pnl.is_some())
        .count() as i64;
    let mode = report.mode.as_str().to_string();
    let template = report.template.slug().to_string();
    let instrument = report.instrument.clone();
    let source_db_path = report.db_path.display().to_string();
    client
        .query_one(
            "INSERT INTO backtest_runs (
                 source_db_path, source_run_id, mode, template, instrument, from_date, to_date,
                 liquidation_events, book_ticker_events, agg_trade_events, derived_kline_1s_bars,
                 trigger_count, closed_trades, open_trades, wins, losses, skipped_triggers,
                 starting_equity, ending_equity, net_pnl, observed_win_rate, average_net_pnl,
                 configured_expected_value, risk_pct, win_rate_assumption, r_multiple,
                 max_entry_slippage_pct, stop_distance_pct
             ) VALUES (
                 $1, $2, $3, $4, $5, $6, $7,
                 $8, $9, $10, $11, $12, $13, $14, $15, $16, $17,
                 $18, $19, $20, $21, $22, $23, $24, $25, $26, $27, $28
             )
             ON CONFLICT (source_db_path, source_run_id) DO UPDATE SET
                 exported_at = CURRENT_TIMESTAMP,
                 mode = EXCLUDED.mode,
                 template = EXCLUDED.template,
                 instrument = EXCLUDED.instrument,
                 from_date = EXCLUDED.from_date,
                 to_date = EXCLUDED.to_date,
                 liquidation_events = EXCLUDED.liquidation_events,
                 book_ticker_events = EXCLUDED.book_ticker_events,
                 agg_trade_events = EXCLUDED.agg_trade_events,
                 derived_kline_1s_bars = EXCLUDED.derived_kline_1s_bars,
                 trigger_count = EXCLUDED.trigger_count,
                 closed_trades = EXCLUDED.closed_trades,
                 open_trades = EXCLUDED.open_trades,
                 wins = EXCLUDED.wins,
                 losses = EXCLUDED.losses,
                 skipped_triggers = EXCLUDED.skipped_triggers,
                 starting_equity = EXCLUDED.starting_equity,
                 ending_equity = EXCLUDED.ending_equity,
                 net_pnl = EXCLUDED.net_pnl,
                 observed_win_rate = EXCLUDED.observed_win_rate,
                 average_net_pnl = EXCLUDED.average_net_pnl,
                 configured_expected_value = EXCLUDED.configured_expected_value,
                 risk_pct = EXCLUDED.risk_pct,
                 win_rate_assumption = EXCLUDED.win_rate_assumption,
                 r_multiple = EXCLUDED.r_multiple,
                 max_entry_slippage_pct = EXCLUDED.max_entry_slippage_pct,
                 stop_distance_pct = EXCLUDED.stop_distance_pct
             RETURNING export_run_id",
            &[
                &source_db_path,
                &source_run_id,
                &mode,
                &template,
                &instrument,
                &report.from,
                &report.to,
                &(report.dataset.liquidation_events as i64),
                &(report.dataset.book_ticker_events as i64),
                &(report.dataset.agg_trade_events as i64),
                &(report.dataset.derived_kline_1s_bars as i64),
                &(report.trigger_count as i64),
                &closed_trades,
                &(report.open_trades as i64),
                &(report.wins as i64),
                &(report.losses as i64),
                &(report.skipped_triggers as i64),
                &report.starting_equity,
                &report.ending_equity,
                &report.net_pnl,
                &report.observed_win_rate,
                &report.average_net_pnl,
                &report.configured_expected_value,
                &report.config.risk_pct,
                &report.config.win_rate_assumption,
                &report.config.r_multiple,
                &report.config.max_entry_slippage_pct,
                &report.config.stop_distance_pct,
            ],
        )
        .map_err(|error| StorageError::WriteFailedWithContext {
            message: format!(
                "upsert backtest run failed: source_run_id={} instrument={} template={} message={error}",
                source_run_id,
                report.instrument,
                report.template.slug(),
            ),
        })
        .map(|row| row.get(0))
}

fn insert_backtest_trade_row(
    client: &mut Client,
    export_run_id: i64,
    trade: &BacktestTrade,
) -> Result<(), StorageError> {
    client
        .execute(
            "INSERT INTO backtest_trades (
                 export_run_id, trade_id, trigger_time, entry_time, entry_price, stop_price,
                 take_profit_price, qty, exit_time, exit_price, exit_reason, gross_pnl, fees, net_pnl
             ) VALUES (
                 $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14
             )",
            &[
                &export_run_id,
                &(trade.trade_id as i64),
                &trade.trigger_time,
                &trade.entry_time,
                &trade.entry_price,
                &trade.stop_price,
                &trade.take_profit_price,
                &trade.qty,
                &trade.exit_time,
                &trade.exit_price,
                &trade.exit_reason.as_ref().map(|reason| reason.as_str()),
                &trade.gross_pnl,
                &trade.fees,
                &trade.net_pnl,
            ],
        )
        .map(|_| ())
        .map_err(|error| StorageError::WriteFailedWithContext {
            message: format!(
                "insert backtest trade failed: trade_id={} message={error}",
                trade.trade_id
            ),
        })
}

fn insert_backtest_equity_point(
    client: &mut Client,
    export_run_id: i64,
    point_id: i64,
    event_time: chrono::DateTime<Utc>,
    equity: f64,
    cumulative_net_pnl: f64,
) -> Result<(), StorageError> {
    client
        .execute(
            "INSERT INTO backtest_equity_points (
                 export_run_id, point_id, event_time, equity, cumulative_net_pnl
             ) VALUES (
                 $1, $2, $3, $4, $5
             )",
            &[&export_run_id, &point_id, &event_time, &equity, &cumulative_net_pnl],
        )
        .map(|_| ())
        .map_err(|error| StorageError::WriteFailedWithContext {
            message: format!(
                "insert backtest equity point failed: point_id={} message={error}",
                point_id
            ),
        })
}

fn clear_duckdb_klines(
    duck: &DuckConnection,
    mode: BinanceMode,
    symbol: &str,
    product: Option<&str>,
    interval_name: Option<&str>,
    from_ts: &str,
    to_ts: &str,
) -> Result<(), StorageError> {
    duck.execute(
        "DELETE FROM raw_klines
         WHERE mode = ?
           AND symbol = ?
           AND open_time >= CAST(? AS TIMESTAMP)
           AND open_time <= CAST(? AS TIMESTAMP)
           AND (? IS NULL OR product = ?)
           AND (? IS NULL OR interval = ?)",
        params![
            mode.as_str(),
            symbol,
            from_ts,
            to_ts,
            product,
            product,
            interval_name,
            interval_name
        ],
    )
    .map(|_| ())
    .map_err(storage_err)
}

fn clear_duckdb_liquidations(
    duck: &DuckConnection,
    mode: BinanceMode,
    symbol: &str,
    from_ts: &str,
    to_ts: &str,
) -> Result<(), StorageError> {
    duck.execute(
        "DELETE FROM raw_liquidation_events
         WHERE mode = ?
           AND symbol = ?
           AND event_time >= CAST(? AS TIMESTAMP)
           AND event_time <= CAST(? AS TIMESTAMP)",
        params![mode.as_str(), symbol, from_ts, to_ts],
    )
    .map(|_| ())
    .map_err(storage_err)
}

fn clear_duckdb_book_tickers(
    duck: &DuckConnection,
    mode: BinanceMode,
    symbol: &str,
    from_ts: &str,
    to_ts: &str,
) -> Result<(), StorageError> {
    duck.execute(
        "DELETE FROM raw_book_ticker
         WHERE mode = ?
           AND symbol = ?
           AND event_time >= CAST(? AS TIMESTAMP)
           AND event_time <= CAST(? AS TIMESTAMP)",
        params![mode.as_str(), symbol, from_ts, to_ts],
    )
    .map(|_| ())
    .map_err(storage_err)
}

fn clear_duckdb_agg_trades(
    duck: &DuckConnection,
    mode: BinanceMode,
    symbol: &str,
    from_ts: &str,
    to_ts: &str,
) -> Result<(), StorageError> {
    duck.execute(
        "DELETE FROM raw_agg_trades
         WHERE mode = ?
           AND symbol = ?
           AND event_time >= CAST(? AS TIMESTAMP)
           AND event_time <= CAST(? AS TIMESTAMP)",
        params![mode.as_str(), symbol, from_ts, to_ts],
    )
    .map(|_| ())
    .map_err(storage_err)
}

fn next_duckdb_kline_id(connection: &DuckConnection) -> Result<i64, StorageError> {
    connection
        .prepare("SELECT COALESCE(MAX(kline_id), 0) + 1 FROM raw_klines")
        .and_then(|mut statement| statement.query_row([], |row| row.get(0)))
        .map_err(storage_err)
}

fn next_duckdb_liquidation_event_id(connection: &DuckConnection) -> Result<i64, StorageError> {
    connection
        .prepare("SELECT COALESCE(MAX(event_id), 0) + 1 FROM raw_liquidation_events")
        .and_then(|mut statement| statement.query_row([], |row| row.get(0)))
        .map_err(storage_err)
}

fn next_duckdb_book_ticker_id(connection: &DuckConnection) -> Result<i64, StorageError> {
    connection
        .prepare("SELECT COALESCE(MAX(tick_id), 0) + 1 FROM raw_book_ticker")
        .and_then(|mut statement| statement.query_row([], |row| row.get(0)))
        .map_err(storage_err)
}

fn next_duckdb_snapshot_export_id(connection: &DuckConnection) -> Result<i64, StorageError> {
    connection
        .prepare("SELECT COALESCE(MAX(export_id), 0) + 1 FROM snapshot_exports")
        .and_then(|mut statement| statement.query_row([], |row| row.get(0)))
        .map_err(storage_err)
}

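/// Masks the credential portion of a PostgreSQL URL so it can be logged or
/// stored safely, e.g. `postgres://user:pw@host/db` becomes
/// `postgres://***@host/db`.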
pub fn mask_postgres_url(url: &str) -> String {
    if let Some((scheme, rest)) = url.split_once("://") {
        if let Some((_, host_part)) = rest.rsplit_once('@') {
            return format!("{scheme}://***@{host_part}");
        }
    }
    "postgres://***".to_string()
}

fn storage_err(error: impl std::fmt::Display) -> StorageError {
    StorageError::WriteFailedWithContext {
        message: error.to_string(),
    }
}