1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
//! Create new `StreamVec`s connected to external inputs that may have several timestamps open at once.
use crate::container::CapacityContainerBuilder;
use crate::progress::Timestamp;
use crate::dataflow::operators::{ActivateCapability};
use crate::dataflow::operators::core::{UnorderedInput as UnorderedInputCore, UnorderedHandle as UnorderedHandleCore};
use crate::dataflow::{StreamVec, Scope};
/// Create a new `StreamVec` and `Handle` through which to supply input.
pub trait UnorderedInput<'scope, T>
where
    T: Timestamp,
{
    /// Create a new capability-based `StreamVec` together with the `Handle` used to feed it.
    /// The input supports multiple open epochs (timestamps) at the same time.
    ///
    /// The return value has the shape `((Handle, Capability), StreamVec)`: the `StreamVec`
    /// can be used immediately for timely dataflow construction, while the `Handle` and
    /// `Capability` are used later to introduce data into the timely dataflow computation.
    ///
    /// The returned `Capability` is for the default value of the timestamp type in use.
    /// Dropping it informs the system that the input has advanced beyond the capability's
    /// timestamp. To retain the ability to send, first obtain a new capability at a later
    /// timestamp via the `delayed` function for `Capability`, and only then drop the old one.
    ///
    /// To communicate the end of input, drop all available capabilities.
    ///
    /// # Examples
    ///
    /// ```
    /// use std::sync::{Arc, Mutex};
    ///
    /// use timely::*;
    /// use timely::dataflow::operators::*;
    /// use timely::dataflow::operators::vec::UnorderedInput;
    /// use timely::dataflow::operators::capture::Extract;
    ///
    /// // get send and recv endpoints, wrap send to share
    /// let (send, recv) = ::std::sync::mpsc::channel();
    /// let send = Arc::new(Mutex::new(send));
    ///
    /// timely::execute(Config::thread(), move |worker| {
    ///
    ///     // this is only to validate the output.
    ///     let send = send.lock().unwrap().clone();
    ///
    ///     // create and capture the unordered input.
    ///     let (mut input, mut cap) = worker.dataflow::<usize,_,_>(|scope| {
    ///         let (input, stream) = scope.new_unordered_input();
    ///         stream.capture_into(send);
    ///         input
    ///     });
    ///
    ///     // feed values 0..10 at times 0..10.
    ///     for round in 0..10 {
    ///         input.activate().session(&cap).give(round);
    ///         cap = cap.delayed(&(round + 1));
    ///         worker.step();
    ///     }
    /// }).unwrap();
    ///
    /// let extract = recv.extract();
    /// for i in 0..10 {
    ///     assert_eq!(extract[i], (i, vec![i]));
    /// }
    /// ```
    fn new_unordered_input<D>(
        &self,
    ) -> ((UnorderedHandle<T, D>, ActivateCapability<T>), StreamVec<'scope, T, D>)
    where
        D: 'static;
}
impl<'scope, T> UnorderedInput<'scope, T> for Scope<'scope, T>
where
    T: Timestamp,
{
    /// Builds the input by delegating to `UnorderedInputCore::new_unordered_input`.
    ///
    /// The declared return type pins the handle to the vector-specialized
    /// `UnorderedHandle<T, D>` and the stream to `StreamVec<'scope, T, D>`.
    fn new_unordered_input<D>(
        &self,
    ) -> ((UnorderedHandle<T, D>, ActivateCapability<T>), StreamVec<'scope, T, D>)
    where
        D: 'static,
    {
        // Fully-qualified call: this trait's own method has the same name.
        let (handle_and_capability, stream) = UnorderedInputCore::new_unordered_input(self);
        (handle_and_capability, stream)
    }
}
/// An unordered handle specialized to vectors.
///
/// This is `UnorderedHandleCore` with its container builder fixed to
/// `CapacityContainerBuilder<Vec<D>>`, so `T` is the timestamp type and `D` is the
/// element type of the `Vec<D>` containers.
pub type UnorderedHandle<T, D> = UnorderedHandleCore<T, CapacityContainerBuilder<Vec<D>>>;