1use crate::error::{CoreError, CoreResult, ErrorContext};
12use ::ndarray::{Array, ArrayBase, Dimension, IxDyn, RawData};
13
14#[cfg(feature = "memory_compression")]
15use lz4::{Decoder, EncoderBuilder};
16
17use std::collections::HashMap;
18use std::fs::{File, OpenOptions};
19use std::io::{Read, Seek, SeekFrom, Write};
20use std::path::{Path, PathBuf};
21use std::sync::{Arc, RwLock};
22use std::time::{Duration, Instant};
23
/// On-disk metadata describing a block-compressed array file.
///
/// Serialized as JSON into a fixed-size header at the start of the file,
/// followed by the independently compressed data blocks.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct CompressedFileMetadata {
    // Dimensions of the stored array.
    pub shape: Vec<usize>,

    // Size in bytes of a single element.
    pub element_size: usize,

    // Total number of elements in the array.
    pub num_elements: usize,

    // Maximum number of elements per compression block.
    pub block_size: usize,

    // Number of compressed blocks in the file.
    pub num_blocks: usize,

    // Byte offset of each block from the start of the file.
    pub block_offsets: Vec<u64>,

    // Compressed size in bytes of each block.
    pub block_compressed_sizes: Vec<usize>,

    // Uncompressed size in bytes of each block.
    pub block_uncompressed_sizes: Vec<usize>,

    // Algorithm used to compress the blocks.
    pub compression_algorithm: CompressionAlgorithm,

    // Compression level the blocks were written with.
    pub compression_level: i32,

    // When the file was created (UTC).
    pub creation_time: chrono::DateTime<chrono::Utc>,

    // Optional free-form description supplied at creation time.
    pub description: Option<String>,
}
63
/// Compression algorithm applied to each data block.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, serde::Serialize, serde::Deserialize)]
pub enum CompressionAlgorithm {
    // LZ4 compression (the default).
    #[default]
    Lz4,
    // Zstandard compression.
    Zstd,
    // Snappy compression.
    Snappy,
}
75
/// Builder for creating block-compressed array files.
#[derive(Debug, Clone)]
pub struct CompressedMemMapBuilder {
    // Number of elements stored per compression block.
    block_size: usize,

    // Compression algorithm for the data blocks.
    algorithm: CompressionAlgorithm,

    // Compression level passed to the encoder.
    level: i32,

    // Maximum number of decompressed blocks kept in the read cache.
    cache_size: usize,

    // Optional time-to-live for cached blocks.
    cache_ttl: Option<Duration>,

