use std::collections::HashMap;
use std::collections::VecDeque;
use std::path::PathBuf;
use tracing::log;
use crate::checksum::calculate_sha256_checksum;
use crate::checksum::calculate_sha256_chunked_checksum;
use crate::checksum::MULTIHASH_SHA256_CHUNKED;
use crate::io::manifest::resolve_latest;
use crate::io::remote::Remote;
use crate::io::storage::Storage;
use crate::lineage::Change;
use crate::lineage::ChangeSet;
use crate::lineage::InstalledPackageStatus;
use crate::lineage::PackageFileFingerprint;
use crate::lineage::PackageLineage;
use crate::manifest::Table;
use crate::Error;
use crate::Res;
/// Refresh the lineage's cached `latest_hash` from the remote.
///
/// Resolves the latest manifest for `lineage.remote` and stores its hash on
/// the lineage. When the remote hash already matches the cached value, the
/// lineage is handed back untouched.
///
/// # Errors
///
/// Propagates any error from [`resolve_latest`] (remote lookup failure).
pub async fn refresh_latest_hash(
    mut lineage: PackageLineage,
    remote: &impl Remote,
) -> Res<PackageLineage> {
    let latest = resolve_latest(remote, lineage.remote.clone().into()).await?;
    // Only write when the remote hash actually differs from the cache.
    if lineage.latest_hash != latest.hash {
        lineage.latest_hash = latest.hash;
    }
    Ok(lineage)
}
/// Compare the files on disk under `working_dir` against the installed
/// manifest and build an [`InstalledPackageStatus`] listing every added,
/// modified, and removed file.
///
/// * `lineage` — lineage of the installed package; its `paths` keys are the
///   set of files the package is expected to contain.
/// * `storage` — storage backend used to enumerate directories and open files.
/// * `manifest` — installed manifest; must hold a record for every path in
///   `lineage.paths`.
/// * `working_dir` — directory holding the package's files on disk.
///
/// Returns the (unmodified) lineage together with the computed status.
///
/// # Errors
///
/// Returns [`Error::ManifestPath`] when a lineage path has no record in
/// `manifest`; propagates I/O errors from reading directory entries or files.
/// Directories that fail to list are logged and skipped rather than aborting
/// the whole scan.
pub async fn create_status(
    lineage: PackageLineage,
    storage: &(impl Storage + Sync),
    manifest: &Table,
    working_dir: PathBuf,
) -> Res<(PackageLineage, InstalledPackageStatus)> {
    // Collect the manifest record for every path the lineage says is
    // installed. Entries are removed from this map as matching files are
    // found on disk; whatever remains afterwards was deleted by the user.
    let mut orig_paths = HashMap::new();
    for path in lineage.paths.keys() {
        let row = manifest
            .get_record(path)
            .await?
            .ok_or(Error::ManifestPath(format!(
                "path {:?} not found in installed manifest",
                path
            )))?;
        orig_paths.insert(path.clone(), row);
    }
    // Breadth-first walk of the working directory.
    let mut queue = VecDeque::new();
    queue.push_back(working_dir.clone());
    let mut changes = ChangeSet::new();
    while let Some(dir) = queue.pop_front() {
        let mut dir_entries = match storage.read_dir(&dir).await {
            Ok(dir_entries) => dir_entries,
            Err(err) => {
                // Best-effort: an unreadable directory is logged and
                // skipped so the rest of the tree can still be scanned.
                log::error!("Failed to read directory {:?}: {}", dir, err);
                continue;
            }
        };
        while let Some(dir_entry) = dir_entries.next_entry().await? {
            let file_path = dir_entry.path();
            let file_type = dir_entry.file_type().await?;
            if file_type.is_dir() {
                queue.push_back(file_path);
            } else if file_type.is_file() {
                let file = storage.open_file(&file_path).await?;
                let file_metadata = file.metadata().await?;
                // Every scanned path descends from `working_dir`, so
                // stripping the prefix cannot fail.
                let relative_path = file_path
                    .strip_prefix(&working_dir)
                    .expect("scanned path must be under working_dir");
                // `HashMap<PathBuf, _>` lookups accept `&Path` via
                // `Borrow`, so no temporary `PathBuf` allocation is needed.
                if let Some(orig_row) = orig_paths.remove(relative_path) {
                    // Hash with the same algorithm the manifest row used so
                    // the comparison is meaningful.
                    let file_hash = match orig_row.hash.code() {
                        MULTIHASH_SHA256_CHUNKED => {
                            calculate_sha256_chunked_checksum(file, file_metadata.len()).await?
                        }
                        _ => calculate_sha256_checksum(file).await?,
                    };
                    if file_hash != orig_row.hash {
                        changes.insert(
                            relative_path.to_path_buf(),
                            Change::Modified(PackageFileFingerprint {
                                size: file_metadata.len(),
                                hash: file_hash,
                            }),
                        );
                    }
                } else {
                    // On disk but not in the manifest: a new file.
                    let sha256_hash =
                        calculate_sha256_chunked_checksum(file, file_metadata.len()).await?;
                    changes.insert(
                        relative_path.to_path_buf(),
                        Change::Added(PackageFileFingerprint {
                            size: file_metadata.len(),
                            hash: sha256_hash,
                        }),
                    );
                }
            } else {
                // Symlinks and other special entries are not tracked.
                log::warn!("Unexpected file type: {:?}", file_path);
            }
        }
    }
    // Anything still in `orig_paths` was in the manifest but missing on
    // disk. `orig_path` is already owned, so move it in without cloning.
    for (orig_path, orig_row) in orig_paths {
        changes.insert(orig_path, Change::Removed(orig_row));
    }
    let status = InstalledPackageStatus::new(lineage.clone().into(), changes);
    Ok((lineage, status))
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::checksum::ContentHash;
    use crate::lineage::CommitState;
    use crate::lineage::PackageFileFingerprint;
    use crate::lineage::UpstreamState;
    use crate::mocks;

    // Empty lineage + empty manifest + empty storage must yield the
    // default (no-changes) status.
    #[tokio::test]
    async fn test_default_status() -> Res {
        let storage = mocks::storage::MockStorage::default();
        let (_lineage, status) = create_status(
            PackageLineage::default(),
            &storage,
            &Table::default(),
            PathBuf::default(),
        )
        .await?;
        assert_eq!(status, InstalledPackageStatus::default());
        Ok(())
    }

    // commit == base but latest differs: the local package is behind the
    // remote.
    #[tokio::test]
    async fn test_behind() -> Res {
        let base_hash = "AAA";
        let latest_hash = "BBB";
        let commit_hash = "AAA";
        let lineage = mocks::lineage::with_commit_hashes(base_hash, latest_hash, commit_hash);
        let (_lineage, status) = create_status(
            lineage,
            &mocks::storage::MockStorage::default(),
            &Table::default(),
            PathBuf::default(),
        )
        .await?;
        assert_eq!(status.upstream_state, UpstreamState::Behind);
        Ok(())
    }

    // base == latest but the commit moved past it: the local package is
    // ahead of the remote.
    #[tokio::test]
    async fn test_ahead() -> Res {
        let base_hash = "AAA";
        let latest_hash = "AAA";
        let commit_hash = "BBB";
        let lineage = mocks::lineage::with_commit_hashes(base_hash, latest_hash, commit_hash);
        let (_, status) = create_status(
            lineage,
            &mocks::storage::MockStorage::default(),
            &Table::default(),
            PathBuf::default(),
        )
        .await?;
        assert_eq!(status.upstream_state, UpstreamState::Ahead);
        Ok(())
    }

    // commit, base, and latest all differ: local and remote histories have
    // diverged.
    #[tokio::test]
    async fn test_diverged() -> Res {
        let lineage = PackageLineage {
            commit: Some(CommitState {
                hash: "aaa".to_string(),
                ..CommitState::default()
            }),
            base_hash: "bbb".to_string(),
            latest_hash: "ccc".to_string(),
            ..PackageLineage::default()
        };
        let (_, status) = create_status(
            lineage,
            &mocks::storage::MockStorage::default(),
            &Table::default(),
            PathBuf::default(),
        )
        .await?;
        assert_eq!(status.upstream_state, UpstreamState::Diverged);
        Ok(())
    }

    // A path tracked by lineage + manifest but absent from (empty) storage
    // must surface as Change::Removed.
    #[tokio::test]
    async fn test_removed_files() -> Res {
        let lineage = mocks::lineage::with_paths(vec![PathBuf::from("a/a")]);
        let manifest = mocks::manifest::with_record_keys(vec![PathBuf::from("a/a")]);
        let (_, status) = create_status(
            lineage,
            &mocks::storage::MockStorage::default(),
            &manifest,
            PathBuf::default(),
        )
        .await?;
        let removed_file = status.changes.get(&PathBuf::from("a/a")).unwrap();
        assert!(matches!(removed_file, Change::Removed(_)));
        Ok(())
    }

    // A file on disk that no manifest record covers must surface as
    // Change::Added, with the chunked-SHA256 fingerprint of the mock
    // parquet fixture (size and hash values pinned to that fixture).
    #[tokio::test]
    async fn test_added_files() -> Res {
        let lineage = PackageLineage::default();
        let manifest = Table::default();
        let storage = mocks::storage::MockStorage::default();
        let working_dir = storage.temp_dir.as_ref().join(PathBuf::from("foo/bar"));
        let file_path = PathBuf::from("inside/package/file.pq");
        storage
            .write_file(
                working_dir.join(&file_path),
                &std::fs::read(mocks::manifest::parquet())?,
            )
            .await?;
        let (_, status) =
            create_status(lineage, &storage, &manifest, working_dir.to_path_buf()).await?;
        let added_file = status.changes.get(&file_path).unwrap();
        if let Change::Added(fingerprint) = added_file {
            assert_eq!(
                *fingerprint,
                PackageFileFingerprint {
                    size: 5324,
                    hash: ContentHash::SHA256Chunked(
                        "EfrtXWeClWPJ/IVKjQeAmMKhJV45/GcpjDm1IhvhJAY=".to_string()
                    )
                    .try_into()?,
                }
            );
            Ok(())
        } else {
            // Any non-Added change for this path is a test failure.
            panic!()
        }
    }
}