use {
core::{
sync::atomic::{self, AtomicUsize},
time::Duration,
},
std::{
io::{Error, ErrorKind, Read},
sync::Arc,
thread,
time::SystemTime,
},
crate::{Bytes, IoResult, Options},
};
/// Reference-counted, shareable handle to a [`Bi`].
pub type ArcBi = Arc<Bi>;
impl From<Options> for ArcBi {
fn from(options: Options) -> Self {
Self::new(Bi::from(options))
}
}
/// Memory ordering passed to every atomic operation on the byte counter in this file.
pub (crate) const ATOMIC_ORDERING: atomic::Ordering = atomic::Ordering::Relaxed;
/// Pause between two attempts to reserve bytes while a reservation does not fit under the limit.
const DELAY: Duration = Duration::from_millis(10);
/// Produces `Bytes` values while charging their size to a shared byte counter.
///
/// Before memory is handed out, its size is added to `counter`; the addition is refused (and
/// retried for a while) as long as it would push the counter above `options.limit`.
#[derive(Debug)]
pub struct Bi {
/// Settings. This file reads `limit`, `buf_size` and `wait_timeout` from it.
options: Options,
/// Number of bytes currently reserved. A clone of this handle is passed to every `Bytes`
/// created by this type.
counter: Arc<AtomicUsize>,
}
impl Bi {
/// Reads `src` until end of stream and returns the data as `Bytes`, charging the memory used to
/// the shared counter.
///
/// A scratch buffer of `options.buf_size` bytes and an initial reservation of `capacity` bytes
/// are charged up front. Whenever the collected data outgrows what has already been charged, the
/// difference is reserved before the new chunk is stored; a reservation that does not fit under
/// `options.limit` is retried until `sleep` reports a timeout.
///
/// # Errors
///
/// - Errors propagated from `make_buf`, `reserve` or `sleep` when a reservation cannot be made.
/// - Any error returned by `src.read`, except `ErrorKind::Interrupted`, which is retried.
/// - `ErrorKind::Other` if the total length would overflow `usize`.
///
/// On every error path, everything this call charged to the counter is given back.
pub fn read<R>(&self, src: &mut R, capacity: usize) -> IoResult<Bytes> where R: Read {
    let buf_size = self.options.buf_size;

    // Gives `payload` bytes plus the scratch buffer's `buf_size` back to the counter in a single
    // atomic update. The update closure always returns `Some`, so `fetch_update` cannot fail and
    // its result carries no information.
    let release = |payload: usize| {
        let _ = self.counter.fetch_update(ATOMIC_ORDERING, ATOMIC_ORDERING, |c| {
            Some(c.saturating_sub(payload).saturating_sub(buf_size))
        });
    };

    let mut buf = self.make_buf()?;
    if let Err(err) = self.reserve(capacity) {
        // Only the scratch buffer has been charged so far.
        release(0);
        return Err(err);
    }
    let mut bytes: Vec<u8> = Vec::with_capacity(capacity);

    // Bytes charged to the counter on behalf of `bytes`. Invariant: `bytes.len().max(capacity)`.
    // The vector's real capacity is deliberately not used: it grows by amortised doubling, which
    // would make the amount charged differ from the amount released later.
    let mut reserved = capacity;

    loop {
        let size = match src.read(&mut buf) {
            Ok(0) => break,
            Ok(size) => size,
            // Per the `Read::read` contract an interrupted read is non-fatal and should be retried.
            Err(ref err) if err.kind() == ErrorKind::Interrupted => continue,
            Err(err) => {
                release(reserved);
                return Err(err);
            },
        };

        let new_len = match bytes.len().checked_add(size) {
            Some(new_len) => new_len,
            None => {
                release(reserved);
                return Err(Error::new(ErrorKind::Other, __!("Out of usize")));
            },
        };

        // Charge only the part that is not covered by what is already reserved.
        let required = new_len.saturating_sub(reserved);
        if required > 0 {
            let start_time = SystemTime::now();
            while self
                .counter
                .fetch_update(ATOMIC_ORDERING, ATOMIC_ORDERING, |c| match c.checked_add(required) {
                    Some(new) if new <= self.options.limit => Some(new),
                    _ => None,
                })
                .is_err()
            {
                if let Err(err) = self.sleep(start_time) {
                    release(reserved);
                    return Err(err);
                }
            }
            reserved = new_len;
        }

        bytes.extend_from_slice(&buf[..size]);
    }

    // The scratch buffer is gone once this function returns, so its reservation must not stay
    // charged to the counter.
    drop(buf);
    release(0);
    // NOTE(review): if fewer than `capacity` bytes were read, `reserved - bytes.len()` remains
    // charged. What `Bytes` gives back when dropped is not visible here — confirm and release
    // the slack here if needed.
    Ok(Bytes::new(bytes, self.counter.clone()))
}
/// Takes ownership of `src` and wraps it in `Bytes`, charging its length to the counter.
///
/// # Errors
///
/// If the length cannot be reserved (see `reserve`), the converted `Vec<u8>` is handed back
/// unchanged so the caller keeps the data.
pub fn r#move<B>(&self, src: B) -> Result<Bytes, Vec<u8>> where B: Into<Vec<u8>> {
    let data: Vec<u8> = src.into();
    if self.reserve(data.len()).is_err() {
        return Err(data);
    }
    Ok(Bytes::new(data, self.counter.clone()))
}
/// Adds `capacity` bytes to the counter, waiting while the result would exceed `options.limit`.
///
/// A request for zero bytes succeeds immediately without touching the counter. Otherwise the
/// addition is attempted repeatedly, calling `sleep` between attempts.
///
/// Returns `capacity` on success.
///
/// # Errors
///
/// Propagates the error from `sleep` once waiting is no longer allowed.
fn reserve(&self, capacity: usize) -> IoResult<usize> {
    if capacity > 0 {
        let started_at = SystemTime::now();
        while self
            .counter
            .fetch_update(ATOMIC_ORDERING, ATOMIC_ORDERING, |in_use| {
                // Refuse on arithmetic overflow as well as on exceeding the limit.
                in_use.checked_add(capacity).filter(|total| *total <= self.options.limit)
            })
            .is_err()
        {
            self.sleep(started_at)?;
        }
    }
    Ok(capacity)
}
/// Reserves `options.buf_size` bytes on the counter and returns a zero-filled buffer of that length.
///
/// The reservation stays charged to the counter after this returns; this function does not
/// release it.
///
/// # Errors
///
/// Propagates the error from `reserve` when the bytes cannot be reserved.
fn make_buf(&self) -> IoResult<Vec<u8>> {
    let buf_size = self.options.buf_size;
    self.reserve(buf_size)?;
    // `vec!` yields the same length, contents and capacity as filling the buffer chunk by chunk.
    Ok(vec![0; buf_size])
}
/// Pauses for `DELAY` if less than `options.wait_timeout` has passed since `start_time`.
///
/// # Errors
///
/// Returns `ErrorKind::TimedOut` once the elapsed time has reached `options.wait_timeout`.
fn sleep(&self, start_time: SystemTime) -> IoResult<()> {
    // `SystemTime` is not monotonic: if the system clock was set back after `start_time` was
    // taken, `duration_since` fails. That says nothing about the reservation being waited for,
    // so treat it as "no time elapsed" and keep waiting instead of aborting with an error.
    let elapsed = SystemTime::now().duration_since(start_time).unwrap_or_default();
    if elapsed < self.options.wait_timeout {
        thread::sleep(DELAY);
        Ok(())
    } else {
        Err(Error::new(ErrorKind::TimedOut, "Timed out"))
    }
}
}
/// Creates a [`Bi`] that uses the given options and starts with nothing reserved.
impl From<Options> for Bi {
    fn from(options: Options) -> Self {
        // Fresh counter: zero bytes are charged initially.
        let counter = Arc::new(AtomicUsize::new(0));
        Self { options, counter }
    }
}