1use crate::digest::Digest;
4use crate::error::{Error, Result};
5use sha2::{Digest as _, Sha256};
6use std::fs;
7use std::io::{self, Read, Write};
8use std::path::{Path, PathBuf};
9use tracing::trace;
10
/// A content-addressable blob store: blobs are stored and looked up by the [`Digest`] of
/// their contents.
///
/// The `Send + Sync` bound lets a store be shared across threads.
pub trait Cas: Send + Sync {
    /// Reports whether a blob with `digest` is present in the store.
    fn contains(&self, digest: &Digest) -> Result<bool>;

    /// Reads the blob identified by `digest` into memory.
    fn get(&self, digest: &Digest) -> Result<Vec<u8>>;

    /// Writes the blob identified by `digest` to the file at `destination`.
    fn get_to_file(&self, digest: &Digest, destination: &Path) -> Result<()>;

    /// Stores `bytes` and returns the digest they are filed under.
    fn put_bytes(&self, bytes: &[u8]) -> Result<Digest>;

    /// Stores the contents of the file at `source` and returns the digest they are filed
    /// under.
    fn put_file(&self, source: &Path) -> Result<Digest>;
}
55
/// A [`Cas`] kept in a directory tree on the local filesystem.
///
/// Layout beneath `root`:
/// - `cas/sha256/<first two hash chars>/<remaining hash chars>`: one file per blob;
/// - `tmp/`: staging area for writes that have not been moved into place yet.
///
/// The handle holds nothing but the root path, so a clone copies only that path and refers
/// to the same on-disk store.
#[derive(Clone, Debug)]
pub struct LocalCas {
    /// Directory the store was opened at; every path the store touches lives beneath it.
    root: PathBuf,
}
69
70impl LocalCas {
71 pub fn open(root: impl AsRef<Path>) -> Result<Self> {
77 let root = root.as_ref().to_path_buf();
78 let cas_dir = root.join("cas").join("sha256");
79 let tmp_dir = root.join("tmp");
80 fs::create_dir_all(&cas_dir).map_err(|e| Error::io(e, &cas_dir, "create_dir_all"))?;
81 fs::create_dir_all(&tmp_dir).map_err(|e| Error::io(e, &tmp_dir, "create_dir_all"))?;
82 Ok(Self { root })
83 }
84
85 #[must_use]
87 pub fn root(&self) -> &Path {
88 &self.root
89 }
90
91 #[must_use]
93 pub fn blob_path(&self, digest: &Digest) -> PathBuf {
94 let (prefix, rest) = digest.hash.split_at(2);
95 self.root.join("cas").join("sha256").join(prefix).join(rest)
96 }
97
98 fn tmp_dir(&self) -> PathBuf {
99 self.root.join("tmp")
100 }
101
102 fn verify_bytes(digest: &Digest, bytes: &[u8]) -> Result<()> {
103 let actual = Digest::of_bytes(bytes);
104 if &actual != digest {
105 return Err(Error::digest_mismatch(
106 digest.to_resource(),
107 actual.to_resource(),
108 ));
109 }
110 Ok(())
111 }
112
113 fn verify_file(path: &Path, digest: &Digest) -> Result<()> {
114 let mut file = fs::File::open(path).map_err(|e| Error::io(e, path, "open"))?;
115 let mut hasher = Sha256::new();
116 let mut size: u64 = 0;
117 let mut buffer: Box<[u8]> = vec![0u8; 64 * 1024].into_boxed_slice();
118
119 loop {
120 let count = file
121 .read(&mut buffer)
122 .map_err(|e| Error::io(e, path, "read"))?;
123 if count == 0 {
124 break;
125 }
126 hasher.update(&buffer[..count]);
127 size += count as u64;
128 }
129
130 let actual = Digest {
131 hash: hex::encode(hasher.finalize()),
132 size_bytes: size,
133 };
134 if &actual != digest {
135 return Err(Error::digest_mismatch(
136 digest.to_resource(),
137 actual.to_resource(),
138 ));
139 }
140 Ok(())
141 }
142
143 fn install(src: &Path, dst: &Path) -> Result<()> {
146 if let Some(parent) = dst.parent() {
147 fs::create_dir_all(parent).map_err(|e| Error::io(e, parent, "create_dir_all"))?;
148 }
149 if dst.exists() {
150 let _ = fs::remove_file(src);
152 return Ok(());
153 }
154 match fs::rename(src, dst) {
155 Ok(()) => Ok(()),
156 Err(e) if e.kind() == io::ErrorKind::AlreadyExists => {
157 let _ = fs::remove_file(src);
158 Ok(())
159 }
160 Err(e) if e.raw_os_error() == Some(EXDEV) => {
161 fs::copy(src, dst).map_err(|e2| Error::io(e2, dst, "copy"))?;
163 let _ = fs::remove_file(src);
164 Ok(())
165 }
166 Err(e) => Err(Error::io(e, dst, "rename")),
167 }
168 }
169}
170
171impl Cas for LocalCas {
172 fn contains(&self, digest: &Digest) -> Result<bool> {
173 Ok(self.blob_path(digest).exists())
174 }
175
176 fn get(&self, digest: &Digest) -> Result<Vec<u8>> {
177 let path = self.blob_path(digest);
178 match fs::read(&path) {
179 Ok(bytes) => {
180 Self::verify_bytes(digest, &bytes)?;
181 Ok(bytes)
182 }
183 Err(e) if e.kind() == io::ErrorKind::NotFound => {
184 Err(Error::not_found(digest.hash.clone()))
185 }
186 Err(e) => Err(Error::io(e, &path, "read")),
187 }
188 }
189
190 fn get_to_file(&self, digest: &Digest, destination: &Path) -> Result<()> {
191 let src = self.blob_path(digest);
192 if !src.exists() {
193 return Err(Error::not_found(digest.hash.clone()));
194 }
195 if let Some(parent) = destination.parent() {
196 fs::create_dir_all(parent).map_err(|e| Error::io(e, parent, "create_dir_all"))?;
197 }
198 fs::copy(&src, destination).map_err(|e| Error::io(e, destination, "copy"))?;
199 Self::verify_file(destination, digest)
200 }
201
202 fn put_bytes(&self, bytes: &[u8]) -> Result<Digest> {
203 let digest = Digest::of_bytes(bytes);
204 let dst = self.blob_path(&digest);
205 if dst.exists() {
206 trace!(digest = %digest, "CAS put_bytes: already present");
207 return Ok(digest);
208 }
209 let tmp_dir = self.tmp_dir();
210 let mut tmp = tempfile::NamedTempFile::new_in(&tmp_dir)
211 .map_err(|e| Error::io(e, &tmp_dir, "tempfile"))?;
212 tmp.write_all(bytes)
213 .map_err(|e| Error::io(e, tmp.path(), "write"))?;
214 tmp.as_file()
215 .sync_all()
216 .map_err(|e| Error::io(e, tmp.path(), "fsync"))?;
217 let (_, tmp_path) = tmp
218 .keep()
219 .map_err(|e| Error::io(e.error, &tmp_dir, "keep"))?;
220 Self::install(&tmp_path, &dst)?;
221 trace!(digest = %digest, "CAS put_bytes: installed");
222 Ok(digest)
223 }
224
225 fn put_file(&self, source: &Path) -> Result<Digest> {
226 let mut file = fs::File::open(source).map_err(|e| Error::io(e, source, "open"))?;
228 let mut hasher = Sha256::new();
229 let mut size: u64 = 0;
230 let mut buf: Box<[u8]> = vec![0u8; 64 * 1024].into_boxed_slice();
231 loop {
232 let n = file
233 .read(&mut buf)
234 .map_err(|e| Error::io(e, source, "read"))?;
235 if n == 0 {
236 break;
237 }
238 hasher.update(&buf[..n]);
239 size += n as u64;
240 }
241 let digest = Digest {
242 hash: hex::encode(hasher.finalize()),
243 size_bytes: size,
244 };
245 let dst = self.blob_path(&digest);
246 if dst.exists() {
247 trace!(digest = %digest, source = %source.display(), "CAS put_file: already present");
248 return Ok(digest);
249 }
250
251 if let Some(parent) = dst.parent() {
252 fs::create_dir_all(parent).map_err(|e| Error::io(e, parent, "create_dir_all"))?;
253 }
254 let tmp_dir = self.tmp_dir();
255 let tmp = tempfile::NamedTempFile::new_in(&tmp_dir)
256 .map_err(|e| Error::io(e, &tmp_dir, "tempfile"))?;
257 fs::copy(source, tmp.path()).map_err(|e| Error::io(e, tmp.path(), "copy"))?;
258 let (_, tmp_path) = tmp
259 .keep()
260 .map_err(|e| Error::io(e.error, &tmp_dir, "keep"))?;
261 Self::install(&tmp_path, &dst)?;
262 trace!(digest = %digest, "CAS put_file: copied");
263 Ok(digest)
264 }
265}
266
/// Raw OS error code that `fs::rename` reports when source and destination live on
/// different filesystems; `LocalCas::install` falls back to copying when it sees it.
///
/// Unix: `EXDEV`, which is 18 on Linux, macOS and the BSDs.
#[cfg(target_family = "unix")]
const EXDEV: i32 = 18;

