vortex_layout/segments/
sink.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::sync::Arc;
5
6use futures::TryStreamExt as _;
7use parking_lot::Mutex;
8use vortex_array::stream::SendableArrayStream;
9use vortex_buffer::ByteBuffer;
10use vortex_error::VortexResult;
11
12use crate::segments::SegmentId;
13use crate::sequence::{SequenceId, SequencePointer};
14use crate::{SendableSequentialStream, SequentialStreamAdapter, SequentialStreamExt as _};
15
16pub trait SegmentWriter: Send + Sync {
17    /// Write the given data into a segment and associate it with the given segment identifier.
18    /// The provided buffers are concatenated together to form the segment.
19    ///
20    // TODO(ngates): in order to support aligned Direct I/O, it is preferable for all segments to
21    //  be aligned to the logical block size (typically 512, but could be 4096). For this reason,
22    //  if we know we're going to read an entire FlatLayout together, then we should probably
23    //  serialize it into a single segment that is 512 byte aligned? Or else, we should guarantee
24    //  to align the the first segment to 512, and then assume that coalescing captures the rest.
25    fn put(&mut self, segment_id: SegmentId, buffer: Vec<ByteBuffer>) -> VortexResult<()>;
26}
27
28/// Utility struct to associate SequenceId's with
29/// chunks in an array stream. It wraps a [SegmentWriter]
30/// and enforces SegmentId's sent to it are monotonically
31/// increasing.
32/// See [SequenceId] docs for more information.
33#[derive(Clone)]
34pub struct SequenceWriter {
35    state: Arc<Mutex<State>>,
36}
37
38struct State {
39    segment_writer: Box<dyn SegmentWriter>,
40    eof_pointer: SequencePointer,
41}
42
43impl SequenceWriter {
44    pub fn new(segment_writer: Box<dyn SegmentWriter>) -> Self {
45        let eof_pointer = SequenceId::root();
46        Self {
47            state: Arc::new(Mutex::new(State {
48                segment_writer,
49                eof_pointer,
50            })),
51        }
52    }
53
54    /// Wait until the given SequenceId is the first non dropped
55    /// instance among all that self::new_sequential is created.
56    /// Calls [SegmentWriter::put] with the resulting SegmentId.
57    /// See [SequenceId::collapse] docs for more information.
58    pub async fn put(
59        &self,
60        sequence_id: SequenceId,
61        buffer: Vec<ByteBuffer>,
62    ) -> VortexResult<SegmentId> {
63        let segment_id = sequence_id.collapse().await;
64        self.state.lock().segment_writer.put(segment_id, buffer)?;
65        Ok(segment_id)
66    }
67
68    /// Annotate an array stream with sequence identifiers.
69    ///
70    /// Each sequence id on an item
71    /// in the stream would come after all others that come before it.
72    ///
73    /// Consecutive calls would guarantee that all sequence id's associated
74    /// on the latter stream would come after all that were associated
75    /// with the former stream.
76    pub fn new_sequential(&self, stream: SendableArrayStream) -> SendableSequentialStream {
77        let mut sequence_pointer = self.tail();
78        SequentialStreamAdapter::new(
79            stream.dtype().clone(),
80            stream.map_ok(move |chunk| (sequence_pointer.advance(), chunk)),
81        )
82        .sendable()
83    }
84
85    fn tail(&self) -> SequencePointer {
86        let mut guard = self.state.lock();
87        let head = guard.eof_pointer.advance();
88        head.descend()
89    }
90}