1use std::io;
5use std::io::Write;
6use std::sync::Arc;
7use std::sync::atomic::AtomicU64;
8
9use futures::FutureExt;
10use futures::StreamExt;
11use futures::TryStreamExt;
12use futures::future::Fuse;
13use futures::future::LocalBoxFuture;
14use futures::future::ready;
15use futures::pin_mut;
16use futures::select;
17use itertools::Itertools;
18use vortex_array::ArrayContext;
19use vortex_array::ArrayRef;
20use vortex_array::dtype::DType;
21use vortex_array::expr::stats::Stat;
22use vortex_array::iter::ArrayIterator;
23use vortex_array::iter::ArrayIteratorExt;
24use vortex_array::session::ArraySessionExt;
25use vortex_array::stats::PRUNING_STATS;
26use vortex_array::stream::ArrayStream;
27use vortex_array::stream::ArrayStreamAdapter;
28use vortex_array::stream::ArrayStreamExt;
29use vortex_array::stream::SendableArrayStream;
30use vortex_buffer::ByteBuffer;
31use vortex_error::VortexError;
32use vortex_error::VortexExpect;
33use vortex_error::VortexResult;
34use vortex_error::vortex_bail;
35use vortex_error::vortex_err;
36use vortex_io::IoBuf;
37use vortex_io::VortexWrite;
38use vortex_io::kanal_ext::KanalExt;
39use vortex_io::runtime::BlockingRuntime;
40use vortex_io::session::RuntimeSessionExt;
41use vortex_layout::LayoutStrategy;
42use vortex_layout::layouts::file_stats::accumulate_stats;
43use vortex_layout::sequence::SequenceId;
44use vortex_layout::sequence::SequentialStreamAdapter;
45use vortex_layout::sequence::SequentialStreamExt;
46use vortex_session::SessionExt;
47use vortex_session::VortexSession;
48
49use crate::Footer;
50use crate::MAGIC_BYTES;
51use crate::WriteStrategyBuilder;
52use crate::counting::CountingVortexWrite;
53use crate::footer::FileStatistics;
54use crate::segments::writer::BufferedSegmentSink;
55
56pub struct VortexWriteOptions {
62 session: VortexSession,
63 strategy: Arc<dyn LayoutStrategy>,
64 exclude_dtype: bool,
65 max_variable_length_statistics_size: usize,
66 file_statistics: Vec<Stat>,
67}
68
69pub trait WriteOptionsSessionExt: SessionExt {
70 fn write_options(&self) -> VortexWriteOptions {
72 let session = self.session();
73 VortexWriteOptions {
74 strategy: WriteStrategyBuilder::default().build(),
75 session,
76 exclude_dtype: false,
77 file_statistics: PRUNING_STATS.to_vec(),
78 max_variable_length_statistics_size: 64,
79 }
80 }
81}
82impl<S: SessionExt> WriteOptionsSessionExt for S {}
83
84impl VortexWriteOptions {
85 pub fn new(session: VortexSession) -> Self {
87 VortexWriteOptions {
88 strategy: WriteStrategyBuilder::default().build(),
89 session,
90 exclude_dtype: false,
91 file_statistics: PRUNING_STATS.to_vec(),
92 max_variable_length_statistics_size: 64,
93 }
94 }
95
96 pub fn with_strategy(mut self, strategy: Arc<dyn LayoutStrategy>) -> Self {
98 self.strategy = strategy;
99 self
100 }
101
102 pub fn exclude_dtype(mut self) -> Self {
106 self.exclude_dtype = true;
107 self
108 }
109
110 pub fn with_file_statistics(mut self, file_statistics: Vec<Stat>) -> Self {
112 self.file_statistics = file_statistics;
113 self
114 }
115}
116
117impl VortexWriteOptions {
118 pub fn blocking<B: BlockingRuntime>(self, runtime: &B) -> BlockingWrite<'_, B> {
120 BlockingWrite {
121 options: self,
122 runtime,
123 }
124 }
125
126 pub async fn write<W: VortexWrite + Unpin, S: ArrayStream + Send + 'static>(
131 self,
132 write: W,
133 stream: S,
134 ) -> VortexResult<WriteSummary> {
135 self.write_internal(write, ArrayStreamExt::boxed(stream))
136 .await
137 }
138
139 async fn write_internal<W: VortexWrite + Unpin>(
140 self,
141 mut write: W,
142 stream: SendableArrayStream,
143 ) -> VortexResult<WriteSummary> {
144 let ctx = ArrayContext::new(self.session.arrays().registry().ids().sorted().collect())
150 .with_registry(self.session.arrays().registry().clone());
152 let dtype = stream.dtype().clone();
153
154 let (mut ptr, eof) = SequenceId::root().split();
155
156 let stream = SequentialStreamAdapter::new(
157 dtype.clone(),
158 stream
159 .try_filter(|chunk| ready(!chunk.is_empty()))
160 .map(move |result| result.map(|chunk| (ptr.advance(), chunk))),
161 )
162 .sendable();
163 let (file_stats, stream) = accumulate_stats(
164 stream,
165 self.file_statistics.clone().into(),
166 self.max_variable_length_statistics_size,
167 );
168
169 write.write_all(ByteBuffer::copy_from(MAGIC_BYTES)).await?;
171 let mut position = MAGIC_BYTES.len() as u64;
172
173 let (send, recv) = kanal::bounded_async(1);
175
176 let segments = Arc::new(BufferedSegmentSink::new(send, position));
177
178 let ctx2 = ctx.clone();
181 let layout_fut = self.session.handle().spawn_nested(|h| async move {
182 let layout = self
183 .strategy
184 .write_stream(ctx2, segments.clone(), stream, eof, h)
185 .await?;
186 Ok::<_, VortexError>((layout, segments.segment_specs()))
187 });
188
189 let recv_stream = recv.into_stream();
191 pin_mut!(recv_stream);
192 while let Some(buffer) = recv_stream.next().await {
193 if buffer.is_empty() {
194 continue;
195 }
196 position += buffer.len() as u64;
197 write.write_all(buffer).await?;
198 }
199
200 let (layout, segment_specs) = layout_fut.await?;
201
202 let mut footer = Footer::new(
204 layout.clone(),
205 segment_specs,
206 if self.file_statistics.is_empty() {
207 None
208 } else {
209 Some(FileStatistics::new_with_dtype(
210 file_stats.stats_sets().into(),
211 &dtype,
212 ))
213 },
214 ctx,
215 );
216
217 let footer_buffers = footer
219 .clone()
220 .into_serializer()
221 .with_offset(position)
222 .with_exclude_dtype(self.exclude_dtype)
223 .serialize()?;
224
225 footer = footer.with_approx_byte_size(footer_buffers.iter().map(|b| b.len()).sum());
228
229 for buffer in footer_buffers {
230 position += buffer.len() as u64;
231 write.write_all(buffer).await?;
232 }
233
234 write.flush().await?;
235
236 Ok(WriteSummary {
237 footer,
238 size: position,
239 })
240 }
241
242 pub fn writer<'w, W: VortexWrite + Unpin + 'w>(self, write: W, dtype: DType) -> Writer<'w> {
244 let (arrays_send, arrays_recv) = kanal::bounded_async(1);
246
247 let arrays =
248 ArrayStreamExt::boxed(ArrayStreamAdapter::new(dtype, arrays_recv.into_stream()));
249
250 let write = CountingVortexWrite::new(write);
251 let bytes_written = write.counter();
252 let strategy = self.strategy.clone();
253 let future = self.write(write, arrays).boxed_local().fuse();
254
255 Writer {
256 arrays: Some(arrays_send),
257 future,
258 bytes_written,
259 strategy,
260 }
261 }
262}
263
264pub struct Writer<'w> {
266 arrays: Option<kanal::AsyncSender<VortexResult<ArrayRef>>>,
268 future: Fuse<LocalBoxFuture<'w, VortexResult<WriteSummary>>>,
270 bytes_written: Arc<AtomicU64>,
272 strategy: Arc<dyn LayoutStrategy>,
274}
275
276impl Writer<'_> {
277 pub async fn push(&mut self, chunk: ArrayRef) -> VortexResult<()> {
279 let arrays = self.arrays.clone().vortex_expect("missing arrays sender");
280 let send_fut = async move { arrays.send(Ok(chunk)).await }.fuse();
281 pin_mut!(send_fut);
282
283 select! {
286 result = send_fut => {
287 if result.is_err() {
289 return Err(self.handle_failed_task().await);
290 }
291 },
292 result = &mut self.future => {
293 match result {
297 Ok(_) => vortex_bail!("Internal error: writer future completed early"),
298 Err(e) => return Err(e),
299 }
300 }
301 }
302
303 Ok(())
304 }
305
306 pub async fn push_stream(&mut self, mut stream: SendableArrayStream) -> VortexResult<()> {
311 let arrays = self.arrays.clone().vortex_expect("missing arrays sender");
312 let stream_fut = async move {
313 while let Some(chunk) = stream.next().await {
314 arrays.send(chunk).await?;
315 }
316 Ok::<_, kanal::SendError>(())
317 }
318 .fuse();
319 pin_mut!(stream_fut);
320
321 select! {
324 result = stream_fut => {
325 if let Err(_send_err) = result {
326 return Err(self.handle_failed_task().await);
328 }
329 }
330
331 result = &mut self.future => {
332 match result {
336 Ok(_) => vortex_bail!("Internal error: writer future completed early"),
337 Err(e) => return Err(e),
338 }
339 }
340 }
341
342 Ok(())
343 }
344
345 pub fn bytes_written(&self) -> u64 {
347 self.bytes_written
348 .load(std::sync::atomic::Ordering::Relaxed)
349 }
350
351 pub fn buffered_bytes(&self) -> u64 {
353 self.strategy.buffered_bytes()
354 }
355
356 pub async fn finish(mut self) -> VortexResult<WriteSummary> {
359 drop(self.arrays.take());
361
362 self.future.await
364 }
365
366 async fn handle_failed_task(&mut self) -> VortexError {
368 match (&mut self.future).await {
369 Ok(_) => vortex_err!(
370 "Internal error: writer task completed successfully but write future finished early"
371 ),
372 Err(e) => e,
373 }
374 }
375}
376
377pub struct BlockingWrite<'rt, B: BlockingRuntime> {
379 options: VortexWriteOptions,
380 runtime: &'rt B,
381}
382
383impl<'rt, B: BlockingRuntime> BlockingWrite<'rt, B> {
384 pub fn write<W: Write + Unpin>(
386 self,
387 write: W,
388 iter: impl ArrayIterator + Send + 'static,
389 ) -> VortexResult<WriteSummary> {
390 self.runtime.block_on(async move {
391 self.options
392 .write(BlockingWriteAdapter(write), iter.into_array_stream())
393 .await
394 })
395 }
396
397 pub fn writer<'w, W: Write + Unpin + 'w>(
398 self,
399 write: W,
400 dtype: DType,
401 ) -> BlockingWriter<'rt, 'w, B> {
402 BlockingWriter {
403 writer: self.options.writer(BlockingWriteAdapter(write), dtype),
404 runtime: self.runtime,
405 }
406 }
407}
408
409pub struct BlockingWriter<'rt, 'w, B: BlockingRuntime> {
411 runtime: &'rt B,
412 writer: Writer<'w>,
413}
414
415impl<B: BlockingRuntime> BlockingWriter<'_, '_, B> {
416 pub fn push(&mut self, chunk: ArrayRef) -> VortexResult<()> {
417 self.runtime.block_on(self.writer.push(chunk))
418 }
419
420 pub fn bytes_written(&self) -> u64 {
421 self.writer.bytes_written()
422 }
423
424 pub fn buffered_bytes(&self) -> u64 {
425 self.writer.buffered_bytes()
426 }
427
428 pub fn finish(self) -> VortexResult<WriteSummary> {
429 self.runtime.block_on(self.writer.finish())
430 }
431}
432
433struct BlockingWriteAdapter<W>(W);
435
436impl<W: Write + Unpin> VortexWrite for BlockingWriteAdapter<W> {
437 async fn write_all<B: IoBuf>(&mut self, buffer: B) -> io::Result<B> {
438 self.0.write_all(buffer.as_slice())?;
439 Ok(buffer)
440 }
441
442 fn flush(&mut self) -> impl Future<Output = io::Result<()>> {
443 ready(self.0.flush())
444 }
445
446 fn shutdown(&mut self) -> impl Future<Output = io::Result<()>> {
447 ready(Ok(()))
448 }
449}
450
451pub struct WriteSummary {
452 footer: Footer,
453 size: u64,
454 }
456
457impl WriteSummary {
458 pub fn footer(&self) -> &Footer {
460 &self.footer
461 }
462
463 pub fn size(&self) -> u64 {
465 self.size
466 }
467
468 pub fn row_count(&self) -> u64 {
470 self.footer.row_count()
471 }
472}