timely/worker.rs

//! The root of each single-threaded worker.

use std::rc::Rc;
use std::cell::{RefCell, RefMut};
use std::any::Any;
use std::str::FromStr;
use std::time::{Instant, Duration};
use std::collections::HashMap;
use std::collections::hash_map::Entry;
use std::sync::Arc;

use crate::communication::{Allocate, Data, Push, Pull};
use crate::communication::allocator::thread::{ThreadPusher, ThreadPuller};
use crate::scheduling::{Schedule, Scheduler, Activations};
use crate::progress::timestamp::{Refines};
use crate::progress::SubgraphBuilder;
use crate::progress::operate::Operate;
use crate::dataflow::scopes::Child;
use crate::logging::TimelyLogger;

/// Different ways in which timely's progress tracking can work.
///
/// These options drive some buffering and accumulation that timely
/// can do to trade the volume of progress traffic against latency.
/// By accumulating updates for longer, a smaller total volume of
/// messages is sent.
///
/// The `ProgressMode::Demand` variant is the most robust, and least
/// likely to lead to catastrophic performance. The `Eager` variant
/// is useful for getting the smallest latencies on systems with few
/// workers, but it does risk saturating the system with progress
/// messages and should be used with care, or not at all.
///
/// If you are not certain which option to use, prefer `Demand`, and
/// perhaps monitor the progress messages through timely's logging
/// infrastructure to see if their volume is surprisingly high.
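///
/// # Examples
///
/// A minimal sketch of choosing a mode on a worker configuration, and of
/// parsing one from a string (the accepted strings are "eager" and "demand"):
///
/// ```rust
/// use timely::worker::{Config, ProgressMode};
///
/// // Prefer the default, more robust mode unless latency is critical.
/// let _config = Config::default().progress_mode(ProgressMode::Demand);
///
/// // Modes can also be parsed, as the command-line options below do.
/// assert_eq!("eager".parse::<ProgressMode>(), Ok(ProgressMode::Eager));
/// ```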
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum ProgressMode {
    /// Eagerly transmit all progress updates produced by a worker.
    ///
    /// Progress messages are transmitted without consideration for the
    /// possibility that they may unblock other workers. This can result
    /// in a substantial volume of messages that do not result in a
    /// change to the lower bound of outstanding work.
    Eager,
    /// Delay transmission of progress updates until any could advance
    /// the global frontier of timestamps.
    ///
    /// As timely executes, the progress messages inform each worker of
    /// the outstanding work remaining in the system. As workers work,
    /// they produce changes to this outstanding work. This option
    /// delays the communication of those changes until they might
    /// possibly cause a change in the lower bound of all outstanding
    /// work.
    ///
    /// The most common case this remedies is when one worker transmits
    /// messages to other workers while holding a capability for the
    /// operator and timestamp. With this option, the other workers will
    /// not immediately acknowledge receiving the messages, because the
    /// held capability is strictly prior to what the messages can affect.
    /// Once the capability is released, the progress messages are
    /// unblocked and transmitted, in accumulated form.
    Demand,
}

impl Default for ProgressMode {
    fn default() -> ProgressMode {
        ProgressMode::Demand
    }
}

impl FromStr for ProgressMode {
    type Err = String;

    fn from_str(s: &str) -> Result<ProgressMode, String> {
        match s {
            "eager" => Ok(ProgressMode::Eager),
            "demand" => Ok(ProgressMode::Demand),
            _ => Err(format!("unknown progress mode: {}", s)),
        }
    }
}

/// Worker configuration.
#[derive(Debug, Default, Clone)]
pub struct Config {
    /// The progress mode to use.
    pub(crate) progress_mode: ProgressMode,
    /// A map from parameter name to typed parameter values.
    registry: HashMap<String, Arc<dyn Any + Send + Sync>>,
}

impl Config {
    /// Installs options corresponding to the configuration's parameters
    /// into a [getopts_dep::Options] struct.
    ///
    /// It is the caller's responsibility to ensure that the installed options
    /// do not conflict with any other options that may exist in `opts`, or
    /// that may be installed into `opts` in the future.
    ///
    /// This method is only available if the `getopts` feature is enabled, which
    /// it is by default.
    #[cfg(feature = "getopts")]
    pub fn install_options(opts: &mut getopts_dep::Options) {
        opts.optopt("", "progress-mode", "progress tracking mode (eager or demand)", "MODE");
    }

    /// Instantiates a configuration based upon the parsed options in `matches`.
    ///
    /// The `matches` object must have been constructed from a
    /// [getopts_dep::Options] which contained at least the options installed by
    /// [Self::install_options].
    ///
    /// This method is only available if the `getopts` feature is enabled, which
    /// it is by default.
    #[cfg(feature = "getopts")]
    pub fn from_matches(matches: &getopts_dep::Matches) -> Result<Config, String> {
        let progress_mode = matches
            .opt_get_default("progress-mode", ProgressMode::Eager)?;
        Ok(Config::default().progress_mode(progress_mode))
    }

    /// Sets the progress mode to `progress_mode`.
    pub fn progress_mode(mut self, progress_mode: ProgressMode) -> Self {
        self.progress_mode = progress_mode;
        self
    }

    /// Sets a typed configuration parameter for the given `key`.
    ///
    /// It is recommended to install a single configuration struct using a key
    /// that uniquely identifies your project, to avoid clashes. For example,
    /// differential dataflow registers a configuration struct under the key
    /// "differential".
    ///
    /// # Examples
    /// ```rust
    /// let mut config = timely::Config::process(3);
    /// config.worker.set("example".to_string(), 7u64);
    /// timely::execute(config, |worker| {
    ///    use crate::timely::worker::AsWorker;
    ///    assert_eq!(worker.config().get::<u64>("example"), Some(&7));
    /// }).unwrap();
    /// ```
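    ///
    /// A sketch of the recommended pattern, using a hypothetical
    /// `ExampleProjectConfig` struct registered under a project-specific key:
    ///
    /// ```rust
    /// #[derive(Debug, PartialEq)]
    /// struct ExampleProjectConfig { threads: usize }
    ///
    /// let mut config = timely::Config::process(3);
    /// config.worker.set("example-project".to_string(), ExampleProjectConfig { threads: 2 });
    /// timely::execute(config, |worker| {
    ///    use crate::timely::worker::AsWorker;
    ///    assert_eq!(worker.config().get::<ExampleProjectConfig>("example-project").map(|c| c.threads), Some(2));
    /// }).unwrap();
    /// ```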
    pub fn set<T>(&mut self, key: String, val: T) -> &mut Self
    where
        T: Send + Sync + 'static,
    {
        self.registry.insert(key, Arc::new(val));
        self
    }

    /// Gets the value for configured parameter `key`.
    ///
    /// Returns `None` if `key` has not previously been set with
    /// [Config::set], or if the specified `T` does not match the `T`
    /// from the call to `set`.
    ///
    /// # Examples
    /// ```rust
    /// let mut config = timely::Config::process(3);
    /// config.worker.set("example".to_string(), 7u64);
    /// timely::execute(config, |worker| {
    ///    use crate::timely::worker::AsWorker;
    ///    assert_eq!(worker.config().get::<u64>("example"), Some(&7));
    /// }).unwrap();
    /// ```
    pub fn get<T: 'static>(&self, key: &str) -> Option<&T> {
        self.registry.get(key).and_then(|val| val.downcast_ref())
    }
}

/// Methods provided by the root Worker.
///
/// These methods are often proxied by child scopes, and this trait provides access.
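///
/// # Examples
///
/// A small sketch using the worker handle provided by `timely::execute_from_args`,
/// which implements this trait:
///
/// ```rust
/// timely::execute_from_args(::std::env::args(), |worker| {
///     use timely::worker::AsWorker;
///     // Inspect the configuration and the worker's position among its peers.
///     let _config = worker.config();
///     println!("worker {} of {}", worker.index(), worker.peers());
/// });
/// ```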
pub trait AsWorker : Scheduler {
    /// Returns the worker configuration parameters.
    fn config(&self) -> &Config;
    /// Index of the worker among its peers.
    fn index(&self) -> usize;
    /// Number of peer workers.
    fn peers(&self) -> usize;
    /// Allocates a new channel from a supplied identifier and address.
    ///
    /// The identifier is used to identify the underlying channel and route
    /// its data. It should be distinct from other identifiers used for
    /// allocation, but can otherwise be arbitrary.
    ///
    /// The address should specify a path to an operator that should be
    /// scheduled in response to the receipt of records on the channel.
    /// Most commonly, this would be the address of the *target* of the
    /// channel.
    fn allocate<T: Data>(&mut self, identifier: usize, address: &[usize]) -> (Vec<Box<dyn Push<Message<T>>>>, Box<dyn Pull<Message<T>>>);
    /// Constructs a pipeline channel from the worker to itself.
    ///
    /// By default this method uses the native channel allocation mechanism, but the expectation is
    /// that this behavior will be overridden to be more efficient.
    fn pipeline<T: 'static>(&mut self, identifier: usize, address: &[usize]) -> (ThreadPusher<Message<T>>, ThreadPuller<Message<T>>);

    /// Allocates a new worker-unique identifier.
    fn new_identifier(&mut self) -> usize;
    /// Provides access to named logging streams.
    fn log_register(&self) -> ::std::cell::RefMut<crate::logging_core::Registry<crate::logging::WorkerIdentifier>>;
    /// Provides access to the timely logging stream.
    fn logging(&self) -> Option<crate::logging::TimelyLogger> { self.log_register().get("timely") }
}

/// A `Worker` is the entry point to a timely dataflow computation. It wraps an `Allocate`,
/// and has a list of dataflows that it manages.
pub struct Worker<A: Allocate> {
    config: Config,
    timer: Instant,
    paths: Rc<RefCell<HashMap<usize, Vec<usize>>>>,
    allocator: Rc<RefCell<A>>,
    identifiers: Rc<RefCell<usize>>,
    // dataflows: Rc<RefCell<Vec<Wrapper>>>,
    dataflows: Rc<RefCell<HashMap<usize, Wrapper>>>,
    dataflow_counter: Rc<RefCell<usize>>,
    logging: Rc<RefCell<crate::logging_core::Registry<crate::logging::WorkerIdentifier>>>,

    activations: Rc<RefCell<Activations>>,
    active_dataflows: Vec<usize>,

    // Temporary storage for channel identifiers during dataflow construction.
    // These are then associated with a dataflow once constructed.
    temp_channel_ids: Rc<RefCell<Vec<usize>>>,
}
impl<A: Allocate> AsWorker for Worker<A> {
    fn config(&self) -> &Config { &self.config }
    fn index(&self) -> usize { self.allocator.borrow().index() }
    fn peers(&self) -> usize { self.allocator.borrow().peers() }
    fn allocate<D: Data>(&mut self, identifier: usize, address: &[usize]) -> (Vec<Box<dyn Push<Message<D>>>>, Box<dyn Pull<Message<D>>>) {
        if address.is_empty() { panic!("Unacceptable address: Length zero"); }
        let mut paths = self.paths.borrow_mut();
        paths.insert(identifier, address.to_vec());
        self.temp_channel_ids.borrow_mut().push(identifier);
        self.allocator.borrow_mut().allocate(identifier)
    }
    fn pipeline<T: 'static>(&mut self, identifier: usize, address: &[usize]) -> (ThreadPusher<Message<T>>, ThreadPuller<Message<T>>) {
        if address.is_empty() { panic!("Unacceptable address: Length zero"); }
        let mut paths = self.paths.borrow_mut();
        paths.insert(identifier, address.to_vec());
        self.temp_channel_ids.borrow_mut().push(identifier);
        self.allocator.borrow_mut().pipeline(identifier)
    }

    fn new_identifier(&mut self) -> usize { self.new_identifier() }
    fn log_register(&self) -> RefMut<crate::logging_core::Registry<crate::logging::WorkerIdentifier>> {
        self.log_register()
    }
}

impl<A: Allocate> Scheduler for Worker<A> {
    fn activations(&self) -> Rc<RefCell<Activations>> {
        self.activations.clone()
    }
}

impl<A: Allocate> Worker<A> {
    /// Allocates a new `Worker` bound to a channel allocator.
    pub fn new(config: Config, c: A) -> Worker<A> {
        let now = Instant::now();
        let index = c.index();
        Worker {
            config,
            timer: now,
            paths:  Default::default(),
            allocator: Rc::new(RefCell::new(c)),
            identifiers:  Default::default(),
            dataflows: Default::default(),
            dataflow_counter:  Default::default(),
            logging: Rc::new(RefCell::new(crate::logging_core::Registry::new(now, index))),
            activations: Rc::new(RefCell::new(Activations::new(now))),
            active_dataflows: Default::default(),
            temp_channel_ids:  Default::default(),
        }
    }

    /// Performs one step of the computation.
    ///
    /// A step gives each dataflow operator a chance to run, and is the
    /// main way to ensure that a computation proceeds.
    ///
    /// # Examples
    ///
    /// ```
    /// timely::execute_from_args(::std::env::args(), |worker| {
    ///
    ///     use timely::dataflow::operators::{ToStream, Inspect};
    ///
    ///     worker.dataflow::<usize,_,_>(|scope| {
    ///         (0 .. 10)
    ///             .to_stream(scope)
    ///             .inspect(|x| println!("{:?}", x));
    ///     });
    ///
    ///     worker.step();
    /// });
    /// ```
    pub fn step(&mut self) -> bool {
        self.step_or_park(Some(Duration::from_secs(0)))
    }

    /// Performs one step of the computation.
    ///
    /// A step gives each dataflow operator a chance to run, and is the
    /// main way to ensure that a computation proceeds.
    ///
    /// This method takes an optional timeout and may park the thread until
    /// there is work to perform or until this timeout expires. A value of
    /// `None` allows the worker to park indefinitely, whereas a value of
    /// `Some(Duration::new(0, 0))` will return without parking the thread.
    ///
    /// # Examples
    ///
    /// ```
    /// timely::execute_from_args(::std::env::args(), |worker| {
    ///
    ///     use std::time::Duration;
    ///     use timely::dataflow::operators::{ToStream, Inspect};
    ///
    ///     worker.dataflow::<usize,_,_>(|scope| {
    ///         (0 .. 10)
    ///             .to_stream(scope)
    ///             .inspect(|x| println!("{:?}", x));
    ///     });
    ///
    ///     worker.step_or_park(Some(Duration::from_secs(1)));
    /// });
    /// ```
    pub fn step_or_park(&mut self, duration: Option<Duration>) -> bool {

        {   // Process channel events. Activate responders.
            let mut allocator = self.allocator.borrow_mut();
            allocator.receive();
            let events = allocator.events().clone();
            let mut borrow = events.borrow_mut();
            let paths = self.paths.borrow();
            for (channel, _event) in borrow.drain(..) {
                // TODO: Pay more attention to `_event`.
                // Consider tracking whether a channel
                // is non-empty, and only activating
                // on the basis of non-empty channels.
                // TODO: This is a sloppy way to deal
                // with channels that may not be alloc'd.
                if let Some(path) = paths.get(&channel) {
                    self.activations
                        .borrow_mut()
                        .activate(&path[..]);
                }
            }
        }

        // Organize activations.
        self.activations
            .borrow_mut()
            .advance();

        // Consider parking only if we have no pending events, some dataflows, and a non-zero duration.
        let empty_for = self.activations.borrow().empty_for();
        // Determine the minimum park duration, where `None` denotes the absence of a constraint.
        let delay = match (duration, empty_for) {
            (Some(x), Some(y)) => Some(std::cmp::min(x,y)),
            (x, y) => x.or(y),
        };

        if delay != Some(Duration::new(0,0)) {

            // Log parking and flush log.
            if let Some(l) = self.logging().as_mut() {
                l.log(crate::logging::ParkEvent::park(delay));
                l.flush();
            }

            self.allocator
                .borrow()
                .await_events(delay);

            // Log return from unpark.
            self.logging().as_mut().map(|l| l.log(crate::logging::ParkEvent::unpark()));
        }
        else {   // Schedule active dataflows.

            let active_dataflows = &mut self.active_dataflows;
            self.activations
                .borrow_mut()
                .for_extensions(&[], |index| active_dataflows.push(index));

            let mut dataflows = self.dataflows.borrow_mut();
            for index in active_dataflows.drain(..) {
                // Step the dataflow if it exists, and remove it if it has completed.
                if let Entry::Occupied(mut entry) = dataflows.entry(index) {
                    // TODO: This is a moment at which a scheduling decision is being made.
                    let incomplete = entry.get_mut().step();
                    if !incomplete {
                        let mut paths = self.paths.borrow_mut();
                        for channel in entry.get_mut().channel_ids.drain(..) {
                            paths.remove(&channel);
                        }
                        entry.remove_entry();
                    }
                }
            }
        }

        // Clean up, and indicate whether dataflows remain.
        self.logging.borrow_mut().flush();
        self.allocator.borrow_mut().release();
        !self.dataflows.borrow().is_empty()
    }

    /// Calls `self.step()` as long as `func` evaluates to true.
    ///
    /// This method will continually execute even if there is no work
    /// for the worker to perform. Consider using the similar method
    /// `Self::step_or_park_while(duration)` to allow the worker to yield
    /// control if that is appropriate.
    ///
    /// # Examples
    ///
    /// ```
    /// timely::execute_from_args(::std::env::args(), |worker| {
    ///
    ///     use timely::dataflow::operators::{ToStream, Inspect, Probe};
    ///
    ///     let probe =
    ///     worker.dataflow::<usize,_,_>(|scope| {
    ///         (0 .. 10)
    ///             .to_stream(scope)
    ///             .inspect(|x| println!("{:?}", x))
    ///             .probe()
    ///     });
    ///
    ///     worker.step_while(|| probe.less_than(&0));
    /// });
    /// ```
    pub fn step_while<F: FnMut()->bool>(&mut self, func: F) {
        self.step_or_park_while(Some(Duration::from_secs(0)), func)
    }

    /// Calls `self.step_or_park(duration)` as long as `func` evaluates to true.
    ///
    /// This method may yield whenever there is no work to perform, as it is
    /// implemented in terms of `Self::step_or_park()`. Please consult the
    /// documentation for further information about that method and its behavior.
    /// In particular, the method can park the worker indefinitely, if no new
    /// work re-awakens the worker.
    ///
    /// # Examples
    ///
    /// ```
    /// timely::execute_from_args(::std::env::args(), |worker| {
    ///
    ///     use timely::dataflow::operators::{ToStream, Inspect, Probe};
    ///
    ///     let probe =
    ///     worker.dataflow::<usize,_,_>(|scope| {
    ///         (0 .. 10)
    ///             .to_stream(scope)
    ///             .inspect(|x| println!("{:?}", x))
    ///             .probe()
    ///     });
    ///
    ///     worker.step_or_park_while(None, || probe.less_than(&0));
    /// });
    /// ```
    pub fn step_or_park_while<F: FnMut()->bool>(&mut self, duration: Option<Duration>, mut func: F) {
        while func() { self.step_or_park(duration); }
    }

    /// The index of the worker among its peers.
    ///
    /// # Examples
    /// ```
    /// timely::execute_from_args(::std::env::args(), |worker| {
    ///
    ///     let index = worker.index();
    ///     let peers = worker.peers();
    ///     let timer = worker.timer();
    ///
    ///     println!("{:?}\tWorker {} of {}", timer.elapsed(), index, peers);
    ///
    /// });
    /// ```
    pub fn index(&self) -> usize { self.allocator.borrow().index() }
    /// The total number of peer workers.
    ///
    /// # Examples
    /// ```
    /// timely::execute_from_args(::std::env::args(), |worker| {
    ///
    ///     let index = worker.index();
    ///     let peers = worker.peers();
    ///     let timer = worker.timer();
    ///
    ///     println!("{:?}\tWorker {} of {}", timer.elapsed(), index, peers);
    ///
    /// });
    /// ```
    pub fn peers(&self) -> usize { self.allocator.borrow().peers() }

    /// A timer started at the initiation of the timely computation.
    ///
    /// # Examples
    /// ```
    /// timely::execute_from_args(::std::env::args(), |worker| {
    ///
    ///     let index = worker.index();
    ///     let peers = worker.peers();
    ///     let timer = worker.timer();
    ///
    ///     println!("{:?}\tWorker {} of {}", timer.elapsed(), index, peers);
    ///
    /// });
    /// ```
    pub fn timer(&self) -> Instant { self.timer }

    /// Allocate a new worker-unique identifier.
    ///
    /// This method is public, though it is not expected to be widely used outside
    /// of the timely dataflow system.
    pub fn new_identifier(&mut self) -> usize {
        *self.identifiers.borrow_mut() += 1;
        *self.identifiers.borrow() - 1
    }

    /// Access to named loggers.
    ///
    /// # Examples
    ///
    /// ```
    /// timely::execute_from_args(::std::env::args(), |worker| {
    ///
    ///     worker.log_register()
    ///           .insert::<timely::logging::TimelyEvent,_>("timely", |time, data|
    ///               println!("{:?}\t{:?}", time, data)
    ///           );
    /// });
    /// ```
    pub fn log_register(&self) -> ::std::cell::RefMut<crate::logging_core::Registry<crate::logging::WorkerIdentifier>> {
        self.logging.borrow_mut()
    }

    /// Construct a new dataflow.
    ///
    /// # Examples
    /// ```
    /// timely::execute_from_args(::std::env::args(), |worker| {
    ///
    ///     // We must supply the timestamp type here, although
    ///     // it would generally be determined by type inference.
    ///     worker.dataflow::<usize,_,_>(|scope| {
    ///
    ///         // uses of `scope` to build dataflow
    ///
    ///     });
    /// });
    /// ```
    pub fn dataflow<T, R, F>(&mut self, func: F) -> R
    where
        T: Refines<()>,
        F: FnOnce(&mut Child<Self, T>)->R,
    {
        let logging = self.logging.borrow_mut().get("timely");
        self.dataflow_core("Dataflow", logging, Box::new(()), |_, child| func(child))
    }

    /// Construct a new dataflow with a (purely cosmetic) name.
    ///
    /// # Examples
    /// ```
    /// timely::execute_from_args(::std::env::args(), |worker| {
    ///
    ///     // We must supply the timestamp type here, although
    ///     // it would generally be determined by type inference.
    ///     worker.dataflow_named::<usize,_,_>("Some Dataflow", |scope| {
    ///
    ///         // uses of `scope` to build dataflow
    ///
    ///     });
    /// });
    /// ```
    pub fn dataflow_named<T, R, F>(&mut self, name: &str, func: F) -> R
    where
        T: Refines<()>,
        F: FnOnce(&mut Child<Self, T>)->R,
    {
        let logging = self.logging.borrow_mut().get("timely");
        self.dataflow_core(name, logging, Box::new(()), |_, child| func(child))
    }

    /// Construct a new dataflow with specific configurations.
    ///
    /// This method constructs a new dataflow, using a name, logger, and additional
    /// resources specified as argument. The name is cosmetic, the logger is used to
    /// handle events generated by the dataflow, and the additional resources are kept
    /// alive for as long as the dataflow is alive (use case: shared library bindings).
    ///
    /// # Examples
    /// ```
    /// timely::execute_from_args(::std::env::args(), |worker| {
    ///
    ///     // We must supply the timestamp type here, although
    ///     // it would generally be determined by type inference.
    ///     worker.dataflow_core::<usize,_,_,_>(
    ///         "dataflow X",           // Dataflow name
    ///         None,                   // Optional logger
    ///         37,                     // Any resources
    ///         |resources, scope| {    // Closure
    ///
    ///             // uses of `resources` and `scope` to build dataflow
    ///
    ///         }
    ///     );
    /// });
    /// ```
    pub fn dataflow_core<T, R, F, V>(&mut self, name: &str, mut logging: Option<TimelyLogger>, mut resources: V, func: F) -> R
    where
        T: Refines<()>,
        F: FnOnce(&mut V, &mut Child<Self, T>)->R,
        V: Any+'static,
    {
        let addr = vec![];
        let dataflow_index = self.allocate_dataflow_index();
        let identifier = self.new_identifier();

        let progress_logging = self.logging.borrow_mut().get("timely/progress");
        let subscope = SubgraphBuilder::new_from(dataflow_index, addr, logging.clone(), progress_logging.clone(), name);
        let subscope = RefCell::new(subscope);

        let result = {
            let mut builder = Child {
                subgraph: &subscope,
                parent: self.clone(),
                logging: logging.clone(),
                progress_logging,
            };
            func(&mut resources, &mut builder)
        };

        let mut operator = subscope.into_inner().build(self);

        if let Some(l) = logging.as_mut() {
            l.log(crate::logging::OperatesEvent {
                id: identifier,
                addr: operator.path().to_vec(),
                name: operator.name().to_string(),
            });
            l.flush();
        }

        operator.get_internal_summary();
        operator.set_external_summary();

        let mut temp_channel_ids = self.temp_channel_ids.borrow_mut();
        let channel_ids = temp_channel_ids.drain(..).collect::<Vec<_>>();

        let wrapper = Wrapper {
            logging,
            identifier,
            operate: Some(Box::new(operator)),
            resources: Some(Box::new(resources)),
            channel_ids,
        };
        self.dataflows.borrow_mut().insert(dataflow_index, wrapper);

        result

    }

    /// Drops an identified dataflow.
    ///
    /// This method removes the identified dataflow, which will no longer be scheduled.
    /// Various other resources will be cleaned up, though the method is currently in
    /// public beta rather than expected to work. Please report all crashes and unmet
    /// expectations!
    pub fn drop_dataflow(&mut self, dataflow_identifier: usize) {
        if let Some(mut entry) = self.dataflows.borrow_mut().remove(&dataflow_identifier) {
            // Garbage collect channel_id to path information.
            let mut paths = self.paths.borrow_mut();
            for channel in entry.channel_ids.drain(..) {
                paths.remove(&channel);
            }
        }
    }

    /// Returns the next index to be used for dataflow construction.
    ///
    /// This identifier will appear in the address of contained operators, and can
    /// be used to drop the dataflow using `self.drop_dataflow()`.
    pub fn next_dataflow_index(&self) -> usize {
        *self.dataflow_counter.borrow()
    }

    /// List the current dataflow indices.
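    ///
    /// # Examples
    ///
    /// A minimal sketch, assuming no other dataflows have been built on this worker:
    ///
    /// ```rust
    /// timely::execute_from_args(::std::env::args(), |worker| {
    ///     worker.dataflow::<usize,_,_>(|_scope| { });
    ///     assert_eq!(worker.installed_dataflows().len(), 1);
    /// });
    /// ```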
    pub fn installed_dataflows(&self) -> Vec<usize> {
        self.dataflows.borrow().keys().cloned().collect()
    }

    /// True if there is at least one dataflow under management.
    pub fn has_dataflows(&self) -> bool {
        !self.dataflows.borrow().is_empty()
    }

    // Acquire a new distinct dataflow identifier.
    fn allocate_dataflow_index(&mut self) -> usize {
        *self.dataflow_counter.borrow_mut() += 1;
        *self.dataflow_counter.borrow() - 1
    }
}

use crate::communication::Message;

impl<A: Allocate> Clone for Worker<A> {
    fn clone(&self) -> Self {
        Worker {
            config: self.config.clone(),
            timer: self.timer,
            paths: self.paths.clone(),
            allocator: self.allocator.clone(),
            identifiers: self.identifiers.clone(),
            dataflows: self.dataflows.clone(),
            dataflow_counter: self.dataflow_counter.clone(),
            logging: self.logging.clone(),
            activations: self.activations.clone(),
            active_dataflows: Vec::new(),
            temp_channel_ids: self.temp_channel_ids.clone(),
        }
    }
}

struct Wrapper {
    logging: Option<TimelyLogger>,
    identifier: usize,
    operate: Option<Box<dyn Schedule>>,
    resources: Option<Box<dyn Any>>,
    channel_ids: Vec<usize>,
}

impl Wrapper {
    /// Steps the dataflow, and indicates whether it remains incomplete.
    ///
    /// If the dataflow has completed, this call will drop it and its resources,
    /// dropping the dataflow first and then the resources (so that, e.g., shared
    /// library bindings will outlive the dataflow).
    fn step(&mut self) -> bool {

        // Perhaps log information about the start of the schedule call.
        if let Some(l) = self.logging.as_mut() {
            l.log(crate::logging::ScheduleEvent::start(self.identifier));
        }

        let incomplete = self.operate.as_mut().map(|op| op.schedule()).unwrap_or(false);
        if !incomplete {
            self.operate = None;
            self.resources = None;
        }

        // Perhaps log information about the stop of the schedule call.
        if let Some(l) = self.logging.as_mut() {
            l.log(crate::logging::ScheduleEvent::stop(self.identifier));
        }

        incomplete
    }
}

impl Drop for Wrapper {
    fn drop(&mut self) {
        if let Some(l) = self.logging.as_mut() {
            l.log(crate::logging::ShutdownEvent { id: self.identifier });
        }
        // Ensure the drop order: the dataflow first, then its resources.
        self.operate = None;
        self.resources = None;
    }
}