Skip to main content

scirs2_core/memory_efficient/
compressed_memmap.rs

1//! Compressed memory-mapped arrays.
2//!
3//! This module provides functionality for memory-mapping arrays with transparent
4//! compression and decompression. This can significantly reduce disk space
5//! requirements while maintaining the benefits of memory-mapping for large data.
6//!
7//! The implementation uses a block-based approach, where data is split into blocks
8//! that are compressed independently. This allows for efficient random access and
9//! partial loading of the data.
10
11use crate::error::{CoreError, CoreResult, ErrorContext};
12use ::ndarray::{Array, ArrayBase, Dimension, IxDyn, RawData};
13
14#[cfg(feature = "memory_compression")]
15use lz4::{Decoder, EncoderBuilder};
16
17use std::collections::HashMap;
18use std::fs::{File, OpenOptions};
19use std::io::{Read, Seek, SeekFrom, Write};
20use std::path::{Path, PathBuf};
21use std::sync::{Arc, RwLock};
22use std::time::{Duration, Instant};
23
/// Metadata for a compressed memory-mapped file.
///
/// `CompressedMemMapBuilder::create` serializes this struct as JSON into the
/// fixed-size header at the start of the file; the `open*` constructors of
/// `CompressedMemMappedArray` parse it back from that header.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct CompressedFileMetadata {
    /// Original array shape (one extent per dimension)
    pub shape: Vec<usize>,

    /// Size of a single element in bytes (`size_of::<A>()` when the file was written)
    pub element_size: usize,

    /// Total number of elements in the array
    pub num_elements: usize,

    /// Block size in elements (the last block may hold fewer)
    pub block_size: usize,

    /// Number of blocks
    pub num_blocks: usize,

    /// Byte offset of each block, measured from the start of the compressed file
    pub block_offsets: Vec<u64>,

    /// Compressed size of each block in bytes
    pub block_compressed_sizes: Vec<usize>,

    /// Uncompressed size of each block in bytes
    pub block_uncompressed_sizes: Vec<usize>,

    /// Compression algorithm used for every block
    pub compression_algorithm: CompressionAlgorithm,

    /// Compression level that was configured on the builder
    pub compression_level: i32,

    /// Creation timestamp (UTC)
    pub creation_time: chrono::DateTime<chrono::Utc>,

    /// Optional free-form description
    pub description: Option<String>,
}
63
/// Supported compression algorithms.
///
/// The chosen variant is stored in [`CompressedFileMetadata`] so that a file
/// is decompressed with the same algorithm it was written with.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, serde::Serialize, serde::Deserialize)]
pub enum CompressionAlgorithm {
    /// LZ4 compression (the default)
    #[default]
    Lz4,
    /// Zstd compression
    Zstd,
    /// Snappy compression (the compression level is not used for this variant)
    Snappy,
}
75
/// Builder for compressed memory-mapped arrays.
///
/// Collects the compression and cache settings, then writes the compressed
/// file via `create` / `create_from_raw`.
#[derive(Debug, Clone)]
pub struct CompressedMemMapBuilder {
    /// Block size in elements
    block_size: usize,

    /// Compression algorithm
    algorithm: CompressionAlgorithm,

    /// Compression level (interpretation depends on the algorithm)
    level: i32,

    /// Maximum cache size in blocks
    cache_size: usize,

    /// Cache time-to-live as a `Duration`; per `with_cache_ttl`, `None` means
    /// no time-based eviction
    cache_ttl: Option<Duration>,

