Skip to main content

scirs2_core/memory_efficient/
compressed_memmap.rs

1//! Compressed memory-mapped arrays.
2//!
3//! This module provides functionality for memory-mapping arrays with transparent
4//! compression and decompression. This can significantly reduce disk space
5//! requirements while maintaining the benefits of memory-mapping for large data.
6//!
7//! The implementation uses a block-based approach, where data is split into blocks
8//! that are compressed independently. This allows for efficient random access and
9//! partial loading of the data.
10
11use crate::error::{CoreError, CoreResult, ErrorContext};
12use ::ndarray::{Array, ArrayBase, Dimension, IxDyn, RawData};
13
14#[cfg(feature = "memory_compression")]
15use oxiarc_lz4;
16#[cfg(feature = "memory_compression")]
17use oxiarc_zstd;
18
19use std::collections::HashMap;
20use std::fs::{File, OpenOptions};
21use std::io::{Read, Seek, SeekFrom, Write};
22use std::path::{Path, PathBuf};
23use std::sync::{Arc, RwLock};
24use std::time::{Duration, Instant};
25
/// Metadata for a compressed memory-mapped file.
///
/// The builder's `create` method serializes this structure as JSON and stores it,
/// zero-padded, in a fixed 1024-byte header at the start of the file. The
/// compressed blocks follow the header.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct CompressedFileMetadata {
    /// Original array shape (one extent per dimension)
    pub shape: Vec<usize>,

    /// Element type information (size of one element in bytes)
    pub element_size: usize,

    /// Total number of elements in the array
    pub num_elements: usize,

    /// Block size in elements (the last block may hold fewer)
    pub block_size: usize,

    /// Number of blocks
    pub num_blocks: usize,

    /// Byte offset of each block, measured from the start of the compressed file
    pub block_offsets: Vec<u64>,

    /// Compressed size of each block, in bytes
    pub block_compressed_sizes: Vec<usize>,

    /// Uncompressed size of each block, in bytes
    pub block_uncompressed_sizes: Vec<usize>,

    /// Compression algorithm used for every block
    pub compression_algorithm: CompressionAlgorithm,

    /// Compression level the builder was configured with
    pub compression_level: i32,

    /// Creation timestamp (UTC)
    pub creation_time: chrono::DateTime<chrono::Utc>,

    /// Optional free-form description
    pub description: Option<String>,
}
65
/// Supported compression algorithms.
///
/// The chosen variant is stored in [`CompressedFileMetadata`] so that a file can
/// be decompressed with the same algorithm it was written with.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, serde::Serialize, serde::Deserialize)]
pub enum CompressionAlgorithm {
    /// LZ4 compression (the default)
    #[default]
    Lz4,
    /// Zstd compression
    Zstd,
    /// Snappy compression
    Snappy,
}
77
/// Builder for compressed memory-mapped arrays.
///
/// Collects the compression and cache settings, then writes and opens a
/// compressed file via `create` / `create_from_raw`.
#[derive(Debug, Clone)]
pub struct CompressedMemMapBuilder {
    /// Block size in elements
    block_size: usize,

    /// Compression algorithm
    algorithm: CompressionAlgorithm,

    /// Compression level
    level: i32,

    /// Maximum cache size in blocks
    cache_size: usize,

    /// Cache time-to-live; `None` disables time-based eviction
    cache_ttl: Option<Duration>,

