use crate::{
query::fallback::FallbackPlanner,
query::filter::{QueryFilter,Exact,IntoFilter,FilterForHeader},
query::request::{QueryRequest,QueryRequestImpl,AddFilter,AllRows,BlankRequest},
query::{QueryPlan,QueryPlanImpl},
relation::{RelationImpl,Relation,SelfQuery,Queryable,RelProxy,QueryOutput,
query::FilterRel},
header::{Header,ProjectFrom,HasCol},
record::{Record,proj::Projection,ExternalRecord},
col::Col,
};
use tylisp::{
sexpr,eval,sexpr_quoted_types,calc,HNil,defun_nocalc,defun,
typenum as tn,
engine::Eval,
ops::{
Phantom, Partial, If, Is, Ret,
list::{Intersect,Union,Remove,CollatedBy,Contains,
Concat,Cons,Without,BuildList,EmptyP},
logic::{And, Or},
},
};
use either::Either;
use std::marker::PhantomData;
pub struct PeerJoin<L, R> {
left: L,
right: R,
}
pub trait PeerJoinTypes {
type OutCols: Header;
type FastCols: Header;
type Key: Col + Eq;
}
impl<L,R,K,Cols,FastCols> PeerJoinTypes for PeerJoin<L,R>
where
L: RelationImpl,
R: RelationImpl,
K: Col + Eq,
L::Cols: HasCol<K>,
R::Cols: HasCol<K>,
sexpr!{Intersect, {Phantom, @L::Cols}, @R::Cols}: Eval<Result = sexpr!{K}>,
sexpr!{Union, @L::Cols, @R::Cols}: Eval<Result = Cols>,
sexpr!{Union, {If, {Contains, @L::FastCols, @K}, @R::FastCols, @{}},
{If, {Contains, @R::FastCols, @K}, @L::FastCols, @{}}}:
Eval<Result = FastCols>,
Cols: Header + HasCol<K>,
FastCols: Header,
{
type OutCols = Cols;
type FastCols = FastCols;
type Key = K;
}
impl<L,R> RelationImpl for PeerJoin<L,R>
where
Self: PeerJoinTypes,
{
type Cols = <Self as PeerJoinTypes>::OutCols;
type FastCols = <Self as PeerJoinTypes>::FastCols;
type Planner = JoinPlanner;
}
#[derive(Default,Copy,Clone,Debug)]
pub struct JoinPlanner;
defun!{ JoinPlanner {
('a, L, R, Q) { _:&'a PeerJoin<L,R>, _:Q } => {Ret, @PeerJoinPlan<'a,L,R,Q> };
}}
impl<'a,L,R> QueryOutput<'a> for PeerJoin<L,R>
where
Self: PeerJoinTypes
+ RelationImpl<Cols = <Self as PeerJoinTypes>::OutCols>,
L: Relation<'a>,
R: Relation<'a>,
PeerJoinRow<L::QueryRow, R::QueryRow>:
ExternalRecord<'a, Cols = <Self as PeerJoinTypes>::OutCols>,
{
type QueryRow = PeerJoinRow<L::QueryRow, R::QueryRow>;
}
pub enum PeerJoinPlan<'a,L,R,Req> {
Left(&'a PeerJoin<L,R>, Req),
Right(&'a PeerJoin<L,R>, Req),
}
impl<'a,L,R,Req,Key,Unsorted> IntoIterator for PeerJoinPlan<'a,L,R,Req>
where
PeerJoin<L,R>: PeerJoinTypes<
Key = Key,
>,
Key: Col + Eq,
L::Cols: HasCol<Key>,
R::Cols: HasCol<Key>,
Req: QueryRequest<OrderBy = HNil> + 'a,
AddFilter<Req, Exact<&'a Key>>: QueryRequest,
L: Queryable<'a, Req>
+ Queryable<'a, AddFilter<Req, Exact<&'a Key>>>,
R: Queryable<'a, Req>
+ Queryable<'a, AddFilter<Req, Exact<&'a Key>>>,
sexpr!{EmptyP, @Req::OrderBy} : Eval<Result=Unsorted>,
{
type Item = PeerJoinRow<L::QueryRow, R::QueryRow>;
type IntoIter = impl Iterator<Item = Self::Item>;
fn into_iter(self) -> Self::IntoIter {
match self {
PeerJoinPlan::Left(rel, req) => Either::Left(
rel.left.query(req.clone()).flat_map(move |l_row|
rel.right.query(
req.clone()
.add_filter(Exact(l_row.ext_col_ref::<Key>()))
).map(move |r_row| PeerJoinRow(l_row, r_row))
)
),
PeerJoinPlan::Right(rel, req) => Either::Right(
rel.right.query(req.clone()).flat_map(move |r_row|
rel.left.query(
req.clone()
.add_filter(Exact(r_row.ext_col_ref::<Key>()))
).map(move |l_row| PeerJoinRow(l_row, r_row))
)
),
}
}
}
impl<'a,L,R,Req,Key,FiltCols>
QueryPlanImpl<'a, PeerJoin<L,R>, Req> for PeerJoinPlan<'a,L,R,Req>
where
L:RelationImpl,
R:RelationImpl,
Req: QueryRequest + 'a,
Req::Filters: QueryFilter<ReqCols=FiltCols>,
FiltCols: Header,
PeerJoin<L,R>: PeerJoinTypes<Key=Key>,
Key: Col + Eq,
{
fn prepare(rel: &'a PeerJoin<L,R>, req: Req)->Self {
match (L::FastCols::has_col::<Key>(),
R::FastCols::has_col::<Key>()) {
(true, false) => { return PeerJoinPlan::Right(rel, req); }
(false, true) => { return PeerJoinPlan::Left(rel, req); }
_ => (),
};
match (!FiltCols::is_disjoint::<L::FastCols>(),
!FiltCols::is_disjoint::<R::FastCols>()) {
(true, _) => { return PeerJoinPlan::Left(rel, req); }
(_, true) => { return PeerJoinPlan::Right(rel, req); }
_ => (),
};
match (!FiltCols::is_disjoint::<L::Cols>(),
!FiltCols::is_disjoint::<R::Cols>()) {
(true, false) => { return PeerJoinPlan::Left(rel, req); }
(false, true) => { return PeerJoinPlan::Right(rel, req); }
_ => (),
};
PeerJoinPlan::Left(rel, req)
}
}
impl<'a,L,R> IntoIterator for &'a PeerJoin<L,R>
where
PeerJoinPlan<'a,L,R,BlankRequest>: IntoIterator
{
type Item = <PeerJoinPlan<'a,L,R,BlankRequest> as IntoIterator>::Item;
type IntoIter = impl Iterator<Item = Self::Item>;
fn into_iter(self)->Self::IntoIter {
PeerJoinPlan::Left( self, BlankRequest ).into_iter()
}
}
impl<L,R> PeerJoin<L,R> {
pub(super) fn new(left:L, right:R) -> Self where
Self: RelationImpl,
{
PeerJoin { left, right }
}
}
#[derive(Copy,Clone,Debug)]
pub struct PeerJoinRow<L,R>(L,R);
impl<L,R,Cols>
Record for PeerJoinRow<L,R>
where
L:Record,
R:Record,
sexpr!{Union, @L::Cols, @R::Cols}: Eval<Result=Cols>,
Cols: Header
{
type Cols = Cols;
fn col_opt<C:Col>(&self)->Option<&C> {
self.0.col_opt::<C>().or_else(||self.1.col_opt::<C>())
}
}
impl<'a,L,R>
ExternalRecord<'a> for PeerJoinRow<L,R>
where
L: ExternalRecord<'a>,
R: ExternalRecord<'a>,
Self: Record,
{
fn ext_col_opt<C:Col>(&self)->Option<&'a C> {
self.0.ext_col_opt::<C>().or_else(||self.1.ext_col_opt::<C>())
}
}
#[cfg(test)] mod test {
use super::*;
use crate::relation::BTreeIndex;
use crate::relation::{OpaqueRel,Insert};
use tylisp::sexpr_val;
col!{A: usize}
col!{B: usize}
col!{C: usize}
col!{D: usize}
col!{E: usize}
#[test] fn peer_join_types() {
type VecVec = PeerJoin<Vec<(A,B)>, Vec<(B,C)>>;
assert_type_eq!{B: <VecVec as PeerJoinTypes>::Key};
assert_type_eq!{HNil: <VecVec as PeerJoinTypes>::FastCols};
dbg!{std::any::type_name::<<VecVec as PeerJoinTypes>::OutCols>()};
assert!{<VecVec as PeerJoinTypes>::OutCols::has_col::<A>()};
assert!{<VecVec as PeerJoinTypes>::OutCols::has_col::<B>()};
assert!{<VecVec as PeerJoinTypes>::OutCols::has_col::<C>()};
type VecJoin = PeerJoin<Vec<(A,D)>, VecVec>;
assert_type_eq!{A: <VecJoin as PeerJoinTypes>::Key};
assert_type_eq!{HNil: <VecVec as PeerJoinTypes>::FastCols};
dbg!{std::any::type_name::<<VecJoin as PeerJoinTypes>::OutCols>()};
assert!{<VecJoin as PeerJoinTypes>::OutCols::has_col::<A>()};
assert!{<VecJoin as PeerJoinTypes>::OutCols::has_col::<B>()};
assert!{<VecJoin as PeerJoinTypes>::OutCols::has_col::<C>()};
assert!{<VecJoin as PeerJoinTypes>::OutCols::has_col::<D>()};
type TreeAB = PeerJoin<BTreeIndex<A, Vec<(A,B)>>,
BTreeIndex<B, Vec<(B,C)>>>;
assert_type_eq!{B: <TreeAB as PeerJoinTypes>::Key};
assert_type_eq!{sexpr!{A}: <TreeAB as PeerJoinTypes>::FastCols};
dbg!{std::any::type_name::<<TreeAB as PeerJoinTypes>::OutCols>()};
assert!{<TreeAB as PeerJoinTypes>::OutCols::has_col::<A>()};
assert!{<TreeAB as PeerJoinTypes>::OutCols::has_col::<B>()};
assert!{<TreeAB as PeerJoinTypes>::OutCols::has_col::<C>()};
type TreeBB = PeerJoin<BTreeIndex<B, Vec<(A,B)>>,
BTreeIndex<B, Vec<(B,C)>>>;
assert_type_eq!{B: <TreeBB as PeerJoinTypes>::Key};
assert_type_eq!{sexpr!{B}: <TreeBB as PeerJoinTypes>::FastCols};
dbg!{std::any::type_name::<<TreeBB as PeerJoinTypes>::OutCols>()};
assert!{<TreeBB as PeerJoinTypes>::OutCols::has_col::<A>()};
assert!{<TreeBB as PeerJoinTypes>::OutCols::has_col::<B>()};
assert!{<TreeBB as PeerJoinTypes>::OutCols::has_col::<C>()};
}
#[test]
fn test_peer_join() {
let mut left: OpaqueRel<Vec<sexpr!{A,B}>> = Default::default();
let mut right: OpaqueRel<Vec<sexpr!{C,B,D}>> = Default::default();
let mut lr: OpaqueRel<Vec<sexpr!{A,B,C,D}>> = Default::default();
let mut third: OpaqueRel<Vec<sexpr!{C,E}>> = Default::default();
left.insert(sexpr_val!{A(1),B(1),C(3)}).unwrap();
left.insert(sexpr_val!{A(2),B(2),C(7)}).unwrap();
left.insert(sexpr_val!{A(3),B(5),C(7)}).unwrap();
right.insert(sexpr_val!{D(1),B(1),C(3)}).unwrap();
right.insert(sexpr_val!{D(2),B(2),C(7)}).unwrap();
right.insert(sexpr_val!{D(3),B(5),C(7)}).unwrap();
third.insert(sexpr_val!{C(7),E(2)}).unwrap();
third.insert(sexpr_val!{C(3),E(3)}).unwrap();
let join = PeerJoin::<_,_>{
left: left.as_ref(),
right: right.as_ref(),
};
#[cfg(feature = "slow_tests")] let join = PeerJoin::<_,_>{
left: join.as_ref(),
right: third.as_ref(),
};
assert_eq!(join.iter_all().count(), 3);
assert_eq!(join.where_eq(A(1)).iter_all().count(), 1);
}
}