    /// Optional description stored in the file metadata
    description: Option<String>,
}
97
98impl Default for CompressedMemMapBuilder {
99    fn default() -> Self {
100        Self {
101            block_size: 65536, // 64K elements by default
102            algorithm: CompressionAlgorithm::Lz4,
103            level: 1,                                  // Default compression level
104            cache_size: 32,                            // Cache 32 blocks by default
105            cache_ttl: Some(Duration::from_secs(300)), // 5 minute TTL
106            description: None,
107        }
108    }
109}
110
111impl CompressedMemMapBuilder {
112    /// Create a new builder with default settings.
113    pub fn new() -> Self {
114        Self::default()
115    }
116
117    /// Set the block size in elements.
118    ///
119    /// Larger blocks provide better compression but slower random access.
120    pub fn with_block_size(mut self, blocksize: usize) -> Self {
121        self.block_size = blocksize;
122        self
123    }
124
125    /// Set the compression algorithm.
126    pub fn with_algorithm(mut self, algorithm: CompressionAlgorithm) -> Self {
127        self.algorithm = algorithm;
128        self
129    }
130
131    /// Set the compression level.
132    ///
133    /// Higher levels provide better compression but slower compression speed.
134    /// The valid range depends on the algorithm.
135    /// - LZ4: 1-12
136    /// - Zstd: 1-22
137    /// - Snappy: Ignores this parameter
138    pub fn with_level(mut self, level: i32) -> Self {
139        self.level = level;
140        self
141    }
142
143    /// Set the maximum cache size in blocks.
144    ///
145    /// Larger cache sizes allow for more decompressed blocks to be held in memory,
146    /// potentially improving performance for repeated access patterns.
147    pub fn with_cache_size(mut self, cachesize: usize) -> Self {
148        self.cache_size = cachesize;
149        self
150    }
151
152    /// Set the cache time-to-live duration.
153    ///
154    /// Blocks will be evicted from cache after this duration if not accessed.
155    /// Set to None for no time-based eviction.
156    pub fn with_cache_ttl(mut self, ttl: Option<Duration>) -> Self {
157        self.cache_ttl = ttl;
158        self
159    }
160
161    /// Set an optional description.
162    pub fn with_description(mut self, description: impl Into<String>) -> Self {
163        self.description = Some(description.into());
164        self
165    }
166
167    /// Create a compressed memory-mapped array from existing data.
168    ///
169    /// # Arguments
170    ///
171    /// * `data` - The data to compress
172    /// * `path` - The file path to save the compressed data
173    ///
174    /// # Returns
175    ///
176    /// A compressed memory-mapped array
177    pub fn create<A, S, D>(
178        &self,
179        data: &ArrayBase<S, D>,
180        path: impl AsRef<Path>,
181    ) -> CoreResult<CompressedMemMappedArray<A>>
182    where
183        A: Clone + Copy + 'static + Send + Sync,
184        S: RawData<Elem = A>,
185        D: Dimension,
186    {
187        // Get array information
188        let shape = data.shape().to_vec();
189        let num_elements = data.len();
190        let element_size = std::mem::size_of::<A>();
191
192        // Calculate block information
193        let block_size = self.block_size.min(num_elements);
194        let num_blocks = num_elements.div_ceil(block_size);
195
196        // Prepare metadata
197        let mut metadata = CompressedFileMetadata {
198            shape,
199            element_size,
200            num_elements,
201            block_size,
202            num_blocks,
203            block_offsets: Vec::with_capacity(num_blocks),
204            block_compressed_sizes: Vec::with_capacity(num_blocks),
205            block_uncompressed_sizes: Vec::with_capacity(num_blocks),
206            compression_algorithm: self.algorithm,
207            compression_level: self.level,
208            creation_time: chrono::Utc::now(),
209            description: self.description.clone(),
210        };
211
212        // Create output file
213        let path = path.as_ref();
214        let mut file = OpenOptions::new()
215            .read(true)
216            .write(true)
217            .create(true)
218            .truncate(true)
219            .open(path)?;
220
221        // Reserve space for metadata (will write it after compression)
222        let metadata_placeholder = vec![0u8; 1024];
223        file.write_all(&metadata_placeholder)?;
224
225        // Compress each block
226        let data_ptr = data.as_ptr() as *const u8;
227        let mut current_offset = metadata_placeholder.len() as u64;
228
229        for blockidx in 0..num_blocks {
230            let start_element = blockidx * block_size;
231            let end_element = (start_element + block_size).min(num_elements);
232            let block_elements = end_element - start_element;
233            let uncompressed_size = block_elements * element_size;
234
235            // Record the block offset
236            metadata.block_offsets.push(current_offset);
237            metadata.block_uncompressed_sizes.push(uncompressed_size);
238
239            // Get the data for this block
240            let data_offset = start_element * element_size;
241            let block_data =
242                unsafe { std::slice::from_raw_parts(data_ptr.add(data_offset), uncompressed_size) };
243
244            // Compress the block
245            let compressed_data = match self.algorithm {
246                CompressionAlgorithm::Lz4 => {
247                    let mut encoder = EncoderBuilder::new()
248                        .level(self.level as u32)
249                        .build(Vec::new())?;
250                    encoder.write_all(block_data)?;
251                    let (compressed, result) = encoder.finish();
252                    result?;
253                    compressed
254                }
255                CompressionAlgorithm::Zstd => zstd::encode_all(block_data, self.level)?,
256                CompressionAlgorithm::Snappy => snap::raw::Encoder::new()
257                    .compress_vec(block_data)
258                    .map_err(|e| {
259                        CoreError::ComputationError(ErrorContext::new(format!(
260                            "Snappy compression error: {}",
261                            e
262                        )))
263                    })?,
264            };
265
266            // Record the compressed size
267            metadata.block_compressed_sizes.push(compressed_data.len());
268
269            // Write the compressed block
270            file.write_all(&compressed_data)?;
271
272            // Update the offset for the next block
273            current_offset += compressed_data.len() as u64;
274        }
275
276        // Write the metadata at the beginning of the file
277        let metadata_json = serde_json::to_string(&metadata).map_err(|e| {
278            CoreError::ValueError(ErrorContext::new(format!(
279                "Failed to serialize metadata: {}",
280                e
281            )))
282        })?;
283        let mut metadata_bytes = metadata_json.into_bytes();
284
285        // Ensure the metadata fits in the reserved space
286        if metadata_bytes.len() > metadata_placeholder.len() {
287            return Err(CoreError::ValueError(ErrorContext::new(format!(
288                "Metadata size ({} bytes) exceeds reserved space ({} bytes)",
289                metadata_bytes.len(),
290                metadata_placeholder.len()
291            ))));
292        }
293
294        // Pad the metadata to the reserved size
295        metadata_bytes.resize(metadata_placeholder.len(), 0);
296
297        // Write the metadata
298        file.seek(SeekFrom::Start(0))?;
299        file.write_all(&metadata_bytes)?;
300
301        // Create and return the compressed memory-mapped array
302        let compressed_mmap = CompressedMemMappedArray::open_impl(
303            path.to_path_buf(),
304            self.cache_size,
305            self.cache_ttl,
306        )?;
307
308        Ok(compressed_mmap)
309    }
310
311    /// Create a compressed memory-mapped array from raw data.
312    ///
313    /// # Arguments
314    ///
315    /// * `data` - The raw data to compress
316    /// * `shape` - The shape of the array
317    /// * `path` - The file path to save the compressed data
318    ///
319    /// # Returns
320    ///
321    /// A compressed memory-mapped array
322    pub fn create_from_raw<A>(
323        &self,
324        data: &[A],
325        shape: &[usize],
326        path: impl AsRef<Path>,
327    ) -> CoreResult<CompressedMemMappedArray<A>>
328    where
329        A: Clone + Copy + 'static + Send + Sync,
330    {
331        // Create ndarray from raw data
332        let array = Array::from_shape_vec(IxDyn(shape), data.to_vec())
333            .map_err(|e| CoreError::ShapeError(ErrorContext::new(format!("{e}"))))?;
334
335        // Create compressed memory-mapped array
336        self.create(&array, path)
337    }
338}
339
/// A memory-mapped array with transparent compression.
///
/// This struct provides a view into a compressed array stored on disk,
/// with transparent decompression of blocks as they are accessed.
/// Cloning shares the same block cache, since the cache is held in an `Arc`.
#[derive(Debug, Clone)]
pub struct CompressedMemMappedArray<A: Clone + Copy + 'static + Send + Sync> {
    /// Path to the compressed file (re-opened whenever a block is loaded)
    path: PathBuf,

    /// Metadata parsed from the file header
    metadata: CompressedFileMetadata,

    /// Block cache for decompressed blocks
    block_cache: Arc<BlockCache<A>>,

    /// Phantom data for type parameter
    phantom: std::marker::PhantomData<A>,
}
358
359impl<A: Clone + Copy + 'static + Send + Sync> CompressedMemMappedArray<A> {
360    /// Open a compressed memory-mapped array from a file.
361    ///
362    /// # Arguments
363    ///
364    /// * `path` - The path to the compressed file
365    ///
366    /// # Returns
367    ///
368    /// A compressed memory-mapped array
369    pub fn open<P: AsRef<Path>>(path: P) -> CoreResult<Self> {
370        // Use default cache settings
371        let cache_size = 32;
372        let cache_ttl = Some(Duration::from_secs(300));
373
374        Self::open_impl(path.as_ref().to_path_buf(), cache_size, cache_ttl)
375    }
376
377    /// Open a compressed memory-mapped array with custom cache settings.
378    ///
379    /// # Arguments
380    ///
381    /// * `path` - The path to the compressed file
382    /// * `cache_size` - The maximum number of blocks to cache
383    /// * `cache_ttl` - The time-to-live for cached blocks
384    ///
385    /// # Returns
386    ///
387    /// A compressed memory-mapped array
388    pub fn open_with_cache<P: AsRef<Path>>(
389        path: P,
390        cache_size: usize,
391        cache_ttl: Option<Duration>,
392    ) -> CoreResult<Self> {
393        Self::open_impl(path.as_ref().to_path_buf(), cache_size, cache_ttl)
394    }
395
396    /// Internal implementation of open.
397    fn open_impl(
398        path: PathBuf,
399        cache_size: usize,
400        cache_ttl: Option<Duration>,
401    ) -> CoreResult<Self> {
402        // Open the file
403        let mut file = File::open(&path)?;
404
405        // Read the metadata from the beginning of the file
406        let mut metadata_bytes = vec![0u8; 1024];
407        file.read_exact(&mut metadata_bytes)?;
408
409        // Parse the metadata
410        // from_utf8_lossy doesn't return an error, it replaces invalid utf8 sequences
411        let metadata_json = String::from_utf8_lossy(&metadata_bytes)
412            .trim_end_matches('\0')
413            .to_string();
414        let metadata: CompressedFileMetadata =
415            serde_json::from_str(&metadata_json).map_err(|e| {
416                CoreError::ValueError(ErrorContext::new(format!(
417                    "Failed to deserialize metadata: {}",
418                    e
419                )))
420            })?;
421
422        // Check that the element _size matches
423        let expected_element_size = std::mem::size_of::<A>();
424        if metadata.element_size != expected_element_size {
425            return Err(CoreError::ValueError(ErrorContext::new(format!(
426                "Element _size mismatch: expected {}, got {}",
427                expected_element_size, metadata.element_size
428            ))));
429        }
430
431        // Create the block cache
432        let block_cache = Arc::new(BlockCache::new(cache_size, cache_ttl));
433
434        // Create and return the array
435        Ok(Self {
436            path,
437            metadata,
438            block_cache,
439            phantom: std::marker::PhantomData,
440        })
441    }
442
443    /// Get the shape of the array.
444    pub fn shape(&self) -> &[usize] {
445        &self.metadata.shape
446    }
447
    /// Get the total number of elements in the array, as recorded in the
    /// file metadata.
    pub fn size(&self) -> usize {
        self.metadata.num_elements
    }
