Skip to main content

tael_server/storage/
blobs.rs

1//! Content-addressed blob store for large payloads kept out of the columnar
2//! tables — LLM prompts/completions and oversized log bodies.
3//!
4//! Blobs are keyed by `sha256(content)` and stored snap-compressed at
5//! `<aa>/<bb>/<full-sha256>`. Identical content (e.g. a system prompt reused
6//! across thousands of calls, or a repeated stack trace) is stored once — the
7//! write is skipped when the hash already exists. See
8//! `docs/tael-backend-design.md` → "Payload blobs".
9//!
10//! Storage sits on the shared [`ObjectBackend`](super::ObjectBackend): a local
11//! directory by default (`<data_dir>/blobs`, overridable via `TAEL_BLOB_DIR`),
12//! or a GCS bucket under the `cloud` feature. Content-addressing makes object
13//! storage ideal — puts are idempotent and dedup is cross-node (two shards
14//! writing the same payload collapse to one object).
15
16use std::path::{Path, PathBuf};
17
18use anyhow::{Context, Result};
19use sha2::{Digest, Sha256};
20
21use super::objstore::{DynObjectBackend, FsBackend};
22
23/// A content-addressed blob store over a pluggable object backend.
24pub struct BlobStore {
25    backend: DynObjectBackend,
26}
27
28impl BlobStore {
29    /// Open the blob store on the local filesystem at `<data_dir>/blobs`, or at
30    /// `TAEL_BLOB_DIR` when set (e.g. a separate fast mount). This is the
31    /// default, single-binary path.
32    pub fn new(data_dir: &str) -> Result<Self> {
33        let root = match std::env::var("TAEL_BLOB_DIR") {
34            Ok(dir) if !dir.trim().is_empty() => PathBuf::from(dir),
35            _ => Path::new(data_dir).join("blobs"),
36        };
37        Self::with_backend(std::sync::Arc::new(FsBackend::new(root)?))
38    }
39
40    /// Open the blob store on an arbitrary object backend (e.g. GCS). The path
41    /// layout is identical, so the backend is a transparent swap.
42    pub fn with_backend(backend: DynObjectBackend) -> Result<Self> {
43        Ok(Self { backend })
44    }
45
46    /// Hash `content`, write it snap-compressed if not already present, and
47    /// return the hex sha256. Idempotent: re-putting identical content is a
48    /// cheap no-op (the dedup property).
49    pub fn put(&self, content: &[u8]) -> Result<String> {
50        let hash = hex::encode(Sha256::digest(content));
51        let key = key_for(&hash);
52        if self.backend.exists(&key)? {
53            return Ok(hash);
54        }
55        let compressed = snap::raw::Encoder::new()
56            .compress_vec(content)
57            .context("compressing blob")?;
58        self.backend.put(&key, &compressed)?;
59        Ok(hash)
60    }
61
62    /// Fetch and decompress a blob by hex sha256. `Ok(None)` if it doesn't
63    /// exist (e.g. GC'd under retention).
64    pub fn get(&self, hash: &str) -> Result<Option<Vec<u8>>> {
65        match self.backend.get(&key_for(hash))? {
66            Some(compressed) => {
67                let content = snap::raw::Decoder::new()
68                    .decompress_vec(&compressed)
69                    .context("decompressing blob")?;
70                Ok(Some(content))
71            }
72            None => Ok(None),
73        }
74    }
75
76    /// List every stored blob hash.
77    pub fn list_hashes(&self) -> Result<Vec<String>> {
78        Ok(self
79            .backend
80            .list("")?
81            .iter()
82            .filter_map(|key| key.rsplit('/').next().map(str::to_string))
83            .collect())
84    }
85
86    /// Delete a blob by hash. Missing blobs are a no-op.
87    pub fn remove(&self, hash: &str) -> Result<()> {
88        self.backend.delete(&key_for(hash))
89    }
90
91    /// Mark-and-sweep GC: delete every blob whose hash is not in `live`.
92    /// Returns the number of blobs removed. (Blobs are written before their
93    /// referencing row, so an orphan = a row that never landed, or a row that
94    /// retention has since dropped — both safe to collect.)
95    ///
96    /// **Single-owner contract:** on a shared object store this must run on
97    /// exactly one node with a `live` set spanning all shards, or it will
98    /// delete blobs other shards still reference. `lib.rs` enforces this by
99    /// disabling per-node GC when the blob store is shared (GCS) unless this
100    /// node is the designated coordinator.
101    pub fn gc(&self, live: &std::collections::HashSet<String>) -> Result<usize> {
102        let mut removed = 0;
103        for hash in self.list_hashes()? {
104            if !live.contains(&hash) {
105                self.remove(&hash)?;
106                removed += 1;
107            }
108        }
109        Ok(removed)
110    }
111}
112
/// `<aa>/<bb>/<hash>` — two-level sharding to keep listing/dir sizes sane.
///
/// `aa` and `bb` are the first and second byte pairs of `hash`. Falls back to
/// a flat key (the hash itself) when those pairs cannot be taken: the input is
/// shorter than four bytes, or byte offset 2 or 4 falls inside a multi-byte
/// character. Plain `&hash[0..2]` indexing would panic in the latter case, so
/// the checked `str::get` is used instead.
///
/// This function does not validate or sanitize `hash`; it only builds the key.
fn key_for(hash: &str) -> String {
    match (hash.get(0..2), hash.get(2..4)) {
        (Some(outer), Some(inner)) => format!("{outer}/{inner}/{hash}"),
        _ => hash.to_string(),
    }
}
122
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{Mutex, MutexGuard};

    /// Serializes every read and write of `TAEL_BLOB_DIR` made by this module.
    /// The test harness runs tests on parallel threads, and `BlobStore::new`
    /// reads the variable, so the override test must not overlap with any
    /// other test's store construction.
    static ENV_LOCK: Mutex<()> = Mutex::new(());

    /// Take the environment lock. Poisoning is ignored: it only means another
    /// test panicked, and the lock guards no data of its own.
    fn env_lock() -> MutexGuard<'static, ()> {
        ENV_LOCK
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
    }

    /// Open a store rooted in a fresh temp dir. The `TempDir` is returned so
    /// the directory outlives the store for the duration of the test.
    fn store() -> (BlobStore, tempfile::TempDir) {
        let dir = tempfile::tempdir().unwrap();
        let s = {
            // Held only while `new` reads the environment.
            let _env = env_lock();
            BlobStore::new(dir.path().to_str().unwrap()).unwrap()
        };
        (s, dir)
    }

    #[test]
    fn put_then_get_round_trips() {
        let (s, _d) = store();
        let content = b"You are a helpful assistant.";
        let hash = s.put(content).unwrap();
        assert_eq!(hash.len(), 64); // sha256 hex
        assert_eq!(s.get(&hash).unwrap().as_deref(), Some(&content[..]));
    }

    #[test]
    fn identical_content_dedups_to_one_file() {
        let (s, _d) = store();
        let h1 = s.put(b"same system prompt").unwrap();
        let h2 = s.put(b"same system prompt").unwrap();
        assert_eq!(h1, h2);
        // Exactly one blob exists.
        assert_eq!(
            s.list_hashes().unwrap().len(),
            1,
            "duplicate content should produce one blob"
        );
    }

    #[test]
    fn distinct_content_distinct_hashes() {
        let (s, _d) = store();
        assert_ne!(s.put(b"prompt a").unwrap(), s.put(b"prompt b").unwrap());
    }

    #[test]
    fn missing_hash_returns_none() {
        let (s, _d) = store();
        assert!(s.get(&"0".repeat(64)).unwrap().is_none());
    }

    #[test]
    fn gc_removes_only_unreferenced_blobs() {
        use std::collections::HashSet;
        let (s, _d) = store();
        let keep = s.put(b"referenced prompt").unwrap();
        let _drop = s.put(b"orphaned prompt").unwrap();
        assert_eq!(s.list_hashes().unwrap().len(), 2);

        let live: HashSet<String> = [keep.clone()].into_iter().collect();
        let removed = s.gc(&live).unwrap();
        assert_eq!(removed, 1);
        assert_eq!(s.list_hashes().unwrap(), vec![keep.clone()]);
        assert!(s.get(&keep).unwrap().is_some());
    }

    #[test]
    fn blob_dir_env_override_relocates_store() {
        let data = tempfile::tempdir().unwrap();
        let blobs = tempfile::tempdir().unwrap();
        let s = {
            let _env = env_lock();
            // SAFETY: every access to the environment made by this module goes
            // through `ENV_LOCK`, which is held for the whole set/read/restore
            // window. NOTE(review): tests in other modules of the same test
            // binary are not covered by this lock.
            unsafe { std::env::set_var("TAEL_BLOB_DIR", blobs.path()) };
            let opened = BlobStore::new(data.path().to_str().unwrap());
            // Restore before unwrapping so a failed open cannot leak the
            // override into later tests.
            // SAFETY: same invariant as above; the lock is still held.
            unsafe { std::env::remove_var("TAEL_BLOB_DIR") };
            opened.unwrap()
        };
        let hash = s.put(b"relocated").unwrap();
        assert!(s.get(&hash).unwrap().is_some());
        // Lands under the override dir, not <data_dir>/blobs.
        assert!(
            std::fs::read_dir(blobs.path()).unwrap().next().is_some(),
            "the blob should be written under TAEL_BLOB_DIR"
        );
        assert!(
            !data.path().join("blobs").exists(),
            "nothing should be written under data_dir when TAEL_BLOB_DIR is set"
        );
    }
}