1use std::io;
5use std::io::Write;
6use std::sync::Arc;
7use std::sync::atomic::AtomicU64;
8
9use futures::FutureExt;
10use futures::StreamExt;
11use futures::TryStreamExt;
12use futures::future::Fuse;
13use futures::future::LocalBoxFuture;
14use futures::future::ready;
15use futures::pin_mut;
16use futures::select;
17use itertools::Itertools;
18use vortex_array::ArrayContext;
19use vortex_array::ArrayRef;
20use vortex_array::dtype::DType;
21use vortex_array::expr::stats::Stat;
22use vortex_array::iter::ArrayIterator;
23use vortex_array::iter::ArrayIteratorExt;
24use vortex_array::session::ArraySessionExt;
25use vortex_array::stats::PRUNING_STATS;
26use vortex_array::stream::ArrayStream;
27use vortex_array::stream::ArrayStreamAdapter;
28use vortex_array::stream::ArrayStreamExt;
29use vortex_array::stream::SendableArrayStream;
30use vortex_buffer::ByteBuffer;
31use vortex_error::VortexError;
32use vortex_error::VortexExpect;
33use vortex_error::VortexResult;
34use vortex_error::vortex_bail;
35use vortex_error::vortex_err;
36use vortex_io::IoBuf;
37use vortex_io::VortexWrite;
38use vortex_io::kanal_ext::KanalExt;
39use vortex_io::runtime::BlockingRuntime;
40use vortex_io::session::RuntimeSessionExt;
41use vortex_layout::LayoutStrategy;
42use vortex_layout::layouts::file_stats::accumulate_stats;
43use vortex_layout::sequence::SequenceId;
44use vortex_layout::sequence::SequentialStreamAdapter;
45use vortex_layout::sequence::SequentialStreamExt;
46use vortex_session::SessionExt;
47use vortex_session::VortexSession;
48use vortex_session::registry::ReadContext;
49
50use crate::Footer;
51use crate::MAGIC_BYTES;
52use crate::WriteStrategyBuilder;
53use crate::counting::CountingVortexWrite;
54use crate::footer::FileStatistics;
55use crate::segments::writer::BufferedSegmentSink;
56
/// Options controlling how a stream of arrays is written out as a Vortex file.
///
/// Values are created with [`VortexWriteOptions::new`] or
/// [`WriteOptionsSessionExt::write_options`] and adjusted with the builder-style methods.
pub struct VortexWriteOptions {
    /// Session supplying the array registry and the runtime handle used while writing.
    session: VortexSession,
    /// Layout strategy that consumes the chunk stream and produces the layout and segments.
    strategy: Arc<dyn LayoutStrategy>,
    /// Passed to the footer serializer's `with_exclude_dtype` option.
    exclude_dtype: bool,
    /// Limit handed to `accumulate_stats`; presumably a size cap applied to variable-length
    /// statistics values (TODO confirm). Defaults to 64.
    max_variable_length_statistics_size: usize,
    /// Statistics accumulated at file level; when empty, the footer carries no file statistics.
    file_statistics: Vec<Stat>,
}
69
70pub trait WriteOptionsSessionExt: SessionExt {
71 fn write_options(&self) -> VortexWriteOptions {
73 let session = self.session();
74 VortexWriteOptions {
75 strategy: WriteStrategyBuilder::default().build(),
76 session,
77 exclude_dtype: false,
78 file_statistics: PRUNING_STATS.to_vec(),
79 max_variable_length_statistics_size: 64,
80 }
81 }
82}
// Blanket implementation: every type implementing `SessionExt` gets `write_options()`.
impl<S: SessionExt> WriteOptionsSessionExt for S {}
84
85impl VortexWriteOptions {
86 pub fn new(session: VortexSession) -> Self {
88 VortexWriteOptions {
89 strategy: WriteStrategyBuilder::default().build(),
90 session,
91 exclude_dtype: false,
92 file_statistics: PRUNING_STATS.to_vec(),
93 max_variable_length_statistics_size: 64,
94 }
95 }
96
97 pub fn with_strategy(mut self, strategy: Arc<dyn LayoutStrategy>) -> Self {
99 self.strategy = strategy;
100 self
101 }
102
103 pub fn exclude_dtype(mut self) -> Self {
107 self.exclude_dtype = true;
108 self
109 }
110
111 pub fn with_file_statistics(mut self, file_statistics: Vec<Stat>) -> Self {
113 self.file_statistics = file_statistics;
114 self
115 }
116}
117
118impl VortexWriteOptions {
119 pub fn blocking<B: BlockingRuntime>(self, runtime: &B) -> BlockingWrite<'_, B> {
121 BlockingWrite {
122 options: self,
123 runtime,
124 }
125 }
126
127 pub async fn write<W: VortexWrite + Unpin, S: ArrayStream + Send + 'static>(
132 self,
133 write: W,
134 stream: S,
135 ) -> VortexResult<WriteSummary> {
136 self.write_internal(write, ArrayStreamExt::boxed(stream))
137 .await
138 }
139
140 async fn write_internal<W: VortexWrite + Unpin>(
141 self,
142 mut write: W,
143 stream: SendableArrayStream,
144 ) -> VortexResult<WriteSummary> {
145 let ctx = ArrayContext::new(self.session.arrays().registry().ids().sorted().collect())
151 .with_registry(self.session.arrays().registry().clone());
153 let dtype = stream.dtype().clone();
154
155 let (mut ptr, eof) = SequenceId::root().split();
156
157 let stream = SequentialStreamAdapter::new(
158 dtype.clone(),
159 stream
160 .try_filter(|chunk| ready(!chunk.is_empty()))
161 .map(move |result| result.map(|chunk| (ptr.advance(), chunk))),
162 )
163 .sendable();
164 let (file_stats, stream) = accumulate_stats(
165 stream,
166 self.file_statistics.clone().into(),
167 self.max_variable_length_statistics_size,
168 );
169
170 write.write_all(ByteBuffer::copy_from(MAGIC_BYTES)).await?;
172 let mut position = MAGIC_BYTES.len() as u64;
173
174 let (send, recv) = kanal::bounded_async(1);
176
177 let segments = Arc::new(BufferedSegmentSink::new(send, position));
178
179 let ctx2 = ctx.clone();
182 let layout_fut = self.session.handle().spawn_nested(|h| async move {
183 let layout = self
184 .strategy
185 .write_stream(ctx2, segments.clone(), stream, eof, h)
186 .await?;
187 Ok::<_, VortexError>((layout, segments.segment_specs()))
188 });
189
190 let recv_stream = recv.into_stream();
192 pin_mut!(recv_stream);
193 while let Some(buffer) = recv_stream.next().await {
194 if buffer.is_empty() {
195 continue;
196 }
197 position += buffer.len() as u64;
198 write.write_all(buffer).await?;
199 }
200
201 let (layout, segment_specs) = layout_fut.await?;
202
203 let mut footer = Footer::new(
205 layout.clone(),
206 segment_specs,
207 if self.file_statistics.is_empty() {
208 None
209 } else {
210 Some(FileStatistics::new_with_dtype(
211 file_stats.stats_sets().into(),
212 &dtype,
213 ))
214 },
215 ReadContext::new(ctx.to_ids()),
216 );
217
218 let footer_buffers = footer
220 .clone()
221 .into_serializer()
222 .with_offset(position)
223 .with_exclude_dtype(self.exclude_dtype)
224 .serialize()?;
225
226 footer = footer.with_approx_byte_size(footer_buffers.iter().map(|b| b.len()).sum());
229
230 for buffer in footer_buffers {
231 position += buffer.len() as u64;
232 write.write_all(buffer).await?;
233 }
234
235 write.flush().await?;
236
237 Ok(WriteSummary {
238 footer,
239 size: position,
240 })
241 }
242
243 pub fn writer<'w, W: VortexWrite + Unpin + 'w>(self, write: W, dtype: DType) -> Writer<'w> {
245 let (arrays_send, arrays_recv) = kanal::bounded_async(1);
247
248 let arrays =
249 ArrayStreamExt::boxed(ArrayStreamAdapter::new(dtype, arrays_recv.into_stream()));
250
251 let write = CountingVortexWrite::new(write);
252 let bytes_written = write.counter();
253 let strategy = self.strategy.clone();
254 let future = self.write(write, arrays).boxed_local().fuse();
255
256 Writer {
257 arrays: Some(arrays_send),
258 future,
259 bytes_written,
260 strategy,
261 }
262 }
263}
264
/// Push-based handle for writing a Vortex file chunk by chunk.
///
/// Created by [`VortexWriteOptions::writer`]. Chunks are sent over a channel to the write
/// future, which this type owns and polls from its own methods.
pub struct Writer<'w> {
    /// Sending half of the chunk channel; `finish` takes and drops it to end the input.
    arrays: Option<kanal::AsyncSender<VortexResult<ArrayRef>>>,
    /// The fused, boxed future that performs the actual write.
    future: Fuse<LocalBoxFuture<'w, VortexResult<WriteSummary>>>,
    /// Counter obtained from the `CountingVortexWrite` wrapper around the sink.
    bytes_written: Arc<AtomicU64>,
    /// Layout strategy in use, kept so `buffered_bytes` can query it.
    strategy: Arc<dyn LayoutStrategy>,
}
276
277impl Writer<'_> {
278 pub async fn push(&mut self, chunk: ArrayRef) -> VortexResult<()> {
280 let arrays = self.arrays.clone().vortex_expect("missing arrays sender");
281 let send_fut = async move { arrays.send(Ok(chunk)).await }.fuse();
282 pin_mut!(send_fut);
283
284 select! {
287 result = send_fut => {
288 if result.is_err() {
290 return Err(self.handle_failed_task().await);
291 }
292 },
293 result = &mut self.future => {
294 match result {
298 Ok(_) => vortex_bail!("Internal error: writer future completed early"),
299 Err(e) => return Err(e),
300 }
301 }
302 }
303
304 Ok(())
305 }
306
307 pub async fn push_stream(&mut self, mut stream: SendableArrayStream) -> VortexResult<()> {
312 let arrays = self.arrays.clone().vortex_expect("missing arrays sender");
313 let stream_fut = async move {
314 while let Some(chunk) = stream.next().await {
315 arrays.send(chunk).await?;
316 }
317 Ok::<_, kanal::SendError>(())
318 }
319 .fuse();
320 pin_mut!(stream_fut);
321
322 select! {
325 result = stream_fut => {
326 if let Err(_send_err) = result {
327 return Err(self.handle_failed_task().await);
329 }
330 }
331
332 result = &mut self.future => {
333 match result {
337 Ok(_) => vortex_bail!("Internal error: writer future completed early"),
338 Err(e) => return Err(e),
339 }
340 }
341 }
342
343 Ok(())
344 }
345
346 pub fn bytes_written(&self) -> u64 {
348 self.bytes_written
349 .load(std::sync::atomic::Ordering::Relaxed)
350 }
351
352 pub fn buffered_bytes(&self) -> u64 {
354 self.strategy.buffered_bytes()
355 }
356
357 pub async fn finish(mut self) -> VortexResult<WriteSummary> {
360 drop(self.arrays.take());
362
363 self.future.await
365 }
366
367 async fn handle_failed_task(&mut self) -> VortexError {
369 match (&mut self.future).await {
370 Ok(_) => vortex_err!(
371 "Internal error: writer task completed successfully but write future finished early"
372 ),
373 Err(e) => e,
374 }
375 }
376}
377
/// Blocking counterpart of [`VortexWriteOptions`], created by [`VortexWriteOptions::blocking`].
pub struct BlockingWrite<'rt, B: BlockingRuntime> {
    /// The write options to apply.
    options: VortexWriteOptions,
    /// Runtime whose `block_on` drives the async writer.
    runtime: &'rt B,
}
383
384impl<'rt, B: BlockingRuntime> BlockingWrite<'rt, B> {
385 pub fn write<W: Write + Unpin>(
387 self,
388 write: W,
389 iter: impl ArrayIterator + Send + 'static,
390 ) -> VortexResult<WriteSummary> {
391 self.runtime.block_on(async move {
392 self.options
393 .write(BlockingWriteAdapter(write), iter.into_array_stream())
394 .await
395 })
396 }
397
398 pub fn writer<'w, W: Write + Unpin + 'w>(
399 self,
400 write: W,
401 dtype: DType,
402 ) -> BlockingWriter<'rt, 'w, B> {
403 BlockingWriter {
404 writer: self.options.writer(BlockingWriteAdapter(write), dtype),
405 runtime: self.runtime,
406 }
407 }
408}
409
/// Blocking wrapper around [`Writer`], created by [`BlockingWrite::writer`].
pub struct BlockingWriter<'rt, 'w, B: BlockingRuntime> {
    /// Runtime whose `block_on` drives the wrapped writer's futures.
    runtime: &'rt B,
    /// The async writer being driven.
    writer: Writer<'w>,
}
415
416impl<B: BlockingRuntime> BlockingWriter<'_, '_, B> {
417 pub fn push(&mut self, chunk: ArrayRef) -> VortexResult<()> {
418 self.runtime.block_on(self.writer.push(chunk))
419 }
420
421 pub fn bytes_written(&self) -> u64 {
422 self.writer.bytes_written()
423 }
424
425 pub fn buffered_bytes(&self) -> u64 {
426 self.writer.buffered_bytes()
427 }
428
429 pub fn finish(self) -> VortexResult<WriteSummary> {
430 self.runtime.block_on(self.writer.finish())
431 }
432}
433
/// Adapter exposing a synchronous `std::io::Write` sink through the `VortexWrite` trait.
struct BlockingWriteAdapter<W>(W);
436
437impl<W: Write + Unpin> VortexWrite for BlockingWriteAdapter<W> {
438 async fn write_all<B: IoBuf>(&mut self, buffer: B) -> io::Result<B> {
439 self.0.write_all(buffer.as_slice())?;
440 Ok(buffer)
441 }
442
443 fn flush(&mut self) -> impl Future<Output = io::Result<()>> {
444 ready(self.0.flush())
445 }
446
447 fn shutdown(&mut self) -> impl Future<Output = io::Result<()>> {
448 ready(Ok(()))
449 }
450}
451
/// Result of a completed file write.
pub struct WriteSummary {
    /// Footer assembled for the written file.
    footer: Footer,
    /// Total bytes written: magic bytes, segment buffers, and footer buffers.
    size: u64,
}
457
458impl WriteSummary {
459 pub fn footer(&self) -> &Footer {
461 &self.footer
462 }
463
464 pub fn size(&self) -> u64 {
466 self.size
467 }
468
469 pub fn row_count(&self) -> u64 {
471 self.footer.row_count()
472 }
473}