use std::{
collections::{hash_map::RandomState, BTreeMap, HashMap, HashSet, VecDeque},
path::PathBuf,
};
use aws_sdk_s3::{error::SdkError, primitives::ByteStream};
use multihash::Multihash;
use serde::{Deserialize, Serialize};
use serde_json::json;
use tokio::{
fs::{create_dir_all, read_dir, remove_dir_all, File},
io::{AsyncReadExt, AsyncWriteExt},
};
use url::Url;
pub mod lineage;
pub mod manifest;
pub mod storage;
pub mod uri;
use crate::{quilt4::table::HEADER_ROW, s3_utils, Row4, Table, UPath};
pub use self::{
lineage::{CommitState, DomainLineage, PackageLineage, PathState},
manifest::{ContentHash, Manifest, ManifestHeader, ManifestRow},
storage::{fs, s3},
uri::{RevisionPointer, S3PackageURI},
};
// Repository layout shared by the local domain root and the S3 bucket.
/// Manifests live here, keyed by hash (locally also by bucket).
const MANIFEST_DIR: &str = ".quilt/packages";
/// Tag pointers (e.g. "latest") mapping a tag name to a top hash.
const TAGS_DIR: &str = ".quilt/named_packages";
/// Local content-addressed object store for installed file payloads.
const OBJECTS_DIR: &str = ".quilt/objects";
/// Per-domain lineage database (JSON), see `DomainLineage`.
const LINEAGE_FILE: &str = ".quilt/data.json";
/// Manifest snapshots taken at install/commit time, per namespace.
const INSTALLED_DIR: &str = ".quilt/installed";
/// Multihash code used when wrapping SHA-256 digests in this module.
/// NOTE(review): in the multiformats registry 0x16 is sha3-256, while
/// sha2-256 is 0x12 (cf. the "1220" prefix in `parquet_manifest_filename`).
/// The value is used consistently within this module, but verify before
/// relying on it for interop, since `Multihash` values are serialized into
/// cached tables — TODO confirm.
const MULTIHASH_SHA256: u64 = 0x16;
/// Build the S3 object key for a tag pointer: `<TAGS_DIR>/<namespace>/<tag>`.
pub fn tag_key(namespace: &str, tag: &str) -> String {
    [TAGS_DIR, namespace, tag].join("/")
}
/// Construct an unversioned S3 URI addressing the tag object for
/// `tag` under `namespace` in `bucket`.
pub fn tag_uri(bucket: &str, namespace: &str, tag: &str) -> s3::S3Uri {
    let key = tag_key(namespace, tag);
    s3::S3Uri {
        bucket: String::from(bucket),
        key,
        version: None,
    }
}
/// File name of the parquet manifest for a top hash.
///
/// The "1220" prefix renders the hash as a hex multihash
/// (0x12 = sha2-256 code, 0x20 = 32-byte digest length).
fn parquet_manifest_filename(top_hash: &str) -> String {
    let mut name = String::with_capacity(4 + top_hash.len() + ".parquet".len());
    name.push_str("1220");
    name.push_str(top_hash);
    name.push_str(".parquet");
    name
}
/// A manifest revision living in a remote S3 bucket, identified by bucket,
/// package namespace, and a fully resolved top hash.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct RemoteManifest {
    pub bucket: String,
    pub namespace: String,
    /// Resolved top hash (tag pointers are dereferenced at construction).
    pub hash: String,
}

impl RemoteManifest {
    /// Resolve a package URI to a concrete manifest: a hash pointer is used
    /// as-is; a tag pointer is dereferenced by reading the tag object in S3.
    pub async fn resolve(uri: &S3PackageURI) -> Result<Self, String> {
        let top_hash = match &uri.revision {
            RevisionPointer::Hash(top_hash) => top_hash.clone(),
            RevisionPointer::Tag(tag) => {
                tag_uri(&uri.bucket, &uri.namespace, tag)
                    .get_contents()
                    .await?
            }
        };
        Ok(Self {
            bucket: uri.bucket.clone(),
            namespace: uri.namespace.clone(),
            hash: top_hash,
        })
    }

    /// Read the hash currently pointed at by the remote "latest" tag.
    pub async fn resolve_latest(&self) -> Result<String, String> {
        tag_uri(&self.bucket, &self.namespace, "latest")
            .get_contents()
            .await
    }

    /// Overwrite the remote "latest" tag with `hash`.
    pub async fn update_latest(&self, hash: String) -> Result<(), String> {
        tag_uri(&self.bucket, &self.namespace, "latest")
            .put_contents(hash.into_bytes())
            .await
    }
}
/// A manifest present in the local domain's cache, addressed by
/// (bucket, hash) under `MANIFEST_DIR`.
#[derive(Debug, PartialEq, Eq)]
pub struct CachedManifest {
    pub domain: LocalDomain,
    pub bucket: String,
    pub hash: String,
}

impl CachedManifest {
    /// Load the cached manifest from disk as a `Table`.
    pub async fn read(&self) -> Result<Table, String> {
        let pathbuf = self.domain.manifest_cache_path(&self.bucket, &self.hash);
        let path = UPath::Local(pathbuf);
        let table = Table::read_from_upath(&path)
            .await
            .map_err(|err| err.to_string())?;
        Ok(table)
    }
}

/// A manifest snapshot recorded under `.quilt/installed/<namespace>/<hash>`
/// for an installed package.
#[derive(Clone, Debug, PartialEq)]
pub struct InstalledManifest {
    pub package: InstalledPackage,
    pub hash: String,
}

