#![cfg(feature = "async-io")]
use oxiarc_core::async_io::{AsyncCompressor, AsyncDecompressor};
use oxiarc_core::error::Result;
use std::future::Future;
use std::io::{Cursor, Read, Write};
use std::pin::Pin;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use crate::frame::{FrameDecoder, FrameEncoder};
/// Buffer size, in bytes, used by the async Snappy adapters when the caller
/// does not supply one: 64 KiB.
const SNAPPY_ASYNC_BUFFER_SIZE: usize = 1 << 16;
/// Asynchronous Snappy compressor.
///
/// This is a field-less unit type: it carries no configuration or state of
/// its own, so it is cheap to create, copy and compare in debug output.
#[derive(Debug, Clone, Copy, Default)]
pub struct AsyncSnappyCompressor;
/// Asynchronous Snappy decompressor.
///
/// This is a field-less unit type: it carries no configuration or state of
/// its own, so it is cheap to create, copy and compare in debug output.
#[derive(Debug, Clone, Copy, Default)]
pub struct AsyncSnappyDecompressor;
/// `AsyncCompressor` implementation that encodes with `FrameEncoder`.
///
/// Both methods collect the entire input in memory before encoding it, so
/// memory use grows with the size of the input.
impl AsyncCompressor for AsyncSnappyCompressor {
    /// Compresses everything readable from `input` and writes the result to
    /// `output`, using `SNAPPY_ASYNC_BUFFER_SIZE` as the buffer size.
    ///
    /// Simply forwards to `compress_async_with_buffer`.
    fn compress_async<'a, R, W>(
        &'a mut self,
        input: &'a mut R,
        output: &'a mut W,
    ) -> Pin<Box<dyn Future<Output = Result<usize>> + Send + 'a>>
    where
        R: AsyncRead + Unpin + Send + 'a,
        W: AsyncWrite + Unpin + Send + 'a,
    {
        self.compress_async_with_buffer(input, output, SNAPPY_ASYNC_BUFFER_SIZE)
    }

    /// Compresses everything readable from `input` and writes the result to
    /// `output`.
    ///
    /// `buffer_size` sets both the size of the read buffer and the maximum
    /// size of each write to `output`; values below 256 are raised to 256.
    ///
    /// The returned future resolves to the number of compressed bytes written.
    ///
    /// # Errors
    ///
    /// Read, write and flush failures are propagated with `?`. Failures
    /// reported by the encoder are wrapped in `OxiArcError::Io`.
    fn compress_async_with_buffer<'a, R, W>(
        &'a mut self,
        input: &'a mut R,
        output: &'a mut W,
        buffer_size: usize,
    ) -> Pin<Box<dyn Future<Output = Result<usize>> + Send + 'a>>
    where
        R: AsyncRead + Unpin + Send + 'a,
        W: AsyncWrite + Unpin + Send + 'a,
    {
        // Enforce the 256-byte floor before the value moves into the future.
        let chunk_len = buffer_size.max(256);

        Box::pin(async move {
            // Step 1: pull the whole input into memory, one buffer at a time.
            let mut scratch = vec![0u8; chunk_len];
            let mut plain: Vec<u8> = Vec::new();
            loop {
                match input.read(&mut scratch).await? {
                    // A zero-length read is treated as end of input.
                    0 => break,
                    filled => plain.extend_from_slice(&scratch[..filled]),
                }
            }

            // Step 2: run the synchronous encoder over the collected bytes.
            // The inner scope ends the encoder's borrow of `framed`.
            let mut framed: Vec<u8> = Vec::new();
            {
                let mut encoder = FrameEncoder::new(&mut framed);
                encoder
                    .write_all(&plain)
                    .map_err(oxiarc_core::error::OxiArcError::Io)?;
                encoder
                    .finish()
                    .map_err(oxiarc_core::error::OxiArcError::Io)?;
            }

            // Step 3: hand the encoded bytes to the sink in bounded pieces.
            for piece in framed.chunks(chunk_len) {
                output.write_all(piece).await?;
            }
            output.flush().await?;

            Ok(framed.len())
        })
    }
}
/// `AsyncDecompressor` implementation that decodes with `FrameDecoder`.
///
/// Both methods collect the entire compressed input and the entire decoded
/// output in memory, so memory use grows with the size of the data.
impl AsyncDecompressor for AsyncSnappyDecompressor {
    /// Decompresses everything readable from `input` and writes the result to
    /// `output`, using `SNAPPY_ASYNC_BUFFER_SIZE` as the buffer size.
    ///
    /// Simply forwards to `decompress_async_with_buffer`.
    fn decompress_async<'a, R, W>(
        &'a mut self,
        input: &'a mut R,
        output: &'a mut W,
    ) -> Pin<Box<dyn Future<Output = Result<usize>> + Send + 'a>>
    where
        R: AsyncRead + Unpin + Send + 'a,
        W: AsyncWrite + Unpin + Send + 'a,
    {
        self.decompress_async_with_buffer(input, output, SNAPPY_ASYNC_BUFFER_SIZE)
    }

    /// Decompresses everything readable from `input` and writes the result to
    /// `output`.
    ///
    /// `buffer_size` sets the size of the read buffer; values below 256 are
    /// raised to 256. The decoded data is written with a single `write_all`.
    ///
    /// The returned future resolves to the number of decompressed bytes
    /// written.
    ///
    /// # Errors
    ///
    /// Read, write and flush failures are propagated with `?`. Failures
    /// reported by the decoder are wrapped in `OxiArcError::Io`.
    fn decompress_async_with_buffer<'a, R, W>(
        &'a mut self,
        input: &'a mut R,
        output: &'a mut W,
        buffer_size: usize,
    ) -> Pin<Box<dyn Future<Output = Result<usize>> + Send + 'a>>
    where
        R: AsyncRead + Unpin + Send + 'a,
        W: AsyncWrite + Unpin + Send + 'a,
    {
        // Enforce the 256-byte floor before the value moves into the future.
        let chunk_len = buffer_size.max(256);

        Box::pin(async move {
            // Step 1: pull the whole compressed stream into memory.
            let mut scratch = vec![0u8; chunk_len];
            let mut framed: Vec<u8> = Vec::new();
            loop {
                match input.read(&mut scratch).await? {
                    // A zero-length read is treated as end of input.
                    0 => break,
                    filled => framed.extend_from_slice(&scratch[..filled]),
                }
            }

            // Step 2: run the synchronous decoder over the collected bytes.
            // The inner scope drops the decoder before the first write below.
            let mut plain: Vec<u8> = Vec::new();
            {
                let source = Cursor::new(&framed);
                let mut decoder = FrameDecoder::new(source);
                decoder
                    .read_to_end(&mut plain)
                    .map_err(oxiarc_core::error::OxiArcError::Io)?;
            }

            // Step 3: emit the decoded bytes and flush the sink.
            output.write_all(&plain).await?;
            output.flush().await?;

            Ok(plain.len())
        })
    }
}
/// Reads `input` to the end, encodes the bytes with `FrameEncoder`, and
/// writes the encoded data to `output`, flushing afterwards.
///
/// The whole input and the whole encoded result are held in memory.
///
/// Returns the number of encoded bytes written to `output`.
///
/// # Errors
///
/// Read, write and flush failures are propagated with `?`. Failures reported
/// by the encoder are wrapped in `OxiArcError::Io`.
pub async fn compress_frame_async<R, W>(mut input: R, output: &mut W) -> Result<usize>
where
    R: AsyncRead + Unpin,
    W: AsyncWrite + Unpin,
{
    // Collect the complete uncompressed payload first.
    let mut plain: Vec<u8> = Vec::new();
    input.read_to_end(&mut plain).await?;

    // Encode into a separate buffer; the inner scope ends the encoder's
    // borrow of `framed` before it is read below.
    let mut framed: Vec<u8> = Vec::new();
    {
        let mut encoder = FrameEncoder::new(&mut framed);
        encoder
            .write_all(&plain)
            .map_err(oxiarc_core::error::OxiArcError::Io)?;
        encoder
            .finish()
            .map_err(oxiarc_core::error::OxiArcError::Io)?;
    }

    output.write_all(&framed).await?;
    output.flush().await?;

    Ok(framed.len())
}
/// Reads `input` to the end, decodes the bytes with `FrameDecoder`, and
/// writes the decoded data to `output`, flushing afterwards.
///
/// The whole compressed input and the whole decoded result are held in
/// memory.
///
/// Returns the number of decoded bytes written to `output`.
///
/// # Errors
///
/// Read, write and flush failures are propagated with `?`. Failures reported
/// by the decoder are wrapped in `OxiArcError::Io`.
pub async fn decompress_frame_async<R, W>(mut input: R, output: &mut W) -> Result<usize>
where
    R: AsyncRead + Unpin,
    W: AsyncWrite + Unpin,
{
    // Collect the complete compressed payload first.
    let mut framed: Vec<u8> = Vec::new();
    input.read_to_end(&mut framed).await?;

    // Decode into a separate buffer; the inner scope drops the decoder
    // before the first write below.
    let mut plain: Vec<u8> = Vec::new();
    {
        let source = Cursor::new(&framed);
        let mut decoder = FrameDecoder::new(source);
        decoder
            .read_to_end(&mut plain)
            .map_err(oxiarc_core::error::OxiArcError::Io)?;
    }

    output.write_all(&plain).await?;
    output.flush().await?;

    Ok(plain.len())
}