#![forbid(unsafe_code)]
#![warn(missing_docs)]
use bytes::{BufMut, Bytes, BytesMut};
/// Vertical Tab (`0x0B`): the start byte written at the front of every MLLP frame.
pub const VT: u8 = 0x0B;
/// File Separator (`0x1C`): the first byte of the two-byte end-of-frame sequence.
pub const FS: u8 = 0x1C;
/// Carriage Return (`0x0D`): the second byte of the end-of-frame sequence, directly after [`FS`].
pub const CR: u8 = 0x0D;
/// Errors describing why a byte buffer is not a usable MLLP frame.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum MllpError {
/// The buffer does not begin with the `0x0B` (VT) start byte.
MissingStartByte,
/// The buffer does not finish with the `0x1C 0x0D` (FS + CR) end sequence.
MissingEndSequence,
/// Both delimiters are present but no payload bytes sit between them.
EmptyPayload,
/// The buffer is shorter than the four-byte minimum (VT + payload + FS + CR).
Incomplete,
/// The frame is malformed for the reason carried in `reason`.
InvalidFrame {
/// Free-form explanation; it is appended to the `Display` output.
reason: String,
},
}
/// Human-readable diagnostics for each [`MllpError`] variant.
impl std::fmt::Display for MllpError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Every variant except `InvalidFrame` maps to a fixed message, so pick
        // the text first and emit it with a single `write_str`.
        let message = match self {
            Self::InvalidFrame { reason } => {
                return write!(f, "Invalid MLLP frame: {reason}");
            }
            Self::MissingStartByte => {
                "MLLP frame missing VT start byte (expected 0x0B at position 0)"
            }
            Self::MissingEndSequence => {
                "MLLP frame missing FS+CR end sequence (expected 0x1C 0x0D)"
            }
            Self::EmptyPayload => "MLLP frame contains no HL7 payload between delimiters",
            Self::Incomplete => {
                "Buffer too short for complete MLLP frame (need at least 4 bytes: VT + payload + FS + CR)"
            }
        };
        f.write_str(message)
    }
}
/// Marks [`MllpError`] as a standard error type; no trait methods are overridden.
impl std::error::Error for MllpError {}
/// Allows an [`MllpError`] to be converted into a `std::io::Error`, e.g. via `?`.
impl From<MllpError> for std::io::Error {
    /// Wraps the framing error as the payload of an `InvalidData` I/O error.
    fn from(err: MllpError) -> Self {
        let kind = std::io::ErrorKind::InvalidData;
        Self::new(kind, err)
    }
}
/// Stateless namespace for MLLP helpers: frame encoding and decoding, frame
/// boundary scanning, and ACK/NACK message construction.
pub struct MllpFrame;
impl MllpFrame {
/// Wraps `payload` in an MLLP envelope: `VT`, the payload bytes, then `FS` `CR`.
///
/// The payload is copied verbatim and is not inspected, so the result is always
/// `payload.len() + 3` bytes long.
pub fn encode(payload: &[u8]) -> Bytes {
    // One start byte plus the two-byte trailer.
    const ENVELOPE_LEN: usize = 3;
    let mut framed = BytesMut::with_capacity(payload.len() + ENVELOPE_LEN);
    framed.put_u8(VT);
    framed.put_slice(payload);
    framed.put_slice(&[FS, CR]);
    framed.freeze()
}
/// Validates a single, complete MLLP frame and returns the payload it wraps.
///
/// `buf` must hold exactly one frame: the start byte is expected at index 0 and
/// the end sequence in the last two bytes. The returned slice borrows from `buf`.
///
/// # Errors
///
/// * [`MllpError::Incomplete`] if `buf` is shorter than four bytes.
/// * [`MllpError::MissingStartByte`] if the first byte is not `VT`.
/// * [`MllpError::MissingEndSequence`] if the last two bytes are not `FS` `CR`.
/// * [`MllpError::EmptyPayload`] if nothing lies between the delimiters. The
///   length check above already rules this out, so the guard is purely defensive.
pub fn decode(buf: &[u8]) -> Result<&[u8], MllpError> {
    // VT + at least one payload byte + FS + CR.
    const MIN_FRAME_LEN: usize = 4;
    if buf.len() < MIN_FRAME_LEN {
        return Err(MllpError::Incomplete);
    }

    let after_start = buf
        .strip_prefix(&[VT])
        .ok_or(MllpError::MissingStartByte)?;
    let payload = after_start
        .strip_suffix(&[FS, CR])
        .ok_or(MllpError::MissingEndSequence)?;

    if payload.is_empty() {
        Err(MllpError::EmptyPayload)
    } else {
        Ok(payload)
    }
}
/// Locates the end of the frame that starts at the beginning of `buf`.
///
/// Returns the index one past the first `FS` `CR` pair that follows the leading
/// `VT`, i.e. the length of the first frame including its delimiters. Returns
/// `None` when `buf` is empty, does not start with `VT`, or holds no terminator
/// yet. A frame with an empty payload (`VT FS CR`) is still reported.
pub fn find_frame_end(buf: &[u8]) -> Option<usize> {
    let (first, rest) = buf.split_first()?;
    if *first != VT {
        return None;
    }
    // `offset` indexes into `rest`, so the FS sits at `offset + 1` in `buf`
    // and the frame ends two bytes later.
    rest.windows(2)
        .position(|pair| pair == [FS, CR])
        .map(|offset| offset + 3)
}
/// Scans `buf` from the start and returns the byte range of every complete frame.
///
/// Each entry is a half-open `(start, end)` pair such that `&buf[start..end]`
/// is one frame including its `VT` and terminator. Scanning stops at the first
/// position that does not yield a frame of at least four bytes: a missing
/// terminator, a frame with an empty payload, or fewer than four bytes left.
///
/// Without the `noncompliance` feature, scanning also stops at the first byte
/// that is not `VT` where a frame should start. With the feature enabled:
///
/// * bytes before the next `VT` are skipped, and
/// * when no `FS` `CR` pair is found, a buffer whose final byte is `FS` is
///   accepted as a frame running to the end of the buffer.
pub fn find_all_frames(buf: &[u8]) -> Vec<(usize, usize)> {
    // Smallest frame that is reported: VT + one payload byte + FS + CR.
    const MIN_FRAME_LEN: usize = 4;

    let mut frames = Vec::new();
    let mut start = 0;
    while start < buf.len() {
        if buf[start] != VT {
            #[cfg(feature = "noncompliance")]
            {
                // Tolerate junk between frames by jumping to the next VT.
                match buf[start..].iter().position(|&b| b == VT) {
                    Some(skip) => start += skip,
                    None => break,
                }
            }
            #[cfg(not(feature = "noncompliance"))]
            break;
        }
        if buf.len() - start < MIN_FRAME_LEN {
            break;
        }

        // First FS+CR pair after the start byte. `offset` is relative to
        // `start + 1`, so the frame ends at `start + 1 + offset + 2`.
        let frame_end = buf[start + 1..]
            .windows(2)
            .position(|pair| pair == [FS, CR])
            .map(|offset| start + offset + 3);

        // At least MIN_FRAME_LEN bytes remain here, so the only extra
        // condition for the lenient fallback is a trailing FS.
        #[cfg(feature = "noncompliance")]
        let frame_end = frame_end.or_else(|| {
            if buf.last() == Some(&FS) {
                Some(buf.len())
            } else {
                None
            }
        });

        match frame_end {
            Some(end) if end - start >= MIN_FRAME_LEN => {
                frames.push((start, end));
                start = end;
            }
            // No terminator yet, or an empty-payload frame: stop scanning.
            _ => break,
        }
    }
    frames
}
pub fn build_ack(message_control_id: &str, accepting: bool) -> Option<String> {
if message_control_id.is_empty() {
return None;
}
let code = if accepting { "AA" } else { "AE" };
let timestamp = chrono_now_str();
let ack_control_id = format!("ACK{}{}", ×tamp, message_control_id);
Some(format!(
"MSH|^~\\&|||||{}||ACK|{}|P|2.3.1\rMSA|{}|{}",
timestamp, ack_control_id, code, message_control_id,
))
}
pub fn build_nack(
message_control_id: &str,
error_code: &str,
error_text: &str,
) -> Option<String> {
if message_control_id.is_empty() {
return None;
}
let timestamp = chrono_now_str();
let nack_control_id = format!("NACK{}{}", ×tamp, message_control_id);
let escaped_text = error_text.replace('|', "\\F\\");
Some(format!(
"MSH|^~\\&|||||{}||ACK|{}|P|2.3.1\rMSA|AR|{}|{}: {} - {}",
timestamp, nack_control_id, message_control_id, error_code, error_code, escaped_text,
))
}
}
/// Produces the `YYYYMMDDHHMMSS` timestamp placed in generated MSH segments.
///
/// With the `timestamps` feature the current local time is formatted through
/// `chrono`; without it a fixed placeholder is returned.
fn chrono_now_str() -> String {
    #[cfg(feature = "timestamps")]
    let stamp = chrono::Local::now().format("%Y%m%d%H%M%S").to_string();
    #[cfg(not(feature = "timestamps"))]
    let stamp = String::from("20250101000000");
    stamp
}
/// Accumulates bytes pushed from a stream and hands back complete MLLP frames
/// one at a time.
#[derive(Debug, Clone)]
pub struct MllpFramer {
// Bytes received so far that have not yet been returned as part of a frame.
buffer: BytesMut,
}
impl MllpFramer {
    /// Creates a framer whose internal buffer starts out empty.
    pub fn new() -> Self {
        let buffer = BytesMut::new();
        Self { buffer }
    }

