mem-query 0.0.1

Relational algebra interface for Rust collections
use crate::{
    col::Col,
    query::{
        request::{QueryRequest,NewRequest},
        filter::{FilterForHeader},
        {QueryPlanImpl, QueryPlan},
    },
    record::{Record, Record, ExternalRecord},
    header::{HasCol, ProjectFrom},
    relation::{Relation,RelationImpl,Queryable},
    query::{fallback::FallbackPlanner},
};
use tylisp::{
    sexpr, defun_nocalc, defun, calc,
    typenum as tn,
    engine::Calc,
    ops::{
        Partial, Ret, Phantom, Cond,
        list::{All, Collate, Filter, SetInsert, SupersetP, Contains },
        logic::{Or, And},
    },
};
use std::{
    marker::PhantomData,
};

#[cfg(test)] use {
    tylisp::{ops::list::BuildList},
    crate::query::RealizedQuery,
};

pub struct SubordinateJoin<Parent,Field>(Parent, PhantomData<Field>);

impl<P,F> SubordinateJoin<P,F> {
    pub(super) fn new(parent:P) -> Self where Self: RelationImpl {
        SubordinateJoin(parent, PhantomData)
    }
}

impl<Parent, Field, Child> RelationImpl for SubordinateJoin<Parent,Field> where
    Parent: RelationImpl,
    Parent::Cols: HasCol<Field>,
    Field: Col<Inner=Child>,
    Child: RelationImpl,
    (Parent::Cols, Child::Cols): Record,
{
    type Cols = <(Parent::Cols, Child::Cols) as Record>::Cols;
    type FastCols = Parent::FastCols;
    type Planner = SubJoinPlanner;
}

