1use std::collections::{BTreeMap, BTreeSet};
2use std::path::{Path, PathBuf};
3use std::sync::{
4 atomic::{AtomicBool, Ordering},
5 Arc, Mutex,
6};
7use std::thread::JoinHandle;
8use std::time::Duration;
9
10use chrono::{DateTime, TimeZone, Utc};
11use duckdb::{params, Connection};
12use futures_util::StreamExt;
13use serde::Deserialize;
14use tokio_tungstenite::connect_async;
15use tokio_tungstenite::tungstenite::Message;
16
17use crate::app::bootstrap::BinanceMode;
18use crate::dataset::query::{backtest_summary_for_path, metrics_for_path};
19use crate::dataset::schema::init_schema_for_path;
20use crate::dataset::types::{BacktestDatasetSummary, RecorderMetrics};
21use crate::error::storage_error::StorageError;
22use crate::record::coordination::RecorderCoordination;
23use crate::storage::postgres_market_data::{
24 connect as connect_postgres, init_schema as init_postgres_schema, insert_agg_trade,
25 insert_book_ticker, insert_liquidation, mask_postgres_url, metrics_for_postgres_url,
26 postgres_url_from_env, CollectorStorageBackend, PostgresAggTradeRecord,
27 PostgresBookTickerRecord, PostgresLiquidationRecord,
28};
29
/// Lifecycle state of a per-mode market-data recorder.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RecorderState {
    /// A worker is (or should be) actively collecting market data.
    Running,
    /// No collection in progress; the mode is idle.
    Stopped,
}
35
36impl RecorderState {
37 pub fn as_str(self) -> &'static str {
38 match self {
39 Self::Running => "running",
40 Self::Stopped => "stopped",
41 }
42 }
43
44 pub fn is_running(self) -> bool {
45 self == Self::Running
46 }
47}
48
/// Point-in-time view of a recorder for one Binance mode, combining
/// configuration (symbols, storage target) with runtime health data.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RecorderStatus {
    /// Binance mode this status describes.
    pub mode: BinanceMode,
    /// Current lifecycle state.
    pub state: RecorderState,
    /// DuckDB database file for this mode (still reported when the
    /// Postgres backend is active).
    pub db_path: PathBuf,
    /// Backend name, e.g. from `CollectorStorageBackend::as_str`.
    pub storage_backend: String,
    /// Human-readable storage destination (db path or masked postgres URL).
    pub storage_target: String,
    /// When the recorder entered `Running`; `None` while stopped.
    pub started_at: Option<DateTime<Utc>>,
    /// Last time this status (or the worker snapshot behind it) was touched.
    pub updated_at: DateTime<Utc>,
    /// Symbols requested explicitly by the operator.
    pub manual_symbols: Vec<String>,
    /// Symbols requested by running strategies.
    pub strategy_symbols: Vec<String>,
    /// Union of manual and strategy symbols actually subscribed.
    pub watched_symbols: Vec<String>,
    /// Whether the worker thread has not yet finished.
    pub worker_alive: bool,
    /// Seconds since the last worker heartbeat.
    pub heartbeat_age_sec: i64,
    /// Most recent worker error, if any.
    pub last_error: Option<String>,
    /// Aggregated ingest counters.
    pub metrics: RecorderMetrics,
}
66
67impl RecorderStatus {
68 fn new(
69 mode: BinanceMode,
70 state: RecorderState,
71 db_path: PathBuf,
72 storage_backend: String,
73 storage_target: String,
74 manual_symbols: Vec<String>,
75 strategy_symbols: Vec<String>,
76 watched_symbols: Vec<String>,
77 ) -> Self {
78 let now = Utc::now();
79 Self {
80 mode,
81 state,
82 db_path,
83 storage_backend,
84 storage_target,
85 started_at: if state == RecorderState::Running {
86 Some(now)
87 } else {
88 None
89 },
90 updated_at: now,
91 manual_symbols,
92 strategy_symbols,
93 watched_symbols,
94 worker_alive: true,
95 heartbeat_age_sec: 0,
96 last_error: None,
97 metrics: RecorderMetrics::default(),
98 }
99 }
100}
101
/// Shared state a worker thread publishes back to the recorder: a heartbeat
/// timestamp, running metrics, and the most recent error (if any).
#[derive(Debug, Clone)]
struct WorkerSnapshot {
    // Heartbeat; refreshed on every loop tick and recorded event.
    updated_at: DateTime<Utc>,
    // Running ingest counters for this worker session.
    metrics: RecorderMetrics,
    // Last error seen; cleared when an event is recorded successfully.
    last_error: Option<String>,
}
108
109impl WorkerSnapshot {
110 fn new(metrics: RecorderMetrics) -> Self {
111 Self {
112 updated_at: Utc::now(),
113 metrics,
114 last_error: None,
115 }
116 }
117}
118
/// Handle to one background collection thread for a single mode.
struct ModeWorker {
    // Cooperative shutdown signal polled by the worker loop.
    stop_flag: Arc<AtomicBool>,
    // Worker-published heartbeat/metrics/error state.
    snapshot: Arc<Mutex<WorkerSnapshot>>,
    // OS thread running the tokio runtime for this worker.
    pub(crate) handle: JoinHandle<()>,
}
124
/// Orchestrates per-mode market-data collection workers and their storage.
pub struct MarketDataRecorder {
    // Root directory for DuckDB files (see `RecorderCoordination::db_path`).
    base_dir: PathBuf,
    // When false, `start` records status but never spawns a network worker.
    network_enabled: bool,
    // Selected at construction from the environment.
    storage_backend: CollectorStorageBackend,
    // Present only when the Postgres backend resolved a URL from the env.
    postgres_url: Option<String>,
    // Last-known status per mode (kept after stop).
    statuses: BTreeMap<BinanceMode, RecorderStatus>,
    // Live workers per mode.
    workers: BTreeMap<BinanceMode, ModeWorker>,
}
133
134impl std::fmt::Debug for MarketDataRecorder {
135 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
136 f.debug_struct("MarketDataRecorder")
137 .field("base_dir", &self.base_dir)
138 .field("network_enabled", &self.network_enabled)
139 .field("storage_backend", &self.storage_backend)
140 .field("statuses", &self.statuses)
141 .finish()
142 }
143}
144
145impl Default for MarketDataRecorder {
146 fn default() -> Self {
147 Self::new("var")
148 }
149}
150
impl MarketDataRecorder {
    /// Creates a recorder rooted at `base_dir`.
    ///
    /// The backend comes from the `SANDBOX_QUANT_RECORDER_STORAGE` env var
    /// ("postgres" selects Postgres; anything else, or unset, selects
    /// DuckDB). When Postgres is selected but the URL cannot be resolved
    /// from the environment, the failure is swallowed here (`.ok()`) and
    /// surfaced later when the backend is actually used.
    pub fn new(base_dir: impl Into<PathBuf>) -> Self {
        let storage_backend = std::env::var("SANDBOX_QUANT_RECORDER_STORAGE")
            .ok()
            .as_deref()
            .map(parse_storage_backend)
            .unwrap_or(CollectorStorageBackend::DuckDb);
        let postgres_url = if storage_backend == CollectorStorageBackend::Postgres {
            postgres_url_from_env().ok()
        } else {
            None
        };
        Self {
            base_dir: base_dir.into(),
            network_enabled: true,
            storage_backend,
            postgres_url,
            statuses: BTreeMap::new(),
            workers: BTreeMap::new(),
        }
    }