    /// Creates an empty framer with room for `capacity` bytes reserved up front.
    pub fn with_capacity(capacity: usize) -> Self {
        let buffer = BytesMut::with_capacity(capacity);
        Self { buffer }
    }

    /// Appends `bytes` to the end of the pending data.
    pub fn push(&mut self, bytes: &[u8]) {
        self.buffer.extend_from_slice(bytes);
    }

    /// Removes and returns the first complete frame, delimiters included.
    ///
    /// Returns `None`, leaving the buffer untouched, when
    /// [`MllpFrame::find_frame_end`] finds no complete frame at the start of
    /// the buffer. That includes the case where the pending data does not
    /// begin with the start byte.
    pub fn next_frame(&mut self) -> Option<Vec<u8>> {
        MllpFrame::find_frame_end(&self.buffer).map(|end| {
            let frame = self.buffer.split_to(end);
            frame.to_vec()
        })
    }

    /// Reports whether no bytes are currently buffered.
    pub fn is_empty(&self) -> bool {
        self.buffer.is_empty()
    }

    /// Number of bytes currently buffered.
    pub fn len(&self) -> usize {
        self.buffer.len()
    }

    /// Discards all buffered bytes, including any partial frame.
    pub fn clear(&mut self) {
        self.buffer.clear();
    }
}
impl Default for MllpFramer {
fn default() -> Self {
Self::new()
}
}
/// Abstraction over a channel that exchanges MLLP frames as byte buffers.
///
/// NOTE(review): the trait does not show whether frames passed through it
/// include the VT/FS/CR delimiters — confirm against implementors.
pub trait MllpTransport {
/// Error type returned by this transport's read and write operations.
type Error: std::error::Error;
/// Reads the next frame from the transport and returns its bytes.
fn read_frame(&mut self) -> Result<Vec<u8>, Self::Error>;
/// Writes the bytes of `frame` to the transport.
fn write_frame(&mut self, frame: &[u8]) -> Result<(), Self::Error>;
}
// Unit tests for frame encoding/decoding, frame scanning, ACK/NACK
// construction and the streaming `MllpFramer`.
#[cfg(test)]
mod tests {
    use super::*;

