timely/dataflow/operators/generic/handles.rs
1//! Handles to an operator's input and output streams.
2//!
3//! These handles are used by the generic operator interfaces to allow user closures to interact as
4//! the operator would with its input and output streams.
5
6use std::rc::Rc;
7use std::cell::RefCell;
8
9use crate::progress::Antichain;
10use crate::progress::Timestamp;
11use crate::progress::ChangeBatch;
12use crate::progress::frontier::MutableAntichain;
13use crate::dataflow::channels::pullers::Counter as PullCounter;
14use crate::dataflow::channels::pushers::CounterCore as PushCounter;
15use crate::dataflow::channels::pushers::buffer::{BufferCore, Session};
16use crate::dataflow::channels::BundleCore;
17use crate::communication::{Push, Pull, message::RefOrMut};
18use crate::Container;
19use crate::logging::TimelyLogger as Logger;
20
21use crate::dataflow::operators::InputCapability;
22use crate::dataflow::operators::capability::CapabilityTrait;
23
24/// Handle to an operator's input stream.
25pub struct InputHandleCore<T: Timestamp, D: Container, P: Pull<BundleCore<T, D>>> {
26 pull_counter: PullCounter<T, D, P>,
27 internal: Rc<RefCell<Vec<Rc<RefCell<ChangeBatch<T>>>>>>,
28 /// Timestamp summaries from this input to each output.
29 ///
30 /// Each timestamp received through this input may only produce output timestamps
31 /// greater or equal to the input timestamp subjected to at least one of these summaries.
32 summaries: Rc<RefCell<Vec<Antichain<T::Summary>>>>,
33 logging: Option<Logger>,
34}
35
36/// Handle to an operator's input stream, specialized to vectors.
37pub type InputHandle<T, D, P> = InputHandleCore<T, Vec<D>, P>;
38
39/// Handle to an operator's input stream and frontier.
40pub struct FrontieredInputHandleCore<'a, T: Timestamp, D: Container+'a, P: Pull<BundleCore<T, D>>+'a> {
41 /// The underlying input handle.
42 pub handle: &'a mut InputHandleCore<T, D, P>,
43 /// The frontier as reported by timely progress tracking.
44 pub frontier: &'a MutableAntichain<T>,
45}
46
47/// Handle to an operator's input stream and frontier, specialized to vectors.
48pub type FrontieredInputHandle<'a, T, D, P> = FrontieredInputHandleCore<'a, T, Vec<D>, P>;
49
50impl<'a, T: Timestamp, D: Container, P: Pull<BundleCore<T, D>>> InputHandleCore<T, D, P> {
51
52 /// Reads the next input buffer (at some timestamp `t`) and a corresponding capability for `t`.
53 /// The timestamp `t` of the input buffer can be retrieved by invoking `.time()` on the capability.
54 /// Returns `None` when there's no more data available.
55 #[inline]
56 pub fn next(&mut self) -> Option<(InputCapability<T>, RefOrMut<D>)> {
57 let internal = &self.internal;
58 let summaries = &self.summaries;
59 self.pull_counter.next_guarded().map(|(guard, bundle)| {
60 match bundle.as_ref_or_mut() {
61 RefOrMut::Ref(bundle) => {
62 (InputCapability::new(internal.clone(), summaries.clone(), guard), RefOrMut::Ref(&bundle.data))
63 },
64 RefOrMut::Mut(bundle) => {
65 (InputCapability::new(internal.clone(), summaries.clone(), guard), RefOrMut::Mut(&mut bundle.data))
66 },
67 }
68 })
69 }
70
71 /// Repeatedly calls `logic` till exhaustion of the available input data.
72 /// `logic` receives a capability and an input buffer.
73 ///
74 /// # Examples
75 /// ```
76 /// use timely::dataflow::operators::ToStream;
77 /// use timely::dataflow::operators::generic::Operator;
78 /// use timely::dataflow::channels::pact::Pipeline;
79 ///
80 /// timely::example(|scope| {
81 /// (0..10).to_stream(scope)
82 /// .unary(Pipeline, "example", |_cap, _info| |input, output| {
83 /// input.for_each(|cap, data| {
84 /// output.session(&cap).give_vec(&mut data.replace(Vec::new()));
85 /// });
86 /// });
87 /// });
88 /// ```
89 #[inline]
90 pub fn for_each<F: FnMut(InputCapability<T>, RefOrMut<D>)>(&mut self, mut logic: F) {
91 let mut logging = self.logging.take();
92 while let Some((cap, data)) = self.next() {
93 logging.as_mut().map(|l| l.log(crate::logging::GuardedMessageEvent { is_start: true }));
94 logic(cap, data);
95 logging.as_mut().map(|l| l.log(crate::logging::GuardedMessageEvent { is_start: false }));
96 }
97 self.logging = logging;
98 }
99
100}
101
102impl<'a, T: Timestamp, D: Container, P: Pull<BundleCore<T, D>>+'a> FrontieredInputHandleCore<'a, T, D, P> {
103 /// Allocate a new frontiered input handle.
104 pub fn new(handle: &'a mut InputHandleCore<T, D, P>, frontier: &'a MutableAntichain<T>) -> Self {
105 FrontieredInputHandleCore {
106 handle,
107 frontier,
108 }
109 }
110
111 /// Reads the next input buffer (at some timestamp `t`) and a corresponding capability for `t`.
112 /// The timestamp `t` of the input buffer can be retrieved by invoking `.time()` on the capability.
113 /// Returns `None` when there's no more data available.
114 #[inline]
115 pub fn next(&mut self) -> Option<(InputCapability<T>, RefOrMut<D>)> {
116 self.handle.next()
117 }
118
119 /// Repeatedly calls `logic` till exhaustion of the available input data.
120 /// `logic` receives a capability and an input buffer.
121 ///
122 /// # Examples
123 /// ```
124 /// use timely::dataflow::operators::ToStream;
125 /// use timely::dataflow::operators::generic::Operator;
126 /// use timely::dataflow::channels::pact::Pipeline;
127 ///
128 /// timely::example(|scope| {
129 /// (0..10).to_stream(scope)
130 /// .unary(Pipeline, "example", |_cap,_info| |input, output| {
131 /// input.for_each(|cap, data| {
132 /// output.session(&cap).give_vec(&mut data.replace(Vec::new()));
133 /// });
134 /// });
135 /// });
136 /// ```
137 #[inline]
138 pub fn for_each<F: FnMut(InputCapability<T>, RefOrMut<D>)>(&mut self, logic: F) {
139 self.handle.for_each(logic)
140 }
141
142 /// Inspect the frontier associated with this input.
143 #[inline]
144 pub fn frontier(&self) -> &'a MutableAntichain<T> {
145 self.frontier
146 }
147}
148
149pub fn _access_pull_counter<T: Timestamp, D: Container, P: Pull<BundleCore<T, D>>>(input: &mut InputHandleCore<T, D, P>) -> &mut PullCounter<T, D, P> {
150 &mut input.pull_counter
151}
152
153/// Constructs an input handle.
154/// Declared separately so that it can be kept private when `InputHandle` is re-exported.
155pub fn new_input_handle<T: Timestamp, D: Container, P: Pull<BundleCore<T, D>>>(
156 pull_counter: PullCounter<T, D, P>,
157 internal: Rc<RefCell<Vec<Rc<RefCell<ChangeBatch<T>>>>>>,
158 summaries: Rc<RefCell<Vec<Antichain<T::Summary>>>>,
159 logging: Option<Logger>
160) -> InputHandleCore<T, D, P> {
161 InputHandleCore {
162 pull_counter,
163 internal,
164 summaries,
165 logging,
166 }
167}
168
169/// An owned instance of an output buffer which ensures certain API use.
170///
171/// An `OutputWrapper` exists to prevent anyone from using the wrapped buffer in any way other
172/// than with an `OutputHandle`, whose methods ensure that capabilities are used and that the
173/// pusher is flushed (via the `cease` method) once it is no longer used.
174#[derive(Debug)]
175pub struct OutputWrapper<T: Timestamp, D: Container, P: Push<BundleCore<T, D>>> {
176 push_buffer: BufferCore<T, D, PushCounter<T, D, P>>,
177 internal_buffer: Rc<RefCell<ChangeBatch<T>>>,
178}
179
180impl<T: Timestamp, D: Container, P: Push<BundleCore<T, D>>> OutputWrapper<T, D, P> {
181 /// Creates a new output wrapper from a push buffer.
182 pub fn new(push_buffer: BufferCore<T, D, PushCounter<T, D, P>>, internal_buffer: Rc<RefCell<ChangeBatch<T>>>) -> Self {
183 OutputWrapper {
184 push_buffer,
185 internal_buffer,
186 }
187 }
188 /// Borrows the push buffer into a handle, which can be used to send records.
189 ///
190 /// This method ensures that the only access to the push buffer is through the `OutputHandle`
191 /// type which ensures the use of capabilities, and which calls `cease` when it is dropped.
192 pub fn activate(&mut self) -> OutputHandleCore<T, D, P> {
193 OutputHandleCore {
194 push_buffer: &mut self.push_buffer,
195 internal_buffer: &self.internal_buffer,
196 }
197 }
198}
199
200
201/// Handle to an operator's output stream.
202pub struct OutputHandleCore<'a, T: Timestamp, C: Container+'a, P: Push<BundleCore<T, C>>+'a> {
203 push_buffer: &'a mut BufferCore<T, C, PushCounter<T, C, P>>,
204 internal_buffer: &'a Rc<RefCell<ChangeBatch<T>>>,
205}
206
207/// Handle specialized to `Vec`-based container.
208pub type OutputHandle<'a, T, D, P> = OutputHandleCore<'a, T, Vec<D>, P>;
209
210impl<'a, T: Timestamp, C: Container, P: Push<BundleCore<T, C>>> OutputHandleCore<'a, T, C, P> {
211 /// Obtains a session that can send data at the timestamp associated with capability `cap`.
212 ///
213 /// In order to send data at a future timestamp, obtain a capability for the new timestamp
214 /// first, as show in the example.
215 ///
216 /// # Examples
217 /// ```
218 /// use timely::dataflow::operators::ToStream;
219 /// use timely::dataflow::operators::generic::Operator;
220 /// use timely::dataflow::channels::pact::Pipeline;
221 ///
222 /// timely::example(|scope| {
223 /// (0..10).to_stream(scope)
224 /// .unary(Pipeline, "example", |_cap, _info| |input, output| {
225 /// input.for_each(|cap, data| {
226 /// let time = cap.time().clone() + 1;
227 /// output.session(&cap.delayed(&time))
228 /// .give_vec(&mut data.replace(Vec::new()));
229 /// });
230 /// });
231 /// });
232 /// ```
233 pub fn session<'b, CT: CapabilityTrait<T>>(&'b mut self, cap: &'b CT) -> Session<'b, T, C, PushCounter<T, C, P>> where 'a: 'b {
234 assert!(cap.valid_for_output(&self.internal_buffer), "Attempted to open output session with invalid capability");
235 self.push_buffer.session(cap.time())
236 }
237
238 /// Flushes all pending data and indicate that no more data immediately follows.
239 pub fn cease(&mut self) {
240 self.push_buffer.cease();
241 }
242}
243
244impl<'a, T: Timestamp, C: Container, P: Push<BundleCore<T, C>>> Drop for OutputHandleCore<'a, T, C, P> {
245 fn drop(&mut self) {
246 self.push_buffer.cease();
247 }
248}