use std::path::Path;
use async_compression::tokio::bufread::BzDecoder;
use async_spooled_tempfile::SpooledTempFile;
use async_zip::base::read::stream::ZipFileReader;
#[cfg(feature = "reqwest")]
use futures_util::StreamExt;
#[cfg(feature = "reqwest")]
use tokio::io::AsyncReadExt;
use tokio::io::{AsyncRead, AsyncSeekExt};
use tokio_util::compat::{FuturesAsyncReadCompatExt, TokioAsyncReadCompatExt};
use crate::{read::SizeCountingReader, ExtractError, ExtractResult};
use super::shared::{extract_tar_zst_entry, unpack_tar_archive, DEFAULT_BUF_SIZE};
pub async fn extract_tar_bz2(
reader: impl AsyncRead + Send + Unpin + 'static,
destination: &Path,
) -> Result<ExtractResult, ExtractError> {
tokio::fs::create_dir_all(destination)
.await
.map_err(ExtractError::CouldNotCreateDestination)?;
let destination = destination.to_owned();
let sha256_reader = rattler_digest::HashingReader::<_, rattler_digest::Sha256>::new(reader);
let mut md5_reader =
rattler_digest::HashingReader::<_, rattler_digest::Md5>::new(sha256_reader);
let mut size_reader = SizeCountingReader::new(&mut md5_reader);
let buf_reader = tokio::io::BufReader::with_capacity(DEFAULT_BUF_SIZE, &mut size_reader);
let decoder = BzDecoder::new(buf_reader);
let archive = tokio_tar::ArchiveBuilder::new(decoder)
.set_preserve_mtime(true)
.set_preserve_permissions(false)
.set_unpack_xattrs(false)
.set_allow_external_symlinks(true)
.build();
unpack_tar_archive(archive, &destination).await?;
tokio::io::copy(&mut size_reader, &mut tokio::io::sink())
.await
.map_err(ExtractError::IoError)?;
let (_, total_size) = size_reader.finalize();
let (sha256_reader, md5) = md5_reader.finalize();
let (_, sha256) = sha256_reader.finalize();
if total_size == 0 {
return Err(ExtractError::IoError(std::io::Error::new(
std::io::ErrorKind::UnexpectedEof,
"no data was read from the package stream - the stream may have been truncated",
)));
}
Ok(ExtractResult {
sha256,
md5,
total_size,
})
}
pub async fn extract_conda(
reader: impl AsyncRead + Send + Unpin + 'static,
destination: &Path,
) -> Result<ExtractResult, ExtractError> {
tokio::fs::create_dir_all(destination)
.await
.map_err(ExtractError::CouldNotCreateDestination)?;
let destination = destination.to_owned();
let sha256_reader = rattler_digest::HashingReader::<_, rattler_digest::Sha256>::new(reader);
let mut md5_reader =
rattler_digest::HashingReader::<_, rattler_digest::Md5>::new(sha256_reader);
let mut size_reader = SizeCountingReader::new(&mut md5_reader);
let compat_reader = (&mut size_reader).compat();
let mut buf_reader = futures::io::BufReader::with_capacity(DEFAULT_BUF_SIZE, compat_reader);
let mut zip_reader = ZipFileReader::new(&mut buf_reader);
while let Some(mut entry) = zip_reader
.next_with_entry()
.await
.map_err(|e| ExtractError::IoError(std::io::Error::other(e)))?
{
let filename = entry.reader().entry().filename().as_str().map_err(|e| {
ExtractError::IoError(std::io::Error::new(std::io::ErrorKind::InvalidData, e))
})?;
if filename.ends_with(".tar.zst") {
let mut compat_entry = entry.reader_mut().compat();
extract_tar_zst_entry(&mut compat_entry, &destination).await?;
}
(.., zip_reader) = entry
.skip()
.await
.map_err(|e| ExtractError::IoError(std::io::Error::other(e)))?;
}
futures::io::copy(&mut buf_reader, &mut futures::io::sink())
.await
.map_err(ExtractError::IoError)?;
let (_, total_size) = size_reader.finalize();
let (sha256_reader, md5) = md5_reader.finalize();
let (_, sha256) = sha256_reader.finalize();
if total_size == 0 {
return Err(ExtractError::IoError(std::io::Error::new(
std::io::ErrorKind::UnexpectedEof,
"no data was read from the package stream - the stream may have been truncated",
)));
}
Ok(ExtractResult {
sha256,
md5,
total_size,
})
}
/// Unpacks a `.conda` stream by first buffering all of it, then extracting
/// from the buffer with the seek-based extractor.
///
/// Steps, in order:
/// 1. If `destination` already exists it is removed recursively, then it is
///    (re)created.
/// 2. The whole of `reader` is copied into a [`SpooledTempFile`] while SHA-256,
///    MD5 and the byte count are computed.
/// 3. If no bytes were counted an error is returned and nothing is extracted.
/// 4. The buffer is rewound and passed to
///    `crate::tokio::async_seek::extract_conda`.
///
/// # Errors
///
/// * [`ExtractError::IoError`] when the existence check, the buffering copy or
///   the rewind fails, or with kind `UnexpectedEof` when the counted size is
///   zero.
/// * [`ExtractError::CouldNotCreateDestination`] when removing or creating the
///   destination directory fails.
/// * Any error produced by `crate::tokio::async_seek::extract_conda`.
pub async fn extract_conda_via_buffering(
    reader: impl AsyncRead + Send + Unpin + 'static,
    destination: &Path,
) -> Result<ExtractResult, ExtractError> {
    let already_present = tokio::fs::try_exists(destination)
        .await
        .map_err(ExtractError::IoError)?;
    if already_present {
        // Note: a failed removal is reported with the same variant as a failed
        // creation.
        tokio::fs::remove_dir_all(destination)
            .await
            .map_err(ExtractError::CouldNotCreateDestination)?;
    }
    tokio::fs::create_dir_all(destination)
        .await
        .map_err(ExtractError::CouldNotCreateDestination)?;
    let target_dir = destination.to_owned();

    // Reader stack, innermost first: raw input -> SHA-256 -> MD5 -> size counter.
    let mut md5_stage = rattler_digest::HashingReader::<_, rattler_digest::Md5>::new(
        rattler_digest::HashingReader::<_, rattler_digest::Sha256>::new(reader),
    );
    let mut counted = SizeCountingReader::new(&mut md5_stage);

    // 5 MiB is handed to `SpooledTempFile::new` (presumably the in-memory
    // threshold before it spills to disk; confirm against that crate's docs).
    let mut spool = SpooledTempFile::new(5 * 1024 * 1024);
    tokio::io::copy(&mut counted, &mut spool)
        .await
        .map_err(ExtractError::IoError)?;

    // Peel the layers in reverse order of construction.
    let (_, total_size) = counted.finalize();
    let (sha256_stage, md5) = md5_stage.finalize();
    let (_, sha256) = sha256_stage.finalize();

    // Bail out before touching the buffer if the stream was empty.
    if total_size == 0 {
        let empty_stream = std::io::Error::new(
            std::io::ErrorKind::UnexpectedEof,
            "no data was read from the package stream - the stream may have been truncated",
        );
        return Err(ExtractError::IoError(empty_stream));
    }

    spool.rewind().await.map_err(ExtractError::IoError)?;
    crate::tokio::async_seek::extract_conda(spool, &target_dir).await?;

    Ok(ExtractResult {
        sha256,
        md5,
        total_size,
    })
}
#[cfg(feature = "reqwest")]
/// Chooses the name prefix associated with `target_path`.
///
/// Returns `"info-"` when the first path component of `target_path` is
/// `info` (including the bare path `info`), and `"pkg-"` for everything else.
/// The comparison is made on whole path components, so `info-custom.txt` and
/// `information/file` both map to `"pkg-"`.
pub(crate) fn conda_entry_prefix(target_path: &Path) -> &'static str {
    // `Path::starts_with` matches complete components, never partial names.
    let under_info = target_path.starts_with("info");
    if !under_info {
        return "pkg-";
    }
    "info-"
}
/// Scans `archive` for an entry whose path equals `file_name` and returns the
/// entry's contents.
///
/// Entries are visited in stream order and the scan stops at the first match.
/// Returns `Ok(None)` when the archive is exhausted without a match.
///
/// # Errors
///
/// Returns [`ExtractError::IoError`] when the entry stream cannot be created,
/// when an entry, its path or its header size cannot be read, or when reading
/// the matching entry's data fails.
#[cfg(feature = "reqwest")]
pub(crate) async fn get_file_from_tar_archive<R: tokio::io::AsyncRead + Unpin>(
    archive: &mut tokio_tar::Archive<R>,
    file_name: &Path,
) -> Result<Option<Vec<u8>>, ExtractError> {
    // Upper bound on the capacity reserved before any data has been read.
    // The size comes straight from the tar header, which is untrusted input:
    // a corrupt or hostile header must not be able to trigger a huge
    // allocation. `read_to_end` still grows the buffer past this if the entry
    // really is larger.
    const MAX_PREALLOCATION: usize = 1024 * 1024;

    let mut entries = archive.entries().map_err(ExtractError::IoError)?;
    while let Some(entry) = entries.next().await {
        let mut entry = entry.map_err(ExtractError::IoError)?;
        let path = entry.path().map_err(ExtractError::IoError)?;
        if path.as_ref() == file_name {
            let declared_size = entry.header().size().map_err(ExtractError::IoError)?;
            // Checked conversion: a plain `as usize` would silently truncate
            // on targets where `usize` is narrower than `u64`.
            let capacity = usize::try_from(declared_size)
                .unwrap_or(usize::MAX)
                .min(MAX_PREALLOCATION);
            let mut buf = Vec::with_capacity(capacity);
            entry
                .read_to_end(&mut buf)
                .await
                .map_err(ExtractError::IoError)?;
            return Ok(Some(buf));
        }
    }
    Ok(None)
}
#[cfg(all(test, feature = "reqwest"))]
mod tests {
    use std::path::Path;

