#![expect(
clippy::arithmetic_side_effects,
clippy::indexing_slicing,
clippy::manual_let_else,
reason = "pre-existing network codec implementation debt moved from staged microcrate into hl7v2; cleanup is split from topology collapse"
)]
use bytes::{Buf, BufMut, BytesMut};
use tokio_util::codec::{Decoder, Encoder};
/// Start-of-block byte written before every encoded payload (`0x0B`, ASCII vertical tab).
const MLLP_START: u8 = 0x0B;
/// First byte of the two-byte trailer that closes a frame (`0x1C`, ASCII file separator).
const MLLP_END_1: u8 = 0x1C;
/// Second byte of the two-byte trailer that closes a frame (`0x0D`, ASCII carriage return).
const MLLP_END_2: u8 = 0x0D;
/// Payload size limit, in bytes, used by [`MllpCodec::new`] and [`MllpCodec::default`] (10 MiB).
const MAX_FRAME_SIZE: usize = 10 * 1024 * 1024;

/// Framing codec that wraps each payload in a start byte and a two-byte trailer.
///
/// The codec carries a single setting: the largest payload, in bytes, that it
/// will encode or decode before reporting an error.
#[derive(Debug, Clone)]
pub struct MllpCodec {
    /// Largest payload length (framing bytes excluded) accepted in either direction.
    max_frame_size: usize,
}

impl MllpCodec {
    /// Creates a codec with the default payload limit of [`MAX_FRAME_SIZE`] bytes.
    pub fn new() -> Self {
        Self {
            max_frame_size: MAX_FRAME_SIZE,
        }
    }

    /// Creates a codec that rejects payloads longer than `max_frame_size` bytes.
    pub fn with_max_frame_size(max_frame_size: usize) -> Self {
        Self { max_frame_size }
    }
}

impl Default for MllpCodec {
    /// Equivalent to [`MllpCodec::new`].
    ///
    /// Implemented by hand because a derived `Default` would set the limit to
    /// zero, producing a codec that rejects every non-empty message.
    fn default() -> Self {
        Self::new()
    }
}
impl Decoder for MllpCodec {
    type Item = BytesMut;
    type Error = std::io::Error;

    /// Extracts the next complete frame payload from `src`, if one is buffered.
    ///
    /// Bytes preceding the first start byte are discarded. On success the
    /// start byte, the payload and the two-byte trailer are consumed from
    /// `src`, and the payload alone is returned.
    ///
    /// Returns `Ok(None)` when fewer than three bytes are buffered, when the
    /// buffer holds no start byte (the buffer is cleared in that case), or
    /// when the trailer of the current frame has not arrived yet.
    ///
    /// # Errors
    ///
    /// Returns [`std::io::ErrorKind::InvalidData`] when the payload of a
    /// complete frame, or the payload buffered so far for an incomplete
    /// frame, is longer than the configured maximum. `src` is left as-is.
    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
        // Start byte plus the two trailer bytes is the shortest possible frame.
        if src.len() < 3 {
            return Ok(None);
        }
        let start_pos = match src.iter().position(|&b| b == MLLP_START) {
            Some(pos) => pos,
            None => {
                // Without a start byte nothing buffered can belong to a frame.
                src.clear();
                return Ok(None);
            }
        };
        if start_pos > 0 {
            // Drop junk in front of the frame so that `src[0]` is the start byte.
            src.advance(start_pos);
        }
        // Offset of the trailer relative to the byte after the start byte,
        // which is also the payload length.
        let end_pos = src[1..]
            .windows(2)
            .position(|window| window[0] == MLLP_END_1 && window[1] == MLLP_END_2);
        match end_pos {
            Some(content_len) => {
                if content_len > self.max_frame_size {
                    return Err(std::io::Error::new(
                        std::io::ErrorKind::InvalidData,
                        format!(
                            "Frame size {} exceeds maximum {}",
                            content_len, self.max_frame_size
                        ),
                    ));
                }
                src.advance(1);
                let content = src.split_to(content_len);
                src.advance(2);
                Ok(Some(content))
            }
            None => {
                // Only payload bytes count against the limit. The start byte is
                // framing, and a trailing `MLLP_END_1` may be the first half of
                // a trailer whose second byte has not been read yet. Comparing
                // the raw buffer length instead would reject a frame of exactly
                // the maximum size whenever it arrives split across reads.
                let partial_trailer = usize::from(src.last() == Some(&MLLP_END_1));
                let pending_content = src.len().saturating_sub(1 + partial_trailer);
                if pending_content > self.max_frame_size {
                    return Err(std::io::Error::new(
                        std::io::ErrorKind::InvalidData,
                        format!(
                            "Buffer size {} exceeds maximum {}",
                            src.len(),
                            self.max_frame_size
                        ),
                    ));
                }
                Ok(None)
            }
        }
    }
}
impl Encoder<BytesMut> for MllpCodec {
    type Error = std::io::Error;

    /// Appends `item` to `dst` as one frame: start byte, payload, two-byte trailer.
    ///
    /// # Errors
    ///
    /// Returns [`std::io::ErrorKind::InvalidData`] without touching `dst` when
    /// `item` is longer than the configured maximum payload size.
    fn encode(&mut self, item: BytesMut, dst: &mut BytesMut) -> Result<(), Self::Error> {
        let payload_len = item.len();
        let limit = self.max_frame_size;
        if payload_len > limit {
            let message = format!("Message size {} exceeds maximum {}", payload_len, limit);
            return Err(std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                message,
            ));
        }

