Skip to main content

zenith_session/
store.rs

1//! Content-addressed object store.
2//!
3//! Objects are addressed by the lowercase hex SHA-256 of their UNCOMPRESSED
4//! content and persisted DEFLATE-compressed (pure-Rust `flate2`/`miniz_oxide`
5//! backend — the workspace stays free of C dependencies). Compression is an
6//! internal detail behind [`put_object`]/[`get_object`]; because the address is
7//! the hash of the *plaintext*, the codec can be swapped (e.g. to zstd) without
8//! changing object identity or breaking dedup.
9
10use std::fmt::Write as _;
11use std::io::{Read, Write};
12use std::path::PathBuf;
13
14use flate2::Compression;
15use flate2::read::ZlibDecoder;
16use flate2::write::ZlibEncoder;
17use sha2::{Digest, Sha256};
18
19use crate::adapter::Fs;
20use crate::error::SessionError;
21use crate::layout::StorePaths;
22
23// ── Private helpers ────────────────────────────────────────────────────────────
24
25/// `<objects_dir>/<hash[0..2]>/<hash[2..]>`. Errors if `hash` is too short to shard.
26fn object_path(paths: &StorePaths, doc_id: &str, hash: &str) -> Result<PathBuf, SessionError> {
27    let shard = hash
28        .get(0..2)
29        .ok_or_else(|| SessionError::new(format!("invalid object hash (too short): {hash:?}")))?;
30    let rest = hash
31        .get(2..)
32        .ok_or_else(|| SessionError::new(format!("invalid object hash (too short): {hash:?}")))?;
33    Ok(paths.objects_dir(doc_id).join(shard).join(rest))
34}
35
36// ── Public API ─────────────────────────────────────────────────────────────────
37
38/// Compute the lowercase-hex SHA-256 address of `content`. Pure; no IO.
39pub fn object_hash(content: &[u8]) -> String {
40    let mut h = Sha256::new();
41    h.update(content);
42    let digest = h.finalize();
43    let mut s = String::with_capacity(64);
44    for b in digest {
45        let _ = write!(s, "{b:02x}");
46    }
47    s
48}
49
50/// True if an object with `hash` already exists for `doc_id`.
51pub fn has_object(fs: &impl Fs, paths: &StorePaths, doc_id: &str, hash: &str) -> bool {
52    match object_path(paths, doc_id, hash) {
53        Ok(p) => fs.exists(&p),
54        Err(_) => false,
55    }
56}
57
58/// Store `content`, returning its hash address. Idempotent / dedup'd:
59/// if the object already exists it is NOT rewritten.
60pub fn put_object(
61    fs: &impl Fs,
62    paths: &StorePaths,
63    doc_id: &str,
64    content: &[u8],
65) -> Result<String, SessionError> {
66    let hash = object_hash(content);
67    put_object_with_hash(fs, paths, doc_id, content, &hash)?;
68    Ok(hash)
69}
70
71/// Store `content` at the already-computed address `hash`. Idempotent / dedup'd:
72/// if the object already exists it is NOT rewritten. Callers that have already
73/// hashed `content` (e.g. for a dedup check) use this to avoid hashing twice;
74/// `hash` MUST equal `object_hash(content)`.
75pub fn put_object_with_hash(
76    fs: &impl Fs,
77    paths: &StorePaths,
78    doc_id: &str,
79    content: &[u8],
80    hash: &str,
81) -> Result<(), SessionError> {
82    if has_object(fs, paths, doc_id, hash) {
83        return Ok(());
84    }
85    let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default());
86    encoder.write_all(content).map_err(SessionError::from)?;
87    let compressed = encoder.finish().map_err(SessionError::from)?;
88    let path = object_path(paths, doc_id, hash)?;
89    let shard_dir = path
90        .parent()
91        .ok_or_else(|| SessionError::new("object path has no parent directory"))?;
92    fs.create_dir_all(shard_dir)?;
93    fs.write(&path, &compressed)?;
94    Ok(())
95}
96
97/// The on-disk (compressed) byte size of the object addressed by `hash`.
98/// Errors if the object does not exist.
99pub fn object_size(
100    fs: &impl Fs,
101    paths: &StorePaths,
102    doc_id: &str,
103    hash: &str,
104) -> Result<u64, SessionError> {
105    let path = object_path(paths, doc_id, hash)?;
106    let bytes = fs.read(&path)?;
107    Ok(u64::try_from(bytes.len()).unwrap_or(u64::MAX))
108}
109
110/// Load and decompress the object addressed by `hash` for `doc_id`.
111pub fn get_object(
112    fs: &impl Fs,
113    paths: &StorePaths,
114    doc_id: &str,
115    hash: &str,
116) -> Result<Vec<u8>, SessionError> {
117    let path = object_path(paths, doc_id, hash)?;
118    let compressed = fs.read(&path)?;
119    let mut decoder = ZlibDecoder::new(&compressed[..]);
120    let mut out = Vec::new();
121    decoder.read_to_end(&mut out).map_err(SessionError::from)?;
122    // Integrity: a content-addressed store must never hand back bytes that do
123    // not match the requested address. A mismatch means on-disk corruption.
124    let actual = object_hash(&out);
125    if actual != hash {
126        return Err(SessionError::new(format!(
127            "object integrity check failed for {hash}: decompressed content hashes to {actual}"
128        )));
129    }
130    Ok(out)
131}
132
133// ── Tests ──────────────────────────────────────────────────────────────────────
134
#[cfg(test)]
mod tests {
    use super::*;
    use crate::adapter::MemFs;

