cherry_query/
lib.rs

1use anyhow::{anyhow, Context, Result};
2use arrow::array::{
3    Array, ArrowPrimitiveType, BinaryArray, BooleanArray, BooleanBuilder, GenericByteArray,
4    Int16Array, Int32Array, Int64Array, Int8Array, PrimitiveArray, StringArray, UInt16Array,
5    UInt32Array, UInt64Array, UInt8Array,
6};
7use arrow::buffer::BooleanBuffer;
8use arrow::compute;
9use arrow::datatypes::{ByteArrayType, DataType, ToByteSlice};
10use arrow::record_batch::RecordBatch;
11use arrow::row::{RowConverter, SortField};
12use hashbrown::HashTable;
13use rayon::prelude::*;
14use std::collections::btree_map::Entry;
15use std::collections::BTreeMap;
16use std::sync::Arc;
17use xxhash_rust::xxh3::xxh3_64;
18
/// Name of a table; used as the key of the per-table maps in this file.
type TableName = String;
/// Name of a column (field) inside a table.
type FieldName = String;
21
/// A query over a set of named tables: which rows to select and which columns to return.
#[derive(Clone)]
pub struct Query {
    /// Row selections, keyed by the table they are evaluated against. `run_query` ORs
    /// together the row masks produced by all selections that target the same table.
    pub selection: Arc<BTreeMap<TableName, Vec<TableSelection>>>,
    /// Columns to keep in the output, keyed by table name.
    pub fields: BTreeMap<TableName, Vec<FieldName>>,
}
27
28impl Query {
29    pub fn add_request_and_include_fields(&mut self) -> Result<()> {
30        for (table_name, selections) in self.selection.iter() {
31            for selection in selections.iter() {
32                for col_name in selection.filters.keys() {
33                    let table_fields = self
34                        .fields
35                        .get_mut(table_name)
36                        .with_context(|| format!("get fields for table {}", table_name))?;
37                    table_fields.push(col_name.to_owned());
38                }
39
40                for include in selection.include.iter() {
41                    let other_table_fields = self
42                        .fields
43                        .get_mut(&include.other_table_name)
44                        .with_context(|| {
45                            format!("get fields for other table {}", include.other_table_name)
46                        })?;
47                    other_table_fields.extend_from_slice(&include.other_table_field_names);
48                    let table_fields = self
49                        .fields
50                        .get_mut(table_name)
51                        .with_context(|| format!("get fields for table {}", table_name))?;
52                    table_fields.extend_from_slice(&include.field_names);
53                }
54            }
55        }
56
57        Ok(())
58    }
59}
60
/// One row selection on a table, plus the related rows to pull in from other tables.
pub struct TableSelection {
    /// Filters keyed by the column they apply to. `run_table_selection` ANDs them, so a row
    /// is selected only when it passes every filter. With no filters, every row is selected.
    pub filters: BTreeMap<FieldName, Filter>,
    /// Rows of other tables to include, based on the rows selected here.
    pub include: Vec<Include>,
}
65
/// Pulls in rows of another table whose join columns equal those of a selected row.
pub struct Include {
    /// Table to pull rows from.
    pub other_table_name: TableName,
    /// Join columns on the selecting table.
    pub field_names: Vec<FieldName>,
    /// Join columns on the other table. `run_table_selection` rejects the include when this
    /// does not have the same length as `field_names`.
    pub other_table_field_names: Vec<FieldName>,
}
71
/// A predicate evaluated against a single column.
pub enum Filter {
    /// Matches values that are equal to one of a given set of values.
    Contains(Contains),
    /// Matches binary/string values that start with one of a given set of prefixes.
    StartsWith(StartsWith),
    /// Matches boolean values equal to the given value.
    Bool(bool),
}
77
78impl Filter {
79    pub fn contains(arr: Arc<dyn Array>) -> Result<Self> {
80        Ok(Self::Contains(Contains::new(arr)?))
81    }
82
83    pub fn starts_with(arr: Arc<dyn Array>) -> Result<Self> {
84        Ok(Self::StartsWith(StartsWith::new(arr)?))
85    }
86
87    pub fn bool(b: bool) -> Self {
88        Self::Bool(b)
89    }
90
91    fn check(&self, arr: &dyn Array) -> Result<BooleanArray> {
92        match self {
93            Self::Contains(ct) => ct.contains(arr),
94            Self::StartsWith(sw) => sw.starts_with(arr),
95            Self::Bool(b) => {
96                let arr = arr
97                    .as_any()
98                    .downcast_ref::<BooleanArray>()
99                    .context("cast array to boolean array")?;
100
101                let mut filter = if *b {
102                    arr.clone()
103                } else {
104                    compute::not(arr).context("negate array")?
105                };
106
107                if let Some(nulls) = filter.nulls() {
108                    if nulls.null_count() > 0 {
109                        let nulls = BooleanArray::from(nulls.inner().clone());
110                        filter = compute::and(&filter, &nulls).unwrap();
111                    }
112                }
113
114                Ok(filter)
115            }
116        }
117    }
118}
119
/// Set-membership filter: matches values equal to one of the values in `array`.
pub struct Contains {
    /// The set of values to match against. `Contains::new` rejects arrays for which
    /// `is_nullable()` returns true.
    array: Arc<dyn Array>,
    /// Indices into `array`, keyed by the xxh3 hash of the value at that index. `new` only
    /// builds it when `array` has at least 128 elements; otherwise lookups scan `array`.
    hash_table: Option<HashTable<usize>>,
}
124
125impl Contains {
126    fn ht_from_primitive<T: ArrowPrimitiveType>(arr: &PrimitiveArray<T>) -> HashTable<usize> {
127        assert!(!arr.is_nullable());
128
129        let mut ht = HashTable::with_capacity(arr.len());
130
131        for (i, v) in arr.values().iter().enumerate() {
132            ht.insert_unique(xxh3_64(v.to_byte_slice()), i, |i| {
133                xxh3_64(unsafe { arr.value_unchecked(*i).to_byte_slice() })
134            });
135        }
136
137        ht
138    }
139
140    fn ht_from_bytes<T: ByteArrayType<Offset = i32>>(
141        arr: &GenericByteArray<T>,
142    ) -> HashTable<usize> {
143        assert!(!arr.is_nullable());
144
145        let mut ht = HashTable::with_capacity(arr.len());
146
147        for (i, v) in iter_byte_array_without_validity(arr).enumerate() {
148            ht.insert_unique(xxh3_64(v), i, |i| {
149                xxh3_64(unsafe { byte_array_get_unchecked(arr, *i) })
150            });
151        }
152
153        ht
154    }
155
156    fn ht_from_array(array: &dyn Array) -> Result<HashTable<usize>> {
157        let ht = match *array.data_type() {
158            DataType::UInt8 => {
159                let array = array.as_any().downcast_ref::<UInt8Array>().unwrap();
160                Self::ht_from_primitive(array)
161            }
162            DataType::UInt16 => {
163                let array = array.as_any().downcast_ref::<UInt16Array>().unwrap();
164                Self::ht_from_primitive(array)
165            }
166            DataType::UInt32 => {
167                let array = array.as_any().downcast_ref::<UInt32Array>().unwrap();
168                Self::ht_from_primitive(array)
169            }
170            DataType::UInt64 => {
171                let array = array.as_any().downcast_ref::<UInt64Array>().unwrap();
172                Self::ht_from_primitive(array)
173            }
174            DataType::Int8 => {
175                let array = array.as_any().downcast_ref::<Int8Array>().unwrap();
176                Self::ht_from_primitive(array)
177            }
178            DataType::Int16 => {
179                let array = array.as_any().downcast_ref::<Int16Array>().unwrap();
180                Self::ht_from_primitive(array)
181            }
182            DataType::Int32 => {
183                let array = array.as_any().downcast_ref::<Int32Array>().unwrap();
184                Self::ht_from_primitive(array)
185            }
186            DataType::Int64 => {
187                let array = array.as_any().downcast_ref::<Int64Array>().unwrap();
188                Self::ht_from_primitive(array)
189            }
190            DataType::Binary => {
191                let array = array.as_any().downcast_ref::<BinaryArray>().unwrap();
192                Self::ht_from_bytes(array)
193            }
194            DataType::Utf8 => {
195                let array = array.as_any().downcast_ref::<StringArray>().unwrap();
196                Self::ht_from_bytes(array)
197            }
198            _ => {
199                return Err(anyhow!("unsupported data type: {}", array.data_type()));
200            }
201        };
202
203        Ok(ht)
204    }
205
206    pub fn new(array: Arc<dyn Array>) -> Result<Self> {
207        if array.is_nullable() {
208            return Err(anyhow!(
209                "cannot construct contains filter with a nullable array"
210            ));
211        }
212
213        // only use a hash table if there are more than 128 elements
214        let hash_table = if array.len() >= 128 {
215            Some(Self::ht_from_array(&array).context("construct hash table")?)
216        } else {
217            None
218        };
219
220        Ok(Self { hash_table, array })
221    }
222
223    fn contains(&self, arr: &dyn Array) -> Result<BooleanArray> {
224        if arr.data_type() != self.array.data_type() {
225            return Err(anyhow!(
226                "filter array is of type {} but array to be filtered is of type {}",
227                self.array.data_type(),
228                arr.data_type(),
229            ));
230        }
231        assert!(!self.array.is_nullable());
232
233        let filter = match *arr.data_type() {
234            DataType::UInt8 => {
235                let self_arr = self.array.as_any().downcast_ref::<UInt8Array>().unwrap();
236                let other_arr = arr.as_any().downcast_ref().unwrap();
237                self.contains_primitive(self_arr, other_arr)
238            }
239            DataType::UInt16 => {
240                let self_arr = self.array.as_any().downcast_ref::<UInt16Array>().unwrap();
241                let other_arr = arr.as_any().downcast_ref().unwrap();
242                self.contains_primitive(self_arr, other_arr)
243            }
244            DataType::UInt32 => {
245                let self_arr = self.array.as_any().downcast_ref::<UInt32Array>().unwrap();
246                let other_arr = arr.as_any().downcast_ref().unwrap();
247                self.contains_primitive(self_arr, other_arr)
248            }
249            DataType::UInt64 => {
250                let self_arr = self.array.as_any().downcast_ref::<UInt64Array>().unwrap();
251                let other_arr = arr.as_any().downcast_ref().unwrap();
252                self.contains_primitive(self_arr, other_arr)
253            }
254            DataType::Int8 => {
255                let self_arr = self.array.as_any().downcast_ref::<Int8Array>().unwrap();
256                let other_arr = arr.as_any().downcast_ref().unwrap();
257                self.contains_primitive(self_arr, other_arr)
258            }
259            DataType::Int16 => {
260                let self_arr = self.array.as_any().downcast_ref::<Int16Array>().unwrap();
261                let other_arr = arr.as_any().downcast_ref().unwrap();
262                self.contains_primitive(self_arr, other_arr)
263            }
264            DataType::Int32 => {
265                let self_arr = self.array.as_any().downcast_ref::<Int32Array>().unwrap();
266                let other_arr = arr.as_any().downcast_ref().unwrap();
267                self.contains_primitive(self_arr, other_arr)
268            }
269            DataType::Int64 => {
270                let self_arr = self.array.as_any().downcast_ref::<Int64Array>().unwrap();
271                let other_arr = arr.as_any().downcast_ref().unwrap();
272                self.contains_primitive(self_arr, other_arr)
273            }
274            DataType::Binary => {
275                let self_arr = self.array.as_any().downcast_ref::<BinaryArray>().unwrap();
276                let other_arr = arr.as_any().downcast_ref().unwrap();
277                self.contains_bytes(self_arr, other_arr)
278            }
279            DataType::Utf8 => {
280                let self_arr = self.array.as_any().downcast_ref::<StringArray>().unwrap();
281                let other_arr = arr.as_any().downcast_ref().unwrap();
282                self.contains_bytes(self_arr, other_arr)
283            }
284            _ => {
285                return Err(anyhow!("unsupported data type: {}", arr.data_type()));
286            }
287        };
288
289        let mut filter = filter;
290
291        if let Some(nulls) = arr.nulls() {
292            if nulls.null_count() > 0 {
293                let nulls = BooleanArray::from(nulls.inner().clone());
294                filter = compute::and(&filter, &nulls).unwrap();
295            }
296        }
297
298        Ok(filter)
299    }
300
301    fn contains_primitive<T: ArrowPrimitiveType>(
302        &self,
303        self_arr: &PrimitiveArray<T>,
304        other_arr: &PrimitiveArray<T>,
305    ) -> BooleanArray {
306        let mut filter = BooleanBuilder::with_capacity(other_arr.len());
307
308        if let Some(ht) = self.hash_table.as_ref() {
309            let hash_one = |v: &T::Native| -> u64 { xxh3_64(v.to_byte_slice()) };
310
311            for v in other_arr.values().iter() {
312                let c = ht
313                    .find(hash_one(v), |idx| unsafe {
314                        self_arr.values().get_unchecked(*idx) == v
315                    })
316                    .is_some();
317                filter.append_value(c);
318            }
319        } else {
320            for v in other_arr.values().iter() {
321                filter.append_value(self_arr.values().iter().any(|x| x == v));
322            }
323        }
324
325        filter.finish()
326    }
327
328    fn contains_bytes<T: ByteArrayType<Offset = i32>>(
329        &self,
330        self_arr: &GenericByteArray<T>,
331        other_arr: &GenericByteArray<T>,
332    ) -> BooleanArray {
333        let mut filter = BooleanBuilder::with_capacity(other_arr.len());
334
335        if let Some(ht) = self.hash_table.as_ref() {
336            for v in iter_byte_array_without_validity(other_arr) {
337                let c = ht
338                    .find(xxh3_64(v), |idx| unsafe {
339                        byte_array_get_unchecked(self_arr, *idx) == v
340                    })
341                    .is_some();
342                filter.append_value(c);
343            }
344        } else {
345            for v in iter_byte_array_without_validity(other_arr) {
346                filter.append_value(iter_byte_array_without_validity(self_arr).any(|x| x == v));
347            }
348        }
349
350        filter.finish()
351    }
352}
353
/// Prefix filter: matches binary/string values that start with one of the values in `array`.
pub struct StartsWith {
    /// The prefixes to match against. `StartsWith::new` rejects arrays for which
    /// `is_nullable()` returns true.
    array: Arc<dyn Array>,
}
357
358impl StartsWith {
359    pub fn new(array: Arc<dyn Array>) -> Result<Self> {
360        if array.is_nullable() {
361            return Err(anyhow!(
362                "cannot construct starts_with filter with a nullable array"
363            ));
364        }
365
366        Ok(Self { array })
367    }
368
369    fn starts_with(&self, arr: &dyn Array) -> Result<BooleanArray> {
370        if arr.data_type() != self.array.data_type() {
371            return Err(anyhow!(
372                "filter array is of type {} but array to be filtered is of type {}",
373                self.array.data_type(),
374                arr.data_type(),
375            ));
376        }
377        assert!(!self.array.is_nullable());
378
379        let mut filter = match *arr.data_type() {
380            DataType::Binary => {
381                let self_arr = self.array.as_any().downcast_ref::<BinaryArray>().unwrap();
382                let other_arr = arr.as_any().downcast_ref().unwrap();
383                self.starts_with_bytes(self_arr, other_arr)
384            }
385            DataType::Utf8 => {
386                let self_arr = self.array.as_any().downcast_ref::<StringArray>().unwrap();
387                let other_arr = arr.as_any().downcast_ref().unwrap();
388                self.starts_with_bytes(self_arr, other_arr)
389            }
390            _ => {
391                return Err(anyhow!("unsupported data type: {}", arr.data_type()));
392            }
393        };
394
395        if let Some(nulls) = arr.nulls() {
396            if nulls.null_count() > 0 {
397                let nulls = BooleanArray::from(nulls.inner().clone());
398                filter = compute::and(&filter, &nulls).unwrap();
399            }
400        }
401
402        Ok(filter)
403    }
404
405    fn starts_with_bytes<T: ByteArrayType<Offset = i32>>(
406        &self,
407        self_arr: &GenericByteArray<T>,
408        other_arr: &GenericByteArray<T>,
409    ) -> BooleanArray {
410        let mut filter = BooleanBuilder::with_capacity(other_arr.len());
411
412        // For each value in other_arr, check if it starts with any value in self_arr
413        for v in iter_byte_array_without_validity(other_arr) {
414            let mut found = false;
415            for prefix in iter_byte_array_without_validity(self_arr) {
416                if v.starts_with(prefix) {
417                    found = true;
418                    break;
419                }
420            }
421            filter.append_value(found);
422        }
423
424        filter.finish()
425    }
426}
427
428// Taken from arrow-rs
429// https://docs.rs/arrow-array/54.2.1/src/arrow_array/array/byte_array.rs.html#278
430unsafe fn byte_array_get_unchecked<T: ByteArrayType<Offset = i32>>(
431    arr: &GenericByteArray<T>,
432    i: usize,
433) -> &[u8] {
434    let end = *arr.value_offsets().get_unchecked(i + 1);
435    let start = *arr.value_offsets().get_unchecked(i);
436
437    std::slice::from_raw_parts(
438        arr.value_data()
439            .as_ptr()
440            .offset(isize::try_from(start).unwrap()),
441        usize::try_from(end - start).unwrap(),
442    )
443}
444
445fn iter_byte_array_without_validity<T: ByteArrayType<Offset = i32>>(
446    arr: &GenericByteArray<T>,
447) -> impl Iterator<Item = &[u8]> {
448    (0..arr.len()).map(|i| unsafe { byte_array_get_unchecked(arr, i) })
449}
450
451pub fn run_query(
452    data: &BTreeMap<TableName, RecordBatch>,
453    query: &Query,
454) -> Result<BTreeMap<TableName, RecordBatch>> {
455    let filters = query
456        .selection
457        .par_iter()
458        .map(|(table_name, selections)| {
459            selections
460                .par_iter()
461                .enumerate()
462                .map(|(i, selection)| {
463                    run_table_selection(data, table_name, selection).with_context(|| {
464                        format!("run table selection no:{} for table {}", i, table_name)
465                    })
466                })
467                .collect::<Result<Vec<_>>>()
468        })
469        .collect::<Result<Vec<_>>>()?;
470
471    let data = select_fields(data, &query.fields).context("select fields")?;
472
473    data.par_iter()
474        .filter_map(|(table_name, table_data)| {
475            let mut combined_filter: Option<BooleanArray> = None;
476
477            for f in filters.iter() {
478                for f in f.iter() {
479                    let filter = match f.get(table_name) {
480                        Some(f) => f,
481                        None => continue,
482                    };
483
484                    match combined_filter.as_ref() {
485                        Some(e) => {
486                            let f = compute::or(e, filter)
487                                .with_context(|| format!("combine filters for {}", table_name));
488                            let f = match f {
489                                Ok(v) => v,
490                                Err(err) => return Some(Err(err)),
491                            };
492                            combined_filter = Some(f);
493                        }
494                        None => {
495                            combined_filter = Some(filter.clone());
496                        }
497                    }
498                }
499            }
500
501            let combined_filter = match combined_filter {
502                Some(f) => f,
503                None => return None,
504            };
505
506            let table_data = compute::filter_record_batch(table_data, &combined_filter)
507                .context("filter record batch");
508            let table_data = match table_data {
509                Ok(v) => v,
510                Err(err) => return Some(Err(err)),
511            };
512
513            Some(Ok((table_name.to_owned(), table_data)))
514        })
515        .collect()
516}
517
518pub fn select_fields(
519    data: &BTreeMap<TableName, RecordBatch>,
520    fields: &BTreeMap<TableName, Vec<FieldName>>,
521) -> Result<BTreeMap<TableName, RecordBatch>> {
522    let mut out = BTreeMap::new();
523
524    for (table_name, field_names) in fields.iter() {
525        let table_data = data
526            .get(table_name)
527            .with_context(|| format!("get data for table {}", table_name))?;
528
529        let indices = table_data
530            .schema_ref()
531            .fields()
532            .iter()
533            .enumerate()
534            .filter(|(_, field)| field_names.contains(field.name()))
535            .map(|(i, _)| i)
536            .collect::<Vec<usize>>();
537
538        let table_data = table_data
539            .project(&indices)
540            .with_context(|| format!("project table {}", table_name))?;
541        out.insert(table_name.to_owned(), table_data);
542    }
543
544    Ok(out)
545}
546
547fn run_table_selection(
548    data: &BTreeMap<TableName, RecordBatch>,
549    table_name: &str,
550    selection: &TableSelection,
551) -> Result<BTreeMap<TableName, BooleanArray>> {
552    let mut out = BTreeMap::new();
553
554    let table_data = data.get(table_name).context("get table data")?;
555    let mut combined_filter = None;
556    for (field_name, filter) in selection.filters.iter() {
557        let col = table_data
558            .column_by_name(field_name)
559            .with_context(|| format!("get field {}", field_name))?;
560
561        let f = filter
562            .check(&col)
563            .with_context(|| format!("check filter for column {}", field_name))?;
564
565        match combined_filter {
566            Some(cf) => {
567                combined_filter = Some(
568                    compute::and(&cf, &f)
569                        .with_context(|| format!("combine filter for column {}", field_name))?,
570                );
571            }
572            None => {
573                combined_filter = Some(f);
574            }
575        }
576    }
577
578    let combined_filter = match combined_filter {
579        Some(cf) => cf,
580        None => BooleanArray::new(BooleanBuffer::new_set(table_data.num_rows()), None),
581    };
582
583    out.insert(table_name.to_owned(), combined_filter.clone());
584
585    let mut filtered_cache = BTreeMap::new();
586
587    for (i, inc) in selection.include.iter().enumerate() {
588        if inc.other_table_field_names.len() != inc.field_names.len() {
589            return Err(anyhow!(
590                "field names are different for self table and other table while processing include no: {}. {} {}",
591                i,
592                inc.field_names.len(),
593                inc.other_table_field_names.len(),
594            ));
595        }
596
597        let other_table_data = data.get(&inc.other_table_name).with_context(|| {
598            format!(
599                "get data for table {} as other table data",
600                inc.other_table_name
601            )
602        })?;
603
604        let self_arr = columns_to_binary_array(table_data, &inc.field_names)
605            .context("get row format binary arr for self")?;
606
607        let contains = match filtered_cache.entry(inc.field_names.clone()) {
608            Entry::Vacant(entry) => {
609                let self_arr = compute::filter(&self_arr, &combined_filter)
610                    .context("apply combined filter to self arr")?;
611                let contains =
612                    Contains::new(Arc::new(self_arr)).context("create contains filter")?;
613                let contains = Arc::new(contains);
614                entry.insert(Arc::clone(&contains));
615                contains
616            }
617            Entry::Occupied(entry) => Arc::clone(entry.get()),
618        };
619
620        let other_arr = columns_to_binary_array(other_table_data, &inc.other_table_field_names)
621            .with_context(|| {
622                format!(
623                    "get row format binary arr for other table {}",
624                    inc.other_table_name
625                )
626            })?;
627
628        let f = contains
629            .contains(&other_arr)
630            .with_context(|| format!("run contains for other table {}", inc.other_table_name))?;
631
632        match out.entry(inc.other_table_name.clone()) {
633            Entry::Vacant(entry) => {
634                entry.insert(f);
635            }
636            Entry::Occupied(mut entry) => {
637                let new = compute::or(entry.get(), &f).with_context(|| {
638                    format!("or include filters for table {}", inc.other_table_name)
639                })?;
640                entry.insert(new);
641            }
642        }
643    }
644
645    Ok(out)
646}
647
648fn columns_to_binary_array(
649    table_data: &RecordBatch,
650    column_names: &[String],
651) -> Result<BinaryArray> {
652    let fields = column_names
653        .iter()
654        .map(|field_name| {
655            let f = table_data
656                .schema_ref()
657                .field_with_name(field_name)
658                .with_context(|| format!("get field {} from schema", field_name))?;
659            Ok(SortField::new(f.data_type().clone()))
660        })
661        .collect::<Result<Vec<_>>>()?;
662    let conv = RowConverter::new(fields).context("create row converter")?;
663
664    let columns = column_names
665        .iter()
666        .map(|field_name| {
667            let c = table_data
668                .column_by_name(field_name)
669                .with_context(|| format!("get data for column {}", field_name))?;
670            let c = Arc::clone(c);
671            Ok(c)
672        })
673        .collect::<Result<Vec<_>>>()?;
674
675    let rows = conv
676        .convert_columns(&columns)
677        .context("convert columns to row format")?;
678    let out = rows
679        .try_into_binary()
680        .context("convert row format to binary array")?;
681
682    Ok(out)
683}
684
#[cfg(test)]
mod tests {
    use arrow::{
        array::AsArray,
        datatypes::{Field, Schema},
    };

