nautilus_serialization/arrow/
trade.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{collections::HashMap, str::FromStr, sync::Arc};
17
18use arrow::{
19    array::{
20        FixedSizeBinaryArray, FixedSizeBinaryBuilder, StringArray, StringBuilder, StringViewArray,
21        UInt8Array, UInt64Array,
22    },
23    datatypes::{DataType, Field, Schema},
24    error::ArrowError,
25    record_batch::RecordBatch,
26};
27use nautilus_model::{
28    data::TradeTick,
29    enums::AggressorSide,
30    identifiers::{InstrumentId, TradeId},
31    types::fixed::PRECISION_BYTES,
32};
33
34use super::{
35    DecodeDataFromRecordBatch, EncodingError, KEY_INSTRUMENT_ID, KEY_PRICE_PRECISION,
36    KEY_SIZE_PRECISION, decode_price, decode_quantity, extract_column,
37};
38use crate::arrow::{ArrowSchemaProvider, Data, DecodeFromRecordBatch, EncodeToRecordBatch};
39
40impl ArrowSchemaProvider for TradeTick {
41    fn get_schema(metadata: Option<HashMap<String, String>>) -> Schema {
42        let fields = vec![
43            Field::new("price", DataType::FixedSizeBinary(PRECISION_BYTES), false),
44            Field::new("size", DataType::FixedSizeBinary(PRECISION_BYTES), false),
45            Field::new("aggressor_side", DataType::UInt8, false),
46            Field::new("trade_id", DataType::Utf8, false),
47            Field::new("ts_event", DataType::UInt64, false),
48            Field::new("ts_init", DataType::UInt64, false),
49        ];
50
51        match metadata {
52            Some(metadata) => Schema::new_with_metadata(fields, metadata),
53            None => Schema::new(fields),
54        }
55    }
56}
57
58fn parse_metadata(
59    metadata: &HashMap<String, String>,
60) -> Result<(InstrumentId, u8, u8), EncodingError> {
61    let instrument_id_str = metadata
62        .get(KEY_INSTRUMENT_ID)
63        .ok_or_else(|| EncodingError::MissingMetadata(KEY_INSTRUMENT_ID))?;
64    let instrument_id = InstrumentId::from_str(instrument_id_str)
65        .map_err(|e| EncodingError::ParseError(KEY_INSTRUMENT_ID, e.to_string()))?;
66
67    let price_precision = metadata
68        .get(KEY_PRICE_PRECISION)
69        .ok_or_else(|| EncodingError::MissingMetadata(KEY_PRICE_PRECISION))?
70        .parse::<u8>()
71        .map_err(|e| EncodingError::ParseError(KEY_PRICE_PRECISION, e.to_string()))?;
72
73    let size_precision = metadata
74        .get(KEY_SIZE_PRECISION)
75        .ok_or_else(|| EncodingError::MissingMetadata(KEY_SIZE_PRECISION))?
76        .parse::<u8>()
77        .map_err(|e| EncodingError::ParseError(KEY_SIZE_PRECISION, e.to_string()))?;
78
79    Ok((instrument_id, price_precision, size_precision))
80}
81
82impl EncodeToRecordBatch for TradeTick {
83    fn encode_batch(
84        metadata: &HashMap<String, String>,
85        data: &[Self],
86    ) -> Result<RecordBatch, ArrowError> {
87        let mut price_builder = FixedSizeBinaryBuilder::with_capacity(data.len(), PRECISION_BYTES);
88        let mut size_builder = FixedSizeBinaryBuilder::with_capacity(data.len(), PRECISION_BYTES);
89
90        let mut aggressor_side_builder = UInt8Array::builder(data.len());
91        let mut trade_id_builder = StringBuilder::new();
92        let mut ts_event_builder = UInt64Array::builder(data.len());
93        let mut ts_init_builder = UInt64Array::builder(data.len());
94
95        for tick in data {
96            price_builder
97                .append_value(tick.price.raw.to_le_bytes())
98                .unwrap();
99            size_builder
100                .append_value(tick.size.raw.to_le_bytes())
101                .unwrap();
102            aggressor_side_builder.append_value(tick.aggressor_side as u8);
103            trade_id_builder.append_value(tick.trade_id.to_string());
104            ts_event_builder.append_value(tick.ts_event.as_u64());
105            ts_init_builder.append_value(tick.ts_init.as_u64());
106        }
107
108        let price_array = Arc::new(price_builder.finish());
109        let size_array = Arc::new(size_builder.finish());
110        let aggressor_side_array = Arc::new(aggressor_side_builder.finish());
111        let trade_id_array = Arc::new(trade_id_builder.finish());
112        let ts_event_array = Arc::new(ts_event_builder.finish());
113        let ts_init_array = Arc::new(ts_init_builder.finish());
114
115        RecordBatch::try_new(
116            Self::get_schema(Some(metadata.clone())).into(),
117            vec![
118                price_array,
119                size_array,
120                aggressor_side_array,
121                trade_id_array,
122                ts_event_array,
123                ts_init_array,
124            ],
125        )
126    }
127
128    fn metadata(&self) -> HashMap<String, String> {
129        Self::get_metadata(
130            &self.instrument_id,
131            self.price.precision,
132            self.size.precision,
133        )
134    }
135}
136
137impl DecodeFromRecordBatch for TradeTick {
138    fn decode_batch(
139        metadata: &HashMap<String, String>,
140        record_batch: RecordBatch,
141    ) -> Result<Vec<Self>, EncodingError> {
142        let (instrument_id, price_precision, size_precision) = parse_metadata(metadata)?;
143        let cols = record_batch.columns();
144
145        let price_values = extract_column::<FixedSizeBinaryArray>(
146            cols,
147            "price",
148            0,
149            DataType::FixedSizeBinary(PRECISION_BYTES),
150        )?;
151
152        let size_values = extract_column::<FixedSizeBinaryArray>(
153            cols,
154            "size",
155            1,
156            DataType::FixedSizeBinary(PRECISION_BYTES),
157        )?;
158        let aggressor_side_values =
159            extract_column::<UInt8Array>(cols, "aggressor_side", 2, DataType::UInt8)?;
160        let ts_event_values = extract_column::<UInt64Array>(cols, "ts_event", 4, DataType::UInt64)?;
161        let ts_init_values = extract_column::<UInt64Array>(cols, "ts_init", 5, DataType::UInt64)?;
162
163        // Datafusion reads trade_ids as StringView
164        let trade_id_values: Vec<TradeId> = if record_batch
165            .schema()
166            .field_with_name("trade_id")?
167            .data_type()
168            == &DataType::Utf8View
169        {
170            extract_column::<StringViewArray>(cols, "trade_id", 3, DataType::Utf8View)?
171                .iter()
172                .enumerate()
173                .map(|(i, id)| {
174                    id.map(TradeId::from).ok_or_else(|| {
175                        EncodingError::ParseError("trade_id", format!("NULL value at row {i}"))
176                    })
177                })
178                .collect::<Result<Vec<_>, _>>()?
179        } else {
180            extract_column::<StringArray>(cols, "trade_id", 3, DataType::Utf8)?
181                .iter()
182                .enumerate()
183                .map(|(i, id)| {
184                    id.map(TradeId::from).ok_or_else(|| {
185                        EncodingError::ParseError("trade_id", format!("NULL value at row {i}"))
186                    })
187                })
188                .collect::<Result<Vec<_>, _>>()?
189        };
190
191        let result: Result<Vec<Self>, EncodingError> = (0..record_batch.num_rows())
192            .map(|i| {
193                let price = decode_price(price_values.value(i), price_precision, "price", i)?;
194                let size = decode_quantity(size_values.value(i), size_precision, "size", i)?;
195                let aggressor_side_value = aggressor_side_values.value(i);
196                let aggressor_side = AggressorSide::from_repr(aggressor_side_value as usize)
197                    .ok_or_else(|| {
198                        EncodingError::ParseError(
199                            stringify!(AggressorSide),
200                            format!("Invalid enum value, was {aggressor_side_value}"),
201                        )
202                    })?;
203                let trade_id = trade_id_values[i];
204                let ts_event = ts_event_values.value(i).into();
205                let ts_init = ts_init_values.value(i).into();
206
207                Ok(Self {
208                    instrument_id,
209                    price,
210                    size,
211                    aggressor_side,
212                    trade_id,
213                    ts_event,
214                    ts_init,
215                })
216            })
217            .collect();
218
219        result
220    }
221}
222
223impl DecodeDataFromRecordBatch for TradeTick {
224    fn decode_data_batch(
225        metadata: &HashMap<String, String>,
226        record_batch: RecordBatch,
227    ) -> Result<Vec<Data>, EncodingError> {
228        let ticks: Vec<Self> = Self::decode_batch(metadata, record_batch)?;
229        Ok(ticks.into_iter().map(Data::from).collect())
230    }
231}
232
233#[cfg(test)]
234mod tests {
235    use std::sync::Arc;
236
237    use arrow::{
238        array::{Array, FixedSizeBinaryArray, UInt8Array, UInt64Array},
239        record_batch::RecordBatch,
240    };
241    use nautilus_model::types::{
242        Price, Quantity, fixed::FIXED_SCALAR, price::PriceRaw, quantity::QuantityRaw,
243    };
244    use rstest::rstest;
245
246    use super::*;
247    use crate::arrow::{get_raw_price, get_raw_quantity};
248
249    #[rstest]
250    fn test_get_schema() {
251        let instrument_id = InstrumentId::from("AAPL.XNAS");
252        let metadata = TradeTick::get_metadata(&instrument_id, 2, 0);
253        let schema = TradeTick::get_schema(Some(metadata.clone()));
254
255        let mut expected_fields = Vec::with_capacity(6);
256
257        expected_fields.push(Field::new(
258            "price",
259            DataType::FixedSizeBinary(PRECISION_BYTES),
260            false,
261        ));
262
263        expected_fields.extend(vec![
264            Field::new("size", DataType::FixedSizeBinary(PRECISION_BYTES), false),
265            Field::new("aggressor_side", DataType::UInt8, false),
266            Field::new("trade_id", DataType::Utf8, false),
267            Field::new("ts_event", DataType::UInt64, false),
268            Field::new("ts_init", DataType::UInt64, false),
269        ]);
270
271        let expected_schema = Schema::new_with_metadata(expected_fields, metadata);
272        assert_eq!(schema, expected_schema);
273    }
274
275    #[rstest]
276    fn test_get_schema_map() {
277        let schema_map = TradeTick::get_schema_map();
278        let mut expected_map = HashMap::new();
279
280        let precision_bytes = format!("FixedSizeBinary({PRECISION_BYTES})");
281        expected_map.insert("price".to_string(), precision_bytes.clone());
282        expected_map.insert("size".to_string(), precision_bytes);
283        expected_map.insert("aggressor_side".to_string(), "UInt8".to_string());
284        expected_map.insert("trade_id".to_string(), "Utf8".to_string());
285        expected_map.insert("ts_event".to_string(), "UInt64".to_string());
286        expected_map.insert("ts_init".to_string(), "UInt64".to_string());
287        assert_eq!(schema_map, expected_map);
288    }
289
290    #[rstest]
291    fn test_encode_trade_tick() {
292        let instrument_id = InstrumentId::from("AAPL.XNAS");
293        let metadata = TradeTick::get_metadata(&instrument_id, 2, 0);
294
295        let tick1 = TradeTick {
296            instrument_id,
297            price: Price::from("100.10"),
298            size: Quantity::from(1000),
299            aggressor_side: AggressorSide::Buyer,
300            trade_id: TradeId::new("1"),
301            ts_event: 1.into(),
302            ts_init: 3.into(),
303        };
304
305        let tick2 = TradeTick {
306            instrument_id,
307            price: Price::from("100.50"),
308            size: Quantity::from(500),
309            aggressor_side: AggressorSide::Seller,
310            trade_id: TradeId::new("2"),
311            ts_event: 2.into(),
312            ts_init: 4.into(),
313        };
314
315        let data = vec![tick1, tick2];
316        let record_batch = TradeTick::encode_batch(&metadata, &data).unwrap();
317        let columns = record_batch.columns();
318
319        let price_values = columns[0]
320            .as_any()
321            .downcast_ref::<FixedSizeBinaryArray>()
322            .unwrap();
323        assert_eq!(
324            get_raw_price(price_values.value(0)),
325            (100.10 * FIXED_SCALAR) as PriceRaw
326        );
327        assert_eq!(
328            get_raw_price(price_values.value(1)),
329            (100.50 * FIXED_SCALAR) as PriceRaw
330        );
331
332        let size_values = columns[1]
333            .as_any()
334            .downcast_ref::<FixedSizeBinaryArray>()
335            .unwrap();
336        assert_eq!(
337            get_raw_quantity(size_values.value(0)),
338            (1000.0 * FIXED_SCALAR) as QuantityRaw
339        );
340        assert_eq!(
341            get_raw_quantity(size_values.value(1)),
342            (500.0 * FIXED_SCALAR) as QuantityRaw
343        );
344
345        let aggressor_side_values = columns[2].as_any().downcast_ref::<UInt8Array>().unwrap();
346        let trade_id_values = columns[3].as_any().downcast_ref::<StringArray>().unwrap();
347        let ts_event_values = columns[4].as_any().downcast_ref::<UInt64Array>().unwrap();
348        let ts_init_values = columns[5].as_any().downcast_ref::<UInt64Array>().unwrap();
349
350        assert_eq!(columns.len(), 6);
351        assert_eq!(size_values.len(), 2);
352        assert_eq!(
353            get_raw_quantity(size_values.value(0)),
354            (1000.0 * FIXED_SCALAR) as QuantityRaw
355        );
356        assert_eq!(
357            get_raw_quantity(size_values.value(1)),
358            (500.0 * FIXED_SCALAR) as QuantityRaw
359        );
360        assert_eq!(aggressor_side_values.len(), 2);
361        assert_eq!(aggressor_side_values.value(0), 1);
362        assert_eq!(aggressor_side_values.value(1), 2);
363        assert_eq!(trade_id_values.len(), 2);
364        assert_eq!(trade_id_values.value(0), "1");
365        assert_eq!(trade_id_values.value(1), "2");
366        assert_eq!(ts_event_values.len(), 2);
367        assert_eq!(ts_event_values.value(0), 1);
368        assert_eq!(ts_event_values.value(1), 2);
369        assert_eq!(ts_init_values.len(), 2);
370        assert_eq!(ts_init_values.value(0), 3);
371        assert_eq!(ts_init_values.value(1), 4);
372    }
373
374    #[rstest]
375    fn test_decode_batch() {
376        let instrument_id = InstrumentId::from("AAPL.XNAS");
377        let metadata = TradeTick::get_metadata(&instrument_id, 2, 0);
378
379        let raw_price1 = (100.00 * FIXED_SCALAR) as PriceRaw;
380        let raw_price2 = (101.00 * FIXED_SCALAR) as PriceRaw;
381        let price =
382            FixedSizeBinaryArray::from(vec![&raw_price1.to_le_bytes(), &raw_price2.to_le_bytes()]);
383
384        let size = FixedSizeBinaryArray::from(vec![
385            &((1000.0 * FIXED_SCALAR) as QuantityRaw).to_le_bytes(),
386            &((900.0 * FIXED_SCALAR) as QuantityRaw).to_le_bytes(),
387        ]);
388        let aggressor_side = UInt8Array::from(vec![0, 1]); // 0 for BUY, 1 for SELL
389        let trade_id = StringArray::from(vec!["1", "2"]);
390        let ts_event = UInt64Array::from(vec![1, 2]);
391        let ts_init = UInt64Array::from(vec![3, 4]);
392
393        let record_batch = RecordBatch::try_new(
394            TradeTick::get_schema(Some(metadata.clone())).into(),
395            vec![
396                Arc::new(price),
397                Arc::new(size),
398                Arc::new(aggressor_side),
399                Arc::new(trade_id),
400                Arc::new(ts_event),
401                Arc::new(ts_init),
402            ],
403        )
404        .unwrap();
405
406        let decoded_data = TradeTick::decode_batch(&metadata, record_batch).unwrap();
407        assert_eq!(decoded_data.len(), 2);
408        assert_eq!(decoded_data[0].price, Price::from_raw(raw_price1, 2));
409        assert_eq!(decoded_data[1].price, Price::from_raw(raw_price2, 2));
410    }
411
412    #[rstest]
413    fn test_decode_batch_null_trade_id_returns_error() {
414        use arrow::datatypes::Field;
415
416        let instrument_id = InstrumentId::from("AAPL.XNAS");
417        let metadata = TradeTick::get_metadata(&instrument_id, 2, 0);
418
419        let raw_price = (100.00 * FIXED_SCALAR) as PriceRaw;
420        let price = FixedSizeBinaryArray::from(vec![&raw_price.to_le_bytes()]);
421        let size = FixedSizeBinaryArray::from(vec![
422            &((1000.0 * FIXED_SCALAR) as QuantityRaw).to_le_bytes(),
423        ]);
424        let aggressor_side = UInt8Array::from(vec![0]);
425
426        let trade_id: StringArray = vec![None::<&str>].into();
427        let ts_event = UInt64Array::from(vec![1]);
428        let ts_init = UInt64Array::from(vec![2]);
429
430        // Create schema with nullable trade_id to simulate external data source
431        let fields = vec![
432            Field::new("price", DataType::FixedSizeBinary(PRECISION_BYTES), false),
433            Field::new("size", DataType::FixedSizeBinary(PRECISION_BYTES), false),
434            Field::new("aggressor_side", DataType::UInt8, false),
435            Field::new("trade_id", DataType::Utf8, true), // nullable
436            Field::new("ts_event", DataType::UInt64, false),
437            Field::new("ts_init", DataType::UInt64, false),
438        ];
439        let schema = Schema::new_with_metadata(fields, metadata.clone());
440
441        let record_batch = RecordBatch::try_new(
442            schema.into(),
443            vec![
444                Arc::new(price),
445                Arc::new(size),
446                Arc::new(aggressor_side),
447                Arc::new(trade_id),
448                Arc::new(ts_event),
449                Arc::new(ts_init),
450            ],
451        )
452        .unwrap();
453
454        let result = TradeTick::decode_batch(&metadata, record_batch);
455        assert!(result.is_err());
456        let err = result.unwrap_err();
457        assert!(
458            err.to_string().contains("NULL value at row 0"),
459            "Expected NULL error, got: {err}"
460        );
461    }
462
463    #[rstest]
464    fn test_decode_batch_invalid_price_returns_error() {
465        let instrument_id = InstrumentId::from("AAPL.XNAS");
466        let metadata = TradeTick::get_metadata(&instrument_id, 2, 0);
467
468        let invalid_price: PriceRaw = PriceRaw::MAX - 1000;
469        let price = FixedSizeBinaryArray::from(vec![&invalid_price.to_le_bytes()]);
470        let size = FixedSizeBinaryArray::from(vec![
471            &((1000.0 * FIXED_SCALAR) as QuantityRaw).to_le_bytes(),
472        ]);
473        let aggressor_side = UInt8Array::from(vec![0]);
474        let trade_id = StringArray::from(vec!["1"]);
475        let ts_event = UInt64Array::from(vec![1]);
476        let ts_init = UInt64Array::from(vec![2]);
477
478        let record_batch = RecordBatch::try_new(
479            TradeTick::get_schema(Some(metadata.clone())).into(),
480            vec![
481                Arc::new(price),
482                Arc::new(size),
483                Arc::new(aggressor_side),
484                Arc::new(trade_id),
485                Arc::new(ts_event),
486                Arc::new(ts_init),
487            ],
488        )
489        .unwrap();
490
491        let result = TradeTick::decode_batch(&metadata, record_batch);
492        assert!(result.is_err());
493        let err = result.unwrap_err();
494        assert!(
495            err.to_string().contains("price") && err.to_string().contains("row 0"),
496            "Expected price error at row 0, got: {err}"
497        );
498    }
499
500    #[rstest]
501    fn test_decode_batch_invalid_size_returns_error() {
502        use nautilus_model::types::quantity::QUANTITY_RAW_MAX;
503
504        let instrument_id = InstrumentId::from("AAPL.XNAS");
505        let metadata = TradeTick::get_metadata(&instrument_id, 2, 0);
506
507        let raw_price = (100.00 * FIXED_SCALAR) as PriceRaw;
508        let price = FixedSizeBinaryArray::from(vec![&raw_price.to_le_bytes()]);
509
510        let invalid_size = QUANTITY_RAW_MAX + 1;
511        let size = FixedSizeBinaryArray::from(vec![&invalid_size.to_le_bytes()]);
512        let aggressor_side = UInt8Array::from(vec![0]);
513        let trade_id = StringArray::from(vec!["1"]);
514        let ts_event = UInt64Array::from(vec![1]);
515        let ts_init = UInt64Array::from(vec![2]);
516
517        let record_batch = RecordBatch::try_new(
518            TradeTick::get_schema(Some(metadata.clone())).into(),
519            vec![
520                Arc::new(price),
521                Arc::new(size),
522                Arc::new(aggressor_side),
523                Arc::new(trade_id),
524                Arc::new(ts_event),
525                Arc::new(ts_init),
526            ],
527        )
528        .unwrap();
529
530        let result = TradeTick::decode_batch(&metadata, record_batch);
531        assert!(result.is_err());
532        let err = result.unwrap_err();
533        assert!(
534            err.to_string().contains("size") && err.to_string().contains("row 0"),
535            "Expected size error at row 0, got: {err}"
536        );
537    }
538
539    #[rstest]
540    fn test_decode_batch_invalid_aggressor_side_returns_error() {
541        let instrument_id = InstrumentId::from("AAPL.XNAS");
542        let metadata = TradeTick::get_metadata(&instrument_id, 2, 0);
543
544        let raw_price = (100.00 * FIXED_SCALAR) as PriceRaw;
545        let price = FixedSizeBinaryArray::from(vec![&raw_price.to_le_bytes()]);
546        let size = FixedSizeBinaryArray::from(vec![
547            &((1000.0 * FIXED_SCALAR) as QuantityRaw).to_le_bytes(),
548        ]);
549
550        let aggressor_side = UInt8Array::from(vec![99]);
551        let trade_id = StringArray::from(vec!["1"]);
552        let ts_event = UInt64Array::from(vec![1]);
553        let ts_init = UInt64Array::from(vec![2]);
554
555        let record_batch = RecordBatch::try_new(
556            TradeTick::get_schema(Some(metadata.clone())).into(),
557            vec![
558                Arc::new(price),
559                Arc::new(size),
560                Arc::new(aggressor_side),
561                Arc::new(trade_id),
562                Arc::new(ts_event),
563                Arc::new(ts_init),
564            ],
565        )
566        .unwrap();
567
568        let result = TradeTick::decode_batch(&metadata, record_batch);
569        assert!(result.is_err());
570        let err = result.unwrap_err();
571        assert!(
572            err.to_string().contains("AggressorSide"),
573            "Expected AggressorSide error, got: {err}"
574        );
575    }
576
577    #[rstest]
578    fn test_decode_batch_missing_instrument_id_returns_error() {
579        let instrument_id = InstrumentId::from("AAPL.XNAS");
580        let mut metadata = TradeTick::get_metadata(&instrument_id, 2, 0);
581        metadata.remove(KEY_INSTRUMENT_ID);
582
583        let raw_price = (100.00 * FIXED_SCALAR) as PriceRaw;
584        let price = FixedSizeBinaryArray::from(vec![&raw_price.to_le_bytes()]);
585        let size = FixedSizeBinaryArray::from(vec![
586            &((1000.0 * FIXED_SCALAR) as QuantityRaw).to_le_bytes(),
587        ]);
588        let aggressor_side = UInt8Array::from(vec![0]);
589        let trade_id = StringArray::from(vec!["1"]);
590        let ts_event = UInt64Array::from(vec![1]);
591        let ts_init = UInt64Array::from(vec![2]);
592
593        let record_batch = RecordBatch::try_new(
594            TradeTick::get_schema(Some(metadata.clone())).into(),
595            vec![
596                Arc::new(price),
597                Arc::new(size),
598                Arc::new(aggressor_side),
599                Arc::new(trade_id),
600                Arc::new(ts_event),
601                Arc::new(ts_init),
602            ],
603        )
604        .unwrap();
605
606        let result = TradeTick::decode_batch(&metadata, record_batch);
607        assert!(result.is_err());
608        let err = result.unwrap_err();
609        assert!(
610            err.to_string().contains("instrument_id"),
611            "Expected missing instrument_id error, got: {err}"
612        );
613    }
614
615    #[rstest]
616    fn test_decode_batch_missing_price_precision_returns_error() {
617        let instrument_id = InstrumentId::from("AAPL.XNAS");
618        let mut metadata = TradeTick::get_metadata(&instrument_id, 2, 0);
619        metadata.remove(KEY_PRICE_PRECISION);
620
621        let raw_price = (100.00 * FIXED_SCALAR) as PriceRaw;
622        let price = FixedSizeBinaryArray::from(vec![&raw_price.to_le_bytes()]);
623        let size = FixedSizeBinaryArray::from(vec![
624            &((1000.0 * FIXED_SCALAR) as QuantityRaw).to_le_bytes(),
625        ]);
626        let aggressor_side = UInt8Array::from(vec![0]);
627        let trade_id = StringArray::from(vec!["1"]);
628        let ts_event = UInt64Array::from(vec![1]);
629        let ts_init = UInt64Array::from(vec![2]);
630
631        let record_batch = RecordBatch::try_new(
632            TradeTick::get_schema(Some(metadata.clone())).into(),
633            vec![
634                Arc::new(price),
635                Arc::new(size),
636                Arc::new(aggressor_side),
637                Arc::new(trade_id),
638                Arc::new(ts_event),
639                Arc::new(ts_init),
640            ],
641        )
642        .unwrap();
643
644        let result = TradeTick::decode_batch(&metadata, record_batch);
645        assert!(result.is_err());
646        let err = result.unwrap_err();
647        assert!(
648            err.to_string().contains("price_precision"),
649            "Expected missing price_precision error, got: {err}"
650        );
651    }
652
653    #[rstest]
654    fn test_encode_decode_round_trip() {
655        let instrument_id = InstrumentId::from("AAPL.XNAS");
656        let metadata = TradeTick::get_metadata(&instrument_id, 2, 0);
657
658        let tick1 = TradeTick {
659            instrument_id,
660            price: Price::from("100.10"),
661            size: Quantity::from(1000),
662            aggressor_side: AggressorSide::Buyer,
663            trade_id: TradeId::new("trade-123"),
664            ts_event: 1_000_000_000.into(),
665            ts_init: 1_000_000_001.into(),
666        };
667
668        let tick2 = TradeTick {
669            instrument_id,
670            price: Price::from("100.50"),
671            size: Quantity::from(500),
672            aggressor_side: AggressorSide::Seller,
673            trade_id: TradeId::new("trade-456"),
674            ts_event: 2_000_000_000.into(),
675            ts_init: 2_000_000_001.into(),
676        };
677
678        let original = vec![tick1, tick2];
679        let record_batch = TradeTick::encode_batch(&metadata, &original).unwrap();
680        let decoded = TradeTick::decode_batch(&metadata, record_batch).unwrap();
681
682        assert_eq!(decoded.len(), original.len());
683        for (orig, dec) in original.iter().zip(decoded.iter()) {
684            assert_eq!(dec.instrument_id, orig.instrument_id);
685            assert_eq!(dec.price, orig.price);
686            assert_eq!(dec.size, orig.size);
687            assert_eq!(dec.aggressor_side, orig.aggressor_side);
688            assert_eq!(dec.trade_id, orig.trade_id);
689            assert_eq!(dec.ts_event, orig.ts_event);
690            assert_eq!(dec.ts_init, orig.ts_init);
691        }
692    }
693}