vortex_layout/
strategy.rs1use std::pin::Pin;
10use std::task::{Context, Poll};
11
12use futures::Stream;
13use pin_project_lite::pin_project;
14use vortex_array::{ArrayContext, ArrayRef};
15use vortex_dtype::DType;
16use vortex_error::VortexResult;
17
18use crate::SendableLayoutFuture;
19use crate::segments::SequenceWriter;
20use crate::sequence::SequenceId;
21
22pub trait SequentialStream: Stream<Item = VortexResult<(SequenceId, ArrayRef)>> {
23 fn dtype(&self) -> &DType;
24}
25
26pub type SendableSequentialStream = Pin<Box<dyn SequentialStream + Send>>;
27
28impl SequentialStream for SendableSequentialStream {
29 fn dtype(&self) -> &DType {
30 (**self).dtype()
31 }
32}
33
34pub trait LayoutStrategy: 'static + Send + Sync {
35 fn write_stream(
36 &self,
37 ctx: &ArrayContext,
38 sequence_writer: SequenceWriter,
39 stream: SendableSequentialStream,
40 ) -> SendableLayoutFuture;
41}
42
43pub trait SequentialStreamExt: SequentialStream {
44 fn sendable(self) -> SendableSequentialStream
46 where
47 Self: Sized + Send + 'static,
48 {
49 Box::pin(self)
50 }
51}
52
53impl<S: SequentialStream> SequentialStreamExt for S {}
54
55pin_project! {
56 pub struct SequentialStreamAdapter<S> {
57 dtype: DType,
58 #[pin]
59 inner: S,
60 }
61}
62
63impl<S> SequentialStreamAdapter<S> {
64 pub fn new(dtype: DType, inner: S) -> Self {
65 Self { dtype, inner }
66 }
67}
68
69impl<S> SequentialStream for SequentialStreamAdapter<S>
70where
71 S: Stream<Item = VortexResult<(SequenceId, ArrayRef)>>,
72{
73 fn dtype(&self) -> &DType {
74 &self.dtype
75 }
76}
77
78impl<S> Stream for SequentialStreamAdapter<S>
79where
80 S: Stream<Item = VortexResult<(SequenceId, ArrayRef)>>,
81{
82 type Item = VortexResult<(SequenceId, ArrayRef)>;
83
84 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
85 let this = self.project();
86 let array = futures::ready!(this.inner.poll_next(cx));
87 if let Some(Ok((_, array))) = array.as_ref() {
88 assert_eq!(
89 array.dtype(),
90 this.dtype,
91 "Sequential stream of {} got chunk of {}.",
92 array.dtype(),
93 this.dtype
94 );
95 }
96
97 Poll::Ready(array)
98 }
99
100 fn size_hint(&self) -> (usize, Option<usize>) {
101 self.inner.size_hint()
102 }
103}