//! sandbox_quant/storage/postgres_market_data.rs
//!
//! PostgreSQL-backed raw market data storage (klines, liquidations, book
//! tickers, aggregated trades) plus a Postgres -> DuckDB snapshot exporter.
1use chrono::NaiveDate;
2use duckdb::{params, Connection as DuckConnection};
3use postgres::{Client, NoTls};
4
5use crate::app::bootstrap::BinanceMode;
6use crate::dataset::schema::{init_schema_for_path, MARKET_DATA_SCHEMA_VERSION};
7use crate::error::storage_error::StorageError;
8use crate::record::coordination::RecorderCoordination;
9
/// Schema version stamped into `schema_metadata`; deliberately aliased to the
/// shared `MARKET_DATA_SCHEMA_VERSION` so the Postgres and DuckDB stores
/// always report the same version string.
pub const POSTGRES_MARKET_DATA_SCHEMA_VERSION: &str = MARKET_DATA_SCHEMA_VERSION;

// Idempotent DDL for the Postgres store. Every table gets a surrogate
// BIGSERIAL key plus a UNIQUE "natural key" index; the unique indexes are
// what back the `ON CONFLICT ... DO NOTHING` dedup in the insert_* helpers
// below, making replays of the same feed safe.
const POSTGRES_MARKET_DATA_SCHEMA_SQL: &str = r#"
CREATE TABLE IF NOT EXISTS schema_metadata (
  key TEXT PRIMARY KEY,
  value TEXT NOT NULL,
  updated_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP
);

CREATE TABLE IF NOT EXISTS raw_liquidation_events (
  event_id BIGSERIAL PRIMARY KEY,
  mode TEXT NOT NULL,
  product TEXT NOT NULL,
  symbol TEXT NOT NULL,
  event_time TIMESTAMPTZ NOT NULL,
  receive_time TIMESTAMPTZ NOT NULL,
  force_side TEXT NOT NULL,
  price DOUBLE PRECISION NOT NULL,
  qty DOUBLE PRECISION NOT NULL,
  notional DOUBLE PRECISION NOT NULL,
  raw_payload TEXT NOT NULL
);

CREATE UNIQUE INDEX IF NOT EXISTS raw_liquidation_events_natural_idx
ON raw_liquidation_events (mode, product, symbol, event_time, force_side, price, qty);

CREATE TABLE IF NOT EXISTS raw_book_ticker (
  tick_id BIGSERIAL PRIMARY KEY,
  mode TEXT NOT NULL,
  symbol TEXT NOT NULL,
  event_time TIMESTAMPTZ NOT NULL,
  receive_time TIMESTAMPTZ NOT NULL,
  bid DOUBLE PRECISION NOT NULL,
  bid_qty DOUBLE PRECISION NOT NULL,
  ask DOUBLE PRECISION NOT NULL,
  ask_qty DOUBLE PRECISION NOT NULL
);

CREATE UNIQUE INDEX IF NOT EXISTS raw_book_ticker_natural_idx
ON raw_book_ticker (mode, symbol, event_time, bid, ask, bid_qty, ask_qty);

CREATE TABLE IF NOT EXISTS raw_agg_trades (
  trade_id BIGSERIAL PRIMARY KEY,
  mode TEXT NOT NULL,
  symbol TEXT NOT NULL,
  event_time TIMESTAMPTZ NOT NULL,
  receive_time TIMESTAMPTZ NOT NULL,
  price DOUBLE PRECISION NOT NULL,
  qty DOUBLE PRECISION NOT NULL,
  is_buyer_maker BOOLEAN NOT NULL
);

CREATE UNIQUE INDEX IF NOT EXISTS raw_agg_trades_natural_idx
ON raw_agg_trades (mode, symbol, event_time, price, qty, is_buyer_maker);

CREATE TABLE IF NOT EXISTS raw_klines (
  kline_id BIGSERIAL PRIMARY KEY,
  mode TEXT NOT NULL,
  product TEXT NOT NULL,
  symbol TEXT NOT NULL,
  interval_name TEXT NOT NULL,
  open_time TIMESTAMPTZ NOT NULL,
  close_time TIMESTAMPTZ NOT NULL,
  open DOUBLE PRECISION NOT NULL,
  high DOUBLE PRECISION NOT NULL,
  low DOUBLE PRECISION NOT NULL,
  close DOUBLE PRECISION NOT NULL,
  volume DOUBLE PRECISION NOT NULL,
  quote_volume DOUBLE PRECISION NOT NULL,
  trade_count BIGINT NOT NULL,
  taker_buy_base_volume DOUBLE PRECISION,
  taker_buy_quote_volume DOUBLE PRECISION,
  raw_payload TEXT NOT NULL
);

CREATE UNIQUE INDEX IF NOT EXISTS raw_klines_natural_idx
ON raw_klines (mode, product, symbol, interval_name, open_time);
"#;
88
/// Which backend the market-data collector writes to.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CollectorStorageBackend {
    DuckDb,
    Postgres,
}

