Skip to main content

dbn/encode/csv/
sync.rs

1use std::{io, num::NonZeroU64};
2
3use fallible_streaming_iterator::FallibleStreamingIterator;
4
5use crate::{
6    decode::{DbnMetadata, DecodeRecordRef},
7    encode::{DbnEncodable, EncodeDbn, EncodeRecord, EncodeRecordRef, EncodeRecordTextExt},
8    rtype_dispatch, schema_dispatch, v2, Error, RType, Record, Result, Schema, WithTsOut,
9    DBN_VERSION,
10};
11
12/// Type for encoding files and streams of DBN records in CSV or other text-delimited
13/// tabular file formats including TSV (tab-separated values).
14///
15/// Note that encoding [`Metadata`](crate::Metadata) in CSV is not supported.
16pub struct Encoder<W>
17where
18    W: io::Write,
19{
20    writer: csv::Writer<W>,
21    /// Prevent writing header twice.
22    has_written_header: bool,
23    use_pretty_px: bool,
24    use_pretty_ts: bool,
25}
26
27/// Helper for constructing a CSV [`Encoder`].
28///
29/// No fields are required.
30pub struct EncoderBuilder<W>
31where
32    W: io::Write,
33{
34    writer: W,
35    use_pretty_px: bool,
36    use_pretty_ts: bool,
37    write_header: bool,
38    version: u8,
39    schema: Option<Schema>,
40    ts_out: bool,
41    with_symbol: bool,
42    delimiter: u8,
43}
44
45impl<W> EncoderBuilder<W>
46where
47    W: io::Write,
48{
49    /// Creates a new CSV encoder builder.
50    pub fn new(writer: W) -> Self {
51        Self {
52            writer,
53            use_pretty_px: false,
54            use_pretty_ts: false,
55            write_header: true,
56            version: DBN_VERSION,
57            schema: None,
58            ts_out: false,
59            with_symbol: false,
60            delimiter: b',',
61        }
62    }
63
64    /// Sets whether the CSV encoder will serialize price fields as a decimal. Defaults
65    /// to `false`.
66    pub fn use_pretty_px(mut self, use_pretty_px: bool) -> Self {
67        self.use_pretty_px = use_pretty_px;
68        self
69    }
70
71    /// Sets whether the CSV encoder will serialize timestamp fields as ISO8601 datetime
72    /// strings. Defaults to `false`.
73    pub fn use_pretty_ts(mut self, use_pretty_ts: bool) -> Self {
74        self.use_pretty_ts = use_pretty_ts;
75        self
76    }
77
78    /// Sets whether the CSV encoder will write a header row automatically.
79    /// Defaults to `true`.
80    ///
81    /// If `false`, a header row can still be written with
82    /// [`Encoder::encode_header()`] or [`Encoder::encode_header_for_schema()`].
83    pub fn write_header(mut self, write_header: bool) -> Self {
84        self.write_header = write_header;
85        self
86    }
87
88    /// Sets the schema that will be encoded, used for determining the header row to write.
89    ///
90    /// If schema isn't set and `write_header` is left enabled, the header will be written
91    /// based on the type of the first record.
92    pub fn schema(mut self, schema: Option<Schema>) -> Self {
93        self.schema = schema;
94        self
95    }
96
97    /// Sets whether to add a header field "ts_out". Defaults to `false`.
98    pub fn ts_out(mut self, ts_out: bool) -> Self {
99        self.ts_out = ts_out;
100        self
101    }
102
103    /// Sets whether to add a header field "symbol". Defaults to `false`.
104    pub fn with_symbol(mut self, with_symbol: bool) -> Self {
105        self.with_symbol = with_symbol;
106        self
107    }
108
109    /// Sets the field delimiter. Defaults to `b','` for comma-separated values (CSV).
110    pub fn delimiter(mut self, delimiter: u8) -> Self {
111        self.delimiter = delimiter;
112        self
113    }
114
115    /// Sets the DBN version which is used for determining which fields to include in
116    /// the header. Currently only relevant to the definition schema where fields have
117    /// changed between versions.
118    ///
119    /// If not specified, defaults to [`DBN_VERSION`].
120    pub fn version(mut self, version: u8) -> Self {
121        self.version = version;
122        self
123    }
124
125    /// Creates the new encoder with the previously specified settings and if
126    /// `write_header` is `true`, encodes the header row.
127    ///
128    /// # Errors
129    /// This function returns an error if it fails to write the header row.
130    pub fn build(self) -> crate::Result<Encoder<W>> {
131        let mut encoder = Encoder {
132            writer: csv::WriterBuilder::new()
133                .has_headers(false)
134                .delimiter(self.delimiter)
135                .from_writer(self.writer),
136            has_written_header: true,
137            use_pretty_px: self.use_pretty_px,
138            use_pretty_ts: self.use_pretty_ts,
139        };
140        if self.write_header {
141            if let Some(schema) = self.schema {
142                encoder.encode_header_for_schema(
143                    self.version,
144                    schema,
145                    self.ts_out,
146                    self.with_symbol,
147                )?;
148            } else {
149                encoder.has_written_header = false;
150            }
151        }
152        Ok(encoder)
153    }
154}
155
156impl<W> Encoder<W>
157where
158    W: io::Write,
159{
160    /// Creates a builder for configuring an `Encoder` object.
161    pub fn builder(writer: W) -> EncoderBuilder<W> {
162        EncoderBuilder::new(writer)
163    }
164
165    /// Creates a new [`Encoder`] that will write to `writer`.
166    ///
167    /// If `use_pretty_px`
168    /// is `true`, price fields will be serialized as a decimal. If `pretty_ts` is
169    /// `true`, timestamp fields will be serialized in a ISO8601 datetime string.
170    /// By default, a header will be written once a schema can be inferred.
171    pub fn new(writer: W, use_pretty_px: bool, use_pretty_ts: bool) -> Self {
172        Self::builder(writer)
173            .use_pretty_px(use_pretty_px)
174            .use_pretty_ts(use_pretty_ts)
175            .build()
176            // Not setting `schema` or enabling `write_header`
177            .unwrap()
178    }
179
180    /// Returns a reference to the underlying writer.
181    pub fn get_ref(&self) -> &W {
182        self.writer.get_ref()
183    }
184
185    /// Encodes the CSV header for the record type `R`, i.e. the names of each of the
186    /// fields to the output.
187    ///
188    /// If `with_symbol` is `true`, will add a header field for "symbol". This should
189    /// only be used with  [`Self::encode_record_with_sym()`] and
190    /// [`Self::encode_ref_with_sym()`], otherwise there will be a mismatch between the
191    /// number of fields in the header and the body.
192    ///
193    /// # Errors
194    /// This function returns an error if there's an error writing to `writer`.
195    pub fn encode_header<R: DbnEncodable>(&mut self, with_symbol: bool) -> Result<()> {
196        R::serialize_header(&mut self.writer)?;
197        if with_symbol {
198            self.writer.write_field("symbol")?;
199        }
200        // end of line
201        self.writer.write_record(None::<&[u8]>)?;
202        self.has_written_header = true;
203        Ok(())
204    }
205
206    /// Encodes the CSV header for `schema`, i.e. the names of each of the fields to
207    /// the output. Only supports the current [`DBN_VERSION`](crate::DBN_VERSION), use
208    /// [`encode_header()`](Self::encode_header) to encode a header for older versions
209    /// of schemas that changed between DBN versions.
210    ///
211    /// If `ts_out` is `true`, it will add a header field "ts_out".
212    ///
213    /// If `with_symbol` is `true`, it will add a header field for "symbol". This should
214    /// only be used with  [`Self::encode_record_with_sym()`] and
215    /// [`Self::encode_ref_with_sym()`], otherwise there will be a mismatch between the
216    /// number of fields in the header and the body.
217    ///
218    /// # Errors
219    /// This function returns an error if there's an error writing to `writer`.
220    pub fn encode_header_for_schema(
221        &mut self,
222        version: u8,
223        schema: Schema,
224        ts_out: bool,
225        with_symbol: bool,
226    ) -> Result<()> {
227        // Workaround for definitions fields changing between versions 1/2 and 3
228        if version < 3 && schema == Schema::Definition {
229            if ts_out {
230                self.encode_header::<WithTsOut<v2::InstrumentDefMsg>>(with_symbol)?;
231            } else {
232                self.encode_header::<v2::InstrumentDefMsg>(with_symbol)?;
233            }
234        } else {
235            schema_dispatch!(schema, ts_out: ts_out, self.encode_header(with_symbol))?;
236        }
237        self.has_written_header = true;
238        Ok(())
239    }
240
241    fn encode_record_impl<R: DbnEncodable>(&mut self, record: &R) -> csv::Result<()> {
242        match (self.use_pretty_px, self.use_pretty_ts) {
243            (true, true) => record.serialize_to::<_, true, true>(&mut self.writer),
244            (true, false) => record.serialize_to::<_, true, false>(&mut self.writer),
245            (false, true) => record.serialize_to::<_, false, true>(&mut self.writer),
246            (false, false) => record.serialize_to::<_, false, false>(&mut self.writer),
247        }
248    }
249
250    fn encode_symbol(&mut self, symbol: Option<&str>) -> csv::Result<()> {
251        self.writer.write_field(symbol.unwrap_or_default())
252    }
253}
254
255impl<W> EncodeRecord for Encoder<W>
256where
257    W: io::Write,
258{
259    fn encode_record<R: DbnEncodable>(&mut self, record: &R) -> Result<()> {
260        if !self.has_written_header {
261            self.encode_header::<R>(false)?;
262        }
263        match self
264            .encode_record_impl(record)
265            // write new line
266            .and_then(|_| self.writer.write_record(None::<&[u8]>))
267        {
268            Ok(()) => Ok(()),
269            Err(e) => Err(match e.into_kind() {
270                csv::ErrorKind::Io(err) => Error::io(err, format!("serializing {record:?}")),
271                e => Error::encode(format!("failed to serialize {record:?}: {e:?}")),
272            }),
273        }
274    }
275
276    fn encode_records<R: DbnEncodable>(&mut self, records: &[R]) -> Result<()> {
277        for record in records {
278            self.encode_record(record)?;
279        }
280        self.flush()?;
281        Ok(())
282    }
283
284    fn flush(&mut self) -> Result<()> {
285        self.writer
286            .flush()
287            .map_err(|e| Error::io(e, "flushing output"))
288    }
289}
290
impl<W> EncodeRecordRef for Encoder<W>
where
    W: io::Write,
{
    /// Encodes a type-erased record by dispatching on its rtype to the matching
    /// concrete record type and calling [`EncodeRecord::encode_record`] with it.
    ///
    /// The outer `?` propagates the dispatch's own error (presumably an unrecognized
    /// rtype — confirm against `rtype_dispatch!`); the inner encoding result is
    /// returned as-is.
    fn encode_record_ref(&mut self, record: crate::RecordRef) -> Result<()> {
        rtype_dispatch!(record, self.encode_record())?
    }

    /// Like [`Self::encode_record_ref`], but when `ts_out` is `true` the record is
    /// treated as carrying a trailing `ts_out` field (encoded as a `WithTsOut` type).
    ///
    /// # Safety
    /// NOTE(review): the contract is defined by the `EncodeRecordRef` trait. Callers in
    /// this file pass the `ts_out` flag from the decoder's metadata header, which
    /// suggests `ts_out` must accurately reflect whether `record` really carries a
    /// `ts_out` suffix — confirm against the trait documentation.
    unsafe fn encode_record_ref_ts_out(
        &mut self,
        record: crate::RecordRef,
        ts_out: bool,
    ) -> Result<()> {
        rtype_dispatch!(record, ts_out: ts_out, self.encode_record())?
    }
}
307
308impl<W> EncodeDbn for Encoder<W>
309where
310    W: io::Write,
311{
312    /// Encodes a stream of DBN records.
313    ///
314    /// # Errors
315    /// This function returns an error if it's unable to write to the underlying writer
316    /// or there's a serialization error.
317    fn encode_stream<R: DbnEncodable>(
318        &mut self,
319        mut stream: impl FallibleStreamingIterator<Item = R, Error = Error>,
320    ) -> Result<()> {
321        while let Some(record) = stream.next()? {
322            self.encode_record(record)?;
323        }
324        self.flush()?;
325        Ok(())
326    }
327
328    /// Encode DBN records directly from a DBN decoder. This implemented outside
329    /// [`EncodeDbn`] because the CSV encoder has the additional constraint of only
330    /// being able to encode a single schema in a stream.
331    ///
332    /// # Errors
333    /// This function returns an error if it's unable to write to the underlying writer
334    /// or there's a serialization error.
335    fn encode_decoded<D: DecodeRecordRef + DbnMetadata>(&mut self, mut decoder: D) -> Result<()> {
336        let ts_out = decoder.metadata().ts_out;
337        if let Some(schema) = decoder.metadata().schema {
338            let rtype = RType::from(schema);
339            while let Some(record) = decoder.decode_record_ref()? {
340                if record.rtype().map_or(true, |r| r != rtype) {
341                    return Err(Error::encode(format!("schema indicated {rtype:?}, but found record with rtype {:?}. Mixed schemas cannot be encoded in CSV.", record.rtype())));
342                }
343                // Safety: It's safe to cast to `WithTsOut` because we're passing in the `ts_out`
344                // from the metadata header.
345                unsafe { self.encode_record_ref_ts_out(record, ts_out) }?;
346            }
347            self.flush()?;
348            Ok(())
349        } else {
350            Err(Error::encode("can't encode a CSV with mixed schemas"))
351        }
352    }
353
354    fn encode_decoded_with_limit<D: DecodeRecordRef + DbnMetadata>(
355        &mut self,
356        mut decoder: D,
357        limit: NonZeroU64,
358    ) -> Result<()> {
359        let ts_out = decoder.metadata().ts_out;
360        if let Some(schema) = decoder.metadata().schema {
361            schema_dispatch!(schema, self.encode_header(false))?;
362            let rtype = RType::from(schema);
363            let mut i = 0;
364            while let Some(record) = decoder.decode_record_ref()? {
365                if record.rtype().map_or(true, |r| r != rtype) {
366                    return Err(Error::encode(format!("schema indicated {rtype:?}, but found record with rtype {:?}. Mixed schemas cannot be encoded in CSV.", record.rtype())));
367                }
368                // Safety: It's safe to cast to `WithTsOut` because we're passing in the `ts_out`
369                // from the metadata header.
370                unsafe { self.encode_record_ref_ts_out(record, ts_out) }?;
371                i += 1;
372                if i == limit.get() {
373                    break;
374                }
375            }
376            self.flush()?;
377            Ok(())
378        } else {
379            Err(Error::encode("can't encode a CSV with mixed schemas"))
380        }
381    }
382}
383
384impl<W> EncodeRecordTextExt for Encoder<W>
385where
386    W: io::Write,
387{
388    fn encode_record_with_sym<R: DbnEncodable>(
389        &mut self,
390        record: &R,
391        symbol: Option<&str>,
392    ) -> Result<()> {
393        if !self.has_written_header {
394            self.encode_header::<R>(true)?;
395        }
396        match self
397            .encode_record_impl(record)
398            .and_then(|_| self.encode_symbol(symbol))
399            // write new line
400            .and_then(|_| self.writer.write_record(None::<&[u8]>))
401        {
402            Ok(()) => Ok(()),
403            Err(e) => Err(match e.into_kind() {
404                csv::ErrorKind::Io(err) => Error::io(err, format!("serializing {record:?}")),
405                e => Error::encode(format!("failed to serialize {record:?}: {e:?}")),
406            }),
407        }
408    }
409}
410
#[cfg(test)]
mod tests {
    #![allow(clippy::clone_on_copy)]

