1use crate::versioned::{
51 VersionedChunk, VersionedChunkStore, VersionedCorrectionStore, VersionedFileEntry,
52 VersionedManifest,
53};
54use crate::ReversibleVSAConfig;
55use embeddenator_io::{
56 unwrap_auto, wrap_or_legacy, CompressionCodec, CompressionProfiler, PayloadKind,
57};
58use embeddenator_vsa::{Codebook, ProjectionResult, ReversibleVSAEncoder, SparseVec, DIM};
59use sha2::{Digest, Sha256};
60use std::sync::atomic::{AtomicU64, Ordering};
61use std::sync::{Arc, RwLock};
62
63pub use crate::versioned::types::{ChunkId, VersionMismatch, VersionedResult};
64pub use crate::versioned::Operation;
65
/// Default chunk size in bytes used when splitting file data for encoding.
pub const DEFAULT_CHUNK_SIZE: usize = 4096;

/// Encoding-format tag for the legacy (non-reversible) VSA encoding.
pub const ENCODING_FORMAT_LEGACY: u8 = 0;

/// Encoding-format tag for chunks encoded with the reversible VSA encoder.
pub const ENCODING_FORMAT_REVERSIBLE_VSA: u8 = 1;

/// Chunk size used by the reversible (holographic) chunked encode/decode path.
pub const REVERSIBLE_CHUNK_SIZE: usize = 64;
84
/// Errors produced by versioned EmbrFS filesystem operations.
#[derive(Debug, Clone)]
pub enum EmbrFSError {
    /// No (non-deleted) file exists at the given path.
    FileNotFound(String),
    /// A chunk id referenced by a file entry is missing from the chunk store.
    ChunkNotFound(ChunkId),
    /// Optimistic-concurrency check failed: caller expected one version, the store had another.
    VersionMismatch { expected: u64, actual: u64 },
    /// A create (write without expected version) hit a path that already exists.
    FileExists(String),
    /// The requested operation is not valid for the current file state.
    InvalidOperation(String),
    /// I/O-level failure, e.g. compression or decompression errors.
    IoError(String),
}
101
102impl std::fmt::Display for EmbrFSError {
103 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
104 match self {
105 EmbrFSError::FileNotFound(path) => write!(f, "File not found: {}", path),
106 EmbrFSError::ChunkNotFound(id) => write!(f, "Chunk not found: {}", id),
107 EmbrFSError::VersionMismatch { expected, actual } => {
108 write!(f, "Version mismatch: expected {}, got {}", expected, actual)
109 }
110 EmbrFSError::FileExists(path) => write!(f, "File already exists: {}", path),
111 EmbrFSError::InvalidOperation(msg) => write!(f, "Invalid operation: {}", msg),
112 EmbrFSError::IoError(msg) => write!(f, "IO error: {}", msg),
113 }
114 }
115}
116
// Marker impl so EmbrFSError can be boxed/propagated as a `dyn std::error::Error`.
impl std::error::Error for EmbrFSError {}
118
119impl From<VersionMismatch> for EmbrFSError {
120 fn from(e: VersionMismatch) -> Self {
121 EmbrFSError::VersionMismatch {
122 expected: e.expected,
123 actual: e.actual,
124 }
125 }
126}
127
/// A versioned, VSA-backed filesystem.
///
/// Mutation paths use optimistic concurrency: callers pass the version they
/// last observed and receive `VersionMismatch` on conflict.
pub struct VersionedEmbrFS {
    // Root vector shared behind a lock; presumably updated by
    // `bundle_chunks_to_root` — TODO confirm (definition not visible here).
    root: Arc<RwLock<Arc<SparseVec>>>,
    // Version counter paired with `root`.
    root_version: Arc<AtomicU64>,

    // Versioned store of encoded chunks, keyed by chunk id.
    pub chunk_store: VersionedChunkStore,

    // Per-chunk corrections applied after decoding to repair lossy output.
    pub corrections: VersionedCorrectionStore,

    // Path → file-entry manifest.
    pub manifest: VersionedManifest,

    // Configuration for the legacy (non-reversible) encode/decode paths.
    config: ReversibleVSAConfig,

    // Chooses per-path compression settings in `write_file_compressed`.
    profiler: CompressionProfiler,

    // Monotonic filesystem-wide version, bumped on every mutation.
    global_version: Arc<AtomicU64>,

    // Next chunk id to hand out (starts at 1).
    next_chunk_id: Arc<AtomicU64>,

    // Shared codebook; exposed via `codebook()`.
    codebook: Arc<RwLock<Codebook>>,

    // Encoder used when `holographic_mode` is set or a file entry is tagged
    // with `ENCODING_FORMAT_REVERSIBLE_VSA`.
    reversible_encoder: Arc<RwLock<ReversibleVSAEncoder>>,

