use std::ffi::CStr;
use std::mem::MaybeUninit;
use std::num::NonZeroUsize;
use std::path::Path;
use std::{env, fmt, io, thread};
use heph::actor_ref::ActorGroup;
use log::{debug, warn};
use crate::coordinator::Coordinator;
use crate::trace;
use crate::{worker, Error, Runtime, MAX_THREADS};
/// Builder holding the configuration used to create a `Runtime`.
///
/// Created with the defaults set in `Setup::new`, adjusted through the builder
/// methods and turned into a `Runtime` by `Setup::build`.
#[derive(Debug)]
#[must_use = "`heph_rt::Setup` doesn't do anything until its `build`"]
pub struct Setup {
/// Application name, if one was set using `with_name`. When `None`, `build`
/// falls back to `default_app_name`.
name: Option<String>,
/// Number of worker threads that `build` will set up and start.
threads: usize,
/// Flag set by `auto_cpu_affinity`; handed to every worker when it is
/// started in `build`.
auto_cpu_affinity: bool,
/// Trace log opened by `enable_tracing`, `None` if tracing wasn't enabled.
trace_log: Option<trace::CoordinatorLog>,
}
impl Setup {
pub(super) const fn new() -> Setup {
Setup {
name: None,
threads: 1,
auto_cpu_affinity: false,
trace_log: None,
}
}
pub fn with_name(mut self, name: String) -> Setup {
assert!(!name.is_empty(), "Can't use an empty application name");
self.name = Some(name);
self
}
pub fn name(&self) -> Option<&str> {
self.name.as_deref()
}
pub fn num_threads(mut self, n: usize) -> Self {
assert!(
n != 0,
"Can't create zero worker threads, one is the minimum"
);
assert!(
n < MAX_THREADS,
"Can't create {} worker threads, {} is the maximum",
n,
MAX_THREADS
);
self.threads = n;
self
}
pub fn use_all_cores(self) -> Self {
let n = match thread::available_parallelism() {
Ok(n) => n.get(),
Err(err) => {
warn!(
"failed to get the available concurrency: {}, using a single worker thread",
err
);
1
}
};
self.num_threads(n)
}
pub const fn get_threads(&self) -> usize {
self.threads
}
pub const fn auto_cpu_affinity(mut self) -> Self {
self.auto_cpu_affinity = true;
self
}
pub fn enable_tracing<P: AsRef<Path>>(&mut self, path: P) -> Result<(), Error> {
match trace::CoordinatorLog::open(path.as_ref()) {
Ok(trace_log) => {
self.trace_log = Some(trace_log);
Ok(())
}
Err(err) => Err(Error::setup_trace(err)),
}
}
pub fn build(self) -> Result<Runtime, Error> {
#[rustfmt::skip]
let Setup { name, threads, auto_cpu_affinity, mut trace_log } = self;
let name = name.unwrap_or_else(default_app_name).into_boxed_str();
debug!(name = name, workers = threads; "building Heph runtime");
let timing = trace::start(&trace_log);
let mut worker_setups = Vec::with_capacity(threads);
let mut thread_wakers = Vec::with_capacity(threads);
for id in 1..=threads {
let id = NonZeroUsize::new(id).unwrap();
let (worker_setup, thread_waker) = worker::setup(id).map_err(Error::start_worker)?;
worker_setups.push(worker_setup);
thread_wakers.push(thread_waker);
}
let thread_wakers = thread_wakers.into_boxed_slice();
let shared_trace_log = trace_log.as_ref().map(trace::CoordinatorLog::clone_shared);
let coordinator = Coordinator::init(name, thread_wakers, shared_trace_log)
.map_err(Error::init_coordinator)?;
let workers = worker_setups
.into_iter()
.map(|worker_setup| {
#[allow(clippy::cast_possible_truncation)]
let trace_log = trace_log
.as_ref()
.map(|trace_log| trace_log.new_stream(worker_setup.id() as u32));
worker_setup.start(
coordinator.shared_internals().clone(),
auto_cpu_affinity,
trace_log,
)
})
.collect::<io::Result<Vec<worker::Handle>>>()
.map_err(Error::start_worker)?;
trace::finish_rt(
trace_log.as_mut(),
timing,
"Spawning worker threads",
&[("amount", &threads)],
);
Ok(Runtime {
coordinator,
workers,
sync_actors: Vec::new(),
signals: ActorGroup::empty(),
trace_log,
})
}
}
/// Returns the default application name: the file name portion (everything
/// after the last `/`) of the first program argument, or `<unknown>` if the
/// process has no arguments.
///
/// The argument is converted lossily, so a binary path that isn't valid
/// Unicode yields a name with replacement characters instead of a panic
/// (`env::args` panics on such arguments).
fn default_app_name() -> String {
    match env::args_os().next() {
        Some(bin_path) => {
            let bin_path = bin_path.to_string_lossy();
            match bin_path.rfind('/') {
                // `/` is a single byte, so `idx + 1` is a valid char boundary.
                Some(idx) => bin_path[idx + 1..].to_owned(),
                None => bin_path.into_owned(),
            }
        }
        None => "<unknown>".to_string(),
    }
}
/// Returns a description of the operating system and the hostname, both taken
/// from `uname(2)`.
///
/// The first value has the format `OS (sysname release version)`, the second
/// is the node name.
///
/// # Errors
///
/// Returns the last OS error if the call to `uname` fails.
pub(crate) fn host_info() -> io::Result<(Box<str>, Box<str>)> {
    #[cfg(target_os = "linux")]
    const OS: &str = "GNU/Linux";
    #[cfg(target_os = "freebsd")]
    const OS: &str = "FreeBSD";
    #[cfg(target_os = "macos")]
    const OS: &str = "macOS";

    let mut raw = MaybeUninit::<libc::utsname>::uninit();
    // SAFETY: `raw` provides writable storage for a single `utsname`.
    let rc = unsafe { libc::uname(raw.as_mut_ptr()) };
    if rc == -1 {
        return Err(io::Error::last_os_error());
    }
    // SAFETY: `uname` reported success above, so it has filled in the structure.
    let info = unsafe { raw.assume_init() };

    // Converts one of the character arrays of `info` into an owned string,
    // replacing invalid UTF-8. Only ever called with fields of `info`.
    let text = |field: &[libc::c_char]| -> String {
        // SAFETY: relies on `uname` writing NUL terminated strings into the
        // fields of `utsname`.
        unsafe { CStr::from_ptr(field.as_ptr().cast()) }
            .to_string_lossy()
            .into_owned()
    };

    let os = format!(
        "{} ({} {} {})",
        OS,
        text(&info.sysname[..]),
        text(&info.release[..]),
        text(&info.version[..]),
    )
    .into_boxed_str();
    let hostname = text(&info.nodename[..]).into_boxed_str();
    Ok((os, hostname))
}
/// Universally unique identifier stored as a 128 bit integer.
///
/// Both the `Display` and `Debug` implementations format the value as 32
/// lowercase hexadecimal digits, zero padded and without hyphens.
#[derive(Copy, Clone)]
#[allow(clippy::doc_markdown)]
pub(crate) struct Uuid(u128);

