1use crate::error::{CoreError, CoreResult, ErrorContext};
12use ::ndarray::{Array, ArrayBase, Dimension, IxDyn, RawData};
13
14#[cfg(feature = "memory_compression")]
15use oxiarc_lz4;
16#[cfg(feature = "memory_compression")]
17use oxiarc_zstd;
18
19use std::collections::HashMap;
20use std::fs::{File, OpenOptions};
21use std::io::{Read, Seek, SeekFrom, Write};
22use std::path::{Path, PathBuf};
23use std::sync::{Arc, RwLock};
24use std::time::{Duration, Instant};
25
/// On-disk metadata header describing a block-compressed array file.
///
/// Serialized as JSON into a fixed 1024-byte region at the start of the
/// file (NUL padded); see the builder/open code in this module.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct CompressedFileMetadata {
    /// Logical array shape (row-major).
    pub shape: Vec<usize>,

    /// Size in bytes of a single element.
    pub element_size: usize,

    /// Total number of elements in the array.
    pub num_elements: usize,

    /// Number of elements per compression block (last block may be short).
    pub block_size: usize,

    /// Total number of compressed blocks in the file.
    pub num_blocks: usize,

    /// Absolute file offset of each compressed block.
    pub block_offsets: Vec<u64>,

    /// Compressed size in bytes of each block.
    pub block_compressed_sizes: Vec<usize>,

    /// Uncompressed size in bytes of each block.
    pub block_uncompressed_sizes: Vec<usize>,

    /// Algorithm used to compress every block.
    pub compression_algorithm: CompressionAlgorithm,

    /// Compression level (used by Zstd; ignored by the other algorithms).
    pub compression_level: i32,

    /// UTC timestamp recorded when the file was created.
    pub creation_time: chrono::DateTime<chrono::Utc>,

    /// Optional human-readable description.
    pub description: Option<String>,
}
65
/// Supported block compression algorithms.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, serde::Serialize, serde::Deserialize)]
pub enum CompressionAlgorithm {
    /// LZ4: very fast, moderate ratio (the default).
    #[default]
    Lz4,
    /// Zstandard: honors the configured compression level.
    Zstd,
    /// Snappy: fast, no level parameter.
    Snappy,
}
77
/// Builder for creating block-compressed array files.
///
/// Configure block size, algorithm, level, and cache behavior, then call
/// `create` / `create_from_raw`.
#[derive(Debug, Clone)]
pub struct CompressedMemMapBuilder {
    /// Elements per compression block.
    block_size: usize,

    /// Compression algorithm applied to every block.
    algorithm: CompressionAlgorithm,

    /// Compression level (meaningful for Zstd).
    level: i32,

    /// Maximum number of decompressed blocks kept in the reader's cache.
    cache_size: usize,

    /// Optional expiry for cached blocks.
    cache_ttl: Option<Duration>,

