vortex_layout/layouts/
repartition.rs1use std::collections::VecDeque;
5use std::sync::Arc;
6
7use async_stream::try_stream;
8use async_trait::async_trait;
9use futures::StreamExt as _;
10use futures::pin_mut;
11use vortex_array::Array;
12use vortex_array::ArrayContext;
13use vortex_array::ArrayRef;
14use vortex_array::IntoArray;
15use vortex_array::arrays::ChunkedArray;
16use vortex_error::VortexExpect;
17use vortex_error::VortexResult;
18use vortex_io::runtime::Handle;
19
20use crate::LayoutRef;
21use crate::LayoutStrategy;
22use crate::segments::SegmentSinkRef;
23use crate::sequence::SendableSequentialStream;
24use crate::sequence::SequencePointer;
25use crate::sequence::SequentialStreamAdapter;
26use crate::sequence::SequentialStreamExt;
27
/// Options controlling how [`RepartitionStrategy`] regroups incoming chunks into blocks.
#[derive(Debug, Clone)]
pub struct RepartitionWriterOptions {
    /// Minimum number of buffered bytes (as reported by each array's `nbytes()`) that must
    /// accumulate before a block is emitted. The final flush at end of stream ignores it.
    pub block_size_minimum: u64,
    /// Row-count granularity of emitted blocks: every block emitted before the final flush
    /// holds a whole multiple of this many rows, and incoming chunks are sliced into pieces
    /// of at most this many rows before being buffered.
    pub block_len_multiple: usize,
    /// When `true`, every incoming chunk is converted to its canonical form before it is
    /// repartitioned.
    pub canonicalize: bool,
}
36
37#[derive(Clone)]
42pub struct RepartitionStrategy {
43 child: Arc<dyn LayoutStrategy>,
44 options: RepartitionWriterOptions,
45}
46
47impl RepartitionStrategy {
48 pub fn new<S: LayoutStrategy>(child: S, options: RepartitionWriterOptions) -> Self {
49 Self {
50 child: Arc::new(child),
51 options,
52 }
53 }
54}
55
56#[async_trait]
57impl LayoutStrategy for RepartitionStrategy {
58 async fn write_stream(
59 &self,
60 ctx: ArrayContext,
61 segment_sink: SegmentSinkRef,
62 stream: SendableSequentialStream,
63 eof: SequencePointer,
64 handle: Handle,
65 ) -> VortexResult<LayoutRef> {
66 let dtype = stream.dtype().clone();
69 let stream = if self.options.canonicalize {
70 SequentialStreamAdapter::new(
71 dtype.clone(),
72 stream.map(|chunk| {
73 let (sequence_id, chunk) = chunk?;
74 VortexResult::Ok((sequence_id, chunk.to_canonical().into_array()))
75 }),
76 )
77 .sendable()
78 } else {
79 stream
80 };
81
82 let dtype_clone = dtype.clone();
83 let options = self.options.clone();
84 let repartitioned_stream = try_stream! {
85 let canonical_stream = stream.peekable();
86 pin_mut!(canonical_stream);
87
88 let mut chunks = ChunksBuffer::new(options.clone());
89 while let Some(chunk) = canonical_stream.as_mut().next().await {
90 let (sequence_id, chunk) = chunk?;
91 let mut sequence_pointer = sequence_id.descend();
92 let mut offset = 0;
93 while offset < chunk.len() {
94 let end = (offset + options.block_len_multiple).min(chunk.len());
95 let sliced = chunk.slice(offset..end);
96 chunks.push_back(sliced);
97 offset = end;
98
99 if chunks.have_enough() {
100 let output_chunks = chunks.collect_exact_blocks()?;
101 assert!(!output_chunks.is_empty());
102 let chunked =
103 ChunkedArray::try_new(output_chunks, dtype_clone.clone())?;
104 if !chunked.is_empty() {
105 yield (
106 sequence_pointer.advance(),
107 chunked.to_canonical().into_array(),
108 )
109 }
110 }
111 }
112 if canonical_stream.as_mut().peek().await.is_none() {
113 let to_flush = ChunkedArray::try_new(
114 chunks.data.drain(..).collect(),
115 dtype_clone.clone(),
116 )?;
117 if !to_flush.is_empty() {
118 yield (
119 sequence_pointer.advance(),
120 to_flush.to_canonical().into_array(),
121 )
122 }
123 }
124 }
125 };
126
127 self.child
128 .write_stream(
129 ctx,
130 segment_sink,
131 SequentialStreamAdapter::new(dtype, repartitioned_stream).sendable(),
132 eof,
133 handle,
134 )
135 .await
136 }
137
138 fn buffered_bytes(&self) -> u64 {
139 self.child.buffered_bytes()
144 }
145}
146
147struct ChunksBuffer {
148 data: VecDeque<ArrayRef>,
149 row_count: usize,
150 nbytes: u64,
151 options: RepartitionWriterOptions,
152}
153
154impl ChunksBuffer {
155 fn new(options: RepartitionWriterOptions) -> Self {
156 Self {
157 data: Default::default(),
158 row_count: 0,
159 nbytes: 0,
160 options,
161 }
162 }
163
164 fn have_enough(&self) -> bool {
165 self.nbytes >= self.options.block_size_minimum
166 && self.row_count >= self.options.block_len_multiple
167 }
168
169 fn collect_exact_blocks(&mut self) -> VortexResult<Vec<ArrayRef>> {
170 let nblocks = self.row_count / self.options.block_len_multiple;
171 let mut res = Vec::with_capacity(self.data.len());
172 let mut remaining = nblocks * self.options.block_len_multiple;
173 while remaining > 0 {
174 let chunk = self
175 .pop_front()
176 .vortex_expect("must have at least one chunk");
177 let len = chunk.len();
178
179 if len > remaining {
180 let left = chunk.slice(0..remaining);
181 let right = chunk.slice(remaining..len);
182 self.push_front(right);
183 res.push(left);
184 remaining = 0;
185 } else {
186 res.push(chunk);
187 remaining -= len;
188 }
189 }
190 Ok(res)
191 }
192
193 fn push_back(&mut self, chunk: ArrayRef) {
194 self.row_count += chunk.len();
195 self.nbytes += chunk.nbytes();
196 self.data.push_back(chunk);
197 }
198
199 fn push_front(&mut self, chunk: ArrayRef) {
200 self.row_count += chunk.len();
201 self.nbytes += chunk.nbytes();
202 self.data.push_front(chunk);
203 }
204
205 fn pop_front(&mut self) -> Option<ArrayRef> {
206 let res = self.data.pop_front();
207 if let Some(chunk) = res.as_ref() {
208 self.row_count -= chunk.len();
209 self.nbytes -= chunk.nbytes();
210 }
211 res
212 }
213}