1use crate::{
4 error::EdifactError,
5 model::{Element, OwnedSegment, Segment, Span},
6 tokenizer::{Token, Tokenizer},
7};
8use memchr::memchr;
9use smallvec::SmallVec;
10use std::borrow::Cow;
11use std::io::{BufRead, BufReader, Read};
12
13fn finish_element<'a>(
14 elements: &mut Vec<Element<'a>>,
15 current_components: &mut SmallVec<[Cow<'a, str>; 4]>,
16 current_component_spans: &mut SmallVec<[Span; 4]>,
17 current_element_start: &mut Option<usize>,
18) {
19 if let (Some(start), Some(last_span)) = (
20 current_element_start.take(),
21 current_component_spans.last().copied(),
22 ) {
23 elements.push(Element {
24 span: Span::new(start, last_span.end),
25 components: std::mem::take(current_components),
26 component_spans: std::mem::take(current_component_spans),
27 });
28 }
29}
30
31fn resolve_release(
32 val: &str,
33 release_char: char,
34 start_offset: usize,
35) -> Result<Cow<'_, str>, EdifactError> {
36 if !val.contains(release_char) {
37 return Ok(Cow::Borrowed(val));
38 }
39 resolve_release_owned(val, release_char, start_offset).map(Cow::Owned)
40}
41
42fn resolve_release_owned(
43 val: &str,
44 release_char: char,
45 start_offset: usize,
46) -> Result<String, EdifactError> {
47 let mut out = String::with_capacity(val.len());
48 let mut chars = val.chars();
49 while let Some(c) = chars.next() {
50 if c == release_char {
51 if let Some(escaped) = chars.next() {
52 out.push(escaped);
53 } else {
54 return Err(EdifactError::InvalidReleaseSequence {
55 offset: start_offset + val.len().saturating_sub(1),
56 });
57 }
58 } else {
59 out.push(c);
60 }
61 }
62 Ok(out)
63}
64
65pub struct Parser<'a> {
70 tokenizer: Tokenizer<'a>,
71 peeked: Option<Token<'a>>,
73 release_char: char,
75}
76
77impl<'a> Parser<'a> {
78 pub fn new(tokenizer: Tokenizer<'a>) -> Self {
80 let release_char = tokenizer.service_string_advice().release_char as char;
81 Self {
82 tokenizer,
83 peeked: None,
84 release_char,
85 }
86 }
87}
88
89impl<'a> Iterator for Parser<'a> {
90 type Item = Result<Segment<'a>, EdifactError>;
91
92 fn next(&mut self) -> Option<Self::Item> {
93 let (tag, tag_span) = loop {
95 let tok = match self.peeked.take() {
96 Some(t) => Ok(t),
97 None => self.tokenizer.next()?,
98 };
99 match tok {
100 Ok(Token::SegmentTag { value, span }) => break (value, span),
101 Ok(Token::SegmentTerminator { .. }) => continue, Ok(_) => continue, Err(e) => return Some(Err(e)),
104 }
105 };
106
107 let mut elements: Vec<Element<'a>> = Vec::with_capacity(8);
108 let mut current_components: SmallVec<[Cow<'a, str>; 4]> = SmallVec::new();
109 let mut current_component_spans: SmallVec<[Span; 4]> = SmallVec::new();
110 let mut current_element_start: Option<usize> = None;
111 let mut in_element = false;
112 let mut segment_end = tag_span.end;
113
114 loop {
115 let tok = match self.tokenizer.next() {
116 Some(Ok(t)) => t,
117 Some(Err(e)) => return Some(Err(e)),
118 None => {
119 if in_element {
121 finish_element(
122 &mut elements,
123 &mut current_components,
124 &mut current_component_spans,
125 &mut current_element_start,
126 );
127 if let Some(last) = elements.last() {
128 segment_end = last.span.end;
129 }
130 }
131 break;
132 }
133 };
134
135 match tok {
136 Token::SegmentTag {
137 value: next_tag,
138 span,
139 } => {
140 self.peeked = Some(Token::SegmentTag {
142 value: next_tag,
143 span,
144 });
145 if in_element {
146 finish_element(
147 &mut elements,
148 &mut current_components,
149 &mut current_component_spans,
150 &mut current_element_start,
151 );
152 if let Some(last) = elements.last() {
153 segment_end = last.span.end;
154 }
155 }
156 break;
157 }
158 Token::SegmentTerminator { span } => {
159 if in_element {
160 finish_element(
161 &mut elements,
162 &mut current_components,
163 &mut current_component_spans,
164 &mut current_element_start,
165 );
166 }
167 segment_end = span.end;
168 break;
169 }
170 Token::DataElement { value, span } => {
171 if in_element {
172 finish_element(
173 &mut elements,
174 &mut current_components,
175 &mut current_component_spans,
176 &mut current_element_start,
177 );
178 }
179 let resolved = match resolve_release(value, self.release_char, span.start) {
180 Ok(v) => v,
181 Err(error) => return Some(Err(error)),
182 };
183 current_components.push(resolved);
184 current_component_spans.push(span);
185 current_element_start = Some(span.start);
186 in_element = true;
187 }
188 Token::ComponentElement { value, span } => {
189 if !in_element {
190 in_element = true;
192 current_element_start = Some(span.start);
193 }
194 let resolved = match resolve_release(value, self.release_char, span.start) {
195 Ok(v) => v,
196 Err(error) => return Some(Err(error)),
197 };
198 current_components.push(resolved);
199 current_component_spans.push(span);
200 }
201 }
202 }
203
204 Some(Ok(Segment {
205 tag,
206 span: Span::new(tag_span.start, segment_end),
207 tag_span,
208 elements,
209 }))
210 }
211}
212
213pub fn from_reader<R: Read>(reader: R) -> Result<Vec<OwnedSegment>, EdifactError> {
219 from_reader_stream(reader).collect()
220}
221
222pub fn from_bufread<R: BufRead>(reader: R) -> Result<Vec<OwnedSegment>, EdifactError> {
224 from_bufread_stream(reader).collect()
225}
226
/// Tunable limits for the streaming reader.
#[derive(Clone, Debug)]
pub struct ReaderConfig {
    /// Size limit, in bytes, that the streaming reader enforces on a single
    /// segment; oversized segments are reported as
    /// `EdifactError::SegmentTooLong`.
    pub max_segment_bytes: usize,
}

impl Default for ReaderConfig {
    /// Default configuration: a 64 KiB (65 536 byte) segment limit.
    fn default() -> Self {
        Self {
            max_segment_bytes: 64 * 1024,
        }
    }
}

impl ReaderConfig {
    /// Builder-style setter that returns the configuration with
    /// `max_segment_bytes` replaced by `limit`.
    #[must_use]
    pub fn max_segment_bytes(self, limit: usize) -> Self {
        Self {
            max_segment_bytes: limit,
        }
    }
}
272
/// Progress of an [`OwnedSegmentStream`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum StreamState {
    /// Nothing has been read yet; the raw-segment reader is still allowed to
    /// look for a `UNA` header.
    Init,
    /// The header has been scanned; the buffered fast path may be used.
    Running,
    /// An error was yielded; the stream produces nothing further.
    Done,
}
283
284pub struct OwnedSegmentStream<R: BufRead> {
302 reader: R,
303 ssa: crate::tokenizer::ServiceStringAdvice,
304 state: StreamState,
305 stream_offset: usize,
306 config: ReaderConfig,
307}
308
309impl<R: BufRead> OwnedSegmentStream<R> {
310 fn new(reader: R) -> Self {
311 Self::with_config(reader, ReaderConfig::default())
312 }
313
314 fn with_config(reader: R, config: ReaderConfig) -> Self {
315 Self {
316 reader,
317 ssa: crate::tokenizer::ServiceStringAdvice::default(),
318 state: StreamState::Init,
319 stream_offset: 0,
320 config,
321 }
322 }
323}
324
325enum FastSegment {
329 Parsed(OwnedSegment, usize),
331 Skip(usize),
333 NeedMore,
335 Eof,
337 Err(EdifactError),
339}
340
341fn find_unescaped_term(buf: &[u8], term: u8, release: u8) -> Option<usize> {
346 let mut start = 0;
347 loop {
348 let rel = memchr(term, &buf[start..])?;
349 let abs = start + rel;
350 let n = buf[..abs]
352 .iter()
353 .rev()
354 .take_while(|&&b| b == release)
355 .count();
356 if n % 2 == 0 {
357 return Some(abs);
358 }
359 start = abs + 1;
360 }
361}
362
363fn try_fast_segment<R: BufRead>(
368 reader: &mut R,
369 ssa: crate::tokenizer::ServiceStringAdvice,
370 seg_start: usize,
371 max_segment_bytes: usize,
372) -> FastSegment {
373 let buf = match reader.fill_buf() {
374 Ok(b) => b,
375 Err(e) => return FastSegment::Err(e.into()),
376 };
377
378 if buf.is_empty() {
379 return FastSegment::Eof;
380 }
381
382 let Some(pos) = find_unescaped_term(buf, ssa.segment_term, ssa.release_char) else {
383 return FastSegment::NeedMore;
384 };
385
386 if pos > max_segment_bytes {
389 return FastSegment::Err(EdifactError::SegmentTooLong {
390 offset: seg_start,
391 limit: max_segment_bytes,
392 });
393 }
394
395 let seg_bytes = &buf[..pos];
397
398 if seg_bytes
400 .iter()
401 .all(|&b| matches!(b, b' ' | b'\t' | b'\r' | b'\n'))
402 {
403 return FastSegment::Skip(pos + 1);
404 }
405
406 let tok = Tokenizer::new(&buf[..pos + 1], ssa);
410 match Parser::new(tok).collect::<Result<Vec<Segment<'_>>, _>>() {
411 Err(e) => FastSegment::Err(e),
412 Ok(segs) => match segs.into_iter().next() {
413 None => FastSegment::Skip(pos + 1),
414 Some(s) => {
415 FastSegment::Parsed(OwnedSegment::from(s).offset(seg_start), pos + 1)
416 }
417 },
418 }
419 }
421
422impl<R: BufRead> Iterator for OwnedSegmentStream<R> {
425 type Item = Result<OwnedSegment, EdifactError>;
426
427 fn next(&mut self) -> Option<Self::Item> {
428 if self.state == StreamState::Done {
429 return None;
430 }
431
432 loop {
433 if self.state == StreamState::Running {
435 let seg_start = self.stream_offset;
436 match try_fast_segment(&mut self.reader, self.ssa, seg_start, self.config.max_segment_bytes) {
437 FastSegment::Parsed(seg, n) => {
438 self.reader.consume(n);
439 self.stream_offset += n;
440 return Some(Ok(seg));
441 }
442 FastSegment::Skip(n) => {
443 self.reader.consume(n);
444 self.stream_offset += n;
445 continue;
446 }
447 FastSegment::Eof => return None,
448 FastSegment::Err(e) => {
449 self.state = StreamState::Done;
450 return Some(Err(e));
451 }
452 FastSegment::NeedMore => {
453 }
455 }
456 }
457
458 let mut scanned = self.state != StreamState::Init;
460 let mut raw = match read_next_raw_segment(
461 &mut self.reader,
462 &mut self.ssa,
463 &mut scanned,
464 &mut self.stream_offset,
465 self.config.max_segment_bytes,
466 ) {
467 Ok(Some(r)) => r,
468 Ok(None) => return None,
469 Err(e) => {
470 self.state = StreamState::Done;
471 return Some(Err(e));
472 }
473 };
474 if scanned {
475 self.state = StreamState::Running;
476 }
477
478 raw.bytes.push(self.ssa.segment_term);
479 let tok = Tokenizer::new(raw.bytes.as_slice(), self.ssa);
480 match Parser::new(tok).collect::<Result<Vec<Segment<'_>>, _>>() {
481 Ok(segs) => {
482 if let Some(s) = segs.into_iter().next() {
483 return Some(Ok(OwnedSegment::from(s).offset(raw.start_offset)));
484 }
485 }
487 Err(e) => {
488 self.state = StreamState::Done;
489 return Some(Err(e));
490 }
491 }
492 }
493 }
494}
495
496pub fn from_bufread_stream<R: BufRead>(reader: R) -> OwnedSegmentStream<R> {
498 OwnedSegmentStream::new(reader)
499}
500
501pub fn from_bufread_stream_with_config<R: BufRead>(
503 reader: R,
504 config: ReaderConfig,
505) -> OwnedSegmentStream<R> {
506 OwnedSegmentStream::with_config(reader, config)
507}
508
509pub fn from_reader_stream<R: Read>(reader: R) -> OwnedSegmentStream<BufReader<R>> {
511 from_bufread_stream(BufReader::new(reader))
512}
513
514pub fn from_reader_with_config<R: Read>(
527 reader: R,
528 config: ReaderConfig,
529) -> OwnedSegmentStream<BufReader<R>> {
530 from_bufread_stream_with_config(BufReader::new(reader), config)
531}
532
533fn read_next_raw_segment<R: BufRead>(
534 reader: &mut R,
535 ssa: &mut crate::tokenizer::ServiceStringAdvice,
536 scanned_header: &mut bool,
537 stream_offset: &mut usize,
538 max_segment_bytes: usize,
539) -> Result<Option<crate::tokenizer::RawSegment>, EdifactError> {
540 loop {
541 let Some((first_offset, first)) = read_next_non_ws_byte(reader, stream_offset)? else {
542 return Ok(None);
543 };
544
545 if !*scanned_header && first == b'U' {
546 let second = read_required_byte(reader, stream_offset)?;
547 let third = read_required_byte(reader, stream_offset)?;
548 if second == b'N' && third == b'A' {
549 let mut una = [0u8; 9];
550 una[0] = b'U';
551 una[1] = b'N';
552 una[2] = b'A';
553 for slot in una.iter_mut().skip(3) {
554 *slot = read_required_byte(reader, stream_offset)?;
555 }
556 *ssa = crate::tokenizer::ServiceStringAdvice {
557 component_sep: una[3],
558 element_sep: una[4],
559 decimal_mark: una[5],
560 release_char: una[6],
561 segment_term: una[8],
562 };
563 if !ssa.is_valid() {
564 return Err(EdifactError::InvalidUna);
565 }
566 *scanned_header = true;
567 continue;
568 }
569
570 *scanned_header = true;
571 return read_remainder_of_segment(
572 reader,
573 ssa,
574 crate::tokenizer::RawSegment {
575 bytes: vec![first, second, third],
576 start_offset: first_offset,
577 },
578 stream_offset,
579 max_segment_bytes,
580 );
581 }
582
583 *scanned_header = true;
584 return read_remainder_of_segment(
585 reader,
586 ssa,
587 crate::tokenizer::RawSegment {
588 bytes: vec![first],
589 start_offset: first_offset,
590 },
591 stream_offset,
592 max_segment_bytes,
593 );
594 }
595}
596
597fn read_remainder_of_segment<R: BufRead>(
598 reader: &mut R,
599 ssa: &crate::tokenizer::ServiceStringAdvice,
600 mut out: crate::tokenizer::RawSegment,
601 stream_offset: &mut usize,
602 max_segment_bytes: usize,
603) -> Result<Option<crate::tokenizer::RawSegment>, EdifactError> {
604 let mut escaped = false;
605 loop {
606 if out.bytes.len() >= max_segment_bytes {
607 return Err(EdifactError::SegmentTooLong {
608 offset: out.start_offset,
609 limit: max_segment_bytes,
610 });
611 }
612 let Some(byte) = read_next_byte(reader, stream_offset)? else {
613 return if out.bytes.is_empty() {
614 Ok(None)
615 } else if escaped {
616 Err(EdifactError::InvalidReleaseSequence {
617 offset: out.start_offset + out.bytes.len().saturating_sub(1),
618 })
619 } else {
620 Err(EdifactError::UnexpectedEof {
621 offset: out.start_offset + out.bytes.len(),
622 })
623 };
624 };
625
626 if !escaped && byte == ssa.segment_term {
627 return Ok(Some(out));
628 }
629
630 if !escaped && byte == ssa.release_char {
631 escaped = true;
632 out.bytes.push(byte);
633 continue;
634 }
635
636 escaped = false;
637 out.bytes.push(byte);
638 }
639}
640
641fn read_next_byte<R: BufRead>(
642 reader: &mut R,
643 stream_offset: &mut usize,
644) -> Result<Option<u8>, EdifactError> {
645 let buf = reader.fill_buf()?;
646 if buf.is_empty() {
647 return Ok(None);
648 }
649
650 let byte = buf[0];
651 reader.consume(1);
652 *stream_offset += 1;
653 Ok(Some(byte))
654}
655
656fn read_required_byte<R: BufRead>(
657 reader: &mut R,
658 stream_offset: &mut usize,
659) -> Result<u8, EdifactError> {
660 read_next_byte(reader, stream_offset)?.ok_or(EdifactError::UnexpectedEof {
661 offset: *stream_offset,
662 })
663}
664
665fn read_next_non_ws_byte<R: BufRead>(
666 reader: &mut R,
667 stream_offset: &mut usize,
668) -> Result<Option<(usize, u8)>, EdifactError> {
669 loop {
670 let current_offset = *stream_offset;
671 let Some(byte) = read_next_byte(reader, stream_offset)? else {
672 return Ok(None);
673 };
674 if !matches!(byte, b' ' | b'\t' | b'\r' | b'\n') {
675 return Ok(Some((current_offset, byte)));
676 }
677 }
678}
679
#[cfg(test)]
mod tests {
    use super::*;
    use crate::tokenizer::ServiceStringAdvice;

