1use crate::collections::map::HashMap;
2use crate::collections::map::HashSet;
3use crate::state::{MutationPolicy, NeverEqual};
4use crate::MutableStateInner;
5use std::any::Any;
6use std::cell::{Cell, RefCell};
7use std::collections::VecDeque;
8use std::future::Future;
9use std::pin::Pin;
10use std::rc::{Rc, Weak};
11use std::sync::atomic::{AtomicU32, AtomicUsize, Ordering};
12use std::sync::{mpsc, Arc};
13use std::task::{Context, Poll, Waker};
14use std::thread::ThreadId;
15use std::thread_local;
16
17#[cfg(any(feature = "internal", test))]
18use crate::frame_clock::FrameClock;
19use crate::platform::RuntimeScheduler;
20use crate::{Applier, Command, FrameCallbackId, NodeError, RecomposeScopeInner, ScopeId};
21
/// Message delivered from any thread to the UI thread's drain loop.
enum UiMessage {
    /// Run an arbitrary closure on the UI thread.
    Task(Box<dyn FnOnce() + Send + 'static>),
    /// Deliver `value` to the UI continuation registered under `id`.
    Invoke { id: u64, value: Box<dyn Any + Send> },
}
26
/// Type-erased one-shot callback invoked on the UI thread with a posted value.
type UiContinuation = Box<dyn Fn(Box<dyn Any>) + 'static>;
/// Registered UI continuations keyed by their id.
type UiContinuationMap = HashMap<u64, UiContinuation>;
29
/// Concrete (typed) storage for one state cell held by the arena.
struct TypedStateCell<T: Clone + 'static> {
    inner: MutableStateInner<T>,
}
33
/// Type-erased view of a state cell used to detach watching scopes without
/// knowing the cell's value type.
trait ScopeWatchCell {
    fn unregister_scope(&self, scope_id: ScopeId);
}
37
impl<T: Clone + 'static> ScopeWatchCell for TypedStateCell<T> {
    // Forwards to the typed state's scope bookkeeping.
    fn unregister_scope(&self, scope_id: ScopeId) {
        self.inner.unregister_scope(scope_id);
    }
}
43
/// One slot in the state arena; slots are recycled through the free list.
struct StateArenaSlot {
    /// Bumped each time the slot is re-allocated, invalidating stale ids.
    generation: u32,
    /// Type-erased cell (`Rc<TypedStateCell<T>>`); `None` while the slot is free.
    cell: Option<Rc<dyn Any>>,
    /// The same cell behind the scope-unregistration trait.
    watcher_cell: Option<Rc<dyn ScopeWatchCell>>,
    /// Weak ref to the lease currently owning this slot, for resurrection.
    lease: Option<Weak<StateHandleLease>>,
}
50
/// Arena storage: dense slot vector plus the indices of vacated slots.
#[derive(Default)]
struct StateArenaInner {
    cells: Vec<StateArenaSlot>,
    /// Slot indices available for reuse.
    free: Vec<u32>,
}
56
/// Length/capacity snapshot of the state arena, for leak diagnostics.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct StateArenaDebugStats {
    pub cells_len: usize,
    pub cells_cap: usize,
    pub free_len: usize,
    pub free_cap: usize,
}
64
/// Length/capacity snapshot of every queue the runtime owns, for leak and
/// growth diagnostics (see `RuntimeInner::debug_stats`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct RuntimeDebugStats {
    pub node_updates_len: usize,
    pub node_updates_cap: usize,
    pub invalid_scopes_len: usize,
    pub invalid_scopes_cap: usize,
    pub scope_queue_len: usize,
    pub scope_queue_cap: usize,
    pub frame_callbacks_len: usize,
    pub frame_callbacks_cap: usize,
    pub local_tasks_len: usize,
    pub local_tasks_cap: usize,
    pub ui_conts_len: usize,
    pub ui_conts_cap: usize,
    pub tasks_len: usize,
    pub tasks_cap: usize,
    pub external_state_owners_len: usize,
    pub external_state_owners_cap: usize,
    /// Messages posted to the UI dispatcher but not yet executed.
    pub ui_dispatcher_pending: usize,
}
85
/// Single-threaded arena holding every state cell owned by one runtime.
#[derive(Default)]
pub(crate) struct StateArena {
    inner: RefCell<StateArenaInner>,
}
90
impl StateArena {
    /// Allocates a state cell with the default `NeverEqual` mutation policy.
    pub(crate) fn alloc<T: Clone + 'static>(&self, value: T, runtime: RuntimeHandle) -> StateId {
        self.alloc_with_policy(value, runtime, Arc::new(NeverEqual))
    }

    /// Allocates a state cell, reusing a freed slot when one is available.
    ///
    /// The returned `StateId` pairs the slot index with the slot's current
    /// generation, so ids minted for a previous occupant no longer resolve.
    pub(crate) fn alloc_with_policy<T: Clone + 'static>(
        &self,
        value: T,
        runtime: RuntimeHandle,
        policy: Arc<dyn MutationPolicy<T>>,
    ) -> StateId {
        // Reserve a slot first and drop the borrow: constructing the cell
        // below may re-enter runtime code, so the arena must not stay
        // mutably borrowed across it.
        let (slot, generation) = {
            let mut inner = self.inner.borrow_mut();
            match inner.free.pop() {
                Some(slot) => {
                    let entry = inner
                        .cells
                        .get_mut(slot as usize)
                        .expect("state slot missing");
                    debug_assert!(entry.cell.is_none(), "reused state slot must be empty");
                    // Invalidate any stale ids still pointing at this slot.
                    entry.generation = entry.generation.wrapping_add(1);
                    (slot, entry.generation)
                }
                None => {
                    let slot = inner.cells.len() as u32;
                    inner.cells.push(StateArenaSlot {
                        generation: 0,
                        cell: None,
                        watcher_cell: None,
                        lease: None,
                    });
                    (slot, 0)
                }
            }
        };
        let id = StateId::new(slot, generation);
        let inner = MutableStateInner::new_with_policy(value, runtime.clone(), policy);
        inner.install_snapshot_observer(id);
        // The same Rc is stored twice: type-erased for downcasts, and behind
        // the watcher trait for scope unregistration.
        let typed_cell = Rc::new(TypedStateCell { inner });
        let cell: Rc<dyn Any> = typed_cell.clone();
        let watcher_cell: Rc<dyn ScopeWatchCell> = typed_cell;
        let mut arena = self.inner.borrow_mut();
        let slot_entry = &mut arena.cells[slot as usize];
        slot_entry.cell = Some(cell);
        slot_entry.watcher_cell = Some(watcher_cell);
        id
    }

    /// Resolves `id` to its type-erased cell, or `None` if the slot is free,
    /// out of range, or the generation no longer matches.
    fn get_cell_opt(&self, id: StateId) -> Option<Rc<dyn Any>> {
        self.inner
            .borrow()
            .cells
            .get(id.slot_index())
            .filter(|cell| cell.generation == id.generation())
            .and_then(|cell| cell.cell.as_ref())
            .cloned()
    }

    /// Resolves `id` to its typed cell.
    ///
    /// # Panics
    /// Panics if the cell is missing or its stored type is not `T`.
    fn get_typed<T: Clone + 'static>(&self, id: StateId) -> Rc<TypedStateCell<T>> {
        match self.get_cell_opt(id) {
            None => panic!(
                "state cell missing: slot={}, gen={}, expected={}",
                id.slot(),
                id.generation(),
                std::any::type_name::<T>(),
            ),
            Some(cell) => Rc::downcast::<TypedStateCell<T>>(cell).unwrap_or_else(|_| {
                panic!(
                    "state cell type mismatch: slot={}, gen={}, expected={}",
                    id.slot(),
                    id.generation(),
                    std::any::type_name::<T>(),
                )
            }),
        }
    }

    /// Non-panicking variant of `get_typed`; `None` on both a missing cell
    /// and a type mismatch.
    fn get_typed_opt<T: Clone + 'static>(&self, id: StateId) -> Option<Rc<TypedStateCell<T>>> {
        Rc::downcast::<TypedStateCell<T>>(self.get_cell_opt(id)?).ok()
    }

    /// Runs `f` with the typed state; panics if `id` does not resolve to `T`.
    pub(crate) fn with_typed<T: Clone + 'static, R>(
        &self,
        id: StateId,
        f: impl FnOnce(&MutableStateInner<T>) -> R,
    ) -> R {
        let cell = self.get_typed::<T>(id);
        f(&cell.inner)
    }

    /// Runs `f` with the typed state if `id` still resolves to `T`.
    pub(crate) fn with_typed_opt<T: Clone + 'static, R>(
        &self,
        id: StateId,
        f: impl FnOnce(&MutableStateInner<T>) -> R,
    ) -> Option<R> {
        let cell = self.get_typed_opt::<T>(id)?;
        Some(f(&cell.inner))
    }

    /// Frees the slot for `id` (no-op on a stale or unknown id) and returns
    /// the slot index to the free list.
    pub(crate) fn release(&self, id: StateId) {
        let cell = {
            let mut inner = self.inner.borrow_mut();
            let Some(slot) = inner.cells.get_mut(id.slot_index()) else {
                return;
            };
            if slot.generation != id.generation() {
                return;
            }
            slot.lease = None;
            slot.watcher_cell = None;
            let cell = slot.cell.take();
            // Only push to the free list when the slot actually held a cell,
            // so a double release cannot duplicate free-list entries.
            if cell.is_some() {
                inner.free.push(id.slot());
            }
            cell
        };
        // Drop the cell after releasing the arena borrow: dropping the state
        // may run arbitrary user code that re-enters the arena.
        drop(cell);
    }

    /// Returns `(total slot count, free slot count)`.
    pub(crate) fn stats(&self) -> (usize, usize) {
        let inner = self.inner.borrow();
        (inner.cells.len(), inner.free.len())
    }

    /// Full length/capacity snapshot for diagnostics.
    pub(crate) fn debug_stats(&self) -> StateArenaDebugStats {
        let inner = self.inner.borrow();
        StateArenaDebugStats {
            cells_len: inner.cells.len(),
            cells_cap: inner.cells.capacity(),
            free_len: inner.free.len(),
            free_cap: inner.free.capacity(),
        }
    }

    /// Detaches `scope_id` from the state `id` watches list (no-op on stale ids).
    pub(crate) fn unregister_scope(&self, id: StateId, scope_id: ScopeId) {
        // Clone the watcher out before calling it, so the arena is not
        // borrowed while user bookkeeping runs.
        let watcher_cell = {
            let inner = self.inner.borrow();
            inner
                .cells
                .get(id.slot_index())
                .filter(|slot| slot.generation == id.generation())
                .and_then(|slot| slot.watcher_cell.as_ref())
                .cloned()
        };
        if let Some(watcher_cell) = watcher_cell {
            watcher_cell.unregister_scope(scope_id);
        }
    }

    /// Records the lease owning slot `id` so it can later be resurrected.
    ///
    /// # Panics
    /// Panics if the slot is missing or the generation does not match.
    pub(crate) fn register_lease(&self, id: StateId, lease: &Rc<StateHandleLease>) {
        let mut inner = self.inner.borrow_mut();
        let Some(slot) = inner.cells.get_mut(id.slot_index()) else {
            panic!("state slot missing");
        };
        assert_eq!(
            slot.generation,
            id.generation(),
            "state generation mismatch"
        );
        slot.lease = Some(Rc::downgrade(lease));
    }

    /// Upgrades the weak lease for `id`, if the slot is live and the lease
    /// still has strong owners.
    pub(crate) fn retain_lease(&self, id: StateId) -> Option<Rc<StateHandleLease>> {
        let inner = self.inner.borrow();
        let slot = inner.cells.get(id.slot_index())?;
        if slot.generation != id.generation() {
            return None;
        }
        slot.lease.as_ref()?.upgrade()
    }
}
262
/// Generational handle to a state cell inside a runtime's `StateArena`.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub struct StateId {
    slot: u32,
    generation: u32,
}
268
impl StateId {
    /// Pairs a slot index with the slot's generation at allocation time.
    const fn new(slot: u32, generation: u32) -> Self {
        Self { slot, generation }
    }

