timely/dataflow/operators/unordered_input.rs
//! Create new `Stream`s connected to external inputs.
2
3use std::rc::Rc;
4use std::cell::RefCell;
5use crate::Container;
6
7use crate::scheduling::{Schedule, ActivateOnDrop};
8
9use crate::progress::frontier::Antichain;
10use crate::progress::{Operate, operate::SharedProgress, Timestamp};
11use crate::progress::Source;
12use crate::progress::ChangeBatch;
13
14use crate::Data;
15use crate::dataflow::channels::pushers::{CounterCore as PushCounter, TeeCore};
16use crate::dataflow::channels::pushers::buffer::{BufferCore as PushBuffer, AutoflushSessionCore};
17
18use crate::dataflow::operators::{ActivateCapability, Capability};
19
20use crate::dataflow::{Stream, Scope, StreamCore};
21
22/// Create a new `Stream` and `Handle` through which to supply input.
23pub trait UnorderedInput<G: Scope> {
24 /// Create a new capability-based `Stream` and `Handle` through which to supply input. This
25 /// input supports multiple open epochs (timestamps) at the same time.
26 ///
27 /// The `new_unordered_input` method returns `((Handle, Capability), Stream)` where the `Stream` can be used
28 /// immediately for timely dataflow construction, `Handle` and `Capability` are later used to introduce
29 /// data into the timely dataflow computation.
30 ///
31 /// The `Capability` returned is for the default value of the timestamp type in use. The
32 /// capability can be dropped to inform the system that the input has advanced beyond the
33 /// capability's timestamp. To retain the ability to send, a new capability at a later timestamp
34 /// should be obtained first, via the `delayed` function for `Capability`.
35 ///
36 /// To communicate the end-of-input drop all available capabilities.
37 ///
38 /// # Examples
39 ///
40 /// ```
41 /// use std::sync::{Arc, Mutex};
42 ///
43 /// use timely::*;
44 /// use timely::dataflow::operators::*;
45 /// use timely::dataflow::operators::capture::Extract;
46 /// use timely::dataflow::Stream;
47 ///
48 /// // get send and recv endpoints, wrap send to share
49 /// let (send, recv) = ::std::sync::mpsc::channel();
50 /// let send = Arc::new(Mutex::new(send));
51 ///
52 /// timely::execute(Config::thread(), move |worker| {
53 ///
54 /// // this is only to validate the output.
55 /// let send = send.lock().unwrap().clone();
56 ///
57 /// // create and capture the unordered input.
58 /// let (mut input, mut cap) = worker.dataflow::<usize,_,_>(|scope| {
59 /// let (input, stream) = scope.new_unordered_input();
60 /// stream.capture_into(send);
61 /// input
62 /// });
63 ///
64 /// // feed values 0..10 at times 0..10.
65 /// for round in 0..10 {
66 /// input.session(cap.clone()).give(round);
67 /// cap = cap.delayed(&(round + 1));
68 /// worker.step();
69 /// }
70 /// }).unwrap();
71 ///
72 /// let extract = recv.extract();
73 /// for i in 0..10 {
74 /// assert_eq!(extract[i], (i, vec![i]));
75 /// }
76 /// ```
77 fn new_unordered_input<D:Data>(&mut self) -> ((UnorderedHandle<G::Timestamp, D>, ActivateCapability<G::Timestamp>), Stream<G, D>);
78}
79
80
81impl<G: Scope> UnorderedInput<G> for G {
82 fn new_unordered_input<D:Data>(&mut self) -> ((UnorderedHandle<G::Timestamp, D>, ActivateCapability<G::Timestamp>), Stream<G, D>) {
83 self.new_unordered_input_core()
84 }
85}
86
/// An [UnorderedHandleCore] whose container type is `Vec<D>`; this is the handle
/// returned by [UnorderedInput::new_unordered_input].
pub type UnorderedHandle<T, D> = UnorderedHandleCore<T, Vec<D>>;
89
90/// Create a new `Stream` and `Handle` through which to supply input.
91pub trait UnorderedInputCore<G: Scope> {
92 /// Create a new capability-based [StreamCore] and [UnorderedHandleCore] through which to supply input. This
93 /// input supports multiple open epochs (timestamps) at the same time.
94 ///
95 /// The `new_unordered_input_core` method returns `((HandleCore, Capability), StreamCore)` where the `StreamCore` can be used
96 /// immediately for timely dataflow construction, `HandleCore` and `Capability` are later used to introduce
97 /// data into the timely dataflow computation.
98 ///
99 /// The `Capability` returned is for the default value of the timestamp type in use. The
100 /// capability can be dropped to inform the system that the input has advanced beyond the
101 /// capability's timestamp. To retain the ability to send, a new capability at a later timestamp
102 /// should be obtained first, via the `delayed` function for `Capability`.
103 ///
104 /// To communicate the end-of-input drop all available capabilities.
105 ///
106 /// # Examples
107 ///
108 /// ```
109 /// use std::sync::{Arc, Mutex};
110 ///
111 /// use timely::*;
112 /// use timely::dataflow::operators::*;
113 /// use timely::dataflow::operators::capture::Extract;
114 /// use timely::dataflow::Stream;
115 ///
116 /// // get send and recv endpoints, wrap send to share
117 /// let (send, recv) = ::std::sync::mpsc::channel();
118 /// let send = Arc::new(Mutex::new(send));
119 ///
120 /// timely::execute(Config::thread(), move |worker| {
121 ///
122 /// // this is only to validate the output.
123 /// let send = send.lock().unwrap().clone();
124 ///
125 /// // create and capture the unordered input.
126 /// let (mut input, mut cap) = worker.dataflow::<usize,_,_>(|scope| {
127 /// let (input, stream) = scope.new_unordered_input_core();
128 /// stream.capture_into(send);
129 /// input
130 /// });
131 ///
132 /// // feed values 0..10 at times 0..10.
133 /// for round in 0..10 {
134 /// input.session(cap.clone()).give(round);
135 /// cap = cap.delayed(&(round + 1));
136 /// worker.step();
137 /// }
138 /// }).unwrap();
139 ///
140 /// let extract = recv.extract();
141 /// for i in 0..10 {
142 /// assert_eq!(extract[i], (i, vec![i]));
143 /// }
144 /// ```
145 fn new_unordered_input_core<D: Container>(&mut self) -> ((UnorderedHandleCore<G::Timestamp, D>, ActivateCapability<G::Timestamp>), StreamCore<G, D>);
146}
147
148
149impl<G: Scope> UnorderedInputCore<G> for G {
150 fn new_unordered_input_core<D: Container>(&mut self) -> ((UnorderedHandleCore<G::Timestamp, D>, ActivateCapability<G::Timestamp>), StreamCore<G, D>) {
151
152 let (output, registrar) = TeeCore::<G::Timestamp, D>::new();
153 let internal = Rc::new(RefCell::new(ChangeBatch::new()));
154 // let produced = Rc::new(RefCell::new(ChangeBatch::new()));
155 let cap = Capability::new(G::Timestamp::minimum(), internal.clone());
156 let counter = PushCounter::new(output);
157 let produced = counter.produced().clone();
158 let peers = self.peers();
159
160 let index = self.allocate_operator_index();
161 let mut address = self.addr();
162 address.push(index);
163
164 let cap = ActivateCapability::new(cap, &address, self.activations());
165
166 let helper = UnorderedHandleCore::new(counter);
167
168 self.add_operator_with_index(Box::new(UnorderedOperator {
169 name: "UnorderedInput".to_owned(),
170 address,
171 shared_progress: Rc::new(RefCell::new(SharedProgress::new(0, 1))),
172 internal,
173 produced,
174 peers,
175 }), index);
176
177 ((helper, cap), StreamCore::new(Source::new(index, 0), registrar, self.clone()))
178 }
179}
180
/// The operator behind an unordered input. It has no inputs and one output, and when
/// scheduled it only moves recorded capability changes and produced counts into its
/// shared progress.
struct UnorderedOperator<T:Timestamp> {
    /// Name reported through `Schedule::name` (`"UnorderedInput"` when built in this module).
    name: String,
    /// Operator path: the scope's address followed by this operator's index.
    address: Vec<usize>,
    /// Progress state handed back to the scope from `get_internal_summary`.
    shared_progress: Rc<RefCell<SharedProgress<T>>>,
    /// Change batch that this input's capabilities record their changes into.
    internal: Rc<RefCell<ChangeBatch<T>>>,
    /// Change batch shared with the output's push counter.
    produced: Rc<RefCell<ChangeBatch<T>>>,
    /// Value of `peers()` for the scope (presumably the worker count); used to scale the
    /// initial internal changes in `get_internal_summary`.
    peers: usize,
}
189
190impl<T:Timestamp> Schedule for UnorderedOperator<T> {
191 fn name(&self) -> &str { &self.name }
192 fn path(&self) -> &[usize] { &self.address[..] }
193 fn schedule(&mut self) -> bool {
194 let shared_progress = &mut *self.shared_progress.borrow_mut();
195 self.internal.borrow_mut().drain_into(&mut shared_progress.internals[0]);
196 self.produced.borrow_mut().drain_into(&mut shared_progress.produceds[0]);
197 false
198 }
199}
200
201impl<T:Timestamp> Operate<T> for UnorderedOperator<T> {
202 fn inputs(&self) -> usize { 0 }
203 fn outputs(&self) -> usize { 1 }
204
205 fn get_internal_summary(&mut self) -> (Vec<Vec<Antichain<<T as Timestamp>::Summary>>>, Rc<RefCell<SharedProgress<T>>>) {
206 let mut borrow = self.internal.borrow_mut();
207 for (time, count) in borrow.drain() {
208 self.shared_progress.borrow_mut().internals[0].update(time, count * (self.peers as i64));
209 }
210 (Vec::new(), self.shared_progress.clone())
211 }
212
213 fn notify_me(&self) -> bool { false }
214}
215
/// A handle to an input [StreamCore], used to introduce data to a timely dataflow computation.
///
/// Created by [UnorderedInputCore::new_unordered_input_core]; data is sent through sessions
/// opened with [UnorderedHandleCore::session], each bound to a supplied capability.
#[derive(Debug)]
pub struct UnorderedHandleCore<T: Timestamp, D: Container> {
    // Buffer whose pushes go through a counter (whose `produced` batch the input operator
    // reports) into the `TeeCore` whose registrar backs the returned stream.
    buffer: PushBuffer<T, D, PushCounter<T, D, TeeCore<T, D>>>,
}
221
222impl<T: Timestamp, D: Container> UnorderedHandleCore<T, D> {
223 fn new(pusher: PushCounter<T, D, TeeCore<T, D>>) -> UnorderedHandleCore<T, D> {
224 UnorderedHandleCore {
225 buffer: PushBuffer::new(pusher),
226 }
227 }
228
229 /// Allocates a new automatically flushing session based on the supplied capability.
230 pub fn session<'b>(&'b mut self, cap: ActivateCapability<T>) -> ActivateOnDrop<AutoflushSessionCore<'b, T, D, PushCounter<T, D, TeeCore<T, D>>>> {
231 ActivateOnDrop::new(self.buffer.autoflush_session(cap.capability.clone()), cap.address.clone(), cap.activations.clone())
232 }
233}