vortex_layout/layouts/
repartition.rs1use std::collections::VecDeque;
5use std::sync::Arc;
6
7use async_stream::try_stream;
8use async_trait::async_trait;
9use futures::{StreamExt as _, pin_mut};
10use vortex_array::arrays::ChunkedArray;
11use vortex_array::{Array, ArrayContext, ArrayRef, IntoArray};
12use vortex_error::{VortexExpect, VortexResult};
13use vortex_io::runtime::Handle;
14
15use crate::segments::SegmentSinkRef;
16use crate::sequence::{
17 SendableSequentialStream, SequencePointer, SequentialStreamAdapter, SequentialStreamExt,
18};
19use crate::{LayoutRef, LayoutStrategy};
20
/// Tuning knobs for [`RepartitionStrategy`].
///
/// Derives `Debug`/`PartialEq`/`Eq` in addition to `Clone` so configurations can be logged and
/// compared in tests.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RepartitionWriterOptions {
    /// Minimum number of buffered bytes (as reported by each chunk's `nbytes()`) that must be
    /// accumulated before a block is emitted mid-stream.
    pub block_size_minimum: u64,
    /// Row granularity of emitted blocks: every block emitted mid-stream contains a whole
    /// multiple of this many rows. Expected to be at least 1.
    pub block_len_multiple: usize,
    /// When `true`, every incoming chunk is converted with `to_canonical()` before it is
    /// repartitioned.
    pub canonicalize: bool,
}
29
/// A [`LayoutStrategy`] that re-chunks the incoming array stream into blocks and forwards the
/// re-chunked stream to a child strategy.
///
/// Cloning is cheap for the child: it is shared behind an [`Arc`].
#[derive(Clone)]
pub struct RepartitionStrategy {
    /// Strategy that receives the repartitioned stream and produces the final layout.
    child: Arc<dyn LayoutStrategy>,
    /// Block sizing / canonicalization settings applied while repartitioning.
    options: RepartitionWriterOptions,
}
39
40impl RepartitionStrategy {
41 pub fn new<S: LayoutStrategy>(child: S, options: RepartitionWriterOptions) -> Self {
42 Self {
43 child: Arc::new(child),
44 options,
45 }
46 }
47}
48
49#[async_trait]
50impl LayoutStrategy for RepartitionStrategy {
51 async fn write_stream(
52 &self,
53 ctx: ArrayContext,
54 segment_sink: SegmentSinkRef,
55 stream: SendableSequentialStream,
56 eof: SequencePointer,
57 handle: Handle,
58 ) -> VortexResult<LayoutRef> {
59 let dtype = stream.dtype().clone();
62 let stream = if self.options.canonicalize {
63 SequentialStreamAdapter::new(
64 dtype.clone(),
65 stream.map(|chunk| {
66 let (sequence_id, chunk) = chunk?;
67 VortexResult::Ok((sequence_id, chunk.to_canonical().into_array()))
68 }),
69 )
70 .sendable()
71 } else {
72 stream
73 };
74
75 let dtype_clone = dtype.clone();
76 let options = self.options.clone();
77 let repartitioned_stream = try_stream! {
78 let canonical_stream = stream.peekable();
79 pin_mut!(canonical_stream);
80
81 let mut chunks = ChunksBuffer::new(options.clone());
82 while let Some(chunk) = canonical_stream.as_mut().next().await {
83 let (sequence_id, chunk) = chunk?;
84 let mut sequence_pointer = sequence_id.descend();
85 let mut offset = 0;
86 while offset < chunk.len() {
87 let end = (offset + options.block_len_multiple).min(chunk.len());
88 let sliced = chunk.slice(offset..end);
89 chunks.push_back(sliced);
90 offset = end;
91
92 if chunks.have_enough() {
93 let output_chunks = chunks.collect_exact_blocks()?;
94 assert!(!output_chunks.is_empty());
95 let chunked =
96 ChunkedArray::try_new(output_chunks, dtype_clone.clone())?;
97 if !chunked.is_empty() {
98 yield (
99 sequence_pointer.advance(),
100 chunked.to_canonical().into_array(),
101 )
102 }
103 }
104 }
105 if canonical_stream.as_mut().peek().await.is_none() {
106 let to_flush = ChunkedArray::try_new(
107 chunks.data.drain(..).collect(),
108 dtype_clone.clone(),
109 )?;
110 if !to_flush.is_empty() {
111 yield (
112 sequence_pointer.advance(),
113 to_flush.to_canonical().into_array(),
114 )
115 }
116 }
117 }
118 };
119
120 self.child
121 .write_stream(
122 ctx,
123 segment_sink,
124 SequentialStreamAdapter::new(dtype, repartitioned_stream).sendable(),
125 eof,
126 handle,
127 )
128 .await
129 }
130
131 fn buffered_bytes(&self) -> u64 {
132 self.child.buffered_bytes()
137 }
138}
139
/// FIFO of pending array chunks together with running totals of their rows and bytes.
///
/// The totals are kept in sync only by `push_back`, `push_front` and `pop_front`; code that
/// mutates `data` directly (e.g. draining it) leaves `row_count` and `nbytes` stale.
struct ChunksBuffer {
    /// Buffered chunks, oldest at the front.
    data: VecDeque<ArrayRef>,
    /// Sum of `len()` over the chunks pushed and not yet popped.
    row_count: usize,
    /// Sum of `nbytes()` over the chunks pushed and not yet popped.
    nbytes: u64,
    /// Thresholds used to decide when and how much to emit.
    options: RepartitionWriterOptions,
}
146
147impl ChunksBuffer {
148 fn new(options: RepartitionWriterOptions) -> Self {
149 Self {
150 data: Default::default(),
151 row_count: 0,
152 nbytes: 0,
153 options,
154 }
155 }
156
157 fn have_enough(&self) -> bool {
158 self.nbytes >= self.options.block_size_minimum
159 && self.row_count >= self.options.block_len_multiple
160 }
161
162 fn collect_exact_blocks(&mut self) -> VortexResult<Vec<ArrayRef>> {
163 let nblocks = self.row_count / self.options.block_len_multiple;
164 let mut res = Vec::with_capacity(self.data.len());
165 let mut remaining = nblocks * self.options.block_len_multiple;
166 while remaining > 0 {
167 let chunk = self
168 .pop_front()
169 .vortex_expect("must have at least one chunk");
170 let len = chunk.len();
171
172 if len > remaining {
173 let left = chunk.slice(0..remaining);
174 let right = chunk.slice(remaining..len);
175 self.push_front(right);
176 res.push(left);
177 remaining = 0;
178 } else {
179 res.push(chunk);
180 remaining -= len;
181 }
182 }
183 Ok(res)
184 }
185
186 fn push_back(&mut self, chunk: ArrayRef) {
187 self.row_count += chunk.len();
188 self.nbytes += chunk.nbytes();
189 self.data.push_back(chunk);
190 }
191
192 fn push_front(&mut self, chunk: ArrayRef) {
193 self.row_count += chunk.len();
194 self.nbytes += chunk.nbytes();
195 self.data.push_front(chunk);
196 }
197
198 fn pop_front(&mut self) -> Option<ArrayRef> {
199 let res = self.data.pop_front();
200 if let Some(chunk) = res.as_ref() {
201 self.row_count -= chunk.len();
202 self.nbytes -= chunk.nbytes();
203 }
204 res
205 }
206}