Skip to main content

compress_tools/
futures_support.rs

1//! Async support with a built-in thread pool.
2
3use crate::{
4    async_support::{
5        self, new_async_archive_iterator, AsyncArchiveIterator, AsyncEntryFilterCallbackFn,
6        BlockingExecutor,
7    },
8    ArchivePassword, DecodeCallback, Ownership, Result,
9};
10use async_trait::async_trait;
11use futures_io::{AsyncRead, AsyncSeek, AsyncWrite};
12use std::path::Path;
13
14pub(crate) struct FuturesBlockingExecutor {}
15
16#[async_trait]
17impl BlockingExecutor for FuturesBlockingExecutor {
18    async fn execute_blocking<T, F>(f: F) -> Result<T>
19    where
20        T: Send + 'static,
21        F: FnOnce() -> T + Send + 'static,
22    {
23        Ok(blocking::unblock(f).await)
24    }
25}
26
27const FUTURES_BLOCKING_EXECUTOR: FuturesBlockingExecutor = FuturesBlockingExecutor {};
28
29/// Async version of
30/// [`list_archive_files_with_encoding`](crate::
31/// list_archive_files_with_encoding).
32pub async fn list_archive_files_with_encoding<R>(
33    source: R,
34    decode: DecodeCallback,
35) -> Result<Vec<String>>
36where
37    R: AsyncRead + AsyncSeek + Unpin,
38{
39    async_support::list_archive_files_with_encoding(FUTURES_BLOCKING_EXECUTOR, source, decode).await
40}
41
42/// Async version of [`list_archive_files`](crate::list_archive_files).
43pub async fn list_archive_files<R>(source: R) -> Result<Vec<String>>
44where
45    R: AsyncRead + AsyncSeek + Unpin,
46{
47    async_support::list_archive_files(FUTURES_BLOCKING_EXECUTOR, source).await
48}
49
50/// Async version of
51/// [`list_archive_entries_with_encoding`](crate::
52/// list_archive_entries_with_encoding).
53pub async fn list_archive_entries_with_encoding<R>(
54    source: R,
55    decode: DecodeCallback,
56) -> Result<Vec<crate::ArchiveEntryInfo>>
57where
58    R: AsyncRead + AsyncSeek + Unpin,
59{
60    async_support::list_archive_entries_with_encoding(FUTURES_BLOCKING_EXECUTOR, source, decode)
61        .await
62}
63
64/// Async version of [`list_archive_entries`](crate::list_archive_entries).
65pub async fn list_archive_entries<R>(source: R) -> Result<Vec<crate::ArchiveEntryInfo>>
66where
67    R: AsyncRead + AsyncSeek + Unpin,
68{
69    async_support::list_archive_entries(FUTURES_BLOCKING_EXECUTOR, source).await
70}
71
72/// Async version of [`uncompress_data`](crate::uncompress_data).
73pub async fn uncompress_data<R, W>(source: R, target: W) -> Result<usize>
74where
75    R: AsyncRead + Unpin,
76    W: AsyncWrite + Unpin,
77{
78    async_support::uncompress_data(FUTURES_BLOCKING_EXECUTOR, source, target).await
79}
80
81/// Async version of
82/// [`uncompress_archive_with_encoding`](crate::
83/// uncompress_archive_with_encoding).
84pub async fn uncompress_archive_with_encoding<R>(
85    source: R,
86    dest: &Path,
87    ownership: Ownership,
88    decode: DecodeCallback,
89) -> Result<()>
90where
91    R: AsyncRead + AsyncSeek + Unpin,
92{
93    async_support::uncompress_archive_with_encoding(
94        FUTURES_BLOCKING_EXECUTOR,
95        source,
96        dest,
97        ownership,
98        decode,
99    )
100    .await
101}
102
103/// Async version of [`uncompress_archive`](crate::uncompress_archive).
104pub async fn uncompress_archive<R>(source: R, dest: &Path, ownership: Ownership) -> Result<()>
105where
106    R: AsyncRead + AsyncSeek + Unpin,
107{
108    async_support::uncompress_archive(FUTURES_BLOCKING_EXECUTOR, source, dest, ownership).await
109}
110
111/// Async version of
112/// [`uncompress_archive_file_with_encoding`](crate::
113/// uncompress_archive_file_with_encoding).
114pub async fn uncompress_archive_file_with_encoding<R, W>(
115    source: R,
116    target: W,
117    path: &str,
118    decode: DecodeCallback,
119) -> Result<usize>
120where
121    R: AsyncRead + AsyncSeek + Unpin,
122    W: AsyncWrite + Unpin,
123{
124    async_support::uncompress_archive_file_with_encoding(
125        FUTURES_BLOCKING_EXECUTOR,
126        source,
127        target,
128        path,
129        decode,
130    )
131    .await
132}
133
134/// Async version of
135/// [`uncompress_archive_file`](crate::uncompress_archive_file).
136pub async fn uncompress_archive_file<R, W>(source: R, target: W, path: &str) -> Result<usize>
137where
138    R: AsyncRead + AsyncSeek + Unpin,
139    W: AsyncWrite + Unpin,
140{
141    async_support::uncompress_archive_file(FUTURES_BLOCKING_EXECUTOR, source, target, path).await
142}
143
144// ---------------------------------------------------------------------------
145// Async archive iterator
146// ---------------------------------------------------------------------------
147
148/// Builder for a futures-backed [`AsyncArchiveIterator`].
149///
150/// Mirrors [`crate::ArchiveIteratorBuilder`] but produces an asynchronous
151/// [`futures_core::Stream`] of entries over a `futures_io::AsyncRead +
152/// AsyncSeek` source. The source must be `Send + 'static` because it is
153/// moved into a blocking worker that hosts the sync libarchive state for
154/// the iterator's lifetime.
155#[must_use]
156pub struct ArchiveIteratorBuilder<R> {
157    source: R,
158    decoder: DecodeCallback,
159    filter: Option<Box<AsyncEntryFilterCallbackFn>>,
160    password: Option<ArchivePassword>,
161}
162
163impl<R> ArchiveIteratorBuilder<R>
164where
165    R: AsyncRead + AsyncSeek + Unpin + Send + 'static,
166{
167    pub fn new(source: R) -> ArchiveIteratorBuilder<R> {
168        ArchiveIteratorBuilder {
169            source,
170            decoder: crate::decode_utf8,
171            filter: None,
172            password: None,
173        }
174    }
175
176    pub fn decoder(mut self, decoder: DecodeCallback) -> ArchiveIteratorBuilder<R> {
177        self.decoder = decoder;
178        self
179    }
180
181    pub fn filter<F>(mut self, filter: F) -> ArchiveIteratorBuilder<R>
182    where
183        F: Fn(&str, &crate::stat) -> bool + Send + Sync + 'static,
184    {
185        self.filter = Some(Box::new(filter));
186        self
187    }
188
189    pub fn with_password(mut self, password: ArchivePassword) -> ArchiveIteratorBuilder<R> {
190        self.password = Some(password);
191        self
192    }
193
194    pub fn build(self) -> AsyncArchiveIterator {
195        new_async_archive_iterator::<FuturesBlockingExecutor, _>(
196            self.source,
197            self.decoder,
198            self.filter,
199            self.password,
200        )
201    }
202}