#[derive(Debug, Default)]
pub struct SubJoinPlanner;
defun_nocalc!{() SubJoinPlanner {
    ('a, F:Col, P:RelationImpl, Req:QueryRequest)
    { _:&'a SubordinateJoin<P,F>, _:Req }
    => {Cond, {{And, {All, {Partial, SupersetP, @P::Cols}, @Req::OrderBy},
                     {All, @CheckFilter<P, F::Inner>, @Req::Filters}},
               {BuildSubJoinPlan, @&'a SubordinateJoin<P,F>, @Req}},
              {tn::True, 
                {FallbackPlanner, @&'a SubordinateJoin<P,F>, @Req}}};
}}

#[derive(Debug,Default)]
pub struct CheckFilter<R1,R2>(R1,R2);
defun_nocalc!{(R1, R2) CheckFilter<R1,R2> {
    (R1:RelationImpl, R2:RelationImpl, Filt) { _:Filt }
    => {Or, {FilterForHeader, @R1::Cols, @Filt},
            {FilterForHeader, @R2::Cols, @Filt}};
}}

#[derive(Debug,Default)]
pub struct BuildSubJoinPlan;
defun!{ BuildSubJoinPlan {
    ('a, P:RelationImpl, F, Req:QueryRequest) { join: &'a SubordinateJoin<P,F>, req:Req } =>
    {BuildSubJoinPlanImpl,
        @&'a SubordinateJoin<P,F> = join,
        {Phantom, @Req},
        {Collate, {Partial, FilterForHeader, @P::Cols},
                  @Req::Filters = req.into_filters() }
    };
}}

#[derive(Copy,Clone,Default)]
pub struct BuildSubJoinPlanImpl;
defun!{ BuildSubJoinPlanImpl {
    ('a, 
        P:RelationImpl,
        F: Col<Inner=C>,
        C:RelationImpl, 
        Req:QueryRequest,
        PFilt, CFilt,
    )
    {join: &'a SubordinateJoin<P,F>, _:PhantomData<Req>, {parent_filters: PFilt, child_filters: CFilt}} =>
    { NewPlan,
      @&'a SubordinateJoin<P, F> = join, 
      { NewRequest, {Phantom, {SetInsert, @F, {Filter, {Partial, Contains, @P::Cols}, @Req::OutputCols}}},
                    @PFilt = parent_filters,
                    {Phantom, @Req::OrderBy}},
      { NewRequest, {Phantom, {Filter, {Partial, Contains, @C::Cols}, @Req::OutputCols}},
                    @CFilt = child_filters,
                    {Phantom, @{}}},
      {Phantom, @Req::OutputCols}};
}}

#[test]
fn test_build_subjoin_plan() {
    col!{A: usize}
    col!{B: usize}
    col!{C: usize}
    col!{CVec: Vec<C>}
    assert_trait!{Vec<C>: RelationImpl}
    assert_trait!{Vec<(A, CVec)>: RelationImpl}
    
    use crate::query::request::*;
    use crate::query::filter::Exact;
    use crate::relation::RelProxy;

    type Join<'a> = SubordinateJoin<RelProxy<&'a Vec<(A, CVec)>>, CVec>;

    let rel: Vec<(A, CVec)> = vec![];

    assert_trait!{Join: RelationImpl}
    let join: Join = SubordinateJoin::new(RelationImpl::as_ref(&rel));

    fn test_blank<'a>(join: &'a Join) {
        let plan = calc!{BuildSubJoinPlan, @&'a Join<'a> = &join, @BlankRequest = BlankRequest};
        let _ = plan.execute();
    }

    fn test_cols<'a>(join: &'a Join) {
        let plan = calc!{BuildSubJoinPlan, @&'a Join<'a> = &join,
                         {NewRequest, {Phantom, @{A,C}}, {BuildList}, {Phantom, @{}}}};
        let _ = plan.execute();
    }

    fn test_filter_a<'a>(join: &'a Join) {
        let plan = calc!{BuildSubJoinPlan, @&'a Join<'a> = &join,
                         {NewRequest, {Phantom, @{A,C}}, {BuildList, @Exact<A> = Exact(A(42))}, {Phantom, @{}}}};
        let _ = plan.execute();
    }

    fn test_filter_c<'a>(join: &'a Join) {
        let plan = calc!{BuildSubJoinPlan, @&'a Join<'a> = &join,
                         {NewRequest, {Phantom, @{A,C}}, {BuildList, @Exact<C> = Exact(C(42))}, {Phantom, @{}}}};
        let _ = plan.execute();
    }

    test_blank(&join);
    test_cols(&join);
    test_filter_a(&join);
    test_filter_c(&join);
}

#[derive(Copy,Clone,Default)]
pub struct NewPlan;
defun!{ NewPlan {
    ('a, P, F, PReq, CReq, OutCols) {
        subjoin: &'a SubordinateJoin<P, F>,
        parent_req: PReq,
        child_req: CReq,
        out_cols: PhantomData<OutCols>
    } => { Ret, @SubJoinPlan<'a, P, F, PReq, CReq, OutCols> = SubJoinPlan { subjoin, parent_req, child_req, out_cols } };
}}

#[test]
fn test_new_plan() {
    col!{A: usize}
    col!{B: usize}
    col!{C: usize}
    col!{CVec: Vec<C>}
    assert_trait!{Vec<C>: RelationImpl}
    assert_trait!{Vec<(A, CVec)>: RelationImpl}
    
    use crate::query::request::*;
    use crate::relation::RelProxy;
    

    type Join<'a> = SubordinateJoin<RelProxy<&'a Vec<(A, CVec)>>, CVec>;

    let rel: Vec<(A, CVec)> = vec![];

    assert_trait!{Join: RelationImpl}
    let join: Join = SubordinateJoin::new(RelationImpl::as_ref(&rel));

    fn test<'a>(join: &'a Join<'a>) {
        let plan = calc!{NewPlan,
            @&'a Join = &join,
            {NewRequest, {Phantom, @{CVec}}, {BuildList}, {Phantom, @{}}},
            @BlankRequest = BlankRequest,
            {Phantom, @{}}
        };
        let _ = plan.execute();
    }

    test(&join);
}


impl<'a, Rel:'a, Req, Parent, Field, PReq, CReq, OutCols> QueryPlanImpl<'a, Rel, Req>
for SubJoinPlan<'a, Parent, Field, PReq, CReq, OutCols>
where
    sexpr!{BuildSubJoinPlan, @&'a Rel = rel, @Req = req}: Calc<sexpr!{(), &'a Rel, Req}, Result = Self>
{
    fn prepare(rel: &'a Rel, req:Req) -> Self {
        calc!{BuildSubJoinPlan, @&'a Rel = rel, @Req = req}
    }
}

pub struct SubJoinPlan<'a, P, F, PReq, CReq, OutCols> {
    subjoin: &'a SubordinateJoin<P, F>,
    parent_req: PReq,
    child_req: CReq,
    out_cols: PhantomData<OutCols>,
}

impl<'a, Parent, Field, Child, PReq, CReq, PPlan, CPlan, OutCols>
IntoIterator for SubJoinPlan<'a, Parent, Field, PReq, CReq, OutCols>
where
    PReq::OutputCols: HasCol<Field>,
    Parent: Queryable<'a, PReq, Plan=PPlan>,
    Field: Col<Inner=Child>,
    Child: Queryable<'a, CReq, Plan=CPlan>,
    (PPlan::Row, CPlan::Row): ExternalRecord<'a>,
    OutCols: ProjectFrom<<(PPlan::Row, CPlan::Row) as Record>::Cols>,
    CReq: QueryRequest + 'a,
    PReq: QueryRequest + 'a,
    PPlan: QueryPlan<'a, Parent, PReq>,
    CPlan: QueryPlan<'a, Child, CReq>,
{
    type Item = crate::record::proj::Projection<(PPlan::Row, CPlan::Row), OutCols>;
    type IntoIter = impl Iterator<Item = Self::Item>;
//    type IntoIter = std::iter::Once<Self::Item>;

    fn into_iter(self) -> Self::IntoIter {
        let child_req = self.child_req;
        self.subjoin.0.query(self.parent_req).flat_map(move |parent_row| {
            let child: &Field = parent_row.ext_col_ref();
            child.as_ref()
                .query(child_req.clone())
                .map(move |child_row| (parent_row.clone(), child_row))
        }).map(Record::project::<OutCols>)
    }
}


impl<'a,Parent,Field,Child:'static> IntoIterator for &'a SubordinateJoin<Parent, Field> where 
    SubordinateJoin<Parent, Field>: RelationImpl,
    Parent: Relation<'a>,
    Parent::Cols: HasCol<Field>,
    Parent::Item: Clone,
    Field: Col<Inner=Child>,
    Child: Relation<'a>,
{
    type Item = (Parent::Item, Child::Item);
//    type IntoIter = Box<dyn 'a + Iterator<Item = Self::Item>>;
    type IntoIter = impl Iterator<Item = Self::Item> + 'a;

    fn into_iter(self)->Self::IntoIter {
        self.0.iter_all()
            .flat_map(move |parent_row| {
                let child: &Field = parent_row.ext_col_ref();
                child.as_ref().iter_all()
                    .map(move |child_row| (parent_row.clone(), child_row))
            })
    }
}