Skip to main content

scirs2_core/memory_efficient/
compressed_memmap.rs

1//! Compressed memory-mapped arrays.
2//!
3//! This module provides functionality for memory-mapping arrays with transparent
4//! compression and decompression. This can significantly reduce disk space
5//! requirements while maintaining the benefits of memory-mapping for large data.
6//!
7//! The implementation uses a block-based approach, where data is split into blocks
8//! that are compressed independently. This allows for efficient random access and
9//! partial loading of the data.
10
11use crate::error::{CoreError, CoreResult, ErrorContext};
12use ::ndarray::{Array, ArrayBase, Dimension, IxDyn, RawData};
13
14#[cfg(feature = "memory_compression")]
15use oxiarc_lz4;
16#[cfg(feature = "memory_compression")]
17use oxiarc_zstd;
18
19use std::collections::HashMap;
20use std::fs::{File, OpenOptions};
21use std::io::{Read, Seek, SeekFrom, Write};
22use std::path::{Path, PathBuf};
23use std::sync::{Arc, RwLock};
24use std::time::{Duration, Instant};
25
/// Metadata for a compressed memory-mapped file.
///
/// Serialized as JSON into a fixed-size 1024-byte header at the start of the
/// compressed file; every block lookup (`block_offsets`, sizes) is driven by
/// these fields. Field names are part of the on-disk format (serde JSON), so
/// they must not be renamed without a format migration.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct CompressedFileMetadata {
    /// Original array shape (one entry per dimension, row-major order)
    pub shape: Vec<usize>,

    /// Element type information (size in bytes); checked against
    /// `size_of::<A>()` when the file is reopened
    pub element_size: usize,

    /// Total number of elements in the array
    pub num_elements: usize,

    /// Block size in elements (all blocks except possibly the last)
    pub block_size: usize,

    /// Number of blocks
    pub num_blocks: usize,

    /// Offset of each block in the compressed file (absolute, in bytes)
    pub block_offsets: Vec<u64>,

    /// Compressed size of each block, in bytes
    pub block_compressed_sizes: Vec<usize>,

    /// Uncompressed size of each block, in bytes
    pub block_uncompressed_sizes: Vec<usize>,

    /// Compression algorithm used for every block
    pub compression_algorithm: CompressionAlgorithm,

    /// Compression level the file was written with
    pub compression_level: i32,

    /// Creation timestamp (UTC)
    pub creation_time: chrono::DateTime<chrono::Utc>,

    /// Optional description
    pub description: Option<String>,
}
65
/// Supported compression algorithms.
///
/// The variant is stored in the file metadata, so both compression and
/// decompression dispatch on it.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, serde::Serialize, serde::Deserialize)]
pub enum CompressionAlgorithm {
    /// LZ4 compression (the default): fastest, moderate ratio
    #[default]
    Lz4,
    /// Zstd compression: better ratio, honors the configured level
    Zstd,
    /// Snappy compression: fast, ignores the compression level
    Snappy,
}
77
/// Builder for compressed memory-mapped arrays.
///
/// Configure block size, compression algorithm/level and cache behavior,
/// then call `create` / `create_from_raw` to write a compressed file and
/// obtain a `CompressedMemMappedArray` over it.
#[derive(Debug, Clone)]
pub struct CompressedMemMapBuilder {
    /// Block size in elements (compression granularity)
    block_size: usize,

    /// Compression algorithm
    algorithm: CompressionAlgorithm,

    /// Compression level (interpretation depends on the algorithm)
    level: i32,

    /// Maximum cache size in blocks (decompressed blocks held in memory)
    cache_size: usize,

    /// Cache time-to-live; `None` disables time-based eviction
    cache_ttl: Option<Duration>,