    /// Raw slot index.
    pub(crate) const fn slot(self) -> u32 {
        self.slot
    }

    /// Slot index widened for `Vec` indexing.
    pub(crate) const fn slot_index(self) -> usize {
        self.slot as usize
    }

    /// Generation this id was minted with; must match the slot to resolve.
    pub(crate) const fn generation(self) -> u32 {
        self.generation
    }
}
286
/// Process-unique identifier for a `Runtime`.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub struct RuntimeId(u32);
289
290impl RuntimeId {
291 fn next() -> Self {
292 static NEXT_RUNTIME_ID: AtomicU32 = AtomicU32::new(1);
293 Self(NEXT_RUNTIME_ID.fetch_add(1, Ordering::Relaxed))
294 }
295}
296
/// Shared (`Send`) core of the UI dispatcher: channel sender, scheduler, and
/// a count of messages that were posted but not yet executed.
struct UiDispatcherInner {
    scheduler: Arc<dyn RuntimeScheduler>,
    tx: mpsc::Sender<UiMessage>,
    /// Messages posted but not yet drained on the UI thread.
    pending: AtomicUsize,
}
302
impl UiDispatcherInner {
    fn new(scheduler: Arc<dyn RuntimeScheduler>, tx: mpsc::Sender<UiMessage>) -> Self {
        Self {
            scheduler,
            tx,
            pending: AtomicUsize::new(0),
        }
    }

