1use crate::collections::map::HashMap;
2use crate::collections::map::HashSet;
3use crate::state::{MutationPolicy, NeverEqual};
4use crate::MutableStateInner;
5use std::any::Any;
6use std::cell::{Cell, RefCell};
7use std::collections::VecDeque;
8use std::future::Future;
9use std::pin::Pin;
10use std::rc::{Rc, Weak};
11use std::sync::atomic::{AtomicU32, AtomicUsize, Ordering};
12use std::sync::{mpsc, Arc};
13use std::task::{Context, Poll, Waker};
14use std::thread::ThreadId;
15use std::thread_local;
16
17#[cfg(any(feature = "internal", test))]
18use crate::frame_clock::FrameClock;
19use crate::platform::RuntimeScheduler;
20use crate::{Applier, Command, FrameCallbackId, NodeError, RecomposeScopeInner, ScopeId};
21
/// Message delivered to the runtime's UI thread through the cross-thread
/// channel owned by `UiDispatcherInner`.
enum UiMessage {
    /// Run an arbitrary closure on the UI thread.
    Task(Box<dyn FnOnce() + Send + 'static>),
    /// Invoke the registered one-shot UI continuation `id` with `value`.
    Invoke { id: u64, value: Box<dyn Any + Send> },
}
26
/// Type-erased, UI-thread-only callback; the boxed `Any` is downcast to the
/// concrete type captured at registration time.
type UiContinuation = Box<dyn Fn(Box<dyn Any>) + 'static>;
/// Registered continuations keyed by the id handed back to the caller.
type UiContinuationMap = HashMap<u64, UiContinuation>;
29
/// Concrete storage for one state value inside the arena; stored behind
/// `Rc<dyn Any>` so the arena itself can stay untyped.
struct TypedStateCell<T: Clone + 'static> {
    inner: MutableStateInner<T>,
}
33
/// Type-erased view of a state cell that only exposes scope unregistration,
/// so the arena can detach scopes without knowing the cell's value type.
trait ScopeWatchCell {
    fn unregister_scope(&self, scope_id: ScopeId);
}
37
impl<T: Clone + 'static> ScopeWatchCell for TypedStateCell<T> {
    // Pure delegation to the typed state cell.
    fn unregister_scope(&self, scope_id: ScopeId) {
        self.inner.unregister_scope(scope_id);
    }
}
43
/// One slot in the state arena. A slot is reused after release; `generation`
/// is bumped on reuse so stale `StateId`s no longer resolve.
struct StateArenaSlot {
    generation: u32,
    /// The type-erased cell; `None` while the slot is on the free list.
    cell: Option<Rc<dyn Any>>,
    /// Same cell, viewed through the scope-unregistration trait.
    watcher_cell: Option<Rc<dyn ScopeWatchCell>>,
    /// Weak back-reference to the lease that owns this slot, if any.
    lease: Option<Weak<StateHandleLease>>,
}
50
/// Mutable arena storage: a slab of slots plus the free-list of slot indices.
#[derive(Default)]
struct StateArenaInner {
    cells: Vec<StateArenaSlot>,
    free: Vec<u32>,
}
56
/// Snapshot of arena sizes/capacities for diagnostics (see `debug_stats`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct StateArenaDebugStats {
    pub cells_len: usize,
    pub cells_cap: usize,
    pub free_len: usize,
    pub free_cap: usize,
}
64
/// Snapshot of the runtime's internal queue lengths/capacities for
/// diagnostics (see `RuntimeInner::debug_stats`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct RuntimeDebugStats {
    pub node_updates_len: usize,
    pub node_updates_cap: usize,
    pub invalid_scopes_len: usize,
    pub invalid_scopes_cap: usize,
    pub scope_queue_len: usize,
    pub scope_queue_cap: usize,
    pub frame_callbacks_len: usize,
    pub frame_callbacks_cap: usize,
    pub local_tasks_len: usize,
    pub local_tasks_cap: usize,
    pub ui_conts_len: usize,
    pub ui_conts_cap: usize,
    pub tasks_len: usize,
    pub tasks_cap: usize,
    pub external_state_owners_len: usize,
    pub external_state_owners_cap: usize,
    /// Messages posted to the UI dispatcher but not yet drained.
    pub ui_dispatcher_pending: usize,
}
85
/// Generational slab of state cells owned by one runtime; single-threaded
/// (interior mutability via `RefCell`).
#[derive(Default)]
pub(crate) struct StateArena {
    inner: RefCell<StateArenaInner>,
}
90
impl StateArena {
    /// Allocates a cell for `value` with the default `NeverEqual` policy
    /// (every write counts as a change).
    pub(crate) fn alloc<T: Clone + 'static>(&self, value: T, runtime: RuntimeHandle) -> StateId {
        self.alloc_with_policy(value, runtime, Arc::new(NeverEqual))
    }

    /// Allocates a cell for `value` with an explicit mutation `policy`,
    /// reusing a free slot when available and bumping its generation so
    /// stale `StateId`s no longer resolve.
    pub(crate) fn alloc_with_policy<T: Clone + 'static>(
        &self,
        value: T,
        runtime: RuntimeHandle,
        policy: Arc<dyn MutationPolicy<T>>,
    ) -> StateId {
        // Reserve the slot under a short-lived borrow; the borrow must end
        // before the cell is built below.
        let (slot, generation) = {
            let mut inner = self.inner.borrow_mut();
            match inner.free.pop() {
                Some(slot) => {
                    let entry = inner
                        .cells
                        .get_mut(slot as usize)
                        .expect("state slot missing");
                    debug_assert!(entry.cell.is_none(), "reused state slot must be empty");
                    // wrapping_add: generation may wrap after u32::MAX reuses
                    // of one slot; accepted here.
                    entry.generation = entry.generation.wrapping_add(1);
                    (slot, entry.generation)
                }
                None => {
                    let slot = inner.cells.len() as u32;
                    inner.cells.push(StateArenaSlot {
                        generation: 0,
                        cell: None,
                        watcher_cell: None,
                        lease: None,
                    });
                    (slot, 0)
                }
            }
        };
        let id = StateId::new(slot, generation);
        // Built outside the arena borrow. NOTE(review): presumably
        // `new_with_policy`/`install_snapshot_observer` may re-enter the
        // arena, which is why the borrow is scoped above — confirm.
        let inner = MutableStateInner::new_with_policy(value, runtime.clone(), policy);
        inner.install_snapshot_observer(id);
        let typed_cell = Rc::new(TypedStateCell { inner });
        let cell: Rc<dyn Any> = typed_cell.clone();
        let watcher_cell: Rc<dyn ScopeWatchCell> = typed_cell;
        let mut arena = self.inner.borrow_mut();
        let slot_entry = &mut arena.cells[slot as usize];
        slot_entry.cell = Some(cell);
        slot_entry.watcher_cell = Some(watcher_cell);
        id
    }

    /// Resolves `id` to its type-erased cell, or `None` when the slot is out
    /// of range, the generation is stale, or the cell has been released.
    fn get_cell_opt(&self, id: StateId) -> Option<Rc<dyn Any>> {
        self.inner
            .borrow()
            .cells
            .get(id.slot_index())
            .filter(|cell| cell.generation == id.generation())
            .and_then(|cell| cell.cell.as_ref())
            .cloned()
    }

    /// Resolves `id` to a typed cell.
    ///
    /// # Panics
    /// Panics if the cell is missing/released or holds a different type.
    fn get_typed<T: Clone + 'static>(&self, id: StateId) -> Rc<TypedStateCell<T>> {
        match self.get_cell_opt(id) {
            None => panic!(
                "state cell missing: slot={}, gen={}, expected={}",
                id.slot(),
                id.generation(),
                std::any::type_name::<T>(),
            ),
            Some(cell) => Rc::downcast::<TypedStateCell<T>>(cell).unwrap_or_else(|_| {
                panic!(
                    "state cell type mismatch: slot={}, gen={}, expected={}",
                    id.slot(),
                    id.generation(),
                    std::any::type_name::<T>(),
                )
            }),
        }
    }

    /// Non-panicking variant of `get_typed`; also returns `None` on a type
    /// mismatch.
    fn get_typed_opt<T: Clone + 'static>(&self, id: StateId) -> Option<Rc<TypedStateCell<T>>> {
        Rc::downcast::<TypedStateCell<T>>(self.get_cell_opt(id)?).ok()
    }

    /// Runs `f` against the typed state behind `id`. Panics like `get_typed`.
    pub(crate) fn with_typed<T: Clone + 'static, R>(
        &self,
        id: StateId,
        f: impl FnOnce(&MutableStateInner<T>) -> R,
    ) -> R {
        let cell = self.get_typed::<T>(id);
        f(&cell.inner)
    }

    /// Runs `f` against the typed state behind `id`, or returns `None` when
    /// the cell is missing, released, or of the wrong type.
    pub(crate) fn with_typed_opt<T: Clone + 'static, R>(
        &self,
        id: StateId,
        f: impl FnOnce(&MutableStateInner<T>) -> R,
    ) -> Option<R> {
        let cell = self.get_typed_opt::<T>(id)?;
        Some(f(&cell.inner))
    }

    /// Releases the cell behind `id`, returning its slot to the free list.
    /// A stale or unknown id is ignored.
    pub(crate) fn release(&self, id: StateId) {
        let cell = {
            let mut inner = self.inner.borrow_mut();
            let Some(slot) = inner.cells.get_mut(id.slot_index()) else {
                return;
            };
            if slot.generation != id.generation() {
                return;
            }
            slot.lease = None;
            slot.watcher_cell = None;
            let cell = slot.cell.take();
            // Only push to the free list when the slot actually held a cell,
            // so a double release cannot enqueue the slot twice.
            if cell.is_some() {
                inner.free.push(id.slot());
            }
            cell
        };
        // Dropped after the arena borrow is released: the cell's Drop may run
        // arbitrary user code that re-enters the arena.
        drop(cell);
    }

    /// Returns `(total slots, free slots)`.
    pub(crate) fn stats(&self) -> (usize, usize) {
        let inner = self.inner.borrow();
        (inner.cells.len(), inner.free.len())
    }

    /// Returns lengths and capacities of the arena's vectors.
    pub(crate) fn debug_stats(&self) -> StateArenaDebugStats {
        let inner = self.inner.borrow();
        StateArenaDebugStats {
            cells_len: inner.cells.len(),
            cells_cap: inner.cells.capacity(),
            free_len: inner.free.len(),
            free_cap: inner.free.capacity(),
        }
    }

    /// Detaches `scope_id` from the cell behind `id`; no-op for stale ids.
    pub(crate) fn unregister_scope(&self, id: StateId, scope_id: ScopeId) {
        // Clone the watcher out first so the arena borrow is not held while
        // running the cell's unregister logic.
        let watcher_cell = {
            let inner = self.inner.borrow();
            inner
                .cells
                .get(id.slot_index())
                .filter(|slot| slot.generation == id.generation())
                .and_then(|slot| slot.watcher_cell.as_ref())
                .cloned()
        };
        if let Some(watcher_cell) = watcher_cell {
            watcher_cell.unregister_scope(scope_id);
        }
    }

    /// Records a weak back-reference from the slot to its owning lease.
    ///
    /// # Panics
    /// Panics when the slot is missing or the generation does not match.
    pub(crate) fn register_lease(&self, id: StateId, lease: &Rc<StateHandleLease>) {
        let mut inner = self.inner.borrow_mut();
        let Some(slot) = inner.cells.get_mut(id.slot_index()) else {
            panic!("state slot missing");
        };
        assert_eq!(
            slot.generation,
            id.generation(),
            "state generation mismatch"
        );
        slot.lease = Some(Rc::downgrade(lease));
    }

    /// Upgrades the slot's weak lease reference, if the lease is still alive.
    pub(crate) fn retain_lease(&self, id: StateId) -> Option<Rc<StateHandleLease>> {
        let inner = self.inner.borrow();
        let slot = inner.cells.get(id.slot_index())?;
        if slot.generation != id.generation() {
            return None;
        }
        slot.lease.as_ref()?.upgrade()
    }
}
262
/// Generational handle to a slot in the `StateArena`; stale handles (whose
/// generation no longer matches the slot) fail to resolve.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub struct StateId {
    slot: u32,
    generation: u32,
}
268
impl StateId {
    /// Pairs a slot index with its generation.
    const fn new(slot: u32, generation: u32) -> Self {
        Self { slot, generation }
    }

