mod clock;
mod executor;
mod test_scheduler;
#[cfg(test)]
mod tests;
pub use clock::*;
pub use executor::*;
pub use test_scheduler::*;
use async_task::Runnable;
use futures::channel::oneshot;
use std::{
any::Any,
future::Future,
panic::Location,
pin::Pin,
sync::Arc,
task::{Context, Poll},
thread,
time::Duration,
};
/// Priority level attached to background work handed to a [`Scheduler`].
///
/// The enum is `#[repr(u8)]`, so discriminants follow declaration order
/// starting at zero. [`Priority::Medium`] is the [`Default`] value.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash)]
#[repr(u8)]
pub enum Priority {
    /// Realtime audio work. Carries a [`weight`](Priority::weight) of zero.
    RealtimeAudio,
    /// Highest of the weighted priorities.
    High,
    /// Default priority.
    #[default]
    Medium,
    /// Lowest of the weighted priorities.
    Low,
}

impl Priority {
    /// Returns the numeric weight associated with this priority.
    ///
    /// `High` is 60, `Medium` is 30, `Low` is 10 and `RealtimeAudio` is 0.
    /// How a scheduler consumes these weights is not defined in this file.
    pub const fn weight(self) -> u32 {
        // Arms are listed in declaration order; the match stays exhaustive so
        // that adding a variant forces a decision about its weight.
        match self {
            Self::RealtimeAudio => 0,
            Self::High => 60,
            Self::Medium => 30,
            Self::Low => 10,
        }
    }
}
/// Newtype around the [`Instant`] at which a runnable was spawned.
#[derive(Clone, Copy, Debug)]
pub struct SpawnTime(pub Instant);

/// Metadata carried alongside a runnable.
#[derive(Clone, Debug)]
pub struct RunnableMeta {
    /// Source location recorded when the metadata was created.
    pub location: &'static Location<'static>,
    /// Moment at which the metadata was created.
    pub spawned: SpawnTime,
}

impl RunnableMeta {
    /// Builds metadata stamped with the *caller's* source location and the
    /// current instant.
    ///
    /// Because this function is `#[track_caller]`, the recorded location is
    /// the call site of this function rather than a line inside it.
    #[track_caller]
    pub fn new_with_callers_location() -> Self {
        let location = Location::caller();
        let spawned = SpawnTime(Instant::now());
        Self { location, spawned }
    }
}
/// Interface through which runnables, timers and blocking waits are handed to
/// an underlying scheduling implementation.
///
/// Implementors must be `Send + Sync`.
pub trait Scheduler: Send + Sync {
    /// Blocks on `future`, optionally on behalf of `session_id` and optionally
    /// bounded by `timeout`.
    ///
    /// NOTE(review): the meaning of the returned `bool` is defined by the
    /// implementors — presumably `true` when the future completed before the
    /// timeout; confirm against an implementation.
    fn block(
        &self,
        session_id: Option<SessionId>,
        future: Pin<&mut dyn Future<Output = ()>>,
        timeout: Option<Duration>,
    ) -> bool;

    /// Schedules `runnable` on behalf of the session identified by
    /// `session_id`.
    fn schedule_local(&self, session_id: SessionId, runnable: Runnable<RunnableMeta>);

    /// Schedules `runnable` as background work with the given `priority`.
    fn schedule_background_with_priority(
        &self,
        runnable: Runnable<RunnableMeta>,
        priority: Priority,
    );

    /// Hands the one-shot closure `f` to the scheduler for realtime execution.
    /// Where and when `f` runs is implementation-defined.
    fn spawn_realtime(&self, f: Box<dyn FnOnce() + Send>);

    /// Schedules `runnable` as background work at the default priority.
    ///
    /// The provided implementation forwards to
    /// [`schedule_background_with_priority`](Scheduler::schedule_background_with_priority)
    /// with [`Priority::default()`].
    fn schedule_background(&self, runnable: Runnable<RunnableMeta>) {
        let priority = Priority::default();
        self.schedule_background_with_priority(runnable, priority);
    }

    /// Returns a [`Timer`] for the given `timeout`.
    ///
    /// NOTE(review): presumably the timer resolves once `timeout` has elapsed
    /// according to this scheduler's clock — confirm against an implementation.
    #[track_caller]
    fn timer(&self, timeout: Duration) -> Timer;