impl InstalledManifest {
    /// Load the installed-manifest snapshot from disk as a `Table`.
    pub async fn read(&self) -> Result<Table, String> {
        let pathbuf = self
            .package
            .domain
            .installed_manifest_path(&self.package.namespace, &self.hash);
        let path = UPath::Local(pathbuf);
        let table = Table::read_from_upath(&path)
            .await
            .map_err(|err| err.to_string())?;
        Ok(table)
    }
}
/// Remote (S3) counterpart of `LocalDomain`: just a bucket.
/// NOTE(review): only constructed via `From<&S3PackageURI>` and otherwise
/// unused in this file — candidate for removal or future use; confirm
/// against the rest of the crate.
#[derive(Debug, PartialEq, Eq)]
struct S3Domain {
    bucket: String,
}
impl From<&S3PackageURI> for S3Domain {
fn from(uri: &S3PackageURI) -> Self {
Self {
bucket: uri.bucket.clone(),
}
}
}
/// A local working area rooted at `root_dir`, holding cached manifests,
/// installed-package snapshots, a content-addressed object store, and the
/// lineage database.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct LocalDomain {
    root_dir: PathBuf,
}
impl LocalDomain {
/// Create a domain rooted at `root_dir`.
pub fn new(root_dir: PathBuf) -> Self {
    Self { root_dir }
}

/// Bind a (bucket, hash) pair to this domain's manifest cache.
pub fn make_cached_manifest(
    &self,
    bucket: impl AsRef<str>,
    hash: impl AsRef<str>,
) -> CachedManifest {
    CachedManifest {
        domain: self.clone(),
        bucket: String::from(bucket.as_ref()),
        hash: String::from(hash.as_ref()),
    }
}

/// Path of the locally cached manifest `hash` from `bucket`.
pub fn manifest_cache_path(&self, bucket: &str, hash: &str) -> PathBuf {
    self.root_dir.join(MANIFEST_DIR).join(bucket).join(hash)
}

/// Working directory holding a namespace's checked-out files.
pub fn working_folder(&self, namespace: &str) -> PathBuf {
    self.root_dir.join(namespace)
}
/// Read the domain's lineage database from `.quilt/data.json`.
///
/// A missing file is treated as an empty lineage (`{}`); any other read
/// error is reported. Parse errors surface from `DomainLineage::try_from`.
pub async fn read_lineage(&self) -> Result<DomainLineage, String> {
    let lineage_path = self.root_dir.join(LINEAGE_FILE);
    let contents = fs::read_to_string(&lineage_path).await.or_else(|err| {
        if err.kind() == std::io::ErrorKind::NotFound {
            // First use: behave as if an empty lineage file existed.
            Ok("{}".into())
        } else {
            // `{err}` uses Display, same text as the previous
            // redundant `err.to_string()`.
            Err(format!("Failed to read the lineage file: {err}"))
        }
    })?;
    DomainLineage::try_from(&contents[..])
}
/// Serialize `lineage` as pretty JSON and write it to `.quilt/data.json`.
pub async fn write_lineage(&self, lineage: &DomainLineage) -> Result<(), String> {
    let lineage_path = self.root_dir.join(LINEAGE_FILE);
    let contents = serde_json::to_string_pretty(lineage).map_err(|err| err.to_string())?;
    fs::write(lineage_path, contents.as_bytes())
        .await
        .map_err(|err| err.to_string())
}

/// Ensure `manifest` is present in the local cache, downloading if needed.
///
/// Tries the parquet manifest first (`<MANIFEST_DIR>/1220<hash>.parquet`);
/// on NoSuchKey it falls back to the quilt3 JSON-lines manifest at
/// `<MANIFEST_DIR>/<hash>`, converts it to a `Table`, and caches the
/// converted form. Returns a handle to the cached copy.
pub async fn cache_remote_manifest(
    &self,
    manifest: &RemoteManifest,
) -> Result<CachedManifest, String> {
    let cache_path = self.manifest_cache_path(&manifest.bucket, &manifest.hash);
    create_dir_all(&cache_path.parent().unwrap())
        .await
        .map_err(|err| err.to_string())?;
    if !fs::exists(&cache_path).await {
        let client = crate::s3_utils::get_client_for_bucket(&manifest.bucket).await?;
        let result = client
            .get_object()
            .bucket(&manifest.bucket)
            .key(format!(
                "{}/{}",
                MANIFEST_DIR,
                parquet_manifest_filename(&manifest.hash)
            ))
            .send()
            .await;
        match result {
            Ok(output) => {
                // Parquet manifest exists remotely: copy it straight to disk.
                let mut contents = Vec::new();
                output
                    .body
                    .into_async_read()
                    .read_to_end(&mut contents)
                    .await
                    .map_err(|err| err.to_string())?;
                fs::write(&cache_path, &contents).await.map_err(|err| {
                    format!("Failed to write manifest to {cache_path:?}: {err}")
                })?;
            }
            Err(SdkError::ServiceError(err)) if err.err().is_no_such_key() => {
                // No parquet manifest: fetch the quilt3 JSON-lines form.
                let result = client
                    .get_object()
                    .bucket(&manifest.bucket)
                    .key(format!("{}/{}", MANIFEST_DIR, &manifest.hash))
                    .send()
                    .await
                    .map_err(|err| {
                        err.into_service_error()
                            .meta()
                            .message()
                            .unwrap_or("failed to download s3 object")
                            .to_string()
                    })?;
                let quilt3_manifest =
                    Manifest::from_file(result.body.into_async_read()).await?;
                // Synthesize the header row carrying message/version/user_meta.
                let header = Row4 {
                    name: HEADER_ROW.into(),
                    place: HEADER_ROW.into(),
                    path: None,
                    size: 0,
                    hash: Multihash::default(),
                    info: serde_json::json!({
                        "message": quilt3_manifest.header.message,
                        "version": quilt3_manifest.header.version,
                    }),
                    meta: match quilt3_manifest.header.user_meta {
                        Some(meta) => meta.into(),
                        None => serde_json::Value::Null,
                    },
                };
                let mut records = BTreeMap::new();
                for row in quilt3_manifest.rows {
                    // quilt3 stores a hex SHA-256; re-wrap it as a multihash.
                    let ContentHash::SHA256(hash) = row.hash;
                    let hash_bytes = hex::decode(hash).map_err(|err| err.to_string())?;
                    // "user_meta" is split out into `meta`; the rest of the
                    // quilt3 per-row meta becomes `info`.
                    let mut info = row.meta.unwrap_or_default();
                    let meta = info.remove("user_meta").unwrap_or_default();
                    records.insert(
                        row.logical_key.clone(),
                        Row4 {
                            name: row.logical_key,
                            place: row.physical_key,
                            path: None,
                            size: row.size,
                            hash: Multihash::wrap(MULTIHASH_SHA256, &hash_bytes).unwrap(),
                            info: info.into(),
                            meta,
                        },
                    );
                }
                let table = Table { header, records };
                table
                    .write_to_upath(&UPath::Local(cache_path))
                    .await
                    .map_err(|err| err.to_string())?;
            }
            Err(err) => {
                return Err(err
                    .into_service_error()
                    .meta()
                    .message()
                    .unwrap_or("failed to download s3 object")
                    .to_string());
            }
        }
    }
    Ok(CachedManifest {
        domain: self.to_owned(),
        bucket: manifest.bucket.clone(),
        hash: manifest.hash.clone(),
    })
}

/// Cache (if needed) and load a remote manifest as a `Table`.
pub async fn browse_remote_manifest(&self, remote: &RemoteManifest) -> Result<Table, String> {
    self.cache_remote_manifest(remote).await?.read().await
}

/// Resolve a package URI and load its manifest as a `Table`.
pub async fn browse_uri(&self, uri: &S3PackageURI) -> Result<Table, String> {
    let remote_manifest = RemoteManifest::resolve(uri).await?;
    self.browse_remote_manifest(&remote_manifest).await
}

/// Directory holding installed-manifest snapshots for `namespace`.
pub fn installed_manifests_path(&self, namespace: &str) -> PathBuf {
    self.root_dir.join(INSTALLED_DIR).join(namespace)
}

/// Path of the installed-manifest snapshot `hash` for `namespace`.
pub fn installed_manifest_path(&self, namespace: &str, hash: &str) -> PathBuf {
    self.installed_manifests_path(namespace).join(hash)
}
/// Install a package into this domain: cache its manifest, snapshot it under
/// `.quilt/installed`, create the object store and working directory, and
/// record its lineage. Fails if the namespace is already installed.
pub async fn install_package(
    &self,
    remote: &RemoteManifest,
) -> Result<InstalledPackage, String> {
    let lineage: DomainLineage = self.read_lineage().await?;
    if lineage.packages.contains_key(&remote.namespace) {
        return Err(format!(
            "Package '{}' is already installed",
            remote.namespace
        ));
    }
    self.cache_remote_manifest(remote).await?;
    let installed_manifest_path = self.installed_manifest_path(&remote.namespace, &remote.hash);
    create_dir_all(&installed_manifest_path.parent().unwrap())
        .await
        .map_err(|err| err.to_string())?;
    // Snapshot the cached manifest for this installation.
    tokio::fs::copy(
        self.manifest_cache_path(&remote.bucket, &remote.hash),
        installed_manifest_path,
    )
    .await
    .map_err(|err| err.to_string())?;
    let objects_dir = self.root_dir.join(OBJECTS_DIR);
    create_dir_all(&objects_dir)
        .await
        .map_err(|err| err.to_string())?;
    let working_dir = self.working_folder(&remote.namespace);
    create_dir_all(&working_dir)
        .await
        .map_err(|err| err.to_string())?;
    // Record what upstream "latest" pointed at when we installed.
    let latest_hash = remote.resolve_latest().await?;
    let mut lineage = lineage;
    lineage.packages.insert(
        remote.namespace.clone(),
        PackageLineage::from_remote(remote.to_owned(), latest_hash),
    );
    self.write_lineage(&lineage).await?;
    Ok(InstalledPackage {
        domain: self.to_owned(),
        namespace: remote.namespace.clone(),
    })
}

/// Remove a package's record from the lineage and best-effort delete its
/// installed manifests and working directory (deletion failures are only
/// logged, not returned).
pub async fn uninstall_package(&self, namespace: impl AsRef<str>) -> Result<(), String> {
    let namespace = namespace.as_ref();
    let mut lineage = self.read_lineage().await?;
    lineage
        .packages
        .remove(namespace)
        .ok_or("Package not installed".to_string())?;
    self.write_lineage(&lineage).await?;
    if let Err(err) = remove_dir_all(self.installed_manifests_path(namespace)).await {
        println!("Failed to remove installed manifests: {err}");
    }
    if let Err(err) = remove_dir_all(self.working_folder(namespace)).await {
        println!("Failed to remove working directory: {err}");
    }
    Ok(())
}
/// Enumerate the installed packages, ordered by namespace.
pub async fn list_installed_packages(&self) -> Result<Vec<InstalledPackage>, String> {
    let lineage = self.read_lineage().await?;
    let mut names: Vec<String> = lineage.packages.into_keys().collect();
    names.sort();
    let mut packages = Vec::with_capacity(names.len());
    for namespace in names {
        packages.push(InstalledPackage {
            domain: self.to_owned(),
            namespace,
        });
    }
    Ok(packages)
}
/// Look up an installed package by namespace; `None` if not installed.
pub async fn get_installed_package(
    &self,
    namespace: &str,
) -> Result<Option<InstalledPackage>, String> {
    let lineage = self.read_lineage().await?;
    let package = lineage
        .packages
        .contains_key(namespace)
        .then(|| InstalledPackage {
            domain: self.to_owned(),
            namespace: namespace.to_owned(),
        });
    Ok(package)
}
}
/// A before/after pair for one entry; `None` means absent on that side
/// (added when `previous` is `None`, removed when `current` is `None`).
#[derive(Debug, PartialEq, Eq, Serialize)]
pub struct Change<T> {
    pub current: Option<T>,
    pub previous: Option<T>,
}

