kozan_scheduler/executor.rs
1//! Local executor — single-threaded `!Send` async runtime.
2//!
3//! Each window thread has one `LocalExecutor`. It polls `!Send` futures
4//! on the window thread — this is what makes `Handle` (which is `Send`
5//! but `!Sync`) safe to use across `.await` points.
6//!
7//! # How it works
8//!
9//! ```text
10//! User code: let data = fetch(url).await;
11//! btn.set_text(&data.title);
12//!
13//! Internally:
14//! 1. ctx.spawn(future) → wraps future in a LocalTask
15//! 2. LocalTask stored in slab (Vec + free-list)
//! 3. Executor::poll_all() → polls newly spawned tasks, then woken tasks
//! 4. If future yields (Pending) → Waker stored
//! 5. Background thread completes → Waker::wake() sets the task's woken flag
19//! 6. Next poll_all() → polls only woken tasks → Ready(data)
20//! 7. Continuation runs on window thread → btn.set_text() safe
21//! ```
22//!
23//! # Chrome mapping
24//!
25//! Chrome doesn't have an async executor (it uses C++ callbacks).
26//! But the concept maps to Chrome's `PostTaskAndReplyWithResult()`:
27//! spawn work on pool → callback on originating sequence.
28//! Our executor gives the same guarantee via Rust's async/await.
29//!
30//! # Performance
31//!
//! - Tasks stored in a `Vec` with a free-list (no `HashMap` overhead).
//! - Waker uses `Arc<WakerInner>` (an `AtomicBool` flag plus an optional
//!   notify callback) — `Send + Sync`, safe from any thread.
//! - `poll_all()` polls only woken tasks; an idle task costs a single atomic
//!   flag check during the scan over the task slots.
35//!
36//! # Waker safety
37//!
//! The `Waker` is backed by `Arc<WakerInner>` — fully `Send + Sync`.
//! Background threads (tokio, rayon, `std::thread`) can call `waker.wake()`
//! safely. The atomic `woken` flag is checked by `poll_all()` on the window thread.
41
42use std::collections::VecDeque;
43use std::future::Future;
44use std::pin::Pin;
45use std::sync::Arc;
46use std::sync::atomic::{AtomicBool, Ordering};
47use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
48
/// Shared state behind every task `Waker`.
///
/// Each `RawWaker` built by this module carries a raw `Arc<WakerInner>`
/// pointer. Bundling the flag with a `notify` hook (instead of using a bare
/// `AtomicBool`) lets a single `wake()` both mark the task as ready and
/// signal the thread that drives the executor.
pub(crate) struct WakerInner {
    /// `true` while the task is waiting to be polled again.
    /// Written atomically, so any thread may set it.
    pub(crate) woken: AtomicBool,

