use bytes::buf::BufMut;
use bytes::BytesMut;
use log::{debug, error, info, trace, warn};
use tokio::codec::*;
pub struct MllpCodec {}
impl MllpCodec {
const BLOCK_HEADER: u8 = 0x0B; const BLOCK_FOOTER: [u8; 2] = [0x1C, 0x0D];
pub fn new() -> Self {
MllpCodec {}
}
fn get_footer_position(src: &BytesMut) -> Option<usize> {
for i in 0..src.len() - 1 {
if src[i] == MllpCodec::BLOCK_FOOTER[0] && src[i + 1] == MllpCodec::BLOCK_FOOTER[1] {
trace!("MLLP: Found footer at index {}", i);
return Some(i);
}
}
trace!("MLLP: Unable to find footer...");
None
}
}
impl Encoder for MllpCodec {
type Item = BytesMut; type Error = std::io::Error;
fn encode(&mut self, event: Self::Item, buf: &mut BytesMut) -> Result<(), Self::Error> {
buf.reserve(event.len() + 3); buf.put_u8(MllpCodec::BLOCK_HEADER);
buf.extend_from_slice(&event);
buf.extend_from_slice(&MllpCodec::BLOCK_FOOTER);
debug!("MLLP: Encoded value for send: '{:?}'", buf);
Ok(())
}
}
impl Decoder for MllpCodec {
type Item = BytesMut; type Error = std::io::Error;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
if let Some(start_offset) = src.iter().position(|b| *b == MllpCodec::BLOCK_HEADER) {
trace!("MLLP: Found message header at index {}", start_offset);
if let Some(end_offset) = MllpCodec::get_footer_position(src) {
let result = src.split_to(end_offset + 2); let result = &result[start_offset + 1..&result.len() - 2]; let return_buf = BytesMut::from(result);
debug!("MLLP: Received message: {:?}", return_buf);
return Ok(Some(return_buf));
}
}
warn!("MLLP: Unable to find a message, but we're meant to b a synchronous protocol?");
Ok(None) }
}
#[cfg(test)]
mod tests {
use super::*;
use bytes::Bytes;
fn wrap_for_mllp(s: &str) -> Bytes {
Bytes::from(format!("\x0B{}\x1C\x0D", s))
}
fn wrap_for_mllp_mut(s: &str) -> BytesMut {
BytesMut::from(format!("\x0B{}\x1C\x0D", s))
}
#[test]
fn can_construct_without_error() {
let _m = MllpCodec::new();
}
#[test]
fn wraps_simple_data() {
let data = BytesMut::from("abcd");
let mut m = MllpCodec::new();
let mut output_buf = BytesMut::with_capacity(64);
match m.encode(data, &mut output_buf) {
Ok(()) => {}
_ => assert!(false, "Non OK value returned from encode"),
}
let encoded_msg = output_buf.freeze();
println!("Encoded: {:?}", encoded_msg);
assert_eq!(encoded_msg, wrap_for_mllp("abcd"));
}
#[test]
fn find_footer_location() {
let data = wrap_for_mllp_mut("abcd"); let result = MllpCodec::get_footer_position(&data);
assert_eq!(result, Some(5));
}
#[test]
fn ensure_decoder_finds_simple_message() {
let mut data = wrap_for_mllp_mut("abcd");
let mut m = MllpCodec::new();
let result = m.decode(&mut data);
println!("simple message result: {:?}", result);
match result {
Ok(None) => assert!(false, "Failed to find a simple message!"),
Ok(Some(message)) => {
assert_eq!(&message[..], b"abcd");
}
Err(err) => assert!(false, "Error looking for simple message: {:?}", err),
}
}
#[test]
fn ensure_data_after_end_is_ignored() {
let mut data = BytesMut::from("\x0BTest Data\x1C\x0DMore Data");
let mut m = MllpCodec::new();
let result = m.decode(&mut data);
match result {
Ok(Some(message)) => {
assert_eq!(&message[..], b"Test Data");
}
_ => assert!(false, "Failure for message with illegal trailing data"),
}
}
#[test]
fn ensure_no_data_is_left_on_the_stream() {
let mut data = BytesMut::from("\x0BTest Data\x1C\x0D");
let mut m = MllpCodec::new();
let _result = m.decode(&mut data);
assert_eq!(
data.len(),
0,
"Decoder left data sitting in the buffer after read!"
);
}
#[test]
fn ensure_buffer_is_reset_per_message() {
let mut mllp = MllpCodec::new();
let mut data1 = wrap_for_mllp_mut("Test Data");
let mut data2 = wrap_for_mllp_mut("This is different");
let result = mllp.decode(&mut data1);
match result {
Ok(Some(message)) => {
assert_eq!(&message[..], b"Test Data");
}
_ => assert!(false),
}
let result = mllp.decode(&mut data2);
match result {
Ok(Some(message)) => {
assert_eq!(&message[..], b"This is different");
}
_ => assert!(false, "Error decoding second message"),
}
}
#[test]
fn test_real_message() {
let mut mllp = MllpCodec::new();
let mut data = wrap_for_mllp_mut("MSH|^~\\&|ZIS|1^AHospital|||200405141144||¶ADT^A01|20041104082400|P|2.3|||AL|NE|||8859/15|¶EVN|A01|20041104082400.0000+0100|20041104082400¶PID||\"\"|10||Vries^Danny^D.^^de||19951202|M|||Rembrandlaan^7^Leiden^^7301TH^\"\"^^P||\"\"|\"\"||\"\"|||||||\"\"|\"\"¶PV1||I|3w^301^\"\"^01|S|||100^van den Berg^^A.S.^^\"\"^dr|\"\"||9||||H||||20041104082400.0000+0100");
let result = mllp.decode(&mut data);
match result {
Ok(Some(message)) => {
assert_eq!(message.len(), 338);
}
_ => assert!(false),
}
}
}