vortex_array/stream/
adapter.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::pin::Pin;
5use std::task::Poll;
6
7use futures::Stream;
8use pin_project_lite::pin_project;
9use vortex_dtype::DType;
10use vortex_error::VortexResult;
11
12use crate::ArrayRef;
13use crate::stream::ArrayStream;
14
15pin_project! {
16    /// An adapter for a stream of array chunks to implement an ArrayReader.
17    pub struct ArrayStreamAdapter<S> {
18        dtype: DType,
19        #[pin]
20        inner: S,
21    }
22}
23
24impl<S> ArrayStreamAdapter<S>
25where
26    S: Stream<Item = VortexResult<ArrayRef>>,
27{
28    pub fn new(dtype: DType, inner: S) -> Self {
29        Self { dtype, inner }
30    }
31}
32
33impl<S> ArrayStream for ArrayStreamAdapter<S>
34where
35    S: Stream<Item = VortexResult<ArrayRef>>,
36{
37    fn dtype(&self) -> &DType {
38        &self.dtype
39    }
40}
41
42impl<S> Stream for ArrayStreamAdapter<S>
43where
44    S: Stream<Item = VortexResult<ArrayRef>>,
45{
46    type Item = VortexResult<ArrayRef>;
47
48    fn poll_next(
49        self: Pin<&mut Self>,
50        cx: &mut std::task::Context<'_>,
51    ) -> Poll<Option<Self::Item>> {
52        let this = self.project();
53        let array = futures::ready!(this.inner.poll_next(cx));
54        if let Some(Ok(array)) = array.as_ref() {
55            debug_assert_eq!(
56                array.dtype(),
57                this.dtype,
58                "ArrayStreamAdapter received an array with unexpected dtype"
59            );
60        }
61
62        Poll::Ready(array)
63    }
64
65    fn size_hint(&self) -> (usize, Option<usize>) {
66        self.inner.size_hint()
67    }
68}