    /// Returns the clock associated with this scheduler.
    fn clock(&self) -> Arc<dyn Clock>;

    /// Spawns dedicated work described by `f`.
    ///
    /// `f` is a boxed, `Send`, one-shot closure that receives a
    /// [`LocalExecutor`] and returns a pinned, boxed future. The future itself
    /// is not required to be `Send`; its output is type-erased as
    /// `Box<dyn Any + Send + Sync>`. The returned [`Task`] yields that
    /// type-erased output.
    fn spawn_dedicated(
        self: Arc<Self>,
        f: Box<
            dyn FnOnce(
                    LocalExecutor,
                )
                    -> Pin<Box<dyn Future<Output = Box<dyn Any + Send + Sync>> + 'static>>
                + Send
                + 'static,
        >,
    ) -> Task<Box<dyn Any + Send + Sync>>;

    /// Returns this scheduler as a [`TestScheduler`] when it is one.
    ///
    /// The provided implementation returns `None`.
    fn as_test(&self) -> Option<&TestScheduler> {
        None
    }
}
/// Runs the future produced by `f` on a freshly spawned, dedicated OS thread
/// and returns the [`Task`] for that future.
///
/// On the new thread this function:
/// 1. builds a [`LocalExecutor`] for `session_id` whose dispatch callback
///    pushes runnables onto an unbounded channel,
/// 2. calls `f` with a clone of that executor and spawns the resulting future
///    as the root task,
/// 3. sends the root task back to the calling thread,
/// 4. runs runnables from the channel until every sender has been dropped.
///
/// The calling thread blocks until the root task handle has been received.
///
/// # Panics
///
/// Panics if the OS thread cannot be spawned, or if the dedicated thread ends
/// before handing back the root task (for example because `f` panicked).
pub fn spawn_dedicated_thread<F, Fut>(
    session_id: SessionId,
    scheduler: Arc<dyn Scheduler>,
    f: F,
) -> Task<Fut::Output>
where
    F: FnOnce(LocalExecutor) -> Fut + Send + 'static,
    Fut: Future + 'static,
    Fut::Output: Send + 'static,
{
    // Queue of work for the dedicated thread, plus a one-slot channel used
    // solely to return the root task handle to the caller.
    let (work_tx, work_rx) = flume::unbounded::<Runnable<RunnableMeta>>();
    let (root_tx, root_rx) = flume::bounded::<Task<Fut::Output>>(1);

    let thread_body = move || {
        let executor = LocalExecutor::new(
            session_id,
            scheduler,
            move |runnable: Runnable<RunnableMeta>| {
                // A failed send means the receiving loop is gone; there is
                // nothing left to run the runnable, so it is dropped.
                let _ = work_tx.send(runnable);
            },
        );

        let root_task = executor.spawn(f(executor.clone()));
        // The caller may already have gone away; ignoring the error is fine.
        let _ = root_tx.send(root_task);

        // Release this handle before entering the run loop — presumably so it
        // does not keep the work channel's sender alive once all tasks finish.
        drop(executor);

        // Blocks for each runnable and ends once all senders are dropped.
        for runnable in work_rx.iter() {
            runnable.run();
        }
    };

    thread::Builder::new()
        .name(format!("spawn_dedicated session {:?}", session_id))
        .spawn(thread_body)
        .expect("failed to spawn dedicated thread");

    root_rx
        .recv()
        .expect("dedicated thread failed to produce root task")
}
/// Identifier for a session, wrapping a `u16`.
///
/// Values compare, order and hash by the wrapped integer.
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct SessionId(u16);

impl SessionId {
    /// Wraps the raw `id` in a [`SessionId`].
    pub fn new(id: u16) -> Self {
        Self(id)
    }
}
/// Future backed by the receiving half of a oneshot channel.
///
/// This is the type returned by [`Scheduler::timer`].
pub struct Timer(oneshot::Receiver<()>);

impl Timer {
    /// Wraps `rx`; the timer becomes ready when `rx` does.
    pub fn new(rx: oneshot::Receiver<()>) -> Self {
        Self(rx)
    }
}
impl Future for Timer {
    type Output = ();

    /// Polls the wrapped receiver and becomes ready as soon as it does.
    ///
    /// Whatever the receiver resolves with is discarded, so the timer always
    /// completes with `()`.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<()> {
        let receiver = Pin::new(&mut self.0);
        receiver.poll(cx).map(|_| ())
    }
}