use bytes::Bytes;
use futures_lite::{AsyncWriteExt, StreamExt};
#[derive(Debug)]
pub struct AsyncFileWriter {
file: smol::fs::File,
buffer: Vec<u8>,
buffer_size: usize,
compressed: bool,
bytes_written: u64,
}
impl AsyncFileWriter {
pub async fn new(path: &str) -> Result<Self, std::io::Error> {
Self::with_config(path, 8192, false).await
}
pub async fn with_config(
path: &str,
buffer_size: usize,
compressed: bool,
) -> Result<Self, std::io::Error> {
let file = smol::fs::File::create(path).await?;
Ok(Self {
file,
buffer: Vec::with_capacity(buffer_size),
buffer_size,
compressed,
bytes_written: 0,
})
}
pub async fn write(&mut self, data: &[u8]) -> Result<(), std::io::Error> {
let data_to_write = if self.compressed {
crate::io::utils::compress_brotli(data, 6)?
} else {
data.to_vec()
};
self.buffer.extend_from_slice(&data_to_write);
if self.buffer.len() >= self.buffer_size {
self.flush().await?;
}
Ok(())
}
pub async fn write_json<T: serde::Serialize>(
&mut self,
data: &T,
) -> Result<(), std::io::Error> {
let json = serde_json::to_string(data)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
self.write(json.as_bytes()).await
}
pub async fn flush(&mut self) -> Result<(), std::io::Error> {
if !self.buffer.is_empty() {
self.file.write_all(&self.buffer).await?;
self.bytes_written += self.buffer.len() as u64;
self.buffer.clear();
}
self.file.flush().await?;
Ok(())
}
#[must_use]
pub fn bytes_written(&self) -> u64 {
self.bytes_written
}
#[must_use]
pub fn is_compressed(&self) -> bool {
self.compressed
}
}
impl Drop for AsyncFileWriter {
fn drop(&mut self) {
futures_lite::future::block_on(async {
let _ = self.flush().await;
});
}
}
pub struct StreamingFileWriter {
file: smol::fs::File,
chunk_size: usize,
compressed: bool,
bytes_written: u64,
}
impl StreamingFileWriter {
pub async fn new(
path: &str,
chunk_size: usize,
compressed: bool,
) -> Result<Self, std::io::Error> {
let file = smol::fs::File::create(path).await?;
Ok(Self {
file,
chunk_size,
compressed,
bytes_written: 0,
})
}
pub async fn write_chunk(&mut self, data: &[u8]) -> Result<(), std::io::Error> {
let data_to_write = if self.compressed {
crate::io::utils::compress_brotli(data, 6)?
} else {
data.to_vec()
};
self.file.write_all(&data_to_write).await?;
self.file.flush().await?;
self.bytes_written += data_to_write.len() as u64;
Ok(())
}
pub async fn write_from_stream<S>(&mut self, mut stream: S) -> Result<(), std::io::Error>
where
S: futures_lite::Stream<Item = Result<Bytes, std::io::Error>> + Unpin,
{
while let Some(chunk_result) = stream.next().await {
let chunk = chunk_result?;
self.write_chunk(&chunk).await?;
}
self.file.flush().await?;
Ok(())
}
#[must_use]
pub fn bytes_written(&self) -> u64 {
self.bytes_written
}
#[must_use]
pub fn chunk_size(&self) -> usize {
self.chunk_size
}
}
pub struct AdvancedFileWriter {
inner: AsyncFileWriter,
progress_callback: Option<Box<dyn Fn(u64) + Send + Sync>>,
error_recovery: bool,
}
impl AdvancedFileWriter {
pub async fn new(
path: &str,
buffer_size: usize,
compressed: bool,
progress_callback: Option<Box<dyn Fn(u64) + Send + Sync>>,
error_recovery: bool,
) -> Result<Self, std::io::Error> {
let inner = AsyncFileWriter::with_config(path, buffer_size, compressed).await?;
Ok(Self {
inner,
progress_callback,
error_recovery,
})
}
pub async fn write_with_progress(&mut self, data: &[u8]) -> Result<(), std::io::Error> {
let result = self.inner.write(data).await;
if let Err(ref e) = result
&& self.error_recovery
{
eprintln!("Write error, attempting recovery: {e}");
}
if let Some(ref callback) = self.progress_callback {
callback(self.inner.bytes_written());
}
result
}
pub async fn write_json_with_progress<T: serde::Serialize>(
&mut self,
data: &T,
) -> Result<(), std::io::Error> {
let json = serde_json::to_string(data)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
self.write_with_progress(json.as_bytes()).await
}
pub async fn flush(&mut self) -> Result<(), std::io::Error> {
self.inner.flush().await
}
#[must_use]
pub fn bytes_written(&self) -> u64 {
self.inner.bytes_written()
}
}
pub fn write_stdout_async(data: &[u8]) -> Result<(), std::io::Error> {
use std::io::Write;
let mut stdout = std::io::stdout().lock();
stdout.write_all(data)?;
stdout.flush()?;
Ok(())
}
pub fn write_stderr_async(data: &[u8]) -> Result<(), std::io::Error> {
use std::io::Write;
let mut stderr = std::io::stderr().lock();
stderr.write_all(data)?;
stderr.flush()?;
Ok(())
}