use std::collections::HashMap;
use std::fs;
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::LazyLock;
use std::sync::Mutex;
use std::time::UNIX_EPOCH;
use anyhow::Context;
use anyhow::Result;
use anyhow::bail;
use arrayvec::ArrayString;
use blake3::Hash;
use blake3::Hasher;
use cloud_copy::ContentDigest;
use cloud_copy::UrlExt;
use futures::FutureExt;
use tokio::sync::OnceCell;
use tokio::task::spawn_blocking;
use tracing::debug;
use url::Url;
use crate::ContentKind;
use crate::cache::Hashable;
use crate::config::ContentDigestMode;
use crate::http::Transferer;
/// A BLAKE3 content digest, tagged with whether it was computed for a file
/// or for a directory tree.
///
/// The tag matters: the same 32-byte hash is rendered under a different URL
/// path segment (`file/...` vs `directory/...`) by [`UrlDigestExt::join_digest`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Digest {
    /// Digest of a single file (its contents in strong mode, or its
    /// size/mtime metadata in weak mode — see `calculate_file_digest`).
    File(Hash),
    /// Digest of a directory's entries, computed recursively by
    /// `calculate_directory_digest`.
    Directory(Hash),
}
impl Digest {
    /// Returns the lowercase hexadecimal encoding of the underlying hash.
    ///
    /// The file/directory distinction does not affect the encoding: both
    /// variants render their 32-byte BLAKE3 hash as 64 hex characters.
    pub fn to_hex(self) -> ArrayString<64> {
        // Both variants carry the same payload, so bind it with an
        // irrefutable or-pattern rather than matching each arm separately.
        let (Self::File(hash) | Self::Directory(hash)) = self;
        hash.to_hex()
    }
}
// Per-process memoization maps. Each key owns an `Arc<OnceCell<Digest>>` so
// that concurrent callers for the same key share a single in-flight
// computation (see `calculate_local_digest` / `calculate_remote_digest`).
// Local digests are additionally keyed by mode, since strong and weak digests
// of the same path differ.
type LocalDigestMap = HashMap<(ContentDigestMode, PathBuf), Arc<OnceCell<Digest>>>;
type RemoteDigestMap = HashMap<Url, Arc<OnceCell<Digest>>>;
// NOTE: entries are never evicted for the lifetime of the process; tests
// reset state via `test::clear_digest_cache`.
static LOCAL_DIGESTS: LazyLock<Mutex<LocalDigestMap>> = LazyLock::new(Mutex::default);
static REMOTE_DIGESTS: LazyLock<Mutex<RemoteDigestMap>> = LazyLock::new(Mutex::default);
/// Extension trait for appending a [`Digest`] to a URL's path.
pub trait UrlDigestExt: Sized {
    /// Returns a new URL with `file/<hex>` or `directory/<hex>` appended to
    /// the path, depending on the digest variant.
    fn join_digest(&self, digest: Digest) -> Self;
}
impl UrlDigestExt for Url {
    /// Appends `file/<hex>` or `directory/<hex>` to the URL path.
    ///
    /// # Panics
    ///
    /// Panics if the URL cannot be a base (e.g. `data:` URLs), since such
    /// URLs have no editable path segments.
    fn join_digest(&self, digest: Digest) -> Self {
        assert!(
            !self.cannot_be_a_base(),
            "invalid URL: URL is required to be a base"
        );
        // Resolve the variant into a (kind segment, hash) pair up front so
        // the path editing below is one straight-line sequence.
        let (kind, hash) = match digest {
            Digest::File(hash) => ("file", hash),
            Digest::Directory(hash) => ("directory", hash),
        };
        let mut url = self.clone();
        {
            // Scope the mutable segment editor so `url` can be returned.
            let mut segments = url.path_segments_mut().unwrap();
            // Drop a trailing empty segment (trailing `/`) so we don't end up
            // with a double slash before the kind segment.
            segments.pop_if_empty();
            segments.push(kind);
            segments.push(hash.to_hex().as_str());
        }
        url
    }
}
/// Fetches the content digest that the transferer reports for `url`.
///
/// # Errors
///
/// Fails when the transfer backend errors out, or when it succeeds but has
/// no digest available for the URL.
async fn get_content_digest(transferer: &dyn Transferer, url: &Url) -> Result<Arc<ContentDigest>> {
    let digest = transferer.digest(url).await.with_context(|| {
        format!(
            "failed to get content digest of URL `{url}`",
            url = url.display()
        )
    })?;
    let Some(digest) = digest else {
        bail!("URL `{url}` does not have a known content digest");
    };
    Ok(digest)
}
/// Calculates the digest of the file at `path`.
///
/// In [`ContentDigestMode::Strong`] mode the file contents are hashed with
/// BLAKE3 (memory-mapped, rayon-parallelized) on a blocking task so the async
/// runtime threads are not stalled by file I/O.
///
/// In [`ContentDigestMode::Weak`] mode only the file's metadata is hashed:
/// its length plus its last-modified time at several resolutions. This is
/// cheap but changes whenever the file is rewritten, even with identical
/// contents.
///
/// # Errors
///
/// Fails if the file cannot be read/hashed, if metadata is unavailable, or
/// (weak mode) if the modification time predates the UNIX epoch.
async fn calculate_file_digest(path: &Path, mode: ContentDigestMode) -> Result<Digest> {
    match mode {
        ContentDigestMode::Strong => {
            // `update_mmap_rayon` is blocking (mmap + rayon), so run it on a
            // dedicated blocking thread.
            let path = path.to_path_buf();
            spawn_blocking(move || {
                let mut hasher = Hasher::new();
                hasher.update_mmap_rayon(&path).with_context(|| {
                    format!(
                        "failed to calculate digest of `{path}`",
                        path = path.display()
                    )
                })?;
                anyhow::Ok(Digest::File(hasher.finalize()))
            })
            .await
            .context("file digest task panicked")?
        }
        ContentDigestMode::Weak => {
            let metadata = path.metadata().with_context(|| {
                format!("failed to read metadata of `{path}`", path = path.display())
            })?;
            let mtime = metadata
                .modified()
                .with_context(|| {
                    format!(
                        "failed to determine last modified time of `{path}`",
                        path = path.display()
                    )
                })?
                .duration_since(UNIX_EPOCH)
                .with_context(|| {
                    // Fixed garbled message: previously read "occurs is before
                    // UNIX epoch".
                    format!(
                        "last modified time of `{path}` is before the UNIX epoch",
                        path = path.display()
                    )
                })?;
            // Hash the length plus the mtime at multiple resolutions so a
            // filesystem with coarse timestamps still contributes whatever
            // precision it actually has.
            let mut hasher = Hasher::new();
            hasher.update(&metadata.len().to_le_bytes());
            hasher.update(&mtime.as_secs().to_le_bytes());
            hasher.update(&mtime.as_millis().to_le_bytes());
            hasher.update(&mtime.as_micros().to_le_bytes());
            hasher.update(&mtime.as_nanos().to_le_bytes());
            Ok(Digest::File(hasher.finalize()))
        }
    }
}
/// Calculates the digest of the directory at `path`.
///
/// Entries are processed in file-name order for determinism. For each entry
/// the relative path is hashed, then the entry's own digest (computed
/// recursively via [`calculate_local_digest`]); finally the total entry
/// count is hashed, so renames, content changes, and additions/removals all
/// alter the result.
///
/// Symlinks are followed; a broken symlink is skipped entirely (it
/// contributes nothing, not even its name).
///
/// Returns a boxed future because the computation is recursive
/// (directories may contain directories).
fn calculate_directory_digest(
    path: &Path,
    mode: ContentDigestMode,
) -> impl Future<Output = Result<Digest>> + Send {
    async move {
        let mut dir = tokio::fs::read_dir(&path)
            .await
            .with_context(|| format!("failed to read directory `{path}`", path = path.display()))?;
        let mut entries = Vec::new();
        while let Some(entry) = dir
            .next_entry()
            .await
            .with_context(|| format!("failed to read directory `{path}`", path = path.display()))?
        {
            entries.push(entry);
        }
        drop(dir);
        // Sort for a deterministic hash regardless of readdir order.
        entries.sort_by_key(|e| e.file_name());
        let mut count: u32 = 0;
        let mut hasher = Hasher::new();
        for entry in &entries {
            let entry_path = entry.path();
            let mut metadata = entry.metadata().await.with_context(|| {
                format!(
                    "failed to read metadata for path `{path}`",
                    path = entry_path.display()
                )
            })?;
            if metadata.is_symlink() {
                // Follow the link to its target; skip the entry if the target
                // is missing. Use the async metadata call rather than the
                // previous blocking `std::fs::metadata` so the runtime thread
                // is not stalled.
                match tokio::fs::metadata(&entry_path).await {
                    Ok(m) => metadata = m,
                    Err(_) => continue,
                }
            }
            let kind = if metadata.is_file() {
                ContentKind::File
            } else {
                ContentKind::Directory
            };
            let entry_rel_path = entry_path
                .strip_prefix(path)
                .expect("entry path should be relative")
                .to_str()
                .with_context(|| {
                    format!("path `{path}` is not UTF-8", path = entry_path.display())
                })?;
            entry_rel_path.hash(&mut hasher);
            let digest = calculate_local_digest(&entry_path, kind, mode).await?;
            digest.hash(&mut hasher);
            count += 1;
        }
        hasher.update(&count.to_le_bytes());
        Ok(Digest::Directory(hasher.finalize()))
    }
    .boxed()
}
/// Calculates (and memoizes per `(mode, path)`) the digest of local content.
///
/// Concurrent callers for the same key share a single computation through a
/// process-wide `OnceCell`; a failed computation is not cached, so a later
/// call retries.
///
/// # Errors
///
/// Fails if metadata cannot be read, if the path's kind does not match
/// `kind`, or if the underlying file/directory digest computation fails.
pub async fn calculate_local_digest(
    path: &Path,
    kind: ContentKind,
    mode: ContentDigestMode,
) -> Result<Digest> {
    // Grab (or create) the shared cell for this key, releasing the map lock
    // before awaiting anything.
    let cell = {
        let mut map = LOCAL_DIGESTS.lock().expect("failed to lock digests");
        map.entry((mode, path.to_path_buf())).or_default().clone()
    };
    let digest = cell
        .get_or_try_init(|| async move {
            let metadata = path.metadata().with_context(|| {
                format!("failed to read metadata of `{path}`", path = path.display())
            })?;
            debug!(
                "calculating content digest of `{path}`",
                path = path.display()
            );
            let is_file = metadata.is_file();
            if kind == ContentKind::File {
                if !is_file {
                    bail!("expected path `{path}` to be a file", path = path.display());
                }
                calculate_file_digest(path, mode).await
            } else if is_file {
                bail!(
                    "expected path `{path}` to be a directory",
                    path = path.display()
                );
            } else {
                calculate_directory_digest(path, mode).await
            }
        })
        .await?;
    Ok(*digest)
}
/// Calculates (and memoizes per URL) the digest of remote content.
///
/// For a file, the transferer's reported content digest is hashed directly.
/// For a directory, the tree is walked and, for every entry, its relative
/// path and reported content digest are hashed, followed by the entry count.
///
/// # Errors
///
/// Fails if the transferer cannot report a digest for any involved URL, or
/// if walking the directory fails.
pub async fn calculate_remote_digest(
    transferer: &dyn Transferer,
    url: &Url,
    kind: ContentKind,
) -> Result<Digest> {
    // Grab (or create) the shared cell for this URL, releasing the map lock
    // before awaiting anything.
    let cell = {
        let mut map = REMOTE_DIGESTS.lock().expect("failed to lock digests");
        map.entry(url.clone()).or_default().clone()
    };
    let digest = cell
        .get_or_try_init(|| async {
            debug!("calculating content digest of `{url}`", url = url.display());
            if kind == ContentKind::File {
                let content = get_content_digest(transferer, url).await?;
                let mut hasher = Hasher::new();
                content.hash(&mut hasher);
                return anyhow::Ok(Digest::File(hasher.finalize()));
            }
            let entries = transferer
                .walk(url)
                .await
                .with_context(|| format!("failed to walk URL `{url}`", url = url.display()))?;
            let mut hasher = Hasher::new();
            for entry in entries.iter() {
                // Build the entry's full URL by appending its relative path
                // segments to the directory URL.
                let mut entry_url = url.clone();
                {
                    let mut segments = entry_url
                        .path_segments_mut()
                        .expect("URL should have a path");
                    segments.pop_if_empty();
                    segments.extend(entry.split('/'));
                }
                let content = get_content_digest(transferer, &entry_url).await?;
                entry.hash(&mut hasher);
                content.hash(&mut hasher);
            }
            hasher.update(&(entries.len() as u32).to_le_bytes());
            Ok(Digest::Directory(hasher.finalize()))
        })
        .await?;
    Ok(*digest)
}
#[cfg(test)]
pub(crate) mod test {
    //! Tests for local and remote digest calculation.
    //!
    //! Several tests reconstruct the expected digest by feeding the hasher
    //! the exact byte sequence the production code is expected to produce.
    //! NOTE(review): the reconstructed sequences imply the `Hashable`
    //! encoding is roughly "u32 length + bytes" for strings and "variant tag
    //! byte, then length-prefixed payload" for digests — confirm against
    //! `crate::cache::Hashable` if these expectations ever need updating.
    use std::fs;
    use std::io::Write;
    use std::time::Duration;
    use std::time::SystemTime;
    use anyhow::anyhow;
    use futures::FutureExt;
    use futures::future::BoxFuture;
    use pretty_assertions::assert_eq;
    use tempfile::NamedTempFile;
    use tempfile::tempdir;
    use super::*;
    use crate::ContentKind;
    use crate::http::Location;