    /// Raw slot index as stored (`u32`).
    pub(crate) const fn slot(self) -> u32 {
        self.slot
    }

    /// Slot index widened for `Vec` indexing.
    pub(crate) const fn slot_index(self) -> usize {
        self.slot as usize
    }

    /// Generation the handle was minted with.
    pub(crate) const fn generation(self) -> u32 {
        self.generation
    }
}
286
/// Process-wide unique identifier for a `Runtime` instance.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub struct RuntimeId(u32);
289
impl RuntimeId {
    /// Hands out the next id from a process-global counter (starting at 1).
    fn next() -> Self {
        static NEXT_RUNTIME_ID: AtomicU32 = AtomicU32::new(1);
        Self(NEXT_RUNTIME_ID.fetch_add(1, Ordering::Relaxed))
    }
}
296
/// Send-capable half of the UI message channel, shared across threads via
/// `Arc`. `pending` counts posted-but-not-yet-drained messages.
struct UiDispatcherInner {
    scheduler: Arc<dyn RuntimeScheduler>,
    tx: mpsc::Sender<UiMessage>,
    pending: AtomicUsize,
}
302
303impl UiDispatcherInner {
304 fn new(scheduler: Arc<dyn RuntimeScheduler>, tx: mpsc::Sender<UiMessage>) -> Self {
305 Self {
306 scheduler,
307 tx,
308 pending: AtomicUsize::new(0),
309 }
310 }
311
312 fn post(&self, task: impl FnOnce() + Send + 'static) {
313 self.pending.fetch_add(1, Ordering::SeqCst);
314 let _ = self.tx.send(UiMessage::Task(Box::new(task)));
315 self.scheduler.schedule_frame();
316 }
317
318 fn post_invoke(&self, id: u64, value: Box<dyn Any + Send>) {
319 self.pending.fetch_add(1, Ordering::SeqCst);
320 let _ = self.tx.send(UiMessage::Invoke { id, value });
321 self.scheduler.schedule_frame();
322 }
323
324 fn has_pending(&self) -> bool {
325 self.pending.load(Ordering::SeqCst) > 0
326 }
327}
328
/// RAII guard that decrements a pending-message counter when dropped, even
/// if the message handler panics.
struct PendingGuard<'a> {
    counter: &'a AtomicUsize,
}
332
impl<'a> PendingGuard<'a> {
    /// Arms the guard; `counter` is decremented on drop.
    fn new(counter: &'a AtomicUsize) -> Self {
        Self { counter }
    }
}
338
impl<'a> Drop for PendingGuard<'a> {
    fn drop(&mut self) {
        // fetch_sub returns the previous value; 0 would mean an unmatched
        // decrement (debug builds only).
        let previous = self.counter.fetch_sub(1, Ordering::SeqCst);
        debug_assert!(previous > 0, "UI dispatcher pending count underflowed");
    }
}
345
/// Cheaply-cloneable, thread-safe handle for posting work to the runtime's
/// UI thread.
#[derive(Clone)]
pub struct UiDispatcher {
    inner: Arc<UiDispatcherInner>,
}
350
impl UiDispatcher {
    fn new(inner: Arc<UiDispatcherInner>) -> Self {
        Self { inner }
    }

