use std::thread::JoinHandle as ThreadJoinHandle;
/// Failure reported by [`Spawn::spawn`] when a job could not be started.
///
/// The enum is `#[non_exhaustive]`, so code outside this crate has to keep a
/// wildcard arm when matching on it.
#[derive(Debug)]
#[non_exhaustive]
pub enum SpawnError {
    /// The backing thread could not be created; carries the underlying I/O error.
    ThreadCreation(std::io::Error),
}

impl std::fmt::Display for SpawnError {
    /// Writes a fixed one-line description followed by the wrapped error's text.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Irrefutable while there is a single variant; adding a variant turns
        // this into a compile error, so the message cannot silently go stale.
        let Self::ThreadCreation(cause) = self;
        write!(f, "could not create the backing thread: {cause}")
    }
}

impl std::error::Error for SpawnError {
    /// Exposes the wrapped I/O error as the underlying cause.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        let Self::ThreadCreation(cause) = self;
        Some(cause)
    }
}
/// Failure reported by [`JobHandle::join`].
///
/// The enum is `#[non_exhaustive]`, so code outside this crate has to keep a
/// wildcard arm when matching on it.
#[derive(Debug)]
#[non_exhaustive]
pub enum JoinError {
    /// The job's body panicked instead of returning. The variant carries no
    /// data, so the panic payload is not available to the caller.
    Panicked,
}

impl std::fmt::Display for JoinError {
    /// Writes a fixed, human-readable description of the failure.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            Self::Panicked => "the spawned body panicked",
        };
        f.write_str(message)
    }
}

// The default `source()` (always `None`) is kept: no variant wraps another error.
impl std::error::Error for JoinError {}
/// Coarse lifecycle state of a job, as reported by [`JobHandle::status`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JobStatus {
    /// The job has not finished yet.
    Running,
    /// The job has finished.
    Finished,
}
/// Handle to a job started through [`Spawn::spawn`].
///
/// Implementors must be `Send + Sync` so a boxed handle can be moved to, or
/// shared with, other threads.
pub trait JobHandle: Send + Sync {
    /// Consumes the handle and reports how the job ended.
    ///
    /// # Errors
    ///
    /// Returns a [`JoinError`] when the job did not end cleanly.
    fn join(self: Box<Self>) -> Result<(), JoinError>;

    /// Returns `true` once the job has finished; the handle stays usable.
    fn is_finished(&self) -> bool;

    /// Maps [`JobHandle::is_finished`] onto a [`JobStatus`].
    ///
    /// This is a provided method; implementors normally only supply
    /// `is_finished`.
    fn status(&self) -> JobStatus {
        if !self.is_finished() {
            return JobStatus::Running;
        }
        JobStatus::Finished
    }
}
/// Strategy for starting a job from a boxed closure.
///
/// The trait is object safe and `Send + Sync`, so a spawner can be held as
/// `Arc<dyn Spawn>` and shared between threads.
pub trait Spawn: Send + Sync {
    /// Starts `body` as a new job and returns a handle to it.
    ///
    /// * `name` - label for the job.
    /// * `stack_size` - optional stack size request; `None` leaves the choice
    ///   to the implementation.
    /// * `body` - the work to run; it is handed over to the job.
    ///
    /// # Errors
    ///
    /// Returns a [`SpawnError`] when the job could not be started.
    fn spawn(
        &self,
        name: String,
        stack_size: Option<usize>,
        body: Box<dyn FnOnce() + Send + 'static>,
    ) -> Result<Box<dyn JobHandle>, SpawnError>;
}
/// [`Spawn`] implementation that runs each body on its own `std::thread`.
///
/// A stateless unit type: copying it is free and every value behaves the same.
#[derive(Debug, Clone, Copy, Default)]
pub struct ThreadSpawn;
/// [`JobHandle`] wrapping the join handle of a thread whose body returns `()`.
struct ThreadJob(ThreadJoinHandle<()>);

impl JobHandle for ThreadJob {
    /// Waits for the thread to end.
    ///
    /// # Errors
    ///
    /// Returns [`JoinError::Panicked`] when the thread's join reports an
    /// error; the panic payload is dropped.
    fn join(self: Box<Self>) -> Result<(), JoinError> {
        let ThreadJob(thread) = *self;
        match thread.join() {
            Ok(()) => Ok(()),
            Err(_payload) => Err(JoinError::Panicked),
        }
    }

