tesser_data/
recorder.rs

1use std::marker::PhantomData;
2use std::path::{Path, PathBuf};
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::sync::Arc;
5use std::time::{Duration, Instant};
6
7use anyhow::{anyhow, Context, Result};
8use chrono::{NaiveDate, Utc};
9use parquet::arrow::AsyncArrowWriter;
10use parquet::basic::{Compression, ZstdLevel};
11use parquet::file::properties::WriterProperties;
12use tokio::fs::{self, File};
13use tokio::sync::mpsc::{self, error::TrySendError};
14use tokio::task::JoinHandle;
15use tokio::time::{interval, MissedTickBehavior};
16use tracing::{debug, warn};
17
18use tesser_core::{Candle, Fill, Order, OrderBook, Signal, Tick};
19
20use crate::encoding::{
21    candle_schema, candles_to_batch, fill_schema, fills_to_batch, order_book_schema,
22    order_books_to_batch, order_schema, orders_to_batch, signal_schema, signals_to_batch,
23    tick_schema, ticks_to_batch,
24};
25
/// Tunable settings consumed by [`ParquetRecorder::spawn`].
#[derive(Debug, Clone)]
pub struct RecorderConfig {
    /// Root directory of the flight recorder; one sub-directory per record kind is created below it.
    pub root: PathBuf,
    /// Number of buffered rows at which a sink flushes without waiting for the timer.
    pub max_buffered_rows: usize,
    /// Period of the background timer that flushes buffers during quiet periods.
    pub flush_interval: Duration,
    /// Row count at which the current parquet file is finished and a new one is started.
    pub max_rows_per_file: usize,
    /// Size of the bounded channel between [`RecorderHandle`]s and the worker task.
    pub channel_capacity: usize,
}

impl Default for RecorderConfig {
    /// Returns the stock configuration: data under `data/flight_recorder`, flushes every
    /// 1024 rows or 5 seconds, 250 000 rows per file and a 4096-slot channel.
    fn default() -> Self {
        let root = PathBuf::from("data/flight_recorder");
        let flush_interval = Duration::from_secs(5);
        Self {
            root,
            max_buffered_rows: 1024,
            flush_interval,
            max_rows_per_file: 250_000,
            channel_capacity: 4096,
        }
    }
}
52
53/// Background component responsible for writing parquet batches to disk.
54pub struct ParquetRecorder {
55    handle: RecorderHandle,
56    task: Option<JoinHandle<Result<()>>>,
57}
58
59impl ParquetRecorder {
60    /// Starts a new recorder task with the provided configuration.
61    pub async fn spawn(config: RecorderConfig) -> Result<Self> {
62        fs::create_dir_all(&config.root)
63            .await
64            .with_context(|| format!("failed to create {}", config.root.display()))?;
65
66        let (tx, rx) = mpsc::channel(config.channel_capacity.max(1));
67        let worker = FlightRecorderWorker::new(config, rx).await?;
68        let task = tokio::spawn(async move { worker.run().await });
69        Ok(Self {
70            handle: RecorderHandle { sender: tx },
71            task: Some(task),
72        })
73    }
74
75    /// Returns a handle that can be cloned and used across tasks to enqueue records.
76    pub fn handle(&self) -> RecorderHandle {
77        self.handle.clone()
78    }
79
80    /// Signals the recorder to stop and waits for buffers to flush.
81    pub async fn shutdown(mut self) -> Result<()> {
82        drop(self.handle);
83        if let Some(task) = self.task.take() {
84            match task.await {
85                Ok(result) => result?,
86                Err(err) => {
87                    return Err(anyhow!("flight recorder task aborted: {err}"));
88                }
89            }
90        }
91        Ok(())
92    }
93}
94
95#[derive(Clone)]
96pub struct RecorderHandle {
97    sender: mpsc::Sender<RecorderMessage>,
98}
99
100impl RecorderHandle {
101    /// Enqueues a tick event for recording.
102    pub fn record_tick(&self, tick: Tick) {
103        self.enqueue(RecorderMessage::Tick(tick), "tick", &TICK_SATURATION);
104    }
105
106    /// Enqueues a candle event for recording.
107    pub fn record_candle(&self, candle: Candle) {
108        self.enqueue(
109            RecorderMessage::Candle(candle),
110            "candle",
111            &CANDLE_SATURATION,
112        );
113    }
114
115    /// Enqueues a fill event for recording.
116    pub fn record_fill(&self, fill: Fill) {
117        self.enqueue(RecorderMessage::Fill(fill), "fill", &FILL_SATURATION);
118    }
119
120    /// Enqueues an order update for recording.
121    pub fn record_order(&self, order: Order) {
122        self.enqueue(RecorderMessage::Order(order), "order", &ORDER_SATURATION);
123    }
124
125    /// Enqueues an order book snapshot for recording.
126    pub fn record_order_book(&self, book: OrderBook) {
127        self.enqueue(
128            RecorderMessage::OrderBook(book),
129            "order_book",
130            &ORDER_BOOK_SATURATION,
131        );
132    }
133
134    /// Enqueues a signal for recording.
135    pub fn record_signal(&self, signal: Signal) {
136        self.enqueue(
137            RecorderMessage::Signal(signal),
138            "signal",
139            &SIGNAL_SATURATION,
140        );
141    }
142
143    fn enqueue(&self, message: RecorderMessage, label: &'static str, flag: &'static AtomicBool) {
144        match self.sender.try_send(message) {
145            Ok(()) => {}
146            Err(TrySendError::Full(_)) => {
147                if !flag.swap(true, Ordering::Relaxed) {
148                    warn!("flight recorder channel saturated; dropping {label} events");
149                }
150            }
151            Err(TrySendError::Closed(_)) => {
152                warn!("flight recorder channel closed; ignoring {label} event");
153            }
154        }
155    }
156}
157
158enum RecorderMessage {
159    Tick(Tick),
160    Candle(Candle),
161    Fill(Fill),
162    Order(Order),
163    OrderBook(OrderBook),
164    Signal(Signal),
165}
166
// Process-wide "saturation already reported" flags, one per event kind. Each starts out
// `false`; `RecorderHandle::enqueue` swaps it to `true` when it finds the channel full.

