use crate::catalog::{
ArithmeticPos, AtomArgumentSignature, ComparisonExprPos, FnCallPredicatePos, JoinPredicates,
KvPredicates,
};
use crate::common::compute_fp;
use crate::parser::ConstType;
use crate::planner::{Collection, PlanError};
/// Key/value layout: the ordered `ArithmeticPos` entries that make up the key
/// side and the value side.
///
/// Derives `Hash`/`Eq`, and is passed by reference into `compute_fp` when the
/// `TransformationInfo` fingerprints are computed.
#[derive(PartialEq, Clone, Eq, Hash, Debug)]
pub(crate) struct KeyValueLayout {
/// Entries on the key side, in order.
pub(crate) key: Vec<ArithmeticPos>,
/// Entries on the value side, in order.
pub(crate) value: Vec<ArithmeticPos>,
}
impl KeyValueLayout {
#[inline]
pub(crate) fn new(key: Vec<ArithmeticPos>, value: Vec<ArithmeticPos>) -> Self {
Self { key, value }
}
#[inline]
pub(crate) fn key(&self) -> &[ArithmeticPos] {
&self.key
}
#[inline]
pub(crate) fn value(&self) -> &[ArithmeticPos] {
&self.value
}
#[inline]
pub(crate) fn extract_argument_ids_from_layout(&self) -> (Vec<usize>, Vec<usize>) {
let extract = |positions: &[ArithmeticPos]| -> Vec<usize> {
positions
.iter()
.flat_map(|pos| pos.signatures())
.map(|sig| sig.argument_id())
.collect()
};
(extract(self.key()), extract(self.value()))
}
#[inline]
pub(crate) fn extract_atom_id(&self) -> Result<usize, PlanError> {
self.key()
.iter()
.chain(self.value().iter())
.flat_map(|pos| pos.signatures())
.map(|sig| sig.atom_signature().rhs_id())
.next()
.ok_or_else(|| {
PlanError::internal(
"extract_atom_id: empty key/value layout has no atom signatures",
)
})
}
}
/// Describes one transformation from one or two inputs to a single output.
///
/// Every variant records, per input and for the output, a `u64` fingerprint,
/// a name and a `KeyValueLayout`. The output fingerprint is derived with
/// `compute_fp` by the constructors and by `update_output_fake_sig`.
#[derive(Clone, Debug)]
pub(crate) enum TransformationInfo {
/// Unary transformation: one input, one output, filtered by `KvPredicates`.
KVToKV {
/// Fingerprint of the input.
input_info_fp: u64,
/// Name of the input.
input_name: String,
/// Fingerprint of the output.
output_info_fp: u64,
/// Name of the output.
output_name: String,
/// Rendered as "Row" (true) or "KV" (false) on the input side by `operation_name`.
is_row_input: bool,
/// Rendered as "Row" (true) or "KV" (false) on the output side by `operation_name`.
is_row_output: bool,
/// Layout of the input.
input_kv_layout: KeyValueLayout,
/// Layout of the output.
output_kv_layout: KeyValueLayout,
/// Predicates attached to this transformation.
predicates: KvPredicates,
/// Set to `true` by `into_sip_projection`; `false` when built by `kv_to_kv`.
// NOTE(review): "sip" is not defined in this file - confirm meaning with the planner docs.
is_sip_projection: bool,
},
/// Binary transformation over a left and a right input, with `JoinPredicates`.
JoinToKV {
/// Fingerprint of the left input.
left_input_info_fp: u64,
/// Name of the left input.
left_input_name: String,
/// Fingerprint of the right input.
right_input_info_fp: u64,
/// Name of the right input.
right_input_name: String,
/// Fingerprint of the output.
output_info_fp: u64,
/// Name of the output.
output_name: String,
/// Rendered as "Row" (true) or "KV" (false) on the output side by `operation_name`.
is_row_output: bool,
/// Layout of the left input.
left_input_kv_layout: KeyValueLayout,
/// Layout of the right input.
right_input_kv_layout: KeyValueLayout,
/// Layout of the output.
output_kv_layout: KeyValueLayout,
/// Predicates attached to this join.
predicates: JoinPredicates,
},
/// Binary transformation over a left and a right input that carries no
/// predicates; `is_neg_join` returns `true` only for this variant.
AntiJoinToKV {
/// Fingerprint of the left input.
left_input_info_fp: u64,
/// Name of the left input.
left_input_name: String,
/// Fingerprint of the right input.
right_input_info_fp: u64,
/// Name of the right input.
right_input_name: String,
/// Fingerprint of the output.
output_info_fp: u64,
/// Name of the output.
output_name: String,
/// Rendered as "Row" (true) or "KV" (false) on the output side by `operation_name`.
is_row_output: bool,
/// Layout of the left input.
left_input_kv_layout: KeyValueLayout,
/// Layout of the right input.
right_input_kv_layout: KeyValueLayout,
/// Layout of the output.
output_kv_layout: KeyValueLayout,
},
}
impl TransformationInfo {
    /// Builds a `KVToKV` transformation.
    ///
    /// The output fingerprint is derived from the tag `"kv_to_kv"`, the input
    /// fingerprint, both layouts and the predicates. `is_row_output` and
    /// `is_sip_projection` both start as `false`.
    pub(crate) fn kv_to_kv(
        input_fake_sig: u64,
        input_name: String,
        output_name: String,
        is_row_input: bool,
        input_kv_layout: KeyValueLayout,
        output_fake_kv_layout: KeyValueLayout,
        predicates: KvPredicates,
    ) -> Self {
        // Must be computed before the layouts and predicates are moved below.
        let output_info_fp = compute_fp((
            "kv_to_kv",
            &input_fake_sig,
            &input_kv_layout,
            &output_fake_kv_layout,
            &predicates,
        ));

        Self::KVToKV {
            input_info_fp: input_fake_sig,
            input_name,
            output_info_fp,
            output_name,
            is_row_input,
            is_row_output: false,
            input_kv_layout,
            output_kv_layout: output_fake_kv_layout,
            predicates,
            is_sip_projection: false,
        }
    }