    /// Forwards to the wrapped handle's own `is_finished`.
    fn is_finished(&self) -> bool {
        let Self(thread) = self;
        thread.is_finished()
    }
}
/// Lets `?` lift a [`SpawnError`] into the store's error type.
impl From<SpawnError> for crate::store::error::StoreError {
    /// Re-wraps the underlying I/O error in the store's `Io` variant.
    fn from(err: SpawnError) -> Self {
        // Irrefutable while `SpawnError` has a single variant; a new variant
        // turns this into a compile error and forces a mapping decision.
        let SpawnError::ThreadCreation(io) = err;
        Self::Io(io)
    }
}
impl Spawn for ThreadSpawn {
fn spawn(
&self,
name: String,
stack_size: Option<usize>,
body: Box<dyn FnOnce() + Send + 'static>,
) -> Result<Box<dyn JobHandle>, SpawnError> {
let mut builder = std::thread::Builder::new().name(name);
if let Some(stack_size) = stack_size {
builder = builder.stack_size(stack_size);
}
let handle = builder.spawn(body).map_err(SpawnError::ThreadCreation)?;
Ok(Box::new(ThreadJob(handle)))
}
}
// Behavioural proofs for `ThreadSpawn` and the `JobHandle` it hands back.
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Arc;
    use std::time::{Duration, Instant};

    /// A clean body runs to completion and `join` reports `Ok`.
    #[test]
    fn thread_spawn_runs_body_and_join_returns_ok() {
        // Going through `Arc<dyn Spawn>` also proves the trait is object safe.
        let spawner: Arc<dyn Spawn> = Arc::new(ThreadSpawn);
        let flag = Arc::new(AtomicBool::new(false));
        let flag_for_body = Arc::clone(&flag);
        let handle = spawner
            .spawn(
                "thread-spawn-ok-proof".to_string(),
                None,
                Box::new(move || {
                    flag_for_body.store(true, Ordering::Release);
                }),
            )
            .expect("spawn must succeed");
        handle.join().expect("clean body must join Ok");
        assert!(
            flag.load(Ordering::Acquire),
            "PROPERTY: ThreadSpawn must run the supplied body to completion"
        );
    }

    /// A panicking body is reported by `join` as `JoinError::Panicked`.
    #[test]
    fn thread_spawn_join_surfaces_panic_as_err() {
        let spawner = ThreadSpawn;
        let handle = spawner
            .spawn(
                "thread-spawn-panic-proof".to_string(),
                Some(256 * 1024),
                Box::new(|| {
                    // `black_box` hides the `None` from the optimiser so the
                    // panic really happens at run time.
                    std::hint::black_box(Option::<()>::None)
                        .expect("intentional spawn panic proof");
                }),
            )
            .expect("spawn must succeed");
        assert!(
            matches!(handle.join(), Err(JoinError::Panicked)),
            "PROPERTY: a panicking body surfaces through JobHandle::join as \
             JoinError::Panicked, matching std::thread::JoinHandle::join"
        );
    }

    /// While the body is held behind a gate the handle reports `Running`.
    #[test]
    fn status_is_running_until_the_body_is_released_then_joins() {
        let spawner = ThreadSpawn;
        let gate = Arc::new(AtomicBool::new(false));
        let gate_for_body = Arc::clone(&gate);
        let handle: Box<dyn JobHandle> = spawner
            .spawn(
                "thread-spawn-status-proof".to_string(),
                None,
                Box::new(move || {
                    while !gate_for_body.load(Ordering::Acquire) {
                        std::hint::spin_loop();
                    }
                }),
            )
            .expect("spawn must succeed");

        // Sample the status while the body is still gated, then open the gate
        // BEFORE asserting: if the assertion fails, the handle is dropped
        // without a join, and a still-gated body would keep spinning on a
        // detached thread until the test binary exits.
        let status_while_gated = handle.status();
        gate.store(true, Ordering::Release);

        assert_eq!(
            status_while_gated,
            JobStatus::Running,
            "PROPERTY: a gated body reports Running before release"
        );
        handle.join().expect("released body joins Ok");
    }

    /// Once the body has returned, `is_finished` and `status` both say so.
    #[test]
    fn is_finished_flips_to_true_after_the_body_completes() {
        let spawner = ThreadSpawn;
        let handle: Box<dyn JobHandle> = spawner
            .spawn(
                "thread-spawn-is-finished-proof".to_string(),
                None,
                Box::new(|| {}),
            )
            .expect("spawn must succeed");

        // Poll with a bounded deadline so a broken `is_finished` fails the
        // test instead of hanging it.
        let deadline = Instant::now() + Duration::from_secs(2);
        let mut observed_finished = false;
        while Instant::now() < deadline {
            if handle.is_finished() {
                observed_finished = true;
                break;
            }
            std::thread::sleep(Duration::from_millis(5));
        }

        assert!(
            observed_finished,
            "PROPERTY: is_finished must report true once the body completes (the \
             `-> false` mutant never does)"
        );
        assert_eq!(
            handle.status(),
            JobStatus::Finished,
            "PROPERTY: a completed body's status is Finished, defaulted from is_finished"
        );
        handle.join().expect("completed body joins Ok");
    }

    /// `SpawnError` mentions the backing thread and keeps the I/O error as its source.
    #[test]
    fn spawn_error_is_a_typed_io_failure_preserving_its_source() {
        let err = SpawnError::ThreadCreation(std::io::Error::other("simulated"));
        assert!(err.to_string().contains("backing thread"));
        assert!(std::error::Error::source(&err).is_some());
    }
}