use crate::progress::{Source, Target};
use crate::communication::Push;
use crate::dataflow::Scope;
use crate::dataflow::channels::pushers::tee::TeeHelper;
use crate::dataflow::channels::Bundle;
#[derive(Clone)]
pub struct Stream<S: Scope, D> {
name: Source,
scope: S,
ports: TeeHelper<S::Timestamp, D>,
}
impl<S: Scope, D> Stream<S, D> {
pub fn connect_to<P: Push<Bundle<S::Timestamp, D>>+'static>(&self, target: Target, pusher: P, identifier: usize) {
let mut logging = self.scope().logging();
logging.as_mut().map(|l| l.log(crate::logging::ChannelsEvent {
id: identifier,
scope_addr: self.scope.addr(),
source: (self.name.index, self.name.port),
target: (target.index, target.port),
}));
self.scope.add_edge(self.name, target);
self.ports.add_pusher(pusher);
}
pub fn new(source: Source, output: TeeHelper<S::Timestamp, D>, scope: S) -> Self {
Stream { name: source, ports: output, scope }
}
pub fn name(&self) -> &Source { &self.name }
pub fn scope(&self) -> S { self.scope.clone() }
}