tantivy_columnar/columnar/merge/mod.rs

1mod merge_dict_column;
2mod merge_mapping;
3mod term_merger;
4
5use std::collections::{BTreeMap, HashSet};
6use std::io;
7use std::net::Ipv6Addr;
8use std::sync::Arc;
9
10use itertools::Itertools;
11pub use merge_mapping::{MergeRowOrder, ShuffleMergeOrder, StackMergeOrder};
12
13use super::writer::ColumnarSerializer;
14use crate::column::{serialize_column_mappable_to_u128, serialize_column_mappable_to_u64};
15use crate::column_values::MergedColumnValues;
16use crate::columnar::merge::merge_dict_column::merge_bytes_or_str_column;
17use crate::columnar::writer::CompatibleNumericalTypes;
18use crate::columnar::ColumnarReader;
19use crate::dynamic_column::DynamicColumn;
20use crate::{
21    BytesColumn, Column, ColumnIndex, ColumnType, ColumnValues, DynamicColumnHandle, NumericalType,
22    NumericalValue,
23};
24
/// Column types are grouped into different categories.
/// After merge, all columns belonging to the same category are coerced to
/// the same column type.
///
/// In practice, only numerical columns (i64, u64, f64) are currently coerced into one type.
///
/// See also the crate's `README.md`.
///
/// The ordering has to match the ordering of the variants in [`ColumnType`].
/// The derived `Ord` follows the declaration order below. It is part of the key of the map
/// the merge iterates over, so it decides the order in which column groups that share a
/// name are merged and serialized.
#[derive(Copy, Clone, Eq, PartialOrd, Ord, PartialEq, Hash, Debug)]
pub(crate) enum ColumnTypeCategory {
    Numerical,
    Bytes,
    Str,
    Bool,
    IpAddr,
    DateTime,
}
43
44impl From<ColumnType> for ColumnTypeCategory {
45    fn from(column_type: ColumnType) -> Self {
46        match column_type {
47            ColumnType::I64 => ColumnTypeCategory::Numerical,
48            ColumnType::U64 => ColumnTypeCategory::Numerical,
49            ColumnType::F64 => ColumnTypeCategory::Numerical,
50            ColumnType::Bytes => ColumnTypeCategory::Bytes,
51            ColumnType::Str => ColumnTypeCategory::Str,
52            ColumnType::Bool => ColumnTypeCategory::Bool,
53            ColumnType::IpAddr => ColumnTypeCategory::IpAddr,
54            ColumnType::DateTime => ColumnTypeCategory::DateTime,
55        }
56    }
57}
58
59/// Merge several columnar table together.
60///
61/// If several columns with the same name are conflicting with the numerical types in the
62/// input columnars, the first type compatible out of i64, u64, f64 in that order will be used.
63///
64/// `require_columns` makes it possible to ensure that some columns will be present in the
65/// resulting columnar. When a required column is a numerical column type, one of two things can
66/// happen:
67/// - If the required column type is compatible with all of the input columnar, the resulsting
68///   merged
69/// columnar will simply coerce the input column and use the required column type.
70/// - If the required column type is incompatible with one of the input columnar, the merged
71/// will fail with an InvalidData error.
72///
73/// `merge_row_order` makes it possible to remove or reorder row in the resulting
74/// `Columnar` table.
75///
76/// Reminder: a string and a numerical column may bare the same column name. This is not
77/// considered a conflict.
78pub fn merge_columnar(
79    columnar_readers: &[&ColumnarReader],
80    required_columns: &[(String, ColumnType)],
81    merge_row_order: MergeRowOrder,
82    output: &mut impl io::Write,
83) -> io::Result<()> {
84    let mut serializer = ColumnarSerializer::new(output);
85    let num_rows_per_columnar = columnar_readers
86        .iter()
87        .map(|reader| reader.num_rows())
88        .collect::<Vec<u32>>();
89
90    let columns_to_merge =
91        group_columns_for_merge(columnar_readers, required_columns, &merge_row_order)?;
92    for res in columns_to_merge {
93        let ((column_name, _column_type_category), grouped_columns) = res;
94        let grouped_columns = grouped_columns.open(&merge_row_order)?;
95        if grouped_columns.is_empty() {
96            continue;
97        }
98
99        let column_type = grouped_columns.column_type_after_merge();
100        let mut columns = grouped_columns.columns;
101        coerce_columns(column_type, &mut columns)?;
102
103        let mut column_serializer =
104            serializer.start_serialize_column(column_name.as_bytes(), column_type);
105        merge_column(
106            column_type,
107            &num_rows_per_columnar,
108            columns,
109            &merge_row_order,
110            &mut column_serializer,
111        )?;
112        column_serializer.finalize()?;
113    }
114
115    serializer.finalize(merge_row_order.num_rows())?;
116    Ok(())
117}
118
119fn dynamic_column_to_u64_monotonic(dynamic_column: DynamicColumn) -> Option<Column<u64>> {
120    match dynamic_column {
121        DynamicColumn::Bool(column) => Some(column.to_u64_monotonic()),
122        DynamicColumn::I64(column) => Some(column.to_u64_monotonic()),
123        DynamicColumn::U64(column) => Some(column.to_u64_monotonic()),
124        DynamicColumn::F64(column) => Some(column.to_u64_monotonic()),
125        DynamicColumn::DateTime(column) => Some(column.to_u64_monotonic()),
126        DynamicColumn::IpAddr(_) | DynamicColumn::Bytes(_) | DynamicColumn::Str(_) => None,
127    }
128}
129
130fn merge_column(
131    column_type: ColumnType,
132    num_docs_per_column: &[u32],
133    columns: Vec<Option<DynamicColumn>>,
134    merge_row_order: &MergeRowOrder,
135    wrt: &mut impl io::Write,
136) -> io::Result<()> {
137    match column_type {
138        ColumnType::I64
139        | ColumnType::U64
140        | ColumnType::F64
141        | ColumnType::DateTime
142        | ColumnType::Bool => {
143            let mut column_indexes: Vec<ColumnIndex> = Vec::with_capacity(columns.len());
144            let mut column_values: Vec<Option<Arc<dyn ColumnValues>>> =
145                Vec::with_capacity(columns.len());
146            for (i, dynamic_column_opt) in columns.into_iter().enumerate() {
147                if let Some(Column { index: idx, values }) =
148                    dynamic_column_opt.and_then(dynamic_column_to_u64_monotonic)
149                {
150                    column_indexes.push(idx);
151                    column_values.push(Some(values));
152                } else {
153                    column_indexes.push(ColumnIndex::Empty {
154                        num_docs: num_docs_per_column[i],
155                    });
156                    column_values.push(None);
157                }
158            }
159            let merged_column_index =
160                crate::column_index::merge_column_index(&column_indexes[..], merge_row_order);
161            let merge_column_values = MergedColumnValues {
162                column_indexes: &column_indexes[..],
163                column_values: &column_values[..],
164                merge_row_order,
165            };
166            serialize_column_mappable_to_u64(merged_column_index, &merge_column_values, wrt)?;
167        }
168        ColumnType::IpAddr => {
169            let mut column_indexes: Vec<ColumnIndex> = Vec::with_capacity(columns.len());
170            let mut column_values: Vec<Option<Arc<dyn ColumnValues<Ipv6Addr>>>> =
171                Vec::with_capacity(columns.len());
172            for (i, dynamic_column_opt) in columns.into_iter().enumerate() {
173                if let Some(DynamicColumn::IpAddr(Column { index: idx, values })) =
174                    dynamic_column_opt
175                {
176                    column_indexes.push(idx);
177                    column_values.push(Some(values));
178                } else {
179                    column_indexes.push(ColumnIndex::Empty {
180                        num_docs: num_docs_per_column[i],
181                    });
182                    column_values.push(None);
183                }
184            }
185
186            let merged_column_index =
187                crate::column_index::merge_column_index(&column_indexes[..], merge_row_order);
188            let merge_column_values = MergedColumnValues {
189                column_indexes: &column_indexes[..],
190                column_values: &column_values,
191                merge_row_order,
192            };
193
194            serialize_column_mappable_to_u128(merged_column_index, &merge_column_values, wrt)?;
195        }
196        ColumnType::Bytes | ColumnType::Str => {
197            let mut column_indexes: Vec<ColumnIndex> = Vec::with_capacity(columns.len());
198            let mut bytes_columns: Vec<Option<BytesColumn>> = Vec::with_capacity(columns.len());
199            for (i, dynamic_column_opt) in columns.into_iter().enumerate() {
200                match dynamic_column_opt {
201                    Some(DynamicColumn::Str(str_column)) => {
202                        column_indexes.push(str_column.term_ord_column.index.clone());
203                        bytes_columns.push(Some(str_column.into()));
204                    }
205                    Some(DynamicColumn::Bytes(bytes_column)) => {
206                        column_indexes.push(bytes_column.term_ord_column.index.clone());
207                        bytes_columns.push(Some(bytes_column));
208                    }
209                    _ => {
210                        column_indexes.push(ColumnIndex::Empty {
211                            num_docs: num_docs_per_column[i],
212                        });
213                        bytes_columns.push(None);
214                    }
215                }
216            }
217            let merged_column_index =
218                crate::column_index::merge_column_index(&column_indexes[..], merge_row_order);
219            merge_bytes_or_str_column(merged_column_index, &bytes_columns, merge_row_order, wrt)?;
220        }
221    }
222    Ok(())
223}
224
225struct GroupedColumns {
226    required_column_type: Option<ColumnType>,
227    columns: Vec<Option<DynamicColumn>>,
228}
229
230impl GroupedColumns {
231    /// Check is column group can be skipped during serialization.
232    fn is_empty(&self) -> bool {
233        self.required_column_type.is_none() && self.columns.iter().all(Option::is_none)
234    }
235
236    /// Returns the column type after merge.
237    ///
238    /// This method does not check if the column types can actually be coerced to
239    /// this type.
240    fn column_type_after_merge(&self) -> ColumnType {
241        if let Some(required_type) = self.required_column_type {
242            return required_type;
243        }
244        let column_type: HashSet<ColumnType> = self
245            .columns
246            .iter()
247            .flatten()
248            .map(|column| column.column_type())
249            .collect();
250        if column_type.len() == 1 {
251            return column_type.into_iter().next().unwrap();
252        }
253        // At the moment, only the numerical categorical column type has more than one possible
254        // column type.
255        assert!(self
256            .columns
257            .iter()
258            .flatten()
259            .all(|el| ColumnTypeCategory::from(el.column_type()) == ColumnTypeCategory::Numerical));
260        merged_numerical_columns_type(self.columns.iter().flatten()).into()
261    }
262}
263
264struct GroupedColumnsHandle {
265    required_column_type: Option<ColumnType>,
266    columns: Vec<Option<DynamicColumnHandle>>,
267}
268
269impl GroupedColumnsHandle {
270    fn new(num_columnars: usize) -> Self {
271        GroupedColumnsHandle {
272            required_column_type: None,
273            columns: vec![None; num_columnars],
274        }
275    }
276    fn open(self, merge_row_order: &MergeRowOrder) -> io::Result<GroupedColumns> {
277        let mut columns: Vec<Option<DynamicColumn>> = Vec::new();
278        for (columnar_id, column) in self.columns.iter().enumerate() {
279            if let Some(column) = column {
280                let column = column.open()?;
281                // We skip columns that end up with 0 documents.
282                // That way, we make sure they don't end up influencing the merge type or
283                // creating empty columns.
284
285                if is_empty_after_merge(merge_row_order, &column, columnar_id) {
286                    columns.push(None);
287                } else {
288                    columns.push(Some(column));
289                }
290            } else {
291                columns.push(None);
292            }
293        }
294        Ok(GroupedColumns {
295            required_column_type: self.required_column_type,
296            columns,
297        })
298    }
299
300    /// Set the dynamic column for a given columnar.
301    fn set_column(&mut self, columnar_id: usize, column: DynamicColumnHandle) {
302        self.columns[columnar_id] = Some(column);
303    }
304
305    /// Force the existence of a column, as well as its type.
306    fn require_type(&mut self, required_type: ColumnType) -> io::Result<()> {
307        if let Some(existing_required_type) = self.required_column_type {
308            if existing_required_type == required_type {
309                // This was just a duplicate in the `required_columns`.
310                // Nothing to do.
311                return Ok(());
312            } else {
313                return Err(io::Error::new(
314                    io::ErrorKind::InvalidInput,
315                    "Required column conflicts with another required column of the same type \
316                     category.",
317                ));
318            }
319        }
320        self.required_column_type = Some(required_type);
321        Ok(())
322    }
323}
324
325/// Returns the type of the merged numerical column.
326///
327/// This function picks the first numerical type out of i64, u64, f64 (order matters
328/// here), that is compatible with all the `columns`.
329///
330/// # Panics
331/// Panics if one of the column is not numerical.
332fn merged_numerical_columns_type<'a>(
333    columns: impl Iterator<Item = &'a DynamicColumn>,
334) -> NumericalType {
335    let mut compatible_numerical_types = CompatibleNumericalTypes::default();
336    for column in columns {
337        let (min_value, max_value) =
338            min_max_if_numerical(column).expect("All columns re required to be numerical");
339        compatible_numerical_types.accept_value(min_value);
340        compatible_numerical_types.accept_value(max_value);
341    }
342    compatible_numerical_types.to_numerical_type()
343}
344
345fn is_empty_after_merge(
346    merge_row_order: &MergeRowOrder,
347    column: &DynamicColumn,
348    columnar_ord: usize,
349) -> bool {
350    if column.num_values() == 0u32 {
351        // It was empty before the merge.
352        return true;
353    }
354    match merge_row_order {
355        MergeRowOrder::Stack(_) => {
356            // If we are stacking the columnar, no rows are being deleted.
357            false
358        }
359        MergeRowOrder::Shuffled(shuffled) => {
360            if let Some(alive_bitset) = &shuffled.alive_bitsets[columnar_ord] {
361                let column_index = column.column_index();
362                match column_index {
363                    ColumnIndex::Empty { .. } => true,
364                    ColumnIndex::Full => alive_bitset.len() == 0,
365                    ColumnIndex::Optional(optional_index) => {
366                        for doc in optional_index.iter_rows() {
367                            if alive_bitset.contains(doc) {
368                                return false;
369                            }
370                        }
371                        true
372                    }
373                    ColumnIndex::Multivalued(multivalued_index) => {
374                        for (doc_id, (start_index, end_index)) in multivalued_index
375                            .start_index_column
376                            .iter()
377                            .tuple_windows()
378                            .enumerate()
379                        {
380                            let doc_id = doc_id as u32;
381                            if start_index == end_index {
382                                // There are no values in this document
383                                continue;
384                            }
385                            // The document contains values and is present in the alive bitset.
386                            // The column is therefore not empty.
387                            if alive_bitset.contains(doc_id) {
388                                return false;
389                            }
390                        }
391                        true
392                    }
393                }
394            } else {
395                // No document is being deleted.
396                // The shuffle is applying a permutation.
397                false
398            }
399        }
400    }
401}
402
403/// Iterates over the columns of the columnar readers, grouped by column name.
404/// Key functionality is that `open` of the Columns is done lazy per group.
405fn group_columns_for_merge<'a>(
406    columnar_readers: &'a [&'a ColumnarReader],
407    required_columns: &'a [(String, ColumnType)],
408    _merge_row_order: &'a MergeRowOrder,
409) -> io::Result<BTreeMap<(String, ColumnTypeCategory), GroupedColumnsHandle>> {
410    let mut columns: BTreeMap<(String, ColumnTypeCategory), GroupedColumnsHandle> = BTreeMap::new();
411
412    for &(ref column_name, column_type) in required_columns {
413        columns
414            .entry((column_name.clone(), column_type.into()))
415            .or_insert_with(|| GroupedColumnsHandle::new(columnar_readers.len()))
416            .require_type(column_type)?;
417    }
418
419    for (columnar_id, columnar_reader) in columnar_readers.iter().enumerate() {
420        let column_name_and_handle = columnar_reader.iter_columns()?;
421
422        for (column_name, handle) in column_name_and_handle {
423            let column_category: ColumnTypeCategory = handle.column_type().into();
424            columns
425                .entry((column_name, column_category))
426                .or_insert_with(|| GroupedColumnsHandle::new(columnar_readers.len()))
427                .set_column(columnar_id, handle);
428        }
429    }
430    Ok(columns)
431}
432
433fn coerce_columns(
434    column_type: ColumnType,
435    columns: &mut [Option<DynamicColumn>],
436) -> io::Result<()> {
437    for column_opt in columns.iter_mut() {
438        if let Some(column) = column_opt.take() {
439            *column_opt = Some(coerce_column(column_type, column)?);
440        }
441    }
442    Ok(())
443}
444
445fn coerce_column(column_type: ColumnType, column: DynamicColumn) -> io::Result<DynamicColumn> {
446    if let Some(numerical_type) = column_type.numerical_type() {
447        column
448            .coerce_numerical(numerical_type)
449            .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, ""))
450    } else {
451        if column.column_type() != column_type {
452            return Err(io::Error::new(
453                io::ErrorKind::InvalidInput,
454                format!(
455                    "Cannot coerce column of type `{:?}` to `{column_type:?}`",
456                    column.column_type()
457                ),
458            ));
459        }
460        Ok(column)
461    }
462}
463
464/// Returns the (min, max) of a column provided it is numerical (i64, u64. f64).
465///
466/// The min and the max are simply the numerical value as defined by `ColumnValue::min_value()`,
467/// and `ColumnValue::max_value()`.
468///
469/// It is important to note that these values are only guaranteed to be lower/upper bound
470/// (as opposed to min/max value).
471/// If a column is empty, the min and max values are currently set to 0.
472fn min_max_if_numerical(column: &DynamicColumn) -> Option<(NumericalValue, NumericalValue)> {
473    match column {
474        DynamicColumn::I64(column) => Some((column.min_value().into(), column.max_value().into())),
475        DynamicColumn::U64(column) => Some((column.min_value().into(), column.max_value().into())),
476        DynamicColumn::F64(column) => Some((column.min_value().into(), column.max_value().into())),
477        DynamicColumn::Bool(_)
478        | DynamicColumn::IpAddr(_)
479        | DynamicColumn::DateTime(_)
480        | DynamicColumn::Bytes(_)
481        | DynamicColumn::Str(_) => None,
482    }
483}
484
485#[cfg(test)]
486mod tests;