Skip to main content

pf_core/
cas.rs

1// SPDX-License-Identifier: MIT
2//! Content-addressed blob store.
3//!
4//! Two implementations ship in v1:
5//! - [`FsBlobStore`]: sharded on-disk store, zstd-19 compressed at rest,
6//!   atomic write via temp + rename. Default backing for `~/.processfork`.
7//! - [`MemBlobStore`]: in-memory; used for unit tests and the `--ephemeral`
8//!   CLI flag.
9//!
10//! Both implement [`BlobStore`].
11
12use crate::digest::Digest256;
13use crate::error::{Error, Result};
14
15use dashmap::DashMap;
16use parking_lot::Mutex;
17use std::fs;
18use std::io::{Read, Write};
19use std::path::{Path, PathBuf};
20
/// zstd level used when compressing blobs at rest.
///
/// Spec §4.2 fixes this at 19: a high compression ratio at an encode cost
/// that stays reasonable for the blob sizes we store.
pub const ZSTD_LEVEL: i32 = 19;
24
25/// A read/write content-addressed store.
26///
27/// Implementations MUST guarantee that `get(put(x)?)?` round-trips to bytes
28/// identical to `x`, and that the returned digest equals
29/// [`Digest256::of`](crate::digest::Digest256::of) of `x`.
30pub trait BlobStore: Send + Sync {
31    /// Insert `bytes`, return its content digest. Idempotent: storing the
32    /// same payload twice produces the same digest and is a no-op the second
33    /// time.
34    fn put(&self, bytes: &[u8]) -> Result<Digest256>;
35
36    /// Retrieve the bytes for a previously-stored digest.
37    fn get(&self, digest: &Digest256) -> Result<Vec<u8>>;
38
39    /// Cheap existence check.
40    fn contains(&self, digest: &Digest256) -> Result<bool>;
41
42    /// Total physical bytes stored on disk (compressed). Implementations may
43    /// approximate; used for `pf status` and the storage-efficiency
44    /// microbenchmark.
45    fn physical_bytes(&self) -> Result<u64>;
46}
47
48// ------------------------- FsBlobStore -------------------------
49
50/// On-disk content-addressed store, sharded by digest prefix.
51///
52/// Layout (rooted at `<root>/blobs/sha256/`):
53///
54/// ```text
55/// blobs/sha256/
56///   ab/
57///     ab12cd34…ff.zst        (zstd-19 compressed payload)
58///   cd/
59///     cd56ef78…00.zst
60/// ```
61///
62/// The two-byte shard prefix keeps directory entry counts <65 536 even at
63/// large stores. Atomic write is `temp file in same dir → fsync → rename`.
64///
65/// Thread-safe (concurrent `put`/`get` ok). On a race two writers may both
66/// produce the same temp file path — we add a per-process counter to dodge.
67#[derive(Debug)]
68pub struct FsBlobStore {
69    root: PathBuf,
70    /// Per-process tempfile counter to avoid intra-process write races.
71    counter: Mutex<u64>,
72}
73
74impl FsBlobStore {
75    /// Open (or create) an on-disk store rooted at `root`.
76    ///
77    /// Creates `root/blobs/sha256/` if missing. Idempotent.
78    pub fn open(root: impl AsRef<Path>) -> Result<Self> {
79        let root = root.as_ref().to_path_buf();
80        let blobs = root.join("blobs").join("sha256");
81        fs::create_dir_all(&blobs)?;
82        Ok(Self {
83            root,
84            counter: Mutex::new(0),
85        })
86    }
87
88    /// The root directory passed to [`Self::open`].
89    pub fn root(&self) -> &Path {
90        &self.root
91    }
92
93    /// Resolve the on-disk path for a given digest, creating the shard
94    /// subdirectory if it does not exist.
95    fn path_for(&self, digest: &Digest256) -> Result<PathBuf> {
96        let hex = digest.hex();
97        let shard = &hex[..2];
98        let dir = self.root.join("blobs").join("sha256").join(shard);
99        fs::create_dir_all(&dir)?;
100        Ok(dir.join(format!("{hex}.zst")))
101    }
102
103    /// Walk the entire store and sum compressed file sizes.
104    fn walk_size(dir: &Path) -> Result<u64> {
105        let mut total = 0u64;
106        if !dir.exists() {
107            return Ok(0);
108        }
109        for entry in fs::read_dir(dir)? {
110            let entry = entry?;
111            let ty = entry.file_type()?;
112            if ty.is_dir() {
113                total = total.saturating_add(Self::walk_size(&entry.path())?);
114            } else if ty.is_file() {
115                total = total.saturating_add(entry.metadata()?.len());
116            }
117        }
118        Ok(total)
119    }
120}
121
122impl BlobStore for FsBlobStore {
123    fn put(&self, bytes: &[u8]) -> Result<Digest256> {
124        let digest = Digest256::of(bytes);
125        let final_path = self.path_for(&digest)?;
126
127        // Idempotency: identical content → same digest → same path.
128        if final_path.exists() {
129            return Ok(digest);
130        }
131
132        // Compose a unique temp path in the SAME directory so rename is atomic
133        // (POSIX guarantee). Counter avoids races between concurrent writers
134        // in the same process; PID disambiguates between concurrent processes.
135        let counter = {
136            let mut g = self.counter.lock();
137            *g = g.wrapping_add(1);
138            *g
139        };
140        let pid = std::process::id();
141        let tmp_path = final_path.with_extension(format!("tmp.{pid}.{counter}"));
142
143        // Stream-encode with zstd-19. We allocate a single Vec because our
144        // largest blob (a KV-cache page) is well under 64 MiB; switch to
145        // streaming if we ever exceed that.
146        let compressed = zstd::encode_all(bytes, ZSTD_LEVEL)?;
147
148        // Write + fsync + atomic rename.
149        {
150            let mut f = fs::File::create(&tmp_path)?;
151            f.write_all(&compressed)?;
152            f.sync_all()?;
153        }
154        match fs::rename(&tmp_path, &final_path) {
155            Ok(()) => Ok(digest),
156            Err(e) => {
157                // Best-effort cleanup of the temp file; ignore errors.
158                let _ = fs::remove_file(&tmp_path);
159                Err(e.into())
160            }
161        }
162    }
163
164    fn get(&self, digest: &Digest256) -> Result<Vec<u8>> {
165        let path = self.path_for(digest)?;
166        let mut f = fs::File::open(&path)?;
167        let mut compressed = Vec::new();
168        f.read_to_end(&mut compressed)?;
169        let bytes = zstd::decode_all(compressed.as_slice())?;
170
171        // Defence in depth: re-hash on read. If anything corrupted the file,
172        // we surface it as `Error::Integrity` instead of returning bad bytes.
173        let observed = Digest256::of(&bytes);
174        if &observed != digest {
175            return Err(Error::Integrity(format!(
176                "blob {digest} on disk hashes to {observed}"
177            )));
178        }
179        Ok(bytes)
180    }
181
182    fn contains(&self, digest: &Digest256) -> Result<bool> {
183        Ok(self.path_for(digest)?.exists())
184    }
185
186    fn physical_bytes(&self) -> Result<u64> {
187        Self::walk_size(&self.root.join("blobs"))
188    }
189}
190
191// ------------------------- MemBlobStore -------------------------
192
193/// In-memory CAS — used by unit tests and the (planned) `--ephemeral` CLI
194/// flag. Stores raw bytes; no compression because we already paid for the
195/// allocation.
196#[derive(Debug, Default)]
197pub struct MemBlobStore {
198    inner: DashMap<Digest256, Vec<u8>>,
199}
200
201impl MemBlobStore {
202    /// Construct an empty in-memory store.
203    pub fn new() -> Self {
204        Self::default()
205    }
206}
207
208impl BlobStore for MemBlobStore {
209    fn put(&self, bytes: &[u8]) -> Result<Digest256> {
210        let d = Digest256::of(bytes);
211        self.inner
212            .entry(d.clone())
213            .or_insert_with(|| bytes.to_vec());
214        Ok(d)
215    }
216
217    fn get(&self, digest: &Digest256) -> Result<Vec<u8>> {
218        self.inner
219            .get(digest)
220            .map(|r| r.value().clone())
221            .ok_or_else(|| Error::Integrity(format!("not in mem store: {digest}")))
222    }
223
224    fn contains(&self, digest: &Digest256) -> Result<bool> {
225        Ok(self.inner.contains_key(digest))
226    }
227
228    fn physical_bytes(&self) -> Result<u64> {
229        Ok(self
230            .inner
231            .iter()
232            .map(|kv| kv.value().len() as u64)
233            .sum::<u64>())
234    }
235}
236
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    #[test]
    fn fs_round_trip_byte_identical() {
        let dir = TempDir::new().unwrap();
        let store = FsBlobStore::open(dir.path()).unwrap();
        let payload = b"the quick brown fox".to_vec();
        let d = store.put(&payload).unwrap();
        assert_eq!(d, Digest256::of(&payload));
        assert!(store.contains(&d).unwrap());
        let back = store.get(&d).unwrap();
        assert_eq!(back, payload);
    }

    #[test]
    fn fs_round_trip_empty_payload() {
        let dir = TempDir::new().unwrap();
        let store = FsBlobStore::open(dir.path()).unwrap();
        let d = store.put(b"").unwrap();
        assert_eq!(d, Digest256::of(b""));
        assert_eq!(store.get(&d).unwrap(), Vec::<u8>::new());
    }

    #[test]
    fn fs_dedupes_identical_writes() {
        let dir = TempDir::new().unwrap();
        let store = FsBlobStore::open(dir.path()).unwrap();
        let payload = vec![0xABu8; 1024];
        let d1 = store.put(&payload).unwrap();
        let size_after_first = store.physical_bytes().unwrap();
        let d2 = store.put(&payload).unwrap();
        let size_after_second = store.physical_bytes().unwrap();
        assert_eq!(d1, d2);
        assert_eq!(
            size_after_first, size_after_second,
            "second put must be a no-op"
        );
    }

    #[test]
    fn fs_missing_digest_is_absent_and_unreadable() {
        let dir = TempDir::new().unwrap();
        let store = FsBlobStore::open(dir.path()).unwrap();
        let d = Digest256::of(b"never stored");
        assert!(!store.contains(&d).unwrap());
        assert!(store.get(&d).is_err());
    }

    #[test]
    fn fs_detects_corruption() {
        let dir = TempDir::new().unwrap();
        let store = FsBlobStore::open(dir.path()).unwrap();
        let d = store.put(b"original").unwrap();
        let path = store.path_for(&d).unwrap();
        // Garbage behind a valid zstd magic number: either decoding fails
        // (`Error::Io`) or the on-read re-hash catches it (`Error::Integrity`).
        fs::write(&path, b"\x28\xb5\x2f\xfd\x00garbage").unwrap();
        let err = store.get(&d).unwrap_err();
        assert!(matches!(err, Error::Integrity(_)) || matches!(err, Error::Io(_)));
    }

    #[test]
    fn fs_rehash_catches_substituted_payload() {
        let dir = TempDir::new().unwrap();
        let store = FsBlobStore::open(dir.path()).unwrap();
        let d = store.put(b"original").unwrap();
        let path = store.path_for(&d).unwrap();
        // A well-formed zstd frame holding different bytes decodes cleanly,
        // so only the re-hash on read can detect the substitution.
        let forged = zstd::encode_all(&b"tampered"[..], ZSTD_LEVEL).unwrap();
        fs::write(&path, forged).unwrap();
        let err = store.get(&d).unwrap_err();
        assert!(matches!(err, Error::Integrity(_)));
    }

    #[test]
    fn mem_round_trip() {
        let store = MemBlobStore::new();
        let d = store.put(b"hello").unwrap();
        assert_eq!(store.get(&d).unwrap(), b"hello".to_vec());
        assert!(store.contains(&d).unwrap());
    }

    #[test]
    fn mem_dedupes_and_reports_raw_size() {
        let store = MemBlobStore::new();
        let payload = vec![0x5Au8; 300];
        let d1 = store.put(&payload).unwrap();
        let d2 = store.put(&payload).unwrap();
        assert_eq!(d1, d2);
        assert_eq!(store.physical_bytes().unwrap(), 300);
    }

    #[test]
    fn mem_missing_digest_is_error() {
        let store = MemBlobStore::new();
        let d = Digest256::of(b"absent");
        assert!(!store.contains(&d).unwrap());
        assert!(store.get(&d).is_err());
    }

    #[test]
    fn fs_sharding_uses_first_two_hex_chars() {
        let dir = TempDir::new().unwrap();
        let store = FsBlobStore::open(dir.path()).unwrap();
        let d = store.put(b"sharding test").unwrap();
        let path = store.path_for(&d).unwrap();
        let parent = path
            .parent()
            .unwrap()
            .file_name()
            .unwrap()
            .to_str()
            .unwrap();
        assert_eq!(parent, &d.hex()[..2]);
    }
}