452
453    /// Get the number of dimensions of the array.
454    pub fn ndim(&self) -> usize {
455        self.metadata.shape.len()
456    }
457
    /// Get the metadata for the compressed file, as parsed from its header
    /// when the array was opened.
    pub fn metadata(&self) -> &CompressedFileMetadata {
        &self.metadata
    }
462
    /// Get the block size in elements, as recorded in the file metadata.
    pub fn block_size(&self) -> usize {
        self.metadata.block_size
    }
467
    /// Get the number of compressed blocks, as recorded in the file metadata.
    pub fn num_blocks(&self) -> usize {
        self.metadata.num_blocks
    }
472
473    /// Load a specific block into memory.
474    ///
475    /// This is useful for preloading blocks that will be accessed soon.
476    ///
477    /// # Arguments
478    ///
479    /// * `blockidx` - The index of the block to load
480    ///
481    /// # Returns
482    ///
483    /// `Ok(())` if successful, or an error
484    pub fn preload_block(&self, blockidx: usize) -> CoreResult<()> {
485        if blockidx >= self.metadata.num_blocks {
486            return Err(CoreError::IndexError(ErrorContext::new(format!(
487                "Block index {} out of bounds (max {})",
488                blockidx,
489                self.metadata.num_blocks - 1
490            ))));
491        }
492
493        // Check if the block is already cached
494        if self.block_cache.has_block(blockidx) {
495            return Ok(());
496        }
497
498        // Load and decompress the block
499        let block = self.load_block(blockidx)?;
500
501        // Add to cache
502        self.block_cache.put_block(blockidx, block);
503
504        Ok(())
505    }
506
507    /// Load a block from the compressed file.
508    fn load_block(&self, blockidx: usize) -> CoreResult<Vec<A>> {
509        // Open the file
510        let mut file = File::open(&self.path)?;
511
512        // Get block information
513        let offset = self.metadata.block_offsets[blockidx];
514        let compressed_size = self.metadata.block_compressed_sizes[blockidx];
515        let uncompressed_size = self.metadata.block_uncompressed_sizes[blockidx];
516
517        // Read the compressed block
518        file.seek(SeekFrom::Start(offset))?;
519        let mut compressed_data = vec![0u8; compressed_size];
520        file.read_exact(&mut compressed_data)?;
521
522        // Decompress the block
523        let block_bytes = match self.metadata.compression_algorithm {
524            CompressionAlgorithm::Lz4 => {
525                let mut decoder = Decoder::new(&compressed_data[..])?;
526                let mut decompressed = Vec::with_capacity(uncompressed_size);
527                decoder.read_to_end(&mut decompressed)?;
528                decompressed
529            }
530            CompressionAlgorithm::Zstd => zstd::decode_all(&compressed_data[..])?,
531            CompressionAlgorithm::Snappy => snap::raw::Decoder::new()
532                .decompress_vec(&compressed_data)
533                .map_err(|e| {
534                    CoreError::ComputationError(ErrorContext::new(format!(
535                        "Snappy decompression error: {}",
536                        e
537                    )))
538                })?,
539        };
540
541        // Check that we got the expected number of bytes
542        if block_bytes.len() != uncompressed_size {
543            return Err(CoreError::ValueError(ErrorContext::new(format!(
544                "Block {} decompressed to {} bytes, expected {}",
545                blockidx,
546                block_bytes.len(),
547                uncompressed_size
548            ))));
549        }
550
551        // Convert bytes to elements
552        let num_elements = uncompressed_size / std::mem::size_of::<A>();
553        let mut elements = Vec::with_capacity(num_elements);
554
555        // Interpret bytes as elements (this is safe because A is Copy)
556        for chunk in block_bytes.chunks_exact(std::mem::size_of::<A>()) {
557            let element = unsafe { *(chunk.as_ptr() as *const A) };
558            elements.push(element);
559        }
560
561        Ok(elements)
562    }
563
564    /// Get a read-only view of the entire array.
565    ///
566    /// This will decompress all blocks and load them into memory.
567    ///
568    /// # Returns
569    ///
570    /// A read-only ndarray view of the data
571    pub fn readonly_array(&self) -> CoreResult<Array<A, IxDyn>> {
572        // Allocate an array for the result
573        let mut result = Array::from_elem(IxDyn(&self.metadata.shape), unsafe {
574            std::mem::zeroed::<A>()
575        });
576
577        // Load each block and copy it into the result
578        let mut offset = 0;
579        for blockidx in 0..self.metadata.num_blocks {
580            // Get the block (from cache if available)
581            let block = match self.block_cache.get_block(blockidx) {
582                Some(block) => block,
583                None => {
584                    let block = self.load_block(blockidx)?;
585                    self.block_cache.put_block(blockidx, block.clone());
586                    block
587                }
588            };
589
590            // Copy the block into the result
591            let start = offset;
592            let end = (start + block.len()).min(self.metadata.num_elements);
593            let slice = &mut result.as_slice_mut().expect("Operation failed")[start..end];
594            slice.copy_from_slice(&block[..(end - start)]);
595
596            // Update the offset
597            offset = end;
598        }
599
600        Ok(result)
601    }
602
603    /// Get a specific element from the array.
604    ///
605    /// This will decompress only the block containing the element.
606    ///
607    /// # Arguments
608    ///
609    /// * `indices` - The indices of the element to get
610    ///
611    /// # Returns
612    ///
613    /// The element at the specified indices
614    pub fn get(&self, indices: &[usize]) -> CoreResult<A> {
615        // Check that the indices are valid
616        if indices.len() != self.metadata.shape.len() {
617            return Err(CoreError::DimensionError(ErrorContext::new(format!(
618                "Expected {} indices, got {}",
619                self.metadata.shape.len(),
620                indices.len()
621            ))));
622        }
623
624        for (dim, &idx) in indices.iter().enumerate() {
625            if idx >= self.metadata.shape[dim] {
626                return Err(CoreError::IndexError(ErrorContext::new(format!(
627                    "Index {} out of bounds for dimension {} (max {})",
628                    idx,
629                    dim,
630                    self.metadata.shape[dim] - 1
631                ))));
632            }
633        }
634
635        // Calculate flat index
636        let mut flat_index = 0;
637        let mut stride = 1;
638        for i in (0..indices.len()).rev() {
639            flat_index += indices[i] * stride;
640            if i > 0 {
641                stride *= self.metadata.shape[i];
642            }
643        }
644
645        // Calculate block index
646        let blockidx = flat_index / self.metadata.block_size;
647        let block_offset = flat_index % self.metadata.block_size;
648
649        // Get the block (from cache if available)
650        let block = match self.block_cache.get_block(blockidx) {
651            Some(block) => block,
652            None => {
653                let block = self.load_block(blockidx)?;
654                self.block_cache.put_block(blockidx, block.clone());
655                block
656            }
657        };
658
659        // Get the element
660        if block_offset < block.len() {
661            Ok(block[block_offset])
662        } else {
663            Err(CoreError::IndexError(ErrorContext::new(format!(
664                "Block offset {} out of bounds for block {} (max {})",
665                block_offset,
666                blockidx,
667                block.len() - 1
668            ))))
669        }
670    }
671
672    /// Get a subset of the array as a new array.
673    ///
674    /// This will decompress only the blocks containing the subset.
675    ///
676    /// # Arguments
677    ///
678    /// * `ranges` - The ranges of indices to get for each dimension
679    ///
680    /// # Returns
681    ///
682    /// A new array containing the subset
683    pub fn slice(&self, ranges: &[(usize, usize)]) -> CoreResult<Array<A, IxDyn>> {
684        // Check that the ranges are valid
685        if ranges.len() != self.metadata.shape.len() {
686            return Err(CoreError::DimensionError(ErrorContext::new(format!(
687                "Expected {} ranges, got {}",
688                self.metadata.shape.len(),
689                ranges.len()
690            ))));
691        }
692
693        // Calculate the shape of the result
694        let mut resultshape = Vec::with_capacity(ranges.len());
695        for (dim, &(start, end)) in ranges.iter().enumerate() {
696            if start >= end {
697                return Err(CoreError::ValueError(ErrorContext::new(format!(
698                    "Invalid range for dimension {}: {}..{}",
699                    dim, start, end
700                ))));
701            }
702            if end > self.metadata.shape[dim] {
703                return Err(CoreError::IndexError(ErrorContext::new(format!(
704                    "Range {}..{} out of bounds for dimension {} (max {})",
705                    start, end, dim, self.metadata.shape[dim]
706                ))));
707            }
708            resultshape.push(end - start);
709        }
710
711        // Allocate an array for the result
712        let mut result = Array::from_elem(IxDyn(&resultshape), unsafe { std::mem::zeroed::<A>() });
713
714        // Calculate the total number of elements in the result
715        let result_size = resultshape.iter().product::<usize>();
716
717        // Create iterators for the result indices
718        let mut result_indices = vec![0; ranges.len()];
719        let mut source_indices = Vec::with_capacity(ranges.len());
720        for &(start, _) in ranges.iter() {
721            source_indices.push(start);
722        }
723
724        // Iterate through all elements in the result
725        for _result_flat_idx in 0..result_size {
726            // Calculate source flat index
727            let mut source_flat_idx = 0;
728            let mut stride = 1;
729            for i in (0..source_indices.len()).rev() {
730                source_flat_idx += source_indices[i] * stride;
731                if i > 0 {
732                    stride *= self.metadata.shape[i];
733                }
734            }
735
736            // Get the element from the source
737            let blockidx = source_flat_idx / self.metadata.block_size;
738            let block_offset = source_flat_idx % self.metadata.block_size;
739
740            // Get the block (from cache if available)
741            let block = match self.block_cache.get_block(blockidx) {
742                Some(block) => block,
743                None => {
744                    let block = self.load_block(blockidx)?;
745                    self.block_cache.put_block(blockidx, block.clone());
746                    block
747                }
748            };
749
750            // Get the element and set it in the result
751            if block_offset < block.len() {
752                // Calculate the result flat index
753                let mut result_stride = 1;
754                let mut result_flat_idx = 0;
755                for i in (0..result_indices.len()).rev() {
756                    result_flat_idx += result_indices[i] * result_stride;
757                    if i > 0 {
758                        result_stride *= resultshape[i];
759                    }
760                }
761
762                // Set the element in the result
763                let result_slice = result.as_slice_mut().expect("Operation failed");
764                result_slice[result_flat_idx] = block[block_offset];
765            }
766
767            // Increment the indices
768            for i in (0..ranges.len()).rev() {
769                result_indices[i] += 1;
770                source_indices[i] += 1;
771                if result_indices[i] < resultshape[i] {
772                    break;
773                }
774                result_indices[i] = 0;
775                source_indices[i] = ranges[i].0;
776            }
777        }
778
779        Ok(result)
780    }
781
782    /// Process the array in blocks, with each block loaded and decompressed on demand.
783    ///
784    /// This is useful for operations that can be performed on blocks independently.
785    ///
786    /// # Arguments
787    ///
788    /// * `f` - A function that processes a block of elements
789    ///
790    /// # Returns
791    ///
792    /// A vector of results, one for each block
793    pub fn process_blocks<F, R>(&self, f: F) -> CoreResult<Vec<R>>
794    where
795        F: Fn(&[A], usize) -> R + Send + Sync + 'static,
796        R: Send + 'static,
797    {
798        self.process_blocks_internal(f, false, None)
799    }
800
801    /// Process the array in blocks with a custom block size.
802    ///
803    /// # Arguments
804    ///
805    /// * `block_size` - The block size to use (in elements)
806    /// * `f` - A function that processes a block of elements
807    ///
808    /// # Returns
809    ///
810    /// A vector of results, one for each block
811    pub fn process_blocks_with_size<F, R>(&self, blocksize: usize, f: F) -> CoreResult<Vec<R>>
812    where
813        F: Fn(&[A], usize) -> R + Send + Sync + 'static,
814        R: Send + 'static,
815    {
816        self.process_blocks_internal(f, false, Some(blocksize))
817    }
818
819    /// Process the array in blocks in parallel.
820    ///
821    /// # Arguments
822    ///
823    /// * `f` - A function that processes a block of elements
824    ///
825    /// # Returns
826    ///
827    /// A vector of results, one for each block
828    #[cfg(feature = "parallel")]
829    pub fn process_blocks_parallel<F, R>(&self, f: F) -> CoreResult<Vec<R>>
830    where
831        F: Fn(&[A], usize) -> R + Send + Sync + 'static,
832        R: Send + 'static,
833    {
834        self.process_blocks_internal(f, true, None)
835    }
836
837    /// Process the array in blocks in parallel with a custom block size.
838    ///
839    /// # Arguments
840    ///
841    /// * `block_size` - The block size to use (in elements)
842    /// * `f` - A function that processes a block of elements
843    ///
844    /// # Returns
845    ///
846    /// A vector of results, one for each block
847    #[cfg(feature = "parallel")]
848    pub fn process_blocks_parallel_with_size<F, R>(
849        &self,
850        block_size: usize,
851        f: F,
852    ) -> CoreResult<Vec<R>>
853    where
854        F: Fn(&[A], usize) -> R + Send + Sync + 'static,
855        R: Send + 'static,
856    {
857        self.process_blocks_internal(f, true, Some(block_size))
858    }
859
860    /// Internal implementation of block processing.
861    #[cfg(not(feature = "parallel"))]
862    fn process_blocks_internal<F, R>(
863        &self,
864        mut f: F,
865        _parallel: bool,
866        custom_block_size: Option<usize>,
867    ) -> CoreResult<Vec<R>>
868    where
869        F: FnMut(&[A], usize) -> R,
870    {
871        // Determine block layout
872        let block_size = custom_block_size.unwrap_or(self.metadata.block_size);
873        let num_elements = self.metadata.num_elements;
874        let num_blocks = num_elements.div_ceil(block_size);
875
876        // Serial processing — f is FnMut so it requires mutable access per call
877        let mut results = Vec::with_capacity(num_blocks);
878        for blockidx in 0..num_blocks {
879            let start = blockidx * block_size;
880            let end = (start + block_size).min(num_elements);
881            let elements = self.load_elements(start, end)?;
882            results.push(f(&elements, blockidx));
883        }
884        Ok(results)
885    }
886
887    /// Internal implementation of block processing (parallel version).
888    #[cfg(feature = "parallel")]
889    fn process_blocks_internal<F, R>(
890        &self,
891        f: F,
892        parallel: bool,
893        custom_block_size: Option<usize>,
894    ) -> CoreResult<Vec<R>>
895    where
896        F: Fn(&[A], usize) -> R + Send + Sync + 'static,
897        R: Send + 'static,
898    {
899        // Determine block layout
900        let block_size = custom_block_size.unwrap_or(self.metadata.block_size);
901        let num_elements = self.metadata.num_elements;
902        let num_blocks = num_elements.div_ceil(block_size);
903
904        // Process blocks
905        if parallel {
906            use crate::parallel_ops::*;
907
908            return (0..num_blocks)
909                .into_par_iter()
910                .map(|blockidx| {
911                    // Calculate the range of elements for this block
912                    let start = blockidx * block_size;
913                    let end = (start + block_size).min(num_elements);
914
915                    // Load the elements for this block
916                    let elements = match self.load_elements(start, end) {
917                        Ok(elems) => elems,
918                        Err(e) => return Err(e),
919                    };
920
921                    // Apply the function to the block
922                    Ok(f(&elements, blockidx))
923                })
924                .collect::<Result<Vec<R>, CoreError>>();
925        }
926
927        // Serial processing (used when parallel=false)
928        (0..num_blocks)
929            .map(|blockidx| {
930                // Calculate the range of elements for this block
931                let start = blockidx * block_size;
932                let end = (start + block_size).min(num_elements);
933
934                // Load the elements for this block
935                let elements = self.load_elements(start, end)?;
936
937                // Apply the function to the block
938                Ok(f(&elements, blockidx))
939            })
940            .collect::<Result<Vec<R>, CoreError>>()
941    }
942
943    /// Load a range of elements from the array.
944    ///
945    /// This will decompress all blocks containing the range.
946    ///
947    /// # Arguments
948    ///
949    /// * `start` - The starting element index
950    /// * `end` - The ending element index (exclusive)
951    ///
952    /// # Returns
953    ///
954    /// A vector of elements in the range
955    fn load_elements(&self, start: usize, end: usize) -> CoreResult<Vec<A>> {
956        // Check bounds
957        if start >= self.metadata.num_elements {
958            return Err(CoreError::IndexError(ErrorContext::new(format!(
959                "Start index {} out of bounds (max {})",
960                start,
961                self.metadata.num_elements - 1
962            ))));
963        }
964        if end > self.metadata.num_elements {
965            return Err(CoreError::IndexError(ErrorContext::new(format!(
966                "End index {} out of bounds (max {})",
967                end, self.metadata.num_elements
968            ))));
969        }
970        if start >= end {
971            return Ok(Vec::new());
972        }
973
974        // Determine which blocks we need
975        let start_block = start / self.metadata.block_size;
976        let end_block = (end - 1) / self.metadata.block_size;
977
978        // Allocate space for the result
979        let mut result = Vec::with_capacity(end - start);
980
981        // Load each required block
982        for blockidx in start_block..=end_block {
983            // Get the block (from cache if available)
984            let block = match self.block_cache.get_block(blockidx) {
985                Some(block) => block,
986                None => {
987                    let block = self.load_block(blockidx)?;
988                    self.block_cache.put_block(blockidx, block.clone());
989                    block
990                }
991            };
992
993            // Calculate the range of elements we need from this block
994            let block_start = blockidx * self.metadata.block_size;
995            let block_end = block_start + block.len();
996
997            let range_start = start.max(block_start) - block_start;
998            let range_end = end.min(block_end) - block_start;
999
1000            // Copy the elements into the result
1001            result.extend_from_slice(&block[range_start..range_end]);
1002        }
1003
1004        Ok(result)
1005    }
1006}
1007
/// Cache for decompressed blocks.
///
/// This struct provides a LRU (Least Recently Used) cache for decompressed
/// blocks, keyed by block index. Entries may additionally expire after an
/// optional time-to-live measured from their last access.
#[derive(Debug)]
struct BlockCache<A: Clone + Copy + 'static + Send + Sync> {
    /// Maximum number of blocks to cache (`0` disables caching)
    capacity: usize,

    /// Time-to-live for cached blocks (`None` means blocks never expire)
    ttl: Option<Duration>,

    /// Cache of decompressed blocks
    cache: RwLock<HashMap<usize, CachedBlock<A>>>,
}

