// sandbox_quant/recorder_app/runtime.rs
1use std::collections::{BTreeMap, BTreeSet};
2use std::path::{Path, PathBuf};
3use std::sync::{
4    atomic::{AtomicBool, Ordering},
5    Arc, Mutex,
6};
7use std::thread::JoinHandle;
8use std::time::Duration;
9
10use chrono::{DateTime, TimeZone, Utc};
11use duckdb::{params, Connection};
12use futures_util::StreamExt;
13use serde::Deserialize;
14use tokio_tungstenite::connect_async;
15use tokio_tungstenite::tungstenite::Message;
16
17use crate::app::bootstrap::BinanceMode;
18use crate::dataset::query::{backtest_summary_for_path, metrics_for_path};
19use crate::dataset::schema::init_schema_for_path;
20use crate::dataset::types::{BacktestDatasetSummary, RecorderMetrics};
21use crate::error::storage_error::StorageError;
22use crate::record::coordination::RecorderCoordination;
23use crate::storage::postgres_market_data::{
24    connect as connect_postgres, init_schema as init_postgres_schema, insert_agg_trade,
25    insert_book_ticker, insert_liquidation, mask_postgres_url, metrics_for_postgres_url,
26    postgres_url_from_env, CollectorStorageBackend, PostgresAggTradeRecord,
27    PostgresBookTickerRecord, PostgresLiquidationRecord,
28};
29
/// Lifecycle state of a per-mode recorder worker.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RecorderState {
    /// A worker is (or should be) actively collecting data.
    Running,
    /// No worker is collecting data for this mode.
    Stopped,
}

