use std::collections::BinaryHeap;
use std::sync::atomic::{AtomicPtr, AtomicU64, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
/// Numeric identifier of a function that can be queued for compilation.
pub type FunctionId = u32;
/// Compilation tier requested for a function.
///
/// The derived `Ord` follows the discriminants, so
/// `Baseline < Maglev < Turbofan`. `CompileJob` compares tiers first, which
/// makes a max-heap of jobs yield the highest tier first.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[repr(u8)]
pub enum Tier {
/// Lowest-ordered tier (discriminant 0).
Baseline = 0,
/// Middle tier (discriminant 1).
Maglev = 1,
/// Highest-ordered tier (discriminant 2).
Turbofan = 2,
}
/// A pending request to compile one function at a given tier.
///
/// The `sequence` field is private, so code outside this module cannot build
/// a job with a struct literal; in this file jobs are created by
/// `CompilationQueue::push`.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct CompileJob {
/// Function the job refers to.
pub function_id: FunctionId,
/// Tier the function should be compiled at.
pub tier: Tier,
// Value drawn from the queue's counter when the job is pushed; the `Ord`
// impl uses it so that, within one tier, earlier pushes rank higher.
sequence: u64,
}
impl Ord for CompileJob {
    /// Ranks jobs by scheduling priority, for use in a max-heap.
    ///
    /// * A higher [`Tier`] compares greater.
    /// * Within one tier, the job with the *smaller* sequence number (pushed
    ///   earlier) compares greater, giving FIFO order per tier.
    /// * `function_id` is the final tie-breaker. It only matters for jobs that
    ///   share both tier and sequence, and exists so that `cmp` returns
    ///   `Equal` exactly when the derived `==` holds, as the `Ord` contract
    ///   requires.
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.tier
            .cmp(&other.tier)
            // Operands are swapped on purpose: lower sequence == higher priority.
            .then_with(|| other.sequence.cmp(&self.sequence))
            // Keeps `Ord` consistent with the derived `PartialEq`/`Eq`, which
            // also looks at `function_id`.
            .then_with(|| self.function_id.cmp(&other.function_id))
    }
}
impl PartialOrd for CompileJob {
    /// Always returns `Some`: the ordering is total and is exactly the one
    /// defined by the `Ord` implementation.
    fn partial_cmp(&self, rhs: &Self) -> Option<std::cmp::Ordering> {
        Some(Ord::cmp(self, rhs))
    }
}
/// Record pairing a function and tier with a raw code pointer.
///
/// Nothing in the visible part of this file constructs or consumes this type.
#[derive(Debug)]
pub struct CompileResult {
/// Function the result belongs to.
pub function_id: FunctionId,
/// Tier associated with the result.
pub tier: Tier,
/// Raw byte pointer.
/// NOTE(review): presumably the entry point of the generated code — confirm
/// who owns the memory and how long it stays valid.
pub code_ptr: *const u8,
}
// SAFETY: NOTE(review) — this file does not establish why moving `code_ptr`
// to another thread is sound. Presumably the pointee stays valid and is not
// mutated without synchronisation once a result exists; confirm against the
// code that creates `CompileResult` values.
unsafe impl Send for CompileResult {}
/// A single raw `u8` pointer that can be replaced and read atomically.
pub struct AtomicCodeSlot {
// Current pointer value; null until the first `store`.
ptr: AtomicPtr<u8>,
}
impl AtomicCodeSlot {
    /// Creates a slot that holds a null pointer.
    pub fn new() -> Self {
        // `AtomicPtr::default()` is documented to produce a null pointer.
        let ptr = AtomicPtr::default();
        Self { ptr }
    }

    /// Replaces the stored pointer with `ptr`, using `Release` ordering.
    pub fn store(&self, ptr: *mut u8) {
        let cell = &self.ptr;
        cell.store(ptr, Ordering::Release);
    }