    /// Optional description stored in the file metadata.
    description: Option<String>,
}
99
100impl Default for CompressedMemMapBuilder {
101 fn default() -> Self {
102 Self {
103 block_size: 65536, algorithm: CompressionAlgorithm::Lz4,
105 level: 1, cache_size: 32, cache_ttl: Some(Duration::from_secs(300)), description: None,
109 }
110 }
111}
112
113impl CompressedMemMapBuilder {
    /// Creates a builder with default settings (see the `Default` impl).
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the number of elements per compression block.
    pub fn with_block_size(mut self, blocksize: usize) -> Self {
        self.block_size = blocksize;
        self
    }

    /// Sets the compression algorithm used for every block.
    pub fn with_algorithm(mut self, algorithm: CompressionAlgorithm) -> Self {
        self.algorithm = algorithm;
        self
    }

    /// Sets the compression level (used by Zstd; ignored by LZ4/Snappy).
    pub fn with_level(mut self, level: i32) -> Self {
        self.level = level;
        self
    }

    /// Sets the maximum number of decompressed blocks kept in memory.
    pub fn with_cache_size(mut self, cachesize: usize) -> Self {
        self.cache_size = cachesize;
        self
    }

    /// Sets (or disables, with `None`) the cache entry time-to-live.
    pub fn with_cache_ttl(mut self, ttl: Option<Duration>) -> Self {
        self.cache_ttl = ttl;
        self
    }

    /// Sets a human-readable description stored in the file metadata.
    pub fn with_description(mut self, description: impl Into<String>) -> Self {
        self.description = Some(description.into());
        self
    }
168
169 pub fn create<A, S, D>(
180 &self,
181 data: &ArrayBase<S, D>,
182 path: impl AsRef<Path>,
183 ) -> CoreResult<CompressedMemMappedArray<A>>
184 where
185 A: Clone + Copy + 'static + Send + Sync,
186 S: RawData<Elem = A>,
187 D: Dimension,
188 {
189 let shape = data.shape().to_vec();
191 let num_elements = data.len();
192 let element_size = std::mem::size_of::<A>();
193
194 let block_size = self.block_size.min(num_elements);
196 let num_blocks = num_elements.div_ceil(block_size);
197
198 let mut metadata = CompressedFileMetadata {
200 shape,
201 element_size,
202 num_elements,
203 block_size,
204 num_blocks,
205 block_offsets: Vec::with_capacity(num_blocks),
206 block_compressed_sizes: Vec::with_capacity(num_blocks),
207 block_uncompressed_sizes: Vec::with_capacity(num_blocks),
208 compression_algorithm: self.algorithm,
209 compression_level: self.level,
210 creation_time: chrono::Utc::now(),
211 description: self.description.clone(),
212 };
213
214 let path = path.as_ref();
216 let mut file = OpenOptions::new()
217 .read(true)
218 .write(true)
219 .create(true)
220 .truncate(true)
221 .open(path)?;
222
223 let metadata_placeholder = vec![0u8; 1024];
225 file.write_all(&metadata_placeholder)?;
226
227 let data_ptr = data.as_ptr() as *const u8;
229 let mut current_offset = metadata_placeholder.len() as u64;
230
231 for blockidx in 0..num_blocks {
232 let start_element = blockidx * block_size;
233 let end_element = (start_element + block_size).min(num_elements);
234 let block_elements = end_element - start_element;
235 let uncompressed_size = block_elements * element_size;
236
237 metadata.block_offsets.push(current_offset);
239 metadata.block_uncompressed_sizes.push(uncompressed_size);
240
241 let data_offset = start_element * element_size;
243 let block_data =
244 unsafe { std::slice::from_raw_parts(data_ptr.add(data_offset), uncompressed_size) };
245
246 let compressed_data = match self.algorithm {
248 CompressionAlgorithm::Lz4 => oxiarc_lz4::compress(block_data).map_err(|e| {
249 CoreError::ComputationError(ErrorContext::new(format!(
250 "LZ4 compression error: {}",
251 e
252 )))
253 })?,
254 CompressionAlgorithm::Zstd => {
255 oxiarc_zstd::compress_with_level(block_data, self.level).map_err(|e| {
256 CoreError::ComputationError(ErrorContext::new(format!(
257 "Zstd compression error: {}",
258 e
259 )))
260 })?
261 }
262 CompressionAlgorithm::Snappy => snap::raw::Encoder::new()
263 .compress_vec(block_data)
264 .map_err(|e| {
265 CoreError::ComputationError(ErrorContext::new(format!(
266 "Snappy compression error: {}",
267 e
268 )))
269 })?,
270 };
271
272 metadata.block_compressed_sizes.push(compressed_data.len());
274
275 file.write_all(&compressed_data)?;
277
278 current_offset += compressed_data.len() as u64;
280 }
281
282 let metadata_json = serde_json::to_string(&metadata).map_err(|e| {
284 CoreError::ValueError(ErrorContext::new(format!(
285 "Failed to serialize metadata: {}",
286 e
287 )))
288 })?;
289 let mut metadata_bytes = metadata_json.into_bytes();
290
291 if metadata_bytes.len() > metadata_placeholder.len() {
293 return Err(CoreError::ValueError(ErrorContext::new(format!(
294 "Metadata size ({} bytes) exceeds reserved space ({} bytes)",
295 metadata_bytes.len(),
296 metadata_placeholder.len()
297 ))));
298 }
299
300 metadata_bytes.resize(metadata_placeholder.len(), 0);
302
303 file.seek(SeekFrom::Start(0))?;
305 file.write_all(&metadata_bytes)?;
306
307 let compressed_mmap = CompressedMemMappedArray::open_impl(
309 path.to_path_buf(),
310 self.cache_size,
311 self.cache_ttl,
312 )?;
313
314 Ok(compressed_mmap)
315 }
316
    /// Convenience wrapper: copies a flat slice into an owned dynamic-dim
    /// array of the given shape and compresses it via `create`.
    ///
    /// # Errors
    ///
    /// Fails when `shape`'s element count does not match `data.len()`, or
    /// on any error from `create`.
    pub fn create_from_raw<A>(
        &self,
        data: &[A],
        shape: &[usize],
        path: impl AsRef<Path>,
    ) -> CoreResult<CompressedMemMappedArray<A>>
    where
        A: Clone + Copy + 'static + Send + Sync,
    {
        let array = Array::from_shape_vec(IxDyn(shape), data.to_vec())
            .map_err(|e| CoreError::ShapeError(ErrorContext::new(format!("{e}"))))?;

        self.create(&array, path)
    }
344}
345
346#[derive(Debug, Clone)]
351pub struct CompressedMemMappedArray<A: Clone + Copy + 'static + Send + Sync> {
352 path: PathBuf,
354
355 metadata: CompressedFileMetadata,
357
358 block_cache: Arc<BlockCache<A>>,
360
361 phantom: std::marker::PhantomData<A>,
363}
364
365impl<A: Clone + Copy + 'static + Send + Sync> CompressedMemMappedArray<A> {
366 pub fn open<P: AsRef<Path>>(path: P) -> CoreResult<Self> {
376 let cache_size = 32;
378 let cache_ttl = Some(Duration::from_secs(300));
379
380 Self::open_impl(path.as_ref().to_path_buf(), cache_size, cache_ttl)
381 }
382
    /// Opens an existing compressed array file with explicit cache settings.
    ///
    /// `cache_size` is the maximum number of decompressed blocks kept in
    /// memory; `cache_ttl` optionally expires cached blocks after a duration.
    ///
    /// # Errors
    ///
    /// Propagates any failure from reading or validating the file header.
    pub fn open_with_cache<P: AsRef<Path>>(
        path: P,
        cache_size: usize,
        cache_ttl: Option<Duration>,
    ) -> CoreResult<Self> {
        Self::open_impl(path.as_ref().to_path_buf(), cache_size, cache_ttl)
    }
