1use std::{io, num::NonZeroU64};
2
3use fallible_streaming_iterator::FallibleStreamingIterator;
4
5use crate::{
6 decode::{DbnMetadata, DecodeRecordRef},
7 encode::{DbnEncodable, EncodeDbn, EncodeRecord, EncodeRecordRef, EncodeRecordTextExt},
8 rtype_dispatch, schema_dispatch, v2, Error, RType, Record, Result, Schema, WithTsOut,
9 DBN_VERSION,
10};
11
12pub struct Encoder<W>
17where
18 W: io::Write,
19{
20 writer: csv::Writer<W>,
21 has_written_header: bool,
23 use_pretty_px: bool,
24 use_pretty_ts: bool,
25}
26
27pub struct EncoderBuilder<W>
31where
32 W: io::Write,
33{
34 writer: W,
35 use_pretty_px: bool,
36 use_pretty_ts: bool,
37 write_header: bool,
38 version: u8,
39 schema: Option<Schema>,
40 ts_out: bool,
41 with_symbol: bool,
42 delimiter: u8,
43}
44
45impl<W> EncoderBuilder<W>
46where
47 W: io::Write,
48{
49 pub fn new(writer: W) -> Self {
51 Self {
52 writer,
53 use_pretty_px: false,
54 use_pretty_ts: false,
55 write_header: true,
56 version: DBN_VERSION,
57 schema: None,
58 ts_out: false,
59 with_symbol: false,
60 delimiter: b',',
61 }
62 }
63
64 pub fn use_pretty_px(mut self, use_pretty_px: bool) -> Self {
67 self.use_pretty_px = use_pretty_px;
68 self
69 }
70
71 pub fn use_pretty_ts(mut self, use_pretty_ts: bool) -> Self {
74 self.use_pretty_ts = use_pretty_ts;
75 self
76 }
77
78 pub fn write_header(mut self, write_header: bool) -> Self {
84 self.write_header = write_header;
85 self
86 }
87
88 pub fn schema(mut self, schema: Option<Schema>) -> Self {
93 self.schema = schema;
94 self
95 }
96
97 pub fn ts_out(mut self, ts_out: bool) -> Self {
99 self.ts_out = ts_out;
100 self
101 }
102
103 pub fn with_symbol(mut self, with_symbol: bool) -> Self {
105 self.with_symbol = with_symbol;
106 self
107 }
108
109 pub fn delimiter(mut self, delimiter: u8) -> Self {
111 self.delimiter = delimiter;
112 self
113 }
114
115 pub fn version(mut self, version: u8) -> Self {
121 self.version = version;
122 self
123 }
124
125 pub fn build(self) -> crate::Result<Encoder<W>> {
131 let mut encoder = Encoder {
132 writer: csv::WriterBuilder::new()
133 .has_headers(false)
134 .delimiter(self.delimiter)
135 .from_writer(self.writer),
136 has_written_header: true,
137 use_pretty_px: self.use_pretty_px,
138 use_pretty_ts: self.use_pretty_ts,
139 };
140 if self.write_header {
141 if let Some(schema) = self.schema {
142 encoder.encode_header_for_schema(
143 self.version,
144 schema,
145 self.ts_out,
146 self.with_symbol,
147 )?;
148 } else {
149 encoder.has_written_header = false;
150 }
151 }
152 Ok(encoder)
153 }
154}
155
156impl<W> Encoder<W>
157where
158 W: io::Write,
159{
160 pub fn builder(writer: W) -> EncoderBuilder<W> {
162 EncoderBuilder::new(writer)
163 }
164
165 pub fn new(writer: W, use_pretty_px: bool, use_pretty_ts: bool) -> Self {
172 Self::builder(writer)
173 .use_pretty_px(use_pretty_px)
174 .use_pretty_ts(use_pretty_ts)
175 .build()
176 .unwrap()
178 }
179
180 pub fn get_ref(&self) -> &W {
182 self.writer.get_ref()
183 }
184
185 pub fn encode_header<R: DbnEncodable>(&mut self, with_symbol: bool) -> Result<()> {
196 R::serialize_header(&mut self.writer)?;
197 if with_symbol {
198 self.writer.write_field("symbol")?;
199 }
200 self.writer.write_record(None::<&[u8]>)?;
202 self.has_written_header = true;
203 Ok(())
204 }
205
206 pub fn encode_header_for_schema(
221 &mut self,
222 version: u8,
223 schema: Schema,
224 ts_out: bool,
225 with_symbol: bool,
226 ) -> Result<()> {
227 if version < 3 && schema == Schema::Definition {
229 if ts_out {
230 self.encode_header::<WithTsOut<v2::InstrumentDefMsg>>(with_symbol)?;
231 } else {
232 self.encode_header::<v2::InstrumentDefMsg>(with_symbol)?;
233 }
234 } else {
235 schema_dispatch!(schema, ts_out: ts_out, self.encode_header(with_symbol))?;
236 }
237 self.has_written_header = true;
238 Ok(())
239 }
240
241 fn encode_record_impl<R: DbnEncodable>(&mut self, record: &R) -> csv::Result<()> {
242 match (self.use_pretty_px, self.use_pretty_ts) {
243 (true, true) => record.serialize_to::<_, true, true>(&mut self.writer),
244 (true, false) => record.serialize_to::<_, true, false>(&mut self.writer),
245 (false, true) => record.serialize_to::<_, false, true>(&mut self.writer),
246 (false, false) => record.serialize_to::<_, false, false>(&mut self.writer),
247 }
248 }
249
250 fn encode_symbol(&mut self, symbol: Option<&str>) -> csv::Result<()> {
251 self.writer.write_field(symbol.unwrap_or_default())
252 }
253}
254
255impl<W> EncodeRecord for Encoder<W>
256where
257 W: io::Write,
258{
259 fn encode_record<R: DbnEncodable>(&mut self, record: &R) -> Result<()> {
260 if !self.has_written_header {
261 self.encode_header::<R>(false)?;
262 }
263 match self
264 .encode_record_impl(record)
265 .and_then(|_| self.writer.write_record(None::<&[u8]>))
267 {
268 Ok(()) => Ok(()),
269 Err(e) => Err(match e.into_kind() {
270 csv::ErrorKind::Io(err) => Error::io(err, format!("serializing {record:?}")),
271 e => Error::encode(format!("failed to serialize {record:?}: {e:?}")),
272 }),
273 }
274 }
275
276 fn encode_records<R: DbnEncodable>(&mut self, records: &[R]) -> Result<()> {
277 for record in records {
278 self.encode_record(record)?;
279 }
280 self.flush()?;
281 Ok(())
282 }
283
284 fn flush(&mut self) -> Result<()> {
285 self.writer
286 .flush()
287 .map_err(|e| Error::io(e, "flushing output"))
288 }
289}
290
impl<W> EncodeRecordRef for Encoder<W>
where
    W: io::Write,
{
    /// Encodes a type-erased `record` by dispatching on its rtype to
    /// `encode_record` for the matching concrete record type.
    ///
    /// # Errors
    /// Propagates any error from the dispatch itself as well as from
    /// `encode_record`.
    fn encode_record_ref(&mut self, record: crate::RecordRef) -> Result<()> {
        rtype_dispatch!(record, self.encode_record())?
    }

    /// Like `encode_record_ref`, but also passes `ts_out` to the dispatch.
    ///
    /// # Safety
    /// NOTE(review): the exact contract is defined by the `EncodeRecordRef` trait and
    /// the `rtype_dispatch!` macro, neither of which is visible here. Presumably
    /// `ts_out` may only be `true` when `record` really has a `ts_out` appended to
    /// it — confirm against the trait documentation.
    ///
    /// # Errors
    /// Propagates any error from the dispatch itself as well as from
    /// `encode_record`.
    unsafe fn encode_record_ref_ts_out(
        &mut self,
        record: crate::RecordRef,
        ts_out: bool,
    ) -> Result<()> {
        rtype_dispatch!(record, ts_out: ts_out, self.encode_record())?
    }
}
307
308impl<W> EncodeDbn for Encoder<W>
309where
310 W: io::Write,
311{
312 fn encode_stream<R: DbnEncodable>(
318 &mut self,
319 mut stream: impl FallibleStreamingIterator<Item = R, Error = Error>,
320 ) -> Result<()> {
321 while let Some(record) = stream.next()? {
322 self.encode_record(record)?;
323 }
324 self.flush()?;
325 Ok(())
326 }
327
328 fn encode_decoded<D: DecodeRecordRef + DbnMetadata>(&mut self, mut decoder: D) -> Result<()> {
336 let ts_out = decoder.metadata().ts_out;
337 if let Some(schema) = decoder.metadata().schema {
338 let rtype = RType::from(schema);
339 while let Some(record) = decoder.decode_record_ref()? {
340 if record.rtype().map_or(true, |r| r != rtype) {
341 return Err(Error::encode(format!("schema indicated {rtype:?}, but found record with rtype {:?}. Mixed schemas cannot be encoded in CSV.", record.rtype())));
342 }
343 unsafe { self.encode_record_ref_ts_out(record, ts_out) }?;
346 }
347 self.flush()?;
348 Ok(())
349 } else {
350 Err(Error::encode("can't encode a CSV with mixed schemas"))
351 }
352 }
353
354 fn encode_decoded_with_limit<D: DecodeRecordRef + DbnMetadata>(
355 &mut self,
356 mut decoder: D,
357 limit: NonZeroU64,
358 ) -> Result<()> {
359 let ts_out = decoder.metadata().ts_out;
360 if let Some(schema) = decoder.metadata().schema {
361 schema_dispatch!(schema, self.encode_header(false))?;
362 let rtype = RType::from(schema);
363 let mut i = 0;
364 while let Some(record) = decoder.decode_record_ref()? {
365 if record.rtype().map_or(true, |r| r != rtype) {
366 return Err(Error::encode(format!("schema indicated {rtype:?}, but found record with rtype {:?}. Mixed schemas cannot be encoded in CSV.", record.rtype())));
367 }
368 unsafe { self.encode_record_ref_ts_out(record, ts_out) }?;
371 i += 1;
372 if i == limit.get() {
373 break;
374 }
375 }
376 self.flush()?;
377 Ok(())
378 } else {
379 Err(Error::encode("can't encode a CSV with mixed schemas"))
380 }
381 }
382}
383
384impl<W> EncodeRecordTextExt for Encoder<W>
385where
386 W: io::Write,
387{
388 fn encode_record_with_sym<R: DbnEncodable>(
389 &mut self,
390 record: &R,
391 symbol: Option<&str>,
392 ) -> Result<()> {
393 if !self.has_written_header {
394 self.encode_header::<R>(true)?;
395 }
396 match self
397 .encode_record_impl(record)
398 .and_then(|_| self.encode_symbol(symbol))
399 .and_then(|_| self.writer.write_record(None::<&[u8]>))
401 {
402 Ok(()) => Ok(()),
403 Err(e) => Err(match e.into_kind() {
404 csv::ErrorKind::Io(err) => Error::io(err, format!("serializing {record:?}")),
405 e => Error::encode(format!("failed to serialize {record:?}: {e:?}")),
406 }),
407 }
408 }
409}
410
#[cfg(test)]
mod tests {
    // `BID_ASK.clone()` is used below to fill the MBP-10 levels array.
    #![allow(clippy::clone_on_copy)]