    /// Hook invoked on every `wake()`; `None` when no hook was installed.
    ///
    /// The executor copies its `set_notify` callback into this field when a
    /// task is spawned. The callback is meant to send a "please tick" signal
    /// so the event loop stops parking and runs `poll_all` again.
    notify: Option<Arc<dyn Fn() + Send + Sync>>,
}
67
/// Identifier of a task spawned on a `LocalExecutor`; use it to query completion.
///
/// The wrapped value is the task's slot index inside the executor. Slots are
/// recycled once a task finishes, so an id is only meaningful until its slot
/// has been handed to a newer task.
///
/// Derives `Hash` (in addition to equality) so ids can be used as keys in
/// `HashMap`/`HashSet` when callers track their own per-task bookkeeping.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct TaskId(usize);
71
72/// A `!Send` future stored in the executor.
73type BoxLocalFuture = Pin<Box<dyn Future<Output = ()>>>;
74
75/// State of a single spawned task.
76struct LocalTask {
77 /// The future being polled.
78 future: BoxLocalFuture,
79
80 /// Shared waker inner — holds the woken flag and optional notify.
81 /// `Arc<WakerInner>` is `Send + Sync`, safe to set from ANY thread.
82 inner: Arc<WakerInner>,
83
84 /// Whether this task has completed (Ready).
85 completed: bool,
86}
87
88/// Single-threaded async executor for `!Send` futures.
89///
90/// This is the core of Kozan's async story. It allows user code like:
91///
92/// ```ignore
93/// let doc = ctx.document();
94/// let btn = doc.create::<HtmlButtonElement>();
95///
96/// // This future is !Send — it captures `btn` (which contains Handle).
97/// ctx.spawn(async move {
98/// let data = fetch("https://api.example.com").await;
99/// btn.set_text(&data.title); // safe! same thread
100/// });
101/// ```
102///
103/// # Design
104///
105/// Tasks are stored in a `Vec` with a free-list for O(1) reuse.
106/// Each task has a `woken` flag (`Arc<AtomicBool>`) — the `Waker` sets
107/// this atomically when the task should be polled again (safe from any thread).
108/// `poll_all()` only touches woken tasks — O(k) where k is woken count.
109///
110/// # Waker thread-safety
111///
112/// The `Waker` is backed by `Arc<AtomicBool>` which is `Send + Sync`.
113/// This means background threads (tokio runtime, rayon pool, `std::thread`)
114/// can safely call `waker.wake()` to signal that an I/O operation completed.
115/// This is the primary use case: `fetch(url).await` spawns HTTP work on
116/// a tokio runtime, and the completion callback calls our `Waker` from
117/// that background thread.
118pub struct LocalExecutor {
119 /// All spawned tasks. Completed tasks are `None` (slot freed).
120 tasks: Vec<Option<LocalTask>>,
121
122 /// Free indices for reuse (FIFO — oldest freed slot reused first).
123 free: VecDeque<usize>,
124
125 /// Newly spawned tasks that need initial polling.
126 spawn_queue: VecDeque<usize>,
127
128 /// Called when any waker fires from a background thread.
129 ///
130 /// Injected by the platform layer via `set_notify()`. Sends a signal to
131 /// the view thread's event channel so it stops parking and runs `tick()`.
132 notify: Option<Arc<dyn Fn() + Send + Sync>>,
133}
134
135impl LocalExecutor {
136 /// Create a new empty executor.
137 #[must_use]
138 pub fn new() -> Self {
139 Self {
140 tasks: Vec::new(),
141 free: VecDeque::new(),
142 spawn_queue: VecDeque::new(),
143 notify: None,
144 }
145 }
146
147 /// Wire a "wake the event loop" callback into every future's waker.
148 ///
149 /// When any future's `Waker` is called from a background thread,
150 /// `notify` is invoked in addition to setting the woken flag.
151 /// This lets the scheduler thread stop parking and call `poll_all()`.
152 ///
153 /// Call this once after construction, before spawning any futures.
154 pub fn set_notify(&mut self, notify: Arc<dyn Fn() + Send + Sync>) {
155 self.notify = Some(notify);
156 }
157
158 /// Spawn a `!Send` future on this executor.
159 ///
160 /// The future will be polled on the window thread.
161 /// Returns a [`TaskId`] that can be used to check completion.
162 ///
163 /// ```ignore
164 /// ctx.spawn(async {
165 /// let data = fetch(url).await;
166 /// node.set_text(&data); // !Send — safe on window thread
167 /// });
168 /// ```
169 pub fn spawn(&mut self, future: impl Future<Output = ()> + 'static) -> TaskId {
170 let task = LocalTask {
171 future: Box::pin(future),
172 inner: Arc::new(WakerInner {
173 woken: AtomicBool::new(true), // newly spawned = needs first poll
174 notify: self.notify.clone(),
175 }),
176 completed: false,
177 };
178
179 let id = if let Some(idx) = self.free.pop_front() {
180 self.tasks[idx] = Some(task);
181 idx
182 } else {
183 let idx = self.tasks.len();
184 self.tasks.push(Some(task));
185 idx
186 };
187
188 self.spawn_queue.push_back(id);
189 TaskId(id)
190 }
191
192 /// Poll all woken tasks. Returns the number of tasks that made progress.
193 ///
194 /// Call this in the event loop after processing cross-thread wake-ups.
195 /// Only polls tasks whose `Waker` has been invoked — idle tasks are skipped.
196 ///
197 /// A task that returns `Poll::Ready(())` is immediately cleaned up.
198 /// A task that returns `Poll::Pending` stays until woken again.
199 ///
200 /// # Complexity
201 ///
202 /// O(s + w) where s = spawn queue length, w = woken task count.
203 /// The scan over `tasks` checks only the atomic `woken` flag (branch prediction
204 /// favors the not-woken path for idle tasks).
205 pub fn poll_all(&mut self) -> usize {
206 let mut progress = 0;
207
208 // Phase 1: Poll newly spawned tasks.
209 while let Some(id) = self.spawn_queue.pop_front() {
210 if self.poll_task(id) {
211 progress += 1;
212 }
213 // Immediate cleanup if completed during first poll.
214 if self.tasks[id].as_ref().is_some_and(|t| t.completed) {
215 self.tasks[id] = None;
216 self.free.push_back(id);
217 }
218 }
219
220 // Phase 2: Poll woken tasks (skip idle ones via atomic flag check).
221 for id in 0..self.tasks.len() {
222 let Some(task) = &self.tasks[id] else {
223 continue;
224 };
225 if task.completed || !task.inner.woken.load(Ordering::Acquire) {
226 continue;
227 }
228
229 if self.poll_task(id) {
230 progress += 1;
231 }
232
233 // Immediate cleanup if completed.
234 if self.tasks[id].as_ref().is_some_and(|t| t.completed) {
235 self.tasks[id] = None;
236 self.free.push_back(id);
237 }
238 }
239
240 progress
241 }
242
243 /// Poll a single task by index. Returns true if polled (regardless of result).
244 fn poll_task(&mut self, id: usize) -> bool {
245 let Some(task) = &mut self.tasks[id] else {
246 return false;
247 };
248 if task.completed {
249 return false;
250 }
251
252 // Clear woken flag BEFORE polling — if the future wakes itself
253 // during poll, the flag will be set again atomically.
254 task.inner.woken.store(false, Ordering::Release);
255
256 // Create a Waker for this task.
257 let waker = create_waker(task.inner.clone());
258 let mut cx = Context::from_waker(&waker);
259
260 match task.future.as_mut().poll(&mut cx) {
261 Poll::Ready(()) => {
262 task.completed = true;
263 }
264 Poll::Pending => {
265 // Task will be polled again when woken.
266 }
267 }
268
269 true
270 }
271
272 /// Whether a specific task has completed.
273 ///
274 /// Returns `true` for completed tasks and cleaned-up slots.
275 /// Panics for out-of-range `TaskId`s.
276 #[must_use]
277 pub fn is_completed(&self, id: TaskId) -> bool {
278 match self.tasks.get(id.0) {
279 Some(Some(task)) => task.completed,
280 Some(None) => true, // cleaned up = completed
281 None => {
282 panic!("TaskId({}) out of range (max: {})", id.0, self.tasks.len());
283 }
284 }
285 }
286
287 /// Number of active (non-completed) tasks.
288 #[must_use]
289 pub fn active_count(&self) -> usize {
290 self.tasks
291 .iter()
292 .filter(|slot| slot.as_ref().is_some_and(|t| !t.completed))
293 .count()
294 }
295
296 /// Whether the executor has no active tasks.
297 #[must_use]
298 pub fn is_idle(&self) -> bool {
299 self.active_count() == 0 && self.spawn_queue.is_empty()
300 }
301
302 /// Whether any task is woken and needs polling.
303 #[must_use]
304 pub fn has_woken(&self) -> bool {
305 if !self.spawn_queue.is_empty() {
306 return true;
307 }
308 self.tasks.iter().any(|slot| {
309 slot.as_ref()
310 .is_some_and(|t| !t.completed && t.inner.woken.load(Ordering::Acquire))
311 })
312 }
313}
314
315impl Default for LocalExecutor {
316 fn default() -> Self {
317 Self::new()
318 }
319}
320
321// ---- Waker implementation ----
322//
// Backed by `Arc<WakerInner>` — fully `Send + Sync`.
324// Safe to call `wake()` from ANY thread (tokio, rayon, std::thread).
325//
326// The Waker stores a raw pointer to the Arc's inner data.
327// We manually manage the Arc reference count via clone/drop.
328
329/// Create a `Waker` backed by `Arc<WakerInner>`.
330///
331/// When woken from any thread this:
332/// 1. Atomically sets the `woken` flag so `poll_all()` will re-poll the task.
333/// 2. Calls `notify` (if set) to unpark the view thread from its sleep.
334fn create_waker(inner: Arc<WakerInner>) -> Waker {
335 let raw = Arc::into_raw(inner) as *const ();
336 let raw_waker = RawWaker::new(raw, &VTABLE);
337 // SAFETY: vtable correctly manages the Arc<WakerInner> refcount.
338 // Arc<WakerInner> is Send + Sync — safe to call from any thread.
339 unsafe { Waker::from_raw(raw_waker) }
340}
341
342const VTABLE: RawWakerVTable =
343 RawWakerVTable::new(waker_clone, waker_wake, waker_wake_by_ref, waker_drop);
344
345/// Clone: increment Arc refcount, return a new `RawWaker`.
346unsafe fn waker_clone(ptr: *const ()) -> RawWaker {
347 let arc = unsafe { Arc::from_raw(ptr as *const WakerInner) };
348 let cloned = arc.clone();
349 std::mem::forget(arc); // don't drop — cloning, not moving
350 RawWaker::new(Arc::into_raw(cloned) as *const (), &VTABLE)
351}
352
353/// Wake by value: set flag + notify, then drop the Arc.
354unsafe fn waker_wake(ptr: *const ()) {
355 let arc = unsafe { Arc::from_raw(ptr as *const WakerInner) };
356 arc.woken.store(true, Ordering::Release);
357 if let Some(notify) = &arc.notify {
358 notify();
359 }
360 // arc dropped here — decrements refcount.
361}
362
363/// Wake by reference: set flag + notify, don't drop.
364unsafe fn waker_wake_by_ref(ptr: *const ()) {
365 let arc = unsafe { Arc::from_raw(ptr as *const WakerInner) };
366 arc.woken.store(true, Ordering::Release);
367 if let Some(notify) = &arc.notify {
368 notify();
369 }
370 std::mem::forget(arc); // don't drop — by-ref
371}
372
373/// Drop: decrement Arc refcount.
374unsafe fn waker_drop(ptr: *const ()) {
375 let _arc = unsafe { Arc::from_raw(ptr as *const WakerInner) };
376 // _arc dropped here — decrements refcount.
377}
378
#[cfg(test)]
mod tests {
    //! Unit tests for the local executor and its waker plumbing.

