vortex_file/
writer.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::io;
5use std::io::Write;
6use std::sync::Arc;
7use std::sync::atomic::AtomicU64;
8
9use futures::future::{Fuse, LocalBoxFuture, ready};
10use futures::{FutureExt, StreamExt, TryStreamExt, pin_mut, select};
11use vortex_array::iter::{ArrayIterator, ArrayIteratorExt};
12use vortex_array::stats::{PRUNING_STATS, Stat};
13use vortex_array::stream::{ArrayStream, ArrayStreamAdapter, ArrayStreamExt, SendableArrayStream};
14use vortex_array::{ArrayContext, ArrayRef};
15use vortex_buffer::ByteBuffer;
16use vortex_dtype::DType;
17use vortex_error::{VortexError, VortexExpect, VortexResult, vortex_bail, vortex_err};
18use vortex_io::kanal_ext::KanalExt;
19use vortex_io::runtime::BlockingRuntime;
20use vortex_io::session::RuntimeSessionExt;
21use vortex_io::{IoBuf, VortexWrite};
22use vortex_layout::LayoutStrategy;
23use vortex_layout::layouts::file_stats::accumulate_stats;
24use vortex_layout::sequence::{SequenceId, SequentialStreamAdapter, SequentialStreamExt};
25use vortex_session::{SessionExt, VortexSession};
26
27use crate::counting::CountingVortexWrite;
28use crate::footer::FileStatistics;
29use crate::segments::writer::BufferedSegmentSink;
30use crate::{Footer, MAGIC_BYTES, WriteStrategyBuilder};
31
32/// Configure a new writer, which can eventually be used to write an [`ArrayStream`] into a sink
33/// that implements [`VortexWrite`].
34///
35/// Unless overridden, the default [write strategy][crate::WriteStrategyBuilder] will be used with no
36/// additional configuration.
37pub struct VortexWriteOptions {
38    session: VortexSession,
39    strategy: Arc<dyn LayoutStrategy>,
40    exclude_dtype: bool,
41    max_variable_length_statistics_size: usize,
42    file_statistics: Vec<Stat>,
43}
44
45pub trait WriteOptionsSessionExt: SessionExt {
46    /// Create [`VortexWriteOptions`] for writing to a Vortex file.
47    fn write_options(&self) -> VortexWriteOptions {
48        VortexWriteOptions {
49            session: self.session(),
50            strategy: WriteStrategyBuilder::new().build(),
51            exclude_dtype: false,
52            file_statistics: PRUNING_STATS.to_vec(),
53            max_variable_length_statistics_size: 64,
54        }
55    }
56}
57impl<S: SessionExt> WriteOptionsSessionExt for S {}
58
59impl VortexWriteOptions {
60    /// Create a new [`VortexWriteOptions`] with the given session.
61    pub fn new(session: VortexSession) -> Self {
62        VortexWriteOptions {
63            session,
64            strategy: WriteStrategyBuilder::new().build(),
65            exclude_dtype: false,
66            file_statistics: PRUNING_STATS.to_vec(),
67            max_variable_length_statistics_size: 64,
68        }
69    }
70
71    /// Replace the default layout strategy with the provided one.
72    pub fn with_strategy(mut self, strategy: Arc<dyn LayoutStrategy>) -> Self {
73        self.strategy = strategy;
74        self
75    }
76
77    /// Exclude the DType from the Vortex file. You must provide the DType to the reader.
78    // TODO(ngates): Should we store some sort of DType checksum to make sure the one passed at
79    //  read-time is sane? I guess most layouts will have some reasonable validation.
80    pub fn exclude_dtype(mut self) -> Self {
81        self.exclude_dtype = true;
82        self
83    }
84
85    /// Configure which statistics to compute at the file-level.
86    pub fn with_file_statistics(mut self, file_statistics: Vec<Stat>) -> Self {
87        self.file_statistics = file_statistics;
88        self
89    }
90}
91
92impl VortexWriteOptions {
93    /// Drop into the blocking writer API using the given runtime.
94    pub fn blocking<B: BlockingRuntime>(self, runtime: &B) -> BlockingWrite<'_, B> {
95        BlockingWrite {
96            options: self,
97            runtime,
98        }
99    }
100
101    /// Write an [`ArrayStream`] as a Vortex file.
102    ///
103    /// Note that buffers are flushed as soon as they are available with no buffering, the caller
104    /// is responsible for deciding how to configure buffering on the underlying `Write` sink.
105    pub async fn write<W: VortexWrite + Unpin, S: ArrayStream + Send + 'static>(
106        self,
107        write: W,
108        stream: S,
109    ) -> VortexResult<WriteSummary> {
110        self.write_internal(write, ArrayStreamExt::boxed(stream))
111            .await
112    }
113
114    async fn write_internal<W: VortexWrite + Unpin>(
115        self,
116        mut write: W,
117        stream: SendableArrayStream,
118    ) -> VortexResult<WriteSummary> {
119        // Set up a Context to capture the encodings used in the file.
120        let ctx = ArrayContext::empty();
121        let dtype = stream.dtype().clone();
122
123        let (mut ptr, eof) = SequenceId::root().split();
124
125        let stream = SequentialStreamAdapter::new(
126            dtype.clone(),
127            stream
128                .try_filter(|chunk| ready(!chunk.is_empty()))
129                .map(move |result| result.map(|chunk| (ptr.advance(), chunk))),
130        )
131        .sendable();
132        let (file_stats, stream) = accumulate_stats(
133            stream,
134            self.file_statistics.clone().into(),
135            self.max_variable_length_statistics_size,
136        );
137
138        // First, write the magic bytes.
139        write.write_all(ByteBuffer::copy_from(MAGIC_BYTES)).await?;
140        let mut position = MAGIC_BYTES.len() as u64;
141
142        // Create a channel to send buffers from the segment sink to the output stream.
143        let (send, recv) = kanal::bounded_async(1);
144
145        let segments = Arc::new(BufferedSegmentSink::new(send, position));
146
147        // We spawn the layout future so it is driven in the background while we write the
148        // buffer stream, so we don't need to poll it until all buffers have been drained.
149        let ctx2 = ctx.clone();
150        let layout_fut = self.session.handle().spawn_nested(|h| async move {
151            let layout = self
152                .strategy
153                .write_stream(ctx2, segments.clone(), stream, eof, h)
154                .await?;
155            Ok::<_, VortexError>((layout, segments.segment_specs()))
156        });
157
158        // Flush buffers as they arrive
159        let recv_stream = recv.into_stream();
160        pin_mut!(recv_stream);
161        while let Some(buffer) = recv_stream.next().await {
162            if buffer.is_empty() {
163                continue;
164            }
165            position += buffer.len() as u64;
166            write.write_all(buffer).await?;
167        }
168
169        let (layout, segment_specs) = layout_fut.await?;
170
171        // Assemble the Footer object now that we have all the segments.
172        let footer = Footer::new(
173            layout.clone(),
174            segment_specs,
175            if self.file_statistics.is_empty() {
176                None
177            } else {
178                Some(FileStatistics(file_stats.stats_sets().into()))
179            },
180            ctx,
181        );
182
183        // Emit the footer buffers and EOF.
184        let footer_buffers = footer
185            .clone()
186            .into_serializer()
187            .with_offset(position)
188            .with_exclude_dtype(self.exclude_dtype)
189            .serialize()?;
190        for buffer in footer_buffers {
191            position += buffer.len() as u64;
192            write.write_all(buffer).await?;
193        }
194
195        write.flush().await?;
196
197        Ok(WriteSummary {
198            footer,
199            size: position,
200        })
201    }
202
203    /// Create a push-based [`Writer`] that can be used to incrementally write arrays to the file.
204    pub fn writer<'w, W: VortexWrite + Unpin + 'w>(self, write: W, dtype: DType) -> Writer<'w> {
205        // Create a channel for sending arrays to the layout task.
206        let (arrays_send, arrays_recv) = kanal::bounded_async(1);
207
208        let arrays =
209            ArrayStreamExt::boxed(ArrayStreamAdapter::new(dtype, arrays_recv.into_stream()));
210
211        let write = CountingVortexWrite::new(write);
212        let bytes_written = write.counter();
213        let strategy = self.strategy.clone();
214        let future = self.write(write, arrays).boxed_local().fuse();
215
216        Writer {
217            arrays: Some(arrays_send),
218            future,
219            bytes_written,
220            strategy,
221        }
222    }
223}
224
225/// An async API for writing Vortex files.
226pub struct Writer<'w> {
227    // The input channel for sending arrays to the writer.
228    arrays: Option<kanal::AsyncSender<VortexResult<ArrayRef>>>,
229    // The writer task that ultimately produces the footer.
230    future: Fuse<LocalBoxFuture<'w, VortexResult<WriteSummary>>>,
231    // The bytes written so far.
232    bytes_written: Arc<AtomicU64>,
233    // The layout strategy that is being used for the write.
234    strategy: Arc<dyn LayoutStrategy>,
235}
236
237impl Writer<'_> {
238    /// Push a new chunk into the writer.
239    pub async fn push(&mut self, chunk: ArrayRef) -> VortexResult<()> {
240        let arrays = self.arrays.clone().vortex_expect("missing arrays sender");
241        let send_fut = async move { arrays.send(Ok(chunk)).await }.fuse();
242        pin_mut!(send_fut);
243
244        // We poll the writer future to continue writing bytes to the output, while waiting for
245        // enough room to push the next chunk into the channel.
246        select! {
247            result = send_fut => {
248                // If the send future failed, the writer has failed or panicked.
249                if result.is_err() {
250                    return Err(self.handle_failed_task().await);
251                }
252            },
253            result = &mut self.future => {
254                // Under normal operation, the writer future should never complete until
255                // finish() is called. Therefore, we can assume the writer has failed.
256                // The writer future has failed, we need to propagate the error.
257                match result {
258                    Ok(_) => vortex_bail!("Internal error: writer future completed early"),
259                    Err(e) => return Err(e),
260                }
261            }
262        }
263
264        Ok(())
265    }
266
267    /// Push an entire [`ArrayStream`] into the writer, consuming it.
268    ///
269    /// A task is spawned to consume the stream and push it into the writer, with the current
270    /// thread being used to write buffers to the output.
271    pub async fn push_stream(&mut self, mut stream: SendableArrayStream) -> VortexResult<()> {
272        let arrays = self.arrays.clone().vortex_expect("missing arrays sender");
273        let stream_fut = async move {
274            while let Some(chunk) = stream.next().await {
275                arrays.send(chunk).await?;
276            }
277            Ok::<_, kanal::SendError>(())
278        }
279        .fuse();
280        pin_mut!(stream_fut);
281
282        // We poll the writer future to continue writing bytes to the output, while waiting for
283        // enough room to push the stream into the channel.
284        select! {
285            result = stream_fut => {
286                if let Err(_send_err) = result {
287                    // If the send future failed, the writer has failed or panicked.
288                    return Err(self.handle_failed_task().await);
289                }
290            }
291
292            result = &mut self.future => {
293                // Under normal operation, the writer future should never complete until
294                // finish() is called. Therefore, we can assume the writer has failed.
295                // The writer future has failed, we need to propagate the error.
296                match result {
297                    Ok(_) => vortex_bail!("Internal error: writer future completed early"),
298                    Err(e) => return Err(e),
299                }
300            }
301        }
302
303        Ok(())
304    }
305
306    /// Returns the number of bytes written to the file so far.
307    pub fn bytes_written(&self) -> u64 {
308        self.bytes_written
309            .load(std::sync::atomic::Ordering::Relaxed)
310    }
311
312    /// Returns the number of bytes currently buffered by the layout writers.
313    pub fn buffered_bytes(&self) -> u64 {
314        self.strategy.buffered_bytes()
315    }
316
317    /// Finish writing the Vortex file, flushing any remaining buffers and returning the
318    /// new file's footer.
319    pub async fn finish(mut self) -> VortexResult<WriteSummary> {
320        // Drop the input channel to signal EOF.
321        drop(self.arrays.take());
322
323        // Await the future task.
324        self.future.await
325    }
326
327    /// Assuming the writer task has failed, await it to get the error.
328    async fn handle_failed_task(&mut self) -> VortexError {
329        match (&mut self.future).await {
330            Ok(_) => vortex_err!(
331                "Internal error: writer task completed successfully but write future finished early"
332            ),
333            Err(e) => e,
334        }
335    }
336}
337
338/// A blocking API for writing Vortex files.
339pub struct BlockingWrite<'rt, B: BlockingRuntime> {
340    options: VortexWriteOptions,
341    runtime: &'rt B,
342}
343
344impl<'rt, B: BlockingRuntime> BlockingWrite<'rt, B> {
345    /// Write a Vortex file into the given `Write` sink.
346    pub fn write<W: Write + Unpin>(
347        self,
348        write: W,
349        iter: impl ArrayIterator + Send + 'static,
350    ) -> VortexResult<WriteSummary> {
351        self.runtime.block_on(async move {
352            self.options
353                .write(BlockingWriteAdapter(write), iter.into_array_stream())
354                .await
355        })
356    }
357
358    pub fn writer<'w, W: Write + Unpin + 'w>(
359        self,
360        write: W,
361        dtype: DType,
362    ) -> BlockingWriter<'rt, 'w, B> {
363        BlockingWriter {
364            writer: self.options.writer(BlockingWriteAdapter(write), dtype),
365            runtime: self.runtime,
366        }
367    }
368}
369
370/// A blocking adapter around a [`Writer`], allowing incremental writing of arrays to a Vortex file.
371pub struct BlockingWriter<'rt, 'w, B: BlockingRuntime> {
372    runtime: &'rt B,
373    writer: Writer<'w>,
374}
375
376impl<B: BlockingRuntime> BlockingWriter<'_, '_, B> {
377    pub fn push(&mut self, chunk: ArrayRef) -> VortexResult<()> {
378        self.runtime.block_on(self.writer.push(chunk))
379    }
380
381    pub fn bytes_written(&self) -> u64 {
382        self.writer.bytes_written()
383    }
384
385    pub fn buffered_bytes(&self) -> u64 {
386        self.writer.buffered_bytes()
387    }
388
389    pub fn finish(self) -> VortexResult<WriteSummary> {
390        self.runtime.block_on(self.writer.finish())
391    }
392}
393
/// Exposes a synchronous [`std::io::Write`] sink through the [`VortexWrite`] interface.
// TODO(ngates): this blocking API may change, for now we just run blocking I/O inline.
struct BlockingWriteAdapter<W>(W);
396
397impl<W: Write + Unpin> VortexWrite for BlockingWriteAdapter<W> {
398    async fn write_all<B: IoBuf>(&mut self, buffer: B) -> io::Result<B> {
399        self.0.write_all(buffer.as_slice())?;
400        Ok(buffer)
401    }
402
403    fn flush(&mut self) -> impl Future<Output = io::Result<()>> {
404        ready(self.0.flush())
405    }
406
407    fn shutdown(&mut self) -> impl Future<Output = io::Result<()>> {
408        ready(Ok(()))
409    }
410}
411
412pub struct WriteSummary {
413    footer: Footer,
414    size: u64,
415    // TODO(ngates): add a checksum
416}
417
418impl WriteSummary {
419    /// The footer of the written Vortex file.
420    pub fn footer(&self) -> &Footer {
421        &self.footer
422    }
423
424    /// The total size of the written Vortex file in bytes.
425    pub fn size(&self) -> u64 {
426        self.size
427    }
428
429    /// The footer of the written Vortex file.
430    pub fn row_count(&self) -> u64 {
431        self.footer.row_count()
432    }
433}