    /// Description
    description: Option<String>,
}
99
100impl Default for CompressedMemMapBuilder {
101    fn default() -> Self {
102        Self {
103            block_size: 65536, // 64K elements by default
104            algorithm: CompressionAlgorithm::Lz4,
105            level: 1,                                  // Default compression level
106            cache_size: 32,                            // Cache 32 blocks by default
107            cache_ttl: Some(Duration::from_secs(300)), // 5 minute TTL
108            description: None,
109        }
110    }
111}
112
113impl CompressedMemMapBuilder {
114    /// Create a new builder with default settings.
115    pub fn new() -> Self {
116        Self::default()
117    }
118
119    /// Set the block size in elements.
120    ///
121    /// Larger blocks provide better compression but slower random access.
122    pub fn with_block_size(mut self, blocksize: usize) -> Self {
123        self.block_size = blocksize;
124        self
125    }
126
127    /// Set the compression algorithm.
128    pub fn with_algorithm(mut self, algorithm: CompressionAlgorithm) -> Self {
129        self.algorithm = algorithm;
130        self
131    }
132
133    /// Set the compression level.
134    ///
135    /// Higher levels provide better compression but slower compression speed.
136    /// The valid range depends on the algorithm.
137    /// - LZ4: 1-12
138    /// - Zstd: 1-22
139    /// - Snappy: Ignores this parameter
140    pub fn with_level(mut self, level: i32) -> Self {
141        self.level = level;
142        self
143    }
144
145    /// Set the maximum cache size in blocks.
146    ///
147    /// Larger cache sizes allow for more decompressed blocks to be held in memory,
148    /// potentially improving performance for repeated access patterns.
149    pub fn with_cache_size(mut self, cachesize: usize) -> Self {
150        self.cache_size = cachesize;
151        self
152    }
153
154    /// Set the cache time-to-live duration.
155    ///
156    /// Blocks will be evicted from cache after this duration if not accessed.
157    /// Set to None for no time-based eviction.
158    pub fn with_cache_ttl(mut self, ttl: Option<Duration>) -> Self {
159        self.cache_ttl = ttl;
160        self
161    }
162
163    /// Set an optional description.
164    pub fn with_description(mut self, description: impl Into<String>) -> Self {
165        self.description = Some(description.into());
166        self
167    }
168
169    /// Create a compressed memory-mapped array from existing data.
170    ///
171    /// # Arguments
172    ///
173    /// * `data` - The data to compress
174    /// * `path` - The file path to save the compressed data
175    ///
176    /// # Returns
177    ///
178    /// A compressed memory-mapped array
179    pub fn create<A, S, D>(
180        &self,
181        data: &ArrayBase<S, D>,
182        path: impl AsRef<Path>,
183    ) -> CoreResult<CompressedMemMappedArray<A>>
184    where
185        A: Clone + Copy + 'static + Send + Sync,
186        S: RawData<Elem = A>,
187        D: Dimension,
188    {
189        // Get array information
190        let shape = data.shape().to_vec();
191        let num_elements = data.len();
192        let element_size = std::mem::size_of::<A>();
193
194        // Calculate block information
195        let block_size = self.block_size.min(num_elements);
196        let num_blocks = num_elements.div_ceil(block_size);
197
198        // Prepare metadata
199        let mut metadata = CompressedFileMetadata {
200            shape,
201            element_size,
202            num_elements,
203            block_size,
204            num_blocks,
205            block_offsets: Vec::with_capacity(num_blocks),
206            block_compressed_sizes: Vec::with_capacity(num_blocks),
207            block_uncompressed_sizes: Vec::with_capacity(num_blocks),
208            compression_algorithm: self.algorithm,
209            compression_level: self.level,
210            creation_time: chrono::Utc::now(),
211            description: self.description.clone(),
212        };
213
214        // Create output file
215        let path = path.as_ref();
216        let mut file = OpenOptions::new()
217            .read(true)
218            .write(true)
219            .create(true)
220            .truncate(true)
221            .open(path)?;
222
223        // Reserve space for metadata (will write it after compression)
224        let metadata_placeholder = vec![0u8; 1024];
225        file.write_all(&metadata_placeholder)?;
226
227        // Compress each block
228        let data_ptr = data.as_ptr() as *const u8;
229        let mut current_offset = metadata_placeholder.len() as u64;
230
231        for blockidx in 0..num_blocks {
232            let start_element = blockidx * block_size;
233            let end_element = (start_element + block_size).min(num_elements);
234            let block_elements = end_element - start_element;
235            let uncompressed_size = block_elements * element_size;
236
237            // Record the block offset
238            metadata.block_offsets.push(current_offset);
239            metadata.block_uncompressed_sizes.push(uncompressed_size);
240
241            // Get the data for this block
242            let data_offset = start_element * element_size;
243            let block_data =
244                unsafe { std::slice::from_raw_parts(data_ptr.add(data_offset), uncompressed_size) };
245
246            // Compress the block
247            let compressed_data = match self.algorithm {
248                CompressionAlgorithm::Lz4 => oxiarc_lz4::compress(block_data).map_err(|e| {
249                    CoreError::ComputationError(ErrorContext::new(format!(
250                        "LZ4 compression error: {}",
251                        e
252                    )))
253                })?,
254                CompressionAlgorithm::Zstd => {
255                    oxiarc_zstd::compress_with_level(block_data, self.level).map_err(|e| {
256                        CoreError::ComputationError(ErrorContext::new(format!(
257                            "Zstd compression error: {}",
258                            e
259                        )))
260                    })?
261                }
262                CompressionAlgorithm::Snappy => snap::raw::Encoder::new()
263                    .compress_vec(block_data)
264                    .map_err(|e| {
265                        CoreError::ComputationError(ErrorContext::new(format!(
266                            "Snappy compression error: {}",
267                            e
268                        )))
269                    })?,
270            };
271
272            // Record the compressed size
273            metadata.block_compressed_sizes.push(compressed_data.len());
274
275            // Write the compressed block
276            file.write_all(&compressed_data)?;
277
278            // Update the offset for the next block
279            current_offset += compressed_data.len() as u64;
280        }
281
282        // Write the metadata at the beginning of the file
283        let metadata_json = serde_json::to_string(&metadata).map_err(|e| {
284            CoreError::ValueError(ErrorContext::new(format!(
285                "Failed to serialize metadata: {}",
286                e
287            )))
288        })?;
289        let mut metadata_bytes = metadata_json.into_bytes();
290
291        // Ensure the metadata fits in the reserved space
292        if metadata_bytes.len() > metadata_placeholder.len() {
293            return Err(CoreError::ValueError(ErrorContext::new(format!(
294                "Metadata size ({} bytes) exceeds reserved space ({} bytes)",
295                metadata_bytes.len(),
296                metadata_placeholder.len()
297            ))));
298        }
299
300        // Pad the metadata to the reserved size
301        metadata_bytes.resize(metadata_placeholder.len(), 0);
302
303        // Write the metadata
304        file.seek(SeekFrom::Start(0))?;
305        file.write_all(&metadata_bytes)?;
306
307        // Create and return the compressed memory-mapped array
308        let compressed_mmap = CompressedMemMappedArray::open_impl(
309            path.to_path_buf(),
310            self.cache_size,
311            self.cache_ttl,
312        )?;
313
314        Ok(compressed_mmap)
315    }
316
317    /// Create a compressed memory-mapped array from raw data.
318    ///
319    /// # Arguments
320    ///
321    /// * `data` - The raw data to compress
322    /// * `shape` - The shape of the array
323    /// * `path` - The file path to save the compressed data
324    ///
325    /// # Returns
326    ///
327    /// A compressed memory-mapped array
328    pub fn create_from_raw<A>(
329        &self,
330        data: &[A],
331        shape: &[usize],
332        path: impl AsRef<Path>,
333    ) -> CoreResult<CompressedMemMappedArray<A>>
334    where
335        A: Clone + Copy + 'static + Send + Sync,
336    {
337        // Create ndarray from raw data
338        let array = Array::from_shape_vec(IxDyn(shape), data.to_vec())
339            .map_err(|e| CoreError::ShapeError(ErrorContext::new(format!("{e}"))))?;
340
341        // Create compressed memory-mapped array
342        self.create(&array, path)
343    }
344}
345
/// A memory-mapped array with transparent compression.
///
/// This struct provides a view into a compressed array stored on disk,
/// with transparent decompression of blocks as they are accessed.
/// Cloning shares the block cache between clones, since it is held in an `Arc`.
#[derive(Debug, Clone)]
pub struct CompressedMemMappedArray<A: Clone + Copy + 'static + Send + Sync> {
    /// Path to the compressed file (reopened each time a block is loaded)
    path: PathBuf,

    /// Metadata about the compressed file, parsed from the file header on open
    metadata: CompressedFileMetadata,

    /// Block cache for decompressed blocks
    block_cache: Arc<BlockCache<A>>,

    /// Phantom data for type parameter
    phantom: std::marker::PhantomData<A>,
}
364
365impl<A: Clone + Copy + 'static + Send + Sync> CompressedMemMappedArray<A> {
366    /// Open a compressed memory-mapped array from a file.
367    ///
368    /// # Arguments
369    ///
370    /// * `path` - The path to the compressed file
371    ///
372    /// # Returns
373    ///
374    /// A compressed memory-mapped array
375    pub fn open<P: AsRef<Path>>(path: P) -> CoreResult<Self> {
376        // Use default cache settings
377        let cache_size = 32;
378        let cache_ttl = Some(Duration::from_secs(300));
379
380        Self::open_impl(path.as_ref().to_path_buf(), cache_size, cache_ttl)
381    }
382
383    /// Open a compressed memory-mapped array with custom cache settings.
384    ///
385    /// # Arguments
386    ///
387    /// * `path` - The path to the compressed file
388    /// * `cache_size` - The maximum number of blocks to cache
389    /// * `cache_ttl` - The time-to-live for cached blocks
390    ///
391    /// # Returns
392    ///
393    /// A compressed memory-mapped array
394    pub fn open_with_cache<P: AsRef<Path>>(
395        path: P,
396        cache_size: usize,
397        cache_ttl: Option<Duration>,
398    ) -> CoreResult<Self> {
399        Self::open_impl(path.as_ref().to_path_buf(), cache_size, cache_ttl)
400    }
401
402    /// Internal implementation of open.
403    fn open_impl(
404        path: PathBuf,
405        cache_size: usize,
406        cache_ttl: Option<Duration>,
407    ) -> CoreResult<Self> {
408        // Open the file
409        let mut file = File::open(&path)?;
410
411        // Read the metadata from the beginning of the file
412        let mut metadata_bytes = vec![0u8; 1024];
413        file.read_exact(&mut metadata_bytes)?;
414
415        // Parse the metadata
416        // from_utf8_lossy doesn't return an error, it replaces invalid utf8 sequences
417        let metadata_json = String::from_utf8_lossy(&metadata_bytes)
418            .trim_end_matches('\0')
419            .to_string();
420        let metadata: CompressedFileMetadata =
421            serde_json::from_str(&metadata_json).map_err(|e| {
422                CoreError::ValueError(ErrorContext::new(format!(
423                    "Failed to deserialize metadata: {}",
424                    e
425                )))
426            })?;
427
428        // Check that the element _size matches
429        let expected_element_size = std::mem::size_of::<A>();
430        if metadata.element_size != expected_element_size {
431            return Err(CoreError::ValueError(ErrorContext::new(format!(
432                "Element _size mismatch: expected {}, got {}",
433                expected_element_size, metadata.element_size
434            ))));
435        }
436
437        // Create the block cache
438        let block_cache = Arc::new(BlockCache::new(cache_size, cache_ttl));
439
440        // Create and return the array
441        Ok(Self {
442            path,
443            metadata,
444            block_cache,
445            phantom: std::marker::PhantomData,
446        })
447    }
448
449    /// Get the shape of the array.
450    pub fn shape(&self) -> &[usize] {
451        &self.metadata.shape
452    }
453
454    /// Get the total number of elements in the array.
455    pub fn size(&self) -> usize {
456        self.metadata.num_elements
457    }
458
459    /// Get the number of dimensions of the array.
460    pub fn ndim(&self) -> usize {
461        self.metadata.shape.len()
462    }
463
464    /// Get the metadata for the compressed file.
465    pub fn metadata(&self) -> &CompressedFileMetadata {
466        &self.metadata
467    }
468
469    /// Get the block size.
470    pub fn block_size(&self) -> usize {
471        self.metadata.block_size
472    }
473
474    /// Get the number of blocks.
475    pub fn num_blocks(&self) -> usize {
476        self.metadata.num_blocks
477    }
478
479    /// Load a specific block into memory.
480    ///
481    /// This is useful for preloading blocks that will be accessed soon.
482    ///
483    /// # Arguments
484    ///
485    /// * `blockidx` - The index of the block to load
486    ///
487    /// # Returns
488    ///
489    /// `Ok(())` if successful, or an error
490    pub fn preload_block(&self, blockidx: usize) -> CoreResult<()> {
491        if blockidx >= self.metadata.num_blocks {
492            return Err(CoreError::IndexError(ErrorContext::new(format!(
493                "Block index {} out of bounds (max {})",
494                blockidx,
495                self.metadata.num_blocks - 1
496            ))));
497        }
498
499        // Check if the block is already cached
500        if self.block_cache.has_block(blockidx) {
501            return Ok(());
502        }
503
504        // Load and decompress the block
505        let block = self.load_block(blockidx)?;
506
507        // Add to cache
508        self.block_cache.put_block(blockidx, block);
509
510        Ok(())
511    }
512
513    /// Load a block from the compressed file.
514    fn load_block(&self, blockidx: usize) -> CoreResult<Vec<A>> {
515        // Open the file
516        let mut file = File::open(&self.path)?;
517
518        // Get block information
519        let offset = self.metadata.block_offsets[blockidx];
520        let compressed_size = self.metadata.block_compressed_sizes[blockidx];
521        let uncompressed_size = self.metadata.block_uncompressed_sizes[blockidx];
522
523        // Read the compressed block
524        file.seek(SeekFrom::Start(offset))?;
525        let mut compressed_data = vec![0u8; compressed_size];
526        file.read_exact(&mut compressed_data)?;
527
528        // Decompress the block
529        let block_bytes = match self.metadata.compression_algorithm {
530            CompressionAlgorithm::Lz4 => {
531                oxiarc_lz4::decompress(&compressed_data, uncompressed_size).map_err(|e| {
532                    CoreError::ComputationError(ErrorContext::new(format!(
533                        "LZ4 decompression error: {}",
534                        e
535                    )))
536                })?
537            }
538            CompressionAlgorithm::Zstd => {
539                oxiarc_zstd::decompress(&compressed_data).map_err(|e| {
540                    CoreError::ComputationError(ErrorContext::new(format!(
541                        "Zstd decompression error: {}",
542                        e
543                    )))
544                })?
545            }
546            CompressionAlgorithm::Snappy => snap::raw::Decoder::new()
547                .decompress_vec(&compressed_data)
548                .map_err(|e| {
549                    CoreError::ComputationError(ErrorContext::new(format!(
550                        "Snappy decompression error: {}",
551                        e
552                    )))
553                })?,
554        };
555
556        // Check that we got the expected number of bytes
557        if block_bytes.len() != uncompressed_size {
558            return Err(CoreError::ValueError(ErrorContext::new(format!(
559                "Block {} decompressed to {} bytes, expected {}",
560                blockidx,
561                block_bytes.len(),
562                uncompressed_size
563            ))));
564        }
565
566        // Convert bytes to elements
567        let num_elements = uncompressed_size / std::mem::size_of::<A>();
568        let mut elements = Vec::with_capacity(num_elements);
569
570        // Interpret bytes as elements (this is safe because A is Copy)
571        for chunk in block_bytes.chunks_exact(std::mem::size_of::<A>()) {
572            let element = unsafe { *(chunk.as_ptr() as *const A) };
573            elements.push(element);
574        }
575
576        Ok(elements)
577    }
578
579    /// Get a read-only view of the entire array.
580    ///
581    /// This will decompress all blocks and load them into memory.
582    ///
583    /// # Returns
584    ///
585    /// A read-only ndarray view of the data
586    pub fn readonly_array(&self) -> CoreResult<Array<A, IxDyn>> {
587        // Allocate an array for the result
588        let mut result = Array::from_elem(IxDyn(&self.metadata.shape), unsafe {
589            std::mem::zeroed::<A>()
590        });
591
592        // Load each block and copy it into the result
593        let mut offset = 0;
594        for blockidx in 0..self.metadata.num_blocks {
595            // Get the block (from cache if available)
596            let block = match self.block_cache.get_block(blockidx) {
597                Some(block) => block,
598                None => {
599                    let block = self.load_block(blockidx)?;
600                    self.block_cache.put_block(blockidx, block.clone());
601                    block
602                }
603            };
604
605            // Copy the block into the result
606            let start = offset;
607            let end = (start + block.len()).min(self.metadata.num_elements);
608            let slice = &mut result.as_slice_mut().expect("Operation failed")[start..end];
609            slice.copy_from_slice(&block[..(end - start)]);
610
611            // Update the offset
612            offset = end;
613        }
614
615        Ok(result)
616    }
617
618    /// Get a specific element from the array.
619    ///
620    /// This will decompress only the block containing the element.
621    ///
622    /// # Arguments
623    ///
624    /// * `indices` - The indices of the element to get
625    ///
626    /// # Returns
627    ///
628    /// The element at the specified indices
629    pub fn get(&self, indices: &[usize]) -> CoreResult<A> {
630        // Check that the indices are valid
631        if indices.len() != self.metadata.shape.len() {
632            return Err(CoreError::DimensionError(ErrorContext::new(format!(
633                "Expected {} indices, got {}",
634                self.metadata.shape.len(),
635                indices.len()
636            ))));
637        }
638
639        for (dim, &idx) in indices.iter().enumerate() {
640            if idx >= self.metadata.shape[dim] {
641                return Err(CoreError::IndexError(ErrorContext::new(format!(
642                    "Index {} out of bounds for dimension {} (max {})",
643                    idx,
644                    dim,
645                    self.metadata.shape[dim] - 1
646                ))));
647            }
648        }
649
650        // Calculate flat index
651        let mut flat_index = 0;
652        let mut stride = 1;
653        for i in (0..indices.len()).rev() {
654            flat_index += indices[i] * stride;
655            if i > 0 {
656                stride *= self.metadata.shape[i];
657            }
658        }
659
660        // Calculate block index
661        let blockidx = flat_index / self.metadata.block_size;
662        let block_offset = flat_index % self.metadata.block_size;
663
664        // Get the block (from cache if available)
665        let block = match self.block_cache.get_block(blockidx) {
666            Some(block) => block,
667            None => {
668                let block = self.load_block(blockidx)?;
669                self.block_cache.put_block(blockidx, block.clone());
670                block
671            }
672        };
673
674        // Get the element
675        if block_offset < block.len() {
676            Ok(block[block_offset])
677        } else {
678            Err(CoreError::IndexError(ErrorContext::new(format!(
679                "Block offset {} out of bounds for block {} (max {})",
680                block_offset,
681                blockidx,
682                block.len() - 1
683            ))))
684        }
685    }
686
687    /// Get a subset of the array as a new array.
688    ///
689    /// This will decompress only the blocks containing the subset.
690    ///
691    /// # Arguments
692    ///
693    /// * `ranges` - The ranges of indices to get for each dimension
694    ///
695    /// # Returns
696    ///
697    /// A new array containing the subset
698    pub fn slice(&self, ranges: &[(usize, usize)]) -> CoreResult<Array<A, IxDyn>> {
699        // Check that the ranges are valid
700        if ranges.len() != self.metadata.shape.len() {
701            return Err(CoreError::DimensionError(ErrorContext::new(format!(
702                "Expected {} ranges, got {}",
703                self.metadata.shape.len(),
704                ranges.len()
705            ))));
706        }
707
708        // Calculate the shape of the result
709        let mut resultshape = Vec::with_capacity(ranges.len());
710        for (dim, &(start, end)) in ranges.iter().enumerate() {
711            if start >= end {
712                return Err(CoreError::ValueError(ErrorContext::new(format!(
713                    "Invalid range for dimension {}: {}..{}",
714                    dim, start, end
715                ))));
716            }
717            if end > self.metadata.shape[dim] {
718                return Err(CoreError::IndexError(ErrorContext::new(format!(
719                    "Range {}..{} out of bounds for dimension {} (max {})",
720                    start, end, dim, self.metadata.shape[dim]
721                ))));
722            }
723            resultshape.push(end - start);
724        }
725
726        // Allocate an array for the result
727        let mut result = Array::from_elem(IxDyn(&resultshape), unsafe { std::mem::zeroed::<A>() });
728
729        // Calculate the total number of elements in the result
730        let result_size = resultshape.iter().product::<usize>();
731
732        // Create iterators for the result indices
733        let mut result_indices = vec![0; ranges.len()];
734        let mut source_indices = Vec::with_capacity(ranges.len());
735        for &(start, _) in ranges.iter() {
736            source_indices.push(start);
737        }
738
739        // Iterate through all elements in the result
740        for _result_flat_idx in 0..result_size {
741            // Calculate source flat index
742            let mut source_flat_idx = 0;
743            let mut stride = 1;
744            for i in (0..source_indices.len()).rev() {
745                source_flat_idx += source_indices[i] * stride;
746                if i > 0 {
747                    stride *= self.metadata.shape[i];
748                }
749            }
750
751            // Get the element from the source
752            let blockidx = source_flat_idx / self.metadata.block_size;
753            let block_offset = source_flat_idx % self.metadata.block_size;
754
755            // Get the block (from cache if available)
756            let block = match self.block_cache.get_block(blockidx) {
757                Some(block) => block,
758                None => {
759                    let block = self.load_block(blockidx)?;
760                    self.block_cache.put_block(blockidx, block.clone());
761                    block
762                }
763            };
764
765            // Get the element and set it in the result
766            if block_offset < block.len() {
767                // Calculate the result flat index
768                let mut result_stride = 1;
769                let mut result_flat_idx = 0;
770                for i in (0..result_indices.len()).rev() {
771                    result_flat_idx += result_indices[i] * result_stride;
772                    if i > 0 {
773                        result_stride *= resultshape[i];
774                    }
775                }
776
777                // Set the element in the result
778                let result_slice = result.as_slice_mut().expect("Operation failed");
779                result_slice[result_flat_idx] = block[block_offset];
780            }
781
782            // Increment the indices
783            for i in (0..ranges.len()).rev() {
784                result_indices[i] += 1;
785                source_indices[i] += 1;
786                if result_indices[i] < resultshape[i] {
787                    break;
788                }
789                result_indices[i] = 0;
790                source_indices[i] = ranges[i].0;
791            }
792        }
793
794        Ok(result)
795    }
796
797    /// Process the array in blocks, with each block loaded and decompressed on demand.
798    ///
799    /// This is useful for operations that can be performed on blocks independently.
800    ///
801    /// # Arguments
802    ///
803    /// * `f` - A function that processes a block of elements
804    ///
805    /// # Returns
806    ///
807    /// A vector of results, one for each block
808    pub fn process_blocks<F, R>(&self, f: F) -> CoreResult<Vec<R>>
809    where
810        F: Fn(&[A], usize) -> R + Send + Sync + 'static,
811        R: Send + 'static,
812    {
813        self.process_blocks_internal(f, false, None)
814    }
815
816    /// Process the array in blocks with a custom block size.
817    ///
818    /// # Arguments
819    ///
820    /// * `block_size` - The block size to use (in elements)
821    /// * `f` - A function that processes a block of elements
822    ///
823    /// # Returns
824    ///
825    /// A vector of results, one for each block
826    pub fn process_blocks_with_size<F, R>(&self, blocksize: usize, f: F) -> CoreResult<Vec<R>>
827    where
828        F: Fn(&[A], usize) -> R + Send + Sync + 'static,
829        R: Send + 'static,
830    {
831        self.process_blocks_internal(f, false, Some(blocksize))
832    }
833
834    /// Process the array in blocks in parallel.
835    ///
836    /// # Arguments
837    ///
838    /// * `f` - A function that processes a block of elements
839    ///
840    /// # Returns
841    ///
842    /// A vector of results, one for each block
843    #[cfg(feature = "parallel")]
844    pub fn process_blocks_parallel<F, R>(&self, f: F) -> CoreResult<Vec<R>>
845    where
846        F: Fn(&[A], usize) -> R + Send + Sync + 'static,
847        R: Send + 'static,
848    {
849        self.process_blocks_internal(f, true, None)
850    }
851
852    /// Process the array in blocks in parallel with a custom block size.
853    ///
854    /// # Arguments
855    ///
856    /// * `block_size` - The block size to use (in elements)
857    /// * `f` - A function that processes a block of elements
858    ///
859    /// # Returns
860    ///
861    /// A vector of results, one for each block
862    #[cfg(feature = "parallel")]
863    pub fn process_blocks_parallel_with_size<F, R>(
864        &self,
865        block_size: usize,
866        f: F,
867    ) -> CoreResult<Vec<R>>
868    where
869        F: Fn(&[A], usize) -> R + Send + Sync + 'static,
870        R: Send + 'static,
871    {
872        self.process_blocks_internal(f, true, Some(block_size))
873    }
874
875    /// Internal implementation of block processing.
876    #[cfg(not(feature = "parallel"))]
877    fn process_blocks_internal<F, R>(
878        &self,
879        mut f: F,
880        _parallel: bool,
881        custom_block_size: Option<usize>,
882    ) -> CoreResult<Vec<R>>
883    where
884        F: FnMut(&[A], usize) -> R,
885    {
886        // Determine block layout
887        let block_size = custom_block_size.unwrap_or(self.metadata.block_size);
888        let num_elements = self.metadata.num_elements;
889        let num_blocks = num_elements.div_ceil(block_size);
890
891        // Serial processing — f is FnMut so it requires mutable access per call
892        let mut results = Vec::with_capacity(num_blocks);
893        for blockidx in 0..num_blocks {
894            let start = blockidx * block_size;
895            let end = (start + block_size).min(num_elements);
896            let elements = self.load_elements(start, end)?;
897            results.push(f(&elements, blockidx));
898        }
899        Ok(results)
900    }
901
902    /// Internal implementation of block processing (parallel version).
903    #[cfg(feature = "parallel")]
904    fn process_blocks_internal<F, R>(
905        &self,
906        f: F,
907        parallel: bool,
908        custom_block_size: Option<usize>,
909    ) -> CoreResult<Vec<R>>
910    where
911        F: Fn(&[A], usize) -> R + Send + Sync + 'static,
912        R: Send + 'static,
913    {
914        // Determine block layout
915        let block_size = custom_block_size.unwrap_or(self.metadata.block_size);
916        let num_elements = self.metadata.num_elements;
917        let num_blocks = num_elements.div_ceil(block_size);
918
919        // Process blocks
920        if parallel {
921            use crate::parallel_ops::*;
922
923            return (0..num_blocks)
924                .into_par_iter()
925                .map(|blockidx| {
926                    // Calculate the range of elements for this block
927                    let start = blockidx * block_size;
928                    let end = (start + block_size).min(num_elements);
929
930                    // Load the elements for this block
931                    let elements = match self.load_elements(start, end) {
932                        Ok(elems) => elems,
933                        Err(e) => return Err(e),
934                    };
935
936                    // Apply the function to the block
937                    Ok(f(&elements, blockidx))
938                })
939                .collect::<Result<Vec<R>, CoreError>>();
940        }
941
942        // Serial processing (used when parallel=false)
943        (0..num_blocks)
944            .map(|blockidx| {
945                // Calculate the range of elements for this block
946                let start = blockidx * block_size;
947                let end = (start + block_size).min(num_elements);
948
949                // Load the elements for this block
950                let elements = self.load_elements(start, end)?;
951
952                // Apply the function to the block
953                Ok(f(&elements, blockidx))
954            })
955            .collect::<Result<Vec<R>, CoreError>>()
956    }
957
958    /// Load a range of elements from the array.
959    ///
960    /// This will decompress all blocks containing the range.
961    ///
962    /// # Arguments
963    ///
964    /// * `start` - The starting element index
965    /// * `end` - The ending element index (exclusive)
966    ///
967    /// # Returns
968    ///
969    /// A vector of elements in the range
970    fn load_elements(&self, start: usize, end: usize) -> CoreResult<Vec<A>> {
971        // Check bounds
972        if start >= self.metadata.num_elements {
973            return Err(CoreError::IndexError(ErrorContext::new(format!(
974                "Start index {} out of bounds (max {})",
975                start,
976                self.metadata.num_elements - 1
977            ))));
978        }
979        if end > self.metadata.num_elements {
980            return Err(CoreError::IndexError(ErrorContext::new(format!(
981                "End index {} out of bounds (max {})",
982                end, self.metadata.num_elements
983            ))));
984        }
985        if start >= end {
986            return Ok(Vec::new());
987        }
988
989        // Determine which blocks we need
990        let start_block = start / self.metadata.block_size;
991        let end_block = (end - 1) / self.metadata.block_size;
992
993        // Allocate space for the result
994        let mut result = Vec::with_capacity(end - start);
995
996        // Load each required block
997        for blockidx in start_block..=end_block {
998            // Get the block (from cache if available)
999            let block = match self.block_cache.get_block(blockidx) {
1000                Some(block) => block,
1001                None => {
1002                    let block = self.load_block(blockidx)?;
1003                    self.block_cache.put_block(blockidx, block.clone());
1004                    block
1005                }
1006            };
1007
1008            // Calculate the range of elements we need from this block
1009            let block_start = blockidx * self.metadata.block_size;
1010            let block_end = block_start + block.len();
1011
1012            let range_start = start.max(block_start) - block_start;
1013            let range_end = end.min(block_end) - block_start;
1014
1015            // Copy the elements into the result
1016            result.extend_from_slice(&block[range_start..range_end]);
1017        }
1018
1019        Ok(result)
1020    }
1021}
1022
/// Cache for decompressed blocks.
///
/// This struct provides a LRU (Least Recently Used) cache for decompressed
/// blocks, keyed by block index. Entries may additionally expire after an
/// optional time-to-live. All methods take `&self`; the map is guarded by an
/// `RwLock`.
#[derive(Debug)]
struct BlockCache<A: Clone + Copy + 'static + Send + Sync> {
    /// Maximum number of blocks to cache (`0` disables caching)
    capacity: usize,

    /// Time-to-live for cached blocks (`None` means blocks never expire)
    ttl: Option<Duration>,

    /// Cache of decompressed blocks, keyed by block index
    cache: RwLock<HashMap<usize, CachedBlock<A>>>,
}