    /// Queues `task` to run on the UI thread and requests a frame.
    pub fn post(&self, task: impl FnOnce() + Send + 'static) {
        self.inner.post(task);
    }

    /// Queues invocation of the registered UI continuation `id` with `value`
    /// (boxed here so the channel payload is type-erased).
    pub fn post_invoke<T>(&self, id: u64, value: T)
    where
        T: Send + 'static,
    {
        self.inner.post_invoke(id, Box::new(value));
    }

    /// Whether posted messages are still waiting to be drained.
    pub fn has_pending(&self) -> bool {
        self.inner.has_pending()
    }
}
371
/// Single-threaded core of the runtime: all queues, the state arena, and the
/// cross-thread UI channel. Lives behind `Rc`; `RuntimeHandle` holds a `Weak`.
struct RuntimeInner {
    scheduler: Arc<dyn RuntimeScheduler>,
    /// Whether a frame has been requested and not yet satisfied.
    needs_frame: RefCell<bool>,
    /// Node-tree commands queued for the next frame.
    node_updates: RefCell<Vec<Command>>,
    /// Set of scopes currently invalidated (dedup for `scope_queue`).
    invalid_scopes: RefCell<HashSet<ScopeId>>,
    /// Invalidation order, with weak refs to the scopes themselves.
    scope_queue: RefCell<Vec<(ScopeId, Weak<RecomposeScopeInner>)>>,
    frame_callbacks: RefCell<VecDeque<FrameCallbackEntry>>,
    next_frame_callback_id: Cell<u64>,
    /// Shared with every `UiDispatcher` clone.
    ui_dispatcher: Arc<UiDispatcherInner>,
    /// Receive half of the UI message channel; drained on the UI thread.
    ui_rx: RefCell<mpsc::Receiver<UiMessage>>,
    /// Same-thread tasks (no Send bound), run during `drain_ui`.
    local_tasks: RefCell<VecDeque<Box<dyn FnOnce() + 'static>>>,
    ui_conts: RefCell<UiContinuationMap>,
    next_cont_id: Cell<u64>,
    /// Thread the runtime was created on; enforced by debug assertions.
    ui_thread_id: ThreadId,
    /// Async futures polled cooperatively from `drain_ui`.
    tasks: RefCell<Vec<TaskEntry>>,
    next_task_id: Cell<u64>,
    task_waker: RefCell<Option<Waker>>,
    state_arena: StateArena,
    /// Leases kept alive by the runtime itself (persistent state).
    external_state_owners: RefCell<HashMap<StateId, Rc<StateHandleLease>>>,
    runtime_id: RuntimeId,
}
393
/// A spawned async task: its cancellation id plus the pinned future.
struct TaskEntry {
    id: u64,
    future: Pin<Box<dyn Future<Output = ()> + 'static>>,
}
398
impl RuntimeInner {
    /// Builds a runtime bound to the current thread, wiring the UI message
    /// channel and dispatcher to `scheduler`.
    fn new(scheduler: Arc<dyn RuntimeScheduler>) -> Self {
        let (tx, rx) = mpsc::channel();
        let dispatcher = Arc::new(UiDispatcherInner::new(scheduler.clone(), tx));
        Self {
            scheduler,
            needs_frame: RefCell::new(false),
            node_updates: RefCell::new(Vec::new()),
            invalid_scopes: RefCell::new(HashSet::default()),
            scope_queue: RefCell::new(Vec::new()),
            frame_callbacks: RefCell::new(VecDeque::new()),
            next_frame_callback_id: Cell::new(1),
            ui_dispatcher: dispatcher,
            ui_rx: RefCell::new(rx),
            local_tasks: RefCell::new(VecDeque::new()),
            ui_conts: RefCell::new(UiContinuationMap::default()),
            next_cont_id: Cell::new(1),
            ui_thread_id: std::thread::current().id(),
            tasks: RefCell::new(Vec::new()),
            next_task_id: Cell::new(1),
            task_waker: RefCell::new(None),
            state_arena: StateArena::default(),
            external_state_owners: RefCell::new(HashMap::default()),
            runtime_id: RuntimeId::next(),
        }
    }

    /// Installs the async-task waker. Done as a second step because the
    /// waker needs a `Weak<Self>`, which only exists once `Self` is in an `Rc`.
    fn init_task_waker(this: &Rc<Self>) {
        let weak = Rc::downgrade(this);
        let waker = RuntimeTaskWaker::new(weak).into_waker();
        *this.task_waker.borrow_mut() = Some(waker);
    }

    /// Marks that a frame is needed and asks the platform scheduler for one.
    fn schedule(&self) {
        *self.needs_frame.borrow_mut() = true;
        self.scheduler.schedule_frame();
    }