401
402 fn open_impl(
404 path: PathBuf,
405 cache_size: usize,
406 cache_ttl: Option<Duration>,
407 ) -> CoreResult<Self> {
408 let mut file = File::open(&path)?;
410
411 let mut metadata_bytes = vec![0u8; 1024];
413 file.read_exact(&mut metadata_bytes)?;
414
415 let metadata_json = String::from_utf8_lossy(&metadata_bytes)
418 .trim_end_matches('\0')
419 .to_string();
420 let metadata: CompressedFileMetadata =
421 serde_json::from_str(&metadata_json).map_err(|e| {
422 CoreError::ValueError(ErrorContext::new(format!(
423 "Failed to deserialize metadata: {}",
424 e
425 )))
426 })?;
427
428 let expected_element_size = std::mem::size_of::<A>();
430 if metadata.element_size != expected_element_size {
431 return Err(CoreError::ValueError(ErrorContext::new(format!(
432 "Element _size mismatch: expected {}, got {}",
433 expected_element_size, metadata.element_size
434 ))));
435 }
436
437 let block_cache = Arc::new(BlockCache::new(cache_size, cache_ttl));
439
440 Ok(Self {
442 path,
443 metadata,
444 block_cache,
445 phantom: std::marker::PhantomData,
446 })
447 }
448
    /// Logical shape of the stored array.
    pub fn shape(&self) -> &[usize] {
        &self.metadata.shape
    }

    /// Total number of elements.
    pub fn size(&self) -> usize {
        self.metadata.num_elements
    }

    /// Number of dimensions.
    pub fn ndim(&self) -> usize {
        self.metadata.shape.len()
    }

    /// Full on-disk metadata header.
    pub fn metadata(&self) -> &CompressedFileMetadata {
        &self.metadata
    }

    /// Elements per compression block.
    pub fn block_size(&self) -> usize {
        self.metadata.block_size
    }

    /// Number of compressed blocks in the file.
    pub fn num_blocks(&self) -> usize {
        self.metadata.num_blocks
    }
