1use anyhow::{anyhow, Context, Result};
2use arrow::array::{
3 Array, ArrowPrimitiveType, BinaryArray, BooleanArray, BooleanBuilder, GenericByteArray,
4 Int16Array, Int32Array, Int64Array, Int8Array, PrimitiveArray, StringArray, UInt16Array,
5 UInt32Array, UInt64Array, UInt8Array,
6};
7use arrow::buffer::BooleanBuffer;
8use arrow::compute;
9use arrow::datatypes::{ByteArrayType, DataType, ToByteSlice};
10use arrow::record_batch::RecordBatch;
11use arrow::row::{RowConverter, SortField};
12use hashbrown::HashTable;
13use rayon::prelude::*;
14use std::collections::btree_map::Entry;
15use std::collections::BTreeMap;
16use std::sync::Arc;
17use xxhash_rust::xxh3::xxh3_64;
18
/// Name of a table; used as the key of the data, selection and field maps.
type TableName = String;
/// Name of a field (column) inside a table.
type FieldName = String;
21
/// A query over a set of named tables: which rows to select and which
/// columns to keep in the output.
#[derive(Clone)]
pub struct Query {
    /// Selections to evaluate, keyed by the table each one starts from.
    /// `run_query` OR-combines the row masks produced by all selections.
    pub selection: Arc<BTreeMap<TableName, Vec<TableSelection>>>,
    /// Columns to keep per table. `select_fields` only emits tables that
    /// have an entry here.
    pub fields: BTreeMap<TableName, Vec<FieldName>>,
}
27
28impl Query {
29 pub fn add_request_and_include_fields(&mut self) -> Result<()> {
30 for (table_name, selections) in self.selection.iter() {
31 for selection in selections.iter() {
32 for col_name in selection.filters.keys() {
33 let table_fields = self
34 .fields
35 .get_mut(table_name)
36 .with_context(|| format!("get fields for table {}", table_name))?;
37 table_fields.push(col_name.to_owned());
38 }
39
40 for include in selection.include.iter() {
41 let other_table_fields = self
42 .fields
43 .get_mut(&include.other_table_name)
44 .with_context(|| {
45 format!("get fields for other table {}", include.other_table_name)
46 })?;
47 other_table_fields.extend_from_slice(&include.other_table_field_names);
48 let table_fields = self
49 .fields
50 .get_mut(table_name)
51 .with_context(|| format!("get fields for table {}", table_name))?;
52 table_fields.extend_from_slice(&include.field_names);
53 }
54 }
55 }
56
57 Ok(())
58 }
59}
60
/// One selection on a table: a set of column filters plus related rows to
/// pull in from other tables.
pub struct TableSelection {
    /// Filter per column name. `run_table_selection` AND-combines them, so a
    /// row must pass every filter; with no filters every row is selected.
    pub filters: BTreeMap<FieldName, Filter>,
    /// Rows of other tables to include based on the rows selected here.
    pub include: Vec<Include>,
}
65
/// Describes rows of another table to include: those whose key columns match
/// the key columns of a row selected in the selecting table.
pub struct Include {
    /// Table to pull the matching rows from.
    pub other_table_name: TableName,
    /// Key columns of the selecting table.
    pub field_names: Vec<FieldName>,
    /// Key columns of the other table. `run_table_selection` rejects an
    /// include whose length differs from `field_names`.
    pub other_table_field_names: Vec<FieldName>,
}
71
/// A predicate evaluated on a single column.
pub enum Filter {
    /// The value must be equal to one of the values of the filter array.
    Contains(Contains),
    /// The value must start with one of the prefixes of the filter array.
    StartsWith(StartsWith),
    /// The boolean column must be equal to the given value.
    Bool(bool),
}
77
78impl Filter {
79 pub fn contains(arr: Arc<dyn Array>) -> Result<Self> {
80 Ok(Self::Contains(Contains::new(arr)?))
81 }
82
83 pub fn starts_with(arr: Arc<dyn Array>) -> Result<Self> {
84 Ok(Self::StartsWith(StartsWith::new(arr)?))
85 }
86
87 pub fn bool(b: bool) -> Self {
88 Self::Bool(b)
89 }
90
91 fn check(&self, arr: &dyn Array) -> Result<BooleanArray> {
92 match self {
93 Self::Contains(ct) => ct.contains(arr),
94 Self::StartsWith(sw) => sw.starts_with(arr),
95 Self::Bool(b) => {
96 let arr = arr
97 .as_any()
98 .downcast_ref::<BooleanArray>()
99 .context("cast array to boolean array")?;
100
101 let mut filter = if *b {
102 arr.clone()
103 } else {
104 compute::not(arr).context("negate array")?
105 };
106
107 if let Some(nulls) = filter.nulls() {
108 if nulls.null_count() > 0 {
109 let nulls = BooleanArray::from(nulls.inner().clone());
110 filter = compute::and(&filter, &nulls).unwrap();
111 }
112 }
113
114 Ok(filter)
115 }
116 }
117 }
118}
119
/// Membership filter: matches values that are present in `array`.
pub struct Contains {
    /// Values to match against. `Contains::new` rejects nullable arrays.
    array: Arc<dyn Array>,
    /// Indices into `array`, hashed by the xxh3 hash of the value bytes.
    /// Only built by `Contains::new` when `array` has at least 128 elements;
    /// otherwise lookups scan `array` linearly.
    hash_table: Option<HashTable<usize>>,
}
124
125impl Contains {
126 fn ht_from_primitive<T: ArrowPrimitiveType>(arr: &PrimitiveArray<T>) -> HashTable<usize> {
127 assert!(!arr.is_nullable());
128
129 let mut ht = HashTable::with_capacity(arr.len());
130
131 for (i, v) in arr.values().iter().enumerate() {
132 ht.insert_unique(xxh3_64(v.to_byte_slice()), i, |i| {
133 xxh3_64(unsafe { arr.value_unchecked(*i).to_byte_slice() })
134 });
135 }
136
137 ht
138 }
139
140 fn ht_from_bytes<T: ByteArrayType<Offset = i32>>(
141 arr: &GenericByteArray<T>,
142 ) -> HashTable<usize> {
143 assert!(!arr.is_nullable());
144
145 let mut ht = HashTable::with_capacity(arr.len());
146
147 for (i, v) in iter_byte_array_without_validity(arr).enumerate() {
148 ht.insert_unique(xxh3_64(v), i, |i| {
149 xxh3_64(unsafe { byte_array_get_unchecked(arr, *i) })
150 });
151 }
152
153 ht
154 }
155
156 fn ht_from_array(array: &dyn Array) -> Result<HashTable<usize>> {
157 let ht = match *array.data_type() {
158 DataType::UInt8 => {
159 let array = array.as_any().downcast_ref::<UInt8Array>().unwrap();
160 Self::ht_from_primitive(array)
161 }
162 DataType::UInt16 => {
163 let array = array.as_any().downcast_ref::<UInt16Array>().unwrap();
164 Self::ht_from_primitive(array)
165 }
166 DataType::UInt32 => {
167 let array = array.as_any().downcast_ref::<UInt32Array>().unwrap();
168 Self::ht_from_primitive(array)
169 }
170 DataType::UInt64 => {
171 let array = array.as_any().downcast_ref::<UInt64Array>().unwrap();
172 Self::ht_from_primitive(array)
173 }
174 DataType::Int8 => {
175 let array = array.as_any().downcast_ref::<Int8Array>().unwrap();
176 Self::ht_from_primitive(array)
177 }
178 DataType::Int16 => {
179 let array = array.as_any().downcast_ref::<Int16Array>().unwrap();
180 Self::ht_from_primitive(array)
181 }
182 DataType::Int32 => {
183 let array = array.as_any().downcast_ref::<Int32Array>().unwrap();
184 Self::ht_from_primitive(array)
185 }
186 DataType::Int64 => {
187 let array = array.as_any().downcast_ref::<Int64Array>().unwrap();
188 Self::ht_from_primitive(array)
189 }
190 DataType::Binary => {
191 let array = array.as_any().downcast_ref::<BinaryArray>().unwrap();
192 Self::ht_from_bytes(array)
193 }
194 DataType::Utf8 => {
195 let array = array.as_any().downcast_ref::<StringArray>().unwrap();
196 Self::ht_from_bytes(array)
197 }
198 _ => {
199 return Err(anyhow!("unsupported data type: {}", array.data_type()));
200 }
201 };
202
203 Ok(ht)
204 }
205
206 pub fn new(array: Arc<dyn Array>) -> Result<Self> {
207 if array.is_nullable() {
208 return Err(anyhow!(
209 "cannot construct contains filter with a nullable array"
210 ));
211 }
212
213 let hash_table = if array.len() >= 128 {
215 Some(Self::ht_from_array(&array).context("construct hash table")?)
216 } else {
217 None
218 };
219
220 Ok(Self { hash_table, array })
221 }
222
223 fn contains(&self, arr: &dyn Array) -> Result<BooleanArray> {
224 if arr.data_type() != self.array.data_type() {
225 return Err(anyhow!(
226 "filter array is of type {} but array to be filtered is of type {}",
227 self.array.data_type(),
228 arr.data_type(),
229 ));
230 }
231 assert!(!self.array.is_nullable());
232
233 let filter = match *arr.data_type() {
234 DataType::UInt8 => {
235 let self_arr = self.array.as_any().downcast_ref::<UInt8Array>().unwrap();
236 let other_arr = arr.as_any().downcast_ref().unwrap();
237 self.contains_primitive(self_arr, other_arr)
238 }
239 DataType::UInt16 => {
240 let self_arr = self.array.as_any().downcast_ref::<UInt16Array>().unwrap();
241 let other_arr = arr.as_any().downcast_ref().unwrap();
242 self.contains_primitive(self_arr, other_arr)
243 }
244 DataType::UInt32 => {
245 let self_arr = self.array.as_any().downcast_ref::<UInt32Array>().unwrap();
246 let other_arr = arr.as_any().downcast_ref().unwrap();
247 self.contains_primitive(self_arr, other_arr)
248 }
249 DataType::UInt64 => {
250 let self_arr = self.array.as_any().downcast_ref::<UInt64Array>().unwrap();
251 let other_arr = arr.as_any().downcast_ref().unwrap();
252 self.contains_primitive(self_arr, other_arr)
253 }
254 DataType::Int8 => {
255 let self_arr = self.array.as_any().downcast_ref::<Int8Array>().unwrap();
256 let other_arr = arr.as_any().downcast_ref().unwrap();
257 self.contains_primitive(self_arr, other_arr)
258 }
259 DataType::Int16 => {
260 let self_arr = self.array.as_any().downcast_ref::<Int16Array>().unwrap();
261 let other_arr = arr.as_any().downcast_ref().unwrap();
262 self.contains_primitive(self_arr, other_arr)
263 }
264 DataType::Int32 => {
265 let self_arr = self.array.as_any().downcast_ref::<Int32Array>().unwrap();
266 let other_arr = arr.as_any().downcast_ref().unwrap();
267 self.contains_primitive(self_arr, other_arr)
268 }
269 DataType::Int64 => {
270 let self_arr = self.array.as_any().downcast_ref::<Int64Array>().unwrap();
271 let other_arr = arr.as_any().downcast_ref().unwrap();
272 self.contains_primitive(self_arr, other_arr)
273 }
274 DataType::Binary => {
275 let self_arr = self.array.as_any().downcast_ref::<BinaryArray>().unwrap();
276 let other_arr = arr.as_any().downcast_ref().unwrap();
277 self.contains_bytes(self_arr, other_arr)
278 }
279 DataType::Utf8 => {
280 let self_arr = self.array.as_any().downcast_ref::<StringArray>().unwrap();
281 let other_arr = arr.as_any().downcast_ref().unwrap();
282 self.contains_bytes(self_arr, other_arr)
283 }
284 _ => {
285 return Err(anyhow!("unsupported data type: {}", arr.data_type()));
286 }
287 };
288
289 let mut filter = filter;
290
291 if let Some(nulls) = arr.nulls() {
292 if nulls.null_count() > 0 {
293 let nulls = BooleanArray::from(nulls.inner().clone());
294 filter = compute::and(&filter, &nulls).unwrap();
295 }
296 }
297
298 Ok(filter)
299 }
300
301 fn contains_primitive<T: ArrowPrimitiveType>(
302 &self,
303 self_arr: &PrimitiveArray<T>,
304 other_arr: &PrimitiveArray<T>,
305 ) -> BooleanArray {
306 let mut filter = BooleanBuilder::with_capacity(other_arr.len());
307
308 if let Some(ht) = self.hash_table.as_ref() {
309 let hash_one = |v: &T::Native| -> u64 { xxh3_64(v.to_byte_slice()) };
310
311 for v in other_arr.values().iter() {
312 let c = ht
313 .find(hash_one(v), |idx| unsafe {
314 self_arr.values().get_unchecked(*idx) == v
315 })
316 .is_some();
317 filter.append_value(c);
318 }
319 } else {
320 for v in other_arr.values().iter() {
321 filter.append_value(self_arr.values().iter().any(|x| x == v));
322 }
323 }
324
325 filter.finish()
326 }
327
328 fn contains_bytes<T: ByteArrayType<Offset = i32>>(
329 &self,
330 self_arr: &GenericByteArray<T>,
331 other_arr: &GenericByteArray<T>,
332 ) -> BooleanArray {
333 let mut filter = BooleanBuilder::with_capacity(other_arr.len());
334
335 if let Some(ht) = self.hash_table.as_ref() {
336 for v in iter_byte_array_without_validity(other_arr) {
337 let c = ht
338 .find(xxh3_64(v), |idx| unsafe {
339 byte_array_get_unchecked(self_arr, *idx) == v
340 })
341 .is_some();
342 filter.append_value(c);
343 }
344 } else {
345 for v in iter_byte_array_without_validity(other_arr) {
346 filter.append_value(iter_byte_array_without_validity(self_arr).any(|x| x == v));
347 }
348 }
349
350 filter.finish()
351 }
352}
353
/// Prefix filter: matches values that start with any element of `array`.
pub struct StartsWith {
    /// Prefixes to match against. `StartsWith::new` rejects nullable arrays.
    array: Arc<dyn Array>,
}
357
358impl StartsWith {
359 pub fn new(array: Arc<dyn Array>) -> Result<Self> {
360 if array.is_nullable() {
361 return Err(anyhow!(
362 "cannot construct starts_with filter with a nullable array"
363 ));
364 }
365
366 Ok(Self { array })
367 }
368
369 fn starts_with(&self, arr: &dyn Array) -> Result<BooleanArray> {
370 if arr.data_type() != self.array.data_type() {
371 return Err(anyhow!(
372 "filter array is of type {} but array to be filtered is of type {}",
373 self.array.data_type(),
374 arr.data_type(),
375 ));
376 }
377 assert!(!self.array.is_nullable());
378
379 let mut filter = match *arr.data_type() {
380 DataType::Binary => {
381 let self_arr = self.array.as_any().downcast_ref::<BinaryArray>().unwrap();
382 let other_arr = arr.as_any().downcast_ref().unwrap();
383 self.starts_with_bytes(self_arr, other_arr)
384 }
385 DataType::Utf8 => {
386 let self_arr = self.array.as_any().downcast_ref::<StringArray>().unwrap();
387 let other_arr = arr.as_any().downcast_ref().unwrap();
388 self.starts_with_bytes(self_arr, other_arr)
389 }
390 _ => {
391 return Err(anyhow!("unsupported data type: {}", arr.data_type()));
392 }
393 };
394
395 if let Some(nulls) = arr.nulls() {
396 if nulls.null_count() > 0 {
397 let nulls = BooleanArray::from(nulls.inner().clone());
398 filter = compute::and(&filter, &nulls).unwrap();
399 }
400 }
401
402 Ok(filter)
403 }
404
405 fn starts_with_bytes<T: ByteArrayType<Offset = i32>>(
406 &self,
407 self_arr: &GenericByteArray<T>,
408 other_arr: &GenericByteArray<T>,
409 ) -> BooleanArray {
410 let mut filter = BooleanBuilder::with_capacity(other_arr.len());
411
412 for v in iter_byte_array_without_validity(other_arr) {
414 let mut found = false;
415 for prefix in iter_byte_array_without_validity(self_arr) {
416 if v.starts_with(prefix) {
417 found = true;
418 break;
419 }
420 }
421 filter.append_value(found);
422 }
423
424 filter.finish()
425 }
426}
427
428unsafe fn byte_array_get_unchecked<T: ByteArrayType<Offset = i32>>(
431 arr: &GenericByteArray<T>,
432 i: usize,
433) -> &[u8] {
434 let end = *arr.value_offsets().get_unchecked(i + 1);
435 let start = *arr.value_offsets().get_unchecked(i);
436
437 std::slice::from_raw_parts(
438 arr.value_data()
439 .as_ptr()
440 .offset(isize::try_from(start).unwrap()),
441 usize::try_from(end - start).unwrap(),
442 )
443}
444
445fn iter_byte_array_without_validity<T: ByteArrayType<Offset = i32>>(
446 arr: &GenericByteArray<T>,
447) -> impl Iterator<Item = &[u8]> {
448 (0..arr.len()).map(|i| unsafe { byte_array_get_unchecked(arr, i) })
449}
450
451pub fn run_query(
452 data: &BTreeMap<TableName, RecordBatch>,
453 query: &Query,
454) -> Result<BTreeMap<TableName, RecordBatch>> {
455 let filters = query
456 .selection
457 .par_iter()
458 .map(|(table_name, selections)| {
459 selections
460 .par_iter()
461 .enumerate()
462 .map(|(i, selection)| {
463 run_table_selection(data, table_name, selection).with_context(|| {
464 format!("run table selection no:{} for table {}", i, table_name)
465 })
466 })
467 .collect::<Result<Vec<_>>>()
468 })
469 .collect::<Result<Vec<_>>>()?;
470
471 let data = select_fields(data, &query.fields).context("select fields")?;
472
473 data.par_iter()
474 .filter_map(|(table_name, table_data)| {
475 let mut combined_filter: Option<BooleanArray> = None;
476
477 for f in filters.iter() {
478 for f in f.iter() {
479 let filter = match f.get(table_name) {
480 Some(f) => f,
481 None => continue,
482 };
483
484 match combined_filter.as_ref() {
485 Some(e) => {
486 let f = compute::or(e, filter)
487 .with_context(|| format!("combine filters for {}", table_name));
488 let f = match f {
489 Ok(v) => v,
490 Err(err) => return Some(Err(err)),
491 };
492 combined_filter = Some(f);
493 }
494 None => {
495 combined_filter = Some(filter.clone());
496 }
497 }
498 }
499 }
500
501 let combined_filter = match combined_filter {
502 Some(f) => f,
503 None => return None,
504 };
505
506 let table_data = compute::filter_record_batch(table_data, &combined_filter)
507 .context("filter record batch");
508 let table_data = match table_data {
509 Ok(v) => v,
510 Err(err) => return Some(Err(err)),
511 };
512
513 Some(Ok((table_name.to_owned(), table_data)))
514 })
515 .collect()
516}
517
518pub fn select_fields(
519 data: &BTreeMap<TableName, RecordBatch>,
520 fields: &BTreeMap<TableName, Vec<FieldName>>,
521) -> Result<BTreeMap<TableName, RecordBatch>> {
522 let mut out = BTreeMap::new();
523
524 for (table_name, field_names) in fields.iter() {
525 let table_data = data
526 .get(table_name)
527 .with_context(|| format!("get data for table {}", table_name))?;
528
529 let indices = table_data
530 .schema_ref()
531 .fields()
532 .iter()
533 .enumerate()
534 .filter(|(_, field)| field_names.contains(field.name()))
535 .map(|(i, _)| i)
536 .collect::<Vec<usize>>();
537
538 let table_data = table_data
539 .project(&indices)
540 .with_context(|| format!("project table {}", table_name))?;
541 out.insert(table_name.to_owned(), table_data);
542 }
543
544 Ok(out)
545}
546
547fn run_table_selection(
548 data: &BTreeMap<TableName, RecordBatch>,
549 table_name: &str,
550 selection: &TableSelection,
551) -> Result<BTreeMap<TableName, BooleanArray>> {
552 let mut out = BTreeMap::new();
553
554 let table_data = data.get(table_name).context("get table data")?;
555 let mut combined_filter = None;
556 for (field_name, filter) in selection.filters.iter() {
557 let col = table_data
558 .column_by_name(field_name)
559 .with_context(|| format!("get field {}", field_name))?;
560
561 let f = filter
562 .check(&col)
563 .with_context(|| format!("check filter for column {}", field_name))?;
564
565 match combined_filter {
566 Some(cf) => {
567 combined_filter = Some(
568 compute::and(&cf, &f)
569 .with_context(|| format!("combine filter for column {}", field_name))?,
570 );
571 }
572 None => {
573 combined_filter = Some(f);
574 }
575 }
576 }
577
578 let combined_filter = match combined_filter {
579 Some(cf) => cf,
580 None => BooleanArray::new(BooleanBuffer::new_set(table_data.num_rows()), None),
581 };
582
583 out.insert(table_name.to_owned(), combined_filter.clone());
584
585 let mut filtered_cache = BTreeMap::new();
586
587 for (i, inc) in selection.include.iter().enumerate() {
588 if inc.other_table_field_names.len() != inc.field_names.len() {
589 return Err(anyhow!(
590 "field names are different for self table and other table while processing include no: {}. {} {}",
591 i,
592 inc.field_names.len(),
593 inc.other_table_field_names.len(),
594 ));
595 }
596
597 let other_table_data = data.get(&inc.other_table_name).with_context(|| {
598 format!(
599 "get data for table {} as other table data",
600 inc.other_table_name
601 )
602 })?;
603
604 let self_arr = columns_to_binary_array(table_data, &inc.field_names)
605 .context("get row format binary arr for self")?;
606
607 let contains = match filtered_cache.entry(inc.field_names.clone()) {
608 Entry::Vacant(entry) => {
609 let self_arr = compute::filter(&self_arr, &combined_filter)
610 .context("apply combined filter to self arr")?;
611 let contains =
612 Contains::new(Arc::new(self_arr)).context("create contains filter")?;
613 let contains = Arc::new(contains);
614 entry.insert(Arc::clone(&contains));
615 contains
616 }
617 Entry::Occupied(entry) => Arc::clone(entry.get()),
618 };
619
620 let other_arr = columns_to_binary_array(other_table_data, &inc.other_table_field_names)
621 .with_context(|| {
622 format!(
623 "get row format binary arr for other table {}",
624 inc.other_table_name
625 )
626 })?;
627
628 let f = contains
629 .contains(&other_arr)
630 .with_context(|| format!("run contains for other table {}", inc.other_table_name))?;
631
632 match out.entry(inc.other_table_name.clone()) {
633 Entry::Vacant(entry) => {
634 entry.insert(f);
635 }
636 Entry::Occupied(mut entry) => {
637 let new = compute::or(entry.get(), &f).with_context(|| {
638 format!("or include filters for table {}", inc.other_table_name)
639 })?;
640 entry.insert(new);
641 }
642 }
643 }
644
645 Ok(out)
646}
647
648fn columns_to_binary_array(
649 table_data: &RecordBatch,
650 column_names: &[String],
651) -> Result<BinaryArray> {
652 let fields = column_names
653 .iter()
654 .map(|field_name| {
655 let f = table_data
656 .schema_ref()
657 .field_with_name(field_name)
658 .with_context(|| format!("get field {} from schema", field_name))?;
659 Ok(SortField::new(f.data_type().clone()))
660 })
661 .collect::<Result<Vec<_>>>()?;
662 let conv = RowConverter::new(fields).context("create row converter")?;
663
664 let columns = column_names
665 .iter()
666 .map(|field_name| {
667 let c = table_data
668 .column_by_name(field_name)
669 .with_context(|| format!("get data for column {}", field_name))?;
670 let c = Arc::clone(c);
671 Ok(c)
672 })
673 .collect::<Result<Vec<_>>>()?;
674
675 let rows = conv
676 .convert_columns(&columns)
677 .context("convert columns to row format")?;
678 let out = rows
679 .try_into_binary()
680 .context("convert row format to binary array")?;
681
682 Ok(out)
683}
684
#[cfg(test)]
mod tests {
    use arrow::{
        array::AsArray,
        datatypes::{Field, Schema},
    };

