tesser_data/
recorder.rs

1use std::marker::PhantomData;
2use std::path::{Path, PathBuf};
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::sync::Arc;
5use std::time::{Duration, Instant};
6
7use anyhow::{anyhow, Context, Result};
8use chrono::{NaiveDate, Utc};
9use parquet::arrow::AsyncArrowWriter;
10use parquet::basic::{Compression, ZstdLevel};
11use parquet::file::properties::WriterProperties;
12use tokio::fs::{self, File};
13use tokio::sync::mpsc::{self, error::TrySendError};
14use tokio::task::JoinHandle;
15use tokio::time::{interval, MissedTickBehavior};
16use tracing::{debug, warn};
17
18use tesser_core::{Candle, Fill, Order, Tick};
19
20use crate::encoding::{
21    candle_schema, candles_to_batch, fill_schema, fills_to_batch, order_schema, orders_to_batch,
22    tick_schema, ticks_to_batch,
23};
24
/// Configuration used when spawning a [`ParquetRecorder`].
#[derive(Clone, Debug)]
pub struct RecorderConfig {
    /// Directory where all flight recorder data will be stored.
    /// One subdirectory per record kind is created underneath it.
    pub root: PathBuf,
    /// Maximum number of rows buffered in-memory before forcing a flush.
    pub max_buffered_rows: usize,
    /// How often to flush buffers when traffic is low.
    pub flush_interval: Duration,
    /// Maximum number of rows per parquet file before starting a new file.
    pub max_rows_per_file: usize,
    /// Capacity of the asynchronous channel accepting recorder events.
    /// When the channel is full, new events are dropped (with a warning)
    /// rather than back-pressuring the producer.
    pub channel_capacity: usize,
}
39
40impl Default for RecorderConfig {
41    fn default() -> Self {
42        Self {
43            root: PathBuf::from("data/flight_recorder"),
44            max_buffered_rows: 1024,
45            flush_interval: Duration::from_secs(5),
46            max_rows_per_file: 250_000,
47            channel_capacity: 4096,
48        }
49    }
50}
51
/// Background component responsible for writing parquet batches to disk.
pub struct ParquetRecorder {
    // Prototype handle; `handle()` clones it for producers, `shutdown()`
    // drops it to help close the channel.
    handle: RecorderHandle,
    // Worker task; `Some` until `shutdown()` takes and awaits it.
    task: Option<JoinHandle<Result<()>>>,
}
57
58impl ParquetRecorder {
59    /// Starts a new recorder task with the provided configuration.
60    pub async fn spawn(config: RecorderConfig) -> Result<Self> {
61        fs::create_dir_all(&config.root)
62            .await
63            .with_context(|| format!("failed to create {}", config.root.display()))?;
64
65        let (tx, rx) = mpsc::channel(config.channel_capacity.max(1));
66        let worker = FlightRecorderWorker::new(config, rx).await?;
67        let task = tokio::spawn(async move { worker.run().await });
68        Ok(Self {
69            handle: RecorderHandle { sender: tx },
70            task: Some(task),
71        })
72    }
73
74    /// Returns a handle that can be cloned and used across tasks to enqueue records.
75    pub fn handle(&self) -> RecorderHandle {
76        self.handle.clone()
77    }
78
79    /// Signals the recorder to stop and waits for buffers to flush.
80    pub async fn shutdown(mut self) -> Result<()> {
81        drop(self.handle);
82        if let Some(task) = self.task.take() {
83            match task.await {
84                Ok(result) => result?,
85                Err(err) => {
86                    return Err(anyhow!("flight recorder task aborted: {err}"));
87                }
88            }
89        }
90        Ok(())
91    }
92}
93
/// Cloneable, non-blocking producer handle for enqueueing recorder events.
#[derive(Clone)]
pub struct RecorderHandle {
    // Bounded channel to the background worker; sends never block.
    sender: mpsc::Sender<RecorderMessage>,
}
98
99impl RecorderHandle {
100    /// Enqueues a tick event for recording.
101    pub fn record_tick(&self, tick: Tick) {
102        self.enqueue(RecorderMessage::Tick(tick), "tick", &TICK_SATURATION);
103    }
104
105    /// Enqueues a candle event for recording.
106    pub fn record_candle(&self, candle: Candle) {
107        self.enqueue(
108            RecorderMessage::Candle(candle),
109            "candle",
110            &CANDLE_SATURATION,
111        );
112    }
113
114    /// Enqueues a fill event for recording.
115    pub fn record_fill(&self, fill: Fill) {
116        self.enqueue(RecorderMessage::Fill(fill), "fill", &FILL_SATURATION);
117    }
118
119    /// Enqueues an order update for recording.
120    pub fn record_order(&self, order: Order) {
121        self.enqueue(RecorderMessage::Order(order), "order", &ORDER_SATURATION);
122    }
123
124    fn enqueue(&self, message: RecorderMessage, label: &'static str, flag: &'static AtomicBool) {
125        match self.sender.try_send(message) {
126            Ok(()) => {}
127            Err(TrySendError::Full(_)) => {
128                if !flag.swap(true, Ordering::Relaxed) {
129                    warn!("flight recorder channel saturated; dropping {label} events");
130                }
131            }
132            Err(TrySendError::Closed(_)) => {
133                warn!("flight recorder channel closed; ignoring {label} event");
134            }
135        }
136    }
137}
138
/// Event payloads carried over the recorder channel to the worker.
enum RecorderMessage {
    Tick(Tick),
    Candle(Candle),
    Fill(Fill),
    Order(Order),
}
145
// Per-kind latches that throttle the "channel saturated" warning so a burst
// of drops produces a single log line instead of one per event.
static TICK_SATURATION: AtomicBool = AtomicBool::new(false);
static CANDLE_SATURATION: AtomicBool = AtomicBool::new(false);
static FILL_SATURATION: AtomicBool = AtomicBool::new(false);
static ORDER_SATURATION: AtomicBool = AtomicBool::new(false);
150
/// Background task state: one buffered parquet sink per record kind plus the
/// inbound event channel.
struct FlightRecorderWorker {
    // Receiving end of the recorder channel.
    rx: mpsc::Receiver<RecorderMessage>,
    tick_sink: DataSink<TickEncoder>,
    candle_sink: DataSink<CandleEncoder>,
    fill_sink: DataSink<FillEncoder>,
    order_sink: DataSink<OrderEncoder>,
    // Period of the low-traffic flush timer in `run`.
    flush_interval: Duration,
}
159
160impl FlightRecorderWorker {
161    async fn new(config: RecorderConfig, rx: mpsc::Receiver<RecorderMessage>) -> Result<Self> {
162        let writer_props = Arc::new(
163            WriterProperties::builder()
164                .set_compression(Compression::ZSTD(ZstdLevel::default()))
165                .build(),
166        );
167
168        let tick_dir = ensure_subdir(&config.root, TickEncoder::KIND).await?;
169        let candle_dir = ensure_subdir(&config.root, CandleEncoder::KIND).await?;
170        let fill_dir = ensure_subdir(&config.root, FillEncoder::KIND).await?;
171        let order_dir = ensure_subdir(&config.root, OrderEncoder::KIND).await?;
172
173        Ok(Self {
174            rx,
175            tick_sink: DataSink::new(
176                tick_dir,
177                config.max_buffered_rows,
178                config.max_rows_per_file,
179                config.flush_interval,
180                writer_props.clone(),
181            ),
182            candle_sink: DataSink::new(
183                candle_dir,
184                config.max_buffered_rows,
185                config.max_rows_per_file,
186                config.flush_interval,
187                writer_props.clone(),
188            ),
189            fill_sink: DataSink::new(
190                fill_dir,
191                config.max_buffered_rows,
192                config.max_rows_per_file,
193                config.flush_interval,
194                writer_props.clone(),
195            ),
196            order_sink: DataSink::new(
197                order_dir,
198                config.max_buffered_rows,
199                config.max_rows_per_file,
200                config.flush_interval,
201                writer_props,
202            ),
203            flush_interval: config.flush_interval,
204        })
205    }
206
207    async fn run(mut self) -> Result<()> {
208        let mut timer = interval(self.flush_interval);
209        timer.set_missed_tick_behavior(MissedTickBehavior::Delay);
210
211        loop {
212            tokio::select! {
213                Some(msg) = self.rx.recv() => {
214                    self.handle_message(msg).await?;
215                }
216                _ = timer.tick() => {
217                    self.tick_sink.maybe_flush_due_time().await?;
218                    self.candle_sink.maybe_flush_due_time().await?;
219                    self.fill_sink.maybe_flush_due_time().await?;
220                    self.order_sink.maybe_flush_due_time().await?;
221                }
222                else => break,
223            }
224        }
225
226        self.tick_sink.shutdown().await?;
227        self.candle_sink.shutdown().await?;
228        self.fill_sink.shutdown().await?;
229        self.order_sink.shutdown().await?;
230        Ok(())
231    }
232
233    async fn handle_message(&mut self, msg: RecorderMessage) -> Result<()> {
234        match msg {
235            RecorderMessage::Tick(tick) => self.tick_sink.push(tick).await?,
236            RecorderMessage::Candle(candle) => self.candle_sink.push(candle).await?,
237            RecorderMessage::Fill(fill) => self.fill_sink.push(fill).await?,
238            RecorderMessage::Order(order) => self.order_sink.push(order).await?,
239        }
240        Ok(())
241    }
242}
243
244async fn ensure_subdir(root: &Path, kind: &str) -> Result<PathBuf> {
245    let path = root.join(kind);
246    fs::create_dir_all(&path)
247        .await
248        .with_context(|| format!("failed to create {}", path.display()))?;
249    Ok(path)
250}
251
/// Per-record-kind hooks used by [`DataSink`]: arrow schema, batch encoding,
/// and the calendar-date partition for each record.
trait SinkEncoder {
    /// Owned record type buffered and encoded by the sink.
    type Record: Send + 'static;
    /// Subdirectory name under the recorder root (e.g. "ticks").
    const KIND: &'static str;

