Skip to main content

Executor

Struct Executor 

Source
pub struct Executor { /* private fields */ }
Expand description

A single-threaded async task executor with priority queues.

Each `Executor` manages its own task slots, ready queues, and deferred callback buffers. Use `Executor::new_instance` to create an isolated executor (e.g. one per SSR request), or use the global thread-local executor via `spawn_global`.

Implementations§

Source§

impl Executor

Source

pub fn active_task_count(&self) -> usize

Return the number of currently active (not-yet-completed) tasks.

Used by streaming SSR to determine whether the stream should wait for more work or terminate.

Source§

impl Executor

Source

pub fn new_instance() -> Rc<RefCell<Executor>>

Create a new isolated executor, wrapped for shared access.

The returned executor is independent of the global thread-local executor. Use `with_executor` to make it the current executor for the duration of a closure, so that spawned tasks and signal callbacks are routed to it.

Examples found in repository?
examples/multi_instance_isolated.rs (line 29)
28fn handle_request(request_id: u32) -> String {
29    let ex = Executor::new_instance();
30    Executor::install_flush_scheduler(&ex, Rc::new(SyncScheduler));
31
32    let scope = TaskScope::with_executor(&ex);
33    let data = Signal::new(String::new());
34
35    // "route handler" spawns a reactive effect
36    let d = data.clone();
37    scope.spawn(async move {
38        loop {
39            d.changed().await;
40            let val = d.read();
41            if val == "done" {
42                break;
43            }
44        }
45    });
46
47    // "middleware" sets up cleanup
48    let cleanup_called = Rc::new(Cell::new(false));
49    let cc = Rc::clone(&cleanup_called);
50    scope.on_cleanup(move || cc.set(true));
51
52    // "business logic" — would be I/O in real app
53    let result = auralis_task::with_executor(&ex, || {
54        data.set(format!("request {request_id}: processing"));
55        data.set(format!("request {request_id}: done"));
56        data.read()
57    });
58
59    // Drop scope → cancels effect, runs cleanup.
60    drop(scope);
61
62    assert!(cleanup_called.get());
63    result
64}
65
66fn main() {
67    // ---- Pattern 1: sequential requests (same thread, isolated) ----------
68    println!("=== 1. Sequential request isolation ===");
69    let r1 = handle_request(1);
70    let r2 = handle_request(2);
71    println!("  {r1}");
72    println!("  {r2}");
73    assert_eq!(r1, "request 1: done");
74    assert_eq!(r2, "request 2: done");
75
76    // ---- Pattern 2: same-signal isolation test (different executors) -----
77    println!("\n=== 2. Cross-executor signal isolation ===");
78    {
79        let ex_a = Executor::new_instance();
80        Executor::install_flush_scheduler(&ex_a, Rc::new(SyncScheduler));
81        let ex_b = Executor::new_instance();
82        Executor::install_flush_scheduler(&ex_b, Rc::new(SyncScheduler));
83
84        let sig_a1 = Signal::new(0i32);
85        let sig_b1 = Signal::new(0i32);
86
87        // Scope on executor A sets sig_a1 and reads it back.
88        let scope_a = TaskScope::with_executor(&ex_a);
89        let a_val = Rc::new(RefCell::new(Vec::new()));
90        let av = Rc::clone(&a_val);
91        {
92            let s = sig_a1.clone();
93            scope_a.spawn(async move {
94                s.set(42);
95                av.borrow_mut().push(s.read());
96            });
97        }
98        drop(scope_a);
99
100        // Scope on executor B sets sig_b1 and reads it back.
101        let scope_b = TaskScope::with_executor(&ex_b);
102        let b_val = Rc::new(RefCell::new(Vec::new()));
103        let bv = Rc::clone(&b_val);
104        {
105            let s = sig_b1.clone();
106            scope_b.spawn(async move {
107                s.set(99);
108                bv.borrow_mut().push(s.read());
109            });
110        }
111        drop(scope_b);
112
113        // Each executor only sees its own signal changes.
114        assert_eq!(*a_val.borrow(), vec![42]);
115        assert_eq!(*b_val.borrow(), vec![99]);
116        assert_eq!(sig_a1.read(), 42);
117        assert_eq!(sig_b1.read(), 99);
118    }
119
120    // ---- Pattern 3: the SSR mental model (one request = one executor) ----
121    println!("\n=== 3. SSR mental model ===");
122    for i in 0..3 {
123        let ex = Executor::new_instance();
124        Executor::install_flush_scheduler(&ex, Rc::new(SyncScheduler));
125        let counter = Signal::new(0i32);
126
127        let scope = TaskScope::with_executor(&ex);
128        let c = counter.clone();
129        let results = Rc::new(RefCell::new(Vec::new()));
130        let res = Rc::clone(&results);
131
132        scope.spawn(async move {
133            loop {
134                c.changed().await;
135                let v = c.read();
136                res.borrow_mut().push(v);
137                if v >= 3 {
138                    break;
139                }
140            }
141        });
142
143        auralis_task::with_executor(&ex, || {
144            counter.set(1);
145            counter.set(2);
146            counter.set(3);
147        });
148
149        drop(scope);
150        assert_eq!(*results.borrow(), vec![1, 2, 3]);
151        println!("  request {i}: signals {:?}", results.borrow());
152    }
153
154    println!("\nAll patterns completed — zero cross-request leakage.");
155}
Source

