vortex_scan/
multi_scan.rs1use futures::executor::LocalPool;
5use futures::future::BoxFuture;
6use vortex_error::VortexResult;
7
8use crate::work_queue::{TaskFactory, WorkStealingIterator, WorkStealingQueue};
9
10pub type ArrayFuture<T> = BoxFuture<'static, VortexResult<Option<T>>>;
11
12#[derive(Clone)]
14pub struct MultiScan<T> {
15 queue: WorkStealingQueue<ArrayFuture<T>>,
16}
17
18impl<T: 'static + Send> MultiScan<T> {
19 pub fn new<I, F>(closures: I) -> Self
21 where
22 F: FnOnce() -> VortexResult<Vec<ArrayFuture<T>>> + 'static + Send + Sync,
23 I: IntoIterator<Item = F>,
24 {
25 Self {
26 queue: WorkStealingQueue::new(
27 closures
28 .into_iter()
29 .map(|closure| Box::new(closure) as TaskFactory<ArrayFuture<T>>),
30 ),
31 }
32 }
33
34 pub fn new_iterator(self) -> MultiScanIterator<T> {
35 MultiScanIterator {
36 inner: self.queue.new_iterator(),
37 local_pool: LocalPool::new(),
38 }
39 }
40}
41
42pub struct MultiScanIterator<T> {
44 inner: WorkStealingIterator<ArrayFuture<T>>,
45 local_pool: LocalPool,
46}
47
48impl<T> Clone for MultiScanIterator<T> {
49 fn clone(&self) -> Self {
50 Self {
51 inner: self.inner.clone(),
52 local_pool: Default::default(),
53 }
54 }
55}
56
57impl<T: Send + Sync + 'static> Iterator for MultiScanIterator<T> {
58 type Item = VortexResult<T>;
59
60 fn next(&mut self) -> Option<VortexResult<T>> {
61 match self.inner.next()? {
62 Ok(task) => self.local_pool.run_until(task).transpose(),
63 Err(e) => Some(Err(e)),
64 }
65 }
66}