use crate::objstore::{Bucket, MultipartUploader, ObjectStorageFactory};
use crate::tar::CountingReader;
use crate::{Config, Result};
use bytes::{Bytes, BytesMut};
use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};
use snafu::{prelude::*, IntoError};
use std::future::Future;
use std::io::{BufReader, Read};
use std::ops::Range;
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::mpsc;
use tracing::{debug, debug_span, error, info, info_span, instrument, Instrument};
use url::Url;
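/// Where the tar archive to extract is read from.
///
/// The archive can live in object storage (identified by URL), in a local file, or behind an
/// arbitrary blocking [`Read`] impl (in which case its total size is not known up front).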
pub enum SourceArchive {
ObjectStorage(Url),
File(PathBuf),
Reader(Box<dyn Read + Send + Sync>),
}
impl std::fmt::Debug for SourceArchive {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::ObjectStorage(url) => f.debug_tuple("ObjectStorage").field(url).finish(),
Self::File(path) => f.debug_tuple("File").field(path).finish(),
Self::Reader(_) => f.debug_tuple("Reader").field(&"dyn Read").finish(),
}
}
}
impl SourceArchive {
#[instrument(skip(config))]
async fn into_internal(self, config: Config) -> Result<SourceArchiveInternal> {
match self {
Self::ObjectStorage(url) => {
let objstore = ObjectStorageFactory::from_url(config, &url).await?;
let (bucket, key, version_id) = objstore.parse_url(&url).await?;
let key = key.ok_or_else(|| {
crate::error::ArchiveUrlInvalidSnafu { url: url.clone() }.build()
})?;
// Size up the specific version we will read so the 0..len read range matches it
let len = bucket
.get_object_size(key.clone(), version_id.clone())
.await?;
Ok(SourceArchiveInternal::ObjectStorage {
bucket,
key,
version_id,
len,
})
}
Self::File(path) => {
let metadata = tokio::fs::metadata(&path).await.with_context(|_| {
crate::error::OpeningArchiveFileSnafu { path: path.clone() }
})?;
Ok(SourceArchiveInternal::File { path, metadata })
}
// Nothing to resolve or validate up front for an arbitrary reader
Self::Reader(reader) => Ok(SourceArchiveInternal::Reader(reader)),
}
}
}
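/// Internal, resolved counterpart of [`SourceArchive`]: the object or file has been located and
/// its size captured where possible.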
enum SourceArchiveInternal {
ObjectStorage {
bucket: Box<dyn Bucket>,
key: String,
version_id: Option<String>,
len: u64,
},
File {
path: PathBuf,
metadata: std::fs::Metadata,
},
Reader(Box<dyn Read + Send + Sync>),
}
impl std::fmt::Debug for SourceArchiveInternal {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::ObjectStorage {
bucket,
key,
version_id,
len,
} => f
.debug_struct("ObjectStorage")
.field("bucket", &bucket.name())
.field("key", &key)
.field("version_id", &version_id)
.field("len", &len)
.finish(),
Self::File { path, metadata } => f
.debug_struct("File")
.field("path", &path)
.field("metadata", metadata)
.finish(),
Self::Reader(_) => f.debug_tuple("Reader").field(&"dyn Read").finish(),
}
}
}
impl SourceArchiveInternal {
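/// Consume the source and produce a blocking [`Read`] over the raw archive bytes, wrapped in a
/// [`CountingReader`] that reports bytes read via `progress`.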
async fn into_reader(
self,
progress: Arc<dyn ExtractProgressCallback>,
) -> Result<CountingReader<Box<dyn Read + Send + Sync>>> {
// Buffer file and reader sources generously to cut down on small blocking reads
let read_buffer_size = 256 * 1024;
let reader: Box<dyn Read + Send + Sync> = match self {
Self::ObjectStorage {
bucket,
key,
version_id,
len,
} => {
Box::new(crate::async_bridge::stream_as_reader(
tokio_stream::wrappers::ReceiverStream::new(
bucket.read_object(key, version_id, 0..len).await?,
),
))
}
Self::File { path, metadata: _ } => {
let file = tokio::task::spawn_blocking(move || {
std::fs::File::open(&path).with_context(|_| {
crate::error::OpeningArchiveFileSnafu { path: path.clone() }
})
})
.await
.with_context(|_| crate::error::SpawnBlockingSnafu {})??;
Box::new(BufReader::with_capacity(read_buffer_size, file))
}
Self::Reader(reader) => {
Box::new(BufReader::with_capacity(read_buffer_size, reader))
}
};
Ok(CountingReader::new(reader, progress))
}
}
impl SourceArchiveInternal {
/// Best-effort size of the archive, if it can be known in advance (used for progress reporting).
async fn get_size(&self) -> Result<Option<u64>> {
match self {
Self::ObjectStorage { len, .. } => Ok(Some(*len)),
Self::File { metadata, .. } => Ok(Some(metadata.len())),
// An arbitrary reader has no knowable length up front
Self::Reader(_) => Ok(None),
}
}
}
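/// A filter restricting which entries in the archive are extracted.
///
/// Filters are parsed from strings (see the [`FromStr`] impl below): anything containing glob
/// metacharacters (`*`, `?`, `[`, `]`) becomes a glob, a trailing `/` denotes a prefix, and
/// everything else matches a single object key exactly.
///
/// A rough sketch of the mapping (illustrative only; surrounding setup is assumed):
///
/// ```ignore
/// let object: ExtractFilter = "logs/app.log".parse().unwrap(); // ExtractFilter::Object
/// let prefix: ExtractFilter = "logs/".parse().unwrap();        // ExtractFilter::Prefix
/// let glob: ExtractFilter = "logs/*.log".parse().unwrap();     // ExtractFilter::Glob
/// ```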
#[derive(Debug)]
pub enum ExtractFilter {
Object { key: String },
Prefix { prefix: String },
Glob { pattern: glob::Pattern },
}
impl FromStr for ExtractFilter {
type Err = crate::S3TarError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
if s.is_empty() || s == "/" {
crate::error::InvalidFilterSnafu {
filter: s.to_string(),
}
.fail()
} else if s.contains('*') || s.contains('?') || s.contains('[') || s.contains(']') {
Ok(Self::Glob {
pattern: glob::Pattern::from_str(s).with_context(|_| {
crate::error::InvalidGlobPatternSnafu {
pattern: s.to_string(),
}
})?,
})
} else if s.ends_with('/') {
Ok(Self::Prefix {
prefix: s.to_string(),
})
} else {
Ok(Self::Object { key: s.to_string() })
}
}
}
impl ExtractFilter {
fn matches(&self, path: &Path) -> bool {
match self {
Self::Object { key } => path.to_string_lossy() == key.as_ref(),
Self::Prefix { prefix } => path.to_string_lossy().starts_with(&**prefix),
Self::Glob { pattern } => {
let match_options = glob::MatchOptions {
require_literal_separator: true,
..Default::default()
};
pattern.matches_with(&path.to_string_lossy(), match_options)
}
}
}
}
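/// Builder for an [`ExtractArchiveJob`].
///
/// Resolves the target bucket/prefix from a URL, collects optional filters, and validates the
/// source archive when [`build`](Self::build) is called.
///
/// A minimal usage sketch (module paths, error handling, and `Config` construction are
/// assumptions here, not prescribed by this code):
///
/// ```ignore
/// async fn extract_all(config: Config, archive: Url, target: Url) -> Result<()> {
///     let source = SourceArchive::ObjectStorage(archive);
///     let mut builder = ExtractArchiveJobBuilder::new(config, source, target).await?;
///     builder.add_filter("logs/")?; // optional: only extract objects under `logs/`
///     let job = builder.build().await?;
///     // `pending()` means "never abort"; pass a real future to support cancellation
///     job.run_without_progress(std::future::pending::<()>()).await
/// }
/// ```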
#[derive(Debug)]
pub struct ExtractArchiveJobBuilder {
config: Config,
source_archive: SourceArchive,
target_bucket: Box<dyn Bucket>,
target_prefix: String,
filters: Vec<ExtractFilter>,
}
impl ExtractArchiveJobBuilder {
pub async fn new(config: Config, source: SourceArchive, target: Url) -> Result<Self> {
let objstore = ObjectStorageFactory::from_url(config.clone(), &target).await?;
let (bucket, key, _) = objstore.parse_url(&target).await?;
Ok(Self {
config,
source_archive: source,
target_bucket: bucket,
target_prefix: key.unwrap_or_default(),
filters: Vec::new(),
})
}
pub fn add_filter(&mut self, filter: impl AsRef<str>) -> Result<()> {
self.filters.push(filter.as_ref().parse()?);
Ok(())
}
pub async fn build(self) -> Result<ExtractArchiveJob> {
let source_archive = self
.source_archive
.into_internal(self.config.clone())
.await?;
let archive_size = source_archive.get_size().await?;
Ok(ExtractArchiveJob {
config: self.config,
source_archive,
target_bucket: self.target_bucket,
target_prefix: self.target_prefix,
filters: self.filters,
archive_size,
})
}
}
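/// Callback trait used to report extract and upload progress.
///
/// Every method has a no-op default implementation, so callers only override the events they
/// care about (`#[allow(unused_variables)]` silences warnings from those defaults). The
/// `extract_*` methods report reading the tar archive; the `object_*` methods report uploading
/// the extracted objects to the target bucket.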
#[allow(unused_variables)]
pub trait ExtractProgressCallback: Sync + Send {
fn extract_starting(&self, archive_size: Option<u64>) {}
fn extract_archive_part_read(&self, bytes: usize) {}
fn extract_object_skipped(&self, key: &str, size: u64) {}
fn extract_object_starting(&self, key: &str, size: u64) {}
fn extract_object_part_read(&self, key: &str, bytes: usize) {}
fn extract_object_finished(&self, key: &str, size: u64) {}
fn extract_finished(
&self,
extracted_objects: usize,
extracted_object_bytes: u64,
skipped_objects: usize,
skipped_object_bytes: u64,
total_bytes: u64,
duration: Duration,
) {
}
fn object_upload_starting(&self, key: &str, size: u64) {}
fn object_part_uploaded(&self, key: &str, bytes: usize) {}
fn object_uploaded(&self, key: &str, size: u64) {}
fn objects_uploaded(&self, total_objects: usize, total_object_bytes: u64, duration: Duration) {}
}
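/// A fully configured extract job, ready to [`run`](Self::run).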
#[derive(Debug)]
pub struct ExtractArchiveJob {
config: Config,
source_archive: SourceArchiveInternal,
target_bucket: Box<dyn Bucket>,
target_prefix: String,
filters: Vec<ExtractFilter>,
archive_size: Option<u64>,
}
impl ExtractArchiveJob {
pub fn archive_size(&self) -> Option<u64> {
self.archive_size
}
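/// Run the extract job without progress reporting; equivalent to [`run`](Self::run) with a
/// no-op [`ExtractProgressCallback`].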
pub async fn run_without_progress(self, abort: impl Future<Output = ()>) -> Result<()> {
struct NoProgress {}
impl ExtractProgressCallback for NoProgress {}
self.run(abort, NoProgress {}).await
}
pub async fn run<Abort, Progress>(self, _abort: Abort, progress: Progress) -> Result<()>
where
Abort: Future<Output = ()>,
Progress: ExtractProgressCallback + 'static,
{
let span = info_span!("run",
source_archive = ?self.source_archive,
target_bucket = self.target_bucket.name(),
target_prefix = %self.target_prefix);
async move {
info!(?self.filters, ?self.archive_size, "Starting extract archive job");
let progress: Arc<dyn ExtractProgressCallback> = Arc::new(progress);
progress.extract_starting(self.archive_size);
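// The job is split into two halves connected by a bounded channel: a blocking task that
// reads and parses tar entries, and a tokio task that uploads them to object storage
// with bounded concurrency.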
let (entry_sender, entry_receiver) = mpsc::channel(self.config.max_concurrent_requests);
let reader_span =
info_span!("read_tar_entries_blocking", source_archive = ?self.source_archive);
let reader = self.source_archive.into_reader(progress.clone()).await?;
let reader_fut = {
let progress = progress.clone();
let target_bucket = self.target_bucket.clone();
tokio::task::spawn_blocking(move || {
let _guard = reader_span.enter();
debug!("Starting blocking tar read task");
match Self::read_tar_entries_blocking(
target_bucket,
self.filters,
reader,
progress,
entry_sender,
) {
Ok(()) => {
debug!("Blocking tar read task completing successfully");
Ok(())
}
Err(e) => {
error!(err = ?e, "Blocking tar read task failed");
Err(e)
}
}
})
};
let processor_fut = {
let progress = progress.clone();
tokio::spawn(async move {
debug!("Starting tar entry processor task");
match Self::process_tar_entries(
self.config.max_concurrent_requests,
self.target_bucket,
self.target_prefix,
progress,
entry_receiver,
)
.await
{
Ok(totals) => {
debug!("Tar entry processor task completed successfully");
Ok(totals)
}
Err(e) => {
error!(err = ?e, "Tar entry processor task failed");
Err(e)
}
}
})
};
let (reader_result, processor_result) = futures::join!(reader_fut, processor_fut);
let reader_result =
reader_result.with_context(|_| crate::error::SpawnBlockingSnafu {})?;
let processor_result =
processor_result.with_context(|_| crate::error::SpawnSnafu {})?;
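// Reconcile the two task results. An abort error from the reader means the processor
// dropped the receiver, so the processor's own error (if any) is surfaced first;
// otherwise the abort error itself is returned.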
let (total_objects, total_object_bytes, elapsed) = match reader_result {
Ok(_) => {
processor_result?
}
e @ Err(crate::S3TarError::TarExtractAborted) => {
processor_result?;
return e;
}
Err(e) => {
return Err(e);
}
};
progress.objects_uploaded(total_objects, total_object_bytes, elapsed);
info!("Finished extract job");
Ok(())
}
.instrument(span)
.await
}
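/// Blocking task that walks the tar archive, applies the filters, and sends each included entry
/// to the async processor: small entries as a single [`TarEntryComponent::SmallFile`], larger
/// entries split into the multipart ranges chosen by the target bucket.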
fn read_tar_entries_blocking(
target_bucket: Box<dyn Bucket>,
filters: Vec<ExtractFilter>,
reader: CountingReader<Box<dyn Read + Send + Sync>>,
progress: Arc<dyn ExtractProgressCallback>,
entry_sender: mpsc::Sender<TarEntryComponent>,
) -> Result<()> {
let mut archive = tar::Archive::new(reader);
let mut extracted_objects = 0usize;
let mut extracted_object_bytes = 0u64;
let mut skipped_objects = 0usize;
let mut skipped_object_bytes = 0u64;
let started = Instant::now();
for result in archive
.entries()
.with_context(|_| crate::error::TarReadSnafu {})?
{
let mut entry = result.with_context(|_| crate::error::TarReadSnafu {})?;
let path = entry
.path()
.with_context(|_| crate::error::TarReadSnafu {})?
.into_owned();
let len = entry.size();
let key = path.display().to_string();
let span = debug_span!("Processing tar entry", path = %path.display(), len);
let _guard = span.enter();
let included = if filters.is_empty() {
debug!("No filters are applied so this entry is included");
true
} else if let Some(filter) = filters.iter().find(|filter| filter.matches(&path)) {
debug!(?filter, "Filter matched so this entry is included");
true
} else {
debug!("No filters matched, so this entry is excluded");
false
};
if !included {
progress.extract_object_skipped(&key, len);
skipped_objects += 1;
skipped_object_bytes += len;
continue;
}
progress.extract_object_starting(&key, len);
match target_bucket.partition_for_multipart_upload(&key, len)? {
None => {
debug!("Processing entry as a small file");
let data = Self::read_data(&mut entry, len as usize)?;
progress.extract_object_part_read(&key, len as usize);
progress.extract_object_finished(&key, len);
extracted_objects += 1;
extracted_object_bytes += len;
Self::send_component(
TarEntryComponent::SmallFile { path, data },
&entry_sender,
)?;
}
Some(parts) => {
debug!(num_parts = parts.len(), "Processing entry as multi-part");
Self::send_component(
TarEntryComponent::StartMultipartFile {
path,
len,
parts: parts.clone(),
},
&entry_sender,
)?;
for part in parts {
let len = part.end - part.start;
let data = Self::read_data(&mut entry, len as usize)?;
progress.extract_object_part_read(&key, data.len());
let component = TarEntryComponent::FilePart { part, data };
Self::send_component(component, &entry_sender)?;
}
Self::send_component(TarEntryComponent::EndMultipartFile, &entry_sender)?;
progress.extract_object_finished(&key, len);
extracted_objects += 1;
extracted_object_bytes += len;
}
}
}
debug!("Completed processing all tar entries");
let reader = archive.into_inner();
progress.extract_finished(
extracted_objects,
extracted_object_bytes,
skipped_objects,
skipped_object_bytes,
reader.total_bytes_read(),
started.elapsed(),
);
Ok(())
}
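/// Read exactly `len` bytes of entry data into a freshly allocated buffer.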
fn read_data(reader: &mut impl Read, len: usize) -> Result<Bytes> {
let mut bytes = BytesMut::zeroed(len);
reader
.read_exact(&mut bytes)
.with_context(|_| crate::error::TarReadSnafu {})?;
Ok(bytes.freeze())
}
fn send_component(
component: TarEntryComponent,
sender: &mpsc::Sender<TarEntryComponent>,
) -> Result<()> {
if sender.blocking_send(component).is_err() {
debug!("TarEntryComponent receiver task dropped the receiver; aborting blocking reader task");
return crate::error::TarExtractAbortedSnafu {}.fail();
}
Ok(())
}
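/// Async task that receives [`TarEntryComponent`]s from the blocking reader and performs the
/// corresponding uploads to the target bucket, with up to `max_concurrent_requests` uploads in
/// flight at a time. Returns the number of uploaded objects, total bytes, and elapsed time.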
#[instrument(skip(target_bucket, progress, entry_receiver), fields(target_bucket = target_bucket.name()))]
async fn process_tar_entries(
max_concurrent_requests: usize,
target_bucket: Box<dyn Bucket>,
target_prefix: String,
progress: Arc<dyn ExtractProgressCallback>,
entry_receiver: mpsc::Receiver<TarEntryComponent>,
) -> Result<(usize, u64, Duration)> {
let entry_receiver = tokio_stream::wrappers::ReceiverStream::new(entry_receiver);
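// Per-stream state threaded through `scan` below. For a multipart upload the state tracks
// the current uploader, a shared init future that every part awaits before uploading, and
// the part futures that must all complete before the upload is finished.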
struct State<InitFut, PartUploadFut> {
current_uploader: Option<Box<dyn MultipartUploader>>,
current_key: Option<String>,
init_fut: Option<InitFut>,
part_upload_futs: Vec<PartUploadFut>,
last_multipart_uploaded: Option<Range<u64>>,
total_objects: Arc<AtomicUsize>,
total_object_bytes: Arc<AtomicU64>,
}
let total_objects = Arc::new(AtomicUsize::new(0));
let total_object_bytes = Arc::new(AtomicU64::new(0));
let started = Instant::now();
let futs = entry_receiver.scan(
State {
current_uploader: None,
current_key: None,
init_fut: None,
part_upload_futs: Vec::new(),
last_multipart_uploaded: None,
total_objects: total_objects.clone(),
total_object_bytes: total_object_bytes.clone(),
},
move |state, tar_entry_component| {
let progress = progress.clone();
let fut = match tar_entry_component {
TarEntryComponent::SmallFile { path, data } => {
let key = format!("{}{}", target_prefix, path.display());
let len = data.len();
let target_bucket = target_bucket.clone();
let total_objects = state.total_objects.clone();
let total_object_bytes = state.total_object_bytes.clone();
async move {
debug!(path= %path.display(),len, %key, "Uploading small file");
progress.object_upload_starting(&key, len as u64);
target_bucket.put_small_object(key.clone(), data).await?;
progress.object_part_uploaded(&key, len);
progress.object_uploaded(&key, len as u64);
total_objects.fetch_add(1, Ordering::SeqCst);
total_object_bytes.fetch_add(len as u64, Ordering::SeqCst);
Ok(())
}
.boxed()
}
TarEntryComponent::StartMultipartFile { path, len, parts } => {
let key = format!("{}{}", target_prefix, path.display());
let uploader = target_bucket.start_multipart_upload(key.clone(), parts);
state.current_key = Some(key.clone());
state.current_uploader = Some(uploader.clone());
state.part_upload_futs = Vec::new();
state.last_multipart_uploaded = None;
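// The init future is shared: every part upload awaits it, so the multipart upload is
// initialized exactly once. Errors are wrapped in `Arc` so the shared output stays cloneable.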
let init_fut = async move {
debug!(path = %path.display(), len, %key, "Starting multipart file upload");
uploader.init().await?;
progress.object_upload_starting(&key, len);
Ok(())
}.map_err(|e| {
Arc::new(e)
}).shared();
state.init_fut = Some(init_fut.clone());
init_fut
.map_err(|e| {
crate::error::TarFileStartMultipartFileSnafu { }.into_error(e)
})
.boxed()
}
TarEntryComponent::FilePart { data, part } => {
let uploader = state
.current_uploader
.clone()
.expect("BUG: Got FilePart without StartMultipartFile");
let total_object_bytes = state.total_object_bytes.clone();
let key = state.current_key.clone().expect("BUG: Missing current_key");
let init_fut = state.init_fut.clone().expect("BUG: Missing init_fut");
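// Parts must arrive in order and be contiguous; anything else is a bug in the reader task.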
match &state.last_multipart_uploaded {
Some(last_part) => {
assert_eq!(last_part.end, part.start);
}
None => {
assert_eq!(0, part.start);
}
}
assert_eq!(part.end - part.start, data.len() as u64);
state.last_multipart_uploaded = Some(part.clone());
let part_fut = async move {
init_fut.await
.map_err(|e| {
crate::error::TarFileStartMultipartFileSnafu {}.into_error(e)
})?;
let len = data.len();
debug!(%key, ?part, len, "Uploading file part");
uploader.upload_part(part, data).await?;
progress.object_part_uploaded(&key, len);
total_object_bytes.fetch_add(len as u64, Ordering::SeqCst);
Ok(())
}.map_err(|e| {
Arc::new(e)
}).shared();
state.part_upload_futs.push(part_fut.clone());
part_fut
.map_err(|e| {
crate::error::TarFilePartUploadSnafu { }.into_error(e)
})
.boxed()
}
TarEntryComponent::EndMultipartFile => {
let uploader = state
.current_uploader
.take()
.expect("BUG: Got FilePart without StartMultipartFile");
let last_part = state
.last_multipart_uploaded
.take()
.expect("BUG: Missing last_multipart_uploaded");
let key = state.current_key.clone().expect("BUG: Missing current_key");
let total_objects = state.total_objects.clone();
let part_futs = std::mem::take(&mut state.part_upload_futs);
async move {
debug!(%key, "Completing multi-part file upload");
futures::future::try_join_all(part_futs).await
.map_err(|e| {
crate::error::TarFilePartUploadSnafu {}.into_error(e)
})?;
uploader.finish().await?;
let len = last_part.end;
progress.object_uploaded(&key, len);
total_objects.fetch_add(1, Ordering::SeqCst);
Ok(())
}
.boxed()
}
};
futures::future::ready(Some(fut))
},
);
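// Drive the uploads with bounded concurrency; `buffered` preserves ordering, and the first
// error aborts the loop.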
let futs = futs.boxed();
let mut futs = futs.buffered(max_concurrent_requests);
while let Some(()) = futs.try_next().await? {}
debug!("Entry sender dropped; no more tar entries to process");
Ok((
total_objects.load(Ordering::SeqCst),
total_object_bytes.load(Ordering::SeqCst),
started.elapsed(),
))
}
}
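/// A unit of work sent from the blocking tar reader to the async upload processor.
///
/// Small files are sent whole; larger files are sent as a start marker carrying the multipart
/// ranges, followed by one [`FilePart`](Self::FilePart) per range, and a final end marker.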
enum TarEntryComponent {
SmallFile { path: PathBuf, data: Bytes },
StartMultipartFile {
path: PathBuf,
len: u64,
parts: Vec<Range<u64>>,
},
FilePart { part: Range<u64>, data: Bytes },
EndMultipartFile,
}