vortex_array/
mask_future.rs1use std::future::Future;
5use std::ops::Range;
6use std::sync::Arc;
7
8use futures::future::{BoxFuture, Shared};
9use futures::{FutureExt, TryFutureExt};
10use vortex_error::{SharedVortexResult, VortexError, VortexResult, vortex_panic};
11use vortex_mask::Mask;
12
13#[derive(Clone)]
15pub struct MaskFuture {
16 inner: Shared<BoxFuture<'static, SharedVortexResult<Mask>>>,
17 len: usize,
18}
19
20impl MaskFuture {
21 pub fn new<F>(len: usize, fut: F) -> Self
23 where
24 F: Future<Output = VortexResult<Mask>> + Send + 'static,
25 {
26 Self {
27 inner: fut
28 .inspect(move |r| {
29 if let Ok(mask) = r
30 && mask.len() != len {
31 vortex_panic!("MaskFuture created with future that returned mask of incorrect length (expected {}, got {})", len, mask.len());
32 }
33 })
34 .map_err(Arc::new)
35 .boxed()
36 .shared(),
37 len,
38 }
39 }
40
41 pub fn len(&self) -> usize {
43 self.len
44 }
45
46 pub fn is_empty(&self) -> bool {
48 self.len == 0
49 }
50
51 pub fn ready(mask: Mask) -> Self {
53 Self::new(mask.len(), async move { Ok(mask) })
54 }
55
56 pub fn new_true(row_count: usize) -> Self {
58 Self::ready(Mask::new_true(row_count))
59 }
60
61 pub fn slice(&self, range: Range<usize>) -> Self {
63 let inner = self.inner.clone();
64 Self::new(range.len(), async move { Ok(inner.await?.slice(range)) })
65 }
66}
67
68impl Future for MaskFuture {
69 type Output = VortexResult<Mask>;
70
71 fn poll(
72 mut self: std::pin::Pin<&mut Self>,
73 cx: &mut std::task::Context<'_>,
74 ) -> std::task::Poll<Self::Output> {
75 self.inner.poll_unpin(cx).map_err(VortexError::from)
76 }
77}