use super::RulePlanner;
use crate::catalog::{
ArithmeticPos, AtomArgumentSignature, AtomSignature, Catalog, JoinPredicates, KvPredicates,
};
use crate::planner::{KeyValueLayout, PlanError, TransformationInfo};
use tracing::trace;
impl RulePlanner {
    /// Applies the SIP rewrite to every eligible ordered pair of positive atoms.
    ///
    /// Rules with two or fewer positive atoms are returned untouched. For larger
    /// rules two sweeps are made over the atom positions:
    ///
    /// 1. **forward** — each `left` in ascending order is paired with every later
    ///    position `right`, also ascending;
    /// 2. **backward** — each `left` in descending order is paired with every
    ///    earlier position `right`, also descending.
    ///
    /// The positive-atom count is read once, before the sweeps, and reused for both.
    ///
    /// # Errors
    ///
    /// Returns the first [`PlanError`] produced while rewriting a pair; pairs after
    /// the failing one are not visited.
    pub(crate) fn apply_sip(&mut self, catalog: &mut Catalog) -> Result<(), PlanError> {
        let atom_count = catalog.positive_atom_number();
        if atom_count <= 2 {
            return Ok(());
        }

        // Forward sweep: (0,1), (0,2), ..., (1,2), ...
        let forward_pairs = (0..atom_count)
            .flat_map(|left| ((left + 1)..atom_count).map(move |right| (left, right)));
        for (left, right) in forward_pairs {
            self.try_apply_sip_pair(catalog, left, right, "forward")?;
        }

        // Backward sweep: (n-1,n-2), (n-1,n-3), ..., (n-2,n-3), ...
        let backward_pairs = (0..atom_count)
            .rev()
            .flat_map(|left| (0..left).rev().map(move |right| (left, right)));
        for (left, right) in backward_pairs {
            self.try_apply_sip_pair(catalog, left, right, "backward")?;
        }

        Ok(())
    }

    /// Rewrites the ordered pair `(left, right)` if the catalog accepts it.
    ///
    /// When `Catalog::check_sip_pair` rejects the pair this is a no-op. Otherwise
    /// the premap step runs first, then the projection + semijoin step, and the
    /// resulting catalog is dumped at trace level.
    ///
    /// `direction` is only used as a label in the trace output.
    ///
    /// # Errors
    ///
    /// Propagates any [`PlanError`] from the premap or projection/semijoin step.
    fn try_apply_sip_pair(
        &mut self,
        catalog: &mut Catalog,
        left: usize,
        right: usize,
        direction: &str,
    ) -> Result<(), PlanError> {
        if catalog.check_sip_pair(left, right) {
            trace!("SIP: {direction} atom_pos{} -> atom_pos{}", left, right);
            self.apply_sip_premaps(catalog, (left, right))?;
            self.apply_sip_projection_semijoin(catalog, left, right)?;
            trace!("Catalog:\n{}", catalog);
            trace!("{}", "-".repeat(60));
        }
        Ok(())
    }

    /// Creates EDB premap transformations for the atoms of `sip_pair` that are
    /// still original atoms.
    ///
    /// The atoms are handled in pair order. An atom counts as "original" when its
    /// current fingerprint is contained in `Catalog::original_atom_fingerprints`.
    /// The fingerprint is looked up inside the loop, so the second atom is
    /// inspected after any change made while handling the first.
    ///
    /// # Errors
    ///
    /// Propagates any [`PlanError`] from `create_edb_premap_transformations`.
    fn apply_sip_premaps(
        &mut self,
        catalog: &mut Catalog,
        sip_pair: (usize, usize),
    ) -> Result<(), PlanError> {
        let (first, second) = sip_pair;
        for atom_idx in [first, second] {
            let fingerprint = catalog.positive_atom_fingerprint(atom_idx);
            let is_original = catalog
                .original_atom_fingerprints()
                .contains(&fingerprint);
            if !is_original {
                continue;
            }
            // NOTE(review): the meaning of the trailing `true` flag is defined by
            // `create_edb_premap_transformations`, which is not visible here.
            self.create_edb_premap_transformations(catalog, atom_idx, true)?;
        }
        Ok(())
    }

