vortex_file/
writer.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::io;
5use std::io::Write;
6use std::sync::Arc;
7use std::sync::atomic::AtomicU64;
8
9use futures::future::{Fuse, LocalBoxFuture, ready};
10use futures::{FutureExt, StreamExt, TryStreamExt, pin_mut, select};
11use vortex_array::iter::{ArrayIterator, ArrayIteratorExt};
12use vortex_array::stats::{PRUNING_STATS, Stat};
13use vortex_array::stream::{ArrayStream, ArrayStreamAdapter, ArrayStreamExt, SendableArrayStream};
14use vortex_array::{ArrayContext, ArrayRef};
15use vortex_buffer::ByteBuffer;
16use vortex_dtype::DType;
17use vortex_error::{
18    VortexError, VortexExpect, VortexResult, vortex_bail, vortex_err, vortex_panic,
19};
20use vortex_io::kanal_ext::KanalExt;
21use vortex_io::runtime::{BlockingRuntime, Handle};
22use vortex_io::{IoBuf, VortexWrite};
23use vortex_layout::LayoutStrategy;
24use vortex_layout::layouts::file_stats::accumulate_stats;
25use vortex_layout::sequence::{SequenceId, SequentialStreamAdapter, SequentialStreamExt};
26
27use crate::counting::CountingVortexWrite;
28use crate::footer::FileStatistics;
29use crate::segments::writer::BufferedSegmentSink;
30use crate::{Footer, MAGIC_BYTES, WriteStrategyBuilder};
31
/// Configure a new writer, which can eventually be used to write an [`ArrayStream`] into a sink
/// that implements [`VortexWrite`].
///
/// Unless overridden, the default [write strategy][crate::WriteStrategyBuilder] will be used with no
/// additional configuration.
pub struct VortexWriteOptions {
    /// Layout strategy that turns the incoming array stream into the file's layout.
    strategy: Arc<dyn LayoutStrategy>,
    /// When `true`, the footer serializer is asked to leave the DType out of the file.
    exclude_dtype: bool,
    /// Limit forwarded to the file-level statistics accumulator for variable-length values
    /// (presumably a size in bytes; confirm against `accumulate_stats`).
    max_variable_length_statistics_size: usize,
    /// Statistics to compute at the file level. An empty list means no file statistics are
    /// stored in the footer.
    file_statistics: Vec<Stat>,
    /// Handle used to drive async tasks. Required by the async write path and forbidden when
    /// switching to the blocking API.
    handle: Option<Handle>,
}
43
44impl Default for VortexWriteOptions {
45    fn default() -> Self {
46        Self {
47            strategy: WriteStrategyBuilder::new().build(),
48            exclude_dtype: false,
49            file_statistics: PRUNING_STATS.to_vec(),
50            max_variable_length_statistics_size: 64,
51            handle: Handle::find(),
52        }
53    }
54}
55
56impl VortexWriteOptions {
57    /// Configure a [`Handle`] for driving async tasks.
58    ///
59    /// If not provided, a handle will try to be inferred from [`Handle::find`].
60    pub fn with_handle(mut self, handle: Handle) -> Self {
61        self.handle = Some(handle);
62        self
63    }
64
65    /// See [`VortexWriteOptions::with_handle`].
66    pub fn with_some_handle(mut self, handle: Option<Handle>) -> Self {
67        self.handle = handle.or(self.handle);
68        self
69    }
70
71    /// Replace the default layout strategy with the provided one.
72    pub fn with_strategy(mut self, strategy: Arc<dyn LayoutStrategy>) -> Self {
73        self.strategy = strategy;
74        self
75    }
76
77    /// Exclude the DType from the Vortex file. You must provide the DType to the reader.
78    // TODO(ngates): Should we store some sort of DType checksum to make sure the one passed at
79    //  read-time is sane? I guess most layouts will have some reasonable validation.
80    pub fn exclude_dtype(mut self) -> Self {
81        self.exclude_dtype = true;
82        self
83    }
84
85    /// Configure which statistics to compute at the file-level.
86    pub fn with_file_statistics(mut self, file_statistics: Vec<Stat>) -> Self {
87        self.file_statistics = file_statistics;
88        self
89    }
90}
91
92impl VortexWriteOptions {
93    /// Drop into the blocking writer API using the given runtime.
94    pub fn blocking<B: BlockingRuntime + Default>(self) -> BlockingWrite<B> {
95        self.with_blocking(B::default())
96    }
97
98    /// Drop into the blocking writer API using the given runtime.
99    pub fn with_blocking<B: BlockingRuntime>(self, runtime: B) -> BlockingWrite<B> {
100        if self.handle.is_some() {
101            vortex_panic!("Must not provide or infer a Handle when using the blocking writer API")
102        }
103        BlockingWrite {
104            options: self,
105            runtime,
106        }
107    }
108
109    /// Write an [`ArrayStream`] as a Vortex file.
110    ///
111    /// Note that buffers are flushed as soon as they are available with no buffering, the caller
112    /// is responsible for deciding how to configure buffering on the underlying `Write` sink.
113    pub async fn write<W: VortexWrite + Unpin, S: ArrayStream + Send + 'static>(
114        self,
115        write: W,
116        stream: S,
117    ) -> VortexResult<WriteSummary> {
118        self.write_internal(write, ArrayStreamExt::boxed(stream))
119            .await
120    }
121
122    async fn write_internal<W: VortexWrite + Unpin>(
123        self,
124        mut write: W,
125        stream: SendableArrayStream,
126    ) -> VortexResult<WriteSummary> {
127        let Some(handle) = self.handle else {
128            vortex_panic!("Must provide a Handle to use the async writer API");
129        };
130
131        // Set up a Context to capture the encodings used in the file.
132        let ctx = ArrayContext::empty();
133        let dtype = stream.dtype().clone();
134
135        let (mut ptr, eof) = SequenceId::root().split();
136
137        let stream = SequentialStreamAdapter::new(
138            dtype.clone(),
139            stream
140                .try_filter(|chunk| ready(!chunk.is_empty()))
141                .map(move |result| result.map(|chunk| (ptr.advance(), chunk))),
142        )
143        .sendable();
144        let (file_stats, stream) = accumulate_stats(
145            stream,
146            self.file_statistics.clone().into(),
147            self.max_variable_length_statistics_size,
148        );
149
150        // First, write the magic bytes.
151        write.write_all(ByteBuffer::copy_from(MAGIC_BYTES)).await?;
152        let mut position = MAGIC_BYTES.len() as u64;
153
154        // Create a channel to send buffers from the segment sink to the output stream.
155        let (send, recv) = kanal::bounded_async(1);
156
157        let segments = Arc::new(BufferedSegmentSink::new(send, position));
158
159        // We spawn the layout future so it is driven in the background while we write the
160        // buffer stream, so we don't need to poll it until all buffers have been drained.
161        let ctx2 = ctx.clone();
162        let layout_fut = handle.spawn_nested(|h| async move {
163            let layout = self
164                .strategy
165                .write_stream(ctx2, segments.clone(), stream, eof, h)
166                .await?;
167            Ok::<_, VortexError>((layout, segments.segment_specs()))
168        });
169
170        // Flush buffers as they arrive
171        let recv_stream = recv.into_stream();
172        pin_mut!(recv_stream);
173        while let Some(buffer) = recv_stream.next().await {
174            if buffer.is_empty() {
175                continue;
176            }
177            position += buffer.len() as u64;
178            write.write_all(buffer).await?;
179        }
180
181        let (layout, segment_specs) = layout_fut.await?;
182
183        // Assemble the Footer object now that we have all the segments.
184        let footer = Footer::new(
185            layout.clone(),
186            segment_specs,
187            if self.file_statistics.is_empty() {
188                None
189            } else {
190                Some(FileStatistics(file_stats.stats_sets().into()))
191            },
192            ctx,
193        );
194
195        // Emit the footer buffers and EOF.
196        let footer_buffers = footer
197            .clone()
198            .into_serializer()
199            .with_offset(position)
200            .with_exclude_dtype(self.exclude_dtype)
201            .serialize()?;
202        for buffer in footer_buffers {
203            position += buffer.len() as u64;
204            write.write_all(buffer).await?;
205        }
206
207        write.flush().await?;
208
209        Ok(WriteSummary {
210            footer,
211            size: position,
212        })
213    }
214
215    /// Create a push-based [`Writer`] that can be used to incrementally write arrays to the file.
216    pub fn writer<'w, W: VortexWrite + Unpin + 'w>(self, write: W, dtype: DType) -> Writer<'w> {
217        // Create a channel for sending arrays to the layout task.
218        let (arrays_send, arrays_recv) = kanal::bounded_async(1);
219
220        let arrays =
221            ArrayStreamExt::boxed(ArrayStreamAdapter::new(dtype, arrays_recv.into_stream()));
222
223        let write = CountingVortexWrite::new(write);
224        let bytes_written = write.counter();
225        let strategy = self.strategy.clone();
226        let future = self.write(write, arrays).boxed_local().fuse();
227
228        Writer {
229            arrays: Some(arrays_send),
230            future,
231            bytes_written,
232            strategy,
233        }
234    }
235}
236
/// An async API for writing Vortex files.
///
/// Arrays are pushed through a channel into a writer future that is owned and polled by this
/// struct; the future yields the [`WriteSummary`] once the file is complete.
pub struct Writer<'w> {
    /// The input channel for sending arrays to the writer. Set to `None` by `finish` to
    /// signal EOF.
    arrays: Option<kanal::AsyncSender<VortexResult<ArrayRef>>>,
    /// The writer task that ultimately produces the footer. Fused so it can be polled from
    /// `select!`.
    future: Fuse<LocalBoxFuture<'w, VortexResult<WriteSummary>>>,
    /// The bytes written so far, shared with the counting wrapper around the output sink.
    bytes_written: Arc<AtomicU64>,
    /// The layout strategy that is being used for the write.
    strategy: Arc<dyn LayoutStrategy>,
}
248
249impl Writer<'_> {
    /// Push a new chunk into the writer.
    ///
    /// The writer future is polled while waiting for room in the input channel, so output
    /// bytes continue to be written during the call.
    ///
    /// # Errors
    ///
    /// Returns the writer's error if the writer fails, or an internal error if the writer
    /// future completes before `finish` is called.
    pub async fn push(&mut self, chunk: ArrayRef) -> VortexResult<()> {
        // Clone the sender so the send future owns it and does not borrow `self`, which is
        // still needed to poll the writer future below.
        let arrays = self.arrays.clone().vortex_expect("missing arrays sender");
        let send_fut = async move { arrays.send(Ok(chunk)).await }.fuse();
        pin_mut!(send_fut);

        // We poll the writer future to continue writing bytes to the output, while waiting for
        // enough room to push the next chunk into the channel.
        select! {
            result = send_fut => {
                // If the send future failed, the writer has failed or panicked.
                if result.is_err() {
                    return Err(self.handle_failed_task().await);
                }
            },
            result = &mut self.future => {
                // Under normal operation, the writer future should never complete until
                // finish() is called, so completing here means the writer has failed (or
                // finished unexpectedly) and the outcome must be reported to the caller.
                match result {
                    Ok(_) => vortex_bail!("Internal error: writer future completed early"),
                    Err(e) => return Err(e),
                }
            }
        }

        Ok(())
    }
