1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
use {
	super::{JoinError, JoinMethod, JoinPlan, JoinType},
	crate::{
		executor::types::{ColumnInfo, Row},
		Glue, IndexFilter, Ingredient, MetaRecipe, Method, PlannedRecipe, Recipe, Result, Value,
	},
};

/// Everything needed to execute one join step: which table to fetch rows from
/// and how those rows are combined with the rows accumulated so far (the "plane").
#[derive(Debug)]
pub struct JoinExecute {
	/// Optional database identifier, handed to `Glue::get_rows` together with `table`.
	pub database: Option<String>,
	/// Name of the table whose rows are fetched when the join is executed.
	pub table: String,
	/// Strategy used to combine the plane rows with this table's rows.
	pub method: JoinMethod,
	/// Join type, forwarded to `JoinMethod::run`.
	pub join_type: JoinType,
	/// `(number of plane columns, number of this table's columns)`.
	pub widths: (usize, usize),
	/// Optional filter forwarded to `Glue::get_rows` when fetching this table's rows.
	pub index_filter: Option<IndexFilter>,
}

impl JoinExecute {
	/// Builds the executor for a single join step from its `plan`.
	///
	/// `plane_columns` are the columns of the rows this join will be applied to;
	/// their count, together with the count of the plan's own columns, is stored
	/// in `widths`. The join method is chosen by `decide_method`.
	///
	/// # Errors
	/// Propagates any error returned by `decide_method`.
	pub fn new(
		plan: JoinPlan,
		plane_columns: &[ColumnInfo],
		index_filter: Option<IndexFilter>,
	) -> Result<Self> {
		// Both widths are recorded before `plan.columns` is moved into `decide_method`.
		let plane_width = plane_columns.len();
		let self_width = plan.columns.len();
		let method = decide_method(plan.constraint, plan.columns, plane_columns)?;
		Ok(Self {
			database: plan.database,
			table: plan.table,
			method,
			join_type: plan.join_type,
			widths: (plane_width, self_width),
			index_filter,
		})
	}

	/// Replaces the chosen join method with `JoinMethod::FirstTable`.
	pub fn set_first_table(&mut self) {
		self.method = JoinMethod::FirstTable;
	}

	/// Fetches this table's rows through `glue` and joins them with `plane_rows`
	/// using the stored method, join type and widths.
	///
	/// # Errors
	/// Propagates errors from `Glue::get_rows` and from `JoinMethod::run`.
	pub async fn execute<'a>(self, glue: &Glue, plane_rows: Vec<Row>) -> Result<Vec<Row>> {
		let (plane_width, self_width) = self.widths;
		let self_rows = glue
			.get_rows(&self.table, &self.database, &self.index_filter)
			.await?;
		self.method.run(
			&self.join_type,
			plane_width,
			self_width,
			plane_rows,
			self_rows,
		)
	}
}

/// Chooses the `JoinMethod` used to evaluate a join `constraint`.
///
/// * A literal `true` constraint yields `JoinMethod::All`.
/// * An equality (`Value::eq`) between two columns yields
///   `JoinMethod::ColumnEqColumn`, carrying the position of one column within
///   `self_columns` and of the other within `plane_columns`.
/// * Any other method recipe yields `JoinMethod::General`, wrapping a
///   `PlannedRecipe` built from the constraint and `plane_columns`.
/// * Every remaining recipe yields `JoinMethod::Ignore`.
///
/// # Errors
/// Returns `JoinError::Unreachable` when, for a column-equals-column constraint,
/// a column index has no populated entry in `constraint.meta.objects`, or the two
/// columns cannot be located one in `self_columns` and the other in
/// `plane_columns`. Errors from `PlannedRecipe::new` are propagated.
fn decide_method(
	constraint: MetaRecipe,
	self_columns: Vec<ColumnInfo>,
	plane_columns: &[ColumnInfo],
) -> Result<JoinMethod> {
	Ok(match &constraint.recipe {
		Recipe::Ingredient(Ingredient::Value(Value::Bool(true))) => JoinMethod::All,
		Recipe::Method(method) => match **method {
			Method::BinaryOperation(
				operator,
				Recipe::Ingredient(Ingredient::Column(index_l)),
				Recipe::Ingredient(Ingredient::Column(index_r)),
			) if operator == Value::eq => {
				// TODO: Be more strict, ensure that one column is from plane, and another from self.
				let column_l = constraint
					.meta
					.objects
					.get(index_l)
					.ok_or(JoinError::Unreachable)?
					.as_ref()
					.ok_or(JoinError::Unreachable)?;
				let column_r = constraint
					.meta
					.objects
					.get(index_r)
					.ok_or(JoinError::Unreachable)?
					.as_ref()
					.ok_or(JoinError::Unreachable)?;

				// The left column is tried against this table first; if it is not one of
				// ours, the roles are swapped and the right column must be ours instead.
				let (self_index, plane_index) =
					match find_column_index(&self_columns, column_l) {
						Some(self_index) => {
							let plane_index = find_column_index(plane_columns, column_r)
								.ok_or(JoinError::Unreachable)?;
							(self_index, plane_index)
						}
						None => {
							let self_index = find_column_index(&self_columns, column_r)
								.ok_or(JoinError::Unreachable)?;
							let plane_index = find_column_index(plane_columns, column_l)
								.ok_or(JoinError::Unreachable)?;
							(self_index, plane_index)
						}
					};

				JoinMethod::ColumnEqColumn {
					plane_trust_ordered: false,
					plane_index,
					self_trust_ordered: false,
					self_index,
				}
			}
			// TODO: Methods for:
			// (plan)Column = (other)Column AND (plan)Column = (other or otherother)Column
			// (plan)Column = (other)Column OR (plan)Column = (other or otherother)Column
			//
			// No borrow of `constraint` is live in this arm, so it is moved rather than cloned.
			_ => JoinMethod::General(PlannedRecipe::new(constraint, plane_columns)?),
		},
		_ => JoinMethod::Ignore,
	})
}

/// Returns the position of the first column in `columns` equal to `target`, if any.
fn find_column_index(columns: &[ColumnInfo], target: &ColumnInfo) -> Option<usize> {
	columns.iter().position(|column| column == target)
}