use crate::soch_ql::SochValue;
use crate::sql::ast::{Expr, JoinType};
use super::eval::{eval_expr, eval_predicate, compare_values};
use super::node::PlanNode;
use super::types::{Row, Schema};
use sochdb_core::Result;
use std::collections::HashMap;
pub struct HashJoinNode {
build: Box<dyn PlanNode>,
probe: Box<dyn PlanNode>,
build_key_expr: Expr,
probe_key_expr: Expr,
join_type: JoinType,
output_schema: Schema,
hash_table: Option<HashMap<HashKey, Vec<Row>>>,
build_matched: Vec<bool>,
current_probe_row: Option<Row>,
current_matches: Vec<Row>,
match_idx: usize,
unmatched_buffer: Option<Vec<Row>>,
unmatched_pos: usize,
probe_exhausted: bool,
current_probe_matched: bool,
build_schema: Schema,
probe_schema: Schema,
}
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum HashKey {
Int(i64),
UInt(u64),
Text(String),
Bool(bool),
Null,
Other(String),
}
impl From<&SochValue> for HashKey {
fn from(v: &SochValue) -> Self {
match v {
SochValue::Int(i) => HashKey::Int(*i),
SochValue::UInt(u) => HashKey::UInt(*u),
SochValue::Text(s) => HashKey::Text(s.clone()),
SochValue::Bool(b) => HashKey::Bool(*b),
SochValue::Null => HashKey::Null,
other => HashKey::Other(format!("{:?}", other)),
}
}
}
impl HashJoinNode {
pub fn new(
build: Box<dyn PlanNode>,
probe: Box<dyn PlanNode>,
build_key_expr: Expr,
probe_key_expr: Expr,
join_type: JoinType,
) -> Self {
let build_schema = build.schema().clone();
let probe_schema = probe.schema().clone();
let output_schema = build_schema.merge(&probe_schema);
Self {
build,
probe,
build_key_expr,
probe_key_expr,
join_type,
output_schema,
hash_table: None,
build_matched: Vec::new(),
current_probe_row: None,
current_matches: Vec::new(),
match_idx: 0,
unmatched_buffer: None,
unmatched_pos: 0,
probe_exhausted: false,
current_probe_matched: false,
build_schema,
probe_schema,
}
}
fn build_hash_table(&mut self) -> Result<()> {
if self.hash_table.is_some() {
return Ok(());
}
let mut table: HashMap<HashKey, Vec<Row>> = HashMap::new();
let mut all_build_rows: Vec<Row> = Vec::new();
let schema = self.build.schema().clone();
while let Some(row) = self.build.next()? {
let key_val = eval_expr(&self.build_key_expr, &row, &schema)?;
let key = HashKey::from(&key_val);
table.entry(key).or_default().push(row.clone());
all_build_rows.push(row);
}
self.build_matched = vec![false; all_build_rows.len()];
self.hash_table = Some(table);
Ok(())
}
fn null_row(schema: &Schema) -> Row {
vec![SochValue::Null; schema.len()]
}
fn combine(build_row: &Row, probe_row: &Row) -> Row {
let mut combined = build_row.clone();
combined.extend(probe_row.iter().cloned());
combined
}
}
impl PlanNode for HashJoinNode {
fn schema(&self) -> &Schema {
&self.output_schema
}
fn next(&mut self) -> Result<Option<Row>> {
self.build_hash_table()?;
loop {
if self.match_idx < self.current_matches.len() {
let build_row = &self.current_matches[self.match_idx];
let probe_row = self.current_probe_row.as_ref().unwrap();
self.match_idx += 1;
return Ok(Some(Self::combine(build_row, probe_row)));
}
if self.current_probe_row.is_some()
&& !self.current_probe_matched
&& matches!(self.join_type, JoinType::Left | JoinType::Full)
{
let probe_row = self.current_probe_row.take().unwrap();
let null_build = Self::null_row(&self.build_schema);
return Ok(Some(Self::combine(&null_build, &probe_row)));
}
self.current_probe_row = None;
self.current_matches.clear();
self.match_idx = 0;
self.current_probe_matched = false;
if !self.probe_exhausted {
match self.probe.next()? {
Some(probe_row) => {
let key_val = eval_expr(
&self.probe_key_expr,
&probe_row,
&self.probe_schema,
)?;
let key = HashKey::from(&key_val);
if let Some(ht) = &self.hash_table {
if let Some(matches) = ht.get(&key) {
self.current_matches = matches.clone();
self.current_probe_matched = true;
}
}
self.current_probe_row = Some(probe_row);
continue;
}
None => {
self.probe_exhausted = true;
}
}
}
if matches!(self.join_type, JoinType::Right | JoinType::Full) {
if self.unmatched_buffer.is_none() {
self.unmatched_buffer = Some(Vec::new());
}
if let Some(buf) = &self.unmatched_buffer {
if self.unmatched_pos < buf.len() {
let row = buf[self.unmatched_pos].clone();
self.unmatched_pos += 1;
let null_probe = Self::null_row(&self.probe_schema);
return Ok(Some(Self::combine(&row, &null_probe)));
}
}
}
return Ok(None);
}
}
fn reset(&mut self) -> Result<()> {
self.hash_table = None;
self.current_probe_row = None;
self.current_matches.clear();
self.match_idx = 0;
self.probe_exhausted = false;
self.unmatched_buffer = None;
self.unmatched_pos = 0;
self.build.reset()?;
self.probe.reset()
}
}
/// Nested-loop join operator.
///
/// For every outer row the inner input is reset and scanned in full; each
/// outer/inner pair is concatenated (outer columns first) and kept when
/// `condition` evaluates to true on the combined row. With no condition every
/// pair is kept.
///
/// Outer-join handling:
/// * `JoinType::Left` / `JoinType::Full`: an outer row that matched no inner
///   row is emitted once with NULLs in the inner columns.
/// * `JoinType::Right` / `JoinType::Full`: the scan positions of inner rows
///   that matched are recorded; after the outer input is exhausted the inner
///   input is reset and scanned one more time, and rows at unrecorded
///   positions are emitted with NULLs in the outer columns. This relies on
///   `inner.reset()` replaying the inner rows in the same order on every scan.
pub struct NestedLoopJoinNode {
    outer: Box<dyn PlanNode>,
    inner: Box<dyn PlanNode>,
    condition: Option<Expr>,
    join_type: JoinType,
    output_schema: Schema,
    /// Outer row currently being paired with inner rows.
    current_outer: Option<Row>,
    /// Whether `current_outer` has produced at least one joined row.
    current_matched: bool,
    outer_exhausted: bool,
    /// `inner_matched[i]` is true once the inner row at scan position `i`
    /// has joined with some outer row. Only maintained for `Right`/`Full`.
    inner_matched: Vec<bool>,
    /// Scan position of the next row `inner.next()` will return.
    inner_pos: usize,
    /// True while the final unmatched-inner-row scan is running.
    draining_inner: bool,
    outer_schema: Schema,
    inner_schema: Schema,
}

