timely/dataflow/stream.rs
1//! A handle to a typed stream of timely data.
2//!
3//! Most high-level timely dataflow programming is done with streams, which are each a handle to an
4//! operator output. Extension methods on the `Stream` type provide the appearance of higher-level
5//! declarative programming, while constructing a dataflow graph underneath.
6
7use crate::progress::{Source, Target};
8
9use crate::communication::Push;
10use crate::dataflow::Scope;
11use crate::dataflow::channels::pushers::tee::TeeHelper;
12use crate::dataflow::channels::BundleCore;
13use std::fmt::{self, Debug};
14use crate::Container;
15
16// use dataflow::scopes::root::loggers::CHANNELS_Q;
17
18/// Abstraction of a stream of `D: Container` records timestamped with `S::Timestamp`.
19///
20/// Internally `Stream` maintains a list of data recipients who should be presented with data
21/// produced by the source of the stream.
22#[derive(Clone)]
23pub struct StreamCore<S: Scope, D> {
24 /// The progress identifier of the stream's data source.
25 name: Source,
26 /// The `Scope` containing the stream.
27 scope: S,
28 /// Maintains a list of Push<Bundle<T, D>> interested in the stream's output.
29 ports: TeeHelper<S::Timestamp, D>,
30}
31
32/// A stream batching data in vectors.
33pub type Stream<S, D> = StreamCore<S, Vec<D>>;
34
35impl<S: Scope, D: Container> StreamCore<S, D> {
36 /// Connects the stream to a destination.
37 ///
38 /// The destination is described both by a `Target`, for progress tracking information, and a `P: Push` where the
39 /// records should actually be sent. The identifier is unique to the edge and is used only for logging purposes.
40 pub fn connect_to<P: Push<BundleCore<S::Timestamp, D>>+'static>(&self, target: Target, pusher: P, identifier: usize) {
41
42 let mut logging = self.scope().logging();
43 logging.as_mut().map(|l| l.log(crate::logging::ChannelsEvent {
44 id: identifier,
45 scope_addr: self.scope.addr(),
46 source: (self.name.node, self.name.port),
47 target: (target.node, target.port),
48 }));
49
50 self.scope.add_edge(self.name, target);
51 self.ports.add_pusher(pusher);
52 }
53 /// Allocates a `Stream` from a supplied `Source` name and rendezvous point.
54 pub fn new(source: Source, output: TeeHelper<S::Timestamp, D>, scope: S) -> Self {
55 Self { name: source, ports: output, scope }
56 }
57 /// The name of the stream's source operator.
58 pub fn name(&self) -> &Source { &self.name }
59 /// The scope immediately containing the stream.
60 pub fn scope(&self) -> S { self.scope.clone() }
61}
62
63impl<S, D> Debug for StreamCore<S, D>
64where
65 S: Scope,
66{
67 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
68 f.debug_struct("Stream")
69 .field("source", &self.name)
70 // TODO: Use `.finish_non_exhaustive()` after rust/#67364 lands
71 .finish()
72 }
73}