scirs2_core/memory_efficient/
compressed_memmap.rs

1//! Compressed memory-mapped arrays.
2//!
3//! This module provides functionality for memory-mapping arrays with transparent
4//! compression and decompression. This can significantly reduce disk space
5//! requirements while maintaining the benefits of memory-mapping for large data.
6//!
7//! The implementation uses a block-based approach, where data is split into blocks
8//! that are compressed independently. This allows for efficient random access and
9//! partial loading of the data.
10
11use crate::error::{CoreError, CoreResult, ErrorContext};
12use ::ndarray::{Array, ArrayBase, Dimension, IxDyn, RawData};
13
14#[cfg(feature = "memory_compression")]
15use lz4::{Decoder, EncoderBuilder};
16
17use std::collections::HashMap;
18use std::fs::{File, OpenOptions};
19use std::io::{Read, Seek, SeekFrom, Write};
20use std::path::{Path, PathBuf};
21use std::sync::{Arc, RwLock};
22use std::time::{Duration, Instant};
23
24/// Metadata for a compressed memory-mapped file
25#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
26pub struct CompressedFileMetadata {
27    /// Original array shape
28    pub shape: Vec<usize>,
29
30    /// Element type information (size in bytes)
31    pub element_size: usize,
32
33    /// Total number of elements in the array
34    pub num_elements: usize,
35
36    /// Block size in elements
37    pub block_size: usize,
38
39    /// Number of blocks
40    pub num_blocks: usize,
41
42    /// Offset of each block in the compressed file
43    pub block_offsets: Vec<u64>,
44
45    /// Compressed size of each block
46    pub block_compressed_sizes: Vec<usize>,
47
48    /// Uncompressed size of each block
49    pub block_uncompressed_sizes: Vec<usize>,
50
51    /// Compression algorithm used
52    pub compression_algorithm: CompressionAlgorithm,
53
54    /// Compression level
55    pub compression_level: i32,
56
57    /// Creation timestamp
58    pub creation_time: chrono::DateTime<chrono::Utc>,
59
60    /// Optional description
61    pub description: Option<String>,
62}
63
64/// Supported compression algorithms
65#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, serde::Serialize, serde::Deserialize)]
66pub enum CompressionAlgorithm {
67    /// LZ4 compression
68    #[default]
69    Lz4,
70    /// Zstd compression
71    Zstd,
72    /// Snappy compression
73    Snappy,
74}
75
76/// Builder for compressed memory-mapped arrays.
77#[derive(Debug, Clone)]
78pub struct CompressedMemMapBuilder {
79    /// Block size in elements
80    block_size: usize,
81
82    /// Compression algorithm
83    algorithm: CompressionAlgorithm,
84
85    /// Compression level
86    level: i32,
87
88    /// Maximum cache size in blocks
89    cache_size: usize,
90
91    /// Cache time-to-live in seconds
92    cache_ttl: Option<Duration>,
93
94    /// Description
95    description: Option<String>,
96}
97
98impl Default for CompressedMemMapBuilder {
99    fn default() -> Self {
100        Self {
101            block_size: 65536, // 64K elements by default
102            algorithm: CompressionAlgorithm::Lz4,
103            level: 1,                                  // Default compression level
104            cache_size: 32,                            // Cache 32 blocks by default
105            cache_ttl: Some(Duration::from_secs(300)), // 5 minute TTL
106            description: None,
107        }
108    }
109}
110
111impl CompressedMemMapBuilder {
112    /// Create a new builder with default settings.
113    pub fn new() -> Self {
114        Self::default()
115    }
116
117    /// Set the block size in elements.
118    ///
119    /// Larger blocks provide better compression but slower random access.
120    pub fn with_block_size(mut self, blocksize: usize) -> Self {
121        self.block_size = blocksize;
122        self
123    }
124
125    /// Set the compression algorithm.
126    pub fn with_algorithm(mut self, algorithm: CompressionAlgorithm) -> Self {
127        self.algorithm = algorithm;
128        self
129    }
130
131    /// Set the compression level.
132    ///
133    /// Higher levels provide better compression but slower compression speed.
134    /// The valid range depends on the algorithm.
135    /// - LZ4: 1-12
136    /// - Zstd: 1-22
137    /// - Snappy: Ignores this parameter
138    pub fn with_level(mut self, level: i32) -> Self {
139        self.level = level;
140        self
141    }
142
143    /// Set the maximum cache size in blocks.
144    ///
145    /// Larger cache sizes allow for more decompressed blocks to be held in memory,
146    /// potentially improving performance for repeated access patterns.
147    pub fn with_cache_size(mut self, cachesize: usize) -> Self {
148        self.cache_size = cachesize;
149        self
150    }
151
152    /// Set the cache time-to-live duration.
153    ///
154    /// Blocks will be evicted from cache after this duration if not accessed.
155    /// Set to None for no time-based eviction.
156    pub fn with_cache_ttl(mut self, ttl: Option<Duration>) -> Self {
157        self.cache_ttl = ttl;
158        self
159    }
160
161    /// Set an optional description.
162    pub fn with_description(mut self, description: impl Into<String>) -> Self {
163        self.description = Some(description.into());
164        self
165    }
166
167    /// Create a compressed memory-mapped array from existing data.
168    ///
169    /// # Arguments
170    ///
171    /// * `data` - The data to compress
172    /// * `path` - The file path to save the compressed data
173    ///
174    /// # Returns
175    ///
176    /// A compressed memory-mapped array
177    pub fn create<A, S, D>(
178        &self,
179        data: &ArrayBase<S, D>,
180        path: impl AsRef<Path>,
181    ) -> CoreResult<CompressedMemMappedArray<A>>
182    where
183        A: Clone + Copy + 'static + Send + Sync,
184        S: RawData<Elem = A>,
185        D: Dimension,
186    {
187        // Get array information
188        let shape = data.shape().to_vec();
189        let num_elements = data.len();
190        let element_size = std::mem::size_of::<A>();
191
192        // Calculate block information
193        let block_size = self.block_size.min(num_elements);
194        let num_blocks = num_elements.div_ceil(block_size);
195
196        // Prepare metadata
197        let mut metadata = CompressedFileMetadata {
198            shape,
199            element_size,
200            num_elements,
201            block_size,
202            num_blocks,
203            block_offsets: Vec::with_capacity(num_blocks),
204            block_compressed_sizes: Vec::with_capacity(num_blocks),
205            block_uncompressed_sizes: Vec::with_capacity(num_blocks),
206            compression_algorithm: self.algorithm,
207            compression_level: self.level,
208            creation_time: chrono::Utc::now(),
209            description: self.description.clone(),
210        };
211
212        // Create output file
213        let path = path.as_ref();
214        let mut file = OpenOptions::new()
215            .read(true)
216            .write(true)
217            .create(true)
218            .truncate(true)
219            .open(path)?;
220
221        // Reserve space for metadata (will write it after compression)
222        let metadata_placeholder = vec![0u8; 1024];
223        file.write_all(&metadata_placeholder)?;
224
225        // Compress each block
226        let data_ptr = data.as_ptr() as *const u8;
227        let mut current_offset = metadata_placeholder.len() as u64;
228
229        for blockidx in 0..num_blocks {
230            let start_element = blockidx * block_size;
231            let end_element = (start_element + block_size).min(num_elements);
232            let block_elements = end_element - start_element;
233            let uncompressed_size = block_elements * element_size;
234
235            // Record the block offset
236            metadata.block_offsets.push(current_offset);
237            metadata.block_uncompressed_sizes.push(uncompressed_size);
238
239            // Get the data for this block
240            let data_offset = start_element * element_size;
241            let block_data =
242                unsafe { std::slice::from_raw_parts(data_ptr.add(data_offset), uncompressed_size) };
243
244            // Compress the block
245            let compressed_data = match self.algorithm {
246                CompressionAlgorithm::Lz4 => {
247                    let mut encoder = EncoderBuilder::new()
248                        .level(self.level as u32)
249                        .build(Vec::new())?;
250                    encoder.write_all(block_data)?;
251                    let (compressed, result) = encoder.finish();
252                    result?;
253                    compressed
254                }
255                CompressionAlgorithm::Zstd => zstd::encode_all(block_data, self.level)?,
256                CompressionAlgorithm::Snappy => snap::raw::Encoder::new()
257                    .compress_vec(block_data)
258                    .map_err(|e| {
259                        CoreError::ComputationError(ErrorContext::new(format!(
260                            "Snappy compression error: {}",
261                            e
262                        )))
263                    })?,
264            };
265
266            // Record the compressed size
267            metadata.block_compressed_sizes.push(compressed_data.len());
268
269            // Write the compressed block
270            file.write_all(&compressed_data)?;
271
272            // Update the offset for the next block
273            current_offset += compressed_data.len() as u64;
274        }
275
276        // Write the metadata at the beginning of the file
277        let metadata_json = serde_json::to_string(&metadata).map_err(|e| {
278            CoreError::ValueError(ErrorContext::new(format!(
279                "Failed to serialize metadata: {}",
280                e
281            )))
282        })?;
283        let mut metadata_bytes = metadata_json.into_bytes();
284
285        // Ensure the metadata fits in the reserved space
286        if metadata_bytes.len() > metadata_placeholder.len() {
287            return Err(CoreError::ValueError(ErrorContext::new(format!(
288                "Metadata size ({} bytes) exceeds reserved space ({} bytes)",
289                metadata_bytes.len(),
290                metadata_placeholder.len()
291            ))));
292        }
293
294        // Pad the metadata to the reserved size
295        metadata_bytes.resize(metadata_placeholder.len(), 0);
296
297        // Write the metadata
298        file.seek(SeekFrom::Start(0))?;
299        file.write_all(&metadata_bytes)?;
300
301        // Create and return the compressed memory-mapped array
302        let compressed_mmap = CompressedMemMappedArray::open_impl(
303            path.to_path_buf(),
304            self.cache_size,
305            self.cache_ttl,
306        )?;
307
308        Ok(compressed_mmap)
309    }
310
311    /// Create a compressed memory-mapped array from raw data.
312    ///
313    /// # Arguments
314    ///
315    /// * `data` - The raw data to compress
316    /// * `shape` - The shape of the array
317    /// * `path` - The file path to save the compressed data
318    ///
319    /// # Returns
320    ///
321    /// A compressed memory-mapped array
322    pub fn create_from_raw<A>(
323        &self,
324        data: &[A],
325        shape: &[usize],
326        path: impl AsRef<Path>,
327    ) -> CoreResult<CompressedMemMappedArray<A>>
328    where
329        A: Clone + Copy + 'static + Send + Sync,
330    {
331        // Create ndarray from raw data
332        let array = Array::from_shape_vec(IxDyn(shape), data.to_vec())
333            .map_err(|e| CoreError::ShapeError(ErrorContext::new(format!("{e}"))))?;
334
335        // Create compressed memory-mapped array
336        self.create(&array, path)
337    }
338}
339
340/// A memory-mapped array with transparent compression.
341///
342/// This struct provides a view into a compressed array stored on disk,
343/// with transparent decompression of blocks as they are accessed.
344#[derive(Debug, Clone)]
345pub struct CompressedMemMappedArray<A: Clone + Copy + 'static + Send + Sync> {
346    /// Path to the compressed file
347    path: PathBuf,
348
349    /// Metadata about the compressed file
350    metadata: CompressedFileMetadata,
351
352    /// Block cache for decompressed blocks
353    block_cache: Arc<BlockCache<A>>,
354
355    /// Phantom data for type parameter
356    phantom: std::marker::PhantomData<A>,
357}
358
359impl<A: Clone + Copy + 'static + Send + Sync> CompressedMemMappedArray<A> {
360    /// Open a compressed memory-mapped array from a file.
361    ///
362    /// # Arguments
363    ///
364    /// * `path` - The path to the compressed file
365    ///
366    /// # Returns
367    ///
368    /// A compressed memory-mapped array
369    pub fn open<P: AsRef<Path>>(path: P) -> CoreResult<Self> {
370        // Use default cache settings
371        let cache_size = 32;
372        let cache_ttl = Some(Duration::from_secs(300));
373
374        Self::open_impl(path.as_ref().to_path_buf(), cache_size, cache_ttl)
375    }
376
377    /// Open a compressed memory-mapped array with custom cache settings.
378    ///
379    /// # Arguments
380    ///
381    /// * `path` - The path to the compressed file
382    /// * `cache_size` - The maximum number of blocks to cache
383    /// * `cache_ttl` - The time-to-live for cached blocks
384    ///
385    /// # Returns
386    ///
387    /// A compressed memory-mapped array
388    pub fn open_with_cache<P: AsRef<Path>>(
389        path: P,
390        cache_size: usize,
391        cache_ttl: Option<Duration>,
392    ) -> CoreResult<Self> {
393        Self::open_impl(path.as_ref().to_path_buf(), cache_size, cache_ttl)
394    }
395
396    /// Internal implementation of open.
397    fn open_impl(
398        path: PathBuf,
399        cache_size: usize,
400        cache_ttl: Option<Duration>,
401    ) -> CoreResult<Self> {
402        // Open the file
403        let mut file = File::open(&path)?;
404
405        // Read the metadata from the beginning of the file
406        let mut metadata_bytes = vec![0u8; 1024];
407        file.read_exact(&mut metadata_bytes)?;
408
409        // Parse the metadata
410        // from_utf8_lossy doesn't return an error, it replaces invalid utf8 sequences
411        let metadata_json = String::from_utf8_lossy(&metadata_bytes)
412            .trim_end_matches('\0')
413            .to_string();
414        let metadata: CompressedFileMetadata =
415            serde_json::from_str(&metadata_json).map_err(|e| {
416                CoreError::ValueError(ErrorContext::new(format!(
417                    "Failed to deserialize metadata: {}",
418                    e
419                )))
420            })?;
421
422        // Check that the element _size matches
423        let expected_element_size = std::mem::size_of::<A>();
424        if metadata.element_size != expected_element_size {
425            return Err(CoreError::ValueError(ErrorContext::new(format!(
426                "Element _size mismatch: expected {}, got {}",
427                expected_element_size, metadata.element_size
428            ))));
429        }
430
431        // Create the block cache
432        let block_cache = Arc::new(BlockCache::new(cache_size, cache_ttl));
433
434        // Create and return the array
435        Ok(Self {
436            path,
437            metadata,
438            block_cache,
439            phantom: std::marker::PhantomData,
440        })
441    }
442
443    /// Get the shape of the array.
444    pub fn shape(&self) -> &[usize] {
445        &self.metadata.shape
446    }
447
448    /// Get the total number of elements in the array.
449    pub fn size(&self) -> usize {
450        self.metadata.num_elements
451    }
452
453    /// Get the number of dimensions of the array.
454    pub fn ndim(&self) -> usize {
455        self.metadata.shape.len()
456    }
457
458    /// Get the metadata for the compressed file.
459    pub fn metadata(&self) -> &CompressedFileMetadata {
460        &self.metadata
461    }
462
463    /// Get the block size.
464    pub fn block_size(&self) -> usize {
465        self.metadata.block_size
466    }
467
468    /// Get the number of blocks.
469    pub fn num_blocks(&self) -> usize {
470        self.metadata.num_blocks
471    }
472
473    /// Load a specific block into memory.
474    ///
475    /// This is useful for preloading blocks that will be accessed soon.
476    ///
477    /// # Arguments
478    ///
479    /// * `blockidx` - The index of the block to load
480    ///
481    /// # Returns
482    ///
483    /// `Ok(())` if successful, or an error
484    pub fn preload_block(&self, blockidx: usize) -> CoreResult<()> {
485        if blockidx >= self.metadata.num_blocks {
486            return Err(CoreError::IndexError(ErrorContext::new(format!(
487                "Block index {} out of bounds (max {})",
488                blockidx,
489                self.metadata.num_blocks - 1
490            ))));
491        }
492
493        // Check if the block is already cached
494        if self.block_cache.has_block(blockidx) {
495            return Ok(());
496        }
497
498        // Load and decompress the block
499        let block = self.load_block(blockidx)?;
500
501        // Add to cache
502        self.block_cache.put_block(blockidx, block);
503
504        Ok(())
505    }
506
507    /// Load a block from the compressed file.
508    fn load_block(&self, blockidx: usize) -> CoreResult<Vec<A>> {
509        // Open the file
510        let mut file = File::open(&self.path)?;
511
512        // Get block information
513        let offset = self.metadata.block_offsets[blockidx];
514        let compressed_size = self.metadata.block_compressed_sizes[blockidx];
515        let uncompressed_size = self.metadata.block_uncompressed_sizes[blockidx];
516
517        // Read the compressed block
518        file.seek(SeekFrom::Start(offset))?;
519        let mut compressed_data = vec![0u8; compressed_size];
520        file.read_exact(&mut compressed_data)?;
521
522        // Decompress the block
523        let block_bytes = match self.metadata.compression_algorithm {
524            CompressionAlgorithm::Lz4 => {
525                let mut decoder = Decoder::new(&compressed_data[..])?;
526                let mut decompressed = Vec::with_capacity(uncompressed_size);
527                decoder.read_to_end(&mut decompressed)?;
528                decompressed
529            }
530            CompressionAlgorithm::Zstd => zstd::decode_all(&compressed_data[..])?,
531            CompressionAlgorithm::Snappy => snap::raw::Decoder::new()
532                .decompress_vec(&compressed_data)
533                .map_err(|e| {
534                    CoreError::ComputationError(ErrorContext::new(format!(
535                        "Snappy decompression error: {}",
536                        e
537                    )))
538                })?,
539        };
540
541        // Check that we got the expected number of bytes
542        if block_bytes.len() != uncompressed_size {
543            return Err(CoreError::ValueError(ErrorContext::new(format!(
544                "Block {} decompressed to {} bytes, expected {}",
545                blockidx,
546                block_bytes.len(),
547                uncompressed_size
548            ))));
549        }
550
551        // Convert bytes to elements
552        let num_elements = uncompressed_size / std::mem::size_of::<A>();
553        let mut elements = Vec::with_capacity(num_elements);
554
555        // Interpret bytes as elements (this is safe because A is Copy)
556        for chunk in block_bytes.chunks_exact(std::mem::size_of::<A>()) {
557            let element = unsafe { *(chunk.as_ptr() as *const A) };
558            elements.push(element);
559        }
560
561        Ok(elements)
562    }
563
564    /// Get a read-only view of the entire array.
565    ///
566    /// This will decompress all blocks and load them into memory.
567    ///
568    /// # Returns
569    ///
570    /// A read-only ndarray view of the data
571    pub fn readonly_array(&self) -> CoreResult<Array<A, IxDyn>> {
572        // Allocate an array for the result
573        let mut result = Array::from_elem(IxDyn(&self.metadata.shape), unsafe {
574            std::mem::zeroed::<A>()
575        });
576
577        // Load each block and copy it into the result
578        let mut offset = 0;
579        for blockidx in 0..self.metadata.num_blocks {
580            // Get the block (from cache if available)
581            let block = match self.block_cache.get_block(blockidx) {
582                Some(block) => block,
583                None => {
584                    let block = self.load_block(blockidx)?;
585                    self.block_cache.put_block(blockidx, block.clone());
586                    block
587                }
588            };
589
590            // Copy the block into the result
591            let start = offset;
592            let end = (start + block.len()).min(self.metadata.num_elements);
593            let slice = &mut result.as_slice_mut().expect("Operation failed")[start..end];
594            slice.copy_from_slice(&block[..(end - start)]);
595
596            // Update the offset
597            offset = end;
598        }
599
600        Ok(result)
601    }
602
603    /// Get a specific element from the array.
604    ///
605    /// This will decompress only the block containing the element.
606    ///
607    /// # Arguments
608    ///
609    /// * `indices` - The indices of the element to get
610    ///
611    /// # Returns
612    ///
613    /// The element at the specified indices
614    pub fn get(&self, indices: &[usize]) -> CoreResult<A> {
615        // Check that the indices are valid
616        if indices.len() != self.metadata.shape.len() {
617            return Err(CoreError::DimensionError(ErrorContext::new(format!(
618                "Expected {} indices, got {}",
619                self.metadata.shape.len(),
620                indices.len()
621            ))));
622        }
623
624        for (dim, &idx) in indices.iter().enumerate() {
625            if idx >= self.metadata.shape[dim] {
626                return Err(CoreError::IndexError(ErrorContext::new(format!(
627                    "Index {} out of bounds for dimension {} (max {})",
628                    idx,
629                    dim,
630                    self.metadata.shape[dim] - 1
631                ))));
632            }
633        }
634
635        // Calculate flat index
636        let mut flat_index = 0;
637        let mut stride = 1;
638        for i in (0..indices.len()).rev() {
639            flat_index += indices[i] * stride;
640            if i > 0 {
641                stride *= self.metadata.shape[i];
642            }
643        }
644
645        // Calculate block index
646        let blockidx = flat_index / self.metadata.block_size;
647        let block_offset = flat_index % self.metadata.block_size;
648
649        // Get the block (from cache if available)
650        let block = match self.block_cache.get_block(blockidx) {
651            Some(block) => block,
652            None => {
653                let block = self.load_block(blockidx)?;
654                self.block_cache.put_block(blockidx, block.clone());
655                block
656            }
657        };
658
659        // Get the element
660        if block_offset < block.len() {
661            Ok(block[block_offset])
662        } else {
663            Err(CoreError::IndexError(ErrorContext::new(format!(
664                "Block offset {} out of bounds for block {} (max {})",
665                block_offset,
666                blockidx,
667                block.len() - 1
668            ))))
669        }
670    }
671
672    /// Get a subset of the array as a new array.
673    ///
674    /// This will decompress only the blocks containing the subset.
675    ///
676    /// # Arguments
677    ///
678    /// * `ranges` - The ranges of indices to get for each dimension
679    ///
680    /// # Returns
681    ///
682    /// A new array containing the subset
683    pub fn slice(&self, ranges: &[(usize, usize)]) -> CoreResult<Array<A, IxDyn>> {
684        // Check that the ranges are valid
685        if ranges.len() != self.metadata.shape.len() {
686            return Err(CoreError::DimensionError(ErrorContext::new(format!(
687                "Expected {} ranges, got {}",
688                self.metadata.shape.len(),
689                ranges.len()
690            ))));
691        }
692
693        // Calculate the shape of the result
694        let mut resultshape = Vec::with_capacity(ranges.len());
695        for (dim, &(start, end)) in ranges.iter().enumerate() {
696            if start >= end {
697                return Err(CoreError::ValueError(ErrorContext::new(format!(
698                    "Invalid range for dimension {}: {}..{}",
699                    dim, start, end
700                ))));
701            }
702            if end > self.metadata.shape[dim] {
703                return Err(CoreError::IndexError(ErrorContext::new(format!(
704                    "Range {}..{} out of bounds for dimension {} (max {})",
705                    start, end, dim, self.metadata.shape[dim]
706                ))));
707            }
708            resultshape.push(end - start);
709        }
710
711        // Allocate an array for the result
712        let mut result = Array::from_elem(IxDyn(&resultshape), unsafe { std::mem::zeroed::<A>() });
713
714        // Calculate the total number of elements in the result
715        let result_size = resultshape.iter().product::<usize>();
716
717        // Create iterators for the result indices
718        let mut result_indices = vec![0; ranges.len()];
719        let mut source_indices = Vec::with_capacity(ranges.len());
720        for &(start, _) in ranges.iter() {
721            source_indices.push(start);
722        }
723
724        // Iterate through all elements in the result
725        for _result_flat_idx in 0..result_size {
726            // Calculate source flat index
727            let mut source_flat_idx = 0;
728            let mut stride = 1;
729            for i in (0..source_indices.len()).rev() {
730                source_flat_idx += source_indices[i] * stride;
731                if i > 0 {
732                    stride *= self.metadata.shape[i];
733                }
734            }
735
736            // Get the element from the source
737            let blockidx = source_flat_idx / self.metadata.block_size;
738            let block_offset = source_flat_idx % self.metadata.block_size;
739
740            // Get the block (from cache if available)
741            let block = match self.block_cache.get_block(blockidx) {
742                Some(block) => block,
743                None => {
744                    let block = self.load_block(blockidx)?;
745                    self.block_cache.put_block(blockidx, block.clone());
746                    block
747                }
748            };
749
750            // Get the element and set it in the result
751            if block_offset < block.len() {
752                // Calculate the result flat index
753                let mut result_stride = 1;
754                let mut result_flat_idx = 0;
755                for i in (0..result_indices.len()).rev() {
756                    result_flat_idx += result_indices[i] * result_stride;
757                    if i > 0 {
758                        result_stride *= resultshape[i];
759                    }
760                }
761
762                // Set the element in the result
763                let result_slice = result.as_slice_mut().expect("Operation failed");
764                result_slice[result_flat_idx] = block[block_offset];
765            }
766
767            // Increment the indices
768            for i in (0..ranges.len()).rev() {
769                result_indices[i] += 1;
770                source_indices[i] += 1;
771                if result_indices[i] < resultshape[i] {
772                    break;
773                }
774                result_indices[i] = 0;
775                source_indices[i] = ranges[i].0;
776            }
777        }
778
779        Ok(result)
780    }
781
782    /// Process the array in blocks, with each block loaded and decompressed on demand.
783    ///
784    /// This is useful for operations that can be performed on blocks independently.
785    ///
786    /// # Arguments
787    ///
788    /// * `f` - A function that processes a block of elements
789    ///
790    /// # Returns
791    ///
792    /// A vector of results, one for each block
793    pub fn process_blocks<F, R>(&self, f: F) -> CoreResult<Vec<R>>
794    where
795        F: Fn(&[A], usize) -> R + Send + Sync + 'static,
796        R: Send + 'static,
797    {
798        self.process_blocks_internal(f, false, None)
799    }
800
801    /// Process the array in blocks with a custom block size.
802    ///
803    /// # Arguments
804    ///
805    /// * `block_size` - The block size to use (in elements)
806    /// * `f` - A function that processes a block of elements
807    ///
808    /// # Returns
809    ///
810    /// A vector of results, one for each block
811    pub fn process_blocks_with_size<F, R>(&self, blocksize: usize, f: F) -> CoreResult<Vec<R>>
812    where
813        F: Fn(&[A], usize) -> R + Send + Sync + 'static,
814        R: Send + 'static,
815    {
816        self.process_blocks_internal(f, false, Some(blocksize))
817    }
818
819    /// Process the array in blocks in parallel.
820    ///
821    /// # Arguments
822    ///
823    /// * `f` - A function that processes a block of elements
824    ///
825    /// # Returns
826    ///
827    /// A vector of results, one for each block
828    #[cfg(feature = "parallel")]
829    pub fn process_blocks_parallel<F, R>(&self, f: F) -> CoreResult<Vec<R>>
830    where
831        F: Fn(&[A], usize) -> R + Send + Sync + 'static,
832        R: Send + 'static,
833    {
834        self.process_blocks_internal(f, true, None)
835    }
836
837    /// Process the array in blocks in parallel with a custom block size.
838    ///
839    /// # Arguments
840    ///
841    /// * `block_size` - The block size to use (in elements)
842    /// * `f` - A function that processes a block of elements
843    ///
844    /// # Returns
845    ///
846    /// A vector of results, one for each block
847    #[cfg(feature = "parallel")]
848    pub fn process_blocks_parallel_with_size<F, R>(
849        &self,
850        block_size: usize,
851        f: F,
852    ) -> CoreResult<Vec<R>>
853    where
854        F: Fn(&[A], usize) -> R + Send + Sync + 'static,
855        R: Send + 'static,
856    {
857        self.process_blocks_internal(f, true, Some(block_size))
858    }
859
860    /// Internal implementation of block processing.
861    #[cfg(not(feature = "parallel"))]
862    fn process_blocks_internal<F, R>(
863        &self,
864        f: F,
865        _parallel: bool,
866        custom_block_size: Option<usize>,
867    ) -> CoreResult<Vec<R>>
868    where
869        F: FnMut(&[A], usize) -> R,
870    {
871        // Determine block layout
872        let block_size = custom_block_size.unwrap_or(self.metadata.block_size);
873        let num_elements = self.metadata.num_elements;
874        let num_blocks = num_elements.div_ceil(block_size);
875
876        // Serial processing
877        (0..num_blocks)
878            .map(|blockidx| {
879                // Calculate the range of elements for this block
880                let start = blockidx * block_size;
881                let end = (start + block_size).min(num_elements);
882
883                // Load the elements for this block
884                let elements = self.load_elements(start, end)?;
885
886                // Apply the function to the block
887                Ok(f(&elements, blockidx))
888            })
889            .collect::<Result<Vec<R>, CoreError>>()
890    }
891
892    /// Internal implementation of block processing (parallel version).
893    #[cfg(feature = "parallel")]
894    fn process_blocks_internal<F, R>(
895        &self,
896        f: F,
897        parallel: bool,
898        custom_block_size: Option<usize>,
899    ) -> CoreResult<Vec<R>>
900    where
901        F: Fn(&[A], usize) -> R + Send + Sync + 'static,
902        R: Send + 'static,
903    {
904        // Determine block layout
905        let block_size = custom_block_size.unwrap_or(self.metadata.block_size);
906        let num_elements = self.metadata.num_elements;
907        let num_blocks = num_elements.div_ceil(block_size);
908
909        // Process blocks
910        if parallel {
911            use crate::parallel_ops::*;
912
913            return (0..num_blocks)
914                .into_par_iter()
915                .map(|blockidx| {
916                    // Calculate the range of elements for this block
917                    let start = blockidx * block_size;
918                    let end = (start + block_size).min(num_elements);
919
920                    // Load the elements for this block
921                    let elements = match self.load_elements(start, end) {
922                        Ok(elems) => elems,
923                        Err(e) => return Err(e),
924                    };
925
926                    // Apply the function to the block
927                    Ok(f(&elements, blockidx))
928                })
929                .collect::<Result<Vec<R>, CoreError>>();
930        }
931
932        // Serial processing (used when parallel=false)
933        (0..num_blocks)
934            .map(|blockidx| {
935                // Calculate the range of elements for this block
936                let start = blockidx * block_size;
937                let end = (start + block_size).min(num_elements);
938
939                // Load the elements for this block
940                let elements = self.load_elements(start, end)?;
941
942                // Apply the function to the block
943                Ok(f(&elements, blockidx))
944            })
945            .collect::<Result<Vec<R>, CoreError>>()
946    }
947
948    /// Load a range of elements from the array.
949    ///
950    /// This will decompress all blocks containing the range.
951    ///
952    /// # Arguments
953    ///
954    /// * `start` - The starting element index
955    /// * `end` - The ending element index (exclusive)
956    ///
957    /// # Returns
958    ///
959    /// A vector of elements in the range
960    fn load_elements(&self, start: usize, end: usize) -> CoreResult<Vec<A>> {
961        // Check bounds
962        if start >= self.metadata.num_elements {
963            return Err(CoreError::IndexError(ErrorContext::new(format!(
964                "Start index {} out of bounds (max {})",
965                start,
966                self.metadata.num_elements - 1
967            ))));
968        }
969        if end > self.metadata.num_elements {
970            return Err(CoreError::IndexError(ErrorContext::new(format!(
971                "End index {} out of bounds (max {})",
972                end, self.metadata.num_elements
973            ))));
974        }
975        if start >= end {
976            return Ok(Vec::new());
977        }
978
979        // Determine which blocks we need
980        let start_block = start / self.metadata.block_size;
981        let end_block = (end - 1) / self.metadata.block_size;
982
983        // Allocate space for the result
984        let mut result = Vec::with_capacity(end - start);
985
986        // Load each required block
987        for blockidx in start_block..=end_block {
988            // Get the block (from cache if available)
989            let block = match self.block_cache.get_block(blockidx) {
990                Some(block) => block,
991                None => {
992                    let block = self.load_block(blockidx)?;
993                    self.block_cache.put_block(blockidx, block.clone());
994                    block
995                }
996            };
997
998            // Calculate the range of elements we need from this block
999            let block_start = blockidx * self.metadata.block_size;
1000            let block_end = block_start + block.len();
1001
1002            let range_start = start.max(block_start) - block_start;
1003            let range_end = end.min(block_end) - block_start;
1004
1005            // Copy the elements into the result
1006            result.extend_from_slice(&block[range_start..range_end]);
1007        }
1008
1009        Ok(result)
1010    }
1011}
1012
/// Cache for decompressed blocks.
///
/// This struct provides an LRU (Least Recently Used) cache for decompressed
/// blocks, keyed by block index. At most `capacity` blocks are stored; when the
/// cache is full, inserting a new block evicts the entry with the oldest access
/// timestamp. If a time-to-live is configured, entries older than it are
/// treated as absent.
#[derive(Debug)]
struct BlockCache<A: Clone + Copy + 'static + Send + Sync> {
    /// Maximum number of blocks to cache (0 disables caching)
    capacity: usize,

    /// Time-to-live for cached blocks (`None` means entries never expire)
    ttl: Option<Duration>,

    /// Cache of decompressed blocks, keyed by block index
    cache: RwLock<HashMap<usize, CachedBlock<A>>>,
}

