1use crate::collections::map::HashMap;
2use crate::collections::map::HashSet;
3use crate::MutableStateInner;
4use std::any::Any;
5use std::cell::{Cell, RefCell};
6use std::collections::VecDeque;
7use std::future::Future;
8use std::pin::Pin;
9use std::rc::{Rc, Weak};
10use std::sync::atomic::{AtomicU32, AtomicUsize, Ordering};
11use std::sync::{mpsc, Arc};
12use std::task::{Context, Poll, Waker};
13use std::thread::ThreadId;
14use std::thread_local;
15
16#[cfg(any(feature = "internal", test))]
17use crate::frame_clock::FrameClock;
18use crate::platform::RuntimeScheduler;
19use crate::{Applier, Command, FrameCallbackId, NodeError, RecomposeScopeInner, ScopeId};
20
/// Message sent (possibly from another thread) to the UI thread's drain loop.
enum UiMessage {
    /// A closure to run on the UI thread.
    Task(Box<dyn FnOnce() + Send + 'static>),
    /// Resume the UI continuation registered under `id` with a type-erased value.
    Invoke { id: u64, value: Box<dyn Any + Send> },
}
25
/// Type-erased single-shot callback resumed on the UI thread (wrapped so a
/// `FnOnce` can be stored behind a `Fn`; see `register_ui_cont`).
type UiContinuation = Box<dyn Fn(Box<dyn Any>) + 'static>;
/// Registered continuations keyed by the id handed out by `register_ui_cont`.
type UiContinuationMap = HashMap<u64, UiContinuation>;
28
/// Arena cell holding snapshot-aware mutable state (created by `StateArena::alloc`).
struct TypedStateCell<T: Clone + 'static> {
    inner: MutableStateInner<T>,
}
32
/// Arena cell holding a plain value with no snapshot integration
/// (created by `StateArena::alloc_raw`).
#[allow(dead_code)]
struct RawStateCell<T: 'static> {
    value: T,
}
37
/// One slot in the state arena. `generation` is bumped on slot reuse so stale
/// `StateId`s from a previous occupant no longer resolve.
struct StateArenaSlot {
    generation: u32,
    // Type-erased cell; `None` while the slot sits on the free list.
    cell: Option<Rc<dyn Any>>,
    // Weak back-reference to the lease registered for this slot, if any.
    lease: Option<Weak<StateHandleLease>>,
}
43
/// Backing storage for `StateArena`: the slot vector plus a free list of
/// released slot indices available for reuse.
#[derive(Default)]
struct StateArenaInner {
    cells: Vec<StateArenaSlot>,
    free: Vec<u32>,
}
49
/// Generational arena for state cells; single-threaded, guarded by a `RefCell`.
#[derive(Default)]
pub(crate) struct StateArena {
    inner: RefCell<StateArenaInner>,
}
54
55impl StateArena {
56 pub(crate) fn alloc<T: Clone + 'static>(&self, value: T, runtime: RuntimeHandle) -> StateId {
57 let (slot, generation) = {
58 let mut inner = self.inner.borrow_mut();
59 match inner.free.pop() {
60 Some(slot) => {
61 let entry = inner
62 .cells
63 .get_mut(slot as usize)
64 .expect("state slot missing");
65 debug_assert!(entry.cell.is_none(), "reused state slot must be empty");
66 entry.generation = entry.generation.wrapping_add(1);
67 (slot, entry.generation)
68 }
69 None => {
70 let slot = inner.cells.len() as u32;
71 inner.cells.push(StateArenaSlot {
72 generation: 0,
73 cell: None,
74 lease: None,
75 });
76 (slot, 0)
77 }
78 }
79 };
80 let id = StateId::new(slot, generation);
81 let inner = MutableStateInner::new(value, runtime.clone());
82 inner.install_snapshot_observer(id);
83 let cell: Rc<dyn Any> = Rc::new(TypedStateCell { inner });
84 self.inner.borrow_mut().cells[slot as usize].cell = Some(cell);
85 id
86 }
87
88 #[allow(dead_code)]
89 pub(crate) fn alloc_raw<T: 'static>(&self, value: T) -> StateId {
90 let (slot, generation) = {
91 let mut inner = self.inner.borrow_mut();
92 match inner.free.pop() {
93 Some(slot) => {
94 let entry = inner
95 .cells
96 .get_mut(slot as usize)
97 .expect("state slot missing");
98 debug_assert!(entry.cell.is_none(), "reused state slot must be empty");
99 entry.generation = entry.generation.wrapping_add(1);
100 (slot, entry.generation)
101 }
102 None => {
103 let slot = inner.cells.len() as u32;
104 inner.cells.push(StateArenaSlot {
105 generation: 0,
106 cell: None,
107 lease: None,
108 });
109 (slot, 0)
110 }
111 }
112 };
113 let id = StateId::new(slot, generation);
114 let cell: Rc<dyn Any> = Rc::new(RawStateCell { value });
115 self.inner.borrow_mut().cells[slot as usize].cell = Some(cell);
116 id
117 }
118
119 fn get_cell(&self, id: StateId) -> Rc<dyn Any> {
120 self.get_cell_opt(id).expect("state cell missing")
121 }
122
123 fn get_cell_opt(&self, id: StateId) -> Option<Rc<dyn Any>> {
124 self.inner
125 .borrow()
126 .cells
127 .get(id.slot_index())
128 .filter(|cell| cell.generation == id.generation())
129 .and_then(|cell| cell.cell.as_ref())
130 .cloned()
131 }
132
133 fn get_typed<T: Clone + 'static>(&self, id: StateId) -> Rc<TypedStateCell<T>> {
134 match self.get_cell_opt(id) {
135 None => panic!(
136 "state cell missing: slot={}, gen={}, expected={}",
137 id.slot(),
138 id.generation(),
139 std::any::type_name::<T>(),
140 ),
141 Some(cell) => Rc::downcast::<TypedStateCell<T>>(cell).unwrap_or_else(|_| {
142 panic!(
143 "state cell type mismatch: slot={}, gen={}, expected={}",
144 id.slot(),
145 id.generation(),
146 std::any::type_name::<T>(),
147 )
148 }),
149 }
150 }
151
152 fn get_typed_opt<T: Clone + 'static>(&self, id: StateId) -> Option<Rc<TypedStateCell<T>>> {
153 Rc::downcast::<TypedStateCell<T>>(self.get_cell_opt(id)?).ok()
154 }
155
156 pub(crate) fn with_typed<T: Clone + 'static, R>(
157 &self,
158 id: StateId,
159 f: impl FnOnce(&MutableStateInner<T>) -> R,
160 ) -> R {
161 let cell = self.get_typed::<T>(id);
162 f(&cell.inner)
163 }
164
165 pub(crate) fn with_typed_opt<T: Clone + 'static, R>(
166 &self,
167 id: StateId,
168 f: impl FnOnce(&MutableStateInner<T>) -> R,
169 ) -> Option<R> {
170 let cell = self.get_typed_opt::<T>(id)?;
171 Some(f(&cell.inner))
172 }
173
174 #[allow(dead_code)]
175 pub(crate) fn with_raw<T: 'static, R>(&self, id: StateId, f: impl FnOnce(&T) -> R) -> R {
176 let cell = Rc::downcast::<RawStateCell<T>>(self.get_cell(id))
177 .expect("raw state cell type mismatch");
178 f(&cell.value)
179 }
180
181 pub(crate) fn release(&self, id: StateId) {
182 let cell = {
183 let mut inner = self.inner.borrow_mut();
184 let Some(slot) = inner.cells.get_mut(id.slot_index()) else {
185 return;
186 };
187 if slot.generation != id.generation() {
188 return;
189 }
190 slot.lease = None;
191 let cell = slot.cell.take();
192 if cell.is_some() {
193 inner.free.push(id.slot());
194 }
195 cell
196 };
197 drop(cell);
198 }
199
200 #[cfg(test)]
201 pub(crate) fn stats(&self) -> (usize, usize) {
202 let inner = self.inner.borrow();
203 (inner.cells.len(), inner.free.len())
204 }
205
206 pub(crate) fn register_lease(&self, id: StateId, lease: &Rc<StateHandleLease>) {
207 let mut inner = self.inner.borrow_mut();
208 let Some(slot) = inner.cells.get_mut(id.slot_index()) else {
209 panic!("state slot missing");
210 };
211 assert_eq!(
212 slot.generation,
213 id.generation(),
214 "state generation mismatch"
215 );
216 slot.lease = Some(Rc::downgrade(lease));
217 }
218
219 pub(crate) fn retain_lease(&self, id: StateId) -> Option<Rc<StateHandleLease>> {
220 let inner = self.inner.borrow();
221 let slot = inner.cells.get(id.slot_index())?;
222 if slot.generation != id.generation() {
223 return None;
224 }
225 slot.lease.as_ref()?.upgrade()
226 }
227}
228
/// Generational handle into the `StateArena`: a slot index paired with the
/// generation the slot had when the cell was allocated.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub struct StateId {
    slot: u32,
    generation: u32,
}

