1use crate::versioned::{
51 VersionedChunk, VersionedChunkStore, VersionedCorrectionStore, VersionedFileEntry,
52 VersionedManifest,
53};
54use crate::ReversibleVSAConfig;
55use embeddenator_vsa::SparseVec;
56use sha2::{Digest, Sha256};
57use std::sync::atomic::{AtomicU64, Ordering};
58use std::sync::{Arc, RwLock};
59
60pub use crate::versioned::types::{ChunkId, VersionMismatch, VersionedResult};
61pub use crate::versioned::Operation;
62
/// Size in bytes of the pieces a file is split into before encoding.
///
/// Every chunk of a file is exactly this long except possibly the last one,
/// which holds the remainder.
pub const DEFAULT_CHUNK_SIZE: usize = 4 * 1024;
65
66#[derive(Debug, Clone)]
68pub enum EmbrFSError {
69 FileNotFound(String),
71 ChunkNotFound(ChunkId),
73 VersionMismatch { expected: u64, actual: u64 },
75 FileExists(String),
77 InvalidOperation(String),
79 IoError(String),
81}
82
83impl std::fmt::Display for EmbrFSError {
84 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
85 match self {
86 EmbrFSError::FileNotFound(path) => write!(f, "File not found: {}", path),
87 EmbrFSError::ChunkNotFound(id) => write!(f, "Chunk not found: {}", id),
88 EmbrFSError::VersionMismatch { expected, actual } => {
89 write!(f, "Version mismatch: expected {}, got {}", expected, actual)
90 }
91 EmbrFSError::FileExists(path) => write!(f, "File already exists: {}", path),
92 EmbrFSError::InvalidOperation(msg) => write!(f, "Invalid operation: {}", msg),
93 EmbrFSError::IoError(msg) => write!(f, "IO error: {}", msg),
94 }
95 }
96}
97
98impl std::error::Error for EmbrFSError {}
99
100impl From<VersionMismatch> for EmbrFSError {
101 fn from(e: VersionMismatch) -> Self {
102 EmbrFSError::VersionMismatch {
103 expected: e.expected,
104 actual: e.actual,
105 }
106 }
107}
108
109pub struct VersionedEmbrFS {
115 root: Arc<RwLock<Arc<SparseVec>>>,
117 root_version: Arc<AtomicU64>,
118
119 pub chunk_store: VersionedChunkStore,
121
122 pub corrections: VersionedCorrectionStore,
124
125 pub manifest: VersionedManifest,
127
128 config: ReversibleVSAConfig,
130
131 global_version: Arc<AtomicU64>,
133
134 next_chunk_id: Arc<AtomicU64>,
136}
137
138impl VersionedEmbrFS {
139 pub fn new() -> Self {
141 Self::with_config(ReversibleVSAConfig::default())
142 }
143
144 pub fn with_config(config: ReversibleVSAConfig) -> Self {
146 Self {
147 root: Arc::new(RwLock::new(Arc::new(SparseVec::new()))),
148 root_version: Arc::new(AtomicU64::new(0)),
149 chunk_store: VersionedChunkStore::new(),
150 corrections: VersionedCorrectionStore::new(),
151 manifest: VersionedManifest::new(),
152 config,
153 global_version: Arc::new(AtomicU64::new(0)),
154 next_chunk_id: Arc::new(AtomicU64::new(1)),
155 }
156 }
157
158 pub fn version(&self) -> u64 {
160 self.global_version.load(Ordering::Acquire)
161 }
162
163 pub fn read_file(&self, path: &str) -> Result<(Vec<u8>, u64), EmbrFSError> {
180 let (file_entry, _manifest_version) = self
182 .manifest
183 .get_file(path)
184 .ok_or_else(|| EmbrFSError::FileNotFound(path.to_string()))?;
185
186 if file_entry.deleted {
187 return Err(EmbrFSError::FileNotFound(path.to_string()));
188 }
189
190 let mut file_data = Vec::with_capacity(file_entry.size);
192
193 for &chunk_id in &file_entry.chunks {
194 let (chunk, _chunk_version) = self
196 .chunk_store
197 .get(chunk_id)
198 .ok_or(EmbrFSError::ChunkNotFound(chunk_id))?;
199
200 let decoded =
202 chunk
203 .vector
204 .decode_data(&self.config, Some(&file_entry.path), DEFAULT_CHUNK_SIZE);
205
206 let corrected = self
208 .corrections
209 .get(chunk_id as u64)
210 .map(|(corr, _)| corr.apply(&decoded))
211 .unwrap_or(decoded);
212
213 file_data.extend_from_slice(&corrected);
214 }
215
216 file_data.truncate(file_entry.size);
218
219 Ok((file_data, file_entry.version))
220 }
221
222 pub fn write_file(
244 &self,
245 path: &str,
246 data: &[u8],
247 expected_version: Option<u64>,
248 ) -> Result<u64, EmbrFSError> {
249 let existing = self.manifest.get_file(path);
251
252 match (&existing, expected_version) {
253 (Some((entry, _)), Some(expected_ver)) => {
254 if entry.version != expected_ver {
256 return Err(EmbrFSError::VersionMismatch {
257 expected: expected_ver,
258 actual: entry.version,
259 });
260 }
261 }
262 (Some(_), None) => {
263 return Err(EmbrFSError::FileExists(path.to_string()));
265 }
266 (None, Some(_)) => {
267 return Err(EmbrFSError::FileNotFound(path.to_string()));
269 }
270 (None, None) => {
271 }
273 }
274
275 let chunks = self.chunk_data(data);
277 let mut chunk_ids = Vec::new();
278
279 let store_version = self.chunk_store.version();
281
282 let mut chunk_updates = Vec::new();
284 let mut corrections_to_add = Vec::new();
285
286 for chunk_data in chunks {
287 let chunk_id = self.allocate_chunk_id();
288
289 let chunk_vec = SparseVec::encode_data(chunk_data, &self.config, Some(path));
291
292 let decoded = chunk_vec.decode_data(&self.config, Some(path), chunk_data.len());
294
295 let mut hasher = Sha256::new();
297 hasher.update(chunk_data);
298 let hash = hasher.finalize();
299 let mut hash_bytes = [0u8; 8];
300 hash_bytes.copy_from_slice(&hash[0..8]);
301
302 let versioned_chunk = VersionedChunk::new(chunk_vec, chunk_data.len(), hash_bytes);
304
305 chunk_updates.push((chunk_id, versioned_chunk));
306
307 let correction =
309 crate::correction::ChunkCorrection::new(chunk_id as u64, chunk_data, &decoded);
310 corrections_to_add.push((chunk_id as u64, correction));
311
312 chunk_ids.push(chunk_id);
313 }
314
315 if expected_version.is_none() {
317 self.chunk_store.batch_insert_new(chunk_updates)?;
319 } else {
320 self.chunk_store
322 .batch_insert(chunk_updates, store_version)?;
323 }
324
325 if expected_version.is_none() {
327 self.corrections.batch_insert_new(corrections_to_add)?;
329 } else {
330 let corrections_version = self.corrections.current_version();
332 self.corrections
333 .batch_update(corrections_to_add, corrections_version)?;
334 }
335
336 let is_text = is_text_data(data);
338 let new_entry =
339 VersionedFileEntry::new(path.to_string(), is_text, data.len(), chunk_ids.clone());
340
341 let file_version = if let Some((entry, _)) = existing {
342 self.manifest.update_file(path, new_entry, entry.version)?;
343 entry.version + 1
344 } else {
345 self.manifest.add_file(new_entry)?;
346 0
347 };
348
349 self.bundle_chunks_to_root(&chunk_ids)?;
351
352 self.global_version.fetch_add(1, Ordering::AcqRel);
354
355 Ok(file_version)
356 }
357
358 pub fn delete_file(&self, path: &str, expected_version: u64) -> Result<(), EmbrFSError> {
363 self.manifest.remove_file(path, expected_version)?;
364 self.global_version.fetch_add(1, Ordering::AcqRel);
365 Ok(())
366 }
367
368 pub fn list_files(&self) -> Vec<String> {
370 self.manifest.list_files()
371 }
372
373 pub fn exists(&self, path: &str) -> bool {
375 self.manifest
376 .get_file(path)
377 .map(|(entry, _)| !entry.deleted)
378 .unwrap_or(false)
379 }
380
381 pub fn stats(&self) -> FilesystemStats {
383 let manifest_stats = self.manifest.stats();
384 let chunk_stats = self.chunk_store.stats();
385 let correction_stats = self.corrections.stats();
386
387 FilesystemStats {
388 total_files: manifest_stats.total_files,
389 active_files: manifest_stats.active_files,
390 deleted_files: manifest_stats.deleted_files,
391 total_chunks: chunk_stats.total_chunks as u64,
392 total_size_bytes: manifest_stats.total_size_bytes,
393 correction_overhead_bytes: correction_stats.total_correction_bytes,
394 version: self.version(),
395 }
396 }
397
398 fn chunk_data<'a>(&self, data: &'a [u8]) -> Vec<&'a [u8]> {
402 data.chunks(DEFAULT_CHUNK_SIZE).collect()
403 }
404
405 fn allocate_chunk_id(&self) -> ChunkId {
407 self.next_chunk_id.fetch_add(1, Ordering::AcqRel) as ChunkId
408 }
409
410 fn bundle_chunks_to_root(&self, chunk_ids: &[ChunkId]) -> Result<(), EmbrFSError> {
412 loop {
414 let root_lock = self.root.read().unwrap();
416 let current_root = Arc::clone(&*root_lock);
417 let current_version = self.root_version.load(Ordering::Acquire);
418 drop(root_lock);
419
420 let mut new_root = (*current_root).clone();
422 for &chunk_id in chunk_ids {
423 if let Some((chunk, _)) = self.chunk_store.get(chunk_id) {
424 new_root = new_root.bundle(&chunk.vector);
425 }
426 }
427
428 let mut root_lock = self.root.write().unwrap();
430 let actual_version = self.root_version.load(Ordering::Acquire);
431
432 if actual_version == current_version {
433 *root_lock = Arc::new(new_root);
435 self.root_version.fetch_add(1, Ordering::AcqRel);
436 return Ok(());
437 }
438
439 drop(root_lock);
441 std::thread::yield_now();
443 }
444 }
445}
446
447impl Default for VersionedEmbrFS {
448 fn default() -> Self {
449 Self::new()
450 }
451}
452
/// Snapshot of filesystem counters, as returned by `VersionedEmbrFS::stats`.
///
/// All fields are plain integers, so the snapshot supports equality
/// comparison and has an all-zero `Default`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct FilesystemStats {
    /// Total file entries reported by the manifest.
    pub total_files: usize,
    /// File entries the manifest reports as active (not deleted).
    pub active_files: usize,
    /// File entries the manifest reports as deleted.
    pub deleted_files: usize,
    /// Number of chunks held by the chunk store.
    pub total_chunks: u64,
    /// Total file size in bytes as reported by the manifest
    /// (presumably active files only; confirm).
    pub total_size_bytes: usize,
    /// Bytes the correction store reports as used by corrections.
    pub correction_overhead_bytes: u64,
    /// Global filesystem version at the time of the snapshot.
    pub version: u64,
}
464
/// Heuristically decides whether `data` looks like text.
///
/// Only the first 8192 bytes are examined. The data counts as text when fewer
/// than 5% of those bytes are ASCII control bytes (values below 32) other than
/// `\n`, `\r` and `\t`. Bytes >= 32, including non-ASCII, are never counted.
/// Empty input is treated as text.
fn is_text_data(data: &[u8]) -> bool {
    const SAMPLE_LIMIT: usize = 8192;
    const MAX_CONTROL_RATIO: f64 = 0.05;

    let sample = &data[..data.len().min(SAMPLE_LIMIT)];
    if sample.is_empty() {
        return true;
    }

    let control_bytes = sample
        .iter()
        .filter(|&&byte| byte < 32 && !matches!(byte, b'\n' | b'\r' | b'\t'))
        .count();

    let ratio = control_bytes as f64 / sample.len() as f64;
    ratio < MAX_CONTROL_RATIO
}
481
#[cfg(test)]
mod tests {
    use super::*;

