use std::rc::Rc;
use std::cell::RefCell;
use timely::progress::frontier::MutableAntichain;
use lattice::Lattice;
use trace::TraceReader;
use trace::cursor::Cursor;
pub struct TraceBox<Tr>
where
Tr::Time: Lattice+Ord+Clone+'static,
Tr: TraceReader
{
pub advance_frontiers: MutableAntichain<Tr::Time>,
pub through_frontiers: MutableAntichain<Tr::Time>,
pub trace: Tr,
}
impl<Tr> TraceBox<Tr>
where
Tr::Time: Lattice+Ord+Clone+'static,
Tr: TraceReader,
{
pub fn new(mut trace: Tr) -> Self {
let mut advance = MutableAntichain::new();
advance.update_iter(trace.advance_frontier().iter().cloned().map(|t| (t,1)));
let mut through = MutableAntichain::new();
through.update_iter(trace.distinguish_frontier().iter().cloned().map(|t| (t,1)));
TraceBox {
advance_frontiers: advance,
through_frontiers: through,
trace: trace,
}
}
pub fn adjust_advance_frontier(&mut self, lower: &[Tr::Time], upper: &[Tr::Time]) {
self.advance_frontiers.update_iter(upper.iter().cloned().map(|t| (t,1)));
self.advance_frontiers.update_iter(lower.iter().cloned().map(|t| (t,-1)));
self.trace.advance_by(&self.advance_frontiers.frontier());
}
pub fn adjust_through_frontier(&mut self, lower: &[Tr::Time], upper: &[Tr::Time]) {
self.through_frontiers.update_iter(upper.iter().cloned().map(|t| (t,1)));
self.through_frontiers.update_iter(lower.iter().cloned().map(|t| (t,-1)));
self.trace.distinguish_since(&self.through_frontiers.frontier());
}
}
pub struct TraceRc<Tr>
where
Tr::Time: Lattice+Ord+Clone+'static,
Tr: TraceReader,
{
advance_frontier: Vec<Tr::Time>,
through_frontier: Vec<Tr::Time>,
pub wrapper: Rc<RefCell<TraceBox<Tr>>>,
}
/// `TraceRc` reads through to the shared trace; frontier changes are routed
/// through the shared `TraceBox` so that its counts stay up to date.
impl<Tr> TraceReader for TraceRc<Tr>
where
    Tr: TraceReader,
    Tr::Time: Lattice+Ord+Clone+'static,
{
    type Key = Tr::Key;
    type Val = Tr::Val;
    type Time = Tr::Time;
    type R = Tr::R;

    type Batch = Tr::Batch;
    type Cursor = Tr::Cursor;

    /// Swaps this handle's advance frontier for `frontier` in the shared box,
    /// then records `frontier` as the handle's own advance frontier.
    fn advance_by(&mut self, frontier: &[Tr::Time]) {
        {
            let mut shared = self.wrapper.borrow_mut();
            shared.adjust_advance_frontier(&self.advance_frontier[..], frontier);
        }
        self.advance_frontier = frontier.to_owned();
    }

    /// Returns the advance frontier recorded for this handle.
    fn advance_frontier(&mut self) -> &[Tr::Time] {
        &self.advance_frontier[..]
    }

    /// Swaps this handle's through frontier for `frontier` in the shared box,
    /// then records `frontier` as the handle's own through frontier.
    fn distinguish_since(&mut self, frontier: &[Tr::Time]) {
        {
            let mut shared = self.wrapper.borrow_mut();
            shared.adjust_through_frontier(&self.through_frontier[..], frontier);
        }
        self.through_frontier = frontier.to_owned();
    }

    /// Returns the through frontier recorded for this handle.
    fn distinguish_frontier(&mut self) -> &[Tr::Time] {
        &self.through_frontier[..]
    }

    /// Delegates directly to the shared trace's `cursor_through`.
    fn cursor_through(&mut self, frontier: &[Tr::Time]) -> Option<(Tr::Cursor, <Tr::Cursor as Cursor<Tr::Key, Tr::Val, Tr::Time, Tr::R>>::Storage)> {
        self.wrapper.borrow_mut().trace.cursor_through(frontier)
    }

    /// Delegates directly to the shared trace's `map_batches`.
    fn map_batches<F: FnMut(&Self::Batch)>(&mut self, f: F) {
        self.wrapper.borrow_mut().trace.map_batches(f)
    }
}
impl<Tr> TraceRc<Tr>
where
Tr::Time: Lattice+Ord+Clone+'static,
Tr: TraceReader,
{
pub fn make_from(trace: Tr) -> (Self, Rc<RefCell<TraceBox<Tr>>>) {
let wrapped = Rc::new(RefCell::new(TraceBox::new(trace)));
let handle = TraceRc {
advance_frontier: wrapped.borrow().advance_frontiers.frontier().to_vec(),
through_frontier: wrapped.borrow().through_frontiers.frontier().to_vec(),
wrapper: wrapped.clone(),
};
(handle, wrapped)
}
}
/// Cloning a handle registers a second contribution to the shared frontiers.
///
/// The bounds mirror those on the `TraceRc` definition and its other impls,
/// including the `'static` requirement on `Tr::Time`.
impl<Tr> Clone for TraceRc<Tr>
where
    Tr::Time: Lattice+Ord+Clone+'static,
    Tr: TraceReader,
{
    /// Returns a new handle sharing the same `TraceBox`.
    ///
    /// Before the handle is built, each time in this handle's advance and
    /// through frontiers gains one count in the shared box (the `lower`
    /// argument is empty, so nothing is removed). The clone can then be
    /// advanced or dropped independently of `self`.
    fn clone(&self) -> Self {
        {
            // One mutable borrow covers both registrations.
            let mut shared = self.wrapper.borrow_mut();
            shared.adjust_advance_frontier(&[], &self.advance_frontier[..]);
            shared.adjust_through_frontier(&[], &self.through_frontier[..]);
        }

        TraceRc {
            advance_frontier: self.advance_frontier.clone(),
            through_frontier: self.through_frontier.clone(),
            wrapper: self.wrapper.clone(),
        }
    }
}
impl<Tr> Drop for TraceRc<Tr>
where
Tr::Time: Lattice+Ord+Clone+'static,
Tr: TraceReader,
{
fn drop(&mut self) {
self.wrapper.borrow_mut().adjust_advance_frontier(&self.advance_frontier[..], &[]);
self.wrapper.borrow_mut().adjust_through_frontier(&self.through_frontier[..], &[]);
self.advance_frontier = Vec::new();
self.through_frontier = Vec::new();
}
}