    use super::*;

    /// Builds a schema of nullable columns from `(name, data type)` pairs.
    fn nullable_schema(columns: &[(&str, DataType)]) -> Arc<Schema> {
        let fields = columns
            .iter()
            .map(|(name, data_type)| Arc::new(Field::new(*name, data_type.clone(), true)))
            .collect::<Vec<_>>();
        Arc::new(Schema::new(fields))
    }

    /// Shorthand for an owned `String`.
    fn s(value: &str) -> String {
        value.to_owned()
    }

    /// A `Contains` filter combined with two includes: one into another table and one back
    /// into the selecting table itself.
    #[test]
    fn basic_test_cherry_query() {
        let team_a = RecordBatch::try_new(
            nullable_schema(&[
                ("name", DataType::Utf8),
                ("age", DataType::UInt64),
                ("height", DataType::UInt64),
            ]),
            vec![
                Arc::new(StringArray::from_iter_values([
                    "kamil", "mahmut", "qwe", "kazim",
                ])),
                Arc::new(UInt64Array::from(vec![11, 12, 13, 31])),
                Arc::new(UInt64Array::from(vec![50, 60, 70, 60])),
            ],
        )
        .unwrap();
        let team_b = RecordBatch::try_new(
            nullable_schema(&[
                ("name2", DataType::Utf8),
                ("age2", DataType::UInt64),
                ("height2", DataType::UInt64),
            ]),
            vec![
                Arc::new(StringArray::from_iter_values(["yusuf", "abuzer", "asd"])),
                Arc::new(UInt64Array::from(vec![11, 12, 13])),
                Arc::new(UInt64Array::from(vec![50, 61, 70])),
            ],
        )
        .unwrap();

        let name_filter =
            Filter::contains(Arc::new(StringArray::from_iter_values(["kamil", "mahmut"])))
                .unwrap();

        let query = Query {
            fields: BTreeMap::from([
                (s("team_a"), vec![s("name")]),
                (s("team_b"), vec![s("name2")]),
            ]),
            selection: Arc::new(BTreeMap::from([(
                s("team_a"),
                vec![TableSelection {
                    filters: BTreeMap::from([(s("name"), name_filter)]),
                    include: vec![
                        Include {
                            field_names: vec![s("age"), s("height")],
                            other_table_field_names: vec![s("age2"), s("height2")],
                            other_table_name: s("team_b"),
                        },
                        Include {
                            field_names: vec![s("height")],
                            other_table_field_names: vec![s("height")],
                            other_table_name: s("team_a"),
                        },
                    ],
                }],
            )])),
        };

        let data = BTreeMap::from([(s("team_a"), team_a), (s("team_b"), team_b)]);

        let res = run_query(&data, &query).unwrap();

        let team_a = res.get("team_a").unwrap();
        let team_b = res.get("team_b").unwrap();

        assert_eq!(res.len(), 2);

        let name = team_a.column_by_name("name").unwrap();
        let name2 = team_b.column_by_name("name2").unwrap();

        assert_eq!(team_a.num_columns(), 1);
        assert_eq!(team_b.num_columns(), 1);

        // "kazim" is not matched by the name filter; it comes in through the self-include on
        // "height" (same height as "mahmut").
        assert_eq!(
            name.as_string(),
            &StringArray::from_iter_values(["kamil", "mahmut", "kazim"])
        );
        assert_eq!(name2.as_string(), &StringArray::from_iter_values(["yusuf"]));
    }

