1use nohash_hasher::IntMap;
2use std::collections::HashMap;
3use std::convert::TryInto;
4use std::fmt::Debug;
5use std::iter::FusedIterator;
6use std::marker::PhantomData;
7
8use crate::config::{Config, GetConfig};
9use crate::error::DecodeError;
10use crate::field_access::{FieldMap, FieldType, FieldValueError, RepeatingGroup};
11use crate::raw_decoder::{RawDecoder, RawFrame};
12use hotfix_dictionary::{Dictionary, FixDatatype, IsFieldDefinition, TagU32};
13
/// Key under which a decoded field is stored in the message's field map: the
/// field's tag plus the repeating-group position it was found at, so the same
/// tag can be stored once per group entry.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
struct FieldLocator {
    /// The field's tag number.
    pub tag: TagU32,
    /// Where in the message (top level or a group entry) the field lives.
    pub context: FieldLocatorContext,
}
27
/// Position of a field within a decoded message.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum FieldLocatorContext {
    /// The field is not inside any repeating group.
    TopLevel,
    /// The field belongs to one entry of the innermost repeating group that
    /// was open when it was decoded.
    WithinGroup {
        /// Index, in decoding order, of the group's NumInGroup field.
        index_of_group_tag: u32,
        /// Zero-based index of the group entry.
        entry_index: u32,
    },
}
36
/// Byte offset of the BeginString value inside a frame: the value starts right
/// after the leading `8=`.
const BEGIN_STRING_OFFSET: usize = "8=".len();
42
/// FIX message decoder that tokenizes raw frames into tag/value fields.
///
/// A single [`MessageBuilder`] is reused across calls, so each returned
/// [`Message`] borrows the decoder mutably until it is dropped.
#[derive(Debug)]
pub struct Decoder {
    /// Field storage for the most recently decoded message. The `'static`
    /// lifetime is a placeholder: `message_builder_mut` transmutes it to the
    /// lifetime of `&mut self` before slices of the current frame are stored.
    pub(crate) builder: MessageBuilder<'static>,
    /// Splits raw bytes into a frame and owns the decoder's `Config`.
    raw_decoder: RawDecoder,
    /// Tag -> data type, only for `Length` and `NumInGroup` fields, which
    /// change how the fields following them are tokenized.
    tag_lookup: IntMap<u32, FixDatatype>,
}
52
53impl Decoder {
54 pub fn new(dict: Dictionary) -> Self {
57 Self {
58 builder: MessageBuilder::default(),
59 raw_decoder: RawDecoder::default(),
60 tag_lookup: dict
61 .fields()
62 .iter()
63 .filter_map(|field| {
64 let mut fix_type = field.data_type().basetype();
65 if field.is_num_in_group() {
66 fix_type = FixDatatype::NumInGroup;
67 }
68
69 if fix_type == FixDatatype::Length || fix_type == FixDatatype::NumInGroup {
70 Some((field.tag().get(), fix_type))
71 } else {
72 None
73 }
74 })
75 .collect(),
76 }
77 }
78
79 #[inline]
99 pub fn decode<T>(&mut self, bytes: T) -> Result<Message<T>, DecodeError>
100 where
101 T: AsRef<[u8]>,
102 {
103 let frame = self.raw_decoder.decode(bytes)?;
104 self.from_frame(frame)
105 }
106
107 fn message_builder_mut(&mut self) -> &mut MessageBuilder {
108 unsafe { std::mem::transmute(&mut self.builder) }
109 }
110
111 #[allow(clippy::wrong_self_convention)]
112 pub(crate) fn from_frame<T>(&mut self, frame: RawFrame<T>) -> Result<Message<T>, DecodeError>
113 where
114 T: AsRef<[u8]>,
115 {
116 self.builder.clear();
117 self.message_builder_mut().bytes = frame.as_bytes();
118 let separator = self.config().separator;
119 let payload = frame.payload();
120 self.store_field(
121 TagU32::new(8).unwrap(),
122 frame.as_bytes(),
123 BEGIN_STRING_OFFSET,
124 frame.begin_string().len(),
125 );
126 let mut i = 0;
127 while i < payload.len() {
128 let index_of_next_equal_sign = {
129 let i_eq = payload[i..]
130 .iter()
131 .copied()
132 .position(|byte| byte == b'=')
133 .map(|pos| pos + i);
134 if i_eq.is_none() {
135 break;
136 }
137 i_eq.unwrap()
138 };
139 let field_value_len = if let Some(len) = self.builder.state.data_field_length {
140 self.builder.state.data_field_length = None;
141 len
142 } else {
143 let len = payload[index_of_next_equal_sign + 1..]
144 .iter()
145 .copied()
146 .position(|byte| byte == separator);
147 if len.is_none() {
148 break;
149 }
150 len.unwrap()
151 };
152 let tag_num = {
153 let mut tag = 0u32;
154 for byte in payload[i..index_of_next_equal_sign].iter().copied() {
155 tag = tag * 10 + (byte as u32 - b'0' as u32);
156 }
157 if let Some(tag) = TagU32::new(tag) {
158 tag
159 } else {
160 break;
161 }
162 };
163 self.store_field(
164 tag_num,
165 frame.payload(),
166 index_of_next_equal_sign + 1,
167 field_value_len,
168 );
169 i = index_of_next_equal_sign + 1 + field_value_len + 1;
172 }
173 Ok(Message {
174 builder: self.message_builder_mut(),
175 phantom: PhantomData,
176 field_locator_context: FieldLocatorContext::TopLevel,
177 })
178 }
179
180 fn store_field(
181 &mut self,
182 tag: TagU32,
183 raw_message: &[u8],
184 field_value_start: usize,
185 field_value_len: usize,
186 ) {
187 let config_assoc = self.config().should_decode_associative;
188 let field_value = &raw_message[field_value_start..][..field_value_len];
189 if self.builder.state.new_group.is_some() {
190 self.builder.state.set_new_group(tag);
193 } else if let Some(group_info) = self.builder.state.group_information.last_mut() {
194 if group_info.current_entry_i >= group_info.num_entries {
195 self.builder.state.group_information.pop();
196 } else if tag == group_info.first_tag_of_every_group_entry {
197 group_info.current_entry_i += 1;
198 }
199 }
200 self.message_builder_mut()
201 .add_field(
202 tag,
203 &raw_message[field_value_start..][..field_value_len],
204 config_assoc,
205 )
206 .unwrap();
207 let fix_type = self.tag_lookup.get(&tag.get());
208 if fix_type == Some(&FixDatatype::NumInGroup) {
209 self.builder
210 .state
211 .add_group(tag, self.builder.field_locators.len() - 1, field_value);
212 } else if fix_type == Some(&FixDatatype::Length) {
213 let last_field_locator = self.builder.field_locators.last().unwrap();
215 let last_field = self.builder.fields.get(last_field_locator).unwrap();
216 let last_field_value = last_field.1;
217 let s = std::str::from_utf8(last_field_value).unwrap();
218 let data_field_length = str::parse(s).unwrap();
219 self.builder.state.data_field_length = Some(data_field_length);
220 }
221 }
222}
223
/// Exposes the [`Config`] owned by the inner [`RawDecoder`], so the decoder and
/// its frame parser always share a single configuration.
impl GetConfig for Decoder {
    type Config = Config;

