use std::io::Read;
use std::time::{Duration, Instant};
use crate::PrintError;
/// Byte value (0x02) that opens a frame: `read_frames` starts collecting a
/// payload when it sees this byte while outside a frame.
const STX: u8 = 0x02;
/// Byte value (0x03) that closes a frame: `read_frames` emits the collected
/// payload when it sees this byte while inside a frame.
const ETX: u8 = 0x03;
/// Payload size limit, in bytes, meant to be passed as the `max_frame_size`
/// argument of `read_frames` when the caller has no specific limit in mind.
pub(crate) const DEFAULT_MAX_FRAME_SIZE: usize = 1024;
/// Position of the frame parser relative to the STX/ETX delimiters.
///
/// The derives keep the state cheap to copy and make it usable in
/// comparisons and debug output.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum FrameState {
    /// Outside a frame: incoming bytes are ignored until an STX arrives.
    WaitingForStx,
    /// Inside a frame: incoming bytes are collected until an ETX arrives.
    ReadingFrame,
}
/// Reads STX/ETX-delimited frames from `stream` until `expected_count` frames
/// have been collected.
///
/// Bytes seen outside a frame are discarded. Inside a frame every byte other
/// than ETX (including a stray STX) is kept as payload. The delimiters are not
/// part of the returned payloads. Reading stops as soon as the last expected
/// frame closes; any bytes already read past that ETX are dropped.
///
/// `timeout` bounds the whole call rather than each individual read. The
/// deadline is only checked between reads, so a `stream` whose `read` blocks
/// forever is not cut short by this function.
///
/// With `expected_count == 0` the stream is never read and an empty vector is
/// returned.
///
/// # Errors
///
/// * `PrintError::ReadTimeout` - the deadline passed before enough frames
///   were collected.
/// * `PrintError::ConnectionClosed` - `stream.read` returned `Ok(0)` before
///   enough frames were collected.
/// * `PrintError::FrameTooLarge` - a frame's payload would exceed
///   `max_frame_size` bytes.
/// * `PrintError::ReadFailed` - `stream.read` failed with an error other than
///   `Interrupted`, `TimedOut` or `WouldBlock`.
pub fn read_frames(
    stream: &mut impl Read,
    expected_count: usize,
    timeout: Duration,
    max_frame_size: usize,
) -> Result<Vec<Vec<u8>>, PrintError> {
    // Cap on the up-front allocation so that an oversized `expected_count`
    // cannot trigger a capacity-overflow panic or a huge allocation; the
    // vector still grows on demand beyond this.
    const MAX_PREALLOCATED_FRAMES: usize = 16;
    // Pause between polls when the stream reports that no data is ready yet.
    const RETRY_DELAY: Duration = Duration::from_millis(1);

    let now = Instant::now();
    // A timeout too large to represent as an `Instant` falls back to one day.
    let deadline = now
        .checked_add(timeout)
        .unwrap_or_else(|| now + Duration::from_secs(86400));

    let mut frames: Vec<Vec<u8>> =
        Vec::with_capacity(expected_count.min(MAX_PREALLOCATED_FRAMES));
    let mut current_frame: Vec<u8> = Vec::with_capacity(256);
    let mut state = FrameState::WaitingForStx;
    let mut buf = [0u8; 512];

    while frames.len() < expected_count {
        if Instant::now() >= deadline {
            return Err(PrintError::ReadTimeout);
        }

        let n = match stream.read(&mut buf) {
            Ok(0) => return Err(PrintError::ConnectionClosed),
            Ok(n) => n,
            Err(e) => match e.kind() {
                // An interrupted read is non-fatal: retry. The deadline check
                // at the top of the loop keeps this bounded.
                std::io::ErrorKind::Interrupted => continue,
                // No data available yet: back off briefly and poll again
                // unless the overall deadline has passed.
                std::io::ErrorKind::TimedOut | std::io::ErrorKind::WouldBlock => {
                    if Instant::now() >= deadline {
                        return Err(PrintError::ReadTimeout);
                    }
                    std::thread::sleep(RETRY_DELAY);
                    continue;
                }
                _ => return Err(PrintError::ReadFailed(e)),
            },
        };

        for &byte in &buf[..n] {
            match (&state, byte) {
                (FrameState::WaitingForStx, STX) => {
                    current_frame.clear();
                    state = FrameState::ReadingFrame;
                }
                // Anything else between frames (e.g. CR/LF) is noise.
                (FrameState::WaitingForStx, _) => {}
                (FrameState::ReadingFrame, ETX) => {
                    frames.push(std::mem::take(&mut current_frame));
                    state = FrameState::WaitingForStx;
                    if frames.len() >= expected_count {
                        return Ok(frames);
                    }
                }
                (FrameState::ReadingFrame, _) => {
                    // Reject before pushing so the payload never grows past
                    // the limit; `size` reports the length it would have had.
                    if current_frame.len() >= max_frame_size {
                        return Err(PrintError::FrameTooLarge {
                            size: current_frame.len() + 1,
                            max: max_frame_size,
                        });
                    }
                    current_frame.push(byte);
                }
            }
        }
    }

    // Only reached when `expected_count` is zero.
    Ok(frames)
}
/// Returns how many STX/ETX frames the reply to `cmd` is expected to contain.
///
/// A command whose first three bytes are exactly `~HS` yields 3; every other
/// command, including an empty one, yields 1.
pub fn expected_frame_count(cmd: &[u8]) -> usize {
    match cmd {
        [b'~', b'H', b'S', ..] => 3,
        _ => 1,
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// Wraps `payload` in a single `STX ... ETX` frame.
    fn framed(payload: &[u8]) -> Vec<u8> {
        let mut out = Vec::with_capacity(payload.len() + 2);
        out.push(STX);
        out.extend_from_slice(payload);
        out.push(ETX);
        out
    }

    /// Runs `read_frames` over an in-memory copy of `data` with a one-second
    /// timeout and an explicit frame size limit.
    fn read_with_limit(
        data: &[u8],
        expected_count: usize,
        max_frame_size: usize,
    ) -> Result<Vec<Vec<u8>>, PrintError> {
        read_frames(
            &mut Cursor::new(data),
            expected_count,
            Duration::from_secs(1),
            max_frame_size,
        )
    }

    /// Same as `read_with_limit`, using `DEFAULT_MAX_FRAME_SIZE`.
    fn read_default(data: &[u8], expected_count: usize) -> Result<Vec<Vec<u8>>, PrintError> {
        read_with_limit(data, expected_count, DEFAULT_MAX_FRAME_SIZE)
    }

    /// Reader that hands out at most one byte per `read` call, so every frame
    /// is guaranteed to straddle several reads.
    struct OneByteAtATime<R>(R);

    impl<R: Read> Read for OneByteAtATime<R> {
        fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
            let limit = buf.len().min(1);
            self.0.read(&mut buf[..limit])
        }
    }

    /// Reader whose every `read` call fails with the configured error kind.
    struct AlwaysFails(std::io::ErrorKind);

    impl Read for AlwaysFails {
        fn read(&mut self, _buf: &mut [u8]) -> std::io::Result<usize> {
            Err(std::io::Error::from(self.0))
        }
    }

    #[test]
    fn reads_single_framed_payload() {
        // Literal delimiter bytes on purpose: this pins the wire values
        // independently of the STX/ETX constants.
        let frames = read_default(b"\x02Hello\x03", 1).unwrap();
        assert_eq!(frames, vec![b"Hello".to_vec()]);
    }

    #[test]
    fn reads_three_frames_for_hs_like_response() {
        let lines: [&[u8]; 3] = [
            b"030,0,0,1245,000,0,0,0,000,0,0,0",
            b"000,0,0,0,0,2,4,0,00000000,1,000",
            b"1234,0",
        ];
        let mut data = Vec::new();
        for (index, line) in lines.iter().enumerate() {
            if index > 0 {
                data.extend_from_slice(b"\r\n");
            }
            data.extend(framed(line));
        }

        let frames = read_default(&data, 3).unwrap();

        let expected: Vec<Vec<u8>> = lines.iter().map(|line| line.to_vec()).collect();
        assert_eq!(frames, expected);
    }

    #[test]
    fn skips_leading_garbage_before_first_stx() {
        let mut data = b"\r\n\r\n".to_vec();
        data.extend(framed(b"data"));

        let frames = read_default(&data, 1).unwrap();

        assert_eq!(frames, vec![b"data".to_vec()]);
    }

    #[test]
    fn errors_when_frame_exceeds_max_size() {
        let data = framed(&vec![b'X'; 2000]);

        match read_with_limit(&data, 1, 1024) {
            Err(PrintError::FrameTooLarge { size, max }) => {
                assert_eq!(max, 1024);
                // The reader bails out on the first byte past the limit.
                assert_eq!(size, 1025);
            }
            other => panic!("expected FrameTooLarge, got {:?}", other),
        }
    }

    #[test]
    fn allows_empty_frame_between_stx_etx() {
        let frames = read_default(&[STX, ETX], 1).unwrap();
        assert_eq!(frames.len(), 1);
        assert!(frames[0].is_empty());
    }

    #[test]
    fn returns_connection_closed_on_empty_input() {
        let result = read_default(&[], 1);
        assert!(matches!(result, Err(PrintError::ConnectionClosed)));
    }

    #[test]
    fn returns_immediately_when_expected_count_is_zero() {
        let frames = read_default(&framed(b"A"), 0).unwrap();
        assert!(frames.is_empty());
    }

    #[test]
    fn parses_back_to_back_frames_in_single_buffer() {
        let mut data = framed(b"A");
        data.extend(framed(b"B"));

        let frames = read_default(&data, 2).unwrap();

        assert_eq!(frames, vec![b"A".to_vec(), b"B".to_vec()]);
    }

    #[test]
    fn returns_connection_closed_when_no_stx_is_seen() {
        let result = read_default(b"\r\nxy", 1);
        assert!(matches!(result, Err(PrintError::ConnectionClosed)));
    }

    #[test]
    fn accepts_frame_at_exact_max_size() {
        let data = framed(&vec![b'X'; 1024]);

        let frames = read_with_limit(&data, 1, 1024).unwrap();

        assert_eq!(frames.len(), 1);
        assert_eq!(frames[0].len(), 1024);
    }

    #[test]
    fn rejects_frame_one_byte_over_max_size() {
        let data = framed(&vec![b'X'; 1025]);

        let result = read_with_limit(&data, 1, 1024);

        assert!(matches!(result, Err(PrintError::FrameTooLarge { .. })));
    }

    #[test]
    fn errors_when_connection_closes_mid_frame() {
        // STX followed by payload but no closing ETX.
        let mut data = vec![STX];
        data.extend_from_slice(b"partial");

        match read_default(&data, 1) {
            Err(PrintError::ConnectionClosed) => {}
            other => panic!("expected ConnectionClosed, got {:?}", other),
        }
    }

    #[test]
    fn reassembles_frames_split_across_reads() {
        let mut data = b"\r\n".to_vec();
        data.extend(framed(b"abc"));
        data.extend(framed(b"de"));
        let mut stream = OneByteAtATime(Cursor::new(data));

        let frames = read_frames(
            &mut stream,
            2,
            Duration::from_secs(1),
            DEFAULT_MAX_FRAME_SIZE,
        )
        .unwrap();

        assert_eq!(frames, vec![b"abc".to_vec(), b"de".to_vec()]);
    }

    #[test]
    fn times_out_when_stream_never_becomes_ready() {
        let mut stream = AlwaysFails(std::io::ErrorKind::WouldBlock);

        // Short timeout keeps the polling loop, and thus the test, brief.
        let result = read_frames(
            &mut stream,
            1,
            Duration::from_millis(20),
            DEFAULT_MAX_FRAME_SIZE,
        );

        assert!(matches!(result, Err(PrintError::ReadTimeout)));
    }

    #[test]
    fn propagates_unexpected_io_errors() {
        let mut stream = AlwaysFails(std::io::ErrorKind::PermissionDenied);

        let result = read_frames(
            &mut stream,
            1,
            Duration::from_secs(1),
            DEFAULT_MAX_FRAME_SIZE,
        );

        match result {
            Err(PrintError::ReadFailed(err)) => {
                assert_eq!(err.kind(), std::io::ErrorKind::PermissionDenied);
            }
            other => panic!("expected ReadFailed, got {:?}", other),
        }
    }

    #[test]
    fn expects_three_frames_only_for_hs_commands() {
        assert_eq!(expected_frame_count(b"~HS"), 3);
        assert_eq!(expected_frame_count(b"~HS\r\n"), 3);
        assert_eq!(expected_frame_count(b"~HI"), 1);
        assert_eq!(expected_frame_count(b"~H"), 1);
        assert_eq!(expected_frame_count(b""), 1);
    }
}