use std::marker::Unpin;
use std::path::PathBuf;
use aws_sdk_s3::primitives::ByteStream;
use tokio::fs::File;
use tokio::io::AsyncReadExt;
use tokio_stream::Stream;
use tokio_stream::StreamExt;
use tracing::log;
use url::Url;
use crate::io::remote::HostConfig;
use crate::io::remote::Remote;
use crate::io::storage::Storage;
use crate::io::ParquetWriter;
use crate::manifest::Header;
use crate::manifest::Manifest;
use crate::manifest::Row;
use crate::manifest::Table;
use crate::manifest::TopHasher;
use crate::uri::Host;
use crate::uri::ManifestUri;
use crate::uri::ManifestUriLegacy;
use crate::uri::ObjectUri;
use crate::uri::RevisionPointer;
use crate::uri::S3PackageHandle;
use crate::uri::S3PackageUri;
use crate::uri::S3Uri;
use crate::uri::Tag;
use crate::uri::TagUri;
use crate::Error;
use crate::Res;
async fn bytestream_to_string(bytestream: ByteStream) -> Res<String> {
let mut reader = bytestream.into_async_read();
let mut contents = Vec::new();
reader.read_to_end(&mut contents).await?;
String::from_utf8(contents).map_err(|err| Error::Utf8(err.utf8_error()))
}
/// Serialize the local manifest as JSON Lines and upload it to the
/// legacy (pre-Parquet) S3 location derived from `manifest_uri`.
async fn upload_legacy(
    storage: &impl Storage,
    remote: &impl Remote,
    manifest_path: &PathBuf,
    manifest_uri: &ManifestUri,
) -> Res {
    // Read the on-disk table, convert it to a manifest, and render JSONL.
    let table = Table::read_from_path(storage, manifest_path).await?;
    let manifest = Manifest::from_table(&table).await?;
    let body = manifest.to_jsonlines().as_bytes().to_vec();
    // The legacy destination is a plain S3 URI, not the Parquet location.
    let legacy_uri: S3Uri = ManifestUriLegacy::from(manifest_uri).into();
    remote.put_object(&manifest_uri.catalog, &legacy_uri, body).await
}
/// Stream the local Parquet manifest file straight to its remote S3 location.
async fn upload_from(
    storage: &impl Storage,
    remote: &impl Remote,
    manifest_path: &PathBuf,
    manifest_uri: &ManifestUri,
) -> Res {
    let stream = storage.read_byte_stream(manifest_path).await?;
    log::info!("Writing remote manifest to {manifest_uri:?}");
    remote
        .put_object(&manifest_uri.catalog, &manifest_uri.into(), stream)
        .await
}
/// Upload the manifest at `path` in both formats — the Parquet file and the
/// legacy JSONL representation — to the locations derived from `manifest_uri`.
pub async fn upload_manifest(
    storage: &impl Storage,
    remote: &impl Remote,
    manifest_uri: &ManifestUri,
    path: &PathBuf,
) -> Res {
    upload_from(storage, remote, path, manifest_uri).await?;
    // Fixed log typo: "Parque" -> "Parquet".
    log::info!("Parquet file uploaded");
    upload_legacy(storage, remote, path, manifest_uri).await?;
    log::info!("JSONL file uploaded");
    log::info!("Uploaded remote manifest: {manifest_uri:?}");
    Ok(())
}
/// Record the manifest revision under a timestamp-named tag.
pub async fn tag_timestamp(
    remote: &impl Remote,
    manifest_uri: &ManifestUri,
    timestamp: chrono::DateTime<chrono::Utc>,
) -> Res {
    let tag = TagUri::timestamp(manifest_uri.clone(), timestamp);
    upload_tag(remote, manifest_uri, tag).await
}
/// Point the `latest` tag at this manifest revision.
pub async fn tag_latest(remote: &impl Remote, manifest_uri: &ManifestUri) -> Res {
    let tag = TagUri::latest(manifest_uri.clone().into());
    upload_tag(remote, manifest_uri, tag).await
}
/// Write the manifest's top hash (as raw bytes) to the given tag location,
/// so the tag can later be resolved back to a revision.
async fn upload_tag(remote: &impl Remote, manifest_uri: &ManifestUri, tag_uri: TagUri) -> Res {
    let body = manifest_uri.hash.as_bytes().to_vec();
    remote
        .put_object(&manifest_uri.catalog, &tag_uri.into(), body)
        .await
}
/// Resolve a tag to a concrete `ManifestUri` by fetching the top hash the
/// tag object stores remotely.
pub async fn resolve_tag(
    remote: &impl Remote,
    host: &Option<Host>,
    uri: &S3PackageHandle,
    tag: Tag,
) -> Res<ManifestUri> {
    let tag_uri = TagUri::new(uri.bucket.clone(), uri.namespace.clone(), tag);
    // The tag object's body is the top hash of the revision it points at.
    let object = remote.get_object_stream(host, &tag_uri.into()).await?;
    let hash = bytestream_to_string(object.body).await?;
    Ok(ManifestUri {
        hash,
        bucket: uri.bucket.clone(),
        namespace: uri.namespace.clone(),
        catalog: host.clone(),
    })
}
/// Turn a package URI's revision pointer into a concrete top hash:
/// an explicit hash is returned directly, a tag is resolved remotely.
async fn resolve_top_hash(
    remote: &impl Remote,
    host: &Option<Host>,
    uri: &S3PackageUri,
) -> Res<String> {
    match &uri.revision {
        RevisionPointer::Hash(hash) => Ok(hash.to_owned()),
        RevisionPointer::Tag(tag_str) => {
            let resolved = resolve_tag(remote, host, &uri.into(), tag_str.parse()?).await?;
            Ok(resolved.hash)
        }
    }
}
/// Resolve a package URI (hash or tag revision) into a full `ManifestUri`.
pub async fn resolve_manifest_uri(
    remote: &impl Remote,
    host: &Option<Host>,
    uri: &S3PackageUri,
) -> Res<ManifestUri> {
    let hash = resolve_top_hash(remote, host, uri).await?;
    Ok(ManifestUri {
        bucket: uri.bucket.clone(),
        namespace: uri.namespace.clone(),
        hash,
        catalog: host.clone(),
    })
}
pub async fn upload_row(
remote: &impl Remote,
host_config: &HostConfig,
package_handle: S3PackageHandle,
row: Row,
) -> Res<Row> {
let local_url = Url::parse(&row.place)?;
if local_url.scheme() != "file" {
return Err(Error::FileUri(local_url));
}
let file_path = local_url
.to_file_path()
.map_err(|_| Error::FileUri(local_url))?;
let object_uri = ObjectUri::new(package_handle, row.name.clone());
log::info!("Uploading to S3: {object_uri}");
let (remote_url, hash) = remote
.upload_file(host_config, &file_path, &object_uri.into(), row.size)
.await?;
let place = remote_url.to_string();
Ok(Row {
hash: hash.into(),
place,
..row
})
}
/// Thin wrapper around a `ParquetWriter` for streaming manifest rows to a file.
struct WritableManifest {
    // Underlying Parquet writer; consumed by `flush`.
    writer: ParquetWriter,
}
impl TryFrom<File> for WritableManifest {
type Error = Error;
fn try_from(file: File) -> Result<Self, Self::Error> {
Ok(WritableManifest {
writer: file.try_into()?,
})
}
}
impl WritableManifest {
    /// Write the manifest header as a single-element leading chunk.
    pub async fn insert_header(&mut self, header: Header) -> Res {
        let chunk: StreamRowsChunk = vec![Ok(header.into())];
        self.writer.insert(chunk).await
    }

