use std::collections::BTreeMap;
use std::collections::HashSet;
use std::path::Path;
use std::path::PathBuf;
use tokio_stream::StreamExt;
use tracing::debug;
use tracing::info;
use url::Url;
use crate::Error;
use crate::Res;
use crate::error::ManifestError;
use crate::error::PackageOpError;
use crate::io::manifest::RowsStream;
use crate::io::manifest::StreamRowsChunk;
use crate::io::manifest::build_manifest_from_rows_stream;
use crate::io::storage::Storage;
use crate::lineage::Change;
use crate::lineage::CommitState;
use crate::lineage::InstalledPackageStatus;
use crate::lineage::PackageLineage;
use crate::lineage::PathState;
use crate::manifest::Manifest;
use crate::manifest::ManifestHeader;
use crate::manifest::ManifestRow;
use crate::manifest::Workflow;
use crate::paths::DomainPaths;
use quilt_uri::Namespace;
async fn stream_local_with_changes(
local_manifest: &Manifest,
removed: HashSet<PathBuf>,
modified: BTreeMap<PathBuf, ManifestRow>,
new_files: StreamRowsChunk,
) -> impl RowsStream {
let mut all_rows: Vec<Res<ManifestRow>> = Vec::new();
all_rows.extend(new_files);
let mut stream = local_manifest.records_stream().await;
while let Some(chunk_result) = stream.next().await {
if let Ok(chunk) = chunk_result {
for row_res in chunk {
match row_res {
Ok(row) => {
if removed.contains(&row.logical_key) {
continue;
}
if let Some(modified_row) = modified.get(&row.logical_key) {
all_rows.push(Ok(modified_row.clone()));
} else {
all_rows.push(Ok(row));
}
}
Err(err) => {
all_rows.push(Err(Error::Manifest(ManifestError::Table(err.to_string()))))
}
}
}
}
}
all_rows.sort_by(|a, b| match (a, b) {
(Ok(row_a), Ok(row_b)) => row_a.logical_key.cmp(&row_b.logical_key),
(Ok(_), Err(_)) => std::cmp::Ordering::Less,
(Err(_), Ok(_)) => std::cmp::Ordering::Greater,
(Err(_), Err(_)) => std::cmp::Ordering::Equal,
});
tokio_stream::iter(vec![Ok(all_rows)])
}
/// Copies the working-directory file for `logical_key` into the objects
/// directory and returns `current` re-pointed at that copy.
///
/// The source file is `working_dir.join(logical_key)`. The destination is
/// `paths.objects_dir()` joined with the hex-encoded digest of `current.hash`.
/// If the destination already exists in `storage`, the copy is skipped.
///
/// The returned row keeps every field of `current` except two:
/// * `logical_key` is set to the given `logical_key`;
/// * `physical_key` is set to the `file://` URL of the object.
///
/// In both the copy and skip cases, `lineage.paths[logical_key]` is set to the
/// working file's modification timestamp and the row's hash.
///
/// # Errors
///
/// Returns `PackageOpError::Commit` if the object path cannot be converted to a
/// `file://` URL. Also propagates errors from copying the file or reading its
/// timestamp.
async fn create_immutable_object_copy(
    storage: &impl Storage,
    paths: &DomainPaths,
    working_dir: &Path,
    lineage: &mut PackageLineage,
    logical_key: &Path,
    current: ManifestRow,
) -> Res<ManifestRow> {
    debug!(
        "⏳ Creating immutable object copy for: {}",
        logical_key.display()
    );
    let objects_dir = paths.objects_dir();
    let object_dest = objects_dir.join(hex::encode(current.hash.digest()));
    let new_physical_key = Url::from_file_path(&object_dest)
        .map_err(|_| {
            Error::PackageOp(PackageOpError::Commit(format!(
                "Failed to create URL from {:?}",
                &object_dest
            )))
        })?
        .to_string();
    // The hash is needed for the lineage entry after `current` is consumed below.
    let current_hash = current.hash.clone();
    let row = ManifestRow {
        logical_key: logical_key.to_path_buf(),
        physical_key: new_physical_key,
        ..current
    };
    let work_dest = working_dir.join(logical_key);
    if storage.exists(&object_dest).await {
        debug!(
            "✔️ Object already exists in storage: {}",
            object_dest.display()
        );
    } else {
        debug!(
            "⏳ Copying file to objects directory: {}",
            object_dest.display()
        );
        storage.copy(&work_dest, object_dest).await?;
        debug!("✔️ File copied successfully");
    }
    lineage.paths.insert(
        logical_key.to_path_buf(),
        PathState {
            timestamp: storage.modified_timestamp(&work_dest).await?,
            hash: current_hash.into(),
        },
    );
    Ok(row)
}
/// Commits the working-directory changes in `status` as a new revision of the
/// installed package `namespace`.
///
/// Each entry of `status.changes` is handled as follows:
/// * `Change::Removed`: the path is removed from `lineage.paths` and excluded
///   from the new manifest.
/// * `Change::Added`: the file is copied into the objects directory and added
///   to the new manifest. It is an error if `manifest` already has that
///   logical key.
/// * `Change::Modified`: the file is copied into the objects directory and its
///   manifest row is replaced.
///
/// The new manifest carries `message`, `workflow` and `user_meta`. If
/// `user_meta` is a JSON object, its top-level keys are sorted. The manifest is
/// written to `paths.installed_manifests_dir(&namespace)`.
///
/// The new commit's `prev_hashes` starts with the previous commit's hash (if
/// any), followed by that commit's own `prev_hashes`. The returned lineage
/// records the new commit.
///
/// # Errors
///
/// Returns `PackageOpError::Commit` when an added file is already present in
/// `manifest`. Errors from copying objects or building the manifest are
/// propagated.
// The positional arguments form this function's public signature.
#[allow(clippy::too_many_arguments)]
pub async fn commit_package(
    mut lineage: PackageLineage,
    manifest: &mut Manifest,
    paths: &DomainPaths,
    storage: &(impl Storage + Sync),
    working_dir: PathBuf,
    status: InstalledPackageStatus,
    namespace: Namespace,
    message: String,
    user_meta: Option<serde_json::Value>,
    workflow: Option<Workflow>,
) -> Res<(PackageLineage, CommitState)> {
    info!(
        r#"⏳ Starting commit with message "{}" and user_meta `{:?}`"#,
        message, user_meta
    );
    let mut modified_keys = BTreeMap::new();
    let mut removed_keys = HashSet::new();
    let mut new_files = Vec::new();
    for (logical_key, state) in status.changes {
        debug!(
            "Processing change type {:?} for: {}",
            state,
            logical_key.display()
        );
        match state {
            Change::Removed(row) => {
                lineage.paths.remove(&row.logical_key);
                removed_keys.insert(row.logical_key);
            }
            Change::Added(current) => {
                if manifest.contains_record(&current.logical_key) {
                    return Err(Error::PackageOp(PackageOpError::Commit(format!(
                        "Trying to add a file that is already in the manifest: \"{}\"",
                        current.logical_key.display()
                    ))));
                }
                let added = create_immutable_object_copy(
                    storage,
                    paths,
                    &working_dir,
                    &mut lineage,
                    &logical_key,
                    current,
                )
                .await?;
                new_files.push(Ok(added));
            }
            Change::Modified(current) => {
                let modified = create_immutable_object_copy(
                    storage,
                    paths,
                    &working_dir,
                    &mut lineage,
                    &logical_key,
                    current,
                )
                .await?;
                modified_keys.insert(logical_key, modified);
            }
        }
    }
    // Sort the top-level keys of object metadata so the built manifest does not
    // depend on the caller's key order.
    let processed_user_meta = match user_meta {
        Some(serde_json::Value::Object(mut m)) => {
            m.sort_keys();
            Some(m.into())
        }
        other => other,
    };
    let header = ManifestHeader {
        message: Some(message),
        workflow,
        user_meta: processed_user_meta,
        ..ManifestHeader::default()
    };
    debug!(
        "⏳ Building new manifest with {} removed, {} modified, {} new files",
        removed_keys.len(),
        modified_keys.len(),
        new_files.len()
    );
    let stream = stream_local_with_changes(manifest, removed_keys, modified_keys, new_files).await;
    let dest_dir = paths.installed_manifests_dir(&namespace);
    let (manifest_path, new_top_hash) =
        build_manifest_from_rows_stream(storage, dest_dir, header, stream).await?;
    info!(
        "✔️ New manifest with hash {} was built in {}",
        new_top_hash,
        manifest_path.display()
    );
    // The previous commit becomes the head of the history chain.
    let mut prev_hashes = Vec::new();
    if let Some(previous) = lineage.commit.take() {
        prev_hashes.push(previous.hash);
        prev_hashes.extend(previous.prev_hashes);
    }
    let commit = CommitState {
        hash: new_top_hash,
        timestamp: chrono::Utc::now(),
        prev_hashes,
    };
    lineage.commit = Some(commit.clone());
    info!(
        "✔️ Successfully committed changes with hash: {}",
        commit.hash
    );
    Ok((lineage, commit))
}
#[cfg(test)]
mod tests {
    use super::*;
    use test_log::test;
    use std::collections::BTreeMap;
    use aws_sdk_s3::primitives::ByteStream;
    use crate::fixtures;
    use crate::io::storage::mocks::MockStorage;
    use crate::lineage::Change;

