Skip to main content

cranpose_core/
runtime.rs

1use crate::collections::map::HashMap;
2use crate::collections::map::HashSet;
3use crate::state::{MutationPolicy, NeverEqual};
4use crate::MutableStateInner;
5use std::any::Any;
6use std::cell::{Cell, RefCell};
7use std::collections::VecDeque;
8use std::future::Future;
9use std::pin::Pin;
10use std::rc::{Rc, Weak};
11use std::sync::atomic::{AtomicU32, AtomicUsize, Ordering};
12use std::sync::{mpsc, Arc};
13use std::task::{Context, Poll, Waker};
14use std::thread::ThreadId;
15use std::thread_local;
16
17#[cfg(any(feature = "internal", test))]
18use crate::frame_clock::FrameClock;
19use crate::platform::RuntimeScheduler;
20use crate::{Applier, Command, FrameCallbackId, NodeError, RecomposeScopeInner, ScopeId};
21
/// Message sent to the runtime through the UI dispatcher channel.
enum UiMessage {
    /// A one-shot closure to execute; `Send` because it travels through the channel.
    Task(Box<dyn FnOnce() + Send + 'static>),
    /// A value addressed to the UI continuation registered under `id`.
    Invoke { id: u64, value: Box<dyn Any + Send> },
}
26
27type UiContinuation = Box<dyn Fn(Box<dyn Any>) + 'static>;
28type UiContinuationMap = HashMap<u64, UiContinuation>;
29
30struct TypedStateCell<T: Clone + 'static> {
31    inner: MutableStateInner<T>,
32}
33
34trait ScopeWatchCell {
35    fn unregister_scope(&self, scope_id: ScopeId);
36}
37
38impl<T: Clone + 'static> ScopeWatchCell for TypedStateCell<T> {
39    fn unregister_scope(&self, scope_id: ScopeId) {
40        self.inner.unregister_scope(scope_id);
41    }
42}
43
44struct StateArenaSlot {
45    generation: u32,
46    cell: Option<Rc<dyn Any>>,
47    watcher_cell: Option<Rc<dyn ScopeWatchCell>>,
48    lease: Option<Weak<StateHandleLease>>,
49}
50
51#[derive(Default)]
52struct StateArenaInner {
53    cells: Vec<StateArenaSlot>,
54    free: Vec<u32>,
55}
56
/// Length and capacity snapshot of the state arena's internal vectors.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct StateArenaDebugStats {
    /// Number of slots in the arena.
    pub cells_len: usize,
    /// Allocated capacity of the slot vector.
    pub cells_cap: usize,
    /// Number of entries in the free list.
    pub free_len: usize,
    /// Allocated capacity of the free list.
    pub free_cap: usize,
}
64
/// Length/capacity snapshot of the runtime's internal queues and maps.
///
/// Each `*_len` / `*_cap` pair mirrors the identically named runtime
/// collection; `ui_dispatcher_pending` is the dispatcher's pending counter.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct RuntimeDebugStats {
    pub node_updates_len: usize,
    pub node_updates_cap: usize,
    pub invalid_scopes_len: usize,
    pub invalid_scopes_cap: usize,
    pub scope_queue_len: usize,
    pub scope_queue_cap: usize,
    pub frame_callbacks_len: usize,
    pub frame_callbacks_cap: usize,
    pub local_tasks_len: usize,
    pub local_tasks_cap: usize,
    pub ui_conts_len: usize,
    pub ui_conts_cap: usize,
    pub tasks_len: usize,
    pub tasks_cap: usize,
    pub external_state_owners_len: usize,
    pub external_state_owners_cap: usize,
    pub ui_dispatcher_pending: usize,
}
85
86#[derive(Default)]
87pub(crate) struct StateArena {
88    inner: RefCell<StateArenaInner>,
89}
90
91impl StateArena {
92    pub(crate) fn alloc<T: Clone + 'static>(&self, value: T, runtime: RuntimeHandle) -> StateId {
93        self.alloc_with_policy(value, runtime, Arc::new(NeverEqual))
94    }
95
96    pub(crate) fn alloc_with_policy<T: Clone + 'static>(
97        &self,
98        value: T,
99        runtime: RuntimeHandle,
100        policy: Arc<dyn MutationPolicy<T>>,
101    ) -> StateId {
102        let (slot, generation) = {
103            let mut inner = self.inner.borrow_mut();
104            match inner.free.pop() {
105                Some(slot) => {
106                    let entry = inner
107                        .cells
108                        .get_mut(slot as usize)
109                        .expect("state slot missing");
110                    debug_assert!(entry.cell.is_none(), "reused state slot must be empty");
111                    entry.generation = entry.generation.wrapping_add(1);
112                    (slot, entry.generation)
113                }
114                None => {
115                    let slot = inner.cells.len() as u32;
116                    inner.cells.push(StateArenaSlot {
117                        generation: 0,
118                        cell: None,
119                        watcher_cell: None,
120                        lease: None,
121                    });
122                    (slot, 0)
123                }
124            }
125        };
126        let id = StateId::new(slot, generation);
127        let inner = MutableStateInner::new_with_policy(value, runtime.clone(), policy);
128        inner.install_snapshot_observer(id);
129        let typed_cell = Rc::new(TypedStateCell { inner });
130        let cell: Rc<dyn Any> = typed_cell.clone();
131        let watcher_cell: Rc<dyn ScopeWatchCell> = typed_cell;
132        let mut arena = self.inner.borrow_mut();
133        let slot_entry = &mut arena.cells[slot as usize];
134        slot_entry.cell = Some(cell);
135        slot_entry.watcher_cell = Some(watcher_cell);
136        id
137    }
138
139    fn get_cell_opt(&self, id: StateId) -> Option<Rc<dyn Any>> {
140        self.inner
141            .borrow()
142            .cells
143            .get(id.slot_index())
144            .filter(|cell| cell.generation == id.generation())
145            .and_then(|cell| cell.cell.as_ref())
146            .cloned()
147    }
148
149    fn get_typed<T: Clone + 'static>(&self, id: StateId) -> Rc<TypedStateCell<T>> {
150        match self.get_cell_opt(id) {
151            None => panic!(
152                "state cell missing: slot={}, gen={}, expected={}",
153                id.slot(),
154                id.generation(),
155                std::any::type_name::<T>(),
156            ),
157            Some(cell) => Rc::downcast::<TypedStateCell<T>>(cell).unwrap_or_else(|_| {
158                panic!(
159                    "state cell type mismatch: slot={}, gen={}, expected={}",
160                    id.slot(),
161                    id.generation(),
162                    std::any::type_name::<T>(),
163                )
164            }),
165        }
166    }
167
168    fn get_typed_opt<T: Clone + 'static>(&self, id: StateId) -> Option<Rc<TypedStateCell<T>>> {
169        Rc::downcast::<TypedStateCell<T>>(self.get_cell_opt(id)?).ok()
170    }
171
172    pub(crate) fn with_typed<T: Clone + 'static, R>(
173        &self,
174        id: StateId,
175        f: impl FnOnce(&MutableStateInner<T>) -> R,
176    ) -> R {
177        let cell = self.get_typed::<T>(id);
178        f(&cell.inner)
179    }
180
181    pub(crate) fn with_typed_opt<T: Clone + 'static, R>(
182        &self,
183        id: StateId,
184        f: impl FnOnce(&MutableStateInner<T>) -> R,
185    ) -> Option<R> {
186        let cell = self.get_typed_opt::<T>(id)?;
187        Some(f(&cell.inner))
188    }
189
190    pub(crate) fn release(&self, id: StateId) {
191        let cell = {
192            let mut inner = self.inner.borrow_mut();
193            let Some(slot) = inner.cells.get_mut(id.slot_index()) else {
194                return;
195            };
196            if slot.generation != id.generation() {
197                return;
198            }
199            slot.lease = None;
200            slot.watcher_cell = None;
201            let cell = slot.cell.take();
202            if cell.is_some() {
203                inner.free.push(id.slot());
204            }
205            cell
206        };
207        drop(cell);
208    }
209
210    pub(crate) fn stats(&self) -> (usize, usize) {
211        let inner = self.inner.borrow();
212        (inner.cells.len(), inner.free.len())
213    }
214
215    pub(crate) fn debug_stats(&self) -> StateArenaDebugStats {
216        let inner = self.inner.borrow();
217        StateArenaDebugStats {
218            cells_len: inner.cells.len(),
219            cells_cap: inner.cells.capacity(),
220            free_len: inner.free.len(),
221            free_cap: inner.free.capacity(),
222        }
223    }
224
225    pub(crate) fn unregister_scope(&self, id: StateId, scope_id: ScopeId) {
226        let watcher_cell = {
227            let inner = self.inner.borrow();
228            inner
229                .cells
230                .get(id.slot_index())
231                .filter(|slot| slot.generation == id.generation())
232                .and_then(|slot| slot.watcher_cell.as_ref())
233                .cloned()
234        };
235        if let Some(watcher_cell) = watcher_cell {
236            watcher_cell.unregister_scope(scope_id);
237        }
238    }
239
240    pub(crate) fn register_lease(&self, id: StateId, lease: &Rc<StateHandleLease>) {
241        let mut inner = self.inner.borrow_mut();
242        let Some(slot) = inner.cells.get_mut(id.slot_index()) else {
243            panic!("state slot missing");
244        };
245        assert_eq!(
246            slot.generation,
247            id.generation(),
248            "state generation mismatch"
249        );
250        slot.lease = Some(Rc::downgrade(lease));
251    }
252
253    pub(crate) fn retain_lease(&self, id: StateId) -> Option<Rc<StateHandleLease>> {
254        let inner = self.inner.borrow();
255        let slot = inner.cells.get(id.slot_index())?;
256        if slot.generation != id.generation() {
257            return None;
258        }
259        slot.lease.as_ref()?.upgrade()
260    }
261}
262
/// Generational identifier of a state arena slot.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub struct StateId {
    /// Index of the slot inside the arena.
    slot: u32,
    /// Generation the slot had when this id was issued.
    generation: u32,
}

