use std::marker::PhantomData;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};

use anyhow::{anyhow, Context, Result};
use chrono::{NaiveDate, Utc};
use parquet::arrow::AsyncArrowWriter;
use parquet::basic::{Compression, ZstdLevel};
use parquet::file::properties::WriterProperties;
use tokio::fs::{self, File};
use tokio::sync::mpsc::{self, error::TrySendError};
use tokio::task::JoinHandle;
use tokio::time::{interval, MissedTickBehavior};
use tracing::{debug, warn};

use tesser_core::{Candle, Fill, Order, OrderBook, Signal, Tick};

use crate::encoding::{
    candle_schema, candles_to_batch, fill_schema, fills_to_batch, order_book_schema,
    order_books_to_batch, order_schema, orders_to_batch, signal_schema, signals_to_batch,
    tick_schema, ticks_to_batch,
};

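/// Configuration for the Parquet flight recorder. All limits are clamped to
/// at least 1 when the recorder is spawned.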
#[derive(Clone, Debug)]
pub struct RecorderConfig {
    /// Root directory; one subdirectory is created per event kind.
    pub root: PathBuf,
    /// Rows buffered in memory per sink before a flush is forced.
    pub max_buffered_rows: usize,
    /// How often buffered rows are flushed even if the row limit is not hit.
    pub flush_interval: Duration,
    /// Rows written to a single Parquet file before it is rotated.
    pub max_rows_per_file: usize,
    /// Capacity of the bounded channel between handles and the worker.
    pub channel_capacity: usize,
}

impl Default for RecorderConfig {
    fn default() -> Self {
        Self {
            root: PathBuf::from("data/flight_recorder"),
            max_buffered_rows: 1024,
            flush_interval: Duration::from_secs(5),
            max_rows_per_file: 250_000,
            channel_capacity: 4096,
        }
    }
}

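/// Batches market and trading events onto a background task that writes
/// date-partitioned Parquet files under `RecorderConfig::root`.
///
/// A minimal usage sketch (the crate path `tesser_recorder` and the `tick`
/// value are assumptions for illustration, so the example is not compiled):
///
/// ```ignore
/// use tesser_recorder::{ParquetRecorder, RecorderConfig};
///
/// let recorder = ParquetRecorder::spawn(RecorderConfig::default()).await?;
/// let handle = recorder.handle();
/// handle.record_tick(tick); // non-blocking; dropped if the channel is full
/// drop(handle); // every handle must be dropped before shutdown can finish
/// recorder.shutdown().await?; // flushes buffers and closes open files
/// ```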
pub struct ParquetRecorder {
    handle: RecorderHandle,
    task: Option<JoinHandle<Result<()>>>,
}

impl ParquetRecorder {
    /// Create the root directory, spawn the worker task, and return the
    /// recorder together with its first handle.
    pub async fn spawn(config: RecorderConfig) -> Result<Self> {
        fs::create_dir_all(&config.root)
            .await
            .with_context(|| format!("failed to create {}", config.root.display()))?;

        let (tx, rx) = mpsc::channel(config.channel_capacity.max(1));
        let worker = FlightRecorderWorker::new(config, rx).await?;
        let task = tokio::spawn(async move { worker.run().await });
        Ok(Self {
            handle: RecorderHandle { sender: tx },
            task: Some(task),
        })
    }

    /// Clone a new sender handle for producers.
    pub fn handle(&self) -> RecorderHandle {
        self.handle.clone()
    }

    /// Drop the internal handle and wait for the worker to flush and exit.
    /// Any handles still held elsewhere must be dropped first, or this will
    /// wait indefinitely.
    pub async fn shutdown(mut self) -> Result<()> {
        drop(self.handle);
        if let Some(task) = self.task.take() {
            match task.await {
                Ok(result) => result?,
                Err(err) => {
                    return Err(anyhow!("flight recorder task aborted: {err}"));
                }
            }
        }
        Ok(())
    }
}

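/// Cheap, cloneable front end for submitting events to the recorder.
///
/// All `record_*` methods are non-blocking: events are enqueued with
/// `try_send` and dropped (with a one-time warning per kind) when the
/// channel is full.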
#[derive(Clone)]
pub struct RecorderHandle {
    sender: mpsc::Sender<RecorderMessage>,
}

impl RecorderHandle {
    pub fn record_tick(&self, tick: Tick) {
        self.enqueue(RecorderMessage::Tick(tick), "tick", &TICK_SATURATION);
    }

    pub fn record_candle(&self, candle: Candle) {
        self.enqueue(
            RecorderMessage::Candle(candle),
            "candle",
            &CANDLE_SATURATION,
        );
    }

    pub fn record_fill(&self, fill: Fill) {
        self.enqueue(RecorderMessage::Fill(fill), "fill", &FILL_SATURATION);
    }

    pub fn record_order(&self, order: Order) {
        self.enqueue(RecorderMessage::Order(order), "order", &ORDER_SATURATION);
    }

    pub fn record_order_book(&self, book: OrderBook) {
        self.enqueue(
            RecorderMessage::OrderBook(book),
            "order_book",
            &ORDER_BOOK_SATURATION,
        );
    }

    pub fn record_signal(&self, signal: Signal) {
        self.enqueue(
            RecorderMessage::Signal(signal),
            "signal",
            &SIGNAL_SATURATION,
        );
    }

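    /// Shared try-send path. A full channel drops the event and warns once
    /// per kind (via the latching saturation flag); a closed channel means
    /// the worker is gone, so the event is dropped with a warning.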
    fn enqueue(&self, message: RecorderMessage, label: &'static str, flag: &'static AtomicBool) {
        match self.sender.try_send(message) {
            Ok(()) => {}
            Err(TrySendError::Full(_)) => {
                if !flag.swap(true, Ordering::Relaxed) {
                    warn!("flight recorder channel saturated; dropping {label} events");
                }
            }
            Err(TrySendError::Closed(_)) => {
                warn!("flight recorder channel closed; ignoring {label} event");
            }
        }
    }
}

enum RecorderMessage {
    Tick(Tick),
    Candle(Candle),
    Fill(Fill),
    Order(Order),
    OrderBook(OrderBook),
    Signal(Signal),
}

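// Latching "already warned" flags, one per event kind. They are set on the
// first dropped event and never reset, so saturation is reported once per
// process rather than once per dropped event.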
static TICK_SATURATION: AtomicBool = AtomicBool::new(false);
static CANDLE_SATURATION: AtomicBool = AtomicBool::new(false);
static FILL_SATURATION: AtomicBool = AtomicBool::new(false);
static ORDER_SATURATION: AtomicBool = AtomicBool::new(false);
static ORDER_BOOK_SATURATION: AtomicBool = AtomicBool::new(false);
static SIGNAL_SATURATION: AtomicBool = AtomicBool::new(false);

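/// Background task state: drains the channel and routes each event to the
/// sink for its kind, flushing on a timer as well as on size thresholds.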
struct FlightRecorderWorker {
    rx: mpsc::Receiver<RecorderMessage>,
    tick_sink: DataSink<TickEncoder>,
    candle_sink: DataSink<CandleEncoder>,
    fill_sink: DataSink<FillEncoder>,
    order_sink: DataSink<OrderEncoder>,
    book_sink: DataSink<OrderBookEncoder>,
    signal_sink: DataSink<SignalEncoder>,
    flush_interval: Duration,
}

impl FlightRecorderWorker {
    async fn new(config: RecorderConfig, rx: mpsc::Receiver<RecorderMessage>) -> Result<Self> {
        let writer_props = Arc::new(
            WriterProperties::builder()
                .set_compression(Compression::ZSTD(ZstdLevel::default()))
                .build(),
        );

        // All sinks share the same buffering and rotation limits; only the
        // directory and the encoder type differ.
        fn make_sink<E: SinkEncoder>(
            dir: PathBuf,
            config: &RecorderConfig,
            props: Arc<WriterProperties>,
        ) -> DataSink<E> {
            DataSink::new(
                dir,
                config.max_buffered_rows,
                config.max_rows_per_file,
                config.flush_interval,
                props,
            )
        }

        let tick_dir = ensure_subdir(&config.root, TickEncoder::KIND).await?;
        let candle_dir = ensure_subdir(&config.root, CandleEncoder::KIND).await?;
        let fill_dir = ensure_subdir(&config.root, FillEncoder::KIND).await?;
        let order_dir = ensure_subdir(&config.root, OrderEncoder::KIND).await?;
        let book_dir = ensure_subdir(&config.root, OrderBookEncoder::KIND).await?;
        let signal_dir = ensure_subdir(&config.root, SignalEncoder::KIND).await?;

        Ok(Self {
            rx,
            tick_sink: make_sink(tick_dir, &config, writer_props.clone()),
            candle_sink: make_sink(candle_dir, &config, writer_props.clone()),
            fill_sink: make_sink(fill_dir, &config, writer_props.clone()),
            order_sink: make_sink(order_dir, &config, writer_props.clone()),
            book_sink: make_sink(book_dir, &config, writer_props.clone()),
            signal_sink: make_sink(signal_dir, &config, writer_props),
            flush_interval: config.flush_interval,
        })
    }

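    /// Main loop: drain messages as they arrive and let the interval timer
    /// trigger time-based flushes. When all senders drop, the loop exits and
    /// every sink is flushed and closed before the task returns.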
    async fn run(mut self) -> Result<()> {
        let mut timer = interval(self.flush_interval);
        timer.set_missed_tick_behavior(MissedTickBehavior::Delay);

        loop {
            tokio::select! {
                message = self.rx.recv() => {
                    match message {
                        Some(msg) => self.handle_message(msg).await?,
                        None => break,
                    }
                }
                _ = timer.tick() => {
                    self.tick_sink.maybe_flush_due_time().await?;
                    self.candle_sink.maybe_flush_due_time().await?;
                    self.fill_sink.maybe_flush_due_time().await?;
                    self.order_sink.maybe_flush_due_time().await?;
                    self.book_sink.maybe_flush_due_time().await?;
                    self.signal_sink.maybe_flush_due_time().await?;
                }
            }
        }

        self.tick_sink.shutdown().await?;
        self.candle_sink.shutdown().await?;
        self.fill_sink.shutdown().await?;
        self.order_sink.shutdown().await?;
        self.book_sink.shutdown().await?;
        self.signal_sink.shutdown().await?;
        Ok(())
    }

    async fn handle_message(&mut self, msg: RecorderMessage) -> Result<()> {
        match msg {
            RecorderMessage::Tick(tick) => self.tick_sink.push(tick).await?,
            RecorderMessage::Candle(candle) => self.candle_sink.push(candle).await?,
            RecorderMessage::Fill(fill) => self.fill_sink.push(fill).await?,
            RecorderMessage::Order(order) => self.order_sink.push(order).await?,
            RecorderMessage::OrderBook(book) => self.book_sink.push(book).await?,
            RecorderMessage::Signal(signal) => self.signal_sink.push(signal).await?,
        }
        Ok(())
    }
}

async fn ensure_subdir(root: &Path, kind: &str) -> Result<PathBuf> {
    let path = root.join(kind);
    fs::create_dir_all(&path)
        .await
        .with_context(|| format!("failed to create {}", path.display()))?;
    Ok(path)
}

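/// Per-kind encoding hooks used by [`DataSink`]: the Arrow schema, the
/// conversion from buffered records into a `RecordBatch`, and the date used
/// to partition a given record.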
trait SinkEncoder {
    type Record: Send + 'static;
    const KIND: &'static str;

    fn schema() -> Arc<arrow::datatypes::Schema>;
    fn encode(records: &[Self::Record]) -> Result<arrow::record_batch::RecordBatch>;
    fn partition_for(record: &Self::Record) -> NaiveDate;
}

struct TickEncoder;
struct CandleEncoder;
struct FillEncoder;
struct OrderEncoder;
struct OrderBookEncoder;
struct SignalEncoder;

impl SinkEncoder for TickEncoder {
    type Record = Tick;
    const KIND: &'static str = "ticks";

    fn schema() -> Arc<arrow::datatypes::Schema> {
        tick_schema()
    }

    fn encode(records: &[Self::Record]) -> Result<arrow::record_batch::RecordBatch> {
        ticks_to_batch(records)
    }

    fn partition_for(record: &Self::Record) -> NaiveDate {
        record.exchange_timestamp.date_naive()
    }
}

impl SinkEncoder for CandleEncoder {
    type Record = Candle;
    const KIND: &'static str = "candles";

    fn schema() -> Arc<arrow::datatypes::Schema> {
        candle_schema()
    }

    fn encode(records: &[Self::Record]) -> Result<arrow::record_batch::RecordBatch> {
        candles_to_batch(records)
    }

    fn partition_for(record: &Self::Record) -> NaiveDate {
        record.timestamp.date_naive()
    }
}

impl SinkEncoder for FillEncoder {
    type Record = Fill;
    const KIND: &'static str = "fills";

    fn schema() -> Arc<arrow::datatypes::Schema> {
        fill_schema()
    }

    fn encode(records: &[Self::Record]) -> Result<arrow::record_batch::RecordBatch> {
        fills_to_batch(records)
    }

    fn partition_for(record: &Self::Record) -> NaiveDate {
        record.timestamp.date_naive()
    }
}

impl SinkEncoder for OrderEncoder {
    type Record = Order;
    const KIND: &'static str = "orders";

    fn schema() -> Arc<arrow::datatypes::Schema> {
        order_schema()
    }

    fn encode(records: &[Self::Record]) -> Result<arrow::record_batch::RecordBatch> {
        orders_to_batch(records)
    }

    fn partition_for(record: &Self::Record) -> NaiveDate {
        record.updated_at.date_naive()
    }
}

impl SinkEncoder for OrderBookEncoder {
    type Record = OrderBook;
    const KIND: &'static str = "order_books";

    fn schema() -> Arc<arrow::datatypes::Schema> {
        order_book_schema()
    }

    fn encode(records: &[Self::Record]) -> Result<arrow::record_batch::RecordBatch> {
        order_books_to_batch(records)
    }

    fn partition_for(record: &Self::Record) -> NaiveDate {
        record.timestamp.date_naive()
    }
}

impl SinkEncoder for SignalEncoder {
    type Record = Signal;
    const KIND: &'static str = "signals";

    fn schema() -> Arc<arrow::datatypes::Schema> {
        signal_schema()
    }

    fn encode(records: &[Self::Record]) -> Result<arrow::record_batch::RecordBatch> {
        signals_to_batch(records)
    }

    fn partition_for(record: &Self::Record) -> NaiveDate {
        record.generated_at.date_naive()
    }
}

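/// Buffering Parquet writer for one event kind. Rows accumulate in memory
/// and are flushed by row count, by elapsed time, or on shutdown; files are
/// laid out as `<root>/<kind>/<YYYY-MM-DD>/<kind>-<timestamp>-<seq>.parquet`
/// and rotated once they reach `max_rows_per_file` rows.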
struct DataSink<E: SinkEncoder> {
    dir: PathBuf,
    buffer: Vec<E::Record>,
    writer: Option<ActiveWriter>,
    partition: Option<NaiveDate>,
    max_buffered_rows: usize,
    max_rows_per_file: usize,
    flush_interval: Duration,
    last_flush: Instant,
    properties: Arc<WriterProperties>,
    file_seq: u64,
    schema: Arc<arrow::datatypes::Schema>,
    _marker: PhantomData<E>,
}

impl<E: SinkEncoder> DataSink<E> {
    fn new(
        dir: PathBuf,
        max_buffered_rows: usize,
        max_rows_per_file: usize,
        flush_interval: Duration,
        properties: Arc<WriterProperties>,
    ) -> Self {
        Self {
            dir,
            buffer: Vec::with_capacity(max_buffered_rows.max(1)),
            writer: None,
            partition: None,
            max_buffered_rows: max_buffered_rows.max(1),
            max_rows_per_file: max_rows_per_file.max(1),
            flush_interval,
            last_flush: Instant::now(),
            properties,
            file_seq: 0,
            schema: E::schema(),
            _marker: PhantomData,
        }
    }

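    /// Buffer one record. A change of date partition first flushes the
    /// remaining rows into the old partition's file and closes it, so a
    /// file never mixes partitions.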
    async fn push(&mut self, record: E::Record) -> Result<()> {
        let partition = E::partition_for(&record);
        if self.partition != Some(partition) {
            self.flush().await?;
            self.close_writer().await?;
            self.partition = Some(partition);
        }
        self.buffer.push(record);
        if self.buffer.len() >= self.max_buffered_rows {
            self.flush().await?;
        }
        Ok(())
    }

    async fn maybe_flush_due_time(&mut self) -> Result<()> {
        if self.buffer.is_empty() {
            return Ok(());
        }
        if self.last_flush.elapsed() >= self.flush_interval {
            self.flush().await?;
        }
        Ok(())
    }

    async fn flush(&mut self) -> Result<()> {
        if self.buffer.is_empty() {
            return Ok(());
        }
        if self.partition.is_none() {
            if let Some(first) = self.buffer.first() {
                self.partition = Some(E::partition_for(first));
            }
        }
        let partition = self
            .partition
            .ok_or_else(|| anyhow!("buffer partition undefined"))?;

        self.ensure_writer(partition).await?;
        let rows = std::mem::take(&mut self.buffer);
        let row_count = rows.len();
        // Encoding failures are logged and the rows dropped rather than
        // propagated, so one bad batch cannot take down the recorder.
        let batch = match E::encode(&rows) {
            Ok(batch) => batch,
            Err(err) => {
                warn!(
                    kind = E::KIND,
                    error = %err,
                    dropped = row_count,
                    "failed to encode flight recorder batch"
                );
                self.last_flush = Instant::now();
                return Ok(());
            }
        };

        if batch.num_rows() == 0 {
            return Ok(());
        }

        if let Some(writer) = &mut self.writer {
            writer.write(&batch).await?;
            // Rotate once the active file reaches its row budget.
            if writer.rows_written >= self.max_rows_per_file {
                if let Some(writer) = self.writer.take() {
                    writer.finish().await?;
                }
            }
        }
        self.last_flush = Instant::now();
        Ok(())
    }

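    /// Open a writer for `partition` if none is active. Files are named
    /// `{kind}-{UTC timestamp}-{sequence:04}.parquet` inside the partition's
    /// date directory.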
    async fn ensure_writer(&mut self, partition: NaiveDate) -> Result<()> {
        if self.writer.is_some() {
            return Ok(());
        }
        let date_dir = self.dir.join(partition.format("%Y-%m-%d").to_string());
        fs::create_dir_all(&date_dir)
            .await
            .with_context(|| format!("failed to create {}", date_dir.display()))?;
        let timestamp = Utc::now().format("%Y%m%dT%H%M%S");
        let file_name = format!("{}-{}-{:04}.parquet", E::KIND, timestamp, self.file_seq);
        self.file_seq = self.file_seq.wrapping_add(1);
        let path = date_dir.join(file_name);
        let file = File::create(&path)
            .await
            .with_context(|| format!("failed to create {}", path.display()))?;
        let writer = AsyncArrowWriter::try_new(
            file,
            self.schema.clone(),
            Some(self.properties.as_ref().clone()),
        )?;
        debug!(kind = E::KIND, path = %path.display(), "opened flight recorder file");
        self.writer = Some(ActiveWriter {
            writer,
            rows_written: 0,
            path,
        });
        Ok(())
    }

    async fn close_writer(&mut self) -> Result<()> {
        if let Some(writer) = self.writer.take() {
            writer.finish().await?;
        }
        Ok(())
    }

    async fn shutdown(&mut self) -> Result<()> {
        self.flush().await?;
        self.close_writer().await?;
        Ok(())
    }
}

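/// An open Parquet file plus the number of rows written to it so far, used
/// to decide when to rotate.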
struct ActiveWriter {
    writer: AsyncArrowWriter<File>,
    rows_written: usize,
    path: PathBuf,
}

impl ActiveWriter {
    async fn write(&mut self, batch: &arrow::record_batch::RecordBatch) -> Result<()> {
        self.writer.write(batch).await?;
        self.rows_written += batch.num_rows();
        Ok(())
    }

    async fn finish(mut self) -> Result<()> {
        self.writer.finish().await?;
        debug!(path = %self.path.display(), "closed flight recorder file");
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{
        Array, Decimal128Array, Float64Array, StringArray, TimestampNanosecondArray,
    };
    use arrow::datatypes::DataType;
    use arrow::record_batch::RecordBatch;
    use chrono::Duration as ChronoDuration;
    use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
    use rust_decimal::Decimal;
    use tempfile::tempdir;
    use tesser_core::{ExecutionHint, Signal, SignalKind};

    fn build_signal() -> Signal {
        let mut signal = Signal::new("BTCUSDT", SignalKind::EnterLong, 0.82);
        signal.stop_loss = Some(Decimal::new(99_500, 2));
        signal.take_profit = Some(Decimal::new(101_500, 2));
        signal.generated_at = chrono::Utc::now();
        signal.note = Some("trend-follow".into());
        signal.execution_hint = Some(ExecutionHint::Twap {
            duration: ChronoDuration::minutes(5),
        });
        signal
    }

    fn read_signal_batch(path: &Path) -> RecordBatch {
        let file = std::fs::File::open(path).expect("open parquet file");
        let builder = ParquetRecordBatchReaderBuilder::try_new(file).expect("build batch reader");
        let mut reader = builder.build().expect("build reader");
        reader.next().expect("batch result").expect("batch")
    }

    fn find_signal_file(root: &Path) -> PathBuf {
        let signals_dir = root.join(SignalEncoder::KIND);
        for day in std::fs::read_dir(&signals_dir).expect("read signal dir") {
            let day = day.expect("entry");
            if day.path().is_dir() {
                for file in std::fs::read_dir(day.path()).expect("read partition dir") {
                    let file = file.expect("file entry");
                    if file
                        .path()
                        .extension()
                        .map(|ext| ext == "parquet")
                        .unwrap_or(false)
                    {
                        return file.path();
                    }
                }
            }
        }
        panic!("no signal parquet files found in {}", signals_dir.display());
    }

    fn decimal_from_array(arr: &Decimal128Array, idx: usize) -> Decimal {
        let scale = match arr.data_type() {
            DataType::Decimal128(_, scale) => (*scale) as u32,
            _ => panic!("unexpected data type"),
        };
        Decimal::from_i128_with_scale(arr.value(idx), scale)
    }

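    /// End-to-end round trip: spawn a recorder, record one signal, shut it
    /// down, then read the Parquet file back and compare the columns against
    /// the original signal.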
    #[tokio::test]
    async fn writes_signal_batches() {
        let signal = build_signal();
        let temp = tempdir().expect("tempdir");
        let config = RecorderConfig {
            root: temp.path().to_path_buf(),
            max_buffered_rows: 1,
            flush_interval: Duration::from_millis(20),
            max_rows_per_file: 16,
            channel_capacity: 4,
        };
        let recorder = ParquetRecorder::spawn(config)
            .await
            .expect("spawn recorder");
        let handle = recorder.handle();
        handle.record_signal(signal.clone());
        tokio::time::sleep(Duration::from_millis(50)).await;
        drop(handle);
        recorder.shutdown().await.expect("shutdown recorder");

        let file_path = find_signal_file(temp.path());
        let batch = read_signal_batch(&file_path);
        assert_eq!(batch.num_rows(), 1);

        let ids = batch
            .column(batch.schema().index_of("id").unwrap())
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert_eq!(ids.value(0), signal.id.to_string());

        let kinds = batch
            .column(batch.schema().index_of("kind").unwrap())
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert_eq!(kinds.value(0), "enter_long");

        let confidences = batch
            .column(batch.schema().index_of("confidence").unwrap())
            .as_any()
            .downcast_ref::<Float64Array>()
            .unwrap();
        assert!((confidences.value(0) - signal.confidence).abs() < f64::EPSILON);

        let stop_losses = batch
            .column(batch.schema().index_of("stop_loss").unwrap())
            .as_any()
            .downcast_ref::<Decimal128Array>()
            .unwrap();
        assert_eq!(
            decimal_from_array(stop_losses, 0),
            signal.stop_loss.unwrap()
        );

        let generated = batch
            .column(batch.schema().index_of("generated_at").unwrap())
            .as_any()
            .downcast_ref::<TimestampNanosecondArray>()
            .unwrap();
        let nanos = generated.value(0);
        let secs = nanos / 1_000_000_000;
        let rem = (nanos % 1_000_000_000) as u32;
        let recorded_ts = chrono::DateTime::<chrono::Utc>::from_timestamp(secs, rem).unwrap();
        assert_eq!(recorded_ts.timestamp(), signal.generated_at.timestamp());

        let metadata = batch
            .column(batch.schema().index_of("metadata").unwrap())
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        let meta_value: serde_json::Value =
            serde_json::from_str(metadata.value(0)).expect("metadata json");
        assert_eq!(meta_value["note"], signal.note.as_ref().unwrap().as_str());
    }
}