/// A cached block with its timestamp.
#[derive(Debug, Clone)]
struct CachedBlock<A: Clone + Copy + 'static + Send + Sync> {
    /// The decompressed block data
    data: Vec<A>,

    /// The time when the block was last inserted or read through `get_block`
    timestamp: Instant,
}

impl<A: Clone + Copy + 'static + Send + Sync> BlockCache<A> {
    /// Create a new block cache.
    ///
    /// # Arguments
    ///
    /// * `capacity` - The maximum number of blocks to cache
    /// * `ttl` - The time-to-live for cached blocks
    fn new(capacity: usize, ttl: Option<Duration>) -> Self {
        Self {
            capacity,
            ttl,
            cache: RwLock::new(HashMap::new()),
        }
    }

    /// Whether `cached` has outlived the configured time-to-live.
    ///
    /// Always `false` when no time-to-live is set.
    fn is_expired(&self, cached: &CachedBlock<A>) -> bool {
        self.ttl
            .is_some_and(|ttl| cached.timestamp.elapsed() > ttl)
    }

    /// Check if a block is in the cache.
    ///
    /// This does not refresh the block's timestamp and does not remove an
    /// expired entry.
    ///
    /// # Arguments
    ///
    /// * `blockidx` - The index of the block to check
    ///
    /// # Returns
    ///
    /// `true` if the block is in the cache and has not expired, `false` otherwise
    fn has_block(&self, blockidx: usize) -> bool {
        let cache = self.cache.read().expect("Operation failed");

        cache
            .get(&blockidx)
            .is_some_and(|cached| !self.is_expired(cached))
    }

    /// Get a block from the cache.
    ///
    /// A hit refreshes the block's timestamp. An expired entry is removed
    /// and reported as a miss.
    ///
    /// # Arguments
    ///
    /// * `blockidx` - The index of the block to get
    ///
    /// # Returns
    ///
    /// A copy of the block if it is in the cache, `None` otherwise
    fn get_block(&self, blockidx: usize) -> Option<Vec<A>> {
        use std::collections::hash_map::Entry;

        // Write lock: a hit mutates the timestamp, an expired hit removes the entry.
        let mut cache = self.cache.write().expect("Operation failed");

        match cache.entry(blockidx) {
            Entry::Occupied(mut slot) => {
                if self.is_expired(slot.get()) {
                    slot.remove();
                    None
                } else {
                    let cached = slot.get_mut();
                    cached.timestamp = Instant::now();
                    Some(cached.data.clone())
                }
            }
            Entry::Vacant(_) => None,
        }
    }

    /// Put a block in the cache.
    ///
    /// If the cache is full and `blockidx` is not already present, the entry
    /// with the oldest timestamp is evicted first. With a capacity of zero
    /// nothing is stored.
    ///
    /// # Arguments
    ///
    /// * `blockidx` - The index of the block to put
    /// * `block` - The block data
    fn put_block(&self, blockidx: usize, block: Vec<A>) {
        // Without this guard the eviction search on an empty map finds nothing
        // and the insert below would still store one block.
        if self.capacity == 0 {
            return;
        }

        let mut cache = self.cache.write().expect("Operation failed");

        // Check if we need to evict a block (replacing an existing key does not grow the map)
        if cache.len() >= self.capacity && !cache.contains_key(&blockidx) {
            // Find the least recently used block
            let lru_idx = cache
                .iter()
                .min_by_key(|(_, cached)| cached.timestamp)
                .map(|(idx, _)| *idx);

            if let Some(lru_idx) = lru_idx {
                cache.remove(&lru_idx);
            }
        }

        // Add the block to the cache
        cache.insert(
            blockidx,
            CachedBlock {
                data: block,
                timestamp: Instant::now(),
            },
        );
    }

    /// Clear the cache.
    #[allow(dead_code)]
    fn clear(&self) {
        let mut cache = self.cache.write().expect("Operation failed");
        cache.clear();
    }

    /// Get the number of blocks in the cache.
    ///
    /// Expired entries that have not yet been removed are included in the count.
    #[allow(dead_code)]
    fn len(&self) -> usize {
        let cache = self.cache.read().expect("Operation failed");
        cache.len()
    }

    /// Check if the cache is empty.
    #[allow(dead_code)]
    fn is_empty(&self) -> bool {
        let cache = self.cache.read().expect("Operation failed");
        cache.is_empty()
    }
}
1166
#[cfg(test)]
mod tests {
    use super::*;
    use ::ndarray::Array2;
    use tempfile::tempdir;

