pub use crate::model::Delims;
use crate::model::Error;
use std::collections::VecDeque;
use std::io::BufRead;
use tokio::sync::mpsc::{self, Receiver};
/// Default `buffer_size` for `StreamParserBuilder`; `build_async` uses that
/// value as the capacity of its event channel.
const DEFAULT_BUFFER_SIZE: usize = 100;
/// Default upper bound on the size of a single message, in bytes (1 MiB).
const DEFAULT_MAX_MESSAGE_SIZE: usize = 1024 * 1024;
/// A single event produced by the streaming parser.
///
/// For each message the parser yields a `StartMessage`, then `Segment` and
/// `Field` events, and finally an `EndMessage`.
#[derive(Debug, Clone, PartialEq)]
pub enum Event {
    /// A new message begins; `delims` are the delimiters in effect for it.
    StartMessage {
        /// Delimiters parsed from the `MSH` segment, or the defaults when the
        /// message did not start with `MSH`.
        delims: Delims,
    },
    /// A non-`MSH` segment was read inside a message.
    Segment {
        /// The first three bytes of the segment (its identifier).
        id: Vec<u8>,
    },
    /// One field of the most recently reported segment (or of the `MSH`
    /// segment that opened the message).
    Field {
        /// One-based position of the field among the emitted fields.
        num: u16,
        /// The raw, unparsed bytes of the field.
        raw: Vec<u8>,
    },
    /// The current message ended, either because a new `MSH` segment was seen
    /// or because the input was exhausted.
    EndMessage,
}
/// Errors delivered to consumers of the asynchronous stream parser.
#[derive(Debug, Clone, PartialEq, thiserror::Error)]
pub enum StreamError {
    /// A message grew beyond the configured size limit.
    #[error("Message size {actual} exceeds maximum allowed size {max}")]
    MessageTooLarge {
        /// Observed message size in bytes.
        actual: usize,
        /// Configured limit in bytes.
        max: usize,
    },
    /// The underlying parser reported an error; the payload is its textual
    /// representation.
    #[error("Parse error: {0}")]
    ParseError(String),
    /// A channel-related failure, described by the payload.
    #[error("Channel error: {0}")]
    ChannelError(String),
}
impl From<Error> for StreamError {
fn from(err: Error) -> Self {
StreamError::ParseError(format!("{err:?}"))
}
}
/// Configuration collected before constructing a stream parser.
#[derive(Debug, Clone)]
pub struct StreamParserBuilder {
    /// Capacity of the event channel used by the asynchronous parser.
    buffer_size: usize,
    /// Largest message, in bytes, that the parser will accept.
    max_message_size: usize,
}
/// The default builder uses `DEFAULT_BUFFER_SIZE` and
/// `DEFAULT_MAX_MESSAGE_SIZE`.
impl Default for StreamParserBuilder {
    fn default() -> Self {
        let buffer_size = DEFAULT_BUFFER_SIZE;
        let max_message_size = DEFAULT_MAX_MESSAGE_SIZE;
        Self {
            buffer_size,
            max_message_size,
        }
    }
}
impl StreamParserBuilder {
pub fn new() -> Self {
Self::default()
}
pub fn buffer_size(mut self, size: usize) -> Self {
self.buffer_size = size;
self
}
pub fn max_message_size(mut self, size: usize) -> Self {
self.max_message_size = size;
self
}
pub fn build<R: BufRead>(self, reader: R) -> StreamParser<R> {
StreamParser {
reader,
delims: Delims::default(),
read_buf: [0u8; 8192],
read_pos: 0,
read_len: 0,
buffer: Vec::new(),
pre_msh: true,
in_message: false,
event_queue: VecDeque::new(),
max_message_size: self.max_message_size,
current_message_size: 0,
}
}
pub fn build_async(self, data: Vec<u8>) -> AsyncStreamParser {
let (tx, rx) = mpsc::channel(self.buffer_size);
let max_message_size = self.max_message_size;
tokio::spawn(async move {
let cursor = std::io::Cursor::new(data);
let buf_reader = std::io::BufReader::new(cursor);
let mut parser = StreamParser {
reader: buf_reader,
delims: Delims::default(),
read_buf: [0u8; 8192],
read_pos: 0,
read_len: 0,
buffer: Vec::new(),
pre_msh: true,
in_message: false,
event_queue: VecDeque::new(),
max_message_size,
current_message_size: 0,
};
loop {
match parser.next_event() {
Ok(Some(event)) => {
if tx.send(Ok(event)).await.is_err() {
break; }
}
Ok(None) => {
break; }
Err(e) => {
if tx.send(Err(StreamError::from(e))).await.is_err() {
break;
}
break;
}
}
}
});
AsyncStreamParser { receiver: rx }
}
}
/// Pull-based parser that turns a byte stream into [`Event`]s, one
/// carriage-return-terminated segment at a time.
pub struct StreamParser<D> {
    /// Source of input bytes.
    reader: D,
    /// Delimiters in effect for the current message.
    delims: Delims,
    /// Scratch space holding the most recent chunk read from `reader`.
    read_buf: [u8; 8192],
    /// Offset of the first unconsumed byte in `read_buf`.
    read_pos: usize,
    /// Number of valid bytes in `read_buf`.
    read_len: usize,
    /// Bytes of a segment whose terminator has not been seen yet.
    buffer: Vec<u8>,
    /// `true` while no message has been started since construction or since
    /// the previous message was closed at end of input or by a size error.
    pre_msh: bool,
    /// `true` between a `StartMessage` and the matching `EndMessage`.
    in_message: bool,
    /// Events that have been produced but not yet handed to the caller.
    event_queue: VecDeque<Event>,
    /// Largest message, in bytes, that will be accepted.
    max_message_size: usize,
    /// Bytes counted so far for the message currently being parsed.
    current_message_size: usize,
}
impl<D: BufRead> StreamParser<D> {
pub fn new(reader: D) -> Self {
Self {
reader,
delims: Delims::default(),
read_buf: [0u8; 8192],
read_pos: 0,
read_len: 0,
buffer: Vec::new(),
pre_msh: true,
in_message: false,
event_queue: VecDeque::new(),
max_message_size: DEFAULT_MAX_MESSAGE_SIZE,
current_message_size: 0,
}
}
pub fn with_max_message_size(reader: D, max_message_size: usize) -> Self {
Self {
reader,
delims: Delims::default(),
read_buf: [0u8; 8192],
read_pos: 0,
read_len: 0,
buffer: Vec::new(),
pre_msh: true,
in_message: false,
event_queue: VecDeque::new(),
max_message_size,
current_message_size: 0,
}
}
pub fn next_event(&mut self) -> Result<Option<Event>, Error> {
if let Some(event) = self.event_queue.pop_front() {
return Ok(Some(event));
}
loop {
if let Some(pos) = self.buffer.iter().position(|&b| b == b'\r') {
let segment_data = self
.buffer
.get(..pos)
.ok_or_else(|| Error::InvalidFieldFormat {
details: "Internal segment buffer position was out of bounds".to_string(),
})?
.to_vec();
self.buffer.drain(..pos.saturating_add(1));
let result = self.process_segment(segment_data)?;
if result.is_some() {
return Ok(result);
}
continue;
}
if self.read_pos >= self.read_len {
match self.reader.read(&mut self.read_buf) {
Ok(0) => {
if self.in_message {
self.in_message = false;
self.pre_msh = true;
self.current_message_size = 0;
return Ok(Some(Event::EndMessage));
}
return Ok(None);
}
Ok(n) => {
self.read_len = n;
self.read_pos = 0;
}
Err(_) => return Err(Error::InvalidCharset),
}
}
let remaining = self
.read_buf
.get(self.read_pos..self.read_len)
.ok_or_else(|| Error::InvalidFieldFormat {
details: "Internal read buffer position was out of bounds".to_string(),
})?;
if let Some(rel_cr_pos) = remaining.iter().position(|&b| b == b'\r') {
let abs_cr_pos = self.read_pos.checked_add(rel_cr_pos).ok_or_else(|| {
Error::InvalidFieldFormat {
details: "Internal read buffer position overflowed".to_string(),
}
})?;
let segment_part =
self.read_buf
.get(self.read_pos..abs_cr_pos)
.ok_or_else(|| Error::InvalidFieldFormat {
details: "Internal read buffer segment was out of bounds".to_string(),
})?;
self.buffer.extend_from_slice(segment_part);
self.read_pos = abs_cr_pos.saturating_add(1);
let segment_data = std::mem::take(&mut self.buffer);
let result = self.process_segment(segment_data)?;
if result.is_some() {
return Ok(result);
}
} else {
self.buffer.extend_from_slice(remaining);
self.read_pos = self.read_len;
if self.buffer.len() > self.max_message_size {
return Err(Error::InvalidFieldFormat {
details: format!(
"Segment size exceeds maximum allowed size {}",
self.max_message_size
),
});
}
}
}
}
fn process_segment(&mut self, segment_data: Vec<u8>) -> Result<Option<Event>, Error> {
let segment_len = segment_data.len().saturating_add(1);
if self.in_message {
self.current_message_size = self.current_message_size.saturating_add(segment_len);
if self.current_message_size > self.max_message_size {
let actual_size = self.current_message_size;
let max_size = self.max_message_size;
self.in_message = false;
self.pre_msh = true;
self.current_message_size = 0;
return Err(Error::InvalidFieldFormat {
details: format!("Message size {actual_size} exceeds maximum {max_size}"),
});
}
}
if segment_data.starts_with(b"MSH") {
let mut end_prev = false;
if self.in_message {
end_prev = true;
}
let new_delims = Delims::parse_from_msh(
std::str::from_utf8(&segment_data).map_err(|_utf8_err| Error::InvalidCharset)?,
)
.map_err(|e| Error::ParseError {
segment_id: "MSH".to_string(),
field_index: 0,
source: Box::new(e),
})?;
self.delims = new_delims.clone();
self.pre_msh = false;
self.in_message = true;
self.current_message_size = segment_len;
let start_event = Event::StartMessage { delims: new_delims };
if end_prev {
self.event_queue.push_back(start_event);
self.generate_msh_field_events(&segment_data)?;
return Ok(Some(Event::EndMessage));
} else {
self.generate_msh_field_events(&segment_data)?;
return Ok(Some(start_event));
}
}
if self.in_message
&& segment_data
.get(..3)
.is_some_and(|id| id.iter().all(u8::is_ascii_alphanumeric))
{
let Some(id) = segment_data.get(..3).map(<[u8]>::to_vec) else {
return Err(Error::InvalidSegmentId);
};
self.generate_field_events(&segment_data)?;
return Ok(Some(Event::Segment { id }));
}
if !self.in_message
&& self.pre_msh
&& segment_data
.get(..3)
.is_some_and(|id| id.iter().all(u8::is_ascii_alphanumeric))
{
self.delims = Delims::default();
self.pre_msh = false;
self.in_message = true;
self.current_message_size = segment_len;
self.generate_field_events(&segment_data)?;
return Ok(Some(Event::StartMessage {
delims: Delims::default(),
}));
}
Ok(None)
}
fn generate_field_events(&mut self, segment_data: &[u8]) -> Result<(), Error> {
if segment_data.len() > 4 {
let Some(fields_data) = segment_data.get(4..) else {
return Err(Error::InvalidFieldFormat {
details: "Segment field data was out of bounds".to_string(),
});
};
let field_sep = self.delims.field as u8;
for (index, field) in fields_data.split(|&b| b == field_sep).enumerate() {
self.event_queue.push_back(Event::Field {
num: field_number(index)?,
raw: field.to_vec(),
});
}
}
Ok(())
}
fn generate_msh_field_events(&mut self, segment_data: &[u8]) -> Result<(), Error> {
if segment_data.len() > 8 {
let Some(fields_data) = segment_data.get(8..) else {
return Err(Error::InvalidFieldFormat {
details: "MSH field data was out of bounds".to_string(),
});
};
let field_sep = self.delims.field as u8;
for (index, field) in fields_data.split(|&b| b == field_sep).enumerate() {
self.event_queue.push_back(Event::Field {
num: field_number(index)?,
raw: field.to_vec(),
});
}
}
Ok(())
}
pub fn current_message_size(&self) -> usize {
self.current_message_size
}
pub fn max_message_size(&self) -> usize {
self.max_message_size
}
pub fn is_in_message(&self) -> bool {
self.in_message
}
pub fn resume_with_data(&mut self, data: &[u8]) {
self.buffer.extend_from_slice(data);
}
pub fn clear_buffer(&mut self) {
self.buffer.clear();
self.read_pos = 0;
self.read_len = 0;
}
}
fn field_number(index: usize) -> Result<u16, Error> {
let one_based = index
.checked_add(1)
.ok_or_else(|| Error::InvalidFieldFormat {
details: "Field index overflowed".to_string(),
})?;
u16::try_from(one_based).map_err(|_int_err| Error::InvalidFieldFormat {
details: format!("Field index {one_based} exceeds u16"),
})
}
/// Receiving half of an asynchronous parse: events arrive over a channel
/// instead of being pulled from a reader.
pub struct AsyncStreamParser {
    /// Channel carrying parsed events, or the error that ended the stream.
    receiver: Receiver<Result<Event, StreamError>>,
}
impl AsyncStreamParser {
    /// Waits for the next item from the channel.
    ///
    /// Yields `Some(Ok(event))` for a parsed event, `Some(Err(_))` for a
    /// forwarded error, and `None` once the channel is closed and drained.
    pub async fn next(&mut self) -> Option<Result<Event, StreamError>> {
        let Self { receiver } = self;
        receiver.recv().await
    }
}