impl StateId {
    /// Builds an id from its raw parts.
    const fn new(slot: u32, generation: u32) -> Self {
        StateId { slot, generation }
    }

    /// Raw slot number.
    pub(crate) const fn slot(self) -> u32 {
        self.slot
    }

    /// Slot number widened to `usize` for indexing.
    pub(crate) const fn slot_index(self) -> usize {
        self.slot() as usize
    }

    /// Generation recorded in this id.
    pub(crate) const fn generation(self) -> u32 {
        self.generation
    }
}
286
/// Process-wide identifier of a runtime instance.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub struct RuntimeId(u32);

impl RuntimeId {
    /// Hands out the next id from a global counter that starts at 1.
    fn next() -> Self {
        static COUNTER: AtomicU32 = AtomicU32::new(1);
        let raw = COUNTER.fetch_add(1, Ordering::Relaxed);
        RuntimeId(raw)
    }
}
296
297struct UiDispatcherInner {
298    scheduler: Arc<dyn RuntimeScheduler>,
299    tx: mpsc::Sender<UiMessage>,
300    pending: AtomicUsize,
301}
302
303impl UiDispatcherInner {
304    fn new(scheduler: Arc<dyn RuntimeScheduler>, tx: mpsc::Sender<UiMessage>) -> Self {
305        Self {
306            scheduler,
307            tx,
308            pending: AtomicUsize::new(0),
309        }
310    }
311
312    fn post(&self, task: impl FnOnce() + Send + 'static) {
313        self.pending.fetch_add(1, Ordering::SeqCst);
314        let _ = self.tx.send(UiMessage::Task(Box::new(task)));
315        self.scheduler.schedule_frame();
316    }
317
318    fn post_invoke(&self, id: u64, value: Box<dyn Any + Send>) {
319        self.pending.fetch_add(1, Ordering::SeqCst);
320        let _ = self.tx.send(UiMessage::Invoke { id, value });
321        self.scheduler.schedule_frame();
322    }
323
324    fn has_pending(&self) -> bool {
325        self.pending.load(Ordering::SeqCst) > 0
326    }
327}
328
/// Scope guard that decrements a pending counter when it goes out of scope.
struct PendingGuard<'a> {
    counter: &'a AtomicUsize,
}