    /// Queues a node-tree command and schedules a frame to apply it.
    fn enqueue_update(&self, command: Command) {
        self.node_updates.borrow_mut().push(command);
        self.schedule(); }

    /// Takes all queued node-tree commands, leaving the queue empty
    /// (capacity retained).
    fn take_updates(&self) -> Vec<Command> {
        let updates = self.node_updates.borrow_mut().drain(..).collect::<Vec<_>>();
        updates
    }

    /// True when node commands are queued or scopes await recomposition.
    fn has_updates(&self) -> bool {
        !self.node_updates.borrow().is_empty() || self.has_invalid_scopes()
    }

    /// Records `scope` as invalidated. The set deduplicates, so a scope is
    /// queued (and a frame scheduled) only on its first invalidation.
    fn register_invalid_scope(&self, id: ScopeId, scope: Weak<RecomposeScopeInner>) {
        let mut invalid = self.invalid_scopes.borrow_mut();
        if invalid.insert(id) {
            self.scope_queue.borrow_mut().push((id, scope));
            self.schedule();
        }
    }

    /// Clears a scope's invalidated flag after it has been recomposed.
    fn mark_scope_recomposed(&self, id: ScopeId) {
        self.invalid_scopes.borrow_mut().remove(&id);
    }

    /// Drains the invalidation queue, keeping only scopes that are still
    /// marked invalid (entries recomposed in the meantime are dropped).
    fn take_invalidated_scopes(&self) -> Vec<(ScopeId, Weak<RecomposeScopeInner>)> {
        let mut queue = self.scope_queue.borrow_mut();
        if queue.is_empty() {
            return Vec::new();
        }
        let pending: Vec<_> = queue.drain(..).collect();
        // Release the queue borrow before borrowing the invalid set.
        drop(queue);
        let invalid = self.invalid_scopes.borrow();
        pending
            .into_iter()
            .filter(|(id, _)| invalid.contains(id))
            .collect()
    }

    fn has_invalid_scopes(&self) -> bool {
        !self.invalid_scopes.borrow().is_empty()
    }

    fn has_frame_callbacks(&self) -> bool {
        !self.frame_callbacks.borrow().is_empty()
    }

    /// Queues a same-thread (non-Send) task for the next `drain_ui`.
    fn enqueue_ui_task(&self, task: Box<dyn FnOnce() + 'static>) {
        self.local_tasks.borrow_mut().push_back(task);
        self.schedule();
    }

    /// Registers an async task and returns its cancellation id.
    fn spawn_ui_task(&self, future: Pin<Box<dyn Future<Output = ()> + 'static>>) -> u64 {
        let id = self.next_task_id.get();
        self.next_task_id.set(id + 1);
        self.tasks.borrow_mut().push(TaskEntry { id, future });
        self.schedule();
        id
    }

    /// Drops the task with the given id, if still present.
    fn cancel_task(&self, id: u64) {
        let mut tasks = self.tasks.borrow_mut();
        if tasks.iter().any(|entry| entry.id == id) {
            tasks.retain(|entry| entry.id != id);
        }
    }

    /// Polls every registered future once; returns whether any completed.
    ///
    /// The task list is moved out before polling so a future may spawn or
    /// cancel tasks without a re-entrant `RefCell` borrow.
    fn poll_async_tasks(&self) -> bool {
        let waker = match self.task_waker.borrow().as_ref() {
            Some(waker) => waker.clone(),
            None => return false,
        };
        let mut cx = Context::from_waker(&waker);
        let mut tasks_ref = self.tasks.borrow_mut();
        let tasks = std::mem::take(&mut *tasks_ref);
        drop(tasks_ref);
        let mut pending = Vec::with_capacity(tasks.len());
        let mut made_progress = false;
        for mut entry in tasks.into_iter() {
            match entry.future.as_mut().poll(&mut cx) {
                Poll::Ready(()) => {
                    made_progress = true;
                }
                Poll::Pending => {
                    pending.push(entry);
                }
            }
        }
        if !pending.is_empty() {
            self.tasks.borrow_mut().extend(pending);
        }
        made_progress
    }

    /// Runs queued work to quiescence: cross-thread messages, then local
    /// tasks, then one poll of async tasks, looping while anything executed
    /// (each step may enqueue work for the others).
    fn drain_ui(&self) {
        loop {
            let mut executed = false;

            {
                let rx = &mut *self.ui_rx.borrow_mut();
                for message in rx.try_iter() {
                    executed = true;
                    // Guard decrements the dispatcher's pending count even if
                    // the handler panics.
                    let _guard = PendingGuard::new(&self.ui_dispatcher.pending);
                    match message {
                        UiMessage::Task(task) => {
                            task();
                        }
                        UiMessage::Invoke { id, value } => {
                            self.invoke_ui_cont(id, value);
                        }
                    }
                }
            }

            loop {
                // Pop under a short borrow: the task itself may enqueue more
                // local tasks.
                let task = {
                    let mut local = self.local_tasks.borrow_mut();
                    local.pop_front()
                };

                match task {
                    Some(task) => {
                        executed = true;
                        task();
                    }
                    None => break,
                }
            }

            if self.poll_async_tasks() {
                executed = true;
            }

            if !executed {
                break;
            }
        }
    }

    /// True when any UI work remains. A currently-borrowed queue is treated
    /// as pending (conservative: we may be mid-drain).
    fn has_pending_ui(&self) -> bool {
        let local_pending = self
            .local_tasks
            .try_borrow()
            .map(|tasks| !tasks.is_empty())
            .unwrap_or(true);

        let async_pending = self
            .tasks
            .try_borrow()
            .map(|tasks| !tasks.is_empty())
            .unwrap_or(true);

        local_pending || self.ui_dispatcher.has_pending() || async_pending
    }