    /// Tokenizes and parses `input` in memory, panicking on any parse error.
    fn parse_all(input: &[u8]) -> Vec<Segment<'_>> {
        let advice = ServiceStringAdvice::from_bytes(input);
        let parser = Parser::new(Tokenizer::new(input, advice));
        let parsed: Result<Vec<_>, _> = parser.collect();
        parsed.expect("parse failed")
    }

    /// Two segments are found and the first one's spans cover tag and terminator.
    #[test]
    fn parses_unb_unz() {
        let input = b"UNB+UNOA:1+SENDER+RECEIVER+200101:0900+1'UNZ+0+1'";
        let segments = parse_all(input);
        assert_eq!(segments.len(), 2);
        let (unb, unz) = (&segments[0], &segments[1]);
        assert_eq!(unb.tag, "UNB");
        assert_eq!(unz.tag, "UNZ");
        assert_eq!(unb.tag_span, Span::new(0, 3));
        assert_eq!(unb.span, Span::new(0, 41));
    }

    /// Simple elements are reachable by index.
    #[test]
    fn element_access() {
        let input = b"BGM+220+ORDER123+9'";
        let segments = parse_all(input);
        let bgm = &segments[0];
        assert_eq!(bgm.element_str(0), Some("220"));
        assert_eq!(bgm.element_str(1), Some("ORDER123"));
    }

    /// Components of a composite element are reachable by index.
    #[test]
    fn component_access() {
        let input = b"DTM+137:20200101:102'";
        let segments = parse_all(input);
        let dtm = &segments[0];
        let composite = dtm.get_element(0).unwrap();
        assert_eq!(composite.get_component(0), Some("137"));
        assert_eq!(composite.get_component(1), Some("20200101"));
        assert_eq!(composite.get_component(2), Some("102"));
    }

    /// The release character is removed from the value while the span still
    /// covers the raw, escaped bytes.
    #[test]
    fn release_char_resolved() {
        let input = b"FTX+AAA++test?+value'";
        let segments = parse_all(input);
        let ftx = &segments[0];
        assert_eq!(ftx.element_str(2), Some("test+value"));
        assert_eq!(
            ftx.get_element(2).unwrap().component_span(0),
            Some(Span::new(9, 20))
        );
    }

    /// Delimiters declared in a UNA header are honoured by the reader path.
    #[test]
    fn reader_path_preserves_custom_una_delimiters() {
        let input = b"UNA:;.? 'BGM;220;test?;value'";
        let reader = std::io::BufReader::new(std::io::Cursor::new(input));
        let segments = super::from_bufread(reader).expect("reader parse should succeed");
        let bgm = segments
            .iter()
            .find(|segment| segment.tag == "BGM")
            .expect("BGM segment should be present");
        assert_eq!(bgm.elements[0].components[0], "220");
        assert_eq!(bgm.elements[1].components[0], "test;value");
    }

    /// Malformed input may produce errors but must not panic.
    #[test]
    fn arbitrary_bytes_no_panic() {
        let garbage: &[u8] = b"\xff\x00\x01\x02ABC+++'''???";
        let _ = crate::from_bytes(garbage).collect::<Vec<_>>();
    }

    /// A tiny buffer forces segments to straddle refills.
    #[test]
    fn from_reader_handles_chunk_boundaries() {
        let input = b"UNA:+.? 'BGM+220+test?+value'UNT+2+1'";
        let reader = std::io::BufReader::with_capacity(5, std::io::Cursor::new(input));
        let segments = from_bufread(reader).expect("reader parsing should succeed");
        assert_eq!(segments.len(), 2);
        assert_eq!(segments[0].tag, "BGM");
        assert_eq!(segments[0].elements[1].components[0], "test+value");
        assert_eq!(segments[1].tag, "UNT");
    }

    /// Without a UNA header the default delimiters apply and spans are
    /// absolute stream offsets.
    #[test]
    fn from_reader_without_una_uses_default_delimiters() {
        let input = b"BGM+220+X'UNT+2+1'";
        let segments =
            from_reader(std::io::Cursor::new(input)).expect("reader parsing should succeed");
        assert_eq!(segments.len(), 2);
        assert_eq!(segments[0].tag, "BGM");
        assert_eq!(segments[0].elements[0].components[0], "220");
        assert_eq!(segments[1].span, Span::new(10, 18));
    }

    /// In-memory parsing rejects a release character with nothing after it.
    #[test]
    fn dangling_release_sequence_is_error() {
        let input = b"FTX+AAA++dangling?";
        let outcome = crate::from_bytes(input).collect::<Result<Vec<_>, _>>();
        let err = outcome.expect_err("expected dangling release to fail");

        assert!(matches!(err, EdifactError::InvalidReleaseSequence { .. }));
    }

    /// The reader path reports the same error for a dangling release.
    #[test]
    fn from_reader_reports_dangling_release_sequence() {
        let input = b"FTX+AAA++dangling?";
        let outcome = from_reader(std::io::Cursor::new(input));
        let err = outcome.expect_err("expected dangling release from reader path");
        assert!(matches!(err, EdifactError::InvalidReleaseSequence { .. }));
    }

    /// A UNA header with clashing delimiters is rejected.
    #[test]
    fn from_reader_rejects_invalid_una() {
        let input = b"UNA::.? 'BGM:220'";
        let outcome = from_reader(std::io::Cursor::new(input));
        let err = outcome.expect_err("invalid UNA should fail reader parsing");
        assert!(matches!(err, EdifactError::InvalidUna));
    }
}