    /// Optional human-readable description stored in the metadata
    description: Option<String>,
}
99
100impl Default for CompressedMemMapBuilder {
101    fn default() -> Self {
102        Self {
103            block_size: 65536, // 64K elements by default
104            algorithm: CompressionAlgorithm::Lz4,
105            level: 1,                                  // Default compression level
106            cache_size: 32,                            // Cache 32 blocks by default
107            cache_ttl: Some(Duration::from_secs(300)), // 5 minute TTL
108            description: None,
109        }
110    }
111}
112
113impl CompressedMemMapBuilder {
114    /// Create a new builder with default settings.
115    pub fn new() -> Self {
116        Self::default()
117    }
118
119    /// Set the block size in elements.
120    ///
121    /// Larger blocks provide better compression but slower random access.
122    pub fn with_block_size(mut self, blocksize: usize) -> Self {
123        self.block_size = blocksize;
124        self
125    }
126
127    /// Set the compression algorithm.
128    pub fn with_algorithm(mut self, algorithm: CompressionAlgorithm) -> Self {
129        self.algorithm = algorithm;
130        self
131    }
132
133    /// Set the compression level.
134    ///
135    /// Higher levels provide better compression but slower compression speed.
136    /// The valid range depends on the algorithm.
137    /// - LZ4: 1-12
138    /// - Zstd: 1-22
139    /// - Snappy: Ignores this parameter
140    pub fn with_level(mut self, level: i32) -> Self {
141        self.level = level;
142        self
143    }
144
145    /// Set the maximum cache size in blocks.
146    ///
147    /// Larger cache sizes allow for more decompressed blocks to be held in memory,
148    /// potentially improving performance for repeated access patterns.
149    pub fn with_cache_size(mut self, cachesize: usize) -> Self {
150        self.cache_size = cachesize;
151        self
152    }
153
154    /// Set the cache time-to-live duration.
155    ///
156    /// Blocks will be evicted from cache after this duration if not accessed.
157    /// Set to None for no time-based eviction.
158    pub fn with_cache_ttl(mut self, ttl: Option<Duration>) -> Self {
159        self.cache_ttl = ttl;
160        self
161    }
162
163    /// Set an optional description.
164    pub fn with_description(mut self, description: impl Into<String>) -> Self {
165        self.description = Some(description.into());
166        self
167    }
168
169    /// Create a compressed memory-mapped array from existing data.
170    ///
171    /// # Arguments
172    ///
173    /// * `data` - The data to compress
174    /// * `path` - The file path to save the compressed data
175    ///
176    /// # Returns
177    ///
178    /// A compressed memory-mapped array
179    pub fn create<A, S, D>(
180        &self,
181        data: &ArrayBase<S, D>,
182        path: impl AsRef<Path>,
183    ) -> CoreResult<CompressedMemMappedArray<A>>
184    where
185        A: Clone + Copy + 'static + Send + Sync,
186        S: RawData<Elem = A>,
187        D: Dimension,
188    {
189        // Get array information
190        let shape = data.shape().to_vec();
191        let num_elements = data.len();
192        let element_size = std::mem::size_of::<A>();
193
194        // Calculate block information
195        let block_size = self.block_size.min(num_elements);
196        let num_blocks = num_elements.div_ceil(block_size);
197
198        // Prepare metadata
199        let mut metadata = CompressedFileMetadata {
200            shape,
201            element_size,
202            num_elements,
203            block_size,
204            num_blocks,
205            block_offsets: Vec::with_capacity(num_blocks),
206            block_compressed_sizes: Vec::with_capacity(num_blocks),
207            block_uncompressed_sizes: Vec::with_capacity(num_blocks),
208            compression_algorithm: self.algorithm,
209            compression_level: self.level,
210            creation_time: chrono::Utc::now(),
211            description: self.description.clone(),
212        };
213
214        // Create output file
215        let path = path.as_ref();
216        let mut file = OpenOptions::new()
217            .read(true)
218            .write(true)
219            .create(true)
220            .truncate(true)
221            .open(path)?;
222
223        // Reserve space for metadata (will write it after compression)
224        let metadata_placeholder = vec![0u8; 1024];
225        file.write_all(&metadata_placeholder)?;
226
227        // Compress each block
228        let data_ptr = data.as_ptr() as *const u8;
229        let mut current_offset = metadata_placeholder.len() as u64;
230
231        for blockidx in 0..num_blocks {
232            let start_element = blockidx * block_size;
233            let end_element = (start_element + block_size).min(num_elements);
234            let block_elements = end_element - start_element;
235            let uncompressed_size = block_elements * element_size;
236
237            // Record the block offset
238            metadata.block_offsets.push(current_offset);
239            metadata.block_uncompressed_sizes.push(uncompressed_size);
240
241            // Get the data for this block
242            let data_offset = start_element * element_size;
243            let block_data =
244                unsafe { std::slice::from_raw_parts(data_ptr.add(data_offset), uncompressed_size) };
245
246            // Compress the block
247            let compressed_data = match self.algorithm {
248                CompressionAlgorithm::Lz4 => oxiarc_lz4::compress(block_data).map_err(|e| {
249                    CoreError::ComputationError(ErrorContext::new(format!(
250                        "LZ4 compression error: {}",
251                        e
252                    )))
253                })?,
254                CompressionAlgorithm::Zstd => {
255                    oxiarc_zstd::compress_with_level(block_data, self.level).map_err(|e| {
256                        CoreError::ComputationError(ErrorContext::new(format!(
257                            "Zstd compression error: {}",
258                            e
259                        )))
260                    })?
261                }
262                CompressionAlgorithm::Snappy => oxiarc_snappy::compress(block_data),
263            };
264
265            // Record the compressed size
266            metadata.block_compressed_sizes.push(compressed_data.len());
267
268            // Write the compressed block
269            file.write_all(&compressed_data)?;
270
271            // Update the offset for the next block
272            current_offset += compressed_data.len() as u64;
273        }
274
275        // Write the metadata at the beginning of the file
276        let metadata_json = serde_json::to_string(&metadata).map_err(|e| {
277            CoreError::ValueError(ErrorContext::new(format!(
278                "Failed to serialize metadata: {}",
279                e
280            )))
281        })?;
282        let mut metadata_bytes = metadata_json.into_bytes();
283
284        // Ensure the metadata fits in the reserved space
285        if metadata_bytes.len() > metadata_placeholder.len() {
286            return Err(CoreError::ValueError(ErrorContext::new(format!(
287                "Metadata size ({} bytes) exceeds reserved space ({} bytes)",
288                metadata_bytes.len(),
289                metadata_placeholder.len()
290            ))));
291        }
292
293        // Pad the metadata to the reserved size
294        metadata_bytes.resize(metadata_placeholder.len(), 0);
295
296        // Write the metadata
297        file.seek(SeekFrom::Start(0))?;
298        file.write_all(&metadata_bytes)?;
299
300        // Create and return the compressed memory-mapped array
301        let compressed_mmap = CompressedMemMappedArray::open_impl(
302            path.to_path_buf(),
303            self.cache_size,
304            self.cache_ttl,
305        )?;
306
307        Ok(compressed_mmap)
308    }
309
310    /// Create a compressed memory-mapped array from raw data.
311    ///
312    /// # Arguments
313    ///
314    /// * `data` - The raw data to compress
315    /// * `shape` - The shape of the array
316    /// * `path` - The file path to save the compressed data
317    ///
318    /// # Returns
319    ///
320    /// A compressed memory-mapped array
321    pub fn create_from_raw<A>(
322        &self,
323        data: &[A],
324        shape: &[usize],
325        path: impl AsRef<Path>,
326    ) -> CoreResult<CompressedMemMappedArray<A>>
327    where
328        A: Clone + Copy + 'static + Send + Sync,
329    {
330        // Create ndarray from raw data
331        let array = Array::from_shape_vec(IxDyn(shape), data.to_vec())
332            .map_err(|e| CoreError::ShapeError(ErrorContext::new(format!("{e}"))))?;
333
334        // Create compressed memory-mapped array
335        self.create(&array, path)
336    }
337}
338
/// A memory-mapped array with transparent compression.
///
/// This struct provides a view into a compressed array stored on disk,
/// with transparent decompression of blocks as they are accessed.
/// Cloning is cheap: the block cache is shared via `Arc`.
#[derive(Debug, Clone)]
pub struct CompressedMemMappedArray<A: Clone + Copy + 'static + Send + Sync> {
    /// Path to the compressed file (reopened on every block load)
    path: PathBuf,

    /// Metadata about the compressed file, parsed from the header
    metadata: CompressedFileMetadata,

    /// Block cache for decompressed blocks, shared between clones
    block_cache: Arc<BlockCache<A>>,

    /// Phantom data for type parameter
    /// (NOTE(review): `A` already appears in `BlockCache<A>`, so this field
    /// looks redundant — kept as-is since `open_impl` constructs it)
    phantom: std::marker::PhantomData<A>,
}
357
358impl<A: Clone + Copy + 'static + Send + Sync> CompressedMemMappedArray<A> {
359    /// Open a compressed memory-mapped array from a file.
360    ///
361    /// # Arguments
362    ///
363    /// * `path` - The path to the compressed file
364    ///
365    /// # Returns
366    ///
367    /// A compressed memory-mapped array
368    pub fn open<P: AsRef<Path>>(path: P) -> CoreResult<Self> {
369        // Use default cache settings
370        let cache_size = 32;
371        let cache_ttl = Some(Duration::from_secs(300));
372
373        Self::open_impl(path.as_ref().to_path_buf(), cache_size, cache_ttl)
374    }
375
376    /// Open a compressed memory-mapped array with custom cache settings.
377    ///
378    /// # Arguments
379    ///
380    /// * `path` - The path to the compressed file
381    /// * `cache_size` - The maximum number of blocks to cache
382    /// * `cache_ttl` - The time-to-live for cached blocks
383    ///
384    /// # Returns
385    ///
386    /// A compressed memory-mapped array
387    pub fn open_with_cache<P: AsRef<Path>>(
388        path: P,
389        cache_size: usize,
390        cache_ttl: Option<Duration>,
391    ) -> CoreResult<Self> {
392        Self::open_impl(path.as_ref().to_path_buf(), cache_size, cache_ttl)
393    }
394
    /// Internal implementation of open.
    ///
    /// Reads the fixed-size 1024-byte JSON header written by the builder,
    /// validates that the stored element size matches `size_of::<A>()`,
    /// and constructs the array handle with a fresh block cache.
    fn open_impl(
        path: PathBuf,
        cache_size: usize,
        cache_ttl: Option<Duration>,
    ) -> CoreResult<Self> {
        // Open the file
        let mut file = File::open(&path)?;

        // Read the metadata from the beginning of the file
        // (the header is always exactly 1024 bytes, NUL-padded)
        let mut metadata_bytes = vec![0u8; 1024];
        file.read_exact(&mut metadata_bytes)?;

        // Parse the metadata.
        // from_utf8_lossy doesn't return an error, it replaces invalid utf8 sequences;
        // trailing NULs are the padding added when the header was written.
        let metadata_json = String::from_utf8_lossy(&metadata_bytes)
            .trim_end_matches('\0')
            .to_string();
        let metadata: CompressedFileMetadata =
            serde_json::from_str(&metadata_json).map_err(|e| {
                CoreError::ValueError(ErrorContext::new(format!(
                    "Failed to deserialize metadata: {}",
                    e
                )))
            })?;

        // Check that the element _size matches — guards against reopening a
        // file with a different element type than it was written with
        let expected_element_size = std::mem::size_of::<A>();
        if metadata.element_size != expected_element_size {
            return Err(CoreError::ValueError(ErrorContext::new(format!(
                "Element _size mismatch: expected {}, got {}",
                expected_element_size, metadata.element_size
            ))));
        }

        // Create the block cache
        let block_cache = Arc::new(BlockCache::new(cache_size, cache_ttl));

        // Create and return the array
        Ok(Self {
            path,
            metadata,
            block_cache,
            phantom: std::marker::PhantomData,
        })
    }