    use std::{array, io::BufWriter, os::raw::c_char};

    use rstest::*;

    use super::*;
    use crate::{
        encode::test_data::{BID_ASK, RECORD_HEADER},
        enums::{
            rtype, InstrumentClass, SecurityUpdateAction, StatType, StatUpdateAction,
            UserDefinedInstrument,
        },
        record::{
            str_to_c_chars, ImbalanceMsg, InstrumentDefMsg, MboMsg, Mbp10Msg, Mbp1Msg, OhlcvMsg,
            RecordHeader, StatMsg, StatusMsg, TradeMsg, WithTsOut,
        },
        test_utils::VecStream,
        RecordRef, FIXED_PRICE_SCALE, UNDEF_PRICE,
    };

    /// Expected text for the `RECORD_HEADER` fields (ts_event, rtype, publisher_id,
    /// instrument_id), joined with `sep`.
    fn header(sep: char) -> String {
        format!("1658441851000000000{sep}4{sep}1{sep}323")
    }

    /// Expected text for the fields of `BID_ASK`, joined with `sep`.
    fn bid_ask(sep: char) -> String {
        format!("372000000000000{sep}372500000000000{sep}10{sep}5{sep}5{sep}2")
    }

    /// Returns everything after the first (header) line of `buffer`, with trailing
    /// whitespace removed.
    ///
    /// Panics if `buffer` isn't valid UTF-8, contains no newline, or has a blank first
    /// line.
    fn extract_2nd_line(buffer: Vec<u8>) -> String {
        let output = String::from_utf8(buffer).expect("valid UTF-8");
        let (first, second) = output.split_once('\n').expect("two lines");
        assert!(!first.trim().is_empty());
        second
            .trim_end() // remove newline
            .to_owned()
    }