    use std::{array, io::BufWriter, os::raw::c_char};

    use rstest::*;

    use super::*;
    use crate::{
        encode::test_data::{BID_ASK, RECORD_HEADER},
        enums::{
            rtype, InstrumentClass, SecurityUpdateAction, StatType, StatUpdateAction,
            UserDefinedInstrument,
        },
        record::{
            str_to_c_chars, ImbalanceMsg, InstrumentDefMsg, MboMsg, Mbp10Msg, Mbp1Msg, OhlcvMsg,
            RecordHeader, StatMsg, StatusMsg, TradeMsg, WithTsOut,
        },
        test_utils::VecStream,
        RecordRef, FIXED_PRICE_SCALE, UNDEF_PRICE,
    };

    /// Expected encoding of the `RECORD_HEADER` fixture, joined with `sep`.
    fn header(sep: char) -> String {
        format!("1658441851000000000{sep}4{sep}1{sep}323")
    }

    /// Expected encoding of the `BID_ASK` fixture, joined with `sep`.
    fn bid_ask(sep: char) -> String {
        format!("372000000000000{sep}372500000000000{sep}10{sep}5{sep}5{sep}2")
    }

    /// Returns everything after the first line of `buffer` with trailing whitespace
    /// removed, asserting that the first line (the CSV header) is not blank.
    fn extract_2nd_line(buffer: Vec<u8>) -> String {
        let output = String::from_utf8(buffer).expect("valid UTF-8");
        let (first, second) = output.split_once('\n').expect("two lines");
        assert!(!first.trim().is_empty());
        second.trim_end().to_owned()
    }

