vortex_layout/
strategy.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::pin::Pin;
5use std::task::{Context, Poll};
6
7use async_trait::async_trait;
8use futures::Stream;
9use pin_project_lite::pin_project;
10use vortex_array::{ArrayContext, ArrayRef};
11use vortex_dtype::DType;
12use vortex_error::VortexResult;
13
14use crate::LayoutRef;
15use crate::segments::SequenceWriter;
16use crate::sequence::SequenceId;
17
18pub trait SequentialStream: Stream<Item = VortexResult<(SequenceId, ArrayRef)>> {
19    fn dtype(&self) -> &DType;
20}
21
22pub type SendableSequentialStream = Pin<Box<dyn SequentialStream + Send>>;
23
24impl SequentialStream for SendableSequentialStream {
25    fn dtype(&self) -> &DType {
26        (**self).dtype()
27    }
28}
29
30// [layout writer]
31#[async_trait]
32pub trait LayoutStrategy: 'static + Send + Sync {
33    /// Asynchronously process an ordered stream of array chunks, emitting them into a sink and
34    /// returning the [`Layout`][crate::Layout] instance that can be parsed to retrieve the data
35    /// from rest.
36    ///
37    /// This trait uses the `#[async_trait]` attribute to denote that trait objects of this type
38    /// can be `Box`ed or `Arc`ed and shared around. Commonly, these strategies are composed to
39    /// form a pipeline of operations, each of which modifies the chunk stream in some way before
40    /// passing the data on to a downstream writer.
41    ///
42    /// # Blocking operations
43    ///
44    /// This is an async trait method, which will return a `BoxFuture` that you can await from
45    /// any runtime. Implementations should avoid directly performing blocking work within the
46    /// `write_stream`, and should instead spawn it onto an appropriate runtime or threadpool
47    /// dedicated to such work.
48    ///
49    /// Such operations are common, and include things like compression and parsing large blobs
50    /// of data, or serializing very large messages to flatbuffers.
51    ///
52    /// Consider accepting a [`TaskExecutor`][crate::TaskExecutor] as an input to your strategy
53    /// to support spawning this work in the background.
54    async fn write_stream(
55        &self,
56        ctx: &ArrayContext,
57        sequence_writer: SequenceWriter,
58        stream: SendableSequentialStream,
59    ) -> VortexResult<LayoutRef>;
60}
61// [layout writer]
62
63pub trait SequentialStreamExt: SequentialStream {
64    // not named boxed to prevent clashing with StreamExt
65    fn sendable(self) -> SendableSequentialStream
66    where
67        Self: Sized + Send + 'static,
68    {
69        Box::pin(self)
70    }
71}
72
73impl<S: SequentialStream> SequentialStreamExt for S {}
74
75pin_project! {
76    pub struct SequentialStreamAdapter<S> {
77        dtype: DType,
78        #[pin]
79        inner: S,
80    }
81}
82
83impl<S> SequentialStreamAdapter<S> {
84    pub fn new(dtype: DType, inner: S) -> Self {
85        Self { dtype, inner }
86    }
87}
88
89impl<S> SequentialStream for SequentialStreamAdapter<S>
90where
91    S: Stream<Item = VortexResult<(SequenceId, ArrayRef)>>,
92{
93    fn dtype(&self) -> &DType {
94        &self.dtype
95    }
96}
97
98impl<S> Stream for SequentialStreamAdapter<S>
99where
100    S: Stream<Item = VortexResult<(SequenceId, ArrayRef)>>,
101{
102    type Item = VortexResult<(SequenceId, ArrayRef)>;
103
104    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
105        let this = self.project();
106        let array = futures::ready!(this.inner.poll_next(cx));
107        if let Some(Ok((_, array))) = array.as_ref() {
108            assert_eq!(
109                array.dtype(),
110                this.dtype,
111                "Sequential stream of {} got chunk of {}.",
112                array.dtype(),
113                this.dtype
114            );
115        }
116
117        Poll::Ready(array)
118    }
119
120    fn size_hint(&self) -> (usize, Option<usize>) {
121        self.inner.size_hint()
122    }
123}