1use std::collections::{BTreeMap, BTreeSet};
2use std::path::{Path, PathBuf};
3use std::sync::{
4 atomic::{AtomicBool, Ordering},
5 Arc, Mutex,
6};
7use std::thread::JoinHandle;
8use std::time::Duration;
9
10use chrono::{DateTime, TimeZone, Utc};
11use duckdb::{params, Connection};
12use futures_util::StreamExt;
13use serde::Deserialize;
14use tokio_tungstenite::connect_async;
15use tokio_tungstenite::tungstenite::Message;
16
17use crate::app::bootstrap::BinanceMode;
18use crate::dataset::query::{backtest_summary_for_path, metrics_for_path};
19use crate::dataset::schema::init_schema_for_path;
20use crate::dataset::types::{BacktestDatasetSummary, RecorderMetrics};
21use crate::error::storage_error::StorageError;
22use crate::record::coordination::RecorderCoordination;
23
/// Lifecycle state of a per-mode market-data recorder.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RecorderState {
    /// A background worker is (or should be) actively recording.
    Running,
    /// No recording in progress for this mode.
    Stopped,
}
29
30impl RecorderState {
31 pub fn as_str(self) -> &'static str {
32 match self {
33 Self::Running => "running",
34 Self::Stopped => "stopped",
35 }
36 }
37
38 pub fn is_running(self) -> bool {
39 self == Self::Running
40 }
41}
42
/// Point-in-time view of one mode's recorder: lifecycle, symbol sets,
/// worker liveness, and aggregated metrics.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RecorderStatus {
    pub mode: BinanceMode,
    pub state: RecorderState,
    // DuckDB file this mode records into.
    pub db_path: PathBuf,
    // Set when the status was created in the Running state; None otherwise.
    pub started_at: Option<DateTime<Utc>>,
    pub updated_at: DateTime<Utc>,
    // Symbols configured explicitly by the operator (normalized, sorted).
    pub manual_symbols: Vec<String>,
    // Symbols requested by strategies (normalized, sorted).
    pub strategy_symbols: Vec<String>,
    // Union of manual and strategy symbols actually subscribed to.
    pub watched_symbols: Vec<String>,
    // Whether the worker thread had not finished at observation time.
    pub worker_alive: bool,
    // Seconds since the worker last touched its heartbeat snapshot.
    pub heartbeat_age_sec: i64,
    pub last_error: Option<String>,
    pub metrics: RecorderMetrics,
}
58
59impl RecorderStatus {
60 fn new(
61 mode: BinanceMode,
62 state: RecorderState,
63 db_path: PathBuf,
64 manual_symbols: Vec<String>,
65 strategy_symbols: Vec<String>,
66 watched_symbols: Vec<String>,
67 ) -> Self {
68 let now = Utc::now();
69 Self {
70 mode,
71 state,
72 db_path,
73 started_at: if state == RecorderState::Running {
74 Some(now)
75 } else {
76 None
77 },
78 updated_at: now,
79 manual_symbols,
80 strategy_symbols,
81 watched_symbols,
82 worker_alive: true,
83 heartbeat_age_sec: 0,
84 last_error: None,
85 metrics: RecorderMetrics::default(),
86 }
87 }
88}
89
/// Heartbeat + metrics record shared (via `Arc<Mutex<_>>`) between a worker
/// thread and the owning `MarketDataRecorder`.
#[derive(Debug, Clone)]
struct WorkerSnapshot {
    // Last time the worker touched this snapshot; drives heartbeat_age_sec.
    updated_at: DateTime<Utc>,
    metrics: RecorderMetrics,
    // Most recent worker error; cleared on the next successful event.
    last_error: Option<String>,
}
96
97impl WorkerSnapshot {
98 fn new(metrics: RecorderMetrics) -> Self {
99 Self {
100 updated_at: Utc::now(),
101 metrics,
102 last_error: None,
103 }
104 }
105}
106
/// Handle to one mode's background recording thread.
struct ModeWorker {
    // Cooperative shutdown: the worker polls this flag and exits when set.
    stop_flag: Arc<AtomicBool>,
    // Shared heartbeat/metrics state written by the worker thread.
    snapshot: Arc<Mutex<WorkerSnapshot>>,
    // Thread handle; used for liveness checks (is_finished), never joined.
    pub(crate) handle: JoinHandle<()>,
}
112
/// Owner of per-mode market-data recording workers and their statuses.
pub struct MarketDataRecorder {
    // Root directory under which per-mode DuckDB files live.
    base_dir: PathBuf,
    // When false, start() does bookkeeping only and spawns no worker thread.
    network_enabled: bool,
    statuses: BTreeMap<BinanceMode, RecorderStatus>,
    workers: BTreeMap<BinanceMode, ModeWorker>,
}
119
120impl std::fmt::Debug for MarketDataRecorder {
121 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
122 f.debug_struct("MarketDataRecorder")
123 .field("base_dir", &self.base_dir)
124 .field("network_enabled", &self.network_enabled)
125 .field("statuses", &self.statuses)
126 .finish()
127 }
128}
129
impl Default for MarketDataRecorder {
    /// Defaults to storing recorder databases under the relative `var`
    /// directory, with networking enabled.
    fn default() -> Self {
        Self::new("var")
    }
}
135
impl MarketDataRecorder {
    /// Create a recorder rooted at `base_dir`; per-mode DuckDB paths are
    /// resolved through [`RecorderCoordination`].
    pub fn new(base_dir: impl Into<PathBuf>) -> Self {
        Self {
            base_dir: base_dir.into(),
            network_enabled: true,
            statuses: BTreeMap::new(),
            workers: BTreeMap::new(),
        }
    }

