vortex_scan/
multi_scan.rs1use futures::executor::LocalPool;
5use futures::future::BoxFuture;
6use vortex_error::VortexResult;
7
8use crate::work_queue::{TaskFactory, WorkStealingIterator, WorkStealingQueue};
9
10pub type ArrayFuture<T> = BoxFuture<'static, VortexResult<Option<T>>>;
11
12#[derive(Clone)]
14pub struct MultiScan<T> {
15 queue: WorkStealingQueue<ArrayFuture<T>>,
16}
17
18impl<T: 'static + Send> MultiScan<T> {
19 pub fn new<I, F>(closures: I) -> Self
21 where
22 F: FnOnce() -> VortexResult<Vec<ArrayFuture<T>>> + 'static + Send + Sync,
23 I: IntoIterator<Item = F>,
24 {
25 Self {
26 queue: WorkStealingQueue::new(
27 closures
28 .into_iter()
29 .map(|closure| Box::new(closure) as TaskFactory<ArrayFuture<T>>),
30 ),
31 }
32 }
33
34 pub fn new_iterator(self) -> MultiScanIterator<T> {
35 MultiScanIterator {
36 inner: self.queue.new_iterator(),
37 local_pool: LocalPool::new(),
38 }
39 }
40}
41
42pub struct MultiScanIterator<T> {
44 inner: WorkStealingIterator<ArrayFuture<T>>,
45 local_pool: LocalPool,
46}
47
48impl<T> Clone for MultiScanIterator<T> {
49 fn clone(&self) -> Self {
50 Self {
51 inner: self.inner.clone(),
52 local_pool: Default::default(),
53 }
54 }
55}
56
57impl<T: Send + Sync + 'static> Iterator for MultiScanIterator<T> {
58 type Item = VortexResult<T>;
59
60 fn next(&mut self) -> Option<VortexResult<T>> {
61 loop {
62 match self.inner.next()? {
63 Ok(task) => match self.local_pool.run_until(task) {
64 Ok(Some(value)) => return Some(Ok(value)),
67 Ok(None) => continue,
68 Err(e) => return Some(Err(e)),
69 },
70 Err(e) => return Some(Err(e)),
71 }
72 }
73 }
74}
75
76#[cfg(test)]
77mod tests {
78 use std::sync::Arc;
79 use std::sync::atomic::{AtomicUsize, Ordering};
80
81 use vortex_error::{VortexResult, vortex_err};
82
83 use super::*;
84
85 #[test]
86 fn test_multi_scan_basic() {
87 let closures = vec![
89 || -> VortexResult<Vec<ArrayFuture<i32>>> {
90 Ok(vec![
91 Box::pin(async { Ok(Some(1)) }),
92 Box::pin(async { Ok(Some(2)) }),
93 ])
94 },
95 || -> VortexResult<Vec<ArrayFuture<i32>>> {
96 Ok(vec![
97 Box::pin(async { Ok(Some(3)) }),
98 Box::pin(async { Ok(Some(4)) }),
99 ])
100 },
101 ];
102
103 let multi_scan = MultiScan::new(closures);
104 let iterator = multi_scan.new_iterator();
105
106 let mut results = Vec::new();
107 for result in iterator {
108 results.push(result.unwrap());
109 }
110
111 assert_eq!(results.len(), 4);
113 assert!(results.contains(&1));
114 assert!(results.contains(&2));
115 assert!(results.contains(&3));
116 assert!(results.contains(&4));
117 }
118
119 #[test]
120 fn test_multi_scan_error_handling() {
121 let closures = vec![
123 || -> VortexResult<Vec<ArrayFuture<i32>>> { Ok(vec![Box::pin(async { Ok(Some(1)) })]) },
124 || -> VortexResult<Vec<ArrayFuture<i32>>> { Err(vortex_err!("Task factory error")) },
125 || -> VortexResult<Vec<ArrayFuture<i32>>> { Ok(vec![Box::pin(async { Ok(Some(2)) })]) },
126 ];
127
128 let multi_scan = MultiScan::new(closures);
129 let iterator = multi_scan.new_iterator();
130
131 let mut has_error = false;
132 let mut values = Vec::new();
133
134 for result in iterator {
135 match result {
136 Ok(v) => values.push(v),
137 Err(_) => has_error = true,
138 }
139 }
140
141 assert!(has_error, "Expected to encounter an error");
142 assert!(values.contains(&1) || values.contains(&2));
144 }
145
146 #[test]
147 fn test_multi_scan_iterator_clone() {
148 let counter = Arc::new(AtomicUsize::new(0));
149 let counter_clone = counter.clone();
150
151 let closures = vec![move || -> VortexResult<Vec<ArrayFuture<i32>>> {
152 counter_clone.fetch_add(1, Ordering::SeqCst);
153 Ok(vec![
154 Box::pin(async { Ok(Some(1)) }),
155 Box::pin(async { Ok(Some(2)) }),
156 ])
157 }];
158
159 let multi_scan = MultiScan::new(closures);
160 let iterator1 = multi_scan.new_iterator();
161
162 let mut iterator2 = iterator1;
164
165 let result = iterator2.next();
167 assert!(result.is_some());
168
169 assert_eq!(counter.load(Ordering::SeqCst), 1);
171 }
172
173 #[test]
174 fn test_multi_scan_empty() {
175 type Factory = Box<dyn FnOnce() -> VortexResult<Vec<ArrayFuture<i32>>> + Send + Sync>;
176 let closures: Vec<Factory> = vec![];
177
178 let multi_scan = MultiScan::new(closures);
179 let mut iterator = multi_scan.new_iterator();
180
181 assert!(iterator.next().is_none());
183 }
184
185 #[test]
186 fn test_multi_scan_with_none_results() {
187 let closures = vec![|| -> VortexResult<Vec<ArrayFuture<Option<i32>>>> {
188 Ok(vec![
189 Box::pin(async { Ok(Some(None)) }),
190 Box::pin(async { Ok(Some(Some(1))) }),
191 Box::pin(async { Ok(Some(None)) }),
192 ])
193 }];
194
195 let multi_scan = MultiScan::new(closures);
196 let iterator = multi_scan.new_iterator();
197
198 let mut results = Vec::new();
199 for result in iterator {
200 if let Ok(Some(v)) = result {
201 results.push(v);
202 }
203 }
204
205 assert_eq!(results, vec![1]);
207 }
208
209 #[test]
210 fn test_multi_scan_concurrent_iterators() {
211 let closures = vec![|| -> VortexResult<Vec<ArrayFuture<i32>>> {
212 Ok((1..=10)
213 .map(|i| Box::pin(async move { Ok(Some(i)) }) as ArrayFuture<i32>)
214 .collect())
215 }];
216
217 let multi_scan = MultiScan::new(closures);
218
219 let mut iter1 = multi_scan.clone().new_iterator();
221 let mut iter2 = multi_scan.new_iterator();
222
223 let mut count1 = 0;
225 let mut count2 = 0;
226
227 loop {
229 let done1 = iter1
230 .next()
231 .map(|r| {
232 count1 += r.is_ok() as usize;
233 })
234 .is_none();
235 let done2 = iter2
236 .next()
237 .map(|r| {
238 count2 += r.is_ok() as usize;
239 })
240 .is_none();
241
242 if done1 && done2 {
243 break;
244 }
245 }
246
247 assert_eq!(count1 + count2, 10);
249 assert!(count1 > 0);
251 assert!(count2 > 0);
252 }
253
254 #[test]
255 fn test_local_pool_error_propagation() {
256 let closures = vec![|| -> VortexResult<Vec<ArrayFuture<String>>> {
257 Ok(vec![
258 Box::pin(async { Ok(Some("success".to_string())) }),
259 Box::pin(async { Err(vortex_err!("async error")) }),
260 Box::pin(async { Ok(Some("after_errors".to_string())) }),
261 ])
262 }];
263
264 let multi_scan = MultiScan::new(closures);
265 let iterator = multi_scan.new_iterator();
266
267 let mut results = Vec::new();
268 let mut errors = Vec::new();
269
270 for result in iterator {
271 match result {
272 Ok(v) => results.push(v),
273 Err(e) => errors.push(e),
274 }
275 }
276
277 assert!(results.contains(&"success".to_string()));
279 assert!(results.contains(&"after_errors".to_string()));
280 assert_eq!(errors.len(), 1);
281 }
282
283 #[test]
284 #[should_panic(expected = "Factory panic!")]
285 #[allow(clippy::panic)]
286 fn test_task_factory_panic_handling() {
287 let closures = vec![|| -> VortexResult<Vec<ArrayFuture<i32>>> {
289 panic!("Factory panic!");
290 }];
291
292 let multi_scan = MultiScan::new(closures);
293 let iterator = multi_scan.new_iterator();
294
295 for _ in iterator {
297 }
299 }
300}