    /// Empties both process-wide digest caches so a test can observe a
    /// recomputation after mutating the underlying content.
    pub fn clear_digest_cache() {
        LOCAL_DIGESTS
            .lock()
            .expect("failed to lock digests")
            .clear();
        REMOTE_DIGESTS
            .lock()
            .expect("failed to lock digests")
            .clear();
    }

    /// A stub `Transferer` backed by a static map of URL string ->
    /// optional content digest; only `walk` and `digest` are implemented.
    pub struct DigestTransferer(HashMap<&'static str, Option<Arc<ContentDigest>>>);

    impl DigestTransferer {
        /// Builds the stub from `(url, digest)` pairs; `None` models a URL
        /// that exists but has no known content digest.
        pub fn new<C>(c: C) -> Self
        where
            C: IntoIterator<Item = (&'static str, Option<ContentDigest>)>,
        {
            Self(HashMap::from_iter(
                c.into_iter().map(|(k, v)| (k, v.map(Into::into))),
            ))
        }
    }

    impl Transferer for DigestTransferer {
        fn download<'a>(&'a self, _source: &'a Url) -> BoxFuture<'a, Result<Location>> {
            unimplemented!()
        }

        fn upload<'a>(
            &'a self,
            _source: &'a Path,
            _destination: &'a Url,
        ) -> BoxFuture<'a, Result<()>> {
            unimplemented!()
        }

        fn size<'a>(&'a self, _url: &'a Url) -> BoxFuture<'a, Result<Option<u64>>> {
            unimplemented!()
        }

        /// Returns the sorted relative paths of every known URL under `url`.
        fn walk<'a>(&'a self, url: &'a Url) -> BoxFuture<'a, Result<Arc<[String]>>> {
            async {
                let mut entries = Vec::new();
                for k in self.0.keys() {
                    if let Some(path) = k.strip_prefix(url.as_str()) {
                        // Drop a leading slash so entries are relative
                        // whether or not `url` had a trailing slash.
                        let path = path.strip_prefix("/").unwrap_or(path);
                        entries.push(path.to_string());
                    }
                }
                entries.sort();
                Ok(entries.into())
            }
            .boxed()
        }

        fn exists<'a>(&'a self, _url: &'a Url) -> BoxFuture<'a, Result<bool>> {
            unimplemented!()
        }

        /// Errors for unknown URLs; otherwise returns the configured
        /// (possibly absent) content digest.
        fn digest<'a>(&'a self, url: &'a Url) -> BoxFuture<'a, Result<Option<Arc<ContentDigest>>>> {
            async {
                Ok(self
                    .0
                    .get(url.as_str())
                    .ok_or_else(|| anyhow!("does not exist"))?
                    .clone())
            }
            .boxed()
        }
    }