478
479 pub fn preload_block(&self, blockidx: usize) -> CoreResult<()> {
491 if blockidx >= self.metadata.num_blocks {
492 return Err(CoreError::IndexError(ErrorContext::new(format!(
493 "Block index {} out of bounds (max {})",
494 blockidx,
495 self.metadata.num_blocks - 1
496 ))));
497 }
498
499 if self.block_cache.has_block(blockidx) {
501 return Ok(());
502 }
503
504 let block = self.load_block(blockidx)?;
506
507 self.block_cache.put_block(blockidx, block);
509
510 Ok(())
511 }
512
513 fn load_block(&self, blockidx: usize) -> CoreResult<Vec<A>> {
515 let mut file = File::open(&self.path)?;
517
518 let offset = self.metadata.block_offsets[blockidx];
520 let compressed_size = self.metadata.block_compressed_sizes[blockidx];
521 let uncompressed_size = self.metadata.block_uncompressed_sizes[blockidx];
522
523 file.seek(SeekFrom::Start(offset))?;
525 let mut compressed_data = vec![0u8; compressed_size];
526 file.read_exact(&mut compressed_data)?;
527
528 let block_bytes = match self.metadata.compression_algorithm {
530 CompressionAlgorithm::Lz4 => {
531 oxiarc_lz4::decompress(&compressed_data, uncompressed_size).map_err(|e| {
532 CoreError::ComputationError(ErrorContext::new(format!(
533 "LZ4 decompression error: {}",
534 e
535 )))
536 })?
537 }
538 CompressionAlgorithm::Zstd => {
539 oxiarc_zstd::decompress(&compressed_data).map_err(|e| {
540 CoreError::ComputationError(ErrorContext::new(format!(
541 "Zstd decompression error: {}",
542 e
543 )))
544 })?
545 }
546 CompressionAlgorithm::Snappy => snap::raw::Decoder::new()
547 .decompress_vec(&compressed_data)
548 .map_err(|e| {
549 CoreError::ComputationError(ErrorContext::new(format!(
550 "Snappy decompression error: {}",
551 e
552 )))
553 })?,
554 };
555
556 if block_bytes.len() != uncompressed_size {
558 return Err(CoreError::ValueError(ErrorContext::new(format!(
559 "Block {} decompressed to {} bytes, expected {}",
560 blockidx,
561 block_bytes.len(),
562 uncompressed_size
563 ))));
564 }
565
566 let num_elements = uncompressed_size / std::mem::size_of::<A>();
568 let mut elements = Vec::with_capacity(num_elements);
569
570 for chunk in block_bytes.chunks_exact(std::mem::size_of::<A>()) {
572 let element = unsafe { *(chunk.as_ptr() as *const A) };
573 elements.push(element);
574 }
575
576 Ok(elements)
577 }
578
579 pub fn readonly_array(&self) -> CoreResult<Array<A, IxDyn>> {
587 let mut result = Array::from_elem(IxDyn(&self.metadata.shape), unsafe {
589 std::mem::zeroed::<A>()
590 });
591
592 let mut offset = 0;
594 for blockidx in 0..self.metadata.num_blocks {
595 let block = match self.block_cache.get_block(blockidx) {
597 Some(block) => block,
598 None => {
599 let block = self.load_block(blockidx)?;
600 self.block_cache.put_block(blockidx, block.clone());
601 block
602 }
603 };
604
605 let start = offset;
607 let end = (start + block.len()).min(self.metadata.num_elements);
608 let slice = &mut result.as_slice_mut().expect("Operation failed")[start..end];
609 slice.copy_from_slice(&block[..(end - start)]);
610
611 offset = end;
613 }
614
615 Ok(result)
616 }
617
618 pub fn get(&self, indices: &[usize]) -> CoreResult<A> {
630 if indices.len() != self.metadata.shape.len() {
632 return Err(CoreError::DimensionError(ErrorContext::new(format!(
633 "Expected {} indices, got {}",
634 self.metadata.shape.len(),
635 indices.len()
636 ))));
637 }
638
639 for (dim, &idx) in indices.iter().enumerate() {
640 if idx >= self.metadata.shape[dim] {
641 return Err(CoreError::IndexError(ErrorContext::new(format!(
642 "Index {} out of bounds for dimension {} (max {})",
643 idx,
644 dim,
645 self.metadata.shape[dim] - 1
646 ))));
647 }
648 }
649
650 let mut flat_index = 0;
652 let mut stride = 1;
653 for i in (0..indices.len()).rev() {
654 flat_index += indices[i] * stride;
655 if i > 0 {
656 stride *= self.metadata.shape[i];
657 }
658 }
659
660 let blockidx = flat_index / self.metadata.block_size;
662 let block_offset = flat_index % self.metadata.block_size;
663
664 let block = match self.block_cache.get_block(blockidx) {
666 Some(block) => block,
667 None => {
668 let block = self.load_block(blockidx)?;
669 self.block_cache.put_block(blockidx, block.clone());
670 block
671 }
672 };
673
674 if block_offset < block.len() {
676 Ok(block[block_offset])
677 } else {
678 Err(CoreError::IndexError(ErrorContext::new(format!(
679 "Block offset {} out of bounds for block {} (max {})",
680 block_offset,
681 blockidx,
682 block.len() - 1
683 ))))
684 }
685 }
686
    /// Copies a rectangular sub-region (half-open `start..end` per
    /// dimension) into a new owned array.
    ///
    /// # Errors
    ///
    /// Fails when the number of ranges does not match the rank, a range is
    /// empty/inverted or out of bounds, or a block cannot be loaded.
    pub fn slice(&self, ranges: &[(usize, usize)]) -> CoreResult<Array<A, IxDyn>> {
        if ranges.len() != self.metadata.shape.len() {
            return Err(CoreError::DimensionError(ErrorContext::new(format!(
                "Expected {} ranges, got {}",
                self.metadata.shape.len(),
                ranges.len()
            ))));
        }

        // Validate every range and derive the output shape.
        let mut resultshape = Vec::with_capacity(ranges.len());
        for (dim, &(start, end)) in ranges.iter().enumerate() {
            if start >= end {
                return Err(CoreError::ValueError(ErrorContext::new(format!(
                    "Invalid range for dimension {}: {}..{}",
                    dim, start, end
                ))));
            }
            if end > self.metadata.shape[dim] {
                return Err(CoreError::IndexError(ErrorContext::new(format!(
                    "Range {}..{} out of bounds for dimension {} (max {})",
                    start, end, dim, self.metadata.shape[dim]
                ))));
            }
            resultshape.push(end - start);
        }

        // NOTE(review): zero-initializing via mem::zeroed is only sound for
        // element types whose all-zero bit pattern is valid (plain numerics);
        // confirm `A` is restricted accordingly. Every cell is overwritten
        // in the loop below in any case.
        let mut result = Array::from_elem(IxDyn(&resultshape), unsafe { std::mem::zeroed::<A>() });

        let result_size = resultshape.iter().product::<usize>();

        // Odometer-style iteration: `result_indices` walks the output array
        // while `source_indices` tracks the matching coordinates in the
        // source (offset by each range's start).
        let mut result_indices = vec![0; ranges.len()];
        let mut source_indices = Vec::with_capacity(ranges.len());
        for &(start, _) in ranges.iter() {
            source_indices.push(start);
        }

        for _result_flat_idx in 0..result_size {
            // Row-major flat index of the current source coordinate.
            let mut source_flat_idx = 0;
            let mut stride = 1;
            for i in (0..source_indices.len()).rev() {
                source_flat_idx += source_indices[i] * stride;
                if i > 0 {
                    stride *= self.metadata.shape[i];
                }
            }

            let blockidx = source_flat_idx / self.metadata.block_size;
            let block_offset = source_flat_idx % self.metadata.block_size;

            // Fetch from cache, loading (and caching) on a miss.
            let block = match self.block_cache.get_block(blockidx) {
                Some(block) => block,
                None => {
                    let block = self.load_block(blockidx)?;
                    self.block_cache.put_block(blockidx, block.clone());
                    block
                }
            };

            if block_offset < block.len() {
                // Row-major flat index of the current output coordinate.
                let mut result_stride = 1;
                let mut result_flat_idx = 0;
                for i in (0..result_indices.len()).rev() {
                    result_flat_idx += result_indices[i] * result_stride;
                    if i > 0 {
                        result_stride *= resultshape[i];
                    }
                }

                let result_slice = result.as_slice_mut().expect("Operation failed");
                result_slice[result_flat_idx] = block[block_offset];
            }

            // Advance the odometer, rightmost dimension fastest.
            for i in (0..ranges.len()).rev() {
                result_indices[i] += 1;
                source_indices[i] += 1;
                if result_indices[i] < resultshape[i] {
                    break;
                }
                result_indices[i] = 0;
                source_indices[i] = ranges[i].0;
            }
        }

        Ok(result)
    }