    // When true, new writes encode chunks with the reversible encoder.
    holographic_mode: bool,
}
174
175impl VersionedEmbrFS {
    /// Creates a filesystem with the default reversible-VSA configuration.
    pub fn new() -> Self {
        Self::with_config(ReversibleVSAConfig::default())
    }
180
181 pub fn new_holographic() -> Self {
187 let mut fs = Self::with_config(ReversibleVSAConfig::default());
188 fs.holographic_mode = true;
189 fs
191 }
192
    /// Creates a filesystem with the given encoder configuration and a
    /// default compression profiler.
    pub fn with_config(config: ReversibleVSAConfig) -> Self {
        Self::with_config_and_profiler(config, CompressionProfiler::default())
    }
197
    /// Creates a filesystem with explicit encoder configuration and
    /// compression profiler. All stores start empty; the global version
    /// starts at 0 and chunk ids are allocated from 1.
    pub fn with_config_and_profiler(
        config: ReversibleVSAConfig,
        profiler: CompressionProfiler,
    ) -> Self {
        Self {
            root: Arc::new(RwLock::new(Arc::new(SparseVec::new()))),
            root_version: Arc::new(AtomicU64::new(0)),
            chunk_store: VersionedChunkStore::new(),
            corrections: VersionedCorrectionStore::new(),
            manifest: VersionedManifest::new(),
            config,
            profiler,
            global_version: Arc::new(AtomicU64::new(0)),
            next_chunk_id: Arc::new(AtomicU64::new(1)),
            codebook: Arc::new(RwLock::new(Codebook::new(DIM))),
            reversible_encoder: Arc::new(RwLock::new(ReversibleVSAEncoder::new())),
            // Holographic mode is opt-in; see `new_holographic` /
            // `enable_holographic_mode`.
            holographic_mode: false,
        }
    }
218
    /// Switches the filesystem to holographic mode; subsequent writes use the
    /// reversible VSA encoder. Existing files are not re-encoded.
    pub fn enable_holographic_mode(&mut self) {
        self.holographic_mode = true;
    }
226
    /// Shared handle to the reversible VSA encoder.
    pub fn reversible_encoder(&self) -> &Arc<RwLock<ReversibleVSAEncoder>> {
        &self.reversible_encoder
    }
231
    /// Whether holographic (reversible-VSA) mode is currently enabled.
    pub fn is_holographic(&self) -> bool {
        self.holographic_mode
    }
236
    /// Shared handle to the VSA codebook.
    pub fn codebook(&self) -> &Arc<RwLock<Codebook>> {
        &self.codebook
    }
241
    /// The compression profiler used by `write_file_compressed`.
    pub fn profiler(&self) -> &CompressionProfiler {
        &self.profiler
    }
246
    /// Current filesystem-wide version (bumped on every mutation).
    pub fn version(&self) -> u64 {
        self.global_version.load(Ordering::Acquire)
    }
251
252 pub fn read_file(&self, path: &str) -> Result<(Vec<u8>, u64), EmbrFSError> {
269 let (file_entry, _manifest_version) = self
271 .manifest
272 .get_file(path)
273 .ok_or_else(|| EmbrFSError::FileNotFound(path.to_string()))?;
274
275 if file_entry.deleted {
276 return Err(EmbrFSError::FileNotFound(path.to_string()));
277 }
278
279 let mut file_data = Vec::with_capacity(file_entry.size);
281
282 let use_reversible = file_entry.encoding_format == Some(ENCODING_FORMAT_REVERSIBLE_VSA);
284
285 for &chunk_id in &file_entry.chunks {
286 let (chunk, _chunk_version) = self
288 .chunk_store
289 .get(chunk_id)
290 .ok_or(EmbrFSError::ChunkNotFound(chunk_id))?;
291
292 let decoded = if use_reversible {
294 self.reversible_encoder
295 .read()
296 .unwrap()
297 .decode(&chunk.vector, DEFAULT_CHUNK_SIZE)
298 } else {
299 chunk
300 .vector
301 .decode_data(&self.config, Some(&file_entry.path), DEFAULT_CHUNK_SIZE)
302 };
303
304 let corrected = self
306 .corrections
307 .get(chunk_id as u64)
308 .map(|(corr, _)| corr.apply(&decoded))
309 .unwrap_or(decoded);
310
311 file_data.extend_from_slice(&corrected);
312 }
313
314 file_data.truncate(file_entry.size);
316
317 let final_data = if let Some(codec) = file_entry.compression_codec {
319 if codec != 0 {
320 unwrap_auto(PayloadKind::EngramBincode, &file_data)
322 .map_err(|e| EmbrFSError::IoError(format!("Decompression failed: {}", e)))?
323 } else {
324 file_data
325 }
326 } else {
327 file_data
328 };
329
330 Ok((final_data, file_entry.version))
331 }
332
    /// Reads up to `length` bytes starting at `offset` from the file at `path`.
    ///
    /// Returns the bytes read (possibly fewer than `length` if the range runs
    /// past end-of-file) plus the file entry's version. An `offset` at or past
    /// EOF, or a zero `length`, yields an empty result rather than an error.
    pub fn read_range(
        &self,
        path: &str,
        offset: usize,
        length: usize,
    ) -> Result<(Vec<u8>, u64), EmbrFSError> {
        let (file_entry, _manifest_version) = self
            .manifest
            .get_file(path)
            .ok_or_else(|| EmbrFSError::FileNotFound(path.to_string()))?;

        if file_entry.deleted {
            return Err(EmbrFSError::FileNotFound(path.to_string()));
        }

        if offset >= file_entry.size || length == 0 {
            return Ok((Vec::new(), file_entry.version));
        }

        // Clamp the requested range to the file's actual size.
        let actual_length = length.min(file_entry.size - offset);

        // Prefer the offset index (O(affected chunks)) when the entry has
        // one; otherwise scan chunks sequentially.
        if file_entry.has_offset_index() {
            self.read_range_with_index(path, &file_entry, offset, actual_length)
        } else {
            self.read_range_sequential(path, &file_entry, offset, actual_length)
        }
    }
396
397 fn read_range_with_index(
399 &self,
400 _path: &str,
401 file_entry: &crate::fs::versioned::manifest::VersionedFileEntry,
402 offset: usize,
403 length: usize,
404 ) -> Result<(Vec<u8>, u64), EmbrFSError> {
405 let chunk_ranges = file_entry.chunks_for_range(offset, length);
406
407 if chunk_ranges.is_empty() {
408 return Ok((Vec::new(), file_entry.version));
409 }
410
411 let mut result = Vec::with_capacity(length);
412
413 for range in chunk_ranges {
414 let (chunk, _chunk_version) = self
416 .chunk_store
417 .get(range.chunk_id)
418 .ok_or(EmbrFSError::ChunkNotFound(range.chunk_id))?;
419
420 let decoded =
422 chunk
423 .vector
424 .decode_data(&self.config, Some(&file_entry.path), chunk.original_size);
425
426 let corrected = self
428 .corrections
429 .get(range.chunk_id as u64)
430 .map(|(corr, _)| corr.apply(&decoded))
431 .unwrap_or(decoded);
432
433 let chunk_data = if range.start_within_chunk == 0 && range.length == corrected.len() {
435 corrected
436 } else {
437 let end = (range.start_within_chunk + range.length).min(corrected.len());
438 corrected[range.start_within_chunk..end].to_vec()
439 };
440
441 result.extend_from_slice(&chunk_data);
442 }
443
444 result.truncate(length);
446
447 Ok((result, file_entry.version))
448 }
449
450 fn read_range_sequential(
452 &self,
453 _path: &str,
454 file_entry: &crate::fs::versioned::manifest::VersionedFileEntry,
455 offset: usize,
456 length: usize,
457 ) -> Result<(Vec<u8>, u64), EmbrFSError> {
458 let mut result = Vec::with_capacity(length);
459 let mut current_offset = 0usize;
460 let end_offset = offset + length;
461
462 for &chunk_id in &file_entry.chunks {
463 let (chunk, _chunk_version) = self
465 .chunk_store
466 .get(chunk_id)
467 .ok_or(EmbrFSError::ChunkNotFound(chunk_id))?;
468
469 let chunk_size = chunk.original_size;
470 let chunk_end = current_offset + chunk_size;
471
472 if chunk_end <= offset {
474 current_offset = chunk_end;
475 continue;
476 }
477
478 if current_offset >= end_offset {
480 break;
481 }
482
483 let decoded =
485 chunk
486 .vector
487 .decode_data(&self.config, Some(&file_entry.path), chunk_size);
488
489 let corrected = self
491 .corrections
492 .get(chunk_id as u64)
493 .map(|(corr, _)| corr.apply(&decoded))
494 .unwrap_or(decoded);
495
496 let start_in_chunk = offset.saturating_sub(current_offset);
498 let end_in_chunk = (end_offset - current_offset).min(corrected.len());
499
500 if start_in_chunk < end_in_chunk {
501 result.extend_from_slice(&corrected[start_in_chunk..end_in_chunk]);
502 }
503
504 current_offset = chunk_end;
505
506 if result.len() >= length {
508 break;
509 }
510 }
511
512 result.truncate(length);
514
515 Ok((result, file_entry.version))
516 }
517
    /// Applies a structured delta to the file at `path`, dispatching to the
    /// specialized handler for each delta type. Returns the file's new version.
    ///
    /// # Errors
    /// * `FileNotFound` — path absent or marked deleted.
    /// * `VersionMismatch` — `delta.expected_version` does not match the entry.
    /// * `InvalidOperation` — Insert/Delete deltas (require a full rewrite).
    pub fn apply_delta(
        &self,
        path: &str,
        delta: &crate::fs::delta::Delta,
    ) -> Result<u64, EmbrFSError> {
        use crate::fs::delta::{analyze_delta, DeltaType};

        let (file_entry, _manifest_version) = self
            .manifest
            .get_file(path)
            .ok_or_else(|| EmbrFSError::FileNotFound(path.to_string()))?;

        if file_entry.deleted {
            return Err(EmbrFSError::FileNotFound(path.to_string()));
        }

        // Optimistic-concurrency check against the caller-supplied version.
        if let Some(expected) = delta.expected_version {
            if file_entry.version != expected {
                return Err(EmbrFSError::VersionMismatch {
                    expected,
                    actual: file_entry.version,
                });
            }
        }

        let chunk_size = DEFAULT_CHUNK_SIZE;
        // `affected` is only used to detect no-op deltas here; the per-type
        // handlers recompute the chunks they touch themselves.
        let affected = analyze_delta(
            delta,
            file_entry.size,
            chunk_size,
            file_entry.chunk_offsets.as_deref(),
        );

        // No chunks affected: nothing to do, version is unchanged.
        if affected.is_empty() {
            return Ok(file_entry.version);
        }

        match &delta.delta_type {
            DeltaType::ByteReplace {
                offset,
                old_value,
                new_value,
            } => self.apply_byte_replace(path, &file_entry, *offset, *old_value, *new_value),

            DeltaType::MultiByteReplace { changes } => {
                self.apply_multi_byte_replace(path, &file_entry, changes)
            }

            DeltaType::RangeReplace {
                offset,
                old_data,
                new_data,
            } => {
                // Same-length replacement keeps chunk boundaries; a length
                // change forces a full read-modify-write of the file.
                if old_data.len() == new_data.len() {
                    self.apply_same_length_replace(path, &file_entry, *offset, old_data, new_data)
                } else {
                    self.apply_length_changing_replace(
                        path,
                        &file_entry,
                        *offset,
                        old_data,
                        new_data,
                    )
                }
            }

            DeltaType::Append { data } => self.apply_append(path, &file_entry, data),

            DeltaType::Truncate {
                new_length,
                truncated_data: _,
            } => self.apply_truncate(path, &file_entry, *new_length),

            DeltaType::Insert { .. } | DeltaType::Delete { .. } => {
                Err(EmbrFSError::InvalidOperation(
                    "Insert/Delete operations require full file rewrite - use read_file + write_file".to_string(),
                ))
            }
        }
    }
653
    /// Encodes one chunk of bytes into a sparse vector, using the reversible
    /// encoder when holographic mode is on, otherwise the legacy codec.
    /// Note: the reversible path takes the encoder's write lock.
    pub fn encode_chunk(&self, data: &[u8], path: Option<&str>) -> SparseVec {
        if self.holographic_mode {
            self.reversible_encoder.write().unwrap().encode(data)
        } else {
            SparseVec::encode_data(data, &self.config, path)
        }
    }
665
    /// Decodes a chunk vector back into `original_size` bytes, mirroring
    /// `encode_chunk`'s mode selection (reversible vs. legacy).
    pub fn decode_chunk(
        &self,
        vec: &SparseVec,
        path: Option<&str>,
        original_size: usize,
    ) -> Vec<u8> {
        if self.holographic_mode {
            self.reversible_encoder
                .read()
                .unwrap()
                .decode(vec, original_size)
        } else {
            vec.decode_data(&self.config, path, original_size)
        }
    }
685
    /// Encodes a chunk using an explicit encoding-format tag instead of the
    /// filesystem-wide `holographic_mode` flag. Any tag other than
    /// `ENCODING_FORMAT_REVERSIBLE_VSA` (including `None`) uses the legacy codec.
    pub fn encode_chunk_with_format(
        &self,
        data: &[u8],
        path: Option<&str>,
        encoding_format: Option<u8>,
    ) -> SparseVec {
        if encoding_format == Some(ENCODING_FORMAT_REVERSIBLE_VSA) {
            self.reversible_encoder.write().unwrap().encode(data)
        } else {
            SparseVec::encode_data(data, &self.config, path)
        }
    }
702
    /// Decodes a chunk using an explicit encoding-format tag, mirroring
    /// `encode_chunk_with_format`'s selection logic.
    pub fn decode_chunk_with_format(
        &self,
        vec: &SparseVec,
        path: Option<&str>,
        original_size: usize,
        encoding_format: Option<u8>,
    ) -> Vec<u8> {
        if encoding_format == Some(ENCODING_FORMAT_REVERSIBLE_VSA) {
            self.reversible_encoder
                .read()
                .unwrap()
                .decode(vec, original_size)
        } else {
            vec.decode_data(&self.config, path, original_size)
        }
    }
723
    /// Replaces a single byte in place by re-encoding only the chunk that
    /// contains `offset`. The stored `_old_value` is not verified.
    fn apply_byte_replace(
        &self,
        path: &str,
        file_entry: &crate::fs::versioned::manifest::VersionedFileEntry,
        offset: usize,
        _old_value: u8,
        new_value: u8,
    ) -> Result<u64, EmbrFSError> {
        let chunk_size = DEFAULT_CHUNK_SIZE;
        let chunk_idx = offset / chunk_size;

        if chunk_idx >= file_entry.chunks.len() {
            return Err(EmbrFSError::InvalidOperation(
                "Offset out of bounds".to_string(),
            ));
        }

        let chunk_id = file_entry.chunks[chunk_idx];

        let (chunk, _) = self
            .chunk_store
            .get(chunk_id)
            .ok_or(EmbrFSError::ChunkNotFound(chunk_id))?;

        // NOTE(review): this decodes with the legacy codec regardless of the
        // file's encoding_format — confirm whether delta application is
        // expected to work on reversible-VSA files.
        let decoded = chunk
            .vector
            .decode_data(&self.config, Some(path), chunk.original_size);

        // Repair lossy decode output before mutating.
        let corrected = self
            .corrections
            .get(chunk_id as u64)
            .map(|(corr, _)| corr.apply(&decoded))
            .unwrap_or(decoded);

        let offset_in_chunk = offset % chunk_size;
        let mut modified = corrected;
        if offset_in_chunk < modified.len() {
            modified[offset_in_chunk] = new_value;
        }

        // Re-encode the modified chunk and compute a fresh correction by
        // round-tripping through decode.
        let new_vec = self.encode_chunk(&modified, Some(path));

        let decoded_new = self.decode_chunk(&new_vec, Some(path), modified.len());
        let correction =
            crate::correction::ChunkCorrection::new(chunk_id as u64, &modified, &decoded_new);

        // First 8 bytes of the SHA-256 digest serve as the chunk's hash tag.
        let mut hasher = Sha256::new();
        hasher.update(&modified);
        let hash = hasher.finalize();
        let mut hash_bytes = [0u8; 8];
        hash_bytes.copy_from_slice(&hash[0..8]);

        let new_chunk = crate::versioned::VersionedChunk::new(new_vec, modified.len(), hash_bytes);

        let store_version = self.chunk_store.version();
        self.chunk_store
            .insert(chunk_id, new_chunk, store_version)?;

        // Only persist the correction if the round-trip was lossy.
        if correction.needs_correction() {
            let corrections_version = self.corrections.current_version();
            self.corrections
                .update(chunk_id as u64, correction, corrections_version)?;
        }

        // NOTE(review): fetch_add returns the PREVIOUS value, so the entry's
        // new version is the pre-increment global counter — confirm intended.
        let new_version = self.global_version.fetch_add(1, Ordering::SeqCst);
        let mut updated_entry = file_entry.clone();
        updated_entry.version = new_version;
        self.manifest
            .update_file(path, updated_entry, file_entry.version)?;

        Ok(new_version)
    }
805
    /// Applies multiple single-byte replacements, grouping them by chunk so
    /// each affected chunk is decoded and re-encoded exactly once.
    /// Changes whose chunk index is out of range are silently skipped; the
    /// `old` byte in each `(offset, old, new)` tuple is not verified.
    fn apply_multi_byte_replace(
        &self,
        path: &str,
        file_entry: &crate::fs::versioned::manifest::VersionedFileEntry,
        changes: &[(usize, u8, u8)],
    ) -> Result<u64, EmbrFSError> {
        let chunk_size = DEFAULT_CHUNK_SIZE;
        // chunk index -> list of (offset within chunk, new byte value)
        let mut changes_by_chunk: std::collections::HashMap<usize, Vec<(usize, u8)>> =
            std::collections::HashMap::new();

        for &(offset, _old_val, new_val) in changes {
            let chunk_idx = offset / chunk_size;
            let offset_in_chunk = offset % chunk_size;
            changes_by_chunk
                .entry(chunk_idx)
                .or_default()
                .push((offset_in_chunk, new_val));
        }

        for (chunk_idx, chunk_changes) in changes_by_chunk {
            if chunk_idx >= file_entry.chunks.len() {
                continue;
            }

            let chunk_id = file_entry.chunks[chunk_idx];

            let (chunk, _) = self
                .chunk_store
                .get(chunk_id)
                .ok_or(EmbrFSError::ChunkNotFound(chunk_id))?;

            let decoded = chunk
                .vector
                .decode_data(&self.config, Some(path), chunk.original_size);

            // Repair lossy decode output before mutating.
            let corrected = self
                .corrections
                .get(chunk_id as u64)
                .map(|(corr, _)| corr.apply(&decoded))
                .unwrap_or(decoded);

            let mut modified = corrected;
            for (offset_in_chunk, new_val) in chunk_changes {
                if offset_in_chunk < modified.len() {
                    modified[offset_in_chunk] = new_val;
                }
            }

            // Re-encode and round-trip to compute the new correction.
            let new_vec = self.encode_chunk(&modified, Some(path));
            let decoded_new = self.decode_chunk(&new_vec, Some(path), modified.len());
            let correction =
                crate::correction::ChunkCorrection::new(chunk_id as u64, &modified, &decoded_new);

            // First 8 bytes of the SHA-256 digest serve as the chunk's hash tag.
            let mut hasher = Sha256::new();
            hasher.update(&modified);
            let hash = hasher.finalize();
            let mut hash_bytes = [0u8; 8];
            hash_bytes.copy_from_slice(&hash[0..8]);

            let new_chunk =
                crate::versioned::VersionedChunk::new(new_vec, modified.len(), hash_bytes);
            let store_version = self.chunk_store.version();
            self.chunk_store
                .insert(chunk_id, new_chunk, store_version)?;

            // Only persist the correction if the round-trip was lossy.
            if correction.needs_correction() {
                let corrections_version = self.corrections.current_version();
                self.corrections
                    .update(chunk_id as u64, correction, corrections_version)?;
            }
        }

        // NOTE(review): fetch_add returns the PREVIOUS value, so the entry's
        // new version is the pre-increment global counter — confirm intended.
        let new_version = self.global_version.fetch_add(1, Ordering::SeqCst);
        let mut updated_entry = file_entry.clone();
        updated_entry.version = new_version;
        self.manifest
            .update_file(path, updated_entry, file_entry.version)?;

        Ok(new_version)
    }
893
894 fn apply_same_length_replace(
896 &self,
897 path: &str,
898 file_entry: &crate::fs::versioned::manifest::VersionedFileEntry,
899 offset: usize,
900 _old_data: &[u8],
901 new_data: &[u8],
902 ) -> Result<u64, EmbrFSError> {
903 let changes: Vec<(usize, u8, u8)> = new_data
905 .iter()
906 .enumerate()
907 .map(|(i, &new_val)| (offset + i, 0, new_val)) .collect();
909
910 self.apply_multi_byte_replace(path, file_entry, &changes)
911 }
912
913 fn apply_length_changing_replace(
915 &self,
916 path: &str,
917 file_entry: &crate::fs::versioned::manifest::VersionedFileEntry,
918 offset: usize,
919 old_data: &[u8],
920 new_data: &[u8],
921 ) -> Result<u64, EmbrFSError> {
922 let (content, _) = self.read_file(path)?;
924
925 if offset + old_data.len() <= content.len()
927 && &content[offset..offset + old_data.len()] != old_data
928 {
929 return Err(EmbrFSError::VersionMismatch {
930 expected: 0,
931 actual: 1,
932 });
933 }
934
935 let before = &content[..offset];
937 let after = if offset + old_data.len() < content.len() {
938 &content[offset + old_data.len()..]
939 } else {
940 &[]
941 };
942
943 let mut new_content = Vec::with_capacity(before.len() + new_data.len() + after.len());
944 new_content.extend_from_slice(before);
945 new_content.extend_from_slice(new_data);
946 new_content.extend_from_slice(after);
947
948 self.write_file(path, &new_content, Some(file_entry.version))
950 }
951
    /// Appends `data` to the file: first tops up the partially-filled last
    /// chunk (if any), then creates new full-size chunks for the remainder,
    /// and finally rebuilds the entry's offset index.
    fn apply_append(
        &self,
        path: &str,
        file_entry: &crate::fs::versioned::manifest::VersionedFileEntry,
        data: &[u8],
    ) -> Result<u64, EmbrFSError> {
        // Empty append is a no-op; version is unchanged.
        if data.is_empty() {
            return Ok(file_entry.version);
        }

        let chunk_size = DEFAULT_CHUNK_SIZE;
        let current_size = file_entry.size;

        // Bytes occupied in the last chunk: 1..=chunk_size for a non-empty
        // file (a multiple of chunk_size means the last chunk is full).
        let bytes_in_last_chunk = if current_size == 0 {
            0
        } else {
            ((current_size - 1) % chunk_size) + 1
        };
        let space_in_last = chunk_size - bytes_in_last_chunk;

        let mut chunk_ids = file_entry.chunks.clone();
        let mut new_size = current_size;

        let mut remaining_data = data;
        // Phase 1: fill the tail of the existing last chunk in place.
        if space_in_last > 0 && !file_entry.chunks.is_empty() && bytes_in_last_chunk > 0 {
            let last_chunk_id = *file_entry.chunks.last().unwrap();
            let fill_amount = space_in_last.min(remaining_data.len());

            let (chunk, _) = self
                .chunk_store
                .get(last_chunk_id)
                .ok_or(EmbrFSError::ChunkNotFound(last_chunk_id))?;

            let decoded = chunk
                .vector
                .decode_data(&self.config, Some(path), chunk.original_size);

            // Repair lossy decode output before extending.
            let mut corrected = self
                .corrections
                .get(last_chunk_id as u64)
                .map(|(corr, _)| corr.apply(&decoded))
                .unwrap_or(decoded);

            corrected.extend_from_slice(&remaining_data[..fill_amount]);

            // Re-encode the grown chunk and round-trip to compute its correction.
            let new_vec = self.encode_chunk(&corrected, Some(path));
            let decoded_new = self.decode_chunk(&new_vec, Some(path), corrected.len());
            let correction = crate::correction::ChunkCorrection::new(
                last_chunk_id as u64,
                &corrected,
                &decoded_new,
            );

            // First 8 bytes of the SHA-256 digest serve as the chunk's hash tag.
            let mut hasher = Sha256::new();
            hasher.update(&corrected);
            let hash = hasher.finalize();
            let mut hash_bytes = [0u8; 8];
            hash_bytes.copy_from_slice(&hash[0..8]);

            let new_chunk =
                crate::versioned::VersionedChunk::new(new_vec, corrected.len(), hash_bytes);
            let store_version = self.chunk_store.version();
            self.chunk_store
                .insert(last_chunk_id, new_chunk, store_version)?;

            // Only persist the correction if the round-trip was lossy.
            if correction.needs_correction() {
                let corrections_version = self.corrections.current_version();
                self.corrections
                    .update(last_chunk_id as u64, correction, corrections_version)?;
            }

            remaining_data = &remaining_data[fill_amount..];
            new_size += fill_amount;
        }

        // Phase 2: allocate fresh chunks for whatever data remains.
        for chunk_data in remaining_data.chunks(chunk_size) {
            let chunk_id = self.allocate_chunk_id();

            let chunk_vec = self.encode_chunk(chunk_data, Some(path));
            let decoded = self.decode_chunk(&chunk_vec, Some(path), chunk_data.len());
            let correction =
                crate::correction::ChunkCorrection::new(chunk_id as u64, chunk_data, &decoded);

            let mut hasher = Sha256::new();
            hasher.update(chunk_data);
            let hash = hasher.finalize();
            let mut hash_bytes = [0u8; 8];
            hash_bytes.copy_from_slice(&hash[0..8]);

            let versioned_chunk =
                crate::versioned::VersionedChunk::new(chunk_vec, chunk_data.len(), hash_bytes);
            let store_version = self.chunk_store.version();
            self.chunk_store
                .insert(chunk_id, versioned_chunk, store_version)?;

            if correction.needs_correction() {
                let corrections_version = self.corrections.current_version();
                self.corrections
                    .update(chunk_id as u64, correction, corrections_version)?;
            }

            chunk_ids.push(chunk_id);
            new_size += chunk_data.len();
        }

        // NOTE(review): fetch_add returns the PREVIOUS value, so the entry's
        // new version is the pre-increment global counter — confirm intended.
        let new_version = self.global_version.fetch_add(1, Ordering::SeqCst);
        let mut updated_entry = file_entry.clone();
        updated_entry.chunks = chunk_ids;
        updated_entry.size = new_size;
        updated_entry.version = new_version;

        // Rebuild the offset index: every chunk is full-size except the last,
        // which holds the remainder.
        let chunk_sizes: Vec<usize> = updated_entry
            .chunks
            .iter()
            .enumerate()
            .map(|(i, _)| {
                if i < updated_entry.chunks.len() - 1 {
                    chunk_size
                } else {
                    new_size - (i * chunk_size)
                }
            })
            .collect();
        updated_entry.build_offset_index(&chunk_sizes);

        self.manifest
            .update_file(path, updated_entry, file_entry.version)?;

        Ok(new_version)
    }
1092
    /// Truncates the file to `new_length` bytes: drops whole chunks past the
    /// new end, re-encodes a shortened final chunk if needed, and rebuilds
    /// the offset index. Growing (or keeping) the size is a no-op.
    fn apply_truncate(
        &self,
        path: &str,
        file_entry: &crate::fs::versioned::manifest::VersionedFileEntry,
        new_length: usize,
    ) -> Result<u64, EmbrFSError> {
        if new_length >= file_entry.size {
            return Ok(file_entry.version);
        }

        let chunk_size = DEFAULT_CHUNK_SIZE;
        let new_chunk_count = new_length.div_ceil(chunk_size);

        let mut new_chunks = file_entry.chunks.clone();
        new_chunks.truncate(new_chunk_count);

        // If the new final chunk is only partially kept, re-encode it at the
        // shorter length (guarded by new_length > 0 so new_chunk_count >= 1).
        if new_length > 0 {
            let last_chunk_bytes = new_length - ((new_chunk_count - 1) * chunk_size);
            if last_chunk_bytes < chunk_size && !new_chunks.is_empty() {
                let last_chunk_id = *new_chunks.last().unwrap();

                let (chunk, _) = self
                    .chunk_store
                    .get(last_chunk_id)
                    .ok_or(EmbrFSError::ChunkNotFound(last_chunk_id))?;

                let decoded =
                    chunk
                        .vector
                        .decode_data(&self.config, Some(path), chunk.original_size);

                // Repair lossy decode output before truncating.
                let corrected = self
                    .corrections
                    .get(last_chunk_id as u64)
                    .map(|(corr, _)| corr.apply(&decoded))
                    .unwrap_or(decoded);

                let truncated: Vec<u8> = corrected.into_iter().take(last_chunk_bytes).collect();

                // Re-encode the shortened chunk and round-trip for its correction.
                let new_vec = self.encode_chunk(&truncated, Some(path));
                let decoded_new = self.decode_chunk(&new_vec, Some(path), truncated.len());
                let correction = crate::correction::ChunkCorrection::new(
                    last_chunk_id as u64,
                    &truncated,
                    &decoded_new,
                );

                // First 8 bytes of the SHA-256 digest serve as the chunk's hash tag.
                let mut hasher = Sha256::new();
                hasher.update(&truncated);
                let hash = hasher.finalize();
                let mut hash_bytes = [0u8; 8];
                hash_bytes.copy_from_slice(&hash[0..8]);

                let new_chunk =
                    crate::versioned::VersionedChunk::new(new_vec, truncated.len(), hash_bytes);
                let store_version = self.chunk_store.version();
                self.chunk_store
                    .insert(last_chunk_id, new_chunk, store_version)?;

                // Only persist the correction if the round-trip was lossy.
                if correction.needs_correction() {
                    let corrections_version = self.corrections.current_version();
                    self.corrections.update(
                        last_chunk_id as u64,
                        correction,
                        corrections_version,
                    )?;
                }
            }
        }

        // NOTE(review): fetch_add returns the PREVIOUS value, so the entry's
        // new version is the pre-increment global counter — confirm intended.
        let new_version = self.global_version.fetch_add(1, Ordering::SeqCst);
        let mut updated_entry = file_entry.clone();
        updated_entry.chunks = new_chunks;
        updated_entry.size = new_length;
        updated_entry.version = new_version;

        // Rebuild the offset index: full chunks plus a short final chunk.
        let chunk_sizes: Vec<usize> = updated_entry
            .chunks
            .iter()
            .enumerate()
            .map(|(i, _)| {
                if i < updated_entry.chunks.len() - 1 {
                    chunk_size
                } else {
                    new_length - (i * chunk_size)
                }
            })
            .collect();
        updated_entry.build_offset_index(&chunk_sizes);

        self.manifest
            .update_file(path, updated_entry, file_entry.version)?;

        Ok(new_version)
    }
1196
    /// Writes `data` to `path`, creating or replacing the file.
    ///
    /// Semantics of `expected_version`:
    /// * `None` — create: fails with `FileExists` if the path already exists.
    /// * `Some(v)` — update: fails with `FileNotFound` if absent, or
    ///   `VersionMismatch` if the entry's version is not `v`.
    ///
    /// Returns the file's version (0 for a new file, `old + 1` for an update).
    pub fn write_file(
        &self,
        path: &str,
        data: &[u8],
        expected_version: Option<u64>,
    ) -> Result<u64, EmbrFSError> {
        let existing = self.manifest.get_file(path);

        match (&existing, expected_version) {
            (Some((entry, _)), Some(expected_ver)) => {
                if entry.version != expected_ver {
                    return Err(EmbrFSError::VersionMismatch {
                        expected: expected_ver,
                        actual: entry.version,
                    });
                }
            }
            (Some(_), None) => {
                return Err(EmbrFSError::FileExists(path.to_string()));
            }
            (None, Some(_)) => {
                return Err(EmbrFSError::FileNotFound(path.to_string()));
            }
            (None, None) => {
                // New file, no version expectation: proceed.
            }
        }

        let chunks = self.chunk_data(data);
        let mut chunk_ids = Vec::new();

        let store_version = self.chunk_store.version();

        // Stage chunk and correction writes so they can be inserted in batches.
        let mut chunk_updates = Vec::new();
        let mut corrections_to_add = Vec::new();

        for chunk_data in chunks {
            let chunk_id = self.allocate_chunk_id();

            // Encode, then round-trip decode to compute the lossy-decode correction.
            let chunk_vec = self.encode_chunk(chunk_data, Some(path));
            let decoded = self.decode_chunk(&chunk_vec, Some(path), chunk_data.len());

            // First 8 bytes of the SHA-256 digest serve as the chunk's hash tag.
            let mut hasher = Sha256::new();
            hasher.update(chunk_data);
            let hash = hasher.finalize();
            let mut hash_bytes = [0u8; 8];
            hash_bytes.copy_from_slice(&hash[0..8]);

            let versioned_chunk = VersionedChunk::new(chunk_vec, chunk_data.len(), hash_bytes);

            chunk_updates.push((chunk_id, versioned_chunk));

            let correction =
                crate::correction::ChunkCorrection::new(chunk_id as u64, chunk_data, &decoded);
            corrections_to_add.push((chunk_id as u64, correction));

            chunk_ids.push(chunk_id);
        }

        // New files take the unversioned fast path; updates go through the
        // version-checked batch insert.
        if expected_version.is_none() {
            self.chunk_store.batch_insert_new(chunk_updates)?;
        } else {
            self.chunk_store
                .batch_insert(chunk_updates, store_version)?;
        }

        if expected_version.is_none() {
            self.corrections.batch_insert_new(corrections_to_add)?;
        } else {
            let corrections_version = self.corrections.current_version();
            self.corrections
                .batch_update(corrections_to_add, corrections_version)?;
        }

        let is_text = is_text_data(data);
        let mut new_entry =
            VersionedFileEntry::new(path.to_string(), is_text, data.len(), chunk_ids.clone());

        // Tag the entry so reads pick the reversible decoder.
        if self.holographic_mode {
            new_entry.encoding_format = Some(ENCODING_FORMAT_REVERSIBLE_VSA);
        }

        let file_version = if let Some((entry, _)) = existing {
            self.manifest.update_file(path, new_entry, entry.version)?;
            entry.version + 1
        } else {
            self.manifest.add_file(new_entry)?;
            0
        };

        // Fold the new chunks into the root bundle vector.
        self.bundle_chunks_to_root(&chunk_ids)?;

        self.global_version.fetch_add(1, Ordering::AcqRel);

        Ok(file_version)
    }
1335
    /// Like `write_file`, but first compresses `data` using the codec chosen
    /// by the per-path compression profiler. The entry records both the
    /// compressed (stored) size and the original size plus the codec byte
    /// (0 = none, 1 = zstd, 2 = lz4) so `read_file` can unwrap on read.
    pub fn write_file_compressed(
        &self,
        path: &str,
        data: &[u8],
        expected_version: Option<u64>,
    ) -> Result<u64, EmbrFSError> {
        let profile = self.profiler.for_path(path);
        let write_opts = profile.to_write_options();

        // Compress up front (or pass through when the profile says None).
        let (compressed_data, codec_byte) = if write_opts.codec == CompressionCodec::None {
            (data.to_vec(), 0u8)
        } else {
            let wrapped = wrap_or_legacy(PayloadKind::EngramBincode, write_opts, data)
                .map_err(|e| EmbrFSError::IoError(format!("Compression failed: {}", e)))?;
            // The None arm is unreachable here (guarded above) but kept for
            // an exhaustive match.
            let codec = match write_opts.codec {
                CompressionCodec::None => 0,
                CompressionCodec::Zstd => 1,
                CompressionCodec::Lz4 => 2,
            };
            (wrapped, codec)
        };

        let existing = self.manifest.get_file(path);

        // Same create/update version semantics as `write_file`.
        match (&existing, expected_version) {
            (Some((entry, _)), Some(expected_ver)) => {
                if entry.version != expected_ver {
                    return Err(EmbrFSError::VersionMismatch {
                        expected: expected_ver,
                        actual: entry.version,
                    });
                }
            }
            (Some(_), None) => {
                return Err(EmbrFSError::FileExists(path.to_string()));
            }
            (None, Some(_)) => {
                return Err(EmbrFSError::FileNotFound(path.to_string()));
            }
            (None, None) => {}
        }

        // Chunk the COMPRESSED payload; reads unwrap after reassembly.
        let chunks = self.chunk_data(&compressed_data);
        let mut chunk_ids = Vec::new();

        let store_version = self.chunk_store.version();

        let mut chunk_updates = Vec::new();
        let mut corrections_to_add = Vec::new();

        for chunk_data in chunks {
            let chunk_id = self.allocate_chunk_id();

            // Encode, then round-trip decode to compute the lossy-decode correction.
            let chunk_vec = self.encode_chunk(chunk_data, Some(path));
            let decoded = self.decode_chunk(&chunk_vec, Some(path), chunk_data.len());

            // First 8 bytes of the SHA-256 digest serve as the chunk's hash tag.
            let mut hasher = Sha256::new();
            hasher.update(chunk_data);
            let hash = hasher.finalize();
            let mut hash_bytes = [0u8; 8];
            hash_bytes.copy_from_slice(&hash[0..8]);

            let versioned_chunk = VersionedChunk::new(chunk_vec, chunk_data.len(), hash_bytes);
            chunk_updates.push((chunk_id, versioned_chunk));

            let correction =
                crate::correction::ChunkCorrection::new(chunk_id as u64, chunk_data, &decoded);
            corrections_to_add.push((chunk_id as u64, correction));

            chunk_ids.push(chunk_id);
        }

        // New files take the unversioned fast path; updates go through the
        // version-checked batch insert.
        if expected_version.is_none() {
            self.chunk_store.batch_insert_new(chunk_updates)?;
        } else {
            self.chunk_store
                .batch_insert(chunk_updates, store_version)?;
        }

        if expected_version.is_none() {
            self.corrections.batch_insert_new(corrections_to_add)?;
        } else {
            let corrections_version = self.corrections.current_version();
            self.corrections
                .batch_update(corrections_to_add, corrections_version)?;
        }

        // Text detection runs on the ORIGINAL bytes, not the compressed payload.
        let is_text = is_text_data(data);
        let mut new_entry = if codec_byte == 0 {
            VersionedFileEntry::new(path.to_string(), is_text, data.len(), chunk_ids.clone())
        } else {
            VersionedFileEntry::new_compressed(
                path.to_string(),
                is_text,
                compressed_data.len(),
                data.len(),
                codec_byte,
                chunk_ids.clone(),
            )
        };

        // Tag the entry so reads pick the reversible decoder.
        if self.holographic_mode {
            new_entry.encoding_format = Some(ENCODING_FORMAT_REVERSIBLE_VSA);
        }

        let file_version = if let Some((entry, _)) = existing {
            self.manifest.update_file(path, new_entry, entry.version)?;
            entry.version + 1
        } else {
            self.manifest.add_file(new_entry)?;
            0
        };

        // Fold the new chunks into the root bundle vector.
        self.bundle_chunks_to_root(&chunk_ids)?;

        self.global_version.fetch_add(1, Ordering::AcqRel);

        Ok(file_version)
    }
1500
    /// Removes the file at `path` (version-checked) and bumps the global
    /// version. Chunks and corrections are not reclaimed here.
    pub fn delete_file(&self, path: &str, expected_version: u64) -> Result<(), EmbrFSError> {
        self.manifest.remove_file(path, expected_version)?;
        self.global_version.fetch_add(1, Ordering::AcqRel);
        Ok(())
    }
1510
1511 pub fn list_files(&self) -> Vec<String> {
1513 self.manifest.list_files()
1514 }
1515
1516 pub fn exists(&self, path: &str) -> bool {
1518 self.manifest
1519 .get_file(path)
1520 .map(|(entry, _)| !entry.deleted)
1521 .unwrap_or(false)
1522 }
1523
1524 pub fn stats(&self) -> FilesystemStats {
1526 let manifest_stats = self.manifest.stats();
1527 let chunk_stats = self.chunk_store.stats();
1528 let correction_stats = self.corrections.stats();
1529
1530 FilesystemStats {
1531 total_files: manifest_stats.total_files,
1532 active_files: manifest_stats.active_files,
1533 deleted_files: manifest_stats.deleted_files,
1534 total_chunks: chunk_stats.total_chunks as u64,
1535 total_size_bytes: manifest_stats.total_size_bytes,
1536 correction_overhead_bytes: correction_stats.total_correction_bytes,
1537 version: self.version(),
1538 }
1539 }
1540
    /// Write `data` to `path` using the reversible-VSA (holographic) encoding.
    ///
    /// Optimistic concurrency contract:
    /// - `expected_version == Some(v)` overwrites an existing file whose
    ///   current version is exactly `v`;
    /// - `expected_version == None` creates a new file;
    /// - any other combination is rejected before encoding begins.
    ///
    /// Returns the file's new version (0 for a newly created file).
    pub fn write_file_holographic(
        &self,
        path: &str,
        data: &[u8],
        expected_version: Option<u64>,
    ) -> Result<u64, EmbrFSError> {
        let existing = self.manifest.get_file(path);

        // Validate the caller's version expectation against manifest state.
        match (&existing, expected_version) {
            (Some((entry, _)), Some(expected_ver)) => {
                if entry.version != expected_ver {
                    return Err(EmbrFSError::VersionMismatch {
                        expected: expected_ver,
                        actual: entry.version,
                    });
                }
            }
            // NOTE(review): an entry with `deleted == true` still lands here
            // and yields FileExists — confirm that recreating a tombstoned
            // file without a version is intentionally disallowed.
            (Some(_), None) => {
                return Err(EmbrFSError::FileExists(path.to_string()));
            }
            (None, Some(_)) => {
                return Err(EmbrFSError::FileNotFound(path.to_string()));
            }
            (None, None) => {}
        }

        // Encode, then immediately decode under the same write lock so the
        // per-chunk corrections are computed against this encoder state.
        let mut encoder = self.reversible_encoder.write().unwrap();
        let encoded_chunks = encoder.encode_chunked(data, REVERSIBLE_CHUNK_SIZE);

        let decoded = encoder.decode_chunked(&encoded_chunks, REVERSIBLE_CHUNK_SIZE, data.len());
        drop(encoder);

        let store_version = self.chunk_store.version();
        let mut chunk_ids = Vec::with_capacity(encoded_chunks.len());
        let mut chunk_updates = Vec::with_capacity(encoded_chunks.len());
        let mut corrections_to_add = Vec::new();

        for (chunk_idx, chunk_vec) in encoded_chunks.into_iter().enumerate() {
            let chunk_id = self.allocate_chunk_id();
            chunk_ids.push(chunk_id);

            // Slice the original and decoded bytes for this chunk.
            // Assumes `decoded` is exactly `data.len()` bytes and that the
            // encoder emitted ceil(len / REVERSIBLE_CHUNK_SIZE) chunks —
            // TODO confirm against ReversibleVSAEncoder.
            let start = chunk_idx * REVERSIBLE_CHUNK_SIZE;
            let end = (start + REVERSIBLE_CHUNK_SIZE).min(data.len());
            let chunk_data = &data[start..end];
            let decoded_chunk = &decoded[start..end];

            // Content hash: first 8 bytes of SHA-256 over the original chunk.
            let mut hasher = Sha256::new();
            hasher.update(chunk_data);
            let hash = hasher.finalize();
            let mut hash_bytes = [0u8; 8];
            hash_bytes.copy_from_slice(&hash[0..8]);

            let versioned_chunk = VersionedChunk::new(chunk_vec, chunk_data.len(), hash_bytes);
            chunk_updates.push((chunk_id, versioned_chunk));

            // Correction captures the residual between original bytes and the
            // lossy decode, so reads can reconstruct exactly.
            let correction =
                crate::correction::ChunkCorrection::new(chunk_id as u64, chunk_data, decoded_chunk);
            corrections_to_add.push((chunk_id as u64, correction));
        }

        // New files use the unconditional insert path; updates go through the
        // version-checked path captured in `store_version` above.
        if expected_version.is_none() {
            self.chunk_store.batch_insert_new(chunk_updates)?;
        } else {
            self.chunk_store
                .batch_insert(chunk_updates, store_version)?;
        }

        if expected_version.is_none() {
            self.corrections.batch_insert_new(corrections_to_add)?;
        } else {
            let corrections_version = self.corrections.current_version();
            self.corrections
                .batch_update(corrections_to_add, corrections_version)?;
        }

        let is_text = is_text_data(data);
        let new_entry = VersionedFileEntry::new_holographic(
            path.to_string(),
            is_text,
            data.len(),
            chunk_ids.clone(),
            ENCODING_FORMAT_REVERSIBLE_VSA,
        );

        // Manifest update is itself version-checked against the entry we read.
        let file_version = if let Some((entry, _)) = existing {
            self.manifest.update_file(path, new_entry, entry.version)?;
            entry.version + 1
        } else {
            self.manifest.add_file(new_entry)?;
            0
        };

        // Fold the new chunks into the holographic root vector.
        self.bundle_chunks_to_root(&chunk_ids)?;

        self.global_version.fetch_add(1, Ordering::AcqRel);

        Ok(file_version)
    }
1663
1664 pub fn read_file_holographic(&self, path: &str) -> Result<(Vec<u8>, u64), EmbrFSError> {
1673 let (file_entry, _) = self
1675 .manifest
1676 .get_file(path)
1677 .ok_or_else(|| EmbrFSError::FileNotFound(path.to_string()))?;
1678
1679 if file_entry.deleted {
1680 return Err(EmbrFSError::FileNotFound(path.to_string()));
1681 }
1682
1683 if file_entry.chunks.is_empty() {
1685 return Ok((Vec::new(), file_entry.version));
1686 }
1687
1688 let encoding_format = file_entry.encoding_format.unwrap_or(ENCODING_FORMAT_LEGACY);
1690
1691 match encoding_format {
1692 ENCODING_FORMAT_REVERSIBLE_VSA => self.read_file_holographic_reversible(&file_entry),
1693 _ => self.read_file_holographic_legacy(&file_entry),
1695 }
1696 }
1697
    /// Decode a reversible-VSA encoded file and apply per-chunk corrections.
    fn read_file_holographic_reversible(
        &self,
        file_entry: &VersionedFileEntry,
    ) -> Result<(Vec<u8>, u64), EmbrFSError> {
        // Collect each chunk's stored sparse vector, in manifest order.
        let mut chunk_vecs = Vec::with_capacity(file_entry.chunks.len());
        for &chunk_id in &file_entry.chunks {
            let (chunk, _) = self
                .chunk_store
                .get(chunk_id)
                .ok_or(EmbrFSError::ChunkNotFound(chunk_id))?;
            chunk_vecs.push((*chunk.vector).clone());
        }

        // Lossy decode of the whole file; the read lock is released before
        // the correction pass below.
        let encoder = self.reversible_encoder.read().unwrap();
        let mut reconstructed =
            encoder.decode_chunked(&chunk_vecs, REVERSIBLE_CHUNK_SIZE, file_entry.size);
        drop(encoder);

        // Patch each chunk's byte range with its stored correction, if any.
        for (chunk_idx, &chunk_id) in file_entry.chunks.iter().enumerate() {
            if let Some((correction, _)) = self.corrections.get(chunk_id as u64) {
                let start = chunk_idx * REVERSIBLE_CHUNK_SIZE;
                let end = (start + REVERSIBLE_CHUNK_SIZE).min(file_entry.size);

                let chunk_data = &reconstructed[start..end];
                let corrected = correction.apply(chunk_data);

                // Assumes `apply` returns at least `end - start` bytes —
                // TODO confirm against ChunkCorrection; a shorter result
                // would panic on this slice.
                reconstructed[start..end].copy_from_slice(&corrected[..end - start]);
            }
        }

        // Defensive: decode_chunked may round up to a chunk boundary.
        reconstructed.truncate(file_entry.size);

        Ok((reconstructed, file_entry.version))
    }
1741
1742 fn read_file_holographic_legacy(
1744 &self,
1745 file_entry: &VersionedFileEntry,
1746 ) -> Result<(Vec<u8>, u64), EmbrFSError> {
1747 let chunk_id = file_entry.chunks[0];
1749 let (chunk, _) = self
1750 .chunk_store
1751 .get(chunk_id)
1752 .ok_or(EmbrFSError::ChunkNotFound(chunk_id))?;
1753
1754 let projection = self.sparsevec_to_projection(&chunk.vector);
1756
1757 let codebook = self.codebook.read().unwrap();
1759 let mut reconstructed = codebook.reconstruct(&projection, file_entry.size);
1760 drop(codebook);
1761
1762 if let Some((correction, _)) = self.corrections.get(chunk_id as u64) {
1764 reconstructed = correction.apply(&reconstructed);
1765 }
1766
1767 reconstructed.truncate(file_entry.size);
1769
1770 Ok((reconstructed, file_entry.version))
1771 }
1772
1773 #[allow(dead_code)]
1782 fn projection_to_sparsevec(&self, projection: &ProjectionResult) -> SparseVec {
1783 let mut pos = Vec::new();
1784 let mut neg = Vec::new();
1785
1786 for (&key, word) in &projection.coefficients {
1787 let value = word.decode();
1788 let basis_id = (key / 1000) as usize; let chunk_idx = (key % 1000) as usize;
1790
1791 let base_idx = (basis_id * 1000 + chunk_idx) % DIM;
1794
1795 if value > 0 {
1796 pos.push(base_idx);
1797 } else if value < 0 {
1798 neg.push(base_idx);
1799 }
1800 }
1801
1802 pos.sort_unstable();
1803 pos.dedup();
1804 neg.sort_unstable();
1805 neg.dedup();
1806
1807 SparseVec { pos, neg }
1808 }
1809
1810 fn sparsevec_to_projection(&self, vec: &SparseVec) -> ProjectionResult {
1812 use embeddenator_vsa::{BalancedTernaryWord, WordMetadata};
1813 use std::collections::HashMap;
1814
1815 let mut coefficients = HashMap::new();
1816
1817 for &idx in &vec.pos {
1819 let chunk_idx = idx % 1000;
1820 let basis_id = (idx / 1000) % 100; let key = (basis_id * 1000 + chunk_idx) as u32;
1822
1823 if let Ok(word) = BalancedTernaryWord::new(500, WordMetadata::Data) {
1825 coefficients.insert(key, word);
1826 }
1827 }
1828
1829 for &idx in &vec.neg {
1831 let chunk_idx = idx % 1000;
1832 let basis_id = (idx / 1000) % 100;
1833 let key = (basis_id * 1000 + chunk_idx) as u32;
1834
1835 if let Ok(word) = BalancedTernaryWord::new(-500, WordMetadata::Data) {
1837 coefficients.insert(key, word);
1838 }
1839 }
1840
1841 ProjectionResult {
1842 coefficients,
1843 residual: Vec::new(), outliers: Vec::new(),
1845 quality_score: 0.5,
1846 }
1847 }
1848
1849 #[allow(dead_code)]
1854 fn projection_to_correction(
1855 &self,
1856 chunk_id: u64,
1857 original: &[u8],
1858 projection: &ProjectionResult,
1859 ) -> crate::correction::ChunkCorrection {
1860 let codebook = self.codebook.read().unwrap();
1862 let reconstructed = codebook.reconstruct(projection, original.len());
1863 drop(codebook);
1864
1865 crate::correction::ChunkCorrection::new(chunk_id, original, &reconstructed)
1867 }
1868
1869 pub fn config(&self) -> &ReversibleVSAConfig {
1873 &self.config
1874 }
1875
1876 pub fn stream_decode(
1903 &self,
1904 path: &str,
1905 ) -> Result<crate::streaming::StreamingDecoder<'_>, EmbrFSError> {
1906 crate::streaming::StreamingDecoder::new(self, path)
1907 }
1908
1909 pub fn stream_decode_range(
1926 &self,
1927 path: &str,
1928 offset: usize,
1929 max_bytes: Option<usize>,
1930 ) -> Result<crate::streaming::StreamingDecoder<'_>, EmbrFSError> {
1931 let mut builder = crate::streaming::StreamingDecoderBuilder::new(self, path);
1932 if offset > 0 {
1933 builder = builder.with_offset(offset);
1934 }
1935 if let Some(max) = max_bytes {
1936 builder = builder.with_max_bytes(max);
1937 }
1938 builder.build()
1939 }
1940
1941 pub fn allocate_chunk_id(&self) -> ChunkId {
1943 self.next_chunk_id.fetch_add(1, Ordering::AcqRel) as ChunkId
1944 }
1945
1946 pub fn bundle_chunks_to_root_streaming(
1951 &self,
1952 chunk_ids: &[ChunkId],
1953 ) -> Result<(), EmbrFSError> {
1954 self.bundle_chunks_to_root(chunk_ids)
1955 }
1956
1957 fn chunk_data<'a>(&self, data: &'a [u8]) -> Vec<&'a [u8]> {
1961 data.chunks(DEFAULT_CHUNK_SIZE).collect()
1962 }
1963
    /// Fold the given chunks' vectors into the root holographic vector.
    ///
    /// Uses an optimistic-concurrency retry loop: snapshot the root and its
    /// version under a short read lock, do the expensive bundling with no
    /// lock held, then install the result only if no other writer advanced
    /// the root version in the meantime.
    fn bundle_chunks_to_root(&self, chunk_ids: &[ChunkId]) -> Result<(), EmbrFSError> {
        loop {
            // Snapshot: clone the Arc (cheap) and record the version we saw.
            let root_lock = self.root.read().unwrap();
            let current_root = Arc::clone(&*root_lock);
            let current_version = self.root_version.load(Ordering::Acquire);
            drop(root_lock);

            // Expensive part outside any lock: bundle every chunk vector into
            // a private copy of the root. Missing chunks are skipped silently.
            let mut new_root = (*current_root).clone();
            for &chunk_id in chunk_ids {
                if let Some((chunk, _)) = self.chunk_store.get(chunk_id) {
                    new_root = new_root.bundle(&chunk.vector);
                }
            }

            // Publish only if nobody updated the root since our snapshot.
            let mut root_lock = self.root.write().unwrap();
            let actual_version = self.root_version.load(Ordering::Acquire);

            if actual_version == current_version {
                *root_lock = Arc::new(new_root);
                self.root_version.fetch_add(1, Ordering::AcqRel);
                return Ok(());
            }

            // Lost the race: release the lock, yield, and retry with a fresh
            // snapshot. Our bundled copy is discarded.
            drop(root_lock);
            std::thread::yield_now();
        }
    }
