open_gpui_scheduler/scheduler.rs
1mod clock;
2mod executor;
3mod test_scheduler;
4#[cfg(test)]
5mod tests;
6
7pub use clock::*;
8pub use executor::*;
9pub use test_scheduler::*;
10
11use async_task::Runnable;
12use futures::channel::oneshot;
13use std::{
14 any::Any,
15 future::Future,
16 panic::Location,
17 pin::Pin,
18 sync::Arc,
19 task::{Context, Poll},
20 thread,
21 time::Duration,
22};
23
24/// Task priority for background tasks.
25///
26/// Higher priority tasks are more likely to be scheduled before lower priority tasks,
27/// but this is not a strict guarantee - the scheduler may interleave tasks of different
28/// priorities to prevent starvation.
29#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash)]
30#[repr(u8)]
31pub enum Priority {
32 /// Realtime priority
33 ///
34 /// Spawning a task with this priority will spin it off on a separate thread dedicated just to that task. Only use for audio.
35 RealtimeAudio,
36 /// High priority - use for tasks critical to user experience/responsiveness.
37 High,
38 /// Medium priority - suitable for most use cases.
39 #[default]
40 Medium,
41 /// Low priority - use for background work that can be deprioritized.
42 Low,
43}
44
45impl Priority {
46 /// Returns the relative probability weight for this priority level.
47 /// Used by schedulers to determine task selection probability.
48 pub const fn weight(self) -> u32 {
49 match self {
50 Priority::High => 60,
51 Priority::Medium => 30,
52 Priority::Low => 10,
53 // realtime priorities are not considered for probability scheduling
54 Priority::RealtimeAudio => 0,
55 }
56 }
57}
58
59#[derive(Clone, Copy, Debug)]
60pub struct SpawnTime(pub Instant);
61
62/// Metadata attached to runnables for debugging and profiling.
63#[derive(Clone, Debug)]
64pub struct RunnableMeta {
65 /// The source location where the task was spawned.
66 pub location: &'static Location<'static>,
67 /// The moment the task was spawned.
68 pub spawned: SpawnTime,
69}
70
71impl RunnableMeta {
72 #[track_caller]
73 pub fn new_with_callers_location() -> Self {
74 Self {
75 location: core::panic::Location::caller(),
76 spawned: SpawnTime(Instant::now()),
77 }
78 }
79}
80
81pub trait Scheduler: Send + Sync {
82 /// Block until the given future completes or timeout occurs.
83 ///
84 /// Returns `true` if the future completed, `false` if it timed out.
85 /// The future is passed as a pinned mutable reference so the caller
86 /// retains ownership and can continue polling or return it on timeout.
87 fn block(
88 &self,
89 session_id: Option<SessionId>,
90 future: Pin<&mut dyn Future<Output = ()>>,
91 timeout: Option<Duration>,
92 ) -> bool;
93
94 /// Schedule a runnable on the local (session-pinned) queue for `session_id`.
95 /// Runnables scheduled here run in order on whichever thread drains the
96 /// session — the main thread for ordinary sessions, or a dedicated OS
97 /// thread for sessions created via `spawn_dedicated_thread`.
98 fn schedule_local(&self, session_id: SessionId, runnable: Runnable<RunnableMeta>);
99
100 /// Schedule a background task with the given priority.
101 fn schedule_background_with_priority(
102 &self,
103 runnable: Runnable<RunnableMeta>,
104 priority: Priority,
105 );
106
107 /// Spawn a closure on a dedicated realtime thread for audio processing.
108 fn spawn_realtime(&self, f: Box<dyn FnOnce() + Send>);
109
110 /// Schedule a background task with default (medium) priority.
111 fn schedule_background(&self, runnable: Runnable<RunnableMeta>) {
112 self.schedule_background_with_priority(runnable, Priority::default());
113 }
114
115 #[track_caller]
116 fn timer(&self, timeout: Duration) -> Timer;
117 fn clock(&self) -> Arc<dyn Clock>;
118
119 /// Spawn a closure on a fresh session pinned to its own [`LocalExecutor`].
120 ///
121 /// `PlatformScheduler` runs the closure on a new OS thread (see
122 /// [`spawn_dedicated_thread`]). `TestScheduler` runs it on the test
123 /// scheduler's loop alongside everything else so determinism under
124 /// `TestScheduler::many` is preserved.
125 ///
126 /// This is the dyn-safe entry point: the closure's output is type-erased
127 /// as `Box<dyn Any + Send + Sync>` so the trait stays object-safe.
128 /// Callers typically reach for the type-safe wrappers on
129 /// [`LocalExecutor::spawn_dedicated`] and
130 /// [`BackgroundExecutor::spawn_dedicated`], which compose this method
131 /// with [`Task::downcast`] to recover the closure's concrete return type.
132 fn spawn_dedicated(
133 self: Arc<Self>,
134 f: Box<
135 dyn FnOnce(
136 LocalExecutor,
137 )
138 -> Pin<Box<dyn Future<Output = Box<dyn Any + Send + Sync>> + 'static>>
139 + Send
140 + 'static,
141 >,
142 ) -> Task<Box<dyn Any + Send + Sync>>;
143
144 fn as_test(&self) -> Option<&TestScheduler> {
145 None
146 }
147}
148
149/// Spawn work on a fresh OS thread that's exclusive to the returned task and
150/// anything spawned on the executor it provides. Blocking syscalls inside that
151/// work don't disturb any other executor in the process.
152///
153/// `f` is called on the dedicated thread with a [`LocalExecutor`] pinned
154/// to it. The future `f` returns may freely be `!Send`. The returned `Task` is
155/// that future's task: dropping it cancels the root, but detached children
156/// keep running until they finish. The thread shuts down once the executor and
157/// every task on it are gone.
158///
159/// The caller is responsible for supplying a `session_id` that's distinct from
160/// every other live session on `scheduler`. Concrete schedulers typically wrap
161/// this in an inherent method that allocates the id from their own counter.
162pub fn spawn_dedicated_thread<F, Fut>(
163 session_id: SessionId,
164 scheduler: Arc<dyn Scheduler>,
165 f: F,
166) -> Task<Fut::Output>
167where
168 F: FnOnce(LocalExecutor) -> Fut + Send + 'static,
169 Fut: Future + 'static,
170 Fut::Output: Send + 'static,
171{
172 let (runnable_sender, runnable_receiver) = flume::unbounded::<Runnable<RunnableMeta>>();
173 let (task_sender, task_receiver) = flume::bounded::<Task<Fut::Output>>(1);
174
175 thread::Builder::new()
176 .name(format!("spawn_dedicated session {:?}", session_id))
177 .spawn(move || {
178 let dispatch = move |runnable: Runnable<RunnableMeta>| {
179 let _ = runnable_sender.send(runnable);
180 };
181 let executor = LocalExecutor::new(session_id, scheduler, dispatch);
182 let root_task = executor.spawn(f(executor.clone()));
183 let _ = task_sender.send(root_task);
184 // After this drop, every strong reference to the runnable sender
185 // lives inside a spawned task or a user-held executor clone. The
186 // recv loop exits once all of those are gone.
187 drop(executor);
188
189 while let Ok(runnable) = runnable_receiver.recv() {
190 runnable.run();
191 }
192 })
193 .expect("failed to spawn dedicated thread");
194
195 task_receiver
196 .recv()
197 .expect("dedicated thread failed to produce root task")
198}
199
200#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
201pub struct SessionId(u16);
202
203impl SessionId {
204 pub fn new(id: u16) -> Self {
205 SessionId(id)
206 }
207}
208
209pub struct Timer(oneshot::Receiver<()>);
210
211impl Timer {
212 pub fn new(rx: oneshot::Receiver<()>) -> Self {
213 Timer(rx)
214 }
215}
216
217impl Future for Timer {
218 type Output = ();
219
220 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<()> {
221 match Pin::new(&mut self.0).poll(cx) {
222 Poll::Ready(_) => Poll::Ready(()),
223 Poll::Pending => Poll::Pending,
224 }
225 }
226}