    use super::*;

    /// Selects rows of `team_a` by name and pulls in related rows through two
    /// includes: `team_b` rows with the same (age, height) pair, and `team_a`
    /// rows with the same height as a selected row.
    #[test]
    fn basic_test_cherry_query() {
        let team_a = RecordBatch::try_new(
            Arc::new(Schema::new(vec![
                Arc::new(Field::new("name", DataType::Utf8, true)),
                Arc::new(Field::new("age", DataType::UInt64, true)),
                Arc::new(Field::new("height", DataType::UInt64, true)),
            ])),
            vec![
                Arc::new(StringArray::from_iter_values(
                    vec!["kamil", "mahmut", "qwe", "kazim"].into_iter(),
                )),
                Arc::new(UInt64Array::from_iter(vec![11, 12, 13, 31].into_iter())),
                Arc::new(UInt64Array::from_iter(vec![50, 60, 70, 60].into_iter())),
            ],
        )
        .unwrap();
        let team_b = RecordBatch::try_new(
            Arc::new(Schema::new(vec![
                Arc::new(Field::new("name2", DataType::Utf8, true)),
                Arc::new(Field::new("age2", DataType::UInt64, true)),
                Arc::new(Field::new("height2", DataType::UInt64, true)),
            ])),
            vec![
                Arc::new(StringArray::from_iter_values(vec![
                    "yusuf", "abuzer", "asd",
                ])),
                Arc::new(UInt64Array::from_iter(vec![11, 12, 13].into_iter())),
                Arc::new(UInt64Array::from_iter(vec![50, 61, 70].into_iter())),
            ],
        )
        .unwrap();

        // Only the name columns are requested in the output.
        let query = Query {
            fields: [
                ("team_a".to_owned(), vec!["name".to_owned()]),
                ("team_b".to_owned(), vec!["name2".to_owned()]),
            ]
            .into_iter()
            .collect(),
            selection: Arc::new(
                [(
                    "team_a".to_owned(),
                    vec![TableSelection {
                        filters: [(
                            "name".to_owned(),
                            Filter::Contains(
                                Contains::new(Arc::new(StringArray::from_iter_values(
                                    vec!["kamil", "mahmut"].into_iter(),
                                )))
                                .unwrap(),
                            ),
                        )]
                        .into_iter()
                        .collect(),
                        include: vec![
                            // Join team_b on the (age, height) pair.
                            Include {
                                field_names: vec!["age".to_owned(), "height".to_owned()],
                                other_table_field_names: vec![
                                    "age2".to_owned(),
                                    "height2".to_owned(),
                                ],
                                other_table_name: "team_b".to_owned(),
                            },
                            // Self-join on height.
                            Include {
                                field_names: vec!["height".to_owned()],
                                other_table_field_names: vec!["height".to_owned()],
                                other_table_name: "team_a".to_owned(),
                            },
                        ],
                    }],
                )]
                .into_iter()
                .collect(),
            ),
        };

        let data = [("team_a".to_owned(), team_a), ("team_b".to_owned(), team_b)]
            .into_iter()
            .collect::<BTreeMap<_, _>>();

        let res = run_query(&data, &query).unwrap();

        let team_a = res.get("team_a").unwrap();
        let team_b = res.get("team_b").unwrap();

        assert_eq!(res.len(), 2);

        let name = team_a.column_by_name("name").unwrap();
        let name2 = team_b.column_by_name("name2").unwrap();

        assert_eq!(team_a.num_columns(), 1);
        assert_eq!(team_b.num_columns(), 1);

        // "kazim" is not matched by name but shares height 60 with "mahmut".
        assert_eq!(
            name.as_string(),
            &StringArray::from_iter_values(["kamil", "mahmut", "kazim"])
        );
        // Only (11, 50) of team_b equals an (age, height) pair of a selected row.
        assert_eq!(name2.as_string(), &StringArray::from_iter_values(["yusuf"]));
    }

