Skip to main content

embeddenator_fs/fs/
versioned_embrfs.rs

1//! Mutable EmbrFS implementation using versioned structures
2//!
3//! This module provides a mutable, concurrent-safe version of EmbrFS that supports
4//! read-write operations with optimistic locking. Unlike the original EmbrFS which is
5//! immutable by design, VersionedEmbrFS allows in-place updates while maintaining:
6//!
7//! - Bit-perfect reconstruction guarantees
8//! - Concurrent read access without blocking
9//! - Optimistic locking for writes with conflict detection
10//! - VSA-native operations on compressed state
11//!
12//! ## Architecture
13//!
14//! ```text
15//! VersionedEmbrFS
16//!     ↓
//! VersionedEngram (coordinates three versioned components)
18//!     ├── VersionedChunkStore (chunk_id → SparseVec)
19//!     ├── VersionedManifest (file metadata)
20//!     └── VersionedCorrectionStore (bit-perfect adjustments)
21//! ```
22//!
23//! ## Usage Example
24//!
25//! ```rust,no_run
26//! use embeddenator_fs::VersionedEmbrFS;
27//! use std::path::Path;
28//!
29//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
30//! // Create new mutable filesystem
31//! let fs = VersionedEmbrFS::new();
32//!
33//! // Write a file
34//! let data = b"Hello, EmbrFS!";
35//! fs.write_file("hello.txt", data, None)?;
36//!
37//! // Read it back
38//! let (content, version) = fs.read_file("hello.txt")?;
39//! assert_eq!(&content, data);
40//!
41//! // Update the file
42//! let updated = b"Hello, Mutable EmbrFS!";
43//! fs.write_file("hello.txt", updated, Some(version))?;
44//!
45//! // Concurrent operations work with optimistic locking
46//! # Ok(())
47//! # }
48//! ```
49
50use crate::versioned::{
51    VersionedChunk, VersionedChunkStore, VersionedCorrectionStore, VersionedFileEntry,
52    VersionedManifest,
53};
54use crate::ReversibleVSAConfig;
55use embeddenator_vsa::SparseVec;
56use sha2::{Digest, Sha256};
57use std::sync::atomic::{AtomicU64, Ordering};
58use std::sync::{Arc, RwLock};
59
60pub use crate::versioned::types::{ChunkId, VersionMismatch, VersionedResult};
61pub use crate::versioned::Operation;
62
/// Default chunk size for file encoding, in bytes (4 KiB).
///
/// File contents are split into blocks of at most this many bytes before
/// each block is encoded into its own chunk.
pub const DEFAULT_CHUNK_SIZE: usize = 4 * 1024;
65
66/// Error types for VersionedEmbrFS operations
67#[derive(Debug, Clone)]
68pub enum EmbrFSError {
69    /// File not found
70    FileNotFound(String),
71    /// Chunk not found
72    ChunkNotFound(ChunkId),
73    /// Version mismatch during optimistic locking
74    VersionMismatch { expected: u64, actual: u64 },
75    /// File already exists
76    FileExists(String),
77    /// Invalid operation
78    InvalidOperation(String),
79    /// IO error
80    IoError(String),
81}
82
83impl std::fmt::Display for EmbrFSError {
84    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
85        match self {
86            EmbrFSError::FileNotFound(path) => write!(f, "File not found: {}", path),
87            EmbrFSError::ChunkNotFound(id) => write!(f, "Chunk not found: {}", id),
88            EmbrFSError::VersionMismatch { expected, actual } => {
89                write!(f, "Version mismatch: expected {}, got {}", expected, actual)
90            }
91            EmbrFSError::FileExists(path) => write!(f, "File already exists: {}", path),
92            EmbrFSError::InvalidOperation(msg) => write!(f, "Invalid operation: {}", msg),
93            EmbrFSError::IoError(msg) => write!(f, "IO error: {}", msg),
94        }
95    }
96}
97
98impl std::error::Error for EmbrFSError {}
99
100impl From<VersionMismatch> for EmbrFSError {
101    fn from(e: VersionMismatch) -> Self {
102        EmbrFSError::VersionMismatch {
103            expected: e.expected,
104            actual: e.actual,
105        }
106    }
107}
108
109/// A mutable, versioned filesystem backed by holographic engrams
110///
111/// VersionedEmbrFS provides read-write operations with optimistic locking,
112/// enabling concurrent access while maintaining consistency and bit-perfect
113/// reconstruction guarantees.
114pub struct VersionedEmbrFS {
115    /// Root VSA vector (bundled superposition of all chunks)
116    root: Arc<RwLock<Arc<SparseVec>>>,
117    root_version: Arc<AtomicU64>,
118
119    /// Versioned chunk store (chunk_id → encoded SparseVec)
120    pub chunk_store: VersionedChunkStore,
121
122    /// Versioned corrections for bit-perfect reconstruction
123    pub corrections: VersionedCorrectionStore,
124
125    /// Versioned manifest (file metadata)
126    pub manifest: VersionedManifest,
127
128    /// VSA configuration
129    config: ReversibleVSAConfig,
130
131    /// Global filesystem version
132    global_version: Arc<AtomicU64>,
133
134    /// Next chunk ID to allocate
135    next_chunk_id: Arc<AtomicU64>,
136}
137
138impl VersionedEmbrFS {
139    /// Create a new empty mutable filesystem
140    pub fn new() -> Self {
141        Self::with_config(ReversibleVSAConfig::default())
142    }
143
144    /// Create a new mutable filesystem with custom VSA configuration
145    pub fn with_config(config: ReversibleVSAConfig) -> Self {
146        Self {
147            root: Arc::new(RwLock::new(Arc::new(SparseVec::new()))),
148            root_version: Arc::new(AtomicU64::new(0)),
149            chunk_store: VersionedChunkStore::new(),
150            corrections: VersionedCorrectionStore::new(),
151            manifest: VersionedManifest::new(),
152            config,
153            global_version: Arc::new(AtomicU64::new(0)),
154            next_chunk_id: Arc::new(AtomicU64::new(1)),
155        }
156    }
157
158    /// Get the current global version
159    pub fn version(&self) -> u64 {
160        self.global_version.load(Ordering::Acquire)
161    }
162
163    /// Read a file's contents
164    ///
165    /// Returns the file data and the file entry version at read time.
166    /// The version can be used for optimistic locking on subsequent writes.
167    ///
168    /// # Example
169    ///
170    /// ```rust,no_run
171    /// # use embeddenator_fs::VersionedEmbrFS;
172    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
173    /// let fs = VersionedEmbrFS::new();
174    /// let (data, version) = fs.read_file("example.txt")?;
175    /// println!("Read {} bytes at version {}", data.len(), version);
176    /// # Ok(())
177    /// # }
178    /// ```
179    pub fn read_file(&self, path: &str) -> Result<(Vec<u8>, u64), EmbrFSError> {
180        // 1. Get file entry with version
181        let (file_entry, _manifest_version) = self
182            .manifest
183            .get_file(path)
184            .ok_or_else(|| EmbrFSError::FileNotFound(path.to_string()))?;
185
186        if file_entry.deleted {
187            return Err(EmbrFSError::FileNotFound(path.to_string()));
188        }
189
190        // 2. Read and decode chunks
191        let mut file_data = Vec::with_capacity(file_entry.size);
192
193        for &chunk_id in &file_entry.chunks {
194            // Get chunk from store
195            let (chunk, _chunk_version) = self
196                .chunk_store
197                .get(chunk_id)
198                .ok_or(EmbrFSError::ChunkNotFound(chunk_id))?;
199
200            // Decode chunk
201            let decoded =
202                chunk
203                    .vector
204                    .decode_data(&self.config, Some(&file_entry.path), DEFAULT_CHUNK_SIZE);
205
206            // Apply correction
207            let corrected = self
208                .corrections
209                .get(chunk_id as u64)
210                .map(|(corr, _)| corr.apply(&decoded))
211                .unwrap_or(decoded);
212
213            file_data.extend_from_slice(&corrected);
214        }
215
216        // Truncate to exact file size
217        file_data.truncate(file_entry.size);
218
219        Ok((file_data, file_entry.version))
220    }
221
222    /// Write a file's contents
223    ///
224    /// If `expected_version` is provided, performs optimistic locking - the write
225    /// will fail with VersionMismatch if the file has been modified since the version
226    /// was read.
227    ///
228    /// # Example
229    ///
230    /// ```rust,no_run
231    /// # use embeddenator_fs::VersionedEmbrFS;
232    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
233    /// let fs = VersionedEmbrFS::new();
234    ///
235    /// // Create new file
236    /// let version = fs.write_file("new.txt", b"content", None)?;
237    ///
238    /// // Update with version check
239    /// let new_version = fs.write_file("new.txt", b"updated", Some(version))?;
240    /// # Ok(())
241    /// # }
242    /// ```
243    pub fn write_file(
244        &self,
245        path: &str,
246        data: &[u8],
247        expected_version: Option<u64>,
248    ) -> Result<u64, EmbrFSError> {
249        // 1. Check existing file
250        let existing = self.manifest.get_file(path);
251
252        match (&existing, expected_version) {
253            (Some((entry, _)), Some(expected_ver)) => {
254                // Update existing - verify version
255                if entry.version != expected_ver {
256                    return Err(EmbrFSError::VersionMismatch {
257                        expected: expected_ver,
258                        actual: entry.version,
259                    });
260                }
261            }
262            (Some(_), None) => {
263                // File exists but no version check - fail
264                return Err(EmbrFSError::FileExists(path.to_string()));
265            }
266            (None, Some(_)) => {
267                // Expected file but doesn't exist
268                return Err(EmbrFSError::FileNotFound(path.to_string()));
269            }
270            (None, None) => {
271                // New file - OK
272            }
273        }
274
275        // 2. Chunk the data
276        let chunks = self.chunk_data(data);
277        let mut chunk_ids = Vec::new();
278
279        // 3. Get current chunk store version
280        let store_version = self.chunk_store.version();
281
282        // 4. Encode chunks and build updates
283        let mut chunk_updates = Vec::new();
284        let mut corrections_to_add = Vec::new();
285
286        for chunk_data in chunks {
287            let chunk_id = self.allocate_chunk_id();
288
289            // Encode chunk
290            let chunk_vec = SparseVec::encode_data(chunk_data, &self.config, Some(path));
291
292            // Immediately verify
293            let decoded = chunk_vec.decode_data(&self.config, Some(path), chunk_data.len());
294
295            // Compute content hash
296            let mut hasher = Sha256::new();
297            hasher.update(chunk_data);
298            let hash = hasher.finalize();
299            let mut hash_bytes = [0u8; 8];
300            hash_bytes.copy_from_slice(&hash[0..8]);
301
302            // Create versioned chunk
303            let versioned_chunk = VersionedChunk::new(chunk_vec, chunk_data.len(), hash_bytes);
304
305            chunk_updates.push((chunk_id, versioned_chunk));
306
307            // Prepare correction
308            let correction =
309                crate::correction::ChunkCorrection::new(chunk_id as u64, chunk_data, &decoded);
310            corrections_to_add.push((chunk_id as u64, correction));
311
312            chunk_ids.push(chunk_id);
313        }
314
315        // 5. Batch insert chunks into store
316        if expected_version.is_none() {
317            // New file - use lock-free insert (chunk IDs are unique)
318            self.chunk_store.batch_insert_new(chunk_updates)?;
319        } else {
320            // Existing file - use versioned update with optimistic locking
321            self.chunk_store
322                .batch_insert(chunk_updates, store_version)?;
323        }
324
325        // 6. Add corrections (after chunk store update)
326        if expected_version.is_none() {
327            // New file - use lock-free insert (chunk IDs are unique)
328            self.corrections.batch_insert_new(corrections_to_add)?;
329        } else {
330            // Existing file - use versioned batch update
331            let corrections_version = self.corrections.current_version();
332            self.corrections
333                .batch_update(corrections_to_add, corrections_version)?;
334        }
335
336        // 6. Update manifest
337        let is_text = is_text_data(data);
338        let new_entry =
339            VersionedFileEntry::new(path.to_string(), is_text, data.len(), chunk_ids.clone());
340
341        let file_version = if let Some((entry, _)) = existing {
342            self.manifest.update_file(path, new_entry, entry.version)?;
343            entry.version + 1
344        } else {
345            self.manifest.add_file(new_entry)?;
346            0
347        };
348
349        // 7. Bundle chunks into root
350        self.bundle_chunks_to_root(&chunk_ids)?;
351
352        // 8. Increment global version
353        self.global_version.fetch_add(1, Ordering::AcqRel);
354
355        Ok(file_version)
356    }
357
358    /// Delete a file (soft delete)
359    ///
360    /// The file is marked as deleted but its chunks remain in the engram until
361    /// compaction. Requires the current file version for optimistic locking.
362    pub fn delete_file(&self, path: &str, expected_version: u64) -> Result<(), EmbrFSError> {
363        self.manifest.remove_file(path, expected_version)?;
364        self.global_version.fetch_add(1, Ordering::AcqRel);
365        Ok(())
366    }
367
368    /// List all non-deleted files
369    pub fn list_files(&self) -> Vec<String> {
370        self.manifest.list_files()
371    }
372
373    /// Check if a file exists
374    pub fn exists(&self, path: &str) -> bool {
375        self.manifest
376            .get_file(path)
377            .map(|(entry, _)| !entry.deleted)
378            .unwrap_or(false)
379    }
380
381    /// Get filesystem statistics
382    pub fn stats(&self) -> FilesystemStats {
383        let manifest_stats = self.manifest.stats();
384        let chunk_stats = self.chunk_store.stats();
385        let correction_stats = self.corrections.stats();
386
387        FilesystemStats {
388            total_files: manifest_stats.total_files,
389            active_files: manifest_stats.active_files,
390            deleted_files: manifest_stats.deleted_files,
391            total_chunks: chunk_stats.total_chunks as u64,
392            total_size_bytes: manifest_stats.total_size_bytes,
393            correction_overhead_bytes: correction_stats.total_correction_bytes,
394            version: self.version(),
395        }
396    }
397
398    // === Private helper methods ===
399
400    /// Chunk data into DEFAULT_CHUNK_SIZE blocks
401    fn chunk_data<'a>(&self, data: &'a [u8]) -> Vec<&'a [u8]> {
402        data.chunks(DEFAULT_CHUNK_SIZE).collect()
403    }
404
405    /// Allocate a new unique chunk ID
406    fn allocate_chunk_id(&self) -> ChunkId {
407        self.next_chunk_id.fetch_add(1, Ordering::AcqRel) as ChunkId
408    }
409
410    /// Bundle chunks into root with CAS retry loop
411    fn bundle_chunks_to_root(&self, chunk_ids: &[ChunkId]) -> Result<(), EmbrFSError> {
412        // Retry loop for CAS
413        loop {
414            // Read current root
415            let root_lock = self.root.read().unwrap();
416            let current_root = Arc::clone(&*root_lock);
417            let current_version = self.root_version.load(Ordering::Acquire);
418            drop(root_lock);
419
420            // Build new root by bundling chunks
421            let mut new_root = (*current_root).clone();
422            for &chunk_id in chunk_ids {
423                if let Some((chunk, _)) = self.chunk_store.get(chunk_id) {
424                    new_root = new_root.bundle(&chunk.vector);
425                }
426            }
427
428            // Try CAS update
429            let mut root_lock = self.root.write().unwrap();
430            let actual_version = self.root_version.load(Ordering::Acquire);
431
432            if actual_version == current_version {
433                // Success - no concurrent update
434                *root_lock = Arc::new(new_root);
435                self.root_version.fetch_add(1, Ordering::AcqRel);
436                return Ok(());
437            }
438
439            // Retry - someone else updated root
440            drop(root_lock);
441            // Small backoff to reduce contention
442            std::thread::yield_now();
443        }
444    }
445}
446
447impl Default for VersionedEmbrFS {
448    fn default() -> Self {
449        Self::new()
450    }
451}
452
/// Filesystem statistics
///
/// A plain snapshot of counters, as assembled by `VersionedEmbrFS::stats`.
/// It is comparable and has an all-zero `Default`, which makes it convenient
/// to assert on in tests and to diff between two points in time.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct FilesystemStats {
    /// Total file count reported by the manifest
    pub total_files: usize,
    /// Active (non-deleted) file count reported by the manifest
    pub active_files: usize,
    /// Deleted file count reported by the manifest
    pub deleted_files: usize,
    /// Chunk count reported by the chunk store
    pub total_chunks: u64,
    /// Total file size in bytes reported by the manifest
    pub total_size_bytes: usize,
    /// Total correction bytes reported by the correction store
    pub correction_overhead_bytes: u64,
    /// Global filesystem version at the time the snapshot was taken
    pub version: u64,
}
464
/// Heuristic to detect if data is likely text.
///
/// Looks at no more than the first 8192 bytes and counts control bytes
/// (values below 32) other than tab, line feed and carriage return. The data
/// is classified as text when fewer than 5% of the inspected bytes are such
/// control bytes. Empty input counts as text.
fn is_text_data(data: &[u8]) -> bool {
    let sample = &data[..data.len().min(8192)];
    if sample.is_empty() {
        return true;
    }

    // Everything from 32 upwards, plus the three common whitespace controls,
    // is treated as acceptable for text.
    let control_bytes = sample
        .iter()
        .filter(|&&byte| !matches!(byte, b'\t' | b'\n' | b'\r' | 32..=255))
        .count();

    let control_ratio = control_bytes as f64 / sample.len() as f64;
    control_ratio < 0.05
}
481
#[cfg(test)]
mod tests {
    use super::*;