impl NestedLoopJoinNode {
    /// Creates a nested-loop join of `outer` and `inner`.
    ///
    /// `condition` is evaluated against the combined row using the merged
    /// output schema; `None` keeps every pair.
    pub fn new(
        outer: Box<dyn PlanNode>,
        inner: Box<dyn PlanNode>,
        condition: Option<Expr>,
        join_type: JoinType,
    ) -> Self {
        let outer_schema = outer.schema().clone();
        let inner_schema = inner.schema().clone();
        let output_schema = outer_schema.merge(&inner_schema);
        Self {
            outer,
            inner,
            condition,
            join_type,
            output_schema,
            current_outer: None,
            current_matched: false,
            outer_exhausted: false,
            inner_matched: Vec::new(),
            inner_pos: 0,
            draining_inner: false,
            outer_schema,
            inner_schema,
        }
    }

    /// Concatenates an outer row and an inner row into one output row.
    fn combine(outer_row: &Row, inner_row: &Row) -> Row {
        let mut combined = outer_row.clone();
        combined.extend(inner_row.iter().cloned());
        combined
    }

    /// Returns a row of NULLs with one value per column of `schema`.
    fn null_row(schema: &Schema) -> Row {
        vec![SochValue::Null; schema.len()]
    }
}

impl PlanNode for NestedLoopJoinNode {
    fn schema(&self) -> &Schema {
        &self.output_schema
    }

