Skip to main content

cuenv_cas/
cas.rs

1//! [`Cas`] trait and local on-disk implementation.
2
3use crate::digest::Digest;
4use crate::error::{Error, Result};
5use sha2::{Digest as _, Sha256};
6use std::fs;
7use std::io::{self, Read, Write};
8use std::path::{Path, PathBuf};
9use tracing::trace;
10
11/// A content-addressed blob store.
12///
13/// Implementations must be safe to use from multiple threads concurrently.
14pub trait Cas: Send + Sync {
15    /// True if the store holds a blob with `digest`.
16    ///
17    /// # Errors
18    ///
19    /// Returns an error if the underlying storage cannot be queried.
20    fn contains(&self, digest: &Digest) -> Result<bool>;
21
22    /// Load the blob at `digest` into memory.
23    ///
24    /// # Errors
25    ///
26    /// Returns [`Error::NotFound`] if the digest is not present, or an
27    /// I/O error if the blob cannot be read.
28    fn get(&self, digest: &Digest) -> Result<Vec<u8>>;
29
30    /// Copy the blob at `digest` to `destination`. The parent directory must
31    /// already exist.
32    ///
33    /// # Errors
34    ///
35    /// Returns [`Error::NotFound`] if the digest is not present, or an
36    /// I/O error if the copy/link fails.
37    fn get_to_file(&self, digest: &Digest, destination: &Path) -> Result<()>;
38
39    /// Store `bytes` and return its digest.
40    ///
41    /// # Errors
42    ///
43    /// Returns an I/O error if the blob cannot be written to the store.
44    fn put_bytes(&self, bytes: &[u8]) -> Result<Digest>;
45
46    /// Stream a file into the store and return its digest. The source file
47    /// is read but not modified.
48    ///
49    /// # Errors
50    ///
51    /// Returns an I/O error if the source cannot be read or the blob
52    /// cannot be written to the store.
53    fn put_file(&self, source: &Path) -> Result<Digest>;
54}
55
/// On-disk [`Cas`] implementation that keeps every blob beneath a single
/// root directory.
///
/// Directory layout:
///
/// ```text
/// root/
///   cas/sha256/<ab>/<cdef...>    one file per blob; <ab> is the first two hex
///                                characters of the digest, the file name is
///                                the remaining hex characters
///   tmp/                          scratch space used to stage atomic writes
/// ```
#[derive(Clone, Debug)]
pub struct LocalCas {
    /// Directory containing the `cas/` and `tmp/` subtrees.
    root: PathBuf,
}
69
70impl LocalCas {
71    /// Open or create a local CAS rooted at `root`.
72    ///
73    /// # Errors
74    ///
75    /// Returns an error if the required directories cannot be created.
76    pub fn open(root: impl AsRef<Path>) -> Result<Self> {
77        let root = root.as_ref().to_path_buf();
78        let cas_dir = root.join("cas").join("sha256");
79        let tmp_dir = root.join("tmp");
80        fs::create_dir_all(&cas_dir).map_err(|e| Error::io(e, &cas_dir, "create_dir_all"))?;
81        fs::create_dir_all(&tmp_dir).map_err(|e| Error::io(e, &tmp_dir, "create_dir_all"))?;
82        Ok(Self { root })
83    }
84
85    /// Root directory of this store.
86    #[must_use]
87    pub fn root(&self) -> &Path {
88        &self.root
89    }
90
91    /// Compute the on-disk path for `digest`.
92    #[must_use]
93    pub fn blob_path(&self, digest: &Digest) -> PathBuf {
94        let (prefix, rest) = digest.hash.split_at(2);
95        self.root.join("cas").join("sha256").join(prefix).join(rest)
96    }
97
98    fn tmp_dir(&self) -> PathBuf {
99        self.root.join("tmp")
100    }
101
102    fn verify_bytes(digest: &Digest, bytes: &[u8]) -> Result<()> {
103        let actual = Digest::of_bytes(bytes);
104        if &actual != digest {
105            return Err(Error::digest_mismatch(
106                digest.to_resource(),
107                actual.to_resource(),
108            ));
109        }
110        Ok(())
111    }
112
113    fn verify_file(path: &Path, digest: &Digest) -> Result<()> {
114        let mut file = fs::File::open(path).map_err(|e| Error::io(e, path, "open"))?;
115        let mut hasher = Sha256::new();
116        let mut size: u64 = 0;
117        let mut buffer: Box<[u8]> = vec![0u8; 64 * 1024].into_boxed_slice();
118
119        loop {
120            let count = file
121                .read(&mut buffer)
122                .map_err(|e| Error::io(e, path, "read"))?;
123            if count == 0 {
124                break;
125            }
126            hasher.update(&buffer[..count]);
127            size += count as u64;
128        }
129
130        let actual = Digest {
131            hash: hex::encode(hasher.finalize()),
132            size_bytes: size,
133        };
134        if &actual != digest {
135            return Err(Error::digest_mismatch(
136                digest.to_resource(),
137                actual.to_resource(),
138            ));
139        }
140        Ok(())
141    }
142
143    /// Atomically rename `src` into `dst`, tolerating the case where another
144    /// writer populated the same digest concurrently.
145    fn install(src: &Path, dst: &Path) -> Result<()> {
146        if let Some(parent) = dst.parent() {
147            fs::create_dir_all(parent).map_err(|e| Error::io(e, parent, "create_dir_all"))?;
148        }
149        if dst.exists() {
150            // Content-addressed: same path ⇒ same content. Drop the temp.
151            let _ = fs::remove_file(src);
152            return Ok(());
153        }
154        match fs::rename(src, dst) {
155            Ok(()) => Ok(()),
156            Err(e) if e.kind() == io::ErrorKind::AlreadyExists => {
157                let _ = fs::remove_file(src);
158                Ok(())
159            }
160            Err(e) if e.raw_os_error() == Some(EXDEV) => {
161                // Cross-device rename isn't supported; copy then drop temp.
162                fs::copy(src, dst).map_err(|e2| Error::io(e2, dst, "copy"))?;
163                let _ = fs::remove_file(src);
164                Ok(())
165            }
166            Err(e) => Err(Error::io(e, dst, "rename")),
167        }
168    }
169}
170
171impl Cas for LocalCas {
172    fn contains(&self, digest: &Digest) -> Result<bool> {
173        Ok(self.blob_path(digest).exists())
174    }
175
176    fn get(&self, digest: &Digest) -> Result<Vec<u8>> {
177        let path = self.blob_path(digest);
178        match fs::read(&path) {
179            Ok(bytes) => {
180                Self::verify_bytes(digest, &bytes)?;
181                Ok(bytes)
182            }
183            Err(e) if e.kind() == io::ErrorKind::NotFound => {
184                Err(Error::not_found(digest.hash.clone()))
185            }
186            Err(e) => Err(Error::io(e, &path, "read")),
187        }
188    }
189
190    fn get_to_file(&self, digest: &Digest, destination: &Path) -> Result<()> {
191        let src = self.blob_path(digest);
192        if !src.exists() {
193            return Err(Error::not_found(digest.hash.clone()));
194        }
195        if let Some(parent) = destination.parent() {
196            fs::create_dir_all(parent).map_err(|e| Error::io(e, parent, "create_dir_all"))?;
197        }
198        fs::copy(&src, destination).map_err(|e| Error::io(e, destination, "copy"))?;
199        Self::verify_file(destination, digest)
200    }
201
202    fn put_bytes(&self, bytes: &[u8]) -> Result<Digest> {
203        let digest = Digest::of_bytes(bytes);
204        let dst = self.blob_path(&digest);
205        if dst.exists() {
206            trace!(digest = %digest, "CAS put_bytes: already present");
207            return Ok(digest);
208        }
209        let tmp_dir = self.tmp_dir();
210        let mut tmp = tempfile::NamedTempFile::new_in(&tmp_dir)
211            .map_err(|e| Error::io(e, &tmp_dir, "tempfile"))?;
212        tmp.write_all(bytes)
213            .map_err(|e| Error::io(e, tmp.path(), "write"))?;
214        tmp.as_file()
215            .sync_all()
216            .map_err(|e| Error::io(e, tmp.path(), "fsync"))?;
217        let (_, tmp_path) = tmp
218            .keep()
219            .map_err(|e| Error::io(e.error, &tmp_dir, "keep"))?;
220        Self::install(&tmp_path, &dst)?;
221        trace!(digest = %digest, "CAS put_bytes: installed");
222        Ok(digest)
223    }
224
225    fn put_file(&self, source: &Path) -> Result<Digest> {
226        // Pass 1: streaming sha256 + size, no copy yet.
227        let mut file = fs::File::open(source).map_err(|e| Error::io(e, source, "open"))?;
228        let mut hasher = Sha256::new();
229        let mut size: u64 = 0;
230        let mut buf: Box<[u8]> = vec![0u8; 64 * 1024].into_boxed_slice();
231        loop {
232            let n = file
233                .read(&mut buf)
234                .map_err(|e| Error::io(e, source, "read"))?;
235            if n == 0 {
236                break;
237            }
238            hasher.update(&buf[..n]);
239            size += n as u64;
240        }
241        let digest = Digest {
242            hash: hex::encode(hasher.finalize()),
243            size_bytes: size,
244        };
245        let dst = self.blob_path(&digest);
246        if dst.exists() {
247            trace!(digest = %digest, source = %source.display(), "CAS put_file: already present");
248            return Ok(digest);
249        }
250
251        if let Some(parent) = dst.parent() {
252            fs::create_dir_all(parent).map_err(|e| Error::io(e, parent, "create_dir_all"))?;
253        }
254        let tmp_dir = self.tmp_dir();
255        let tmp = tempfile::NamedTempFile::new_in(&tmp_dir)
256            .map_err(|e| Error::io(e, &tmp_dir, "tempfile"))?;
257        fs::copy(source, tmp.path()).map_err(|e| Error::io(e, tmp.path(), "copy"))?;
258        let (_, tmp_path) = tmp
259            .keep()
260            .map_err(|e| Error::io(e.error, &tmp_dir, "keep"))?;
261        Self::install(&tmp_path, &dst)?;
262        trace!(digest = %digest, "CAS put_file: copied");
263        Ok(digest)
264    }
265}
266
/// OS error code reported by `rename` when source and destination are on
/// different filesystems (`EXDEV`, 18 on Linux and macOS).
#[cfg(target_family = "unix")]
const EXDEV: i32 = 18;

