use tokio::io::AsyncReadExt;
use tokio_stream::StreamExt;
use tracing::debug;
use tracing::info;
use crate::io::manifest::build_manifest_from_rows_stream;
use crate::io::manifest::RowsStream;
use crate::io::remote::Remote;
use crate::io::storage::Storage;
use crate::manifest::Header;
use crate::manifest::Manifest;
use crate::manifest::Row;
use crate::manifest::Table;
use crate::paths::scaffold_paths;
use crate::paths::DomainPaths;
use crate::uri::ManifestUri;
use crate::uri::ManifestUriLegacy;
use crate::uri::S3Uri;
use crate::Res;
/// Adapts an in-memory JSONL [`Manifest`] into a [`RowsStream`].
///
/// Each JSONL row is converted with `Row::try_from` and emitted as its own single-element
/// batch wrapped in `Ok`. The conversion `Result` itself is what ends up inside the batch,
/// so a row that fails to convert does not terminate the stream at this level.
async fn stream_jsonl_rows(jsonl: Manifest) -> impl RowsStream {
    let entries = jsonl.rows;
    tokio_stream::iter(entries).map(|entry| Ok(vec![Row::try_from(entry)]))
}
/// Reports whether the Parquet manifest object for `manifest` exists in `remote`.
///
/// The object location is `S3Uri::from(manifest)`. The lookup is scoped to
/// `manifest.catalog`.
async fn is_parquet(remote: &impl Remote, manifest: &ManifestUri) -> Res<bool> {
    let parquet_uri = S3Uri::from(manifest);
    remote.exists(&manifest.catalog, &parquet_uri).await
}
/// Downloads the Parquet manifest object for `manifest` and returns its raw bytes.
///
/// # Errors
/// Propagates failures from `Remote::get_object` and from reading the object body to the end.
async fn fetch_parquet(remote: &impl Remote, manifest: &ManifestUri) -> Res<Vec<u8>> {
    // Kept as a named binding: the returned reader may borrow the URI.
    let object_uri = S3Uri::from(manifest);
    let mut body = remote.get_object(&manifest.catalog, &object_uri).await?;
    let mut bytes = Vec::new();
    body.read_to_end(&mut bytes).await?;
    Ok(bytes)
}
/// Downloads a manifest stored under its legacy (JSONL) object key and parses it.
///
/// The key is derived by converting `manifest_uri` into a `ManifestUriLegacy` and then into
/// an `S3Uri`. The object body is handed to `Manifest::from_reader` for parsing.
///
/// # Errors
/// Propagates failures from `Remote::get_object` and from `Manifest::from_reader`.
async fn fetch_jsonl(remote: &impl Remote, manifest_uri: &ManifestUri) -> Res<Manifest> {
    let legacy_uri = ManifestUriLegacy::from(manifest_uri);
    let s3_uri: S3Uri = legacy_uri.into();
    let reader = remote.get_object(&manifest_uri.catalog, &s3_uri).await?;
    Manifest::from_reader(reader).await
}
/// Ensures the manifest identified by `manifest_uri` is in the local cache and returns it
/// as a [`Table`].
///
/// The required local domain paths are scaffolded first. On a cache miss the manifest is
/// fetched from `remote`:
/// * if the Parquet object exists remotely, its bytes are written to the cache verbatim;
/// * otherwise the legacy JSONL manifest is fetched and converted to Parquet with
///   `build_manifest_from_rows_stream`, using the cache path as the destination.
///
/// On a cache hit nothing is fetched. In both cases the cached file is then read back with
/// `Table::read_from_path`.
///
/// # Errors
/// Returns an error if any of the following fails:
/// * path scaffolding;
/// * a remote call;
/// * writing the cache;
/// * reading the cached file, including when the cached file is not valid Parquet.
pub async fn cache_remote_manifest(
    paths: &DomainPaths,
    storage: &(impl Storage + Sync),
    remote: &impl Remote,
    manifest_uri: &ManifestUri,
) -> Res<Table> {
    info!("⏳ Caching remote manifest: {}", manifest_uri.display());
    scaffold_paths(storage, paths.required_local_domain_paths()).await?;
    let cache_path = paths.manifest_cache(&manifest_uri.bucket, &manifest_uri.hash);

    if !storage.exists(&cache_path).await {
        debug!("🔍 Manifest does not exist in cache, fetching from remote");
        if is_parquet(remote, manifest_uri).await? {
            debug!(
                "⏳ Manifest {} stored remotely in Parquet format. Fetching…",
                manifest_uri.display()
            );
            let bytes = fetch_parquet(remote, manifest_uri).await?;
            debug!("✔️ Fetched manifest. Size: {}", bytes.len());
            // Already Parquet: store the remote bytes as-is.
            storage.write_file(&cache_path, &bytes).await?;
            debug!("✔️ Manifest has been written to {}", cache_path.display());
        } else {
            debug!(
                "⏳ Manifest {} stored remotely in JSONL format. Fetching…",
                manifest_uri.display()
            );
            let jsonl = fetch_jsonl(remote, manifest_uri).await?;
            debug!("✔️ Fetched JSONL manifest");
            let header = Header::from(&jsonl);
            // Whatever name the builder asks for, write the converted output to the cache path.
            let manifest_path = |_: &str| cache_path.clone();
            let stream = stream_jsonl_rows(jsonl).await;
            let (dest_path, _) =
                build_manifest_from_rows_stream(storage, manifest_path, header, stream).await?;
            debug!(
                "✔️ Manifest has been converted to Parquet and written to {}",
                dest_path.display()
            );
        }
    } else {
        debug!("✔️ Manifest exists already in {}", cache_path.display());
    }

    // Reached on both cache hit and cache miss, so do not claim anything was just written.
    info!(
        "✔️ Manifest {} is in the local cache …",
        manifest_uri.display()
    );
    let table = Table::read_from_path(storage, &cache_path).await?;
    info!("✔️ … and was successfully loaded:\n{}", table);
    Ok(table)
}
/// Returns the [`Table`] for `manifest_uri`, populating the local cache from `remote` first
/// if necessary.
///
/// Delegates directly to [`cache_remote_manifest`] with the same arguments. Its result and
/// any error are returned unchanged.
pub async fn browse_remote_manifest(
    paths: &DomainPaths,
    storage: &(impl Storage + Sync),
    remote: &impl Remote,
    manifest_uri: &ManifestUri,
) -> Res<Table> {
    let table = cache_remote_manifest(paths, storage, remote, manifest_uri).await?;
    Ok(table)
}
#[cfg(test)]
mod tests {
    // Tests for `cache_remote_manifest`, exercised against the mock storage and mock remote
    // provided by `crate::mocks`.
    use super::*;
    use std::path::PathBuf;
    use std::str::FromStr;
    use crate::mocks;

