use std::collections::HashMap;
use timely::dataflow::channels::pact::ParallelizationContract;
use timely::dataflow::operators::generic::Operator;
use timely::dataflow::{ProbeHandle, Scope, Stream};
use timely::progress::Timestamp;
use differential_dataflow::lattice::Lattice;
use serde_json::map::Map;
use serde_json::Value as JValue;
use serde_json::Value::Object;
use crate::{Error, Output, ResultDiff, Time};
use super::{Sinkable, SinkingContext};
/// Sink configuration that assembles incoming path diffs into nested
/// JSON objects and emits them as `Output::Json` values.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct AssocIn {
    /// Selects between stateless and stateful operation.
    ///
    /// * `None` — a fresh JSON map is built for every notified time and
    ///   each of its top-level entries is emitted.
    /// * `Some(n)` — a single JSON map is kept across times; after each
    ///   batch only the sub-trees addressed by the first `n` path keys of
    ///   the changed paths are emitted.
    pub stateful: Option<usize>,
}
impl<T> Sinkable<T> for AssocIn
where
T: Timestamp + Lattice + std::convert::Into<Time>,
{
fn sink<S, P>(
&self,
stream: &Stream<S, ResultDiff<T>>,
pact: P,
_probe: &mut ProbeHandle<T>,
context: SinkingContext,
) -> Result<Option<Stream<S, Output>>, Error>
where
S: Scope<Timestamp = T>,
P: ParallelizationContract<S::Timestamp, ResultDiff<T>>,
{
let mut paths = HashMap::new();
let mut states = if self.stateful.is_some() {
Some(Map::new())
} else {
None
};
let granularity = self.stateful.unwrap_or(1);
let mut vector = Vec::new();
let name = context.name;
let sunk = stream.unary_notify(
pact,
"AssocIn",
vec![],
move |input, output, notificator| {
input.for_each(|cap, data| {
data.swap(&mut vector);
paths
.entry(cap.time().clone())
.or_insert_with(Vec::new)
.extend(vector.drain(..));
notificator.notify_at(cap.retain());
});
notificator.for_each(|cap, _, _| {
if let Some(paths_at_time) = paths.remove(cap.time()) {
match states {
None => {
let t = cap.time();
let mut map = Map::new();
merge_paths(&mut map, paths_at_time, granularity);
let keys: Vec<String> = map.keys().cloned().collect();
output.session(&cap).give_iterator(keys.iter().map(|key| {
Output::Json(
name.clone(),
map.remove(key).unwrap(),
t.clone().into(),
1,
)
}));
}
Some(ref mut states) => {
let t = cap.time();
let changes = merge_paths(states, paths_at_time, granularity);
output.session(&cap).give_iterator(changes.iter().map(
|change_key| {
let mut snapshot = &states[&change_key[0]];
for key in &change_key[1..] {
if let Object(map) = snapshot {
snapshot = &map[key];
}
}
Output::Json(
name.clone(),
snapshot.clone(),
t.clone().into(),
1,
)
},
));
}
}
}
});
},
);
Ok(Some(sunk))
}
}
fn merge_paths<T>(
acc: &mut Map<String, JValue>,
mut paths: Vec<(Vec<crate::Value>, T, isize)>,
granularity: usize,
) -> Vec<Vec<String>>
where
T: Timestamp + Lattice + std::convert::Into<Time>,
{
use crate::Value;
let parse_key = |v: Value| match v {
Value::Aid(x) => x,
Value::Eid(x) => x.to_string(),
Value::String(x) => x,
_ => panic!("Malformed pull path. Expected a key."),
};
paths.sort_by_key(|(_path, _t, diff)| *diff);
paths.sort_by_key(|(_path, t, _diff)| t.clone());
let mut changes: Vec<Vec<String>> = Vec::new();
for (mut path, _t, diff) in paths {
let mut change_key: Vec<String> = Vec::new();
let leaf_val: Value = path.pop().expect("leaf value missing");
let leaf_key: String = parse_key(path.pop().expect("leaf keay missing"));
if path.is_empty() {
if change_key.len() < granularity {
change_key.push(leaf_key.clone());
}
acc.insert(leaf_key, serde_json::Value::from(leaf_val));
} else {
let first_key = parse_key(path[0].clone());
if change_key.len() < granularity {
change_key.push(first_key.clone());
}
let mut entry = acc.entry(first_key).or_insert_with(|| Object(Map::new()));
for key in path.drain(1..) {
if change_key.len() < granularity {
change_key.push(parse_key(key.clone()));
}
if let Object(map) = entry {
entry = map
.entry(parse_key(key))
.or_insert_with(|| Object(Map::new()));
}
}
if let Object(map) = entry {
if diff > 0 {
map.insert(leaf_key, serde_json::Value::from(leaf_val));
} else {
map.remove(&leaf_key);
}
}
}
changes.push(change_key);
}
changes.sort();
changes.dedup();
changes
}