    /// Logical key of a file present in the `manifest_with_objects_all_sizes` fixture.
    const README_KEY: &str = "one/two two/three three three/READ ME.md";

    /// `README_KEY` as an owned path.
    fn readme_key() -> PathBuf {
        PathBuf::from(README_KEY)
    }

    /// User metadata whose keys are given out of sorted order.
    fn unsorted_user_meta() -> Option<serde_json::Value> {
        Some(serde_json::json!({"A": "b", "z": "Y", "a": "B", "Z": "y"}))
    }

    /// Committing no changes on an empty manifest produces the known empty top hash.
    #[test(tokio::test)]
    async fn test_commit_empty() -> Res {
        let storage = MockStorage::default();
        let lineage = PackageLineage::default();
        assert!(lineage.commit.is_none());
        let (_lineage, commit) = commit_package(
            lineage,
            &mut Manifest::default(),
            &DomainPaths::default(),
            &storage,
            PathBuf::default(),
            InstalledPackageStatus::default(),
            ("foo", "bar").into(),
            String::default(),
            None,
            None,
        )
        .await?;
        let hash = fixtures::top_hash::EMPTY_NONE_TOP_HASH;
        assert!(
            storage
                .exists(&PathBuf::from(format!(".quilt/installed/foo/bar/{hash}")))
                .await
        );
        assert_eq!(commit.hash, hash);
        Ok(())
    }

