use std::thread;
#[cfg(feature = "getopts")]
use std::io::BufRead;
use std::sync::Arc;
use std::fmt::{Debug, Formatter};
use std::any::Any;
use std::ops::DerefMut;
#[cfg(feature = "getopts")]
use getopts;
use timely_logging::Logger;
use crate::allocator::thread::ThreadBuilder;
use crate::allocator::{AllocateBuilder, Allocator, AllocatorBuilder, ProcessBuilder};
use crate::allocator::zero_copy::bytes_slab::BytesRefill;
use crate::allocator::zero_copy::initialize::initialize_networking;
use crate::logging::{CommunicationEventBuilder, CommunicationSetup};
/// Possible configurations for the communication infrastructure.
///
/// A `Config` is turned into allocator builders by `Config::try_build` /
/// `Config::try_build_with`.
#[derive(Clone)]
pub enum Config {
/// Use a single worker thread (built as one `AllocatorBuilder::Thread`).
Thread,
/// Use one process with the indicated number of worker threads, built with
/// `ProcessBuilder::new_typed_vector`.
Process(usize),
/// Use one process with the indicated number of worker threads, built with
/// `ProcessBuilder::new_bytes_vector`. `from_matches` selects this variant when
/// the `zerocopy` flag is set.
ProcessBinary(usize),
/// Use multiple processes connected through `initialize_networking`.
Cluster {
/// Number of per-process worker threads.
threads: usize,
/// Identity of this process.
process: usize,
/// Addresses of the participating processes, one entry per process.
addresses: Vec<String>,
/// Whether connection progress should be reported.
report: bool,
/// Enable zero-copy for intra-process communication: when `true` the
/// intra-process builders come from `ProcessBuilder::new_bytes_vector`,
/// otherwise from `ProcessBuilder::new_typed_vector`.
zerocopy: bool,
/// Callback handed to `initialize_networking`; given a `CommunicationSetup`
/// it may return a logger for communication events, or `None` for no logging.
log_fn: Arc<dyn Fn(CommunicationSetup) -> Option<Logger<CommunicationEventBuilder>> + Send + Sync>,
}
}
impl Debug for Config {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
Config::Thread => write!(f, "Config::Thread()"),
Config::Process(n) => write!(f, "Config::Process({})", n),
Config::ProcessBinary(n) => write!(f, "Config::ProcessBinary({})", n),
Config::Cluster { threads, process, addresses, report, zerocopy, log_fn: _ } => f
.debug_struct("Config::Cluster")
.field("threads", threads)
.field("process", process)
.field("addresses", addresses)
.field("report", report)
.field("zerocopy", zerocopy)
.finish_non_exhaustive()
}
}
}
impl Config {
    /// Installs the command-line options understood by [`Config::from_matches`].
    ///
    /// Registered options:
    /// * `-w, --threads NUM`: number of per-process worker threads
    /// * `-p, --process IDX`: identity of this process
    /// * `-n, --processes NUM`: number of processes
    /// * `-h, --hostfile FILE`: text file whose lines are process addresses
    /// * `-r, --report`: reports connection progress
    /// * `-z, --zerocopy`: enable zero-copy for intra-process communication
    #[cfg(feature = "getopts")]
    pub fn install_options(opts: &mut getopts::Options) {
        opts.optopt("w", "threads", "number of per-process worker threads", "NUM");
        opts.optopt("p", "process", "identity of this process", "IDX");
        opts.optopt("n", "processes", "number of processes", "NUM");
        opts.optopt("h", "hostfile", "text file whose lines are process addresses", "FILE");
        opts.optflag("r", "report", "reports connection progress");
        opts.optflag("z", "zerocopy", "enable zero-copy for intra-process communication");
    }