    /// Disables worker spawning; `start`/`stop` still track status. Useful
    /// for tests and offline operation.
    pub fn without_network(mut self) -> Self {
        self.network_enabled = false;
        self
    }

    /// Starts recording for `mode` with the given symbol lists.
    ///
    /// Symbols are normalized (trimmed, uppercased, deduped) and the union
    /// of manual + strategy symbols becomes the watched set. The storage
    /// schema is initialized up front; a worker thread is spawned only when
    /// networking is enabled.
    ///
    /// # Errors
    /// `RecorderAlreadyRunning` if this mode is already running; any
    /// storage error from schema initialization or worker spawning.
    pub fn start(
        &mut self,
        mode: BinanceMode,
        manual_symbols: Vec<String>,
        strategy_symbols: Vec<String>,
    ) -> Result<RecorderStatus, StorageError> {
        if self
            .statuses
            .get(&mode)
            .is_some_and(|status| status.state == RecorderState::Running)
        {
            return Err(StorageError::RecorderAlreadyRunning {
                mode: mode.as_str().to_string(),
            });
        }

        let db_path = self.db_path(mode);
        // NOTE(review): when the Postgres backend is selected but no URL was
        // resolved, schema init is silently skipped here; the worker thread
        // reports the missing URL when it starts.
        if self.storage_backend == CollectorStorageBackend::DuckDb {
            init_schema_for_path(&db_path)?;
        } else if let Some(url) = self.postgres_url.as_deref() {
            let mut client = connect_postgres(url)?;
            let _ = init_postgres_schema(&mut client, url)?;
        }
        let manual_symbols = normalize_symbols(manual_symbols);
        let strategy_symbols = normalize_symbols(strategy_symbols);
        let watched_symbols = merge_symbol_sets(manual_symbols.clone(), strategy_symbols.clone());
        let status = RecorderStatus::new(
            mode,
            RecorderState::Running,
            db_path.clone(),
            self.storage_backend.as_str().to_string(),
            self.storage_target(mode),
            manual_symbols,
            strategy_symbols,
            watched_symbols.clone(),
        );
        // Seed the status (and the worker snapshot) with whatever metrics
        // already exist in storage so counters continue instead of resetting.
        let initial_metrics = self.load_metrics(&db_path).unwrap_or_default();
        let mut status = status;
        status.metrics = initial_metrics.clone();
        if self.network_enabled {
            self.spawn_worker(mode, db_path, watched_symbols, initial_metrics)?;
        }
        self.statuses.insert(mode, status.clone());
        Ok(status)
    }

    /// Returns the current status for `mode`, synthesizing a stopped status
    /// when the mode has never been started. Live worker data (heartbeat,
    /// metrics, last error) overrides the stored status when a worker exists;
    /// otherwise metrics are re-read from storage.
    pub fn status(&self, mode: BinanceMode) -> RecorderStatus {
        let mut status = self.statuses.get(&mode).cloned().unwrap_or_else(|| {
            RecorderStatus::new(
                mode,
                RecorderState::Stopped,
                self.db_path(mode),
                self.storage_backend.as_str().to_string(),
                self.storage_target(mode),
                Vec::new(),
                Vec::new(),
                Vec::new(),
            )
        });
        if let Some(worker) = self.workers.get(&mode) {
            let worker_alive = !worker.handle.is_finished();
            status.worker_alive = worker_alive;
            if let Ok(snapshot) = worker.snapshot.lock() {
                status.updated_at = snapshot.updated_at;
                status.heartbeat_age_sec = (Utc::now() - snapshot.updated_at).num_seconds();
                status.metrics = snapshot.metrics.clone();
                status.last_error = snapshot.last_error.clone();
            }
        } else {
            status.worker_alive = false;
            status.heartbeat_age_sec = (Utc::now() - status.updated_at).num_seconds();
            status.metrics = self.load_metrics(&status.db_path).unwrap_or_default();
        }
        status
    }

