use std::collections::{BTreeMap, BinaryHeap};
use timely::dataflow::channels::pact::Exchange;
use timely::dataflow::operators::generic::Operator;
use timely::dataflow::operators::Capability;
use timely::dataflow::{Scope, Stream};
use timely::order::{PartialOrder, TotalOrder};
use timely::progress::Antichain;
use timely::progress::Timestamp;
use crate::operators::arrange::arrangement::Arranged;
use crate::trace::{self, Cursor, Trace, TraceReader};
use crate::trace::{Builder, Description};
use crate::{ExchangeData, Hashable};
use crate::trace::implementations::containers::BatchContainer;
use super::TraceAgent;
/// Arranges a stream of keyed upserts into a trace of `(key, value)` pairs.
///
/// Each input record is `(key, Option<val>, time)`. A `Some(val)` upsert replaces whatever
/// value `key` held before, and a `None` upsert removes the key. Records are buffered until
/// the input frontier passes their time. Each key's upserts are then compared against the
/// value currently live in the arrangement, and the resulting `-1`/`+1` updates are
/// assembled into a batch. That batch is both inserted into the maintained trace and sent
/// on the output stream.
///
/// Records are exchanged between workers by the hash of their key, so every upsert for a
/// given key is handled by the same worker. The `TotalOrder` bound on the time type lets
/// buffered upserts be processed in time order.
///
/// # Parameters
/// * `stream` - upserts as `(key, optional value, time)` triples.
/// * `name` - the name given to the underlying timely operator.
///
/// # Returns
/// An [`Arranged`] pairing the stream of produced batches with a [`TraceAgent`] for the
/// maintained trace.
///
/// # Panics
/// * If the input frontier ever moves backwards.
/// * If, in the existing trace, a `(key, val)` pair's diffs do not sum to 0 or 1.
/// * If more than one value is live (count 1) for the same key.
/// * With "failed to find capability", if the earliest still-buffered upsert's time is not
///   in advance of any held capability. NOTE(review): presumably callers must give each
///   record a time no earlier than the capability of the message carrying it — confirm.
pub fn arrange_from_upsert<G, Bu, Tr>(
    stream: &Stream<G, (Tr::KeyOwn, Option<Tr::ValOwn>, G::Timestamp)>,
    name: &str,
) -> Arranged<G, TraceAgent<Tr>>
where
    G: Scope<Timestamp = Tr::Time>,
    Tr: for<'a> Trace<
        KeyOwn: ExchangeData + Hashable + std::hash::Hash,
        ValOwn: ExchangeData,
        Time: TotalOrder + ExchangeData,
        Diff = isize,
    > + 'static,
    Bu: Builder<
        Time = G::Timestamp,
        Input = Vec<((Tr::KeyOwn, Tr::ValOwn), Tr::Time, Tr::Diff)>,
        Output = Tr::Batch,
    >,
{
    // Populated by the operator constructor below, so that the trace handle can be
    // returned next to the output stream.
    let mut reader: Option<TraceAgent<Tr>> = None;
    let stream = {
        let reader = &mut reader;
        // Route each upsert by the hash of its key, so all upserts for a key meet on one worker.
        let exchange = Exchange::new(
            move |update: &(Tr::KeyOwn, Option<Tr::ValOwn>, G::Timestamp)| {
                (update.0).hashed().into()
            },
        );
        stream.unary_frontier(exchange, name, move |_capability, info| {
            // Logger for "differential/arrange" events, if one is registered for the scope.
            let logger = stream
                .scope()
                .logger_for::<crate::logging::DifferentialEventBuilder>("differential/arrange")
                .map(Into::into);
            // Capabilities retained from input messages. Batches are sent at these times,
            // and the capabilities bound which buffered upserts may be committed.
            // NOTE(review): with the `TotalOrder` bound on `Time`, this antichain should hold
            // at most one (the earliest) capability.
            let mut capabilities = Antichain::<Capability<G::Timestamp>>::new();
            // Build the trace that is both maintained here and published to readers.
            let activator = Some(stream.scope().activator_for(info.address.clone()));
            let mut empty_trace = Tr::new(info.clone(), logger.clone(), activator);
            // Apply the scope-wide default exertion logic, if one is configured.
            if let Some(exert_logic) = stream
                .scope()
                .config()
                .get::<trace::ExertionLogic>("differential/default_exert_logic")
                .cloned()
            {
                empty_trace.set_exert_logic(exert_logic);
            }
            let (mut reader_local, mut writer) = TraceAgent::new(empty_trace, info, logger);
            // Hand a reader out to the enclosing function. `reader_local` is kept here to look
            // up current values and to advance compaction.
            *reader = Some(reader_local.clone());
            // Input frontier as of the last processed progress change. It is the lower bound
            // of the next batch.
            let mut prev_frontier = Antichain::from_elem(<G::Timestamp as Timestamp>::minimum());
            // Buffered upserts. `Reverse` turns `BinaryHeap`'s max-heap into a min-heap on
            // `(time, key, val)`, so the earliest time is at the top.
            let mut priority_queue = BinaryHeap::<
                std::cmp::Reverse<(G::Timestamp, Tr::KeyOwn, Option<Tr::ValOwn>)>,
            >::new();
            // Reused buffer of `((key, val), time, diff)` updates fed to the batch builder.
            let mut updates = Vec::new();
            move |(input, frontier), output| {
                // Retain each message's capability and buffer its upserts.
                input.for_each(|cap, data| {
                    capabilities.insert(cap.retain());
                    for (key, val, time) in data.drain(..) {
                        priority_queue.push(std::cmp::Reverse((time, key, val)))
                    }
                });
                // The input frontier must never move backwards.
                assert!(PartialOrder::less_equal(
                    &prev_frontier.borrow(),
                    &frontier.frontier()
                ));
                // Only act when the frontier has strictly advanced since the last activation.
                if prev_frontier.borrow() != frontier.frontier() {
                    // Some held capability is no longer in advance of the frontier, so upserts
                    // at times before the frontier can now be committed.
                    if capabilities
                        .elements()
                        .iter()
                        .any(|c| !frontier.less_equal(c.time()))
                    {
                        // Reused allocation for each batch's upper bound.
                        let mut upper = Antichain::new();
                        for (index, capability) in capabilities.elements().iter().enumerate() {
                            if !frontier.less_equal(capability.time()) {
                                // A batch may extend up to the input frontier, but not past any
                                // later capability still held. Capabilities are retired one at
                                // a time.
                                upper.clear();
                                for time in frontier.frontier().iter() {
                                    upper.insert(time.clone());
                                }
                                for other_capability in &capabilities.elements()[(index + 1)..] {
                                    upper.insert(other_capability.time().clone());
                                }
                                // Pop every buffered upsert whose time is not in advance of
                                // `upper`, grouping them by key. `BTreeMap` iterates keys in
                                // ascending order.
                                let mut to_process = BTreeMap::new();
                                while priority_queue
                                    .peek()
                                    .map(|std::cmp::Reverse((t, _k, _v))| !upper.less_equal(t))
                                    .unwrap_or(false)
                                {
                                    let std::cmp::Reverse((time, key, val)) = priority_queue
                                        .pop()
                                        .expect("Priority queue just ensured non-empty");
                                    to_process
                                        .entry(key)
                                        .or_insert(Vec::new())
                                        .push((time, std::cmp::Reverse(val)));
                                }
                                // Release excess heap memory. The 4x slack avoids repeatedly
                                // growing and shrinking the allocation.
                                if priority_queue.capacity() > 4 * priority_queue.len() {
                                    priority_queue.shrink_to_fit();
                                }
                                // Cursor over the trace as it stands now, including batches
                                // inserted for earlier capabilities in this loop. Also a builder
                                // for the new batch.
                                let (mut trace_cursor, trace_storage) = reader_local.cursor();
                                let mut builder = Bu::new();
                                // One-slot container that presents the owned key in the trace's
                                // borrowed key form, for seeking and comparison.
                                let mut key_con = Tr::KeyContainer::with_capacity(1);
                                for (key, mut list) in to_process {
                                    key_con.clear();
                                    key_con.push_own(&key);
                                    // The value currently live for `key` in the trace, if any.
                                    let mut prev_value: Option<Tr::ValOwn> = None;
                                    // NOTE(review): keys arrive in ascending order, which this
                                    // seek presumably relies on (forward-only cursor) — confirm.
                                    trace_cursor.seek_key(&trace_storage, key_con.index(0));
                                    if trace_cursor
                                        .get_key(&trace_storage)
                                        .map(|k| k.eq(&key_con.index(0)))
                                        .unwrap_or(false)
                                    {
                                        // Sum each value's diffs over all of its times: 1 means
                                        // the value is live, 0 means it was retracted. At most
                                        // one value per key may be live.
                                        while let Some(val) = trace_cursor.get_val(&trace_storage) {
                                            let mut count = 0;
                                            trace_cursor
                                                .map_times(&trace_storage, |_time, diff| {
                                                    count += Tr::owned_diff(diff)
                                                });
                                            assert!(count == 0 || count == 1);
                                            if count == 1 {
                                                assert!(prev_value.is_none());
                                                prev_value = Some(Tr::owned_val(val));
                                            }
                                            trace_cursor.step_val(&trace_storage);
                                        }
                                        trace_cursor.step_key(&trace_storage);
                                    }
                                    // Order this key's upserts by time and keep one per time.
                                    // Ties sort by `Reverse(val)`, and `dedup_by` keeps the first
                                    // of each run. So among upserts sharing a time, the greatest
                                    // value wins, and any `Some` beats `None`.
                                    list.sort();
                                    list.dedup_by(|(t1, _), (t2, _)| t1 == t2);
                                    // Each change of value becomes a retraction of the old value
                                    // (-1) and/or an insertion of the new one (+1). A `None`
                                    // upsert only retracts, which removes the key.
                                    for (time, std::cmp::Reverse(next)) in list {
                                        if prev_value != next {
                                            if let Some(prev) = prev_value {
                                                updates.push((
                                                    (key.clone(), prev),
                                                    time.clone(),
                                                    -1,
                                                ));
                                            }
                                            if let Some(next) = next.as_ref() {
                                                updates.push((
                                                    (key.clone(), next.clone()),
                                                    time.clone(),
                                                    1,
                                                ));
                                            }
                                            prev_value = next;
                                        }
                                    }
                                    // Sort into `((key, val), time)` order before building.
                                    // NOTE(review): `updates` is reused across keys, so this
                                    // relies on `builder.push` leaving it empty — confirm.
                                    updates.sort();
                                    builder.push(&mut updates);
                                }
                                // The batch spans `[prev_frontier, upper)` and is not compacted,
                                // since `since` is the minimum time.
                                let description = Description::new(
                                    prev_frontier.clone(),
                                    upper.clone(),
                                    Antichain::from_elem(G::Timestamp::minimum()),
                                );
                                let batch = builder.done(description);
                                // The next batch begins where this one ends.
                                prev_frontier.clone_from(&upper);
                                // Publish the batch to the trace and send it downstream at this
                                // capability's time.
                                writer.insert(batch.clone(), Some(capability.time().clone()));
                                output.session(&capabilities.elements()[index]).give(batch);
                            }
                        }
                        // Keep at most one capability, delayed to the earliest still-buffered
                        // time. If nothing is buffered, keep none; later data arrives with its
                        // own capabilities.
                        let mut new_capabilities = Antichain::new();
                        if let Some(std::cmp::Reverse((time, _, _))) = priority_queue.peek() {
                            if let Some(capability) = capabilities
                                .elements()
                                .iter()
                                .find(|c| c.time().less_equal(time))
                            {
                                new_capabilities.insert(capability.delayed(time));
                            } else {
                                panic!("failed to find capability");
                            }
                        }
                        capabilities = new_capabilities;
                    } else {
                        // Nothing can be committed yet. NOTE(review): presumably this still
                        // records progress up to the new frontier in the trace, without data.
                        writer.seal(frontier.frontier().to_owned());
                    }
                    // Remember the new frontier and let the local reader's compaction follow it.
                    prev_frontier.clear();
                    prev_frontier.extend(frontier.frontier().iter().cloned());
                    reader_local.set_logical_compaction(prev_frontier.borrow());
                    reader_local.set_physical_compaction(prev_frontier.borrow());
                }
                // NOTE(review): presumably gives the trace a chance to do pending merge work on
                // every activation.
                writer.exert();
            }
        })
    };
    // NOTE(review): relies on `unary_frontier` having run the constructor closure above
    // synchronously, which is what sets `reader`.
    Arranged {
        stream,
        trace: reader.unwrap(),
    }
}