#![forbid(missing_docs)]
#[macro_use]
extern crate log;
#[macro_use]
extern crate serde_derive;
pub mod binding;
pub mod domain;
pub mod plan;
pub mod server;
pub mod sinks;
pub mod sources;
pub mod timestamp;
use std::collections::{HashMap, HashSet, VecDeque};
use std::hash::Hash;
use std::time::Duration;
use timely::dataflow::operators::CapabilitySet;
use timely::dataflow::scopes::child::{Child, Iterative};
use timely::dataflow::*;
use timely::order::{Product, TotalOrder};
use timely::progress::timestamp::Refines;
use timely::progress::Timestamp;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::operators::arrange::{Arrange, Arranged, ShutdownButton, TraceAgent};
use differential_dataflow::operators::iterate::Variable;
#[cfg(not(feature = "set-semantics"))]
use differential_dataflow::operators::Consolidate;
#[cfg(feature = "set-semantics")]
use differential_dataflow::operators::Threshold;
use differential_dataflow::trace::implementations::ord::{OrdKeySpine, OrdValSpine};
use differential_dataflow::trace::wrappers::enter::TraceEnter;
use differential_dataflow::trace::wrappers::enter_at::TraceEnter as TraceEnterAt;
use differential_dataflow::trace::TraceReader;
use differential_dataflow::{Collection, Data};
pub use num_rational::Rational32;
pub use binding::{AsBinding, AttributeBinding, Binding};
pub use plan::{Hector, ImplContext, Implementable, Plan};
/// Identifier of an entity, stored as a `u64`.
pub type Eid = u64;
/// Identifier of an attribute, stored as a `String`.
pub type Aid = String;
/// A dynamically typed value, as carried by `Datom`, `TxData` and the
/// `Vec<Value>` tuples flowing through relations.
#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub enum Value {
/// An attribute identifier.
Aid(Aid),
/// A string.
String(String),
/// A boolean.
Bool(bool),
/// A 64-bit signed integer.
Number(i64),
/// A rational number with 32-bit components.
Rational32(Rational32),
/// An entity identifier.
Eid(Eid),
/// A point in time stored as a `u64`.
/// NOTE(review): unit and epoch are not visible in this file — confirm.
Instant(u64),
/// A UUID stored as 16 raw bytes.
Uuid([u8; 16]),
}
impl std::convert::From<Value> for Eid {
    /// Extracts the entity id wrapped by a `Value::Eid`.
    ///
    /// # Panics
    ///
    /// Panics if `v` is any variant other than `Value::Eid`.
    fn from(v: Value) -> Eid {
        match v {
            Value::Eid(eid) => eid,
            other => panic!("Value {:?} can't be converted to Eid", other),
        }
    }
}
/// A time value that is either a transaction id or a real-time duration.
#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub enum Time {
/// A transaction id.
TxId(u64),
/// A real-time value expressed as a `Duration`.
/// NOTE(review): the reference point of the duration is not visible here.
Real(Duration),
}
impl std::convert::From<Time> for u64 {
    /// Extracts the transaction id wrapped by a `Time::TxId`.
    ///
    /// # Panics
    ///
    /// Panics if `t` is not a `Time::TxId`.
    fn from(t: Time) -> u64 {
        match t {
            Time::TxId(tx_id) => tx_id,
            other => panic!("Time {:?} can't be converted to u64", other),
        }
    }
}
impl std::convert::From<Time> for Duration {
    /// Extracts the duration wrapped by a `Time::Real`.
    ///
    /// # Panics
    ///
    /// Panics if `t` is not a `Time::Real`.
    fn from(t: Time) -> Duration {
        match t {
            Time::Real(duration) => duration,
            other => panic!("Time {:?} can't be converted to Duration", other),
        }
    }
}
/// An error reported by this crate: a category identifier plus a message.
#[derive(Debug)]
pub struct Error {
/// Category identifier, e.g. `"df.error.category/not-found"` or
/// `"df.error.category/conflict"`.
pub category: &'static str,
/// Free-form description of what went wrong.
pub message: String,
}
/// A single transaction input, laid out as `(isize, Eid, Aid, Value)`.
///
/// NOTE(review): the leading `isize` is presumably the diff (positive to
/// assert, negative to retract) — confirm against the code consuming `TxData`.
#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub struct TxData(pub isize, pub Eid, pub Aid, pub Value);
/// A result update: a tuple of values, a time of type `T`, and an `isize`
/// (presumably the diff of the tuple at that time — confirm with producers).
pub type ResultDiff<T> = (Vec<Value>, T, isize);
/// An `(Eid, Aid, Value)` triple, i.e. an entity id, an attribute id and a value.
#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub struct Datom(pub Eid, pub Aid, pub Value);
/// A `TraceAgent` backed by an `OrdKeySpine`: keys of type `K` with unit values,
/// times `T` and diffs `R`.
pub type TraceKeyHandle<K, T, R> = TraceAgent<K, (), T, R, OrdKeySpine<K, T, R>>;
/// A `TraceAgent` backed by an `OrdValSpine`: keys `K`, values `V`, times `T`
/// and diffs `R`.
pub type TraceValHandle<K, V, T, R> = TraceAgent<K, V, T, R, OrdValSpine<K, V, T, R>>;
/// A key-only trace handle whose keys are `Vec<Value>` tuples with `isize` diffs.
pub type RelationHandle<T> = TraceKeyHandle<Vec<Value>, T, isize>;
/// Maps a name to the iteration `Variable` (over `Vec<Value>` tuples with
/// `isize` diffs) created for it in scope `G`.
type VariableMap<G> = HashMap<String, Variable<G, Vec<Value>, isize>>;
/// Something that can be "pressed" to shut down. Used to store shutdown
/// buttons with different type parameters behind `Box<dyn Shutdownable>`.
trait Shutdownable {
/// Triggers the shutdown.
fn press(&mut self);
}
/// Lets any `ShutdownButton<T>` be stored as a `Shutdownable` trait object.
impl<T> Shutdownable for ShutdownButton<T> {
// Forwards to the button's own `press`.
// NOTE(review): this relies on `ShutdownButton` having an inherent `press`
// method, which method resolution prefers over this trait method; without
// it this call would recurse into itself — confirm when upgrading the
// differential-dataflow dependency.
#[inline(always)]
fn press(&mut self) {
self.press();
}
}
/// A collection of shutdown buttons. All contained buttons are pressed when
/// the handle is dropped (see the `Drop` implementation).
pub struct ShutdownHandle {
/// The type-erased buttons owned by this handle.
shutdown_buttons: Vec<Box<dyn Shutdownable>>,
}
impl Drop for ShutdownHandle {
    /// Presses every button owned by the handle, emptying it in the process.
    fn drop(&mut self) {
        self.shutdown_buttons
            .drain(..)
            .for_each(|mut button| button.press());
    }
}
impl ShutdownHandle {
pub fn empty() -> Self {
ShutdownHandle {
shutdown_buttons: Vec::new(),
}
}
pub fn from_button<T: Timestamp>(button: ShutdownButton<CapabilitySet<T>>) -> Self {
ShutdownHandle {
shutdown_buttons: vec![Box::new(button)],
}
}
pub fn add_button<T: Timestamp>(&mut self, button: ShutdownButton<CapabilitySet<T>>) {
self.shutdown_buttons.push(Box::new(button));
}
pub fn merge_with(&mut self, mut other: Self) {
self.shutdown_buttons.append(&mut other.shutdown_buttons);
}
pub fn merge(mut left: Self, mut right: Self) -> Self {
let mut shutdown_buttons =
Vec::with_capacity(left.shutdown_buttons.len() + right.shutdown_buttons.len());
shutdown_buttons.append(&mut left.shutdown_buttons);
shutdown_buttons.append(&mut right.shutdown_buttons);
ShutdownHandle { shutdown_buttons }
}
}
/// Selects how inputs to an attribute are to be interpreted.
///
/// NOTE(review): the code enforcing these semantics is not part of this file;
/// the per-variant descriptions below are inferred from the names — confirm.
#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub enum InputSemantics {
/// Presumably: inputs are taken as-is, without additional constraints.
Raw,
/// Presumably: at most one value per entity.
CardinalityOne,
/// Presumably: several values per entity are allowed.
CardinalityMany,
}
/// Per-attribute configuration.
#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub struct AttributeConfig {
/// How inputs to the attribute are interpreted.
pub input_semantics: InputSemantics,
/// Optional slack for the attribute's traces.
/// NOTE(review): judging by the `uncompacted` constructor, `None` appears to
/// mean the traces are never compacted — confirm against the consumer.
pub trace_slack: Option<Time>,
}
impl AttributeConfig {
    /// Configuration with the given semantics and a trace slack of one
    /// transaction (`Time::TxId(1)`).
    pub fn tx_time(input_semantics: InputSemantics) -> Self {
        let trace_slack = Some(Time::TxId(1));
        Self {
            input_semantics,
            trace_slack,
        }
    }

