use std::{
collections::{hash_map::RandomState, BTreeMap, HashMap, HashSet, VecDeque},
path::PathBuf,
};
use aws_sdk_s3::primitives::ByteStream;
use serde::{Deserialize, Serialize};
use tokio::{
fs::{create_dir_all, read_dir, remove_dir_all, File},
io::AsyncWriteExt,
};
use url::Url;
pub mod lineage;
pub mod manifest;
pub mod storage;
pub mod uri;
use crate::s3_utils;
pub use self::{
lineage::{CommitState, DomainLineage, PackageLineage, PathState},
manifest::{ContentHash, Manifest, ManifestHeader, ManifestRow},
storage::{fs, s3},
uri::{RevisionPointer, S3PackageURI},
};
const MANIFEST_DIR: &str = ".quilt/packages";
const TAGS_DIR: &str = ".quilt/named_packages";
const OBJECTS_DIR: &str = ".quilt/objects";
const LINEAGE_FILE: &str = ".quilt/data.json";
const INSTALLED_DIR: &str = ".quilt/installed";
/// Build the S3 object key under which a tag for `namespace` is stored.
pub fn tag_key(namespace: &str, tag: &str) -> String {
    [TAGS_DIR, namespace, tag].join("/")
}
/// Construct the full (unversioned) S3 location of a tag object.
pub fn tag_uri(bucket: &str, namespace: &str, tag: &str) -> s3::S3Uri {
    s3::S3Uri {
        bucket: bucket.to_string(),
        key: tag_key(namespace, tag),
        version: None,
    }
}
/// A package manifest that lives in a remote S3 bucket, identified by its
/// resolved top hash (tags are dereferenced before this is constructed).
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct RemoteManifest {
    // S3 bucket holding the manifest and its objects.
    pub bucket: String,
    // Package namespace, e.g. "user/package".
    pub namespace: String,
    // Resolved top hash of the manifest.
    pub hash: String,
}
impl RemoteManifest {
    /// Resolve a package URI to a concrete remote manifest.
    ///
    /// A `Hash` revision pointer is used verbatim; a `Tag` pointer is
    /// dereferenced by downloading the tag object's contents from S3.
    pub async fn resolve(uri: &S3PackageURI) -> Result<Self, String> {
        let top_hash = match &uri.revision {
            RevisionPointer::Hash(top_hash) => top_hash.clone(),
            RevisionPointer::Tag(tag) => {
                tag_uri(&uri.bucket, &uri.namespace, tag)
                    .get_contents()
                    .await?
            }
        };
        Ok(Self {
            bucket: uri.bucket.clone(),
            namespace: uri.namespace.clone(),
            hash: top_hash,
        })
    }
    /// Fetch the hash currently stored under this package's "latest" tag.
    pub async fn resolve_latest(&self) -> Result<String, String> {
        tag_uri(&self.bucket, &self.namespace, "latest")
            .get_contents()
            .await
    }
    /// Overwrite this package's "latest" tag with `hash`.
    pub async fn update_latest(&self, hash: String) -> Result<(), String> {
        tag_uri(&self.bucket, &self.namespace, "latest")
            .put_contents(hash.into_bytes())
            .await
    }
}
/// A manifest that has been downloaded into the local manifest cache
/// (`MANIFEST_DIR/<bucket>/<hash>` under the domain root).
#[derive(Debug, PartialEq, Eq)]
pub struct CachedManifest {
    // Local domain whose cache holds the manifest file.
    pub domain: LocalDomain,
    // Source bucket (used as a cache subdirectory).
    pub bucket: String,
    // Top hash — also the cache file name.
    pub hash: String,
}
impl CachedManifest {
    /// Open and parse the manifest file from the local cache.
    pub async fn read(&self) -> Result<Manifest, String> {
        let cache_path = self.domain.manifest_cache_path(&self.bucket, &self.hash);
        Manifest::from_file(fs::open(&cache_path).await?).await
    }
}
/// A manifest recorded under a package's installed-manifests directory
/// (`INSTALLED_DIR/<namespace>/<hash>`).
#[derive(Clone, Debug, PartialEq)]
pub struct InstalledManifest {
    // Package the manifest belongs to.
    pub package: InstalledPackage,
    // Top hash — also the installed file name.
    pub hash: String,
}
impl InstalledManifest {
    /// Open and parse the manifest file from the package's installed directory.
    pub async fn read(&self) -> Result<Manifest, String> {
        let domain = &self.package.domain;
        let manifest_path = domain.installed_manifest_path(&self.package.namespace, &self.hash);
        Manifest::from_file(fs::open(&manifest_path).await?).await
    }
}
/// The remote (S3-bucket) side of a package domain.
// NOTE(review): not referenced anywhere in this file beyond its `From` impl —
// presumably used by a sibling module; confirm before removing.
#[derive(Debug, PartialEq, Eq)]
struct S3Domain {
    bucket: String,
}
impl From<&S3PackageURI> for S3Domain {
    /// A package URI's bucket fully determines its remote domain.
    fn from(uri: &S3PackageURI) -> Self {
        let bucket = uri.bucket.clone();
        Self { bucket }
    }
}
/// The local side of a package domain: a root directory that holds the
/// manifest cache, object store, installed manifests, lineage file, and the
/// per-package working folders.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct LocalDomain {
    // All `.quilt/*` state and working folders live under this directory.
    root_dir: PathBuf,
}
impl LocalDomain {
/// Create a domain rooted at `root_dir`; no filesystem access happens here.
pub fn new(root_dir: PathBuf) -> Self {
    Self { root_dir }
}
/// Build a handle to a manifest in this domain's cache.
///
/// Purely constructs the handle — it does not check that the cache file exists.
pub fn make_cached_manifest(
    &self,
    bucket: impl AsRef<str>,
    hash: impl AsRef<str>,
) -> CachedManifest {
    CachedManifest {
        domain: self.clone(),
        bucket: bucket.as_ref().to_owned(),
        hash: hash.as_ref().to_owned(),
    }
}
/// Path of a cached manifest: `<root>/<MANIFEST_DIR>/<bucket>/<hash>`.
pub fn manifest_cache_path(&self, bucket: &str, hash: &str) -> PathBuf {
    let mut path = self.root_dir.clone();
    path.push(MANIFEST_DIR);
    path.push(bucket);
    path.push(hash);
    path
}
/// Directory where a package's files are materialized: `<root>/<namespace>`.
pub fn working_folder(&self, namespace: &str) -> PathBuf {
    self.root_dir.join(namespace)
}
/// Load the domain-wide lineage file.
///
/// A missing file is not an error — it simply means no packages have been
/// installed yet, so it parses as an empty lineage (`{}`).
pub async fn read_lineage(&self) -> Result<DomainLineage, String> {
    let lineage_path = self.root_dir.join(LINEAGE_FILE);
    let contents = match fs::read_to_string(&lineage_path).await {
        Ok(contents) => contents,
        // No lineage file yet: treat as an empty JSON document.
        Err(err) if err.kind() == std::io::ErrorKind::NotFound => "{}".into(),
        Err(err) => return Err(format!("Failed to read the lineage file: {err}")),
    };
    DomainLineage::try_from(&contents[..])
}
/// Serialize the lineage as pretty JSON and write it to the lineage file.
pub async fn write_lineage(&self, lineage: &DomainLineage) -> Result<(), String> {
    let serialized = serde_json::to_string_pretty(lineage).map_err(|err| err.to_string())?;
    fs::write(self.root_dir.join(LINEAGE_FILE), serialized.as_bytes())
        .await
        .map_err(|err| err.to_string())
}
/// Ensure a remote manifest is present in the local manifest cache,
/// downloading it from S3 on a cache miss, and return a handle to it.
pub async fn cache_remote_manifest(
    &self,
    manifest: &RemoteManifest,
) -> Result<CachedManifest, String> {
    let cache_path = self.manifest_cache_path(&manifest.bucket, &manifest.hash);
    // Manifests are content-addressed by hash, so an existing cache file
    // never needs to be re-downloaded.
    if !fs::exists(&cache_path).await {
        let obj_uri = s3::S3Uri {
            bucket: manifest.bucket.clone(),
            key: format!("{}/{}", MANIFEST_DIR, &manifest.hash),
            version: None,
        };
        let contents = s3::get_object_contents(&obj_uri)
            .await
            .map_err(|err| format!("Failed to download manifest from {obj_uri:?}: {err}"))?;
        fs::write(&cache_path, contents.as_bytes())
            .await
            .map_err(|err| format!("Failed to write manifest to {cache_path:?}: {err}"))?;
    }
    Ok(CachedManifest {
        domain: self.to_owned(),
        bucket: manifest.bucket.clone(),
        hash: manifest.hash.clone(),
    })
}
/// Download (if needed) and parse a remote manifest.
pub async fn browse_remote_manifest(
    &self,
    remote: &RemoteManifest,
) -> Result<Manifest, String> {
    self.cache_remote_manifest(remote).await?.read().await
}
/// Resolve a package URI (dereferencing tags) and parse its manifest.
pub async fn browse_uri(&self, uri: &S3PackageURI) -> Result<Manifest, String> {
    let remote = RemoteManifest::resolve(uri).await?;
    self.browse_remote_manifest(&remote).await
}
/// Directory of installed manifests for one namespace:
/// `<root>/<INSTALLED_DIR>/<namespace>`.
pub fn installed_manifests_path(&self, namespace: &str) -> PathBuf {
    self.root_dir.join(INSTALLED_DIR).join(namespace)
}
/// Path of one installed manifest, named by its top hash.
pub fn installed_manifest_path(&self, namespace: &str, hash: &str) -> PathBuf {
    self.installed_manifests_path(namespace).join(hash)
}
/// Install a remote package into this domain.
///
/// Caches the manifest, copies it into the namespace's installed-manifests
/// directory, and records the package in the domain lineage. Fails if the
/// namespace is already installed. No file contents are materialized here —
/// use `InstalledPackage::install_paths` for that.
pub async fn install_package(&self, remote: &RemoteManifest) -> Result<InstalledPackage, String> {
    // The original signature declared lifetimes `<'r, 'p>`; both were
    // needless ('p entirely unused), so they are elided here.
    let mut lineage: DomainLineage = self.read_lineage().await?;
    if lineage.packages.contains_key(&remote.namespace) {
        return Err(format!(
            "Package '{}' is already installed",
            remote.namespace
        ));
    }
    self.cache_remote_manifest(remote).await?;
    let installed_manifest_path = self.installed_manifest_path(&remote.namespace, &remote.hash);
    create_dir_all(installed_manifest_path.parent().unwrap())
        .await
        .map_err(|err| err.to_string())?;
    // Promote the cached manifest to this namespace's installed manifest.
    tokio::fs::copy(
        self.manifest_cache_path(&remote.bucket, &remote.hash),
        installed_manifest_path,
    )
    .await
    .map_err(|err| err.to_string())?;
    let latest_hash = remote.resolve_latest().await?;
    lineage.packages.insert(
        remote.namespace.clone(),
        PackageLineage::from_remote(remote.to_owned(), latest_hash),
    );
    self.write_lineage(&lineage).await?;
    Ok(InstalledPackage {
        domain: self.to_owned(),
        namespace: remote.namespace.clone(),
    })
}
/// Remove a package from the domain lineage and best-effort delete its
/// installed manifests and working folder.
///
/// Directory removal failures are logged but not fatal: the lineage entry
/// is already gone, which is the authoritative state.
pub async fn uninstall_package(&self, namespace: impl AsRef<str>) -> Result<(), String> {
    let namespace = namespace.as_ref();
    let mut lineage = self.read_lineage().await?;
    lineage
        .packages
        .remove(namespace)
        // `ok_or_else` avoids allocating the error String on the success path.
        .ok_or_else(|| "Package not installed".to_string())?;
    self.write_lineage(&lineage).await?;
    if let Err(err) = remove_dir_all(self.installed_manifests_path(namespace)).await {
        println!("Failed to remove installed manifests: {err}");
    }
    if let Err(err) = remove_dir_all(self.working_folder(namespace)).await {
        println!("Failed to remove working directory: {err}");
    }
    Ok(())
}
/// List all installed packages, sorted by namespace.
pub async fn list_installed_packages(&self) -> Result<Vec<InstalledPackage>, String> {
    let lineage = self.read_lineage().await?;
    let mut namespaces: Vec<String> = lineage.packages.into_keys().collect();
    namespaces.sort();
    let mut packages = Vec::with_capacity(namespaces.len());
    for namespace in namespaces {
        packages.push(InstalledPackage {
            domain: self.clone(),
            namespace,
        });
    }
    Ok(packages)
}
/// Look up an installed package by namespace; `Ok(None)` if not installed.
///
/// Takes `&str` instead of the original `&String` — call sites passing
/// `&String` still compile via deref coercion.
pub async fn get_installed_package(
    &self,
    namespace: &str,
) -> Result<Option<InstalledPackage>, String> {
    let lineage = self.read_lineage().await?;
    Ok(lineage
        .packages
        .contains_key(namespace)
        .then(|| InstalledPackage {
            domain: self.to_owned(),
            namespace: namespace.to_owned(),
        }))
}
}
/// A before/after pair for one tracked item.
///
/// `current: Some, previous: None` = added; `None`/`Some` = deleted;
/// `Some`/`Some` = modified.
#[derive(Debug, PartialEq, Eq, Serialize)]
pub struct Change<T> {
    current: Option<T>,
    previous: Option<T>,
}
/// Changes keyed by item (BTreeMap keeps output deterministically ordered).
pub type ChangeSet<K, T> = BTreeMap<K, Change<T>>;
/// Flags describing how the installed package relates to its upstream remote.
#[derive(Debug, PartialEq, Eq, Default, Serialize)]
pub struct UpstreamState {
    // A local commit exists that has not been pushed yet.
    commit_pending: bool,
    // Upstream's `latest` differs from our base hash.
    behind: bool,
    // Our current hash differs from our base hash.
    ahead: bool,
}
impl UpstreamState {
    /// Derive the upstream flags from a package's recorded lineage.
    pub fn from_lineage(lineage: &PackageLineage) -> Self {
        let commit_pending = lineage.commit.is_some();
        let behind = lineage.base_hash != lineage.latest_hash;
        let ahead = lineage.base_hash != lineage.current_hash();
        Self {
            commit_pending,
            behind,
            ahead,
        }
    }
}
/// The four-way summary of `UpstreamState`'s `ahead`/`behind` flags.
#[derive(Debug, PartialEq, Eq, Default, Serialize)]
pub enum UpstreamDiscreteState {
    #[default]
    UpToDate,
    Behind,
    Ahead,
    Diverged,
}
impl From<&UpstreamState> for UpstreamDiscreteState {
    /// Collapse the (ahead, behind) flag pair into a single discrete state.
    fn from(upstream: &UpstreamState) -> Self {
        match (upstream.ahead, upstream.behind) {
            (true, true) => Self::Diverged,
            (true, false) => Self::Ahead,
            (false, true) => Self::Behind,
            (false, false) => Self::UpToDate,
        }
    }
}
/// Size + content hash identifying one version of a package file.
#[derive(Clone, Debug, PartialEq, Serialize)]
pub struct PackageFileFingerprint {
    pub size: u64,
    pub hash: ContentHash,
}
/// Result of `InstalledPackage::status`: upstream sync state plus the set of
/// working-folder changes relative to the installed manifest.
#[derive(Debug, PartialEq, Default, Serialize)]
pub struct InstalledPackageStatus {
    pub upstream: UpstreamState,
    pub upstream_state: UpstreamDiscreteState,
    // True iff `changes` is non-empty.
    pub dirty: bool,
    pub changes: ChangeSet<String, PackageFileFingerprint>,
}
impl InstalledPackageStatus {
    /// Build a status, deriving `upstream_state` and `dirty` from the inputs.
    pub fn new(
        upstream: UpstreamState,
        changes: ChangeSet<String, PackageFileFingerprint>,
    ) -> Self {
        let upstream_state = UpstreamDiscreteState::from(&upstream);
        let dirty = !changes.is_empty();
        Self {
            upstream,
            upstream_state,
            dirty,
            changes,
        }
    }
}
/// Handle to a package installed in a local domain, identified by namespace.
/// All persistent state lives in the domain's lineage file and directories.
#[derive(Clone, Debug, PartialEq)]
pub struct InstalledPackage {
    pub domain: LocalDomain,
    pub namespace: String,
}
impl InstalledPackage {
/// Read this package's lineage entry from the domain lineage file.
pub async fn lineage(&self) -> Result<PackageLineage, String> {
    self.domain
        .read_lineage()
        .await?
        .packages
        .get(&self.namespace)
        .cloned()
        // `ok_or_else` avoids allocating the error String on every success.
        .ok_or_else(|| "not found".to_string())
}
/// Replace this package's entry in the domain lineage file and persist it.
pub async fn write_lineage(&self, lineage: PackageLineage) -> Result<(), String> {
    let mut domain_lineage = self.domain.read_lineage().await?;
    let _ = domain_lineage
        .packages
        .insert(self.namespace.clone(), lineage);
    self.domain.write_lineage(&domain_lineage).await
}
/// Logical paths currently installed (i.e. tracked in the package lineage).
pub async fn paths(&self) -> Result<Vec<String>, String> {
    self.lineage().await.map(|l| l.paths.into_keys().collect())
}
/// Build a handle to one of this package's installed manifests.
/// Purely constructs the handle — does not check the file exists.
pub fn make_installed_manifest(&self, hash: impl AsRef<str>) -> InstalledManifest {
    InstalledManifest {
        package: self.clone(),
        hash: hash.as_ref().to_owned(),
    }
}
/// Handle to the manifest for the package's current revision
/// (pending commit if any, otherwise the base revision).
pub async fn manifest(&self) -> Result<InstalledManifest, String> {
    self.lineage()
        .await
        .map(|l| self.make_installed_manifest(l.current_hash()))
}
/// This package's working folder inside the domain.
pub fn working_folder(&self) -> PathBuf {
    self.domain.working_folder(&self.namespace)
}
/// Remove the package from the domain (lineage entry, manifests, files).
pub async fn uninstall(&self) -> Result<(), String> {
    self.domain.uninstall_package(&self.namespace).await
}
/// Compute the package status: upstream sync flags plus a change set
/// comparing the working folder against the installed manifest.
pub async fn status(&self) -> Result<InstalledPackageStatus, String> {
    let mut lineage = self.lineage().await?;
    // Best-effort refresh of the remote "latest" tag; failures (e.g. being
    // offline) are silently ignored so status still works locally.
    if let Ok(latest_hash) = lineage.remote.resolve_latest().await {
        lineage.latest_hash = latest_hash;
        self.write_lineage(lineage.clone()).await?;
    }
    let manifest = self.manifest().await?.read().await?;
    let work_dir = self.working_folder();
    // Fingerprint (hash, size) of every installed path, keyed by relative path.
    let mut orig_paths = HashMap::new();
    for path in lineage.paths.keys() {
        let idx = manifest.find_path(path).ok_or("no such path")?;
        let row = &manifest.rows[idx];
        let ContentHash::SHA256(hash) = &row.hash;
        orig_paths.insert(PathBuf::from(path), (hash.clone(), row.size));
    }
    // Breadth-first walk of the working folder.
    let mut queue = VecDeque::new();
    queue.push_back(work_dir.clone());
    let mut changes = ChangeSet::new();
    while let Some(dir) = queue.pop_front() {
        let mut dir_entries = match read_dir(dir).await {
            Ok(dir_entries) => dir_entries,
            Err(err) => {
                // Unreadable directories are skipped, not fatal.
                println!("Failed to read directory: {}", err);
                continue;
            }
        };
        while let Some(dir_entry) = dir_entries
            .next_entry()
            .await
            .map_err(|err| err.to_string())?
        {
            let file_path = dir_entry.path();
            let file_type = dir_entry.file_type().await.map_err(|err| err.to_string())?;
            if file_type.is_dir() {
                queue.push_back(file_path);
            } else if file_type.is_file() {
                let file_metadata =
                    dir_entry.metadata().await.map_err(|err| err.to_string())?;
                let file_hash = sha256::try_async_digest(&file_path)
                    .await
                    .map_err(|err| err.to_string())?;
                let relative_path = file_path.strip_prefix(&work_dir).unwrap();
                // `remove` so that whatever is left in `orig_paths` after the
                // walk is known to have been deleted from the working folder.
                if let Some((orig_hash, orig_size)) = orig_paths.remove(relative_path) {
                    if file_hash != orig_hash {
                        // Modified file: record both fingerprints.
                        changes.insert(
                            relative_path.display().to_string(),
                            Change {
                                current: Some(PackageFileFingerprint {
                                    size: file_metadata.len(),
                                    hash: ContentHash::SHA256(file_hash),
                                }),
                                previous: Some(PackageFileFingerprint {
                                    size: orig_size,
                                    hash: ContentHash::SHA256(orig_hash),
                                }),
                            },
                        );
                    }
                } else {
                    // File not tracked by the lineage: an addition.
                    changes.insert(
                        relative_path.display().to_string(),
                        Change {
                            current: Some(PackageFileFingerprint {
                                size: file_metadata.len(),
                                hash: ContentHash::SHA256(file_hash),
                            }),
                            previous: None,
                        },
                    );
                }
            } else {
                // Symlinks etc. are reported but not tracked.
                println!("Unexpected file type: {}", file_path.display());
            }
        }
    }
    // Anything not seen on disk was deleted from the working folder.
    for (orig_path, (orig_hash, orig_size)) in orig_paths {
        changes.insert(
            orig_path.display().to_string(),
            Change {
                current: None,
                previous: Some(PackageFileFingerprint {
                    size: orig_size,
                    hash: ContentHash::SHA256(orig_hash),
                }),
            },
        );
    }
    Ok(InstalledPackageStatus::new(
        UpstreamState::from_lineage(&lineage),
        changes,
    ))
}
/// Materialize the given logical paths into the working folder.
///
/// Each object is downloaded (once) into the content-addressed objects
/// directory, copied into the working folder, and the installed manifest
/// plus package lineage are updated to point at the local copies.
/// Fails if any requested path is already installed.
pub async fn install_paths(&self, paths: &Vec<String>) -> Result<(), String> {
    if paths.len() == 0 {
        return Ok(());
    }
    let mut lineage = self.lineage().await?;
    // Refuse to install a path twice.
    if !HashSet::<String, RandomState>::from_iter(lineage.paths.keys().cloned())
        .is_disjoint(&HashSet::from_iter(paths.to_owned()))
    {
        return Err(format!("duplicate paths"));
    }
    let objects_dir = self.domain.root_dir.join(OBJECTS_DIR);
    create_dir_all(&objects_dir)
        .await
        .map_err(|err| err.to_string())?;
    let working_dir = self.working_folder();
    create_dir_all(&working_dir)
        .await
        .map_err(|err| err.to_string())?;
    let mut manifest = self.manifest().await?.read().await?;
    for path in paths {
        let idx = manifest.find_path(path).ok_or("no such path")?;
        let row = &mut manifest.rows[idx];
        // The manifest row must point at a versioned s3:// URL.
        let parsed_url = Url::parse(&row.physical_key).map_err(|err| err.to_string())?;
        if parsed_url.scheme() != "s3" {
            return Err("invalid scheme".into());
        }
        let bucket = parsed_url.host_str().ok_or("missing bucket")?;
        // Drop the URL path's leading '/' to obtain the S3 key.
        let key = &parsed_url.path()[1..];
        let query: HashMap<_, _> = parsed_url.query_pairs().into_owned().collect();
        let version_id = query.get("versionId").ok_or("missing versionId")?;
        let ContentHash::SHA256(hash) = &row.hash;
        let object_dest = objects_dir.join(hash);
        // Objects are immutable and addressed by hash: download only on miss.
        if !fs::exists(&object_dest).await {
            let mut file = File::create(&object_dest)
                .await
                .map_err(|err| err.to_string())?;
            let client = s3_utils::get_client_for_bucket(bucket.into()).await?;
            let mut object = client
                .get_object()
                .bucket(bucket)
                .key(key)
                .version_id(version_id)
                .send()
                .await
                .map_err(|err| {
                    err.into_service_error()
                        .meta()
                        .message()
                        .unwrap_or("failed to download")
                        .to_string()
                })?;
            // Stream the object body to disk chunk by chunk.
            while let Some(bytes) = object
                .body
                .try_next()
                .await
                .map_err(|err| err.to_string())?
            {
                file.write_all(&bytes)
                    .await
                    .map_err(|err| err.to_string())?;
            }
            file.flush().await.map_err(|err| err.to_string())?;
        }
        // Repoint the manifest row at the local object file (file:// URL).
        row.physical_key = Url::from_file_path(&object_dest).unwrap().to_string();
        let working_dest = working_dir.join(&row.logical_key);
        tokio::fs::copy(&object_dest, &working_dest)
            .await
            .map_err(|err| err.to_string())?;
        // Record mtime + hash so `status` can later detect edits.
        let timestamp = fs::get_file_modified_ts(&working_dest).await?;
        lineage.paths.insert(
            row.logical_key.to_owned(),
            PathState {
                timestamp,
                hash: row.hash.to_owned(),
            },
        );
    }
    // Persist the rewritten manifest and the updated lineage.
    let installed_manifest_path = self
        .domain
        .installed_manifest_path(&self.namespace, lineage.current_hash());
    fs::write(&installed_manifest_path, manifest.to_jsonlines().as_bytes())
        .await
        .map_err(|err| err.to_string())?;
    self.write_lineage(lineage).await?;
    Ok(())
}
/// Stop tracking the given logical paths and delete their working-folder
/// files. A file already missing from disk is not an error.
pub async fn uninstall_paths(&self, paths: &Vec<String>) -> Result<(), String> {
    println!("uninstall_paths: {paths:?}");
    let mut lineage = self.lineage().await?;
    let working_dir = self.working_folder();
    for path in paths {
        lineage.paths.remove(path).ok_or("path is not installed")?;
        if let Err(err) = tokio::fs::remove_file(working_dir.join(path)).await {
            // Missing files are fine; anything else aborts.
            if err.kind() != std::io::ErrorKind::NotFound {
                return Err(err.to_string());
            }
        }
    }
    self.write_lineage(lineage).await?;
    Ok(())
}
/// Restore the given paths to their installed state.
/// Currently a stub: always returns an error.
pub async fn revert_paths(&self, paths: &Vec<String>) -> Result<(), String> {
    println!("revert_paths: {paths:?}");
    Err("not implemented".into())
}
/// Record the current working-folder state as a new local commit.
///
/// Applies the change set from `status()` to the manifest rows, copies
/// added/modified file contents into the content-addressed objects
/// directory, writes the new manifest under its top hash, and chains the
/// commit onto any previous un-pushed commits in the lineage.
///
/// Fixes: the source contained the mojibake `¤t.hash` (an HTML-entity
/// corruption of `&current.hash`), which is not valid Rust; also uses
/// `into_values()` instead of `values().cloned()` to avoid cloning every row.
pub async fn commit(
    &self,
    message: String,
    user_meta: Option<manifest::JsonObject>,
) -> Result<(), String> {
    println!("commit: {message:?}, {user_meta:?}");
    let mut lineage = self.domain.read_lineage().await?;
    let package_lineage = lineage
        .packages
        .get_mut(&self.namespace)
        .ok_or("not found")?;
    let status = self.status().await?;
    let objects_dir = self.domain.root_dir.join(OBJECTS_DIR);
    create_dir_all(&objects_dir)
        .await
        .map_err(|err| err.to_string())?;
    let work_dir = self.working_folder();
    let mut manifest = self.manifest().await?.read().await?;
    // Index the manifest rows by logical key so changes can be applied.
    let mut entry_map: HashMap<_, _> = manifest
        .rows
        .into_iter()
        .map(|row| (row.logical_key.to_owned(), row))
        .collect();
    for (logical_key, Change { current, previous }) in status.changes {
        if let Some(previous) = previous {
            let removed = entry_map
                .remove(&logical_key)
                .ok_or(format!("cannot remove {}", logical_key))?;
            // Sanity check: the manifest entry must match the fingerprint the
            // change set claims to be replacing.
            if removed.size != previous.size || removed.hash != previous.hash {
                return Err(format!(
                    "unexpected size or hash for removed {}",
                    logical_key
                ));
            }
            package_lineage.paths.remove(&logical_key);
        }
        if let Some(current) = current {
            let ContentHash::SHA256(hash) = &current.hash;
            let object_dest = objects_dir.join(hash);
            let new_physical_key = Url::from_file_path(&object_dest).unwrap().into();
            if entry_map
                .insert(
                    logical_key.to_owned(),
                    ManifestRow {
                        logical_key: logical_key.to_owned(),
                        physical_key: new_physical_key,
                        hash: current.hash.clone(),
                        size: current.size,
                        meta: None,
                    },
                )
                .is_some()
            {
                return Err(format!("cannot overwrite {}", logical_key));
            }
            let work_dest = work_dir.join(&logical_key);
            // Content-addressed store: skip the copy if the object exists.
            if !fs::exists(&object_dest).await {
                tokio::fs::copy(&work_dest, object_dest)
                    .await
                    .map_err(|err| err.to_string())?;
            }
            package_lineage.paths.insert(
                logical_key,
                PathState {
                    timestamp: fs::get_file_modified_ts(&work_dest).await?,
                    hash: current.hash,
                },
            );
        }
    }
    // Rebuild the row list in sorted order for a deterministic top hash.
    manifest.rows = entry_map.into_values().collect();
    manifest
        .rows
        .sort_unstable_by(|a, b| a.logical_key.cmp(&b.logical_key));
    manifest.header.message = Some(message);
    manifest.header.user_meta = user_meta;
    let new_top_hash = manifest.top_hash();
    let new_manifest_path = self
        .domain
        .installed_manifest_path(&self.namespace, &new_top_hash);
    fs::write(&new_manifest_path, manifest.to_jsonlines().as_bytes())
        .await
        .map_err(|err| err.to_string())?;
    // Chain this commit in front of any earlier, not-yet-pushed commits.
    let mut prev_hashes = Vec::new();
    if let Some(commit) = &package_lineage.commit {
        prev_hashes.push(commit.hash.to_owned());
        prev_hashes.extend(commit.prev_hashes.to_owned());
    }
    let commit = CommitState {
        hash: new_top_hash,
        timestamp: chrono::Utc::now(),
        prev_hashes,
    };
    package_lineage.commit = Some(commit);
    self.domain.write_lineage(&lineage).await?;
    Ok(())
}
/// Publish the pending local commit to the remote bucket.
///
/// No-op when there is no pending commit. Uploads changed objects, writes
/// the new manifest under its top hash, records a timestamped tag, and —
/// when this package was based on upstream's `latest` — fast-forwards the
/// `latest` tag as well.
pub async fn push(&self) -> Result<(), String> {
    let mut lineage = self.lineage().await?;
    let commit = match lineage.commit {
        // Nothing committed locally: nothing to push.
        None => return Ok(()),
        Some(commit) => commit,
    };
    let remote = &lineage.remote;
    let mut local_manifest = self.manifest().await?.read().await?;
    let remote_manifest = self.domain.browse_remote_manifest(remote).await?;
    let client = crate::s3_utils::get_client_for_bucket(remote.bucket.clone()).await?;
    for row in local_manifest.rows.iter_mut() {
        // Deduplicate: if the remote already holds an identical row, reuse
        // its physical key instead of re-uploading the object.
        if let Some(remote_row) = remote_manifest.get(&row.logical_key) {
            if remote_row.eq(row) {
                row.physical_key = remote_row.physical_key.to_owned();
                continue;
            }
        }
        // Local rows carry file:// URLs into the objects directory.
        let local_url = Url::parse(&row.physical_key).unwrap();
        let file_path: PathBuf = local_url.to_file_path().unwrap();
        let body = ByteStream::from_path(&file_path)
            .await
            .map_err(|err| err.to_string())?;
        let s3_key = format!("{}/{}", self.namespace, row.logical_key);
        println!("uploading to s3({}): {}", remote.bucket, s3_key);
        let response = client
            .put_object()
            .bucket(&remote.bucket)
            .key(&s3_key)
            .body(body)
            .send()
            .await
            .map_err(|err| err.to_string())?;
        // Rewrite the row to the uploaded, versioned s3:// URL.
        let mut remote_url = Url::parse("s3://").unwrap();
        remote_url
            .set_host(Some(&remote.bucket))
            .expect("failed to set bucket");
        remote_url.set_path(&s3_key);
        if let Some(version_id) = response.version_id {
            remote_url
                .query_pairs_mut()
                .append_pair("versionId", &version_id);
        }
        println!("got remote url: {}", remote_url);
        row.physical_key = remote_url.to_string();
    }
    let top_hash = local_manifest.top_hash();
    let new_remote = RemoteManifest {
        hash: top_hash.clone(),
        ..remote.clone()
    };
    // Cache the pushed manifest locally under its new hash.
    let cache_path = self
        .domain
        .manifest_cache_path(&new_remote.bucket, &new_remote.hash);
    fs::write(&cache_path, local_manifest.to_jsonlines().as_bytes())
        .await
        .map_err(|err| format!("Failed to write manifest to {cache_path:?}: {err}"))?;
    let manifest_key = format!("{MANIFEST_DIR}/{}", new_remote.hash);
    println!("writing remote manifest to {manifest_key}");
    client
        .put_object()
        .bucket(&new_remote.bucket)
        .key(&manifest_key)
        .body(local_manifest.to_jsonlines().as_bytes().to_owned().into())
        .send()
        .await
        .map_err(|err| err.to_string())?;
    println!("uploaded remote manifest: {new_remote:?}");
    // Record a tag named after the commit's Unix timestamp.
    client
        .put_object()
        .bucket(&new_remote.bucket)
        .key(&format!(
            "{TAGS_DIR}/{}/{}",
            new_remote.namespace,
            commit.timestamp.timestamp(),
        ))
        .body(new_remote.hash.as_bytes().to_vec().into())
        .send()
        .await
        .map_err(|err| err.to_string())?;
    lineage.latest_hash = new_remote.resolve_latest().await?;
    lineage.remote = new_remote;
    lineage.commit = None;
    // Fast-forward upstream's `latest` only when we were based on it.
    if lineage.base_hash == lineage.latest_hash {
        lineage.remote.update_latest(top_hash.clone()).await?;
        lineage.latest_hash = top_hash.clone();
        lineage.base_hash = top_hash.clone();
    }
    self.write_lineage(lineage).await?;
    Ok(())
}
/// Fast-forward the installed package to upstream's current `latest`.
///
/// Refuses to run with pending working-folder changes, pending commits, or
/// a diverged base. Fixes the error-message typo "package is has diverged".
pub async fn pull(&self) -> Result<(), String> {
    let status = self.status().await?;
    if !status.changes.is_empty() {
        return Err("package has pending changes".into());
    }
    let lineage = self.lineage().await?;
    if lineage.commit.is_some() {
        return Err("package has pending commits".into());
    }
    if lineage.remote.hash != lineage.base_hash {
        return Err("package has diverged".into());
    }
    if lineage.base_hash == lineage.latest_hash {
        return Err("package is already up to date".into());
    }
    // Remove current files first; they will be re-installed from the new
    // revision below.
    let paths: Vec<String> = lineage.paths.keys().cloned().collect();
    self.uninstall_paths(&paths).await?;
    // Re-read: uninstall_paths persisted an updated lineage.
    let mut lineage = self.lineage().await?;
    lineage.remote.hash = lineage.latest_hash.clone();
    lineage.base_hash = lineage.latest_hash.clone();
    self.domain.cache_remote_manifest(&lineage.remote).await?;
    // Promote the cached manifest to the installed manifest.
    tokio::fs::copy(
        self.domain
            .manifest_cache_path(&lineage.remote.bucket, &lineage.remote.hash),
        self.domain
            .installed_manifest_path(&self.namespace, &lineage.remote.hash),
    )
    .await
    .map_err(|err| err.to_string())?;
    self.write_lineage(lineage).await?;
    // Re-install only the paths that still exist in the new revision.
    let manifest = self.manifest().await?.read().await?;
    let paths_to_install = paths.into_iter().filter(|x| manifest.has_path(x)).collect();
    self.install_paths(&paths_to_install).await?;
    Ok(())
}
/// Declare the package's current remote hash to be upstream's `latest`,
/// updating both the remote tag and the local base/latest hashes.
pub async fn certify_latest(&self) -> Result<(), String> {
    let mut lineage = self.lineage().await?;
    let certified_hash = lineage.remote.hash.clone();
    lineage.remote.update_latest(certified_hash.clone()).await?;
    lineage.latest_hash = certified_hash.clone();
    lineage.base_hash = certified_hash;
    self.write_lineage(lineage).await
}
/// Discard local revision state and reset the package to whatever hash
/// upstream's `latest` tag currently points at.
pub async fn reset_to_latest(&self) -> Result<(), String> {
    let lineage = self.lineage().await?;
    let new_latest = lineage.remote.resolve_latest().await?;
    // Already pointing at latest: nothing to do.
    if new_latest == lineage.remote.hash {
        return Ok(());
    }
    let paths: Vec<String> = lineage.paths.into_keys().collect();
    self.uninstall_paths(&paths).await?;
    // Re-read: uninstall_paths persisted an updated lineage.
    let mut lineage = self.lineage().await?;
    lineage.latest_hash = new_latest.clone();
    lineage.remote.hash = new_latest.clone();
    lineage.base_hash = new_latest;
    self.domain.cache_remote_manifest(&lineage.remote).await?;
    // Promote the cached manifest to the installed manifest.
    tokio::fs::copy(
        self.domain
            .manifest_cache_path(&lineage.remote.bucket, &lineage.remote.hash),
        self.domain
            .installed_manifest_path(&self.namespace, &lineage.remote.hash),
    )
    .await
    .map_err(|err| err.to_string())?;
    self.write_lineage(lineage).await?;
    // Re-install only the paths that still exist in the new revision.
    let manifest = self.manifest().await?.read().await?;
    let paths_to_install = paths.into_iter().filter(|x| manifest.has_path(x)).collect();
    self.install_paths(&paths_to_install).await
}
}
/// A recorded conflict between a package and local changes.
// NOTE(review): constructed/consumed outside this chunk — the fields are
// unused here; confirm usage before changing.
#[derive(Debug, PartialEq)]
pub struct Conflict {
    package: InstalledPackage,
    changes: ChangeSet<String, PackageFileFingerprint>,
    folder: PathBuf,
}
#[cfg(test)]
mod tests {
    use super::*;
    use temp_testdir::TempDir;
    use tokio_test::{assert_err, block_on};

