Skip to main content

init_flush_scheduler

Function init_flush_scheduler 

Source
pub fn init_flush_scheduler(sched: Rc<dyn ScheduleFlush>)
Expand description

Set the platform flush scheduler and install the signal deferred- callback hook.

Idempotent — subsequent calls are no-ops (the hook is installed via std::sync::OnceLock, so it fires exactly once per process).

§Threading constraint

The hook is per-process and routes signal notifications to the executor that is “current” when the notification fires (see with_executor). For single-threaded use (Wasm, CLI) this is transparent. For multi-threaded SSR, enable the ssr-tokio feature and call init_scope_store_tokio. See with_executor for the full routing contract.

Examples found in repository?
examples/scope_bench.rs (line 21)
20fn init() {
21    init_flush_scheduler(Rc::new(BenchScheduleFlush));
22}
23
24// ---------------------------------------------------------------------------
25// Batched scheduler
26// ---------------------------------------------------------------------------
27
28struct BatchedFlush {
29    callbacks: RefCell<Vec<Box<dyn FnOnce()>>>,
30}
31impl BatchedFlush {
32    fn new() -> Rc<Self> {
33        Rc::new(Self {
34            callbacks: RefCell::new(Vec::new()),
35        })
36    }
37    fn drain(&self) {
38        while let Some(cb) = self.callbacks.borrow_mut().pop() {
39            cb();
40        }
41    }
42}
43impl ScheduleFlush for BatchedFlush {
44    fn schedule(&self, callback: Box<dyn FnOnce()>) {
45        self.callbacks.borrow_mut().push(callback);
46    }
47}
48
49// ---------------------------------------------------------------------------
50// Timer helper
51// ---------------------------------------------------------------------------
52
53fn time<R>(label: &str, iterations: u64, mut f: impl FnMut() -> R) {
54    // Warm-up
55    for _ in 0..(iterations / 10).max(1) {
56        f();
57    }
58    let start = Instant::now();
59    for _ in 0..iterations {
60        f();
61    }
62    let elapsed = start.elapsed().as_secs_f64() * 1_000_000.0 / iterations as f64;
63    println!("  {:<42} {:>8.2} µs/iter", label, elapsed);
64}
65
66// ---------------------------------------------------------------------------
67
68fn main() {
69    println!("=== Auralis Task Scope Benchmarks ===\n");
70
71    // 1. Scope create + destroy (100 tasks)
72    let iterations = 50_000;
73    init();
74    time("scope_100_tasks_create_destroy", iterations, || {
75        let scope = TaskScope::new();
76        for _ in 0..100 {
77            scope.spawn(async {});
78        }
79        drop(scope);
80    });
81
82    // 2. Deep nesting drop (200 levels)
83    init();
84    time("scope_200_levels_deep_drop", 5_000, || {
85        let root = TaskScope::new();
86        {
87            let mut current = TaskScope::new_child(&root);
88            for _ in 0..199 {
89                current.spawn(async {});
90                current = TaskScope::new_child(&current);
91            }
92        }
93        drop(root);
94    });
95
96    // 3. Priority ordering (batched flush)
97    {
98        let order: Rc<RefCell<Vec<String>>> = Rc::new(RefCell::new(Vec::new()));
99        let min_iterations = 5_000;
100        // Warm-up
101        for _ in 0..500 {
102            let sched = BatchedFlush::new();
103            init_flush_scheduler(Rc::clone(&sched) as Rc<dyn ScheduleFlush>);
104            order.borrow_mut().clear();
105            for i in 0..1000 {
106                let o = Rc::clone(&order);
107                auralis_task::spawn_global_with_priority(Priority::Low, async move {
108                    o.borrow_mut().push(format!("low_{i}"));
109                });
110            }
111            for i in 0..10 {
112                let o = Rc::clone(&order);
113                auralis_task::spawn_global_with_priority(Priority::High, async move {
114                    o.borrow_mut().push(format!("high_{i}"));
115                });
116            }
117            sched.drain();
118        }
119
120        let start = Instant::now();
121        for _ in 0..min_iterations {
122            let sched = BatchedFlush::new();
123            init_flush_scheduler(Rc::clone(&sched) as Rc<dyn ScheduleFlush>);
124            order.borrow_mut().clear();
125            for i in 0..1000 {
126                let o = Rc::clone(&order);
127                auralis_task::spawn_global_with_priority(Priority::Low, async move {
128                    o.borrow_mut().push(format!("low_{i}"));
129                });
130            }
131            for i in 0..10 {
132                let o = Rc::clone(&order);
133                auralis_task::spawn_global_with_priority(Priority::High, async move {
134                    o.borrow_mut().push(format!("high_{i}"));
135                });
136            }
137            sched.drain();
138            // Verify correctness.
139            let result = order.borrow().clone();
140            debug_assert_eq!(result.len(), 1010);
141        }
142        let elapsed = start.elapsed().as_secs_f64() * 1_000_000.0 / min_iterations as f64;
143        println!(
144            "  {:<42} {:>8.2} µs/iter",
145            "priority_1000_low_10_high", elapsed
146        );
147        init(); // restore sync scheduler
148    }
149
150    // 4. Scope churn (100 scopes x 10 tasks, batch drop)
151    init();
152    time("scope_churn_100x10_batch_drop", 5_000, || {
153        let scopes: Vec<TaskScope> = (0..100)
154            .map(|_| {
155                let s = TaskScope::new();
156                for _ in 0..10 {
157                    s.spawn(async {});
158                }
159                s
160            })
161            .collect();
162        drop(scopes);
163    });
164
165    // 5. Scope suspend + resume (1000 tasks)
166    init();
167    let scope = TaskScope::new();
168    for _ in 0..1000 {
169        scope.spawn(async {});
170    }
171    time("scope_suspend_resume_1000_tasks", 500_000, || {
172        scope.suspend();
173        scope.resume();
174    });
175
176    // 6. Wide shallow tree drop (50x50, ~2550 tasks)
177    init();
178    time("scope_wide_tree_50x3_drop", 500, || {
179        let root = TaskScope::new();
180        for _ in 0..50 {
181            let child = TaskScope::new_child(&root);
182            child.spawn(async {});
183            for _ in 0..50 {
184                let grandchild = TaskScope::new_child(&child);
185                grandchild.spawn(async {});
186            }
187        }
188        drop(root);
189    });
190
191    println!("\n=== Done ===");
192}
More examples
Hide additional examples
examples/counter.rs (line 39)
37fn main() {
38    // One-time executor setup.
39    init_flush_scheduler(Rc::new(CliScheduler));
40    init_time_source(Rc::new(CliClock::new()));
41
42    // ---- Signal basics ----
43    println!("=== Signal basics ===");
44    let count = Signal::new(0);
45    println!("count = {}", count.read());
46    count.set(42);
47    println!("count = {}", count.read());
48
49    // ---- Memo (auto-tracking computed value) ----
50    println!("\n=== Memo ===");
51    let a = Signal::new(2);
52    let b = Signal::new(3);
53    let a2 = a.clone();
54    let b2 = b.clone();
55    let sum = Memo::new(move || a2.read() + b2.read());
56    println!("2 + 3 = {}", sum.read());
57    a.set(10);
58    println!("10 + 3 = {}", sum.read());
59
60    // ---- TaskScope: spawn a task that watches a signal ----
61    println!("\n=== TaskScope (scoped signal watcher) ===");
62    let messages = Signal::new(vec!["hello".to_string()]);
63
64    {
65        let scope = TaskScope::new();
66        let msgs = messages.clone();
67        scope.spawn(async move {
68            loop {
69                let val = msgs.changed().await;
70                println!("  task observed: {:?}", val);
71            }
72        });
73
74        messages.set(vec!["hello".to_string(), "world".to_string()]);
75        messages.set(vec![
76            "hello".to_string(),
77            "world".to_string(),
78            "!".to_string(),
79        ]);
80
81        // Explicit drop — cancels the task and cleans up resources.
82        drop(scope);
83    }
84    println!("(scope dropped — task and subscriptions cleaned up)");
85
86    // ---- Batch: multiple sets, one notification ----
87    println!("\n=== Batch ===");
88    let x = Signal::new(0);
89    let notifications = Rc::new(Cell::new(0u32));
90    let n = Rc::clone(&notifications);
91    let x2 = x.clone();
92
93    let scope = TaskScope::new();
94    scope.spawn(async move {
95        loop {
96            x2.changed().await;
97            n.set(n.get() + 1);
98        }
99    });
100
101    batch(|| {
102        x.set(1);
103        x.set(2);
104        x.set(3);
105    });
106    println!(
107        "After 3 sets in a batch: {} notification(s)",
108        notifications.get()
109    );
110    // Should be 1 — not 3.
111
112    println!("\nDone.");
113}