Skip to main content

vortex_file/
writer.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
use std::io;
use std::io::Write;
use std::sync::Arc;
use std::sync::atomic::AtomicU64;

use futures::FutureExt;
use futures::StreamExt;
use futures::TryStreamExt;
use futures::future::Fuse;
use futures::future::FusedFuture;
use futures::future::LocalBoxFuture;
use futures::future::ready;
use futures::pin_mut;
use futures::select;
use itertools::Itertools;
use vortex_array::ArrayContext;
use vortex_array::ArrayRef;
use vortex_array::expr::stats::Stat;
use vortex_array::iter::ArrayIterator;
use vortex_array::iter::ArrayIteratorExt;
use vortex_array::session::ArraySessionExt;
use vortex_array::stats::PRUNING_STATS;
use vortex_array::stream::ArrayStream;
use vortex_array::stream::ArrayStreamAdapter;
use vortex_array::stream::ArrayStreamExt;
use vortex_array::stream::SendableArrayStream;
use vortex_buffer::ByteBuffer;
use vortex_dtype::DType;
use vortex_error::VortexError;
use vortex_error::VortexExpect;
use vortex_error::VortexResult;
use vortex_error::vortex_bail;
use vortex_error::vortex_err;
use vortex_io::IoBuf;
use vortex_io::VortexWrite;
use vortex_io::kanal_ext::KanalExt;
use vortex_io::runtime::BlockingRuntime;
use vortex_io::session::RuntimeSessionExt;
use vortex_layout::LayoutStrategy;
use vortex_layout::layouts::file_stats::accumulate_stats;
use vortex_layout::sequence::SequenceId;
use vortex_layout::sequence::SequentialStreamAdapter;
use vortex_layout::sequence::SequentialStreamExt;
use vortex_session::SessionExt;
use vortex_session::VortexSession;