/// A cached block with its timestamp.
#[derive(Debug, Clone)]
struct CachedBlock<A: Clone + Copy + 'static + Send + Sync> {
    /// The decompressed block data
    data: Vec<A>,

    /// The time when the block was last accessed
    timestamp: Instant,
}

impl<A: Clone + Copy + 'static + Send + Sync> BlockCache<A> {
    /// Create a new block cache.
    ///
    /// # Arguments
    ///
    /// * `capacity` - The maximum number of blocks to cache
    /// * `ttl` - The time-to-live for cached blocks
    fn new(capacity: usize, ttl: Option<Duration>) -> Self {
        Self {
            capacity,
            ttl,
            cache: RwLock::new(HashMap::new()),
        }
    }

    /// Tell whether `cached` has outlived the configured time-to-live.
    ///
    /// Always `false` when no time-to-live is configured.
    fn is_expired(&self, cached: &CachedBlock<A>) -> bool {
        self.ttl
            .is_some_and(|ttl| cached.timestamp.elapsed() > ttl)
    }

    /// Check if a block is in the cache.
    ///
    /// This is a read-only query: it neither refreshes the block's timestamp
    /// nor removes an expired entry.
    ///
    /// # Arguments
    ///
    /// * `blockidx` - The index of the block to check
    ///
    /// # Returns
    ///
    /// `true` if the block is in the cache and not expired, `false` otherwise
    fn has_block(&self, blockidx: usize) -> bool {
        let cache = self.cache.read().expect("Operation failed");

        cache
            .get(&blockidx)
            .is_some_and(|cached| !self.is_expired(cached))
    }

    /// Get a block from the cache.
    ///
    /// A hit refreshes the block's timestamp; an expired entry is removed.
    ///
    /// # Arguments
    ///
    /// * `blockidx` - The index of the block to get
    ///
    /// # Returns
    ///
    /// A copy of the block if it is cached and not expired, `None` otherwise
    fn get_block(&self, blockidx: usize) -> Option<Vec<A>> {
        use std::collections::hash_map::Entry;

        let mut cache = self.cache.write().expect("Operation failed");

        // A single entry lookup covers the hit, the expiry and the miss case.
        match cache.entry(blockidx) {
            Entry::Occupied(mut slot) => {
                if self.is_expired(slot.get()) {
                    slot.remove();
                    None
                } else {
                    let cached = slot.get_mut();
                    cached.timestamp = Instant::now();
                    Some(cached.data.clone())
                }
            }
            Entry::Vacant(_) => None,
        }
    }

    /// Put a block in the cache.
    ///
    /// When the cache is full and `blockidx` is not already present, the entry
    /// with the oldest timestamp is evicted first. A cache created with a
    /// capacity of zero stores nothing.
    ///
    /// # Arguments
    ///
    /// * `blockidx` - The index of the block to put
    /// * `block` - The block data
    fn put_block(&self, blockidx: usize, block: Vec<A>) {
        // Without this guard an empty map has nothing to evict and the insert
        // below would leave one block in a cache meant to hold none.
        if self.capacity == 0 {
            return;
        }

        let mut cache = self.cache.write().expect("Operation failed");

        // Check if we need to evict a block
        if cache.len() >= self.capacity && !cache.contains_key(&blockidx) {
            // Find the least recently used block
            let lru_idx = cache
                .iter()
                .min_by_key(|(_, cached)| cached.timestamp)
                .map(|(idx, _)| *idx);
            if let Some(lru_idx) = lru_idx {
                cache.remove(&lru_idx);
            }
        }

        // Add the block to the cache
        cache.insert(
            blockidx,
            CachedBlock {
                data: block,
                timestamp: Instant::now(),
            },
        );
    }

    /// Clear the cache.
    #[allow(dead_code)]
    fn clear(&self) {
        let mut cache = self.cache.write().expect("Operation failed");
        cache.clear();
    }

    /// Get the number of blocks in the cache (expired entries included).
    #[allow(dead_code)]
    fn len(&self) -> usize {
        let cache = self.cache.read().expect("Operation failed");
        cache.len()
    }

    /// Check if the cache is empty.
    #[allow(dead_code)]
    fn is_empty(&self) -> bool {
        let cache = self.cache.read().expect("Operation failed");
        cache.is_empty()
    }
}
1161
#[cfg(test)]
mod tests {
    use super::*;
    use ::ndarray::Array2;
    use tempfile::tempdir;