    /// A fresh filesystem starts at version 0 with no files.
    #[test]
    fn test_new_filesystem() {
        let fs = VersionedEmbrFS::new();
        assert_eq!(fs.version(), 0);
        assert_eq!(fs.list_files().len(), 0);
    }

    /// A created file reads back with identical bytes and version 0.
    #[test]
    fn test_write_and_read_file() {
        let fs = VersionedEmbrFS::new();
        let data = b"Hello, EmbrFS!";

        let version = fs.write_file("test.txt", data, None).unwrap();
        assert_eq!(version, 0);

        let (content, read_version) = fs.read_file("test.txt").unwrap();
        assert_eq!(&content[..], data);
        assert_eq!(read_version, 0);
    }

    /// Updates require the current version; a stale version is rejected.
    #[test]
    fn test_update_file_with_version_check() {
        let fs = VersionedEmbrFS::new();

        let v1 = fs.write_file("test.txt", b"version 1", None).unwrap();

        let v2 = fs.write_file("test.txt", b"version 2", Some(v1)).unwrap();
        assert_eq!(v2, v1 + 1);

        let result = fs.write_file("test.txt", b"version 3", Some(v1));
        assert!(matches!(result, Err(EmbrFSError::VersionMismatch { .. })));
    }

    /// An update replaces the content seen by subsequent reads.
    #[test]
    fn test_update_replaces_content() {
        let fs = VersionedEmbrFS::new();

        let v0 = fs.write_file("f.txt", b"old contents", None).unwrap();
        fs.write_file("f.txt", b"new", Some(v0)).unwrap();

        let (content, _) = fs.read_file("f.txt").unwrap();
        assert_eq!(&content[..], b"new");
    }

