Skip to main content

embeddenator_fs/fs/versioned/
chunk_store.rs

//! Versioned chunk store with optimistic locking.
//!
//! The chunk store is a `HashMap` that maps each chunk ID to its `VersionedChunk`.
//! This is NOT the VSA codebook (base vectors) - that lives in embeddenator-vsa.
//! This store holds the encoded chunks that make up the files in the engram.
//!
//! It supports concurrent reads and writes, using optimistic locking to detect conflicts.
8
9use super::chunk::VersionedChunk;
10use super::types::{ChunkId, VersionMismatch, VersionedResult};
11use std::collections::HashMap;
12use std::sync::atomic::{AtomicU64, Ordering};
13use std::sync::{Arc, RwLock};
14
15/// A versioned chunk store with optimistic locking
16///
17/// The chunk store maintains mappings from chunk IDs to versioned chunks. Multiple readers
18/// can access the store concurrently without blocking. Writers check the global
19/// version before committing changes to detect conflicts.
20///
21/// Note: This is distinct from the VSA codebook (base vectors used for encoding).
22/// The VSA codebook is in embeddenator-vsa and is typically static.
23/// This chunk store maps file chunk IDs to their VSA-encoded representations.
24///
25/// ## Concurrency Model
26///
27/// ```text
28/// Reader 1 ─┐
29///           ├─→ RwLock::read() ──→ Success (shared access)
30/// Reader 2 ─┘
31///
32/// Writer 1 ──→ RwLock::write() ──→ Check version ──→ Update ──→ Increment version
33///                                        ↓
34///                                   If changed → VersionMismatch
35/// ```
36pub struct VersionedChunkStore {
37    /// Map of chunk ID to versioned chunk (protected by RwLock)
38    chunks: Arc<RwLock<HashMap<ChunkId, Arc<VersionedChunk>>>>,
39
40    /// Global version number for the entire chunk store
41    /// Incremented on every write operation
42    global_version: Arc<AtomicU64>,
43
44    /// Content hash index for deduplication
45    /// Maps content hash → chunk ID
46    hash_index: Arc<RwLock<HashMap<[u8; 8], ChunkId>>>,
47}
48
49impl VersionedChunkStore {
50    /// Create a new empty versioned codebook
51    pub fn new() -> Self {
52        Self {
53            chunks: Arc::new(RwLock::new(HashMap::new())),
54            global_version: Arc::new(AtomicU64::new(0)),
55            hash_index: Arc::new(RwLock::new(HashMap::new())),
56        }
57    }
58
59    /// Get the current global version
60    pub fn version(&self) -> u64 {
61        self.global_version.load(Ordering::Acquire)
62    }
63
64    /// Get a chunk by ID (non-blocking read)
65    ///
66    /// Returns the chunk and the global version at the time of read.
67    /// The version can be used later for optimistic locking on writes.
68    pub fn get(&self, chunk_id: ChunkId) -> Option<(Arc<VersionedChunk>, u64)> {
69        let chunks = self.chunks.read().unwrap();
70        let version = self.version();
71        chunks
72            .get(&chunk_id)
73            .map(|chunk| (Arc::clone(chunk), version))
74    }
75
76    /// Check if a chunk with the given content hash already exists (deduplication)
77    pub fn find_by_hash(&self, content_hash: &[u8; 8]) -> Option<(ChunkId, Arc<VersionedChunk>)> {
78        let hash_index = self.hash_index.read().unwrap();
79        let chunks = self.chunks.read().unwrap();
80
81        hash_index.get(content_hash).and_then(|&chunk_id| {
82            chunks
83                .get(&chunk_id)
84                .map(|chunk| (chunk_id, Arc::clone(chunk)))
85        })
86    }
87
88    /// Insert or update a single chunk with optimistic locking
89    ///
90    /// Returns the new global version on success, or VersionMismatch if
91    /// the expected version doesn't match the current version.
92    pub fn insert(
93        &self,
94        chunk_id: ChunkId,
95        chunk: VersionedChunk,
96        expected_version: u64,
97    ) -> VersionedResult<u64> {
98        let mut chunks = self.chunks.write().unwrap();
99        let mut hash_index = self.hash_index.write().unwrap();
100
101        // Check version
102        let current_version = self.version();
103        if current_version != expected_version {
104            return Err(VersionMismatch {
105                expected: expected_version,
106                actual: current_version,
107            });
108        }
109
110        // Update hash index
111        hash_index.insert(chunk.content_hash, chunk_id);
112
113        // Insert chunk
114        chunks.insert(chunk_id, Arc::new(chunk));
115
116        // Increment version
117        let new_version = self.global_version.fetch_add(1, Ordering::AcqRel) + 1;
118        Ok(new_version)
119    }
120
121    /// Batch insert multiple chunks atomically
122    ///
123    /// Either all chunks are inserted or none are (on version mismatch).
124    /// This is more efficient than multiple individual inserts.
125    pub fn batch_insert(
126        &self,
127        updates: Vec<(ChunkId, VersionedChunk)>,
128        expected_version: u64,
129    ) -> VersionedResult<u64> {
130        let mut chunks = self.chunks.write().unwrap();
131        let mut hash_index = self.hash_index.write().unwrap();
132
133        // Check version
134        let current_version = self.version();
135        if current_version != expected_version {
136            return Err(VersionMismatch {
137                expected: expected_version,
138                actual: current_version,
139            });
140        }
141
142        // Insert all chunks
143        for (chunk_id, chunk) in updates {
144            hash_index.insert(chunk.content_hash, chunk_id);
145            chunks.insert(chunk_id, Arc::new(chunk));
146        }
147
148        // Increment version once for the entire batch
149        let new_version = self.global_version.fetch_add(1, Ordering::AcqRel) + 1;
150        Ok(new_version)
151    }
152
153    /// Batch insert NEW chunks without version checking
154    ///
155    /// This is used for inserting brand new chunks (e.g., when creating a new file)
156    /// where chunk IDs are guaranteed unique and monotonically increasing, so
157    /// concurrent inserts cannot conflict.
158    ///
159    /// This enables lock-free concurrent file creation.
160    pub fn batch_insert_new(
161        &self,
162        updates: Vec<(ChunkId, VersionedChunk)>,
163    ) -> VersionedResult<u64> {
164        let mut chunks = self.chunks.write().unwrap();
165        let mut hash_index = self.hash_index.write().unwrap();
166
167        // No version check - chunk IDs are unique
168        for (chunk_id, chunk) in updates {
169            hash_index.insert(chunk.content_hash, chunk_id);
170            chunks.insert(chunk_id, Arc::new(chunk));
171        }
172
173        // Increment version once for the entire batch
174        let new_version = self.global_version.fetch_add(1, Ordering::AcqRel) + 1;
175        Ok(new_version)
176    }
177
178    /// Remove a chunk by ID
179    ///
180    /// Returns the removed chunk on success, or VersionMismatch if the version changed.
181    pub fn remove(
182        &self,
183        chunk_id: ChunkId,
184        expected_version: u64,
185    ) -> VersionedResult<Option<Arc<VersionedChunk>>> {
186        let mut chunks = self.chunks.write().unwrap();
187        let mut hash_index = self.hash_index.write().unwrap();
188
189        // Check version
190        let current_version = self.version();
191        if current_version != expected_version {
192            return Err(VersionMismatch {
193                expected: expected_version,
194                actual: current_version,
195            });
196        }
197
198        // Remove chunk and update hash index
199        let removed = chunks.remove(&chunk_id);
200        if let Some(ref chunk) = removed {
201            hash_index.remove(&chunk.content_hash);
202        }
203
204        // Increment version
205        self.global_version.fetch_add(1, Ordering::AcqRel);
206        Ok(removed)
207    }
208
209    /// Get the number of chunks in the codebook
210    pub fn len(&self) -> usize {
211        self.chunks.read().unwrap().len()
212    }
213
214    /// Check if the codebook is empty
215    pub fn is_empty(&self) -> bool {
216        self.chunks.read().unwrap().is_empty()
217    }
218
219    /// Get all chunk IDs (snapshot)
220    pub fn chunk_ids(&self) -> Vec<ChunkId> {
221        self.chunks.read().unwrap().keys().copied().collect()
222    }
223
224    /// Iterate over all chunks (snapshot)
225    ///
226    /// Returns a snapshot of (ChunkId, Arc<VersionedChunk>) pairs.
227    /// Safe to iterate without holding locks.
228    pub fn iter(&self) -> Vec<(ChunkId, Arc<VersionedChunk>)> {
229        self.chunks
230            .read()
231            .unwrap()
232            .iter()
233            .map(|(&id, chunk)| (id, Arc::clone(chunk)))
234            .collect()
235    }
236
237    /// Garbage collect unreferenced chunks
238    ///
239    /// Removes chunks with ref_count == 0. Returns the number of chunks removed.
240    pub fn gc(&self, expected_version: u64) -> VersionedResult<usize> {
241        let mut chunks = self.chunks.write().unwrap();
242        let mut hash_index = self.hash_index.write().unwrap();
243
244        // Check version
245        let current_version = self.version();
246        if current_version != expected_version {
247            return Err(VersionMismatch {
248                expected: expected_version,
249                actual: current_version,
250            });
251        }
252
253        // Find unreferenced chunks
254        let to_remove: Vec<ChunkId> = chunks
255            .iter()
256            .filter(|(_, chunk)| chunk.is_unreferenced())
257            .map(|(id, _)| *id)
258            .collect();
259
260        let count = to_remove.len();
261
262        // Remove them
263        for chunk_id in to_remove {
264            if let Some(chunk) = chunks.remove(&chunk_id) {
265                hash_index.remove(&chunk.content_hash);
266            }
267        }
268
269        // Increment version if we removed anything
270        if count > 0 {
271            self.global_version.fetch_add(1, Ordering::AcqRel);
272        }
273
274        Ok(count)
275    }
276
277    /// Get statistics about the codebook
278    pub fn stats(&self) -> CodebookStats {
279        let chunks = self.chunks.read().unwrap();
280
281        let total_chunks = chunks.len();
282        let mut total_refs = 0u64;
283        let mut unreferenced = 0;
284        let mut total_size = 0usize;
285
286        for chunk in chunks.values() {
287            let refs = chunk.ref_count();
288            total_refs += refs as u64;
289            if refs == 0 {
290                unreferenced += 1;
291            }
292            total_size += chunk.original_size;
293        }
294
295        CodebookStats {
296            total_chunks,
297            unreferenced_chunks: unreferenced,
298            total_references: total_refs,
299            avg_references: if total_chunks > 0 {
300                total_refs as f64 / total_chunks as f64
301            } else {
302                0.0
303            },
304            total_size_bytes: total_size,
305            version: self.version(),
306        }
307    }
308}
309
310impl Default for VersionedChunkStore {
311    fn default() -> Self {
312        Self::new()
313    }
314}
315
316impl Clone for VersionedChunkStore {
317    fn clone(&self) -> Self {
318        Self {
319            chunks: Arc::clone(&self.chunks),
320            global_version: Arc::clone(&self.global_version),
321            hash_index: Arc::clone(&self.hash_index),
322        }
323    }
324}
325
/// Point-in-time summary of a chunk store's contents.
#[derive(Debug, Clone)]
pub struct CodebookStats {
    /// Number of chunks held by the store.
    pub total_chunks: usize,
    /// Number of chunks whose reference count is zero.
    pub unreferenced_chunks: usize,
    /// Sum of the reference counts of all chunks.
    pub total_references: u64,
    /// Mean reference count per chunk.
    pub avg_references: f64,
    /// Sum of the chunks' `original_size` values.
    pub total_size_bytes: usize,
    /// Global store version when the statistics were gathered.
    pub version: u64,
}
336
#[cfg(test)]
mod tests {
    use super::*;
    use crate::SparseVec;