    /// Build the test vector `[0.0, 1.0, ..., (len - 1) as f64]`.
    fn ramp(len: usize) -> Vec<f64> {
        (0..len).map(|i| i as f64).collect()
    }

    /// Round-trips a 1-D array and checks metadata, element access, slicing,
    /// block processing and full loading.
    #[test]
    fn test_compressed_memmapped_array_1d() {
        // Create a temporary directory for our test files
        let dir = tempdir().expect("failed to create temporary directory");
        let file_path = dir.path().join("test_compressed_1d.cmm");

        // Create test data
        let data = ramp(1000);

        // Create a builder
        let builder = CompressedMemMapBuilder::new()
            .with_block_size(100)
            .with_algorithm(CompressionAlgorithm::Lz4)
            .with_level(1)
            .with_cache_size(4)
            .with_description("Test 1D array");

        // Create the compressed memory-mapped array
        let cmm = builder
            .create_from_raw(&data, &[1000], &file_path)
            .expect("failed to create compressed 1D array");

        // Check metadata
        assert_eq!(cmm.shape(), &[1000]);
        assert_eq!(cmm.size(), 1000);
        assert_eq!(cmm.ndim(), 1);

        // Test random access
        for i in 0..10 {
            let val = cmm.get(&[i * 100]).expect("failed to read element");
            assert_eq!(val, (i * 100) as f64);
        }

        // Test slicing
        let slice = cmm.slice(&[(200, 300)]).expect("failed to slice array");
        assert_eq!(slice.shape(), &[100]);
        for i in 0..100 {
            assert_eq!(slice[crate::ndarray::IxDyn(&[i])], (i + 200) as f64);
        }

        // Test block processing
        let sums = cmm
            .process_blocks(|block, _| block.iter().sum::<f64>())
            .expect("failed to process blocks");

        assert_eq!(sums.len(), 10); // 1000 elements / 100 block size = 10 blocks

        // Block k holds 100k..=100k+99, so its sum is 100 * 100k + (0 + ... + 99).
        // All values are small integers, hence the f64 sums are exact.
        for (blockidx, sum) in sums.iter().enumerate() {
            assert_eq!(*sum, (blockidx * 10_000 + 4_950) as f64);
        }

        // Test loading the entire array
        let array = cmm.readonly_array().expect("failed to load full array");
        assert_eq!(array.shape(), &[1000]);
        for i in 0..1000 {
            assert_eq!(array[crate::ndarray::IxDyn(&[i])], i as f64);
        }
    }

