1use crate::{
4 error::EdifactError,
5 model::{Element, OwnedSegment, Segment, Span},
6 tokenizer::{Token, Tokenizer},
7};
8use memchr::memchr2;
9use smallvec::SmallVec;
10use std::borrow::Cow;
11use std::io::{BufRead, BufReader, Read};
12
13fn finish_element<'a>(
14 elements: &mut Vec<Element<'a>>,
15 current_components: &mut SmallVec<[(Cow<'a, str>, Span); 4]>,
16 current_element_start: &mut Option<usize>,
17) {
18 if let (Some(start), Some((_, last_span))) =
19 (current_element_start.take(), current_components.last())
20 {
21 let last_end = last_span.end;
22 elements.push(Element {
23 span: Span::new(start, last_end),
24 components: std::mem::take(current_components),
25 });
26 }
27}
28
29fn resolve_release(
30 val: &str,
31 release_char: char,
32 start_offset: usize,
33) -> Result<Cow<'_, str>, EdifactError> {
34 if !val.contains(release_char) {
35 return Ok(Cow::Borrowed(val));
36 }
37 resolve_release_owned(val, release_char, start_offset).map(Cow::Owned)
38}
39
40fn resolve_release_owned(
41 val: &str,
42 release_char: char,
43 start_offset: usize,
44) -> Result<String, EdifactError> {
45 let mut out = String::with_capacity(val.len());
46 let mut chars = val.chars();
47 while let Some(c) = chars.next() {
48 if c == release_char {
49 if let Some(escaped) = chars.next() {
50 out.push(escaped);
51 } else {
52 return Err(EdifactError::InvalidReleaseSequence {
53 offset: start_offset + val.len().saturating_sub(1),
54 });
55 }
56 } else {
57 out.push(c);
58 }
59 }
60 Ok(out)
61}
62
63pub struct Parser<'a> {
68 tokenizer: Tokenizer<'a>,
69 peeked: Option<Token<'a>>,
71 release_char: char,
73}
74
75impl<'a> Parser<'a> {
76 pub fn new(tokenizer: Tokenizer<'a>) -> Self {
78 let release_char = tokenizer.service_string_advice().release_char as char;
79 Self {
80 tokenizer,
81 peeked: None,
82 release_char,
83 }
84 }
85}
86
87impl<'a> Iterator for Parser<'a> {
88 type Item = Result<Segment<'a>, EdifactError>;
89
90 fn next(&mut self) -> Option<Self::Item> {
91 let (tag, tag_span) = loop {
93 let tok = match self.peeked.take() {
94 Some(t) => Ok(t),
95 None => self.tokenizer.next()?,
96 };
97 match tok {
98 Ok(Token::SegmentTag { value, span }) => break (value, span),
99 Ok(Token::SegmentTerminator { .. }) => continue, Ok(Token::DataElement { span, .. }) | Ok(Token::ComponentElement { span, .. }) => {
101 return Some(Err(EdifactError::UnexpectedDataToken {
102 offset: span.start,
103 }));
104 }
105 Err(e) => return Some(Err(e)),
106 }
107 };
108
109 let mut elements: Vec<Element<'a>> = Vec::with_capacity(8);
110 let mut current_components: SmallVec<[(Cow<'a, str>, Span); 4]> = SmallVec::new();
111 let mut current_element_start: Option<usize> = None;
112 let mut in_element = false;
113 let mut segment_end = tag_span.end;
114
115 loop {
116 let tok = match self.tokenizer.next() {
117 Some(Ok(t)) => t,
118 Some(Err(e)) => return Some(Err(e)),
119 None => {
120 if in_element {
122 finish_element(
123 &mut elements,
124 &mut current_components,
125 &mut current_element_start,
126 );
127 if let Some(last) = elements.last() {
128 segment_end = last.span.end;
129 }
130 }
131 break;
132 }
133 };
134
135 match tok {
136 Token::SegmentTag {
137 value: next_tag,
138 span,
139 } => {
140 self.peeked = Some(Token::SegmentTag {
142 value: next_tag,
143 span,
144 });
145 if in_element {
146 finish_element(
147 &mut elements,
148 &mut current_components,
149 &mut current_element_start,
150 );
151 if let Some(last) = elements.last() {
152 segment_end = last.span.end;
153 }
154 }
155 break;
156 }
157 Token::SegmentTerminator { span } => {
158 if in_element {
159 finish_element(
160 &mut elements,
161 &mut current_components,
162 &mut current_element_start,
163 );
164 }
165 segment_end = span.end;
166 break;
167 }
168 Token::DataElement { value, span } => {
169 if in_element {
170 finish_element(
171 &mut elements,
172 &mut current_components,
173 &mut current_element_start,
174 );
175 }
176 let resolved = match resolve_release(value, self.release_char, span.start) {
177 Ok(v) => v,
178 Err(error) => return Some(Err(error)),
179 };
180 current_components.push((resolved, span));
181 current_element_start = Some(span.start);
182 in_element = true;
183 }
184 Token::ComponentElement { value, span } => {
185 if !in_element {
186 in_element = true;
188 current_element_start = Some(span.start);
189 }
190 let resolved = match resolve_release(value, self.release_char, span.start) {
191 Ok(v) => v,
192 Err(error) => return Some(Err(error)),
193 };
194 current_components.push((resolved, span));
195 }
196 }
197 }
198
199 Some(Ok(Segment {
200 tag,
201 span: Span::new(tag_span.start, segment_end),
202 tag_span,
203 elements,
204 }))
205 }
206}
207
208pub fn from_reader<R: Read>(reader: R) -> Result<Vec<OwnedSegment>, EdifactError> {
214 from_reader_stream(reader).collect()
215}
216
217pub fn from_bufread<R: BufRead>(reader: R) -> Result<Vec<OwnedSegment>, EdifactError> {
219 from_bufread_stream(reader).collect()
220}
221
/// Limits applied by the streaming reader.
///
/// Build one with [`ReaderConfig::default`] and the chainable setters below.
#[derive(Debug, Clone, Copy)]
pub struct ReaderConfig {
    /// Size limit for a single segment; exceeding it is reported as
    /// `EdifactError::SegmentTooLong`.
    pub max_segment_bytes: usize,
    /// When set, the stream ends once this many segments have been yielded.
    pub max_segments: Option<usize>,
    /// When set, the stream ends once at least this many input bytes have been
    /// consumed.
    pub max_input_bytes: Option<u64>,
    /// When set, the stream ends once this many `UNH`…`UNT` messages have been
    /// completed.
    pub max_messages: Option<usize>,
}

impl Default for ReaderConfig {
    /// A 64 KiB per-segment limit and no other limits.
    fn default() -> Self {
        Self {
            max_segment_bytes: 64 * 1024,
            max_segments: None,
            max_input_bytes: None,
            max_messages: None,
        }
    }
}

impl ReaderConfig {
    /// Returns the configuration with the per-segment byte limit set to `limit`.
    #[must_use]
    pub fn max_segment_bytes(self, limit: usize) -> Self {
        Self {
            max_segment_bytes: limit,
            ..self
        }
    }

    /// Returns the configuration with the segment-count limit set to `limit`.
    #[must_use]
    pub fn max_segments(self, limit: usize) -> Self {
        Self {
            max_segments: Some(limit),
            ..self
        }
    }

    /// Returns the configuration with the input-byte limit set to `limit`.
    #[must_use]
    pub fn max_input_bytes(self, limit: u64) -> Self {
        Self {
            max_input_bytes: Some(limit),
            ..self
        }
    }

    /// Returns the configuration with the message-count limit set to `limit`.
    #[must_use]
    pub fn max_messages(self, limit: usize) -> Self {
        Self {
            max_messages: Some(limit),
            ..self
        }
    }
}
321
/// Lifecycle of an [`OwnedSegmentStream`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum StreamState {
    /// Nothing has been read yet: the byte-wise path is used and may still
    /// recognise a leading `UNA` header.
    Init,
    /// The header has been scanned, so the buffered fast path is tried first.
    Running,
    /// A limit was reached or an error was returned; `next` yields `None`.
    Done,
}
332
333pub struct OwnedSegmentStream<R: BufRead> {
351 reader: R,
352 ssa: crate::tokenizer::ServiceStringAdvice,
353 state: StreamState,
354 stream_offset: u64,
355 config: ReaderConfig,
356 segments_yielded: usize,
358 messages_yielded: usize,
360 in_message: bool,
362 bytes_consumed: u64,
364}
365
366impl<R: BufRead> OwnedSegmentStream<R> {
367 fn new(reader: R) -> Self {
368 Self::with_config(reader, ReaderConfig::default())
369 }
370
371 fn with_config(reader: R, config: ReaderConfig) -> Self {
372 Self {
373 reader,
374 ssa: crate::tokenizer::ServiceStringAdvice::default(),
375 state: StreamState::Init,
376 stream_offset: 0,
377 config,
378 segments_yielded: 0,
379 messages_yielded: 0,
380 in_message: false,
381 bytes_consumed: 0,
382 }
383 }
384}
385
/// Outcome of one attempt to parse a segment straight out of the reader's
/// buffer (see `try_fast_segment`).
enum FastSegment {
    /// A segment was parsed; the second field is the number of buffered bytes
    /// it occupied (terminator included) and must be consumed by the caller.
    Parsed(OwnedSegment, usize),
    /// Nothing to yield (blank content or no segment produced); consume this
    /// many bytes and try again.
    Skip(usize),
    /// The buffer holds no unescaped terminator; the caller must fall back to
    /// the byte-wise path.
    NeedMore,
    /// The reader has no more data.
    Eof,
    /// Reading or parsing failed.
    Err(EdifactError),
}
401
402fn find_unescaped_term(buf: &[u8], term: u8, release: u8) -> Option<usize> {
414 let mut i = 0;
415 while i < buf.len() {
416 let rel = memchr2(release, term, &buf[i..])?;
418 let pos = i + rel;
419 if buf[pos] == release {
420 i = pos + 2;
422 } else {
423 return Some(pos);
425 }
426 }
427 None
428}
429
430fn try_fast_segment<R: BufRead>(
435 reader: &mut R,
436 ssa: crate::tokenizer::ServiceStringAdvice,
437 seg_start: usize,
438 max_segment_bytes: usize,
439) -> FastSegment {
440 let buf = match reader.fill_buf() {
441 Ok(b) => b,
442 Err(e) => return FastSegment::Err(e.into()),
443 };
444
445 if buf.is_empty() {
446 return FastSegment::Eof;
447 }
448
449 let Some(pos) = find_unescaped_term(buf, ssa.segment_term, ssa.release_char) else {
450 return FastSegment::NeedMore;
451 };
452
453 if pos > max_segment_bytes {
456 return FastSegment::Err(EdifactError::SegmentTooLong {
457 offset: seg_start,
458 limit: max_segment_bytes,
459 });
460 }
461
462 let seg_bytes = &buf[..pos];
464
465 if seg_bytes
467 .iter()
468 .all(|&b| matches!(b, b' ' | b'\t' | b'\r' | b'\n'))
469 {
470 return FastSegment::Skip(pos + 1);
471 }
472
473 let tok = Tokenizer::with_limit(&buf[..pos + 1], ssa, max_segment_bytes);
480 let mut parser_iter = Parser::new(tok);
481 match parser_iter.next() {
482 None => FastSegment::Skip(pos + 1),
483 Some(Err(e)) => FastSegment::Err(e),
484 Some(Ok(s)) => FastSegment::Parsed(OwnedSegment::from(s).offset(seg_start), pos + 1),
485 }
486 }
488
489impl<R: BufRead> Iterator for OwnedSegmentStream<R> {
492 type Item = Result<OwnedSegment, EdifactError>;
493
494 fn next(&mut self) -> Option<Self::Item> {
495 if self.state == StreamState::Done {
496 return None;
497 }
498
499 if let Some(max) = self.config.max_segments {
501 if self.segments_yielded >= max {
502 self.state = StreamState::Done;
503 return None;
504 }
505 }
506
507 if let Some(max) = self.config.max_input_bytes {
509 if self.bytes_consumed >= max {
510 self.state = StreamState::Done;
511 return None;
512 }
513 }
514
515 if let Some(max) = self.config.max_messages {
517 if self.messages_yielded >= max {
518 self.state = StreamState::Done;
519 return None;
520 }
521 }
522
523 loop {
524 if self.state == StreamState::Running {
526 let seg_start = self.stream_offset;
527 match try_fast_segment(
528 &mut self.reader,
529 self.ssa,
530 seg_start.min(usize::MAX as u64) as usize,
534 self.config.max_segment_bytes,
535 ) {
536 FastSegment::Parsed(seg, n) => {
537 let n = n as u64;
538 self.reader.consume(n as usize);
539 self.stream_offset += n;
540 self.bytes_consumed = self.stream_offset;
541 self.segments_yielded += 1;
542 if seg.tag == "UNT" {
547 if self.in_message {
548 self.messages_yielded += 1;
549 }
550 self.in_message = false;
551 } else if seg.tag == "UNH" {
552 self.in_message = true;
553 }
554 if let Some(max) = self.config.max_input_bytes {
558 if self.bytes_consumed >= max {
559 self.state = StreamState::Done;
560 }
561 }
562 return Some(Ok(seg));
563 }
564 FastSegment::Skip(n) => {
565 let n = n as u64;
566 self.reader.consume(n as usize);
567 self.stream_offset += n;
568 self.bytes_consumed = self.stream_offset;
569 continue;
570 }
571 FastSegment::Eof => return None,
572 FastSegment::Err(e) => {
573 self.state = StreamState::Done;
574 return Some(Err(e));
575 }
576 FastSegment::NeedMore => {
577 }
579 }
580 }
581
582 let mut scanned = self.state != StreamState::Init;
584 let mut slow_offset: usize = self.stream_offset.min(usize::MAX as u64) as usize;
589 let mut raw = match read_next_raw_segment(
590 &mut self.reader,
591 &mut self.ssa,
592 &mut scanned,
593 &mut slow_offset,
594 self.config.max_segment_bytes,
595 ) {
596 Ok(Some(r)) => r,
597 Ok(None) => return None,
598 Err(e) => {
599 self.state = StreamState::Done;
600 return Some(Err(e));
601 }
602 };
603 self.stream_offset = slow_offset as u64;
604 if scanned {
605 self.state = StreamState::Running;
606 }
607 self.bytes_consumed = self.stream_offset;
608
609 raw.bytes.push(self.ssa.segment_term);
610 let tok = Tokenizer::with_limit(
614 raw.bytes.as_slice(),
615 self.ssa,
616 self.config.max_segment_bytes,
617 );
618 let mut parser_iter = Parser::new(tok);
619 match parser_iter.next() {
620 Some(Ok(s)) => {
621 self.segments_yielded += 1;
622 let seg = OwnedSegment::from(s).offset(raw.start_offset);
623 if seg.tag == "UNT" {
624 if self.in_message {
625 self.messages_yielded += 1;
626 }
627 self.in_message = false;
628 } else if seg.tag == "UNH" {
629 self.in_message = true;
630 }
631 return Some(Ok(seg));
632 }
633 Some(Err(e)) => {
634 self.state = StreamState::Done;
635 return Some(Err(e));
636 }
637 None => {} }
639 }
640 }
641}
642
643pub fn from_bufread_stream<R: BufRead>(reader: R) -> OwnedSegmentStream<R> {
645 OwnedSegmentStream::new(reader)
646}
647
648pub fn from_bufread_stream_with_config<R: BufRead>(
650 reader: R,
651 config: ReaderConfig,
652) -> OwnedSegmentStream<R> {
653 OwnedSegmentStream::with_config(reader, config)
654}
655
656pub fn from_reader_stream<R: Read>(reader: R) -> OwnedSegmentStream<BufReader<R>> {
658 from_bufread_stream(BufReader::new(reader))
659}
660
661pub fn from_reader_with_config<R: Read>(
674 reader: R,
675 config: ReaderConfig,
676) -> OwnedSegmentStream<BufReader<R>> {
677 from_bufread_stream_with_config(BufReader::new(reader), config)
678}
679
680fn read_next_raw_segment<R: BufRead>(
681 reader: &mut R,
682 ssa: &mut crate::tokenizer::ServiceStringAdvice,
683 scanned_header: &mut bool,
684 stream_offset: &mut usize,
685 max_segment_bytes: usize,
686) -> Result<Option<crate::tokenizer::RawSegment>, EdifactError> {
687 loop {
688 let Some((first_offset, first)) = read_next_non_ws_byte(reader, stream_offset)? else {
689 return Ok(None);
690 };
691
692 if !*scanned_header && first == b'U' {
693 let second = read_required_byte(reader, stream_offset)?;
694 let third = read_required_byte(reader, stream_offset)?;
695 if second == b'N' && third == b'A' {
696 let mut una = [0u8; 9];
697 una[0] = b'U';
698 una[1] = b'N';
699 una[2] = b'A';
700 for slot in una.iter_mut().skip(3) {
701 *slot = read_required_byte(reader, stream_offset)?;
702 }
703 *ssa = crate::tokenizer::ServiceStringAdvice {
704 component_sep: una[3],
705 element_sep: una[4],
706 decimal_mark: una[5],
707 release_char: una[6],
708 segment_term: una[8],
709 };
710 if !ssa.is_valid() {
711 return Err(EdifactError::InvalidUna);
712 }
713 *scanned_header = true;
714 continue;
715 }
716
717 *scanned_header = true;
718 return read_remainder_of_segment(
719 reader,
720 ssa,
721 crate::tokenizer::RawSegment {
722 bytes: vec![first, second, third],
723 start_offset: first_offset,
724 },
725 stream_offset,
726 max_segment_bytes,
727 );
728 }
729
730 *scanned_header = true;
731 return read_remainder_of_segment(
732 reader,
733 ssa,
734 crate::tokenizer::RawSegment {
735 bytes: vec![first],
736 start_offset: first_offset,
737 },
738 stream_offset,
739 max_segment_bytes,
740 );
741 }
742}
743
744fn read_remainder_of_segment<R: BufRead>(
745 reader: &mut R,
746 ssa: &crate::tokenizer::ServiceStringAdvice,
747 mut out: crate::tokenizer::RawSegment,
748 stream_offset: &mut usize,
749 max_segment_bytes: usize,
750) -> Result<Option<crate::tokenizer::RawSegment>, EdifactError> {
751 let mut escaped = false;
752 loop {
753 if out.bytes.len() >= max_segment_bytes {
754 return Err(EdifactError::SegmentTooLong {
755 offset: out.start_offset,
756 limit: max_segment_bytes,
757 });
758 }
759 let Some(byte) = read_next_byte(reader, stream_offset)? else {
760 return if out.bytes.is_empty() {
761 Ok(None)
762 } else if escaped {
763 Err(EdifactError::InvalidReleaseSequence {
764 offset: out.start_offset + out.bytes.len().saturating_sub(1),
765 })
766 } else {
767 Err(EdifactError::UnexpectedEof {
768 offset: out.start_offset + out.bytes.len(),
769 })
770 };
771 };
772
773 if !escaped && byte == ssa.segment_term {
774 return Ok(Some(out));
775 }
776
777 if !escaped && byte == ssa.release_char {
778 escaped = true;
779 out.bytes.push(byte);
780 continue;
781 }
782
783 escaped = false;
784 out.bytes.push(byte);
785 }
786}
787
788fn read_next_byte<R: BufRead>(
789 reader: &mut R,
790 stream_offset: &mut usize,
791) -> Result<Option<u8>, EdifactError> {
792 let buf = reader.fill_buf()?;
793 if buf.is_empty() {
794 return Ok(None);
795 }
796
797 let byte = buf[0];
798 reader.consume(1);
799 let next_offset = stream_offset.saturating_add(1);
804 *stream_offset = next_offset;
805 Ok(Some(byte))
806}
807
808fn read_required_byte<R: BufRead>(
809 reader: &mut R,
810 stream_offset: &mut usize,
811) -> Result<u8, EdifactError> {
812 read_next_byte(reader, stream_offset)?.ok_or(EdifactError::UnexpectedEof {
813 offset: *stream_offset,
814 })
815}
816
817fn read_next_non_ws_byte<R: BufRead>(
818 reader: &mut R,
819 stream_offset: &mut usize,
820) -> Result<Option<(usize, u8)>, EdifactError> {
821 loop {
822 let current_offset = *stream_offset;
823 let Some(byte) = read_next_byte(reader, stream_offset)? else {
824 return Ok(None);
825 };
826 if !matches!(byte, b' ' | b'\t' | b'\r' | b'\n') {
827 return Ok(Some((current_offset, byte)));
828 }
829 }
830}
831
#[cfg(test)]
mod tests {
    use super::*;
    use crate::tokenizer::ServiceStringAdvice;
    use std::io::Cursor;

    /// Parses `input` in memory and panics on the first error.
    fn parse_all(input: &[u8]) -> Vec<Segment<'_>> {
        let tokenizer = Tokenizer::new(input, ServiceStringAdvice::from_bytes(input));
        let parsed: Result<Vec<_>, _> = Parser::new(tokenizer).collect();
        parsed.expect("parse failed")
    }

    #[test]
    fn parses_unb_unz() {
        let segs = parse_all(b"UNB+UNOA:1+SENDER+RECEIVER+200101:0900+1'UNZ+0+1'");
        assert_eq!(segs.len(), 2);

        let (unb, unz) = (&segs[0], &segs[1]);
        assert_eq!(unb.tag, "UNB");
        assert_eq!(unz.tag, "UNZ");
        assert_eq!(unb.tag_span, Span::new(0, 3));
        // The segment span includes the terminator.
        assert_eq!(unb.span, Span::new(0, 41));
    }

    #[test]
    fn element_access() {
        let segs = parse_all(b"BGM+220+ORDER123+9'");
        let bgm = &segs[0];
        assert_eq!(bgm.element_str(0), Some("220"));
        assert_eq!(bgm.element_str(1), Some("ORDER123"));
    }

    #[test]
    fn component_access() {
        let segs = parse_all(b"DTM+137:20200101:102'");
        let date = segs[0].get_element(0).unwrap();
        assert_eq!(date.get_component(0), Some("137"));
        assert_eq!(date.get_component(1), Some("20200101"));
        assert_eq!(date.get_component(2), Some("102"));
    }

    #[test]
    fn release_char_resolved() {
        let segs = parse_all(b"FTX+AAA++test?+value'");
        let ftx = &segs[0];
        assert_eq!(ftx.element_str(2), Some("test+value"));
        // The span still covers the raw, escaped bytes.
        let text = ftx.get_element(2).unwrap();
        assert_eq!(text.component_span(0), Some(Span::new(9, 20)));
    }

    #[test]
    fn reader_path_preserves_custom_una_delimiters() {
        let input = b"UNA:;.? 'BGM;220;test?;value'";
        let reader = std::io::BufReader::new(Cursor::new(input));
        let segments = super::from_bufread(reader).expect("reader parse should succeed");

        let bgm = segments
            .iter()
            .find(|segment| segment.tag == "BGM")
            .expect("BGM segment should be present");
        assert_eq!(bgm.elements[0].components[0].0, "220");
        assert_eq!(bgm.elements[1].components[0].0, "test;value");
    }

    #[test]
    fn arbitrary_bytes_no_panic() {
        let garbage: &[u8] = b"\xff\x00\x01\x02ABC+++'''???";
        // Only the absence of a panic matters; the result is discarded.
        let _ = crate::from_bytes(garbage).collect::<Vec<_>>();
    }

    #[test]
    fn from_reader_handles_chunk_boundaries() {
        let input = b"UNA:+.? 'BGM+220+test?+value'UNT+2+1'";
        // A 5-byte buffer forces segments to straddle refills.
        let reader = std::io::BufReader::with_capacity(5, Cursor::new(input));
        let parsed = from_bufread(reader).expect("reader parsing should succeed");

        assert_eq!(parsed.len(), 2);
        let (bgm, unt) = (&parsed[0], &parsed[1]);
        assert_eq!(bgm.tag, "BGM");
        assert_eq!(bgm.elements[1].components[0].0, "test+value");
        assert_eq!(unt.tag, "UNT");
    }

    #[test]
    fn from_reader_without_una_uses_default_delimiters() {
        let input = b"BGM+220+X'UNT+2+1'";
        let parsed = from_reader(Cursor::new(input)).expect("reader parsing should succeed");

        assert_eq!(parsed.len(), 2);
        assert_eq!(parsed[0].tag, "BGM");
        assert_eq!(parsed[0].elements[0].components[0].0, "220");
        assert_eq!(parsed[1].span, Span::new(10, 18));
    }

    #[test]
    fn dangling_release_sequence_is_error() {
        let input = b"FTX+AAA++dangling?";
        let outcome: Result<Vec<_>, _> = crate::from_bytes(input).collect();
        let err = outcome.expect_err("expected dangling release to fail");

        assert!(matches!(err, EdifactError::InvalidReleaseSequence { .. }));
    }

    #[test]
    fn from_reader_reports_dangling_release_sequence() {
        let input = b"FTX+AAA++dangling?";
        let err = from_reader(Cursor::new(input))
            .expect_err("expected dangling release from reader path");

        assert!(matches!(err, EdifactError::InvalidReleaseSequence { .. }));
    }

    #[test]
    fn from_reader_rejects_invalid_una() {
        let input = b"UNA::.? 'BGM:220'";
        let err = from_reader(Cursor::new(input))
            .expect_err("invalid UNA should fail reader parsing");

        assert!(matches!(err, EdifactError::InvalidUna));
    }
}