    /// Builds a 4096-byte test chunk whose content hash is the low byte of
    /// `id` repeated eight times.
    fn make_test_chunk(id: usize) -> VersionedChunk {
        let low_byte = (id & 0xFF) as u8;
        VersionedChunk::new(SparseVec::new(), 4096, [low_byte; 8])
    }

    /// A fresh store starts empty at version 0.
    #[test]
    fn test_codebook_creation() {
        let store = VersionedChunkStore::new();

        assert_eq!(store.version(), 0);
        assert!(store.is_empty());
    }

    /// Inserting bumps the global version; `get` reports chunk and version.
    #[test]
    fn test_insert_and_get() {
        let store = VersionedChunkStore::new();

        let after_insert = store.insert(1, make_test_chunk(1), 0).unwrap();
        assert_eq!(after_insert, 1);

        let (fetched, seen_version) = store.get(1).unwrap();
        assert_eq!(fetched.version, 0);
        assert_eq!(seen_version, 1);
    }

    /// A write carrying an outdated version is rejected with both versions.
    #[test]
    fn test_version_mismatch() {
        let store = VersionedChunkStore::new();

        // First write moves the store from version 0 to 1.
        store.insert(1, make_test_chunk(1), 0).unwrap();

        // Second write still claims version 0 and must fail.
        let stale = store.insert(2, make_test_chunk(2), 0);
        assert!(stale.is_err());

        if let Err(VersionMismatch { expected, actual }) = stale {
            assert_eq!(expected, 0);
            assert_eq!(actual, 1);
        } else {
            panic!("Expected VersionMismatch");
        }
    }

