timely/dataflow/operators/
unordered_input.rs

1//! Create new `Streams` connected to external inputs.
2
3use std::rc::Rc;
4use std::cell::RefCell;
5use crate::Container;
6
7use crate::scheduling::{Schedule, ActivateOnDrop};
8
9use crate::progress::frontier::Antichain;
10use crate::progress::{Operate, operate::SharedProgress, Timestamp};
11use crate::progress::Source;
12use crate::progress::ChangeBatch;
13
14use crate::Data;
15use crate::dataflow::channels::pushers::{CounterCore as PushCounter, TeeCore};
16use crate::dataflow::channels::pushers::buffer::{BufferCore as PushBuffer, AutoflushSessionCore};
17
18use crate::dataflow::operators::{ActivateCapability, Capability};
19
20use crate::dataflow::{Stream, Scope, StreamCore};
21
22/// Create a new `Stream` and `Handle` through which to supply input.
23pub trait UnorderedInput<G: Scope> {
24    /// Create a new capability-based `Stream` and `Handle` through which to supply input. This
25    /// input supports multiple open epochs (timestamps) at the same time.
26    ///
27    /// The `new_unordered_input` method returns `((Handle, Capability), Stream)` where the `Stream` can be used
28    /// immediately for timely dataflow construction, `Handle` and `Capability` are later used to introduce
29    /// data into the timely dataflow computation.
30    ///
31    /// The `Capability` returned is for the default value of the timestamp type in use. The
32    /// capability can be dropped to inform the system that the input has advanced beyond the
33    /// capability's timestamp. To retain the ability to send, a new capability at a later timestamp
34    /// should be obtained first, via the `delayed` function for `Capability`.
35    ///
36    /// To communicate the end-of-input drop all available capabilities.
37    ///
38    /// # Examples
39    ///
40    /// ```
41    /// use std::sync::{Arc, Mutex};
42    ///
43    /// use timely::*;
44    /// use timely::dataflow::operators::*;
45    /// use timely::dataflow::operators::capture::Extract;
46    /// use timely::dataflow::Stream;
47    ///
48    /// // get send and recv endpoints, wrap send to share
49    /// let (send, recv) = ::std::sync::mpsc::channel();
50    /// let send = Arc::new(Mutex::new(send));
51    ///
52    /// timely::execute(Config::thread(), move |worker| {
53    ///
54    ///     // this is only to validate the output.
55    ///     let send = send.lock().unwrap().clone();
56    ///
57    ///     // create and capture the unordered input.
58    ///     let (mut input, mut cap) = worker.dataflow::<usize,_,_>(|scope| {
59    ///         let (input, stream) = scope.new_unordered_input();
60    ///         stream.capture_into(send);
61    ///         input
62    ///     });
63    ///
64    ///     // feed values 0..10 at times 0..10.
65    ///     for round in 0..10 {
66    ///         input.session(cap.clone()).give(round);
67    ///         cap = cap.delayed(&(round + 1));
68    ///         worker.step();
69    ///     }
70    /// }).unwrap();
71    ///
72    /// let extract = recv.extract();
73    /// for i in 0..10 {
74    ///     assert_eq!(extract[i], (i, vec![i]));
75    /// }
76    /// ```
77    fn new_unordered_input<D:Data>(&mut self) -> ((UnorderedHandle<G::Timestamp, D>, ActivateCapability<G::Timestamp>), Stream<G, D>);
78}
79
80
81impl<G: Scope> UnorderedInput<G> for G {
82    fn new_unordered_input<D:Data>(&mut self) -> ((UnorderedHandle<G::Timestamp, D>, ActivateCapability<G::Timestamp>), Stream<G, D>) {
83        self.new_unordered_input_core()
84    }
85}
86
87/// An unordered handle specialized to vectors.
88pub type UnorderedHandle<T, D> = UnorderedHandleCore<T, Vec<D>>;
89
90/// Create a new `Stream` and `Handle` through which to supply input.
91pub trait UnorderedInputCore<G: Scope> {
92    /// Create a new capability-based [StreamCore] and [UnorderedHandleCore] through which to supply input. This
93    /// input supports multiple open epochs (timestamps) at the same time.
94    ///
95    /// The `new_unordered_input_core` method returns `((HandleCore, Capability), StreamCore)` where the `StreamCore` can be used
96    /// immediately for timely dataflow construction, `HandleCore` and `Capability` are later used to introduce
97    /// data into the timely dataflow computation.
98    ///
99    /// The `Capability` returned is for the default value of the timestamp type in use. The
100    /// capability can be dropped to inform the system that the input has advanced beyond the
101    /// capability's timestamp. To retain the ability to send, a new capability at a later timestamp
102    /// should be obtained first, via the `delayed` function for `Capability`.
103    ///
104    /// To communicate the end-of-input drop all available capabilities.
105    ///
106    /// # Examples
107    ///
108    /// ```
109    /// use std::sync::{Arc, Mutex};
110    ///
111    /// use timely::*;
112    /// use timely::dataflow::operators::*;
113    /// use timely::dataflow::operators::capture::Extract;
114    /// use timely::dataflow::Stream;
115    ///
116    /// // get send and recv endpoints, wrap send to share
117    /// let (send, recv) = ::std::sync::mpsc::channel();
118    /// let send = Arc::new(Mutex::new(send));
119    ///
120    /// timely::execute(Config::thread(), move |worker| {
121    ///
122    ///     // this is only to validate the output.
123    ///     let send = send.lock().unwrap().clone();
124    ///
125    ///     // create and capture the unordered input.
126    ///     let (mut input, mut cap) = worker.dataflow::<usize,_,_>(|scope| {
127    ///         let (input, stream) = scope.new_unordered_input_core();
128    ///         stream.capture_into(send);
129    ///         input
130    ///     });
131    ///
132    ///     // feed values 0..10 at times 0..10.
133    ///     for round in 0..10 {
134    ///         input.session(cap.clone()).give(round);
135    ///         cap = cap.delayed(&(round + 1));
136    ///         worker.step();
137    ///     }
138    /// }).unwrap();
139    ///
140    /// let extract = recv.extract();
141    /// for i in 0..10 {
142    ///     assert_eq!(extract[i], (i, vec![i]));
143    /// }
144    /// ```
145    fn new_unordered_input_core<D: Container>(&mut self) -> ((UnorderedHandleCore<G::Timestamp, D>, ActivateCapability<G::Timestamp>), StreamCore<G, D>);
146}
147
148
149impl<G: Scope> UnorderedInputCore<G> for G {
150    fn new_unordered_input_core<D: Container>(&mut self) -> ((UnorderedHandleCore<G::Timestamp, D>, ActivateCapability<G::Timestamp>), StreamCore<G, D>) {
151
152        let (output, registrar) = TeeCore::<G::Timestamp, D>::new();
153        let internal = Rc::new(RefCell::new(ChangeBatch::new()));
154        // let produced = Rc::new(RefCell::new(ChangeBatch::new()));
155        let cap = Capability::new(G::Timestamp::minimum(), internal.clone());
156        let counter = PushCounter::new(output);
157        let produced = counter.produced().clone();
158        let peers = self.peers();
159
160        let index = self.allocate_operator_index();
161        let mut address = self.addr();
162        address.push(index);
163
164        let cap = ActivateCapability::new(cap, &address, self.activations());
165
166        let helper = UnorderedHandleCore::new(counter);
167
168        self.add_operator_with_index(Box::new(UnorderedOperator {
169            name: "UnorderedInput".to_owned(),
170            address,
171            shared_progress: Rc::new(RefCell::new(SharedProgress::new(0, 1))),
172            internal,
173            produced,
174            peers,
175        }), index);
176
177        ((helper, cap), StreamCore::new(Source::new(index, 0), registrar, self.clone()))
178    }
179}
180
181struct UnorderedOperator<T:Timestamp> {
182    name: String,
183    address: Vec<usize>,
184    shared_progress: Rc<RefCell<SharedProgress<T>>>,
185    internal:   Rc<RefCell<ChangeBatch<T>>>,
186    produced:   Rc<RefCell<ChangeBatch<T>>>,
187    peers:     usize,
188}
189
190impl<T:Timestamp> Schedule for UnorderedOperator<T> {
191    fn name(&self) -> &str { &self.name }
192    fn path(&self) -> &[usize] { &self.address[..] }
193    fn schedule(&mut self) -> bool {
194        let shared_progress = &mut *self.shared_progress.borrow_mut();
195        self.internal.borrow_mut().drain_into(&mut shared_progress.internals[0]);
196        self.produced.borrow_mut().drain_into(&mut shared_progress.produceds[0]);
197        false
198    }
199}
200
201impl<T:Timestamp> Operate<T> for UnorderedOperator<T> {
202    fn inputs(&self) -> usize { 0 }
203    fn outputs(&self) -> usize { 1 }
204
205    fn get_internal_summary(&mut self) -> (Vec<Vec<Antichain<<T as Timestamp>::Summary>>>, Rc<RefCell<SharedProgress<T>>>) {
206        let mut borrow = self.internal.borrow_mut();
207        for (time, count) in borrow.drain() {
208            self.shared_progress.borrow_mut().internals[0].update(time, count * (self.peers as i64));
209        }
210        (Vec::new(), self.shared_progress.clone())
211    }
212
213    fn notify_me(&self) -> bool { false }
214}
215
216/// A handle to an input [StreamCore], used to introduce data to a timely dataflow computation.
217#[derive(Debug)]
218pub struct UnorderedHandleCore<T: Timestamp, D: Container> {
219    buffer: PushBuffer<T, D, PushCounter<T, D, TeeCore<T, D>>>,
220}
221
222impl<T: Timestamp, D: Container> UnorderedHandleCore<T, D> {
223    fn new(pusher: PushCounter<T, D, TeeCore<T, D>>) -> UnorderedHandleCore<T, D> {
224        UnorderedHandleCore {
225            buffer: PushBuffer::new(pusher),
226        }
227    }
228
229    /// Allocates a new automatically flushing session based on the supplied capability.
230    pub fn session<'b>(&'b mut self, cap: ActivateCapability<T>) -> ActivateOnDrop<AutoflushSessionCore<'b, T, D, PushCounter<T, D, TeeCore<T, D>>>> {
231        ActivateOnDrop::new(self.buffer.autoflush_session(cap.capability.clone()), cap.address.clone(), cap.activations.clone())
232    }
233}