    /// The commit message and user metadata are part of the hashed manifest.
    #[test(tokio::test)]
    async fn test_commit_meta() -> Res {
        let storage = MockStorage::default();
        let commit_message = "Lorem ipsum".to_string();
        let mut user_meta = serde_json::Map::new();
        user_meta.insert(
            "lorem".to_string(),
            serde_json::Value::String("ipsum".to_string()),
        );
        let lineage = PackageLineage::default();
        assert!(lineage.commit.is_none());
        let (_lineage, commit) = commit_package(
            lineage,
            &mut Manifest::default(),
            &DomainPaths::default(),
            &storage,
            PathBuf::default(),
            InstalledPackageStatus::default(),
            ("foo", "bar").into(),
            commit_message,
            Some(serde_json::Value::Object(user_meta)),
            None,
        )
        .await?;
        let hash = "56c329d2390c9c6efedb698f47b75f096112c89a7751d55a426507ec6c432897";
        assert!(
            storage
                .exists(&PathBuf::from(format!(".quilt/installed/foo/bar/{hash}")))
                .await
        );
        assert_eq!(commit.hash, hash);
        Ok(())
    }

    /// A removed file disappears from both the lineage and the new manifest.
    #[test(tokio::test)]
    async fn test_removing_and_commit() -> Res {
        let storage = MockStorage::default();
        let status = InstalledPackageStatus {
            changes: BTreeMap::from([(
                readme_key(),
                Change::Removed(ManifestRow {
                    logical_key: readme_key(),
                    ..ManifestRow::default()
                }),
            )]),
            ..InstalledPackageStatus::default()
        };
        let lineage = PackageLineage {
            paths: BTreeMap::from([(readme_key(), PathState::default())]),
            ..PackageLineage::default()
        };
        let mut manifest = fixtures::manifest_with_objects_all_sizes::manifest().await?;
        assert!(
            lineage.commit.is_none(),
            "Initial lineage has commit already"
        );
        assert!(
            lineage.paths.contains_key(&readme_key()),
            "Initial lineage doesn't have testing path"
        );
        let (lineage, commit) = commit_package(
            lineage,
            &mut manifest,
            &DomainPaths::default(),
            &storage,
            PathBuf::default(),
            status,
            ("foo", "bar").into(),
            String::from("Initial"),
            unsorted_user_meta(),
            None,
        )
        .await?;
        let hash = "22590f2254e00b12f0c141117969172e925d6b8e9af26a04fa35658f1ad4e04c";
        assert!(
            !lineage.paths.contains_key(&readme_key()),
            "Committed lineage still has a path that the commit should have removed"
        );
        assert!(
            storage
                .exists(&PathBuf::from(format!(".quilt/installed/foo/bar/{hash}")))
                .await,
            "Registry doesn't have installed package with a new hash"
        );
        assert_eq!(commit.hash, hash);
        Ok(())
    }

