1use std::{collections::HashMap, str::FromStr, sync::Arc};
17
18use arrow::{
19 array::{
20 FixedSizeBinaryArray, FixedSizeBinaryBuilder, StringArray, StringBuilder, StringViewArray,
21 UInt8Array, UInt64Array,
22 },
23 datatypes::{DataType, Field, Schema},
24 error::ArrowError,
25 record_batch::RecordBatch,
26};
27use nautilus_model::{
28 data::TradeTick,
29 enums::AggressorSide,
30 identifiers::{InstrumentId, TradeId},
31 types::fixed::PRECISION_BYTES,
32};
33
34use super::{
35 DecodeDataFromRecordBatch, EncodingError, KEY_INSTRUMENT_ID, KEY_PRICE_PRECISION,
36 KEY_SIZE_PRECISION, decode_price, decode_quantity, extract_column,
37};
38use crate::arrow::{ArrowSchemaProvider, Data, DecodeFromRecordBatch, EncodeToRecordBatch};
39
40impl ArrowSchemaProvider for TradeTick {
41 fn get_schema(metadata: Option<HashMap<String, String>>) -> Schema {
42 let fields = vec![
43 Field::new("price", DataType::FixedSizeBinary(PRECISION_BYTES), false),
44 Field::new("size", DataType::FixedSizeBinary(PRECISION_BYTES), false),
45 Field::new("aggressor_side", DataType::UInt8, false),
46 Field::new("trade_id", DataType::Utf8, false),
47 Field::new("ts_event", DataType::UInt64, false),
48 Field::new("ts_init", DataType::UInt64, false),
49 ];
50
51 match metadata {
52 Some(metadata) => Schema::new_with_metadata(fields, metadata),
53 None => Schema::new(fields),
54 }
55 }
56}
57
58fn parse_metadata(
59 metadata: &HashMap<String, String>,
60) -> Result<(InstrumentId, u8, u8), EncodingError> {
61 let instrument_id_str = metadata
62 .get(KEY_INSTRUMENT_ID)
63 .ok_or_else(|| EncodingError::MissingMetadata(KEY_INSTRUMENT_ID))?;
64 let instrument_id = InstrumentId::from_str(instrument_id_str)
65 .map_err(|e| EncodingError::ParseError(KEY_INSTRUMENT_ID, e.to_string()))?;
66
67 let price_precision = metadata
68 .get(KEY_PRICE_PRECISION)
69 .ok_or_else(|| EncodingError::MissingMetadata(KEY_PRICE_PRECISION))?
70 .parse::<u8>()
71 .map_err(|e| EncodingError::ParseError(KEY_PRICE_PRECISION, e.to_string()))?;
72
73 let size_precision = metadata
74 .get(KEY_SIZE_PRECISION)
75 .ok_or_else(|| EncodingError::MissingMetadata(KEY_SIZE_PRECISION))?
76 .parse::<u8>()
77 .map_err(|e| EncodingError::ParseError(KEY_SIZE_PRECISION, e.to_string()))?;
78
79 Ok((instrument_id, price_precision, size_precision))
80}
81
82impl EncodeToRecordBatch for TradeTick {
83 fn encode_batch(
84 metadata: &HashMap<String, String>,
85 data: &[Self],
86 ) -> Result<RecordBatch, ArrowError> {
87 let mut price_builder = FixedSizeBinaryBuilder::with_capacity(data.len(), PRECISION_BYTES);
88 let mut size_builder = FixedSizeBinaryBuilder::with_capacity(data.len(), PRECISION_BYTES);
89
90 let mut aggressor_side_builder = UInt8Array::builder(data.len());
91 let mut trade_id_builder = StringBuilder::new();
92 let mut ts_event_builder = UInt64Array::builder(data.len());
93 let mut ts_init_builder = UInt64Array::builder(data.len());
94
95 for tick in data {
96 price_builder
97 .append_value(tick.price.raw.to_le_bytes())
98 .unwrap();
99 size_builder
100 .append_value(tick.size.raw.to_le_bytes())
101 .unwrap();
102 aggressor_side_builder.append_value(tick.aggressor_side as u8);
103 trade_id_builder.append_value(tick.trade_id.to_string());
104 ts_event_builder.append_value(tick.ts_event.as_u64());
105 ts_init_builder.append_value(tick.ts_init.as_u64());
106 }
107
108 let price_array = Arc::new(price_builder.finish());
109 let size_array = Arc::new(size_builder.finish());
110 let aggressor_side_array = Arc::new(aggressor_side_builder.finish());
111 let trade_id_array = Arc::new(trade_id_builder.finish());
112 let ts_event_array = Arc::new(ts_event_builder.finish());
113 let ts_init_array = Arc::new(ts_init_builder.finish());
114
115 RecordBatch::try_new(
116 Self::get_schema(Some(metadata.clone())).into(),
117 vec![
118 price_array,
119 size_array,
120 aggressor_side_array,
121 trade_id_array,
122 ts_event_array,
123 ts_init_array,
124 ],
125 )
126 }
127
128 fn metadata(&self) -> HashMap<String, String> {
129 Self::get_metadata(
130 &self.instrument_id,
131 self.price.precision,
132 self.size.precision,
133 )
134 }
135}
136
137impl DecodeFromRecordBatch for TradeTick {
138 fn decode_batch(
139 metadata: &HashMap<String, String>,
140 record_batch: RecordBatch,
141 ) -> Result<Vec<Self>, EncodingError> {
142 let (instrument_id, price_precision, size_precision) = parse_metadata(metadata)?;
143 let cols = record_batch.columns();
144
145 let price_values = extract_column::<FixedSizeBinaryArray>(
146 cols,
147 "price",
148 0,
149 DataType::FixedSizeBinary(PRECISION_BYTES),
150 )?;
151
152 let size_values = extract_column::<FixedSizeBinaryArray>(
153 cols,
154 "size",
155 1,
156 DataType::FixedSizeBinary(PRECISION_BYTES),
157 )?;
158 let aggressor_side_values =
159 extract_column::<UInt8Array>(cols, "aggressor_side", 2, DataType::UInt8)?;
160 let ts_event_values = extract_column::<UInt64Array>(cols, "ts_event", 4, DataType::UInt64)?;
161 let ts_init_values = extract_column::<UInt64Array>(cols, "ts_init", 5, DataType::UInt64)?;
162
163 let trade_id_values: Vec<TradeId> = if record_batch
165 .schema()
166 .field_with_name("trade_id")?
167 .data_type()
168 == &DataType::Utf8View
169 {
170 extract_column::<StringViewArray>(cols, "trade_id", 3, DataType::Utf8View)?
171 .iter()
172 .enumerate()
173 .map(|(i, id)| {
174 id.map(TradeId::from).ok_or_else(|| {
175 EncodingError::ParseError("trade_id", format!("NULL value at row {i}"))
176 })
177 })
178 .collect::<Result<Vec<_>, _>>()?
179 } else {
180 extract_column::<StringArray>(cols, "trade_id", 3, DataType::Utf8)?
181 .iter()
182 .enumerate()
183 .map(|(i, id)| {
184 id.map(TradeId::from).ok_or_else(|| {
185 EncodingError::ParseError("trade_id", format!("NULL value at row {i}"))
186 })
187 })
188 .collect::<Result<Vec<_>, _>>()?
189 };
190
191 let result: Result<Vec<Self>, EncodingError> = (0..record_batch.num_rows())
192 .map(|i| {
193 let price = decode_price(price_values.value(i), price_precision, "price", i)?;
194 let size = decode_quantity(size_values.value(i), size_precision, "size", i)?;
195 let aggressor_side_value = aggressor_side_values.value(i);
196 let aggressor_side = AggressorSide::from_repr(aggressor_side_value as usize)
197 .ok_or_else(|| {
198 EncodingError::ParseError(
199 stringify!(AggressorSide),
200 format!("Invalid enum value, was {aggressor_side_value}"),
201 )
202 })?;
203 let trade_id = trade_id_values[i];
204 let ts_event = ts_event_values.value(i).into();
205 let ts_init = ts_init_values.value(i).into();
206
207 Ok(Self {
208 instrument_id,
209 price,
210 size,
211 aggressor_side,
212 trade_id,
213 ts_event,
214 ts_init,
215 })
216 })
217 .collect();
218
219 result
220 }
221}
222
223impl DecodeDataFromRecordBatch for TradeTick {
224 fn decode_data_batch(
225 metadata: &HashMap<String, String>,
226 record_batch: RecordBatch,
227 ) -> Result<Vec<Data>, EncodingError> {
228 let ticks: Vec<Self> = Self::decode_batch(metadata, record_batch)?;
229 Ok(ticks.into_iter().map(Data::from).collect())
230 }
231}
232
233#[cfg(test)]
234mod tests {
235 use std::sync::Arc;
236
237 use arrow::{
238 array::{Array, FixedSizeBinaryArray, UInt8Array, UInt64Array},
239 record_batch::RecordBatch,
240 };
241 use nautilus_model::types::{
242 Price, Quantity, fixed::FIXED_SCALAR, price::PriceRaw, quantity::QuantityRaw,
243 };
244 use rstest::rstest;
245
246 use super::*;
247 use crate::arrow::{get_raw_price, get_raw_quantity};
248
249 #[rstest]
250 fn test_get_schema() {
251 let instrument_id = InstrumentId::from("AAPL.XNAS");
252 let metadata = TradeTick::get_metadata(&instrument_id, 2, 0);
253 let schema = TradeTick::get_schema(Some(metadata.clone()));
254
255 let mut expected_fields = Vec::with_capacity(6);
256
257 expected_fields.push(Field::new(
258 "price",
259 DataType::FixedSizeBinary(PRECISION_BYTES),
260 false,
261 ));
262
263 expected_fields.extend(vec![
264 Field::new("size", DataType::FixedSizeBinary(PRECISION_BYTES), false),
265 Field::new("aggressor_side", DataType::UInt8, false),
266 Field::new("trade_id", DataType::Utf8, false),
267 Field::new("ts_event", DataType::UInt64, false),
268 Field::new("ts_init", DataType::UInt64, false),
269 ]);
270
271 let expected_schema = Schema::new_with_metadata(expected_fields, metadata);
272 assert_eq!(schema, expected_schema);
273 }
274
275 #[rstest]
276 fn test_get_schema_map() {
277 let schema_map = TradeTick::get_schema_map();
278 let mut expected_map = HashMap::new();
279
280 let precision_bytes = format!("FixedSizeBinary({PRECISION_BYTES})");
281 expected_map.insert("price".to_string(), precision_bytes.clone());
282 expected_map.insert("size".to_string(), precision_bytes);
283 expected_map.insert("aggressor_side".to_string(), "UInt8".to_string());
284 expected_map.insert("trade_id".to_string(), "Utf8".to_string());
285 expected_map.insert("ts_event".to_string(), "UInt64".to_string());
286 expected_map.insert("ts_init".to_string(), "UInt64".to_string());
287 assert_eq!(schema_map, expected_map);
288 }
289
290 #[rstest]
291 fn test_encode_trade_tick() {
292 let instrument_id = InstrumentId::from("AAPL.XNAS");
293 let metadata = TradeTick::get_metadata(&instrument_id, 2, 0);
294
295 let tick1 = TradeTick {
296 instrument_id,
297 price: Price::from("100.10"),
298 size: Quantity::from(1000),
299 aggressor_side: AggressorSide::Buyer,
300 trade_id: TradeId::new("1"),
301 ts_event: 1.into(),
302 ts_init: 3.into(),
303 };
304
305 let tick2 = TradeTick {
306 instrument_id,
307 price: Price::from("100.50"),
308 size: Quantity::from(500),
309 aggressor_side: AggressorSide::Seller,
310 trade_id: TradeId::new("2"),
311 ts_event: 2.into(),
312 ts_init: 4.into(),
313 };
314
315 let data = vec![tick1, tick2];
316 let record_batch = TradeTick::encode_batch(&metadata, &data).unwrap();
317 let columns = record_batch.columns();
318
319 let price_values = columns[0]
320 .as_any()
321 .downcast_ref::<FixedSizeBinaryArray>()
322 .unwrap();
323 assert_eq!(
324 get_raw_price(price_values.value(0)),
325 (100.10 * FIXED_SCALAR) as PriceRaw
326 );
327 assert_eq!(
328 get_raw_price(price_values.value(1)),
329 (100.50 * FIXED_SCALAR) as PriceRaw
330 );
331
332 let size_values = columns[1]
333 .as_any()
334 .downcast_ref::<FixedSizeBinaryArray>()
335 .unwrap();
336 assert_eq!(
337 get_raw_quantity(size_values.value(0)),
338 (1000.0 * FIXED_SCALAR) as QuantityRaw
339 );
340 assert_eq!(
341 get_raw_quantity(size_values.value(1)),
342 (500.0 * FIXED_SCALAR) as QuantityRaw
343 );
344
345 let aggressor_side_values = columns[2].as_any().downcast_ref::<UInt8Array>().unwrap();
346 let trade_id_values = columns[3].as_any().downcast_ref::<StringArray>().unwrap();
347 let ts_event_values = columns[4].as_any().downcast_ref::<UInt64Array>().unwrap();
348 let ts_init_values = columns[5].as_any().downcast_ref::<UInt64Array>().unwrap();
349
350 assert_eq!(columns.len(), 6);
351 assert_eq!(size_values.len(), 2);
352 assert_eq!(
353 get_raw_quantity(size_values.value(0)),
354 (1000.0 * FIXED_SCALAR) as QuantityRaw
355 );
356 assert_eq!(
357 get_raw_quantity(size_values.value(1)),
358 (500.0 * FIXED_SCALAR) as QuantityRaw
359 );
360 assert_eq!(aggressor_side_values.len(), 2);
361 assert_eq!(aggressor_side_values.value(0), 1);
362 assert_eq!(aggressor_side_values.value(1), 2);
363 assert_eq!(trade_id_values.len(), 2);
364 assert_eq!(trade_id_values.value(0), "1");
365 assert_eq!(trade_id_values.value(1), "2");
366 assert_eq!(ts_event_values.len(), 2);
367 assert_eq!(ts_event_values.value(0), 1);
368 assert_eq!(ts_event_values.value(1), 2);
369 assert_eq!(ts_init_values.len(), 2);
370 assert_eq!(ts_init_values.value(0), 3);
371 assert_eq!(ts_init_values.value(1), 4);
372 }
373
374 #[rstest]
375 fn test_decode_batch() {
376 let instrument_id = InstrumentId::from("AAPL.XNAS");
377 let metadata = TradeTick::get_metadata(&instrument_id, 2, 0);
378
379 let raw_price1 = (100.00 * FIXED_SCALAR) as PriceRaw;
380 let raw_price2 = (101.00 * FIXED_SCALAR) as PriceRaw;
381 let price =
382 FixedSizeBinaryArray::from(vec![&raw_price1.to_le_bytes(), &raw_price2.to_le_bytes()]);
383
384 let size = FixedSizeBinaryArray::from(vec![
385 &((1000.0 * FIXED_SCALAR) as QuantityRaw).to_le_bytes(),
386 &((900.0 * FIXED_SCALAR) as QuantityRaw).to_le_bytes(),
387 ]);
388 let aggressor_side = UInt8Array::from(vec![0, 1]); let trade_id = StringArray::from(vec!["1", "2"]);
390 let ts_event = UInt64Array::from(vec![1, 2]);
391 let ts_init = UInt64Array::from(vec![3, 4]);
392
393 let record_batch = RecordBatch::try_new(
394 TradeTick::get_schema(Some(metadata.clone())).into(),
395 vec![
396 Arc::new(price),
397 Arc::new(size),
398 Arc::new(aggressor_side),
399 Arc::new(trade_id),
400 Arc::new(ts_event),
401 Arc::new(ts_init),
402 ],
403 )
404 .unwrap();
405
406 let decoded_data = TradeTick::decode_batch(&metadata, record_batch).unwrap();
407 assert_eq!(decoded_data.len(), 2);
408 assert_eq!(decoded_data[0].price, Price::from_raw(raw_price1, 2));
409 assert_eq!(decoded_data[1].price, Price::from_raw(raw_price2, 2));
410 }
411
412 #[rstest]
413 fn test_decode_batch_null_trade_id_returns_error() {
414 use arrow::datatypes::Field;
415
416 let instrument_id = InstrumentId::from("AAPL.XNAS");
417 let metadata = TradeTick::get_metadata(&instrument_id, 2, 0);
418
419 let raw_price = (100.00 * FIXED_SCALAR) as PriceRaw;
420 let price = FixedSizeBinaryArray::from(vec![&raw_price.to_le_bytes()]);
421 let size = FixedSizeBinaryArray::from(vec![
422 &((1000.0 * FIXED_SCALAR) as QuantityRaw).to_le_bytes(),
423 ]);
424 let aggressor_side = UInt8Array::from(vec![0]);
425
426 let trade_id: StringArray = vec![None::<&str>].into();
427 let ts_event = UInt64Array::from(vec![1]);
428 let ts_init = UInt64Array::from(vec![2]);
429
430 let fields = vec![
432 Field::new("price", DataType::FixedSizeBinary(PRECISION_BYTES), false),
433 Field::new("size", DataType::FixedSizeBinary(PRECISION_BYTES), false),
434 Field::new("aggressor_side", DataType::UInt8, false),
435 Field::new("trade_id", DataType::Utf8, true), Field::new("ts_event", DataType::UInt64, false),
437 Field::new("ts_init", DataType::UInt64, false),
438 ];
439 let schema = Schema::new_with_metadata(fields, metadata.clone());
440
441 let record_batch = RecordBatch::try_new(
442 schema.into(),
443 vec![
444 Arc::new(price),
445 Arc::new(size),
446 Arc::new(aggressor_side),
447 Arc::new(trade_id),
448 Arc::new(ts_event),
449 Arc::new(ts_init),
450 ],
451 )
452 .unwrap();
453
454 let result = TradeTick::decode_batch(&metadata, record_batch);
455 assert!(result.is_err());
456 let err = result.unwrap_err();
457 assert!(
458 err.to_string().contains("NULL value at row 0"),
459 "Expected NULL error, got: {err}"
460 );
461 }
462
463 #[rstest]
464 fn test_decode_batch_invalid_price_returns_error() {
465 let instrument_id = InstrumentId::from("AAPL.XNAS");
466 let metadata = TradeTick::get_metadata(&instrument_id, 2, 0);
467
468 let invalid_price: PriceRaw = PriceRaw::MAX - 1000;
469 let price = FixedSizeBinaryArray::from(vec![&invalid_price.to_le_bytes()]);
470 let size = FixedSizeBinaryArray::from(vec![
471 &((1000.0 * FIXED_SCALAR) as QuantityRaw).to_le_bytes(),
472 ]);
473 let aggressor_side = UInt8Array::from(vec![0]);
474 let trade_id = StringArray::from(vec!["1"]);
475 let ts_event = UInt64Array::from(vec![1]);
476 let ts_init = UInt64Array::from(vec![2]);
477
478 let record_batch = RecordBatch::try_new(
479 TradeTick::get_schema(Some(metadata.clone())).into(),
480 vec![
481 Arc::new(price),
482 Arc::new(size),
483 Arc::new(aggressor_side),
484 Arc::new(trade_id),
485 Arc::new(ts_event),
486 Arc::new(ts_init),
487 ],
488 )
489 .unwrap();
490
491 let result = TradeTick::decode_batch(&metadata, record_batch);
492 assert!(result.is_err());
493 let err = result.unwrap_err();
494 assert!(
495 err.to_string().contains("price") && err.to_string().contains("row 0"),
496 "Expected price error at row 0, got: {err}"
497 );
498 }
499
500 #[rstest]
501 fn test_decode_batch_invalid_size_returns_error() {
502 use nautilus_model::types::quantity::QUANTITY_RAW_MAX;
503
504 let instrument_id = InstrumentId::from("AAPL.XNAS");
505 let metadata = TradeTick::get_metadata(&instrument_id, 2, 0);
506
507 let raw_price = (100.00 * FIXED_SCALAR) as PriceRaw;
508 let price = FixedSizeBinaryArray::from(vec![&raw_price.to_le_bytes()]);
509
510 let invalid_size = QUANTITY_RAW_MAX + 1;
511 let size = FixedSizeBinaryArray::from(vec![&invalid_size.to_le_bytes()]);
512 let aggressor_side = UInt8Array::from(vec![0]);
513 let trade_id = StringArray::from(vec!["1"]);
514 let ts_event = UInt64Array::from(vec![1]);
515 let ts_init = UInt64Array::from(vec![2]);
516
517 let record_batch = RecordBatch::try_new(
518 TradeTick::get_schema(Some(metadata.clone())).into(),
519 vec![
520 Arc::new(price),
521 Arc::new(size),
522 Arc::new(aggressor_side),
523 Arc::new(trade_id),
524 Arc::new(ts_event),
525 Arc::new(ts_init),
526 ],
527 )
528 .unwrap();
529
530 let result = TradeTick::decode_batch(&metadata, record_batch);
531 assert!(result.is_err());
532 let err = result.unwrap_err();
533 assert!(
534 err.to_string().contains("size") && err.to_string().contains("row 0"),
535 "Expected size error at row 0, got: {err}"
536 );
537 }
538
539 #[rstest]
540 fn test_decode_batch_invalid_aggressor_side_returns_error() {
541 let instrument_id = InstrumentId::from("AAPL.XNAS");
542 let metadata = TradeTick::get_metadata(&instrument_id, 2, 0);
543
544 let raw_price = (100.00 * FIXED_SCALAR) as PriceRaw;
545 let price = FixedSizeBinaryArray::from(vec![&raw_price.to_le_bytes()]);
546 let size = FixedSizeBinaryArray::from(vec![
547 &((1000.0 * FIXED_SCALAR) as QuantityRaw).to_le_bytes(),
548 ]);
549
550 let aggressor_side = UInt8Array::from(vec![99]);
551 let trade_id = StringArray::from(vec!["1"]);
552 let ts_event = UInt64Array::from(vec![1]);
553 let ts_init = UInt64Array::from(vec![2]);
554
555 let record_batch = RecordBatch::try_new(
556 TradeTick::get_schema(Some(metadata.clone())).into(),
557 vec![
558 Arc::new(price),
559 Arc::new(size),
560 Arc::new(aggressor_side),
561 Arc::new(trade_id),
562 Arc::new(ts_event),
563 Arc::new(ts_init),
564 ],
565 )
566 .unwrap();
567
568 let result = TradeTick::decode_batch(&metadata, record_batch);
569 assert!(result.is_err());
570 let err = result.unwrap_err();
571 assert!(
572 err.to_string().contains("AggressorSide"),
573 "Expected AggressorSide error, got: {err}"
574 );
575 }
576
577 #[rstest]
578 fn test_decode_batch_missing_instrument_id_returns_error() {
579 let instrument_id = InstrumentId::from("AAPL.XNAS");
580 let mut metadata = TradeTick::get_metadata(&instrument_id, 2, 0);
581 metadata.remove(KEY_INSTRUMENT_ID);
582
583 let raw_price = (100.00 * FIXED_SCALAR) as PriceRaw;
584 let price = FixedSizeBinaryArray::from(vec![&raw_price.to_le_bytes()]);
585 let size = FixedSizeBinaryArray::from(vec![
586 &((1000.0 * FIXED_SCALAR) as QuantityRaw).to_le_bytes(),
587 ]);
588 let aggressor_side = UInt8Array::from(vec![0]);
589 let trade_id = StringArray::from(vec!["1"]);
590 let ts_event = UInt64Array::from(vec![1]);
591 let ts_init = UInt64Array::from(vec![2]);
592
593 let record_batch = RecordBatch::try_new(
594 TradeTick::get_schema(Some(metadata.clone())).into(),
595 vec![
596 Arc::new(price),
597 Arc::new(size),
598 Arc::new(aggressor_side),
599 Arc::new(trade_id),
600 Arc::new(ts_event),
601 Arc::new(ts_init),
602 ],
603 )
604 .unwrap();
605
606 let result = TradeTick::decode_batch(&metadata, record_batch);
607 assert!(result.is_err());
608 let err = result.unwrap_err();
609 assert!(
610 err.to_string().contains("instrument_id"),
611 "Expected missing instrument_id error, got: {err}"
612 );
613 }
614
615 #[rstest]
616 fn test_decode_batch_missing_price_precision_returns_error() {
617 let instrument_id = InstrumentId::from("AAPL.XNAS");
618 let mut metadata = TradeTick::get_metadata(&instrument_id, 2, 0);
619 metadata.remove(KEY_PRICE_PRECISION);
620
621 let raw_price = (100.00 * FIXED_SCALAR) as PriceRaw;
622 let price = FixedSizeBinaryArray::from(vec![&raw_price.to_le_bytes()]);
623 let size = FixedSizeBinaryArray::from(vec![
624 &((1000.0 * FIXED_SCALAR) as QuantityRaw).to_le_bytes(),
625 ]);
626 let aggressor_side = UInt8Array::from(vec![0]);
627 let trade_id = StringArray::from(vec!["1"]);
628 let ts_event = UInt64Array::from(vec![1]);
629 let ts_init = UInt64Array::from(vec![2]);
630
631 let record_batch = RecordBatch::try_new(
632 TradeTick::get_schema(Some(metadata.clone())).into(),
633 vec![
634 Arc::new(price),
635 Arc::new(size),
636 Arc::new(aggressor_side),
637 Arc::new(trade_id),
638 Arc::new(ts_event),
639 Arc::new(ts_init),
640 ],
641 )
642 .unwrap();
643
644 let result = TradeTick::decode_batch(&metadata, record_batch);
645 assert!(result.is_err());
646 let err = result.unwrap_err();
647 assert!(
648 err.to_string().contains("price_precision"),
649 "Expected missing price_precision error, got: {err}"
650 );
651 }
652
653 #[rstest]
654 fn test_encode_decode_round_trip() {
655 let instrument_id = InstrumentId::from("AAPL.XNAS");
656 let metadata = TradeTick::get_metadata(&instrument_id, 2, 0);
657
658 let tick1 = TradeTick {
659 instrument_id,
660 price: Price::from("100.10"),
661 size: Quantity::from(1000),
662 aggressor_side: AggressorSide::Buyer,
663 trade_id: TradeId::new("trade-123"),
664 ts_event: 1_000_000_000.into(),
665 ts_init: 1_000_000_001.into(),
666 };
667
668 let tick2 = TradeTick {
669 instrument_id,
670 price: Price::from("100.50"),
671 size: Quantity::from(500),
672 aggressor_side: AggressorSide::Seller,
673 trade_id: TradeId::new("trade-456"),
674 ts_event: 2_000_000_000.into(),
675 ts_init: 2_000_000_001.into(),
676 };
677
678 let original = vec![tick1, tick2];
679 let record_batch = TradeTick::encode_batch(&metadata, &original).unwrap();
680 let decoded = TradeTick::decode_batch(&metadata, record_batch).unwrap();
681
682 assert_eq!(decoded.len(), original.len());
683 for (orig, dec) in original.iter().zip(decoded.iter()) {
684 assert_eq!(dec.instrument_id, orig.instrument_id);
685 assert_eq!(dec.price, orig.price);
686 assert_eq!(dec.size, orig.size);
687 assert_eq!(dec.aggressor_side, orig.aggressor_side);
688 assert_eq!(dec.trade_id, orig.trade_id);
689 assert_eq!(dec.ts_event, orig.ts_event);
690 assert_eq!(dec.ts_init, orig.ts_init);
691 }
692 }
693}