    /// Registers a one-shot continuation and returns its id. The wrapper
    /// panics if invoked twice or with the wrong payload type.
    fn register_ui_cont<T: 'static>(&self, f: impl FnOnce(T) + 'static) -> u64 {
        debug_assert_eq!(
            std::thread::current().id(),
            self.ui_thread_id,
            "UI continuation registered off the runtime thread",
        );
        let id = self.next_cont_id.get();
        self.next_cont_id.set(id + 1);
        // RefCell<Option<..>> lets the FnOnce be consumed through a Fn wrapper.
        let callback = RefCell::new(Some(f));
        self.ui_conts.borrow_mut().insert(
            id,
            Box::new(move |value: Box<dyn Any>| {
                let slot = callback
                    .borrow_mut()
                    .take()
                    .expect("UI continuation invoked more than once");
                let value = value
                    .downcast::<T>()
                    .expect("UI continuation type mismatch");
                slot(*value);
            }),
        );
        id
    }

    /// Removes and invokes continuation `id`; unknown ids are ignored.
    fn invoke_ui_cont(&self, id: u64, value: Box<dyn Any + Send>) {
        debug_assert_eq!(
            std::thread::current().id(),
            self.ui_thread_id,
            "UI continuation invoked off the runtime thread",
        );
        if let Some(callback) = self.ui_conts.borrow_mut().remove(&id) {
            let value: Box<dyn Any> = value;
            callback(value);
        }
    }

    /// Drops continuation `id` without invoking it.
    fn cancel_ui_cont(&self, id: u64) {
        self.ui_conts.borrow_mut().remove(&id);
    }

    /// Registers a per-frame callback and schedules a frame to deliver it.
    fn register_frame_callback(&self, callback: Box<dyn FnOnce(u64) + 'static>) -> FrameCallbackId {
        let id = self.next_frame_callback_id.get();
        self.next_frame_callback_id.set(id + 1);
        self.frame_callbacks
            .borrow_mut()
            .push_back(FrameCallbackEntry {
                id,
                callback: Some(callback),
            });
        self.schedule();
        id
    }

    /// Removes frame callback `id`; if nothing else is pending afterwards,
    /// clears `needs_frame` so no idle frame is produced.
    fn cancel_frame_callback(&self, id: FrameCallbackId) {
        let mut callbacks = self.frame_callbacks.borrow_mut();
        if let Some(index) = callbacks.iter().position(|entry| entry.id == id) {
            callbacks.remove(index);
        }
        let callbacks_empty = callbacks.is_empty();
        drop(callbacks);
        let local_pending = self
            .local_tasks
            .try_borrow()
            .map(|tasks| !tasks.is_empty())
            .unwrap_or(true);
        let async_pending = self
            .tasks
            .try_borrow()
            .map(|tasks| !tasks.is_empty())
            .unwrap_or(true);
        if !self.has_invalid_scopes()
            && !self.has_updates()
            && callbacks_empty
            && !local_pending
            && !self.ui_dispatcher.has_pending()
            && !async_pending
        {
            *self.needs_frame.borrow_mut() = false;
        }
    }

    /// Delivers all registered frame callbacks for this frame, then clears
    /// `needs_frame` if nothing else is pending.
    fn drain_frame_callbacks(&self, frame_time_nanos: u64) {
        // Take the callbacks out before running them: callbacks may register
        // new frame callbacks for the *next* frame.
        let mut callbacks = self.frame_callbacks.borrow_mut();
        let mut pending: Vec<Box<dyn FnOnce(u64) + 'static>> = Vec::with_capacity(callbacks.len());
        while let Some(mut entry) = callbacks.pop_front() {
            if let Some(callback) = entry.callback.take() {
                pending.push(callback);
            }
        }
        drop(callbacks);

        if !pending.is_empty() {
            // Run inside a mutable snapshot so state writes made by the
            // callbacks are batched/observed together.
            let _ = crate::run_in_mutable_snapshot(|| {
                for callback in pending {
                    callback(frame_time_nanos);
                }
            });
        }

        if !self.has_invalid_scopes()
            && !self.has_updates()
            && !self.has_frame_callbacks()
            && !self.has_pending_ui()
        {
            *self.needs_frame.borrow_mut() = false;
        }
    }

    /// Snapshots lengths/capacities of every internal queue.
    fn debug_stats(&self) -> RuntimeDebugStats {
        let node_updates = self.node_updates.borrow();
        let invalid_scopes = self.invalid_scopes.borrow();
        let scope_queue = self.scope_queue.borrow();
        let frame_callbacks = self.frame_callbacks.borrow();
        let local_tasks = self.local_tasks.borrow();
        let ui_conts = self.ui_conts.borrow();
        let tasks = self.tasks.borrow();
        let external_state_owners = self.external_state_owners.borrow();

        RuntimeDebugStats {
            node_updates_len: node_updates.len(),
            node_updates_cap: node_updates.capacity(),
            invalid_scopes_len: invalid_scopes.len(),
            invalid_scopes_cap: invalid_scopes.capacity(),
            scope_queue_len: scope_queue.len(),
            scope_queue_cap: scope_queue.capacity(),
            frame_callbacks_len: frame_callbacks.len(),
            frame_callbacks_cap: frame_callbacks.capacity(),
            local_tasks_len: local_tasks.len(),
            local_tasks_cap: local_tasks.capacity(),
            ui_conts_len: ui_conts.len(),
            ui_conts_cap: ui_conts.capacity(),
            tasks_len: tasks.len(),
            tasks_cap: tasks.capacity(),
            external_state_owners_len: external_state_owners.len(),
            external_state_owners_cap: external_state_owners.capacity(),
            ui_dispatcher_pending: self.ui_dispatcher.pending.load(Ordering::SeqCst),
        }
    }
}
741
/// Owning handle to a runtime. Cloneable; the backing `RuntimeInner` lives
/// until the last clone is dropped.
#[derive(Clone)]
pub struct Runtime {
    inner: Rc<RuntimeInner>,
}
746
impl Runtime {
    /// Creates a runtime on the current thread and registers it in the
    /// thread-local registry and as the thread's most-recent runtime.
    pub fn new(scheduler: Arc<dyn RuntimeScheduler>) -> Self {
        let inner = Rc::new(RuntimeInner::new(scheduler));
        RuntimeInner::init_task_waker(&inner);
        let runtime = Self { inner };
        let handle = runtime.handle();
        register_runtime_handle(&handle);
        LAST_RUNTIME.with(|slot| *slot.borrow_mut() = Some(handle));
        runtime
    }

    /// Creates a weak handle that does not keep the runtime alive.
    pub fn handle(&self) -> RuntimeHandle {
        RuntimeHandle {
            inner: Rc::downgrade(&self.inner),
            dispatcher: UiDispatcher::new(self.inner.ui_dispatcher.clone()),
            ui_thread_id: self.inner.ui_thread_id,
            id: self.inner.runtime_id,
        }
    }

    pub fn has_updates(&self) -> bool {
        self.inner.has_updates()
    }

    /// True when a frame has been requested or cross-thread messages are
    /// waiting to be drained.
    pub fn needs_frame(&self) -> bool {
        *self.inner.needs_frame.borrow() || self.inner.ui_dispatcher.has_pending()
    }

    pub fn set_needs_frame(&self, value: bool) {
        *self.inner.needs_frame.borrow_mut() = value;
    }

