Skip to main content

sandbox_quant/recorder_app/
runtime.rs

1use std::collections::{BTreeMap, BTreeSet};
2use std::path::{Path, PathBuf};
3use std::sync::{
4    atomic::{AtomicBool, Ordering},
5    Arc, Mutex,
6};
7use std::thread::JoinHandle;
8use std::time::Duration;
9
10use chrono::{DateTime, TimeZone, Utc};
11use duckdb::{params, Connection};
12use futures_util::StreamExt;
13use serde::Deserialize;
14use tokio_tungstenite::connect_async;
15use tokio_tungstenite::tungstenite::Message;
16
17use crate::app::bootstrap::BinanceMode;
18use crate::dataset::query::{backtest_summary_for_path, metrics_for_path};
19use crate::dataset::schema::init_schema_for_path;
20use crate::dataset::types::{BacktestDatasetSummary, RecorderMetrics};
21use crate::error::storage_error::StorageError;
22use crate::record::coordination::RecorderCoordination;
23
24#[derive(Debug, Clone, Copy, PartialEq, Eq)]
25pub enum RecorderState {
26    Running,
27    Stopped,
28}
29
30impl RecorderState {
31    pub fn as_str(self) -> &'static str {
32        match self {
33            Self::Running => "running",
34            Self::Stopped => "stopped",
35        }
36    }
37
38    pub fn is_running(self) -> bool {
39        self == Self::Running
40    }
41}
42
43#[derive(Debug, Clone, PartialEq, Eq)]
44pub struct RecorderStatus {
45    pub mode: BinanceMode,
46    pub state: RecorderState,
47    pub db_path: PathBuf,
48    pub started_at: Option<DateTime<Utc>>,
49    pub updated_at: DateTime<Utc>,
50    pub manual_symbols: Vec<String>,
51    pub strategy_symbols: Vec<String>,
52    pub watched_symbols: Vec<String>,
53    pub worker_alive: bool,
54    pub heartbeat_age_sec: i64,
55    pub last_error: Option<String>,
56    pub metrics: RecorderMetrics,
57}
58
59impl RecorderStatus {
60    fn new(
61        mode: BinanceMode,
62        state: RecorderState,
63        db_path: PathBuf,
64        manual_symbols: Vec<String>,
65        strategy_symbols: Vec<String>,
66        watched_symbols: Vec<String>,
67    ) -> Self {
68        let now = Utc::now();
69        Self {
70            mode,
71            state,
72            db_path,
73            started_at: if state == RecorderState::Running {
74                Some(now)
75            } else {
76                None
77            },
78            updated_at: now,
79            manual_symbols,
80            strategy_symbols,
81            watched_symbols,
82            worker_alive: true,
83            heartbeat_age_sec: 0,
84            last_error: None,
85            metrics: RecorderMetrics::default(),
86        }
87    }
88}
89
90#[derive(Debug, Clone)]
91struct WorkerSnapshot {
92    updated_at: DateTime<Utc>,
93    metrics: RecorderMetrics,
94    last_error: Option<String>,
95}
96
97impl WorkerSnapshot {
98    fn new(metrics: RecorderMetrics) -> Self {
99        Self {
100            updated_at: Utc::now(),
101            metrics,
102            last_error: None,
103        }
104    }
105}
106
107struct ModeWorker {
108    stop_flag: Arc<AtomicBool>,
109    snapshot: Arc<Mutex<WorkerSnapshot>>,
110    pub(crate) handle: JoinHandle<()>,
111}
112
113pub struct MarketDataRecorder {
114    base_dir: PathBuf,
115    network_enabled: bool,
116    statuses: BTreeMap<BinanceMode, RecorderStatus>,
117    workers: BTreeMap<BinanceMode, ModeWorker>,
118}
119
120impl std::fmt::Debug for MarketDataRecorder {
121    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
122        f.debug_struct("MarketDataRecorder")
123            .field("base_dir", &self.base_dir)
124            .field("network_enabled", &self.network_enabled)
125            .field("statuses", &self.statuses)
126            .finish()
127    }
128}
129
130impl Default for MarketDataRecorder {
131    fn default() -> Self {
132        Self::new("var")
133    }
134}
135
136impl MarketDataRecorder {
137    pub fn new(base_dir: impl Into<PathBuf>) -> Self {
138        Self {
139            base_dir: base_dir.into(),
140            network_enabled: true,
141            statuses: BTreeMap::new(),
142            workers: BTreeMap::new(),
143        }
144    }
145
146    pub fn without_network(mut self) -> Self {
147        self.network_enabled = false;
148        self
149    }
150
151    pub fn start(
152        &mut self,
153        mode: BinanceMode,
154        manual_symbols: Vec<String>,
155        strategy_symbols: Vec<String>,
156    ) -> Result<RecorderStatus, StorageError> {
157        if self
158            .statuses
159            .get(&mode)
160            .is_some_and(|status| status.state == RecorderState::Running)
161        {
162            return Err(StorageError::RecorderAlreadyRunning {
163                mode: mode.as_str().to_string(),
164            });
165        }
166
167        let db_path = self.db_path(mode);
168        init_schema_for_path(&db_path)?;
169        let manual_symbols = normalize_symbols(manual_symbols);
170        let strategy_symbols = normalize_symbols(strategy_symbols);
171        let watched_symbols = merge_symbol_sets(manual_symbols.clone(), strategy_symbols.clone());
172        let status = RecorderStatus::new(
173            mode,
174            RecorderState::Running,
175            db_path.clone(),
176            manual_symbols,
177            strategy_symbols,
178            watched_symbols.clone(),
179        );
180        let initial_metrics = self.load_metrics(&db_path).unwrap_or_default();
181        let mut status = status;
182        status.metrics = initial_metrics.clone();
183        if self.network_enabled {
184            self.spawn_worker(mode, db_path, watched_symbols, initial_metrics)?;
185        }
186        self.statuses.insert(mode, status.clone());
187        Ok(status)
188    }
189
190    pub fn status(&self, mode: BinanceMode) -> RecorderStatus {
191        let mut status = self.statuses.get(&mode).cloned().unwrap_or_else(|| {
192            RecorderStatus::new(
193                mode,
194                RecorderState::Stopped,
195                self.db_path(mode),
196                Vec::new(),
197                Vec::new(),
198                Vec::new(),
199            )
200        });
201        if let Some(worker) = self.workers.get(&mode) {
202            let worker_alive = !worker.handle.is_finished();
203            status.worker_alive = worker_alive;
204            if let Ok(snapshot) = worker.snapshot.lock() {
205                status.updated_at = snapshot.updated_at;
206                status.heartbeat_age_sec = (Utc::now() - snapshot.updated_at).num_seconds();
207                status.metrics = snapshot.metrics.clone();
208                status.last_error = snapshot.last_error.clone();
209            }
210        } else {
211            status.worker_alive = false;
212            status.heartbeat_age_sec = (Utc::now() - status.updated_at).num_seconds();
213            status.metrics = self.load_metrics(&status.db_path).unwrap_or_default();
214        }
215        status
216    }
217
218    pub fn update_strategy_symbols(
219        &mut self,
220        mode: BinanceMode,
221        strategy_symbols: Vec<String>,
222    ) -> Result<(), StorageError> {
223        let strategy_symbols = normalize_symbols(strategy_symbols);
224        let Some(status) = self.statuses.get_mut(&mode) else {
225            return Ok(());
226        };
227        if status.strategy_symbols == strategy_symbols {
228            return Ok(());
229        }
230        status.strategy_symbols = strategy_symbols.clone();
231        status.watched_symbols =
232            merge_symbol_sets(status.manual_symbols.clone(), strategy_symbols.clone());
233        status.updated_at = Utc::now();
234        let watched_symbols = status.watched_symbols.clone();
235        let should_restart = status.state == RecorderState::Running && self.network_enabled;
236
237        if !should_restart {
238            return Ok(());
239        }
240
241        self.restart_worker(mode, watched_symbols)
242    }
243
244    pub fn update_manual_symbols(
245        &mut self,
246        mode: BinanceMode,
247        manual_symbols: Vec<String>,
248    ) -> Result<(), StorageError> {
249        let manual_symbols = normalize_symbols(manual_symbols);
250        let Some(status) = self.statuses.get_mut(&mode) else {
251            return Ok(());
252        };
253        if status.manual_symbols == manual_symbols {
254            return Ok(());
255        }
256        status.manual_symbols = manual_symbols.clone();
257        status.watched_symbols =
258            merge_symbol_sets(manual_symbols.clone(), status.strategy_symbols.clone());
259        status.updated_at = Utc::now();
260        let watched_symbols = status.watched_symbols.clone();
261        let should_restart = status.state == RecorderState::Running && self.network_enabled;
262
263        if !should_restart {
264            return Ok(());
265        }
266
267        self.restart_worker(mode, watched_symbols)
268    }
269
270    pub fn stop(&mut self, mode: BinanceMode) -> Result<RecorderStatus, StorageError> {
271        let Some(existing) = self.statuses.get_mut(&mode) else {
272            return Err(StorageError::RecorderNotRunning {
273                mode: mode.as_str().to_string(),
274            });
275        };
276        if existing.state != RecorderState::Running {
277            return Err(StorageError::RecorderNotRunning {
278                mode: mode.as_str().to_string(),
279            });
280        }
281
282        if let Some(worker) = self.workers.remove(&mode) {
283            if let Ok(snapshot) = worker.snapshot.lock() {
284                existing.updated_at = snapshot.updated_at;
285                existing.metrics = snapshot.metrics.clone();
286                existing.last_error = snapshot.last_error.clone();
287            }
288            worker.stop_flag.store(true, Ordering::Relaxed);
289        }
290
291        existing.state = RecorderState::Stopped;
292        existing.updated_at = Utc::now();
293        existing.worker_alive = false;
294        Ok(existing.clone())
295    }
296
297    pub fn backtest_dataset_summary(
298        &self,
299        mode: BinanceMode,
300        symbol: &str,
301        from: chrono::NaiveDate,
302        to: chrono::NaiveDate,
303    ) -> Result<BacktestDatasetSummary, StorageError> {
304        backtest_summary_for_path(&self.db_path(mode), mode, symbol, from, to)
305    }
306
307    pub fn worker_alive(&self, mode: BinanceMode) -> bool {
308        self.workers
309            .get(&mode)
310            .is_some_and(|worker| !worker.handle.is_finished())
311    }
312
313    pub fn metrics_for_path(db_path: &Path) -> Result<RecorderMetrics, StorageError> {
314        metrics_for_path(db_path)
315    }
316
317    pub fn backtest_summary_for_path(
318        db_path: &Path,
319        mode: BinanceMode,
320        symbol: &str,
321        from: chrono::NaiveDate,
322        to: chrono::NaiveDate,
323    ) -> Result<BacktestDatasetSummary, StorageError> {
324        backtest_summary_for_path(db_path, mode, symbol, from, to)
325    }
326
327    pub fn init_schema_for_path(db_path: &Path) -> Result<(), StorageError> {
328        init_schema_for_path(db_path)
329    }
330
331    fn restart_worker(
332        &mut self,
333        mode: BinanceMode,
334        watched_symbols: Vec<String>,
335    ) -> Result<(), StorageError> {
336        let initial_metrics = self.status(mode).metrics.clone();
337        if let Some(worker) = self.workers.remove(&mode) {
338            worker.stop_flag.store(true, Ordering::Relaxed);
339        }
340        let db_path = self.db_path(mode);
341        self.spawn_worker(mode, db_path, watched_symbols, initial_metrics)
342    }
343
344    fn spawn_worker(
345        &mut self,
346        mode: BinanceMode,
347        db_path: PathBuf,
348        watched_symbols: Vec<String>,
349        initial_metrics: RecorderMetrics,
350    ) -> Result<(), StorageError> {
351        let stop_flag = Arc::new(AtomicBool::new(false));
352        let worker_stop_flag = stop_flag.clone();
353        let snapshot = Arc::new(Mutex::new(WorkerSnapshot::new(initial_metrics)));
354        let worker_snapshot = snapshot.clone();
355        let handle = std::thread::Builder::new()
356            .name(format!("market-recorder-{}", mode.as_str()))
357            .spawn(move || {
358                let _ = rustls::crypto::ring::default_provider().install_default();
359                let runtime = tokio::runtime::Builder::new_current_thread()
360                    .enable_all()
361                    .build();
362                let Ok(runtime) = runtime else {
363                    record_worker_error(
364                        &worker_snapshot,
365                        "failed to initialize tokio runtime".to_string(),
366                    );
367                    return;
368                };
369                runtime.block_on(async move {
370                    run_market_data_worker(
371                        mode,
372                        db_path,
373                        watched_symbols,
374                        worker_stop_flag,
375                        worker_snapshot,
376                    )
377                    .await;
378                });
379            })
380            .map_err(|error| StorageError::WriteFailedWithContext {
381                message: error.to_string(),
382            })?;
383        self.workers.insert(
384            mode,
385            ModeWorker {
386                stop_flag,
387                snapshot,
388                handle,
389            },
390        );
391        Ok(())
392    }
393
394    fn load_metrics(&self, db_path: &Path) -> Result<RecorderMetrics, StorageError> {
395        metrics_for_path(db_path)
396    }
397
398    fn db_path(&self, mode: BinanceMode) -> PathBuf {
399        RecorderCoordination::new(self.base_dir.clone()).db_path(mode)
400    }
401}
402
403impl Drop for MarketDataRecorder {
404    fn drop(&mut self) {
405        for worker in self.workers.values() {
406            worker.stop_flag.store(true, Ordering::Relaxed);
407        }
408    }
409}
410
411async fn run_market_data_worker(
412    mode: BinanceMode,
413    db_path: PathBuf,
414    watched_symbols: Vec<String>,
415    stop_flag: Arc<AtomicBool>,
416    snapshot: Arc<Mutex<WorkerSnapshot>>,
417) {
418    let Ok(connection) = Connection::open(&db_path) else {
419        record_worker_error(
420            &snapshot,
421            format!("failed to open duckdb at {}", db_path.display()),
422        );
423        return;
424    };
425    let mut agg_trade_bar_seconds = BTreeMap::new();
426
427    loop {
428        touch_worker_snapshot(&snapshot);
429        if stop_flag.load(Ordering::Relaxed) {
430            break;
431        }
432
433        let force_order_url = format!("{}/ws/!forceOrder@arr", market_stream_base_url(mode));
434        let symbol_stream_url = combined_symbol_stream_url(mode, &watched_symbols);
435
436        let force_stream = connect_async(force_order_url).await;
437        let mut force_stream = match force_stream {
438            Ok((stream, _)) => stream,
439            Err(error) => {
440                record_worker_error(&snapshot, format!("forceOrder connect failed: {error}"));
441                eprintln!(
442                    "market recorder: failed to connect forceOrder stream mode={} error={}",
443                    mode.as_str(),
444                    error
445                );
446                tokio::time::sleep(Duration::from_secs(2)).await;
447                continue;
448            }
449        };
450
451        let mut symbol_stream = match symbol_stream_url {
452            Some(url) => match connect_async(url).await {
453                Ok((stream, _)) => Some(stream),
454                Err(error) => {
455                    record_worker_error(
456                        &snapshot,
457                        format!("symbol stream connect failed: {error}"),
458                    );
459                    eprintln!(
460                        "market recorder: failed to connect symbol streams mode={} symbols={} error={}",
461                        mode.as_str(),
462                        watched_symbols.join(","),
463                        error
464                    );
465                    None
466                }
467            },
468            None => None,
469        };
470
471        let mut liquidation_seq = 0i64;
472        let mut ticker_seq = 0i64;
473        let mut trade_seq = 0i64;
474
475        loop {
476            if stop_flag.load(Ordering::Relaxed) {
477                return;
478            }
479
480            tokio::select! {
481                message = force_stream.next() => {
482                    match message {
483                        Some(Ok(message)) => {
484                            if let Err(error) = handle_force_order_message(&connection, mode, &mut liquidation_seq, message) {
485                                record_worker_error(&snapshot, error.to_string());
486                                eprintln!(
487                                    "market recorder: forceOrder stream handling failed mode={} error={}",
488                                    mode.as_str(),
489                                    error
490                                );
491                                break;
492                            }
493                            record_force_order_event(&snapshot, &connection);
494                        }
495                        Some(Err(error)) => {
496                            record_worker_error(&snapshot, format!("forceOrder stream disconnected: {error}"));
497                            eprintln!(
498                                "market recorder: forceOrder stream disconnected mode={} error={}",
499                                mode.as_str(),
500                                error
501                            );
502                            break
503                        }
504                        None => {
505                            record_worker_error(&snapshot, "forceOrder stream disconnected: eof".to_string());
506                            eprintln!(
507                                "market recorder: forceOrder stream disconnected mode={} error=eof",
508                                mode.as_str()
509                            );
510                            break
511                        }
512                    }
513                }
514                message = next_symbol_message(&mut symbol_stream), if symbol_stream.is_some() => {
515                    match message {
516                        Some(Ok(message)) => {
517                            if let Err(error) = handle_symbol_message(
518                                &connection,
519                                mode,
520                                &mut ticker_seq,
521                                &mut trade_seq,
522                                &mut agg_trade_bar_seconds,
523                                &snapshot,
524                                message,
525                            ) {
526                                record_worker_error(&snapshot, error.to_string());
527                                eprintln!(
528                                    "market recorder: symbol stream handling failed mode={} error={}",
529                                    mode.as_str(),
530                                    error
531                                );
532                                break;
533                            }
534                        }
535                        Some(Err(error)) => {
536                            record_worker_error(&snapshot, format!("symbol stream disconnected: {error}"));
537                            eprintln!(
538                                "market recorder: symbol stream disconnected mode={} symbols={} error={}",
539                                mode.as_str(),
540                                watched_symbols.join(","),
541                                error
542                            );
543                            break
544                        }
545                        None => {
546                            record_worker_error(&snapshot, "symbol stream disconnected: eof".to_string());
547                            eprintln!(
548                                "market recorder: symbol stream disconnected mode={} symbols={} error=eof",
549                                mode.as_str(),
550                                watched_symbols.join(",")
551                            );
552                            break
553                        }
554                    }
555                }
556                _ = tokio::time::sleep(Duration::from_millis(250)) => {
557                    touch_worker_snapshot(&snapshot);
558                }
559            }
560        }
561
562        tokio::time::sleep(Duration::from_secs(1)).await;
563    }
564}
565
566async fn next_symbol_message(
567    stream: &mut Option<
568        tokio_tungstenite::WebSocketStream<
569            tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>,
570        >,
571    >,
572) -> Option<Result<Message, tokio_tungstenite::tungstenite::Error>> {
573    match stream {
574        Some(stream) => stream.next().await,
575        None => None,
576    }
577}
578
579fn handle_force_order_message(
580    connection: &Connection,
581    mode: BinanceMode,
582    sequence: &mut i64,
583    message: Message,
584) -> Result<(), StorageError> {
585    let payload = match message {
586        Message::Text(text) => text,
587        Message::Ping(_) | Message::Pong(_) | Message::Binary(_) => return Ok(()),
588        Message::Close(_) => {
589            return Err(StorageError::WriteFailedWithContext {
590                message: "forceOrder stream closed".to_string(),
591            })
592        }
593        Message::Frame(_) => return Ok(()),
594    };
595    let parsed: ForceOrderEnvelope =
596        serde_json::from_str(&payload).map_err(|error| StorageError::WriteFailedWithContext {
597            message: format!("forceOrder parse failed: {error}; payload={payload}"),
598        })?;
599    let Some(order) = parsed.order else {
600        return Ok(());
601    };
602    *sequence += 1;
603    let receive_time_ms = Utc::now().timestamp_millis();
604    connection
605        .execute(
606            "INSERT INTO raw_liquidation_events (
607                event_id, mode, symbol, event_time, receive_time, force_side, price, qty, notional, raw_payload
608             ) VALUES (
609                ?, ?, ?, to_timestamp(? / 1000.0), to_timestamp(? / 1000.0), ?, ?, ?, ?, ?
610             )",
611                params![
612                    *sequence,
613                    mode.as_str(),
614                    order.symbol,
615                    parsed.event_time,
616                receive_time_ms,
617                order.side,
618                order.price,
619                order.qty,
620                order.price * order.qty,
621                payload,
622            ],
623        )
624        .map_err(|error| StorageError::WriteFailedWithContext {
625            message: error.to_string(),
626        })?;
627    Ok(())
628}
629
630fn handle_symbol_message(
631    connection: &Connection,
632    mode: BinanceMode,
633    ticker_sequence: &mut i64,
634    trade_sequence: &mut i64,
635    agg_trade_bar_seconds: &mut BTreeMap<String, i64>,
636    snapshot: &Arc<Mutex<WorkerSnapshot>>,
637    message: Message,
638) -> Result<(), StorageError> {
639    let payload = match message {
640        Message::Text(text) => text,
641        Message::Ping(_) | Message::Pong(_) | Message::Binary(_) => return Ok(()),
642        Message::Close(_) => {
643            return Err(StorageError::WriteFailedWithContext {
644                message: "symbol stream closed".to_string(),
645            })
646        }
647        Message::Frame(_) => return Ok(()),
648    };
649    let parsed: CombinedStreamEnvelope =
650        serde_json::from_str(&payload).map_err(|error| StorageError::WriteFailedWithContext {
651            message: format!("symbol stream parse failed: {error}; payload={payload}"),
652        })?;
653    let receive_time_ms = Utc::now().timestamp_millis();
654
655    if parsed.data.event_type == "bookTicker" {
656        let Some(symbol) = parsed.data.symbol else {
657            return Ok(());
658        };
659        let Some(event_time) = parsed.data.event_time else {
660            return Ok(());
661        };
662        let Some(bid) = parsed.data.bid else {
663            return Ok(());
664        };
665        let Some(bid_qty) = parsed.data.bid_qty else {
666            return Ok(());
667        };
668        let Some(ask) = parsed.data.ask else {
669            return Ok(());
670        };
671        let Some(ask_qty) = parsed.data.ask_qty else {
672            return Ok(());
673        };
674        *ticker_sequence += 1;
675        connection
676            .execute(
677                "INSERT INTO raw_book_ticker (
678                    tick_id, mode, symbol, event_time, receive_time, bid, bid_qty, ask, ask_qty
679                 ) VALUES (
680                    ?, ?, ?, to_timestamp(? / 1000.0), to_timestamp(? / 1000.0), ?, ?, ?, ?
681                 )",
682                params![
683                    *ticker_sequence,
684                    mode.as_str(),
685                    symbol,
686                    event_time,
687                    receive_time_ms,
688                    bid,
689                    bid_qty,
690                    ask,
691                    ask_qty,
692                ],
693            )
694            .map_err(|error| StorageError::WriteFailedWithContext {
695                message: error.to_string(),
696            })?;
697        record_book_ticker_event(snapshot, &symbol, event_time);
698    } else if parsed.data.event_type == "aggTrade" {
699        let Some(symbol) = parsed.data.symbol else {
700            return Ok(());
701        };
702        let Some(event_time) = parsed.data.event_time else {
703            return Ok(());
704        };
705        let Some(price) = parsed.data.price else {
706            return Ok(());
707        };
708        let Some(qty) = parsed.data.qty else {
709            return Ok(());
710        };
711        let Some(is_buyer_maker) = parsed.data.is_buyer_maker else {
712            return Ok(());
713        };
714        *trade_sequence += 1;
715        connection
716            .execute(
717                "INSERT INTO raw_agg_trades (
718                    trade_id, mode, symbol, event_time, receive_time, price, qty, is_buyer_maker
719                 ) VALUES (
720                    ?, ?, ?, to_timestamp(? / 1000.0), to_timestamp(? / 1000.0), ?, ?, ?
721                 )",
722                params![
723                    *trade_sequence,
724                    mode.as_str(),
725                    symbol,
726                    event_time,
727                    receive_time_ms,
728                    price,
729                    qty,
730                    is_buyer_maker,
731                ],
732            )
733            .map_err(|error| StorageError::WriteFailedWithContext {
734                message: error.to_string(),
735            })?;
736        record_agg_trade_event(snapshot, &symbol, event_time, agg_trade_bar_seconds);
737    }
738
739    Ok(())
740}
741
742fn market_stream_base_url(mode: BinanceMode) -> &'static str {
743    let _ = mode;
744    "wss://fstream.binance.com"
745}
746
747fn combined_symbol_stream_url(mode: BinanceMode, watched_symbols: &[String]) -> Option<String> {
748    if watched_symbols.is_empty() {
749        return None;
750    }
751
752    let streams = watched_symbols
753        .iter()
754        .flat_map(|symbol| {
755            let lower = symbol.to_ascii_lowercase();
756            [format!("{lower}@bookTicker"), format!("{lower}@aggTrade")]
757        })
758        .collect::<Vec<_>>()
759        .join("/");
760    Some(format!(
761        "{}/stream?streams={streams}",
762        market_stream_base_url(mode)
763    ))
764}
765
766fn normalize_symbols(symbols: Vec<String>) -> Vec<String> {
767    let mut normalized = BTreeSet::new();
768    for symbol in symbols {
769        normalized.insert(symbol.trim().to_ascii_uppercase());
770    }
771    normalized.into_iter().collect()
772}
773
774fn merge_symbol_sets(left: Vec<String>, right: Vec<String>) -> Vec<String> {
775    let mut merged = BTreeSet::new();
776    for symbol in left.into_iter().chain(right.into_iter()) {
777        merged.insert(symbol);
778    }
779    merged.into_iter().collect()
780}
781
782#[derive(Debug, Deserialize)]
783struct ForceOrderEnvelope {
784    #[serde(rename = "E")]
785    event_time: i64,
786    #[serde(rename = "o")]
787    order: Option<ForceOrderData>,
788}
789
790#[derive(Debug, Deserialize)]
791struct ForceOrderData {
792    #[serde(rename = "s")]
793    symbol: String,
794    #[serde(rename = "S")]
795    side: String,
796    #[serde(rename = "p", deserialize_with = "deserialize_string_number")]
797    price: f64,
798    #[serde(rename = "q", deserialize_with = "deserialize_string_number")]
799    qty: f64,
800}
801
802#[derive(Debug, Deserialize)]
803struct CombinedStreamEnvelope {
804    data: CombinedStreamData,
805}
806
807#[derive(Debug, Deserialize)]
808struct CombinedStreamData {
809    #[serde(rename = "e")]
810    event_type: String,
811    #[serde(rename = "E")]
812    event_time: Option<i64>,
813    #[serde(rename = "s")]
814    symbol: Option<String>,
815    #[serde(
816        rename = "b",
817        default,
818        deserialize_with = "deserialize_optional_string_number"
819    )]
820    bid: Option<f64>,
821    #[serde(
822        rename = "B",
823        default,
824        deserialize_with = "deserialize_optional_string_number"
825    )]
826    bid_qty: Option<f64>,
827    #[serde(
828        rename = "a",
829        default,
830        deserialize_with = "deserialize_optional_string_number"
831    )]
832    ask: Option<f64>,
833    #[serde(
834        rename = "A",
835        default,
836        deserialize_with = "deserialize_optional_string_number"
837    )]
838    ask_qty: Option<f64>,
839    #[serde(
840        rename = "p",
841        default,
842        deserialize_with = "deserialize_optional_string_number"
843    )]
844    price: Option<f64>,
845    #[serde(
846        rename = "q",
847        default,
848        deserialize_with = "deserialize_optional_string_number"
849    )]
850    qty: Option<f64>,
851    #[serde(rename = "m")]
852    is_buyer_maker: Option<bool>,
853}
854
855fn deserialize_string_number<'de, D>(deserializer: D) -> Result<f64, D::Error>
856where
857    D: serde::Deserializer<'de>,
858{
859    let value = String::deserialize(deserializer)?;
860    value.parse::<f64>().map_err(serde::de::Error::custom)
861}
862
863fn deserialize_optional_string_number<'de, D>(deserializer: D) -> Result<Option<f64>, D::Error>
864where
865    D: serde::Deserializer<'de>,
866{
867    let value = Option::<serde_json::Value>::deserialize(deserializer)?;
868    match value {
869        None | Some(serde_json::Value::Null) => Ok(None),
870        Some(serde_json::Value::String(value)) => value
871            .parse::<f64>()
872            .map(Some)
873            .map_err(serde::de::Error::custom),
874        Some(serde_json::Value::Number(value)) => value
875            .as_f64()
876            .ok_or_else(|| serde::de::Error::custom("invalid numeric value"))
877            .map(Some),
878        Some(other) => Err(serde::de::Error::custom(format!(
879            "expected string or number, got {other}"
880        ))),
881    }
882}
883
884fn touch_worker_snapshot(snapshot: &Arc<Mutex<WorkerSnapshot>>) {
885    if let Ok(mut snapshot) = snapshot.lock() {
886        snapshot.updated_at = Utc::now();
887    }
888}
889
890fn record_worker_error(snapshot: &Arc<Mutex<WorkerSnapshot>>, error: String) {
891    if let Ok(mut snapshot) = snapshot.lock() {
892        snapshot.updated_at = Utc::now();
893        snapshot.last_error = Some(error);
894    }
895}
896
897fn record_force_order_event(snapshot: &Arc<Mutex<WorkerSnapshot>>, connection: &Connection) {
898    if let Ok(mut snapshot) = snapshot.lock() {
899        snapshot.updated_at = Utc::now();
900        snapshot.last_error = None;
901        snapshot.metrics.liquidation_events += 1;
902        snapshot.metrics.last_liquidation_event_time =
903            query_latest_timestamp(connection, "raw_liquidation_events", "event_time")
904                .ok()
905                .flatten();
906        snapshot.metrics.top_liquidation_symbols =
907            query_top_symbols(connection, "raw_liquidation_events").unwrap_or_default();
908    }
909}
910
911fn record_book_ticker_event(
912    snapshot: &Arc<Mutex<WorkerSnapshot>>,
913    symbol: &str,
914    event_time_ms: i64,
915) {
916    if let Ok(mut snapshot) = snapshot.lock() {
917        snapshot.updated_at = Utc::now();
918        snapshot.last_error = None;
919        snapshot.metrics.book_ticker_events += 1;
920        snapshot.metrics.last_book_ticker_event_time = timestamp_string(event_time_ms);
921        increment_top_symbol(&mut snapshot.metrics.top_book_ticker_symbols, symbol);
922    }
923}
924
925fn record_agg_trade_event(
926    snapshot: &Arc<Mutex<WorkerSnapshot>>,
927    symbol: &str,
928    event_time_ms: i64,
929    agg_trade_bar_seconds: &mut BTreeMap<String, i64>,
930) {
931    if let Ok(mut snapshot) = snapshot.lock() {
932        snapshot.updated_at = Utc::now();
933        snapshot.last_error = None;
934        snapshot.metrics.agg_trade_events += 1;
935        snapshot.metrics.last_agg_trade_event_time = timestamp_string(event_time_ms);
936        increment_top_symbol(&mut snapshot.metrics.top_agg_trade_symbols, symbol);
937        let bar_second = event_time_ms / 1_000;
938        let should_increment_bar = agg_trade_bar_seconds
939            .insert(symbol.to_string(), bar_second)
940            .map(|previous| previous != bar_second)
941            .unwrap_or(true);
942        if should_increment_bar {
943            snapshot.metrics.derived_kline_1s_bars += 1;
944        }
945    }
946}
947
948fn increment_top_symbol(top_symbols: &mut Vec<String>, symbol: &str) {
949    let mut counts = top_symbols
950        .iter()
951        .filter_map(|entry| {
952            let (symbol, count) = entry.split_once(':')?;
953            let count = count.parse::<u64>().ok()?;
954            Some((symbol.to_string(), count))
955        })
956        .collect::<BTreeMap<_, _>>();
957    *counts.entry(symbol.to_string()).or_default() += 1;
958    let mut sorted = counts.into_iter().collect::<Vec<_>>();
959    sorted.sort_by(|left, right| right.1.cmp(&left.1).then_with(|| left.0.cmp(&right.0)));
960    *top_symbols = sorted
961        .into_iter()
962        .take(5)
963        .map(|(symbol, count)| format!("{symbol}:{count}"))
964        .collect();
965}
966
967fn timestamp_string(event_time_ms: i64) -> Option<String> {
968    Utc.timestamp_millis_opt(event_time_ms)
969        .single()
970        .map(|value| {
971            value
972                .naive_utc()
973                .format("%Y-%m-%d %H:%M:%S%.3f")
974                .to_string()
975        })
976}
977
978fn query_latest_timestamp(
979    connection: &Connection,
980    table: &str,
981    column: &str,
982) -> Result<Option<String>, StorageError> {
983    let sql = format!("SELECT CAST(MAX({column}) AS VARCHAR) FROM {table}");
984    let mut statement =
985        connection
986            .prepare(&sql)
987            .map_err(|error| StorageError::WriteFailedWithContext {
988                message: error.to_string(),
989            })?;
990    statement.query_row([], |row| row.get(0)).map_err(|error| {
991        StorageError::WriteFailedWithContext {
992            message: error.to_string(),
993        }
994    })
995}
996
997fn query_top_symbols(connection: &Connection, table: &str) -> Result<Vec<String>, StorageError> {
998    let sql = format!(
999        "SELECT symbol, COUNT(*) AS row_count FROM {table} GROUP BY symbol ORDER BY row_count DESC, symbol ASC LIMIT 5"
1000    );
1001    let mut statement =
1002        connection
1003            .prepare(&sql)
1004            .map_err(|error| StorageError::WriteFailedWithContext {
1005                message: error.to_string(),
1006            })?;
1007    let mut rows = statement
1008        .query([])
1009        .map_err(|error| StorageError::WriteFailedWithContext {
1010            message: error.to_string(),
1011        })?;
1012    let mut result = Vec::new();
1013    while let Some(row) = rows
1014        .next()
1015        .map_err(|error| StorageError::WriteFailedWithContext {
1016            message: error.to_string(),
1017        })?
1018    {
1019        let symbol: String = row
1020            .get(0)
1021            .map_err(|error| StorageError::WriteFailedWithContext {
1022                message: error.to_string(),
1023            })?;
1024        let row_count: i64 = row
1025            .get(1)
1026            .map_err(|error| StorageError::WriteFailedWithContext {
1027                message: error.to_string(),
1028            })?;
1029        result.push(format!("{symbol}:{row_count}"));
1030    }
1031    Ok(result)
1032}