441
    /// Get the shape of the array (one entry per dimension).
    pub fn shape(&self) -> &[usize] {
        &self.metadata.shape
    }
446
    /// Get the total number of elements in the array (product of the shape).
    pub fn size(&self) -> usize {
        self.metadata.num_elements
    }
451
    /// Get the number of dimensions (rank) of the array.
    pub fn ndim(&self) -> usize {
        self.metadata.shape.len()
    }
456
    /// Get the metadata for the compressed file (shape, block layout,
    /// compression settings, timestamps).
    pub fn metadata(&self) -> &CompressedFileMetadata {
        &self.metadata
    }
461
    /// Get the block size in elements (compression granularity).
    pub fn block_size(&self) -> usize {
        self.metadata.block_size
    }
466
    /// Get the number of independently compressed blocks in the file.
    pub fn num_blocks(&self) -> usize {
        self.metadata.num_blocks
    }
471
472    /// Load a specific block into memory.
473    ///
474    /// This is useful for preloading blocks that will be accessed soon.
475    ///
476    /// # Arguments
477    ///
478    /// * `blockidx` - The index of the block to load
479    ///
480    /// # Returns
481    ///
482    /// `Ok(())` if successful, or an error
483    pub fn preload_block(&self, blockidx: usize) -> CoreResult<()> {
484        if blockidx >= self.metadata.num_blocks {
485            return Err(CoreError::IndexError(ErrorContext::new(format!(
486                "Block index {} out of bounds (max {})",
487                blockidx,
488                self.metadata.num_blocks - 1
489            ))));
490        }
491
492        // Check if the block is already cached
493        if self.block_cache.has_block(blockidx) {
494            return Ok(());
495        }
496
497        // Load and decompress the block
498        let block = self.load_block(blockidx)?;
499
500        // Add to cache
501        self.block_cache.put_block(blockidx, block);
502
503        Ok(())
504    }
505
506    /// Load a block from the compressed file.
507    fn load_block(&self, blockidx: usize) -> CoreResult<Vec<A>> {
508        // Open the file
509        let mut file = File::open(&self.path)?;
510
511        // Get block information
512        let offset = self.metadata.block_offsets[blockidx];
513        let compressed_size = self.metadata.block_compressed_sizes[blockidx];
514        let uncompressed_size = self.metadata.block_uncompressed_sizes[blockidx];
515
516        // Read the compressed block
517        file.seek(SeekFrom::Start(offset))?;
518        let mut compressed_data = vec![0u8; compressed_size];
519        file.read_exact(&mut compressed_data)?;
520
521        // Decompress the block
522        let block_bytes = match self.metadata.compression_algorithm {
523            CompressionAlgorithm::Lz4 => {
524                oxiarc_lz4::decompress(&compressed_data, uncompressed_size).map_err(|e| {
525                    CoreError::ComputationError(ErrorContext::new(format!(
526                        "LZ4 decompression error: {}",
527                        e
528                    )))
529                })?
530            }
531            CompressionAlgorithm::Zstd => {
532                oxiarc_zstd::decompress(&compressed_data).map_err(|e| {
533                    CoreError::ComputationError(ErrorContext::new(format!(
534                        "Zstd decompression error: {}",
535                        e
536                    )))
537                })?
538            }
539            CompressionAlgorithm::Snappy => {
540                oxiarc_snappy::decompress(&compressed_data).map_err(|e| {
541                    CoreError::ComputationError(ErrorContext::new(format!(
542                        "Snappy decompression error: {}",
543                        e
544                    )))
545                })?
546            }
547        };
548
549        // Check that we got the expected number of bytes
550        if block_bytes.len() != uncompressed_size {
551            return Err(CoreError::ValueError(ErrorContext::new(format!(
552                "Block {} decompressed to {} bytes, expected {}",
553                blockidx,
554                block_bytes.len(),
555                uncompressed_size
556            ))));
557        }
558
559        // Convert bytes to elements
560        let num_elements = uncompressed_size / std::mem::size_of::<A>();
561        let mut elements = Vec::with_capacity(num_elements);
562
563        // Interpret bytes as elements (this is safe because A is Copy)
564        for chunk in block_bytes.chunks_exact(std::mem::size_of::<A>()) {
565            let element = unsafe { *(chunk.as_ptr() as *const A) };
566            elements.push(element);
567        }
568
569        Ok(elements)
570    }
571
572    /// Get a read-only view of the entire array.
573    ///
574    /// This will decompress all blocks and load them into memory.
575    ///
576    /// # Returns
577    ///
578    /// A read-only ndarray view of the data
579    pub fn readonly_array(&self) -> CoreResult<Array<A, IxDyn>> {
580        // Allocate an array for the result
581        let mut result = Array::from_elem(IxDyn(&self.metadata.shape), unsafe {
582            std::mem::zeroed::<A>()
583        });
584
585        // Load each block and copy it into the result
586        let mut offset = 0;
587        for blockidx in 0..self.metadata.num_blocks {
588            // Get the block (from cache if available)
589            let block = match self.block_cache.get_block(blockidx) {
590                Some(block) => block,
591                None => {
592                    let block = self.load_block(blockidx)?;
593                    self.block_cache.put_block(blockidx, block.clone());
594                    block
595                }
596            };
597
598            // Copy the block into the result
599            let start = offset;
600            let end = (start + block.len()).min(self.metadata.num_elements);
601            let slice = &mut result.as_slice_mut().expect("Operation failed")[start..end];
602            slice.copy_from_slice(&block[..(end - start)]);
603
604            // Update the offset
605            offset = end;
606        }
607
608        Ok(result)
609    }
610
    /// Get a specific element from the array.
    ///
    /// This will decompress only the block containing the element.
    ///
    /// # Arguments
    ///
    /// * `indices` - The indices of the element to get (one per dimension)
    ///
    /// # Returns
    ///
    /// The element at the specified indices
    ///
    /// # Errors
    ///
    /// Returns a dimension error if the number of indices is wrong, or an
    /// index error if any index is out of bounds for its dimension.
    pub fn get(&self, indices: &[usize]) -> CoreResult<A> {
        // Check that the indices are valid
        if indices.len() != self.metadata.shape.len() {
            return Err(CoreError::DimensionError(ErrorContext::new(format!(
                "Expected {} indices, got {}",
                self.metadata.shape.len(),
                indices.len()
            ))));
        }

        for (dim, &idx) in indices.iter().enumerate() {
            if idx >= self.metadata.shape[dim] {
                return Err(CoreError::IndexError(ErrorContext::new(format!(
                    "Index {} out of bounds for dimension {} (max {})",
                    idx,
                    dim,
                    self.metadata.shape[dim] - 1
                ))));
            }
        }

        // Calculate flat index (row-major order: the last dimension varies
        // fastest, so strides are accumulated from the last axis backwards)
        let mut flat_index = 0;
        let mut stride = 1;
        for i in (0..indices.len()).rev() {
            flat_index += indices[i] * stride;
            if i > 0 {
                stride *= self.metadata.shape[i];
            }
        }

        // Calculate block index and position within that block
        let blockidx = flat_index / self.metadata.block_size;
        let block_offset = flat_index % self.metadata.block_size;

        // Get the block (from cache if available)
        let block = match self.block_cache.get_block(blockidx) {
            Some(block) => block,
            None => {
                let block = self.load_block(blockidx)?;
                self.block_cache.put_block(blockidx, block.clone());
                block
            }
        };

        // Get the element (the offset check guards against a short final block)
        if block_offset < block.len() {
            Ok(block[block_offset])
        } else {
            Err(CoreError::IndexError(ErrorContext::new(format!(
                "Block offset {} out of bounds for block {} (max {})",
                block_offset,
                blockidx,
                block.len() - 1
            ))))
        }
    }