    /// Queues `task` for the UI thread and requests a frame.
    ///
    /// The pending count is bumped *before* sending so `has_pending` observes
    /// the message as soon as it is in flight; the UI drain loop decrements it
    /// (via `PendingGuard`) when the message executes.
    fn post(&self, task: impl FnOnce() + Send + 'static) {
        self.pending.fetch_add(1, Ordering::SeqCst);
        // A send error only means the receiver (runtime) is gone; dropping
        // the task silently is the best-effort behavior here.
        let _ = self.tx.send(UiMessage::Task(Box::new(task)));
        self.scheduler.schedule_frame();
    }

    /// Queues delivery of `value` to the UI continuation registered as `id`.
    fn post_invoke(&self, id: u64, value: Box<dyn Any + Send>) {
        self.pending.fetch_add(1, Ordering::SeqCst);
        let _ = self.tx.send(UiMessage::Invoke { id, value });
        self.scheduler.schedule_frame();
    }

    /// True while any posted message has not yet been executed.
    fn has_pending(&self) -> bool {
        self.pending.load(Ordering::SeqCst) > 0
    }
}
328
/// Drop guard that decrements the dispatcher's pending-message count.
struct PendingGuard<'a> {
    counter: &'a AtomicUsize,
}
332
impl<'a> PendingGuard<'a> {
    /// The count was already incremented at post time; this guard only
    /// guarantees the matching decrement, even if the message panics.
    fn new(counter: &'a AtomicUsize) -> Self {
        Self { counter }
    }
}
338
impl<'a> Drop for PendingGuard<'a> {
    fn drop(&mut self) {
        // Balances the fetch_add performed in post/post_invoke.
        let previous = self.counter.fetch_sub(1, Ordering::SeqCst);
        debug_assert!(previous > 0, "UI dispatcher pending count underflowed");
    }
}
345
/// Cloneable, thread-safe handle for posting work onto the UI thread.
#[derive(Clone)]
pub struct UiDispatcher {
    inner: Arc<UiDispatcherInner>,
}
350
351impl UiDispatcher {
352 fn new(inner: Arc<UiDispatcherInner>) -> Self {
353 Self { inner }
354 }
355
356 pub fn post(&self, task: impl FnOnce() + Send + 'static) {
357 self.inner.post(task);
358 }
359
360 pub fn post_invoke<T>(&self, id: u64, value: T)
361 where
362 T: Send + 'static,
363 {
364 self.inner.post_invoke(id, Box::new(value));
365 }
366
367 pub fn has_pending(&self) -> bool {
368 self.inner.has_pending()
369 }
370}
371
/// Single-threaded core shared by `Runtime` and its weak `RuntimeHandle`s.
struct RuntimeInner {
    /// Platform hook used to request a new frame.
    scheduler: Arc<dyn RuntimeScheduler>,
    /// Set while queued work still requires a frame.
    needs_frame: RefCell<bool>,
    /// Applier commands to flush on the next frame.
    node_updates: RefCell<Vec<Command>>,
    /// Ids of currently invalidated scopes (deduplicates `scope_queue`).
    invalid_scopes: RefCell<HashSet<ScopeId>>,
    /// Invalidated scopes in arrival order, weakly referenced.
    scope_queue: RefCell<Vec<(ScopeId, Weak<RecomposeScopeInner>)>>,
    /// One-shot callbacks invoked with the next frame's timestamp.
    frame_callbacks: RefCell<VecDeque<FrameCallbackEntry>>,
    next_frame_callback_id: Cell<u64>,
    /// Cross-thread entry point feeding `ui_rx`.
    ui_dispatcher: Arc<UiDispatcherInner>,
    /// Receiving end of the cross-thread UI channel; drained on the UI thread.
    ui_rx: RefCell<mpsc::Receiver<UiMessage>>,
    /// UI-thread-only (non-`Send`) queued closures.
    local_tasks: RefCell<VecDeque<Box<dyn FnOnce() + 'static>>>,
    /// Registered one-shot continuations awaiting a posted value.
    ui_conts: RefCell<UiContinuationMap>,
    next_cont_id: Cell<u64>,
    /// Thread the runtime was created on; all UI work must stay there.
    ui_thread_id: ThreadId,
    /// Async tasks polled on the UI thread each frame.
    tasks: RefCell<Vec<TaskEntry>>,
    next_task_id: Cell<u64>,
    /// Waker that re-requests a frame when an async task wakes.
    task_waker: RefCell<Option<Waker>>,
    /// Storage for all state cells owned by this runtime.
    state_arena: StateArena,
    /// Leases the runtime itself keeps alive (persistent state).
    external_state_owners: RefCell<HashMap<StateId, Rc<StateHandleLease>>>,
    runtime_id: RuntimeId,
}
393
/// An async task spawned onto the UI thread, identified for cancellation.
struct TaskEntry {
    id: u64,
    future: Pin<Box<dyn Future<Output = ()> + 'static>>,
}
398
399impl RuntimeInner {
400 fn new(scheduler: Arc<dyn RuntimeScheduler>) -> Self {
401 let (tx, rx) = mpsc::channel();
402 let dispatcher = Arc::new(UiDispatcherInner::new(scheduler.clone(), tx));
403 Self {
404 scheduler,
405 needs_frame: RefCell::new(false),
406 node_updates: RefCell::new(Vec::new()),
407 invalid_scopes: RefCell::new(HashSet::default()),
408 scope_queue: RefCell::new(Vec::new()),
409 frame_callbacks: RefCell::new(VecDeque::new()),
410 next_frame_callback_id: Cell::new(1),
411 ui_dispatcher: dispatcher,
412 ui_rx: RefCell::new(rx),
413 local_tasks: RefCell::new(VecDeque::new()),
414 ui_conts: RefCell::new(UiContinuationMap::default()),
415 next_cont_id: Cell::new(1),
416 ui_thread_id: std::thread::current().id(),
417 tasks: RefCell::new(Vec::new()),
418 next_task_id: Cell::new(1),
419 task_waker: RefCell::new(None),
420 state_arena: StateArena::default(),
421 external_state_owners: RefCell::new(HashMap::default()),
422 runtime_id: RuntimeId::next(),
423 }
424 }
425
426 fn init_task_waker(this: &Rc<Self>) {
427 let weak = Rc::downgrade(this);
428 let waker = RuntimeTaskWaker::new(weak).into_waker();
429 *this.task_waker.borrow_mut() = Some(waker);
430 }
431
432 fn schedule(&self) {
433 *self.needs_frame.borrow_mut() = true;
434 self.scheduler.schedule_frame();
435 }
436
437 fn enqueue_update(&self, command: Command) {
438 self.node_updates.borrow_mut().push(command);
439 self.schedule(); }
441
442 fn take_updates(&self) -> Vec<Command> {
443 let updates = self.node_updates.borrow_mut().drain(..).collect::<Vec<_>>();
444 updates
445 }
446
447 fn has_updates(&self) -> bool {
448 !self.node_updates.borrow().is_empty() || self.has_invalid_scopes()
449 }
450
451 fn register_invalid_scope(&self, id: ScopeId, scope: Weak<RecomposeScopeInner>) {
452 let mut invalid = self.invalid_scopes.borrow_mut();
453 if invalid.insert(id) {
454 self.scope_queue.borrow_mut().push((id, scope));
455 self.schedule();
456 }
457 }
458
459 fn requeue_invalid_scope(&self, id: ScopeId, scope: Weak<RecomposeScopeInner>) {
460 if self.invalid_scopes.borrow().contains(&id) {
461 self.scope_queue.borrow_mut().push((id, scope));
462 self.schedule();
463 }
464 }
465
466 fn mark_scope_recomposed(&self, id: ScopeId) {
467 self.invalid_scopes.borrow_mut().remove(&id);
468 }
469
470 fn take_invalidated_scopes(&self) -> Vec<(ScopeId, Weak<RecomposeScopeInner>)> {
471 let mut queue = self.scope_queue.borrow_mut();
472 if queue.is_empty() {
473 return Vec::new();
474 }
475 let pending: Vec<_> = queue.drain(..).collect();
476 drop(queue);
477 let invalid = self.invalid_scopes.borrow();
478 pending
479 .into_iter()
480 .filter(|(id, _)| invalid.contains(id))
481 .collect()
482 }
483
484 fn has_invalid_scopes(&self) -> bool {
485 !self.invalid_scopes.borrow().is_empty()
486 }
487
488 fn has_frame_callbacks(&self) -> bool {
489 !self.frame_callbacks.borrow().is_empty()
490 }
491
492 fn enqueue_ui_task(&self, task: Box<dyn FnOnce() + 'static>) {
497 self.local_tasks.borrow_mut().push_back(task);
498 self.schedule();
499 }
500
501 fn spawn_ui_task(&self, future: Pin<Box<dyn Future<Output = ()> + 'static>>) -> u64 {
502 let id = self.next_task_id.get();
503 self.next_task_id.set(id + 1);
504 self.tasks.borrow_mut().push(TaskEntry { id, future });
505 self.schedule();
506 id
507 }
508
509 fn cancel_task(&self, id: u64) {
510 let mut tasks = self.tasks.borrow_mut();
511 if tasks.iter().any(|entry| entry.id == id) {
512 tasks.retain(|entry| entry.id != id);
513 }
514 }
515
516 fn poll_async_tasks(&self) -> bool {
517 let waker = match self.task_waker.borrow().as_ref() {
518 Some(waker) => waker.clone(),
519 None => return false,
520 };
521 let mut cx = Context::from_waker(&waker);
522 let mut tasks_ref = self.tasks.borrow_mut();
523 let tasks = std::mem::take(&mut *tasks_ref);
524 drop(tasks_ref);
525 let mut pending = Vec::with_capacity(tasks.len());
526 let mut made_progress = false;
527 for mut entry in tasks.into_iter() {
528 match entry.future.as_mut().poll(&mut cx) {
529 Poll::Ready(()) => {
530 made_progress = true;
531 }
532 Poll::Pending => {
533 pending.push(entry);
534 }
535 }
536 }
537 if !pending.is_empty() {
538 self.tasks.borrow_mut().extend(pending);
539 }
540 made_progress
541 }
542
543 fn drain_ui(&self) {
544 loop {
545 let mut executed = false;
546
547 {
548 let rx = &mut *self.ui_rx.borrow_mut();
549 for message in rx.try_iter() {
550 executed = true;
551 let _guard = PendingGuard::new(&self.ui_dispatcher.pending);
552 match message {
553 UiMessage::Task(task) => {
554 task();
555 }
556 UiMessage::Invoke { id, value } => {
557 self.invoke_ui_cont(id, value);
558 }
559 }
560 }
561 }
562
563 loop {
564 let task = {
565 let mut local = self.local_tasks.borrow_mut();
566 local.pop_front()
567 };
568
569 match task {
570 Some(task) => {
571 executed = true;
572 task();
573 }
574 None => break,
575 }
576 }
577
578 if self.poll_async_tasks() {
579 executed = true;
580 }
581
582 if !executed {
583 break;
584 }
585 }
586 }
587
588 fn has_pending_ui(&self) -> bool {
589 let local_pending = self
590 .local_tasks
591 .try_borrow()
592 .map(|tasks| !tasks.is_empty())
593 .unwrap_or(true);
594
595 let async_pending = self
596 .tasks
597 .try_borrow()
598 .map(|tasks| !tasks.is_empty())
599 .unwrap_or(true);
600
601 local_pending || self.ui_dispatcher.has_pending() || async_pending
602 }
603
604 fn register_ui_cont<T: 'static>(&self, f: impl FnOnce(T) + 'static) -> u64 {
605 debug_assert_eq!(
606 std::thread::current().id(),
607 self.ui_thread_id,
608 "UI continuation registered off the runtime thread",
609 );
610 let id = self.next_cont_id.get();
611 self.next_cont_id.set(id + 1);
612 let callback = RefCell::new(Some(f));
613 self.ui_conts.borrow_mut().insert(
614 id,
615 Box::new(move |value: Box<dyn Any>| {
616 let slot = callback
617 .borrow_mut()
618 .take()
619 .expect("UI continuation invoked more than once");
620 let value = value
621 .downcast::<T>()
622 .expect("UI continuation type mismatch");
623 slot(*value);
624 }),
625 );
626 id
627 }
628
629 fn invoke_ui_cont(&self, id: u64, value: Box<dyn Any + Send>) {
630 debug_assert_eq!(
631 std::thread::current().id(),
632 self.ui_thread_id,
633 "UI continuation invoked off the runtime thread",
634 );
635 if let Some(callback) = self.ui_conts.borrow_mut().remove(&id) {
636 let value: Box<dyn Any> = value;
637 callback(value);
638 }
639 }
640
641 fn cancel_ui_cont(&self, id: u64) {
642 self.ui_conts.borrow_mut().remove(&id);
643 }
644
645 fn register_frame_callback(&self, callback: Box<dyn FnOnce(u64) + 'static>) -> FrameCallbackId {
646 let id = self.next_frame_callback_id.get();
647 self.next_frame_callback_id.set(id + 1);
648 self.frame_callbacks
649 .borrow_mut()
650 .push_back(FrameCallbackEntry {
651 id,
652 callback: Some(callback),
653 });
654 self.schedule();
655 id
656 }
657
658 fn cancel_frame_callback(&self, id: FrameCallbackId) {
659 let mut callbacks = self.frame_callbacks.borrow_mut();
660 if let Some(index) = callbacks.iter().position(|entry| entry.id == id) {
661 callbacks.remove(index);
662 }
663 let callbacks_empty = callbacks.is_empty();
664 drop(callbacks);
665 let local_pending = self
666 .local_tasks
667 .try_borrow()
668 .map(|tasks| !tasks.is_empty())
669 .unwrap_or(true);
670 let async_pending = self
671 .tasks
672 .try_borrow()
673 .map(|tasks| !tasks.is_empty())
674 .unwrap_or(true);
675 if !self.has_invalid_scopes()
676 && !self.has_updates()
677 && callbacks_empty
678 && !local_pending
679 && !self.ui_dispatcher.has_pending()
680 && !async_pending
681 {
682 *self.needs_frame.borrow_mut() = false;
683 }
684 }
685
686 fn drain_frame_callbacks(&self, frame_time_nanos: u64) {
687 let mut callbacks = self.frame_callbacks.borrow_mut();
688 let mut pending: Vec<Box<dyn FnOnce(u64) + 'static>> = Vec::with_capacity(callbacks.len());
689 while let Some(mut entry) = callbacks.pop_front() {
690 if let Some(callback) = entry.callback.take() {
691 pending.push(callback);
692 }
693 }
694 drop(callbacks);
695
696 if !pending.is_empty() {
701 let _ = crate::run_in_mutable_snapshot(|| {
702 for callback in pending {
703 callback(frame_time_nanos);
704 }
705 });
706 }
707
708 if !self.has_invalid_scopes()
709 && !self.has_updates()
710 && !self.has_frame_callbacks()
711 && !self.has_pending_ui()
712 {
713 *self.needs_frame.borrow_mut() = false;
714 }
715 }
716
717 fn debug_stats(&self) -> RuntimeDebugStats {
718 let node_updates = self.node_updates.borrow();
719 let invalid_scopes = self.invalid_scopes.borrow();
720 let scope_queue = self.scope_queue.borrow();
721 let frame_callbacks = self.frame_callbacks.borrow();
722 let local_tasks = self.local_tasks.borrow();
723 let ui_conts = self.ui_conts.borrow();
724 let tasks = self.tasks.borrow();
725 let external_state_owners = self.external_state_owners.borrow();
726
727 RuntimeDebugStats {
728 node_updates_len: node_updates.len(),
729 node_updates_cap: node_updates.capacity(),
730 invalid_scopes_len: invalid_scopes.len(),
731 invalid_scopes_cap: invalid_scopes.capacity(),
732 scope_queue_len: scope_queue.len(),
733 scope_queue_cap: scope_queue.capacity(),
734 frame_callbacks_len: frame_callbacks.len(),
735 frame_callbacks_cap: frame_callbacks.capacity(),
736 local_tasks_len: local_tasks.len(),
737 local_tasks_cap: local_tasks.capacity(),
738 ui_conts_len: ui_conts.len(),
739 ui_conts_cap: ui_conts.capacity(),
740 tasks_len: tasks.len(),
741 tasks_cap: tasks.capacity(),
742 external_state_owners_len: external_state_owners.len(),
743 external_state_owners_cap: external_state_owners.capacity(),
744 ui_dispatcher_pending: self.ui_dispatcher.pending.load(Ordering::SeqCst),
745 }
746 }
747}
748
/// Owning entry point for a composition runtime; cheap to clone via `Rc`.
#[derive(Clone)]
pub struct Runtime {
    inner: Rc<RuntimeInner>,
}
753
impl Runtime {
    /// Creates a runtime bound to the current thread and registers it as this
    /// thread's most recent runtime.
    pub fn new(scheduler: Arc<dyn RuntimeScheduler>) -> Self {
        let inner = Rc::new(RuntimeInner::new(scheduler));
        // The task waker needs the finished Rc, so it is installed afterwards.
        RuntimeInner::init_task_waker(&inner);
        let runtime = Self { inner };
        let handle = runtime.handle();
        register_runtime_handle(&handle);
        LAST_RUNTIME.with(|slot| *slot.borrow_mut() = Some(handle));
        runtime
    }

