declarative_dataflow/sources/
differential_logging.rs1use std::collections::HashMap;
5use std::time::Duration;
6
7use timely::communication::message::RefOrMut;
8use timely::dataflow::channels::pact::Pipeline;
9use timely::dataflow::operators::capture::Replay;
10use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
11use timely::dataflow::{Scope, Stream};
12
13use differential_dataflow::logging::DifferentialEvent;
14
15use crate::sources::{Sourceable, SourcingContext};
16use crate::{Aid, Value};
17use crate::{AttributeConfig, InputSemantics};
18use Value::{Eid, Number};
19
20#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
22pub struct DifferentialLogging {
23 pub attributes: Vec<Aid>,
25}
26
27impl<S: Scope<Timestamp = Duration>> Sourceable<S> for DifferentialLogging {
28 fn source(
29 &self,
30 scope: &mut S,
31 context: SourcingContext<S::Timestamp>,
32 ) -> Vec<(
33 Aid,
34 AttributeConfig,
35 Stream<S, ((Value, Value), Duration, isize)>,
36 )> {
37 let input = Some(context.differential_events).replay_into(scope);
38
39 let mut demux =
40 OperatorBuilder::new("Differential Logging Demux".to_string(), scope.clone());
41
42 let mut input = demux.new_input(&input, Pipeline);
43
44 let mut wrappers = HashMap::with_capacity(self.attributes.len());
45 let mut streams = HashMap::with_capacity(self.attributes.len());
46
47 for aid in self.attributes.iter() {
48 let (wrapper, stream) = demux.new_output();
49 wrappers.insert(aid.to_string(), wrapper);
50 streams.insert(aid.to_string(), stream);
51 }
52
53 let mut demux_buffer = Vec::new();
54 let num_interests = self.attributes.len();
55
56 demux.build(move |_capability| {
57 move |_frontiers| {
58 let mut handles = HashMap::with_capacity(num_interests);
59 for (aid, wrapper) in wrappers.iter_mut() {
60 handles.insert(aid.to_string(), wrapper.activate());
61 }
62
63 input.for_each(|time, data: RefOrMut<Vec<_>>| {
64 data.swap(&mut demux_buffer);
65
66 let mut sessions = HashMap::with_capacity(num_interests);
67 for (aid, handle) in handles.iter_mut() {
68 sessions.insert(aid.to_string(), handle.session(&time));
69 }
70
71 for (time, _worker, datum) in demux_buffer.drain(..) {
72 match datum {
73 DifferentialEvent::Batch(x) => {
74 let operator = Eid((x.operator as u64).into());
75 let length = Number(x.length as i64);
76
77 sessions
78 .get_mut("differential.event/size")
79 .map(|s| s.give(((operator, length), time, 1)));
80 }
81 DifferentialEvent::Merge(x) => {
82 trace!("[DIFFERENTIAL] {:?}", x);
83
84 if let Some(complete_size) = x.complete {
85 let operator = Eid((x.operator as u64).into());
86 let size_diff =
87 (complete_size as i64) - (x.length1 + x.length2) as i64;
88
89 sessions
90 .get_mut("differential.event/size")
91 .map(|s| s.give(((operator, Number(size_diff)), time, 1)));
92 }
93 }
94 _ => {}
95 }
96 }
97 });
98 }
99 });
100
101 self.attributes
102 .iter()
103 .map(|aid| {
104 (
105 aid.to_string(),
106 AttributeConfig::real_time(InputSemantics::Raw),
107 streams.remove(aid).unwrap(),
108 )
109 })
110 .collect()
111 }
112}