    /// Appends a key projection of the left atom and a semijoin of that projection
    /// with the right atom, then points the right atom at the semijoin output.
    ///
    /// Two entries are pushed onto `self.transformation_infos`:
    ///
    /// * at the current length: a `kv_to_kv` transformation over the left atom that
    ///   keeps only the shared keys, converted with `into_sip_projection`;
    /// * at the next index: a `join_to_kv` transformation between that projection
    ///   and the right atom.
    ///
    /// Producer/consumer bookkeeping is registered for both indices, and finally
    /// `Catalog::sip_modify` is called for the right atom with the semijoin's name,
    /// fingerprint and the right-hand keys followed by the right-hand values.
    ///
    /// # Errors
    ///
    /// Propagates any [`PlanError`] from consumer registration, atom-name lookup,
    /// `into_sip_projection`, or `sip_modify`. Consumers registered before a
    /// failing step are not rolled back.
    ///
    /// # Panics
    ///
    /// Panics if a key or right-hand value position does not yield a variable
    /// signature from `as_var_signature`.
    fn apply_sip_projection_semijoin(
        &mut self,
        catalog: &mut Catalog,
        lhs_pos_idx: usize,
        rhs_pos_idx: usize,
    ) -> Result<(), PlanError> {
        // Indices the two new transformations will occupy once pushed.
        let proj_slot = self.transformation_infos.len();
        let semijoin_slot = proj_slot + 1;

        let originals = catalog.original_atom_fingerprints();
        let left_fp = catalog.positive_atom_fingerprint(lhs_pos_idx);
        let left_arg_sigs = catalog.positive_atom_argument_signature(lhs_pos_idx);
        let right_fp = catalog.positive_atom_fingerprint(rhs_pos_idx);
        let right_arg_sigs = catalog.positive_atom_argument_signature(rhs_pos_idx);

        // The left atom feeds the projection; the right atom feeds the semijoin.
        self.insert_consumer(originals, left_fp, proj_slot)?;
        self.insert_consumer(originals, right_fp, semijoin_slot)?;

        let (lhs_keys, lhs_vals, rhs_keys, rhs_vals) =
            Self::partition_shared_keys(catalog, left_arg_sigs, right_arg_sigs);
        Self::trace_sip_partitions(catalog, &lhs_keys, &lhs_vals, &rhs_vals);

        // --- Projection of the left atom onto the shared keys -------------------
        let left_name = catalog.positive_atom_name(lhs_pos_idx)?.to_string();
        let lhs_key_names = Self::attrs_from_positions(&lhs_keys, catalog);
        let proj_name = Self::proj_name(&left_name, &lhs_key_names);
        let left_is_original = originals.contains(&left_fp);
        let proj_input_layout = KeyValueLayout::new(lhs_keys.clone(), lhs_vals);
        let proj_output_layout = KeyValueLayout::new(lhs_keys.clone(), Vec::new());
        let proj_tx = TransformationInfo::kv_to_kv(
            left_fp,
            left_name,
            proj_name.clone(),
            left_is_original,
            proj_input_layout,
            proj_output_layout,
            KvPredicates::default(),
        )
        .into_sip_projection()?;

        let proj_fp = proj_tx.output_info_fp();
        self.insert_producer(proj_fp, proj_slot);
        self.transformation_infos.push(proj_tx);
        self.insert_consumer(originals, proj_fp, semijoin_slot)?;

        // Re-index the keys for the projection output: the i-th key keeps its atom
        // signature but takes argument index i.
        let mut projected_keys: Vec<ArithmeticPos> = Vec::with_capacity(lhs_keys.len());
        for (slot, key) in lhs_keys.iter().enumerate() {
            let source_sig = key.init().as_var_signature().unwrap();
            let rebased_sig = AtomArgumentSignature::new(*source_sig.atom_signature(), slot);
            projected_keys.push(ArithmeticPos::from_var_signature(rebased_sig));
        }

        // --- Semijoin of the projection with the right atom ---------------------
        let right_name = catalog.positive_atom_name(rhs_pos_idx)?.to_string();
        let semijoin_name = Self::semijoin_name(&proj_name, &right_name, &lhs_key_names);
        // Layouts are passed in the same positions as before: projection side,
        // right-atom side, then the combined keys + right values.
        let proj_side_layout = KeyValueLayout::new(projected_keys.clone(), Vec::new());
        let right_side_layout = KeyValueLayout::new(rhs_keys.clone(), rhs_vals.clone());
        let joined_layout = KeyValueLayout::new(projected_keys, rhs_vals.clone());
        let semijoin_tx = TransformationInfo::join_to_kv(
            proj_fp,
            proj_name,
            right_fp,
            right_name,
            semijoin_name.clone(),
            proj_side_layout,
            right_side_layout,
            joined_layout,
            JoinPredicates::default(),
        );

        let semijoin_fp = semijoin_tx.output_info_fp();
        self.insert_producer(semijoin_fp, semijoin_slot);
        self.transformation_infos.push(semijoin_tx);

        // --- Rebind the right atom to the semijoin output -----------------------
        // Argument order handed to the catalog: right keys first, then right values.
        let new_arguments_list = rhs_keys
            .iter()
            .chain(rhs_vals.iter())
            .map(|position| *position.init().as_var_signature().unwrap())
            .collect();
        // NOTE(review): the leading `true` presumably marks a positive atom —
        // confirm against `AtomSignature::new`.
        let right_atom_sig = AtomSignature::new(true, rhs_pos_idx);
        catalog.sip_modify(
            right_atom_sig,
            new_arguments_list,
            semijoin_name,
            semijoin_fp,
        )?;
        Ok(())
    }

