Skip to main content

kozan_scheduler/
executor.rs

1//! Local executor — single-threaded `!Send` async runtime.
2//!
3//! Each window thread has one `LocalExecutor`. It polls `!Send` futures
4//! on the window thread — this is what makes `Handle` (which is `Send`
5//! but `!Sync`) safe to use across `.await` points.
6//!
7//! # How it works
8//!
9//! ```text
10//! User code:  let data = fetch(url).await;
11//!             btn.set_text(&data.title);
12//!
13//! Internally:
14//! 1. ctx.spawn(future) → wraps future in a LocalTask
15//! 2. LocalTask stored in slab (Vec + free-list)
//! 3. Executor::poll_all() → polls newly spawned tasks, then every task whose woken flag is set
//! 4. If future yields (Pending) → Waker stored
//! 5. Background thread completes → Waker::wake() sets the task's woken flag (and calls notify)
19//! 6. Next poll_all() → polls only woken tasks → Ready(data)
20//! 7. Continuation runs on window thread → btn.set_text() safe
21//! ```
22//!
23//! # Chrome mapping
24//!
25//! Chrome doesn't have an async executor (it uses C++ callbacks).
26//! But the concept maps to Chrome's `PostTaskAndReplyWithResult()`:
27//! spawn work on pool → callback on originating sequence.
28//! Our executor gives the same guarantee via Rust's async/await.
29//!
30//! # Performance
31//!
32//! - Tasks stored in a Vec with free-list (no `HashMap` overhead).
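//! - Waker uses `Arc<WakerInner>` (an `AtomicBool` flag plus an optional notify
//!   callback) — `Send + Sync`, safe from any thread.
//! - `poll_all()` scans the task slots and polls only those whose `woken` flag
//!   is set — an idle task costs a single atomic load.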
35//!
36//! # Waker safety
37//!
//! The `Waker` is backed by `Arc<WakerInner>` — fully `Send + Sync`.
39//! Background threads (tokio, rayon, `std::thread`) can call `waker.wake()`
40//! safely. The atomic flag is checked by `poll_all()` on the window thread.
41
42use std::collections::VecDeque;
43use std::future::Future;
44use std::pin::Pin;
45use std::sync::Arc;
46use std::sync::atomic::{AtomicBool, Ordering};
47use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
48
/// Shared state behind a task's `Waker`.
///
/// An `Arc<WakerInner>` is the data pointer stored in every `RawWaker`
/// built by `create_waker`. Using a struct (instead of a bare `AtomicBool`)
/// lets the waker carry a `notify` callback alongside the flag — a call to
/// `wake()` both marks the task as ready AND invokes the callback.
pub(crate) struct WakerInner {
    /// Set to `true` when the task should be polled again.
    ///
    /// Written by the waker functions and cleared by the executor right
    /// before it polls the task. Atomically writable from any thread.
    pub(crate) woken: AtomicBool,

    /// Optional callback invoked every time the waker fires.
    ///
    /// `LocalExecutor::spawn` copies the executor's current `notify` value
    /// into this field, so it is `None` unless `set_notify` was called before
    /// the task was spawned. The callback is expected to send a "please tick"
    /// signal so the event loop runs `poll_all` again.
    notify: Option<Arc<dyn Fn() + Send + Sync>>,
}
67
/// A handle to a spawned local task. Can be used to check completion.
///
/// Wraps the task's slot index inside the executor. Slots are reused once a
/// task completes, so a `TaskId` kept after completion may later compare
/// equal to the id of a newer task occupying the same slot.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct TaskId(usize);
71
/// A boxed, pinned future with unit output, as stored in the executor.
///
/// The trait object carries no `Send` bound, so the future may capture
/// `!Send` state.
type BoxLocalFuture = Pin<Box<dyn Future<Output = ()>>>;
74
/// State of a single spawned task, stored in one executor slot.
struct LocalTask {
    /// The future being polled.
    future: BoxLocalFuture,

    /// Shared waker state — holds the woken flag and the optional notify
    /// callback. Every `Waker` handed to `future` points at this same
    /// allocation, so the flag can be set from ANY thread.
    inner: Arc<WakerInner>,

    /// Whether this task has completed (its future returned `Poll::Ready`).
    completed: bool,
}
87
/// Single-threaded async executor for `!Send` futures.
///
/// This is the core of Kozan's async story. It allows user code like:
///
/// ```ignore
/// let doc = ctx.document();
/// let btn = doc.create::<HtmlButtonElement>();
///
/// // This future is !Send — it captures `btn` (which contains Handle).
/// ctx.spawn(async move {
///     let data = fetch("https://api.example.com").await;
///     btn.set_text(&data.title);  // safe! same thread
/// });
/// ```
///
/// # Design
///
/// Tasks are stored in a `Vec` of optional slots with a free-list for O(1)
/// slot reuse. Each task owns an `Arc<WakerInner>` whose `woken` flag is set
/// atomically by the task's `Waker` (safe from any thread).
/// `poll_all()` polls newly spawned tasks first, then walks the slots and
/// polls only the tasks whose flag is set.
///
/// # Waker thread-safety
///
/// The `Waker` is backed by `Arc<WakerInner>` which is `Send + Sync`.
/// This means background threads (tokio runtime, rayon pool, `std::thread`)
/// can safely call `waker.wake()` to signal that an I/O operation completed.
/// This is the primary use case: `fetch(url).await` spawns HTTP work on
/// a background runtime, and the completion callback calls our `Waker` from
/// that background thread.
pub struct LocalExecutor {
    /// All spawned tasks. A `None` slot is free (its task completed and was
    /// cleaned up, or it has been handed out again via `free`).
    tasks: Vec<Option<LocalTask>>,

    /// Free indices for reuse (FIFO — oldest freed slot reused first).
    free: VecDeque<usize>,

    /// Newly spawned tasks that need initial polling.
    spawn_queue: VecDeque<usize>,

    /// Callback copied into the waker state of every task spawned after
    /// `set_notify()` is called; `None` until then.
    ///
    /// Injected by the platform layer. It is invoked whenever one of those
    /// wakers fires, so the event loop can stop parking and poll again.
    notify: Option<Arc<dyn Fn() + Send + Sync>>,
}
134
135impl LocalExecutor {
136    /// Create a new empty executor.
137    #[must_use]
138    pub fn new() -> Self {
139        Self {
140            tasks: Vec::new(),
141            free: VecDeque::new(),
142            spawn_queue: VecDeque::new(),
143            notify: None,
144        }
145    }
146
147    /// Wire a "wake the event loop" callback into every future's waker.
148    ///
149    /// When any future's `Waker` is called from a background thread,
150    /// `notify` is invoked in addition to setting the woken flag.
151    /// This lets the scheduler thread stop parking and call `poll_all()`.
152    ///
153    /// Call this once after construction, before spawning any futures.
154    pub fn set_notify(&mut self, notify: Arc<dyn Fn() + Send + Sync>) {
155        self.notify = Some(notify);
156    }
157
158    /// Spawn a `!Send` future on this executor.
159    ///
160    /// The future will be polled on the window thread.
161    /// Returns a [`TaskId`] that can be used to check completion.
162    ///
163    /// ```ignore
164    /// ctx.spawn(async {
165    ///     let data = fetch(url).await;
166    ///     node.set_text(&data);  // !Send — safe on window thread
167    /// });
168    /// ```
169    pub fn spawn(&mut self, future: impl Future<Output = ()> + 'static) -> TaskId {
170        let task = LocalTask {
171            future: Box::pin(future),
172            inner: Arc::new(WakerInner {
173                woken: AtomicBool::new(true), // newly spawned = needs first poll
174                notify: self.notify.clone(),
175            }),
176            completed: false,
177        };
178
179        let id = if let Some(idx) = self.free.pop_front() {
180            self.tasks[idx] = Some(task);
181            idx
182        } else {
183            let idx = self.tasks.len();
184            self.tasks.push(Some(task));
185            idx
186        };
187
188        self.spawn_queue.push_back(id);
189        TaskId(id)
190    }
191
192    /// Poll all woken tasks. Returns the number of tasks that made progress.
193    ///
194    /// Call this in the event loop after processing cross-thread wake-ups.
195    /// Only polls tasks whose `Waker` has been invoked — idle tasks are skipped.
196    ///
197    /// A task that returns `Poll::Ready(())` is immediately cleaned up.
198    /// A task that returns `Poll::Pending` stays until woken again.
199    ///
200    /// # Complexity
201    ///
202    /// O(s + w) where s = spawn queue length, w = woken task count.
203    /// The scan over `tasks` checks only the atomic `woken` flag (branch prediction
204    /// favors the not-woken path for idle tasks).
205    pub fn poll_all(&mut self) -> usize {
206        let mut progress = 0;
207
208        // Phase 1: Poll newly spawned tasks.
209        while let Some(id) = self.spawn_queue.pop_front() {
210            if self.poll_task(id) {
211                progress += 1;
212            }
213            // Immediate cleanup if completed during first poll.
214            if self.tasks[id].as_ref().is_some_and(|t| t.completed) {
215                self.tasks[id] = None;
216                self.free.push_back(id);
217            }
218        }
219
220        // Phase 2: Poll woken tasks (skip idle ones via atomic flag check).
221        for id in 0..self.tasks.len() {
222            let Some(task) = &self.tasks[id] else {
223                continue;
224            };
225            if task.completed || !task.inner.woken.load(Ordering::Acquire) {
226                continue;
227            }
228
229            if self.poll_task(id) {
230                progress += 1;
231            }
232
233            // Immediate cleanup if completed.
234            if self.tasks[id].as_ref().is_some_and(|t| t.completed) {
235                self.tasks[id] = None;
236                self.free.push_back(id);
237            }
238        }
239
240        progress
241    }
242
243    /// Poll a single task by index. Returns true if polled (regardless of result).
244    fn poll_task(&mut self, id: usize) -> bool {
245        let Some(task) = &mut self.tasks[id] else {
246            return false;
247        };
248        if task.completed {
249            return false;
250        }
251
252        // Clear woken flag BEFORE polling — if the future wakes itself
253        // during poll, the flag will be set again atomically.
254        task.inner.woken.store(false, Ordering::Release);
255
256        // Create a Waker for this task.
257        let waker = create_waker(task.inner.clone());
258        let mut cx = Context::from_waker(&waker);
259
260        match task.future.as_mut().poll(&mut cx) {
261            Poll::Ready(()) => {
262                task.completed = true;
263            }
264            Poll::Pending => {
265                // Task will be polled again when woken.
266            }
267        }
268
269        true
270    }
271
272    /// Whether a specific task has completed.
273    ///
274    /// Returns `true` for completed tasks and cleaned-up slots.
275    /// Panics for out-of-range `TaskId`s.
276    #[must_use]
277    pub fn is_completed(&self, id: TaskId) -> bool {
278        match self.tasks.get(id.0) {
279            Some(Some(task)) => task.completed,
280            Some(None) => true, // cleaned up = completed
281            None => {
282                panic!("TaskId({}) out of range (max: {})", id.0, self.tasks.len());
283            }
284        }
285    }
286
287    /// Number of active (non-completed) tasks.
288    #[must_use]
289    pub fn active_count(&self) -> usize {
290        self.tasks
291            .iter()
292            .filter(|slot| slot.as_ref().is_some_and(|t| !t.completed))
293            .count()
294    }
295
296    /// Whether the executor has no active tasks.
297    #[must_use]
298    pub fn is_idle(&self) -> bool {
299        self.active_count() == 0 && self.spawn_queue.is_empty()
300    }
301
302    /// Whether any task is woken and needs polling.
303    #[must_use]
304    pub fn has_woken(&self) -> bool {
305        if !self.spawn_queue.is_empty() {
306            return true;
307        }
308        self.tasks.iter().any(|slot| {
309            slot.as_ref()
310                .is_some_and(|t| !t.completed && t.inner.woken.load(Ordering::Acquire))
311        })
312    }
313}
314
315impl Default for LocalExecutor {
316    fn default() -> Self {
317        Self::new()
318    }
319}
320
321// ---- Waker implementation ----
322//
// Backed by `Arc<WakerInner>` — fully `Send + Sync`.
324// Safe to call `wake()` from ANY thread (tokio, rayon, std::thread).
325//
326// The Waker stores a raw pointer to the Arc's inner data.
327// We manually manage the Arc reference count via clone/drop.
328
329/// Create a `Waker` backed by `Arc<WakerInner>`.
330///
331/// When woken from any thread this:
332/// 1. Atomically sets the `woken` flag so `poll_all()` will re-poll the task.
333/// 2. Calls `notify` (if set) to unpark the view thread from its sleep.
334fn create_waker(inner: Arc<WakerInner>) -> Waker {
335    let raw = Arc::into_raw(inner) as *const ();
336    let raw_waker = RawWaker::new(raw, &VTABLE);
337    // SAFETY: vtable correctly manages the Arc<WakerInner> refcount.
338    // Arc<WakerInner> is Send + Sync — safe to call from any thread.
339    unsafe { Waker::from_raw(raw_waker) }
340}
341
342const VTABLE: RawWakerVTable =
343    RawWakerVTable::new(waker_clone, waker_wake, waker_wake_by_ref, waker_drop);
344
345/// Clone: increment Arc refcount, return a new `RawWaker`.
346unsafe fn waker_clone(ptr: *const ()) -> RawWaker {
347    let arc = unsafe { Arc::from_raw(ptr as *const WakerInner) };
348    let cloned = arc.clone();
349    std::mem::forget(arc); // don't drop — cloning, not moving
350    RawWaker::new(Arc::into_raw(cloned) as *const (), &VTABLE)
351}
352
353/// Wake by value: set flag + notify, then drop the Arc.
354unsafe fn waker_wake(ptr: *const ()) {
355    let arc = unsafe { Arc::from_raw(ptr as *const WakerInner) };
356    arc.woken.store(true, Ordering::Release);
357    if let Some(notify) = &arc.notify {
358        notify();
359    }
360    // arc dropped here — decrements refcount.
361}
362
363/// Wake by reference: set flag + notify, don't drop.
364unsafe fn waker_wake_by_ref(ptr: *const ()) {
365    let arc = unsafe { Arc::from_raw(ptr as *const WakerInner) };
366    arc.woken.store(true, Ordering::Release);
367    if let Some(notify) = &arc.notify {
368        notify();
369    }
370    std::mem::forget(arc); // don't drop — by-ref
371}
372
373/// Drop: decrement Arc refcount.
374unsafe fn waker_drop(ptr: *const ()) {
375    let _arc = unsafe { Arc::from_raw(ptr as *const WakerInner) };
376    // _arc dropped here — decrements refcount.
377}
378
#[cfg(test)]
mod tests {
    //! Unit tests for `LocalExecutor` and its hand-rolled waker.

    use super::*;
    use std::cell::{Cell, RefCell};
    use std::rc::Rc;
    use std::task::Waker;

    /// A future that is ready on its first poll completes within one `poll_all`.
    #[test]
    fn spawn_and_complete_immediate() {
        let mut exec = LocalExecutor::new();
        let done = Rc::new(Cell::new(false));

        let d = done.clone();
        let id = exec.spawn(async move {
            d.set(true);
        });

        assert_eq!(exec.active_count(), 1);
        exec.poll_all();
        assert!(done.get());
        assert!(exec.is_completed(id));
        assert!(exec.is_idle());
    }

    /// A pending future is resumed only after its waker fires.
    #[test]
    fn spawn_pending_then_wake() {
        let mut exec = LocalExecutor::new();
        let counter = Rc::new(Cell::new(0u32));
        let waker_holder: Rc<RefCell<Option<Waker>>> = Rc::new(RefCell::new(None));

        let c = counter.clone();
        let wh = waker_holder.clone();
        exec.spawn(async move {
            std::future::poll_fn(|cx| {
                let count = c.get();
                if count == 0 {
                    *wh.borrow_mut() = Some(cx.waker().clone());
                    c.set(1);
                    Poll::Pending
                } else {
                    c.set(2);
                    Poll::Ready(())
                }
            })
            .await;
        });

        // First poll — task yields.
        exec.poll_all();
        assert_eq!(counter.get(), 1);
        assert!(!exec.is_idle());

        // Wake the task (simulating background completion).
        waker_holder.borrow().as_ref().unwrap().wake_by_ref();

        // Second poll — task completes.
        exec.poll_all();
        assert_eq!(counter.get(), 2);
        assert!(exec.is_idle());
    }

    /// The waker can be moved to and fired from another thread.
    #[test]
    fn wake_from_another_thread() {
        // This test verifies the Waker is truly Send + Sync.
        let mut exec = LocalExecutor::new();
        let waker_holder: Arc<std::sync::Mutex<Option<Waker>>> =
            Arc::new(std::sync::Mutex::new(None));

        let wh = waker_holder.clone();
        exec.spawn(async move {
            std::future::poll_fn(|cx| {
                let mut guard = wh.lock().unwrap();
                if guard.is_none() {
                    *guard = Some(cx.waker().clone());
                    Poll::Pending
                } else {
                    Poll::Ready(())
                }
            })
            .await;
        });

        // First poll — future stores waker and yields.
        exec.poll_all();
        assert!(!exec.is_idle());

        // Wake from another thread — this is the PRIMARY use case
        // (tokio/rayon background thread completing an I/O operation).
        let wh = waker_holder.clone();
        let handle = std::thread::spawn(move || {
            let guard = wh.lock().unwrap();
            guard.as_ref().unwrap().wake_by_ref();
        });
        handle.join().unwrap();

        // Back on "window thread" — poll completes the task.
        exec.poll_all();
        assert!(exec.is_idle());
    }

    /// Freshly spawned tasks are first polled in spawn order.
    #[test]
    fn multiple_tasks() {
        let mut exec = LocalExecutor::new();
        let log = Rc::new(RefCell::new(Vec::new()));

        for i in 0..5 {
            let l = log.clone();
            exec.spawn(async move {
                l.borrow_mut().push(i);
            });
        }

        exec.poll_all();
        assert_eq!(*log.borrow(), vec![0, 1, 2, 3, 4]);
        assert!(exec.is_idle());
    }

    /// A completed task's slot index is handed out again.
    #[test]
    fn task_id_reuse() {
        let mut exec = LocalExecutor::new();

        // Spawn and complete.
        let id1 = exec.spawn(async {});
        exec.poll_all();
        assert!(exec.is_completed(id1));

        // Spawn again — should reuse the slot.
        let id2 = exec.spawn(async {});
        assert_eq!(id1.0, id2.0); // same index
        exec.poll_all();
    }

    /// `has_woken` reflects both the spawn queue and the per-task flag.
    #[test]
    fn has_woken() {
        let mut exec = LocalExecutor::new();
        assert!(!exec.has_woken());

        let wh: Rc<RefCell<Option<Waker>>> = Rc::new(RefCell::new(None));
        let wh2 = wh.clone();
        exec.spawn(async move {
            std::future::poll_fn(|cx| {
                *wh2.borrow_mut() = Some(cx.waker().clone());
                Poll::<()>::Pending
            })
            .await;
        });

        assert!(exec.has_woken()); // newly spawned
        exec.poll_all();
        assert!(!exec.has_woken()); // pending, not woken

        // Wake from "background".
        wh.borrow().as_ref().unwrap().wake_by_ref();
        assert!(exec.has_woken());
    }

    /// Cloning and dropping wakers keeps the `Arc` refcount balanced.
    #[test]
    fn waker_clone_and_drop() {
        // Ensure waker clone/drop doesn't leak or double-free.
        let inner = Arc::new(WakerInner {
            woken: AtomicBool::new(false),
            notify: None,
        });
        let waker = create_waker(inner.clone());
        assert_eq!(Arc::strong_count(&inner), 2);

        let waker2 = waker.clone();
        assert_eq!(Arc::strong_count(&inner), 3);
        drop(waker);
        assert_eq!(Arc::strong_count(&inner), 2);

        waker2.wake_by_ref();
        assert!(inner.woken.load(Ordering::Acquire));

        drop(waker2);
        // Only our own handle is left — nothing leaked, nothing double-freed.
        assert_eq!(Arc::strong_count(&inner), 1);
    }

    /// `is_completed` distinguishes a pending task from a cleaned-up slot.
    #[test]
    fn completed_task_query() {
        let mut exec = LocalExecutor::new();

        let pending = exec.spawn(std::future::pending::<()>());
        let finished = exec.spawn(async {});

        // Nothing has been polled yet, so nothing is complete.
        assert!(!exec.is_completed(pending));
        assert!(!exec.is_completed(finished));

        exec.poll_all();

        // The pending task still occupies its slot; the finished one was
        // cleaned up, and a cleaned-up slot reports `true`.
        assert!(!exec.is_completed(pending));
        assert!(exec.is_completed(finished));
        assert_eq!(exec.active_count(), 1);
    }

    /// Querying an index the executor never allocated panics.
    #[test]
    #[should_panic(expected = "out of range")]
    fn is_completed_out_of_range_panics() {
        let exec = LocalExecutor::new();
        let _ = exec.is_completed(TaskId(0));
    }

    /// The callback installed via `set_notify` runs on every wake.
    #[test]
    fn notify_runs_on_wake() {
        use std::sync::atomic::AtomicUsize;

        let mut exec = LocalExecutor::new();
        let hits = Arc::new(AtomicUsize::new(0));
        let h = hits.clone();
        exec.set_notify(Arc::new(move || {
            h.fetch_add(1, Ordering::SeqCst);
        }));

        let wh: Rc<RefCell<Option<Waker>>> = Rc::new(RefCell::new(None));
        let wh2 = wh.clone();
        exec.spawn(async move {
            std::future::poll_fn(|cx| {
                *wh2.borrow_mut() = Some(cx.waker().clone());
                Poll::<()>::Pending
            })
            .await;
        });

        exec.poll_all();
        assert_eq!(hits.load(Ordering::SeqCst), 0); // polling alone never notifies

        // By-reference wake.
        wh.borrow().as_ref().unwrap().wake_by_ref();
        assert_eq!(hits.load(Ordering::SeqCst), 1);

        // By-value wake consumes the stored waker.
        let owned = wh.borrow_mut().take().unwrap();
        owned.wake();
        assert_eq!(hits.load(Ordering::SeqCst), 2);
        assert!(exec.has_woken());
    }

    /// Completed tasks are freed inside the same `poll_all` call.
    #[test]
    fn immediate_cleanup_on_complete() {
        let mut exec = LocalExecutor::new();
        let id = exec.spawn(async {});
        exec.poll_all();

        // Task should be cleaned up immediately, not in a separate pass.
        assert!(exec.is_completed(id));
        assert!(exec.is_idle());
        // Slot should be freed for reuse.
        let id2 = exec.spawn(async {});
        assert_eq!(id.0, id2.0);
    }
}