pub fn install_flush_scheduler( ex: &Rc<RefCell<Executor>>, sched: Rc<dyn ScheduleFlush>, )

Install a flush scheduler on this executor instance.

Examples found in repository?
examples/multi_instance_isolated.rs (line 30)
28fn handle_request(request_id: u32) -> String {
29    let ex = Executor::new_instance();
30    Executor::install_flush_scheduler(&ex, Rc::new(SyncScheduler));
31
32    let scope = TaskScope::with_executor(&ex);
33    let data = Signal::new(String::new());
34
35    // "route handler" spawns a reactive effect
36    let d = data.clone();
37    scope.spawn(async move {
38        loop {
39            d.changed().await;
40            let val = d.read();
41            if val == "done" {
42                break;
43            }
44        }
45    });
46
47    // "middleware" sets up cleanup
48    let cleanup_called = Rc::new(Cell::new(false));
49    let cc = Rc::clone(&cleanup_called);
50    scope.on_cleanup(move || cc.set(true));
51
52    // "business logic" — would be I/O in real app
53    let result = auralis_task::with_executor(&ex, || {
54        data.set(format!("request {request_id}: processing"));
55        data.set(format!("request {request_id}: done"));
56        data.read()
57    });
58
59    // Drop scope → cancels effect, runs cleanup.
60    drop(scope);
61
62    assert!(cleanup_called.get());
63    result
64}
65
66fn main() {
67    // ---- Pattern 1: sequential requests (same thread, isolated) ----------
68    println!("=== 1. Sequential request isolation ===");
69    let r1 = handle_request(1);
70    let r2 = handle_request(2);
71    println!("  {r1}");
72    println!("  {r2}");
73    assert_eq!(r1, "request 1: done");
74    assert_eq!(r2, "request 2: done");
75
76    // ---- Pattern 2: same-signal isolation test (different executors) -----
77    println!("\n=== 2. Cross-executor signal isolation ===");
78    {
79        let ex_a = Executor::new_instance();
80        Executor::install_flush_scheduler(&ex_a, Rc::new(SyncScheduler));
81        let ex_b = Executor::new_instance();
82        Executor::install_flush_scheduler(&ex_b, Rc::new(SyncScheduler));
83
84        let sig_a1 = Signal::new(0i32);
85        let sig_b1 = Signal::new(0i32);
86
87        // Scope on executor A sets sig_a1 and reads it back.
88        let scope_a = TaskScope::with_executor(&ex_a);
89        let a_val = Rc::new(RefCell::new(Vec::new()));
90        let av = Rc::clone(&a_val);
91        {
92            let s = sig_a1.clone();
93            scope_a.spawn(async move {
94                s.set(42);
95                av.borrow_mut().push(s.read());
96            });
97        }
98        drop(scope_a);
99
100        // Scope on executor B sets sig_b1 and reads it back.
101        let scope_b = TaskScope::with_executor(&ex_b);
102        let b_val = Rc::new(RefCell::new(Vec::new()));
103        let bv = Rc::clone(&b_val);
104        {
105            let s = sig_b1.clone();
106            scope_b.spawn(async move {
107                s.set(99);
108                bv.borrow_mut().push(s.read());
109            });
110        }
111        drop(scope_b);
112
113        // Each executor only sees its own signal changes.
114        assert_eq!(*a_val.borrow(), vec![42]);
115        assert_eq!(*b_val.borrow(), vec![99]);
116        assert_eq!(sig_a1.read(), 42);
117        assert_eq!(sig_b1.read(), 99);
118    }
119
120    // ---- Pattern 3: the SSR mental model (one request = one executor) ----
121    println!("\n=== 3. SSR mental model ===");
122    for i in 0..3 {
123        let ex = Executor::new_instance();
124        Executor::install_flush_scheduler(&ex, Rc::new(SyncScheduler));
125        let counter = Signal::new(0i32);
126
127        let scope = TaskScope::with_executor(&ex);
128        let c = counter.clone();
129        let results = Rc::new(RefCell::new(Vec::new()));
130        let res = Rc::clone(&results);
131
132        scope.spawn(async move {
133            loop {
134                c.changed().await;
135                let v = c.read();
136                res.borrow_mut().push(v);
137                if v >= 3 {
138                    break;
139                }
140            }
141        });
142
143        auralis_task::with_executor(&ex, || {
144            counter.set(1);
145            counter.set(2);
146            counter.set(3);
147        });
148
149        drop(scope);
150        assert_eq!(*results.borrow(), vec![1, 2, 3]);
151        println!("  request {i}: signals {:?}", results.borrow());
152    }
153
154    println!("\nAll patterns completed — zero cross-request leakage.");
155}
Source

