use crate::progress::frontier::{AntichainRef, MutableAntichain};
use crate::progress::Timestamp;
use crate::dataflow::operators::Capability;
/// Pairs a slice of input frontiers with a mutably borrowed `FrontierNotificator`.
///
/// The methods implemented for this type forward to the wrapped notificator,
/// passing the stored frontiers along on each call.
#[derive(Debug)]
pub struct Notificator<'a, T: Timestamp> {
/// Frontiers, indexed by input position (see `frontier`).
frontiers: &'a [&'a MutableAntichain<T>],
/// Notificator that stores the pending and available notification requests.
inner: &'a mut FrontierNotificator<T>,
}
impl<'a, T: Timestamp> Notificator<'a, T> {
    /// Builds a `Notificator` over `frontiers`, backed by `inner`.
    ///
    /// Before the value is returned, `inner` is asked to make available any
    /// pending requests that `frontiers` no longer hold back.
    pub fn new(
        frontiers: &'a [&'a MutableAntichain<T>],
        inner: &'a mut FrontierNotificator<T>,
    ) -> Self {
        inner.make_available(frontiers);
        Self { frontiers, inner }
    }

    /// Returns the frontier of the input at position `input`.
    ///
    /// # Panics
    ///
    /// Panics if `input` is not a valid index into the stored frontiers.
    pub fn frontier(&self, input: usize) -> AntichainRef<'_, T> {
        let antichain = self.frontiers[input];
        antichain.frontier()
    }

    /// Requests a notification at the time held by `cap`.
    ///
    /// The request is forwarded together with the stored frontiers, so the
    /// wrapped notificator decides immediately whether it is available or
    /// has to wait as pending.
    #[inline]
    pub fn notify_at(&mut self, cap: Capability<T>) {
        let frontiers = self.frontiers;
        self.inner.notify_at_frontiered(cap, frontiers);
    }

    /// Calls `logic` for every notification produced by `next`, until `next`
    /// returns `None`.
    ///
    /// `logic` receives the capability, the count reported for it, and this
    /// notificator, so that further notifications can be requested from
    /// inside the callback.
    #[inline]
    pub fn for_each<F: FnMut(Capability<T>, u64, &mut Notificator<T>)>(&mut self, mut logic: F) {
        while let Some(notification) = self.next() {
            let (cap, count) = notification;
            logic(cap, count, self);
        }
    }
}
impl<T: Timestamp> Iterator for Notificator<'_, T> {
    type Item = (Capability<T>, u64);

    /// Yields the next available notification as a capability and its count,
    /// or `None` when the wrapped notificator has nothing to deliver under
    /// the stored frontiers.
    #[inline]
    fn next(&mut self) -> Option<Self::Item> {
        let frontiers = self.frontiers;
        self.inner.next_count(frontiers)
    }
}
// Checks that no notification is delivered while the frontier is at (0,0), and
// that after each frontier change the notifications come out in exactly the
// asserted order, including one requested in the middle of a session.
#[test]
fn notificator_delivers_notifications_in_topo_order() {
use std::rc::Rc;
use std::cell::RefCell;
use crate::progress::ChangeBatch;
use crate::progress::frontier::MutableAntichain;
use crate::order::Product;
use crate::dataflow::operators::capability::Capability;
// The frontier starts out with the single element (0,0).
let mut frontier = MutableAntichain::from_elem(Product::new(0, 0));
// Capability at (0,0) from which all later capabilities are derived via `delayed`.
let root_capability = Capability::new(Product::new(0,0), Rc::new(RefCell::new(ChangeBatch::new())));
// Requested times, deliberately unordered; (1,1) and (5,4) each appear twice.
let times = [
Product::new(3, 5),
Product::new(5, 4),
Product::new(1, 2),
Product::new(1, 1),
Product::new(1, 1),
Product::new(5, 4),
Product::new(6, 0),
Product::new(6, 2),
Product::new(5, 8),
];
// Build a notificator whose pending list holds one request per entry of `times`.
let mut frontier_notificator = FrontierNotificator::from(times.iter().map(|t| root_capability.delayed(t)));
// With the frontier still at (0,0), nothing is expected to be delivered.
assert!(frontier_notificator.monotonic(&[&frontier]).next().is_none());
// Presumably retracts (0,0) and adds (5,7) and (6,1) to the frontier (signed counts).
frontier.update_iter(vec![(Product::new(0,0),-1), (Product::new(5,7), 1), (Product::new(6,1), 1)]);
{
let frontiers = [&frontier];
let mut notificator = frontier_notificator.monotonic(&frontiers);
// Each duplicated time is expected only once; (5,8) and (6,2) are not expected yet.
assert_eq!(notificator.next().unwrap().0.time(), &Product::new(1,1));
assert_eq!(notificator.next().unwrap().0.time(), &Product::new(1,2));
assert_eq!(notificator.next().unwrap().0.time(), &Product::new(3,5));
assert_eq!(notificator.next().unwrap().0.time(), &Product::new(5,4));
assert_eq!(notificator.next().unwrap().0.time(), &Product::new(6,0));
assert_eq!(notificator.next(), None);
}
// Presumably replaces the frontier elements (5,7) and (6,1) with the single element (6,10).
frontier.update_iter(vec![(Product::new(5,7), -1), (Product::new(6,1), -1), (Product::new(6,10), 1)]);
{
let frontiers = [&frontier];
let mut notificator = frontier_notificator.monotonic(&frontiers);
// (5,8) is expected first, even though (6,0) was already delivered in the previous session.
assert_eq!(notificator.next().unwrap().0.time(), &Product::new(5, 8));
// Request a new notification in the middle of the session.
notificator.notify_at(root_capability.delayed(&Product::new(5,9)));
// The new request at (5,9) is expected before the remaining (6,2), then nothing.
assert_eq!(notificator.next().unwrap().0.time(), &Product::new(5,9));
assert_eq!(notificator.next().unwrap().0.time(), &Product::new(6,2));
assert_eq!(notificator.next(), None);
}
}
/// Holds notification requests, split into those still pending and those
/// ready to be handed out.
#[derive(Debug)]
pub struct FrontierNotificator<T: Timestamp> {
/// Requested capabilities with a request count, not yet moved to `available`.
pending: Vec<(Capability<T>, u64)>,
/// Capabilities ready for delivery. `OrderReversed` inverts the comparison of
/// times, so this max-heap pops the smallest time first.
available: ::std::collections::BinaryHeap<OrderReversed<T>>,
}
impl<T: Timestamp> Default for FrontierNotificator<T> {
    /// Creates a notificator with no pending and no available requests.
    fn default() -> Self {
        let pending = Vec::new();
        let available = ::std::collections::BinaryHeap::new();
        Self { pending, available }
    }
}
impl<T: Timestamp> FrontierNotificator<T> {
    /// Creates a notificator whose pending list holds one request, with count
    /// one, for every capability produced by `iter`.
    ///
    /// This is an inherent associated function, not an implementation of the
    /// `From` trait.
    pub fn from<I: IntoIterator<Item=Capability<T>>>(iter: I) -> Self {
        FrontierNotificator {
            pending: iter.into_iter().map(|cap| (cap, 1)).collect(),
            available: ::std::collections::BinaryHeap::new(),
        }
    }

