vortex_array/stream/
adapter.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::pin::Pin;
5use std::task::Poll;
6
7use futures::Stream;
8use pin_project::pin_project;
9use vortex_dtype::DType;
10use vortex_error::VortexResult;
11
12use crate::ArrayRef;
13use crate::stream::ArrayStream;
14
15/// An adapter for a stream of array chunks to implement an ArrayReader.
16#[pin_project]
17pub struct ArrayStreamAdapter<S> {
18    dtype: DType,
19    #[pin]
20    inner: S,
21}
22
23impl<S> ArrayStreamAdapter<S>
24where
25    S: Stream<Item = VortexResult<ArrayRef>>,
26{
27    pub fn new(dtype: DType, inner: S) -> Self {
28        Self { dtype, inner }
29    }
30}
31
32impl<S> ArrayStream for ArrayStreamAdapter<S>
33where
34    S: Stream<Item = VortexResult<ArrayRef>>,
35{
36    fn dtype(&self) -> &DType {
37        &self.dtype
38    }
39}
40
41impl<S> Stream for ArrayStreamAdapter<S>
42where
43    S: Stream<Item = VortexResult<ArrayRef>>,
44{
45    type Item = VortexResult<ArrayRef>;
46
47    fn poll_next(
48        self: Pin<&mut Self>,
49        cx: &mut std::task::Context<'_>,
50    ) -> Poll<Option<Self::Item>> {
51        let this = self.project();
52        let array = futures::ready!(this.inner.poll_next(cx));
53        if let Some(Ok(array)) = array.as_ref() {
54            debug_assert_eq!(
55                array.dtype(),
56                this.dtype,
57                "ArrayStreamAdapter received an array with unexpected dtype"
58            );
59        }
60
61        Poll::Ready(array)
62    }
63
64    fn size_hint(&self) -> (usize, Option<usize>) {
65        self.inner.size_hint()
66    }
67}