vortex_scan/
multi_scan.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use futures::executor::LocalPool;
5use futures::future::BoxFuture;
6use vortex_error::VortexResult;
7
8use crate::work_queue::{TaskFactory, WorkStealingIterator, WorkStealingQueue};
9
10pub type ArrayFuture<T> = BoxFuture<'static, VortexResult<Option<T>>>;
11
12/// A multi-scan for executing multiple scans concurrently across workers.
13#[derive(Clone)]
14pub struct MultiScan<T> {
15    queue: WorkStealingQueue<ArrayFuture<T>>,
16}
17
18impl<T: 'static + Send> MultiScan<T> {
19    /// Created with lazily constructed scan builders closures.
20    pub fn new<I, F>(closures: I) -> Self
21    where
22        F: FnOnce() -> VortexResult<Vec<ArrayFuture<T>>> + 'static + Send + Sync,
23        I: IntoIterator<Item = F>,
24    {
25        Self {
26            queue: WorkStealingQueue::new(
27                closures
28                    .into_iter()
29                    .map(|closure| Box::new(closure) as TaskFactory<ArrayFuture<T>>),
30            ),
31        }
32    }
33
34    pub fn new_iterator(self) -> MultiScanIterator<T> {
35        MultiScanIterator {
36            inner: self.queue.new_iterator(),
37            local_pool: LocalPool::new(),
38        }
39    }
40}
41
42/// Scan iterator to participate in a `MultiScan`.
43pub struct MultiScanIterator<T> {
44    inner: WorkStealingIterator<ArrayFuture<T>>,
45    local_pool: LocalPool,
46}
47
48impl<T> Clone for MultiScanIterator<T> {
49    fn clone(&self) -> Self {
50        Self {
51            inner: self.inner.clone(),
52            local_pool: Default::default(),
53        }
54    }
55}
56
57impl<T: Send + Sync + 'static> Iterator for MultiScanIterator<T> {
58    type Item = VortexResult<T>;
59
60    fn next(&mut self) -> Option<VortexResult<T>> {
61        match self.inner.next()? {
62            Ok(task) => self.local_pool.run_until(task).transpose(),
63            Err(e) => Some(Err(e)),
64        }
65    }
66}