    /// Returns the most recently stored pointer, read with `Acquire`
    /// ordering. The result is null if nothing has been stored yet.
    pub fn load(&self) -> *const u8 {
        let current: *mut u8 = self.ptr.load(Ordering::Acquire);
        current as *const u8
    }
}
impl Default for AtomicCodeSlot {
fn default() -> Self {
Self::new()
}
}
/// Priority queue of [`CompileJob`]s shared between threads.
pub struct CompilationQueue {
// Max-heap of pending jobs; every access goes through this mutex.
inner: Mutex<BinaryHeap<CompileJob>>,
// Notified once per `push`; `pop_blocking` waits on it while the heap is empty.
condvar: Condvar,
// Source of the per-job sequence numbers handed out by `push`.
next_seq: AtomicU64,
}
impl CompilationQueue {
    /// Creates an empty queue whose sequence counter starts at zero.
    pub fn new() -> Self {
        let inner = Mutex::new(BinaryHeap::new());
        let condvar = Condvar::new();
        let next_seq = AtomicU64::new(0);
        Self {
            inner,
            condvar,
            next_seq,
        }
    }

    /// Enqueues a job for `function_id` at `tier` and wakes one thread blocked
    /// in [`pop_blocking`](Self::pop_blocking), if any.
    ///
    /// # Panics
    ///
    /// Panics if the queue mutex is poisoned.
    pub fn push(&self, function_id: FunctionId, tier: Tier) {
        let job = CompileJob {
            function_id,
            tier,
            sequence: self.next_seq.fetch_add(1, Ordering::Relaxed),
        };
        // The guard is a temporary of this statement, so the lock is released
        // before the waiter is notified.
        self.inner.lock().expect("queue lock poisoned").push(job);
        self.condvar.notify_one();
    }

    /// Removes and returns the highest-priority job, blocking the calling
    /// thread while the queue is empty.
    ///
    /// # Panics
    ///
    /// Panics if the queue mutex is poisoned.
    pub fn pop_blocking(&self) -> CompileJob {
        let mut guard = self.inner.lock().expect("queue lock poisoned");
        loop {
            match guard.pop() {
                Some(job) => return job,
                // Re-check after every wake-up: the heap may still be empty
                // (spurious wake-up, or another thread took the job first).
                None => guard = self.condvar.wait(guard).expect("queue lock poisoned"),
            }
        }
    }

    /// Removes and returns the highest-priority job, or `None` when the queue
    /// is empty. Never blocks waiting for a job.
    ///
    /// # Panics
    ///
    /// Panics if the queue mutex is poisoned.
    pub fn try_pop(&self) -> Option<CompileJob> {
        let mut guard = self.inner.lock().expect("queue lock poisoned");
        guard.pop()
    }

    /// Number of jobs queued at the moment the lock is held.
    ///
    /// # Panics
    ///
    /// Panics if the queue mutex is poisoned.
    pub fn len(&self) -> usize {
        let guard = self.inner.lock().expect("queue lock poisoned");
        guard.len()
    }

