use std::rc::Rc;
use std::cell::RefCell;
use timely::progress::frontier::MutableAntichain;
use lattice::Lattice;
use trace::TraceReader;
use trace::cursor::Cursor;
pub struct TraceBox<K, V, T, R, Tr>
where T: Lattice+Ord+Clone+'static, Tr: TraceReader<K,V,T,R> {
phantom: ::std::marker::PhantomData<(K, V, R)>,
pub advance_frontiers: MutableAntichain<T>,
pub through_frontiers: MutableAntichain<T>,
pub trace: Tr,
}
impl<K,V,T,R,Tr> TraceBox<K,V,T,R,Tr>
where T: Lattice+Ord+Clone+'static, Tr: TraceReader<K,V,T,R> {
pub fn new(mut trace: Tr) -> Self {
let mut advance = MutableAntichain::new();
advance.update_iter(trace.advance_frontier().iter().cloned().map(|t| (t,1)));
let mut through = MutableAntichain::new();
through.update_iter(trace.distinguish_frontier().iter().cloned().map(|t| (t,1)));
TraceBox {
phantom: ::std::marker::PhantomData,
advance_frontiers: advance,
through_frontiers: through,
trace: trace,
}
}
pub fn adjust_advance_frontier(&mut self, lower: &[T], upper: &[T]) {
self.advance_frontiers.update_iter(upper.iter().cloned().map(|t| (t,1)));
self.advance_frontiers.update_iter(lower.iter().cloned().map(|t| (t,-1)));
self.trace.advance_by(&self.advance_frontiers.frontier());
}
pub fn adjust_through_frontier(&mut self, lower: &[T], upper: &[T]) {
self.through_frontiers.update_iter(upper.iter().cloned().map(|t| (t,1)));
self.through_frontiers.update_iter(lower.iter().cloned().map(|t| (t,-1)));
self.trace.distinguish_since(&self.through_frontiers.frontier());
}
}
pub struct TraceRc<K,V,T,R,Tr> where T: Lattice+Ord+Clone+'static, Tr: TraceReader<K,V,T,R> {
advance_frontier: Vec<T>,
through_frontier: Vec<T>,
pub wrapper: Rc<RefCell<TraceBox<K,V,T,R,Tr>>>,
}
impl<K,V,T,R,Tr> TraceReader<K, V, T, R> for TraceRc<K,V,T,R,Tr> where T: Lattice+Ord+Clone+'static, Tr: TraceReader<K,V,T,R> {
type Batch = Tr::Batch;
type Cursor = Tr::Cursor;
fn advance_by(&mut self, frontier: &[T]) {
self.wrapper.borrow_mut().adjust_advance_frontier(&self.advance_frontier[..], frontier);
self.advance_frontier = frontier.to_vec();
}
fn advance_frontier(&mut self) -> &[T] { &self.advance_frontier[..] }
fn distinguish_since(&mut self, frontier: &[T]) {
self.wrapper.borrow_mut().adjust_through_frontier(&self.through_frontier[..], frontier);
self.through_frontier = frontier.to_vec();
}
fn distinguish_frontier(&mut self) -> &[T] { &self.through_frontier[..] }
fn cursor_through(&mut self, frontier: &[T]) -> Option<(Tr::Cursor, <Tr::Cursor as Cursor<K, V, T, R>>::Storage)> {
::std::cell::RefCell::borrow_mut(&self.wrapper).trace.cursor_through(frontier)
}
fn map_batches<F: FnMut(&Self::Batch)>(&mut self, f: F) {
::std::cell::RefCell::borrow_mut(&self.wrapper).trace.map_batches(f)
}
}
impl<K,V,T,R,Tr> TraceRc<K,V,T,R,Tr> where T: Lattice+Ord+Clone+'static, Tr: TraceReader<K,V,T,R> {
pub fn make_from(trace: Tr) -> (Self, Rc<RefCell<TraceBox<K,V,T,R,Tr>>>) {
let wrapped = Rc::new(RefCell::new(TraceBox::new(trace)));
let handle = TraceRc {
advance_frontier: wrapped.borrow().advance_frontiers.frontier().to_vec(),
through_frontier: wrapped.borrow().through_frontiers.frontier().to_vec(),
wrapper: wrapped.clone(),
};
(handle, wrapped)
}
}
impl<K, V, T: Lattice+Ord+Clone, R, Tr> Clone for TraceRc<K, V, T, R, Tr> where Tr: TraceReader<K, V, T, R> {
fn clone(&self) -> Self {
self.wrapper.borrow_mut().adjust_advance_frontier(&[], &self.advance_frontier[..]);
self.wrapper.borrow_mut().adjust_through_frontier(&[], &self.through_frontier[..]);
TraceRc {
advance_frontier: self.advance_frontier.clone(),
through_frontier: self.through_frontier.clone(),
wrapper: self.wrapper.clone(),
}
}
}
impl<K, V, T, R, Tr> Drop for TraceRc<K, V, T, R, Tr>
where T: Lattice+Ord+Clone+'static, Tr: TraceReader<K, V, T, R> {
fn drop(&mut self) {
self.wrapper.borrow_mut().adjust_advance_frontier(&self.advance_frontier[..], &[]);
self.wrapper.borrow_mut().adjust_through_frontier(&self.through_frontier[..], &[]);
self.advance_frontier = Vec::new();
self.through_frontier = Vec::new();
}
}