    /// Applies a `StartsWith` filter to a string column and to a binary
    /// column of the same table; a row must pass both.
    #[test]
    fn test_starts_with_filter() {
        let data = RecordBatch::try_new(
            Arc::new(Schema::new(vec![
                Arc::new(Field::new("name", DataType::Utf8, true)),
                Arc::new(Field::new("binary", DataType::Binary, true)),
            ])),
            vec![
                Arc::new(StringArray::from_iter_values(
                    vec!["hello", "world", "helloworld", "goodbye", "hell"].into_iter(),
                )),
                Arc::new(BinaryArray::from_iter_values(
                    vec![b"hello", b"world", b"hepto", b"grace", b"heheh"].into_iter(),
                )),
            ],
        )
        .unwrap();

        let query = Query {
            fields: [(
                "data".to_owned(),
                vec!["name".to_owned(), "binary".to_owned()],
            )]
            .into_iter()
            .collect(),
            selection: Arc::new(
                [(
                    "data".to_owned(),
                    vec![TableSelection {
                        filters: [
                            (
                                "name".to_owned(),
                                Filter::StartsWith(
                                    StartsWith::new(Arc::new(StringArray::from_iter_values(
                                        vec!["he"].into_iter(),
                                    )))
                                    .unwrap(),
                                ),
                            ),
                            (
                                "binary".to_owned(),
                                Filter::StartsWith(
                                    StartsWith::new(Arc::new(BinaryArray::from_iter_values(
                                        vec![b"he"].into_iter(),
                                    )))
                                    .unwrap(),
                                ),
                            ),
                        ]
                        .into_iter()
                        .collect(),
                        include: vec![],
                    }],
                )]
                .into_iter()
                .collect(),
            ),
        };

        let data = [("data".to_owned(), data)]
            .into_iter()
            .collect::<BTreeMap<_, _>>();

        let res = run_query(&data, &query).unwrap();
        let filtered = res.get("data").unwrap();

        // Rows 0, 2 and 4 start with "he" in both columns.
        let name = filtered.column_by_name("name").unwrap();
        let binary = filtered.column_by_name("binary").unwrap();
        assert_eq!(
            name.as_string(),
            &StringArray::from_iter_values(["hello", "helloworld", "hell"])
        );
        assert_eq!(
            binary.as_binary::<i32>(),
            &BinaryArray::from_iter_values([b"hello", b"hepto", b"heheh"].into_iter())
        );
    }
}