pub mod delay;
pub mod echo;
pub mod eos;
pub mod flush;
use bitflags::bitflags;
use crate::error::AsynResult;
use crate::user::AsynUser;
bitflags! {
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct EomReason: u32 {
const CNT = 0x01;
const EOS = 0x02;
const END = 0x04;
}
}
#[derive(Debug, Clone)]
pub struct OctetReadResult {
pub nbytes_transferred: usize,
pub eom_reason: EomReason,
}
pub trait OctetNext: Send + Sync {
fn read(&mut self, user: &AsynUser, buf: &mut [u8]) -> AsynResult<OctetReadResult>;
fn write(&mut self, user: &mut AsynUser, data: &[u8]) -> AsynResult<usize>;
fn flush(&mut self, user: &mut AsynUser) -> AsynResult<()>;
}
pub trait OctetInterpose: Send + Sync {
fn read(
&mut self,
user: &AsynUser,
buf: &mut [u8],
next: &mut dyn OctetNext,
) -> AsynResult<OctetReadResult>;
fn write(
&mut self,
user: &mut AsynUser,
data: &[u8],
next: &mut dyn OctetNext,
) -> AsynResult<usize>;
fn flush(
&mut self,
user: &mut AsynUser,
next: &mut dyn OctetNext,
) -> AsynResult<()>;
}
pub struct OctetInterposeStack {
layers: Vec<Box<dyn OctetInterpose>>,
}
impl OctetInterposeStack {
pub fn new() -> Self {
Self {
layers: Vec::new(),
}
}
pub fn push(&mut self, layer: Box<dyn OctetInterpose>) {
self.layers.push(layer);
}
pub fn len(&self) -> usize {
self.layers.len()
}
pub fn is_empty(&self) -> bool {
self.layers.is_empty()
}
pub fn dispatch_read(
&mut self,
user: &AsynUser,
buf: &mut [u8],
base: &mut dyn OctetNext,
) -> AsynResult<OctetReadResult> {
if self.layers.is_empty() {
return base.read(user, buf);
}
let mut chain = InterposeChain {
layers: &mut self.layers,
base,
};
chain.read(user, buf)
}
pub fn dispatch_write(
&mut self,
user: &mut AsynUser,
data: &[u8],
base: &mut dyn OctetNext,
) -> AsynResult<usize> {
if self.layers.is_empty() {
return base.write(user, data);
}
let mut chain = InterposeChain {
layers: &mut self.layers,
base,
};
chain.write(user, data)
}
pub fn dispatch_flush(
&mut self,
user: &mut AsynUser,
base: &mut dyn OctetNext,
) -> AsynResult<()> {
if self.layers.is_empty() {
return base.flush(user);
}
let mut chain = InterposeChain {
layers: &mut self.layers,
base,
};
chain.flush(user)
}
}
impl Default for OctetInterposeStack {
fn default() -> Self {
Self::new()
}
}
struct InterposeChain<'a> {
layers: &'a mut [Box<dyn OctetInterpose>],
base: &'a mut dyn OctetNext,
}
impl OctetNext for InterposeChain<'_> {
fn read(&mut self, user: &AsynUser, buf: &mut [u8]) -> AsynResult<OctetReadResult> {
if let Some((first, rest)) = self.layers.split_first_mut() {
let mut next = InterposeChain {
layers: rest,
base: self.base,
};
first.read(user, buf, &mut next)
} else {
self.base.read(user, buf)
}
}
fn write(&mut self, user: &mut AsynUser, data: &[u8]) -> AsynResult<usize> {
if let Some((first, rest)) = self.layers.split_first_mut() {
let mut next = InterposeChain {
layers: rest,
base: self.base,
};
first.write(user, data, &mut next)
} else {
self.base.write(user, data)
}
}
fn flush(&mut self, user: &mut AsynUser) -> AsynResult<()> {
if let Some((first, rest)) = self.layers.split_first_mut() {
let mut next = InterposeChain {
layers: rest,
base: self.base,
};
first.flush(user, &mut next)
} else {
self.base.flush(user)
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::user::AsynUser;
struct MockBase {
read_data: Vec<u8>,
written: Vec<u8>,
flushed: bool,
}
impl MockBase {
fn new(data: &[u8]) -> Self {
Self {
read_data: data.to_vec(),
written: Vec::new(),
flushed: false,
}
}
}
impl OctetNext for MockBase {
fn read(&mut self, _user: &AsynUser, buf: &mut [u8]) -> AsynResult<OctetReadResult> {
let n = self.read_data.len().min(buf.len());
buf[..n].copy_from_slice(&self.read_data[..n]);
Ok(OctetReadResult {
nbytes_transferred: n,
eom_reason: EomReason::CNT,
})
}
fn write(&mut self, _user: &mut AsynUser, data: &[u8]) -> AsynResult<usize> {
self.written.extend_from_slice(data);
Ok(data.len())
}
fn flush(&mut self, _user: &mut AsynUser) -> AsynResult<()> {
self.flushed = true;
Ok(())
}
}
struct PassthroughInterpose;
impl OctetInterpose for PassthroughInterpose {
fn read(
&mut self,
user: &AsynUser,
buf: &mut [u8],
next: &mut dyn OctetNext,
) -> AsynResult<OctetReadResult> {
next.read(user, buf)
}
fn write(
&mut self,
user: &mut AsynUser,
data: &[u8],
next: &mut dyn OctetNext,
) -> AsynResult<usize> {
next.write(user, data)
}
fn flush(
&mut self,
user: &mut AsynUser,
next: &mut dyn OctetNext,
) -> AsynResult<()> {
next.flush(user)
}
}
struct UppercaseInterpose;
impl OctetInterpose for UppercaseInterpose {
fn read(
&mut self,
user: &AsynUser,
buf: &mut [u8],
next: &mut dyn OctetNext,
) -> AsynResult<OctetReadResult> {
next.read(user, buf)
}
fn write(
&mut self,
user: &mut AsynUser,
data: &[u8],
next: &mut dyn OctetNext,
) -> AsynResult<usize> {
let upper: Vec<u8> = data.iter().map(|b| b.to_ascii_uppercase()).collect();
next.write(user, &upper)
}
fn flush(
&mut self,
user: &mut AsynUser,
next: &mut dyn OctetNext,
) -> AsynResult<()> {
next.flush(user)
}
}
#[test]
fn test_empty_stack_passthrough() {
let mut stack = OctetInterposeStack::new();
let mut base = MockBase::new(b"hello");
let user = AsynUser::default();
let mut buf = [0u8; 32];
let result = stack.dispatch_read(&user, &mut buf, &mut base).unwrap();
assert_eq!(result.nbytes_transferred, 5);
assert_eq!(&buf[..5], b"hello");
}
#[test]
fn test_single_passthrough_layer() {
let mut stack = OctetInterposeStack::new();
stack.push(Box::new(PassthroughInterpose));
let mut base = MockBase::new(b"world");
let user = AsynUser::default();
let mut buf = [0u8; 32];
let result = stack.dispatch_read(&user, &mut buf, &mut base).unwrap();
assert_eq!(result.nbytes_transferred, 5);
assert_eq!(&buf[..5], b"world");
}
#[test]
fn test_uppercase_interpose_write() {
let mut stack = OctetInterposeStack::new();
stack.push(Box::new(UppercaseInterpose));
let mut base = MockBase::new(b"");
let mut user = AsynUser::default();
let n = stack.dispatch_write(&mut user, b"hello", &mut base).unwrap();
assert_eq!(n, 5);
assert_eq!(&base.written, b"HELLO");
}
#[test]
fn test_multi_layer_chain() {
let mut stack = OctetInterposeStack::new();
stack.push(Box::new(PassthroughInterpose));
stack.push(Box::new(UppercaseInterpose));
assert_eq!(stack.len(), 2);
let mut base = MockBase::new(b"");
let mut user = AsynUser::default();
stack.dispatch_write(&mut user, b"test", &mut base).unwrap();
assert_eq!(&base.written, b"TEST");
}
#[test]
fn test_flush_dispatch() {
let mut stack = OctetInterposeStack::new();
stack.push(Box::new(PassthroughInterpose));
let mut base = MockBase::new(b"");
let mut user = AsynUser::default();
stack.dispatch_flush(&mut user, &mut base).unwrap();
assert!(base.flushed);
}
}