278
    /// Push an entire [`ArrayStream`] into the writer, consuming it.
    ///
    /// The stream is drained by a local future that forwards each chunk into the writer's
    /// input channel. That future is polled together with the writer future, so buffers keep
    /// being written to the output while chunks are forwarded.
    ///
    /// # Errors
    ///
    /// Returns the writer's error if the writer fails, or an internal error if the writer
    /// future completes before `finish` is called.
    pub async fn push_stream(&mut self, mut stream: SendableArrayStream) -> VortexResult<()> {
        // Clone the sender so the forwarding future owns it and does not borrow `self`.
        let arrays = self.arrays.clone().vortex_expect("missing arrays sender");
        let stream_fut = async move {
            // Chunks are forwarded as-is, including any `Err` items yielded by the stream.
            while let Some(chunk) = stream.next().await {
                arrays.send(chunk).await?;
            }
            Ok::<_, kanal::SendError>(())
        }
        .fuse();
        pin_mut!(stream_fut);

        // We poll the writer future to continue writing bytes to the output, while waiting for
        // enough room to push the stream into the channel.
        select! {
            result = stream_fut => {
                if let Err(_send_err) = result {
                    // If the send future failed, the writer has failed or panicked.
                    return Err(self.handle_failed_task().await);
                }
            }

            result = &mut self.future => {
                // Under normal operation, the writer future should never complete until
                // finish() is called, so completing here means the writer has failed (or
                // finished unexpectedly) and the outcome must be reported to the caller.
                match result {
                    Ok(_) => vortex_bail!("Internal error: writer future completed early"),
                    Err(e) => return Err(e),
                }
            }
        }

        Ok(())
    }