impl StateId {
    /// Builds an id from a slot index and its current generation.
    const fn new(slot: u32, generation: u32) -> Self {
        StateId { slot, generation }
    }

    /// Slot index in its stored `u32` form.
    pub(crate) const fn slot(self) -> u32 {
        self.slot
    }

    /// Slot index widened for direct vector indexing.
    pub(crate) const fn slot_index(self) -> usize {
        self.slot as usize
    }

    /// Generation captured at allocation time.
    pub(crate) const fn generation(self) -> u32 {
        self.generation
    }
}
252
/// Process-wide unique identifier for a `Runtime` instance.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub struct RuntimeId(u32);

impl RuntimeId {
    /// Hands out the next id from a global monotone counter (ids start at 1).
    fn next() -> Self {
        static NEXT_RUNTIME_ID: AtomicU32 = AtomicU32::new(1);
        RuntimeId(NEXT_RUNTIME_ID.fetch_add(1, Ordering::Relaxed))
    }
}
262
/// Cross-thread half of the UI dispatcher: a channel sender into the runtime's
/// drain loop plus a count of messages sent but not yet executed.
struct UiDispatcherInner {
    scheduler: Arc<dyn RuntimeScheduler>,
    tx: mpsc::Sender<UiMessage>,
    // Incremented on send; decremented via `PendingGuard` when a message is handled.
    pending: AtomicUsize,
}

impl UiDispatcherInner {
    fn new(scheduler: Arc<dyn RuntimeScheduler>, tx: mpsc::Sender<UiMessage>) -> Self {
        Self {
            scheduler,
            tx,
            pending: AtomicUsize::new(0),
        }
    }

