1use crate::collections::map::HashMap;
2use crate::collections::map::HashSet;
3use crate::state::{MutationPolicy, NeverEqual};
4use crate::MutableStateInner;
5use std::any::Any;
6use std::cell::{Cell, RefCell};
7use std::collections::VecDeque;
8use std::future::Future;
9use std::pin::Pin;
10use std::rc::{Rc, Weak};
11use std::sync::atomic::{AtomicUsize, Ordering};
12use std::sync::{mpsc, Arc};
13use std::task::{Context, Poll, Waker};
14use std::thread::ThreadId;
15use std::thread_local;
16
17#[cfg(any(feature = "internal", test))]
18use crate::frame_clock::FrameClock;
19use crate::platform::RuntimeScheduler;
20use crate::{Applier, Command, FrameCallbackId, NodeError, RecomposeScopeInner, ScopeId};
21
22enum UiMessage {
23 Task(Box<dyn FnOnce() + Send + 'static>),
24 Invoke { id: u64, value: Box<dyn Any + Send> },
25}
26
27type UiContinuation = Box<dyn Fn(Box<dyn Any>) -> bool + 'static>;
28type UiContinuationMap = HashMap<u64, UiContinuation>;
29
30struct TypedStateCell<T: Clone + 'static> {
31 inner: MutableStateInner<T>,
32}
33
34trait ScopeWatchCell {
35 fn unregister_scope(&self, scope_id: ScopeId);
36}
37
38impl<T: Clone + 'static> ScopeWatchCell for TypedStateCell<T> {
39 fn unregister_scope(&self, scope_id: ScopeId) {
40 self.inner.unregister_scope(scope_id);
41 }
42}
43
44struct StateArenaSlot {
45 generation: u32,
46 cell: Option<Rc<dyn Any>>,
47 watcher_cell: Option<Rc<dyn ScopeWatchCell>>,
48 lease: Option<Weak<StateHandleLease>>,
49}
50
51#[derive(Default)]
52struct StateArenaInner {
53 cells: Vec<StateArenaSlot>,
54 free: Vec<u32>,
55}
56
57#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
58pub struct StateArenaDebugStats {
59 pub cells_len: usize,
60 pub cells_cap: usize,
61 pub free_len: usize,
62 pub free_cap: usize,
63}
64
65#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
66pub struct RuntimeDebugStats {
67 pub node_updates_len: usize,
68 pub node_updates_cap: usize,
69 pub invalid_scopes_len: usize,
70 pub invalid_scopes_cap: usize,
71 pub scope_queue_len: usize,
72 pub scope_queue_cap: usize,
73 pub frame_callbacks_len: usize,
74 pub frame_callbacks_cap: usize,
75 pub local_tasks_len: usize,
76 pub local_tasks_cap: usize,
77 pub ui_conts_len: usize,
78 pub ui_conts_cap: usize,
79 pub tasks_len: usize,
80 pub tasks_cap: usize,
81 pub external_state_owners_len: usize,
82 pub external_state_owners_cap: usize,
83 pub ui_dispatcher_pending: usize,
84}
85
86#[derive(Default)]
87pub(crate) struct StateArena {
88 inner: RefCell<StateArenaInner>,
89}
90
91impl StateArena {
92 pub(crate) fn alloc<T: Clone + 'static>(&self, value: T, runtime: RuntimeHandle) -> StateId {
93 self.alloc_with_policy(value, runtime, Arc::new(NeverEqual))
94 }
95
96 pub(crate) fn alloc_with_policy<T: Clone + 'static>(
97 &self,
98 value: T,
99 runtime: RuntimeHandle,
100 policy: Arc<dyn MutationPolicy<T>>,
101 ) -> StateId {
102 let (slot, generation) = {
103 let mut inner = self.inner.borrow_mut();
104 loop {
105 let Some(slot) = inner.free.pop() else {
106 let slot = inner.cells.len() as u32;
107 inner.cells.push(StateArenaSlot {
108 generation: 0,
109 cell: None,
110 watcher_cell: None,
111 lease: None,
112 });
113 break (slot, 0);
114 };
115
116 let Some(entry) = inner.cells.get_mut(slot as usize) else {
117 continue;
118 };
119 if entry.cell.is_some() {
120 continue;
121 }
122
123 entry.watcher_cell = None;
124 entry.lease = None;
125 entry.generation = entry.generation.wrapping_add(1);
126 break (slot, entry.generation);
127 }
128 };
129 let id = StateId::new(slot, generation);
130 let inner = MutableStateInner::new_with_policy(value, runtime.clone(), policy);
131 inner.install_snapshot_observer(id);
132 let typed_cell = Rc::new(TypedStateCell { inner });
133 let cell: Rc<dyn Any> = typed_cell.clone();
134 let watcher_cell: Rc<dyn ScopeWatchCell> = typed_cell;
135 let mut arena = self.inner.borrow_mut();
136 let slot_entry = &mut arena.cells[slot as usize];
137 slot_entry.cell = Some(cell);
138 slot_entry.watcher_cell = Some(watcher_cell);
139 id
140 }
141
142 fn get_cell_opt(&self, id: StateId) -> Option<Rc<dyn Any>> {
143 self.inner
144 .borrow()
145 .cells
146 .get(id.slot_index())
147 .filter(|cell| cell.generation == id.generation())
148 .and_then(|cell| cell.cell.as_ref())
149 .cloned()
150 }
151
152 fn get_typed<T: Clone + 'static>(&self, id: StateId) -> Rc<TypedStateCell<T>> {
153 match self.get_cell_opt(id) {
154 None => panic!(
155 "state cell missing: slot={}, gen={}, expected={}",
156 id.slot(),
157 id.generation(),
158 std::any::type_name::<T>(),
159 ),
160 Some(cell) => Rc::downcast::<TypedStateCell<T>>(cell).unwrap_or_else(|_| {
161 panic!(
162 "state cell type mismatch: slot={}, gen={}, expected={}",
163 id.slot(),
164 id.generation(),
165 std::any::type_name::<T>(),
166 )
167 }),
168 }
169 }
170
171 fn get_typed_opt<T: Clone + 'static>(&self, id: StateId) -> Option<Rc<TypedStateCell<T>>> {
172 Rc::downcast::<TypedStateCell<T>>(self.get_cell_opt(id)?).ok()
173 }
174
175 pub(crate) fn with_typed<T: Clone + 'static, R>(
176 &self,
177 id: StateId,
178 f: impl FnOnce(&MutableStateInner<T>) -> R,
179 ) -> R {
180 let cell = self.get_typed::<T>(id);
181 f(&cell.inner)
182 }
183
184 pub(crate) fn with_typed_opt<T: Clone + 'static, R>(
185 &self,
186 id: StateId,
187 f: impl FnOnce(&MutableStateInner<T>) -> R,
188 ) -> Option<R> {
189 let cell = self.get_typed_opt::<T>(id)?;
190 Some(f(&cell.inner))
191 }
192
193 pub(crate) fn release(&self, id: StateId) {
194 let cell = {
195 let mut inner = self.inner.borrow_mut();
196 let Some(slot) = inner.cells.get_mut(id.slot_index()) else {
197 return;
198 };
199 if slot.generation != id.generation() {
200 return;
201 }
202 slot.lease = None;
203 slot.watcher_cell = None;
204 let cell = slot.cell.take();
205 if cell.is_some() {
206 inner.free.push(id.slot());
207 }
208 cell
209 };
210 drop(cell);
211 }
212
213 pub(crate) fn stats(&self) -> (usize, usize) {
214 let inner = self.inner.borrow();
215 (inner.cells.len(), inner.free.len())
216 }
217
218 pub(crate) fn debug_stats(&self) -> StateArenaDebugStats {
219 let inner = self.inner.borrow();
220 StateArenaDebugStats {
221 cells_len: inner.cells.len(),
222 cells_cap: inner.cells.capacity(),
223 free_len: inner.free.len(),
224 free_cap: inner.free.capacity(),
225 }
226 }
227
228 pub(crate) fn unregister_scope(&self, id: StateId, scope_id: ScopeId) {
229 let watcher_cell = {
230 let inner = self.inner.borrow();
231 inner
232 .cells
233 .get(id.slot_index())
234 .filter(|slot| slot.generation == id.generation())
235 .and_then(|slot| slot.watcher_cell.as_ref())
236 .cloned()
237 };
238 if let Some(watcher_cell) = watcher_cell {
239 watcher_cell.unregister_scope(scope_id);
240 }
241 }
242
243 pub(crate) fn register_lease(&self, id: StateId, lease: &Rc<StateHandleLease>) {
244 let mut inner = self.inner.borrow_mut();
245 let Some(slot) = inner.cells.get_mut(id.slot_index()) else {
246 return;
247 };
248 if slot.generation != id.generation() {
249 return;
250 }
251 slot.lease = Some(Rc::downgrade(lease));
252 }
253
254 pub(crate) fn retain_lease(&self, id: StateId) -> Option<Rc<StateHandleLease>> {
255 let inner = self.inner.borrow();
256 let slot = inner.cells.get(id.slot_index())?;
257 if slot.generation != id.generation() {
258 return None;
259 }
260 slot.lease.as_ref()?.upgrade()
261 }
262}
263
264#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
265pub struct StateId {
266 slot: u32,
267 generation: u32,
268}
269
270impl StateId {
271 const fn new(slot: u32, generation: u32) -> Self {
272 Self { slot, generation }
273 }
274
275 pub(crate) const fn slot(self) -> u32 {
276 self.slot
277 }
278
279 pub(crate) const fn slot_index(self) -> usize {
280 self.slot as usize
281 }
282
283 pub(crate) const fn generation(self) -> u32 {
284 self.generation
285 }
286}
287
288#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
289pub struct RuntimeId(u32);
290
291impl RuntimeId {
292 fn next() -> Self {
293 NEXT_RUNTIME_ID.with(|next| {
294 let id = next.get();
295 next.set(id.wrapping_add(1));
296 Self(id)
297 })
298 }
299}
300
301struct UiDispatcherInner {
302 scheduler: Arc<dyn RuntimeScheduler>,
303 tx: mpsc::Sender<UiMessage>,
304 pending: AtomicUsize,
305}
306
307impl UiDispatcherInner {
308 fn new(scheduler: Arc<dyn RuntimeScheduler>, tx: mpsc::Sender<UiMessage>) -> Self {
309 Self {
310 scheduler,
311 tx,
312 pending: AtomicUsize::new(0),
313 }
314 }
315
316 fn post(&self, task: impl FnOnce() + Send + 'static) {
317 self.pending.fetch_add(1, Ordering::SeqCst);
318 if self.tx.send(UiMessage::Task(Box::new(task))).is_ok() {
319 self.scheduler.schedule_frame();
320 } else {
321 self.pending.fetch_sub(1, Ordering::SeqCst);
322 }
323 }
324
325 fn post_invoke(&self, id: u64, value: Box<dyn Any + Send>) {
326 self.pending.fetch_add(1, Ordering::SeqCst);
327 if self.tx.send(UiMessage::Invoke { id, value }).is_ok() {
328 self.scheduler.schedule_frame();
329 } else {
330 self.pending.fetch_sub(1, Ordering::SeqCst);
331 }
332 }
333
334 fn has_pending(&self) -> bool {
335 self.pending.load(Ordering::SeqCst) > 0
336 }
337}
338
339struct PendingGuard<'a> {
340 counter: &'a AtomicUsize,
341}
342
343impl<'a> PendingGuard<'a> {
344 fn new(counter: &'a AtomicUsize) -> Self {
345 Self { counter }
346 }
347}
348
349impl<'a> Drop for PendingGuard<'a> {
350 fn drop(&mut self) {
351 let mut current = self.counter.load(Ordering::SeqCst);
352 loop {
353 if current == 0 {
354 return;
355 }
356 match self.counter.compare_exchange(
357 current,
358 current - 1,
359 Ordering::SeqCst,
360 Ordering::SeqCst,
361 ) {
362 Ok(_) => return,
363 Err(next) => current = next,
364 }
365 }
366 }
367}
368
369#[derive(Clone)]
370pub struct UiDispatcher {
371 inner: Arc<UiDispatcherInner>,
372}
373
374impl UiDispatcher {
375 fn new(inner: Arc<UiDispatcherInner>) -> Self {
376 Self { inner }
377 }
378
379 pub fn post(&self, task: impl FnOnce() + Send + 'static) {
380 self.inner.post(task);
381 }
382
383 pub fn post_invoke<T>(&self, id: u64, value: T)
384 where
385 T: Send + 'static,
386 {
387 self.inner.post_invoke(id, Box::new(value));
388 }
389
390 pub fn has_pending(&self) -> bool {
391 self.inner.has_pending()
392 }
393}
394
395struct RuntimeInner {
396 scheduler: Arc<dyn RuntimeScheduler>,
397 needs_frame: RefCell<bool>,
398 node_updates: RefCell<Vec<Command>>,
399 invalid_scopes: RefCell<HashSet<ScopeId>>,
400 scope_queue: RefCell<Vec<(ScopeId, Weak<RecomposeScopeInner>)>>,
401 frame_callbacks: RefCell<VecDeque<FrameCallbackEntry>>,
402 next_frame_callback_id: Cell<u64>,
403 ui_dispatcher: Arc<UiDispatcherInner>,
404 ui_rx: RefCell<mpsc::Receiver<UiMessage>>,
405 local_tasks: RefCell<VecDeque<Box<dyn FnOnce() + 'static>>>,
406 ui_conts: RefCell<UiContinuationMap>,
407 next_cont_id: Cell<u64>,
408 ui_thread_id: ThreadId,
409 tasks: RefCell<Vec<TaskEntry>>,
410 next_task_id: Cell<u64>,
411 task_waker: RefCell<Option<Waker>>,
412 state_arena: StateArena,
413 external_state_owners: RefCell<HashMap<StateId, Rc<StateHandleLease>>>,
414 live_recompose_scope_count: Cell<usize>,
415 runtime_id: RuntimeId,
416}
417
418struct TaskEntry {
419 id: u64,
420 future: Pin<Box<dyn Future<Output = ()> + 'static>>,
421}
422
423impl RuntimeInner {
424 fn new(scheduler: Arc<dyn RuntimeScheduler>) -> Self {
425 let (tx, rx) = mpsc::channel();
426 let dispatcher = Arc::new(UiDispatcherInner::new(scheduler.clone(), tx));
427 Self {
428 scheduler,
429 needs_frame: RefCell::new(false),
430 node_updates: RefCell::new(Vec::new()),
431 invalid_scopes: RefCell::new(HashSet::default()),
432 scope_queue: RefCell::new(Vec::new()),
433 frame_callbacks: RefCell::new(VecDeque::new()),
434 next_frame_callback_id: Cell::new(1),
435 ui_dispatcher: dispatcher,
436 ui_rx: RefCell::new(rx),
437 local_tasks: RefCell::new(VecDeque::new()),
438 ui_conts: RefCell::new(UiContinuationMap::default()),
439 next_cont_id: Cell::new(1),
440 ui_thread_id: std::thread::current().id(),
441 tasks: RefCell::new(Vec::new()),
442 next_task_id: Cell::new(1),
443 task_waker: RefCell::new(None),
444 state_arena: StateArena::default(),
445 external_state_owners: RefCell::new(HashMap::default()),
446 live_recompose_scope_count: Cell::new(0),
447 runtime_id: RuntimeId::next(),
448 }
449 }
450
451 fn init_task_waker(this: &Rc<Self>) {
452 let waker = RuntimeTaskWaker::new(this).into_waker();
453 *this.task_waker.borrow_mut() = Some(waker);
454 }
455
456 fn schedule(&self) {
457 *self.needs_frame.borrow_mut() = true;
458 self.scheduler.schedule_frame();
459 }
460
461 fn enqueue_update(&self, command: Command) {
462 self.node_updates.borrow_mut().push(command);
463 self.schedule(); }
465
466 fn take_updates(&self) -> Vec<Command> {
467 let updates = self.node_updates.borrow_mut().drain(..).collect::<Vec<_>>();
468 updates
469 }
470
471 fn has_updates(&self) -> bool {
472 !self.node_updates.borrow().is_empty() || self.has_invalid_scopes()
473 }
474
475 fn register_invalid_scope(&self, id: ScopeId, scope: Weak<RecomposeScopeInner>) {
476 let mut invalid = self.invalid_scopes.borrow_mut();
477 if invalid.insert(id) {
478 self.scope_queue.borrow_mut().push((id, scope));
479 self.schedule();
480 }
481 }
482
483 fn requeue_invalid_scope(&self, id: ScopeId, scope: Weak<RecomposeScopeInner>) {
484 if self.invalid_scopes.borrow().contains(&id) {
485 self.scope_queue.borrow_mut().push((id, scope));
486 self.schedule();
487 }
488 }
489
490 fn mark_scope_recomposed(&self, id: ScopeId) {
491 self.invalid_scopes.borrow_mut().remove(&id);
492 }
493
494 fn take_invalidated_scopes(&self) -> Vec<(ScopeId, Weak<RecomposeScopeInner>)> {
495 let mut queue = self.scope_queue.borrow_mut();
496 if queue.is_empty() {
497 return Vec::new();
498 }
499 let pending: Vec<_> = queue.drain(..).collect();
500 drop(queue);
501 let invalid = self.invalid_scopes.borrow();
502 pending
503 .into_iter()
504 .filter(|(id, _)| invalid.contains(id))
505 .collect()
506 }
507
508 fn has_invalid_scopes(&self) -> bool {
509 !self.invalid_scopes.borrow().is_empty()
510 }
511
512 fn increment_live_recompose_scope_count(&self) {
513 self.live_recompose_scope_count
514 .set(self.live_recompose_scope_count.get().saturating_add(1));
515 }
516
517 fn decrement_live_recompose_scope_count(&self) {
518 self.live_recompose_scope_count
519 .set(self.live_recompose_scope_count.get().saturating_sub(1));
520 }
521
522 fn live_recompose_scope_count(&self) -> usize {
523 self.live_recompose_scope_count.get()
524 }
525
526 fn has_frame_callbacks(&self) -> bool {
527 !self.frame_callbacks.borrow().is_empty()
528 }
529
530 fn enqueue_ui_task(&self, task: Box<dyn FnOnce() + 'static>) {
535 self.local_tasks.borrow_mut().push_back(task);
536 self.schedule();
537 }
538
539 fn spawn_ui_task(&self, future: Pin<Box<dyn Future<Output = ()> + 'static>>) -> u64 {
540 let id = self.next_task_id.get();
541 self.next_task_id.set(id + 1);
542 self.tasks.borrow_mut().push(TaskEntry { id, future });
543 self.schedule();
544 id
545 }
546
547 fn cancel_task(&self, id: u64) {
548 let mut tasks = self.tasks.borrow_mut();
549 if tasks.iter().any(|entry| entry.id == id) {
550 tasks.retain(|entry| entry.id != id);
551 }
552 }
553
554 fn poll_async_tasks(&self) -> bool {
555 let waker = match self.task_waker.borrow().as_ref() {
556 Some(waker) => waker.clone(),
557 None => return false,
558 };
559 let mut cx = Context::from_waker(&waker);
560 let mut tasks_ref = self.tasks.borrow_mut();
561 let tasks = std::mem::take(&mut *tasks_ref);
562 drop(tasks_ref);
563 let mut pending = Vec::with_capacity(tasks.len());
564 let mut made_progress = false;
565 for mut entry in tasks.into_iter() {
566 match entry.future.as_mut().poll(&mut cx) {
567 Poll::Ready(()) => {
568 made_progress = true;
569 }
570 Poll::Pending => {
571 pending.push(entry);
572 }
573 }
574 }
575 if !pending.is_empty() {
576 self.tasks.borrow_mut().extend(pending);
577 }
578 made_progress
579 }
580
581 fn drain_ui(&self) {
582 loop {
583 let mut executed = false;
584
585 {
586 let rx = &mut *self.ui_rx.borrow_mut();
587 for message in rx.try_iter() {
588 executed = true;
589 let _guard = PendingGuard::new(&self.ui_dispatcher.pending);
590 match message {
591 UiMessage::Task(task) => {
592 task();
593 }
594 UiMessage::Invoke { id, value } => {
595 self.invoke_ui_cont(id, value);
596 }
597 }
598 }
599 }
600
601 loop {
602 let task = {
603 let mut local = self.local_tasks.borrow_mut();
604 local.pop_front()
605 };
606
607 match task {
608 Some(task) => {
609 executed = true;
610 task();
611 }
612 None => break,
613 }
614 }
615
616 if self.poll_async_tasks() {
617 executed = true;
618 }
619
620 if !executed {
621 break;
622 }
623 }
624 }
625
626 fn has_pending_ui(&self) -> bool {
627 let local_pending = self
628 .local_tasks
629 .try_borrow()
630 .map(|tasks| !tasks.is_empty())
631 .unwrap_or(true);
632
633 let async_pending = self
634 .tasks
635 .try_borrow()
636 .map(|tasks| !tasks.is_empty())
637 .unwrap_or(true);
638
639 local_pending || self.ui_dispatcher.has_pending() || async_pending
640 }
641
642 fn register_ui_cont<T: 'static>(&self, f: impl FnOnce(T) + 'static) -> u64 {
643 debug_assert_eq!(
644 std::thread::current().id(),
645 self.ui_thread_id,
646 "UI continuation registered off the runtime thread",
647 );
648 let id = self.next_cont_id.get();
649 self.next_cont_id.set(id + 1);
650 let callback = RefCell::new(Some(f));
651 self.ui_conts.borrow_mut().insert(
652 id,
653 Box::new(move |value: Box<dyn Any>| {
654 let Ok(value) = value.downcast::<T>() else {
655 return false;
656 };
657 let Some(slot) = callback.borrow_mut().take() else {
658 return true;
659 };
660 slot(*value);
661 true
662 }),
663 );
664 id
665 }
666
667 fn invoke_ui_cont(&self, id: u64, value: Box<dyn Any + Send>) {
668 debug_assert_eq!(
669 std::thread::current().id(),
670 self.ui_thread_id,
671 "UI continuation invoked off the runtime thread",
672 );
673 let callback = { self.ui_conts.borrow_mut().remove(&id) };
674 if let Some(callback) = callback {
675 let value: Box<dyn Any> = value;
676 if !callback(value) {
677 self.ui_conts.borrow_mut().insert(id, callback);
678 }
679 }
680 }
681
682 fn cancel_ui_cont(&self, id: u64) {
683 self.ui_conts.borrow_mut().remove(&id);
684 }
685
686 fn register_frame_callback(&self, callback: Box<dyn FnOnce(u64) + 'static>) -> FrameCallbackId {
687 let id = self.next_frame_callback_id.get();
688 self.next_frame_callback_id.set(id + 1);
689 self.frame_callbacks
690 .borrow_mut()
691 .push_back(FrameCallbackEntry {
692 id,
693 callback: Some(callback),
694 });
695 self.schedule();
696 id
697 }
698
699 fn cancel_frame_callback(&self, id: FrameCallbackId) {
700 let mut callbacks = self.frame_callbacks.borrow_mut();
701 if let Some(index) = callbacks.iter().position(|entry| entry.id == id) {
702 callbacks.remove(index);
703 }
704 let callbacks_empty = callbacks.is_empty();
705 drop(callbacks);
706 let local_pending = self
707 .local_tasks
708 .try_borrow()
709 .map(|tasks| !tasks.is_empty())
710 .unwrap_or(true);
711 let async_pending = self
712 .tasks
713 .try_borrow()
714 .map(|tasks| !tasks.is_empty())
715 .unwrap_or(true);
716 if !self.has_invalid_scopes()
717 && !self.has_updates()
718 && callbacks_empty
719 && !local_pending
720 && !self.ui_dispatcher.has_pending()
721 && !async_pending
722 {
723 *self.needs_frame.borrow_mut() = false;
724 }
725 }
726
727 fn drain_frame_callbacks(&self, frame_time_nanos: u64) {
728 let mut callbacks = self.frame_callbacks.borrow_mut();
729 let mut pending: Vec<Box<dyn FnOnce(u64) + 'static>> = Vec::with_capacity(callbacks.len());
730 while let Some(mut entry) = callbacks.pop_front() {
731 if let Some(callback) = entry.callback.take() {
732 pending.push(callback);
733 }
734 }
735 drop(callbacks);
736
737 if !pending.is_empty() {
742 let _ = crate::run_in_mutable_snapshot(|| {
743 for callback in pending {
744 callback(frame_time_nanos);
745 }
746 });
747 }
748
749 if !self.has_invalid_scopes()
750 && !self.has_updates()
751 && !self.has_frame_callbacks()
752 && !self.has_pending_ui()
753 {
754 *self.needs_frame.borrow_mut() = false;
755 }
756 }
757
758 fn debug_stats(&self) -> RuntimeDebugStats {
759 let node_updates = self.node_updates.borrow();
760 let invalid_scopes = self.invalid_scopes.borrow();
761 let scope_queue = self.scope_queue.borrow();
762 let frame_callbacks = self.frame_callbacks.borrow();
763 let local_tasks = self.local_tasks.borrow();
764 let ui_conts = self.ui_conts.borrow();
765 let tasks = self.tasks.borrow();
766 let external_state_owners = self.external_state_owners.borrow();
767
768 RuntimeDebugStats {
769 node_updates_len: node_updates.len(),
770 node_updates_cap: node_updates.capacity(),
771 invalid_scopes_len: invalid_scopes.len(),
772 invalid_scopes_cap: invalid_scopes.capacity(),
773 scope_queue_len: scope_queue.len(),
774 scope_queue_cap: scope_queue.capacity(),
775 frame_callbacks_len: frame_callbacks.len(),
776 frame_callbacks_cap: frame_callbacks.capacity(),
777 local_tasks_len: local_tasks.len(),
778 local_tasks_cap: local_tasks.capacity(),
779 ui_conts_len: ui_conts.len(),
780 ui_conts_cap: ui_conts.capacity(),
781 tasks_len: tasks.len(),
782 tasks_cap: tasks.capacity(),
783 external_state_owners_len: external_state_owners.len(),
784 external_state_owners_cap: external_state_owners.capacity(),
785 ui_dispatcher_pending: self.ui_dispatcher.pending.load(Ordering::SeqCst),
786 }
787 }
788}
789
790#[derive(Clone)]
791pub struct Runtime {
792 inner: Rc<RuntimeInner>,
793}
794
795impl Runtime {
796 pub fn new(scheduler: Arc<dyn RuntimeScheduler>) -> Self {
797 let inner = Rc::new(RuntimeInner::new(scheduler));
798 RuntimeInner::init_task_waker(&inner);
799 let runtime = Self { inner };
800 let handle = runtime.handle();
801 register_runtime_handle(&handle);
802 LAST_RUNTIME.with(|slot| *slot.borrow_mut() = Some(handle));
803 runtime
804 }
805
806 pub fn handle(&self) -> RuntimeHandle {
807 RuntimeHandle {
808 inner: Rc::downgrade(&self.inner),
809 dispatcher: UiDispatcher::new(self.inner.ui_dispatcher.clone()),
810 ui_thread_id: self.inner.ui_thread_id,
811 id: self.inner.runtime_id,
812 }
813 }
814
815 pub fn has_updates(&self) -> bool {
816 self.inner.has_updates()
817 }
818
819 pub fn needs_frame(&self) -> bool {
820 *self.inner.needs_frame.borrow() || self.inner.ui_dispatcher.has_pending()
821 }
822
823 pub fn set_needs_frame(&self, value: bool) {
824 *self.inner.needs_frame.borrow_mut() = value;
825 }
826
827 #[cfg(any(feature = "internal", test))]
828 pub fn frame_clock(&self) -> FrameClock {
829 FrameClock::new(self.handle())
830 }
831}
832
833impl Drop for Runtime {
834 fn drop(&mut self) {
835 if Rc::strong_count(&self.inner) != 1 {
836 return;
837 }
838 unregister_runtime_handle(self.inner.runtime_id);
839 LAST_RUNTIME.with(|slot| {
840 let should_clear = slot
841 .borrow()
842 .as_ref()
843 .is_some_and(|handle| handle.id() == self.inner.runtime_id);
844 if should_clear {
845 *slot.borrow_mut() = None;
846 }
847 });
848 }
849}
850
851#[derive(Default)]
852pub struct DefaultScheduler;
853
854impl RuntimeScheduler for DefaultScheduler {
855 fn schedule_frame(&self) {}
856}
857
858#[cfg(test)]
859#[derive(Default)]
860pub struct TestScheduler;
861
862#[cfg(test)]
863impl RuntimeScheduler for TestScheduler {
864 fn schedule_frame(&self) {}
865}
866
867#[cfg(test)]
868pub struct TestRuntime {
869 runtime: Runtime,
870}
871
872#[cfg(test)]
873impl Default for TestRuntime {
874 fn default() -> Self {
875 Self::new()
876 }
877}
878
879#[cfg(test)]
880impl TestRuntime {
881 pub fn new() -> Self {
882 Self {
883 runtime: Runtime::new(Arc::new(TestScheduler)),
884 }
885 }
886
887 pub fn handle(&self) -> RuntimeHandle {
888 self.runtime.handle()
889 }
890}
891
892#[derive(Clone)]
893pub struct RuntimeHandle {
894 inner: Weak<RuntimeInner>,
895 dispatcher: UiDispatcher,
896 ui_thread_id: ThreadId,
897 id: RuntimeId,
898}
899
900pub struct TaskHandle {
901 id: u64,
902 runtime: RuntimeHandle,
903}
904
905struct DeferredStateRelease {
906 runtime: RuntimeHandle,
907 id: StateId,
908}
909
910pub(crate) struct StateHandleLease {
911 id: StateId,
912 runtime: RuntimeHandle,
913}
914
915impl StateHandleLease {
916 pub(crate) fn id(&self) -> StateId {
917 self.id
918 }
919
920 pub(crate) fn runtime(&self) -> RuntimeHandle {
921 self.runtime.clone()
922 }
923}
924
925impl Drop for StateHandleLease {
926 fn drop(&mut self) {
927 defer_state_release(self.runtime.clone(), self.id);
928 }
929}
930
931impl RuntimeHandle {
932 pub fn id(&self) -> RuntimeId {
933 self.id
934 }
935
936 pub(crate) fn alloc_state<T: Clone + 'static>(&self, value: T) -> Rc<StateHandleLease> {
937 let id = self.with_state_arena(|arena| arena.alloc(value, self.clone()));
938 let lease = Rc::new(StateHandleLease {
939 id,
940 runtime: self.clone(),
941 });
942 self.with_state_arena(|arena| arena.register_lease(id, &lease));
943 lease
944 }
945
946 pub(crate) fn alloc_state_with_policy<T: Clone + 'static>(
947 &self,
948 value: T,
949 policy: Arc<dyn MutationPolicy<T>>,
950 ) -> Rc<StateHandleLease> {
951 let id =
952 self.with_state_arena(|arena| arena.alloc_with_policy(value, self.clone(), policy));
953 let lease = Rc::new(StateHandleLease {
954 id,
955 runtime: self.clone(),
956 });
957 self.with_state_arena(|arena| arena.register_lease(id, &lease));
958 lease
959 }
960
961 pub(crate) fn alloc_persistent_state<T: Clone + 'static>(
962 &self,
963 value: T,
964 ) -> crate::MutableState<T> {
965 let lease = self.alloc_state(value);
966 if let Some(inner) = self.inner.upgrade() {
967 inner
968 .external_state_owners
969 .borrow_mut()
970 .insert(lease.id(), Rc::clone(&lease));
971 }
972 crate::MutableState::from_lease(&lease)
973 }
974
975 pub(crate) fn retain_state_lease(&self, id: StateId) -> Option<Rc<StateHandleLease>> {
976 self.with_state_arena(|arena| arena.retain_lease(id))
977 }
978
979 pub(crate) fn with_state_arena<R>(&self, f: impl FnOnce(&StateArena) -> R) -> R {
980 self.try_with_state_arena(f)
981 .unwrap_or_else(|| panic!("runtime dropped"))
982 }
983
984 pub(crate) fn try_with_state_arena<R>(&self, f: impl FnOnce(&StateArena) -> R) -> Option<R> {
985 self.inner.upgrade().map(|inner| f(&inner.state_arena))
986 }
987
988 fn release_state_immediate(&self, id: StateId) {
989 if let Some(inner) = self.inner.upgrade() {
990 inner.state_arena.release(id);
991 }
992 }
993
994 pub fn state_arena_stats(&self) -> (usize, usize) {
995 self.try_with_state_arena(StateArena::stats)
996 .unwrap_or_default()
997 }
998
999 pub fn state_arena_debug_stats(&self) -> StateArenaDebugStats {
1000 self.try_with_state_arena(StateArena::debug_stats)
1001 .unwrap_or_default()
1002 }
1003
1004 pub fn debug_stats(&self) -> RuntimeDebugStats {
1005 self.inner
1006 .upgrade()
1007 .map(|inner| inner.debug_stats())
1008 .unwrap_or_default()
1009 }
1010
1011 pub(crate) fn unregister_state_scope(&self, id: StateId, scope_id: ScopeId) {
1012 if let Some(inner) = self.inner.upgrade() {
1013 inner.state_arena.unregister_scope(id, scope_id);
1014 }
1015 }
1016
1017 pub fn schedule(&self) {
1018 if let Some(inner) = self.inner.upgrade() {
1019 inner.schedule();
1020 }
1021 }
1022
1023 pub(crate) fn enqueue_node_update(&self, command: Command) {
1024 if let Some(inner) = self.inner.upgrade() {
1025 inner.enqueue_update(command);
1026 }
1027 }
1028
1029 pub fn enqueue_ui_task(&self, task: Box<dyn FnOnce() + 'static>) {
1036 if let Some(inner) = self.inner.upgrade() {
1037 inner.enqueue_ui_task(task);
1038 } else {
1039 task();
1040 }
1041 }
1042
1043 pub fn spawn_ui<F>(&self, fut: F) -> Option<TaskHandle>
1044 where
1045 F: Future<Output = ()> + 'static,
1046 {
1047 self.inner.upgrade().map(|inner| {
1048 let id = inner.spawn_ui_task(Box::pin(fut));
1049 TaskHandle {
1050 id,
1051 runtime: self.clone(),
1052 }
1053 })
1054 }
1055
1056 pub fn cancel_task(&self, id: u64) {
1057 if let Some(inner) = self.inner.upgrade() {
1058 inner.cancel_task(id);
1059 }
1060 }
1061
1062 pub fn post_ui(&self, task: impl FnOnce() + Send + 'static) {
1067 self.dispatcher.post(task);
1068 }
1069
1070 pub fn register_ui_cont<T: 'static>(&self, f: impl FnOnce(T) + 'static) -> Option<u64> {
1071 self.inner.upgrade().map(|inner| inner.register_ui_cont(f))
1072 }
1073
1074 pub fn cancel_ui_cont(&self, id: u64) {
1075 if let Some(inner) = self.inner.upgrade() {
1076 inner.cancel_ui_cont(id);
1077 }
1078 }
1079
1080 pub fn drain_ui(&self) {
1081 if let Some(inner) = self.inner.upgrade() {
1082 inner.drain_ui();
1083 }
1084 }
1085
1086 pub fn has_pending_ui(&self) -> bool {
1087 self.inner
1088 .upgrade()
1089 .map(|inner| inner.has_pending_ui())
1090 .unwrap_or_else(|| self.dispatcher.has_pending())
1091 }
1092
1093 pub fn register_frame_callback(
1094 &self,
1095 callback: impl FnOnce(u64) + 'static,
1096 ) -> Option<FrameCallbackId> {
1097 self.inner
1098 .upgrade()
1099 .map(|inner| inner.register_frame_callback(Box::new(callback)))
1100 }
1101
1102 pub fn cancel_frame_callback(&self, id: FrameCallbackId) {
1103 if let Some(inner) = self.inner.upgrade() {
1104 inner.cancel_frame_callback(id);
1105 }
1106 }
1107
1108 pub fn drain_frame_callbacks(&self, frame_time_nanos: u64) {
1109 if let Some(inner) = self.inner.upgrade() {
1110 inner.drain_frame_callbacks(frame_time_nanos);
1111 }
1112 }
1113
1114 #[cfg(any(feature = "internal", test))]
1115 pub fn frame_clock(&self) -> FrameClock {
1116 FrameClock::new(self.clone())
1117 }
1118
1119 pub fn set_needs_frame(&self, value: bool) {
1120 if let Some(inner) = self.inner.upgrade() {
1121 *inner.needs_frame.borrow_mut() = value;
1122 }
1123 }
1124
1125 pub(crate) fn take_updates(&self) -> Vec<Command> {
1126 self.inner
1127 .upgrade()
1128 .map(|inner| inner.take_updates())
1129 .unwrap_or_default()
1130 }
1131
1132 pub fn has_updates(&self) -> bool {
1133 self.inner
1134 .upgrade()
1135 .map(|inner| inner.has_updates())
1136 .unwrap_or(false)
1137 }
1138
1139 pub(crate) fn mark_scope_recomposed(&self, id: ScopeId) {
1140 if let Some(inner) = self.inner.upgrade() {
1141 inner.mark_scope_recomposed(id);
1142 }
1143 }
1144
1145 pub(crate) fn register_invalid_scope(&self, id: ScopeId, scope: Weak<RecomposeScopeInner>) {
1146 if let Some(inner) = self.inner.upgrade() {
1147 inner.register_invalid_scope(id, scope);
1148 }
1149 }
1150
1151 pub(crate) fn requeue_invalid_scope(&self, id: ScopeId, scope: Weak<RecomposeScopeInner>) {
1152 if let Some(inner) = self.inner.upgrade() {
1153 inner.requeue_invalid_scope(id, scope);
1154 }
1155 }
1156
1157 pub(crate) fn take_invalidated_scopes(&self) -> Vec<(ScopeId, Weak<RecomposeScopeInner>)> {
1158 self.inner
1159 .upgrade()
1160 .map(|inner| inner.take_invalidated_scopes())
1161 .unwrap_or_default()
1162 }
1163
1164 pub fn has_invalid_scopes(&self) -> bool {
1165 self.inner
1166 .upgrade()
1167 .map(|inner| inner.has_invalid_scopes())
1168 .unwrap_or(false)
1169 }
1170
1171 pub(crate) fn increment_live_recompose_scope_count(&self) {
1172 if let Some(inner) = self.inner.upgrade() {
1173 inner.increment_live_recompose_scope_count();
1174 }
1175 }
1176
1177 pub(crate) fn decrement_live_recompose_scope_count(&self) {
1178 if let Some(inner) = self.inner.upgrade() {
1179 inner.decrement_live_recompose_scope_count();
1180 }
1181 }
1182
1183 fn live_recompose_scope_count(&self) -> usize {
1184 self.inner
1185 .upgrade()
1186 .map(|inner| inner.live_recompose_scope_count())
1187 .unwrap_or_default()
1188 }
1189
1190 #[doc(hidden)]
1191 pub fn debug_invalid_scope_ids(&self) -> Vec<usize> {
1192 self.inner
1193 .upgrade()
1194 .map(|inner| inner.invalid_scopes.borrow().iter().copied().collect())
1195 .unwrap_or_default()
1196 }
1197
1198 pub fn has_frame_callbacks(&self) -> bool {
1199 self.inner
1200 .upgrade()
1201 .map(|inner| inner.has_frame_callbacks())
1202 .unwrap_or(false)
1203 }
1204
1205 pub fn assert_ui_thread(&self) {
1206 debug_assert_eq!(
1207 std::thread::current().id(),
1208 self.ui_thread_id,
1209 "state mutated off the runtime's UI thread"
1210 );
1211 }
1212
1213 pub fn dispatcher(&self) -> UiDispatcher {
1214 self.dispatcher.clone()
1215 }
1216
1217 #[doc(hidden)]
1218 pub fn with_deferred_state_releases<R>(&self, f: impl FnOnce() -> R) -> R {
1219 let _scope = enter_state_teardown_scope();
1220 f()
1221 }
1222}
1223
1224impl TaskHandle {
1225 pub fn cancel(self) {
1226 self.runtime.cancel_task(self.id);
1227 }
1228}
1229
1230pub(crate) struct FrameCallbackEntry {
1231 id: FrameCallbackId,
1232 callback: Option<Box<dyn FnOnce(u64) + 'static>>,
1233}
1234
1235#[cfg(not(target_arch = "wasm32"))]
1236struct RuntimeTaskWaker {
1237 scheduler: Arc<dyn RuntimeScheduler>,
1238}
1239
1240#[cfg(target_arch = "wasm32")]
1241struct RuntimeTaskWaker {
1242 runtime_id: RuntimeId,
1243}
1244
1245impl RuntimeTaskWaker {
1246 #[cfg(not(target_arch = "wasm32"))]
1247 fn new(inner: &RuntimeInner) -> Self {
1248 let scheduler = inner.scheduler.clone();
1249 Self { scheduler }
1250 }
1251
1252 #[cfg(target_arch = "wasm32")]
1253 fn new(inner: &RuntimeInner) -> Self {
1254 let runtime_id = inner.runtime_id;
1255 Self { runtime_id }
1256 }
1257
1258 fn into_waker(self) -> Waker {
1259 futures_task::waker(Arc::new(self))
1260 }
1261}
1262
1263impl futures_task::ArcWake for RuntimeTaskWaker {
1264 #[cfg(not(target_arch = "wasm32"))]
1265 fn wake_by_ref(arc_self: &Arc<Self>) {
1266 arc_self.scheduler.schedule_frame();
1267 }
1268
1269 #[cfg(target_arch = "wasm32")]
1270 fn wake_by_ref(arc_self: &Arc<Self>) {
1271 REGISTERED_RUNTIMES.with(|registry| {
1272 if let Some(handle) = registry.borrow().get(&arc_self.runtime_id).cloned() {
1273 handle.schedule();
1274 }
1275 });
1276 }
1277}
1278
1279thread_local! {
1280 static NEXT_RUNTIME_ID: Cell<u32> = const { Cell::new(1) };
1281 static ACTIVE_RUNTIMES: RefCell<Vec<RuntimeHandle>> = const { RefCell::new(Vec::new()) };
1282 static LAST_RUNTIME: RefCell<Option<RuntimeHandle>> = const { RefCell::new(None) };
1283 static REGISTERED_RUNTIMES: RefCell<HashMap<RuntimeId, RuntimeHandle>> = RefCell::new(HashMap::default());
1284 static STATE_TEARDOWN_DEPTH: Cell<usize> = const { Cell::new(0) };
1285 static DEFERRED_STATE_RELEASES: RefCell<Vec<DeferredStateRelease>> = const { RefCell::new(Vec::new()) };
1286}
1287
1288#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
1289pub struct RuntimeThreadLocalDebugStats {
1290 pub active_runtimes_len: usize,
1291 pub active_runtimes_cap: usize,
1292 pub registered_runtimes_len: usize,
1293 pub registered_runtimes_cap: usize,
1294 pub deferred_state_releases_len: usize,
1295 pub deferred_state_releases_cap: usize,
1296}
1297
1298pub fn current_runtime_handle() -> Option<RuntimeHandle> {
1303 if let Some(handle) = ACTIVE_RUNTIMES.with(|stack| stack.borrow().last().cloned()) {
1304 return Some(handle);
1305 }
1306 LAST_RUNTIME.with(|slot| slot.borrow().clone())
1307}
1308
1309pub(crate) fn runtime_handle_by_id(id: RuntimeId) -> Option<RuntimeHandle> {
1310 REGISTERED_RUNTIMES.with(|registry| registry.borrow().get(&id).cloned())
1311}
1312
1313pub(crate) fn live_recompose_scope_count() -> usize {
1314 REGISTERED_RUNTIMES.with(|registry| {
1315 registry
1316 .borrow()
1317 .values()
1318 .map(RuntimeHandle::live_recompose_scope_count)
1319 .sum()
1320 })
1321}
1322
1323pub fn debug_runtime_thread_local_stats() -> RuntimeThreadLocalDebugStats {
1324 let (active_runtimes_len, active_runtimes_cap) = ACTIVE_RUNTIMES.with(|stack| {
1325 let stack = stack.borrow();
1326 (stack.len(), stack.capacity())
1327 });
1328 let (registered_runtimes_len, registered_runtimes_cap) = REGISTERED_RUNTIMES.with(|registry| {
1329 let registry = registry.borrow();
1330 (registry.len(), registry.capacity())
1331 });
1332 let (deferred_state_releases_len, deferred_state_releases_cap) =
1333 DEFERRED_STATE_RELEASES.with(|releases| {
1334 let releases = releases.borrow();
1335 (releases.len(), releases.capacity())
1336 });
1337
1338 RuntimeThreadLocalDebugStats {
1339 active_runtimes_len,
1340 active_runtimes_cap,
1341 registered_runtimes_len,
1342 registered_runtimes_cap,
1343 deferred_state_releases_len,
1344 deferred_state_releases_cap,
1345 }
1346}
1347
1348fn register_runtime_handle(handle: &RuntimeHandle) {
1349 REGISTERED_RUNTIMES.with(|registry| {
1350 registry.borrow_mut().insert(handle.id(), handle.clone());
1351 });
1352}
1353
1354fn unregister_runtime_handle(id: RuntimeId) {
1355 REGISTERED_RUNTIMES.with(|registry| {
1356 registry.borrow_mut().remove(&id);
1357 });
1358}
1359
1360fn defer_state_release(runtime: RuntimeHandle, id: StateId) {
1361 let teardown_active = STATE_TEARDOWN_DEPTH.with(|depth| depth.get() > 0);
1362 if teardown_active {
1363 DEFERRED_STATE_RELEASES.with(|releases| {
1364 releases
1365 .borrow_mut()
1366 .push(DeferredStateRelease { runtime, id });
1367 });
1368 } else {
1369 runtime.release_state_immediate(id);
1370 }
1371}
1372
1373fn flush_deferred_state_releases() {
1374 DEFERRED_STATE_RELEASES.with(|releases| {
1375 let mut releases = releases.borrow_mut();
1376 while let Some(deferred) = releases.pop() {
1377 deferred.runtime.release_state_immediate(deferred.id);
1378 }
1379 });
1380}
1381
1382pub(crate) struct StateTeardownScope;
1383
1384pub(crate) fn enter_state_teardown_scope() -> StateTeardownScope {
1385 STATE_TEARDOWN_DEPTH.with(|depth| depth.set(depth.get() + 1));
1386 StateTeardownScope
1387}
1388
1389impl Drop for StateTeardownScope {
1390 fn drop(&mut self) {
1391 STATE_TEARDOWN_DEPTH.with(|depth| {
1392 let next = depth.get().saturating_sub(1);
1393 depth.set(next);
1394 if next == 0 {
1395 flush_deferred_state_releases();
1396 }
1397 });
1398 }
1399}
1400
1401pub(crate) fn push_active_runtime(handle: &RuntimeHandle) {
1402 register_runtime_handle(handle);
1403 ACTIVE_RUNTIMES.with(|stack| stack.borrow_mut().push(handle.clone()));
1404 LAST_RUNTIME.with(|slot| *slot.borrow_mut() = Some(handle.clone()));
1405}
1406
1407pub(crate) fn pop_active_runtime() {
1408 ACTIVE_RUNTIMES.with(|stack| {
1409 stack.borrow_mut().pop();
1410 });
1411}
1412
1413pub fn schedule_frame() {
1415 if let Some(handle) = current_runtime_handle() {
1416 handle.schedule();
1417 return;
1418 }
1419 log::debug!(
1420 target: "cranpose::runtime",
1421 "ignoring frame request without an active runtime",
1422 );
1423}
1424
1425pub fn schedule_node_update(
1427 update: impl FnOnce(&mut dyn Applier) -> Result<(), NodeError> + 'static,
1428) {
1429 if let Some(handle) = current_runtime_handle() {
1430 handle.enqueue_node_update(Command::callback(update));
1431 } else {
1432 drop(update);
1433 log::debug!(
1434 target: "cranpose::runtime",
1435 "ignoring node update request without an active runtime",
1436 );
1437 }
1438}
1439
1440#[cfg(test)]
1441mod tests {
1442 use super::*;
1443
1444 #[test]
1445 fn state_arena_alloc_skips_free_slot_outside_cell_storage() {
1446 let runtime = TestRuntime::new();
1447 let arena = StateArena::default();
1448 let first = arena.alloc(1_i32, runtime.handle());
1449 arena.inner.borrow_mut().free.push(u32::MAX);
1450
1451 let second = arena.alloc(2_i32, runtime.handle());
1452
1453 assert_eq!(first.slot(), 0);
1454 assert_eq!(second.slot(), 1);
1455 assert!(arena.get_typed_opt::<i32>(first).is_some());
1456 assert!(arena.get_typed_opt::<i32>(second).is_some());
1457 }
1458
1459 #[test]
1460 fn state_arena_alloc_skips_occupied_free_slot() {
1461 let runtime = TestRuntime::new();
1462 let arena = StateArena::default();
1463 let first = arena.alloc(1_i32, runtime.handle());
1464 arena.inner.borrow_mut().free.push(first.slot());
1465
1466 let second = arena.alloc(2_i32, runtime.handle());
1467
1468 assert_ne!(first.slot(), second.slot());
1469 assert_eq!(second.slot(), 1);
1470 assert!(arena.get_typed_opt::<i32>(first).is_some());
1471 assert!(arena.get_typed_opt::<i32>(second).is_some());
1472 }
1473
1474 #[test]
1475 fn state_arena_register_lease_ignores_stale_id() {
1476 let runtime = TestRuntime::new();
1477 let arena = StateArena::default();
1478 let stale_id = StateId::new(99, 0);
1479 let lease = Rc::new(StateHandleLease {
1480 id: stale_id,
1481 runtime: runtime.handle(),
1482 });
1483
1484 arena.register_lease(stale_id, &lease);
1485
1486 assert!(arena.retain_lease(stale_id).is_none());
1487 }
1488
1489 #[test]
1490 fn ui_continuation_type_mismatch_is_ignored_until_matching_payload() {
1491 let runtime = TestRuntime::new();
1492 let handle = runtime.handle();
1493 let received = Rc::new(Cell::new(None));
1494 let received_for_continuation = Rc::clone(&received);
1495 let cont_id = handle
1496 .register_ui_cont(move |value: u32| {
1497 received_for_continuation.set(Some(value));
1498 })
1499 .expect("test runtime is alive");
1500
1501 handle.dispatcher().post_invoke(cont_id, "wrong payload");
1502 handle.drain_ui();
1503
1504 assert_eq!(received.get(), None);
1505 assert_eq!(handle.debug_stats().ui_conts_len, 1);
1506
1507 handle.dispatcher().post_invoke(cont_id, 42_u32);
1508 handle.drain_ui();
1509
1510 assert_eq!(received.get(), Some(42));
1511 assert_eq!(handle.debug_stats().ui_conts_len, 0);
1512 }
1513
1514 #[test]
1515 fn ui_dispatcher_failed_send_does_not_leave_pending_work() {
1516 let runtime = Runtime::new(Arc::new(TestScheduler));
1517 let dispatcher = runtime.handle().dispatcher();
1518
1519 drop(runtime);
1520
1521 assert!(!dispatcher.has_pending());
1522 dispatcher.post(|| {});
1523 assert!(!dispatcher.has_pending());
1524 dispatcher.post_invoke(404, 12_u32);
1525 assert!(!dispatcher.has_pending());
1526 }
1527
1528 #[test]
1529 fn pending_guard_does_not_wrap_on_underflow() {
1530 let counter = AtomicUsize::new(0);
1531
1532 {
1533 let _guard = PendingGuard::new(&counter);
1534 }
1535
1536 assert_eq!(counter.load(Ordering::SeqCst), 0);
1537 }
1538
1539 #[test]
1540 fn pending_guard_decrements_pending_count() {
1541 let counter = AtomicUsize::new(2);
1542
1543 {
1544 let _guard = PendingGuard::new(&counter);
1545 }
1546
1547 assert_eq!(counter.load(Ordering::SeqCst), 1);
1548 }
1549
1550 #[test]
1551 fn schedule_frame_without_runtime_is_ignored() {
1552 let ok = std::thread::spawn(|| std::panic::catch_unwind(super::schedule_frame).is_ok())
1553 .join()
1554 .expect("test thread should join");
1555
1556 assert!(ok);
1557 }
1558
1559 #[test]
1560 fn schedule_node_update_without_runtime_is_ignored() {
1561 let ok = std::thread::spawn(|| {
1562 std::panic::catch_unwind(|| {
1563 super::schedule_node_update(|_| Ok(()));
1564 })
1565 .is_ok()
1566 })
1567 .join()
1568 .expect("test thread should join");
1569
1570 assert!(ok);
1571 }
1572}