use std::io;
use std::process::Command;
use std::sync::Arc;
use futures::channel::mpsc;
use futures::channel::oneshot;
use futures::StreamExt;
use crate::errors::*;
#[derive(Clone)]
pub struct Client {
tx: Option<mpsc::UnboundedSender<oneshot::Sender<io::Result<jobserver::Acquired>>>>,
inner: jobserver::Client,
helper: Option<Arc<jobserver::HelperThread>>,
}
pub struct Acquired {
_token: Option<jobserver::Acquired>,
}
impl Client {
pub unsafe fn new() -> Client {
match jobserver::Client::from_env() {
Some(c) => Client::_new(c, true),
None => Client::new_num(num_cpus::get()),
}
}
pub fn new_num(num: usize) -> Client {
let inner = jobserver::Client::new(num).expect("failed to create jobserver");
Client::_new(inner, false)
}
fn _new(inner: jobserver::Client, inherited: bool) -> Client {
let (helper, tx) = if inherited {
(None, None)
} else {
let (tx, mut rx) = mpsc::unbounded::<oneshot::Sender<_>>();
let helper = inner
.clone()
.into_helper_thread(move |token| {
let rt = tokio::runtime::Builder::new_current_thread()
.build()
.unwrap();
rt.block_on(async {
if let Some(sender) = rx.next().await {
drop(sender.send(token));
}
});
})
.expect("failed to spawn helper thread");
(Some(Arc::new(helper)), Some(tx))
};
Client { tx, inner, helper }
}
pub fn configure(&self, cmd: &mut Command) {
self.inner.configure(cmd)
}
pub async fn acquire(&self) -> Result<Acquired> {
let (helper, tx) = match (self.helper.as_ref(), self.tx.as_ref()) {
(Some(a), Some(b)) => (a, b),
_ => return Ok(Acquired { _token: None }),
};
let (mytx, myrx) = oneshot::channel();
helper.request_token();
tx.unbounded_send(mytx).unwrap();
let acquired = myrx
.await
.context("jobserver helper panicked")?
.context("failed to acquire jobserver token")?;
Ok(Acquired {
_token: Some(acquired),
})
}
}