796
    /// Applies `f` to every block (decompressed elements plus block index)
    /// sequentially, using the file's native block size.
    pub fn process_blocks<F, R>(&self, f: F) -> CoreResult<Vec<R>>
    where
        F: Fn(&[A], usize) -> R + Send + Sync + 'static,
        R: Send + 'static,
    {
        self.process_blocks_internal(f, false, None)
    }
815
    /// Like `process_blocks`, but re-chunks the data into blocks of
    /// `blocksize` elements instead of the file's native block size.
    pub fn process_blocks_with_size<F, R>(&self, blocksize: usize, f: F) -> CoreResult<Vec<R>>
    where
        F: Fn(&[A], usize) -> R + Send + Sync + 'static,
        R: Send + 'static,
    {
        self.process_blocks_internal(f, false, Some(blocksize))
    }
833
    /// Parallel variant of `process_blocks`; block order is preserved in
    /// the returned vector.
    #[cfg(feature = "parallel")]
    pub fn process_blocks_parallel<F, R>(&self, f: F) -> CoreResult<Vec<R>>
    where
        F: Fn(&[A], usize) -> R + Send + Sync + 'static,
        R: Send + 'static,
    {
        self.process_blocks_internal(f, true, None)
    }
851
    /// Parallel variant of `process_blocks_with_size`; block order is
    /// preserved in the returned vector.
    #[cfg(feature = "parallel")]
    pub fn process_blocks_parallel_with_size<F, R>(
        &self,
        block_size: usize,
        f: F,
    ) -> CoreResult<Vec<R>>
    where
        F: Fn(&[A], usize) -> R + Send + Sync + 'static,
        R: Send + 'static,
    {
        self.process_blocks_internal(f, true, Some(block_size))
    }
