// scirs2_core/memory_efficient/large_data.rs
use super::memmap::MemoryMappedArray;
use crate::error::{CoreError, CoreResult, ErrorContext, ErrorLocation};
use std::marker::PhantomData;
use std::ops::Range;

/// Read-only iterator that yields fixed-size chunks of a memory-mapped array.
///
/// Chunks are borrowed slices of the underlying mapping, so no element data
/// is copied. The final chunk may be shorter than `chunk_size` when the total
/// element count is not an exact multiple of it.
pub struct StreamingChunkIterator<'a, A>
where
    A: Clone + Copy + Send + Sync + 'static,
{
    /// Borrowed memory-mapped array the chunks are sliced from.
    mmap: &'a MemoryMappedArray<A>,

    /// Total number of elements (product of the array's shape dimensions).
    total_elements: usize,

    /// Element offset of the next chunk `next` will yield.
    current_position: usize,

    /// Maximum number of elements per chunk.
    chunk_size: usize,

    // NOTE(review): redundant — `mmap` already mentions `A`; kept because the
    // constructor initializes it and removing it would be an interface change.
    phantom: PhantomData<A>,
}
34
35impl<'a, A> StreamingChunkIterator<'a, A>
36where
37 A: Clone + Copy + Send + Sync + 'static,
38{
39 pub fn new(mmap: &'a MemoryMappedArray<A>, chunk_size: usize) -> Self {
41 let total_elements = mmap.shape.iter().product();
42
43 Self {
44 mmap,
45 total_elements,
46 current_position: 0,
47 chunk_size,
48 phantom: PhantomData,
49 }
50 }
51
52 pub fn num_chunks(&self) -> usize {
54 self.total_elements.div_ceil(self.chunk_size)
55 }
56
57 pub fn current_chunk(&self) -> usize {
59 self.current_position / self.chunk_size
60 }
61
62 pub fn reset(&mut self) {
64 self.current_position = 0;
65 }
66
67 pub fn get_chunk(&self, chunk_index: usize) -> Option<&'a [A]> {
69 if chunk_index >= self.num_chunks() {
70 return None;
71 }
72
73 let start = chunk_index * self.chunk_size;
74 let end = ((chunk_index + 1) * self.chunk_size).min(self.total_elements);
75
76 let slice = self.mmap.as_slice();
77 Some(&slice[start..end])
78 }
79
80 pub fn chunk_byte_range(&self, chunk_index: usize) -> Option<Range<usize>> {
82 if chunk_index >= self.num_chunks() {
83 return None;
84 }
85
86 let elem_size = std::mem::size_of::<A>();
87 let start = chunk_index * self.chunk_size;
88 let end = ((chunk_index + 1) * self.chunk_size).min(self.total_elements);
89
90 Some((start * elem_size)..(end * elem_size))
91 }
92}
93
94impl<'a, A> Iterator for StreamingChunkIterator<'a, A>
95where
96 A: Clone + Copy + Send + Sync + 'static,
97{
98 type Item = &'a [A];
99
100 fn next(&mut self) -> Option<Self::Item> {
101 if self.current_position >= self.total_elements {
102 return None;
103 }
104
105 let start = self.current_position;
106 let end = (self.current_position + self.chunk_size).min(self.total_elements);
107
108 self.current_position = end;
109
110 let slice = self.mmap.as_slice();
111 Some(&slice[start..end])
112 }
113
114 fn size_hint(&self) -> (usize, Option<usize>) {
115 let remaining = self.total_elements - self.current_position;
116 let remaining_chunks = remaining.div_ceil(self.chunk_size);
117 (remaining_chunks, Some(remaining_chunks))
118 }
119}
120
121impl<'a, A> ExactSizeIterator for StreamingChunkIterator<'a, A>
122where
123 A: Clone + Copy + Send + Sync + 'static,
124{
125 fn len(&self) -> usize {
126 let remaining = self.total_elements - self.current_position;
127 remaining.div_ceil(self.chunk_size)
128 }
129}
130
#[cfg(feature = "parallel")]
/// Processes chunks of a memory-mapped array in parallel via the crate's
/// parallel-ops layer.
pub struct ParallelStreamingProcessor<'a, A>
where
    A: Clone + Copy + Send + Sync + 'static,
{
    /// Chunk iterator used for random-access chunk lookup during parallel runs.
    iterator: StreamingChunkIterator<'a, A>,

    /// Requested worker count.
    // NOTE(review): stored but never consulted — `process`/`try_process` run
    // on the default parallel pool; confirm whether this should size the pool.
    num_workers: usize,
}
143
#[cfg(feature = "parallel")]
impl<'a, A> ParallelStreamingProcessor<'a, A>
where
    A: Clone + Copy + Send + Sync + 'static,
{
    /// Creates a processor that splits `mmap` into chunks of `chunk_size`
    /// elements.
    ///
    /// NOTE(review): `num_workers` is stored but not currently used to size
    /// the worker pool — processing runs on the default parallel pool.
    pub fn new(mmap: &'a MemoryMappedArray<A>, chunk_size: usize, num_workers: usize) -> Self {
        Self {
            iterator: StreamingChunkIterator::new(mmap, chunk_size),
            num_workers,
        }
    }

    /// Applies `f` to every chunk in parallel and collects the results in
    /// chunk order.
    pub fn process<F, R>(&self, f: F) -> CoreResult<Vec<R>>
    where
        F: Fn(&[A]) -> R + Send + Sync,
        R: Send,
    {
        use crate::parallel_ops::*;

        // Parallel-iterate the index range directly; materializing the
        // indices into a Vec first was a needless allocation.
        let results: Vec<R> = (0..self.iterator.num_chunks())
            .into_par_iter()
            .filter_map(|idx| self.iterator.get_chunk(idx).map(|chunk| f(chunk)))
            .collect();

        Ok(results)
    }

    /// Fallible variant of `process`: applies `f` to every chunk in parallel
    /// and returns the collected results, or the first error converted into a
    /// `CoreError`.
    pub fn try_process<F, R, E>(&self, f: F) -> CoreResult<Vec<R>>
    where
        F: Fn(&[A]) -> Result<R, E> + Send + Sync,
        R: Send,
        E: std::fmt::Display + Send,
    {
        use crate::parallel_ops::*;

        // Collecting into Result<Vec<_>, _> short-circuits on the first Err.
        (0..self.iterator.num_chunks())
            .into_par_iter()
            .map(|idx| {
                self.iterator
                    .get_chunk(idx)
                    // Unreachable for idx < num_chunks, but surfaced as a
                    // proper error rather than a panic just in case.
                    .ok_or_else(|| {
                        CoreError::IndexError(
                            ErrorContext::new(format!("Chunk {idx} not found"))
                                .with_location(ErrorLocation::new(file!(), line!())),
                        )
                    })
                    .and_then(|chunk| {
                        f(chunk).map_err(|e| {
                            CoreError::InvalidArgument(
                                ErrorContext::new(format!("Processing error: {e}"))
                                    .with_location(ErrorLocation::new(file!(), line!())),
                            )
                        })
                    })
            })
            .collect()
    }
}
213
/// Convenience constructor for a [`StreamingChunkIterator`] over `mmap` with
/// the given `chunk_size` (in elements).
#[allow(dead_code)]
pub fn create_streaming_iterator<A>(
    mmap: &MemoryMappedArray<A>,
    chunk_size: usize,
) -> StreamingChunkIterator<'_, A>
where
    A: Clone + Copy + Send + Sync + 'static,
{
    StreamingChunkIterator::new(mmap, chunk_size)
}
225
#[cfg(feature = "parallel")]
/// Convenience constructor for a [`ParallelStreamingProcessor`] over `mmap`.
///
/// `chunk_size` is in elements; `num_workers` is forwarded to the processor
/// (see its docs for current usage).
#[allow(dead_code)]
pub fn create_parallel_processor<A>(
    mmap: &MemoryMappedArray<A>,
    chunk_size: usize,
    num_workers: usize,
) -> ParallelStreamingProcessor<'_, A>
where
    A: Clone + Copy + Send + Sync + 'static,
{
    ParallelStreamingProcessor::new(mmap, chunk_size, num_workers)
}
239
#[cfg(test)]
mod tests {
    use super::*;
    use crate::memory_efficient::memmap::{create_temp_mmap, AccessMode};
    use crate::ndarray::Array1;

