// declarative_dataflow/sources/timely_logging.rs
use std::collections::HashMap;
5use std::time::Duration;
6
7use timely::communication::message::RefOrMut;
8use timely::dataflow::channels::pact::Pipeline;
9use timely::dataflow::operators::capture::Replay;
10use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
11use timely::dataflow::{Scope, Stream};
12use timely::logging::{TimelyEvent, WorkerIdentifier};
13
14use crate::sources::{Sourceable, SourcingContext};
15use crate::{Aid, Value};
16use crate::{AttributeConfig, InputSemantics};
17use Value::{Bool, Eid};
18
19#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
21pub struct TimelyLogging {
22 pub attributes: Vec<Aid>,
24 pub remote_peers: Option<usize>,
27}
28
29impl<S: Scope<Timestamp = Duration>> Sourceable<S> for TimelyLogging {
30 fn source(
31 &self,
32 scope: &mut S,
33 context: SourcingContext<S::Timestamp>,
34 ) -> Vec<(
35 Aid,
36 AttributeConfig,
37 Stream<S, ((Value, Value), Duration, isize)>,
38 )> {
39 let input = match self.remote_peers {
40 None => {
41 Some(context.timely_events).replay_into(scope)
43 }
44 Some(source_peers) => {
45 let sockets = open_sockets(source_peers);
47 make_replayers(sockets, scope.index(), scope.peers()).replay_into(scope)
48 }
49 };
50
51 let mut demux = OperatorBuilder::new("Timely Logging Demux".to_string(), scope.clone());
52
53 let mut input = demux.new_input(&input, Pipeline);
54
55 let mut wrappers = HashMap::with_capacity(self.attributes.len());
56 let mut streams = HashMap::with_capacity(self.attributes.len());
57
58 for aid in self.attributes.iter() {
59 let (wrapper, stream) = demux.new_output();
60 wrappers.insert(aid.to_string(), wrapper);
61 streams.insert(aid.to_string(), stream);
62 }
63
64 let mut demux_buffer = Vec::new();
65 let num_interests = self.attributes.len();
66
67 demux.build(move |_capability| {
68 move |_frontiers| {
69 let mut handles = HashMap::with_capacity(num_interests);
70 for (aid, wrapper) in wrappers.iter_mut() {
71 handles.insert(aid.to_string(), wrapper.activate());
72 }
73
74 input.for_each(|time, data: RefOrMut<Vec<_>>| {
75 data.swap(&mut demux_buffer);
76
77 let mut sessions = HashMap::with_capacity(num_interests);
78 for (aid, handle) in handles.iter_mut() {
79 sessions.insert(aid.to_string(), handle.session(&time));
80 }
81
82 for (time, _worker, datum) in demux_buffer.drain(..) {
83 match datum {
84 TimelyEvent::Operates(mut x) => {
85 let eid = Eid((x.id as u64).into());
86 let name = Value::String(x.name);
87
88 let local_id = Eid((x.addr.pop().unwrap() as u64).into());
90
91 if x.addr.is_empty() {
92 sessions
94 .get_mut("timely/scope")
95 .map(|s| s.give(((eid.clone(), Eid(0)), time, 1)));
96 } else {
97 let scope = Eid((x.addr.pop().unwrap() as u64).into());
99
100 sessions
101 .get_mut("timely/scope")
102 .map(|s| s.give(((eid.clone(), scope.clone()), time, 1)));
103
104 if !x.addr.is_empty() {
105 let parent = Eid((x.addr.pop().unwrap() as u64).into());
110 sessions
111 .get_mut("timely/scope")
112 .map(|s| s.give(((scope, parent), time, 1)));
113 }
114 }
115
116 sessions
117 .get_mut("timely.event.operates/local-id")
118 .map(|s| s.give(((eid.clone(), local_id), time, 1)));
119 sessions
120 .get_mut("timely.event.operates/name")
121 .map(|s| s.give(((eid, name), time, 1)));
122 }
123 TimelyEvent::Shutdown(x) => {
124 let eid = Eid((x.id as u64).into());
125 sessions
126 .get_mut("timely.event.operates/shutdown?")
127 .map(|s| s.give(((eid, Bool(true)), time, 1)));
128 }
129 TimelyEvent::Channels(mut x) => {
130 let eid = Eid((x.id as u64).into());
131 let src_index = Eid((x.source.0 as u64).into());
132 let src_port = Eid((x.source.1 as u64).into());
133 let target_index = Eid((x.target.0 as u64).into());
134 let target_port = Eid((x.target.1 as u64).into());
135
136 let scope = Eid((x.scope_addr.pop().unwrap() as u64).into());
138
139 sessions
140 .get_mut("timely/scope")
141 .map(|s| s.give(((eid.clone(), scope.clone()), time, 1)));
142 sessions
143 .get_mut("timely.event.channels/src-index")
144 .map(|s| s.give(((eid.clone(), src_index), time, 1)));
145 sessions
146 .get_mut("timely.event.channels/src-port")
147 .map(|s| s.give(((eid.clone(), src_port), time, 1)));
148 sessions
149 .get_mut("timely.event.channels/target-index")
150 .map(|s| s.give(((eid.clone(), target_index), time, 1)));
151 sessions
152 .get_mut("timely.event.channels/target-port")
153 .map(|s| s.give(((eid, target_port), time, 1)));
154 }
155 TimelyEvent::Schedule(x) => {
156 let eid = Eid((x.id as u64).into());
157 let is_started =
158 Bool(x.start_stop == ::timely::logging::StartStop::Start);
159
160 sessions
161 .get_mut("schedule/started?")
162 .map(|s| s.give(((eid, is_started), time, 1)));
163 }
164 TimelyEvent::Messages(_x) => {
165 }
168 _ => {}
169 }
170 }
171 });
172 }
173 });
174
175 self.attributes
176 .iter()
177 .map(|aid| {
178 (
179 aid.to_string(),
180 AttributeConfig::real_time(InputSemantics::CardinalityMany),
181 streams.remove(aid).unwrap(),
182 )
183 })
184 .collect()
185 }
186}
187
188use std::net::{TcpListener, TcpStream};
189use std::sync::{Arc, Mutex};
190
191use timely::dataflow::operators::capture::EventReader;
192
/// Listens on `127.0.0.1:8000` and blocks until `source_peers` incoming
/// connections have been accepted.
///
/// The accepted streams are returned in connection order, each wrapped in
/// `Some` so that a consumer can later `take()` individual sockets out of
/// the shared vector. With `source_peers == 0` the listener is still bound,
/// but nothing is accepted and the vector is empty.
///
/// # Panics
///
/// Panics if the listener cannot be bound or if accepting a connection
/// fails.
fn open_sockets(source_peers: usize) -> Arc<Mutex<Vec<Option<TcpStream>>>> {
    const LISTEN_ADDR: &str = "127.0.0.1:8000";

    let listener = TcpListener::bind(LISTEN_ADDR).unwrap_or_else(|err| {
        panic!(
            "failed to bind Timely logging listener on {}: {}",
            LISTEN_ADDR, err
        )
    });

    let sockets = (0..source_peers)
        .map(|peer| {
            // `accept` blocks until the next peer connects.
            let (socket, _remote_addr) = listener.accept().unwrap_or_else(|err| {
                panic!(
                    "failed to accept Timely logging connection {} of {}: {}",
                    peer + 1,
                    source_peers,
                    err
                )
            });
            Some(socket)
        })
        .collect::<Vec<_>>();

    Arc::new(Mutex::new(sockets))
}
203
204fn make_replayers(
207 sockets: Arc<Mutex<Vec<Option<TcpStream>>>>,
208 index: usize,
209 peers: usize,
210) -> Vec<EventReader<Duration, (Duration, WorkerIdentifier, TimelyEvent), TcpStream>> {
211 sockets
212 .lock()
213 .unwrap()
214 .iter_mut()
215 .enumerate()
216 .filter(|(i, _)| *i % peers == index)
217 .map(move |(_, s)| s.take().unwrap())
218 .map(|r| EventReader::<Duration, (Duration, WorkerIdentifier, TimelyEvent), _>::new(r))
219 .collect::<Vec<_>>()
220}