    /// An added file is copied into the objects directory and tracked by the lineage.
    #[test(tokio::test)]
    async fn test_adding_and_commit() -> Res {
        let mut manifest = fixtures::manifest_with_objects_all_sizes::manifest().await?;
        let base_record = manifest.get_record(&PathBuf::from("0mb.bin")).unwrap();
        let added_file = ManifestRow {
            logical_key: PathBuf::from("foo"),
            hash: base_record.hash.clone(),
            size: base_record.size,
            physical_key: base_record.physical_key.clone(),
            ..ManifestRow::default()
        };
        let storage = MockStorage::default();
        storage
            .write_byte_stream(PathBuf::from("/working-dir/foo"), ByteStream::default())
            .await?;
        let status = InstalledPackageStatus {
            changes: BTreeMap::from([(PathBuf::from("foo"), Change::Added(added_file.clone()))]),
            ..InstalledPackageStatus::default()
        };
        let lineage = PackageLineage::default();
        assert!(
            lineage.commit.is_none(),
            "Initial lineage has commit already"
        );
        assert!(
            !lineage.paths.contains_key(&PathBuf::from("foo")),
            "Initial lineage has path, but shouldn't because we test _new_ file"
        );
        let (lineage, commit) = commit_package(
            lineage,
            &mut manifest,
            &DomainPaths::new(PathBuf::from("/")),
            &storage,
            PathBuf::from("/working-dir"),
            status,
            ("foo", "bar").into(),
            String::from("Initial"),
            unsorted_user_meta(),
            None,
        )
        .await?;
        let hash = fixtures::objects::ZERO_HASH_HEX;
        assert!(
            lineage.paths.contains_key(&PathBuf::from("foo")),
            "Committed lineage doesn't track the newly added file"
        );
        assert!(
            storage
                .exists(&PathBuf::from(format!("/.quilt/objects/{hash}")))
                .await,
            "Objects directory doesn't have the added file's object"
        );
        assert_eq!(
            commit.hash,
            "e8fc7ccb96e87acd4ca02123e0c658ad92cdb2cc2822103d4f5bac79254cca08"
        );
        Ok(())
    }