    /// Strong mode hashes file contents, so the digest is a fixed BLAKE3
    /// value for fixed contents.
    #[tokio::test]
    async fn local_file_digest_strong() {
        let mut file = NamedTempFile::new().unwrap();
        file.write_all(b"hello world!").unwrap();
        let digest =
            calculate_local_digest(file.path(), ContentKind::File, ContentDigestMode::Strong)
                .await
                .unwrap();
        assert_eq!(
            *digest.to_hex(),
            *"3aa61c409fd7717c9d9c639202af2fae470c0ef669be7ba2caea5779cb534e9d"
        );
    }

    /// Weak mode hashes size + mtime: the digest must change when either the
    /// file grows or its modification time moves, and the cache must be
    /// cleared between recomputations to observe the change.
    #[tokio::test]
    async fn local_file_digest_weak() {
        let mut file = NamedTempFile::new().unwrap();
        file.write_all(b"hello world!").unwrap();
        let digest =
            calculate_local_digest(file.path(), ContentKind::File, ContentDigestMode::Weak)
                .await
                .unwrap();
        // The cached value must equal a direct (uncached) computation.
        assert_eq!(
            digest,
            calculate_file_digest(file.path(), ContentDigestMode::Weak)
                .await
                .unwrap()
        );
        // Growing the file changes its length (and likely mtime).
        file.write_all(b"!").unwrap();
        file.flush().unwrap();
        clear_digest_cache();
        let changed =
            calculate_local_digest(file.path(), ContentKind::File, ContentDigestMode::Weak)
                .await
                .unwrap();
        assert!(digest != changed, "expected digest to change");
        let digest = changed;
        // Moving the mtime back an hour (same size) must also change it.
        file.as_file()
            .set_modified(
                SystemTime::now()
                    .checked_sub(Duration::from_hours(1))
                    .unwrap(),
            )
            .unwrap();
        clear_digest_cache();
        let changed =
            calculate_local_digest(file.path(), ContentKind::File, ContentDigestMode::Weak)
                .await
                .unwrap();
        assert!(digest != changed, "expected digest to change");
    }

