1use crate::digest::Digest256;
13use crate::error::{Error, Result};
14
15use dashmap::DashMap;
16use parking_lot::Mutex;
17use std::fs;
18use std::io::{Read, Write};
19use std::path::{Path, PathBuf};
20
/// Compression level handed to `zstd::encode_all` when the filesystem store
/// writes a blob to disk.
pub const ZSTD_LEVEL: i32 = 19;
24
25pub trait BlobStore: Send + Sync {
31 fn put(&self, bytes: &[u8]) -> Result<Digest256>;
35
36 fn get(&self, digest: &Digest256) -> Result<Vec<u8>>;
38
39 fn contains(&self, digest: &Digest256) -> Result<bool>;
41
42 fn physical_bytes(&self) -> Result<u64>;
46}
47
48#[derive(Debug)]
68pub struct FsBlobStore {
69 root: PathBuf,
70 counter: Mutex<u64>,
72}
73
74impl FsBlobStore {
75 pub fn open(root: impl AsRef<Path>) -> Result<Self> {
79 let root = root.as_ref().to_path_buf();
80 let blobs = root.join("blobs").join("sha256");
81 fs::create_dir_all(&blobs)?;
82 Ok(Self {
83 root,
84 counter: Mutex::new(0),
85 })
86 }
87
88 pub fn root(&self) -> &Path {
90 &self.root
91 }
92
93 fn path_for(&self, digest: &Digest256) -> Result<PathBuf> {
96 let hex = digest.hex();
97 let shard = &hex[..2];
98 let dir = self.root.join("blobs").join("sha256").join(shard);
99 fs::create_dir_all(&dir)?;
100 Ok(dir.join(format!("{hex}.zst")))
101 }
102
103 fn walk_size(dir: &Path) -> Result<u64> {
105 let mut total = 0u64;
106 if !dir.exists() {
107 return Ok(0);
108 }
109 for entry in fs::read_dir(dir)? {
110 let entry = entry?;
111 let ty = entry.file_type()?;
112 if ty.is_dir() {
113 total = total.saturating_add(Self::walk_size(&entry.path())?);
114 } else if ty.is_file() {
115 total = total.saturating_add(entry.metadata()?.len());
116 }
117 }
118 Ok(total)
119 }
120}
121
122impl BlobStore for FsBlobStore {
123 fn put(&self, bytes: &[u8]) -> Result<Digest256> {
124 let digest = Digest256::of(bytes);
125 let final_path = self.path_for(&digest)?;
126
127 if final_path.exists() {
129 return Ok(digest);
130 }
131
132 let counter = {
136 let mut g = self.counter.lock();
137 *g = g.wrapping_add(1);
138 *g
139 };
140 let pid = std::process::id();
141 let tmp_path = final_path.with_extension(format!("tmp.{pid}.{counter}"));
142
143 let compressed = zstd::encode_all(bytes, ZSTD_LEVEL)?;
147
148 {
150 let mut f = fs::File::create(&tmp_path)?;
151 f.write_all(&compressed)?;
152 f.sync_all()?;
153 }
154 match fs::rename(&tmp_path, &final_path) {
155 Ok(()) => Ok(digest),
156 Err(e) => {
157 let _ = fs::remove_file(&tmp_path);
159 Err(e.into())
160 }
161 }
162 }
163
164 fn get(&self, digest: &Digest256) -> Result<Vec<u8>> {
165 let path = self.path_for(digest)?;
166 let mut f = fs::File::open(&path)?;
167 let mut compressed = Vec::new();
168 f.read_to_end(&mut compressed)?;
169 let bytes = zstd::decode_all(compressed.as_slice())?;
170
171 let observed = Digest256::of(&bytes);
174 if &observed != digest {
175 return Err(Error::Integrity(format!(
176 "blob {digest} on disk hashes to {observed}"
177 )));
178 }
179 Ok(bytes)
180 }
181
182 fn contains(&self, digest: &Digest256) -> Result<bool> {
183 Ok(self.path_for(digest)?.exists())
184 }
185
186 fn physical_bytes(&self) -> Result<u64> {
187 Self::walk_size(&self.root.join("blobs"))
188 }
189}
190
191#[derive(Debug, Default)]
197pub struct MemBlobStore {
198 inner: DashMap<Digest256, Vec<u8>>,
199}
200
201impl MemBlobStore {
202 pub fn new() -> Self {
204 Self::default()
205 }
206}
207
208impl BlobStore for MemBlobStore {
209 fn put(&self, bytes: &[u8]) -> Result<Digest256> {
210 let d = Digest256::of(bytes);
211 self.inner
212 .entry(d.clone())
213 .or_insert_with(|| bytes.to_vec());
214 Ok(d)
215 }
216
217 fn get(&self, digest: &Digest256) -> Result<Vec<u8>> {
218 self.inner
219 .get(digest)
220 .map(|r| r.value().clone())
221 .ok_or_else(|| Error::Integrity(format!("not in mem store: {digest}")))
222 }
223
224 fn contains(&self, digest: &Digest256) -> Result<bool> {
225 Ok(self.inner.contains_key(digest))
226 }
227
228 fn physical_bytes(&self) -> Result<u64> {
229 Ok(self
230 .inner
231 .iter()
232 .map(|kv| kv.value().len() as u64)
233 .sum::<u64>())
234 }
235}
236
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Opens a filesystem store in a fresh temporary directory.
    ///
    /// The `TempDir` is returned alongside the store so the directory stays
    /// alive for the duration of the test.
    fn temp_store() -> (TempDir, FsBlobStore) {
        let tmp = TempDir::new().unwrap();
        let store = FsBlobStore::open(tmp.path()).unwrap();
        (tmp, store)
    }

    /// A blob written to disk comes back byte-for-byte.
    #[test]
    fn fs_round_trip_byte_identical() {
        let (_tmp, store) = temp_store();
        let payload = b"the quick brown fox".to_vec();

        let digest = store.put(&payload).unwrap();

        assert_eq!(digest, Digest256::of(&payload));
        assert!(store.contains(&digest).unwrap());
        assert_eq!(store.get(&digest).unwrap(), payload);
    }

    /// Writing the same content twice does not grow the store.
    #[test]
    fn fs_dedupes_identical_writes() {
        let (_tmp, store) = temp_store();
        let payload = vec![0xABu8; 1024];

        let first = store.put(&payload).unwrap();
        let bytes_after_first = store.physical_bytes().unwrap();
        let second = store.put(&payload).unwrap();
        let bytes_after_second = store.physical_bytes().unwrap();

        assert_eq!(first, second);
        assert_eq!(
            bytes_after_first, bytes_after_second,
            "second put must be a no-op"
        );
    }

    /// Overwriting a blob file with junk makes `get` fail instead of
    /// returning wrong data.
    #[test]
    fn fs_detects_corruption() {
        let (_tmp, store) = temp_store();
        let digest = store.put(b"original").unwrap();
        let blob_file = store.path_for(&digest).unwrap();
        // zstd magic number followed by bytes that are not a valid frame.
        fs::write(&blob_file, b"\x28\xb5\x2f\xfd\x00garbage").unwrap();

        let failure = store.get(&digest).unwrap_err();

        assert!(matches!(failure, Error::Integrity(_) | Error::Io(_)));
    }

    /// The in-memory store returns what was put into it.
    #[test]
    fn mem_round_trip() {
        let store = MemBlobStore::new();
        let digest = store.put(b"hello").unwrap();

        assert_eq!(store.get(&digest).unwrap(), b"hello".to_vec());
        assert!(store.contains(&digest).unwrap());
    }

    /// Blob files live in a directory named after the first two hex digits
    /// of their digest.
    #[test]
    fn fs_sharding_uses_first_two_hex_chars() {
        let (_tmp, store) = temp_store();
        let digest = store.put(b"sharding test").unwrap();
        let blob_file = store.path_for(&digest).unwrap();

        let shard_dir = blob_file
            .parent()
            .and_then(Path::file_name)
            .and_then(|name| name.to_str())
            .unwrap();

        let hex = digest.hex();
        assert_eq!(shard_dir, &hex[..2]);
    }
}