874
875 #[cfg(not(feature = "parallel"))]
877 fn process_blocks_internal<F, R>(
878 &self,
879 mut f: F,
880 _parallel: bool,
881 custom_block_size: Option<usize>,
882 ) -> CoreResult<Vec<R>>
883 where
884 F: FnMut(&[A], usize) -> R,
885 {
886 let block_size = custom_block_size.unwrap_or(self.metadata.block_size);
888 let num_elements = self.metadata.num_elements;
889 let num_blocks = num_elements.div_ceil(block_size);
890
891 let mut results = Vec::with_capacity(num_blocks);
893 for blockidx in 0..num_blocks {
894 let start = blockidx * block_size;
895 let end = (start + block_size).min(num_elements);
896 let elements = self.load_elements(start, end)?;
897 results.push(f(&elements, blockidx));
898 }
899 Ok(results)
900 }
901
    /// Driver shared by the `process_blocks*` entry points when the
    /// `parallel` feature is enabled; dispatches between a parallel
    /// iterator (via the crate's `parallel_ops`) and a sequential map.
    #[cfg(feature = "parallel")]
    fn process_blocks_internal<F, R>(
        &self,
        f: F,
        parallel: bool,
        custom_block_size: Option<usize>,
    ) -> CoreResult<Vec<R>>
    where
        F: Fn(&[A], usize) -> R + Send + Sync + 'static,
        R: Send + 'static,
    {
        let block_size = custom_block_size.unwrap_or(self.metadata.block_size);
        let num_elements = self.metadata.num_elements;
        let num_blocks = num_elements.div_ceil(block_size);

        if parallel {
            use crate::parallel_ops::*;

            // collect() on Result short-circuits at the first error.
            return (0..num_blocks)
                .into_par_iter()
                .map(|blockidx| {
                    let start = blockidx * block_size;
                    let end = (start + block_size).min(num_elements);

                    let elements = match self.load_elements(start, end) {
                        Ok(elems) => elems,
                        Err(e) => return Err(e),
                    };

                    Ok(f(&elements, blockidx))
                })
                .collect::<Result<Vec<R>, CoreError>>();
        }

        (0..num_blocks)
            .map(|blockidx| {
                let start = blockidx * block_size;
                let end = (start + block_size).min(num_elements);

                let elements = self.load_elements(start, end)?;

                Ok(f(&elements, blockidx))
            })
            .collect::<Result<Vec<R>, CoreError>>()
    }
957
958 fn load_elements(&self, start: usize, end: usize) -> CoreResult<Vec<A>> {
971 if start >= self.metadata.num_elements {
973 return Err(CoreError::IndexError(ErrorContext::new(format!(
974 "Start index {} out of bounds (max {})",
975 start,
976 self.metadata.num_elements - 1
977 ))));
978 }
979 if end > self.metadata.num_elements {
980 return Err(CoreError::IndexError(ErrorContext::new(format!(
981 "End index {} out of bounds (max {})",
982 end, self.metadata.num_elements
983 ))));
984 }
985 if start >= end {
986 return Ok(Vec::new());
987 }
988
989 let start_block = start / self.metadata.block_size;
991 let end_block = (end - 1) / self.metadata.block_size;
992
993 let mut result = Vec::with_capacity(end - start);
995
996 for blockidx in start_block..=end_block {
998 let block = match self.block_cache.get_block(blockidx) {
1000 Some(block) => block,
1001 None => {
1002 let block = self.load_block(blockidx)?;
1003 self.block_cache.put_block(blockidx, block.clone());
1004 block
1005 }
1006 };
1007
1008 let block_start = blockidx * self.metadata.block_size;
1010 let block_end = block_start + block.len();
1011
1012 let range_start = start.max(block_start) - block_start;
1013 let range_end = end.min(block_end) - block_start;
1014
1015 result.extend_from_slice(&block[range_start..range_end]);
1017 }
1018
1019 Ok(result)
1020 }
1021}
1022
/// Bounded in-memory cache of decompressed blocks with optional TTL expiry
/// and approximate-LRU eviction (oldest timestamp evicted first).
#[derive(Debug)]
struct BlockCache<A: Clone + Copy + 'static + Send + Sync> {
    /// Maximum number of blocks held at once.
    capacity: usize,

    /// Optional time-to-live; entries older than this are treated as absent.
    ttl: Option<Duration>,

    /// Interior-mutable map from block index to its cached data.
    cache: RwLock<HashMap<usize, CachedBlock<A>>>,
}
1037
/// A single cached decompressed block plus its last-touched time.
#[derive(Debug, Clone)]
struct CachedBlock<A: Clone + Copy + 'static + Send + Sync> {
    /// Decompressed elements of the block.
    data: Vec<A>,

    /// Last access time; used for TTL expiry and LRU eviction.
    timestamp: Instant,
}
1047
1048impl<A: Clone + Copy + 'static + Send + Sync> BlockCache<A> {
    /// Creates an empty cache holding at most `capacity` blocks, with an
    /// optional time-to-live for entries.
    fn new(capacity: usize, ttl: Option<Duration>) -> Self {
        Self {
            capacity,
            ttl,
            cache: RwLock::new(HashMap::new()),
        }
    }
