declarative_dataflow/sources/timely_logging.rs

1//! Operator and utilities to source data from the underlying Timely
2//! logging streams.
3
4use std::collections::HashMap;
5use std::time::Duration;
6
7use timely::communication::message::RefOrMut;
8use timely::dataflow::channels::pact::Pipeline;
9use timely::dataflow::operators::capture::Replay;
10use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
11use timely::dataflow::{Scope, Stream};
12use timely::logging::{TimelyEvent, WorkerIdentifier};
13
14use crate::sources::{Sourceable, SourcingContext};
15use crate::{Aid, Value};
16use crate::{AttributeConfig, InputSemantics};
17use Value::{Bool, Eid};
18
19/// One or more taps into Timely logging.
20#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
21pub struct TimelyLogging {
22    /// The log attributes that should be materialized.
23    pub attributes: Vec<Aid>,
24    /// Optionally listen for events from a number of remote workers,
25    /// rather than introspectively.
26    pub remote_peers: Option<usize>,
27}
28
29impl<S: Scope<Timestamp = Duration>> Sourceable<S> for TimelyLogging {
30    fn source(
31        &self,
32        scope: &mut S,
33        context: SourcingContext<S::Timestamp>,
34    ) -> Vec<(
35        Aid,
36        AttributeConfig,
37        Stream<S, ((Value, Value), Duration, isize)>,
38    )> {
39        let input = match self.remote_peers {
40            None => {
41                // Read events introspectively.
42                Some(context.timely_events).replay_into(scope)
43            }
44            Some(source_peers) => {
45                // Listen for events from a remote computation.
46                let sockets = open_sockets(source_peers);
47                make_replayers(sockets, scope.index(), scope.peers()).replay_into(scope)
48            }
49        };
50
51        let mut demux = OperatorBuilder::new("Timely Logging Demux".to_string(), scope.clone());
52
53        let mut input = demux.new_input(&input, Pipeline);
54
55        let mut wrappers = HashMap::with_capacity(self.attributes.len());
56        let mut streams = HashMap::with_capacity(self.attributes.len());
57
58        for aid in self.attributes.iter() {
59            let (wrapper, stream) = demux.new_output();
60            wrappers.insert(aid.to_string(), wrapper);
61            streams.insert(aid.to_string(), stream);
62        }
63
64        let mut demux_buffer = Vec::new();
65        let num_interests = self.attributes.len();
66
67        demux.build(move |_capability| {
68            move |_frontiers| {
69                let mut handles = HashMap::with_capacity(num_interests);
70                for (aid, wrapper) in wrappers.iter_mut() {
71                    handles.insert(aid.to_string(), wrapper.activate());
72                }
73
74                input.for_each(|time, data: RefOrMut<Vec<_>>| {
75                    data.swap(&mut demux_buffer);
76
77                    let mut sessions = HashMap::with_capacity(num_interests);
78                    for (aid, handle) in handles.iter_mut() {
79                        sessions.insert(aid.to_string(), handle.session(&time));
80                    }
81
82                    for (time, _worker, datum) in demux_buffer.drain(..) {
83                        match datum {
84                            TimelyEvent::Operates(mut x) => {
85                                let eid = Eid((x.id as u64).into());
86                                let name = Value::String(x.name);
87
88                                // The id of this operator within its scope.
89                                let local_id = Eid((x.addr.pop().unwrap() as u64).into());
90
91                                if x.addr.is_empty() {
92                                    // This is a top-level subgraph and thus lives in the root.
93                                    sessions
94                                        .get_mut("timely/scope")
95                                        .map(|s| s.give(((eid.clone(), Eid(0)), time, 1)));
96                                } else {
97                                    // The leaf scope that this operator resides in.
98                                    let scope = Eid((x.addr.pop().unwrap() as u64).into());
99
100                                    sessions
101                                        .get_mut("timely/scope")
102                                        .map(|s| s.give(((eid.clone(), scope.clone()), time, 1)));
103
104                                    if !x.addr.is_empty() {
105                                        // This means that there are one or more parent scopes.
106                                        // We want to make sure our scope is linked to its parent. But
107                                        // we can assume all others are doing the same, so we don't need to
108                                        // re-introduce edges for higher-level ancestors.
109                                        let parent = Eid((x.addr.pop().unwrap() as u64).into());
110                                        sessions
111                                            .get_mut("timely/scope")
112                                            .map(|s| s.give(((scope, parent), time, 1)));
113                                    }
114                                }
115
116                                sessions
117                                    .get_mut("timely.event.operates/local-id")
118                                    .map(|s| s.give(((eid.clone(), local_id), time, 1)));
119                                sessions
120                                    .get_mut("timely.event.operates/name")
121                                    .map(|s| s.give(((eid, name), time, 1)));
122                            }
123                            TimelyEvent::Shutdown(x) => {
124                                let eid = Eid((x.id as u64).into());
125                                sessions
126                                    .get_mut("timely.event.operates/shutdown?")
127                                    .map(|s| s.give(((eid, Bool(true)), time, 1)));
128                            }
129                            TimelyEvent::Channels(mut x) => {
130                                let eid = Eid((x.id as u64).into());
131                                let src_index = Eid((x.source.0 as u64).into());
132                                let src_port = Eid((x.source.1 as u64).into());
133                                let target_index = Eid((x.target.0 as u64).into());
134                                let target_port = Eid((x.target.1 as u64).into());
135
136                                // The leaf scope that this channel resides in.
137                                let scope = Eid((x.scope_addr.pop().unwrap() as u64).into());
138
139                                sessions
140                                    .get_mut("timely/scope")
141                                    .map(|s| s.give(((eid.clone(), scope.clone()), time, 1)));
142                                sessions
143                                    .get_mut("timely.event.channels/src-index")
144                                    .map(|s| s.give(((eid.clone(), src_index), time, 1)));
145                                sessions
146                                    .get_mut("timely.event.channels/src-port")
147                                    .map(|s| s.give(((eid.clone(), src_port), time, 1)));
148                                sessions
149                                    .get_mut("timely.event.channels/target-index")
150                                    .map(|s| s.give(((eid.clone(), target_index), time, 1)));
151                                sessions
152                                    .get_mut("timely.event.channels/target-port")
153                                    .map(|s| s.give(((eid, target_port), time, 1)));
154                            }
155                            TimelyEvent::Schedule(x) => {
156                                let eid = Eid((x.id as u64).into());
157                                let is_started =
158                                    Bool(x.start_stop == ::timely::logging::StartStop::Start);
159
160                                sessions
161                                    .get_mut("schedule/started?")
162                                    .map(|s| s.give(((eid, is_started), time, 1)));
163                            }
164                            TimelyEvent::Messages(_x) => {
165                                // @TODO
166                                // (MessagesChannel, (x.seq_noValue::Usize(x.channel), Value::Bool(x.is_send), Value::Usize(x.source), Value::Usize(x.target), Value::Usize(x.seq_no), Value::Usize(x.length)
167                            }
168                            _ => {}
169                        }
170                    }
171                });
172            }
173        });
174
175        self.attributes
176            .iter()
177            .map(|aid| {
178                (
179                    aid.to_string(),
180                    AttributeConfig::real_time(InputSemantics::CardinalityMany),
181                    streams.remove(aid).unwrap(),
182                )
183            })
184            .collect()
185    }
186}
187
188use std::net::{TcpListener, TcpStream};
189use std::sync::{Arc, Mutex};
190
191use timely::dataflow::operators::capture::EventReader;
192
/// Accepts `source_peers` TCP connections from the computation whose Timely
/// logging we want to examine.
///
/// Binds a listener on `127.0.0.1:8000` and blocks until exactly
/// `source_peers` connections have been accepted. It returns them as
/// `Some(stream)` slots behind a shared mutex, so that `make_replayers` can
/// `take` each connection at most once. The listener is dropped on return, so
/// no further connections are accepted.
///
/// NOTE(review): `TimelyLogging::source` calls this function and also
/// queries `scope.index()`, which suggests it runs once per local worker.
/// With several local workers, each would bind the same fixed address.
/// Confirm multi-worker usage before relying on it.
///
/// # Panics
///
/// Panics if the address cannot be bound or if accepting a connection fails.
fn open_sockets(source_peers: usize) -> Arc<Mutex<Vec<Option<TcpStream>>>> {
    const LISTEN_ADDR: &str = "127.0.0.1:8000";

    let listener = TcpListener::bind(LISTEN_ADDR).unwrap_or_else(|err| {
        panic!(
            "failed to bind Timely logging listener on {}: {}",
            LISTEN_ADDR, err
        )
    });

    let sockets = (0..source_peers)
        .map(|peer| {
            let (stream, _remote_addr) = listener.accept().unwrap_or_else(|err| {
                panic!(
                    "failed to accept Timely logging connection {} of {} on {}: {}",
                    peer + 1,
                    source_peers,
                    LISTEN_ADDR,
                    err
                )
            });
            Some(stream)
        })
        .collect::<Vec<_>>();

    Arc::new(Mutex::new(sockets))
}
203
204/// Construct replayers that read data from sockets and can stream it
205/// into timely dataflow.
206fn make_replayers(
207    sockets: Arc<Mutex<Vec<Option<TcpStream>>>>,
208    index: usize,
209    peers: usize,
210) -> Vec<EventReader<Duration, (Duration, WorkerIdentifier, TimelyEvent), TcpStream>> {
211    sockets
212        .lock()
213        .unwrap()
214        .iter_mut()
215        .enumerate()
216        .filter(|(i, _)| *i % peers == index)
217        .map(move |(_, s)| s.take().unwrap())
218        .map(|r| EventReader::<Duration, (Duration, WorkerIdentifier, TimelyEvent), _>::new(r))
219        .collect::<Vec<_>>()
220}