timely/worker.rs
//! The root of each single-threaded worker.

use std::rc::Rc;
use std::cell::{RefCell, RefMut};
use std::any::Any;
use std::str::FromStr;
use std::time::{Instant, Duration};
use std::collections::HashMap;
use std::collections::hash_map::Entry;
use std::sync::Arc;

use crate::communication::{Allocate, Data, Push, Pull};
use crate::communication::allocator::thread::{ThreadPusher, ThreadPuller};
use crate::scheduling::{Schedule, Scheduler, Activations};
use crate::progress::timestamp::{Refines};
use crate::progress::SubgraphBuilder;
use crate::progress::operate::Operate;
use crate::dataflow::scopes::Child;
use crate::logging::TimelyLogger;

/// Different ways in which timely's progress tracking can work.
///
/// These options drive some buffering and accumulation that timely
/// can perform to trade the volume of progress traffic against latency.
/// By accumulating updates for longer, a smaller total volume of
/// messages is sent.
///
/// The `ProgressMode::Demand` variant is the most robust, and the least
/// likely to lead to catastrophic performance. The `Eager` variant
/// is useful for getting the smallest latencies on systems with few
/// workers, but it risks saturating the system with progress messages
/// and should be used with care, or not at all.
///
/// If you are not certain which option to use, prefer `Demand`, and
/// perhaps monitor the progress messages through timely's logging
/// infrastructure to see if their volume is surprisingly high.
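///
/// # Examples
///
/// A minimal sketch of selecting a mode by parsing its command-line name;
/// the accepted names are those recognized by the `FromStr` implementation
/// below.
///
/// ```rust
/// use timely::worker::ProgressMode;
///
/// let mode: ProgressMode = "demand".parse().unwrap();
/// assert_eq!(mode, ProgressMode::Demand);
/// ```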
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum ProgressMode {
    /// Eagerly transmit all progress updates produced by a worker.
    ///
    /// Progress messages are transmitted without consideration for the
    /// possibility that they may unblock other workers. This can result
    /// in a substantial volume of messages that do not result in a
    /// change to the lower bound of outstanding work.
    Eager,
    /// Delay transmission of progress updates until any could advance
    /// the global frontier of timestamps.
    ///
    /// As timely executes, the progress messages inform each worker of
    /// the outstanding work remaining in the system. As workers work,
    /// they produce changes to this outstanding work. This option
    /// delays the communication of those changes until they might
    /// possibly cause a change in the lower bound of all outstanding
    /// work.
    ///
    /// The most common case this remedies is when one worker transmits
    /// messages to other workers while holding a capability for the
    /// corresponding operator and timestamp. With this option, the
    /// receiving workers will not immediately acknowledge the messages,
    /// because the held capability is strictly prior to anything the
    /// messages can affect. Once the capability is released, the
    /// progress updates are unblocked and transmitted, in accumulated
    /// form.
    Demand,
}

impl Default for ProgressMode {
    fn default() -> ProgressMode {
        ProgressMode::Demand
    }
}

impl FromStr for ProgressMode {
    type Err = String;

    fn from_str(s: &str) -> Result<ProgressMode, String> {
        match s {
            "eager" => Ok(ProgressMode::Eager),
            "demand" => Ok(ProgressMode::Demand),
            _ => Err(format!("unknown progress mode: {}", s)),
        }
    }
}

/// Worker configuration.
#[derive(Debug, Default, Clone)]
pub struct Config {
    /// The progress mode to use.
    pub(crate) progress_mode: ProgressMode,
    /// A map from parameter name to typed parameter values.
    registry: HashMap<String, Arc<dyn Any + Send + Sync>>,
}

impl Config {
    /// Installs options into a [getopts_dep::Options] struct that correspond
    /// to the parameters in the configuration.
    ///
    /// It is the caller's responsibility to ensure that the installed options
    /// do not conflict with any other options that may exist in `opts`, or
    /// that may be installed into `opts` in the future.
    ///
    /// This method is only available if the `getopts` feature is enabled, which
    /// it is by default.
    #[cfg(feature = "getopts")]
    pub fn install_options(opts: &mut getopts_dep::Options) {
        opts.optopt("", "progress-mode", "progress tracking mode (eager or demand)", "MODE");
    }

    /// Instantiates a configuration based upon the parsed options in `matches`.
    ///
    /// The `matches` object must have been constructed from a
    /// [getopts_dep::Options] which contained at least the options installed by
    /// [Self::install_options].
    ///
    /// This method is only available if the `getopts` feature is enabled, which
    /// it is by default.
    #[cfg(feature = "getopts")]
    pub fn from_matches(matches: &getopts_dep::Matches) -> Result<Config, String> {
        let progress_mode = matches
            .opt_get_default("progress-mode", ProgressMode::Eager)?;
        Ok(Config::default().progress_mode(progress_mode))
    }

    /// Sets the progress mode to `progress_mode`.
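    ///
    /// # Examples
    ///
    /// A minimal sketch of building a worker configuration with an explicit
    /// progress mode; the process-level configuration used in the examples
    /// below carries such a worker configuration in its `worker` field.
    ///
    /// ```rust
    /// use timely::worker::ProgressMode;
    ///
    /// let _config = timely::worker::Config::default().progress_mode(ProgressMode::Demand);
    /// ```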
    pub fn progress_mode(mut self, progress_mode: ProgressMode) -> Self {
        self.progress_mode = progress_mode;
        self
    }

    /// Sets a typed configuration parameter for the given `key`.
    ///
    /// It is recommended to install a single configuration struct using a key
    /// that uniquely identifies your project, to avoid clashes. For example,
    /// differential dataflow registers a configuration struct under the key
    /// "differential".
    ///
    /// # Examples
    /// ```rust
    /// let mut config = timely::Config::process(3);
    /// config.worker.set("example".to_string(), 7u64);
    /// timely::execute(config, |worker| {
    ///    use crate::timely::worker::AsWorker;
    ///    assert_eq!(worker.config().get::<u64>("example"), Some(&7));
    /// }).unwrap();
    /// ```
    pub fn set<T>(&mut self, key: String, val: T) -> &mut Self
    where
        T: Send + Sync + 'static,
    {
        self.registry.insert(key, Arc::new(val));
        self
    }

    /// Gets the value for configured parameter `key`.
    ///
    /// Returns `None` if `key` has not previously been set with
    /// [Config::set], or if the specified `T` does not match the `T`
    /// from the call to `set`.
    ///
    /// # Examples
    /// ```rust
    /// let mut config = timely::Config::process(3);
    /// config.worker.set("example".to_string(), 7u64);
    /// timely::execute(config, |worker| {
    ///    use crate::timely::worker::AsWorker;
    ///    assert_eq!(worker.config().get::<u64>("example"), Some(&7));
    /// }).unwrap();
    /// ```
    pub fn get<T: 'static>(&self, key: &str) -> Option<&T> {
        self.registry.get(key).and_then(|val| val.downcast_ref())
    }
}

/// Methods provided by the root Worker.
///
/// These methods are often proxied by child scopes, and this trait provides access.
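///
/// # Examples
///
/// A minimal sketch of code written against this trait rather than against a
/// concrete worker or scope; the helper function is illustrative, not part of timely.
///
/// ```rust
/// use timely::worker::AsWorker;
///
/// fn describe<W: AsWorker>(worker: &W) -> String {
///     format!("worker {} of {}", worker.index(), worker.peers())
/// }
///
/// timely::execute_from_args(::std::env::args(), |worker| {
///     println!("{}", describe(&*worker));
/// });
/// ```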
pub trait AsWorker : Scheduler {
    /// Returns the worker configuration parameters.
    fn config(&self) -> &Config;
    /// Index of the worker among its peers.
    fn index(&self) -> usize;
    /// Number of peer workers.
    fn peers(&self) -> usize;
    /// Allocates a new channel from a supplied identifier and address.
    ///
    /// The identifier is used to identify the underlying channel and route
    /// its data. It should be distinct from other identifiers used for
    /// allocation, but can otherwise be arbitrary.
    ///
    /// The address should specify a path to an operator that should be
    /// scheduled in response to the receipt of records on the channel.
    /// Most commonly, this would be the address of the *target* of the
    /// channel.
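    ///
    /// # Examples
    ///
    /// A hedged sketch; the operator path `[0]` is hypothetical, and a real
    /// use would pass the address of the channel's target operator.
    ///
    /// ```rust
    /// use timely::worker::AsWorker;
    ///
    /// timely::execute_from_args(::std::env::args(), |worker| {
    ///     let id = worker.new_identifier();
    ///     // Allocate a channel carrying `u64` records, routed to path [0].
    ///     let (_senders, _receiver) = worker.allocate::<u64>(id, &[0]);
    /// });
    /// ```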
    fn allocate<T: Data>(&mut self, identifier: usize, address: &[usize]) -> (Vec<Box<dyn Push<Message<T>>>>, Box<dyn Pull<Message<T>>>);
    /// Constructs a pipeline channel from the worker to itself.
    ///
    /// By default this method uses the native channel allocation mechanism, but the expectation is
    /// that this behavior will be overridden to be more efficient.
    fn pipeline<T: 'static>(&mut self, identifier: usize, address: &[usize]) -> (ThreadPusher<Message<T>>, ThreadPuller<Message<T>>);

    /// Allocates a new worker-unique identifier.
    fn new_identifier(&mut self) -> usize;
    /// Provides access to named logging streams.
    fn log_register(&self) -> ::std::cell::RefMut<crate::logging_core::Registry<crate::logging::WorkerIdentifier>>;
    /// Provides access to the timely logging stream.
    fn logging(&self) -> Option<crate::logging::TimelyLogger> { self.log_register().get("timely") }
}

/// A `Worker` is the entry point to a timely dataflow computation. It wraps an `Allocate`,
/// and has a list of dataflows that it manages.
pub struct Worker<A: Allocate> {
    config: Config,
    timer: Instant,
    paths: Rc<RefCell<HashMap<usize, Vec<usize>>>>,
    allocator: Rc<RefCell<A>>,
    identifiers: Rc<RefCell<usize>>,
    // dataflows: Rc<RefCell<Vec<Wrapper>>>,
    dataflows: Rc<RefCell<HashMap<usize, Wrapper>>>,
    dataflow_counter: Rc<RefCell<usize>>,
    logging: Rc<RefCell<crate::logging_core::Registry<crate::logging::WorkerIdentifier>>>,

    activations: Rc<RefCell<Activations>>,
    active_dataflows: Vec<usize>,

    // Temporary storage for channel identifiers during dataflow construction.
    // These are then associated with a dataflow once constructed.
    temp_channel_ids: Rc<RefCell<Vec<usize>>>,
}

impl<A: Allocate> AsWorker for Worker<A> {
    fn config(&self) -> &Config { &self.config }
    fn index(&self) -> usize { self.allocator.borrow().index() }
    fn peers(&self) -> usize { self.allocator.borrow().peers() }
    fn allocate<D: Data>(&mut self, identifier: usize, address: &[usize]) -> (Vec<Box<dyn Push<Message<D>>>>, Box<dyn Pull<Message<D>>>) {
        if address.is_empty() { panic!("Unacceptable address: Length zero"); }
        let mut paths = self.paths.borrow_mut();
        paths.insert(identifier, address.to_vec());
        self.temp_channel_ids.borrow_mut().push(identifier);
        self.allocator.borrow_mut().allocate(identifier)
    }
    fn pipeline<T: 'static>(&mut self, identifier: usize, address: &[usize]) -> (ThreadPusher<Message<T>>, ThreadPuller<Message<T>>) {
        if address.is_empty() { panic!("Unacceptable address: Length zero"); }
        let mut paths = self.paths.borrow_mut();
        paths.insert(identifier, address.to_vec());
        self.temp_channel_ids.borrow_mut().push(identifier);
        self.allocator.borrow_mut().pipeline(identifier)
    }

    fn new_identifier(&mut self) -> usize { self.new_identifier() }
    fn log_register(&self) -> RefMut<crate::logging_core::Registry<crate::logging::WorkerIdentifier>> {
        self.log_register()
    }
}

impl<A: Allocate> Scheduler for Worker<A> {
    fn activations(&self) -> Rc<RefCell<Activations>> {
        self.activations.clone()
    }
}

impl<A: Allocate> Worker<A> {
    /// Allocates a new `Worker` bound to a channel allocator.
    pub fn new(config: Config, c: A) -> Worker<A> {
        let now = Instant::now();
        let index = c.index();
        Worker {
            config,
            timer: now,
            paths: Default::default(),
            allocator: Rc::new(RefCell::new(c)),
            identifiers: Default::default(),
            dataflows: Default::default(),
            dataflow_counter: Default::default(),
            logging: Rc::new(RefCell::new(crate::logging_core::Registry::new(now, index))),
            activations: Rc::new(RefCell::new(Activations::new(now))),
            active_dataflows: Default::default(),
            temp_channel_ids: Default::default(),
        }
    }

    /// Performs one step of the computation.
    ///
    /// A step gives each dataflow operator a chance to run, and is the
    /// main way to ensure that a computation proceeds.
    ///
    /// # Examples
    ///
    /// ```
    /// timely::execute_from_args(::std::env::args(), |worker| {
    ///
    ///     use timely::dataflow::operators::{ToStream, Inspect};
    ///
    ///     worker.dataflow::<usize,_,_>(|scope| {
    ///         (0 .. 10)
    ///             .to_stream(scope)
    ///             .inspect(|x| println!("{:?}", x));
    ///     });
    ///
    ///     worker.step();
    /// });
    /// ```
    pub fn step(&mut self) -> bool {
        self.step_or_park(Some(Duration::from_secs(0)))
    }

    /// Performs one step of the computation.
    ///
    /// A step gives each dataflow operator a chance to run, and is the
    /// main way to ensure that a computation proceeds.
    ///
    /// This method takes an optional timeout and may park the thread until
    /// there is work to perform or until this timeout expires. A value of
    /// `None` allows the worker to park indefinitely, whereas a value of
    /// `Some(Duration::new(0, 0))` will return without parking the thread.
    ///
    /// # Examples
    ///
    /// ```
    /// timely::execute_from_args(::std::env::args(), |worker| {
    ///
    ///     use std::time::Duration;
    ///     use timely::dataflow::operators::{ToStream, Inspect};
    ///
    ///     worker.dataflow::<usize,_,_>(|scope| {
    ///         (0 .. 10)
    ///             .to_stream(scope)
    ///             .inspect(|x| println!("{:?}", x));
    ///     });
    ///
    ///     worker.step_or_park(Some(Duration::from_secs(1)));
    /// });
    /// ```
    pub fn step_or_park(&mut self, duration: Option<Duration>) -> bool {

        { // Process channel events. Activate responders.
            let mut allocator = self.allocator.borrow_mut();
            allocator.receive();
            let events = allocator.events().clone();
            let mut borrow = events.borrow_mut();
            let paths = self.paths.borrow();
            for (channel, _event) in borrow.drain(..) {
                // TODO: Pay more attention to `_event`.
                //       Consider tracking whether a channel
                //       is non-empty, and only activating
                //       on the basis of non-empty channels.
                // TODO: This is a sloppy way to deal
                //       with channels that may not be alloc'd.
                if let Some(path) = paths.get(&channel) {
                    self.activations
                        .borrow_mut()
                        .activate(&path[..]);
                }
            }
        }

        // Organize activations.
        self.activations
            .borrow_mut()
            .advance();

        // Consider parking only if we have no pending events, some dataflows, and a non-zero duration.
        let empty_for = self.activations.borrow().empty_for();
        // Determine the minimum park duration, where `None` represents the absence of a constraint.
        let delay = match (duration, empty_for) {
            (Some(x), Some(y)) => Some(std::cmp::min(x, y)),
            (x, y) => x.or(y),
        };

        if delay != Some(Duration::new(0, 0)) {

            // Log parking and flush log.
            if let Some(l) = self.logging().as_mut() {
                l.log(crate::logging::ParkEvent::park(delay));
                l.flush();
            }

            self.allocator
                .borrow()
                .await_events(delay);

            // Log return from unpark.
            self.logging().as_mut().map(|l| l.log(crate::logging::ParkEvent::unpark()));
        }
        else { // Schedule active dataflows.

            let active_dataflows = &mut self.active_dataflows;
            self.activations
                .borrow_mut()
                .for_extensions(&[], |index| active_dataflows.push(index));

            let mut dataflows = self.dataflows.borrow_mut();
            for index in active_dataflows.drain(..) {
                // Step the dataflow if it exists; remove it if it has completed.
                if let Entry::Occupied(mut entry) = dataflows.entry(index) {
                    // TODO: This is a moment at which a scheduling decision is being made.
                    let incomplete = entry.get_mut().step();
                    if !incomplete {
                        let mut paths = self.paths.borrow_mut();
                        for channel in entry.get_mut().channel_ids.drain(..) {
                            paths.remove(&channel);
                        }
                        entry.remove_entry();
                    }
                }
            }
        }

        // Clean up, indicate if dataflows remain.
        self.logging.borrow_mut().flush();
        self.allocator.borrow_mut().release();
        !self.dataflows.borrow().is_empty()
    }

    /// Calls `self.step()` as long as `func` evaluates to true.
    ///
    /// This method will continually execute even if there is no work
    /// for the worker to perform. Consider using the similar method
    /// `Self::step_or_park_while(duration)` to allow the worker to yield
    /// control if that is appropriate.
    ///
    /// # Examples
    ///
    /// ```
    /// timely::execute_from_args(::std::env::args(), |worker| {
    ///
    ///     use timely::dataflow::operators::{ToStream, Inspect, Probe};
    ///
    ///     let probe =
    ///     worker.dataflow::<usize,_,_>(|scope| {
    ///         (0 .. 10)
    ///             .to_stream(scope)
    ///             .inspect(|x| println!("{:?}", x))
    ///             .probe()
    ///     });
    ///
    ///     worker.step_while(|| probe.less_than(&0));
    /// });
    /// ```
    pub fn step_while<F: FnMut()->bool>(&mut self, func: F) {
        self.step_or_park_while(Some(Duration::from_secs(0)), func)
    }

    /// Calls `self.step_or_park(duration)` as long as `func` evaluates to true.
    ///
    /// This method may yield whenever there is no work to perform, as determined
    /// by `Self::step_or_park()`. Please consult the documentation for further
    /// information about that method and its behavior. In particular, the method
    /// can park the worker indefinitely, if no new work re-awakens the worker.
    ///
    /// # Examples
    ///
    /// ```
    /// timely::execute_from_args(::std::env::args(), |worker| {
    ///
    ///     use timely::dataflow::operators::{ToStream, Inspect, Probe};
    ///
    ///     let probe =
    ///     worker.dataflow::<usize,_,_>(|scope| {
    ///         (0 .. 10)
    ///             .to_stream(scope)
    ///             .inspect(|x| println!("{:?}", x))
    ///             .probe()
    ///     });
    ///
    ///     worker.step_or_park_while(None, || probe.less_than(&0));
    /// });
    /// ```
    pub fn step_or_park_while<F: FnMut()->bool>(&mut self, duration: Option<Duration>, mut func: F) {
        while func() { self.step_or_park(duration); }
    }

    /// The index of the worker out of its peers.
    ///
    /// # Examples
    /// ```
    /// timely::execute_from_args(::std::env::args(), |worker| {
    ///
    ///     let index = worker.index();
    ///     let peers = worker.peers();
    ///     let timer = worker.timer();
    ///
    ///     println!("{:?}\tWorker {} of {}", timer.elapsed(), index, peers);
    ///
    /// });
    /// ```
    pub fn index(&self) -> usize { self.allocator.borrow().index() }
    /// The total number of peer workers.
    ///
    /// # Examples
    /// ```
    /// timely::execute_from_args(::std::env::args(), |worker| {
    ///
    ///     let index = worker.index();
    ///     let peers = worker.peers();
    ///     let timer = worker.timer();
    ///
    ///     println!("{:?}\tWorker {} of {}", timer.elapsed(), index, peers);
    ///
    /// });
    /// ```
    pub fn peers(&self) -> usize { self.allocator.borrow().peers() }

    /// A timer started at the initiation of the timely computation.
    ///
    /// # Examples
    /// ```
    /// timely::execute_from_args(::std::env::args(), |worker| {
    ///
    ///     let index = worker.index();
    ///     let peers = worker.peers();
    ///     let timer = worker.timer();
    ///
    ///     println!("{:?}\tWorker {} of {}", timer.elapsed(), index, peers);
    ///
    /// });
    /// ```
    pub fn timer(&self) -> Instant { self.timer }

    /// Allocate a new worker-unique identifier.
    ///
    /// This method is public, though it is not expected to be widely used outside
    /// of the timely dataflow system.
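    ///
    /// # Examples
    ///
    /// A small sketch; identifiers increase within a worker, which the
    /// assertion below relies on.
    ///
    /// ```
    /// timely::execute_from_args(::std::env::args(), |worker| {
    ///     let first = worker.new_identifier();
    ///     let second = worker.new_identifier();
    ///     assert!(second > first);
    /// });
    /// ```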
    pub fn new_identifier(&mut self) -> usize {
        *self.identifiers.borrow_mut() += 1;
        *self.identifiers.borrow() - 1
    }

    /// Access to named loggers.
    ///
    /// # Examples
    ///
    /// ```
    /// timely::execute_from_args(::std::env::args(), |worker| {
    ///
    ///     worker.log_register()
    ///           .insert::<timely::logging::TimelyEvent,_>("timely", |time, data|
    ///               println!("{:?}\t{:?}", time, data)
    ///           );
    /// });
    /// ```
    pub fn log_register(&self) -> ::std::cell::RefMut<crate::logging_core::Registry<crate::logging::WorkerIdentifier>> {
        self.logging.borrow_mut()
    }

    /// Construct a new dataflow.
    ///
    /// # Examples
    /// ```
    /// timely::execute_from_args(::std::env::args(), |worker| {
    ///
    ///     // We must supply the timestamp type here, although
    ///     // it would generally be determined by type inference.
    ///     worker.dataflow::<usize,_,_>(|scope| {
    ///
    ///         // uses of `scope` to build dataflow
    ///
    ///     });
    /// });
    /// ```
    pub fn dataflow<T, R, F>(&mut self, func: F) -> R
    where
        T: Refines<()>,
        F: FnOnce(&mut Child<Self, T>)->R,
    {
        let logging = self.logging.borrow_mut().get("timely");
        self.dataflow_core("Dataflow", logging, Box::new(()), |_, child| func(child))
    }

    /// Construct a new dataflow with a (purely cosmetic) name.
    ///
    /// # Examples
    /// ```
    /// timely::execute_from_args(::std::env::args(), |worker| {
    ///
    ///     // We must supply the timestamp type here, although
    ///     // it would generally be determined by type inference.
    ///     worker.dataflow_named::<usize,_,_>("Some Dataflow", |scope| {
    ///
    ///         // uses of `scope` to build dataflow
    ///
    ///     });
    /// });
    /// ```
    pub fn dataflow_named<T, R, F>(&mut self, name: &str, func: F) -> R
    where
        T: Refines<()>,
        F: FnOnce(&mut Child<Self, T>)->R,
    {
        let logging = self.logging.borrow_mut().get("timely");
        self.dataflow_core(name, logging, Box::new(()), |_, child| func(child))
    }

    /// Construct a new dataflow with specific configurations.
    ///
    /// This method constructs a new dataflow, using a name, logger, and additional
    /// resources specified as argument. The name is cosmetic, the logger is used to
    /// handle events generated by the dataflow, and the additional resources are kept
    /// alive for as long as the dataflow is alive (use case: shared library bindings).
    ///
    /// # Examples
    /// ```
    /// timely::execute_from_args(::std::env::args(), |worker| {
    ///
    ///     // We must supply the timestamp type here, although
    ///     // it would generally be determined by type inference.
    ///     worker.dataflow_core::<usize,_,_,_>(
    ///         "dataflow X",        // Dataflow name
    ///         None,                // Optional logger
    ///         37,                  // Any resources
    ///         |resources, scope| { // Closure
    ///
    ///             // uses of `resources`, `scope` to build dataflow
    ///
    ///         }
    ///     );
    /// });
    /// ```
    pub fn dataflow_core<T, R, F, V>(&mut self, name: &str, mut logging: Option<TimelyLogger>, mut resources: V, func: F) -> R
    where
        T: Refines<()>,
        F: FnOnce(&mut V, &mut Child<Self, T>)->R,
        V: Any+'static,
    {
        let addr = vec![];
        let dataflow_index = self.allocate_dataflow_index();
        let identifier = self.new_identifier();

        let progress_logging = self.logging.borrow_mut().get("timely/progress");
        let subscope = SubgraphBuilder::new_from(dataflow_index, addr, logging.clone(), progress_logging.clone(), name);
        let subscope = RefCell::new(subscope);

        let result = {
            let mut builder = Child {
                subgraph: &subscope,
                parent: self.clone(),
                logging: logging.clone(),
                progress_logging,
            };
            func(&mut resources, &mut builder)
        };

        let mut operator = subscope.into_inner().build(self);

        if let Some(l) = logging.as_mut() {
            l.log(crate::logging::OperatesEvent {
                id: identifier,
                addr: operator.path().to_vec(),
                name: operator.name().to_string(),
            });
            l.flush();
        }

        operator.get_internal_summary();
        operator.set_external_summary();

        let mut temp_channel_ids = self.temp_channel_ids.borrow_mut();
        let channel_ids = temp_channel_ids.drain(..).collect::<Vec<_>>();

        let wrapper = Wrapper {
            logging,
            identifier,
            operate: Some(Box::new(operator)),
            resources: Some(Box::new(resources)),
            channel_ids,
        };
        self.dataflows.borrow_mut().insert(dataflow_index, wrapper);

        result

    }

    /// Drops an identified dataflow.
    ///
    /// This method removes the identified dataflow, which will no longer be scheduled.
    /// Various other resources will be cleaned up, though the method is currently in
    /// public beta rather than expected to work. Please report all crashes and unmet
    /// expectations!
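    ///
    /// # Examples
    ///
    /// A sketch that records the index with `next_dataflow_index` before
    /// construction and then drops the dataflow by that index (assuming the
    /// dataflow has not already been stepped to completion):
    ///
    /// ```
    /// timely::execute_from_args(::std::env::args(), |worker| {
    ///
    ///     use timely::dataflow::operators::{ToStream, Inspect};
    ///
    ///     // The index the next constructed dataflow will receive.
    ///     let index = worker.next_dataflow_index();
    ///
    ///     worker.dataflow::<usize,_,_>(|scope| {
    ///         (0 .. 10)
    ///             .to_stream(scope)
    ///             .inspect(|x| println!("{:?}", x));
    ///     });
    ///
    ///     worker.drop_dataflow(index);
    ///     assert!(!worker.has_dataflows());
    /// });
    /// ```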
    pub fn drop_dataflow(&mut self, dataflow_identifier: usize) {
        if let Some(mut entry) = self.dataflows.borrow_mut().remove(&dataflow_identifier) {
            // Garbage collect channel_id to path information.
            let mut paths = self.paths.borrow_mut();
            for channel in entry.channel_ids.drain(..) {
                paths.remove(&channel);
            }
        }
    }

    /// Returns the next index to be used for dataflow construction.
    ///
    /// This identifier will appear in the address of contained operators, and can
    /// be used to drop the dataflow using `self.drop_dataflow()`.
    pub fn next_dataflow_index(&self) -> usize {
        *self.dataflow_counter.borrow()
    }

    /// List the current dataflow indices.
    pub fn installed_dataflows(&self) -> Vec<usize> {
        self.dataflows.borrow().keys().cloned().collect()
    }

    /// True if there is at least one dataflow under management.
    pub fn has_dataflows(&self) -> bool {
        !self.dataflows.borrow().is_empty()
    }

    // Acquire a new distinct dataflow identifier.
    fn allocate_dataflow_index(&mut self) -> usize {
        *self.dataflow_counter.borrow_mut() += 1;
        *self.dataflow_counter.borrow() - 1
    }
}

use crate::communication::Message;

impl<A: Allocate> Clone for Worker<A> {
    fn clone(&self) -> Self {
        Worker {
            config: self.config.clone(),
            timer: self.timer,
            paths: self.paths.clone(),
            allocator: self.allocator.clone(),
            identifiers: self.identifiers.clone(),
            dataflows: self.dataflows.clone(),
            dataflow_counter: self.dataflow_counter.clone(),
            logging: self.logging.clone(),
            activations: self.activations.clone(),
            active_dataflows: Vec::new(),
            temp_channel_ids: self.temp_channel_ids.clone(),
        }
    }
}

struct Wrapper {
    logging: Option<TimelyLogger>,
    identifier: usize,
    operate: Option<Box<dyn Schedule>>,
    resources: Option<Box<dyn Any>>,
    channel_ids: Vec<usize>,
}

impl Wrapper {
    /// Steps the dataflow, indicates if it remains incomplete.
    ///
    /// If the dataflow is complete, this call will drop it and its resources,
    /// dropping the dataflow first and then the resources (so that, e.g., shared
    /// library bindings will outlive the dataflow).
    fn step(&mut self) -> bool {

        // Perhaps log information about the start of the schedule call.
        if let Some(l) = self.logging.as_mut() {
            l.log(crate::logging::ScheduleEvent::start(self.identifier));
        }

        let incomplete = self.operate.as_mut().map(|op| op.schedule()).unwrap_or(false);
        if !incomplete {
            self.operate = None;
            self.resources = None;
        }

        // Perhaps log information about the stop of the schedule call.
        if let Some(l) = self.logging.as_mut() {
            l.log(crate::logging::ScheduleEvent::stop(self.identifier));
        }

        incomplete
    }
}

impl Drop for Wrapper {
    fn drop(&mut self) {
        if let Some(l) = self.logging.as_mut() {
            l.log(crate::logging::ShutdownEvent { id: self.identifier });
        }
        // ensure drop order
        self.operate = None;
        self.resources = None;
    }
}