// scirs2_core/memory_efficient/memmap_chunks.rs

1//! Chunked processing extension for memory-mapped arrays.
2//!
3//! This module provides utilities for working with large memory-mapped arrays
4//! in a memory-efficient manner by processing them in smaller chunks.
5//!
6//! ## Overview
7//!
8//! Memory-mapped arrays allow working with data that doesn't fit entirely in RAM,
9//! but processing such large arrays can still be challenging. This module extends
10//! `MemoryMappedArray` with chunked processing capabilities, enabling efficient
11//! processing of large datasets through a combination of:
12//!
13//! - Processing data in manageable chunks to control memory usage
14//! - Providing both iterator-based and callback-based processing APIs
15//! - Supporting various chunking strategies to optimize for different workloads
16//! - Ensuring mutations are properly persisted to the underlying memory-mapped file
17//!
18//! ## Usage
19//!
//! There are four main ways to process chunks (the fourth requires the `parallel` feature):
21//!
22//! 1. Using `process_chunks` for reading/processing chunks:
23//!    ```no_run
24//!    # use scirs2_core::memory_efficient::{MemoryMappedArray, MemoryMappedChunks, ChunkingStrategy};
25//!    # let mmap: MemoryMappedArray<f64> = unimplemented!();
26//!    // Process chunks of a large array and collect results
27//!    let results = mmap.process_chunks(
28//!        ChunkingStrategy::Fixed(1000),
29//!        |chunk_data, chunk_idx| {
30//!            // Process this chunk and return a result
31//!            chunk_data.iter().sum::<f64>()
32//!        }
33//!    );
34//!    ```
35//!
36//! 2. Using `process_chunks_mut` for mutating chunks:
37//!    ```no_run
38//!    # use scirs2_core::memory_efficient::{MemoryMappedArray, MemoryMappedChunks, ChunkingStrategy};
39//!    # let mut mmap: MemoryMappedArray<f64> = unimplemented!();
40//!    // Modify each chunk in-place
41//!    mmap.process_chunks_mut(
42//!        ChunkingStrategy::NumChunks(10),
43//!        |chunk_data, chunk_idx| {
44//!            // Modify the chunk data in-place
45//!            for i in 0..chunk_data.len() {
46//!                chunk_data[i] *= 2.0;
47//!            }
48//!        }
49//!    );
50//!    ```
51//!
52//! 3. Using the `chunks` iterator for element-by-element processing:
53//!    ```no_run
54//!    # use scirs2_core::memory_efficient::{MemoryMappedArray, MemoryMappedChunkIter, ChunkingStrategy};
55//!    # let mmap: MemoryMappedArray<f64> = unimplemented!();
56//!    // Process chunks using iterator
57//!    for chunk in mmap.chunks(ChunkingStrategy::Auto) {
58//!        // Each chunk is an Array1 of the appropriate type
59//!        println!("Chunk sum: {}", chunk.sum());
60//!    }
61//!    ```
62//!
63//! 4. If you have the `parallel` feature enabled, you can also use parallel processing:
64//!    ```ignore
65//!    # #[cfg(feature = "parallel")]
66//!    # {
67//!    # use scirs2_core::memory_efficient::{create_mmap, AccessMode, ChunkingStrategy, MemoryMappedChunksParallel};
68//!    # use scirs2_core::ndarray::Array1;
69//!    # let data = Array1::<f64>::zeros(1000);
70//!    # let mmap = create_mmap(&data, "/tmp/data.bin", AccessMode::Write, 0).expect("Operation failed");
71//!    // Process chunks in parallel
72//!    let results = mmap.process_chunks_parallel(
73//!        ChunkingStrategy::Fixed(1000),
74//!        |chunk_data, chunk_idx| {
75//!            chunk_data.iter().sum::<f64>()
76//!        }
77//!    );
78//!    # }
79//!    ```
80//!
81//! ## Chunking Strategies
82//!
83//! This module supports different chunking strategies:
84//!
85//! - `ChunkingStrategy::Fixed(size)`: Fixed-size chunks
86//! - `ChunkingStrategy::NumChunks(n)`: Divide the array into a specific number of chunks
87//! - `ChunkingStrategy::Auto`: Automatically determine a reasonable chunk size
88//! - `ChunkingStrategy::FixedBytes(bytes)`: Chunks with a specific size in bytes
89//!
90//! ## Limitations
91//!
//! - Currently only works with one-dimensional (1D) arrays
93//! - For mutating operations, the module uses direct file I/O to ensure changes are
94//!   properly persisted to disk, which may be slower than memory-only operations
95
96use crate::memory_efficient::chunked::ChunkingStrategy;
97use crate::memory_efficient::memmap::MemoryMappedArray;
98use ::ndarray::Array1;
99use std::fs::OpenOptions;
100use std::io::{Seek, SeekFrom, Write};
101
102#[cfg(feature = "parallel")]
103use crate::parallel_ops::*;
104
/// Extension trait for MemoryMappedArray to enable chunked processing of large datasets.
///
/// This trait extends `MemoryMappedArray` with methods for processing large datasets
/// in manageable chunks, which helps to control memory usage and enables working with
/// arrays that might be too large to fit entirely in memory.
pub trait MemoryMappedChunks<A: Clone + Copy + 'static + Send + Sync> {
    /// Get the number of chunks for the given chunking strategy.
    ///
    /// # Arguments
    ///
    /// * `strategy` - The chunking strategy to determine chunk sizes
    ///
    /// # Returns
    ///
    /// The number of chunks that the array will be divided into.
    /// Note: for `ChunkingStrategy::NumChunks(n)` the requested `n` is returned
    /// as-is, without being clamped to the number of elements.
    ///
    /// # Examples
    ///
    /// ```
    /// use scirs2_core::memory_efficient::{create_mmap, AccessMode, ChunkingStrategy, MemoryMappedChunks};
    /// use scirs2_core::ndarray::Array1;
    ///
    /// // Create a memory-mapped array with 100 elements
    /// let data = Array1::<f64>::linspace(0., 99., 100);
    /// let file_path = "example.bin";  // In practice, use a proper temporary path
    /// let mmap = create_mmap(&data, file_path.as_ref(), AccessMode::Write, 0).expect("Operation failed");
    ///
    /// // Check how many chunks we'll get with different strategies
    /// assert_eq!(mmap.chunk_count(ChunkingStrategy::Fixed(10)), 10);  // 10 chunks of 10 elements each
    /// assert_eq!(mmap.chunk_count(ChunkingStrategy::NumChunks(5)), 5);  // 5 chunks of 20 elements each
    /// ```
    fn chunk_count(&self, strategy: ChunkingStrategy) -> usize;

    /// Process each chunk with a function and collect the results.
    ///
    /// This method divides the array into chunks according to the given strategy,
    /// applies the provided function to each chunk, and collects the results into a vector.
    /// It is efficient for read-only operations on large arrays.
    ///
    /// # Arguments
    ///
    /// * `strategy` - The chunking strategy to determine chunk sizes
    /// * `f` - A function that processes each chunk and returns a result; it
    ///   receives the chunk's data as a slice plus the zero-based chunk index
    ///
    /// # Returns
    ///
    /// A vector containing the results from processing each chunk
    ///
    /// # Examples
    ///
    /// ```
    /// use scirs2_core::memory_efficient::{create_mmap, AccessMode, ChunkingStrategy, MemoryMappedChunks};
    /// use scirs2_core::ndarray::Array1;
    ///
    /// // Create a memory-mapped array with 20 elements (small numbers to avoid overflow)
    /// let data = Array1::<f64>::from_vec((0..20).map(|x| x as f64).collect());
    /// let file_path = "example.bin";  // In practice, use a proper temporary path
    /// let mmap = create_mmap(&data, file_path.as_ref(), AccessMode::Write, 0).expect("Operation failed");
    ///
    /// // Calculate the sum of each chunk
    /// let chunk_sums = mmap.process_chunks(
    ///     ChunkingStrategy::Fixed(5),
    ///     |chunk, chunk_idx| chunk.iter().sum::<f64>()
    /// );
    ///
    /// // We should have 4 chunks with sums of elements 0-4, 5-9, 10-14, 15-19
    /// assert_eq!(chunk_sums.len(), 4);
    /// ```
    fn process_chunks<F, R>(&self, strategy: ChunkingStrategy, f: F) -> Vec<R>
    where
        F: Fn(&[A], usize) -> R;

    /// Process each chunk with a mutable function that modifies the data in-place.
    ///
    /// This method divides the array into chunks according to the given strategy
    /// and applies the provided mutable function to each chunk. The function can
    /// modify the chunk data in-place, and the changes will be saved to the
    /// underlying memory-mapped file.
    ///
    /// # Arguments
    ///
    /// * `strategy` - The chunking strategy to determine chunk sizes
    /// * `f` - A function that processes and potentially modifies each chunk;
    ///   it receives a mutable copy of the chunk's data plus the chunk index,
    ///   and the modified copy is written back to the file afterwards
    ///
    /// # Examples
    ///
    /// ```
    /// use scirs2_core::memory_efficient::{create_mmap, AccessMode, ChunkingStrategy, MemoryMappedChunks};
    /// use scirs2_core::ndarray::Array1;
    ///
    /// // Create a memory-mapped array with 100 zeros
    /// let data = Array1::<f64>::zeros(100);
    /// let file_path = "example.bin";  // In practice, use a proper temporary path
    /// let mut mmap = create_mmap(&data, file_path.as_ref(), AccessMode::Write, 0).expect("Operation failed");
    ///
    /// // Modify each chunk: set elements to their index
    /// mmap.process_chunks_mut(
    ///     ChunkingStrategy::Fixed(10),
    ///     |chunk, chunk_idx| {
    ///         for (i, elem) in chunk.iter_mut().enumerate() {
    ///             *elem = (chunk_idx * 10 + i) as f64;
    ///         }
    ///     }
    /// );
    ///
    /// // Now the array contains [0, 1, 2, ..., 99]
    /// ```
    ///
    /// # Notes
    ///
    /// This method uses direct file I/O to ensure changes are properly persisted to disk,
    /// which may be slower than memory-only operations but is more reliable for ensuring
    /// data is properly saved, especially with large datasets.
    fn process_chunks_mut<F>(&mut self, strategy: ChunkingStrategy, f: F)
    where
        F: Fn(&mut [A], usize);
}
222
/// Extension trait for parallel processing of memory-mapped arrays.
///
/// This trait is only available when the 'parallel' feature is enabled.
/// It extends the `MemoryMappedChunks` trait with parallel processing capabilities.
#[cfg(feature = "parallel")]
pub trait MemoryMappedChunksParallel<A: Clone + Copy + 'static + Send + Sync>:
    MemoryMappedChunks<A>
{
    /// Process chunks in parallel and collect the results.
    ///
    /// This method works like `process_chunks` but processes the chunks in parallel using Rayon.
    /// It's useful for computationally intensive operations on large datasets.
    ///
    /// # Arguments
    ///
    /// * `strategy` - The chunking strategy to determine chunk sizes
    /// * `f` - A function that processes each chunk and returns a result
    ///
    /// # Returns
    ///
    /// A vector containing the results from processing each chunk, in chunk order
    ///
    /// # Examples
    ///
    /// ```ignore
    /// # #[cfg(feature = "parallel")]
    /// # {
    /// use scirs2_core::memory_efficient::{create_mmap, AccessMode, ChunkingStrategy, MemoryMappedChunks, MemoryMappedChunksParallel};
    /// use scirs2_core::ndarray::Array1;
    ///
    /// // Create a memory-mapped array with 20 elements (small numbers to avoid overflow)
    /// let data = Array1::<i32>::from_vec((1..=20).collect());
    /// let file_path = "example.bin";  // In practice, use a proper temporary path
    /// let mmap = create_mmap(&data, file_path.as_ref(), AccessMode::Write, 0).expect("Operation failed");
    ///
    /// // Calculate the sum of each chunk in parallel
    /// let chunk_sums = mmap.process_chunks_parallel(
    ///     ChunkingStrategy::Fixed(5),
    ///     |chunk, chunk_idx| chunk.iter().sum::<i32>()
    /// );
    ///
    /// // We should have 4 chunks with reasonable sums
    /// assert_eq!(chunk_sums.len(), 4);
    /// # }
    /// ```
    fn process_chunks_parallel<F, R>(&self, strategy: ChunkingStrategy, f: F) -> Vec<R>
    where
        F: Fn(&[A], usize) -> R + Send + Sync,
        R: Send;

    /// Process chunks in parallel with a mutable function.
    ///
    /// This method works like `process_chunks_mut` but processes the chunks in parallel using Rayon.
    /// It's useful for computationally intensive operations on large datasets.
    ///
    /// # Arguments
    ///
    /// * `strategy` - The chunking strategy to determine chunk sizes
    /// * `f` - A function that processes and potentially modifies each chunk
    ///
    /// # Examples
    ///
    /// ```
    /// # #[cfg(feature = "parallel")]
    /// # {
    /// use scirs2_core::memory_efficient::{create_mmap, AccessMode, ChunkingStrategy, MemoryMappedChunks, MemoryMappedChunksParallel};
    /// use scirs2_core::ndarray::Array1;
    ///
    /// // Create a memory-mapped array with 100 zeros
    /// let data = Array1::<f64>::zeros(100);
    /// let file_path = "example.bin";  // In practice, use a proper temporary path
    /// let mut mmap = create_mmap(&data, file_path.as_ref(), AccessMode::Write, 0).expect("Operation failed");
    ///
    /// // Modify each chunk in parallel: set elements to their index
    /// mmap.process_chunks_mut_parallel(
    ///     ChunkingStrategy::Fixed(10),
    ///     |chunk, chunk_idx| {
    ///         for (i, elem) in chunk.iter_mut().enumerate() {
    ///             *elem = (chunk_idx * 10 + i) as f64;
    ///         }
    ///     }
    /// );
    /// # }
    /// ```
    ///
    /// # Notes
    ///
    /// Even when used in parallel, this method ensures that file writes are safe
    /// and do not conflict with each other by collecting all modifications and
    /// applying them sequentially.
    fn process_chunks_mut_parallel<F>(&mut self, strategy: ChunkingStrategy, f: F)
    where
        F: Fn(&mut [A], usize) + Send + Sync;
}
317
318/// Iterator over chunks of a memory-mapped array (for 1D arrays only).
319///
320/// This iterator provides a convenient way to process a memory-mapped array in chunks,
321/// returning each chunk as an `Array1<A>`. It's particularly useful for operations where
322/// you want to process chunks sequentially and don't need to collect results.
323///
324/// # Examples
325///
326/// ```
327/// use scirs2_core::memory_efficient::{create_mmap, AccessMode, ChunkingStrategy, MemoryMappedChunkIter};
328/// use scirs2_core::ndarray::Array1;
329///
330/// // Create a memory-mapped array
331/// let data = Array1::<f64>::linspace(0., 99., 100);
332/// let file_path = "example.bin";  // In practice, use a proper temporary path
333/// let mmap = create_mmap(&data, file_path.as_ref(), AccessMode::Write, 0).expect("Operation failed");
334///
335/// // Process chunks using iterator
336/// let mut sum = 0.0;
337/// for chunk in mmap.chunks(ChunkingStrategy::Fixed(10)) {
338///     // Each chunk is an Array1<f64>
339///     sum += chunk.sum();
340/// }
341///
342/// // The sum should be the same as summing the original array
343/// assert!((sum - data.sum()).abs() < 1e-10);
344/// ```
345pub struct ChunkIter<'a, A>
346where
347    A: Clone + Copy + 'static + Send + Sync + Send + Sync,
348{
349    /// Reference to the memory-mapped array
350    array: &'a MemoryMappedArray<A>,
351    /// Current chunk index
352    current_idx: usize,
353    /// Total number of chunks
354    total_chunks: usize,
355    /// Chunking strategy
356    strategy: ChunkingStrategy,
357}
358
359impl<A> Iterator for ChunkIter<'_, A>
360where
361    A: Clone + Copy + 'static + Send + Sync,
362{
363    type Item = Array1<A>;
364
365    fn next(&mut self) -> Option<Self::Item> {
366        if self.current_idx >= self.total_chunks {
367            None
368        } else {
369            let chunk_idx = self.current_idx;
370            self.current_idx += 1;
371
372            // Get chunk start/end indices
373            let chunk_size = match self.strategy {
374                ChunkingStrategy::Fixed(size) => size,
375                ChunkingStrategy::NumChunks(n) => self.array.size.div_ceil(n),
376                ChunkingStrategy::Auto => (self.array.size / 100).max(1),
377                ChunkingStrategy::FixedBytes(bytes) => {
378                    let element_size = std::mem::size_of::<A>();
379                    let elements_per_chunk = bytes / element_size;
380                    elements_per_chunk.max(1)
381                }
382                ChunkingStrategy::Advanced(_) => {
383                    // For advanced strategies, fall back to auto sizing
384                    (self.array.size / 100).max(1)
385                }
386            };
387
388            let start_idx = chunk_idx * chunk_size;
389            let end_idx = (start_idx + chunk_size).min(self.array.size);
390
391            // Get the array data to return a chunk
392            if let Ok(array_1d) = self.array.as_array::<crate::ndarray::Ix1>() {
393                Some(array_1d.slice(crate::s![start_idx..end_idx]).to_owned())
394            } else {
395                None
396            }
397        }
398    }
399
400    fn size_hint(&self) -> (usize, Option<usize>) {
401        let remaining = self.total_chunks - self.current_idx;
402        (remaining, Some(remaining))
403    }
404}
405
// `size_hint` on ChunkIter returns exact equal bounds, so exposing
// `ExactSizeIterator` is sound.
impl<A> ExactSizeIterator for ChunkIter<'_, A> where A: Clone + Copy + 'static + Send + Sync {}
407
/// Extension trait for MemoryMappedArray to enable chunked iteration.
///
/// This trait extends `MemoryMappedArray` with the ability to iterate over chunks
/// of the array, which provides a convenient way to process large arrays sequentially
/// in smaller, manageable pieces.
pub trait MemoryMappedChunkIter<A: Clone + Copy + 'static + Send + Sync> {
    /// Create an iterator over chunks of the array (for 1D arrays only).
    ///
    /// This method returns an iterator that yields chunks of the array as
    /// `Array1<A>` values, making it easy to process the array in smaller pieces.
    /// Each yielded chunk is an owned copy of the corresponding slice of the array.
    ///
    /// # Arguments
    ///
    /// * `strategy` - The chunking strategy to determine chunk sizes
    ///
    /// # Returns
    ///
    /// An iterator that yields `Array1<A>` chunks of the memory-mapped array
    ///
    /// # Examples
    ///
    /// ```
    /// use scirs2_core::memory_efficient::{create_mmap, AccessMode, ChunkingStrategy, MemoryMappedChunkIter};
    /// use scirs2_core::ndarray::Array1;
    ///
    /// // Create a memory-mapped array
    /// let data = Array1::<f64>::linspace(0., 99., 100);
    /// let file_path = "example.bin";  // In practice, use a proper temporary path
    /// let mmap = create_mmap(&data, file_path.as_ref(), AccessMode::Write, 0).expect("Operation failed");
    ///
    /// // Create a chunk iterator with chunks of size 25
    /// let mut iter = mmap.chunks(ChunkingStrategy::Fixed(25));
    ///
    /// // Get the first chunk (elements 0-24)
    /// let chunk1 = iter.next().expect("Operation failed");
    /// assert_eq!(chunk1.len(), 25);
    /// assert_eq!(chunk1[0], 0.0);
    /// assert_eq!(chunk1[24], 24.0);
    /// ```
    fn chunks(&self, strategy: ChunkingStrategy) -> ChunkIter<A>;
}
449
450impl<A: Clone + Copy + 'static + Send + Sync + Send + Sync> MemoryMappedChunks<A>
451    for MemoryMappedArray<A>
452{
453    fn chunk_count(&self, strategy: ChunkingStrategy) -> usize {
454        match strategy {
455            ChunkingStrategy::Fixed(size) => {
456                // Calculate how many chunks of the given size we need
457                self.size.div_ceil(size)
458            }
459            ChunkingStrategy::NumChunks(n) => {
460                // Number of chunks is explicitly specified
461                n
462            }
463            ChunkingStrategy::Auto => {
464                // Determine a reasonable chunk size based on the array size
465                let total_elements = self.size;
466                let optimal_chunk_size = (total_elements / 100).max(1);
467                total_elements.div_ceil(optimal_chunk_size)
468            }
469            ChunkingStrategy::FixedBytes(bytes) => {
470                // Calculate how many chunks based on bytes
471                let element_size = std::mem::size_of::<A>();
472                let elements_per_chunk = bytes / element_size;
473                let elements_per_chunk = elements_per_chunk.max(1); // Ensure at least 1 element per chunk
474                self.size.div_ceil(elements_per_chunk)
475            }
476            ChunkingStrategy::Advanced(_) => {
477                // For advanced strategies, fall back to auto sizing
478                let total_elements = self.size;
479                let optimal_chunk_size = (total_elements / 100).max(1);
480                total_elements.div_ceil(optimal_chunk_size)
481            }
482        }
483    }
484
485    fn process_chunks<F, R>(&self, strategy: ChunkingStrategy, f: F) -> Vec<R>
486    where
487        F: Fn(&[A], usize) -> R,
488    {
489        let total_chunks = self.chunk_count(strategy);
490        let mut results = Vec::with_capacity(total_chunks);
491
492        // Process each chunk by copying the data
493        for chunk_idx in 0..total_chunks {
494            // Calculate chunk size and indices
495            let chunk_size = match strategy {
496                ChunkingStrategy::Fixed(size) => size,
497                ChunkingStrategy::NumChunks(n) => self.size.div_ceil(n),
498                ChunkingStrategy::Auto => {
499                    let total_elements = self.size;
500                    (total_elements / 100).max(1)
501                }
502                ChunkingStrategy::FixedBytes(bytes) => {
503                    let element_size = std::mem::size_of::<A>();
504                    let elements_per_chunk = bytes / element_size;
505                    elements_per_chunk.max(1)
506                }
507                ChunkingStrategy::Advanced(_) => {
508                    // For advanced strategies, fall back to auto sizing
509                    let total_elements = self.size;
510                    (total_elements / 100).max(1)
511                }
512            };
513
514            let start_idx = chunk_idx * chunk_size;
515            let end_idx = (start_idx + chunk_size).min(self.size);
516
517            // Get the data for this chunk
518            if let Ok(array_1d) = self.as_array::<crate::ndarray::Ix1>() {
519                // Copy the data to a new array to avoid lifetime issues
520                let chunk_data = array_1d.slice(crate::s![start_idx..end_idx]).to_vec();
521
522                // Process the chunk data
523                results.push(f(&chunk_data, chunk_idx));
524            }
525        }
526
527        results
528    }
529
530    fn process_chunks_mut<F>(&mut self, strategy: ChunkingStrategy, f: F)
531    where
532        F: Fn(&mut [A], usize),
533    {
534        let total_chunks = self.chunk_count(strategy);
535        let element_size = std::mem::size_of::<A>();
536
537        // Process each chunk
538        for chunk_idx in 0..total_chunks {
539            // Calculate chunk size and indices
540            let chunk_size = match strategy {
541                ChunkingStrategy::Fixed(size) => size,
542                ChunkingStrategy::NumChunks(n) => self.size.div_ceil(n),
543                ChunkingStrategy::Auto => {
544                    let total_elements = self.size;
545                    (total_elements / 100).max(1)
546                }
547                ChunkingStrategy::FixedBytes(bytes) => {
548                    let elements_per_chunk = bytes / element_size;
549                    elements_per_chunk.max(1)
550                }
551                ChunkingStrategy::Advanced(_) => {
552                    // For advanced strategies, fall back to auto sizing
553                    let total_elements = self.size;
554                    (total_elements / 100).max(1)
555                }
556            };
557
558            let start_idx = chunk_idx * chunk_size;
559            let end_idx = (start_idx + chunk_size).min(self.size);
560
561            // Get a copy of the data for this chunk
562            let mut chunk_data = Vec::with_capacity(end_idx - start_idx);
563
564            // Obtain the data safely through the memory mapping
565            if let Ok(array_1d) = self.as_array::<crate::ndarray::Ix1>() {
566                chunk_data.extend_from_slice(
567                    array_1d
568                        .slice(crate::s![start_idx..end_idx])
569                        .as_slice()
570                        .expect("Operation failed"),
571                );
572            } else {
573                continue;
574            }
575
576            // Process the chunk data with the provided function
577            f(&mut chunk_data, chunk_idx);
578
579            // Write the modified data back to the file directly
580            // This is the most reliable way to ensure changes are persisted
581            let file_path = &self.file_path;
582
583            if let Ok(mut file) = OpenOptions::new().write(true).open(file_path) {
584                // Calculate the effective offset (header + data offset + element position)
585                let effective_offset = self.offset + start_idx * element_size;
586
587                // Seek to the position and write the data
588                if file.seek(SeekFrom::Start(effective_offset as u64)).is_ok() {
589                    // Convert the chunk data to bytes
590                    let bytes = unsafe {
591                        std::slice::from_raw_parts(
592                            chunk_data.as_ptr() as *const u8,
593                            chunk_data.len() * element_size,
594                        )
595                    };
596
597                    // Write the bytes to the file
598                    let _ = file.write_all(bytes);
599                    let _ = file.flush();
600                }
601            }
602        }
603
604        // Reload the memory mapping to ensure changes are visible
605        let _ = self.reload();
606    }
607}
608
// Add the parallel methods directly to the existing implementation
#[cfg(feature = "parallel")]
impl<A: Clone + Copy + 'static + Send + Sync> MemoryMappedChunksParallel<A>
    for MemoryMappedArray<A>
{
    fn process_chunks_parallel<F, R>(&self, strategy: ChunkingStrategy, f: F) -> Vec<R>
    where
        F: Fn(&[A], usize) -> R + Send + Sync,
        R: Send,
    {
        let total_chunks = self.chunk_count(strategy);

        // The per-chunk element count is the same for every chunk, so compute
        // it once instead of re-matching the strategy per chunk.
        let chunk_size = match strategy {
            ChunkingStrategy::Fixed(size) => size,
            ChunkingStrategy::NumChunks(n) => self.size.div_ceil(n),
            // Auto and Advanced both fall back to ~1% of the array (min 1).
            ChunkingStrategy::Auto | ChunkingStrategy::Advanced(_) => (self.size / 100).max(1),
            ChunkingStrategy::FixedBytes(bytes) => {
                (bytes / std::mem::size_of::<A>()).max(1)
            }
        };

        // Precompute (index, start, end) for every chunk.
        let chunks_info: Vec<(usize, usize, usize)> = (0..total_chunks)
            .map(|chunk_idx| {
                let start_idx = chunk_idx * chunk_size;
                let end_idx = (start_idx + chunk_size).min(self.size);
                (chunk_idx, start_idx, end_idx)
            })
            .collect();

        // A single 1-D view is shared by all parallel workers; if the array
        // cannot be viewed as 1-D there is nothing to process.
        let array_1d = match self.as_array::<crate::ndarray::Ix1>() {
            Ok(arr) => arr,
            Err(_) => return Vec::new(),
        };

        // Indexed parallel collect preserves chunk order in the results.
        chunks_info
            .into_par_iter()
            .map(|(chunk_idx, start_idx, end_idx)| {
                // Copy the chunk so each worker owns its data.
                let chunk_data = array_1d.slice(crate::s![start_idx..end_idx]).to_vec();
                f(&chunk_data, chunk_idx)
            })
            .collect()
    }

    fn process_chunks_mut_parallel<F>(&mut self, strategy: ChunkingStrategy, f: F)
    where
        F: Fn(&mut [A], usize) + Send + Sync,
    {
        let total_chunks = self.chunk_count(strategy);
        let element_size = std::mem::size_of::<A>();

        // Loop-invariant per-chunk element count (see process_chunks_parallel).
        let chunk_size = match strategy {
            ChunkingStrategy::Fixed(size) => size,
            ChunkingStrategy::NumChunks(n) => self.size.div_ceil(n),
            ChunkingStrategy::Auto | ChunkingStrategy::Advanced(_) => (self.size / 100).max(1),
            ChunkingStrategy::FixedBytes(bytes) => (bytes / element_size).max(1),
        };

        // Precompute (index, start, end) for every chunk.
        let chunks_info: Vec<(usize, usize, usize)> = (0..total_chunks)
            .map(|chunk_idx| {
                let start_idx = chunk_idx * chunk_size;
                let end_idx = (start_idx + chunk_size).min(self.size);
                (chunk_idx, start_idx, end_idx)
            })
            .collect();

        // Copies of path/offset so the write-back below does not borrow `self`.
        let file_path = self.file_path.clone();
        let offset = self.offset;

        let array_1d = match self.as_array::<crate::ndarray::Ix1>() {
            Ok(arr) => arr,
            Err(_) => return,
        };

        // Run the user callback on an owned copy of each chunk in parallel.
        let modifications: Vec<(usize, usize, Vec<A>)> = chunks_info
            .into_par_iter()
            .map(|(chunk_idx, start_idx, end_idx)| {
                let mut chunk_data = array_1d.slice(crate::s![start_idx..end_idx]).to_vec();
                f(&mut chunk_data, chunk_idx);
                (chunk_idx, start_idx, chunk_data)
            })
            .collect();

        // Persist the modified chunks sequentially so file writes never
        // interleave. The file is opened once for all chunks rather than
        // once per chunk.
        if let Ok(mut file) = OpenOptions::new().write(true).open(&file_path) {
            for (_, start_idx, chunk_data) in modifications {
                // Effective offset = header/data offset + element position.
                let effective_offset = offset + start_idx * element_size;
                if file.seek(SeekFrom::Start(effective_offset as u64)).is_ok() {
                    // SAFETY: `chunk_data` is a live, initialized Vec of Copy
                    // elements; viewing its buffer as raw bytes for the
                    // duration of the write is sound.
                    let bytes = unsafe {
                        std::slice::from_raw_parts(
                            chunk_data.as_ptr() as *const u8,
                            chunk_data.len() * element_size,
                        )
                    };
                    let _ = file.write_all(bytes);
                    let _ = file.flush();
                }
            }
        }

        // Reload the memory mapping so the file changes become visible.
        let _ = self.reload();
    }
}
757
758impl<A: Clone + Copy + 'static + Send + Sync> MemoryMappedChunkIter<A> for MemoryMappedArray<A> {
759    fn chunks(&self, strategy: ChunkingStrategy) -> ChunkIter<A> {
760        ChunkIter {
761            array: self,
762            current_idx: 0,
763            total_chunks: self.chunk_count(strategy),
764            strategy,
765        }
766    }
767}