    #[rstest]
    #[case::csv(b',')]
    #[case::tsv(b'\t')]
    fn test_mbo_encode_stream(#[case] sep: u8) {
        let data = vec![MboMsg {
            hd: RECORD_HEADER,
            order_id: 16,
            price: 5500,
            size: 3,
            flags: 128.into(),
            channel_id: 14,
            action: 'B' as c_char,
            side: 'B' as c_char,
            ts_recv: 1658441891000000000,
            ts_in_delta: 22_000,
            sequence: 1_002_375,
        }];
        let mut buffer = Vec::new();
        let writer = BufWriter::new(&mut buffer);
        Encoder::builder(writer)
            .delimiter(sep)
            .build()
            .unwrap()
            .encode_stream(VecStream::new(data))
            .unwrap();
        let line = extract_2nd_line(buffer);
        let sep = sep as char;
        assert_eq!(
            line,
            format!(
                "1658441891000000000{sep}{}{sep}B{sep}B{sep}5500{sep}3{sep}14{sep}16{sep}128{sep}22000{sep}1002375",
                header(sep)
            )
        );
    }

    #[rstest]
    #[case::csv(b',')]
    #[case::tsv(b'\t')]
    fn test_mbp1_encode_records(#[case] sep: u8) {
        let data = vec![Mbp1Msg {
            hd: RECORD_HEADER,
            price: 5500,
            size: 3,
            action: 'M' as c_char,
            side: 'A' as c_char,
            flags: 128.into(),
            depth: 9,
            ts_recv: 1658441891000000000,
            ts_in_delta: 22_000,
            sequence: 1_002_375,
            levels: [BID_ASK],
        }];
        let mut buffer = Vec::new();
        let writer = BufWriter::new(&mut buffer);
        Encoder::builder(writer)
            .delimiter(sep)
            .build()
            .unwrap()
            .encode_records(data.as_slice())
            .unwrap();
        let line = extract_2nd_line(buffer);
        let sep = sep as char;
        assert_eq!(
            line,
            format!(
                "1658441891000000000{sep}{}{sep}M{sep}A{sep}9{sep}5500{sep}3{sep}128{sep}22000{sep}1002375{sep}{}",
                header(sep),
                bid_ask(sep)
            )
        );
    }

