Skip to main content

tael_server/storage/
blobs.rs

//! Content-addressed blob store for large payloads kept out of the columnar
//! tables — LLM prompts/completions and oversized log bodies.
//!
//! Blobs are keyed by `sha256(content)` and stored Snappy-compressed (the
//! `snap` crate's raw format) at `blobs/<aa>/<bb>/<full-sha256>`, where
//! `<aa>` and `<bb>` are the first and second pairs of hex characters of the
//! digest. Identical content (e.g. a system prompt reused across thousands of
//! calls, or a repeated stack trace) is stored once — the write is skipped
//! when the hash already exists. See `docs/tael-backend-design.md` →
//! "Payload blobs".
9
10use std::path::{Path, PathBuf};
11
12use anyhow::{Context, Result};
13use sha2::{Digest, Sha256};
14
/// A local-filesystem content-addressed blob store. The same path layout is a
/// valid object-store key prefix, so a future S3/R2 backend is a swap behind
/// this type (design Phase 9).
///
/// Derives `Debug` so the store can appear in diagnostics and inside other
/// `#[derive(Debug)]` types.
#[derive(Debug)]
pub struct BlobStore {
    /// Directory that holds the shard tree (`<data_dir>/blobs`).
    root: PathBuf,
}
21
22impl BlobStore {
23    /// Open (creating if needed) the blob store rooted at `<data_dir>/blobs`.
24    pub fn new(data_dir: &str) -> Result<Self> {
25        let root = Path::new(data_dir).join("blobs");
26        std::fs::create_dir_all(&root)
27            .with_context(|| format!("creating blob dir {}", root.display()))?;
28        Ok(Self { root })
29    }
30
31    /// Hash `content`, write it snap-compressed if not already present, and
32    /// return the hex sha256. Idempotent: re-putting identical content is a
33    /// cheap no-op (the dedup property).
34    pub fn put(&self, content: &[u8]) -> Result<String> {
35        let hash = hex::encode(Sha256::digest(content));
36        let path = self.path_for(&hash);
37        if path.exists() {
38            return Ok(hash);
39        }
40        let compressed = snap::raw::Encoder::new()
41            .compress_vec(content)
42            .context("compressing blob")?;
43        // Parent dirs: blobs/<aa>/<bb>/
44        if let Some(parent) = path.parent() {
45            std::fs::create_dir_all(parent)
46                .with_context(|| format!("creating blob shard {}", parent.display()))?;
47        }
48        // Write to a temp file then rename, so a concurrent reader never sees a
49        // half-written blob.
50        let tmp = path.with_extension("tmp");
51        std::fs::write(&tmp, &compressed)
52            .with_context(|| format!("writing blob {}", tmp.display()))?;
53        std::fs::rename(&tmp, &path)
54            .with_context(|| format!("finalizing blob {}", path.display()))?;
55        Ok(hash)
56    }
57
58    /// Fetch and decompress a blob by hex sha256. `Ok(None)` if it doesn't
59    /// exist (e.g. GC'd under retention).
60    pub fn get(&self, hash: &str) -> Result<Option<Vec<u8>>> {
61        let path = self.path_for(hash);
62        match std::fs::read(&path) {
63            Ok(compressed) => {
64                let content = snap::raw::Decoder::new()
65                    .decompress_vec(&compressed)
66                    .context("decompressing blob")?;
67                Ok(Some(content))
68            }
69            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
70            Err(e) => Err(e).with_context(|| format!("reading blob {hash}")),
71        }
72    }
73
74    /// List every stored blob hash (walks the shard tree).
75    pub fn list_hashes(&self) -> Result<Vec<String>> {
76        let mut out = Vec::new();
77        collect_hashes(&self.root, &mut out)?;
78        Ok(out)
79    }
80
81    /// Delete a blob by hash. Missing blobs are a no-op.
82    pub fn remove(&self, hash: &str) -> Result<()> {
83        match std::fs::remove_file(self.path_for(hash)) {
84            Ok(()) => Ok(()),
85            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
86            Err(e) => Err(e).with_context(|| format!("removing blob {hash}")),
87        }
88    }
89
90    /// Mark-and-sweep GC: delete every blob whose hash is not in `live`.
91    /// Returns the number of blobs removed. (Blobs are written before their
92    /// referencing row, so an orphan = a row that never landed, or a row that
93    /// retention has since dropped — both safe to collect.)
94    pub fn gc(&self, live: &std::collections::HashSet<String>) -> Result<usize> {
95        let mut removed = 0;
96        for hash in self.list_hashes()? {
97            if !live.contains(&hash) {
98                self.remove(&hash)?;
99                removed += 1;
100            }
101        }
102        Ok(removed)
103    }
104
105    /// `blobs/<aa>/<bb>/<hash>` — two-level sharding to keep directory sizes
106    /// sane. Falls back to a flat path for pathologically short hashes.
107    fn path_for(&self, hash: &str) -> PathBuf {
108        if hash.len() >= 4 {
109            self.root.join(&hash[0..2]).join(&hash[2..4]).join(hash)
110        } else {
111            self.root.join(hash)
112        }
113    }
114}
115
116/// Collect blob hashes (leaf filenames, skipping temp files) under `dir`.
117fn collect_hashes(dir: &Path, out: &mut Vec<String>) -> Result<()> {
118    if !dir.exists() {
119        return Ok(());
120    }
121    for entry in std::fs::read_dir(dir)? {
122        let path = entry?.path();
123        if path.is_dir() {
124            collect_hashes(&path, out)?;
125        } else if path.extension().and_then(|e| e.to_str()) != Some("tmp") {
126            if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
127                out.push(name.to_string());
128            }
129        }
130    }
131    Ok(())
132}
133
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a store inside a fresh temp directory. The `TempDir` guard is
    /// returned alongside it so the directory lives as long as the test.
    fn store() -> (BlobStore, tempfile::TempDir) {
        let tmp = tempfile::tempdir().unwrap();
        let data_dir = tmp.path().to_str().unwrap();
        let blobs = BlobStore::new(data_dir).unwrap();
        (blobs, tmp)
    }

    /// Count non-`.tmp` files anywhere under `dir`.
    fn walk_count(dir: &Path) -> usize {
        std::fs::read_dir(dir)
            .unwrap()
            .map(|entry| entry.unwrap().path())
            .map(|p| {
                if p.is_dir() {
                    walk_count(&p)
                } else if p.extension().and_then(|e| e.to_str()) == Some("tmp") {
                    0
                } else {
                    1
                }
            })
            .sum()
    }

    #[test]
    fn put_then_get_round_trips() {
        let (blobs, _guard) = store();
        let payload: &[u8] = b"You are a helpful assistant.";
        let key = blobs.put(payload).unwrap();
        // A sha256 digest is 32 bytes, i.e. 64 hex characters.
        assert_eq!(key.len(), 64);
        let fetched = blobs.get(&key).unwrap();
        assert_eq!(fetched.as_deref(), Some(payload));
    }

    #[test]
    fn identical_content_dedups_to_one_file() {
        let (blobs, _guard) = store();
        let first = blobs.put(b"same system prompt").unwrap();
        let second = blobs.put(b"same system prompt").unwrap();
        assert_eq!(first, second);
        // Exactly one blob file exists on disk.
        assert_eq!(
            walk_count(&blobs.root),
            1,
            "duplicate content should produce one blob"
        );
    }

    #[test]
    fn distinct_content_distinct_hashes() {
        let (blobs, _guard) = store();
        let a = blobs.put(b"prompt a").unwrap();
        let b = blobs.put(b"prompt b").unwrap();
        assert_ne!(a, b);
    }

    #[test]
    fn missing_hash_returns_none() {
        let (blobs, _guard) = store();
        let absent = "0".repeat(64);
        assert!(blobs.get(&absent).unwrap().is_none());
    }

    #[test]
    fn gc_removes_only_unreferenced_blobs() {
        use std::collections::HashSet;

        let (blobs, _guard) = store();
        let keep = blobs.put(b"referenced prompt").unwrap();
        let _orphan = blobs.put(b"orphaned prompt").unwrap();
        assert_eq!(blobs.list_hashes().unwrap().len(), 2);

        let live = HashSet::from([keep.clone()]);
        assert_eq!(blobs.gc(&live).unwrap(), 1);
        assert_eq!(blobs.list_hashes().unwrap(), vec![keep.clone()]);
        assert!(blobs.get(&keep).unwrap().is_some());
    }
}