1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77
//! Hierarchical organization of timely dataflow graphs. use progress::{Timestamp, Operate}; use progress::nested::{Source, Target}; use timely_communication::Allocate; pub mod root; pub mod child; pub use self::child::Child; pub use self::root::Root; /// The information a child scope needs from its parent. pub trait ScopeParent: Allocate+Clone { /// The timestamp associated with data in this scope. type Timestamp : Timestamp; /// Allocates a new locally unique identifier. fn new_identifier(&mut self) -> usize; } /// The fundamental operations required to add and connect operators in a timely dataflow graph. /// /// Importantly, this is often a *shared* object, backed by a `Rc<RefCell<>>` wrapper. Each method /// takes a shared reference, but can be thought of as first calling .clone() and then calling the /// method. Each method does not hold the `RefCell`'s borrow, and should prevent accidental panics. pub trait Scope: ScopeParent { /// A useful name describing the scope. fn name(&self) -> String; /// A sequence of scope identifiers describing the path from the `Root` to this scope. fn addr(&self) -> Vec<usize>; /// Connects a source of data with a target of the data. This only links the two for /// the purposes of tracking progress, rather than effect any data movement itself. fn add_edge(&self, source: Source, target: Target); /// Adds a child `Operate` to the builder's scope. Returns the new child's index. fn add_operator<SC: Operate<Self::Timestamp>+'static>(&mut self, scope: SC) -> usize { let index = self.allocate_operator_index(); self.add_operator_with_index(scope, index); index } /// Allocates a new operator index, for use with `add_operator_with_index`. fn allocate_operator_index(&mut self) -> usize; /// Adds a child `Operate` to the builder's scope using a supplied index. /// /// This is used interally when there is a gap between allocate a child identifier and adding the /// child, as happens in subgraph creation. fn add_operator_with_index<SC: Operate<Self::Timestamp>+'static>(&mut self, scope: SC, index: usize); /// Creates a `Subgraph` from a closure acting on a `Child` scope, and returning /// whatever the closure returns. /// /// Commonly used to create new timely dataflow subgraphs, either creating new input streams /// and the input handle, or ingressing data streams and returning the egresses stream. /// /// # Examples /// ``` /// use timely::dataflow::Scope; /// use timely::dataflow::operators::{Input, Enter, Leave}; /// /// timely::execute_from_args(std::env::args(), |worker| { /// // must specify types as nothing else drives inference. /// let input = worker.dataflow::<u64,_,_>(|child1| { /// let (input, stream) = child1.new_input::<String>(); /// let output = child1.scoped::<u32,_,_>(|child2| { /// stream.enter(child2).leave() /// }); /// input /// }); /// }); /// ``` fn scoped<T: Timestamp, R, F:FnOnce(&mut Child<Self, T>)->R>(&mut self, func: F) -> R; }