use bytes::{Bytes, BytesMut};
use std::io::{Read, Result, Seek, SeekFrom};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use tokio_util::sync::CancellationToken;
use super::AppendableDataWrapper;
/// Thread-safe, chunked byte accumulator shared between a producer and
/// readers (see `MVecBytesReader`).
///
/// Incoming bytes are staged in `current_chunk` until it reaches
/// `chunk_size`, at which point the chunk is frozen and pushed into the
/// shared `data` vector.
#[derive(Debug, Clone)]
pub struct MVecBytesWrapper {
    // Published chunks. Every chunk holds exactly `chunk_size` bytes,
    // except possibly the last one pushed by `complete`, which may be shorter.
    data: Arc<Mutex<Vec<Bytes>>>,
    // Set once by `complete`; further `append_data` calls are ignored.
    completed: Arc<AtomicBool>,
    // Size in bytes of a full chunk in `data`.
    chunk_size: usize,
    // Staging buffer for the not-yet-full trailing chunk.
    current_chunk: BytesMut,
}
impl MVecBytesWrapper {
    /// Creates an empty wrapper that will publish chunks of `chunk_size` bytes.
    pub fn new(chunk_size: usize) -> Self {
        Self {
            data: Arc::new(Mutex::new(Vec::new())),
            completed: Arc::new(AtomicBool::new(false)),
            current_chunk: BytesMut::with_capacity(chunk_size),
            chunk_size,
        }
    }

    /// Shared handle to the vector of published chunks.
    pub fn data(&self) -> Arc<Mutex<Vec<Bytes>>> {
        Arc::clone(&self.data)
    }

    /// Shared flag that becomes true once `complete` has run.
    pub fn completed(&self) -> Arc<AtomicBool> {
        Arc::clone(&self.completed)
    }

    /// Size in bytes of a full chunk.
    pub fn chunk_size(&self) -> usize {
        self.chunk_size
    }
}
impl AppendableDataWrapper for MVecBytesWrapper {
    /// Appends `slice`, publishing full `chunk_size`-byte chunks into the
    /// shared `data` vector as they fill up. Appends after `complete` are
    /// silently ignored.
    fn append_data(&mut self, slice: &[u8]) {
        if self.completed.load(Ordering::SeqCst) {
            return;
        }
        let current_chunk_len = self.current_chunk.len();
        if current_chunk_len + slice.len() <= self.chunk_size {
            // Everything fits into the staging chunk.
            self.current_chunk.extend_from_slice(slice);
            if self.current_chunk.len() == self.chunk_size {
                // Staging chunk is full: publish it and start a fresh one.
                // `mem::replace(..).freeze()` hands the buffer over without
                // copying (the original `clone().freeze()` deep-copied the
                // whole chunk).
                let full = std::mem::replace(
                    &mut self.current_chunk,
                    BytesMut::with_capacity(self.chunk_size),
                );
                self.data.lock().unwrap().push(full.freeze());
            }
        } else {
            // The slice crosses at least one chunk boundary; collect the
            // finished chunks locally and publish them under a single lock.
            let mut append_data: Vec<Bytes> = Vec::new();
            let mut offset = 0;
            if current_chunk_len != 0 {
                // Top up the partially filled staging chunk first.
                // `slice.len() > chunk_size - current_chunk_len` holds here,
                // so this slice index is in bounds.
                let first_part_len = self.chunk_size - current_chunk_len;
                self.current_chunk.extend_from_slice(&slice[..first_part_len]);
                let full = std::mem::replace(
                    &mut self.current_chunk,
                    BytesMut::with_capacity(self.chunk_size),
                );
                append_data.push(full.freeze());
                offset += first_part_len;
            }
            // Copy out every remaining full chunk.
            while offset + self.chunk_size <= slice.len() {
                append_data.push(Bytes::copy_from_slice(
                    &slice[offset..offset + self.chunk_size],
                ));
                offset += self.chunk_size;
            }
            // Stage the trailing partial chunk, if any. (When there is none,
            // `current_chunk` is already empty with `chunk_size` capacity, so
            // the original's extra reassignment was redundant.)
            if offset < slice.len() {
                self.current_chunk.extend_from_slice(&slice[offset..]);
            }
            self.data.lock().unwrap().append(&mut append_data);
        }
    }

    /// Publishes the trailing partial chunk (which may be shorter than
    /// `chunk_size`) and marks the stream complete.
    fn complete(&mut self) {
        if !self.current_chunk.is_empty() {
            // `mem::take` leaves an empty `BytesMut` behind (same as the
            // original's `BytesMut::new()`) without copying the data.
            let last = std::mem::take(&mut self.current_chunk);
            self.data.lock().unwrap().push(last.freeze());
        }
        self.completed.store(true, Ordering::SeqCst);
    }