    // ---- encode / decode ---------------------------------------------------

    #[test]
    fn roundtrip() {
        let payload =
            b"MSH|^~\\&|SendApp|SendFac|RecApp|RecFac|20240101120000||ORU^R01|12345|P|2.3.1";
        let framed = MllpFrame::encode(payload);
        let decoded = MllpFrame::decode(&framed).unwrap();
        assert_eq!(decoded, payload);
    }

    #[test]
    fn missing_start_byte() {
        let bad = b"no_vt_here\x1C\x0D";
        assert_eq!(MllpFrame::decode(bad), Err(MllpError::MissingStartByte));
    }

    #[test]
    fn missing_end_sequence() {
        let bad = b"\x0Bpayload_no_end";
        assert_eq!(MllpFrame::decode(bad), Err(MllpError::MissingEndSequence));
    }

    // ---- find_frame_end ----------------------------------------------------

    #[test]
    fn find_frame_end_complete() {
        let payload = b"MSH|test";
        let framed = MllpFrame::encode(payload);
        assert_eq!(MllpFrame::find_frame_end(&framed), Some(framed.len()));
    }

    #[test]
    fn find_frame_end_incomplete() {
        let partial = b"\x0Bincomplete_data";
        assert_eq!(MllpFrame::find_frame_end(partial), None);
    }