    /// Sends a closure to the UI thread and requests a frame so it is drained.
    /// Best-effort: send errors are ignored (the receiver may be gone).
    fn post(&self, task: impl FnOnce() + Send + 'static) {
        self.pending.fetch_add(1, Ordering::SeqCst);
        let _ = self.tx.send(UiMessage::Task(Box::new(task)));
        self.scheduler.schedule_frame();
    }

    /// Sends a value for the UI continuation `id` and requests a frame.
    /// Best-effort: send errors are ignored (the receiver may be gone).
    fn post_invoke(&self, id: u64, value: Box<dyn Any + Send>) {
        self.pending.fetch_add(1, Ordering::SeqCst);
        let _ = self.tx.send(UiMessage::Invoke { id, value });
        self.scheduler.schedule_frame();
    }

    /// True while any posted message has not yet been executed.
    fn has_pending(&self) -> bool {
        self.pending.load(Ordering::SeqCst) > 0
    }
}
294
/// Decrements the dispatcher's pending-message counter on drop, so the count
/// is released even if handling the message panics.
struct PendingGuard<'a> {
    pending: &'a AtomicUsize,
}

impl<'a> PendingGuard<'a> {
    /// Wraps `counter`; the guard decrements it exactly once when dropped.
    fn new(counter: &'a AtomicUsize) -> Self {
        PendingGuard { pending: counter }
    }
}

impl<'a> Drop for PendingGuard<'a> {
    fn drop(&mut self) {
        let before = self.pending.fetch_sub(1, Ordering::SeqCst);
        debug_assert!(before > 0, "UI dispatcher pending count underflowed");
    }
}
311
/// Cloneable handle for posting work to the runtime's UI thread from anywhere.
#[derive(Clone)]
pub struct UiDispatcher {
    inner: Arc<UiDispatcherInner>,
}

impl UiDispatcher {
    fn new(inner: Arc<UiDispatcherInner>) -> Self {
        Self { inner }
    }

    /// Queues `task` to run on the UI thread and requests a frame.
    pub fn post(&self, task: impl FnOnce() + Send + 'static) {
        self.inner.post(task);
    }

    /// Delivers `value` to the UI continuation registered under `id`.
    pub fn post_invoke<T>(&self, id: u64, value: T)
    where
        T: Send + 'static,
    {
        self.inner.post_invoke(id, Box::new(value));
    }