/// A cached block with its timestamp.
#[derive(Debug, Clone)]
struct CachedBlock<A: Clone + Copy + 'static + Send + Sync> {
    /// The decompressed block data
    data: Vec<A>,

    /// The time when the block was last accessed (set on insertion and
    /// refreshed on every successful `get_block`)
    timestamp: Instant,
}

impl<A: Clone + Copy + 'static + Send + Sync> BlockCache<A> {
    /// Create a new block cache.
    ///
    /// # Arguments
    ///
    /// * `capacity` - The maximum number of blocks to cache
    /// * `ttl` - The time-to-live for cached blocks
    fn new(capacity: usize, ttl: Option<Duration>) -> Self {
        Self {
            capacity,
            ttl,
            cache: RwLock::new(HashMap::new()),
        }
    }

    /// Report whether `entry` has outlived the configured time-to-live.
    ///
    /// Always `false` when no TTL is configured.
    fn is_expired(&self, entry: &CachedBlock<A>) -> bool {
        self.ttl
            .is_some_and(|ttl| entry.timestamp.elapsed() > ttl)
    }

    /// Check if a block is in the cache.
    ///
    /// Expired blocks are reported as absent but are not removed here.
    ///
    /// # Arguments
    ///
    /// * `blockidx` - The index of the block to check
    ///
    /// # Returns
    ///
    /// `true` if the block is in the cache, `false` otherwise
    fn has_block(&self, blockidx: usize) -> bool {
        let cache = self.cache.read().expect("Operation failed");
        cache
            .get(&blockidx)
            .is_some_and(|cached| !self.is_expired(cached))
    }

    /// Get a block from the cache.
    ///
    /// A hit refreshes the block's timestamp; an expired block is removed
    /// from the cache and reported as a miss.
    ///
    /// # Arguments
    ///
    /// * `blockidx` - The index of the block to get
    ///
    /// # Returns
    ///
    /// A copy of the block if it is in the cache, `None` otherwise
    fn get_block(&self, blockidx: usize) -> Option<Vec<A>> {
        use std::collections::hash_map::Entry;

        // A write lock is needed because a hit updates the timestamp.
        let mut cache = self.cache.write().expect("Operation failed");

        match cache.entry(blockidx) {
            Entry::Occupied(mut slot) => {
                if self.is_expired(slot.get()) {
                    slot.remove();
                    None
                } else {
                    let cached = slot.get_mut();
                    cached.timestamp = Instant::now();
                    Some(cached.data.clone())
                }
            }
            Entry::Vacant(_) => None,
        }
    }

    /// Put a block in the cache.
    ///
    /// When the cache is full and `blockidx` is not already present, the
    /// block with the oldest timestamp is evicted first. With a capacity of
    /// zero nothing is stored.
    ///
    /// # Arguments
    ///
    /// * `blockidx` - The index of the block to put
    /// * `block` - The block data
    fn put_block(&self, blockidx: usize, block: Vec<A>) {
        // A zero-capacity cache must stay empty; without this guard one block
        // would always be retained.
        if self.capacity == 0 {
            return;
        }

        let mut cache = self.cache.write().expect("Operation failed");

        // Check if we need to evict a block
        if cache.len() >= self.capacity && !cache.contains_key(&blockidx) {
            // Find the least recently used block
            let lru_idx = cache
                .iter()
                .min_by_key(|(_, cached)| cached.timestamp)
                .map(|(idx, _)| *idx);
            if let Some(idx) = lru_idx {
                cache.remove(&idx);
            }
        }

        // Add the block to the cache
        cache.insert(
            blockidx,
            CachedBlock {
                data: block,
                timestamp: Instant::now(),
            },
        );
    }

    /// Clear the cache.
    #[allow(dead_code)]
    fn clear(&self) {
        let mut cache = self.cache.write().expect("Operation failed");
        cache.clear();
    }

    /// Get the number of blocks in the cache (expired blocks that have not
    /// been removed yet are counted).
    #[allow(dead_code)]
    fn len(&self) -> usize {
        let cache = self.cache.read().expect("Operation failed");
        cache.len()
    }

    /// Check if the cache is empty.
    #[allow(dead_code)]
    fn is_empty(&self) -> bool {
        let cache = self.cache.read().expect("Operation failed");
        cache.is_empty()
    }
}
1176
1177#[cfg(test)]
1178mod tests {
1179    use super::*;
1180    use ::ndarray::Array2;
1181    use tempfile::tempdir;
1182
1183    #[test]
1184    fn test_compressed_memmapped_array_1d() {
1185        // Create a temporary directory for our test files
1186        let dir = tempdir().expect("Operation failed");
1187        let file_path = dir.path().join("test_compressed_1d.cmm");
1188
1189        // Create test data
1190        let data: Vec<f64> = (0..1000).map(|i| i as f64).collect();
1191
1192        // Create a builder
1193        let builder = CompressedMemMapBuilder::new()
1194            .with_block_size(100)
1195            .with_algorithm(CompressionAlgorithm::Lz4)
1196            .with_level(1)
1197            .with_cache_size(4)
1198            .with_description("Test 1D array");
1199
1200        // Create the compressed memory-mapped array
1201        let cmm = builder
1202            .create_from_raw(&data, &[1000], &file_path)
1203            .expect("Operation failed");
1204
1205        // Check metadata
1206        assert_eq!(cmm.shape(), &[1000]);
1207        assert_eq!(cmm.size(), 1000);
1208        assert_eq!(cmm.ndim(), 1);
1209
1210        // Test random access
1211        for i in 0..10 {
1212            let val = cmm.get(&[i * 100]).expect("Operation failed");
1213            assert_eq!(val, (i * 100) as f64);
1214        }
1215
1216        // Test slicing
1217        let slice = cmm.slice(&[(200, 300)]).expect("Operation failed");
1218        assert_eq!(slice.shape(), &[100]);
1219        for i in 0..100 {
1220            assert_eq!(slice[crate::ndarray::IxDyn(&[i])], (i + 200) as f64);
1221        }
1222
1223        // Test block processing
1224        let sums = cmm
1225            .process_blocks(|block, _| block.iter().sum::<f64>())
1226            .expect("Test: operation failed");
1227
1228        assert_eq!(sums.len(), 10); // 1000 elements / 100 block size = 10 blocks
1229
1230        // Test loading the entire array
1231        let array = cmm.readonly_array().expect("Operation failed");
1232        assert_eq!(array.shape(), &[1000]);
1233        for i in 0..1000 {
1234            assert_eq!(array[crate::ndarray::IxDyn(&[i])], i as f64);
1235        }
1236    }
1237
1238    #[test]
1239    fn test_compressed_memmapped_array_2d() {
1240        // Create a temporary directory for our test files
1241        let dir = tempdir().expect("Operation failed");
1242        let file_path = dir.path().join("test_compressed_2d.cmm");
1243
1244        // Create test data - 10x10 matrix
1245        let data = Array2::<f64>::from_shape_fn((10, 10), |(i, j)| (i * 10 + j) as f64);
1246
1247        // Create a builder
1248        let builder = CompressedMemMapBuilder::new()
1249            .with_block_size(25) // 5x5 blocks
1250            .with_algorithm(CompressionAlgorithm::Lz4)
1251            .with_level(1)
1252            .with_cache_size(4)
1253            .with_description("Test 2D array");
1254
1255        // Create the compressed memory-mapped array
1256        let cmm = builder.create(&data, &file_path).expect("Operation failed");
1257
1258        // Check metadata
1259        assert_eq!(cmm.shape(), &[10, 10]);
1260        assert_eq!(cmm.size(), 100);
1261        assert_eq!(cmm.ndim(), 2);
1262
1263        // Test random access
1264        for i in 0..10 {
1265            for j in 0..10 {
1266                let val = cmm.get(&[i, j]).expect("Operation failed");
1267                assert_eq!(val, (i * 10 + j) as f64);
1268            }
1269        }
1270
1271        // Test slicing
1272        let slice = cmm.slice(&[(2, 5), (3, 7)]).expect("Operation failed");
1273        assert_eq!(slice.shape(), &[3, 4]);
1274        for i in 0..3 {
1275            for j in 0..4 {
1276                assert_eq!(
1277                    slice[crate::ndarray::IxDyn(&[i, j])],
1278                    ((i + 2) * 10 + (j + 3)) as f64
1279                );
1280            }
1281        }
1282
1283        // Test loading the entire array
1284        let array = cmm.readonly_array().expect("Operation failed");
1285        assert_eq!(array.shape(), &[10, 10]);
1286        for i in 0..10 {
1287            for j in 0..10 {
1288                assert_eq!(array[crate::ndarray::IxDyn(&[i, j])], (i * 10 + j) as f64);
1289            }
1290        }
1291    }
1292
1293    #[test]
1294    fn test_different_compression_algorithms() {
1295        // Create a temporary directory for our test files
1296        let dir = tempdir().expect("Operation failed");
1297
1298        // Create test data
1299        let data: Vec<f64> = (0..1000).map(|i| i as f64).collect();
1300
1301        // Test each compression algorithm
1302        for algorithm in &[
1303            CompressionAlgorithm::Lz4,
1304            CompressionAlgorithm::Zstd,
1305            CompressionAlgorithm::Snappy,
1306        ] {
1307            let file_path = dir.path().join(format!("test_{:?}.cmm", algorithm));
1308
1309            // Create a builder
1310            let builder = CompressedMemMapBuilder::new()
1311                .with_block_size(100)
1312                .with_algorithm(*algorithm)
1313                .with_level(1)
1314                .with_cache_size(4);
1315
1316            // Create the compressed memory-mapped array
1317            let cmm = builder
1318                .create_from_raw(&data, &[1000], &file_path)
1319                .expect("Operation failed");
1320
1321            // Test loading the entire array
1322            let array = cmm.readonly_array().expect("Operation failed");
1323            for i in 0..1000 {
1324                assert_eq!(array[crate::ndarray::IxDyn(&[i])], i as f64);
1325            }
1326        }
1327    }
1328
1329    #[test]
1330    fn test_block_cache() {
1331        // Create a temporary directory for our test files
1332        let dir = tempdir().expect("Operation failed");
1333        let file_path = dir.path().join("test_cache.cmm");
1334
1335        // Create test data
1336        let data: Vec<f64> = (0..1000).map(|i| i as f64).collect();
1337
1338        // Create compressed arrays with different cache settings
1339        let small_cache = CompressedMemMapBuilder::new()
1340            .with_block_size(100)
1341            .with_cache_size(2) // Very small cache
1342            .create_from_raw(&data, &[1000], &file_path).expect("Operation failed");
1343
1344        // Test cache behavior
1345        // First, load all blocks to fill the cache
1346        for i in 0..10 {
1347            small_cache.preload_block(i).expect("Operation failed");
1348        }
1349
1350        // Check the cache size - should be 2 (capacity)
1351        assert_eq!(small_cache.block_cache.len(), 2);
1352
1353        // Now access a block that's not in the cache
1354        // This should evict the least recently used block
1355        let val = small_cache.get(&[0]).expect("Operation failed"); // Block 0
1356        assert_eq!(val, 0.0);
1357
1358        // Check that the block is now in the cache
1359        assert!(small_cache.block_cache.has_block(0));
1360    }
1361
1362    #[test]
1363    fn test_block_preloading() {
1364        // Create a temporary directory for our test files
1365        let dir = tempdir().expect("Operation failed");
1366        let file_path = dir.path().join("test_preload.cmm");
1367
1368        // Create test data
1369        let data: Vec<f64> = (0..1000).map(|i| i as f64).collect();
1370
1371        // Create the compressed memory-mapped array
1372        let cmm = CompressedMemMapBuilder::new()
1373            .with_block_size(100)
1374            .create_from_raw(&data, &[1000], &file_path)
1375            .expect("Test: operation failed");
1376
1377        // Preload a block
1378        cmm.preload_block(5).expect("Operation failed");
1379
1380        // Check that the block is now in the cache
1381        assert!(cmm.block_cache.has_block(5));
1382
1383        // Access an element from the preloaded block
1384        let val = cmm.get(&[550]).expect("Operation failed"); // In block 5
1385        assert_eq!(val, 550.0);
1386    }
1387}