declarative_dataflow/sources/
differential_logging.rs

1//! Operator and utilities to source data from the underlying
2//! Differential logging streams.
3
4use std::collections::HashMap;
5use std::time::Duration;
6
7use timely::communication::message::RefOrMut;
8use timely::dataflow::channels::pact::Pipeline;
9use timely::dataflow::operators::capture::Replay;
10use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
11use timely::dataflow::{Scope, Stream};
12
13use differential_dataflow::logging::DifferentialEvent;
14
15use crate::sources::{Sourceable, SourcingContext};
16use crate::{Aid, Value};
17use crate::{AttributeConfig, InputSemantics};
18use Value::{Eid, Number};
19
20/// One or more taps into Differential logging.
21#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
22pub struct DifferentialLogging {
23    /// The log attributes that should be materialized.
24    pub attributes: Vec<Aid>,
25}
26
27impl<S: Scope<Timestamp = Duration>> Sourceable<S> for DifferentialLogging {
28    fn source(
29        &self,
30        scope: &mut S,
31        context: SourcingContext<S::Timestamp>,
32    ) -> Vec<(
33        Aid,
34        AttributeConfig,
35        Stream<S, ((Value, Value), Duration, isize)>,
36    )> {
37        let input = Some(context.differential_events).replay_into(scope);
38
39        let mut demux =
40            OperatorBuilder::new("Differential Logging Demux".to_string(), scope.clone());
41
42        let mut input = demux.new_input(&input, Pipeline);
43
44        let mut wrappers = HashMap::with_capacity(self.attributes.len());
45        let mut streams = HashMap::with_capacity(self.attributes.len());
46
47        for aid in self.attributes.iter() {
48            let (wrapper, stream) = demux.new_output();
49            wrappers.insert(aid.to_string(), wrapper);
50            streams.insert(aid.to_string(), stream);
51        }
52
53        let mut demux_buffer = Vec::new();
54        let num_interests = self.attributes.len();
55
56        demux.build(move |_capability| {
57            move |_frontiers| {
58                let mut handles = HashMap::with_capacity(num_interests);
59                for (aid, wrapper) in wrappers.iter_mut() {
60                    handles.insert(aid.to_string(), wrapper.activate());
61                }
62
63                input.for_each(|time, data: RefOrMut<Vec<_>>| {
64                    data.swap(&mut demux_buffer);
65
66                    let mut sessions = HashMap::with_capacity(num_interests);
67                    for (aid, handle) in handles.iter_mut() {
68                        sessions.insert(aid.to_string(), handle.session(&time));
69                    }
70
71                    for (time, _worker, datum) in demux_buffer.drain(..) {
72                        match datum {
73                            DifferentialEvent::Batch(x) => {
74                                let operator = Eid((x.operator as u64).into());
75                                let length = Number(x.length as i64);
76
77                                sessions
78                                    .get_mut("differential.event/size")
79                                    .map(|s| s.give(((operator, length), time, 1)));
80                            }
81                            DifferentialEvent::Merge(x) => {
82                                trace!("[DIFFERENTIAL] {:?}", x);
83
84                                if let Some(complete_size) = x.complete {
85                                    let operator = Eid((x.operator as u64).into());
86                                    let size_diff =
87                                        (complete_size as i64) - (x.length1 + x.length2) as i64;
88
89                                    sessions
90                                        .get_mut("differential.event/size")
91                                        .map(|s| s.give(((operator, Number(size_diff)), time, 1)));
92                                }
93                            }
94                            _ => {}
95                        }
96                    }
97                });
98            }
99        });
100
101        self.attributes
102            .iter()
103            .map(|aid| {
104                (
105                    aid.to_string(),
106                    AttributeConfig::real_time(InputSemantics::Raw),
107                    streams.remove(aid).unwrap(),
108                )
109            })
110            .collect()
111    }
112}