679
    /// Get a subset of the array as a new array.
    ///
    /// This will decompress only the blocks containing the subset.
    ///
    /// # Arguments
    ///
    /// * `ranges` - The ranges of indices to get for each dimension,
    ///   as half-open `(start, end)` pairs
    ///
    /// # Returns
    ///
    /// A new array containing the subset
    ///
    /// # Errors
    ///
    /// Returns a dimension error for a wrong number of ranges, a value error
    /// for an empty/inverted range, or an index error for an out-of-bounds end.
    pub fn slice(&self, ranges: &[(usize, usize)]) -> CoreResult<Array<A, IxDyn>> {
        // Check that the ranges are valid
        if ranges.len() != self.metadata.shape.len() {
            return Err(CoreError::DimensionError(ErrorContext::new(format!(
                "Expected {} ranges, got {}",
                self.metadata.shape.len(),
                ranges.len()
            ))));
        }

        // Calculate the shape of the result
        let mut resultshape = Vec::with_capacity(ranges.len());
        for (dim, &(start, end)) in ranges.iter().enumerate() {
            if start >= end {
                return Err(CoreError::ValueError(ErrorContext::new(format!(
                    "Invalid range for dimension {}: {}..{}",
                    dim, start, end
                ))));
            }
            if end > self.metadata.shape[dim] {
                return Err(CoreError::IndexError(ErrorContext::new(format!(
                    "Range {}..{} out of bounds for dimension {} (max {})",
                    start, end, dim, self.metadata.shape[dim]
                ))));
            }
            resultshape.push(end - start);
        }

        // Allocate an array for the result
        // (NOTE(review): `mem::zeroed` is only sound if all-zero bytes are a
        // valid `A` — true for plain numeric types; confirm for others)
        let mut result = Array::from_elem(IxDyn(&resultshape), unsafe { std::mem::zeroed::<A>() });

        // Calculate the total number of elements in the result
        let result_size = resultshape.iter().product::<usize>();

        // Create odometer-style index counters: `result_indices` tracks the
        // position within the result, `source_indices` the corresponding
        // position in the full array (offset by each range's start).
        let mut result_indices = vec![0; ranges.len()];
        let mut source_indices = Vec::with_capacity(ranges.len());
        for &(start, _) in ranges.iter() {
            source_indices.push(start);
        }

        // Iterate through all elements in the result
        for _result_flat_idx in 0..result_size {
            // Calculate source flat index (row-major, last axis fastest)
            let mut source_flat_idx = 0;
            let mut stride = 1;
            for i in (0..source_indices.len()).rev() {
                source_flat_idx += source_indices[i] * stride;
                if i > 0 {
                    stride *= self.metadata.shape[i];
                }
            }

            // Locate the block holding the source element
            let blockidx = source_flat_idx / self.metadata.block_size;
            let block_offset = source_flat_idx % self.metadata.block_size;

            // Get the block (from cache if available)
            let block = match self.block_cache.get_block(blockidx) {
                Some(block) => block,
                None => {
                    let block = self.load_block(blockidx)?;
                    self.block_cache.put_block(blockidx, block.clone());
                    block
                }
            };

            // Get the element and set it in the result
            // (out-of-range offsets in a short final block are skipped silently)
            if block_offset < block.len() {
                // Calculate the result flat index from the result counters
                let mut result_stride = 1;
                let mut result_flat_idx = 0;
                for i in (0..result_indices.len()).rev() {
                    result_flat_idx += result_indices[i] * result_stride;
                    if i > 0 {
                        result_stride *= resultshape[i];
                    }
                }

                // Set the element in the result
                let result_slice = result.as_slice_mut().expect("Operation failed");
                result_slice[result_flat_idx] = block[block_offset];
            }

            // Increment the indices odometer-style: bump the last axis and
            // carry into earlier axes when an axis wraps to its range start.
            for i in (0..ranges.len()).rev() {
                result_indices[i] += 1;
                source_indices[i] += 1;
                if result_indices[i] < resultshape[i] {
                    break;
                }
                result_indices[i] = 0;
                source_indices[i] = ranges[i].0;
            }
        }

        Ok(result)
    }
