timely/
execute.rs

//! Starts a timely dataflow execution from configuration information and per-worker logic.
2
3use crate::communication::{initialize_from, Allocator, allocator::AllocateBuilder, WorkerGuards};
4use crate::dataflow::scopes::Child;
5use crate::worker::Worker;
6use crate::{CommunicationConfig, WorkerConfig};
7
8/// Configures the execution of a timely dataflow computation.
9pub struct Config {
10    /// Configuration for the communication infrastructure.
11    pub communication: CommunicationConfig,
12    /// Configuration for the worker threads.
13    pub worker: WorkerConfig,
14}
15
16impl Config {
17    /// Installs options into a [getopts_dep::Options] struct that correspond
18    /// to the parameters in the configuration.
19    ///
20    /// It is the caller's responsibility to ensure that the installed options
21    /// do not conflict with any other options that may exist in `opts`, or
22    /// that may be installed into `opts` in the future.
23    ///
24    /// This method is only available if the `getopts` feature is enabled, which
25    /// it is by default.
26    #[cfg(feature = "getopts")]
27    pub fn install_options(opts: &mut getopts_dep::Options) {
28        CommunicationConfig::install_options(opts);
29        WorkerConfig::install_options(opts);
30    }
31
32    /// Instantiates a configuration based upon the parsed options in `matches`.
33    ///
34    /// The `matches` object must have been constructed from a
35    /// [getopts_dep::Options] which contained at least the options installed by
36    /// [Self::install_options].
37    ///
38    /// This method is only available if the `getopts` feature is enabled, which
39    /// it is by default.
40    #[cfg(feature = "getopts")]
41    pub fn from_matches(matches: &getopts_dep::Matches) -> Result<Config, String> {
42        Ok(Config {
43            communication: CommunicationConfig::from_matches(matches)?,
44            worker: WorkerConfig::from_matches(matches)?,
45        })
46    }
47
48    /// Constructs a new configuration by parsing the supplied text arguments.
49    ///
50    /// Most commonly, callers supply `std::env::args()` as the iterator.
51    #[cfg(feature = "getopts")]
52    pub fn from_args<I: Iterator<Item=String>>(args: I) -> Result<Config, String> {
53        let mut opts = getopts_dep::Options::new();
54        Config::install_options(&mut opts);
55        let matches = opts.parse(args).map_err(|e| e.to_string())?;
56        Config::from_matches(&matches)
57    }
58
59    /// Constructs a `Config` that uses one worker thread and the
60    /// defaults for all other parameters.
61    pub fn thread() -> Config {
62        Config {
63            communication: CommunicationConfig::Thread,
64            worker: WorkerConfig::default(),
65        }
66    }
67
68    /// Constructs an `Config` that uses `n` worker threads and the
69    /// defaults for all other parameters.
70    pub fn process(n: usize) -> Config {
71        Config {
72            communication: CommunicationConfig::Process(n),
73            worker: WorkerConfig::default(),
74        }
75    }
76}
77
78/// Executes a single-threaded timely dataflow computation.
79///
80/// The `example` method takes a closure on a `Scope` which it executes to initialize and run a
81/// timely dataflow computation on a single thread. This method is intended for use in examples,
82/// rather than programs that may need to run across multiple workers.
83///
84/// The `example` method returns whatever the single worker returns from its closure.
85/// This is often nothing, but the worker can return something about the data it saw in order to
86/// test computations.
87///
88/// The method aggressively unwraps returned `Result<_>` types.
89///
90/// # Examples
91///
92/// The simplest example creates a stream of data and inspects it.
93///
94/// ```rust
95/// use timely::dataflow::operators::{ToStream, Inspect};
96///
97/// timely::example(|scope| {
98///     (0..10).to_stream(scope)
99///            .inspect(|x| println!("seen: {:?}", x));
100/// });
101/// ```
102///
103/// This next example captures the data and displays them once the computation is complete.
104///
105/// More precisely, the example captures a stream of events (receiving batches of data,
106/// updates to input capabilities) and displays these events.
107///
108/// ```rust
109/// use timely::dataflow::operators::{ToStream, Inspect, Capture};
110/// use timely::dataflow::operators::capture::Extract;
111///
112/// let data = timely::example(|scope| {
113///     (0..10).to_stream(scope)
114///            .inspect(|x| println!("seen: {:?}", x))
115///            .capture()
116/// });
117///
118/// // the extracted data should have data (0..10) at timestamp 0.
119/// assert_eq!(data.extract()[0].1, (0..10).collect::<Vec<_>>());
120/// ```
121pub fn example<T, F>(func: F) -> T
122where
123    T: Send+'static,
124    F: FnOnce(&mut Child<Worker<crate::communication::allocator::thread::Thread>,u64>)->T+Send+Sync+'static
125{
126    crate::execute::execute_directly(|worker| worker.dataflow(|scope| func(scope)))
127}
128
129
130/// Executes a single-threaded timely dataflow computation.
131///
132/// The `execute_directly` constructs a `Worker` and directly executes the supplied
133/// closure to construct and run a timely dataflow computation. It does not create any
134/// worker threads, and simply uses the current thread of control.
135///
136/// The closure may return a result, which will be returned from the computation.
137///
138/// # Examples
139/// ```rust
140/// use timely::dataflow::operators::{ToStream, Inspect};
141///
142/// // execute a timely dataflow using three worker threads.
143/// timely::execute_directly(|worker| {
144///     worker.dataflow::<(),_,_>(|scope| {
145///         (0..10).to_stream(scope)
146///                .inspect(|x| println!("seen: {:?}", x));
147///     })
148/// });
149/// ```
150pub fn execute_directly<T, F>(func: F) -> T
151where
152    T: Send+'static,
153    F: FnOnce(&mut Worker<crate::communication::allocator::thread::Thread>)->T+Send+Sync+'static
154{
155    let alloc = crate::communication::allocator::thread::Thread::new();
156    let mut worker = crate::worker::Worker::new(WorkerConfig::default(), alloc);
157    let result = func(&mut worker);
158    while worker.has_dataflows() {
159        worker.step_or_park(None);
160    }
161    result
162}
163
164/// Executes a timely dataflow from a configuration and per-communicator logic.
165///
166/// The `execute` method takes a `Configuration` and spins up some number of
167/// workers threads, each of which execute the supplied closure to construct
168/// and run a timely dataflow computation.
169///
170/// The closure may return a `T: Send+'static`.  The `execute` method returns
171/// immediately after initializing the timely computation with a result
172/// containing a `WorkerGuards<T>` (or error information), which can be joined
173/// to recover the result `T` values from the local workers.
174///
175/// *Note*: if the caller drops the result of `execute`, the drop code will
176/// block awaiting the completion of the timely computation. If the result
177/// of the method is not captured it will be dropped, which gives the experience
178/// of `execute` blocking; to regain control after `execute` be sure to
179/// capture the results and drop them only when the calling thread has no
180/// other work to perform.
181///
182/// # Examples
183/// ```rust
184/// use timely::dataflow::operators::{ToStream, Inspect};
185///
186/// // execute a timely dataflow using three worker threads.
187/// timely::execute(timely::Config::process(3), |worker| {
188///     worker.dataflow::<(),_,_>(|scope| {
189///         (0..10).to_stream(scope)
190///                .inspect(|x| println!("seen: {:?}", x));
191///     })
192/// }).unwrap();
193/// ```
194///
195/// The following example demonstrates how one can extract data from a multi-worker execution.
196/// In a multi-process setting, each process will only receive those records present at workers
197/// in the process.
198///
199/// ```rust
200/// use std::sync::{Arc, Mutex};
201/// use timely::dataflow::operators::{ToStream, Inspect, Capture};
202/// use timely::dataflow::operators::capture::Extract;
203///
204/// // get send and recv endpoints, wrap send to share
205/// let (send, recv) = ::std::sync::mpsc::channel();
206/// let send = Arc::new(Mutex::new(send));
207///
208/// // execute a timely dataflow using three worker threads.
209/// timely::execute(timely::Config::process(3), move |worker| {
210///     let send = send.lock().unwrap().clone();
211///     worker.dataflow::<(),_,_>(move |scope| {
212///         (0..10).to_stream(scope)
213///                .inspect(|x| println!("seen: {:?}", x))
214///                .capture_into(send);
215///     });
216/// }).unwrap();
217///
218/// // the extracted data should have data (0..10) thrice at timestamp 0.
219/// assert_eq!(recv.extract()[0].1, (0..30).map(|x| x / 3).collect::<Vec<_>>());
220/// ```
221pub fn execute<T, F>(
222    mut config: Config,
223    func: F
224) -> Result<WorkerGuards<T>,String>
225where
226    T:Send+'static,
227    F: Fn(&mut Worker<Allocator>)->T+Send+Sync+'static {
228
229    if let CommunicationConfig::Cluster { ref mut log_fn, .. } = config.communication {
230
231        *log_fn = Box::new(|events_setup| {
232
233            let mut result = None;
234            if let Ok(addr) = ::std::env::var("TIMELY_COMM_LOG_ADDR") {
235
236                use ::std::net::TcpStream;
237                use crate::logging::BatchLogger;
238                use crate::dataflow::operators::capture::EventWriterCore;
239
240                eprintln!("enabled COMM logging to {}", addr);
241
242                if let Ok(stream) = TcpStream::connect(&addr) {
243                    let writer = EventWriterCore::new(stream);
244                    let mut logger = BatchLogger::new(writer);
245                    result = Some(crate::logging_core::Logger::new(
246                        ::std::time::Instant::now(),
247                        ::std::time::Duration::default(),
248                        events_setup,
249                        move |time, data| logger.publish_batch(time, data)
250                    ));
251                }
252                else {
253                    panic!("Could not connect to communication log address: {:?}", addr);
254                }
255            }
256            result
257        });
258    }
259
260    let (allocators, other) = config.communication.try_build()?;
261
262    let worker_config = config.worker;
263    initialize_from(allocators, other, move |allocator| {
264
265        let mut worker = Worker::new(worker_config.clone(), allocator);
266
267        // If an environment variable is set, use it as the default timely logging.
268        if let Ok(addr) = ::std::env::var("TIMELY_WORKER_LOG_ADDR") {
269
270            use ::std::net::TcpStream;
271            use crate::logging::{BatchLogger, TimelyEvent};
272            use crate::dataflow::operators::capture::EventWriterCore;
273
274            if let Ok(stream) = TcpStream::connect(&addr) {
275                let writer = EventWriterCore::new(stream);
276                let mut logger = BatchLogger::new(writer);
277                worker.log_register()
278                    .insert::<TimelyEvent,_>("timely", move |time, data|
279                        logger.publish_batch(time, data)
280                    );
281            }
282            else {
283                panic!("Could not connect logging stream to: {:?}", addr);
284            }
285        }
286
287        let result = func(&mut worker);
288        while worker.has_dataflows() {
289            worker.step_or_park(None);
290        }
291        result
292    })
293}
294
/// Executes a timely dataflow from supplied arguments and per-communicator logic.
///
/// The `execute_from_args` method takes arguments (typically `std::env::args()`) and spins up
/// some number of worker threads, each of which execute the supplied closure to construct and
/// run a timely dataflow computation. Any collection of `String`s is accepted as arguments.
///
/// The closure may return a `T: Send+'static`.  The `execute_from_args` method
/// returns immediately after initializing the timely computation with a result
/// containing a `WorkerGuards<T>` (or error information), which can be joined
/// to recover the result `T` values from the local workers.
///
/// *Note*: if the caller drops the result of `execute_from_args`, the drop code
/// will block awaiting the completion of the timely computation.
///
/// The arguments `execute_from_args` currently understands are:
///
/// `-w, --workers`: number of per-process worker threads.
///
/// `-n, --processes`: number of processes involved in the computation.
///
/// `-p, --process`: identity of this process; from 0 to n-1.
///
/// `-h, --hostfile`: a text file whose lines are "hostname:port" in order of process identity.
/// If not specified, `localhost` will be used, with port numbers increasing from 2101 (chosen
/// arbitrarily).
///
/// # Errors
///
/// Returns an error message if the arguments cannot be parsed into a [`Config`], or if
/// [`execute`](execute()) fails to start the computation.
///
/// This method is only available if the `getopts` feature is enabled, which
/// it is by default.
///
/// # Examples
///
/// ```rust
/// use timely::dataflow::operators::{ToStream, Inspect};
///
/// // execute a timely dataflow using command line parameters
/// timely::execute_from_args(std::env::args(), |worker| {
///     worker.dataflow::<(),_,_>(|scope| {
///         (0..10).to_stream(scope)
///                .inspect(|x| println!("seen: {:?}", x));
///     })
/// }).unwrap();
/// ```
/// ```ignore
/// host0% cargo run -- -w 2 -n 4 -h hosts.txt -p 0
/// host1% cargo run -- -w 2 -n 4 -h hosts.txt -p 1
/// host2% cargo run -- -w 2 -n 4 -h hosts.txt -p 2
/// host3% cargo run -- -w 2 -n 4 -h hosts.txt -p 3
/// ```
/// ```ignore
/// % cat hosts.txt
/// host0:port
/// host1:port
/// host2:port
/// host3:port
/// ```
#[cfg(feature = "getopts")]
pub fn execute_from_args<I, T, F>(iter: I, func: F) -> Result<WorkerGuards<T>,String>
    where I: IntoIterator<Item=String>,
          T:Send+'static,
          F: Fn(&mut Worker<Allocator>)->T+Send+Sync+'static, {
    let config = Config::from_args(iter.into_iter())?;
    execute(config, func)
}
358
359/// Executes a timely dataflow from supplied allocators and logging.
360///
361/// Refer to [`execute`](execute()) for more details.
362///
363/// ```rust
364/// use timely::dataflow::operators::{ToStream, Inspect};
365/// use timely::WorkerConfig;
366///
367/// // execute a timely dataflow using command line parameters
368/// let (builders, other) = timely::CommunicationConfig::Process(3).try_build().unwrap();
369/// timely::execute::execute_from(builders, other, WorkerConfig::default(), |worker| {
370///     worker.dataflow::<(),_,_>(|scope| {
371///         (0..10).to_stream(scope)
372///                .inspect(|x| println!("seen: {:?}", x));
373///     })
374/// }).unwrap();
375/// ```
376pub fn execute_from<A, T, F>(
377    builders: Vec<A>,
378    others: Box<dyn ::std::any::Any+Send>,
379    worker_config: WorkerConfig,
380    func: F,
381) -> Result<WorkerGuards<T>, String>
382where
383    A: AllocateBuilder+'static,
384    T: Send+'static,
385    F: Fn(&mut Worker<<A as AllocateBuilder>::Allocator>)->T+Send+Sync+'static {
386    initialize_from(builders, others, move |allocator| {
387        let mut worker = Worker::new(worker_config.clone(), allocator);
388        let result = func(&mut worker);
389        while worker.has_dataflows() {
390            worker.step_or_park(None);
391        }
392        result
393    })
394}