    use super::*;
    use std::cell::{Cell, RefCell};
    use std::rc::Rc;
    use std::task::Waker;

    /// A future that finishes on its first poll is run and cleaned up by a
    /// single `poll_all()`.
    #[test]
    fn spawn_and_complete_immediate() {
        let mut exec = LocalExecutor::new();
        let done = Rc::new(Cell::new(false));

        let d = done.clone();
        let id = exec.spawn(async move {
            d.set(true);
        });

        assert_eq!(exec.active_count(), 1);
        exec.poll_all();
        assert!(done.get());
        assert!(exec.is_completed(id));
        assert!(exec.is_idle());
    }

    /// A pending task is resumed only after its waker fires.
    #[test]
    fn spawn_pending_then_wake() {
        let mut exec = LocalExecutor::new();
        let counter = Rc::new(Cell::new(0u32));
        let waker_holder: Rc<RefCell<Option<Waker>>> = Rc::new(RefCell::new(None));

        let c = counter.clone();
        let wh = waker_holder.clone();
        exec.spawn(async move {
            std::future::poll_fn(|cx| {
                let count = c.get();
                if count == 0 {
                    *wh.borrow_mut() = Some(cx.waker().clone());
                    c.set(1);
                    Poll::Pending
                } else {
                    c.set(2);
                    Poll::Ready(())
                }
            })
            .await;
        });

        // First poll — task yields.
        exec.poll_all();
        assert_eq!(counter.get(), 1);
        assert!(!exec.is_idle());

        // Wake the task (simulating background completion).
        waker_holder.borrow().as_ref().unwrap().wake_by_ref();

        // Second poll — task completes.
        exec.poll_all();
        assert_eq!(counter.get(), 2);
        assert!(exec.is_idle());
    }

