use crate::digest::Digest;
use crate::error::{Error, Result};
use sha2::{Digest as _, Sha256};
use std::fs;
use std::io::{self, Read, Write};
use std::path::{Path, PathBuf};
use tracing::trace;
/// A store of byte blobs addressed by the [`Digest`] of their contents.
///
/// The `Send + Sync` bound allows a single store to be shared between threads.
pub trait Cas: Send + Sync {
    /// Returns whether a blob with `digest` is present in the store.
    fn contains(&self, digest: &Digest) -> Result<bool>;
    /// Returns the full contents of the blob with `digest`.
    fn get(&self, digest: &Digest) -> Result<Vec<u8>>;
    /// Writes the contents of the blob with `digest` to the file `destination`.
    fn get_to_file(&self, digest: &Digest, destination: &Path) -> Result<()>;
    /// Stores `bytes` and returns the digest they are addressed by.
    fn put_bytes(&self, bytes: &[u8]) -> Result<Digest>;
    /// Stores the contents of the file at `source` and returns their digest.
    fn put_file(&self, source: &Path) -> Result<Digest>;
}
/// A [`Cas`] backed by a directory on the local filesystem.
///
/// Blobs are stored under `<root>/cas/sha256/`, and writes are staged in
/// `<root>/tmp/` before being moved into place.
#[derive(Clone, Debug)]
pub struct LocalCas {
    /// Directory the store was opened with (see [`LocalCas::open`]).
    root: PathBuf,
}
impl LocalCas {
pub fn open(root: impl AsRef<Path>) -> Result<Self> {
let root = root.as_ref().to_path_buf();
let cas_dir = root.join("cas").join("sha256");
let tmp_dir = root.join("tmp");
fs::create_dir_all(&cas_dir).map_err(|e| Error::io(e, &cas_dir, "create_dir_all"))?;
fs::create_dir_all(&tmp_dir).map_err(|e| Error::io(e, &tmp_dir, "create_dir_all"))?;
Ok(Self { root })
}
#[must_use]
pub fn root(&self) -> &Path {
&self.root
}
#[must_use]
pub fn blob_path(&self, digest: &Digest) -> PathBuf {
let (prefix, rest) = digest.hash.split_at(2);
self.root.join("cas").join("sha256").join(prefix).join(rest)
}
fn tmp_dir(&self) -> PathBuf {
self.root.join("tmp")
}
fn verify_bytes(digest: &Digest, bytes: &[u8]) -> Result<()> {
let actual = Digest::of_bytes(bytes);
if &actual != digest {
return Err(Error::digest_mismatch(
digest.to_resource(),
actual.to_resource(),
));
}
Ok(())
}
fn verify_file(path: &Path, digest: &Digest) -> Result<()> {
let mut file = fs::File::open(path).map_err(|e| Error::io(e, path, "open"))?;
let mut hasher = Sha256::new();
let mut size: u64 = 0;
let mut buffer: Box<[u8]> = vec![0u8; 64 * 1024].into_boxed_slice();
loop {
let count = file
.read(&mut buffer)
.map_err(|e| Error::io(e, path, "read"))?;
if count == 0 {
break;
}
hasher.update(&buffer[..count]);
size += count as u64;
}
let actual = Digest {
hash: hex::encode(hasher.finalize()),
size_bytes: size,
};
if &actual != digest {
return Err(Error::digest_mismatch(
digest.to_resource(),
actual.to_resource(),
));
}
Ok(())
}
fn install(src: &Path, dst: &Path) -> Result<()> {
if let Some(parent) = dst.parent() {
fs::create_dir_all(parent).map_err(|e| Error::io(e, parent, "create_dir_all"))?;
}
if dst.exists() {
let _ = fs::remove_file(src);
return Ok(());
}
match fs::rename(src, dst) {
Ok(()) => Ok(()),
Err(e) if e.kind() == io::ErrorKind::AlreadyExists => {
let _ = fs::remove_file(src);
Ok(())
}
Err(e) if e.raw_os_error() == Some(EXDEV) => {
fs::copy(src, dst).map_err(|e2| Error::io(e2, dst, "copy"))?;
let _ = fs::remove_file(src);
Ok(())
}
Err(e) => Err(Error::io(e, dst, "rename")),
}
}
}
impl Cas for LocalCas {
fn contains(&self, digest: &Digest) -> Result<bool> {
Ok(self.blob_path(digest).exists())
}
fn get(&self, digest: &Digest) -> Result<Vec<u8>> {
let path = self.blob_path(digest);
match fs::read(&path) {
Ok(bytes) => {
Self::verify_bytes(digest, &bytes)?;
Ok(bytes)
}
Err(e) if e.kind() == io::ErrorKind::NotFound => {
Err(Error::not_found(digest.hash.clone()))
}
Err(e) => Err(Error::io(e, &path, "read")),
}
}
fn get_to_file(&self, digest: &Digest, destination: &Path) -> Result<()> {
let src = self.blob_path(digest);
if !src.exists() {
return Err(Error::not_found(digest.hash.clone()));
}
if let Some(parent) = destination.parent() {
fs::create_dir_all(parent).map_err(|e| Error::io(e, parent, "create_dir_all"))?;
}
fs::copy(&src, destination).map_err(|e| Error::io(e, destination, "copy"))?;
Self::verify_file(destination, digest)
}
fn put_bytes(&self, bytes: &[u8]) -> Result<Digest> {
let digest = Digest::of_bytes(bytes);
let dst = self.blob_path(&digest);
if dst.exists() {
trace!(digest = %digest, "CAS put_bytes: already present");
return Ok(digest);
}
let tmp_dir = self.tmp_dir();
let mut tmp = tempfile::NamedTempFile::new_in(&tmp_dir)
.map_err(|e| Error::io(e, &tmp_dir, "tempfile"))?;
tmp.write_all(bytes)
.map_err(|e| Error::io(e, tmp.path(), "write"))?;
tmp.as_file()
.sync_all()
.map_err(|e| Error::io(e, tmp.path(), "fsync"))?;
let (_, tmp_path) = tmp
.keep()
.map_err(|e| Error::io(e.error, &tmp_dir, "keep"))?;
Self::install(&tmp_path, &dst)?;
trace!(digest = %digest, "CAS put_bytes: installed");
Ok(digest)
}
fn put_file(&self, source: &Path) -> Result<Digest> {
let mut file = fs::File::open(source).map_err(|e| Error::io(e, source, "open"))?;
let mut hasher = Sha256::new();
let mut size: u64 = 0;
let mut buf: Box<[u8]> = vec![0u8; 64 * 1024].into_boxed_slice();
loop {
let n = file
.read(&mut buf)
.map_err(|e| Error::io(e, source, "read"))?;
if n == 0 {
break;
}
hasher.update(&buf[..n]);
size += n as u64;
}
let digest = Digest {
hash: hex::encode(hasher.finalize()),
size_bytes: size,
};
let dst = self.blob_path(&digest);
if dst.exists() {
trace!(digest = %digest, source = %source.display(), "CAS put_file: already present");
return Ok(digest);
}
if let Some(parent) = dst.parent() {
fs::create_dir_all(parent).map_err(|e| Error::io(e, parent, "create_dir_all"))?;
}
let tmp_dir = self.tmp_dir();
let tmp = tempfile::NamedTempFile::new_in(&tmp_dir)
.map_err(|e| Error::io(e, &tmp_dir, "tempfile"))?;
fs::copy(source, tmp.path()).map_err(|e| Error::io(e, tmp.path(), "copy"))?;
let (_, tmp_path) = tmp
.keep()
.map_err(|e| Error::io(e.error, &tmp_dir, "keep"))?;
Self::install(&tmp_path, &dst)?;
trace!(digest = %digest, "CAS put_file: copied");
Ok(digest)
}
}
/// Raw OS error code reported by `fs::rename` when source and destination are
/// on different filesystems (`EXDEV`, "cross-device link", on Unix).
#[cfg(target_family = "unix")]
const EXDEV: i32 = 18;
/// Windows reports a cross-volume rename as `ERROR_NOT_SAME_DEVICE` (17).
#[cfg(windows)]
const EXDEV: i32 = 17;
/// Sentinel that no OS error code is expected to match, which disables the
/// cross-device copy fallback on other platforms.
#[cfg(not(any(target_family = "unix", windows)))]
const EXDEV: i32 = -1;
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    #[test]
    fn put_and_get_bytes() {
        let tmp = TempDir::new().unwrap();
        let cas = LocalCas::open(tmp.path()).unwrap();
        let digest = cas.put_bytes(b"hello cas").unwrap();
        assert!(cas.contains(&digest).unwrap());
        assert_eq!(cas.get(&digest).unwrap(), b"hello cas");
    }

    #[test]
    fn put_bytes_is_idempotent() {
        let tmp = TempDir::new().unwrap();
        let cas = LocalCas::open(tmp.path()).unwrap();
        let a = cas.put_bytes(b"same").unwrap();
        let b = cas.put_bytes(b"same").unwrap();
        assert_eq!(a, b);
    }

    #[test]
    fn put_bytes_accepts_empty_blob() {
        let tmp = TempDir::new().unwrap();
        let cas = LocalCas::open(tmp.path()).unwrap();
        let digest = cas.put_bytes(b"").unwrap();
        assert!(cas.contains(&digest).unwrap());
        assert!(cas.get(&digest).unwrap().is_empty());
    }

    #[test]
    fn put_file_matches_put_bytes() {
        let tmp = TempDir::new().unwrap();
        let cas = LocalCas::open(tmp.path()).unwrap();
        let src = tmp.path().join("src.txt");
        fs::write(&src, b"from disk").unwrap();
        let d_file = cas.put_file(&src).unwrap();
        let d_bytes = Digest::of_bytes(b"from disk");
        assert_eq!(d_file, d_bytes);
        assert!(cas.contains(&d_file).unwrap());
    }

    #[test]
    fn put_file_missing_source_is_an_error() {
        let tmp = TempDir::new().unwrap();
        let cas = LocalCas::open(tmp.path()).unwrap();
        assert!(cas.put_file(&tmp.path().join("does-not-exist")).is_err());
    }

    #[test]
    fn puts_leave_no_staging_files_behind() {
        let tmp = TempDir::new().unwrap();
        let cas = LocalCas::open(tmp.path()).unwrap();
        let src = tmp.path().join("src.txt");
        fs::write(&src, b"staged file").unwrap();
        cas.put_bytes(b"staged bytes").unwrap();
        cas.put_bytes(b"staged bytes").unwrap();
        cas.put_file(&src).unwrap();
        cas.put_file(&src).unwrap();
        let leftovers = fs::read_dir(tmp.path().join("tmp")).unwrap().count();
        assert_eq!(leftovers, 0);
    }

    #[test]
    fn get_to_file_materializes_content() {
        let tmp = TempDir::new().unwrap();
        let cas = LocalCas::open(tmp.path()).unwrap();
        let digest = cas.put_bytes(b"materialize me").unwrap();
        let dst = tmp.path().join("out/file.bin");
        cas.get_to_file(&digest, &dst).unwrap();
        assert_eq!(fs::read(&dst).unwrap(), b"materialize me");
    }

    #[test]
    fn get_to_file_missing_returns_not_found() {
        let tmp = TempDir::new().unwrap();
        let cas = LocalCas::open(tmp.path()).unwrap();
        let bogus = Digest::of_bytes(b"never written");
        let dst = tmp.path().join("out/missing.bin");
        let err = cas.get_to_file(&bogus, &dst).unwrap_err();
        assert!(matches!(err, Error::NotFound { .. }));
        assert!(!dst.exists());
    }

    #[test]
    fn get_to_file_detects_corrupted_blob() {
        let tmp = TempDir::new().unwrap();
        let cas = LocalCas::open(tmp.path()).unwrap();
        let digest = cas.put_bytes(b"immutable").unwrap();
        fs::write(cas.blob_path(&digest), b"mutated").unwrap();
        let dst = tmp.path().join("out/file.bin");
        let err = cas.get_to_file(&digest, &dst).unwrap_err();
        assert!(matches!(err, Error::DigestMismatch { .. }));
    }

    #[test]
    fn get_detects_corrupted_blob() {
        let tmp = TempDir::new().unwrap();
        let cas = LocalCas::open(tmp.path()).unwrap();
        let digest = cas.put_bytes(b"immutable").unwrap();
        fs::write(cas.blob_path(&digest), b"mutated").unwrap();
        let err = cas.get(&digest).unwrap_err();
        assert!(matches!(err, Error::DigestMismatch { .. }));
    }

    #[test]
    fn mutating_materialized_file_does_not_corrupt_cas_blob() {
        let tmp = TempDir::new().unwrap();
        let cas = LocalCas::open(tmp.path()).unwrap();
        let digest = cas.put_bytes(b"original").unwrap();
        let dst = tmp.path().join("out/file.bin");
        cas.get_to_file(&digest, &dst).unwrap();
        fs::write(&dst, b"modified").unwrap();
        assert_eq!(cas.get(&digest).unwrap(), b"original");
    }

    #[test]
    fn mutating_source_after_put_file_does_not_corrupt_cas_blob() {
        let tmp = TempDir::new().unwrap();
        let cas = LocalCas::open(tmp.path()).unwrap();
        let src = tmp.path().join("src.txt");
        fs::write(&src, b"from disk").unwrap();
        let digest = cas.put_file(&src).unwrap();
        fs::write(&src, b"changed later").unwrap();
        assert_eq!(cas.get(&digest).unwrap(), b"from disk");
    }

    #[test]
    fn get_missing_returns_not_found() {
        let tmp = TempDir::new().unwrap();
        let cas = LocalCas::open(tmp.path()).unwrap();
        let bogus = Digest::of_bytes(b"never written");
        let err = cas.get(&bogus).unwrap_err();
        assert!(matches!(err, Error::NotFound { .. }));
    }

    #[test]
    fn contains_reflects_state() {
        let tmp = TempDir::new().unwrap();
        let cas = LocalCas::open(tmp.path()).unwrap();
        let d = Digest::of_bytes(b"x");
        assert!(!cas.contains(&d).unwrap());
        cas.put_bytes(b"x").unwrap();
        assert!(cas.contains(&d).unwrap());
    }

    #[test]
    fn blob_path_is_sharded_by_hash_prefix() {
        let tmp = TempDir::new().unwrap();
        let cas = LocalCas::open(tmp.path()).unwrap();
        let digest = Digest::of_bytes(b"shard me");
        let expected = tmp
            .path()
            .join("cas")
            .join("sha256")
            .join(&digest.hash[..2])
            .join(&digest.hash[2..]);
        assert_eq!(cas.blob_path(&digest), expected);
    }

    #[test]
    fn open_creates_layout_and_store_survives_reopen() {
        let tmp = TempDir::new().unwrap();
        let cas = LocalCas::open(tmp.path()).unwrap();
        assert_eq!(cas.root(), tmp.path());
        assert!(tmp.path().join("cas").join("sha256").is_dir());
        assert!(tmp.path().join("tmp").is_dir());
        let digest = cas.put_bytes(b"persisted").unwrap();
        let reopened = LocalCas::open(tmp.path()).unwrap();
        assert_eq!(reopened.get(&digest).unwrap(), b"persisted");
    }
}