vortex_scan/
multi_scan.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use futures::executor::LocalPool;
5use futures::future::BoxFuture;
6use vortex_error::VortexResult;
7
8use crate::work_queue::{TaskFactory, WorkStealingIterator, WorkStealingQueue};
9
10pub type ArrayFuture<T> = BoxFuture<'static, VortexResult<Option<T>>>;
11
12/// A multi-scan for executing multiple scans concurrently across workers.
13#[derive(Clone)]
14pub struct MultiScan<T> {
15    queue: WorkStealingQueue<ArrayFuture<T>>,
16}
17
18impl<T: 'static + Send> MultiScan<T> {
19    /// Created with lazily constructed scan builders closures.
20    pub fn new<I, F>(closures: I) -> Self
21    where
22        F: FnOnce() -> VortexResult<Vec<ArrayFuture<T>>> + 'static + Send + Sync,
23        I: IntoIterator<Item = F>,
24    {
25        Self {
26            queue: WorkStealingQueue::new(
27                closures
28                    .into_iter()
29                    .map(|closure| Box::new(closure) as TaskFactory<ArrayFuture<T>>),
30            ),
31        }
32    }
33
34    pub fn new_iterator(self) -> MultiScanIterator<T> {
35        MultiScanIterator {
36            inner: self.queue.new_iterator(),
37            local_pool: LocalPool::new(),
38        }
39    }
40}
41
42/// Scan iterator to participate in a `MultiScan`.
43pub struct MultiScanIterator<T> {
44    inner: WorkStealingIterator<ArrayFuture<T>>,
45    local_pool: LocalPool,
46}
47
48impl<T> Clone for MultiScanIterator<T> {
49    fn clone(&self) -> Self {
50        Self {
51            inner: self.inner.clone(),
52            local_pool: Default::default(),
53        }
54    }
55}
56
57impl<T: Send + Sync + 'static> Iterator for MultiScanIterator<T> {
58    type Item = VortexResult<T>;
59
60    #[inline]
61    fn next(&mut self) -> Option<VortexResult<T>> {
62        loop {
63            match self.inner.next()? {
64                Ok(task) => match self.local_pool.run_until(task) {
65                    // If the underlying future returns Ok(None) we have to keep going
66                    // until we find the next present element or end of iterator.
67                    Ok(Some(value)) => return Some(Ok(value)),
68                    Ok(None) => continue,
69                    Err(e) => return Some(Err(e)),
70                },
71                Err(e) => return Some(Err(e)),
72            }
73        }
74    }
75}
76
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};

    use vortex_error::{VortexResult, vortex_err};

    use super::*;

    #[test]
    fn test_multi_scan_basic() {
        // Create multiple scan tasks
        let closures = vec![
            || -> VortexResult<Vec<ArrayFuture<i32>>> {
                Ok(vec![
                    Box::pin(async { Ok(Some(1)) }),
                    Box::pin(async { Ok(Some(2)) }),
                ])
            },
            || -> VortexResult<Vec<ArrayFuture<i32>>> {
                Ok(vec![
                    Box::pin(async { Ok(Some(3)) }),
                    Box::pin(async { Ok(Some(4)) }),
                ])
            },
        ];

        let multi_scan = MultiScan::new(closures);
        let iterator = multi_scan.new_iterator();

        let mut results = Vec::new();
        for result in iterator {
            results.push(result.unwrap());
        }

        // Should get all 4 values
        assert_eq!(results.len(), 4);
        assert!(results.contains(&1));
        assert!(results.contains(&2));
        assert!(results.contains(&3));
        assert!(results.contains(&4));
    }

    #[test]
    fn test_multi_scan_error_handling() {
        // Create closures where one returns an error
        let closures = vec![
            || -> VortexResult<Vec<ArrayFuture<i32>>> { Ok(vec![Box::pin(async { Ok(Some(1)) })]) },
            || -> VortexResult<Vec<ArrayFuture<i32>>> { Err(vortex_err!("Task factory error")) },
            || -> VortexResult<Vec<ArrayFuture<i32>>> { Ok(vec![Box::pin(async { Ok(Some(2)) })]) },
        ];

        let multi_scan = MultiScan::new(closures);
        let iterator = multi_scan.new_iterator();

        let mut has_error = false;
        let mut values = Vec::new();

        for result in iterator {
            match result {
                Ok(v) => values.push(v),
                Err(_) => has_error = true,
            }
        }

        assert!(has_error, "Expected to encounter an error");
        // Should still get the values from successful factories
        assert!(values.contains(&1) || values.contains(&2));
    }

    #[test]
    fn test_multi_scan_iterator_clone() {
        let counter = Arc::new(AtomicUsize::new(0));
        let counter_clone = counter.clone();

        let closures = vec![move || -> VortexResult<Vec<ArrayFuture<i32>>> {
            counter_clone.fetch_add(1, Ordering::SeqCst);
            Ok(vec![
                Box::pin(async { Ok(Some(1)) }),
                Box::pin(async { Ok(Some(2)) }),
            ])
        }];

        let multi_scan = MultiScan::new(closures);
        let iterator1 = multi_scan.new_iterator();

        // Actually clone the iterator (a plain `let b = a;` would only move it).
        let iterator2 = iterator1.clone();

        // Drain both iterators. Together they must see every value exactly once.
        let mut values: Vec<i32> = iterator2
            .chain(iterator1)
            .map(|result| result.unwrap())
            .collect();
        values.sort_unstable();
        assert_eq!(values, vec![1, 2]);

        // The factory is shared, so it runs only once despite there being two iterators.
        assert_eq!(counter.load(Ordering::SeqCst), 1);
    }

    #[test]
    fn test_multi_scan_empty() {
        type Factory = Box<dyn FnOnce() -> VortexResult<Vec<ArrayFuture<i32>>> + Send + Sync>;
        let closures: Vec<Factory> = vec![];

        let multi_scan = MultiScan::new(closures);
        let mut iterator = multi_scan.new_iterator();

        // Should return None immediately
        assert!(iterator.next().is_none());
    }

    #[test]
    fn test_multi_scan_with_none_results() {
        // With `T = Option<i32>`, inner `None` values are ordinary items and must be yielded.
        // Only a future resolving to `Ok(None)` is skipped by the iterator.
        let closures = vec![|| -> VortexResult<Vec<ArrayFuture<Option<i32>>>> {
            Ok(vec![
                Box::pin(async { Ok(Some(None)) }),
                Box::pin(async { Ok(Some(Some(1))) }),
                Box::pin(async { Ok(Some(None)) }),
            ])
        }];

        let multi_scan = MultiScan::new(closures);
        let results: Vec<Option<i32>> = multi_scan
            .new_iterator()
            .map(|result| result.unwrap())
            .collect();

        assert_eq!(results.len(), 3);
        assert_eq!(results.iter().filter(|v| v.is_none()).count(), 2);
        assert!(results.contains(&Some(1)));
    }

    #[test]
    fn test_multi_scan_skips_absent_results() {
        // Futures resolving to `Ok(None)` must be skipped without ending iteration.
        let closures = vec![|| -> VortexResult<Vec<ArrayFuture<i32>>> {
            Ok(vec![
                Box::pin(async { Ok(None) }),
                Box::pin(async { Ok(Some(1)) }),
                Box::pin(async { Ok(None) }),
            ])
        }];

        let multi_scan = MultiScan::new(closures);
        let results: Vec<i32> = multi_scan
            .new_iterator()
            .map(|result| result.unwrap())
            .collect();

        assert_eq!(results, vec![1]);
    }

    #[test]
    fn test_multi_scan_concurrent_iterators() {
        let closures = vec![|| -> VortexResult<Vec<ArrayFuture<i32>>> {
            Ok((1..=10)
                .map(|i| Box::pin(async move { Ok(Some(i)) }) as ArrayFuture<i32>)
                .collect())
        }];

        let multi_scan = MultiScan::new(closures);

        // Create multiple iterators
        let mut iter1 = multi_scan.clone().new_iterator();
        let mut iter2 = multi_scan.new_iterator();

        // Both should be able to steal work
        let mut count1 = 0;
        let mut count2 = 0;

        // Interleave taking from both iterators
        loop {
            let done1 = iter1
                .next()
                .map(|r| {
                    count1 += r.is_ok() as usize;
                })
                .is_none();
            let done2 = iter2
                .next()
                .map(|r| {
                    count2 += r.is_ok() as usize;
                })
                .is_none();

            if done1 && done2 {
                break;
            }
        }

        // Together they should process all 10 items
        assert_eq!(count1 + count2, 10);
        // Both should have gotten some work
        assert!(count1 > 0);
        assert!(count2 > 0);
    }

    #[test]
    fn test_local_pool_error_propagation() {
        let closures = vec![|| -> VortexResult<Vec<ArrayFuture<String>>> {
            Ok(vec![
                Box::pin(async { Ok(Some("success".to_string())) }),
                Box::pin(async { Err(vortex_err!("async error")) }),
                Box::pin(async { Ok(Some("after_errors".to_string())) }),
            ])
        }];

        let multi_scan = MultiScan::new(closures);
        let iterator = multi_scan.new_iterator();

        let mut results = Vec::new();
        let mut errors = Vec::new();

        for result in iterator {
            match result {
                Ok(v) => results.push(v),
                Err(e) => errors.push(e),
            }
        }

        // Should get both successful results and the error
        assert!(results.contains(&"success".to_string()));
        assert!(results.contains(&"after_errors".to_string()));
        assert_eq!(errors.len(), 1);
    }

    #[test]
    #[should_panic(expected = "Factory panic!")]
    fn test_task_factory_panic_handling() {
        // Test that panics in task factories are propagated
        let closures = vec![|| -> VortexResult<Vec<ArrayFuture<i32>>> {
            panic!("Factory panic!");
        }];

        let multi_scan = MultiScan::new(closures);
        let iterator = multi_scan.new_iterator();

        // This should panic when the factory is executed
        for _ in iterator {
            // Consume iterator
        }
    }
}
300}