    /// An MBO record encoded via `encode_stream` produces the expected row.
    #[rstest]
    #[case::csv(b',')]
    #[case::tsv(b'\t')]
    fn test_mbo_encode_stream(#[case] sep: u8) {
        let data = vec![MboMsg {
            hd: RECORD_HEADER,
            order_id: 16,
            price: 5500,
            size: 3,
            flags: 128.into(),
            channel_id: 14,
            action: 'B' as c_char,
            side: 'B' as c_char,
            ts_recv: 1658441891000000000,
            ts_in_delta: 22_000,
            sequence: 1_002_375,
        }];
        let mut buffer = Vec::new();
        let writer = BufWriter::new(&mut buffer);
        Encoder::builder(writer)
            .delimiter(sep)
            .build()
            .unwrap()
            .encode_stream(VecStream::new(data))
            .unwrap();
        let line = extract_2nd_line(buffer);
        let sep = sep as char;
        assert_eq!(
            line,
            format!(
                "1658441891000000000{sep}{}{sep}B{sep}B{sep}5500{sep}3{sep}14{sep}16{sep}128{sep}22000{sep}1002375",
                header(sep)
            )
        );
    }

    /// An MBP-1 record encoded via `encode_records` produces the expected row.
    #[rstest]
    #[case::csv(b',')]
    #[case::tsv(b'\t')]
    fn test_mbp1_encode_records(#[case] sep: u8) {
        let data = vec![Mbp1Msg {
            hd: RECORD_HEADER,
            price: 5500,
            size: 3,
            action: 'M' as c_char,
            side: 'A' as c_char,
            flags: 128.into(),
            depth: 9,
            ts_recv: 1658441891000000000,
            ts_in_delta: 22_000,
            sequence: 1_002_375,
            levels: [BID_ASK],
        }];
        let mut buffer = Vec::new();
        let writer = BufWriter::new(&mut buffer);
        Encoder::builder(writer)
            .delimiter(sep)
            .build()
            .unwrap()
            .encode_records(data.as_slice())
            .unwrap();
        let line = extract_2nd_line(buffer);
        let sep = sep as char;
        assert_eq!(
            line,
            format!(
                "1658441891000000000{sep}{}{sep}M{sep}A{sep}9{sep}5500{sep}3{sep}128{sep}22000{sep}1002375{sep}{}",
                header(sep),
                bid_ask(sep)
            )
        );
    }