    /// Consumes a `KVToKV` transformation and returns it with
    /// `is_sip_projection` set to `true`.
    ///
    /// # Errors
    ///
    /// Returns an internal `PlanError` for `JoinToKV` and `AntiJoinToKV`.
    pub(crate) fn into_sip_projection(mut self) -> Result<Self, PlanError> {
        match &mut self {
            Self::KVToKV {
                is_sip_projection, ..
            } => {
                *is_sip_projection = true;
            }
            Self::JoinToKV { .. } | Self::AntiJoinToKV { .. } => {
                return Err(PlanError::internal(
                    "into_sip_projection: only applicable to KVToKV transformations",
                ));
            }
        }
        Ok(self)
    }

    /// Builds a `JoinToKV` transformation.
    ///
    /// The output fingerprint is derived from the tag `"join_to_kv"`, both
    /// input fingerprints, all three layouts and the predicates.
    /// `is_row_output` starts as `false`.
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn join_to_kv(
        left_fake_sig: u64,
        left_input_name: String,
        right_fake_sig: u64,
        right_input_name: String,
        output_name: String,
        left_kv_layout: KeyValueLayout,
        right_kv_layout: KeyValueLayout,
        output_fake_kv_layout: KeyValueLayout,
        predicates: JoinPredicates,
    ) -> Self {
        // Must be computed before the layouts and predicates are moved below.
        let output_info_fp = compute_fp((
            "join_to_kv",
            &left_fake_sig,
            &right_fake_sig,
            &left_kv_layout,
            &right_kv_layout,
            &output_fake_kv_layout,
            &predicates,
        ));

        Self::JoinToKV {
            left_input_info_fp: left_fake_sig,
            left_input_name,
            right_input_info_fp: right_fake_sig,
            right_input_name,
            output_info_fp,
            output_name,
            is_row_output: false,
            left_input_kv_layout: left_kv_layout,
            right_input_kv_layout: right_kv_layout,
            output_kv_layout: output_fake_kv_layout,
            predicates,
        }
    }