    /// The waker can be moved to, and fired from, another thread.
    #[test]
    fn wake_from_another_thread() {
        // This test verifies the Waker is truly Send + Sync.
        let mut exec = LocalExecutor::new();
        let waker_holder: Arc<std::sync::Mutex<Option<Waker>>> =
            Arc::new(std::sync::Mutex::new(None));

        let wh = waker_holder.clone();
        exec.spawn(async move {
            std::future::poll_fn(|cx| {
                let mut guard = wh.lock().unwrap();
                if guard.is_none() {
                    *guard = Some(cx.waker().clone());
                    Poll::Pending
                } else {
                    Poll::Ready(())
                }
            })
            .await;
        });

        // First poll — future stores waker and yields.
        exec.poll_all();
        assert!(!exec.is_idle());

        // Wake from another thread — this is the PRIMARY use case
        // (tokio/rayon background thread completing an I/O operation).
        let wh = waker_holder.clone();
        let handle = std::thread::spawn(move || {
            let guard = wh.lock().unwrap();
            guard.as_ref().unwrap().wake_by_ref();
        });
        handle.join().unwrap();

        // Back on "window thread" — poll completes the task.
        exec.poll_all();
        assert!(exec.is_idle());
    }

    /// Newly spawned tasks receive their first poll in spawn order.
    #[test]
    fn multiple_tasks() {
        let mut exec = LocalExecutor::new();
        let log = Rc::new(RefCell::new(Vec::new()));

        for i in 0..5 {
            let l = log.clone();
            exec.spawn(async move {
                l.borrow_mut().push(i);
            });
        }

        exec.poll_all();
        assert_eq!(*log.borrow(), vec![0, 1, 2, 3, 4]);
        assert!(exec.is_idle());
    }