    /// An MBP-10 record encoded via `encode_stream` produces the expected row with
    /// all ten levels.
    #[rstest]
    #[case::csv(b',')]
    #[case::tsv(b'\t')]
    fn test_mbp10_encode_stream(#[case] sep: u8) {
        let data = vec![Mbp10Msg {
            hd: RECORD_HEADER,
            price: 5500,
            size: 3,
            action: 'B' as c_char,
            side: 'A' as c_char,
            flags: 128.into(),
            depth: 9,
            ts_recv: 1658441891000000000,
            ts_in_delta: 22_000,
            sequence: 1_002_375,
            levels: array::from_fn(|_| BID_ASK.clone()),
        }];
        let mut buffer = Vec::new();
        let writer = BufWriter::new(&mut buffer);
        Encoder::builder(writer)
            .delimiter(sep)
            .build()
            .unwrap()
            .encode_stream(VecStream::new(data))
            .unwrap();
        let line = extract_2nd_line(buffer);
        let sep = sep as char;
        assert_eq!(
            line,
            format!("1658441891000000000{sep}{}{sep}B{sep}A{sep}9{sep}5500{sep}3{sep}128{sep}22000\
            {sep}1002375{sep}{bid_ask}{sep}{bid_ask}{sep}{bid_ask}{sep}{bid_ask}{sep}\
            {bid_ask}{sep}{bid_ask}{sep}{bid_ask}{sep}{bid_ask}{sep}{bid_ask}{sep}{bid_ask}",
            header(sep), bid_ask = bid_ask(sep))
        );
    }

