vortex_array/stream/
adapter.rs1use std::pin::Pin;
5use std::task::Poll;
6
7use futures_util::Stream;
8use pin_project::pin_project;
9use vortex_dtype::DType;
10use vortex_error::VortexResult;
11
12use crate::ArrayRef;
13use crate::stream::ArrayStream;
14
15#[pin_project]
17pub struct ArrayStreamAdapter<S> {
18 dtype: DType,
19 #[pin]
20 inner: S,
21}
22
23impl<S> ArrayStreamAdapter<S> {
24 pub fn new(dtype: DType, inner: S) -> Self {
25 Self { dtype, inner }
26 }
27}
28
29impl<S> ArrayStream for ArrayStreamAdapter<S>
30where
31 S: Stream<Item = VortexResult<ArrayRef>>,
32{
33 fn dtype(&self) -> &DType {
34 &self.dtype
35 }
36}
37
38impl<S> Stream for ArrayStreamAdapter<S>
39where
40 S: Stream<Item = VortexResult<ArrayRef>>,
41{
42 type Item = VortexResult<ArrayRef>;
43
44 fn poll_next(
45 self: Pin<&mut Self>,
46 cx: &mut std::task::Context<'_>,
47 ) -> Poll<Option<Self::Item>> {
48 let this = self.project();
49 let array = futures_util::ready!(this.inner.poll_next(cx));
50 if let Some(Ok(array)) = array.as_ref() {
51 debug_assert_eq!(array.dtype(), this.dtype);
52 }
53
54 Poll::Ready(array)
55 }
56
57 fn size_hint(&self) -> (usize, Option<usize>) {
58 self.inner.size_hint()
59 }
60}