1999}
2000
2001impl Default for VersionedEmbrFS {
2002 fn default() -> Self {
2003 Self::new()
2004 }
2005}
2006
/// Point-in-time snapshot of filesystem-wide counters, as produced by
/// `VersionedEmbrFS::stats`.
#[derive(Debug, Clone)]
pub struct FilesystemStats {
    // Total file count reported by the manifest.
    pub total_files: usize,
    // Files that are currently live (not tombstoned).
    pub active_files: usize,
    // Files that have been deleted (tombstoned) in the manifest.
    pub deleted_files: usize,
    // Chunk count reported by the chunk store.
    pub total_chunks: u64,
    // Total size reported by the manifest, in bytes — presumably the sum of
    // logical file sizes; verify against the manifest's stats implementation.
    pub total_size_bytes: usize,
    // Bytes consumed by residual correction data in the correction store.
    pub correction_overhead_bytes: u64,
    // Global filesystem version at the moment the snapshot was taken.
    pub version: u64,
}
2018
/// Heuristic text/binary classifier.
///
/// Samples at most the first 8 KiB and calls the buffer "text" when fewer
/// than 5% of the sampled bytes are control characters other than LF, CR,
/// or TAB. Empty input counts as text.
fn is_text_data(data: &[u8]) -> bool {
    if data.is_empty() {
        return true;
    }

    let sample = &data[..data.len().min(8192)];
    let control_count = sample
        .iter()
        .filter(|&&byte| byte < 32 && !matches!(byte, b'\n' | b'\r' | b'\t'))
        .count();

    let ratio = control_count as f64 / sample.len() as f64;
    ratio < 0.05
}
2035
#[cfg(test)]
mod tests {
    use super::*;

