1use crate::{
4 error::EdifactError,
5 model::{Element, OwnedSegment, Segment, Span},
6 tokenizer::{Token, Tokenizer},
7};
8use memchr::memchr2;
9use smallvec::SmallVec;
10use std::borrow::Cow;
11use std::io::{BufRead, BufReader, Read};
12
13fn finish_element<'a>(
14 elements: &mut Vec<Element<'a>>,
15 current_components: &mut SmallVec<[(Cow<'a, str>, Span); 4]>,
16 current_element_start: &mut Option<usize>,
17) {
18 if let (Some(start), Some((_, last_span))) =
19 (current_element_start.take(), current_components.last())
20 {
21 let last_end = last_span.end;
22 elements.push(Element {
23 span: Span::new(start, last_end),
24 components: std::mem::take(current_components),
25 });
26 }
27}
28
29fn resolve_release(
30 val: &str,
31 release_char: char,
32 start_offset: usize,
33) -> Result<Cow<'_, str>, EdifactError> {
34 if !val.contains(release_char) {
35 return Ok(Cow::Borrowed(val));
36 }
37 resolve_release_owned(val, release_char, start_offset).map(Cow::Owned)
38}
39
40fn resolve_release_owned(
41 val: &str,
42 release_char: char,
43 start_offset: usize,
44) -> Result<String, EdifactError> {
45 let cap = val.len() - val.len() / 4;
51 let mut out = String::with_capacity(cap);
52 let mut chars = val.chars();
53 while let Some(c) = chars.next() {
54 if c == release_char {
55 if let Some(escaped) = chars.next() {
56 out.push(escaped);
57 } else {
58 return Err(EdifactError::InvalidReleaseSequence {
59 offset: start_offset + val.len().saturating_sub(1),
60 });
61 }
62 } else {
63 out.push(c);
64 }
65 }
66 Ok(out)
67}
68
/// Groups the tokens produced by a [`Tokenizer`] into [`Segment`]s.
///
/// Iterating a `Parser` yields one `Result<Segment, EdifactError>` per segment.
pub struct Parser<'a> {
    // Source of tokens.
    tokenizer: Tokenizer<'a>,
    // A `SegmentTag` token read ahead while finishing the previous segment; the next
    // call to `next` consumes it before pulling from `tokenizer`.
    peeked: Option<Token<'a>>,
    // Release (escape) character from the tokenizer's service string advice, cached as
    // a `char` and passed to `resolve_release` for every element value.
    release_char: char,
}
80
81impl<'a> Parser<'a> {
82 pub fn new(tokenizer: Tokenizer<'a>) -> Self {
84 let release_char = tokenizer.service_string_advice().release_char as char;
85 Self {
86 tokenizer,
87 peeked: None,
88 release_char,
89 }
90 }
91}
92
impl<'a> Iterator for Parser<'a> {
    type Item = Result<Segment<'a>, EdifactError>;

