use std::time::{Duration, Instant};
use crate::error::AsynResult;
use crate::user::AsynUser;
use super::{OctetInterpose, OctetNext, OctetReadResult};
/// Octet interpose layer that discards pending input before every read.
///
/// The draining itself is done by this type's `OctetInterpose::read`
/// implementation; the fields below only configure it.
#[derive(Debug, Clone)]
pub struct FlushTimeoutInterpose {
    /// Time budget for the drain phase that precedes a read. The same value
    /// is also handed to the lower layer as the timeout of each drain read.
    pub flush_timeout: Duration,
    /// Size in bytes of the scratch buffer that drained data is read into.
    pub flush_buffer_size: usize,
}
impl FlushTimeoutInterpose {
    /// Builds an interpose that uses `flush_timeout` for draining and a
    /// 512-byte scratch buffer.
    ///
    /// Both fields are public, so the buffer size can be adjusted after
    /// construction.
    pub fn new(flush_timeout: Duration) -> Self {
        // Scratch-buffer size used unless the caller overrides the field.
        const DEFAULT_FLUSH_BUFFER_SIZE: usize = 512;

        let flush_buffer_size = DEFAULT_FLUSH_BUFFER_SIZE;
        Self {
            flush_timeout,
            flush_buffer_size,
        }
    }
}
impl Default for FlushTimeoutInterpose {
    /// Equivalent to `FlushTimeoutInterpose::new` with a 50 ms flush timeout.
    fn default() -> Self {
        // Timeout applied when the caller does not choose one.
        const DEFAULT_FLUSH_TIMEOUT: Duration = Duration::from_millis(50);

        Self::new(DEFAULT_FLUSH_TIMEOUT)
    }
}
/// Drains pending input ahead of every read; writes and flushes are forwarded
/// to the next layer untouched.
impl OctetInterpose for FlushTimeoutInterpose {
    /// Discards pending input from `next`, then performs the caller's read.
    ///
    /// Drain phase: `next` is read repeatedly into a scratch buffer of
    /// `flush_buffer_size` bytes, using a temporary user that copies `reason`
    /// and `addr` from `user` and carries `flush_timeout` as its timeout. The
    /// phase ends as soon as one of these holds:
    ///
    /// * a drain read transfers zero bytes,
    /// * a drain read returns an error (the error is dropped on purpose —
    ///   draining is best effort),
    /// * the deadline `now + flush_timeout` has passed, checked before each
    ///   drain read.
    ///
    /// A `flush_timeout` of zero therefore skips draining altogether. If the
    /// deadline cannot be represented as an `Instant` (for example
    /// `Duration::MAX`), no deadline is enforced and only the first two
    /// conditions end the drain.
    ///
    /// # Errors
    ///
    /// Returns whatever the final `next.read(user, buf)` returns; failures
    /// during the drain phase are never reported.
    fn read(
        &mut self,
        user: &AsynUser,
        buf: &mut [u8],
        next: &mut dyn OctetNext,
    ) -> AsynResult<OctetReadResult> {
        // `Instant + Duration` panics when the sum overflows the platform
        // representation; `checked_add` yields `None` instead, which is
        // treated as "no deadline".
        let deadline = Instant::now().checked_add(self.flush_timeout);
        let mut discard = vec![0u8; self.flush_buffer_size];
        let flush_user = AsynUser::new(user.reason)
            .with_addr(user.addr)
            .with_timeout(self.flush_timeout);

        loop {
            if matches!(deadline, Some(limit) if Instant::now() >= limit) {
                break;
            }
            match next.read(&flush_user, &mut discard) {
                // Stale bytes were thrown away; look for more.
                Ok(result) if result.nbytes_transferred > 0 => {}
                // Nothing left to drain, or the lower layer failed.
                _ => break,
            }
        }

        next.read(user, buf)
    }

    /// Forwards the write to `next` and returns its result unchanged.
    fn write(
        &mut self,
        user: &mut AsynUser,
        data: &[u8],
        next: &mut dyn OctetNext,
    ) -> AsynResult<usize> {
        next.write(user, data)
    }