    /// Configuration with the given semantics and a trace slack of one
    /// second (`Time::Real(Duration::from_secs(1))`).
    pub fn real_time(input_semantics: InputSemantics) -> Self {
        let trace_slack = Some(Time::Real(Duration::from_secs(1)));
        Self {
            input_semantics,
            trace_slack,
        }
    }

    /// Configuration with the given semantics and no trace slack (`None`).
    pub fn uncompacted(input_semantics: InputSemantics) -> Self {
        let trace_slack = None;
        Self {
            input_semantics,
            trace_slack,
        }
    }
}
/// Per-relation configuration, generic over the timestamp type `T`.
#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug)]
pub struct RelationConfig<T>
where
T: Timestamp + Lattice + TotalOrder,
{
/// Optional slack for the relation's trace.
/// NOTE(review): `None` presumably disables compaction, mirroring
/// `AttributeConfig::uncompacted` — confirm against the consumer.
pub trace_slack: Option<T>,
}
/// Three trace handles over one collection of `(K, V)` pairs, each arranged
/// differently (see `CollectionIndex::index`).
pub struct CollectionIndex<K, V, T>
where
K: Data,
V: Data,
T: Lattice + Data,
{
/// Name used to label the arrangements and their imports.
pub name: String,
/// Trace keyed by `K` with unit values.
count_trace: TraceKeyHandle<K, T, isize>,
/// Trace keyed by `K` with values `V`.
propose_trace: TraceValHandle<K, V, T, isize>,
/// Trace keyed by the whole `(K, V)` pair with unit values.
validate_trace: TraceKeyHandle<(K, V), T, isize>,
}
impl<K, V, T> Clone for CollectionIndex<K, V, T>
where
    K: Data + Hash,
    V: Data + Hash,
    T: Lattice + Data + Timestamp,
{
    /// Clones the name and each of the three trace handles.
    fn clone(&self) -> Self {
        let name = self.name.clone();
        let count_trace = self.count_trace.clone();
        let propose_trace = self.propose_trace.clone();
        let validate_trace = self.validate_trace.clone();
        Self {
            name,
            count_trace,
            propose_trace,
            validate_trace,
        }
    }
}
impl<K, V, T> CollectionIndex<K, V, T>
where
    K: Data + Hash,
    V: Data + Hash,
    T: Lattice + Data + Timestamp,
{
    /// Builds an index named `name` from a collection of `(K, V)` pairs.
    ///
    /// The collection is arranged three times: by key with unit values
    /// (`Counts(name)`), by key with values (`Proposals(name)`), and by the
    /// whole pair with unit values (`Validations(name)`). Each resulting
    /// trace has `distinguish_since(&[])` applied before being stored.
    pub fn index<G: Scope<Timestamp = T>>(
        name: &str,
        collection: &Collection<G, (K, V), isize>,
    ) -> Self {
        // The arrangements are created in the same order as before:
        // counts, proposals, validations.
        let keys_only = collection.map(|(key, _value)| (key, ()));
        let mut count_trace = keys_only
            .arrange_named(&format!("Counts({})", name))
            .trace;

        let mut propose_trace = collection
            .arrange_named(&format!("Proposals({})", &name))
            .trace;

        let pairs_as_keys = collection.map(|pair| (pair, ()));
        let mut validate_trace = pairs_as_keys
            .arrange_named(&format!("Validations({})", &name))
            .trace;

        count_trace.distinguish_since(&[]);
        propose_trace.distinguish_since(&[]);
        validate_trace.distinguish_since(&[]);

        Self {
            name: name.to_string(),
            count_trace,
            propose_trace,
            validate_trace,
        }
    }

    /// Imports all three traces into `scope`.
    ///
    /// Returns the resulting `LiveIndex` together with a `ShutdownHandle`
    /// owning the shutdown buttons of the three imports.
    pub fn import<G: Scope<Timestamp = T>>(
        &mut self,
        scope: &G,
    ) -> (
        LiveIndex<
            G,
            K,
            V,
            TraceKeyHandle<K, T, isize>,
            TraceValHandle<K, V, T, isize>,
            TraceKeyHandle<(K, V), T, isize>,
        >,
        ShutdownHandle,
    ) {
        let count_label = format!("Counts({})", self.name);
        let (count, shutdown_count) = self.count_trace.import_core(scope, &count_label);

        let propose_label = format!("Proposals({})", self.name);
        let (propose, shutdown_propose) = self.propose_trace.import_core(scope, &propose_label);

        let validate_label = format!("Validations({})", self.name);
        let (validate, shutdown_validate) =
            self.validate_trace.import_core(scope, &validate_label);

        let mut shutdown_handle = ShutdownHandle::from_button(shutdown_count);
        shutdown_handle.add_button(shutdown_propose);
        shutdown_handle.add_button(shutdown_validate);

        let index = LiveIndex {
            count,
            propose,
            validate,
        };
        (index, shutdown_handle)
    }

    /// Forwards `advance_by(frontier)` to each of the three traces.
    pub fn advance_by(&mut self, frontier: &[T]) {
        self.count_trace.advance_by(frontier);
        self.propose_trace.advance_by(frontier);
        self.validate_trace.advance_by(frontier);
    }
}
/// The three arrangements of a `CollectionIndex` once imported into a scope
/// `G` (see `CollectionIndex::import`).
pub struct LiveIndex<G, K, V, TrCount, TrPropose, TrValidate>
where
G: Scope,
G::Timestamp: Lattice + Data,
K: Data,
V: Data,
TrCount: TraceReader<K, (), G::Timestamp, isize> + Clone,
TrPropose: TraceReader<K, V, G::Timestamp, isize> + Clone,
TrValidate: TraceReader<(K, V), (), G::Timestamp, isize> + Clone,
{
/// Arrangement keyed by `K` with unit values.
count: Arranged<G, K, (), isize, TrCount>,
/// Arrangement keyed by `K` with values `V`.
propose: Arranged<G, K, V, isize, TrPropose>,
/// Arrangement keyed by the whole `(K, V)` pair with unit values.
validate: Arranged<G, (K, V), (), isize, TrValidate>,
}
impl<G, K, V, TrCount, TrPropose, TrValidate> Clone
    for LiveIndex<G, K, V, TrCount, TrPropose, TrValidate>