impl RecorderState {
    /// Lowercase label used in status reporting.
    pub fn as_str(self) -> &'static str {
        if self.is_running() {
            "running"
        } else {
            "stopped"
        }
    }

    /// True exactly when the state is [`RecorderState::Running`].
    pub fn is_running(self) -> bool {
        matches!(self, Self::Running)
    }
}
48
/// Externally visible snapshot of a per-mode recorder.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RecorderStatus {
    pub mode: BinanceMode,
    pub state: RecorderState,
    /// Per-mode DuckDB file path (resolved even when postgres is used).
    pub db_path: PathBuf,
    /// Backend label string (see `CollectorStorageBackend::as_str`).
    pub storage_backend: String,
    /// Display string of the destination: file path or masked postgres URL.
    pub storage_target: String,
    /// Set only when the status was created in the `Running` state.
    pub started_at: Option<DateTime<Utc>>,
    pub updated_at: DateTime<Utc>,
    /// Symbols requested explicitly (via `update_manual_symbols`).
    pub manual_symbols: Vec<String>,
    /// Symbols requested by strategies (via `update_strategy_symbols`).
    pub strategy_symbols: Vec<String>,
    /// Merge of manual and strategy symbols — the set actually streamed.
    pub watched_symbols: Vec<String>,
    /// Whether the worker thread exists and has not finished.
    pub worker_alive: bool,
    /// Seconds since the worker last refreshed its snapshot.
    pub heartbeat_age_sec: i64,
    /// Most recent worker error message, if any.
    pub last_error: Option<String>,
    pub metrics: RecorderMetrics,
}
66
67impl RecorderStatus {
68    fn new(
69        mode: BinanceMode,
70        state: RecorderState,
71        db_path: PathBuf,
72        storage_backend: String,
73        storage_target: String,
74        manual_symbols: Vec<String>,
75        strategy_symbols: Vec<String>,
76        watched_symbols: Vec<String>,
77    ) -> Self {
78        let now = Utc::now();
79        Self {
80            mode,
81            state,
82            db_path,
83            storage_backend,
84            storage_target,
85            started_at: if state == RecorderState::Running {
86                Some(now)
87            } else {
88                None
89            },
90            updated_at: now,
91            manual_symbols,
92            strategy_symbols,
93            watched_symbols,
94            worker_alive: true,
95            heartbeat_age_sec: 0,
96            last_error: None,
97            metrics: RecorderMetrics::default(),
98        }
99    }
100}
101
/// Mutable state shared between a worker thread and the recorder,
/// always accessed behind a `Mutex`.
#[derive(Debug, Clone)]
struct WorkerSnapshot {
    // Heartbeat: last time the worker touched this snapshot.
    updated_at: DateTime<Utc>,
    metrics: RecorderMetrics,
    // Most recent error reported by the worker, if any.
    last_error: Option<String>,
}
108
109impl WorkerSnapshot {
110    fn new(metrics: RecorderMetrics) -> Self {
111        Self {
112            updated_at: Utc::now(),
113            metrics,
114            last_error: None,
115        }
116    }
117}
118
/// Handles to one spawned per-mode worker thread.
struct ModeWorker {
    // Set to request cooperative shutdown; the worker polls this flag.
    stop_flag: Arc<AtomicBool>,
    // Shared heartbeat / metrics / error state written by the worker.
    snapshot: Arc<Mutex<WorkerSnapshot>>,
    // Used here only to check liveness via `is_finished()`; the visible
    // code never joins it.
    pub(crate) handle: JoinHandle<()>,
}
124
/// Orchestrates market-data recording: one status entry and (when
/// networking is enabled) one worker thread per Binance mode.
pub struct MarketDataRecorder {
    // Root directory under which per-mode DuckDB files are resolved.
    base_dir: PathBuf,
    // When false, `start` tracks status without spawning workers.
    network_enabled: bool,
    storage_backend: CollectorStorageBackend,
    // Present only when the postgres backend is selected.
    postgres_url: Option<String>,
    statuses: BTreeMap<BinanceMode, RecorderStatus>,
    workers: BTreeMap<BinanceMode, ModeWorker>,
}
133
134impl std::fmt::Debug for MarketDataRecorder {
135    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
136        f.debug_struct("MarketDataRecorder")
137            .field("base_dir", &self.base_dir)
138            .field("network_enabled", &self.network_enabled)
139            .field("storage_backend", &self.storage_backend)
140            .field("statuses", &self.statuses)
141            .finish()
142    }
143}
144
impl Default for MarketDataRecorder {
    /// Defaults to a recorder rooted at the `var` directory.
    fn default() -> Self {
        Self::new("var")
    }
}
150
151impl MarketDataRecorder {
152    pub fn new(base_dir: impl Into<PathBuf>) -> Self {
153        let storage_backend = std::env::var("SANDBOX_QUANT_RECORDER_STORAGE")
154            .ok()
155            .as_deref()
156            .map(parse_storage_backend)
157            .unwrap_or(CollectorStorageBackend::DuckDb);
158        let postgres_url = if storage_backend == CollectorStorageBackend::Postgres {
159            postgres_url_from_env().ok()
160        } else {
161            None
162        };
163        Self {
164            base_dir: base_dir.into(),
165            network_enabled: true,
166            storage_backend,
167            postgres_url,
168            statuses: BTreeMap::new(),
169            workers: BTreeMap::new(),
170        }
171    }
172
    /// Builder-style toggle that disables networking: `start` will then
    /// maintain status bookkeeping without spawning a worker thread.
    pub fn without_network(mut self) -> Self {
        self.network_enabled = false;
        self
    }
177
178    pub fn start(
179        &mut self,
180        mode: BinanceMode,
181        manual_symbols: Vec<String>,
182        strategy_symbols: Vec<String>,
183    ) -> Result<RecorderStatus, StorageError> {
184        if self
185            .statuses
186            .get(&mode)
187            .is_some_and(|status| status.state == RecorderState::Running)
188        {
189            return Err(StorageError::RecorderAlreadyRunning {
190                mode: mode.as_str().to_string(),
191            });
192        }
193
194        let db_path = self.db_path(mode);
195        if self.storage_backend == CollectorStorageBackend::DuckDb {
196            init_schema_for_path(&db_path)?;
197        } else if let Some(url) = self.postgres_url.as_deref() {
198            let mut client = connect_postgres(url)?;
199            let _ = init_postgres_schema(&mut client, url)?;
200        }
201        let manual_symbols = normalize_symbols(manual_symbols);
202        let strategy_symbols = normalize_symbols(strategy_symbols);
203        let watched_symbols = merge_symbol_sets(manual_symbols.clone(), strategy_symbols.clone());
204        let status = RecorderStatus::new(
205            mode,
206            RecorderState::Running,
207            db_path.clone(),
208            self.storage_backend.as_str().to_string(),
209            self.storage_target(mode),
210            manual_symbols,
211            strategy_symbols,
212            watched_symbols.clone(),
213        );
214        let initial_metrics = self.load_metrics(&db_path).unwrap_or_default();
215        let mut status = status;
216        status.metrics = initial_metrics.clone();
217        if self.network_enabled {
218            self.spawn_worker(mode, db_path, watched_symbols, initial_metrics)?;
219        }
220        self.statuses.insert(mode, status.clone());
221        Ok(status)
222    }
223
224    pub fn status(&self, mode: BinanceMode) -> RecorderStatus {
225        let mut status = self.statuses.get(&mode).cloned().unwrap_or_else(|| {
226            RecorderStatus::new(
227                mode,
228                RecorderState::Stopped,
229                self.db_path(mode),
230                self.storage_backend.as_str().to_string(),
231                self.storage_target(mode),
232                Vec::new(),
233                Vec::new(),
234                Vec::new(),
235            )
236        });
237        if let Some(worker) = self.workers.get(&mode) {
238            let worker_alive = !worker.handle.is_finished();
239            status.worker_alive = worker_alive;
240            if let Ok(snapshot) = worker.snapshot.lock() {
241                status.updated_at = snapshot.updated_at;
242                status.heartbeat_age_sec = (Utc::now() - snapshot.updated_at).num_seconds();
243                status.metrics = snapshot.metrics.clone();
244                status.last_error = snapshot.last_error.clone();
245            }
246        } else {
247            status.worker_alive = false;
248            status.heartbeat_age_sec = (Utc::now() - status.updated_at).num_seconds();
249            status.metrics = self.load_metrics(&status.db_path).unwrap_or_default();
250        }
251        status
252    }
253
254    pub fn update_strategy_symbols(
255        &mut self,
256        mode: BinanceMode,
257        strategy_symbols: Vec<String>,
258    ) -> Result<(), StorageError> {
259        let strategy_symbols = normalize_symbols(strategy_symbols);
260        let Some(status) = self.statuses.get_mut(&mode) else {
261            return Ok(());
262        };
263        if status.strategy_symbols == strategy_symbols {
264            return Ok(());
265        }
266        status.strategy_symbols = strategy_symbols.clone();
267        status.watched_symbols =
268            merge_symbol_sets(status.manual_symbols.clone(), strategy_symbols.clone());
269        status.updated_at = Utc::now();
270        let watched_symbols = status.watched_symbols.clone();
271        let should_restart = status.state == RecorderState::Running && self.network_enabled;
272
273        if !should_restart {
274            return Ok(());
275        }
276
277        self.restart_worker(mode, watched_symbols)
278    }
279
280    pub fn update_manual_symbols(
281        &mut self,
282        mode: BinanceMode,
283        manual_symbols: Vec<String>,
284    ) -> Result<(), StorageError> {
285        let manual_symbols = normalize_symbols(manual_symbols);
286        let Some(status) = self.statuses.get_mut(&mode) else {
287            return Ok(());
288        };
289        if status.manual_symbols == manual_symbols {
290            return Ok(());
291        }
292        status.manual_symbols = manual_symbols.clone();
293        status.watched_symbols =
294            merge_symbol_sets(manual_symbols.clone(), status.strategy_symbols.clone());
295        status.updated_at = Utc::now();
296        let watched_symbols = status.watched_symbols.clone();
297        let should_restart = status.state == RecorderState::Running && self.network_enabled;
298
299        if !should_restart {
300            return Ok(());
301        }
302
303        self.restart_worker(mode, watched_symbols)
304    }
305
306    pub fn stop(&mut self, mode: BinanceMode) -> Result<RecorderStatus, StorageError> {
307        let Some(existing) = self.statuses.get_mut(&mode) else {
308            return Err(StorageError::RecorderNotRunning {
309                mode: mode.as_str().to_string(),
310            });
311        };
312        if existing.state != RecorderState::Running {
313            return Err(StorageError::RecorderNotRunning {
314                mode: mode.as_str().to_string(),
315            });
316        }
317
318        if let Some(worker) = self.workers.remove(&mode) {
319            if let Ok(snapshot) = worker.snapshot.lock() {
320                existing.updated_at = snapshot.updated_at;
321                existing.metrics = snapshot.metrics.clone();
322                existing.last_error = snapshot.last_error.clone();
323            }
324            worker.stop_flag.store(true, Ordering::Relaxed);
325        }
326
327        existing.state = RecorderState::Stopped;
328        existing.updated_at = Utc::now();
329        existing.worker_alive = false;
330        Ok(existing.clone())
331    }
332
    /// Summarizes recorded data for a backtest over `symbol` between
    /// `from` and `to`, reading this recorder's DuckDB path for `mode`.
    /// Thin wrapper over `dataset::query::backtest_summary_for_path`.
    pub fn backtest_dataset_summary(
        &self,
        mode: BinanceMode,
        symbol: &str,
        from: chrono::NaiveDate,
        to: chrono::NaiveDate,
    ) -> Result<BacktestDatasetSummary, StorageError> {
        backtest_summary_for_path(&self.db_path(mode), mode, symbol, from, to)
    }
342
343    pub fn worker_alive(&self, mode: BinanceMode) -> bool {
344        self.workers
345            .get(&mode)
346            .is_some_and(|worker| !worker.handle.is_finished())
347    }
348
    /// Associated-function convenience wrapper: reads recorder metrics
    /// from the DuckDB file at `db_path` without constructing a recorder.
    pub fn metrics_for_path(db_path: &Path) -> Result<RecorderMetrics, StorageError> {
        metrics_for_path(db_path)
    }
352
    /// Associated-function convenience wrapper: summarizes recorded data
    /// at an explicit `db_path` without constructing a recorder.
    pub fn backtest_summary_for_path(
        db_path: &Path,
        mode: BinanceMode,
        symbol: &str,
        from: chrono::NaiveDate,
        to: chrono::NaiveDate,
    ) -> Result<BacktestDatasetSummary, StorageError> {
        backtest_summary_for_path(db_path, mode, symbol, from, to)
    }
362
    /// Associated-function convenience wrapper: initializes the DuckDB
    /// schema at `db_path` without constructing a recorder.
    pub fn init_schema_for_path(db_path: &Path) -> Result<(), StorageError> {
        init_schema_for_path(db_path)
    }
366
367    fn restart_worker(
368        &mut self,
369        mode: BinanceMode,
370        watched_symbols: Vec<String>,
371    ) -> Result<(), StorageError> {
372        let initial_metrics = self.status(mode).metrics.clone();
373        if let Some(worker) = self.workers.remove(&mode) {
374            worker.stop_flag.store(true, Ordering::Relaxed);
375        }
376        let db_path = self.db_path(mode);
377        self.spawn_worker(mode, db_path, watched_symbols, initial_metrics)
378    }
379
    /// Spawns the background thread that streams market data for `mode`
    /// and registers its handles in `self.workers`.
    ///
    /// The thread builds its own current-thread tokio runtime; a runtime
    /// build failure is reported through the shared snapshot (the thread
    /// has already been spawned at that point).
    ///
    /// # Errors
    ///
    /// Returns `StorageError::WriteFailedWithContext` if the OS thread
    /// cannot be spawned.
    fn spawn_worker(
        &mut self,
        mode: BinanceMode,
        db_path: PathBuf,
        watched_symbols: Vec<String>,
        initial_metrics: RecorderMetrics,
    ) -> Result<(), StorageError> {
        let stop_flag = Arc::new(AtomicBool::new(false));
        let worker_stop_flag = stop_flag.clone();
        let snapshot = Arc::new(Mutex::new(WorkerSnapshot::new(initial_metrics)));
        let worker_snapshot = snapshot.clone();
        let storage_backend = self.storage_backend;
        let postgres_url = self.postgres_url.clone();
        let handle = std::thread::Builder::new()
            .name(format!("market-recorder-{}", mode.as_str()))
            .spawn(move || {
                // Install the rustls crypto provider; the error (already
                // installed by another thread) is deliberately ignored.
                let _ = rustls::crypto::ring::default_provider().install_default();
                let runtime = tokio::runtime::Builder::new_current_thread()
                    .enable_all()
                    .build();
                let Ok(runtime) = runtime else {
                    record_worker_error(
                        &worker_snapshot,
                        "failed to initialize tokio runtime".to_string(),
                    );
                    return;
                };
                runtime.block_on(async move {
                    run_market_data_worker(
                        mode,
                        db_path,
                        storage_backend,
                        postgres_url,
                        watched_symbols,
                        worker_stop_flag,
                        worker_snapshot,
                    )
                    .await;
                });
            })
            .map_err(|error| StorageError::WriteFailedWithContext {
                message: error.to_string(),
            })?;
        self.workers.insert(
            mode,
            ModeWorker {
                stop_flag,
                snapshot,
                handle,
            },
        );
        Ok(())
    }
433
434    fn load_metrics(&self, db_path: &Path) -> Result<RecorderMetrics, StorageError> {
435        match self.storage_backend {
436            CollectorStorageBackend::DuckDb => metrics_for_path(db_path),
437            CollectorStorageBackend::Postgres => self
438                .postgres_url
439                .as_deref()
440                .ok_or_else(|| StorageError::WriteFailedWithContext {
441                    message: "postgres recorder backend selected but postgres URL is missing"
442                        .to_string(),
443                })
444                .and_then(metrics_for_postgres_url),
445        }
446    }
447
    /// Resolves the per-mode DuckDB path under `base_dir` via the shared
    /// coordination helper.
    fn db_path(&self, mode: BinanceMode) -> PathBuf {
        RecorderCoordination::new(self.base_dir.clone()).db_path(mode)
    }
451
452    fn storage_target(&self, mode: BinanceMode) -> String {
453        match self.storage_backend {
454            CollectorStorageBackend::DuckDb => self.db_path(mode).display().to_string(),
455            CollectorStorageBackend::Postgres => self
456                .postgres_url
457                .as_deref()
458                .map(mask_postgres_url)
459                .unwrap_or_else(|| "postgres://***".to_string()),
460        }
461    }
462}
463
464impl Drop for MarketDataRecorder {
465    fn drop(&mut self) {
466        for worker in self.workers.values() {
467            worker.stop_flag.store(true, Ordering::Relaxed);
468        }
469    }
470}
471
/// Long-running body of a recorder worker thread.
///
/// Opens the configured storage backend once, then loops: connect to
/// the Binance `!forceOrder@arr` liquidation stream (and, when symbols
/// are watched, the combined per-symbol stream), pump messages into
/// storage, and reconnect after a short backoff whenever a stream drops
/// or a handler fails. Exits when `stop_flag` is set, or immediately if
/// storage setup fails.
async fn run_market_data_worker(
    mode: BinanceMode,
    db_path: PathBuf,
    storage_backend: CollectorStorageBackend,
    postgres_url: Option<String>,
    watched_symbols: Vec<String>,
    stop_flag: Arc<AtomicBool>,
    snapshot: Arc<Mutex<WorkerSnapshot>>,
) {
    // DuckDB backend: open the database once for the worker's lifetime;
    // failure is fatal for the worker and reported via the snapshot.
    let duck_connection = if storage_backend == CollectorStorageBackend::DuckDb {
        match Connection::open(&db_path) {
            Ok(connection) => Some(connection),
            Err(_) => {
                record_worker_error(
                    &snapshot,
                    format!("failed to open duckdb at {}", db_path.display()),
                );
                return;
            }
        }
    } else {
        None
    };
    // Postgres backend: connect and apply the schema up front; any
    // failure here is likewise fatal for the worker.
    let mut postgres_client = if storage_backend == CollectorStorageBackend::Postgres {
        let Some(url) = postgres_url.as_deref() else {
            record_worker_error(
                &snapshot,
                "postgres recorder backend missing URL".to_string(),
            );
            return;
        };
        match connect_postgres(url) {
            Ok(mut client) => {
                if let Err(error) = init_postgres_schema(&mut client, url) {
                    record_worker_error(&snapshot, error.to_string());
                    return;
                }
                Some(client)
            }
            Err(error) => {
                record_worker_error(&snapshot, error.to_string());
                return;
            }
        }
    } else {
        None
    };
    // Per-symbol aggTrade state threaded into the symbol handler;
    // deliberately kept outside the loop so it survives reconnects.
    let mut agg_trade_bar_seconds = BTreeMap::new();

    // Outer loop: one iteration per (re)connection attempt.
    loop {
        touch_worker_snapshot(&snapshot);
        if stop_flag.load(Ordering::Relaxed) {
            break;
        }

        let force_order_url = format!("{}/ws/!forceOrder@arr", market_stream_base_url(mode));
        let symbol_stream_url = combined_symbol_stream_url(mode, &watched_symbols);

        // The liquidation stream is mandatory: on connect failure, back
        // off briefly and retry the whole connection attempt.
        let force_stream = connect_async(force_order_url).await;
        let mut force_stream = match force_stream {
            Ok((stream, _)) => stream,
            Err(error) => {
                record_worker_error(&snapshot, format!("forceOrder connect failed: {error}"));
                eprintln!(
                    "market recorder: failed to connect forceOrder stream mode={} error={}",
                    mode.as_str(),
                    error
                );
                tokio::time::sleep(Duration::from_secs(2)).await;
                continue;
            }
        };

        // The combined symbol stream is optional: absent when no symbols
        // are watched, and a connect failure is tolerated (None).
        let mut symbol_stream = match symbol_stream_url {
            Some(url) => match connect_async(url).await {
                Ok((stream, _)) => Some(stream),
                Err(error) => {
                    record_worker_error(
                        &snapshot,
                        format!("symbol stream connect failed: {error}"),
                    );
                    eprintln!(
                        "market recorder: failed to connect symbol streams mode={} symbols={} error={}",
                        mode.as_str(),
                        watched_symbols.join(","),
                        error
                    );
                    None
                }
            },
            None => None,
        };

        // Sequence counters for stored rows; reset on every reconnect.
        let mut liquidation_seq = 0i64;
        let mut ticker_seq = 0i64;
        let mut trade_seq = 0i64;

        // Inner loop: pump messages until a stream drops, a handler
        // fails (break -> reconnect), or stop is requested (return).
        loop {
            if stop_flag.load(Ordering::Relaxed) {
                return;
            }

            tokio::select! {
                message = force_stream.next() => {
                    match message {
                        Some(Ok(message)) => {
                            if let Err(error) = handle_force_order_message(
                                duck_connection.as_ref(),
                                postgres_client.as_mut(),
                                mode,
                                &mut liquidation_seq,
                                &snapshot,
                                message
                            ) {
                                record_worker_error(&snapshot, error.to_string());
                                eprintln!(
                                    "market recorder: forceOrder stream handling failed mode={} error={}",
                                    mode.as_str(),
                                    error
                                );
                                break;
                            }
                        }
                        Some(Err(error)) => {
                            record_worker_error(&snapshot, format!("forceOrder stream disconnected: {error}"));
                            eprintln!(
                                "market recorder: forceOrder stream disconnected mode={} error={}",
                                mode.as_str(),
                                error
                            );
                            break
                        }
                        None => {
                            record_worker_error(&snapshot, "forceOrder stream disconnected: eof".to_string());
                            eprintln!(
                                "market recorder: forceOrder stream disconnected mode={} error=eof",
                                mode.as_str()
                            );
                            break
                        }
                    }
                }
                // Guarded so select never polls an absent symbol stream.
                message = next_symbol_message(&mut symbol_stream), if symbol_stream.is_some() => {
                    match message {
                        Some(Ok(message)) => {
                            if let Err(error) = handle_symbol_message(
                                duck_connection.as_ref(),
                                postgres_client.as_mut(),
                                mode,
                                &mut ticker_seq,
                                &mut trade_seq,
                                &mut agg_trade_bar_seconds,
                                &snapshot,
                                message,
                            ) {
                                record_worker_error(&snapshot, error.to_string());
                                eprintln!(
                                    "market recorder: symbol stream handling failed mode={} error={}",
                                    mode.as_str(),
                                    error
                                );
                                break;
                            }
                        }
                        Some(Err(error)) => {
                            record_worker_error(&snapshot, format!("symbol stream disconnected: {error}"));
                            eprintln!(
                                "market recorder: symbol stream disconnected mode={} symbols={} error={}",
                                mode.as_str(),
                                watched_symbols.join(","),
                                error
                            );
                            break
                        }
                        None => {
                            record_worker_error(&snapshot, "symbol stream disconnected: eof".to_string());
                            eprintln!(
                                "market recorder: symbol stream disconnected mode={} symbols={} error=eof",
                                mode.as_str(),
                                watched_symbols.join(",")
                            );
                            break
                        }
                    }
                }
                // Periodic heartbeat so status readers see a fresh
                // `updated_at` even when both streams are quiet.
                _ = tokio::time::sleep(Duration::from_millis(250)) => {
                    touch_worker_snapshot(&snapshot);
                }
            }
        }

        // Brief pause before reconnecting after a stream failure.
        tokio::time::sleep(Duration::from_secs(1)).await;
    }
}
666
667async fn next_symbol_message(
668    stream: &mut Option<
669        tokio_tungstenite::WebSocketStream<
670            tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>,
671        >,
672    >,
673) -> Option<Result<Message, tokio_tungstenite::tungstenite::Error>> {
674    match stream {
675        Some(stream) => stream.next().await,
676        None => None,
677    }
678}
679
/// Persists one message from the `!forceOrder@arr` liquidation stream.
///
/// Text frames are parsed and written to whichever backend is present
/// (DuckDB takes precedence, then postgres). Ping/pong/binary/raw
/// frames are ignored; a close frame is surfaced as an error so the
/// caller tears down and reconnects.
fn handle_force_order_message(
    duck_connection: Option<&Connection>,
    postgres_client: Option<&mut postgres::Client>,
    mode: BinanceMode,
    sequence: &mut i64,
    snapshot: &Arc<Mutex<WorkerSnapshot>>,
    message: Message,
) -> Result<(), StorageError> {
    let payload = match message {
        Message::Text(text) => text,
        Message::Ping(_) | Message::Pong(_) | Message::Binary(_) => return Ok(()),
        Message::Close(_) => {
            return Err(StorageError::WriteFailedWithContext {
                message: "forceOrder stream closed".to_string(),
            })
        }
        Message::Frame(_) => return Ok(()),
    };
    let parsed: ForceOrderEnvelope =
        serde_json::from_str(&payload).map_err(|error| StorageError::WriteFailedWithContext {
            message: format!("forceOrder parse failed: {error}; payload={payload}"),
        })?;
    // Envelopes without an order payload carry nothing to store.
    let Some(order) = parsed.order else {
        return Ok(());
    };
    // In-process event id; the caller resets it on every reconnect.
    *sequence += 1;
    let receive_time_ms = Utc::now().timestamp_millis();
    let symbol = order.symbol.clone();
    let side = order.side.clone();
    if let Some(connection) = duck_connection {
        // Millisecond timestamps are converted to DuckDB timestamps via
        // to_timestamp(ms / 1000.0) in the statement itself.
        connection
            .execute(
                "INSERT INTO raw_liquidation_events (
                    event_id, mode, symbol, event_time, receive_time, force_side, price, qty, notional, raw_payload
                 ) VALUES (
                    ?, ?, ?, to_timestamp(? / 1000.0), to_timestamp(? / 1000.0), ?, ?, ?, ?, ?
                 )",
                params![
                    *sequence,
                    mode.as_str(),
                    symbol,
                    parsed.event_time,
                    receive_time_ms,
                    side,
                    order.price,
                    order.qty,
                    order.price * order.qty,
                    payload,
                ],
            )
            .map_err(|error| StorageError::WriteFailedWithContext {
                message: error.to_string(),
            })?;
    } else if let Some(client) = postgres_client {
        insert_liquidation(
            client,
            &PostgresLiquidationRecord {
                mode: mode.as_str().to_string(),
                product: "um".to_string(),
                symbol: symbol.clone(),
                event_time_ms: parsed.event_time,
                receive_time_ms,
                force_side: side,
                price: order.price,
                qty: order.qty,
                notional: order.price * order.qty,
                raw_payload: payload,
            },
        )?;
    } else {
        // Neither backend handle was supplied — configuration error.
        return Err(StorageError::WriteFailedWithContext {
            message: "no recorder storage backend available".to_string(),
        });
    }
    // Update shared metrics/heartbeat only after a successful write.
    record_force_order_event(snapshot, &symbol, parsed.event_time);
    Ok(())
}
757
758fn handle_symbol_message(
759    duck_connection: Option<&Connection>,
760    postgres_client: Option<&mut postgres::Client>,
761    mode: BinanceMode,
762    ticker_sequence: &mut i64,
763    trade_sequence: &mut i64,
764    agg_trade_bar_seconds: &mut BTreeMap<String, i64>,
765    snapshot: &Arc<Mutex<WorkerSnapshot>>,
766    message: Message,
767) -> Result<(), StorageError> {
768    let payload = match message {
769        Message::Text(text) => text,
770        Message::Ping(_) | Message::Pong(_) | Message::Binary(_) => return Ok(()),
771        Message::Close(_) => {
772            return Err(StorageError::WriteFailedWithContext {
773                message: "symbol stream closed".to_string(),
774            })
775        }
776        Message::Frame(_) => return Ok(()),
777    };
778    let parsed: CombinedStreamEnvelope =
779        serde_json::from_str(&payload).map_err(|error| StorageError::WriteFailedWithContext {
780            message: format!("symbol stream parse failed: {error}; payload={payload}"),
781        })?;
782    let receive_time_ms = Utc::now().timestamp_millis();
783
784    if parsed.data.event_type == "bookTicker" {
785        let Some(symbol) = parsed.data.symbol else {
786            return Ok(());
787        };
788        let Some(event_time) = parsed.data.event_time else {
789            return Ok(());
790        };
791        let Some(bid) = parsed.data.bid else {
792            return Ok(());
793        };
794        let Some(bid_qty) = parsed.data.bid_qty else {
795            return Ok(());
796        };
797        let Some(ask) = parsed.data.ask else {
798            return Ok(());
799        };
800        let Some(ask_qty) = parsed.data.ask_qty else {
801            return Ok(());
802        };
803        *ticker_sequence += 1;
804        if let Some(connection) = duck_connection {
805            connection
806                .execute(
807                    "INSERT INTO raw_book_ticker (
808                        tick_id, mode, symbol, event_time, receive_time, bid, bid_qty, ask, ask_qty
809                     ) VALUES (
810                        ?, ?, ?, to_timestamp(? / 1000.0), to_timestamp(? / 1000.0), ?, ?, ?, ?
811                     )",
812                    params![
813                        *ticker_sequence,
814                        mode.as_str(),
815                        symbol,
816                        event_time,
817                        receive_time_ms,
818                        bid,
819                        bid_qty,
820                        ask,
821                        ask_qty,
822                    ],
823                )
824                .map_err(|error| StorageError::WriteFailedWithContext {
825                    message: error.to_string(),
826                })?;
827        } else if let Some(client) = postgres_client {
828            insert_book_ticker(
829                client,
830                &PostgresBookTickerRecord {
831                    mode: mode.as_str().to_string(),
832                    symbol: symbol.clone(),
833                    event_time_ms: event_time,
834                    receive_time_ms,
835                    bid,
836                    bid_qty,
837                    ask,
838                    ask_qty,
839                },
840            )?;
841        } else {
842            return Err(StorageError::WriteFailedWithContext {
843                message: "no recorder storage backend available".to_string(),
844            });
845        }
846        record_book_ticker_event(snapshot, &symbol, event_time);
847    } else if parsed.data.event_type == "aggTrade" {
848        let Some(symbol) = parsed.data.symbol else {
849            return Ok(());
850        };
851        let Some(event_time) = parsed.data.event_time else {
852            return Ok(());
853        };
854        let Some(price) = parsed.data.price else {
855            return Ok(());
856        };
857        let Some(qty) = parsed.data.qty else {
858            return Ok(());
859        };
860        let Some(is_buyer_maker) = parsed.data.is_buyer_maker else {
861            return Ok(());
862        };
863        *trade_sequence += 1;
864        if let Some(connection) = duck_connection {
865            connection
866                .execute(
867                    "INSERT INTO raw_agg_trades (
868                        trade_id, mode, symbol, event_time, receive_time, price, qty, is_buyer_maker
869                     ) VALUES (
870                        ?, ?, ?, to_timestamp(? / 1000.0), to_timestamp(? / 1000.0), ?, ?, ?
871                     )",
872                    params![
873                        *trade_sequence,
874                        mode.as_str(),
875                        symbol,
876                        event_time,
877                        receive_time_ms,
878                        price,
879                        qty,
880                        is_buyer_maker,
881                    ],
882                )
883                .map_err(|error| StorageError::WriteFailedWithContext {
884                    message: error.to_string(),
885                })?;
886        } else if let Some(client) = postgres_client {
887            insert_agg_trade(
888                client,
889                &PostgresAggTradeRecord {
890                    mode: mode.as_str().to_string(),
891                    symbol: symbol.clone(),
892                    event_time_ms: event_time,
893                    receive_time_ms,
894                    price,
895                    qty,
896                    is_buyer_maker,
897                },
898            )?;
899        } else {
900            return Err(StorageError::WriteFailedWithContext {
901                message: "no recorder storage backend available".to_string(),
902            });
903        }
904        record_agg_trade_event(snapshot, &symbol, event_time, agg_trade_bar_seconds);
905    }
906
907    Ok(())
908}
909
910fn market_stream_base_url(mode: BinanceMode) -> &'static str {
911    let _ = mode;
912    "wss://fstream.binance.com"
913}
914
915fn combined_symbol_stream_url(mode: BinanceMode, watched_symbols: &[String]) -> Option<String> {
916    if watched_symbols.is_empty() {
917        return None;
918    }
919
920    let streams = watched_symbols
921        .iter()
922        .flat_map(|symbol| {
923            let lower = symbol.to_ascii_lowercase();
924            [format!("{lower}@bookTicker"), format!("{lower}@aggTrade")]
925        })
926        .collect::<Vec<_>>()
927        .join("/");
928    Some(format!(
929        "{}/stream?streams={streams}",
930        market_stream_base_url(mode)
931    ))
932}
933
/// Trim, uppercase, deduplicate, and sort a list of symbols.
///
/// Blank entries (empty or whitespace-only strings) are dropped; previously a
/// blank input was normalized to `""` and kept, which downstream produces a
/// malformed stream name such as `"@bookTicker"`.
fn normalize_symbols(symbols: Vec<String>) -> Vec<String> {
    symbols
        .into_iter()
        .map(|symbol| symbol.trim().to_ascii_uppercase())
        .filter(|symbol| !symbol.is_empty())
        // BTreeSet gives dedup + lexicographic order in one step.
        .collect::<BTreeSet<_>>()
        .into_iter()
        .collect()
}
941
/// Union two symbol lists into one sorted, deduplicated list.
fn merge_symbol_sets(left: Vec<String>, right: Vec<String>) -> Vec<String> {
    left.into_iter()
        .chain(right)
        .collect::<BTreeSet<_>>()
        .into_iter()
        .collect()
}
949
950fn parse_storage_backend(value: &str) -> CollectorStorageBackend {
951    match value {
952        "postgres" => CollectorStorageBackend::Postgres,
953        _ => CollectorStorageBackend::DuckDb,
954    }
955}
956
/// Outer payload of a Binance `forceOrder` (liquidation) websocket event.
#[derive(Debug, Deserialize)]
struct ForceOrderEnvelope {
    // Event time in epoch milliseconds (wire field `E`).
    #[serde(rename = "E")]
    event_time: i64,
    // Liquidated-order details (wire field `o`); optional so a payload
    // without it deserializes instead of failing the whole message.
    #[serde(rename = "o")]
    order: Option<ForceOrderData>,
}
964
/// Order details nested inside a `forceOrder` liquidation event.
#[derive(Debug, Deserialize)]
struct ForceOrderData {
    // Trading-pair symbol (wire field `s`).
    #[serde(rename = "s")]
    symbol: String,
    // Order side string as sent by Binance (wire field `S`);
    // presumably "BUY"/"SELL" — confirm against the exchange docs.
    #[serde(rename = "S")]
    side: String,
    // Price arrives as a JSON string and is parsed into f64.
    #[serde(rename = "p", deserialize_with = "deserialize_string_number")]
    price: f64,
    // Quantity arrives as a JSON string and is parsed into f64.
    #[serde(rename = "q", deserialize_with = "deserialize_string_number")]
    qty: f64,
}
976
/// Wrapper for messages arriving on a combined stream
/// (`/stream?streams=...`); the actual event sits under the `data` key.
#[derive(Debug, Deserialize)]
struct CombinedStreamEnvelope {
    data: CombinedStreamData,
}
981
/// Inner payload of a combined-stream message.
///
/// This is a union of the fields used by the `bookTicker` and `aggTrade`
/// events, so everything except the event type is optional; the message
/// handler validates per event kind which fields must be present.
#[derive(Debug, Deserialize)]
struct CombinedStreamData {
    // Event discriminator, e.g. "bookTicker" or "aggTrade".
    #[serde(rename = "e")]
    event_type: String,
    // Event time in epoch milliseconds.
    #[serde(rename = "E")]
    event_time: Option<i64>,
    // Trading-pair symbol.
    #[serde(rename = "s")]
    symbol: Option<String>,
    // Best bid price (bookTicker); wire value may be a string or a number.
    #[serde(
        rename = "b",
        default,
        deserialize_with = "deserialize_optional_string_number"
    )]
    bid: Option<f64>,
    // Best bid quantity (bookTicker).
    #[serde(
        rename = "B",
        default,
        deserialize_with = "deserialize_optional_string_number"
    )]
    bid_qty: Option<f64>,
    // Best ask price (bookTicker).
    #[serde(
        rename = "a",
        default,
        deserialize_with = "deserialize_optional_string_number"
    )]
    ask: Option<f64>,
    // Best ask quantity (bookTicker).
    #[serde(
        rename = "A",
        default,
        deserialize_with = "deserialize_optional_string_number"
    )]
    ask_qty: Option<f64>,
    // Trade price (aggTrade).
    #[serde(
        rename = "p",
        default,
        deserialize_with = "deserialize_optional_string_number"
    )]
    price: Option<f64>,
    // Trade quantity (aggTrade).
    #[serde(
        rename = "q",
        default,
        deserialize_with = "deserialize_optional_string_number"
    )]
    qty: Option<f64>,
    // Whether the buyer was the maker (aggTrade).
    #[serde(rename = "m")]
    is_buyer_maker: Option<bool>,
}
1029
1030fn deserialize_string_number<'de, D>(deserializer: D) -> Result<f64, D::Error>
1031where
1032    D: serde::Deserializer<'de>,
1033{
1034    let value = String::deserialize(deserializer)?;
1035    value.parse::<f64>().map_err(serde::de::Error::custom)
1036}
1037
1038fn deserialize_optional_string_number<'de, D>(deserializer: D) -> Result<Option<f64>, D::Error>
1039where
1040    D: serde::Deserializer<'de>,
1041{
1042    let value = Option::<serde_json::Value>::deserialize(deserializer)?;
1043    match value {
1044        None | Some(serde_json::Value::Null) => Ok(None),
1045        Some(serde_json::Value::String(value)) => value
1046            .parse::<f64>()
1047            .map(Some)
1048            .map_err(serde::de::Error::custom),
1049        Some(serde_json::Value::Number(value)) => value
1050            .as_f64()
1051            .ok_or_else(|| serde::de::Error::custom("invalid numeric value"))
1052            .map(Some),
1053        Some(other) => Err(serde::de::Error::custom(format!(
1054            "expected string or number, got {other}"
1055        ))),
1056    }
1057}
1058
1059fn touch_worker_snapshot(snapshot: &Arc<Mutex<WorkerSnapshot>>) {
1060    if let Ok(mut snapshot) = snapshot.lock() {
1061        snapshot.updated_at = Utc::now();
1062    }
1063}
1064
1065fn record_worker_error(snapshot: &Arc<Mutex<WorkerSnapshot>>, error: String) {
1066    if let Ok(mut snapshot) = snapshot.lock() {
1067        snapshot.updated_at = Utc::now();
1068        snapshot.last_error = Some(error);
1069    }
1070}
1071
1072fn record_force_order_event(
1073    snapshot: &Arc<Mutex<WorkerSnapshot>>,
1074    symbol: &str,
1075    event_time_ms: i64,
1076) {
1077    if let Ok(mut snapshot) = snapshot.lock() {
1078        snapshot.updated_at = Utc::now();
1079        snapshot.last_error = None;
1080        snapshot.metrics.liquidation_events += 1;
1081        snapshot.metrics.last_liquidation_event_time = timestamp_string(event_time_ms);
1082        increment_top_symbol(&mut snapshot.metrics.top_liquidation_symbols, symbol);
1083    }
1084}
1085
1086fn record_book_ticker_event(
1087    snapshot: &Arc<Mutex<WorkerSnapshot>>,
1088    symbol: &str,
1089    event_time_ms: i64,
1090) {
1091    if let Ok(mut snapshot) = snapshot.lock() {
1092        snapshot.updated_at = Utc::now();
1093        snapshot.last_error = None;
1094        snapshot.metrics.book_ticker_events += 1;
1095        snapshot.metrics.last_book_ticker_event_time = timestamp_string(event_time_ms);
1096        increment_top_symbol(&mut snapshot.metrics.top_book_ticker_symbols, symbol);
1097    }
1098}
1099
1100fn record_agg_trade_event(
1101    snapshot: &Arc<Mutex<WorkerSnapshot>>,
1102    symbol: &str,
1103    event_time_ms: i64,
1104    agg_trade_bar_seconds: &mut BTreeMap<String, i64>,
1105) {
1106    if let Ok(mut snapshot) = snapshot.lock() {
1107        snapshot.updated_at = Utc::now();
1108        snapshot.last_error = None;
1109        snapshot.metrics.agg_trade_events += 1;
1110        snapshot.metrics.last_agg_trade_event_time = timestamp_string(event_time_ms);
1111        increment_top_symbol(&mut snapshot.metrics.top_agg_trade_symbols, symbol);
1112        let bar_second = event_time_ms / 1_000;
1113        let should_increment_bar = agg_trade_bar_seconds
1114            .insert(symbol.to_string(), bar_second)
1115            .map(|previous| previous != bar_second)
1116            .unwrap_or(true);
1117        if should_increment_bar {
1118            snapshot.metrics.derived_kline_1s_bars += 1;
1119        }
1120    }
1121}
1122
/// Bump `symbol`'s count in a `"SYMBOL:count"` leaderboard, keeping only
/// the five largest counts (ties broken alphabetically by symbol).
///
/// Entries that do not parse as `"name:count"` are dropped; note that
/// symbols pushed out of the top five lose their accumulated count.
fn increment_top_symbol(top_symbols: &mut Vec<String>, symbol: &str) {
    // Rebuild a name -> count map from the serialized entries.
    let mut counts: BTreeMap<String, u64> = BTreeMap::new();
    for entry in top_symbols.iter() {
        if let Some((name, raw_count)) = entry.split_once(':') {
            if let Ok(count) = raw_count.parse::<u64>() {
                counts.insert(name.to_string(), count);
            }
        }
    }
    *counts.entry(symbol.to_string()).or_default() += 1;

    // Rank by descending count, then ascending symbol, and keep the top five.
    let mut ranked: Vec<(String, u64)> = counts.into_iter().collect();
    ranked.sort_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(&b.0)));
    ranked.truncate(5);
    *top_symbols = ranked
        .into_iter()
        .map(|(name, count)| format!("{name}:{count}"))
        .collect();
}
1141
1142fn timestamp_string(event_time_ms: i64) -> Option<String> {
1143    Utc.timestamp_millis_opt(event_time_ms)
1144        .single()
1145        .map(|value| {
1146            value
1147                .naive_utc()
1148                .format("%Y-%m-%d %H:%M:%S%.3f")
1149                .to_string()
1150        })
1151}