Skip to main content

compress_tools/
tokio_support.rs

1//! Async support that uses [`tokio::task::spawn_blocking`] and its I/O traits.
2
3use crate::{
4    async_support::{
5        self, new_async_archive_iterator, AsyncArchiveIterator, AsyncEntryFilterCallbackFn,
6        BlockingExecutor,
7    },
8    ArchivePassword, DecodeCallback, Ownership, Result,
9};
10use async_trait::async_trait;
11use std::path::Path;
12use tokio::io::{AsyncRead, AsyncSeek, AsyncWrite};
13use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
14
15pub(crate) struct TokioBlockingExecutor {}
16
17#[async_trait]
18impl BlockingExecutor for TokioBlockingExecutor {
19    async fn execute_blocking<T, F>(f: F) -> Result<T>
20    where
21        T: Send + 'static,
22        F: FnOnce() -> T + Send + 'static,
23    {
24        tokio::task::spawn_blocking(f).await.map_err(Into::into)
25    }
26}
27
28const TOKIO_BLOCKING_EXECUTOR: TokioBlockingExecutor = TokioBlockingExecutor {};
29
30/// Async version of
31/// [`list_archive_files_with_encoding`](crate::
32/// list_archive_files_with_encoding).
33pub async fn list_archive_files_with_encoding<R>(
34    source: R,
35    decode: DecodeCallback,
36) -> Result<Vec<String>>
37where
38    R: AsyncRead + AsyncSeek + Unpin,
39{
40    async_support::list_archive_files_with_encoding(
41        TOKIO_BLOCKING_EXECUTOR,
42        source.compat(),
43        decode,
44    )
45    .await
46}
47
48/// Async version of [`list_archive_files`](crate::list_archive_files).
49pub async fn list_archive_files<R>(source: R) -> Result<Vec<String>>
50where
51    R: AsyncRead + AsyncSeek + Unpin,
52{
53    async_support::list_archive_files(TOKIO_BLOCKING_EXECUTOR, source.compat()).await
54}
55
56/// Async version of
57/// [`list_archive_entries_with_encoding`](crate::
58/// list_archive_entries_with_encoding).
59pub async fn list_archive_entries_with_encoding<R>(
60    source: R,
61    decode: DecodeCallback,
62) -> Result<Vec<crate::ArchiveEntryInfo>>
63where
64    R: AsyncRead + AsyncSeek + Unpin,
65{
66    async_support::list_archive_entries_with_encoding(
67        TOKIO_BLOCKING_EXECUTOR,
68        source.compat(),
69        decode,
70    )
71    .await
72}
73
74/// Async version of [`list_archive_entries`](crate::list_archive_entries).
75pub async fn list_archive_entries<R>(source: R) -> Result<Vec<crate::ArchiveEntryInfo>>
76where
77    R: AsyncRead + AsyncSeek + Unpin,
78{
79    async_support::list_archive_entries(TOKIO_BLOCKING_EXECUTOR, source.compat()).await
80}
81
82/// Async version of [`uncompress_data`](crate::uncompress_data).
83pub async fn uncompress_data<R, W>(source: R, target: W) -> Result<usize>
84where
85    R: AsyncRead + Unpin,
86    W: AsyncWrite + Unpin,
87{
88    async_support::uncompress_data(
89        TOKIO_BLOCKING_EXECUTOR,
90        source.compat(),
91        target.compat_write(),
92    )
93    .await
94}
95
96/// Async version of
97/// [`uncompress_archive_with_encoding`](crate::
98/// uncompress_archive_with_encoding).
99pub async fn uncompress_archive_with_encoding<R>(
100    source: R,
101    dest: &Path,
102    ownership: Ownership,
103    decode: DecodeCallback,
104) -> Result<()>
105where
106    R: AsyncRead + AsyncSeek + Unpin,
107{
108    async_support::uncompress_archive_with_encoding(
109        TOKIO_BLOCKING_EXECUTOR,
110        source.compat(),
111        dest,
112        ownership,
113        decode,
114    )
115    .await
116}
117
118/// Async version of [`uncompress_archive`](crate::uncompress_archive).
119pub async fn uncompress_archive<R>(source: R, dest: &Path, ownership: Ownership) -> Result<()>
120where
121    R: AsyncRead + AsyncSeek + Unpin,
122{
123    async_support::uncompress_archive(TOKIO_BLOCKING_EXECUTOR, source.compat(), dest, ownership)
124        .await
125}
126
127/// Async version of
128/// [`uncompress_archive_file_with_encoding`](crate::
129/// uncompress_archive_file_with_encoding).
130pub async fn uncompress_archive_file_with_encoding<R, W>(
131    source: R,
132    target: W,
133    path: &str,
134    decode: DecodeCallback,
135) -> Result<usize>
136where
137    R: AsyncRead + AsyncSeek + Unpin,
138    W: AsyncWrite + Unpin,
139{
140    async_support::uncompress_archive_file_with_encoding(
141        TOKIO_BLOCKING_EXECUTOR,
142        source.compat(),
143        target.compat_write(),
144        path,
145        decode,
146    )
147    .await
148}
149
150/// Async version of
151/// [`uncompress_archive_file`](crate::uncompress_archive_file).
152pub async fn uncompress_archive_file<R, W>(source: R, target: W, path: &str) -> Result<usize>
153where
154    R: AsyncRead + AsyncSeek + Unpin,
155    W: AsyncWrite + Unpin,
156{
157    async_support::uncompress_archive_file(
158        TOKIO_BLOCKING_EXECUTOR,
159        source.compat(),
160        target.compat_write(),
161        path,
162    )
163    .await
164}
165
166// ---------------------------------------------------------------------------
167// Async archive iterator
168// ---------------------------------------------------------------------------
169
170/// Builder for a tokio-backed [`AsyncArchiveIterator`].
171///
172/// Mirrors [`crate::ArchiveIteratorBuilder`] but produces an asynchronous
173/// [`futures_core::Stream`] of entries over an `AsyncRead + AsyncSeek`
174/// source. The source must be `Send + 'static` because it is moved into a
175/// blocking worker that hosts the sync libarchive state for the iterator's
176/// lifetime.
177#[must_use]
178pub struct ArchiveIteratorBuilder<R> {
179    source: R,
180    decoder: DecodeCallback,
181    filter: Option<Box<AsyncEntryFilterCallbackFn>>,
182    password: Option<ArchivePassword>,
183}
184
185impl<R> ArchiveIteratorBuilder<R>
186where
187    R: AsyncRead + AsyncSeek + Unpin + Send + 'static,
188{
189    pub fn new(source: R) -> ArchiveIteratorBuilder<R> {
190        ArchiveIteratorBuilder {
191            source,
192            decoder: crate::decode_utf8,
193            filter: None,
194            password: None,
195        }
196    }
197
198    pub fn decoder(mut self, decoder: DecodeCallback) -> ArchiveIteratorBuilder<R> {
199        self.decoder = decoder;
200        self
201    }
202
203    pub fn filter<F>(mut self, filter: F) -> ArchiveIteratorBuilder<R>
204    where
205        F: Fn(&str, &crate::stat) -> bool + Send + Sync + 'static,
206    {
207        self.filter = Some(Box::new(filter));
208        self
209    }
210
211    pub fn with_password(mut self, password: ArchivePassword) -> ArchiveIteratorBuilder<R> {
212        self.password = Some(password);
213        self
214    }
215
216    pub fn build(self) -> AsyncArchiveIterator {
217        new_async_archive_iterator::<TokioBlockingExecutor, _>(
218            self.source.compat(),
219            self.decoder,
220            self.filter,
221            self.password,
222        )
223    }
224}