vortex_scan/
multi_scan.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use futures::executor::LocalPool;
5use futures::future::BoxFuture;
6use vortex_error::VortexResult;
7
8use crate::work_queue::{TaskFactory, WorkStealingIterator, WorkStealingQueue};
9
/// Boxed, `'static` future produced by a scan task.
///
/// It resolves to `Ok(Some(value))` when the task produced a value, `Ok(None)` when it produced
/// nothing (`MultiScanIterator::next` skips these), or `Err(_)` when the task failed.
10pub type ArrayFuture<T> = BoxFuture<'static, VortexResult<Option<T>>>;
11
12/// A multi-scan for executing multiple scans concurrently across workers.
13#[derive(Clone)]
14pub struct MultiScan<T> {
15    queue: WorkStealingQueue<ArrayFuture<T>>,
16}
17
18impl<T: 'static + Send> MultiScan<T> {
19    /// Created with lazily constructed scan builders closures.
20    pub fn new<I, F>(closures: I) -> Self
21    where
22        F: FnOnce() -> VortexResult<Vec<ArrayFuture<T>>> + 'static + Send + Sync,
23        I: IntoIterator<Item = F>,
24    {
25        Self {
26            queue: WorkStealingQueue::new(
27                closures
28                    .into_iter()
29                    .map(|closure| Box::new(closure) as TaskFactory<ArrayFuture<T>>),
30            ),
31        }
32    }
33
34    pub fn new_iterator(self) -> MultiScanIterator<T> {
35        MultiScanIterator {
36            inner: self.queue.new_iterator(),
37            local_pool: LocalPool::new(),
38        }
39    }
40}
41
42/// Scan iterator to participate in a `MultiScan`.
///
/// Work items are pulled from `inner`; each one is a future that `Iterator::next` runs to
/// completion on `local_pool` before yielding its value.
43pub struct MultiScanIterator<T> {
    // Source of the task futures (and of task-factory errors) consumed by `next`.
44    inner: WorkStealingIterator<ArrayFuture<T>>,
    // Executor owned by this iterator; `Clone` builds a fresh pool instead of sharing this one.
45    local_pool: LocalPool,
46}
47
48impl<T> Clone for MultiScanIterator<T> {
49    fn clone(&self) -> Self {
50        Self {
51            inner: self.inner.clone(),
52            local_pool: Default::default(),
53        }
54    }
55}
56
57impl<T: Send + Sync + 'static> Iterator for MultiScanIterator<T> {
58    type Item = VortexResult<T>;
59
60    fn next(&mut self) -> Option<VortexResult<T>> {
61        loop {
62            match self.inner.next()? {
63                Ok(task) => match self.local_pool.run_until(task) {
64                    // If the underlying future returns Ok(None) we have to keep going
65                    // until we find the next present element or end of iterator.
66                    Ok(Some(value)) => return Some(Ok(value)),
67                    Ok(None) => continue,
68                    Err(e) => return Some(Err(e)),
69                },
70                Err(e) => return Some(Err(e)),
71            }
72        }
73    }
74}
75
76#[cfg(test)]
77mod tests {
78    use std::sync::Arc;
79    use std::sync::atomic::{AtomicUsize, Ordering};
80
81    use vortex_error::{VortexResult, vortex_err};
82
83    use super::*;
84
85    #[test]
86    fn test_multi_scan_basic() {
87        // Create multiple scan tasks
88        let closures = vec![
89            || -> VortexResult<Vec<ArrayFuture<i32>>> {
90                Ok(vec![
91                    Box::pin(async { Ok(Some(1)) }),
92                    Box::pin(async { Ok(Some(2)) }),
93                ])
94            },
95            || -> VortexResult<Vec<ArrayFuture<i32>>> {
96                Ok(vec![
97                    Box::pin(async { Ok(Some(3)) }),
98                    Box::pin(async { Ok(Some(4)) }),
99                ])
100            },
101        ];
102
103        let multi_scan = MultiScan::new(closures);
104        let iterator = multi_scan.new_iterator();
105
106        let mut results = Vec::new();
107        for result in iterator {
108            results.push(result.unwrap());
109        }
110
111        // Should get all 4 values
112        assert_eq!(results.len(), 4);
113        assert!(results.contains(&1));
114        assert!(results.contains(&2));
115        assert!(results.contains(&3));
116        assert!(results.contains(&4));
117    }
118
119    #[test]
120    fn test_multi_scan_error_handling() {
121        // Create closures where one returns an error
122        let closures = vec![
123            || -> VortexResult<Vec<ArrayFuture<i32>>> { Ok(vec![Box::pin(async { Ok(Some(1)) })]) },
124            || -> VortexResult<Vec<ArrayFuture<i32>>> { Err(vortex_err!("Task factory error")) },
125            || -> VortexResult<Vec<ArrayFuture<i32>>> { Ok(vec![Box::pin(async { Ok(Some(2)) })]) },
126        ];
127
128        let multi_scan = MultiScan::new(closures);
129        let iterator = multi_scan.new_iterator();
130
131        let mut has_error = false;
132        let mut values = Vec::new();
133
134        for result in iterator {
135            match result {
136                Ok(v) => values.push(v),
137                Err(_) => has_error = true,
138            }
139        }
140
141        assert!(has_error, "Expected to encounter an error");
142        // Should still get the values from successful factories
143        assert!(values.contains(&1) || values.contains(&2));
144    }
145
146    #[test]
147    fn test_multi_scan_iterator_clone() {
148        let counter = Arc::new(AtomicUsize::new(0));
149        let counter_clone = counter.clone();
150
151        let closures = vec![move || -> VortexResult<Vec<ArrayFuture<i32>>> {
152            counter_clone.fetch_add(1, Ordering::SeqCst);
153            Ok(vec![
154                Box::pin(async { Ok(Some(1)) }),
155                Box::pin(async { Ok(Some(2)) }),
156            ])
157        }];
158
159        let multi_scan = MultiScan::new(closures);
160        let iterator1 = multi_scan.new_iterator();
161
162        // Clone the iterator
163        let mut iterator2 = iterator1;
164
165        // Both iterators should be able to get results
166        let result = iterator2.next();
167        assert!(result.is_some());
168
169        // Factory should only be called once despite having two iterators
170        assert_eq!(counter.load(Ordering::SeqCst), 1);
171    }
172
173    #[test]
174    fn test_multi_scan_empty() {
175        type Factory = Box<dyn FnOnce() -> VortexResult<Vec<ArrayFuture<i32>>> + Send + Sync>;
176        let closures: Vec<Factory> = vec![];
177
178        let multi_scan = MultiScan::new(closures);
179        let mut iterator = multi_scan.new_iterator();
180
181        // Should return None immediately
182        assert!(iterator.next().is_none());
183    }
184
185    #[test]
186    fn test_multi_scan_with_none_results() {
187        let closures = vec![|| -> VortexResult<Vec<ArrayFuture<Option<i32>>>> {
188            Ok(vec![
189                Box::pin(async { Ok(Some(None)) }),
190                Box::pin(async { Ok(Some(Some(1))) }),
191                Box::pin(async { Ok(Some(None)) }),
192            ])
193        }];
194
195        let multi_scan = MultiScan::new(closures);
196        let iterator = multi_scan.new_iterator();
197
198        let mut results = Vec::new();
199        for result in iterator {
200            if let Ok(Some(v)) = result {
201                results.push(v);
202            }
203        }
204
205        // Should only get the Some(1) value
206        assert_eq!(results, vec![1]);
207    }
208
209    #[test]
210    fn test_multi_scan_concurrent_iterators() {
211        let closures = vec![|| -> VortexResult<Vec<ArrayFuture<i32>>> {
212            Ok((1..=10)
213                .map(|i| Box::pin(async move { Ok(Some(i)) }) as ArrayFuture<i32>)
214                .collect())
215        }];
216
217        let multi_scan = MultiScan::new(closures);
218
219        // Create multiple iterators
220        let mut iter1 = multi_scan.clone().new_iterator();
221        let mut iter2 = multi_scan.new_iterator();
222
223        // Both should be able to steal work
224        let mut count1 = 0;
225        let mut count2 = 0;
226
227        // Interleave taking from both iterators
228        loop {
229            let done1 = iter1
230                .next()
231                .map(|r| {
232                    count1 += r.is_ok() as usize;
233                })
234                .is_none();
235            let done2 = iter2
236                .next()
237                .map(|r| {
238                    count2 += r.is_ok() as usize;
239                })
240                .is_none();
241
242            if done1 && done2 {
243                break;
244            }
245        }
246
247        // Together they should process all 10 items
248        assert_eq!(count1 + count2, 10);
249        // Both should have gotten some work
250        assert!(count1 > 0);
251        assert!(count2 > 0);
252    }
253
254    #[test]
255    fn test_local_pool_error_propagation() {
256        let closures = vec![|| -> VortexResult<Vec<ArrayFuture<String>>> {
257            Ok(vec![
258                Box::pin(async { Ok(Some("success".to_string())) }),
259                Box::pin(async { Err(vortex_err!("async error")) }),
260                Box::pin(async { Ok(Some("after_errors".to_string())) }),
261            ])
262        }];
263
264        let multi_scan = MultiScan::new(closures);
265        let iterator = multi_scan.new_iterator();
266
267        let mut results = Vec::new();
268        let mut errors = Vec::new();
269
270        for result in iterator {
271            match result {
272                Ok(v) => results.push(v),
273                Err(e) => errors.push(e),
274            }
275        }
276
277        // Should get both successful results and the error
278        assert!(results.contains(&"success".to_string()));
279        assert!(results.contains(&"after_errors".to_string()));
280        assert_eq!(errors.len(), 1);
281    }
282
283    #[test]
284    #[should_panic(expected = "Factory panic!")]
285    #[allow(clippy::panic)]
286    fn test_task_factory_panic_handling() {
287        // Test that panics in task factories are propagated
288        let closures = vec![|| -> VortexResult<Vec<ArrayFuture<i32>>> {
289            panic!("Factory panic!");
290        }];
291
292        let multi_scan = MultiScan::new(closures);
293        let iterator = multi_scan.new_iterator();
294
295        // This should panic when the factory is executed
296        for _ in iterator {
297            // Consume iterator
298        }
299    }
300}