vortex_layout/layouts/
repartition.rs1use std::collections::VecDeque;
2
3use vortex_array::arcref::ArcRef;
4use vortex_array::arrays::ChunkedArray;
5use vortex_array::compute::slice;
6use vortex_array::nbytes::NBytes;
7use vortex_array::{Array, ArrayContext, ArrayRef, IntoArray};
8use vortex_dtype::DType;
9use vortex_error::{VortexExpect, VortexResult};
10
11use crate::segments::SegmentWriter;
12use crate::{Layout, LayoutStrategy, LayoutWriter, LayoutWriterExt};
13
14pub struct RepartitionStrategy {
15 pub options: RepartitionWriterOptions,
16 pub child: ArcRef<dyn LayoutStrategy>,
17}
18
19impl LayoutStrategy for RepartitionStrategy {
20 fn new_writer(&self, ctx: &ArrayContext, dtype: &DType) -> VortexResult<Box<dyn LayoutWriter>> {
21 Ok(RepartitionWriter::new(
22 dtype.clone(),
23 self.child.new_writer(ctx, dtype)?,
24 self.options.clone(),
25 )
26 .boxed())
27 }
28}
29
/// Tuning knobs for [`RepartitionWriter`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RepartitionWriterOptions {
    /// Minimum number of buffered bytes (as reported by `nbytes()`) before the writer forwards
    /// anything to its child writer.
    pub block_size_minimum: usize,
    /// Forwarded chunks contain a row count that is a multiple of this value. Incoming chunks are
    /// also sliced into pieces of at most this many rows while buffering.
    pub block_len_multiple: usize,
}
37
38pub struct RepartitionWriter {
43 dtype: DType,
44 chunks: VecDeque<ArrayRef>,
45 row_count: usize,
46 nbytes: usize,
47 writer: Box<dyn LayoutWriter>,
48 options: RepartitionWriterOptions,
49}
50
51impl RepartitionWriter {
52 pub fn new(
53 dtype: DType,
54 writer: Box<dyn LayoutWriter>,
55 options: RepartitionWriterOptions,
56 ) -> Self {
57 Self {
58 dtype,
59 chunks: VecDeque::new(),
60 row_count: 0,
61 nbytes: 0,
62 writer,
63 options,
64 }
65 }
66
67 fn maybe_flush_chunk(&mut self, segments: &mut dyn SegmentWriter) -> VortexResult<()> {
68 if self.nbytes >= self.options.block_size_minimum {
69 let nblocks = self.row_count / self.options.block_len_multiple;
70
71 if nblocks == 0 {
73 return Ok(());
74 }
75
76 let mut chunks = Vec::with_capacity(self.chunks.len());
77 let mut remaining = nblocks * self.options.block_len_multiple;
78
79 while remaining > 0 {
80 let chunk = self.chunks.pop_front().vortex_expect("chunk is missing");
81 self.row_count -= chunk.len();
82 self.nbytes -= chunk.nbytes();
83
84 let len = chunk.len();
85
86 if len > remaining {
87 let left = slice(&chunk, 0, remaining)?;
88 let right = slice(&chunk, remaining, len)?;
89 self.row_count += right.len();
90 self.nbytes += right.nbytes();
91 self.chunks.push_front(right);
92
93 chunks.push(left);
94 remaining = 0;
95 } else {
96 chunks.push(chunk);
97 remaining -= len;
98 }
99 }
100
101 assert!(!chunks.is_empty());
103 let chunk = ChunkedArray::new_unchecked(chunks, self.dtype.clone())
104 .to_canonical()?
105 .into_array();
106
107 self.writer.push_chunk(segments, chunk)?;
108 }
109
110 Ok(())
111 }
112}
113
114impl LayoutWriter for RepartitionWriter {
115 fn push_chunk(
116 &mut self,
117 segment_writer: &mut dyn SegmentWriter,
118 chunk: ArrayRef,
119 ) -> VortexResult<()> {
120 let chunk = chunk.to_canonical()?.into_array();
122
123 let mut offset = 0;
125 while offset < chunk.len() {
126 let end = (offset + self.options.block_len_multiple).min(chunk.len());
127 let c = slice(&chunk, offset, end)?;
128 self.row_count += c.len();
129 self.nbytes += c.nbytes();
130 self.chunks.push_back(c);
131 offset = end;
132
133 self.maybe_flush_chunk(segment_writer)?;
134 }
135
136 Ok(())
137 }
138
139 fn flush(&mut self, segment_writer: &mut dyn SegmentWriter) -> VortexResult<()> {
140 let chunk =
141 ChunkedArray::new_unchecked(self.chunks.drain(..).collect(), self.dtype.clone())
142 .to_canonical()?
143 .into_array();
144 self.writer.push_chunk(segment_writer, chunk)?;
145 self.writer.flush(segment_writer)
146 }
147
148 fn finish(&mut self, segment_writer: &mut dyn SegmentWriter) -> VortexResult<Layout> {
149 self.writer.finish(segment_writer)
150 }
151}