    /// Builder-style switch that disables worker threads: `start` still
    /// initializes the schema and tracks status, but nothing connects to
    /// the network (useful for tests).
    pub fn without_network(mut self) -> Self {
        self.network_enabled = false;
        self
    }

    /// Start recording for `mode`: initialize the DuckDB schema, normalize
    /// and merge the symbol lists, seed metrics from whatever is already on
    /// disk, and — when networking is enabled — spawn the worker thread.
    ///
    /// # Errors
    /// `RecorderAlreadyRunning` if this mode is already running; otherwise
    /// schema-initialization or thread-spawn failures.
    pub fn start(
        &mut self,
        mode: BinanceMode,
        manual_symbols: Vec<String>,
        strategy_symbols: Vec<String>,
    ) -> Result<RecorderStatus, StorageError> {
        // Reject a second start for a mode that is still running.
        if self
            .statuses
            .get(&mode)
            .is_some_and(|status| status.state == RecorderState::Running)
        {
            return Err(StorageError::RecorderAlreadyRunning {
                mode: mode.as_str().to_string(),
            });
        }

        let db_path = self.db_path(mode);
        init_schema_for_path(&db_path)?;
        let manual_symbols = normalize_symbols(manual_symbols);
        let strategy_symbols = normalize_symbols(strategy_symbols);
        let watched_symbols = merge_symbol_sets(manual_symbols.clone(), strategy_symbols.clone());
        let status = RecorderStatus::new(
            mode,
            RecorderState::Running,
            db_path.clone(),
            manual_symbols,
            strategy_symbols,
            watched_symbols.clone(),
        );
        // Carry over metrics persisted by earlier runs; best-effort.
        let initial_metrics = self.load_metrics(&db_path).unwrap_or_default();
        // Rebind mutably so the seeded metrics can be attached.
        let mut status = status;
        status.metrics = initial_metrics.clone();
        if self.network_enabled {
            self.spawn_worker(mode, db_path, watched_symbols, initial_metrics)?;
        }
        self.statuses.insert(mode, status.clone());
        Ok(status)
    }

    /// Current status for `mode`. Falls back to a synthetic "stopped" status
    /// if the mode was never started. With a live worker, liveness/metrics
    /// come from the shared snapshot; otherwise metrics are re-read from the
    /// database on disk (best-effort, defaulting on failure).
    pub fn status(&self, mode: BinanceMode) -> RecorderStatus {
        let mut status = self.statuses.get(&mode).cloned().unwrap_or_else(|| {
            RecorderStatus::new(
                mode,
                RecorderState::Stopped,
                self.db_path(mode),
                Vec::new(),
                Vec::new(),
                Vec::new(),
            )
        });
        if let Some(worker) = self.workers.get(&mode) {
            let worker_alive = !worker.handle.is_finished();
            status.worker_alive = worker_alive;
            // Skips the refresh silently if the snapshot mutex is poisoned.
            if let Ok(snapshot) = worker.snapshot.lock() {
                status.updated_at = snapshot.updated_at;
                status.heartbeat_age_sec = (Utc::now() - snapshot.updated_at).num_seconds();
                status.metrics = snapshot.metrics.clone();
                status.last_error = snapshot.last_error.clone();
            }
        } else {
            status.worker_alive = false;
            status.heartbeat_age_sec = (Utc::now() - status.updated_at).num_seconds();
            status.metrics = self.load_metrics(&status.db_path).unwrap_or_default();
        }
        status
    }

    /// Replace the strategy-driven symbol set. No-op when the mode is
    /// unknown or the set is unchanged; restarts the worker (so the stream
    /// subscription matches) only when running with networking enabled.
    pub fn update_strategy_symbols(
        &mut self,
        mode: BinanceMode,
        strategy_symbols: Vec<String>,
    ) -> Result<(), StorageError> {
        let strategy_symbols = normalize_symbols(strategy_symbols);
        let Some(status) = self.statuses.get_mut(&mode) else {
            return Ok(());
        };
        if status.strategy_symbols == strategy_symbols {
            return Ok(());
        }
        status.strategy_symbols = strategy_symbols.clone();
        status.watched_symbols =
            merge_symbol_sets(status.manual_symbols.clone(), strategy_symbols.clone());
        status.updated_at = Utc::now();
        let watched_symbols = status.watched_symbols.clone();
        let should_restart = status.state == RecorderState::Running && self.network_enabled;

        if !should_restart {
            return Ok(());
        }

        self.restart_worker(mode, watched_symbols)
    }