789
    /// Process the array in blocks, with each block loaded and decompressed on demand.
    ///
    /// This is useful for operations that can be performed on blocks independently.
    /// Uses the file's native block size and serial execution.
    ///
    /// # Arguments
    ///
    /// * `f` - A function that processes a block of elements; it receives the
    ///   block's elements and the block index
    ///
    /// # Returns
    ///
    /// A vector of results, one for each block
    pub fn process_blocks<F, R>(&self, f: F) -> CoreResult<Vec<R>>
    where
        F: Fn(&[A], usize) -> R + Send + Sync + 'static,
        R: Send + 'static,
    {
        // Delegate with parallel=false and the metadata's block size
        self.process_blocks_internal(f, false, None)
    }
808
    /// Process the array in blocks with a custom block size.
    ///
    /// The custom size overrides the file's native block size for iteration
    /// purposes; execution is serial.
    ///
    /// # Arguments
    ///
    /// * `blocksize` - The block size to use (in elements)
    /// * `f` - A function that processes a block of elements
    ///
    /// # Returns
    ///
    /// A vector of results, one for each block
    pub fn process_blocks_with_size<F, R>(&self, blocksize: usize, f: F) -> CoreResult<Vec<R>>
    where
        F: Fn(&[A], usize) -> R + Send + Sync + 'static,
        R: Send + 'static,
    {
        // Delegate with parallel=false and the caller-supplied block size
        self.process_blocks_internal(f, false, Some(blocksize))
    }