    /// A fresh filesystem starts at version 0 with no files.
    #[test]
    fn test_new_filesystem() {
        let fs = VersionedEmbrFS::new();
        assert_eq!(fs.version(), 0);
        assert_eq!(fs.list_files().len(), 0);
    }

    /// Data written to a new file is read back unchanged at version 0.
    #[test]
    fn test_write_and_read_file() {
        let fs = VersionedEmbrFS::new();
        let data = b"Hello, EmbrFS!";

        // Write file
        let version = fs.write_file("test.txt", data, None).unwrap();
        assert_eq!(version, 0);

        // Read it back
        let (content, read_version) = fs.read_file("test.txt").unwrap();
        assert_eq!(&content[..], data);
        assert_eq!(read_version, 0);
    }

    /// Updates succeed with the current version and fail with a stale one.
    #[test]
    fn test_update_file_with_version_check() {
        let fs = VersionedEmbrFS::new();

        // Create file
        let v1 = fs.write_file("test.txt", b"version 1", None).unwrap();

        // Update with correct version
        let v2 = fs.write_file("test.txt", b"version 2", Some(v1)).unwrap();
        assert_eq!(v2, v1 + 1);

        // Try to update with stale version (should fail)
        let result = fs.write_file("test.txt", b"version 3", Some(v1));
        assert!(matches!(result, Err(EmbrFSError::VersionMismatch { .. })));
    }

