use std::collections::VecDeque;
use progress::frontier::MutableAntichain;
use progress::Timestamp;
use dataflow::operators::Capability;
pub struct Notificator<'a, T: Timestamp> {
frontiers: &'a [&'a MutableAntichain<T>],
inner: &'a mut FrontierNotificator<T>,
}
impl<'a, T: Timestamp> Notificator<'a, T> {
pub fn new(
frontiers: &'a [&'a MutableAntichain<T>],
inner: &'a mut FrontierNotificator<T>) -> Notificator<'a, T> {
Notificator {
frontiers: frontiers,
inner: inner,
}
}
pub fn frontier(&self, input: usize) -> &[T] {
self.frontiers[input].frontier()
}
#[inline]
pub fn notify_at(&mut self, cap: Capability<T>) {
self.inner.notify_at(cap);
}
#[inline]
pub fn for_each<F: FnMut(Capability<T>, u64, &mut Notificator<T>)>(&mut self, mut logic: F) {
while let Some((cap, count)) = self.next() {
::logging::log(&::logging::GUARDED_PROGRESS, true);
logic(cap, count, self);
::logging::log(&::logging::GUARDED_PROGRESS, false);
}
}
}
impl<'a, T: Timestamp> Iterator for Notificator<'a, T> {
type Item = (Capability<T>, u64);
#[inline]
fn next(&mut self) -> Option<(Capability<T>, u64)> {
self.inner.next(self.frontiers).map(|x| (x,1))
}
}
trait DrainIntoIf<T> {
fn drain_into_if<P>(&mut self, target: &mut Vec<T>, p: P) -> () where P: FnMut(&T) -> bool;
}
impl<T> DrainIntoIf<T> for Vec<T> {
fn drain_into_if<P>(&mut self, target: &mut Vec<T>, mut p: P) -> () where P: FnMut(&T) -> bool {
let mut i = 0;
while i < self.len() {
let matches = {
let v = &mut **self;
p(&v[i])
};
if matches {
target.push(self.swap_remove(i));
} else {
i += 1;
}
}
}
}
#[test]
fn drain_into_if_behaves_correctly() {
let mut v = vec![3, 10, 4, 5, 13, 7, 2, 1];
let mut v1 = Vec::new();
v.drain_into_if(&mut v1, |x| x >= &5);
v.sort();
v1.sort();
assert!(v == vec![1, 2, 3, 4]);
assert!(v1 == vec![5, 7, 10, 13]);
}
#[test]
fn notificator_delivers_notifications_in_topo_order() {
use std::rc::Rc;
use std::cell::RefCell;
use progress::ChangeBatch;
use progress::frontier::MutableAntichain;
use progress::nested::product::Product;
use dataflow::operators::capability::mint as mint_capability;
let mut frontier_notificator = FrontierNotificator::new();
let mut frontier = MutableAntichain::new_bottom(Product::new(0, 0));
let internal_changes = Rc::new(RefCell::new(ChangeBatch::new()));
let times = vec![
Product::new(3, 5),
Product::new(5, 4),
Product::new(1, 2),
Product::new(1, 1),
Product::new(1, 1),
Product::new(5, 4),
Product::new(6, 0),
Product::new(5, 8),
].into_iter().map(|ts| mint_capability(ts, internal_changes.clone()));
for t in times {
frontier_notificator.notify_at(t);
}
fn check_notifications<T: Timestamp>(
notificator: &mut Notificator<T>,
expected_counts: Vec<T>) {
let mut notified = notificator.by_ref().map(|(t, _)| t.time().clone()).collect::<Vec<_>>();
notified.sort();
assert_eq!(notified, expected_counts);
}
frontier.update_iter(vec![(Product::new(0,0),-1), (Product::new(5,7), 1), (Product::new(6,0), 1)]);
check_notifications(&mut Notificator::new(&[&frontier], &mut frontier_notificator), vec![
Product::new(1, 1),
Product::new(1, 2),
Product::new(3, 5),
Product::new(5, 4),
]);
frontier.update_iter(vec![(Product::new(5,7), -1), (Product::new(6,0), -1), (Product::new(6,10), 1)]);
check_notifications(&mut Notificator::new(&[&frontier], &mut frontier_notificator), vec![
Product::new(5, 8),
Product::new(6, 0),
]);
}
pub struct FrontierNotificator<T: Timestamp> {
pending: Vec<(Capability<T>, u64)>,
available: VecDeque<Capability<T>>,
}
impl<T: Timestamp> FrontierNotificator<T> {
pub fn new() -> FrontierNotificator<T> {
FrontierNotificator {
pending: Vec::new(),
available: VecDeque::new(),
}
}
#[inline]
pub fn notify_at(&mut self, cap: Capability<T>) {
self.pending.push((cap, 1));
}
pub fn drain<'a>(&'a mut self, frontiers: &'a [&'a MutableAntichain<T>]) -> ::std::collections::vec_deque::Drain<'a, Capability<T>> {
self.make_available(frontiers);
self.available.drain(..)
}
fn make_available<'a>(&mut self, frontiers: &'a [&'a MutableAntichain<T>]) {
if !self.pending.is_empty() {
self.pending.sort_by(|x,y| x.0.time().cmp(y.0.time()));
for i in 0 .. self.pending.len() - 1 {
if self.pending[i].0.time() == self.pending[i+1].0.time() {
self.pending[i+1].1 += self.pending[i].1;
self.pending[i].1 = 0;
}
}
self.pending.retain(|x| x.1 > 0);
for i in 0 .. self.pending.len() {
if frontiers.iter().all(|f| !f.less_equal(&self.pending[i].0)) {
self.available.push_back(self.pending[i].0.clone());
self.pending[i].1 = 0;
}
}
self.pending.retain(|x| x.1 > 0);
}
}
#[inline]
fn next<'a>(&mut self, frontiers: &'a [&'a MutableAntichain<T>]) -> Option<Capability<T>> {
if self.available.is_empty() {
self.make_available(frontiers);
}
self.available.pop_front()
}
#[inline]
pub fn for_each<'a, F: FnMut(Capability<T>, &mut FrontierNotificator<T>)>(&mut self, frontiers: &'a [&'a MutableAntichain<T>], mut logic: F) {
self.make_available(frontiers);
while let Some(cap) = self.available.pop_front() {
::logging::log(&::logging::GUARDED_PROGRESS, true);
logic(cap, self);
::logging::log(&::logging::GUARDED_PROGRESS, false);
}
}
}