    /// Requests a notification at the time held by `cap`.
    ///
    /// The request is always queued as pending with a count of one; it is
    /// only considered for delivery by a later call to `make_available`
    /// (directly or through `next_count`).
    #[inline]
    pub fn notify_at(&mut self, cap: Capability<T>) {
        self.pending.push((cap, 1));
    }

    /// Requests a notification at the time held by `cap`, consulting
    /// `frontiers` right away.
    ///
    /// If no frontier in `frontiers` reports `less_equal` for the time, the
    /// request goes straight to the available heap; otherwise it is queued
    /// as pending. Either way it carries a count of one.
    #[inline]
    pub fn notify_at_frontiered<'a>(&mut self, cap: Capability<T>, frontiers: &'a [&'a MutableAntichain<T>]) {
        if frontiers.iter().all(|f| !f.less_equal(cap.time())) {
            self.available.push(OrderReversed::new(cap, 1));
        } else {
            self.pending.push((cap, 1));
        }
    }

    /// Moves every pending request that `frontiers` no longer hold back into
    /// the available heap.
    ///
    /// Pending requests are first sorted by time and requests with equal
    /// times are merged by summing their counts. A request is released when
    /// no frontier in `frontiers` reports `less_equal` for its time.
    pub fn make_available<'a>(&mut self, frontiers: &'a [&'a MutableAntichain<T>]) {
        if self.pending.is_empty() {
            return;
        }

        // Sorting brings equal times next to each other so they can be merged.
        self.pending.sort_unstable_by(|x, y| x.0.time().cmp(y.0.time()));

        // Fold each count into the following entry with the same time; the
        // last entry of a run ends up with the total, earlier ones with zero.
        for i in 1..self.pending.len() {
            if self.pending[i - 1].0.time() == self.pending[i].0.time() {
                self.pending[i].1 += self.pending[i - 1].1;
                self.pending[i - 1].1 = 0;
            }
        }
        self.pending.retain(|x| x.1 > 0);

        // A zero count marks entries that were handed to `available`, so the
        // final `retain` can drop them from `pending`.
        for entry in self.pending.iter_mut() {
            if frontiers.iter().all(|f| !f.less_equal(entry.0.time())) {
                // NOTE: this clones the capability; the original is dropped by `retain` below.
                self.available.push(OrderReversed::new(entry.0.clone(), entry.1));
                entry.1 = 0;
            }
        }
        self.pending.retain(|x| x.1 > 0);
    }