    /// Replaces the strategy-driven symbol set for `mode`.
    ///
    /// No-op when the mode is untracked or the set is unchanged. When the
    /// recorder is running with networking enabled, the worker is restarted
    /// so the websocket subscription matches the new watched set.
    pub fn update_strategy_symbols(
        &mut self,
        mode: BinanceMode,
        strategy_symbols: Vec<String>,
    ) -> Result<(), StorageError> {
        let strategy_symbols = normalize_symbols(strategy_symbols);
        let Some(status) = self.statuses.get_mut(&mode) else {
            return Ok(());
        };
        if status.strategy_symbols == strategy_symbols {
            return Ok(());
        }
        status.strategy_symbols = strategy_symbols.clone();
        status.watched_symbols =
            merge_symbol_sets(status.manual_symbols.clone(), strategy_symbols.clone());
        status.updated_at = Utc::now();
        let watched_symbols = status.watched_symbols.clone();
        let should_restart = status.state == RecorderState::Running && self.network_enabled;

        if !should_restart {
            return Ok(());
        }

        self.restart_worker(mode, watched_symbols)
    }

    /// Replaces the operator-provided symbol set for `mode`; mirrors
    /// `update_strategy_symbols` (no-op when untracked/unchanged, restarts
    /// the worker when running with networking enabled).
    pub fn update_manual_symbols(
        &mut self,
        mode: BinanceMode,
        manual_symbols: Vec<String>,
    ) -> Result<(), StorageError> {
        let manual_symbols = normalize_symbols(manual_symbols);
        let Some(status) = self.statuses.get_mut(&mode) else {
            return Ok(());
        };
        if status.manual_symbols == manual_symbols {
            return Ok(());
        }
        status.manual_symbols = manual_symbols.clone();
        status.watched_symbols =
            merge_symbol_sets(manual_symbols.clone(), status.strategy_symbols.clone());
        status.updated_at = Utc::now();
        let watched_symbols = status.watched_symbols.clone();
        let should_restart = status.state == RecorderState::Running && self.network_enabled;

        if !should_restart {
            return Ok(());
        }

        self.restart_worker(mode, watched_symbols)
    }

    /// Stops recording for `mode` and returns the final status.
    ///
    /// Copies the worker's last snapshot into the status before signalling
    /// the stop flag. The thread handle is dropped without joining, so the
    /// worker exits asynchronously after noticing the flag.
    ///
    /// # Errors
    /// `RecorderNotRunning` when the mode is untracked or already stopped.
    pub fn stop(&mut self, mode: BinanceMode) -> Result<RecorderStatus, StorageError> {
        let Some(existing) = self.statuses.get_mut(&mode) else {
            return Err(StorageError::RecorderNotRunning {
                mode: mode.as_str().to_string(),
            });
        };
        if existing.state != RecorderState::Running {
            return Err(StorageError::RecorderNotRunning {
                mode: mode.as_str().to_string(),
            });
        }

        if let Some(worker) = self.workers.remove(&mode) {
            if let Ok(snapshot) = worker.snapshot.lock() {
                existing.updated_at = snapshot.updated_at;
                existing.metrics = snapshot.metrics.clone();
                existing.last_error = snapshot.last_error.clone();
            }
            worker.stop_flag.store(true, Ordering::Relaxed);
        }

        existing.state = RecorderState::Stopped;
        existing.updated_at = Utc::now();
        existing.worker_alive = false;
        Ok(existing.clone())
    }

    /// Summarizes recorded data usable for a backtest of `symbol` over the
    /// inclusive `from..=to` date range, for this recorder's mode database.
    pub fn backtest_dataset_summary(
        &self,
        mode: BinanceMode,
        symbol: &str,
        from: chrono::NaiveDate,
        to: chrono::NaiveDate,
    ) -> Result<BacktestDatasetSummary, StorageError> {
        backtest_summary_for_path(&self.db_path(mode), mode, symbol, from, to)
    }

    /// True when a worker exists for `mode` and its thread has not finished.
    pub fn worker_alive(&self, mode: BinanceMode) -> bool {
        self.workers
            .get(&mode)
            .is_some_and(|worker| !worker.handle.is_finished())
    }

    /// Reads recorder metrics directly from a DuckDB file (no recorder
    /// instance required).
    pub fn metrics_for_path(db_path: &Path) -> Result<RecorderMetrics, StorageError> {
        metrics_for_path(db_path)
    }

    /// Path-based variant of [`Self::backtest_dataset_summary`].
    pub fn backtest_summary_for_path(
        db_path: &Path,
        mode: BinanceMode,
        symbol: &str,
        from: chrono::NaiveDate,
        to: chrono::NaiveDate,
    ) -> Result<BacktestDatasetSummary, StorageError> {
        backtest_summary_for_path(db_path, mode, symbol, from, to)
    }

    /// Initializes the DuckDB schema at `db_path` (idempotent wrapper).
    pub fn init_schema_for_path(db_path: &Path) -> Result<(), StorageError> {
        init_schema_for_path(db_path)
    }

    /// Stops the current worker (if any) and spawns a fresh one with the
    /// given watched set, carrying over the latest metrics so counters
    /// continue across the restart.
    fn restart_worker(
        &mut self,
        mode: BinanceMode,
        watched_symbols: Vec<String>,
    ) -> Result<(), StorageError> {
        let initial_metrics = self.status(mode).metrics.clone();
        if let Some(worker) = self.workers.remove(&mode) {
            worker.stop_flag.store(true, Ordering::Relaxed);
        }
        let db_path = self.db_path(mode);
        self.spawn_worker(mode, db_path, watched_symbols, initial_metrics)
    }