    /// Append a chunk of rows to the manifest.
    pub async fn insert(&mut self, chunk: StreamRowsChunk) -> Res {
        self.writer.insert(chunk).await
    }

    /// Finalize the Parquet file, consuming the writer.
    pub async fn flush(self) -> Res {
        self.writer.flush().await
    }
}
/// One chunk of manifest rows; each row may individually have failed.
pub type StreamRowsChunk = Vec<Res<Row>>;
/// One item of a rows stream: a whole chunk, or a stream-level error.
pub type StreamItem = Res<StreamRowsChunk>;
/// Marker trait for any stream of `StreamItem`s; blanket-implemented below.
pub trait RowsStream: Stream<Item = StreamItem> {}
impl<T: Stream<Item = StreamItem>> RowsStream for T {}
/// Build a Parquet manifest from a stream of row chunks, hashing header and
/// rows into a top hash, then move the finished file to `dest_dir/<top_hash>`.
///
/// Returns the final manifest path and the top hash.
///
/// # Errors
/// Propagates storage, writer, and hashing errors; a stream-level error or a
/// failed row aborts the build.
pub async fn build_manifest_from_rows_stream(
    storage: &impl Storage,
    dest_dir: PathBuf,
    header: Header,
    mut stream: impl RowsStream + Unpin,
) -> Res<(PathBuf, String)> {
    // Write into a temp dir first; the final name is the (not yet known) hash.
    let temp_dir = tempfile::tempdir()?;
    let temp_path = temp_dir.path().join("manifest.pq");
    log::info!("Temp path for creating manifest {temp_path:?}");
    let file = storage.create_file(&temp_path).await?;
    let mut manifest = WritableManifest::try_from(file)?;
    let mut top_hasher = TopHasher::new();
    top_hasher.append_header(&header)?;
    manifest.insert_header(header).await?;
    // BUG FIX: `while let Some(Ok(rows))` silently ended the loop on a
    // stream-level error, yielding a truncated manifest that looked
    // successful. Propagate the error instead.
    while let Some(item) = stream.next().await {
        let rows = item?;
        for row in &rows {
            match row {
                Ok(row) => top_hasher.append(row)?,
                Err(err) => return Err(Error::Table(err.to_string())),
            }
        }
        manifest.insert(rows).await?;
    }
    manifest.flush().await?;
    let top_hash = top_hasher.finalize();
    let dest_path = dest_dir.join(&top_hash);
    storage.create_dir_all(&dest_dir).await?;
    storage.rename(temp_path, &dest_path).await?;
    Ok((dest_path, top_hash))
}
#[cfg(test)]
mod tests {
    use super::*;
    use test_log::test;
    use tokio_stream;
    use crate::fixtures::manifest_empty;
    use crate::io::remote::mocks::MockRemote;
    use crate::io::storage::mocks::MockStorage;

