use crate::error::{Result, StreamingError};
use bytes::{Bytes, BytesMut};
use std::collections::VecDeque;
use std::sync::Arc;
use tokio::sync::RwLock;
use tracing::{debug, warn};
/// Metadata describing a single chunk's position within a larger payload.
#[derive(Debug, Clone)]
pub struct ChunkDescriptor {
    pub offset: u64,
    pub length: usize,
    pub index: usize,
    pub total_chunks: usize,
    pub is_last: bool,
}

impl ChunkDescriptor {
    /// Builds a descriptor; `is_last` is derived from `index` and
    /// `total_chunks` rather than supplied by the caller.
    pub fn new(offset: u64, length: usize, index: usize, total_chunks: usize) -> Self {
        let is_last = index + 1 == total_chunks;
        Self {
            offset,
            length,
            index,
            total_chunks,
            is_last,
        }
    }

    /// Exclusive end offset of this chunk within the payload.
    pub fn end_offset(&self) -> u64 {
        self.offset + self.length as u64
    }
}
/// Thread-safe, FIFO buffer of in-order chunks. The handle is cheap to share:
/// state lives behind an `Arc<RwLock<..>>`, so clones observe the same buffer.
pub struct ChunkedBuffer {
    // Mutable state; push/pop/clear take the write lock, accessors the read lock.
    inner: Arc<RwLock<ChunkedBufferInner>>,
    // Nominal chunk size in bytes (the final chunk of a payload may be shorter).
    chunk_size: usize,
    // Byte cap: push returns BufferFull rather than exceed this.
    max_size: usize,
}
/// Lock-protected state of a `ChunkedBuffer`.
struct ChunkedBufferInner {
    // Buffered chunks in index order: pushed at the back, popped from the front.
    chunks: VecDeque<BufferedChunk>,
    // Total bytes currently held across all buffered chunks.
    current_size: usize,
    // Count of chunks popped so far; reported as `chunks_read` in stats.
    next_read_index: usize,
    // Index the next push must carry; out-of-order pushes are rejected.
    next_write_index: usize,
    // Total chunk count, once learned from a chunk whose `is_last` is set.
    total_chunks: Option<usize>,
    // True once the final chunk has been pushed.
    write_complete: bool,
}
/// A chunk's metadata paired with its payload while it sits in the buffer.
struct BufferedChunk {
    descriptor: ChunkDescriptor,
    data: Bytes,
}
impl ChunkedBuffer {
    /// Creates a buffer that accepts `chunk_size`-byte chunks (the final
    /// chunk of a payload may be shorter) and holds at most `max_size` bytes.
    ///
    /// # Panics
    /// Panics if `chunk_size` is zero, since chunk arithmetic would otherwise
    /// divide by zero in `calculate_chunks`.
    pub fn new(chunk_size: usize, max_size: usize) -> Self {
        assert!(chunk_size > 0, "chunk_size must be non-zero");
        Self {
            inner: Arc::new(RwLock::new(ChunkedBufferInner {
                chunks: VecDeque::new(),
                current_size: 0,
                next_read_index: 0,
                next_write_index: 0,
                total_chunks: None,
                write_complete: false,
            })),
            chunk_size,
            max_size,
        }
    }

    /// Convenience constructor: 1 MiB chunks, 100 MiB maximum buffer size.
    pub fn with_defaults() -> Self {
        Self::new(1024 * 1024, 100 * 1024 * 1024)
    }

    /// Number of chunks needed to cover `total_size` bytes (ceiling division).
    pub fn calculate_chunks(&self, total_size: u64) -> usize {
        total_size.div_ceil(self.chunk_size as u64) as usize
    }

    /// Builds the descriptor for chunk `index` of a payload of `total_size`
    /// bytes; the final chunk's length is clamped to the remaining bytes.
    pub fn descriptor_for_index(&self, index: usize, total_size: u64) -> ChunkDescriptor {
        let total_chunks = self.calculate_chunks(total_size);
        let offset = (index as u64) * (self.chunk_size as u64);
        let remaining = total_size.saturating_sub(offset);
        let length = remaining.min(self.chunk_size as u64) as usize;
        ChunkDescriptor::new(offset, length, index, total_chunks)
    }