    /// Round-trips a 1-D array of 1000 values and checks metadata, element
    /// access, slicing, block processing, and loading the whole array.
    #[test]
    fn test_compressed_memmapped_array_1d() {
        // Scratch directory that holds the compressed file for this test
        let tmp = tempdir().expect("Operation failed");
        let path = tmp.path().join("test_compressed_1d.cmm");

        // Values 0.0, 1.0, ..., 999.0
        let values: Vec<f64> = (0..1000).map(|i| i as f64).collect();

        // Build the compressed array: 10 blocks of 100 elements each
        let cmm = CompressedMemMapBuilder::new()
            .with_block_size(100)
            .with_algorithm(CompressionAlgorithm::Lz4)
            .with_level(1)
            .with_cache_size(4)
            .with_description("Test 1D array")
            .create_from_raw(&values, &[1000], &path)
            .expect("Operation failed");

        // Metadata
        assert_eq!(cmm.shape(), &[1000]);
        assert_eq!(cmm.size(), 1000);
        assert_eq!(cmm.ndim(), 1);

        // Random access: the first element of every block
        for block in 0..10 {
            let idx = block * 100;
            let fetched = cmm.get(&[idx]).expect("Operation failed");
            assert_eq!(fetched, idx as f64);
        }

        // Slicing: elements 200..300
        let window = cmm.slice(&[(200, 300)]).expect("Operation failed");
        assert_eq!(window.shape(), &[100]);
        for offset in 0..100 {
            assert_eq!(
                window[crate::ndarray::IxDyn(&[offset])],
                (offset + 200) as f64
            );
        }

        // Block processing: one sum per block
        let block_sums = cmm
            .process_blocks(|block, _| block.iter().sum::<f64>())
            .expect("Test: operation failed");

        assert_eq!(block_sums.len(), 10); // 1000 elements / 100 block size = 10 blocks

        // Loading the entire array
        let full = cmm.readonly_array().expect("Operation failed");
        assert_eq!(full.shape(), &[1000]);
        for idx in 0..1000 {
            assert_eq!(full[crate::ndarray::IxDyn(&[idx])], idx as f64);
        }
    }