    /// Creating a file at a path that already exists is rejected and leaves
    /// the stored content untouched.
    #[test]
    fn test_create_existing_file_without_version_fails() {
        let fs = VersionedEmbrFS::new();
        fs.write_file("test.txt", b"first", None).unwrap();

        let result = fs.write_file("test.txt", b"second", None);
        assert!(matches!(result, Err(EmbrFSError::FileExists(_))));

        let (content, _) = fs.read_file("test.txt").unwrap();
        assert_eq!(&content[..], b"first");
    }

    /// Updating (i.e. passing a version for) a path that was never written
    /// reports FileNotFound and does not create the file.
    #[test]
    fn test_update_missing_file_fails() {
        let fs = VersionedEmbrFS::new();

        let result = fs.write_file("missing.txt", b"data", Some(0));
        assert!(matches!(result, Err(EmbrFSError::FileNotFound(_))));
        assert!(!fs.exists("missing.txt"));
    }

    /// The global version moves only on successful mutations.
    #[test]
    fn test_global_version_tracks_successful_writes() {
        let fs = VersionedEmbrFS::new();
        assert_eq!(fs.version(), 0);

        let v1 = fs.write_file("test.txt", b"one", None).unwrap();
        assert_eq!(fs.version(), 1);

        // Rejected writes must not bump the global version.
        assert!(fs.write_file("test.txt", b"dup", None).is_err());
        assert!(fs.write_file("other.txt", b"x", Some(0)).is_err());
        assert_eq!(fs.version(), 1);

        fs.write_file("test.txt", b"two", Some(v1)).unwrap();
        assert_eq!(fs.version(), 2);
    }