    /// Builds an `AntiJoinToKV` transformation.
    ///
    /// The output fingerprint is derived from the tag `"anti_join_to_kv"`,
    /// both input fingerprints and all three layouts. `is_row_output` starts
    /// as `false`.
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn anti_join_to_kv(
        left_fake_sig: u64,
        left_input_name: String,
        right_fake_sig: u64,
        right_input_name: String,
        output_name: String,
        left_kv_layout: KeyValueLayout,
        right_kv_layout: KeyValueLayout,
        output_fake_kv_layout: KeyValueLayout,
    ) -> Self {
        // Must be computed before the layouts are moved below.
        let output_info_fp = compute_fp((
            "anti_join_to_kv",
            &left_fake_sig,
            &right_fake_sig,
            &left_kv_layout,
            &right_kv_layout,
            &output_fake_kv_layout,
        ));

        Self::AntiJoinToKV {
            left_input_info_fp: left_fake_sig,
            left_input_name,
            right_input_info_fp: right_fake_sig,
            right_input_name,
            output_info_fp,
            output_name,
            is_row_output: false,
            left_input_kv_layout: left_kv_layout,
            right_input_kv_layout: right_kv_layout,
            output_kv_layout: output_fake_kv_layout,
        }
    }
}
impl TransformationInfo {
    /// Returns `true` only for the `AntiJoinToKV` variant.
    #[inline]
    pub(crate) fn is_neg_join(&self) -> bool {
        match self {
            Self::AntiJoinToKV { .. } => true,
            Self::KVToKV { .. } | Self::JoinToKV { .. } => false,
        }
    }

    /// Returns `true` for a `KVToKV` whose `is_sip_projection` flag is set.
    #[cfg(test)]
    #[inline]
    pub(crate) fn is_sip_projection(&self) -> bool {
        match self {
            Self::KVToKV {
                is_sip_projection, ..
            } => *is_sip_projection,
            Self::JoinToKV { .. } | Self::AntiJoinToKV { .. } => false,
        }
    }

    /// Input fingerprint(s): `(input, None)` for `KVToKV`,
    /// `(left, Some(right))` for the two join variants.
    #[inline]
    pub(crate) fn input_info_fp(&self) -> (u64, Option<u64>) {
        match self {
            Self::KVToKV { input_info_fp, .. } => (*input_info_fp, None),
            Self::JoinToKV {
                left_input_info_fp: left,
                right_input_info_fp: right,
                ..
            }
            | Self::AntiJoinToKV {
                left_input_info_fp: left,
                right_input_info_fp: right,
                ..
            } => (*left, Some(*right)),
        }
    }

    /// Fingerprint of the output, whatever the variant.
    #[inline]
    pub(crate) fn output_info_fp(&self) -> u64 {
        let (Self::KVToKV { output_info_fp, .. }
        | Self::JoinToKV { output_info_fp, .. }
        | Self::AntiJoinToKV { output_info_fp, .. }) = self;
        *output_info_fp
    }

    /// Input name(s): `(input, None)` for `KVToKV`,
    /// `(left, Some(right))` for the two join variants.
    #[inline]
    pub(crate) fn input_name(&self) -> (&str, Option<&str>) {
        match self {
            Self::KVToKV { input_name, .. } => (input_name.as_str(), None),
            Self::JoinToKV {
                left_input_name: left,
                right_input_name: right,
                ..
            }
            | Self::AntiJoinToKV {
                left_input_name: left,
                right_input_name: right,
                ..
            } => (left.as_str(), Some(right.as_str())),
        }
    }