    #[cfg(any(feature = "internal", test))]
    pub fn frame_clock(&self) -> FrameClock {
        FrameClock::new(self.handle())
    }
}
784
impl Drop for Runtime {
    fn drop(&mut self) {
        // `Runtime` is Clone: only the last remaining clone performs the
        // thread-local cleanup.
        if Rc::strong_count(&self.inner) != 1 {
            return;
        }
        unregister_runtime_handle(self.inner.runtime_id);
        LAST_RUNTIME.with(|slot| {
            // Only clear LAST_RUNTIME if it still points at *this* runtime;
            // a newer runtime may have replaced it.
            let should_clear = slot
                .borrow()
                .as_ref()
                .is_some_and(|handle| handle.id() == self.inner.runtime_id);
            if should_clear {
                *slot.borrow_mut() = None;
            }
        });
    }
}
802
/// Scheduler that performs no scheduling; frames must be driven externally.
#[derive(Default)]
pub struct DefaultScheduler;
805
impl RuntimeScheduler for DefaultScheduler {
    // Intentionally a no-op: the host drives frames itself.
    fn schedule_frame(&self) {}
}
809
/// No-op scheduler for tests; frames are pumped manually.
#[cfg(test)]
#[derive(Default)]
pub struct TestScheduler;
813
#[cfg(test)]
impl RuntimeScheduler for TestScheduler {
    // No-op: tests drive frames explicitly.
    fn schedule_frame(&self) {}
}
818
/// Convenience wrapper owning a runtime backed by `TestScheduler`.
#[cfg(test)]
pub struct TestRuntime {
    runtime: Runtime,
}
823
#[cfg(test)]
impl Default for TestRuntime {
    fn default() -> Self {
        Self::new()
    }
}
830
#[cfg(test)]
impl TestRuntime {
    /// Creates a runtime on the current thread with a no-op scheduler.
    pub fn new() -> Self {
        Self {
            runtime: Runtime::new(Arc::new(TestScheduler)),
        }
    }

    /// Weak handle to the wrapped runtime.
    pub fn handle(&self) -> RuntimeHandle {
        self.runtime.handle()
    }
}
843
/// Weak, cloneable handle to a runtime. Most operations become no-ops (or
/// fall back to the dispatcher) once the runtime is dropped.
#[derive(Clone)]
pub struct RuntimeHandle {
    inner: Weak<RuntimeInner>,
    /// Strong ref: usable for cross-thread posting even after the runtime dies.
    dispatcher: UiDispatcher,
    ui_thread_id: ThreadId,
    id: RuntimeId,
}
851
/// Cancellation handle for a task spawned via `RuntimeHandle::spawn_ui`.
pub struct TaskHandle {
    id: u64,
    runtime: RuntimeHandle,
}
856
/// A state release postponed until the outermost teardown scope exits.
struct DeferredStateRelease {
    runtime: RuntimeHandle,
    id: StateId,
}
861
/// Ownership token for a state arena slot; dropping the last `Rc` releases
/// (or defers release of) the slot.
pub(crate) struct StateHandleLease {
    id: StateId,
    runtime: RuntimeHandle,
}
866
impl StateHandleLease {
    /// The leased slot's id.
    pub(crate) fn id(&self) -> StateId {
        self.id
    }

    /// Handle to the runtime owning the slot.
    pub(crate) fn runtime(&self) -> RuntimeHandle {
        self.runtime.clone()
    }
}
876
impl Drop for StateHandleLease {
    fn drop(&mut self) {
        // Routed through defer_state_release so drops that happen inside a
        // teardown scope are batched until the scope exits.
        defer_state_release(self.runtime.clone(), self.id);
    }
}
882
impl RuntimeHandle {
    pub fn id(&self) -> RuntimeId {
        self.id
    }

