use std::time::Duration;
use abomonation_derive::Abomonation;
use serde::{Deserialize, Serialize};
/// A message in a captured stream: either a batch of updates or a progress statement.
///
/// `D`, `T` and `R` are the data, time and difference types of the updates.
#[derive(Ord, PartialOrd, Eq, PartialEq, Debug, Clone, Serialize, Deserialize, Abomonation)]
pub enum Message<D, T, R> {
/// A batch of `(data, time, diff)` update triples.
Updates(Vec<(D, T, R)>),
/// A statement about the updates in an interval of times; see [`Progress`].
Progress(Progress<T>),
}
/// A progress statement: bounds on an interval of times plus per-time update counts.
///
/// As produced by `sink::build` in this file, `lower` is the previously announced frontier,
/// `upper` is the newly observed frontier, and `counts` covers the times that are no longer
/// beyond `upper`.
#[derive(Ord, PartialOrd, Eq, PartialEq, Debug, Clone, Serialize, Deserialize, Abomonation)]
pub struct Progress<T> {
/// Elements of the frontier at which the described interval starts.
pub lower: Vec<T>,
/// Elements of the frontier at which the described interval ends.
pub upper: Vec<T>,
/// `(time, count)` pairs: how many updates were recorded at each time.
pub counts: Vec<(T, usize)>,
}
/// An iterator adapter that stops producing items once a time budget has been spent.
///
/// The budget starts counting at the first call to `next` after construction or after the
/// adapter last returned `None`. Returning `None` therefore means either "budget spent" or
/// "inner iterator returned `None`"; in both cases the timer is cleared so the next call
/// starts a fresh budget.
pub struct YieldingIter<I> {
    /// Instant at which the current budget started, or `None` if no budget is running.
    start: Option<std::time::Instant>,
    /// Length of the budget.
    after: std::time::Duration,
    /// The wrapped iterator.
    iter: I,
}

impl<I> YieldingIter<I> {
    /// Wraps `iter` so that it yields (returns `None`) once more than `yield_after` has
    /// elapsed since the current budget started.
    pub fn new_from(iter: I, yield_after: std::time::Duration) -> Self {
        Self {
            iter,
            after: yield_after,
            start: None,
        }
    }
}

impl<I: Iterator> Iterator for YieldingIter<I> {
    type Item = I::Item;