    /// Returns a weak, cloneable handle to this runtime.
    pub fn handle(&self) -> RuntimeHandle {
        RuntimeHandle {
            inner: Rc::downgrade(&self.inner),
            dispatcher: UiDispatcher::new(self.inner.ui_dispatcher.clone()),
            ui_thread_id: self.inner.ui_thread_id,
            id: self.inner.runtime_id,
        }
    }

    /// True when node updates or invalidated scopes await a frame.
    pub fn has_updates(&self) -> bool {
        self.inner.has_updates()
    }

    /// True when a frame was requested or dispatcher messages are queued.
    pub fn needs_frame(&self) -> bool {
        *self.inner.needs_frame.borrow() || self.inner.ui_dispatcher.has_pending()
    }

    /// Overrides the frame-requested flag (e.g. after presenting a frame).
    pub fn set_needs_frame(&self, value: bool) {
        *self.inner.needs_frame.borrow_mut() = value;
    }

    /// Frame clock for driving frame callbacks (internal/test builds only).
    #[cfg(any(feature = "internal", test))]
    pub fn frame_clock(&self) -> FrameClock {
        FrameClock::new(self.handle())
    }
}
791
impl Drop for Runtime {
    fn drop(&mut self) {
        // Only the last `Runtime` clone tears down thread-local
        // registrations; handles hold weak refs and do not count here.
        if Rc::strong_count(&self.inner) != 1 {
            return;
        }
        unregister_runtime_handle(self.inner.runtime_id);
        LAST_RUNTIME.with(|slot| {
            // Clear the fallback slot only if it still points at this runtime.
            let should_clear = slot
                .borrow()
                .as_ref()
                .is_some_and(|handle| handle.id() == self.inner.runtime_id);
            if should_clear {
                *slot.borrow_mut() = None;
            }
        });
    }
}
809
/// Scheduler that ignores frame requests, for hosts that poll the runtime
/// themselves.
#[derive(Default)]
pub struct DefaultScheduler;
812
impl RuntimeScheduler for DefaultScheduler {
    // No-op: the embedder drives frames on its own cadence.
    fn schedule_frame(&self) {}
}
816
/// Test-only scheduler; frame requests are ignored and tests pump manually.
#[cfg(test)]
#[derive(Default)]
pub struct TestScheduler;
820
#[cfg(test)]
impl RuntimeScheduler for TestScheduler {
    // No-op: tests drive frames explicitly.
    fn schedule_frame(&self) {}
}
825
/// Convenience wrapper owning a `Runtime` driven by `TestScheduler`.
#[cfg(test)]
pub struct TestRuntime {
    runtime: Runtime,
}
830
#[cfg(test)]
impl Default for TestRuntime {
    fn default() -> Self {
        Self::new()
    }
}
837
#[cfg(test)]
impl TestRuntime {
    /// Creates a runtime on the current thread with a no-op scheduler.
    pub fn new() -> Self {
        Self {
            runtime: Runtime::new(Arc::new(TestScheduler)),
        }
    }