    /// Yields the next complete segment.
    ///
    /// Stray segment terminators before a tag are skipped. A data or component token
    /// seen before any tag is reported as `EdifactError::UnexpectedDataToken`. After
    /// the tag, tokens are collected into elements until a segment terminator, the next
    /// segment tag (which is stashed in `self.peeked` for the following call), or the
    /// end of input. Element and component values have release sequences resolved.
    fn next(&mut self) -> Option<Self::Item> {
        // Phase 1: find the segment tag, preferring a tag read ahead by the previous call.
        let (tag, tag_span) = loop {
            let tok = match self.peeked.take() {
                Some(t) => Ok(t),
                None => self.tokenizer.next()?,
            };
            match tok {
                Ok(Token::SegmentTag { value, span }) => break (value, span),
                // Empty segments (consecutive terminators) produce nothing.
                Ok(Token::SegmentTerminator { .. }) => continue,
                Ok(Token::DataElement { span, .. }) | Ok(Token::ComponentElement { span, .. }) => {
                    return Some(Err(EdifactError::UnexpectedDataToken {
                        offset: span.start,
                    }));
                }
                Err(e) => return Some(Err(e)),
            }
        };

        // Phase 2: accumulate elements. `current_components` holds the components of
        // the element under construction; `in_element` says whether one is open.
        let mut elements: Vec<Element<'a>> = Vec::with_capacity(8);
        let mut current_components: SmallVec<[(Cow<'a, str>, Span); 4]> = SmallVec::new();
        let mut current_element_start: Option<usize> = None;
        let mut in_element = false;
        // Defaults to the end of the tag for a bare tag with no elements.
        let mut segment_end = tag_span.end;

        loop {
            let tok = match self.tokenizer.next() {
                Some(Ok(t)) => t,
                Some(Err(e)) => return Some(Err(e)),
                None => {
                    // End of input without a terminator: close the open element and end
                    // the segment at the last element.
                    if in_element {
                        finish_element(
                            &mut elements,
                            &mut current_components,
                            &mut current_element_start,
                        );
                        if let Some(last) = elements.last() {
                            segment_end = last.span.end;
                        }
                    }
                    break;
                }
            };

            match tok {
                Token::SegmentTag {
                    value: next_tag,
                    span,
                } => {
                    // A new tag without a terminator: keep it for the next call and end
                    // this segment at its last element.
                    self.peeked = Some(Token::SegmentTag {
                        value: next_tag,
                        span,
                    });
                    if in_element {
                        finish_element(
                            &mut elements,
                            &mut current_components,
                            &mut current_element_start,
                        );
                        if let Some(last) = elements.last() {
                            segment_end = last.span.end;
                        }
                    }
                    break;
                }
                Token::SegmentTerminator { span } => {
                    if in_element {
                        finish_element(
                            &mut elements,
                            &mut current_components,
                            &mut current_element_start,
                        );
                    }
                    // The segment span includes its terminator.
                    segment_end = span.end;
                    break;
                }
                Token::DataElement { value, span } => {
                    // A data element token starts a new element, closing the previous one.
                    if in_element {
                        finish_element(
                            &mut elements,
                            &mut current_components,
                            &mut current_element_start,
                        );
                    }
                    let resolved = match resolve_release(value, self.release_char, span.start) {
                        Ok(v) => v,
                        Err(error) => return Some(Err(error)),
                    };
                    current_components.push((resolved, span));
                    current_element_start = Some(span.start);
                    in_element = true;
                }
                Token::ComponentElement { value, span } => {
                    // A component token extends the open element, or opens one if none is open.
                    if !in_element {
                        in_element = true;
                        current_element_start = Some(span.start);
                    }
                    let resolved = match resolve_release(value, self.release_char, span.start) {
                        Ok(v) => v,
                        Err(error) => return Some(Err(error)),
                    };
                    current_components.push((resolved, span));
                }
            }
        }

        Some(Ok(Segment {
            tag,
            span: Span::new(tag_span.start, segment_end),
            tag_span,
            elements,
        }))
    }
}
213
214pub fn from_reader<R: Read>(reader: R) -> Result<Vec<OwnedSegment>, EdifactError> {
220 from_reader_stream(reader).collect()
221}
222
223pub fn from_bufread<R: BufRead>(reader: R) -> Result<Vec<OwnedSegment>, EdifactError> {
225 from_bufread_stream(reader).collect()
226}
227
/// Limits applied by [`OwnedSegmentStream`] while reading from a byte source.
///
/// Start from [`ReaderConfig::default`] and chain the setters, for example
/// `ReaderConfig::default().max_segments(100)`.
#[derive(Debug, Clone, Copy)]
pub struct ReaderConfig {
    /// Maximum size in bytes of a single segment; larger segments are rejected with
    /// `EdifactError::SegmentTooLong`. Defaults to 65 536.
    pub max_segment_bytes: usize,
    /// Stop after this many segments have been yielded (`None` means unlimited).
    pub max_segments: Option<usize>,
    /// Stop once at least this many input bytes have been consumed (`None` means
    /// unlimited). The check happens between segments, so the segment that crosses
    /// the limit is still yielded.
    pub max_input_bytes: Option<u64>,
    /// Stop after this many complete messages (a `UNT` closing an open `UNH`) have
    /// been yielded (`None` means unlimited).
    pub max_messages: Option<usize>,
}

impl Default for ReaderConfig {
    /// 64 KiB per segment and no segment, input-size or message limits.
    fn default() -> Self {
        Self {
            max_segment_bytes: 64 * 1024,
            max_segments: None,
            max_input_bytes: None,
            max_messages: None,
        }
    }
}

impl ReaderConfig {
    /// Sets [`ReaderConfig::max_segment_bytes`].
    #[must_use]
    pub fn max_segment_bytes(self, limit: usize) -> Self {
        Self {
            max_segment_bytes: limit,
            ..self
        }
    }

    /// Limits the number of segments yielded.
    #[must_use]
    pub fn max_segments(self, limit: usize) -> Self {
        Self {
            max_segments: Some(limit),
            ..self
        }
    }

    /// Limits the number of input bytes consumed.
    #[must_use]
    pub fn max_input_bytes(self, limit: u64) -> Self {
        Self {
            max_input_bytes: Some(limit),
            ..self
        }
    }

    /// Limits the number of complete messages yielded.
    #[must_use]
    pub fn max_messages(self, limit: usize) -> Self {
        Self {
            max_messages: Some(limit),
            ..self
        }
    }
}
327
/// Lifecycle of an [`OwnedSegmentStream`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum StreamState {
    /// Nothing read yet. The next read goes through the byte-wise path so a leading
    /// `UNA` service string advice can be detected.
    Init,
    /// The header has been handled. Segments are read from the buffered fast path when
    /// a whole segment is available, otherwise byte by byte.
    Running,
    /// Terminal. Set after an error or when a configured limit is reached; `next`
    /// returns `None` from then on.
    Done,
}
338
/// Iterator over [`OwnedSegment`]s read incrementally from a [`BufRead`] source.
///
/// Created by `from_bufread_stream`, `from_reader_stream` and their `_with_config`
/// variants. Spans in yielded segments are absolute offsets into the stream.
pub struct OwnedSegmentStream<R: BufRead> {
    // Underlying buffered byte source.
    reader: R,
    // Delimiters currently in effect; replaced when a leading `UNA` header is read.
    ssa: crate::tokenizer::ServiceStringAdvice,
    // Current lifecycle state (see `StreamState`).
    state: StreamState,
    // Absolute offset of the next unread byte; added to parsed spans.
    stream_offset: u64,
    // Limits applied while iterating.
    config: ReaderConfig,
    // Number of segments successfully yielded so far (checked against `max_segments`).
    segments_yielded: usize,
    // Number of `UNT` segments that closed an open `UNH` (checked against `max_messages`).
    messages_yielded: usize,
    // True between a `UNH` and the matching `UNT`.
    in_message: bool,
    // Bytes consumed so far, kept equal to `stream_offset`; checked against
    // `max_input_bytes`.
    bytes_consumed: u64,
}
371
372impl<R: BufRead> OwnedSegmentStream<R> {
373 fn new(reader: R) -> Self {
374 Self::with_config(reader, ReaderConfig::default())
375 }
376
377 fn with_config(reader: R, config: ReaderConfig) -> Self {
378 Self {
379 reader,
380 ssa: crate::tokenizer::ServiceStringAdvice::default(),
381 state: StreamState::Init,
382 stream_offset: 0,
383 config,
384 segments_yielded: 0,
385 messages_yielded: 0,
386 in_message: false,
387 bytes_consumed: 0,
388 }
389 }
390}
391
/// Outcome of one attempt to parse a segment straight from the reader's buffer.
///
/// Byte counts include the segment terminator and are left for the caller to
/// `consume`.
enum FastSegment {
    /// A segment was parsed; the `usize` is the number of bytes it occupied.
    Parsed(OwnedSegment, usize),
    /// The bytes up to the terminator produced no segment (blank or empty); skip them.
    Skip(usize),
    /// The buffered bytes contain no unescaped terminator; fall back to the slow path.
    NeedMore,
    /// The reader has no more data.
    Eof,
    /// Reading or parsing failed.
    Err(EdifactError),
}
407
408fn find_unescaped_term(buf: &[u8], term: u8, release: u8) -> Option<usize> {
420 let mut i = 0;
421 while i < buf.len() {
422 let rel = memchr2(release, term, &buf[i..])?;
424 let pos = i + rel;
425 if buf[pos] == release {
426 i = pos + 2;
428 } else {
429 return Some(pos);
431 }
432 }
433 None
434}
435
436fn try_fast_segment<R: BufRead>(
441 reader: &mut R,
442 ssa: crate::tokenizer::ServiceStringAdvice,
443 seg_start: usize,
444 max_segment_bytes: usize,
445) -> FastSegment {
446 let buf = match reader.fill_buf() {
447 Ok(b) => b,
448 Err(e) => return FastSegment::Err(e.into()),
449 };
450
451 if buf.is_empty() {
452 return FastSegment::Eof;
453 }
454
455 let Some(pos) = find_unescaped_term(buf, ssa.segment_term, ssa.release_char) else {
456 return FastSegment::NeedMore;
457 };
458
459 if pos > max_segment_bytes {
462 return FastSegment::Err(EdifactError::SegmentTooLong {
463 offset: seg_start,
464 limit: max_segment_bytes,
465 });
466 }
467
468 let seg_bytes = &buf[..pos];
470
471 if seg_bytes
473 .iter()
474 .all(|&b| matches!(b, b' ' | b'\t' | b'\r' | b'\n'))
475 {
476 return FastSegment::Skip(pos + 1);
477 }
478
479 let tok = Tokenizer::with_limit(&buf[..pos + 1], ssa, max_segment_bytes);
486 let mut parser_iter = Parser::new(tok);
487 match parser_iter.next() {
488 None => FastSegment::Skip(pos + 1),
489 Some(Err(e)) => FastSegment::Err(e),
490 Some(Ok(s)) => FastSegment::Parsed(OwnedSegment::from(s).offset(seg_start), pos + 1),
491 }
492 }
494
impl<R: BufRead> Iterator for OwnedSegmentStream<R> {
    type Item = Result<OwnedSegment, EdifactError>;

