1use chrono::NaiveDate;
2use duckdb::{params, Connection as DuckConnection};
3use postgres::{Client, NoTls};
4
5use crate::app::bootstrap::BinanceMode;
6use crate::dataset::schema::{init_schema_for_path, MARKET_DATA_SCHEMA_VERSION};
7use crate::error::storage_error::StorageError;
8use crate::record::coordination::RecorderCoordination;
9
/// Version tag written into `schema_metadata` by [`init_schema`]; aliased to
/// the shared dataset constant so the Postgres and DuckDB backends always
/// report the same schema version string.
pub const POSTGRES_MARKET_DATA_SCHEMA_VERSION: &str = MARKET_DATA_SCHEMA_VERSION;
11
// Idempotent DDL for the Postgres raw market-data tables, applied via
// `batch_execute` in `init_schema`. Every table pairs a BIGSERIAL surrogate
// key with a UNIQUE "natural" index; the insert helpers below rely on those
// indexes for their `ON CONFLICT (...) DO NOTHING` deduplication.
const POSTGRES_MARKET_DATA_SCHEMA_SQL: &str = r#"
CREATE TABLE IF NOT EXISTS schema_metadata (
    key TEXT PRIMARY KEY,
    value TEXT NOT NULL,
    updated_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP
);

CREATE TABLE IF NOT EXISTS raw_liquidation_events (
    event_id BIGSERIAL PRIMARY KEY,
    mode TEXT NOT NULL,
    product TEXT NOT NULL,
    symbol TEXT NOT NULL,
    event_time TIMESTAMPTZ NOT NULL,
    receive_time TIMESTAMPTZ NOT NULL,
    force_side TEXT NOT NULL,
    price DOUBLE PRECISION NOT NULL,
    qty DOUBLE PRECISION NOT NULL,
    notional DOUBLE PRECISION NOT NULL,
    raw_payload TEXT NOT NULL
);

CREATE UNIQUE INDEX IF NOT EXISTS raw_liquidation_events_natural_idx
ON raw_liquidation_events (mode, product, symbol, event_time, force_side, price, qty);

CREATE TABLE IF NOT EXISTS raw_book_ticker (
    tick_id BIGSERIAL PRIMARY KEY,
    mode TEXT NOT NULL,
    symbol TEXT NOT NULL,
    event_time TIMESTAMPTZ NOT NULL,
    receive_time TIMESTAMPTZ NOT NULL,
    bid DOUBLE PRECISION NOT NULL,
    bid_qty DOUBLE PRECISION NOT NULL,
    ask DOUBLE PRECISION NOT NULL,
    ask_qty DOUBLE PRECISION NOT NULL
);

CREATE UNIQUE INDEX IF NOT EXISTS raw_book_ticker_natural_idx
ON raw_book_ticker (mode, symbol, event_time, bid, ask, bid_qty, ask_qty);

CREATE TABLE IF NOT EXISTS raw_agg_trades (
    trade_id BIGSERIAL PRIMARY KEY,
    mode TEXT NOT NULL,
    symbol TEXT NOT NULL,
    event_time TIMESTAMPTZ NOT NULL,
    receive_time TIMESTAMPTZ NOT NULL,
    price DOUBLE PRECISION NOT NULL,
    qty DOUBLE PRECISION NOT NULL,
    is_buyer_maker BOOLEAN NOT NULL
);

CREATE UNIQUE INDEX IF NOT EXISTS raw_agg_trades_natural_idx
ON raw_agg_trades (mode, symbol, event_time, price, qty, is_buyer_maker);

CREATE TABLE IF NOT EXISTS raw_klines (
    kline_id BIGSERIAL PRIMARY KEY,
    mode TEXT NOT NULL,
    product TEXT NOT NULL,
    symbol TEXT NOT NULL,
    interval_name TEXT NOT NULL,
    open_time TIMESTAMPTZ NOT NULL,
    close_time TIMESTAMPTZ NOT NULL,
    open DOUBLE PRECISION NOT NULL,
    high DOUBLE PRECISION NOT NULL,
    low DOUBLE PRECISION NOT NULL,
    close DOUBLE PRECISION NOT NULL,
    volume DOUBLE PRECISION NOT NULL,
    quote_volume DOUBLE PRECISION NOT NULL,
    trade_count BIGINT NOT NULL,
    taker_buy_base_volume DOUBLE PRECISION,
    taker_buy_quote_volume DOUBLE PRECISION,
    raw_payload TEXT NOT NULL
);

CREATE UNIQUE INDEX IF NOT EXISTS raw_klines_natural_idx
ON raw_klines (mode, product, symbol, interval_name, open_time);
"#;
88
/// Which storage backend a collector writes raw market data to.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CollectorStorageBackend {
    DuckDb,
    Postgres,
}

