timely/dataflow/scopes/mod.rs
1//! Hierarchical organization of timely dataflow graphs.
2
3use crate::progress::{Timestamp, Operate, Source, Target};
4use crate::order::Product;
5use crate::progress::timestamp::Refines;
6use crate::communication::Allocate;
7use crate::worker::AsWorker;
8
9pub mod child;
10
11pub use self::child::Child;
12
13/// The information a child scope needs from its parent.
14pub trait ScopeParent: AsWorker+Clone {
15 /// The timestamp associated with data in this scope.
16 type Timestamp : Timestamp;
17}
18
19impl<A: Allocate> ScopeParent for crate::worker::Worker<A> {
20 type Timestamp = ();
21}
22
23
24/// The fundamental operations required to add and connect operators in a timely dataflow graph.
25///
26/// Importantly, this is often a *shared* object, backed by a `Rc<RefCell<>>` wrapper. Each method
27/// takes a shared reference, but can be thought of as first calling .clone() and then calling the
28/// method. Each method does not hold the `RefCell`'s borrow, and should prevent accidental panics.
29pub trait Scope: ScopeParent {
30 /// A useful name describing the scope.
31 fn name(&self) -> String;
32
33 /// A sequence of scope identifiers describing the path from the worker root to this scope.
34 fn addr(&self) -> Vec<usize>;
35
36 /// Connects a source of data with a target of the data. This only links the two for
37 /// the purposes of tracking progress, rather than effect any data movement itself.
38 fn add_edge(&self, source: Source, target: Target);
39
40 /// Adds a child `Operate` to the builder's scope. Returns the new child's index.
41 fn add_operator(&mut self, operator: Box<dyn Operate<Self::Timestamp>>) -> usize {
42 let index = self.allocate_operator_index();
43 let global = self.new_identifier();
44 self.add_operator_with_indices(operator, index, global);
45 index
46 }
47
48 /// Allocates a new scope-local operator index.
49 ///
50 /// This method is meant for use with `add_operator_with_index`, which accepts a scope-local
51 /// operator index allocated with this method. This method does cause the scope to expect that
52 /// an operator will be added, and it is an error not to eventually add such an operator.
53 fn allocate_operator_index(&mut self) -> usize;
54
55 /// Adds a child `Operate` to the builder's scope using a supplied index.
56 ///
57 /// This is used internally when there is a gap between allocate a child identifier and adding the
58 /// child, as happens in subgraph creation.
59 fn add_operator_with_index(&mut self, operator: Box<dyn Operate<Self::Timestamp>>, index: usize) {
60 let global = self.new_identifier();
61 self.add_operator_with_indices(operator, index, global);
62 }
63
64 /// Adds a child `Operate` to the builder's scope using supplied indices.
65 ///
66 /// The two indices are the scope-local operator index, and a worker-unique index used for e.g. logging.
67 fn add_operator_with_indices(&mut self, operator: Box<dyn Operate<Self::Timestamp>>, local: usize, global: usize);
68
69 /// Creates a dataflow subgraph.
70 ///
71 /// This method allows the user to create a nested scope with any timestamp that
72 /// "refines" the enclosing timestamp (informally: extends it in a reversible way).
73 ///
74 /// This is most commonly used to create new iterative contexts, and the provided
75 /// method `iterative` for this task demonstrates the use of this method.
76 ///
77 /// # Examples
78 /// ```
79 /// use timely::dataflow::Scope;
80 /// use timely::dataflow::operators::{Input, Enter, Leave};
81 /// use timely::order::Product;
82 ///
83 /// timely::execute_from_args(std::env::args(), |worker| {
84 /// // must specify types as nothing else drives inference.
85 /// let input = worker.dataflow::<u64,_,_>(|child1| {
86 /// let (input, stream) = child1.new_input::<String>();
87 /// let output = child1.scoped::<Product<u64,u32>,_,_>("ScopeName", |child2| {
88 /// stream.enter(child2).leave()
89 /// });
90 /// input
91 /// });
92 /// });
93 /// ```
94 fn scoped<T, R, F>(&mut self, name: &str, func: F) -> R
95 where
96 T: Timestamp+Refines<<Self as ScopeParent>::Timestamp>,
97 F: FnOnce(&mut Child<Self, T>) -> R;
98
99 /// Creates a iterative dataflow subgraph.
100 ///
101 /// This method is a specialization of `scoped` which uses the `Product` timestamp
102 /// combinator, suitable for iterative computations in which iterative development
103 /// at some time cannot influence prior iterations at a future time.
104 ///
105 /// # Examples
106 /// ```
107 /// use timely::dataflow::Scope;
108 /// use timely::dataflow::operators::{Input, Enter, Leave};
109 ///
110 /// timely::execute_from_args(std::env::args(), |worker| {
111 /// // must specify types as nothing else drives inference.
112 /// let input = worker.dataflow::<u64,_,_>(|child1| {
113 /// let (input, stream) = child1.new_input::<String>();
114 /// let output = child1.iterative::<u32,_,_>(|child2| {
115 /// stream.enter(child2).leave()
116 /// });
117 /// input
118 /// });
119 /// });
120 /// ```
121 fn iterative<T, R, F>(&mut self, func: F) -> R
122 where
123 T: Timestamp,
124 F: FnOnce(&mut Child<Self, Product<<Self as ScopeParent>::Timestamp, T>>) -> R,
125 {
126 self.scoped::<Product<<Self as ScopeParent>::Timestamp, T>,R,F>("Iterative", func)
127 }
128
129 /// Creates a dataflow region with the same timestamp.
130 ///
131 /// This method is a specialization of `scoped` which uses the same timestamp as the
132 /// containing scope. It is used mainly to group regions of a dataflow computation, and
133 /// provides some computational benefits by abstracting the specifics of the region.
134 ///
135 /// # Examples
136 /// ```
137 /// use timely::dataflow::Scope;
138 /// use timely::dataflow::operators::{Input, Enter, Leave};
139 ///
140 /// timely::execute_from_args(std::env::args(), |worker| {
141 /// // must specify types as nothing else drives inference.
142 /// let input = worker.dataflow::<u64,_,_>(|child1| {
143 /// let (input, stream) = child1.new_input::<String>();
144 /// let output = child1.region(|child2| {
145 /// stream.enter(child2).leave()
146 /// });
147 /// input
148 /// });
149 /// });
150 /// ```
151 fn region<R, F>(&mut self, func: F) -> R
152 where
153 F: FnOnce(&mut Child<Self, <Self as ScopeParent>::Timestamp>) -> R,
154 {
155 self.region_named("Region", func)
156 }
157
158 /// Creates a dataflow region with the same timestamp.
159 ///
160 /// This method is a specialization of `scoped` which uses the same timestamp as the
161 /// containing scope. It is used mainly to group regions of a dataflow computation, and
162 /// provides some computational benefits by abstracting the specifics of the region.
163 ///
164 /// This variant allows you to specify a name for the region, which can be read out in
165 /// the timely logging streams.
166 ///
167 /// # Examples
168 /// ```
169 /// use timely::dataflow::Scope;
170 /// use timely::dataflow::operators::{Input, Enter, Leave};
171 ///
172 /// timely::execute_from_args(std::env::args(), |worker| {
173 /// // must specify types as nothing else drives inference.
174 /// let input = worker.dataflow::<u64,_,_>(|child1| {
175 /// let (input, stream) = child1.new_input::<String>();
176 /// let output = child1.region_named("region", |child2| {
177 /// stream.enter(child2).leave()
178 /// });
179 /// input
180 /// });
181 /// });
182 /// ```
183 fn region_named<R, F>(&mut self, name: &str, func: F) -> R
184 where
185 F: FnOnce(&mut Child<Self, <Self as ScopeParent>::Timestamp>) -> R,
186 {
187 self.scoped::<<Self as ScopeParent>::Timestamp,R,F>(name, func)
188 }
189
190}