    /// Yields the next segment, or `None` at end of input or once a limit is reached.
    ///
    /// While `Running`, a whole segment is parsed straight from the reader's buffer
    /// when possible. Otherwise (first call, or a segment straddling a buffer refill)
    /// it is read byte by byte. Any error moves the stream to `Done`.
    fn next(&mut self) -> Option<Self::Item> {
        if self.state == StreamState::Done {
            return None;
        }

        // Limits are checked before reading anything further.
        if let Some(max) = self.config.max_segments {
            if self.segments_yielded >= max {
                self.state = StreamState::Done;
                return None;
            }
        }

        if let Some(max) = self.config.max_input_bytes {
            if self.bytes_consumed >= max {
                self.state = StreamState::Done;
                return None;
            }
        }

        if let Some(max) = self.config.max_messages {
            if self.messages_yielded >= max {
                self.state = StreamState::Done;
                return None;
            }
        }

        loop {
            if self.state == StreamState::Running {
                // Fast path: parse directly from the buffered window.
                let seg_start = self.stream_offset;
                match try_fast_segment(
                    &mut self.reader,
                    self.ssa,
                    seg_start.min(usize::MAX as u64) as usize,
                    self.config.max_segment_bytes,
                ) {
                    FastSegment::Parsed(seg, n) => {
                        let n = n as u64;
                        self.reader.consume(n as usize);
                        self.stream_offset += n;
                        self.bytes_consumed = self.stream_offset;
                        self.segments_yielded += 1;
                        // Message accounting: only a UNT that closes an open UNH counts.
                        if seg.tag == "UNT" {
                            if self.in_message {
                                self.messages_yielded += 1;
                            }
                            self.in_message = false;
                        } else if seg.tag == "UNH" {
                            self.in_message = true;
                        }
                        // The segment that crosses the input limit is still returned.
                        if let Some(max) = self.config.max_input_bytes {
                            if self.bytes_consumed >= max {
                                self.state = StreamState::Done;
                            }
                        }
                        return Some(Ok(seg));
                    }
                    FastSegment::Skip(n) => {
                        let n = n as u64;
                        self.reader.consume(n as usize);
                        self.stream_offset += n;
                        self.bytes_consumed = self.stream_offset;
                        continue;
                    }
                    FastSegment::Eof => return None,
                    FastSegment::Err(e) => {
                        self.state = StreamState::Done;
                        return Some(Err(e));
                    }
                    FastSegment::NeedMore => {
                        // No complete segment buffered; fall through to the slow path.
                    }
                }
            }

            // Slow path: read one segment byte by byte. On the first call
            // (`Init`) this also detects and applies a leading UNA header.
            let mut scanned = self.state != StreamState::Init;
            let mut slow_offset: usize = self.stream_offset.min(usize::MAX as u64) as usize;
            let mut raw = match read_next_raw_segment(
                &mut self.reader,
                &mut self.ssa,
                &mut scanned,
                &mut slow_offset,
                self.config.max_segment_bytes,
            ) {
                Ok(Some(r)) => r,
                Ok(None) => return None,
                Err(e) => {
                    self.state = StreamState::Done;
                    return Some(Err(e));
                }
            };
            self.stream_offset = slow_offset as u64;
            if scanned {
                self.state = StreamState::Running;
            }
            self.bytes_consumed = self.stream_offset;

            // The raw bytes exclude the terminator; restore it so the tokenizer sees a
            // complete segment.
            raw.bytes.push(self.ssa.segment_term);
            let tok = Tokenizer::with_limit(
                raw.bytes.as_slice(),
                self.ssa,
                self.config.max_segment_bytes,
            );
            let mut parser_iter = Parser::new(tok);
            match parser_iter.next() {
                Some(Ok(s)) => {
                    self.segments_yielded += 1;
                    let seg = OwnedSegment::from(s).offset(raw.start_offset);
                    if seg.tag == "UNT" {
                        if self.in_message {
                            self.messages_yielded += 1;
                        }
                        self.in_message = false;
                    } else if seg.tag == "UNH" {
                        self.in_message = true;
                    }
                    return Some(Ok(seg));
                }
                Some(Err(e)) => {
                    self.state = StreamState::Done;
                    return Some(Err(e));
                }
                // Nothing parsed from these bytes (e.g. an empty segment); keep reading.
                None => {}
            }
        }
    }
}
648
649pub fn from_bufread_stream<R: BufRead>(reader: R) -> OwnedSegmentStream<R> {
651 OwnedSegmentStream::new(reader)
652}
653
654pub fn from_bufread_stream_with_config<R: BufRead>(
656 reader: R,
657 config: ReaderConfig,
658) -> OwnedSegmentStream<R> {
659 OwnedSegmentStream::with_config(reader, config)
660}
661
662pub fn from_reader_stream<R: Read>(reader: R) -> OwnedSegmentStream<BufReader<R>> {
664 from_bufread_stream(BufReader::new(reader))
665}
666
667pub fn from_reader_with_config<R: Read>(
680 reader: R,
681 config: ReaderConfig,
682) -> OwnedSegmentStream<BufReader<R>> {
683 from_bufread_stream_with_config(BufReader::new(reader), config)
684}
685
686fn read_next_raw_segment<R: BufRead>(
687 reader: &mut R,
688 ssa: &mut crate::tokenizer::ServiceStringAdvice,
689 scanned_header: &mut bool,
690 stream_offset: &mut usize,
691 max_segment_bytes: usize,
692) -> Result<Option<crate::tokenizer::RawSegment>, EdifactError> {
693 loop {
694 let Some((first_offset, first)) = read_next_non_ws_byte(reader, stream_offset)? else {
695 return Ok(None);
696 };
697
698 if !*scanned_header && first == b'U' {
699 let second = read_required_byte(reader, stream_offset)?;
700 let third = read_required_byte(reader, stream_offset)?;
701 if second == b'N' && third == b'A' {
702 let mut una = [0u8; 9];
703 una[0] = b'U';
704 una[1] = b'N';
705 una[2] = b'A';
706 for slot in una.iter_mut().skip(3) {
707 *slot = read_required_byte(reader, stream_offset)?;
708 }
709 *ssa = crate::tokenizer::ServiceStringAdvice {
710 component_sep: una[3],
711 element_sep: una[4],
712 decimal_mark: una[5],
713 release_char: una[6],
714 segment_term: una[8],
715 };
716 if !ssa.is_valid() {
717 return Err(EdifactError::InvalidUna);
718 }
719 *scanned_header = true;
720 continue;
721 }
722
723 *scanned_header = true;
724 return read_remainder_of_segment(
725 reader,
726 ssa,
727 crate::tokenizer::RawSegment {
728 bytes: vec![first, second, third],
729 start_offset: first_offset,
730 },
731 stream_offset,
732 max_segment_bytes,
733 );
734 }
735
736 *scanned_header = true;
737 return read_remainder_of_segment(
738 reader,
739 ssa,
740 crate::tokenizer::RawSegment {
741 bytes: vec![first],
742 start_offset: first_offset,
743 },
744 stream_offset,
745 max_segment_bytes,
746 );
747 }
748}
749
750fn read_remainder_of_segment<R: BufRead>(
751 reader: &mut R,
752 ssa: &crate::tokenizer::ServiceStringAdvice,
753 mut out: crate::tokenizer::RawSegment,
754 stream_offset: &mut usize,
755 max_segment_bytes: usize,
756) -> Result<Option<crate::tokenizer::RawSegment>, EdifactError> {
757 let mut escaped = false;
758 loop {
759 if out.bytes.len() >= max_segment_bytes {
760 return Err(EdifactError::SegmentTooLong {
761 offset: out.start_offset,
762 limit: max_segment_bytes,
763 });
764 }
765 let Some(byte) = read_next_byte(reader, stream_offset)? else {
766 return if out.bytes.is_empty() {
767 Ok(None)
768 } else if escaped {
769 Err(EdifactError::InvalidReleaseSequence {
770 offset: out.start_offset + out.bytes.len().saturating_sub(1),
771 })
772 } else {
773 Err(EdifactError::UnexpectedEof {
774 offset: out.start_offset + out.bytes.len(),
775 })
776 };
777 };
778
779 if !escaped && byte == ssa.segment_term {
780 return Ok(Some(out));
781 }
782
783 if !escaped && byte == ssa.release_char {
784 escaped = true;
785 out.bytes.push(byte);
786 continue;
787 }
788
789 escaped = false;
790 out.bytes.push(byte);
791 }
792}
793
794fn read_next_byte<R: BufRead>(
795 reader: &mut R,
796 stream_offset: &mut usize,
797) -> Result<Option<u8>, EdifactError> {
798 let buf = reader.fill_buf()?;
799 if buf.is_empty() {
800 return Ok(None);
801 }
802
803 let byte = buf[0];
804 reader.consume(1);
805 let next_offset = stream_offset.saturating_add(1);
810 *stream_offset = next_offset;
811 Ok(Some(byte))
812}
813
814fn read_required_byte<R: BufRead>(
815 reader: &mut R,
816 stream_offset: &mut usize,
817) -> Result<u8, EdifactError> {
818 read_next_byte(reader, stream_offset)?.ok_or(EdifactError::UnexpectedEof {
819 offset: *stream_offset,
820 })
821}
822
823fn read_next_non_ws_byte<R: BufRead>(
824 reader: &mut R,
825 stream_offset: &mut usize,
826) -> Result<Option<(usize, u8)>, EdifactError> {
827 loop {
828 let current_offset = *stream_offset;
829 let Some(byte) = read_next_byte(reader, stream_offset)? else {
830 return Ok(None);
831 };
832 if !matches!(byte, b' ' | b'\t' | b'\r' | b'\n') {
833 return Ok(Some((current_offset, byte)));
834 }
835 }
836}
837
// Unit tests for the slice-based `Parser` and the streaming reader paths.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::tokenizer::ServiceStringAdvice;