    /// Round-trips a 10x10 matrix and checks metadata, element access,
    /// rectangular slicing, and loading the whole array.
    #[test]
    fn test_compressed_memmapped_array_2d() {
        // Scratch directory that holds the compressed file for this test
        let tmp = tempdir().expect("Operation failed");
        let path = tmp.path().join("test_compressed_2d.cmm");

        // 10x10 matrix whose entry at (row, col) is row * 10 + col
        let matrix = Array2::<f64>::from_shape_fn((10, 10), |(i, j)| (i * 10 + j) as f64);

        // Build the compressed array with 25 elements per block
        let cmm = CompressedMemMapBuilder::new()
            .with_block_size(25)
            .with_algorithm(CompressionAlgorithm::Lz4)
            .with_level(1)
            .with_cache_size(4)
            .with_description("Test 2D array")
            .create(&matrix, &path)
            .expect("Operation failed");

        // Metadata
        assert_eq!(cmm.shape(), &[10, 10]);
        assert_eq!(cmm.size(), 100);
        assert_eq!(cmm.ndim(), 2);

        // Random access to every element
        for row in 0..10 {
            for col in 0..10 {
                let fetched = cmm.get(&[row, col]).expect("Operation failed");
                assert_eq!(fetched, (row * 10 + col) as f64);
            }
        }

        // Slicing: rows 2..5, columns 3..7
        let window = cmm.slice(&[(2, 5), (3, 7)]).expect("Operation failed");
        assert_eq!(window.shape(), &[3, 4]);
        for row in 0..3 {
            for col in 0..4 {
                let expected = ((row + 2) * 10 + (col + 3)) as f64;
                assert_eq!(window[crate::ndarray::IxDyn(&[row, col])], expected);
            }
        }

        // Loading the entire array
        let full = cmm.readonly_array().expect("Operation failed");
        assert_eq!(full.shape(), &[10, 10]);
        for row in 0..10 {
            for col in 0..10 {
                assert_eq!(
                    full[crate::ndarray::IxDyn(&[row, col])],
                    (row * 10 + col) as f64
                );
            }
        }
    }

