scirs2_core/memory_efficient/memmap_chunks.rs
1//! Chunked processing extension for memory-mapped arrays.
2//!
3//! This module provides utilities for working with large memory-mapped arrays
4//! in a memory-efficient manner by processing them in smaller chunks.
5//!
6//! ## Overview
7//!
8//! Memory-mapped arrays allow working with data that doesn't fit entirely in RAM,
9//! but processing such large arrays can still be challenging. This module extends
10//! `MemoryMappedArray` with chunked processing capabilities, enabling efficient
11//! processing of large datasets through a combination of:
12//!
13//! - Processing data in manageable chunks to control memory usage
14//! - Providing both iterator-based and callback-based processing APIs
15//! - Supporting various chunking strategies to optimize for different workloads
16//! - Ensuring mutations are properly persisted to the underlying memory-mapped file
17//!
18//! ## Usage
19//!
20//! There are three main ways to process chunks:
21//!
22//! 1. Using `process_chunks` for reading/processing chunks:
23//! ```no_run
24//! # use scirs2_core::memory_efficient::{MemoryMappedArray, MemoryMappedChunks, ChunkingStrategy};
25//! # let mmap: MemoryMappedArray<f64> = unimplemented!();
26//! // Process chunks of a large array and collect results
27//! let results = mmap.process_chunks(
28//! ChunkingStrategy::Fixed(1000),
29//! |chunk_data, chunk_idx| {
30//! // Process this chunk and return a result
31//! chunk_data.iter().sum::<f64>()
32//! }
33//! );
34//! ```
35//!
36//! 2. Using `process_chunks_mut` for mutating chunks:
37//! ```no_run
38//! # use scirs2_core::memory_efficient::{MemoryMappedArray, MemoryMappedChunks, ChunkingStrategy};
39//! # let mut mmap: MemoryMappedArray<f64> = unimplemented!();
40//! // Modify each chunk in-place
41//! mmap.process_chunks_mut(
42//! ChunkingStrategy::NumChunks(10),
43//! |chunk_data, chunk_idx| {
44//! // Modify the chunk data in-place
45//! for i in 0..chunk_data.len() {
46//! chunk_data[i] *= 2.0;
47//! }
48//! }
49//! );
50//! ```
51//!
52//! 3. Using the `chunks` iterator for element-by-element processing:
53//! ```no_run
54//! # use scirs2_core::memory_efficient::{MemoryMappedArray, MemoryMappedChunkIter, ChunkingStrategy};
55//! # let mmap: MemoryMappedArray<f64> = unimplemented!();
56//! // Process chunks using iterator
57//! for chunk in mmap.chunks(ChunkingStrategy::Auto) {
58//! // Each chunk is an Array1 of the appropriate type
59//! println!("Chunk sum: {}", chunk.sum());
60//! }
61//! ```
62//!
63//! 4. If you have the `parallel` feature enabled, you can also use parallel processing:
64//! ```ignore
65//! # #[cfg(feature = "parallel")]
66//! # {
67//! # use scirs2_core::memory_efficient::{create_mmap, AccessMode, ChunkingStrategy, MemoryMappedChunksParallel};
68//! # use scirs2_core::ndarray::Array1;
69//! # let data = Array1::<f64>::zeros(1000);
70//! # let mmap = create_mmap(&data, "/tmp/data.bin", AccessMode::Write, 0).expect("Operation failed");
71//! // Process chunks in parallel
72//! let results = mmap.process_chunks_parallel(
73//! ChunkingStrategy::Fixed(1000),
74//! |chunk_data, chunk_idx| {
75//! chunk_data.iter().sum::<f64>()
76//! }
77//! );
78//! # }
79//! ```
80//!
81//! ## Chunking Strategies
82//!
83//! This module supports different chunking strategies:
84//!
85//! - `ChunkingStrategy::Fixed(size)`: Fixed-size chunks
86//! - `ChunkingStrategy::NumChunks(n)`: Divide the array into a specific number of chunks
87//! - `ChunkingStrategy::Auto`: Automatically determine a reasonable chunk size
88//! - `ChunkingStrategy::FixedBytes(bytes)`: Chunks with a specific size in bytes
89//!
90//! ## Limitations
91//!
//! - Currently only works with one-dimensional (1D) arrays
93//! - For mutating operations, the module uses direct file I/O to ensure changes are
94//! properly persisted to disk, which may be slower than memory-only operations
95
96use crate::memory_efficient::chunked::ChunkingStrategy;
97use crate::memory_efficient::memmap::MemoryMappedArray;
98use ::ndarray::Array1;
99use std::fs::OpenOptions;
100use std::io::{Seek, SeekFrom, Write};
101
102#[cfg(feature = "parallel")]
103use crate::parallel_ops::*;
104
/// Extension trait for [`MemoryMappedArray`] to enable chunked processing of large datasets.
///
/// This trait extends `MemoryMappedArray` with methods for processing large datasets
/// in manageable chunks, which helps to control memory usage and enables working with
/// arrays that might be too large to fit entirely in memory.
///
/// The chunked methods currently assume a one-dimensional (1D) array layout.
pub trait MemoryMappedChunks<A: Clone + Copy + 'static + Send + Sync> {
    /// Get the number of chunks for the given chunking strategy.
    ///
    /// # Arguments
    ///
    /// * `strategy` - The chunking strategy to determine chunk sizes
    ///
    /// # Returns
    ///
    /// The number of chunks that the array will be divided into
    ///
    /// # Panics
    ///
    /// Panics if `strategy` is `ChunkingStrategy::Fixed(0)` (the chunk count
    /// is computed with a ceiling division by the chunk size).
    ///
    /// # Examples
    ///
    /// ```
    /// use scirs2_core::memory_efficient::{create_mmap, AccessMode, ChunkingStrategy, MemoryMappedChunks};
    /// use scirs2_core::ndarray::Array1;
    ///
    /// // Create a memory-mapped array with 100 elements
    /// let data = Array1::<f64>::linspace(0., 99., 100);
    /// let file_path = "example.bin"; // In practice, use a proper temporary path
    /// let mmap = create_mmap(&data, file_path.as_ref(), AccessMode::Write, 0).expect("Operation failed");
    ///
    /// // Check how many chunks we'll get with different strategies
    /// assert_eq!(mmap.chunk_count(ChunkingStrategy::Fixed(10)), 10); // 10 chunks of 10 elements each
    /// assert_eq!(mmap.chunk_count(ChunkingStrategy::NumChunks(5)), 5); // 5 chunks of 20 elements each
    /// ```
    fn chunk_count(&self, strategy: ChunkingStrategy) -> usize;

    /// Process each chunk with a function and collect the results.
    ///
    /// This method divides the array into chunks according to the given strategy,
    /// applies the provided function to each chunk, and collects the results into a vector.
    /// It is efficient for read-only operations on large arrays. The callback
    /// receives a copy of the chunk's data plus the chunk's index.
    ///
    /// # Arguments
    ///
    /// * `strategy` - The chunking strategy to determine chunk sizes
    /// * `f` - A function that processes each chunk and returns a result
    ///
    /// # Returns
    ///
    /// A vector containing the results from processing each chunk, in chunk order
    ///
    /// # Examples
    ///
    /// ```
    /// use scirs2_core::memory_efficient::{create_mmap, AccessMode, ChunkingStrategy, MemoryMappedChunks};
    /// use scirs2_core::ndarray::Array1;
    ///
    /// // Create a memory-mapped array with 20 elements (small numbers to avoid overflow)
    /// let data = Array1::<f64>::from_vec((0..20).map(|x| x as f64).collect());
    /// let file_path = "example.bin"; // In practice, use a proper temporary path
    /// let mmap = create_mmap(&data, file_path.as_ref(), AccessMode::Write, 0).expect("Operation failed");
    ///
    /// // Calculate the sum of each chunk
    /// let chunk_sums = mmap.process_chunks(
    ///     ChunkingStrategy::Fixed(5),
    ///     |chunk, _chunk_idx| chunk.iter().sum::<f64>()
    /// );
    ///
    /// // We should have 4 chunks with sums of elements 0-4, 5-9, 10-14, 15-19
    /// assert_eq!(chunk_sums.len(), 4);
    /// ```
    fn process_chunks<F, R>(&self, strategy: ChunkingStrategy, f: F) -> Vec<R>
    where
        F: Fn(&[A], usize) -> R;

    /// Process each chunk with a mutable function that modifies the data in-place.
    ///
    /// This method divides the array into chunks according to the given strategy
    /// and applies the provided mutable function to each chunk. The function can
    /// modify the chunk data in-place, and the changes will be saved to the
    /// underlying memory-mapped file.
    ///
    /// # Arguments
    ///
    /// * `strategy` - The chunking strategy to determine chunk sizes
    /// * `f` - A function that processes and potentially modifies each chunk
    ///
    /// # Examples
    ///
    /// ```
    /// use scirs2_core::memory_efficient::{create_mmap, AccessMode, ChunkingStrategy, MemoryMappedChunks};
    /// use scirs2_core::ndarray::Array1;
    ///
    /// // Create a memory-mapped array with 100 zeros
    /// let data = Array1::<f64>::zeros(100);
    /// let file_path = "example.bin"; // In practice, use a proper temporary path
    /// let mut mmap = create_mmap(&data, file_path.as_ref(), AccessMode::Write, 0).expect("Operation failed");
    ///
    /// // Modify each chunk: set elements to their index
    /// mmap.process_chunks_mut(
    ///     ChunkingStrategy::Fixed(10),
    ///     |chunk, chunk_idx| {
    ///         for (i, elem) in chunk.iter_mut().enumerate() {
    ///             *elem = (chunk_idx * 10 + i) as f64;
    ///         }
    ///     }
    /// );
    ///
    /// // Now the array contains [0, 1, 2, ..., 99]
    /// ```
    ///
    /// # Notes
    ///
    /// This method uses direct file I/O to ensure changes are properly persisted to disk,
    /// which may be slower than memory-only operations but is more reliable for ensuring
    /// data is properly saved, especially with large datasets.
    fn process_chunks_mut<F>(&mut self, strategy: ChunkingStrategy, f: F)
    where
        F: Fn(&mut [A], usize);
}
222
/// Extension trait for parallel processing of memory-mapped arrays.
///
/// This trait is only available when the 'parallel' feature is enabled.
/// It extends the `MemoryMappedChunks` trait with parallel processing capabilities.
#[cfg(feature = "parallel")]
pub trait MemoryMappedChunksParallel<A: Clone + Copy + 'static + Send + Sync>:
    MemoryMappedChunks<A>
{
    /// Process chunks in parallel and collect the results.
    ///
    /// This method works like `process_chunks` but processes the chunks in parallel using Rayon.
    /// It's useful for computationally intensive operations on large datasets.
    ///
    /// # Arguments
    ///
    /// * `strategy` - The chunking strategy to determine chunk sizes
    /// * `f` - A function that processes each chunk and returns a result
    ///
    /// # Returns
    ///
    /// A vector containing the results from processing each chunk, in chunk order
    ///
    /// # Examples
    ///
    /// ```ignore
    /// # #[cfg(feature = "parallel")]
    /// # {
    /// use scirs2_core::memory_efficient::{create_mmap, AccessMode, ChunkingStrategy, MemoryMappedChunks, MemoryMappedChunksParallel};
    /// use scirs2_core::ndarray::Array1;
    ///
    /// // Create a memory-mapped array with 20 elements (small numbers to avoid overflow)
    /// let data = Array1::<i32>::from_vec((1..=20).collect());
    /// let file_path = "example.bin"; // In practice, use a proper temporary path
    /// let mmap = create_mmap(&data, file_path.as_ref(), AccessMode::Write, 0).expect("Operation failed");
    ///
    /// // Calculate the sum of each chunk in parallel
    /// let chunk_sums = mmap.process_chunks_parallel(
    ///     ChunkingStrategy::Fixed(5),
    ///     |chunk, _chunk_idx| chunk.iter().sum::<i32>()
    /// );
    ///
    /// // We should have 4 chunks with reasonable sums
    /// assert_eq!(chunk_sums.len(), 4);
    /// # }
    /// ```
    fn process_chunks_parallel<F, R>(&self, strategy: ChunkingStrategy, f: F) -> Vec<R>
    where
        F: Fn(&[A], usize) -> R + Send + Sync,
        R: Send;

    /// Process chunks in parallel with a mutable function.
    ///
    /// This method works like `process_chunks_mut` but processes the chunks in parallel using Rayon.
    /// It's useful for computationally intensive operations on large datasets.
    ///
    /// # Arguments
    ///
    /// * `strategy` - The chunking strategy to determine chunk sizes
    /// * `f` - A function that processes and potentially modifies each chunk
    ///
    /// # Examples
    ///
    /// ```
    /// # #[cfg(feature = "parallel")]
    /// # {
    /// use scirs2_core::memory_efficient::{create_mmap, AccessMode, ChunkingStrategy, MemoryMappedChunks, MemoryMappedChunksParallel};
    /// use scirs2_core::ndarray::Array1;
    ///
    /// // Create a memory-mapped array with 100 zeros
    /// let data = Array1::<f64>::zeros(100);
    /// let file_path = "example.bin"; // In practice, use a proper temporary path
    /// let mut mmap = create_mmap(&data, file_path.as_ref(), AccessMode::Write, 0).expect("Operation failed");
    ///
    /// // Modify each chunk in parallel: set elements to their index
    /// mmap.process_chunks_mut_parallel(
    ///     ChunkingStrategy::Fixed(10),
    ///     |chunk, chunk_idx| {
    ///         for (i, elem) in chunk.iter_mut().enumerate() {
    ///             *elem = (chunk_idx * 10 + i) as f64;
    ///         }
    ///     }
    /// );
    /// # }
    /// ```
    ///
    /// # Notes
    ///
    /// Even when used in parallel, this method ensures that file writes are safe
    /// and do not conflict with each other by collecting all modifications and
    /// applying them sequentially.
    fn process_chunks_mut_parallel<F>(&mut self, strategy: ChunkingStrategy, f: F)
    where
        F: Fn(&mut [A], usize) + Send + Sync;
}
317
318/// Iterator over chunks of a memory-mapped array (for 1D arrays only).
319///
320/// This iterator provides a convenient way to process a memory-mapped array in chunks,
321/// returning each chunk as an `Array1<A>`. It's particularly useful for operations where
322/// you want to process chunks sequentially and don't need to collect results.
323///
324/// # Examples
325///
326/// ```
327/// use scirs2_core::memory_efficient::{create_mmap, AccessMode, ChunkingStrategy, MemoryMappedChunkIter};
328/// use scirs2_core::ndarray::Array1;
329///
330/// // Create a memory-mapped array
331/// let data = Array1::<f64>::linspace(0., 99., 100);
332/// let file_path = "example.bin"; // In practice, use a proper temporary path
333/// let mmap = create_mmap(&data, file_path.as_ref(), AccessMode::Write, 0).expect("Operation failed");
334///
335/// // Process chunks using iterator
336/// let mut sum = 0.0;
337/// for chunk in mmap.chunks(ChunkingStrategy::Fixed(10)) {
338/// // Each chunk is an Array1<f64>
339/// sum += chunk.sum();
340/// }
341///
342/// // The sum should be the same as summing the original array
343/// assert!((sum - data.sum()).abs() < 1e-10);
344/// ```
345pub struct ChunkIter<'a, A>
346where
347 A: Clone + Copy + 'static + Send + Sync + Send + Sync,
348{
349 /// Reference to the memory-mapped array
350 array: &'a MemoryMappedArray<A>,
351 /// Current chunk index
352 current_idx: usize,
353 /// Total number of chunks
354 total_chunks: usize,
355 /// Chunking strategy
356 strategy: ChunkingStrategy,
357}
358
359impl<A> Iterator for ChunkIter<'_, A>
360where
361 A: Clone + Copy + 'static + Send + Sync,
362{
363 type Item = Array1<A>;
364
365 fn next(&mut self) -> Option<Self::Item> {
366 if self.current_idx >= self.total_chunks {
367 None
368 } else {
369 let chunk_idx = self.current_idx;
370 self.current_idx += 1;
371
372 // Get chunk start/end indices
373 let chunk_size = match self.strategy {
374 ChunkingStrategy::Fixed(size) => size,
375 ChunkingStrategy::NumChunks(n) => self.array.size.div_ceil(n),
376 ChunkingStrategy::Auto => (self.array.size / 100).max(1),
377 ChunkingStrategy::FixedBytes(bytes) => {
378 let element_size = std::mem::size_of::<A>();
379 let elements_per_chunk = bytes / element_size;
380 elements_per_chunk.max(1)
381 }
382 ChunkingStrategy::Advanced(_) => {
383 // For advanced strategies, fall back to auto sizing
384 (self.array.size / 100).max(1)
385 }
386 };
387
388 let start_idx = chunk_idx * chunk_size;
389 let end_idx = (start_idx + chunk_size).min(self.array.size);
390
391 // Get the array data to return a chunk
392 if let Ok(array_1d) = self.array.as_array::<crate::ndarray::Ix1>() {
393 Some(array_1d.slice(crate::s![start_idx..end_idx]).to_owned())
394 } else {
395 None
396 }
397 }
398 }
399
400 fn size_hint(&self) -> (usize, Option<usize>) {
401 let remaining = self.total_chunks - self.current_idx;
402 (remaining, Some(remaining))
403 }
404}
405
// `size_hint` is exact (remaining chunk count is known up front), so the
// `ExactSizeIterator` marker impl is sound with no extra methods required.
impl<A> ExactSizeIterator for ChunkIter<'_, A> where A: Clone + Copy + 'static + Send + Sync {}
407
/// Extension trait for `MemoryMappedArray` to enable chunked iteration.
///
/// This trait extends `MemoryMappedArray` with the ability to iterate over chunks
/// of the array, which provides a convenient way to process large arrays sequentially
/// in smaller, manageable pieces.
pub trait MemoryMappedChunkIter<A: Clone + Copy + 'static + Send + Sync> {
    /// Create an iterator over chunks of the array (for 1D arrays only).
    ///
    /// This method returns an iterator that yields chunks of the array as
    /// owned `Array1<A>` values, making it easy to process the array in smaller pieces.
    ///
    /// # Arguments
    ///
    /// * `strategy` - The chunking strategy to determine chunk sizes
    ///
    /// # Returns
    ///
    /// An iterator that yields `Array1<A>` chunks of the memory-mapped array
    ///
    /// # Examples
    ///
    /// ```
    /// use scirs2_core::memory_efficient::{create_mmap, AccessMode, ChunkingStrategy, MemoryMappedChunkIter};
    /// use scirs2_core::ndarray::Array1;
    ///
    /// // Create a memory-mapped array
    /// let data = Array1::<f64>::linspace(0., 99., 100);
    /// let file_path = "example.bin"; // In practice, use a proper temporary path
    /// let mmap = create_mmap(&data, file_path.as_ref(), AccessMode::Write, 0).expect("Operation failed");
    ///
    /// // Create a chunk iterator with chunks of size 25
    /// let mut iter = mmap.chunks(ChunkingStrategy::Fixed(25));
    ///
    /// // Get the first chunk (elements 0-24)
    /// let chunk1 = iter.next().expect("Operation failed");
    /// assert_eq!(chunk1.len(), 25);
    /// assert_eq!(chunk1[0], 0.0);
    /// assert_eq!(chunk1[24], 24.0);
    /// ```
    fn chunks(&self, strategy: ChunkingStrategy) -> ChunkIter<A>;
}
449
450impl<A: Clone + Copy + 'static + Send + Sync + Send + Sync> MemoryMappedChunks<A>
451 for MemoryMappedArray<A>
452{
453 fn chunk_count(&self, strategy: ChunkingStrategy) -> usize {
454 match strategy {
455 ChunkingStrategy::Fixed(size) => {
456 // Calculate how many chunks of the given size we need
457 self.size.div_ceil(size)
458 }
459 ChunkingStrategy::NumChunks(n) => {
460 // Number of chunks is explicitly specified
461 n
462 }
463 ChunkingStrategy::Auto => {
464 // Determine a reasonable chunk size based on the array size
465 let total_elements = self.size;
466 let optimal_chunk_size = (total_elements / 100).max(1);
467 total_elements.div_ceil(optimal_chunk_size)
468 }
469 ChunkingStrategy::FixedBytes(bytes) => {
470 // Calculate how many chunks based on bytes
471 let element_size = std::mem::size_of::<A>();
472 let elements_per_chunk = bytes / element_size;
473 let elements_per_chunk = elements_per_chunk.max(1); // Ensure at least 1 element per chunk
474 self.size.div_ceil(elements_per_chunk)
475 }
476 ChunkingStrategy::Advanced(_) => {
477 // For advanced strategies, fall back to auto sizing
478 let total_elements = self.size;
479 let optimal_chunk_size = (total_elements / 100).max(1);
480 total_elements.div_ceil(optimal_chunk_size)
481 }
482 }
483 }
484
485 fn process_chunks<F, R>(&self, strategy: ChunkingStrategy, f: F) -> Vec<R>
486 where
487 F: Fn(&[A], usize) -> R,
488 {
489 let total_chunks = self.chunk_count(strategy);
490 let mut results = Vec::with_capacity(total_chunks);
491
492 // Process each chunk by copying the data
493 for chunk_idx in 0..total_chunks {
494 // Calculate chunk size and indices
495 let chunk_size = match strategy {
496 ChunkingStrategy::Fixed(size) => size,
497 ChunkingStrategy::NumChunks(n) => self.size.div_ceil(n),
498 ChunkingStrategy::Auto => {
499 let total_elements = self.size;
500 (total_elements / 100).max(1)
501 }
502 ChunkingStrategy::FixedBytes(bytes) => {
503 let element_size = std::mem::size_of::<A>();
504 let elements_per_chunk = bytes / element_size;
505 elements_per_chunk.max(1)
506 }
507 ChunkingStrategy::Advanced(_) => {
508 // For advanced strategies, fall back to auto sizing
509 let total_elements = self.size;
510 (total_elements / 100).max(1)
511 }
512 };
513
514 let start_idx = chunk_idx * chunk_size;
515 let end_idx = (start_idx + chunk_size).min(self.size);
516
517 // Get the data for this chunk
518 if let Ok(array_1d) = self.as_array::<crate::ndarray::Ix1>() {
519 // Copy the data to a new array to avoid lifetime issues
520 let chunk_data = array_1d.slice(crate::s![start_idx..end_idx]).to_vec();
521
522 // Process the chunk data
523 results.push(f(&chunk_data, chunk_idx));
524 }
525 }
526
527 results
528 }
529
530 fn process_chunks_mut<F>(&mut self, strategy: ChunkingStrategy, f: F)
531 where
532 F: Fn(&mut [A], usize),
533 {
534 let total_chunks = self.chunk_count(strategy);
535 let element_size = std::mem::size_of::<A>();
536
537 // Process each chunk
538 for chunk_idx in 0..total_chunks {
539 // Calculate chunk size and indices
540 let chunk_size = match strategy {
541 ChunkingStrategy::Fixed(size) => size,
542 ChunkingStrategy::NumChunks(n) => self.size.div_ceil(n),
543 ChunkingStrategy::Auto => {
544 let total_elements = self.size;
545 (total_elements / 100).max(1)
546 }
547 ChunkingStrategy::FixedBytes(bytes) => {
548 let elements_per_chunk = bytes / element_size;
549 elements_per_chunk.max(1)
550 }
551 ChunkingStrategy::Advanced(_) => {
552 // For advanced strategies, fall back to auto sizing
553 let total_elements = self.size;
554 (total_elements / 100).max(1)
555 }
556 };
557
558 let start_idx = chunk_idx * chunk_size;
559 let end_idx = (start_idx + chunk_size).min(self.size);
560
561 // Get a copy of the data for this chunk
562 let mut chunk_data = Vec::with_capacity(end_idx - start_idx);
563
564 // Obtain the data safely through the memory mapping
565 if let Ok(array_1d) = self.as_array::<crate::ndarray::Ix1>() {
566 chunk_data.extend_from_slice(
567 array_1d
568 .slice(crate::s![start_idx..end_idx])
569 .as_slice()
570 .expect("Operation failed"),
571 );
572 } else {
573 continue;
574 }
575
576 // Process the chunk data with the provided function
577 f(&mut chunk_data, chunk_idx);
578
579 // Write the modified data back to the file directly
580 // This is the most reliable way to ensure changes are persisted
581 let file_path = &self.file_path;
582
583 if let Ok(mut file) = OpenOptions::new().write(true).open(file_path) {
584 // Calculate the effective offset (header + data offset + element position)
585 let effective_offset = self.offset + start_idx * element_size;
586
587 // Seek to the position and write the data
588 if file.seek(SeekFrom::Start(effective_offset as u64)).is_ok() {
589 // Convert the chunk data to bytes
590 let bytes = unsafe {
591 std::slice::from_raw_parts(
592 chunk_data.as_ptr() as *const u8,
593 chunk_data.len() * element_size,
594 )
595 };
596
597 // Write the bytes to the file
598 let _ = file.write_all(bytes);
599 let _ = file.flush();
600 }
601 }
602 }
603
604 // Reload the memory mapping to ensure changes are visible
605 let _ = self.reload();
606 }
607}
608
// Add the parallel methods directly to the existing implementation
#[cfg(feature = "parallel")]
impl<A: Clone + Copy + 'static + Send + Sync> MemoryMappedChunksParallel<A>
    for MemoryMappedArray<A>
{
    fn process_chunks_parallel<F, R>(&self, strategy: ChunkingStrategy, f: F) -> Vec<R>
    where
        F: Fn(&[A], usize) -> R + Send + Sync,
        R: Send,
    {
        let total_chunks = self.chunk_count(strategy);
        let total_len = self.size;

        // The per-chunk element count is loop-invariant: resolve it once
        // instead of re-matching the strategy inside the parallel closure.
        let chunk_size = match strategy {
            ChunkingStrategy::Fixed(size) => size,
            ChunkingStrategy::NumChunks(n) => total_len.div_ceil(n),
            ChunkingStrategy::Auto | ChunkingStrategy::Advanced(_) => (total_len / 100).max(1),
            ChunkingStrategy::FixedBytes(bytes) => (bytes / std::mem::size_of::<A>()).max(1),
        };

        // Without a 1D view there is nothing to process.
        let array_1d = match self.as_array::<crate::ndarray::Ix1>() {
            Ok(arr) => arr,
            Err(_) => return Vec::new(),
        };

        // Indexed parallel iteration preserves chunk order in the collected output.
        (0..total_chunks)
            .into_par_iter()
            .map(|chunk_idx| {
                // Clamp both ends so an oversized chunk count cannot produce an
                // out-of-range start index (which would panic in `slice`).
                let start_idx = (chunk_idx * chunk_size).min(total_len);
                let end_idx = (start_idx + chunk_size).min(total_len);

                // Copy the chunk so the callback gets an independent slice.
                let chunk_data = array_1d.slice(crate::s![start_idx..end_idx]).to_vec();
                f(&chunk_data, chunk_idx)
            })
            .collect()
    }

    fn process_chunks_mut_parallel<F>(&mut self, strategy: ChunkingStrategy, f: F)
    where
        F: Fn(&mut [A], usize) + Send + Sync,
    {
        let total_chunks = self.chunk_count(strategy);
        let element_size = std::mem::size_of::<A>();
        let total_len = self.size;

        // The per-chunk element count is loop-invariant: resolve it once.
        let chunk_size = match strategy {
            ChunkingStrategy::Fixed(size) => size,
            ChunkingStrategy::NumChunks(n) => total_len.div_ceil(n),
            ChunkingStrategy::Auto | ChunkingStrategy::Advanced(_) => (total_len / 100).max(1),
            ChunkingStrategy::FixedBytes(bytes) => (bytes / element_size).max(1),
        };

        // Without a 1D view there is nothing to process.
        let array_1d = match self.as_array::<crate::ndarray::Ix1>() {
            Ok(arr) => arr,
            Err(_) => return,
        };

        // Run the callback on every chunk in parallel, collecting the modified
        // buffers; the file writes below stay sequential to avoid conflicts.
        let modifications: Vec<_> = (0..total_chunks)
            .into_par_iter()
            .map(|chunk_idx| {
                // Clamp both ends so an oversized chunk count cannot produce an
                // out-of-range start index (which would panic in `slice`).
                let start_idx = (chunk_idx * chunk_size).min(total_len);
                let end_idx = (start_idx + chunk_size).min(total_len);

                // Copy the chunk so the callback can mutate it independently.
                let mut chunk_data = array_1d.slice(crate::s![start_idx..end_idx]).to_vec();
                f(&mut chunk_data, chunk_idx);

                (start_idx, chunk_data)
            })
            .collect();

        // Open the backing file ONCE (not once per chunk) and write each
        // modified chunk back at its byte offset, sequentially.
        if let Ok(mut file) = OpenOptions::new().write(true).open(&self.file_path) {
            for (start_idx, chunk_data) in modifications {
                // Byte position of this chunk = header/data offset + element position.
                let effective_offset = self.offset + start_idx * element_size;
                if file.seek(SeekFrom::Start(effective_offset as u64)).is_ok() {
                    // SAFETY: `chunk_data` is a live Vec<A> of Copy elements;
                    // viewing its backing storage as raw bytes of length
                    // `len * size_of::<A>()` is valid for this borrow.
                    let bytes = unsafe {
                        std::slice::from_raw_parts(
                            chunk_data.as_ptr() as *const u8,
                            chunk_data.len() * element_size,
                        )
                    };
                    let _ = file.write_all(bytes);
                    let _ = file.flush();
                }
            }
        }

        // Re-map so the in-memory view reflects the data just written to disk.
        let _ = self.reload();
    }
}
757
758impl<A: Clone + Copy + 'static + Send + Sync> MemoryMappedChunkIter<A> for MemoryMappedArray<A> {
759 fn chunks(&self, strategy: ChunkingStrategy) -> ChunkIter<A> {
760 ChunkIter {
761 array: self,
762 current_idx: 0,
763 total_chunks: self.chunk_count(strategy),
764 strategy,
765 }
766 }
767}