    /// Borrows the shared configuration.
    fn config(&self) -> &Self::Config {
        self.raw_decoder.config()
    }

    /// Mutably borrows the shared configuration (e.g. to change the separator).
    fn config_mut(&mut self) -> &mut Self::Config {
        self.raw_decoder.config_mut()
    }
}
235
/// A repeating group inside a [`Message`], as returned by `FieldMap::group`.
#[derive(Debug, Clone)]
pub struct MessageGroup<'a, T>
where
    T: AsRef<[u8]>,
{
    /// View of the message the group belongs to.
    message: Message<'a, T>,
    /// Index, in decoding order, of the group's NumInGroup field.
    index_of_group_tag: u32,
    /// Number of entries, as declared by the NumInGroup field's value.
    len: usize,
}
246
247impl<'a, T> RepeatingGroup for MessageGroup<'a, T>
248where
249 T: AsRef<[u8]> + Clone,
250{
251 type Entry = Message<'a, T>;
252
253 fn len(&self) -> usize {
254 self.len
255 }
256
257 fn is_empty(&self) -> bool {
258 self.len() == 0
259 }
260
261 fn get(&self, i: usize) -> Option<Self::Entry> {
262 if i < self.len {
263 Some(Message {
264 builder: self.message.builder,
265 phantom: PhantomData,
266 field_locator_context: FieldLocatorContext::WithinGroup {
267 index_of_group_tag: self.index_of_group_tag,
268 entry_index: i.try_into().unwrap(),
269 },
270 })
271 } else {
272 None
273 }
274 }
275}
276
/// A decoded FIX message, or a view of one repeating-group entry within it.
///
/// Borrows the field storage of the [`Decoder`] that produced it.
#[derive(Debug, Copy, Clone)]
pub struct Message<'a, T> {
    /// Field storage shared by the message and all of its group-entry views.
    pub(crate) builder: &'a MessageBuilder<'a>,
    /// Ties the message to the input type `T` without storing a `T`.
    pub(crate) phantom: PhantomData<T>,
    /// Scope of `get_raw`/`group` lookups: top-level fields, or the fields of
    /// one specific group entry.
    pub(crate) field_locator_context: FieldLocatorContext,
}
284
285impl<'a, T> Message<'a, T> {
286 pub fn fields(&'a self) -> Fields<'a, T> {
307 Fields {
308 message: self,
309 i: 0,
310 }
311 }
312
313 pub fn as_bytes(&self) -> &[u8] {
331 self.builder.bytes
332 }
333
334 pub fn len(&self) -> usize {
352 self.builder.field_locators.len()
353 }
354
355 pub fn is_empty(&self) -> bool {
356 self.len() == 0
357 }
358}
359
360impl<'a, T> PartialEq for Message<'a, T> {
361 fn eq(&self, other: &Self) -> bool {
362 self.fields().eq(other.fields())
366 }
367}
368
/// Message equality compares `(tag, value)` pairs with byte-wise equality,
/// which is a full equivalence relation.
impl<'a, T> Eq for Message<'a, T> {}
370
/// Tracking state for a repeating group that is currently being decoded.
#[derive(Debug, Copy, Clone)]
struct DecoderGroupState {
    /// Tag of the first field after the NumInGroup field; each later
    /// occurrence of this tag starts a new entry.
    first_tag_of_every_group_entry: TagU32,
    /// Number of entries declared by the NumInGroup field.
    num_entries: usize,
    /// Zero-based index of the entry currently being filled.
    current_entry_i: usize,
    /// Index, in decoding order, of the group's NumInGroup field.
    index_of_group_tag: usize,
}
378
/// A group announced by a non-zero NumInGroup field whose first entry field
/// has not been decoded yet.
// `tag` is written when the group is announced but never read, hence
// `dead_code`.
#[allow(dead_code)]
#[derive(Debug, Copy, Clone)]
struct DecoderStateNewGroup {
    /// Tag of the NumInGroup field.
    tag: TagU32,
    /// Index, in decoding order, of the NumInGroup field.
    index_of_group_tag: usize,
    /// Number of entries declared by the NumInGroup field.
    num_entries: usize,
}
386
/// Tokenizer state carried from one field to the next while decoding.
#[derive(Debug, Clone)]
struct DecoderState {
    /// Stack of open repeating groups; the last element is the innermost.
    group_information: Vec<DecoderGroupState>,
    /// Group announced by the previous field, to be opened by the next one.
    new_group: Option<DecoderStateNewGroup>,
    /// Byte length of the next field's value, set after a `Length` field.
    data_field_length: Option<usize>,
}
393
394impl DecoderState {
395 fn current_field_locator(&self, tag: TagU32) -> FieldLocator {
396 FieldLocator {
397 tag,
398 context: match self.group_information.last() {
399 Some(group_info) => FieldLocatorContext::WithinGroup {
400 index_of_group_tag: group_info.index_of_group_tag as u32,
401 entry_index: group_info.current_entry_i as u32,
402 },
403 None => FieldLocatorContext::TopLevel,
404 },
405 }
406 }
407
408 fn set_new_group(&mut self, tag: TagU32) {
409 assert!(self.new_group.is_some());
410 let new_group = self.new_group.take().unwrap();
411 self.group_information.push(DecoderGroupState {
412 first_tag_of_every_group_entry: tag,
413 num_entries: new_group.num_entries,
414 current_entry_i: 0,
415 index_of_group_tag: new_group.index_of_group_tag,
416 });
417 }
418
419 fn add_group(&mut self, tag: TagU32, index_of_group_tag: usize, field_value: &[u8]) {
420 let field_value_str = std::str::from_utf8(field_value).unwrap();
421 let num_entries = str::parse(field_value_str).unwrap();
422 if num_entries > 0 {
423 self.new_group = Some(DecoderStateNewGroup {
424 tag,
425 index_of_group_tag,
426 num_entries,
427 });
428 }
429 }
430}
431
/// Storage for the fields of one decoded message.
// `raw`, `i_first_cell`, `i_last_cell` and the `len_end_*` offsets are private
// and only ever initialized in this module, never read, hence `dead_code`.
#[allow(dead_code)]
#[derive(Debug, Clone)]
pub(crate) struct MessageBuilder<'a> {
    /// Group and data-length tracking used while tokenizing.
    state: DecoderState,
    raw: &'a [u8],
    /// Field lookup by locator: `(tag, value, index in decoding order)`.
    /// Only populated when associative decoding is enabled; a repeated
    /// locator keeps the most recent value.
    fields: HashMap<FieldLocator, (TagU32, &'a [u8], usize)>,
    /// Locator of every decoded field, in decoding order (duplicates kept).
    field_locators: Vec<FieldLocator>,
    i_first_cell: usize,
    i_last_cell: usize,
    len_end_header: usize,
    len_end_body: usize,
    len_end_trailer: usize,
    /// The complete frame the fields were decoded from.
    bytes: &'a [u8],
}
447
448impl<'a> Default for MessageBuilder<'a> {
449 fn default() -> Self {
450 Self {
451 state: DecoderState {
452 group_information: Vec::new(),
453 new_group: None,
454 data_field_length: None,
455 },
456 raw: b"",
457 field_locators: Vec::new(),
458 fields: HashMap::new(),
459 i_first_cell: 0,
460 i_last_cell: 0,
461 len_end_body: 0,
462 len_end_trailer: 0,
463 len_end_header: 0,
464 bytes: b"",
465 }
466 }
467}
468
469impl<'a> MessageBuilder<'a> {
470 fn clear(&mut self) {
471 *self = Self::default();
472 }
473
474 fn add_field(
475 &mut self,
476 tag: TagU32,
477 field_value: &'a [u8],
478 associative: bool,
479 ) -> Result<(), DecodeError> {
480 let field_locator = self.state.current_field_locator(tag);
481 let i = self.field_locators.len();
482 if associative {
483 self.fields.insert(field_locator, (tag, field_value, i));
484 }
485 self.field_locators.push(field_locator);
486 Ok(())
487 }
488}
489
/// Iterator over the `(tag, value)` pairs of a [`Message`], in decoding order.
#[derive(Debug)]
pub struct Fields<'a, T> {
    /// The message being iterated.
    message: &'a Message<'a, T>,
    /// Index of the next field locator to yield.
    i: usize,
}
496
497impl<'a, T> ExactSizeIterator for Fields<'a, T> {
498 fn len(&self) -> usize {
499 self.message.len()
500 }
501}
502
/// Once exhausted, [`Fields`] stays exhausted: `next` never advances past the
/// last field locator.
impl<'a, T> FusedIterator for Fields<'a, T> {}
504
505impl<'a, T> Iterator for Fields<'a, T> {
506 type Item = (TagU32, &'a [u8]);
507
508 fn next(&mut self) -> Option<Self::Item> {
509 if self.i == self.message.len() {
510 None
511 } else {
512 let context = self.message.builder.field_locators[self.i];
513 let field = self.message.builder.fields.get(&context).unwrap();
514 self.i += 1;
515 Some((field.0, field.1))
516 }
517 }
518}
519
520impl<'a, T> FieldMap<u32> for Message<'a, T>
521where
522 T: AsRef<[u8]> + Clone,
523{
524 type Group = MessageGroup<'a, T>;
525
526 fn get_raw(&self, tag: u32) -> Option<&[u8]> {
527 let tag = TagU32::new(tag)?;
528 let field_locator = FieldLocator {
529 tag,
530 context: self.field_locator_context,
531 };
532 self.builder.fields.get(&field_locator).map(|field| field.1)
533 }
534
535 fn group(&self, tag: u32) -> Result<Self::Group, FieldValueError<<usize as FieldType>::Error>> {
536 let tag = TagU32::new(tag).ok_or(FieldValueError::Missing)?;
537 let field_locator_of_group_tag = FieldLocator {
538 tag,
539 context: self.field_locator_context,
540 };
541 let num_in_group = self
542 .builder
543 .fields
544 .get(&field_locator_of_group_tag)
545 .ok_or(FieldValueError::Missing)?;
546 let num_entries = usize::deserialize(num_in_group.1).map_err(FieldValueError::Invalid)?;
547 let index_of_group_tag = num_in_group.2 as u32;
548 Ok(MessageGroup {
549 message: Message {
550 builder: self.builder,
551 phantom: PhantomData,
552 field_locator_context: FieldLocatorContext::TopLevel,
553 },
554 index_of_group_tag,
555 len: num_entries,
556 })
557 }
558}
559
560impl<'a, F, T> FieldMap<&F> for Message<'a, T>
561where
562 F: IsFieldDefinition,
563 T: AsRef<[u8]> + Clone,
564{
565 type Group = MessageGroup<'a, T>;
566
567 fn get_raw(&self, field: &F) -> Option<&[u8]> {
568 self.get_raw(field.tag().get())
569 }
570
571 fn group(
572 &self,
573 field: &F,
574 ) -> Result<Self::Group, FieldValueError<<usize as FieldType>::Error>> {
575 self.group(field.tag().get())
576 }
577}
578
#[cfg(feature = "utils-slog")]
#[cfg_attr(doc_cfg, doc(cfg(feature = "utils-slog")))]
impl<'a, T> slog::Value for Message<'a, T>
where
    T: AsRef<[u8]>,
{
    /// Emits every field under `key` as the sequence `tag`, `=`, `?`, `|`.
    /// Field values themselves are not emitted; `?` is written in their place.
    fn serialize(
        &self,
        _rec: &slog::Record,
        key: slog::Key,
        serializer: &mut dyn slog::Serializer,
    ) -> slog::Result {
        self.fields().try_for_each(|(tag, _value)| {
            serializer.emit_u32(key, tag.get())?;
            serializer.emit_char(key, '=')?;
            serializer.emit_char(key, '?')?;
            serializer.emit_char(key, '|')
        })
    }
}
601
/// Unused in this module; appears to be a placeholder for a borrowed
/// repeating-group view.
// Nothing in this module constructs `GroupRef`, hence `dead_code`.
#[allow(dead_code)]
#[derive(Debug, Clone)]
pub struct GroupRef<'a, T>
where
    T: AsRef<[u8]>,
{
    message: &'a Message<'a, T>,
    len: usize,
    field_len: u32,
}
612
/// Unused in this module; appears to be a placeholder iterator over the
/// entries of a [`GroupRef`].
// Nothing in this module constructs `GroupRefIter`, hence `dead_code`.
#[allow(dead_code)]
#[derive(Debug, Clone)]
pub struct GroupRefIter<'a, T>
where
    T: AsRef<[u8]>,
{
    group: &'a GroupRef<'a, T>,
    i: usize,
}
622
#[cfg(test)]
mod test {
    use super::*;

    // Sample messages written with `|` as the field separator; `with_soh`
    // converts them to the SOH (0x01) separator.
    const RANDOM_MESSAGES: &[&str] = &[
        "8=FIX.4.2|9=42|35=0|49=A|56=B|34=12|52=20100304-07:59:30|10=185|",
        "8=FIX.4.2|9=97|35=6|49=BKR|56=IM|34=14|52=20100204-09:18:42|23=115685|28=N|55=SPMI.MI|54=2|44=2200.75|27=S|25=H|10=248|",
        "8=FIX.4.4|9=117|35=AD|34=2|49=A|50=1|52=20100219-14:33:32.258|56=B|57=M|263=1|568=1|569=0|580=1|75=20100218|60=20100218-00:00:00.000|10=202|",
        "8=FIX.4.4|9=94|35=3|34=214|49=A|50=U1|52=20100304-09:42:23.130|56=AB|128=B1|45=176|58=txt|371=15|372=X|373=1|10=058|",
        "8=FIX.4.4|9=70|35=4|49=A|56=XYZ|34=129|52=20100302-19:38:21|43=Y|57=LOL|123=Y|36=175|10=192|",
        "8=FIX.4.4|9=122|35=D|34=215|49=CLIENT12|52=20100225-19:41:57.316|56=B|1=Marcel|11=13346|21=1|40=2|44=5|54=1|59=0|60=20100225-19:39:52.020|10=072|",
        "8=FIX.4.2|9=196|35=X|49=A|56=B|34=12|52=20100318-03:21:11.364|262=A|268=2|279=0|269=0|278=BID|55=EUR/USD|270=1.37215|15=EUR|271=2500000|346=1|279=0|269=1|278=OFFER|55=EUR/USD|270=1.37224|15=EUR|271=2503200|346=1|10=171|",
    ];

    /// Replaces every `|` in `msg` with SOH (0x01).
    fn with_soh(msg: &str) -> String {
        msg.split('|').collect::<Vec<&str>>().join("\x01")
    }

    /// FIX 4.4 decoder using `|` as separator, so test messages stay readable.
    fn decoder() -> Decoder {
        let mut decoder = Decoder::new(Dictionary::fix44());
        decoder.config_mut().separator = b'|';
        decoder
    }

    #[test]
    fn can_parse_simple_message() {
        let message = "8=FIX.4.2|9=40|35=D|49=AFUNDMGR|56=ABROKER|15=USD|59=0|10=091|";
        let mut decoder = decoder();
        let result = decoder.decode(message.as_bytes());
        assert!(result.is_ok());
    }

    // NOTE(review): `10=000` is accepted here, which (per the test name) relies
    // on checksum verification being skipped — confirm which `Config` default
    // controls this.
    #[test]
    fn skip_checksum_verification() {
        let message = "8=FIX.FOOBAR|9=5|35=0|10=000|";
        let mut decoder = decoder();
        let result = decoder.decode(message.as_bytes());
        assert!(result.is_ok());
    }

    // Tag 268 is a NumInGroup field announcing two entries, each starting
    // with tag 279; entry 0 must see the first `278=BID`.
    #[test]
    fn repeating_group_entries() {
        let bytes = b"8=FIX.4.2|9=196|35=X|49=A|56=B|34=12|52=20100318-03:21:11.364|262=A|268=2|279=0|269=0|278=BID|55=EUR/USD|270=1.37215|15=EUR|271=2500000|346=1|279=0|269=1|278=OFFER|55=EUR/USD|270=1.37224|15=EUR|271=2503200|346=1|10=171|";
        let decoder = &mut decoder();
        let message = decoder.decode(bytes).unwrap();
        let group = message.group(268).unwrap();
        assert_eq!(group.len(), 2);
        assert_eq!(group.get(0).unwrap().get_raw(278).unwrap(), b"BID" as &[u8]);
    }

    // A zero-entry group must not capture the top-level field that follows it.
    #[test]
    fn top_level_tag_after_empty_group() {
        let bytes = b"8=FIX.4.4|9=17|35=X|268=0|346=1|10=171|";
        let mut decoder = decoder();
        let message = decoder.decode(&bytes).unwrap();
        let group = message.group(268).unwrap();
        assert_eq!(group.len(), 0);
        assert_eq!(message.get_raw(346), Some("1".as_bytes()));
    }

    #[test]
    fn assortment_of_random_messages_is_ok() {
        for msg_with_vertical_bar in RANDOM_MESSAGES {
            let message = with_soh(msg_with_vertical_bar);
            let mut codec = decoder();
            codec.config_mut().separator = 0x1;
            let result = codec.decode(message.as_bytes());
            result.unwrap();
        }
    }

    #[test]
    fn heartbeat_message_fields_are_ok() {
        let mut codec = decoder();
        let message = codec.decode(RANDOM_MESSAGES[0].as_bytes()).unwrap();
        assert_eq!(message.get(35), Ok(b"0"));
        assert_eq!(message.get_raw(8), Some(b"FIX.4.2" as &[u8]));
        assert_eq!(message.get(34), Ok(12));
        assert_eq!(message.get_raw(34), Some(b"12" as &[u8]));
    }

    #[test]
    fn message_without_final_separator() {
        let mut codec = decoder();
        let message = "8=FIX.4.4|9=122|35=D|34=215|49=CLIENT12|52=20100225-19:41:57.316|56=B|1=Marcel|11=13346|21=1|40=2|44=5|54=1|59=0|60=20100225-19:39:52.020|10=072";
        let result = codec.decode(message.as_bytes());
        assert!(result.is_err());
    }

    #[test]
    fn message_must_end_with_separator() {
        let msg = "8=FIX.4.2|9=41|35=D|49=AFUNDMGR|56=ABROKERt|15=USD|59=0|10=127";
        let mut codec = decoder();
        let result = codec.decode(msg.as_bytes());
        assert!(matches!(result, Err(DecodeError::Invalid)));
    }

    #[test]
    fn message_without_checksum() {
        let msg = "8=FIX.4.4|9=37|35=D|49=AFUNDMGR|56=ABROKERt|15=USD|59=0|";
        let mut codec = decoder();
        let result = codec.decode(msg.as_bytes());
        assert!(matches!(result, Err(DecodeError::Invalid)));
    }

    // Tag 93 is a Length field: the value of the next field (89) is read as
    // exactly 8 bytes even though it contains separator bytes.
    #[test]
    fn message_with_data_field() {
        let msg =
            "8=FIX.4.4|9=58|35=D|49=AFUNDMGR|56=ABROKERt|15=USD|39=0|93=8|89=foo|\x01bar|10=000|";
        let mut codec = decoder();
        let result = codec.decode(msg.as_bytes()).unwrap();
        assert_eq!(result.get(93), Ok(8));
        assert!(matches!(result.get_raw(89), Some(b"foo|\x01bar")));
    }

    #[test]
    fn message_without_standard_header() {
        let msg = "35=D|49=AFUNDMGR|56=ABROKERt|15=USD|59=0|10=000|";
        let mut codec = decoder();
        let result = codec.decode(msg.as_bytes());
        assert!(matches!(result, Err(DecodeError::Invalid)));
    }

    // NOTE(review): confirm whether the BodyLength (`9=43`) or the checksum is
    // what causes this frame to be rejected.
    #[test]
    fn detect_incorrect_checksum() {
        let msg = "8=FIX.4.2|9=43|35=D|49=AFUNDMGR|56=ABROKER|15=USD|59=0|10=146|";
        let mut codec = decoder();
        let result = codec.decode(msg.as_bytes());
        assert!(matches!(result, Err(DecodeError::Invalid)));
    }
}