    // A fresh filesystem starts at version 0 with no files.
    #[test]
    fn test_new_filesystem() {
        let fs = VersionedEmbrFS::new();
        assert_eq!(fs.version(), 0);
        assert_eq!(fs.list_files().len(), 0);
    }

    // Round-trip: bytes written come back unchanged with the same version.
    #[test]
    fn test_write_and_read_file() {
        let fs = VersionedEmbrFS::new();
        let data = b"Hello, EmbrFS!";

        let version = fs.write_file("test.txt", data, None).unwrap();
        assert_eq!(version, 0);

        let (content, read_version) = fs.read_file("test.txt").unwrap();
        assert_eq!(&content[..], data);
        assert_eq!(read_version, 0);
    }

    // Updates require the current version; a stale version is rejected.
    #[test]
    fn test_update_file_with_version_check() {
        let fs = VersionedEmbrFS::new();

        let v1 = fs.write_file("test.txt", b"version 1", None).unwrap();

        let v2 = fs.write_file("test.txt", b"version 2", Some(v1)).unwrap();
        assert_eq!(v2, v1 + 1);

        let result = fs.write_file("test.txt", b"version 3", Some(v1));
        assert!(matches!(result, Err(EmbrFSError::VersionMismatch { .. })));
    }

    // Deleting a file removes it from existence checks and read paths.
    #[test]
    fn test_delete_file() {
        let fs = VersionedEmbrFS::new();

        let version = fs.write_file("test.txt", b"data", None).unwrap();
        fs.delete_file("test.txt", version).unwrap();

        assert!(!fs.exists("test.txt"));

        let result = fs.read_file("test.txt");
        assert!(matches!(result, Err(EmbrFSError::FileNotFound(_))));
    }