    /// Appends the next chunk in sequence.
    ///
    /// # Errors
    /// - `InvalidOperation` if `descriptor.length` disagrees with `data.len()`,
    ///   or if `descriptor.index` is not the next expected write index.
    /// - `BufferFull` if accepting the chunk would exceed `max_size`.
    pub async fn push(&self, descriptor: ChunkDescriptor, data: Bytes) -> Result<()> {
        let mut inner = self.inner.write().await;
        // The descriptor must describe exactly the payload handed to us;
        // otherwise the admission check (data.len()) and the size accounting
        // (descriptor.length) would drift out of sync.
        if descriptor.length != data.len() {
            return Err(StreamingError::InvalidOperation(format!(
                "Descriptor length {} does not match data length {}",
                descriptor.length,
                data.len()
            )));
        }
        if inner.current_size + data.len() > self.max_size {
            return Err(StreamingError::BufferFull);
        }
        if descriptor.index != inner.next_write_index {
            return Err(StreamingError::InvalidOperation(format!(
                "Expected chunk {}, got {}",
                inner.next_write_index, descriptor.index
            )));
        }
        inner.chunks.push_back(BufferedChunk {
            descriptor: descriptor.clone(),
            data,
        });
        inner.current_size += descriptor.length;
        inner.next_write_index += 1;
        // Record completion either when the known total is reached or when a
        // chunk self-identifies as the last one.
        if let Some(total) = inner.total_chunks {
            if descriptor.index + 1 == total {
                inner.write_complete = true;
            }
        } else if descriptor.is_last {
            inner.total_chunks = Some(descriptor.total_chunks);
            inner.write_complete = true;
        }
        debug!(
            "Pushed chunk {} ({} bytes), buffer size: {}",
            descriptor.index, descriptor.length, inner.current_size
        );
        Ok(())
    }

    /// Removes and returns the oldest buffered chunk.
    ///
    /// Returns `Ok(None)` only once the buffer is drained AND the writer has
    /// pushed the final chunk.
    ///
    /// # Errors
    /// Returns an error when the buffer is empty but more chunks are still
    /// expected (callers may retry after the writer catches up).
    pub async fn pop(&self) -> Result<Option<(ChunkDescriptor, Bytes)>> {
        let mut inner = self.inner.write().await;
        if inner.chunks.is_empty() {
            return if inner.write_complete {
                Ok(None)
            } else {
                Err(StreamingError::Other("No chunks available".to_string()))
            };
        }
        // Invariant: emptiness was checked above, so pop_front must succeed.
        let chunk = inner
            .chunks
            .pop_front()
            .expect("non-empty deque must yield a chunk");
        inner.current_size = inner.current_size.saturating_sub(chunk.descriptor.length);
        inner.next_read_index += 1;
        debug!(
            "Popped chunk {} ({} bytes), buffer size: {}",
            chunk.descriptor.index, chunk.descriptor.length, inner.current_size
        );
        Ok(Some((chunk.descriptor, chunk.data)))
    }

    /// Returns the oldest chunk's descriptor without removing it.
    pub async fn peek(&self) -> Result<Option<ChunkDescriptor>> {
        let inner = self.inner.read().await;
        Ok(inner.chunks.front().map(|c| c.descriptor.clone()))
    }

    /// Number of chunks currently buffered.
    pub async fn len(&self) -> usize {
        self.inner.read().await.chunks.len()
    }

    /// True when no chunks are buffered.
    pub async fn is_empty(&self) -> bool {
        self.inner.read().await.chunks.is_empty()
    }

    /// Total bytes currently buffered.
    pub async fn size_bytes(&self) -> usize {
        self.inner.read().await.current_size
    }

    /// True once the final chunk has been pushed (buffer may still hold data).
    pub async fn is_complete(&self) -> bool {
        self.inner.read().await.write_complete
    }

    /// Discards all buffered chunks and resets sequencing state so a new
    /// sequence starting at chunk 0 can be written.
    pub async fn clear(&self) {
        let mut inner = self.inner.write().await;
        inner.chunks.clear();
        inner.current_size = 0;
        // Previously only the chunks were dropped, leaving stale
        // next_write_index / completion state that rejected any subsequent
        // sequence; reset everything to the initial state instead.
        inner.next_read_index = 0;
        inner.next_write_index = 0;
        inner.total_chunks = None;
        inner.write_complete = false;
        debug!("Buffer cleared");
    }

    /// Snapshot of buffer occupancy and progress counters.
    pub async fn stats(&self) -> BufferStats {
        let inner = self.inner.read().await;
        BufferStats {
            chunks_buffered: inner.chunks.len(),
            bytes_buffered: inner.current_size,
            max_bytes: self.max_size,
            utilization: (inner.current_size as f64) / (self.max_size as f64),
            chunks_read: inner.next_read_index,
            chunks_written: inner.next_write_index,
            total_chunks: inner.total_chunks,
            complete: inner.write_complete,
        }
    }
}
/// Point-in-time snapshot of a `ChunkedBuffer`'s occupancy and progress.
#[derive(Debug, Clone)]
pub struct BufferStats {
    pub chunks_buffered: usize,
    pub bytes_buffered: usize,
    pub max_bytes: usize,
    pub utilization: f64,
    pub chunks_read: usize,
    pub chunks_written: usize,
    pub total_chunks: Option<usize>,
    pub complete: bool,
}