    /// Name of the output, whatever the variant.
    #[inline]
    pub(crate) fn output_name(&self) -> &str {
        let (Self::KVToKV { output_name, .. }
        | Self::JoinToKV { output_name, .. }
        | Self::AntiJoinToKV { output_name, .. }) = self;
        output_name.as_str()
    }

    /// Returns the `is_row_input` flag of a `KVToKV`.
    ///
    /// # Panics
    ///
    /// Panics when called on `JoinToKV` or `AntiJoinToKV`.
    #[inline]
    pub(crate) fn is_row_input(&self) -> bool {
        let Self::KVToKV { is_row_input, .. } = self else {
            panic!("Planner error: is_row_input is only available for KVToKV")
        };
        *is_row_input
    }

    /// Returns the `is_row_output` flag, whatever the variant.
    #[inline]
    pub(crate) fn is_row_output(&self) -> bool {
        let (Self::KVToKV { is_row_output, .. }
        | Self::JoinToKV { is_row_output, .. }
        | Self::AntiJoinToKV { is_row_output, .. }) = self;
        *is_row_output
    }

    /// Input layout(s): `(input, None)` for `KVToKV`,
    /// `(left, Some(right))` for the two join variants.
    #[inline]
    pub(crate) fn input_kv_layout(&self) -> (&KeyValueLayout, Option<&KeyValueLayout>) {
        match self {
            Self::KVToKV {
                input_kv_layout, ..
            } => (input_kv_layout, None),
            Self::JoinToKV {
                left_input_kv_layout: left,
                right_input_kv_layout: right,
                ..
            }
            | Self::AntiJoinToKV {
                left_input_kv_layout: left,
                right_input_kv_layout: right,
                ..
            } => (left, Some(right)),
        }
    }

    /// Layout of the output, whatever the variant.
    #[inline]
    pub(crate) fn output_kv_layout(&self) -> &KeyValueLayout {
        let (Self::KVToKV {
            output_kv_layout, ..
        }
        | Self::JoinToKV {
            output_kv_layout, ..
        }
        | Self::AntiJoinToKV {
            output_kv_layout, ..
        }) = self;
        output_kv_layout
    }

    /// Replaces the input layout of a `KVToKV`.
    ///
    /// # Panics
    ///
    /// Panics when called on `JoinToKV` or `AntiJoinToKV`.
    #[inline]
    pub(crate) fn update_input_layout(&mut self, new_input_kv_layout: KeyValueLayout) {
        let Self::KVToKV {
            input_kv_layout, ..
        } = self
        else {
            panic!(
                "Planner error: update_input_layout is only applicable to KVToKV transformations"
            )
        };
        *input_kv_layout = new_input_kv_layout;
    }

    /// Borrows the predicates of a `KVToKV`.
    ///
    /// # Panics
    ///
    /// Panics when called on `JoinToKV` or `AntiJoinToKV`.
    #[inline]
    pub(crate) fn kv_predicates(&self) -> &KvPredicates {
        let Self::KVToKV { predicates, .. } = self else {
            panic!("Planner error: kv_predicates is only available for KVToKV")
        };
        predicates
    }