826
    /// Process the array in blocks in parallel.
    ///
    /// Only available with the `parallel` feature; blocks are distributed
    /// across threads, so `f` must be `Send + Sync`.
    ///
    /// # Arguments
    ///
    /// * `f` - A function that processes a block of elements
    ///
    /// # Returns
    ///
    /// A vector of results, one for each block
    #[cfg(feature = "parallel")]
    pub fn process_blocks_parallel<F, R>(&self, f: F) -> CoreResult<Vec<R>>
    where
        F: Fn(&[A], usize) -> R + Send + Sync + 'static,
        R: Send + 'static,
    {
        // Delegate with parallel=true and the metadata's block size
        self.process_blocks_internal(f, true, None)
    }
844
    /// Process the array in blocks in parallel with a custom block size.
    ///
    /// Only available with the `parallel` feature.
    ///
    /// # Arguments
    ///
    /// * `block_size` - The block size to use (in elements)
    /// * `f` - A function that processes a block of elements
    ///
    /// # Returns
    ///
    /// A vector of results, one for each block
    #[cfg(feature = "parallel")]
    pub fn process_blocks_parallel_with_size<F, R>(
        &self,
        block_size: usize,
        f: F,
    ) -> CoreResult<Vec<R>>
    where
        F: Fn(&[A], usize) -> R + Send + Sync + 'static,
        R: Send + 'static,
    {
        // Delegate with parallel=true and the caller-supplied block size
        self.process_blocks_internal(f, true, Some(block_size))
    }