    /// Reconstructs the directory digest byte-for-byte: for each entry
    /// (name-sorted) the length-prefixed relative path, a kind tag byte
    /// (0 = file, 1 = directory), and the length-prefixed entry hash; then
    /// the entry count.
    #[tokio::test]
    async fn local_directory_digest() {
        let dir = tempdir().unwrap();
        fs::write(dir.path().join("a"), b"a").unwrap();
        fs::write(dir.path().join("b"), b"b").unwrap();
        fs::write(dir.path().join("c"), b"c").unwrap();
        let subdir = dir.path().join("subdir");
        fs::create_dir(&subdir).unwrap();
        fs::write(subdir.join("z"), b"z").unwrap();
        fs::write(subdir.join("y"), b"y").unwrap();
        fs::write(subdir.join("x"), b"x").unwrap();
        let digest = calculate_local_digest(
            dir.path(),
            ContentKind::Directory,
            ContentDigestMode::Strong,
        )
        .await
        .unwrap();
        // Expected digest of `subdir` (entries x, y, z in sorted order).
        let mut hasher = Hasher::new();
        hasher.update(&1u32.to_le_bytes());
        hasher.update("x".as_bytes());
        hasher.update(&[0]);
        hasher.update(&32u32.to_le_bytes());
        hasher.update(
            Hash::from_hex("3ae7d805f6789a6402acb70ad4096a85a56bf6804eaf25c0493ac697548d30b5")
                .unwrap()
                .as_bytes(),
        );
        hasher.update(&1u32.to_le_bytes());
        hasher.update("y".as_bytes());
        hasher.update(&[0]);
        hasher.update(&32u32.to_le_bytes());
        hasher.update(
            Hash::from_hex("08112a9e334ce73042b531c25668cf5cb12a1ee040a4326afeac065461079a06")
                .unwrap()
                .as_bytes(),
        );
        hasher.update(&1u32.to_le_bytes());
        hasher.update("z".as_bytes());
        hasher.update(&[0]);
        hasher.update(&32u32.to_le_bytes());
        hasher.update(
            Hash::from_hex("1104908ab930e671002c7cd7f3fc921570b1bf64ecfa12fe363585c630eaca6b")
                .unwrap()
                .as_bytes(),
        );
        hasher.update(&3u32.to_le_bytes());
        let subdir_digest = hasher.finalize();
        // Expected digest of the top-level directory (a, b, c, subdir).
        let mut hasher = Hasher::new();
        hasher.update(&1u32.to_le_bytes());
        hasher.update("a".as_bytes());
        hasher.update(&[0]);
        hasher.update(&32u32.to_le_bytes());
        hasher.update(
            Hash::from_hex("17762fddd969a453925d65717ac3eea21320b66b54342fde15128d6caf21215f")
                .unwrap()
                .as_bytes(),
        );
        hasher.update(&1u32.to_le_bytes());
        hasher.update("b".as_bytes());
        hasher.update(&[0]);
        hasher.update(&32u32.to_le_bytes());
        hasher.update(
            Hash::from_hex("10e5cf3d3c8a4f9f3468c8cc58eea84892a22fdadbc1acb22410190044c1d553")
                .unwrap()
                .as_bytes(),
        );
        hasher.update(&1u32.to_le_bytes());
        hasher.update("c".as_bytes());
        hasher.update(&[0]);
        hasher.update(&32u32.to_le_bytes());
        hasher.update(
            Hash::from_hex("ea7aa1fc9efdbe106dbb70369a75e9671fa29d52bd55536711bf197477b8f021")
                .unwrap()
                .as_bytes(),
        );
        hasher.update(&6u32.to_le_bytes());
        hasher.update("subdir".as_bytes());
        hasher.update(&[1]);
        hasher.update(&32u32.to_le_bytes());
        hasher.update(subdir_digest.as_bytes());
        hasher.update(&4u32.to_le_bytes());
        assert_eq!(digest.to_hex(), hasher.finalize().to_hex());
    }