    /// Borrows the predicates of a `JoinToKV`.
    ///
    /// # Panics
    ///
    /// Panics when called on `KVToKV` or `AntiJoinToKV`.
    #[inline]
    pub(crate) fn join_predicates(&self) -> &JoinPredicates {
        let Self::JoinToKV { predicates, .. } = self else {
            panic!("Planner error: join_predicates is only available for JoinToKV")
        };
        predicates
    }
}
impl TransformationInfo {
    /// Swaps an input fingerprint for `input_real_sig`.
    ///
    /// * `KVToKV`: the single input fingerprint is overwritten
    ///   unconditionally; `input_fake_sig` is not consulted.
    /// * `JoinToKV` / `AntiJoinToKV`: the left fingerprint is overwritten when
    ///   it equals `input_fake_sig`; otherwise the right fingerprint is
    ///   overwritten, without checking that it matches.
    pub(crate) fn update_input_fake_info_fp(&mut self, input_real_sig: u64, input_fake_sig: &u64) {
        let placeholder = *input_fake_sig;
        let slot = match self {
            Self::KVToKV { input_info_fp, .. } => input_info_fp,
            Self::JoinToKV {
                left_input_info_fp: left,
                right_input_info_fp: right,
                ..
            }
            | Self::AntiJoinToKV {
                left_input_info_fp: left,
                right_input_info_fp: right,
                ..
            } => {
                if *left == placeholder {
                    left
                } else {
                    right
                }
            }
        };
        *slot = input_real_sig;
    }

    /// Sets the `is_row_output` flag on any variant.
    pub(crate) fn update_row_output(&mut self, is_row_output: bool) {
        let (Self::KVToKV {
            is_row_output: flag,
            ..
        }
        | Self::JoinToKV {
            is_row_output: flag,
            ..
        }
        | Self::AntiJoinToKV {
            is_row_output: flag,
            ..
        }) = self;
        *flag = is_row_output;
    }

    /// Replaces the output name on any variant.
    pub(crate) fn update_output_name(&mut self, new_output_name: String) {
        let (Self::KVToKV { output_name, .. }
        | Self::JoinToKV { output_name, .. }
        | Self::AntiJoinToKV { output_name, .. }) = self;
        *output_name = new_output_name;
    }

    /// Replaces the output layout on any variant.
    pub(crate) fn update_output_key_value_layout(&mut self, real_output_kv_layout: KeyValueLayout) {
        let (Self::KVToKV {
            output_kv_layout, ..
        }
        | Self::JoinToKV {
            output_kv_layout, ..
        }
        | Self::AntiJoinToKV {
            output_kv_layout, ..
        }) = self;
        *output_kv_layout = real_output_kv_layout;
    }

    /// Rebuilds the output layout by picking entries out of the current one.
    ///
    /// The current key entries followed by the current value entries form one
    /// combined list; `real_key_indices` and `real_value_indices` index into
    /// that list to produce the new key and value sides. The output
    /// fingerprint is left untouched.
    ///
    /// # Panics
    ///
    /// Panics when any index is outside the combined list. Key indices are
    /// resolved before value indices.
    pub(crate) fn refactor_output_key_value_layout(
        &mut self,
        real_key_indices: &[usize],
        real_value_indices: &[usize],
    ) {
        let (layout, fp) = match self {
            Self::KVToKV {
                output_kv_layout,
                output_info_fp,
                ..
            }
            | Self::JoinToKV {
                output_kv_layout,
                output_info_fp,
                ..
            }
            | Self::AntiJoinToKV {
                output_kv_layout,
                output_info_fp,
                ..
            } => (output_kv_layout, *output_info_fp),
        };

        // Snapshot of the old layout: key entries first, then value entries.
        let mut combined: Vec<ArithmeticPos> =
            Vec::with_capacity(layout.key().len() + layout.value().len());
        combined.extend_from_slice(layout.key());
        combined.extend_from_slice(layout.value());

        let pick = |indices: &[usize]| -> Vec<ArithmeticPos> {
            let mut picked = Vec::with_capacity(indices.len());
            for idx in indices {
                match combined.get(*idx) {
                    Some(pos) => picked.push(pos.clone()),
                    None => panic!(
                        "Planner error: 0x{:016x} output layout index {} out of bounds (len {})",
                        fp,
                        idx,
                        combined.len()
                    ),
                }
            }
            picked
        };

        let new_key = pick(real_key_indices);
        let new_value = pick(real_value_indices);
        *layout = KeyValueLayout::new(new_key, new_value);
    }