where
    G: Scope,
    G::Timestamp: Lattice + Data,
    K: Data,
    V: Data,
    TrCount: TraceReader<K, (), G::Timestamp, isize> + Clone,
    TrPropose: TraceReader<K, V, G::Timestamp, isize> + Clone,
    TrValidate: TraceReader<(K, V), (), G::Timestamp, isize> + Clone,
{
    /// Clones each of the three arrangements.
    fn clone(&self) -> Self {
        let count = self.count.clone();
        let propose = self.propose.clone();
        let validate = self.validate.clone();
        Self {
            count,
            propose,
            validate,
        }
    }
}
impl<G, K, V, TrCount, TrPropose, TrValidate> LiveIndex<G, K, V, TrCount, TrPropose, TrValidate>
where
    G: Scope,
    G::Timestamp: Lattice + Data,
    K: Data,
    V: Data,
    TrCount: TraceReader<K, (), G::Timestamp, isize> + Clone,
    TrPropose: TraceReader<K, V, G::Timestamp, isize> + Clone,
    TrValidate: TraceReader<(K, V), (), G::Timestamp, isize> + Clone,
{
    /// Brings all three arrangements into the nested scope `child` by
    /// calling `enter` on each of them.
    pub fn enter<'a, TInner>(
        &self,
        child: &Child<'a, G, TInner>,
    ) -> LiveIndex<
        Child<'a, G, TInner>,
        K,
        V,
        TraceEnter<K, (), G::Timestamp, isize, TrCount, TInner>,
        TraceEnter<K, V, G::Timestamp, isize, TrPropose, TInner>,
        TraceEnter<(K, V), (), G::Timestamp, isize, TrValidate, TInner>,
    >
    where
        TrCount::Batch: Clone,
        TrPropose::Batch: Clone,
        TrValidate::Batch: Clone,
        K: 'static,
        V: 'static,
        G::Timestamp: Clone + Default + 'static,
        TInner: Refines<G::Timestamp> + Lattice + Timestamp + Clone + Default + 'static,
    {
        let count = self.count.enter(child);
        let propose = self.propose.enter(child);
        let validate = self.validate.enter(child);
        LiveIndex {
            count,
            propose,
            validate,
        }
    }

    /// Brings all three arrangements into the nested scope `child` by
    /// calling `enter_at` on each of them, passing `fcount`, `fpropose` and
    /// `fvalidate` respectively as the function producing the inner time.
    pub fn enter_at<'a, TInner, FCount, FPropose, FValidate>(
        &self,
        child: &Child<'a, G, TInner>,
        fcount: FCount,
        fpropose: FPropose,
        fvalidate: FValidate,
    ) -> LiveIndex<
        Child<'a, G, TInner>,
        K,
        V,
        TraceEnterAt<K, (), G::Timestamp, isize, TrCount, TInner, FCount>,
        TraceEnterAt<K, V, G::Timestamp, isize, TrPropose, TInner, FPropose>,
        TraceEnterAt<(K, V), (), G::Timestamp, isize, TrValidate, TInner, FValidate>,
    >
    where
        TrCount::Batch: Clone,
        TrPropose::Batch: Clone,
        TrValidate::Batch: Clone,
        K: 'static,
        V: 'static,
        G::Timestamp: Clone + Default + 'static,
        TInner: Refines<G::Timestamp> + Lattice + Timestamp + Clone + Default + 'static,
        FCount: Fn(&K, &(), &G::Timestamp) -> TInner + 'static,
        FPropose: Fn(&K, &V, &G::Timestamp) -> TInner + 'static,
        FValidate: Fn(&(K, V), &(), &G::Timestamp) -> TInner + 'static,
    {
        let count = self.count.enter_at(child, fcount);
        let propose = self.propose.enter_at(child, fpropose);
        let validate = self.validate.enter_at(child, fvalidate);
        LiveIndex {
            count,
            propose,
            validate,
        }
    }
}
/// Identifier of a variable within a plan or binding, stored as a `u32`.
type Var = u32;
/// A named plan. Rules are looked up by name (see `collect_dependencies`).
#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub struct Rule {
/// The name under which the rule is registered and published.
pub name: String,
/// The plan implementing the rule.
pub plan: Plan,
}
/// A relation living in an iterative sub-scope of `G`: a collection of
/// `Vec<Value>` tuples plus the variable bindings provided via `AsBinding`.
/// All methods consume the relation.
trait Relation<'a, G: Scope>: AsBinding
where
G::Timestamp: Lattice + Data,
{
/// Returns the relation's tuples.
fn tuples(self) -> Collection<Iterative<'a, G, u64>, Vec<Value>, isize>;
/// Returns the relation's tuples restricted / reordered to `target_variables`.
fn projected(
self,
target_variables: &[Var],
) -> Collection<Iterative<'a, G, u64>, Vec<Value>, isize>;
/// Returns the relation's tuples split into `(key, rest)` pairs, where the
/// key holds the values bound to `variables`.
fn tuples_by_variables(
self,
variables: &[Var],
) -> Collection<Iterative<'a, G, u64>, (Vec<Value>, Vec<Value>), isize>;
/// Like `tuples_by_variables`, but returns the result as an arrangement.
fn arrange_by_variables(
self,
variables: &[Var],
) -> Arranged<
Iterative<'a, G, u64>,
Vec<Value>,
Vec<Value>,
isize,
TraceValHandle<Vec<Value>, Vec<Value>, Product<G::Timestamp, u64>, isize>,
>;
}
/// A relation backed by a plain collection of tuples.
pub struct CollectionRelation<'a, G: Scope> {
/// The variables bound by the relation; `variables[i]` names the value at
/// offset `i` of every tuple (this is how `projected` and
/// `tuples_by_variables` index into tuples).
variables: Vec<Var>,
/// The tuples themselves, living in an iterative sub-scope of `G`.
tuples: Collection<Iterative<'a, G, u64>, Vec<Value>, isize>,
}
impl<'a, G: Scope> AsBinding for CollectionRelation<'a, G>
where
    G::Timestamp: Lattice + Data,
{
    /// Returns a copy of the variables bound by this relation.
    fn variables(&self) -> Vec<Var> {
        self.variables.to_vec()
    }

    /// Delegates to the `AsBinding` implementation of the variable list.
    fn binds(&self, variable: Var) -> Option<usize> {
        let bound = self.variables.binds(variable);
        bound
    }

    /// Not implemented for collection relations; always panics.
    fn ready_to_extend(&self, _prefix: &dyn AsBinding) -> Option<Var> {
        unimplemented!()
    }

    /// Not implemented for collection relations; always panics.
    fn required_to_extend(&self, _prefix: &dyn AsBinding, _target: Var) -> Option<Option<Var>> {
        unimplemented!()
    }
}
impl<'a, G: Scope> Relation<'a, G> for CollectionRelation<'a, G>
where
    G::Timestamp: Lattice + Data,
{
    /// Returns the underlying collection of tuples unchanged.
    fn tuples(self) -> Collection<Iterative<'a, G, u64>, Vec<Value>, isize> {
        self.tuples
    }

    /// Returns the tuples reduced / reordered to `target_variables`.
    ///
    /// If `target_variables` equals the relation's own variable list, the
    /// underlying collection is returned as-is.
    ///
    /// # Panics
    ///
    /// Panics if a target variable is not bound by this relation. The
    /// offsets are resolved once, up front, rather than once per tuple.
    fn projected(
        self,
        target_variables: &[Var],
    ) -> Collection<Iterative<'a, G, u64>, Vec<Value>, isize> {
        if self.variables == target_variables {
            return self.tuples;
        }

        // Resolve each target variable to its tuple offset a single time,
        // instead of repeating the lookup for every tuple that flows by.
        let offsets: Vec<usize> = target_variables
            .iter()
            .map(|variable| self.variables.binds(*variable).unwrap())
            .collect();

        self.tuples
            .map(move |tuple| offsets.iter().map(|idx| tuple[*idx].clone()).collect())
    }

    /// Splits every tuple into a `(key, rest)` pair.
    ///
    /// The key holds the values bound to `variables`, in the order given;
    /// the rest holds the values of all remaining variables in relation
    /// order. Two shortcuts apply: if `variables` equals the relation's
    /// variable list the whole tuple becomes the key, and if `variables` is
    /// empty the whole tuple becomes the rest.
    ///
    /// # Panics
    ///
    /// Panics if one of `variables` is not bound by this relation.
    fn tuples_by_variables(
        self,
        variables: &[Var],
    ) -> Collection<Iterative<'a, G, u64>, (Vec<Value>, Vec<Value>), isize> {
        if variables == &self.variables[..] {
            return self.tuples.map(|tuple| (tuple, Vec::new()));
        }
        if variables.is_empty() {
            return self.tuples.map(|tuple| (Vec::new(), tuple));
        }

        let key_offsets: Vec<usize> = variables
            .iter()
            .map(|variable| self.binds(*variable).unwrap())
            .collect();

        // Collecting the offsets directly avoids computing a capacity as
        // `relation_len - key_len`, which underflowed whenever more key
        // variables were requested than the relation binds.
        let key_variables: HashSet<Var> = variables.iter().cloned().collect();
        let value_offsets: Vec<usize> = self
            .variables
            .iter()
            .enumerate()
            .filter_map(|(idx, variable)| {
                if key_variables.contains(variable) {
                    None
                } else {
                    Some(idx)
                }
            })
            .collect();

        self.tuples.map(move |tuple| {
            let key: Vec<Value> = key_offsets.iter().map(|i| tuple[*i].clone()).collect();
            let values: Vec<Value> = value_offsets.iter().map(|i| tuple[*i].clone()).collect();
            (key, values)
        })
    }

    /// Arranges the result of `tuples_by_variables(variables)`.
    fn arrange_by_variables(
        self,
        variables: &[Var],
    ) -> Arranged<
        Iterative<'a, G, u64>,
        Vec<Value>,
        Vec<Value>,
        isize,
        TraceValHandle<Vec<Value>, Vec<Value>, Product<G::Timestamp, u64>, isize>,
    > {
        self.tuples_by_variables(variables).arrange()
    }
}
/// Builds a `Plan::Hector` that finds `target_variables` subject to `bindings`.
pub fn q(target_variables: Vec<Var>, bindings: Vec<Binding>) -> Plan {
    let hector = Hector {
        variables: target_variables,
        bindings,
    };
    Plan::Hector(hector)
}
pub fn collect_dependencies<T, I>(context: &I, names: &[&str]) -> Result<Vec<Rule>, Error>
where
T: Timestamp + Lattice + TotalOrder,
I: ImplContext<T>,
{
let mut seen = HashSet::new();
let mut rules = Vec::new();
let mut queue = VecDeque::new();
for name in names {
match context.rule(name) {
None => {
return Err(Error {
category: "df.error.category/not-found",
message: format!("Unknown rule {}.", name),
});
}
Some(rule) => {
seen.insert(name.to_string());
queue.push_back(rule.clone());
}
}
}
while let Some(next) = queue.pop_front() {
let dependencies = next.plan.dependencies();
for dep_name in dependencies.names.iter() {
if !seen.contains(dep_name) {
match context.rule(dep_name) {
None => {
return Err(Error {
category: "df.error.category/not-found",
message: format!("Unknown rule {}", dep_name),
});
}
Some(rule) => {
seen.insert(dep_name.to_string());
queue.push_back(rule.clone());
}
}
}
}
for aid in dependencies.attributes.iter() {
if !context.has_attribute(aid) {
return Err(Error {
category: "df.error.category/not-found",
message: format!("Rule depends on unknown attribute {}", aid),
});
}
}
rules.push(next);
}
Ok(rules)
}
/// Implements the rule registered under `name`, together with all rules it
/// depends on, inside an iterative sub-scope of `scope`.
///
/// On success, returns a map from the published name to the collection of
/// its tuples (having left the sub-scope), and a `ShutdownHandle` merged
/// from the handles returned by the individual plan implementations.
///
/// NOTE(review): an iteration variable is only created for rules for which
/// `context.is_underconstrained` returns true, yet every collected rule must
/// have one when the executions are bound at the end — any other rule leads
/// to a not-found error. Confirm that this is intended.
///
/// # Errors
///
/// * `df.error.category/not-found` if a rule or attribute is unknown, if no
///   variable exists for the published name, or if a rule has no variable
///   when its execution is bound.
/// * `df.error.category/conflict` if two collected rules share a name.
pub fn implement<T, I, S>(
    name: &str,
    scope: &mut S,
    context: &mut I,
) -> Result<
    (
        HashMap<String, Collection<S, Vec<Value>, isize>>,
        ShutdownHandle,
    ),
    Error,