    /// `StartsWith` filters on a string column and a binary column, ANDed together.
    #[test]
    fn test_starts_with_filter() {
        let data = RecordBatch::try_new(
            nullable_schema(&[("name", DataType::Utf8), ("binary", DataType::Binary)]),
            vec![
                Arc::new(StringArray::from_iter_values([
                    "hello",
                    "world",
                    "helloworld",
                    "goodbye",
                    "hell",
                ])),
                Arc::new(BinaryArray::from_iter_values(
                    vec![b"hello", b"world", b"hepto", b"grace", b"heheh"].into_iter(),
                )),
            ],
        )
        .unwrap();

        let name_filter =
            Filter::starts_with(Arc::new(StringArray::from_iter_values(["he"]))).unwrap();
        let binary_filter = Filter::starts_with(Arc::new(BinaryArray::from_iter_values(
            vec![b"he"].into_iter(),
        )))
        .unwrap();

        let query = Query {
            fields: BTreeMap::from([(s("data"), vec![s("name"), s("binary")])]),
            selection: Arc::new(BTreeMap::from([(
                s("data"),
                vec![TableSelection {
                    filters: BTreeMap::from([
                        (s("name"), name_filter),
                        (s("binary"), binary_filter),
                    ]),
                    include: vec![],
                }],
            )])),
        };

        let data = BTreeMap::from([(s("data"), data)]);

        let res = run_query(&data, &query).unwrap();
        let filtered = res.get("data").unwrap();

        let name = filtered.column_by_name("name").unwrap();
        let binary = filtered.column_by_name("binary").unwrap();
        assert_eq!(
            name.as_string(),
            &StringArray::from_iter_values(["hello", "helloworld", "hell"])
        );
        assert_eq!(
            binary.as_binary::<i32>(),
            &BinaryArray::from_iter_values([b"hello", b"hepto", b"heheh"].into_iter())
        );
    }
}