mem-query 0.0.1

Relational algebra interface for Rust collections
use crate::{
    query::fallback::FallbackPlanner,
    query::filter::{Exact,IntoFilter,FilterForHeader},
    query::request::{QueryRequest,QueryRequestImpl,AddFilter,AllRows},
    query::{RealizedQuery,QueryPlan},
    relation::{RelationImpl,Relation,SelfQuery,Queryable,RelProxy,
        query::FilterRel},
    header::{Header,ProjectFrom,HasCol},
    record::{Record,Record,proj::Projection,ExternalRecord},
    col::Col,
};

use tylisp::{
    sexpr,eval,sexpr_quoted_types,calc,HNil,defun_nocalc,
    engine::{Eval,Calc},
    ops::{
        Phantom, Partial, If, Is,
        list::{Intersect,Union,Remove,CollatedBy,Contains,
               Concat,Cons,Without,BuildList},
        logic::{And, Or},
    },
};

use std::marker::PhantomData;

pub struct PeerJoin<L, R, K> {
    left: L,
    right: R,
    phantom: PhantomData<K>
}

/*
defun!{ BuildPeerJoin {
    (A:RelationImpl, B:RelationImpl) { a:A, b:B } =>
    {
    };
}}
*/

impl<L,R,K> PeerJoin<L,R,K> {
    pub(super) fn new(left:L, right:R) -> Self where
        Self: RelationImpl,
    {
        PeerJoin { left, right, phantom: PhantomData }
    }
}

pub trait PeerJoinCols {
    type Key: Col;
    type LInCols: Header;
    type RInCols: Header;
    type LFastCols: Header;
    type RFastCols: Header;
    type LOutCols: Header;
    type ROutCols: Header;
    type OutCols: Header;
    type FastCols: Header;
}

impl<L,R,K,LOutCols,ROutCols,OutCols,FastCols> PeerJoinCols for PeerJoin<L,R,K> where
    L: RelationImpl,
    R: RelationImpl,
    K: Col+Eq,
    L::Cols: CollatedBy<sexpr!{Partial, Is, @K},
                        Passed = sexpr!{K},
                        Failed=LOutCols>,
    R::Cols: CollatedBy<sexpr!{Partial, Is, @K},
                        Passed = sexpr!{K},
                        Failed = ROutCols>,
    LOutCols: Header,
    ROutCols: Header,
    sexpr!{Cons, @K, {Concat, @LOutCols, @ROutCols}}: Eval<Result=OutCols>,
    OutCols: Header,
    sexpr!{Without, @K, {Concat, {If, {Contains, @L::FastCols, @K},
                                       @R::FastCols, @{}},
                                 {If, {Contains, @R::FastCols, @K},
                                       @L::FastCols, @{}}}
    } : Eval<Result=FastCols>,
    FastCols: Header,
{    
    type Key = K;
    type LInCols = L::Cols;
    type RInCols = R::Cols;
    type LFastCols = L::FastCols;
    type RFastCols = R::FastCols;
    type LOutCols= LOutCols;
    type ROutCols= ROutCols;
    type OutCols= OutCols;
    type FastCols= FastCols;
}

impl<L,R,K> RelationImpl for PeerJoin<L,R,K>
where
    Self: PeerJoinCols
{
    type Cols = <Self as PeerJoinCols>::OutCols;
    type FastCols = <Self as PeerJoinCols>::FastCols;
    type Planner = FallbackPlanner;
}

/* Planner strategy:
     Any sort order => fallback
     Any filter for multiple columns => fallback

     Any filter on a left-side fast column => left priority
     Any filter on a right-side fast column && key is fast on left
        => Right priority (optional?)
     Otherwise, left priority

    Project everything at the end, don't worry about leaving out
    intermediate columns.  It's all references anyway.
*/

/*
defun!{ BuildLeftPlan {
    => &'a PeerJoin<L,R,K>,
       {BuildRequest, @LInCols, @LFilts, @{}},
       {BuildRequest, @RInCols, @RFilts, @{}},
       @LOutCols,
       @Req::OutputCols
*/

/*
#[derive(Debug, Default)]
pub struct PeerJoinPlanner;
defun_nocalc!{() PeerJoinPlanner {
    ('a, F:Col, P:RelationImpl, Req:QueryRequest)
    { _:&'a SubordinateJoin<P,F>, _:Req }
    => {Cond, {{And, {EmptyP, @Req::OrderBy},
                     {All, @CheckFilter<P, F::Inner>, @Req::Filters}},
               {BuildPeerJoinPlan, @&'a SubordinateJoin<P,F>, @Req}},
              {tn::True,
                {FallbackPlanner, @&'a SubordinateJoin<P,F>, @Req}}};
}}
*/

