1use crate::error::{CoreError, CoreResult, ErrorContext};
12use ::ndarray::{Array, ArrayBase, Dimension, IxDyn, RawData};
13
14#[cfg(feature = "memory_compression")]
15use oxiarc_lz4;
16#[cfg(feature = "memory_compression")]
17use oxiarc_zstd;
18
19use std::collections::HashMap;
20use std::fs::{File, OpenOptions};
21use std::io::{Read, Seek, SeekFrom, Write};
22use std::path::{Path, PathBuf};
23use std::sync::{Arc, RwLock};
24use std::time::{Duration, Instant};
25
/// On-disk metadata header for a block-compressed array file.
///
/// Serialized as JSON into a fixed 1024-byte region at the start of the
/// file and padded with NUL bytes (see the builder's `create`).
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct CompressedFileMetadata {
    /// Logical array shape (one extent per dimension).
    pub shape: Vec<usize>,

    /// Size in bytes of a single element.
    pub element_size: usize,

    /// Total number of elements in the array.
    pub num_elements: usize,

    /// Elements per compressed block (the last block may hold fewer).
    pub block_size: usize,

    /// Total number of compressed blocks in the file.
    pub num_blocks: usize,

    /// Absolute file offset where each block's compressed bytes begin.
    pub block_offsets: Vec<u64>,

    /// Compressed size in bytes of each block.
    pub block_compressed_sizes: Vec<usize>,

    /// Uncompressed size in bytes of each block.
    pub block_uncompressed_sizes: Vec<usize>,

    /// Codec used to compress every block.
    pub compression_algorithm: CompressionAlgorithm,

    /// Compression level forwarded to the codec (only Zstd consumes it here).
    pub compression_level: i32,

    /// UTC timestamp recorded when the file was created.
    pub creation_time: chrono::DateTime<chrono::Utc>,

    /// Optional free-form description supplied by the builder.
    pub description: Option<String>,
}
65
/// Compression codec applied to each block of a compressed array file.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, serde::Serialize, serde::Deserialize)]
pub enum CompressionAlgorithm {
    /// LZ4 block compression (the default).
    #[default]
    Lz4,
    /// Zstandard; honors the configured compression level.
    Zstd,
    /// Snappy block compression.
    Snappy,
}
77
/// Builder that configures and creates compressed memory-mapped array files.
#[derive(Debug, Clone)]
pub struct CompressedMemMapBuilder {
    /// Elements stored per compressed block.
    block_size: usize,

    /// Codec used for every block.
    algorithm: CompressionAlgorithm,

    /// Compression level forwarded to the codec (consumed by Zstd).
    level: i32,

    /// Maximum number of decompressed blocks the reader keeps cached.
    cache_size: usize,

    /// Time-to-live for cached blocks; `None` disables expiry.
    cache_ttl: Option<Duration>,

