embeddenator_fs/fs/versioned/
chunk_store.rs1use super::chunk::VersionedChunk;
10use super::types::{ChunkId, VersionMismatch, VersionedResult};
11use std::collections::HashMap;
12use std::sync::atomic::{AtomicU64, Ordering};
13use std::sync::{Arc, RwLock};
14
15pub struct VersionedChunkStore {
37 chunks: Arc<RwLock<HashMap<ChunkId, Arc<VersionedChunk>>>>,
39
40 global_version: Arc<AtomicU64>,
43
44 hash_index: Arc<RwLock<HashMap<[u8; 8], ChunkId>>>,
47}
48
49impl VersionedChunkStore {
50 pub fn new() -> Self {
52 Self {
53 chunks: Arc::new(RwLock::new(HashMap::new())),
54 global_version: Arc::new(AtomicU64::new(0)),
55 hash_index: Arc::new(RwLock::new(HashMap::new())),
56 }
57 }
58
59 pub fn version(&self) -> u64 {
61 self.global_version.load(Ordering::Acquire)
62 }
63
64 pub fn get(&self, chunk_id: ChunkId) -> Option<(Arc<VersionedChunk>, u64)> {
69 let chunks = self.chunks.read().unwrap();
70 let version = self.version();
71 chunks
72 .get(&chunk_id)
73 .map(|chunk| (Arc::clone(chunk), version))
74 }
75
76 pub fn find_by_hash(&self, content_hash: &[u8; 8]) -> Option<(ChunkId, Arc<VersionedChunk>)> {
78 let hash_index = self.hash_index.read().unwrap();
79 let chunks = self.chunks.read().unwrap();
80
81 hash_index.get(content_hash).and_then(|&chunk_id| {
82 chunks
83 .get(&chunk_id)
84 .map(|chunk| (chunk_id, Arc::clone(chunk)))
85 })
86 }
87
88 pub fn insert(
93 &self,
94 chunk_id: ChunkId,
95 chunk: VersionedChunk,
96 expected_version: u64,
97 ) -> VersionedResult<u64> {
98 let mut chunks = self.chunks.write().unwrap();
99 let mut hash_index = self.hash_index.write().unwrap();
100
101 let current_version = self.version();
103 if current_version != expected_version {
104 return Err(VersionMismatch {
105 expected: expected_version,
106 actual: current_version,
107 });
108 }
109
110 hash_index.insert(chunk.content_hash, chunk_id);
112
113 chunks.insert(chunk_id, Arc::new(chunk));
115
116 let new_version = self.global_version.fetch_add(1, Ordering::AcqRel) + 1;
118 Ok(new_version)
119 }
120
121 pub fn batch_insert(
126 &self,
127 updates: Vec<(ChunkId, VersionedChunk)>,
128 expected_version: u64,
129 ) -> VersionedResult<u64> {
130 let mut chunks = self.chunks.write().unwrap();
131 let mut hash_index = self.hash_index.write().unwrap();
132
133 let current_version = self.version();
135 if current_version != expected_version {
136 return Err(VersionMismatch {
137 expected: expected_version,
138 actual: current_version,
139 });
140 }
141
142 for (chunk_id, chunk) in updates {
144 hash_index.insert(chunk.content_hash, chunk_id);
145 chunks.insert(chunk_id, Arc::new(chunk));
146 }
147
148 let new_version = self.global_version.fetch_add(1, Ordering::AcqRel) + 1;
150 Ok(new_version)
151 }
152
153 pub fn batch_insert_new(
161 &self,
162 updates: Vec<(ChunkId, VersionedChunk)>,
163 ) -> VersionedResult<u64> {
164 let mut chunks = self.chunks.write().unwrap();
165 let mut hash_index = self.hash_index.write().unwrap();
166
167 for (chunk_id, chunk) in updates {
169 hash_index.insert(chunk.content_hash, chunk_id);
170 chunks.insert(chunk_id, Arc::new(chunk));
171 }
172
173 let new_version = self.global_version.fetch_add(1, Ordering::AcqRel) + 1;
175 Ok(new_version)
176 }
177
178 pub fn remove(
182 &self,
183 chunk_id: ChunkId,
184 expected_version: u64,
185 ) -> VersionedResult<Option<Arc<VersionedChunk>>> {
186 let mut chunks = self.chunks.write().unwrap();
187 let mut hash_index = self.hash_index.write().unwrap();
188
189 let current_version = self.version();
191 if current_version != expected_version {
192 return Err(VersionMismatch {
193 expected: expected_version,
194 actual: current_version,
195 });
196 }
197
198 let removed = chunks.remove(&chunk_id);
200 if let Some(ref chunk) = removed {
201 hash_index.remove(&chunk.content_hash);
202 }
203
204 self.global_version.fetch_add(1, Ordering::AcqRel);
206 Ok(removed)
207 }
208
209 pub fn len(&self) -> usize {
211 self.chunks.read().unwrap().len()
212 }
213
214 pub fn is_empty(&self) -> bool {
216 self.chunks.read().unwrap().is_empty()
217 }
218
219 pub fn chunk_ids(&self) -> Vec<ChunkId> {
221 self.chunks.read().unwrap().keys().copied().collect()
222 }
223
224 pub fn iter(&self) -> Vec<(ChunkId, Arc<VersionedChunk>)> {
229 self.chunks
230 .read()
231 .unwrap()
232 .iter()
233 .map(|(&id, chunk)| (id, Arc::clone(chunk)))
234 .collect()
235 }
236
237 pub fn gc(&self, expected_version: u64) -> VersionedResult<usize> {
241 let mut chunks = self.chunks.write().unwrap();
242 let mut hash_index = self.hash_index.write().unwrap();
243
244 let current_version = self.version();
246 if current_version != expected_version {
247 return Err(VersionMismatch {
248 expected: expected_version,
249 actual: current_version,
250 });
251 }
252
253 let to_remove: Vec<ChunkId> = chunks
255 .iter()
256 .filter(|(_, chunk)| chunk.is_unreferenced())
257 .map(|(id, _)| *id)
258 .collect();
259
260 let count = to_remove.len();
261
262 for chunk_id in to_remove {
264 if let Some(chunk) = chunks.remove(&chunk_id) {
265 hash_index.remove(&chunk.content_hash);
266 }
267 }
268
269 if count > 0 {
271 self.global_version.fetch_add(1, Ordering::AcqRel);
272 }
273
274 Ok(count)
275 }
276
277 pub fn stats(&self) -> CodebookStats {
279 let chunks = self.chunks.read().unwrap();
280
281 let total_chunks = chunks.len();
282 let mut total_refs = 0u64;
283 let mut unreferenced = 0;
284 let mut total_size = 0usize;
285
286 for chunk in chunks.values() {
287 let refs = chunk.ref_count();
288 total_refs += refs as u64;
289 if refs == 0 {
290 unreferenced += 1;
291 }
292 total_size += chunk.original_size;
293 }
294
295 CodebookStats {
296 total_chunks,
297 unreferenced_chunks: unreferenced,
298 total_references: total_refs,
299 avg_references: if total_chunks > 0 {
300 total_refs as f64 / total_chunks as f64
301 } else {
302 0.0
303 },
304 total_size_bytes: total_size,
305 version: self.version(),
306 }
307 }
308}
309
310impl Default for VersionedChunkStore {
311 fn default() -> Self {
312 Self::new()
313 }
314}
315
316impl Clone for VersionedChunkStore {
317 fn clone(&self) -> Self {
318 Self {
319 chunks: Arc::clone(&self.chunks),
320 global_version: Arc::clone(&self.global_version),
321 hash_index: Arc::clone(&self.hash_index),
322 }
323 }
324}
325
/// Point-in-time aggregate statistics for a `VersionedChunkStore`.
#[derive(Debug, Clone, PartialEq)]
pub struct CodebookStats {
    /// Number of chunks stored.
    pub total_chunks: usize,
    /// Number of chunks whose reference count was zero.
    pub unreferenced_chunks: usize,
    /// Sum of all chunks' reference counts.
    pub total_references: u64,
    /// `total_references / total_chunks`, or `0.0` for an empty store.
    pub avg_references: f64,
    /// Sum of every chunk's `original_size`.
    pub total_size_bytes: usize,
    /// Global store version at the time the snapshot was taken.
    pub version: u64,
}
336
#[cfg(test)]
mod tests {
    use super::*;
    use crate::SparseVec;

