vortex_layout/segments/sink.rs
1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::sync::Arc;
5
6use futures::TryStreamExt as _;
7use parking_lot::Mutex;
8use vortex_array::stream::SendableArrayStream;
9use vortex_buffer::ByteBuffer;
10use vortex_error::VortexResult;
11
12use crate::segments::SegmentId;
13use crate::sequence::{SequenceId, SequencePointer};
14use crate::{SendableSequentialStream, SequentialStreamAdapter, SequentialStreamExt as _};
15
16pub trait SegmentWriter: Send + Sync {
17 /// Write the given data into a segment and associate it with the given segment identifier.
18 /// The provided buffers are concatenated together to form the segment.
19 ///
20 // TODO(ngates): in order to support aligned Direct I/O, it is preferable for all segments to
21 // be aligned to the logical block size (typically 512, but could be 4096). For this reason,
22 // if we know we're going to read an entire FlatLayout together, then we should probably
23 // serialize it into a single segment that is 512 byte aligned? Or else, we should guarantee
24 // to align the the first segment to 512, and then assume that coalescing captures the rest.
25 fn put(&mut self, segment_id: SegmentId, buffer: Vec<ByteBuffer>) -> VortexResult<()>;
26}
27
28/// Utility struct to associate SequenceId's with
29/// chunks in an array stream. It wraps a [SegmentWriter]
30/// and enforces SegmentId's sent to it are monotonically
31/// increasing.
32/// See [SequenceId] docs for more information.
33#[derive(Clone)]
34pub struct SequenceWriter {
35 state: Arc<Mutex<State>>,
36}
37
38struct State {
39 segment_writer: Box<dyn SegmentWriter>,
40 eof_pointer: SequencePointer,
41}
42
43impl SequenceWriter {
44 pub fn new(segment_writer: Box<dyn SegmentWriter>) -> Self {
45 let eof_pointer = SequenceId::root();
46 Self {
47 state: Arc::new(Mutex::new(State {
48 segment_writer,
49 eof_pointer,
50 })),
51 }
52 }
53
54 /// Wait until the given SequenceId is the first non dropped
55 /// instance among all that self::new_sequential is created.
56 /// Calls [SegmentWriter::put] with the resulting SegmentId.
57 /// See [SequenceId::collapse] docs for more information.
58 pub async fn put(
59 &self,
60 sequence_id: SequenceId,
61 buffer: Vec<ByteBuffer>,
62 ) -> VortexResult<SegmentId> {
63 let segment_id = sequence_id.collapse().await;
64 self.state.lock().segment_writer.put(segment_id, buffer)?;
65 Ok(segment_id)
66 }
67
68 /// Annotate an array stream with sequence identifiers.
69 ///
70 /// Each sequence id on an item
71 /// in the stream would come after all others that come before it.
72 ///
73 /// Consecutive calls would guarantee that all sequence id's associated
74 /// on the latter stream would come after all that were associated
75 /// with the former stream.
76 pub fn new_sequential(&self, stream: SendableArrayStream) -> SendableSequentialStream {
77 let mut sequence_pointer = self.tail();
78 SequentialStreamAdapter::new(
79 stream.dtype().clone(),
80 stream.map_ok(move |chunk| (sequence_pointer.advance(), chunk)),
81 )
82 .sendable()
83 }
84
85 fn tail(&self) -> SequencePointer {
86 let mut guard = self.state.lock();
87 let head = guard.eof_pointer.advance();
88 head.descend()
89 }
90}