use crate::{DecodeCallback, Ownership, Result, READER_BUFFER_SIZE};
use async_trait::async_trait;
use futures_channel::mpsc::{channel, Receiver, Sender};
use futures_core::FusedStream;
use futures_executor::block_on;
use futures_io::{AsyncRead, AsyncWrite};
use futures_util::{
io::{AsyncReadExt, AsyncWriteExt},
join,
sink::SinkExt,
stream::StreamExt,
};
use std::{
future::Future,
io::{ErrorKind, Read, Seek, SeekFrom, Write},
path::Path,
};
#[async_trait]
pub trait BlockingExecutor {
async fn execute_blocking<T, F>(f: F) -> Result<T>
where
T: Send + 'static,
F: FnOnce() -> T + Send + 'static;
}
struct AsyncReadWrapper {
rx: Receiver<Vec<u8>>,
}
impl Read for AsyncReadWrapper {
fn read(&mut self, mut buf: &mut [u8]) -> std::io::Result<usize> {
if self.rx.is_terminated() {
return Ok(0);
}
assert_eq!(buf.len(), READER_BUFFER_SIZE);
Ok(match block_on(self.rx.next()) {
Some(data) => {
buf.write_all(&data)?;
data.len()
}
None => 0,
})
}
}
impl Seek for AsyncReadWrapper {
fn seek(&mut self, _: SeekFrom) -> std::io::Result<u64> {
unreachable!("We need to use libarchive_seek_callback() underlying.")
}
}
fn make_async_read_wrapper_and_worker<R>(
mut read: R,
) -> (AsyncReadWrapper, impl Future<Output = Result<()>>)
where
R: AsyncRead + Unpin,
{
let (mut tx, rx) = channel(0);
(AsyncReadWrapper { rx }, async move {
loop {
let mut data = vec![0; READER_BUFFER_SIZE];
let read = read.read(&mut data).await?;
data.truncate(read);
if read == 0 || tx.send(data).await.is_err() {
break;
}
}
Ok(())
})
}
struct AsyncWriteWrapper {
tx: Sender<Vec<u8>>,
}
impl Write for AsyncWriteWrapper {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
match block_on(self.tx.send(buf.to_owned())) {
Ok(()) => Ok(buf.len()),
Err(err) => Err(std::io::Error::new(ErrorKind::Other, err)),
}
}
fn flush(&mut self) -> std::io::Result<()> {
block_on(self.tx.send(vec![])).map_err(|err| std::io::Error::new(ErrorKind::Other, err))
}
}
fn make_async_write_wrapper_and_worker<W>(
mut write: W,
) -> (AsyncWriteWrapper, impl Future<Output = Result<()>>)
where
W: AsyncWrite + Unpin,
{
let (tx, mut rx) = channel(0);
(AsyncWriteWrapper { tx }, async move {
while let Some(v) = rx.next().await {
if v.is_empty() {
write.flush().await?;
} else {
write.write_all(&v).await?;
}
}
Ok(())
})
}
async fn wrap_async_read<B, R, F, T>(_: B, read: R, f: F) -> Result<T>
where
B: BlockingExecutor,
R: AsyncRead + Unpin,
F: FnOnce(AsyncReadWrapper) -> T + Send + 'static,
T: Send + 'static,
{
let (async_read_wrapper, async_read_wrapper_worker) = make_async_read_wrapper_and_worker(read);
let g = B::execute_blocking(move || f(async_read_wrapper));
let join = join!(async_read_wrapper_worker, g);
join.0?;
join.1
}
async fn wrap_async_read_and_write<B, R, W, F, T>(_: B, read: R, write: W, f: F) -> Result<T>
where
B: BlockingExecutor,
R: AsyncRead + Unpin,
W: AsyncWrite + Unpin,
F: FnOnce(AsyncReadWrapper, AsyncWriteWrapper) -> T + Send + 'static,
T: Send + 'static,
{
let (async_read_wrapper, async_read_wrapper_worker) = make_async_read_wrapper_and_worker(read);
let (async_write_wrapper, async_write_wrapper_worker) =
make_async_write_wrapper_and_worker(write);
let g = B::execute_blocking(move || f(async_read_wrapper, async_write_wrapper));
let join = join!(async_read_wrapper_worker, async_write_wrapper_worker, g);
join.0?;
join.1?;
join.2
}
pub async fn list_archive_files_with_encoding<B, R>(
blocking_executor: B,
source: R,
decode: DecodeCallback,
) -> Result<Vec<String>>
where
B: BlockingExecutor,
R: AsyncRead + Unpin,
{
wrap_async_read(blocking_executor, source, move |source| {
crate::list_archive_files_with_encoding(source, decode)
})
.await?
}
pub async fn list_archive_files<B, R>(blocking_executor: B, source: R) -> Result<Vec<String>>
where
B: BlockingExecutor,
R: AsyncRead + Unpin,
{
wrap_async_read(blocking_executor, source, crate::list_archive_files).await?
}
pub async fn uncompress_data<B, R, W>(blocking_executor: B, source: R, target: W) -> Result<usize>
where
B: BlockingExecutor,
R: AsyncRead + Unpin,
W: AsyncWrite + Unpin,
{
wrap_async_read_and_write(blocking_executor, source, target, |source, target| {
crate::uncompress_data(source, target)
})
.await?
}
pub async fn uncompress_archive_with_encoding<B, R>(
blocking_executor: B,
source: R,
dest: &Path,
ownership: Ownership,
decode: DecodeCallback,
) -> Result<()>
where
B: BlockingExecutor,
R: AsyncRead + Unpin,
{
let dest = dest.to_owned();
wrap_async_read(blocking_executor, source, move |source| {
crate::uncompress_archive_with_encoding(source, &dest, ownership, decode)
})
.await?
}
pub async fn uncompress_archive<B, R>(
blocking_executor: B,
source: R,
dest: &Path,
ownership: Ownership,
) -> Result<()>
where
B: BlockingExecutor,
R: AsyncRead + Unpin,
{
let dest = dest.to_owned();
wrap_async_read(blocking_executor, source, move |source| {
crate::uncompress_archive(source, &dest, ownership)
})
.await?
}
pub async fn uncompress_archive_file_with_encoding<B, R, W>(
blocking_executor: B,
source: R,
target: W,
path: &str,
decode: DecodeCallback,
) -> Result<usize>
where
B: BlockingExecutor,
R: AsyncRead + Unpin,
W: AsyncWrite + Unpin,
{
let path = path.to_owned();
wrap_async_read_and_write(blocking_executor, source, target, move |source, target| {
crate::uncompress_archive_file_with_encoding(source, target, &path, decode)
})
.await?
}
pub async fn uncompress_archive_file<B, R, W>(
blocking_executor: B,
source: R,
target: W,
path: &str,
) -> Result<usize>
where
B: BlockingExecutor,
R: AsyncRead + Unpin,
W: AsyncWrite + Unpin,
{
let path = path.to_owned();
wrap_async_read_and_write(blocking_executor, source, target, move |source, target| {
crate::uncompress_archive_file(source, target, &path)
})
.await?
}