impl CollectorStorageBackend {
    /// Stable lowercase name for use in configuration, logs, and reports.
    pub fn as_str(self) -> &'static str {
        match self {
            Self::DuckDb => "duckdb",
            Self::Postgres => "postgres",
        }
    }
}
103
/// One kline (candlestick) row bound for Postgres `raw_klines`.
/// `*_ms` fields are Unix epoch milliseconds; `insert_kline` converts them
/// to TIMESTAMPTZ server-side via `to_timestamp(ms / 1000.0)`.
#[derive(Debug, Clone, PartialEq)]
pub struct PostgresKlineRecord {
    pub mode: String,
    pub product: String,
    pub symbol: String,
    pub interval_name: String,
    pub open_time_ms: i64,
    pub close_time_ms: i64,
    pub open: f64,
    pub high: f64,
    pub low: f64,
    pub close: f64,
    pub volume: f64,
    pub quote_volume: f64,
    pub trade_count: i64,
    // Optional: the schema allows NULL here (not every source reports
    // taker-buy breakdowns).
    pub taker_buy_base_volume: Option<f64>,
    pub taker_buy_quote_volume: Option<f64>,
    // Original upstream message, kept verbatim for auditing/replay.
    pub raw_payload: String,
}

/// One forced-liquidation event bound for `raw_liquidation_events`.
#[derive(Debug, Clone, PartialEq)]
pub struct PostgresLiquidationRecord {
    pub mode: String,
    pub product: String,
    pub symbol: String,
    pub event_time_ms: i64,
    pub receive_time_ms: i64,
    pub force_side: String,
    pub price: f64,
    pub qty: f64,
    pub notional: f64,
    pub raw_payload: String,
}

/// One best-bid/ask update bound for `raw_book_ticker`.
#[derive(Debug, Clone, PartialEq)]
pub struct PostgresBookTickerRecord {
    pub mode: String,
    pub symbol: String,
    pub event_time_ms: i64,
    pub receive_time_ms: i64,
    pub bid: f64,
    pub bid_qty: f64,
    pub ask: f64,
    pub ask_qty: f64,
}

/// One aggregated trade bound for `raw_agg_trades`.
#[derive(Debug, Clone, PartialEq)]
pub struct PostgresAggTradeRecord {
    pub mode: String,
    pub symbol: String,
    pub event_time_ms: i64,
    pub receive_time_ms: i64,
    pub price: f64,
    pub qty: f64,
    pub is_buyer_maker: bool,
}
160
/// Per-(product, symbol, interval) kline coverage row from `load_summary`.
#[derive(Debug, Clone, PartialEq)]
pub struct PostgresKlineSummaryRow {
    pub product: String,
    pub symbol: String,
    pub interval_name: String,
    pub row_count: i64,
    // Textual timestamps (`CAST(... AS TEXT)` in SQL); None for empty groups.
    pub min_time: Option<String>,
    pub max_time: Option<String>,
}

/// Per-symbol liquidation coverage row from `load_summary`.
#[derive(Debug, Clone, PartialEq)]
pub struct PostgresLiquidationSummaryRow {
    pub symbol: String,
    pub row_count: i64,
    pub min_time: Option<String>,
    pub max_time: Option<String>,
}

/// High-level report of what the Postgres store currently holds.
#[derive(Debug, Clone, PartialEq)]
pub struct PostgresSummary {
    pub schema_version: String,
    // Version found before the last `init_schema` run; None on a fresh DB.
    pub previous_version: Option<String>,
    pub klines: Vec<PostgresKlineSummaryRow>,
    pub liquidations: Vec<PostgresLiquidationSummaryRow>,
}
186
/// Parameters for exporting a date range of data from Postgres into DuckDB.
#[derive(Debug, Clone, PartialEq)]
pub struct PostgresToDuckDbSnapshotConfig {
    pub postgres_url: String,
    pub mode: BinanceMode,
    // Base directory used by RecorderCoordination to locate the per-mode
    // DuckDB file.
    pub base_dir: String,
    pub symbols: Vec<String>,
    // Inclusive day range (expanded to 00:00:00 .. 23:59:59 timestamps).
    pub from: NaiveDate,
    pub to: NaiveDate,
    // Optional kline filters; None means all products / all intervals.
    pub product: Option<String>,
    pub interval_name: Option<String>,
    // Dataset toggles — only the selected datasets are copied.
    pub include_klines: bool,
    pub include_liquidations: bool,
    pub include_book_tickers: bool,
    pub include_agg_trades: bool,
    // When true, matching DuckDB rows in the range are deleted first, so the
    // export replaces rather than appends.
    pub clear_duckdb_range: bool,
}