    // Cache hit: the fixture Parquet manifest is written straight to the cache path. Nothing
    // is put into the remote mock, so success shows the cached copy is read back rather than
    // fetched. This assumes `MockRemote::default()` starts empty — confirm.
    #[tokio::test]
    async fn test_if_cached() -> Res {
        let paths = DomainPaths::default();
        let manifest_uri = ManifestUri {
            bucket: "a".to_string(),
            namespace: ("f", "b").into(),
            hash: "c".to_string(),
            catalog: None,
        };
        let cache_path = paths.manifest_cache(&manifest_uri.bucket, &manifest_uri.hash);
        let parquet = std::fs::read(mocks::manifest::parquet())?;
        let storage = mocks::storage::MockStorage::default();
        storage.write_file(&cache_path, &parquet).await?;
        let remote = mocks::remote::MockRemote::default();
        let cached_manifest =
            cache_remote_manifest(&paths, &storage, &remote, &manifest_uri).await?;
        // The header's "message" field comes from the Parquet fixture.
        assert_eq!(
            cached_manifest.header.info.get("message").unwrap(),
            "test_spec_write 2023-11-29T14:01:39.543975"
        );
        Ok(())
    }

    // Cache hit with a corrupt entry: an empty file at the cache path counts as "exists".
    // Reading it as Parquet must therefore surface an error instead of re-fetching.
    // NOTE(review): the expected text embeds an OS error ("os error 22"), which may be
    // platform-dependent — confirm this is stable on every CI target.
    #[tokio::test]
    async fn test_if_cached_random_file() -> Res {
        let paths = DomainPaths::default();
        let manifest = ManifestUri {
            bucket: "a".to_string(),
            namespace: ("f", "b").into(),
            hash: "c".to_string(),
            catalog: None,
        };
        let cache_path = paths.manifest_cache(&manifest.bucket, &manifest.hash);
        let storage = mocks::storage::MockStorage::default();
        storage.write_file(cache_path, &Vec::new()).await?;
        let remote = mocks::remote::MockRemote::default();
        let cached_manifest = cache_remote_manifest(&paths, &storage, &remote, &manifest).await;
        assert_eq!(
            cached_manifest.unwrap_err().to_string(),
            "Parquet error: External: Invalid argument (os error 22)"
        );
        Ok(())
    }