    /// Builds a configuration from parsed command-line matches.
    ///
    /// The matches are expected to come from options registered by
    /// [`Config::install_options`]. Defaults are one thread, process index `0`
    /// and one process.
    ///
    /// * More than one process yields [`Config::Cluster`]. Addresses are the first
    ///   `processes` lines of the hostfile if `-h` was given, and otherwise
    ///   `localhost:2101`, `localhost:2102`, ... one per process.
    /// * Otherwise more than one thread yields [`Config::ProcessBinary`] when the
    ///   zerocopy flag is present and [`Config::Process`] when it is not.
    /// * Otherwise the result is [`Config::Thread`].
    ///
    /// # Errors
    ///
    /// Returns the stringified error if an option value fails to parse, if the
    /// hostfile cannot be opened or read, or if the hostfile holds fewer addresses
    /// than the requested number of processes.
    #[cfg(feature = "getopts")]
    pub fn from_matches(matches: &getopts::Matches) -> Result<Config, String> {
        let threads = matches.opt_get_default("w", 1_usize).map_err(|e| e.to_string())?;
        let process = matches.opt_get_default("p", 0_usize).map_err(|e| e.to_string())?;
        let processes = matches.opt_get_default("n", 1_usize).map_err(|e| e.to_string())?;
        let report = matches.opt_present("report");
        let zerocopy = matches.opt_present("zerocopy");

        if processes > 1 {
            let addresses: Vec<String> = if let Some(hosts) = matches.opt_str("h") {
                // Borrow the path: `File::open` only needs `AsRef<Path>`, and `hosts`
                // is still required for the error message below.
                let file = ::std::fs::File::open(&hosts).map_err(|e| e.to_string())?;
                // Collecting into `Result` stops at the first unreadable line.
                let read = ::std::io::BufReader::new(file)
                    .lines()
                    .take(processes)
                    .collect::<Result<Vec<String>, _>>()
                    .map_err(|e| e.to_string())?;
                if read.len() < processes {
                    return Err(format!("could only read {} addresses from {}, but -n: {}", read.len(), hosts, processes));
                }
                read
            } else {
                (0..processes)
                    .map(|index| format!("localhost:{}", 2101 + index))
                    .collect()
            };

            // Invariant check: both branches above produce exactly `processes` entries.
            assert_eq!(processes, addresses.len());
            Ok(Config::Cluster {
                threads,
                process,
                addresses,
                report,
                zerocopy,
                // Command-line configurations start without communication logging.
                log_fn: Arc::new(|_| None),
            })
        } else if threads > 1 {
            if zerocopy {
                Ok(Config::ProcessBinary(threads))
            } else {
                Ok(Config::Process(threads))
            }
        } else {
            Ok(Config::Thread)
        }
    }

    /// Parses `args` into a configuration plus the remaining free arguments.
    ///
    /// Convenience wrapper that creates a fresh `getopts::Options`, registers the
    /// options via [`Config::install_options`], parses `args`, and delegates to
    /// [`Config::from_matches`].
    ///
    /// # Errors
    ///
    /// Returns the stringified parse error, or any error from
    /// [`Config::from_matches`].
    #[cfg(feature = "getopts")]
    pub fn from_args<I: Iterator<Item=String>>(args: I) -> Result<(Config, Vec<String>), String> {
        let mut opts = getopts::Options::new();
        Config::install_options(&mut opts);
        let matches = opts.parse(args).map_err(|e| e.to_string())?;
        Config::from_matches(&matches).map(|c| (c, matches.free))
    }

    /// Attempts to assemble the described communication infrastructure.
    ///
    /// Uses a default [`BytesRefill`] whose buffers are zero-filled `Vec<u8>`s of
    /// the requested size, with no limit, and forwards to
    /// [`Config::try_build_with`].
    ///
    /// # Errors
    ///
    /// See [`Config::try_build_with`].
    pub fn try_build(self) -> Result<(Vec<AllocatorBuilder>, Box<dyn Any+Send>), String> {
        let refill = BytesRefill {
            logic: Arc::new(|size| Box::new(vec![0_u8; size]) as Box<dyn DerefMut<Target=[u8]>>),
            limit: None,
        };
        self.try_build_with(refill)
    }