317
318    /// Returns the number of bytes written to the file so far.
319    pub fn bytes_written(&self) -> u64 {
320        self.bytes_written
321            .load(std::sync::atomic::Ordering::Relaxed)
322    }
323
324    /// Returns the number of bytes currently buffered by the layout writers.
325    pub fn buffered_bytes(&self) -> u64 {
326        self.strategy.buffered_bytes()
327    }
328
329    /// Finish writing the Vortex file, flushing any remaining buffers and returning the
330    /// new file's footer.
331    pub async fn finish(mut self) -> VortexResult<WriteSummary> {
332        // Drop the input channel to signal EOF.
333        drop(self.arrays.take());
334
335        // Await the future task.
336        self.future.await
337    }
338
339    /// Assuming the writer task has failed, await it to get the error.
340    async fn handle_failed_task(&mut self) -> VortexError {
341        match (&mut self.future).await {
342            Ok(_) => vortex_err!(
343                "Internal error: writer task completed successfully but write future finished early"
344            ),
345            Err(e) => e,
346        }
347    }
348}
349
/// A blocking API for writing Vortex files.
pub struct BlockingWrite<B: BlockingRuntime> {
    /// Write options used for files produced through this API.
    options: VortexWriteOptions,
    /// Runtime that supplies the handle and drives the async writer.
    runtime: B,
}
355
356impl<B: BlockingRuntime> BlockingWrite<B> {
357    /// Write a Vortex file into the given `Write` sink.
358    pub fn write<W: Write + Unpin>(
359        self,
360        write: W,
361        iter: impl ArrayIterator + Send + 'static,
362    ) -> VortexResult<WriteSummary> {
363        self.runtime.block_on(|handle| async move {
364            self.options
365                .with_handle(handle)
366                .write(BlockingWriteAdapter(write), iter.into_array_stream())
367                .await
368        })
369    }
370
371    pub fn writer<'w, W: Write + Unpin + 'w>(
372        self,
373        write: W,
374        dtype: DType,
375    ) -> BlockingWriter<'w, B> {
376        BlockingWriter {
377            writer: self
378                .options
379                .with_handle(self.runtime.handle())
380                .writer(BlockingWriteAdapter(write), dtype),
381            runtime: self.runtime,
382        }
383    }
384}
385
/// A blocking adapter around a [`Writer`], allowing incremental writing of arrays to a Vortex file.
pub struct BlockingWriter<'w, B: BlockingRuntime> {
    /// Runtime used to block on the wrapped writer's async operations.
    runtime: B,
    /// The async writer that performs the actual work.
    writer: Writer<'w>,
}
391
392impl<B: BlockingRuntime> BlockingWriter<'_, B> {
393    pub fn push(&mut self, chunk: ArrayRef) -> VortexResult<()> {
394        self.runtime.block_on(|_| self.writer.push(chunk))
395    }
396
397    pub fn bytes_written(&self) -> u64 {
398        self.writer.bytes_written()
399    }
400
401    pub fn buffered_bytes(&self) -> u64 {
402        self.writer.buffered_bytes()
403    }
404
405    pub fn finish(self) -> VortexResult<WriteSummary> {
406        self.runtime.block_on(|_| self.writer.finish())
407    }
408}
409
410// TODO(ngates): this blocking API may change, for now we just run blocking I/O inline.
411struct BlockingWriteAdapter<W>(W);
412
413impl<W: Write + Unpin> VortexWrite for BlockingWriteAdapter<W> {
414    async fn write_all<B: IoBuf>(&mut self, buffer: B) -> io::Result<B> {
415        self.0.write_all(buffer.as_slice())?;
416        Ok(buffer)
417    }
418
419    fn flush(&mut self) -> impl Future<Output = io::Result<()>> {
420        ready(self.0.flush())
421    }
422
423    fn shutdown(&mut self) -> impl Future<Output = io::Result<()>> {
424        ready(Ok(()))
425    }
426}
427
/// The outcome of writing a Vortex file.
pub struct WriteSummary {
    /// The footer that was serialized into the file.
    footer: Footer,
    /// Total number of bytes emitted for the file, counting magic bytes, segments and footer.
    size: u64,
    // TODO(ngates): add a checksum
}
433
impl WriteSummary {
    /// The footer of the written Vortex file.
    pub fn footer(&self) -> &Footer {
        &self.footer
    }

    /// The total size of the written Vortex file in bytes.
    pub fn size(&self) -> u64 {
        self.size
    }

    /// The row count of the written Vortex file, as reported by its footer.
    pub fn row_count(&self) -> u64 {
        self.footer.row_count()
    }
}