    /// Produces the next joined row, or `None` when the join is exhausted.
    ///
    /// Errors from either input or from predicate evaluation are propagated.
    fn next(&mut self) -> Result<Option<Row>> {
        let preserve_outer = matches!(self.join_type, JoinType::Left | JoinType::Full);
        let preserve_inner = matches!(self.join_type, JoinType::Right | JoinType::Full);

        loop {
            // Final phase: one more inner scan that emits only the inner rows
            // no outer row ever joined with.
            if self.draining_inner {
                while let Some(inner_row) = self.inner.next()? {
                    let pos = self.inner_pos;
                    self.inner_pos += 1;
                    // Positions beyond the recorded range were never matched
                    // (e.g. the outer input was empty).
                    let was_matched = self.inner_matched.get(pos).copied().unwrap_or(false);
                    if !was_matched {
                        let null_outer = Self::null_row(&self.outer_schema);
                        return Ok(Some(Self::combine(&null_outer, &inner_row)));
                    }
                }
                return Ok(None);
            }

            // Fetch a new outer row when none is in progress.
            if self.current_outer.is_none() {
                if self.outer_exhausted {
                    return Ok(None);
                }
                match self.outer.next()? {
                    Some(row) => {
                        self.current_outer = Some(row);
                        self.current_matched = false;
                        // Restart the inner scan for the new outer row.
                        self.inner.reset()?;
                        self.inner_pos = 0;
                    }
                    None => {
                        self.outer_exhausted = true;
                        if preserve_inner {
                            self.inner.reset()?;
                            self.inner_pos = 0;
                            self.draining_inner = true;
                            continue;
                        }
                        return Ok(None);
                    }
                }
            }

            let outer_row = match self.current_outer.as_ref() {
                Some(row) => row,
                // Not reachable: the branch above either set a row or returned.
                None => continue,
            };

            match self.inner.next()? {
                Some(inner_row) => {
                    let pos = self.inner_pos;
                    self.inner_pos += 1;
                    let combined = Self::combine(outer_row, &inner_row);
                    let keep = match &self.condition {
                        Some(cond) => eval_predicate(cond, &combined, &self.output_schema)?,
                        None => true,
                    };
                    if keep {
                        self.current_matched = true;
                        if preserve_inner {
                            if pos >= self.inner_matched.len() {
                                self.inner_matched.resize(pos + 1, false);
                            }
                            self.inner_matched[pos] = true;
                        }
                        return Ok(Some(combined));
                    }
                }
                None => {
                    // Inner scan finished for this outer row.
                    let unmatched = !self.current_matched;
                    if let Some(finished_outer) = self.current_outer.take() {
                        if preserve_outer && unmatched {
                            let null_inner = Self::null_row(&self.inner_schema);
                            return Ok(Some(Self::combine(&finished_outer, &null_inner)));
                        }
                    }
                }
            }
        }
    }

    /// Clears all join state and resets both inputs.
    fn reset(&mut self) -> Result<()> {
        self.current_outer = None;
        self.current_matched = false;
        self.outer_exhausted = false;
        self.inner_matched.clear();
        self.inner_pos = 0;
        self.draining_inner = false;
        self.outer.reset()?;
        self.inner.reset()
    }
}
/// Merge join operator over two key-ordered inputs.
///
/// Right rows whose key compares below the current left key are skipped and
/// right rows whose key compares above it are held back for a later left row,
/// so results are only complete when both inputs arrive in ascending key order
/// (as defined by `compare_values`). Nothing in this node sorts its inputs.
///
/// All right rows sharing the current key are buffered so that consecutive
/// left rows with the same key can each be paired with the whole group.
/// Output rows are left columns followed by right columns.
///
/// Outer-join handling:
/// * `JoinType::Left` / `JoinType::Full`: a left row with no matching right
///   group is emitted once with NULLs in the right columns.
/// * `JoinType::Right` / `JoinType::Full`: right rows that are skipped, and
///   right rows still unread when the left input ends, are emitted with NULLs
///   in the left columns.
pub struct MergeJoinNode {
    left: Box<dyn PlanNode>,
    right: Box<dyn PlanNode>,
    left_key_expr: Expr,
    right_key_expr: Expr,
    join_type: JoinType,
    output_schema: Schema,
    left_schema: Schema,
    right_schema: Schema,
    /// Right rows sharing `right_buffer_key`.
    right_buffer: Vec<Row>,
    /// Key of the rows in `right_buffer`; `None` whenever the buffer is empty.
    right_buffer_key: Option<SochValue>,
    /// Next position in `right_buffer` to pair with `current_left`.
    right_buf_idx: usize,
    /// Left row being paired with the buffer, or parked (see `left_parked`).
    current_left: Option<Row>,
    current_left_key: Option<SochValue>,
    /// True when `current_left` was set aside mid-seek so that an unmatched
    /// right row could be returned; the seek resumes on the next call.
    left_parked: bool,
    left_exhausted: bool,
    right_exhausted: bool,
    /// Right row read ahead but not yet consumed.
    pending_right: Option<Row>,
}