/// Saturation flag for tick events.
static TICK_SATURATION: AtomicBool = AtomicBool::new(false);
/// Saturation flag for candle events.
static CANDLE_SATURATION: AtomicBool = AtomicBool::new(false);
/// Saturation flag for fill events.
static FILL_SATURATION: AtomicBool = AtomicBool::new(false);
/// Saturation flag for order events.
static ORDER_SATURATION: AtomicBool = AtomicBool::new(false);
/// Saturation flag for order book events.
static ORDER_BOOK_SATURATION: AtomicBool = AtomicBool::new(false);
/// Saturation flag for signal events.
static SIGNAL_SATURATION: AtomicBool = AtomicBool::new(false);
173
174struct FlightRecorderWorker {
175    rx: mpsc::Receiver<RecorderMessage>,
176    tick_sink: DataSink<TickEncoder>,
177    candle_sink: DataSink<CandleEncoder>,
178    fill_sink: DataSink<FillEncoder>,
179    order_sink: DataSink<OrderEncoder>,
180    book_sink: DataSink<OrderBookEncoder>,
181    signal_sink: DataSink<SignalEncoder>,
182    flush_interval: Duration,
183}
184
185impl FlightRecorderWorker {
186    async fn new(config: RecorderConfig, rx: mpsc::Receiver<RecorderMessage>) -> Result<Self> {
187        let writer_props = Arc::new(
188            WriterProperties::builder()
189                .set_compression(Compression::ZSTD(ZstdLevel::default()))
190                .build(),
191        );
192
193        let tick_dir = ensure_subdir(&config.root, TickEncoder::KIND).await?;
194        let candle_dir = ensure_subdir(&config.root, CandleEncoder::KIND).await?;
195        let fill_dir = ensure_subdir(&config.root, FillEncoder::KIND).await?;
196        let order_dir = ensure_subdir(&config.root, OrderEncoder::KIND).await?;
197        let book_dir = ensure_subdir(&config.root, OrderBookEncoder::KIND).await?;
198        let signal_dir = ensure_subdir(&config.root, SignalEncoder::KIND).await?;
199
200        Ok(Self {
201            rx,
202            tick_sink: DataSink::new(
203                tick_dir,
204                config.max_buffered_rows,
205                config.max_rows_per_file,
206                config.flush_interval,
207                writer_props.clone(),
208            ),
209            candle_sink: DataSink::new(
210                candle_dir,
211                config.max_buffered_rows,
212                config.max_rows_per_file,
213                config.flush_interval,
214                writer_props.clone(),
215            ),
216            fill_sink: DataSink::new(
217                fill_dir,
218                config.max_buffered_rows,
219                config.max_rows_per_file,
220                config.flush_interval,
221                writer_props.clone(),
222            ),
223            order_sink: DataSink::new(
224                order_dir,
225                config.max_buffered_rows,
226                config.max_rows_per_file,
227                config.flush_interval,
228                writer_props.clone(),
229            ),
230            book_sink: DataSink::new(
231                book_dir,
232                config.max_buffered_rows,
233                config.max_rows_per_file,
234                config.flush_interval,
235                writer_props.clone(),
236            ),
237            signal_sink: DataSink::new(
238                signal_dir,
239                config.max_buffered_rows,
240                config.max_rows_per_file,
241                config.flush_interval,
242                writer_props,
243            ),
244            flush_interval: config.flush_interval,
245        })
246    }
247
248    async fn run(mut self) -> Result<()> {
249        let mut timer = interval(self.flush_interval);
250        timer.set_missed_tick_behavior(MissedTickBehavior::Delay);
251
252        loop {
253            tokio::select! {
254                message = self.rx.recv() => {
255                    match message {
256                        Some(msg) => self.handle_message(msg).await?,
257                        None => break,
258                    }
259                }
260                _ = timer.tick() => {
261                    self.tick_sink.maybe_flush_due_time().await?;
262                    self.candle_sink.maybe_flush_due_time().await?;
263                    self.fill_sink.maybe_flush_due_time().await?;
264                    self.order_sink.maybe_flush_due_time().await?;
265                    self.book_sink.maybe_flush_due_time().await?;
266                    self.signal_sink.maybe_flush_due_time().await?;
267                }
268            }
269        }
270
271        self.tick_sink.shutdown().await?;
272        self.candle_sink.shutdown().await?;
273        self.fill_sink.shutdown().await?;
274        self.order_sink.shutdown().await?;
275        self.book_sink.shutdown().await?;
276        self.signal_sink.shutdown().await?;
277        Ok(())
278    }
279
280    async fn handle_message(&mut self, msg: RecorderMessage) -> Result<()> {
281        match msg {
282            RecorderMessage::Tick(tick) => self.tick_sink.push(tick).await?,
283            RecorderMessage::Candle(candle) => self.candle_sink.push(candle).await?,
284            RecorderMessage::Fill(fill) => self.fill_sink.push(fill).await?,
285            RecorderMessage::Order(order) => self.order_sink.push(order).await?,
286            RecorderMessage::OrderBook(book) => self.book_sink.push(book).await?,
287            RecorderMessage::Signal(signal) => self.signal_sink.push(signal).await?,
288        }
289        Ok(())
290    }
291}
292
293async fn ensure_subdir(root: &Path, kind: &str) -> Result<PathBuf> {
294    let path = root.join(kind);
295    fs::create_dir_all(&path)
296        .await
297        .with_context(|| format!("failed to create {}", path.display()))?;
298    Ok(path)
299}
300
301trait SinkEncoder {
302    type Record: Send + 'static;
303    const KIND: &'static str;
304
305    fn schema() -> Arc<arrow::datatypes::Schema>;
306    fn encode(records: &[Self::Record]) -> Result<arrow::record_batch::RecordBatch>;
307    fn partition_for(record: &Self::Record) -> NaiveDate;
308}
309
// Zero-sized marker types; each one selects a `SinkEncoder` implementation.

