// open_gpui_scheduler/scheduler.rs

1mod clock;
2mod executor;
3mod test_scheduler;
4#[cfg(test)]
5mod tests;
6
7pub use clock::*;
8pub use executor::*;
9pub use test_scheduler::*;
10
11use async_task::Runnable;
12use futures::channel::oneshot;
13use std::{
14    any::Any,
15    future::Future,
16    panic::Location,
17    pin::Pin,
18    sync::Arc,
19    task::{Context, Poll},
20    thread,
21    time::Duration,
22};
23
/// Scheduling priority assigned to a background task.
///
/// A task with a higher priority tends to be scheduled ahead of one with a
/// lower priority. This is a tendency rather than a strict guarantee: the
/// scheduler may interleave tasks of different priorities so that none of
/// them starve.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Hash)]
#[repr(u8)]
pub enum Priority {
    /// Realtime priority.
    ///
    /// A task spawned with this priority is spun off onto a separate thread
    /// dedicated just to that task. Only use for audio.
    RealtimeAudio,
    /// High priority, for tasks critical to user experience/responsiveness.
    High,
    /// Medium priority, suitable for most use cases. This is the default.
    #[default]
    Medium,
    /// Low priority, for background work that can be deprioritized.
    Low,
}

impl Priority {
    /// Relative probability weight of this priority level.
    ///
    /// Schedulers use the weight to decide how likely a task of this priority
    /// is to be selected.
    pub const fn weight(self) -> u32 {
        match self {
            // Realtime priorities take no part in probability scheduling.
            Self::RealtimeAudio => 0,
            Self::Low => 10,
            Self::Medium => 30,
            Self::High => 60,
        }
    }
}
58
/// Newtype around the [`Instant`] at which a task was spawned.
#[derive(Debug, Clone, Copy)]
pub struct SpawnTime(pub Instant);
61
62/// Metadata attached to runnables for debugging and profiling.
63#[derive(Clone, Debug)]
64pub struct RunnableMeta {
65    /// The source location where the task was spawned.
66    pub location: &'static Location<'static>,
67    /// The moment the task was spawned.
68    pub spawned: SpawnTime,
69}
70
71impl RunnableMeta {
72    #[track_caller]
73    pub fn new_with_callers_location() -> Self {
74        Self {
75            location: core::panic::Location::caller(),
76            spawned: SpawnTime(Instant::now()),
77        }
78    }
79}
80
81pub trait Scheduler: Send + Sync {
82    /// Block until the given future completes or timeout occurs.
83    ///
84    /// Returns `true` if the future completed, `false` if it timed out.
85    /// The future is passed as a pinned mutable reference so the caller
86    /// retains ownership and can continue polling or return it on timeout.
87    fn block(
88        &self,
89        session_id: Option<SessionId>,
90        future: Pin<&mut dyn Future<Output = ()>>,
91        timeout: Option<Duration>,
92    ) -> bool;
93
94    /// Schedule a runnable on the local (session-pinned) queue for `session_id`.
95    /// Runnables scheduled here run in order on whichever thread drains the
96    /// session — the main thread for ordinary sessions, or a dedicated OS
97    /// thread for sessions created via `spawn_dedicated_thread`.
98    fn schedule_local(&self, session_id: SessionId, runnable: Runnable<RunnableMeta>);
99
100    /// Schedule a background task with the given priority.
101    fn schedule_background_with_priority(
102        &self,
103        runnable: Runnable<RunnableMeta>,
104        priority: Priority,
105    );
106
107    /// Spawn a closure on a dedicated realtime thread for audio processing.
108    fn spawn_realtime(&self, f: Box<dyn FnOnce() + Send>);
109
110    /// Schedule a background task with default (medium) priority.
111    fn schedule_background(&self, runnable: Runnable<RunnableMeta>) {
112        self.schedule_background_with_priority(runnable, Priority::default());
113    }
114
115    #[track_caller]
116    fn timer(&self, timeout: Duration) -> Timer;
117    fn clock(&self) -> Arc<dyn Clock>;
118
119    /// Spawn a closure on a fresh session pinned to its own [`LocalExecutor`].
120    ///
121    /// `PlatformScheduler` runs the closure on a new OS thread (see
122    /// [`spawn_dedicated_thread`]). `TestScheduler` runs it on the test
123    /// scheduler's loop alongside everything else so determinism under
124    /// `TestScheduler::many` is preserved.
125    ///
126    /// This is the dyn-safe entry point: the closure's output is type-erased
127    /// as `Box<dyn Any + Send + Sync>` so the trait stays object-safe.
128    /// Callers typically reach for the type-safe wrappers on
129    /// [`LocalExecutor::spawn_dedicated`] and
130    /// [`BackgroundExecutor::spawn_dedicated`], which compose this method
131    /// with [`Task::downcast`] to recover the closure's concrete return type.
132    fn spawn_dedicated(
133        self: Arc<Self>,
134        f: Box<
135            dyn FnOnce(
136                    LocalExecutor,
137                )
138                    -> Pin<Box<dyn Future<Output = Box<dyn Any + Send + Sync>> + 'static>>
139                + Send
140                + 'static,
141        >,
142    ) -> Task<Box<dyn Any + Send + Sync>>;
143
144    fn as_test(&self) -> Option<&TestScheduler> {
145        None
146    }
147}
148
149/// Spawn work on a fresh OS thread that's exclusive to the returned task and
150/// anything spawned on the executor it provides. Blocking syscalls inside that
151/// work don't disturb any other executor in the process.
152///
153/// `f` is called on the dedicated thread with a [`LocalExecutor`] pinned
154/// to it. The future `f` returns may freely be `!Send`. The returned `Task` is
155/// that future's task: dropping it cancels the root, but detached children
156/// keep running until they finish. The thread shuts down once the executor and
157/// every task on it are gone.
158///
159/// The caller is responsible for supplying a `session_id` that's distinct from
160/// every other live session on `scheduler`. Concrete schedulers typically wrap
161/// this in an inherent method that allocates the id from their own counter.
162pub fn spawn_dedicated_thread<F, Fut>(
163    session_id: SessionId,
164    scheduler: Arc<dyn Scheduler>,
165    f: F,
166) -> Task<Fut::Output>
167where
168    F: FnOnce(LocalExecutor) -> Fut + Send + 'static,
169    Fut: Future + 'static,
170    Fut::Output: Send + 'static,
171{
172    let (runnable_sender, runnable_receiver) = flume::unbounded::<Runnable<RunnableMeta>>();
173    let (task_sender, task_receiver) = flume::bounded::<Task<Fut::Output>>(1);
174
175    thread::Builder::new()
176        .name(format!("spawn_dedicated session {:?}", session_id))
177        .spawn(move || {
178            let dispatch = move |runnable: Runnable<RunnableMeta>| {
179                let _ = runnable_sender.send(runnable);
180            };
181            let executor = LocalExecutor::new(session_id, scheduler, dispatch);
182            let root_task = executor.spawn(f(executor.clone()));
183            let _ = task_sender.send(root_task);
184            // After this drop, every strong reference to the runnable sender
185            // lives inside a spawned task or a user-held executor clone. The
186            // recv loop exits once all of those are gone.
187            drop(executor);
188
189            while let Ok(runnable) = runnable_receiver.recv() {
190                runnable.run();
191            }
192        })
193        .expect("failed to spawn dedicated thread");
194
195    task_receiver
196        .recv()
197        .expect("dedicated thread failed to produce root task")
198}
199
/// Identifier of a scheduler session.
///
/// A thin wrapper around a `u16`; equality, ordering and hashing are those of
/// the wrapped value.
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct SessionId(u16);

impl SessionId {
    /// Wraps the raw `id` in a `SessionId`.
    ///
    /// This is a `const fn`, so ids can also be built in constant contexts
    /// (for example as `const` or `static` items).
    pub const fn new(id: u16) -> Self {
        Self(id)
    }
}
208
209pub struct Timer(oneshot::Receiver<()>);
210
211impl Timer {
212    pub fn new(rx: oneshot::Receiver<()>) -> Self {
213        Timer(rx)
214    }
215}
216
217impl Future for Timer {
218    type Output = ();
219
220    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<()> {
221        match Pin::new(&mut self.0).poll(cx) {
222            Poll::Ready(_) => Poll::Ready(()),
223            Poll::Pending => Poll::Pending,
224        }
225    }
226}