use std::{collections::VecDeque, ops::Range};
use imap_types::{
core::{LiteralMode, Tag},
secret::Secret,
};
use crate::decode::Decoder;
#[derive(Clone, Debug)]
pub struct Fragmentizer {
unparsed_buffer: VecDeque<u8>,
max_message_size: Option<u32>,
max_message_size_exceeded: bool,
message_poisoned: bool,
message_buffer: Vec<u8>,
parser: Option<Parser>,
}
impl Fragmentizer {
pub fn new(max_message_size: u32) -> Self {
Self {
unparsed_buffer: VecDeque::new(),
max_message_size: Some(max_message_size),
max_message_size_exceeded: false,
message_poisoned: false,
message_buffer: Vec::new(),
parser: Some(Parser::Line(LineParser::new(0))),
}
}
pub fn without_max_message_size() -> Self {
Self {
unparsed_buffer: VecDeque::new(),
max_message_size: None,
max_message_size_exceeded: false,
message_poisoned: false,
message_buffer: Vec::new(),
parser: Some(Parser::Line(LineParser::new(0))),
}
}
pub fn progress(&mut self) -> Option<FragmentInfo> {
let parser = match &mut self.parser {
Some(parser) => {
parser
}
None => {
self.max_message_size_exceeded = false;
self.message_poisoned = false;
self.message_buffer.clear();
self.parser.insert(Parser::Line(LineParser::new(0)))
}
};
let (parsed_byte_count, fragment) = match parser {
Parser::Line(parser) => parser.parse(&self.unparsed_buffer),
Parser::Literal(parser) => parser.parse(&self.unparsed_buffer),
};
self.dequeue_parsed_bytes(parsed_byte_count);
if let Some(fragment) = fragment {
self.parser = match fragment {
FragmentInfo::Line {
announcement: None, ..
} => None,
FragmentInfo::Line {
end,
announcement: Some(LiteralAnnouncement { length, .. }),
..
} => Some(Parser::Literal(LiteralParser::new(end, length))),
FragmentInfo::Literal { end, .. } => Some(Parser::Line(LineParser::new(end))),
}
}
fragment
}
pub fn enqueue_bytes(&mut self, bytes: &[u8]) {
self.unparsed_buffer.extend(bytes);
}
pub fn fragment_bytes(&self, fragment_info: FragmentInfo) -> &[u8] {
let (start, end) = match fragment_info {
FragmentInfo::Line { start, end, .. } => (start, end),
FragmentInfo::Literal { start, end } => (start, end),
};
let start = start.min(self.message_buffer.len());
let end = end.min(self.message_buffer.len());
&self.message_buffer[start..end]
}
pub fn is_message_complete(&self) -> bool {
self.parser.is_none()
}
pub fn message_bytes(&self) -> &[u8] {
&self.message_buffer
}
pub fn is_max_message_size_exceeded(&self) -> bool {
self.max_message_size_exceeded
}
pub fn is_message_poisoned(&self) -> bool {
self.message_poisoned
}
pub fn skip_message(&mut self) {
self.max_message_size_exceeded = false;
self.message_poisoned = false;
self.message_buffer.clear();
self.parser = Some(Parser::Line(LineParser::new(0)));
}
pub fn poison_message(&mut self) {
self.message_poisoned = true;
}
pub fn decode_tag(&self) -> Option<Tag> {
parse_tag(&self.message_buffer)
}
pub fn decode_message<'a, C: Decoder>(
&'a self,
codec: &C,
) -> Result<C::Message<'a>, DecodeMessageError<'a, C>> {
if self.max_message_size_exceeded {
return Err(DecodeMessageError::MessageTooLong {
initial: Secret::new(&self.message_buffer),
});
}
if self.message_poisoned {
return Err(DecodeMessageError::MessagePoisoned {
discarded: Secret::new(&self.message_buffer),
});
}
let (remainder, message) = match codec.decode(&self.message_buffer) {
Ok(res) => res,
Err(err) => return Err(DecodeMessageError::DecodingFailure(err)),
};
if !remainder.is_empty() {
return Err(DecodeMessageError::DecodingRemainder {
message,
remainder: Secret::new(remainder),
});
}
Ok(message)
}
fn dequeue_parsed_bytes(&mut self, parsed_byte_count: usize) {
let parsed_bytes = self.unparsed_buffer.drain(..parsed_byte_count);
let remaining_size = self
.max_message_size
.map(|size| size as usize - self.message_buffer.len());
match remaining_size {
Some(remaining_size) if remaining_size < parsed_byte_count => {
let remaining_bytes = parsed_bytes.take(remaining_size);
self.message_buffer.extend(remaining_bytes);
self.max_message_size_exceeded = true;
}
_ => {
self.message_buffer.extend(parsed_bytes);
}
}
}
}
#[derive(Clone, Debug)]
enum Parser {
Line(LineParser),
Literal(LiteralParser),
}
#[derive(Clone, Debug)]
struct LineParser {
start: usize,
end: usize,
latest_byte: LatestByte,
}
impl LineParser {
fn new(start: usize) -> Self {
Self {
start,
end: start,
latest_byte: LatestByte::Other,
}
}
fn parse(&mut self, unprocessed_bytes: &VecDeque<u8>) -> (usize, Option<FragmentInfo>) {
let mut parsed_byte_count = 0;
let mut parsed_line = None;
for &next_byte in unprocessed_bytes {
parsed_byte_count += 1;
self.end += 1;
self.latest_byte = match self.latest_byte {
LatestByte::Other => match next_byte {
b'\r' => LatestByte::Cr { announcement: None },
b'\n' => {
parsed_line = Some(FragmentInfo::Line {
start: self.start,
end: self.end,
announcement: None,
ending: LineEnding::Lf,
});
LatestByte::Other
}
b'{' => LatestByte::OpeningBracket,
_ => LatestByte::Other,
},
LatestByte::OpeningBracket => match next_byte {
b'\r' => LatestByte::Cr { announcement: None },
b'\n' => {
parsed_line = Some(FragmentInfo::Line {
start: self.start,
end: self.end,
announcement: None,
ending: LineEnding::Lf,
});
LatestByte::Other
}
b'{' => LatestByte::OpeningBracket,
b'0'..=b'9' => {
let digit = (next_byte - b'0') as u32;
LatestByte::Digit { length: digit }
}
_ => LatestByte::Other,
},
LatestByte::Plus { length } => match next_byte {
b'\r' => LatestByte::Cr { announcement: None },
b'\n' => {
parsed_line = Some(FragmentInfo::Line {
start: self.start,
end: self.end,
announcement: None,
ending: LineEnding::Lf,
});
LatestByte::Other
}
b'{' => LatestByte::OpeningBracket,
b'}' => LatestByte::ClosingBracket {
announcement: LiteralAnnouncement {
mode: LiteralMode::NonSync,
length,
},
},
_ => LatestByte::Other,
},
LatestByte::Digit { length } => match next_byte {
b'\r' => LatestByte::Cr { announcement: None },
b'\n' => {
parsed_line = Some(FragmentInfo::Line {
start: self.start,
end: self.end,
announcement: None,
ending: LineEnding::Lf,
});
LatestByte::Other
}
b'{' => LatestByte::OpeningBracket,
b'0'..=b'9' => {
let digit = (next_byte - b'0') as u32;
let new_length = length.checked_mul(10).and_then(|x| x.checked_add(digit));
match new_length {
None => LatestByte::Other,
Some(length) => LatestByte::Digit { length },
}
}
b'+' => LatestByte::Plus { length },
b'}' => LatestByte::ClosingBracket {
announcement: LiteralAnnouncement {
mode: LiteralMode::Sync,
length,
},
},
_ => LatestByte::Other,
},
LatestByte::ClosingBracket { announcement } => match next_byte {
b'\r' => LatestByte::Cr {
announcement: Some(announcement),
},
b'\n' => {
parsed_line = Some(FragmentInfo::Line {
start: self.start,
end: self.end,
announcement: Some(announcement),
ending: LineEnding::Lf,
});
LatestByte::Other
}
b'{' => LatestByte::OpeningBracket,
_ => LatestByte::Other,
},
LatestByte::Cr { announcement } => match next_byte {
b'\r' => LatestByte::Cr { announcement: None },
b'\n' => {
parsed_line = Some(FragmentInfo::Line {
start: self.start,
end: self.end,
announcement,
ending: LineEnding::CrLf,
});
LatestByte::Other
}
b'{' => LatestByte::OpeningBracket,
_ => LatestByte::Other,
},
};
if parsed_line.is_some() {
break;
}
}
(parsed_byte_count, parsed_line)
}
}
#[derive(Clone, Debug)]
enum LatestByte {
Other,
OpeningBracket,
Digit {
length: u32,
},
Plus {
length: u32,
},
ClosingBracket {
announcement: LiteralAnnouncement,
},
Cr {
announcement: Option<LiteralAnnouncement>,
},
}
#[derive(Clone, Debug)]
struct LiteralParser {
start: usize,
end: usize,
remaining: u32,
}
impl LiteralParser {
fn new(start: usize, length: u32) -> Self {
Self {
start,
end: start,
remaining: length,
}
}
fn parse(&mut self, unprocessed_bytes: &VecDeque<u8>) -> (usize, Option<FragmentInfo>) {
if unprocessed_bytes.len() < self.remaining as usize {
let parsed_byte_count = unprocessed_bytes.len();
self.end += parsed_byte_count;
self.remaining -= parsed_byte_count as u32;
(parsed_byte_count, None)
} else {
let parsed_byte_count = self.remaining as usize;
self.end += parsed_byte_count;
self.remaining = 0;
let parsed_literal = FragmentInfo::Literal {
start: self.start,
end: self.end,
};
(parsed_byte_count, Some(parsed_literal))
}
}
}
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum FragmentInfo {
Line {
start: usize,
end: usize,
announcement: Option<LiteralAnnouncement>,
ending: LineEnding,
},
Literal {
start: usize,
end: usize,
},
}
impl FragmentInfo {
pub fn range(self) -> Range<usize> {
match self {
FragmentInfo::Line { start, end, .. } => start..end,
FragmentInfo::Literal { start, end } => start..end,
}
}
}
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct LiteralAnnouncement {
pub mode: LiteralMode,
pub length: u32,
}
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum LineEnding {
Lf,
CrLf,
}
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum DecodeMessageError<'a, C: Decoder> {
DecodingFailure(C::Error<'a>),
DecodingRemainder {
message: C::Message<'a>,
remainder: Secret<&'a [u8]>,
},
MessageTooLong { initial: Secret<&'a [u8]> },
MessagePoisoned { discarded: Secret<&'a [u8]> },
}
fn parse_tag(message_bytes: &[u8]) -> Option<Tag> {
let mut bytes = message_bytes.iter().enumerate();
let sp = loop {
let (i, byte) = bytes.next()?;
match byte {
b' ' => break i,
b'\n' => return None,
_ => continue,
}
};
Tag::try_from(&message_bytes[..sp]).ok()
}
#[cfg(test)]
mod tests {
use core::panic;
use std::collections::VecDeque;
use imap_types::{
command::{Command, CommandBody},
core::{LiteralMode, Tag},
secret::Secret,
};
use super::{
FragmentInfo, Fragmentizer, LineEnding, LineParser, LiteralAnnouncement, parse_tag,
};
use crate::{
CommandCodec, ResponseCodec, decode::ResponseDecodeError, fragmentizer::DecodeMessageError,
};
#[test]
fn fragmentizer_progress_nothing() {
let mut fragmentizer = Fragmentizer::without_max_message_size();
let fragment_info = fragmentizer.progress();
assert_eq!(fragment_info, None);
assert_eq!(fragmentizer.message_bytes(), b"");
assert!(!fragmentizer.is_message_complete());
fragmentizer.enqueue_bytes(&[]);
let fragment_info = fragmentizer.progress();
assert_eq!(fragment_info, None);
assert_eq!(fragmentizer.message_bytes(), b"");
assert!(!fragmentizer.is_message_complete());
}
#[test]
fn fragmentizer_progress_single_message() {
let mut fragmentizer = Fragmentizer::without_max_message_size();
fragmentizer.enqueue_bytes(b"* OK ...\r\n");
let fragment_info = fragmentizer.progress().unwrap();
assert_eq!(
fragment_info,
FragmentInfo::Line {
start: 0,
end: 10,
announcement: None,
ending: LineEnding::CrLf,
}
);
assert_eq!(fragmentizer.fragment_bytes(fragment_info), b"* OK ...\r\n");
assert!(fragmentizer.is_message_complete());
let fragment_info = fragmentizer.progress();
assert_eq!(fragment_info, None);
assert_eq!(fragmentizer.message_bytes(), b"");
assert!(!fragmentizer.is_message_complete());
}
#[test]
fn fragmentizer_progress_multiple_messages() {
let mut fragmentizer = Fragmentizer::without_max_message_size();
fragmentizer.enqueue_bytes(b"A1 OK ...\r\n");
fragmentizer.enqueue_bytes(b"A2 BAD ...\r\n");
let fragment_info = fragmentizer.progress().unwrap();
assert_eq!(
fragment_info,
FragmentInfo::Line {
start: 0,
end: 11,
announcement: None,
ending: LineEnding::CrLf,
}
);
assert_eq!(fragmentizer.fragment_bytes(fragment_info), b"A1 OK ...\r\n");
assert!(fragmentizer.is_message_complete());
let fragment_info = fragmentizer.progress().unwrap();
assert_eq!(
fragment_info,
FragmentInfo::Line {
start: 0,
end: 12,
announcement: None,
ending: LineEnding::CrLf,
}
);
assert_eq!(
fragmentizer.fragment_bytes(fragment_info),
b"A2 BAD ...\r\n"
);
assert!(fragmentizer.is_message_complete());
let fragment_info = fragmentizer.progress();
assert_eq!(fragment_info, None);
assert_eq!(fragmentizer.message_bytes(), b"");
assert!(!fragmentizer.is_message_complete());
}
#[test]
fn fragmentizer_progress_multiple_messages_with_lf() {
let mut fragmentizer = Fragmentizer::without_max_message_size();
fragmentizer.enqueue_bytes(b"A1 NOOP\n");
fragmentizer.enqueue_bytes(b"A2 LOGIN {5}\n");
fragmentizer.enqueue_bytes(b"ABCDE");
fragmentizer.enqueue_bytes(b" EFGIJ\n");
let fragment_info = fragmentizer.progress().unwrap();
assert_eq!(
fragment_info,
FragmentInfo::Line {
start: 0,
end: 8,
announcement: None,
ending: LineEnding::Lf,
}
);
assert_eq!(fragmentizer.fragment_bytes(fragment_info), b"A1 NOOP\n");
assert!(fragmentizer.is_message_complete());
let fragment_info = fragmentizer.progress().unwrap();
assert_eq!(
fragment_info,
FragmentInfo::Line {
start: 0,
end: 13,
announcement: Some(LiteralAnnouncement {
mode: LiteralMode::Sync,
length: 5
}),
ending: LineEnding::Lf,
}
);
assert_eq!(
fragmentizer.fragment_bytes(fragment_info),
b"A2 LOGIN {5}\n"
);
assert!(!fragmentizer.is_message_complete());
let fragment_info = fragmentizer.progress().unwrap();
assert_eq!(fragment_info, FragmentInfo::Literal { start: 13, end: 18 });
assert_eq!(fragmentizer.fragment_bytes(fragment_info), b"ABCDE");
assert!(!fragmentizer.is_message_complete());
let fragment_info = fragmentizer.progress().unwrap();
assert_eq!(
fragment_info,
FragmentInfo::Line {
start: 18,
end: 25,
announcement: None,
ending: LineEnding::Lf,
}
);
assert_eq!(fragmentizer.fragment_bytes(fragment_info), b" EFGIJ\n");
assert!(fragmentizer.is_message_complete());
let fragment_info = fragmentizer.progress();
assert_eq!(fragment_info, None);
assert_eq!(fragmentizer.message_bytes(), b"");
assert!(!fragmentizer.is_message_complete());
}
#[test]
fn fragmentizer_progress_message_with_multiple_literals() {
let mut fragmentizer = Fragmentizer::without_max_message_size();
fragmentizer.enqueue_bytes(b"A1 LOGIN {5}\r\n");
fragmentizer.enqueue_bytes(b"ABCDE");
fragmentizer.enqueue_bytes(b" {5}\r\n");
fragmentizer.enqueue_bytes(b"FGHIJ");
fragmentizer.enqueue_bytes(b"\r\n");
let fragment_info = fragmentizer.progress().unwrap();
assert_eq!(
fragment_info,
FragmentInfo::Line {
start: 0,
end: 14,
announcement: Some(LiteralAnnouncement {
mode: LiteralMode::Sync,
length: 5,
}),
ending: LineEnding::CrLf,
}
);
assert_eq!(
fragmentizer.fragment_bytes(fragment_info),
b"A1 LOGIN {5}\r\n"
);
assert!(!fragmentizer.is_message_complete());
let fragment_info = fragmentizer.progress().unwrap();
assert_eq!(fragment_info, FragmentInfo::Literal { start: 14, end: 19 });
assert_eq!(fragmentizer.fragment_bytes(fragment_info), b"ABCDE");
assert!(!fragmentizer.is_message_complete());
let fragment_info = fragmentizer.progress().unwrap();
assert_eq!(
fragment_info,
FragmentInfo::Line {
start: 19,
end: 25,
announcement: Some(LiteralAnnouncement {
mode: LiteralMode::Sync,
length: 5,
}),
ending: LineEnding::CrLf,
}
);
assert_eq!(fragmentizer.fragment_bytes(fragment_info), b" {5}\r\n");
assert!(!fragmentizer.is_message_complete());
let fragment_info = fragmentizer.progress().unwrap();
assert_eq!(fragment_info, FragmentInfo::Literal { start: 25, end: 30 });
assert_eq!(fragmentizer.fragment_bytes(fragment_info), b"FGHIJ");
assert!(!fragmentizer.is_message_complete());
let fragment_info = fragmentizer.progress().unwrap();
assert_eq!(
fragment_info,
FragmentInfo::Line {
start: 30,
end: 32,
announcement: None,
ending: LineEnding::CrLf,
}
);
assert_eq!(fragmentizer.fragment_bytes(fragment_info), b"\r\n");
assert!(fragmentizer.is_message_complete());
let fragment_info = fragmentizer.progress();
assert_eq!(fragment_info, None);
assert_eq!(fragmentizer.message_bytes(), b"");
assert!(!fragmentizer.is_message_complete());
}
#[test]
fn fragmentizer_progress_message_and_skip_after_literal_announcement() {
let mut fragmentizer = Fragmentizer::without_max_message_size();
fragmentizer.enqueue_bytes(b"A1 LOGIN {5}\r\n");
fragmentizer.enqueue_bytes(b"A2 NOOP\r\n");
let fragment_info = fragmentizer.progress().unwrap();
assert_eq!(
fragment_info,
FragmentInfo::Line {
start: 0,
end: 14,
announcement: Some(LiteralAnnouncement {
mode: LiteralMode::Sync,
length: 5,
}),
ending: LineEnding::CrLf,
}
);
assert_eq!(
fragmentizer.fragment_bytes(fragment_info),
b"A1 LOGIN {5}\r\n"
);
assert!(!fragmentizer.is_message_complete());
fragmentizer.skip_message();
let fragment_info = fragmentizer.progress().unwrap();
assert_eq!(
fragment_info,
FragmentInfo::Line {
start: 0,
end: 9,
announcement: None,
ending: LineEnding::CrLf,
}
);
assert_eq!(fragmentizer.fragment_bytes(fragment_info), b"A2 NOOP\r\n");
assert!(fragmentizer.is_message_complete());
let fragment_info = fragmentizer.progress();
assert_eq!(fragment_info, None);
assert_eq!(fragmentizer.message_bytes(), b"");
assert!(!fragmentizer.is_message_complete());
}
#[test]
fn fragmentizer_progress_message_byte_by_byte() {
let mut fragmentizer = Fragmentizer::without_max_message_size();
let mut bytes = VecDeque::new();
bytes.extend(b"A1 LOGIN {5}\r\n");
bytes.extend(b"ABCDE");
bytes.extend(b" FGHIJ\r\n");
for _ in 0..14 {
let fragment_info = fragmentizer.progress();
assert_eq!(fragment_info, None);
assert!(!fragmentizer.is_message_complete());
fragmentizer.enqueue_bytes(&[bytes.pop_front().unwrap()]);
}
let fragment_info = fragmentizer.progress().unwrap();
assert_eq!(
fragment_info,
FragmentInfo::Line {
start: 0,
end: 14,
announcement: Some(LiteralAnnouncement {
mode: LiteralMode::Sync,
length: 5,
}),
ending: LineEnding::CrLf,
}
);
assert_eq!(
fragmentizer.fragment_bytes(fragment_info),
b"A1 LOGIN {5}\r\n"
);
assert!(!fragmentizer.is_message_complete());
for _ in 0..5 {
let fragment_info = fragmentizer.progress();
assert_eq!(fragment_info, None);
assert!(!fragmentizer.is_message_complete());
fragmentizer.enqueue_bytes(&[bytes.pop_front().unwrap()]);
}
let fragment_info = fragmentizer.progress().unwrap();
assert_eq!(fragment_info, FragmentInfo::Literal { start: 14, end: 19 });
assert_eq!(fragmentizer.fragment_bytes(fragment_info), b"ABCDE");
assert!(!fragmentizer.is_message_complete());
for _ in 0..8 {
let fragment_info = fragmentizer.progress();
assert_eq!(fragment_info, None);
assert!(!fragmentizer.is_message_complete());
fragmentizer.enqueue_bytes(&[bytes.pop_front().unwrap()]);
}
let fragment_info = fragmentizer.progress().unwrap();
assert_eq!(
fragment_info,
FragmentInfo::Line {
start: 19,
end: 27,
announcement: None,
ending: LineEnding::CrLf,
}
);
assert_eq!(fragmentizer.fragment_bytes(fragment_info), b" FGHIJ\r\n");
assert!(fragmentizer.is_message_complete());
let fragment_info = fragmentizer.progress();
assert_eq!(fragment_info, None);
assert_eq!(fragmentizer.message_bytes(), b"");
assert!(!fragmentizer.is_message_complete());
}
#[track_caller]
fn assert_is_line(
unprocessed_bytes: &[u8],
line_byte_count: usize,
expected_announcement: Option<LiteralAnnouncement>,
expected_ending: LineEnding,
) {
let mut line_parser = LineParser::new(0);
let unprocessed_bytes = unprocessed_bytes.iter().copied().collect();
let (parsed_byte_count, fragment_info) = line_parser.parse(&unprocessed_bytes);
assert_eq!(parsed_byte_count, line_byte_count);
let Some(FragmentInfo::Line {
start,
end,
announcement,
ending,
}) = fragment_info
else {
panic!("Unexpected fragment: {fragment_info:?}");
};
assert_eq!(start, 0);
assert_eq!(end, line_byte_count);
assert_eq!(announcement, expected_announcement);
assert_eq!(ending, expected_ending);
}
#[test]
fn fragmentizer_progress_multiple_messages_longer_than_max_size() {
let mut fragmentizer = Fragmentizer::new(17);
fragmentizer.enqueue_bytes(b"A1 NOOP\r\n");
fragmentizer.enqueue_bytes(b"A2 LOGIN ABCDE EFGIJ\r\n");
fragmentizer.enqueue_bytes(b"A3 LOGIN {5}\r\n");
fragmentizer.enqueue_bytes(b"ABCDE");
fragmentizer.enqueue_bytes(b" EFGIJ\r\n");
fragmentizer.enqueue_bytes(b"A4 LOGIN A B\r\n");
let fragment_info = fragmentizer.progress().unwrap();
assert_eq!(
fragment_info,
FragmentInfo::Line {
start: 0,
end: 9,
announcement: None,
ending: LineEnding::CrLf,
}
);
assert_eq!(fragmentizer.fragment_bytes(fragment_info), b"A1 NOOP\r\n");
assert_eq!(fragmentizer.message_bytes(), b"A1 NOOP\r\n");
assert!(fragmentizer.is_message_complete());
assert!(!fragmentizer.is_max_message_size_exceeded());
let fragment_info = fragmentizer.progress().unwrap();
assert_eq!(
fragment_info,
FragmentInfo::Line {
start: 0,
end: 22,
announcement: None,
ending: LineEnding::CrLf,
}
);
assert_eq!(
fragmentizer.fragment_bytes(fragment_info),
b"A2 LOGIN ABCDE EF"
);
assert_eq!(fragmentizer.message_bytes(), b"A2 LOGIN ABCDE EF");
assert!(fragmentizer.is_message_complete());
assert!(fragmentizer.is_max_message_size_exceeded());
let fragment_info = fragmentizer.progress().unwrap();
assert_eq!(
fragment_info,
FragmentInfo::Line {
start: 0,
end: 14,
announcement: Some(LiteralAnnouncement {
mode: LiteralMode::Sync,
length: 5
}),
ending: LineEnding::CrLf,
}
);
assert_eq!(
fragmentizer.fragment_bytes(fragment_info),
b"A3 LOGIN {5}\r\n"
);
assert_eq!(fragmentizer.message_bytes(), b"A3 LOGIN {5}\r\n");
assert!(!fragmentizer.is_message_complete());
assert!(!fragmentizer.is_max_message_size_exceeded());
let fragment_info = fragmentizer.progress().unwrap();
assert_eq!(fragment_info, FragmentInfo::Literal { start: 14, end: 19 });
assert_eq!(fragmentizer.fragment_bytes(fragment_info), b"ABC");
assert_eq!(fragmentizer.message_bytes(), b"A3 LOGIN {5}\r\nABC");
assert!(!fragmentizer.is_message_complete());
assert!(fragmentizer.is_max_message_size_exceeded());
let fragment_info = fragmentizer.progress().unwrap();
assert_eq!(
fragment_info,
FragmentInfo::Line {
start: 19,
end: 27,
announcement: None,
ending: LineEnding::CrLf,
}
);
assert_eq!(fragmentizer.fragment_bytes(fragment_info), b"");
assert_eq!(fragmentizer.message_bytes(), b"A3 LOGIN {5}\r\nABC");
assert!(fragmentizer.is_message_complete());
assert!(fragmentizer.is_max_message_size_exceeded());
let fragment_info = fragmentizer.progress().unwrap();
assert_eq!(
fragment_info,
FragmentInfo::Line {
start: 0,
end: 14,
announcement: None,
ending: LineEnding::CrLf,
}
);
assert_eq!(
fragmentizer.fragment_bytes(fragment_info),
b"A4 LOGIN A B\r\n"
);
assert_eq!(fragmentizer.message_bytes(), b"A4 LOGIN A B\r\n");
assert!(fragmentizer.is_message_complete());
assert!(!fragmentizer.is_max_message_size_exceeded());
let fragment_info = fragmentizer.progress();
assert_eq!(fragment_info, None);
assert_eq!(fragmentizer.message_bytes(), b"");
assert_eq!(fragmentizer.message_bytes(), b"");
assert!(!fragmentizer.is_message_complete());
assert!(!fragmentizer.is_max_message_size_exceeded());
}
#[test]
fn fragmentizer_progress_messages_with_zero_max_size() {
let mut fragmentizer = Fragmentizer::new(0);
fragmentizer.enqueue_bytes(b"A1 NOOP\r\n");
fragmentizer.enqueue_bytes(b"A2 LOGIN ABCDE EFGIJ\r\n");
fragmentizer.enqueue_bytes(b"A3 LOGIN {5}\r\n");
fragmentizer.enqueue_bytes(b"ABCDE");
fragmentizer.enqueue_bytes(b" EFGIJ\r\n");
let fragment_info = fragmentizer.progress().unwrap();
assert_eq!(
fragment_info,
FragmentInfo::Line {
start: 0,
end: 9,
announcement: None,
ending: LineEnding::CrLf,
}
);
assert_eq!(fragmentizer.fragment_bytes(fragment_info), b"");
assert_eq!(fragmentizer.message_bytes(), b"");
assert!(fragmentizer.is_message_complete());
assert!(fragmentizer.is_max_message_size_exceeded());
let fragment_info = fragmentizer.progress().unwrap();
assert_eq!(
fragment_info,
FragmentInfo::Line {
start: 0,
end: 22,
announcement: None,
ending: LineEnding::CrLf,
}
);
assert_eq!(fragmentizer.fragment_bytes(fragment_info), b"");
assert_eq!(fragmentizer.message_bytes(), b"");
assert!(fragmentizer.is_message_complete());
assert!(fragmentizer.is_max_message_size_exceeded());
let fragment_info = fragmentizer.progress().unwrap();
assert_eq!(
fragment_info,
FragmentInfo::Line {
start: 0,
end: 14,
announcement: Some(LiteralAnnouncement {
mode: LiteralMode::Sync,
length: 5
}),
ending: LineEnding::CrLf,
}
);
assert_eq!(fragmentizer.fragment_bytes(fragment_info), b"");
assert_eq!(fragmentizer.message_bytes(), b"");
assert!(!fragmentizer.is_message_complete());
assert!(fragmentizer.is_max_message_size_exceeded());
let fragment_info = fragmentizer.progress().unwrap();
assert_eq!(fragment_info, FragmentInfo::Literal { start: 14, end: 19 });
assert_eq!(fragmentizer.fragment_bytes(fragment_info), b"");
assert_eq!(fragmentizer.message_bytes(), b"");
assert!(!fragmentizer.is_message_complete());
assert!(fragmentizer.is_max_message_size_exceeded());
let fragment_info = fragmentizer.progress().unwrap();
assert_eq!(
fragment_info,
FragmentInfo::Line {
start: 19,
end: 27,
announcement: None,
ending: LineEnding::CrLf,
}
);
assert_eq!(fragmentizer.fragment_bytes(fragment_info), b"");
assert_eq!(fragmentizer.message_bytes(), b"");
assert!(fragmentizer.is_message_complete());
assert!(fragmentizer.is_max_message_size_exceeded());
let fragment_info = fragmentizer.progress();
assert_eq!(fragment_info, None);
assert_eq!(fragmentizer.message_bytes(), b"");
assert_eq!(fragmentizer.message_bytes(), b"");
assert!(!fragmentizer.is_message_complete());
assert!(!fragmentizer.is_max_message_size_exceeded());
}
#[test]
fn fragmentizer_decode_message() {
let command_codec = CommandCodec::new();
let response_codec = ResponseCodec::new();
let mut fragmentizer = Fragmentizer::new(10);
fragmentizer.enqueue_bytes(b"A1 NOOP\r\n");
fragmentizer.enqueue_bytes(b"A2 LOGIN ABCDE EFGIJ\r\n");
fragmentizer.progress();
assert_eq!(
fragmentizer.decode_message(&command_codec),
Ok(Command::new("A1", CommandBody::Noop).unwrap()),
);
assert_eq!(
fragmentizer.decode_message(&response_codec),
Err(DecodeMessageError::DecodingFailure(
ResponseDecodeError::Failed
)),
);
fragmentizer.progress();
assert_eq!(
fragmentizer.decode_message(&response_codec),
Err(DecodeMessageError::MessageTooLong {
initial: Secret::new(b"A2 LOGIN A"),
}),
);
}
#[test]
fn fragmentizer_poison_message() {
let command_codec = CommandCodec::new();
let mut fragmentizer = Fragmentizer::without_max_message_size();
fragmentizer.enqueue_bytes(b"A1 NOOP\r\n");
fragmentizer.enqueue_bytes(b"A2 LOGIN {5}\r\n");
fragmentizer.enqueue_bytes(b"ABCDE");
fragmentizer.enqueue_bytes(b" EFGIJ\r\n");
assert!(!fragmentizer.is_message_poisoned());
fragmentizer.poison_message();
assert!(fragmentizer.is_message_poisoned());
let fragment_info = fragmentizer.progress().unwrap();
assert_eq!(
fragment_info,
FragmentInfo::Line {
start: 0,
end: 9,
announcement: None,
ending: LineEnding::CrLf,
}
);
assert_eq!(fragmentizer.fragment_bytes(fragment_info), b"A1 NOOP\r\n");
assert_eq!(fragmentizer.message_bytes(), b"A1 NOOP\r\n");
assert!(fragmentizer.is_message_complete());
assert!(fragmentizer.is_message_poisoned());
let decode_err = fragmentizer.decode_message(&command_codec).unwrap_err();
assert_eq!(
decode_err,
DecodeMessageError::MessagePoisoned {
discarded: Secret::new(fragmentizer.message_bytes())
}
);
let fragment_info = fragmentizer.progress().unwrap();
assert_eq!(
fragment_info,
FragmentInfo::Line {
start: 0,
end: 14,
announcement: Some(LiteralAnnouncement {
mode: LiteralMode::Sync,
length: 5
}),
ending: LineEnding::CrLf,
}
);
assert_eq!(
fragmentizer.fragment_bytes(fragment_info),
b"A2 LOGIN {5}\r\n"
);
assert_eq!(fragmentizer.message_bytes(), b"A2 LOGIN {5}\r\n");
assert!(!fragmentizer.is_message_complete());
assert!(!fragmentizer.is_message_poisoned());
let fragment_info = fragmentizer.progress().unwrap();
assert_eq!(fragment_info, FragmentInfo::Literal { start: 14, end: 19 });
assert_eq!(fragmentizer.fragment_bytes(fragment_info), b"ABCDE");
assert_eq!(fragmentizer.message_bytes(), b"A2 LOGIN {5}\r\nABCDE");
assert!(!fragmentizer.is_message_complete());
assert!(!fragmentizer.is_message_poisoned());
fragmentizer.poison_message();
assert!(fragmentizer.is_message_poisoned());
let fragment_info = fragmentizer.progress().unwrap();
assert_eq!(
fragment_info,
FragmentInfo::Line {
start: 19,
end: 27,
announcement: None,
ending: LineEnding::CrLf,
}
);
assert_eq!(fragmentizer.fragment_bytes(fragment_info), b" EFGIJ\r\n");
assert_eq!(
fragmentizer.message_bytes(),
b"A2 LOGIN {5}\r\nABCDE EFGIJ\r\n"
);
assert!(fragmentizer.is_message_complete());
assert!(fragmentizer.is_message_poisoned());
let decode_err = fragmentizer.decode_message(&command_codec).unwrap_err();
assert_eq!(
decode_err,
DecodeMessageError::MessagePoisoned {
discarded: Secret::new(fragmentizer.message_bytes())
}
);
let fragment_info = fragmentizer.progress();
assert_eq!(fragment_info, None);
assert_eq!(fragmentizer.message_bytes(), b"");
assert_eq!(fragmentizer.message_bytes(), b"");
assert!(!fragmentizer.is_message_complete());
assert!(!fragmentizer.is_message_poisoned());
}
#[test]
fn fragmentizer_poison_too_long_message() {
let command_codec = CommandCodec::new();
let mut fragmentizer = Fragmentizer::new(5);
fragmentizer.enqueue_bytes(b"A1 NOOP\r\n");
assert!(!fragmentizer.is_message_poisoned());
fragmentizer.poison_message();
assert!(fragmentizer.is_message_poisoned());
let fragment_info = fragmentizer.progress().unwrap();
assert_eq!(
fragment_info,
FragmentInfo::Line {
start: 0,
end: 9,
announcement: None,
ending: LineEnding::CrLf,
}
);
assert_eq!(fragmentizer.fragment_bytes(fragment_info), b"A1 NO");
assert_eq!(fragmentizer.message_bytes(), b"A1 NO");
assert!(fragmentizer.is_message_complete());
assert!(fragmentizer.is_max_message_size_exceeded());
assert!(fragmentizer.is_message_poisoned());
let decode_err = fragmentizer.decode_message(&command_codec).unwrap_err();
assert_eq!(
decode_err,
DecodeMessageError::MessageTooLong {
initial: Secret::new(b"A1 NO")
}
);
let fragment_info = fragmentizer.progress();
assert_eq!(fragment_info, None);
assert_eq!(fragmentizer.message_bytes(), b"");
assert_eq!(fragmentizer.message_bytes(), b"");
assert!(!fragmentizer.is_message_complete());
assert!(!fragmentizer.is_max_message_size_exceeded());
assert!(!fragmentizer.is_message_poisoned());
}
#[track_caller]
fn assert_not_line(not_a_line_bytes: &[u8]) {
let mut line_parser = LineParser::new(0);
let not_a_line_bytes = not_a_line_bytes.iter().copied().collect();
let (parsed_byte_count, fragment_info) = line_parser.parse(¬_a_line_bytes);
assert_eq!(parsed_byte_count, not_a_line_bytes.len());
assert_eq!(fragment_info, None);
}
#[test]
fn parse_line_examples() {
assert_not_line(b"");
assert_not_line(b"foo");
assert_is_line(b"\n", 1, None, LineEnding::Lf);
assert_is_line(b"\r\n", 2, None, LineEnding::CrLf);
assert_is_line(b"\n\r", 1, None, LineEnding::Lf);
assert_is_line(b"foo\n", 4, None, LineEnding::Lf);
assert_is_line(b"foo\r\n", 5, None, LineEnding::CrLf);
assert_is_line(b"foo\n\r", 4, None, LineEnding::Lf);
assert_is_line(b"foo\nbar\n", 4, None, LineEnding::Lf);
assert_is_line(b"foo\r\nbar\r\n", 5, None, LineEnding::CrLf);
assert_is_line(b"\r\nfoo\r\n", 2, None, LineEnding::CrLf);
assert_is_line(
b"{1}\r\n",
5,
Some(LiteralAnnouncement {
length: 1,
mode: LiteralMode::Sync,
}),
LineEnding::CrLf,
);
assert_is_line(
b"{1}\n",
4,
Some(LiteralAnnouncement {
length: 1,
mode: LiteralMode::Sync,
}),
LineEnding::Lf,
);
assert_is_line(
b"foo {1}\r\n",
9,
Some(LiteralAnnouncement {
length: 1,
mode: LiteralMode::Sync,
}),
LineEnding::CrLf,
);
assert_is_line(
b"foo {2} {1}\r\n",
13,
Some(LiteralAnnouncement {
length: 1,
mode: LiteralMode::Sync,
}),
LineEnding::CrLf,
);
assert_is_line(b"foo {1} \r\n", 10, None, LineEnding::CrLf);
assert_is_line(b"foo \n {1}\r\n", 5, None, LineEnding::Lf);
assert_is_line(b"foo {1} foo\r\n", 13, None, LineEnding::CrLf);
assert_is_line(b"foo {1\r\n", 8, None, LineEnding::CrLf);
assert_is_line(b"foo 1}\r\n", 8, None, LineEnding::CrLf);
assert_is_line(b"foo { 1}\r\n", 10, None, LineEnding::CrLf);
assert_is_line(
b"foo {{1}\r\n",
10,
Some(LiteralAnnouncement {
length: 1,
mode: LiteralMode::Sync,
}),
LineEnding::CrLf,
);
assert_is_line(
b"foo {42}\r\n",
10,
Some(LiteralAnnouncement {
length: 42,
mode: LiteralMode::Sync,
}),
LineEnding::CrLf,
);
assert_is_line(
b"foo {42+}\r\n",
11,
Some(LiteralAnnouncement {
length: 42,
mode: LiteralMode::NonSync,
}),
LineEnding::CrLf,
);
assert_is_line(
b"foo +{42}\r\n",
11,
Some(LiteralAnnouncement {
length: 42,
mode: LiteralMode::Sync,
}),
LineEnding::CrLf,
);
assert_is_line(b"foo {+}\r\n", 9, None, LineEnding::CrLf);
assert_is_line(b"foo {42++}\r\n", 12, None, LineEnding::CrLf);
assert_is_line(b"foo {+42+}\r\n", 12, None, LineEnding::CrLf);
assert_is_line(b"foo {+42}\r\n", 11, None, LineEnding::CrLf);
assert_is_line(b"foo {42}+\r\n", 11, None, LineEnding::CrLf);
assert_is_line(b"foo {-42}\r\n", 11, None, LineEnding::CrLf);
assert_is_line(b"foo {42-}\r\n", 11, None, LineEnding::CrLf);
assert_is_line(
b"foo {4294967295}\r\n",
18,
Some(LiteralAnnouncement {
length: 4294967295,
mode: LiteralMode::Sync,
}),
LineEnding::CrLf,
);
assert_is_line(b"foo {4294967296}\r\n", 18, None, LineEnding::CrLf);
}
#[test]
fn parse_line_corner_case() {
assert_is_line(
b"* OK {1}\r\n",
10,
Some(LiteralAnnouncement {
length: 1,
mode: LiteralMode::Sync,
}),
LineEnding::CrLf,
);
}
#[test]
fn parse_tag_examples() {
assert_eq!(parse_tag(b"1 NOOP\r\n"), Tag::try_from("1").ok());
assert_eq!(parse_tag(b"12 NOOP\r\n"), Tag::try_from("12").ok());
assert_eq!(parse_tag(b"123 NOOP\r\n"), Tag::try_from("123").ok());
assert_eq!(parse_tag(b"1234 NOOP\r\n"), Tag::try_from("1234").ok());
assert_eq!(parse_tag(b"12345 NOOP\r\n"), Tag::try_from("12345").ok());
assert_eq!(parse_tag(b"A1 NOOP\r\n"), Tag::try_from("A1").ok());
assert_eq!(parse_tag(b"A1 NOOP"), Tag::try_from("A1").ok());
assert_eq!(parse_tag(b"A1 "), Tag::try_from("A1").ok());
assert_eq!(parse_tag(b"A1 "), Tag::try_from("A1").ok());
assert_eq!(parse_tag(b"A1 \r\n"), Tag::try_from("A1").ok());
assert_eq!(parse_tag(b"A1 \n"), Tag::try_from("A1").ok());
assert_eq!(parse_tag(b"A1"), None);
assert_eq!(parse_tag(b"A1\r\n"), None);
assert_eq!(parse_tag(b"A1\n"), None);
assert_eq!(parse_tag(b" \r\n"), None);
assert_eq!(parse_tag(b"\r\n"), None);
assert_eq!(parse_tag(b""), None);
assert_eq!(parse_tag(b" A1 NOOP\r\n"), None);
}
}