tantivy_columnar/columnar/merge/
mod.rs1mod merge_dict_column;
2mod merge_mapping;
3mod term_merger;
4
5use std::collections::{BTreeMap, HashSet};
6use std::io;
7use std::net::Ipv6Addr;
8use std::sync::Arc;
9
10use itertools::Itertools;
11pub use merge_mapping::{MergeRowOrder, ShuffleMergeOrder, StackMergeOrder};
12
13use super::writer::ColumnarSerializer;
14use crate::column::{serialize_column_mappable_to_u128, serialize_column_mappable_to_u64};
15use crate::column_values::MergedColumnValues;
16use crate::columnar::merge::merge_dict_column::merge_bytes_or_str_column;
17use crate::columnar::writer::CompatibleNumericalTypes;
18use crate::columnar::ColumnarReader;
19use crate::dynamic_column::DynamicColumn;
20use crate::{
21 BytesColumn, Column, ColumnIndex, ColumnType, ColumnValues, DynamicColumnHandle, NumericalType,
22 NumericalValue,
23};
24
/// Coarse grouping of `ColumnType`s.
///
/// Together with the column name, the category forms the key under which
/// columns coming from different columnars are grouped before being merged.
/// `I64`, `U64` and `F64` all belong to `Numerical`; every other column type
/// has a category of its own.
///
/// The declaration order of the variants matters: `PartialOrd`/`Ord` are
/// derived and the category is used inside an ordered map key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub(crate) enum ColumnTypeCategory {
    /// `I64`, `U64` and `F64` columns.
    Numerical,
    /// Raw bytes columns.
    Bytes,
    /// String columns.
    Str,
    /// Boolean columns.
    Bool,
    /// IP address columns.
    IpAddr,
    /// Date-time columns.
    DateTime,
}
43
44impl From<ColumnType> for ColumnTypeCategory {
45 fn from(column_type: ColumnType) -> Self {
46 match column_type {
47 ColumnType::I64 => ColumnTypeCategory::Numerical,
48 ColumnType::U64 => ColumnTypeCategory::Numerical,
49 ColumnType::F64 => ColumnTypeCategory::Numerical,
50 ColumnType::Bytes => ColumnTypeCategory::Bytes,
51 ColumnType::Str => ColumnTypeCategory::Str,
52 ColumnType::Bool => ColumnTypeCategory::Bool,
53 ColumnType::IpAddr => ColumnTypeCategory::IpAddr,
54 ColumnType::DateTime => ColumnTypeCategory::DateTime,
55 }
56 }
57}
58
59pub fn merge_columnar(
79 columnar_readers: &[&ColumnarReader],
80 required_columns: &[(String, ColumnType)],
81 merge_row_order: MergeRowOrder,
82 output: &mut impl io::Write,
83) -> io::Result<()> {
84 let mut serializer = ColumnarSerializer::new(output);
85 let num_rows_per_columnar = columnar_readers
86 .iter()
87 .map(|reader| reader.num_rows())
88 .collect::<Vec<u32>>();
89
90 let columns_to_merge =
91 group_columns_for_merge(columnar_readers, required_columns, &merge_row_order)?;
92 for res in columns_to_merge {
93 let ((column_name, _column_type_category), grouped_columns) = res;
94 let grouped_columns = grouped_columns.open(&merge_row_order)?;
95 if grouped_columns.is_empty() {
96 continue;
97 }
98
99 let column_type = grouped_columns.column_type_after_merge();
100 let mut columns = grouped_columns.columns;
101 coerce_columns(column_type, &mut columns)?;
102
103 let mut column_serializer =
104 serializer.start_serialize_column(column_name.as_bytes(), column_type);
105 merge_column(
106 column_type,
107 &num_rows_per_columnar,
108 columns,
109 &merge_row_order,
110 &mut column_serializer,
111 )?;
112 column_serializer.finalize()?;
113 }
114
115 serializer.finalize(merge_row_order.num_rows())?;
116 Ok(())
117}
118
119fn dynamic_column_to_u64_monotonic(dynamic_column: DynamicColumn) -> Option<Column<u64>> {
120 match dynamic_column {
121 DynamicColumn::Bool(column) => Some(column.to_u64_monotonic()),
122 DynamicColumn::I64(column) => Some(column.to_u64_monotonic()),
123 DynamicColumn::U64(column) => Some(column.to_u64_monotonic()),
124 DynamicColumn::F64(column) => Some(column.to_u64_monotonic()),
125 DynamicColumn::DateTime(column) => Some(column.to_u64_monotonic()),
126 DynamicColumn::IpAddr(_) | DynamicColumn::Bytes(_) | DynamicColumn::Str(_) => None,
127 }
128}
129
130fn merge_column(
131 column_type: ColumnType,
132 num_docs_per_column: &[u32],
133 columns: Vec<Option<DynamicColumn>>,
134 merge_row_order: &MergeRowOrder,
135 wrt: &mut impl io::Write,
136) -> io::Result<()> {
137 match column_type {
138 ColumnType::I64
139 | ColumnType::U64
140 | ColumnType::F64
141 | ColumnType::DateTime
142 | ColumnType::Bool => {
143 let mut column_indexes: Vec<ColumnIndex> = Vec::with_capacity(columns.len());
144 let mut column_values: Vec<Option<Arc<dyn ColumnValues>>> =
145 Vec::with_capacity(columns.len());
146 for (i, dynamic_column_opt) in columns.into_iter().enumerate() {
147 if let Some(Column { index: idx, values }) =
148 dynamic_column_opt.and_then(dynamic_column_to_u64_monotonic)
149 {
150 column_indexes.push(idx);
151 column_values.push(Some(values));
152 } else {
153 column_indexes.push(ColumnIndex::Empty {
154 num_docs: num_docs_per_column[i],
155 });
156 column_values.push(None);
157 }
158 }
159 let merged_column_index =
160 crate::column_index::merge_column_index(&column_indexes[..], merge_row_order);
161 let merge_column_values = MergedColumnValues {
162 column_indexes: &column_indexes[..],
163 column_values: &column_values[..],
164 merge_row_order,
165 };
166 serialize_column_mappable_to_u64(merged_column_index, &merge_column_values, wrt)?;
167 }
168 ColumnType::IpAddr => {
169 let mut column_indexes: Vec<ColumnIndex> = Vec::with_capacity(columns.len());
170 let mut column_values: Vec<Option<Arc<dyn ColumnValues<Ipv6Addr>>>> =
171 Vec::with_capacity(columns.len());
172 for (i, dynamic_column_opt) in columns.into_iter().enumerate() {
173 if let Some(DynamicColumn::IpAddr(Column { index: idx, values })) =
174 dynamic_column_opt
175 {
176 column_indexes.push(idx);
177 column_values.push(Some(values));
178 } else {
179 column_indexes.push(ColumnIndex::Empty {
180 num_docs: num_docs_per_column[i],
181 });
182 column_values.push(None);
183 }
184 }
185
186 let merged_column_index =
187 crate::column_index::merge_column_index(&column_indexes[..], merge_row_order);
188 let merge_column_values = MergedColumnValues {
189 column_indexes: &column_indexes[..],
190 column_values: &column_values,
191 merge_row_order,
192 };
193
194 serialize_column_mappable_to_u128(merged_column_index, &merge_column_values, wrt)?;
195 }
196 ColumnType::Bytes | ColumnType::Str => {
197 let mut column_indexes: Vec<ColumnIndex> = Vec::with_capacity(columns.len());
198 let mut bytes_columns: Vec<Option<BytesColumn>> = Vec::with_capacity(columns.len());
199 for (i, dynamic_column_opt) in columns.into_iter().enumerate() {
200 match dynamic_column_opt {
201 Some(DynamicColumn::Str(str_column)) => {
202 column_indexes.push(str_column.term_ord_column.index.clone());
203 bytes_columns.push(Some(str_column.into()));
204 }
205 Some(DynamicColumn::Bytes(bytes_column)) => {
206 column_indexes.push(bytes_column.term_ord_column.index.clone());
207 bytes_columns.push(Some(bytes_column));
208 }
209 _ => {
210 column_indexes.push(ColumnIndex::Empty {
211 num_docs: num_docs_per_column[i],
212 });
213 bytes_columns.push(None);
214 }
215 }
216 }
217 let merged_column_index =
218 crate::column_index::merge_column_index(&column_indexes[..], merge_row_order);
219 merge_bytes_or_str_column(merged_column_index, &bytes_columns, merge_row_order, wrt)?;
220 }
221 }
222 Ok(())
223}
224
225struct GroupedColumns {
226 required_column_type: Option<ColumnType>,
227 columns: Vec<Option<DynamicColumn>>,
228}
229
230impl GroupedColumns {
231 fn is_empty(&self) -> bool {
233 self.required_column_type.is_none() && self.columns.iter().all(Option::is_none)
234 }
235
236 fn column_type_after_merge(&self) -> ColumnType {
241 if let Some(required_type) = self.required_column_type {
242 return required_type;
243 }
244 let column_type: HashSet<ColumnType> = self
245 .columns
246 .iter()
247 .flatten()
248 .map(|column| column.column_type())
249 .collect();
250 if column_type.len() == 1 {
251 return column_type.into_iter().next().unwrap();
252 }
253 assert!(self
256 .columns
257 .iter()
258 .flatten()
259 .all(|el| ColumnTypeCategory::from(el.column_type()) == ColumnTypeCategory::Numerical));
260 merged_numerical_columns_type(self.columns.iter().flatten()).into()
261 }
262}
263
264struct GroupedColumnsHandle {
265 required_column_type: Option<ColumnType>,
266 columns: Vec<Option<DynamicColumnHandle>>,
267}
268
269impl GroupedColumnsHandle {
270 fn new(num_columnars: usize) -> Self {
271 GroupedColumnsHandle {
272 required_column_type: None,
273 columns: vec![None; num_columnars],
274 }
275 }
276 fn open(self, merge_row_order: &MergeRowOrder) -> io::Result<GroupedColumns> {
277 let mut columns: Vec<Option<DynamicColumn>> = Vec::new();
278 for (columnar_id, column) in self.columns.iter().enumerate() {
279 if let Some(column) = column {
280 let column = column.open()?;
281 if is_empty_after_merge(merge_row_order, &column, columnar_id) {
286 columns.push(None);
287 } else {
288 columns.push(Some(column));
289 }
290 } else {
291 columns.push(None);
292 }
293 }
294 Ok(GroupedColumns {
295 required_column_type: self.required_column_type,
296 columns,
297 })
298 }
299
300 fn set_column(&mut self, columnar_id: usize, column: DynamicColumnHandle) {
302 self.columns[columnar_id] = Some(column);
303 }
304
305 fn require_type(&mut self, required_type: ColumnType) -> io::Result<()> {
307 if let Some(existing_required_type) = self.required_column_type {
308 if existing_required_type == required_type {
309 return Ok(());
312 } else {
313 return Err(io::Error::new(
314 io::ErrorKind::InvalidInput,
315 "Required column conflicts with another required column of the same type \
316 category.",
317 ));
318 }
319 }
320 self.required_column_type = Some(required_type);
321 Ok(())
322 }
323}
324
325fn merged_numerical_columns_type<'a>(
333 columns: impl Iterator<Item = &'a DynamicColumn>,
334) -> NumericalType {
335 let mut compatible_numerical_types = CompatibleNumericalTypes::default();
336 for column in columns {
337 let (min_value, max_value) =
338 min_max_if_numerical(column).expect("All columns re required to be numerical");
339 compatible_numerical_types.accept_value(min_value);
340 compatible_numerical_types.accept_value(max_value);
341 }
342 compatible_numerical_types.to_numerical_type()
343}
344
345fn is_empty_after_merge(
346 merge_row_order: &MergeRowOrder,
347 column: &DynamicColumn,
348 columnar_ord: usize,
349) -> bool {
350 if column.num_values() == 0u32 {
351 return true;
353 }
354 match merge_row_order {
355 MergeRowOrder::Stack(_) => {
356 false
358 }
359 MergeRowOrder::Shuffled(shuffled) => {
360 if let Some(alive_bitset) = &shuffled.alive_bitsets[columnar_ord] {
361 let column_index = column.column_index();
362 match column_index {
363 ColumnIndex::Empty { .. } => true,
364 ColumnIndex::Full => alive_bitset.len() == 0,
365 ColumnIndex::Optional(optional_index) => {
366 for doc in optional_index.iter_rows() {
367 if alive_bitset.contains(doc) {
368 return false;
369 }
370 }
371 true
372 }
373 ColumnIndex::Multivalued(multivalued_index) => {
374 for (doc_id, (start_index, end_index)) in multivalued_index
375 .start_index_column
376 .iter()
377 .tuple_windows()
378 .enumerate()
379 {
380 let doc_id = doc_id as u32;
381 if start_index == end_index {
382 continue;
384 }
385 if alive_bitset.contains(doc_id) {
388 return false;
389 }
390 }
391 true
392 }
393 }
394 } else {
395 false
398 }
399 }
400 }
401}
402
403fn group_columns_for_merge<'a>(
406 columnar_readers: &'a [&'a ColumnarReader],
407 required_columns: &'a [(String, ColumnType)],
408 _merge_row_order: &'a MergeRowOrder,
409) -> io::Result<BTreeMap<(String, ColumnTypeCategory), GroupedColumnsHandle>> {
410 let mut columns: BTreeMap<(String, ColumnTypeCategory), GroupedColumnsHandle> = BTreeMap::new();
411
412 for &(ref column_name, column_type) in required_columns {
413 columns
414 .entry((column_name.clone(), column_type.into()))
415 .or_insert_with(|| GroupedColumnsHandle::new(columnar_readers.len()))
416 .require_type(column_type)?;
417 }
418
419 for (columnar_id, columnar_reader) in columnar_readers.iter().enumerate() {
420 let column_name_and_handle = columnar_reader.iter_columns()?;
421
422 for (column_name, handle) in column_name_and_handle {
423 let column_category: ColumnTypeCategory = handle.column_type().into();
424 columns
425 .entry((column_name, column_category))
426 .or_insert_with(|| GroupedColumnsHandle::new(columnar_readers.len()))
427 .set_column(columnar_id, handle);
428 }
429 }
430 Ok(columns)
431}
432
433fn coerce_columns(
434 column_type: ColumnType,
435 columns: &mut [Option<DynamicColumn>],
436) -> io::Result<()> {
437 for column_opt in columns.iter_mut() {
438 if let Some(column) = column_opt.take() {
439 *column_opt = Some(coerce_column(column_type, column)?);
440 }
441 }
442 Ok(())
443}
444
445fn coerce_column(column_type: ColumnType, column: DynamicColumn) -> io::Result<DynamicColumn> {
446 if let Some(numerical_type) = column_type.numerical_type() {
447 column
448 .coerce_numerical(numerical_type)
449 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, ""))
450 } else {
451 if column.column_type() != column_type {
452 return Err(io::Error::new(
453 io::ErrorKind::InvalidInput,
454 format!(
455 "Cannot coerce column of type `{:?}` to `{column_type:?}`",
456 column.column_type()
457 ),
458 ));
459 }
460 Ok(column)
461 }
462}
463
464fn min_max_if_numerical(column: &DynamicColumn) -> Option<(NumericalValue, NumericalValue)> {
473 match column {
474 DynamicColumn::I64(column) => Some((column.min_value().into(), column.max_value().into())),
475 DynamicColumn::U64(column) => Some((column.min_value().into(), column.max_value().into())),
476 DynamicColumn::F64(column) => Some((column.min_value().into(), column.max_value().into())),
477 DynamicColumn::Bool(_)
478 | DynamicColumn::IpAddr(_)
479 | DynamicColumn::DateTime(_)
480 | DynamicColumn::Bytes(_)
481 | DynamicColumn::Str(_) => None,
482 }
483}
484
485#[cfg(test)]
486mod tests;