pub fn install_time_source(ex: &Rc<RefCell<Executor>>, ts: Rc<dyn TimeSource>)

Install a time source on this executor instance.

Source

pub fn set_time_budget(ex: &Rc<RefCell<Executor>>, budget_ms: u64)

Set the maximum time (in milliseconds) a single flush may spend before yielding back to the host event loop.

The default is 8 ms (~120 fps frame budget, leaving time for the browser to render between flushes). Set to `u64::MAX` to disable time-budget yielding (each flush then runs to completion).

§Semantics

The budget is checked between task polls — the currently executing task is never interrupted. When the budget is exhausted, the executor sets `in_flush = false` and schedules a follow-up flush so that the remaining ready tasks are polled on the next microtask tick. This is cooperative (`.await`-bound) yielding, not preemptive.

This setting applies to this executor only. For the global thread-local executor, use `set_global_time_budget`.

Source

pub fn set_max_deferred_callbacks( ex: &Rc<RefCell<Executor>>, limit: Option<usize>, )

Set a safety cap on the deferred signal callback queue.

When set to `Some(n)`, the executor will panic if more than `n` deferred callbacks accumulate between two flush cycles. This is a safety net for SSR / multi-tenant servers, where a runaway signal loop could exhaust memory. In a single-threaded Wasm context, unbounded accumulation is acceptable because it blocks the UI thread anyway.

Default: None (no limit).

Source

pub fn set_panic_hook(ex: &Rc<RefCell<Executor>>, hook: Rc<dyn Fn(PanicInfo)>)

Register a callback invoked whenever a spawned task panics.

The default is no hook — panicking tasks are silently removed from the executor (the same behaviour as a task returning `Poll::Ready(())`).

§Example
Executor::set_panic_hook(&ex, Rc::new(|info| {
    eprintln!("task {} in scope {} panicked", info.task_id, info.scope_id);
}));
Source

pub fn spawn( ex: &Rc<RefCell<Executor>>, future: impl Future<Output = ()> + 'static, )

Spawn a future on this executor instance.

Source

pub fn flush_instance(ex: &Rc<RefCell<Executor>>)

Run a full flush cycle on this executor instance.

Mirrors the global flush cycle but operates on an isolated executor (used for SSR). Includes all the same protections: `catch_unwind`, suspend checks, time-budget yielding, and the callback-drain budget.

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.