    /// Pre-reserves space in the chunk vector for a payload of `capacity`
    /// bytes (presumably the expected total size — TODO confirm with callers).
    ///
    /// The original computed `(capacity - len) / chunk_size + 1`, which mixes
    /// units (bytes minus a chunk count) and panics on usize underflow when
    /// `capacity < len`; this converts to chunks first and saturates.
    fn set_capacity(&mut self, capacity: usize) {
        let mut data = self.data.lock().unwrap();
        let total_chunks = capacity / self.chunk_size + 1;
        let additional = total_chunks.saturating_sub(data.len());
        data.reserve_exact(additional);
    }
}
/// Blocking `Read`/`Seek` adapter over the chunk vector produced by a
/// `MVecBytesWrapper`. `read` waits on `condvar` until the requested
/// position is covered by published chunks, the download completes, or the
/// cancellation token fires.
pub struct MVecBytesReader {
    // Chunks shared with the producing `MVecBytesWrapper`.
    data: Arc<Mutex<Vec<Bytes>>>,
    // Size in bytes of a full chunk in `data`.
    chunk_size: usize,
    // Waited on in `read`'s loop; the notifying side is outside this file —
    // presumably signalled by the producer after pushing chunks (confirm).
    condvar: Arc<Condvar>,
    // Current absolute byte position in the logical stream.
    pos: u64,
    // Mirrors the wrapper's `completed` flag; true means no more data will come.
    download_completed: Arc<AtomicBool>,
    // Checked in `read`'s wait loop; a cancelled token makes a blocked read
    // return 0 (EOF-like) once the condvar wakes.
    cancellation_token: CancellationToken,
}
impl MVecBytesReader {
    /// Builds a reader over the wrapper's shared state, starting at byte 0.
    /// The caller supplies the condvar the producer notifies on new data.
    pub fn new(wrapper: MVecBytesWrapper, condvar: Arc<Condvar>) -> Self {
        let data = wrapper.data();
        let chunk_size = wrapper.chunk_size();
        let download_completed = wrapper.completed();
        Self {
            data,
            chunk_size,
            condvar,
            pos: 0,
            download_completed,
            cancellation_token: CancellationToken::new(),
        }
    }

    /// Clone of the token that can abort a blocked `read`.
    pub fn cancellation_token(&self) -> CancellationToken {
        self.cancellation_token.clone()
    }
}
impl Read for MVecBytesReader {
    /// Reads up to `buf.len()` bytes starting at `self.pos`.
    ///
    /// Blocks on the condvar while `pos` is beyond the data produced so far,
    /// unless the download has completed or the cancellation token fired —
    /// either of those makes a blocked read return `Ok(0)` (EOF).
    ///
    /// Fix over the original: the final chunk may be shorter than
    /// `chunk_size` (it is pushed as-is by `complete`), and `seek` can move
    /// `pos` past the end of the data. The original indexed chunks with
    /// unchecked full-chunk arithmetic and could panic (slice out of range /
    /// usize underflow) in both situations. This version bounds every copy by
    /// the actual chunk lengths and simply returns `Ok(0)` at or past EOF.
    fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
        if buf.is_empty() {
            return Ok(0);
        }
        let lock = &*self.data;
        let mut data = lock.lock().unwrap();
        // Wait until `pos` falls inside the chunks produced so far. The
        // `data.len() * chunk_size` bound over-counts a short final chunk;
        // the copy loop below compensates by stopping at real chunk ends.
        while self.pos as usize >= data.len() * self.chunk_size {
            if self.download_completed.load(Ordering::Acquire) {
                return Ok(0);
            }
            if self.cancellation_token.is_cancelled() {
                return Ok(0);
            }
            data = self.condvar.wait(data).unwrap();
        }
        let pos = self.pos as usize;
        let first_idx = pos / self.chunk_size;
        // Last chunk index the read could touch, clamped to what exists.
        // `first_idx < data.len()` is guaranteed by the wait loop, so
        // `data.len() - 1` cannot underflow and the range below is valid.
        let last_idx = ((pos + buf.len() - 1) / self.chunk_size).min(data.len() - 1);
        // `Bytes` clones are cheap refcount bumps; snapshot the span so the
        // lock is not held during the byte copy (same pattern as before).
        let chunks: Vec<Bytes> = data[first_idx..=last_idx].to_vec();
        drop(data);
        let mut copied = 0;
        let mut offset_in_chunk = pos % self.chunk_size;
        for chunk in chunks {
            if offset_in_chunk >= chunk.len() {
                // `pos` sits in the slack of a short final chunk (reachable
                // only by seeking past the end): nothing left to read.
                break;
            }
            let n = (chunk.len() - offset_in_chunk).min(buf.len() - copied);
            buf[copied..copied + n]
                .copy_from_slice(&chunk[offset_in_chunk..offset_in_chunk + n]);
            copied += n;
            if copied == buf.len() {
                break;
            }
            // Only the final chunk may be short, so subsequent chunks always
            // continue from offset 0.
            offset_in_chunk = 0;
        }
        self.pos += copied as u64;
        Ok(copied)
    }
}
impl Seek for MVecBytesReader {
    /// Moves the read position.
    ///
    /// `SeekFrom::End` is unsupported (the total stream length is unknown
    /// until the download completes). A `SeekFrom::Current` offset that would
    /// land before byte 0 or overflow now returns `InvalidInput` — the
    /// original's `(self.pos as i64 + off) as u64` silently wrapped a
    /// negative result into a huge position.
    fn seek(&mut self, pos: SeekFrom) -> Result<u64> {
        let new_pos = match pos {
            SeekFrom::Start(p) => p,
            SeekFrom::Current(off) => self.pos.checked_add_signed(off).ok_or_else(|| {
                std::io::Error::new(
                    std::io::ErrorKind::InvalidInput,
                    "seek to a negative or overflowing position",
                )
            })?,
            SeekFrom::End(_) => {
                return Err(std::io::Error::new(
                    std::io::ErrorKind::Unsupported,
                    "SeekFrom::End not supported",
                ));
            }
        };
        self.pos = new_pos;
        Ok(self.pos)
    }
}