use crate::error::AsynResult;
use crate::user::AsynUser;
use super::{EomReason, OctetInterpose, OctetNext, OctetReadResult};
const INPUT_BUFFER_SIZE: usize = 2048;
#[derive(Debug, Clone)]
pub struct EosConfig {
pub input_eos: Vec<u8>,
pub output_eos: Vec<u8>,
}
impl Default for EosConfig {
fn default() -> Self {
Self {
input_eos: Vec::new(),
output_eos: Vec::new(),
}
}
}
pub struct EosInterpose {
config: EosConfig,
in_buf: Vec<u8>,
in_buf_head: usize,
in_buf_tail: usize,
eos_in_match: usize,
}
impl EosInterpose {
pub fn new(config: EosConfig) -> Self {
Self {
config,
in_buf: vec![0u8; INPUT_BUFFER_SIZE],
in_buf_head: 0,
in_buf_tail: 0,
eos_in_match: 0,
}
}
pub fn set_input_eos(&mut self, eos: &[u8]) {
self.config.input_eos = eos.to_vec();
self.eos_in_match = 0;
}
pub fn set_output_eos(&mut self, eos: &[u8]) {
self.config.output_eos = eos.to_vec();
}
pub fn get_input_eos(&self) -> &[u8] {
&self.config.input_eos
}
pub fn get_output_eos(&self) -> &[u8] {
&self.config.output_eos
}
}
impl OctetInterpose for EosInterpose {
fn read(
&mut self,
user: &AsynUser,
buf: &mut [u8],
next: &mut dyn OctetNext,
) -> AsynResult<OctetReadResult> {
if self.config.input_eos.is_empty() {
return next.read(user, buf);
}
let maxchars = buf.len();
let mut n_read: usize = 0;
let mut eom = EomReason::empty();
loop {
if self.in_buf_tail != self.in_buf_head {
let c = self.in_buf[self.in_buf_tail];
self.in_buf_tail += 1;
buf[n_read] = c;
n_read += 1;
let eos = &self.config.input_eos;
if c == eos[self.eos_in_match] {
self.eos_in_match += 1;
if self.eos_in_match == eos.len() {
self.eos_in_match = 0;
n_read -= eos.len();
eom |= EomReason::EOS;
break;
}
} else {
if c == eos[0] {
self.eos_in_match = 1;
} else {
self.eos_in_match = 0;
}
}
if n_read >= maxchars {
eom = EomReason::CNT;
break;
}
continue;
}
if !eom.is_empty() {
break;
}
let result = match next.read(user, &mut self.in_buf[..]) {
Ok(r) => r,
Err(_) if n_read > 0 => {
break;
}
Err(e) => return Err(e),
};
if result.nbytes_transferred == 0 {
break;
}
self.in_buf_tail = 0;
self.in_buf_head = result.nbytes_transferred;
eom = result.eom_reason & !EomReason::CNT;
}
if n_read < maxchars {
buf[n_read] = 0;
}
Ok(OctetReadResult {
nbytes_transferred: n_read,
eom_reason: eom,
})
}
fn write(
&mut self,
user: &mut AsynUser,
data: &[u8],
next: &mut dyn OctetNext,
) -> AsynResult<usize> {
if self.config.output_eos.is_empty() {
return next.write(user, data);
}
let mut buf = Vec::with_capacity(data.len() + self.config.output_eos.len());
buf.extend_from_slice(data);
buf.extend_from_slice(&self.config.output_eos);
let actual = next.write(user, &buf)?;
Ok(actual.min(data.len()))
}
fn flush(&mut self, user: &mut AsynUser, next: &mut dyn OctetNext) -> AsynResult<()> {
self.in_buf_head = 0;
self.in_buf_tail = 0;
self.eos_in_match = 0;
next.flush(user)
}
}
#[cfg(test)]
mod tests {
use super::*;
struct MockOctetBase {
data: Vec<u8>,
pos: usize,
written: Vec<u8>,
}
impl MockOctetBase {
fn new(data: &[u8]) -> Self {
Self {
data: data.to_vec(),
pos: 0,
written: Vec::new(),
}
}
}
impl OctetNext for MockOctetBase {
fn read(&mut self, _user: &AsynUser, buf: &mut [u8]) -> AsynResult<OctetReadResult> {
let avail = self.data.len() - self.pos;
let n = avail.min(buf.len());
buf[..n].copy_from_slice(&self.data[self.pos..self.pos + n]);
self.pos += n;
Ok(OctetReadResult {
nbytes_transferred: n,
eom_reason: EomReason::CNT,
})
}
fn write(&mut self, _user: &mut AsynUser, data: &[u8]) -> AsynResult<usize> {
self.written.extend_from_slice(data);
Ok(data.len())
}
fn flush(&mut self, _user: &mut AsynUser) -> AsynResult<()> {
Ok(())
}
}
#[test]
fn test_single_char_eos() {
let mut interpose = EosInterpose::new(EosConfig {
input_eos: vec![b'\n'],
output_eos: vec![],
});
let mut base = MockOctetBase::new(b"hello\nworld\n");
let user = AsynUser::default();
let mut buf = [0u8; 64];
let r = interpose.read(&user, &mut buf, &mut base).unwrap();
assert_eq!(&buf[..r.nbytes_transferred], b"hello");
assert!(r.eom_reason.contains(EomReason::EOS));
let r = interpose.read(&user, &mut buf, &mut base).unwrap();
assert_eq!(&buf[..r.nbytes_transferred], b"world");
assert!(r.eom_reason.contains(EomReason::EOS));
}
#[test]
fn test_two_char_eos() {
let mut interpose = EosInterpose::new(EosConfig {
input_eos: vec![b'\r', b'\n'],
output_eos: vec![],
});
let mut base = MockOctetBase::new(b"cmd1\r\ncmd2\r\n");
let user = AsynUser::default();
let mut buf = [0u8; 64];
let r = interpose.read(&user, &mut buf, &mut base).unwrap();
assert_eq!(&buf[..r.nbytes_transferred], b"cmd1");
assert!(r.eom_reason.contains(EomReason::EOS));
let r = interpose.read(&user, &mut buf, &mut base).unwrap();
assert_eq!(&buf[..r.nbytes_transferred], b"cmd2");
assert!(r.eom_reason.contains(EomReason::EOS));
}
#[test]
fn test_output_eos_append() {
let mut interpose = EosInterpose::new(EosConfig {
input_eos: vec![],
output_eos: vec![b'\r', b'\n'],
});
let mut base = MockOctetBase::new(b"");
let mut user = AsynUser::default();
let n = interpose.write(&mut user, b"hello", &mut base).unwrap();
assert_eq!(&base.written, b"hello\r\n");
assert_eq!(n, 5);
}
#[test]
fn test_no_eos_passthrough() {
let mut interpose = EosInterpose::new(EosConfig::default());
let mut base = MockOctetBase::new(b"data");
let user = AsynUser::default();
let mut buf = [0u8; 64];
let r = interpose.read(&user, &mut buf, &mut base).unwrap();
assert_eq!(&buf[..r.nbytes_transferred], b"data");
}
#[test]
fn test_flush_clears_buffer() {
let mut interpose = EosInterpose::new(EosConfig {
input_eos: vec![b'\n'],
output_eos: vec![],
});
let mut base = MockOctetBase::new(b"partial");
let user = AsynUser::default();
let mut buf = [0u8; 4];
let _ = interpose.read(&user, &mut buf, &mut base);
let mut user2 = AsynUser::default();
interpose.flush(&mut user2, &mut base).unwrap();
assert_eq!(interpose.in_buf_head, 0);
assert_eq!(interpose.in_buf_tail, 0);
assert_eq!(interpose.eos_in_match, 0);
}
#[test]
fn test_eos_config_getters_setters() {
let mut interpose = EosInterpose::new(EosConfig::default());
assert!(interpose.get_input_eos().is_empty());
interpose.set_input_eos(b"\n");
assert_eq!(interpose.get_input_eos(), b"\n");
interpose.set_output_eos(b"\r\n");
assert_eq!(interpose.get_output_eos(), b"\r\n");
}
#[test]
fn test_null_termination() {
let mut interpose = EosInterpose::new(EosConfig {
input_eos: vec![b'\n'],
output_eos: vec![],
});
let mut base = MockOctetBase::new(b"hi\n");
let user = AsynUser::default();
let mut buf = [0xFFu8; 64];
let r = interpose.read(&user, &mut buf, &mut base).unwrap();
assert_eq!(r.nbytes_transferred, 2);
assert_eq!(&buf[..2], b"hi");
assert_eq!(buf[2], 0);
}
#[test]
fn test_eos_resynchronization() {
let mut interpose = EosInterpose::new(EosConfig {
input_eos: vec![b'\r', b'\n'],
output_eos: vec![],
});
let mut base = MockOctetBase::new(b"a\rb\r\n");
let user = AsynUser::default();
let mut buf = [0u8; 64];
let r = interpose.read(&user, &mut buf, &mut base).unwrap();
assert_eq!(&buf[..r.nbytes_transferred], b"a\rb");
assert!(r.eom_reason.contains(EomReason::EOS));
}
#[test]
fn test_cnt_filtering_from_lower_layer() {
struct CntBase {
chunks: Vec<Vec<u8>>,
idx: usize,
}
impl OctetNext for CntBase {
fn read(&mut self, _user: &AsynUser, buf: &mut [u8]) -> AsynResult<OctetReadResult> {
if self.idx < self.chunks.len() {
let chunk = &self.chunks[self.idx];
self.idx += 1;
let n = chunk.len().min(buf.len());
buf[..n].copy_from_slice(&chunk[..n]);
Ok(OctetReadResult {
nbytes_transferred: n,
eom_reason: EomReason::CNT,
})
} else {
Ok(OctetReadResult {
nbytes_transferred: 0,
eom_reason: EomReason::empty(),
})
}
}
fn write(&mut self, _user: &mut AsynUser, _data: &[u8]) -> AsynResult<usize> {
Ok(0)
}
fn flush(&mut self, _user: &mut AsynUser) -> AsynResult<()> {
Ok(())
}
}
let mut interpose = EosInterpose::new(EosConfig {
input_eos: vec![b'\n'],
output_eos: vec![],
});
let mut base = CntBase {
chunks: vec![b"hel".to_vec(), b"lo\n".to_vec()],
idx: 0,
};
let user = AsynUser::default();
let mut buf = [0u8; 64];
let r = interpose.read(&user, &mut buf, &mut base).unwrap();
assert_eq!(&buf[..r.nbytes_transferred], b"hello");
assert!(r.eom_reason.contains(EomReason::EOS));
assert!(!r.eom_reason.contains(EomReason::CNT));
}
#[test]
fn test_buffer_full_returns_cnt() {
let mut interpose = EosInterpose::new(EosConfig {
input_eos: vec![b'\n'],
output_eos: vec![],
});
let mut base = MockOctetBase::new(b"abcdefgh\n");
let user = AsynUser::default();
let mut buf = [0u8; 4];
let r = interpose.read(&user, &mut buf, &mut base).unwrap();
assert_eq!(r.nbytes_transferred, 4);
assert_eq!(&buf[..4], b"abcd");
assert!(r.eom_reason.contains(EomReason::CNT));
let mut buf2 = [0u8; 64];
let r = interpose.read(&user, &mut buf2, &mut base).unwrap();
assert_eq!(&buf2[..r.nbytes_transferred], b"efgh");
assert!(r.eom_reason.contains(EomReason::EOS));
}
}