        // One start byte and two trailer bytes surround the payload.
        dst.reserve(payload_len + 3);
        dst.put_u8(MLLP_START);
        dst.put_slice(&item);
        dst.put_slice(&[MLLP_END_1, MLLP_END_2]);
        Ok(())
    }
}
impl Encoder<&[u8]> for MllpCodec {
    type Error = std::io::Error;

    /// Frames the borrowed payload `item` into `dst`.
    ///
    /// The bytes written are the start byte, a copy of `item`, then the two
    /// trailer bytes.
    ///
    /// # Errors
    ///
    /// Returns [`std::io::ErrorKind::InvalidData`] when `item` is longer than
    /// the configured maximum payload size; `dst` is not modified in that case.
    fn encode(&mut self, item: &[u8], dst: &mut BytesMut) -> Result<(), Self::Error> {
        let within_limit = item.len() <= self.max_frame_size;
        if !within_limit {
            let reason = format!(
                "Message size {} exceeds maximum {}",
                item.len(),
                self.max_frame_size
            );
            return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, reason));
        }

        // Room for the payload plus three framing bytes.
        let framed_len = item.len() + 3;
        dst.reserve(framed_len);
        dst.put_u8(MLLP_START);
        dst.put_slice(item);
        for trailer_byte in [MLLP_END_1, MLLP_END_2] {
            dst.put_u8(trailer_byte);
        }
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    #![expect(
        clippy::indexing_slicing,
        clippy::unwrap_used,
        reason = "pre-existing network codec test debt moved into hl7v2; cleanup is split from topology collapse"
    )]
    use super::*;

    /// Runs a single decode pass over `wire` with a default-limit codec and
    /// returns the decoded payload (if any) together with the leftover buffer.
    fn decode_once(wire: &[u8]) -> (Option<BytesMut>, BytesMut) {
        let mut buffer = BytesMut::from(wire);
        let frame = MllpCodec::new().decode(&mut buffer).unwrap();
        (frame, buffer)
    }

    /// Encoding surrounds the payload with the start byte and the trailer.
    #[test]
    fn test_encode() {
        let payload = BytesMut::from("MSH|^~\\&|TEST\r");
        let mut wire = BytesMut::new();
        let mut codec = MllpCodec::new();
        codec.encode(payload, &mut wire).unwrap();

        assert_eq!(wire[0], MLLP_START);
        let total = wire.len();
        assert_eq!(wire[total - 2], MLLP_END_1);
        assert_eq!(wire[total - 1], MLLP_END_2);
        assert_eq!(&wire[1..total - 2], b"MSH|^~\\&|TEST\r");
    }

    /// A complete frame decodes to its bare payload.
    #[test]
    fn test_decode() {
        let (frame, _rest) = decode_once(b"\x0BMSH|^~\\&|TEST\r\x1C\x0D");
        assert!(frame.is_some());
        let payload = frame.unwrap();
        assert_eq!(&payload[..], b"MSH|^~\\&|TEST\r");
    }

    /// A frame without its trailer yields nothing yet.
    #[test]
    fn test_decode_incomplete() {
        let (frame, _rest) = decode_once(b"\x0BMSH|^~\\&|TEST\r");
        assert!(frame.is_none());
    }

    /// Bytes in front of the start byte are skipped.
    #[test]
    fn test_decode_with_junk_before() {
        let (frame, _rest) = decode_once(b"JUNK\x0BMSH|^~\\&|TEST\r\x1C\x0D");
        assert!(frame.is_some());
        let payload = frame.unwrap();
        assert_eq!(&payload[..], b"MSH|^~\\&|TEST\r");
    }

    /// Input without any start byte is dropped entirely.
    #[test]
    fn test_decode_no_start_byte() {
        let (frame, rest) = decode_once(b"MSH|^~\\&|TEST\r\x1C\x0D");
        assert!(frame.is_none());
        assert_eq!(rest.len(), 0);
    }

    /// Encoding a payload longer than the configured limit fails.
    #[test]
    fn test_max_frame_size() {
        let oversized = BytesMut::from(&b"12345678901"[..]);
        let mut wire = BytesMut::new();
        let mut codec = MllpCodec::with_max_frame_size(10);
        let outcome = codec.encode(oversized, &mut wire);
        assert!(outcome.is_err());
    }

    /// Back-to-back frames come out one per decode call, then nothing.
    #[test]
    fn test_decode_multiple_frames() {
        let mut codec = MllpCodec::new();
        let mut wire = BytesMut::from(&b"\x0BMSG1\r\x1C\x0D\x0BMSG2\r\x1C\x0D"[..]);

        let first = codec.decode(&mut wire).unwrap();
        assert!(first.is_some());
        assert_eq!(&first.unwrap()[..], b"MSG1\r");

        let second = codec.decode(&mut wire).unwrap();
        assert!(second.is_some());
        assert_eq!(&second.unwrap()[..], b"MSG2\r");

        let third = codec.decode(&mut wire).unwrap();
        assert!(third.is_none());
    }
}