    /// Optional description stored in the file metadata.
    description: Option<String>,
}
99
100impl Default for CompressedMemMapBuilder {
101 fn default() -> Self {
102 Self {
103 block_size: 65536, algorithm: CompressionAlgorithm::Lz4,
105 level: 1, cache_size: 32, cache_ttl: Some(Duration::from_secs(300)), description: None,
109 }
110 }
111}
112
113impl CompressedMemMapBuilder {
114 pub fn new() -> Self {
116 Self::default()
117 }
118
119 pub fn with_block_size(mut self, blocksize: usize) -> Self {
123 self.block_size = blocksize;
124 self
125 }
126
127 pub fn with_algorithm(mut self, algorithm: CompressionAlgorithm) -> Self {
129 self.algorithm = algorithm;
130 self
131 }
132
133 pub fn with_level(mut self, level: i32) -> Self {
141 self.level = level;
142 self
143 }
144
145 pub fn with_cache_size(mut self, cachesize: usize) -> Self {
150 self.cache_size = cachesize;
151 self
152 }
153
154 pub fn with_cache_ttl(mut self, ttl: Option<Duration>) -> Self {
159 self.cache_ttl = ttl;
160 self
161 }
162
163 pub fn with_description(mut self, description: impl Into<String>) -> Self {
165 self.description = Some(description.into());
166 self
167 }
168
169 pub fn create<A, S, D>(
180 &self,
181 data: &ArrayBase<S, D>,
182 path: impl AsRef<Path>,
183 ) -> CoreResult<CompressedMemMappedArray<A>>
184 where
185 A: Clone + Copy + 'static + Send + Sync,
186 S: RawData<Elem = A>,
187 D: Dimension,
188 {
189 let shape = data.shape().to_vec();
191 let num_elements = data.len();
192 let element_size = std::mem::size_of::<A>();
193
194 let block_size = self.block_size.min(num_elements);
196 let num_blocks = num_elements.div_ceil(block_size);
197
198 let mut metadata = CompressedFileMetadata {
200 shape,
201 element_size,
202 num_elements,
203 block_size,
204 num_blocks,
205 block_offsets: Vec::with_capacity(num_blocks),
206 block_compressed_sizes: Vec::with_capacity(num_blocks),
207 block_uncompressed_sizes: Vec::with_capacity(num_blocks),
208 compression_algorithm: self.algorithm,
209 compression_level: self.level,
210 creation_time: chrono::Utc::now(),
211 description: self.description.clone(),
212 };
213
214 let path = path.as_ref();
216 let mut file = OpenOptions::new()
217 .read(true)
218 .write(true)
219 .create(true)
220 .truncate(true)
221 .open(path)?;
222
223 let metadata_placeholder = vec![0u8; 1024];
225 file.write_all(&metadata_placeholder)?;
226
227 let data_ptr = data.as_ptr() as *const u8;
229 let mut current_offset = metadata_placeholder.len() as u64;
230
231 for blockidx in 0..num_blocks {
232 let start_element = blockidx * block_size;
233 let end_element = (start_element + block_size).min(num_elements);
234 let block_elements = end_element - start_element;
235 let uncompressed_size = block_elements * element_size;
236
237 metadata.block_offsets.push(current_offset);
239 metadata.block_uncompressed_sizes.push(uncompressed_size);
240
241 let data_offset = start_element * element_size;
243 let block_data =
244 unsafe { std::slice::from_raw_parts(data_ptr.add(data_offset), uncompressed_size) };
245
246 let compressed_data = match self.algorithm {
248 CompressionAlgorithm::Lz4 => oxiarc_lz4::compress(block_data).map_err(|e| {
249 CoreError::ComputationError(ErrorContext::new(format!(
250 "LZ4 compression error: {}",
251 e
252 )))
253 })?,
254 CompressionAlgorithm::Zstd => {
255 oxiarc_zstd::compress_with_level(block_data, self.level).map_err(|e| {
256 CoreError::ComputationError(ErrorContext::new(format!(
257 "Zstd compression error: {}",
258 e
259 )))
260 })?
261 }
262 CompressionAlgorithm::Snappy => oxiarc_snappy::compress(block_data),
263 };
264
265 metadata.block_compressed_sizes.push(compressed_data.len());
267
268 file.write_all(&compressed_data)?;
270
271 current_offset += compressed_data.len() as u64;
273 }
274
275 let metadata_json = serde_json::to_string(&metadata).map_err(|e| {
277 CoreError::ValueError(ErrorContext::new(format!(
278 "Failed to serialize metadata: {}",
279 e
280 )))
281 })?;
282 let mut metadata_bytes = metadata_json.into_bytes();
283
284 if metadata_bytes.len() > metadata_placeholder.len() {
286 return Err(CoreError::ValueError(ErrorContext::new(format!(
287 "Metadata size ({} bytes) exceeds reserved space ({} bytes)",
288 metadata_bytes.len(),
289 metadata_placeholder.len()
290 ))));
291 }
292
293 metadata_bytes.resize(metadata_placeholder.len(), 0);
295
296 file.seek(SeekFrom::Start(0))?;
298 file.write_all(&metadata_bytes)?;
299
300 let compressed_mmap = CompressedMemMappedArray::open_impl(
302 path.to_path_buf(),
303 self.cache_size,
304 self.cache_ttl,
305 )?;
306
307 Ok(compressed_mmap)
308 }
309
310 pub fn create_from_raw<A>(
322 &self,
323 data: &[A],
324 shape: &[usize],
325 path: impl AsRef<Path>,
326 ) -> CoreResult<CompressedMemMappedArray<A>>
327 where
328 A: Clone + Copy + 'static + Send + Sync,
329 {
330 let array = Array::from_shape_vec(IxDyn(shape), data.to_vec())
332 .map_err(|e| CoreError::ShapeError(ErrorContext::new(format!("{e}"))))?;
333
334 self.create(&array, path)
336 }
337}
338
/// Read-only handle over a block-compressed array file created by
/// [`CompressedMemMapBuilder`]. Blocks are decompressed on demand from
/// the file and cached in memory (despite the name, access goes through
/// regular file reads, not an OS memory map).
#[derive(Debug, Clone)]
pub struct CompressedMemMappedArray<A: Clone + Copy + 'static + Send + Sync> {
    /// Path to the backing compressed file.
    path: PathBuf,

    /// Metadata header parsed from the start of the file.
    metadata: CompressedFileMetadata,

    /// Shared cache of decompressed blocks.
    block_cache: Arc<BlockCache<A>>,

    /// NOTE(review): redundant — `A` already appears in `block_cache`;
    /// kept for explicitness.
    phantom: std::marker::PhantomData<A>,
}
357
impl<A: Clone + Copy + 'static + Send + Sync> CompressedMemMappedArray<A> {
    /// Opens an existing compressed file with default cache settings
    /// (32 cached blocks, 300-second TTL).
    pub fn open<P: AsRef<Path>>(path: P) -> CoreResult<Self> {
        let cache_size = 32;
        let cache_ttl = Some(Duration::from_secs(300));

        Self::open_impl(path.as_ref().to_path_buf(), cache_size, cache_ttl)
    }

    /// Opens an existing compressed file with caller-supplied cache
    /// capacity and TTL (`None` disables expiry).
    pub fn open_with_cache<P: AsRef<Path>>(
        path: P,
        cache_size: usize,
        cache_ttl: Option<Duration>,
    ) -> CoreResult<Self> {
        Self::open_impl(path.as_ref().to_path_buf(), cache_size, cache_ttl)
    }

    /// Shared open path: reads the fixed 1024-byte header, parses the
    /// NUL-padded JSON metadata, and validates the element size against `A`.
    fn open_impl(
        path: PathBuf,
        cache_size: usize,
        cache_ttl: Option<Duration>,
    ) -> CoreResult<Self> {
        let mut file = File::open(&path)?;

        // The header region written by the builder is always 1024 bytes.
        let mut metadata_bytes = vec![0u8; 1024];
        file.read_exact(&mut metadata_bytes)?;

        // Strip the NUL padding appended when the header was written.
        let metadata_json = String::from_utf8_lossy(&metadata_bytes)
            .trim_end_matches('\0')
            .to_string();
        let metadata: CompressedFileMetadata =
            serde_json::from_str(&metadata_json).map_err(|e| {
                CoreError::ValueError(ErrorContext::new(format!(
                    "Failed to deserialize metadata: {}",
                    e
                )))
            })?;

        // Guard against opening a file written for a differently-sized
        // element type; this does not detect same-size type confusion.
        let expected_element_size = std::mem::size_of::<A>();
        if metadata.element_size != expected_element_size {
            return Err(CoreError::ValueError(ErrorContext::new(format!(
                "Element _size mismatch: expected {}, got {}",
                expected_element_size, metadata.element_size
            ))));
        }

        let block_cache = Arc::new(BlockCache::new(cache_size, cache_ttl));

        Ok(Self {
            path,
            metadata,
            block_cache,
            phantom: std::marker::PhantomData,
        })
    }

    /// Logical shape of the stored array.
    pub fn shape(&self) -> &[usize] {
        &self.metadata.shape
    }

    /// Total number of elements.
    pub fn size(&self) -> usize {
        self.metadata.num_elements
    }

    /// Number of dimensions.
    pub fn ndim(&self) -> usize {
        self.metadata.shape.len()
    }

    /// The full metadata header parsed from the file.
    pub fn metadata(&self) -> &CompressedFileMetadata {
        &self.metadata
    }

    /// Elements per compressed block.
    pub fn block_size(&self) -> usize {
        self.metadata.block_size
    }

    /// Number of compressed blocks in the file.
    pub fn num_blocks(&self) -> usize {
        self.metadata.num_blocks
    }

    /// Decompresses block `blockidx` into the cache if it is not already
    /// resident; returns an index error for an out-of-range block.
    pub fn preload_block(&self, blockidx: usize) -> CoreResult<()> {
        if blockidx >= self.metadata.num_blocks {
            return Err(CoreError::IndexError(ErrorContext::new(format!(
                "Block index {} out of bounds (max {})",
                blockidx,
                self.metadata.num_blocks - 1
            ))));
        }

        if self.block_cache.has_block(blockidx) {
            return Ok(());
        }

        let block = self.load_block(blockidx)?;

        self.block_cache.put_block(blockidx, block);

        Ok(())
    }

    /// Reads and decompresses one block from disk (bypassing the cache)
    /// and reinterprets its bytes as a vector of `A`.
    fn load_block(&self, blockidx: usize) -> CoreResult<Vec<A>> {
        let mut file = File::open(&self.path)?;

        let offset = self.metadata.block_offsets[blockidx];
        let compressed_size = self.metadata.block_compressed_sizes[blockidx];
        let uncompressed_size = self.metadata.block_uncompressed_sizes[blockidx];

        file.seek(SeekFrom::Start(offset))?;
        let mut compressed_data = vec![0u8; compressed_size];
        file.read_exact(&mut compressed_data)?;

        let block_bytes = match self.metadata.compression_algorithm {
            CompressionAlgorithm::Lz4 => {
                oxiarc_lz4::decompress(&compressed_data, uncompressed_size).map_err(|e| {
                    CoreError::ComputationError(ErrorContext::new(format!(
                        "LZ4 decompression error: {}",
                        e
                    )))
                })?
            }
            CompressionAlgorithm::Zstd => {
                oxiarc_zstd::decompress(&compressed_data).map_err(|e| {
                    CoreError::ComputationError(ErrorContext::new(format!(
                        "Zstd decompression error: {}",
                        e
                    )))
                })?
            }
            CompressionAlgorithm::Snappy => {
                oxiarc_snappy::decompress(&compressed_data).map_err(|e| {
                    CoreError::ComputationError(ErrorContext::new(format!(
                        "Snappy decompression error: {}",
                        e
                    )))
                })?
            }
        };

        // Sanity-check the codec round-trip against the recorded size.
        if block_bytes.len() != uncompressed_size {
            return Err(CoreError::ValueError(ErrorContext::new(format!(
                "Block {} decompressed to {} bytes, expected {}",
                blockidx,
                block_bytes.len(),
                uncompressed_size
            ))));
        }

        let num_elements = uncompressed_size / std::mem::size_of::<A>();
        let mut elements = Vec::with_capacity(num_elements);

        // NOTE(review): this dereference assumes each chunk pointer is
        // suitably aligned for `A` and that any bit pattern is a valid
        // `A`; consider `std::ptr::read_unaligned` — TODO confirm.
        for chunk in block_bytes.chunks_exact(std::mem::size_of::<A>()) {
            let element = unsafe { *(chunk.as_ptr() as *const A) };
            elements.push(element);
        }

        Ok(elements)
    }

    /// Materializes the whole array in memory by streaming every block
    /// through the cache in order.
    pub fn readonly_array(&self) -> CoreResult<Array<A, IxDyn>> {
        // NOTE(review): `mem::zeroed` assumes the all-zero bit pattern is
        // a valid `A`; the placeholder values are fully overwritten below.
        let mut result = Array::from_elem(IxDyn(&self.metadata.shape), unsafe {
            std::mem::zeroed::<A>()
        });

        let mut offset = 0;
        for blockidx in 0..self.metadata.num_blocks {
            // Cache hit, or load from disk and populate the cache.
            let block = match self.block_cache.get_block(blockidx) {
                Some(block) => block,
                None => {
                    let block = self.load_block(blockidx)?;
                    self.block_cache.put_block(blockidx, block.clone());
                    block
                }
            };

            let start = offset;
            let end = (start + block.len()).min(self.metadata.num_elements);
            let slice = &mut result.as_slice_mut().expect("Operation failed")[start..end];
            slice.copy_from_slice(&block[..(end - start)]);

            offset = end;
        }

        Ok(result)
    }

    /// Returns the single element at the given multi-dimensional index,
    /// decompressing (and caching) only the block that contains it.
    pub fn get(&self, indices: &[usize]) -> CoreResult<A> {
        if indices.len() != self.metadata.shape.len() {
            return Err(CoreError::DimensionError(ErrorContext::new(format!(
                "Expected {} indices, got {}",
                self.metadata.shape.len(),
                indices.len()
            ))));
        }

        for (dim, &idx) in indices.iter().enumerate() {
            if idx >= self.metadata.shape[dim] {
                return Err(CoreError::IndexError(ErrorContext::new(format!(
                    "Index {} out of bounds for dimension {} (max {})",
                    idx,
                    dim,
                    self.metadata.shape[dim] - 1
                ))));
            }
        }

        // Row-major flat index: strides grow from the last dimension.
        let mut flat_index = 0;
        let mut stride = 1;
        for i in (0..indices.len()).rev() {
            flat_index += indices[i] * stride;
            if i > 0 {
                stride *= self.metadata.shape[i];
            }
        }

        let blockidx = flat_index / self.metadata.block_size;
        let block_offset = flat_index % self.metadata.block_size;

        let block = match self.block_cache.get_block(blockidx) {
            Some(block) => block,
            None => {
                let block = self.load_block(blockidx)?;
                self.block_cache.put_block(blockidx, block.clone());
                block
            }
        };

        if block_offset < block.len() {
            Ok(block[block_offset])
        } else {
            Err(CoreError::IndexError(ErrorContext::new(format!(
                "Block offset {} out of bounds for block {} (max {})",
                block_offset,
                blockidx,
                block.len() - 1
            ))))
        }
    }

    /// Copies a rectangular sub-region (half-open `start..end` per
    /// dimension) into a freshly allocated array, element by element.
    pub fn slice(&self, ranges: &[(usize, usize)]) -> CoreResult<Array<A, IxDyn>> {
        if ranges.len() != self.metadata.shape.len() {
            return Err(CoreError::DimensionError(ErrorContext::new(format!(
                "Expected {} ranges, got {}",
                self.metadata.shape.len(),
                ranges.len()
            ))));
        }

        // Validate each range and derive the output shape.
        let mut resultshape = Vec::with_capacity(ranges.len());
        for (dim, &(start, end)) in ranges.iter().enumerate() {
            if start >= end {
                return Err(CoreError::ValueError(ErrorContext::new(format!(
                    "Invalid range for dimension {}: {}..{}",
                    dim, start, end
                ))));
            }
            if end > self.metadata.shape[dim] {
                return Err(CoreError::IndexError(ErrorContext::new(format!(
                    "Range {}..{} out of bounds for dimension {} (max {})",
                    start, end, dim, self.metadata.shape[dim]
                ))));
            }
            resultshape.push(end - start);
        }

        // NOTE(review): `mem::zeroed` placeholder as in `readonly_array`.
        let mut result = Array::from_elem(IxDyn(&resultshape), unsafe { std::mem::zeroed::<A>() });

        let result_size = resultshape.iter().product::<usize>();

        // Odometer-style iteration: `source_indices` walks the source
        // region while `result_indices` walks the destination in step.
        let mut result_indices = vec![0; ranges.len()];
        let mut source_indices = Vec::with_capacity(ranges.len());
        for &(start, _) in ranges.iter() {
            source_indices.push(start);
        }

        for _result_flat_idx in 0..result_size {
            // Row-major flat index of the current source element.
            let mut source_flat_idx = 0;
            let mut stride = 1;
            for i in (0..source_indices.len()).rev() {
                source_flat_idx += source_indices[i] * stride;
                if i > 0 {
                    stride *= self.metadata.shape[i];
                }
            }

            let blockidx = source_flat_idx / self.metadata.block_size;
            let block_offset = source_flat_idx % self.metadata.block_size;

            let block = match self.block_cache.get_block(blockidx) {
                Some(block) => block,
                None => {
                    let block = self.load_block(blockidx)?;
                    self.block_cache.put_block(blockidx, block.clone());
                    block
                }
            };

            // NOTE(review): an out-of-range block offset is skipped
            // silently, leaving a zeroed element — confirm this is the
            // intended behavior rather than an error.
            if block_offset < block.len() {
                let mut result_stride = 1;
                let mut result_flat_idx = 0;
                for i in (0..result_indices.len()).rev() {
                    result_flat_idx += result_indices[i] * result_stride;
                    if i > 0 {
                        result_stride *= resultshape[i];
                    }
                }

                let result_slice = result.as_slice_mut().expect("Operation failed");
                result_slice[result_flat_idx] = block[block_offset];
            }

            // Advance the odometer: increment the last dimension and carry
            // leftward, resetting each wrapped dimension to its range start.
            for i in (0..ranges.len()).rev() {
                result_indices[i] += 1;
                source_indices[i] += 1;
                if result_indices[i] < resultshape[i] {
                    break;
                }
                result_indices[i] = 0;
                source_indices[i] = ranges[i].0;
            }
        }

        Ok(result)
    }

    /// Applies `f` sequentially to each block (metadata block size),
    /// collecting one result per block.
    pub fn process_blocks<F, R>(&self, f: F) -> CoreResult<Vec<R>>
    where
        F: Fn(&[A], usize) -> R + Send + Sync + 'static,
        R: Send + 'static,
    {
        self.process_blocks_internal(f, false, None)
    }

    /// Like [`Self::process_blocks`] but re-chunks the element stream into
    /// blocks of `blocksize` elements instead of the on-disk block size.
    pub fn process_blocks_with_size<F, R>(&self, blocksize: usize, f: F) -> CoreResult<Vec<R>>
    where
        F: Fn(&[A], usize) -> R + Send + Sync + 'static,
        R: Send + 'static,
    {
        self.process_blocks_internal(f, false, Some(blocksize))
    }

    /// Parallel variant of [`Self::process_blocks`] (requires the
    /// `parallel` feature).
    #[cfg(feature = "parallel")]
    pub fn process_blocks_parallel<F, R>(&self, f: F) -> CoreResult<Vec<R>>
    where
        F: Fn(&[A], usize) -> R + Send + Sync + 'static,
        R: Send + 'static,
    {
        self.process_blocks_internal(f, true, None)
    }

    /// Parallel variant of [`Self::process_blocks_with_size`] (requires
    /// the `parallel` feature).
    #[cfg(feature = "parallel")]
    pub fn process_blocks_parallel_with_size<F, R>(
        &self,
        block_size: usize,
        f: F,
    ) -> CoreResult<Vec<R>>
    where
        F: Fn(&[A], usize) -> R + Send + Sync + 'static,
        R: Send + 'static,
    {
        self.process_blocks_internal(f, true, Some(block_size))
    }

    /// Sequential-only implementation compiled when the `parallel` feature
    /// is off; the `_parallel` flag is accepted but ignored.
    #[cfg(not(feature = "parallel"))]
    fn process_blocks_internal<F, R>(
        &self,
        mut f: F,
        _parallel: bool,
        custom_block_size: Option<usize>,
    ) -> CoreResult<Vec<R>>
    where
        F: FnMut(&[A], usize) -> R,
    {
        let block_size = custom_block_size.unwrap_or(self.metadata.block_size);
        let num_elements = self.metadata.num_elements;
        let num_blocks = num_elements.div_ceil(block_size);

        let mut results = Vec::with_capacity(num_blocks);
        for blockidx in 0..num_blocks {
            let start = blockidx * block_size;
            let end = (start + block_size).min(num_elements);
            let elements = self.load_elements(start, end)?;
            results.push(f(&elements, blockidx));
        }
        Ok(results)
    }

    /// Implementation compiled when the `parallel` feature is on; runs
    /// blocks on a parallel iterator when `parallel` is true, otherwise
    /// falls back to a sequential map.
    #[cfg(feature = "parallel")]
    fn process_blocks_internal<F, R>(
        &self,
        f: F,
        parallel: bool,
        custom_block_size: Option<usize>,
    ) -> CoreResult<Vec<R>>
    where
        F: Fn(&[A], usize) -> R + Send + Sync + 'static,
        R: Send + 'static,
    {
        let block_size = custom_block_size.unwrap_or(self.metadata.block_size);
        let num_elements = self.metadata.num_elements;
        let num_blocks = num_elements.div_ceil(block_size);

        if parallel {
            // Presumably re-exports a rayon-style `into_par_iter` —
            // TODO confirm against `crate::parallel_ops`.
            use crate::parallel_ops::*;

            return (0..num_blocks)
                .into_par_iter()
                .map(|blockidx| {
                    let start = blockidx * block_size;
                    let end = (start + block_size).min(num_elements);

                    let elements = match self.load_elements(start, end) {
                        Ok(elems) => elems,
                        Err(e) => return Err(e),
                    };

                    Ok(f(&elements, blockidx))
                })
                .collect::<Result<Vec<R>, CoreError>>();
        }

        (0..num_blocks)
            .map(|blockidx| {
                let start = blockidx * block_size;
                let end = (start + block_size).min(num_elements);

                let elements = self.load_elements(start, end)?;

                Ok(f(&elements, blockidx))
            })
            .collect::<Result<Vec<R>, CoreError>>()
    }

    /// Collects the elements in the flat range `start..end`, pulling each
    /// touched block from the cache (loading on miss). An empty or
    /// inverted range yields an empty vector.
    fn load_elements(&self, start: usize, end: usize) -> CoreResult<Vec<A>> {
        if start >= self.metadata.num_elements {
            return Err(CoreError::IndexError(ErrorContext::new(format!(
                "Start index {} out of bounds (max {})",
                start,
                self.metadata.num_elements - 1
            ))));
        }
        if end > self.metadata.num_elements {
            return Err(CoreError::IndexError(ErrorContext::new(format!(
                "End index {} out of bounds (max {})",
                end, self.metadata.num_elements
            ))));
        }
        if start >= end {
            return Ok(Vec::new());
        }

        let start_block = start / self.metadata.block_size;
        let end_block = (end - 1) / self.metadata.block_size;

        let mut result = Vec::with_capacity(end - start);

        for blockidx in start_block..=end_block {
            let block = match self.block_cache.get_block(blockidx) {
                Some(block) => block,
                None => {
                    let block = self.load_block(blockidx)?;
                    self.block_cache.put_block(blockidx, block.clone());
                    block
                }
            };

            // Intersect the requested range with this block's extent,
            // rebased to block-local offsets.
            let block_start = blockidx * self.metadata.block_size;
            let block_end = block_start + block.len();

            let range_start = start.max(block_start) - block_start;
            let range_end = end.min(block_end) - block_start;

            result.extend_from_slice(&block[range_start..range_end]);
        }

        Ok(result)
    }
}
1015
/// Bounded cache of decompressed blocks with LRU-style eviction and an
/// optional per-entry time-to-live.
#[derive(Debug)]
struct BlockCache<A: Clone + Copy + 'static + Send + Sync> {
    /// Maximum number of blocks retained at once.
    capacity: usize,

    /// Per-entry time-to-live; `None` means entries never expire.
    ttl: Option<Duration>,

    /// Block index -> cached block, guarded for shared access.
    cache: RwLock<HashMap<usize, CachedBlock<A>>>,
}
1030
/// A decompressed block paired with its last-access timestamp, used for
/// TTL expiry and least-recently-used eviction.
#[derive(Debug, Clone)]
struct CachedBlock<A: Clone + Copy + 'static + Send + Sync> {
    /// Decompressed element data for one block.
    data: Vec<A>,

    /// Last time this entry was inserted or read.
    timestamp: Instant,
}
1040
1041impl<A: Clone + Copy + 'static + Send + Sync> BlockCache<A> {
1042 fn new(capacity: usize, ttl: Option<Duration>) -> Self {
1049 Self {
1050 capacity,
1051 ttl,
1052 cache: RwLock::new(HashMap::new()),
1053 }
1054 }
1055
1056 fn has_block(&self, blockidx: usize) -> bool {
1066 let cache = self.cache.read().expect("Operation failed");
1067
1068 if let Some(cached) = cache.get(&blockidx) {
1070 if let Some(ttl) = self.ttl {
1072 if cached.timestamp.elapsed() > ttl {
1073 return false;
1074 }
1075 }
1076
1077 true
1078 } else {
1079 false
1080 }
1081 }
1082
1083 fn get_block(&self, blockidx: usize) -> Option<Vec<A>> {
1093 let mut cache = self.cache.write().expect("Operation failed");
1094
1095 if let Some(mut cached) = cache.remove(&blockidx) {
1097 if let Some(ttl) = self.ttl {
1099 if cached.timestamp.elapsed() > ttl {
1100 return None;
1101 }
1102 }
1103
1104 cached.timestamp = Instant::now();
1106
1107 let data = cached.data.clone();
1109 cache.insert(blockidx, cached);
1110
1111 Some(data)
1112 } else {
1113 None
1114 }
1115 }
1116
1117 fn put_block(&self, blockidx: usize, block: Vec<A>) {
1124 let mut cache = self.cache.write().expect("Operation failed");
1125
1126 if cache.len() >= self.capacity && !cache.contains_key(&blockidx) {
1128 if let Some(lru_idx) = cache
1130 .iter()
1131 .min_by_key(|(_, cached)| cached.timestamp)
1132 .map(|(idx, _)| *idx)
1133 {
1134 cache.remove(&lru_idx);
1135 }
1136 }
1137
1138 cache.insert(
1140 blockidx,
1141 CachedBlock {
1142 data: block,
1143 timestamp: Instant::now(),
1144 },
1145 );
1146 }
1147
1148 #[allow(dead_code)]
1150 fn clear(&self) {
1151 let mut cache = self.cache.write().expect("Operation failed");
1152 cache.clear();
1153 }
1154
1155 #[allow(dead_code)]
1157 fn len(&self) -> usize {
1158 let cache = self.cache.read().expect("Operation failed");
1159 cache.len()
1160 }
1161
1162 #[allow(dead_code)]
1164 fn is_empty(&self) -> bool {
1165 let cache = self.cache.read().expect("Operation failed");
1166 cache.is_empty()
1167 }
1168}
1169
#[cfg(test)]
mod tests {
    use super::*;
    use ::ndarray::Array2;
    use tempfile::tempdir;

