//! `uv_extract/stream.rs` — streaming (no-`Seek`) extraction of `.zip` and tar-based archives.

1use std::path::{Component, Path, PathBuf};
2use std::pin::Pin;
3
4use async_zip::base::read::cd::Entry;
5use async_zip::error::ZipError;
6use futures::{AsyncReadExt, StreamExt};
7use rustc_hash::{FxHashMap, FxHashSet};
8use tokio_util::compat::{FuturesAsyncReadCompatExt, TokioAsyncReadCompatExt};
9use tracing::{debug, warn};
10
11use uv_distribution_filename::SourceDistExtension;
12
13use crate::{Error, insecure_no_validate, validate_archive_member_name};
14
15const DEFAULT_BUF_SIZE: usize = 128 * 1024;
16
/// Data recorded for a single local file header while streaming an archive.
///
/// Stored keyed by file offset, and cross-checked against the corresponding
/// central directory entry once the central directory is reached.
#[derive(Debug, Clone, PartialEq, Eq)]
struct LocalHeaderEntry {
    /// The relative path of the entry, as computed from the local file header.
    relpath: PathBuf,
    /// The computed CRC32 checksum of the entry.
    crc32: u32,
    /// The compressed size of the entry, as recorded in the local file header
    /// (zero when the entry defers sizes to a data descriptor).
    compressed_size: u64,
    /// The computed uncompressed size of the entry.
    uncompressed_size: u64,
    /// Whether the entry has a data descriptor.
    data_descriptor: bool,
}
30
/// Values computed while actually decompressing an entry, as opposed to the
/// values advertised in the archive's headers.
#[derive(Debug, Clone, PartialEq, Eq)]
struct ComputedEntry {
    /// The computed CRC32 checksum of the entry.
    crc32: u32,
    /// The computed uncompressed size of the entry.
    uncompressed_size: u64,
    /// The computed compressed size of the entry.
    compressed_size: u64,
}
40
/// Unpack a `.zip` archive into the target directory, without requiring `Seek`.
///
/// This is useful for unzipping files as they're being downloaded. If the archive
/// is already fully on disk, consider using `unzip_archive`, which can use multiple
/// threads to work faster in that case.
///
/// While streaming, the archive is validated: CRC32s, compressed and uncompressed
/// sizes, and data descriptors are checked against the local file headers, and the
/// local file headers are then cross-checked against the central directory. Any
/// mismatch is an error unless validation is disabled via [`insecure_no_validate`].
///
/// # Errors
///
/// Returns an [`Error`] if the archive is malformed, fails validation, or if any
/// I/O operation against the target directory fails.
pub async fn unzip<R: tokio::io::AsyncRead + Unpin>(
    reader: R,
    target: impl AsRef<Path>,
) -> Result<(), Error> {
    /// Ensure the file path is safe to use as a [`Path`].
    ///
    /// Rejects NUL bytes, absolute paths, path prefixes, and any `..` sequence
    /// that would escape the extraction root.
    ///
    /// See: <https://docs.rs/zip/latest/zip/read/struct.ZipFile.html#method.enclosed_name>
    pub(crate) fn enclosed_name(file_name: &str) -> Option<PathBuf> {
        if file_name.contains('\0') {
            return None;
        }
        let path = PathBuf::from(file_name);
        let mut depth = 0usize;
        for component in path.components() {
            match component {
                Component::Prefix(_) | Component::RootDir => return None,
                // `checked_sub` fails (returns `None`) if `..` would climb above the root.
                Component::ParentDir => depth = depth.checked_sub(1)?,
                Component::Normal(_) => depth += 1,
                Component::CurDir => (),
            }
        }
        Some(path)
    }

    // Determine whether ZIP validation is disabled.
    let skip_validation = insecure_no_validate();

    let target = target.as_ref();
    let mut reader = futures::io::BufReader::with_capacity(DEFAULT_BUF_SIZE, reader.compat());
    let mut zip = async_zip::base::read::stream::ZipFileReader::new(&mut reader);

    // Directories we've already created, to avoid redundant `create_dir_all` calls.
    let mut directories = FxHashSet::default();
    // Per-entry data observed while streaming the local file headers, keyed by file
    // offset; cross-checked against the central directory entries below.
    let mut local_headers = FxHashMap::default();
    // Byte offset of the reader, used for error reporting and to seed the
    // central directory reader once the local file headers are exhausted.
    let mut offset = 0;

    while let Some(mut entry) = zip.next_with_entry().await? {
        // Construct the (expected) path to the file on-disk.
        let path = match entry.reader().entry().filename().as_str() {
            Ok(path) => path,
            Err(ZipError::StringNotUtf8) => return Err(Error::LocalHeaderNotUtf8 { offset }),
            Err(err) => return Err(err.into()),
        };

        // Apply sanity checks to the file names in local headers.
        if let Err(e) = validate_archive_member_name(path) {
            if !skip_validation {
                return Err(e);
            }
        }

        // Sanitize the file name to prevent directory traversal attacks.
        let Some(relpath) = enclosed_name(path) else {
            warn!("Skipping unsafe file name: {path}");

            // Close current file prior to proceeding, as per:
            // https://docs.rs/async_zip/0.0.16/async_zip/base/read/stream/
            (.., zip) = entry.skip().await?;

            // Store the current offset.
            offset = zip.offset();

            continue;
        };

        // Snapshot the local file header's claims, to validate against what we compute.
        let file_offset = entry.reader().entry().file_offset();
        let expected_compressed_size = entry.reader().entry().compressed_size();
        let expected_uncompressed_size = entry.reader().entry().uncompressed_size();
        let expected_data_descriptor = entry.reader().entry().data_descriptor();

        // Either create the directory or write the file to disk.
        let path = target.join(&relpath);
        let is_dir = entry.reader().entry().dir()?;
        let computed = if is_dir {
            if directories.insert(path.clone()) {
                fs_err::tokio::create_dir_all(path)
                    .await
                    .map_err(Error::Io)?;
            }

            // If this is a directory, we expect the CRC32 to be 0.
            if entry.reader().entry().crc32() != 0 {
                if !skip_validation {
                    return Err(Error::BadCrc32 {
                        path: relpath.clone(),
                        computed: 0,
                        expected: entry.reader().entry().crc32(),
                    });
                }
            }

            // If this is a directory, we expect the uncompressed size to be 0.
            if entry.reader().entry().uncompressed_size() != 0 {
                if !skip_validation {
                    return Err(Error::BadUncompressedSize {
                        path: relpath.clone(),
                        computed: 0,
                        expected: entry.reader().entry().uncompressed_size(),
                    });
                }
            }

            ComputedEntry {
                crc32: 0,
                uncompressed_size: 0,
                compressed_size: 0,
            }
        } else {
            if let Some(parent) = path.parent() {
                if directories.insert(parent.to_path_buf()) {
                    fs_err::tokio::create_dir_all(parent)
                        .await
                        .map_err(Error::Io)?;
                }
            }

            // We don't know the file permissions here, because we haven't seen the central directory yet.
            let (actual_uncompressed_size, reader) = match fs_err::tokio::File::create_new(&path)
                .await
            {
                Ok(file) => {
                    // Write the file to disk.
                    let size = entry.reader().entry().uncompressed_size();
                    // Size the write buffer from the declared size, capped at 1 MiB.
                    let mut writer = if let Ok(size) = usize::try_from(size) {
                        tokio::io::BufWriter::with_capacity(std::cmp::min(size, 1024 * 1024), file)
                    } else {
                        tokio::io::BufWriter::new(file)
                    };
                    let mut reader = entry.reader_mut().compat();
                    let bytes_read = tokio::io::copy(&mut reader, &mut writer)
                        .await
                        .map_err(Error::io_or_compression)?;
                    let reader = reader.into_inner();

                    (bytes_read, reader)
                }
                Err(err) if err.kind() == std::io::ErrorKind::AlreadyExists => {
                    debug!(
                        "Found duplicate local file header for: {}",
                        relpath.display()
                    );

                    // Read the existing file into memory.
                    let existing_contents = fs_err::tokio::read(&path).await.map_err(Error::Io)?;

                    // Read the entry into memory.
                    let mut expected_contents = Vec::with_capacity(existing_contents.len());
                    let entry_reader = entry.reader_mut();
                    let bytes_read = entry_reader
                        .read_to_end(&mut expected_contents)
                        .await
                        .map_err(Error::io_or_compression)?;

                    // Verify that the existing file contents match the expected contents.
                    if existing_contents != expected_contents {
                        if !skip_validation {
                            return Err(Error::DuplicateLocalFileHeader {
                                path: relpath.clone(),
                            });
                        }
                    }

                    (bytes_read as u64, entry_reader)
                }
                Err(err) => return Err(Error::Io(err)),
            };

            // Validate the uncompressed size. A zero-size claim paired with a data
            // descriptor means the size is deferred, so a mismatch is expected there.
            if actual_uncompressed_size != expected_uncompressed_size {
                if !(expected_compressed_size == 0 && expected_data_descriptor) {
                    if !skip_validation {
                        return Err(Error::BadUncompressedSize {
                            path: relpath.clone(),
                            computed: actual_uncompressed_size,
                            expected: expected_uncompressed_size,
                        });
                    }
                }
            }

            // Validate the compressed size.
            let actual_compressed_size = reader.bytes_read();
            if actual_compressed_size != expected_compressed_size {
                if !(expected_compressed_size == 0 && expected_data_descriptor) {
                    if !skip_validation {
                        return Err(Error::BadCompressedSize {
                            path: relpath.clone(),
                            computed: actual_compressed_size,
                            expected: expected_compressed_size,
                        });
                    }
                }
            }

            // Validate the CRC of any file we unpack
            // (It would be nice if async_zip made it harder to Not do this...)
            let actual_crc32 = reader.compute_hash();
            let expected_crc32 = reader.entry().crc32();
            if actual_crc32 != expected_crc32 {
                if !(expected_crc32 == 0 && expected_data_descriptor) {
                    if !skip_validation {
                        return Err(Error::BadCrc32 {
                            path: relpath.clone(),
                            computed: actual_crc32,
                            expected: expected_crc32,
                        });
                    }
                }
            }

            ComputedEntry {
                crc32: actual_crc32,
                uncompressed_size: actual_uncompressed_size,
                compressed_size: actual_compressed_size,
            }
        };

        // Close current file prior to proceeding, as per:
        // https://docs.rs/async_zip/0.0.16/async_zip/base/read/stream/
        let (descriptor, next) = entry.skip().await?;

        // Verify that the data descriptor field is consistent with the presence (or absence) of a
        // data descriptor in the local file header.
        if expected_data_descriptor && descriptor.is_none() {
            if !skip_validation {
                return Err(Error::MissingDataDescriptor {
                    path: relpath.clone(),
                });
            }
        }
        if !expected_data_descriptor && descriptor.is_some() {
            if !skip_validation {
                return Err(Error::UnexpectedDataDescriptor {
                    path: relpath.clone(),
                });
            }
        }

        // If we have a data descriptor, validate it against the computed values.
        if let Some(descriptor) = descriptor {
            if descriptor.crc != computed.crc32 {
                if !skip_validation {
                    return Err(Error::BadCrc32 {
                        path: relpath.clone(),
                        computed: computed.crc32,
                        expected: descriptor.crc,
                    });
                }
            }
            if descriptor.uncompressed_size != computed.uncompressed_size {
                if !skip_validation {
                    return Err(Error::BadUncompressedSize {
                        path: relpath.clone(),
                        computed: computed.uncompressed_size,
                        expected: descriptor.uncompressed_size,
                    });
                }
            }
            if descriptor.compressed_size != computed.compressed_size {
                if !skip_validation {
                    return Err(Error::BadCompressedSize {
                        path: relpath.clone(),
                        computed: computed.compressed_size,
                        expected: descriptor.compressed_size,
                    });
                }
            }
        }

        // Store the offset, for validation, and error if we see a duplicate file.
        match local_headers.entry(file_offset) {
            std::collections::hash_map::Entry::Vacant(entry) => {
                entry.insert(LocalHeaderEntry {
                    relpath,
                    crc32: computed.crc32,
                    uncompressed_size: computed.uncompressed_size,
                    // NOTE: stores the header's claimed size (not the computed one), so the
                    // central directory comparison below is skipped for data-descriptor entries.
                    compressed_size: expected_compressed_size,
                    data_descriptor: expected_data_descriptor,
                });
            }
            std::collections::hash_map::Entry::Occupied(..) => {
                if !skip_validation {
                    return Err(Error::DuplicateLocalFileHeader {
                        path: relpath.clone(),
                    });
                }
            }
        }

        // Advance the reader to the next entry.
        zip = next;

        // Store the current offset.
        offset = zip.offset();
    }

    // Record the actual number of entries in the central directory.
    let mut num_entries = 0;

    // Track the file modes on Unix, to ensure that they're consistent across duplicates.
    #[cfg(unix)]
    let mut modes =
        FxHashMap::with_capacity_and_hasher(local_headers.len(), rustc_hash::FxBuildHasher);

    let mut directory = async_zip::base::read::cd::CentralDirectoryReader::new(&mut reader, offset);
    loop {
        match directory.next().await? {
            Entry::CentralDirectoryEntry(entry) => {
                // Count the number of entries in the central directory.
                num_entries += 1;

                // Construct the (expected) path to the file on-disk.
                let path = match entry.filename().as_str() {
                    Ok(path) => path,
                    Err(ZipError::StringNotUtf8) => {
                        return Err(Error::CentralDirectoryEntryNotUtf8 {
                            index: num_entries - 1,
                        });
                    }
                    Err(err) => return Err(err.into()),
                };

                // Apply sanity checks to the file names in CD headers.
                if let Err(e) = validate_archive_member_name(path) {
                    if !skip_validation {
                        return Err(e);
                    }
                }

                // Sanitize the file name to prevent directory traversal attacks.
                let Some(relpath) = enclosed_name(path) else {
                    continue;
                };

                // Validate that various fields are consistent between the local file header and the
                // central directory entry.
                match local_headers.remove(&entry.file_offset()) {
                    Some(local_header) => {
                        if local_header.relpath != relpath {
                            if !skip_validation {
                                return Err(Error::ConflictingPaths {
                                    offset: entry.file_offset(),
                                    local_path: local_header.relpath.clone(),
                                    central_directory_path: relpath.clone(),
                                });
                            }
                        }
                        if local_header.crc32 != entry.crc32() {
                            if !skip_validation {
                                return Err(Error::ConflictingChecksums {
                                    path: relpath.clone(),
                                    offset: entry.file_offset(),
                                    local_crc32: local_header.crc32,
                                    central_directory_crc32: entry.crc32(),
                                });
                            }
                        }
                        if local_header.uncompressed_size != entry.uncompressed_size() {
                            if !skip_validation {
                                return Err(Error::ConflictingUncompressedSizes {
                                    path: relpath.clone(),
                                    offset: entry.file_offset(),
                                    local_uncompressed_size: local_header.uncompressed_size,
                                    central_directory_uncompressed_size: entry.uncompressed_size(),
                                });
                            }
                        }
                        if local_header.compressed_size != entry.compressed_size() {
                            // With a data descriptor, the local header's size is deferred, so
                            // a mismatch with the central directory is expected.
                            if !local_header.data_descriptor {
                                if !skip_validation {
                                    return Err(Error::ConflictingCompressedSizes {
                                        path: relpath.clone(),
                                        offset: entry.file_offset(),
                                        local_compressed_size: local_header.compressed_size,
                                        central_directory_compressed_size: entry.compressed_size(),
                                    });
                                }
                            }
                        }
                    }
                    None => {
                        if !skip_validation {
                            return Err(Error::MissingLocalFileHeader {
                                path: relpath.clone(),
                                offset: entry.file_offset(),
                            });
                        }
                    }
                }

                // On Unix, we need to set file permissions, which are stored in the central directory, at the
                // end of the archive. The `ZipFileReader` reads until it sees a central directory signature,
                // which indicates the first entry in the central directory. So we continue reading from there.
                #[cfg(unix)]
                {
                    use std::fs::Permissions;
                    use std::os::unix::fs::PermissionsExt;

                    if entry.dir()? {
                        continue;
                    }

                    let Some(mode) = entry.unix_permissions() else {
                        continue;
                    };

                    // If the file is included multiple times, ensure that the mode is consistent.
                    match modes.entry(relpath.clone()) {
                        std::collections::hash_map::Entry::Vacant(entry) => {
                            entry.insert(mode);
                        }
                        std::collections::hash_map::Entry::Occupied(entry) => {
                            if mode != *entry.get() {
                                if !skip_validation {
                                    return Err(Error::DuplicateExecutableFileHeader {
                                        path: relpath.clone(),
                                    });
                                }
                            }
                        }
                    }

                    // The executable bit is the only permission we preserve, otherwise we use the OS defaults.
                    // https://github.com/pypa/pip/blob/3898741e29b7279e7bffe044ecfbe20f6a438b1e/src/pip/_internal/utils/unpacking.py#L88-L100
                    let has_any_executable_bit = mode & 0o111;
                    if has_any_executable_bit != 0 {
                        let path = target.join(relpath);
                        let permissions = fs_err::tokio::metadata(&path)
                            .await
                            .map_err(Error::Io)?
                            .permissions();
                        if permissions.mode() & 0o111 != 0o111 {
                            fs_err::tokio::set_permissions(
                                &path,
                                Permissions::from_mode(permissions.mode() | 0o111),
                            )
                            .await
                            .map_err(Error::Io)?;
                        }
                    }
                }
            }
            Entry::EndOfCentralDirectoryRecord {
                record,
                comment,
                extensible,
            } => {
                // Reject ZIP64 end-of-central-directory records with extensible data, as the safety
                // tradeoffs don't outweigh the usefulness. We don't ever expect to encounter wheels
                // that leverage this feature anyway.
                if extensible {
                    if !skip_validation {
                        return Err(Error::ExtensibleData);
                    }
                }

                // Sanitize the comment by rejecting bytes `01` to `08`. If the comment contains an
                // embedded ZIP file, it _must_ contain one of these bytes, which are otherwise
                // very rare (non-printing) characters.
                if comment.as_bytes().iter().any(|&b| (1..=8).contains(&b)) {
                    if !skip_validation {
                        return Err(Error::ZipInZip);
                    }
                }

                // Validate that the reported number of entries match what we experienced while
                // reading the local file headers.
                if record.num_entries() != num_entries {
                    if !skip_validation {
                        return Err(Error::ConflictingNumberOfEntries {
                            expected: num_entries,
                            actual: record.num_entries(),
                        });
                    }
                }

                break;
            }
        }
    }

    // If we didn't see the file in the central directory, it means it was not present in the
    // archive.
    if !skip_validation {
        if let Some((key, value)) = local_headers.iter().next() {
            return Err(Error::MissingCentralDirectoryEntry {
                offset: *key,
                path: value.relpath.clone(),
            });
        }
    }

    // Determine whether the reader is exhausted, but allow trailing null bytes, which some zip
    // implementations incorrectly include.
    if !skip_validation {
        let mut has_trailing_bytes = false;
        let mut buf = [0u8; 256];
        loop {
            let n = reader.read(&mut buf).await.map_err(Error::Io)?;
            if n == 0 {
                if has_trailing_bytes {
                    warn!("Ignoring trailing null bytes in ZIP archive");
                }
                break;
            }
            for &b in &buf[..n] {
                if b == 0 {
                    has_trailing_bytes = true;
                } else {
                    return Err(Error::TrailingContents);
                }
            }
        }
    }

    Ok(())
}
562
563/// Unpack the given tar archive into the destination directory.
564///
565/// This is equivalent to `archive.unpack_in(dst)`, but it also preserves the executable bit.
566async fn untar_in(
567    mut archive: tokio_tar::Archive<&'_ mut (dyn tokio::io::AsyncRead + Unpin)>,
568    dst: &Path,
569) -> std::io::Result<()> {
570    // Like `tokio-tar`, canonicalize the destination prior to unpacking.
571    let dst = fs_err::tokio::canonicalize(dst).await?;
572
573    // Memoize filesystem calls to canonicalize paths.
574    let mut memo = FxHashSet::default();
575
576    let mut entries = archive.entries()?;
577    let mut pinned = Pin::new(&mut entries);
578    while let Some(entry) = pinned.next().await {
579        // Unpack the file into the destination directory.
580        let mut file = entry?;
581
582        // On Windows, skip symlink entries, as they're not supported. pip recursively copies the
583        // symlink target instead.
584        if cfg!(windows) && file.header().entry_type().is_symlink() {
585            warn!(
586                "Skipping symlink in tar archive: {}",
587                file.path()?.display()
588            );
589            continue;
590        }
591
592        // Unpack the file into the destination directory.
593        #[cfg_attr(not(unix), allow(unused_variables))]
594        let unpacked_at = file.unpack_in_raw(&dst, &mut memo).await?;
595
596        // Preserve the executable bit.
597        #[cfg(unix)]
598        {
599            use std::fs::Permissions;
600            use std::os::unix::fs::PermissionsExt;
601
602            let entry_type = file.header().entry_type();
603            if entry_type.is_file() || entry_type.is_hard_link() {
604                let mode = file.header().mode()?;
605                let has_any_executable_bit = mode & 0o111;
606                if has_any_executable_bit != 0 {
607                    if let Some(path) = unpacked_at.as_deref() {
608                        let permissions = fs_err::tokio::metadata(&path).await?.permissions();
609                        if permissions.mode() & 0o111 != 0o111 {
610                            fs_err::tokio::set_permissions(
611                                &path,
612                                Permissions::from_mode(permissions.mode() | 0o111),
613                            )
614                            .await?;
615                        }
616                    }
617                }
618            }
619        }
620    }
621
622    Ok(())
623}
624
625/// Unpack a `.tar.gz` archive into the target directory, without requiring `Seek`.
626///
627/// This is useful for unpacking files as they're being downloaded.
628pub async fn untar_gz<R: tokio::io::AsyncRead + Unpin>(
629    reader: R,
630    target: impl AsRef<Path>,
631) -> Result<(), Error> {
632    let reader = tokio::io::BufReader::with_capacity(DEFAULT_BUF_SIZE, reader);
633    let mut decompressed_bytes = async_compression::tokio::bufread::GzipDecoder::new(reader);
634
635    let archive = tokio_tar::ArchiveBuilder::new(
636        &mut decompressed_bytes as &mut (dyn tokio::io::AsyncRead + Unpin),
637    )
638    .set_preserve_mtime(false)
639    .set_preserve_permissions(false)
640    .set_allow_external_symlinks(false)
641    .build();
642    untar_in(archive, target.as_ref())
643        .await
644        .map_err(Error::io_or_compression)
645}
646
647/// Unpack a `.tar.bz2` archive into the target directory, without requiring `Seek`.
648///
649/// This is useful for unpacking files as they're being downloaded.
650pub async fn untar_bz2<R: tokio::io::AsyncRead + Unpin>(
651    reader: R,
652    target: impl AsRef<Path>,
653) -> Result<(), Error> {
654    let reader = tokio::io::BufReader::with_capacity(DEFAULT_BUF_SIZE, reader);
655    let mut decompressed_bytes = async_compression::tokio::bufread::BzDecoder::new(reader);
656
657    let archive = tokio_tar::ArchiveBuilder::new(
658        &mut decompressed_bytes as &mut (dyn tokio::io::AsyncRead + Unpin),
659    )
660    .set_preserve_mtime(false)
661    .set_preserve_permissions(false)
662    .set_allow_external_symlinks(false)
663    .build();
664    untar_in(archive, target.as_ref())
665        .await
666        .map_err(Error::io_or_compression)
667}
668
669/// Unpack a `.tar.zst` archive into the target directory, without requiring `Seek`.
670///
671/// This is useful for unpacking files as they're being downloaded.
672pub async fn untar_zst<R: tokio::io::AsyncRead + Unpin>(
673    reader: R,
674    target: impl AsRef<Path>,
675) -> Result<(), Error> {
676    let reader = tokio::io::BufReader::with_capacity(DEFAULT_BUF_SIZE, reader);
677    let mut decompressed_bytes = async_compression::tokio::bufread::ZstdDecoder::new(reader);
678
679    let archive = tokio_tar::ArchiveBuilder::new(
680        &mut decompressed_bytes as &mut (dyn tokio::io::AsyncRead + Unpin),
681    )
682    .set_preserve_mtime(false)
683    .set_preserve_permissions(false)
684    .set_allow_external_symlinks(false)
685    .build();
686    untar_in(archive, target.as_ref())
687        .await
688        .map_err(Error::io_or_compression)
689}
690
691/// Unpack a `.tar.zst` archive from a file on disk into the target directory.
692pub fn untar_zst_file<R: std::io::Read>(reader: R, target: impl AsRef<Path>) -> Result<(), Error> {
693    let reader = std::io::BufReader::with_capacity(DEFAULT_BUF_SIZE, reader);
694    let decompressed = zstd::Decoder::new(reader).map_err(Error::Io)?;
695    let mut archive = tar::Archive::new(decompressed);
696    archive.set_preserve_mtime(false);
697    archive.unpack(target).map_err(Error::io_or_compression)?;
698    Ok(())
699}
700
701/// Unpack a `.tar.xz` archive into the target directory, without requiring `Seek`.
702///
703/// This is useful for unpacking files as they're being downloaded.
704pub async fn untar_xz<R: tokio::io::AsyncRead + Unpin>(
705    reader: R,
706    target: impl AsRef<Path>,
707) -> Result<(), Error> {
708    let reader = tokio::io::BufReader::with_capacity(DEFAULT_BUF_SIZE, reader);
709    let mut decompressed_bytes = async_compression::tokio::bufread::XzDecoder::new(reader);
710
711    let archive = tokio_tar::ArchiveBuilder::new(
712        &mut decompressed_bytes as &mut (dyn tokio::io::AsyncRead + Unpin),
713    )
714    .set_preserve_mtime(false)
715    .set_preserve_permissions(false)
716    .set_allow_external_symlinks(false)
717    .build();
718    untar_in(archive, target.as_ref())
719        .await
720        .map_err(Error::io_or_compression)?;
721    Ok(())
722}
723
724/// Unpack a `.tar` archive into the target directory, without requiring `Seek`.
725///
726/// This is useful for unpacking files as they're being downloaded.
727pub async fn untar<R: tokio::io::AsyncRead + Unpin>(
728    reader: R,
729    target: impl AsRef<Path>,
730) -> Result<(), Error> {
731    let mut reader = tokio::io::BufReader::with_capacity(DEFAULT_BUF_SIZE, reader);
732
733    let archive =
734        tokio_tar::ArchiveBuilder::new(&mut reader as &mut (dyn tokio::io::AsyncRead + Unpin))
735            .set_preserve_mtime(false)
736            .set_preserve_permissions(false)
737            .set_allow_external_symlinks(false)
738            .build();
739    untar_in(archive, target.as_ref())
740        .await
741        .map_err(Error::io_or_compression)?;
742    Ok(())
743}
744
745/// Unpack a `.zip`, `.tar.gz`, `.tar.bz2`, `.tar.zst`, or `.tar.xz` archive into the target directory,
746/// without requiring `Seek`.
747pub async fn archive<R: tokio::io::AsyncRead + Unpin>(
748    reader: R,
749    ext: SourceDistExtension,
750    target: impl AsRef<Path>,
751) -> Result<(), Error> {
752    match ext {
753        SourceDistExtension::Zip => {
754            unzip(reader, target).await?;
755        }
756        SourceDistExtension::Tar => {
757            untar(reader, target).await?;
758        }
759        SourceDistExtension::Tgz | SourceDistExtension::TarGz => {
760            untar_gz(reader, target).await?;
761        }
762        SourceDistExtension::Tbz | SourceDistExtension::TarBz2 => {
763            untar_bz2(reader, target).await?;
764        }
765        SourceDistExtension::Txz
766        | SourceDistExtension::TarXz
767        | SourceDistExtension::Tlz
768        | SourceDistExtension::TarLz
769        | SourceDistExtension::TarLzma => {
770            untar_xz(reader, target).await?;
771        }
772        SourceDistExtension::TarZst => {
773            untar_zst(reader, target).await?;
774        }
775    }
776    Ok(())
777}