    /// Round-trips a 10x10 matrix and checks metadata, element access,
    /// slicing and full loading.
    #[test]
    fn test_compressed_memmapped_array_2d() {
        // Create a temporary directory for our test files
        let dir = tempdir().expect("failed to create temporary directory");
        let file_path = dir.path().join("test_compressed_2d.cmm");

        // Create test data - 10x10 matrix
        let data = Array2::<f64>::from_shape_fn((10, 10), |(i, j)| (i * 10 + j) as f64);

        // Create a builder
        let builder = CompressedMemMapBuilder::new()
            .with_block_size(25) // 25 elements per block
            .with_algorithm(CompressionAlgorithm::Lz4)
            .with_level(1)
            .with_cache_size(4)
            .with_description("Test 2D array");

        // Create the compressed memory-mapped array
        let cmm = builder
            .create(&data, &file_path)
            .expect("failed to create compressed 2D array");

        // Check metadata
        assert_eq!(cmm.shape(), &[10, 10]);
        assert_eq!(cmm.size(), 100);
        assert_eq!(cmm.ndim(), 2);

        // Test random access
        for i in 0..10 {
            for j in 0..10 {
                let val = cmm.get(&[i, j]).expect("failed to read element");
                assert_eq!(val, (i * 10 + j) as f64);
            }
        }

        // Test slicing
        let slice = cmm
            .slice(&[(2, 5), (3, 7)])
            .expect("failed to slice array");
        assert_eq!(slice.shape(), &[3, 4]);
        for i in 0..3 {
            for j in 0..4 {
                assert_eq!(
                    slice[crate::ndarray::IxDyn(&[i, j])],
                    ((i + 2) * 10 + (j + 3)) as f64
                );
            }
        }

        // Test loading the entire array
        let array = cmm.readonly_array().expect("failed to load full array");
        assert_eq!(array.shape(), &[10, 10]);
        for i in 0..10 {
            for j in 0..10 {
                assert_eq!(array[crate::ndarray::IxDyn(&[i, j])], (i * 10 + j) as f64);
            }
        }
    }

