1use std::io;
5use std::io::Write;
6use std::sync::Arc;
7use std::sync::atomic::AtomicU64;
8
9use futures::FutureExt;
10use futures::StreamExt;
11use futures::TryStreamExt;
12use futures::future::Fuse;
13use futures::future::LocalBoxFuture;
14use futures::future::ready;
15use futures::pin_mut;
16use futures::select;
17use itertools::Itertools;
18use vortex_array::ArrayContext;
19use vortex_array::ArrayRef;
20use vortex_array::expr::stats::Stat;
21use vortex_array::iter::ArrayIterator;
22use vortex_array::iter::ArrayIteratorExt;
23use vortex_array::session::ArraySessionExt;
24use vortex_array::stats::PRUNING_STATS;
25use vortex_array::stream::ArrayStream;
26use vortex_array::stream::ArrayStreamAdapter;
27use vortex_array::stream::ArrayStreamExt;
28use vortex_array::stream::SendableArrayStream;
29use vortex_buffer::ByteBuffer;
30use vortex_dtype::DType;
31use vortex_error::VortexError;
32use vortex_error::VortexExpect;
33use vortex_error::VortexResult;
34use vortex_error::vortex_bail;
35use vortex_error::vortex_err;
36use vortex_io::IoBuf;
37use vortex_io::VortexWrite;
38use vortex_io::kanal_ext::KanalExt;
39use vortex_io::runtime::BlockingRuntime;
40use vortex_io::session::RuntimeSessionExt;
41use vortex_layout::LayoutStrategy;
42use vortex_layout::layouts::file_stats::accumulate_stats;
43use vortex_layout::sequence::SequenceId;
44use vortex_layout::sequence::SequentialStreamAdapter;
45use vortex_layout::sequence::SequentialStreamExt;
46use vortex_session::SessionExt;
47use vortex_session::VortexSession;
48
49use crate::Footer;
50use crate::MAGIC_BYTES;
51use crate::WriteStrategyBuilder;
52use crate::counting::CountingVortexWrite;
53use crate::footer::FileStatistics;
54use crate::segments::writer::BufferedSegmentSink;
55
56pub struct VortexWriteOptions {
62 session: VortexSession,
63 strategy: Arc<dyn LayoutStrategy>,
64 exclude_dtype: bool,
65 max_variable_length_statistics_size: usize,
66 file_statistics: Vec<Stat>,
67}
68
69pub trait WriteOptionsSessionExt: SessionExt {
70 fn write_options(&self) -> VortexWriteOptions {
72 let session = self.session();
73 VortexWriteOptions {
74 strategy: WriteStrategyBuilder::default().build(),
75 session,
76 exclude_dtype: false,
77 file_statistics: PRUNING_STATS.to_vec(),
78 max_variable_length_statistics_size: 64,
79 }
80 }
81}
82impl<S: SessionExt> WriteOptionsSessionExt for S {}
83
84impl VortexWriteOptions {
85 pub fn new(session: VortexSession) -> Self {
87 VortexWriteOptions {
88 strategy: WriteStrategyBuilder::default().build(),
89 session,
90 exclude_dtype: false,
91 file_statistics: PRUNING_STATS.to_vec(),
92 max_variable_length_statistics_size: 64,
93 }
94 }
95
96 pub fn with_strategy(mut self, strategy: Arc<dyn LayoutStrategy>) -> Self {
98 self.strategy = strategy;
99 self
100 }
101
102 pub fn exclude_dtype(mut self) -> Self {
106 self.exclude_dtype = true;
107 self
108 }
109
110 pub fn with_file_statistics(mut self, file_statistics: Vec<Stat>) -> Self {
112 self.file_statistics = file_statistics;
113 self
114 }
115}
116
117impl VortexWriteOptions {
118 pub fn blocking<B: BlockingRuntime>(self, runtime: &B) -> BlockingWrite<'_, B> {
120 BlockingWrite {
121 options: self,
122 runtime,
123 }
124 }
125
126 pub async fn write<W: VortexWrite + Unpin, S: ArrayStream + Send + 'static>(
131 self,
132 write: W,
133 stream: S,
134 ) -> VortexResult<WriteSummary> {
135 self.write_internal(write, ArrayStreamExt::boxed(stream))
136 .await
137 }
138
139 async fn write_internal<W: VortexWrite + Unpin>(
140 self,
141 mut write: W,
142 stream: SendableArrayStream,
143 ) -> VortexResult<WriteSummary> {
144 let ctx = ArrayContext::new(self.session.arrays().registry().ids().sorted().collect())
150 .with_registry(self.session.arrays().registry().clone());
152 let dtype = stream.dtype().clone();
153
154 let (mut ptr, eof) = SequenceId::root().split();
155
156 let stream = SequentialStreamAdapter::new(
157 dtype.clone(),
158 stream
159 .try_filter(|chunk| ready(!chunk.is_empty()))
160 .map(move |result| result.map(|chunk| (ptr.advance(), chunk))),
161 )
162 .sendable();
163 let (file_stats, stream) = accumulate_stats(
164 stream,
165 self.file_statistics.clone().into(),
166 self.max_variable_length_statistics_size,
167 );
168
169 write.write_all(ByteBuffer::copy_from(MAGIC_BYTES)).await?;
171 let mut position = MAGIC_BYTES.len() as u64;
172
173 let (send, recv) = kanal::bounded_async(1);
175
176 let segments = Arc::new(BufferedSegmentSink::new(send, position));
177
178 let ctx2 = ctx.clone();
181 let layout_fut = self.session.handle().spawn_nested(|h| async move {
182 let layout = self
183 .strategy
184 .write_stream(ctx2, segments.clone(), stream, eof, h)
185 .await?;
186 Ok::<_, VortexError>((layout, segments.segment_specs()))
187 });
188
189 let recv_stream = recv.into_stream();
191 pin_mut!(recv_stream);
192 while let Some(buffer) = recv_stream.next().await {
193 if buffer.is_empty() {
194 continue;
195 }
196 position += buffer.len() as u64;
197 write.write_all(buffer).await?;
198 }
199
200 let (layout, segment_specs) = layout_fut.await?;
201
202 let footer = Footer::new(
204 layout.clone(),
205 segment_specs,
206 if self.file_statistics.is_empty() {
207 None
208 } else {
209 Some(FileStatistics::new_with_dtype(
210 file_stats.stats_sets().into(),
211 &dtype,
212 ))
213 },
214 ctx,
215 );
216
217 let footer_buffers = footer
219 .clone()
220 .into_serializer()
221 .with_offset(position)
222 .with_exclude_dtype(self.exclude_dtype)
223 .serialize()?;
224 for buffer in footer_buffers {
225 position += buffer.len() as u64;
226 write.write_all(buffer).await?;
227 }
228
229 write.flush().await?;
230
231 Ok(WriteSummary {
232 footer,
233 size: position,
234 })
235 }
236
237 pub fn writer<'w, W: VortexWrite + Unpin + 'w>(self, write: W, dtype: DType) -> Writer<'w> {
239 let (arrays_send, arrays_recv) = kanal::bounded_async(1);
241
242 let arrays =
243 ArrayStreamExt::boxed(ArrayStreamAdapter::new(dtype, arrays_recv.into_stream()));
244
245 let write = CountingVortexWrite::new(write);
246 let bytes_written = write.counter();
247 let strategy = self.strategy.clone();
248 let future = self.write(write, arrays).boxed_local().fuse();
249
250 Writer {
251 arrays: Some(arrays_send),
252 future,
253 bytes_written,
254 strategy,
255 }
256 }
257}
258
259pub struct Writer<'w> {
261 arrays: Option<kanal::AsyncSender<VortexResult<ArrayRef>>>,
263 future: Fuse<LocalBoxFuture<'w, VortexResult<WriteSummary>>>,
265 bytes_written: Arc<AtomicU64>,
267 strategy: Arc<dyn LayoutStrategy>,
269}
270
271impl Writer<'_> {
272 pub async fn push(&mut self, chunk: ArrayRef) -> VortexResult<()> {
274 let arrays = self.arrays.clone().vortex_expect("missing arrays sender");
275 let send_fut = async move { arrays.send(Ok(chunk)).await }.fuse();
276 pin_mut!(send_fut);
277
278 select! {
281 result = send_fut => {
282 if result.is_err() {
284 return Err(self.handle_failed_task().await);
285 }
286 },
287 result = &mut self.future => {
288 match result {
292 Ok(_) => vortex_bail!("Internal error: writer future completed early"),
293 Err(e) => return Err(e),
294 }
295 }
296 }
297
298 Ok(())
299 }
300
301 pub async fn push_stream(&mut self, mut stream: SendableArrayStream) -> VortexResult<()> {
306 let arrays = self.arrays.clone().vortex_expect("missing arrays sender");
307 let stream_fut = async move {
308 while let Some(chunk) = stream.next().await {
309 arrays.send(chunk).await?;
310 }
311 Ok::<_, kanal::SendError>(())
312 }
313 .fuse();
314 pin_mut!(stream_fut);
315
316 select! {
319 result = stream_fut => {
320 if let Err(_send_err) = result {
321 return Err(self.handle_failed_task().await);
323 }
324 }
325
326 result = &mut self.future => {
327 match result {
331 Ok(_) => vortex_bail!("Internal error: writer future completed early"),
332 Err(e) => return Err(e),
333 }
334 }
335 }
336
337 Ok(())
338 }
339
340 pub fn bytes_written(&self) -> u64 {
342 self.bytes_written
343 .load(std::sync::atomic::Ordering::Relaxed)
344 }
345
346 pub fn buffered_bytes(&self) -> u64 {
348 self.strategy.buffered_bytes()
349 }
350
351 pub async fn finish(mut self) -> VortexResult<WriteSummary> {
354 drop(self.arrays.take());
356
357 self.future.await
359 }
360
361 async fn handle_failed_task(&mut self) -> VortexError {
363 match (&mut self.future).await {
364 Ok(_) => vortex_err!(
365 "Internal error: writer task completed successfully but write future finished early"
366 ),
367 Err(e) => e,
368 }
369 }
370}
371
372pub struct BlockingWrite<'rt, B: BlockingRuntime> {
374 options: VortexWriteOptions,
375 runtime: &'rt B,
376}
377
378impl<'rt, B: BlockingRuntime> BlockingWrite<'rt, B> {
379 pub fn write<W: Write + Unpin>(
381 self,
382 write: W,
383 iter: impl ArrayIterator + Send + 'static,
384 ) -> VortexResult<WriteSummary> {
385 self.runtime.block_on(async move {
386 self.options
387 .write(BlockingWriteAdapter(write), iter.into_array_stream())
388 .await
389 })
390 }
391
392 pub fn writer<'w, W: Write + Unpin + 'w>(
393 self,
394 write: W,
395 dtype: DType,
396 ) -> BlockingWriter<'rt, 'w, B> {
397 BlockingWriter {
398 writer: self.options.writer(BlockingWriteAdapter(write), dtype),
399 runtime: self.runtime,
400 }
401 }
402}
403
404pub struct BlockingWriter<'rt, 'w, B: BlockingRuntime> {
406 runtime: &'rt B,
407 writer: Writer<'w>,
408}
409
410impl<B: BlockingRuntime> BlockingWriter<'_, '_, B> {
411 pub fn push(&mut self, chunk: ArrayRef) -> VortexResult<()> {
412 self.runtime.block_on(self.writer.push(chunk))
413 }
414
415 pub fn bytes_written(&self) -> u64 {
416 self.writer.bytes_written()
417 }
418
419 pub fn buffered_bytes(&self) -> u64 {
420 self.writer.buffered_bytes()
421 }
422
423 pub fn finish(self) -> VortexResult<WriteSummary> {
424 self.runtime.block_on(self.writer.finish())
425 }
426}
427
428struct BlockingWriteAdapter<W>(W);
430
431impl<W: Write + Unpin> VortexWrite for BlockingWriteAdapter<W> {
432 async fn write_all<B: IoBuf>(&mut self, buffer: B) -> io::Result<B> {
433 self.0.write_all(buffer.as_slice())?;
434 Ok(buffer)
435 }
436
437 fn flush(&mut self) -> impl Future<Output = io::Result<()>> {
438 ready(self.0.flush())
439 }
440
441 fn shutdown(&mut self) -> impl Future<Output = io::Result<()>> {
442 ready(Ok(()))
443 }
444}
445
446pub struct WriteSummary {
447 footer: Footer,
448 size: u64,
449 }
451
452impl WriteSummary {
453 pub fn footer(&self) -> &Footer {
455 &self.footer
456 }
457
458 pub fn size(&self) -> u64 {
460 self.size
461 }
462
463 pub fn row_count(&self) -> u64 {
465 self.footer.row_count()
466 }
467}