1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
use {
super::{JoinError, JoinMethod, JoinPlan, JoinType},
crate::{
executor::types::{ColumnInfo, Row},
Glue, IndexFilter, Ingredient, MetaRecipe, Method, PlannedRecipe, Recipe, Result, Value,
},
};
/// A single join step that is ready to be executed: where to read the joined
/// rows from, and how to combine them with the rows produced so far.
#[derive(Debug)]
pub struct JoinExecute {
/// Optional database name, passed through to `Glue::get_rows` when fetching rows.
pub database: Option<String>,
/// Name of the table whose rows are fetched in `execute`.
pub table: String,
/// Strategy used to combine the incoming (plane) rows with this table's rows.
pub method: JoinMethod,
/// Join type handed to `JoinMethod::run` alongside the rows.
pub join_type: JoinType,
/// `(plane width, self width)`: the number of plane columns and the number of
/// columns of this join's table, as recorded by `JoinExecute::new`.
pub widths: (usize, usize),
/// Optional index filter, passed through to `Glue::get_rows` when fetching rows.
pub index_filter: Option<IndexFilter>,
}
impl JoinExecute {
    /// Builds a `JoinExecute` from a `JoinPlan`.
    ///
    /// * `plan` - supplies the database, table, join type, constraint and the
    ///   columns of the table being joined; any other plan fields are ignored.
    /// * `plane_columns` - the columns on the other side of the join
    ///   (presumably the columns of the `plane_rows` later given to
    ///   [`execute`](Self::execute) - confirm against the caller).
    /// * `index_filter` - stored as-is and used when fetching rows.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `decide_method` while choosing the
    /// join method for the plan's constraint.
    pub fn new(
        plan: JoinPlan,
        plane_columns: &[ColumnInfo],
        index_filter: Option<IndexFilter>,
    ) -> Result<Self> {
        let JoinPlan {
            database,
            table,
            join_type,
            constraint,
            columns,
            ..
        } = plan;
        // Record the widths first: `columns` is moved into `decide_method` below.
        let widths = (plane_columns.len(), columns.len());
        let method = decide_method(constraint, columns, plane_columns)?;
        Ok(Self {
            database,
            table,
            method,
            join_type,
            widths,
            index_filter,
        })
    }

    /// Overrides the join method with `JoinMethod::FirstTable`.
    pub fn set_first_table(&mut self) {
        self.method = JoinMethod::FirstTable;
    }

    /// Fetches this join's rows through `glue` and combines them with
    /// `plane_rows` using the configured join method.
    ///
    /// # Errors
    ///
    /// Returns the error from `Glue::get_rows` if the rows cannot be fetched,
    /// or the error produced by `JoinMethod::run`.
    // The previously declared lifetime parameter was never used, so it is
    // omitted here (clippy: `extra_unused_lifetimes`).
    pub async fn execute(self, glue: &Glue, plane_rows: Vec<Row>) -> Result<Vec<Row>> {
        let rows = glue
            .get_rows(&self.table, &self.database, &self.index_filter)
            .await?;
        self.method.run(
            &self.join_type,
            self.widths.0,
            self.widths.1,
            plane_rows,
            rows,
        )
    }
}
/// Picks the `JoinMethod` used to evaluate a join constraint.
///
/// * A constraint that is the literal `true` yields `JoinMethod::All`.
/// * A binary operation whose operator is `Value::eq` and whose operands are
///   both plain columns yields `JoinMethod::ColumnEqColumn`, holding the
///   position of one column in `self_columns` and of the other in
///   `plane_columns`.
/// * Any other method recipe yields `JoinMethod::General`, planned against
///   `plane_columns`.
/// * Every remaining recipe yields `JoinMethod::Ignore`.
///
/// # Errors
///
/// Returns `JoinError::Unreachable` when a column referenced by the
/// constraint has no entry in `constraint.meta.objects`, or when the two
/// columns cannot be located as one in `self_columns` and the other in
/// `plane_columns`. Errors from `PlannedRecipe::new` are propagated.
fn decide_method(
    constraint: MetaRecipe,
    self_columns: Vec<ColumnInfo>,
    plane_columns: &[ColumnInfo],
) -> Result<JoinMethod> {
    // Only method recipes need further inspection; everything else is decided here.
    let method = match &constraint.recipe {
        Recipe::Ingredient(Ingredient::Value(Value::Bool(true))) => return Ok(JoinMethod::All),
        Recipe::Method(method) => method,
        _ => return Ok(JoinMethod::Ignore),
    };

    // Anything other than `column = column` goes through the general recipe path.
    let (left, right) = match **method {
        Method::BinaryOperation(
            operator,
            Recipe::Ingredient(Ingredient::Column(left)),
            Recipe::Ingredient(Ingredient::Column(right)),
        ) if operator == Value::eq => (left, right),
        _ => {
            let recipe = PlannedRecipe::new(constraint.clone(), plane_columns)?;
            return Ok(JoinMethod::General(recipe));
        }
    };

    // Resolve both column references; a missing or empty slot is an error.
    let objects = &constraint.meta.objects;
    let left_column = objects
        .get(left)
        .and_then(Option::as_ref)
        .ok_or(JoinError::Unreachable)?;
    let right_column = objects
        .get(right)
        .and_then(Option::as_ref)
        .ok_or(JoinError::Unreachable)?;

    // The left operand is tried against this table first; if it is not one of
    // our columns, the right operand must be, and the roles are swapped.
    let (self_index, plane_column) =
        match self_columns.iter().position(|column| column == left_column) {
            Some(self_index) => (self_index, right_column),
            None => {
                let self_index = self_columns
                    .iter()
                    .position(|column| column == right_column)
                    .ok_or(JoinError::Unreachable)?;
                (self_index, left_column)
            }
        };
    let plane_index = plane_columns
        .iter()
        .position(|column| column == plane_column)
        .ok_or(JoinError::Unreachable)?;

    Ok(JoinMethod::ColumnEqColumn {
        plane_trust_ordered: false,
        plane_index,
        self_trust_ordered: false,
        self_index,
    })
}