impl fmt::Display for Uuid {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Uuid(value) = *self;
        // Always 32 hex digits; formatting flags on `f` are not applied.
        f.write_fmt(format_args!("{:032x}", value))
    }
}

impl fmt::Debug for Uuid {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Same output as the `Display` implementation.
        <Self as fmt::Display>::fmt(self, f)
    }
}
/// Returns the id of the host, read from `/etc/machine-id` on Linux and from
/// `/etc/hostid` on FreeBSD.
///
/// # Errors
///
/// Returns an error if the file can't be opened or read, if it holds fewer
/// bytes than expected, or if its contents can't be parsed as a hex encoded
/// id. Parse and size errors have kind `InvalidData`.
#[cfg(any(target_os = "freebsd", target_os = "linux"))]
pub(crate) fn host_id() -> io::Result<Uuid> {
    use std::fs::File;
    use std::io::Read;

    // Linux: 32 hex digits.
    #[cfg(target_os = "linux")]
    const PATH: &str = "/etc/machine-id";
    #[cfg(target_os = "linux")]
    const EXPECTED_SIZE: usize = 32;

    // FreeBSD: hyphenated, 32 hex digits plus 4 hyphens.
    #[cfg(target_os = "freebsd")]
    const PATH: &str = "/etc/hostid";
    #[cfg(target_os = "freebsd")]
    const EXPECTED_SIZE: usize = 36;

    let mut file = File::open(PATH).map_err(|err| {
        let msg = format!("can't open '{}': {}", PATH, err);
        io::Error::new(err.kind(), msg)
    })?;

    // A single `read` call may return fewer bytes than requested or be
    // interrupted, so keep reading until the buffer is full or we hit the end
    // of the file.
    let mut buf = [0; EXPECTED_SIZE];
    let mut n = 0;
    while n < EXPECTED_SIZE {
        match file.read(&mut buf[n..]) {
            Ok(0) => break,
            Ok(read) => n += read,
            Err(err) if err.kind() == io::ErrorKind::Interrupted => continue,
            Err(err) => {
                let msg = format!("can't read '{}': {}", PATH, err);
                return Err(io::Error::new(err.kind(), msg));
            }
        }
    }

    if n == EXPECTED_SIZE {
        #[cfg(target_os = "linux")]
        let res = from_hex(&buf[..EXPECTED_SIZE]);
        #[cfg(target_os = "freebsd")]
        let res = from_hex_hyphenated(&buf[..EXPECTED_SIZE]);

        res.map_err(|()| {
            let msg = format!("invalid `{}` format: input is not hex", PATH);
            io::Error::new(io::ErrorKind::InvalidData, msg)
        })
    } else {
        let msg = format!(
            "can't read '{}', invalid format: only read {} bytes (expected {})",
            PATH, n, EXPECTED_SIZE,
        );
        Err(io::Error::new(io::ErrorKind::InvalidData, msg))
    }
}
/// Parses `input` as hex digits without separators, two digits per byte with
/// the most significant byte first, into a [`Uuid`].
///
/// Returns `Err(())` if `input` contains a byte that isn't a hex digit.
///
/// # Panics
///
/// Panics if `input` holds more than 16 pairs of digits.
#[cfg(target_os = "linux")]
fn from_hex(input: &[u8]) -> Result<Uuid, ()> {
    let mut raw = [0u8; 16];
    for (pos, pair) in input.chunks_exact(2).enumerate() {
        let (hi, lo) = (pair[0], pair[1]);
        let low_nibble = from_hex_byte(lo)?;
        let high_nibble = from_hex_byte(hi)?;
        raw[pos] = (high_nibble << 4) | low_nibble;
    }
    Ok(Uuid(u128::from_be_bytes(raw)))
}
/// Parses `input` as a hyphenated, hex encoded id (groups of 8, 4, 4, 4 and
/// 12 hex digits separated by `-`) into a [`Uuid`].
///
/// Returns `Err(())` if a digit isn't valid hex or if a group isn't followed
/// by a hyphen (the last group may be followed by the end of `input`).
///
/// # Panics
///
/// Panics if `input` is shorter than 36 bytes.
#[cfg(target_os = "freebsd")]
fn from_hex_hyphenated(input: &[u8]) -> Result<Uuid, ()> {
    // Start (inclusive) and end (exclusive) offsets of the groups of digits.
    const GROUPS: [(usize, usize); 5] = [(0, 8), (9, 13), (14, 18), (19, 23), (24, 36)];

    let mut raw = [0u8; 16];
    let mut next = 0;
    for &(start, end) in GROUPS.iter() {
        for pair in input[start..end].chunks_exact(2) {
            let low_nibble = from_hex_byte(pair[1])?;
            let high_nibble = from_hex_byte(pair[0])?;
            raw[next] = (high_nibble << 4) | low_nibble;
            next += 1;
        }

        // Whatever directly follows the group, if anything, must be a hyphen.
        match input.get(end) {
            Some(&b'-') | None => {}
            Some(_) => return Err(()),
        }
    }
    Ok(Uuid(u128::from_be_bytes(raw)))
}
/// Converts a single ASCII hex digit (`0-9`, `a-f` or `A-F`) into its value.
///
/// Returns `Err(())` for any other byte.
#[cfg(any(target_os = "freebsd", target_os = "linux"))]
const fn from_hex_byte(b: u8) -> Result<u8, ()> {
    let value = match b {
        b'0'..=b'9' => b - b'0',
        b'a'..=b'f' => b - b'a' + 10,
        b'A'..=b'F' => b - b'A' + 10,
        _ => return Err(()),
    };
    Ok(value)
}
/// Returns the id of the host as reported by `gethostuuid(2)`, called with a
/// timeout of one second.
///
/// # Errors
///
/// Returns the last OS error if `gethostuuid` fails.
#[cfg(target_os = "macos")]
pub(crate) fn host_id() -> io::Result<Uuid> {
    let mut uuid = [0; 16];
    let timeout = libc::timespec {
        tv_sec: 1,
        tv_nsec: 0,
    };
    // SAFETY: `uuid` is a writable 16 byte buffer and `timeout` outlives the
    // call.
    let res = unsafe { libc::gethostuuid(uuid.as_mut_ptr(), &timeout) };
    if res == -1 {
        return Err(io::Error::last_os_error());
    }
    Ok(Uuid(u128::from_be_bytes(uuid)))
}
/// Sets the CPU affinity of the calling thread based on `worker_id`.
///
/// On Linux the thread is pinned to CPU `worker_id - 1`. On success the CPU
/// number is returned; on failure a warning is logged and `None` is returned.
/// On other targets this does nothing and always returns `None`.
pub(crate) fn set_cpu_affinity(worker_id: NonZeroUsize) -> Option<usize> {
    #[cfg(not(target_os = "linux"))]
    {
        let _ = worker_id; // Silence the unused variable warning.
        None
    }

    #[cfg(target_os = "linux")]
    {
        // Worker ids start at one, CPU numbers at zero.
        let cpu = worker_id.get() - 1;
        let cpu_set = cpu_set(cpu);
        match set_affinity(&cpu_set) {
            Ok(()) => {
                debug!(worker_id = worker_id.get(); "worker thread CPU affinity set to {}", cpu);
                Some(cpu)
            }
            Err(err) => {
                warn!(worker_id = worker_id.get(); "failed to set CPU affinity on thread: {}", err);
                None
            }
        }
    }
}
/// Creates a CPU set in which only `cpu` is set.
///
/// `cpu` is reduced modulo `CPU_SETSIZE` so that it always fits in the set.
#[cfg(target_os = "linux")]
fn cpu_set(cpu: usize) -> libc::cpu_set_t {
    // SAFETY: an all zero bit pattern is used as the starting value of
    // `cpu_set_t`, which is then reset with `CPU_ZERO` before the bit for
    // `cpu` is set.
    let mut set: libc::cpu_set_t = unsafe { std::mem::zeroed() };
    unsafe {
        libc::CPU_ZERO(&mut set);
        libc::CPU_SET(cpu % libc::CPU_SETSIZE as usize, &mut set);
    }
    set
}
/// Sets the CPU affinity of the calling thread to `cpu_set`.
///
/// # Errors
///
/// Returns the error reported by `pthread_setaffinity_np(3)`.
#[cfg(target_os = "linux")]
fn set_affinity(cpu_set: &libc::cpu_set_t) -> io::Result<()> {
    // SAFETY: `pthread_self` has no preconditions.
    let thread = unsafe { libc::pthread_self() };
    // SAFETY: `cpu_set` is a valid reference and the size passed matches it.
    let res =
        unsafe { libc::pthread_setaffinity_np(thread, std::mem::size_of_val(cpu_set), cpu_set) };
    if res == 0 {
        Ok(())
    } else {
        // The pthread functions return the error number directly and don't
        // set `errno`, so `last_os_error` would report an unrelated error.
        Err(io::Error::from_raw_os_error(res))
    }
}