    /// Round-trips the same data through every compression algorithm.
    #[test]
    fn test_different_compression_algorithms() {
        // Create a temporary directory for our test files
        let dir = tempdir().expect("failed to create temporary directory");

        // Create test data
        let data = ramp(1000);

        // Test each compression algorithm
        for algorithm in &[
            CompressionAlgorithm::Lz4,
            CompressionAlgorithm::Zstd,
            CompressionAlgorithm::Snappy,
        ] {
            let file_path = dir.path().join(format!("test_{:?}.cmm", algorithm));

            // Create a builder
            let builder = CompressedMemMapBuilder::new()
                .with_block_size(100)
                .with_algorithm(*algorithm)
                .with_level(1)
                .with_cache_size(4);

            // Create the compressed memory-mapped array
            let cmm = builder
                .create_from_raw(&data, &[1000], &file_path)
                .expect("failed to create compressed array");

            // Test loading the entire array
            let array = cmm.readonly_array().expect("failed to load full array");
            for i in 0..1000 {
                assert_eq!(array[crate::ndarray::IxDyn(&[i])], i as f64);
            }
        }
    }

    /// Checks that the block cache never grows beyond its capacity and that
    /// accessing an uncached block brings it into the cache.
    #[test]
    fn test_block_cache() {
        // Create a temporary directory for our test files
        let dir = tempdir().expect("failed to create temporary directory");
        let file_path = dir.path().join("test_cache.cmm");

        // Create test data
        let data = ramp(1000);

        // Create a compressed array with a very small cache
        let small_cache = CompressedMemMapBuilder::new()
            .with_block_size(100)
            .with_cache_size(2)
            .create_from_raw(&data, &[1000], &file_path)
            .expect("failed to create compressed array");

        // Test cache behavior
        // First, load all blocks to fill the cache
        for i in 0..10 {
            small_cache.preload_block(i).expect("failed to preload block");
        }

        // Check the cache size - should be 2 (capacity)
        assert_eq!(small_cache.block_cache.len(), 2);

        // Now access a block that's not in the cache
        // This should evict the least recently used block
        let val = small_cache.get(&[0]).expect("failed to read element"); // Block 0
        assert_eq!(val, 0.0);

        // Check that the block is now in the cache
        assert!(small_cache.block_cache.has_block(0));

        // Bringing block 0 in must have evicted another block
        assert_eq!(small_cache.block_cache.len(), 2);
    }

    /// Checks that a preloaded block lands in the cache and serves reads.
    #[test]
    fn test_block_preloading() {
        // Create a temporary directory for our test files
        let dir = tempdir().expect("failed to create temporary directory");
        let file_path = dir.path().join("test_preload.cmm");

        // Create test data
        let data = ramp(1000);

        // Create the compressed memory-mapped array
        let cmm = CompressedMemMapBuilder::new()
            .with_block_size(100)
            .create_from_raw(&data, &[1000], &file_path)
            .expect("failed to create compressed array");

        // Preload a block
        cmm.preload_block(5).expect("failed to preload block");

        // Check that the block is now in the cache
        assert!(cmm.block_cache.has_block(5));

        // Access an element from the preloaded block
        let val = cmm.get(&[550]).expect("failed to read element"); // In block 5
        assert_eq!(val, 550.0);
    }
}