    // list_files returns every written path (order unspecified).
    #[test]
    fn test_list_files() {
        let fs = VersionedEmbrFS::new();

        fs.write_file("file1.txt", b"a", None).unwrap();
        fs.write_file("file2.txt", b"b", None).unwrap();
        fs.write_file("file3.txt", b"c", None).unwrap();

        let files = fs.list_files();
        assert_eq!(files.len(), 3);
        assert!(files.contains(&"file1.txt".to_string()));
        assert!(files.contains(&"file2.txt".to_string()));
        assert!(files.contains(&"file3.txt".to_string()));
    }

    // Multi-chunk files (spanning > DEFAULT_CHUNK_SIZE) round-trip intact.
    #[test]
    fn test_large_file() {
        let fs = VersionedEmbrFS::new();

        let data = vec![42u8; DEFAULT_CHUNK_SIZE * 3 + 100];
        fs.write_file("large.bin", &data, None).unwrap();

        let (content, _) = fs.read_file("large.bin").unwrap();
        assert_eq!(content, data);
    }

    // stats() aggregates file counts and total logical size.
    #[test]
    fn test_stats() {
        let fs = VersionedEmbrFS::new();

        fs.write_file("file1.txt", b"hello", None).unwrap();
        fs.write_file("file2.txt", b"world", None).unwrap();

        let stats = fs.stats();
        assert_eq!(stats.active_files, 2);
        assert_eq!(stats.total_files, 2);
        assert_eq!(stats.deleted_files, 0);
        assert_eq!(stats.total_size_bytes, 10);
    }

