vortex_array/stream/
adapter.rs

1use std::pin::Pin;
2use std::task::Poll;
3
4use futures_util::Stream;
5use pin_project::pin_project;
6use vortex_dtype::DType;
7use vortex_error::VortexResult;
8
9use crate::ArrayRef;
10use crate::stream::ArrayStream;
11
12/// An adapter for a stream of array chunks to implement an ArrayReader.
13#[pin_project]
14pub struct ArrayStreamAdapter<S> {
15    dtype: DType,
16    #[pin]
17    inner: S,
18}
19
20impl<S> ArrayStreamAdapter<S> {
21    pub fn new(dtype: DType, inner: S) -> Self {
22        Self { dtype, inner }
23    }
24}
25
26impl<S> ArrayStream for ArrayStreamAdapter<S>
27where
28    S: Stream<Item = VortexResult<ArrayRef>>,
29{
30    fn dtype(&self) -> &DType {
31        &self.dtype
32    }
33}
34
35impl<S> Stream for ArrayStreamAdapter<S>
36where
37    S: Stream<Item = VortexResult<ArrayRef>>,
38{
39    type Item = VortexResult<ArrayRef>;
40
41    fn poll_next(
42        self: Pin<&mut Self>,
43        cx: &mut std::task::Context<'_>,
44    ) -> Poll<Option<Self::Item>> {
45        let this = self.project();
46        let array = futures_util::ready!(this.inner.poll_next(cx));
47        if let Some(Ok(array)) = array.as_ref() {
48            debug_assert_eq!(array.dtype(), this.dtype);
49        }
50
51        Poll::Ready(array)
52    }
53
54    fn size_hint(&self) -> (usize, Option<usize>) {
55        self.inner.size_hint()
56    }
57}