    /// Emits the key/value partitions chosen for a semijoin at trace level.
    ///
    /// Each position is printed as a `(position, argument string)` tuple, where the
    /// argument string comes from `Catalog::signature_to_argument_str`.
    ///
    /// # Panics
    ///
    /// Panics if a position does not yield a variable signature from
    /// `as_var_signature` while the lists are being rendered.
    fn trace_sip_partitions(
        catalog: &Catalog,
        lhs_keys: &[ArithmeticPos],
        lhs_vals: &[ArithmeticPos],
        rhs_vals: &[ArithmeticPos],
    ) {
        // Pairs every position with its human-readable argument string.
        let describe = |positions: &[ArithmeticPos]| -> Vec<_> {
            positions
                .iter()
                .map(|pos| {
                    let var_sig = pos.init().as_var_signature().unwrap();
                    (pos.clone(), catalog.signature_to_argument_str(var_sig))
                })
                .collect()
        };

        trace!("SIP semijoin keys: {:?}", describe(lhs_keys));
        trace!("SIP semijoin LHS values: {:?}", describe(lhs_vals));
        trace!("SIP semijoin RHS values: {:?}", describe(rhs_vals));
    }
}
#[cfg(test)]
mod tests {
    use super::super::common::test_setup;

    /// Rule body with exactly two positive atoms, `A(x), B(x)`.
    const TWO_ATOM_PROGRAM: &str = "\
        .decl A(a: int32)\n\
        .decl B(a: int32)\n\
        .decl Out(x: int32)\n\
        .input A(IO=\"file\", filename=\"A.csv\", delimiter=\",\")\n\
        .input B(IO=\"file\", filename=\"B.csv\", delimiter=\",\")\n\
        .output Out\n\
        Out(x) :- A(x), B(x).\n";

    /// Three-atom chain `A(x), B(x, y), C(y, z)`.
    const FORWARD_CHAIN_PROGRAM: &str = "\
        .decl A(a: int32)\n\
        .decl B(a: int32, b: int32)\n\
        .decl C(a: int32, b: int32)\n\
        .input A(IO=\"file\", filename=\"A.csv\", delimiter=\",\")\n\
        .input B(IO=\"file\", filename=\"B.csv\", delimiter=\",\")\n\
        .input C(IO=\"file\", filename=\"C.csv\", delimiter=\",\")\n\
        .decl Out(x: int32, z: int32)\n\
        .output Out\n\
        Out(x, z) :- A(x), B(x, y), C(y, z).\n";

    /// Three-atom chain `A(y), B(y, z), C(z)` whose last atom is unary.
    const BACKWARD_CHAIN_PROGRAM: &str = "\
        .decl A(a: int32)\n\
        .decl B(a: int32, b: int32)\n\
        .decl C(a: int32)\n\
        .input A(IO=\"file\", filename=\"A.csv\", delimiter=\",\")\n\
        .input B(IO=\"file\", filename=\"B.csv\", delimiter=\",\")\n\
        .input C(IO=\"file\", filename=\"C.csv\", delimiter=\",\")\n\
        .decl Out(z: int32)\n\
        .output Out\n\
        Out(z) :- A(y), B(y, z), C(z).\n";

    /// With only two positive atoms, `apply_sip` must add no transformations and
    /// leave the atom count unchanged.
    #[test]
    fn apply_sip_skips_two_atom_rule() {
        let (mut planner, mut catalog) = test_setup(TWO_ATOM_PROGRAM);

        planner.apply_sip(&mut catalog).expect("sip");

        assert_eq!(
            planner.transformation_infos().len(),
            0,
            "SIP must not run on 2-atom rules"
        );
        assert_eq!(catalog.positive_atom_number(), 2);
    }

    /// On the three-atom chain, at least two SIP projections must be produced and
    /// the number of positive atoms must stay at three.
    #[test]
    fn apply_sip_forward_pass_fires() {
        let (mut planner, mut catalog) = test_setup(FORWARD_CHAIN_PROGRAM);

        planner.apply_sip(&mut catalog).expect("sip");

        let proj_count = planner
            .transformation_infos()
            .iter()
            .filter(|info| info.is_sip_projection())
            .count();
        assert!(
            proj_count >= 2,
            "expected at least 2 SIP projections from forward pass (A→B, B→C), got {proj_count}"
        );
        assert_eq!(catalog.positive_atom_number(), 3);
    }

    /// On the chain ending in a unary atom, at least four SIP projections are
    /// expected in total, and the number of positive atoms must stay at three.
    #[test]
    fn apply_sip_backward_pass_filters_earlier_atom() {
        let (mut planner, mut catalog) = test_setup(BACKWARD_CHAIN_PROGRAM);

        planner.apply_sip(&mut catalog).expect("sip");

        let proj_count = planner
            .transformation_infos()
            .iter()
            .filter(|info| info.is_sip_projection())
            .count();
        assert!(
            proj_count >= 4,
            "forward alone yields 2 projections; with backward expect ≥4, got {proj_count}"
        );
        assert_eq!(catalog.positive_atom_number(), 3);
    }
}