    /// Replace the operator-configured symbol set. Mirrors
    /// `update_strategy_symbols` for the manual half of the union.
    pub fn update_manual_symbols(
        &mut self,
        mode: BinanceMode,
        manual_symbols: Vec<String>,
    ) -> Result<(), StorageError> {
        let manual_symbols = normalize_symbols(manual_symbols);
        let Some(status) = self.statuses.get_mut(&mode) else {
            return Ok(());
        };
        if status.manual_symbols == manual_symbols {
            return Ok(());
        }
        status.manual_symbols = manual_symbols.clone();
        status.watched_symbols =
            merge_symbol_sets(manual_symbols.clone(), status.strategy_symbols.clone());
        status.updated_at = Utc::now();
        let watched_symbols = status.watched_symbols.clone();
        let should_restart = status.state == RecorderState::Running && self.network_enabled;

        if !should_restart {
            return Ok(());
        }

        self.restart_worker(mode, watched_symbols)
    }

    /// Stop recording for `mode`, folding the worker's final snapshot into
    /// the stored status. The thread is only signalled, never joined: it
    /// winds down on its own once it observes the stop flag.
    ///
    /// # Errors
    /// `RecorderNotRunning` when the mode is unknown or already stopped.
    pub fn stop(&mut self, mode: BinanceMode) -> Result<RecorderStatus, StorageError> {
        let Some(existing) = self.statuses.get_mut(&mode) else {
            return Err(StorageError::RecorderNotRunning {
                mode: mode.as_str().to_string(),
            });
        };
        if existing.state != RecorderState::Running {
            return Err(StorageError::RecorderNotRunning {
                mode: mode.as_str().to_string(),
            });
        }

        if let Some(worker) = self.workers.remove(&mode) {
            // Read the snapshot before raising the flag so the final
            // metrics/error are captured.
            if let Ok(snapshot) = worker.snapshot.lock() {
                existing.updated_at = snapshot.updated_at;
                existing.metrics = snapshot.metrics.clone();
                existing.last_error = snapshot.last_error.clone();
            }
            worker.stop_flag.store(true, Ordering::Relaxed);
        }

        existing.state = RecorderState::Stopped;
        existing.updated_at = Utc::now();
        existing.worker_alive = false;
        Ok(existing.clone())
    }

    /// Summarize recorded data for a backtest over `[from, to]` on `symbol`,
    /// reading this instance's database for `mode`.
    pub fn backtest_dataset_summary(
        &self,
        mode: BinanceMode,
        symbol: &str,
        from: chrono::NaiveDate,
        to: chrono::NaiveDate,
    ) -> Result<BacktestDatasetSummary, StorageError> {
        backtest_summary_for_path(&self.db_path(mode), mode, symbol, from, to)
    }

    /// Whether a worker thread exists for `mode` and has not yet finished.
    pub fn worker_alive(&self, mode: BinanceMode) -> bool {
        self.workers
            .get(&mode)
            .is_some_and(|worker| !worker.handle.is_finished())
    }

    /// Path-based metrics read, exposed for callers without an instance.
    pub fn metrics_for_path(db_path: &Path) -> Result<RecorderMetrics, StorageError> {
        metrics_for_path(db_path)
    }

    /// Path-based backtest summary, exposed for callers without an instance.
    pub fn backtest_summary_for_path(
        db_path: &Path,
        mode: BinanceMode,
        symbol: &str,
        from: chrono::NaiveDate,
        to: chrono::NaiveDate,
    ) -> Result<BacktestDatasetSummary, StorageError> {
        backtest_summary_for_path(db_path, mode, symbol, from, to)
    }

    /// Path-based schema initialization, exposed for callers without an
    /// instance.
    pub fn init_schema_for_path(db_path: &Path) -> Result<(), StorageError> {
        init_schema_for_path(db_path)
    }

    /// Tear down the current worker (if any) and spawn a fresh one with the
    /// new subscription list, carrying the latest metrics across.
    // NOTE(review): the old thread is only signalled, not joined, so it may
    // briefly overlap with the new worker on the same DuckDB file — confirm
    // DuckDB tolerates the second connection.
    fn restart_worker(
        &mut self,
        mode: BinanceMode,
        watched_symbols: Vec<String>,
    ) -> Result<(), StorageError> {
        let initial_metrics = self.status(mode).metrics.clone();
        if let Some(worker) = self.workers.remove(&mode) {
            worker.stop_flag.store(true, Ordering::Relaxed);
        }
        let db_path = self.db_path(mode);
        self.spawn_worker(mode, db_path, watched_symbols, initial_metrics)
    }

