1use crate::{
4 error::EdifactError,
5 model::{Element, OwnedSegment, Segment, Span},
6 tokenizer::{Token, Tokenizer},
7};
8use memchr::memchr2;
9use smallvec::SmallVec;
10use std::borrow::Cow;
11use std::io::{BufRead, BufReader, Read};
12
13fn finish_element<'a>(
14 elements: &mut Vec<Element<'a>>,
15 current_components: &mut SmallVec<[Cow<'a, str>; 4]>,
16 current_component_spans: &mut SmallVec<[Span; 4]>,
17 current_element_start: &mut Option<usize>,
18) {
19 if let (Some(start), Some(last_span)) = (
20 current_element_start.take(),
21 current_component_spans.last().copied(),
22 ) {
23 elements.push(Element {
24 span: Span::new(start, last_span.end),
25 components: std::mem::take(current_components),
26 component_spans: std::mem::take(current_component_spans),
27 });
28 }
29}
30
31fn resolve_release(
32 val: &str,
33 release_char: char,
34 start_offset: usize,
35) -> Result<Cow<'_, str>, EdifactError> {
36 if !val.contains(release_char) {
37 return Ok(Cow::Borrowed(val));
38 }
39 resolve_release_owned(val, release_char, start_offset).map(Cow::Owned)
40}
41
42fn resolve_release_owned(
43 val: &str,
44 release_char: char,
45 start_offset: usize,
46) -> Result<String, EdifactError> {
47 let mut out = String::with_capacity(val.len());
48 let mut chars = val.chars();
49 while let Some(c) = chars.next() {
50 if c == release_char {
51 if let Some(escaped) = chars.next() {
52 out.push(escaped);
53 } else {
54 return Err(EdifactError::InvalidReleaseSequence {
55 offset: start_offset + val.len().saturating_sub(1),
56 });
57 }
58 } else {
59 out.push(c);
60 }
61 }
62 Ok(out)
63}
64
65pub struct Parser<'a> {
70 tokenizer: Tokenizer<'a>,
71 peeked: Option<Token<'a>>,
73 release_char: char,
75}
76
77impl<'a> Parser<'a> {
78 pub fn new(tokenizer: Tokenizer<'a>) -> Self {
80 let release_char = tokenizer.service_string_advice().release_char as char;
81 Self {
82 tokenizer,
83 peeked: None,
84 release_char,
85 }
86 }
87}
88
89impl<'a> Iterator for Parser<'a> {
90 type Item = Result<Segment<'a>, EdifactError>;
91
92 fn next(&mut self) -> Option<Self::Item> {
93 let (tag, tag_span) = loop {
95 let tok = match self.peeked.take() {
96 Some(t) => Ok(t),
97 None => self.tokenizer.next()?,
98 };
99 match tok {
100 Ok(Token::SegmentTag { value, span }) => break (value, span),
101 Ok(Token::SegmentTerminator { .. }) => continue, Ok(Token::DataElement { span, .. }) | Ok(Token::ComponentElement { span, .. }) => {
103 return Some(Err(EdifactError::UnexpectedDataToken {
104 offset: span.start,
105 }));
106 }
107 Err(e) => return Some(Err(e)),
108 }
109 };
110
111 let mut elements: Vec<Element<'a>> = Vec::with_capacity(8);
112 let mut current_components: SmallVec<[Cow<'a, str>; 4]> = SmallVec::new();
113 let mut current_component_spans: SmallVec<[Span; 4]> = SmallVec::new();
114 let mut current_element_start: Option<usize> = None;
115 let mut in_element = false;
116 let mut segment_end = tag_span.end;
117
118 loop {
119 let tok = match self.tokenizer.next() {
120 Some(Ok(t)) => t,
121 Some(Err(e)) => return Some(Err(e)),
122 None => {
123 if in_element {
125 finish_element(
126 &mut elements,
127 &mut current_components,
128 &mut current_component_spans,
129 &mut current_element_start,
130 );
131 if let Some(last) = elements.last() {
132 segment_end = last.span.end;
133 }
134 }
135 break;
136 }
137 };
138
139 match tok {
140 Token::SegmentTag {
141 value: next_tag,
142 span,
143 } => {
144 self.peeked = Some(Token::SegmentTag {
146 value: next_tag,
147 span,
148 });
149 if in_element {
150 finish_element(
151 &mut elements,
152 &mut current_components,
153 &mut current_component_spans,
154 &mut current_element_start,
155 );
156 if let Some(last) = elements.last() {
157 segment_end = last.span.end;
158 }
159 }
160 break;
161 }
162 Token::SegmentTerminator { span } => {
163 if in_element {
164 finish_element(
165 &mut elements,
166 &mut current_components,
167 &mut current_component_spans,
168 &mut current_element_start,
169 );
170 }
171 segment_end = span.end;
172 break;
173 }
174 Token::DataElement { value, span } => {
175 if in_element {
176 finish_element(
177 &mut elements,
178 &mut current_components,
179 &mut current_component_spans,
180 &mut current_element_start,
181 );
182 }
183 let resolved = match resolve_release(value, self.release_char, span.start) {
184 Ok(v) => v,
185 Err(error) => return Some(Err(error)),
186 };
187 current_components.push(resolved);
188 current_component_spans.push(span);
189 current_element_start = Some(span.start);
190 in_element = true;
191 }
192 Token::ComponentElement { value, span } => {
193 if !in_element {
194 in_element = true;
196 current_element_start = Some(span.start);
197 }
198 let resolved = match resolve_release(value, self.release_char, span.start) {
199 Ok(v) => v,
200 Err(error) => return Some(Err(error)),
201 };
202 current_components.push(resolved);
203 current_component_spans.push(span);
204 }
205 }
206 }
207
208 Some(Ok(Segment {
209 tag,
210 span: Span::new(tag_span.start, segment_end),
211 tag_span,
212 elements,
213 }))
214 }
215}
216
217pub fn from_reader<R: Read>(reader: R) -> Result<Vec<OwnedSegment>, EdifactError> {
223 from_reader_stream(reader).collect()
224}
225
226pub fn from_bufread<R: BufRead>(reader: R) -> Result<Vec<OwnedSegment>, EdifactError> {
228 from_bufread_stream(reader).collect()
229}
230
/// Limits applied while streaming segments from a reader.
///
/// Start from `ReaderConfig::default()` and adjust individual limits with the
/// builder-style methods, each of which returns the updated configuration.
#[derive(Debug, Clone, Copy)]
pub struct ReaderConfig {
    /// Size limit for a single segment, in bytes. Segments that reach it are
    /// reported as `EdifactError::SegmentTooLong`.
    pub max_segment_bytes: usize,
    /// Number of segments after which the stream stops yielding items.
    /// `None` means no limit.
    pub max_segments: Option<usize>,
    /// Number of consumed input bytes after which the stream stops yielding
    /// items. The check happens between segments. `None` means no limit.
    pub max_input_bytes: Option<u64>,
}

impl Default for ReaderConfig {
    /// A 64 KiB per-segment limit and no limit on segment count or input size.
    fn default() -> Self {
        ReaderConfig {
            max_segment_bytes: 64 * 1024,
            max_segments: None,
            max_input_bytes: None,
        }
    }
}

impl ReaderConfig {
    /// Returns the configuration with the per-segment byte limit set to `limit`.
    #[must_use]
    pub fn max_segment_bytes(self, limit: usize) -> Self {
        Self {
            max_segment_bytes: limit,
            ..self
        }
    }

    /// Returns the configuration with the segment-count limit set to `limit`.
    #[must_use]
    pub fn max_segments(self, limit: usize) -> Self {
        Self {
            max_segments: Some(limit),
            ..self
        }
    }

    /// Returns the configuration with the total input byte limit set to `limit`.
    #[must_use]
    pub fn max_input_bytes(self, limit: u64) -> Self {
        Self {
            max_input_bytes: Some(limit),
            ..self
        }
    }
}
313
/// Lifecycle of an `OwnedSegmentStream`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum StreamState {
    /// Nothing has been read yet; the first read still has to check for a
    /// `UNA` header.
    Init,
    /// Header detection is finished; segments may be parsed straight from the
    /// reader's buffer.
    Running,
    /// Terminal state, entered after an error or once a configured limit is
    /// reached. The stream yields nothing further.
    Done,
}
324
325pub struct OwnedSegmentStream<R: BufRead> {
343 reader: R,
344 ssa: crate::tokenizer::ServiceStringAdvice,
345 state: StreamState,
346 stream_offset: usize,
347 config: ReaderConfig,
348 segments_yielded: usize,
350 bytes_consumed: u64,
352}
353
354impl<R: BufRead> OwnedSegmentStream<R> {
355 fn new(reader: R) -> Self {
356 Self::with_config(reader, ReaderConfig::default())
357 }
358
359 fn with_config(reader: R, config: ReaderConfig) -> Self {
360 Self {
361 reader,
362 ssa: crate::tokenizer::ServiceStringAdvice::default(),
363 state: StreamState::Init,
364 stream_offset: 0,
365 config,
366 segments_yielded: 0,
367 bytes_consumed: 0,
368 }
369 }
370}
371
372enum FastSegment {
376 Parsed(OwnedSegment, usize),
378 Skip(usize),
380 NeedMore,
382 Eof,
384 Err(EdifactError),
386}
387
388fn find_unescaped_term(buf: &[u8], term: u8, release: u8) -> Option<usize> {
400 let mut i = 0;
401 while i < buf.len() {
402 let rel = memchr2(release, term, &buf[i..])?;
404 let pos = i + rel;
405 if buf[pos] == release {
406 i = pos + 2;
408 } else {
409 return Some(pos);
411 }
412 }
413 None
414}
415
416fn try_fast_segment<R: BufRead>(
421 reader: &mut R,
422 ssa: crate::tokenizer::ServiceStringAdvice,
423 seg_start: usize,
424 max_segment_bytes: usize,
425) -> FastSegment {
426 let buf = match reader.fill_buf() {
427 Ok(b) => b,
428 Err(e) => return FastSegment::Err(e.into()),
429 };
430
431 if buf.is_empty() {
432 return FastSegment::Eof;
433 }
434
435 let Some(pos) = find_unescaped_term(buf, ssa.segment_term, ssa.release_char) else {
436 return FastSegment::NeedMore;
437 };
438
439 if pos > max_segment_bytes {
442 return FastSegment::Err(EdifactError::SegmentTooLong {
443 offset: seg_start,
444 limit: max_segment_bytes,
445 });
446 }
447
448 let seg_bytes = &buf[..pos];
450
451 if seg_bytes
453 .iter()
454 .all(|&b| matches!(b, b' ' | b'\t' | b'\r' | b'\n'))
455 {
456 return FastSegment::Skip(pos + 1);
457 }
458
459 let tok = Tokenizer::new(&buf[..pos + 1], ssa);
463 let mut parser_iter = Parser::new(tok);
464 match parser_iter.next() {
465 None => FastSegment::Skip(pos + 1),
466 Some(Err(e)) => FastSegment::Err(e),
467 Some(Ok(s)) => FastSegment::Parsed(OwnedSegment::from(s).offset(seg_start), pos + 1),
468 }
469 }
471
472impl<R: BufRead> Iterator for OwnedSegmentStream<R> {
475 type Item = Result<OwnedSegment, EdifactError>;
476
477 fn next(&mut self) -> Option<Self::Item> {
478 if self.state == StreamState::Done {
479 return None;
480 }
481
482 if let Some(max) = self.config.max_segments {
484 if self.segments_yielded >= max {
485 self.state = StreamState::Done;
486 return None;
487 }
488 }
489
490 if let Some(max) = self.config.max_input_bytes {
492 if self.bytes_consumed >= max {
493 self.state = StreamState::Done;
494 return None;
495 }
496 }
497
498 loop {
499 if self.state == StreamState::Running {
501 let seg_start = self.stream_offset;
502 match try_fast_segment(
503 &mut self.reader,
504 self.ssa,
505 seg_start,
506 self.config.max_segment_bytes,
507 ) {
508 FastSegment::Parsed(seg, n) => {
509 self.reader.consume(n);
510 self.stream_offset += n;
511 self.bytes_consumed = self.stream_offset as u64;
512 self.segments_yielded += 1;
513 if let Some(max) = self.config.max_input_bytes {
517 if self.bytes_consumed >= max {
518 self.state = StreamState::Done;
519 }
520 }
521 return Some(Ok(seg));
522 }
523 FastSegment::Skip(n) => {
524 self.reader.consume(n);
525 self.stream_offset += n;
526 self.bytes_consumed = self.stream_offset as u64;
527 continue;
528 }
529 FastSegment::Eof => return None,
530 FastSegment::Err(e) => {
531 self.state = StreamState::Done;
532 return Some(Err(e));
533 }
534 FastSegment::NeedMore => {
535 }
537 }
538 }
539
540 let mut scanned = self.state != StreamState::Init;
542 let mut raw = match read_next_raw_segment(
543 &mut self.reader,
544 &mut self.ssa,
545 &mut scanned,
546 &mut self.stream_offset,
547 self.config.max_segment_bytes,
548 ) {
549 Ok(Some(r)) => r,
550 Ok(None) => return None,
551 Err(e) => {
552 self.state = StreamState::Done;
553 return Some(Err(e));
554 }
555 };
556 if scanned {
557 self.state = StreamState::Running;
558 }
559 self.bytes_consumed = self.stream_offset as u64;
560
561 raw.bytes.push(self.ssa.segment_term);
562 let tok = Tokenizer::new(raw.bytes.as_slice(), self.ssa);
563 let mut parser_iter = Parser::new(tok);
564 match parser_iter.next() {
565 Some(Ok(s)) => {
566 self.segments_yielded += 1;
567 return Some(Ok(OwnedSegment::from(s).offset(raw.start_offset)));
568 }
569 Some(Err(e)) => {
570 self.state = StreamState::Done;
571 return Some(Err(e));
572 }
573 None => {} }
575 }
576 }
577}
578
579pub fn from_bufread_stream<R: BufRead>(reader: R) -> OwnedSegmentStream<R> {
581 OwnedSegmentStream::new(reader)
582}
583
584pub fn from_bufread_stream_with_config<R: BufRead>(
586 reader: R,
587 config: ReaderConfig,
588) -> OwnedSegmentStream<R> {
589 OwnedSegmentStream::with_config(reader, config)
590}
591
592pub fn from_reader_stream<R: Read>(reader: R) -> OwnedSegmentStream<BufReader<R>> {
594 from_bufread_stream(BufReader::new(reader))
595}
596
597pub fn from_reader_with_config<R: Read>(
610 reader: R,
611 config: ReaderConfig,
612) -> OwnedSegmentStream<BufReader<R>> {
613 from_bufread_stream_with_config(BufReader::new(reader), config)
614}
615
616fn read_next_raw_segment<R: BufRead>(
617 reader: &mut R,
618 ssa: &mut crate::tokenizer::ServiceStringAdvice,
619 scanned_header: &mut bool,
620 stream_offset: &mut usize,
621 max_segment_bytes: usize,
622) -> Result<Option<crate::tokenizer::RawSegment>, EdifactError> {
623 loop {
624 let Some((first_offset, first)) = read_next_non_ws_byte(reader, stream_offset)? else {
625 return Ok(None);
626 };
627
628 if !*scanned_header && first == b'U' {
629 let second = read_required_byte(reader, stream_offset)?;
630 let third = read_required_byte(reader, stream_offset)?;
631 if second == b'N' && third == b'A' {
632 let mut una = [0u8; 9];
633 una[0] = b'U';
634 una[1] = b'N';
635 una[2] = b'A';
636 for slot in una.iter_mut().skip(3) {
637 *slot = read_required_byte(reader, stream_offset)?;
638 }
639 *ssa = crate::tokenizer::ServiceStringAdvice {
640 component_sep: una[3],
641 element_sep: una[4],
642 decimal_mark: una[5],
643 release_char: una[6],
644 segment_term: una[8],
645 };
646 if !ssa.is_valid() {
647 return Err(EdifactError::InvalidUna);
648 }
649 *scanned_header = true;
650 continue;
651 }
652
653 *scanned_header = true;
654 return read_remainder_of_segment(
655 reader,
656 ssa,
657 crate::tokenizer::RawSegment {
658 bytes: vec![first, second, third],
659 start_offset: first_offset,
660 },
661 stream_offset,
662 max_segment_bytes,
663 );
664 }
665
666 *scanned_header = true;
667 return read_remainder_of_segment(
668 reader,
669 ssa,
670 crate::tokenizer::RawSegment {
671 bytes: vec![first],
672 start_offset: first_offset,
673 },
674 stream_offset,
675 max_segment_bytes,
676 );
677 }
678}
679
680fn read_remainder_of_segment<R: BufRead>(
681 reader: &mut R,
682 ssa: &crate::tokenizer::ServiceStringAdvice,
683 mut out: crate::tokenizer::RawSegment,
684 stream_offset: &mut usize,
685 max_segment_bytes: usize,
686) -> Result<Option<crate::tokenizer::RawSegment>, EdifactError> {
687 let mut escaped = false;
688 loop {
689 if out.bytes.len() >= max_segment_bytes {
690 return Err(EdifactError::SegmentTooLong {
691 offset: out.start_offset,
692 limit: max_segment_bytes,
693 });
694 }
695 let Some(byte) = read_next_byte(reader, stream_offset)? else {
696 return if out.bytes.is_empty() {
697 Ok(None)
698 } else if escaped {
699 Err(EdifactError::InvalidReleaseSequence {
700 offset: out.start_offset + out.bytes.len().saturating_sub(1),
701 })
702 } else {
703 Err(EdifactError::UnexpectedEof {
704 offset: out.start_offset + out.bytes.len(),
705 })
706 };
707 };
708
709 if !escaped && byte == ssa.segment_term {
710 return Ok(Some(out));
711 }
712
713 if !escaped && byte == ssa.release_char {
714 escaped = true;
715 out.bytes.push(byte);
716 continue;
717 }
718
719 escaped = false;
720 out.bytes.push(byte);
721 }
722}
723
724fn read_next_byte<R: BufRead>(
725 reader: &mut R,
726 stream_offset: &mut usize,
727) -> Result<Option<u8>, EdifactError> {
728 let buf = reader.fill_buf()?;
729 if buf.is_empty() {
730 return Ok(None);
731 }
732
733 let byte = buf[0];
734 reader.consume(1);
735 *stream_offset += 1;
736 Ok(Some(byte))
737}
738
739fn read_required_byte<R: BufRead>(
740 reader: &mut R,
741 stream_offset: &mut usize,
742) -> Result<u8, EdifactError> {
743 read_next_byte(reader, stream_offset)?.ok_or(EdifactError::UnexpectedEof {
744 offset: *stream_offset,
745 })
746}
747
748fn read_next_non_ws_byte<R: BufRead>(
749 reader: &mut R,
750 stream_offset: &mut usize,
751) -> Result<Option<(usize, u8)>, EdifactError> {
752 loop {
753 let current_offset = *stream_offset;
754 let Some(byte) = read_next_byte(reader, stream_offset)? else {
755 return Ok(None);
756 };
757 if !matches!(byte, b' ' | b'\t' | b'\r' | b'\n') {
758 return Ok(Some((current_offset, byte)));
759 }
760 }
761}
762
#[cfg(test)]
mod tests {
    use super::*;
    use crate::tokenizer::ServiceStringAdvice;
    use std::io::{BufReader, Cursor};