    /// Builds the ramp `[0.0, 1.0, …, len-1]` used as backing data everywhere.
    fn ramp(len: i32) -> Array1<f64> {
        Array1::from_vec((0..len).map(f64::from).collect())
    }

    #[test]
    fn test_streaming_iterator_creation() {
        let values = ramp(1000);
        let mapped = create_temp_mmap(&values, AccessMode::ReadOnly, 0).expect("Failed to create mmap");

        let it = StreamingChunkIterator::new(&mapped, 100);

        assert_eq!(it.num_chunks(), 10);
        assert_eq!(it.current_chunk(), 0);
    }

    #[test]
    fn test_streaming_iterator_iteration() {
        let values = ramp(1000);
        let mapped = create_temp_mmap(&values, AccessMode::ReadOnly, 0).expect("Failed to create mmap");

        let collected: Vec<_> = StreamingChunkIterator::new(&mapped, 100).collect();

        assert_eq!(collected.len(), 10);
        assert_eq!(collected[0].len(), 100);
        assert_eq!(collected[9].len(), 100);
    }

    #[test]
    fn test_streaming_iterator_get_chunk() {
        let values = ramp(1000);
        let mapped = create_temp_mmap(&values, AccessMode::ReadOnly, 0).expect("Failed to create mmap");

        let it = StreamingChunkIterator::new(&mapped, 100);
        let chunk = it.get_chunk(5).expect("Chunk not found");

        // Chunk 5 of the ramp covers values 500.0..=599.0.
        assert_eq!(chunk.len(), 100);
        assert!((chunk[0] - 500.0).abs() < 1e-10);
        assert!((chunk[99] - 599.0).abs() < 1e-10);
    }