#[derive(Debug,Default)]
pub struct CheckFilter<R1,R2>(R1,R2);
defun_nocalc!{(R1, R2) CheckFilter<R1,R2> {
    (R1:RelationImpl, R2:RelationImpl, Filt) { _:Filt }
    => {Or, {FilterForHeader, @R1::Cols, @Filt},
            {FilterForHeader, @R2::Cols, @Filt}};
}}

pub struct LeftPrimaryPlan<'a,L,R,K,LReq,RReq,LOutCols,OutCols> {
    join: &'a PeerJoin<L,R,K>,
    l_req: LReq,
    r_req: RReq,
    phantom: PhantomData<(K,LOutCols,OutCols)>
}

impl<'a,L,R,K,LReq,RReq,LPlan,RPlan,LOutCols,OutCols> IntoIterator
for LeftPrimaryPlan<'a,L,R,K,LReq,RReq,LOutCols,OutCols> 
where 
    L:Queryable<'a, LReq, Plan=LPlan>,
    LReq: QueryRequest<OrderBy=sexpr!{}> + 'a,
    RReq: QueryRequest<OrderBy=sexpr!{}> + 'a,
    LPlan: QueryPlan<'a, L, LReq> + 'a,
    K: Col+Eq,
    LReq::OutputCols: HasCol<K>,
    LOutCols: ProjectFrom<LReq::OutputCols>,
    R:Queryable<'a, AddFilter<RReq, Exact<&'a K>>, Plan=RPlan>,
    RPlan:QueryPlan<'a, R, AddFilter<RReq, Exact<&'a K>>> + 'a,
    AddFilter<RReq, Exact<&'a K>>: QueryRequest<OrderBy=sexpr!{}>,
    (Projection<LPlan::Row, LOutCols>, RPlan::Row): ExternalRecord<'a>,
    OutCols: ProjectFrom<
        <(Projection<LPlan::Row, LOutCols>, RPlan::Row) as Record>::Cols
    >,
    Projection<(Projection<LPlan::Row, LOutCols>, RPlan::Row), OutCols>:
        ExternalRecord<'a, Cols=OutCols>

{
    type Item = Projection<(Projection<LPlan::Row, LOutCols>, RPlan::Row), OutCols>;
    type IntoIter = impl Iterator<Item = Self::Item>;

    fn into_iter(self) -> Self::IntoIter {
        let r_req = self.r_req;
        let right: &'a R = &self.join.right;
        self.join.left.query(self.l_req).flat_map(move |l_row| {
            let k:&'a K = l_row.ext_col_ref();
            let l_row = l_row.project::<LOutCols>();
            right.query(r_req.clone().add_filter(Exact(k)))
                .map(move |r_row| (l_row.clone(), r_row).project())
        })
    }
}

impl<'a,L,R,K,OutCols,LOutCols,ROutCols,LInCols,RInCols>
IntoIterator for &'a PeerJoin<L,R,K>
where
    PeerJoin<L,R,K>: PeerJoinCols<
        LOutCols=LOutCols,
        ROutCols=ROutCols,
        LInCols=LInCols,
        RInCols=RInCols,
        OutCols=OutCols>,
    L:RelationImpl,
    R:RelationImpl,
    LeftPrimaryPlan<'a,L,R,K,AllRows<L>,AllRows<R>,LOutCols,OutCols>: IntoIterator,
{
    type Item =
        <LeftPrimaryPlan<'a,L,R,K,AllRows<L>,AllRows<R>,LOutCols,OutCols>
         as IntoIterator>::Item;
    type IntoIter = <LeftPrimaryPlan<'a,L,R,K,AllRows<L>,AllRows<R>,LOutCols,OutCols>
         as IntoIterator>::IntoIter;

    fn into_iter(self)->Self::IntoIter {
        LeftPrimaryPlan {
            join: self,
            l_req: Default::default(),
            r_req: Default::default(),
            phantom: PhantomData,
        }.into_iter()
    }
}