    /// Allocates a state cell with the default policy and returns its lease.
    pub(crate) fn alloc_state<T: Clone + 'static>(&self, value: T) -> Rc<StateHandleLease> {
        let id = self.with_state_arena(|arena| arena.alloc(value, self.clone()));
        let lease = Rc::new(StateHandleLease {
            id,
            runtime: self.clone(),
        });
        self.with_state_arena(|arena| arena.register_lease(id, &lease));
        lease
    }

    /// Allocates a state cell with an explicit mutation policy.
    pub(crate) fn alloc_state_with_policy<T: Clone + 'static>(
        &self,
        value: T,
        policy: Arc<dyn MutationPolicy<T>>,
    ) -> Rc<StateHandleLease> {
        let id =
            self.with_state_arena(|arena| arena.alloc_with_policy(value, self.clone(), policy));
        let lease = Rc::new(StateHandleLease {
            id,
            runtime: self.clone(),
        });
        self.with_state_arena(|arena| arena.register_lease(id, &lease));
        lease
    }

    /// Allocates state whose lease is kept alive by the runtime itself
    /// (released only when the runtime drops).
    pub(crate) fn alloc_persistent_state<T: Clone + 'static>(
        &self,
        value: T,
    ) -> crate::MutableState<T> {
        let lease = self.alloc_state(value);
        if let Some(inner) = self.inner.upgrade() {
            inner
                .external_state_owners
                .borrow_mut()
                .insert(lease.id(), Rc::clone(&lease));
        }
        crate::MutableState::from_lease(&lease)
    }

    /// Upgrades the slot's weak lease back-reference, if still alive.
    pub(crate) fn retain_state_lease(&self, id: StateId) -> Option<Rc<StateHandleLease>> {
        self.with_state_arena(|arena| arena.retain_lease(id))
    }

    /// Runs `f` against the runtime's state arena.
    ///
    /// # Panics
    /// Panics if the runtime has been dropped.
    pub(crate) fn with_state_arena<R>(&self, f: impl FnOnce(&StateArena) -> R) -> R {
        self.inner
            .upgrade()
            .map(|inner| f(&inner.state_arena))
            .unwrap_or_else(|| panic!("runtime dropped"))
    }

    /// Releases a state slot now; no-op when the runtime is gone.
    fn release_state_immediate(&self, id: StateId) {
        if let Some(inner) = self.inner.upgrade() {
            inner.state_arena.release(id);
        }
    }

    /// `(total slots, free slots)` of the state arena. Panics if dropped.
    pub fn state_arena_stats(&self) -> (usize, usize) {
        self.with_state_arena(StateArena::stats)
    }

    pub fn state_arena_debug_stats(&self) -> StateArenaDebugStats {
        self.with_state_arena(StateArena::debug_stats)
    }

    /// Runtime queue stats, or all-zero defaults when the runtime is gone.
    pub fn debug_stats(&self) -> RuntimeDebugStats {
        self.inner
            .upgrade()
            .map(|inner| inner.debug_stats())
            .unwrap_or_default()
    }

    pub(crate) fn unregister_state_scope(&self, id: StateId, scope_id: ScopeId) {
        if let Some(inner) = self.inner.upgrade() {
            inner.state_arena.unregister_scope(id, scope_id);
        }
    }

    /// Requests a frame; no-op when the runtime is gone.
    pub fn schedule(&self) {
        if let Some(inner) = self.inner.upgrade() {
            inner.schedule();
        }
    }

    pub(crate) fn enqueue_node_update(&self, command: Command) {
        if let Some(inner) = self.inner.upgrade() {
            inner.enqueue_update(command);
        }
    }

    /// Queues a same-thread task; if the runtime is already gone the task is
    /// run inline as a best effort rather than silently dropped.
    pub fn enqueue_ui_task(&self, task: Box<dyn FnOnce() + 'static>) {
        if let Some(inner) = self.inner.upgrade() {
            inner.enqueue_ui_task(task);
        } else {
            task();
        }
    }

    /// Spawns a future on the runtime; `None` when the runtime is gone.
    pub fn spawn_ui<F>(&self, fut: F) -> Option<TaskHandle>
    where
        F: Future<Output = ()> + 'static,
    {
        self.inner.upgrade().map(|inner| {
            let id = inner.spawn_ui_task(Box::pin(fut));
            TaskHandle {
                id,
                runtime: self.clone(),
            }
        })
    }

    pub fn cancel_task(&self, id: u64) {
        if let Some(inner) = self.inner.upgrade() {
            inner.cancel_task(id);
        }
    }

    /// Posts a task cross-thread via the dispatcher (works even after the
    /// runtime drops, though it will then never run).
    pub fn post_ui(&self, task: impl FnOnce() + Send + 'static) {
        self.dispatcher.post(task);
    }

    /// Registers a one-shot UI continuation; `None` when the runtime is gone.
    pub fn register_ui_cont<T: 'static>(&self, f: impl FnOnce(T) + 'static) -> Option<u64> {
        self.inner.upgrade().map(|inner| inner.register_ui_cont(f))
    }

    pub fn cancel_ui_cont(&self, id: u64) {
        if let Some(inner) = self.inner.upgrade() {
            inner.cancel_ui_cont(id);
        }
    }

    /// Drains queued UI work to quiescence; no-op when the runtime is gone.
    pub fn drain_ui(&self) {
        if let Some(inner) = self.inner.upgrade() {
            inner.drain_ui();
        }
    }

    /// Whether UI work remains; falls back to the dispatcher's counter when
    /// the runtime is gone.
    pub fn has_pending_ui(&self) -> bool {
        self.inner
            .upgrade()
            .map(|inner| inner.has_pending_ui())
            .unwrap_or_else(|| self.dispatcher.has_pending())
    }

    /// Registers a per-frame callback; `None` when the runtime is gone.
    pub fn register_frame_callback(
        &self,
        callback: impl FnOnce(u64) + 'static,
    ) -> Option<FrameCallbackId> {
        self.inner
            .upgrade()
            .map(|inner| inner.register_frame_callback(Box::new(callback)))
    }

    pub fn cancel_frame_callback(&self, id: FrameCallbackId) {
        if let Some(inner) = self.inner.upgrade() {
            inner.cancel_frame_callback(id);
        }
    }

    /// Delivers this frame's callbacks with the given timestamp.
    pub fn drain_frame_callbacks(&self, frame_time_nanos: u64) {
        if let Some(inner) = self.inner.upgrade() {
            inner.drain_frame_callbacks(frame_time_nanos);
        }
    }

    #[cfg(any(feature = "internal", test))]
    pub fn frame_clock(&self) -> FrameClock {
        FrameClock::new(self.clone())
    }

    pub fn set_needs_frame(&self, value: bool) {
        if let Some(inner) = self.inner.upgrade() {
            *inner.needs_frame.borrow_mut() = value;
        }
    }

    /// Takes all queued node commands; empty when the runtime is gone.
    pub(crate) fn take_updates(&self) -> Vec<Command> {
        self.inner
            .upgrade()
            .map(|inner| inner.take_updates())
            .unwrap_or_default()
    }

    pub fn has_updates(&self) -> bool {
        self.inner
            .upgrade()
            .map(|inner| inner.has_updates())
            .unwrap_or(false)
    }

    pub(crate) fn mark_scope_recomposed(&self, id: ScopeId) {
        if let Some(inner) = self.inner.upgrade() {
            inner.mark_scope_recomposed(id);
        }
    }

    pub(crate) fn register_invalid_scope(&self, id: ScopeId, scope: Weak<RecomposeScopeInner>) {
        if let Some(inner) = self.inner.upgrade() {
            inner.register_invalid_scope(id, scope);
        }
    }

    pub(crate) fn take_invalidated_scopes(&self) -> Vec<(ScopeId, Weak<RecomposeScopeInner>)> {
        self.inner
            .upgrade()
            .map(|inner| inner.take_invalidated_scopes())
            .unwrap_or_default()
    }

    pub fn has_invalid_scopes(&self) -> bool {
        self.inner
            .upgrade()
            .map(|inner| inner.has_invalid_scopes())
            .unwrap_or(false)
    }

    /// Current invalid scope ids, for debugging only.
    #[doc(hidden)]
    pub fn debug_invalid_scope_ids(&self) -> Vec<usize> {
        self.inner
            .upgrade()
            .map(|inner| inner.invalid_scopes.borrow().iter().copied().collect())
            .unwrap_or_default()
    }

    pub fn has_frame_callbacks(&self) -> bool {
        self.inner
            .upgrade()
            .map(|inner| inner.has_frame_callbacks())
            .unwrap_or(false)
    }

    /// Debug-asserts the caller is on the runtime's UI thread.
    pub fn assert_ui_thread(&self) {
        debug_assert_eq!(
            std::thread::current().id(),
            self.ui_thread_id,
            "state mutated off the runtime's UI thread"
        );
    }

    pub fn dispatcher(&self) -> UiDispatcher {
        self.dispatcher.clone()
    }

    /// Runs `f` with lease drops deferred until `f` returns (teardown scope).
    #[doc(hidden)]
    pub fn with_deferred_state_releases<R>(&self, f: impl FnOnce() -> R) -> R {
        let _scope = enter_state_teardown_scope();
        f()
    }
}
1146
impl TaskHandle {
    /// Cancels the spawned task, consuming the handle.
    pub fn cancel(self) {
        self.runtime.cancel_task(self.id);
    }
}
1152
/// Registered frame callback; `callback` is `Option` so it can be taken out
/// when delivered.
pub(crate) struct FrameCallbackEntry {
    id: FrameCallbackId,
    callback: Option<Box<dyn FnOnce(u64) + 'static>>,
}
1157
/// Waker for runtime async tasks: waking just requests a platform frame.
struct RuntimeTaskWaker {
    scheduler: Arc<dyn RuntimeScheduler>,
}
1161
1162impl RuntimeTaskWaker {
1163 fn new(inner: Weak<RuntimeInner>) -> Self {
1164 let scheduler = inner
1167 .upgrade()
1168 .map(|rc| rc.scheduler.clone())
1169 .expect("RuntimeInner dropped before waker created");
1170 Self { scheduler }
1171 }
1172
1173 fn into_waker(self) -> Waker {
1174 futures_task::waker(Arc::new(self))
1175 }
1176}
1177
impl futures_task::ArcWake for RuntimeTaskWaker {
    // Waking only requests a frame; the runtime re-polls all tasks then.
    fn wake_by_ref(arc_self: &Arc<Self>) {
        arc_self.scheduler.schedule_frame();
    }
}
1183
thread_local! {
    /// Stack of explicitly-entered runtimes on this thread (innermost last).
    static ACTIVE_RUNTIMES: RefCell<Vec<RuntimeHandle>> = const { RefCell::new(Vec::new()) };
    /// Most recently created/entered runtime; fallback when the stack is empty.
    static LAST_RUNTIME: RefCell<Option<RuntimeHandle>> = const { RefCell::new(None) };
    /// Registry of live runtimes on this thread, keyed by id.
    static REGISTERED_RUNTIMES: RefCell<HashMap<RuntimeId, RuntimeHandle>> = RefCell::new(HashMap::default());
    /// Nesting depth of teardown scopes; releases are deferred while > 0.
    static STATE_TEARDOWN_DEPTH: Cell<usize> = const { Cell::new(0) };
    /// Releases postponed until the outermost teardown scope exits.
    static DEFERRED_STATE_RELEASES: RefCell<Vec<DeferredStateRelease>> = const { RefCell::new(Vec::new()) };
}
1191
/// Sizes/capacities of this thread's runtime-related thread-locals.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct RuntimeThreadLocalDebugStats {
    pub active_runtimes_len: usize,
    pub active_runtimes_cap: usize,
    pub registered_runtimes_len: usize,
    pub registered_runtimes_cap: usize,
    pub deferred_state_releases_len: usize,
    pub deferred_state_releases_cap: usize,
}
1201
1202pub fn current_runtime_handle() -> Option<RuntimeHandle> {
1207 if let Some(handle) = ACTIVE_RUNTIMES.with(|stack| stack.borrow().last().cloned()) {
1208 return Some(handle);
1209 }
1210 LAST_RUNTIME.with(|slot| slot.borrow().clone())
1211}
1212
/// Looks up a runtime registered on this thread by id.
pub(crate) fn runtime_handle_by_id(id: RuntimeId) -> Option<RuntimeHandle> {
    REGISTERED_RUNTIMES.with(|registry| registry.borrow().get(&id).cloned())
}
1216
1217pub fn debug_runtime_thread_local_stats() -> RuntimeThreadLocalDebugStats {
1218 let (active_runtimes_len, active_runtimes_cap) = ACTIVE_RUNTIMES.with(|stack| {
1219 let stack = stack.borrow();
1220 (stack.len(), stack.capacity())
1221 });
1222 let (registered_runtimes_len, registered_runtimes_cap) = REGISTERED_RUNTIMES.with(|registry| {
1223 let registry = registry.borrow();
1224 (registry.len(), registry.capacity())
1225 });
1226 let (deferred_state_releases_len, deferred_state_releases_cap) =
1227 DEFERRED_STATE_RELEASES.with(|releases| {
1228 let releases = releases.borrow();
1229 (releases.len(), releases.capacity())
1230 });
1231
1232 RuntimeThreadLocalDebugStats {
1233 active_runtimes_len,
1234 active_runtimes_cap,
1235 registered_runtimes_len,
1236 registered_runtimes_cap,
1237 deferred_state_releases_len,
1238 deferred_state_releases_cap,
1239 }
1240}
1241
/// Adds (or refreshes) `handle` in this thread's runtime registry.
fn register_runtime_handle(handle: &RuntimeHandle) {
    REGISTERED_RUNTIMES.with(|registry| {
        registry.borrow_mut().insert(handle.id(), handle.clone());
    });
}
1247
/// Removes the runtime `id` from this thread's registry.
fn unregister_runtime_handle(id: RuntimeId) {
    REGISTERED_RUNTIMES.with(|registry| {
        registry.borrow_mut().remove(&id);
    });
}
1253
1254fn defer_state_release(runtime: RuntimeHandle, id: StateId) {
1255 let teardown_active = STATE_TEARDOWN_DEPTH.with(|depth| depth.get() > 0);
1256 if teardown_active {
1257 DEFERRED_STATE_RELEASES.with(|releases| {
1258 releases
1259 .borrow_mut()
1260 .push(DeferredStateRelease { runtime, id });
1261 });
1262 } else {
1263 runtime.release_state_immediate(id);
1264 }
1265}
1266
1267fn flush_deferred_state_releases() {
1268 DEFERRED_STATE_RELEASES.with(|releases| {
1269 let mut releases = releases.borrow_mut();
1270 while let Some(deferred) = releases.pop() {
1271 deferred.runtime.release_state_immediate(deferred.id);
1272 }
1273 });
1274}
1275
/// RAII token for a teardown scope; see `enter_state_teardown_scope`.
pub(crate) struct StateTeardownScope;
1277
1278pub(crate) fn enter_state_teardown_scope() -> StateTeardownScope {
1279 STATE_TEARDOWN_DEPTH.with(|depth| depth.set(depth.get() + 1));
1280 StateTeardownScope
1281}
1282
impl Drop for StateTeardownScope {
    fn drop(&mut self) {
        STATE_TEARDOWN_DEPTH.with(|depth| {
            // saturating_sub guards against an unbalanced drop in release builds.
            let next = depth.get().saturating_sub(1);
            depth.set(next);
            // Only the outermost scope flushes the deferred releases.
            if next == 0 {
                flush_deferred_state_releases();
            }
        });
    }
}
1294
1295pub(crate) fn push_active_runtime(handle: &RuntimeHandle) {
1296 register_runtime_handle(handle);
1297 ACTIVE_RUNTIMES.with(|stack| stack.borrow_mut().push(handle.clone()));
1298 LAST_RUNTIME.with(|slot| *slot.borrow_mut() = Some(handle.clone()));
1299}
1300
/// Pops the innermost runtime from this thread's active stack (no-op when
/// the stack is empty; `LAST_RUNTIME` is intentionally left untouched).
pub(crate) fn pop_active_runtime() {
    ACTIVE_RUNTIMES.with(|stack| {
        stack.borrow_mut().pop();
    });
}
1306
1307pub fn schedule_frame() {
1309 if let Some(handle) = current_runtime_handle() {
1310 handle.schedule();
1311 return;
1312 }
1313 panic!("no runtime available to schedule frame");
1314}
1315
1316pub fn schedule_node_update(
1318 update: impl FnOnce(&mut dyn Applier) -> Result<(), NodeError> + 'static,
1319) {
1320 let handle = current_runtime_handle().expect("no runtime available to schedule node update");
1321 handle.enqueue_node_update(Command::callback(update));
1322}