    // Parses an in-memory buffer with the slice-based tokenizer and parser, deriving
    // the service string advice from `input` via `from_bytes_unchecked`. Panics on the
    // first parse error.
    fn parse_all(input: &[u8]) -> Vec<Segment<'_>> {
        let ssa = ServiceStringAdvice::from_bytes_unchecked(input);
        let tok = Tokenizer::new(input, ssa);
        Parser::new(tok)
            .collect::<Result<Vec<_>, _>>()
            .expect("parse failed")
    }

    // Two segments; the segment span runs from the tag through its terminator.
    #[test]
    fn parses_unb_unz() {
        let input = b"UNB+UNOA:1+SENDER+RECEIVER+200101:0900+1'UNZ+0+1'";
        let segs = parse_all(input);
        assert_eq!(segs.len(), 2);
        assert_eq!(segs[0].tag, "UNB");
        assert_eq!(segs[1].tag, "UNZ");
        assert_eq!(segs[0].tag_span, Span::new(0, 3));
        assert_eq!(segs[0].span, Span::new(0, 41));
    }

    // Elements are indexed from 0, starting after the tag.
    #[test]
    fn element_access() {
        let input = b"BGM+220+ORDER123+9'";
        let segs = parse_all(input);
        assert_eq!(segs[0].element_str(0), Some("220"));
        assert_eq!(segs[0].element_str(1), Some("ORDER123"));
    }

    // Components of a composite element are split on the component separator.
    #[test]
    fn component_access() {
        let input = b"DTM+137:20200101:102'";
        let segs = parse_all(input);
        let dtm = &segs[0];
        assert_eq!(dtm.get_element(0).unwrap().get_component(0), Some("137"));
        assert_eq!(
            dtm.get_element(0).unwrap().get_component(1),
            Some("20200101")
        );
        assert_eq!(dtm.get_element(0).unwrap().get_component(2), Some("102"));
    }

    // The value is unescaped, but the component span still covers the raw bytes,
    // including the release character.
    #[test]
    fn release_char_resolved() {
        let input = b"FTX+AAA++test?+value'";
        let segs = parse_all(input);
        assert_eq!(segs[0].element_str(2), Some("test+value"));
        assert_eq!(
            segs[0].get_element(2).unwrap().component_span(0),
            Some(Span::new(9, 20))
        );
    }

    // A UNA header read through the streaming path changes the delimiters used for
    // the rest of the input (here `;` as the element separator).
    #[test]
    fn reader_path_preserves_custom_una_delimiters() {
        let input = b"UNA:;.? 'BGM;220;test?;value'";
        let segments = super::from_bufread(std::io::BufReader::new(std::io::Cursor::new(input)))
            .expect("reader parse should succeed");
        let bgm = segments
            .iter()
            .find(|segment| segment.tag == "BGM")
            .expect("BGM segment should be present");
        assert_eq!(bgm.elements[0].components[0].0, "220");
        assert_eq!(bgm.elements[1].components[0].0, "test;value");
    }

    // Smoke test: malformed input must not panic (the result is ignored).
    #[test]
    fn arbitrary_bytes_no_panic() {
        let garbage: &[u8] = b"\xff\x00\x01\x02ABC+++'''???";
        let _ = crate::from_bytes(garbage).collect::<Vec<_>>();
    }

    // A 5-byte buffer forces segments to straddle `fill_buf` refills, which should
    // exercise the byte-wise fallback path.
    #[test]
    fn from_reader_handles_chunk_boundaries() {
        let input = b"UNA:+.? 'BGM+220+test?+value'UNT+2+1'";
        let reader = std::io::BufReader::with_capacity(5, std::io::Cursor::new(input));
        let parsed = from_bufread(reader).expect("reader parsing should succeed");
        assert_eq!(parsed.len(), 2);
        assert_eq!(parsed[0].tag, "BGM");
        assert_eq!(parsed[0].elements[1].components[0].0, "test+value");
        assert_eq!(parsed[1].tag, "UNT");
    }

    // Without UNA the default delimiters apply; reader-path spans are absolute
    // stream offsets.
    #[test]
    fn from_reader_without_una_uses_default_delimiters() {
        let input = b"BGM+220+X'UNT+2+1'";
        let parsed =
            from_reader(std::io::Cursor::new(input)).expect("reader parsing should succeed");
        assert_eq!(parsed.len(), 2);
        assert_eq!(parsed[0].tag, "BGM");
        assert_eq!(parsed[0].elements[0].components[0].0, "220");
        assert_eq!(parsed[1].span, Span::new(10, 18));
    }

    // A release character at end of input has nothing to escape.
    #[test]
    fn dangling_release_sequence_is_error() {
        let input = b"FTX+AAA++dangling?";
        let err = crate::from_bytes(input)
            .collect::<Result<Vec<_>, _>>()
            .expect_err("expected dangling release to fail");

        assert!(matches!(err, EdifactError::InvalidReleaseSequence { .. }));
    }

    // Same as above, through the streaming reader path.
    #[test]
    fn from_reader_reports_dangling_release_sequence() {
        let input = b"FTX+AAA++dangling?";
        let err = from_reader(std::io::Cursor::new(input))
            .expect_err("expected dangling release from reader path");
        assert!(matches!(err, EdifactError::InvalidReleaseSequence { .. }));
    }

    // A UNA whose component and element separators are both `:` is rejected.
    #[test]
    fn from_reader_rejects_invalid_una() {
        let input = b"UNA::.? 'BGM:220'";
        let err = from_reader(std::io::Cursor::new(input))
            .expect_err("invalid UNA should fail reader parsing");
        assert!(matches!(err, EdifactError::InvalidUna));
    }
}