vortex_layout/
strategy.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
//! Each [`LayoutWriter`] is passed horizontal chunks of a Vortex array one-by-one, and is
//! eventually asked to return a [`crate::LayoutData`]. The writers can buffer, re-chunk, flush, or
//! otherwise manipulate the chunks of data, which enables experimentation with different
//! strategies while remaining independent of the read code.
8
9use std::pin::Pin;
10use std::task::{Context, Poll};
11
12use futures::Stream;
13use pin_project_lite::pin_project;
14use vortex_array::{ArrayContext, ArrayRef};
15use vortex_dtype::DType;
16use vortex_error::VortexResult;
17
18use crate::SendableLayoutFuture;
19use crate::segments::SequenceWriter;
20use crate::sequence::SequenceId;
21
22pub trait SequentialStream: Stream<Item = VortexResult<(SequenceId, ArrayRef)>> {
23    fn dtype(&self) -> &DType;
24}
25
26pub type SendableSequentialStream = Pin<Box<dyn SequentialStream + Send>>;
27
28impl SequentialStream for SendableSequentialStream {
29    fn dtype(&self) -> &DType {
30        (**self).dtype()
31    }
32}
33
34pub trait LayoutStrategy: 'static + Send + Sync {
35    fn write_stream(
36        &self,
37        ctx: &ArrayContext,
38        sequence_writer: SequenceWriter,
39        stream: SendableSequentialStream,
40    ) -> SendableLayoutFuture;
41}
42
43pub trait SequentialStreamExt: SequentialStream {
44    // not named boxed to prevent clashing with StreamExt
45    fn sendable(self) -> SendableSequentialStream
46    where
47        Self: Sized + Send + 'static,
48    {
49        Box::pin(self)
50    }
51}
52
53impl<S: SequentialStream> SequentialStreamExt for S {}
54
55pin_project! {
56    pub struct SequentialStreamAdapter<S> {
57        dtype: DType,
58        #[pin]
59        inner: S,
60    }
61}
62
63impl<S> SequentialStreamAdapter<S> {
64    pub fn new(dtype: DType, inner: S) -> Self {
65        Self { dtype, inner }
66    }
67}
68
69impl<S> SequentialStream for SequentialStreamAdapter<S>
70where
71    S: Stream<Item = VortexResult<(SequenceId, ArrayRef)>>,
72{
73    fn dtype(&self) -> &DType {
74        &self.dtype
75    }
76}
77
78impl<S> Stream for SequentialStreamAdapter<S>
79where
80    S: Stream<Item = VortexResult<(SequenceId, ArrayRef)>>,
81{
82    type Item = VortexResult<(SequenceId, ArrayRef)>;
83
84    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
85        let this = self.project();
86        let array = futures::ready!(this.inner.poll_next(cx));
87        if let Some(Ok((_, array))) = array.as_ref() {
88            assert_eq!(
89                array.dtype(),
90                this.dtype,
91                "Sequential stream of {} got chunk of {}.",
92                array.dtype(),
93                this.dtype
94            );
95        }
96
97        Poll::Ready(array)
98    }
99
100    fn size_hint(&self) -> (usize, Option<usize>) {
101        self.inner.size_hint()
102    }
103}