tael_server/storage/
blobs.rs1use std::path::{Path, PathBuf};
11
12use anyhow::{Context, Result};
13use sha2::{Digest, Sha256};
14
/// Content-addressed blob store backed by a directory on disk.
///
/// Each blob is stored compressed in a file named after the hex SHA-256 digest
/// of its uncompressed content, so identical content is stored only once.
#[derive(Debug)]
pub struct BlobStore {
    /// Directory under which all blob files (and their shard directories) live.
    root: PathBuf,
}
21
22impl BlobStore {
23 pub fn new(data_dir: &str) -> Result<Self> {
25 let root = Path::new(data_dir).join("blobs");
26 std::fs::create_dir_all(&root)
27 .with_context(|| format!("creating blob dir {}", root.display()))?;
28 Ok(Self { root })
29 }
30
31 pub fn put(&self, content: &[u8]) -> Result<String> {
35 let hash = hex::encode(Sha256::digest(content));
36 let path = self.path_for(&hash);
37 if path.exists() {
38 return Ok(hash);
39 }
40 let compressed = snap::raw::Encoder::new()
41 .compress_vec(content)
42 .context("compressing blob")?;
43 if let Some(parent) = path.parent() {
45 std::fs::create_dir_all(parent)
46 .with_context(|| format!("creating blob shard {}", parent.display()))?;
47 }
48 let tmp = path.with_extension("tmp");
51 std::fs::write(&tmp, &compressed)
52 .with_context(|| format!("writing blob {}", tmp.display()))?;
53 std::fs::rename(&tmp, &path)
54 .with_context(|| format!("finalizing blob {}", path.display()))?;
55 Ok(hash)
56 }
57
58 pub fn get(&self, hash: &str) -> Result<Option<Vec<u8>>> {
61 let path = self.path_for(hash);
62 match std::fs::read(&path) {
63 Ok(compressed) => {
64 let content = snap::raw::Decoder::new()
65 .decompress_vec(&compressed)
66 .context("decompressing blob")?;
67 Ok(Some(content))
68 }
69 Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
70 Err(e) => Err(e).with_context(|| format!("reading blob {hash}")),
71 }
72 }
73
74 pub fn list_hashes(&self) -> Result<Vec<String>> {
76 let mut out = Vec::new();
77 collect_hashes(&self.root, &mut out)?;
78 Ok(out)
79 }
80
81 pub fn remove(&self, hash: &str) -> Result<()> {
83 match std::fs::remove_file(self.path_for(hash)) {
84 Ok(()) => Ok(()),
85 Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
86 Err(e) => Err(e).with_context(|| format!("removing blob {hash}")),
87 }
88 }
89
90 pub fn gc(&self, live: &std::collections::HashSet<String>) -> Result<usize> {
95 let mut removed = 0;
96 for hash in self.list_hashes()? {
97 if !live.contains(&hash) {
98 self.remove(&hash)?;
99 removed += 1;
100 }
101 }
102 Ok(removed)
103 }
104
105 fn path_for(&self, hash: &str) -> PathBuf {
108 if hash.len() >= 4 {
109 self.root.join(&hash[0..2]).join(&hash[2..4]).join(hash)
110 } else {
111 self.root.join(hash)
112 }
113 }
114}
115
116fn collect_hashes(dir: &Path, out: &mut Vec<String>) -> Result<()> {
118 if !dir.exists() {
119 return Ok(());
120 }
121 for entry in std::fs::read_dir(dir)? {
122 let path = entry?.path();
123 if path.is_dir() {
124 collect_hashes(&path, out)?;
125 } else if path.extension().and_then(|e| e.to_str()) != Some("tmp") {
126 if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
127 out.push(name.to_string());
128 }
129 }
130 }
131 Ok(())
132}
133
#[cfg(test)]
mod tests {
    use super::*;

    /// Creates a store inside a fresh temporary directory. The directory guard
    /// is returned alongside so the directory stays alive for the whole test.
    fn store() -> (BlobStore, tempfile::TempDir) {
        let tmp = tempfile::tempdir().unwrap();
        let data_dir = tmp.path().to_str().unwrap();
        let blobs = BlobStore::new(data_dir).unwrap();
        (blobs, tmp)
    }

    /// Counts the files below `dir` (recursively), ignoring `.tmp` files.
    fn walk_count(dir: &Path) -> usize {
        std::fs::read_dir(dir)
            .unwrap()
            .map(|entry| entry.unwrap().path())
            .map(|path| {
                if path.is_dir() {
                    walk_count(&path)
                } else if path.extension().and_then(|ext| ext.to_str()) != Some("tmp") {
                    1
                } else {
                    0
                }
            })
            .sum()
    }

    #[test]
    fn put_then_get_round_trips() {
        let (blobs, _guard) = store();
        let payload = b"You are a helpful assistant.";

        let digest = blobs.put(payload).unwrap();
        assert_eq!(digest.len(), 64);

        let fetched = blobs.get(&digest).unwrap();
        assert_eq!(fetched.as_deref(), Some(&payload[..]));
    }

    #[test]
    fn identical_content_dedups_to_one_file() {
        let (blobs, _guard) = store();

        let first = blobs.put(b"same system prompt").unwrap();
        let second = blobs.put(b"same system prompt").unwrap();
        assert_eq!(first, second);

        let files_on_disk = walk_count(&blobs.root);
        assert_eq!(files_on_disk, 1, "duplicate content should produce one blob");
    }

    #[test]
    fn distinct_content_distinct_hashes() {
        let (blobs, _guard) = store();

        let first = blobs.put(b"prompt a").unwrap();
        let second = blobs.put(b"prompt b").unwrap();
        assert_ne!(first, second);
    }

    #[test]
    fn missing_hash_returns_none() {
        let (blobs, _guard) = store();
        let absent = "0".repeat(64);

        let looked_up = blobs.get(&absent).unwrap();
        assert!(looked_up.is_none());
    }

    #[test]
    fn gc_removes_only_unreferenced_blobs() {
        use std::collections::HashSet;

        let (blobs, _guard) = store();
        let keep = blobs.put(b"referenced prompt").unwrap();
        let _orphan = blobs.put(b"orphaned prompt").unwrap();
        assert_eq!(blobs.list_hashes().unwrap().len(), 2);

        // Only `keep` is referenced, so exactly the orphan must be collected.
        let mut live: HashSet<String> = HashSet::new();
        live.insert(keep.clone());
        let removed = blobs.gc(&live).unwrap();
        assert_eq!(removed, 1);

        let remaining = blobs.list_hashes().unwrap();
        assert_eq!(remaining, vec![keep.clone()]);
        assert!(blobs.get(&keep).unwrap().is_some());
    }
}