    /// Remote file digests hash the transferer-reported `ContentDigest`
    /// (tag byte 0 = hash variant with algorithm name, 1 = ETag variant);
    /// missing or unknown digests surface as errors.
    #[tokio::test]
    async fn remote_file_digest() {
        let content_digest =
            Hash::from_hex("7509e5bda0c762d2bac7f90d758b5b2263fa01ccbc542ab5e3df163be08e6ca9")
                .unwrap();
        let transferer = DigestTransferer::new([
            (
                "http://example.com/foo",
                Some(ContentDigest::Hash {
                    algorithm: "sha256".to_string(),
                    digest: content_digest.as_bytes().into(),
                }),
            ),
            (
                "http://example.com/bar",
                Some(ContentDigest::ETag("etag".into())),
            ),
            ("http://example.com/baz", None),
        ]);
        // Hash-variant digest.
        let digest = calculate_remote_digest(
            &transferer,
            &"http://example.com/foo".parse().unwrap(),
            ContentKind::File,
        )
        .await
        .unwrap();
        let mut hasher = Hasher::new();
        hasher.update(&[0]);
        hasher.update(&6u32.to_le_bytes());
        hasher.update("sha256".as_bytes());
        hasher.update(&32u32.to_le_bytes());
        hasher.update(content_digest.as_bytes());
        assert_eq!(digest.to_hex(), hasher.finalize().to_hex());
        // ETag-variant digest.
        let digest = calculate_remote_digest(
            &transferer,
            &"http://example.com/bar".parse().unwrap(),
            ContentKind::File,
        )
        .await
        .unwrap();
        let mut hasher = Hasher::new();
        hasher.update(&[1]);
        hasher.update(&4u32.to_le_bytes());
        hasher.update("etag".as_bytes());
        assert_eq!(digest.to_hex(), hasher.finalize().to_hex());
        // A known URL without a digest yields a specific error.
        assert_eq!(
            calculate_remote_digest(
                &transferer,
                &"http://example.com/baz".parse().unwrap(),
                ContentKind::File,
            )
            .await
            .unwrap_err()
            .to_string(),
            "URL `http://example.com/baz` does not have a known content digest"
        );
        // An unknown URL propagates the transferer's error with context.
        assert_eq!(
            format!(
                "{:#}",
                calculate_remote_digest(
                    &transferer,
                    &"http://example.com/nope".parse().unwrap(),
                    ContentKind::File,
                )
                .await
                .unwrap_err()
            ),
            "failed to get content digest of URL `http://example.com/nope`: does not exist"
        );
    }