/// Outcome of a completed Postgres -> DuckDB snapshot export.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PostgresToDuckDbSnapshotReport {
    pub snapshot_export_id: i64,
    pub db_path: String,
    pub kline_rows: usize,
    pub liquidation_rows: usize,
    pub book_ticker_rows: usize,
    pub agg_trade_rows: usize,
}
213
214pub fn postgres_url_from_env() -> Result<String, StorageError> {
215    std::env::var("SANDBOX_QUANT_POSTGRES_URL")
216        .or_else(|_| std::env::var("DATABASE_URL"))
217        .map_err(|_| StorageError::WriteFailedWithContext {
218            message: "missing PostgreSQL URL; set SANDBOX_QUANT_POSTGRES_URL or DATABASE_URL"
219                .to_string(),
220        })
221}
222
223pub fn connect(url: &str) -> Result<Client, StorageError> {
224    Client::connect(url, NoTls).map_err(|error| StorageError::DatabaseInitFailed {
225        path: mask_postgres_url(url),
226        message: error.to_string(),
227    })
228}
229
230pub fn init_schema(client: &mut Client, url: &str) -> Result<Option<String>, StorageError> {
231    let previous_version = existing_schema_version(client)?;
232    client
233        .batch_execute(POSTGRES_MARKET_DATA_SCHEMA_SQL)
234        .map_err(|error| StorageError::DatabaseInitFailed {
235            path: mask_postgres_url(url),
236            message: error.to_string(),
237        })?;
238    client
239        .execute(
240            "INSERT INTO schema_metadata (key, value, updated_at)
241             VALUES ('market_data_schema_version', $1, CURRENT_TIMESTAMP)
242             ON CONFLICT (key) DO UPDATE
243             SET value = EXCLUDED.value, updated_at = EXCLUDED.updated_at",
244            &[&POSTGRES_MARKET_DATA_SCHEMA_VERSION],
245        )
246        .map_err(|error| StorageError::DatabaseInitFailed {
247            path: mask_postgres_url(url),
248            message: error.to_string(),
249        })?;
250    Ok(previous_version)
251}
252
253pub fn insert_kline(client: &mut Client, record: &PostgresKlineRecord) -> Result<(), StorageError> {
254    client
255        .execute(
256            "INSERT INTO raw_klines (
257                mode, product, symbol, interval_name, open_time, close_time,
258                open, high, low, close, volume, quote_volume, trade_count,
259                taker_buy_base_volume, taker_buy_quote_volume, raw_payload
260             ) VALUES (
261                $1, $2, $3, $4, to_timestamp($5 / 1000.0), to_timestamp($6 / 1000.0),
262                $7, $8, $9, $10, $11, $12, $13, $14, $15, $16
263             )
264             ON CONFLICT (mode, product, symbol, interval_name, open_time) DO NOTHING",
265            &[
266                &record.mode,
267                &record.product,
268                &record.symbol,
269                &record.interval_name,
270                &record.open_time_ms,
271                &record.close_time_ms,
272                &record.open,
273                &record.high,
274                &record.low,
275                &record.close,
276                &record.volume,
277                &record.quote_volume,
278                &record.trade_count,
279                &record.taker_buy_base_volume,
280                &record.taker_buy_quote_volume,
281                &record.raw_payload,
282            ],
283        )
284        .map(|_| ())
285        .map_err(storage_err)
286}
287
288pub fn insert_liquidation(
289    client: &mut Client,
290    record: &PostgresLiquidationRecord,
291) -> Result<(), StorageError> {
292    client
293        .execute(
294            "INSERT INTO raw_liquidation_events (
295                mode, product, symbol, event_time, receive_time, force_side, price, qty, notional, raw_payload
296             ) VALUES (
297                $1, $2, $3, to_timestamp($4 / 1000.0), to_timestamp($5 / 1000.0), $6, $7, $8, $9, $10
298             )
299             ON CONFLICT (mode, product, symbol, event_time, force_side, price, qty) DO NOTHING",
300            &[
301                &record.mode,
302                &record.product,
303                &record.symbol,
304                &record.event_time_ms,
305                &record.receive_time_ms,
306                &record.force_side,
307                &record.price,
308                &record.qty,
309                &record.notional,
310                &record.raw_payload,
311            ],
312        )
313        .map(|_| ())
314        .map_err(storage_err)
315}
316
317pub fn insert_book_ticker(
318    client: &mut Client,
319    record: &PostgresBookTickerRecord,
320) -> Result<(), StorageError> {
321    client
322        .execute(
323            "INSERT INTO raw_book_ticker (
324                mode, symbol, event_time, receive_time, bid, bid_qty, ask, ask_qty
325             ) VALUES (
326                $1, $2, to_timestamp($3 / 1000.0), to_timestamp($4 / 1000.0), $5, $6, $7, $8
327             )
328             ON CONFLICT (mode, symbol, event_time, bid, ask, bid_qty, ask_qty) DO NOTHING",
329            &[
330                &record.mode,
331                &record.symbol,
332                &record.event_time_ms,
333                &record.receive_time_ms,
334                &record.bid,
335                &record.bid_qty,
336                &record.ask,
337                &record.ask_qty,
338            ],
339        )
340        .map(|_| ())
341        .map_err(storage_err)
342}
343
344pub fn insert_agg_trade(
345    client: &mut Client,
346    record: &PostgresAggTradeRecord,
347) -> Result<(), StorageError> {
348    client
349        .execute(
350            "INSERT INTO raw_agg_trades (
351                mode, symbol, event_time, receive_time, price, qty, is_buyer_maker
352             ) VALUES (
353                $1, $2, to_timestamp($3 / 1000.0), to_timestamp($4 / 1000.0), $5, $6, $7
354             )
355             ON CONFLICT (mode, symbol, event_time, price, qty, is_buyer_maker) DO NOTHING",
356            &[
357                &record.mode,
358                &record.symbol,
359                &record.event_time_ms,
360                &record.receive_time_ms,
361                &record.price,
362                &record.qty,
363                &record.is_buyer_maker,
364            ],
365        )
366        .map(|_| ())
367        .map_err(storage_err)
368}
369
370pub fn metrics_for_postgres_url(
371    url: &str,
372) -> Result<crate::dataset::types::RecorderMetrics, StorageError> {
373    let mut client = connect(url)?;
374    let _ = init_schema(&mut client, url)?;
375    Ok(crate::dataset::types::RecorderMetrics {
376        liquidation_events: query_count(&mut client, "raw_liquidation_events")?,
377        book_ticker_events: query_count(&mut client, "raw_book_ticker")?,
378        agg_trade_events: query_count(&mut client, "raw_agg_trades")?,
379        derived_kline_1s_bars: 0,
380        schema_version: existing_schema_version(&mut client)?,
381        last_liquidation_event_time: query_latest_timestamp(
382            &mut client,
383            "raw_liquidation_events",
384            "event_time",
385        )?,
386        last_book_ticker_event_time: query_latest_timestamp(
387            &mut client,
388            "raw_book_ticker",
389            "event_time",
390        )?,
391        last_agg_trade_event_time: query_latest_timestamp(
392            &mut client,
393            "raw_agg_trades",
394            "event_time",
395        )?,
396        top_liquidation_symbols: query_top_symbols(&mut client, "raw_liquidation_events")?,
397        top_book_ticker_symbols: query_top_symbols(&mut client, "raw_book_ticker")?,
398        top_agg_trade_symbols: query_top_symbols(&mut client, "raw_agg_trades")?,
399    })
400}
401
402pub fn load_summary(
403    client: &mut Client,
404    previous_version: Option<String>,
405) -> Result<PostgresSummary, StorageError> {
406    let schema_version = existing_schema_version(client)?.unwrap_or_else(|| "missing".to_string());
407
408    let klines = client
409        .query(
410            "SELECT product, symbol, interval_name, COUNT(*) AS row_count,
411                    CAST(MIN(open_time) AS TEXT), CAST(MAX(close_time) AS TEXT)
412             FROM raw_klines
413             GROUP BY product, symbol, interval_name
414             ORDER BY product, symbol, interval_name",
415            &[],
416        )
417        .map_err(storage_err)?
418        .into_iter()
419        .map(|row| PostgresKlineSummaryRow {
420            product: row.get(0),
421            symbol: row.get(1),
422            interval_name: row.get(2),
423            row_count: row.get(3),
424            min_time: row.get(4),
425            max_time: row.get(5),
426        })
427        .collect();
428
429    let liquidations = client
430        .query(
431            "SELECT symbol, COUNT(*) AS row_count,
432                    CAST(MIN(event_time) AS TEXT), CAST(MAX(event_time) AS TEXT)
433             FROM raw_liquidation_events
434             GROUP BY symbol
435             ORDER BY symbol",
436            &[],
437        )
438        .map_err(storage_err)?
439        .into_iter()
440        .map(|row| PostgresLiquidationSummaryRow {
441            symbol: row.get(0),
442            row_count: row.get(1),
443            min_time: row.get(2),
444            max_time: row.get(3),
445        })
446        .collect();
447
448    Ok(PostgresSummary {
449        schema_version,
450        previous_version,
451        klines,
452        liquidations,
453    })
454}
455
/// Copy a date range of recorded market data from Postgres into the mode's
/// DuckDB database, then audit the operation in `snapshot_exports`.
///
/// For each symbol, the selected datasets (klines / liquidations / book
/// tickers / agg trades) are read in ascending event order and appended to
/// DuckDB with locally generated sequential ids. With
/// `config.clear_duckdb_range` set, matching DuckDB rows are deleted first,
/// so the export acts as a replace rather than an append.
///
/// NOTE(review): no transaction wraps the DuckDB writes — a failure mid-way
/// leaves a partial export (the `snapshot_exports` audit row is only written
/// on full success). Confirm whether that is acceptable for callers.
pub fn export_snapshot_to_duckdb(
    config: &PostgresToDuckDbSnapshotConfig,
) -> Result<PostgresToDuckDbSnapshotReport, StorageError> {
    let mut client = connect(&config.postgres_url)?;
    init_schema(&mut client, &config.postgres_url)?;

    // Resolve and initialise the per-mode DuckDB file.
    let db_path = RecorderCoordination::new(config.base_dir.clone()).db_path(config.mode);
    init_schema_for_path(&db_path)?;
    let duck =
        DuckConnection::open(&db_path).map_err(|error| StorageError::DatabaseInitFailed {
            path: db_path.display().to_string(),
            message: error.to_string(),
        })?;

    // Inclusive day bounds rendered as timestamp strings; used for both the
    // Postgres range filters and the DuckDB range deletes.
    // NOTE(review): the upper bound is "23:59:59", so events in the final
    // second's sub-second tail (e.g. 23:59:59.250) are excluded — confirm
    // whether that is intended.
    let from_ts = format!("{} 00:00:00", config.from);
    let to_ts = format!("{} 23:59:59", config.to);

    let mut kline_rows = 0usize;
    let mut liquidation_rows = 0usize;
    let mut book_ticker_rows = 0usize;
    let mut agg_trade_rows = 0usize;

    for symbol in &config.symbols {
        // Optional replace semantics: clear the target range per dataset
        // before re-exporting it.
        if config.clear_duckdb_range {
            if config.include_klines {
                clear_duckdb_klines(
                    &duck,
                    config.mode,
                    symbol,
                    config.product.as_deref(),
                    config.interval_name.as_deref(),
                    &from_ts,
                    &to_ts,
                )?;
            }
            if config.include_liquidations {
                clear_duckdb_liquidations(&duck, config.mode, symbol, &from_ts, &to_ts)?;
            }
            if config.include_book_tickers {
                clear_duckdb_book_tickers(&duck, config.mode, symbol, &from_ts, &to_ts)?;
            }
            if config.include_agg_trades {
                clear_duckdb_agg_trades(&duck, config.mode, symbol, &from_ts, &to_ts)?;
            }
        }

        // --- Klines ---------------------------------------------------
        // TIMESTAMPTZ columns are flattened to epoch milliseconds so the
        // conversion back in DuckDB mirrors the recorder's own inserts.
        // $5/$6 are optional filters: NULL means "all products/intervals".
        if config.include_klines {
            let rows = client
                .query(
                    "SELECT product, symbol, interval_name,
                            (EXTRACT(EPOCH FROM open_time) * 1000)::BIGINT AS open_time_ms,
                            (EXTRACT(EPOCH FROM close_time) * 1000)::BIGINT AS close_time_ms,
                            open, high, low, close, volume, quote_volume, trade_count,
                            taker_buy_base_volume, taker_buy_quote_volume, raw_payload
                     FROM raw_klines
                     WHERE mode = $1
                       AND symbol = $2
                       AND open_time >= CAST($3 AS TIMESTAMPTZ)
                       AND open_time <= CAST($4 AS TIMESTAMPTZ)
                       AND ($5::TEXT IS NULL OR product = $5)
                       AND ($6::TEXT IS NULL OR interval_name = $6)
                     ORDER BY open_time ASC",
                    &[
                        &config.mode.as_str(),
                        &symbol,
                        &from_ts,
                        &to_ts,
                        &config.product,
                        &config.interval_name,
                    ],
                )
                .map_err(storage_err)?;

            // Ids are assigned locally (max + 1, then incremented per row);
            // note the DuckDB column is `interval`, not `interval_name`.
            let mut next_id = next_duckdb_kline_id(&duck)?;
            for row in rows {
                // Column indices follow the SELECT list above (0..=14).
                duck.execute(
                    "INSERT INTO raw_klines (
                        kline_id, mode, product, symbol, interval, open_time, close_time,
                        open, high, low, close, volume, quote_volume, trade_count,
                        taker_buy_base_volume, taker_buy_quote_volume, raw_payload
                     ) VALUES (
                        ?, ?, ?, ?, ?, to_timestamp(? / 1000.0), to_timestamp(? / 1000.0),
                        ?, ?, ?, ?, ?, ?, ?, ?, ?, ?
                     )",
                    params![
                        next_id,
                        config.mode.as_str(),
                        row.get::<_, String>(0),
                        row.get::<_, String>(1),
                        row.get::<_, String>(2),
                        row.get::<_, i64>(3),
                        row.get::<_, i64>(4),
                        row.get::<_, f64>(5),
                        row.get::<_, f64>(6),
                        row.get::<_, f64>(7),
                        row.get::<_, f64>(8),
                        row.get::<_, f64>(9),
                        row.get::<_, f64>(10),
                        row.get::<_, i64>(11),
                        row.get::<_, Option<f64>>(12),
                        row.get::<_, Option<f64>>(13),
                        row.get::<_, String>(14),
                    ],
                )
                .map_err(storage_err)?;
                next_id += 1;
                kline_rows += 1;
            }
        }

        // --- Liquidations ---------------------------------------------
        // `product` exists in Postgres but the DuckDB table has no such
        // column, so it is not selected here.
        if config.include_liquidations {
            let rows = client
                .query(
                    "SELECT symbol,
                            (EXTRACT(EPOCH FROM event_time) * 1000)::BIGINT AS event_time_ms,
                            (EXTRACT(EPOCH FROM receive_time) * 1000)::BIGINT AS receive_time_ms,
                            force_side, price, qty, notional, raw_payload
                     FROM raw_liquidation_events
                     WHERE mode = $1
                       AND symbol = $2
                       AND event_time >= CAST($3 AS TIMESTAMPTZ)
                       AND event_time <= CAST($4 AS TIMESTAMPTZ)
                     ORDER BY event_time ASC",
                    &[&config.mode.as_str(), &symbol, &from_ts, &to_ts],
                )
                .map_err(storage_err)?;
            let mut next_id = next_duckdb_liquidation_event_id(&duck)?;
            for row in rows {
                duck.execute(
                    "INSERT INTO raw_liquidation_events (
                        event_id, mode, symbol, event_time, receive_time, force_side, price, qty, notional, raw_payload
                     ) VALUES (
                        ?, ?, ?, to_timestamp(? / 1000.0), to_timestamp(? / 1000.0), ?, ?, ?, ?, ?
                     )",
                    params![
                        next_id,
                        config.mode.as_str(),
                        row.get::<_, String>(0),
                        row.get::<_, i64>(1),
                        row.get::<_, i64>(2),
                        row.get::<_, String>(3),
                        row.get::<_, f64>(4),
                        row.get::<_, f64>(5),
                        row.get::<_, f64>(6),
                        row.get::<_, String>(7),
                    ],
                )
                .map_err(storage_err)?;
                next_id += 1;
                liquidation_rows += 1;
            }
        }

        // --- Book tickers ---------------------------------------------
        if config.include_book_tickers {
            let rows = client
                .query(
                    "SELECT symbol,
                            (EXTRACT(EPOCH FROM event_time) * 1000)::BIGINT AS event_time_ms,
                            (EXTRACT(EPOCH FROM receive_time) * 1000)::BIGINT AS receive_time_ms,
                            bid, bid_qty, ask, ask_qty
                     FROM raw_book_ticker
                     WHERE mode = $1
                       AND symbol = $2
                       AND event_time >= CAST($3 AS TIMESTAMPTZ)
                       AND event_time <= CAST($4 AS TIMESTAMPTZ)
                     ORDER BY event_time ASC",
                    &[&config.mode.as_str(), &symbol, &from_ts, &to_ts],
                )
                .map_err(storage_err)?;
            let mut next_id = next_duckdb_book_ticker_id(&duck)?;
            for row in rows {
                duck.execute(
                    "INSERT INTO raw_book_ticker (
                        tick_id, mode, symbol, event_time, receive_time, bid, bid_qty, ask, ask_qty
                     ) VALUES (
                        ?, ?, ?, to_timestamp(? / 1000.0), to_timestamp(? / 1000.0), ?, ?, ?, ?
                     )",
                    params![
                        next_id,
                        config.mode.as_str(),
                        row.get::<_, String>(0),
                        row.get::<_, i64>(1),
                        row.get::<_, i64>(2),
                        row.get::<_, f64>(3),
                        row.get::<_, f64>(4),
                        row.get::<_, f64>(5),
                        row.get::<_, f64>(6),
                    ],
                )
                .map_err(storage_err)?;
                next_id += 1;
                book_ticker_rows += 1;
            }
        }

        // --- Aggregated trades ----------------------------------------
        if config.include_agg_trades {
            let rows = client
                .query(
                    "SELECT symbol,
                            (EXTRACT(EPOCH FROM event_time) * 1000)::BIGINT AS event_time_ms,
                            (EXTRACT(EPOCH FROM receive_time) * 1000)::BIGINT AS receive_time_ms,
                            price, qty, is_buyer_maker
                     FROM raw_agg_trades
                     WHERE mode = $1
                       AND symbol = $2
                       AND event_time >= CAST($3 AS TIMESTAMPTZ)
                       AND event_time <= CAST($4 AS TIMESTAMPTZ)
                     ORDER BY event_time ASC",
                    &[&config.mode.as_str(), &symbol, &from_ts, &to_ts],
                )
                .map_err(storage_err)?;
            let mut next_id = next_duckdb_agg_trade_id(&duck)?;
            for row in rows {
                duck.execute(
                    "INSERT INTO raw_agg_trades (
                        trade_id, mode, symbol, event_time, receive_time, price, qty, is_buyer_maker
                     ) VALUES (
                        ?, ?, ?, to_timestamp(? / 1000.0), to_timestamp(? / 1000.0), ?, ?, ?
                     )",
                    params![
                        next_id,
                        config.mode.as_str(),
                        row.get::<_, String>(0),
                        row.get::<_, i64>(1),
                        row.get::<_, i64>(2),
                        row.get::<_, f64>(3),
                        row.get::<_, f64>(4),
                        row.get::<_, bool>(5),
                    ],
                )
                .map_err(storage_err)?;
                next_id += 1;
                agg_trade_rows += 1;
            }
        }
    }

    // Audit trail: record what was exported, from where, with which options.
    // Written only after all symbols/datasets succeeded.
    let snapshot_export_id = next_duckdb_snapshot_export_id(&duck)?;
    duck.execute(
        "INSERT INTO snapshot_exports (
            export_id, created_at, source_backend, source_target, mode, symbols_csv, from_date, to_date,
            product, interval_name, include_klines, include_liquidations, include_book_tickers, include_agg_trades,
            clear_duckdb_range, exported_kline_rows, exported_liquidation_rows, exported_book_ticker_rows, exported_agg_trade_rows
         ) VALUES (
            ?, CURRENT_TIMESTAMP, 'postgres', ?, ?, ?, CAST(? AS DATE), CAST(? AS DATE),
            ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?
         )",
        params![
            snapshot_export_id,
            mask_postgres_url(&config.postgres_url),
            config.mode.as_str(),
            config.symbols.join(","),
            config.from.to_string(),
            config.to.to_string(),
            config.product.as_deref(),
            config.interval_name.as_deref(),
            config.include_klines,
            config.include_liquidations,
            config.include_book_tickers,
            config.include_agg_trades,
            config.clear_duckdb_range,
            kline_rows as i64,
            liquidation_rows as i64,
            book_ticker_rows as i64,
            agg_trade_rows as i64,
        ],
    )
    .map_err(storage_err)?;

    Ok(PostgresToDuckDbSnapshotReport {
        snapshot_export_id,
        db_path: db_path.display().to_string(),
        kline_rows,
        liquidation_rows,
        book_ticker_rows,
        agg_trade_rows,
    })
}
734
735fn existing_schema_version(client: &mut Client) -> Result<Option<String>, StorageError> {
736    let table_exists = client
737        .query_one(
738            "SELECT EXISTS (
739                SELECT 1 FROM information_schema.tables
740                WHERE table_schema = 'public' AND table_name = 'schema_metadata'
741            )",
742            &[],
743        )
744        .map_err(storage_err)?
745        .get::<_, bool>(0);
746    if !table_exists {
747        return Ok(None);
748    }
749    client
750        .query_opt(
751            "SELECT value FROM schema_metadata WHERE key = 'market_data_schema_version'",
752            &[],
753        )
754        .map_err(storage_err)
755        .map(|row| row.map(|row| row.get(0)))
756}
757
758fn query_count(client: &mut Client, table: &str) -> Result<u64, StorageError> {
759    client
760        .query_one(&format!("SELECT COUNT(*) FROM {table}"), &[])
761        .map_err(storage_err)
762        .map(|row| row.get::<_, i64>(0).max(0) as u64)
763}
764
765fn query_latest_timestamp(
766    client: &mut Client,
767    table: &str,
768    column: &str,
769) -> Result<Option<String>, StorageError> {
770    client
771        .query_one(
772            &format!("SELECT CAST(MAX({column}) AS TEXT) FROM {table}"),
773            &[],
774        )
775        .map_err(storage_err)
776        .map(|row| row.get(0))
777}
778
779fn query_top_symbols(client: &mut Client, table: &str) -> Result<Vec<String>, StorageError> {
780    client
781        .query(
782            &format!(
783                "SELECT symbol, COUNT(*) AS row_count FROM {table} GROUP BY symbol ORDER BY row_count DESC, symbol ASC LIMIT 5"
784            ),
785            &[],
786        )
787        .map_err(storage_err)
788        .map(|rows| {
789            rows.into_iter()
790                .map(|row| format!("{}:{}", row.get::<_, String>(0), row.get::<_, i64>(1)))
791                .collect()
792        })
793}
794
795fn clear_duckdb_klines(
796    duck: &DuckConnection,
797    mode: BinanceMode,
798    symbol: &str,
799    product: Option<&str>,
800    interval_name: Option<&str>,
801    from_ts: &str,
802    to_ts: &str,
803) -> Result<(), StorageError> {
804    duck.execute(
805        "DELETE FROM raw_klines
806         WHERE mode = ?
807           AND symbol = ?
808           AND open_time >= CAST(? AS TIMESTAMP)
809           AND open_time <= CAST(? AS TIMESTAMP)
810           AND (? IS NULL OR product = ?)
811           AND (? IS NULL OR interval = ?)",
812        params![
813            mode.as_str(),
814            symbol,
815            from_ts,
816            to_ts,
817            product,
818            product,
819            interval_name,
820            interval_name
821        ],
822    )
823    .map(|_| ())
824    .map_err(storage_err)
825}
826
827fn clear_duckdb_liquidations(
828    duck: &DuckConnection,
829    mode: BinanceMode,
830    symbol: &str,
831    from_ts: &str,
832    to_ts: &str,
833) -> Result<(), StorageError> {
834    duck.execute(
835        "DELETE FROM raw_liquidation_events
836         WHERE mode = ?
837           AND symbol = ?
838           AND event_time >= CAST(? AS TIMESTAMP)
839           AND event_time <= CAST(? AS TIMESTAMP)",
840        params![mode.as_str(), symbol, from_ts, to_ts],
841    )
842    .map(|_| ())
843    .map_err(storage_err)
844}
845
846fn clear_duckdb_book_tickers(
847    duck: &DuckConnection,
848    mode: BinanceMode,
849    symbol: &str,
850    from_ts: &str,
851    to_ts: &str,
852) -> Result<(), StorageError> {
853    duck.execute(
854        "DELETE FROM raw_book_ticker
855         WHERE mode = ?
856           AND symbol = ?
857           AND event_time >= CAST(? AS TIMESTAMP)
858           AND event_time <= CAST(? AS TIMESTAMP)",
859        params![mode.as_str(), symbol, from_ts, to_ts],
860    )
861    .map(|_| ())
862    .map_err(storage_err)
863}
864
865fn clear_duckdb_agg_trades(
866    duck: &DuckConnection,
867    mode: BinanceMode,
868    symbol: &str,
869    from_ts: &str,
870    to_ts: &str,
871) -> Result<(), StorageError> {
872    duck.execute(
873        "DELETE FROM raw_agg_trades
874         WHERE mode = ?
875           AND symbol = ?
876           AND event_time >= CAST(? AS TIMESTAMP)
877           AND event_time <= CAST(? AS TIMESTAMP)",
878        params![mode.as_str(), symbol, from_ts, to_ts],
879    )
880    .map(|_| ())
881    .map_err(storage_err)
882}
883
884fn next_duckdb_kline_id(connection: &DuckConnection) -> Result<i64, StorageError> {
885    connection
886        .prepare("SELECT COALESCE(MAX(kline_id), 0) + 1 FROM raw_klines")
887        .and_then(|mut statement| statement.query_row([], |row| row.get(0)))
888        .map_err(storage_err)
889}
890
891fn next_duckdb_liquidation_event_id(connection: &DuckConnection) -> Result<i64, StorageError> {
892    connection
893        .prepare("SELECT COALESCE(MAX(event_id), 0) + 1 FROM raw_liquidation_events")
894        .and_then(|mut statement| statement.query_row([], |row| row.get(0)))
895        .map_err(storage_err)
896}
897
898fn next_duckdb_book_ticker_id(connection: &DuckConnection) -> Result<i64, StorageError> {
899    connection
900        .prepare("SELECT COALESCE(MAX(tick_id), 0) + 1 FROM raw_book_ticker")
901        .and_then(|mut statement| statement.query_row([], |row| row.get(0)))
902        .map_err(storage_err)
903}
904
905fn next_duckdb_agg_trade_id(connection: &DuckConnection) -> Result<i64, StorageError> {
906    connection
907        .prepare("SELECT COALESCE(MAX(trade_id), 0) + 1 FROM raw_agg_trades")
908        .and_then(|mut statement| statement.query_row([], |row| row.get(0)))
909        .map_err(storage_err)
910}
911
912fn next_duckdb_snapshot_export_id(connection: &DuckConnection) -> Result<i64, StorageError> {
913    connection
914        .prepare("SELECT COALESCE(MAX(export_id), 0) + 1 FROM snapshot_exports")
915        .and_then(|mut statement| statement.query_row([], |row| row.get(0)))
916        .map_err(storage_err)
917}
918
/// Redact credentials from a Postgres URL for safe display in logs/errors.
///
/// `scheme://user:pass@host/db` becomes `scheme://***@host/db`. Anything
/// without a recognizable `scheme://...@...` shape collapses to the fully
/// opaque `postgres://***` rather than risking a secret leaking through.
pub fn mask_postgres_url(url: &str) -> String {
    match url.split_once("://") {
        // The LAST '@' separates userinfo from host, so passwords that
        // themselves contain '@' are still masked correctly.
        Some((scheme, rest)) => match rest.rsplit_once('@') {
            Some((_, host_part)) => format!("{scheme}://***@{host_part}"),
            None => "postgres://***".to_string(),
        },
        None => "postgres://***".to_string(),
    }
}
927
928fn storage_err(error: impl std::fmt::Display) -> StorageError {
929    StorageError::WriteFailedWithContext {
930        message: error.to_string(),
931    }
932}