// Source: scirs2_core/memory_efficient/large_data.rs

//! Large dataset support with streaming iterators and zero-copy processing.
//!
//! This module provides efficient processing capabilities for GB-scale datasets
//! that don't fit in memory, using memory-mapped files and streaming iterators.

use super::memmap::MemoryMappedArray;
use crate::error::{CoreError, CoreResult, ErrorContext, ErrorLocation};
use std::marker::PhantomData;
use std::ops::Range;

/// Streaming chunk iterator for large datasets
///
/// This iterator yields slices from a memory-mapped file without copying data.
/// It's optimized for sequential access patterns with minimal memory footprint.
pub struct StreamingChunkIterator<'a, A>
where
    A: Clone + Copy + Send + Sync + 'static,
{
    /// The underlying memory-mapped array; yielded slices borrow from it
    /// for the full lifetime `'a`.
    mmap: &'a MemoryMappedArray<A>,

    /// Total number of elements (product of `mmap.shape`).
    total_elements: usize,

    /// Current read position, in elements. Always a multiple of `chunk_size`
    /// until the final (possibly partial) chunk has been consumed.
    current_position: usize,

    /// Chunk size in elements
    chunk_size: usize,

    /// Phantom data for type parameter
    // NOTE(review): `A` already appears in the `mmap` field, so this
    // PhantomData is redundant; kept because removing it would change how
    // the struct is constructed elsewhere.
    phantom: PhantomData<A>,
}

35impl<'a, A> StreamingChunkIterator<'a, A>
36where
37    A: Clone + Copy + Send + Sync + 'static,
38{
39    /// Create a new streaming chunk iterator
40    pub fn new(mmap: &'a MemoryMappedArray<A>, chunk_size: usize) -> Self {
41        let total_elements = mmap.shape.iter().product();
42
43        Self {
44            mmap,
45            total_elements,
46            current_position: 0,
47            chunk_size,
48            phantom: PhantomData,
49        }
50    }
51
52    /// Get the total number of chunks
53    pub fn num_chunks(&self) -> usize {
54        self.total_elements.div_ceil(self.chunk_size)
55    }
56
57    /// Get the current chunk index
58    pub fn current_chunk(&self) -> usize {
59        self.current_position / self.chunk_size
60    }
61
62    /// Reset iterator to the beginning
63    pub fn reset(&mut self) {
64        self.current_position = 0;
65    }
66
67    /// Get a specific chunk by index without advancing the iterator
68    pub fn get_chunk(&self, chunk_index: usize) -> Option<&'a [A]> {
69        if chunk_index >= self.num_chunks() {
70            return None;
71        }
72
73        let start = chunk_index * self.chunk_size;
74        let end = ((chunk_index + 1) * self.chunk_size).min(self.total_elements);
75
76        let slice = self.mmap.as_slice();
77        Some(&slice[start..end])
78    }
79
80    /// Get the byte range for a specific chunk
81    pub fn chunk_byte_range(&self, chunk_index: usize) -> Option<Range<usize>> {
82        if chunk_index >= self.num_chunks() {
83            return None;
84        }
85
86        let elem_size = std::mem::size_of::<A>();
87        let start = chunk_index * self.chunk_size;
88        let end = ((chunk_index + 1) * self.chunk_size).min(self.total_elements);
89
90        Some((start * elem_size)..(end * elem_size))
91    }
92}
93
94impl<'a, A> Iterator for StreamingChunkIterator<'a, A>
95where
96    A: Clone + Copy + Send + Sync + 'static,
97{
98    type Item = &'a [A];
99
100    fn next(&mut self) -> Option<Self::Item> {
101        if self.current_position >= self.total_elements {
102            return None;
103        }
104
105        let start = self.current_position;
106        let end = (self.current_position + self.chunk_size).min(self.total_elements);
107
108        self.current_position = end;
109
110        let slice = self.mmap.as_slice();
111        Some(&slice[start..end])
112    }
113
114    fn size_hint(&self) -> (usize, Option<usize>) {
115        let remaining = self.total_elements - self.current_position;
116        let remaining_chunks = remaining.div_ceil(self.chunk_size);
117        (remaining_chunks, Some(remaining_chunks))
118    }
119}
120
121impl<'a, A> ExactSizeIterator for StreamingChunkIterator<'a, A>
122where
123    A: Clone + Copy + Send + Sync + 'static,
124{
125    fn len(&self) -> usize {
126        let remaining = self.total_elements - self.current_position;
127        remaining.div_ceil(self.chunk_size)
128    }
129}
130
/// Parallel processing support for streaming iterators
#[cfg(feature = "parallel")]
pub struct ParallelStreamingProcessor<'a, A>
where
    A: Clone + Copy + Send + Sync + 'static,
{
    /// The underlying iterator, used only for chunk geometry and
    /// random-access `get_chunk` lookups (never advanced).
    iterator: StreamingChunkIterator<'a, A>,

    /// Number of parallel workers
    // NOTE(review): never read by `process`/`try_process` in this file — the
    // degree of parallelism is decided by the `parallel_ops` runtime. Confirm
    // whether this field is still needed or should configure a thread pool.
    num_workers: usize,
}

