use aws_sdk_s3::types::Object;
use chrono;
use futures::future::try_join_all;
use tokio_stream::StreamExt;
use tracing::log;
use crate::io::manifest::build_manifest_from_rows_stream;
use crate::io::manifest::tag_latest;
use crate::io::manifest::tag_timestamp;
use crate::io::manifest::upload_manifest;
use crate::io::manifest::RowsStream;
use crate::io::remote::Remote;
use crate::io::remote::S3Attributes;
use crate::io::remote::StreamItem;
use crate::io::storage::Storage;
use crate::manifest::Header;
use crate::manifest::Row;
use crate::paths::DomainPaths;
use crate::perf::Measure;
use crate::uri::ManifestUri;
use crate::uri::S3PackageUri;
use crate::uri::S3Uri;
use crate::Res;
/// Resolves the attributes of a single listed object.
///
/// The attributes are requested from `remote` first. If that request fails, the error is
/// logged as a warning and the same lookup is made against `storage`, whose result is
/// returned as-is.
///
/// # Errors
///
/// Returns the listing error when `object` is itself an `Err`, or the error produced by the
/// `storage` fallback when both lookups fail.
///
/// # Panics
///
/// Panics if the listed object has no key.
async fn get_object_attributes_inner(
    storage: &impl Storage,
    remote: &impl Remote,
    listing_uri: &S3Uri,
    object: Res<Object>,
) -> Res<S3Attributes> {
    // `object` is owned here, so the key is moved out of it rather than cloned.
    let object_key = object?
        .key
        .expect("object key expected to be present");
    match remote.get_object_attributes(listing_uri, &object_key).await {
        Ok(attrs) => Ok(attrs),
        Err(err) => {
            log::warn!("Error getting attributes: {}", err);
            // Fall back to the storage-side lookup for the same URI and key.
            storage
                .get_object_attributes(listing_uri, &object_key)
                .await
        }
    }
}
/// Resolves the attributes of every object in one listing batch.
///
/// One lookup future is created per object via `get_object_attributes_inner`, and all of
/// them are awaited together with `try_join_all`.
///
/// # Errors
///
/// Returns the batch's own error when `objects` is an `Err`, otherwise the error reported by
/// `try_join_all` if any individual lookup fails.
async fn get_object_attributes(
    storage: &impl Storage,
    remote: &impl Remote,
    listing_uri: S3Uri,
    objects: StreamItem,
) -> Res<Vec<S3Attributes>> {
    let batch = objects?;
    // Build every lookup future up front so they can be driven together.
    let lookups: Vec<_> = batch
        .into_iter()
        .map(|entry| get_object_attributes_inner(storage, remote, &listing_uri, entry))
        .collect();
    try_join_all(lookups).await
}
/// Turns the remote listing of `listing_uri` into a stream of row batches.
///
/// Every item produced by `remote.list_objects` is passed through `get_object_attributes`,
/// and each resolved attribute set is converted into a `Row` wrapped in `Ok`. A failed
/// batch is forwarded unchanged as an `Err` item of the resulting stream.
async fn stream_objects<'a>(
    storage: &'a impl Storage,
    remote: &'a impl Remote,
    listing_uri: S3Uri,
) -> impl RowsStream + 'a {
    let listing = remote.list_objects(listing_uri.clone()).await;

    // Each batch gets its own copy of the URI because the lookup takes it by value.
    let resolved = listing
        .then(move |batch| get_object_attributes(storage, remote, listing_uri.clone(), batch));

    resolved.map(|outcome| {
        outcome.map(|attrs| attrs.into_iter().map(|attr| Ok(Row::from(attr))).collect())
    })
}
/// Builds a manifest from the objects listed under `source_uri` and publishes it.
///
/// The steps run in this order:
/// 1. stream rows for the listed objects and build the manifest, using
///    `paths.manifest_cache` with the source bucket to pick the cache path;
/// 2. upload the cached manifest under the bucket and namespace taken from `dest_uri`;
/// 3. tag the manifest with the current UTC timestamp, then tag it as latest.
///
/// # Errors
///
/// Returns the first error raised while building, uploading, or tagging the manifest.
pub async fn package_s3_prefix(
    paths: &DomainPaths,
    storage: &(impl Storage + Sync),
    remote: &impl Remote,
    source_uri: &S3Uri,
    dest_uri: S3PackageUri,
) -> Res<ManifestUri> {
    log::debug!("Source URI: {:?}, target URI: {:?}", source_uri, dest_uri);
    let timer = Measure::start();

    let rows = Box::pin(stream_objects(storage, remote, source_uri.clone()).await);
    let cache_path_for = |t: &str| paths.manifest_cache(&source_uri.bucket, t);
    let (cache_path, top_hash) =
        build_manifest_from_rows_stream(storage, cache_path_for, Header::default(), rows).await?;

    // Only the bucket and namespace of the destination are carried into the manifest URI.
    let manifest_uri = ManifestUri {
        bucket: dest_uri.bucket,
        namespace: dest_uri.namespace,
        hash: top_hash,
    };

    let build_time = timer.elapsed();
    log::info!("Created manifest {:?} for {}", manifest_uri, build_time);

    upload_manifest(storage, remote, &manifest_uri, &cache_path).await?;
    // NOTE(review): this measurement is taken from the value returned by the first
    // `elapsed()` call, not from the original `Measure` — confirm this is intended.
    log::debug!("Manifest uploaded for {}", build_time.elapsed());

    tag_timestamp(remote, &manifest_uri, chrono::Utc::now()).await?;
    log::debug!("Timestamp tag uploaded");

    tag_latest(remote, &manifest_uri).await?;
    log::debug!("Latest uploaded");

    Ok(manifest_uri)
}