impl<'a> PendingGuard<'a> {
    /// Wraps `counter`; the counter itself is not modified until drop.
    fn new(counter: &'a AtomicUsize) -> Self {
        PendingGuard { counter }
    }
}

impl<'a> Drop for PendingGuard<'a> {
    /// Subtracts one from the counter; debug builds assert it was non-zero.
    fn drop(&mut self) {
        let before = self.counter.fetch_sub(1, Ordering::SeqCst);
        debug_assert!(before > 0, "UI dispatcher pending count underflowed");
    }
}
345
346#[derive(Clone)]
347pub struct UiDispatcher {
348    inner: Arc<UiDispatcherInner>,
349}
350
351impl UiDispatcher {
352    fn new(inner: Arc<UiDispatcherInner>) -> Self {
353        Self { inner }
354    }
355
356    pub fn post(&self, task: impl FnOnce() + Send + 'static) {
357        self.inner.post(task);
358    }
359
360    pub fn post_invoke<T>(&self, id: u64, value: T)
361    where
362        T: Send + 'static,
363    {
364        self.inner.post_invoke(id, Box::new(value));
365    }
366
367    pub fn has_pending(&self) -> bool {
368        self.inner.has_pending()
369    }
370}
371
372struct RuntimeInner {
373    scheduler: Arc<dyn RuntimeScheduler>,
374    needs_frame: RefCell<bool>,
375    node_updates: RefCell<Vec<Command>>,
376    invalid_scopes: RefCell<HashSet<ScopeId>>,
377    scope_queue: RefCell<Vec<(ScopeId, Weak<RecomposeScopeInner>)>>,
378    frame_callbacks: RefCell<VecDeque<FrameCallbackEntry>>,
379    next_frame_callback_id: Cell<u64>,
380    ui_dispatcher: Arc<UiDispatcherInner>,
381    ui_rx: RefCell<mpsc::Receiver<UiMessage>>,
382    local_tasks: RefCell<VecDeque<Box<dyn FnOnce() + 'static>>>,
383    ui_conts: RefCell<UiContinuationMap>,
384    next_cont_id: Cell<u64>,
385    ui_thread_id: ThreadId,
386    tasks: RefCell<Vec<TaskEntry>>,
387    next_task_id: Cell<u64>,
388    task_waker: RefCell<Option<Waker>>,
389    state_arena: StateArena,
390    external_state_owners: RefCell<HashMap<StateId, Rc<StateHandleLease>>>,
391    runtime_id: RuntimeId,
392}
393
/// A spawned UI future together with the id used to cancel it.
struct TaskEntry {
    /// Id handed out by `spawn_ui_task`.
    id: u64,
    /// The pinned future; it is not required to be `Send`.
    future: Pin<Box<dyn Future<Output = ()> + 'static>>,
}
398
399impl RuntimeInner {
400    fn new(scheduler: Arc<dyn RuntimeScheduler>) -> Self {
401        let (tx, rx) = mpsc::channel();
402        let dispatcher = Arc::new(UiDispatcherInner::new(scheduler.clone(), tx));
403        Self {
404            scheduler,
405            needs_frame: RefCell::new(false),
406            node_updates: RefCell::new(Vec::new()),
407            invalid_scopes: RefCell::new(HashSet::default()),
408            scope_queue: RefCell::new(Vec::new()),
409            frame_callbacks: RefCell::new(VecDeque::new()),
410            next_frame_callback_id: Cell::new(1),
411            ui_dispatcher: dispatcher,
412            ui_rx: RefCell::new(rx),
413            local_tasks: RefCell::new(VecDeque::new()),
414            ui_conts: RefCell::new(UiContinuationMap::default()),
415            next_cont_id: Cell::new(1),
416            ui_thread_id: std::thread::current().id(),
417            tasks: RefCell::new(Vec::new()),
418            next_task_id: Cell::new(1),
419            task_waker: RefCell::new(None),
420            state_arena: StateArena::default(),
421            external_state_owners: RefCell::new(HashMap::default()),
422            runtime_id: RuntimeId::next(),
423        }
424    }
425
426    fn init_task_waker(this: &Rc<Self>) {
427        let weak = Rc::downgrade(this);
428        let waker = RuntimeTaskWaker::new(weak).into_waker();
429        *this.task_waker.borrow_mut() = Some(waker);
430    }
431
432    fn schedule(&self) {
433        *self.needs_frame.borrow_mut() = true;
434        self.scheduler.schedule_frame();
435    }
436
437    fn enqueue_update(&self, command: Command) {
438        self.node_updates.borrow_mut().push(command);
439        self.schedule(); // Ensure frame is scheduled to process the command
440    }
441
442    fn take_updates(&self) -> Vec<Command> {
443        let updates = self.node_updates.borrow_mut().drain(..).collect::<Vec<_>>();
444        updates
445    }
446
447    fn has_updates(&self) -> bool {
448        !self.node_updates.borrow().is_empty() || self.has_invalid_scopes()
449    }
450
451    fn register_invalid_scope(&self, id: ScopeId, scope: Weak<RecomposeScopeInner>) {
452        let mut invalid = self.invalid_scopes.borrow_mut();
453        if invalid.insert(id) {
454            self.scope_queue.borrow_mut().push((id, scope));
455            self.schedule();
456        }
457    }
458
459    fn requeue_invalid_scope(&self, id: ScopeId, scope: Weak<RecomposeScopeInner>) {
460        if self.invalid_scopes.borrow().contains(&id) {
461            self.scope_queue.borrow_mut().push((id, scope));
462            self.schedule();
463        }
464    }
465
466    fn mark_scope_recomposed(&self, id: ScopeId) {
467        self.invalid_scopes.borrow_mut().remove(&id);
468    }
469
470    fn take_invalidated_scopes(&self) -> Vec<(ScopeId, Weak<RecomposeScopeInner>)> {
471        let mut queue = self.scope_queue.borrow_mut();
472        if queue.is_empty() {
473            return Vec::new();
474        }
475        let pending: Vec<_> = queue.drain(..).collect();
476        drop(queue);
477        let invalid = self.invalid_scopes.borrow();
478        pending
479            .into_iter()
480            .filter(|(id, _)| invalid.contains(id))
481            .collect()
482    }
483
484    fn has_invalid_scopes(&self) -> bool {
485        !self.invalid_scopes.borrow().is_empty()
486    }
487
488    fn has_frame_callbacks(&self) -> bool {
489        !self.frame_callbacks.borrow().is_empty()
490    }
491
492    /// Queues a closure that is already bound to the UI thread's local queue.
493    ///
494    /// The closure may capture `Rc`/`RefCell` values because it never leaves the
495    /// runtime thread. Callers must only invoke this from the runtime thread.
496    fn enqueue_ui_task(&self, task: Box<dyn FnOnce() + 'static>) {
497        self.local_tasks.borrow_mut().push_back(task);
498        self.schedule();
499    }
500
501    fn spawn_ui_task(&self, future: Pin<Box<dyn Future<Output = ()> + 'static>>) -> u64 {
502        let id = self.next_task_id.get();
503        self.next_task_id.set(id + 1);
504        self.tasks.borrow_mut().push(TaskEntry { id, future });
505        self.schedule();
506        id
507    }
508
509    fn cancel_task(&self, id: u64) {
510        let mut tasks = self.tasks.borrow_mut();
511        if tasks.iter().any(|entry| entry.id == id) {
512            tasks.retain(|entry| entry.id != id);
513        }
514    }
515
516    fn poll_async_tasks(&self) -> bool {
517        let waker = match self.task_waker.borrow().as_ref() {
518            Some(waker) => waker.clone(),
519            None => return false,
520        };
521        let mut cx = Context::from_waker(&waker);
522        let mut tasks_ref = self.tasks.borrow_mut();
523        let tasks = std::mem::take(&mut *tasks_ref);
524        drop(tasks_ref);
525        let mut pending = Vec::with_capacity(tasks.len());
526        let mut made_progress = false;
527        for mut entry in tasks.into_iter() {
528            match entry.future.as_mut().poll(&mut cx) {
529                Poll::Ready(()) => {
530                    made_progress = true;
531                }
532                Poll::Pending => {
533                    pending.push(entry);
534                }
535            }
536        }
537        if !pending.is_empty() {
538            self.tasks.borrow_mut().extend(pending);
539        }
540        made_progress
541    }
542
543    fn drain_ui(&self) {
544        loop {
545            let mut executed = false;
546
547            {
548                let rx = &mut *self.ui_rx.borrow_mut();
549                for message in rx.try_iter() {
550                    executed = true;
551                    let _guard = PendingGuard::new(&self.ui_dispatcher.pending);
552                    match message {
553                        UiMessage::Task(task) => {
554                            task();
555                        }
556                        UiMessage::Invoke { id, value } => {
557                            self.invoke_ui_cont(id, value);
558                        }
559                    }
560                }
561            }
562
563            loop {
564                let task = {
565                    let mut local = self.local_tasks.borrow_mut();
566                    local.pop_front()
567                };
568
569                match task {
570                    Some(task) => {
571                        executed = true;
572                        task();
573                    }
574                    None => break,
575                }
576            }
577
578            if self.poll_async_tasks() {
579                executed = true;
580            }
581
582            if !executed {
583                break;
584            }
585        }
586    }
587
588    fn has_pending_ui(&self) -> bool {
589        let local_pending = self
590            .local_tasks
591            .try_borrow()
592            .map(|tasks| !tasks.is_empty())
593            .unwrap_or(true);
594
595        let async_pending = self
596            .tasks
597            .try_borrow()
598            .map(|tasks| !tasks.is_empty())
599            .unwrap_or(true);
600
601        local_pending || self.ui_dispatcher.has_pending() || async_pending
602    }
603
604    fn register_ui_cont<T: 'static>(&self, f: impl FnOnce(T) + 'static) -> u64 {
605        debug_assert_eq!(
606            std::thread::current().id(),
607            self.ui_thread_id,
608            "UI continuation registered off the runtime thread",
609        );
610        let id = self.next_cont_id.get();
611        self.next_cont_id.set(id + 1);
612        let callback = RefCell::new(Some(f));
613        self.ui_conts.borrow_mut().insert(
614            id,
615            Box::new(move |value: Box<dyn Any>| {
616                let slot = callback
617                    .borrow_mut()
618                    .take()
619                    .expect("UI continuation invoked more than once");
620                let value = value
621                    .downcast::<T>()
622                    .expect("UI continuation type mismatch");
623                slot(*value);
624            }),
625        );
626        id
627    }
628
629    fn invoke_ui_cont(&self, id: u64, value: Box<dyn Any + Send>) {
630        debug_assert_eq!(
631            std::thread::current().id(),
632            self.ui_thread_id,
633            "UI continuation invoked off the runtime thread",
634        );
635        if let Some(callback) = self.ui_conts.borrow_mut().remove(&id) {
636            let value: Box<dyn Any> = value;
637            callback(value);
638        }
639    }
640
641    fn cancel_ui_cont(&self, id: u64) {
642        self.ui_conts.borrow_mut().remove(&id);
643    }
644
645    fn register_frame_callback(&self, callback: Box<dyn FnOnce(u64) + 'static>) -> FrameCallbackId {
646        let id = self.next_frame_callback_id.get();
647        self.next_frame_callback_id.set(id + 1);
648        self.frame_callbacks
649            .borrow_mut()
650            .push_back(FrameCallbackEntry {
651                id,
652                callback: Some(callback),
653            });
654        self.schedule();
655        id
656    }
657
658    fn cancel_frame_callback(&self, id: FrameCallbackId) {
659        let mut callbacks = self.frame_callbacks.borrow_mut();
660        if let Some(index) = callbacks.iter().position(|entry| entry.id == id) {
661            callbacks.remove(index);
662        }
663        let callbacks_empty = callbacks.is_empty();
664        drop(callbacks);
665        let local_pending = self
666            .local_tasks
667            .try_borrow()
668            .map(|tasks| !tasks.is_empty())
669            .unwrap_or(true);
670        let async_pending = self
671            .tasks
672            .try_borrow()
673            .map(|tasks| !tasks.is_empty())
674            .unwrap_or(true);
675        if !self.has_invalid_scopes()
676            && !self.has_updates()
677            && callbacks_empty
678            && !local_pending
679            && !self.ui_dispatcher.has_pending()
680            && !async_pending
681        {
682            *self.needs_frame.borrow_mut() = false;
683        }
684    }
685
686    fn drain_frame_callbacks(&self, frame_time_nanos: u64) {
687        let mut callbacks = self.frame_callbacks.borrow_mut();
688        let mut pending: Vec<Box<dyn FnOnce(u64) + 'static>> = Vec::with_capacity(callbacks.len());
689        while let Some(mut entry) = callbacks.pop_front() {
690            if let Some(callback) = entry.callback.take() {
691                pending.push(callback);
692            }
693        }
694        drop(callbacks);
695
696        // Wrap ALL frame callbacks in a single mutable snapshot so state changes
697        // are properly applied to the global snapshot and visible to subsequent reads.
698        // Using a single snapshot for all callbacks avoids stack exhaustion from
699        // repeated snapshot creation in long-running animation loops.
700        if !pending.is_empty() {
701            let _ = crate::run_in_mutable_snapshot(|| {
702                for callback in pending {
703                    callback(frame_time_nanos);
704                }
705            });
706        }
707
708        if !self.has_invalid_scopes()
709            && !self.has_updates()
710            && !self.has_frame_callbacks()
711            && !self.has_pending_ui()
712        {
713            *self.needs_frame.borrow_mut() = false;
714        }
715    }
716
717    fn debug_stats(&self) -> RuntimeDebugStats {
718        let node_updates = self.node_updates.borrow();
719        let invalid_scopes = self.invalid_scopes.borrow();
720        let scope_queue = self.scope_queue.borrow();
721        let frame_callbacks = self.frame_callbacks.borrow();
722        let local_tasks = self.local_tasks.borrow();
723        let ui_conts = self.ui_conts.borrow();
724        let tasks = self.tasks.borrow();
725        let external_state_owners = self.external_state_owners.borrow();
726
727        RuntimeDebugStats {
728            node_updates_len: node_updates.len(),
729            node_updates_cap: node_updates.capacity(),
730            invalid_scopes_len: invalid_scopes.len(),
731            invalid_scopes_cap: invalid_scopes.capacity(),
732            scope_queue_len: scope_queue.len(),
733            scope_queue_cap: scope_queue.capacity(),
734            frame_callbacks_len: frame_callbacks.len(),
735            frame_callbacks_cap: frame_callbacks.capacity(),
736            local_tasks_len: local_tasks.len(),
737            local_tasks_cap: local_tasks.capacity(),
738            ui_conts_len: ui_conts.len(),
739            ui_conts_cap: ui_conts.capacity(),
740            tasks_len: tasks.len(),
741            tasks_cap: tasks.capacity(),
742            external_state_owners_len: external_state_owners.len(),
743            external_state_owners_cap: external_state_owners.capacity(),
744            ui_dispatcher_pending: self.ui_dispatcher.pending.load(Ordering::SeqCst),
745        }
746    }
747}
748
749#[derive(Clone)]
750pub struct Runtime {
751    inner: Rc<RuntimeInner>,
752}
753
754impl Runtime {
755    pub fn new(scheduler: Arc<dyn RuntimeScheduler>) -> Self {
756        let inner = Rc::new(RuntimeInner::new(scheduler));
757        RuntimeInner::init_task_waker(&inner);
758        let runtime = Self { inner };
759        let handle = runtime.handle();
760        register_runtime_handle(&handle);
761        LAST_RUNTIME.with(|slot| *slot.borrow_mut() = Some(handle));
762        runtime
763    }
764
765    pub fn handle(&self) -> RuntimeHandle {
766        RuntimeHandle {
767            inner: Rc::downgrade(&self.inner),
768            dispatcher: UiDispatcher::new(self.inner.ui_dispatcher.clone()),
769            ui_thread_id: self.inner.ui_thread_id,
770            id: self.inner.runtime_id,
771        }
772    }
773
774    pub fn has_updates(&self) -> bool {
775        self.inner.has_updates()
776    }
777
778    pub fn needs_frame(&self) -> bool {
779        *self.inner.needs_frame.borrow() || self.inner.ui_dispatcher.has_pending()
780    }
781
782    pub fn set_needs_frame(&self, value: bool) {
783        *self.inner.needs_frame.borrow_mut() = value;
784    }
785
786    #[cfg(any(feature = "internal", test))]
787    pub fn frame_clock(&self) -> FrameClock {
788        FrameClock::new(self.handle())
789    }
790}
791
792impl Drop for Runtime {
793    fn drop(&mut self) {
794        if Rc::strong_count(&self.inner) != 1 {
795            return;
796        }
797        unregister_runtime_handle(self.inner.runtime_id);
798        LAST_RUNTIME.with(|slot| {
799            let should_clear = slot
800                .borrow()
801                .as_ref()
802                .is_some_and(|handle| handle.id() == self.inner.runtime_id);
803            if should_clear {
804                *slot.borrow_mut() = None;
805            }
806        });
807    }
808}
809
810#[derive(Default)]
811pub struct DefaultScheduler;
812
813impl RuntimeScheduler for DefaultScheduler {
814    fn schedule_frame(&self) {}
815}
816
/// Test-only scheduler that ignores frame requests.
#[cfg(test)]
#[derive(Default)]
pub struct TestScheduler;