#[test]
fn test_peer_join() {
    use crate::relation::{OpaqueRel,Insert};
    use tylisp::sexpr_val;

    col!{A: usize}
    col!{B: usize}
    col!{C: usize}
    col!{D: usize}
    col!{E: usize}

    let mut left: OpaqueRel<Vec<sexpr!{A,B}>> = Default::default();
    let mut right: OpaqueRel<Vec<sexpr!{C,B,D}>> = Default::default();
    let mut lr: OpaqueRel<Vec<sexpr!{A,B,C,D}>> = Default::default();
    let mut third: OpaqueRel<Vec<sexpr!{C,E}>> = Default::default();

    left.insert(sexpr_val!{A(1),B(1),C(3)}).unwrap();
    left.insert(sexpr_val!{A(2),B(2),C(7)}).unwrap();
    left.insert(sexpr_val!{A(3),B(5),C(7)}).unwrap();

    right.insert(sexpr_val!{D(1),B(1),C(3)}).unwrap();
    right.insert(sexpr_val!{D(2),B(2),C(7)}).unwrap();
    right.insert(sexpr_val!{D(3),B(5),C(7)}).unwrap();

    third.insert(sexpr_val!{C(7),E(2)}).unwrap();
    third.insert(sexpr_val!{C(3),E(3)}).unwrap();

    let join = PeerJoin::<_,_,B>{
        left: left.as_ref().as_dyn(),
        right: right.as_ref().as_dyn(),
        phantom: PhantomData
    };

    let join = PeerJoin::<_,_,C>{
        left: join.as_ref().as_dyn(),
        right: third.as_ref().as_dyn(),
        phantom: PhantomData
    };

    //assert_eq!(3, IntoIterator::into_iter(&join2).count());
    assert_eq!(join.iter_all().count(), 3);
} 

#[test]
fn test_peer_join_btree() {
    use crate::relation::{OpaqueRel,Insert,BTreeIndex};
    use tylisp::sexpr_val;

    col!{A: usize}
    col!{B: usize}
    col!{C: usize}
    col!{D: usize}
    col!{E: usize}

    let mut left: BTreeIndex<A,Vec<sexpr!{A,B}>> = Default::default();
    let mut right: BTreeIndex<B,Vec<sexpr!{C,B,D}>> = Default::default();
    let mut third: BTreeIndex<C, Vec<sexpr!{C,E}>> = Default::default();

    left.insert(sexpr_val!{A(1),B(1),C(3)}).unwrap();
    left.insert(sexpr_val!{A(2),B(2),C(7)}).unwrap();
    left.insert(sexpr_val!{A(3),B(5),C(7)}).unwrap();

    right.insert(sexpr_val!{D(1),B(1),C(3)}).unwrap();
    right.insert(sexpr_val!{D(2),B(2),C(7)}).unwrap();
    right.insert(sexpr_val!{D(3),B(5),C(7)}).unwrap();

    third.insert(sexpr_val!{C(7),E(2)}).unwrap();

    let join = PeerJoin::<_,_,B>{
        left: left.as_ref(),
        right: right.as_ref(),
        phantom: PhantomData
    };
/*
    let join2 = PeerJoin::<_,_,C>{
        left: join.as_ref(),
        right: third.as_ref(),
        phantom: PhantomData
    };
*/

//    assert_eq!(3, IntoIterator::into_iter(&join2).count());
//    assert_eq!(join.iter_all().count(), 3);
} 

/*
#[test]
fn test_peer_join_btree() {
    use crate::relation::{BTreeIndex,Insert};
    use tylisp::sexpr_val;

    col!{A: usize}
    col!{B: usize}
    col!{C: usize}
    col!{D: usize}
    col!{E: usize}

    let mut left: BTreeIndex<A,BTreeIndex<B,Option<sexpr!{A,B,C}>>> = Default::default();
    let mut right: BTreeIndex<C,Vec<sexpr!{C,D}>> = Default::default();
    let mut third: BTreeIndex<D, Option<(E,D)>> = Default::default();

    left.insert(sexpr_val!{A(1),B(2),C(3)}).unwrap();
    left.insert(sexpr_val!{A(2),B(2),C(7)}).unwrap();
    left.insert(sexpr_val!{A(3),B(5),C(7)}).unwrap();

    right.insert(sexpr_val!{D(1),B(2),C(3)}).unwrap();
    right.insert(sexpr_val!{D(2),B(2),C(7)}).unwrap();
    right.insert(sexpr_val!{D(3),B(5),C(7)}).unwrap();

    third.insert(sexpr_val!{D(2),E(2)}).unwrap();

    let join = PeerJoin { left: left.as_ref(), right: right.as_ref() };

    assert_eq!(3, IntoIterator::into_iter(&join).count());
    assert!(join.iter_all().all(|r| **(r.col_ref::<A>()) == **(r.col_ref::<D>())));

    let newjoin = join.join(third.as_ref());
    assert_eq!(1, newjoin.iter_all().count());
} */