vortex_layout/layouts/
repartition.rs1use std::collections::VecDeque;
2
3use arcref::ArcRef;
4use vortex_array::arrays::ChunkedArray;
5use vortex_array::{Array, ArrayContext, ArrayRef, IntoArray};
6use vortex_dtype::DType;
7use vortex_error::{VortexExpect, VortexResult};
8
9use crate::segments::SegmentWriter;
10use crate::{Layout, LayoutStrategy, LayoutWriter, LayoutWriterExt};
11
12pub struct RepartitionStrategy {
13 pub options: RepartitionWriterOptions,
14 pub child: ArcRef<dyn LayoutStrategy>,
15}
16
17impl LayoutStrategy for RepartitionStrategy {
18 fn new_writer(&self, ctx: &ArrayContext, dtype: &DType) -> VortexResult<Box<dyn LayoutWriter>> {
19 Ok(RepartitionWriter::new(
20 dtype.clone(),
21 self.child.new_writer(ctx, dtype)?,
22 self.options.clone(),
23 )
24 .boxed())
25 }
26}
27
/// Tuning knobs for [`RepartitionWriter`].
#[derive(Clone, Debug)]
pub struct RepartitionWriterOptions {
    /// Byte threshold: no block is emitted until the total `nbytes` of the buffered chunks
    /// reaches this value.
    pub block_size_minimum: usize,
    /// Row granularity: every block emitted before the final flush holds a multiple of this
    /// many rows. Expected to be non-zero.
    pub block_len_multiple: usize,
}
35
36pub struct RepartitionWriter {
41 dtype: DType,
42 chunks: VecDeque<ArrayRef>,
43 row_count: usize,
44 nbytes: usize,
45 writer: Box<dyn LayoutWriter>,
46 options: RepartitionWriterOptions,
47}
48
49impl RepartitionWriter {
50 pub fn new(
51 dtype: DType,
52 writer: Box<dyn LayoutWriter>,
53 options: RepartitionWriterOptions,
54 ) -> Self {
55 Self {
56 dtype,
57 chunks: VecDeque::new(),
58 row_count: 0,
59 nbytes: 0,
60 writer,
61 options,
62 }
63 }
64
65 fn maybe_flush_chunk(&mut self, segments: &mut dyn SegmentWriter) -> VortexResult<()> {
66 if self.nbytes >= self.options.block_size_minimum {
67 let nblocks = self.row_count / self.options.block_len_multiple;
68
69 if nblocks == 0 {
71 return Ok(());
72 }
73
74 let mut chunks = Vec::with_capacity(self.chunks.len());
75 let mut remaining = nblocks * self.options.block_len_multiple;
76
77 while remaining > 0 {
78 let chunk = self.chunks.pop_front().vortex_expect("chunk is missing");
79 self.row_count -= chunk.len();
80 self.nbytes -= chunk.nbytes();
81
82 let len = chunk.len();
83
84 if len > remaining {
85 let left = chunk.slice(0, remaining)?;
86 let right = chunk.slice(remaining, len)?;
87 self.row_count += right.len();
88 self.nbytes += right.nbytes();
89 self.chunks.push_front(right);
90
91 chunks.push(left);
92 remaining = 0;
93 } else {
94 chunks.push(chunk);
95 remaining -= len;
96 }
97 }
98
99 assert!(!chunks.is_empty());
101 let chunk = ChunkedArray::new_unchecked(chunks, self.dtype.clone())
102 .to_canonical()?
103 .into_array();
104
105 self.writer.push_chunk(segments, chunk)?;
106 }
107
108 Ok(())
109 }
110}
111
112impl LayoutWriter for RepartitionWriter {
113 fn push_chunk(
114 &mut self,
115 segment_writer: &mut dyn SegmentWriter,
116 chunk: ArrayRef,
117 ) -> VortexResult<()> {
118 assert_eq!(
119 chunk.dtype(),
120 &self.dtype,
121 "Can't push chunks of the wrong dtype into a LayoutWriter. Pushed {} but expected {}.",
122 chunk.dtype(),
123 self.dtype
124 );
125 let chunk = chunk.to_canonical()?.into_array();
127
128 let mut offset = 0;
130 while offset < chunk.len() {
131 let end = (offset + self.options.block_len_multiple).min(chunk.len());
132 let c = chunk.slice(offset, end)?;
133 self.row_count += c.len();
134 self.nbytes += c.nbytes();
135 self.chunks.push_back(c);
136 offset = end;
137
138 self.maybe_flush_chunk(segment_writer)?;
139 }
140
141 Ok(())
142 }
143
144 fn flush(&mut self, segment_writer: &mut dyn SegmentWriter) -> VortexResult<()> {
145 let chunk =
146 ChunkedArray::new_unchecked(self.chunks.drain(..).collect(), self.dtype.clone())
147 .to_canonical()?
148 .into_array();
149 self.writer.push_chunk(segment_writer, chunk)?;
150 self.writer.flush(segment_writer)
151 }
152
153 fn finish(&mut self, segment_writer: &mut dyn SegmentWriter) -> VortexResult<Layout> {
154 self.writer.finish(segment_writer)
155 }
156}