    /// Appends comparison expressions to the predicates of a `KVToKV` or
    /// `JoinToKV`.
    ///
    /// # Errors
    ///
    /// Returns an internal `PlanError` for `AntiJoinToKV`.
    pub(crate) fn update_comparisons(
        &mut self,
        new_compare_exprs: Vec<ComparisonExprPos>,
    ) -> Result<(), PlanError> {
        match self {
            Self::AntiJoinToKV { .. } => {
                return Err(PlanError::internal(
                    "update_comparisons: AntiJoinToKV has no comparisons to update",
                ));
            }
            Self::KVToKV { predicates, .. } => {
                predicates.compare_exprs.extend(new_compare_exprs);
            }
            Self::JoinToKV { predicates, .. } => {
                predicates.compare_exprs.extend(new_compare_exprs);
            }
        }
        Ok(())
    }

    /// Appends function-call predicates to the predicates of a `KVToKV` or
    /// `JoinToKV`.
    ///
    /// # Errors
    ///
    /// Returns an internal `PlanError` for `AntiJoinToKV`.
    pub(crate) fn update_fn_call_preds(
        &mut self,
        new_fn_call_preds: Vec<FnCallPredicatePos>,
    ) -> Result<(), PlanError> {
        match self {
            Self::AntiJoinToKV { .. } => {
                return Err(PlanError::internal(
                    "update_fn_call_preds: AntiJoinToKV has no fn_call_preds to update",
                ));
            }
            Self::KVToKV { predicates, .. } => {
                predicates.fn_call_preds.extend(new_fn_call_preds);
            }
            Self::JoinToKV { predicates, .. } => {
                predicates.fn_call_preds.extend(new_fn_call_preds);
            }
        }
        Ok(())
    }

    /// Appends constant-equality and variable-equality constraints to the
    /// predicates of a `KVToKV`.
    ///
    /// # Errors
    ///
    /// Returns an internal `PlanError` for `JoinToKV` and `AntiJoinToKV`.
    pub(crate) fn update_const_eq_and_var_eq_constraints(
        &mut self,
        const_eq: Vec<(AtomArgumentSignature, ConstType)>,
        var_eq: Vec<(AtomArgumentSignature, AtomArgumentSignature)>,
    ) -> Result<(), PlanError> {
        let Self::KVToKV { predicates, .. } = self else {
            return Err(PlanError::internal(
                "update_const_eq_and_var_eq_constraints: only applicable to unary (KVToKV) transformations",
            ));
        };
        predicates.const_eq.extend(const_eq);
        predicates.var_eq.extend(var_eq);
        Ok(())
    }

