use reifydb_core::{interface::change::Diff, value::column::columns::Columns};
use reifydb_runtime::hash::Hash128;
use reifydb_type::Result;
use super::{
JoinContext, UpdateKeys,
hash::{
JoinEmitContext, add_to_state_entry_batch, emit_joined_columns_batch, emit_remove_joined_columns_batch,
emit_update_joined_columns, is_first_right_row, pull_left_columns, remove_from_state_entry,
update_row_in_entry,
},
};
use crate::{operator::join::state::JoinSide, transaction::FlowTransaction};
/// Stateless marker type whose methods translate inserts, removes and updates
/// arriving on the left or right input into the output diffs of a left join:
/// left rows without a right-side match are emitted via the operator's
/// "unmatched left" columns, while right rows only ever appear as joined rows.
pub(crate) struct LeftHashJoin;
impl LeftHashJoin {
/// Builds the output diffs for one inserted row on the "undefined" path.
///
/// Only the row at `row_idx` of `post` is looked at, and the join state in
/// `ctx.state` is not touched by this method.
///
/// * Left side: returns a single `Diff::Insert` whose payload is the result of
///   `ctx.operator.unmatched_left_columns` for that row.
/// * Right side: returns an empty vector.
///
/// NOTE(review): presumably called when the row's join key is undefined and
/// therefore cannot match anything — confirm against the caller.
///
/// # Errors
/// Propagates any error returned by `unmatched_left_columns`.
pub(crate) fn handle_insert_undefined(
    &self,
    txn: &mut FlowTransaction,
    post: &Columns,
    row_idx: usize,
    ctx: &mut JoinContext,
) -> Result<Vec<Diff>> {
    let diffs = match ctx.side {
        // A right-side row produces no output on this path.
        JoinSide::Right => Vec::new(),
        JoinSide::Left => {
            let post = ctx.operator.unmatched_left_columns(txn, post, row_idx)?;
            vec![Diff::Insert {
                post,
            }]
        }
    };
    Ok(diffs)
}
/// Builds the output diffs for one removed row on the "undefined" path.
///
/// * Left side: computes the row's unmatched-left columns, then calls
///   `cleanup_left_row_joins` for the row's number, and returns a single
///   `Diff::Remove` carrying those columns.
/// * Right side: returns an empty vector.
///
/// NOTE(review): presumably called when the row's join key is undefined —
/// confirm against the caller.
///
/// # Panics
/// Panics if `row_idx` is out of bounds for `pre.row_numbers`; the lookup
/// happens before the side is inspected, so this applies to both sides.
///
/// # Errors
/// Propagates errors from `unmatched_left_columns` and
/// `cleanup_left_row_joins`.
pub(crate) fn handle_remove_undefined(
    &self,
    txn: &mut FlowTransaction,
    pre: &Columns,
    row_idx: usize,
    ctx: &mut JoinContext,
) -> Result<Vec<Diff>> {
    // Resolved up front, before dispatching on the side.
    let removed_row = pre.row_numbers[row_idx];
    Ok(match ctx.side {
        JoinSide::Left => {
            // Build the outgoing columns first, then drop the row's join bookkeeping.
            let retracted = ctx.operator.unmatched_left_columns(txn, pre, row_idx)?;
            ctx.operator.cleanup_left_row_joins(txn, *removed_row)?;
            vec![Diff::Remove {
                pre: retracted,
            }]
        }
        JoinSide::Right => Vec::new(),
    })
}
/// Builds the output diffs for one updated row on the "undefined" path.
///
/// * Left side: returns a single `Diff::Update` whose `pre` and `post` are the
///   unmatched-left columns of row `row_idx` taken from `pre` and `post`
///   respectively (computed in that order).
/// * Right side: returns an empty vector.
///
/// The join state in `ctx.state` is not touched by this method.
///
/// NOTE(review): presumably called when the join key is undefined both before
/// and after the update — confirm against the caller.
///
/// # Errors
/// Propagates any error returned by `unmatched_left_columns`.
pub(crate) fn handle_update_undefined(
    &self,
    txn: &mut FlowTransaction,
    pre: &Columns,
    post: &Columns,
    row_idx: usize,
    ctx: &mut JoinContext,
) -> Result<Vec<Diff>> {
    match ctx.side {
        JoinSide::Right => Ok(Vec::new()),
        JoinSide::Left => Ok(vec![Diff::Update {
            // Field initialisers run top to bottom: old image first, new image second.
            pre: ctx.operator.unmatched_left_columns(txn, pre, row_idx)?,
            post: ctx.operator.unmatched_left_columns(txn, post, row_idx)?,
        }]),
    }
}
/// Processes a batch of inserted rows that all share the join key `key_hash`.
///
/// `indices` selects the rows of `post` that belong to this batch; an empty
/// slice returns an empty vector without touching any state.
///
/// * Left side: the rows are added to the left state store, then
///   `emit_joined_columns_batch` is asked for a joined diff against the right
///   store. If it yields none, a `Diff::Insert` with the rows' unmatched-left
///   columns is emitted instead.
/// * Right side: `is_first_right_row` is sampled *before* the rows are added
///   to the right state store. When it reported `true` and the left store has
///   an entry for this key, the left rows are pulled from the left parent and
///   a `Diff::Remove` of their unmatched-left columns is emitted. Afterwards
///   the joined diff, if any, is appended.
///
/// The `Diff::Remove` on the right-side path is skipped when the pulled left
/// columns are empty, matching the guard `handle_remove` applies to its
/// mirror-image `Diff::Insert`, so no zero-row diff is sent downstream.
///
/// # Errors
/// Propagates errors from the state stores, the parent pull and the emit
/// helpers.
pub(crate) fn handle_insert(
    &self,
    txn: &mut FlowTransaction,
    post: &Columns,
    indices: &[usize],
    key_hash: &Hash128,
    ctx: &mut JoinContext,
) -> Result<Vec<Diff>> {
    if indices.is_empty() {
        return Ok(Vec::new());
    }
    let mut result = Vec::new();
    match ctx.side {
        JoinSide::Left => {
            // Record the new left rows before probing the right store.
            add_to_state_entry_batch(txn, &mut ctx.state.left, key_hash, post, indices)?;
            let emit_ctx = JoinEmitContext {
                opposite_store: &ctx.state.right,
                key_hash,
                operator: ctx.operator,
                opposite_parent: &ctx.operator.right_parent,
            };
            if let Some(diff) =
                emit_joined_columns_batch(txn, post, indices, JoinSide::Left, &emit_ctx)?
            {
                result.push(diff);
            } else {
                // No joined output: surface the rows as unmatched left rows.
                let unmatched =
                    ctx.operator.unmatched_left_columns_batch(txn, post, indices)?;
                result.push(Diff::Insert {
                    post: unmatched,
                });
            }
        }
        JoinSide::Right => {
            // Must be evaluated before the batch is stored, otherwise the
            // store would already contain the rows being inserted.
            let is_first = is_first_right_row(txn, &ctx.state.right, key_hash)?;
            add_to_state_entry_batch(txn, &mut ctx.state.right, key_hash, post, indices)?;
            if is_first && let Some(left_entry) = ctx.state.left.get(txn, key_hash)? {
                let left_columns = ctx.operator.left_parent.pull(txn, &left_entry.rows)?;
                // Only retract when there is something to retract; an empty
                // pull would otherwise yield a zero-row `Diff::Remove`.
                if !left_columns.is_empty() {
                    let left_indices: Vec<usize> = (0..left_columns.row_count()).collect();
                    let unmatched = ctx.operator.unmatched_left_columns_batch(
                        txn,
                        &left_columns,
                        &left_indices,
                    )?;
                    result.push(Diff::Remove {
                        pre: unmatched,
                    });
                }
            }
            let emit_ctx = JoinEmitContext {
                opposite_store: &ctx.state.left,
                key_hash,
                operator: ctx.operator,
                opposite_parent: &ctx.operator.left_parent,
            };
            if let Some(diff) =
                emit_joined_columns_batch(txn, post, indices, JoinSide::Right, &emit_ctx)?
            {
                result.push(diff);
            }
        }
    }
    Ok(result)
}
/// Processes a batch of removed rows that all share the join key `key_hash`.
///
/// `indices` selects the rows of `pre` that belong to this batch; an empty
/// slice returns an empty vector without touching any state.
///
/// * Left side: `cleanup_left_row_joins` is called for every row, then
///   `emit_remove_joined_columns_batch` is asked for a joined removal diff.
///   If it yields none, a `Diff::Remove` with the rows' unmatched-left columns
///   is emitted instead. Finally the rows are dropped from the left state
///   store.
/// * Right side: the joined removal diff (if any) is emitted first. The rows
///   are then dropped from the right state store. If the stored entry had no
///   more rows than the batch and the key is gone afterwards, the left rows
///   for the key are pulled and, when non-empty, a `Diff::Insert` of their
///   unmatched-left columns is appended.
///
/// # Panics
/// Panics if any index in `indices` is out of bounds for `pre.row_numbers`.
///
/// # Errors
/// Propagates errors from the state stores, the parent pull and the emit
/// helpers.
pub(crate) fn handle_remove(
    &self,
    txn: &mut FlowTransaction,
    pre: &Columns,
    indices: &[usize],
    key_hash: &Hash128,
    ctx: &mut JoinContext,
) -> Result<Vec<Diff>> {
    if indices.is_empty() {
        return Ok(Vec::new());
    }
    let mut diffs: Vec<Diff> = Vec::new();
    match ctx.side {
        JoinSide::Left => {
            // Step 1: drop per-row join bookkeeping.
            for row_number in indices.iter().map(|&idx| pre.row_numbers[idx]) {
                ctx.operator.cleanup_left_row_joins(txn, *row_number)?;
            }
            // Step 2: retract whatever these rows had produced downstream.
            let emit_ctx = JoinEmitContext {
                opposite_store: &ctx.state.right,
                key_hash,
                operator: ctx.operator,
                opposite_parent: &ctx.operator.right_parent,
            };
            match emit_remove_joined_columns_batch(txn, pre, indices, JoinSide::Left, &emit_ctx)? {
                Some(joined) => diffs.push(joined),
                None => {
                    let orphaned = ctx.operator.unmatched_left_columns_batch(txn, pre, indices)?;
                    diffs.push(Diff::Remove {
                        pre: orphaned,
                    });
                }
            }
            // Step 3: forget the rows in the left state store.
            for row_number in indices.iter().map(|&idx| pre.row_numbers[idx]) {
                remove_from_state_entry(txn, &mut ctx.state.left, key_hash, row_number)?;
            }
        }
        JoinSide::Right => {
            let emit_ctx = JoinEmitContext {
                opposite_store: &ctx.state.left,
                key_hash,
                operator: ctx.operator,
                opposite_parent: &ctx.operator.left_parent,
            };
            diffs.extend(emit_remove_joined_columns_batch(
                txn,
                pre,
                indices,
                JoinSide::Right,
                &emit_ctx,
            )?);
            // Sampled before the removal: can this batch drain the whole entry?
            let may_drain_entry = ctx
                .state
                .right
                .get(txn, key_hash)?
                .is_some_and(|entry| entry.rows.len() <= indices.len());
            for row_number in indices.iter().map(|&idx| pre.row_numbers[idx]) {
                remove_from_state_entry(txn, &mut ctx.state.right, key_hash, row_number)?;
            }
            // The key is re-checked after the removal before left rows are re-emitted.
            if may_drain_entry && !ctx.state.right.contains_key(txn, key_hash)? {
                let left_columns = pull_left_columns(
                    txn,
                    &ctx.state.left,
                    key_hash,
                    &ctx.operator.left_parent,
                )?;
                if !left_columns.is_empty() {
                    let every_row: Vec<usize> = (0..left_columns.row_count()).collect();
                    let orphaned = ctx.operator.unmatched_left_columns_batch(
                        txn,
                        &left_columns,
                        &every_row,
                    )?;
                    diffs.push(Diff::Insert {
                        post: orphaned,
                    });
                }
            }
        }
    }
    Ok(diffs)
}
/// Processes a batch of updated rows whose join key moved from `keys.pre` to
/// `keys.post`.
///
/// `indices` selects the affected rows; the same index addresses the old image
/// in `pre` and the new image in `post`. An empty slice returns an empty
/// vector without touching any state.
///
/// * Key changed (`keys.pre != keys.post`): the update is decomposed into
///   `handle_remove` under the old key followed by `handle_insert` under the
///   new key, and their diffs are concatenated in that order.
/// * Key unchanged: each row is handled individually. The row number stored
///   for the row is swapped via `update_row_in_entry`; only when that call
///   reports `true` is anything emitted. The joined update diff from
///   `emit_update_joined_columns` is used when present. On the left side a
///   missing joined diff falls back to a `Diff::Update` built from the
///   unmatched-left columns of the old and new images; on the right side
///   nothing is emitted in that case.
///
/// NOTE(review): `update_row_in_entry` presumably returns whether the row was
/// found in the stored entry — confirm against its definition.
///
/// # Panics
/// Panics if any index in `indices` is out of bounds for `pre.row_numbers` or
/// `post.row_numbers`.
///
/// # Errors
/// Propagates errors from the state stores and the emit helpers.
pub(crate) fn handle_update(
    &self,
    txn: &mut FlowTransaction,
    pre: &Columns,
    post: &Columns,
    indices: &[usize],
    keys: UpdateKeys,
    ctx: &mut JoinContext,
) -> Result<Vec<Diff>> {
    if indices.is_empty() {
        return Ok(Vec::new());
    }

    // A key change is expressed as remove-under-old-key + insert-under-new-key.
    if keys.pre != keys.post {
        let mut diffs = self.handle_remove(txn, pre, indices, keys.pre, ctx)?;
        diffs.extend(self.handle_insert(txn, post, indices, keys.post, ctx)?);
        return Ok(diffs);
    }

    let key_hash = keys.pre;
    let mut diffs = Vec::new();
    for &row_idx in indices {
        let old_number = pre.row_numbers[row_idx];
        let new_number = post.row_numbers[row_idx];
        match ctx.side {
            JoinSide::Left => {
                let swapped = update_row_in_entry(
                    txn,
                    &mut ctx.state.left,
                    key_hash,
                    old_number,
                    new_number,
                )?;
                if !swapped {
                    continue;
                }
                let emit_ctx = JoinEmitContext {
                    opposite_store: &ctx.state.right,
                    key_hash,
                    operator: ctx.operator,
                    opposite_parent: &ctx.operator.right_parent,
                };
                match emit_update_joined_columns(
                    txn,
                    pre,
                    post,
                    row_idx,
                    JoinSide::Left,
                    &emit_ctx,
                )? {
                    Some(joined) => diffs.push(joined),
                    // No joined output: update the unmatched-left row instead.
                    None => diffs.push(Diff::Update {
                        pre: ctx.operator.unmatched_left_columns(txn, pre, row_idx)?,
                        post: ctx.operator.unmatched_left_columns(txn, post, row_idx)?,
                    }),
                }
            }
            JoinSide::Right => {
                let swapped = update_row_in_entry(
                    txn,
                    &mut ctx.state.right,
                    key_hash,
                    old_number,
                    new_number,
                )?;
                if !swapped {
                    continue;
                }
                let emit_ctx = JoinEmitContext {
                    opposite_store: &ctx.state.left,
                    key_hash,
                    operator: ctx.operator,
                    opposite_parent: &ctx.operator.left_parent,
                };
                // A right row with no joined output stays invisible downstream.
                if let Some(joined) = emit_update_joined_columns(
                    txn,
                    pre,
                    post,
                    row_idx,
                    JoinSide::Right,
                    &emit_ctx,
                )? {
                    diffs.push(joined);
                }
            }
        }
    }
    Ok(diffs)
}
}