#[cfg(test)]
impl RuntimeScheduler for TestScheduler {
    /// Intentionally empty: tests drive the runtime themselves.
    fn schedule_frame(&self) {}
}
825
/// Test-only wrapper owning a `Runtime` backed by `TestScheduler`.
#[cfg(test)]
pub struct TestRuntime {
    runtime: Runtime,
}

#[cfg(test)]
impl Default for TestRuntime {
    /// Same as `TestRuntime::new`.
    fn default() -> Self {
        TestRuntime::new()
    }
}

#[cfg(test)]
impl TestRuntime {
    /// Creates a runtime driven by a `TestScheduler`.
    pub fn new() -> Self {
        let runtime = Runtime::new(Arc::new(TestScheduler));
        TestRuntime { runtime }
    }

    /// Returns a handle to the wrapped runtime.
    pub fn handle(&self) -> RuntimeHandle {
        let runtime = &self.runtime;
        runtime.handle()
    }
}
850
851#[derive(Clone)]
852pub struct RuntimeHandle {
853    inner: Weak<RuntimeInner>,
854    dispatcher: UiDispatcher,
855    ui_thread_id: ThreadId,
856    id: RuntimeId,
857}
858
859pub struct TaskHandle {
860    id: u64,
861    runtime: RuntimeHandle,
862}
863
864struct DeferredStateRelease {
865    runtime: RuntimeHandle,
866    id: StateId,
867}
868
869pub(crate) struct StateHandleLease {
870    id: StateId,
871    runtime: RuntimeHandle,
872}
873
874impl StateHandleLease {
875    pub(crate) fn id(&self) -> StateId {
876        self.id
877    }
878
879    pub(crate) fn runtime(&self) -> RuntimeHandle {
880        self.runtime.clone()
881    }
882}
883
884impl Drop for StateHandleLease {
885    fn drop(&mut self) {
886        defer_state_release(self.runtime.clone(), self.id);
887    }
888}
889
890impl RuntimeHandle {
891    pub fn id(&self) -> RuntimeId {
892        self.id
893    }
894
895    pub(crate) fn alloc_state<T: Clone + 'static>(&self, value: T) -> Rc<StateHandleLease> {
896        let id = self.with_state_arena(|arena| arena.alloc(value, self.clone()));
897        let lease = Rc::new(StateHandleLease {
898            id,
899            runtime: self.clone(),
900        });
901        self.with_state_arena(|arena| arena.register_lease(id, &lease));
902        lease
903    }
904
905    pub(crate) fn alloc_state_with_policy<T: Clone + 'static>(
906        &self,
907        value: T,
908        policy: Arc<dyn MutationPolicy<T>>,
909    ) -> Rc<StateHandleLease> {
910        let id =
911            self.with_state_arena(|arena| arena.alloc_with_policy(value, self.clone(), policy));
912        let lease = Rc::new(StateHandleLease {
913            id,
914            runtime: self.clone(),
915        });
916        self.with_state_arena(|arena| arena.register_lease(id, &lease));
917        lease
918    }
919
920    pub(crate) fn alloc_persistent_state<T: Clone + 'static>(
921        &self,
922        value: T,
923    ) -> crate::MutableState<T> {
924        let lease = self.alloc_state(value);
925        if let Some(inner) = self.inner.upgrade() {
926            inner
927                .external_state_owners
928                .borrow_mut()
929                .insert(lease.id(), Rc::clone(&lease));
930        }
931        crate::MutableState::from_lease(&lease)
932    }
933
934    pub(crate) fn retain_state_lease(&self, id: StateId) -> Option<Rc<StateHandleLease>> {
935        self.with_state_arena(|arena| arena.retain_lease(id))
936    }
937
938    pub(crate) fn with_state_arena<R>(&self, f: impl FnOnce(&StateArena) -> R) -> R {
939        self.inner
940            .upgrade()
941            .map(|inner| f(&inner.state_arena))
942            .unwrap_or_else(|| panic!("runtime dropped"))
943    }
944
945    fn release_state_immediate(&self, id: StateId) {
946        if let Some(inner) = self.inner.upgrade() {
947            inner.state_arena.release(id);
948        }
949    }
950
951    pub fn state_arena_stats(&self) -> (usize, usize) {
952        self.with_state_arena(StateArena::stats)
953    }
954
955    pub fn state_arena_debug_stats(&self) -> StateArenaDebugStats {
956        self.with_state_arena(StateArena::debug_stats)
957    }
958
959    pub fn debug_stats(&self) -> RuntimeDebugStats {
960        self.inner
961            .upgrade()
962            .map(|inner| inner.debug_stats())
963            .unwrap_or_default()
964    }
965
966    pub(crate) fn unregister_state_scope(&self, id: StateId, scope_id: ScopeId) {
967        if let Some(inner) = self.inner.upgrade() {
968            inner.state_arena.unregister_scope(id, scope_id);
969        }
970    }
971
972    pub fn schedule(&self) {
973        if let Some(inner) = self.inner.upgrade() {
974            inner.schedule();
975        }
976    }
977
978    pub(crate) fn enqueue_node_update(&self, command: Command) {
979        if let Some(inner) = self.inner.upgrade() {
980            inner.enqueue_update(command);
981        }
982    }
983
984    /// Schedules work that must run on the runtime thread.
985    ///
986    /// The closure executes on the UI thread immediately when the runtime
987    /// drains its local queue, so it may capture `Rc`/`RefCell` values. Calling
988    /// this from any other thread is a logic error and will panic in debug
989    /// builds via the inner assertion.
990    pub fn enqueue_ui_task(&self, task: Box<dyn FnOnce() + 'static>) {
991        if let Some(inner) = self.inner.upgrade() {
992            inner.enqueue_ui_task(task);
993        } else {
994            task();
995        }
996    }
997
998    pub fn spawn_ui<F>(&self, fut: F) -> Option<TaskHandle>
999    where
1000        F: Future<Output = ()> + 'static,
1001    {
1002        self.inner.upgrade().map(|inner| {
1003            let id = inner.spawn_ui_task(Box::pin(fut));
1004            TaskHandle {
1005                id,
1006                runtime: self.clone(),
1007            }
1008        })
1009    }
1010
1011    pub fn cancel_task(&self, id: u64) {
1012        if let Some(inner) = self.inner.upgrade() {
1013            inner.cancel_task(id);
1014        }
1015    }
1016
1017    /// Enqueues work from any thread to run on the UI thread.
1018    ///
1019    /// The closure must be `Send` because it may cross threads before executing
1020    /// on the runtime thread. Use this when posting from background work.
1021    pub fn post_ui(&self, task: impl FnOnce() + Send + 'static) {
1022        self.dispatcher.post(task);
1023    }
1024
1025    pub fn register_ui_cont<T: 'static>(&self, f: impl FnOnce(T) + 'static) -> Option<u64> {
1026        self.inner.upgrade().map(|inner| inner.register_ui_cont(f))
1027    }
1028
1029    pub fn cancel_ui_cont(&self, id: u64) {
1030        if let Some(inner) = self.inner.upgrade() {
1031            inner.cancel_ui_cont(id);
1032        }
1033    }
1034
1035    pub fn drain_ui(&self) {
1036        if let Some(inner) = self.inner.upgrade() {
1037            inner.drain_ui();
1038        }
1039    }
1040
1041    pub fn has_pending_ui(&self) -> bool {
1042        self.inner
1043            .upgrade()
1044            .map(|inner| inner.has_pending_ui())
1045            .unwrap_or_else(|| self.dispatcher.has_pending())
1046    }
1047
1048    pub fn register_frame_callback(
1049        &self,
1050        callback: impl FnOnce(u64) + 'static,
1051    ) -> Option<FrameCallbackId> {
1052        self.inner
1053            .upgrade()
1054            .map(|inner| inner.register_frame_callback(Box::new(callback)))
1055    }
1056
1057    pub fn cancel_frame_callback(&self, id: FrameCallbackId) {
1058        if let Some(inner) = self.inner.upgrade() {
1059            inner.cancel_frame_callback(id);
1060        }
1061    }
1062
1063    pub fn drain_frame_callbacks(&self, frame_time_nanos: u64) {
1064        if let Some(inner) = self.inner.upgrade() {
1065            inner.drain_frame_callbacks(frame_time_nanos);
1066        }
1067    }
1068
1069    #[cfg(any(feature = "internal", test))]
1070    pub fn frame_clock(&self) -> FrameClock {
1071        FrameClock::new(self.clone())
1072    }
1073
1074    pub fn set_needs_frame(&self, value: bool) {
1075        if let Some(inner) = self.inner.upgrade() {
1076            *inner.needs_frame.borrow_mut() = value;
1077        }
1078    }
1079
1080    pub(crate) fn take_updates(&self) -> Vec<Command> {
1081        self.inner
1082            .upgrade()
1083            .map(|inner| inner.take_updates())
1084            .unwrap_or_default()
1085    }
1086
1087    pub fn has_updates(&self) -> bool {
1088        self.inner
1089            .upgrade()
1090            .map(|inner| inner.has_updates())
1091            .unwrap_or(false)
1092    }
1093
1094    pub(crate) fn mark_scope_recomposed(&self, id: ScopeId) {
1095        if let Some(inner) = self.inner.upgrade() {
1096            inner.mark_scope_recomposed(id);
1097        }
1098    }
1099
1100    pub(crate) fn register_invalid_scope(&self, id: ScopeId, scope: Weak<RecomposeScopeInner>) {
1101        if let Some(inner) = self.inner.upgrade() {
1102            inner.register_invalid_scope(id, scope);
1103        }
1104    }
1105
1106    pub(crate) fn requeue_invalid_scope(&self, id: ScopeId, scope: Weak<RecomposeScopeInner>) {
1107        if let Some(inner) = self.inner.upgrade() {
1108            inner.requeue_invalid_scope(id, scope);
1109        }
1110    }
1111
1112    pub(crate) fn take_invalidated_scopes(&self) -> Vec<(ScopeId, Weak<RecomposeScopeInner>)> {
1113        self.inner
1114            .upgrade()
1115            .map(|inner| inner.take_invalidated_scopes())
1116            .unwrap_or_default()
1117    }
1118
1119    pub fn has_invalid_scopes(&self) -> bool {
1120        self.inner
1121            .upgrade()
1122            .map(|inner| inner.has_invalid_scopes())
1123            .unwrap_or(false)
1124    }
1125
1126    #[doc(hidden)]
1127    pub fn debug_invalid_scope_ids(&self) -> Vec<usize> {
1128        self.inner
1129            .upgrade()
1130            .map(|inner| inner.invalid_scopes.borrow().iter().copied().collect())
1131            .unwrap_or_default()
1132    }
1133
1134    pub fn has_frame_callbacks(&self) -> bool {
1135        self.inner
1136            .upgrade()
1137            .map(|inner| inner.has_frame_callbacks())
1138            .unwrap_or(false)
1139    }
1140
1141    pub fn assert_ui_thread(&self) {
1142        debug_assert_eq!(
1143            std::thread::current().id(),
1144            self.ui_thread_id,
1145            "state mutated off the runtime's UI thread"
1146        );
1147    }
1148
1149    pub fn dispatcher(&self) -> UiDispatcher {
1150        self.dispatcher.clone()
1151    }
1152
1153    #[doc(hidden)]
1154    pub fn with_deferred_state_releases<R>(&self, f: impl FnOnce() -> R) -> R {
1155        let _scope = enter_state_teardown_scope();
1156        f()
1157    }
1158}
1159
1160impl TaskHandle {
1161    pub fn cancel(self) {
1162        self.runtime.cancel_task(self.id);
1163    }
1164}
1165
1166pub(crate) struct FrameCallbackEntry {
1167    id: FrameCallbackId,
1168    callback: Option<Box<dyn FnOnce(u64) + 'static>>,
1169}
1170
1171struct RuntimeTaskWaker {
1172    scheduler: Arc<dyn RuntimeScheduler>,
1173}
1174
1175impl RuntimeTaskWaker {
1176    fn new(inner: Weak<RuntimeInner>) -> Self {
1177        // Extract the Arc<RuntimeScheduler> which IS Send+Sync
1178        // This way we can wake the runtime without storing the Rc::Weak
1179        let scheduler = inner
1180            .upgrade()
1181            .map(|rc| rc.scheduler.clone())
1182            .expect("RuntimeInner dropped before waker created");
1183        Self { scheduler }
1184    }
1185
1186    fn into_waker(self) -> Waker {
1187        futures_task::waker(Arc::new(self))
1188    }
1189}
1190
1191impl futures_task::ArcWake for RuntimeTaskWaker {
1192    fn wake_by_ref(arc_self: &Arc<Self>) {
1193        arc_self.scheduler.schedule_frame();
1194    }
1195}
1196
1197thread_local! {
1198    static ACTIVE_RUNTIMES: RefCell<Vec<RuntimeHandle>> = const { RefCell::new(Vec::new()) };
1199    static LAST_RUNTIME: RefCell<Option<RuntimeHandle>> = const { RefCell::new(None) };
1200    static REGISTERED_RUNTIMES: RefCell<HashMap<RuntimeId, RuntimeHandle>> = RefCell::new(HashMap::default());
1201    static STATE_TEARDOWN_DEPTH: Cell<usize> = const { Cell::new(0) };
1202    static DEFERRED_STATE_RELEASES: RefCell<Vec<DeferredStateRelease>> = const { RefCell::new(Vec::new()) };
1203}
1204
/// Length/capacity snapshot of this module's thread-local collections, as
/// produced by `debug_runtime_thread_local_stats`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct RuntimeThreadLocalDebugStats {
    /// Number of handles on the active-runtime stack.
    pub active_runtimes_len: usize,
    /// Allocated capacity of the active-runtime stack.
    pub active_runtimes_cap: usize,
    /// Number of entries in the runtime registry.
    pub registered_runtimes_len: usize,
    /// Allocated capacity of the runtime registry.
    pub registered_runtimes_cap: usize,
    /// Number of queued deferred state releases.
    pub deferred_state_releases_len: usize,
    /// Allocated capacity of the deferred-release queue.
    pub deferred_state_releases_cap: usize,
}
1214
1215/// Gets the current runtime handle from thread-local storage.
1216///
1217/// Returns the most recently pushed active runtime, or the last known runtime.
1218/// Used by fling animation and other components that need access to the runtime.
1219pub fn current_runtime_handle() -> Option<RuntimeHandle> {
1220    if let Some(handle) = ACTIVE_RUNTIMES.with(|stack| stack.borrow().last().cloned()) {
1221        return Some(handle);
1222    }
1223    LAST_RUNTIME.with(|slot| slot.borrow().clone())
1224}
1225
1226pub(crate) fn runtime_handle_by_id(id: RuntimeId) -> Option<RuntimeHandle> {
1227    REGISTERED_RUNTIMES.with(|registry| registry.borrow().get(&id).cloned())
1228}
1229
1230pub fn debug_runtime_thread_local_stats() -> RuntimeThreadLocalDebugStats {
1231    let (active_runtimes_len, active_runtimes_cap) = ACTIVE_RUNTIMES.with(|stack| {
1232        let stack = stack.borrow();
1233        (stack.len(), stack.capacity())
1234    });
1235    let (registered_runtimes_len, registered_runtimes_cap) = REGISTERED_RUNTIMES.with(|registry| {
1236        let registry = registry.borrow();
1237        (registry.len(), registry.capacity())
1238    });
1239    let (deferred_state_releases_len, deferred_state_releases_cap) =
1240        DEFERRED_STATE_RELEASES.with(|releases| {
1241            let releases = releases.borrow();
1242            (releases.len(), releases.capacity())
1243        });
1244
1245    RuntimeThreadLocalDebugStats {
1246        active_runtimes_len,
1247        active_runtimes_cap,
1248        registered_runtimes_len,
1249        registered_runtimes_cap,
1250        deferred_state_releases_len,
1251        deferred_state_releases_cap,
1252    }
1253}
1254
1255fn register_runtime_handle(handle: &RuntimeHandle) {
1256    REGISTERED_RUNTIMES.with(|registry| {
1257        registry.borrow_mut().insert(handle.id(), handle.clone());
1258    });
1259}
1260
1261fn unregister_runtime_handle(id: RuntimeId) {
1262    REGISTERED_RUNTIMES.with(|registry| {
1263        registry.borrow_mut().remove(&id);
1264    });
1265}
1266
1267fn defer_state_release(runtime: RuntimeHandle, id: StateId) {
1268    let teardown_active = STATE_TEARDOWN_DEPTH.with(|depth| depth.get() > 0);
1269    if teardown_active {
1270        DEFERRED_STATE_RELEASES.with(|releases| {
1271            releases
1272                .borrow_mut()
1273                .push(DeferredStateRelease { runtime, id });
1274        });
1275    } else {
1276        runtime.release_state_immediate(id);
1277    }
1278}
1279
1280fn flush_deferred_state_releases() {
1281    DEFERRED_STATE_RELEASES.with(|releases| {
1282        let mut releases = releases.borrow_mut();
1283        while let Some(deferred) = releases.pop() {
1284            deferred.runtime.release_state_immediate(deferred.id);
1285        }
1286    });
1287}
1288
1289pub(crate) struct StateTeardownScope;
1290
1291pub(crate) fn enter_state_teardown_scope() -> StateTeardownScope {
1292    STATE_TEARDOWN_DEPTH.with(|depth| depth.set(depth.get() + 1));
1293    StateTeardownScope
1294}
1295
1296impl Drop for StateTeardownScope {
1297    fn drop(&mut self) {
1298        STATE_TEARDOWN_DEPTH.with(|depth| {
1299            let next = depth.get().saturating_sub(1);
1300            depth.set(next);
1301            if next == 0 {
1302                flush_deferred_state_releases();
1303            }
1304        });
1305    }
1306}
1307
1308pub(crate) fn push_active_runtime(handle: &RuntimeHandle) {
1309    register_runtime_handle(handle);
1310    ACTIVE_RUNTIMES.with(|stack| stack.borrow_mut().push(handle.clone()));
1311    LAST_RUNTIME.with(|slot| *slot.borrow_mut() = Some(handle.clone()));
1312}
1313
1314pub(crate) fn pop_active_runtime() {
1315    ACTIVE_RUNTIMES.with(|stack| {
1316        stack.borrow_mut().pop();
1317    });
1318}
1319
1320/// Schedule a new frame render using the most recently active runtime handle.
1321pub fn schedule_frame() {
1322    if let Some(handle) = current_runtime_handle() {
1323        handle.schedule();
1324        return;
1325    }
1326    panic!("no runtime available to schedule frame");
1327}
1328
1329/// Schedule an in-place node update using the most recently active runtime.
1330pub fn schedule_node_update(
1331    update: impl FnOnce(&mut dyn Applier) -> Result<(), NodeError> + 'static,
1332) {
1333    let handle = current_runtime_handle().expect("no runtime available to schedule node update");
1334    handle.enqueue_node_update(Command::callback(update));
1335}