    use super::conda_entry_prefix;

    /// Paths located below `info/` map to the `info-` prefix.
    #[test]
    fn test_conda_entry_prefix_info_files() {
        for inside_info in [
            "info/index.json",
            "info/about.json",
            "info/paths.json",
            "info/nested/deep/file.txt",
        ] {
            assert_eq!(conda_entry_prefix(Path::new(inside_info)), "info-");
        }
    }

    /// Paths outside `info/` map to the `pkg-` prefix.
    #[test]
    fn test_conda_entry_prefix_pkg_files() {
        for outside_info in ["lib/libz.so", "bin/python", "clobber"] {
            assert_eq!(conda_entry_prefix(Path::new(outside_info)), "pkg-");
        }
    }

    /// The bare `info` path itself counts as an info path.
    #[test]
    fn test_conda_entry_prefix_info_bare() {
        let bare = Path::new("info");
        assert_eq!(conda_entry_prefix(bare), "info-");
    }

    /// Names that merely begin with the text `info` are not info paths,
    /// because the comparison is made per path component.
    #[test]
    fn test_conda_entry_prefix_info_like_but_not_info_dir() {
        for lookalike in ["info-custom.txt", "information/file"] {
            assert_eq!(conda_entry_prefix(Path::new(lookalike)), "pkg-");
        }
    }
}