    // Optional description stored in the file metadata.
    description: Option<String>,
}
97
98impl Default for CompressedMemMapBuilder {
99 fn default() -> Self {
100 Self {
101 block_size: 65536, algorithm: CompressionAlgorithm::Lz4,
103 level: 1, cache_size: 32, cache_ttl: Some(Duration::from_secs(300)), description: None,
107 }
108 }
109}
110
111impl CompressedMemMapBuilder {
112 pub fn new() -> Self {
114 Self::default()
115 }
116
117 pub fn with_block_size(mut self, blocksize: usize) -> Self {
121 self.block_size = blocksize;
122 self
123 }
124
125 pub fn with_algorithm(mut self, algorithm: CompressionAlgorithm) -> Self {
127 self.algorithm = algorithm;
128 self
129 }
130
131 pub fn with_level(mut self, level: i32) -> Self {
139 self.level = level;
140 self
141 }
142
143 pub fn with_cache_size(mut self, cachesize: usize) -> Self {
148 self.cache_size = cachesize;
149 self
150 }
151
152 pub fn with_cache_ttl(mut self, ttl: Option<Duration>) -> Self {
157 self.cache_ttl = ttl;
158 self
159 }
160
161 pub fn with_description(mut self, description: impl Into<String>) -> Self {
163 self.description = Some(description.into());
164 self
165 }
166
167 pub fn create<A, S, D>(
178 &self,
179 data: &ArrayBase<S, D>,
180 path: impl AsRef<Path>,
181 ) -> CoreResult<CompressedMemMappedArray<A>>
182 where
183 A: Clone + Copy + 'static + Send + Sync,
184 S: RawData<Elem = A>,
185 D: Dimension,
186 {
187 let shape = data.shape().to_vec();
189 let num_elements = data.len();
190 let element_size = std::mem::size_of::<A>();
191
192 let block_size = self.block_size.min(num_elements);
194 let num_blocks = num_elements.div_ceil(block_size);
195
196 let mut metadata = CompressedFileMetadata {
198 shape,
199 element_size,
200 num_elements,
201 block_size,
202 num_blocks,
203 block_offsets: Vec::with_capacity(num_blocks),
204 block_compressed_sizes: Vec::with_capacity(num_blocks),
205 block_uncompressed_sizes: Vec::with_capacity(num_blocks),
206 compression_algorithm: self.algorithm,
207 compression_level: self.level,
208 creation_time: chrono::Utc::now(),
209 description: self.description.clone(),
210 };
211
212 let path = path.as_ref();
214 let mut file = OpenOptions::new()
215 .read(true)
216 .write(true)
217 .create(true)
218 .truncate(true)
219 .open(path)?;
220
221 let metadata_placeholder = vec![0u8; 1024];
223 file.write_all(&metadata_placeholder)?;
224
225 let data_ptr = data.as_ptr() as *const u8;
227 let mut current_offset = metadata_placeholder.len() as u64;
228
229 for blockidx in 0..num_blocks {
230 let start_element = blockidx * block_size;
231 let end_element = (start_element + block_size).min(num_elements);
232 let block_elements = end_element - start_element;
233 let uncompressed_size = block_elements * element_size;
234
235 metadata.block_offsets.push(current_offset);
237 metadata.block_uncompressed_sizes.push(uncompressed_size);
238
239 let data_offset = start_element * element_size;
241 let block_data =
242 unsafe { std::slice::from_raw_parts(data_ptr.add(data_offset), uncompressed_size) };
243
244 let compressed_data = match self.algorithm {
246 CompressionAlgorithm::Lz4 => {
247 let mut encoder = EncoderBuilder::new()
248 .level(self.level as u32)
249 .build(Vec::new())?;
250 encoder.write_all(block_data)?;
251 let (compressed, result) = encoder.finish();
252 result?;
253 compressed
254 }
255 CompressionAlgorithm::Zstd => zstd::encode_all(block_data, self.level)?,
256 CompressionAlgorithm::Snappy => snap::raw::Encoder::new()
257 .compress_vec(block_data)
258 .map_err(|e| {
259 CoreError::ComputationError(ErrorContext::new(format!(
260 "Snappy compression error: {}",
261 e
262 )))
263 })?,
264 };
265
266 metadata.block_compressed_sizes.push(compressed_data.len());
268
269 file.write_all(&compressed_data)?;
271
272 current_offset += compressed_data.len() as u64;
274 }
275
276 let metadata_json = serde_json::to_string(&metadata).map_err(|e| {
278 CoreError::ValueError(ErrorContext::new(format!(
279 "Failed to serialize metadata: {}",
280 e
281 )))
282 })?;
283 let mut metadata_bytes = metadata_json.into_bytes();
284
285 if metadata_bytes.len() > metadata_placeholder.len() {
287 return Err(CoreError::ValueError(ErrorContext::new(format!(
288 "Metadata size ({} bytes) exceeds reserved space ({} bytes)",
289 metadata_bytes.len(),
290 metadata_placeholder.len()
291 ))));
292 }
293
294 metadata_bytes.resize(metadata_placeholder.len(), 0);
296
297 file.seek(SeekFrom::Start(0))?;
299 file.write_all(&metadata_bytes)?;
300
301 let compressed_mmap = CompressedMemMappedArray::open_impl(
303 path.to_path_buf(),
304 self.cache_size,
305 self.cache_ttl,
306 )?;
307
308 Ok(compressed_mmap)
309 }
310
311 pub fn create_from_raw<A>(
323 &self,
324 data: &[A],
325 shape: &[usize],
326 path: impl AsRef<Path>,
327 ) -> CoreResult<CompressedMemMappedArray<A>>
328 where
329 A: Clone + Copy + 'static + Send + Sync,
330 {
331 let array = Array::from_shape_vec(IxDyn(shape), data.to_vec())
333 .map_err(|e| CoreError::ShapeError(ErrorContext::new(format!("{e}"))))?;
334
335 self.create(&array, path)
337 }
338}
339
/// Read-only handle to a block-compressed array file.
///
/// Blocks are decompressed on demand and kept in a TTL-bounded LRU cache
/// that is shared between clones of this handle (via `Arc`).
#[derive(Debug, Clone)]
pub struct CompressedMemMappedArray<A: Clone + Copy + 'static + Send + Sync> {
    // Path to the compressed file on disk.
    path: PathBuf,

    // Metadata parsed from the file's header.
    metadata: CompressedFileMetadata,

    // Cache of recently decompressed blocks, shared across clones.
    block_cache: Arc<BlockCache<A>>,

    // Marks the element type without storing a value of it.
    phantom: std::marker::PhantomData<A>,
}
358
359impl<A: Clone + Copy + 'static + Send + Sync> CompressedMemMappedArray<A> {
360 pub fn open<P: AsRef<Path>>(path: P) -> CoreResult<Self> {
370 let cache_size = 32;
372 let cache_ttl = Some(Duration::from_secs(300));
373
374 Self::open_impl(path.as_ref().to_path_buf(), cache_size, cache_ttl)
375 }
376
377 pub fn open_with_cache<P: AsRef<Path>>(
389 path: P,
390 cache_size: usize,
391 cache_ttl: Option<Duration>,
392 ) -> CoreResult<Self> {
393 Self::open_impl(path.as_ref().to_path_buf(), cache_size, cache_ttl)
394 }
395
396 fn open_impl(
398 path: PathBuf,
399 cache_size: usize,
400 cache_ttl: Option<Duration>,
401 ) -> CoreResult<Self> {
402 let mut file = File::open(&path)?;
404
405 let mut metadata_bytes = vec![0u8; 1024];
407 file.read_exact(&mut metadata_bytes)?;
408
409 let metadata_json = String::from_utf8_lossy(&metadata_bytes)
412 .trim_end_matches('\0')
413 .to_string();
414 let metadata: CompressedFileMetadata =
415 serde_json::from_str(&metadata_json).map_err(|e| {
416 CoreError::ValueError(ErrorContext::new(format!(
417 "Failed to deserialize metadata: {}",
418 e
419 )))
420 })?;
421
422 let expected_element_size = std::mem::size_of::<A>();
424 if metadata.element_size != expected_element_size {
425 return Err(CoreError::ValueError(ErrorContext::new(format!(
426 "Element _size mismatch: expected {}, got {}",
427 expected_element_size, metadata.element_size
428 ))));
429 }
430
431 let block_cache = Arc::new(BlockCache::new(cache_size, cache_ttl));
433
434 Ok(Self {
436 path,
437 metadata,
438 block_cache,
439 phantom: std::marker::PhantomData,
440 })
441 }
442
    /// Dimensions of the stored array.
    pub fn shape(&self) -> &[usize] {
        &self.metadata.shape
    }

    /// Total number of elements.
    pub fn size(&self) -> usize {
        self.metadata.num_elements
    }

    /// Number of dimensions.
    pub fn ndim(&self) -> usize {
        self.metadata.shape.len()
    }

    /// The full on-disk metadata for the file.
    pub fn metadata(&self) -> &CompressedFileMetadata {
        &self.metadata
    }

    /// Maximum number of elements per compression block.
    pub fn block_size(&self) -> usize {
        self.metadata.block_size
    }

    /// Number of compressed blocks in the file.
    pub fn num_blocks(&self) -> usize {
        self.metadata.num_blocks
    }