867
868    /// Internal implementation of block processing.
869    #[cfg(not(feature = "parallel"))]
870    fn process_blocks_internal<F, R>(
871        &self,
872        mut f: F,
873        _parallel: bool,
874        custom_block_size: Option<usize>,
875    ) -> CoreResult<Vec<R>>
876    where
877        F: FnMut(&[A], usize) -> R,
878    {
879        // Determine block layout
880        let block_size = custom_block_size.unwrap_or(self.metadata.block_size);
881        let num_elements = self.metadata.num_elements;
882        let num_blocks = num_elements.div_ceil(block_size);
883
884        // Serial processing — f is FnMut so it requires mutable access per call
885        let mut results = Vec::with_capacity(num_blocks);
886        for blockidx in 0..num_blocks {
887            let start = blockidx * block_size;
888            let end = (start + block_size).min(num_elements);
889            let elements = self.load_elements(start, end)?;
890            results.push(f(&elements, blockidx));
891        }
892        Ok(results)
893    }
894
895    /// Internal implementation of block processing (parallel version).
896    #[cfg(feature = "parallel")]
897    fn process_blocks_internal<F, R>(
898        &self,
899        f: F,
900        parallel: bool,
901        custom_block_size: Option<usize>,
902    ) -> CoreResult<Vec<R>>
903    where
904        F: Fn(&[A], usize) -> R + Send + Sync + 'static,
905        R: Send + 'static,
906    {
907        // Determine block layout
908        let block_size = custom_block_size.unwrap_or(self.metadata.block_size);
909        let num_elements = self.metadata.num_elements;
910        let num_blocks = num_elements.div_ceil(block_size);
911
912        // Process blocks
913        if parallel {
914            use crate::parallel_ops::*;
915
916            return (0..num_blocks)
917                .into_par_iter()
918                .map(|blockidx| {
919                    // Calculate the range of elements for this block
920                    let start = blockidx * block_size;
921                    let end = (start + block_size).min(num_elements);
922
923                    // Load the elements for this block
924                    let elements = match self.load_elements(start, end) {
925                        Ok(elems) => elems,
926                        Err(e) => return Err(e),
927                    };
928
929                    // Apply the function to the block
930                    Ok(f(&elements, blockidx))
931                })
932                .collect::<Result<Vec<R>, CoreError>>();
933        }
934
935        // Serial processing (used when parallel=false)
936        (0..num_blocks)
937            .map(|blockidx| {
938                // Calculate the range of elements for this block
939                let start = blockidx * block_size;
940                let end = (start + block_size).min(num_elements);
941
942                // Load the elements for this block
943                let elements = self.load_elements(start, end)?;
944
945                // Apply the function to the block
946                Ok(f(&elements, blockidx))
947            })
948            .collect::<Result<Vec<R>, CoreError>>()
949    }
950
951    /// Load a range of elements from the array.
952    ///
953    /// This will decompress all blocks containing the range.
954    ///
955    /// # Arguments
956    ///
957    /// * `start` - The starting element index
958    /// * `end` - The ending element index (exclusive)
959    ///
960    /// # Returns
961    ///
962    /// A vector of elements in the range
963    fn load_elements(&self, start: usize, end: usize) -> CoreResult<Vec<A>> {
964        // Check bounds
965        if start >= self.metadata.num_elements {
966            return Err(CoreError::IndexError(ErrorContext::new(format!(
967                "Start index {} out of bounds (max {})",
968                start,
969                self.metadata.num_elements - 1
970            ))));
971        }
972        if end > self.metadata.num_elements {
973            return Err(CoreError::IndexError(ErrorContext::new(format!(
974                "End index {} out of bounds (max {})",
975                end, self.metadata.num_elements
976            ))));
977        }
978        if start >= end {
979            return Ok(Vec::new());
980        }
981
982        // Determine which blocks we need
983        let start_block = start / self.metadata.block_size;
984        let end_block = (end - 1) / self.metadata.block_size;
985
986        // Allocate space for the result
987        let mut result = Vec::with_capacity(end - start);
988
989        // Load each required block
990        for blockidx in start_block..=end_block {
991            // Get the block (from cache if available)
992            let block = match self.block_cache.get_block(blockidx) {
993                Some(block) => block,
994                None => {
995                    let block = self.load_block(blockidx)?;
996                    self.block_cache.put_block(blockidx, block.clone());
997                    block
998                }
999            };
1000
1001            // Calculate the range of elements we need from this block
1002            let block_start = blockidx * self.metadata.block_size;
1003            let block_end = block_start + block.len();
1004
1005            let range_start = start.max(block_start) - block_start;
1006            let range_end = end.min(block_end) - block_start;
1007
1008            // Copy the elements into the result
1009            result.extend_from_slice(&block[range_start..range_end]);
1010        }
1011
1012        Ok(result)
1013    }
1014}
1015
/// Cache for decompressed blocks.
///
/// This struct provides a LRU (Least Recently Used) cache for decompressed blocks.
/// Entries may also carry a time-to-live; all access goes through an internal
/// `RwLock`, so the cache can be used behind a shared reference.
#[derive(Debug)]
struct BlockCache<A: Clone + Copy + 'static + Send + Sync> {
    /// Maximum number of blocks to cache; the LRU entry is evicted when a
    /// new key would exceed this
    capacity: usize,

    /// Time-to-live for cached blocks; `None` means entries never expire
    ttl: Option<Duration>,

    /// Cache of decompressed blocks, keyed by block index
    cache: RwLock<HashMap<usize, CachedBlock<A>>>,
}
1030
/// A cached block with its timestamp.
#[derive(Debug, Clone)]
struct CachedBlock<A: Clone + Copy + 'static + Send + Sync> {
    /// The decompressed block data
    data: Vec<A>,

    /// The time when the block was last accessed; used both for LRU
    /// eviction ordering and for TTL expiry checks
    timestamp: Instant,
}
1040
1041impl<A: Clone + Copy + 'static + Send + Sync> BlockCache<A> {
1042    /// Create a new block cache.
1043    ///
1044    /// # Arguments
1045    ///
1046    /// * `capacity` - The maximum number of blocks to cache
1047    /// * `ttl` - The time-to-live for cached blocks
1048    fn new(capacity: usize, ttl: Option<Duration>) -> Self {
1049        Self {
1050            capacity,
1051            ttl,
1052            cache: RwLock::new(HashMap::new()),
1053        }
1054    }
1055
1056    /// Check if a block is in the cache.
1057    ///
1058    /// # Arguments
1059    ///
1060    /// * `blockidx` - The index of the block to check
1061    ///
1062    /// # Returns
1063    ///
1064    /// `true` if the block is in the cache, `false` otherwise
1065    fn has_block(&self, blockidx: usize) -> bool {
1066        let cache = self.cache.read().expect("Operation failed");
1067
1068        // Check if the block is in the cache
1069        if let Some(cached) = cache.get(&blockidx) {
1070            // Check if the block has expired
1071            if let Some(ttl) = self.ttl {
1072                if cached.timestamp.elapsed() > ttl {
1073                    return false;
1074                }
1075            }
1076
1077            true
1078        } else {
1079            false
1080        }
1081    }
1082
1083    /// Get a block from the cache.
1084    ///
1085    /// # Arguments
1086    ///
1087    /// * `blockidx` - The index of the block to get
1088    ///
1089    /// # Returns
1090    ///
1091    /// The block if it is in the cache, `None` otherwise
1092    fn get_block(&self, blockidx: usize) -> Option<Vec<A>> {
1093        let mut cache = self.cache.write().expect("Operation failed");
1094
1095        // Check if the block is in the cache
1096        if let Some(mut cached) = cache.remove(&blockidx) {
1097            // Check if the block has expired
1098            if let Some(ttl) = self.ttl {
1099                if cached.timestamp.elapsed() > ttl {
1100                    return None;
1101                }
1102            }
1103
1104            // Update the timestamp
1105            cached.timestamp = Instant::now();
1106
1107            // Put the block back in the cache
1108            let data = cached.data.clone();
1109            cache.insert(blockidx, cached);
1110
1111            Some(data)
1112        } else {
1113            None
1114        }
1115    }
1116
1117    /// Put a block in the cache.
1118    ///
1119    /// # Arguments
1120    ///
1121    /// * `blockidx` - The index of the block to put
1122    /// * `block` - The block data
1123    fn put_block(&self, blockidx: usize, block: Vec<A>) {
1124        let mut cache = self.cache.write().expect("Operation failed");
1125
1126        // Check if we need to evict a block
1127        if cache.len() >= self.capacity && !cache.contains_key(&blockidx) {
1128            // Find the least recently used block
1129            if let Some(lru_idx) = cache
1130                .iter()
1131                .min_by_key(|(_, cached)| cached.timestamp)
1132                .map(|(idx, _)| *idx)
1133            {
1134                cache.remove(&lru_idx);
1135            }
1136        }
1137
1138        // Add the block to the cache
1139        cache.insert(
1140            blockidx,
1141            CachedBlock {
1142                data: block,
1143                timestamp: Instant::now(),
1144            },
1145        );
1146    }
1147
1148    /// Clear the cache.
1149    #[allow(dead_code)]
1150    fn clear(&self) {
1151        let mut cache = self.cache.write().expect("Operation failed");
1152        cache.clear();
1153    }
1154
1155    /// Get the number of blocks in the cache.
1156    #[allow(dead_code)]
1157    fn len(&self) -> usize {
1158        let cache = self.cache.read().expect("Operation failed");
1159        cache.len()
1160    }
1161
1162    /// Check if the cache is empty.
1163    #[allow(dead_code)]
1164    fn is_empty(&self) -> bool {
1165        let cache = self.cache.read().expect("Operation failed");
1166        cache.is_empty()
1167    }
1168}
1169
#[cfg(test)]
mod tests {
    use super::*;
    use ::ndarray::Array2;
    use tempfile::tempdir;

