1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94
//! Hierarchical organization of timely dataflow graphs. use progress::{Timestamp, Operate}; use progress::nested::{Source, Target}; // use logging::TimelyLogger as Logger; use communication::Allocate; use worker::AsWorker; // pub mod root; pub mod child; pub use self::child::Child; // pub use self::root::Root; /// The information a child scope needs from its parent. pub trait ScopeParent: AsWorker+Clone { /// The timestamp associated with data in this scope. type Timestamp : Timestamp; } impl<A: Allocate> ScopeParent for ::worker::Worker<A> { type Timestamp = ::progress::timestamp::RootTimestamp; } /// The fundamental operations required to add and connect operators in a timely dataflow graph. /// /// Importantly, this is often a *shared* object, backed by a `Rc<RefCell<>>` wrapper. Each method /// takes a shared reference, but can be thought of as first calling .clone() and then calling the /// method. Each method does not hold the `RefCell`'s borrow, and should prevent accidental panics. pub trait Scope: ScopeParent { /// A useful name describing the scope. fn name(&self) -> String; /// A sequence of scope identifiers describing the path from the `Root` to this scope. fn addr(&self) -> Vec<usize>; /// Connects a source of data with a target of the data. This only links the two for /// the purposes of tracking progress, rather than effect any data movement itself. fn add_edge(&self, source: Source, target: Target); /// Adds a child `Operate` to the builder's scope. Returns the new child's index. fn add_operator(&mut self, operator: Box<Operate<Self::Timestamp>>) -> usize { let index = self.allocate_operator_index(); let global = self.new_identifier(); self.add_operator_with_indices(operator, index, global); index } /// Allocates a new scope-local operator index. /// /// This method is meant for use with `add_operator_with_index`, which accepts a scope-local /// operator index allocated with this method. This method does cause the scope to expect that /// an operator will be added, and it is an error not to eventually add such an operator. fn allocate_operator_index(&mut self) -> usize; /// Adds a child `Operate` to the builder's scope using a supplied index. /// /// This is used internally when there is a gap between allocate a child identifier and adding the /// child, as happens in subgraph creation. fn add_operator_with_index(&mut self, operator: Box<Operate<Self::Timestamp>>, index: usize) { let global = self.new_identifier(); self.add_operator_with_indices(operator, index, global); } /// Adds a child `Operate` to the builder's scope using supplied indices. /// /// The two indices are the scope-local operator index, and a worker-unique index used for e.g. logging. fn add_operator_with_indices(&mut self, operator: Box<Operate<Self::Timestamp>>, local: usize, global: usize); /// Creates a `Subgraph` from a closure acting on a `Child` scope, and returning /// whatever the closure returns. /// /// Commonly used to create new timely dataflow subgraphs, either creating new input streams /// and the input handle, or ingressing data streams and returning the egresses stream. /// /// # Examples /// ``` /// use timely::dataflow::Scope; /// use timely::dataflow::operators::{Input, Enter, Leave}; /// /// timely::execute_from_args(std::env::args(), |worker| { /// // must specify types as nothing else drives inference. /// let input = worker.dataflow::<u64,_,_>(|child1| { /// let (input, stream) = child1.new_input::<String>(); /// let output = child1.scoped::<u32,_,_>(|child2| { /// stream.enter(child2).leave() /// }); /// input /// }); /// }); /// ``` fn scoped<T: Timestamp, R, F:FnOnce(&mut Child<Self, T>)->R>(&mut self, func: F) -> R; }