    /// Builds a 4096-byte chunk over an empty `SparseVec`.
    ///
    /// Its content hash is the low byte of `id` repeated eight times, so
    /// distinct small ids get distinct hashes.
    fn make_test_chunk(id: usize) -> VersionedChunk {
        let low_byte = (id & 0xFF) as u8;
        VersionedChunk::new(SparseVec::new(), 4096, [low_byte; 8])
    }

    #[test]
    fn test_codebook_creation() {
        let store = VersionedChunkStore::new();
        assert!(store.is_empty());
        assert_eq!(store.version(), 0);
    }

    #[test]
    fn test_insert_and_get() {
        let store = VersionedChunkStore::new();

        let after_insert = store.insert(1, make_test_chunk(1), 0).unwrap();
        assert_eq!(after_insert, 1);

        // `get` pairs the chunk (with its own `version` field) with the
        // store-wide version.
        let (stored, observed) = store.get(1).unwrap();
        assert_eq!(stored.version, 0);
        assert_eq!(observed, 1);
    }

    #[test]
    fn test_version_mismatch() {
        let store = VersionedChunkStore::new();
        store.insert(1, make_test_chunk(1), 0).unwrap();

        // A writer still holding the stale version 0 must be rejected.
        let stale_write = store.insert(2, make_test_chunk(2), 0);
        assert!(stale_write.is_err());

        if let Err(VersionMismatch { expected, actual }) = stale_write {
            assert_eq!(expected, 0);
            assert_eq!(actual, 1);
        } else {
            panic!("Expected VersionMismatch");
        }
    }