impl BufferStats {
    /// Read progress as a percentage in [0, 100], or `None` while the total
    /// chunk count is still unknown. A known total of zero reports 100%.
    pub fn progress(&self) -> Option<f64> {
        let total = self.total_chunks?;
        if total == 0 {
            Some(100.0)
        } else {
            Some((self.chunks_read as f64 / total as f64) * 100.0)
        }
    }
}
/// Fixed-capacity byte ring buffer (plain `&mut self` API; no internal locking).
pub struct CircularChunkBuffer {
    // Backing storage, always `capacity` bytes long.
    buffer: Vec<u8>,
    // Offset of the next byte to read.
    read_pos: usize,
    // Offset of the next byte to write.
    write_pos: usize,
    // Number of unread bytes currently stored.
    available: usize,
    // Total capacity in bytes (equals buffer.len()).
    capacity: usize,
}
impl CircularChunkBuffer {
    /// Creates a ring buffer able to hold `capacity` bytes.
    pub fn new(capacity: usize) -> Self {
        Self {
            buffer: vec![0; capacity],
            read_pos: 0,
            write_pos: 0,
            available: 0,
            capacity,
        }
    }

    /// Copies as much of `data` as currently fits, returning how many bytes
    /// were written (zero when the ring is full). Never returns `Err`; the
    /// `Result` is kept for interface symmetry with `read`.
    pub fn write(&mut self, data: &[u8]) -> Result<usize> {
        let count = data.len().min(self.capacity - self.available);
        if count == 0 {
            return Ok(0);
        }
        let tail_room = self.capacity - self.write_pos;
        if count <= tail_room {
            // Contiguous copy: no wrap-around needed.
            self.buffer[self.write_pos..self.write_pos + count].copy_from_slice(&data[..count]);
            self.write_pos = (self.write_pos + count) % self.capacity;
        } else {
            // Wraps: fill to the physical end, then continue from the start.
            self.buffer[self.write_pos..].copy_from_slice(&data[..tail_room]);
            self.buffer[..count - tail_room].copy_from_slice(&data[tail_room..count]);
            self.write_pos = count - tail_room;
        }
        self.available += count;
        Ok(count)
    }

    /// Copies up to `buf.len()` buffered bytes into `buf`, returning how many
    /// were read (zero when the ring is empty).
    pub fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
        let count = buf.len().min(self.available);
        if count == 0 {
            return Ok(0);
        }
        let tail_room = self.capacity - self.read_pos;
        if count <= tail_room {
            // Contiguous read: no wrap-around needed.
            buf[..count].copy_from_slice(&self.buffer[self.read_pos..self.read_pos + count]);
            self.read_pos = (self.read_pos + count) % self.capacity;
        } else {
            // Wraps: drain to the physical end, then continue from the start.
            buf[..tail_room].copy_from_slice(&self.buffer[self.read_pos..]);
            buf[tail_room..count].copy_from_slice(&self.buffer[..count - tail_room]);
            self.read_pos = count - tail_room;
        }
        self.available -= count;
        Ok(count)
    }

    /// Bytes currently stored and readable.
    pub fn available(&self) -> usize {
        self.available
    }

    /// Bytes of free space remaining.
    pub fn space_available(&self) -> usize {
        self.capacity - self.available
    }

    /// True when nothing is buffered.
    pub fn is_empty(&self) -> bool {
        self.available == 0
    }

    /// True when no more bytes can be written.
    pub fn is_full(&self) -> bool {
        self.available == self.capacity
    }

    /// Drops all buffered data and rewinds both cursors to the start.
    pub fn clear(&mut self) {
        self.read_pos = 0;
        self.write_pos = 0;
        self.available = 0;
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_chunked_buffer() {
        let buffer = ChunkedBuffer::new(1024, 10240);
        let desc = ChunkDescriptor::new(0, 1024, 0, 10);
        let data = Bytes::from(vec![0u8; 1024]);
        // Fail here with the real error instead of swallowing it with .ok()
        // and tripping a confusing assertion further down.
        buffer
            .push(desc.clone(), data.clone())
            .await
            .expect("push of first chunk should succeed");
        assert_eq!(buffer.len().await, 1);
        assert_eq!(buffer.size_bytes().await, 1024);
        let popped = buffer.pop().await.expect("pop should not error");
        assert!(popped.is_some());
        assert_eq!(buffer.len().await, 0);
    }

    #[test]
    fn test_circular_buffer() {
        let mut buffer = CircularChunkBuffer::new(10);
        assert_eq!(buffer.write(&[1, 2, 3, 4, 5]).unwrap(), 5);
        assert_eq!(buffer.available(), 5);
        let mut read_buf = [0u8; 3];
        assert_eq!(buffer.read(&mut read_buf).unwrap(), 3);
        assert_eq!(read_buf, [1, 2, 3]);
        assert_eq!(buffer.available(), 2);
        // This write reaches the physical end of the ring (positions 5..10).
        assert_eq!(buffer.write(&[6, 7, 8, 9, 10]).unwrap(), 5);
        assert_eq!(buffer.available(), 7);
    }

    #[test]
    fn test_chunk_descriptor() {
        let desc = ChunkDescriptor::new(0, 1024, 0, 10);
        assert_eq!(desc.offset, 0);
        assert_eq!(desc.length, 1024);
        assert_eq!(desc.end_offset(), 1024);
        assert!(!desc.is_last);
        let last_desc = ChunkDescriptor::new(9216, 1024, 9, 10);
        assert!(last_desc.is_last);
    }
}