fayalite 0.2.0

Hardware Description Language embedded in Rust, using FIRRTL's semantics
Documentation
// SPDX-License-Identifier: LGPL-3.0-or-later
// See Notices.txt for copyright information

use ctor::ctor;
use jobslot::{Acquired, Client};
use std::{
    ffi::OsString,
    mem,
    num::NonZeroUsize,
    sync::{Condvar, Mutex, Once, OnceLock},
    thread::spawn,
};

/// Returns the process-wide job-server [`Client`], creating one on first use if needed.
///
/// The `#[ctor]` static first tries `Client::from_env()`; when that yields a client it is
/// stored right away, otherwise the `OnceLock` starts out empty and is filled lazily here.
///
/// When a client has to be created, the job count is chosen from, in order:
/// 1. a `--test-threads=N` or `--test-threads N` command-line argument that parses as a
///    non-zero integer,
/// 2. `std::thread::available_parallelism()`,
/// 3. `1`.
///
/// The new client is created with one token fewer than that count; the remaining job is
/// the implicit one tracked by `State::implicit_available`.
///
/// # Panics
///
/// Panics if `Client::new_with_fifo` fails.
fn get_or_make_client() -> &'static Client {
    #[ctor]
    static CLIENT: OnceLock<Client> = unsafe {
        match Client::from_env() {
            Some(client) => OnceLock::from(client),
            None => OnceLock::new(),
        }
    };

    CLIENT.get_or_init(|| {
        const TEST_THREADS_OPTION: &[u8] = b"--test-threads";

        // Raw bytes of the value given to the first `--test-threads` option, if any.
        let mut requested: Option<Vec<u8>> = None;
        let mut remaining_args = std::env::args_os().skip(1);
        while let Some(arg) = remaining_args.next() {
            let bytes = arg.as_encoded_bytes();
            if !bytes.starts_with(TEST_THREADS_OPTION) {
                continue;
            }
            match bytes.get(TEST_THREADS_OPTION.len()) {
                // `--test-threads=N`: the value is everything after the `=`.
                Some(b'=') => {
                    requested = Some(bytes[TEST_THREADS_OPTION.len() + 1..].to_vec());
                    break;
                }
                // `--test-threads N`: the value is the following argument (if there is one).
                None => {
                    requested = remaining_args.next().map(OsString::into_encoded_bytes);
                    break;
                }
                // Some other option that merely shares the prefix; keep scanning.
                Some(_) => {}
            }
        }

        let job_count = requested
            .as_deref()
            .and_then(|raw| std::str::from_utf8(raw).ok())
            .and_then(|text| text.parse::<NonZeroUsize>().ok())
            .or_else(|| std::thread::available_parallelism().ok())
            .unwrap_or(NonZeroUsize::MIN);

        Client::new_with_fifo(job_count.get() - 1).expect("failed to create job server")
    })
}

/// Bookkeeping shared between job holders, waiters, and the background acquire thread.
///
/// The single instance lives in the `STATE` mutex.
struct State {
    /// Number of threads currently blocked in `AcquiredJob::acquire_inner` waiting for a job.
    waiting_count: usize,
    /// Job-server tokens this process holds that are not currently handed out as an
    /// `AcquiredJob`.
    available: Vec<Acquired>,
    /// Whether the implicit job (the one not backed by an `Acquired` token) is free.
    implicit_available: bool,
}

impl State {
    /// Number of jobs that can be handed out immediately: the cached tokens plus the
    /// implicit job when it is free.
    fn total_available(&self) -> usize {
        let implicit = usize::from(self.implicit_available);
        implicit + self.available.len()
    }

    /// Number of waiting threads that cannot be served from what is currently available
    /// (zero when there are at least as many free jobs as waiters).
    fn additional_waiting(&self) -> usize {
        let free_jobs = self.total_available();
        self.waiting_count.saturating_sub(free_jobs)
    }
}

/// Global job bookkeeping. Starts with no waiters, no cached tokens, and the implicit
/// job free.
static STATE: Mutex<State> = Mutex::new(State {
    waiting_count: 0,
    available: Vec::new(),
    implicit_available: true,
});
/// Condition variable paired with `STATE`. It is notified when a token is pushed onto
/// `State::available`, or when the implicit job is released while threads are waiting.
static COND_VAR: Condvar = Condvar::new();

/// What backs an `AcquiredJob`.
#[derive(Debug)]
enum AcquiredJobInner {
    /// A token obtained through the job-server client.
    FromJobServer(Acquired),
    /// The single implicit job tracked by `State::implicit_available`.
    ImplicitJob,
}

/// A held job slot.
///
/// Obtained with `AcquiredJob::acquire` or `AcquiredJob::try_acquire`; dropping the value
/// gives the slot back (see the `Drop` implementation).
#[derive(Debug)]
pub struct AcquiredJob {
    /// Whether this slot is the implicit job or a job-server token.
    job: AcquiredJobInner,
}