    // Cache miss, Parquet remote: the fixture is placed at the Parquet object key
    // (".quilt/packages/1220<hash>.parquet"). That key presumably mirrors
    // `S3Uri::from(&ManifestUri)` — verify if that conversion changes. The cached file
    // must be byte-identical to the remote object.
    #[tokio::test]
    async fn test_caching_parquet() -> Res {
        let paths = DomainPaths::default();
        let manifest = ManifestUri {
            bucket: "a".to_string(),
            namespace: ("f", "b").into(),
            hash: "c".to_string(),
            catalog: None,
        };
        let parquet = std::fs::read(mocks::manifest::parquet())?;
        let remote = mocks::remote::MockRemote::default();
        let remote_uri = S3Uri::from_str(&format!(
            "s3://{}/.quilt/packages/1220{}.parquet",
            manifest.bucket, manifest.hash
        ))?;
        remote
            .put_object(&manifest.catalog, &remote_uri, parquet.clone())
            .await?;
        let storage = mocks::storage::MockStorage::default();
        let cached_manifest = cache_remote_manifest(&paths, &storage, &remote, &manifest).await?;
        assert_eq!(
            cached_manifest.header.info.get("message").unwrap(),
            "test_spec_write 2023-11-29T14:01:39.543975"
        );
        // Hard-coded rather than taken from `paths.manifest_cache`. This presumably pins the
        // expected on-disk layout for `DomainPaths::default()` — confirm that is intentional.
        let cache_path = PathBuf::from(format!(
            ".quilt/packages/{}/{}",
            manifest.bucket, manifest.hash
        ));
        assert_eq!(storage.read_file(cache_path).await?, parquet);
        Ok(())
    }

    // Cache miss, JSONL remote: only the legacy key (".quilt/packages/<hash>") is populated.
    // The Parquet existence check therefore fails and the JSONL manifest is converted.
    // The converted cache file must exist and contain the "README.md" record.
    #[tokio::test]
    async fn test_caching_jsonl() -> Res {
        let paths = DomainPaths::default();
        let manifest = ManifestUri {
            bucket: "a".to_string(),
            namespace: ("f", "b").into(),
            hash: "c".to_string(),
            catalog: None,
        };
        let jsonl = std::fs::read(mocks::manifest::jsonl())?;
        let remote = mocks::remote::MockRemote::default();
        let remote_uri = S3Uri::from_str(&format!(
            "s3://{}/.quilt/packages/{}",
            manifest.bucket, manifest.hash
        ))?;
        remote
            .put_object(&manifest.catalog, &remote_uri, jsonl.clone())
            .await?;
        let storage = mocks::storage::MockStorage::default();
        let cached_manifest = cache_remote_manifest(&paths, &storage, &remote, &manifest).await?;
        let cache_path = PathBuf::from(format!(
            ".quilt/packages/{}/{}",
            manifest.bucket, manifest.hash
        ));
        assert!(storage.exists(cache_path).await);
        assert!(cached_manifest
            .get_record(&PathBuf::from("README.md"))
            .await?
            .is_some());
        Ok(())
    }
}