    fn next(&mut self) -> Option<Self::Item> {
        // Start a budget if none is running, and read back when it began.
        let began = *self.start.get_or_insert_with(std::time::Instant::now);

        // Only consult the inner iterator while the budget has not been exceeded.
        let item = if began.elapsed() > self.after {
            None
        } else {
            self.iter.next()
        };

        // Whatever the reason for stopping, the next call begins a new budget.
        if item.is_none() {
            self.start = None;
        }
        item
    }
}
/// A destination to which items of type `T` are offered one at a time.
///
/// The contract below is described as it is relied on by `sink::build` in this file.
pub trait Writer<T> {
/// Offers `item` to the writer.
///
/// `sink::build` treats `Some(duration)` as "not accepted yet": it keeps the item at the
/// front of its queue and re-activates itself after `duration`. It treats `None` as
/// "accepted" and moves on to the next item.
fn poll(&mut self, item: &T) -> Option<Duration>;
/// Reports whether the writer is done.
///
/// `sink::build` reports outstanding work for as long as this returns `false`.
fn done(&self) -> bool;
}
pub mod iterator {
use super::{Message, Progress};
use crate::lattice::Lattice;
use std::hash::Hash;
use timely::order::PartialOrder;
use timely::progress::{
frontier::{AntichainRef, MutableAntichain},
Antichain,
Timestamp,
};
/// An iterator adapter that turns a stream of `Message`s into `(updates, frontier)` pairs.
///
/// Messages may arrive duplicated and out of order; the adapter buffers updates and progress
/// statements and releases updates once the frontier derived from both has moved past them.
pub struct Iter<I, D, T, R>
where
    I: Iterator<Item = Message<D, T, R>>,
    D: Hash + Eq,
    // `Ord` already implies `Eq`, so a single predicate on `T` is sufficient.
    T: Hash + Ord + Lattice + Clone,
    R: Hash + Eq,
{
    /// Source of messages.
    iterator: I,
    /// Updates received but not yet published; also consulted to discard duplicates.
    updates: std::collections::HashSet<(D, T, R)>,
    /// The frontier most recently returned from `next`.
    reported_frontier: Antichain<T>,
    /// The frontier reached by the progress statements applied so far.
    progress_frontier: Antichain<T>,
    /// Per-time counts: raised by counts in progress statements, lowered by received updates.
    messages_frontier: MutableAntichain<T>,
    /// Progress statements received but not yet applied.
    progress_queue: Vec<Progress<T>>,
}
/// Reads messages until the reportable frontier changes, then returns the updates that the
/// new frontier has moved past together with that frontier.
impl<D, T, R, I> Iterator for Iter<I, D, T, R>
where
I: Iterator<Item = Message<D, T, R>>,
T: Hash + Ord + Lattice + Clone,
D: Hash + Eq + Clone,
R: Hash + Eq + Clone,
{
/// Updates that are not beyond the new frontier, paired with that frontier.
type Item = (Vec<(D, T, R)>, Antichain<T>);
/// Consumes messages from the wrapped iterator until the lower bound of the progress
/// frontier and the outstanding message counts differs from the last reported frontier.
///
/// Returns `None` when the wrapped iterator returns `None` before such a change occurs.
fn next(&mut self) -> Option<Self::Item> {
while let Some(message) = self.iterator.next() {
match message {
Message::Updates(mut updates) => {
// Keep only updates at times beyond the reported frontier that we have not
// already buffered; everything else is treated as a duplicate.
updates.retain(|dtr| {
self.reported_frontier.less_equal(&dtr.1) && !self.updates.contains(dtr)
});
// Each retained update lowers the count recorded for its time by one.
self.messages_frontier
.update_iter(updates.iter().map(|(_, t, _)| (t.clone(), -1)));
// Buffer the retained updates until the frontier passes them.
self.updates.extend(updates.into_iter());
}
Message::Progress(progress) => {
// Queue the statement; whether it can be applied depends on the current
// progress frontier, which is checked in the loop below.
self.progress_queue.push(progress);
}
}
// Apply queued statements for as long as one has a `lower` that is `less_equal` the
// current progress frontier (antichain order).
while let Some(position) = self.progress_queue.iter().position(|p| {
<_ as PartialOrder>::less_equal(
&AntichainRef::new(&p.lower),
&self.progress_frontier.borrow(),
)
}) {
let mut progress = self.progress_queue.remove(position);
// Drop counts for times that are not beyond the current progress frontier.
progress
.counts
.retain(|(time, _count)| self.progress_frontier.less_equal(time));
// Raise the per-time counts by the remaining announced counts.
// NOTE(review): `c as i64` assumes counts fit in an `i64`.
self.messages_frontier
.update_iter(progress.counts.drain(..).map(|(t, c)| (t, c as i64)));
// The new progress frontier is built from the pairwise joins of the statement's
// `upper` with the current progress frontier.
let mut new_frontier = Antichain::new();
for time1 in progress.upper {
for time2 in self.progress_frontier.elements() {
new_frontier.insert(time1.join(time2));
}
}
// Discard queued statements whose `upper` is `less_equal` the new frontier.
self.progress_queue.retain(|p| {
!<_ as PartialOrder>::less_equal(
&AntichainRef::new(&p.upper),
&new_frontier.borrow(),
)
});
self.progress_frontier = new_frontier;
}
// Combine the progress frontier with the frontier of the outstanding counts.
let mut lower_bound = self.progress_frontier.clone();
lower_bound.extend(self.messages_frontier.frontier().iter().cloned());
if lower_bound != self.reported_frontier {
// Publish the buffered updates that are not beyond the new lower bound ...
let to_publish = self
.updates
.iter()
.filter(|(_, t, _)| !lower_bound.less_equal(t))
.cloned()
.collect::<Vec<_>>();
// ... and keep buffering the rest.
self.updates.retain(|(_, t, _)| lower_bound.less_equal(t));
self.reported_frontier = lower_bound.clone();
return Some((to_publish, lower_bound));
}
}
// The wrapped iterator returned `None` without the frontier changing.
None
}
}
impl<D, T, R, I> Iter<I, D, T, R>
where
    I: Iterator<Item = Message<D, T, R>>,
    T: Hash + Ord + Lattice + Clone + Timestamp,
    D: Hash + Eq + Clone,
    R: Hash + Eq + Clone,
{
    /// Wraps `iterator`, starting with no buffered updates or progress statements and with
    /// both the reported and the progress frontier at `T::minimum()`.
    pub fn new(iterator: I) -> Self {
        // Both frontiers start from the same single-element antichain.
        let initial = Antichain::from_elem(T::minimum());
        Self {
            progress_queue: Vec::new(),
            messages_frontier: MutableAntichain::new(),
            progress_frontier: initial.clone(),
            reported_frontier: initial,
            updates: std::collections::HashSet::new(),
            iterator,
        }
    }
}
}
pub mod source {
use super::{Message, Progress};
use crate::{lattice::Lattice, ExchangeData};
use std::cell::RefCell;
use std::hash::Hash;
use std::rc::Rc;
use std::marker::{Send, Sync};
use std::sync::Arc;
use timely::dataflow::{Scope, Stream, operators::{Capability, CapabilitySet}};
use timely::progress::Timestamp;
use timely::scheduling::SyncActivator;
/// A guard that fires its activator when dropped.
///
/// `build` returns one of these (boxed) as its token and keeps a `Weak` to `activator`, so that
/// the activated operator can tell the token is gone.
struct DropActivator {
/// Activator fired on drop.
activator: Arc<SyncActivator>,
}
impl Drop for DropActivator {
fn drop(&mut self) {
// Best effort: the result of `activate` is deliberately discarded.
let _ = self.activator.activate();
}
}
/// Constructs a stream of `(data, time, diff)` updates in `scope` from an iterator of `Message`s.
///
/// `source_builder` is handed a `SyncActivator` for the message-reading operator and returns
/// the iterator to read from. Four operators are assembled:
///
/// 1. `CDCV2_Messages` reads the iterator, forwards update batches, and sends every progress
///    statement to every worker, with its counts partitioned across workers by hash of time.
/// 2. `CDCV2_Updates` deduplicates updates by `(data, time)`, emits the novel ones (this is the
///    returned stream), and emits a `-1` count for the time of each novel update.
/// 3. `CDCV2_Progress` combines those counts with the progress statements and sends changes in
///    the resulting lower bound to every worker.
/// 4. `CDCV2_Feedback` applies those changes to a frontier shared with `CDCV2_Messages`, which
///    downgrades its capabilities to that frontier.
///
/// Returns a token together with the stream of deduplicated updates. Dropping the token
/// activates `CDCV2_Messages`, which then releases its capabilities and stops reading.
///
/// # Panics
///
/// The operators panic if the same `(data, time)` pair is received with two different diffs
/// while the first is still pending, or if the lower bound changes during an activation in
/// which no input supplied a capability.
pub fn build<G, B, I, D, T, R>(
scope: G,
source_builder: B,
) -> (Box<dyn std::any::Any + Send + Sync>, Stream<G, (D, T, R)>)
where
G: Scope<Timestamp = T>,
B: FnOnce(SyncActivator) -> I,
I: Iterator<Item = Message<D, T, R>> + 'static,
D: ExchangeData + Hash,
T: ExchangeData + Hash + Timestamp + Lattice,
R: ExchangeData + Hash,
{
use crate::hashable::Hashable;
use timely::dataflow::channels::pact::Exchange;
use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
use timely::progress::frontier::MutableAntichain;
use timely::progress::ChangeBatch;
// Number of workers; used to partition counts and to address every peer.
let workers = scope.peers();
// Filled in while building the MESSAGES operator and returned to the caller at the end.
let mut token = None;
// Frontier shared between FEEDBACK (writer) and MESSAGES (reader). It starts at the minimum
// time with a count of `workers`; presumably each worker's PROGRESS operator retracts it
// once when it first reports a change — TODO confirm.
let mut antichain = MutableAntichain::new();
antichain.update_iter(Some((T::minimum(), workers as i64)));
let shared_frontier = Rc::new(RefCell::new(antichain));
let shared_frontier2 = shared_frontier.clone();
// Step 1: the MESSAGES operator.
let mut messages_op = OperatorBuilder::new("CDCV2_Messages".to_string(), scope.clone());
let address = messages_op.operator_info().address;
// Three activators for this operator: one for the source, one for FEEDBACK, one for the token.
let activator = scope.sync_activator_for(&address);
let activator2 = scope.activator_for(&address);
let drop_activator = DropActivator { activator: Arc::new(scope.sync_activator_for(&address)) };
let mut source = source_builder(activator);
let (mut updates_out, updates) = messages_op.new_output();
let (mut progress_out, progress) = messages_op.new_output();
messages_op.build(|capabilities| {
// A `Weak` that stops upgrading once the returned token has been dropped.
let drop_activator_weak = Arc::downgrade(&drop_activator.activator);
// NOTE(review): `token` is unwrapped at the end of `build`, so this constructor closure
// is expected to have run by then.
token = Some(drop_activator);
// One capability set per output, initially holding the operator's initial capabilities.
let mut updates_caps = CapabilitySet::from_elem(capabilities[0].clone());
let mut progress_caps = CapabilitySet::from_elem(capabilities[1].clone());
let local_frontier = shared_frontier.clone();
move |_frontiers| {
// If the token is gone, release all capabilities and do nothing further.
if drop_activator_weak.upgrade().is_none() {
updates_caps.downgrade(&[]);
progress_caps.downgrade(&[]);
return;
}
// Downgrade both capability sets to the frontier maintained by FEEDBACK.
let shared_frontier = local_frontier.borrow();
updates_caps.downgrade(&shared_frontier.frontier());
progress_caps.downgrade(&shared_frontier.frontier());
// Only read from the source while capabilities remain for both outputs.
if !updates_caps.is_empty() && !progress_caps.is_empty() {
let mut updates = updates_out.activate();
let mut progress = progress_out.activate();
// Everything is sent under the first capability of each set.
let mut updates_session = updates.session(&updates_caps[0]);
let mut progress_session = progress.session(&progress_caps[0]);
// Read until the source returns `None`; `by_ref` leaves it usable on the next activation.
for message in source.by_ref() {
match message {
Message::Updates(mut updates) => {
updates_session.give_vec(&mut updates);
}
Message::Progress(progress) => {
// Partition the counts across workers by hash of their time ...
let mut to_worker = vec![Vec::new(); workers];
for (time, count) in progress.counts {
to_worker[(time.hashed() as usize) % workers]
.push((time, count));
}
// ... and send each worker a statement with the full bounds and its share of counts.
for (worker, counts) in to_worker.into_iter().enumerate() {
progress_session.give((
worker,
Progress {
lower: progress.lower.clone(),
upper: progress.upper.clone(),
counts,
},
));
}
}
}
}
}
}
});
// Step 2: the UPDATES operator; its input is exchanged by hash of the whole update.
let mut updates_op = OperatorBuilder::new("CDCV2_Updates".to_string(), scope.clone());
let mut input = updates_op.new_input(&updates, Exchange::new(|x: &(D, T, R)| x.hashed()));
let (mut changes_out, changes) = updates_op.new_output();
let (mut counts_out, counts) = updates_op.new_output();
updates_op.build(move |_capability| {
// `(data, time) -> diff` for updates already emitted, kept for deduplication.
let mut pending = std::collections::HashMap::new();
let mut change_batch = ChangeBatch::<T>::new();
move |frontiers| {
// Forget entries whose time is no longer beyond the input frontier.
pending.retain(|(_row, time), _diff| frontiers[0].less_equal(time));
let mut changes = changes_out.activate();
let mut counts = counts_out.activate();
while let Some((capability, updates)) = input.next() {
let mut changes_session = changes.session(&capability);
let mut counts_session = counts.session(&capability);
for (data, time, diff) in updates.iter() {
// Updates at times not beyond the input frontier are skipped.
if frontiers[0].less_equal(time) {
if let Some(prior) = pending.insert((data.clone(), time.clone()), diff.clone()) {
// A repeat of a pending `(data, time)` must carry the same diff.
assert_eq!(&prior, diff);
} else {
// Novel update: emit it and record `-1` for its time.
change_batch.update(time.clone(), -1);
changes_session.give((data.clone(), time.clone(), diff.clone()));
}
}
}
if !change_batch.is_empty() {
counts_session.give_iterator(change_batch.drain());
}
}
}
});
// Step 3: the PROGRESS operator. Statements are routed to the worker index they carry;
// counts are routed by hash of their time.
let mut progress_op = OperatorBuilder::new("CDCV2_Progress".to_string(), scope.clone());
let mut input = progress_op.new_input(
&progress,
Exchange::new(|x: &(usize, Progress<T>)| x.0 as u64),
);
let mut counts =
progress_op.new_input(&counts, Exchange::new(|x: &(T, i64)| (x.0).hashed()));
let (mut frontier_out, frontier) = progress_op.new_output();
progress_op.build(move |_capability| {
use timely::order::PartialOrder;
use timely::progress::{frontier::AntichainRef, Antichain};
// Statements not yet applied, the frontier reached by applied statements, the per-time
// outstanding counts, and the lower bound most recently sent downstream.
let mut progress_queue = Vec::new();
let mut progress_frontier = Antichain::from_elem(T::minimum());
let mut updates_frontier = MutableAntichain::new();
let mut reported_frontier = Antichain::from_elem(T::minimum());
move |_frontiers| {
let mut frontier = frontier_out.activate();
// A capability retained from whichever input was read last; needed to send any change.
let mut capability: Option<Capability<T>> = None;
// Fold in the counts reported by UPDATES.
while let Some((cap, counts)) = counts.next() {
updates_frontier.update_iter(counts.iter().cloned());
capability = Some(cap.retain());
}
// Queue the statements received from MESSAGES.
while let Some((cap, progress)) = input.next() {
progress_queue.extend(progress.iter().map(|x| (x.1).clone()));
capability = Some(cap.retain());
}
// Apply queued statements for as long as one has a `lower` that is `less_equal` the
// current progress frontier (antichain order).
while let Some(position) = progress_queue.iter().position(|p| {
<_ as PartialOrder>::less_equal(
&AntichainRef::new(&p.lower),
&progress_frontier.borrow(),
)
}) {
let mut progress = progress_queue.remove(position);
// Drop counts for times that are not beyond the current progress frontier.
progress
.counts
.retain(|(time, _count)| progress_frontier.less_equal(time));
// NOTE(review): `c as i64` assumes counts fit in an `i64`.
updates_frontier
.update_iter(progress.counts.drain(..).map(|(t, c)| (t, c as i64)));
// New progress frontier: pairwise joins of the statement's `upper` with the current one.
let mut new_frontier = Antichain::new();
for time1 in progress.upper {
for time2 in progress_frontier.elements() {
new_frontier.insert(time1.join(time2));
}
}
progress_frontier = new_frontier;
}
// Combine the progress frontier with the frontier of the outstanding counts.
let mut lower_bound = progress_frontier.clone();
lower_bound.extend(updates_frontier.frontier().iter().cloned());
if lower_bound != reported_frontier {
let capability =
capability.expect("Changes occurred, without surfacing a capability");
// Express the change as +1 for each new element and -1 for each old element.
let mut changes = ChangeBatch::new();
changes.extend(lower_bound.elements().iter().map(|t| (t.clone(), 1)));
changes.extend(reported_frontier.elements().iter().map(|t| (t.clone(), -1)));
let mut frontier_session = frontier.session(&capability);
// Send the same change to every worker.
for peer in 0..workers {
frontier_session.give((peer, changes.clone()));
}
reported_frontier = lower_bound.clone();
}
}
});
// Step 4: the FEEDBACK operator; changes are routed to the worker index they carry.
let mut feedback_op = OperatorBuilder::new("CDCV2_Feedback".to_string(), scope.clone());
let mut input = feedback_op.new_input(
&frontier,
Exchange::new(|x: &(usize, ChangeBatch<T>)| x.0 as u64),
);
feedback_op.build(move |_capability| {
move |_frontiers| {
let mut antichain = shared_frontier2.borrow_mut();
let mut must_activate = false;
while let Some((_cap, frontier_changes)) = input.next() {
for (_self, input_changes) in frontier_changes.iter() {
// Apply the changes; `update_iter` yielding anything means the frontier moved.
if antichain.update_iter(input_changes.unstable_internal_updates().iter().cloned()).next().is_some() {
must_activate = true;
}
}
}
// Wake MESSAGES so it can downgrade its capabilities to the new frontier.
if must_activate { activator2.activate(); }
}
});
(Box::new(token.unwrap()), changes)
}
}
pub mod sink {
use std::hash::Hash;
use std::cell::RefCell;
use std::rc::Weak;
use serde::{Deserialize, Serialize};
use timely::order::PartialOrder;
use timely::progress::{Antichain, ChangeBatch, Timestamp};
use timely::dataflow::{Scope, Stream};
use timely::dataflow::channels::pact::{Exchange, Pipeline};
use timely::dataflow::operators::generic::{FrontieredInputHandle, builder_rc::OperatorBuilder};
use crate::{lattice::Lattice, ExchangeData};
use super::{Writer, Message, Progress};
/// Builds two operators that write `stream` to external sinks as `Message`s.
///
/// * `"UpdatesWriter"` takes `stream` over a pipeline channel, queues every input batch as a
///   `Message::Updates` for `updates_sink`, and sends one `(time, 1)` count per update
///   downstream.
/// * `"ProgressWriter"` receives those counts, all routed by `sink_hash`. On the worker whose
///   index equals `sink_hash % peers`, each strict advance of the input frontier queues a
///   `Message::Progress` for `progress_sink`, carrying the previous frontier, the new frontier,
///   and the accumulated counts for times no longer beyond the new frontier.
///
/// Both operators offer queued messages to their sink through `Writer::poll`. When `poll`
/// returns a duration, the operator re-activates itself after that duration and offers the
/// same message again.
///
/// The sinks are held weakly: once a sink can no longer be upgraded, its operator discards
/// what it has queued and reports no outstanding work.
pub fn build<G, BS, D, T, R>(
    stream: &Stream<G, (D, T, R)>,
    sink_hash: u64,
    updates_sink: Weak<RefCell<BS>>,
    progress_sink: Weak<RefCell<BS>>,
) where
    G: Scope<Timestamp = T>,
    BS: Writer<Message<D, T, R>> + 'static,
    D: ExchangeData + Hash + Serialize + for<'a> Deserialize<'a>,
    T: ExchangeData + Hash + Serialize + for<'a> Deserialize<'a> + Timestamp + Lattice,
    R: ExchangeData + Hash + Serialize + for<'a> Deserialize<'a>,
{
    // First operator: record the updates that stream in.
    let mut builder = OperatorBuilder::new("UpdatesWriter".to_owned(), stream.scope());
    let reactivator = stream.scope().activator_for(&builder.operator_info().address);
    let mut input = builder.new_input(stream, Pipeline);
    let (mut updates_out, updates) = builder.new_output();
    builder.build_reschedule(
        move |_capability| {
            // Scratch space for the per-batch timestamp counts.
            let mut timestamps = ChangeBatch::new();
            // Messages waiting to be accepted by the sink.
            let mut send_queue = std::collections::VecDeque::new();
            move |_frontiers| {
                let mut output = updates_out.activate();
                // Always drain the input, whether or not the sink is still alive.
                input.for_each(|capability, updates| {
                    for (_data, time, _diff) in updates.iter() {
                        timestamps.update(time.clone(), 1);
                    }
                    // Queue the batch itself for the sink ...
                    send_queue.push_back(Message::Updates(updates.replace(Vec::new())));
                    // ... and send its timestamp counts downstream.
                    output
                        .session(&capability)
                        .give_iterator(timestamps.drain());
                });
                if let Some(sink) = updates_sink.upgrade() {
                    let mut sink = sink.borrow_mut();
                    while let Some(message) = send_queue.front() {
                        if let Some(duration) = sink.poll(message) {
                            // Not accepted yet: try again after `duration`.
                            reactivator.activate_after(duration);
                            return true;
                        } else {
                            send_queue.pop_front();
                        }
                    }
                    // Work remains while the sink is not done or messages are queued.
                    !sink.done() || !send_queue.is_empty()
                } else {
                    // The sink is gone; nothing queued can ever be written.
                    send_queue.clear();
                    false
                }
            }
        },
    );

    // Second operator: announce progress, with counts, from a single worker.
    let mut builder = OperatorBuilder::new("ProgressWriter".to_owned(), stream.scope());
    let reactivator = stream.scope().activator_for(&builder.operator_info().address);
    let mut input = builder.new_input(&updates, Exchange::new(move |_| sink_hash));
    let should_write = stream.scope().index() == (sink_hash as usize) % stream.scope().peers();
    // The frontier most recently announced as an upper bound.
    let mut frontier = Antichain::from_elem(T::minimum());
    // Accumulated counts for times not yet announced.
    let mut timestamps = ChangeBatch::new();
    // Messages waiting to be accepted by the sink.
    let mut send_queue = std::collections::VecDeque::new();
    // Reusable buffer for counts that must stay in `timestamps`.
    let mut retain = Vec::new();
    builder.build_reschedule(|_capabilities| {
        move |frontiers| {
            let mut input = FrontieredInputHandle::new(&mut input, &frontiers[0]);
            // Always drain the input, whether or not this worker writes.
            input.for_each(|_capability, counts| {
                timestamps.extend(counts.iter().cloned());
            });
            if should_write {
                if let Some(sink) = progress_sink.upgrade() {
                    let mut sink = sink.borrow_mut();
                    // A strict advance of the input frontier allows a new progress statement.
                    if <_ as PartialOrder>::less_than(
                        &frontier.borrow(),
                        &input.frontier.frontier(),
                    ) {
                        let new_frontier = input.frontier.frontier();
                        // Announce counts for times no longer beyond the new frontier and
                        // keep the rest for a later statement.
                        let mut announce = Vec::new();
                        for (time, count) in timestamps.drain() {
                            if !new_frontier.less_equal(&time) {
                                // NOTE(review): assumes accumulated counts are non-negative.
                                announce.push((time, count as usize));
                            } else {
                                retain.push((time, count));
                            }
                        }
                        timestamps.extend(retain.drain(..));
                        let progress = Progress {
                            lower: frontier.elements().to_vec(),
                            upper: new_frontier.to_vec(),
                            counts: announce,
                        };
                        send_queue.push_back(Message::Progress(progress));
                        frontier = input.frontier.frontier().to_owned();
                    }
                    // Drain on every activation, not only when the frontier advanced: a
                    // message left queued after `poll` asked for a retry must be offered
                    // again when the retry activation fires, and the frontier may never
                    // advance again.
                    while let Some(message) = send_queue.front() {
                        if let Some(duration) = sink.poll(message) {
                            // Not accepted yet: try again after `duration`.
                            reactivator.activate_after(duration);
                            return true;
                        } else {
                            send_queue.pop_front();
                        }
                    }
                    // Work remains while the sink is not done or messages are queued.
                    !sink.done() || !send_queue.is_empty()
                } else {
                    // The sink is gone; nothing recorded can ever be written.
                    timestamps.clear();
                    send_queue.clear();
                    false
                }
            } else {
                false
            }
        }
    });
}
}