1use std::marker::PhantomData;
2use std::path::{Path, PathBuf};
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::sync::Arc;
5use std::time::{Duration, Instant};
6
7use anyhow::{anyhow, Context, Result};
8use chrono::{NaiveDate, Utc};
9use parquet::arrow::AsyncArrowWriter;
10use parquet::basic::{Compression, ZstdLevel};
11use parquet::file::properties::WriterProperties;
12use tokio::fs::{self, File};
13use tokio::sync::mpsc::{self, error::TrySendError};
14use tokio::task::JoinHandle;
15use tokio::time::{interval, MissedTickBehavior};
16use tracing::{debug, warn};
17
18use tesser_core::{Candle, Fill, Order, Tick};
19
20use crate::encoding::{
21 candle_schema, candles_to_batch, fill_schema, fills_to_batch, order_schema, orders_to_batch,
22 tick_schema, ticks_to_batch,
23};
24
/// Tuning knobs for the Parquet flight recorder.
#[derive(Clone, Debug)]
pub struct RecorderConfig {
    /// Root directory; one subdirectory per event kind is created beneath it.
    pub root: PathBuf,
    /// Buffered row count that triggers an immediate flush for a sink.
    pub max_buffered_rows: usize,
    /// How often buffered rows are flushed even when below the row threshold.
    pub flush_interval: Duration,
    /// Row count after which the current Parquet file is closed and rotated.
    pub max_rows_per_file: usize,
    /// Capacity of the mpsc channel between `RecorderHandle`s and the worker.
    pub channel_capacity: usize,
}
39
40impl Default for RecorderConfig {
41 fn default() -> Self {
42 Self {
43 root: PathBuf::from("data/flight_recorder"),
44 max_buffered_rows: 1024,
45 flush_interval: Duration::from_secs(5),
46 max_rows_per_file: 250_000,
47 channel_capacity: 4096,
48 }
49 }
50}
51
/// Owner of the background recorder task plus the primary producer handle.
pub struct ParquetRecorder {
    // Cloneable sender side; dropped during `shutdown` to close the channel.
    handle: RecorderHandle,
    // Join handle for the worker task; taken (set to `None`) on shutdown.
    task: Option<JoinHandle<Result<()>>>,
}
57
58impl ParquetRecorder {
59 pub async fn spawn(config: RecorderConfig) -> Result<Self> {
61 fs::create_dir_all(&config.root)
62 .await
63 .with_context(|| format!("failed to create {}", config.root.display()))?;
64
65 let (tx, rx) = mpsc::channel(config.channel_capacity.max(1));
66 let worker = FlightRecorderWorker::new(config, rx).await?;
67 let task = tokio::spawn(async move { worker.run().await });
68 Ok(Self {
69 handle: RecorderHandle { sender: tx },
70 task: Some(task),
71 })
72 }
73
74 pub fn handle(&self) -> RecorderHandle {
76 self.handle.clone()
77 }
78
79 pub async fn shutdown(mut self) -> Result<()> {
81 drop(self.handle);
82 if let Some(task) = self.task.take() {
83 match task.await {
84 Ok(result) => result?,
85 Err(err) => {
86 return Err(anyhow!("flight recorder task aborted: {err}"));
87 }
88 }
89 }
90 Ok(())
91 }
92}
93
/// Cheap, cloneable producer handle; sends events to the worker task.
#[derive(Clone)]
pub struct RecorderHandle {
    sender: mpsc::Sender<RecorderMessage>,
}
98
99impl RecorderHandle {
100 pub fn record_tick(&self, tick: Tick) {
102 self.enqueue(RecorderMessage::Tick(tick), "tick", &TICK_SATURATION);
103 }
104
105 pub fn record_candle(&self, candle: Candle) {
107 self.enqueue(
108 RecorderMessage::Candle(candle),
109 "candle",
110 &CANDLE_SATURATION,
111 );
112 }
113
114 pub fn record_fill(&self, fill: Fill) {
116 self.enqueue(RecorderMessage::Fill(fill), "fill", &FILL_SATURATION);
117 }
118
119 pub fn record_order(&self, order: Order) {
121 self.enqueue(RecorderMessage::Order(order), "order", &ORDER_SATURATION);
122 }
123
124 fn enqueue(&self, message: RecorderMessage, label: &'static str, flag: &'static AtomicBool) {
125 match self.sender.try_send(message) {
126 Ok(()) => {}
127 Err(TrySendError::Full(_)) => {
128 if !flag.swap(true, Ordering::Relaxed) {
129 warn!("flight recorder channel saturated; dropping {label} events");
130 }
131 }
132 Err(TrySendError::Closed(_)) => {
133 warn!("flight recorder channel closed; ignoring {label} event");
134 }
135 }
136 }
137}
138
/// Internal envelope carried over the channel from handles to the worker.
enum RecorderMessage {
    Tick(Tick),
    Candle(Candle),
    Fill(Fill),
    Order(Order),
}
145
// One-shot saturation flags: each ensures the "channel full" warning is
// logged at most once per event kind for the lifetime of the process
// (the flags are never reset).
static TICK_SATURATION: AtomicBool = AtomicBool::new(false);
static CANDLE_SATURATION: AtomicBool = AtomicBool::new(false);
static FILL_SATURATION: AtomicBool = AtomicBool::new(false);
static ORDER_SATURATION: AtomicBool = AtomicBool::new(false);
150
/// Background task state: the receive side of the event channel plus one
/// buffered Parquet sink per event kind.
struct FlightRecorderWorker {
    rx: mpsc::Receiver<RecorderMessage>,
    tick_sink: DataSink<TickEncoder>,
    candle_sink: DataSink<CandleEncoder>,
    fill_sink: DataSink<FillEncoder>,
    order_sink: DataSink<OrderEncoder>,
    // Period of the timer that drives time-based flushes in `run`.
    flush_interval: Duration,
}
159
160impl FlightRecorderWorker {
161 async fn new(config: RecorderConfig, rx: mpsc::Receiver<RecorderMessage>) -> Result<Self> {
162 let writer_props = Arc::new(
163 WriterProperties::builder()
164 .set_compression(Compression::ZSTD(ZstdLevel::default()))
165 .build(),
166 );
167
168 let tick_dir = ensure_subdir(&config.root, TickEncoder::KIND).await?;
169 let candle_dir = ensure_subdir(&config.root, CandleEncoder::KIND).await?;
170 let fill_dir = ensure_subdir(&config.root, FillEncoder::KIND).await?;
171 let order_dir = ensure_subdir(&config.root, OrderEncoder::KIND).await?;
172
173 Ok(Self {
174 rx,
175 tick_sink: DataSink::new(
176 tick_dir,
177 config.max_buffered_rows,
178 config.max_rows_per_file,
179 config.flush_interval,
180 writer_props.clone(),
181 ),
182 candle_sink: DataSink::new(
183 candle_dir,
184 config.max_buffered_rows,
185 config.max_rows_per_file,
186 config.flush_interval,
187 writer_props.clone(),
188 ),
189 fill_sink: DataSink::new(
190 fill_dir,
191 config.max_buffered_rows,
192 config.max_rows_per_file,
193 config.flush_interval,
194 writer_props.clone(),
195 ),
196 order_sink: DataSink::new(
197 order_dir,
198 config.max_buffered_rows,
199 config.max_rows_per_file,
200 config.flush_interval,
201 writer_props,
202 ),
203 flush_interval: config.flush_interval,
204 })
205 }
206
207 async fn run(mut self) -> Result<()> {
208 let mut timer = interval(self.flush_interval);
209 timer.set_missed_tick_behavior(MissedTickBehavior::Delay);
210
211 loop {
212 tokio::select! {
213 Some(msg) = self.rx.recv() => {
214 self.handle_message(msg).await?;
215 }
216 _ = timer.tick() => {
217 self.tick_sink.maybe_flush_due_time().await?;
218 self.candle_sink.maybe_flush_due_time().await?;
219 self.fill_sink.maybe_flush_due_time().await?;
220 self.order_sink.maybe_flush_due_time().await?;
221 }
222 else => break,
223 }
224 }
225
226 self.tick_sink.shutdown().await?;
227 self.candle_sink.shutdown().await?;
228 self.fill_sink.shutdown().await?;
229 self.order_sink.shutdown().await?;
230 Ok(())
231 }
232
233 async fn handle_message(&mut self, msg: RecorderMessage) -> Result<()> {
234 match msg {
235 RecorderMessage::Tick(tick) => self.tick_sink.push(tick).await?,
236 RecorderMessage::Candle(candle) => self.candle_sink.push(candle).await?,
237 RecorderMessage::Fill(fill) => self.fill_sink.push(fill).await?,
238 RecorderMessage::Order(order) => self.order_sink.push(order).await?,
239 }
240 Ok(())
241 }
242}
243
244async fn ensure_subdir(root: &Path, kind: &str) -> Result<PathBuf> {
245 let path = root.join(kind);
246 fs::create_dir_all(&path)
247 .await
248 .with_context(|| format!("failed to create {}", path.display()))?;
249 Ok(path)
250}
251
/// Per-event-kind hooks used by `DataSink`: Arrow schema, batch encoding,
/// and date-based partitioning.
trait SinkEncoder {
    /// Concrete event type this encoder handles.
    type Record: Send + 'static;
    /// Subdirectory name (also used as the file-name prefix) for this kind.
    const KIND: &'static str;

