use crate::Result;
use crate::{create, extract};
use futures::FutureExt;
use snafu::{IntoError, ResultExt};
use std::io::Write;
use std::ops::Range;
use std::{
future::Future,
path::PathBuf,
sync::{Arc, Mutex},
};
use tracing::{debug, error, warn};
mod internal;
/// Cloneable handle to a single tar archive builder shared between tasks.
///
/// All clones point at the same underlying [`tar::Builder`] and the same running byte
/// offset, both protected by mutexes.
pub(crate) struct TarBuilderWrapper<W>
where
    W: std::io::Write + Send + 'static,
{
    /// The shared builder. Held in an `Option` so that `finish_and_close` and `Drop` can
    /// take it out of the wrapper; it is `Some` at all other times.
    builder: Option<Arc<Mutex<tar::Builder<CountingWriter<W>>>>>,
    /// Running total of the archive bytes accounted for by `spawn_append_data`
    /// (header + data + zero padding of every entry appended so far). Starts at zero.
    current_byte_offset: Arc<Mutex<u64>>,
}
/// Hand-written `Clone`: a derived impl would demand `W: Clone`, but only the `Arc`
/// handles are duplicated here, never the writer itself.
impl<W: std::io::Write + Send + 'static> Clone for TarBuilderWrapper<W> {
    /// Returns a new handle that shares the same builder and byte-offset counter.
    fn clone(&self) -> Self {
        let builder = self.builder.as_ref().map(Arc::clone);
        let current_byte_offset = Arc::clone(&self.current_byte_offset);
        Self {
            builder,
            current_byte_offset,
        }
    }
}
/// Best-effort finalization for a wrapper that is dropped without `finish_and_close`.
///
/// Only the last surviving clone actually owns the builder; it finishes the archive and
/// flushes the writer, on a blocking thread when a tokio runtime is available and inline
/// otherwise. Failures are logged, never propagated, and this impl never panics.
impl<W: std::io::Write + Send + 'static> Drop for TarBuilderWrapper<W> {
    fn drop(&mut self) {
        // `finish_and_close` takes the builder out first, so `None` means the archive was
        // already finalized (or is being finalized) and there is nothing left to do.
        let shared = match self.builder.take() {
            Some(shared) => shared,
            None => return,
        };

        // Other clones still reference the builder; the last one to drop cleans up.
        let mutex = match Arc::try_unwrap(shared) {
            Ok(mutex) => mutex,
            Err(_) => return,
        };

        // A poisoned mutex means a closure panicked while holding the builder. Unwrapping
        // the poison error here would panic inside `drop`, which aborts the process when
        // it happens during unwinding, so recover the builder and still try to clean up.
        let builder = mutex.into_inner().unwrap_or_else(|poisoned| {
            warn!("tar builder mutex was poisoned by an earlier panic; cleaning up anyway");
            poisoned.into_inner()
        });

        if let Ok(handle) = tokio::runtime::Handle::try_current() {
            warn!("tar builder dropped without being finished, probably due to an error. spawning cleanup on blocking thread.");
            // Fire-and-forget: `drop_builder_async` logs its own failures.
            handle.spawn(Self::drop_builder_async(builder));
        } else {
            warn!("tar builder is being dropped outside of async context");
            if let Err(e) = Self::drop_builder(builder) {
                error!(?e, "sync drop of builder failed");
            }
        }
    }
}
impl<W: std::io::Write + Send + 'static> TarBuilderWrapper<W> {
    /// Creates a wrapper around a new, empty tar archive written to `writer`.
    ///
    /// `writer` is wrapped in a [`CountingWriter`] that is handed `progress`. The tracked
    /// archive byte offset starts at zero.
    pub fn new(writer: W, progress: Arc<dyn create::CreateProgressCallback>) -> Self {
        let writer = CountingWriter::new(writer, progress);
        Self {
            builder: Some(Arc::new(Mutex::new(tar::Builder::new(writer)))),
            current_byte_offset: Arc::new(Mutex::new(0)),
        }
    }

    /// Appends one entry to the archive on a blocking thread and reports where its data
    /// landed.
    ///
    /// On success the future yields the byte range, relative to the tracked archive
    /// offset, that holds the entry's data (header and trailing zero padding excluded).
    /// The tracked offset is advanced past the header, the data and the padding.
    ///
    /// The offset is only updated by this method; anything written to the archive through
    /// `with_builder_mut` directly is not accounted for.
    ///
    /// # Errors
    ///
    /// Yields the `TarAppendData` error if appending the entry or computing its header
    /// size fails, and the `SpawnBlocking` error if the blocking task cannot be joined.
    /// The offset is left untouched when an error is returned.
    pub fn spawn_append_data(
        &self,
        mut header: tar::Header,
        path: impl Into<PathBuf>,
        data: impl std::io::Read + Send + 'static,
    ) -> impl Future<Output = Result<Range<u64>>> {
        // Entry data is zero-padded up to a multiple of this many bytes.
        const TAR_BLOCK_SIZE: u64 = 512;

        // Pass-through reader that tallies how many bytes were actually consumed from
        // `data`. Deliberately not named `CountingReader`, to avoid shadowing the
        // unrelated module-level type of that name.
        struct ByteTallyReader<R> {
            inner: R,
            bytes_read: u64,
        }

        impl<R: std::io::Read> std::io::Read for ByteTallyReader<R> {
            fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
                let result = self.inner.read(buf);
                if let Ok(bytes_read) = &result {
                    self.bytes_read += *bytes_read as u64;
                }
                result
            }
        }

        let path = path.into();
        let current_byte_offset = self.current_byte_offset.clone();

        self.with_builder_mut(move |builder| {
            let mut reader = ByteTallyReader {
                inner: data,
                bytes_read: 0,
            };
            builder
                .append_data(&mut header, &path, &mut reader)
                .with_context(|_| crate::error::TarAppendDataSnafu {})?;

            // NOTE(review): `internal::calculate_header_size` is not visible here; it is
            // assumed to return the number of header bytes written for this entry.
            let header_len = internal::calculate_header_size(header, &path)
                .with_context(|_| crate::error::TarAppendDataSnafu {})?
                as u64;
            let data_len: u64 = reader.bytes_read;
            // Zero when the data already ends on a block boundary (including empty data).
            let zero_pad = (TAR_BLOCK_SIZE - data_len % TAR_BLOCK_SIZE) % TAR_BLOCK_SIZE;

            // Taken while the builder lock is still held, so offsets advance in the same
            // order in which entries are written.
            let mut guard = current_byte_offset.lock().unwrap();
            debug!(
                header_len,
                data_len,
                zero_pad,
                current_byte_offset = *guard,
                "appended data to archive"
            );
            let data_start = *guard + header_len;
            let data_range = data_start..(data_start + data_len);
            *guard += header_len + data_len + zero_pad;
            Ok(data_range)
        })
    }

    /// Runs `f` with exclusive access to the underlying tar builder on a blocking thread.
    ///
    /// The blocking task is spawned as soon as this method is called, not when the
    /// returned future is first polled. The future resolves to whatever `f` returned.
    ///
    /// # Errors
    ///
    /// Yields the `SpawnBlocking` error if the blocking task cannot be joined (for example
    /// because `f` panicked); otherwise any error is the one returned by `f`.
    ///
    /// # Panics
    ///
    /// Panics if the builder has already been taken out of this wrapper. The blocking task
    /// itself panics if the builder mutex is poisoned, which surfaces as the
    /// `SpawnBlocking` error.
    pub fn with_builder_mut<F, T>(&self, f: F) -> impl Future<Output = Result<T>>
    where
        F: FnOnce(&mut tar::Builder<CountingWriter<W>>) -> Result<T> + Send + 'static,
        T: Send + 'static,
    {
        let builder = self.builder();
        tokio::task::spawn_blocking(move || {
            let mut guard = builder.lock().unwrap();
            f(&mut guard)
        })
        // The `?` flattens the join result into the closure's own `Result<T>`.
        .map(|result| result.with_context(|_| crate::error::SpawnBlockingSnafu {})?)
    }

    /// Finishes the archive, flushes the writer and returns the total number of bytes
    /// written to it.
    ///
    /// # Errors
    ///
    /// Yields the `Flush` error if finishing or flushing fails, and the `SpawnBlocking`
    /// error if the blocking task cannot be joined.
    ///
    /// # Panics
    ///
    /// Panics if any other clone of this wrapper is still alive, or if the builder mutex
    /// is poisoned.
    pub async fn finish_and_close(mut self) -> Result<u64> {
        if let Some(builder) = self.builder.take() {
            let builder = Arc::try_unwrap(builder)
                .unwrap_or_else(|_| panic!("BUG: all tar builder clones should be dropped by now"))
                .into_inner()
                .unwrap();
            Self::drop_builder_async(builder).await
        } else {
            unreachable!("BUG: builder already dropped before call to finish_and_close")
        }
    }

    /// Returns another handle to the shared builder.
    ///
    /// Panics if the builder has already been taken out of this wrapper.
    fn builder(&self) -> Arc<Mutex<tar::Builder<CountingWriter<W>>>> {
        self.builder.clone().expect("BUG: already dropped")
    }

    /// Runs [`Self::drop_builder`] on a blocking thread, logging any failure before
    /// returning it to the caller.
    async fn drop_builder_async(builder: tar::Builder<CountingWriter<W>>) -> Result<u64> {
        let fut = tokio::task::spawn_blocking(move || Self::drop_builder(builder));
        match fut.await {
            Err(e) => {
                error!(
                    ?e,
                    "blocking task to drop tar builder panicked or was canceled"
                );
                Err(crate::error::SpawnBlockingSnafu {}.into_error(e))
            }
            Ok(Err(e)) => {
                error!(?e, "finalizing the tar builder reported an error");
                Err(e)
            }
            Ok(Ok(bytes_written)) => Ok(bytes_written),
        }
    }

    /// Consumes the builder to recover the writer, flushes it, and returns the total
    /// number of bytes the writer accepted. Performs blocking I/O.
    fn drop_builder(builder: tar::Builder<CountingWriter<W>>) -> Result<u64> {
        let mut blocking_writer = builder
            .into_inner()
            .with_context(|_| crate::error::FlushSnafu {})?;
        blocking_writer
            .flush()
            .with_context(|_| crate::error::FlushSnafu {})?;
        Ok(blocking_writer.total_bytes_written)
    }
}
/// Writer adapter that forwards to an inner writer while keeping a running total of the
/// bytes it accepted and reporting each write to a progress callback.
pub(crate) struct CountingWriter<W>
where
    W: std::io::Write + Send + 'static,
{
    /// Destination that actually receives the bytes.
    inner: W,
    /// Callback notified with the size of every successful write.
    progress: Arc<dyn create::CreateProgressCallback>,
    /// Sum of the byte counts returned by all successful writes so far.
    total_bytes_written: u64,
}
impl<W> CountingWriter<W>
where
    W: std::io::Write + Send + 'static,
{
    /// Wraps `writer`, starting the byte tally at zero and reporting writes to `progress`.
    fn new(writer: W, progress: Arc<dyn create::CreateProgressCallback>) -> Self {
        Self {
            total_bytes_written: 0,
            progress,
            inner: writer,
        }
    }
}
impl<W> std::io::Write for CountingWriter<W>
where
    W: std::io::Write + Send + 'static,
{
    /// Forwards `buf` to the inner writer. On success the number of bytes accepted is
    /// reported to the progress callback and added to the running total; on failure the
    /// error is returned and nothing is recorded.
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        self.inner.write(buf).map(|accepted| {
            self.progress.archive_bytes_written(accepted);
            self.total_bytes_written += accepted as u64;
            accepted
        })
    }

    /// Flushes the inner writer.
    fn flush(&mut self) -> std::io::Result<()> {
        self.inner.flush()
    }
}
/// Reader adapter that forwards to an inner reader while keeping a running total of the
/// bytes read and reporting each read to an extraction progress callback.
pub(crate) struct CountingReader<R>
where
    R: std::io::Read + Send + 'static,
{
    /// Source the bytes are actually read from.
    inner: R,
    /// Callback notified with the size of every successful read.
    progress: Arc<dyn extract::ExtractProgressCallback>,
    /// Sum of the byte counts returned by all successful reads so far.
    total_bytes_read: u64,
}
impl<R> CountingReader<R>
where
    R: std::io::Read + Send + 'static,
{
    /// Wraps `reader`, starting the byte tally at zero and reporting reads to `progress`.
    pub(crate) fn new(reader: R, progress: Arc<dyn extract::ExtractProgressCallback>) -> Self {
        Self {
            total_bytes_read: 0,
            progress,
            inner: reader,
        }
    }

    /// Number of bytes successfully read through this adapter so far.
    pub fn total_bytes_read(&self) -> u64 {
        self.total_bytes_read
    }
}
impl<R> std::io::Read for CountingReader<R>
where
    R: std::io::Read + Send + 'static,
{
    /// Reads from the inner reader into `buf`. On success the number of bytes read is
    /// reported to the progress callback and added to the running total; on failure the
    /// error is returned and nothing is recorded.
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        let outcome = self.inner.read(buf);
        if let Ok(count) = &outcome {
            let count = *count;
            self.progress.extract_archive_part_read(count);
            self.total_bytes_read += count as u64;
        }
        outcome
    }
}
// Round-trip test for the data ranges reported by `TarBuilderWrapper::spawn_append_data`.
#[cfg(test)]
mod tests {
use super::*;
use rand::rngs::ThreadRng;
use rand::{
distributions::{Alphanumeric, Standard},
Rng,
};
use std::io::{Cursor, Read, Seek};
use std::path::PathBuf;
// One randomly generated archive entry: the path it is stored under and its contents.
struct TestTarData {
path: PathBuf,
data: Vec<u8>,
}
impl TestTarData {
// Builds an entry with a random alphanumeric path of 1 to 1023 characters and
// 1 to 9,999 random bytes of data.
// NOTE(review): the wide path-length range presumably exists to exercise the header
// size calculation for long paths; confirm against `internal::calculate_header_size`.
fn generate_random(rng: &mut ThreadRng) -> Self {
let path_length: usize = rng.gen_range(1..1024);
let random_string: String = rng
.sample_iter(&Alphanumeric)
.take(path_length)
.map(char::from)
.collect();
let path = PathBuf::from(random_string);
let data_length: usize = rng.gen_range(1..10_000);
let data: Vec<u8> = rng.sample_iter(&Standard).take(data_length).collect();
Self { path, data }
}
}
// Writes 100 random entries through the builder, then re-opens the tar file and checks
// that the bytes found at each reported range start are exactly the entry's data.
#[tokio::test]
async fn builder_calculates_data_location() {
let mut rng = rand::thread_rng();
println!("Generating test data");
let test_data = (0..100)
.map(|_| TestTarData::generate_random(&mut rng))
.collect::<Vec<_>>();
// The archive is written to a real file inside a temporary directory.
let temp_dir = tempfile::TempDir::new().unwrap();
let tar_path = temp_dir.path().join("test.tar");
let tar_file = std::fs::File::create(&tar_path).unwrap();
// Progress callback that relies entirely on the trait's default methods.
struct NoProgress;
impl crate::CreateProgressCallback for NoProgress {}
let builder = TarBuilderWrapper::new(tar_file, Arc::new(NoProgress));
println!("Writing test data to tar file {}", tar_path.display());
// Each entry is kept together with the range the builder reported for it.
let mut test_data_with_ranges = Vec::with_capacity(test_data.len());
for test_data in test_data {
let mut header = tar::Header::new_gnu();
header.set_size(test_data.data.len() as u64);
header.set_cksum();
let data_range = builder
.spawn_append_data(
header,
test_data.path.clone(),
Cursor::new(test_data.data.clone()),
)
.await
.unwrap();
println!(
"Test data file '{}' with length {} is in tar file at {:?}",
test_data.path.display(),
test_data.data.len(),
data_range,
);
test_data_with_ranges.push((test_data, data_range));
}
// Finalize the archive before reading it back.
builder.finish_and_close().await.unwrap();
println!(
"Reading back test data from tar file {}",
tar_path.display()
);
let mut tar_file = std::fs::File::open(&tar_path).unwrap();
for (test_data, data_range) in test_data_with_ranges {
// Read exactly as many bytes as the entry holds, starting at the reported offset.
let mut data = vec![0; test_data.data.len()];
println!(
"Validating data for test data file {} at {:?}",
test_data.path.display(),
data_range
);
tar_file
.seek(std::io::SeekFrom::Start(data_range.start))
.unwrap();
tar_file.read_exact(&mut data).unwrap();
if data != test_data.data {
// Mismatch: scan the whole archive for the data so the failure message can say
// where it really is (if anywhere) versus where the builder claimed it was.
tar_file.seek(std::io::SeekFrom::Start(0)).unwrap();
let mut entire_file = vec![];
tar_file.read_to_end(&mut entire_file).unwrap();
// Offset of the first occurrence of `substring` in `entire_file`, if any.
// `substring` is never empty here because generated data has at least one byte.
fn find_substring(entire_file: &[u8], substring: &[u8]) -> Option<usize> {
let window_size = substring.len();
entire_file
.windows(window_size)
.position(|window| window == substring)
}
if let Some(offset) = find_substring(&entire_file, &test_data.data) {
panic!("Test data file {} contents was found at offset {} in tar file, but the builder claimed the data range is {:?}",
test_data.path.display(), offset,
data_range);
} else {
panic!("Test data file {} contents was not found in tar file at all, but the builder claimed the data range is {:?}",
test_data.path.display(),
data_range);
}
}
}
}
}