use crate::objstore::{Bucket, ObjectStorageFactory};
use crate::tar::TarBuilderWrapper;
use crate::{Config, Result};
use chrono::{DateTime, Utc};
use futures::StreamExt;
use itertools::Itertools;
use snafu::prelude::*;
use std::fmt::Display;
use std::future::Future;
use std::ops::Range;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::io::AsyncWrite;
use tokio::sync::oneshot;
use tracing::{debug, error, instrument};
use url::Url;
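/// The destination of the tar archive a create job produces: an object storage
/// URL, a local file, or an arbitrary caller-provided async writer.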
pub enum TargetArchive {
ObjectStorage(Url),
File(PathBuf),
Writer(Box<dyn AsyncWrite + Send + Unpin>),
}
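// Manual `Debug` impl: the `Writer` variant holds a `dyn AsyncWrite`, which has no
// `Debug` implementation, so `#[derive(Debug)]` isn't possible here.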
impl std::fmt::Debug for TargetArchive {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::ObjectStorage(url) => f.debug_tuple("ObjectStorage").field(url).finish(),
Self::File(path) => f.debug_tuple("File").field(path).finish(),
            Self::Writer(_) => f
                .debug_tuple("Writer")
                .field(&"dyn AsyncWrite")
                .finish(),
}
}
}
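/// One input to a create job: a bucket plus a selector describing which of the
/// bucket's objects to include in the archive.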
#[derive(Clone, Debug)]
pub(crate) struct CreateArchiveInput {
bucket: Box<dyn Bucket>,
selector: ObjectSelector,
}
impl Display for CreateArchiveInput {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match &self.selector {
ObjectSelector::Bucket => {
write!(f, "Bucket {}", self.bucket.name())
}
ObjectSelector::Object {
key,
version_id: None,
} => write!(f, "Object '{key}' in bucket '{}'", self.bucket.name()),
ObjectSelector::Object {
key,
version_id: Some(version_id),
} => write!(
f,
"Object '{key}' (version '{version_id}') in bucket '{}'",
self.bucket.name()
),
ObjectSelector::Prefix { prefix } => write!(
f,
"Objects with prefix '{prefix}' in bucket '{}'",
self.bucket.name()
),
ObjectSelector::Glob { pattern } => write!(
f,
"Objects matching glob '{pattern}' in bucket '{}'",
self.bucket.name()
),
}
}
}
impl CreateArchiveInput {
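    /// Build an input from a bucket and the key portion of its URL: no key selects
    /// the entire bucket, glob metacharacters (`*`, `?`, `[`, `]`) select by glob, a
    /// trailing `/` selects by prefix, and anything else names a single object.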
fn parse_key(bucket: Box<dyn Bucket>, key: Option<String>) -> Result<Self> {
match key {
None => {
Ok(Self {
bucket,
selector: ObjectSelector::Bucket,
})
}
Some(key) => {
if key.contains('*') || key.contains('?') || key.contains('[') || key.contains(']')
{
Ok(Self {
bucket,
selector: ObjectSelector::Glob { pattern: key },
})
} else if key.ends_with('/') {
Ok(Self {
bucket,
selector: ObjectSelector::Prefix { prefix: key },
})
} else {
Ok(Self {
bucket,
selector: ObjectSelector::Object {
key,
version_id: None,
},
})
}
}
}
}
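    /// Resolve this input into the concrete objects it matches, failing if the
    /// selector matches no objects at all.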
#[instrument(err, skip(self))]
async fn into_input_objects(self) -> Result<Vec<InputObject>> {
let input_text = self.to_string();
let input_objects = self.into_possible_input_objects().await?;
if input_objects.is_empty() {
crate::error::SelectorMatchesNoObjectsSnafu { input: input_text }.fail()
} else {
Ok(input_objects)
}
}
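    /// Resolve this input into the concrete objects it matches; unlike
    /// [`Self::into_input_objects`], an empty match is not an error.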
#[instrument(err, skip(self))]
async fn into_possible_input_objects(self) -> Result<Vec<InputObject>> {
let input_text = self.to_string();
debug!(self = %input_text, "Listing all object store objects that match this archive input");
let input_objects = self.bucket.list_matching_objects(self.selector).await?;
debug!(
self = %input_text,
count = input_objects.len(),
"Listing matching objects completed"
);
Ok(input_objects)
}
}
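/// How an archive input selects objects within its bucket.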
#[derive(Clone, Debug)]
pub(crate) enum ObjectSelector {
Object {
key: String,
version_id: Option<String>,
},
Prefix {
prefix: String,
},
Bucket,
Glob {
pattern: String,
},
}
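/// A concrete object, discovered by evaluating an input's selector, that will be
/// copied into the archive.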
#[derive(Clone, Debug)]
pub(crate) struct InputObject {
pub bucket: Box<dyn Bucket>,
pub key: String,
pub version_id: Option<String>,
pub size: u64,
    pub timestamp: DateTime<Utc>,
}
impl InputObject {
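    /// Split this object into the parts in which it will be downloaded. Objects
    /// smaller than the configured multipart threshold become a single part; larger
    /// objects are split into `multipart_chunk_size`-sized chunks (the last chunk
    /// may be smaller).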
fn into_parts(self, config: &Config) -> Vec<InputObjectPart> {
if self.size < config.multipart_threshold.get_bytes() as u64 {
vec![InputObjectPart {
part_number: 0,
byte_range: 0..self.size,
input_object: Arc::new(self),
}]
} else {
let me = Arc::new(self);
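            // One part per chunk: ceil(size / chunk_size)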
let mut parts = Vec::with_capacity(
((me.size + config.multipart_chunk_size.get_bytes() as u64 - 1)
/ config.multipart_chunk_size.get_bytes() as u64) as usize,
);
let mut part_number = 0;
let mut byte_offset = 0u64;
while byte_offset < me.size {
let byte_length =
(config.multipart_chunk_size.get_bytes() as u64).min(me.size - byte_offset);
parts.push(InputObjectPart {
input_object: me.clone(),
part_number,
byte_range: byte_offset..(byte_offset + byte_length),
});
part_number += 1;
byte_offset += byte_length;
}
parts
}
}
}
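/// One byte range of an [`InputObject`], downloaded as a single unit.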
struct InputObjectPart {
input_object: Arc<InputObject>,
part_number: usize,
byte_range: Range<u64>,
}
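/// Builder which assembles the config, target, and inputs of a
/// [`CreateArchiveJob`].
///
/// A sketch of typical usage (marked `ignore`; the bucket URL and output path are
/// placeholders, and the import paths for these types are assumed):
///
/// ```ignore
/// async fn example(config: Config) -> Result<()> {
///     let target = TargetArchive::File("backup.tar".into());
///     let mut builder = CreateArchiveJobBuilder::new(config, target);
///     let input: url::Url = "s3://example-bucket/logs/".parse().expect("valid URL");
///     builder.add_input(&input).await?;
///     let job = builder.build().await?;
///     job.run_without_progress(std::future::pending::<()>()).await
/// }
/// ```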
#[derive(Debug)]
pub struct CreateArchiveJobBuilder {
config: Config,
target: TargetArchive,
inputs: Vec<CreateArchiveInput>,
allow_empty: bool,
}
impl CreateArchiveJobBuilder {
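    /// Start building a create job that will write its archive to `target`.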
pub fn new(config: Config, target: TargetArchive) -> Self {
Self {
config,
target,
inputs: vec![],
allow_empty: false,
}
}
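    /// Allow the job to produce an empty archive when its inputs match no objects
    /// (by default this is an error).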
pub fn allow_empty(&mut self, allow: bool) {
self.allow_empty = allow;
}
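    /// Add an input URL to the job, confirming access to its bucket and parsing its
    /// path into an object selector.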
pub async fn add_input(&mut self, input: &Url) -> Result<()> {
debug!(url = %input, "Adding archive input");
let objstore = ObjectStorageFactory::from_url(self.config.clone(), input).await?;
let (bucket, key, _version_id) = objstore.parse_url(input).await?;
debug!(url = %input, ?bucket, "Confirmed bucket access for input");
let input = CreateArchiveInput::parse_key(bucket, key)?;
debug!(?input, "Adding archive input to job");
self.inputs.push(input);
Ok(())
}
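    /// List the objects matching every input, flatten them into a single list sorted
    /// oldest-to-newest, drop duplicates, and produce the runnable job.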
pub async fn build(self) -> Result<CreateArchiveJob> {
debug!(
input_count = self.inputs.len(),
"Listing objects for all inputs"
);
let mut inputs = if self.allow_empty {
let input_futs = self
.inputs
.into_iter()
.map(move |input| input.into_possible_input_objects());
futures::future::try_join_all(input_futs).await?
} else {
let input_futs = self
.inputs
.into_iter()
.map(move |input| input.into_input_objects());
futures::future::try_join_all(input_futs).await?
}
.into_iter()
.flatten()
.collect::<Vec<_>>();
debug!(
object_count = inputs.len(),
"Listed all objects in all inputs"
);
        // Sort oldest-to-newest, breaking timestamp ties by object identity so that
        // duplicate entries (the same object matched by multiple inputs) end up
        // adjacent; `dedup_by` below only removes *consecutive* duplicates, and an
        // unstable sort on timestamp alone doesn't guarantee adjacency.
        inputs.sort_unstable_by(|x, y| {
            x.timestamp
                .cmp(&y.timestamp)
                .then_with(|| x.bucket.name().cmp(&y.bucket.name()))
                .then_with(|| x.key.cmp(&y.key))
                .then_with(|| x.version_id.cmp(&y.version_id))
        });
        let inputs = inputs
            .into_iter()
            .dedup_by(|x, y| {
                x.bucket.name() == y.bucket.name() && x.key == y.key && x.version_id == y.version_id
            })
            .collect::<Vec<_>>();
Ok(CreateArchiveJob {
config: self.config,
target: self.target,
inputs,
allow_empty: self.allow_empty,
})
}
}
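/// Progress callbacks covering each stage of a create job: downloading input
/// objects, writing them into the tar archive, and (for object storage targets)
/// uploading the archive.
///
/// Every method has a no-op default, so implementors override only the events they
/// care about. The job invokes callbacks from multiple async tasks, hence the
/// `Send + Sync` bound.
///
/// A minimal sketch of an implementor that reports a single event:
///
/// ```ignore
/// struct PrintProgress;
///
/// impl CreateProgressCallback for PrintProgress {
///     fn archive_object_written(
///         &self,
///         bucket: &str,
///         key: &str,
///         _version_id: Option<&str>,
///         _timestamp: chrono::DateTime<chrono::Utc>,
///         _byte_offset: u64,
///         size: u64,
///     ) {
///         println!("archived {bucket}/{key} ({size} bytes)");
///     }
/// }
/// ```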
#[allow(unused_variables)]
pub trait CreateProgressCallback: Sync + Send {
fn input_objects_download_starting(&self, total_objects: usize, total_bytes: u64) {}
fn input_object_download_started(
&self,
bucket: &str,
key: &str,
version_id: Option<&str>,
size: u64,
) {
}
fn input_part_unordered_downloaded(
&self,
bucket: &str,
key: &str,
version_id: Option<&str>,
part_number: usize,
part_size: usize,
) {
}
fn input_part_downloaded(
&self,
bucket: &str,
key: &str,
version_id: Option<&str>,
part_number: usize,
part_size: usize,
) {
}
fn input_object_download_completed(
&self,
bucket: &str,
key: &str,
version_id: Option<&str>,
size: u64,
) {
}
fn input_objects_download_completed(&self, total_bytes: u64, duration: Duration) {}
fn archive_initialized(
&self,
total_objects: usize,
total_bytes: u64,
estimated_archive_size: u64,
) {
}
fn archive_part_written(
&self,
bucket: &str,
key: &str,
version_id: Option<&str>,
part_number: usize,
part_size: usize,
) {
}
fn archive_object_written(
&self,
bucket: &str,
key: &str,
version_id: Option<&str>,
timestamp: DateTime<Utc>,
byte_offset: u64,
size: u64,
) {
}
fn archive_bytes_written(&self, bytes_written: usize) {}
fn archive_writes_completed(&self, total_bytes_written: u64) {}
fn archive_bytes_uploaded(&self, bytes_uploaded: usize) {}
fn archive_upload_completed(&self, size: u64, duration: Duration) {}
}
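/// A fully-resolved create job: every input has been expanded into the concrete
/// objects that will be written into the archive.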
#[derive(Debug)]
pub struct CreateArchiveJob {
config: Config,
target: TargetArchive,
inputs: Vec<InputObject>,
allow_empty: bool,
}
impl CreateArchiveJob {
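    /// Total size, in bytes, of all input objects to be archived.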
pub fn total_bytes(&self) -> u64 {
self.inputs.iter().map(|input| input.size).sum()
}
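    /// Total number of input objects to be archived.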
pub fn total_objects(&self) -> usize {
self.inputs.len()
}
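    /// Run the job without progress reporting, using a callback whose methods are
    /// all no-ops.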
pub async fn run_without_progress(self, abort: impl Future<Output = ()>) -> Result<()> {
        // All trait methods have no-op defaults, so an empty implementor suffices.
        struct NoProgress;
impl CreateProgressCallback for NoProgress {}
self.run(abort, NoProgress {}).await
}
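    /// Run the job, reporting progress events through `progress`.
    ///
    /// The `abort` future is accepted but not currently polled (note the `_abort`
    /// binding).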
pub async fn run<Abort, Progress>(self, _abort: Abort, progress: Progress) -> Result<()>
where
Abort: Future<Output = ()>,
Progress: CreateProgressCallback + 'static,
{
let progress: Arc<dyn CreateProgressCallback> = Arc::new(progress);
let total_bytes = self.total_bytes();
let total_objects = self.total_objects();
progress.input_objects_download_starting(total_objects, total_bytes);
if total_objects == 0 && !self.allow_empty {
return crate::error::NoInputsSnafu {}.fail();
}
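        // Estimate the archive size as the input data plus one 512-byte tar header
        // block per object. Padding and end-of-archive blocks are ignored, hence
        // "approx".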
let approx_archive_size = total_bytes + (total_objects as u64 * 512);
let (writer, result_receiver): (
Box<dyn AsyncWrite + Send + Unpin>,
Option<oneshot::Receiver<Result<u64>>>,
) = match self.target {
TargetArchive::ObjectStorage(url) => {
let objstore = ObjectStorageFactory::from_url(self.config.clone(), &url).await?;
let (bucket, key, _) = objstore.parse_url(&url).await?;
let key = key.ok_or_else(|| {
crate::error::ArchiveUrlInvalidSnafu { url: url.clone() }.build()
})?;
let (bytes_writer, mut progress_receiver, result_receiver) = bucket
.create_object_writer(key, Some(approx_archive_size))
.await?;
let progress = progress.clone();
tokio::spawn(async move {
while let Some(bytes_uploaded) = progress_receiver.recv().await {
progress.archive_bytes_uploaded(bytes_uploaded);
}
});
(Box::new(bytes_writer), Some(result_receiver))
}
TargetArchive::File(path) => {
let bytes_writer = tokio::fs::File::create(&path)
.await
.with_context(|_| crate::error::WritingArchiveFileSnafu { path })?;
(Box::new(bytes_writer), None)
}
TargetArchive::Writer(writer) => (writer, None),
};
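        // The tar builder does synchronous I/O, so wrap the async writer in a
        // blocking `Write` adapter.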
let blocking_writer = crate::async_bridge::async_write_as_writer(writer);
let tar_builder = TarBuilderWrapper::new(blocking_writer, progress.clone());
progress.archive_initialized(total_objects, total_bytes, approx_archive_size);
        #[allow(clippy::needless_collect)]
        let parts = self
            .inputs
            .into_iter()
            .flat_map(|input_object| input_object.into_parts(&self.config))
            .collect::<Vec<_>>();
let progress_clone = progress.clone();
let part_futs = parts.into_iter().map(move |part| {
let progress = progress_clone.clone();
async move {
let data = part
.input_object
.bucket
.read_object_part(
part.input_object.key.clone(),
part.input_object.version_id.clone(),
part.byte_range.clone(),
)
.await?;
progress.input_part_unordered_downloaded(
part.input_object.bucket.name(),
&part.input_object.key,
part.input_object.version_id.as_deref(),
part.part_number,
(part.byte_range.end - part.byte_range.start) as usize,
);
Ok((part, data))
}
});
let mut parts_stream =
futures::stream::iter(part_futs).buffered(self.config.max_concurrent_requests);
let (parts_sender, mut parts_receiver) =
tokio::sync::mpsc::channel(self.config.max_concurrent_requests);
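        // Feeder task: drive the buffered download stream (up to
        // `max_concurrent_requests` parts download concurrently), report download
        // progress, and forward completed parts, in order, to the writer loop below.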
{
let progress = progress.clone();
tokio::spawn(async move {
let mut total_bytes_downloaded = 0u64;
let input_objects_download_started = Instant::now();
while let Some(result) = parts_stream.next().await {
if let Ok((part, _data)) = &result {
total_bytes_downloaded += part.byte_range.end - part.byte_range.start;
if part.part_number == 0 {
progress.input_object_download_started(
part.input_object.bucket.name(),
&part.input_object.key,
part.input_object.version_id.as_deref(),
part.input_object.size,
);
}
progress.input_part_downloaded(
part.input_object.bucket.name(),
&part.input_object.key,
part.input_object.version_id.as_deref(),
part.part_number,
(part.byte_range.end - part.byte_range.start) as usize,
);
if part.byte_range.end == part.input_object.size {
progress.input_object_download_completed(
part.input_object.bucket.name(),
&part.input_object.key,
part.input_object.version_id.as_deref(),
part.input_object.size,
);
}
}
if (parts_sender.send(result).await).is_err() {
debug!("parts channel is closed; aborting feeder task");
break;
}
}
if !parts_sender.is_closed() {
progress.input_objects_download_completed(
total_bytes_downloaded,
input_objects_download_started.elapsed(),
);
}
});
}
let mut tar_archive_writes_started: Option<Instant> = None;
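        // Writer loop: receive each object's parts in order and stream them into the
        // tar archive via a blocking appender task.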
loop {
match parts_receiver.recv().await {
None => {
debug!("Completed processing of all input objects");
break;
}
Some(result) => {
let (mut part, data) = result?;
assert_eq!(0, part.part_number, "BUG: the parts are completing out of order or there's a logic error in this loop");
debug!(key = %part.input_object.key, size = part.input_object.size, "Reading object and writing to tar archive");
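                    // Parts arrive as async `Bytes`, but the tar builder reads
                    // synchronously, so feed the data through a channel exposed as a
                    // blocking reader.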
let (sender, receiver) = tokio::sync::mpsc::channel::<Result<bytes::Bytes>>(1);
let blocking_reader = crate::async_bridge::stream_as_reader(
tokio_stream::wrappers::ReceiverStream::new(receiver),
);
let tar_builder = tar_builder.clone();
let mut header = tar::Header::new_gnu();
header.set_size(part.input_object.size);
header.set_cksum();
let object_path = part.input_object.key.clone();
let append_fut =
tar_builder.spawn_append_data(header, object_path, blocking_reader);
let mut appender_aborted = false;
if (sender.send(Ok(data)).await).is_ok() {
if tar_archive_writes_started.is_none() {
tar_archive_writes_started = Some(Instant::now());
}
progress.archive_part_written(
part.input_object.bucket.name(),
&part.input_object.key,
part.input_object.version_id.as_deref(),
part.part_number,
(part.byte_range.end - part.byte_range.start) as usize,
);
while part.byte_range.end < part.input_object.size {
let (next_part, data) =
parts_receiver.recv().await.unwrap_or_else(|| {
panic!(
"BUG: stream ended prematurely after part {} of input {}",
part.part_number, part.input_object.key
)
})?;
assert_eq!(next_part.input_object.key, part.input_object.key);
assert_eq!(next_part.part_number, part.part_number + 1);
assert_eq!(next_part.byte_range.start, part.byte_range.end);
if (sender.send(Ok(data)).await).is_err() {
appender_aborted = true;
break;
} else {
progress.archive_part_written(
next_part.input_object.bucket.name(),
&next_part.input_object.key,
next_part.input_object.version_id.as_deref(),
next_part.part_number,
(next_part.byte_range.end - next_part.byte_range.start)
as usize,
);
}
part = next_part;
}
} else {
appender_aborted = true;
}
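                    // Dropping the sender closes the channel; the tar appender's reader
                    // sees EOF and finishes the entry.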
drop(sender);
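                    // Await the appender first so `?` surfaces its real error if it
                    // failed; the `appender_aborted` assert below only fires if the
                    // appender dropped its receiver yet still reported success.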
let data_range = append_fut.await?;
assert_eq!(part.input_object.size, data_range.end - data_range.start,
"BUG: reported data range doesn't match the expected size of the object's data");
assert!(
!appender_aborted,
"BUG: data channel for writing to tar archive was closed without any error"
);
progress.archive_object_written(
part.input_object.bucket.name(),
&part.input_object.key,
part.input_object.version_id.as_deref(),
part.input_object.timestamp,
data_range.start,
part.input_object.size,
);
debug!(key = %part.input_object.key,
size = part.input_object.size,
final_part_number = part.part_number,
"Streamed object data into tar archive");
}
}
}
let bytes_written = tar_builder.finish_and_close().await?;
progress.archive_writes_completed(bytes_written);
if let Some(result_receiver) = result_receiver {
match result_receiver.await {
Ok(Ok(bytes_written)) => {
debug!(
bytes_written,
"Upload of tar archive to object storage completed"
);
                    // `tar_archive_writes_started` is `None` when the archive contained
                    // no objects (possible when `allow_empty` is set), so don't panic;
                    // report a zero duration instead.
                    let elapsed = tar_archive_writes_started
                        .map(|started| started.elapsed())
                        .unwrap_or_default();
progress.archive_upload_completed(bytes_written, elapsed);
Ok(())
}
Ok(Err(error)) => {
error!(?error, "The async upload task which uploads the tar archive to object storage reported an error");
Err(error)
}
Err(_) => {
crate::error::AsyncTarWriterPanicSnafu {}.fail()
}
}
} else {
Ok(())
}
}
}