use crate::store::platform::spawn::{JobHandle, JoinError, Spawn, SpawnError};
use std::collections::VecDeque;
use std::panic::AssertUnwindSafe;
use std::sync::{Arc, Mutex};
/// Lifecycle of one scheduled body, as recorded in its `Slot`.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum SlotState {
/// Enqueued; no outcome has been recorded for the body yet.
Pending,
/// The body ran and returned without panicking.
Done,
/// The body ran and panicked; the panic was caught by `Shared::run_slot`.
Panicked,
}
/// One scheduled unit of work together with its recorded outcome.
struct Slot {
/// The closure to execute. `Shared::run_slot` takes it out (leaving `None`)
/// before running it, so a body is executed at most once.
body: Option<Box<dyn FnOnce() + Send + 'static>>,
/// Starts as `Pending`; overwritten with `Done` or `Panicked` after the body ran.
state: SlotState,
}
/// Bookkeeping shared between a `SimScheduler` and the join handles it returns.
#[derive(Default)]
struct Shared {
/// Every body ever enqueued. A body's id is its index in this vector;
/// entries are appended by `enqueue` and never removed.
slots: Mutex<Vec<Slot>>,
/// Ids that were enqueued but not yet popped for execution, oldest first.
queue: Mutex<VecDeque<usize>>,
}
impl Shared {
fn enqueue(&self, body: Box<dyn FnOnce() + Send + 'static>) -> usize {
let mut slots = self
.slots
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
let id = slots.len();
slots.push(Slot {
body: Some(body),
state: SlotState::Pending,
});
drop(slots);
self.queue
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.push_back(id);
id
}
fn next_pending(&self) -> Option<usize> {
self.queue
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.pop_front()
}
fn run_slot(&self, id: usize) {
let body = self
.slots
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)[id]
.body
.take();
let Some(body) = body else {
return;
};
let result = std::panic::catch_unwind(AssertUnwindSafe(body));
self.slots
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)[id]
.state = if result.is_ok() {
SlotState::Done
} else {
SlotState::Panicked
};
}
fn is_finished(&self, id: usize) -> bool {
matches!(
self.slots
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)[id]
.state,
SlotState::Done | SlotState::Panicked
)
}
fn join_slot(&self, id: usize) -> Result<(), JoinError> {
while !self.is_finished(id) {
match self.next_pending() {
Some(next) => self.run_slot(next),
None => self.run_slot(id),
}
}
let state = self
.slots
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)[id]
.state;
match state {
SlotState::Done | SlotState::Pending => Ok(()),
SlotState::Panicked => Err(JoinError::Panicked),
}
}
}
/// Scheduler that runs spawned bodies on the calling thread, in enqueue order.
///
/// Bodies are only stored when spawned; they execute when `run_all` is called
/// or when one of the returned handles is joined.
#[derive(Default)]
pub(crate) struct SimScheduler {
/// Slot table and run queue, shared with every handle this scheduler returns.
shared: Arc<Shared>,
}
impl SimScheduler {
    /// Creates a scheduler with an empty slot table and an empty run queue.
    pub(crate) fn new() -> Self {
        Self {
            shared: Arc::default(),
        }
    }

    /// Runs queued bodies oldest-first on the calling thread until the queue is empty.
    ///
    /// Bodies enqueued while draining are picked up by the same call.
    pub(crate) fn run_all(&self) {
        let state = &self.shared;
        loop {
            let Some(id) = state.next_pending() else {
                break;
            };
            state.run_slot(id);
        }
    }