    #[rstest]
    #[case::csv(b',')]
    #[case::tsv(b'\t')]
    fn test_mbp10_encode_stream(#[case] sep: u8) {
        let data = vec![Mbp10Msg {
            hd: RECORD_HEADER,
            price: 5500,
            size: 3,
            action: 'B' as c_char,
            side: 'A' as c_char,
            flags: 128.into(),
            depth: 9,
            ts_recv: 1658441891000000000,
            ts_in_delta: 22_000,
            sequence: 1_002_375,
            levels: array::from_fn(|_| BID_ASK.clone()),
        }];
        let mut buffer = Vec::new();
        let writer = BufWriter::new(&mut buffer);
        Encoder::builder(writer)
            .delimiter(sep)
            .build()
            .unwrap()
            .encode_stream(VecStream::new(data))
            .unwrap();
        let line = extract_2nd_line(buffer);
        let sep = sep as char;
        assert_eq!(
            line,
            format!("1658441891000000000{sep}{}{sep}B{sep}A{sep}9{sep}5500{sep}3{sep}128{sep}22000\
                     {sep}1002375{sep}{bid_ask}{sep}{bid_ask}{sep}{bid_ask}{sep}{bid_ask}{sep}\
                     {bid_ask}{sep}{bid_ask}{sep}{bid_ask}{sep}{bid_ask}{sep}{bid_ask}{sep}{bid_ask}",
                     header(sep), bid_ask = bid_ask(sep))
        );
    }