    // Compressed writes are transparent to the normal read path.
    #[test]
    fn test_write_and_read_compressed_file() {
        let fs = VersionedEmbrFS::new();

        let config_data = b"[server]\nport = 8080\nhost = localhost";
        let version = fs
            .write_file_compressed("/etc/app.conf", config_data, None)
            .unwrap();
        assert_eq!(version, 0);

        let (content, read_version) = fs.read_file("/etc/app.conf").unwrap();
        assert_eq!(&content[..], config_data);
        assert_eq!(read_version, 0);
    }

    // Binary payloads round-trip through the compression profile path.
    #[test]
    fn test_write_compressed_with_zstd_profile() {
        let fs = VersionedEmbrFS::new();

        let binary_data: Vec<u8> = (0..1000).map(|i| [0xDE, 0xAD, 0xBE, 0xEF][i % 4]).collect();
        let version = fs
            .write_file_compressed("/usr/bin/myapp", &binary_data, None)
            .unwrap();
        assert_eq!(version, 0);

        let (content, _) = fs.read_file("/usr/bin/myapp").unwrap();
        assert_eq!(content, binary_data);
    }

    // Media-like data (JPEG magic bytes) still round-trips correctly even if
    // the profiler opts out of compression.
    #[test]
    fn test_write_compressed_no_compression_for_media() {
        let fs = VersionedEmbrFS::new();

        let media_data: Vec<u8> = (0..500).map(|i| [0xFF, 0xD8, 0xFF, 0xE0][i % 4]).collect();
        let version = fs
            .write_file_compressed("/photos/image.jpg", &media_data, None)
            .unwrap();
        assert_eq!(version, 0);

        let (content, _) = fs.read_file("/photos/image.jpg").unwrap();
        assert_eq!(content, media_data);
    }