    /// Returns the next available capability together with the number of
    /// requests merged into it, or `None` if nothing is available.
    ///
    /// When the available heap is empty, `make_available` is called with
    /// `frontiers` first. Consecutive heap entries holding a capability equal
    /// to the returned one are consumed and their counts added up.
    #[inline]
    pub fn next_count<'a>(&mut self, frontiers: &'a [&'a MutableAntichain<T>]) -> Option<(Capability<T>, u64)> {
        if self.available.is_empty() {
            self.make_available(frontiers);
        }

        let front = self.available.pop()?;
        let mut count = front.value;
        // Compare the capabilities only, not whole heap entries: the derived
        // equality on `OrderReversed` also compares the counts, which would
        // leave requests for the same capability unmerged whenever their
        // counts differ.
        while self.available.peek().map_or(false, |next| next.element == front.element) {
            if let Some(next) = self.available.pop() {
                count += next.value;
            }
        }
        Some((front.element, count))
    }

    /// Returns the next available capability, discarding the count reported
    /// by `next_count`.
    #[inline]
    pub fn next<'a>(&mut self, frontiers: &'a [&'a MutableAntichain<T>]) -> Option<Capability<T>> {
        self.next_count(frontiers).map(|(cap, _)| cap)
    }

    /// Calls `logic` for every capability returned by `next`, until `next`
    /// returns `None`.
    ///
    /// `make_available` is invoked once up front. `logic` also receives this
    /// notificator, so further notifications can be requested from inside
    /// the callback.
    #[inline]
    pub fn for_each<'a, F: FnMut(Capability<T>, &mut FrontierNotificator<T>)>(&mut self, frontiers: &'a [&'a MutableAntichain<T>], mut logic: F) {
        self.make_available(frontiers);
        while let Some(cap) = self.next(frontiers) {
            logic(cap, self);
        }
    }

    /// Wraps this notificator and `frontiers` in a `Notificator`, which
    /// remembers the frontiers for subsequent calls.
    #[inline]
    pub fn monotonic<'a>(&'a mut self, frontiers: &'a [&'a MutableAntichain<T>]) -> Notificator<'a, T> {
        Notificator::new(frontiers, self)
    }

    /// Iterates over the pending requests as pairs of capability and count.
    ///
    /// Requests already moved to the available heap are not included.
    pub fn pending(&self) -> ::std::slice::Iter<'_, (Capability<T>, u64)> {
        self.pending.iter()
    }
}
/// Heap entry pairing a capability with a count.
///
/// Equality is derived and therefore compares both fields, whereas the `Ord`
/// implementation for this type looks only at the capability's time.
#[derive(Debug, PartialEq, Eq)]
struct OrderReversed<T: Timestamp> {
/// The capability stored in this entry.
element: Capability<T>,
/// The count stored alongside the capability.
value: u64,
}
impl<T: Timestamp> OrderReversed<T> {
    /// Bundles `element` and `value` into a heap entry.
    fn new(element: Capability<T>, value: u64) -> Self {
        Self { element, value }
    }
}
impl<T: Timestamp> PartialOrd for OrderReversed<T> {
    /// Always returns `Some`, deferring to the total order given by `Ord::cmp`.
    fn partial_cmp(&self, other: &Self) -> Option<::std::cmp::Ordering> {
        let ordering = Ord::cmp(self, other);
        Some(ordering)
    }
}
impl<T: Timestamp> Ord for OrderReversed<T> {
    /// Compares two entries by the times of their capabilities with the
    /// operands swapped, so the entry with the smaller time compares as greater.
    fn cmp(&self, other: &Self) -> ::std::cmp::Ordering {
        let mine = self.element.time();
        let theirs = other.element.time();
        // `other` is the receiver on purpose: this is what reverses the order.
        theirs.cmp(mine)
    }
}