1062
1063 fn has_block(&self, blockidx: usize) -> bool {
1073 let cache = self.cache.read().expect("Operation failed");
1074
1075 if let Some(cached) = cache.get(&blockidx) {
1077 if let Some(ttl) = self.ttl {
1079 if cached.timestamp.elapsed() > ttl {
1080 return false;
1081 }
1082 }
1083
1084 true
1085 } else {
1086 false
1087 }
1088 }
1089
    /// Returns a clone of the cached block, refreshing its LRU timestamp.
    ///
    /// Expired entries (older than `ttl`) are removed as a side effect and
    /// `None` is returned for them.
    fn get_block(&self, blockidx: usize) -> Option<Vec<A>> {
        let mut cache = self.cache.write().expect("Operation failed");

        // Remove-then-reinsert both checks the TTL and bumps the timestamp
        // so put_block's min-by-timestamp eviction approximates LRU.
        if let Some(mut cached) = cache.remove(&blockidx) {
            if let Some(ttl) = self.ttl {
                if cached.timestamp.elapsed() > ttl {
                    // Entry was removed above, so expiry also evicts it.
                    return None;
                }
            }

            cached.timestamp = Instant::now();

            let data = cached.data.clone();
            cache.insert(blockidx, cached);

            Some(data)
        } else {
            None
        }
    }
1123
1124 fn put_block(&self, blockidx: usize, block: Vec<A>) {
1131 let mut cache = self.cache.write().expect("Operation failed");
1132
1133 if cache.len() >= self.capacity && !cache.contains_key(&blockidx) {
1135 if let Some(lru_idx) = cache
1137 .iter()
1138 .min_by_key(|(_, cached)| cached.timestamp)
1139 .map(|(idx, _)| *idx)
1140 {
1141 cache.remove(&lru_idx);
1142 }
1143 }
1144
1145 cache.insert(
1147 blockidx,
1148 CachedBlock {
1149 data: block,
1150 timestamp: Instant::now(),
1151 },
1152 );
1153 }
1154
    /// Removes every cached block.
    #[allow(dead_code)]
    fn clear(&self) {
        let mut cache = self.cache.write().expect("Operation failed");
        cache.clear();
    }

    /// Number of blocks currently cached (expired entries included).
    #[allow(dead_code)]
    fn len(&self) -> usize {
        let cache = self.cache.read().expect("Operation failed");
        cache.len()
    }

    /// Whether the cache currently holds no blocks.
    #[allow(dead_code)]
    fn is_empty(&self) -> bool {
        let cache = self.cache.read().expect("Operation failed");
        cache.is_empty()
    }
1175}
1176
#[cfg(test)]
mod tests {
    use super::*;
    use ::ndarray::Array2;
    use tempfile::tempdir;

    /// Round-trips a 1-D array: element access, slicing, block processing,
    /// and full materialization.
    #[test]
    fn test_compressed_memmapped_array_1d() {
        let dir = tempdir().expect("Operation failed");
        let file_path = dir.path().join("test_compressed_1d.cmm");

        let data: Vec<f64> = (0..1000).map(|i| i as f64).collect();

        let builder = CompressedMemMapBuilder::new()
            .with_block_size(100)
            .with_algorithm(CompressionAlgorithm::Lz4)
            .with_level(1)
            .with_cache_size(4)
            .with_description("Test 1D array");

        let cmm = builder
            .create_from_raw(&data, &[1000], &file_path)
            .expect("Operation failed");

        assert_eq!(cmm.shape(), &[1000]);
        assert_eq!(cmm.size(), 1000);
        assert_eq!(cmm.ndim(), 1);

        // Spot-check one element per block.
        for i in 0..10 {
            let val = cmm.get(&[i * 100]).expect("Operation failed");
            assert_eq!(val, (i * 100) as f64);
        }

        let slice = cmm.slice(&[(200, 300)]).expect("Operation failed");
        assert_eq!(slice.shape(), &[100]);
        for i in 0..100 {
            assert_eq!(slice[crate::ndarray::IxDyn(&[i])], (i + 200) as f64);
        }

        let sums = cmm
            .process_blocks(|block, _| block.iter().sum::<f64>())
            .expect("Test: operation failed");

        assert_eq!(sums.len(), 10);

        let array = cmm.readonly_array().expect("Operation failed");
        assert_eq!(array.shape(), &[1000]);
        for i in 0..1000 {
            assert_eq!(array[crate::ndarray::IxDyn(&[i])], i as f64);
        }
    }

