vortex_layout/layouts/
repartition.rs1use std::collections::VecDeque;
5
6use arcref::ArcRef;
7use async_stream::try_stream;
8use futures::{StreamExt as _, pin_mut};
9use vortex_array::arrays::ChunkedArray;
10use vortex_array::{Array, ArrayContext, ArrayRef, IntoArray};
11use vortex_error::{VortexExpect, VortexResult};
12
13use crate::segments::SequenceWriter;
14use crate::{
15 LayoutStrategy, SendableLayoutFuture, SendableSequentialStream, SequentialStreamAdapter,
16 SequentialStreamExt,
17};
18
/// Tuning knobs for [`RepartitionStrategy`].
///
/// A buffered run of rows is emitted as a block only once it satisfies both the byte
/// threshold and the row-count threshold below.
#[derive(Clone, Debug)]
pub struct RepartitionWriterOptions {
    /// Minimum number of buffered bytes (as reported by the arrays' `nbytes()`) required
    /// before a block may be emitted.
    pub block_size_minimum: u64,
    /// Emitted blocks contain a whole multiple of this many rows; the final block flushed at
    /// the end of the stream is the only exception.
    pub block_len_multiple: usize,
}
26
27pub struct RepartitionStrategy {
32 options: RepartitionWriterOptions,
33 child: ArcRef<dyn LayoutStrategy>,
34}
35
36impl RepartitionStrategy {
37 pub fn new(child: ArcRef<dyn LayoutStrategy>, options: RepartitionWriterOptions) -> Self {
38 Self { options, child }
39 }
40}
41
42impl LayoutStrategy for RepartitionStrategy {
43 fn write_stream(
44 &self,
45 ctx: &ArrayContext,
46 sequence_writer: SequenceWriter,
47 stream: SendableSequentialStream,
48 ) -> SendableLayoutFuture {
49 let dtype = stream.dtype().clone();
52 let canonical_stream = SequentialStreamAdapter::new(
53 dtype.clone(),
54 stream.map(|chunk| {
55 let (sequence_id, chunk) = chunk?;
56 VortexResult::Ok((sequence_id, chunk.to_canonical()?.into_array()))
57 }),
58 )
59 .sendable();
60
61 let dtype_clone = dtype.clone();
62 let options = self.options.clone();
63 let repartitioned_stream = try_stream! {
64 let canonical_stream = canonical_stream.peekable();
65 pin_mut!(canonical_stream);
66 let mut chunks = ChunksBuffer::new(options.clone());
67 while let Some(chunk) = canonical_stream.as_mut().next().await {
68 let (sequence_id, chunk) = chunk?;
69 let mut sequence_pointer = sequence_id.descend();
70 let mut offset = 0;
71 while offset < chunk.len() {
72 let end = (offset + options.block_len_multiple).min(chunk.len());
73 let sliced = chunk.slice(offset, end)?;
74 chunks.push_back(sliced);
75 offset = end;
76
77 if chunks.have_enough() {
78 let output_chunks = chunks.collect_exact_blocks()?;
79 assert!(!output_chunks.is_empty());
80 let chunked =
81 ChunkedArray::new_unchecked(output_chunks, dtype_clone.clone());
82 if !chunked.is_empty() {
83 yield (
84 sequence_pointer.advance(),
85 chunked.to_canonical()?.into_array(),
86 )
87 }
88 }
89 }
90 if canonical_stream.as_mut().peek().await.is_none() {
91 let to_flush = ChunkedArray::new_unchecked(
92 chunks.data.drain(..).collect(),
93 dtype_clone.clone(),
94 );
95 if !to_flush.is_empty() {
96 yield (
97 sequence_pointer.advance(),
98 to_flush.to_canonical()?.into_array(),
99 )
100 }
101 }
102 }
103 };
104
105 self.child.write_stream(
106 ctx,
107 sequence_writer,
108 SequentialStreamAdapter::new(dtype, repartitioned_stream).sendable(),
109 )
110 }
111}
112
113struct ChunksBuffer {
114 data: VecDeque<ArrayRef>,
115 row_count: usize,
116 nbytes: u64,
117 options: RepartitionWriterOptions,
118}
119
120impl ChunksBuffer {
121 fn new(options: RepartitionWriterOptions) -> Self {
122 Self {
123 data: Default::default(),
124 row_count: 0,
125 nbytes: 0,
126 options,
127 }
128 }
129
130 fn have_enough(&self) -> bool {
131 self.nbytes >= self.options.block_size_minimum
132 && self.row_count >= self.options.block_len_multiple
133 }
134
135 fn collect_exact_blocks(&mut self) -> VortexResult<Vec<ArrayRef>> {
136 let nblocks = self.row_count / self.options.block_len_multiple;
137 let mut res = Vec::with_capacity(self.data.len());
138 let mut remaining = nblocks * self.options.block_len_multiple;
139 while remaining > 0 {
140 let chunk = self
141 .pop_front()
142 .vortex_expect("must have at least one chunk");
143 let len = chunk.len();
144
145 if len > remaining {
146 let left = chunk.slice(0, remaining)?;
147 let right = chunk.slice(remaining, len)?;
148 self.push_front(right);
149 res.push(left);
150 remaining = 0;
151 } else {
152 res.push(chunk);
153 remaining -= len;
154 }
155 }
156 Ok(res)
157 }
158
159 fn push_back(&mut self, chunk: ArrayRef) {
160 self.row_count += chunk.len();
161 self.nbytes += chunk.nbytes();
162 self.data.push_back(chunk);
163 }
164
165 fn push_front(&mut self, chunk: ArrayRef) {
166 self.row_count += chunk.len();
167 self.nbytes += chunk.nbytes();
168 self.data.push_front(chunk);
169 }
170
171 fn pop_front(&mut self) -> Option<ArrayRef> {
172 let res = self.data.pop_front();
173 if let Some(chunk) = res.as_ref() {
174 self.row_count -= chunk.len();
175 self.nbytes -= chunk.nbytes();
176 }
177 res
178 }
179}