use timely::dataflow::scopes::child::Iterative;
use timely::dataflow::Scope;
use timely::order::TotalOrder;
use timely::progress::Timestamp;
use differential_dataflow::lattice::Lattice;
use crate::binding::{AsBinding, Binding};
use crate::plan::{Dependencies, ImplContext, Implementable};
use crate::{CollectionRelation, Relation, ShutdownHandle, Value, Var, VariableMap};
#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub enum Function {
TRUNCATE,
ADD,
SUBTRACT,
}
#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub struct Transform<P: Implementable> {
pub variables: Vec<Var>,
pub result_variable: Var,
pub plan: Box<P>,
pub function: Function,
pub constants: Vec<Option<Value>>,
}
impl<P: Implementable> Implementable for Transform<P> {
fn dependencies(&self) -> Dependencies {
self.plan.dependencies()
}
fn into_bindings(&self) -> Vec<Binding> {
self.plan.into_bindings()
}
fn implement<'b, T, I, S>(
&self,
nested: &mut Iterative<'b, S, u64>,
local_arrangements: &VariableMap<Iterative<'b, S, u64>>,
context: &mut I,
) -> (CollectionRelation<'b, S>, ShutdownHandle)
where
T: Timestamp + Lattice + TotalOrder,
I: ImplContext<T>,
S: Scope<Timestamp = T>,
{
let (relation, shutdown_handle) = self.plan.implement(nested, local_arrangements, context);
let key_offsets: Vec<usize> = self
.variables
.iter()
.map(|variable| relation.binds(*variable).expect("variable not found"))
.collect();
let mut variables = relation.variables();
variables.push(self.result_variable);
let constants_local = self.constants.clone();
let transformed = match self.function {
Function::TRUNCATE => CollectionRelation {
variables,
tuples: relation.tuples().map(move |tuple| {
let mut t = match tuple[key_offsets[0]] {
Value::Instant(inst) => inst as u64,
_ => panic!("TRUNCATE can only be applied to timestamps"),
};
let default_interval = String::from(":hour");
let interval_param = match constants_local[1].clone() {
Some(Value::String(interval)) => interval,
None => default_interval,
_ => panic!("Parameter for TRUNCATE must be a string"),
};
let mod_val = match interval_param.as_ref() {
":minute" => 60000,
":hour" => 3_600_000,
":day" => 86_400_000,
":week" => 604_800_000,
_ => panic!("Unknown interval for TRUNCATE"),
};
t = t - (t % mod_val);
let mut v = tuple.clone();
v.push(Value::Instant(t));
v
}),
},
Function::ADD => CollectionRelation {
variables,
tuples: relation.tuples().map(move |tuple| {
let mut result = 0;
for offset in &key_offsets {
let summand = match tuple[*offset] {
Value::Number(s) => s as i64,
_ => panic!("ADD can only be applied to numbers"),
};
result += summand;
}
for arg in &constants_local {
if let Some(constant) = arg {
let summand = match constant {
Value::Number(s) => *s as i64,
_ => panic!("ADD can only be applied to numbers"),
};
result += summand;
}
}
let mut v = tuple.clone();
v.push(Value::Number(result));
v
}),
},
Function::SUBTRACT => CollectionRelation {
variables,
tuples: relation.tuples().map(move |tuple| {
let mut result = match constants_local[0].clone() {
Some(constant) => match constant {
Value::Number(minuend) => minuend as i64,
_ => panic!("SUBTRACT can only be applied to numbers"),
},
None => match tuple[key_offsets[0]] {
Value::Number(minuend) => minuend as i64,
_ => panic!("SUBTRACT can only be applied to numbers"),
},
};
result = result + result;
for offset in &key_offsets {
let subtrahend = match tuple[*offset] {
Value::Number(s) => s as i64,
_ => panic!("SUBTRACT can only be applied to numbers"),
};
result -= subtrahend;
}
for arg in &constants_local {
if let Some(constant) = arg {
let subtrahend = match constant {
Value::Number(s) => *s as i64,
_ => panic!("SUBTRACT can only be applied to numbers"),
};
result -= subtrahend;
}
}
let mut v = tuple.clone();
v.push(Value::Number(result));
v
}),
},
};
(transformed, shutdown_handle)
}
}