Skip to main content

vortex_layout/layouts/
table.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4//! A configurable writer strategy for tabular data.
5//!
6//! Allows the caller to override specific leaf fields with custom layout strategies.
7
8use std::sync::Arc;
9
10use async_trait::async_trait;
11use futures::StreamExt;
12use futures::TryStreamExt;
13use futures::future::try_join_all;
14use futures::pin_mut;
15use itertools::Itertools;
16use vortex_array::ArrayContext;
17use vortex_array::ArrayRef;
18use vortex_array::IntoArray;
19use vortex_array::LEGACY_SESSION;
20use vortex_array::ToCanonical;
21use vortex_array::VortexSessionExecute;
22use vortex_array::arrays::struct_::StructArrayExt;
23use vortex_array::dtype::DType;
24use vortex_array::dtype::Field;
25use vortex_array::dtype::FieldName;
26use vortex_array::dtype::FieldPath;
27use vortex_array::dtype::Nullability;
28use vortex_error::VortexError;
29use vortex_error::VortexResult;
30use vortex_error::vortex_bail;
31use vortex_io::kanal_ext::KanalExt;
32use vortex_io::session::RuntimeSessionExt;
33use vortex_session::VortexSession;
34use vortex_utils::aliases::DefaultHashBuilder;
35use vortex_utils::aliases::hash_map::HashMap;
36use vortex_utils::aliases::hash_set::HashSet;
37
38use crate::IntoLayout;
39use crate::LayoutRef;
40use crate::LayoutStrategy;
41use crate::layouts::struct_::StructLayout;
42use crate::segments::SegmentSinkRef;
43use crate::sequence::SendableSequentialStream;
44use crate::sequence::SequenceId;
45use crate::sequence::SequencePointer;
46use crate::sequence::SequentialStreamAdapter;
47use crate::sequence::SequentialStreamExt;
48
/// A configurable strategy for writing tables with nested field columns, allowing
/// overrides for specific leaf columns.
pub struct TableStrategy {
    /// A set of leaf field overrides, e.g. to force one column to be compact-compressed.
    /// Paths are checked on insertion (see `validate_path`): the root path is rejected and
    /// no two overrides may overlap.
    leaf_writers: HashMap<FieldPath, Arc<dyn LayoutStrategy>>,
    /// The writer for any validity arrays that may be present. Because `descend` carries
    /// this writer into child strategies, it applies at every level of the schema tree.
    validity: Arc<dyn LayoutStrategy>,
    /// The fallback writer for any fields that do not have an explicit writer set in `leaf_writers`
    fallback: Arc<dyn LayoutStrategy>,
}
59
60impl TableStrategy {
61    /// Create a new writer with the specified write strategies for validity, and for all leaf
62    /// fields, with no overrides.
63    ///
64    /// Additional overrides can be configured using the `with_leaf_strategy` method.
65    ///
66    /// ## Example
67    ///
68    /// ```ignore
69    /// # use std::sync::Arc;
70    /// # use vortex_layout::layouts::flat::writer::FlatLayoutStrategy;
71    /// # use vortex_layout::layouts::table::TableStrategy;
72    ///
73    /// // Build a write strategy that does not compress validity or any leaf fields.
74    /// let flat = Arc::new(FlatLayoutStrategy::default());
75    ///
76    /// let strategy = TableStrategy::new(Arc::<FlatLayoutStrategy>::clone(&flat), Arc::<FlatLayoutStrategy>::clone(&flat));
77    /// ```
78    pub fn new(validity: Arc<dyn LayoutStrategy>, fallback: Arc<dyn LayoutStrategy>) -> Self {
79        Self {
80            leaf_writers: Default::default(),
81            validity,
82            fallback,
83        }
84    }
85
86    /// Add a custom write strategy for the given leaf field.
87    ///
88    /// ## Example
89    ///
90    /// ```ignore
91    /// # use std::sync::Arc;
92    /// # use vortex_array::dtype::{field_path, Field, FieldPath};
93    /// # use vortex_btrblocks::BtrBlocksCompressor;
94    /// # use vortex_layout::layouts::compressed::CompressingStrategy;
95    /// # use vortex_layout::layouts::flat::writer::FlatLayoutStrategy;
96    /// # use vortex_layout::layouts::table::TableStrategy;
97    ///
98    /// // A strategy for compressing data using the balanced BtrBlocks compressor.
99    /// let compress =
100    ///     CompressingStrategy::new(FlatLayoutStrategy::default(), BtrBlocksCompressor::default());
101    ///
102    /// // Our combined strategy uses no compression for validity buffers, BtrBlocks compression
103    /// // for most columns, and stores a nested binary column uncompressed (flat) because it
104    /// // is pre-compressed or never filtered on.
105    /// let strategy = TableStrategy::new(
106    ///         Arc::new(FlatLayoutStrategy::default()),
107    ///         Arc::new(compress),
108    ///     )
109    ///     .with_field_writer(
110    ///         field_path!(request.body.bytes),
111    ///         Arc::new(FlatLayoutStrategy::default()),
112    ///     );
113    /// ```
114    pub fn with_field_writer(
115        mut self,
116        field_path: impl Into<FieldPath>,
117        writer: Arc<dyn LayoutStrategy>,
118    ) -> Self {
119        self.leaf_writers
120            .insert(self.validate_path(field_path.into()), writer);
121        self
122    }
123
124    /// Set writers for several fields at once.
125    ///
126    /// See also: [`with_field_writer`][Self::with_field_writer].
127    pub fn with_field_writers(
128        mut self,
129        writers: impl IntoIterator<Item = (FieldPath, Arc<dyn LayoutStrategy>)>,
130    ) -> Self {
131        for (field_path, strategy) in writers {
132            self.leaf_writers
133                .insert(self.validate_path(field_path), strategy);
134        }
135        self
136    }
137
138    /// Override the default strategy for leaf columns that don't have overrides.
139    pub fn with_default_strategy(mut self, default: Arc<dyn LayoutStrategy>) -> Self {
140        self.fallback = default;
141        self
142    }
143
144    /// Override the strategy for compressing struct validity at all levels of the schema tree.
145    pub fn with_validity_strategy(mut self, validity: Arc<dyn LayoutStrategy>) -> Self {
146        self.validity = validity;
147        self
148    }
149}
150
151impl TableStrategy {
152    /// Descend into a subfield for the writer.
153    fn descend(&self, field: &Field) -> Self {
154        // Start with the existing set of overrides, then only retain the ones that contain
155        // the current field
156        let mut new_writers = HashMap::with_capacity(self.leaf_writers.len());
157
158        for (field_path, strategy) in &self.leaf_writers {
159            if field_path.starts_with_field(field)
160                && let Some(subpath) = field_path.clone().step_into()
161            {
162                new_writers.insert(subpath, Arc::clone(strategy));
163            }
164        }
165
166        Self {
167            leaf_writers: new_writers,
168            validity: Arc::clone(&self.validity),
169            fallback: Arc::clone(&self.fallback),
170        }
171    }
172
173    fn validate_path(&self, path: FieldPath) -> FieldPath {
174        assert!(
175            !path.is_root(),
176            "Do not set override as a root strategy, instead set the default strategy"
177        );
178
179        // Validate that the field path does not conflict with any overrides
180        // that we've added by overlapping.
181        for field_path in self.leaf_writers.keys() {
182            assert!(
183                !path.overlap(field_path),
184                "Override for field_path {path} conflicts with existing override for {field_path}"
185            );
186        }
187
188        path
189    }
190}
191
/// Specialized strategy for when we exactly know the input schema.
#[async_trait]
impl LayoutStrategy for TableStrategy {
    // Writes a stream of struct chunks as a `StructLayout` by transposing it into one
    // sub-stream per column (plus, for nullable structs, a leading validity column) and
    // writing each column concurrently with its configured strategy.
    async fn write_stream(
        &self,
        ctx: ArrayContext,
        segment_sink: SegmentSinkRef,
        stream: SendableSequentialStream,
        mut eof: SequencePointer,
        session: &VortexSession,
    ) -> VortexResult<LayoutRef> {
        let dtype = stream.dtype().clone();

        // Fallback: if the array is not a struct, fallback to writing a single array.
        if !dtype.is_struct() {
            return self
                .fallback
                .write_stream(ctx, segment_sink, stream, eof, session)
                .await;
        }

        let struct_dtype = dtype.as_struct_fields();

        // Check for unique field names at write time.
        // (A set smaller than the name list means at least one duplicate.)
        if HashSet::<_, DefaultHashBuilder>::from_iter(struct_dtype.names().iter()).len()
            != struct_dtype.names().len()
        {
            vortex_bail!("StructLayout must have unique field names");
        }
        let is_nullable = dtype.is_nullable();

        // Optimization: when there are no fields, don't spawn any work and just write a trivial
        // StructLayout. We still have to drain the stream to count rows.
        if struct_dtype.nfields() == 0 && !is_nullable {
            let row_count = stream
                .try_fold(
                    0u64,
                    |acc, (_, arr)| async move { Ok(acc + arr.len() as u64) },
                )
                .await?;
            return Ok(StructLayout::new(row_count, dtype, vec![]).into_layout());
        }

        // stream<struct_chunk> -> stream<vec<column_chunk>>
        // Each chunk is exploded into one (SequenceId, ArrayRef) entry per output column,
        // with sequence ids minted by descending into the chunk's id and advancing once
        // per column. For nullable structs the first entry is the chunk's validity mask.
        let columns_vec_stream = stream.map(move |chunk| {
            let (sequence_id, chunk) = chunk?;
            let mut sequence_pointer = sequence_id.descend();
            let struct_chunk = chunk.to_struct();
            let mut columns: Vec<(SequenceId, ArrayRef)> = Vec::new();
            if is_nullable {
                columns.push((
                    sequence_pointer.advance(),
                    chunk
                        .validity()?
                        .to_mask(chunk.len(), &mut LEGACY_SESSION.create_execution_ctx())?
                        .into_array(),
                ));
            }

            columns.extend(
                struct_chunk
                    .iter_unmasked_fields()
                    .map(|field| (sequence_pointer.advance(), field.clone())),
            );

            Ok(columns)
        });

        // One output stream per struct field, plus one for validity when nullable.
        let mut stream_count = struct_dtype.nfields();
        if is_nullable {
            stream_count += 1;
        }

        // Capacity-1 channels: the fan-out task below can only run one chunk ahead of the
        // slowest column writer.
        let (column_streams_tx, column_streams_rx): (Vec<_>, Vec<_>) =
            (0..stream_count).map(|_| kanal::bounded_async(1)).unzip();

        // Spawn a task to fan out column chunks to their respective transposed streams
        let handle = session.handle();
        handle
            .spawn(async move {
                pin_mut!(columns_vec_stream);
                while let Some(result) = columns_vec_stream.next().await {
                    match result {
                        Ok(columns) => {
                            // zip_eq asserts that each chunk yields exactly one array per
                            // column stream. Send errors (receiver dropped) are ignored.
                            for (tx, column) in column_streams_tx.iter().zip_eq(columns.into_iter())
                            {
                                let _ = tx.send(Ok(column)).await;
                            }
                        }
                        Err(e) => {
                            // Broadcast the error to every column writer (sharing it via
                            // Arc since VortexError is not Clone), then stop fanning out.
                            let e: Arc<VortexError> = Arc::new(e);
                            for tx in column_streams_tx.iter() {
                                let _ = tx.send(Err(VortexError::from(Arc::clone(&e)))).await;
                            }
                            break;
                        }
                    }
                }
            })
            .detach();

        // First child column is the validity, subsequence children are the individual struct fields
        let column_dtypes: Vec<DType> = if is_nullable {
            std::iter::once(DType::Bool(Nullability::NonNullable))
                .chain(struct_dtype.fields())
                .collect()
        } else {
            struct_dtype.fields().collect()
        };

        // Column names mirror column_dtypes: a synthetic "__validity" name leads when nullable.
        let column_names: Vec<FieldName> = if is_nullable {
            std::iter::once(FieldName::from("__validity"))
                .chain(struct_dtype.names().iter().cloned())
                .collect()
        } else {
            struct_dtype.names().iter().cloned().collect()
        };

        // Spawn one writer task per column, each consuming its transposed receiver.
        let layout_futures: Vec<_> = column_dtypes
            .into_iter()
            .zip_eq(column_streams_rx)
            .zip_eq(column_names)
            .enumerate()
            .map(move |(index, ((dtype, recv), name))| {
                let column_stream =
                    SequentialStreamAdapter::new(dtype.clone(), recv.into_stream().boxed())
                        .sendable();
                // Each child gets its own EOF pointer split off the parent's.
                let child_eof = eof.split_off();
                let field = Field::Name(name.clone());
                let session = session.clone();
                let ctx = ctx.clone();
                let segment_sink = Arc::clone(&segment_sink);
                handle.spawn_nested(move |h| {
                    let validity = Arc::clone(&self.validity);
                    // descend further and try with new fields
                    // Strategy resolution: explicit leaf override first; otherwise descend
                    // for struct columns or use the fallback for leaves.
                    let writer = self
                        .leaf_writers
                        .get(&FieldPath::from_name(name))
                        .cloned()
                        .unwrap_or_else(|| {
                            if dtype.is_struct() {
                                // Step into the field path for struct columns
                                Arc::new(self.descend(&field))
                            } else {
                                // Use fallback for leaf columns
                                Arc::clone(&self.fallback)
                            }
                        });
                    let session = session.with_handle(h);

                    async move {
                        // If we have a matching writer, we use it.
                        // Otherwise, we descend into a new modified one.
                        // Write validity stream
                        // (index 0 is the validity column only when the struct is nullable.)
                        if index == 0 && is_nullable {
                            validity
                                .write_stream(ctx, segment_sink, column_stream, child_eof, &session)
                                .await
                        } else {
                            // Use the underlying writer, otherwise use the fallback writer.
                            writer
                                .write_stream(ctx, segment_sink, column_stream, child_eof, &session)
                                .await
                        }
                    }
                })
            })
            .collect();

        // Await all column writers; the first failure aborts the join.
        let column_layouts = try_join_all(layout_futures).await?;
        // TODO(os): transposed stream could count row counts as well,
        // This must hold though, all columns must have the same row count of the struct layout
        let row_count = column_layouts.first().map(|l| l.row_count()).unwrap_or(0);
        Ok(StructLayout::new(row_count, dtype, column_layouts).into_layout())
    }
}
368
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use vortex_array::dtype::FieldPath;
    use vortex_array::field_path;

    use crate::layouts::flat::writer::FlatLayoutStrategy;
    use crate::layouts::table::TableStrategy;

    #[test]
    #[should_panic(
        expected = "Override for field_path $a.$b conflicts with existing override for $a.$b.$c"
    )]
    fn test_overlapping_paths_fail() {
        let flat = Arc::new(FlatLayoutStrategy::default());

        // Registering a single deep override is accepted.
        let strategy = TableStrategy::new(Arc::clone(&flat), Arc::clone(&flat))
            .with_field_writer(field_path!(a.b.c), Arc::clone(&flat));

        // A prefix of the existing override overlaps it — this must panic.
        let _ = strategy.with_field_writer(field_path!(a.b), flat);
    }

    #[test]
    #[should_panic(
        expected = "Do not set override as a root strategy, instead set the default strategy"
    )]
    fn test_root_override() {
        let flat = Arc::new(FlatLayoutStrategy::default());

        // The root path is never a valid override key; the default strategy covers it.
        let _ = TableStrategy::new(Arc::clone(&flat), Arc::clone(&flat))
            .with_field_writer(FieldPath::root(), flat);
    }
}