    // ---- find_all_frames ---------------------------------------------------

    #[test]
    fn find_all_frames_two_back_to_back() {
        let payload1 = b"MSH|first";
        let payload2 = b"MSH|second";
        let frame1 = MllpFrame::encode(payload1);
        let frame2 = MllpFrame::encode(payload2);
        let combined = [&frame1[..], &frame2[..]].concat();
        let frames = MllpFrame::find_all_frames(&combined);
        assert_eq!(frames.len(), 2);
        assert_eq!(frames[0], (0, frame1.len()));
        assert_eq!(frames[1], (frame1.len(), frame1.len() + frame2.len()));
        let decoded1 = MllpFrame::decode(&combined[frames[0].0..frames[0].1]).unwrap();
        let decoded2 = MllpFrame::decode(&combined[frames[1].0..frames[1].1]).unwrap();
        assert_eq!(decoded1, payload1);
        assert_eq!(decoded2, payload2);
    }

    #[test]
    fn find_all_frames_with_partial_third() {
        let payload1 = b"MSH|first";
        let payload2 = b"MSH|second";
        let payload3 = b"MSH|partial_no_end";
        let frame1 = MllpFrame::encode(payload1);
        let frame2 = MllpFrame::encode(payload2);
        let partial3 = [&[VT][..], payload3].concat();
        let combined = [&frame1[..], &frame2[..], &partial3[..]].concat();
        let frames = MllpFrame::find_all_frames(&combined);
        // The unterminated third frame must not be reported.
        assert_eq!(frames.len(), 2);
        assert_eq!(frames[0], (0, frame1.len()));
        assert_eq!(frames[1], (frame1.len(), frame1.len() + frame2.len()));
    }

    #[test]
    fn find_all_frames_empty_buffer() {
        assert!(MllpFrame::find_all_frames(b"").is_empty());
    }

    #[test]
    fn find_all_frames_no_frames() {
        assert!(MllpFrame::find_all_frames(b"garbage_data_no_vt").is_empty());
    }

    #[test]
    fn find_all_frames_empty_payload_rejected() {
        let empty_frame = [VT, FS, CR];
        let frames = MllpFrame::find_all_frames(&empty_frame);
        assert!(frames.is_empty(), "Empty payload frame should be rejected");
    }

    // Strict mode only exists when the `noncompliance` feature is off, so this
    // test lives here rather than in `noncompliance_tests`.
    #[cfg(not(feature = "noncompliance"))]
    #[test]
    fn strict_mode_rejects_missing_cr() {
        let payload = b"MSH|test";
        let incomplete = [&[VT][..], payload, &[FS]].concat();
        let frames = MllpFrame::find_all_frames(&incomplete);
        assert!(
            frames.is_empty(),
            "Frame without the trailing CR must not be accepted in strict mode"
        );
    }

    // ---- protocol constants ------------------------------------------------

    #[test]
    fn verify_mllp_byte_constants() {
        assert_eq!(VT, 0x0B, "VT must be 0x0B per HL7 v2.5.1 Appendix C");
        assert_eq!(FS, 0x1C, "FS must be 0x1C per HL7 v2.5.1 Appendix C");
        assert_eq!(CR, 0x0D, "CR must be 0x0D per HL7 v2.5.1 Appendix C");
    }

    #[test]
    fn verify_single_byte_start_block() {
        let frame = MllpFrame::encode(b"test");
        assert_eq!(frame[0], VT);
        // 1 start byte + 4 payload bytes + 2 terminator bytes.
        assert_eq!(frame.len(), 7);
    }

    // ---- lenient parsing (feature-gated) -----------------------------------

    #[cfg(feature = "noncompliance")]
    mod noncompliance_tests {
        use super::*;

        #[test]
        fn tolerate_missing_final_cr() {
            let payload = b"MSH|test";
            let incomplete = [&[VT][..], payload, &[FS]].concat();
            let frames = MllpFrame::find_all_frames(&incomplete);
            assert_eq!(frames.len(), 1);
            assert_eq!(frames[0], (0, incomplete.len()));
        }

        #[test]
        fn tolerate_extra_bytes_before_vt() {
            let payload = b"MSH|test";
            let frame = MllpFrame::encode(payload);
            let garbage_before = [&b"garbage"[..], &frame[..]].concat();
            let frames = MllpFrame::find_all_frames(&garbage_before);
            assert_eq!(frames.len(), 1);
            assert_eq!(frames[0].0, 7);
        }