    /// A deleted file no longer exists and can no longer be read.
    #[test]
    fn test_delete_file() {
        let fs = VersionedEmbrFS::new();

        // Create and delete
        let version = fs.write_file("test.txt", b"data", None).unwrap();
        assert!(fs.exists("test.txt"));
        fs.delete_file("test.txt", version).unwrap();

        // Should not exist
        assert!(!fs.exists("test.txt"));

        // Read should fail
        let result = fs.read_file("test.txt");
        assert!(matches!(result, Err(EmbrFSError::FileNotFound(_))));
    }

    /// Every written file shows up in the listing.
    #[test]
    fn test_list_files() {
        let fs = VersionedEmbrFS::new();

        fs.write_file("file1.txt", b"a", None).unwrap();
        fs.write_file("file2.txt", b"b", None).unwrap();
        fs.write_file("file3.txt", b"c", None).unwrap();

        let files = fs.list_files();
        assert_eq!(files.len(), 3);
        assert!(files.contains(&"file1.txt".to_string()));
        assert!(files.contains(&"file2.txt".to_string()));
        assert!(files.contains(&"file3.txt".to_string()));
    }

    /// Files spanning several chunks (with a short final chunk) round-trip.
    #[test]
    fn test_large_file() {
        let fs = VersionedEmbrFS::new();

        // Create file larger than one chunk
        let data = vec![42u8; DEFAULT_CHUNK_SIZE * 3 + 100];
        fs.write_file("large.bin", &data, None).unwrap();

        let (content, _) = fs.read_file("large.bin").unwrap();
        assert_eq!(content, data);
    }

    /// Statistics reflect the files written so far.
    #[test]
    fn test_stats() {
        let fs = VersionedEmbrFS::new();

        fs.write_file("file1.txt", b"hello", None).unwrap();
        fs.write_file("file2.txt", b"world", None).unwrap();

        let stats = fs.stats();
        assert_eq!(stats.active_files, 2);
        assert_eq!(stats.total_files, 2);
        assert_eq!(stats.deleted_files, 0);
        assert_eq!(stats.total_size_bytes, 10);
    }
}