1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
//! Operators to capture and replay timely dataflow streams.
//!
//! The `capture_into` and `replay_into` operators respectively capture what a unary operator
//! sees as input (both data and progress information), and play this information back as a new
//! input.
//!
//! The `capture_into` method requires a `P: EventPusher<T, D>`, which is some type accepting
//! `Event<T, D>` inputs. This module provides several examples, including the linked list
//! `EventLink<T, D>`, and the binary `EventWriter<T, D, W>` wrapping any `W: Write`.
//!
//! Streams are captured at the worker granularity, and one can replay an arbitrary subset of
//! the captured streams on any number of workers (fewer, more, or as many as were captured).
//! There is a protocol the captured stream uses, and implementors of new event streams should
//! make sure to understand this (and complain if it is not clear).
//!
//! # Examples
//!
//! The type `Rc<EventLink<T,D>>` implements a typed linked list,
//! and can be captured into and replayed from.
//!
//! ```rust
//! use std::rc::Rc;
//! use timely::dataflow::Scope;
//! use timely::dataflow::operators::{Capture, ToStream, Inspect};
//! use timely::dataflow::operators::capture::{EventLink, Replay};
//!
//! # #[cfg(miri)] fn main() {}
//! # #[cfg(not(miri))]
//! # fn main() {
//! timely::execute(timely::Config::thread(), |worker| {
//! let handle1 = Rc::new(EventLink::new());
//! let handle2 = Some(handle1.clone());
//!
//! worker.dataflow::<u64,_,_>(|scope1|
//! (0..10).to_stream(scope1)
//! .container::<Vec<_>>()
//! .capture_into(handle1)
//! );
//!
//! worker.dataflow(|scope2| {
//! handle2.replay_into(scope2)
//! .inspect(|x| println!("replayed: {:?}", x));
//! })
//! }).unwrap();
//! # }
//! ```
//!
//! The types `EventWriter<T, D, W>` and `EventReader<T, D, R>` can be
//! captured into and replayed from, respectively. They use binary writers
//! and readers respectively, and can be backed by files, network sockets,
//! etc.
//!
//! ```
//! use std::rc::Rc;
//! use std::net::{TcpListener, TcpStream};
//! use timely::dataflow::Scope;
//! use timely::dataflow::operators::{Capture, ToStream, Inspect};
//! use timely::dataflow::operators::capture::{EventReader, EventWriter, Replay};
//!
//! # #[cfg(miri)] fn main() {}
//! # #[cfg(not(miri))]
//! # fn main() {
//! timely::execute(timely::Config::thread(), |worker| {
//! let list = TcpListener::bind("127.0.0.1:8000").unwrap();
//! let send = TcpStream::connect("127.0.0.1:8000").unwrap();
//! let recv = list.incoming().next().unwrap().unwrap();
//!
//! recv.set_nonblocking(true).unwrap();
//!
//! worker.dataflow::<u64,_,_>(|scope1|
//! (0..10u64)
//! .to_stream(scope1)
//! .container::<Vec<_>>()
//! .capture_into(EventWriter::new(send))
//! );
//!
//! worker.dataflow::<u64,_,_>(|scope2| {
//! Some(EventReader::<_,Vec<u64>,_>::new(recv))
//! .replay_into(scope2)
//! .inspect(|x| println!("replayed: {:?}", x));
//! })
//! }).unwrap();
//! # }
//! ```
pub use Capture;
pub use Replay;
pub use Extract;
pub use ;
pub use EventLink;
pub use EventReader;
pub use EventWriter;