    /// Round-trips a 2-D array, including a rectangular sub-slice.
    #[test]
    fn test_compressed_memmapped_array_2d() {
        let dir = tempdir().expect("Operation failed");
        let file_path = dir.path().join("test_compressed_2d.cmm");

        let data = Array2::<f64>::from_shape_fn((10, 10), |(i, j)| (i * 10 + j) as f64);

        let builder = CompressedMemMapBuilder::new()
            .with_block_size(25)
            .with_algorithm(CompressionAlgorithm::Lz4)
            .with_level(1)
            .with_cache_size(4)
            .with_description("Test 2D array");

        let cmm = builder.create(&data, &file_path).expect("Operation failed");

        assert_eq!(cmm.shape(), &[10, 10]);
        assert_eq!(cmm.size(), 100);
        assert_eq!(cmm.ndim(), 2);

        for i in 0..10 {
            for j in 0..10 {
                let val = cmm.get(&[i, j]).expect("Operation failed");
                assert_eq!(val, (i * 10 + j) as f64);
            }
        }

        let slice = cmm.slice(&[(2, 5), (3, 7)]).expect("Operation failed");
        assert_eq!(slice.shape(), &[3, 4]);
        for i in 0..3 {
            for j in 0..4 {
                assert_eq!(
                    slice[crate::ndarray::IxDyn(&[i, j])],
                    ((i + 2) * 10 + (j + 3)) as f64
                );
            }
        }

        let array = cmm.readonly_array().expect("Operation failed");
        assert_eq!(array.shape(), &[10, 10]);
        for i in 0..10 {
            for j in 0..10 {
                assert_eq!(array[crate::ndarray::IxDyn(&[i, j])], (i * 10 + j) as f64);
            }
        }
    }

    /// Every supported algorithm must round-trip the same data.
    #[test]
    fn test_different_compression_algorithms() {
        let dir = tempdir().expect("Operation failed");

        let data: Vec<f64> = (0..1000).map(|i| i as f64).collect();

        for algorithm in &[
            CompressionAlgorithm::Lz4,
            CompressionAlgorithm::Zstd,
            CompressionAlgorithm::Snappy,
        ] {
            let file_path = dir.path().join(format!("test_{:?}.cmm", algorithm));

            let builder = CompressedMemMapBuilder::new()
                .with_block_size(100)
                .with_algorithm(*algorithm)
                .with_level(1)
                .with_cache_size(4);

            let cmm = builder
                .create_from_raw(&data, &[1000], &file_path)
                .expect("Operation failed");

            let array = cmm.readonly_array().expect("Operation failed");
            for i in 0..1000 {
                assert_eq!(array[crate::ndarray::IxDyn(&[i])], i as f64);
            }
        }
    }

    /// The cache must respect its capacity bound and still serve reads.
    #[test]
    fn test_block_cache() {
        let dir = tempdir().expect("Operation failed");
        let file_path = dir.path().join("test_cache.cmm");

        let data: Vec<f64> = (0..1000).map(|i| i as f64).collect();

        let small_cache = CompressedMemMapBuilder::new()
            .with_block_size(100)
            .with_cache_size(2)
            .create_from_raw(&data, &[1000], &file_path)
            .expect("Operation failed");

        // Preloading more blocks than the capacity forces evictions.
        for i in 0..10 {
            small_cache.preload_block(i).expect("Operation failed");
        }

        assert_eq!(small_cache.block_cache.len(), 2);

        let val = small_cache.get(&[0]).expect("Operation failed");
        assert_eq!(val, 0.0);

        // The get above should have re-cached block 0.
        assert!(small_cache.block_cache.has_block(0));
    }

    /// Preloading a block makes it resident before any element access.
    #[test]
    fn test_block_preloading() {
        let dir = tempdir().expect("Operation failed");
        let file_path = dir.path().join("test_preload.cmm");

        let data: Vec<f64> = (0..1000).map(|i| i as f64).collect();

        let cmm = CompressedMemMapBuilder::new()
            .with_block_size(100)
            .create_from_raw(&data, &[1000], &file_path)
            .expect("Test: operation failed");

        cmm.preload_block(5).expect("Operation failed");

        assert!(cmm.block_cache.has_block(5));

        let val = cmm.get(&[550]).expect("Operation failed");
        assert_eq!(val, 550.0);
    }
}