timely_communication/
initialize.rs

1//! Initialization logic for a generic instance of the `Allocate` channel allocation trait.
2
3use std::thread;
4#[cfg(feature = "getopts")]
5use std::io::BufRead;
6#[cfg(feature = "getopts")]
7use getopts;
8use std::sync::Arc;
9
10use std::any::Any;
11
12use crate::allocator::thread::ThreadBuilder;
13use crate::allocator::{AllocateBuilder, Process, Generic, GenericBuilder};
14use crate::allocator::zero_copy::allocator_process::ProcessBuilder;
15use crate::allocator::zero_copy::initialize::initialize_networking;
16
17use crate::logging::{CommunicationSetup, CommunicationEvent};
18use logging_core::Logger;
19use std::fmt::{Debug, Formatter};
20
21
/// Possible configurations for the communication infrastructure.
///
/// A `Config` describes how many worker threads and processes participate in a
/// computation. It is turned into allocator builders by `Config::try_build`, or
/// consumed directly by [`initialize`].
pub enum Config {
    /// Use one thread.
    Thread,
    /// Use one process with an indicated number of threads.
    Process(usize),
    /// Use one process with an indicated number of threads. Use zero-copy exchange channels.
    ProcessBinary(usize),
    /// Expect multiple processes.
    Cluster {
        /// Number of per-process worker threads
        threads: usize,
        /// Identity (index) of this process among all participating processes
        process: usize,
        /// Addresses of all processes
        addresses: Vec<String>,
        /// Verbosely report connection progress
        report: bool,
        /// Closure to create a new logger for a communication thread.
        /// It may return `None`, as the default closure built by `from_matches` always does.
        log_fn: Box<dyn Fn(CommunicationSetup) -> Option<Logger<CommunicationEvent, CommunicationSetup>> + Send + Sync>,
    }
}
44
45impl Debug for Config {
46    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
47        match self {
48            Config::Thread => write!(f, "Config::Thread()"),
49            Config::Process(n) => write!(f, "Config::Process({})", n),
50            Config::ProcessBinary(n) => write!(f, "Config::ProcessBinary({})", n),
51            Config::Cluster { threads, process, addresses, report, .. } => f
52                .debug_struct("Config::Cluster")
53                .field("threads", threads)
54                .field("process", process)
55                .field("addresses", addresses)
56                .field("report", report)
57                // TODO: Use `.finish_non_exhaustive()` after rust/#67364 lands
58                .finish()
59        }
60    }
61}
62
63impl Config {
64    /// Installs options into a [`getopts::Options`] struct that corresponds
65    /// to the parameters in the configuration.
66    ///
67    /// It is the caller's responsibility to ensure that the installed options
68    /// do not conflict with any other options that may exist in `opts`, or
69    /// that may be installed into `opts` in the future.
70    ///
71    /// This method is only available if the `getopts` feature is enabled, which
72    /// it is by default.
73    #[cfg(feature = "getopts")]
74    pub fn install_options(opts: &mut getopts::Options) {
75        opts.optopt("w", "threads", "number of per-process worker threads", "NUM");
76        opts.optopt("p", "process", "identity of this process", "IDX");
77        opts.optopt("n", "processes", "number of processes", "NUM");
78        opts.optopt("h", "hostfile", "text file whose lines are process addresses", "FILE");
79        opts.optflag("r", "report", "reports connection progress");
80        opts.optflag("z", "zerocopy", "enable zero-copy for intra-process communication");
81    }
82
83    /// Instantiates a configuration based upon the parsed options in `matches`.
84    ///
85    /// The `matches` object must have been constructed from a
86    /// [`getopts::Options`] which contained at least the options installed by
87    /// [`Self::install_options`].
88    ///
89    /// This method is only available if the `getopts` feature is enabled, which
90    /// it is by default.
91    #[cfg(feature = "getopts")]
92    pub fn from_matches(matches: &getopts::Matches) -> Result<Config, String> {
93        let threads = matches.opt_get_default("w", 1_usize).map_err(|e| e.to_string())?;
94        let process = matches.opt_get_default("p", 0_usize).map_err(|e| e.to_string())?;
95        let processes = matches.opt_get_default("n", 1_usize).map_err(|e| e.to_string())?;
96        let report = matches.opt_present("report");
97        let zerocopy = matches.opt_present("zerocopy");
98
99        if processes > 1 {
100            let mut addresses = Vec::new();
101            if let Some(hosts) = matches.opt_str("h") {
102                let file = ::std::fs::File::open(hosts.clone()).map_err(|e| e.to_string())?;
103                let reader = ::std::io::BufReader::new(file);
104                for line in reader.lines().take(processes) {
105                    addresses.push(line.map_err(|e| e.to_string())?);
106                }
107                if addresses.len() < processes {
108                    return Err(format!("could only read {} addresses from {}, but -n: {}", addresses.len(), hosts, processes));
109                }
110            }
111            else {
112                for index in 0..processes {
113                    addresses.push(format!("localhost:{}", 2101 + index));
114                }
115            }
116
117            assert!(processes == addresses.len());
118            Ok(Config::Cluster {
119                threads,
120                process,
121                addresses,
122                report,
123                log_fn: Box::new( | _ | None),
124            })
125        } else if threads > 1 {
126            if zerocopy {
127                Ok(Config::ProcessBinary(threads))
128            } else {
129                Ok(Config::Process(threads))
130            }
131        } else {
132            Ok(Config::Thread)
133        }
134    }
135
136    /// Constructs a new configuration by parsing the supplied text arguments.
137    ///
138    /// Most commonly, callers supply `std::env::args()` as the iterator.
139    ///
140    /// This method is only available if the `getopts` feature is enabled, which
141    /// it is by default.
142    #[cfg(feature = "getopts")]
143    pub fn from_args<I: Iterator<Item=String>>(args: I) -> Result<Config, String> {
144        let mut opts = getopts::Options::new();
145        Config::install_options(&mut opts);
146        let matches = opts.parse(args).map_err(|e| e.to_string())?;
147        Config::from_matches(&matches)
148    }
149
150    /// Attempts to assemble the described communication infrastructure.
151    pub fn try_build(self) -> Result<(Vec<GenericBuilder>, Box<dyn Any+Send>), String> {
152        match self {
153            Config::Thread => {
154                Ok((vec![GenericBuilder::Thread(ThreadBuilder)], Box::new(())))
155            },
156            Config::Process(threads) => {
157                Ok((Process::new_vector(threads).into_iter().map(|x| GenericBuilder::Process(x)).collect(), Box::new(())))
158            },
159            Config::ProcessBinary(threads) => {
160                Ok((ProcessBuilder::new_vector(threads).into_iter().map(|x| GenericBuilder::ProcessBinary(x)).collect(), Box::new(())))
161            },
162            Config::Cluster { threads, process, addresses, report, log_fn } => {
163                match initialize_networking(addresses, process, threads, report, log_fn) {
164                    Ok((stuff, guard)) => {
165                        Ok((stuff.into_iter().map(|x| GenericBuilder::ZeroCopy(x)).collect(), Box::new(guard)))
166                    },
167                    Err(err) => Err(format!("failed to initialize networking: {}", err))
168                }
169            },
170        }
171    }
172}
173
174/// Initializes communication and executes a distributed computation.
175///
176/// This method allocates an `allocator::Generic` for each thread, spawns local worker threads,
177/// and invokes the supplied function with the allocator.
178/// The method returns a `WorkerGuards<T>` which can be `join`ed to retrieve the return values
179/// (or errors) of the workers.
180///
181///
182/// # Examples
183/// ```
184/// use timely_communication::Allocate;
185///
186/// // configure for two threads, just one process.
187/// let config = timely_communication::Config::Process(2);
188///
189/// // initializes communication, spawns workers
190/// let guards = timely_communication::initialize(config, |mut allocator| {
191///     println!("worker {} started", allocator.index());
192///
193///     // allocates a pair of senders list and one receiver.
194///     let (mut senders, mut receiver) = allocator.allocate(0);
195///
196///     // send typed data along each channel
197///     use timely_communication::Message;
198///     senders[0].send(Message::from_typed(format!("hello, {}", 0)));
199///     senders[1].send(Message::from_typed(format!("hello, {}", 1)));
200///
201///     // no support for termination notification,
202///     // we have to count down ourselves.
203///     let mut expecting = 2;
204///     while expecting > 0 {
205///         allocator.receive();
206///         if let Some(message) = receiver.recv() {
207///             use std::ops::Deref;
208///             println!("worker {}: received: <{}>", allocator.index(), message.deref());
209///             expecting -= 1;
210///         }
211///         allocator.release();
212///     }
213///
214///     // optionally, return something
215///     allocator.index()
216/// });
217///
218/// // computation runs until guards are joined or dropped.
219/// if let Ok(guards) = guards {
220///     for guard in guards.join() {
221///         println!("result: {:?}", guard);
222///     }
223/// }
224/// else { println!("error in computation"); }
225/// ```
226///
227/// The should produce output like:
228///
229/// ```ignore
230/// worker 0 started
231/// worker 1 started
232/// worker 0: received: <hello, 0>
233/// worker 1: received: <hello, 1>
234/// worker 0: received: <hello, 0>
235/// worker 1: received: <hello, 1>
236/// result: Ok(0)
237/// result: Ok(1)
238/// ```
239pub fn initialize<T:Send+'static, F: Fn(Generic)->T+Send+Sync+'static>(
240    config: Config,
241    func: F,
242) -> Result<WorkerGuards<T>,String> {
243    let (allocators, others) = config.try_build()?;
244    initialize_from(allocators, others, func)
245}
246
247/// Initializes computation and runs a distributed computation.
248///
249/// This version of `initialize` allows you to explicitly specify the allocators that
250/// you want to use, by providing an explicit list of allocator builders. Additionally,
251/// you provide `others`, a `Box<Any>` which will be held by the resulting worker guard
252/// and dropped when it is dropped, which allows you to join communication threads.
253///
254/// # Examples
255/// ```
256/// use timely_communication::Allocate;
257///
258/// // configure for two threads, just one process.
259/// let builders = timely_communication::allocator::process::Process::new_vector(2);
260///
261/// // initializes communication, spawns workers
262/// let guards = timely_communication::initialize_from(builders, Box::new(()), |mut allocator| {
263///     println!("worker {} started", allocator.index());
264///
265///     // allocates a pair of senders list and one receiver.
266///     let (mut senders, mut receiver) = allocator.allocate(0);
267///
268///     // send typed data along each channel
269///     use timely_communication::Message;
270///     senders[0].send(Message::from_typed(format!("hello, {}", 0)));
271///     senders[1].send(Message::from_typed(format!("hello, {}", 1)));
272///
273///     // no support for termination notification,
274///     // we have to count down ourselves.
275///     let mut expecting = 2;
276///     while expecting > 0 {
277///         allocator.receive();
278///         if let Some(message) = receiver.recv() {
279///             use std::ops::Deref;
280///             println!("worker {}: received: <{}>", allocator.index(), message.deref());
281///             expecting -= 1;
282///         }
283///         allocator.release();
284///     }
285///
286///     // optionally, return something
287///     allocator.index()
288/// });
289///
290/// // computation runs until guards are joined or dropped.
291/// if let Ok(guards) = guards {
292///     for guard in guards.join() {
293///         println!("result: {:?}", guard);
294///     }
295/// }
296/// else { println!("error in computation"); }
297/// ```
298pub fn initialize_from<A, T, F>(
299    builders: Vec<A>,
300    others: Box<dyn Any+Send>,
301    func: F,
302) -> Result<WorkerGuards<T>,String>
303where
304    A: AllocateBuilder+'static,
305    T: Send+'static,
306    F: Fn(<A as AllocateBuilder>::Allocator)->T+Send+Sync+'static
307{
308    let logic = Arc::new(func);
309    let mut guards = Vec::new();
310    for (index, builder) in builders.into_iter().enumerate() {
311        let clone = logic.clone();
312        guards.push(thread::Builder::new()
313                            .name(format!("timely:work-{}", index))
314                            .spawn(move || {
315                                let communicator = builder.build();
316                                (*clone)(communicator)
317                            })
318                            .map_err(|e| format!("{:?}", e))?);
319    }
320
321    Ok(WorkerGuards { guards, others })
322}
323
/// Maintains `JoinHandle`s for worker threads.
///
/// Dropping a `WorkerGuards` joins any worker threads it still holds; the `others`
/// value is dropped afterwards, along with the struct's fields.
// The `T: Send + 'static` bound must stay on the struct: Rust requires the `Drop`
// impl for this type to have exactly the same bounds as the type definition.
pub struct WorkerGuards<T:Send+'static> {
    // Join handles of the spawned worker threads, in worker index order.
    guards: Vec<::std::thread::JoinHandle<T>>,
    // Auxiliary resources (e.g. communication threads) kept alive alongside the workers.
    others: Box<dyn Any+Send>,
}
329
330impl<T:Send+'static> WorkerGuards<T> {
331
332    /// Returns a reference to the indexed guard.
333    pub fn guards(&self) -> &[std::thread::JoinHandle<T>] {
334        &self.guards[..]
335    }
336
337    /// Provides access to handles that are not worker threads.
338    pub fn others(&self) -> &Box<dyn Any+Send> {
339        &self.others
340    }
341
342    /// Waits on the worker threads and returns the results they produce.
343    pub fn join(mut self) -> Vec<Result<T, String>> {
344        self.guards
345            .drain(..)
346            .map(|guard| guard.join().map_err(|e| format!("{:?}", e)))
347            .collect()
348    }
349}
350
351impl<T:Send+'static> Drop for WorkerGuards<T> {
352    fn drop(&mut self) {
353        for guard in self.guards.drain(..) {
354            guard.join().expect("Worker panic");
355        }
356        // println!("WORKER THREADS JOINED");
357    }
358}