    // The profiler picks compression profiles by path.
    #[test]
    fn test_profiler_access() {
        let fs = VersionedEmbrFS::new();
        let profiler = fs.profiler();

        let kernel_profile = profiler.for_path("/boot/vmlinuz");
        assert_eq!(kernel_profile.name, "Kernel");

        let config_profile = profiler.for_path("/etc/nginx.conf");
        assert_eq!(config_profile.name, "Config");
    }

    // Holographic write/read round-trip with exact reconstruction.
    #[test]
    fn test_holographic_write_and_read() {
        let fs = VersionedEmbrFS::new_holographic();
        let data = b"Hello, Holographic EmbrFS!";

        let version = fs.write_file_holographic("test.txt", data, None).unwrap();
        assert_eq!(version, 0);

        let (content, read_version) = fs.read_file_holographic("test.txt").unwrap();
        assert_eq!(&content[..], data);
        assert_eq!(read_version, 0);
    }

    // Exact reconstruction plus a bound on correction-data overhead (<15%).
    #[test]
    fn test_holographic_accuracy() {
        let fs = VersionedEmbrFS::new_holographic();

        let test_data: Vec<u8> = (0..1024).map(|i| (i % 256) as u8).collect();

        fs.write_file_holographic("accuracy_test.bin", &test_data, None)
            .unwrap();
        let (content, _) = fs.read_file_holographic("accuracy_test.bin").unwrap();

        assert_eq!(content, test_data);

        let stats = fs.stats();
        let correction_ratio =
            stats.correction_overhead_bytes as f64 / stats.total_size_bytes as f64;

        assert!(
            correction_ratio < 0.15,
            "Correction overhead too high: {:.1}%",
            correction_ratio * 100.0
        );
    }

