use std::collections::HashMap;
use timely::dataflow::operators::{Probe, UnorderedInput};
use timely::dataflow::{ProbeHandle, Scope, ScopeParent, Stream};
use timely::progress::frontier::AntichainRef;
use timely::progress::Timestamp;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::operators::arrange::Arrange;
use differential_dataflow::operators::Threshold;
use differential_dataflow::trace::TraceReader;
use differential_dataflow::AsCollection;
use crate::operators::CardinalityOne;
use crate::{Aid, Error, Rewind, TxData, Value};
use crate::{AttributeConfig, IndexDirection, InputSemantics, QuerySupport};
use crate::{RelationConfig, RelationHandle};
use crate::{TraceKeyHandle, TraceValHandle};
mod unordered_session;
use unordered_session::UnorderedSession;
/// State for a group of attributes and relations that share the timestamp
/// type `T`: their inputs, their configurations, and their arranged traces.
pub struct Domain<T>
where
    T: Timestamp + Lattice,
{
    /// Current epoch of the domain; exposed through `epoch()` and moved
    /// forward by `advance_epoch()`.
    now_at: T,
    /// Frontier used by the most recent `advance_traces()` call that actually
    /// advanced anything.
    last_advance: Vec<T>,
    /// Input sessions by attribute name; populated by
    /// `create_transactable_attribute()` and written to by `transact()`.
    input_sessions: HashMap<String, UnorderedSession<T, (Value, Value), isize>>,
    /// Probe attached to the streams of sourced attributes that are not
    /// configured as timeless.
    domain_probe: ProbeHandle<T>,
    /// Number of sourced attributes that were attached to `domain_probe`.
    probed_source_count: usize,
    /// Attribute configurations by attribute name.
    pub attributes: HashMap<Aid, AttributeConfig>,
    /// Traces keyed by the first component of each attribute pair.
    pub forward_count: HashMap<Aid, TraceKeyHandle<Value, T, isize>>,
    /// Traces over the attribute pairs as given (first component as key).
    pub forward_propose: HashMap<Aid, TraceValHandle<Value, Value, T, isize>>,
    /// Traces keyed by the complete attribute pair.
    pub forward_validate: HashMap<Aid, TraceKeyHandle<(Value, Value), T, isize>>,
    /// Like `forward_count`, but over the swapped pairs.
    pub reverse_count: HashMap<Aid, TraceKeyHandle<Value, T, isize>>,
    /// Like `forward_propose`, but over the swapped pairs.
    pub reverse_propose: HashMap<Aid, TraceValHandle<Value, Value, T, isize>>,
    /// Like `forward_validate`, but over the swapped pairs.
    pub reverse_validate: HashMap<Aid, TraceKeyHandle<(Value, Value), T, isize>>,
    /// Relation configurations by relation name.
    pub relations: HashMap<Aid, RelationConfig>,
    /// Relation traces by relation name; registered together with the
    /// matching entry in `relations`.
    pub arrangements: HashMap<Aid, RelationHandle<T>>,
}
impl<T> Domain<T>
where
T: Timestamp + Lattice + Rewind,
{
pub fn new(start_at: T) -> Self {
Domain {
now_at: start_at,
last_advance: vec![<T as Lattice>::minimum()],
input_sessions: HashMap::new(),
domain_probe: ProbeHandle::new(),
probed_source_count: 0,
attributes: HashMap::new(),
forward_count: HashMap::new(),
forward_propose: HashMap::new(),
forward_validate: HashMap::new(),
reverse_count: HashMap::new(),
reverse_propose: HashMap::new(),
reverse_validate: HashMap::new(),
relations: HashMap::new(),
arrangements: HashMap::new(),
}
}
fn create_attribute<S: Scope + ScopeParent<Timestamp = T>>(
&mut self,
name: &str,
config: AttributeConfig,
pairs: &Stream<S, ((Value, Value), T, isize)>,
) -> Result<(), Error> {
if self.attributes.contains_key(name) {
Err(Error::conflict(format!(
"An attribute of name {} already exists.",
name
)))
} else {
let tuples = match config.input_semantics {
InputSemantics::Raw => pairs.as_collection(),
InputSemantics::CardinalityOne => pairs.as_collection().cardinality_one(),
InputSemantics::CardinalityMany => pairs.as_collection().distinct(),
};
let tuples_reverse = tuples.map(|(e, v)| (v, e));
self.forward_propose.insert(
name.to_string(),
tuples.arrange_named(&format!("->Propose({})", &name)).trace,
);
if config.index_direction == IndexDirection::Both {
self.reverse_propose.insert(
name.to_string(),
tuples_reverse
.arrange_named(&format!("->_Propose({})", &name))
.trace,
);
}
if config.input_semantics != InputSemantics::CardinalityOne {
if config.query_support == QuerySupport::AdaptiveWCO {
self.forward_count.insert(
name.to_string(),
tuples
.map(|(k, _v)| (k, ()))
.arrange_named(&format!("->Count({})", name))
.trace,
);
if config.index_direction == IndexDirection::Both {
self.reverse_count.insert(
name.to_string(),
tuples_reverse
.map(|(k, _v)| (k, ()))
.arrange_named(&format!("->_Count({})", name))
.trace,
);
}
}
if config.query_support >= QuerySupport::Delta {
self.forward_validate.insert(
name.to_string(),
tuples
.map(|t| (t, ()))
.arrange_named(&format!("->Validate({})", &name))
.trace,
);
if config.index_direction == IndexDirection::Both {
self.reverse_validate.insert(
name.to_string(),
tuples_reverse
.map(|t| (t, ()))
.arrange_named(&format!("->_Validate({})", &name))
.trace,
);
}
}
}
self.attributes.insert(name.to_string(), config);
info!("Created attribute {}", name);
Ok(())
}
}
pub fn create_transactable_attribute<S: Scope<Timestamp = T>>(
&mut self,
name: &str,
config: AttributeConfig,
scope: &mut S,
) -> Result<(), Error> {
let pairs = {
let ((handle, cap), pairs) = scope.new_unordered_input::<((Value, Value), T, isize)>();
let session = UnorderedSession::from(handle, cap);
self.input_sessions.insert(name.to_string(), session);
pairs
};
self.create_attribute(name, config, &pairs)?;
Ok(())
}
pub fn create_sourced_attribute<S: Scope + ScopeParent<Timestamp = T>>(
&mut self,
name: &str,
config: AttributeConfig,
pairs: &Stream<S, ((Value, Value), T, isize)>,
) -> Result<(), Error> {
let source_pairs = if config.timeless {
pairs.to_owned()
} else {
self.probed_source_count += 1;
pairs.probe_with(&mut self.domain_probe)
};
self.create_attribute(name, config, &source_pairs)?;
Ok(())
}
/// Stores `config` and `trace` for the relation `name`.
///
/// Existing entries under the same name are replaced in both maps.
pub fn register_arrangement(
    &mut self,
    name: String,
    config: RelationConfig,
    trace: RelationHandle<T>,
) {
    let config_key = name.clone();
    let trace_key = name;

    self.relations.insert(config_key, config);
    self.arrangements.insert(trace_key, trace);
}
pub fn transact(&mut self, tx_data: Vec<TxData>) -> Result<(), Error> {
for TxData(op, e, a, v, t) in tx_data {
match self.input_sessions.get_mut(&a) {
None => {
return Err(Error::not_found(format!("Attribute {} does not exist.", a)));
}
Some(handle) => match t {
None => handle.update((e, v), op),
Some(t) => handle.update_at((e, v), t.into(), op),
},
}
}
Ok(())
}
pub fn close_input(&mut self, name: String) -> Result<(), Error> {
match self.input_sessions.remove(&name) {
None => Err(Error::not_found(format!("Input {} does not exist.", name))),
Some(handle) => {
handle.close();
Ok(())
}
}
}
/// Advances the traces of this domain, following the probed sources where
/// there are any.
///
/// * Without probed sources, or when the probe frontier is empty, traces
///   are advanced to the current epoch.
/// * Otherwise traces are advanced to the probe frontier. If no element of
///   that frontier is less than or equal to the current epoch, the epoch is
///   first advanced to the largest frontier element.
///
/// # Errors
///
/// Propagates errors from `advance_epoch` and `advance_traces`.
pub fn advance(&mut self) -> Result<(), Error> {
    if self.probed_source_count() == 0 {
        return self.advance_traces(&[self.epoch().clone()]);
    }

    let source_frontier = self
        .domain_probe
        .with_frontier(|frontier| (*frontier).to_vec());

    if source_frontier.is_empty() {
        return self.advance_traces(&[self.epoch().clone()]);
    }

    let epoch_is_behind = !AntichainRef::new(&source_frontier).less_equal(self.epoch());
    if epoch_is_behind {
        // The frontier is non-empty here, so a maximum always exists.
        let latest = source_frontier.iter().max().cloned().unwrap();
        self.advance_epoch(latest)?;
    }

    self.advance_traces(&source_frontier)
}
pub fn advance_epoch(&mut self, next: T) -> Result<(), Error> {
if !self.now_at.less_equal(&next) {
Err(Error::conflict(format!(
"Domain is at {:?}, you attempted to rewind to {:?}.",
&self.now_at, &next
)))
} else if !self.now_at.eq(&next) {
trace!("Advancing domain epoch to {:?} ", next);
for handle in self.input_sessions.values_mut() {
handle.advance_to(next.clone());
handle.flush();
}
self.now_at = next;
Ok(())
} else {
Ok(())
}
}
pub fn advance_traces(&mut self, frontier: &[T]) -> Result<(), Error> {
let last_advance = AntichainRef::new(&self.last_advance);
if frontier.iter().any(|t| last_advance.less_than(t)) {
trace!("Advancing traces to {:?}", frontier);
self.last_advance = frontier.to_vec();
let frontier = AntichainRef::new(frontier);
for (aid, config) in self.attributes.iter() {
if let Some(ref trace_slack) = config.trace_slack {
let slacking_frontier = frontier
.iter()
.map(|t| t.rewind(trace_slack.clone().into()))
.collect::<Vec<T>>();;
if let Some(trace) = self.forward_count.get_mut(aid) {
trace.advance_by(&slacking_frontier);
trace.distinguish_since(&slacking_frontier);
}
if let Some(trace) = self.forward_propose.get_mut(aid) {
trace.advance_by(&slacking_frontier);
trace.distinguish_since(&slacking_frontier);
}
if let Some(trace) = self.forward_validate.get_mut(aid) {
trace.advance_by(&slacking_frontier);
trace.distinguish_since(&slacking_frontier);
}
if let Some(trace) = self.reverse_count.get_mut(aid) {
trace.advance_by(&slacking_frontier);
trace.distinguish_since(&slacking_frontier);
}
if let Some(trace) = self.reverse_propose.get_mut(aid) {
trace.advance_by(&slacking_frontier);
trace.distinguish_since(&slacking_frontier);
}
if let Some(trace) = self.reverse_validate.get_mut(aid) {
trace.advance_by(&slacking_frontier);
trace.distinguish_since(&slacking_frontier);
}
}
}
for (name, config) in self.relations.iter() {
if let Some(ref trace_slack) = config.trace_slack {
let slacking_frontier = frontier
.iter()
.map(|t| t.rewind(trace_slack.clone().into()))
.collect::<Vec<T>>();
let trace = self.arrangements.get_mut(name).unwrap_or_else(|| {
panic!("Configuration available for unknown relation {}", name)
});
trace.advance_by(&slacking_frontier);
trace.distinguish_since(&slacking_frontier);
}
}
}
Ok(())
}
/// Returns the probe that non-timeless sourced attributes are attached to.
pub fn domain_probe(&self) -> &ProbeHandle<T> {
&self.domain_probe
}
/// Returns the current epoch of this domain, i.e. the time it was created
/// at or the time last passed successfully to `advance_epoch`.
pub fn epoch(&self) -> &T {
&self.now_at
}
/// Returns how many sourced attributes have been attached to the domain
/// probe (timeless sources are not counted).
pub fn probed_source_count(&self) -> usize {
self.probed_source_count
}
/// Reports whether this domain has progressed strictly beyond `frontier`.
///
/// * Without probed sources, the current epoch is compared: the result is
///   `frontier.less_than(epoch)`.
/// * With probed sources, an empty `frontier` is never dominated.
///   Otherwise the result is `true` when `frontier.less_than(t)` holds for
///   every element `t` of the domain probe's frontier (which is trivially
///   the case when that frontier is empty).
pub fn dominates(&self, frontier: AntichainRef<T>) -> bool {
    match (self.probed_source_count(), frontier.is_empty()) {
        (0, _) => frontier.less_than(self.epoch()),
        (_, true) => false,
        (_, false) => self
            .domain_probe()
            .with_frontier(|probed| probed.iter().all(|t| frontier.less_than(t))),
    }
}
}