    /// True while any posted message has not yet been executed.
    pub fn has_pending(&self) -> bool {
        self.inner.has_pending()
    }
}
337
338struct RuntimeInner {
339 scheduler: Arc<dyn RuntimeScheduler>,
340 needs_frame: RefCell<bool>,
341 node_updates: RefCell<Vec<Command>>, invalid_scopes: RefCell<HashSet<ScopeId>>, scope_queue: RefCell<Vec<(ScopeId, Weak<RecomposeScopeInner>)>>, frame_callbacks: RefCell<VecDeque<FrameCallbackEntry>>, next_frame_callback_id: Cell<u64>,
346 ui_dispatcher: Arc<UiDispatcherInner>,
347 ui_rx: RefCell<mpsc::Receiver<UiMessage>>,
348 local_tasks: RefCell<VecDeque<Box<dyn FnOnce() + 'static>>>,
349 ui_conts: RefCell<UiContinuationMap>,
350 next_cont_id: Cell<u64>,
351 ui_thread_id: ThreadId,
352 tasks: RefCell<Vec<TaskEntry>>, next_task_id: Cell<u64>,
354 task_waker: RefCell<Option<Waker>>,
355 state_arena: StateArena,
356 external_state_owners: RefCell<HashMap<StateId, Rc<StateHandleLease>>>,
357 runtime_id: RuntimeId,
358}
359
/// A UI-thread future together with the id used to cancel it.
struct TaskEntry {
    id: u64,
    future: Pin<Box<dyn Future<Output = ()> + 'static>>,
}
364
365impl RuntimeInner {
366 fn new(scheduler: Arc<dyn RuntimeScheduler>) -> Self {
367 let (tx, rx) = mpsc::channel();
368 let dispatcher = Arc::new(UiDispatcherInner::new(scheduler.clone(), tx));
369 Self {
370 scheduler,
371 needs_frame: RefCell::new(false),
372 node_updates: RefCell::new(Vec::new()),
373 invalid_scopes: RefCell::new(HashSet::default()),
374 scope_queue: RefCell::new(Vec::new()),
375 frame_callbacks: RefCell::new(VecDeque::new()),
376 next_frame_callback_id: Cell::new(1),
377 ui_dispatcher: dispatcher,
378 ui_rx: RefCell::new(rx),
379 local_tasks: RefCell::new(VecDeque::new()),
380 ui_conts: RefCell::new(UiContinuationMap::default()),
381 next_cont_id: Cell::new(1),
382 ui_thread_id: std::thread::current().id(),
383 tasks: RefCell::new(Vec::new()),
384 next_task_id: Cell::new(1),
385 task_waker: RefCell::new(None),
386 state_arena: StateArena::default(),
387 external_state_owners: RefCell::new(HashMap::default()),
388 runtime_id: RuntimeId::next(),
389 }
390 }
391
392 fn init_task_waker(this: &Rc<Self>) {
393 let weak = Rc::downgrade(this);
394 let waker = RuntimeTaskWaker::new(weak).into_waker();
395 *this.task_waker.borrow_mut() = Some(waker);
396 }
397
398 fn schedule(&self) {
399 *self.needs_frame.borrow_mut() = true;
400 self.scheduler.schedule_frame();
401 }
402
403 fn enqueue_update(&self, command: Command) {
404 self.node_updates.borrow_mut().push(command);
405 self.schedule(); }
407
408 fn take_updates(&self) -> Vec<Command> {
409 let updates = self.node_updates.borrow_mut().drain(..).collect::<Vec<_>>();
411 updates
412 }
413
414 fn has_updates(&self) -> bool {
415 !self.node_updates.borrow().is_empty() || self.has_invalid_scopes()
416 }
417
418 fn register_invalid_scope(&self, id: ScopeId, scope: Weak<RecomposeScopeInner>) {
419 let mut invalid = self.invalid_scopes.borrow_mut();
420 if invalid.insert(id) {
421 self.scope_queue.borrow_mut().push((id, scope));
422 self.schedule();
423 }
424 }
425
426 fn mark_scope_recomposed(&self, id: ScopeId) {
427 self.invalid_scopes.borrow_mut().remove(&id);
428 }
429
430 fn take_invalidated_scopes(&self) -> Vec<(ScopeId, Weak<RecomposeScopeInner>)> {
431 let mut queue = self.scope_queue.borrow_mut();
433 if queue.is_empty() {
434 return Vec::new();
435 }
436 let pending: Vec<_> = queue.drain(..).collect();
437 drop(queue);
438 let invalid = self.invalid_scopes.borrow();
439 pending
440 .into_iter()
441 .filter(|(id, _)| invalid.contains(id))
442 .collect()
443 }
444
445 fn has_invalid_scopes(&self) -> bool {
446 !self.invalid_scopes.borrow().is_empty()
447 }
448
449 fn has_frame_callbacks(&self) -> bool {
450 !self.frame_callbacks.borrow().is_empty()
451 }
452
453 fn enqueue_ui_task(&self, task: Box<dyn FnOnce() + 'static>) {
458 self.local_tasks.borrow_mut().push_back(task);
459 self.schedule();
460 }
461
462 fn spawn_ui_task(&self, future: Pin<Box<dyn Future<Output = ()> + 'static>>) -> u64 {
463 let id = self.next_task_id.get();
464 self.next_task_id.set(id + 1);
465 self.tasks.borrow_mut().push(TaskEntry { id, future });
466 self.schedule();
467 id
468 }
469
470 fn cancel_task(&self, id: u64) {
471 let mut tasks = self.tasks.borrow_mut();
472 if tasks.iter().any(|entry| entry.id == id) {
473 tasks.retain(|entry| entry.id != id);
474 }
475 }
476
477 fn poll_async_tasks(&self) -> bool {
478 let waker = match self.task_waker.borrow().as_ref() {
479 Some(waker) => waker.clone(),
480 None => return false,
481 };
482 let mut cx = Context::from_waker(&waker);
483 let mut tasks_ref = self.tasks.borrow_mut();
484 let tasks = std::mem::take(&mut *tasks_ref);
485 drop(tasks_ref);
486 let mut pending = Vec::with_capacity(tasks.len());
487 let mut made_progress = false;
488 for mut entry in tasks.into_iter() {
489 match entry.future.as_mut().poll(&mut cx) {
490 Poll::Ready(()) => {
491 made_progress = true;
492 }
493 Poll::Pending => {
494 pending.push(entry);
495 }
496 }
497 }
498 if !pending.is_empty() {
499 self.tasks.borrow_mut().extend(pending);
500 }
501 made_progress
502 }
503
504 fn drain_ui(&self) {
505 loop {
506 let mut executed = false;
507
508 {
509 let rx = &mut *self.ui_rx.borrow_mut();
510 for message in rx.try_iter() {
511 executed = true;
512 let _guard = PendingGuard::new(&self.ui_dispatcher.pending);
513 match message {
514 UiMessage::Task(task) => {
515 task();
516 }
517 UiMessage::Invoke { id, value } => {
518 self.invoke_ui_cont(id, value);
519 }
520 }
521 }
522 }
523
524 loop {
525 let task = {
526 let mut local = self.local_tasks.borrow_mut();
527 local.pop_front()
528 };
529
530 match task {
531 Some(task) => {
532 executed = true;
533 task();
534 }
535 None => break,
536 }
537 }
538
539 if self.poll_async_tasks() {
540 executed = true;
541 }
542
543 if !executed {
544 break;
545 }
546 }
547 }
548
549 fn has_pending_ui(&self) -> bool {
550 let local_pending = self
551 .local_tasks
552 .try_borrow()
553 .map(|tasks| !tasks.is_empty())
554 .unwrap_or(true);
555
556 let async_pending = self
557 .tasks
558 .try_borrow()
559 .map(|tasks| !tasks.is_empty())
560 .unwrap_or(true);
561
562 local_pending || self.ui_dispatcher.has_pending() || async_pending
563 }
564
565 fn register_ui_cont<T: 'static>(&self, f: impl FnOnce(T) + 'static) -> u64 {
566 debug_assert_eq!(
567 std::thread::current().id(),
568 self.ui_thread_id,
569 "UI continuation registered off the runtime thread",
570 );
571 let id = self.next_cont_id.get();
572 self.next_cont_id.set(id + 1);
573 let callback = RefCell::new(Some(f));
574 self.ui_conts.borrow_mut().insert(
575 id,
576 Box::new(move |value: Box<dyn Any>| {
577 let slot = callback
578 .borrow_mut()
579 .take()
580 .expect("UI continuation invoked more than once");
581 let value = value
582 .downcast::<T>()
583 .expect("UI continuation type mismatch");
584 slot(*value);
585 }),
586 );
587 id
588 }
589
590 fn invoke_ui_cont(&self, id: u64, value: Box<dyn Any + Send>) {
591 debug_assert_eq!(
592 std::thread::current().id(),
593 self.ui_thread_id,
594 "UI continuation invoked off the runtime thread",
595 );
596 if let Some(callback) = self.ui_conts.borrow_mut().remove(&id) {
597 let value: Box<dyn Any> = value;
598 callback(value);
599 }
600 }
601
602 fn cancel_ui_cont(&self, id: u64) {
603 self.ui_conts.borrow_mut().remove(&id);
604 }
605
606 fn register_frame_callback(&self, callback: Box<dyn FnOnce(u64) + 'static>) -> FrameCallbackId {
607 let id = self.next_frame_callback_id.get();
608 self.next_frame_callback_id.set(id + 1);
609 self.frame_callbacks
610 .borrow_mut()
611 .push_back(FrameCallbackEntry {
612 id,
613 callback: Some(callback),
614 });
615 self.schedule();
616 id
617 }
618
619 fn cancel_frame_callback(&self, id: FrameCallbackId) {
620 let mut callbacks = self.frame_callbacks.borrow_mut();
621 if let Some(index) = callbacks.iter().position(|entry| entry.id == id) {
622 callbacks.remove(index);
623 }
624 let callbacks_empty = callbacks.is_empty();
625 drop(callbacks);
626 let local_pending = self
627 .local_tasks
628 .try_borrow()
629 .map(|tasks| !tasks.is_empty())
630 .unwrap_or(true);
631 let async_pending = self
632 .tasks
633 .try_borrow()
634 .map(|tasks| !tasks.is_empty())
635 .unwrap_or(true);
636 if !self.has_invalid_scopes()
637 && !self.has_updates()
638 && callbacks_empty
639 && !local_pending
640 && !self.ui_dispatcher.has_pending()
641 && !async_pending
642 {
643 *self.needs_frame.borrow_mut() = false;
644 }
645 }
646
647 fn drain_frame_callbacks(&self, frame_time_nanos: u64) {
648 let mut callbacks = self.frame_callbacks.borrow_mut();
649 let mut pending: Vec<Box<dyn FnOnce(u64) + 'static>> = Vec::with_capacity(callbacks.len());
650 while let Some(mut entry) = callbacks.pop_front() {
651 if let Some(callback) = entry.callback.take() {
652 pending.push(callback);
653 }
654 }
655 drop(callbacks);
656
657 if !pending.is_empty() {
662 let _ = crate::run_in_mutable_snapshot(|| {
663 for callback in pending {
664 callback(frame_time_nanos);
665 }
666 });
667 }
668
669 if !self.has_invalid_scopes()
670 && !self.has_updates()
671 && !self.has_frame_callbacks()
672 && !self.has_pending_ui()
673 {
674 *self.needs_frame.borrow_mut() = false;
675 }
676 }
677}
678
679#[derive(Clone)]
680pub struct Runtime {
681 inner: Rc<RuntimeInner>, }
683
impl Runtime {
    /// Creates a runtime on the current thread, installs its task waker, and
    /// records its handle in the thread-local registry and `LAST_RUNTIME`.
    pub fn new(scheduler: Arc<dyn RuntimeScheduler>) -> Self {
        let inner = Rc::new(RuntimeInner::new(scheduler));
        RuntimeInner::init_task_waker(&inner);
        let runtime = Self { inner };
        let handle = runtime.handle();
        register_runtime_handle(&handle);
        LAST_RUNTIME.with(|slot| *slot.borrow_mut() = Some(handle));
        runtime
    }