    /// Spawn the background thread for `mode`: install the rustls provider,
    /// build a current-thread tokio runtime, and drive
    /// `run_market_data_worker` until the stop flag is raised.
    fn spawn_worker(
        &mut self,
        mode: BinanceMode,
        db_path: PathBuf,
        watched_symbols: Vec<String>,
        initial_metrics: RecorderMetrics,
    ) -> Result<(), StorageError> {
        let stop_flag = Arc::new(AtomicBool::new(false));
        let worker_stop_flag = stop_flag.clone();
        let snapshot = Arc::new(Mutex::new(WorkerSnapshot::new(initial_metrics)));
        let worker_snapshot = snapshot.clone();
        let handle = std::thread::Builder::new()
            .name(format!("market-recorder-{}", mode.as_str()))
            .spawn(move || {
                // Install may fail if another thread already installed a
                // provider; that is harmless, so the result is ignored.
                let _ = rustls::crypto::ring::default_provider().install_default();
                let runtime = tokio::runtime::Builder::new_current_thread()
                    .enable_all()
                    .build();
                let Ok(runtime) = runtime else {
                    record_worker_error(
                        &worker_snapshot,
                        "failed to initialize tokio runtime".to_string(),
                    );
                    return;
                };
                runtime.block_on(async move {
                    run_market_data_worker(
                        mode,
                        db_path,
                        watched_symbols,
                        worker_stop_flag,
                        worker_snapshot,
                    )
                    .await;
                });
            })
            .map_err(|error| StorageError::WriteFailedWithContext {
                message: error.to_string(),
            })?;
        self.workers.insert(
            mode,
            ModeWorker {
                stop_flag,
                snapshot,
                handle,
            },
        );
        Ok(())
    }

    /// Read aggregate metrics from the database file.
    fn load_metrics(&self, db_path: &Path) -> Result<RecorderMetrics, StorageError> {
        metrics_for_path(db_path)
    }

