use std::collections::hash_map::RandomState;
use std::collections::HashSet;
use std::path::PathBuf;
use aws_sdk_s3::error::DisplayErrorContext;
use tokio::fs::File;
use tokio::io::AsyncWriteExt;
use url::Url;
use crate::paths::scaffold_paths;
use crate::paths::DomainPaths;
use crate::quilt::lineage::PackageLineage;
use crate::quilt::lineage::PathState;
use crate::quilt::manifest_handle::ReadableManifest;
use crate::quilt::storage::s3;
use crate::quilt::Storage;
use crate::s3_utils;
use crate::Error;
async fn cache_immutable_object(object_dest: &PathBuf, uri: &s3::S3Uri) -> Result<(), Error> {
let version = uri
.version
.clone()
.ok_or(Error::S3Uri("missing versionId in s3 URL".to_string()))?;
let mut file = File::create(&object_dest).await?;
let client = s3_utils::get_client_for_bucket(&uri.bucket).await?;
let mut object = client
.get_object()
.bucket(uri.bucket.clone())
.key(uri.key.clone())
.version_id(version)
.send()
.await
.map_err(|err| Error::S3(DisplayErrorContext(err).to_string()))?;
while let Some(bytes) = object
.body
.try_next()
.await
.map_err(|err| Error::S3(DisplayErrorContext(err).to_string()))?
{
file.write_all(&bytes).await?;
}
file.flush().await?;
Ok(())
}
/// Copies a cached object to its working location and reports the copy's modification time.
///
/// When `mutable_target` has a parent directory, that directory chain is created through
/// `storage` first. The source is then copied onto the target, and the timestamp that
/// `storage` reports for the target afterwards is returned.
///
/// # Errors
///
/// Propagates any error from the `storage` calls (`create_dir_all`, `copy`,
/// `modified_timestamp`).
async fn create_mutable_copy(
    storage: &impl Storage,
    immutable_source: &PathBuf,
    mutable_target: &PathBuf,
) -> Result<chrono::DateTime<chrono::Utc>, Error> {
    if let Some(target_dir) = mutable_target.parent() {
        storage.create_dir_all(target_dir).await?;
    }

    storage.copy(immutable_source, mutable_target).await?;

    let copied_at = storage.modified_timestamp(mutable_target).await?;
    Ok(copied_at)
}
pub async fn install_paths(
mut lineage: PackageLineage,
manifest: &(impl ReadableManifest + Sync),
paths: &DomainPaths,
working_dir: PathBuf,
namespace: String,
storage: &(impl Storage + Sync),
entries_paths: &Vec<String>,
) -> Result<PackageLineage, Error> {
if entries_paths.is_empty() {
return Ok(lineage);
}
scaffold_paths(storage, paths.required_installed_package_paths(&namespace)).await?;
if !HashSet::<String, RandomState>::from_iter(lineage.paths.keys().cloned())
.is_disjoint(&HashSet::from_iter(entries_paths.to_owned()))
{
return Err(Error::InstallPath(
"some paths are already installed".to_string(),
));
}
let mut table = manifest.read(storage).await?;
for path in entries_paths {
let row = table
.records
.get_mut(path)
.ok_or(Error::Table(format!("path {} not found", path)))?;
let object_dest = paths.object(&row.hash);
if !storage.exists(&object_dest).await {
cache_immutable_object(&object_dest, &row.place.parse()?).await?;
}
row.place = Url::from_file_path(&object_dest)
.map_err(|_| {
Error::InstallPath(format!("Failed to create URL from {:?}", &object_dest))
})?
.to_string();
let working_dest = working_dir.join(&row.name);
let last_modified = create_mutable_copy(storage, &object_dest, &working_dest).await?;
lineage.paths.insert(
row.name.clone(),
PathState {
timestamp: last_modified,
hash: row.hash,
},
);
}
let installed_manifest_path = paths.installed_manifest(&namespace, lineage.current_hash());
table
.write_to_path(storage, &installed_manifest_path)
.await?;
Ok(lineage)
}
#[cfg(test)]
mod tests {
    use super::*;

    use std::path::PathBuf;

    use crate::quilt::mocks;
    use crate::quilt::storage::mock_storage::MockStorage;

    /// Installing a single path that is listed in the manifest records it in the lineage
    /// and produces a file for it under the working directory.
    #[tokio::test]
    async fn test_installing_one_path() -> Result<(), Error> {
        let sandbox = tempfile::tempdir()?;
        let root = sandbox.path();
        let domain_paths = DomainPaths::new(root.to_path_buf());
        let storage = MockStorage::default();

        // Seed the object cache up front. Presumably this is the path `DomainPaths::object`
        // resolves to for the mock record's hash, so no download is attempted.
        let cached_object = root.join(".quilt/objects/7065646573747269616e");
        storage.write_file(cached_object, &Vec::new()).await?;

        let requested = vec!["a/a".to_string()];
        let manifest = mocks::manifest::with_record_keys(requested.clone());
        let initial_lineage = mocks::lineage::with_commit_hash("fghijk");
        assert!(initial_lineage.paths.is_empty());

        let installed_lineage = install_paths(
            initial_lineage,
            &manifest,
            &domain_paths,
            root.to_path_buf(),
            "foo/bar".to_string(),
            &storage,
            &requested,
        )
        .await?;

        assert!(installed_lineage.paths.contains_key("a/a"));
        let installed_file = root.join("a/a");
        assert!(storage.exists(&installed_file).await);
        Ok(())
    }

    /// Requesting a path that the manifest does not contain fails with a table error.
    #[tokio::test]
    async fn test_installing_path_that_doesnt_exists_in_manifest() -> Result<(), Error> {
        let storage = MockStorage::default();
        let manifest = mocks::manifest::with_record_keys(vec!["a/a".to_string()]);
        let requested = vec!["z/z".to_string()];
        let initial_lineage = mocks::lineage::with_commit_hash("fghijk");
        assert!(initial_lineage.paths.is_empty());

        let outcome = install_paths(
            initial_lineage,
            &manifest,
            &DomainPaths::default(),
            PathBuf::new(),
            String::default(),
            &storage,
            &requested,
        )
        .await;

        let message = outcome.unwrap_err().to_string();
        assert_eq!(message, "Table error: path z/z not found");
        Ok(())
    }
}