pub struct Executor { /* private fields */ }
Expand description
A single-threaded async task executor with priority queues.
Each Executor manages its own task slots, ready queues, and
deferred callback buffers. Use Executor::new_instance to create
an isolated executor (e.g. per SSR request), or use the global
thread-local executor via spawn_global.
Implementations§
Source§impl Executor
impl Executor
Sourcepub fn active_task_count(&self) -> usize
pub fn active_task_count(&self) -> usize
Return the number of currently active (not-yet-completed) tasks.
Used by streaming SSR to determine whether the stream should wait for more work or terminate.
Source§impl Executor
impl Executor
Sourcepub fn new_instance() -> Rc<RefCell<Executor>>
pub fn new_instance() -> Rc<RefCell<Executor>>
Create a new isolated executor, wrapped for shared access.
The returned executor is independent of the global thread-local
executor. Use with_executor to make it the current executor
for the duration of a closure, so that spawned tasks and signal
callbacks are routed to it.
Examples found in repository?
28fn handle_request(request_id: u32) -> String {
29 let ex = Executor::new_instance();
30 Executor::install_flush_scheduler(&ex, Rc::new(SyncScheduler));
31
32 let scope = TaskScope::with_executor(&ex);
33 let data = Signal::new(String::new());
34
35 // "route handler" spawns a reactive effect
36 let d = data.clone();
37 scope.spawn(async move {
38 loop {
39 d.changed().await;
40 let val = d.read();
41 if val == "done" {
42 break;
43 }
44 }
45 });
46
47 // "middleware" sets up cleanup
48 let cleanup_called = Rc::new(Cell::new(false));
49 let cc = Rc::clone(&cleanup_called);
50 scope.on_cleanup(move || cc.set(true));
51
52 // "business logic" — would be I/O in real app
53 let result = auralis_task::with_executor(&ex, || {
54 data.set(format!("request {request_id}: processing"));
55 data.set(format!("request {request_id}: done"));
56 data.read()
57 });
58
59 // Drop scope → cancels effect, runs cleanup.
60 drop(scope);
61
62 assert!(cleanup_called.get());
63 result
64}
65
66fn main() {
67 // ---- Pattern 1: sequential requests (same thread, isolated) ----------
68 println!("=== 1. Sequential request isolation ===");
69 let r1 = handle_request(1);
70 let r2 = handle_request(2);
71 println!(" {r1}");
72 println!(" {r2}");
73 assert_eq!(r1, "request 1: done");
74 assert_eq!(r2, "request 2: done");
75
76 // ---- Pattern 2: same-signal isolation test (different executors) -----
77 println!("\n=== 2. Cross-executor signal isolation ===");
78 {
79 let ex_a = Executor::new_instance();
80 Executor::install_flush_scheduler(&ex_a, Rc::new(SyncScheduler));
81 let ex_b = Executor::new_instance();
82 Executor::install_flush_scheduler(&ex_b, Rc::new(SyncScheduler));
83
84 let sig_a1 = Signal::new(0i32);
85 let sig_b1 = Signal::new(0i32);
86
87 // Scope on executor A sets sig_a1 and reads it back.
88 let scope_a = TaskScope::with_executor(&ex_a);
89 let a_val = Rc::new(RefCell::new(Vec::new()));
90 let av = Rc::clone(&a_val);
91 {
92 let s = sig_a1.clone();
93 scope_a.spawn(async move {
94 s.set(42);
95 av.borrow_mut().push(s.read());
96 });
97 }
98 drop(scope_a);
99
100 // Scope on executor B sets sig_b1 and reads it back.
101 let scope_b = TaskScope::with_executor(&ex_b);
102 let b_val = Rc::new(RefCell::new(Vec::new()));
103 let bv = Rc::clone(&b_val);
104 {
105 let s = sig_b1.clone();
106 scope_b.spawn(async move {
107 s.set(99);
108 bv.borrow_mut().push(s.read());
109 });
110 }
111 drop(scope_b);
112
113 // Each executor only sees its own signal changes.
114 assert_eq!(*a_val.borrow(), vec![42]);
115 assert_eq!(*b_val.borrow(), vec![99]);
116 assert_eq!(sig_a1.read(), 42);
117 assert_eq!(sig_b1.read(), 99);
118 }
119
120 // ---- Pattern 3: the SSR mental model (one request = one executor) ----
121 println!("\n=== 3. SSR mental model ===");
122 for i in 0..3 {
123 let ex = Executor::new_instance();
124 Executor::install_flush_scheduler(&ex, Rc::new(SyncScheduler));
125 let counter = Signal::new(0i32);
126
127 let scope = TaskScope::with_executor(&ex);
128 let c = counter.clone();
129 let results = Rc::new(RefCell::new(Vec::new()));
130 let res = Rc::clone(&results);
131
132 scope.spawn(async move {
133 loop {
134 c.changed().await;
135 let v = c.read();
136 res.borrow_mut().push(v);
137 if v >= 3 {
138 break;
139 }
140 }
141 });
142
143 auralis_task::with_executor(&ex, || {
144 counter.set(1);
145 counter.set(2);
146 counter.set(3);
147 });
148
149 drop(scope);
150 assert_eq!(*results.borrow(), vec![1, 2, 3]);
151 println!(" request {i}: signals {:?}", results.borrow());
152 }
153
154 println!("\nAll patterns completed — zero cross-request leakage.");
155}
Sourcepub fn install_flush_scheduler(
ex: &Rc<RefCell<Executor>>,
sched: Rc<dyn ScheduleFlush>,
)
pub fn install_flush_scheduler( ex: &Rc<RefCell<Executor>>, sched: Rc<dyn ScheduleFlush>, )
Install a flush scheduler on this executor instance.
Examples found in repository?
28fn handle_request(request_id: u32) -> String {
29 let ex = Executor::new_instance();
30 Executor::install_flush_scheduler(&ex, Rc::new(SyncScheduler));
31
32 let scope = TaskScope::with_executor(&ex);
33 let data = Signal::new(String::new());
34
35 // "route handler" spawns a reactive effect
36 let d = data.clone();
37 scope.spawn(async move {
38 loop {
39 d.changed().await;
40 let val = d.read();
41 if val == "done" {
42 break;
43 }
44 }
45 });
46
47 // "middleware" sets up cleanup
48 let cleanup_called = Rc::new(Cell::new(false));
49 let cc = Rc::clone(&cleanup_called);
50 scope.on_cleanup(move || cc.set(true));
51
52 // "business logic" — would be I/O in real app
53 let result = auralis_task::with_executor(&ex, || {
54 data.set(format!("request {request_id}: processing"));
55 data.set(format!("request {request_id}: done"));
56 data.read()
57 });
58
59 // Drop scope → cancels effect, runs cleanup.
60 drop(scope);
61
62 assert!(cleanup_called.get());
63 result
64}
65
66fn main() {
67 // ---- Pattern 1: sequential requests (same thread, isolated) ----------
68 println!("=== 1. Sequential request isolation ===");
69 let r1 = handle_request(1);
70 let r2 = handle_request(2);
71 println!(" {r1}");
72 println!(" {r2}");
73 assert_eq!(r1, "request 1: done");
74 assert_eq!(r2, "request 2: done");
75
76 // ---- Pattern 2: same-signal isolation test (different executors) -----
77 println!("\n=== 2. Cross-executor signal isolation ===");
78 {
79 let ex_a = Executor::new_instance();
80 Executor::install_flush_scheduler(&ex_a, Rc::new(SyncScheduler));
81 let ex_b = Executor::new_instance();
82 Executor::install_flush_scheduler(&ex_b, Rc::new(SyncScheduler));
83
84 let sig_a1 = Signal::new(0i32);
85 let sig_b1 = Signal::new(0i32);
86
87 // Scope on executor A sets sig_a1 and reads it back.
88 let scope_a = TaskScope::with_executor(&ex_a);
89 let a_val = Rc::new(RefCell::new(Vec::new()));
90 let av = Rc::clone(&a_val);
91 {
92 let s = sig_a1.clone();
93 scope_a.spawn(async move {
94 s.set(42);
95 av.borrow_mut().push(s.read());
96 });
97 }
98 drop(scope_a);
99
100 // Scope on executor B sets sig_b1 and reads it back.
101 let scope_b = TaskScope::with_executor(&ex_b);
102 let b_val = Rc::new(RefCell::new(Vec::new()));
103 let bv = Rc::clone(&b_val);
104 {
105 let s = sig_b1.clone();
106 scope_b.spawn(async move {
107 s.set(99);
108 bv.borrow_mut().push(s.read());
109 });
110 }
111 drop(scope_b);
112
113 // Each executor only sees its own signal changes.
114 assert_eq!(*a_val.borrow(), vec![42]);
115 assert_eq!(*b_val.borrow(), vec![99]);
116 assert_eq!(sig_a1.read(), 42);
117 assert_eq!(sig_b1.read(), 99);
118 }
119
120 // ---- Pattern 3: the SSR mental model (one request = one executor) ----
121 println!("\n=== 3. SSR mental model ===");
122 for i in 0..3 {
123 let ex = Executor::new_instance();
124 Executor::install_flush_scheduler(&ex, Rc::new(SyncScheduler));
125 let counter = Signal::new(0i32);
126
127 let scope = TaskScope::with_executor(&ex);
128 let c = counter.clone();
129 let results = Rc::new(RefCell::new(Vec::new()));
130 let res = Rc::clone(&results);
131
132 scope.spawn(async move {
133 loop {
134 c.changed().await;
135 let v = c.read();
136 res.borrow_mut().push(v);
137 if v >= 3 {
138 break;
139 }
140 }
141 });
142
143 auralis_task::with_executor(&ex, || {
144 counter.set(1);
145 counter.set(2);
146 counter.set(3);
147 });
148
149 drop(scope);
150 assert_eq!(*results.borrow(), vec![1, 2, 3]);
151 println!(" request {i}: signals {:?}", results.borrow());
152 }
153
154 println!("\nAll patterns completed — zero cross-request leakage.");
155}
Sourcepub fn install_time_source(ex: &Rc<RefCell<Executor>>, ts: Rc<dyn TimeSource>)
pub fn install_time_source(ex: &Rc<RefCell<Executor>>, ts: Rc<dyn TimeSource>)
Install a time source on this executor instance.
Sourcepub fn set_time_budget(ex: &Rc<RefCell<Executor>>, budget_ms: u64)
pub fn set_time_budget(ex: &Rc<RefCell<Executor>>, budget_ms: u64)
Set the maximum time (in milliseconds) a single flush may spend before yielding back to the host event loop.
The default is 8 ms (~120 fps frame budget, leaving time for the
browser to render between flushes). Set to u64::MAX to disable
time-budget yielding (flush runs to completion).
§Semantics
The budget is checked between task polls — the currently
executing task is never interrupted. When the budget is exhausted
the executor sets in_flush = false and schedules a follow-up
flush so the remaining ready tasks will be polled on the next
microtask tick. This is cooperative (.await-bound) yielding,
not preemptive.
This setting applies to this executor instance only. For the global
thread-local executor use set_global_time_budget.
Sourcepub fn set_max_deferred_callbacks(
ex: &Rc<RefCell<Executor>>,
limit: Option<usize>,
)
pub fn set_max_deferred_callbacks( ex: &Rc<RefCell<Executor>>, limit: Option<usize>, )
Set a safety cap on the deferred signal callback queue.
When set to Some(n), the executor will panic if more than n
deferred callbacks accumulate between two flush cycles. This is a
safety net for SSR / multi-tenant servers where a runaway signal
loop could exhaust memory — in a single-threaded Wasm context,
unbounded accumulation is acceptable because it blocks the UI
thread anyway.
Default: None (no limit).
Sourcepub fn set_panic_hook(ex: &Rc<RefCell<Executor>>, hook: Rc<dyn Fn(PanicInfo)>)
pub fn set_panic_hook(ex: &Rc<RefCell<Executor>>, hook: Rc<dyn Fn(PanicInfo)>)
Register a callback invoked whenever a spawned task panics.
The default is no hook — panicking tasks are silently removed
from the executor (the same behaviour as a task returning
Poll::Ready(())).
§Example
Executor::set_panic_hook(&ex, Rc::new(|info| {
eprintln!("task {} in scope {} panicked", info.task_id, info.scope_id);
}));
Sourcepub fn spawn(
ex: &Rc<RefCell<Executor>>,
future: impl Future<Output = ()> + 'static,
)
pub fn spawn( ex: &Rc<RefCell<Executor>>, future: impl Future<Output = ()> + 'static, )
Spawn a future on this executor instance.
Sourcepub fn flush_instance(ex: &Rc<RefCell<Executor>>)
pub fn flush_instance(ex: &Rc<RefCell<Executor>>)
Run a full flush cycle on this executor instance.
Mirrors the global flush cycle but operates on an
isolated executor (used for SSR). Includes all the same
protections: catch_unwind, suspend checks, time-budget
yielding, and callback-drain budget.