use crate::error::{CoreError, CoreResult, ErrorContext};
use ::ndarray::{Array, ArrayBase, Dimension, IxDyn, RawData};

#[cfg(feature = "memory_compression")]
use lz4::{Decoder, EncoderBuilder};

use std::collections::HashMap;
use std::fs::{File, OpenOptions};
use std::io::{Read, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};

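/// Metadata describing a compressed memory-mapped array file. Serialized as
/// JSON into a fixed-size header at the start of the file.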
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct CompressedFileMetadata {
    /// Shape of the stored array.
    pub shape: Vec<usize>,

    /// Size in bytes of a single element.
    pub element_size: usize,

    /// Total number of elements in the array.
    pub num_elements: usize,

    /// Number of elements per compression block.
    pub block_size: usize,

    /// Number of compressed blocks in the file.
    pub num_blocks: usize,

    /// Byte offset of each compressed block within the file.
    pub block_offsets: Vec<u64>,

    /// Compressed size in bytes of each block.
    pub block_compressed_sizes: Vec<usize>,

    /// Uncompressed size in bytes of each block.
    pub block_uncompressed_sizes: Vec<usize>,

    /// Compression algorithm used for all blocks.
    pub compression_algorithm: CompressionAlgorithm,

    /// Compression level passed to the encoder.
    pub compression_level: i32,

    /// Time at which the file was created.
    pub creation_time: chrono::DateTime<chrono::Utc>,

    /// Optional human-readable description.
    pub description: Option<String>,
}

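/// Compression algorithm used for the data blocks.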
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, serde::Serialize, serde::Deserialize)]
pub enum CompressionAlgorithm {
    /// LZ4: fast compression with a moderate ratio (default).
    #[default]
    Lz4,
    /// Zstandard: higher compression ratio, tunable level.
    Zstd,
    /// Snappy: very fast, lower compression ratio.
    Snappy,
}

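/// Builder for creating [`CompressedMemMappedArray`] files.
///
/// A minimal usage sketch (the path and data are illustrative):
///
/// ```ignore
/// let data: Vec<f64> = (0..1_000).map(|i| i as f64).collect();
/// let cmm = CompressedMemMapBuilder::new()
///     .with_block_size(256)
///     .with_algorithm(CompressionAlgorithm::Zstd)
///     .create_from_raw(&data, &[1_000], "example.cmm")?;
/// assert_eq!(cmm.get(&[42])?, 42.0);
/// ```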
#[derive(Debug, Clone)]
pub struct CompressedMemMapBuilder {
    /// Number of elements per compression block.
    block_size: usize,

    /// Compression algorithm to use.
    algorithm: CompressionAlgorithm,

    /// Compression level passed to the encoder.
    level: i32,

    /// Maximum number of decompressed blocks kept in the cache.
    cache_size: usize,

    /// Optional time-to-live for cached blocks.
    cache_ttl: Option<Duration>,

    /// Optional description stored in the file metadata.
    description: Option<String>,
}

impl Default for CompressedMemMapBuilder {
    fn default() -> Self {
        Self {
            block_size: 65536,
            algorithm: CompressionAlgorithm::Lz4,
            level: 1,
            cache_size: 32,
            cache_ttl: Some(Duration::from_secs(300)),
            description: None,
        }
    }
}

impl CompressedMemMapBuilder {
    /// Creates a builder with default settings.
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the number of elements per compression block.
    pub fn with_block_size(mut self, block_size: usize) -> Self {
        self.block_size = block_size;
        self
    }

    /// Sets the compression algorithm.
    pub fn with_algorithm(mut self, algorithm: CompressionAlgorithm) -> Self {
        self.algorithm = algorithm;
        self
    }

    /// Sets the compression level (its meaning depends on the algorithm).
    pub fn with_level(mut self, level: i32) -> Self {
        self.level = level;
        self
    }

    /// Sets the maximum number of decompressed blocks to cache.
    pub fn with_cache_size(mut self, cache_size: usize) -> Self {
        self.cache_size = cache_size;
        self
    }

    /// Sets an optional time-to-live for cached blocks.
    pub fn with_cache_ttl(mut self, ttl: Option<Duration>) -> Self {
        self.cache_ttl = ttl;
        self
    }

    /// Sets a description to store in the file metadata.
    pub fn with_description(mut self, description: impl Into<String>) -> Self {
        self.description = Some(description.into());
        self
    }

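    /// Compresses `data` block by block, writes it to a new file at `path`,
    /// and reopens the result as a [`CompressedMemMappedArray`]. The input
    /// must be contiguous in standard (row-major) layout.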
    pub fn create<A, S, D>(
        &self,
        data: &ArrayBase<S, D>,
        path: impl AsRef<Path>,
    ) -> CoreResult<CompressedMemMappedArray<A>>
    where
        A: Clone + Copy + 'static + Send + Sync,
        S: RawData<Elem = A>,
        D: Dimension,
    {
        let shape = data.shape().to_vec();
        let num_elements = data.len();
        let element_size = std::mem::size_of::<A>();

        // The raw-pointer walk below assumes `data` is contiguous in standard
        // (row-major) layout, so reject anything else up front.
        if !data.is_standard_layout() {
            return Err(CoreError::ValueError(ErrorContext::new(
                "create requires a contiguous array in standard layout".to_string(),
            )));
        }

        // Clamp the block size to the array length, keeping it at least 1 so
        // the block count below never divides by zero.
        let block_size = self.block_size.min(num_elements).max(1);
        let num_blocks = num_elements.div_ceil(block_size);

        let mut metadata = CompressedFileMetadata {
            shape,
            element_size,
            num_elements,
            block_size,
            num_blocks,
            block_offsets: Vec::with_capacity(num_blocks),
            block_compressed_sizes: Vec::with_capacity(num_blocks),
            block_uncompressed_sizes: Vec::with_capacity(num_blocks),
            compression_algorithm: self.algorithm,
            compression_level: self.level,
            creation_time: chrono::Utc::now(),
            description: self.description.clone(),
        };

        let path = path.as_ref();
        let mut file = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(true)
            .open(path)?;

        // Reserve a fixed-size header for the JSON metadata; it is filled in
        // once all block offsets and sizes are known.
        let metadata_placeholder = vec![0u8; 1024];
        file.write_all(&metadata_placeholder)?;

        let data_ptr = data.as_ptr() as *const u8;
        let mut current_offset = metadata_placeholder.len() as u64;

        for blockidx in 0..num_blocks {
            let start_element = blockidx * block_size;
            let end_element = (start_element + block_size).min(num_elements);
            let block_elements = end_element - start_element;
            let uncompressed_size = block_elements * element_size;

            metadata.block_offsets.push(current_offset);
            metadata.block_uncompressed_sizes.push(uncompressed_size);

            let data_offset = start_element * element_size;
            // Safety: `data` was verified to be contiguous in standard layout,
            // so this byte range lies entirely within its allocation.
            let block_data =
                unsafe { std::slice::from_raw_parts(data_ptr.add(data_offset), uncompressed_size) };

            let compressed_data = match self.algorithm {
                CompressionAlgorithm::Lz4 => {
                    let mut encoder = EncoderBuilder::new()
                        .level(self.level as u32)
                        .build(Vec::new())?;
                    encoder.write_all(block_data)?;
                    let (compressed, result) = encoder.finish();
                    result?;
                    compressed
                }
                CompressionAlgorithm::Zstd => zstd::encode_all(block_data, self.level)?,
                CompressionAlgorithm::Snappy => snap::raw::Encoder::new()
                    .compress_vec(block_data)
                    .map_err(|e| {
                        CoreError::ComputationError(ErrorContext::new(format!(
                            "Snappy compression error: {}",
                            e
                        )))
                    })?,
            };

            metadata.block_compressed_sizes.push(compressed_data.len());

            file.write_all(&compressed_data)?;

            current_offset += compressed_data.len() as u64;
        }

        let metadata_json = serde_json::to_string(&metadata).map_err(|e| {
            CoreError::ValueError(ErrorContext::new(format!(
                "Failed to serialize metadata: {}",
                e
            )))
        })?;
        let mut metadata_bytes = metadata_json.into_bytes();

        if metadata_bytes.len() > metadata_placeholder.len() {
            return Err(CoreError::ValueError(ErrorContext::new(format!(
                "Metadata size ({} bytes) exceeds reserved space ({} bytes)",
                metadata_bytes.len(),
                metadata_placeholder.len()
            ))));
        }

        // Pad with NUL bytes to exactly fill the reserved header.
        metadata_bytes.resize(metadata_placeholder.len(), 0);

        file.seek(SeekFrom::Start(0))?;
        file.write_all(&metadata_bytes)?;

        let compressed_mmap = CompressedMemMappedArray::open_impl(
            path.to_path_buf(),
            self.cache_size,
            self.cache_ttl,
        )?;

        Ok(compressed_mmap)
    }

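    /// Convenience wrapper over [`Self::create`] that first builds an array
    /// of the given `shape` from a raw slice.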
    pub fn create_from_raw<A>(
        &self,
        data: &[A],
        shape: &[usize],
        path: impl AsRef<Path>,
    ) -> CoreResult<CompressedMemMappedArray<A>>
    where
        A: Clone + Copy + 'static + Send + Sync,
    {
        let array = Array::from_shape_vec(IxDyn(shape), data.to_vec())
            .map_err(|e| CoreError::ShapeError(ErrorContext::new(format!("{e}"))))?;

        self.create(&array, path)
    }
}

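/// A read-only array stored on disk in compressed blocks. Blocks are
/// decompressed on demand and held in a shared TTL-aware LRU cache.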
#[derive(Debug, Clone)]
pub struct CompressedMemMappedArray<A: Clone + Copy + 'static + Send + Sync> {
    /// Path to the backing compressed file.
    path: PathBuf,

    /// Metadata read from the file header.
    metadata: CompressedFileMetadata,

    /// Shared cache of recently decompressed blocks.
    block_cache: Arc<BlockCache<A>>,

    phantom: std::marker::PhantomData<A>,
}

impl<A: Clone + Copy + 'static + Send + Sync> CompressedMemMappedArray<A> {
    /// Opens an existing compressed file with default cache settings
    /// (32 cached blocks, 300-second TTL).
    pub fn open<P: AsRef<Path>>(path: P) -> CoreResult<Self> {
        let cache_size = 32;
        let cache_ttl = Some(Duration::from_secs(300));

        Self::open_impl(path.as_ref().to_path_buf(), cache_size, cache_ttl)
    }

    /// Opens an existing compressed file with explicit cache settings.
    pub fn open_with_cache<P: AsRef<Path>>(
        path: P,
        cache_size: usize,
        cache_ttl: Option<Duration>,
    ) -> CoreResult<Self> {
        Self::open_impl(path.as_ref().to_path_buf(), cache_size, cache_ttl)
    }

    fn open_impl(
        path: PathBuf,
        cache_size: usize,
        cache_ttl: Option<Duration>,
    ) -> CoreResult<Self> {
        let mut file = File::open(&path)?;

        // Read the fixed-size header and strip the NUL padding added when the
        // metadata was written.
        let mut metadata_bytes = vec![0u8; 1024];
        file.read_exact(&mut metadata_bytes)?;

        let metadata_json = String::from_utf8_lossy(&metadata_bytes)
            .trim_end_matches('\0')
            .to_string();
        let metadata: CompressedFileMetadata =
            serde_json::from_str(&metadata_json).map_err(|e| {
                CoreError::ValueError(ErrorContext::new(format!(
                    "Failed to deserialize metadata: {}",
                    e
                )))
            })?;

        let expected_element_size = std::mem::size_of::<A>();
        if metadata.element_size != expected_element_size {
            return Err(CoreError::ValueError(ErrorContext::new(format!(
                "Element size mismatch: expected {}, got {}",
                expected_element_size, metadata.element_size
            ))));
        }

        let block_cache = Arc::new(BlockCache::new(cache_size, cache_ttl));

        Ok(Self {
            path,
            metadata,
            block_cache,
            phantom: std::marker::PhantomData,
        })
    }

    /// Returns the shape of the array.
    pub fn shape(&self) -> &[usize] {
        &self.metadata.shape
    }

    /// Returns the total number of elements.
    pub fn size(&self) -> usize {
        self.metadata.num_elements
    }

    /// Returns the number of dimensions.
    pub fn ndim(&self) -> usize {
        self.metadata.shape.len()
    }

    /// Returns the metadata read from the file header.
    pub fn metadata(&self) -> &CompressedFileMetadata {
        &self.metadata
    }

    /// Returns the number of elements per compression block.
    pub fn block_size(&self) -> usize {
        self.metadata.block_size
    }

    /// Returns the number of compressed blocks.
    pub fn num_blocks(&self) -> usize {
        self.metadata.num_blocks
    }

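    /// Decompresses block `blockidx` into the cache ahead of time so that
    /// subsequent reads from it avoid disk I/O.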
    pub fn preload_block(&self, blockidx: usize) -> CoreResult<()> {
        if blockidx >= self.metadata.num_blocks {
            return Err(CoreError::IndexError(ErrorContext::new(format!(
                "Block index {} out of bounds (max {})",
                blockidx,
                self.metadata.num_blocks - 1
            ))));
        }

        if self.block_cache.has_block(blockidx) {
            return Ok(());
        }

        let block = self.load_block(blockidx)?;
        self.block_cache.put_block(blockidx, block);

        Ok(())
    }

    /// Reads and decompresses a single block from disk.
    fn load_block(&self, blockidx: usize) -> CoreResult<Vec<A>> {
        let mut file = File::open(&self.path)?;

        let offset = self.metadata.block_offsets[blockidx];
        let compressed_size = self.metadata.block_compressed_sizes[blockidx];
        let uncompressed_size = self.metadata.block_uncompressed_sizes[blockidx];

        file.seek(SeekFrom::Start(offset))?;
        let mut compressed_data = vec![0u8; compressed_size];
        file.read_exact(&mut compressed_data)?;

        let block_bytes = match self.metadata.compression_algorithm {
            CompressionAlgorithm::Lz4 => {
                let mut decoder = Decoder::new(&compressed_data[..])?;
                let mut decompressed = Vec::with_capacity(uncompressed_size);
                decoder.read_to_end(&mut decompressed)?;
                decompressed
            }
            CompressionAlgorithm::Zstd => zstd::decode_all(&compressed_data[..])?,
            CompressionAlgorithm::Snappy => snap::raw::Decoder::new()
                .decompress_vec(&compressed_data)
                .map_err(|e| {
                    CoreError::ComputationError(ErrorContext::new(format!(
                        "Snappy decompression error: {}",
                        e
                    )))
                })?,
        };

        if block_bytes.len() != uncompressed_size {
            return Err(CoreError::ValueError(ErrorContext::new(format!(
                "Block {} decompressed to {} bytes, expected {}",
                blockidx,
                block_bytes.len(),
                uncompressed_size
            ))));
        }

        let num_elements = uncompressed_size / std::mem::size_of::<A>();
        let mut elements = Vec::with_capacity(num_elements);

        for chunk in block_bytes.chunks_exact(std::mem::size_of::<A>()) {
            // The byte buffer carries no alignment guarantee for `A`, so read
            // each element unaligned.
            let element = unsafe { std::ptr::read_unaligned(chunk.as_ptr() as *const A) };
            elements.push(element);
        }

        Ok(elements)
    }

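    /// Decompresses the entire array into memory and returns it as an owned
    /// `ndarray` array.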
    pub fn readonly_array(&self) -> CoreResult<Array<A, IxDyn>> {
        // Placeholder values only; every element is overwritten below. This
        // assumes an all-zero bit pattern is a transiently valid `A`.
        let mut result = Array::from_elem(IxDyn(&self.metadata.shape), unsafe {
            std::mem::zeroed::<A>()
        });

        let mut offset = 0;
        for blockidx in 0..self.metadata.num_blocks {
            let block = match self.block_cache.get_block(blockidx) {
                Some(block) => block,
                None => {
                    let block = self.load_block(blockidx)?;
                    self.block_cache.put_block(blockidx, block.clone());
                    block
                }
            };

            let start = offset;
            let end = (start + block.len()).min(self.metadata.num_elements);
            let slice = &mut result
                .as_slice_mut()
                .expect("freshly created array is contiguous")[start..end];
            slice.copy_from_slice(&block[..(end - start)]);

            offset = end;
        }

        Ok(result)
    }

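    /// Returns the element at the given multi-dimensional `indices`,
    /// decompressing (and caching) the containing block if necessary.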
    pub fn get(&self, indices: &[usize]) -> CoreResult<A> {
        if indices.len() != self.metadata.shape.len() {
            return Err(CoreError::DimensionError(ErrorContext::new(format!(
                "Expected {} indices, got {}",
                self.metadata.shape.len(),
                indices.len()
            ))));
        }

        for (dim, &idx) in indices.iter().enumerate() {
            if idx >= self.metadata.shape[dim] {
                return Err(CoreError::IndexError(ErrorContext::new(format!(
                    "Index {} out of bounds for dimension {} (max {})",
                    idx,
                    dim,
                    self.metadata.shape[dim] - 1
                ))));
            }
        }

        // Flatten the multi-dimensional index in row-major order.
        let mut flat_index = 0;
        let mut stride = 1;
        for i in (0..indices.len()).rev() {
            flat_index += indices[i] * stride;
            if i > 0 {
                stride *= self.metadata.shape[i];
            }
        }

        let blockidx = flat_index / self.metadata.block_size;
        let block_offset = flat_index % self.metadata.block_size;

        let block = match self.block_cache.get_block(blockidx) {
            Some(block) => block,
            None => {
                let block = self.load_block(blockidx)?;
                self.block_cache.put_block(blockidx, block.clone());
                block
            }
        };

        if block_offset < block.len() {
            Ok(block[block_offset])
        } else {
            Err(CoreError::IndexError(ErrorContext::new(format!(
                "Block offset {} out of bounds for block {} (max {})",
                block_offset,
                blockidx,
                block.len() - 1
            ))))
        }
    }

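    /// Returns a copy of the rectangular region described by half-open
    /// `(start, end)` ranges, one per dimension. A sketch (bounds
    /// illustrative):
    ///
    /// ```ignore
    /// // Rows 2..5 and columns 3..7 of a 2-D array.
    /// let sub = cmm.slice(&[(2, 5), (3, 7)])?;
    /// assert_eq!(sub.shape(), &[3, 4]);
    /// ```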
    pub fn slice(&self, ranges: &[(usize, usize)]) -> CoreResult<Array<A, IxDyn>> {
        if ranges.len() != self.metadata.shape.len() {
            return Err(CoreError::DimensionError(ErrorContext::new(format!(
                "Expected {} ranges, got {}",
                self.metadata.shape.len(),
                ranges.len()
            ))));
        }

        let mut resultshape = Vec::with_capacity(ranges.len());
        for (dim, &(start, end)) in ranges.iter().enumerate() {
            if start >= end {
                return Err(CoreError::ValueError(ErrorContext::new(format!(
                    "Invalid range for dimension {}: {}..{}",
                    dim, start, end
                ))));
            }
            if end > self.metadata.shape[dim] {
                return Err(CoreError::IndexError(ErrorContext::new(format!(
                    "Range {}..{} out of bounds for dimension {} (max {})",
                    start, end, dim, self.metadata.shape[dim]
                ))));
            }
            resultshape.push(end - start);
        }

        // Placeholder values only; every element is overwritten below.
        let mut result = Array::from_elem(IxDyn(&resultshape), unsafe { std::mem::zeroed::<A>() });

        let result_size = resultshape.iter().product::<usize>();

        // Walk the output in row-major order, keeping parallel odometers for
        // the output indices and the corresponding source indices.
        let mut result_indices = vec![0; ranges.len()];
        let mut source_indices = Vec::with_capacity(ranges.len());
        for &(start, _) in ranges.iter() {
            source_indices.push(start);
        }

        for _result_flat_idx in 0..result_size {
            // Flatten the source indices in row-major order.
            let mut source_flat_idx = 0;
            let mut stride = 1;
            for i in (0..source_indices.len()).rev() {
                source_flat_idx += source_indices[i] * stride;
                if i > 0 {
                    stride *= self.metadata.shape[i];
                }
            }

            let blockidx = source_flat_idx / self.metadata.block_size;
            let block_offset = source_flat_idx % self.metadata.block_size;

            let block = match self.block_cache.get_block(blockidx) {
                Some(block) => block,
                None => {
                    let block = self.load_block(blockidx)?;
                    self.block_cache.put_block(blockidx, block.clone());
                    block
                }
            };

            if block_offset < block.len() {
                // Flatten the output indices the same way.
                let mut result_stride = 1;
                let mut result_flat_idx = 0;
                for i in (0..result_indices.len()).rev() {
                    result_flat_idx += result_indices[i] * result_stride;
                    if i > 0 {
                        result_stride *= resultshape[i];
                    }
                }

                let result_slice = result
                    .as_slice_mut()
                    .expect("freshly created array is contiguous");
                result_slice[result_flat_idx] = block[block_offset];
            }

            // Advance the odometers, carrying into the next dimension when a
            // range is exhausted.
            for i in (0..ranges.len()).rev() {
                result_indices[i] += 1;
                source_indices[i] += 1;
                if result_indices[i] < resultshape[i] {
                    break;
                }
                result_indices[i] = 0;
                source_indices[i] = ranges[i].0;
            }
        }

        Ok(result)
    }

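    /// Applies `f` to each decompressed block in order and collects the
    /// results. A sketch of a block-wise reduction (names illustrative):
    ///
    /// ```ignore
    /// let block_sums = cmm.process_blocks(|block, _idx| block.iter().sum::<f64>())?;
    /// let total: f64 = block_sums.iter().sum();
    /// ```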
    pub fn process_blocks<F, R>(&self, f: F) -> CoreResult<Vec<R>>
    where
        F: Fn(&[A], usize) -> R + Send + Sync + 'static,
        R: Send + 'static,
    {
        self.process_blocks_internal(f, false, None)
    }

    /// Like [`Self::process_blocks`], but iterates with a caller-chosen block
    /// size instead of the file's native one.
    pub fn process_blocks_with_size<F, R>(&self, blocksize: usize, f: F) -> CoreResult<Vec<R>>
    where
        F: Fn(&[A], usize) -> R + Send + Sync + 'static,
        R: Send + 'static,
    {
        self.process_blocks_internal(f, false, Some(blocksize))
    }

    /// Applies `f` to each block in parallel.
    #[cfg(feature = "parallel")]
    pub fn process_blocks_parallel<F, R>(&self, f: F) -> CoreResult<Vec<R>>
    where
        F: Fn(&[A], usize) -> R + Send + Sync + 'static,
        R: Send + 'static,
    {
        self.process_blocks_internal(f, true, None)
    }

    /// Applies `f` to blocks of `block_size` elements in parallel.
    #[cfg(feature = "parallel")]
    pub fn process_blocks_parallel_with_size<F, R>(
        &self,
        block_size: usize,
        f: F,
    ) -> CoreResult<Vec<R>>
    where
        F: Fn(&[A], usize) -> R + Send + Sync + 'static,
        R: Send + 'static,
    {
        self.process_blocks_internal(f, true, Some(block_size))
    }

    /// Sequential implementation used when the `parallel` feature is disabled.
    #[cfg(not(feature = "parallel"))]
    fn process_blocks_internal<F, R>(
        &self,
        mut f: F,
        _parallel: bool,
        custom_block_size: Option<usize>,
    ) -> CoreResult<Vec<R>>
    where
        F: FnMut(&[A], usize) -> R,
    {
        // Keep the block size at least 1 so the block count never divides by
        // zero.
        let block_size = custom_block_size.unwrap_or(self.metadata.block_size).max(1);
        let num_elements = self.metadata.num_elements;
        let num_blocks = num_elements.div_ceil(block_size);

        (0..num_blocks)
            .map(|blockidx| {
                let start = blockidx * block_size;
                let end = (start + block_size).min(num_elements);

                let elements = self.load_elements(start, end)?;

                Ok(f(&elements, blockidx))
            })
            .collect::<Result<Vec<R>, CoreError>>()
    }

    /// Implementation used when the `parallel` feature is enabled; falls back
    /// to a sequential pass when `parallel` is false.
    #[cfg(feature = "parallel")]
    fn process_blocks_internal<F, R>(
        &self,
        f: F,
        parallel: bool,
        custom_block_size: Option<usize>,
    ) -> CoreResult<Vec<R>>
    where
        F: Fn(&[A], usize) -> R + Send + Sync + 'static,
        R: Send + 'static,
    {
        // Keep the block size at least 1 so the block count never divides by
        // zero.
        let block_size = custom_block_size.unwrap_or(self.metadata.block_size).max(1);
        let num_elements = self.metadata.num_elements;
        let num_blocks = num_elements.div_ceil(block_size);

        if parallel {
            use crate::parallel_ops::*;

            return (0..num_blocks)
                .into_par_iter()
                .map(|blockidx| {
                    let start = blockidx * block_size;
                    let end = (start + block_size).min(num_elements);

                    let elements = self.load_elements(start, end)?;

                    Ok(f(&elements, blockidx))
                })
                .collect::<Result<Vec<R>, CoreError>>();
        }

        (0..num_blocks)
            .map(|blockidx| {
                let start = blockidx * block_size;
                let end = (start + block_size).min(num_elements);

                let elements = self.load_elements(start, end)?;

                Ok(f(&elements, blockidx))
            })
            .collect::<Result<Vec<R>, CoreError>>()
    }

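    /// Loads the elements in the half-open range `[start, end)`, pulling each
    /// covered block from the cache or decompressing it from disk.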
    fn load_elements(&self, start: usize, end: usize) -> CoreResult<Vec<A>> {
        if start >= self.metadata.num_elements {
            return Err(CoreError::IndexError(ErrorContext::new(format!(
                "Start index {} out of bounds (max {})",
                start,
                self.metadata.num_elements - 1
            ))));
        }
        if end > self.metadata.num_elements {
            return Err(CoreError::IndexError(ErrorContext::new(format!(
                "End index {} out of bounds (max {})",
                end, self.metadata.num_elements
            ))));
        }
        if start >= end {
            return Ok(Vec::new());
        }

        let start_block = start / self.metadata.block_size;
        let end_block = (end - 1) / self.metadata.block_size;

        let mut result = Vec::with_capacity(end - start);

        for blockidx in start_block..=end_block {
            let block = match self.block_cache.get_block(blockidx) {
                Some(block) => block,
                None => {
                    let block = self.load_block(blockidx)?;
                    self.block_cache.put_block(blockidx, block.clone());
                    block
                }
            };

            // Copy only the portion of this block that overlaps [start, end).
            let block_start = blockidx * self.metadata.block_size;
            let block_end = block_start + block.len();

            let range_start = start.max(block_start) - block_start;
            let range_end = end.min(block_end) - block_start;

            result.extend_from_slice(&block[range_start..range_end]);
        }

        Ok(result)
    }
}

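/// A capacity-bounded cache of decompressed blocks with optional TTL expiry;
/// the least recently used block is evicted when the cache is full.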
#[derive(Debug)]
struct BlockCache<A: Clone + Copy + 'static + Send + Sync> {
    /// Maximum number of blocks to keep.
    capacity: usize,

    /// Optional time-to-live after which a cached block is considered stale.
    ttl: Option<Duration>,

    /// Cached blocks keyed by block index.
    cache: RwLock<HashMap<usize, CachedBlock<A>>>,
}

/// A cached block together with its last-access time.
#[derive(Debug, Clone)]
struct CachedBlock<A: Clone + Copy + 'static + Send + Sync> {
    data: Vec<A>,

    timestamp: Instant,
}

impl<A: Clone + Copy + 'static + Send + Sync> BlockCache<A> {
    /// Creates a cache holding at most `capacity` blocks, each expiring after
    /// `ttl` if one is given.
    fn new(capacity: usize, ttl: Option<Duration>) -> Self {
        Self {
            capacity,
            ttl,
            cache: RwLock::new(HashMap::new()),
        }
    }

    /// Returns true if an unexpired copy of the block is cached.
    fn has_block(&self, blockidx: usize) -> bool {
        let cache = self.cache.read().expect("block cache lock poisoned");

        if let Some(cached) = cache.get(&blockidx) {
            if let Some(ttl) = self.ttl {
                if cached.timestamp.elapsed() > ttl {
                    return false;
                }
            }

            true
        } else {
            false
        }
    }

    /// Returns a copy of the cached block, refreshing its timestamp so that
    /// recently used blocks survive LRU eviction. Expired blocks are dropped.
    fn get_block(&self, blockidx: usize) -> Option<Vec<A>> {
        let mut cache = self.cache.write().expect("block cache lock poisoned");

        if let Some(mut cached) = cache.remove(&blockidx) {
            if let Some(ttl) = self.ttl {
                if cached.timestamp.elapsed() > ttl {
                    return None;
                }
            }

            cached.timestamp = Instant::now();

            let data = cached.data.clone();
            cache.insert(blockidx, cached);

            Some(data)
        } else {
            None
        }
    }

    /// Inserts a block, evicting the least recently used entry if the cache
    /// is full.
    fn put_block(&self, blockidx: usize, block: Vec<A>) {
        let mut cache = self.cache.write().expect("block cache lock poisoned");

        if cache.len() >= self.capacity && !cache.contains_key(&blockidx) {
            if let Some(lru_idx) = cache
                .iter()
                .min_by_key(|(_, cached)| cached.timestamp)
                .map(|(idx, _)| *idx)
            {
                cache.remove(&lru_idx);
            }
        }

        cache.insert(
            blockidx,
            CachedBlock {
                data: block,
                timestamp: Instant::now(),
            },
        );
    }

    #[allow(dead_code)]
    fn clear(&self) {
        let mut cache = self.cache.write().expect("block cache lock poisoned");
        cache.clear();
    }

    #[allow(dead_code)]
    fn len(&self) -> usize {
        let cache = self.cache.read().expect("block cache lock poisoned");
        cache.len()
    }

    #[allow(dead_code)]
    fn is_empty(&self) -> bool {
        let cache = self.cache.read().expect("block cache lock poisoned");
        cache.is_empty()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use ::ndarray::Array2;
    use tempfile::tempdir;

    #[test]
    fn test_compressed_memmapped_array_1d() {
        let dir = tempdir().expect("failed to create temp dir");
        let file_path = dir.path().join("test_compressed_1d.cmm");

        let data: Vec<f64> = (0..1000).map(|i| i as f64).collect();

        let builder = CompressedMemMapBuilder::new()
            .with_block_size(100)
            .with_algorithm(CompressionAlgorithm::Lz4)
            .with_level(1)
            .with_cache_size(4)
            .with_description("Test 1D array");

        let cmm = builder
            .create_from_raw(&data, &[1000], &file_path)
            .expect("failed to create compressed array");

        assert_eq!(cmm.shape(), &[1000]);
        assert_eq!(cmm.size(), 1000);
        assert_eq!(cmm.ndim(), 1);

        // Spot-check one element per block.
        for i in 0..10 {
            let val = cmm.get(&[i * 100]).expect("failed to read element");
            assert_eq!(val, (i * 100) as f64);
        }

        let slice = cmm.slice(&[(200, 300)]).expect("failed to slice");
        assert_eq!(slice.shape(), &[100]);
        for i in 0..100 {
            assert_eq!(slice[IxDyn(&[i])], (i + 200) as f64);
        }

        let sums = cmm
            .process_blocks(|block, _| block.iter().sum::<f64>())
            .expect("failed to process blocks");
        assert_eq!(sums.len(), 10);

        let array = cmm.readonly_array().expect("failed to materialize array");
        assert_eq!(array.shape(), &[1000]);
        for i in 0..1000 {
            assert_eq!(array[IxDyn(&[i])], i as f64);
        }
    }

    #[test]
    fn test_compressed_memmapped_array_2d() {
        let dir = tempdir().expect("failed to create temp dir");
        let file_path = dir.path().join("test_compressed_2d.cmm");

        let data = Array2::<f64>::from_shape_fn((10, 10), |(i, j)| (i * 10 + j) as f64);

        let builder = CompressedMemMapBuilder::new()
            .with_block_size(25)
            .with_algorithm(CompressionAlgorithm::Lz4)
            .with_level(1)
            .with_cache_size(4)
            .with_description("Test 2D array");

        let cmm = builder
            .create(&data, &file_path)
            .expect("failed to create compressed array");

        assert_eq!(cmm.shape(), &[10, 10]);
        assert_eq!(cmm.size(), 100);
        assert_eq!(cmm.ndim(), 2);

        for i in 0..10 {
            for j in 0..10 {
                let val = cmm.get(&[i, j]).expect("failed to read element");
                assert_eq!(val, (i * 10 + j) as f64);
            }
        }

        let slice = cmm.slice(&[(2, 5), (3, 7)]).expect("failed to slice");
        assert_eq!(slice.shape(), &[3, 4]);
        for i in 0..3 {
            for j in 0..4 {
                assert_eq!(slice[IxDyn(&[i, j])], ((i + 2) * 10 + (j + 3)) as f64);
            }
        }

        let array = cmm.readonly_array().expect("failed to materialize array");
        assert_eq!(array.shape(), &[10, 10]);
        for i in 0..10 {
            for j in 0..10 {
                assert_eq!(array[IxDyn(&[i, j])], (i * 10 + j) as f64);
            }
        }
    }

    #[test]
    fn test_different_compression_algorithms() {
        let dir = tempdir().expect("failed to create temp dir");

        let data: Vec<f64> = (0..1000).map(|i| i as f64).collect();

        for algorithm in &[
            CompressionAlgorithm::Lz4,
            CompressionAlgorithm::Zstd,
            CompressionAlgorithm::Snappy,
        ] {
            let file_path = dir.path().join(format!("test_{:?}.cmm", algorithm));

            let builder = CompressedMemMapBuilder::new()
                .with_block_size(100)
                .with_algorithm(*algorithm)
                .with_level(1)
                .with_cache_size(4);

            let cmm = builder
                .create_from_raw(&data, &[1000], &file_path)
                .expect("failed to create compressed array");

            let array = cmm.readonly_array().expect("failed to materialize array");
            for i in 0..1000 {
                assert_eq!(array[IxDyn(&[i])], i as f64);
            }
        }
    }

    #[test]
    fn test_block_cache() {
        let dir = tempdir().expect("failed to create temp dir");
        let file_path = dir.path().join("test_cache.cmm");

        let data: Vec<f64> = (0..1000).map(|i| i as f64).collect();

        let small_cache = CompressedMemMapBuilder::new()
            .with_block_size(100)
            .with_cache_size(2)
            .create_from_raw(&data, &[1000], &file_path)
            .expect("failed to create compressed array");

        // Preload more blocks than the cache can hold; only `capacity`
        // entries survive.
        for i in 0..10 {
            small_cache.preload_block(i).expect("failed to preload block");
        }

        assert_eq!(small_cache.block_cache.len(), 2);

        let val = small_cache.get(&[0]).expect("failed to read element");
        assert_eq!(val, 0.0);

        assert!(small_cache.block_cache.has_block(0));
    }

    #[test]
    fn test_block_preloading() {
        let dir = tempdir().expect("failed to create temp dir");
        let file_path = dir.path().join("test_preload.cmm");

        let data: Vec<f64> = (0..1000).map(|i| i as f64).collect();

        let cmm = CompressedMemMapBuilder::new()
            .with_block_size(100)
            .create_from_raw(&data, &[1000], &file_path)
            .expect("failed to create compressed array");

        cmm.preload_block(5).expect("failed to preload block");

        assert!(cmm.block_cache.has_block(5));

        let val = cmm.get(&[550]).expect("failed to read element");
        assert_eq!(val, 550.0);
    }
}