    /// Fresh in-memory filesystem plus store paths rooted at `/data`.
    fn setup() -> (MemFs, StorePaths) {
        (MemFs::new(), StorePaths::new("/data"))
    }

    /// Expected sharded location of the blob for `hash`, spelled out
    /// independently of `object_path` so the tests pin the on-disk layout.
    /// Only call with a full-length hash (the slicing panics on short input).
    fn blob_path(paths: &StorePaths, doc_id: &str, hash: &str) -> PathBuf {
        paths.objects_dir(doc_id).join(&hash[..2]).join(&hash[2..])
    }

    #[test]
    fn object_hash_known_vector() {
        let hash = object_hash(b"hello");
        assert_eq!(
            hash,
            "2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824"
        );
        assert_eq!(hash.len(), 64);
    }

    #[test]
    fn has_object_false_until_stored() {
        let (fs, paths) = setup();
        let hash = object_hash(b"not yet stored");
        assert!(!has_object(&fs, &paths, "doc1", &hash));
        put_object(&fs, &paths, "doc1", b"not yet stored").unwrap();
        assert!(has_object(&fs, &paths, "doc1", &hash));
    }

    #[test]
    fn put_get_roundtrip() {
        let (fs, paths) = setup();
        let hash = put_object(&fs, &paths, "doc1", b"some content").unwrap();
        let got = get_object(&fs, &paths, "doc1", &hash).unwrap();
        assert_eq!(got, b"some content");
    }

    #[test]
    fn put_with_hash_matches_put_object() {
        let (fs, paths) = setup();
        let content = b"precomputed-hash content";
        let hash = object_hash(content);
        put_object_with_hash(&fs, &paths, "doc1", content, &hash).unwrap();
        // Same address, same readback as the hashing put_object would produce.
        assert!(has_object(&fs, &paths, "doc1", &hash));
        assert_eq!(get_object(&fs, &paths, "doc1", &hash).unwrap(), content);
        // Idempotent: a second call is a no-op and still succeeds.
        put_object_with_hash(&fs, &paths, "doc1", content, &hash).unwrap();
        assert_eq!(get_object(&fs, &paths, "doc1", &hash).unwrap(), content);
    }

    #[test]
    fn put_is_deterministic_and_dedup() {
        let (fs, paths) = setup();
        let hash1 = put_object(&fs, &paths, "doc1", b"repeated content").unwrap();
        assert!(has_object(&fs, &paths, "doc1", &hash1));

        // Swap the stored blob for a sentinel. A put that truly dedups must
        // leave the existing file untouched, so the sentinel has to survive.
        let path = blob_path(&paths, "doc1", &hash1);
        let sentinel = b"sentinel".to_vec();
        fs.write(&path, &sentinel).unwrap();

        let hash2 = put_object(&fs, &paths, "doc1", b"repeated content").unwrap();
        assert_eq!(hash1, hash2);

        let after = fs.read(&path).unwrap();
        assert_eq!(
            &after[..],
            &sentinel[..],
            "an existing object must not be rewritten by a repeated put"
        );
    }

