use std::collections::HashMap;
use std::ops::Sub;
use timely::dataflow::operators::aggregation::StateMachine;
use timely::dataflow::operators::Map;
use timely::dataflow::{ProbeHandle, Scope, Stream};
use timely::order::TotalOrder;
use timely::progress::Timestamp;
use differential_dataflow::input::{Input, InputSession};
use differential_dataflow::lattice::Lattice;
use differential_dataflow::operators::Threshold;
use differential_dataflow::trace::TraceReader;
use differential_dataflow::AsCollection;
use crate::{Aid, Error, Time, TxData, Value};
use crate::{AttributeConfig, CollectionIndex, InputSemantics, RelationConfig, RelationHandle};
pub struct Domain<T: Timestamp + Lattice + TotalOrder> {
now_at: T,
input_sessions: HashMap<String, InputSession<T, (Value, Value), isize>>,
probe: ProbeHandle<T>,
pub attributes: HashMap<Aid, AttributeConfig>,
pub forward: HashMap<Aid, CollectionIndex<Value, Value, T>>,
pub reverse: HashMap<Aid, CollectionIndex<Value, Value, T>>,
pub relations: HashMap<Aid, RelationConfig<T>>,
pub arrangements: HashMap<Aid, RelationHandle<T>>,
}
impl<T> Domain<T>
where
T: Timestamp + Lattice + TotalOrder + Sub<Output = T> + std::convert::From<Time>,
{
pub fn new(start_at: T) -> Self {
Domain {
now_at: start_at,
input_sessions: HashMap::new(),
probe: ProbeHandle::new(),
attributes: HashMap::new(),
forward: HashMap::new(),
reverse: HashMap::new(),
relations: HashMap::new(),
arrangements: HashMap::new(),
}
}
pub fn create_attribute<S: Scope<Timestamp = T>>(
&mut self,
name: &str,
config: AttributeConfig,
scope: &mut S,
) -> Result<(), Error> {
if self.forward.contains_key(name) {
Err(Error {
category: "df.error.category/conflict",
message: format!("An attribute of name {} already exists.", name),
})
} else {
let (handle, mut tuples) = scope.new_collection::<(Value, Value), isize>();
tuples = match config.input_semantics {
InputSemantics::Raw => tuples,
InputSemantics::CardinalityOne => {
tuples
.inner
.map(|((e, next_v), t, diff)| (e, (next_v, t, diff)))
.state_machine(
|e, (next_v, t, diff), v| {
match v {
None => {
assert!(diff > 0, "Received a retraction of a new key on a CardinalityOne attribute");
*v = Some(next_v.clone());
(false, vec![((e.clone(), next_v), t, 1)])
}
Some(old_v) => {
let old_v = old_v.clone();
if diff > 0 {
*v = Some(next_v.clone());
(false, vec![
((e.clone(), old_v), t.clone(), -1),
((e.clone(), next_v), t, 1),
])
} else {
(true, vec![((e.clone(), old_v), t, -1)])
}
}
}
},
|e| {
if let Value::Eid(eid) = e {
*eid as u64
} else {
panic!("Expected an eid.");
}
}
)
.as_collection()
}
InputSemantics::CardinalityMany => {
tuples.distinct()
}
};
self.attributes.insert(name.to_string(), config);
let forward = CollectionIndex::index(name, &tuples);
let reverse = CollectionIndex::index(name, &tuples.map(|(e, v)| (v, e)));
self.forward.insert(name.to_string(), forward);
self.reverse.insert(name.to_string(), reverse);
self.input_sessions.insert(name.to_string(), handle);
Ok(())
}
}
pub fn create_source<S: Scope<Timestamp = T>>(
&mut self,
name: &str,
datoms: &Stream<S, ((Value, Value), T, isize)>,
) -> Result<(), Error> {
if self.forward.contains_key(name) {
Err(Error {
category: "df.error.category/conflict",
message: format!("An attribute of name {} already exists.", name),
})
} else {
let tuples = datoms
.as_collection()
.distinct();
let forward = CollectionIndex::index(&name, &tuples);
let reverse = CollectionIndex::index(&name, &tuples.map(|(e, v)| (v, e)));
self.forward.insert(name.to_string(), forward);
self.reverse.insert(name.to_string(), reverse);
info!("Created source {}", name);
Ok(())
}
}
pub fn register_arrangement(
&mut self,
name: String,
config: RelationConfig<T>,
mut trace: RelationHandle<T>,
) {
trace.distinguish_since(&[]);
self.relations.insert(name.clone(), config);
self.arrangements.insert(name, trace);
}
pub fn transact(&mut self, tx_data: Vec<TxData>) -> Result<(), Error> {
for TxData(op, e, a, v) in tx_data {
match self.input_sessions.get_mut(&a) {
None => {
return Err(Error {
category: "df.error.category/not-found",
message: format!("Attribute {} does not exist.", a),
});
}
Some(handle) => {
handle.update((Value::Eid(e), v), op);
}
}
}
Ok(())
}
pub fn close_input(&mut self, name: String) -> Result<(), Error> {
match self.input_sessions.remove(&name) {
None => Err(Error {
category: "df.error.category/not-found",
message: format!("Input {} does not exist.", name),
}),
Some(handle) => {
handle.close();
Ok(())
}
}
}
pub fn advance_to(&mut self, next: T) -> Result<(), Error> {
if !self.now_at.less_equal(&next) {
Err(Error {
category: "df.error.category/conflict",
message: format!(
"Domain is at {:?}, you attempted to rewind to {:?}.",
&self.now_at, &next
),
})
} else if !self.now_at.eq(&next) {
self.now_at = next.clone();
for handle in self.input_sessions.values_mut() {
handle.advance_to(next.clone());
handle.flush();
}
for (aid, config) in self.attributes.iter() {
if let Some(ref trace_slack) = config.trace_slack {
let frontier = &[next.clone() - trace_slack.clone().into()];
self.forward
.get_mut(aid)
.unwrap_or_else(|| {
panic!("Configuration available for unknown attribute {}", aid)
})
.advance_by(frontier);
self.reverse
.get_mut(aid)
.unwrap_or_else(|| {
panic!("Configuration available for unknown attribute {}", aid)
})
.advance_by(frontier);
}
}
for (name, config) in self.relations.iter() {
if let Some(ref trace_slack) = config.trace_slack {
let frontier = &[next.clone() - trace_slack.clone()];
self.arrangements
.get_mut(name)
.unwrap_or_else(|| {
panic!("Configuration available for unknown relation {}", name)
})
.advance_by(frontier);
}
}
Ok(())
} else {
Ok(())
}
}
pub fn time(&self) -> &T {
&self.now_at
}
}