timely/dataflow/operators/generic/
handles.rs

1//! Handles to an operator's input and output streams.
2//!
3//! These handles are used by the generic operator interfaces to allow user closures to interact as
4//! the operator would with its input and output streams.
5
6use std::rc::Rc;
7use std::cell::RefCell;
8
9use crate::progress::Antichain;
10use crate::progress::Timestamp;
11use crate::progress::ChangeBatch;
12use crate::progress::frontier::MutableAntichain;
13use crate::dataflow::channels::pullers::Counter as PullCounter;
14use crate::dataflow::channels::pushers::CounterCore as PushCounter;
15use crate::dataflow::channels::pushers::buffer::{BufferCore, Session};
16use crate::dataflow::channels::BundleCore;
17use crate::communication::{Push, Pull, message::RefOrMut};
18use crate::Container;
19use crate::logging::TimelyLogger as Logger;
20
21use crate::dataflow::operators::InputCapability;
22use crate::dataflow::operators::capability::CapabilityTrait;
23
24/// Handle to an operator's input stream.
25pub struct InputHandleCore<T: Timestamp, D: Container, P: Pull<BundleCore<T, D>>> {
26    pull_counter: PullCounter<T, D, P>,
27    internal: Rc<RefCell<Vec<Rc<RefCell<ChangeBatch<T>>>>>>,
28    /// Timestamp summaries from this input to each output.
29    ///
30    /// Each timestamp received through this input may only produce output timestamps
31    /// greater or equal to the input timestamp subjected to at least one of these summaries.
32    summaries: Rc<RefCell<Vec<Antichain<T::Summary>>>>, 
33    logging: Option<Logger>,
34}
35
/// Handle to an operator's input stream, specialized to vectors.
///
/// This is [`InputHandleCore`] with its container parameter fixed to `Vec<D>`.
pub type InputHandle<T, D, P> = InputHandleCore<T, Vec<D>, P>;
38
39/// Handle to an operator's input stream and frontier.
40pub struct FrontieredInputHandleCore<'a, T: Timestamp, D: Container+'a, P: Pull<BundleCore<T, D>>+'a> {
41    /// The underlying input handle.
42    pub handle: &'a mut InputHandleCore<T, D, P>,
43    /// The frontier as reported by timely progress tracking.
44    pub frontier: &'a MutableAntichain<T>,
45}
46
/// Handle to an operator's input stream and frontier, specialized to vectors.
///
/// This is [`FrontieredInputHandleCore`] with its container parameter fixed to `Vec<D>`.
pub type FrontieredInputHandle<'a, T, D, P> = FrontieredInputHandleCore<'a, T, Vec<D>, P>;
49
50impl<'a, T: Timestamp, D: Container, P: Pull<BundleCore<T, D>>> InputHandleCore<T, D, P> {
51
52    /// Reads the next input buffer (at some timestamp `t`) and a corresponding capability for `t`.
53    /// The timestamp `t` of the input buffer can be retrieved by invoking `.time()` on the capability.
54    /// Returns `None` when there's no more data available.
55    #[inline]
56    pub fn next(&mut self) -> Option<(InputCapability<T>, RefOrMut<D>)> {
57        let internal = &self.internal;
58        let summaries = &self.summaries;
59        self.pull_counter.next_guarded().map(|(guard, bundle)| {
60            match bundle.as_ref_or_mut() {
61                RefOrMut::Ref(bundle) => {
62                    (InputCapability::new(internal.clone(), summaries.clone(), guard), RefOrMut::Ref(&bundle.data))
63                },
64                RefOrMut::Mut(bundle) => {
65                    (InputCapability::new(internal.clone(), summaries.clone(), guard), RefOrMut::Mut(&mut bundle.data))
66                },
67            }
68        })
69    }
70
71    /// Repeatedly calls `logic` till exhaustion of the available input data.
72    /// `logic` receives a capability and an input buffer.
73    ///
74    /// # Examples
75    /// ```
76    /// use timely::dataflow::operators::ToStream;
77    /// use timely::dataflow::operators::generic::Operator;
78    /// use timely::dataflow::channels::pact::Pipeline;
79    ///
80    /// timely::example(|scope| {
81    ///     (0..10).to_stream(scope)
82    ///            .unary(Pipeline, "example", |_cap, _info| |input, output| {
83    ///                input.for_each(|cap, data| {
84    ///                    output.session(&cap).give_vec(&mut data.replace(Vec::new()));
85    ///                });
86    ///            });
87    /// });
88    /// ```
89    #[inline]
90    pub fn for_each<F: FnMut(InputCapability<T>, RefOrMut<D>)>(&mut self, mut logic: F) {
91        let mut logging = self.logging.take();
92        while let Some((cap, data)) = self.next() {
93            logging.as_mut().map(|l| l.log(crate::logging::GuardedMessageEvent { is_start: true }));
94            logic(cap, data);
95            logging.as_mut().map(|l| l.log(crate::logging::GuardedMessageEvent { is_start: false }));
96        }
97        self.logging = logging;
98    }
99
100}
101
102impl<'a, T: Timestamp, D: Container, P: Pull<BundleCore<T, D>>+'a> FrontieredInputHandleCore<'a, T, D, P> {
103    /// Allocate a new frontiered input handle.
104    pub fn new(handle: &'a mut InputHandleCore<T, D, P>, frontier: &'a MutableAntichain<T>) -> Self {
105        FrontieredInputHandleCore {
106            handle,
107            frontier,
108        }
109    }
110
111    /// Reads the next input buffer (at some timestamp `t`) and a corresponding capability for `t`.
112    /// The timestamp `t` of the input buffer can be retrieved by invoking `.time()` on the capability.
113    /// Returns `None` when there's no more data available.
114    #[inline]
115    pub fn next(&mut self) -> Option<(InputCapability<T>, RefOrMut<D>)> {
116        self.handle.next()
117    }
118
119    /// Repeatedly calls `logic` till exhaustion of the available input data.
120    /// `logic` receives a capability and an input buffer.
121    ///
122    /// # Examples
123    /// ```
124    /// use timely::dataflow::operators::ToStream;
125    /// use timely::dataflow::operators::generic::Operator;
126    /// use timely::dataflow::channels::pact::Pipeline;
127    ///
128    /// timely::example(|scope| {
129    ///     (0..10).to_stream(scope)
130    ///            .unary(Pipeline, "example", |_cap,_info| |input, output| {
131    ///                input.for_each(|cap, data| {
132    ///                    output.session(&cap).give_vec(&mut data.replace(Vec::new()));
133    ///                });
134    ///            });
135    /// });
136    /// ```
137    #[inline]
138    pub fn for_each<F: FnMut(InputCapability<T>, RefOrMut<D>)>(&mut self, logic: F) {
139        self.handle.for_each(logic)
140    }
141
142    /// Inspect the frontier associated with this input.
143    #[inline]
144    pub fn frontier(&self) -> &'a MutableAntichain<T> {
145        self.frontier
146    }
147}
148
149pub fn _access_pull_counter<T: Timestamp, D: Container, P: Pull<BundleCore<T, D>>>(input: &mut InputHandleCore<T, D, P>) -> &mut PullCounter<T, D, P> {
150    &mut input.pull_counter
151}
152
153/// Constructs an input handle.
154/// Declared separately so that it can be kept private when `InputHandle` is re-exported.
155pub fn new_input_handle<T: Timestamp, D: Container, P: Pull<BundleCore<T, D>>>(
156    pull_counter: PullCounter<T, D, P>, 
157    internal: Rc<RefCell<Vec<Rc<RefCell<ChangeBatch<T>>>>>>, 
158    summaries: Rc<RefCell<Vec<Antichain<T::Summary>>>>, 
159    logging: Option<Logger>
160) -> InputHandleCore<T, D, P> {
161    InputHandleCore {
162        pull_counter,
163        internal,
164        summaries,
165        logging,
166    }
167}
168
169/// An owned instance of an output buffer which ensures certain API use.
170///
171/// An `OutputWrapper` exists to prevent anyone from using the wrapped buffer in any way other
172/// than with an `OutputHandle`, whose methods ensure that capabilities are used and that the
173/// pusher is flushed (via the `cease` method) once it is no longer used.
174#[derive(Debug)]
175pub struct OutputWrapper<T: Timestamp, D: Container, P: Push<BundleCore<T, D>>> {
176    push_buffer: BufferCore<T, D, PushCounter<T, D, P>>,
177    internal_buffer: Rc<RefCell<ChangeBatch<T>>>,
178}
179
180impl<T: Timestamp, D: Container, P: Push<BundleCore<T, D>>> OutputWrapper<T, D, P> {
181    /// Creates a new output wrapper from a push buffer.
182    pub fn new(push_buffer: BufferCore<T, D, PushCounter<T, D, P>>, internal_buffer: Rc<RefCell<ChangeBatch<T>>>) -> Self {
183        OutputWrapper {
184            push_buffer,
185            internal_buffer,
186        }
187    }
188    /// Borrows the push buffer into a handle, which can be used to send records.
189    ///
190    /// This method ensures that the only access to the push buffer is through the `OutputHandle`
191    /// type which ensures the use of capabilities, and which calls `cease` when it is dropped.
192    pub fn activate(&mut self) -> OutputHandleCore<T, D, P> {
193        OutputHandleCore {
194            push_buffer: &mut self.push_buffer,
195            internal_buffer: &self.internal_buffer,
196        }
197    }
198}
199
200
201/// Handle to an operator's output stream.
202pub struct OutputHandleCore<'a, T: Timestamp, C: Container+'a, P: Push<BundleCore<T, C>>+'a> {
203    push_buffer: &'a mut BufferCore<T, C, PushCounter<T, C, P>>,
204    internal_buffer: &'a Rc<RefCell<ChangeBatch<T>>>,
205}
206
/// Handle specialized to `Vec`-based container.
///
/// This is [`OutputHandleCore`] with its container parameter fixed to `Vec<D>`.
pub type OutputHandle<'a, T, D, P> = OutputHandleCore<'a, T, Vec<D>, P>;
209
210impl<'a, T: Timestamp, C: Container, P: Push<BundleCore<T, C>>> OutputHandleCore<'a, T, C, P> {
211    /// Obtains a session that can send data at the timestamp associated with capability `cap`.
212    ///
213    /// In order to send data at a future timestamp, obtain a capability for the new timestamp
214    /// first, as show in the example.
215    ///
216    /// # Examples
217    /// ```
218    /// use timely::dataflow::operators::ToStream;
219    /// use timely::dataflow::operators::generic::Operator;
220    /// use timely::dataflow::channels::pact::Pipeline;
221    ///
222    /// timely::example(|scope| {
223    ///     (0..10).to_stream(scope)
224    ///            .unary(Pipeline, "example", |_cap, _info| |input, output| {
225    ///                input.for_each(|cap, data| {
226    ///                    let time = cap.time().clone() + 1;
227    ///                    output.session(&cap.delayed(&time))
228    ///                          .give_vec(&mut data.replace(Vec::new()));
229    ///                });
230    ///            });
231    /// });
232    /// ```
233    pub fn session<'b, CT: CapabilityTrait<T>>(&'b mut self, cap: &'b CT) -> Session<'b, T, C, PushCounter<T, C, P>> where 'a: 'b {
234        assert!(cap.valid_for_output(&self.internal_buffer), "Attempted to open output session with invalid capability");
235        self.push_buffer.session(cap.time())
236    }
237
238    /// Flushes all pending data and indicate that no more data immediately follows.
239    pub fn cease(&mut self) {
240        self.push_buffer.cease();
241    }
242}
243
244impl<'a, T: Timestamp, C: Container, P: Push<BundleCore<T, C>>> Drop for OutputHandleCore<'a, T, C, P> {
245    fn drop(&mut self) {
246        self.push_buffer.cease();
247    }
248}