/// Windows counterpart of `EXDEV`: `ERROR_NOT_SAME_DEVICE` (17), returned
/// when a rename/move crosses volumes.
#[cfg(target_family = "windows")]
const EXDEV: i32 = 17;

/// No cross-device code is known for other targets. -1 never matches a real
/// OS error, so the copy fallback is effectively disabled there.
#[cfg(not(any(target_family = "unix", target_family = "windows")))]
const EXDEV: i32 = -1;
272
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Build a fresh store inside its own temp directory. The returned
    /// `TempDir` must stay bound for the test's duration, or the directory
    /// is deleted early.
    fn scratch_store() -> (TempDir, LocalCas) {
        let dir = TempDir::new().unwrap();
        let store = LocalCas::open(dir.path()).unwrap();
        (dir, store)
    }

    #[test]
    fn put_and_get_bytes() {
        let (_dir, store) = scratch_store();
        let id = store.put_bytes(b"hello cas").unwrap();
        assert!(store.contains(&id).unwrap());
        assert_eq!(store.get(&id).unwrap(), b"hello cas");
    }

    #[test]
    fn put_bytes_is_idempotent() {
        let (_dir, store) = scratch_store();
        let first = store.put_bytes(b"same").unwrap();
        let second = store.put_bytes(b"same").unwrap();
        assert_eq!(first, second);
    }

    #[test]
    fn put_file_matches_put_bytes() {
        let (dir, store) = scratch_store();
        let input = dir.path().join("src.txt");
        fs::write(&input, b"from disk").unwrap();
        let from_file = store.put_file(&input).unwrap();
        let expected = Digest::of_bytes(b"from disk");
        assert_eq!(from_file, expected);
        assert!(store.contains(&from_file).unwrap());
    }

    #[test]
    fn get_to_file_materializes_content() {
        let (dir, store) = scratch_store();
        let id = store.put_bytes(b"materialize me").unwrap();
        let out = dir.path().join("out/file.bin");
        store.get_to_file(&id, &out).unwrap();
        assert_eq!(fs::read(&out).unwrap(), b"materialize me");
    }

    #[test]
    fn get_detects_corrupted_blob() {
        let (_dir, store) = scratch_store();
        let id = store.put_bytes(b"immutable").unwrap();
        fs::write(store.blob_path(&id), b"mutated").unwrap();

        let failure = store.get(&id).unwrap_err();
        assert!(matches!(failure, Error::DigestMismatch { .. }));
    }

    #[test]
    fn mutating_materialized_file_does_not_corrupt_cas_blob() {
        let (dir, store) = scratch_store();
        let id = store.put_bytes(b"original").unwrap();
        let out = dir.path().join("out/file.bin");

        store.get_to_file(&id, &out).unwrap();
        fs::write(&out, b"modified").unwrap();

        assert_eq!(store.get(&id).unwrap(), b"original");
    }

    #[test]
    fn mutating_source_after_put_file_does_not_corrupt_cas_blob() {
        let (dir, store) = scratch_store();
        let input = dir.path().join("src.txt");
        fs::write(&input, b"from disk").unwrap();

        let id = store.put_file(&input).unwrap();
        fs::write(&input, b"changed later").unwrap();

        assert_eq!(store.get(&id).unwrap(), b"from disk");
    }

    #[test]
    fn get_missing_returns_not_found() {
        let (_dir, store) = scratch_store();
        let absent = Digest::of_bytes(b"never written");
        let failure = store.get(&absent).unwrap_err();
        assert!(matches!(failure, Error::NotFound { .. }));
    }

    #[test]
    fn contains_reflects_state() {
        let (_dir, store) = scratch_store();
        let id = Digest::of_bytes(b"x");
        assert!(!store.contains(&id).unwrap());
        store.put_bytes(b"x").unwrap();
        assert!(store.contains(&id).unwrap());
    }
}