    /// Returns a weak handle to this runtime for sharing with UI code.
    pub fn handle(&self) -> RuntimeHandle {
        RuntimeHandle {
            inner: Rc::downgrade(&self.inner),
            dispatcher: UiDispatcher::new(self.inner.ui_dispatcher.clone()),
            ui_thread_id: self.inner.ui_thread_id,
            id: self.inner.runtime_id,
        }
    }

    /// True when node commands or invalid scopes are outstanding.
    pub fn has_updates(&self) -> bool {
        self.inner.has_updates()
    }

    /// True when a frame was requested or dispatcher messages are pending.
    pub fn needs_frame(&self) -> bool {
        *self.inner.needs_frame.borrow() || self.inner.ui_dispatcher.has_pending()
    }

    /// Overrides the needs-frame flag.
    pub fn set_needs_frame(&self, value: bool) {
        *self.inner.needs_frame.borrow_mut() = value;
    }

    /// Frame clock bound to this runtime (internal/test builds only).
    #[cfg(any(feature = "internal", test))]
    pub fn frame_clock(&self) -> FrameClock {
        FrameClock::new(self.handle())
    }
}
721
impl Drop for Runtime {
    fn drop(&mut self) {
        // `Runtime` is Clone: only the last strong clone performs teardown.
        // (Handles hold `Weak` references and do not affect the count.)
        if Rc::strong_count(&self.inner) != 1 {
            return;
        }
        unregister_runtime_handle(self.inner.runtime_id);
        LAST_RUNTIME.with(|slot| {
            // Clear LAST_RUNTIME only if it still points at this runtime;
            // a newer runtime may already have replaced it.
            let should_clear = slot
                .borrow()
                .as_ref()
                .is_some_and(|handle| handle.id() == self.inner.runtime_id);
            if should_clear {
                *slot.borrow_mut() = None;
            }
        });
    }
}
739
/// Scheduler whose frame requests are no-ops.
#[derive(Default)]
pub struct DefaultScheduler;