    #[test]
    fn test_streaming_iterator_reset() {
        let values = ramp(1000);
        let mapped = create_temp_mmap(&values, AccessMode::ReadOnly, 0).expect("Failed to create mmap");

        let mut it = StreamingChunkIterator::new(&mapped, 100);
        let _ = it.next();
        let _ = it.next();
        assert_eq!(it.current_chunk(), 2);

        it.reset();
        assert_eq!(it.current_chunk(), 0);
    }

    #[test]
    fn test_streaming_iterator_exact_size() {
        let values = ramp(1000);
        let mapped = create_temp_mmap(&values, AccessMode::ReadOnly, 0).expect("Failed to create mmap");

        let mut it = StreamingChunkIterator::new(&mapped, 100);
        assert_eq!(it.len(), 10);

        // Consuming one chunk shrinks the exact length by one.
        let _ = it.next();
        assert_eq!(it.len(), 9);
    }

    #[test]
    fn test_chunk_byte_range() {
        let values = ramp(1000);
        let mapped = create_temp_mmap(&values, AccessMode::ReadOnly, 0).expect("Failed to create mmap");

        let it = StreamingChunkIterator::new(&mapped, 100);
        let range = it.chunk_byte_range(0).expect("Range not found");

        assert_eq!(range, 0..(100 * std::mem::size_of::<f64>()));
    }

    #[test]
    #[cfg(feature = "parallel")]
    fn test_parallel_processor() {
        let values = ramp(10000);
        let mapped = create_temp_mmap(&values, AccessMode::ReadOnly, 0).expect("Failed to create mmap");

        let processor = ParallelStreamingProcessor::new(&mapped, 1000, 4);
        let sums = processor
            .process(|chunk| chunk.iter().sum::<f64>())
            .expect("Processing failed");

        assert_eq!(sums.len(), 10);
    }

    #[test]
    fn test_uneven_chunks() {
        let values = ramp(1050);
        let mapped = create_temp_mmap(&values, AccessMode::ReadOnly, 0).expect("Failed to create mmap");

        let collected: Vec<_> = StreamingChunkIterator::new(&mapped, 100).collect();

        // 1050 elements at 100 per chunk: ten full chunks plus a 50-element tail.
        assert_eq!(collected.len(), 11);
        assert_eq!(collected[10].len(), 50);
    }
}