    #[test]
    fn test_compressed_memmapped_array_1d() {
        // Work inside a throwaway directory so test files are cleaned up.
        let tmp = tempdir().expect("Operation failed");
        let path = tmp.path().join("test_compressed_1d.cmm");

        // 0.0, 1.0, ..., 999.0
        let values: Vec<f64> = (0..1000).map(f64::from).collect();

        // Build the compressed memory-mapped array in a single chain.
        let cmm = CompressedMemMapBuilder::new()
            .with_block_size(100)
            .with_algorithm(CompressionAlgorithm::Lz4)
            .with_level(1)
            .with_cache_size(4)
            .with_description("Test 1D array")
            .create_from_raw(&values, &[1000], &path)
            .expect("Operation failed");

        // Metadata should reflect the original layout.
        assert_eq!(cmm.shape(), &[1000]);
        assert_eq!(cmm.size(), 1000);
        assert_eq!(cmm.ndim(), 1);

        // Random access at the start of each block.
        for block in 0..10 {
            let idx = block * 100;
            assert_eq!(cmm.get(&[idx]).expect("Operation failed"), idx as f64);
        }

        // Slicing a sub-range.
        let slice = cmm.slice(&[(200, 300)]).expect("Operation failed");
        assert_eq!(slice.shape(), &[100]);
        for i in 0..100 {
            assert_eq!(slice[IxDyn(&[i])], (i + 200) as f64);
        }

        // Block-wise processing: one partial sum per block.
        let sums = cmm
            .process_blocks(|block, _| block.iter().sum::<f64>())
            .expect("Test: operation failed");
        assert_eq!(sums.len(), 10); // 1000 elements / 100 block size = 10 blocks

        // Materialize the whole array and check every element.
        let array = cmm.readonly_array().expect("Operation failed");
        assert_eq!(array.shape(), &[1000]);
        for i in 0..1000 {
            assert_eq!(array[IxDyn(&[i])], i as f64);
        }
    }

    #[test]
    fn test_compressed_memmapped_array_2d() {
        let tmp = tempdir().expect("Operation failed");
        let path = tmp.path().join("test_compressed_2d.cmm");

        // 10x10 matrix holding i*10 + j at position (i, j).
        let matrix = Array2::<f64>::from_shape_fn((10, 10), |(i, j)| (i * 10 + j) as f64);

        let cmm = CompressedMemMapBuilder::new()
            .with_block_size(25) // 5x5 blocks
            .with_algorithm(CompressionAlgorithm::Lz4)
            .with_level(1)
            .with_cache_size(4)
            .with_description("Test 2D array")
            .create(&matrix, &path)
            .expect("Operation failed");

        // Metadata should reflect the original layout.
        assert_eq!(cmm.shape(), &[10, 10]);
        assert_eq!(cmm.size(), 100);
        assert_eq!(cmm.ndim(), 2);

        // Every element is addressable by (row, col).
        for i in 0..10 {
            for j in 0..10 {
                let expected = (i * 10 + j) as f64;
                assert_eq!(cmm.get(&[i, j]).expect("Operation failed"), expected);
            }
        }

        // A rectangular slice keeps row/column offsets intact.
        let slice = cmm.slice(&[(2, 5), (3, 7)]).expect("Operation failed");
        assert_eq!(slice.shape(), &[3, 4]);
        for i in 0..3 {
            for j in 0..4 {
                assert_eq!(slice[IxDyn(&[i, j])], ((i + 2) * 10 + (j + 3)) as f64);
            }
        }

        // Full round-trip of the matrix.
        let array = cmm.readonly_array().expect("Operation failed");
        assert_eq!(array.shape(), &[10, 10]);
        for i in 0..10 {
            for j in 0..10 {
                assert_eq!(array[IxDyn(&[i, j])], (i * 10 + j) as f64);
            }
        }
    }

    #[test]
    fn test_different_compression_algorithms() {
        let tmp = tempdir().expect("Operation failed");

        let values: Vec<f64> = (0..1000).map(f64::from).collect();

        // Every supported codec must round-trip the data unchanged.
        for &algorithm in &[
            CompressionAlgorithm::Lz4,
            CompressionAlgorithm::Zstd,
            CompressionAlgorithm::Snappy,
        ] {
            let path = tmp.path().join(format!("test_{:?}.cmm", algorithm));

            let cmm = CompressedMemMapBuilder::new()
                .with_block_size(100)
                .with_algorithm(algorithm)
                .with_level(1)
                .with_cache_size(4)
                .create_from_raw(&values, &[1000], &path)
                .expect("Operation failed");

            let array = cmm.readonly_array().expect("Operation failed");
            for i in 0..1000 {
                assert_eq!(array[IxDyn(&[i])], i as f64);
            }
        }
    }

    #[test]
    fn test_block_cache() {
        let tmp = tempdir().expect("Operation failed");
        let path = tmp.path().join("test_cache.cmm");

        let values: Vec<f64> = (0..1000).map(f64::from).collect();

        // Deliberately tiny cache: only two blocks fit at once.
        let small_cache = CompressedMemMapBuilder::new()
            .with_block_size(100)
            .with_cache_size(2)
            .create_from_raw(&values, &[1000], &path)
            .expect("Operation failed");

        // Touch all ten blocks; the cache can only retain two of them.
        for block in 0..10 {
            small_cache.preload_block(block).expect("Operation failed");
        }
        assert_eq!(small_cache.block_cache.len(), 2);

        // Reading from an evicted block reloads it (evicting the LRU entry).
        assert_eq!(small_cache.get(&[0]).expect("Operation failed"), 0.0); // Block 0
        assert!(small_cache.block_cache.has_block(0));
    }

    #[test]
    fn test_block_preloading() {
        let tmp = tempdir().expect("Operation failed");
        let path = tmp.path().join("test_preload.cmm");

        let values: Vec<f64> = (0..1000).map(f64::from).collect();

        let cmm = CompressedMemMapBuilder::new()
            .with_block_size(100)
            .create_from_raw(&values, &[1000], &path)
            .expect("Test: operation failed");

        // Explicit preload puts block 5 in the cache...
        cmm.preload_block(5).expect("Operation failed");
        assert!(cmm.block_cache.has_block(5));

        // ...and reads inside the preloaded block return the right data.
        assert_eq!(cmm.get(&[550]).expect("Operation failed"), 550.0); // In block 5
    }
}