    /// Creating a path that already exists fails and leaves the file intact.
    #[test]
    fn test_create_existing_file_fails() {
        let fs = VersionedEmbrFS::new();

        fs.write_file("dup.txt", b"first", None).unwrap();
        let result = fs.write_file("dup.txt", b"second", None);
        assert!(matches!(result, Err(EmbrFSError::FileExists(_))));

        let (content, _) = fs.read_file("dup.txt").unwrap();
        assert_eq!(&content[..], b"first");
    }

    /// Updating a path that was never created fails.
    #[test]
    fn test_update_missing_file_fails() {
        let fs = VersionedEmbrFS::new();

        let result = fs.write_file("missing.txt", b"data", Some(0));
        assert!(matches!(result, Err(EmbrFSError::FileNotFound(_))));
        assert!(!fs.exists("missing.txt"));
    }

    /// A deleted file no longer exists and cannot be read.
    #[test]
    fn test_delete_file() {
        let fs = VersionedEmbrFS::new();

        let version = fs.write_file("test.txt", b"data", None).unwrap();
        fs.delete_file("test.txt", version).unwrap();

        assert!(!fs.exists("test.txt"));

        let result = fs.read_file("test.txt");
        assert!(matches!(result, Err(EmbrFSError::FileNotFound(_))));
    }