    /// Weak handle to the wrapped runtime.
    pub fn handle(&self) -> RuntimeHandle {
        self.runtime.handle()
    }
}
850
/// Weak, cloneable reference to a runtime plus its thread-safe dispatcher.
#[derive(Clone)]
pub struct RuntimeHandle {
    inner: Weak<RuntimeInner>,
    /// Remains usable from any thread, even after the runtime is dropped.
    dispatcher: UiDispatcher,
    ui_thread_id: ThreadId,
    id: RuntimeId,
}
858
/// Handle to a spawned async UI task; allows cancellation.
pub struct TaskHandle {
    id: u64,
    runtime: RuntimeHandle,
}
863
/// A state release postponed until the current teardown scope exits.
struct DeferredStateRelease {
    runtime: RuntimeHandle,
    id: StateId,
}
868
/// RAII ownership of an arena slot; dropping the lease releases the cell.
pub(crate) struct StateHandleLease {
    id: StateId,
    runtime: RuntimeHandle,
}
873
impl StateHandleLease {
    /// Id of the leased state cell.
    pub(crate) fn id(&self) -> StateId {
        self.id
    }

    /// Runtime that owns the leased cell.
    pub(crate) fn runtime(&self) -> RuntimeHandle {
        self.runtime.clone()
    }
}
883
impl Drop for StateHandleLease {
    fn drop(&mut self) {
        // May defer the actual release while a teardown scope is active.
        defer_state_release(self.runtime.clone(), self.id);
    }
}
889
890impl RuntimeHandle {
891 pub fn id(&self) -> RuntimeId {
892 self.id
893 }
894
895 pub(crate) fn alloc_state<T: Clone + 'static>(&self, value: T) -> Rc<StateHandleLease> {
896 let id = self.with_state_arena(|arena| arena.alloc(value, self.clone()));
897 let lease = Rc::new(StateHandleLease {
898 id,
899 runtime: self.clone(),
900 });
901 self.with_state_arena(|arena| arena.register_lease(id, &lease));
902 lease
903 }
904
905 pub(crate) fn alloc_state_with_policy<T: Clone + 'static>(
906 &self,
907 value: T,
908 policy: Arc<dyn MutationPolicy<T>>,
909 ) -> Rc<StateHandleLease> {
910 let id =
911 self.with_state_arena(|arena| arena.alloc_with_policy(value, self.clone(), policy));
912 let lease = Rc::new(StateHandleLease {
913 id,
914 runtime: self.clone(),
915 });
916 self.with_state_arena(|arena| arena.register_lease(id, &lease));
917 lease
918 }
919
920 pub(crate) fn alloc_persistent_state<T: Clone + 'static>(
921 &self,
922 value: T,
923 ) -> crate::MutableState<T> {
924 let lease = self.alloc_state(value);
925 if let Some(inner) = self.inner.upgrade() {
926 inner
927 .external_state_owners
928 .borrow_mut()
929 .insert(lease.id(), Rc::clone(&lease));
930 }
931 crate::MutableState::from_lease(&lease)
932 }
933
934 pub(crate) fn retain_state_lease(&self, id: StateId) -> Option<Rc<StateHandleLease>> {
935 self.with_state_arena(|arena| arena.retain_lease(id))
936 }
937
938 pub(crate) fn with_state_arena<R>(&self, f: impl FnOnce(&StateArena) -> R) -> R {
939 self.inner
940 .upgrade()
941 .map(|inner| f(&inner.state_arena))
942 .unwrap_or_else(|| panic!("runtime dropped"))
943 }
944
945 fn release_state_immediate(&self, id: StateId) {
946 if let Some(inner) = self.inner.upgrade() {
947 inner.state_arena.release(id);
948 }
949 }
950
951 pub fn state_arena_stats(&self) -> (usize, usize) {
952 self.with_state_arena(StateArena::stats)
953 }
954
955 pub fn state_arena_debug_stats(&self) -> StateArenaDebugStats {
956 self.with_state_arena(StateArena::debug_stats)
957 }
958
959 pub fn debug_stats(&self) -> RuntimeDebugStats {
960 self.inner
961 .upgrade()
962 .map(|inner| inner.debug_stats())
963 .unwrap_or_default()
964 }
965
966 pub(crate) fn unregister_state_scope(&self, id: StateId, scope_id: ScopeId) {
967 if let Some(inner) = self.inner.upgrade() {
968 inner.state_arena.unregister_scope(id, scope_id);
969 }
970 }
971
972 pub fn schedule(&self) {
973 if let Some(inner) = self.inner.upgrade() {
974 inner.schedule();
975 }
976 }
977
978 pub(crate) fn enqueue_node_update(&self, command: Command) {
979 if let Some(inner) = self.inner.upgrade() {
980 inner.enqueue_update(command);
981 }
982 }
983
984 pub fn enqueue_ui_task(&self, task: Box<dyn FnOnce() + 'static>) {
991 if let Some(inner) = self.inner.upgrade() {
992 inner.enqueue_ui_task(task);
993 } else {
994 task();
995 }
996 }
997
998 pub fn spawn_ui<F>(&self, fut: F) -> Option<TaskHandle>
999 where
1000 F: Future<Output = ()> + 'static,
1001 {
1002 self.inner.upgrade().map(|inner| {
1003 let id = inner.spawn_ui_task(Box::pin(fut));
1004 TaskHandle {
1005 id,
1006 runtime: self.clone(),
1007 }
1008 })
1009 }
1010
1011 pub fn cancel_task(&self, id: u64) {
1012 if let Some(inner) = self.inner.upgrade() {
1013 inner.cancel_task(id);
1014 }
1015 }
1016
1017 pub fn post_ui(&self, task: impl FnOnce() + Send + 'static) {
1022 self.dispatcher.post(task);
1023 }
1024
1025 pub fn register_ui_cont<T: 'static>(&self, f: impl FnOnce(T) + 'static) -> Option<u64> {
1026 self.inner.upgrade().map(|inner| inner.register_ui_cont(f))
1027 }
1028
1029 pub fn cancel_ui_cont(&self, id: u64) {
1030 if let Some(inner) = self.inner.upgrade() {
1031 inner.cancel_ui_cont(id);
1032 }
1033 }
1034
1035 pub fn drain_ui(&self) {
1036 if let Some(inner) = self.inner.upgrade() {
1037 inner.drain_ui();
1038 }
1039 }
1040
1041 pub fn has_pending_ui(&self) -> bool {
1042 self.inner
1043 .upgrade()
1044 .map(|inner| inner.has_pending_ui())
1045 .unwrap_or_else(|| self.dispatcher.has_pending())
1046 }
1047
1048 pub fn register_frame_callback(
1049 &self,
1050 callback: impl FnOnce(u64) + 'static,
1051 ) -> Option<FrameCallbackId> {
1052 self.inner
1053 .upgrade()
1054 .map(|inner| inner.register_frame_callback(Box::new(callback)))
1055 }
1056
1057 pub fn cancel_frame_callback(&self, id: FrameCallbackId) {
1058 if let Some(inner) = self.inner.upgrade() {
1059 inner.cancel_frame_callback(id);
1060 }
1061 }
1062
1063 pub fn drain_frame_callbacks(&self, frame_time_nanos: u64) {
1064 if let Some(inner) = self.inner.upgrade() {
1065 inner.drain_frame_callbacks(frame_time_nanos);
1066 }
1067 }
1068
1069 #[cfg(any(feature = "internal", test))]
1070 pub fn frame_clock(&self) -> FrameClock {
1071 FrameClock::new(self.clone())
1072 }
1073
1074 pub fn set_needs_frame(&self, value: bool) {
1075 if let Some(inner) = self.inner.upgrade() {
1076 *inner.needs_frame.borrow_mut() = value;
1077 }
1078 }
1079
1080 pub(crate) fn take_updates(&self) -> Vec<Command> {
1081 self.inner
1082 .upgrade()
1083 .map(|inner| inner.take_updates())
1084 .unwrap_or_default()
1085 }
1086
1087 pub fn has_updates(&self) -> bool {
1088 self.inner
1089 .upgrade()
1090 .map(|inner| inner.has_updates())
1091 .unwrap_or(false)
1092 }
1093
1094 pub(crate) fn mark_scope_recomposed(&self, id: ScopeId) {
1095 if let Some(inner) = self.inner.upgrade() {
1096 inner.mark_scope_recomposed(id);
1097 }
1098 }
1099
1100 pub(crate) fn register_invalid_scope(&self, id: ScopeId, scope: Weak<RecomposeScopeInner>) {
1101 if let Some(inner) = self.inner.upgrade() {
1102 inner.register_invalid_scope(id, scope);
1103 }
1104 }
1105
1106 pub(crate) fn requeue_invalid_scope(&self, id: ScopeId, scope: Weak<RecomposeScopeInner>) {
1107 if let Some(inner) = self.inner.upgrade() {
1108 inner.requeue_invalid_scope(id, scope);
1109 }
1110 }
1111
1112 pub(crate) fn take_invalidated_scopes(&self) -> Vec<(ScopeId, Weak<RecomposeScopeInner>)> {
1113 self.inner
1114 .upgrade()
1115 .map(|inner| inner.take_invalidated_scopes())
1116 .unwrap_or_default()
1117 }
1118
1119 pub fn has_invalid_scopes(&self) -> bool {
1120 self.inner
1121 .upgrade()
1122 .map(|inner| inner.has_invalid_scopes())
1123 .unwrap_or(false)
1124 }
1125
1126 #[doc(hidden)]
1127 pub fn debug_invalid_scope_ids(&self) -> Vec<usize> {
1128 self.inner
1129 .upgrade()
1130 .map(|inner| inner.invalid_scopes.borrow().iter().copied().collect())
1131 .unwrap_or_default()
1132 }
1133
1134 pub fn has_frame_callbacks(&self) -> bool {
1135 self.inner
1136 .upgrade()
1137 .map(|inner| inner.has_frame_callbacks())
1138 .unwrap_or(false)
1139 }
1140
1141 pub fn assert_ui_thread(&self) {
1142 debug_assert_eq!(
1143 std::thread::current().id(),
1144 self.ui_thread_id,
1145 "state mutated off the runtime's UI thread"
1146 );
1147 }
1148
1149 pub fn dispatcher(&self) -> UiDispatcher {
1150 self.dispatcher.clone()
1151 }
1152
1153 #[doc(hidden)]
1154 pub fn with_deferred_state_releases<R>(&self, f: impl FnOnce() -> R) -> R {
1155 let _scope = enter_state_teardown_scope();
1156 f()
1157 }
1158}
1159
impl TaskHandle {
    /// Cancels the task, dropping its future without polling it again.
    pub fn cancel(self) {
        self.runtime.cancel_task(self.id);
    }
}
1165
/// Queued one-shot frame callback; `callback` is `None` once taken.
pub(crate) struct FrameCallbackEntry {
    id: FrameCallbackId,
    callback: Option<Box<dyn FnOnce(u64) + 'static>>,
}
1170
/// `ArcWake` adapter: waking an async UI task just requests another frame.
struct RuntimeTaskWaker {
    scheduler: Arc<dyn RuntimeScheduler>,
}
1174
impl RuntimeTaskWaker {
    /// Captures the runtime's scheduler for later wake-ups.
    ///
    /// # Panics
    /// Panics if the `RuntimeInner` was dropped before the waker is built.
    fn new(inner: Weak<RuntimeInner>) -> Self {
        let scheduler = inner
            .upgrade()
            .map(|rc| rc.scheduler.clone())
            .expect("RuntimeInner dropped before waker created");
        Self { scheduler }
    }