/// Keyed set of changes; `BTreeMap` keeps output ordered by key.
pub type ChangeSet<K, T> = BTreeMap<K, Change<T>>;
/// Flags describing how the local package relates to its upstream remote.
#[derive(Debug, PartialEq, Eq, Default, Serialize)]
pub struct UpstreamState {
    /// An unpushed local commit exists.
    commit_pending: bool,
    /// Base hash differs from the upstream "latest" hash.
    behind: bool,
    /// Base hash differs from the current (possibly committed) hash.
    ahead: bool,
}
impl UpstreamState {
    /// Derive the upstream flags from a package's lineage record.
    pub fn from_lineage(lineage: &PackageLineage) -> Self {
        Self {
            commit_pending: lineage.commit.is_some(),
            behind: lineage.base_hash != lineage.latest_hash,
            ahead: lineage.base_hash != lineage.current_hash(),
        }
    }
}
/// `UpstreamState` collapsed into a single, git-like status value.
#[derive(Debug, PartialEq, Eq, Default, Serialize)]
pub enum UpstreamDiscreteState {
    #[default]
    UpToDate,
    Behind,
    Ahead,
    Diverged,
}

impl From<&UpstreamState> for UpstreamDiscreteState {
    fn from(upstream: &UpstreamState) -> Self {
        // Exhaustive over all (ahead, behind) flag combinations.
        match (upstream.ahead, upstream.behind) {
            (false, false) => Self::UpToDate,
            (false, true) => Self::Behind,
            (true, false) => Self::Ahead,
            (true, true) => Self::Diverged,
        }
    }
}

