timely/dataflow/operators/capture/extract.rs
1//! Traits and types for extracting captured timely dataflow streams.
2
3use super::EventCore;
4use crate::Container;
5use crate::Data;
6
/// Supports extracting a sequence of timestamps and their associated data.
pub trait Extract<T: Ord, D: Ord> {
    /// Converts `self` into a sequence of timestamped data.
    ///
    /// Currently this is only implemented for `Receiver<EventCore<T, Vec<D>>>`, and is used only
    /// to easily pull data out of a timely dataflow computation once it has completed.
    ///
    /// # Examples
    ///
    /// ```rust
    /// use std::rc::Rc;
    /// use std::sync::{Arc, Mutex};
    /// use timely::dataflow::Scope;
    /// use timely::dataflow::operators::{Capture, ToStream, Inspect};
    /// use timely::dataflow::operators::capture::{EventLinkCore, Replay, Extract};
    ///
    /// // get send and recv endpoints, wrap send to share
    /// let (send, recv) = ::std::sync::mpsc::channel();
    /// let send = Arc::new(Mutex::new(send));
    ///
    /// timely::execute(timely::Config::thread(), move |worker| {
    ///
    ///     // this is only to validate the output.
    ///     let send = send.lock().unwrap().clone();
    ///
    ///     // these are to capture/replay the stream.
    ///     let handle1 = Rc::new(EventLinkCore::new());
    ///     let handle2 = Some(handle1.clone());
    ///
    ///     worker.dataflow::<u64,_,_>(|scope1|
    ///         (0..10).to_stream(scope1)
    ///                .capture_into(handle1)
    ///     );
    ///
    ///     worker.dataflow(|scope2| {
    ///         handle2.replay_into(scope2)
    ///                .capture_into(send)
    ///     });
    /// }).unwrap();
    ///
    /// assert_eq!(recv.extract()[0].1, (0..10).collect::<Vec<_>>());
    /// ```
    fn extract(self) -> Vec<(T, Vec<D>)>;
}
51
52impl<T: Ord, D: Ord+Data> Extract<T,D> for ::std::sync::mpsc::Receiver<EventCore<T, Vec<D>>> {
53 fn extract(self) -> Vec<(T, Vec<D>)> {
54 let mut result = self.extract_core();
55
56 let mut current = 0;
57 for i in 1 .. result.len() {
58 if result[current].0 == result[i].0 {
59 let dataz = ::std::mem::replace(&mut result[i].1, Vec::new());
60 result[current].1.extend(dataz);
61 }
62 else {
63 current = i;
64 }
65 }
66
67 for &mut (_, ref mut data) in &mut result {
68 data.sort();
69 }
70 result.retain(|x| !x.1.is_empty());
71 result
72 }
73}
74
/// Supports extracting a sequence of timestamps and their associated containers.
pub trait ExtractCore<T, C> {
    /// Converts `self` into a sequence of timestamped data.
    ///
    /// Currently this is only implemented for `Receiver<EventCore<T, C>>`, and is used only
    /// to easily pull data out of a timely dataflow computation once it has completed.
    ///
    /// # Examples
    ///
    /// ```rust
    /// use std::rc::Rc;
    /// use std::sync::{Arc, Mutex};
    /// use timely::dataflow::Scope;
    /// use timely::dataflow::operators::{Capture, ToStream, Inspect};
    /// use timely::dataflow::operators::capture::{EventLinkCore, Replay, ExtractCore};
    ///
    /// // get send and recv endpoints, wrap send to share
    /// let (send, recv) = ::std::sync::mpsc::channel();
    /// let send = Arc::new(Mutex::new(send));
    ///
    /// timely::execute(timely::Config::thread(), move |worker| {
    ///
    ///     // this is only to validate the output.
    ///     let send = send.lock().unwrap().clone();
    ///
    ///     // these are to capture/replay the stream.
    ///     let handle1 = Rc::new(EventLinkCore::new());
    ///     let handle2 = Some(handle1.clone());
    ///
    ///     worker.dataflow::<u64,_,_>(|scope1|
    ///         (0..10).to_stream(scope1)
    ///                .capture_into(handle1)
    ///     );
    ///
    ///     worker.dataflow(|scope2| {
    ///         handle2.replay_into(scope2)
    ///                .capture_into(send)
    ///     });
    /// }).unwrap();
    ///
    /// assert_eq!(recv.extract_core().into_iter().flat_map(|x| x.1).collect::<Vec<_>>(), (0..10).collect::<Vec<_>>());
    /// ```
    fn extract_core(self) -> Vec<(T, C)>;
}
119
120impl<T, C: Container> ExtractCore<T, C> for ::std::sync::mpsc::Receiver<EventCore<T, C>> {
121 fn extract_core(self) -> Vec<(T, C)> {
122 let mut result = Vec::new();
123 for event in self {
124 if let EventCore::Messages(time, data) = event {
125 result.push((time, data));
126 }
127 }
128 result.retain(|x| !x.1.is_empty());
129 result
130 }
131}