vortex_layout/writers/
repartition.rs1use std::collections::VecDeque;
2
3use vortex_array::arrays::ChunkedArray;
4use vortex_array::compute::slice;
5use vortex_array::nbytes::NBytes;
6use vortex_array::{Array, ArrayRef, IntoArray};
7use vortex_dtype::DType;
8use vortex_error::{VortexExpect, VortexResult};
9
10use crate::segments::SegmentWriter;
11use crate::{Layout, LayoutWriter};
12
/// Options controlling how a `RepartitionWriter` regroups incoming chunks before
/// forwarding them to its inner writer.
#[derive(Debug, Clone)]
pub struct RepartitionWriterOptions {
    /// Minimum number of buffered bytes (summed `nbytes()` of the buffered chunks) that
    /// must accumulate before any rows are forwarded to the inner writer.
    pub block_size_minimum: usize,
    /// Chunks forwarded once the byte threshold is reached have a row count that is a
    /// multiple of this value; incoming chunks are also sliced into pieces of at most
    /// this many rows. Must be non-zero.
    pub block_len_multiple: usize,
}
19
20pub struct RepartitionWriter {
25 dtype: DType,
26 chunks: VecDeque<ArrayRef>,
27 row_count: usize,
28 nbytes: usize,
29 writer: Box<dyn LayoutWriter>,
30 options: RepartitionWriterOptions,
31}
32
33impl RepartitionWriter {
34 pub fn new(
35 dtype: DType,
36 writer: Box<dyn LayoutWriter>,
37 options: RepartitionWriterOptions,
38 ) -> Self {
39 Self {
40 dtype,
41 chunks: VecDeque::new(),
42 row_count: 0,
43 nbytes: 0,
44 writer,
45 options,
46 }
47 }
48
49 fn maybe_flush_chunk(&mut self, segments: &mut dyn SegmentWriter) -> VortexResult<()> {
50 if self.nbytes >= self.options.block_size_minimum {
51 let nblocks = self.row_count / self.options.block_len_multiple;
52
53 if nblocks == 0 {
55 return Ok(());
56 }
57
58 let mut chunks = Vec::with_capacity(self.chunks.len());
59 let mut remaining = nblocks * self.options.block_len_multiple;
60
61 while remaining > 0 {
62 let chunk = self.chunks.pop_front().vortex_expect("chunk is missing");
63 self.row_count -= chunk.len();
64 self.nbytes -= chunk.nbytes();
65
66 let len = chunk.len();
67
68 if len > remaining {
69 let left = slice(&chunk, 0, remaining)?;
70 let right = slice(&chunk, remaining, len)?;
71 self.row_count += right.len();
72 self.nbytes += right.nbytes();
73 self.chunks.push_front(right);
74
75 chunks.push(left);
76 remaining = 0;
77 } else {
78 chunks.push(chunk);
79 remaining -= len;
80 }
81 }
82
83 assert!(!chunks.is_empty());
85 let chunk = ChunkedArray::new_unchecked(chunks, self.dtype.clone())
86 .to_canonical()?
87 .into_array();
88
89 self.writer.push_chunk(segments, chunk)?;
90 }
91
92 Ok(())
93 }
94}
95
96impl LayoutWriter for RepartitionWriter {
97 fn push_chunk(
98 &mut self,
99 segment_writer: &mut dyn SegmentWriter,
100 chunk: ArrayRef,
101 ) -> VortexResult<()> {
102 let chunk = chunk.to_canonical()?.into_array();
104
105 let mut offset = 0;
107 while offset < chunk.len() {
108 let end = (offset + self.options.block_len_multiple).min(chunk.len());
109 let c = slice(&chunk, offset, end)?;
110 self.row_count += c.len();
111 self.nbytes += c.nbytes();
112 self.chunks.push_back(c);
113 offset = end;
114
115 self.maybe_flush_chunk(segment_writer)?;
116 }
117
118 Ok(())
119 }
120
121 fn flush(&mut self, segment_writer: &mut dyn SegmentWriter) -> VortexResult<()> {
122 let chunk =
123 ChunkedArray::new_unchecked(self.chunks.drain(..).collect(), self.dtype.clone())
124 .to_canonical()?
125 .into_array();
126 self.writer.push_chunk(segment_writer, chunk)?;
127 self.writer.flush(segment_writer)
128 }
129
130 fn finish(&mut self, segment_writer: &mut dyn SegmentWriter) -> VortexResult<Layout> {
131 self.writer.finish(segment_writer)
132 }
133}