//! Client-side access to a jobserver.
//!
//! The connection details are exchanged through the `CARGO_MAKEFLAGS`,
//! `MAKEFLAGS` or `MFLAGS` environment variables using the
//! `--jobserver-fds=` / `--jobserver-auth=` flags. The main entry point is
//! `Client`; tokens are represented by `Acquired`, and `HelperThread` services
//! token requests on a separate helper.
#![deny(missing_docs, missing_debug_implementations)]
#![doc(html_root_url = "https://docs.rs/jobserver/0.1")]
use std::env;
use std::io;
use std::process::Command;
use std::sync::{Arc, Condvar, Mutex, MutexGuard};
// Platform-specific backend. Exactly one of the three `imp` modules below is
// compiled in, selected by target: `unix.rs` on Unix, `windows.rs` on Windows,
// and `wasm.rs` on every other target. The rest of this file only talks to
// the backend through the `imp::` path.
#[cfg(unix)]
#[path = "unix.rs"]
mod imp;
#[cfg(windows)]
#[path = "windows.rs"]
mod imp;
// Fallback for targets that are neither Unix nor Windows.
#[cfg(not(any(unix, windows)))]
#[path = "wasm.rs"]
mod imp;
/// A handle to a jobserver.
///
/// Cloning a `Client` is cheap: every clone shares the same underlying
/// platform client through an `Arc`.
#[derive(Debug, Clone)]
pub struct Client {
    /// Shared platform-specific implementation.
    inner: Arc<imp::Client>,
}
/// A token obtained from a [`Client`].
///
/// Dropping this value hands the token back to the client it came from,
/// unless release has been disabled via `drop_without_releasing`.
#[derive(Debug)]
pub struct Acquired {
    /// The client the token is released back to on drop.
    client: Arc<imp::Client>,
    /// Platform-specific data describing the acquired token.
    data: imp::Acquired,
    /// When `true`, dropping this value does not release the token.
    disabled: bool,
}
impl Acquired {
    /// Consumes this token without releasing it back to the client.
    ///
    /// The token is flagged as disabled before it is dropped, so the `Drop`
    /// implementation skips the release call.
    pub fn drop_without_releasing(self) {
        let mut token = self;
        token.disabled = true;
        drop(token);
    }
}
/// State shared between a `HelperThread` handle and the helper that services
/// its requests: the bookkeeping data plus the condition variable used to
/// signal changes to it.
#[derive(Debug, Default)]
struct HelperState {
    /// Bookkeeping data, guarded by a mutex.
    lock: Mutex<HelperInner>,
    /// Notified whenever a field of the guarded data is updated.
    cvar: Condvar,
}

/// The mutex-protected part of [`HelperState`].
#[derive(Debug, Default)]
struct HelperInner {
    /// Number of token requests that have not been picked up yet.
    requests: usize,
    /// Set when the requesting side is being dropped and no further requests
    /// will arrive.
    producer_done: bool,
    /// Set by the request loop once it has observed `producer_done` and
    /// stopped.
    consumer_done: bool,
}
impl Client {
    /// Creates a new jobserver client by constructing a fresh platform
    /// implementation with the given `limit`.
    ///
    /// NOTE(review): `limit` is presumably the number of tokens initially
    /// available -- confirm against the platform implementation.
    ///
    /// # Errors
    ///
    /// Returns any I/O error reported while creating the platform client.
    pub fn new(limit: usize) -> io::Result<Client> {
        Ok(Client {
            inner: Arc::new(imp::Client::new(limit)?),
        })
    }

    /// Attempts to connect to a jobserver described by the environment.
    ///
    /// The first of `CARGO_MAKEFLAGS`, `MAKEFLAGS` and `MFLAGS` that is set
    /// (and valid Unicode) is searched for `--jobserver-fds=<value>` and, if
    /// that flag is absent, `--jobserver-auth=<value>`. The value runs up to
    /// the next space and is handed to the platform implementation to open.
    ///
    /// If a flag occurs more than once, the *last* occurrence is used: GNU
    /// make documents that `MAKEFLAGS` may contain several instances and that
    /// only the last one is relevant.
    ///
    /// Returns `None` when none of the variables is set, when neither flag is
    /// present, or when the platform implementation cannot open the value.
    ///
    /// # Safety
    ///
    /// The safety contract is imposed by the platform implementation's `open`
    /// routine, which is not visible in this file. NOTE(review): presumably
    /// the descriptors/handles named in the environment must be valid and must
    /// really refer to a jobserver -- confirm against the platform module.
    pub unsafe fn from_env() -> Option<Client> {
        let var = env::var("CARGO_MAKEFLAGS")
            .or_else(|_| env::var("MAKEFLAGS"))
            .or_else(|_| env::var("MFLAGS"))
            .ok()?;

        // Older spelling first, newer spelling as the fallback; within one
        // spelling the last occurrence wins (hence `rfind`, not `find`).
        let rest = ["--jobserver-fds=", "--jobserver-auth="]
            .iter()
            .find_map(|&flag| var.rfind(flag).map(|pos| &var[pos + flag.len()..]))?;

        // `split` always yields at least one item, so the fallback is never
        // taken; it merely avoids a panic path.
        let value = rest.split(' ').next().unwrap_or(rest);

        imp::Client::open(value).map(|c| Client { inner: Arc::new(c) })
    }

    /// Acquires a token from this client.
    ///
    /// The returned [`Acquired`] releases the token back to this client when
    /// it is dropped.
    ///
    /// NOTE(review): presumably blocks until a token is available -- confirm
    /// against the platform implementation.
    ///
    /// # Errors
    ///
    /// Returns any I/O error reported by the platform implementation.
    pub fn acquire(&self) -> io::Result<Acquired> {
        let data = self.inner.acquire()?;
        Ok(Acquired {
            client: self.inner.clone(),
            data,
            disabled: false,
        })
    }