    /// A batch of three chunks lands under a single version bump.
    #[test]
    fn test_batch_insert() {
        let store = VersionedChunkStore::new();
        let batch = vec![
            (1, make_test_chunk(1)),
            (2, make_test_chunk(2)),
            (3, make_test_chunk(3)),
        ];

        let after_batch = store.batch_insert(batch, 0).unwrap();

        assert_eq!(after_batch, 1);
        assert_eq!(store.len(), 3);
    }

    /// An inserted chunk can be found again through its content hash.
    #[test]
    fn test_deduplication() {
        let store = VersionedChunkStore::new();
        let chunk = make_test_chunk(1);
        let wanted_hash = chunk.content_hash;

        store.insert(1, chunk, 0).unwrap();

        let hit = store.find_by_hash(&wanted_hash);
        assert!(hit.is_some());

        let (found_id, _) = hit.unwrap();
        assert_eq!(found_id, 1);
    }

    /// GC removes a chunk after its reference is released.
    #[test]
    fn test_garbage_collection() {
        let store = VersionedChunkStore::new();
        let chunk = make_test_chunk(1);

        store.insert(1, chunk.clone(), 0).unwrap();

        // Release the reference through the handle we kept.
        chunk.dec_ref();

        assert_eq!(store.len(), 1);

        let collected = store.gc(1).unwrap();
        assert_eq!(collected, 1);
        assert_eq!(store.len(), 0);
    }

    /// Stats reflect ten sequential single-chunk inserts.
    #[test]
    fn test_stats() {
        let store = VersionedChunkStore::new();

        // Each insert expects the version left behind by the previous one.
        for n in 0..10 {
            store.insert(n, make_test_chunk(n), n as u64).unwrap();
        }

        let summary = store.stats();
        assert_eq!(summary.total_chunks, 10);
        assert_eq!(summary.total_size_bytes, 10 * 4096);
        assert_eq!(summary.version, 10);
    }
}