    /// Whether the queue holds no jobs at the moment the lock is held.
    ///
    /// # Panics
    ///
    /// Panics if the queue mutex is poisoned.
    pub fn is_empty(&self) -> bool {
        self.inner.lock().expect("queue lock poisoned").is_empty()
    }
}
impl Default for CompilationQueue {
fn default() -> Self {
Self::new()
}
}
/// Boxed compile callback: receives a job by reference and returns a raw
/// `*const u8`. The `Send + Sync` bounds let one callback be shared by several
/// worker threads.
pub type CompileFn = Box<dyn Fn(&CompileJob) -> *const u8 + Send + Sync>;
/// A set of background threads that drain a shared [`CompilationQueue`].
pub struct CompilationPool {
// Queue shared with every worker thread.
queue: Arc<CompilationQueue>,
// Join handles of the spawned workers; joined when the pool is dropped.
workers: Vec<thread::JoinHandle<()>>,
// Set to `true` on drop; each worker checks it once per loop iteration.
shutdown: Arc<std::sync::atomic::AtomicBool>,
}
impl CompilationPool {
    /// Starts a pool of background compilation workers.
    ///
    /// `num_workers` is clamped to at least one. Each worker runs on its own
    /// thread named `stator-compile-{id}` and loops until the shutdown flag is
    /// set: it pops the highest-priority job and passes it to `compile_fn`, or
    /// parks for at most 5 ms when the queue is empty. The pointer returned by
    /// `compile_fn` is currently discarded.
    ///
    /// # Panics
    ///
    /// Panics if a worker thread cannot be spawned.
    pub fn new(num_workers: usize, compile_fn: Arc<CompileFn>) -> Self {
        let num_workers = num_workers.max(1);
        let queue = Arc::new(CompilationQueue::new());
        let shutdown = Arc::new(std::sync::atomic::AtomicBool::new(false));
        let mut workers = Vec::with_capacity(num_workers);
        for id in 0..num_workers {
            let q = Arc::clone(&queue);
            let f = Arc::clone(&compile_fn);
            let stop = Arc::clone(&shutdown);
            let handle = thread::Builder::new()
                .name(format!("stator-compile-{id}"))
                .spawn(move || {
                    while !stop.load(Ordering::Relaxed) {
                        if let Some(job) = q.try_pop() {
                            let _code = f(&job);
                        } else {
                            // Idle: sleep until `submit`/`drop` unparks this
                            // thread, or the timeout forces a re-check.
                            thread::park_timeout(std::time::Duration::from_millis(5));
                        }
                    }
                })
                .expect("failed to spawn compilation worker");
            workers.push(handle);
        }
        Self {
            queue,
            workers,
            shutdown,
        }
    }

    /// Queues `function_id` for compilation at `tier` and wakes the workers.
    ///
    /// Every worker is unparked, not just the first one: if only one thread
    /// were woken and it happened to be busy compiling, the remaining idle
    /// workers would not notice the job until their park timeout expired.
    /// Unparking a running thread is harmless; it merely makes that thread's
    /// next park return immediately.
    pub fn submit(&self, function_id: FunctionId, tier: Tier) {
        self.queue.push(function_id, tier);
        for worker in &self.workers {
            worker.thread().unpark();
        }
    }

    /// Shared handle to the queue the workers drain.
    pub fn queue(&self) -> &Arc<CompilationQueue> {
        &self.queue
    }