use crate::Footer;
use crate::MAGIC_BYTES;
use crate::WriteStrategyBuilder;
use crate::counting::CountingVortexWrite;
use crate::footer::FileStatistics;
use crate::segments::writer::BufferedSegmentSink;
55
56/// Configure a new writer, which can eventually be used to write an [`ArrayStream`] into a sink
57/// that implements [`VortexWrite`].
58///
59/// Unless overridden, the default [write strategy][crate::WriteStrategyBuilder] will be used with no
60/// additional configuration.
61pub struct VortexWriteOptions {
62    session: VortexSession,
63    strategy: Arc<dyn LayoutStrategy>,
64    exclude_dtype: bool,
65    max_variable_length_statistics_size: usize,
66    file_statistics: Vec<Stat>,
67}
68
69pub trait WriteOptionsSessionExt: SessionExt {
70    /// Create [`VortexWriteOptions`] for writing to a Vortex file.
71    fn write_options(&self) -> VortexWriteOptions {
72        let session = self.session();
73        VortexWriteOptions {
74            strategy: WriteStrategyBuilder::default().build(),
75            session,
76            exclude_dtype: false,
77            file_statistics: PRUNING_STATS.to_vec(),
78            max_variable_length_statistics_size: 64,
79        }
80    }
81}
82impl<S: SessionExt> WriteOptionsSessionExt for S {}
83
84impl VortexWriteOptions {
85    /// Create a new [`VortexWriteOptions`] with the given session.
86    pub fn new(session: VortexSession) -> Self {
87        VortexWriteOptions {
88            strategy: WriteStrategyBuilder::default().build(),
89            session,
90            exclude_dtype: false,
91            file_statistics: PRUNING_STATS.to_vec(),
92            max_variable_length_statistics_size: 64,
93        }
94    }
95
96    /// Replace the default layout strategy with the provided one.
97    pub fn with_strategy(mut self, strategy: Arc<dyn LayoutStrategy>) -> Self {
98        self.strategy = strategy;
99        self
100    }
101
102    /// Exclude the DType from the Vortex file. You must provide the DType to the reader.
103    // TODO(ngates): Should we store some sort of DType checksum to make sure the one passed at
104    //  read-time is sane? I guess most layouts will have some reasonable validation.
105    pub fn exclude_dtype(mut self) -> Self {
106        self.exclude_dtype = true;
107        self
108    }
109
110    /// Configure which statistics to compute at the file-level.
111    pub fn with_file_statistics(mut self, file_statistics: Vec<Stat>) -> Self {
112        self.file_statistics = file_statistics;
113        self
114    }
115}
116
117impl VortexWriteOptions {
118    /// Drop into the blocking writer API using the given runtime.
119    pub fn blocking<B: BlockingRuntime>(self, runtime: &B) -> BlockingWrite<'_, B> {
120        BlockingWrite {
121            options: self,
122            runtime,
123        }
124    }
125
126    /// Write an [`ArrayStream`] as a Vortex file.
127    ///
128    /// Note that buffers are flushed as soon as they are available with no buffering, the caller
129    /// is responsible for deciding how to configure buffering on the underlying `Write` sink.
130    pub async fn write<W: VortexWrite + Unpin, S: ArrayStream + Send + 'static>(
131        self,
132        write: W,
133        stream: S,
134    ) -> VortexResult<WriteSummary> {
135        self.write_internal(write, ArrayStreamExt::boxed(stream))
136            .await
137    }
138
139    async fn write_internal<W: VortexWrite + Unpin>(
140        self,
141        mut write: W,
142        stream: SendableArrayStream,
143    ) -> VortexResult<WriteSummary> {
144        // NOTE(os): Setup an array context that already has all known encodings pre-populated.
145        // This is preferred for now over having an empty context here, because only the
146        // serialised array order is deterministic. The serialisation of arrays are done
147        // parallel and with an empty context they can register their encodings to the context
148        // in different order, changing the written bytes from run to run.
149        let ctx = ArrayContext::new(self.session.arrays().registry().ids().sorted().collect())
150            // Configure a registry just to ensure only known encodings are interned.
151            .with_registry(self.session.arrays().registry().clone());
152        let dtype = stream.dtype().clone();
153
154        let (mut ptr, eof) = SequenceId::root().split();
155
156        let stream = SequentialStreamAdapter::new(
157            dtype.clone(),
158            stream
159                .try_filter(|chunk| ready(!chunk.is_empty()))
160                .map(move |result| result.map(|chunk| (ptr.advance(), chunk))),
161        )
162        .sendable();
163        let (file_stats, stream) = accumulate_stats(
164            stream,
165            self.file_statistics.clone().into(),
166            self.max_variable_length_statistics_size,
167        );
168
169        // First, write the magic bytes.
170        write.write_all(ByteBuffer::copy_from(MAGIC_BYTES)).await?;
171        let mut position = MAGIC_BYTES.len() as u64;
172
173        // Create a channel to send buffers from the segment sink to the output stream.
174        let (send, recv) = kanal::bounded_async(1);
175
176        let segments = Arc::new(BufferedSegmentSink::new(send, position));
177
178        // We spawn the layout future so it is driven in the background while we write the
179        // buffer stream, so we don't need to poll it until all buffers have been drained.
180        let ctx2 = ctx.clone();
181        let layout_fut = self.session.handle().spawn_nested(|h| async move {
182            let layout = self
183                .strategy
184                .write_stream(ctx2, segments.clone(), stream, eof, h)
185                .await?;
186            Ok::<_, VortexError>((layout, segments.segment_specs()))
187        });
188
189        // Flush buffers as they arrive
190        let recv_stream = recv.into_stream();
191        pin_mut!(recv_stream);
192        while let Some(buffer) = recv_stream.next().await {
193            if buffer.is_empty() {
194                continue;
195            }
196            position += buffer.len() as u64;
197            write.write_all(buffer).await?;
198        }
199
200        let (layout, segment_specs) = layout_fut.await?;
201
202        // Assemble the Footer object now that we have all the segments.
203        let footer = Footer::new(
204            layout.clone(),
205            segment_specs,
206            if self.file_statistics.is_empty() {
207                None
208            } else {
209                Some(FileStatistics::new_with_dtype(
210                    file_stats.stats_sets().into(),
211                    &dtype,
212                ))
213            },
214            ctx,
215        );
216
217        // Emit the footer buffers and EOF.
218        let footer_buffers = footer
219            .clone()
220            .into_serializer()
221            .with_offset(position)
222            .with_exclude_dtype(self.exclude_dtype)
223            .serialize()?;
224        for buffer in footer_buffers {
225            position += buffer.len() as u64;
226            write.write_all(buffer).await?;
227        }
228
229        write.flush().await?;
230
231        Ok(WriteSummary {
232            footer,
233            size: position,
234        })
235    }
236
237    /// Create a push-based [`Writer`] that can be used to incrementally write arrays to the file.
238    pub fn writer<'w, W: VortexWrite + Unpin + 'w>(self, write: W, dtype: DType) -> Writer<'w> {
239        // Create a channel for sending arrays to the layout task.
240        let (arrays_send, arrays_recv) = kanal::bounded_async(1);
241
242        let arrays =
243            ArrayStreamExt::boxed(ArrayStreamAdapter::new(dtype, arrays_recv.into_stream()));
244
245        let write = CountingVortexWrite::new(write);
246        let bytes_written = write.counter();
247        let strategy = self.strategy.clone();
248        let future = self.write(write, arrays).boxed_local().fuse();
249
250        Writer {
251            arrays: Some(arrays_send),
252            future,
253            bytes_written,
254            strategy,
255        }
256    }
257}
258
259/// An async API for writing Vortex files.
260pub struct Writer<'w> {
261    // The input channel for sending arrays to the writer.
262    arrays: Option<kanal::AsyncSender<VortexResult<ArrayRef>>>,
263    // The writer task that ultimately produces the footer.
264    future: Fuse<LocalBoxFuture<'w, VortexResult<WriteSummary>>>,
265    // The bytes written so far.
266    bytes_written: Arc<AtomicU64>,
267    // The layout strategy that is being used for the write.
268    strategy: Arc<dyn LayoutStrategy>,
269}
270
271impl Writer<'_> {
272    /// Push a new chunk into the writer.
273    pub async fn push(&mut self, chunk: ArrayRef) -> VortexResult<()> {
274        let arrays = self.arrays.clone().vortex_expect("missing arrays sender");
275        let send_fut = async move { arrays.send(Ok(chunk)).await }.fuse();
276        pin_mut!(send_fut);
277
278        // We poll the writer future to continue writing bytes to the output, while waiting for
279        // enough room to push the next chunk into the channel.
280        select! {
281            result = send_fut => {
282                // If the send future failed, the writer has failed or panicked.
283                if result.is_err() {
284                    return Err(self.handle_failed_task().await);
285                }
286            },
287            result = &mut self.future => {
288                // Under normal operation, the writer future should never complete until
289                // finish() is called. Therefore, we can assume the writer has failed.
290                // The writer future has failed, we need to propagate the error.
291                match result {
292                    Ok(_) => vortex_bail!("Internal error: writer future completed early"),
293                    Err(e) => return Err(e),
294                }
295            }
296        }
297
298        Ok(())
299    }
300
301    /// Push an entire [`ArrayStream`] into the writer, consuming it.
302    ///
303    /// A task is spawned to consume the stream and push it into the writer, with the current
304    /// thread being used to write buffers to the output.
305    pub async fn push_stream(&mut self, mut stream: SendableArrayStream) -> VortexResult<()> {
306        let arrays = self.arrays.clone().vortex_expect("missing arrays sender");
307        let stream_fut = async move {
308            while let Some(chunk) = stream.next().await {
309                arrays.send(chunk).await?;
310            }
311            Ok::<_, kanal::SendError>(())
312        }
313        .fuse();
314        pin_mut!(stream_fut);
315
316        // We poll the writer future to continue writing bytes to the output, while waiting for
317        // enough room to push the stream into the channel.
318        select! {
319            result = stream_fut => {
320                if let Err(_send_err) = result {
321                    // If the send future failed, the writer has failed or panicked.
322                    return Err(self.handle_failed_task().await);
323                }
324            }
325
326            result = &mut self.future => {
327                // Under normal operation, the writer future should never complete until
328                // finish() is called. Therefore, we can assume the writer has failed.
329                // The writer future has failed, we need to propagate the error.
330                match result {
331                    Ok(_) => vortex_bail!("Internal error: writer future completed early"),
332                    Err(e) => return Err(e),
333                }
334            }
335        }
336
337        Ok(())
338    }
339
340    /// Returns the number of bytes written to the file so far.
341    pub fn bytes_written(&self) -> u64 {
342        self.bytes_written
343            .load(std::sync::atomic::Ordering::Relaxed)
344    }
345
346    /// Returns the number of bytes currently buffered by the layout writers.
347    pub fn buffered_bytes(&self) -> u64 {
348        self.strategy.buffered_bytes()
349    }
350
351    /// Finish writing the Vortex file, flushing any remaining buffers and returning the
352    /// new file's footer.
353    pub async fn finish(mut self) -> VortexResult<WriteSummary> {
354        // Drop the input channel to signal EOF.
355        drop(self.arrays.take());
356
357        // Await the future task.
358        self.future.await
359    }
360
361    /// Assuming the writer task has failed, await it to get the error.
362    async fn handle_failed_task(&mut self) -> VortexError {
363        match (&mut self.future).await {
364            Ok(_) => vortex_err!(
365                "Internal error: writer task completed successfully but write future finished early"
366            ),
367            Err(e) => e,
368        }
369    }
370}
371
372/// A blocking API for writing Vortex files.
373pub struct BlockingWrite<'rt, B: BlockingRuntime> {
374    options: VortexWriteOptions,
375    runtime: &'rt B,
376}
377
378impl<'rt, B: BlockingRuntime> BlockingWrite<'rt, B> {
379    /// Write a Vortex file into the given `Write` sink.
380    pub fn write<W: Write + Unpin>(
381        self,
382        write: W,
383        iter: impl ArrayIterator + Send + 'static,
384    ) -> VortexResult<WriteSummary> {
385        self.runtime.block_on(async move {
386            self.options
387                .write(BlockingWriteAdapter(write), iter.into_array_stream())
388                .await
389        })
390    }
391
392    pub fn writer<'w, W: Write + Unpin + 'w>(
393        self,
394        write: W,
395        dtype: DType,
396    ) -> BlockingWriter<'rt, 'w, B> {
397        BlockingWriter {
398            writer: self.options.writer(BlockingWriteAdapter(write), dtype),
399            runtime: self.runtime,
400        }
401    }
402}
403
404/// A blocking adapter around a [`Writer`], allowing incremental writing of arrays to a Vortex file.
405pub struct BlockingWriter<'rt, 'w, B: BlockingRuntime> {
406    runtime: &'rt B,
407    writer: Writer<'w>,
408}
409
410impl<B: BlockingRuntime> BlockingWriter<'_, '_, B> {
411    pub fn push(&mut self, chunk: ArrayRef) -> VortexResult<()> {
412        self.runtime.block_on(self.writer.push(chunk))
413    }
414
415    pub fn bytes_written(&self) -> u64 {
416        self.writer.bytes_written()
417    }
418
419    pub fn buffered_bytes(&self) -> u64 {
420        self.writer.buffered_bytes()
421    }
422
423    pub fn finish(self) -> VortexResult<WriteSummary> {
424        self.runtime.block_on(self.writer.finish())
425    }
426}
427
/// Wraps a synchronous [`std::io::Write`] sink so it can be used where a [`VortexWrite`] is
/// expected.
// TODO(ngates): this blocking API may change, for now we just run blocking I/O inline.
struct BlockingWriteAdapter<W>(W);
430
431impl<W: Write + Unpin> VortexWrite for BlockingWriteAdapter<W> {
432    async fn write_all<B: IoBuf>(&mut self, buffer: B) -> io::Result<B> {
433        self.0.write_all(buffer.as_slice())?;
434        Ok(buffer)
435    }
436
437    fn flush(&mut self) -> impl Future<Output = io::Result<()>> {
438        ready(self.0.flush())
439    }
440
441    fn shutdown(&mut self) -> impl Future<Output = io::Result<()>> {
442        ready(Ok(()))
443    }
444}
445
446pub struct WriteSummary {
447    footer: Footer,
448    size: u64,
449    // TODO(ngates): add a checksum
450}
451
452impl WriteSummary {
453    /// The footer of the written Vortex file.
454    pub fn footer(&self) -> &Footer {
455        &self.footer
456    }
457
458    /// The total size of the written Vortex file in bytes.
459    pub fn size(&self) -> u64 {
460        self.size
461    }
462
463    /// The total number of rows in the written Vortex file.
464    pub fn row_count(&self) -> u64 {
465        self.footer.row_count()
466    }
467}