1use std::io;
5use std::io::Write;
6use std::sync::Arc;
7use std::sync::atomic::AtomicU64;
8
9use futures::future::{Fuse, LocalBoxFuture, ready};
10use futures::{FutureExt, StreamExt, TryStreamExt, pin_mut, select};
11use vortex_array::iter::{ArrayIterator, ArrayIteratorExt};
12use vortex_array::stats::{PRUNING_STATS, Stat};
13use vortex_array::stream::{ArrayStream, ArrayStreamAdapter, ArrayStreamExt, SendableArrayStream};
14use vortex_array::{ArrayContext, ArrayRef};
15use vortex_buffer::ByteBuffer;
16use vortex_dtype::DType;
17use vortex_error::{VortexError, VortexExpect, VortexResult, vortex_bail, vortex_err};
18use vortex_io::kanal_ext::KanalExt;
19use vortex_io::runtime::BlockingRuntime;
20use vortex_io::session::RuntimeSessionExt;
21use vortex_io::{IoBuf, VortexWrite};
22use vortex_layout::LayoutStrategy;
23use vortex_layout::layouts::file_stats::accumulate_stats;
24use vortex_layout::sequence::{SequenceId, SequentialStreamAdapter, SequentialStreamExt};
25use vortex_session::{SessionExt, VortexSession};
26
27use crate::counting::CountingVortexWrite;
28use crate::footer::FileStatistics;
29use crate::segments::writer::BufferedSegmentSink;
30use crate::{Footer, MAGIC_BYTES, WriteStrategyBuilder};
31
32pub struct VortexWriteOptions {
38 session: VortexSession,
39 strategy: Arc<dyn LayoutStrategy>,
40 exclude_dtype: bool,
41 max_variable_length_statistics_size: usize,
42 file_statistics: Vec<Stat>,
43}
44
45pub trait WriteOptionsSessionExt: SessionExt {
46 fn write_options(&self) -> VortexWriteOptions {
48 VortexWriteOptions {
49 session: self.session(),
50 strategy: WriteStrategyBuilder::new().build(),
51 exclude_dtype: false,
52 file_statistics: PRUNING_STATS.to_vec(),
53 max_variable_length_statistics_size: 64,
54 }
55 }
56}
57impl<S: SessionExt> WriteOptionsSessionExt for S {}
58
59impl VortexWriteOptions {
60 pub fn new(session: VortexSession) -> Self {
62 VortexWriteOptions {
63 session,
64 strategy: WriteStrategyBuilder::new().build(),
65 exclude_dtype: false,
66 file_statistics: PRUNING_STATS.to_vec(),
67 max_variable_length_statistics_size: 64,
68 }
69 }
70
71 pub fn with_strategy(mut self, strategy: Arc<dyn LayoutStrategy>) -> Self {
73 self.strategy = strategy;
74 self
75 }
76
77 pub fn exclude_dtype(mut self) -> Self {
81 self.exclude_dtype = true;
82 self
83 }
84
85 pub fn with_file_statistics(mut self, file_statistics: Vec<Stat>) -> Self {
87 self.file_statistics = file_statistics;
88 self
89 }
90}
91
92impl VortexWriteOptions {
93 pub fn blocking<B: BlockingRuntime>(self, runtime: &B) -> BlockingWrite<'_, B> {
95 BlockingWrite {
96 options: self,
97 runtime,
98 }
99 }
100
101 pub async fn write<W: VortexWrite + Unpin, S: ArrayStream + Send + 'static>(
106 self,
107 write: W,
108 stream: S,
109 ) -> VortexResult<WriteSummary> {
110 self.write_internal(write, ArrayStreamExt::boxed(stream))
111 .await
112 }
113
114 async fn write_internal<W: VortexWrite + Unpin>(
115 self,
116 mut write: W,
117 stream: SendableArrayStream,
118 ) -> VortexResult<WriteSummary> {
119 let ctx = ArrayContext::empty();
121 let dtype = stream.dtype().clone();
122
123 let (mut ptr, eof) = SequenceId::root().split();
124
125 let stream = SequentialStreamAdapter::new(
126 dtype.clone(),
127 stream
128 .try_filter(|chunk| ready(!chunk.is_empty()))
129 .map(move |result| result.map(|chunk| (ptr.advance(), chunk))),
130 )
131 .sendable();
132 let (file_stats, stream) = accumulate_stats(
133 stream,
134 self.file_statistics.clone().into(),
135 self.max_variable_length_statistics_size,
136 );
137
138 write.write_all(ByteBuffer::copy_from(MAGIC_BYTES)).await?;
140 let mut position = MAGIC_BYTES.len() as u64;
141
142 let (send, recv) = kanal::bounded_async(1);
144
145 let segments = Arc::new(BufferedSegmentSink::new(send, position));
146
147 let ctx2 = ctx.clone();
150 let layout_fut = self.session.handle().spawn_nested(|h| async move {
151 let layout = self
152 .strategy
153 .write_stream(ctx2, segments.clone(), stream, eof, h)
154 .await?;
155 Ok::<_, VortexError>((layout, segments.segment_specs()))
156 });
157
158 let recv_stream = recv.into_stream();
160 pin_mut!(recv_stream);
161 while let Some(buffer) = recv_stream.next().await {
162 if buffer.is_empty() {
163 continue;
164 }
165 position += buffer.len() as u64;
166 write.write_all(buffer).await?;
167 }
168
169 let (layout, segment_specs) = layout_fut.await?;
170
171 let footer = Footer::new(
173 layout.clone(),
174 segment_specs,
175 if self.file_statistics.is_empty() {
176 None
177 } else {
178 Some(FileStatistics(file_stats.stats_sets().into()))
179 },
180 ctx,
181 );
182
183 let footer_buffers = footer
185 .clone()
186 .into_serializer()
187 .with_offset(position)
188 .with_exclude_dtype(self.exclude_dtype)
189 .serialize()?;
190 for buffer in footer_buffers {
191 position += buffer.len() as u64;
192 write.write_all(buffer).await?;
193 }
194
195 write.flush().await?;
196
197 Ok(WriteSummary {
198 footer,
199 size: position,
200 })
201 }
202
203 pub fn writer<'w, W: VortexWrite + Unpin + 'w>(self, write: W, dtype: DType) -> Writer<'w> {
205 let (arrays_send, arrays_recv) = kanal::bounded_async(1);
207
208 let arrays =
209 ArrayStreamExt::boxed(ArrayStreamAdapter::new(dtype, arrays_recv.into_stream()));
210
211 let write = CountingVortexWrite::new(write);
212 let bytes_written = write.counter();
213 let strategy = self.strategy.clone();
214 let future = self.write(write, arrays).boxed_local().fuse();
215
216 Writer {
217 arrays: Some(arrays_send),
218 future,
219 bytes_written,
220 strategy,
221 }
222 }
223}
224
225pub struct Writer<'w> {
227 arrays: Option<kanal::AsyncSender<VortexResult<ArrayRef>>>,
229 future: Fuse<LocalBoxFuture<'w, VortexResult<WriteSummary>>>,
231 bytes_written: Arc<AtomicU64>,
233 strategy: Arc<dyn LayoutStrategy>,
235}
236
237impl Writer<'_> {
238 pub async fn push(&mut self, chunk: ArrayRef) -> VortexResult<()> {
240 let arrays = self.arrays.clone().vortex_expect("missing arrays sender");
241 let send_fut = async move { arrays.send(Ok(chunk)).await }.fuse();
242 pin_mut!(send_fut);
243
244 select! {
247 result = send_fut => {
248 if result.is_err() {
250 return Err(self.handle_failed_task().await);
251 }
252 },
253 result = &mut self.future => {
254 match result {
258 Ok(_) => vortex_bail!("Internal error: writer future completed early"),
259 Err(e) => return Err(e),
260 }
261 }
262 }
263
264 Ok(())
265 }
266
267 pub async fn push_stream(&mut self, mut stream: SendableArrayStream) -> VortexResult<()> {
272 let arrays = self.arrays.clone().vortex_expect("missing arrays sender");
273 let stream_fut = async move {
274 while let Some(chunk) = stream.next().await {
275 arrays.send(chunk).await?;
276 }
277 Ok::<_, kanal::SendError>(())
278 }
279 .fuse();
280 pin_mut!(stream_fut);
281
282 select! {
285 result = stream_fut => {
286 if let Err(_send_err) = result {
287 return Err(self.handle_failed_task().await);
289 }
290 }
291
292 result = &mut self.future => {
293 match result {
297 Ok(_) => vortex_bail!("Internal error: writer future completed early"),
298 Err(e) => return Err(e),
299 }
300 }
301 }
302
303 Ok(())
304 }
305
306 pub fn bytes_written(&self) -> u64 {
308 self.bytes_written
309 .load(std::sync::atomic::Ordering::Relaxed)
310 }
311
312 pub fn buffered_bytes(&self) -> u64 {
314 self.strategy.buffered_bytes()
315 }
316
317 pub async fn finish(mut self) -> VortexResult<WriteSummary> {
320 drop(self.arrays.take());
322
323 self.future.await
325 }
326
327 async fn handle_failed_task(&mut self) -> VortexError {
329 match (&mut self.future).await {
330 Ok(_) => vortex_err!(
331 "Internal error: writer task completed successfully but write future finished early"
332 ),
333 Err(e) => e,
334 }
335 }
336}
337
338pub struct BlockingWrite<'rt, B: BlockingRuntime> {
340 options: VortexWriteOptions,
341 runtime: &'rt B,
342}
343
344impl<'rt, B: BlockingRuntime> BlockingWrite<'rt, B> {
345 pub fn write<W: Write + Unpin>(
347 self,
348 write: W,
349 iter: impl ArrayIterator + Send + 'static,
350 ) -> VortexResult<WriteSummary> {
351 self.runtime.block_on(async move {
352 self.options
353 .write(BlockingWriteAdapter(write), iter.into_array_stream())
354 .await
355 })
356 }
357
358 pub fn writer<'w, W: Write + Unpin + 'w>(
359 self,
360 write: W,
361 dtype: DType,
362 ) -> BlockingWriter<'rt, 'w, B> {
363 BlockingWriter {
364 writer: self.options.writer(BlockingWriteAdapter(write), dtype),
365 runtime: self.runtime,
366 }
367 }
368}
369
370pub struct BlockingWriter<'rt, 'w, B: BlockingRuntime> {
372 runtime: &'rt B,
373 writer: Writer<'w>,
374}
375
376impl<B: BlockingRuntime> BlockingWriter<'_, '_, B> {
377 pub fn push(&mut self, chunk: ArrayRef) -> VortexResult<()> {
378 self.runtime.block_on(self.writer.push(chunk))
379 }
380
381 pub fn bytes_written(&self) -> u64 {
382 self.writer.bytes_written()
383 }
384
385 pub fn buffered_bytes(&self) -> u64 {
386 self.writer.buffered_bytes()
387 }
388
389 pub fn finish(self) -> VortexResult<WriteSummary> {
390 self.runtime.block_on(self.writer.finish())
391 }
392}
393
/// Adapts a synchronous [`std::io::Write`] into a [`VortexWrite`].
///
/// Every operation runs inline on the calling thread.
struct BlockingWriteAdapter<W>(W);
396
397impl<W: Write + Unpin> VortexWrite for BlockingWriteAdapter<W> {
398 async fn write_all<B: IoBuf>(&mut self, buffer: B) -> io::Result<B> {
399 self.0.write_all(buffer.as_slice())?;
400 Ok(buffer)
401 }
402
403 fn flush(&mut self) -> impl Future<Output = io::Result<()>> {
404 ready(self.0.flush())
405 }
406
407 fn shutdown(&mut self) -> impl Future<Output = io::Result<()>> {
408 ready(Ok(()))
409 }
410}
411
412pub struct WriteSummary {
413 footer: Footer,
414 size: u64,
415 }
417
418impl WriteSummary {
419 pub fn footer(&self) -> &Footer {
421 &self.footer
422 }
423
424 pub fn size(&self) -> u64 {
426 self.size
427 }
428
429 pub fn row_count(&self) -> u64 {
431 self.footer.row_count()
432 }
433}