use std::io::{Read, Result, Seek, SeekFrom};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use tokio_util::sync::CancellationToken;
use crate::reader::AppendableDataWrapper;
#[derive(Debug, Clone)]
pub struct MVecU8Wrapper {
data: Arc<Mutex<Vec<u8>>>,
completed: Arc<AtomicBool>,
}
impl MVecU8Wrapper {
pub fn new() -> Self {
Self {
data: Arc::new(Mutex::new(Vec::new())),
completed: Arc::new(AtomicBool::new(false)),
}
}
pub fn data(&self) -> Arc<Mutex<Vec<u8>>> {
self.data.clone()
}
pub fn completed(&self) -> Arc<AtomicBool> {
self.completed.clone()
}
}
impl AppendableDataWrapper for MVecU8Wrapper {
fn append_data(&mut self, slice: &[u8]) {
let mut data_lock = self.data.lock().unwrap();
data_lock.extend_from_slice(slice);
}
fn complete(&mut self) {
self.completed.store(true, Ordering::SeqCst);
}
fn set_capacity(&mut self, capacity: usize) {
let mut data = self.data.lock().unwrap();
let len = data.len();
data.reserve_exact(capacity - len);
}
}
pub struct MVecU8Reader {
data: Arc<Mutex<Vec<u8>>>,
condvar: Arc<Condvar>,
pos: u64,
download_completed: Arc<AtomicBool>,
cancellation_token: CancellationToken,
}
impl MVecU8Reader {
pub fn new(wrapper: MVecU8Wrapper, condvar: Arc<Condvar>) -> Self {
Self {
data: wrapper.data(),
condvar,
pos: 0,
download_completed: wrapper.completed(),
cancellation_token: CancellationToken::new(),
}
}
pub fn cancellation_token(&self) -> CancellationToken {
self.cancellation_token.clone()
}
}
impl Read for MVecU8Reader {
fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
let lock = &*self.data;
let mut data = lock.lock().unwrap();
while self.pos as usize >= data.len() {
if self.download_completed.load(Ordering::Acquire) {
return Ok(0);
}
if self.cancellation_token.is_cancelled() {
return Ok(0);
}
data = self.condvar.wait(data).unwrap();
}
let available = &data[self.pos as usize..];
let len = available.len().min(buf.len());
buf[..len].copy_from_slice(&available[..len]);
self.pos += len as u64;
Ok(len)
}
}
impl Seek for MVecU8Reader {
fn seek(&mut self, pos: SeekFrom) -> Result<u64> {
let new_pos = match pos {
SeekFrom::Start(p) => p,
SeekFrom::Current(off) => (self.pos as i64 + off) as u64,
SeekFrom::End(_) => {
return Err(std::io::Error::new(
std::io::ErrorKind::Unsupported,
"SeekFrom::End not supported",
));
}
};
self.pos = new_pos;
Ok(self.pos)
}
}