    /// Spawns the collection thread for `mode`.
    ///
    /// The thread installs the rustls crypto provider, builds a
    /// current-thread tokio runtime, and drives `run_market_data_worker`
    /// until the stop flag is set. Runtime-construction failure is reported
    /// via the shared snapshot rather than a panic.
    fn spawn_worker(
        &mut self,
        mode: BinanceMode,
        db_path: PathBuf,
        watched_symbols: Vec<String>,
        initial_metrics: RecorderMetrics,
    ) -> Result<(), StorageError> {
        let stop_flag = Arc::new(AtomicBool::new(false));
        let worker_stop_flag = stop_flag.clone();
        let snapshot = Arc::new(Mutex::new(WorkerSnapshot::new(initial_metrics)));
        let worker_snapshot = snapshot.clone();
        let storage_backend = self.storage_backend;
        let postgres_url = self.postgres_url.clone();
        let handle = std::thread::Builder::new()
            .name(format!("market-recorder-{}", mode.as_str()))
            .spawn(move || {
                // Idempotent; fails harmlessly if a provider is installed.
                let _ = rustls::crypto::ring::default_provider().install_default();
                let runtime = tokio::runtime::Builder::new_current_thread()
                    .enable_all()
                    .build();
                let Ok(runtime) = runtime else {
                    record_worker_error(
                        &worker_snapshot,
                        "failed to initialize tokio runtime".to_string(),
                    );
                    return;
                };
                runtime.block_on(async move {
                    run_market_data_worker(
                        mode,
                        db_path,
                        storage_backend,
                        postgres_url,
                        watched_symbols,
                        worker_stop_flag,
                        worker_snapshot,
                    )
                    .await;
                });
            })
            .map_err(|error| StorageError::WriteFailedWithContext {
                message: error.to_string(),
            })?;
        self.workers.insert(
            mode,
            ModeWorker {
                stop_flag,
                snapshot,
                handle,
            },
        );
        Ok(())
    }

    /// Loads persisted metrics from the active backend. For Postgres, a
    /// missing URL is an explicit error rather than a silent default.
    fn load_metrics(&self, db_path: &Path) -> Result<RecorderMetrics, StorageError> {
        match self.storage_backend {
            CollectorStorageBackend::DuckDb => metrics_for_path(db_path),
            CollectorStorageBackend::Postgres => self
                .postgres_url
                .as_deref()
                .ok_or_else(|| StorageError::WriteFailedWithContext {
                    message: "postgres recorder backend selected but postgres URL is missing"
                        .to_string(),
                })
                .and_then(metrics_for_postgres_url),
        }
    }

    /// DuckDB file path for `mode`, derived via `RecorderCoordination`.
    fn db_path(&self, mode: BinanceMode) -> PathBuf {
        RecorderCoordination::new(self.base_dir.clone()).db_path(mode)
    }

