use futures::{
executor::{LocalPool, LocalSpawner},
task::LocalSpawnExt,
};
use std::thread::ThreadId;
use thiserror::Error;
#[derive(Clone, Debug)]
pub(crate) struct ThreadLocalSpawner {
spawner: LocalSpawner,
thread: ThreadId,
}
unsafe impl Send for ThreadLocalSpawner {}
unsafe impl Sync for ThreadLocalSpawner {}
#[derive(Debug, Error)]
pub enum ThreadLocalSpawnerError {
#[error(
"The ThreadLocalSpawner can only spawn tasks on the thread it was created on. Expected to be on {expected:?} but was actually on {found:?}"
)]
NotOnTheCorrectThread { expected: ThreadId, found: ThreadId },
#[error(
"The local executor associated with this spawner has been shut down and cannot accept new tasks"
)]
LocalPoolShutDown,
#[error("An error occurred while spawning the task")]
SpawnError,
}
impl ThreadLocalSpawner {
pub(crate) fn spawn_local<F: Future<Output = ()> + 'static>(
&self,
future: F,
) -> Result<(), ThreadLocalSpawnerError> {
if std::thread::current().id() != self.thread {
return Err(ThreadLocalSpawnerError::NotOnTheCorrectThread {
expected: self.thread,
found: std::thread::current().id(),
});
}
self.spawner
.spawn_local(future)
.map_err(|e| match e.is_shutdown() {
true => ThreadLocalSpawnerError::LocalPoolShutDown,
false => ThreadLocalSpawnerError::SpawnError,
})
}
}
pub(crate) struct ThreadLocalExecutor {
pool: LocalPool,
}
impl ThreadLocalExecutor {
pub(crate) fn new() -> Self {
let local_pool = futures::executor::LocalPool::new();
Self { pool: local_pool }
}
pub(crate) fn spawner(&self) -> ThreadLocalSpawner {
ThreadLocalSpawner {
spawner: self.pool.spawner(),
thread: std::thread::current().id(),
}
}
pub(crate) fn run_until<F: Future>(&mut self, future: F) -> F::Output {
self.pool.run_until(future)
}
}