vortex_file/
writer.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::io;
5use std::io::Write;
6use std::sync::Arc;
7use std::sync::atomic::AtomicU64;
8
9use futures::FutureExt;
10use futures::StreamExt;
11use futures::TryStreamExt;
12use futures::future::Fuse;
13use futures::future::LocalBoxFuture;
14use futures::future::ready;
15use futures::pin_mut;
16use futures::select;
17use vortex_array::ArrayContext;
18use vortex_array::ArrayRef;
19use vortex_array::expr::stats::Stat;
20use vortex_array::iter::ArrayIterator;
21use vortex_array::iter::ArrayIteratorExt;
22use vortex_array::stats::PRUNING_STATS;
23use vortex_array::stream::ArrayStream;
24use vortex_array::stream::ArrayStreamAdapter;
25use vortex_array::stream::ArrayStreamExt;
26use vortex_array::stream::SendableArrayStream;
27use vortex_buffer::ByteBuffer;
28use vortex_dtype::DType;
29use vortex_error::VortexError;
30use vortex_error::VortexExpect;
31use vortex_error::VortexResult;
32use vortex_error::vortex_bail;
33use vortex_error::vortex_err;
34use vortex_io::IoBuf;
35use vortex_io::VortexWrite;
36use vortex_io::kanal_ext::KanalExt;
37use vortex_io::runtime::BlockingRuntime;
38use vortex_io::session::RuntimeSessionExt;
39use vortex_layout::LayoutStrategy;
40use vortex_layout::layouts::file_stats::accumulate_stats;
41use vortex_layout::sequence::SequenceId;
42use vortex_layout::sequence::SequentialStreamAdapter;
43use vortex_layout::sequence::SequentialStreamExt;
44use vortex_session::SessionExt;
45use vortex_session::VortexSession;
46
47use crate::Footer;
48use crate::MAGIC_BYTES;
49use crate::WriteStrategyBuilder;
50use crate::counting::CountingVortexWrite;
51use crate::footer::FileStatistics;
52use crate::segments::writer::BufferedSegmentSink;
53
/// Configure a new writer, which can eventually be used to write an [`ArrayStream`] into a sink
/// that implements [`VortexWrite`].
///
/// Unless overridden, the default [write strategy][crate::WriteStrategyBuilder] will be used with no
/// additional configuration.
pub struct VortexWriteOptions {
    /// Session whose runtime handle is used to spawn the layout-writing task.
    session: VortexSession,
    /// Layout strategy that turns the incoming array stream into a layout plus segments.
    strategy: Arc<dyn LayoutStrategy>,
    /// When `true`, the footer serializer is asked to exclude the DType from the file.
    exclude_dtype: bool,
    /// Limit forwarded to `accumulate_stats` when collecting file-level statistics.
    /// NOTE(review): presumably a byte limit for variable-length values — confirm against
    /// `accumulate_stats`.
    max_variable_length_statistics_size: usize,
    /// Statistics to accumulate at the file level; when empty, the footer carries no file
    /// statistics.
    file_statistics: Vec<Stat>,
}
66
67pub trait WriteOptionsSessionExt: SessionExt {
68    /// Create [`VortexWriteOptions`] for writing to a Vortex file.
69    fn write_options(&self) -> VortexWriteOptions {
70        VortexWriteOptions {
71            session: self.session(),
72            strategy: WriteStrategyBuilder::new().build(),
73            exclude_dtype: false,
74            file_statistics: PRUNING_STATS.to_vec(),
75            max_variable_length_statistics_size: 64,
76        }
77    }
78}
79impl<S: SessionExt> WriteOptionsSessionExt for S {}
80
81impl VortexWriteOptions {
82    /// Create a new [`VortexWriteOptions`] with the given session.
83    pub fn new(session: VortexSession) -> Self {
84        VortexWriteOptions {
85            session,
86            strategy: WriteStrategyBuilder::new().build(),
87            exclude_dtype: false,
88            file_statistics: PRUNING_STATS.to_vec(),
89            max_variable_length_statistics_size: 64,
90        }
91    }
92
93    /// Replace the default layout strategy with the provided one.
94    pub fn with_strategy(mut self, strategy: Arc<dyn LayoutStrategy>) -> Self {
95        self.strategy = strategy;
96        self
97    }
98
99    /// Exclude the DType from the Vortex file. You must provide the DType to the reader.
100    // TODO(ngates): Should we store some sort of DType checksum to make sure the one passed at
101    //  read-time is sane? I guess most layouts will have some reasonable validation.
102    pub fn exclude_dtype(mut self) -> Self {
103        self.exclude_dtype = true;
104        self
105    }
106
107    /// Configure which statistics to compute at the file-level.
108    pub fn with_file_statistics(mut self, file_statistics: Vec<Stat>) -> Self {
109        self.file_statistics = file_statistics;
110        self
111    }
112}
113
114impl VortexWriteOptions {
115    /// Drop into the blocking writer API using the given runtime.
116    pub fn blocking<B: BlockingRuntime>(self, runtime: &B) -> BlockingWrite<'_, B> {
117        BlockingWrite {
118            options: self,
119            runtime,
120        }
121    }
122
123    /// Write an [`ArrayStream`] as a Vortex file.
124    ///
125    /// Note that buffers are flushed as soon as they are available with no buffering, the caller
126    /// is responsible for deciding how to configure buffering on the underlying `Write` sink.
127    pub async fn write<W: VortexWrite + Unpin, S: ArrayStream + Send + 'static>(
128        self,
129        write: W,
130        stream: S,
131    ) -> VortexResult<WriteSummary> {
132        self.write_internal(write, ArrayStreamExt::boxed(stream))
133            .await
134    }
135
136    async fn write_internal<W: VortexWrite + Unpin>(
137        self,
138        mut write: W,
139        stream: SendableArrayStream,
140    ) -> VortexResult<WriteSummary> {
141        // Set up a Context to capture the encodings used in the file.
142        let ctx = ArrayContext::empty();
143        let dtype = stream.dtype().clone();
144
145        let (mut ptr, eof) = SequenceId::root().split();
146
147        let stream = SequentialStreamAdapter::new(
148            dtype.clone(),
149            stream
150                .try_filter(|chunk| ready(!chunk.is_empty()))
151                .map(move |result| result.map(|chunk| (ptr.advance(), chunk))),
152        )
153        .sendable();
154        let (file_stats, stream) = accumulate_stats(
155            stream,
156            self.file_statistics.clone().into(),
157            self.max_variable_length_statistics_size,
158        );
159
160        // First, write the magic bytes.
161        write.write_all(ByteBuffer::copy_from(MAGIC_BYTES)).await?;
162        let mut position = MAGIC_BYTES.len() as u64;
163
164        // Create a channel to send buffers from the segment sink to the output stream.
165        let (send, recv) = kanal::bounded_async(1);
166
167        let segments = Arc::new(BufferedSegmentSink::new(send, position));
168
169        // We spawn the layout future so it is driven in the background while we write the
170        // buffer stream, so we don't need to poll it until all buffers have been drained.
171        let ctx2 = ctx.clone();
172        let layout_fut = self.session.handle().spawn_nested(|h| async move {
173            let layout = self
174                .strategy
175                .write_stream(ctx2, segments.clone(), stream, eof, h)
176                .await?;
177            Ok::<_, VortexError>((layout, segments.segment_specs()))
178        });
179
180        // Flush buffers as they arrive
181        let recv_stream = recv.into_stream();
182        pin_mut!(recv_stream);
183        while let Some(buffer) = recv_stream.next().await {
184            if buffer.is_empty() {
185                continue;
186            }
187            position += buffer.len() as u64;
188            write.write_all(buffer).await?;
189        }
190
191        let (layout, segment_specs) = layout_fut.await?;
192
193        // Assemble the Footer object now that we have all the segments.
194        let footer = Footer::new(
195            layout.clone(),
196            segment_specs,
197            if self.file_statistics.is_empty() {
198                None
199            } else {
200                Some(FileStatistics(file_stats.stats_sets().into()))
201            },
202            ctx,
203        );
204
205        // Emit the footer buffers and EOF.
206        let footer_buffers = footer
207            .clone()
208            .into_serializer()
209            .with_offset(position)
210            .with_exclude_dtype(self.exclude_dtype)
211            .serialize()?;
212        for buffer in footer_buffers {
213            position += buffer.len() as u64;
214            write.write_all(buffer).await?;
215        }
216
217        write.flush().await?;
218
219        Ok(WriteSummary {
220            footer,
221            size: position,
222        })
223    }
224
225    /// Create a push-based [`Writer`] that can be used to incrementally write arrays to the file.
226    pub fn writer<'w, W: VortexWrite + Unpin + 'w>(self, write: W, dtype: DType) -> Writer<'w> {
227        // Create a channel for sending arrays to the layout task.
228        let (arrays_send, arrays_recv) = kanal::bounded_async(1);
229
230        let arrays =
231            ArrayStreamExt::boxed(ArrayStreamAdapter::new(dtype, arrays_recv.into_stream()));
232
233        let write = CountingVortexWrite::new(write);
234        let bytes_written = write.counter();
235        let strategy = self.strategy.clone();
236        let future = self.write(write, arrays).boxed_local().fuse();
237
238        Writer {
239            arrays: Some(arrays_send),
240            future,
241            bytes_written,
242            strategy,
243        }
244    }
245}
246
/// An async API for writing Vortex files.
///
/// The lifetime `'w` bounds the boxed write future, which owns the output sink.
pub struct Writer<'w> {
    // The input channel for sending arrays to the writer. It is `None` once `finish()` has
    // taken the sender to signal end of input.
    arrays: Option<kanal::AsyncSender<VortexResult<ArrayRef>>>,
    // The writer task that ultimately produces the footer. It is fused, so it can be polled from
    // `select!` and awaited by mutable reference.
    future: Fuse<LocalBoxFuture<'w, VortexResult<WriteSummary>>>,
    // The bytes written so far, read from the counter of the counting sink wrapper.
    bytes_written: Arc<AtomicU64>,
    // The layout strategy that is being used for the write.
    strategy: Arc<dyn LayoutStrategy>,
}
258
259impl Writer<'_> {
260    /// Push a new chunk into the writer.
261    pub async fn push(&mut self, chunk: ArrayRef) -> VortexResult<()> {
262        let arrays = self.arrays.clone().vortex_expect("missing arrays sender");
263        let send_fut = async move { arrays.send(Ok(chunk)).await }.fuse();
264        pin_mut!(send_fut);
265
266        // We poll the writer future to continue writing bytes to the output, while waiting for
267        // enough room to push the next chunk into the channel.
268        select! {
269            result = send_fut => {
270                // If the send future failed, the writer has failed or panicked.
271                if result.is_err() {
272                    return Err(self.handle_failed_task().await);
273                }
274            },
275            result = &mut self.future => {
276                // Under normal operation, the writer future should never complete until
277                // finish() is called. Therefore, we can assume the writer has failed.
278                // The writer future has failed, we need to propagate the error.
279                match result {
280                    Ok(_) => vortex_bail!("Internal error: writer future completed early"),
281                    Err(e) => return Err(e),
282                }
283            }
284        }
285
286        Ok(())
287    }
288
289    /// Push an entire [`ArrayStream`] into the writer, consuming it.
290    ///
291    /// A task is spawned to consume the stream and push it into the writer, with the current
292    /// thread being used to write buffers to the output.
293    pub async fn push_stream(&mut self, mut stream: SendableArrayStream) -> VortexResult<()> {
294        let arrays = self.arrays.clone().vortex_expect("missing arrays sender");
295        let stream_fut = async move {
296            while let Some(chunk) = stream.next().await {
297                arrays.send(chunk).await?;
298            }
299            Ok::<_, kanal::SendError>(())
300        }
301        .fuse();
302        pin_mut!(stream_fut);
303
304        // We poll the writer future to continue writing bytes to the output, while waiting for
305        // enough room to push the stream into the channel.
306        select! {
307            result = stream_fut => {
308                if let Err(_send_err) = result {
309                    // If the send future failed, the writer has failed or panicked.
310                    return Err(self.handle_failed_task().await);
311                }
312            }
313
314            result = &mut self.future => {
315                // Under normal operation, the writer future should never complete until
316                // finish() is called. Therefore, we can assume the writer has failed.
317                // The writer future has failed, we need to propagate the error.
318                match result {
319                    Ok(_) => vortex_bail!("Internal error: writer future completed early"),
320                    Err(e) => return Err(e),
321                }
322            }
323        }
324
325        Ok(())
326    }
327
328    /// Returns the number of bytes written to the file so far.
329    pub fn bytes_written(&self) -> u64 {
330        self.bytes_written
331            .load(std::sync::atomic::Ordering::Relaxed)
332    }
333
334    /// Returns the number of bytes currently buffered by the layout writers.
335    pub fn buffered_bytes(&self) -> u64 {
336        self.strategy.buffered_bytes()
337    }
338
339    /// Finish writing the Vortex file, flushing any remaining buffers and returning the
340    /// new file's footer.
341    pub async fn finish(mut self) -> VortexResult<WriteSummary> {
342        // Drop the input channel to signal EOF.
343        drop(self.arrays.take());
344
345        // Await the future task.
346        self.future.await
347    }
348
349    /// Assuming the writer task has failed, await it to get the error.
350    async fn handle_failed_task(&mut self) -> VortexError {
351        match (&mut self.future).await {
352            Ok(_) => vortex_err!(
353                "Internal error: writer task completed successfully but write future finished early"
354            ),
355            Err(e) => e,
356        }
357    }
358}
359
/// A blocking API for writing Vortex files.
///
/// Created by [`VortexWriteOptions::blocking`]; pairs the write options with a borrowed runtime.
pub struct BlockingWrite<'rt, B: BlockingRuntime> {
    /// The write configuration used by the blocking calls.
    options: VortexWriteOptions,
    /// The runtime on which the async write is driven to completion.
    runtime: &'rt B,
}
365
366impl<'rt, B: BlockingRuntime> BlockingWrite<'rt, B> {
367    /// Write a Vortex file into the given `Write` sink.
368    pub fn write<W: Write + Unpin>(
369        self,
370        write: W,
371        iter: impl ArrayIterator + Send + 'static,
372    ) -> VortexResult<WriteSummary> {
373        self.runtime.block_on(async move {
374            self.options
375                .write(BlockingWriteAdapter(write), iter.into_array_stream())
376                .await
377        })
378    }
379
380    pub fn writer<'w, W: Write + Unpin + 'w>(
381        self,
382        write: W,
383        dtype: DType,
384    ) -> BlockingWriter<'rt, 'w, B> {
385        BlockingWriter {
386            writer: self.options.writer(BlockingWriteAdapter(write), dtype),
387            runtime: self.runtime,
388        }
389    }
390}
391
/// A blocking adapter around a [`Writer`], allowing incremental writing of arrays to a Vortex file.
pub struct BlockingWriter<'rt, 'w, B: BlockingRuntime> {
    /// The runtime used to block on the wrapped writer's async operations.
    runtime: &'rt B,
    /// The wrapped async writer.
    writer: Writer<'w>,
}
397
398impl<B: BlockingRuntime> BlockingWriter<'_, '_, B> {
399    pub fn push(&mut self, chunk: ArrayRef) -> VortexResult<()> {
400        self.runtime.block_on(self.writer.push(chunk))
401    }
402
403    pub fn bytes_written(&self) -> u64 {
404        self.writer.bytes_written()
405    }
406
407    pub fn buffered_bytes(&self) -> u64 {
408        self.writer.buffered_bytes()
409    }
410
411    pub fn finish(self) -> VortexResult<WriteSummary> {
412        self.runtime.block_on(self.writer.finish())
413    }
414}
415
// TODO(ngates): this blocking API may change, for now we just run blocking I/O inline.
/// Wraps a synchronous [`Write`] sink so that it can be used where a [`VortexWrite`] is
/// expected; every operation runs the blocking call inline.
struct BlockingWriteAdapter<W>(W);
418
419impl<W: Write + Unpin> VortexWrite for BlockingWriteAdapter<W> {
420    async fn write_all<B: IoBuf>(&mut self, buffer: B) -> io::Result<B> {
421        self.0.write_all(buffer.as_slice())?;
422        Ok(buffer)
423    }
424
425    fn flush(&mut self) -> impl Future<Output = io::Result<()>> {
426        ready(self.0.flush())
427    }
428
429    fn shutdown(&mut self) -> impl Future<Output = io::Result<()>> {
430        ready(Ok(()))
431    }
432}
433
/// The result of a completed Vortex file write.
pub struct WriteSummary {
    /// The footer that was assembled and serialized into the file.
    footer: Footer,
    /// Total number of bytes written: magic bytes, segment buffers and footer buffers.
    size: u64,
    // TODO(ngates): add a checksum
}
439
440impl WriteSummary {
441    /// The footer of the written Vortex file.
442    pub fn footer(&self) -> &Footer {
443        &self.footer
444    }
445
446    /// The total size of the written Vortex file in bytes.
447    pub fn size(&self) -> u64 {
448        self.size
449    }
450
451    /// The footer of the written Vortex file.
452    pub fn row_count(&self) -> u64 {
453        self.footer.row_count()
454    }
455}