>
where
    T: Timestamp + Lattice + TotalOrder + Default,
    I: ImplContext<T>,
    S: Scope<Timestamp = T>,
{
    scope.iterative::<u64, _, _>(|nested| {
        let publish = vec![name];
        let mut rules = collect_dependencies(&*context, &publish[..])?;
        let mut local_arrangements = VariableMap::new();
        let mut result_map = HashMap::new();

        if rules.is_empty() {
            return Err(Error {
                category: "df.error.category/not-found",
                message: format!("Couldn't find any rules for name {}.", name),
            });
        }

        // After sorting by name, duplicates are adjacent. Every adjacent
        // pair is inspected, including the last one, which the previous
        // index loop (`1..rules.len() - 1`) skipped.
        rules.sort_by(|x, y| x.name.cmp(&y.name));
        if let Some(pair) = rules
            .windows(2)
            .find(|pair| pair[0].name == pair[1].name)
        {
            return Err(Error {
                category: "df.error.category/conflict",
                message: format!("Duplicate rule definitions for rule {}", pair[1].name),
            });
        }

        // Create an iteration variable for each underconstrained rule.
        for rule in rules.iter() {
            if context.is_underconstrained(&rule.name) {
                local_arrangements.insert(
                    rule.name.clone(),
                    Variable::new(nested, Product::new(Default::default(), 1)),
                );
            }
        }

        // Publish the requested name by letting its variable leave the
        // iterative scope.
        for name in publish.into_iter() {
            if let Some(relation) = local_arrangements.get(name) {
                result_map.insert(name.to_string(), relation.leave());
            } else {
                return Err(Error {
                    category: "df.error.category/not-found",
                    message: format!("Attempted to publish undefined name {}.", name),
                });
            }
        }

        // Implement every rule's plan; the variables are still unbound here
        // so that plans can refer to each other.
        let mut executions = Vec::with_capacity(rules.len());
        let mut shutdown_handle = ShutdownHandle::empty();
        for rule in rules.iter() {
            info!("planning {:?}", rule.name);
            let (relation, shutdown) = rule.plan.implement(nested, &local_arrangements, context);
            executions.push(relation);
            shutdown_handle.merge_with(shutdown);
        }

        // Bind each rule's variable to the tuples produced by its execution.
        for (rule, execution) in rules.iter().zip(executions.drain(..)) {
            match local_arrangements.remove(&rule.name) {
                None => {
                    return Err(Error {
                        category: "df.error.category/not-found",
                        message: format!(
                            "Rule {} should be in local arrangements, but isn't.",
                            &rule.name
                        ),
                    });
                }
                Some(variable) => {
                    #[cfg(feature = "set-semantics")]
                    variable.set(&execution.tuples().distinct());
                    #[cfg(not(feature = "set-semantics"))]
                    variable.set(&execution.tuples().consolidate());
                }
            }
        }

        Ok((result_map, shutdown_handle))
    })
}
/// Variant of `implement` that re-plans every collected rule as a
/// `Plan::Hector` (via `q(rule.plan.variables(), rule.plan.into_bindings())`)
/// before implementing it inside an iterative sub-scope of `scope`.
///
/// On success, returns a map from the published name to the collection of
/// its tuples (having left the sub-scope), and a `ShutdownHandle` merged
/// from the handles returned by the individual plan implementations.
///
/// NOTE(review): iteration variables are only created for the published
/// name (and only if `context.is_underconstrained` holds for it), but every
/// collected rule — dependencies included — must have one when the
/// executions are bound at the end; otherwise a not-found error is
/// returned. Confirm that this is intended.
///
/// # Errors
///
/// * `df.error.category/not-found` if a rule or attribute is unknown, if no
///   variable exists for the published name, or if a rule has no variable
///   when its execution is bound.
/// * `df.error.category/conflict` if two collected rules share a name.
pub fn implement_neu<T, I, S>(
    name: &str,
    scope: &mut S,
    context: &mut I,
) -> Result<
    (
        HashMap<String, Collection<S, Vec<Value>, isize>>,
        ShutdownHandle,
    ),
    Error,
