timely/dataflow/scopes/
mod.rs

1//! Hierarchical organization of timely dataflow graphs.
2
3use crate::progress::{Timestamp, Operate, Source, Target};
4use crate::order::Product;
5use crate::progress::timestamp::Refines;
6use crate::communication::Allocate;
7use crate::worker::AsWorker;
8
9pub mod child;
10
11pub use self::child::Child;
12
13/// The information a child scope needs from its parent.
14pub trait ScopeParent: AsWorker+Clone {
15    /// The timestamp associated with data in this scope.
16    type Timestamp : Timestamp;
17}
18
19impl<A: Allocate> ScopeParent for crate::worker::Worker<A> {
20    type Timestamp = ();
21}
22
23
24/// The fundamental operations required to add and connect operators in a timely dataflow graph.
25///
26/// Importantly, this is often a *shared* object, backed by a `Rc<RefCell<>>` wrapper. Each method
27/// takes a shared reference, but can be thought of as first calling .clone() and then calling the
28/// method. Each method does not hold the `RefCell`'s borrow, and should prevent accidental panics.
29pub trait Scope: ScopeParent {
30    /// A useful name describing the scope.
31    fn name(&self) -> String;
32
33    /// A sequence of scope identifiers describing the path from the worker root to this scope.
34    fn addr(&self) -> Vec<usize>;
35
36    /// Connects a source of data with a target of the data. This only links the two for
37    /// the purposes of tracking progress, rather than effect any data movement itself.
38    fn add_edge(&self, source: Source, target: Target);
39
40    /// Adds a child `Operate` to the builder's scope. Returns the new child's index.
41    fn add_operator(&mut self, operator: Box<dyn Operate<Self::Timestamp>>) -> usize {
42        let index = self.allocate_operator_index();
43        let global = self.new_identifier();
44        self.add_operator_with_indices(operator, index, global);
45        index
46    }
47
48    /// Allocates a new scope-local operator index.
49    ///
50    /// This method is meant for use with `add_operator_with_index`, which accepts a scope-local
51    /// operator index allocated with this method. This method does cause the scope to expect that
52    /// an operator will be added, and it is an error not to eventually add such an operator.
53    fn allocate_operator_index(&mut self) -> usize;
54
55    /// Adds a child `Operate` to the builder's scope using a supplied index.
56    ///
57    /// This is used internally when there is a gap between allocate a child identifier and adding the
58    /// child, as happens in subgraph creation.
59    fn add_operator_with_index(&mut self, operator: Box<dyn Operate<Self::Timestamp>>, index: usize) {
60        let global = self.new_identifier();
61        self.add_operator_with_indices(operator, index, global);
62    }
63
64    /// Adds a child `Operate` to the builder's scope using supplied indices.
65    ///
66    /// The two indices are the scope-local operator index, and a worker-unique index used for e.g. logging.
67    fn add_operator_with_indices(&mut self, operator: Box<dyn Operate<Self::Timestamp>>, local: usize, global: usize);
68
69    /// Creates a dataflow subgraph.
70    ///
71    /// This method allows the user to create a nested scope with any timestamp that
72    /// "refines" the enclosing timestamp (informally: extends it in a reversible way).
73    ///
74    /// This is most commonly used to create new iterative contexts, and the provided
75    /// method `iterative` for this task demonstrates the use of this method.
76    ///
77    /// # Examples
78    /// ```
79    /// use timely::dataflow::Scope;
80    /// use timely::dataflow::operators::{Input, Enter, Leave};
81    /// use timely::order::Product;
82    ///
83    /// timely::execute_from_args(std::env::args(), |worker| {
84    ///     // must specify types as nothing else drives inference.
85    ///     let input = worker.dataflow::<u64,_,_>(|child1| {
86    ///         let (input, stream) = child1.new_input::<String>();
87    ///         let output = child1.scoped::<Product<u64,u32>,_,_>("ScopeName", |child2| {
88    ///             stream.enter(child2).leave()
89    ///         });
90    ///         input
91    ///     });
92    /// });
93    /// ```
94    fn scoped<T, R, F>(&mut self, name: &str, func: F) -> R
95    where
96        T: Timestamp+Refines<<Self as ScopeParent>::Timestamp>,
97        F: FnOnce(&mut Child<Self, T>) -> R;
98
99    /// Creates a iterative dataflow subgraph.
100    ///
101    /// This method is a specialization of `scoped` which uses the `Product` timestamp
102    /// combinator, suitable for iterative computations in which iterative development
103    /// at some time cannot influence prior iterations at a future time.
104    ///
105    /// # Examples
106    /// ```
107    /// use timely::dataflow::Scope;
108    /// use timely::dataflow::operators::{Input, Enter, Leave};
109    ///
110    /// timely::execute_from_args(std::env::args(), |worker| {
111    ///     // must specify types as nothing else drives inference.
112    ///     let input = worker.dataflow::<u64,_,_>(|child1| {
113    ///         let (input, stream) = child1.new_input::<String>();
114    ///         let output = child1.iterative::<u32,_,_>(|child2| {
115    ///             stream.enter(child2).leave()
116    ///         });
117    ///         input
118    ///     });
119    /// });
120    /// ```
121    fn iterative<T, R, F>(&mut self, func: F) -> R
122    where
123        T: Timestamp,
124        F: FnOnce(&mut Child<Self, Product<<Self as ScopeParent>::Timestamp, T>>) -> R,
125    {
126        self.scoped::<Product<<Self as ScopeParent>::Timestamp, T>,R,F>("Iterative", func)
127    }
128
129    /// Creates a dataflow region with the same timestamp.
130    ///
131    /// This method is a specialization of `scoped` which uses the same timestamp as the
132    /// containing scope. It is used mainly to group regions of a dataflow computation, and
133    /// provides some computational benefits by abstracting the specifics of the region.
134    ///
135    /// # Examples
136    /// ```
137    /// use timely::dataflow::Scope;
138    /// use timely::dataflow::operators::{Input, Enter, Leave};
139    ///
140    /// timely::execute_from_args(std::env::args(), |worker| {
141    ///     // must specify types as nothing else drives inference.
142    ///     let input = worker.dataflow::<u64,_,_>(|child1| {
143    ///         let (input, stream) = child1.new_input::<String>();
144    ///         let output = child1.region(|child2| {
145    ///             stream.enter(child2).leave()
146    ///         });
147    ///         input
148    ///     });
149    /// });
150    /// ```
151    fn region<R, F>(&mut self, func: F) -> R
152    where
153        F: FnOnce(&mut Child<Self, <Self as ScopeParent>::Timestamp>) -> R,
154    {
155        self.region_named("Region", func)
156    }
157
158    /// Creates a dataflow region with the same timestamp.
159    ///
160    /// This method is a specialization of `scoped` which uses the same timestamp as the
161    /// containing scope. It is used mainly to group regions of a dataflow computation, and
162    /// provides some computational benefits by abstracting the specifics of the region.
163    ///
164    /// This variant allows you to specify a name for the region, which can be read out in
165    /// the timely logging streams.
166    ///
167    /// # Examples
168    /// ```
169    /// use timely::dataflow::Scope;
170    /// use timely::dataflow::operators::{Input, Enter, Leave};
171    ///
172    /// timely::execute_from_args(std::env::args(), |worker| {
173    ///     // must specify types as nothing else drives inference.
174    ///     let input = worker.dataflow::<u64,_,_>(|child1| {
175    ///         let (input, stream) = child1.new_input::<String>();
176    ///         let output = child1.region_named("region", |child2| {
177    ///             stream.enter(child2).leave()
178    ///         });
179    ///         input
180    ///     });
181    /// });
182    /// ```
183    fn region_named<R, F>(&mut self, name: &str, func: F) -> R
184    where
185        F: FnOnce(&mut Child<Self, <Self as ScopeParent>::Timestamp>) -> R,
186    {
187        self.scoped::<<Self as ScopeParent>::Timestamp,R,F>(name, func)
188    }
189
190}