    /// Round-trips a 1-D array through create/get/slice/process_blocks
    /// and a full readonly materialization.
    #[test]
    fn test_compressed_memmapped_array_1d() {
        let dir = tempdir().expect("Operation failed");
        let file_path = dir.path().join("test_compressed_1d.cmm");

        let data: Vec<f64> = (0..1000).map(|i| i as f64).collect();

        let builder = CompressedMemMapBuilder::new()
            .with_block_size(100)
            .with_algorithm(CompressionAlgorithm::Lz4)
            .with_level(1)
            .with_cache_size(4)
            .with_description("Test 1D array");

        let cmm = builder
            .create_from_raw(&data, &[1000], &file_path)
            .expect("Operation failed");

        assert_eq!(cmm.shape(), &[1000]);
        assert_eq!(cmm.size(), 1000);
        assert_eq!(cmm.ndim(), 1);

        // Spot-check one element from each of the 10 blocks.
        for i in 0..10 {
            let val = cmm.get(&[i * 100]).expect("Operation failed");
            assert_eq!(val, (i * 100) as f64);
        }

        let slice = cmm.slice(&[(200, 300)]).expect("Operation failed");
        assert_eq!(slice.shape(), &[100]);
        for i in 0..100 {
            assert_eq!(slice[crate::ndarray::IxDyn(&[i])], (i + 200) as f64);
        }

        let sums = cmm
            .process_blocks(|block, _| block.iter().sum::<f64>())
            .expect("Test: operation failed");

        assert_eq!(sums.len(), 10); let array = cmm.readonly_array().expect("Operation failed");
        assert_eq!(array.shape(), &[1000]);
        for i in 0..1000 {
            assert_eq!(array[crate::ndarray::IxDyn(&[i])], i as f64);
        }
    }