472
473 pub fn preload_block(&self, blockidx: usize) -> CoreResult<()> {
485 if blockidx >= self.metadata.num_blocks {
486 return Err(CoreError::IndexError(ErrorContext::new(format!(
487 "Block index {} out of bounds (max {})",
488 blockidx,
489 self.metadata.num_blocks - 1
490 ))));
491 }
492
493 if self.block_cache.has_block(blockidx) {
495 return Ok(());
496 }
497
498 let block = self.load_block(blockidx)?;
500
501 self.block_cache.put_block(blockidx, block);
503
504 Ok(())
505 }
506
507 fn load_block(&self, blockidx: usize) -> CoreResult<Vec<A>> {
509 let mut file = File::open(&self.path)?;
511
512 let offset = self.metadata.block_offsets[blockidx];
514 let compressed_size = self.metadata.block_compressed_sizes[blockidx];
515 let uncompressed_size = self.metadata.block_uncompressed_sizes[blockidx];
516
517 file.seek(SeekFrom::Start(offset))?;
519 let mut compressed_data = vec![0u8; compressed_size];
520 file.read_exact(&mut compressed_data)?;
521
522 let block_bytes = match self.metadata.compression_algorithm {
524 CompressionAlgorithm::Lz4 => {
525 let mut decoder = Decoder::new(&compressed_data[..])?;
526 let mut decompressed = Vec::with_capacity(uncompressed_size);
527 decoder.read_to_end(&mut decompressed)?;
528 decompressed
529 }
530 CompressionAlgorithm::Zstd => zstd::decode_all(&compressed_data[..])?,
531 CompressionAlgorithm::Snappy => snap::raw::Decoder::new()
532 .decompress_vec(&compressed_data)
533 .map_err(|e| {
534 CoreError::ComputationError(ErrorContext::new(format!(
535 "Snappy decompression error: {}",
536 e
537 )))
538 })?,
539 };
540
541 if block_bytes.len() != uncompressed_size {
543 return Err(CoreError::ValueError(ErrorContext::new(format!(
544 "Block {} decompressed to {} bytes, expected {}",
545 blockidx,
546 block_bytes.len(),
547 uncompressed_size
548 ))));
549 }
550
551 let num_elements = uncompressed_size / std::mem::size_of::<A>();
553 let mut elements = Vec::with_capacity(num_elements);
554
555 for chunk in block_bytes.chunks_exact(std::mem::size_of::<A>()) {
557 let element = unsafe { *(chunk.as_ptr() as *const A) };
558 elements.push(element);
559 }
560
561 Ok(elements)
562 }
563
    /// Materialize the entire array in memory by decompressing every
    /// block (through the cache) and concatenating them in order.
    pub fn readonly_array(&self) -> CoreResult<Array<A, IxDyn>> {
        // NOTE(review): zeroed() is only sound when the all-zero bit pattern
        // is a valid `A` (true for plain numeric types) — confirm for other
        // element types. The placeholders are fully overwritten below.
        let mut result = Array::from_elem(IxDyn(&self.metadata.shape), unsafe {
            std::mem::zeroed::<A>()
        });

        // Copy each block into its flat position in the output.
        let mut offset = 0;
        for blockidx in 0..self.metadata.num_blocks {
            // Serve from the cache, decompressing (and caching) on a miss.
            let block = match self.block_cache.get_block(blockidx) {
                Some(block) => block,
                None => {
                    let block = self.load_block(blockidx)?;
                    self.block_cache.put_block(blockidx, block.clone());
                    block
                }
            };

            let start = offset;
            let end = (start + block.len()).min(self.metadata.num_elements);
            let slice = &mut result.as_slice_mut().expect("Operation failed")[start..end];
            slice.copy_from_slice(&block[..(end - start)]);

            offset = end;
        }

        Ok(result)
    }