    #[rstest]
    #[case::csv(b',')]
    #[case::tsv(b'\t')]
    fn test_trade_encode_records(#[case] sep: u8) {
        let data = vec![TradeMsg {
            hd: RECORD_HEADER,
            price: 5500,
            size: 3,
            action: 'B' as c_char,
            side: 'B' as c_char,
            flags: 128.into(),
            depth: 9,
            ts_recv: 1658441891000000000,
            ts_in_delta: 22_000,
            sequence: 1_002_375,
        }];
        let mut buffer = Vec::new();
        let writer = BufWriter::new(&mut buffer);
        Encoder::builder(writer)
            .delimiter(sep)
            .build()
            .unwrap()
            .encode_records(data.as_slice())
            .unwrap();
        let line = extract_2nd_line(buffer);
        let sep = sep as char;
        assert_eq!(
            line,
            format!(
                "1658441891000000000{sep}{}{sep}B{sep}B{sep}9{sep}5500{sep}3{sep}128{sep}22000{sep}1002375",
                header(sep)
            )
        );
    }

    #[rstest]
    #[case::csv(b',')]
    #[case::tsv(b'\t')]
    fn test_ohlcv_encode_stream(#[case] sep: u8) {
        let data = vec![OhlcvMsg {
            hd: RECORD_HEADER,
            open: 5000,
            high: 8000,
            low: 3000,
            close: 6000,
            volume: 55_000,
        }];
        let mut buffer = Vec::new();
        let writer = BufWriter::new(&mut buffer);
        Encoder::builder(writer)
            .delimiter(sep)
            .build()
            .unwrap()
            .encode_stream(VecStream::new(data))
            .unwrap();
        let line = extract_2nd_line(buffer);
        let sep = sep as char;
        assert_eq!(
            line,
            format!(
                "{}{sep}5000{sep}8000{sep}3000{sep}6000{sep}55000",
                header(sep)
            )
        );
    }

