use progress::nested::subgraph::{Source, Target};
use Push;
use dataflow::Scope;
use dataflow::channels::pushers::tee::TeeHelper;
use dataflow::channels::Content;
/// A cloneable handle bundling a `Source` identifier, the scope it belongs to,
/// and a `TeeHelper` through which pushers for `(S::Timestamp, Content<D>)`
/// messages are registered.
///
/// NOTE(review): presumably this represents one operator output carrying
/// records of type `D`; confirm against the definitions of `Source` and
/// `TeeHelper`.
#[derive(Clone)]
pub struct Stream<S: Scope, D> {
/// Identifier of the source this stream is attached to; its `index` and
/// `port` fields are reported when a channel is logged.
name: Source,
/// The scope this stream lives in; used to look up the scope address and to
/// register edges.
scope: S,
/// Registration point for pushers that want to receive this stream's data.
ports: TeeHelper<S::Timestamp, D>,
}
impl<S: Scope, D> Stream<S, D> {
    /// Attaches `pusher` to this stream and records the connection to `target`.
    ///
    /// Three things happen, in this order:
    ///
    /// 1. A `ChannelsEvent` is sent to the `CHANNELS` logger, carrying
    ///    `identifier`, the scope's address, and the `(index, port)` pairs of
    ///    this stream's source and of `target`.
    /// 2. The edge from this stream's source to `target` is registered with
    ///    the scope.
    /// 3. `pusher` is added to this stream's `TeeHelper`.
    pub fn connect_to<P>(&self, target: Target, pusher: P, identifier: usize)
    where
        P: Push<(S::Timestamp, Content<D>)> + 'static,
    {
        // Plain field reads; captured up front so the event literal stays short.
        let source_endpoint = (self.name.index, self.name.port);
        let target_endpoint = (target.index, target.port);

        let event = ::logging::ChannelsEvent {
            id: identifier,
            scope_addr: self.scope.addr(),
            source: source_endpoint,
            target: target_endpoint,
        };
        ::logging::log(&::logging::CHANNELS, event);

        // Register the edge with the scope before handing the pusher over.
        self.scope.add_edge(self.name, target);
        self.ports.add_pusher(pusher);
    }

    /// Builds a stream from its source identifier, the `TeeHelper` used to
    /// register pushers, and the scope it belongs to.
    pub fn new(source: Source, output: TeeHelper<S::Timestamp, D>, scope: S) -> Self {
        Stream {
            name: source,
            scope: scope,
            ports: output,
        }
    }

    /// Returns a reference to the source identifier this stream was created with.
    pub fn name(&self) -> &Source {
        &self.name
    }

    /// Returns a clone of the scope this stream belongs to.
    pub fn scope(&self) -> S {
        self.scope.clone()
    }
}