    /// Arrow schema of the parquet files written for this kind.
    fn schema() -> Arc<arrow::datatypes::Schema>;
    /// Encodes a slice of buffered records into one record batch.
    fn encode(records: &[Self::Record]) -> Result<arrow::record_batch::RecordBatch>;
    /// Date used to pick the `YYYY-MM-DD` partition directory for a record.
    fn partition_for(record: &Self::Record) -> NaiveDate;
}
260
// Zero-sized marker types selecting a `SinkEncoder` implementation per kind.
struct TickEncoder;
struct CandleEncoder;
struct FillEncoder;
struct OrderEncoder;
265
/// Ticks partition by the calendar date of the exchange-reported timestamp.
impl SinkEncoder for TickEncoder {
    type Record = Tick;
    const KIND: &'static str = "ticks";

    fn schema() -> Arc<arrow::datatypes::Schema> {
        tick_schema()
    }

    fn encode(records: &[Self::Record]) -> Result<arrow::record_batch::RecordBatch> {
        ticks_to_batch(records)
    }

    fn partition_for(record: &Self::Record) -> NaiveDate {
        record.exchange_timestamp.date_naive()
    }
}
282
/// Candles partition by the calendar date of their `timestamp` field.
impl SinkEncoder for CandleEncoder {
    type Record = Candle;
    const KIND: &'static str = "candles";

