use std::collections::{BinaryHeap, BTreeMap};
use timely::order::{PartialOrder, TotalOrder};
use timely::dataflow::Stream;
use timely::dataflow::operators::generic::Operator;
use timely::dataflow::channels::pact::Exchange;
use timely::progress::{Antichain, Timestamp};
use timely::dataflow::operators::Capability;
use crate::operators::arrange::arrangement::Arranged;
use crate::trace::{Builder, Description};
use crate::trace::{self, Trace, TraceReader, Cursor};
use crate::{ExchangeData, Hashable};
use crate::trace::implementations::containers::BatchContainer;
use super::TraceAgent;
/// Arranges a stream of upserts into a trace of key-value updates.
///
/// Each input record `(key, val, time)` states that, from `time` onward, `key` maps to
/// `v` when `val` is `Some(v)`, or to nothing when `val` is `None`. The operator turns
/// these into differential updates `((key, val), time, diff)`. Whenever a key's value
/// changes, it retracts the previous value with `-1` and inserts the new value with `+1`.
///
/// Records are exchanged by the hash of their key and buffered in a priority queue that
/// pops in increasing `(time, key, val)` order. Once the input frontier passes a held
/// capability, the following happens:
/// * The buffered records whose times are not in advance of the new upper bound are extracted.
/// * The previous value of each affected key is read from the operator's own arrangement.
/// * A batch is built, inserted into the trace, and sent downstream.
///
/// The operator's internal trace handle advances its logical and physical compaction to
/// the input frontier.
///
/// If several upserts for the same key carry the same time, only the one with the
/// greatest `Option<V>` is applied, so any `Some(_)` takes precedence over `None`.
///
/// # Panics
///
/// * If the input frontier is observed to move backwards.
/// * If, for some key, the arrangement holds a value whose accumulated count is neither
///   0 nor 1, or holds more than one value with count 1.
pub fn arrange_from_upsert<'scope, Bu, Tr, K, V>(
    stream: Stream<'scope, Tr::Time, Vec<(K, Option<V>, Tr::Time)>>,
    name: &str,
) -> Arranged<'scope, TraceAgent<Tr>>
where
    K: ExchangeData+Hashable+std::hash::Hash,
    V: ExchangeData,
    Tr: for<'a> Trace<
        Key<'a> = &'a K,
        Val<'a> = &'a V,
        Time: TotalOrder+ExchangeData,
        Diff=isize,
    >+'static,
    Bu: Builder<Time=Tr::Time, Input = Vec<((K, V), Tr::Time, Tr::Diff)>, Output = Tr::Batch>,
{
    // Filled in by the operator constructor below with a handle to the new trace.
    let mut reader: Option<TraceAgent<Tr>> = None;

    let stream = {
        let reader = &mut reader;
        // Route every upsert to the worker chosen by the hash of its key.
        let exchange = Exchange::new(move |update: &(K,Option<V>,Tr::Time)| (update.0).hashed().into());
        let scope = stream.scope();

        stream.unary_frontier(exchange, name, move |_capability, info| {
            let logger = scope.worker().logger_for::<crate::logging::DifferentialEventBuilder>("differential/arrange").map(Into::into);

            // Capabilities for times of buffered upserts not yet minted into a batch.
            let mut capabilities = Antichain::<Capability<Tr::Time>>::new();

            let activator = Some(scope.activator_for(info.address.clone()));
            let mut empty_trace = Tr::new(info.clone(), logger.clone(), activator);
            if let Some(exert_logic) = scope.worker().config().get::<trace::ExertionLogic>("differential/default_exert_logic").cloned() {
                empty_trace.set_exert_logic(exert_logic);
            }

            let (mut reader_local, mut writer) = TraceAgent::new(empty_trace, info, logger);
            *reader = Some(reader_local.clone());

            // Input frontier as of the last change we acted on; lower bound of the next batch.
            let mut prev_frontier = Antichain::from_elem(Tr::Time::minimum());
            // Buffered upserts; `Reverse` makes the max-heap pop the smallest `(time, key, val)`.
            let mut priority_queue = BinaryHeap::<std::cmp::Reverse<(Tr::Time, K, Option<V>)>>::new();
            // Reusable buffer holding the updates produced for a single key.
            let mut updates = Vec::new();

            move |(input, frontier), output| {
                // Hold on to input capabilities and buffer the incoming upserts.
                input.for_each(|cap, data| {
                    capabilities.insert(cap.retain(0));
                    priority_queue.extend(
                        data.drain(..).map(|(key, val, time)| std::cmp::Reverse((time, key, val)))
                    );
                });

                assert!(PartialOrder::less_equal(&prev_frontier.borrow(), &frontier.frontier()));

                // Only act when the input frontier has actually moved.
                if prev_frontier.borrow() != frontier.frontier() {
                    // Mint batches only if some held capability is no longer in advance of the frontier.
                    if capabilities.elements().iter().any(|c| !frontier.less_equal(c.time())) {
                        let mut upper = Antichain::new();
                        for (index, capability) in capabilities.elements().iter().enumerate() {
                            if !frontier.less_equal(capability.time()) {
                                // This batch ends at the input frontier, further bounded
                                // by the times of the capabilities that follow it.
                                upper.clear();
                                for time in frontier.frontier().iter() {
                                    upper.insert(time.clone());
                                }
                                for other_capability in &capabilities.elements()[(index + 1) .. ] {
                                    upper.insert(other_capability.time().clone());
                                }

                                // Extract upserts whose times are not in advance of `upper`, grouped by key.
                                let mut to_process = BTreeMap::new();
                                while priority_queue.peek().map(|std::cmp::Reverse((t,_k,_v))| !upper.less_equal(t)).unwrap_or(false) {
                                    let std::cmp::Reverse((time, key, val)) = priority_queue.pop().expect("Priority queue just ensured non-empty");
                                    to_process.entry(key).or_insert(Vec::new()).push((time, std::cmp::Reverse(val)));
                                }
                                // Release excess queue capacity; the factor of four avoids
                                // repeatedly growing and shrinking the allocation.
                                if priority_queue.capacity() > 4 * priority_queue.len() {
                                    priority_queue.shrink_to_fit();
                                }

                                // Cursor over the existing arrangement, used to find each key's prior value.
                                let (mut trace_cursor, trace_storage) = reader_local.cursor();
                                let mut builder = Bu::new();
                                let mut key_con = Tr::KeyContainer::with_capacity(1);
                                // Keys are visited in increasing order (`BTreeMap` iteration order).
                                for (key, mut list) in to_process {
                                    key_con.clear(); key_con.push_ref(&key);

                                    // The value currently associated with `key`, if any.
                                    let mut prev_value: Option<V> = None;

                                    trace_cursor.seek_key(&trace_storage, key_con.index(0));
                                    if trace_cursor.get_key(&trace_storage).map(|k| k.eq(&key_con.index(0))).unwrap_or(false) {
                                        // The prior value is the single value whose counts accumulate to one.
                                        while let Some(val) = trace_cursor.get_val(&trace_storage) {
                                            let mut count = 0;
                                            trace_cursor.map_times(&trace_storage, |_time, diff| count += Tr::owned_diff(diff));
                                            assert!(count == 0 || count == 1);
                                            if count == 1 {
                                                assert!(prev_value.is_none());
                                                prev_value = Some(val.clone());
                                            }
                                            trace_cursor.step_val(&trace_storage);
                                        }
                                        trace_cursor.step_key(&trace_storage);
                                    }

                                    // Sort by time and keep one upsert per time. Values are held in
                                    // `Reverse`, so the retained entry carries the greatest `Option<V>`.
                                    list.sort();
                                    list.dedup_by(|(t1,_), (t2,_)| t1 == t2);
                                    for (time, std::cmp::Reverse(next)) in list {
                                        if prev_value != next {
                                            if let Some(prev) = prev_value {
                                                updates.push(((key.clone(), prev), time.clone(), -1));
                                            }
                                            if let Some(next) = next.as_ref() {
                                                updates.push(((key.clone(), next.clone()), time.clone(), 1));
                                            }
                                            prev_value = next;
                                        }
                                    }
                                    // Sort this key's updates. Keys arrive in increasing order, so the
                                    // builder sees sorted input overall (presumably required by `Bu`).
                                    updates.sort();
                                    builder.push(&mut updates);
                                    // `Builder::push` may leave its chunk in an unspecified state. Clear it
                                    // so nothing from this key is pushed again along with the next key.
                                    updates.clear();
                                }
                                let description = Description::new(prev_frontier.clone(), upper.clone(), Antichain::from_elem(Tr::Time::minimum()));
                                let batch = builder.done(description);
                                prev_frontier.clone_from(&upper);

                                // Record the batch in the trace and send it downstream.
                                writer.insert(batch.clone(), Some(capability.time().clone()));
                                output.session(&capabilities.elements()[index]).give(batch);
                            }
                        }

                        // Keep at most one capability: the one for the earliest time still buffered.
                        let mut new_capabilities = Antichain::new();
                        if let Some(std::cmp::Reverse((time, _, _))) = priority_queue.peek() {
                            if let Some(capability) = capabilities.elements().iter().find(|c| c.time().less_equal(time)) {
                                new_capabilities.insert(capability.delayed(time));
                            }
                            else {
                                panic!("failed to find capability");
                            }
                        }
                        capabilities = new_capabilities;
                    }
                    else {
                        // No batch to mint; seal the trace up to the new frontier instead.
                        writer.seal(frontier.frontier().to_owned());
                    }

                    prev_frontier.clear();
                    prev_frontier.extend(frontier.frontier().iter().cloned());

                    reader_local.set_logical_compaction(prev_frontier.borrow());
                    reader_local.set_physical_compaction(prev_frontier.borrow());
                }

                writer.exert();
            }
        })
    };

    Arranged { stream, trace: reader.unwrap() }
}