use crate::bytes::BytesMut;
use crate::codec::{Decoder, Encoder};
use std::io;
#[derive(Debug)]
pub enum LinesCodecError {
MaxLineLengthExceeded,
InvalidUtf8,
Io(io::Error),
}
impl std::fmt::Display for LinesCodecError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::MaxLineLengthExceeded => write!(f, "line exceeds maximum length"),
Self::InvalidUtf8 => write!(f, "line is not valid UTF-8"),
Self::Io(err) => write!(f, "i/o error while decoding line: {err}"),
}
}
}
impl std::error::Error for LinesCodecError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
Self::Io(err) => Some(err),
Self::MaxLineLengthExceeded | Self::InvalidUtf8 => None,
}
}
}
impl LinesCodecError {
#[inline]
#[must_use]
pub fn io_kind(&self) -> Option<io::ErrorKind> {
match self {
Self::Io(err) => Some(err.kind()),
Self::MaxLineLengthExceeded | Self::InvalidUtf8 => None,
}
}
}
impl From<io::Error> for LinesCodecError {
#[inline]
fn from(err: io::Error) -> Self {
Self::Io(err)
}
}
#[derive(Debug, Clone)]
pub struct LinesCodec {
max_length: usize,
next_index: usize,
is_discarding: bool,
}
impl LinesCodec {
#[inline]
#[must_use]
pub fn new() -> Self {
Self::new_with_max_length(usize::MAX)
}
#[inline]
#[must_use]
pub fn new_with_max_length(max_length: usize) -> Self {
Self {
max_length,
next_index: 0,
is_discarding: false,
}
}
#[inline]
#[must_use]
pub fn max_length(&self) -> usize {
self.max_length
}
#[inline]
fn reset_stale_scan_state(&mut self, src: &BytesMut) {
if self.next_index > 0 && self.next_index >= src.len() {
self.next_index = 0;
self.is_discarding = false;
}
}
}
impl Default for LinesCodec {
fn default() -> Self {
Self::new()
}
}
impl Decoder for LinesCodec {
type Item = String;
type Error = LinesCodecError;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<String>, Self::Error> {
self.reset_stale_scan_state(src);
loop {
let read_to = if self.is_discarding {
src.len()
} else {
std::cmp::min(self.max_length.saturating_add(1), src.len())
};
let newline_offset = src[self.next_index..read_to]
.iter()
.position(|b| *b == b'\n');
match (self.is_discarding, newline_offset) {
(true, Some(offset)) => {
let newline_index = self.next_index + offset;
let _ = src.split_to(newline_index + 1);
self.next_index = 0;
self.is_discarding = false;
}
(true, None) => {
src.clear();
self.next_index = 0;
return Ok(None);
}
(false, Some(offset)) => {
let newline_index = self.next_index + offset;
self.next_index = 0;
let mut line = src.split_to(newline_index + 1);
line.truncate(line.len().saturating_sub(1));
if line.last() == Some(&b'\r') {
line.truncate(line.len().saturating_sub(1));
}
let s = String::from_utf8(line.to_vec())
.map_err(|_| LinesCodecError::InvalidUtf8)?;
return Ok(Some(s));
}
(false, None) => {
if src.len() > self.max_length {
self.is_discarding = true;
return Err(LinesCodecError::MaxLineLengthExceeded);
}
self.next_index = read_to;
return Ok(None);
}
}
}
}
fn decode_eof(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
match self.decode(src)? {
Some(frame) => Ok(Some(frame)),
None if src.is_empty() => Ok(None),
None if self.is_discarding => {
src.clear();
self.next_index = 0;
self.is_discarding = false;
Ok(None)
}
None => {
self.next_index = 0;
if src.len() > self.max_length {
src.clear();
return Err(LinesCodecError::MaxLineLengthExceeded);
}
let mut line = src.split_to(src.len());
if line.last() == Some(&b'\r') {
line.truncate(line.len().saturating_sub(1));
}
let s =
String::from_utf8(line.to_vec()).map_err(|_| LinesCodecError::InvalidUtf8)?;
Ok(Some(s))
}
}
}
}
impl Encoder<String> for LinesCodec {
type Error = io::Error;
fn encode(&mut self, line: String, dst: &mut BytesMut) -> Result<(), io::Error> {
dst.reserve(line.len() + 1);
dst.put_slice(line.as_bytes());
dst.put_u8(b'\n');
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_lines_codec_decode() {
let mut codec = LinesCodec::new();
let mut buf = BytesMut::from("hello\nworld\n");
assert_eq!(codec.decode(&mut buf).unwrap(), Some("hello".to_string()));
assert_eq!(codec.decode(&mut buf).unwrap(), Some("world".to_string()));
assert_eq!(codec.decode(&mut buf).unwrap(), None);
}
#[test]
fn test_lines_codec_crlf() {
let mut codec = LinesCodec::new();
let mut buf = BytesMut::from("hello\r\n");
assert_eq!(codec.decode(&mut buf).unwrap(), Some("hello".to_string()));
}
#[test]
fn test_lines_codec_max_length() {
let mut codec = LinesCodec::new_with_max_length(5);
let mut buf = BytesMut::from("toolong\n");
assert!(matches!(
codec.decode(&mut buf),
Err(LinesCodecError::MaxLineLengthExceeded)
));
}
#[test]
fn test_lines_codec_discards_oversized_and_recovers() {
let mut codec = LinesCodec::new_with_max_length(5);
let mut buf = BytesMut::from("toolong");
assert!(matches!(
codec.decode(&mut buf),
Err(LinesCodecError::MaxLineLengthExceeded)
));
buf.put_slice(b"\nok\n");
assert_eq!(codec.decode(&mut buf).unwrap(), Some("ok".to_string()));
assert_eq!(codec.decode(&mut buf).unwrap(), None);
}
#[test]
fn test_lines_codec_reused_shorter_buffer_after_partial_line() {
let mut codec = LinesCodec::new();
let mut buf = BytesMut::from("partial");
assert_eq!(codec.decode(&mut buf).unwrap(), None);
buf.clear();
buf.put_slice(b"ok\n");
assert_eq!(codec.decode(&mut buf).unwrap(), Some("ok".to_string()));
assert_eq!(codec.decode(&mut buf).unwrap(), None);
}
#[test]
fn test_lines_codec_reused_shorter_buffer_clears_discarding_state() {
let mut codec = LinesCodec::new_with_max_length(5);
let mut buf = BytesMut::from("abc");
assert_eq!(codec.decode(&mut buf).unwrap(), None);
buf.put_slice(b"def");
assert!(matches!(
codec.decode(&mut buf),
Err(LinesCodecError::MaxLineLengthExceeded)
));
buf.clear();
buf.put_slice(b"ok\n");
assert_eq!(codec.decode(&mut buf).unwrap(), Some("ok".to_string()));
assert_eq!(codec.decode(&mut buf).unwrap(), None);
}
#[test]
fn test_lines_codec_decode_eof_returns_trailing_line() {
let mut codec = LinesCodec::new();
let mut buf = BytesMut::from("tail-without-newline");
assert_eq!(
codec.decode_eof(&mut buf).unwrap(),
Some("tail-without-newline".to_string())
);
assert_eq!(codec.decode_eof(&mut buf).unwrap(), None);
}
#[test]
fn test_lines_codec_encode() {
let mut codec = LinesCodec::new();
let mut buf = BytesMut::new();
codec.encode("hello".to_string(), &mut buf).unwrap();
assert_eq!(&buf[..], b"hello\n");
}
#[test]
fn lines_codec_error_debug_and_display() {
let e1 = LinesCodecError::MaxLineLengthExceeded;
let e2 = LinesCodecError::InvalidUtf8;
assert!(format!("{e1:?}").contains("MaxLineLengthExceeded"));
assert!(format!("{e2:?}").contains("InvalidUtf8"));
assert!(format!("{e1}").contains("maximum length"));
assert!(format!("{e2}").contains("not valid UTF-8"));
let err: &dyn std::error::Error = &e1;
assert!(err.source().is_none());
}
#[test]
fn lines_codec_error_from_io() {
let io_err = std::io::Error::other("test");
let codec_err: LinesCodecError = io_err.into();
assert_eq!(codec_err.io_kind(), Some(io::ErrorKind::Other));
assert!(format!("{codec_err}").contains("i/o error"));
assert!(std::error::Error::source(&codec_err).is_some());
}
#[test]
fn lines_codec_debug_clone_default() {
let codec = LinesCodec::new();
let dbg = format!("{codec:?}");
assert!(dbg.contains("LinesCodec"), "{dbg}");
let cloned = codec.clone();
assert_eq!(cloned.max_length(), codec.max_length());
let def = LinesCodec::default();
assert_eq!(def.max_length(), usize::MAX);
}
}