602
603 pub fn get(&self, indices: &[usize]) -> CoreResult<A> {
615 if indices.len() != self.metadata.shape.len() {
617 return Err(CoreError::DimensionError(ErrorContext::new(format!(
618 "Expected {} indices, got {}",
619 self.metadata.shape.len(),
620 indices.len()
621 ))));
622 }
623
624 for (dim, &idx) in indices.iter().enumerate() {
625 if idx >= self.metadata.shape[dim] {
626 return Err(CoreError::IndexError(ErrorContext::new(format!(
627 "Index {} out of bounds for dimension {} (max {})",
628 idx,
629 dim,
630 self.metadata.shape[dim] - 1
631 ))));
632 }
633 }
634
635 let mut flat_index = 0;
637 let mut stride = 1;
638 for i in (0..indices.len()).rev() {
639 flat_index += indices[i] * stride;
640 if i > 0 {
641 stride *= self.metadata.shape[i];
642 }
643 }
644
645 let blockidx = flat_index / self.metadata.block_size;
647 let block_offset = flat_index % self.metadata.block_size;
648
649 let block = match self.block_cache.get_block(blockidx) {
651 Some(block) => block,
652 None => {
653 let block = self.load_block(blockidx)?;
654 self.block_cache.put_block(blockidx, block.clone());
655 block
656 }
657 };
658
659 if block_offset < block.len() {
661 Ok(block[block_offset])
662 } else {
663 Err(CoreError::IndexError(ErrorContext::new(format!(
664 "Block offset {} out of bounds for block {} (max {})",
665 block_offset,
666 blockidx,
667 block.len() - 1
668 ))))
669 }
670 }
671
    /// Extract a rectangular sub-array given half-open `(start, end)`
    /// ranges, one per dimension.
    ///
    /// # Errors
    ///
    /// Returns an error when the number of ranges does not match the
    /// array's rank, a range is empty/inverted, or a range extends past
    /// the corresponding dimension.
    pub fn slice(&self, ranges: &[(usize, usize)]) -> CoreResult<Array<A, IxDyn>> {
        if ranges.len() != self.metadata.shape.len() {
            return Err(CoreError::DimensionError(ErrorContext::new(format!(
                "Expected {} ranges, got {}",
                self.metadata.shape.len(),
                ranges.len()
            ))));
        }

        // Validate each range and compute the output shape.
        let mut resultshape = Vec::with_capacity(ranges.len());
        for (dim, &(start, end)) in ranges.iter().enumerate() {
            if start >= end {
                return Err(CoreError::ValueError(ErrorContext::new(format!(
                    "Invalid range for dimension {}: {}..{}",
                    dim, start, end
                ))));
            }
            if end > self.metadata.shape[dim] {
                return Err(CoreError::IndexError(ErrorContext::new(format!(
                    "Range {}..{} out of bounds for dimension {} (max {})",
                    start, end, dim, self.metadata.shape[dim]
                ))));
            }
            resultshape.push(end - start);
        }

        // NOTE(review): zeroed() is only sound when the all-zero bit pattern
        // is a valid `A` — confirm for non-numeric element types.
        let mut result = Array::from_elem(IxDyn(&resultshape), unsafe { std::mem::zeroed::<A>() });

        let result_size = resultshape.iter().product::<usize>();

        // Odometer-style iteration: `result_indices` walks the output array
        // while `source_indices` walks the matching source positions.
        let mut result_indices = vec![0; ranges.len()];
        let mut source_indices = Vec::with_capacity(ranges.len());
        for &(start, _) in ranges.iter() {
            source_indices.push(start);
        }

        for _result_flat_idx in 0..result_size {
            // Row-major flat index of the current source position.
            let mut source_flat_idx = 0;
            let mut stride = 1;
            for i in (0..source_indices.len()).rev() {
                source_flat_idx += source_indices[i] * stride;
                if i > 0 {
                    stride *= self.metadata.shape[i];
                }
            }

            let blockidx = source_flat_idx / self.metadata.block_size;
            let block_offset = source_flat_idx % self.metadata.block_size;

            // Serve the block from the cache, decompressing on a miss.
            let block = match self.block_cache.get_block(blockidx) {
                Some(block) => block,
                None => {
                    let block = self.load_block(blockidx)?;
                    self.block_cache.put_block(blockidx, block.clone());
                    block
                }
            };

            // An out-of-range offset silently leaves the zero placeholder
            // in the output (defensive skip rather than an error).
            if block_offset < block.len() {
                // Row-major flat index of the current output position.
                let mut result_stride = 1;
                let mut result_flat_idx = 0;
                for i in (0..result_indices.len()).rev() {
                    result_flat_idx += result_indices[i] * result_stride;
                    if i > 0 {
                        result_stride *= resultshape[i];
                    }
                }

                let result_slice = result.as_slice_mut().expect("Operation failed");
                result_slice[result_flat_idx] = block[block_offset];
            }

            // Advance the odometer from the last dimension inward, resetting
            // exhausted dimensions back to their range start.
            for i in (0..ranges.len()).rev() {
                result_indices[i] += 1;
                source_indices[i] += 1;
                if result_indices[i] < resultshape[i] {
                    break;
                }
                result_indices[i] = 0;
                source_indices[i] = ranges[i].0;
            }
        }

        Ok(result)
    }