    /// Remote directory digests hash each walked entry's relative path and
    /// digest plus the entry count; a trailing slash on the directory URL
    /// must not change the result, and an empty walk hashes only count 0.
    #[tokio::test]
    async fn remote_directory_digest() {
        let content_digest =
            Hash::from_hex("7509e5bda0c762d2bac7f90d758b5b2263fa01ccbc542ab5e3df163be08e6ca9")
                .unwrap();
        let transferer = DigestTransferer::new([
            (
                "http://example.com/dir/foo",
                Some(ContentDigest::Hash {
                    algorithm: "sha256".to_string(),
                    digest: content_digest.as_bytes().into(),
                }),
            ),
            (
                "http://example.com/dir/bar/baz",
                Some(ContentDigest::ETag("etag".into())),
            ),
            ("http://example.com/missing/baz", None),
        ]);
        let digest = calculate_remote_digest(
            &transferer,
            &"http://example.com/dir".parse().unwrap(),
            ContentKind::Directory,
        )
        .await
        .unwrap();
        // Entries in sorted order: "bar/baz" (ETag), then "foo" (hash).
        let mut hasher = Hasher::new();
        hasher.update(&7u32.to_le_bytes());
        hasher.update("bar/baz".as_bytes());
        hasher.update(&[1]);
        hasher.update(&4u32.to_le_bytes());
        hasher.update("etag".as_bytes());
        hasher.update(&3u32.to_le_bytes());
        hasher.update("foo".as_bytes());
        hasher.update(&[0]);
        hasher.update(&6u32.to_le_bytes());
        hasher.update("sha256".as_bytes());
        hasher.update(&32u32.to_le_bytes());
        hasher.update(content_digest.as_bytes());
        hasher.update(&2u32.to_le_bytes());
        assert_eq!(digest.to_hex(), hasher.finalize().to_hex());
        // Trailing slash must be normalized away.
        let trailing_digest = calculate_remote_digest(
            &transferer,
            &"http://example.com/dir/".parse().unwrap(),
            ContentKind::Directory,
        )
        .await
        .unwrap();
        assert_eq!(digest, trailing_digest);
        // A directory with no entries hashes only the zero count.
        let digest = calculate_remote_digest(
            &transferer,
            &"http://example.com/empty".parse().unwrap(),
            ContentKind::Directory,
        )
        .await
        .unwrap();
        let mut hasher = Hasher::new();
        hasher.update(&0u32.to_le_bytes());
        assert_eq!(digest.to_hex(), hasher.finalize().to_hex());
        // An entry without a known digest fails the whole directory.
        assert_eq!(
            format!(
                "{:#}",
                calculate_remote_digest(
                    &transferer,
                    &"http://example.com/missing".parse().unwrap(),
                    ContentKind::Directory,
                )
                .await
                .unwrap_err()
            ),
            "URL `http://example.com/missing/baz` does not have a known content digest"
        );
    }