    // Larger (multi-chunk) holographic files also round-trip exactly.
    #[test]
    fn test_holographic_large_file() {
        let fs = VersionedEmbrFS::new_holographic();

        let data: Vec<u8> = (0..4096).map(|i| (i % 256) as u8).collect();

        fs.write_file_holographic("large_holo.bin", &data, None)
            .unwrap();
        let (content, _) = fs.read_file_holographic("large_holo.bin").unwrap();

        assert_eq!(content, data);
    }

    // Holographic writes record the reversible-VSA format in the manifest.
    #[test]
    fn test_holographic_encoding_format_in_manifest() {
        let fs = VersionedEmbrFS::new_holographic();
        let data = b"Test encoding format";

        fs.write_file_holographic("format_test.txt", data, None)
            .unwrap();

        let (file_entry, _) = fs.manifest.get_file("format_test.txt").unwrap();
        assert_eq!(
            file_entry.encoding_format,
            Some(ENCODING_FORMAT_REVERSIBLE_VSA)
        );
    }

    // Version-checked holographic updates bump the file version.
    #[test]
    fn test_holographic_update_file() {
        let fs = VersionedEmbrFS::new_holographic();

        let v1 = fs
            .write_file_holographic("update_test.txt", b"version 1", None)
            .unwrap();
        assert_eq!(v1, 0);

        let v2 = fs
            .write_file_holographic("update_test.txt", b"version 2 is longer", Some(v1))
            .unwrap();
        assert_eq!(v2, 1);

        let (content, version) = fs.read_file_holographic("update_test.txt").unwrap();
        assert_eq!(&content[..], b"version 2 is longer");
        assert_eq!(version, 1);
    }

    // Empty files take the zero-chunk fast path.
    #[test]
    fn test_holographic_empty_file() {
        let fs = VersionedEmbrFS::new_holographic();

        fs.write_file_holographic("empty.txt", b"", None).unwrap();
        let (content, _) = fs.read_file_holographic("empty.txt").unwrap();

        assert!(content.is_empty());
    }

    // Holographic mode can be switched on after construction.
    #[test]
    fn test_enable_holographic_mode() {
        let mut fs = VersionedEmbrFS::new();
        assert!(!fs.is_holographic());

        fs.enable_holographic_mode();
        assert!(fs.is_holographic());
    }

    // read_range clamps to EOF and tolerates zero-length / out-of-bounds
    // requests by returning empty results.
    #[test]
    fn test_read_range_basic() {
        let fs = VersionedEmbrFS::new();

        let data = b"Hello, World! This is a test file for range queries.";
        fs.write_file("range_test.txt", data, None).unwrap();

        let (result, _) = fs.read_range("range_test.txt", 0, 5).unwrap();
        assert_eq!(&result[..], b"Hello");

        let (result, _) = fs.read_range("range_test.txt", 7, 6).unwrap();
        assert_eq!(&result[..], b"World!");

        let (result, _) = fs.read_range("range_test.txt", 44, 100).unwrap();
        assert_eq!(&result[..], b"queries.");

        let (result, _) = fs.read_range("range_test.txt", 1000, 10).unwrap();
        assert!(result.is_empty());

        let (result, _) = fs.read_range("range_test.txt", 0, 0).unwrap();
        assert!(result.is_empty());
    }

    // read_range on a missing path is an error, not an empty result.
    #[test]
    fn test_read_range_not_found() {
        let fs = VersionedEmbrFS::new();
        let result = fs.read_range("nonexistent.txt", 0, 10);
        assert!(result.is_err());
    }
}