use bytes::buf::{Buf, BufMut};
use bytes::BytesMut;
use log::{debug, trace};
use tokio_util::codec::*;
/// Codec that frames and un-frames byte payloads using MLLP block markers.
///
/// The type carries no state; the framing logic lives in its `Encoder` and
/// `Decoder` implementations. `Debug` and `Clone` are derived so the codec can
/// be logged and embedded in types that derive those traits themselves.
#[derive(Debug, Clone, Default)]
pub struct MllpCodec {}
impl MllpCodec {
    /// Single byte that opens a block.
    const BLOCK_HEADER: u8 = 0x0B;
    /// Two-byte sequence that closes a block.
    const BLOCK_FOOTER: [u8; 2] = [0x1C, 0x0D];

    /// Creates a new codec instance.
    pub fn new() -> Self {
        Self {}
    }

    /// Returns the index of the first byte of the *first* footer sequence in
    /// `src`, or `None` when the buffer holds no complete footer.
    ///
    /// Compiled only when the `noncompliance` feature is enabled.
    #[cfg(feature = "noncompliance")]
    fn get_footer_position(src: &BytesMut) -> Option<usize> {
        // Slide a footer-sized window over the buffer from the front.
        let found = src
            .windows(MllpCodec::BLOCK_FOOTER.len())
            .position(|pair| pair == MllpCodec::BLOCK_FOOTER);

        match found {
            Some(index) => trace!("MLLP: Found footer at index {}", index),
            None => trace!("MLLP: Unable to find footer..."),
        }
        found
    }

    /// Returns the index of the first byte of the *last* footer sequence in
    /// `src`, or `None` when the buffer holds no complete footer.
    ///
    /// Compiled only when the `noncompliance` feature is disabled.
    #[cfg(not(feature = "noncompliance"))]
    fn get_footer_position(src: &BytesMut) -> Option<usize> {
        // Slide a footer-sized window over the buffer, searching from the back.
        let found = src
            .windows(MllpCodec::BLOCK_FOOTER.len())
            .rposition(|pair| pair == MllpCodec::BLOCK_FOOTER);

        match found {
            Some(index) => trace!("MLLP: Found footer at index {}", index),
            None => trace!("MLLP: Unable to find footer..."),
        }
        found
    }
}
impl Encoder<BytesMut> for MllpCodec {
    type Error = std::io::Error;

    /// Appends `event` to `dst`, wrapped in the block header and footer.
    ///
    /// Space for the payload plus the three framing bytes is reserved before
    /// writing. This implementation always returns `Ok(())`.
    fn encode(&mut self, event: BytesMut, dst: &mut BytesMut) -> Result<(), Self::Error> {
        // One header byte plus the footer bytes surround every payload.
        let framing_len = 1 + MllpCodec::BLOCK_FOOTER.len();
        dst.reserve(event.len() + framing_len);

        dst.put_u8(MllpCodec::BLOCK_HEADER);
        dst.extend_from_slice(&event);
        dst.extend_from_slice(&MllpCodec::BLOCK_FOOTER);

        debug!("MLLP: Encoded value for send: '{:?}'", dst);
        Ok(())
    }
}
impl Decoder for MllpCodec {
    type Item = BytesMut;
    type Error = std::io::Error;

    /// Extracts one framed payload from `src`, if a complete block is buffered.
    ///
    /// Returns `Ok(Some(payload))` with the bytes between the first header byte
    /// and the footer located by `get_footer_position`; the header, the footer
    /// and anything before the header are consumed from `src`. Bytes after the
    /// footer stay in `src`. Returns `Ok(None)` when no header or no footer is
    /// buffered yet. This implementation never returns `Err`.
    ///
    /// Bytes that precede the header are discarded as soon as a header is seen,
    /// even if the footer has not arrived yet.
    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
        let start_offset = match src.iter().position(|b| *b == MllpCodec::BLOCK_HEADER) {
            Some(offset) => offset,
            None => return Ok(None),
        };

        // Drop everything before the header *before* looking for the footer.
        // Otherwise a footer sequence located ahead of the header would yield
        // `end_offset < start_offset` and the later `advance` would panic on
        // data that comes straight off the wire.
        src.advance(start_offset);

        let end_offset = match MllpCodec::get_footer_position(src) {
            Some(offset) => offset,
            None => return Ok(None),
        };

        // `src[0]` is now the header byte, which differs from the first footer
        // byte, so `end_offset >= 1` and the one-byte `advance` below is in bounds.
        let mut frame = src.split_to(end_offset + MllpCodec::BLOCK_FOOTER.len());
        frame.truncate(end_offset); // strip the footer
        frame.advance(1); // strip the header
        Ok(Some(frame))
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;

    /// HL7 sample payload shared by the tests that decode a realistic message.
    const REAL_MESSAGE: &str = "MSH|^~\\&|ZIS|1^AHospital|||200405141144||¶ADT^A01|20041104082400|P|2.3|||AL|NE|||8859/15|¶EVN|A01|20041104082400.0000+0100|20041104082400¶PID||\"\"|10||Vries^Danny^D.^^de||19951202|M|||Rembrandlaan^7^Leiden^^7301TH^\"\"^^P||\"\"|\"\"||\"\"|||||||\"\"|\"\"¶PV1||I|3w^301^\"\"^01|S|||100^van den Berg^^A.S.^^\"\"^dr|\"\"||9||||H||||20041104082400.0000+0100";