        #[test]
        fn noncompliance_empty_payload_rejected() {
            let empty_frame = [VT, FS];
            let frames = MllpFrame::find_all_frames(&empty_frame);
            assert!(
                frames.is_empty(),
                "Empty payload should be rejected even with noncompliance"
            );
        }
    }

    // ---- ACK / NACK construction --------------------------------------------

    #[test]
    fn build_ack_validates_empty_control_id() {
        assert!(MllpFrame::build_ack("", true).is_none());
        assert!(MllpFrame::build_ack("", false).is_none());
    }

    #[test]
    fn build_ack_creates_aa_for_accept() {
        let ack = MllpFrame::build_ack("MSG001", true).unwrap();
        assert!(ack.contains("MSA|AA|MSG001"));
    }

    #[test]
    fn build_ack_creates_ae_for_reject() {
        let ack = MllpFrame::build_ack("MSG001", false).unwrap();
        assert!(ack.contains("MSA|AE|MSG001"));
    }

    #[test]
    fn build_ack_has_unique_control_id() {
        let ack = MllpFrame::build_ack("MSG001", true).unwrap();
        assert!(ack.contains("||ACK|ACK"));
        assert!(ack.contains("MSG001|P|2.3.1"));
    }

    #[test]
    fn build_nack_validates_empty_control_id() {
        assert!(MllpFrame::build_nack("", "101", "Error").is_none());
    }

    #[test]
    fn build_nack_creates_ar_with_error_details() {
        let nack = MllpFrame::build_nack("MSG001", "101", "Invalid message").unwrap();
        assert!(nack.contains("MSA|AR|MSG001|101: 101 - Invalid message"));
    }

    #[test]
    fn build_nack_contains_ack_msh() {
        let nack = MllpFrame::build_nack("MSG001", "102", "Parse error").unwrap();
        assert!(nack.starts_with("MSH|^~\\&|||||"));
        assert!(nack.contains("||ACK|NACK"));
    }

    #[test]
    fn build_nack_escapes_pipe_in_error_text() {
        let nack = MllpFrame::build_nack("MSG001", "101", "Error|with|pipes").unwrap();
        assert!(nack.contains("Error\\F\\with\\F\\pipes"));
    }

    #[test]
    fn ack_roundtrip_parse() {
        use hl7_v2::Hl7Message;
        let ack_str = MllpFrame::build_ack("MSG12345", true).unwrap();
        let ack_bytes = ack_str.as_bytes();
        let parsed = Hl7Message::parse(ack_bytes);
        assert!(
            parsed.is_ok(),
            "ACK should be valid HL7 that hl7-v2 can parse"
        );
        let msg = parsed.unwrap();
        let msh = msg.segment("MSH");
        assert!(msh.is_some(), "ACK should have MSH segment");
        let msa = msg.segment("MSA");
        assert!(msa.is_some(), "ACK should have MSA segment");
        let msa_seg = msa.unwrap();
        let msa_2 = msa_seg.raw_fields().get(1);
        assert_eq!(msa_2, Some(&"MSG12345"));
    }

    #[test]
    fn nack_roundtrip_parse() {
        use hl7_v2::Hl7Message;
        let nack_str = MllpFrame::build_nack("MSG999", "102", "Processing failed").unwrap();
        let nack_bytes = nack_str.as_bytes();
        let parsed = Hl7Message::parse(nack_bytes);
        assert!(
            parsed.is_ok(),
            "NACK should be valid HL7 that hl7-v2 can parse"
        );
        let msg = parsed.unwrap();
        let msh = msg.segment("MSH");
        assert!(msh.is_some(), "NACK should have MSH segment");
        let msa = msg.segment("MSA");
        assert!(msa.is_some(), "NACK should have MSA segment");
        let msa_seg = msa.unwrap();
        let msa_1 = msa_seg.raw_fields().first();
        assert_eq!(msa_1, Some(&"AR"));
        let msa_2 = msa_seg.raw_fields().get(1);
        assert_eq!(msa_2, Some(&"MSG999"));
    }

    // ---- streaming framer ---------------------------------------------------