    /// Converts into a `std::task::Waker` via `futures_task`.
    fn into_waker(self) -> Waker {
        futures_task::waker(Arc::new(self))
    }
}
1190
impl futures_task::ArcWake for RuntimeTaskWaker {
    // A wake just schedules a frame; the runtime re-polls all async tasks
    // during its drain pass.
    fn wake_by_ref(arc_self: &Arc<Self>) {
        arc_self.scheduler.schedule_frame();
    }
}
1196
thread_local! {
    /// Stack of runtimes entered on this thread; the top is "current".
    static ACTIVE_RUNTIMES: RefCell<Vec<RuntimeHandle>> = const { RefCell::new(Vec::new()) };
    /// Fallback handle: the runtime most recently created or entered here.
    static LAST_RUNTIME: RefCell<Option<RuntimeHandle>> = const { RefCell::new(None) };
    /// Registry of runtimes reachable on this thread, keyed by id.
    static REGISTERED_RUNTIMES: RefCell<HashMap<RuntimeId, RuntimeHandle>> = RefCell::new(HashMap::default());
    /// Nesting depth of `StateTeardownScope`s; releases defer while > 0.
    static STATE_TEARDOWN_DEPTH: Cell<usize> = const { Cell::new(0) };
    /// State releases parked until the outermost teardown scope exits.
    static DEFERRED_STATE_RELEASES: RefCell<Vec<DeferredStateRelease>> = const { RefCell::new(Vec::new()) };
}
1204
/// Length/capacity snapshot of this thread's runtime-related thread-locals.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct RuntimeThreadLocalDebugStats {
    pub active_runtimes_len: usize,
    pub active_runtimes_cap: usize,
    pub registered_runtimes_len: usize,
    pub registered_runtimes_cap: usize,
    pub deferred_state_releases_len: usize,
    pub deferred_state_releases_cap: usize,
}
1214
1215pub fn current_runtime_handle() -> Option<RuntimeHandle> {
1220 if let Some(handle) = ACTIVE_RUNTIMES.with(|stack| stack.borrow().last().cloned()) {
1221 return Some(handle);
1222 }
1223 LAST_RUNTIME.with(|slot| slot.borrow().clone())
1224}
1225
/// Looks up a handle registered on this thread by runtime id.
pub(crate) fn runtime_handle_by_id(id: RuntimeId) -> Option<RuntimeHandle> {
    REGISTERED_RUNTIMES.with(|registry| registry.borrow().get(&id).cloned())
}
1229
1230pub fn debug_runtime_thread_local_stats() -> RuntimeThreadLocalDebugStats {
1231 let (active_runtimes_len, active_runtimes_cap) = ACTIVE_RUNTIMES.with(|stack| {
1232 let stack = stack.borrow();
1233 (stack.len(), stack.capacity())
1234 });
1235 let (registered_runtimes_len, registered_runtimes_cap) = REGISTERED_RUNTIMES.with(|registry| {
1236 let registry = registry.borrow();
1237 (registry.len(), registry.capacity())
1238 });
1239 let (deferred_state_releases_len, deferred_state_releases_cap) =
1240 DEFERRED_STATE_RELEASES.with(|releases| {
1241 let releases = releases.borrow();
1242 (releases.len(), releases.capacity())
1243 });
1244
1245 RuntimeThreadLocalDebugStats {
1246 active_runtimes_len,
1247 active_runtimes_cap,
1248 registered_runtimes_len,
1249 registered_runtimes_cap,
1250 deferred_state_releases_len,
1251 deferred_state_releases_cap,
1252 }
1253}
1254
/// Makes `handle` discoverable on this thread via `runtime_handle_by_id`.
fn register_runtime_handle(handle: &RuntimeHandle) {
    REGISTERED_RUNTIMES.with(|registry| {
        registry.borrow_mut().insert(handle.id(), handle.clone());
    });
}
1260
/// Removes a runtime from this thread's registry (called on final drop).
fn unregister_runtime_handle(id: RuntimeId) {
    REGISTERED_RUNTIMES.with(|registry| {
        registry.borrow_mut().remove(&id);
    });
}
1266
1267fn defer_state_release(runtime: RuntimeHandle, id: StateId) {
1268 let teardown_active = STATE_TEARDOWN_DEPTH.with(|depth| depth.get() > 0);
1269 if teardown_active {
1270 DEFERRED_STATE_RELEASES.with(|releases| {
1271 releases
1272 .borrow_mut()
1273 .push(DeferredStateRelease { runtime, id });
1274 });
1275 } else {
1276 runtime.release_state_immediate(id);
1277 }
1278}
1279
1280fn flush_deferred_state_releases() {
1281 DEFERRED_STATE_RELEASES.with(|releases| {
1282 let mut releases = releases.borrow_mut();
1283 while let Some(deferred) = releases.pop() {
1284 deferred.runtime.release_state_immediate(deferred.id);
1285 }
1286 });
1287}
1288
/// Guard marking a region in which state releases must be deferred.
pub(crate) struct StateTeardownScope;
1290
/// Enters a (nestable) teardown scope; state releases defer until the
/// outermost scope's guard drops.
pub(crate) fn enter_state_teardown_scope() -> StateTeardownScope {
    STATE_TEARDOWN_DEPTH.with(|depth| depth.set(depth.get() + 1));
    StateTeardownScope
}
1295
impl Drop for StateTeardownScope {
    fn drop(&mut self) {
        STATE_TEARDOWN_DEPTH.with(|depth| {
            // saturating_sub tolerates an unbalanced drop in release builds.
            let next = depth.get().saturating_sub(1);
            depth.set(next);
            // Leaving the outermost scope flushes everything deferred inside.
            if next == 0 {
                flush_deferred_state_releases();
            }
        });
    }
}
1307
/// Makes `handle` the current runtime for this thread (stack discipline) and
/// records it as the most recent fallback.
pub(crate) fn push_active_runtime(handle: &RuntimeHandle) {
    register_runtime_handle(handle);
    ACTIVE_RUNTIMES.with(|stack| stack.borrow_mut().push(handle.clone()));
    LAST_RUNTIME.with(|slot| *slot.borrow_mut() = Some(handle.clone()));
}
1313
/// Pops the innermost active runtime; `LAST_RUNTIME` keeps the fallback.
pub(crate) fn pop_active_runtime() {
    ACTIVE_RUNTIMES.with(|stack| {
        stack.borrow_mut().pop();
    });
}
1319
1320pub fn schedule_frame() {
1322 if let Some(handle) = current_runtime_handle() {
1323 handle.schedule();
1324 return;
1325 }
1326 panic!("no runtime available to schedule frame");
1327}
1328
1329pub fn schedule_node_update(
1331 update: impl FnOnce(&mut dyn Applier) -> Result<(), NodeError> + 'static,
1332) {
1333 let handle = current_runtime_handle().expect("no runtime available to schedule node update");
1334 handle.enqueue_node_update(Command::callback(update));
1335}