use std::collections::BTreeMap;
use std::path::PathBuf;
use tracing::log;
use crate::flow;
use crate::flow::cache_remote_manifest;
use crate::io::remote::resolve_workflow;
use crate::io::remote::HostConfig;
use crate::io::remote::Remote;
use crate::io::remote::RemoteS3;
use crate::io::storage::LocalStorage;
use crate::io::storage::Storage;
use crate::lineage;
use crate::lineage::CommitState;
use crate::lineage::InstalledPackageStatus;
use crate::lineage::LineagePaths;
use crate::manifest::Manifest;
use crate::manifest::Workflow;
use crate::paths;
use crate::paths::copy_cached_to_installed;
use crate::uri::Host;
use crate::uri::ManifestUri;
use crate::uri::Namespace;
use crate::uri::S3Uri;
use crate::Error;
use crate::Res;
/// Handle to a single package installed in the local domain.
///
/// Bundles everything the operations on an installed package need: the lineage
/// reader/writer, the domain path layout, a remote and a storage backend, plus
/// the namespace that identifies the package.
///
/// The type parameters default to `LocalStorage` and `RemoteS3`.
#[derive(Debug)]
pub struct InstalledPackage<S: Storage = LocalStorage, R: Remote = RemoteS3> {
/// Reader/writer for this package's lineage record.
pub lineage: lineage::PackageLineageIo,
/// Path layout used to locate installed and cached manifests.
pub paths: paths::DomainPaths,
/// Remote backend used for manifests, host configuration and workflows.
pub remote: R,
/// Storage backend every read and write goes through.
pub storage: S,
/// Namespace identifying the package.
pub namespace: Namespace,
}
impl std::fmt::Display for InstalledPackage {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, r##"Installed package "{}""##, self.namespace)
}
}
impl<S: Storage + Sync, R: Remote> InstalledPackage<S, R> {
/// Scaffolds the paths required for installing this package.
///
/// Resolves the domain home through the lineage handle and forwards it,
/// together with the storage and the package namespace, to
/// `DomainPaths::scaffold_for_installing`.
///
/// # Errors
///
/// Propagates any error from resolving the domain home or from scaffolding.
pub async fn scaffold_paths(&self) -> Res {
    let storage = &self.storage;
    let domain_home = self.lineage.domain_home(storage).await?;
    self.paths
        .scaffold_for_installing(storage, &domain_home, &self.namespace)
        .await
}
/// Scaffolds the paths required for caching data of the given bucket.
///
/// Thin wrapper that forwards the storage and `bucket` to
/// `DomainPaths::scaffold_for_caching`.
///
/// # Errors
///
/// Propagates whatever error the scaffolding returns.
pub async fn scaffold_paths_for_caching(&self, bucket: &str) -> Res {
self.paths.scaffold_for_caching(&self.storage, bucket).await
}
/// Loads the manifest of the installed package.
///
/// The installed manifest for the lineage's current hash is tried first. If
/// it cannot be read, a warning is logged and the manifest is obtained via
/// `cache_remote_manifest` for the lineage's remote URI; the cached copy is
/// then copied over the installed location before being returned.
///
/// # Errors
///
/// Propagates errors from reading the lineage, from `cache_remote_manifest`
/// and from `copy_cached_to_installed`. A failure to read the installed
/// manifest is only logged.
pub async fn manifest(&self) -> Res<Manifest> {
    let (_, lineage) = self.lineage.read(&self.storage).await?;
    let installed_path = self
        .paths
        .installed_manifest(&self.namespace, lineage.current_hash());

    // Happy path returns straight away; the error stays confined to its arm
    // so it is not kept alive across the awaits below.
    match Manifest::from_path(&self.storage, &installed_path).await {
        Err(e) => {
            log::warn!(
                "Failed to read installed manifest at {}: {}",
                installed_path.display(),
                e
            );
        }
        Ok(manifest) => return Ok(manifest),
    }

    let remote_uri = &lineage.remote;
    log::info!("Attempting to recover from cache at {}", remote_uri);
    let recovered =
        cache_remote_manifest(&self.paths, &self.storage, &self.remote, remote_uri).await?;
    // Replace the unreadable installed manifest with the cached copy.
    copy_cached_to_installed(&self.paths, &self.storage, remote_uri).await?;
    Ok(recovered)
}
/// Reads and returns the package lineage from storage.
///
/// The first element returned by the lineage reader (bound as `package_home`
/// elsewhere in this impl) is discarded.
///
/// # Errors
///
/// Propagates any error from reading the lineage.
pub async fn lineage(&self) -> Res<lineage::PackageLineage> {
let (_, lineage) = self.lineage.read(&self.storage).await?;
Ok(lineage)
}
/// Returns the package home path as reported by the lineage handle.
///
/// # Errors
///
/// Propagates any error from `PackageLineageIo::package_home`.
pub async fn package_home(&self) -> Res<PathBuf> {
self.lineage.package_home(&self.storage).await
}
/// Computes the current status of the installed package.
///
/// Reads the lineage, refreshes its latest hash against the remote, loads the
/// manifest and hands everything to `flow::status`. The lineage returned by
/// `flow::status` is written back to storage before the status is returned.
///
/// # Arguments
///
/// * `host_config_opt` - Host configuration to use. When `None`, the
///   configuration is requested from the remote for the lineage's origin.
///
/// # Errors
///
/// Propagates any error from reading or writing the lineage, refreshing the
/// latest hash, loading the manifest, fetching the host configuration or
/// computing the status.
pub async fn status(&self, host_config_opt: Option<HostConfig>) -> Res<InstalledPackageStatus> {
    let (package_home, lineage) = self.lineage.read(&self.storage).await?;
    let lineage = flow::refresh_latest_hash(lineage, &self.remote).await?;
    let manifest = self.manifest().await?;
    // Query the remote only when no configuration was supplied. `unwrap_or`
    // would evaluate its argument eagerly, performing (and possibly failing
    // on) the remote call even when the caller passed an override.
    let host_config = match host_config_opt {
        Some(host_config) => host_config,
        None => self.remote.host_config(&lineage.remote.origin).await?,
    };
    let (lineage, status) = flow::status(
        lineage,
        &self.storage,
        &manifest,
        &package_home,
        host_config,
    )
    .await?;
    self.lineage.write(&self.storage, lineage).await?;
    Ok(status)
}
/// Installs the given paths of the package and returns the lineage's paths.
///
/// An empty `paths` slice short-circuits with an empty map before anything is
/// read or scaffolded. Otherwise the install and cache locations are
/// scaffolded, the manifest is loaded, `flow::install_paths` is run and the
/// resulting lineage is written back to storage.
///
/// # Errors
///
/// Propagates any error from scaffolding, reading or writing the lineage,
/// loading the manifest or from `flow::install_paths`.
pub async fn install_paths(&self, paths: &[PathBuf]) -> Res<LineagePaths> {
    // Nothing requested: do not touch storage or the remote at all.
    if paths.is_empty() {
        return Ok(BTreeMap::new());
    }

    self.scaffold_paths().await?;
    let (package_home, current) = self.lineage.read(&self.storage).await?;
    self.scaffold_paths_for_caching(&current.remote.bucket)
        .await?;
    let mut manifest = self.manifest().await?;

    let requested: Vec<&PathBuf> = paths.iter().collect();
    let updated = flow::install_paths(
        current,
        &mut manifest,
        &self.paths,
        package_home,
        self.namespace.clone(),
        &self.storage,
        &self.remote,
        &requested,
    )
    .await?;

    let written = self.lineage.write(&self.storage, updated).await?;
    Ok(written.paths)
}
/// Uninstalls the given paths and returns the lineage's remaining paths.
///
/// Reads the lineage, runs `flow::uninstall_paths` against the package home
/// and writes the resulting lineage back to storage.
///
/// # Errors
///
/// Propagates any error from reading or writing the lineage or from
/// `flow::uninstall_paths`.
pub async fn uninstall_paths(&self, paths: &Vec<PathBuf>) -> Res<LineagePaths> {
    let storage = &self.storage;
    let (package_home, current) = self.lineage.read(storage).await?;
    let updated = flow::uninstall_paths(current, package_home, storage, paths).await?;
    let written = self.lineage.write(storage, updated).await?;
    Ok(written.paths)
}
/// Placeholder for reverting the given paths; not implemented yet.
///
/// Logs the requested paths at debug level and then hits `unimplemented!()`.
///
/// # Panics
///
/// Always panics when awaited, because the body ends in `unimplemented!()`.
pub async fn revert_paths(&self, paths: &Vec<String>) -> Res {
log::debug!("revert_paths: {paths:?}");
unimplemented!()
}
pub async fn commit(
&self,
message: String,
user_meta: Option<serde_json::Value>,
workflow: Option<Workflow>,
host_config_opt: Option<HostConfig>,
) -> Res<CommitState> {
self.scaffold_paths().await?;
let (package_home, lineage) = self.lineage.read(&self.storage).await?;
let mut manifest = self.manifest().await?;
let host_config =
host_config_opt.unwrap_or(self.remote.host_config(&lineage.remote.origin).await?);
let (lineage, status) = flow::status(
lineage,
&self.storage,
&manifest,
&package_home,
host_config,
)
.await?;
let lineage = flow::commit(
lineage,
&mut manifest,
&self.paths,
&self.storage,
package_home,
status,
self.namespace.clone(),
message,
user_meta,
workflow,
)
.await?;
let lineage = self.lineage.write(&self.storage, lineage).await?;
match lineage.commit {
Some(commit) => Ok(commit),
None => Err(Error::Commit("Nothing committed".to_string())),
}
}
/// Pushes the committed state of the package to the remote.
///
/// Scaffolds the install and cache paths, loads the manifest and runs
/// `flow::push`. The resulting lineage is written back to storage and its
/// remote manifest URI is returned.
///
/// # Arguments
///
/// * `host_config_opt` - Host configuration to use. When `None`, the
///   configuration is requested from the remote for the lineage's origin.
///
/// # Errors
///
/// Returns `Error::Push("No commits to push")` when the lineage has no commit.
/// Otherwise propagates errors from scaffolding, lineage I/O, manifest
/// loading, host configuration lookup and `flow::push`.
pub async fn push(&self, host_config_opt: Option<HostConfig>) -> Res<ManifestUri> {
    self.scaffold_paths().await?;
    let (_, lineage) = self.lineage.read(&self.storage).await?;
    if lineage.commit.is_none() {
        return Err(Error::Push("No commits to push".to_string()));
    }
    self.scaffold_paths_for_caching(&lineage.remote.bucket)
        .await?;
    let manifest = self.manifest().await?;
    // Query the remote only when no configuration was supplied. `unwrap_or`
    // would evaluate its argument eagerly, performing (and possibly failing
    // on) the remote call even when the caller passed an override.
    let host_config = match host_config_opt {
        Some(host_config) => host_config,
        None => self.remote.host_config(&lineage.remote.origin).await?,
    };
    let lineage = flow::push(
        lineage,
        manifest,
        &self.paths,
        &self.storage,
        &self.remote,
        Some(self.namespace.clone()),
        host_config,
    )
    .await?;
    let lineage = self.lineage.write(&self.storage, lineage).await?;
    Ok(lineage.remote)
}
/// Pulls the package from the remote into the local installation.
///
/// Scaffolds the install and cache paths, loads the manifest, computes the
/// status via `flow::status` and passes it to `flow::pull`. The resulting
/// lineage is written back to storage and its remote manifest URI is
/// returned.
///
/// # Arguments
///
/// * `host_config_opt` - Host configuration to use. When `None`, the
///   configuration is requested from the remote for the lineage's origin.
///
/// # Errors
///
/// Propagates errors from scaffolding, lineage I/O, manifest loading, host
/// configuration lookup, `flow::status` and `flow::pull`.
pub async fn pull(&self, host_config_opt: Option<HostConfig>) -> Res<ManifestUri> {
    self.scaffold_paths().await?;
    let (package_home, lineage) = self.lineage.read(&self.storage).await?;
    self.scaffold_paths_for_caching(&lineage.remote.bucket)
        .await?;
    let mut manifest = self.manifest().await?;
    // Query the remote only when no configuration was supplied. `unwrap_or`
    // would evaluate its argument eagerly, performing (and possibly failing
    // on) the remote call even when the caller passed an override.
    let host_config = match host_config_opt {
        Some(host_config) => host_config,
        None => self.remote.host_config(&lineage.remote.origin).await?,
    };
    let (lineage, status) = flow::status(
        lineage,
        &self.storage,
        &manifest,
        &package_home,
        host_config,
    )
    .await?;
    let lineage = flow::pull(
        lineage,
        &mut manifest,
        &self.paths,
        &self.storage,
        &self.remote,
        package_home,
        status,
        self.namespace.clone(),
    )
    .await?;
    let lineage = self.lineage.write(&self.storage, lineage).await?;
    Ok(lineage.remote)
}
/// Runs `flow::certify_latest` for the lineage's current remote URI.
///
/// A clone of the lineage's remote manifest URI is passed as the URI to
/// certify. The resulting lineage is written back to storage and its remote
/// URI is returned.
///
/// # Errors
///
/// Propagates any error from reading or writing the lineage or from
/// `flow::certify_latest`.
pub async fn certify_latest(&self) -> Res<ManifestUri> {
    let (_, current) = self.lineage.read(&self.storage).await?;
    let uri_to_certify = current.remote.clone();
    let certified = flow::certify_latest(current, &self.remote, uri_to_certify).await?;
    let written = self.lineage.write(&self.storage, certified).await?;
    Ok(written.remote)
}
/// Resets the installed package via `flow::reset_to_latest`.
///
/// Scaffolds the install and cache paths, loads the manifest and runs
/// `flow::reset_to_latest`. The resulting lineage is written back to storage
/// and its remote manifest URI is returned.
///
/// # Errors
///
/// Propagates any error from scaffolding, lineage I/O, manifest loading or
/// `flow::reset_to_latest`.
pub async fn reset_to_latest(&self) -> Res<ManifestUri> {
    self.scaffold_paths().await?;

    let (package_home, current) = self.lineage.read(&self.storage).await?;
    self.scaffold_paths_for_caching(&current.remote.bucket)
        .await?;

    let mut manifest = self.manifest().await?;
    let namespace = self.namespace.clone();
    let reset = flow::reset_to_latest(
        current,
        &mut manifest,
        &self.paths,
        &self.storage,
        &self.remote,
        package_home,
        namespace,
    )
    .await?;

    let written = self.lineage.write(&self.storage, reset).await?;
    Ok(written.remote)
}
/// Sets the origin host on the lineage's remote URI and persists it.
///
/// Reads the lineage, overwrites `remote.origin` with `Some(origin)` and
/// writes the lineage back to storage.
///
/// # Errors
///
/// Propagates any error from reading or writing the lineage.
pub async fn set_origin(&self, origin: Host) -> Res {
let (_, mut lineage) = self.lineage.read(&self.storage).await?;
lineage.remote.origin = Some(origin);
self.lineage.write(&self.storage, lineage).await?;
Ok(())
}
/// Resolves a workflow for this package against the remote's workflows config.
///
/// The config location is derived from the lineage's remote URI by converting
/// it to an `S3Uri` and replacing its key with `.quilt/workflows/config.yml`.
/// The actual lookup is delegated to `crate::io::remote::resolve_workflow`.
///
/// # Arguments
///
/// * `workflow_id` - Optional workflow identifier forwarded to the resolver.
///
/// # Errors
///
/// Propagates any error from reading the lineage or from the resolver.
pub async fn resolve_workflow(&self, workflow_id: Option<String>) -> Res<Option<Workflow>> {
    let (_, lineage) = self.lineage.read(&self.storage).await?;
    let manifest_uri = lineage.remote;

    // Same location as the package's remote, but pointing at the workflows config.
    let mut config_uri = S3Uri::from(manifest_uri.clone());
    config_uri.key = ".quilt/workflows/config.yml".to_string();

    resolve_workflow(&self.remote, &manifest_uri.origin, workflow_id, &config_uri).await
}
}
#[cfg(test)]
mod tests {
use super::*;
use test_log::test;
use aws_sdk_s3::primitives::ByteStream;
use crate::io::remote::mocks::MockRemote;
use crate::io::storage::StorageExt;
use crate::lineage::DomainLineageIo;
use crate::lineage::Home;
use crate::lineage::PackageLineageIo;
use crate::paths::DomainPaths;
/// Commits ten times in a row and checks the recorded commit history.
///
/// After the loop, the lineage's commit must list the hashes of the first nine
/// commits in reverse order (most recent first) as `prev_hashes`.
#[test(tokio::test)]
async fn test_spamming_commit_writes() -> Res {
// The temp dir guards are bound to named variables so they stay alive until the test ends.
let (home, _temp_dir1) = Home::from_temp_dir()?;
let (paths, _temp_dir2) = DomainPaths::from_temp_dir()?;
let storage = LocalStorage::new();
let remote = MockRemote::default();
let namespace: Namespace = ("test", "history").into();
let test_hash = "deadbeef".to_string();
paths
.scaffold_for_installing(&storage, &home, &namespace)
.await?;
// Seed a lineage file for `test/history` with no commit yet.
let lineage_json = format!(
r#"{{
"packages": {{
"test/history": {{
"commit": null,
"remote": {{
"bucket": "bucket",
"namespace": "test/history",
"hash": "{}",
"catalog": "test.quilt.dev"
}},
"base_hash": "{}",
"latest_hash": "{}",
"paths": {{}}
}}}},
"home": "/tmp/working_dir"
}}"#,
test_hash, "foo", "bar"
);
storage
.write_byte_stream(&paths.lineage(), lineage_json.into_bytes().into())
.await?;
// Write a minimal installed manifest at the path derived from the seeded hash.
let test_manifest_path = paths.installed_manifest(&namespace, &test_hash);
let test_manifest = r#"{"version": "v0"}"#;
storage
.write_byte_stream(
&test_manifest_path,
ByteStream::from_static(test_manifest.as_bytes()),
)
.await?;
let domain_lineage_io = DomainLineageIo::new(paths.lineage());
let package = InstalledPackage {
lineage: PackageLineageIo::new(domain_lineage_io, namespace.clone()),
paths,
remote,
storage,
namespace,
};
// Collect the hash of every commit in the order the commits were made.
let mut expected_hashes = Vec::new();
for i in 0..10 {
let commit = package
.commit(
format!("Commit new1 {i}"),
Some(serde_json::json!({ "count": i })),
None,
None,
)
.await?;
expected_hashes.insert(i, commit.hash);
}
// The last commit is the current one, so it is not part of `prev_hashes`.
expected_hashes.pop();
let commit_state = package.lineage().await?.commit.unwrap();
assert_eq!(commit_state.prev_hashes.len(), 9);
assert_eq!(
commit_state.prev_hashes,
expected_hashes.into_iter().rev().collect::<Vec<String>>()
);
Ok(())
}
/// Checks that `manifest()` recovers when the installed manifest is unreadable.
///
/// A valid manifest is placed at the cached location and garbage at the
/// installed location. `manifest()` must succeed and the installed file must
/// afterwards no longer contain the garbage.
#[test(tokio::test)]
async fn test_manifest_recovery_from_corruption() -> Res {
// The temp dir guards are bound to named variables so they stay alive until the test ends.
let (home, _temp_dir1) = Home::from_temp_dir()?;
let (paths, _temp_dir2) = DomainPaths::from_temp_dir()?;
let storage = LocalStorage::new();
let remote = MockRemote::default();
let namespace: Namespace = ("test", "recovery").into();
let test_hash = "deadbeef".to_string();
paths
.scaffold_for_installing(&storage, &home, &namespace)
.await?;
paths.scaffold_for_caching(&storage, "test-bucket").await?;
// Seed a lineage file for `test/recovery` pointing at `test-bucket`.
let lineage_json = format!(
r#"{{
"packages": {{
"test/recovery": {{
"commit": null,
"remote": {{
"bucket": "test-bucket",
"namespace": "test/recovery",
"hash": "{}",
"catalog": null
}},
"base_hash": "{}",
"latest_hash": "{}",
"paths": {{}}
}}}},
"home": "/tmp/working_dir"
}}"#,
test_hash, "foo", "bar"
);
storage
.write_byte_stream(&paths.lineage(), lineage_json.into_bytes().into())
.await?;
// Put the fixture manifest where the cache for this bucket and hash lives.
let reference_manifest = crate::fixtures::manifest::path();
let cached_manifest = paths.cached_manifest("test-bucket", &test_hash);
storage.copy(reference_manifest?, cached_manifest).await?;
// Overwrite the installed manifest with bytes that are not a manifest.
let installed_manifest = paths.installed_manifest(&namespace, &test_hash);
storage
.write_byte_stream(
&installed_manifest,
ByteStream::from_static(b"corrupted data"),
)
.await?;
let domain_lineage_io = DomainLineageIo::new(paths.lineage());
let package = InstalledPackage {
lineage: PackageLineageIo::new(domain_lineage_io, namespace.clone()),
paths,
remote,
// Cloned because `storage` is used again below to inspect the repaired file.
storage: storage.clone(),
namespace,
};
let result = package.manifest().await;
assert!(
result.is_ok(),
"Should recover from cache when installed is corrupted"
);
// The installed file must have been replaced by the recovered manifest.
let fixed_manifest_content = storage.read_bytes(&installed_manifest).await?;
assert!(
fixed_manifest_content.len() > 10,
"Installed manifest should be fixed"
);
assert!(
!fixed_manifest_content.starts_with(b"corrupted"),
"Should no longer be corrupted"
);
Ok(())
}
}