1use std::io;
5use std::io::Write;
6use std::sync::Arc;
7use std::sync::atomic::AtomicU64;
8
9use futures::FutureExt;
10use futures::StreamExt;
11use futures::TryStreamExt;
12use futures::future::Fuse;
13use futures::future::LocalBoxFuture;
14use futures::future::ready;
15use futures::pin_mut;
16use futures::select;
17use vortex_array::ArrayContext;
18use vortex_array::ArrayRef;
19use vortex_array::expr::stats::Stat;
20use vortex_array::iter::ArrayIterator;
21use vortex_array::iter::ArrayIteratorExt;
22use vortex_array::session::ArraySessionExt;
23use vortex_array::stats::PRUNING_STATS;
24use vortex_array::stream::ArrayStream;
25use vortex_array::stream::ArrayStreamAdapter;
26use vortex_array::stream::ArrayStreamExt;
27use vortex_array::stream::SendableArrayStream;
28use vortex_buffer::ByteBuffer;
29use vortex_dtype::DType;
30use vortex_error::VortexError;
31use vortex_error::VortexExpect;
32use vortex_error::VortexResult;
33use vortex_error::vortex_bail;
34use vortex_error::vortex_err;
35use vortex_io::IoBuf;
36use vortex_io::VortexWrite;
37use vortex_io::kanal_ext::KanalExt;
38use vortex_io::runtime::BlockingRuntime;
39use vortex_io::session::RuntimeSessionExt;
40use vortex_layout::LayoutStrategy;
41use vortex_layout::layouts::file_stats::accumulate_stats;
42use vortex_layout::sequence::SequenceId;
43use vortex_layout::sequence::SequentialStreamAdapter;
44use vortex_layout::sequence::SequentialStreamExt;
45use vortex_session::SessionExt;
46use vortex_session::VortexSession;
47
48use crate::Footer;
49use crate::MAGIC_BYTES;
50use crate::WriteStrategyBuilder;
51use crate::counting::CountingVortexWrite;
52use crate::footer::FileStatistics;
53use crate::segments::writer::BufferedSegmentSink;
54
55pub struct VortexWriteOptions {
61 session: VortexSession,
62 strategy: Arc<dyn LayoutStrategy>,
63 exclude_dtype: bool,
64 max_variable_length_statistics_size: usize,
65 file_statistics: Vec<Stat>,
66}
67
68pub trait WriteOptionsSessionExt: SessionExt {
69 fn write_options(&self) -> VortexWriteOptions {
71 VortexWriteOptions {
72 session: self.session(),
73 strategy: WriteStrategyBuilder::new().build(),
74 exclude_dtype: false,
75 file_statistics: PRUNING_STATS.to_vec(),
76 max_variable_length_statistics_size: 64,
77 }
78 }
79}
80impl<S: SessionExt> WriteOptionsSessionExt for S {}
81
82impl VortexWriteOptions {
83 pub fn new(session: VortexSession) -> Self {
85 VortexWriteOptions {
86 session,
87 strategy: WriteStrategyBuilder::new().build(),
88 exclude_dtype: false,
89 file_statistics: PRUNING_STATS.to_vec(),
90 max_variable_length_statistics_size: 64,
91 }
92 }
93
94 pub fn with_strategy(mut self, strategy: Arc<dyn LayoutStrategy>) -> Self {
96 self.strategy = strategy;
97 self
98 }
99
100 pub fn exclude_dtype(mut self) -> Self {
104 self.exclude_dtype = true;
105 self
106 }
107
108 pub fn with_file_statistics(mut self, file_statistics: Vec<Stat>) -> Self {
110 self.file_statistics = file_statistics;
111 self
112 }
113}
114
115impl VortexWriteOptions {
116 pub fn blocking<B: BlockingRuntime>(self, runtime: &B) -> BlockingWrite<'_, B> {
118 BlockingWrite {
119 options: self,
120 runtime,
121 }
122 }
123
124 pub async fn write<W: VortexWrite + Unpin, S: ArrayStream + Send + 'static>(
129 self,
130 write: W,
131 stream: S,
132 ) -> VortexResult<WriteSummary> {
133 self.write_internal(write, ArrayStreamExt::boxed(stream))
134 .await
135 }
136
137 async fn write_internal<W: VortexWrite + Unpin>(
138 self,
139 mut write: W,
140 stream: SendableArrayStream,
141 ) -> VortexResult<WriteSummary> {
142 let ctx = ArrayContext::from_registry_sorted(self.session.arrays().registry());
148 let dtype = stream.dtype().clone();
149
150 let (mut ptr, eof) = SequenceId::root().split();
151
152 let stream = SequentialStreamAdapter::new(
153 dtype.clone(),
154 stream
155 .try_filter(|chunk| ready(!chunk.is_empty()))
156 .map(move |result| result.map(|chunk| (ptr.advance(), chunk))),
157 )
158 .sendable();
159 let (file_stats, stream) = accumulate_stats(
160 stream,
161 self.file_statistics.clone().into(),
162 self.max_variable_length_statistics_size,
163 );
164
165 write.write_all(ByteBuffer::copy_from(MAGIC_BYTES)).await?;
167 let mut position = MAGIC_BYTES.len() as u64;
168
169 let (send, recv) = kanal::bounded_async(1);
171
172 let segments = Arc::new(BufferedSegmentSink::new(send, position));
173
174 let ctx2 = ctx.clone();
177 let layout_fut = self.session.handle().spawn_nested(|h| async move {
178 let layout = self
179 .strategy
180 .write_stream(ctx2, segments.clone(), stream, eof, h)
181 .await?;
182 Ok::<_, VortexError>((layout, segments.segment_specs()))
183 });
184
185 let recv_stream = recv.into_stream();
187 pin_mut!(recv_stream);
188 while let Some(buffer) = recv_stream.next().await {
189 if buffer.is_empty() {
190 continue;
191 }
192 position += buffer.len() as u64;
193 write.write_all(buffer).await?;
194 }
195
196 let (layout, segment_specs) = layout_fut.await?;
197
198 let footer = Footer::new(
200 layout.clone(),
201 segment_specs,
202 if self.file_statistics.is_empty() {
203 None
204 } else {
205 Some(FileStatistics(file_stats.stats_sets().into()))
206 },
207 ctx,
208 );
209
210 let footer_buffers = footer
212 .clone()
213 .into_serializer()
214 .with_offset(position)
215 .with_exclude_dtype(self.exclude_dtype)
216 .serialize()?;
217 for buffer in footer_buffers {
218 position += buffer.len() as u64;
219 write.write_all(buffer).await?;
220 }
221
222 write.flush().await?;
223
224 Ok(WriteSummary {
225 footer,
226 size: position,
227 })
228 }
229
230 pub fn writer<'w, W: VortexWrite + Unpin + 'w>(self, write: W, dtype: DType) -> Writer<'w> {
232 let (arrays_send, arrays_recv) = kanal::bounded_async(1);
234
235 let arrays =
236 ArrayStreamExt::boxed(ArrayStreamAdapter::new(dtype, arrays_recv.into_stream()));
237
238 let write = CountingVortexWrite::new(write);
239 let bytes_written = write.counter();
240 let strategy = self.strategy.clone();
241 let future = self.write(write, arrays).boxed_local().fuse();
242
243 Writer {
244 arrays: Some(arrays_send),
245 future,
246 bytes_written,
247 strategy,
248 }
249 }
250}
251
252pub struct Writer<'w> {
254 arrays: Option<kanal::AsyncSender<VortexResult<ArrayRef>>>,
256 future: Fuse<LocalBoxFuture<'w, VortexResult<WriteSummary>>>,
258 bytes_written: Arc<AtomicU64>,
260 strategy: Arc<dyn LayoutStrategy>,
262}
263
264impl Writer<'_> {
265 pub async fn push(&mut self, chunk: ArrayRef) -> VortexResult<()> {
267 let arrays = self.arrays.clone().vortex_expect("missing arrays sender");
268 let send_fut = async move { arrays.send(Ok(chunk)).await }.fuse();
269 pin_mut!(send_fut);
270
271 select! {
274 result = send_fut => {
275 if result.is_err() {
277 return Err(self.handle_failed_task().await);
278 }
279 },
280 result = &mut self.future => {
281 match result {
285 Ok(_) => vortex_bail!("Internal error: writer future completed early"),
286 Err(e) => return Err(e),
287 }
288 }
289 }
290
291 Ok(())
292 }
293
294 pub async fn push_stream(&mut self, mut stream: SendableArrayStream) -> VortexResult<()> {
299 let arrays = self.arrays.clone().vortex_expect("missing arrays sender");
300 let stream_fut = async move {
301 while let Some(chunk) = stream.next().await {
302 arrays.send(chunk).await?;
303 }
304 Ok::<_, kanal::SendError>(())
305 }
306 .fuse();
307 pin_mut!(stream_fut);
308
309 select! {
312 result = stream_fut => {
313 if let Err(_send_err) = result {
314 return Err(self.handle_failed_task().await);
316 }
317 }
318
319 result = &mut self.future => {
320 match result {
324 Ok(_) => vortex_bail!("Internal error: writer future completed early"),
325 Err(e) => return Err(e),
326 }
327 }
328 }
329
330 Ok(())
331 }
332
333 pub fn bytes_written(&self) -> u64 {
335 self.bytes_written
336 .load(std::sync::atomic::Ordering::Relaxed)
337 }
338
339 pub fn buffered_bytes(&self) -> u64 {
341 self.strategy.buffered_bytes()
342 }
343
344 pub async fn finish(mut self) -> VortexResult<WriteSummary> {
347 drop(self.arrays.take());
349
350 self.future.await
352 }
353
354 async fn handle_failed_task(&mut self) -> VortexError {
356 match (&mut self.future).await {
357 Ok(_) => vortex_err!(
358 "Internal error: writer task completed successfully but write future finished early"
359 ),
360 Err(e) => e,
361 }
362 }
363}
364
365pub struct BlockingWrite<'rt, B: BlockingRuntime> {
367 options: VortexWriteOptions,
368 runtime: &'rt B,
369}
370
371impl<'rt, B: BlockingRuntime> BlockingWrite<'rt, B> {
372 pub fn write<W: Write + Unpin>(
374 self,
375 write: W,
376 iter: impl ArrayIterator + Send + 'static,
377 ) -> VortexResult<WriteSummary> {
378 self.runtime.block_on(async move {
379 self.options
380 .write(BlockingWriteAdapter(write), iter.into_array_stream())
381 .await
382 })
383 }
384
385 pub fn writer<'w, W: Write + Unpin + 'w>(
386 self,
387 write: W,
388 dtype: DType,
389 ) -> BlockingWriter<'rt, 'w, B> {
390 BlockingWriter {
391 writer: self.options.writer(BlockingWriteAdapter(write), dtype),
392 runtime: self.runtime,
393 }
394 }
395}
396
397pub struct BlockingWriter<'rt, 'w, B: BlockingRuntime> {
399 runtime: &'rt B,
400 writer: Writer<'w>,
401}
402
403impl<B: BlockingRuntime> BlockingWriter<'_, '_, B> {
404 pub fn push(&mut self, chunk: ArrayRef) -> VortexResult<()> {
405 self.runtime.block_on(self.writer.push(chunk))
406 }
407
408 pub fn bytes_written(&self) -> u64 {
409 self.writer.bytes_written()
410 }
411
412 pub fn buffered_bytes(&self) -> u64 {
413 self.writer.buffered_bytes()
414 }
415
416 pub fn finish(self) -> VortexResult<WriteSummary> {
417 self.runtime.block_on(self.writer.finish())
418 }
419}
420
421struct BlockingWriteAdapter<W>(W);
423
424impl<W: Write + Unpin> VortexWrite for BlockingWriteAdapter<W> {
425 async fn write_all<B: IoBuf>(&mut self, buffer: B) -> io::Result<B> {
426 self.0.write_all(buffer.as_slice())?;
427 Ok(buffer)
428 }
429
430 fn flush(&mut self) -> impl Future<Output = io::Result<()>> {
431 ready(self.0.flush())
432 }
433
434 fn shutdown(&mut self) -> impl Future<Output = io::Result<()>> {
435 ready(Ok(()))
436 }
437}
438
439pub struct WriteSummary {
440 footer: Footer,
441 size: u64,
442 }
444
445impl WriteSummary {
446 pub fn footer(&self) -> &Footer {
448 &self.footer
449 }
450
451 pub fn size(&self) -> u64 {
453 self.size
454 }
455
456 pub fn row_count(&self) -> u64 {
458 self.footer.row_count()
459 }
460}