1use std::io;
5use std::io::Write;
6use std::sync::Arc;
7use std::sync::atomic::AtomicU64;
8
9use futures::FutureExt;
10use futures::StreamExt;
11use futures::TryStreamExt;
12use futures::future::Fuse;
13use futures::future::LocalBoxFuture;
14use futures::future::ready;
15use futures::pin_mut;
16use futures::select;
17use vortex_array::ArrayContext;
18use vortex_array::ArrayRef;
19use vortex_array::expr::stats::Stat;
20use vortex_array::iter::ArrayIterator;
21use vortex_array::iter::ArrayIteratorExt;
22use vortex_array::stats::PRUNING_STATS;
23use vortex_array::stream::ArrayStream;
24use vortex_array::stream::ArrayStreamAdapter;
25use vortex_array::stream::ArrayStreamExt;
26use vortex_array::stream::SendableArrayStream;
27use vortex_buffer::ByteBuffer;
28use vortex_dtype::DType;
29use vortex_error::VortexError;
30use vortex_error::VortexExpect;
31use vortex_error::VortexResult;
32use vortex_error::vortex_bail;
33use vortex_error::vortex_err;
34use vortex_io::IoBuf;
35use vortex_io::VortexWrite;
36use vortex_io::kanal_ext::KanalExt;
37use vortex_io::runtime::BlockingRuntime;
38use vortex_io::session::RuntimeSessionExt;
39use vortex_layout::LayoutStrategy;
40use vortex_layout::layouts::file_stats::accumulate_stats;
41use vortex_layout::sequence::SequenceId;
42use vortex_layout::sequence::SequentialStreamAdapter;
43use vortex_layout::sequence::SequentialStreamExt;
44use vortex_session::SessionExt;
45use vortex_session::VortexSession;
46
47use crate::Footer;
48use crate::MAGIC_BYTES;
49use crate::WriteStrategyBuilder;
50use crate::counting::CountingVortexWrite;
51use crate::footer::FileStatistics;
52use crate::segments::writer::BufferedSegmentSink;
53
/// Configuration for writing a Vortex file.
///
/// Created with [`VortexWriteOptions::new`] (or `write_options()` on a session), adjusted with the
/// builder-style setters, and consumed by `write`, `writer` or `blocking`.
pub struct VortexWriteOptions {
    /// Session whose runtime handle is used to spawn the layout-writing task.
    session: VortexSession,
    /// Layout strategy that consumes the chunk stream and produces the file layout.
    strategy: Arc<dyn LayoutStrategy>,
    /// Forwarded to the footer serializer's `with_exclude_dtype`; presumably omits the dtype
    /// from the serialized footer when `true`.
    exclude_dtype: bool,
    /// Size limit handed to file-statistics accumulation for variable-length statistics.
    max_variable_length_statistics_size: usize,
    /// Statistics accumulated over the whole file; when empty, no file statistics are stored
    /// in the footer.
    file_statistics: Vec<Stat>,
}
66
67pub trait WriteOptionsSessionExt: SessionExt {
68 fn write_options(&self) -> VortexWriteOptions {
70 VortexWriteOptions {
71 session: self.session(),
72 strategy: WriteStrategyBuilder::new().build(),
73 exclude_dtype: false,
74 file_statistics: PRUNING_STATS.to_vec(),
75 max_variable_length_statistics_size: 64,
76 }
77 }
78}
79impl<S: SessionExt> WriteOptionsSessionExt for S {}
80
81impl VortexWriteOptions {
82 pub fn new(session: VortexSession) -> Self {
84 VortexWriteOptions {
85 session,
86 strategy: WriteStrategyBuilder::new().build(),
87 exclude_dtype: false,
88 file_statistics: PRUNING_STATS.to_vec(),
89 max_variable_length_statistics_size: 64,
90 }
91 }
92
93 pub fn with_strategy(mut self, strategy: Arc<dyn LayoutStrategy>) -> Self {
95 self.strategy = strategy;
96 self
97 }
98
99 pub fn exclude_dtype(mut self) -> Self {
103 self.exclude_dtype = true;
104 self
105 }
106
107 pub fn with_file_statistics(mut self, file_statistics: Vec<Stat>) -> Self {
109 self.file_statistics = file_statistics;
110 self
111 }
112}
113
114impl VortexWriteOptions {
115 pub fn blocking<B: BlockingRuntime>(self, runtime: &B) -> BlockingWrite<'_, B> {
117 BlockingWrite {
118 options: self,
119 runtime,
120 }
121 }
122
123 pub async fn write<W: VortexWrite + Unpin, S: ArrayStream + Send + 'static>(
128 self,
129 write: W,
130 stream: S,
131 ) -> VortexResult<WriteSummary> {
132 self.write_internal(write, ArrayStreamExt::boxed(stream))
133 .await
134 }
135
136 async fn write_internal<W: VortexWrite + Unpin>(
137 self,
138 mut write: W,
139 stream: SendableArrayStream,
140 ) -> VortexResult<WriteSummary> {
141 let ctx = ArrayContext::empty();
143 let dtype = stream.dtype().clone();
144
145 let (mut ptr, eof) = SequenceId::root().split();
146
147 let stream = SequentialStreamAdapter::new(
148 dtype.clone(),
149 stream
150 .try_filter(|chunk| ready(!chunk.is_empty()))
151 .map(move |result| result.map(|chunk| (ptr.advance(), chunk))),
152 )
153 .sendable();
154 let (file_stats, stream) = accumulate_stats(
155 stream,
156 self.file_statistics.clone().into(),
157 self.max_variable_length_statistics_size,
158 );
159
160 write.write_all(ByteBuffer::copy_from(MAGIC_BYTES)).await?;
162 let mut position = MAGIC_BYTES.len() as u64;
163
164 let (send, recv) = kanal::bounded_async(1);
166
167 let segments = Arc::new(BufferedSegmentSink::new(send, position));
168
169 let ctx2 = ctx.clone();
172 let layout_fut = self.session.handle().spawn_nested(|h| async move {
173 let layout = self
174 .strategy
175 .write_stream(ctx2, segments.clone(), stream, eof, h)
176 .await?;
177 Ok::<_, VortexError>((layout, segments.segment_specs()))
178 });
179
180 let recv_stream = recv.into_stream();
182 pin_mut!(recv_stream);
183 while let Some(buffer) = recv_stream.next().await {
184 if buffer.is_empty() {
185 continue;
186 }
187 position += buffer.len() as u64;
188 write.write_all(buffer).await?;
189 }
190
191 let (layout, segment_specs) = layout_fut.await?;
192
193 let footer = Footer::new(
195 layout.clone(),
196 segment_specs,
197 if self.file_statistics.is_empty() {
198 None
199 } else {
200 Some(FileStatistics(file_stats.stats_sets().into()))
201 },
202 ctx,
203 );
204
205 let footer_buffers = footer
207 .clone()
208 .into_serializer()
209 .with_offset(position)
210 .with_exclude_dtype(self.exclude_dtype)
211 .serialize()?;
212 for buffer in footer_buffers {
213 position += buffer.len() as u64;
214 write.write_all(buffer).await?;
215 }
216
217 write.flush().await?;
218
219 Ok(WriteSummary {
220 footer,
221 size: position,
222 })
223 }
224
225 pub fn writer<'w, W: VortexWrite + Unpin + 'w>(self, write: W, dtype: DType) -> Writer<'w> {
227 let (arrays_send, arrays_recv) = kanal::bounded_async(1);
229
230 let arrays =
231 ArrayStreamExt::boxed(ArrayStreamAdapter::new(dtype, arrays_recv.into_stream()));
232
233 let write = CountingVortexWrite::new(write);
234 let bytes_written = write.counter();
235 let strategy = self.strategy.clone();
236 let future = self.write(write, arrays).boxed_local().fuse();
237
238 Writer {
239 arrays: Some(arrays_send),
240 future,
241 bytes_written,
242 strategy,
243 }
244 }
245}
246
/// Incremental file writer returned by `VortexWriteOptions::writer`.
///
/// Chunks are sent over a channel to a boxed write future. That future is polled from this
/// writer's async methods. Call `finish` to complete the file and obtain the summary.
pub struct Writer<'w> {
    /// Sender feeding chunks to the write future; `finish` takes it (leaving `None`) and drops it.
    arrays: Option<kanal::AsyncSender<VortexResult<ArrayRef>>>,
    /// Fused, boxed future performing the whole file write.
    future: Fuse<LocalBoxFuture<'w, VortexResult<WriteSummary>>>,
    /// Counter shared with the counting wrapper around the underlying sink.
    bytes_written: Arc<AtomicU64>,
    /// Layout strategy in use; queried by `buffered_bytes`.
    strategy: Arc<dyn LayoutStrategy>,
}
258
259impl Writer<'_> {
260 pub async fn push(&mut self, chunk: ArrayRef) -> VortexResult<()> {
262 let arrays = self.arrays.clone().vortex_expect("missing arrays sender");
263 let send_fut = async move { arrays.send(Ok(chunk)).await }.fuse();
264 pin_mut!(send_fut);
265
266 select! {
269 result = send_fut => {
270 if result.is_err() {
272 return Err(self.handle_failed_task().await);
273 }
274 },
275 result = &mut self.future => {
276 match result {
280 Ok(_) => vortex_bail!("Internal error: writer future completed early"),
281 Err(e) => return Err(e),
282 }
283 }
284 }
285
286 Ok(())
287 }
288
289 pub async fn push_stream(&mut self, mut stream: SendableArrayStream) -> VortexResult<()> {
294 let arrays = self.arrays.clone().vortex_expect("missing arrays sender");
295 let stream_fut = async move {
296 while let Some(chunk) = stream.next().await {
297 arrays.send(chunk).await?;
298 }
299 Ok::<_, kanal::SendError>(())
300 }
301 .fuse();
302 pin_mut!(stream_fut);
303
304 select! {
307 result = stream_fut => {
308 if let Err(_send_err) = result {
309 return Err(self.handle_failed_task().await);
311 }
312 }
313
314 result = &mut self.future => {
315 match result {
319 Ok(_) => vortex_bail!("Internal error: writer future completed early"),
320 Err(e) => return Err(e),
321 }
322 }
323 }
324
325 Ok(())
326 }
327
328 pub fn bytes_written(&self) -> u64 {
330 self.bytes_written
331 .load(std::sync::atomic::Ordering::Relaxed)
332 }
333
334 pub fn buffered_bytes(&self) -> u64 {
336 self.strategy.buffered_bytes()
337 }
338
339 pub async fn finish(mut self) -> VortexResult<WriteSummary> {
342 drop(self.arrays.take());
344
345 self.future.await
347 }
348
349 async fn handle_failed_task(&mut self) -> VortexError {
351 match (&mut self.future).await {
352 Ok(_) => vortex_err!(
353 "Internal error: writer task completed successfully but write future finished early"
354 ),
355 Err(e) => e,
356 }
357 }
358}
359
/// Blocking front-end over [`VortexWriteOptions`], created by `VortexWriteOptions::blocking`.
///
/// Each operation drives the async write path with the borrowed runtime's `block_on`.
pub struct BlockingWrite<'rt, B: BlockingRuntime> {
    /// Options used for the underlying async write.
    options: VortexWriteOptions,
    /// Runtime used to block on the async write futures.
    runtime: &'rt B,
}
365
366impl<'rt, B: BlockingRuntime> BlockingWrite<'rt, B> {
367 pub fn write<W: Write + Unpin>(
369 self,
370 write: W,
371 iter: impl ArrayIterator + Send + 'static,
372 ) -> VortexResult<WriteSummary> {
373 self.runtime.block_on(async move {
374 self.options
375 .write(BlockingWriteAdapter(write), iter.into_array_stream())
376 .await
377 })
378 }
379
380 pub fn writer<'w, W: Write + Unpin + 'w>(
381 self,
382 write: W,
383 dtype: DType,
384 ) -> BlockingWriter<'rt, 'w, B> {
385 BlockingWriter {
386 writer: self.options.writer(BlockingWriteAdapter(write), dtype),
387 runtime: self.runtime,
388 }
389 }
390}
391
/// Blocking counterpart of [`Writer`], created by `BlockingWrite::writer`.
pub struct BlockingWriter<'rt, 'w, B: BlockingRuntime> {
    /// Runtime used to block on each async writer operation.
    runtime: &'rt B,
    /// The async writer being driven.
    writer: Writer<'w>,
}
397
398impl<B: BlockingRuntime> BlockingWriter<'_, '_, B> {
399 pub fn push(&mut self, chunk: ArrayRef) -> VortexResult<()> {
400 self.runtime.block_on(self.writer.push(chunk))
401 }
402
403 pub fn bytes_written(&self) -> u64 {
404 self.writer.bytes_written()
405 }
406
407 pub fn buffered_bytes(&self) -> u64 {
408 self.writer.buffered_bytes()
409 }
410
411 pub fn finish(self) -> VortexResult<WriteSummary> {
412 self.runtime.block_on(self.writer.finish())
413 }
414}
415
416struct BlockingWriteAdapter<W>(W);
418
419impl<W: Write + Unpin> VortexWrite for BlockingWriteAdapter<W> {
420 async fn write_all<B: IoBuf>(&mut self, buffer: B) -> io::Result<B> {
421 self.0.write_all(buffer.as_slice())?;
422 Ok(buffer)
423 }
424
425 fn flush(&mut self) -> impl Future<Output = io::Result<()>> {
426 ready(self.0.flush())
427 }
428
429 fn shutdown(&mut self) -> impl Future<Output = io::Result<()>> {
430 ready(Ok(()))
431 }
432}
433
/// Result of a completed file write.
pub struct WriteSummary {
    /// Footer that was serialized at the end of the file.
    footer: Footer,
    /// Total number of bytes written: magic bytes, segment data and footer.
    size: u64,
}
439
440impl WriteSummary {
441 pub fn footer(&self) -> &Footer {
443 &self.footer
444 }
445
446 pub fn size(&self) -> u64 {
448 self.size
449 }
450
451 pub fn row_count(&self) -> u64 {
453 self.footer.row_count()
454 }
455}