    /// A trade record encoded via `encode_records` produces the expected row.
    #[rstest]
    #[case::csv(b',')]
    #[case::tsv(b'\t')]
    fn test_trade_encode_records(#[case] sep: u8) {
        let data = vec![TradeMsg {
            hd: RECORD_HEADER,
            price: 5500,
            size: 3,
            action: 'B' as c_char,
            side: 'B' as c_char,
            flags: 128.into(),
            depth: 9,
            ts_recv: 1658441891000000000,
            ts_in_delta: 22_000,
            sequence: 1_002_375,
        }];
        let mut buffer = Vec::new();
        let writer = BufWriter::new(&mut buffer);
        Encoder::builder(writer)
            .delimiter(sep)
            .build()
            .unwrap()
            .encode_records(data.as_slice())
            .unwrap();
        let line = extract_2nd_line(buffer);
        let sep = sep as char;
        assert_eq!(
            line,
            format!(
                "1658441891000000000{sep}{}{sep}B{sep}B{sep}9{sep}5500{sep}3{sep}128{sep}22000{sep}1002375",
                header(sep)
            )
        );
    }

    /// An OHLCV record encoded via `encode_stream` produces the expected row.
    #[rstest]
    #[case::csv(b',')]
    #[case::tsv(b'\t')]
    fn test_ohlcv_encode_stream(#[case] sep: u8) {
        let data = vec![OhlcvMsg {
            hd: RECORD_HEADER,
            open: 5000,
            high: 8000,
            low: 3000,
            close: 6000,
            volume: 55_000,
        }];
        let mut buffer = Vec::new();
        let writer = BufWriter::new(&mut buffer);
        Encoder::builder(writer)
            .delimiter(sep)
            .build()
            .unwrap()
            .encode_stream(VecStream::new(data))
            .unwrap();
        let line = extract_2nd_line(buffer);
        let sep = sep as char;
        assert_eq!(
            line,
            format!(
                "{}{sep}5000{sep}8000{sep}3000{sep}6000{sep}55000",
                header(sep)
            )
        );
    }

    /// A status record encoded via `encode_records` produces the expected row.
    #[rstest]
    #[case::csv(b',')]
    #[case::tsv(b'\t')]
    fn test_status_encode_records(#[case] sep: u8) {
        let data = vec![StatusMsg {
            hd: RECORD_HEADER,
            ts_recv: 1658441891000000000,
            action: 1,
            reason: 2,
            trading_event: 3,
            is_trading: b'Y' as c_char,
            is_quoting: b'Y' as c_char,
            is_short_sell_restricted: b'~' as c_char,
            _reserved: Default::default(),
        }];
        let mut buffer = Vec::new();
        let writer = BufWriter::new(&mut buffer);
        Encoder::builder(writer)
            .delimiter(sep)
            .build()
            .unwrap()
            .encode_records(data.as_slice())
            .unwrap();
        let line = extract_2nd_line(buffer);
        let sep = sep as char;
        assert_eq!(
            line,
            format!(
                "1658441891000000000{sep}{}{sep}1{sep}2{sep}3{sep}Y{sep}Y{sep}~",
                header(sep)
            )
        );
    }