>
where
    T: Timestamp + Lattice + TotalOrder + Default,
    I: ImplContext<T>,
    S: Scope<Timestamp = T>,
{
    scope.iterative::<u64, _, _>(move |nested| {
        let publish = vec![name];
        let mut rules = collect_dependencies(&*context, &publish[..])?;
        let mut local_arrangements = VariableMap::new();
        let mut result_map = HashMap::new();

        if rules.is_empty() {
            return Err(Error {
                category: "df.error.category/not-found",
                message: format!("Couldn't find any rules for name {}.", name),
            });
        }

        // After sorting by name, duplicates are adjacent. Every adjacent
        // pair is inspected, including the last one, which the previous
        // index loop (`1..rules.len() - 1`) skipped.
        rules.sort_by(|x, y| x.name.cmp(&y.name));
        if let Some(pair) = rules
            .windows(2)
            .find(|pair| pair[0].name == pair[1].name)
        {
            return Err(Error {
                category: "df.error.category/conflict",
                message: format!("Duplicate rule definitions for rule {}", pair[1].name),
            });
        }

        // Create an iteration variable for each underconstrained published
        // name.
        for name in publish.iter() {
            if context.is_underconstrained(name) {
                local_arrangements.insert(
                    name.to_string(),
                    Variable::new(nested, Product::new(Default::default(), 1)),
                );
            }
        }

        // Publish the requested name by letting its variable leave the
        // iterative scope.
        for name in publish.into_iter() {
            if let Some(relation) = local_arrangements.get(name) {
                result_map.insert(name.to_string(), relation.leave());
            } else {
                return Err(Error {
                    category: "df.error.category/not-found",
                    message: format!("Attempted to publish undefined name {}.", name),
                });
            }
        }

        // Re-plan every rule as a Hector plan and implement it.
        let mut executions = Vec::with_capacity(rules.len());
        let mut shutdown_handle = ShutdownHandle::empty();
        for rule in rules.iter() {
            info!("neu_planning {:?}", rule.name);
            let plan = q(rule.plan.variables(), rule.plan.into_bindings());
            let (relation, shutdown) = plan.implement(nested, &local_arrangements, context);
            executions.push(relation);
            shutdown_handle.merge_with(shutdown);
        }

        // Bind each rule's variable to the tuples produced by its execution.
        for (rule, execution) in rules.iter().zip(executions.drain(..)) {
            match local_arrangements.remove(&rule.name) {
                None => {
                    return Err(Error {
                        category: "df.error.category/not-found",
                        message: format!(
                            "Rule {} should be in local arrangements, but isn't.",
                            &rule.name
                        ),
                    });
                }
                Some(variable) => {
                    #[cfg(feature = "set-semantics")]
                    variable.set(&execution.tuples().distinct());
                    #[cfg(not(feature = "set-semantics"))]
                    variable.set(&execution.tuples().consolidate());
                }
            }
        }

        Ok((result_map, shutdown_handle))
    })
}