impl CollectorStorageBackend {
    /// Lowercase identifier for this backend (`"duckdb"` / `"postgres"`).
    pub fn as_str(self) -> &'static str {
        if matches!(self, CollectorStorageBackend::Postgres) {
            "postgres"
        } else {
            "duckdb"
        }
    }
}
103
/// One kline (candlestick) row destined for the Postgres `raw_klines` table.
#[derive(Debug, Clone, PartialEq)]
pub struct PostgresKlineRecord {
    pub mode: String,
    pub product: String,
    pub symbol: String,
    pub interval_name: String,
    /// Bar open time, Unix epoch milliseconds (converted to TIMESTAMPTZ on insert).
    pub open_time_ms: i64,
    /// Bar close time, Unix epoch milliseconds.
    pub close_time_ms: i64,
    pub open: f64,
    pub high: f64,
    pub low: f64,
    pub close: f64,
    pub volume: f64,
    pub quote_volume: f64,
    pub trade_count: i64,
    /// Optional taker-buy base volume; stored as nullable DOUBLE PRECISION.
    pub taker_buy_base_volume: Option<f64>,
    /// Optional taker-buy quote volume; stored as nullable DOUBLE PRECISION.
    pub taker_buy_quote_volume: Option<f64>,
    /// Original upstream payload, stored verbatim as TEXT.
    pub raw_payload: String,
}
123
/// One forced-liquidation event destined for `raw_liquidation_events`.
#[derive(Debug, Clone, PartialEq)]
pub struct PostgresLiquidationRecord {
    pub mode: String,
    pub product: String,
    pub symbol: String,
    /// Exchange event time, Unix epoch milliseconds.
    pub event_time_ms: i64,
    /// Receive-side timestamp, Unix epoch milliseconds.
    pub receive_time_ms: i64,
    /// Side being force-closed, stored verbatim in the `force_side` column.
    pub force_side: String,
    pub price: f64,
    pub qty: f64,
    pub notional: f64,
    /// Original upstream payload, stored verbatim as TEXT.
    pub raw_payload: String,
}
137
/// One book-ticker quote destined for `raw_book_ticker`.
/// Bid/ask field semantics follow the upstream stream — presumably
/// top-of-book; confirm against the producer.
#[derive(Debug, Clone, PartialEq)]
pub struct PostgresBookTickerRecord {
    pub mode: String,
    pub symbol: String,
    /// Exchange event time, Unix epoch milliseconds.
    pub event_time_ms: i64,
    /// Receive-side timestamp, Unix epoch milliseconds.
    pub receive_time_ms: i64,
    pub bid: f64,
    pub bid_qty: f64,
    pub ask: f64,
    pub ask_qty: f64,
}
149
/// One aggregated trade destined for `raw_agg_trades`.
#[derive(Debug, Clone, PartialEq)]
pub struct PostgresAggTradeRecord {
    pub mode: String,
    pub symbol: String,
    /// Exchange event time, Unix epoch milliseconds.
    pub event_time_ms: i64,
    /// Receive-side timestamp, Unix epoch milliseconds.
    pub receive_time_ms: i64,
    pub price: f64,
    pub qty: f64,
    /// True when the buyer was the maker, as reported upstream.
    pub is_buyer_maker: bool,
}
160
/// Per-(product, symbol, interval) aggregate over `raw_klines`,
/// produced by [`load_summary`].
#[derive(Debug, Clone, PartialEq)]
pub struct PostgresKlineSummaryRow {
    pub product: String,
    pub symbol: String,
    pub interval_name: String,
    pub row_count: i64,
    /// Earliest `open_time`, rendered as TEXT by the SQL CAST.
    pub min_time: Option<String>,
    /// Latest `close_time`, rendered as TEXT by the SQL CAST.
    pub max_time: Option<String>,
}
170
/// Per-symbol aggregate over `raw_liquidation_events`,
/// produced by [`load_summary`].
#[derive(Debug, Clone, PartialEq)]
pub struct PostgresLiquidationSummaryRow {
    pub symbol: String,
    pub row_count: i64,
    /// Earliest `event_time`, rendered as TEXT by the SQL CAST.
    pub min_time: Option<String>,
    /// Latest `event_time`, rendered as TEXT by the SQL CAST.
    pub max_time: Option<String>,
}
178
/// Snapshot of the Postgres store's contents, returned by [`load_summary`].
#[derive(Debug, Clone, PartialEq)]
pub struct PostgresSummary {
    /// Current version from `schema_metadata`, or `"missing"` if absent.
    pub schema_version: String,
    /// Version observed before `init_schema` ran, echoed from the caller.
    pub previous_version: Option<String>,
    pub klines: Vec<PostgresKlineSummaryRow>,
    pub liquidations: Vec<PostgresLiquidationSummaryRow>,
}
186
/// Parameters for [`export_snapshot_to_duckdb`].
#[derive(Debug, Clone, PartialEq)]
pub struct PostgresToDuckDbSnapshotConfig {
    /// Source connection string (masked via `mask_postgres_url` in reports).
    pub postgres_url: String,
    pub mode: BinanceMode,
    /// Base directory handed to `RecorderCoordination` to locate the DuckDB file.
    pub base_dir: String,
    /// Symbols to export; each symbol is processed independently.
    pub symbols: Vec<String>,
    /// First day of the window (inclusive, from 00:00:00).
    pub from: NaiveDate,
    /// Last day of the window (inclusive, through 23:59:59).
    pub to: NaiveDate,
    /// Optional product filter for klines; `None` exports all products.
    pub product: Option<String>,
    /// Optional interval filter for klines; `None` exports all intervals.
    pub interval_name: Option<String>,
    pub include_klines: bool,
    pub include_liquidations: bool,
    pub include_book_tickers: bool,
    pub include_agg_trades: bool,
    /// When true, matching DuckDB rows are deleted before re-inserting.
    pub clear_duckdb_range: bool,
}
203
/// Result of [`export_snapshot_to_duckdb`]: where the data landed and how
/// many rows were copied per table.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PostgresToDuckDbSnapshotReport {
    /// Id of the row written to the DuckDB `snapshot_exports` audit table.
    pub snapshot_export_id: i64,
    /// Display form of the DuckDB file path that received the snapshot.
    pub db_path: String,
    pub kline_rows: usize,
    pub liquidation_rows: usize,
    pub book_ticker_rows: usize,
    pub agg_trade_rows: usize,
}
213
214pub fn postgres_url_from_env() -> Result<String, StorageError> {
215 std::env::var("SANDBOX_QUANT_POSTGRES_URL")
216 .or_else(|_| std::env::var("DATABASE_URL"))
217 .map_err(|_| StorageError::WriteFailedWithContext {
218 message: "missing PostgreSQL URL; set SANDBOX_QUANT_POSTGRES_URL or DATABASE_URL"
219 .to_string(),
220 })
221}
222
223pub fn connect(url: &str) -> Result<Client, StorageError> {
224 Client::connect(url, NoTls).map_err(|error| StorageError::DatabaseInitFailed {
225 path: mask_postgres_url(url),
226 message: error.to_string(),
227 })
228}
229
230pub fn init_schema(client: &mut Client, url: &str) -> Result<Option<String>, StorageError> {
231 let previous_version = existing_schema_version(client)?;
232 client
233 .batch_execute(POSTGRES_MARKET_DATA_SCHEMA_SQL)
234 .map_err(|error| StorageError::DatabaseInitFailed {
235 path: mask_postgres_url(url),
236 message: error.to_string(),
237 })?;
238 client
239 .execute(
240 "INSERT INTO schema_metadata (key, value, updated_at)
241 VALUES ('market_data_schema_version', $1, CURRENT_TIMESTAMP)
242 ON CONFLICT (key) DO UPDATE
243 SET value = EXCLUDED.value, updated_at = EXCLUDED.updated_at",
244 &[&POSTGRES_MARKET_DATA_SCHEMA_VERSION],
245 )
246 .map_err(|error| StorageError::DatabaseInitFailed {
247 path: mask_postgres_url(url),
248 message: error.to_string(),
249 })?;
250 Ok(previous_version)
251}
252
253pub fn insert_kline(client: &mut Client, record: &PostgresKlineRecord) -> Result<(), StorageError> {
254 client
255 .execute(
256 "INSERT INTO raw_klines (
257 mode, product, symbol, interval_name, open_time, close_time,
258 open, high, low, close, volume, quote_volume, trade_count,
259 taker_buy_base_volume, taker_buy_quote_volume, raw_payload
260 ) VALUES (
261 $1, $2, $3, $4, to_timestamp($5 / 1000.0), to_timestamp($6 / 1000.0),
262 $7, $8, $9, $10, $11, $12, $13, $14, $15, $16
263 )
264 ON CONFLICT (mode, product, symbol, interval_name, open_time) DO NOTHING",
265 &[
266 &record.mode,
267 &record.product,
268 &record.symbol,
269 &record.interval_name,
270 &record.open_time_ms,
271 &record.close_time_ms,
272 &record.open,
273 &record.high,
274 &record.low,
275 &record.close,
276 &record.volume,
277 &record.quote_volume,
278 &record.trade_count,
279 &record.taker_buy_base_volume,
280 &record.taker_buy_quote_volume,
281 &record.raw_payload,
282 ],
283 )
284 .map(|_| ())
285 .map_err(storage_err)
286}
287
288pub fn insert_liquidation(
289 client: &mut Client,
290 record: &PostgresLiquidationRecord,
291) -> Result<(), StorageError> {
292 client
293 .execute(
294 "INSERT INTO raw_liquidation_events (
295 mode, product, symbol, event_time, receive_time, force_side, price, qty, notional, raw_payload
296 ) VALUES (
297 $1, $2, $3, to_timestamp($4 / 1000.0), to_timestamp($5 / 1000.0), $6, $7, $8, $9, $10
298 )
299 ON CONFLICT (mode, product, symbol, event_time, force_side, price, qty) DO NOTHING",
300 &[
301 &record.mode,
302 &record.product,
303 &record.symbol,
304 &record.event_time_ms,
305 &record.receive_time_ms,
306 &record.force_side,
307 &record.price,
308 &record.qty,
309 &record.notional,
310 &record.raw_payload,
311 ],
312 )
313 .map(|_| ())
314 .map_err(storage_err)
315}
316
317pub fn insert_book_ticker(
318 client: &mut Client,
319 record: &PostgresBookTickerRecord,
320) -> Result<(), StorageError> {
321 client
322 .execute(
323 "INSERT INTO raw_book_ticker (
324 mode, symbol, event_time, receive_time, bid, bid_qty, ask, ask_qty
325 ) VALUES (
326 $1, $2, to_timestamp($3 / 1000.0), to_timestamp($4 / 1000.0), $5, $6, $7, $8
327 )
328 ON CONFLICT (mode, symbol, event_time, bid, ask, bid_qty, ask_qty) DO NOTHING",
329 &[
330 &record.mode,
331 &record.symbol,
332 &record.event_time_ms,
333 &record.receive_time_ms,
334 &record.bid,
335 &record.bid_qty,
336 &record.ask,
337 &record.ask_qty,
338 ],
339 )
340 .map(|_| ())
341 .map_err(storage_err)
342}
343
344pub fn insert_agg_trade(
345 client: &mut Client,
346 record: &PostgresAggTradeRecord,
347) -> Result<(), StorageError> {
348 client
349 .execute(
350 "INSERT INTO raw_agg_trades (
351 mode, symbol, event_time, receive_time, price, qty, is_buyer_maker
352 ) VALUES (
353 $1, $2, to_timestamp($3 / 1000.0), to_timestamp($4 / 1000.0), $5, $6, $7
354 )
355 ON CONFLICT (mode, symbol, event_time, price, qty, is_buyer_maker) DO NOTHING",
356 &[
357 &record.mode,
358 &record.symbol,
359 &record.event_time_ms,
360 &record.receive_time_ms,
361 &record.price,
362 &record.qty,
363 &record.is_buyer_maker,
364 ],
365 )
366 .map(|_| ())
367 .map_err(storage_err)
368}
369
/// Connects to the given URL, (re)applies the schema, and gathers the
/// counters/timestamps that make up a `RecorderMetrics` snapshot.
///
/// `derived_kline_1s_bars` is hard-coded to 0 here: there is no Postgres
/// source table for that metric in this module.
pub fn metrics_for_postgres_url(
    url: &str,
) -> Result<crate::dataset::types::RecorderMetrics, StorageError> {
    let mut client = connect(url)?;
    // Ensure the tables exist so the COUNT/MAX queries below cannot fail on a
    // fresh database; the previous-version return value is not needed here.
    let _ = init_schema(&mut client, url)?;
    Ok(crate::dataset::types::RecorderMetrics {
        liquidation_events: query_count(&mut client, "raw_liquidation_events")?,
        book_ticker_events: query_count(&mut client, "raw_book_ticker")?,
        agg_trade_events: query_count(&mut client, "raw_agg_trades")?,
        derived_kline_1s_bars: 0,
        schema_version: existing_schema_version(&mut client)?,
        last_liquidation_event_time: query_latest_timestamp(
            &mut client,
            "raw_liquidation_events",
            "event_time",
        )?,
        last_book_ticker_event_time: query_latest_timestamp(
            &mut client,
            "raw_book_ticker",
            "event_time",
        )?,
        last_agg_trade_event_time: query_latest_timestamp(
            &mut client,
            "raw_agg_trades",
            "event_time",
        )?,
        top_liquidation_symbols: query_top_symbols(&mut client, "raw_liquidation_events")?,
        top_book_ticker_symbols: query_top_symbols(&mut client, "raw_book_ticker")?,
        top_agg_trade_symbols: query_top_symbols(&mut client, "raw_agg_trades")?,
    })
}
401
402pub fn load_summary(
403 client: &mut Client,
404 previous_version: Option<String>,
405) -> Result<PostgresSummary, StorageError> {
406 let schema_version = existing_schema_version(client)?.unwrap_or_else(|| "missing".to_string());
407
408 let klines = client
409 .query(
410 "SELECT product, symbol, interval_name, COUNT(*) AS row_count,
411 CAST(MIN(open_time) AS TEXT), CAST(MAX(close_time) AS TEXT)
412 FROM raw_klines
413 GROUP BY product, symbol, interval_name
414 ORDER BY product, symbol, interval_name",
415 &[],
416 )
417 .map_err(storage_err)?
418 .into_iter()
419 .map(|row| PostgresKlineSummaryRow {
420 product: row.get(0),
421 symbol: row.get(1),
422 interval_name: row.get(2),
423 row_count: row.get(3),
424 min_time: row.get(4),
425 max_time: row.get(5),
426 })
427 .collect();
428
429 let liquidations = client
430 .query(
431 "SELECT symbol, COUNT(*) AS row_count,
432 CAST(MIN(event_time) AS TEXT), CAST(MAX(event_time) AS TEXT)
433 FROM raw_liquidation_events
434 GROUP BY symbol
435 ORDER BY symbol",
436 &[],
437 )
438 .map_err(storage_err)?
439 .into_iter()
440 .map(|row| PostgresLiquidationSummaryRow {
441 symbol: row.get(0),
442 row_count: row.get(1),
443 min_time: row.get(2),
444 max_time: row.get(3),
445 })
446 .collect();
447
448 Ok(PostgresSummary {
449 schema_version,
450 previous_version,
451 klines,
452 liquidations,
453 })
454}
455
/// Copies a date-bounded slice of the Postgres raw tables into the DuckDB
/// dataset file for `config.mode`, then records the export as a row in the
/// DuckDB `snapshot_exports` audit table.
///
/// Per symbol, each enabled table is (optionally) cleared for the target
/// range and then re-filled from Postgres. Timestamps travel as epoch
/// milliseconds: `EXTRACT(EPOCH FROM ts) * 1000` on the Postgres side,
/// `to_timestamp(ms / 1000.0)` on the DuckDB side.
///
/// # Errors
/// Any connection, schema-init, query, or insert failure surfaces as a
/// `StorageError`.
///
/// NOTE(review): DuckDB ids are allocated as `MAX(id) + 1` per table, which
/// assumes a single writer to the DuckDB file — confirm no concurrent
/// exporter/recorder targets the same path.
pub fn export_snapshot_to_duckdb(
    config: &PostgresToDuckDbSnapshotConfig,
) -> Result<PostgresToDuckDbSnapshotReport, StorageError> {
    let mut client = connect(&config.postgres_url)?;
    init_schema(&mut client, &config.postgres_url)?;

    // Resolve the mode-specific DuckDB file and make sure its schema exists.
    let db_path = RecorderCoordination::new(config.base_dir.clone()).db_path(config.mode);
    init_schema_for_path(&db_path)?;
    let duck =
        DuckConnection::open(&db_path).map_err(|error| StorageError::DatabaseInitFailed {
            path: db_path.display().to_string(),
            message: error.to_string(),
        })?;

    // Inclusive day bounds for the export window.
    // NOTE(review): the 23:59:59 upper bound excludes events with sub-second
    // timestamps after 23:59:59 on the final day — confirm this is intended.
    let from_ts = format!("{} 00:00:00", config.from);
    let to_ts = format!("{} 23:59:59", config.to);

    let mut kline_rows = 0usize;
    let mut liquidation_rows = 0usize;
    let mut book_ticker_rows = 0usize;
    let mut agg_trade_rows = 0usize;

    for symbol in &config.symbols {
        // Optional pre-pass: remove any existing DuckDB rows in the window so
        // the export acts as a replace rather than an append.
        if config.clear_duckdb_range {
            if config.include_klines {
                clear_duckdb_klines(
                    &duck,
                    config.mode,
                    symbol,
                    config.product.as_deref(),
                    config.interval_name.as_deref(),
                    &from_ts,
                    &to_ts,
                )?;
            }
            if config.include_liquidations {
                clear_duckdb_liquidations(&duck, config.mode, symbol, &from_ts, &to_ts)?;
            }
            if config.include_book_tickers {
                clear_duckdb_book_tickers(&duck, config.mode, symbol, &from_ts, &to_ts)?;
            }
            if config.include_agg_trades {
                clear_duckdb_agg_trades(&duck, config.mode, symbol, &from_ts, &to_ts)?;
            }
        }

        if config.include_klines {
            // $5/$6 are nullable filters: a NULL disables the product/interval
            // narrowing so all products/intervals are exported.
            let rows = client
                .query(
                    "SELECT product, symbol, interval_name,
                     (EXTRACT(EPOCH FROM open_time) * 1000)::BIGINT AS open_time_ms,
                     (EXTRACT(EPOCH FROM close_time) * 1000)::BIGINT AS close_time_ms,
                     open, high, low, close, volume, quote_volume, trade_count,
                     taker_buy_base_volume, taker_buy_quote_volume, raw_payload
                     FROM raw_klines
                     WHERE mode = $1
                     AND symbol = $2
                     AND open_time >= CAST($3 AS TIMESTAMPTZ)
                     AND open_time <= CAST($4 AS TIMESTAMPTZ)
                     AND ($5::TEXT IS NULL OR product = $5)
                     AND ($6::TEXT IS NULL OR interval_name = $6)
                     ORDER BY open_time ASC",
                    &[
                        &config.mode.as_str(),
                        &symbol,
                        &from_ts,
                        &to_ts,
                        &config.product,
                        &config.interval_name,
                    ],
                )
                .map_err(storage_err)?;

            // Note the column rename: Postgres `interval_name` lands in the
            // DuckDB `interval` column.
            let mut next_id = next_duckdb_kline_id(&duck)?;
            for row in rows {
                duck.execute(
                    "INSERT INTO raw_klines (
                        kline_id, mode, product, symbol, interval, open_time, close_time,
                        open, high, low, close, volume, quote_volume, trade_count,
                        taker_buy_base_volume, taker_buy_quote_volume, raw_payload
                    ) VALUES (
                        ?, ?, ?, ?, ?, to_timestamp(? / 1000.0), to_timestamp(? / 1000.0),
                        ?, ?, ?, ?, ?, ?, ?, ?, ?, ?
                    )",
                    params![
                        next_id,
                        config.mode.as_str(),
                        row.get::<_, String>(0),
                        row.get::<_, String>(1),
                        row.get::<_, String>(2),
                        row.get::<_, i64>(3),
                        row.get::<_, i64>(4),
                        row.get::<_, f64>(5),
                        row.get::<_, f64>(6),
                        row.get::<_, f64>(7),
                        row.get::<_, f64>(8),
                        row.get::<_, f64>(9),
                        row.get::<_, f64>(10),
                        row.get::<_, i64>(11),
                        row.get::<_, Option<f64>>(12),
                        row.get::<_, Option<f64>>(13),
                        row.get::<_, String>(14),
                    ],
                )
                .map_err(storage_err)?;
                next_id += 1;
                kline_rows += 1;
            }
        }

        if config.include_liquidations {
            let rows = client
                .query(
                    "SELECT symbol,
                     (EXTRACT(EPOCH FROM event_time) * 1000)::BIGINT AS event_time_ms,
                     (EXTRACT(EPOCH FROM receive_time) * 1000)::BIGINT AS receive_time_ms,
                     force_side, price, qty, notional, raw_payload
                     FROM raw_liquidation_events
                     WHERE mode = $1
                     AND symbol = $2
                     AND event_time >= CAST($3 AS TIMESTAMPTZ)
                     AND event_time <= CAST($4 AS TIMESTAMPTZ)
                     ORDER BY event_time ASC",
                    &[&config.mode.as_str(), &symbol, &from_ts, &to_ts],
                )
                .map_err(storage_err)?;
            // The DuckDB table has no `product` column; it is dropped here.
            let mut next_id = next_duckdb_liquidation_event_id(&duck)?;
            for row in rows {
                duck.execute(
                    "INSERT INTO raw_liquidation_events (
                        event_id, mode, symbol, event_time, receive_time, force_side, price, qty, notional, raw_payload
                    ) VALUES (
                        ?, ?, ?, to_timestamp(? / 1000.0), to_timestamp(? / 1000.0), ?, ?, ?, ?, ?
                    )",
                    params![
                        next_id,
                        config.mode.as_str(),
                        row.get::<_, String>(0),
                        row.get::<_, i64>(1),
                        row.get::<_, i64>(2),
                        row.get::<_, String>(3),
                        row.get::<_, f64>(4),
                        row.get::<_, f64>(5),
                        row.get::<_, f64>(6),
                        row.get::<_, String>(7),
                    ],
                )
                .map_err(storage_err)?;
                next_id += 1;
                liquidation_rows += 1;
            }
        }

        if config.include_book_tickers {
            let rows = client
                .query(
                    "SELECT symbol,
                     (EXTRACT(EPOCH FROM event_time) * 1000)::BIGINT AS event_time_ms,
                     (EXTRACT(EPOCH FROM receive_time) * 1000)::BIGINT AS receive_time_ms,
                     bid, bid_qty, ask, ask_qty
                     FROM raw_book_ticker
                     WHERE mode = $1
                     AND symbol = $2
                     AND event_time >= CAST($3 AS TIMESTAMPTZ)
                     AND event_time <= CAST($4 AS TIMESTAMPTZ)
                     ORDER BY event_time ASC",
                    &[&config.mode.as_str(), &symbol, &from_ts, &to_ts],
                )
                .map_err(storage_err)?;
            let mut next_id = next_duckdb_book_ticker_id(&duck)?;
            for row in rows {
                duck.execute(
                    "INSERT INTO raw_book_ticker (
                        tick_id, mode, symbol, event_time, receive_time, bid, bid_qty, ask, ask_qty
                    ) VALUES (
                        ?, ?, ?, to_timestamp(? / 1000.0), to_timestamp(? / 1000.0), ?, ?, ?, ?
                    )",
                    params![
                        next_id,
                        config.mode.as_str(),
                        row.get::<_, String>(0),
                        row.get::<_, i64>(1),
                        row.get::<_, i64>(2),
                        row.get::<_, f64>(3),
                        row.get::<_, f64>(4),
                        row.get::<_, f64>(5),
                        row.get::<_, f64>(6),
                    ],
                )
                .map_err(storage_err)?;
                next_id += 1;
                book_ticker_rows += 1;
            }
        }

        if config.include_agg_trades {
            let rows = client
                .query(
                    "SELECT symbol,
                     (EXTRACT(EPOCH FROM event_time) * 1000)::BIGINT AS event_time_ms,
                     (EXTRACT(EPOCH FROM receive_time) * 1000)::BIGINT AS receive_time_ms,
                     price, qty, is_buyer_maker
                     FROM raw_agg_trades
                     WHERE mode = $1
                     AND symbol = $2
                     AND event_time >= CAST($3 AS TIMESTAMPTZ)
                     AND event_time <= CAST($4 AS TIMESTAMPTZ)
                     ORDER BY event_time ASC",
                    &[&config.mode.as_str(), &symbol, &from_ts, &to_ts],
                )
                .map_err(storage_err)?;
            let mut next_id = next_duckdb_agg_trade_id(&duck)?;
            for row in rows {
                duck.execute(
                    "INSERT INTO raw_agg_trades (
                        trade_id, mode, symbol, event_time, receive_time, price, qty, is_buyer_maker
                    ) VALUES (
                        ?, ?, ?, to_timestamp(? / 1000.0), to_timestamp(? / 1000.0), ?, ?, ?
                    )",
                    params![
                        next_id,
                        config.mode.as_str(),
                        row.get::<_, String>(0),
                        row.get::<_, i64>(1),
                        row.get::<_, i64>(2),
                        row.get::<_, f64>(3),
                        row.get::<_, f64>(4),
                        row.get::<_, bool>(5),
                    ],
                )
                .map_err(storage_err)?;
                next_id += 1;
                agg_trade_rows += 1;
            }
        }
    }

    // Audit trail: one snapshot_exports row per invocation, with the source
    // URL masked so credentials are never persisted.
    let snapshot_export_id = next_duckdb_snapshot_export_id(&duck)?;
    duck.execute(
        "INSERT INTO snapshot_exports (
            export_id, created_at, source_backend, source_target, mode, symbols_csv, from_date, to_date,
            product, interval_name, include_klines, include_liquidations, include_book_tickers, include_agg_trades,
            clear_duckdb_range, exported_kline_rows, exported_liquidation_rows, exported_book_ticker_rows, exported_agg_trade_rows
        ) VALUES (
            ?, CURRENT_TIMESTAMP, 'postgres', ?, ?, ?, CAST(? AS DATE), CAST(? AS DATE),
            ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?
        )",
        params![
            snapshot_export_id,
            mask_postgres_url(&config.postgres_url),
            config.mode.as_str(),
            config.symbols.join(","),
            config.from.to_string(),
            config.to.to_string(),
            config.product.as_deref(),
            config.interval_name.as_deref(),
            config.include_klines,
            config.include_liquidations,
            config.include_book_tickers,
            config.include_agg_trades,
            config.clear_duckdb_range,
            kline_rows as i64,
            liquidation_rows as i64,
            book_ticker_rows as i64,
            agg_trade_rows as i64,
        ],
    )
    .map_err(storage_err)?;

    Ok(PostgresToDuckDbSnapshotReport {
        snapshot_export_id,
        db_path: db_path.display().to_string(),
        kline_rows,
        liquidation_rows,
        book_ticker_rows,
        agg_trade_rows,
    })
}
734
735fn existing_schema_version(client: &mut Client) -> Result<Option<String>, StorageError> {
736 let table_exists = client
737 .query_one(
738 "SELECT EXISTS (
739 SELECT 1 FROM information_schema.tables
740 WHERE table_schema = 'public' AND table_name = 'schema_metadata'
741 )",
742 &[],
743 )
744 .map_err(storage_err)?
745 .get::<_, bool>(0);
746 if !table_exists {
747 return Ok(None);
748 }
749 client
750 .query_opt(
751 "SELECT value FROM schema_metadata WHERE key = 'market_data_schema_version'",
752 &[],
753 )
754 .map_err(storage_err)
755 .map(|row| row.map(|row| row.get(0)))
756}
757
758fn query_count(client: &mut Client, table: &str) -> Result<u64, StorageError> {
759 client
760 .query_one(&format!("SELECT COUNT(*) FROM {table}"), &[])
761 .map_err(storage_err)
762 .map(|row| row.get::<_, i64>(0).max(0) as u64)
763}
764
765fn query_latest_timestamp(
766 client: &mut Client,
767 table: &str,
768 column: &str,
769) -> Result<Option<String>, StorageError> {
770 client
771 .query_one(
772 &format!("SELECT CAST(MAX({column}) AS TEXT) FROM {table}"),
773 &[],
774 )
775 .map_err(storage_err)
776 .map(|row| row.get(0))
777}
778
779fn query_top_symbols(client: &mut Client, table: &str) -> Result<Vec<String>, StorageError> {
780 client
781 .query(
782 &format!(
783 "SELECT symbol, COUNT(*) AS row_count FROM {table} GROUP BY symbol ORDER BY row_count DESC, symbol ASC LIMIT 5"
784 ),
785 &[],
786 )
787 .map_err(storage_err)
788 .map(|rows| {
789 rows.into_iter()
790 .map(|row| format!("{}:{}", row.get::<_, String>(0), row.get::<_, i64>(1)))
791 .collect()
792 })
793}
794
795fn clear_duckdb_klines(
796 duck: &DuckConnection,
797 mode: BinanceMode,
798 symbol: &str,
799 product: Option<&str>,
800 interval_name: Option<&str>,
801 from_ts: &str,
802 to_ts: &str,
803) -> Result<(), StorageError> {
804 duck.execute(
805 "DELETE FROM raw_klines
806 WHERE mode = ?
807 AND symbol = ?
808 AND open_time >= CAST(? AS TIMESTAMP)
809 AND open_time <= CAST(? AS TIMESTAMP)
810 AND (? IS NULL OR product = ?)
811 AND (? IS NULL OR interval = ?)",
812 params![
813 mode.as_str(),
814 symbol,
815 from_ts,
816 to_ts,
817 product,
818 product,
819 interval_name,
820 interval_name
821 ],
822 )
823 .map(|_| ())
824 .map_err(storage_err)
825}
826
827fn clear_duckdb_liquidations(
828 duck: &DuckConnection,
829 mode: BinanceMode,
830 symbol: &str,
831 from_ts: &str,
832 to_ts: &str,
833) -> Result<(), StorageError> {
834 duck.execute(
835 "DELETE FROM raw_liquidation_events
836 WHERE mode = ?
837 AND symbol = ?
838 AND event_time >= CAST(? AS TIMESTAMP)
839 AND event_time <= CAST(? AS TIMESTAMP)",
840 params![mode.as_str(), symbol, from_ts, to_ts],
841 )
842 .map(|_| ())
843 .map_err(storage_err)
844}
845
846fn clear_duckdb_book_tickers(
847 duck: &DuckConnection,
848 mode: BinanceMode,
849 symbol: &str,
850 from_ts: &str,
851 to_ts: &str,
852) -> Result<(), StorageError> {
853 duck.execute(
854 "DELETE FROM raw_book_ticker
855 WHERE mode = ?
856 AND symbol = ?
857 AND event_time >= CAST(? AS TIMESTAMP)
858 AND event_time <= CAST(? AS TIMESTAMP)",
859 params![mode.as_str(), symbol, from_ts, to_ts],
860 )
861 .map(|_| ())
862 .map_err(storage_err)
863}
864
865fn clear_duckdb_agg_trades(
866 duck: &DuckConnection,
867 mode: BinanceMode,
868 symbol: &str,
869 from_ts: &str,
870 to_ts: &str,
871) -> Result<(), StorageError> {
872 duck.execute(
873 "DELETE FROM raw_agg_trades
874 WHERE mode = ?
875 AND symbol = ?
876 AND event_time >= CAST(? AS TIMESTAMP)
877 AND event_time <= CAST(? AS TIMESTAMP)",
878 params![mode.as_str(), symbol, from_ts, to_ts],
879 )
880 .map(|_| ())
881 .map_err(storage_err)
882}
883
884fn next_duckdb_kline_id(connection: &DuckConnection) -> Result<i64, StorageError> {
885 connection
886 .prepare("SELECT COALESCE(MAX(kline_id), 0) + 1 FROM raw_klines")
887 .and_then(|mut statement| statement.query_row([], |row| row.get(0)))
888 .map_err(storage_err)
889}
890
891fn next_duckdb_liquidation_event_id(connection: &DuckConnection) -> Result<i64, StorageError> {
892 connection
893 .prepare("SELECT COALESCE(MAX(event_id), 0) + 1 FROM raw_liquidation_events")
894 .and_then(|mut statement| statement.query_row([], |row| row.get(0)))
895 .map_err(storage_err)
896}
897
898fn next_duckdb_book_ticker_id(connection: &DuckConnection) -> Result<i64, StorageError> {
899 connection
900 .prepare("SELECT COALESCE(MAX(tick_id), 0) + 1 FROM raw_book_ticker")
901 .and_then(|mut statement| statement.query_row([], |row| row.get(0)))
902 .map_err(storage_err)
903}
904
905fn next_duckdb_agg_trade_id(connection: &DuckConnection) -> Result<i64, StorageError> {
906 connection
907 .prepare("SELECT COALESCE(MAX(trade_id), 0) + 1 FROM raw_agg_trades")
908 .and_then(|mut statement| statement.query_row([], |row| row.get(0)))
909 .map_err(storage_err)
910}
911
912fn next_duckdb_snapshot_export_id(connection: &DuckConnection) -> Result<i64, StorageError> {
913 connection
914 .prepare("SELECT COALESCE(MAX(export_id), 0) + 1 FROM snapshot_exports")
915 .and_then(|mut statement| statement.query_row([], |row| row.get(0)))
916 .map_err(storage_err)
917}
918
/// Redacts the credential section of a connection URL for safe logging.
///
/// `scheme://user:pass@host/db` becomes `scheme://***@host/db` (splitting on
/// the LAST `@`, so passwords containing `@` are handled). Inputs without a
/// `scheme://` prefix, or without any credentials, collapse to the fully
/// opaque `postgres://***`.
pub fn mask_postgres_url(url: &str) -> String {
    match url.split_once("://") {
        Some((scheme, rest)) => match rest.rsplit_once('@') {
            Some((_, host_part)) => format!("{scheme}://***@{host_part}"),
            None => "postgres://***".to_string(),
        },
        None => "postgres://***".to_string(),
    }
}
927
928fn storage_err(error: impl std::fmt::Display) -> StorageError {
929 StorageError::WriteFailedWithContext {
930 message: error.to_string(),
931 }
932}