    #[test]
    fn framer_push_single_bytes_and_recover_frame() {
        let mut framer = MllpFramer::new();
        let frame = MllpFrame::encode(b"MSH|test");
        for byte in &frame {
            // No frame may be produced until the final byte has arrived.
            assert!(framer.next_frame().is_none());
            framer.push(&[*byte]);
        }
        let recovered = framer.next_frame().unwrap();
        assert_eq!(recovered, frame.to_vec());
        assert!(framer.is_empty());
    }

    #[test]
    fn framer_push_two_frames_at_once() {
        let mut framer = MllpFramer::new();
        let frame1 = MllpFrame::encode(b"MSH|first");
        let frame2 = MllpFrame::encode(b"MSH|second");
        let combined = [&frame1[..], &frame2[..]].concat();
        framer.push(&combined);
        let recovered1 = framer.next_frame().unwrap();
        assert_eq!(recovered1, frame1.to_vec());
        let recovered2 = framer.next_frame().unwrap();
        assert_eq!(recovered2, frame2.to_vec());
        assert!(framer.next_frame().is_none());
        assert!(framer.is_empty());
    }

    #[test]
    fn framer_is_empty_and_len() {
        let mut framer = MllpFramer::new();
        assert!(framer.is_empty());
        assert_eq!(framer.len(), 0);
        framer.push(b"\x0Btest");
        assert!(!framer.is_empty());
        assert_eq!(framer.len(), 5);
        framer.clear();
        assert!(framer.is_empty());
        assert_eq!(framer.len(), 0);
    }

    #[test]
    fn framer_with_capacity() {
        let framer = MllpFramer::with_capacity(1024);
        assert!(framer.is_empty());
    }

    #[test]
    fn framer_default() {
        let framer: MllpFramer = Default::default();
        assert!(framer.is_empty());
    }

    #[test]
    fn framer_partial_frame_no_complete() {
        let mut framer = MllpFramer::new();
        framer.push(b"\x0Bpartial_data");
        assert!(framer.next_frame().is_none());
        assert!(!framer.is_empty());
    }

    #[test]
    fn framer_preserves_remaining_bytes() {
        let mut framer = MllpFramer::new();
        let frame1 = MllpFrame::encode(b"MSH|first");
        let partial = b"\x0BMSH|partial_no_end";
        let combined = [&frame1[..], &partial[..]].concat();
        framer.push(&combined);
        let recovered = framer.next_frame().unwrap();
        assert_eq!(recovered, frame1.to_vec());
        assert!(!framer.is_empty());
        assert_eq!(framer.len(), partial.len());
        assert!(framer.next_frame().is_none());
    }

    // ---- edge cases -----------------------------------------------------------

    #[test]
    fn encode_decode_roundtrip_unicode() {
        let unicode_payload = "MSH|^~\\&|Test|Facility|20240101120000||ORU^R01|12345|P|2.5\rPID|1||P001||Doe^John^José||19800101|M".as_bytes();
        let framed = MllpFrame::encode(unicode_payload);
        let decoded = MllpFrame::decode(&framed).unwrap();
        assert_eq!(decoded, unicode_payload);
    }

    #[test]
    fn decode_minimum_length_valid_frame() {
        let min_frame = [VT, b'X', FS, CR];
        let decoded = MllpFrame::decode(&min_frame).unwrap();
        assert_eq!(decoded, b"X");
    }

    #[test]
    fn find_frame_end_exactly_one_frame() {
        let payload = b"MSH|exact";
        let frame = MllpFrame::encode(payload);
        let end = MllpFrame::find_frame_end(&frame);
        assert_eq!(end, Some(frame.len()));
    }

    #[test]
    fn find_frame_end_empty_buffer() {
        assert_eq!(MllpFrame::find_frame_end(b""), None);
    }

    #[test]
    fn find_frame_end_no_vt() {
        assert_eq!(MllpFrame::find_frame_end(b"no_vt_here"), None);
    }

    #[test]
    fn framer_push_pop_streaming() {
        let mut framer = MllpFramer::new();
        let frames = vec![
            MllpFrame::encode(b"MSH|msg1"),
            MllpFrame::encode(b"MSH|msg2"),
            MllpFrame::encode(b"MSH|msg3"),
        ];
        let combined: Vec<u8> = frames.iter().flat_map(|f| f.to_vec()).collect();
        framer.push(&combined);
        for (i, expected) in frames.iter().enumerate() {
            let actual = framer.next_frame().unwrap();
            assert_eq!(actual, expected.to_vec(), "Frame {} mismatch", i);
        }
        assert!(framer.next_frame().is_none());
    }
}