vortex_layout/
strategy.rs1use std::pin::Pin;
5use std::task::{Context, Poll};
6
7use async_trait::async_trait;
8use futures::Stream;
9use pin_project_lite::pin_project;
10use vortex_array::{ArrayContext, ArrayRef};
11use vortex_dtype::DType;
12use vortex_error::VortexResult;
13
14use crate::LayoutRef;
15use crate::segments::SequenceWriter;
16use crate::sequence::SequenceId;
17
18pub trait SequentialStream: Stream<Item = VortexResult<(SequenceId, ArrayRef)>> {
19 fn dtype(&self) -> &DType;
20}
21
22pub type SendableSequentialStream = Pin<Box<dyn SequentialStream + Send>>;
23
24impl SequentialStream for SendableSequentialStream {
25 fn dtype(&self) -> &DType {
26 (**self).dtype()
27 }
28}
29
30#[async_trait]
32pub trait LayoutStrategy: 'static + Send + Sync {
33 async fn write_stream(
55 &self,
56 ctx: &ArrayContext,
57 sequence_writer: SequenceWriter,
58 stream: SendableSequentialStream,
59 ) -> VortexResult<LayoutRef>;
60}
61pub trait SequentialStreamExt: SequentialStream {
64 fn sendable(self) -> SendableSequentialStream
66 where
67 Self: Sized + Send + 'static,
68 {
69 Box::pin(self)
70 }
71}
72
73impl<S: SequentialStream> SequentialStreamExt for S {}
74
75pin_project! {
76 pub struct SequentialStreamAdapter<S> {
77 dtype: DType,
78 #[pin]
79 inner: S,
80 }
81}
82
83impl<S> SequentialStreamAdapter<S> {
84 pub fn new(dtype: DType, inner: S) -> Self {
85 Self { dtype, inner }
86 }
87}
88
89impl<S> SequentialStream for SequentialStreamAdapter<S>
90where
91 S: Stream<Item = VortexResult<(SequenceId, ArrayRef)>>,
92{
93 fn dtype(&self) -> &DType {
94 &self.dtype
95 }
96}
97
98impl<S> Stream for SequentialStreamAdapter<S>
99where
100 S: Stream<Item = VortexResult<(SequenceId, ArrayRef)>>,
101{
102 type Item = VortexResult<(SequenceId, ArrayRef)>;
103
104 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
105 let this = self.project();
106 let array = futures::ready!(this.inner.poll_next(cx));
107 if let Some(Ok((_, array))) = array.as_ref() {
108 assert_eq!(
109 array.dtype(),
110 this.dtype,
111 "Sequential stream of {} got chunk of {}.",
112 array.dtype(),
113 this.dtype
114 );
115 }
116
117 Poll::Ready(array)
118 }
119
120 fn size_hint(&self) -> (usize, Option<usize>) {
121 self.inner.size_hint()
122 }
123}