    /// An instrument definition encoded via `encode_stream` produces the expected
    /// row, including empty and default-valued fields.
    #[rstest]
    #[case::csv(b',')]
    #[case::tsv(b'\t')]
    fn test_instrument_def_encode_stream(#[case] sep: u8) {
        let data = vec![InstrumentDefMsg {
            hd: RECORD_HEADER,
            ts_recv: 1658441891000000000,
            min_price_increment: 100,
            display_factor: 1000,
            expiration: 1698450000000000000,
            activation: 1697350000000000000,
            high_limit_price: 1_000_000,
            low_limit_price: -1_000_000,
            max_price_variation: 0,
            unit_of_measure_qty: 5,
            min_price_increment_amount: 5,
            price_ratio: 10,
            inst_attrib_value: 10,
            underlying_id: 256785,
            raw_instrument_id: RECORD_HEADER.instrument_id as u64,
            market_depth_implied: 0,
            market_depth: 13,
            market_segment_id: 0,
            max_trade_vol: 10_000,
            min_lot_size: 1,
            min_lot_size_block: 1000,
            min_lot_size_round_lot: 100,
            min_trade_vol: 1,
            contract_multiplier: 0,
            decay_quantity: 0,
            original_contract_size: 0,
            appl_id: 0,
            maturity_year: 0,
            decay_start_date: 0,
            channel_id: 4,
            currency: str_to_c_chars("USD").unwrap(),
            settl_currency: str_to_c_chars("USD").unwrap(),
            secsubtype: Default::default(),
            raw_symbol: str_to_c_chars("ESZ4 C4100").unwrap(),
            group: str_to_c_chars("EW").unwrap(),
            exchange: str_to_c_chars("XCME").unwrap(),
            asset: str_to_c_chars("ES").unwrap(),
            cfi: str_to_c_chars("OCAFPS").unwrap(),
            security_type: str_to_c_chars("OOF").unwrap(),
            unit_of_measure: str_to_c_chars("IPNT").unwrap(),
            underlying: str_to_c_chars("ESZ4").unwrap(),
            strike_price_currency: str_to_c_chars("USD").unwrap(),
            instrument_class: InstrumentClass::Call as u8 as c_char,
            strike_price: 4_100_000_000_000,
            match_algorithm: 'F' as c_char,
            main_fraction: 4,
            price_display_format: 8,
            sub_fraction: 23,
            underlying_product: 10,
            security_update_action: SecurityUpdateAction::Add as c_char,
            maturity_month: 8,
            maturity_day: 9,
            maturity_week: 11,
            user_defined_instrument: UserDefinedInstrument::No as c_char,
            contract_multiplier_unit: 0,
            flow_schedule_type: 5,
            tick_rule: 0,
            leg_count: 2,
            leg_index: 1,
            leg_instrument_id: 24,
            leg_underlying_id: 25,
            leg_instrument_class: InstrumentClass::Future as c_char,
            leg_ratio_qty_numerator: 1,
            leg_ratio_qty_denominator: 2,
            ..Default::default()
        }];
        let mut buffer = Vec::new();
        let writer = BufWriter::new(&mut buffer);
        Encoder::builder(writer)
            .delimiter(sep)
            .build()
            .unwrap()
            .encode_stream(VecStream::new(data))
            .unwrap();
        let line = extract_2nd_line(buffer);
        let sep = sep as char;
        assert_eq!(line, format!("1658441891000000000{sep}{}{sep}ESZ4 C4100{sep}A{sep}C{sep}100{sep}\
            1000{sep}1698450000000000000{sep}1697350000000000000{sep}1000000\
            {sep}-1000000{sep}0{sep}5{sep}5{sep}10{sep}10{sep}256785\
            {sep}323{sep}0{sep}13{sep}0{sep}10000{sep}1{sep}1000{sep}100{sep}1\
            {sep}0{sep}0{sep}0{sep}0{sep}0{sep}0{sep}4{sep}USD{sep}USD\
            {sep}{sep}EW{sep}XCME{sep}ES{sep}OCAFPS{sep}OOF{sep}IPNT{sep}ESZ4{sep}\
            USD{sep}4100000000000{sep}F{sep}4{sep}8{sep}23{sep}10\
            {sep}8{sep}9{sep}11{sep}N{sep}0{sep}5{sep}0{sep}2{sep}1{sep}24{sep}\
            {sep}F{sep}N{sep}{UNDEF_PRICE}{sep}{UNDEF_PRICE}{sep}0{sep}0{sep}1{sep}2{sep}25", header(sep)));
    }