    /// Resolve the DuckDB file for `mode` under `base_dir`.
    fn db_path(&self, mode: BinanceMode) -> PathBuf {
        RecorderCoordination::new(self.base_dir.clone()).db_path(mode)
    }
}
402
403impl Drop for MarketDataRecorder {
404 fn drop(&mut self) {
405 for worker in self.workers.values() {
406 worker.stop_flag.store(true, Ordering::Relaxed);
407 }
408 }
409}
410
/// Async loop for one recorder mode, run to completion on the worker's
/// dedicated runtime. Repeatedly (re)connects the forced-liquidation stream
/// and, when symbols are watched, the combined bookTicker/aggTrade stream,
/// persisting every event into DuckDB and mirroring progress into the shared
/// snapshot. Exits when `stop_flag` is set or the database cannot be opened.
async fn run_market_data_worker(
    mode: BinanceMode,
    db_path: PathBuf,
    watched_symbols: Vec<String>,
    stop_flag: Arc<AtomicBool>,
    snapshot: Arc<Mutex<WorkerSnapshot>>,
) {
    let Ok(connection) = Connection::open(&db_path) else {
        record_worker_error(
            &snapshot,
            format!("failed to open duckdb at {}", db_path.display()),
        );
        return;
    };
    // Per-symbol second of the last aggTrade, used to count derived 1s bars.
    let mut agg_trade_bar_seconds = BTreeMap::new();

    // Outer loop: one iteration per (re)connection attempt.
    loop {
        touch_worker_snapshot(&snapshot);
        if stop_flag.load(Ordering::Relaxed) {
            break;
        }

        let force_order_url = format!("{}/ws/!forceOrder@arr", market_stream_base_url(mode));
        let symbol_stream_url = combined_symbol_stream_url(mode, &watched_symbols);

        // forceOrder is mandatory: on failure, back off and retry.
        let force_stream = connect_async(force_order_url).await;
        let mut force_stream = match force_stream {
            Ok((stream, _)) => stream,
            Err(error) => {
                record_worker_error(&snapshot, format!("forceOrder connect failed: {error}"));
                eprintln!(
                    "market recorder: failed to connect forceOrder stream mode={} error={}",
                    mode.as_str(),
                    error
                );
                tokio::time::sleep(Duration::from_secs(2)).await;
                continue;
            }
        };

        // The symbol stream is optional: absent when no symbols are watched,
        // and a connect failure degrades to forceOrder-only recording.
        let mut symbol_stream = match symbol_stream_url {
            Some(url) => match connect_async(url).await {
                Ok((stream, _)) => Some(stream),
                Err(error) => {
                    record_worker_error(
                        &snapshot,
                        format!("symbol stream connect failed: {error}"),
                    );
                    eprintln!(
                        "market recorder: failed to connect symbol streams mode={} symbols={} error={}",
                        mode.as_str(),
                        watched_symbols.join(","),
                        error
                    );
                    None
                }
            },
            None => None,
        };

        // NOTE(review): sequence counters restart at 0 on every reconnect —
        // confirm the *_id columns are not expected to be globally unique.
        let mut liquidation_seq = 0i64;
        let mut ticker_seq = 0i64;
        let mut trade_seq = 0i64;

        // Inner loop: pump messages until an error/EOF forces a reconnect
        // (break) or a stop is requested (return).
        loop {
            if stop_flag.load(Ordering::Relaxed) {
                return;
            }

            tokio::select! {
                message = force_stream.next() => {
                    match message {
                        Some(Ok(message)) => {
                            if let Err(error) = handle_force_order_message(&connection, mode, &mut liquidation_seq, message) {
                                record_worker_error(&snapshot, error.to_string());
                                eprintln!(
                                    "market recorder: forceOrder stream handling failed mode={} error={}",
                                    mode.as_str(),
                                    error
                                );
                                break;
                            }
                            record_force_order_event(&snapshot, &connection);
                        }
                        Some(Err(error)) => {
                            record_worker_error(&snapshot, format!("forceOrder stream disconnected: {error}"));
                            eprintln!(
                                "market recorder: forceOrder stream disconnected mode={} error={}",
                                mode.as_str(),
                                error
                            );
                            break
                        }
                        None => {
                            record_worker_error(&snapshot, "forceOrder stream disconnected: eof".to_string());
                            eprintln!(
                                "market recorder: forceOrder stream disconnected mode={} error=eof",
                                mode.as_str()
                            );
                            break
                        }
                    }
                }
                // Only polled while a symbol stream is actually connected.
                message = next_symbol_message(&mut symbol_stream), if symbol_stream.is_some() => {
                    match message {
                        Some(Ok(message)) => {
                            if let Err(error) = handle_symbol_message(
                                &connection,
                                mode,
                                &mut ticker_seq,
                                &mut trade_seq,
                                &mut agg_trade_bar_seconds,
                                &snapshot,
                                message,
                            ) {
                                record_worker_error(&snapshot, error.to_string());
                                eprintln!(
                                    "market recorder: symbol stream handling failed mode={} error={}",
                                    mode.as_str(),
                                    error
                                );
                                break;
                            }
                        }
                        Some(Err(error)) => {
                            record_worker_error(&snapshot, format!("symbol stream disconnected: {error}"));
                            eprintln!(
                                "market recorder: symbol stream disconnected mode={} symbols={} error={}",
                                mode.as_str(),
                                watched_symbols.join(","),
                                error
                            );
                            break
                        }
                        None => {
                            record_worker_error(&snapshot, "symbol stream disconnected: eof".to_string());
                            eprintln!(
                                "market recorder: symbol stream disconnected mode={} symbols={} error=eof",
                                mode.as_str(),
                                watched_symbols.join(",")
                            );
                            break
                        }
                    }
                }
                // Idle tick: keeps the heartbeat fresh and the stop flag
                // checked even when no market messages arrive.
                _ = tokio::time::sleep(Duration::from_millis(250)) => {
                    touch_worker_snapshot(&snapshot);
                }
            }
        }

        // Brief backoff before reconnecting both streams.
        tokio::time::sleep(Duration::from_secs(1)).await;
    }
}
565
566async fn next_symbol_message(
567 stream: &mut Option<
568 tokio_tungstenite::WebSocketStream<
569 tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>,
570 >,
571 >,
572) -> Option<Result<Message, tokio_tungstenite::tungstenite::Error>> {
573 match stream {
574 Some(stream) => stream.next().await,
575 None => None,
576 }
577}
578
579fn handle_force_order_message(
580 connection: &Connection,
581 mode: BinanceMode,
582 sequence: &mut i64,
583 message: Message,
584) -> Result<(), StorageError> {
585 let payload = match message {
586 Message::Text(text) => text,
587 Message::Ping(_) | Message::Pong(_) | Message::Binary(_) => return Ok(()),
588 Message::Close(_) => {
589 return Err(StorageError::WriteFailedWithContext {
590 message: "forceOrder stream closed".to_string(),
591 })
592 }
593 Message::Frame(_) => return Ok(()),
594 };
595 let parsed: ForceOrderEnvelope =
596 serde_json::from_str(&payload).map_err(|error| StorageError::WriteFailedWithContext {
597 message: format!("forceOrder parse failed: {error}; payload={payload}"),
598 })?;
599 let Some(order) = parsed.order else {
600 return Ok(());
601 };
602 *sequence += 1;
603 let receive_time_ms = Utc::now().timestamp_millis();
604 connection
605 .execute(
606 "INSERT INTO raw_liquidation_events (
607 event_id, mode, symbol, event_time, receive_time, force_side, price, qty, notional, raw_payload
608 ) VALUES (
609 ?, ?, ?, to_timestamp(? / 1000.0), to_timestamp(? / 1000.0), ?, ?, ?, ?, ?
610 )",
611 params![
612 *sequence,
613 mode.as_str(),
614 order.symbol,
615 parsed.event_time,
616 receive_time_ms,
617 order.side,
618 order.price,
619 order.qty,
620 order.price * order.qty,
621 payload,
622 ],
623 )
624 .map_err(|error| StorageError::WriteFailedWithContext {
625 message: error.to_string(),
626 })?;
627 Ok(())
628}
629
/// Persist one combined-stream frame: `bookTicker` events go to
/// `raw_book_ticker`, `aggTrade` events to `raw_agg_trades`; any other event
/// type is ignored. Control frames are skipped and a close frame is surfaced
/// as an error so the caller reconnects. Fields missing from the payload
/// cause the event to be silently dropped rather than inserted half-filled.
fn handle_symbol_message(
    connection: &Connection,
    mode: BinanceMode,
    ticker_sequence: &mut i64,
    trade_sequence: &mut i64,
    agg_trade_bar_seconds: &mut BTreeMap<String, i64>,
    snapshot: &Arc<Mutex<WorkerSnapshot>>,
    message: Message,
) -> Result<(), StorageError> {
    let payload = match message {
        Message::Text(text) => text,
        Message::Ping(_) | Message::Pong(_) | Message::Binary(_) => return Ok(()),
        Message::Close(_) => {
            return Err(StorageError::WriteFailedWithContext {
                message: "symbol stream closed".to_string(),
            })
        }
        Message::Frame(_) => return Ok(()),
    };
    let parsed: CombinedStreamEnvelope =
        serde_json::from_str(&payload).map_err(|error| StorageError::WriteFailedWithContext {
            message: format!("symbol stream parse failed: {error}; payload={payload}"),
        })?;
    let receive_time_ms = Utc::now().timestamp_millis();

    if parsed.data.event_type == "bookTicker" {
        // All of symbol/event_time/bid/bid_qty/ask/ask_qty are required;
        // bail out quietly when any is absent.
        let Some(symbol) = parsed.data.symbol else {
            return Ok(());
        };
        let Some(event_time) = parsed.data.event_time else {
            return Ok(());
        };
        let Some(bid) = parsed.data.bid else {
            return Ok(());
        };
        let Some(bid_qty) = parsed.data.bid_qty else {
            return Ok(());
        };
        let Some(ask) = parsed.data.ask else {
            return Ok(());
        };
        let Some(ask_qty) = parsed.data.ask_qty else {
            return Ok(());
        };
        *ticker_sequence += 1;
        connection
            .execute(
                "INSERT INTO raw_book_ticker (
                tick_id, mode, symbol, event_time, receive_time, bid, bid_qty, ask, ask_qty
            ) VALUES (
                ?, ?, ?, to_timestamp(? / 1000.0), to_timestamp(? / 1000.0), ?, ?, ?, ?
            )",
                params![
                    *ticker_sequence,
                    mode.as_str(),
                    symbol,
                    event_time,
                    receive_time_ms,
                    bid,
                    bid_qty,
                    ask,
                    ask_qty,
                ],
            )
            .map_err(|error| StorageError::WriteFailedWithContext {
                message: error.to_string(),
            })?;
        record_book_ticker_event(snapshot, &symbol, event_time);
    } else if parsed.data.event_type == "aggTrade" {
        // Required fields for a trade row; drop the event when incomplete.
        let Some(symbol) = parsed.data.symbol else {
            return Ok(());
        };
        let Some(event_time) = parsed.data.event_time else {
            return Ok(());
        };
        let Some(price) = parsed.data.price else {
            return Ok(());
        };
        let Some(qty) = parsed.data.qty else {
            return Ok(());
        };
        let Some(is_buyer_maker) = parsed.data.is_buyer_maker else {
            return Ok(());
        };
        *trade_sequence += 1;
        connection
            .execute(
                "INSERT INTO raw_agg_trades (
                trade_id, mode, symbol, event_time, receive_time, price, qty, is_buyer_maker
            ) VALUES (
                ?, ?, ?, to_timestamp(? / 1000.0), to_timestamp(? / 1000.0), ?, ?, ?
            )",
                params![
                    *trade_sequence,
                    mode.as_str(),
                    symbol,
                    event_time,
                    receive_time_ms,
                    price,
                    qty,
                    is_buyer_maker,
                ],
            )
            .map_err(|error| StorageError::WriteFailedWithContext {
                message: error.to_string(),
            })?;
        record_agg_trade_event(snapshot, &symbol, event_time, agg_trade_bar_seconds);
    }

    Ok(())
}
741
/// Base websocket endpoint for the futures market streams.
///
/// `mode` is currently unused — every mode records from the same production
/// host — but it stays in the signature so a per-mode endpoint can be added
/// later without touching call sites.
fn market_stream_base_url(mode: BinanceMode) -> &'static str {
    let _ = mode;
    "wss://fstream.binance.com"
}
746
747fn combined_symbol_stream_url(mode: BinanceMode, watched_symbols: &[String]) -> Option<String> {
748 if watched_symbols.is_empty() {
749 return None;
750 }
751
752 let streams = watched_symbols
753 .iter()
754 .flat_map(|symbol| {
755 let lower = symbol.to_ascii_lowercase();
756 [format!("{lower}@bookTicker"), format!("{lower}@aggTrade")]
757 })
758 .collect::<Vec<_>>()
759 .join("/");
760 Some(format!(
761 "{}/stream?streams={streams}",
762 market_stream_base_url(mode)
763 ))
764}
765
/// Normalize a list of trading symbols: trim whitespace, upper-case,
/// de-duplicate, and return them in sorted order.
///
/// Entries that are empty after trimming are dropped — previously an empty
/// or whitespace-only input survived as `""`, which would later yield an
/// invalid stream subscription such as `@bookTicker`.
fn normalize_symbols(symbols: Vec<String>) -> Vec<String> {
    symbols
        .into_iter()
        .map(|symbol| symbol.trim().to_ascii_uppercase())
        .filter(|symbol| !symbol.is_empty())
        .collect::<BTreeSet<_>>()
        .into_iter()
        .collect()
}
773
/// Union two symbol lists into one sorted, de-duplicated list.
fn merge_symbol_sets(left: Vec<String>, right: Vec<String>) -> Vec<String> {
    let merged: BTreeSet<String> = left.into_iter().chain(right).collect();
    merged.into_iter().collect()
}
781
/// Top-level payload of a `!forceOrder@arr` frame.
#[derive(Debug, Deserialize)]
struct ForceOrderEnvelope {
    // "E": event time in epoch milliseconds.
    #[serde(rename = "E")]
    event_time: i64,
    // "o": the liquidation order; frames without it are skipped.
    #[serde(rename = "o")]
    order: Option<ForceOrderData>,
}
789
/// Liquidation order object inside a forceOrder frame. Price and quantity
/// arrive as numeric strings and are parsed to `f64`.
#[derive(Debug, Deserialize)]
struct ForceOrderData {
    #[serde(rename = "s")]
    symbol: String,
    // "S": order side, e.g. BUY/SELL as sent by the stream.
    #[serde(rename = "S")]
    side: String,
    #[serde(rename = "p", deserialize_with = "deserialize_string_number")]
    price: f64,
    #[serde(rename = "q", deserialize_with = "deserialize_string_number")]
    qty: f64,
}
801
/// Wrapper around combined-stream frames: `{ "stream": ..., "data": ... }`.
/// Only the `data` member is consumed here.
#[derive(Debug, Deserialize)]
struct CombinedStreamEnvelope {
    data: CombinedStreamData,
}
806
/// Union of the fields used from `bookTicker` and `aggTrade` events. All
/// per-event fields are optional because either event type populates only
/// its own subset; numeric fields may arrive as strings or numbers.
#[derive(Debug, Deserialize)]
struct CombinedStreamData {
    // "e": event type discriminator ("bookTicker" or "aggTrade").
    #[serde(rename = "e")]
    event_type: String,
    // "E": event time in epoch milliseconds.
    #[serde(rename = "E")]
    event_time: Option<i64>,
    #[serde(rename = "s")]
    symbol: Option<String>,
    // bookTicker: best bid price ("b").
    #[serde(
        rename = "b",
        default,
        deserialize_with = "deserialize_optional_string_number"
    )]
    bid: Option<f64>,
    // bookTicker: best bid quantity ("B").
    #[serde(
        rename = "B",
        default,
        deserialize_with = "deserialize_optional_string_number"
    )]
    bid_qty: Option<f64>,
    // bookTicker: best ask price ("a").
    #[serde(
        rename = "a",
        default,
        deserialize_with = "deserialize_optional_string_number"
    )]
    ask: Option<f64>,
    // bookTicker: best ask quantity ("A").
    #[serde(
        rename = "A",
        default,
        deserialize_with = "deserialize_optional_string_number"
    )]
    ask_qty: Option<f64>,
    // aggTrade: trade price ("p").
    #[serde(
        rename = "p",
        default,
        deserialize_with = "deserialize_optional_string_number"
    )]
    price: Option<f64>,
    // aggTrade: trade quantity ("q").
    #[serde(
        rename = "q",
        default,
        deserialize_with = "deserialize_optional_string_number"
    )]
    qty: Option<f64>,
    // aggTrade: whether the buyer was the maker ("m").
    #[serde(rename = "m")]
    is_buyer_maker: Option<bool>,
}
854
855fn deserialize_string_number<'de, D>(deserializer: D) -> Result<f64, D::Error>
856where
857 D: serde::Deserializer<'de>,
858{
859 let value = String::deserialize(deserializer)?;
860 value.parse::<f64>().map_err(serde::de::Error::custom)
861}
862
863fn deserialize_optional_string_number<'de, D>(deserializer: D) -> Result<Option<f64>, D::Error>
864where
865 D: serde::Deserializer<'de>,
866{
867 let value = Option::<serde_json::Value>::deserialize(deserializer)?;
868 match value {
869 None | Some(serde_json::Value::Null) => Ok(None),
870 Some(serde_json::Value::String(value)) => value
871 .parse::<f64>()
872 .map(Some)
873 .map_err(serde::de::Error::custom),
874 Some(serde_json::Value::Number(value)) => value
875 .as_f64()
876 .ok_or_else(|| serde::de::Error::custom("invalid numeric value"))
877 .map(Some),
878 Some(other) => Err(serde::de::Error::custom(format!(
879 "expected string or number, got {other}"
880 ))),
881 }
882}
883
884fn touch_worker_snapshot(snapshot: &Arc<Mutex<WorkerSnapshot>>) {
885 if let Ok(mut snapshot) = snapshot.lock() {
886 snapshot.updated_at = Utc::now();
887 }
888}
889
890fn record_worker_error(snapshot: &Arc<Mutex<WorkerSnapshot>>, error: String) {
891 if let Ok(mut snapshot) = snapshot.lock() {
892 snapshot.updated_at = Utc::now();
893 snapshot.last_error = Some(error);
894 }
895}
896
897fn record_force_order_event(snapshot: &Arc<Mutex<WorkerSnapshot>>, connection: &Connection) {
898 if let Ok(mut snapshot) = snapshot.lock() {
899 snapshot.updated_at = Utc::now();
900 snapshot.last_error = None;
901 snapshot.metrics.liquidation_events += 1;
902 snapshot.metrics.last_liquidation_event_time =
903 query_latest_timestamp(connection, "raw_liquidation_events", "event_time")
904 .ok()
905 .flatten();
906 snapshot.metrics.top_liquidation_symbols =
907 query_top_symbols(connection, "raw_liquidation_events").unwrap_or_default();
908 }
909}
910
911fn record_book_ticker_event(
912 snapshot: &Arc<Mutex<WorkerSnapshot>>,
913 symbol: &str,
914 event_time_ms: i64,
915) {
916 if let Ok(mut snapshot) = snapshot.lock() {
917 snapshot.updated_at = Utc::now();
918 snapshot.last_error = None;
919 snapshot.metrics.book_ticker_events += 1;
920 snapshot.metrics.last_book_ticker_event_time = timestamp_string(event_time_ms);
921 increment_top_symbol(&mut snapshot.metrics.top_book_ticker_symbols, symbol);
922 }
923}
924
925fn record_agg_trade_event(
926 snapshot: &Arc<Mutex<WorkerSnapshot>>,
927 symbol: &str,
928 event_time_ms: i64,
929 agg_trade_bar_seconds: &mut BTreeMap<String, i64>,
930) {
931 if let Ok(mut snapshot) = snapshot.lock() {
932 snapshot.updated_at = Utc::now();
933 snapshot.last_error = None;
934 snapshot.metrics.agg_trade_events += 1;
935 snapshot.metrics.last_agg_trade_event_time = timestamp_string(event_time_ms);
936 increment_top_symbol(&mut snapshot.metrics.top_agg_trade_symbols, symbol);
937 let bar_second = event_time_ms / 1_000;
938 let should_increment_bar = agg_trade_bar_seconds
939 .insert(symbol.to_string(), bar_second)
940 .map(|previous| previous != bar_second)
941 .unwrap_or(true);
942 if should_increment_bar {
943 snapshot.metrics.derived_kline_1s_bars += 1;
944 }
945 }
946}
947
/// Bump `symbol`'s count inside a `"SYMBOL:count"`-formatted leaderboard,
/// then re-rank and keep the five largest (count desc, symbol asc on ties).
/// Entries that do not parse as `"SYMBOL:count"` are discarded.
fn increment_top_symbol(top_symbols: &mut Vec<String>, symbol: &str) {
    let mut counts: BTreeMap<String, u64> = BTreeMap::new();
    for entry in top_symbols.iter() {
        if let Some((name, count)) = entry.split_once(':') {
            if let Ok(count) = count.parse::<u64>() {
                counts.insert(name.to_string(), count);
            }
        }
    }
    *counts.entry(symbol.to_string()).or_default() += 1;
    let mut ranked: Vec<(String, u64)> = counts.into_iter().collect();
    ranked.sort_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(&b.0)));
    ranked.truncate(5);
    *top_symbols = ranked
        .into_iter()
        .map(|(name, count)| format!("{name}:{count}"))
        .collect();
}
966
967fn timestamp_string(event_time_ms: i64) -> Option<String> {
968 Utc.timestamp_millis_opt(event_time_ms)
969 .single()
970 .map(|value| {
971 value
972 .naive_utc()
973 .format("%Y-%m-%d %H:%M:%S%.3f")
974 .to_string()
975 })
976}
977
978fn query_latest_timestamp(
979 connection: &Connection,
980 table: &str,
981 column: &str,
982) -> Result<Option<String>, StorageError> {
983 let sql = format!("SELECT CAST(MAX({column}) AS VARCHAR) FROM {table}");
984 let mut statement =
985 connection
986 .prepare(&sql)
987 .map_err(|error| StorageError::WriteFailedWithContext {
988 message: error.to_string(),
989 })?;
990 statement.query_row([], |row| row.get(0)).map_err(|error| {
991 StorageError::WriteFailedWithContext {
992 message: error.to_string(),
993 }
994 })
995}
996
/// Top five symbols by row count in `table`, formatted as `"SYMBOL:count"`,
/// ordered count desc then symbol asc. The table name is interpolated into
/// the SQL, so callers must pass only trusted, internal table names.
fn query_top_symbols(connection: &Connection, table: &str) -> Result<Vec<String>, StorageError> {
    let sql = format!(
        "SELECT symbol, COUNT(*) AS row_count FROM {table} GROUP BY symbol ORDER BY row_count DESC, symbol ASC LIMIT 5"
    );
    let mut statement =
        connection
            .prepare(&sql)
            .map_err(|error| StorageError::WriteFailedWithContext {
                message: error.to_string(),
            })?;
    let mut rows = statement
        .query([])
        .map_err(|error| StorageError::WriteFailedWithContext {
            message: error.to_string(),
        })?;
    // Manual row walk; every driver error is wrapped into a StorageError.
    let mut result = Vec::new();
    while let Some(row) = rows
        .next()
        .map_err(|error| StorageError::WriteFailedWithContext {
            message: error.to_string(),
        })?
    {
        let symbol: String = row
            .get(0)
            .map_err(|error| StorageError::WriteFailedWithContext {
                message: error.to_string(),
            })?;
        let row_count: i64 = row
            .get(1)
            .map_err(|error| StorageError::WriteFailedWithContext {
                message: error.to_string(),
            })?;
        result.push(format!("{symbol}:{row_count}"));
    }
    Ok(result)
}