    fn schema() -> Arc<arrow::datatypes::Schema> {
        candle_schema()
    }

    fn encode(records: &[Self::Record]) -> Result<arrow::record_batch::RecordBatch> {
        candles_to_batch(records)
    }

    fn partition_for(record: &Self::Record) -> NaiveDate {
        record.timestamp.date_naive()
    }
}
299
/// Fills partition by the calendar date of their `timestamp` field.
impl SinkEncoder for FillEncoder {
    type Record = Fill;
    const KIND: &'static str = "fills";

    fn schema() -> Arc<arrow::datatypes::Schema> {
        fill_schema()
    }

    fn encode(records: &[Self::Record]) -> Result<arrow::record_batch::RecordBatch> {
        fills_to_batch(records)
    }

    fn partition_for(record: &Self::Record) -> NaiveDate {
        record.timestamp.date_naive()
    }
}
316
/// Orders partition by the calendar date of their last update.
impl SinkEncoder for OrderEncoder {
    type Record = Order;
    const KIND: &'static str = "orders";

    fn schema() -> Arc<arrow::datatypes::Schema> {
        order_schema()
    }

    fn encode(records: &[Self::Record]) -> Result<arrow::record_batch::RecordBatch> {
        orders_to_batch(records)
    }

    fn partition_for(record: &Self::Record) -> NaiveDate {
        record.updated_at.date_naive()
    }
}
333
/// Buffered, date-partitioned parquet sink for one record kind.
///
/// Rows accumulate in `buffer` until a size/time threshold or a partition
/// (date) change forces a flush into the currently open parquet file.
struct DataSink<E: SinkEncoder> {
    // Base directory for this kind, e.g. `<root>/ticks`.
    dir: PathBuf,
    // Rows awaiting encoding; all belong to `partition`.
    buffer: Vec<E::Record>,
    // Currently open parquet file, if any.
    writer: Option<ActiveWriter>,
    // Calendar date the buffered rows (and open file) belong to.
    partition: Option<NaiveDate>,
    // Flush when the buffer reaches this many rows (clamped to >= 1).
    max_buffered_rows: usize,
    // Rotate to a new file once this many rows are written (clamped to >= 1).
    max_rows_per_file: usize,
    // Flush cadence applied by `maybe_flush_due_time`.
    flush_interval: Duration,
    // Instant of the last successful (or best-effort-dropped) flush.
    last_flush: Instant,
    // Shared parquet writer properties (compression etc.).
    properties: Arc<WriterProperties>,
    // Monotonic counter distinguishing files created within one second.
    file_seq: u64,
    // Arrow schema, computed once from the encoder.
    schema: Arc<arrow::datatypes::Schema>,
    // Ties the generic encoder `E` to this sink without storing it.
    _marker: PhantomData<E>,
}
348
impl<E: SinkEncoder> DataSink<E> {
    /// Creates an idle sink; no file is opened until the first flush.
    fn new(
        dir: PathBuf,
        max_buffered_rows: usize,
        max_rows_per_file: usize,
        flush_interval: Duration,
        properties: Arc<WriterProperties>,
    ) -> Self {
        Self {
            dir,
            buffer: Vec::with_capacity(max_buffered_rows.max(1)),
            writer: None,
            partition: None,
            // Clamp both limits so a zero in the config cannot stall flushing
            // or file rotation.
            max_buffered_rows: max_buffered_rows.max(1),
            max_rows_per_file: max_rows_per_file.max(1),
            flush_interval,
            last_flush: Instant::now(),
            properties,
            file_seq: 0,
            schema: E::schema(),
            _marker: PhantomData,
        }
    }