    /// A broken symlink is skipped entirely when hashing a directory, so the
    /// digest changes when the target disappears and reverts when restored.
    #[cfg(unix)]
    #[tokio::test]
    async fn ignore_broken_symlink() {
        use std::os::unix::fs::symlink;
        let target = NamedTempFile::new()
            .expect("failed to create temporary file")
            .into_temp_path();
        fs::write(&target, b"hello world!").expect("failed to write temporary file");
        let dir = tempdir().expect("failed to create temp directory");
        let link = dir.path().join("b");
        symlink(&target, &link).expect("failed to create symlink");
        let digest = calculate_directory_digest(dir.path(), ContentDigestMode::Strong)
            .await
            .expect("failed to calculate digest");
        // Breaking the link removes the entry from the hash.
        fs::remove_file(&target).expect("failed to delete file");
        let modified = calculate_directory_digest(dir.path(), ContentDigestMode::Strong)
            .await
            .expect("failed to calculate digest");
        assert!(digest != modified);
        // Restoring the target restores the original digest.
        fs::write(&target, b"hello world!").expect("failed to create temporary file");
        let modified = calculate_directory_digest(dir.path(), ContentDigestMode::Strong)
            .await
            .expect("failed to calculate digest");
        assert_eq!(digest, modified);
    }
}