impl RuntimeScheduler for DefaultScheduler {
    fn schedule_frame(&self) {}
}
746
/// No-op scheduler used by `TestRuntime` in unit tests.
#[cfg(test)]
#[derive(Default)]
pub struct TestScheduler;

#[cfg(test)]
impl RuntimeScheduler for TestScheduler {
    fn schedule_frame(&self) {}
}
755
/// Test helper owning a `Runtime` driven by the no-op `TestScheduler`.
#[cfg(test)]
pub struct TestRuntime {
    runtime: Runtime,
}

#[cfg(test)]
impl Default for TestRuntime {
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(test)]
impl TestRuntime {
    /// Creates a runtime with a `TestScheduler` for the current thread.
    pub fn new() -> Self {
        Self {
            runtime: Runtime::new(Arc::new(TestScheduler)),
        }
    }

    /// Weak handle to the underlying runtime.
    pub fn handle(&self) -> RuntimeHandle {
        self.runtime.handle()
    }
}
780
/// Weak, cloneable reference to a `Runtime`. Most operations become no-ops
/// once the runtime is dropped; cross-thread posting still works through the
/// strongly-held dispatcher.
#[derive(Clone)]
pub struct RuntimeHandle {
    inner: Weak<RuntimeInner>,
    dispatcher: UiDispatcher,
    // Thread the runtime was created on; used by `assert_ui_thread`.
    ui_thread_id: ThreadId,
    id: RuntimeId,
}
788
/// Handle to a future spawned via `RuntimeHandle::spawn_ui`; used to cancel it.
pub struct TaskHandle {
    id: u64,
    runtime: RuntimeHandle,
}
793
/// A state release postponed until the current teardown scope exits.
struct DeferredStateRelease {
    runtime: RuntimeHandle,
    id: StateId,
}
798
/// RAII owner of an arena state cell: dropping the lease releases the cell
/// (possibly deferred during teardown; see `defer_state_release`).
pub(crate) struct StateHandleLease {
    id: StateId,
    runtime: RuntimeHandle,
}

impl StateHandleLease {
    /// Id of the leased state cell.
    pub(crate) fn id(&self) -> StateId {
        self.id
    }

    /// Handle to the runtime that owns the cell.
    pub(crate) fn runtime(&self) -> RuntimeHandle {
        self.runtime.clone()
    }
}