/// Marker for tick records.
struct TickEncoder;
/// Marker for candle records.
struct CandleEncoder;
/// Marker for fill records.
struct FillEncoder;
/// Marker for order records.
struct OrderEncoder;
/// Marker for order book records.
struct OrderBookEncoder;
/// Marker for signal records.
struct SignalEncoder;
316
317impl SinkEncoder for TickEncoder {
318    type Record = Tick;
319    const KIND: &'static str = "ticks";
320
321    fn schema() -> Arc<arrow::datatypes::Schema> {
322        tick_schema()
323    }
324
325    fn encode(records: &[Self::Record]) -> Result<arrow::record_batch::RecordBatch> {
326        ticks_to_batch(records)
327    }
328
329    fn partition_for(record: &Self::Record) -> NaiveDate {
330        record.exchange_timestamp.date_naive()
331    }
332}
333
334impl SinkEncoder for CandleEncoder {
335    type Record = Candle;
336    const KIND: &'static str = "candles";
337
338    fn schema() -> Arc<arrow::datatypes::Schema> {
339        candle_schema()
340    }
341
342    fn encode(records: &[Self::Record]) -> Result<arrow::record_batch::RecordBatch> {
343        candles_to_batch(records)
344    }
345
346    fn partition_for(record: &Self::Record) -> NaiveDate {
347        record.timestamp.date_naive()
348    }
349}
350
351impl SinkEncoder for FillEncoder {
352    type Record = Fill;
353    const KIND: &'static str = "fills";
354
355    fn schema() -> Arc<arrow::datatypes::Schema> {
356        fill_schema()
357    }
358
359    fn encode(records: &[Self::Record]) -> Result<arrow::record_batch::RecordBatch> {
360        fills_to_batch(records)
361    }
362
363    fn partition_for(record: &Self::Record) -> NaiveDate {
364        record.timestamp.date_naive()
365    }
366}
367
368impl SinkEncoder for OrderEncoder {
369    type Record = Order;
370    const KIND: &'static str = "orders";
371
372    fn schema() -> Arc<arrow::datatypes::Schema> {
373        order_schema()
374    }
375
376    fn encode(records: &[Self::Record]) -> Result<arrow::record_batch::RecordBatch> {
377        orders_to_batch(records)
378    }
379
380    fn partition_for(record: &Self::Record) -> NaiveDate {
381        record.updated_at.date_naive()
382    }
383}
384
385impl SinkEncoder for SignalEncoder {
386    type Record = Signal;
387    const KIND: &'static str = "signals";
388
389    fn schema() -> Arc<arrow::datatypes::Schema> {
390        signal_schema()
391    }
392
393    fn encode(records: &[Self::Record]) -> Result<arrow::record_batch::RecordBatch> {
394        signals_to_batch(records)
395    }
396
397    fn partition_for(record: &Self::Record) -> NaiveDate {
398        record.generated_at.date_naive()
399    }
400}
401
402struct DataSink<E: SinkEncoder> {
403    dir: PathBuf,
404    buffer: Vec<E::Record>,
405    writer: Option<ActiveWriter>,
406    partition: Option<NaiveDate>,
407    max_buffered_rows: usize,
408    max_rows_per_file: usize,
409    flush_interval: Duration,
410    last_flush: Instant,
411    properties: Arc<WriterProperties>,
412    file_seq: u64,
413    schema: Arc<arrow::datatypes::Schema>,
414    _marker: PhantomData<E>,
415}
416
417impl<E: SinkEncoder> DataSink<E> {
418    fn new(
419        dir: PathBuf,
420        max_buffered_rows: usize,
421        max_rows_per_file: usize,
422        flush_interval: Duration,
423        properties: Arc<WriterProperties>,
424    ) -> Self {
425        Self {
426            dir,
427            buffer: Vec::with_capacity(max_buffered_rows.max(1)),
428            writer: None,
429            partition: None,
430            max_buffered_rows: max_buffered_rows.max(1),
431            max_rows_per_file: max_rows_per_file.max(1),
432            flush_interval,
433            last_flush: Instant::now(),
434            properties,
435            file_seq: 0,
436            schema: E::schema(),
437            _marker: PhantomData,
438        }
439    }
440
441    async fn push(&mut self, record: E::Record) -> Result<()> {
442        let partition = E::partition_for(&record);
443        if self.partition != Some(partition) {
444            self.flush().await?;
445            self.close_writer().await?;
446            self.partition = Some(partition);
447        }
448        self.buffer.push(record);
449        if self.buffer.len() >= self.max_buffered_rows {
450            self.flush().await?;
451        }
452        Ok(())
453    }
454
455    async fn maybe_flush_due_time(&mut self) -> Result<()> {
456        if self.buffer.is_empty() {
457            return Ok(());
458        }
459        if self.last_flush.elapsed() >= self.flush_interval {
460            self.flush().await?;
461        }
462        Ok(())
463    }
464
465    async fn flush(&mut self) -> Result<()> {
466        if self.buffer.is_empty() {
467            return Ok(());
468        }
469        if self.partition.is_none() {
470            if let Some(first) = self.buffer.first() {
471                self.partition = Some(E::partition_for(first));
472            }
473        }
474        let partition = self
475            .partition
476            .ok_or_else(|| anyhow!("buffer partition undefined"))?;
477
478        self.ensure_writer(partition).await?;
479        let rows = std::mem::take(&mut self.buffer);
480        if rows.is_empty() {
481            return Ok(());
482        }
483        let row_count = rows.len();
484        let batch = match E::encode(&rows) {
485            Ok(batch) => batch,
486            Err(err) => {
487                warn!(
488                    kind = E::KIND,
489                    error = %err,
490                    dropped = row_count,
491                    "failed to encode flight recorder batch"
492                );
493                self.last_flush = Instant::now();
494                return Ok(());
495            }
496        };
497
498        if batch.num_rows() == 0 {
499            return Ok(());
500        }
501
502        if let Some(writer) = &mut self.writer {
503            writer.write(&batch).await?;
504            if writer.rows_written >= self.max_rows_per_file {
505                if let Some(writer) = self.writer.take() {
506                    writer.finish().await?;
507                }
508            }
509        }
510        self.last_flush = Instant::now();
511        Ok(())
512    }
513
514    async fn ensure_writer(&mut self, partition: NaiveDate) -> Result<()> {
515        if self.writer.is_some() {
516            return Ok(());
517        }
518        let date_dir = self.dir.join(partition.format("%Y-%m-%d").to_string());
519        fs::create_dir_all(&date_dir)
520            .await
521            .with_context(|| format!("failed to create {}", date_dir.display()))?;
522        let timestamp = Utc::now().format("%Y%m%dT%H%M%S");
523        let file_name = format!("{}-{}-{:04}.parquet", E::KIND, timestamp, self.file_seq);
524        self.file_seq = self.file_seq.wrapping_add(1);
525        let path = date_dir.join(file_name);
526        let file = File::create(&path)
527            .await
528            .with_context(|| format!("failed to create {}", path.display()))?;
529        let writer = AsyncArrowWriter::try_new(
530            file,
531            self.schema.clone(),
532            Some(self.properties.as_ref().clone()),
533        )?;
534        debug!(kind = E::KIND, path = %path.display(), "opened flight recorder file");
535        self.writer = Some(ActiveWriter {
536            writer,
537            rows_written: 0,
538            path,
539        });
540        Ok(())
541    }
542
543    async fn close_writer(&mut self) -> Result<()> {
544        if let Some(writer) = self.writer.take() {
545            writer.finish().await?;
546        }
547        Ok(())
548    }
549
550    async fn shutdown(&mut self) -> Result<()> {
551        self.flush().await?;
552        self.close_writer().await?;
553        Ok(())
554    }
555}
556
557struct ActiveWriter {
558    writer: AsyncArrowWriter<File>,
559    rows_written: usize,
560    path: PathBuf,
561}
562
563impl ActiveWriter {
564    async fn write(&mut self, batch: &arrow::record_batch::RecordBatch) -> Result<()> {
565        self.writer.write(batch).await?;
566        self.rows_written += batch.num_rows();
567        Ok(())
568    }
569
570    async fn finish(mut self) -> Result<()> {
571        self.writer.finish().await?;
572        debug!(path = %self.path.display(), "closed flight recorder file");
573        Ok(())
574    }
575}
576
577impl SinkEncoder for OrderBookEncoder {
578    type Record = OrderBook;
579    const KIND: &'static str = "order_books";
580
581    fn schema() -> Arc<arrow::datatypes::Schema> {
582        order_book_schema()
583    }
584
585    fn encode(records: &[Self::Record]) -> Result<arrow::record_batch::RecordBatch> {
586        order_books_to_batch(records)
587    }
588
589    fn partition_for(record: &Self::Record) -> NaiveDate {
590        record.timestamp.date_naive()
591    }
592}
593
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{
        Array, Decimal128Array, Float64Array, StringArray, TimestampNanosecondArray,
    };
    use arrow::datatypes::DataType;
    use arrow::record_batch::RecordBatch;
    use chrono::Duration as ChronoDuration;
    use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
    use rust_decimal::Decimal;
    use tempfile::tempdir;
    use tesser_core::{ExecutionHint, Signal, SignalKind};

    /// Builds a signal with every optional field that the assertions below look at.
    fn build_signal() -> Signal {
        let mut signal = Signal::new("BTCUSDT", SignalKind::EnterLong, 0.82);
        signal.generated_at = chrono::Utc::now();
        signal.stop_loss = Some(Decimal::new(99_500, 2));
        signal.take_profit = Some(Decimal::new(101_500, 2));
        signal.note = Some("trend-follow".into());
        signal.execution_hint = Some(ExecutionHint::Twap {
            duration: ChronoDuration::minutes(5),
        });
        signal
    }

    /// Reads the first record batch of the parquet file at `path`.
    fn read_signal_batch(path: &Path) -> RecordBatch {
        let file = std::fs::File::open(path).expect("open parquet file");
        let mut reader = ParquetRecordBatchReaderBuilder::try_new(file)
            .expect("build batch reader")
            .build()
            .expect("build reader");
        reader.next().expect("batch result").expect("batch")
    }

    /// Returns the first `.parquet` file found in any partition directory of the signal sink.
    fn find_signal_file(root: &Path) -> PathBuf {
        let signals_dir = root.join(SignalEncoder::KIND);
        let partitions = std::fs::read_dir(&signals_dir).expect("read signal dir");
        for partition in partitions {
            let partition_path = partition.expect("entry").path();
            if !partition_path.is_dir() {
                continue;
            }
            let files = std::fs::read_dir(&partition_path).expect("read partition dir");
            for file in files {
                let candidate = file.expect("file entry").path();
                let is_parquet = candidate
                    .extension()
                    .map(|ext| ext == "parquet")
                    .unwrap_or(false);
                if is_parquet {
                    return candidate;
                }
            }
        }
        panic!("no signal parquet files found in {}", signals_dir.display());
    }

    /// Converts the raw decimal at `idx` back into a `Decimal` using the column's scale.
    fn decimal_from_array(arr: &Decimal128Array, idx: usize) -> Decimal {
        let scale = match arr.data_type() {
            DataType::Decimal128(_, scale) => (*scale) as u32,
            _ => panic!("unexpected data type"),
        };
        Decimal::from_i128_with_scale(arr.value(idx), scale)
    }

    /// Looks up the column called `name` and downcasts it to the concrete array type `A`.
    fn typed_column<'a, A: Array + 'static>(batch: &'a RecordBatch, name: &str) -> &'a A {
        let index = batch.schema().index_of(name).unwrap();
        batch.column(index).as_any().downcast_ref::<A>().unwrap()
    }

    /// Records one signal end to end and checks the columns of the resulting parquet file.
    #[tokio::test]
    async fn writes_signal_batches() {
        let signal = build_signal();
        let temp = tempdir().expect("tempdir");
        let config = RecorderConfig {
            root: temp.path().to_path_buf(),
            max_buffered_rows: 1,
            flush_interval: Duration::from_millis(20),
            max_rows_per_file: 16,
            channel_capacity: 4,
        };
        let recorder = ParquetRecorder::spawn(config)
            .await
            .expect("spawn recorder");
        let handle = recorder.handle();
        handle.record_signal(signal.clone());
        tokio::time::sleep(Duration::from_millis(50)).await;
        // The handle is released before shutdown so no sender outlives the recorder.
        drop(handle);
        recorder.shutdown().await.expect("shutdown recorder");

        let file_path = find_signal_file(temp.path());
        let batch = read_signal_batch(&file_path);
        assert_eq!(batch.num_rows(), 1);

        let ids = typed_column::<StringArray>(&batch, "id");
        assert_eq!(ids.value(0), signal.id.to_string());

        let kinds = typed_column::<StringArray>(&batch, "kind");
        assert_eq!(kinds.value(0), "enter_long");

        let confidences = typed_column::<Float64Array>(&batch, "confidence");
        assert!((confidences.value(0) - signal.confidence).abs() < f64::EPSILON);

        let stop_losses = typed_column::<Decimal128Array>(&batch, "stop_loss");
        assert_eq!(
            decimal_from_array(stop_losses, 0),
            signal.stop_loss.unwrap()
        );

        let generated = typed_column::<TimestampNanosecondArray>(&batch, "generated_at");
        let nanos = generated.value(0);
        let secs = nanos / 1_000_000_000;
        let rem = (nanos % 1_000_000_000) as u32;
        let recorded_ts = chrono::DateTime::<chrono::Utc>::from_timestamp(secs, rem).unwrap();
        assert_eq!(recorded_ts.timestamp(), signal.generated_at.timestamp());

        let metadata = typed_column::<StringArray>(&batch, "metadata");
        let meta_value: serde_json::Value =
            serde_json::from_str(metadata.value(0)).expect("metadata json");
        assert_eq!(meta_value["note"], signal.note.as_ref().unwrap().as_str());
    }
}