/// Size plus content hash, used to detect changes to a working file.
#[derive(Clone, Debug, PartialEq)]
pub struct PackageFileFingerprint {
    pub size: u64,
    pub hash: Multihash<256>,
}
/// Full status report for an installed package: the upstream relationship
/// plus the set of local working-tree changes.
#[derive(Debug, PartialEq, Default)]
pub struct InstalledPackageStatus {
    pub upstream: UpstreamState,
    pub upstream_state: UpstreamDiscreteState,
    /// True when `changes` is non-empty.
    pub dirty: bool,
    pub changes: ChangeSet<String, PackageFileFingerprint>,
}
impl InstalledPackageStatus {
    /// Build a status, deriving `upstream_state` and `dirty` from the inputs.
    pub fn new(
        upstream: UpstreamState,
        changes: ChangeSet<String, PackageFileFingerprint>,
    ) -> Self {
        Self {
            // Must be computed before `upstream` is moved into the struct.
            upstream_state: UpstreamDiscreteState::from(&upstream),
            upstream,
            dirty: !changes.is_empty(),
            changes,
        }
    }
}
/// Handle to a package installed in a `LocalDomain`, identified by namespace.
#[derive(Clone, Debug, PartialEq)]
pub struct InstalledPackage {
    pub domain: LocalDomain,
    pub namespace: String,
}
impl InstalledPackage {
/// Fetch this package's lineage record from the domain lineage database.
///
/// Errs with "not found" when the namespace is not installed.
pub async fn lineage(&self) -> Result<PackageLineage, String> {
    self.domain
        .read_lineage()
        .await?
        .packages
        .get(&self.namespace)
        // `ok_or_else` avoids allocating the error String on the
        // (common) success path, unlike the previous eager `ok_or`.
        .ok_or_else(|| "not found".to_string())
        .cloned()
}
/// Replace this package's record in the domain lineage database.
pub async fn write_lineage(&self, lineage: PackageLineage) -> Result<(), String> {
    let mut domain_lineage = self.domain.read_lineage().await?;
    domain_lineage
        .packages
        .insert(self.namespace.clone(), lineage);
    self.domain.write_lineage(&domain_lineage).await
}

/// Logical paths currently installed for this package.
pub async fn paths(&self) -> Result<Vec<String>, String> {
    self.lineage().await.map(|l| l.paths.into_keys().collect())
}

/// Bind `hash` to this package's installed-manifest store.
pub fn make_installed_manifest(&self, hash: impl AsRef<str>) -> InstalledManifest {
    InstalledManifest {
        package: self.to_owned(),
        hash: String::from(hash.as_ref()),
    }
}

/// The installed manifest at the package's current hash.
pub async fn manifest(&self) -> Result<InstalledManifest, String> {
    self.lineage()
        .await
        .map(|l| self.make_installed_manifest(l.current_hash()))
}

/// Working directory containing this package's checked-out files.
pub fn working_folder(&self) -> PathBuf {
    self.domain.working_folder(&self.namespace)
}

/// Remove this package from the domain.
pub async fn uninstall(&self) -> Result<(), String> {
    self.domain.uninstall_package(&self.namespace).await
}
/// Compute the package's status: refresh `latest_hash` from upstream
/// (best-effort), then diff the working directory against the manifest
/// fingerprints of the installed paths.
pub async fn status(&self) -> Result<InstalledPackageStatus, String> {
    let mut lineage = self.lineage().await?;
    // Best-effort refresh of the upstream "latest" pointer; failures
    // (e.g. offline) are silently ignored and the cached value is used.
    if let Ok(latest_hash) = lineage.remote.resolve_latest().await {
        lineage.latest_hash = latest_hash;
        self.write_lineage(lineage.clone()).await?;
    }
    let table = self.manifest().await?.read().await?;
    let work_dir = self.working_folder();
    // Expected (hash, size) per installed path, keyed by relative path.
    let mut orig_paths = HashMap::new();
    for path in lineage.paths.keys() {
        let row = table.get_row(path).ok_or("no such path")?;
        orig_paths.insert(PathBuf::from(path), (row.hash.clone(), row.size));
    }
    // Breadth-first walk of the working directory.
    let mut queue = VecDeque::new();
    queue.push_back(work_dir.clone());
    let mut changes = ChangeSet::new();
    while let Some(dir) = queue.pop_front() {
        let mut dir_entries = match read_dir(&dir).await {
            Ok(dir_entries) => dir_entries,
            Err(err) => {
                // Unreadable directory: report it and keep scanning the rest.
                println!("Failed to read directory {:?}: {}", dir, err);
                continue;
            }
        };
        while let Some(dir_entry) = dir_entries
            .next_entry()
            .await
            .map_err(|err| err.to_string())?
        {
            let file_path = dir_entry.path();
            let file_type = dir_entry.file_type().await.map_err(|err| err.to_string())?;
            if file_type.is_dir() {
                queue.push_back(file_path);
            } else if file_type.is_file() {
                let file_metadata =
                    dir_entry.metadata().await.map_err(|err| err.to_string())?;
                // Fingerprint the file the same way manifest rows are hashed.
                let sha256_hash = sha256::try_async_digest(&file_path)
                    .await
                    .map_err(|err| err.to_string())?;
                let file_hash =
                    Multihash::wrap(MULTIHASH_SHA256, &hex::decode(sha256_hash).unwrap())
                        .unwrap();
                let relative_path = file_path.strip_prefix(&work_dir).unwrap();
                if let Some((orig_hash, orig_size)) = orig_paths.remove(relative_path) {
                    // Tracked file: record a change only if the hash moved.
                    if file_hash != orig_hash {
                        changes.insert(
                            relative_path.display().to_string(),
                            Change {
                                current: Some(PackageFileFingerprint {
                                    size: file_metadata.len(),
                                    hash: file_hash,
                                }),
                                previous: Some(PackageFileFingerprint {
                                    size: orig_size,
                                    hash: orig_hash,
                                }),
                            },
                        );
                    }
                } else {
                    // Untracked file: an addition.
                    changes.insert(
                        relative_path.display().to_string(),
                        Change {
                            current: Some(PackageFileFingerprint {
                                size: file_metadata.len(),
                                hash: file_hash,
                            }),
                            previous: None,
                        },
                    );
                }
            } else {
                // Symlinks and other special files are not handled.
                println!("Unexpected file type: {}", file_path.display());
            }
        }
    }
    // Anything left in orig_paths was never seen on disk: a deletion.
    for (orig_path, (orig_hash, orig_size)) in orig_paths {
        changes.insert(
            orig_path.display().to_string(),
            Change {
                current: None,
                previous: Some(PackageFileFingerprint {
                    size: orig_size,
                    hash: orig_hash,
                }),
            },
        );
    }
    Ok(InstalledPackageStatus::new(
        UpstreamState::from_lineage(&lineage),
        changes,
    ))
}
/// Materialize `paths` from the manifest into the working directory.
///
/// For each path: the object is downloaded (if absent) into the
/// content-addressed object store, the manifest row is repointed at the
/// local object, the file is copied into the working folder, and the
/// lineage records its timestamp and hash. Errs on unknown paths,
/// already-installed paths, or sources that are not versioned `s3://` URLs.
pub async fn install_paths(&self, paths: &Vec<String>) -> Result<(), String> {
    if paths.is_empty() {
        return Ok(());
    }
    let mut lineage = self.lineage().await?;
    // Reject any path that is already installed.
    if !HashSet::<String, RandomState>::from_iter(lineage.paths.keys().cloned())
        .is_disjoint(&HashSet::from_iter(paths.to_owned()))
    {
        return Err("duplicate paths".to_string());
    }
    let objects_dir = self.domain.root_dir.join(OBJECTS_DIR);
    let working_dir = self.working_folder();
    let mut table = self.manifest().await?.read().await?;
    for path in paths {
        let row = table.records.get_mut(path).ok_or("no such path")?;
        // The physical key must be an s3:// URL carrying a versionId.
        let parsed_url = Url::parse(&row.place).map_err(|err| err.to_string())?;
        if parsed_url.scheme() != "s3" {
            return Err("invalid scheme".into());
        }
        let bucket = parsed_url.host_str().ok_or("missing bucket")?;
        let key = &parsed_url.path()[1..];
        let query: HashMap<_, _> = parsed_url.query_pairs().into_owned().collect();
        let version_id = query.get("versionId").ok_or("missing versionId")?;
        // Objects are stored once, keyed by their content hash.
        let object_dest = objects_dir.join(hex::encode(row.hash.digest()));
        if !fs::exists(&object_dest).await {
            let mut file = File::create(&object_dest)
                .await
                .map_err(|err| err.to_string())?;
            let client = s3_utils::get_client_for_bucket(bucket.into()).await?;
            let mut object = client
                .get_object()
                .bucket(bucket)
                .key(key)
                .version_id(version_id)
                .send()
                .await
                .map_err(|err| {
                    err.into_service_error()
                        .meta()
                        .message()
                        .unwrap_or("failed to download")
                        .to_string()
                })?;
            // Stream the body to disk chunk by chunk.
            while let Some(bytes) = object
                .body
                .try_next()
                .await
                .map_err(|err| err.to_string())?
            {
                file.write_all(&bytes)
                    .await
                    .map_err(|err| err.to_string())?;
            }
            file.flush().await.map_err(|err| err.to_string())?;
        }
        // Repoint the manifest row at the local object-store copy.
        row.place = Url::from_file_path(&object_dest).unwrap().to_string();
        let working_dest = working_dir.join(&row.name);
        // `if let Some(parent)` replaces the previous
        // `if let Some(_) { … parent_dir.unwrap() }` anti-pattern.
        if let Some(parent_dir) = working_dest.parent() {
            tokio::fs::create_dir_all(parent_dir)
                .await
                .map_err(|err| err.to_string())?;
        }
        tokio::fs::copy(&object_dest, &working_dest)
            .await
            .map_err(|err| err.to_string())?;
        // Remember mtime + hash so `status` can detect later edits.
        let timestamp = fs::get_file_modified_ts(&working_dest).await?;
        lineage.paths.insert(
            row.name.to_owned(),
            PathState {
                timestamp,
                hash: row.hash.to_owned(),
            },
        );
    }
    // Persist the repointed manifest snapshot and the updated lineage.
    let installed_manifest_path = self
        .domain
        .installed_manifest_path(&self.namespace, lineage.current_hash());
    table
        .write_to_upath(&UPath::Local(installed_manifest_path))
        .await
        .map_err(|err| err.to_string())?;
    self.write_lineage(lineage).await?;
    Ok(())
}
/// Remove `paths` from the installed set: drop their lineage entries and
/// delete the working copies (a file already missing on disk is tolerated).
pub async fn uninstall_paths(&self, paths: &Vec<String>) -> Result<(), String> {
    println!("uninstall_paths: {paths:?}");
    let mut lineage = self.lineage().await?;
    let working_dir = self.working_folder();
    for path in paths {
        lineage.paths.remove(path).ok_or("path is not installed")?;
        if let Err(err) = tokio::fs::remove_file(working_dir.join(path)).await {
            if err.kind() != std::io::ErrorKind::NotFound {
                return Err(err.to_string());
            }
        }
    }
    self.write_lineage(lineage).await?;
    Ok(())
}
/// Restore `paths` in the working directory to their manifest contents.
/// Not implemented yet; currently always errors.
pub async fn revert_paths(&self, paths: &Vec<String>) -> Result<(), String> {
    println!("revert_paths: {paths:?}");
    Err("not implemented".into())
}

/// Turn the current working-tree changes into a local (unpushed) commit.
///
/// Each change reported by `status()` is applied to the manifest table
/// (removals are verified against the recorded fingerprints; additions are
/// copied into the object store), the header is stamped with
/// `message`/`user_meta`, the new manifest is written under its top hash,
/// and a `CommitState` is recorded chaining back to any earlier commits.
pub async fn commit(
    &self,
    message: String,
    user_meta: Option<manifest::JsonObject>,
) -> Result<(), String> {
    println!("commit: {message:?}, {user_meta:?}");
    let mut lineage = self.domain.read_lineage().await?;
    let package_lineage = lineage
        .packages
        .get_mut(&self.namespace)
        .ok_or("not found")?;
    let status = self.status().await?;
    let objects_dir = self.domain.root_dir.join(OBJECTS_DIR);
    create_dir_all(&objects_dir)
        .await
        .map_err(|err| err.to_string())?;
    let work_dir = self.working_folder();
    let mut table = self.manifest().await?.read().await?;
    for (logical_key, Change { current, previous }) in status.changes {
        if let Some(previous) = previous {
            // Modified or deleted: drop the old row, sanity-checking that it
            // still matches the fingerprint `status` reported.
            let removed = table
                .records
                .remove(&logical_key)
                .ok_or(format!("cannot remove {}", logical_key))?;
            if removed.size != previous.size || removed.hash != previous.hash {
                return Err(format!(
                    "unexpected size or hash for removed {}",
                    logical_key
                ));
            }
            package_lineage.paths.remove(&logical_key);
        }
        if let Some(current) = current {
            // Added or modified: point the new row at the object-store copy.
            let object_dest = objects_dir.join(hex::encode(current.hash.digest()));
            let new_physical_key = Url::from_file_path(&object_dest).unwrap().into();
            if table
                .records
                .insert(
                    logical_key.to_owned(),
                    Row4 {
                        name: logical_key.to_owned(),
                        place: new_physical_key,
                        path: None,
                        size: current.size,
                        hash: current.hash.clone(),
                        info: serde_json::Value::default(),
                        meta: serde_json::Value::default(),
                    },
                )
                .is_some()
            {
                // Any previous row was removed above, so a surviving row
                // here is unexpected.
                return Err(format!("cannot overwrite {}", logical_key));
            }
            let work_dest = work_dir.join(&logical_key);
            if !fs::exists(&object_dest).await {
                tokio::fs::copy(&work_dest, object_dest)
                    .await
                    .map_err(|err| err.to_string())?;
            }
            package_lineage.paths.insert(
                logical_key,
                PathState {
                    timestamp: fs::get_file_modified_ts(&work_dest).await?,
                    hash: current.hash,
                },
            );
        }
    }
    table.header.info = json!({
        "message": message,
        "version": "v0",
    });
    if let Some(user_meta) = user_meta {
        table.header.meta = user_meta.into();
    }
    let new_top_hash = table.top_hash();
    let new_manifest_path = self
        .domain
        .installed_manifest_path(&self.namespace, &new_top_hash);
    table
        .write_to_upath(&UPath::Local(new_manifest_path))
        .await
        .map_err(|err| err.to_string())?;
    // Chain this commit onto any earlier unpushed commits.
    let mut prev_hashes = Vec::new();
    if let Some(commit) = &package_lineage.commit {
        prev_hashes.push(commit.hash.to_owned());
        prev_hashes.extend(commit.prev_hashes.to_owned());
    }
    let commit = CommitState {
        hash: new_top_hash,
        timestamp: chrono::Utc::now(),
        prev_hashes,
    };
    package_lineage.commit = Some(commit);
    self.domain.write_lineage(&lineage).await?;
    Ok(())
}
/// Upload the pending local commit to the remote.
///
/// Rows identical to the remote manifest reuse its physical keys; changed
/// objects are uploaded. Both the parquet and quilt3 JSON-lines manifests
/// are written under the new top hash, a timestamp tag is recorded, and —
/// when upstream has not moved past our base — the remote "latest" pointer
/// is advanced. A package with no pending commit is a no-op.
pub async fn push(&self) -> Result<(), String> {
    let mut lineage = self.lineage().await?;
    let commit = match lineage.commit {
        // Nothing committed locally: nothing to push.
        None => return Ok(()),
        Some(commit) => commit,
    };
    let remote = &lineage.remote;
    let mut local_manifest = self.manifest().await?.read().await?;
    let remote_manifest = self.domain.browse_remote_manifest(remote).await?;
    let client = crate::s3_utils::get_client_for_bucket(&remote.bucket).await?;
    for row in local_manifest.records.values_mut() {
        if let Some(remote_row) = remote_manifest.records.get(&row.name) {
            if remote_row.eq(row) {
                // Unchanged row: keep the remote physical key, skip upload.
                row.place = remote_row.place.to_owned();
                continue;
            }
        }
        // Upload the local object and repoint the row at its S3 location.
        let local_url = Url::parse(&row.place).unwrap();
        let file_path: PathBuf = local_url.to_file_path().unwrap();
        let body = ByteStream::from_path(&file_path)
            .await
            .map_err(|err| err.to_string())?;
        let s3_key = format!("{}/{}", self.namespace, row.name);
        println!("uploading to s3({}): {}", remote.bucket, s3_key);
        let response = client
            .put_object()
            .bucket(&remote.bucket)
            .key(&s3_key)
            .body(body)
            .send()
            .await
            .map_err(|err| err.to_string())?;
        let mut remote_url = Url::parse("s3://").unwrap();
        remote_url
            .set_host(Some(&remote.bucket))
            .expect("failed to set bucket");
        remote_url.set_path(&s3_key);
        // Pin the exact object version when the bucket is versioned.
        if let Some(version_id) = response.version_id {
            remote_url
                .query_pairs_mut()
                .append_pair("versionId", &version_id);
        }
        println!("got remote url: {}", remote_url);
        row.place = remote_url.to_string();
    }
    let top_hash = local_manifest.top_hash();
    let new_remote = RemoteManifest {
        hash: top_hash.clone(),
        ..remote.clone()
    };
    // Cache the new manifest locally, then upload that cached copy.
    let cache_path = self
        .domain
        .manifest_cache_path(&new_remote.bucket, &new_remote.hash);
    local_manifest
        .write_to_upath(&UPath::Local(cache_path.clone()))
        .await
        .map_err(|err| err.to_string())?;
    let manifest_key = format!(
        "{MANIFEST_DIR}/{}",
        parquet_manifest_filename(&new_remote.hash)
    );
    println!("writing remote manifest to {manifest_key}");
    let body = ByteStream::from_path(&cache_path)
        .await
        .map_err(|err| err.to_string())?;
    client
        .put_object()
        .bucket(&new_remote.bucket)
        .key(&manifest_key)
        .body(body)
        .send()
        .await
        .map_err(|err| err.to_string())?;
    // Also publish a quilt3-compatible JSON-lines manifest.
    let quilt3_manifest = Manifest {
        header: ManifestHeader {
            version: "v0".into(),
            message: local_manifest
                .header
                .info
                .get("message")
                .map(|v| v.as_str())
                .flatten()
                .map(|s| s.to_string()),
            user_meta: local_manifest.header.meta.as_object().cloned(),
        },
        rows: local_manifest
            .records
            .values()
            .map(|row| {
                // Re-nest `meta` under "user_meta" inside the row meta — the
                // inverse of the split done when importing quilt3 manifests.
                let mut meta = match row.info.as_object() {
                    Some(meta) => meta.clone(),
                    None => serde_json::Map::default(),
                };
                if row.meta.is_object() {
                    meta.insert("user_meta".into(), row.meta.clone());
                }
                ManifestRow {
                    logical_key: row.name.clone(),
                    physical_key: row.place.clone(),
                    hash: ContentHash::SHA256(hex::encode(row.hash.digest())),
                    size: row.size,
                    meta: Some(meta),
                }
            })
            .collect(),
    };
    client
        .put_object()
        .bucket(&new_remote.bucket)
        .key(format!("{MANIFEST_DIR}/{}", &new_remote.hash))
        .body(quilt3_manifest.to_jsonlines().as_bytes().to_vec().into())
        .send()
        .await
        .map_err(|err| err.to_string())?;
    println!("uploaded remote manifest: {new_remote:?}");
    // Record a tag named after the commit's timestamp for history.
    client
        .put_object()
        .bucket(&new_remote.bucket)
        .key(&format!(
            "{TAGS_DIR}/{}/{}",
            new_remote.namespace,
            commit.timestamp.timestamp(),
        ))
        .body(new_remote.hash.as_bytes().to_vec().into())
        .send()
        .await
        .map_err(|err| err.to_string())?;
    lineage.latest_hash = new_remote.resolve_latest().await?;
    lineage.remote = new_remote;
    lineage.commit = None;
    // Fast-forward "latest" only if upstream has not moved past our base.
    if lineage.base_hash == lineage.latest_hash {
        lineage.remote.update_latest(top_hash.clone()).await?;
        lineage.latest_hash = top_hash.clone();
        lineage.base_hash = top_hash.clone();
    }
    self.write_lineage(lineage).await?;
    Ok(())
}
/// Fast-forward the package to the upstream "latest" revision.
///
/// Preconditions: no working-tree changes, no pending commit, and the
/// installed revision must equal the recorded base (no divergence).
/// Installed paths are uninstalled, the new manifest is cached and
/// snapshotted, and paths still present in it are reinstalled.
pub async fn pull(&self) -> Result<(), String> {
    let status = self.status().await?;
    if !status.changes.is_empty() {
        return Err("package has pending changes".into());
    }
    let lineage = self.lineage().await?;
    if lineage.commit.is_some() {
        return Err("package has pending commits".into());
    }
    if lineage.remote.hash != lineage.base_hash {
        // Fixed error text: was the garbled "package is has diverged".
        return Err("package has diverged".into());
    }
    if lineage.base_hash == lineage.latest_hash {
        return Err("package is already up to date".into());
    }
    let paths: Vec<String> = lineage.paths.keys().cloned().collect();
    self.uninstall_paths(&paths).await?;
    // Re-read: uninstall_paths rewrote the lineage on disk.
    let mut lineage = self.lineage().await?;
    lineage.remote.hash = lineage.latest_hash.clone();
    lineage.base_hash = lineage.latest_hash.clone();
    self.domain.cache_remote_manifest(&lineage.remote).await?;
    tokio::fs::copy(
        self.domain
            .manifest_cache_path(&lineage.remote.bucket, &lineage.remote.hash),
        self.domain
            .installed_manifest_path(&self.namespace, &lineage.remote.hash),
    )
    .await
    .map_err(|err| err.to_string())?;
    self.write_lineage(lineage).await?;
    // Reinstall only the paths that still exist in the new revision.
    let manifest = self.manifest().await?.read().await?;
    let paths_to_install = paths
        .into_iter()
        .filter(|x| manifest.records.contains_key(x))
        .collect();
    self.install_paths(&paths_to_install).await?;
    Ok(())
}
/// Mark the currently installed revision as the upstream "latest",
/// updating the remote tag and the local latest/base hashes.
pub async fn certify_latest(&self) -> Result<(), String> {
    let mut lineage = self.lineage().await?;
    let new_latest = lineage.remote.hash.clone();
    lineage.remote.update_latest(new_latest.clone()).await?;
    lineage.latest_hash = new_latest.clone();
    lineage.base_hash = new_latest;
    self.write_lineage(lineage).await
}

/// Discard the local revision and reinstall at the upstream "latest".
/// Unlike `pull`, this does not check for local changes or pending commits.
pub async fn reset_to_latest(&self) -> Result<(), String> {
    let lineage = self.lineage().await?;
    let new_latest = lineage.remote.resolve_latest().await?;
    if new_latest == lineage.remote.hash {
        // Already at latest: nothing to do.
        return Ok(());
    }
    let paths: Vec<String> = lineage.paths.into_keys().collect();
    self.uninstall_paths(&paths).await?;
    // Re-read: uninstall_paths rewrote the lineage on disk.
    let mut lineage = self.lineage().await?;
    lineage.latest_hash = new_latest.clone();
    lineage.remote.hash = new_latest.clone();
    lineage.base_hash = new_latest;
    self.domain.cache_remote_manifest(&lineage.remote).await?;
    tokio::fs::copy(
        self.domain
            .manifest_cache_path(&lineage.remote.bucket, &lineage.remote.hash),
        self.domain
            .installed_manifest_path(&self.namespace, &lineage.remote.hash),
    )
    .await
    .map_err(|err| err.to_string())?;
    self.write_lineage(lineage).await?;
    // Reinstall only the paths that still exist in the new revision.
    let manifest = self.manifest().await?.read().await?;
    let paths_to_install = paths
        .into_iter()
        .filter(|x| manifest.records.contains_key(x))
        .collect();
    self.install_paths(&paths_to_install).await
}
}
/// A package whose working tree conflicts with an incoming revision,
/// together with the conflicting changes and the folder they live in.
/// NOTE(review): not constructed anywhere in this file — confirm intended
/// use elsewhere in the crate.
#[derive(Debug, PartialEq)]
pub struct Conflict {
    package: InstalledPackage,
    changes: ChangeSet<String, PackageFileFingerprint>,
    folder: PathBuf,
}
#[cfg(test)]
mod tests {
use super::*;
use temp_testdir::TempDir;
use tokio_test::{assert_err, block_on};
/// Current Unix time (whole seconds) rendered as a decimal string.
fn get_timestamp() -> String {
    let now = std::time::SystemTime::now();
    let secs = now
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap()
        .as_secs();
    secs.to_string()
}
/// End-to-end smoke test: resolve → cache → install → edit → status → commit.
/// `#[ignore]`d, presumably because it talks to the live `quilt-example`
/// bucket and needs network/AWS access — run explicitly with `--ignored`.
#[test]
#[ignore]
fn flow() {
    let test_uri_string = "quilt+s3://quilt-example#package=akarve/test_dest&path=README.md";
    let test_uri = S3PackageURI::try_from(test_uri_string).expect("Failed to parse URI");
    assert_eq!(
        test_uri,
        S3PackageURI {
            bucket: "quilt-example".into(),
            namespace: "akarve/test_dest".into(),
            path: Some("README.md".into()),
            revision: RevisionPointer::default(),
        }
    );
    let temp_dir = TempDir::default();
    let local_path = PathBuf::from(temp_dir.as_ref());
    let local_domain = LocalDomain::new(local_path);
    let remote_manifest =
        block_on(RemoteManifest::resolve(&test_uri)).expect("Failed to resolve manifest");
    let cached_manifest = block_on(local_domain.cache_remote_manifest(&remote_manifest))
        .expect("Failed to cache the manifest");
    let manifest = block_on(cached_manifest.read()).expect("Failed to parse the manifest");
    println!("manifest: {manifest:?}");
    let paths = vec![test_uri.path.unwrap()];
    let installed_package = block_on(local_domain.install_package(&remote_manifest))
        .expect("Failed to install package");
    block_on(installed_package.install_paths(&paths)).expect("Failed to install paths");
    // Installing the same paths twice must be rejected.
    assert_err!(block_on(installed_package.install_paths(&paths)));
    let status = block_on(installed_package.status()).expect("Failed to get status");
    assert_eq!(status, InstalledPackageStatus::default());
    let readme_path = installed_package.working_folder().join("README.md");
    println!("readme_path: {readme_path:?}");
    let old_readme =
        block_on(fs::read_to_string(&readme_path)).expect("Failed to read README.md");
    let timestamp = get_timestamp();
    println!("timestamp: {timestamp:?}");
    // Overwrite the working copy so the package becomes dirty.
    block_on(fs::write(readme_path, timestamp.as_bytes()))
        .expect("Failed to overwrite README.md");
    let status = block_on(installed_package.status()).expect("Failed to get status");
    let expected_status = InstalledPackageStatus::new(
        UpstreamState::default(),
        ChangeSet::from([(
            "README.md".into(),
            Change {
                current: Some(PackageFileFingerprint {
                    size: timestamp.len() as u64,
                    // Fixed: was the mojibake `sha256::digest(×tamp)`
                    // (HTML-entity-corrupted `&timestamp`), which does not
                    // compile.
                    hash: Multihash::wrap(
                        MULTIHASH_SHA256,
                        &hex::decode(sha256::digest(&timestamp)).unwrap(),
                    )
                    .unwrap(),
                }),
                previous: Some(PackageFileFingerprint {
                    size: old_readme.len() as u64,
                    hash: Multihash::wrap(
                        MULTIHASH_SHA256,
                        &hex::decode(sha256::digest(&old_readme)).unwrap(),
                    )
                    .unwrap(),
                }),
            },
        )]),
    );
    assert_eq!(status, expected_status);
    let commit_message = format!("Commit made at {}", timestamp);
    let user_meta = serde_json::json!({
        "test": "value",
        "timestamp": timestamp,
    })
    .as_object()
    .unwrap()
    .to_owned();
    block_on(installed_package.commit(commit_message, Some(user_meta)))
        .expect("Failed to commit");
}
}