impl MergeJoinNode {
    /// Creates a merge join of `left` and `right` on the given key expressions.
    pub fn new(
        left: Box<dyn PlanNode>,
        right: Box<dyn PlanNode>,
        left_key_expr: Expr,
        right_key_expr: Expr,
        join_type: JoinType,
    ) -> Self {
        let left_schema = left.schema().clone();
        let right_schema = right.schema().clone();
        let output_schema = left_schema.merge(&right_schema);
        Self {
            left,
            right,
            left_key_expr,
            right_key_expr,
            join_type,
            output_schema,
            left_schema,
            right_schema,
            right_buffer: Vec::new(),
            right_buffer_key: None,
            right_buf_idx: 0,
            current_left: None,
            current_left_key: None,
            left_parked: false,
            left_exhausted: false,
            right_exhausted: false,
            pending_right: None,
        }
    }

    /// Concatenates a left row and a right row into one output row.
    fn combine(left_row: &Row, right_row: &Row) -> Row {
        let mut combined = left_row.clone();
        combined.extend(right_row.iter().cloned());
        combined
    }

    /// Returns a row of NULLs with one value per column of `schema`.
    fn null_row(schema: &Schema) -> Row {
        vec![SochValue::Null; schema.len()]
    }

    /// Returns the next right row together with its key.
    ///
    /// A row previously stored in `pending_right` is returned first; otherwise
    /// the right input is read. Sets `right_exhausted` and returns `None` once
    /// the right input has no more rows.
    fn advance_right(&mut self) -> Result<Option<(SochValue, Row)>> {
        if let Some(row) = self.pending_right.take() {
            let key = eval_expr(&self.right_key_expr, &row, &self.right_schema)?;
            return Ok(Some((key, row)));
        }
        match self.right.next()? {
            Some(row) => {
                let key = eval_expr(&self.right_key_expr, &row, &self.right_schema)?;
                Ok(Some((key, row)))
            }
            None => {
                self.right_exhausted = true;
                Ok(None)
            }
        }
    }
}

impl PlanNode for MergeJoinNode {
    fn schema(&self) -> &Schema {
        &self.output_schema
    }

