// mem-query 0.0.1
//
// Relational algebra interface for Rust collections
use {
    std::collections::{BTreeMap,BTreeSet},
    crate::{
        query::{
            QueryPlanImpl,
            fallback::FallbackPlanner,
            filter::{Exact,QueryFilter},
            request::{QueryRequest, AddFilter},
        },
        col::Col,
        header::{Header,HasCol},
        relation::{Relation,RelationImpl,QueryOutput,Queryable,Insert},
        transaction::{RevertableOp,UndoLog,Transaction,Accessor},
        record::Record,        
    },
    tylisp::{
        engine::Eval, sexpr, defun,
        HNil,
        ops::{Ret, list::SetInsert},
    },
    either::Either,
};

/// A relation wrapper that pairs an inner `Store` with a secondary lookup
/// table from `K2` values to the sets of `K1` values recorded with them.
///
/// Inserts made through this type add a `(K2, K1)` pair to `index` in
/// addition to forwarding the row to `store`. A query that filters on `K2`
/// can then find the matching `K1` values in `index` and query `store` once
/// per `K1` value.
#[derive(Clone)]
pub struct RedundantIndex<K2,K1,Store> {
    /// Maps each `K2` value to the set of `K1` values inserted alongside it.
    index: BTreeMap<K2, BTreeSet<K1>>,
    /// The wrapped relation; iteration and queries are ultimately served by it.
    store: Store
}

impl<K2,K1,Store> Default for RedundantIndex<K2,K1,Store>
where Store:Default, K2:Ord {
    fn default()->Self {
        RedundantIndex {
            index: BTreeMap::new(),
            store: Store::default(),
        }
    }
}

/// Relation metadata for `RedundantIndex`.
///
/// The column set is inherited unchanged from the wrapped `Store`. The
/// `FastCols` set is whatever the type-level expression
/// `SetInsert(K2, Store::FastCols)` evaluates to — presumably the store's own
/// fast columns with `K2` added (confirm against `tylisp::ops::list::SetInsert`).
impl<K2,K1,Store,FastCols> RelationImpl for RedundantIndex<K2,K1,Store>
where
    Store: RelationImpl,
    K2:Col + Ord,
    K1:Col + Ord,
    // Ties the free `FastCols` parameter to the result of the type-level call.
    sexpr!{SetInsert, @K2, @Store::FastCols}: Eval<Result = FastCols>,
    FastCols: Header,
{
    // Same columns as the wrapped store.
    type Cols = Store::Cols;
    // Result of the `SetInsert` evaluation in the where-clause above.
    type FastCols = FastCols;

    // Planner type used to pick a query plan for this relation.
    type Planner = RedundantPlanner;
}

