use thiserror::Error;
use tokio_util::bytes::BytesMut;
use tokio_util::codec::{Decoder, LinesCodec, LinesCodecError};
use crate::protocol::*;
#[derive(Error, Debug)]
pub enum RMonitorCodecError {
#[error("unable to decode record from line")]
RecordDecode(#[from] RecordError),
#[error(transparent)]
LinesCodec(#[from] LinesCodecError),
#[error(transparent)]
Io(#[from] std::io::Error),
}
#[derive(Default, Debug)]
pub struct RMonitorDecoder {
lines_codec: LinesCodec,
}
impl RMonitorDecoder {
pub fn new() -> Self {
Self {
lines_codec: LinesCodec::new(),
}
}
pub fn new_with_max_length(max_length: usize) -> Self {
Self {
lines_codec: LinesCodec::new_with_max_length(max_length),
}
}
}
impl Decoder for RMonitorDecoder {
type Item = Record;
type Error = RMonitorCodecError;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
let line = self.lines_codec.decode(src)?;
if let Some(line) = line {
if line.is_empty() || line.as_bytes()[0] != b'$' {
return Ok(None);
}
Ok(Some(Record::decode(&line)?))
} else {
Ok(None)
}
}
}
#[cfg(test)]
mod tests {
use super::*;
fn consume(
decoder: &mut RMonitorDecoder,
bytes: &mut BytesMut,
) -> Vec<Result<Option<Record>, RMonitorCodecError>> {
let mut result = vec![];
loop {
match decoder.decode(bytes) {
Ok(None) => {
break;
}
out => result.push(out),
}
}
result
}
#[test]
fn test_decodes_single_line() {
let mut decoder = RMonitorDecoder::new_with_max_length(2048);
let mut bytes =
BytesMut::from("$F,9999,\"00:00:00\",\"14:09:52\",\"00:59:59\",\" \"\r\n");
let result = consume(&mut decoder, &mut bytes);
assert_eq!(0, bytes.len());
assert_eq!(1, result.len());
assert!(matches!(result[0], Ok(Some(Record::Heartbeat(_)))));
}
#[test]
fn test_decodes_large_sample() {
let mut decoder = RMonitorDecoder::new_with_max_length(2048);
let data: Vec<u8> = std::fs::read("sample/2009_Sebring_ALMS_Session_5.txt").unwrap();
let mut bytes = BytesMut::from(data.as_slice());
let result = consume(&mut decoder, &mut bytes);
assert_eq!(0, bytes.len());
assert!(result.into_iter().all(|r| r.is_ok()));
}
}