vortex_layout/layouts/
table.rs1use std::sync::Arc;
9
10use async_trait::async_trait;
11use futures::StreamExt;
12use futures::TryStreamExt;
13use futures::future::try_join_all;
14use futures::pin_mut;
15use itertools::Itertools;
16use vortex_array::ArrayContext;
17use vortex_array::ArrayRef;
18use vortex_array::IntoArray;
19use vortex_array::LEGACY_SESSION;
20use vortex_array::ToCanonical;
21use vortex_array::VortexSessionExecute;
22use vortex_array::arrays::struct_::StructArrayExt;
23use vortex_array::dtype::DType;
24use vortex_array::dtype::Field;
25use vortex_array::dtype::FieldName;
26use vortex_array::dtype::FieldPath;
27use vortex_array::dtype::Nullability;
28use vortex_error::VortexError;
29use vortex_error::VortexResult;
30use vortex_error::vortex_bail;
31use vortex_io::kanal_ext::KanalExt;
32use vortex_io::session::RuntimeSessionExt;
33use vortex_session::VortexSession;
34use vortex_utils::aliases::DefaultHashBuilder;
35use vortex_utils::aliases::hash_map::HashMap;
36use vortex_utils::aliases::hash_set::HashSet;
37
38use crate::IntoLayout;
39use crate::LayoutRef;
40use crate::LayoutStrategy;
41use crate::layouts::struct_::StructLayout;
42use crate::segments::SegmentSinkRef;
43use crate::sequence::SendableSequentialStream;
44use crate::sequence::SequenceId;
45use crate::sequence::SequencePointer;
46use crate::sequence::SequentialStreamAdapter;
47use crate::sequence::SequentialStreamExt;
48
49pub struct TableStrategy {
52 leaf_writers: HashMap<FieldPath, Arc<dyn LayoutStrategy>>,
54 validity: Arc<dyn LayoutStrategy>,
56 fallback: Arc<dyn LayoutStrategy>,
58}
59
60impl TableStrategy {
61 pub fn new(validity: Arc<dyn LayoutStrategy>, fallback: Arc<dyn LayoutStrategy>) -> Self {
79 Self {
80 leaf_writers: Default::default(),
81 validity,
82 fallback,
83 }
84 }
85
86 pub fn with_field_writer(
115 mut self,
116 field_path: impl Into<FieldPath>,
117 writer: Arc<dyn LayoutStrategy>,
118 ) -> Self {
119 self.leaf_writers
120 .insert(self.validate_path(field_path.into()), writer);
121 self
122 }
123
124 pub fn with_field_writers(
128 mut self,
129 writers: impl IntoIterator<Item = (FieldPath, Arc<dyn LayoutStrategy>)>,
130 ) -> Self {
131 for (field_path, strategy) in writers {
132 self.leaf_writers
133 .insert(self.validate_path(field_path), strategy);
134 }
135 self
136 }
137
138 pub fn with_default_strategy(mut self, default: Arc<dyn LayoutStrategy>) -> Self {
140 self.fallback = default;
141 self
142 }
143
144 pub fn with_validity_strategy(mut self, validity: Arc<dyn LayoutStrategy>) -> Self {
146 self.validity = validity;
147 self
148 }
149}
150
151impl TableStrategy {
152 fn descend(&self, field: &Field) -> Self {
154 let mut new_writers = HashMap::with_capacity(self.leaf_writers.len());
157
158 for (field_path, strategy) in &self.leaf_writers {
159 if field_path.starts_with_field(field)
160 && let Some(subpath) = field_path.clone().step_into()
161 {
162 new_writers.insert(subpath, Arc::clone(strategy));
163 }
164 }
165
166 Self {
167 leaf_writers: new_writers,
168 validity: Arc::clone(&self.validity),
169 fallback: Arc::clone(&self.fallback),
170 }
171 }
172
173 fn validate_path(&self, path: FieldPath) -> FieldPath {
174 assert!(
175 !path.is_root(),
176 "Do not set override as a root strategy, instead set the default strategy"
177 );
178
179 for field_path in self.leaf_writers.keys() {
182 assert!(
183 !path.overlap(field_path),
184 "Override for field_path {path} conflicts with existing override for {field_path}"
185 );
186 }
187
188 path
189 }
190}
191
192#[async_trait]
194impl LayoutStrategy for TableStrategy {
195 async fn write_stream(
196 &self,
197 ctx: ArrayContext,
198 segment_sink: SegmentSinkRef,
199 stream: SendableSequentialStream,
200 mut eof: SequencePointer,
201 session: &VortexSession,
202 ) -> VortexResult<LayoutRef> {
203 let dtype = stream.dtype().clone();
204
205 if !dtype.is_struct() {
207 return self
208 .fallback
209 .write_stream(ctx, segment_sink, stream, eof, session)
210 .await;
211 }
212
213 let struct_dtype = dtype.as_struct_fields();
214
215 if HashSet::<_, DefaultHashBuilder>::from_iter(struct_dtype.names().iter()).len()
217 != struct_dtype.names().len()
218 {
219 vortex_bail!("StructLayout must have unique field names");
220 }
221 let is_nullable = dtype.is_nullable();
222
223 if struct_dtype.nfields() == 0 && !is_nullable {
226 let row_count = stream
227 .try_fold(
228 0u64,
229 |acc, (_, arr)| async move { Ok(acc + arr.len() as u64) },
230 )
231 .await?;
232 return Ok(StructLayout::new(row_count, dtype, vec![]).into_layout());
233 }
234
235 let columns_vec_stream = stream.map(move |chunk| {
237 let (sequence_id, chunk) = chunk?;
238 let mut sequence_pointer = sequence_id.descend();
239 let struct_chunk = chunk.to_struct();
240 let mut columns: Vec<(SequenceId, ArrayRef)> = Vec::new();
241 if is_nullable {
242 columns.push((
243 sequence_pointer.advance(),
244 chunk
245 .validity()?
246 .to_mask(chunk.len(), &mut LEGACY_SESSION.create_execution_ctx())?
247 .into_array(),
248 ));
249 }
250
251 columns.extend(
252 struct_chunk
253 .iter_unmasked_fields()
254 .map(|field| (sequence_pointer.advance(), field.clone())),
255 );
256
257 Ok(columns)
258 });
259
260 let mut stream_count = struct_dtype.nfields();
261 if is_nullable {
262 stream_count += 1;
263 }
264
265 let (column_streams_tx, column_streams_rx): (Vec<_>, Vec<_>) =
266 (0..stream_count).map(|_| kanal::bounded_async(1)).unzip();
267
268 let handle = session.handle();
270 handle
271 .spawn(async move {
272 pin_mut!(columns_vec_stream);
273 while let Some(result) = columns_vec_stream.next().await {
274 match result {
275 Ok(columns) => {
276 for (tx, column) in column_streams_tx.iter().zip_eq(columns.into_iter())
277 {
278 let _ = tx.send(Ok(column)).await;
279 }
280 }
281 Err(e) => {
282 let e: Arc<VortexError> = Arc::new(e);
283 for tx in column_streams_tx.iter() {
284 let _ = tx.send(Err(VortexError::from(Arc::clone(&e)))).await;
285 }
286 break;
287 }
288 }
289 }
290 })
291 .detach();
292
293 let column_dtypes: Vec<DType> = if is_nullable {
295 std::iter::once(DType::Bool(Nullability::NonNullable))
296 .chain(struct_dtype.fields())
297 .collect()
298 } else {
299 struct_dtype.fields().collect()
300 };
301
302 let column_names: Vec<FieldName> = if is_nullable {
303 std::iter::once(FieldName::from("__validity"))
304 .chain(struct_dtype.names().iter().cloned())
305 .collect()
306 } else {
307 struct_dtype.names().iter().cloned().collect()
308 };
309
310 let layout_futures: Vec<_> = column_dtypes
311 .into_iter()
312 .zip_eq(column_streams_rx)
313 .zip_eq(column_names)
314 .enumerate()
315 .map(move |(index, ((dtype, recv), name))| {
316 let column_stream =
317 SequentialStreamAdapter::new(dtype.clone(), recv.into_stream().boxed())
318 .sendable();
319 let child_eof = eof.split_off();
320 let field = Field::Name(name.clone());
321 let session = session.clone();
322 let ctx = ctx.clone();
323 let segment_sink = Arc::clone(&segment_sink);
324 handle.spawn_nested(move |h| {
325 let validity = Arc::clone(&self.validity);
326 let writer = self
328 .leaf_writers
329 .get(&FieldPath::from_name(name))
330 .cloned()
331 .unwrap_or_else(|| {
332 if dtype.is_struct() {
333 Arc::new(self.descend(&field))
335 } else {
336 Arc::clone(&self.fallback)
338 }
339 });
340 let session = session.with_handle(h);
341
342 async move {
343 if index == 0 && is_nullable {
347 validity
348 .write_stream(ctx, segment_sink, column_stream, child_eof, &session)
349 .await
350 } else {
351 writer
353 .write_stream(ctx, segment_sink, column_stream, child_eof, &session)
354 .await
355 }
356 }
357 })
358 })
359 .collect();
360
361 let column_layouts = try_join_all(layout_futures).await?;
362 let row_count = column_layouts.first().map(|l| l.row_count()).unwrap_or(0);
365 Ok(StructLayout::new(row_count, dtype, column_layouts).into_layout())
366 }
367}
368
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use vortex_array::dtype::FieldPath;
    use vortex_array::field_path;

    use crate::layouts::flat::writer::FlatLayoutStrategy;
    use crate::layouts::table::TableStrategy;

    /// Builds a strategy without overrides that uses `flat` for both validity and fallback.
    fn flat_table(flat: &Arc<FlatLayoutStrategy>) -> TableStrategy {
        TableStrategy::new(
            Arc::<FlatLayoutStrategy>::clone(flat),
            Arc::<FlatLayoutStrategy>::clone(flat),
        )
    }

    /// Registering `a.b` after `a.b.c` must be rejected because the two paths overlap.
    #[test]
    #[should_panic(
        expected = "Override for field_path $a.$b conflicts with existing override for $a.$b.$c"
    )]
    fn test_overlapping_paths_fail() {
        let flat = Arc::new(FlatLayoutStrategy::default());
        let nested_writer = Arc::<FlatLayoutStrategy>::clone(&flat);

        let strategy = flat_table(&flat).with_field_writer(field_path!(a.b.c), nested_writer);

        // The second registration is expected to panic.
        let _conflicting = strategy.with_field_writer(field_path!(a.b), flat);
    }

    /// The root path cannot be overridden; the default strategy has to be used instead.
    #[test]
    #[should_panic(
        expected = "Do not set override as a root strategy, instead set the default strategy"
    )]
    fn test_root_override() {
        let flat = Arc::new(FlatLayoutStrategy::default());
        let _strategy = flat_table(&flat).with_field_writer(FieldPath::root(), flat);
    }
}