#[cfg(feature = "parallel")]
impl<'a, A> ParallelStreamingProcessor<'a, A>
where
    A: Clone + Copy + Send + Sync + 'static,
{
    /// Create a new parallel streaming processor.
    ///
    /// `num_workers` is stored but not currently consulted by `process` /
    /// `try_process`; the effective parallelism comes from the
    /// `parallel_ops` runtime.
    pub fn new(mmap: &'a MemoryMappedArray<A>, chunk_size: usize, num_workers: usize) -> Self {
        Self {
            iterator: StreamingChunkIterator::new(mmap, chunk_size),
            num_workers,
        }
    }

    /// Process all chunks in parallel, applying `f` to each chunk and
    /// collecting the per-chunk results.
    pub fn process<F, R>(&self, f: F) -> CoreResult<Vec<R>>
    where
        F: Fn(&[A]) -> R + Send + Sync,
        R: Send,
    {
        use crate::parallel_ops::*;

        // Iterate chunk indices directly — no need to materialize a
        // Vec<usize> of indices first.
        let results: Vec<R> = (0..self.iterator.num_chunks())
            .into_par_iter()
            .filter_map(|idx| self.iterator.get_chunk(idx).map(&f))
            .collect();

        Ok(results)
    }

    /// Process chunks in parallel with error handling.
    ///
    /// # Errors
    ///
    /// Returns `CoreError::IndexError` if a chunk lookup fails (should not
    /// happen for in-range indices) and `CoreError::InvalidArgument`
    /// wrapping the first error produced by `f`.
    pub fn try_process<F, R, E>(&self, f: F) -> CoreResult<Vec<R>>
    where
        F: Fn(&[A]) -> Result<R, E> + Send + Sync,
        R: Send,
        E: std::fmt::Display + Send,
    {
        use crate::parallel_ops::*;

        (0..self.iterator.num_chunks())
            .into_par_iter()
            .map(|idx| {
                let chunk = self.iterator.get_chunk(idx).ok_or_else(|| {
                    CoreError::IndexError(
                        ErrorContext::new(format!("Chunk {idx} not found"))
                            .with_location(ErrorLocation::new(file!(), line!())),
                    )
                })?;
                f(chunk).map_err(|e| {
                    CoreError::InvalidArgument(
                        ErrorContext::new(format!("Processing error: {e}"))
                            .with_location(ErrorLocation::new(file!(), line!())),
                    )
                })
            })
            .collect()
    }
}

