1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
//! A handle to a typed stream of timely data. //! //! Most high-level timely dataflow programming is done with streams, which are each a handle to an //! operator output. Extension methods on the `Stream` type provide the appearance of higher-level //! declarative progamming, while constructing a dataflow graph underneath. use progress::nested::subgraph::{Source, Target}; use Push; use dataflow::Scope; use dataflow::channels::pushers::tee::TeeHelper; use dataflow::channels::Content; // use dataflow::scopes::root::loggers::CHANNELS_Q; /// Abstraction of a stream of `D: Data` records timestamped with `S::Timestamp`. /// /// Internally `Stream` maintains a list of data recipients who should be presented with data /// produced by the source of the stream. #[derive(Clone)] pub struct Stream<S: Scope, D> { /// The progress identifier of the stream's data source. name: Source, /// The `Scope` containing the stream. scope: S, /// Maintains a list of Push<(T, Content<D>)> interested in the stream's output. ports: TeeHelper<S::Timestamp, D>, } impl<S: Scope, D> Stream<S, D> { /// Connects the stream to a destination. /// /// The destination is described both by a `Target`, for progress tracking information, and a `P: Push` where the /// records should actually be sent. The identifier is unique to the edge and is used only for logging purposes. pub fn connect_to<P: Push<(S::Timestamp, Content<D>)>+'static>(&self, target: Target, pusher: P, identifier: usize) { ::logging::log(&::logging::CHANNELS, ::logging::ChannelsEvent { id: identifier, scope_addr: self.scope.addr(), source: (self.name.index, self.name.port), target: (target.index, target.port), }); self.scope.add_edge(self.name, target); self.ports.add_pusher(pusher); } /// Allocates a `Stream` from a supplied `Source` name and rendezvous point. pub fn new(source: Source, output: TeeHelper<S::Timestamp, D>, scope: S) -> Self { Stream { name: source, ports: output, scope: scope } } /// The name of the stream's source operator. pub fn name(&self) -> &Source { &self.name } /// The scope immediately containing the stream. pub fn scope(&self) -> S { self.scope.clone() } }