    /// Writes and reads back the same data with each compression algorithm.
    #[test]
    fn test_different_compression_algorithms() {
        // Scratch directory shared by all files of this test
        let tmp = tempdir().expect("Operation failed");

        // Values 0.0, 1.0, ..., 999.0
        let values: Vec<f64> = (0..1000).map(|i| i as f64).collect();

        let algorithms = [
            CompressionAlgorithm::Lz4,
            CompressionAlgorithm::Zstd,
            CompressionAlgorithm::Snappy,
        ];

        for algorithm in algorithms.iter() {
            // One file per algorithm, named after its Debug representation
            let path = tmp.path().join(format!("test_{:?}.cmm", algorithm));

            let cmm = CompressedMemMapBuilder::new()
                .with_block_size(100)
                .with_algorithm(*algorithm)
                .with_level(1)
                .with_cache_size(4)
                .create_from_raw(&values, &[1000], &path)
                .expect("Operation failed");

            // The decompressed array must match the input element for element
            let full = cmm.readonly_array().expect("Operation failed");
            for idx in 0..1000 {
                assert_eq!(full[crate::ndarray::IxDyn(&[idx])], idx as f64);
            }
        }
    }

    /// Checks that the block cache respects its capacity and caches a block
    /// after it has been accessed.
    #[test]
    fn test_block_cache() {
        // Scratch directory that holds the compressed file for this test
        let tmp = tempdir().expect("Operation failed");
        let path = tmp.path().join("test_cache.cmm");

        // Values 0.0, 1.0, ..., 999.0
        let values: Vec<f64> = (0..1000).map(|i| i as f64).collect();

        // Ten blocks of 100 elements, but room for only two in the cache
        let cmm = CompressedMemMapBuilder::new()
            .with_block_size(100)
            .with_cache_size(2)
            .create_from_raw(&values, &[1000], &path)
            .expect("Operation failed");

        // Preload every block; the cache can keep at most two of them
        for block in 0..10 {
            cmm.preload_block(block).expect("Operation failed");
        }

        // The cache must not have grown past its capacity
        assert_eq!(cmm.block_cache.len(), 2);

        // Read the first element, which lives in block 0
        let first = cmm.get(&[0]).expect("Operation failed");
        assert_eq!(first, 0.0);

        // Block 0 must be cached after that read
        assert!(cmm.block_cache.has_block(0));
    }

    /// Checks that preloading a block places it in the cache and that its
    /// elements can then be read.
    #[test]
    fn test_block_preloading() {
        // Scratch directory that holds the compressed file for this test
        let tmp = tempdir().expect("Operation failed");
        let path = tmp.path().join("test_preload.cmm");

        // Values 0.0, 1.0, ..., 999.0
        let values: Vec<f64> = (0..1000).map(|i| i as f64).collect();

        // Ten blocks of 100 elements with the builder's default cache settings
        let cmm = CompressedMemMapBuilder::new()
            .with_block_size(100)
            .create_from_raw(&values, &[1000], &path)
            .expect("Test: operation failed");

        // Preload block 5 and confirm it landed in the cache
        cmm.preload_block(5).expect("Operation failed");
        assert!(cmm.block_cache.has_block(5));

        // Element 550 lives in block 5
        let fetched = cmm.get(&[550]).expect("Operation failed");
        assert_eq!(fetched, 550.0);
    }
}