    /// Number of worker threads owned by the pool (always at least one).
    pub fn num_workers(&self) -> usize {
        self.workers.len()
    }
}
impl Drop for CompilationPool {
    /// Stops the pool: raises the shutdown flag, wakes every worker so it sees
    /// the flag promptly, then waits for each worker thread to finish.
    ///
    /// Jobs still sitting in the queue are not processed by this pool.
    fn drop(&mut self) {
        self.shutdown.store(true, Ordering::Relaxed);

        // Phase 1: wake everyone before blocking on anyone.
        self.workers
            .iter()
            .for_each(|worker| worker.thread().unpark());

        // Phase 2: join in spawn order. A worker that panicked is ignored on
        // purpose so that dropping the pool never panics itself.
        for worker in std::mem::take(&mut self.workers) {
            let _ = worker.join();
        }
    }
}
pub fn on_tier_up_request(pool: &CompilationPool, function_id: FunctionId, tier: Tier) {
pool.submit(function_id, tier);
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::AtomicUsize;
    use std::time::{Duration, Instant};

    /// Upper bound on how long a test waits for background workers. It is only
    /// reached when something is broken; passing runs finish much sooner.
    const WAIT_LIMIT: Duration = Duration::from_secs(5);

    /// Polls `condition` until it returns `true` or `WAIT_LIMIT` elapses.
    ///
    /// Replaces fixed-length sleeps: the tests no longer depend on workers
    /// finishing within an arbitrary window, and they return as soon as the
    /// condition holds. Returns whether the condition was observed.
    fn wait_until(mut condition: impl FnMut() -> bool) -> bool {
        let deadline = Instant::now() + WAIT_LIMIT;
        while !condition() {
            if Instant::now() >= deadline {
                return false;
            }
            std::thread::sleep(Duration::from_millis(1));
        }
        true
    }

    // Jobs come out of the queue highest tier first.
    #[test]
    fn test_compile_job_priority_higher_tier_first() {
        let queue = CompilationQueue::new();
        queue.push(1, Tier::Baseline);
        queue.push(2, Tier::Turbofan);
        queue.push(3, Tier::Maglev);
        let first = queue.try_pop().unwrap();
        assert_eq!(first.tier, Tier::Turbofan);
        let second = queue.try_pop().unwrap();
        assert_eq!(second.tier, Tier::Maglev);
        let third = queue.try_pop().unwrap();
        assert_eq!(third.tier, Tier::Baseline);
    }

    // Jobs of the same tier come out in push order.
    #[test]
    fn test_compile_job_fifo_within_same_tier() {
        let queue = CompilationQueue::new();
        queue.push(10, Tier::Maglev);
        queue.push(20, Tier::Maglev);
        queue.push(30, Tier::Maglev);
        assert_eq!(queue.try_pop().unwrap().function_id, 10);
        assert_eq!(queue.try_pop().unwrap().function_id, 20);
        assert_eq!(queue.try_pop().unwrap().function_id, 30);
    }

    #[test]
    fn test_queue_is_empty() {
        let queue = CompilationQueue::new();
        assert!(queue.is_empty());
        queue.push(1, Tier::Baseline);
        assert!(!queue.is_empty());
        queue.try_pop();
        assert!(queue.is_empty());
    }

    #[test]
    fn test_atomic_code_slot_store_load() {
        let slot = AtomicCodeSlot::new();
        assert!(slot.load().is_null());
        let dummy: u8 = 42;
        let ptr = &dummy as *const u8 as *mut u8;
        slot.store(ptr);
        assert_eq!(slot.load(), ptr as *const u8);
    }

    // Every submitted job is eventually passed to the compile callback.
    #[test]
    fn test_pool_processes_jobs() {
        let counter = Arc::new(AtomicUsize::new(0));
        let counter_clone = Arc::clone(&counter);
        let compile_fn: Arc<CompileFn> = Arc::new(Box::new(move |_job: &CompileJob| {
            counter_clone.fetch_add(1, Ordering::Relaxed);
            std::ptr::null()
        }));
        let pool = CompilationPool::new(2, compile_fn);
        pool.submit(1, Tier::Baseline);
        pool.submit(2, Tier::Maglev);
        pool.submit(3, Tier::Turbofan);
        assert!(
            wait_until(|| counter.load(Ordering::Relaxed) >= 3),
            "workers did not process all jobs in time"
        );
        assert_eq!(counter.load(Ordering::Relaxed), 3);
    }

    // Dropping the pool must return, i.e. every worker can be joined.
    #[test]
    fn test_pool_shutdown_joins_workers() {
        let compile_fn: Arc<CompileFn> = Arc::new(Box::new(|_| std::ptr::null()));
        let pool = CompilationPool::new(2, compile_fn);
        assert_eq!(pool.num_workers(), 2);
        drop(pool);
    }

    // A tier-up request is enqueued and then picked up by a worker, leaving
    // the queue empty again.
    #[test]
    fn test_on_tier_up_request_enqueues() {
        let compile_fn: Arc<CompileFn> = Arc::new(Box::new(|_| std::ptr::null()));
        let pool = CompilationPool::new(1, compile_fn);
        on_tier_up_request(&pool, 42, Tier::Turbofan);
        assert!(
            wait_until(|| pool.queue().is_empty()),
            "worker did not drain the queue in time"
        );
    }
}