    /// Verifies multi-dimensional indexing and slicing on a 10x10 array.
    #[test]
    fn test_compressed_memmapped_array_2d() {
        let dir = tempdir().expect("Operation failed");
        let file_path = dir.path().join("test_compressed_2d.cmm");

        let data = Array2::<f64>::from_shape_fn((10, 10), |(i, j)| (i * 10 + j) as f64);

        let builder = CompressedMemMapBuilder::new()
            .with_block_size(25) .with_algorithm(CompressionAlgorithm::Lz4)
            .with_level(1)
            .with_cache_size(4)
            .with_description("Test 2D array");

        let cmm = builder.create(&data, &file_path).expect("Operation failed");

        assert_eq!(cmm.shape(), &[10, 10]);
        assert_eq!(cmm.size(), 100);
        assert_eq!(cmm.ndim(), 2);

        for i in 0..10 {
            for j in 0..10 {
                let val = cmm.get(&[i, j]).expect("Operation failed");
                assert_eq!(val, (i * 10 + j) as f64);
            }
        }

        // Rectangular sub-region: rows 2..5, columns 3..7.
        let slice = cmm.slice(&[(2, 5), (3, 7)]).expect("Operation failed");
        assert_eq!(slice.shape(), &[3, 4]);
        for i in 0..3 {
            for j in 0..4 {
                assert_eq!(
                    slice[crate::ndarray::IxDyn(&[i, j])],
                    ((i + 2) * 10 + (j + 3)) as f64
                );
            }
        }

        let array = cmm.readonly_array().expect("Operation failed");
        assert_eq!(array.shape(), &[10, 10]);
        for i in 0..10 {
            for j in 0..10 {
                assert_eq!(array[crate::ndarray::IxDyn(&[i, j])], (i * 10 + j) as f64);
            }
        }
    }