    /// Recomputes the output fingerprint from the current field values.
    ///
    /// Unlike the constructors, the hashed tuple here also includes the row
    /// flags (`is_row_input` for `KVToKV`, `is_row_output` for every variant).
    pub(crate) fn update_output_fake_sig(&mut self) {
        let (slot, refreshed) = match self {
            Self::KVToKV {
                input_info_fp,
                is_row_input,
                is_row_output,
                input_kv_layout,
                output_kv_layout,
                predicates,
                output_info_fp,
                ..
            } => (
                output_info_fp,
                compute_fp((
                    "kv_to_kv",
                    input_info_fp,
                    is_row_input,
                    is_row_output,
                    input_kv_layout,
                    output_kv_layout,
                    predicates,
                )),
            ),
            Self::JoinToKV {
                left_input_info_fp,
                right_input_info_fp,
                is_row_output,
                left_input_kv_layout,
                right_input_kv_layout,
                output_kv_layout,
                predicates,
                output_info_fp,
                ..
            } => (
                output_info_fp,
                compute_fp((
                    "join_to_kv",
                    left_input_info_fp,
                    right_input_info_fp,
                    is_row_output,
                    left_input_kv_layout,
                    right_input_kv_layout,
                    output_kv_layout,
                    predicates,
                )),
            ),
            Self::AntiJoinToKV {
                left_input_info_fp,
                right_input_info_fp,
                is_row_output,
                left_input_kv_layout,
                right_input_kv_layout,
                output_kv_layout,
                output_info_fp,
                ..
            } => (
                output_info_fp,
                compute_fp((
                    "anti_join_to_kv",
                    left_input_info_fp,
                    right_input_info_fp,
                    is_row_output,
                    left_input_kv_layout,
                    right_input_kv_layout,
                    output_kv_layout,
                )),
            ),
        };
        *slot = refreshed;
    }
}
impl TransformationInfo {
    /// Short bracketed label for the transformation, e.g. `"[Row -> KV]"` or
    /// `"[AntiJoin -> Row]"`, chosen from the variant and its row flags.
    pub(crate) fn operation_name(&self) -> &'static str {
        match self {
            Self::KVToKV {
                is_row_input,
                is_row_output,
                ..
            } => match (*is_row_input, *is_row_output) {
                (true, true) => "[Row -> Row]",
                (true, false) => "[Row -> KV]",
                (false, true) => "[KV -> Row]",
                (false, false) => "[KV -> KV]",
            },
            Self::JoinToKV {
                is_row_output: true,
                ..
            } => "[Join -> Row]",
            Self::JoinToKV {
                is_row_output: false,
                ..
            } => "[Join -> KV]",
            Self::AntiJoinToKV {
                is_row_output: true,
                ..
            } => "[AntiJoin -> Row]",
            Self::AntiJoinToKV {
                is_row_output: false,
                ..
            } => "[AntiJoin -> KV]",
        }
    }
}
impl std::fmt::Display for TransformationInfo {
    /// Writes a multi-line description: the operation label, one line per
    /// input, the output line and, for `KVToKV` / `JoinToKV` with non-empty
    /// predicates, a trailing predicate line.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Wraps fingerprint + name + layout into a `Collection` for printing.
        let describe = |fp: u64, name: &str, layout: &KeyValueLayout| {
            Collection::new(fp, name.to_string(), layout.key(), layout.value())
        };

        writeln!(f, "{}", self.operation_name())?;

        // Input line(s): a single "In" line for unary transformations,
        // "Left"/"Right" lines for both join variants.
        match self {
            Self::KVToKV {
                input_info_fp,
                input_name,
                input_kv_layout,
                ..
            } => {
                let input = describe(*input_info_fp, input_name, input_kv_layout);
                writeln!(f, " In : {}", input)?;
            }
            Self::JoinToKV {
                left_input_info_fp,
                left_input_name,
                right_input_info_fp,
                right_input_name,
                left_input_kv_layout,
                right_input_kv_layout,
                ..
            }
            | Self::AntiJoinToKV {
                left_input_info_fp,
                left_input_name,
                right_input_info_fp,
                right_input_name,
                left_input_kv_layout,
                right_input_kv_layout,
                ..
            } => {
                let left = describe(*left_input_info_fp, left_input_name, left_input_kv_layout);
                writeln!(f, " Left : {}", left)?;
                let right = describe(
                    *right_input_info_fp,
                    right_input_name,
                    right_input_kv_layout,
                );
                writeln!(f, " Right: {}", right)?;
            }
        }

        // The output line has the same shape for every variant.
        let output = describe(
            self.output_info_fp(),
            self.output_name(),
            self.output_kv_layout(),
        );
        writeln!(f, " Out : {}", output)?;

        // Predicate line: only variants that carry predicates, and only when
        // there is something to show.
        match self {
            Self::KVToKV { predicates, .. } => {
                if !predicates.is_empty() {
                    writeln!(f, " F : (if {})", predicates)?;
                }
            }
            Self::JoinToKV { predicates, .. } => {
                if !predicates.is_empty() {
                    writeln!(f, " F : (if {})", predicates)?;
                }
            }
            Self::AntiJoinToKV { .. } => {}
        }

        Ok(())
    }
}