214/// Create a streaming iterator for a memory-mapped array
215#[allow(dead_code)]
216pub fn create_streaming_iterator<A>(
217    mmap: &MemoryMappedArray<A>,
218    chunk_size: usize,
219) -> StreamingChunkIterator<'_, A>
220where
221    A: Clone + Copy + Send + Sync + 'static,
222{
223    StreamingChunkIterator::new(mmap, chunk_size)
224}
225
/// Create a parallel streaming processor
///
/// Free-function convenience wrapper around
/// [`ParallelStreamingProcessor::new`].
#[cfg(feature = "parallel")]
#[allow(dead_code)]
pub fn create_parallel_processor<A>(
    mmap: &MemoryMappedArray<A>,
    chunk_size: usize,
    num_workers: usize,
) -> ParallelStreamingProcessor<'_, A>
where
    A: Clone + Copy + Send + Sync + 'static,
{
    ParallelStreamingProcessor::<A>::new(mmap, chunk_size, num_workers)
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::memory_efficient::memmap::{create_temp_mmap, AccessMode};
    // NOTE(review): `crate::ndarray` implies this crate re-exports ndarray;
    // confirm the path (external code would normally use `ndarray::Array1`).
    use crate::ndarray::Array1;

    #[test]
    fn test_streaming_iterator_creation() {
        // Create test data
        let data: Array1<f64> = Array1::from_vec((0..1000).map(|i| i as f64).collect());

        // Create temporary memory-mapped array
        let mmap = create_temp_mmap(&data, AccessMode::ReadOnly, 0).expect("Failed to create mmap");

        // Create streaming iterator with 100-element chunks
        let iterator = StreamingChunkIterator::new(&mmap, 100);

        assert_eq!(iterator.num_chunks(), 10);
        assert_eq!(iterator.current_chunk(), 0);
    }

    #[test]
    fn test_streaming_iterator_iteration() {
        let data: Array1<f64> = Array1::from_vec((0..1000).map(|i| i as f64).collect());
        let mmap = create_temp_mmap(&data, AccessMode::ReadOnly, 0).expect("Failed to create mmap");

        let iterator = StreamingChunkIterator::new(&mmap, 100);

        // 1000 elements / 100 per chunk = exactly 10 full chunks.
        let chunks: Vec<_> = iterator.collect();

        assert_eq!(chunks.len(), 10);
        assert_eq!(chunks[0].len(), 100);
        assert_eq!(chunks[9].len(), 100);
    }

    #[test]
    fn test_streaming_iterator_get_chunk() {
        let data: Array1<f64> = Array1::from_vec((0..1000).map(|i| i as f64).collect());
        let mmap = create_temp_mmap(&data, AccessMode::ReadOnly, 0).expect("Failed to create mmap");

        let iterator = StreamingChunkIterator::new(&mmap, 100);

        // Get specific chunk (chunk 5 covers elements 500..600)
        let chunk = iterator.get_chunk(5).expect("Chunk not found");

        assert_eq!(chunk.len(), 100);
        assert!((chunk[0] - 500.0).abs() < 1e-10);
        assert!((chunk[99] - 599.0).abs() < 1e-10);
    }

    #[test]
    fn test_streaming_iterator_reset() {
        let data: Array1<f64> = Array1::from_vec((0..1000).map(|i| i as f64).collect());
        let mmap = create_temp_mmap(&data, AccessMode::ReadOnly, 0).expect("Failed to create mmap");

        let mut iterator = StreamingChunkIterator::new(&mmap, 100);

        // Advance iterator
        let _ = iterator.next();
        let _ = iterator.next();

        assert_eq!(iterator.current_chunk(), 2);

        // Reset
        iterator.reset();

        assert_eq!(iterator.current_chunk(), 0);
    }

    #[test]
    fn test_streaming_iterator_exact_size() {
        let data: Array1<f64> = Array1::from_vec((0..1000).map(|i| i as f64).collect());
        let mmap = create_temp_mmap(&data, AccessMode::ReadOnly, 0).expect("Failed to create mmap");

        let iterator = StreamingChunkIterator::new(&mmap, 100);

        assert_eq!(iterator.len(), 10);

        // len() shrinks as chunks are consumed.
        let mut iter = iterator;
        let _ = iter.next();
        assert_eq!(iter.len(), 9);
    }

    #[test]
    fn test_chunk_byte_range() {
        let data: Array1<f64> = Array1::from_vec((0..1000).map(|i| i as f64).collect());
        let mmap = create_temp_mmap(&data, AccessMode::ReadOnly, 0).expect("Failed to create mmap");

        let iterator = StreamingChunkIterator::new(&mmap, 100);

        let range = iterator.chunk_byte_range(0).expect("Range not found");
        let elem_size = std::mem::size_of::<f64>();

        assert_eq!(range, 0..(100 * elem_size));
    }

    #[test]
    #[cfg(feature = "parallel")]
    fn test_parallel_processor() {
        let data: Array1<f64> = Array1::from_vec((0..10000).map(|i| i as f64).collect());
        let mmap = create_temp_mmap(&data, AccessMode::ReadOnly, 0).expect("Failed to create mmap");

        let processor = ParallelStreamingProcessor::new(&mmap, 1000, 4);

        // Sum all elements in each chunk
        let chunk_sums = processor
            .process(|chunk| chunk.iter().sum::<f64>())
            .expect("Processing failed");

        assert_eq!(chunk_sums.len(), 10);
    }

    #[test]
    fn test_uneven_chunks() {
        // Test with data that doesn't divide evenly by chunk size
        let data: Array1<f64> = Array1::from_vec((0..1050).map(|i| i as f64).collect());
        let mmap = create_temp_mmap(&data, AccessMode::ReadOnly, 0).expect("Failed to create mmap");

        let iterator = StreamingChunkIterator::new(&mmap, 100);

        let chunks: Vec<_> = iterator.collect();

        assert_eq!(chunks.len(), 11);
        assert_eq!(chunks[10].len(), 50); // Last chunk has 50 elements
    }
}
365}