    /// Tokenizes and parses `input` in one go, panicking on the first error.
    fn parse_all(input: &[u8]) -> Vec<Segment<'_>> {
        let advice = ServiceStringAdvice::from_bytes(input);
        let parser = Parser::new(Tokenizer::new(input, advice));
        let parsed: Result<Vec<_>, _> = parser.collect();
        parsed.expect("parse failed")
    }

    /// Two segments come back with the expected tags and byte spans.
    #[test]
    fn parses_unb_unz() {
        let segments = parse_all(b"UNB+UNOA:1+SENDER+RECEIVER+200101:0900+1'UNZ+0+1'");
        assert_eq!(segments.len(), 2);

        let (unb, unz) = (&segments[0], &segments[1]);
        assert_eq!(unb.tag, "UNB");
        assert_eq!(unz.tag, "UNZ");
        assert_eq!(unb.tag_span, Span::new(0, 3));
        assert_eq!(unb.span, Span::new(0, 41));
    }

    /// Simple data elements are reachable by index.
    #[test]
    fn element_access() {
        let segments = parse_all(b"BGM+220+ORDER123+9'");
        let bgm = &segments[0];
        assert_eq!(bgm.element_str(0), Some("220"));
        assert_eq!(bgm.element_str(1), Some("ORDER123"));
    }

    /// Components of a composite element are reachable by index.
    #[test]
    fn component_access() {
        let segments = parse_all(b"DTM+137:20200101:102'");
        let composite = segments[0].get_element(0).unwrap();
        assert_eq!(composite.get_component(0), Some("137"));
        assert_eq!(composite.get_component(1), Some("20200101"));
        assert_eq!(composite.get_component(2), Some("102"));
    }

    /// Release sequences are removed from the value; the span still covers
    /// the raw bytes.
    #[test]
    fn release_char_resolved() {
        let segments = parse_all(b"FTX+AAA++test?+value'");
        let ftx = &segments[0];
        assert_eq!(ftx.element_str(2), Some("test+value"));
        assert_eq!(
            ftx.get_element(2).unwrap().component_span(0),
            Some(Span::new(9, 20))
        );
    }

    /// Delimiters declared in a UNA header are honoured by the reader path.
    #[test]
    fn reader_path_preserves_custom_una_delimiters() {
        let input = b"UNA:;.? 'BGM;220;test?;value'";
        let segments = from_bufread(BufReader::new(Cursor::new(input)))
            .expect("reader parse should succeed");
        let bgm = segments
            .iter()
            .find(|segment| segment.tag == "BGM")
            .expect("BGM segment should be present");
        assert_eq!(bgm.elements[0].components[0], "220");
        assert_eq!(bgm.elements[1].components[0], "test;value");
    }

    /// Garbage input may fail to parse but must never panic.
    #[test]
    fn arbitrary_bytes_no_panic() {
        let garbage: &[u8] = b"\xff\x00\x01\x02ABC+++'''???";
        let _ = crate::from_bytes(garbage).collect::<Vec<_>>();
    }

    /// A tiny read buffer forces segments to straddle buffer refills.
    #[test]
    fn from_reader_handles_chunk_boundaries() {
        let input = b"UNA:+.? 'BGM+220+test?+value'UNT+2+1'";
        let tiny_buffer = BufReader::with_capacity(5, Cursor::new(input));
        let parsed = from_bufread(tiny_buffer).expect("reader parsing should succeed");
        assert_eq!(parsed.len(), 2);
        assert_eq!(parsed[0].tag, "BGM");
        assert_eq!(parsed[0].elements[1].components[0], "test+value");
        assert_eq!(parsed[1].tag, "UNT");
    }

    /// Without a UNA header the default delimiters apply and spans are
    /// stream-relative.
    #[test]
    fn from_reader_without_una_uses_default_delimiters() {
        let input = b"BGM+220+X'UNT+2+1'";
        let parsed = from_reader(Cursor::new(input)).expect("reader parsing should succeed");
        assert_eq!(parsed.len(), 2);
        assert_eq!(parsed[0].tag, "BGM");
        assert_eq!(parsed[0].elements[0].components[0], "220");
        assert_eq!(parsed[1].span, Span::new(10, 18));
    }

    /// A trailing release character is rejected on the in-memory path.
    #[test]
    fn dangling_release_sequence_is_error() {
        let input = b"FTX+AAA++dangling?";
        let outcome: Result<Vec<_>, _> = crate::from_bytes(input).collect();
        let err = outcome.expect_err("expected dangling release to fail");
        assert!(matches!(err, EdifactError::InvalidReleaseSequence { .. }));
    }

    /// A trailing release character is rejected on the reader path as well.
    #[test]
    fn from_reader_reports_dangling_release_sequence() {
        let input = b"FTX+AAA++dangling?";
        let err = from_reader(Cursor::new(input))
            .expect_err("expected dangling release from reader path");
        assert!(matches!(err, EdifactError::InvalidReleaseSequence { .. }));
    }

    /// A UNA header whose delimiters collide is rejected.
    #[test]
    fn from_reader_rejects_invalid_una() {
        let input = b"UNA::.? 'BGM:220'";
        let err =
            from_reader(Cursor::new(input)).expect_err("invalid UNA should fail reader parsing");
        assert!(matches!(err, EdifactError::InvalidUna));
    }
}