    /// Produces the next joined row, or `None` when the join is exhausted.
    ///
    /// Errors from either input or from key evaluation are propagated.
    fn next(&mut self) -> Result<Option<Row>> {
        use std::cmp::Ordering;

        let preserve_left = matches!(self.join_type, JoinType::Left | JoinType::Full);
        let preserve_right = matches!(self.join_type, JoinType::Right | JoinType::Full);

        loop {
            // Pair the current left row with the next buffered right row.
            // While a left row is parked the buffer is empty, so this is a
            // no-op in that state.
            if let (Some(left_row), Some(right_row)) = (
                self.current_left.as_ref(),
                self.right_buffer.get(self.right_buf_idx),
            ) {
                self.right_buf_idx += 1;
                return Ok(Some(Self::combine(left_row, right_row)));
            }

            // Obtain the left row to align the right input against: either
            // the one parked by a previous call or a fresh one.
            let (left_row, left_key) = if self.left_parked {
                self.left_parked = false;
                match (self.current_left.take(), self.current_left_key.take()) {
                    (Some(row), Some(key)) => (row, key),
                    _ => continue,
                }
            } else {
                if self.left_exhausted {
                    // Every remaining right row is unmatched.
                    if preserve_right {
                        if let Some((_, right_row)) = self.advance_right()? {
                            let null_left = Self::null_row(&self.left_schema);
                            return Ok(Some(Self::combine(&null_left, &right_row)));
                        }
                    }
                    return Ok(None);
                }
                let row = match self.left.next()? {
                    Some(row) => row,
                    None => {
                        self.left_exhausted = true;
                        self.current_left = None;
                        self.current_left_key = None;
                        continue;
                    }
                };
                let key = eval_expr(&self.left_key_expr, &row, &self.left_schema)?;

                // Same key as the buffered right group: replay the buffer.
                let same_group = self
                    .right_buffer_key
                    .as_ref()
                    .map_or(false, |k| compare_values(k, &key) == Some(Ordering::Equal));
                if same_group {
                    self.current_left = Some(row);
                    self.current_left_key = Some(key);
                    self.right_buf_idx = 0;
                    continue;
                }

                // New key: the old group is finished. The key is cleared with
                // the buffer so a stale key can never pair a later left row
                // with an empty group.
                self.right_buffer.clear();
                self.right_buffer_key = None;
                self.right_buf_idx = 0;

                // A key that does not compare equal to itself (presumably
                // NULL; depends on `compare_values`) can never reach the
                // `Equal` arm below. Treat the row as unmatched right away
                // instead of consuming the right input looking for a match.
                if compare_values(&key, &key) != Some(Ordering::Equal) {
                    self.current_left = None;
                    self.current_left_key = None;
                    if preserve_left {
                        let null_right = Self::null_row(&self.right_schema);
                        return Ok(Some(Self::combine(&row, &null_right)));
                    }
                    continue;
                }
                (row, key)
            };

            // Nothing left on the right: no further matches are possible.
            if self.right_exhausted {
                if preserve_left {
                    let null_right = Self::null_row(&self.right_schema);
                    return Ok(Some(Self::combine(&left_row, &null_right)));
                }
                return Ok(None);
            }

            // Seek: advance the right input until its key reaches the left key.
            loop {
                match self.advance_right()? {
                    Some((right_key, right_row)) => {
                        match compare_values(&right_key, &left_key) {
                            Some(Ordering::Equal) => {
                                self.right_buffer.push(right_row);
                                self.right_buffer_key = Some(right_key);
                                break;
                            }
                            Some(Ordering::Greater) => {
                                // Belongs to a later left key; keep for later.
                                self.pending_right = Some(right_row);
                                break;
                            }
                            _ => {
                                // This right row is below (or not comparable
                                // to) the left key, so it has no partner.
                                if preserve_right {
                                    // Park the left row and resume the seek
                                    // on the next call.
                                    self.current_left = Some(left_row);
                                    self.current_left_key = Some(left_key);
                                    self.left_parked = true;
                                    let null_left = Self::null_row(&self.left_schema);
                                    return Ok(Some(Self::combine(&null_left, &right_row)));
                                }
                            }
                        }
                    }
                    None => break,
                }
            }

            if !self.right_buffer.is_empty() {
                // Collect the rest of the right group with the same key.
                loop {
                    match self.advance_right()? {
                        Some((right_key, right_row)) => {
                            if compare_values(&right_key, &left_key) == Some(Ordering::Equal) {
                                self.right_buffer.push(right_row);
                            } else {
                                self.pending_right = Some(right_row);
                                break;
                            }
                        }
                        None => break,
                    }
                }
                self.current_left = Some(left_row);
                self.current_left_key = Some(left_key);
                self.right_buf_idx = 0;
                continue;
            }

            // No right group for this left key.
            self.current_left = None;
            self.current_left_key = None;
            if preserve_left {
                let null_right = Self::null_row(&self.right_schema);
                return Ok(Some(Self::combine(&left_row, &null_right)));
            }
        }
    }

    /// Clears all join state and resets both inputs.
    fn reset(&mut self) -> Result<()> {
        self.right_buffer.clear();
        self.right_buffer_key = None;
        self.right_buf_idx = 0;
        self.current_left = None;
        self.current_left_key = None;
        self.left_parked = false;
        self.left_exhausted = false;
        self.right_exhausted = false;
        self.pending_right = None;
        self.left.reset()?;
        self.right.reset()
    }
}