    /// A `WithTsOut` record produces both a header and a row ending in `ts_out`.
    #[rstest]
    #[case::csv(b',')]
    #[case::tsv(b'\t')]
    fn test_encode_with_ts_out(#[case] sep: u8) {
        let data = vec![WithTsOut {
            rec: TradeMsg {
                hd: RECORD_HEADER,
                price: 5500,
                size: 3,
                action: 'T' as c_char,
                side: 'A' as c_char,
                flags: 128.into(),
                depth: 9,
                ts_recv: 1658441891000000000,
                ts_in_delta: 22_000,
                sequence: 1_002_375,
            },
            ts_out: 1678480044000000000,
        }];
        let mut buffer = Vec::new();
        let writer = BufWriter::new(&mut buffer);
        Encoder::builder(writer)
            .delimiter(sep)
            .build()
            .unwrap()
            .encode_records(data.as_slice())
            .unwrap();
        let lines = String::from_utf8(buffer).expect("valid UTF-8");
        let sep = sep as char;
        assert_eq!(
            lines,
            format!("ts_recv{sep}ts_event{sep}rtype{sep}publisher_id{sep}instrument_id{sep}action\
            {sep}side{sep}depth{sep}price{sep}size{sep}flags{sep}ts_in_delta{sep}sequence\
            {sep}ts_out\n1658441891000000000{sep}{}{sep}T{sep}A{sep}9{sep}5500{sep}3{sep}128\
            {sep}22000{sep}1002375{sep}1678480044000000000\n", header(sep))
        );
    }

    /// An imbalance record encoded via `encode_records` produces the expected row.
    #[rstest]
    #[case::csv(b',')]
    #[case::tsv(b'\t')]
    fn test_imbalance_encode_records(#[case] sep: u8) {
        let data = vec![ImbalanceMsg {
            hd: RECORD_HEADER,
            ts_recv: 1,
            ref_price: 2,
            auction_time: 3,
            cont_book_clr_price: 4,
            auct_interest_clr_price: 5,
            ssr_filling_price: 6,
            ind_match_price: 7,
            upper_collar: 8,
            lower_collar: 9,
            paired_qty: 10,
            total_imbalance_qty: 11,
            market_imbalance_qty: 12,
            unpaired_qty: 13,
            auction_type: 'B' as c_char,
            side: 'A' as c_char,
            auction_status: 14,
            freeze_status: 15,
            num_extensions: 16,
            unpaired_side: 'A' as c_char,
            significant_imbalance: 'N' as c_char,
            _reserved: [0],
        }];
        let mut buffer = Vec::new();
        let writer = BufWriter::new(&mut buffer);
        Encoder::builder(writer)
            .delimiter(sep)
            .build()
            .unwrap()
            .encode_records(data.as_slice())
            .unwrap();
        let line = extract_2nd_line(buffer);
        let sep = sep as char;
        assert_eq!(
            line,
            format!(
                "1{sep}{}{sep}2{sep}3{sep}4{sep}5{sep}6{sep}7{sep}8{sep}9{sep}10{sep}11{sep}12{sep}\
                13{sep}B{sep}A{sep}14{sep}15{sep}16{sep}A{sep}N",
                header(sep)
            )
        );
    }

