timely/dataflow/operators/capture/
extract.rs

1//! Traits and types for extracting captured timely dataflow streams.
2
3use super::EventCore;
4use crate::Container;
5use crate::Data;
6
/// Pulls captured data out as a list of `(timestamp, records)` pairs.
pub trait Extract<T, D>
where
    T: Ord,
    D: Ord,
{
    /// Consumes `self` and returns the captured records together with their timestamps.
    ///
    /// The only implementation in this module is for
    /// `std::sync::mpsc::Receiver<EventCore<T, Vec<D>>>`. It exists to make it easy to
    /// inspect the output of a timely dataflow computation once that computation has
    /// completed.
    ///
    /// # Examples
    ///
    /// ```rust
    /// use std::rc::Rc;
    /// use std::sync::{Arc, Mutex};
    /// use timely::dataflow::Scope;
    /// use timely::dataflow::operators::{Capture, ToStream, Inspect};
    /// use timely::dataflow::operators::capture::{EventLinkCore, Replay, Extract};
    ///
    /// // get send and recv endpoints, wrap send to share
    /// let (send, recv) = ::std::sync::mpsc::channel();
    /// let send = Arc::new(Mutex::new(send));
    ///
    /// timely::execute(timely::Config::thread(), move |worker| {
    ///
    ///     // this is only to validate the output.
    ///     let send = send.lock().unwrap().clone();
    ///
    ///     // these are to capture/replay the stream.
    ///     let handle1 = Rc::new(EventLinkCore::new());
    ///     let handle2 = Some(handle1.clone());
    ///
    ///     worker.dataflow::<u64,_,_>(|scope1|
    ///         (0..10).to_stream(scope1)
    ///                .capture_into(handle1)
    ///     );
    ///
    ///     worker.dataflow(|scope2| {
    ///         handle2.replay_into(scope2)
    ///                .capture_into(send)
    ///     });
    /// }).unwrap();
    ///
    /// assert_eq!(recv.extract()[0].1, (0..10).collect::<Vec<_>>());
    /// ```
    fn extract(self) -> Vec<(T, Vec<D>)>;
}
51
52impl<T: Ord, D: Ord+Data> Extract<T,D> for ::std::sync::mpsc::Receiver<EventCore<T, Vec<D>>> {
53    fn extract(self) -> Vec<(T, Vec<D>)> {
54        let mut result = self.extract_core();
55
56        let mut current = 0;
57        for i in 1 .. result.len() {
58            if result[current].0 == result[i].0 {
59                let dataz = ::std::mem::replace(&mut result[i].1, Vec::new());
60                result[current].1.extend(dataz);
61            }
62            else {
63                current = i;
64            }
65        }
66
67        for &mut (_, ref mut data) in &mut result {
68            data.sort();
69        }
70        result.retain(|x| !x.1.is_empty());
71        result
72    }
73}
74
/// Pulls captured data out as a list of `(timestamp, container)` pairs.
pub trait ExtractCore<T, C> {
    /// Consumes `self` and returns the captured containers together with their timestamps.
    ///
    /// The only implementation in this module is for
    /// `std::sync::mpsc::Receiver<EventCore<T, C>>` with `C: Container`. It exists to make
    /// it easy to inspect the output of a timely dataflow computation once that
    /// computation has completed.
    ///
    /// # Examples
    ///
    /// ```rust
    /// use std::rc::Rc;
    /// use std::sync::{Arc, Mutex};
    /// use timely::dataflow::Scope;
    /// use timely::dataflow::operators::{Capture, ToStream, Inspect};
    /// use timely::dataflow::operators::capture::{EventLinkCore, Replay, ExtractCore};
    ///
    /// // get send and recv endpoints, wrap send to share
    /// let (send, recv) = ::std::sync::mpsc::channel();
    /// let send = Arc::new(Mutex::new(send));
    ///
    /// timely::execute(timely::Config::thread(), move |worker| {
    ///
    ///     // this is only to validate the output.
    ///     let send = send.lock().unwrap().clone();
    ///
    ///     // these are to capture/replay the stream.
    ///     let handle1 = Rc::new(EventLinkCore::new());
    ///     let handle2 = Some(handle1.clone());
    ///
    ///     worker.dataflow::<u64,_,_>(|scope1|
    ///         (0..10).to_stream(scope1)
    ///                .capture_into(handle1)
    ///     );
    ///
    ///     worker.dataflow(|scope2| {
    ///         handle2.replay_into(scope2)
    ///                .capture_into(send)
    ///     });
    /// }).unwrap();
    ///
    /// assert_eq!(recv.extract_core().into_iter().flat_map(|x| x.1).collect::<Vec<_>>(), (0..10).collect::<Vec<_>>());
    /// ```
    fn extract_core(self) -> Vec<(T, C)>;
}
119
120impl<T, C: Container> ExtractCore<T, C> for ::std::sync::mpsc::Receiver<EventCore<T, C>> {
121    fn extract_core(self) -> Vec<(T, C)> {
122        let mut result = Vec::new();
123        for event in self {
124            if let EventCore::Messages(time, data) = event {
125                result.push((time, data));
126            }
127        }
128        result.retain(|x| !x.1.is_empty());
129        result
130    }
131}