    /// Buffers one record; rolls the partition on a date change and flushes
    /// when the buffer reaches `max_buffered_rows`.
    async fn push(&mut self, record: E::Record) -> Result<()> {
        let partition = E::partition_for(&record);
        if self.partition != Some(partition) {
            // Date rollover: write out the old partition's rows and close its
            // file *before* switching, so each file holds a single date.
            self.flush().await?;
            self.close_writer().await?;
            self.partition = Some(partition);
        }
        self.buffer.push(record);
        if self.buffer.len() >= self.max_buffered_rows {
            self.flush().await?;
        }
        Ok(())
    }

    /// Timer-driven flush: only fires when rows are buffered and no flush has
    /// happened within `flush_interval`.
    async fn maybe_flush_due_time(&mut self) -> Result<()> {
        if self.buffer.is_empty() {
            return Ok(());
        }
        if self.last_flush.elapsed() >= self.flush_interval {
            self.flush().await?;
        }
        Ok(())
    }

    /// Encodes the buffered rows into one record batch and appends it to the
    /// open file, rotating the file once it reaches `max_rows_per_file`.
    ///
    /// Encoding failures are best-effort: the rows are dropped with a warning
    /// rather than killing the worker.
    async fn flush(&mut self) -> Result<()> {
        if self.buffer.is_empty() {
            return Ok(());
        }
        // Normally set by `push`; derive it from the first buffered row as a
        // fallback so a flush can never fail for lack of a partition.
        if self.partition.is_none() {
            if let Some(first) = self.buffer.first() {
                self.partition = Some(E::partition_for(first));
            }
        }
        let partition = self
            .partition
            .ok_or_else(|| anyhow!("buffer partition undefined"))?;

        self.ensure_writer(partition).await?;
        // Take the buffer so its capacity-backed storage is handed to the
        // encoder and a fresh (empty) Vec stays behind.
        let rows = std::mem::take(&mut self.buffer);
        if rows.is_empty() {
            return Ok(());
        }
        let row_count = rows.len();
        let batch = match E::encode(&rows) {
            Ok(batch) => batch,
            Err(err) => {
                // Deliberate data loss: log how many rows were dropped and
                // keep the recorder alive.
                warn!(
                    kind = E::KIND,
                    error = %err,
                    dropped = row_count,
                    "failed to encode flight recorder batch"
                );
                self.last_flush = Instant::now();
                return Ok(());
            }
        };

        // NOTE(review): this early return skips the `last_flush` update below,
        // so an all-empty batch leaves the timer "overdue" — confirm intended.
        if batch.num_rows() == 0 {
            return Ok(());
        }

        // `ensure_writer` above guarantees `writer` is Some here; if it were
        // None the batch would be silently discarded.
        if let Some(writer) = &mut self.writer {
            writer.write(&batch).await?;
            if writer.rows_written >= self.max_rows_per_file {
                // File reached its row cap: finalize it; the next flush opens
                // a new file via `ensure_writer`.
                if let Some(writer) = self.writer.take() {
                    writer.finish().await?;
                }
            }
        }
        self.last_flush = Instant::now();
        Ok(())
    }