781
    /// Apply `f` to each block sequentially, using the file's native
    /// block size, and collect the per-block results.
    pub fn process_blocks<F, R>(&self, f: F) -> CoreResult<Vec<R>>
    where
        F: Fn(&[A], usize) -> R + Send + Sync + 'static,
        R: Send + 'static,
    {
        self.process_blocks_internal(f, false, None)
    }

    /// Apply `f` to each block sequentially, using a caller-chosen block
    /// size (which may differ from the on-disk block size).
    pub fn process_blocks_with_size<F, R>(&self, blocksize: usize, f: F) -> CoreResult<Vec<R>>
    where
        F: Fn(&[A], usize) -> R + Send + Sync + 'static,
        R: Send + 'static,
    {
        self.process_blocks_internal(f, false, Some(blocksize))
    }

    /// Apply `f` to blocks in parallel, using the file's native block size.
    #[cfg(feature = "parallel")]
    pub fn process_blocks_parallel<F, R>(&self, f: F) -> CoreResult<Vec<R>>
    where
        F: Fn(&[A], usize) -> R + Send + Sync + 'static,
        R: Send + 'static,
    {
        self.process_blocks_internal(f, true, None)
    }

    /// Apply `f` to blocks in parallel, using a caller-chosen block size.
    #[cfg(feature = "parallel")]
    pub fn process_blocks_parallel_with_size<F, R>(
        &self,
        block_size: usize,
        f: F,
    ) -> CoreResult<Vec<R>>
    where
        F: Fn(&[A], usize) -> R + Send + Sync + 'static,
        R: Send + 'static,
    {
        self.process_blocks_internal(f, true, Some(block_size))
    }