    #[test]
    fn put_different_content_different_hash() {
        let (fs, paths) = setup();
        let hash_a = put_object(&fs, &paths, "doc1", b"content A").unwrap();
        let hash_b = put_object(&fs, &paths, "doc1", b"content B").unwrap();
        assert_ne!(hash_a, hash_b);
    }

    #[test]
    fn get_missing_errors() {
        let (fs, paths) = setup();
        let missing_hash = object_hash(b"never stored");
        let result = get_object(&fs, &paths, "doc1", &missing_hash);
        assert!(result.is_err());
    }

    #[test]
    fn malformed_hash_errors() {
        let (fs, paths) = setup();
        // Too short to shard: every entry point must refuse it.
        assert!(get_object(&fs, &paths, "doc1", "x").is_err());
        assert!(object_size(&fs, &paths, "doc1", "x").is_err());
        assert!(put_object_with_hash(&fs, &paths, "doc1", b"data", "x").is_err());
        assert!(!has_object(&fs, &paths, "doc1", "x"));
    }

    #[test]
    fn large_content_roundtrips() {
        let (fs, paths) = setup();
        let content: Vec<u8> = vec![0xABu8; 100_000];
        let hash = put_object(&fs, &paths, "doc1", &content).unwrap();
        let got = get_object(&fs, &paths, "doc1", &hash).unwrap();
        assert_eq!(got, content);
    }

    #[test]
    fn corrupted_object_fails_integrity_check() {
        let (fs, paths) = setup();
        // Store real content, then copy its blob under a DIFFERENT (claimed)
        // address so that what we read decompresses to bytes whose hash does
        // not match the requested address — i.e. simulated on-disk corruption.
        let real_hash = put_object(&fs, &paths, "doc1", b"the real bytes").unwrap();
        let blob = fs.read(&blob_path(&paths, "doc1", &real_hash)).unwrap();

        let claimed_hash = object_hash(b"a different thing entirely");
        let claimed_path = blob_path(&paths, "doc1", &claimed_hash);
        fs.create_dir_all(claimed_path.parent().unwrap()).unwrap();
        fs.write(&claimed_path, &blob).unwrap();

        let result = get_object(&fs, &paths, "doc1", &claimed_hash);
        assert!(
            result.is_err(),
            "integrity check must reject content that does not hash to the requested address"
        );
    }

    #[test]
    fn compression_actually_shrinks() {
        let (fs, paths) = setup();
        let content = vec![0u8; 10_000];
        let hash = put_object(&fs, &paths, "doc1", &content).unwrap();
        // Read the raw stored bytes at the sharded path.
        let raw = fs.read(&blob_path(&paths, "doc1", &hash)).unwrap();
        assert!(
            raw.len() < 10_000,
            "expected compressed size ({}) to be smaller than 10000",
            raw.len()
        );
    }

    #[test]
    fn object_size_returns_stored_length() {
        let (fs, paths) = setup();
        let content = b"some stored content";
        let hash = put_object(&fs, &paths, "doc1", content).unwrap();

        let raw = fs.read(&blob_path(&paths, "doc1", &hash)).unwrap();
        let expected = u64::try_from(raw.len()).unwrap();

        let size = object_size(&fs, &paths, "doc1", &hash).unwrap();
        assert!(size > 0, "stored size must be nonzero");
        assert_eq!(
            size, expected,
            "object_size must match the raw stored file length"
        );
    }

    #[test]
    fn object_size_missing_hash_errors() {
        let (fs, paths) = setup();
        let missing = object_hash(b"never stored");
        let result = object_size(&fs, &paths, "doc1", &missing);
        assert!(
            result.is_err(),
            "object_size for a missing hash must return Err"
        );
    }
}