vortex_layout/layouts/
repartition.rs1use std::collections::VecDeque;
5
6use async_stream::try_stream;
7use async_trait::async_trait;
8use futures::{StreamExt as _, pin_mut};
9use vortex_array::arrays::ChunkedArray;
10use vortex_array::{Array, ArrayContext, ArrayRef, IntoArray};
11use vortex_error::{VortexExpect, VortexResult};
12
13use crate::segments::SequenceWriter;
14use crate::{
15 LayoutRef, LayoutStrategy, SendableSequentialStream, SequentialStreamAdapter,
16 SequentialStreamExt,
17};
18
/// Thresholds that control how [`RepartitionStrategy`] regroups incoming chunks.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RepartitionWriterOptions {
    /// Minimum number of bytes (as reported by `nbytes()`) that must be buffered before a
    /// regrouped chunk is emitted. The final flush at end of stream ignores this threshold.
    pub block_size_minimum: u64,
    /// Every emitted chunk except the final flush holds a row count that is a whole multiple of
    /// this value. Must be greater than zero.
    pub block_len_multiple: usize,
}
26
27#[derive(Clone)]
32pub struct RepartitionStrategy<S> {
33 child: S,
34 options: RepartitionWriterOptions,
35}
36
37impl<S> RepartitionStrategy<S>
38where
39 S: LayoutStrategy,
40{
41 pub fn new(child: S, options: RepartitionWriterOptions) -> Self {
42 Self { child, options }
43 }
44}
45
46#[async_trait]
47impl<S> LayoutStrategy for RepartitionStrategy<S>
48where
49 S: LayoutStrategy,
50{
51 async fn write_stream(
52 &self,
53 ctx: &ArrayContext,
54 sequence_writer: SequenceWriter,
55 stream: SendableSequentialStream,
56 ) -> VortexResult<LayoutRef> {
57 let dtype = stream.dtype().clone();
60 let canonical_stream = SequentialStreamAdapter::new(
61 dtype.clone(),
62 stream.map(|chunk| {
63 let (sequence_id, chunk) = chunk?;
64 VortexResult::Ok((sequence_id, chunk.to_canonical()?.into_array()))
65 }),
66 )
67 .sendable();
68
69 let dtype_clone = dtype.clone();
70 let options = self.options.clone();
71 let repartitioned_stream = try_stream! {
72 let canonical_stream = canonical_stream.peekable();
73 pin_mut!(canonical_stream);
74 let mut chunks = ChunksBuffer::new(options.clone());
75 while let Some(chunk) = canonical_stream.as_mut().next().await {
76 let (sequence_id, chunk) = chunk?;
77 let mut sequence_pointer = sequence_id.descend();
78 let mut offset = 0;
79 while offset < chunk.len() {
80 let end = (offset + options.block_len_multiple).min(chunk.len());
81 let sliced = chunk.slice(offset, end)?;
82 chunks.push_back(sliced);
83 offset = end;
84
85 if chunks.have_enough() {
86 let output_chunks = chunks.collect_exact_blocks()?;
87 assert!(!output_chunks.is_empty());
88 let chunked =
89 ChunkedArray::new_unchecked(output_chunks, dtype_clone.clone());
90 if !chunked.is_empty() {
91 yield (
92 sequence_pointer.advance(),
93 chunked.to_canonical()?.into_array(),
94 )
95 }
96 }
97 }
98 if canonical_stream.as_mut().peek().await.is_none() {
99 let to_flush = ChunkedArray::new_unchecked(
100 chunks.data.drain(..).collect(),
101 dtype_clone.clone(),
102 );
103 if !to_flush.is_empty() {
104 yield (
105 sequence_pointer.advance(),
106 to_flush.to_canonical()?.into_array(),
107 )
108 }
109 }
110 }
111 };
112
113 self.child
114 .write_stream(
115 ctx,
116 sequence_writer,
117 SequentialStreamAdapter::new(dtype, repartitioned_stream).sendable(),
118 )
119 .await
120 }
121}
122
123struct ChunksBuffer {
124 data: VecDeque<ArrayRef>,
125 row_count: usize,
126 nbytes: u64,
127 options: RepartitionWriterOptions,
128}
129
130impl ChunksBuffer {
131 fn new(options: RepartitionWriterOptions) -> Self {
132 Self {
133 data: Default::default(),
134 row_count: 0,
135 nbytes: 0,
136 options,
137 }
138 }
139
140 fn have_enough(&self) -> bool {
141 self.nbytes >= self.options.block_size_minimum
142 && self.row_count >= self.options.block_len_multiple
143 }
144
145 fn collect_exact_blocks(&mut self) -> VortexResult<Vec<ArrayRef>> {
146 let nblocks = self.row_count / self.options.block_len_multiple;
147 let mut res = Vec::with_capacity(self.data.len());
148 let mut remaining = nblocks * self.options.block_len_multiple;
149 while remaining > 0 {
150 let chunk = self
151 .pop_front()
152 .vortex_expect("must have at least one chunk");
153 let len = chunk.len();
154
155 if len > remaining {
156 let left = chunk.slice(0, remaining)?;
157 let right = chunk.slice(remaining, len)?;
158 self.push_front(right);
159 res.push(left);
160 remaining = 0;
161 } else {
162 res.push(chunk);
163 remaining -= len;
164 }
165 }
166 Ok(res)
167 }
168
169 fn push_back(&mut self, chunk: ArrayRef) {
170 self.row_count += chunk.len();
171 self.nbytes += chunk.nbytes();
172 self.data.push_back(chunk);
173 }
174
175 fn push_front(&mut self, chunk: ArrayRef) {
176 self.row_count += chunk.len();
177 self.nbytes += chunk.nbytes();
178 self.data.push_front(chunk);
179 }
180
181 fn pop_front(&mut self) -> Option<ArrayRef> {
182 let res = self.data.pop_front();
183 if let Some(chunk) = res.as_ref() {
184 self.row_count -= chunk.len();
185 self.nbytes -= chunk.nbytes();
186 }
187 res
188 }
189}