    #[rstest]
    #[case::csv(b',')]
    #[case::tsv(b'\t')]
    fn test_status_encode_records(#[case] sep: u8) {
        let data = vec![StatusMsg {
            hd: RECORD_HEADER,
            ts_recv: 1658441891000000000,
            action: 1,
            reason: 2,
            trading_event: 3,
            is_trading: b'Y' as c_char,
            is_quoting: b'Y' as c_char,
            is_short_sell_restricted: b'~' as c_char,
            _reserved: Default::default(),
        }];
        let mut buffer = Vec::new();
        let writer = BufWriter::new(&mut buffer);
        Encoder::builder(writer)
            .delimiter(sep)
            .build()
            .unwrap()
            .encode_records(data.as_slice())
            .unwrap();
        let line = extract_2nd_line(buffer);
        let sep = sep as char;
        assert_eq!(
            line,
            format!(
                "1658441891000000000{sep}{}{sep}1{sep}2{sep}3{sep}Y{sep}Y{sep}~",
                header(sep)
            )
        );
    }

    #[rstest]
    #[case::csv(b',')]
    #[case::tsv(b'\t')]
    fn test_instrument_def_encode_stream(#[case] sep: u8) {
        let data = vec![InstrumentDefMsg {
            hd: RECORD_HEADER,
            ts_recv: 1658441891000000000,
            min_price_increment: 100,
            display_factor: 1000,
            expiration: 1698450000000000000,
            activation: 1697350000000000000,
            high_limit_price: 1_000_000,
            low_limit_price: -1_000_000,
            max_price_variation: 0,
            unit_of_measure_qty: 5,
            min_price_increment_amount: 5,
            price_ratio: 10,
            inst_attrib_value: 10,
            underlying_id: 256785,
            raw_instrument_id: RECORD_HEADER.instrument_id as u64,
            market_depth_implied: 0,
            market_depth: 13,
            market_segment_id: 0,
            max_trade_vol: 10_000,
            min_lot_size: 1,
            min_lot_size_block: 1000,
            min_lot_size_round_lot: 100,
            min_trade_vol: 1,
            contract_multiplier: 0,
            decay_quantity: 0,
            original_contract_size: 0,
            appl_id: 0,
            maturity_year: 0,
            decay_start_date: 0,
            channel_id: 4,
            currency: str_to_c_chars("USD").unwrap(),
            settl_currency: str_to_c_chars("USD").unwrap(),
            secsubtype: Default::default(),
            raw_symbol: str_to_c_chars("ESZ4 C4100").unwrap(),
            group: str_to_c_chars("EW").unwrap(),
            exchange: str_to_c_chars("XCME").unwrap(),
            asset: str_to_c_chars("ES").unwrap(),
            cfi: str_to_c_chars("OCAFPS").unwrap(),
            security_type: str_to_c_chars("OOF").unwrap(),
            unit_of_measure: str_to_c_chars("IPNT").unwrap(),
            underlying: str_to_c_chars("ESZ4").unwrap(),
            strike_price_currency: str_to_c_chars("USD").unwrap(),
            instrument_class: InstrumentClass::Call as u8 as c_char,
            strike_price: 4_100_000_000_000,
            match_algorithm: 'F' as c_char,
            main_fraction: 4,
            price_display_format: 8,
            sub_fraction: 23,
            underlying_product: 10,
            security_update_action: SecurityUpdateAction::Add as c_char,
            maturity_month: 8,
            maturity_day: 9,
            maturity_week: 11,
            user_defined_instrument: UserDefinedInstrument::No as c_char,
            contract_multiplier_unit: 0,
            flow_schedule_type: 5,
            tick_rule: 0,
            leg_count: 2,
            leg_index: 1,
            leg_instrument_id: 24,
            leg_underlying_id: 25,
            leg_instrument_class: InstrumentClass::Future as c_char,
            leg_ratio_qty_numerator: 1,
            leg_ratio_qty_denominator: 2,
            ..Default::default()
        }];
        let mut buffer = Vec::new();
        let writer = BufWriter::new(&mut buffer);
        Encoder::builder(writer)
            .delimiter(sep)
            .build()
            .unwrap()
            .encode_stream(VecStream::new(data))
            .unwrap();
        let line = extract_2nd_line(buffer);
        let sep = sep as char;
        assert_eq!(line, format!("1658441891000000000{sep}{}{sep}ESZ4 C4100{sep}A{sep}C{sep}100{sep}\
                                 1000{sep}1698450000000000000{sep}1697350000000000000{sep}1000000\
                                 {sep}-1000000{sep}0{sep}5{sep}5{sep}10{sep}10{sep}256785\
                                 {sep}323{sep}0{sep}13{sep}0{sep}10000{sep}1{sep}1000{sep}100{sep}1\
                                 {sep}0{sep}0{sep}0{sep}0{sep}0{sep}0{sep}4{sep}USD{sep}USD\
                                 {sep}{sep}EW{sep}XCME{sep}ES{sep}OCAFPS{sep}OOF{sep}IPNT{sep}ESZ4{sep}\
                                 USD{sep}4100000000000{sep}F{sep}4{sep}8{sep}23{sep}10\
                                 {sep}8{sep}9{sep}11{sep}N{sep}0{sep}5{sep}0{sep}2{sep}1{sep}24{sep}\
                                 {sep}F{sep}N{sep}{UNDEF_PRICE}{sep}{UNDEF_PRICE}{sep}0{sep}0{sep}1{sep}2{sep}25", header(sep)));
    }