    /// Adding a logical key that the manifest already contains is rejected.
    #[test(tokio::test)]
    async fn test_adding_manifest_already_has_it() -> Res {
        let mut manifest = fixtures::manifest_with_objects_all_sizes::manifest().await?;
        let base_record = manifest.get_record(&readme_key()).unwrap();
        let added_file = ManifestRow {
            logical_key: readme_key(),
            hash: base_record.hash.clone(),
            size: base_record.size,
            physical_key: base_record.physical_key.clone(),
            ..ManifestRow::default()
        };
        let hash = added_file.hash.clone();
        let storage = MockStorage::default();
        storage
            .write_byte_stream(
                readme_key(),
                ByteStream::from_static(b"This is the README."),
            )
            .await?;
        storage
            .write_byte_stream(
                PathBuf::from(format!(".quilt/objects/{}", hex::encode(hash.digest()))),
                ByteStream::from_static(b"This is the README."),
            )
            .await?;
        let status = InstalledPackageStatus {
            changes: BTreeMap::from([(readme_key(), Change::Added(added_file.clone()))]),
            ..InstalledPackageStatus::default()
        };
        let lineage = PackageLineage {
            paths: BTreeMap::from([(readme_key(), PathState::default())]),
            ..PackageLineage::default()
        };
        let result = commit_package(
            lineage,
            &mut manifest,
            &DomainPaths::new(PathBuf::from("/")),
            &storage,
            PathBuf::default(),
            status,
            ("foo", "bar").into(),
            String::from("Initial"),
            unsorted_user_meta(),
            None,
        )
        .await;
        assert_eq!(
            result.unwrap_err().to_string(),
            "Commit error: Trying to add a file that is already in the manifest: \"one/two two/three three three/READ ME.md\""
        );
        Ok(())
    }

    /// A modified file gets a new object copy and stays tracked by the lineage.
    #[test(tokio::test)]
    async fn test_modifying_and_commit() -> Res {
        let storage = MockStorage::default();
        storage
            .write_byte_stream(
                PathBuf::from("/working-dir").join(README_KEY),
                ByteStream::from_static(fixtures::objects::less_than_8mb()),
            )
            .await?;
        let mut manifest = fixtures::manifest_with_objects_all_sizes::manifest().await?;
        let base_record = manifest
            .get_record(&PathBuf::from("less-then-8mb.txt"))
            .unwrap();
        let modified_file = ManifestRow {
            logical_key: readme_key(),
            hash: base_record.hash.clone(),
            size: base_record.size,
            physical_key: base_record.physical_key.clone(),
            ..ManifestRow::default()
        };
        let status = InstalledPackageStatus {
            changes: BTreeMap::from([(readme_key(), Change::Modified(modified_file))]),
            ..InstalledPackageStatus::default()
        };
        let lineage = PackageLineage {
            paths: BTreeMap::from([(readme_key(), PathState::default())]),
            ..PackageLineage::default()
        };
        assert!(
            lineage.commit.is_none(),
            "Initial lineage has commit already"
        );
        assert!(
            lineage.paths.contains_key(&readme_key()),
            "Initial lineage doesn't have path, but should because we test installed and modified file"
        );
        let (lineage, commit) = commit_package(
            lineage,
            &mut manifest,
            &DomainPaths::new(PathBuf::from("/")),
            &storage,
            PathBuf::from("/working-dir"),
            status,
            ("foo", "bar").into(),
            String::from("Initial"),
            unsorted_user_meta(),
            None,
        )
        .await?;
        assert!(
            lineage.paths.contains_key(&readme_key()),
            "Committed lineage no longer tracks the modified file"
        );
        assert!(
            storage
                .exists(&PathBuf::from(format!(
                    "/.quilt/objects/{}",
                    fixtures::objects::LESS_THAN_8MB_HASH_HEX
                )))
                .await,
            "Objects directory doesn't have the modified file's object"
        );
        assert_eq!(
            commit.hash,
            "39bbc9a95f787cd938fb5830abe5e25408f0aac4000528b8717130be5f7bc2b3"
        );
        Ok(())
    }
}