    /// Enqueues `body` without running it and returns a handle for joining it later.
    pub(crate) fn spawn_owned(
        &self,
        body: Box<dyn FnOnce() + Send + 'static>,
    ) -> Box<dyn JobHandle> {
        let shared = Arc::clone(&self.shared);
        let id = shared.enqueue(body);
        Box::new(SimJoinHandle { shared, id })
    }
}
/// Handle to one body enqueued on a `SimScheduler`.
struct SimJoinHandle {
/// The scheduler state that owns the slot this handle refers to.
shared: Arc<Shared>,
/// Id of the slot, as returned by `Shared::enqueue`.
id: usize,
}
impl JobHandle for SimJoinHandle {
fn join(self: Box<Self>) -> Result<(), JoinError> {
self.shared.join_slot(self.id)
}
fn is_finished(&self) -> bool {
self.shared.is_finished(self.id)
}
}
impl Spawn for SimScheduler {
    /// Enqueues `body` on the scheduler and returns its join handle.
    ///
    /// `_name` and `_stack_size` are accepted for the `Spawn` signature but are
    /// not used. This implementation always returns `Ok`.
    fn spawn(
        &self,
        _name: String,
        _stack_size: Option<usize>,
        body: Box<dyn FnOnce() + Send + 'static>,
    ) -> Result<Box<dyn JobHandle>, SpawnError> {
        let handle = self.spawn_owned(body);
        Ok(handle)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Bodies that were only spawned must run in spawn order once `run_all` drains the queue.
    #[test]
    fn run_all_executes_in_enqueue_order() {
        let scheduler = SimScheduler::new();
        let order = Arc::new(std::sync::Mutex::new(Vec::new()));
        for i in 0..5usize {
            let order = Arc::clone(&order);
            let body: Box<dyn FnOnce() + Send + 'static> =
                Box::new(move || order.lock().expect("test log lock").push(i));
            scheduler
                .spawn("t".to_string(), None, body)
                .expect("sim spawn cannot fail");
        }

        scheduler.run_all();

        let observed = order.lock().expect("test log lock");
        assert_eq!(
            *observed,
            vec![0, 1, 2, 3, 4],
            "PROPERTY: cooperative scheduler runs bodies in deterministic enqueue order"
        );
    }

    /// Joining a handle must execute its body exactly once and report success.
    #[test]
    fn spawn_join_drains_and_returns_ok() {
        let scheduler = SimScheduler::new();
        let counter = Arc::new(AtomicUsize::new(0));
        let body: Box<dyn FnOnce() + Send + 'static> = {
            let counter = Arc::clone(&counter);
            Box::new(move || {
                counter.fetch_add(1, Ordering::SeqCst);
            })
        };

        let handle = scheduler
            .spawn("owned".to_string(), None, body)
            .expect("sim spawn cannot fail");
        handle.join().expect("clean body joins Ok");

        assert_eq!(
            counter.load(Ordering::SeqCst),
            1,
            "PROPERTY: join drains the cooperative queue until the body completes"
        );
    }

    /// A body that panics must surface as an `Err` from `join` instead of unwinding the caller.
    #[test]
    fn spawn_join_surfaces_panic_as_err() {
        let scheduler = SimScheduler::new();
        let handle = scheduler.spawn_owned(Box::new(|| {
            std::hint::black_box(Option::<()>::None).expect("intentional sim panic proof")
        }));

        let outcome = handle.join();

        assert!(
            outcome.is_err(),
            "PROPERTY: a panicking body surfaces through JobHandle::join as Err, matching std::thread"
        );
    }

    /// The thread-backed spawner and the cooperative scheduler must report the
    /// same join outcome for both a clean body and a panicking body.
    #[test]
    fn thread_spawn_and_sim_scheduler_agree_on_join_outcome() {
        use crate::store::platform::spawn::{JobHandle, Spawn, ThreadSpawn};

        /// Spawns one body on `spawner` (panicking when `should_panic` is set)
        /// and reports whether joining it succeeded.
        fn join_ok(spawner: &dyn Spawn, should_panic: bool) -> bool {
            let body: Box<dyn FnOnce() + Send + 'static> = Box::new(move || {
                if should_panic {
                    std::hint::black_box(Option::<()>::None)
                        .expect("intentional shared-drive panic proof");
                }
            });
            let handle: Box<dyn JobHandle> = spawner
                .spawn("shared-drive-rule".to_string(), None, body)
                .expect("spawn succeeds on both paths");
            handle.join().is_ok()
        }

        let thread = ThreadSpawn;
        let sim = SimScheduler::new();

        let thread_clean = join_ok(&thread, false);
        let sim_clean = join_ok(&sim, false);
        assert_eq!(
            thread_clean, sim_clean,
            "shared-drive: a clean body joins identically on thread + cooperative paths"
        );
        assert!(join_ok(&ThreadSpawn, false), "clean body joins Ok");

        let thread_panicked = join_ok(&thread, true);
        let sim_panicked = join_ok(&sim, true);
        assert_eq!(
            thread_panicked, sim_panicked,
            "shared-drive: a panicking body joins identically on thread + cooperative paths"
        );
        assert!(!join_ok(&ThreadSpawn, true), "panicking body joins Err");
    }
}