    #[rstest]
    #[case::csv(b',')]
    #[case::tsv(b'\t')]
    fn test_encode_with_ts_out(#[case] sep: u8) {
        let data = vec![WithTsOut {
            rec: TradeMsg {
                hd: RECORD_HEADER,
                price: 5500,
                size: 3,
                action: 'T' as c_char,
                side: 'A' as c_char,
                flags: 128.into(),
                depth: 9,
                ts_recv: 1658441891000000000,
                ts_in_delta: 22_000,
                sequence: 1_002_375,
            },
            ts_out: 1678480044000000000,
        }];
        let mut buffer = Vec::new();
        let writer = BufWriter::new(&mut buffer);
        Encoder::builder(writer)
            .delimiter(sep)
            .build()
            .unwrap()
            .encode_records(data.as_slice())
            .unwrap();
        let lines = String::from_utf8(buffer).expect("valid UTF-8");
        let sep = sep as char;
        assert_eq!(
            lines,
            format!("ts_recv{sep}ts_event{sep}rtype{sep}publisher_id{sep}instrument_id{sep}action\
                    {sep}side{sep}depth{sep}price{sep}size{sep}flags{sep}ts_in_delta{sep}sequence\
                    {sep}ts_out\n1658441891000000000{sep}{}{sep}T{sep}A{sep}9{sep}5500{sep}3{sep}128\
                    {sep}22000{sep}1002375{sep}1678480044000000000\n", header(sep))
        );
    }