impl AcquiredJob {
    /// Condition variable used only to wake the background acquire thread.
    ///
    /// It is deliberately separate from `COND_VAR`: threads blocked in `acquire_inner`
    /// must be able to tell the acquire thread that demand increased without waking each
    /// other (which would make them re-register and notify again in an endless loop).
    /// Like `COND_VAR`, it is only ever used together with the `STATE` mutex.
    fn acquire_thread_cond_var() -> &'static Condvar {
        static ACQUIRE_THREAD_COND_VAR: Condvar = Condvar::new();
        &ACQUIRE_THREAD_COND_VAR
    }
    /// Spawns, at most once per process, the detached background thread that fetches
    /// tokens from the job server on behalf of blocked callers.
    ///
    /// The thread loops forever:
    /// * while no waiter is unserved (`State::additional_waiting() == 0`) it releases any
    ///   token it is holding and sleeps until a waiter registers;
    /// * otherwise it hands a held token to `State::available` and wakes the waiters, or
    ///   blocks in `Client::acquire` (without holding the lock) to obtain one.
    fn start_acquire_thread() {
        static STARTED_THREAD: Once = Once::new();
        STARTED_THREAD.call_once(|| {
            spawn(|| {
                let wake_up = Self::acquire_thread_cond_var();
                let mut acquired = None;
                let client = get_or_make_client();
                let mut state = STATE.lock().unwrap();
                loop {
                    state = if state.additional_waiting() == 0 {
                        if acquired.is_some() {
                            drop(state);
                            drop(acquired.take()); // drop Acquired outside of lock
                            STATE.lock().unwrap()
                        } else {
                            // Sleep until `acquire_inner` registers a new waiter. The
                            // predicate is re-checked at the top of the loop, so spurious
                            // wakeups are harmless.
                            wake_up.wait(state).unwrap()
                        }
                    } else if acquired.is_some() {
                        // allocate space before moving Acquired to ensure we
                        // drop Acquired outside of the lock on panic
                        state.available.reserve(1);
                        state.available.push(acquired.take().unwrap());
                        COND_VAR.notify_all();
                        state
                    } else {
                        // Never block on the job server while holding the lock.
                        drop(state);
                        acquired = Some(
                            client
                                .acquire()
                                .expect("can't acquire token from job server"),
                        );
                        STATE.lock().unwrap()
                    };
                }
            });
        });
    }
    /// Takes a free job if there is one; otherwise returns `None` when `block` is `false`
    /// or waits until a job becomes available when `block` is `true`.
    ///
    /// Cached job-server tokens are preferred over the implicit job.
    fn acquire_inner(block: bool) -> Option<Self> {
        Self::start_acquire_thread();
        let mut state = STATE.lock().unwrap();
        loop {
            if let Some(acquired) = state.available.pop() {
                return Some(Self {
                    job: AcquiredJobInner::FromJobServer(acquired),
                });
            }
            if state.implicit_available {
                state.implicit_available = false;
                return Some(Self {
                    job: AcquiredJobInner::ImplicitJob,
                });
            }
            if !block {
                return None;
            }
            state.waiting_count += 1;
            // Tell the acquire thread that demand went up. Without this it could stay
            // asleep indefinitely while this thread waits for a token nobody is fetching.
            // The notification is sent while `STATE` is locked, so it cannot slip in
            // between the acquire thread's predicate check and its wait.
            Self::acquire_thread_cond_var().notify_one();
            state = COND_VAR.wait(state).unwrap();
            state.waiting_count -= 1;
        }
    }
    /// Returns a job if one is free right now, without blocking for one.
    pub fn try_acquire() -> Option<Self> {
        Self::acquire_inner(false)
    }
    /// Blocks until a job is available and returns it.
    pub fn acquire() -> Self {
        // `acquire_inner(true)` only returns once it has a job, so this cannot be `None`.
        Self::acquire_inner(true).expect("failed to acquire token")
    }
    /// Passes `cmd` and `f` to `Client::configure_make_and_run_with_fifo` on the shared
    /// client and returns its result.
    ///
    /// NOTE(review): presumably this configures `cmd` so the child process can use the
    /// same job server before `f` runs it; confirm against the `jobslot` documentation.
    /// `self` is not read here; the `&mut self` receiver only requires the caller to hold
    /// a job.
    pub fn run_command<R>(
        &mut self,
        cmd: std::process::Command,
        f: impl FnOnce(&mut std::process::Command) -> std::io::Result<R>,
    ) -> std::io::Result<R> {
        get_or_make_client().configure_make_and_run_with_fifo(cmd, f)
    }
}

impl Drop for AcquiredJob {
    /// Gives the job back.
    ///
    /// * The implicit job is marked free again, and waiters are woken if there are more
    ///   of them than cached tokens.
    /// * A job-server token is moved into `State::available` (and waiters are woken) when
    ///   more threads are waiting than jobs are free. Otherwise it stays in `self.job` and
    ///   is dropped after this function returns, i.e. once the lock has been released.
    fn drop(&mut self) {
        let mut state = STATE.lock().unwrap();

        if matches!(self.job, AcquiredJobInner::ImplicitJob) {
            state.implicit_available = true;
            if state.available.len() < state.waiting_count {
                COND_VAR.notify_all();
            }
            return;
        }

        // From here on `self.job` is a job-server token.
        if state.waiting_count <= state.total_available() {
            // Nobody needs it; leave it in `self.job` to be dropped outside of the lock.
            return;
        }

        // allocate space before moving Acquired to ensure we
        // drop Acquired outside of the lock on panic
        state.available.reserve(1);
        match mem::replace(&mut self.job, AcquiredJobInner::ImplicitJob) {
            AcquiredJobInner::FromJobServer(token) => {
                state.available.push(token);
                COND_VAR.notify_all();
            }
            AcquiredJobInner::ImplicitJob => unreachable!(),
        }
    }
}