vortex_file/
writer.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::io;
5use std::io::Write;
6use std::sync::Arc;
7use std::sync::atomic::AtomicU64;
8
9use futures::FutureExt;
10use futures::StreamExt;
11use futures::TryStreamExt;
12use futures::future::Fuse;
13use futures::future::LocalBoxFuture;
14use futures::future::ready;
15use futures::pin_mut;
16use futures::select;
17use vortex_array::ArrayContext;
18use vortex_array::ArrayRef;
19use vortex_array::expr::stats::Stat;
20use vortex_array::iter::ArrayIterator;
21use vortex_array::iter::ArrayIteratorExt;
22use vortex_array::session::ArraySessionExt;
23use vortex_array::stats::PRUNING_STATS;
24use vortex_array::stream::ArrayStream;
25use vortex_array::stream::ArrayStreamAdapter;
26use vortex_array::stream::ArrayStreamExt;
27use vortex_array::stream::SendableArrayStream;
28use vortex_buffer::ByteBuffer;
29use vortex_dtype::DType;
30use vortex_error::VortexError;
31use vortex_error::VortexExpect;
32use vortex_error::VortexResult;
33use vortex_error::vortex_bail;
34use vortex_error::vortex_err;
35use vortex_io::IoBuf;
36use vortex_io::VortexWrite;
37use vortex_io::kanal_ext::KanalExt;
38use vortex_io::runtime::BlockingRuntime;
39use vortex_io::session::RuntimeSessionExt;
40use vortex_layout::LayoutStrategy;
41use vortex_layout::layouts::file_stats::accumulate_stats;
42use vortex_layout::sequence::SequenceId;
43use vortex_layout::sequence::SequentialStreamAdapter;
44use vortex_layout::sequence::SequentialStreamExt;
45use vortex_session::SessionExt;
46use vortex_session::VortexSession;
47
48use crate::Footer;
49use crate::MAGIC_BYTES;
50use crate::WriteStrategyBuilder;
51use crate::counting::CountingVortexWrite;
52use crate::footer::FileStatistics;
53use crate::segments::writer::BufferedSegmentSink;
54
/// Configure a new writer, which can eventually be used to write an [`ArrayStream`] into a sink
/// that implements [`VortexWrite`].
///
/// Unless overridden, the default [write strategy][crate::WriteStrategyBuilder] will be used with no
/// additional configuration.
pub struct VortexWriteOptions {
    /// Session that supplies the array registry and the runtime handle used while writing.
    session: VortexSession,
    /// Layout strategy that turns the incoming array stream into the file's layout.
    strategy: Arc<dyn LayoutStrategy>,
    /// When `true`, the footer is serialized with the dtype excluded.
    exclude_dtype: bool,
    /// Forwarded to `accumulate_stats` when collecting file-level statistics.
    // NOTE(review): presumably an upper bound on the size of variable-length statistic values
    // (e.g. string min/max) -- confirm against `accumulate_stats`.
    max_variable_length_statistics_size: usize,
    /// Statistics to compute at the file level. When empty, no file statistics are stored in
    /// the footer.
    file_statistics: Vec<Stat>,
}
67
68pub trait WriteOptionsSessionExt: SessionExt {
69    /// Create [`VortexWriteOptions`] for writing to a Vortex file.
70    fn write_options(&self) -> VortexWriteOptions {
71        VortexWriteOptions {
72            session: self.session(),
73            strategy: WriteStrategyBuilder::new().build(),
74            exclude_dtype: false,
75            file_statistics: PRUNING_STATS.to_vec(),
76            max_variable_length_statistics_size: 64,
77        }
78    }
79}
80impl<S: SessionExt> WriteOptionsSessionExt for S {}
81
82impl VortexWriteOptions {
83    /// Create a new [`VortexWriteOptions`] with the given session.
84    pub fn new(session: VortexSession) -> Self {
85        VortexWriteOptions {
86            session,
87            strategy: WriteStrategyBuilder::new().build(),
88            exclude_dtype: false,
89            file_statistics: PRUNING_STATS.to_vec(),
90            max_variable_length_statistics_size: 64,
91        }
92    }
93
94    /// Replace the default layout strategy with the provided one.
95    pub fn with_strategy(mut self, strategy: Arc<dyn LayoutStrategy>) -> Self {
96        self.strategy = strategy;
97        self
98    }
99
100    /// Exclude the DType from the Vortex file. You must provide the DType to the reader.
101    // TODO(ngates): Should we store some sort of DType checksum to make sure the one passed at
102    //  read-time is sane? I guess most layouts will have some reasonable validation.
103    pub fn exclude_dtype(mut self) -> Self {
104        self.exclude_dtype = true;
105        self
106    }
107
108    /// Configure which statistics to compute at the file-level.
109    pub fn with_file_statistics(mut self, file_statistics: Vec<Stat>) -> Self {
110        self.file_statistics = file_statistics;
111        self
112    }
113}
114
115impl VortexWriteOptions {
116    /// Drop into the blocking writer API using the given runtime.
117    pub fn blocking<B: BlockingRuntime>(self, runtime: &B) -> BlockingWrite<'_, B> {
118        BlockingWrite {
119            options: self,
120            runtime,
121        }
122    }
123
124    /// Write an [`ArrayStream`] as a Vortex file.
125    ///
126    /// Note that buffers are flushed as soon as they are available with no buffering, the caller
127    /// is responsible for deciding how to configure buffering on the underlying `Write` sink.
128    pub async fn write<W: VortexWrite + Unpin, S: ArrayStream + Send + 'static>(
129        self,
130        write: W,
131        stream: S,
132    ) -> VortexResult<WriteSummary> {
133        self.write_internal(write, ArrayStreamExt::boxed(stream))
134            .await
135    }
136
137    async fn write_internal<W: VortexWrite + Unpin>(
138        self,
139        mut write: W,
140        stream: SendableArrayStream,
141    ) -> VortexResult<WriteSummary> {
142        // NOTE(os): Setup an array context that already has all known encodings pre-populated.
143        // This is preferred for now over having an empty context here, because only the
144        // serialised array order is deterministic. The serialisation of arrays are done
145        // parallel and with an empty context they can register their encodings to the context
146        // in different order, changing the written bytes from run to run.
147        let ctx = ArrayContext::from_registry_sorted(self.session.arrays().registry());
148        let dtype = stream.dtype().clone();
149
150        let (mut ptr, eof) = SequenceId::root().split();
151
152        let stream = SequentialStreamAdapter::new(
153            dtype.clone(),
154            stream
155                .try_filter(|chunk| ready(!chunk.is_empty()))
156                .map(move |result| result.map(|chunk| (ptr.advance(), chunk))),
157        )
158        .sendable();
159        let (file_stats, stream) = accumulate_stats(
160            stream,
161            self.file_statistics.clone().into(),
162            self.max_variable_length_statistics_size,
163        );
164
165        // First, write the magic bytes.
166        write.write_all(ByteBuffer::copy_from(MAGIC_BYTES)).await?;
167        let mut position = MAGIC_BYTES.len() as u64;
168
169        // Create a channel to send buffers from the segment sink to the output stream.
170        let (send, recv) = kanal::bounded_async(1);
171
172        let segments = Arc::new(BufferedSegmentSink::new(send, position));
173
174        // We spawn the layout future so it is driven in the background while we write the
175        // buffer stream, so we don't need to poll it until all buffers have been drained.
176        let ctx2 = ctx.clone();
177        let layout_fut = self.session.handle().spawn_nested(|h| async move {
178            let layout = self
179                .strategy
180                .write_stream(ctx2, segments.clone(), stream, eof, h)
181                .await?;
182            Ok::<_, VortexError>((layout, segments.segment_specs()))
183        });
184
185        // Flush buffers as they arrive
186        let recv_stream = recv.into_stream();
187        pin_mut!(recv_stream);
188        while let Some(buffer) = recv_stream.next().await {
189            if buffer.is_empty() {
190                continue;
191            }
192            position += buffer.len() as u64;
193            write.write_all(buffer).await?;
194        }
195
196        let (layout, segment_specs) = layout_fut.await?;
197
198        // Assemble the Footer object now that we have all the segments.
199        let footer = Footer::new(
200            layout.clone(),
201            segment_specs,
202            if self.file_statistics.is_empty() {
203                None
204            } else {
205                Some(FileStatistics(file_stats.stats_sets().into()))
206            },
207            ctx,
208        );
209
210        // Emit the footer buffers and EOF.
211        let footer_buffers = footer
212            .clone()
213            .into_serializer()
214            .with_offset(position)
215            .with_exclude_dtype(self.exclude_dtype)
216            .serialize()?;
217        for buffer in footer_buffers {
218            position += buffer.len() as u64;
219            write.write_all(buffer).await?;
220        }
221
222        write.flush().await?;
223
224        Ok(WriteSummary {
225            footer,
226            size: position,
227        })
228    }
229
230    /// Create a push-based [`Writer`] that can be used to incrementally write arrays to the file.
231    pub fn writer<'w, W: VortexWrite + Unpin + 'w>(self, write: W, dtype: DType) -> Writer<'w> {
232        // Create a channel for sending arrays to the layout task.
233        let (arrays_send, arrays_recv) = kanal::bounded_async(1);
234
235        let arrays =
236            ArrayStreamExt::boxed(ArrayStreamAdapter::new(dtype, arrays_recv.into_stream()));
237
238        let write = CountingVortexWrite::new(write);
239        let bytes_written = write.counter();
240        let strategy = self.strategy.clone();
241        let future = self.write(write, arrays).boxed_local().fuse();
242
243        Writer {
244            arrays: Some(arrays_send),
245            future,
246            bytes_written,
247            strategy,
248        }
249    }
250}
251
252/// An async API for writing Vortex files.
253pub struct Writer<'w> {
254    // The input channel for sending arrays to the writer.
255    arrays: Option<kanal::AsyncSender<VortexResult<ArrayRef>>>,
256    // The writer task that ultimately produces the footer.
257    future: Fuse<LocalBoxFuture<'w, VortexResult<WriteSummary>>>,
258    // The bytes written so far.
259    bytes_written: Arc<AtomicU64>,
260    // The layout strategy that is being used for the write.
261    strategy: Arc<dyn LayoutStrategy>,
262}
263
264impl Writer<'_> {
265    /// Push a new chunk into the writer.
266    pub async fn push(&mut self, chunk: ArrayRef) -> VortexResult<()> {
267        let arrays = self.arrays.clone().vortex_expect("missing arrays sender");
268        let send_fut = async move { arrays.send(Ok(chunk)).await }.fuse();
269        pin_mut!(send_fut);
270
271        // We poll the writer future to continue writing bytes to the output, while waiting for
272        // enough room to push the next chunk into the channel.
273        select! {
274            result = send_fut => {
275                // If the send future failed, the writer has failed or panicked.
276                if result.is_err() {
277                    return Err(self.handle_failed_task().await);
278                }
279            },
280            result = &mut self.future => {
281                // Under normal operation, the writer future should never complete until
282                // finish() is called. Therefore, we can assume the writer has failed.
283                // The writer future has failed, we need to propagate the error.
284                match result {
285                    Ok(_) => vortex_bail!("Internal error: writer future completed early"),
286                    Err(e) => return Err(e),
287                }
288            }
289        }
290
291        Ok(())
292    }
293
294    /// Push an entire [`ArrayStream`] into the writer, consuming it.
295    ///
296    /// A task is spawned to consume the stream and push it into the writer, with the current
297    /// thread being used to write buffers to the output.
298    pub async fn push_stream(&mut self, mut stream: SendableArrayStream) -> VortexResult<()> {
299        let arrays = self.arrays.clone().vortex_expect("missing arrays sender");
300        let stream_fut = async move {
301            while let Some(chunk) = stream.next().await {
302                arrays.send(chunk).await?;
303            }
304            Ok::<_, kanal::SendError>(())
305        }
306        .fuse();
307        pin_mut!(stream_fut);
308
309        // We poll the writer future to continue writing bytes to the output, while waiting for
310        // enough room to push the stream into the channel.
311        select! {
312            result = stream_fut => {
313                if let Err(_send_err) = result {
314                    // If the send future failed, the writer has failed or panicked.
315                    return Err(self.handle_failed_task().await);
316                }
317            }
318
319            result = &mut self.future => {
320                // Under normal operation, the writer future should never complete until
321                // finish() is called. Therefore, we can assume the writer has failed.
322                // The writer future has failed, we need to propagate the error.
323                match result {
324                    Ok(_) => vortex_bail!("Internal error: writer future completed early"),
325                    Err(e) => return Err(e),
326                }
327            }
328        }
329
330        Ok(())
331    }
332
333    /// Returns the number of bytes written to the file so far.
334    pub fn bytes_written(&self) -> u64 {
335        self.bytes_written
336            .load(std::sync::atomic::Ordering::Relaxed)
337    }
338
339    /// Returns the number of bytes currently buffered by the layout writers.
340    pub fn buffered_bytes(&self) -> u64 {
341        self.strategy.buffered_bytes()
342    }
343
344    /// Finish writing the Vortex file, flushing any remaining buffers and returning the
345    /// new file's footer.
346    pub async fn finish(mut self) -> VortexResult<WriteSummary> {
347        // Drop the input channel to signal EOF.
348        drop(self.arrays.take());
349
350        // Await the future task.
351        self.future.await
352    }
353
354    /// Assuming the writer task has failed, await it to get the error.
355    async fn handle_failed_task(&mut self) -> VortexError {
356        match (&mut self.future).await {
357            Ok(_) => vortex_err!(
358                "Internal error: writer task completed successfully but write future finished early"
359            ),
360            Err(e) => e,
361        }
362    }
363}
364
365/// A blocking API for writing Vortex files.
366pub struct BlockingWrite<'rt, B: BlockingRuntime> {
367    options: VortexWriteOptions,
368    runtime: &'rt B,
369}
370
371impl<'rt, B: BlockingRuntime> BlockingWrite<'rt, B> {
372    /// Write a Vortex file into the given `Write` sink.
373    pub fn write<W: Write + Unpin>(
374        self,
375        write: W,
376        iter: impl ArrayIterator + Send + 'static,
377    ) -> VortexResult<WriteSummary> {
378        self.runtime.block_on(async move {
379            self.options
380                .write(BlockingWriteAdapter(write), iter.into_array_stream())
381                .await
382        })
383    }
384
385    pub fn writer<'w, W: Write + Unpin + 'w>(
386        self,
387        write: W,
388        dtype: DType,
389    ) -> BlockingWriter<'rt, 'w, B> {
390        BlockingWriter {
391            writer: self.options.writer(BlockingWriteAdapter(write), dtype),
392            runtime: self.runtime,
393        }
394    }
395}
396
397/// A blocking adapter around a [`Writer`], allowing incremental writing of arrays to a Vortex file.
398pub struct BlockingWriter<'rt, 'w, B: BlockingRuntime> {
399    runtime: &'rt B,
400    writer: Writer<'w>,
401}
402
403impl<B: BlockingRuntime> BlockingWriter<'_, '_, B> {
404    pub fn push(&mut self, chunk: ArrayRef) -> VortexResult<()> {
405        self.runtime.block_on(self.writer.push(chunk))
406    }
407
408    pub fn bytes_written(&self) -> u64 {
409        self.writer.bytes_written()
410    }
411
412    pub fn buffered_bytes(&self) -> u64 {
413        self.writer.buffered_bytes()
414    }
415
416    pub fn finish(self) -> VortexResult<WriteSummary> {
417        self.runtime.block_on(self.writer.finish())
418    }
419}
420
421// TODO(ngates): this blocking API may change, for now we just run blocking I/O inline.
422struct BlockingWriteAdapter<W>(W);
423
424impl<W: Write + Unpin> VortexWrite for BlockingWriteAdapter<W> {
425    async fn write_all<B: IoBuf>(&mut self, buffer: B) -> io::Result<B> {
426        self.0.write_all(buffer.as_slice())?;
427        Ok(buffer)
428    }
429
430    fn flush(&mut self) -> impl Future<Output = io::Result<()>> {
431        ready(self.0.flush())
432    }
433
434    fn shutdown(&mut self) -> impl Future<Output = io::Result<()>> {
435        ready(Ok(()))
436    }
437}
438
439pub struct WriteSummary {
440    footer: Footer,
441    size: u64,
442    // TODO(ngates): add a checksum
443}
444
445impl WriteSummary {
446    /// The footer of the written Vortex file.
447    pub fn footer(&self) -> &Footer {
448        &self.footer
449    }
450
451    /// The total size of the written Vortex file in bytes.
452    pub fn size(&self) -> u64 {
453        self.size
454    }
455
456    /// The footer of the written Vortex file.
457    pub fn row_count(&self) -> u64 {
458        self.footer.row_count()
459    }
460}