    /// Attempts to assemble the described communication infrastructure, using the
    /// supplied `refill` for byte buffers.
    ///
    /// Returns the allocator builders together with an opaque value. For the
    /// cluster configuration that value is the guard returned by
    /// `initialize_networking`; for all other configurations it is `()`.
    ///
    /// # Errors
    ///
    /// Returns `"failed to initialize networking: ..."` if `initialize_networking`
    /// fails for a [`Config::Cluster`]; the other variants do not fail.
    pub fn try_build_with(self, refill: BytesRefill) -> Result<(Vec<AllocatorBuilder>, Box<dyn Any+Send>), String> {
        match self {
            Config::Thread => {
                Ok((vec![AllocatorBuilder::Thread(ThreadBuilder)], Box::new(())))
            },
            Config::Process(threads) => {
                let builders = ProcessBuilder::new_typed_vector(threads, refill)
                    .into_iter()
                    .map(AllocatorBuilder::Process)
                    .collect();
                Ok((builders, Box::new(())))
            },
            Config::ProcessBinary(threads) => {
                let builders = ProcessBuilder::new_bytes_vector(threads, refill)
                    .into_iter()
                    .map(AllocatorBuilder::Process)
                    .collect();
                Ok((builders, Box::new(())))
            },
            Config::Cluster { threads, process, addresses, report, zerocopy, log_fn } => {
                // The zero-copy and typed cluster setups differ only in how the
                // intra-process builders are created; everything else is shared.
                let process_allocators = if zerocopy {
                    ProcessBuilder::new_bytes_vector(threads, refill.clone())
                } else {
                    ProcessBuilder::new_typed_vector(threads, refill.clone())
                };
                match initialize_networking(process_allocators, addresses, process, threads, report, refill, log_fn) {
                    Ok((stuff, guard)) => {
                        Ok((stuff.into_iter().map(AllocatorBuilder::Tcp).collect(), Box::new(guard)))
                    },
                    Err(err) => Err(format!("failed to initialize networking: {}", err))
                }
            },
        }
    }
}
/// Builds the communication infrastructure described by `config` and runs `func`
/// on one thread per resulting allocator.
///
/// This is `Config::try_build` followed by [`initialize_from`]; the opaque value
/// produced by the build step is stored in the returned [`WorkerGuards`].
///
/// # Errors
///
/// Propagates the error string from `Config::try_build` or from
/// [`initialize_from`].
pub fn initialize<T:Send+'static, F: Fn(Allocator)->T+Send+Sync+'static>(
    config: Config,
    func: F,
) -> Result<WorkerGuards<T>,String> {
    config
        .try_build()
        .and_then(|(allocators, others)| initialize_from(allocators, others, func))
}
/// Spawns one worker thread per allocator builder and runs `func` on each.
///
/// Worker `index` runs on a thread named `timely:work-{index}`. Inside that thread
/// the builder is turned into an `Allocator` via `build()` and passed to the shared
/// closure. `others` is not inspected; it is stored in the returned
/// [`WorkerGuards`].
///
/// # Errors
///
/// Returns the debug-formatted spawn error of the first thread that fails to
/// start. Join handles of workers spawned before that point are dropped rather
/// than joined.
pub fn initialize_from<T, F>(
    builders: Vec<AllocatorBuilder>,
    others: Box<dyn Any+Send>,
    func: F,
) -> Result<WorkerGuards<T>,String>
where
    T: Send+'static,
    F: Fn(Allocator)->T+Send+Sync+'static
{
    // Every worker shares the same closure through a reference-counted pointer.
    let shared = Arc::new(func);

    let guards = builders
        .into_iter()
        .enumerate()
        .map(|(index, builder)| {
            let worker = Arc::clone(&shared);
            thread::Builder::new()
                .name(format!("timely:work-{}", index))
                .spawn(move || (*worker)(builder.build()))
                .map_err(|e| format!("{:?}", e))
        })
        // Stops at the first spawn failure, like an early return from a loop.
        .collect::<Result<Vec<_>, String>>()?;

    Ok(WorkerGuards { guards, others })
}
/// Maintains the `JoinHandle`s of spawned worker threads.
///
/// Dropping a `WorkerGuards` joins every worker that has not been joined yet.
pub struct WorkerGuards<T:Send+'static> {
    /// Join handles of the worker threads, in spawn order.
    guards: Vec<::std::thread::JoinHandle<T>>,
    /// Opaque value kept alive for as long as the guards exist.
    others: Box<dyn Any+Send>,
}

impl<T:Send+'static> WorkerGuards<T> {
    /// Returns a view of the worker join handles.
    pub fn guards(&self) -> &[std::thread::JoinHandle<T>] {
        self.guards.as_slice()
    }

    /// Returns a reference to the opaque value stored alongside the handles.
    pub fn others(&self) -> &Box<dyn Any+Send> {
        &self.others
    }

    /// Waits for every worker and returns their results in spawn order.
    ///
    /// A worker that panicked is reported as `Err` holding the debug-formatted
    /// panic payload.
    pub fn join(mut self) -> Vec<Result<T, String>> {
        // Move the handles out so the `Drop` impl finds nothing left to join.
        let handles = std::mem::take(&mut self.guards);
        let mut outcomes = Vec::with_capacity(handles.len());
        for handle in handles {
            outcomes.push(handle.join().map_err(|payload| format!("{:?}", payload)));
        }
        outcomes
    }
}

impl<T:Send+'static> Drop for WorkerGuards<T> {
    /// Joins all remaining workers.
    ///
    /// # Panics
    ///
    /// Panics with "Worker panic" if a joined worker had panicked.
    fn drop(&mut self) {
        self.guards.drain(..).for_each(|handle| {
            handle.join().expect("Worker panic");
        });
    }
}