1use std::io;
5use std::io::Write;
6use std::sync::Arc;
7use std::sync::atomic::AtomicU64;
8
9use futures::future::{Fuse, LocalBoxFuture, ready};
10use futures::{FutureExt, StreamExt, TryStreamExt, pin_mut, select};
11use vortex_array::iter::{ArrayIterator, ArrayIteratorExt};
12use vortex_array::stats::{PRUNING_STATS, Stat};
13use vortex_array::stream::{ArrayStream, ArrayStreamAdapter, ArrayStreamExt, SendableArrayStream};
14use vortex_array::{ArrayContext, ArrayRef};
15use vortex_buffer::ByteBuffer;
16use vortex_dtype::DType;
17use vortex_error::{
18 VortexError, VortexExpect, VortexResult, vortex_bail, vortex_err, vortex_panic,
19};
20use vortex_io::kanal_ext::KanalExt;
21use vortex_io::runtime::{BlockingRuntime, Handle};
22use vortex_io::{IoBuf, VortexWrite};
23use vortex_layout::LayoutStrategy;
24use vortex_layout::layouts::file_stats::accumulate_stats;
25use vortex_layout::sequence::{SequenceId, SequentialStreamAdapter, SequentialStreamExt};
26
27use crate::counting::CountingVortexWrite;
28use crate::footer::FileStatistics;
29use crate::segments::writer::BufferedSegmentSink;
30use crate::{Footer, MAGIC_BYTES, WriteStrategyBuilder};
31
32pub struct VortexWriteOptions {
37 strategy: Arc<dyn LayoutStrategy>,
38 exclude_dtype: bool,
39 max_variable_length_statistics_size: usize,
40 file_statistics: Vec<Stat>,
41 handle: Option<Handle>,
42}
43
44impl Default for VortexWriteOptions {
45 fn default() -> Self {
46 Self {
47 strategy: WriteStrategyBuilder::new().build(),
48 exclude_dtype: false,
49 file_statistics: PRUNING_STATS.to_vec(),
50 max_variable_length_statistics_size: 64,
51 handle: Handle::find(),
52 }
53 }
54}
55
56impl VortexWriteOptions {
57 pub fn with_handle(mut self, handle: Handle) -> Self {
61 self.handle = Some(handle);
62 self
63 }
64
65 pub fn with_some_handle(mut self, handle: Option<Handle>) -> Self {
67 self.handle = handle.or(self.handle);
68 self
69 }
70
71 pub fn with_strategy(mut self, strategy: Arc<dyn LayoutStrategy>) -> Self {
73 self.strategy = strategy;
74 self
75 }
76
77 pub fn exclude_dtype(mut self) -> Self {
81 self.exclude_dtype = true;
82 self
83 }
84
85 pub fn with_file_statistics(mut self, file_statistics: Vec<Stat>) -> Self {
87 self.file_statistics = file_statistics;
88 self
89 }
90}
91
92impl VortexWriteOptions {
93 pub fn blocking<B: BlockingRuntime + Default>(self) -> BlockingWrite<B> {
95 self.with_blocking(B::default())
96 }
97
98 pub fn with_blocking<B: BlockingRuntime>(self, runtime: B) -> BlockingWrite<B> {
100 if self.handle.is_some() {
101 vortex_panic!("Must not provide or infer a Handle when using the blocking writer API")
102 }
103 BlockingWrite {
104 options: self,
105 runtime,
106 }
107 }
108
109 pub async fn write<W: VortexWrite + Unpin, S: ArrayStream + Send + 'static>(
114 self,
115 write: W,
116 stream: S,
117 ) -> VortexResult<WriteSummary> {
118 self.write_internal(write, ArrayStreamExt::boxed(stream))
119 .await
120 }
121
122 async fn write_internal<W: VortexWrite + Unpin>(
123 self,
124 mut write: W,
125 stream: SendableArrayStream,
126 ) -> VortexResult<WriteSummary> {
127 let Some(handle) = self.handle else {
128 vortex_panic!("Must provide a Handle to use the async writer API");
129 };
130
131 let ctx = ArrayContext::empty();
133 let dtype = stream.dtype().clone();
134
135 let (mut ptr, eof) = SequenceId::root().split();
136
137 let stream = SequentialStreamAdapter::new(
138 dtype.clone(),
139 stream
140 .try_filter(|chunk| ready(!chunk.is_empty()))
141 .map(move |result| result.map(|chunk| (ptr.advance(), chunk))),
142 )
143 .sendable();
144 let (file_stats, stream) = accumulate_stats(
145 stream,
146 self.file_statistics.clone().into(),
147 self.max_variable_length_statistics_size,
148 );
149
150 write.write_all(ByteBuffer::copy_from(MAGIC_BYTES)).await?;
152 let mut position = MAGIC_BYTES.len() as u64;
153
154 let (send, recv) = kanal::bounded_async(1);
156
157 let segments = Arc::new(BufferedSegmentSink::new(send, position));
158
159 let ctx2 = ctx.clone();
162 let layout_fut = handle.spawn_nested(|h| async move {
163 let layout = self
164 .strategy
165 .write_stream(ctx2, segments.clone(), stream, eof, h)
166 .await?;
167 Ok::<_, VortexError>((layout, segments.segment_specs()))
168 });
169
170 let recv_stream = recv.into_stream();
172 pin_mut!(recv_stream);
173 while let Some(buffer) = recv_stream.next().await {
174 if buffer.is_empty() {
175 continue;
176 }
177 position += buffer.len() as u64;
178 write.write_all(buffer).await?;
179 }
180
181 let (layout, segment_specs) = layout_fut.await?;
182
183 let footer = Footer::new(
185 layout.clone(),
186 segment_specs,
187 if self.file_statistics.is_empty() {
188 None
189 } else {
190 Some(FileStatistics(file_stats.stats_sets().into()))
191 },
192 ctx,
193 );
194
195 let footer_buffers = footer
197 .clone()
198 .into_serializer()
199 .with_offset(position)
200 .with_exclude_dtype(self.exclude_dtype)
201 .serialize()?;
202 for buffer in footer_buffers {
203 position += buffer.len() as u64;
204 write.write_all(buffer).await?;
205 }
206
207 write.flush().await?;
208
209 Ok(WriteSummary {
210 footer,
211 size: position,
212 })
213 }
214
215 pub fn writer<'w, W: VortexWrite + Unpin + 'w>(self, write: W, dtype: DType) -> Writer<'w> {
217 let (arrays_send, arrays_recv) = kanal::bounded_async(1);
219
220 let arrays =
221 ArrayStreamExt::boxed(ArrayStreamAdapter::new(dtype, arrays_recv.into_stream()));
222
223 let write = CountingVortexWrite::new(write);
224 let bytes_written = write.counter();
225 let strategy = self.strategy.clone();
226 let future = self.write(write, arrays).boxed_local().fuse();
227
228 Writer {
229 arrays: Some(arrays_send),
230 future,
231 bytes_written,
232 strategy,
233 }
234 }
235}
236
237pub struct Writer<'w> {
239 arrays: Option<kanal::AsyncSender<VortexResult<ArrayRef>>>,
241 future: Fuse<LocalBoxFuture<'w, VortexResult<WriteSummary>>>,
243 bytes_written: Arc<AtomicU64>,
245 strategy: Arc<dyn LayoutStrategy>,
247}
248
249impl Writer<'_> {
250 pub async fn push(&mut self, chunk: ArrayRef) -> VortexResult<()> {
252 let arrays = self.arrays.clone().vortex_expect("missing arrays sender");
253 let send_fut = async move { arrays.send(Ok(chunk)).await }.fuse();
254 pin_mut!(send_fut);
255
256 select! {
259 result = send_fut => {
260 if result.is_err() {
262 return Err(self.handle_failed_task().await);
263 }
264 },
265 result = &mut self.future => {
266 match result {
270 Ok(_) => vortex_bail!("Internal error: writer future completed early"),
271 Err(e) => return Err(e),
272 }
273 }
274 }
275
276 Ok(())
277 }
278
279 pub async fn push_stream(&mut self, mut stream: SendableArrayStream) -> VortexResult<()> {
284 let arrays = self.arrays.clone().vortex_expect("missing arrays sender");
285 let stream_fut = async move {
286 while let Some(chunk) = stream.next().await {
287 arrays.send(chunk).await?;
288 }
289 Ok::<_, kanal::SendError>(())
290 }
291 .fuse();
292 pin_mut!(stream_fut);
293
294 select! {
297 result = stream_fut => {
298 if let Err(_send_err) = result {
299 return Err(self.handle_failed_task().await);
301 }
302 }
303
304 result = &mut self.future => {
305 match result {
309 Ok(_) => vortex_bail!("Internal error: writer future completed early"),
310 Err(e) => return Err(e),
311 }
312 }
313 }
314
315 Ok(())
316 }
317
318 pub fn bytes_written(&self) -> u64 {
320 self.bytes_written
321 .load(std::sync::atomic::Ordering::Relaxed)
322 }
323
324 pub fn buffered_bytes(&self) -> u64 {
326 self.strategy.buffered_bytes()
327 }
328
329 pub async fn finish(mut self) -> VortexResult<WriteSummary> {
332 drop(self.arrays.take());
334
335 self.future.await
337 }
338
339 async fn handle_failed_task(&mut self) -> VortexError {
341 match (&mut self.future).await {
342 Ok(_) => vortex_err!(
343 "Internal error: writer task completed successfully but write future finished early"
344 ),
345 Err(e) => e,
346 }
347 }
348}
349
350pub struct BlockingWrite<B: BlockingRuntime> {
352 options: VortexWriteOptions,
353 runtime: B,
354}
355
356impl<B: BlockingRuntime> BlockingWrite<B> {
357 pub fn write<W: Write + Unpin>(
359 self,
360 write: W,
361 iter: impl ArrayIterator + Send + 'static,
362 ) -> VortexResult<WriteSummary> {
363 self.runtime.block_on(|handle| async move {
364 self.options
365 .with_handle(handle)
366 .write(BlockingWriteAdapter(write), iter.into_array_stream())
367 .await
368 })
369 }
370
371 pub fn writer<'w, W: Write + Unpin + 'w>(
372 self,
373 write: W,
374 dtype: DType,
375 ) -> BlockingWriter<'w, B> {
376 BlockingWriter {
377 writer: self
378 .options
379 .with_handle(self.runtime.handle())
380 .writer(BlockingWriteAdapter(write), dtype),
381 runtime: self.runtime,
382 }
383 }
384}
385
386pub struct BlockingWriter<'w, B: BlockingRuntime> {
388 runtime: B,
389 writer: Writer<'w>,
390}
391
392impl<B: BlockingRuntime> BlockingWriter<'_, B> {
393 pub fn push(&mut self, chunk: ArrayRef) -> VortexResult<()> {
394 self.runtime.block_on(|_| self.writer.push(chunk))
395 }
396
397 pub fn bytes_written(&self) -> u64 {
398 self.writer.bytes_written()
399 }
400
401 pub fn buffered_bytes(&self) -> u64 {
402 self.writer.buffered_bytes()
403 }
404
405 pub fn finish(self) -> VortexResult<WriteSummary> {
406 self.runtime.block_on(|_| self.writer.finish())
407 }
408}
409
/// Newtype around a writer `W` so it can be passed where a `VortexWrite` is expected.
struct BlockingWriteAdapter<W>(W);
412
413impl<W: Write + Unpin> VortexWrite for BlockingWriteAdapter<W> {
414 async fn write_all<B: IoBuf>(&mut self, buffer: B) -> io::Result<B> {
415 self.0.write_all(buffer.as_slice())?;
416 Ok(buffer)
417 }
418
419 fn flush(&mut self) -> impl Future<Output = io::Result<()>> {
420 ready(self.0.flush())
421 }
422
423 fn shutdown(&mut self) -> impl Future<Output = io::Result<()>> {
424 ready(Ok(()))
425 }
426}
427
428pub struct WriteSummary {
429 footer: Footer,
430 size: u64,
431 }
433
434impl WriteSummary {
435 pub fn footer(&self) -> &Footer {
437 &self.footer
438 }
439
440 pub fn size(&self) -> u64 {
442 self.size
443 }
444
445 pub fn row_count(&self) -> u64 {
447 self.footer.row_count()
448 }
449}