timely/dataflow/operators/capture/
capture.rs

//! Traits and types for capturing timely dataflow streams.
//!
//! All timely dataflow streams can be captured, but there are many ways to capture
//! these streams. A stream may be captured into any type implementing `EventPusher`
//! by calling `capture_into`, and there are several default implementations, including
//! a linked list, Rust's MPSC queue, and a binary serializer wrapping any `W: Write`.
7
8use crate::dataflow::{Scope, StreamCore};
9use crate::dataflow::channels::pact::Pipeline;
10use crate::dataflow::channels::pullers::Counter as PullCounter;
11use crate::dataflow::operators::generic::builder_raw::OperatorBuilder;
12
13use crate::Container;
14use crate::progress::ChangeBatch;
15use crate::progress::Timestamp;
16
17use super::{EventCore, EventPusherCore};
18
19/// Capture a stream of timestamped data for later replay.
20pub trait Capture<T: Timestamp, D: Container> {
21    /// Captures a stream of timestamped data for later replay.
22    ///
23    /// # Examples
24    ///
25    /// The type `Rc<EventLink<T,D>>` implements a typed linked list,
26    /// and can be captured into and replayed from.
27    ///
28    /// ```rust
29    /// use std::rc::Rc;
30    /// use std::sync::{Arc, Mutex};
31    /// use timely::dataflow::Scope;
32    /// use timely::dataflow::operators::{Capture, ToStream, Inspect};
33    /// use timely::dataflow::operators::capture::{EventLinkCore, Replay, Extract};
34    ///
35    /// // get send and recv endpoints, wrap send to share
36    /// let (send, recv) = ::std::sync::mpsc::channel();
37    /// let send = Arc::new(Mutex::new(send));
38    ///
39    /// timely::execute(timely::Config::thread(), move |worker| {
40    ///
41    ///     // this is only to validate the output.
42    ///     let send = send.lock().unwrap().clone();
43    ///
44    ///     // these are to capture/replay the stream.
45    ///     let handle1 = Rc::new(EventLinkCore::new());
46    ///     let handle2 = Some(handle1.clone());
47    ///
48    ///     worker.dataflow::<u64,_,_>(|scope1|
49    ///         (0..10).to_stream(scope1)
50    ///                .capture_into(handle1)
51    ///     );
52    ///
53    ///     worker.dataflow(|scope2| {
54    ///         handle2.replay_into(scope2)
55    ///                .capture_into(send)
56    ///     });
57    /// }).unwrap();
58    ///
59    /// assert_eq!(recv.extract()[0].1, (0..10).collect::<Vec<_>>());
60    /// ```
61    ///
62    /// The types `EventWriter<T, D, W>` and `EventReader<T, D, R>` can be
63    /// captured into and replayed from, respectively. They use binary writers
64    /// and readers respectively, and can be backed by files, network sockets,
65    /// etc.
66    ///
67    /// ```
68    /// use std::rc::Rc;
69    /// use std::net::{TcpListener, TcpStream};
70    /// use std::sync::{Arc, Mutex};
71    /// use timely::dataflow::Scope;
72    /// use timely::dataflow::operators::{Capture, ToStream, Inspect};
73    /// use timely::dataflow::operators::capture::{EventReader, EventWriter, Replay, Extract};
74    ///
75    /// // get send and recv endpoints, wrap send to share
76    /// let (send0, recv0) = ::std::sync::mpsc::channel();
77    /// let send0 = Arc::new(Mutex::new(send0));
78    ///
79    /// timely::execute(timely::Config::thread(), move |worker| {
80    ///
81    ///     // this is only to validate the output.
82    ///     let send0 = send0.lock().unwrap().clone();
83    ///
84    ///     // these allow us to capture / replay a timely stream.
85    ///     let list = TcpListener::bind("127.0.0.1:8000").unwrap();
86    ///     let send = TcpStream::connect("127.0.0.1:8000").unwrap();
87    ///     let recv = list.incoming().next().unwrap().unwrap();
88    ///
89    ///     recv.set_nonblocking(true).unwrap();
90    ///
91    ///     worker.dataflow::<u64,_,_>(|scope1|
92    ///         (0..10u64)
93    ///             .to_stream(scope1)
94    ///             .capture_into(EventWriter::new(send))
95    ///     );
96    ///
97    ///     worker.dataflow::<u64,_,_>(|scope2| {
98    ///         Some(EventReader::<_,u64,_>::new(recv))
99    ///             .replay_into(scope2)
100    ///             .capture_into(send0)
101    ///     });
102    /// }).unwrap();
103    ///
104    /// assert_eq!(recv0.extract()[0].1, (0..10).collect::<Vec<_>>());
105    /// ```
106    fn capture_into<P: EventPusherCore<T, D>+'static>(&self, pusher: P);
107
108    /// Captures a stream using Rust's MPSC channels.
109    fn capture(&self) -> ::std::sync::mpsc::Receiver<EventCore<T, D>> {
110        let (send, recv) = ::std::sync::mpsc::channel();
111        self.capture_into(send);
112        recv
113    }
114}
115
116impl<S: Scope, D: Container> Capture<S::Timestamp, D> for StreamCore<S, D> {
117    fn capture_into<P: EventPusherCore<S::Timestamp, D>+'static>(&self, mut event_pusher: P) {
118
119        let mut builder = OperatorBuilder::new("Capture".to_owned(), self.scope());
120        let mut input = PullCounter::new(builder.new_input(self, Pipeline));
121        let mut started = false;
122
123        builder.build(
124            move |progress| {
125
126                if !started {
127                    // discard initial capability.
128                    progress.frontiers[0].update(S::Timestamp::minimum(), -1);
129                    started = true;
130                }
131                if !progress.frontiers[0].is_empty() {
132                    // transmit any frontier progress.
133                    let to_send = ::std::mem::replace(&mut progress.frontiers[0], ChangeBatch::new());
134                    event_pusher.push(EventCore::Progress(to_send.into_inner()));
135                }
136
137                use crate::communication::message::RefOrMut;
138
139                // turn each received message into an event.
140                while let Some(message) = input.next() {
141                    let (time, data) = match message.as_ref_or_mut() {
142                        RefOrMut::Ref(reference) => (&reference.time, RefOrMut::Ref(&reference.data)),
143                        RefOrMut::Mut(reference) => (&reference.time, RefOrMut::Mut(&mut reference.data)),
144                    };
145                    let vector = data.replace(Default::default());
146                    event_pusher.push(EventCore::Messages(time.clone(), vector));
147                }
148                input.consumed().borrow_mut().drain_into(&mut progress.consumeds[0]);
149                false
150            }
151        );
152    }
153}