vortex_array/
mask_future.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::future::Future;
5use std::ops::Range;
6use std::sync::Arc;
7
8use futures::future::{BoxFuture, Shared};
9use futures::{FutureExt, TryFutureExt};
10use vortex_error::{SharedVortexResult, VortexError, VortexResult, vortex_panic};
11use vortex_mask::Mask;
12
13/// A future that resolves to a mask.
14#[derive(Clone)]
15pub struct MaskFuture {
16    inner: Shared<BoxFuture<'static, SharedVortexResult<Mask>>>,
17    len: usize,
18}
19
20impl MaskFuture {
21    /// Create a new MaskFuture from a future that returns a mask.
22    pub fn new<F>(len: usize, fut: F) -> Self
23    where
24        F: Future<Output = VortexResult<Mask>> + Send + 'static,
25    {
26        Self {
27            inner: fut
28                .inspect(move |r| {
29                    if let Ok(mask) = r
30                        && mask.len() != len {
31                            vortex_panic!("MaskFuture created with future that returned mask of incorrect length (expected {}, got {})", len, mask.len());
32                        }
33                })
34                .map_err(Arc::new)
35                .boxed()
36                .shared(),
37            len,
38        }
39    }
40
41    /// Returns the length of the mask.
42    pub fn len(&self) -> usize {
43        self.len
44    }
45
46    /// Returns true if the mask is empty.
47    pub fn is_empty(&self) -> bool {
48        self.len == 0
49    }
50
51    /// Create a MaskFuture from a ready mask.
52    pub fn ready(mask: Mask) -> Self {
53        Self::new(mask.len(), async move { Ok(mask) })
54    }
55
56    /// Create a MaskFuture that resolves to a mask with all values set to true.
57    pub fn new_true(row_count: usize) -> Self {
58        Self::ready(Mask::new_true(row_count))
59    }
60
61    /// Create a MaskFuture that resolves to a slice of the original mask.
62    pub fn slice(&self, range: Range<usize>) -> Self {
63        let inner = self.inner.clone();
64        Self::new(range.len(), async move { Ok(inner.await?.slice(range)) })
65    }
66}
67
68impl Future for MaskFuture {
69    type Output = VortexResult<Mask>;
70
71    fn poll(
72        mut self: std::pin::Pin<&mut Self>,
73        cx: &mut std::task::Context<'_>,
74    ) -> std::task::Poll<Self::Output> {
75        self.inner.poll_unpin(cx).map_err(VortexError::from)
76    }
77}