    /// Forwards the flush request to `next` and returns its result unchanged.
    fn flush(
        &mut self,
        user: &mut AsynUser,
        next: &mut dyn OctetNext,
    ) -> AsynResult<()> {
        next.flush(user)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::interpose::EomReason;

    /// Scripted lower layer used by the tests.
    ///
    /// Reads follow a fixed script: the first `reads_returning_data` calls
    /// yield `b"stale"`, the next call yields zero bytes, and every later call
    /// yields `b"real"`. Writes and flushes are recorded so the tests can
    /// prove they reached this layer.
    struct CountingBase {
        read_count: usize,
        reads_returning_data: usize,
        written: Vec<u8>,
        flush_count: usize,
    }

    impl CountingBase {
        fn new(reads_returning_data: usize) -> Self {
            Self {
                read_count: 0,
                reads_returning_data,
                written: Vec::new(),
                flush_count: 0,
            }
        }

        /// Copies as much of `msg` as fits into `buf` and reports the count.
        fn reply(buf: &mut [u8], msg: &[u8]) -> OctetReadResult {
            let len = msg.len().min(buf.len());
            buf[..len].copy_from_slice(&msg[..len]);
            OctetReadResult {
                nbytes_transferred: len,
                eom_reason: EomReason::CNT,
            }
        }
    }

    impl OctetNext for CountingBase {
        fn read(&mut self, _user: &AsynUser, buf: &mut [u8]) -> AsynResult<OctetReadResult> {
            let n = self.read_count;
            self.read_count += 1;
            let msg: &[u8] = if n < self.reads_returning_data {
                b"stale"
            } else if n == self.reads_returning_data {
                // An empty read tells the interpose that the drain is done.
                b""
            } else {
                b"real"
            };
            Ok(Self::reply(buf, msg))
        }

        fn write(&mut self, _user: &mut AsynUser, data: &[u8]) -> AsynResult<usize> {
            self.written.extend_from_slice(data);
            Ok(data.len())
        }

        fn flush(&mut self, _user: &mut AsynUser) -> AsynResult<()> {
            self.flush_count += 1;
            Ok(())
        }
    }

    #[test]
    fn test_flush_discards_stale_data() {
        // The mock never blocks, so a generous deadline costs nothing and
        // keeps the test independent of scheduler jitter.
        let mut interpose = FlushTimeoutInterpose::new(Duration::from_secs(5));
        let mut base = CountingBase::new(2);
        let user = AsynUser::default();
        let mut buf = [0u8; 32];
        let result = interpose.read(&user, &mut buf, &mut base).unwrap();
        assert_eq!(&buf[..result.nbytes_transferred], b"real");
        // Two stale reads, one empty read that ends the drain, one real read.
        assert_eq!(base.read_count, 4);
    }

    #[test]
    fn test_read_without_stale_data() {
        let mut interpose = FlushTimeoutInterpose::new(Duration::from_secs(5));
        let mut base = CountingBase::new(0);
        let user = AsynUser::default();
        let mut buf = [0u8; 32];
        let result = interpose.read(&user, &mut buf, &mut base).unwrap();
        assert_eq!(&buf[..result.nbytes_transferred], b"real");
        // One empty drain read followed by the real read.
        assert_eq!(base.read_count, 2);
    }

    #[test]
    fn test_zero_timeout_skips_drain() {
        let mut interpose = FlushTimeoutInterpose::new(Duration::from_secs(0));
        let mut base = CountingBase::new(2);
        let user = AsynUser::default();
        let mut buf = [0u8; 32];
        let result = interpose.read(&user, &mut buf, &mut base).unwrap();
        // The deadline has already passed, so the first scripted reply is
        // delivered to the caller instead of being discarded.
        assert_eq!(&buf[..result.nbytes_transferred], b"stale");
        assert_eq!(base.read_count, 1);
    }

    #[test]
    fn test_write_passthrough() {
        let mut interpose = FlushTimeoutInterpose::default();
        let mut base = CountingBase::new(0);
        let mut user = AsynUser::default();
        let n = interpose.write(&mut user, b"hello", &mut base).unwrap();
        assert_eq!(n, 5);
        assert_eq!(&base.written[..], b"hello");
        // Writing must not trigger any draining reads.
        assert_eq!(base.read_count, 0);
    }

    #[test]
    fn test_flush_passthrough() {
        let mut interpose = FlushTimeoutInterpose::default();
        let mut base = CountingBase::new(0);
        let mut user = AsynUser::default();
        interpose.flush(&mut user, &mut base).unwrap();
        assert_eq!(base.flush_count, 1);
        assert_eq!(base.read_count, 0);
    }
}