    #[test]
    fn test_batch_insert() {
        let store = VersionedChunkStore::new();
        let batch: Vec<_> = (1..=3).map(|id| (id, make_test_chunk(id))).collect();

        // The whole batch is applied under a single version bump.
        assert_eq!(store.batch_insert(batch, 0).unwrap(), 1);
        assert_eq!(store.len(), 3);
    }

    #[test]
    fn test_deduplication() {
        let store = VersionedChunkStore::new();
        let chunk = make_test_chunk(1);
        let hash = chunk.content_hash;
        store.insert(1, chunk, 0).unwrap();

        let hit = store.find_by_hash(&hash);
        assert!(hit.is_some());
        let (found_id, _) = hit.unwrap();
        assert_eq!(found_id, 1);
    }

    #[test]
    fn test_garbage_collection() {
        let store = VersionedChunkStore::new();
        let chunk = make_test_chunk(1);
        store.insert(1, chunk.clone(), 0).unwrap();

        // Release the reference so the stored chunk becomes unreferenced.
        // NOTE(review): relies on clones of `VersionedChunk` sharing one
        // reference count — confirm in chunk.rs.
        chunk.dec_ref();
        assert_eq!(store.len(), 1);

        assert_eq!(store.gc(1).unwrap(), 1);
        assert_eq!(store.len(), 0);
    }

    #[test]
    fn test_stats() {
        let store = VersionedChunkStore::new();
        for id in 0..10 {
            let expected_version = id as u64;
            store.insert(id, make_test_chunk(id), expected_version).unwrap();
        }

        let CodebookStats {
            total_chunks,
            total_size_bytes,
            version,
            ..
        } = store.stats();
        assert_eq!(total_chunks, 10);
        assert_eq!(total_size_bytes, 10 * 4096);
        assert_eq!(version, 10);
    }
}