    /// Current Unix time in whole seconds, stringified — used as unique
    /// file content so each run produces a distinct hash.
    fn get_timestamp() -> String {
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_secs()
            .to_string()
    }

    /// End-to-end install/modify/commit flow against a real S3 bucket.
    /// Ignored by default: requires network access and AWS credentials.
    ///
    /// Fixes the mojibake `×tamp` (an HTML-entity corruption of
    /// `&timestamp`), which is not valid Rust.
    #[test]
    #[ignore]
    fn flow() {
        // NOTE(review): `utils` is not visibly imported in this file; it is
        // assumed to be reachable through `use super::*` — confirm.
        let test_uri_string = utils::TEST_URI_STRING;
        let test_uri = S3PackageURI::try_from(test_uri_string).expect("Failed to parse URI");
        assert_eq!(
            test_uri,
            S3PackageURI {
                bucket: "quilt-example".into(),
                namespace: "akarve/test_dest".into(),
                path: Some("README.md".into()),
                revision: RevisionPointer::default(),
            }
        );
        let temp_dir = TempDir::default();
        let local_path = PathBuf::from(temp_dir.as_ref());
        let local_domain = LocalDomain::new(local_path);
        let remote_manifest =
            block_on(RemoteManifest::resolve(&test_uri)).expect("Failed to resolve manifest");
        let cached_manifest = block_on(local_domain.cache_remote_manifest(&remote_manifest))
            .expect("Failed to cache the manifest");
        let manifest = block_on(cached_manifest.read()).expect("Failed to parse the manifest");
        println!("manifest: {manifest:?}");
        let paths = vec![test_uri.path.unwrap()];
        let installed_package = block_on(local_domain.install_package(&remote_manifest))
            .expect("Failed to install package");
        block_on(installed_package.install_paths(&paths)).expect("Failed to install paths");
        // Installing the same path twice must fail.
        assert_err!(block_on(installed_package.install_paths(&paths)));
        let status = block_on(installed_package.status()).expect("Failed to get status");
        assert_eq!(status, InstalledPackageStatus::default());
        let readme_path = installed_package.working_folder().join("README.md");
        println!("readme_path: {readme_path:?}");
        let old_readme =
            block_on(fs::read_to_string(&readme_path)).expect("Failed to read README.md");
        let timestamp = get_timestamp();
        println!("timestamp: {timestamp:?}");
        block_on(fs::write(readme_path, timestamp.as_bytes()))
            .expect("Failed to overwrite README.md");
        // After overwriting, status must report exactly one modified file.
        let status = block_on(installed_package.status()).expect("Failed to get status");
        let expected_status = InstalledPackageStatus::new(
            UpstreamState::default(),
            ChangeSet::from([(
                "README.md".into(),
                Change {
                    current: Some(PackageFileFingerprint {
                        size: timestamp.len() as u64,
                        hash: ContentHash::SHA256(sha256::digest(&timestamp)),
                    }),
                    previous: Some(PackageFileFingerprint {
                        size: old_readme.len() as u64,
                        hash: ContentHash::SHA256(sha256::digest(&old_readme)),
                    }),
                },
            )]),
        );
        assert_eq!(status, expected_status);
        let commit_message = format!("Commit made at {}", timestamp);
        let user_meta = serde_json::json!({
            "test": "value",
            "timestamp": timestamp,
        })
        .as_object()
        .unwrap()
        .to_owned();
        block_on(installed_package.commit(commit_message, Some(user_meta)))
            .expect("Failed to commit");
    }
}