Skip to main content

vortex_file/
writer.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::io;
5use std::io::Write;
6use std::sync::Arc;
7use std::sync::atomic::AtomicU64;
8
9use futures::FutureExt;
10use futures::StreamExt;
11use futures::TryStreamExt;
12use futures::future::Fuse;
13use futures::future::LocalBoxFuture;
14use futures::future::ready;
15use futures::pin_mut;
16use futures::select;
17use itertools::Itertools;
18use vortex_array::ArrayContext;
19use vortex_array::ArrayRef;
20use vortex_array::dtype::DType;
21use vortex_array::expr::stats::Stat;
22use vortex_array::iter::ArrayIterator;
23use vortex_array::iter::ArrayIteratorExt;
24use vortex_array::session::ArraySessionExt;
25use vortex_array::stats::PRUNING_STATS;
26use vortex_array::stream::ArrayStream;
27use vortex_array::stream::ArrayStreamAdapter;
28use vortex_array::stream::ArrayStreamExt;
29use vortex_array::stream::SendableArrayStream;
30use vortex_buffer::ByteBuffer;
31use vortex_error::VortexError;
32use vortex_error::VortexExpect;
33use vortex_error::VortexResult;
34use vortex_error::vortex_bail;
35use vortex_error::vortex_err;
36use vortex_io::IoBuf;
37use vortex_io::VortexWrite;
38use vortex_io::kanal_ext::KanalExt;
39use vortex_io::runtime::BlockingRuntime;
40use vortex_io::session::RuntimeSessionExt;
41use vortex_layout::LayoutStrategy;
42use vortex_layout::layouts::file_stats::accumulate_stats;
43use vortex_layout::sequence::SequenceId;
44use vortex_layout::sequence::SequentialStreamAdapter;
45use vortex_layout::sequence::SequentialStreamExt;
46use vortex_session::SessionExt;
47use vortex_session::VortexSession;
48use vortex_session::registry::ReadContext;
49
50use crate::Footer;
51use crate::MAGIC_BYTES;
52use crate::WriteStrategyBuilder;
53use crate::counting::CountingVortexWrite;
54use crate::footer::FileStatistics;
55use crate::segments::writer::BufferedSegmentSink;
56
/// Configure a new writer, which can eventually be used to write an [`ArrayStream`] into a sink
/// that implements [`VortexWrite`].
///
/// Unless overridden, the default [write strategy][crate::WriteStrategyBuilder] will be used with no
/// additional configuration.
pub struct VortexWriteOptions {
    /// The session that supplies the array registry and the runtime handle used while writing.
    session: VortexSession,
    /// The layout strategy that consumes the array stream and produces the file layout.
    strategy: Arc<dyn LayoutStrategy>,
    /// When `true`, the footer is serialized with the DType excluded.
    exclude_dtype: bool,
    /// Forwarded to `accumulate_stats` when collecting file-level statistics.
    // NOTE(review): presumably a size limit for variable-length stats values (e.g. string
    // min/max) — confirm against `accumulate_stats`.
    max_variable_length_statistics_size: usize,
    /// The statistics to accumulate at the file level. When empty, the footer carries no file
    /// statistics.
    file_statistics: Vec<Stat>,
}
69
70pub trait WriteOptionsSessionExt: SessionExt {
71    /// Create [`VortexWriteOptions`] for writing to a Vortex file.
72    fn write_options(&self) -> VortexWriteOptions {
73        let session = self.session();
74        VortexWriteOptions {
75            strategy: WriteStrategyBuilder::default().build(),
76            session,
77            exclude_dtype: false,
78            file_statistics: PRUNING_STATS.to_vec(),
79            max_variable_length_statistics_size: 64,
80        }
81    }
82}
83impl<S: SessionExt> WriteOptionsSessionExt for S {}
84
85impl VortexWriteOptions {
86    /// Create a new [`VortexWriteOptions`] with the given session.
87    pub fn new(session: VortexSession) -> Self {
88        VortexWriteOptions {
89            strategy: WriteStrategyBuilder::default().build(),
90            session,
91            exclude_dtype: false,
92            file_statistics: PRUNING_STATS.to_vec(),
93            max_variable_length_statistics_size: 64,
94        }
95    }
96
97    /// Replace the default layout strategy with the provided one.
98    pub fn with_strategy(mut self, strategy: Arc<dyn LayoutStrategy>) -> Self {
99        self.strategy = strategy;
100        self
101    }
102
103    /// Exclude the DType from the Vortex file. You must provide the DType to the reader.
104    // TODO(ngates): Should we store some sort of DType checksum to make sure the one passed at
105    //  read-time is sane? I guess most layouts will have some reasonable validation.
106    pub fn exclude_dtype(mut self) -> Self {
107        self.exclude_dtype = true;
108        self
109    }
110
111    /// Configure which statistics to compute at the file-level.
112    pub fn with_file_statistics(mut self, file_statistics: Vec<Stat>) -> Self {
113        self.file_statistics = file_statistics;
114        self
115    }
116}
117
118impl VortexWriteOptions {
119    /// Drop into the blocking writer API using the given runtime.
120    pub fn blocking<B: BlockingRuntime>(self, runtime: &B) -> BlockingWrite<'_, B> {
121        BlockingWrite {
122            options: self,
123            runtime,
124        }
125    }
126
127    /// Write an [`ArrayStream`] as a Vortex file.
128    ///
129    /// Note that buffers are flushed as soon as they are available with no buffering, the caller
130    /// is responsible for deciding how to configure buffering on the underlying `Write` sink.
131    pub async fn write<W: VortexWrite + Unpin, S: ArrayStream + Send + 'static>(
132        self,
133        write: W,
134        stream: S,
135    ) -> VortexResult<WriteSummary> {
136        self.write_internal(write, ArrayStreamExt::boxed(stream))
137            .await
138    }
139
140    async fn write_internal<W: VortexWrite + Unpin>(
141        self,
142        mut write: W,
143        stream: SendableArrayStream,
144    ) -> VortexResult<WriteSummary> {
145        // NOTE(os): Setup an array context that already has all known encodings pre-populated.
146        // This is preferred for now over having an empty context here, because only the
147        // serialised array order is deterministic. The serialisation of arrays are done
148        // parallel and with an empty context they can register their encodings to the context
149        // in different order, changing the written bytes from run to run.
150        let ctx = ArrayContext::new(self.session.arrays().registry().ids().sorted().collect())
151            // Configure a registry just to ensure only known encodings are interned.
152            .with_registry(self.session.arrays().registry().clone());
153        let dtype = stream.dtype().clone();
154
155        let (mut ptr, eof) = SequenceId::root().split();
156
157        let stream = SequentialStreamAdapter::new(
158            dtype.clone(),
159            stream
160                .try_filter(|chunk| ready(!chunk.is_empty()))
161                .map(move |result| result.map(|chunk| (ptr.advance(), chunk))),
162        )
163        .sendable();
164        let (file_stats, stream) = accumulate_stats(
165            stream,
166            self.file_statistics.clone().into(),
167            self.max_variable_length_statistics_size,
168        );
169
170        // First, write the magic bytes.
171        write.write_all(ByteBuffer::copy_from(MAGIC_BYTES)).await?;
172        let mut position = MAGIC_BYTES.len() as u64;
173
174        // Create a channel to send buffers from the segment sink to the output stream.
175        let (send, recv) = kanal::bounded_async(1);
176
177        let segments = Arc::new(BufferedSegmentSink::new(send, position));
178
179        // We spawn the layout future so it is driven in the background while we write the
180        // buffer stream, so we don't need to poll it until all buffers have been drained.
181        let ctx2 = ctx.clone();
182        let layout_fut = self.session.handle().spawn_nested(|h| async move {
183            let layout = self
184                .strategy
185                .write_stream(ctx2, segments.clone(), stream, eof, h)
186                .await?;
187            Ok::<_, VortexError>((layout, segments.segment_specs()))
188        });
189
190        // Flush buffers as they arrive
191        let recv_stream = recv.into_stream();
192        pin_mut!(recv_stream);
193        while let Some(buffer) = recv_stream.next().await {
194            if buffer.is_empty() {
195                continue;
196            }
197            position += buffer.len() as u64;
198            write.write_all(buffer).await?;
199        }
200
201        let (layout, segment_specs) = layout_fut.await?;
202
203        // Assemble the Footer object now that we have all the segments.
204        let mut footer = Footer::new(
205            layout.clone(),
206            segment_specs,
207            if self.file_statistics.is_empty() {
208                None
209            } else {
210                Some(FileStatistics::new_with_dtype(
211                    file_stats.stats_sets().into(),
212                    &dtype,
213                ))
214            },
215            ReadContext::new(ctx.to_ids()),
216        );
217
218        // Emit the footer buffers and EOF.
219        let footer_buffers = footer
220            .clone()
221            .into_serializer()
222            .with_offset(position)
223            .with_exclude_dtype(self.exclude_dtype)
224            .serialize()?;
225
226        // Update the approx footer size in the footer object, so it can be used for caching and
227        // memory management in the future.
228        footer = footer.with_approx_byte_size(footer_buffers.iter().map(|b| b.len()).sum());
229
230        for buffer in footer_buffers {
231            position += buffer.len() as u64;
232            write.write_all(buffer).await?;
233        }
234
235        write.flush().await?;
236
237        Ok(WriteSummary {
238            footer,
239            size: position,
240        })
241    }
242
243    /// Create a push-based [`Writer`] that can be used to incrementally write arrays to the file.
244    pub fn writer<'w, W: VortexWrite + Unpin + 'w>(self, write: W, dtype: DType) -> Writer<'w> {
245        // Create a channel for sending arrays to the layout task.
246        let (arrays_send, arrays_recv) = kanal::bounded_async(1);
247
248        let arrays =
249            ArrayStreamExt::boxed(ArrayStreamAdapter::new(dtype, arrays_recv.into_stream()));
250
251        let write = CountingVortexWrite::new(write);
252        let bytes_written = write.counter();
253        let strategy = self.strategy.clone();
254        let future = self.write(write, arrays).boxed_local().fuse();
255
256        Writer {
257            arrays: Some(arrays_send),
258            future,
259            bytes_written,
260            strategy,
261        }
262    }
263}
264
/// An async API for writing Vortex files.
///
/// Arrays are sent through a channel to a write future owned by this struct; that future is
/// polled from the `push*` methods and from `finish`.
pub struct Writer<'w> {
    /// The input channel for sending arrays to the writer. Taken (set to `None`) by `finish` to
    /// signal end of input.
    arrays: Option<kanal::AsyncSender<VortexResult<ArrayRef>>>,
    /// The writer task that ultimately produces the footer. It is fused, so it can be used in
    /// `select!` across multiple calls.
    future: Fuse<LocalBoxFuture<'w, VortexResult<WriteSummary>>>,
    /// The bytes written so far.
    bytes_written: Arc<AtomicU64>,
    /// The layout strategy that is being used for the write.
    strategy: Arc<dyn LayoutStrategy>,
}
276
277impl Writer<'_> {
278    /// Push a new chunk into the writer.
279    pub async fn push(&mut self, chunk: ArrayRef) -> VortexResult<()> {
280        let arrays = self.arrays.clone().vortex_expect("missing arrays sender");
281        let send_fut = async move { arrays.send(Ok(chunk)).await }.fuse();
282        pin_mut!(send_fut);
283
284        // We poll the writer future to continue writing bytes to the output, while waiting for
285        // enough room to push the next chunk into the channel.
286        select! {
287            result = send_fut => {
288                // If the send future failed, the writer has failed or panicked.
289                if result.is_err() {
290                    return Err(self.handle_failed_task().await);
291                }
292            },
293            result = &mut self.future => {
294                // Under normal operation, the writer future should never complete until
295                // finish() is called. Therefore, we can assume the writer has failed.
296                // The writer future has failed, we need to propagate the error.
297                match result {
298                    Ok(_) => vortex_bail!("Internal error: writer future completed early"),
299                    Err(e) => return Err(e),
300                }
301            }
302        }
303
304        Ok(())
305    }
306
307    /// Push an entire [`ArrayStream`] into the writer, consuming it.
308    ///
309    /// A task is spawned to consume the stream and push it into the writer, with the current
310    /// thread being used to write buffers to the output.
311    pub async fn push_stream(&mut self, mut stream: SendableArrayStream) -> VortexResult<()> {
312        let arrays = self.arrays.clone().vortex_expect("missing arrays sender");
313        let stream_fut = async move {
314            while let Some(chunk) = stream.next().await {
315                arrays.send(chunk).await?;
316            }
317            Ok::<_, kanal::SendError>(())
318        }
319        .fuse();
320        pin_mut!(stream_fut);
321
322        // We poll the writer future to continue writing bytes to the output, while waiting for
323        // enough room to push the stream into the channel.
324        select! {
325            result = stream_fut => {
326                if let Err(_send_err) = result {
327                    // If the send future failed, the writer has failed or panicked.
328                    return Err(self.handle_failed_task().await);
329                }
330            }
331
332            result = &mut self.future => {
333                // Under normal operation, the writer future should never complete until
334                // finish() is called. Therefore, we can assume the writer has failed.
335                // The writer future has failed, we need to propagate the error.
336                match result {
337                    Ok(_) => vortex_bail!("Internal error: writer future completed early"),
338                    Err(e) => return Err(e),
339                }
340            }
341        }
342
343        Ok(())
344    }
345
346    /// Returns the number of bytes written to the file so far.
347    pub fn bytes_written(&self) -> u64 {
348        self.bytes_written
349            .load(std::sync::atomic::Ordering::Relaxed)
350    }
351
352    /// Returns the number of bytes currently buffered by the layout writers.
353    pub fn buffered_bytes(&self) -> u64 {
354        self.strategy.buffered_bytes()
355    }
356
357    /// Finish writing the Vortex file, flushing any remaining buffers and returning the
358    /// new file's footer.
359    pub async fn finish(mut self) -> VortexResult<WriteSummary> {
360        // Drop the input channel to signal EOF.
361        drop(self.arrays.take());
362
363        // Await the future task.
364        self.future.await
365    }
366
367    /// Assuming the writer task has failed, await it to get the error.
368    async fn handle_failed_task(&mut self) -> VortexError {
369        match (&mut self.future).await {
370            Ok(_) => vortex_err!(
371                "Internal error: writer task completed successfully but write future finished early"
372            ),
373            Err(e) => e,
374        }
375    }
376}
377
/// A blocking API for writing Vortex files.
///
/// Created by [`VortexWriteOptions::blocking`]; pairs the write options with the runtime used to
/// block on the async write path.
pub struct BlockingWrite<'rt, B: BlockingRuntime> {
    /// The write configuration to use.
    options: VortexWriteOptions,
    /// The runtime on which the async write is driven to completion.
    runtime: &'rt B,
}
383
384impl<'rt, B: BlockingRuntime> BlockingWrite<'rt, B> {
385    /// Write a Vortex file into the given `Write` sink.
386    pub fn write<W: Write + Unpin>(
387        self,
388        write: W,
389        iter: impl ArrayIterator + Send + 'static,
390    ) -> VortexResult<WriteSummary> {
391        self.runtime.block_on(async move {
392            self.options
393                .write(BlockingWriteAdapter(write), iter.into_array_stream())
394                .await
395        })
396    }
397
398    pub fn writer<'w, W: Write + Unpin + 'w>(
399        self,
400        write: W,
401        dtype: DType,
402    ) -> BlockingWriter<'rt, 'w, B> {
403        BlockingWriter {
404            writer: self.options.writer(BlockingWriteAdapter(write), dtype),
405            runtime: self.runtime,
406        }
407    }
408}
409
/// A blocking adapter around a [`Writer`], allowing incremental writing of arrays to a Vortex file.
pub struct BlockingWriter<'rt, 'w, B: BlockingRuntime> {
    /// The runtime used to block on the async [`Writer`] operations.
    runtime: &'rt B,
    /// The wrapped async writer.
    writer: Writer<'w>,
}
415
416impl<B: BlockingRuntime> BlockingWriter<'_, '_, B> {
417    pub fn push(&mut self, chunk: ArrayRef) -> VortexResult<()> {
418        self.runtime.block_on(self.writer.push(chunk))
419    }
420
421    pub fn bytes_written(&self) -> u64 {
422        self.writer.bytes_written()
423    }
424
425    pub fn buffered_bytes(&self) -> u64 {
426        self.writer.buffered_bytes()
427    }
428
429    pub fn finish(self) -> VortexResult<WriteSummary> {
430        self.runtime.block_on(self.writer.finish())
431    }
432}
433
// TODO(ngates): this blocking API may change, for now we just run blocking I/O inline.
/// Adapts a synchronous [`Write`] sink to the [`VortexWrite`] interface by performing each
/// operation inline.
struct BlockingWriteAdapter<W>(W);
436
437impl<W: Write + Unpin> VortexWrite for BlockingWriteAdapter<W> {
438    async fn write_all<B: IoBuf>(&mut self, buffer: B) -> io::Result<B> {
439        self.0.write_all(buffer.as_slice())?;
440        Ok(buffer)
441    }
442
443    fn flush(&mut self) -> impl Future<Output = io::Result<()>> {
444        ready(self.0.flush())
445    }
446
447    fn shutdown(&mut self) -> impl Future<Output = io::Result<()>> {
448        ready(Ok(()))
449    }
450}
451
/// The result of a completed Vortex file write.
pub struct WriteSummary {
    /// The footer that was written to the file.
    footer: Footer,
    /// The total number of bytes written: magic bytes, segment buffers and footer buffers.
    size: u64,
    // TODO(ngates): add a checksum
}
457
458impl WriteSummary {
459    /// The footer of the written Vortex file.
460    pub fn footer(&self) -> &Footer {
461        &self.footer
462    }
463
464    /// The total size of the written Vortex file in bytes.
465    pub fn size(&self) -> u64 {
466        self.size
467    }
468
469    /// The total number of rows in the written Vortex file.
470    pub fn row_count(&self) -> u64 {
471        self.footer.row_count()
472    }
473}