859
860 #[cfg(not(feature = "parallel"))]
862 fn process_blocks_internal<F, R>(
863 &self,
864 mut f: F,
865 _parallel: bool,
866 custom_block_size: Option<usize>,
867 ) -> CoreResult<Vec<R>>
868 where
869 F: FnMut(&[A], usize) -> R,
870 {
871 let block_size = custom_block_size.unwrap_or(self.metadata.block_size);
873 let num_elements = self.metadata.num_elements;
874 let num_blocks = num_elements.div_ceil(block_size);
875
876 let mut results = Vec::with_capacity(num_blocks);
878 for blockidx in 0..num_blocks {
879 let start = blockidx * block_size;
880 let end = (start + block_size).min(num_elements);
881 let elements = self.load_elements(start, end)?;
882 results.push(f(&elements, blockidx));
883 }
884 Ok(results)
885 }
886
    /// Shared block-processing driver (parallel build): iterates chunks of
    /// `custom_block_size` (or the file's native block size) and applies
    /// `f`, optionally via the crate's parallel iterator facade.
    #[cfg(feature = "parallel")]
    fn process_blocks_internal<F, R>(
        &self,
        f: F,
        parallel: bool,
        custom_block_size: Option<usize>,
    ) -> CoreResult<Vec<R>>
    where
        F: Fn(&[A], usize) -> R + Send + Sync + 'static,
        R: Send + 'static,
    {
        let block_size = custom_block_size.unwrap_or(self.metadata.block_size);
        let num_elements = self.metadata.num_elements;
        let num_blocks = num_elements.div_ceil(block_size);

        if parallel {
            use crate::parallel_ops::*;

            return (0..num_blocks)
                .into_par_iter()
                .map(|blockidx| {
                    let start = blockidx * block_size;
                    let end = (start + block_size).min(num_elements);

                    // Propagate load errors; collect() short-circuits on
                    // the first Err below.
                    let elements = match self.load_elements(start, end) {
                        Ok(elems) => elems,
                        Err(e) => return Err(e),
                    };

                    Ok(f(&elements, blockidx))
                })
                .collect::<Result<Vec<R>, CoreError>>();
        }

        // Sequential path when parallelism is not requested.
        (0..num_blocks)
            .map(|blockidx| {
                let start = blockidx * block_size;
                let end = (start + block_size).min(num_elements);

                let elements = self.load_elements(start, end)?;

                Ok(f(&elements, blockidx))
            })
            .collect::<Result<Vec<R>, CoreError>>()
    }
