vortex_scan/
multi_scan.rs1use futures::executor::LocalPool;
5use futures::future::BoxFuture;
6use vortex_error::VortexResult;
7
8use crate::work_queue::{TaskFactory, WorkStealingIterator, WorkStealingQueue};
9
10pub type ArrayFuture<T> = BoxFuture<'static, VortexResult<Option<T>>>;
11
12#[derive(Clone)]
14pub struct MultiScan<T> {
15 queue: WorkStealingQueue<ArrayFuture<T>>,
16}
17
18impl<T: 'static + Send> MultiScan<T> {
19 pub fn new<I, F>(closures: I) -> Self
21 where
22 F: FnOnce() -> VortexResult<Vec<ArrayFuture<T>>> + 'static + Send + Sync,
23 I: IntoIterator<Item = F>,
24 {
25 Self {
26 queue: WorkStealingQueue::new(
27 closures
28 .into_iter()
29 .map(|closure| Box::new(closure) as TaskFactory<ArrayFuture<T>>),
30 ),
31 }
32 }
33
34 pub fn new_iterator(self) -> MultiScanIterator<T> {
35 MultiScanIterator {
36 inner: self.queue.new_iterator(),
37 local_pool: LocalPool::new(),
38 }
39 }
40}
41
42pub struct MultiScanIterator<T> {
44 inner: WorkStealingIterator<ArrayFuture<T>>,
45 local_pool: LocalPool,
46}
47
48impl<T> Clone for MultiScanIterator<T> {
49 fn clone(&self) -> Self {
50 Self {
51 inner: self.inner.clone(),
52 local_pool: Default::default(),
53 }
54 }
55}
56
57impl<T: Send + Sync + 'static> Iterator for MultiScanIterator<T> {
58 type Item = VortexResult<T>;
59
60 #[inline]
61 fn next(&mut self) -> Option<VortexResult<T>> {
62 loop {
63 match self.inner.next()? {
64 Ok(task) => match self.local_pool.run_until(task) {
65 Ok(Some(value)) => return Some(Ok(value)),
68 Ok(None) => continue,
69 Err(e) => return Some(Err(e)),
70 },
71 Err(e) => return Some(Err(e)),
72 }
73 }
74 }
75}
76
77#[cfg(test)]
78mod tests {
79 use std::sync::Arc;
80 use std::sync::atomic::{AtomicUsize, Ordering};
81
82 use vortex_error::{VortexResult, vortex_err};
83
84 use super::*;
85
86 #[test]
87 fn test_multi_scan_basic() {
88 let closures = vec![
90 || -> VortexResult<Vec<ArrayFuture<i32>>> {
91 Ok(vec![
92 Box::pin(async { Ok(Some(1)) }),
93 Box::pin(async { Ok(Some(2)) }),
94 ])
95 },
96 || -> VortexResult<Vec<ArrayFuture<i32>>> {
97 Ok(vec![
98 Box::pin(async { Ok(Some(3)) }),
99 Box::pin(async { Ok(Some(4)) }),
100 ])
101 },
102 ];
103
104 let multi_scan = MultiScan::new(closures);
105 let iterator = multi_scan.new_iterator();
106
107 let mut results = Vec::new();
108 for result in iterator {
109 results.push(result.unwrap());
110 }
111
112 assert_eq!(results.len(), 4);
114 assert!(results.contains(&1));
115 assert!(results.contains(&2));
116 assert!(results.contains(&3));
117 assert!(results.contains(&4));
118 }
119
120 #[test]
121 fn test_multi_scan_error_handling() {
122 let closures = vec![
124 || -> VortexResult<Vec<ArrayFuture<i32>>> { Ok(vec![Box::pin(async { Ok(Some(1)) })]) },
125 || -> VortexResult<Vec<ArrayFuture<i32>>> { Err(vortex_err!("Task factory error")) },
126 || -> VortexResult<Vec<ArrayFuture<i32>>> { Ok(vec![Box::pin(async { Ok(Some(2)) })]) },
127 ];
128
129 let multi_scan = MultiScan::new(closures);
130 let iterator = multi_scan.new_iterator();
131
132 let mut has_error = false;
133 let mut values = Vec::new();
134
135 for result in iterator {
136 match result {
137 Ok(v) => values.push(v),
138 Err(_) => has_error = true,
139 }
140 }
141
142 assert!(has_error, "Expected to encounter an error");
143 assert!(values.contains(&1) || values.contains(&2));
145 }
146
147 #[test]
148 fn test_multi_scan_iterator_clone() {
149 let counter = Arc::new(AtomicUsize::new(0));
150 let counter_clone = counter.clone();
151
152 let closures = vec![move || -> VortexResult<Vec<ArrayFuture<i32>>> {
153 counter_clone.fetch_add(1, Ordering::SeqCst);
154 Ok(vec![
155 Box::pin(async { Ok(Some(1)) }),
156 Box::pin(async { Ok(Some(2)) }),
157 ])
158 }];
159
160 let multi_scan = MultiScan::new(closures);
161 let iterator1 = multi_scan.new_iterator();
162
163 let mut iterator2 = iterator1;
165
166 let result = iterator2.next();
168 assert!(result.is_some());
169
170 assert_eq!(counter.load(Ordering::SeqCst), 1);
172 }
173
174 #[test]
175 fn test_multi_scan_empty() {
176 type Factory = Box<dyn FnOnce() -> VortexResult<Vec<ArrayFuture<i32>>> + Send + Sync>;
177 let closures: Vec<Factory> = vec![];
178
179 let multi_scan = MultiScan::new(closures);
180 let mut iterator = multi_scan.new_iterator();
181
182 assert!(iterator.next().is_none());
184 }
185
186 #[test]
187 fn test_multi_scan_with_none_results() {
188 let closures = vec![|| -> VortexResult<Vec<ArrayFuture<Option<i32>>>> {
189 Ok(vec![
190 Box::pin(async { Ok(Some(None)) }),
191 Box::pin(async { Ok(Some(Some(1))) }),
192 Box::pin(async { Ok(Some(None)) }),
193 ])
194 }];
195
196 let multi_scan = MultiScan::new(closures);
197 let iterator = multi_scan.new_iterator();
198
199 let mut results = Vec::new();
200 for result in iterator {
201 if let Ok(Some(v)) = result {
202 results.push(v);
203 }
204 }
205
206 assert_eq!(results, vec![1]);
208 }
209
210 #[test]
211 fn test_multi_scan_concurrent_iterators() {
212 let closures = vec![|| -> VortexResult<Vec<ArrayFuture<i32>>> {
213 Ok((1..=10)
214 .map(|i| Box::pin(async move { Ok(Some(i)) }) as ArrayFuture<i32>)
215 .collect())
216 }];
217
218 let multi_scan = MultiScan::new(closures);
219
220 let mut iter1 = multi_scan.clone().new_iterator();
222 let mut iter2 = multi_scan.new_iterator();
223
224 let mut count1 = 0;
226 let mut count2 = 0;
227
228 loop {
230 let done1 = iter1
231 .next()
232 .map(|r| {
233 count1 += r.is_ok() as usize;
234 })
235 .is_none();
236 let done2 = iter2
237 .next()
238 .map(|r| {
239 count2 += r.is_ok() as usize;
240 })
241 .is_none();
242
243 if done1 && done2 {
244 break;
245 }
246 }
247
248 assert_eq!(count1 + count2, 10);
250 assert!(count1 > 0);
252 assert!(count2 > 0);
253 }
254
255 #[test]
256 fn test_local_pool_error_propagation() {
257 let closures = vec![|| -> VortexResult<Vec<ArrayFuture<String>>> {
258 Ok(vec![
259 Box::pin(async { Ok(Some("success".to_string())) }),
260 Box::pin(async { Err(vortex_err!("async error")) }),
261 Box::pin(async { Ok(Some("after_errors".to_string())) }),
262 ])
263 }];
264
265 let multi_scan = MultiScan::new(closures);
266 let iterator = multi_scan.new_iterator();
267
268 let mut results = Vec::new();
269 let mut errors = Vec::new();
270
271 for result in iterator {
272 match result {
273 Ok(v) => results.push(v),
274 Err(e) => errors.push(e),
275 }
276 }
277
278 assert!(results.contains(&"success".to_string()));
280 assert!(results.contains(&"after_errors".to_string()));
281 assert_eq!(errors.len(), 1);
282 }
283
284 #[test]
285 #[should_panic(expected = "Factory panic!")]
286 fn test_task_factory_panic_handling() {
287 let closures = vec![|| -> VortexResult<Vec<ArrayFuture<i32>>> {
289 panic!("Factory panic!");
290 }];
291
292 let multi_scan = MultiScan::new(closures);
293 let iterator = multi_scan.new_iterator();
294
295 for _ in iterator {
297 }
299 }
300}