use std::marker::Unpin;
use std::path::Path;
use std::path::PathBuf;
use tracing::{debug, info, warn};
use crate::Res;
use crate::flow;
use crate::installed_package::InstalledPackage;
use crate::io::manifest::RowsStream;
use crate::io::manifest::build_manifest_from_rows_stream;
use crate::io::remote::Remote;
use crate::io::remote::RemoteS3;
use crate::io::storage::LocalStorage;
use crate::io::storage::Storage;
use crate::lineage;
use crate::lineage::DomainLineage;
use crate::lineage::Home;
use crate::manifest::Manifest;
use crate::manifest::ManifestHeader;
use crate::paths;
use quilt_uri::ManifestUri;
use quilt_uri::Namespace;
/// A domain on the local machine, parameterised over a storage backend `S`
/// and a remote client `R`.
///
/// Both parameters have defaults ([`LocalStorage`] and [`RemoteS3`]), so the
/// bare name `LocalDomain` refers to that default combination.
#[derive(Debug)]
pub struct LocalDomain<S = LocalStorage, R = RemoteS3>
where
    S: Storage,
    R: Remote,
{
    /// Path layout of the domain.
    paths: paths::DomainPaths,
    /// Reader/writer for the domain lineage.
    lineage: lineage::DomainLineageIo,
    /// Storage backend handed to every lineage, path and flow operation.
    storage: S,
    /// Remote client handed to the flows that talk to a remote.
    remote: R,
}
impl LocalDomain {
pub fn get_remote(&self) -> &RemoteS3 {
&self.remote
}
pub fn new(root_dir: impl AsRef<Path>) -> Self {
let paths = paths::DomainPaths::new(root_dir.as_ref().to_path_buf());
let lineage = lineage::DomainLineageIo::new(paths.lineage());
let storage = LocalStorage::new();
let remote = RemoteS3::new(paths.clone(), storage.clone());
Self {
lineage,
paths,
remote,
storage,
}
}
pub async fn get_home(&self) -> Res<Home> {
let lineage: DomainLineage = self.lineage.read(&self.storage).await?;
Ok(lineage.home)
}
pub async fn set_home(&self, dir: impl AsRef<Path>) -> Res<Home> {
info!("Setting home directory to {}", dir.as_ref().display());
Ok(self.lineage.set_home(&self.storage, dir).await?.home)
}
pub async fn scaffold_paths_for_installing(&self, namespace: &Namespace) -> Res {
debug!(
"Scaffolding installation paths for namespace: {}",
namespace
);
let home = self.get_home().await?;
self.paths
.scaffold_for_installing(&self.storage, &home, namespace)
.await
}
pub async fn scaffold_paths_for_caching(&self, bucket: &str) -> Res {
debug!("Scaffolding cache paths for bucket: {}", bucket);
self.paths.scaffold_for_caching(&self.storage, bucket).await
}
pub async fn browse_remote_manifest(&self, uri: &ManifestUri) -> Res<Manifest> {
self.scaffold_paths_for_caching(&uri.bucket).await?;
debug!("Initiating browse flow for manifest {}", uri.hash);
flow::browse(&self.paths, &self.storage, &self.remote, uri)
.await
.inspect(|_| debug!("Successfully browsed manifest {}", uri.hash))
.inspect_err(|e| warn!("Failed to browse manifest {}: {}", uri.hash, e))
}
pub fn create_installed_package(&self, namespace: Namespace) -> Res<InstalledPackage> {
Ok(InstalledPackage {
lineage: self.lineage.create_package_lineage(namespace.clone()),
namespace,
paths: self.paths.clone(),
remote: self.remote.try_clone()?,
storage: self.storage.clone(),
})
}
pub async fn install_package(&self, manifest_uri: &ManifestUri) -> Res<InstalledPackage> {
info!("Installing package: {}", manifest_uri.namespace);
debug!("Installing from manifest: {}", manifest_uri.display());
debug!("Preparing paths for installation");
self.scaffold_paths_for_caching(&manifest_uri.bucket)
.await?;
self.scaffold_paths_for_installing(&manifest_uri.namespace)
.await?;
debug!("Reading domain lineage");
let lineage: DomainLineage = self.lineage.read(&self.storage).await?;
debug!("Executing package installation flow");
let lineage = flow::install_package(
lineage,
&self.paths,
&self.storage,
&self.remote,
manifest_uri,
)
.await?;
debug!("Updating domain lineage");
self.lineage.write(&self.storage, lineage).await?;
info!("Successfully installed package: {}", manifest_uri.namespace);
self.create_installed_package(manifest_uri.namespace.clone())
}
pub async fn create_package(
&self,
namespace: Namespace,
source: Option<PathBuf>,
message: Option<String>,
) -> Res<InstalledPackage> {
info!("Creating package: {}", namespace);
let lineage: DomainLineage = self.lineage.read(&self.storage).await?;
let lineage = flow::create(
lineage,
&self.paths,
&self.storage,
namespace.clone(),
source,
message,
)
.await?;
self.lineage.write(&self.storage, lineage).await?;
info!("Successfully created package: {}", namespace);
self.create_installed_package(namespace)
}
pub async fn uninstall_package(&self, namespace: Namespace) -> Res<()> {
info!("Uninstalling package: {}", namespace);
debug!("Preparing paths for uninstallation");
self.scaffold_paths_for_installing(&namespace).await?;
debug!("Reading current lineage state");
let lineage = self.lineage.read(&self.storage).await?;
debug!("Executing package uninstallation flow");
let lineage =
flow::uninstall_package(lineage, &self.paths, &self.storage, namespace.clone()).await?;
debug!("Updating domain lineage after uninstallation");
self.lineage.write(&self.storage, lineage).await?;
info!("Successfully uninstalled package: {}", namespace);
Ok(())
}
pub async fn list_installed_packages(&self) -> Res<Vec<InstalledPackage>> {
debug!("Listing installed packages");
let lineage = self.lineage.read(&self.storage).await?;
let namespaces = lineage.namespaces();
debug!("Found {} installed packages", namespaces.len());
let mut packages = Vec::with_capacity(namespaces.len());
for namespace in namespaces {
packages.push(self.create_installed_package(namespace)?);
}
Ok(packages)
}
pub async fn get_installed_package(
&self,
namespace: &Namespace,
) -> Res<Option<InstalledPackage>> {
debug!("Looking up installed package: {}", namespace);
let lineage = self.lineage.read(&self.storage).await?;
if lineage.packages.contains_key(namespace) {
debug!("Found installed package: {}", namespace);
Ok(Some(self.create_installed_package(namespace.to_owned())?))
} else {
debug!("Package not found: {}", namespace);
Ok(None)
}
}
pub async fn build_manifest(
&self,
dest_path: PathBuf,
stream: impl RowsStream + Unpin,
) -> Res<(PathBuf, String)> {
info!("Building manifest at: {}", dest_path.display());
let dest_dir = dest_path.parent().unwrap_or(&dest_path).to_path_buf();
build_manifest_from_rows_stream(&self.storage, dest_dir, ManifestHeader::default(), stream)
.await
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;
    use test_log::test;

    /// A fresh domain lists no packages. After three entries are written to
    /// the lineage (foo/bar, test/package, abc/xyz), the listing returns them
    /// as abc/xyz, foo/bar, test/package rather than in insertion order.
    #[test(tokio::test)]
    async fn test_list_installed_packages() -> Res<()> {
        let temp_dir = TempDir::new()?;
        let root = temp_dir.path();
        let local_domain = LocalDomain::new(root);
        local_domain.set_home(root).await?;

        // Nothing has been registered yet.
        let listed = local_domain.list_installed_packages().await?;
        assert!(listed.is_empty());

        // Register three packages directly in the lineage.
        let mut lineage = local_domain.lineage.read(&local_domain.storage).await?;
        for pair in [("foo", "bar"), ("test", "package"), ("abc", "xyz")] {
            let namespace = Namespace::from(pair);
            let remote_uri = ManifestUri {
                bucket: "test-bucket".to_string(),
                namespace: namespace.clone(),
                hash: "abcdef".to_string(),
                origin: None,
            };
            let entry = crate::lineage::PackageLineage {
                commit: None,
                remote_uri: Some(remote_uri),
                base_hash: "abcdef".to_string(),
                latest_hash: "abcdef".to_string(),
                paths: std::collections::BTreeMap::new(),
            };
            lineage.packages.insert(namespace, entry);
        }
        local_domain
            .lineage
            .write(&local_domain.storage, lineage)
            .await?;

        let listed = local_domain.list_installed_packages().await?;
        let expected = [("abc", "xyz"), ("foo", "bar"), ("test", "package")];
        // The length check guarantees the zip below covers every expectation.
        assert_eq!(listed.len(), 3);
        for (package, pair) in listed.iter().zip(expected) {
            assert_eq!(package.namespace, Namespace::from(pair));
        }
        Ok(())
    }
}