    /// Every supported codec must round-trip the same data losslessly.
    #[test]
    fn test_different_compression_algorithms() {
        let dir = tempdir().expect("Operation failed");

        let data: Vec<f64> = (0..1000).map(|i| i as f64).collect();

        for algorithm in &[
            CompressionAlgorithm::Lz4,
            CompressionAlgorithm::Zstd,
            CompressionAlgorithm::Snappy,
        ] {
            let file_path = dir.path().join(format!("test_{:?}.cmm", algorithm));

            let builder = CompressedMemMapBuilder::new()
                .with_block_size(100)
                .with_algorithm(*algorithm)
                .with_level(1)
                .with_cache_size(4);

            let cmm = builder
                .create_from_raw(&data, &[1000], &file_path)
                .expect("Operation failed");

            let array = cmm.readonly_array().expect("Operation failed");
            for i in 0..1000 {
                assert_eq!(array[crate::ndarray::IxDyn(&[i])], i as f64);
            }
        }
    }

    /// A capacity-2 cache must evict down to 2 entries after preloading
    /// all 10 blocks, while still serving correct values.
    #[test]
    fn test_block_cache() {
        let dir = tempdir().expect("Operation failed");
        let file_path = dir.path().join("test_cache.cmm");

        let data: Vec<f64> = (0..1000).map(|i| i as f64).collect();

        let small_cache = CompressedMemMapBuilder::new()
            .with_block_size(100)
            .with_cache_size(2) .create_from_raw(&data, &[1000], &file_path).expect("Operation failed");

        for i in 0..10 {
            small_cache.preload_block(i).expect("Operation failed");
        }

        assert_eq!(small_cache.block_cache.len(), 2);

        // Reading block 0 after eviction reloads and re-caches it.
        let val = small_cache.get(&[0]).expect("Operation failed"); assert_eq!(val, 0.0);

        assert!(small_cache.block_cache.has_block(0));
    }

    /// `preload_block` must populate the cache before any read occurs.
    #[test]
    fn test_block_preloading() {
        let dir = tempdir().expect("Operation failed");
        let file_path = dir.path().join("test_preload.cmm");

        let data: Vec<f64> = (0..1000).map(|i| i as f64).collect();

        let cmm = CompressedMemMapBuilder::new()
            .with_block_size(100)
            .create_from_raw(&data, &[1000], &file_path)
            .expect("Test: operation failed");

        cmm.preload_block(5).expect("Operation failed");

        assert!(cmm.block_cache.has_block(5));

        let val = cmm.get(&[550]).expect("Operation failed"); assert_eq!(val, 550.0);
    }
}