    /// Each successful write and delete bumps the global version by one.
    #[test]
    fn test_global_version_increments() {
        let fs = VersionedEmbrFS::new();

        let v = fs.write_file("a.txt", b"a", None).unwrap();
        assert_eq!(fs.version(), 1);

        fs.delete_file("a.txt", v).unwrap();
        assert_eq!(fs.version(), 2);
    }

    /// All created files are listed.
    #[test]
    fn test_list_files() {
        let fs = VersionedEmbrFS::new();

        fs.write_file("file1.txt", b"a", None).unwrap();
        fs.write_file("file2.txt", b"b", None).unwrap();
        fs.write_file("file3.txt", b"c", None).unwrap();

        let files = fs.list_files();
        assert_eq!(files.len(), 3);
        assert!(files.contains(&"file1.txt".to_string()));
        assert!(files.contains(&"file2.txt".to_string()));
        assert!(files.contains(&"file3.txt".to_string()));
    }

    /// A file spanning several chunks, with a partial tail chunk, round-trips.
    #[test]
    fn test_large_file() {
        let fs = VersionedEmbrFS::new();

        let data = vec![42u8; DEFAULT_CHUNK_SIZE * 3 + 100];
        fs.write_file("large.bin", &data, None).unwrap();

        let (content, _) = fs.read_file("large.bin").unwrap();
        assert_eq!(content, data);
    }

    /// Stats reflect file counts, total size and the global version.
    #[test]
    fn test_stats() {
        let fs = VersionedEmbrFS::new();

        fs.write_file("file1.txt", b"hello", None).unwrap();
        fs.write_file("file2.txt", b"world", None).unwrap();

        let stats = fs.stats();
        assert_eq!(stats.active_files, 2);
        assert_eq!(stats.total_files, 2);
        assert_eq!(stats.deleted_files, 0);
        assert_eq!(stats.total_size_bytes, 10);
        assert_eq!(stats.version, 2);
    }

    /// The text heuristic accepts plain text and rejects control-heavy data.
    #[test]
    fn test_is_text_data() {
        assert!(is_text_data(b""));
        assert!(is_text_data(b"plain text\nwith lines\r\n\tand tabs"));
        assert!(!is_text_data(&[0u8; 32]));
    }
}
580}