1use std::marker::PhantomData;
23
24use crate::logging::{TimelyLogger, MessagesEvent};
25use crate::progress::Timestamp;
26use crate::progress::timestamp::Refines;
27use crate::progress::{Source, Target};
28use crate::order::Product;
29use crate::{Container, Data};
30use crate::communication::Push;
31use crate::dataflow::channels::pushers::{CounterCore, TeeCore};
32use crate::dataflow::channels::{BundleCore, Message};
33
34use crate::worker::AsWorker;
35use crate::dataflow::{StreamCore, Scope, Stream};
36use crate::dataflow::scopes::{Child, ScopeParent};
37use crate::dataflow::operators::delay::Delay;
38
39pub trait Enter<G: Scope, T: Timestamp+Refines<G::Timestamp>, C: Container> {
41 fn enter<'a>(&self, _: &Child<'a, G, T>) -> StreamCore<Child<'a, G, T>, C>;
56}
57
58use crate::dataflow::scopes::child::Iterative;
59
60pub trait EnterAt<G: Scope, T: Timestamp, D: Data> {
62 fn enter_at<'a, F:FnMut(&D)->T+'static>(&self, scope: &Iterative<'a, G, T>, initial: F) -> Stream<Iterative<'a, G, T>, D> ;
77}
78
79impl<G: Scope, T: Timestamp, D: Data, E: Enter<G, Product<<G as ScopeParent>::Timestamp, T>, Vec<D>>> EnterAt<G, T, D> for E {
80 fn enter_at<'a, F:FnMut(&D)->T+'static>(&self, scope: &Iterative<'a, G, T>, mut initial: F) ->
81 Stream<Iterative<'a, G, T>, D> {
82 self.enter(scope).delay(move |datum, time| Product::new(time.clone().to_outer(), initial(datum)))
83 }
84}
85
86impl<G: Scope, T: Timestamp+Refines<G::Timestamp>, C: Data+Container> Enter<G, T, C> for StreamCore<G, C> {
87 fn enter<'a>(&self, scope: &Child<'a, G, T>) -> StreamCore<Child<'a, G, T>, C> {
88
89 use crate::scheduling::Scheduler;
90
91 let (targets, registrar) = TeeCore::<T, C>::new();
92 let ingress = IngressNub {
93 targets: CounterCore::new(targets),
94 phantom: PhantomData,
95 activator: scope.activator_for(&scope.addr()),
96 active: false,
97 };
98 let produced = ingress.targets.produced().clone();
99 let input = scope.subgraph.borrow_mut().new_input(produced);
100 let channel_id = scope.clone().new_identifier();
101
102 if let Some(logger) = scope.logging() {
103 let pusher = LogPusher::new(ingress, channel_id, scope.index(), logger);
104 self.connect_to(input, pusher, channel_id);
105 } else {
106 self.connect_to(input, ingress, channel_id);
107 }
108
109 StreamCore::new(
110 Source::new(0, input.port),
111 registrar,
112 scope.clone(),
113 )
114 }
115}
116
117pub trait Leave<G: Scope, D: Container> {
119 fn leave(&self) -> StreamCore<G, D>;
134}
135
136impl<'a, G: Scope, D: Clone+Container, T: Timestamp+Refines<G::Timestamp>> Leave<G, D> for StreamCore<Child<'a, G, T>, D> {
137 fn leave(&self) -> StreamCore<G, D> {
138
139 let scope = self.scope();
140
141 let output = scope.subgraph.borrow_mut().new_output();
142 let target = Target::new(0, output.port);
143 let (targets, registrar) = TeeCore::<G::Timestamp, D>::new();
144 let egress = EgressNub { targets, phantom: PhantomData };
145 let channel_id = scope.clone().new_identifier();
146
147 if let Some(logger) = scope.logging() {
148 let pusher = LogPusher::new(egress, channel_id, scope.index(), logger);
149 self.connect_to(target, pusher, channel_id);
150 } else {
151 self.connect_to(target, egress, channel_id);
152 }
153
154 StreamCore::new(
155 output,
156 registrar,
157 scope.parent,
158 )
159 }
160}
161
162
163struct IngressNub<TOuter: Timestamp, TInner: Timestamp+Refines<TOuter>, TData: Container> {
164 targets: CounterCore<TInner, TData, TeeCore<TInner, TData>>,
165 phantom: ::std::marker::PhantomData<TOuter>,
166 activator: crate::scheduling::Activator,
167 active: bool,
168}
169
170impl<TOuter: Timestamp, TInner: Timestamp+Refines<TOuter>, TData: Container> Push<BundleCore<TOuter, TData>> for IngressNub<TOuter, TInner, TData> {
171 fn push(&mut self, element: &mut Option<BundleCore<TOuter, TData>>) {
172 if let Some(message) = element {
173 let outer_message = message.as_mut();
174 let data = ::std::mem::take(&mut outer_message.data);
175 let mut inner_message = Some(BundleCore::from_typed(Message::new(TInner::to_inner(outer_message.time.clone()), data, 0, 0)));
176 self.targets.push(&mut inner_message);
177 if let Some(inner_message) = inner_message {
178 if let Some(inner_message) = inner_message.if_typed() {
179 outer_message.data = inner_message.data;
180 }
181 }
182 self.active = true;
183 }
184 else {
185 if self.active {
186 self.activator.activate();
187 self.active = false;
188 }
189 self.targets.done();
190 }
191 }
192}
193
194
195struct EgressNub<TOuter: Timestamp, TInner: Timestamp+Refines<TOuter>, TData: Data> {
196 targets: TeeCore<TOuter, TData>,
197 phantom: PhantomData<TInner>,
198}
199
200impl<TOuter, TInner, TData: Container> Push<BundleCore<TInner, TData>> for EgressNub<TOuter, TInner, TData>
201where TOuter: Timestamp, TInner: Timestamp+Refines<TOuter>, TData: Data {
202 fn push(&mut self, message: &mut Option<BundleCore<TInner, TData>>) {
203 if let Some(message) = message {
204 let inner_message = message.as_mut();
205 let data = ::std::mem::take(&mut inner_message.data);
206 let mut outer_message = Some(BundleCore::from_typed(Message::new(inner_message.time.clone().to_outer(), data, 0, 0)));
207 self.targets.push(&mut outer_message);
208 if let Some(outer_message) = outer_message {
209 if let Some(outer_message) = outer_message.if_typed() {
210 inner_message.data = outer_message.data;
211 }
212 }
213 }
214 else { self.targets.done(); }
215 }
216}
217
218struct LogPusher<P> {
225 pusher: P,
226 channel: usize,
227 counter: usize,
228 index: usize,
229 logger: TimelyLogger,
230}
231
232impl<P> LogPusher<P> {
233 fn new(pusher: P, channel: usize, index: usize, logger: TimelyLogger) -> Self {
234 Self {
235 pusher,
236 channel,
237 counter: 0,
238 index,
239 logger,
240 }
241 }
242}
243
244impl<T, D, P> Push<BundleCore<T, D>> for LogPusher<P>
245where
246 D: Container,
247 P: Push<BundleCore<T, D>>,
248{
249 fn push(&mut self, element: &mut Option<BundleCore<T, D>>) {
250 if let Some(bundle) = element {
251 let send_event = MessagesEvent {
252 is_send: true,
253 channel: self.channel,
254 source: self.index,
255 target: self.index,
256 seq_no: self.counter,
257 length: bundle.data.len(),
258 };
259 let recv_event = MessagesEvent {
260 is_send: false,
261 ..send_event
262 };
263
264 self.logger.log(send_event);
265 self.logger.log(recv_event);
266 self.counter += 1;
267 }
268
269 self.pusher.push(element);
270 }
271}
272
#[cfg(test)]
mod test {
    /// Sends data through two nested regions (enter, enter, leave, leave) and
    /// steps the worker until the probe catches up with the input after each round.
    #[test]
    fn test_nested() {
        use crate::dataflow::operators::{Enter, Leave};
        use crate::dataflow::operators::{Input, Inspect, Probe};
        use crate::dataflow::Scope;
        use crate::dataflow::{InputHandle, ProbeHandle};

        crate::execute(crate::Config::process(3), |worker| {
            let worker_index = worker.index();
            let mut handle = InputHandle::new();
            let mut progress = ProbeHandle::new();

            worker.dataflow(|scope| {
                let stream = scope.input_from(&mut handle);

                // Round-trip the stream through a region nested inside another region.
                let round_trip = scope.region(|inner| {
                    let entered = stream.enter(inner);
                    let nested = inner.region(|inner2| entered.enter(inner2).leave());
                    nested.leave()
                });

                round_trip
                    .inspect(move |x| println!("worker {}:\thello {}", worker_index, x))
                    .probe_with(&mut progress);
            });

            handle.advance_to(0);
            for round in 0..10 {
                // Only worker 0 introduces data; every worker advances its input.
                if worker_index == 0 {
                    handle.send(round);
                }
                handle.advance_to(round + 1);
                while progress.less_than(handle.time()) {
                    worker.step_or_park(None);
                }
            }
        })
        .unwrap();
    }
}