/// Windows: the equivalent condition is `ERROR_NOT_SAME_DEVICE` (17), which is what a
/// cross-volume `fs::rename` fails with.
#[cfg(target_family = "windows")]
const EXDEV: i32 = 17;

/// Other platforms: no known cross-device code. `-1` is never a real OS error, so the copy
/// fallback is effectively disabled.
#[cfg(not(any(target_family = "unix", target_family = "windows")))]
const EXDEV: i32 = -1;
272
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Opens a CAS inside a fresh temporary directory. The `TempDir` guard is handed back so
    /// the directory lives as long as the caller keeps it bound.
    fn fresh_cas() -> (TempDir, LocalCas) {
        let dir = TempDir::new().unwrap();
        let cas = LocalCas::open(dir.path()).unwrap();
        (dir, cas)
    }

    #[test]
    fn put_and_get_bytes() {
        let (_dir, cas) = fresh_cas();
        let stored = cas.put_bytes(b"hello cas").unwrap();
        assert!(cas.contains(&stored).unwrap());
        assert_eq!(cas.get(&stored).unwrap(), b"hello cas");
    }

    #[test]
    fn put_bytes_is_idempotent() {
        let (_dir, cas) = fresh_cas();
        let first = cas.put_bytes(b"same").unwrap();
        let second = cas.put_bytes(b"same").unwrap();
        assert_eq!(first, second);
    }

    #[test]
    fn put_file_matches_put_bytes() {
        let (dir, cas) = fresh_cas();
        let input = dir.path().join("src.txt");
        fs::write(&input, b"from disk").unwrap();

        let from_file = cas.put_file(&input).unwrap();
        assert_eq!(from_file, Digest::of_bytes(b"from disk"));
        assert!(cas.contains(&from_file).unwrap());
    }

    #[test]
    fn get_to_file_materializes_content() {
        let (dir, cas) = fresh_cas();
        let stored = cas.put_bytes(b"materialize me").unwrap();
        let out = dir.path().join("out/file.bin");

        cas.get_to_file(&stored, &out).unwrap();
        assert_eq!(fs::read(&out).unwrap(), b"materialize me");
    }

    #[test]
    fn get_detects_corrupted_blob() {
        let (_dir, cas) = fresh_cas();
        let stored = cas.put_bytes(b"immutable").unwrap();
        fs::write(cas.blob_path(&stored), b"mutated").unwrap();

        let failure = cas.get(&stored).unwrap_err();
        assert!(matches!(failure, Error::DigestMismatch { .. }));
    }

    #[test]
    fn mutating_materialized_file_does_not_corrupt_cas_blob() {
        let (dir, cas) = fresh_cas();
        let stored = cas.put_bytes(b"original").unwrap();
        let out = dir.path().join("out/file.bin");

        cas.get_to_file(&stored, &out).unwrap();
        fs::write(&out, b"modified").unwrap();

        assert_eq!(cas.get(&stored).unwrap(), b"original");
    }

    #[test]
    fn mutating_source_after_put_file_does_not_corrupt_cas_blob() {
        let (dir, cas) = fresh_cas();
        let input = dir.path().join("src.txt");
        fs::write(&input, b"from disk").unwrap();

        let stored = cas.put_file(&input).unwrap();
        fs::write(&input, b"changed later").unwrap();

        assert_eq!(cas.get(&stored).unwrap(), b"from disk");
    }

    #[test]
    fn get_missing_returns_not_found() {
        let (_dir, cas) = fresh_cas();
        let never_stored = Digest::of_bytes(b"never written");

        let failure = cas.get(&never_stored).unwrap_err();
        assert!(matches!(failure, Error::NotFound { .. }));
    }

    #[test]
    fn contains_reflects_state() {
        let (_dir, cas) = fresh_cas();
        let expected = Digest::of_bytes(b"x");

        assert!(!cas.contains(&expected).unwrap());
        cas.put_bytes(b"x").unwrap();
        assert!(cas.contains(&expected).unwrap());
    }
}