    /// Arrow schema for batches of this record type.
    fn schema() -> Arc<arrow::datatypes::Schema>;
    /// Encode a slice of records into a single Arrow record batch.
    fn encode(records: &[Self::Record]) -> Result<arrow::record_batch::RecordBatch>;
    /// Calendar date used as the on-disk partition for a record.
    fn partition_for(record: &Self::Record) -> NaiveDate;
}
260
// Zero-sized marker types; each implements `SinkEncoder` for one event kind.
struct TickEncoder;
struct CandleEncoder;
struct FillEncoder;
struct OrderEncoder;
265
266impl SinkEncoder for TickEncoder {
267 type Record = Tick;
268 const KIND: &'static str = "ticks";
269
270 fn schema() -> Arc<arrow::datatypes::Schema> {
271 tick_schema()
272 }
273
274 fn encode(records: &[Self::Record]) -> Result<arrow::record_batch::RecordBatch> {
275 ticks_to_batch(records)
276 }
277
278 fn partition_for(record: &Self::Record) -> NaiveDate {
279 record.exchange_timestamp.date_naive()
280 }
281}
282
283impl SinkEncoder for CandleEncoder {
284 type Record = Candle;
285 const KIND: &'static str = "candles";
286
287 fn schema() -> Arc<arrow::datatypes::Schema> {
288 candle_schema()
289 }
290
291 fn encode(records: &[Self::Record]) -> Result<arrow::record_batch::RecordBatch> {
292 candles_to_batch(records)
293 }
294
295 fn partition_for(record: &Self::Record) -> NaiveDate {
296 record.timestamp.date_naive()
297 }
298}
299
300impl SinkEncoder for FillEncoder {
301 type Record = Fill;
302 const KIND: &'static str = "fills";
303
304 fn schema() -> Arc<arrow::datatypes::Schema> {
305 fill_schema()
306 }
307
308 fn encode(records: &[Self::Record]) -> Result<arrow::record_batch::RecordBatch> {
309 fills_to_batch(records)
310 }
311
312 fn partition_for(record: &Self::Record) -> NaiveDate {
313 record.timestamp.date_naive()
314 }
315}
316
317impl SinkEncoder for OrderEncoder {
318 type Record = Order;
319 const KIND: &'static str = "orders";
320
321 fn schema() -> Arc<arrow::datatypes::Schema> {
322 order_schema()
323 }
324
325 fn encode(records: &[Self::Record]) -> Result<arrow::record_batch::RecordBatch> {
326 orders_to_batch(records)
327 }
328
329 fn partition_for(record: &Self::Record) -> NaiveDate {
330 record.updated_at.date_naive()
331 }
332}
333
/// Buffered Parquet sink for one event kind, writing date-partitioned,
/// size-rotated files beneath `dir`.
struct DataSink<E: SinkEncoder> {
    // Kind-specific directory; per-date subdirectories are created beneath it.
    dir: PathBuf,
    // Rows accumulated since the last flush.
    buffer: Vec<E::Record>,
    // Currently open Parquet file, if any.
    writer: Option<ActiveWriter>,
    // Date partition the buffer/writer currently belong to.
    partition: Option<NaiveDate>,
    // Row count that triggers an immediate flush (clamped to >= 1).
    max_buffered_rows: usize,
    // Row count per file before rotation (clamped to >= 1).
    max_rows_per_file: usize,
    // Minimum elapsed time between timer-driven flushes.
    flush_interval: Duration,
    // Instant of the most recent flush attempt.
    last_flush: Instant,
    // Shared Parquet writer properties (compression etc.).
    properties: Arc<WriterProperties>,
    // Monotonic sequence number folded into file names for uniqueness.
    file_seq: u64,
    // Cached Arrow schema from `E::schema()`.
    schema: Arc<arrow::datatypes::Schema>,
    _marker: PhantomData<E>,
}
348
349impl<E: SinkEncoder> DataSink<E> {
350 fn new(
351 dir: PathBuf,
352 max_buffered_rows: usize,
353 max_rows_per_file: usize,
354 flush_interval: Duration,
355 properties: Arc<WriterProperties>,
356 ) -> Self {
357 Self {
358 dir,
359 buffer: Vec::with_capacity(max_buffered_rows.max(1)),
360 writer: None,
361 partition: None,
362 max_buffered_rows: max_buffered_rows.max(1),
363 max_rows_per_file: max_rows_per_file.max(1),
364 flush_interval,
365 last_flush: Instant::now(),
366 properties,
367 file_seq: 0,
368 schema: E::schema(),
369 _marker: PhantomData,
370 }
371 }
372
373 async fn push(&mut self, record: E::Record) -> Result<()> {
374 let partition = E::partition_for(&record);
375 if self.partition != Some(partition) {
376 self.flush().await?;
377 self.close_writer().await?;
378 self.partition = Some(partition);
379 }
380 self.buffer.push(record);
381 if self.buffer.len() >= self.max_buffered_rows {
382 self.flush().await?;
383 }
384 Ok(())
385 }
386
387 async fn maybe_flush_due_time(&mut self) -> Result<()> {
388 if self.buffer.is_empty() {
389 return Ok(());
390 }
391 if self.last_flush.elapsed() >= self.flush_interval {
392 self.flush().await?;
393 }
394 Ok(())
395 }
396
397 async fn flush(&mut self) -> Result<()> {
398 if self.buffer.is_empty() {
399 return Ok(());
400 }
401 if self.partition.is_none() {
402 if let Some(first) = self.buffer.first() {
403 self.partition = Some(E::partition_for(first));
404 }
405 }
406 let partition = self
407 .partition
408 .ok_or_else(|| anyhow!("buffer partition undefined"))?;
409
410 self.ensure_writer(partition).await?;
411 let rows = std::mem::take(&mut self.buffer);
412 if rows.is_empty() {
413 return Ok(());
414 }
415 let row_count = rows.len();
416 let batch = match E::encode(&rows) {
417 Ok(batch) => batch,
418 Err(err) => {
419 warn!(
420 kind = E::KIND,
421 error = %err,
422 dropped = row_count,
423 "failed to encode flight recorder batch"
424 );
425 self.last_flush = Instant::now();
426 return Ok(());
427 }
428 };
429
430 if batch.num_rows() == 0 {
431 return Ok(());
432 }
433
434 if let Some(writer) = &mut self.writer {
435 writer.write(&batch).await?;
436 if writer.rows_written >= self.max_rows_per_file {
437 if let Some(writer) = self.writer.take() {
438 writer.finish().await?;
439 }
440 }
441 }
442 self.last_flush = Instant::now();
443 Ok(())
444 }
445
446 async fn ensure_writer(&mut self, partition: NaiveDate) -> Result<()> {
447 if self.writer.is_some() {
448 return Ok(());
449 }
450 let date_dir = self.dir.join(partition.format("%Y-%m-%d").to_string());
451 fs::create_dir_all(&date_dir)
452 .await
453 .with_context(|| format!("failed to create {}", date_dir.display()))?;
454 let timestamp = Utc::now().format("%Y%m%dT%H%M%S");
455 let file_name = format!("{}-{}-{:04}.parquet", E::KIND, timestamp, self.file_seq);
456 self.file_seq = self.file_seq.wrapping_add(1);
457 let path = date_dir.join(file_name);
458 let file = File::create(&path)
459 .await
460 .with_context(|| format!("failed to create {}", path.display()))?;
461 let writer = AsyncArrowWriter::try_new(
462 file,
463 self.schema.clone(),
464 Some(self.properties.as_ref().clone()),
465 )?;
466 debug!(kind = E::KIND, path = %path.display(), "opened flight recorder file");
467 self.writer = Some(ActiveWriter {
468 writer,
469 rows_written: 0,
470 path,
471 });
472 Ok(())
473 }
474
475 async fn close_writer(&mut self) -> Result<()> {
476 if let Some(writer) = self.writer.take() {
477 writer.finish().await?;
478 }
479 Ok(())
480 }
481
482 async fn shutdown(&mut self) -> Result<()> {
483 self.flush().await?;
484 self.close_writer().await?;
485 Ok(())
486 }
487}
488
/// An open Parquet file plus bookkeeping for rotation decisions.
struct ActiveWriter {
    writer: AsyncArrowWriter<File>,
    // Total rows written to this file so far; compared against
    // `max_rows_per_file` by `DataSink::flush`.
    rows_written: usize,
    // Path of the file, kept for logging on close.
    path: PathBuf,
}
494
495impl ActiveWriter {
496 async fn write(&mut self, batch: &arrow::record_batch::RecordBatch) -> Result<()> {
497 self.writer.write(batch).await?;
498 self.rows_written += batch.num_rows();
499 Ok(())
500 }
501
502 async fn finish(mut self) -> Result<()> {
503 self.writer.finish().await?;
504 debug!(path = %self.path.display(), "closed flight recorder file");
505 Ok(())
506 }
507}