    /// Opens a new parquet file under `dir/YYYY-MM-DD/` if none is open.
    async fn ensure_writer(&mut self, partition: NaiveDate) -> Result<()> {
        if self.writer.is_some() {
            return Ok(());
        }
        let date_dir = self.dir.join(partition.format("%Y-%m-%d").to_string());
        fs::create_dir_all(&date_dir)
            .await
            .with_context(|| format!("failed to create {}", date_dir.display()))?;
        // File name combines kind, wall-clock second, and a sequence number so
        // multiple files created within the same second stay distinct.
        let timestamp = Utc::now().format("%Y%m%dT%H%M%S");
        let file_name = format!("{}-{}-{:04}.parquet", E::KIND, timestamp, self.file_seq);
        self.file_seq = self.file_seq.wrapping_add(1);
        let path = date_dir.join(file_name);
        let file = File::create(&path)
            .await
            .with_context(|| format!("failed to create {}", path.display()))?;
        let writer = AsyncArrowWriter::try_new(
            file,
            self.schema.clone(),
            Some(self.properties.as_ref().clone()),
        )?;
        debug!(kind = E::KIND, path = %path.display(), "opened flight recorder file");
        self.writer = Some(ActiveWriter {
            writer,
            rows_written: 0,
            path,
        });
        Ok(())
    }

    /// Finalizes the open file (writing the parquet footer), if any.
    async fn close_writer(&mut self) -> Result<()> {
        if let Some(writer) = self.writer.take() {
            writer.finish().await?;
        }
        Ok(())
    }

    /// Final flush and file close; called once when the worker exits.
    async fn shutdown(&mut self) -> Result<()> {
        self.flush().await?;
        self.close_writer().await?;
        Ok(())
    }
}
488
/// An open parquet file plus bookkeeping used for row-cap rotation.
struct ActiveWriter {
    // Async parquet writer over the open file.
    writer: AsyncArrowWriter<File>,
    // Total rows written so far; compared against `max_rows_per_file`.
    rows_written: usize,
    // On-disk path, kept for logging when the file is closed.
    path: PathBuf,
}
494
495impl ActiveWriter {
496    async fn write(&mut self, batch: &arrow::record_batch::RecordBatch) -> Result<()> {
497        self.writer.write(batch).await?;
498        self.rows_written += batch.num_rows();
499        Ok(())
500    }
501
502    async fn finish(mut self) -> Result<()> {
503        self.writer.finish().await?;
504        debug!(path = %self.path.display(), "closed flight recorder file");
505        Ok(())
506    }
507}