942
943 fn load_elements(&self, start: usize, end: usize) -> CoreResult<Vec<A>> {
956 if start >= self.metadata.num_elements {
958 return Err(CoreError::IndexError(ErrorContext::new(format!(
959 "Start index {} out of bounds (max {})",
960 start,
961 self.metadata.num_elements - 1
962 ))));
963 }
964 if end > self.metadata.num_elements {
965 return Err(CoreError::IndexError(ErrorContext::new(format!(
966 "End index {} out of bounds (max {})",
967 end, self.metadata.num_elements
968 ))));
969 }
970 if start >= end {
971 return Ok(Vec::new());
972 }
973
974 let start_block = start / self.metadata.block_size;
976 let end_block = (end - 1) / self.metadata.block_size;
977
978 let mut result = Vec::with_capacity(end - start);
980
981 for blockidx in start_block..=end_block {
983 let block = match self.block_cache.get_block(blockidx) {
985 Some(block) => block,
986 None => {
987 let block = self.load_block(blockidx)?;
988 self.block_cache.put_block(blockidx, block.clone());
989 block
990 }
991 };
992
993 let block_start = blockidx * self.metadata.block_size;
995 let block_end = block_start + block.len();
996
997 let range_start = start.max(block_start) - block_start;
998 let range_end = end.min(block_end) - block_start;
999
1000 result.extend_from_slice(&block[range_start..range_end]);
1002 }
1003
1004 Ok(result)
1005 }
1006}
1007
/// LRU cache of decompressed blocks with an optional TTL, keyed by
/// block index.
#[derive(Debug)]
struct BlockCache<A: Clone + Copy + 'static + Send + Sync> {
    // Maximum number of blocks kept at once.
    capacity: usize,

    // Optional time-to-live after which entries are considered stale.
    ttl: Option<Duration>,

    // Map from block index to cached data, behind a reader-writer lock.
    cache: RwLock<HashMap<usize, CachedBlock<A>>>,
}
1022
/// A decompressed block together with its last-access time.
#[derive(Debug, Clone)]
struct CachedBlock<A: Clone + Copy + 'static + Send + Sync> {
    // Decompressed elements of the block.
    data: Vec<A>,

    // Last time the block was inserted or read (used for TTL and LRU).
    timestamp: Instant,
}
1032
1033impl<A: Clone + Copy + 'static + Send + Sync> BlockCache<A> {
1034 fn new(capacity: usize, ttl: Option<Duration>) -> Self {
1041 Self {
1042 capacity,
1043 ttl,
1044 cache: RwLock::new(HashMap::new()),
1045 }
1046 }
1047
1048 fn has_block(&self, blockidx: usize) -> bool {
1058 let cache = self.cache.read().expect("Operation failed");
1059
1060 if let Some(cached) = cache.get(&blockidx) {
1062 if let Some(ttl) = self.ttl {
1064 if cached.timestamp.elapsed() > ttl {
1065 return false;
1066 }
1067 }
1068
1069 true
1070 } else {
1071 false
1072 }
1073 }
1074
1075 fn get_block(&self, blockidx: usize) -> Option<Vec<A>> {
1085 let mut cache = self.cache.write().expect("Operation failed");
1086
1087 if let Some(mut cached) = cache.remove(&blockidx) {
1089 if let Some(ttl) = self.ttl {
1091 if cached.timestamp.elapsed() > ttl {
1092 return None;
1093 }
1094 }
1095
1096 cached.timestamp = Instant::now();
1098
1099 let data = cached.data.clone();
1101 cache.insert(blockidx, cached);
1102
1103 Some(data)
1104 } else {
1105 None
1106 }
1107 }
1108
1109 fn put_block(&self, blockidx: usize, block: Vec<A>) {
1116 let mut cache = self.cache.write().expect("Operation failed");
1117
1118 if cache.len() >= self.capacity && !cache.contains_key(&blockidx) {
1120 if let Some(lru_idx) = cache
1122 .iter()
1123 .min_by_key(|(_, cached)| cached.timestamp)
1124 .map(|(idx, _)| *idx)
1125 {
1126 cache.remove(&lru_idx);
1127 }
1128 }
1129
1130 cache.insert(
1132 blockidx,
1133 CachedBlock {
1134 data: block,
1135 timestamp: Instant::now(),
1136 },
1137 );
1138 }
1139
1140 #[allow(dead_code)]
1142 fn clear(&self) {
1143 let mut cache = self.cache.write().expect("Operation failed");
1144 cache.clear();
1145 }
1146
1147 #[allow(dead_code)]
1149 fn len(&self) -> usize {
1150 let cache = self.cache.read().expect("Operation failed");
1151 cache.len()
1152 }
1153
1154 #[allow(dead_code)]
1156 fn is_empty(&self) -> bool {
1157 let cache = self.cache.read().expect("Operation failed");
1158 cache.is_empty()
1159 }
1160}
1161
1162#[cfg(test)]
1163mod tests {
1164 use super::*;
1165 use ::ndarray::Array2;
1166 use tempfile::tempdir;
1167
    // End-to-end round trip for a 1-D array: create, random access,
    // slicing, block processing, and full materialization.
    #[test]
    fn test_compressed_memmapped_array_1d() {
        let dir = tempdir().expect("Operation failed");
        let file_path = dir.path().join("test_compressed_1d.cmm");

        // 1000 sequential f64 values, split into ten 100-element blocks.
        let data: Vec<f64> = (0..1000).map(|i| i as f64).collect();

        let builder = CompressedMemMapBuilder::new()
            .with_block_size(100)
            .with_algorithm(CompressionAlgorithm::Lz4)
            .with_level(1)
            .with_cache_size(4)
            .with_description("Test 1D array");

        let cmm = builder
            .create_from_raw(&data, &[1000], &file_path)
            .expect("Operation failed");

        assert_eq!(cmm.shape(), &[1000]);
        assert_eq!(cmm.size(), 1000);
        assert_eq!(cmm.ndim(), 1);

        // Spot-check the first element of each block via random access.
        for i in 0..10 {
            let val = cmm.get(&[i * 100]).expect("Operation failed");
            assert_eq!(val, (i * 100) as f64);
        }

        // Slicing returns the expected contiguous range.
        let slice = cmm.slice(&[(200, 300)]).expect("Operation failed");
        assert_eq!(slice.shape(), &[100]);
        for i in 0..100 {
            assert_eq!(slice[crate::ndarray::IxDyn(&[i])], (i + 200) as f64);
        }

        // Block-wise processing visits all ten blocks.
        let sums = cmm
            .process_blocks(|block, _| block.iter().sum::<f64>())
            .expect("Test: operation failed");

        assert_eq!(sums.len(), 10);
        // Full materialization round-trips every element.
        let array = cmm.readonly_array().expect("Operation failed");
        assert_eq!(array.shape(), &[1000]);
        for i in 0..1000 {
            assert_eq!(array[crate::ndarray::IxDyn(&[i])], i as f64);
        }
    }
1222
    // End-to-end round trip for a 2-D array, including multi-dimensional
    // indexing and 2-D slicing.
    #[test]
    fn test_compressed_memmapped_array_2d() {
        let dir = tempdir().expect("Operation failed");
        let file_path = dir.path().join("test_compressed_2d.cmm");

        // 10x10 matrix with value i*10+j at position (i, j).
        let data = Array2::<f64>::from_shape_fn((10, 10), |(i, j)| (i * 10 + j) as f64);

        let builder = CompressedMemMapBuilder::new()
            .with_block_size(25)
            .with_algorithm(CompressionAlgorithm::Lz4)
            .with_level(1)
            .with_cache_size(4)
            .with_description("Test 2D array");

        let cmm = builder.create(&data, &file_path).expect("Operation failed");

        assert_eq!(cmm.shape(), &[10, 10]);
        assert_eq!(cmm.size(), 100);
        assert_eq!(cmm.ndim(), 2);

        // Every element is retrievable by multi-dimensional index.
        for i in 0..10 {
            for j in 0..10 {
                let val = cmm.get(&[i, j]).expect("Operation failed");
                assert_eq!(val, (i * 10 + j) as f64);
            }
        }

        // A 2-D sub-slice preserves the source values.
        let slice = cmm.slice(&[(2, 5), (3, 7)]).expect("Operation failed");
        assert_eq!(slice.shape(), &[3, 4]);
        for i in 0..3 {
            for j in 0..4 {
                assert_eq!(
                    slice[crate::ndarray::IxDyn(&[i, j])],
                    ((i + 2) * 10 + (j + 3)) as f64
                );
            }
        }

        // Full materialization round-trips the matrix.
        let array = cmm.readonly_array().expect("Operation failed");
        assert_eq!(array.shape(), &[10, 10]);
        for i in 0..10 {
            for j in 0..10 {
                assert_eq!(array[crate::ndarray::IxDyn(&[i, j])], (i * 10 + j) as f64);
            }
        }
    }
1277
    // Each supported compression algorithm must round-trip the data
    // losslessly through create + readonly_array.
    #[test]
    fn test_different_compression_algorithms() {
        let dir = tempdir().expect("Operation failed");

        let data: Vec<f64> = (0..1000).map(|i| i as f64).collect();

        for algorithm in &[
            CompressionAlgorithm::Lz4,
            CompressionAlgorithm::Zstd,
            CompressionAlgorithm::Snappy,
        ] {
            // One file per algorithm, named after the variant.
            let file_path = dir.path().join(format!("test_{:?}.cmm", algorithm));

            let builder = CompressedMemMapBuilder::new()
                .with_block_size(100)
                .with_algorithm(*algorithm)
                .with_level(1)
                .with_cache_size(4);

            let cmm = builder
                .create_from_raw(&data, &[1000], &file_path)
                .expect("Operation failed");

            let array = cmm.readonly_array().expect("Operation failed");
            for i in 0..1000 {
                assert_eq!(array[crate::ndarray::IxDyn(&[i])], i as f64);
            }
        }
    }
1313
    // The block cache honors its capacity and re-caches on access.
    #[test]
    fn test_block_cache() {
        let dir = tempdir().expect("Operation failed");
        let file_path = dir.path().join("test_cache.cmm");

        let data: Vec<f64> = (0..1000).map(|i| i as f64).collect();

        // Cache holds at most two of the ten 100-element blocks.
        let small_cache = CompressedMemMapBuilder::new()
            .with_block_size(100)
            .with_cache_size(2)
            .create_from_raw(&data, &[1000], &file_path)
            .expect("Operation failed");

        // Preloading all blocks forces LRU eviction down to capacity.
        for i in 0..10 {
            small_cache.preload_block(i).expect("Operation failed");
        }

        assert_eq!(small_cache.block_cache.len(), 2);

        // Reading an evicted element reloads (and re-caches) its block.
        let val = small_cache.get(&[0]).expect("Operation failed");
        assert_eq!(val, 0.0);

        assert!(small_cache.block_cache.has_block(0));
    }
1346
    // preload_block populates the cache, and reads from that block are
    // then served correctly.
    #[test]
    fn test_block_preloading() {
        let dir = tempdir().expect("Operation failed");
        let file_path = dir.path().join("test_preload.cmm");

        let data: Vec<f64> = (0..1000).map(|i| i as f64).collect();

        let cmm = CompressedMemMapBuilder::new()
            .with_block_size(100)
            .create_from_raw(&data, &[1000], &file_path)
            .expect("Test: operation failed");

        // Explicit preload should populate the cache for block 5.
        cmm.preload_block(5).expect("Operation failed");

        assert!(cmm.block_cache.has_block(5));

        // Element 550 lives in block 5 (100-element blocks).
        let val = cmm.get(&[550]).expect("Operation failed");
        assert_eq!(val, 550.0);
    }
1372}