use super::buffer::{ChunkDescriptor, ChunkedBuffer};
use super::chunked::{ChunkStrategy, ChunkedIO, FileChunkedIO};
use crate::error::{Result, StreamingError};
use bytes::Bytes;
use std::path::Path;
use tokio::sync::Semaphore;
use std::sync::Arc;
use tracing::{debug, info};
/// Sequential writer that appends caller-supplied chunks to a [`ChunkedIO`] backend.
///
/// Each chunk is written at the offset immediately following the previous one, and
/// chunk indices are assigned in call order starting at 0.
pub struct ChunkedWriter {
    /// Destination the chunks are written to (a `FileChunkedIO` when built via `from_file`).
    io: Box<dyn ChunkedIO>,
    /// Buffer sized from the strategy's chunk-0 size at construction.
    /// NOTE(review): not read by any method visible in this file — confirm it is still needed.
    buffer: ChunkedBuffer,
    /// Index that the next chunk will receive; equals the number of chunks written so far.
    current_index: usize,
    /// Total bytes written so far; also the offset at which the next chunk is placed.
    bytes_written: u64,
    /// Semaphore whose permit is held for the duration of each `write_chunk` call.
    write_semaphore: Arc<Semaphore>,
    /// Chunking strategy the writer was created with.
    /// NOTE(review): not read after construction in the visible code.
    strategy: ChunkStrategy,
}
impl ChunkedWriter {
    /// Opens `path` as a [`FileChunkedIO`] in write mode and wraps it in a writer
    /// whose first chunk starts at offset 0.
    ///
    /// # Arguments
    ///
    /// * `path` - file to write to; passed straight to [`FileChunkedIO::new`].
    /// * `strategy` - chunking strategy given to the I/O layer. It is also asked for
    ///   the size of chunk 0 (`chunk_size_for_index(0, 0)`), which sizes the
    ///   internal [`ChunkedBuffer`].
    /// * `buffer_size` - forwarded as the second argument of [`ChunkedBuffer::new`].
    /// * `max_concurrent_writes` - permit count of the write semaphore. A value of
    ///   `0` is raised to `1`. A zero-permit semaphore would make every
    ///   [`write_chunk`](Self::write_chunk) call wait forever, because nothing ever
    ///   adds permits to it.
    ///
    /// # Errors
    ///
    /// Propagates any error from [`FileChunkedIO::new`] or from opening the file
    /// for writing.
    pub async fn from_file<P: AsRef<Path>>(
        path: P,
        strategy: ChunkStrategy,
        buffer_size: usize,
        max_concurrent_writes: usize,
    ) -> Result<Self> {
        let mut io = FileChunkedIO::new(path, strategy).await?;
        io.open_write().await?;

        let chunk_size = strategy.chunk_size_for_index(0, 0);
        let buffer = ChunkedBuffer::new(chunk_size, buffer_size);
        info!("Created chunked writer with chunk size {}", chunk_size);

        // Guard against a deadlock: with 0 permits `acquire()` never completes.
        let permits = max_concurrent_writes.max(1);

        Ok(Self {
            io: Box::new(io),
            buffer,
            current_index: 0,
            bytes_written: 0,
            write_semaphore: Arc::new(Semaphore::new(permits)),
            strategy,
        })
    }

    /// Writes `data` as the next chunk, placed directly after everything written so far.
    ///
    /// The chunk receives index [`chunks_written`](Self::chunks_written) and offset
    /// [`bytes_written`](Self::bytes_written). Both counters advance only after the
    /// underlying write succeeds, so a failed write leaves them unchanged. An empty
    /// `data` is still written and still consumes an index.
    ///
    /// # Errors
    ///
    /// Returns [`StreamingError::Other`] if the write semaphore has been closed, and
    /// propagates any error from the underlying [`ChunkedIO`] write.
    pub async fn write_chunk(&mut self, data: Bytes) -> Result<()> {
        // `&mut self` already serializes calls on a single writer; the permit only
        // limits concurrency if the semaphore is shared.
        // NOTE(review): it is not shared anywhere in the visible code.
        let _permit = self
            .write_semaphore
            .acquire()
            .await
            .map_err(|e| StreamingError::Other(e.to_string()))?;

        let offset = self.bytes_written;
        let length = data.len();
        // NOTE(review): the meaning of the trailing `0` argument is defined by
        // `ChunkDescriptor::new` outside this file — confirm.
        let descriptor = ChunkDescriptor::new(offset, length, self.current_index, 0);

        self.io.write_chunk(&descriptor, data).await?;

        self.current_index += 1;
        // usize -> u64 is lossless on every platform Rust supports (pointer width <= 64).
        self.bytes_written += length as u64;

        debug!(
            "Wrote chunk {} ({} bytes), total: {} bytes",
            descriptor.index, length, self.bytes_written
        );
        Ok(())
    }

    /// Writes each chunk in `chunks` in order via [`write_chunk`](Self::write_chunk).
    ///
    /// # Errors
    ///
    /// Stops at the first failing chunk and returns its error. Chunks before it
    /// remain written and counted.
    pub async fn write_chunks(&mut self, chunks: Vec<Bytes>) -> Result<()> {
        for chunk in chunks {
            self.write_chunk(chunk).await?;
        }
        Ok(())
    }

    /// Flushes the underlying I/O and logs the running totals.
    ///
    /// # Errors
    ///
    /// Propagates any error from the underlying flush.
    pub async fn flush(&mut self) -> Result<()> {
        self.io.flush().await?;
        info!("Flushed {} bytes in {} chunks", self.bytes_written, self.current_index);
        Ok(())
    }

    /// Flushes and consumes the writer.
    ///
    /// # Errors
    ///
    /// Propagates any error from [`flush`](Self::flush).
    pub async fn finalize(mut self) -> Result<()> {
        self.flush().await?;
        info!("Finalized chunked writer");
        Ok(())
    }

    /// Number of chunks successfully written so far.
    pub fn chunks_written(&self) -> usize {
        self.current_index
    }

    /// Number of bytes successfully written so far.
    pub fn bytes_written(&self) -> u64 {
        self.bytes_written
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::env;

    /// Writes one 1 KiB chunk and checks that every step succeeds and the counters
    /// reflect exactly that chunk. The temp file is removed on every path, including
    /// failures.
    #[tokio::test]
    async fn test_chunked_writer() {
        let temp_dir = env::temp_dir();
        // Include the process id so concurrent test runs don't clobber each other's file.
        let test_path =
            temp_dir.join(format!("test_chunked_write_{}.dat", std::process::id()));

        let mut writer = match ChunkedWriter::from_file(
            &test_path,
            ChunkStrategy::FixedSize(1024),
            10240,
            4,
        )
        .await
        {
            Ok(writer) => writer,
            Err(_) => {
                tokio::fs::remove_file(&test_path).await.ok();
                panic!(
                    "ChunkedWriter::from_file failed for {}",
                    test_path.display()
                );
            }
        };

        let data = Bytes::from(vec![42u8; 1024]);
        let write_result = writer.write_chunk(data).await;
        // Capture counters before `finalize` consumes the writer.
        let chunks = writer.chunks_written();
        let bytes = writer.bytes_written();
        let finalize_result = writer.finalize().await;

        // Clean up before asserting so a failing assertion doesn't leak the file.
        tokio::fs::remove_file(&test_path).await.ok();

        assert!(write_result.is_ok(), "write_chunk returned an error");
        assert!(finalize_result.is_ok(), "finalize returned an error");
        assert_eq!(chunks, 1);
        assert_eq!(bytes, 1024);
    }
}