use std::rc::{Rc, Weak};
use std::cell::RefCell;
use std::default::Default;
use std::ops::DerefMut;
use std::collections::VecDeque;
use timely::dataflow::operators::{Enter, Map};
use timely::order::PartialOrder;
use timely::dataflow::{Scope, Stream};
use timely::dataflow::operators::generic::{Unary, Operator, source};
use timely::dataflow::channels::pact::{Pipeline, Exchange};
use timely::progress::Timestamp;
use timely::progress::frontier::Antichain;
use timely::dataflow::operators::Capability;
use timely::dataflow::scopes::Child;
use timely_sort::Unsigned;
use ::{Data, Diff, Collection, AsCollection, Hashable};
use lattice::Lattice;
use trace::{Trace, TraceReader, Batch, BatchReader, Batcher, Cursor};
use trace::implementations::ord::OrdValSpine as DefaultValTrace;
use trace::implementations::ord::OrdKeySpine as DefaultKeyTrace;
use trace::wrappers::enter::{TraceEnter, BatchEnter};
use trace::wrappers::rc::TraceBox;
/// Wrapper around a single value, exposed through the public field `item`.
///
/// Within this file it is the record type carried by `Arranged::stream`, where `T` is the
/// batch type of a trace.
#[derive(Clone,Eq,PartialEq,Debug)]
pub struct BatchWrapper<T> {
/// The wrapped value.
pub item: T,
}
/// `Abomonation` hooks for `BatchWrapper`; both methods panic unconditionally.
///
/// NOTE(review): presumably this impl exists only to satisfy trait bounds on stream records,
/// and wrapped values are never expected to be serialized — confirm against callers.
impl<T> ::abomonation::Abomonation for BatchWrapper<T> {
/// Always panics with "BatchWrapper Abomonation impl"; nothing is written.
unsafe fn entomb<W: ::std::io::Write>(&self, _write: &mut W) -> ::std::io::Result<()> { panic!("BatchWrapper Abomonation impl") }
/// Always panics with "BatchWrapper Abomonation impl"; the bytes are never inspected.
unsafe fn exhume<'a,'b>(&'a mut self, _bytes: &'b mut [u8]) -> Option<&'b mut [u8]> { panic!("BatchWrapper Abomonation impl") }
}
/// Write half of a shared trace, produced together with a `TraceAgent` by `TraceAgent::new`.
///
/// Its `seal` method forwards frontier and batch information to every registered listener
/// queue and inserts batches into the shared trace while that trace is still alive.
pub struct TraceWriter<K, V, T, R, Tr>
where T: Lattice+Ord+Clone+'static, Tr: Trace<K,V,T,R>, Tr::Batch: Batch<K,V,T,R> {
// Ties the otherwise unused type parameters `K`, `V` and `R` to the struct.
phantom: ::std::marker::PhantomData<(K, V, R)>,
// Weak handle to the shared trace: the writer does not keep the trace alive by itself.
trace: Weak<RefCell<TraceBox<K, V, T, R, Tr>>>,
// Registered listener queues, each held weakly. A queue entry is `(frontier, data)` where
// `data` is either `Some((time, batch))` or `None`.
queues: Rc<RefCell<Vec<Weak<RefCell<VecDeque<(Vec<T>, Option<(T, Tr::Batch)>)>>>>>>,
}
impl<K, V, T, R, Tr> TraceWriter<K, V, T, R, Tr>
where T: Lattice+Ord+Clone+'static, Tr: Trace<K,V,T,R>, Tr::Batch: Batch<K,V,T,R> {
    /// Publishes a new frontier, and optionally a batch, to all listeners and to the trace.
    ///
    /// * `frontier` - the frontier to report; a copy is pushed to every live listener queue.
    /// * `data` - an optional `(time, batch)` pair. A clone is pushed to every live listener
    ///   queue alongside `frontier`, and the batch itself is inserted into the shared trace
    ///   if the trace has not been dropped.
    ///
    /// Listener queues whose owner has gone away are removed from the registry.
    pub fn seal(&mut self, frontier: &[T], data: Option<(T, Tr::Batch)>) {
        // Notify live listeners and prune dead ones in a single pass, so that every weak
        // reference is upgraded exactly once.
        self.queues.borrow_mut().retain(|listener| {
            if let Some(queue) = listener.upgrade() {
                queue.borrow_mut().push_back((frontier.to_vec(), data.clone()));
                true
            }
            else {
                false
            }
        });

        // Listeners are informed first; only then is the batch handed to the trace.
        if let Some((_time, batch)) = data {
            if let Some(trace) = self.trace.upgrade() {
                trace.borrow_mut().trace.insert(batch);
            }
        }
    }
}
impl<K, V, T, R, Tr> Drop for TraceWriter<K, V, T, R, Tr>
where T: Lattice+Ord+Clone+'static, Tr: Trace<K,V,T,R>, Tr::Batch: Batch<K,V,T,R> {
    /// Sends a final `(empty frontier, None)` entry to every live listener queue.
    ///
    /// Listener queues whose owner has gone away are removed from the registry.
    fn drop(&mut self) {
        // Single pass: each weak reference is upgraded once, both to deliver the final
        // entry and to decide whether the registration is kept.
        self.queues.borrow_mut().retain(|listener| {
            if let Some(queue) = listener.upgrade() {
                queue.borrow_mut().push_back((Vec::new(), None));
                true
            }
            else {
                false
            }
        });
    }
}
/// Read handle onto a shared trace.
///
/// Each handle records its own advance and through frontiers and reports changes to them to
/// the shared `TraceBox` (see the `TraceReader`, `Clone` and `Drop` impls in this file).
pub struct TraceAgent<K, V, T, R, Tr>
where T: Lattice+Ord+Clone+'static, Tr: TraceReader<K,V,T,R> {
// Ties the otherwise unused type parameters `K`, `V` and `R` to the struct.
phantom: ::std::marker::PhantomData<(K, V, R)>,
// Strong, shared handle to the boxed trace.
trace: Rc<RefCell<TraceBox<K, V, T, R, Tr>>>,
// Weak handle to the writer's registry of listener queues; upgrading fails once the
// registry has been dropped.
queues: Weak<RefCell<Vec<Weak<RefCell<VecDeque<(Vec<T>, Option<(T, Tr::Batch)>)>>>>>>,
// The advance frontier this handle most recently reported to the shared trace.
advance: Vec<T>,
// The through (distinguish) frontier this handle most recently reported to the shared trace.
through: Vec<T>,
}
impl<K, V, T, R, Tr> TraceReader<K, V, T, R> for TraceAgent<K, V, T, R, Tr>
where T: Lattice+Ord+Clone+'static, Tr: TraceReader<K,V,T,R> {
    type Batch = Tr::Batch;
    type Cursor = Tr::Cursor;

    /// Replaces this handle's advance frontier with `frontier`, first telling the shared
    /// trace about the change from the old frontier to the new one.
    fn advance_by(&mut self, frontier: &[T]) {
        {
            let mut shared = self.trace.borrow_mut();
            shared.adjust_advance_frontier(&self.advance[..], frontier);
        }
        self.advance.clear();
        self.advance.extend_from_slice(frontier);
    }

    /// Returns the advance frontier currently recorded by this handle.
    fn advance_frontier(&mut self) -> &[T] {
        self.advance.as_slice()
    }

    /// Replaces this handle's through frontier with `frontier`, first telling the shared
    /// trace about the change from the old frontier to the new one.
    fn distinguish_since(&mut self, frontier: &[T]) {
        {
            let mut shared = self.trace.borrow_mut();
            shared.adjust_through_frontier(&self.through[..], frontier);
        }
        self.through.clear();
        self.through.extend_from_slice(frontier);
    }

    /// Returns the through frontier currently recorded by this handle.
    fn distinguish_frontier(&mut self) -> &[T] {
        self.through.as_slice()
    }

    /// Delegates to `cursor_through` on the wrapped trace.
    fn cursor_through(&mut self, frontier: &[T]) -> Option<(Tr::Cursor, <Tr::Cursor as Cursor<K, V, T, R>>::Storage)> {
        let mut shared = self.trace.borrow_mut();
        shared.trace.cursor_through(frontier)
    }

    /// Delegates to `map_batches` on the wrapped trace.
    fn map_batches<F: FnMut(&Self::Batch)>(&mut self, f: F) {
        let mut shared = self.trace.borrow_mut();
        shared.trace.map_batches(f)
    }
}
impl<K, V, T, R, Tr> TraceAgent<K, V, T, R, Tr>
where T: Lattice+Ord+Clone+'static, Tr: TraceReader<K,V,T,R> {
    /// Wraps `trace` in a shared `TraceBox` and returns a reader/writer pair over it.
    ///
    /// The reader holds the trace strongly and the listener registry weakly; the writer
    /// holds the trace weakly and owns the listener registry. The reader's frontiers start
    /// out as the frontiers currently reported by the freshly boxed trace.
    pub fn new(trace: Tr) -> (Self, TraceWriter<K,V,T,R,Tr>) where Tr: Trace<K,V,T,R>, Tr::Batch: Batch<K,V,T,R> {
        let shared = Rc::new(RefCell::new(TraceBox::new(trace)));
        let registry = Rc::new(RefCell::new(Vec::new()));

        let weak_registry = Rc::downgrade(&registry);
        let advance = shared.borrow().advance_frontiers.frontier().to_vec();
        let through = shared.borrow().through_frontiers.frontier().to_vec();

        let agent = TraceAgent {
            phantom: ::std::marker::PhantomData,
            trace: shared.clone(),
            queues: weak_registry,
            advance: advance,
            through: through,
        };
        let writer = TraceWriter {
            phantom: ::std::marker::PhantomData,
            trace: Rc::downgrade(&shared),
            queues: registry,
        };

        (agent, writer)
    }

    /// Creates a new listener queue, pre-populated with the trace's existing batches.
    ///
    /// Every existing batch is enqueued as `([T::default()], Some((T::default(), batch)))`.
    /// If the writer's registry is still alive the queue is registered with it so that it
    /// receives future entries; otherwise a final `(empty frontier, None)` entry is
    /// appended.
    pub fn new_listener(&mut self) -> Rc<RefCell<VecDeque<(Vec<T>, Option<(T, <Tr as TraceReader<K,V,T,R>>::Batch)>)>>> where T: Default {
        let mut backlog = VecDeque::new();
        self.trace.borrow_mut().trace.map_batches(|batch| {
            backlog.push_back((vec![T::default()], Some((T::default(), batch.clone()))));
        });

        let listener = Rc::new(RefCell::new(backlog));
        match self.queues.upgrade() {
            Some(registry) => {
                registry.borrow_mut().push(Rc::downgrade(&listener));
            },
            None => {
                listener.borrow_mut().push_back((Vec::new(), None));
            },
        }
        listener
    }
}
impl<K, V, T, R, Tr> TraceAgent<K, V, T, R, Tr>
where T: Lattice+Ord+Clone+'static, Tr: TraceReader<K,V,T,R> {
    /// Builds an "ArrangedSource" operator in `scope` that replays this trace.
    ///
    /// The operator drains a listener queue obtained from `new_listener`. For every entry
    /// it emits the batch (if any) at the entry's time, then replaces its held capabilities
    /// with ones for the times in the entry's frontier. The returned `Arranged` pairs the
    /// resulting stream with a clone of this handle.
    ///
    /// # Panics
    ///
    /// The operator panics if no held capability has a time less-or-equal to a batch time
    /// or to an element of a reported frontier.
    pub fn import<G: Scope<Timestamp=T>>(&mut self, scope: &G) -> Arranged<G, K, V, R, TraceAgent<K, V, T, R, Tr>> where T: Timestamp {
        let queue = self.new_listener();

        let collection = source(scope, "ArrangedSource", move |capability| {
            // Capabilities currently held by the source; replaced after every queue entry.
            let mut capabilities = vec![capability];

            move |output| {
                let mut pending = queue.borrow_mut();
                while let Some((frontier, sent)) = pending.pop_front() {
                    // Ship the batch, if the entry carries one.
                    if let Some((time, batch)) = sent {
                        match capabilities.iter().find(|c| c.time().less_equal(&time)) {
                            Some(cap) => {
                                let delayed = cap.delayed(&time);
                                output.session(&delayed).give(BatchWrapper { item: batch });
                            },
                            None => panic!("failed to find capability for {:?} in {:?}", time, capabilities),
                        }
                    }

                    // Derive one capability per element of the reported frontier.
                    let mut downgraded = Vec::new();
                    for time in frontier.iter() {
                        match capabilities.iter().find(|c| c.time().less_equal(time)) {
                            Some(cap) => downgraded.push(cap.delayed(time)),
                            None => panic!("failed to find capability for {:?} in {:?}", time, capabilities),
                        }
                    }
                    capabilities = downgraded;
                }
            }
        });

        Arranged {
            stream: collection,
            trace: self.clone(),
        }
    }
}
impl<K, V, T, R, Tr> Clone for TraceAgent<K, V, T, R, Tr>
where T: Lattice+Ord+Clone+'static, Tr: TraceReader<K,V,T,R> {
    /// Creates another handle onto the same shared trace.
    ///
    /// Before the copy is built, the shared trace is told about a transition from the empty
    /// frontier to this handle's advance and through frontiers, so the new handle's
    /// frontiers are accounted for separately from the original's.
    fn clone(&self) -> Self {
        {
            let mut shared = self.trace.borrow_mut();
            shared.adjust_advance_frontier(&[], &self.advance[..]);
            shared.adjust_through_frontier(&[], &self.through[..]);
        }

        let trace = self.trace.clone();
        let queues = self.queues.clone();
        let advance = self.advance.clone();
        let through = self.through.clone();

        TraceAgent {
            phantom: ::std::marker::PhantomData,
            trace: trace,
            queues: queues,
            advance: advance,
            through: through,
        }
    }
}
impl<K, V, T, R, Tr> Drop for TraceAgent<K, V, T, R, Tr>
where T: Lattice+Ord+Clone+'static, Tr: TraceReader<K,V,T,R> {
    /// Withdraws this handle's frontiers from the shared trace by reporting a transition
    /// from the recorded advance and through frontiers to the empty frontier.
    fn drop(&mut self) {
        let mut shared = self.trace.borrow_mut();
        shared.adjust_advance_frontier(&self.advance[..], &[]);
        shared.adjust_through_frontier(&self.through[..], &[]);
    }
}
/// A stream of wrapped batches paired with a handle onto the trace they belong to.
pub struct Arranged<G: Scope, K, V, R, T> where G::Timestamp: Lattice+Ord, T: TraceReader<K, V, G::Timestamp, R>+Clone {
/// Stream whose records are batches of type `T::Batch`, each wrapped in a `BatchWrapper`.
pub stream: Stream<G, BatchWrapper<T::Batch>>,
/// Handle onto the trace.
pub trace: T,
}
impl<G: Scope, K, V, R, T> Clone for Arranged<G, K, V, R, T>
where G::Timestamp: Lattice+Ord, T: TraceReader<K, V, G::Timestamp, R>+Clone {
    /// Clones the stream handle and the trace handle, in that order.
    fn clone(&self) -> Self {
        let stream = self.stream.clone();
        let trace = self.trace.clone();
        Arranged { stream: stream, trace: trace }
    }
}
impl<G: Scope, K, V, R, T> Arranged<G, K, V, R, T> where G::Timestamp: Lattice+Ord, T: TraceReader<K, V, G::Timestamp, R>+Clone {
    /// Brings the arrangement into the nested scope `child`.
    ///
    /// The stream is entered into `child` and each batch is re-wrapped via
    /// `BatchEnter::make_from`; a clone of the trace handle is wrapped via
    /// `TraceEnter::make_from`.
    pub fn enter<'a, TInner>(&self, child: &Child<'a, G, TInner>)
        -> Arranged<Child<'a, G, TInner>, K, V, R, TraceEnter<K, V, G::Timestamp, R, T, TInner>>
        where
            T::Batch: Clone,
            K: 'static,
            V: 'static,
            G::Timestamp: Clone+Default+'static,
            TInner: Lattice+Timestamp+Clone+Default+'static,
            R: 'static {
        let entered = self.stream
            .enter(child)
            .map(|wrapper| BatchWrapper { item: BatchEnter::make_from(wrapper.item) });
        let trace = TraceEnter::make_from(self.trace.clone());
        Arranged { stream: entered, trace: trace }
    }

    /// Flattens the arranged batches back into a collection of records.
    ///
    /// An "AsCollection" operator walks every key and value of each received batch and,
    /// for every `(time, diff)` attached to that pair, emits
    /// `(logic(key, val), time, diff)` at the capability the batch arrived with.
    pub fn as_collection<D: Data, L>(&self, logic: L) -> Collection<G, D, R>
        where
            R: Diff,
            T::Batch: Clone+'static,
            K: Clone, V: Clone,
            L: Fn(&K, &V) -> D+'static,
    {
        let updates = self.stream.unary_stream(Pipeline, "AsCollection", move |input, output| {
            input.for_each(|capability, data| {
                let mut session = output.session(&capability);
                for BatchWrapper { item: batch } in data.drain(..) {
                    let mut cursor = batch.cursor();
                    while cursor.key_valid(&batch) {
                        let key: &K = cursor.key(&batch);
                        while cursor.val_valid(&batch) {
                            let val: &V = cursor.val(&batch);
                            cursor.map_times(&batch, |t, r| {
                                session.give((logic(key, val), t.clone(), r.clone()));
                            });
                            cursor.step_val(&batch);
                        }
                        cursor.step_key(&batch);
                    }
                }
            });
        });
        updates.as_collection()
    }
}
/// Answers `(key, time)` queries against `trace`.
///
/// Queries are exchanged by the hash of their key and stashed per capability. When a stashed
/// query is answered, every value under its key is visited, the diffs at times less-or-equal
/// to the query time are summed, and `(key, val, time, count)` is emitted for each value
/// whose sum is non-zero.
///
/// After each scheduling the trace handle is advanced to the current input frontier, and it
/// is dropped once the input frontier is empty and no queries remain stashed.
///
/// * `queries` - stream of `(key, time)` lookups.
/// * `trace` - handle used for the lookups; owned by the operator.
pub fn query<G: Scope, K, V, R, T>(queries: &Stream<G, (K, G::Timestamp)>, mut trace: T) -> Stream<G, (K, V, G::Timestamp, R)>
where
    K: Data+Hashable,
    V: Data,
    G::Timestamp: Data+Lattice+Ord,
    R: Diff,
    T: TraceReader<K, V, G::Timestamp, R>+Clone+'static,
{
    // Sets this handle's distinguish frontier to the empty frontier.
    // NOTE(review): presumably this signals that `cursor_through` will not be used — confirm.
    trace.distinguish_since(&[]);
    let mut trace = Some(trace);
    // Pending queries per capability, as `(key, time, marker)`; the marker is set to 0 once
    // the query has been answered.
    let mut stash = ::std::collections::HashMap::new();
    let mut frontier = Antichain::new();
    let exchange = Exchange::new(move |update: &(K,G::Timestamp)| update.0.hashed().as_u64());
    queries.unary_frontier(exchange, "TraceQuery", move |_capability|
        move |input, output| {
            input.for_each(|capability, data|
                stash.entry(capability)
                     .or_insert_with(Vec::new)
                     .extend(data.drain(..).map(|(k,t)| (k,t,1)))
            );
            if let Some(ref mut trace) = trace {
                // Snapshot of the trace's advance frontier, i.e. the value installed by the
                // `advance_by` call at the end of the previous scheduling (or the handle's
                // initial frontier).
                // NOTE(review): answering is gated on this snapshot rather than on the
                // current input frontier — confirm this is intended.
                frontier.clear();
                for time in trace.advance_frontier().iter() {
                    frontier.insert(time.clone());
                }
                for (capability, prefixes) in stash.iter_mut() {
                    if !frontier.less_equal(capability.time()) {
                        let mut session = output.session(capability);
                        // Sort by key so the cursor is sought in key order.
                        prefixes.sort_by(|x,y| x.0.cmp(&y.0));
                        let (mut cursor, storage) = trace.cursor();
                        for &mut (ref key, ref time, ref mut cnt) in prefixes.iter_mut() {
                            if !frontier.less_equal(time) {
                                cursor.seek_key(&storage, key);
                                if cursor.get_key(&storage) == Some(key) {
                                    while let Some(val) = cursor.get_val(&storage) {
                                        // Accumulate the diffs at times `<=` the query time.
                                        let mut count = R::zero();
                                        cursor.map_times(&storage, |t, d| if t.less_equal(time) {
                                            count = count + d;
                                        });
                                        if !count.is_zero() {
                                            session.give((key.clone(), val.clone(), time.clone(), count));
                                        }
                                        cursor.step_val(&storage);
                                    }
                                }
                                // Mark as answered, whether or not the key was present.
                                *cnt = 0;
                            }
                        }
                        prefixes.retain(|ptd| ptd.2 != 0);
                    }
                }
            }
            // Drop capabilities that have no pending queries left.
            stash.retain(|_,prefixes| !prefixes.is_empty());
            if let Some(ref mut trace) = trace {
                trace.advance_by(input.frontier().frontier());
            }
            if input.frontier().is_empty() && stash.is_empty() {
                trace = None;
            }
        }
    )
}
/// Types that can be arranged into a trace of type `T`.
pub trait Arrange<G: Scope, K, V, R: Diff, T>
where
G::Timestamp: Lattice,
T: Trace<K, V, G::Timestamp, R>+'static,
T::Batch: Batch<K, V, G::Timestamp, R>
{
/// Arranges `self`, returning a stream of batches together with a `TraceAgent` handle.
///
/// `empty_trace` is the trace instance offered for the arrangement; implementations in this
/// file either populate it or ignore it.
fn arrange(&self, empty_trace: T) -> Arranged<G, K, V, R, TraceAgent<K, V, G::Timestamp, R, T>>;
}
/// Arranges a collection of `(key, value)` pairs.
impl<G: Scope, K: Data+Hashable, V: Data, R: Diff, T> Arrange<G, K, V, R, T> for Collection<G, (K, V), R>
where
G::Timestamp: Lattice+Ord,
T: Trace<K, V, G::Timestamp, R>+'static,
T::Batch: Batch<K, V, G::Timestamp, R> {
/// Builds an "Arrange" operator that exchanges updates by the hash of their key, feeds them
/// to a batcher, and publishes each sealed batch both through a `TraceWriter` over
/// `empty_trace` and on the output stream.
fn arrange(&self, empty_trace: T) -> Arranged<G, K, V, R, TraceAgent<K, V, G::Timestamp, R, T>> {
// `reader` is returned inside the `Arranged`; `writer` is moved into the operator closure.
let (reader, mut writer) = TraceAgent::new(empty_trace);
let mut batcher = <T::Batch as Batch<K,V,G::Timestamp,R>>::Batcher::new();
// Capabilities retained from received messages and later rebuilt from the batcher's frontier.
let mut capabilities = Antichain::<Capability<G::Timestamp>>::new();
let exchange = Exchange::new(move |update: &((K,V),G::Timestamp,R)| (update.0).0.hashed().as_u64());
let stream = self.inner.unary_frontier(exchange, "Arrange", move |_capability|
move |input, output| {
// Retain each message's capability and hand its updates to the batcher.
input.for_each(|cap, data| {
capabilities.insert(cap);
batcher.push_batch(data.deref_mut());
});
// Batches are sealed only if some held capability's time is no longer covered by the input frontier.
if capabilities.elements().iter().any(|c| !input.frontier().less_equal(c.time())) {
let mut upper = Antichain::new();
for (index, capability) in capabilities.elements().iter().enumerate() {
if !input.frontier().less_equal(capability.time()) {
// The batch's upper bound: the input frontier plus the times of the capabilities after this one.
upper.clear();
for time in input.frontier().frontier().iter() {
upper.insert(time.clone());
}
for other_capability in &capabilities.elements()[(index + 1) .. ] {
upper.insert(other_capability.time().clone());
}
// Seal the batch, publish it via the writer, then send it downstream at this capability.
let batch = batcher.seal(upper.elements());
writer.seal(upper.elements(), Some((capability.time().clone(), batch.clone())));
output.session(&capabilities.elements()[index]).give(BatchWrapper { item: batch });
}
}
// Rebuild the held capabilities from the times reported by `batcher.frontier()`.
let mut new_capabilities = Antichain::new();
for time in batcher.frontier() {
if let Some(capability) = capabilities.elements().iter().find(|c| c.time().less_equal(time)) {
new_capabilities.insert(capability.delayed(time));
}
else {
panic!("failed to find capability");
}
}
capabilities = new_capabilities;
}
// The current input frontier is reported on every scheduling, even when no batch was sealed.
writer.seal(input.frontier().frontier(), None);
});
Arranged { stream: stream, trace: reader }
}
}
impl<G: Scope, K: Data+Hashable, R: Diff, T> Arrange<G, K, (), R, T> for Collection<G, K, R>
where
    G::Timestamp: Lattice+Ord,
    T: Trace<K, (), G::Timestamp, R>+'static,
    T::Batch: Batch<K, (), G::Timestamp, R> {
    /// Arranges a collection of bare keys by pairing every key with `()` and arranging the
    /// resulting `(key, ())` collection into `empty_trace`.
    fn arrange(&self, empty_trace: T) -> Arranged<G, K, (), R, TraceAgent<K, (), G::Timestamp, R, T>> {
        let keyed = self.map(|key| (key, ()));
        keyed.arrange(empty_trace)
    }
}
impl<G, K, V, R, T> Arrange<G, K, V, R, T> for Arranged<G, K, V, R, TraceAgent<K, V, G::Timestamp, R, T>>
where
    G: Scope,
    G::Timestamp: Lattice,
    R: Diff,
    T: Trace<K, V, G::Timestamp, R>+Clone+'static,
    T::Batch: Batch<K, V, G::Timestamp, R>
{
    /// Arranging an existing arrangement returns a clone of it; the supplied trace is
    /// ignored.
    fn arrange(&self, _: T) -> Arranged<G, K, V, R, TraceAgent<K, V, G::Timestamp, R, T>> {
        Clone::clone(self)
    }
}
/// Convenience trait for arranging `(key, value)` data using the default value trace type.
pub trait ArrangeByKey<G: Scope, K: Data+Hashable, V: Data, R: Diff>
where G::Timestamp: Lattice+Ord {
/// Arranges `self` into a `DefaultValTrace`, returning the batch stream and a trace handle.
fn arrange_by_key(&self) -> Arranged<G, K, V, R, TraceAgent<K, V, G::Timestamp, R, DefaultValTrace<K, V, G::Timestamp, R>>>;
}
impl<G: Scope, K: Data+Hashable, V: Data, R: Diff> ArrangeByKey<G, K, V, R> for Collection<G, (K,V), R>
where G::Timestamp: Lattice+Ord {
    /// Arranges the `(key, value)` collection into a freshly created `DefaultValTrace`.
    fn arrange_by_key(&self) -> Arranged<G, K, V, R, TraceAgent<K, V, G::Timestamp, R, DefaultValTrace<K, V, G::Timestamp, R>>> {
        let empty_trace = DefaultValTrace::new();
        self.arrange(empty_trace)
    }
}
/// Convenience trait for arranging bare keys using the default key trace type.
pub trait ArrangeBySelf<G: Scope, K: Data+Hashable, R: Diff>
where G::Timestamp: Lattice+Ord {
/// Arranges `self` into a `DefaultKeyTrace` with `()` as the value type, returning the batch
/// stream and a trace handle.
fn arrange_by_self(&self) -> Arranged<G, K, (), R, TraceAgent<K, (), G::Timestamp, R, DefaultKeyTrace<K, G::Timestamp, R>>>;
}
impl<G: Scope, K: Data+Hashable, R: Diff> ArrangeBySelf<G, K, R> for Collection<G, K, R>
where G::Timestamp: Lattice+Ord {
    /// Pairs every key with `()` and arranges the result into a freshly created
    /// `DefaultKeyTrace`.
    fn arrange_by_self(&self) -> Arranged<G, K, (), R, TraceAgent<K, (), G::Timestamp, R, DefaultKeyTrace<K, G::Timestamp, R>>> {
        let keyed = self.map(|key| (key, ()));
        keyed.arrange(DefaultKeyTrace::new())
    }
}