/// Zero-sized planner marker named as the `Planner` of `RedundantIndex`.
#[derive(Clone,Copy,Default)]
pub struct RedundantPlanner;
// Type-level function definition: applied to a borrowed `RedundantIndex` and a
// request type `Req`, it returns the plan type
// `RedundantQueryPlan<'a,K2,K1,Store,Req>`.
// NOTE(review): exact `defun!` semantics live in `tylisp` — confirm there.
defun!{ RedundantPlanner {
    ('a,K2,K1,Store,Req) { _:&'a RedundantIndex<K2,K1,Store>, _:Req }
    => {Ret, @RedundantQueryPlan<'a,K2,K1,Store,Req>};
}}

/// Queries against a `RedundantIndex` yield the same row type as queries
/// against the wrapped `Store`.
impl<'a,K2,K1,Store> QueryOutput<'a> for RedundantIndex<K2,K1,Store>
where Self: RelationImpl<Cols = Store::Cols> + 'a, Store: QueryOutput<'a>
{
    // Row type is forwarded from the wrapped store.
    type QueryRow = Store::QueryRow;
}

/// Iterating a borrowed `RedundantIndex` iterates the borrowed inner store;
/// the secondary index is not consulted.
impl<'a, K2, K1, Store> IntoIterator for &'a RedundantIndex<K2, K1, Store>
where
    &'a Store: IntoIterator,
{
    type Item = <&'a Store as IntoIterator>::Item;
    type IntoIter = <&'a Store as IntoIterator>::IntoIter;

    fn into_iter(self) -> Self::IntoIter {
        // Borrow the store for the full `'a` and defer to its own iterator.
        <&'a Store as IntoIterator>::into_iter(&self.store)
    }
}

/// The two ways a request against a `RedundantIndex` can be executed.
pub enum RedundantQueryPlan<'a,K2,K1,Store,Req> {
    /// Resolve `K2` to `K1` values through the secondary index, then query
    /// the store once per `K1` value. Holds the store, the request, and the index.
    IndexPlan(&'a Store, Req, &'a BTreeMap<K2,BTreeSet<K1>>),
    /// Hand the request to the store unchanged. Holds the store and the request.
    DirectPlan(&'a Store, Req)
}

impl<'a,K2,K1,Store,Req:'a>
    IntoIterator for RedundantQueryPlan<'a,K2,K1,Store,Req>
where
    Store: Queryable<'a, Req>
         + Queryable<'a, AddFilter<Req, Exact<&'a K1>>>,
    Req: QueryRequest<OrderBy = HNil>,
    AddFilter<Req, Exact<&'a K1>>: QueryRequest,
    K1: Col + Ord,
    K2: Col + Ord,
{
    type Item = Store::QueryRow;
    type IntoIter = impl 'a + Iterator<Item = Self::Item>;

    fn into_iter(self)->Self::IntoIter {
        use crate::range::RangeExt;
        use RedundantQueryPlan::*;
        match self {
            DirectPlan(store, req) => Either::Left(store.query(req)),
            IndexPlan(store, req, index) => Either::Right({
                let filters = req.clone().into_filters();
                filters.bounds_of::<K2>().into_verified()
                       .get_from_map(index)
                       .flat_map(move |(_,set)| set.iter())
                       .flat_map(move |k1|
                            store.query(req.clone().add_filter(Exact(k1)))
                       )
            })
        }
    }
}

/// Chooses between the index-assisted and the direct plan for a request.
impl<'a, K2, K1, Store, Req: 'a> QueryPlanImpl<'a, RedundantIndex<K2, K1, Store>, Req>
    for RedundantQueryPlan<'a, K2, K1, Store, Req>
where
    K2: Col + Ord,
    Req: QueryRequest + 'a,
    Store: RelationImpl,
{
    /// Returns `IndexPlan` when the request's filter columns share nothing
    /// with the store's `FastCols` and do include `K2`; otherwise returns
    /// `DirectPlan`.
    #[inline(always)]
    fn prepare(rel: &'a RedundantIndex<K2, K1, Store>, req: Req) -> Self {
        // Both checks are made on types only; `&&` keeps the original
        // short-circuit order.
        let index_applies =
            <Req::Filters as QueryFilter>::ReqCols::is_disjoint::<Store::FastCols>()
            && <Req::Filters as QueryFilter>::ReqCols::has_col::<K2>();

        if !index_applies {
            return RedundantQueryPlan::DirectPlan(&rel.store, req);
        }
        RedundantQueryPlan::IndexPlan(&rel.store, req, &rel.index)
    }
}


/// Insertion support: wraps the store's own insert operation together with
/// the `(K2, K1)` pair that must be added to the secondary index.
impl<K2, K1, Store, H: Header> Insert<H> for RedundantIndex<K2, K1, Store>
where
    K1: Col + Ord,
    K2: Col + Ord,
    H: HasCol<K1> + HasCol<K2>,
    Store: Insert<H> + Default,
    Self: RelationImpl,
{
    type Remainder = Store::Remainder;
    type Op = InsertRequest<K2, K1, Store::Op>;

    /// Builds the insert operation for `rec`.
    ///
    /// The `K1` and `K2` column values are cloned out of the record before it
    /// is handed to `Store::insert_op`; the store's remainder is returned
    /// unchanged.
    fn insert_op<FromRec: Record<Cols = H>>(rec: FromRec) -> (Self::Op, Self::Remainder) {
        // Copy the key columns first: `rec` is consumed by the store below.
        let k1_value = rec.col_ref::<K1>().clone();
        let k2_value = rec.col_ref::<K2>().clone();

        let (store_op, leftover) = Store::insert_op(rec);
        let request = InsertRequest {
            k2: k2_value,
            k1: k1_value,
            op: store_op,
        };
        (request, leftover)
    }
}

/// A pending insert into a `RedundantIndex`: the wrapped store's operation
/// plus the key pair to record in the secondary index.
pub struct InsertRequest<K2,K1,Op> {
    /// Value of the `K2` column of the record being inserted.
    k2: K2,
    /// Value of the `K1` column of the record being inserted.
    k1: K1,
    /// The operation to apply to the wrapped store.
    op: Op,
}

/// Applies an `InsertRequest` to a `RedundantIndex` as one transaction made
/// of two steps: the store operation, then the index update.
impl<K2, K1, Store, Op> RevertableOp<RedundantIndex<K2, K1, Store>> for InsertRequest<K2, K1, Op>
where
    Op: RevertableOp<Store>,
    K2: Col + Ord,
    K1: Col + Ord,
{
    // Both are opaque; their concrete types are inferred from `apply`.
    type Err = impl std::error::Error;
    type Log = impl UndoLog<RedundantIndex<K2, K1, Store>>;

    /// Runs the store operation through `AccessStore`, then an `AddEntry`
    /// for `(k2, k1)` through `AccessIndex`, and converts the transaction
    /// into its undo log.
    fn apply(self, idx: &mut RedundantIndex<K2, K1, Store>) -> Result<Self::Log, Self::Err> {
        let InsertRequest { k2, k1, op: store_op } = self;
        let index_op = AddEntry { k2, k1 };

        // Kept as a single chain: the receiver types of the transaction
        // builder are not visible from this file.
        Transaction::start(idx)
            .subtransaction(AccessStore, move |xact| xact.apply(store_op))
            .subtransaction(AccessIndex, move |xact| xact.apply(index_op))
            .into_undo_log()
    }
}

struct AccessStore;
impl<K2,K1,Store> Accessor<RedundantIndex<K2,K1,Store>> for AccessStore {
    type Dest= Store;
    fn access<F,O>(&self, src: &mut RedundantIndex<K2,K1,Store>, op:F)->O
        where F: FnOnce(&mut Self::Dest)->O
    {
        op(&mut src.store)
    }
}

struct AccessIndex;
impl<K2,K1,Store> Accessor<RedundantIndex<K2,K1,Store>> for AccessIndex {
    type Dest= BTreeMap<K2,BTreeSet<K1>>;
    fn access<F,O>(&self, src: &mut RedundantIndex<K2,K1,Store>, op:F)->O
        where F: FnOnce(&mut Self::Dest)->O
    {
        op(&mut src.index)
    }
}

/// Operation that records one `(k2, k1)` pair in a
/// `BTreeMap<K2, BTreeSet<K1>>`.
struct AddEntry<K2,K1> {
    /// Map key under which `k1` is recorded.
    k2: K2,
    /// Value added to the set stored under `k2`.
    k1: K1
}

impl<K2:Col + Ord, K1: Col + Ord>
RevertableOp<BTreeMap<K2, BTreeSet<K1>>> for AddEntry<K2,K1> {
    type Err = std::convert::Infallible;
    type Log = RemoveEntry<K2,K1>;
    fn apply(self, map:&mut BTreeMap<K2, BTreeSet<K1>>)
    ->Result<Self::Log, Self::Err> {
        use std::collections::btree_map::Entry::*;
        match map.entry(self.k2.clone()) {
            Vacant(e) => {
                e.insert(std::iter::once(self.k1).collect());
                Ok(RemoveEntry::Vacant(self.k2))
            },
            Occupied(mut e) => {
                if e.get_mut().insert(self.k1.clone()) {
                    Ok(RemoveEntry::Occupied(self.k2, self.k1))
                } else {
                    Ok(RemoveEntry::Duplicate)
                }
            }
        }
    }
}

/// Undo record produced by `AddEntry::apply`; each variant says what the
/// insert changed in the map.
enum RemoveEntry<K2,K1> {
    /// A new entry was created for this key; undoing removes the whole entry.
    Vacant(K2),
    /// A value was added to an existing set; undoing removes that value.
    Occupied(K2,K1),
    /// The pair was already present; undoing does nothing.
    Duplicate
}

impl<K2,K1> UndoLog<BTreeMap<K2, BTreeSet<K1>>>
for RemoveEntry<K2,K1>
where K2: Col+Ord, K1:Col+Ord {
    fn revert(self, map:&mut BTreeMap<K2, BTreeSet<K1>>) {
        use RemoveEntry::*;
        match self {
            Duplicate => (),
            Vacant(k2) => { map.remove::<K2>(&k2); },
            Occupied(k2,k1) => {
                map.get_mut::<K2>(&k2).and_then(|set| Some(set.remove::<K1>(&k1)));
            }
        }
    }
}

#[cfg(test)] mod tests {
    use super::*;
    use tylisp::sexpr_val;

    col!{A:usize}
    col!{B:usize}
    col!{C:usize}

    use crate::relation::{BTreeIndex,SelfQuery};
    use crate::query::request::BlankRequest;

    use std::marker::PhantomData;

    /// Checks which plan `prepare` selects for various filter combinations on
    /// an index keyed `B -> A` over a store keyed by `A`, then inserts six
    /// rows and counts the results of full and single-column equality queries.
    // `#[test]` was missing, so the harness never ran this function.
    #[test]
    fn test_planner() {
        type IdxType = RedundantIndex<B,A,BTreeIndex<A, Vec<sexpr!{A,B,C}>>>;
        let mut idx: IdxType = Default::default();

        // Passes only for the `DirectPlan` variant.
        macro_rules! assert_direct {
            ($e:expr) => { match $e {
                RedundantQueryPlan::DirectPlan( .. ) => (),
                _ => panic!()
            }}
        }

        // Passes only for the `IndexPlan` variant.
        macro_rules! assert_index {
            ($e:expr) => { match $e {
                RedundantQueryPlan::IndexPlan( .. ) => (),
                _ => panic!()
            }}
        }

        // Compile-time check that the `Cols` associated type resolves.
        let _:PhantomData< <IdxType as RelationImpl >::Cols> = PhantomData;

        // No filter on B, or a filter touching the store's own key A: direct.
        assert_direct!(idx.prepare(BlankRequest));
        assert_direct!(idx.prepare(BlankRequest.add_filter(Exact(A(1)))));
        assert_direct!(idx.prepare(BlankRequest.add_filter(Exact(C(1)))));
        assert_direct!(idx.prepare(BlankRequest.add_filter(Exact(B(1)))
                                               .add_filter(Exact(A(1)))));

        // Filter on B without touching A: the secondary index is used.
        assert_index!(idx.prepare(BlankRequest.add_filter(Exact(B(1)))));
        assert_index!(idx.prepare(BlankRequest.add_filter(Exact(C(1)))
                                              .add_filter(Exact(B(1)))));

        idx.insert_multi(vec![
            sexpr_val!{ A(1), B(1), C(1) },
            sexpr_val!{ A(2), B(2), C(2) },
            sexpr_val!{ A(3), B(3), C(3) },
            sexpr_val!{ A(4), B(4), C(4) },
            sexpr_val!{ A(5), B(5), C(5) },
            sexpr_val!{ A(6), B(6), C(6) },
        ]).unwrap();

        assert_eq!( 6, idx.iter_all().count() );
        assert_eq!( 1, idx.as_ref().where_eq(A(1)).iter_all().count() );
        assert_eq!( 1, idx.as_ref().where_eq(B(1)).iter_all().count() );
        assert_eq!( 1, idx.as_ref().where_eq(C(1)).iter_all().count() );
    }
}