impl Drop for StateHandleLease {
    fn drop(&mut self) {
        defer_state_release(self.runtime.clone(), self.id);
    }
}
819
820impl RuntimeHandle {
821 pub fn id(&self) -> RuntimeId {
822 self.id
823 }
824
825 pub(crate) fn alloc_state<T: Clone + 'static>(&self, value: T) -> Rc<StateHandleLease> {
826 let id = self.with_state_arena(|arena| arena.alloc(value, self.clone()));
827 let lease = Rc::new(StateHandleLease {
828 id,
829 runtime: self.clone(),
830 });
831 self.with_state_arena(|arena| arena.register_lease(id, &lease));
832 lease
833 }
834
835 pub(crate) fn alloc_persistent_state<T: Clone + 'static>(
836 &self,
837 value: T,
838 ) -> crate::MutableState<T> {
839 let lease = self.alloc_state(value);
840 if let Some(inner) = self.inner.upgrade() {
841 inner
842 .external_state_owners
843 .borrow_mut()
844 .insert(lease.id(), Rc::clone(&lease));
845 }
846 crate::MutableState::from_lease(&lease)
847 }
848
849 pub(crate) fn retain_state_lease(&self, id: StateId) -> Option<Rc<StateHandleLease>> {
850 self.with_state_arena(|arena| arena.retain_lease(id))
851 }
852
853 pub(crate) fn with_state_arena<R>(&self, f: impl FnOnce(&StateArena) -> R) -> R {
854 self.inner
855 .upgrade()
856 .map(|inner| f(&inner.state_arena))
857 .unwrap_or_else(|| panic!("runtime dropped"))
858 }
859
860 #[allow(dead_code)]
861 pub(crate) fn alloc_value<T: 'static>(&self, value: T) -> StateId {
862 self.with_state_arena(|arena| arena.alloc_raw(value))
863 }
864
865 #[allow(dead_code)]
866 pub(crate) fn with_value<T: 'static, R>(&self, id: StateId, f: impl FnOnce(&T) -> R) -> R {
867 self.with_state_arena(|arena| arena.with_raw::<T, R>(id, f))
868 }
869
870 fn release_state_immediate(&self, id: StateId) {
871 if let Some(inner) = self.inner.upgrade() {
872 inner.state_arena.release(id);
873 }
874 }
875
876 #[cfg(test)]
877 pub(crate) fn state_arena_stats(&self) -> (usize, usize) {
878 self.with_state_arena(StateArena::stats)
879 }
880
881 pub fn schedule(&self) {
882 if let Some(inner) = self.inner.upgrade() {
883 inner.schedule();
884 }
885 }
886
887 pub(crate) fn enqueue_node_update(&self, command: Command) {
888 if let Some(inner) = self.inner.upgrade() {
889 inner.enqueue_update(command);
890 }
891 }
892
893 pub fn enqueue_ui_task(&self, task: Box<dyn FnOnce() + 'static>) {
900 if let Some(inner) = self.inner.upgrade() {
901 inner.enqueue_ui_task(task);
902 } else {
903 task();
904 }
905 }
906
907 pub fn spawn_ui<F>(&self, fut: F) -> Option<TaskHandle>
908 where
909 F: Future<Output = ()> + 'static,
910 {
911 self.inner.upgrade().map(|inner| {
912 let id = inner.spawn_ui_task(Box::pin(fut));
913 TaskHandle {
914 id,
915 runtime: self.clone(),
916 }
917 })
918 }
919
920 pub fn cancel_task(&self, id: u64) {
921 if let Some(inner) = self.inner.upgrade() {
922 inner.cancel_task(id);
923 }
924 }
925
926 pub fn post_ui(&self, task: impl FnOnce() + Send + 'static) {
931 self.dispatcher.post(task);
932 }
933
934 pub fn register_ui_cont<T: 'static>(&self, f: impl FnOnce(T) + 'static) -> Option<u64> {
935 self.inner.upgrade().map(|inner| inner.register_ui_cont(f))
936 }
937
938 pub fn cancel_ui_cont(&self, id: u64) {
939 if let Some(inner) = self.inner.upgrade() {
940 inner.cancel_ui_cont(id);
941 }
942 }
943
944 pub fn drain_ui(&self) {
945 if let Some(inner) = self.inner.upgrade() {
946 inner.drain_ui();
947 }
948 }
949
950 pub fn has_pending_ui(&self) -> bool {
951 self.inner
952 .upgrade()
953 .map(|inner| inner.has_pending_ui())
954 .unwrap_or_else(|| self.dispatcher.has_pending())
955 }
956
957 pub fn register_frame_callback(
958 &self,
959 callback: impl FnOnce(u64) + 'static,
960 ) -> Option<FrameCallbackId> {
961 self.inner
962 .upgrade()
963 .map(|inner| inner.register_frame_callback(Box::new(callback)))
964 }
965
966 pub fn cancel_frame_callback(&self, id: FrameCallbackId) {
967 if let Some(inner) = self.inner.upgrade() {
968 inner.cancel_frame_callback(id);
969 }
970 }
971
972 pub fn drain_frame_callbacks(&self, frame_time_nanos: u64) {
973 if let Some(inner) = self.inner.upgrade() {
974 inner.drain_frame_callbacks(frame_time_nanos);
975 }
976 }
977
978 #[cfg(any(feature = "internal", test))]
979 pub fn frame_clock(&self) -> FrameClock {
980 FrameClock::new(self.clone())
981 }
982
983 pub fn set_needs_frame(&self, value: bool) {
984 if let Some(inner) = self.inner.upgrade() {
985 *inner.needs_frame.borrow_mut() = value;
986 }
987 }
988
989 pub(crate) fn take_updates(&self) -> Vec<Command> {
990 self.inner
992 .upgrade()
993 .map(|inner| inner.take_updates())
994 .unwrap_or_default()
995 }
996
997 pub fn has_updates(&self) -> bool {
998 self.inner
999 .upgrade()
1000 .map(|inner| inner.has_updates())
1001 .unwrap_or(false)
1002 }
1003
1004 pub(crate) fn register_invalid_scope(&self, id: ScopeId, scope: Weak<RecomposeScopeInner>) {
1005 if let Some(inner) = self.inner.upgrade() {
1006 inner.register_invalid_scope(id, scope);
1007 }
1008 }
1009
1010 pub(crate) fn mark_scope_recomposed(&self, id: ScopeId) {
1011 if let Some(inner) = self.inner.upgrade() {
1012 inner.mark_scope_recomposed(id);
1013 }
1014 }
1015
1016 pub(crate) fn take_invalidated_scopes(&self) -> Vec<(ScopeId, Weak<RecomposeScopeInner>)> {
1017 self.inner
1019 .upgrade()
1020 .map(|inner| inner.take_invalidated_scopes())
1021 .unwrap_or_default()
1022 }
1023
1024 pub fn has_invalid_scopes(&self) -> bool {
1025 self.inner
1026 .upgrade()
1027 .map(|inner| inner.has_invalid_scopes())
1028 .unwrap_or(false)
1029 }
1030
1031 pub fn has_frame_callbacks(&self) -> bool {
1032 self.inner
1033 .upgrade()
1034 .map(|inner| inner.has_frame_callbacks())
1035 .unwrap_or(false)
1036 }
1037
1038 pub fn assert_ui_thread(&self) {
1039 debug_assert_eq!(
1040 std::thread::current().id(),
1041 self.ui_thread_id,
1042 "state mutated off the runtime's UI thread"
1043 );
1044 }
1045
1046 pub fn dispatcher(&self) -> UiDispatcher {
1047 self.dispatcher.clone()
1048 }
1049}
1050
impl TaskHandle {
    /// Cancels the spawned future, dropping it if it has not completed.
    pub fn cancel(self) {
        self.runtime.cancel_task(self.id);
    }
}
1056
/// A registered frame callback. `callback` is an `Option` so it can be taken
/// exactly once when the frame is drained.
pub(crate) struct FrameCallbackEntry {
    id: FrameCallbackId,
    callback: Option<Box<dyn FnOnce(u64) + 'static>>,
}
1061
/// Waker for UI-thread futures: waking requests another frame so the runtime
/// re-polls its tasks (see the `ArcWake` impl below).
struct RuntimeTaskWaker {
    scheduler: Arc<dyn RuntimeScheduler>,
}
1065
1066impl RuntimeTaskWaker {
1067 fn new(inner: Weak<RuntimeInner>) -> Self {
1068 let scheduler = inner
1071 .upgrade()
1072 .map(|rc| rc.scheduler.clone())
1073 .expect("RuntimeInner dropped before waker created");
1074 Self { scheduler }
1075 }
1076
1077 fn into_waker(self) -> Waker {
1078 futures_task::waker(Arc::new(self))
1079 }
1080}
1081
impl futures_task::ArcWake for RuntimeTaskWaker {
    // Waking just schedules another frame; `poll_async_tasks` re-polls every
    // registered future during the frame.
    fn wake_by_ref(arc_self: &Arc<Self>) {
        arc_self.scheduler.schedule_frame();
    }
}
1087
1088thread_local! {
1089 static ACTIVE_RUNTIMES: RefCell<Vec<RuntimeHandle>> = const { RefCell::new(Vec::new()) }; static LAST_RUNTIME: RefCell<Option<RuntimeHandle>> = const { RefCell::new(None) };
1091 static REGISTERED_RUNTIMES: RefCell<HashMap<RuntimeId, RuntimeHandle>> = RefCell::new(HashMap::default());
1092 static STATE_TEARDOWN_DEPTH: Cell<usize> = const { Cell::new(0) };
1093 static DEFERRED_STATE_RELEASES: RefCell<Vec<DeferredStateRelease>> = const { RefCell::new(Vec::new()) };
1094}
1095
1096pub fn current_runtime_handle() -> Option<RuntimeHandle> {
1101 if let Some(handle) = ACTIVE_RUNTIMES.with(|stack| stack.borrow().last().cloned()) {
1102 return Some(handle);
1103 }
1104 LAST_RUNTIME.with(|slot| slot.borrow().clone())
1105}
1106
/// Looks up a runtime registered on this thread by its id.
pub(crate) fn runtime_handle_by_id(id: RuntimeId) -> Option<RuntimeHandle> {
    REGISTERED_RUNTIMES.with(|registry| registry.borrow().get(&id).cloned())
}
1110
/// Records `handle` in this thread's runtime registry (overwrites any entry
/// with the same id).
fn register_runtime_handle(handle: &RuntimeHandle) {
    REGISTERED_RUNTIMES.with(|registry| {
        registry.borrow_mut().insert(handle.id(), handle.clone());
    });
}
1116
/// Removes the runtime with `id` from this thread's registry.
fn unregister_runtime_handle(id: RuntimeId) {
    REGISTERED_RUNTIMES.with(|registry| {
        registry.borrow_mut().remove(&id);
    });
}
1122
1123fn defer_state_release(runtime: RuntimeHandle, id: StateId) {
1124 let teardown_active = STATE_TEARDOWN_DEPTH.with(|depth| depth.get() > 0);
1125 if teardown_active {
1126 DEFERRED_STATE_RELEASES.with(|releases| {
1127 releases
1128 .borrow_mut()
1129 .push(DeferredStateRelease { runtime, id });
1130 });
1131 } else {
1132 runtime.release_state_immediate(id);
1133 }
1134}
1135
1136fn flush_deferred_state_releases() {
1137 DEFERRED_STATE_RELEASES.with(|releases| {
1138 let mut releases = releases.borrow_mut();
1139 while let Some(deferred) = releases.pop() {
1140 deferred.runtime.release_state_immediate(deferred.id);
1141 }
1142 });
1143}
1144
1145pub(crate) struct StateTeardownScope;
1146
/// Enters a scope in which lease drops defer their state releases; the
/// deferred releases are flushed when the outermost scope's guard is dropped.
pub(crate) fn enter_state_teardown_scope() -> StateTeardownScope {
    STATE_TEARDOWN_DEPTH.with(|depth| depth.set(depth.get() + 1));
    StateTeardownScope
}
1151
impl Drop for StateTeardownScope {
    fn drop(&mut self) {
        STATE_TEARDOWN_DEPTH.with(|depth| {
            // saturating_sub guards against underflow from an unbalanced drop.
            let next = depth.get().saturating_sub(1);
            depth.set(next);
            // Only the outermost scope flushes the deferred releases.
            if next == 0 {
                flush_deferred_state_releases();
            }
        });
    }
}
1163
/// Makes `handle` the innermost active runtime on this thread, and records it
/// both in the registry and as the most recent runtime.
pub(crate) fn push_active_runtime(handle: &RuntimeHandle) {
    register_runtime_handle(handle);
    ACTIVE_RUNTIMES.with(|stack| stack.borrow_mut().push(handle.clone()));
    LAST_RUNTIME.with(|slot| *slot.borrow_mut() = Some(handle.clone()));
}
1169
/// Pops the innermost runtime pushed by `push_active_runtime` (no-op when the
/// stack is empty).
pub(crate) fn pop_active_runtime() {
    ACTIVE_RUNTIMES.with(|stack| {
        stack.borrow_mut().pop();
    });
}
1175
1176pub fn schedule_frame() {
1178 if let Some(handle) = current_runtime_handle() {
1179 handle.schedule();
1180 return;
1181 }
1182 panic!("no runtime available to schedule frame");
1183}
1184
/// Queues a node-tree mutation on the current thread's runtime.
///
/// # Panics
/// Panics when no runtime is available on this thread.
pub fn schedule_node_update(
    update: impl FnOnce(&mut dyn Applier) -> Result<(), NodeError> + 'static,
) {
    let handle = current_runtime_handle().expect("no runtime available to schedule node update");
    handle.enqueue_node_update(Command::callback(update));
}