    /// Wraps `s` in the block header/footer and returns it as immutable bytes.
    fn wrap_for_mllp(s: &str) -> Bytes {
        Bytes::from(format!("\x0B{}\x1C\x0D", s))
    }

    /// Wraps `s` in the block header/footer and returns it as a mutable buffer.
    fn wrap_for_mllp_mut(s: &str) -> BytesMut {
        BytesMut::from(format!("\x0B{}\x1C\x0D", s).as_str())
    }

    #[test]
    fn can_construct_without_error() {
        let _m = MllpCodec::new();
    }

    #[test]
    fn implements_default() {
        let _m = MllpCodec::default();
    }

    #[test]
    fn wraps_simple_data() {
        let data = BytesMut::from("abcd");
        let mut m = MllpCodec::new();
        let mut output_buf = BytesMut::with_capacity(64);
        m.encode(data, &mut output_buf)
            .expect("Non OK value returned from encode");
        let encoded_msg = output_buf.freeze();
        println!("Encoded: {:?}", encoded_msg);
        assert_eq!(encoded_msg, wrap_for_mllp("abcd"));
    }

    #[test]
    fn find_footer_location() {
        let data = wrap_for_mllp_mut("abcd");
        let result = MllpCodec::get_footer_position(&data);
        assert_eq!(result, Some(5));
    }

    #[test]
    fn missing_footer_detected() {
        let data = BytesMut::from("no footer");
        let result = MllpCodec::get_footer_position(&data);
        assert_eq!(result, None);
    }

    #[test]
    fn ensure_decoder_finds_simple_message() {
        let mut data = wrap_for_mllp_mut("abcd");
        let mut m = MllpCodec::new();
        let result = m.decode(&mut data);
        println!("simple message result: {:?}", result);
        match result {
            Ok(None) => panic!("Failed to find a simple message!"),
            Ok(Some(message)) => {
                assert_eq!(&message[..], b"abcd");
            }
            Err(err) => panic!("Error looking for simple message: {:?}", err),
        }
    }

    #[test]
    fn ensure_data_after_end_is_ignored() {
        let mut data = BytesMut::from("\x0BTest Data\x1C\x0DMore Data");
        let mut m = MllpCodec::new();
        let result = m.decode(&mut data);
        match result {
            Ok(Some(message)) => {
                assert_eq!(&message[..], b"Test Data");
            }
            _ => panic!("Failure for message with illegal trailing data"),
        }
        // The trailing bytes are not part of the message and stay buffered.
        assert_eq!(&data[..], b"More Data");
    }

    #[test]
    fn ensure_no_data_is_left_on_the_stream() {
        let mut data = BytesMut::from("\x0BTest Data\x1C\x0D");
        let mut m = MllpCodec::new();
        let _result = m.decode(&mut data);
        assert_eq!(
            data.len(),
            0,
            "Decoder left data sitting in the buffer after read!"
        );
    }

    #[test]
    fn ensure_buffer_is_reset_per_message() {
        let mut mllp = MllpCodec::new();
        let mut data1 = wrap_for_mllp_mut("Test Data");
        let mut data2 = wrap_for_mllp_mut("This is different");

        match mllp.decode(&mut data1) {
            Ok(Some(message)) => {
                assert_eq!(&message[..], b"Test Data");
            }
            other => panic!("Error decoding first message: {:?}", other),
        }

        match mllp.decode(&mut data2) {
            Ok(Some(message)) => {
                assert_eq!(&message[..], b"This is different");
            }
            other => panic!("Error decoding second message: {:?}", other),
        }
    }

    #[test]
    fn test_real_message() {
        let mut mllp = MllpCodec::new();
        let mut data = wrap_for_mllp_mut(REAL_MESSAGE);
        match mllp.decode(&mut data) {
            Ok(Some(message)) => {
                assert_eq!(message.len(), 338);
            }
            other => panic!("Error decoding real message: {:?}", other),
        }
    }

    #[cfg(feature = "noncompliance")]
    mod noncompliance_tests {
        use super::*;

        #[test]
        fn test_parsing_multiple_messages() {
            let mut mllp = MllpCodec::new();
            let mut data = wrap_for_mllp_mut(REAL_MESSAGE);

            // Append two more copies of the framed message: three in total.
            let bytes = data.to_vec();
            data.extend_from_slice(&bytes[..]);
            data.extend_from_slice(&bytes[..]);

            match mllp.decode(&mut data) {
                Ok(Some(message)) => {
                    assert_eq!(message.len(), 338);
                    // Two framed messages (payload + 3 framing bytes each) remain.
                    assert_eq!(data.len(), (message.len() * 2) + 6);
                }
                other => panic!("Error decoding first of three messages: {:?}", other),
            }

            match mllp.decode(&mut data) {
                Ok(Some(message)) => {
                    assert_eq!(message.len(), 338);
                    // One framed message remains.
                    assert_eq!(data.len(), message.len() + 3);
                }
                other => panic!("Error decoding second of three messages: {:?}", other),
            }
        }
    }
}