    #[rstest]
    #[case::csv(b',')]
    #[case::tsv(b'\t')]
    fn test_imbalance_encode_records(#[case] sep: u8) {
        let data = vec![ImbalanceMsg {
            hd: RECORD_HEADER,
            ts_recv: 1,
            ref_price: 2,
            auction_time: 3,
            cont_book_clr_price: 4,
            auct_interest_clr_price: 5,
            ssr_filling_price: 6,
            ind_match_price: 7,
            upper_collar: 8,
            lower_collar: 9,
            paired_qty: 10,
            total_imbalance_qty: 11,
            market_imbalance_qty: 12,
            unpaired_qty: 13,
            auction_type: 'B' as c_char,
            side: 'A' as c_char,
            auction_status: 14,
            freeze_status: 15,
            num_extensions: 16,
            unpaired_side: 'A' as c_char,
            significant_imbalance: 'N' as c_char,
            _reserved: [0],
        }];
        let mut buffer = Vec::new();
        let writer = BufWriter::new(&mut buffer);
        Encoder::builder(writer)
            .delimiter(sep)
            .build()
            .unwrap()
            .encode_records(data.as_slice())
            .unwrap();
        let line = extract_2nd_line(buffer);
        let sep = sep as char;
        assert_eq!(
            line,
            format!(
                "1{sep}{}{sep}2{sep}3{sep}4{sep}5{sep}6{sep}7{sep}8{sep}9{sep}10{sep}11{sep}12{sep}\
                13{sep}B{sep}A{sep}14{sep}15{sep}16{sep}A{sep}N",
                header(sep)
            )
        );
    }

    #[rstest]
    #[case::csv(b',')]
    #[case::tsv(b'\t')]
    fn test_stat_encode_stream(#[case] sep: u8) {
        let data = vec![StatMsg {
            hd: RECORD_HEADER,
            ts_recv: 1,
            ts_ref: 2,
            price: 3,
            quantity: 0,
            sequence: 4,
            ts_in_delta: 5,
            stat_type: StatType::OpeningPrice as u16,
            channel_id: 7,
            update_action: StatUpdateAction::New as u8,
            stat_flags: 0,
            _reserved: Default::default(),
        }];
        let mut buffer = Vec::new();
        let writer = BufWriter::new(&mut buffer);
        Encoder::builder(writer)
            .delimiter(sep)
            .build()
            .unwrap()
            .encode_stream(VecStream::new(data))
            .unwrap();
        let line = extract_2nd_line(buffer);
        let sep = sep as char;
        assert_eq!(
            line,
            format!(
                "1{sep}{}{sep}2{sep}3{sep}0{sep}4{sep}5{sep}1{sep}7{sep}1{sep}0",
                header(sep)
            )
        );
    }

    #[test]
    fn test_encode_ref_with_sym() {
        let mut buffer = Vec::new();
        const BAR: OhlcvMsg = OhlcvMsg {
            hd: RecordHeader::new::<OhlcvMsg>(rtype::OHLCV_1H, 10, 9, 0),
            open: 175 * FIXED_PRICE_SCALE,
            high: 177 * FIXED_PRICE_SCALE,
            low: 174 * FIXED_PRICE_SCALE,
            close: 175 * FIXED_PRICE_SCALE,
            volume: 4033445,
        };
        let rec_ref = RecordRef::from(&BAR);
        let mut encoder = Encoder::builder(&mut buffer)
            .use_pretty_px(false)
            .use_pretty_ts(false)
            .write_header(false)
            .build()
            .unwrap();
        encoder.encode_ref_with_sym(rec_ref, None).unwrap();
        encoder.encode_ref_with_sym(rec_ref, Some("AAPL")).unwrap();
        drop(encoder);
        let res = String::from_utf8(buffer).unwrap();
        assert_eq!(
            res,
            "0,34,10,9,175000000000,177000000000,174000000000,175000000000,4033445,\n\
            0,34,10,9,175000000000,177000000000,174000000000,175000000000,4033445,AAPL\n"
        );
    }

    #[test]
    fn test_encode_header_for_schema() {
        let mut buffer = Vec::new();
        {
            let mut encoder = Encoder::new(&mut buffer, false, false);
            encoder
                .encode_header_for_schema(DBN_VERSION, Schema::Statistics, false, false)
                .unwrap();
        }
        {
            let mut encoder = Encoder::new(&mut buffer, false, false);
            encoder
                .encode_header_for_schema(DBN_VERSION, Schema::Statistics, true, true)
                .unwrap();
        }

        let res = String::from_utf8(buffer).unwrap();
        let (fst_line, snd_line) = res.split_once('\n').unwrap();
        assert!(snd_line.ends_with(",symbol\n"));
        let orig_header = snd_line.split_once(",ts_out,symbol").unwrap().0;
        assert_eq!(fst_line, orig_header);
    }
}
929}