    /// Display string for the active storage destination; the postgres URL
    /// is masked so credentials never leak into status output.
    fn storage_target(&self, mode: BinanceMode) -> String {
        match self.storage_backend {
            CollectorStorageBackend::DuckDb => self.db_path(mode).display().to_string(),
            CollectorStorageBackend::Postgres => self
                .postgres_url
                .as_deref()
                .map(mask_postgres_url)
                .unwrap_or_else(|| "postgres://***".to_string()),
        }
    }
}
463
464impl Drop for MarketDataRecorder {
465 fn drop(&mut self) {
466 for worker in self.workers.values() {
467 worker.stop_flag.store(true, Ordering::Relaxed);
468 }
469 }
470}
471
/// Main loop of one collection worker.
///
/// Opens the configured storage (DuckDB connection or Postgres client) up
/// front, then runs an outer reconnect loop: connect the `!forceOrder`
/// stream (mandatory) and the combined per-symbol stream (optional), and
/// pump messages in an inner `select!` loop until a stream drops or the
/// stop flag is set. All failures are published to `snapshot` and echoed to
/// stderr; the worker backs off briefly and reconnects.
async fn run_market_data_worker(
    mode: BinanceMode,
    db_path: PathBuf,
    storage_backend: CollectorStorageBackend,
    postgres_url: Option<String>,
    watched_symbols: Vec<String>,
    stop_flag: Arc<AtomicBool>,
    snapshot: Arc<Mutex<WorkerSnapshot>>,
) {
    // Storage is opened once for the worker's lifetime; an open failure is
    // fatal for the worker (reported via the snapshot, then return).
    let duck_connection = if storage_backend == CollectorStorageBackend::DuckDb {
        match Connection::open(&db_path) {
            Ok(connection) => Some(connection),
            Err(_) => {
                record_worker_error(
                    &snapshot,
                    format!("failed to open duckdb at {}", db_path.display()),
                );
                return;
            }
        }
    } else {
        None
    };
    let mut postgres_client = if storage_backend == CollectorStorageBackend::Postgres {
        let Some(url) = postgres_url.as_deref() else {
            record_worker_error(
                &snapshot,
                "postgres recorder backend missing URL".to_string(),
            );
            return;
        };
        match connect_postgres(url) {
            Ok(mut client) => {
                if let Err(error) = init_postgres_schema(&mut client, url) {
                    record_worker_error(&snapshot, error.to_string());
                    return;
                }
                Some(client)
            }
            Err(error) => {
                record_worker_error(&snapshot, error.to_string());
                return;
            }
        }
    } else {
        None
    };
    // Per-symbol last-seen second, used to derive 1s bar counts; survives
    // reconnects so bars are not double-counted across sessions.
    let mut agg_trade_bar_seconds = BTreeMap::new();

    // Outer reconnect loop: one iteration per websocket session.
    loop {
        touch_worker_snapshot(&snapshot);
        if stop_flag.load(Ordering::Relaxed) {
            break;
        }

        let force_order_url = format!("{}/ws/!forceOrder@arr", market_stream_base_url(mode));
        let symbol_stream_url = combined_symbol_stream_url(mode, &watched_symbols);

        // The forceOrder stream is required: on failure, back off and retry.
        let force_stream = connect_async(force_order_url).await;
        let mut force_stream = match force_stream {
            Ok((stream, _)) => stream,
            Err(error) => {
                record_worker_error(&snapshot, format!("forceOrder connect failed: {error}"));
                eprintln!(
                    "market recorder: failed to connect forceOrder stream mode={} error={}",
                    mode.as_str(),
                    error
                );
                tokio::time::sleep(Duration::from_secs(2)).await;
                continue;
            }
        };

        // The symbol stream is best-effort: a connect failure degrades to
        // liquidation-only recording for this session.
        let mut symbol_stream = match symbol_stream_url {
            Some(url) => match connect_async(url).await {
                Ok((stream, _)) => Some(stream),
                Err(error) => {
                    record_worker_error(
                        &snapshot,
                        format!("symbol stream connect failed: {error}"),
                    );
                    eprintln!(
                        "market recorder: failed to connect symbol streams mode={} symbols={} error={}",
                        mode.as_str(),
                        watched_symbols.join(","),
                        error
                    );
                    None
                }
            },
            None => None,
        };

        // NOTE(review): these per-session sequence counters restart at 0 on
        // every reconnect and feed the DuckDB event_id/tick_id/trade_id
        // columns — confirm uniqueness is not assumed downstream.
        let mut liquidation_seq = 0i64;
        let mut ticker_seq = 0i64;
        let mut trade_seq = 0i64;

        // Inner read loop: `break` drops both streams and reconnects;
        // `return` exits the worker entirely (stop requested).
        loop {
            if stop_flag.load(Ordering::Relaxed) {
                return;
            }

            tokio::select! {
                message = force_stream.next() => {
                    match message {
                        Some(Ok(message)) => {
                            if let Err(error) = handle_force_order_message(
                                duck_connection.as_ref(),
                                postgres_client.as_mut(),
                                mode,
                                &mut liquidation_seq,
                                &snapshot,
                                message
                            ) {
                                record_worker_error(&snapshot, error.to_string());
                                eprintln!(
                                    "market recorder: forceOrder stream handling failed mode={} error={}",
                                    mode.as_str(),
                                    error
                                );
                                break;
                            }
                        }
                        Some(Err(error)) => {
                            record_worker_error(&snapshot, format!("forceOrder stream disconnected: {error}"));
                            eprintln!(
                                "market recorder: forceOrder stream disconnected mode={} error={}",
                                mode.as_str(),
                                error
                            );
                            break
                        }
                        None => {
                            record_worker_error(&snapshot, "forceOrder stream disconnected: eof".to_string());
                            eprintln!(
                                "market recorder: forceOrder stream disconnected mode={} error=eof",
                                mode.as_str()
                            );
                            break
                        }
                    }
                }
                // Guarded so this arm is polled only when a stream exists.
                message = next_symbol_message(&mut symbol_stream), if symbol_stream.is_some() => {
                    match message {
                        Some(Ok(message)) => {
                            if let Err(error) = handle_symbol_message(
                                duck_connection.as_ref(),
                                postgres_client.as_mut(),
                                mode,
                                &mut ticker_seq,
                                &mut trade_seq,
                                &mut agg_trade_bar_seconds,
                                &snapshot,
                                message,
                            ) {
                                record_worker_error(&snapshot, error.to_string());
                                eprintln!(
                                    "market recorder: symbol stream handling failed mode={} error={}",
                                    mode.as_str(),
                                    error
                                );
                                break;
                            }
                        }
                        Some(Err(error)) => {
                            record_worker_error(&snapshot, format!("symbol stream disconnected: {error}"));
                            eprintln!(
                                "market recorder: symbol stream disconnected mode={} symbols={} error={}",
                                mode.as_str(),
                                watched_symbols.join(","),
                                error
                            );
                            break
                        }
                        None => {
                            record_worker_error(&snapshot, "symbol stream disconnected: eof".to_string());
                            eprintln!(
                                "market recorder: symbol stream disconnected mode={} symbols={} error=eof",
                                mode.as_str(),
                                watched_symbols.join(",")
                            );
                            break
                        }
                    }
                }
                // Idle tick: keeps the heartbeat fresh and the stop flag
                // checked even when no market messages arrive.
                _ = tokio::time::sleep(Duration::from_millis(250)) => {
                    touch_worker_snapshot(&snapshot);
                }
            }
        }

        // Brief backoff before reconnecting after any stream failure.
        tokio::time::sleep(Duration::from_secs(1)).await;
    }
}
666
667async fn next_symbol_message(
668 stream: &mut Option<
669 tokio_tungstenite::WebSocketStream<
670 tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>,
671 >,
672 >,
673) -> Option<Result<Message, tokio_tungstenite::tungstenite::Error>> {
674 match stream {
675 Some(stream) => stream.next().await,
676 None => None,
677 }
678}
679
/// Persists one `!forceOrder` (liquidation) websocket message.
///
/// Control frames (ping/pong/binary/raw frame) are ignored; a `Close`
/// frame is surfaced as an error so the caller reconnects. Envelopes
/// without an order payload are skipped. The event is written to DuckDB
/// when a connection is present, otherwise to Postgres; having neither is
/// an error. On success the snapshot metrics are updated.
fn handle_force_order_message(
    duck_connection: Option<&Connection>,
    postgres_client: Option<&mut postgres::Client>,
    mode: BinanceMode,
    sequence: &mut i64,
    snapshot: &Arc<Mutex<WorkerSnapshot>>,
    message: Message,
) -> Result<(), StorageError> {
    let payload = match message {
        Message::Text(text) => text,
        Message::Ping(_) | Message::Pong(_) | Message::Binary(_) => return Ok(()),
        Message::Close(_) => {
            return Err(StorageError::WriteFailedWithContext {
                message: "forceOrder stream closed".to_string(),
            })
        }
        Message::Frame(_) => return Ok(()),
    };
    let parsed: ForceOrderEnvelope =
        serde_json::from_str(&payload).map_err(|error| StorageError::WriteFailedWithContext {
            message: format!("forceOrder parse failed: {error}; payload={payload}"),
        })?;
    let Some(order) = parsed.order else {
        return Ok(());
    };
    // Session-local id; incremented before insert so ids start at 1.
    *sequence += 1;
    let receive_time_ms = Utc::now().timestamp_millis();
    let symbol = order.symbol.clone();
    let side = order.side.clone();
    if let Some(connection) = duck_connection {
        // Millisecond epochs are converted to timestamps in SQL; notional is
        // derived as price * qty.
        connection
            .execute(
                "INSERT INTO raw_liquidation_events (
 event_id, mode, symbol, event_time, receive_time, force_side, price, qty, notional, raw_payload
 ) VALUES (
 ?, ?, ?, to_timestamp(? / 1000.0), to_timestamp(? / 1000.0), ?, ?, ?, ?, ?
 )",
                params![
                    *sequence,
                    mode.as_str(),
                    symbol,
                    parsed.event_time,
                    receive_time_ms,
                    side,
                    order.price,
                    order.qty,
                    order.price * order.qty,
                    payload,
                ],
            )
            .map_err(|error| StorageError::WriteFailedWithContext {
                message: error.to_string(),
            })?;
    } else if let Some(client) = postgres_client {
        insert_liquidation(
            client,
            &PostgresLiquidationRecord {
                mode: mode.as_str().to_string(),
                product: "um".to_string(),
                symbol: symbol.clone(),
                event_time_ms: parsed.event_time,
                receive_time_ms,
                force_side: side,
                price: order.price,
                qty: order.qty,
                notional: order.price * order.qty,
                raw_payload: payload,
            },
        )?;
    } else {
        return Err(StorageError::WriteFailedWithContext {
            message: "no recorder storage backend available".to_string(),
        });
    }
    record_force_order_event(snapshot, &symbol, parsed.event_time);
    Ok(())
}
757
/// Persists one combined-stream message (`bookTicker` or `aggTrade`).
///
/// Control frames are ignored and `Close` is surfaced as an error, as in
/// `handle_force_order_message`. Events missing any expected field are
/// silently skipped. Writes go to DuckDB when available, else Postgres;
/// neither backend is an error. Unknown event types are ignored.
fn handle_symbol_message(
    duck_connection: Option<&Connection>,
    postgres_client: Option<&mut postgres::Client>,
    mode: BinanceMode,
    ticker_sequence: &mut i64,
    trade_sequence: &mut i64,
    agg_trade_bar_seconds: &mut BTreeMap<String, i64>,
    snapshot: &Arc<Mutex<WorkerSnapshot>>,
    message: Message,
) -> Result<(), StorageError> {
    let payload = match message {
        Message::Text(text) => text,
        Message::Ping(_) | Message::Pong(_) | Message::Binary(_) => return Ok(()),
        Message::Close(_) => {
            return Err(StorageError::WriteFailedWithContext {
                message: "symbol stream closed".to_string(),
            })
        }
        Message::Frame(_) => return Ok(()),
    };
    let parsed: CombinedStreamEnvelope =
        serde_json::from_str(&payload).map_err(|error| StorageError::WriteFailedWithContext {
            message: format!("symbol stream parse failed: {error}; payload={payload}"),
        })?;
    let receive_time_ms = Utc::now().timestamp_millis();

    if parsed.data.event_type == "bookTicker" {
        // Require the full top-of-book quintet; partial events are dropped.
        let Some(symbol) = parsed.data.symbol else {
            return Ok(());
        };
        let Some(event_time) = parsed.data.event_time else {
            return Ok(());
        };
        let Some(bid) = parsed.data.bid else {
            return Ok(());
        };
        let Some(bid_qty) = parsed.data.bid_qty else {
            return Ok(());
        };
        let Some(ask) = parsed.data.ask else {
            return Ok(());
        };
        let Some(ask_qty) = parsed.data.ask_qty else {
            return Ok(());
        };
        *ticker_sequence += 1;
        if let Some(connection) = duck_connection {
            connection
                .execute(
                    "INSERT INTO raw_book_ticker (
 tick_id, mode, symbol, event_time, receive_time, bid, bid_qty, ask, ask_qty
 ) VALUES (
 ?, ?, ?, to_timestamp(? / 1000.0), to_timestamp(? / 1000.0), ?, ?, ?, ?
 )",
                    params![
                        *ticker_sequence,
                        mode.as_str(),
                        symbol,
                        event_time,
                        receive_time_ms,
                        bid,
                        bid_qty,
                        ask,
                        ask_qty,
                    ],
                )
                .map_err(|error| StorageError::WriteFailedWithContext {
                    message: error.to_string(),
                })?;
        } else if let Some(client) = postgres_client {
            insert_book_ticker(
                client,
                &PostgresBookTickerRecord {
                    mode: mode.as_str().to_string(),
                    symbol: symbol.clone(),
                    event_time_ms: event_time,
                    receive_time_ms,
                    bid,
                    bid_qty,
                    ask,
                    ask_qty,
                },
            )?;
        } else {
            return Err(StorageError::WriteFailedWithContext {
                message: "no recorder storage backend available".to_string(),
            });
        }
        record_book_ticker_event(snapshot, &symbol, event_time);
    } else if parsed.data.event_type == "aggTrade" {
        // Require symbol, time, price, qty, and the maker flag.
        let Some(symbol) = parsed.data.symbol else {
            return Ok(());
        };
        let Some(event_time) = parsed.data.event_time else {
            return Ok(());
        };
        let Some(price) = parsed.data.price else {
            return Ok(());
        };
        let Some(qty) = parsed.data.qty else {
            return Ok(());
        };
        let Some(is_buyer_maker) = parsed.data.is_buyer_maker else {
            return Ok(());
        };
        *trade_sequence += 1;
        if let Some(connection) = duck_connection {
            connection
                .execute(
                    "INSERT INTO raw_agg_trades (
 trade_id, mode, symbol, event_time, receive_time, price, qty, is_buyer_maker
 ) VALUES (
 ?, ?, ?, to_timestamp(? / 1000.0), to_timestamp(? / 1000.0), ?, ?, ?
 )",
                    params![
                        *trade_sequence,
                        mode.as_str(),
                        symbol,
                        event_time,
                        receive_time_ms,
                        price,
                        qty,
                        is_buyer_maker,
                    ],
                )
                .map_err(|error| StorageError::WriteFailedWithContext {
                    message: error.to_string(),
                })?;
        } else if let Some(client) = postgres_client {
            insert_agg_trade(
                client,
                &PostgresAggTradeRecord {
                    mode: mode.as_str().to_string(),
                    symbol: symbol.clone(),
                    event_time_ms: event_time,
                    receive_time_ms,
                    price,
                    qty,
                    is_buyer_maker,
                },
            )?;
        } else {
            return Err(StorageError::WriteFailedWithContext {
                message: "no recorder storage backend available".to_string(),
            });
        }
        record_agg_trade_event(snapshot, &symbol, event_time, agg_trade_bar_seconds);
    }

    Ok(())
}
909
/// Websocket host for market streams. `mode` is intentionally ignored:
/// every supported mode currently reads from the same public endpoint.
fn market_stream_base_url(mode: BinanceMode) -> &'static str {
    let _ = mode;
    "wss://fstream.binance.com"
}
914
915fn combined_symbol_stream_url(mode: BinanceMode, watched_symbols: &[String]) -> Option<String> {
916 if watched_symbols.is_empty() {
917 return None;
918 }
919
920 let streams = watched_symbols
921 .iter()
922 .flat_map(|symbol| {
923 let lower = symbol.to_ascii_lowercase();
924 [format!("{lower}@bookTicker"), format!("{lower}@aggTrade")]
925 })
926 .collect::<Vec<_>>()
927 .join("/");
928 Some(format!(
929 "{}/stream?streams={streams}",
930 market_stream_base_url(mode)
931 ))
932}
933
/// Canonicalizes a symbol list: trims whitespace, uppercases, dedupes, and
/// returns the result sorted (via `BTreeSet` ordering).
///
/// Whitespace-only entries are dropped so an accidental empty symbol can
/// never reach a stream-subscription URL or a database row.
fn normalize_symbols(symbols: Vec<String>) -> Vec<String> {
    let mut normalized = BTreeSet::new();
    for symbol in symbols {
        let trimmed = symbol.trim();
        if !trimmed.is_empty() {
            normalized.insert(trimmed.to_ascii_uppercase());
        }
    }
    normalized.into_iter().collect()
}
941
/// Unions two symbol lists into a sorted, deduplicated vector.
fn merge_symbol_sets(left: Vec<String>, right: Vec<String>) -> Vec<String> {
    let merged: BTreeSet<String> = left.into_iter().chain(right).collect();
    merged.into_iter().collect()
}
949
950fn parse_storage_backend(value: &str) -> CollectorStorageBackend {
951 match value {
952 "postgres" => CollectorStorageBackend::Postgres,
953 _ => CollectorStorageBackend::DuckDb,
954 }
955}
956
/// Envelope of a `!forceOrder` websocket message.
#[derive(Debug, Deserialize)]
struct ForceOrderEnvelope {
    /// Event time in epoch milliseconds (`E`).
    #[serde(rename = "E")]
    event_time: i64,
    /// Liquidated order payload (`o`); `None` when the frame has no order.
    #[serde(rename = "o")]
    order: Option<ForceOrderData>,
}
964
/// Order details inside a `!forceOrder` event. Price and quantity arrive as
/// JSON strings and are parsed into `f64` during deserialization.
#[derive(Debug, Deserialize)]
struct ForceOrderData {
    /// Trading symbol (`s`).
    #[serde(rename = "s")]
    symbol: String,
    /// Order side (`S`).
    #[serde(rename = "S")]
    side: String,
    /// Order price (`p`), string-encoded by the exchange.
    #[serde(rename = "p", deserialize_with = "deserialize_string_number")]
    price: f64,
    /// Order quantity (`q`), string-encoded by the exchange.
    #[serde(rename = "q", deserialize_with = "deserialize_string_number")]
    qty: f64,
}
976
/// Wrapper of a combined-stream (`/stream?streams=...`) message; the actual
/// event sits under the `data` key.
#[derive(Debug, Deserialize)]
struct CombinedStreamEnvelope {
    data: CombinedStreamData,
}
981
/// Union of the fields used from `bookTicker` and `aggTrade` events; all
/// event-specific fields are optional so one struct covers both shapes.
#[derive(Debug, Deserialize)]
struct CombinedStreamData {
    /// Event type discriminator (`e`): "bookTicker" or "aggTrade".
    #[serde(rename = "e")]
    event_type: String,
    /// Event time in epoch milliseconds (`E`).
    #[serde(rename = "E")]
    event_time: Option<i64>,
    /// Trading symbol (`s`).
    #[serde(rename = "s")]
    symbol: Option<String>,
    /// Best bid price (`b`), bookTicker only; accepts string or number.
    #[serde(
        rename = "b",
        default,
        deserialize_with = "deserialize_optional_string_number"
    )]
    bid: Option<f64>,
    /// Best bid quantity (`B`), bookTicker only.
    #[serde(
        rename = "B",
        default,
        deserialize_with = "deserialize_optional_string_number"
    )]
    bid_qty: Option<f64>,
    /// Best ask price (`a`), bookTicker only.
    #[serde(
        rename = "a",
        default,
        deserialize_with = "deserialize_optional_string_number"
    )]
    ask: Option<f64>,
    /// Best ask quantity (`A`), bookTicker only.
    #[serde(
        rename = "A",
        default,
        deserialize_with = "deserialize_optional_string_number"
    )]
    ask_qty: Option<f64>,
    /// Trade price (`p`), aggTrade only.
    #[serde(
        rename = "p",
        default,
        deserialize_with = "deserialize_optional_string_number"
    )]
    price: Option<f64>,
    /// Trade quantity (`q`), aggTrade only.
    #[serde(
        rename = "q",
        default,
        deserialize_with = "deserialize_optional_string_number"
    )]
    qty: Option<f64>,
    /// Whether the buyer was the maker (`m`), aggTrade only.
    #[serde(rename = "m")]
    is_buyer_maker: Option<bool>,
}
1029
1030fn deserialize_string_number<'de, D>(deserializer: D) -> Result<f64, D::Error>
1031where
1032 D: serde::Deserializer<'de>,
1033{
1034 let value = String::deserialize(deserializer)?;
1035 value.parse::<f64>().map_err(serde::de::Error::custom)
1036}
1037
1038fn deserialize_optional_string_number<'de, D>(deserializer: D) -> Result<Option<f64>, D::Error>
1039where
1040 D: serde::Deserializer<'de>,
1041{
1042 let value = Option::<serde_json::Value>::deserialize(deserializer)?;
1043 match value {
1044 None | Some(serde_json::Value::Null) => Ok(None),
1045 Some(serde_json::Value::String(value)) => value
1046 .parse::<f64>()
1047 .map(Some)
1048 .map_err(serde::de::Error::custom),
1049 Some(serde_json::Value::Number(value)) => value
1050 .as_f64()
1051 .ok_or_else(|| serde::de::Error::custom("invalid numeric value"))
1052 .map(Some),
1053 Some(other) => Err(serde::de::Error::custom(format!(
1054 "expected string or number, got {other}"
1055 ))),
1056 }
1057}
1058
1059fn touch_worker_snapshot(snapshot: &Arc<Mutex<WorkerSnapshot>>) {
1060 if let Ok(mut snapshot) = snapshot.lock() {
1061 snapshot.updated_at = Utc::now();
1062 }
1063}
1064
1065fn record_worker_error(snapshot: &Arc<Mutex<WorkerSnapshot>>, error: String) {
1066 if let Ok(mut snapshot) = snapshot.lock() {
1067 snapshot.updated_at = Utc::now();
1068 snapshot.last_error = Some(error);
1069 }
1070}
1071
1072fn record_force_order_event(
1073 snapshot: &Arc<Mutex<WorkerSnapshot>>,
1074 symbol: &str,
1075 event_time_ms: i64,
1076) {
1077 if let Ok(mut snapshot) = snapshot.lock() {
1078 snapshot.updated_at = Utc::now();
1079 snapshot.last_error = None;
1080 snapshot.metrics.liquidation_events += 1;
1081 snapshot.metrics.last_liquidation_event_time = timestamp_string(event_time_ms);
1082 increment_top_symbol(&mut snapshot.metrics.top_liquidation_symbols, symbol);
1083 }
1084}
1085
1086fn record_book_ticker_event(
1087 snapshot: &Arc<Mutex<WorkerSnapshot>>,
1088 symbol: &str,
1089 event_time_ms: i64,
1090) {
1091 if let Ok(mut snapshot) = snapshot.lock() {
1092 snapshot.updated_at = Utc::now();
1093 snapshot.last_error = None;
1094 snapshot.metrics.book_ticker_events += 1;
1095 snapshot.metrics.last_book_ticker_event_time = timestamp_string(event_time_ms);
1096 increment_top_symbol(&mut snapshot.metrics.top_book_ticker_symbols, symbol);
1097 }
1098}
1099
1100fn record_agg_trade_event(
1101 snapshot: &Arc<Mutex<WorkerSnapshot>>,
1102 symbol: &str,
1103 event_time_ms: i64,
1104 agg_trade_bar_seconds: &mut BTreeMap<String, i64>,
1105) {
1106 if let Ok(mut snapshot) = snapshot.lock() {
1107 snapshot.updated_at = Utc::now();
1108 snapshot.last_error = None;
1109 snapshot.metrics.agg_trade_events += 1;
1110 snapshot.metrics.last_agg_trade_event_time = timestamp_string(event_time_ms);
1111 increment_top_symbol(&mut snapshot.metrics.top_agg_trade_symbols, symbol);
1112 let bar_second = event_time_ms / 1_000;
1113 let should_increment_bar = agg_trade_bar_seconds
1114 .insert(symbol.to_string(), bar_second)
1115 .map(|previous| previous != bar_second)
1116 .unwrap_or(true);
1117 if should_increment_bar {
1118 snapshot.metrics.derived_kline_1s_bars += 1;
1119 }
1120 }
1121}
1122
/// Bumps `symbol`'s count inside a `"SYMBOL:count"` leaderboard and
/// re-renders the five busiest entries (count descending, then symbol
/// ascending). Malformed entries are dropped during the rebuild.
fn increment_top_symbol(top_symbols: &mut Vec<String>, symbol: &str) {
    let mut counts: BTreeMap<String, u64> = BTreeMap::new();
    for entry in top_symbols.iter() {
        if let Some((name, raw_count)) = entry.split_once(':') {
            if let Ok(parsed) = raw_count.parse::<u64>() {
                counts.insert(name.to_string(), parsed);
            }
        }
    }
    *counts.entry(symbol.to_string()).or_default() += 1;
    let mut ranked: Vec<(String, u64)> = counts.into_iter().collect();
    ranked.sort_by(|left, right| right.1.cmp(&left.1).then_with(|| left.0.cmp(&right.0)));
    ranked.truncate(5);
    *top_symbols = ranked
        .into_iter()
        .map(|(name, count)| format!("{name}:{count}"))
        .collect();
}
1141
1142fn timestamp_string(event_time_ms: i64) -> Option<String> {
1143 Utc.timestamp_millis_opt(event_time_ms)
1144 .single()
1145 .map(|value| {
1146 value
1147 .naive_utc()
1148 .format("%Y-%m-%d %H:%M:%S%.3f")
1149 .to_string()
1150 })
1151}