    /// A statistics record encoded via `encode_stream` produces the expected row.
    #[rstest]
    #[case::csv(b',')]
    #[case::tsv(b'\t')]
    fn test_stat_encode_stream(#[case] sep: u8) {
        let data = vec![StatMsg {
            hd: RECORD_HEADER,
            ts_recv: 1,
            ts_ref: 2,
            price: 3,
            quantity: 0,
            sequence: 4,
            ts_in_delta: 5,
            stat_type: StatType::OpeningPrice as u16,
            channel_id: 7,
            update_action: StatUpdateAction::New as u8,
            stat_flags: 0,
            _reserved: Default::default(),
        }];
        let mut buffer = Vec::new();
        let writer = BufWriter::new(&mut buffer);
        Encoder::builder(writer)
            .delimiter(sep)
            .build()
            .unwrap()
            .encode_stream(VecStream::new(data))
            .unwrap();
        let line = extract_2nd_line(buffer);
        let sep = sep as char;
        assert_eq!(
            line,
            format!(
                "1{sep}{}{sep}2{sep}3{sep}0{sep}4{sep}5{sep}1{sep}7{sep}1{sep}0",
                header(sep)
            )
        );
    }

    /// With the header disabled, `encode_ref_with_sym` writes only data rows: an
    /// empty trailing field for `None` and the symbol text for `Some`.
    #[test]
    fn test_encode_ref_with_sym() {
        let mut buffer = Vec::new();
        const BAR: OhlcvMsg = OhlcvMsg {
            hd: RecordHeader::new::<OhlcvMsg>(rtype::OHLCV_1H, 10, 9, 0),
            open: 175 * FIXED_PRICE_SCALE,
            high: 177 * FIXED_PRICE_SCALE,
            low: 174 * FIXED_PRICE_SCALE,
            close: 175 * FIXED_PRICE_SCALE,
            volume: 4033445,
        };
        let rec_ref = RecordRef::from(&BAR);
        let mut encoder = Encoder::builder(&mut buffer)
            .use_pretty_px(false)
            .use_pretty_ts(false)
            .write_header(false)
            .build()
            .unwrap();
        encoder.encode_ref_with_sym(rec_ref, None).unwrap();
        encoder.encode_ref_with_sym(rec_ref, Some("AAPL")).unwrap();
        // Dropping the encoder releases its mutable borrow of `buffer`.
        drop(encoder);
        let res = String::from_utf8(buffer).unwrap();
        assert_eq!(
            res,
            "0,34,10,9,175000000000,177000000000,174000000000,175000000000,4033445,\n\
            0,34,10,9,175000000000,177000000000,174000000000,175000000000,4033445,AAPL\n"
        );
    }

    /// The header written with `ts_out` and `with_symbol` equals the plain header
    /// for the same schema followed by `,ts_out,symbol`.
    #[test]
    fn test_encode_header_for_schema() {
        let mut buffer = Vec::new();
        {
            let mut encoder = Encoder::new(&mut buffer, false, false);
            encoder
                .encode_header_for_schema(DBN_VERSION, Schema::Statistics, false, false)
                .unwrap();
        }
        {
            let mut encoder = Encoder::new(&mut buffer, false, false);
            encoder
                .encode_header_for_schema(DBN_VERSION, Schema::Statistics, true, true)
                .unwrap();
        }

        let res = String::from_utf8(buffer).unwrap();
        let (fst_line, snd_line) = res.split_once('\n').unwrap();
        assert!(snd_line.ends_with(",symbol\n"));
        let orig_header = snd_line.split_once(",ts_out,symbol").unwrap().0;
        assert_eq!(fst_line, orig_header);
    }
}