    /// A freed slot is handed to the next spawned task.
    #[test]
    fn task_id_reuse() {
        let mut exec = LocalExecutor::new();

        // Spawn and complete.
        let id1 = exec.spawn(async {});
        exec.poll_all();
        assert!(exec.is_completed(id1));

        // Spawn again — should reuse the slot.
        let id2 = exec.spawn(async {});
        assert_eq!(id1.0, id2.0); // same index
        exec.poll_all();
    }

    /// `has_woken` tracks both fresh spawns and waker activity.
    #[test]
    fn has_woken() {
        let mut exec = LocalExecutor::new();
        assert!(!exec.has_woken());

        let wh: Rc<RefCell<Option<Waker>>> = Rc::new(RefCell::new(None));
        let wh2 = wh.clone();
        exec.spawn(async move {
            std::future::poll_fn(|cx| {
                *wh2.borrow_mut() = Some(cx.waker().clone());
                Poll::<()>::Pending
            })
            .await;
        });

        assert!(exec.has_woken()); // newly spawned
        exec.poll_all();
        assert!(!exec.has_woken()); // pending, not woken

        // Wake from "background".
        wh.borrow().as_ref().unwrap().wake_by_ref();
        assert!(exec.has_woken());
    }

    /// Cloning and dropping wakers keeps the `Arc` refcount balanced.
    #[test]
    fn waker_clone_and_drop() {
        // Ensure waker clone/drop doesn't leak or double-free.
        let inner = Arc::new(WakerInner {
            woken: AtomicBool::new(false),
            notify: None,
        });
        let waker = create_waker(inner.clone());
        assert_eq!(Arc::strong_count(&inner), 2);

        let waker2 = waker.clone();
        assert_eq!(Arc::strong_count(&inner), 3);
        drop(waker);
        assert_eq!(Arc::strong_count(&inner), 2);

        waker2.wake_by_ref();
        assert!(inner.woken.load(Ordering::Acquire));

        drop(waker2);
        // Only the test's own handle is left — refcount managed correctly.
        assert_eq!(Arc::strong_count(&inner), 1);
    }

    /// `is_completed` is `false` for a live task and `true` once the task
    /// has finished and its slot was cleaned up.
    #[test]
    fn completed_task_query() {
        let mut exec = LocalExecutor::new();

        // A task that never finishes stays "not completed" across polls.
        let pending = exec.spawn(std::future::pending::<()>());
        assert!(!exec.is_completed(pending));
        exec.poll_all();
        assert!(!exec.is_completed(pending));

        // Querying a cleaned-up slot returns true.
        let done = exec.spawn(async {});
        exec.poll_all();
        assert!(exec.is_completed(done));
        assert!(!exec.is_completed(pending));
    }

    /// Ids that were never handed out make `is_completed` panic.
    #[test]
    #[should_panic(expected = "out of range")]
    fn out_of_range_task_query_panics() {
        let exec = LocalExecutor::new();
        let _ = exec.is_completed(TaskId(0));
    }

    /// The hook installed with `set_notify` runs when a task's waker fires.
    #[test]
    fn notify_called_on_wake() {
        let mut exec = LocalExecutor::new();
        let notified = Arc::new(AtomicBool::new(false));
        let n = notified.clone();
        exec.set_notify(Arc::new(move || n.store(true, Ordering::SeqCst)));

        let wh: Rc<RefCell<Option<Waker>>> = Rc::new(RefCell::new(None));
        let wh2 = wh.clone();
        exec.spawn(async move {
            std::future::poll_fn(|cx| {
                *wh2.borrow_mut() = Some(cx.waker().clone());
                Poll::<()>::Pending
            })
            .await;
        });

        exec.poll_all();
        assert!(!notified.load(Ordering::SeqCst));

        wh.borrow().as_ref().unwrap().wake_by_ref();
        assert!(notified.load(Ordering::SeqCst));
        assert!(exec.has_woken());
    }

    /// Completed tasks are reaped by the same `poll_all()` that finished them.
    #[test]
    fn immediate_cleanup_on_complete() {
        let mut exec = LocalExecutor::new();
        let id = exec.spawn(async {});
        exec.poll_all();

        // Task should be cleaned up immediately, not in a separate pass.
        assert!(exec.is_completed(id));
        assert!(exec.is_idle());
        // Slot should be freed for reuse.
        let id2 = exec.spawn(async {});
        assert_eq!(id.0, id2.0);
    }
}
574}