use std::collections::HashMap;
use crate::errors::MdqlError;
use crate::model::{Row, Value};
use crate::query_ast::*;
use crate::query_engine::evaluate;
use crate::schema::Schema;
pub fn execute_join_query(
query: &SelectQuery,
tables: &HashMap<String, (Schema, Vec<Row>)>,
) -> crate::errors::Result<(Vec<Row>, Vec<String>)> {
if query.joins.is_empty() {
return Err(MdqlError::QueryExecution("No JOIN clause in query".into()));
}
let left_name = &query.table;
let left_alias = query.table_alias.as_deref().unwrap_or(left_name);
let (_left_schema, left_rows) = tables.get(left_name.as_str()).ok_or_else(|| {
MdqlError::QueryExecution(format!("Unknown table '{}'", left_name))
})?;
let mut current_rows: Vec<Row> = left_rows
.iter()
.map(|r| {
let mut prefixed = Row::new();
for (k, v) in r {
prefixed.insert(format!("{}.{}", left_alias, k), v.clone());
}
prefixed
})
.collect();
for join in &query.joins {
let right_name = &join.table;
let right_alias = join.alias.as_deref().unwrap_or(right_name);
let (_right_schema, right_rows) = tables.get(right_name.as_str()).ok_or_else(|| {
MdqlError::QueryExecution(format!("Unknown table '{}'", right_name))
})?;
let right_columns: Vec<String> = right_rows
.first()
.map(|r| r.keys().cloned().collect())
.unwrap_or_default();
let mut next_rows: Vec<Row> = Vec::new();
for lr in ¤t_rows {
let mut matched = false;
for rr in right_rows {
let mut merged = lr.clone();
for (k, v) in rr {
merged.insert(format!("{}.{}", right_alias, k), v.clone());
}
if evaluate(&join.condition, &merged) {
next_rows.push(merged);
matched = true;
}
}
if !matched && join.join_type == JoinType::Left {
let mut merged = lr.clone();
for col in &right_columns {
merged.insert(format!("{}.{}", right_alias, col), Value::Null);
}
next_rows.push(merged);
}
}
current_rows = next_rows;
}
let (mut result, columns) = super::query_engine::execute_inner(query, ¤t_rows, None)?;
if !result.is_empty() {
let mut base_counts: HashMap<String, usize> = HashMap::new();
for key in &columns {
if let Some((_prefix, base)) = key.split_once('.') {
*base_counts.entry(base.to_string()).or_default() += 1;
}
}
let unique_bases: Vec<String> = base_counts
.into_iter()
.filter(|(_, count)| *count == 1)
.map(|(base, _)| base)
.collect();
if !unique_bases.is_empty() {
let unique_set: std::collections::HashSet<&str> =
unique_bases.iter().map(|s| s.as_str()).collect();
for row in &mut result {
let additions: Vec<(String, Value)> = row
.iter()
.filter_map(|(k, v)| {
k.split_once('.').and_then(|(_, base)| {
if unique_set.contains(base) {
Some((base.to_string(), v.clone()))
} else {
None
}
})
})
.collect();
for (k, v) in additions {
row.insert(k, v);
}
}
}
}
Ok((result, columns))
}