    /// Prepares `cmd` so that the spawned child can find this jobserver.
    ///
    /// Sets the `CARGO_MAKEFLAGS` environment variable on `cmd` and then lets
    /// the platform implementation apply any further configuration.
    pub fn configure(&self, cmd: &mut Command) {
        let arg = self.inner.string_arg();
        // Emit both spellings of the flag so that a reader recognising either
        // `--jobserver-fds` or `--jobserver-auth` can locate the jobserver.
        let value = format!("-j --jobserver-fds={0} --jobserver-auth={0}", arg);
        cmd.env("CARGO_MAKEFLAGS", &value);
        self.inner.configure(cmd);
    }

    /// Converts this client into a helper that acquires tokens on request.
    ///
    /// A fresh shared state is created and passed, together with this client
    /// and the boxed callback `f`, to the platform implementation's helper
    /// spawner. Tokens are requested through
    /// [`HelperThread::request_token`]. NOTE(review): `f` is presumably
    /// invoked once per request with the outcome of the acquisition --
    /// confirm against the platform module.
    ///
    /// # Errors
    ///
    /// Returns any I/O error reported while spawning the helper.
    pub fn into_helper_thread<F>(self, f: F) -> io::Result<HelperThread>
    where
        F: FnMut(io::Result<Acquired>) + Send + 'static,
    {
        let state = Arc::new(HelperState::default());
        Ok(HelperThread {
            inner: Some(imp::spawn_helper(self, state.clone(), Box::new(f))?),
            state,
        })
    }

    /// Acquires a token without wrapping it in an [`Acquired`] guard.
    ///
    /// Nothing releases the token automatically; the caller is responsible
    /// for handing it back, e.g. with [`Client::release_raw`].
    ///
    /// # Errors
    ///
    /// Returns any I/O error reported by the platform implementation.
    pub fn acquire_raw(&self) -> io::Result<()> {
        self.inner.acquire()?;
        Ok(())
    }

    /// Releases a token to the jobserver without an associated [`Acquired`]
    /// value (the platform implementation is called with no token data).
    ///
    /// # Errors
    ///
    /// Returns any I/O error reported by the platform implementation.
    pub fn release_raw(&self) -> io::Result<()> {
        self.inner.release(None)?;
        Ok(())
    }
}
impl Drop for Acquired {
    /// Releases the token back to its client unless release was disabled.
    fn drop(&mut self) {
        if self.disabled {
            return;
        }
        // Best effort: there is no way to report a failure from `drop`, so the
        // result of the release is deliberately discarded.
        let _ = self.client.release(Some(&self.data));
    }
}
/// Handle to the helper created by `Client::into_helper_thread`.
///
/// Tokens are requested through `request_token`. Dropping the handle flags
/// the shared state as finished and joins the helper.
#[derive(Debug)]
pub struct HelperThread {
    /// Platform-specific helper handle; taken out and joined on drop.
    inner: Option<imp::Helper>,
    /// State shared with the helper.
    state: Arc<HelperState>,
}
impl HelperThread {
    /// Registers one more token request with the helper.
    ///
    /// The pending-request counter is incremented under the lock, the lock is
    /// released, and then one waiter on the condition variable is woken.
    pub fn request_token(&self) {
        {
            let mut shared = self.state.lock();
            shared.requests += 1;
        }
        self.state.cvar.notify_one();
    }
}
impl Drop for HelperThread {
    /// Signals that no further requests will be made, wakes the helper if it
    /// is waiting, and then joins it.
    fn drop(&mut self) {
        {
            let mut shared = self.state.lock();
            shared.producer_done = true;
        }
        self.state.cvar.notify_one();

        // `inner` is only ever emptied here, so it is still populated.
        let helper = self.inner.take().unwrap();
        helper.join();
    }
}
impl HelperState {
    /// Locks the shared data, recovering the guard even if the mutex was
    /// poisoned by a panic on another thread.
    fn lock(&self) -> MutexGuard<'_, HelperInner> {
        match self.lock.lock() {
            Ok(guard) => guard,
            Err(poisoned) => poisoned.into_inner(),
        }
    }

    /// Runs `f` once for every pending request until the producer is done.
    ///
    /// While no request is pending the caller sleeps on the condition
    /// variable. The lock is released for the duration of each call to `f`.
    /// On exit `consumer_done` is set and one waiter is notified.
    fn for_each_request(&self, mut f: impl FnMut(&HelperState)) {
        let mut guard = self.lock();
        loop {
            if guard.producer_done {
                break;
            }
            if guard.requests > 0 {
                // Claim the request, then service it without holding the lock.
                guard.requests -= 1;
                drop(guard);
                f(self);
                guard = self.lock();
            } else {
                // Nothing to do yet: wait to be woken, tolerating poisoning.
                guard = match self.cvar.wait(guard) {
                    Ok(woken) => woken,
                    Err(poisoned) => poisoned.into_inner(),
                };
            }
        }
        guard.consumer_done = true;
        self.cvar.notify_one();
    }

    /// Reports whether the producer side has flagged that it is finished.
    fn producer_done(&self) -> bool {
        let guard = self.lock();
        guard.producer_done
    }
}
/// Dropping a helper thread must return even while another clone of the
/// client is still alive.
#[test]
fn no_helper_deadlock() {
    let client = crate::Client::new(32).unwrap();
    let _second_handle = client.clone();
    let helper = client.into_helper_thread(|_| {}).unwrap();
    drop(helper);
}