    /// Build a manifest from an empty rows stream and check both the
    /// destination path and the computed top hash against `expected_hash`.
    /// Deduplicates the six near-identical empty-manifest tests below.
    async fn check_empty_manifest(header: Header, expected_hash: &str) -> Res {
        let storage = MockStorage::default();
        let dest_dir = storage.temp_dir.path().to_path_buf();
        let (dest_path, top_hash) = build_manifest_from_rows_stream(
            &storage,
            dest_dir.clone(),
            header,
            tokio_stream::empty(),
        )
        .await?;
        assert_eq!(dest_path, dest_dir.join(expected_hash));
        assert_eq!(top_hash, expected_hash);
        Ok(())
    }

    #[test(tokio::test)]
    async fn test_resolve_existing_hash() -> Res {
        let uri = S3PackageUri::try_from("quilt+s3://b#package=foo/bar@hjknlmn")?;
        let remote = MockRemote::default();
        let top_hash = resolve_top_hash(&remote, &None, &uri).await?;
        assert_eq!(top_hash, "hjknlmn");
        Ok(())
    }

    #[test(tokio::test)]
    async fn test_resolve_remote_hash() -> Res {
        let uri = S3PackageUri::try_from("quilt+s3://b#package=foo/bar")?;
        let remote = MockRemote::default();
        // Seed the mock with the object the `latest` tag resolves through.
        remote
            .put_object(
                &None,
                &S3Uri::try_from("s3://b/.quilt/named_packages/foo/bar/latest")?,
                b"abcdef".to_vec(),
            )
            .await?;
        let top_hash = resolve_top_hash(&remote, &None, &uri).await?;
        assert_eq!(top_hash, "abcdef");
        Ok(())
    }

    #[test(tokio::test)]
    async fn test_empty_manifest_header_empty() -> Res {
        check_empty_manifest(
            manifest_empty::empty().header,
            manifest_empty::EMPTY_EMPTY_TOP_HASH,
        )
        .await
    }

    #[test(tokio::test)]
    async fn test_empty_manifest_header_empty_none() -> Res {
        check_empty_manifest(
            manifest_empty::empty_none().header,
            manifest_empty::EMPTY_NONE_TOP_HASH,
        )
        .await
    }

    #[test(tokio::test)]
    async fn test_empty_manifest_header_empty_null() -> Res {
        check_empty_manifest(
            manifest_empty::empty_null().header,
            manifest_empty::EMPTY_NULL_TOP_HASH,
        )
        .await
    }

    #[test(tokio::test)]
    async fn test_empty_manifest_header_null_empty() -> Res {
        check_empty_manifest(
            manifest_empty::null_empty().header,
            manifest_empty::NULL_EMPTY_TOP_HASH,
        )
        .await
    }

    #[test(tokio::test)]
    async fn test_empty_manifest_header_null_none() -> Res {
        check_empty_manifest(
            manifest_empty::null_none().header,
            manifest_empty::NULL_NONE_TOP_HASH,
        )
        .await
    }

    #[test(tokio::test)]
    async fn test_empty_manifest_header_null_null() -> Res {
        check_empty_manifest(
            manifest_empty::null().header,
            manifest_empty::NULL_NULL_TOP_HASH,
        )
        .await
    }
}