Skip to main content

vortex_file/
writer.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::io;
5use std::io::Write;
6use std::sync::Arc;
7use std::sync::atomic::AtomicU64;
8
9use futures::FutureExt;
10use futures::StreamExt;
11use futures::TryStreamExt;
12use futures::future::Fuse;
13use futures::future::LocalBoxFuture;
14use futures::future::ready;
15use futures::pin_mut;
16use futures::select;
17use itertools::Itertools;
18use vortex_array::ArrayContext;
19use vortex_array::ArrayRef;
20use vortex_array::dtype::DType;
21use vortex_array::expr::stats::Stat;
22use vortex_array::iter::ArrayIterator;
23use vortex_array::iter::ArrayIteratorExt;
24use vortex_array::session::ArraySessionExt;
25use vortex_array::stats::PRUNING_STATS;
26use vortex_array::stream::ArrayStream;
27use vortex_array::stream::ArrayStreamAdapter;
28use vortex_array::stream::ArrayStreamExt;
29use vortex_array::stream::SendableArrayStream;
30use vortex_buffer::ByteBuffer;
31use vortex_error::VortexError;
32use vortex_error::VortexExpect;
33use vortex_error::VortexResult;
34use vortex_error::vortex_bail;
35use vortex_error::vortex_err;
36use vortex_io::IoBuf;
37use vortex_io::VortexWrite;
38use vortex_io::kanal_ext::KanalExt;
39use vortex_io::runtime::BlockingRuntime;
40use vortex_io::session::RuntimeSessionExt;
41use vortex_layout::LayoutStrategy;
42use vortex_layout::layouts::file_stats::accumulate_stats;
43use vortex_layout::sequence::SequenceId;
44use vortex_layout::sequence::SequentialStreamAdapter;
45use vortex_layout::sequence::SequentialStreamExt;
46use vortex_session::SessionExt;
47use vortex_session::VortexSession;
48
49use crate::Footer;
50use crate::MAGIC_BYTES;
51use crate::WriteStrategyBuilder;
52use crate::counting::CountingVortexWrite;
53use crate::footer::FileStatistics;
54use crate::segments::writer::BufferedSegmentSink;
55
56/// Configure a new writer, which can eventually be used to write an [`ArrayStream`] into a sink
57/// that implements [`VortexWrite`].
58///
59/// Unless overridden, the default [write strategy][crate::WriteStrategyBuilder] will be used with no
60/// additional configuration.
61pub struct VortexWriteOptions {
62    session: VortexSession,
63    strategy: Arc<dyn LayoutStrategy>,
64    exclude_dtype: bool,
65    max_variable_length_statistics_size: usize,
66    file_statistics: Vec<Stat>,
67}
68
69pub trait WriteOptionsSessionExt: SessionExt {
70    /// Create [`VortexWriteOptions`] for writing to a Vortex file.
71    fn write_options(&self) -> VortexWriteOptions {
72        let session = self.session();
73        VortexWriteOptions {
74            strategy: WriteStrategyBuilder::default().build(),
75            session,
76            exclude_dtype: false,
77            file_statistics: PRUNING_STATS.to_vec(),
78            max_variable_length_statistics_size: 64,
79        }
80    }
81}
82impl<S: SessionExt> WriteOptionsSessionExt for S {}
83
84impl VortexWriteOptions {
85    /// Create a new [`VortexWriteOptions`] with the given session.
86    pub fn new(session: VortexSession) -> Self {
87        VortexWriteOptions {
88            strategy: WriteStrategyBuilder::default().build(),
89            session,
90            exclude_dtype: false,
91            file_statistics: PRUNING_STATS.to_vec(),
92            max_variable_length_statistics_size: 64,
93        }
94    }
95
96    /// Replace the default layout strategy with the provided one.
97    pub fn with_strategy(mut self, strategy: Arc<dyn LayoutStrategy>) -> Self {
98        self.strategy = strategy;
99        self
100    }
101
102    /// Exclude the DType from the Vortex file. You must provide the DType to the reader.
103    // TODO(ngates): Should we store some sort of DType checksum to make sure the one passed at
104    //  read-time is sane? I guess most layouts will have some reasonable validation.
105    pub fn exclude_dtype(mut self) -> Self {
106        self.exclude_dtype = true;
107        self
108    }
109
110    /// Configure which statistics to compute at the file-level.
111    pub fn with_file_statistics(mut self, file_statistics: Vec<Stat>) -> Self {
112        self.file_statistics = file_statistics;
113        self
114    }
115}
116
117impl VortexWriteOptions {
118    /// Drop into the blocking writer API using the given runtime.
119    pub fn blocking<B: BlockingRuntime>(self, runtime: &B) -> BlockingWrite<'_, B> {
120        BlockingWrite {
121            options: self,
122            runtime,
123        }
124    }
125
126    /// Write an [`ArrayStream`] as a Vortex file.
127    ///
128    /// Note that buffers are flushed as soon as they are available with no buffering, the caller
129    /// is responsible for deciding how to configure buffering on the underlying `Write` sink.
130    pub async fn write<W: VortexWrite + Unpin, S: ArrayStream + Send + 'static>(
131        self,
132        write: W,
133        stream: S,
134    ) -> VortexResult<WriteSummary> {
135        self.write_internal(write, ArrayStreamExt::boxed(stream))
136            .await
137    }
138
139    async fn write_internal<W: VortexWrite + Unpin>(
140        self,
141        mut write: W,
142        stream: SendableArrayStream,
143    ) -> VortexResult<WriteSummary> {
144        // NOTE(os): Setup an array context that already has all known encodings pre-populated.
145        // This is preferred for now over having an empty context here, because only the
146        // serialised array order is deterministic. The serialisation of arrays are done
147        // parallel and with an empty context they can register their encodings to the context
148        // in different order, changing the written bytes from run to run.
149        let ctx = ArrayContext::new(self.session.arrays().registry().ids().sorted().collect())
150            // Configure a registry just to ensure only known encodings are interned.
151            .with_registry(self.session.arrays().registry().clone());
152        let dtype = stream.dtype().clone();
153
154        let (mut ptr, eof) = SequenceId::root().split();
155
156        let stream = SequentialStreamAdapter::new(
157            dtype.clone(),
158            stream
159                .try_filter(|chunk| ready(!chunk.is_empty()))
160                .map(move |result| result.map(|chunk| (ptr.advance(), chunk))),
161        )
162        .sendable();
163        let (file_stats, stream) = accumulate_stats(
164            stream,
165            self.file_statistics.clone().into(),
166            self.max_variable_length_statistics_size,
167        );
168
169        // First, write the magic bytes.
170        write.write_all(ByteBuffer::copy_from(MAGIC_BYTES)).await?;
171        let mut position = MAGIC_BYTES.len() as u64;
172
173        // Create a channel to send buffers from the segment sink to the output stream.
174        let (send, recv) = kanal::bounded_async(1);
175
176        let segments = Arc::new(BufferedSegmentSink::new(send, position));
177
178        // We spawn the layout future so it is driven in the background while we write the
179        // buffer stream, so we don't need to poll it until all buffers have been drained.
180        let ctx2 = ctx.clone();
181        let layout_fut = self.session.handle().spawn_nested(|h| async move {
182            let layout = self
183                .strategy
184                .write_stream(ctx2, segments.clone(), stream, eof, h)
185                .await?;
186            Ok::<_, VortexError>((layout, segments.segment_specs()))
187        });
188
189        // Flush buffers as they arrive
190        let recv_stream = recv.into_stream();
191        pin_mut!(recv_stream);
192        while let Some(buffer) = recv_stream.next().await {
193            if buffer.is_empty() {
194                continue;
195            }
196            position += buffer.len() as u64;
197            write.write_all(buffer).await?;
198        }
199
200        let (layout, segment_specs) = layout_fut.await?;
201
202        // Assemble the Footer object now that we have all the segments.
203        let mut footer = Footer::new(
204            layout.clone(),
205            segment_specs,
206            if self.file_statistics.is_empty() {
207                None
208            } else {
209                Some(FileStatistics::new_with_dtype(
210                    file_stats.stats_sets().into(),
211                    &dtype,
212                ))
213            },
214            ctx,
215        );
216
217        // Emit the footer buffers and EOF.
218        let footer_buffers = footer
219            .clone()
220            .into_serializer()
221            .with_offset(position)
222            .with_exclude_dtype(self.exclude_dtype)
223            .serialize()?;
224
225        // Update the approx footer size in the footer object, so it can be used for caching and
226        // memory management in the future.
227        footer = footer.with_approx_byte_size(footer_buffers.iter().map(|b| b.len()).sum());
228
229        for buffer in footer_buffers {
230            position += buffer.len() as u64;
231            write.write_all(buffer).await?;
232        }
233
234        write.flush().await?;
235
236        Ok(WriteSummary {
237            footer,
238            size: position,
239        })
240    }
241
242    /// Create a push-based [`Writer`] that can be used to incrementally write arrays to the file.
243    pub fn writer<'w, W: VortexWrite + Unpin + 'w>(self, write: W, dtype: DType) -> Writer<'w> {
244        // Create a channel for sending arrays to the layout task.
245        let (arrays_send, arrays_recv) = kanal::bounded_async(1);
246
247        let arrays =
248            ArrayStreamExt::boxed(ArrayStreamAdapter::new(dtype, arrays_recv.into_stream()));
249
250        let write = CountingVortexWrite::new(write);
251        let bytes_written = write.counter();
252        let strategy = self.strategy.clone();
253        let future = self.write(write, arrays).boxed_local().fuse();
254
255        Writer {
256            arrays: Some(arrays_send),
257            future,
258            bytes_written,
259            strategy,
260        }
261    }
262}
263
264/// An async API for writing Vortex files.
265pub struct Writer<'w> {
266    // The input channel for sending arrays to the writer.
267    arrays: Option<kanal::AsyncSender<VortexResult<ArrayRef>>>,
268    // The writer task that ultimately produces the footer.
269    future: Fuse<LocalBoxFuture<'w, VortexResult<WriteSummary>>>,
270    // The bytes written so far.
271    bytes_written: Arc<AtomicU64>,
272    // The layout strategy that is being used for the write.
273    strategy: Arc<dyn LayoutStrategy>,
274}
275
276impl Writer<'_> {
277    /// Push a new chunk into the writer.
278    pub async fn push(&mut self, chunk: ArrayRef) -> VortexResult<()> {
279        let arrays = self.arrays.clone().vortex_expect("missing arrays sender");
280        let send_fut = async move { arrays.send(Ok(chunk)).await }.fuse();
281        pin_mut!(send_fut);
282
283        // We poll the writer future to continue writing bytes to the output, while waiting for
284        // enough room to push the next chunk into the channel.
285        select! {
286            result = send_fut => {
287                // If the send future failed, the writer has failed or panicked.
288                if result.is_err() {
289                    return Err(self.handle_failed_task().await);
290                }
291            },
292            result = &mut self.future => {
293                // Under normal operation, the writer future should never complete until
294                // finish() is called. Therefore, we can assume the writer has failed.
295                // The writer future has failed, we need to propagate the error.
296                match result {
297                    Ok(_) => vortex_bail!("Internal error: writer future completed early"),
298                    Err(e) => return Err(e),
299                }
300            }
301        }
302
303        Ok(())
304    }
305
306    /// Push an entire [`ArrayStream`] into the writer, consuming it.
307    ///
308    /// A task is spawned to consume the stream and push it into the writer, with the current
309    /// thread being used to write buffers to the output.
310    pub async fn push_stream(&mut self, mut stream: SendableArrayStream) -> VortexResult<()> {
311        let arrays = self.arrays.clone().vortex_expect("missing arrays sender");
312        let stream_fut = async move {
313            while let Some(chunk) = stream.next().await {
314                arrays.send(chunk).await?;
315            }
316            Ok::<_, kanal::SendError>(())
317        }
318        .fuse();
319        pin_mut!(stream_fut);
320
321        // We poll the writer future to continue writing bytes to the output, while waiting for
322        // enough room to push the stream into the channel.
323        select! {
324            result = stream_fut => {
325                if let Err(_send_err) = result {
326                    // If the send future failed, the writer has failed or panicked.
327                    return Err(self.handle_failed_task().await);
328                }
329            }
330
331            result = &mut self.future => {
332                // Under normal operation, the writer future should never complete until
333                // finish() is called. Therefore, we can assume the writer has failed.
334                // The writer future has failed, we need to propagate the error.
335                match result {
336                    Ok(_) => vortex_bail!("Internal error: writer future completed early"),
337                    Err(e) => return Err(e),
338                }
339            }
340        }
341
342        Ok(())
343    }
344
345    /// Returns the number of bytes written to the file so far.
346    pub fn bytes_written(&self) -> u64 {
347        self.bytes_written
348            .load(std::sync::atomic::Ordering::Relaxed)
349    }
350
351    /// Returns the number of bytes currently buffered by the layout writers.
352    pub fn buffered_bytes(&self) -> u64 {
353        self.strategy.buffered_bytes()
354    }
355
356    /// Finish writing the Vortex file, flushing any remaining buffers and returning the
357    /// new file's footer.
358    pub async fn finish(mut self) -> VortexResult<WriteSummary> {
359        // Drop the input channel to signal EOF.
360        drop(self.arrays.take());
361
362        // Await the future task.
363        self.future.await
364    }
365
366    /// Assuming the writer task has failed, await it to get the error.
367    async fn handle_failed_task(&mut self) -> VortexError {
368        match (&mut self.future).await {
369            Ok(_) => vortex_err!(
370                "Internal error: writer task completed successfully but write future finished early"
371            ),
372            Err(e) => e,
373        }
374    }
375}
376
377/// A blocking API for writing Vortex files.
378pub struct BlockingWrite<'rt, B: BlockingRuntime> {
379    options: VortexWriteOptions,
380    runtime: &'rt B,
381}
382
383impl<'rt, B: BlockingRuntime> BlockingWrite<'rt, B> {
384    /// Write a Vortex file into the given `Write` sink.
385    pub fn write<W: Write + Unpin>(
386        self,
387        write: W,
388        iter: impl ArrayIterator + Send + 'static,
389    ) -> VortexResult<WriteSummary> {
390        self.runtime.block_on(async move {
391            self.options
392                .write(BlockingWriteAdapter(write), iter.into_array_stream())
393                .await
394        })
395    }
396
397    pub fn writer<'w, W: Write + Unpin + 'w>(
398        self,
399        write: W,
400        dtype: DType,
401    ) -> BlockingWriter<'rt, 'w, B> {
402        BlockingWriter {
403            writer: self.options.writer(BlockingWriteAdapter(write), dtype),
404            runtime: self.runtime,
405        }
406    }
407}
408
409/// A blocking adapter around a [`Writer`], allowing incremental writing of arrays to a Vortex file.
410pub struct BlockingWriter<'rt, 'w, B: BlockingRuntime> {
411    runtime: &'rt B,
412    writer: Writer<'w>,
413}
414
415impl<B: BlockingRuntime> BlockingWriter<'_, '_, B> {
416    pub fn push(&mut self, chunk: ArrayRef) -> VortexResult<()> {
417        self.runtime.block_on(self.writer.push(chunk))
418    }
419
420    pub fn bytes_written(&self) -> u64 {
421        self.writer.bytes_written()
422    }
423
424    pub fn buffered_bytes(&self) -> u64 {
425        self.writer.buffered_bytes()
426    }
427
428    pub fn finish(self) -> VortexResult<WriteSummary> {
429        self.runtime.block_on(self.writer.finish())
430    }
431}
432
// TODO(ngates): this blocking API may change, for now we just run blocking I/O inline.
/// Exposes a synchronous [`std::io::Write`] as a [`VortexWrite`].
///
/// Every write is performed inline on the calling task.
struct BlockingWriteAdapter<W>(W);
435
436impl<W: Write + Unpin> VortexWrite for BlockingWriteAdapter<W> {
437    async fn write_all<B: IoBuf>(&mut self, buffer: B) -> io::Result<B> {
438        self.0.write_all(buffer.as_slice())?;
439        Ok(buffer)
440    }
441
442    fn flush(&mut self) -> impl Future<Output = io::Result<()>> {
443        ready(self.0.flush())
444    }
445
446    fn shutdown(&mut self) -> impl Future<Output = io::Result<()>> {
447        ready(Ok(()))
448    }
449}
450
451pub struct WriteSummary {
452    footer: Footer,
453    size: u64,
454    // TODO(ngates): add a checksum
455}
456
457impl WriteSummary {
458    /// The footer of the written Vortex file.
459    pub fn footer(&self) -> &Footer {
460        &self.footer
461    }
462
463    /// The total size of the written Vortex file in bytes.
464    pub fn size(&self) -> u64 {
465        self.size
466    }
467
468    /// The total number of rows in the written Vortex file.
469    pub fn row_count(&self) -> u64 {
470        self.footer.row_count()
471    }
472}