Skip to main content

cranpose_core/
runtime.rs

1use crate::collections::map::HashMap;
2use crate::collections::map::HashSet;
3use crate::state::{MutationPolicy, NeverEqual};
4use crate::MutableStateInner;
5use std::any::Any;
6use std::cell::{Cell, RefCell};
7use std::collections::VecDeque;
8use std::future::Future;
9use std::pin::Pin;
10use std::rc::{Rc, Weak};
11use std::sync::atomic::{AtomicU32, AtomicUsize, Ordering};
12use std::sync::{mpsc, Arc};
13use std::task::{Context, Poll, Waker};
14use std::thread::ThreadId;
15use std::thread_local;
16
17#[cfg(any(feature = "internal", test))]
18use crate::frame_clock::FrameClock;
19use crate::platform::RuntimeScheduler;
20use crate::{Applier, Command, FrameCallbackId, NodeError, RecomposeScopeInner, ScopeId};
21
/// Unit of work sent through the dispatcher channel to the runtime.
enum UiMessage {
    /// A `Send` closure that is run once when the runtime drains the channel.
    Task(Box<dyn FnOnce() + Send + 'static>),
    /// A type-erased value addressed to the UI continuation registered under `id`.
    Invoke { id: u64, value: Box<dyn Any + Send> },
}
26
/// Type-erased UI continuation: receives its argument as a boxed `Any`.
type UiContinuation = Box<dyn Fn(Box<dyn Any>) + 'static>;
/// Registered UI continuations, keyed by the id handed out at registration.
type UiContinuationMap = HashMap<u64, UiContinuation>;
29
30struct TypedStateCell<T: Clone + 'static> {
31    inner: MutableStateInner<T>,
32}
33
34trait ScopeWatchCell {
35    fn unregister_scope(&self, scope_id: ScopeId);
36}
37
38impl<T: Clone + 'static> ScopeWatchCell for TypedStateCell<T> {
39    fn unregister_scope(&self, scope_id: ScopeId) {
40        self.inner.unregister_scope(scope_id);
41    }
42}
43
44struct StateArenaSlot {
45    generation: u32,
46    cell: Option<Rc<dyn Any>>,
47    watcher_cell: Option<Rc<dyn ScopeWatchCell>>,
48    lease: Option<Weak<StateHandleLease>>,
49}
50
51#[derive(Default)]
52struct StateArenaInner {
53    cells: Vec<StateArenaSlot>,
54    free: Vec<u32>,
55}
56
/// Length and capacity snapshot of the state arena's internal vectors.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct StateArenaDebugStats {
    /// Number of slots in the arena.
    pub cells_len: usize,
    /// Allocated capacity of the slot vector.
    pub cells_cap: usize,
    /// Number of entries on the free list.
    pub free_len: usize,
    /// Allocated capacity of the free list.
    pub free_cap: usize,
}
64
/// Length and capacity snapshot of the runtime's internal queues and maps.
///
/// Each `*_len` / `*_cap` pair reports the length and capacity of the
/// runtime collection of the same name.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct RuntimeDebugStats {
    // Queued node update commands.
    pub node_updates_len: usize,
    pub node_updates_cap: usize,
    // Set of scopes currently marked invalid.
    pub invalid_scopes_len: usize,
    pub invalid_scopes_cap: usize,
    // Queue of invalidated scopes in registration order.
    pub scope_queue_len: usize,
    pub scope_queue_cap: usize,
    // Registered frame callbacks.
    pub frame_callbacks_len: usize,
    pub frame_callbacks_cap: usize,
    // Closures queued on the runtime-local task queue.
    pub local_tasks_len: usize,
    pub local_tasks_cap: usize,
    // Registered UI continuations.
    pub ui_conts_len: usize,
    pub ui_conts_cap: usize,
    // Spawned futures that have not completed yet.
    pub tasks_len: usize,
    pub tasks_cap: usize,
    // Leases kept alive by the runtime for persistent state.
    pub external_state_owners_len: usize,
    pub external_state_owners_cap: usize,
    /// Value of the dispatcher's pending-message counter.
    pub ui_dispatcher_pending: usize,
}
85
86#[derive(Default)]
87pub(crate) struct StateArena {
88    inner: RefCell<StateArenaInner>,
89}
90
91impl StateArena {
92    pub(crate) fn alloc<T: Clone + 'static>(&self, value: T, runtime: RuntimeHandle) -> StateId {
93        self.alloc_with_policy(value, runtime, Arc::new(NeverEqual))
94    }
95
96    pub(crate) fn alloc_with_policy<T: Clone + 'static>(
97        &self,
98        value: T,
99        runtime: RuntimeHandle,
100        policy: Arc<dyn MutationPolicy<T>>,
101    ) -> StateId {
102        let (slot, generation) = {
103            let mut inner = self.inner.borrow_mut();
104            match inner.free.pop() {
105                Some(slot) => {
106                    let entry = inner
107                        .cells
108                        .get_mut(slot as usize)
109                        .expect("state slot missing");
110                    debug_assert!(entry.cell.is_none(), "reused state slot must be empty");
111                    entry.generation = entry.generation.wrapping_add(1);
112                    (slot, entry.generation)
113                }
114                None => {
115                    let slot = inner.cells.len() as u32;
116                    inner.cells.push(StateArenaSlot {
117                        generation: 0,
118                        cell: None,
119                        watcher_cell: None,
120                        lease: None,
121                    });
122                    (slot, 0)
123                }
124            }
125        };
126        let id = StateId::new(slot, generation);
127        let inner = MutableStateInner::new_with_policy(value, runtime.clone(), policy);
128        inner.install_snapshot_observer(id);
129        let typed_cell = Rc::new(TypedStateCell { inner });
130        let cell: Rc<dyn Any> = typed_cell.clone();
131        let watcher_cell: Rc<dyn ScopeWatchCell> = typed_cell;
132        let mut arena = self.inner.borrow_mut();
133        let slot_entry = &mut arena.cells[slot as usize];
134        slot_entry.cell = Some(cell);
135        slot_entry.watcher_cell = Some(watcher_cell);
136        id
137    }
138
139    fn get_cell_opt(&self, id: StateId) -> Option<Rc<dyn Any>> {
140        self.inner
141            .borrow()
142            .cells
143            .get(id.slot_index())
144            .filter(|cell| cell.generation == id.generation())
145            .and_then(|cell| cell.cell.as_ref())
146            .cloned()
147    }
148
149    fn get_typed<T: Clone + 'static>(&self, id: StateId) -> Rc<TypedStateCell<T>> {
150        match self.get_cell_opt(id) {
151            None => panic!(
152                "state cell missing: slot={}, gen={}, expected={}",
153                id.slot(),
154                id.generation(),
155                std::any::type_name::<T>(),
156            ),
157            Some(cell) => Rc::downcast::<TypedStateCell<T>>(cell).unwrap_or_else(|_| {
158                panic!(
159                    "state cell type mismatch: slot={}, gen={}, expected={}",
160                    id.slot(),
161                    id.generation(),
162                    std::any::type_name::<T>(),
163                )
164            }),
165        }
166    }
167
168    fn get_typed_opt<T: Clone + 'static>(&self, id: StateId) -> Option<Rc<TypedStateCell<T>>> {
169        Rc::downcast::<TypedStateCell<T>>(self.get_cell_opt(id)?).ok()
170    }
171
172    pub(crate) fn with_typed<T: Clone + 'static, R>(
173        &self,
174        id: StateId,
175        f: impl FnOnce(&MutableStateInner<T>) -> R,
176    ) -> R {
177        let cell = self.get_typed::<T>(id);
178        f(&cell.inner)
179    }
180
181    pub(crate) fn with_typed_opt<T: Clone + 'static, R>(
182        &self,
183        id: StateId,
184        f: impl FnOnce(&MutableStateInner<T>) -> R,
185    ) -> Option<R> {
186        let cell = self.get_typed_opt::<T>(id)?;
187        Some(f(&cell.inner))
188    }
189
190    pub(crate) fn release(&self, id: StateId) {
191        let cell = {
192            let mut inner = self.inner.borrow_mut();
193            let Some(slot) = inner.cells.get_mut(id.slot_index()) else {
194                return;
195            };
196            if slot.generation != id.generation() {
197                return;
198            }
199            slot.lease = None;
200            slot.watcher_cell = None;
201            let cell = slot.cell.take();
202            if cell.is_some() {
203                inner.free.push(id.slot());
204            }
205            cell
206        };
207        drop(cell);
208    }
209
210    pub(crate) fn stats(&self) -> (usize, usize) {
211        let inner = self.inner.borrow();
212        (inner.cells.len(), inner.free.len())
213    }
214
215    pub(crate) fn debug_stats(&self) -> StateArenaDebugStats {
216        let inner = self.inner.borrow();
217        StateArenaDebugStats {
218            cells_len: inner.cells.len(),
219            cells_cap: inner.cells.capacity(),
220            free_len: inner.free.len(),
221            free_cap: inner.free.capacity(),
222        }
223    }
224
225    pub(crate) fn unregister_scope(&self, id: StateId, scope_id: ScopeId) {
226        let watcher_cell = {
227            let inner = self.inner.borrow();
228            inner
229                .cells
230                .get(id.slot_index())
231                .filter(|slot| slot.generation == id.generation())
232                .and_then(|slot| slot.watcher_cell.as_ref())
233                .cloned()
234        };
235        if let Some(watcher_cell) = watcher_cell {
236            watcher_cell.unregister_scope(scope_id);
237        }
238    }
239
240    pub(crate) fn register_lease(&self, id: StateId, lease: &Rc<StateHandleLease>) {
241        let mut inner = self.inner.borrow_mut();
242        let Some(slot) = inner.cells.get_mut(id.slot_index()) else {
243            panic!("state slot missing");
244        };
245        assert_eq!(
246            slot.generation,
247            id.generation(),
248            "state generation mismatch"
249        );
250        slot.lease = Some(Rc::downgrade(lease));
251    }
252
253    pub(crate) fn retain_lease(&self, id: StateId) -> Option<Rc<StateHandleLease>> {
254        let inner = self.inner.borrow();
255        let slot = inner.cells.get(id.slot_index())?;
256        if slot.generation != id.generation() {
257            return None;
258        }
259        slot.lease.as_ref()?.upgrade()
260    }
261}
262
/// Generational identifier of a state arena slot: a slot index plus the
/// generation the slot had when the id was issued.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct StateId {
    slot: u32,
    generation: u32,
}

impl StateId {
    /// Builds an id from its raw parts.
    const fn new(slot: u32, generation: u32) -> Self {
        StateId { slot, generation }
    }

    /// Raw slot number.
    pub(crate) const fn slot(self) -> u32 {
        self.slot
    }

    /// Slot number widened for indexing into a vector.
    pub(crate) const fn slot_index(self) -> usize {
        self.slot() as usize
    }

    /// Generation recorded when the id was issued.
    pub(crate) const fn generation(self) -> u32 {
        self.generation
    }
}
286
/// Process-wide identifier assigned to each runtime.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct RuntimeId(u32);

impl RuntimeId {
    /// Hands out the next id from a global counter that starts at 1.
    fn next() -> Self {
        static NEXT_RUNTIME_ID: AtomicU32 = AtomicU32::new(1);
        let raw = NEXT_RUNTIME_ID.fetch_add(1, Ordering::Relaxed);
        RuntimeId(raw)
    }
}
296
297struct UiDispatcherInner {
298    scheduler: Arc<dyn RuntimeScheduler>,
299    tx: mpsc::Sender<UiMessage>,
300    pending: AtomicUsize,
301}
302
303impl UiDispatcherInner {
304    fn new(scheduler: Arc<dyn RuntimeScheduler>, tx: mpsc::Sender<UiMessage>) -> Self {
305        Self {
306            scheduler,
307            tx,
308            pending: AtomicUsize::new(0),
309        }
310    }
311
312    fn post(&self, task: impl FnOnce() + Send + 'static) {
313        self.pending.fetch_add(1, Ordering::SeqCst);
314        let _ = self.tx.send(UiMessage::Task(Box::new(task)));
315        self.scheduler.schedule_frame();
316    }
317
318    fn post_invoke(&self, id: u64, value: Box<dyn Any + Send>) {
319        self.pending.fetch_add(1, Ordering::SeqCst);
320        let _ = self.tx.send(UiMessage::Invoke { id, value });
321        self.scheduler.schedule_frame();
322    }
323
324    fn has_pending(&self) -> bool {
325        self.pending.load(Ordering::SeqCst) > 0
326    }
327}
328
/// Decrements a pending-message counter when dropped.
struct PendingGuard<'a> {
    counter: &'a AtomicUsize,
}

impl<'a> PendingGuard<'a> {
    /// Wraps `counter`; the counter itself is not touched until the guard drops.
    fn new(counter: &'a AtomicUsize) -> Self {
        PendingGuard { counter }
    }
}

impl Drop for PendingGuard<'_> {
    fn drop(&mut self) {
        let before = self.counter.fetch_sub(1, Ordering::SeqCst);
        debug_assert!(before > 0, "UI dispatcher pending count underflowed");
    }
}
345
346#[derive(Clone)]
347pub struct UiDispatcher {
348    inner: Arc<UiDispatcherInner>,
349}
350
351impl UiDispatcher {
352    fn new(inner: Arc<UiDispatcherInner>) -> Self {
353        Self { inner }
354    }
355
356    pub fn post(&self, task: impl FnOnce() + Send + 'static) {
357        self.inner.post(task);
358    }
359
360    pub fn post_invoke<T>(&self, id: u64, value: T)
361    where
362        T: Send + 'static,
363    {
364        self.inner.post_invoke(id, Box::new(value));
365    }
366
367    pub fn has_pending(&self) -> bool {
368        self.inner.has_pending()
369    }
370}
371
372struct RuntimeInner {
373    scheduler: Arc<dyn RuntimeScheduler>,
374    needs_frame: RefCell<bool>,
375    node_updates: RefCell<Vec<Command>>,
376    invalid_scopes: RefCell<HashSet<ScopeId>>,
377    scope_queue: RefCell<Vec<(ScopeId, Weak<RecomposeScopeInner>)>>,
378    frame_callbacks: RefCell<VecDeque<FrameCallbackEntry>>,
379    next_frame_callback_id: Cell<u64>,
380    ui_dispatcher: Arc<UiDispatcherInner>,
381    ui_rx: RefCell<mpsc::Receiver<UiMessage>>,
382    local_tasks: RefCell<VecDeque<Box<dyn FnOnce() + 'static>>>,
383    ui_conts: RefCell<UiContinuationMap>,
384    next_cont_id: Cell<u64>,
385    ui_thread_id: ThreadId,
386    tasks: RefCell<Vec<TaskEntry>>,
387    next_task_id: Cell<u64>,
388    task_waker: RefCell<Option<Waker>>,
389    state_arena: StateArena,
390    external_state_owners: RefCell<HashMap<StateId, Rc<StateHandleLease>>>,
391    runtime_id: RuntimeId,
392}
393
/// A spawned future together with the id used to cancel it.
struct TaskEntry {
    /// Id returned to the caller when the task was spawned.
    id: u64,
    /// The pinned, type-erased future; it is not required to be `Send`.
    future: Pin<Box<dyn Future<Output = ()> + 'static>>,
}
398
399impl RuntimeInner {
400    fn new(scheduler: Arc<dyn RuntimeScheduler>) -> Self {
401        let (tx, rx) = mpsc::channel();
402        let dispatcher = Arc::new(UiDispatcherInner::new(scheduler.clone(), tx));
403        Self {
404            scheduler,
405            needs_frame: RefCell::new(false),
406            node_updates: RefCell::new(Vec::new()),
407            invalid_scopes: RefCell::new(HashSet::default()),
408            scope_queue: RefCell::new(Vec::new()),
409            frame_callbacks: RefCell::new(VecDeque::new()),
410            next_frame_callback_id: Cell::new(1),
411            ui_dispatcher: dispatcher,
412            ui_rx: RefCell::new(rx),
413            local_tasks: RefCell::new(VecDeque::new()),
414            ui_conts: RefCell::new(UiContinuationMap::default()),
415            next_cont_id: Cell::new(1),
416            ui_thread_id: std::thread::current().id(),
417            tasks: RefCell::new(Vec::new()),
418            next_task_id: Cell::new(1),
419            task_waker: RefCell::new(None),
420            state_arena: StateArena::default(),
421            external_state_owners: RefCell::new(HashMap::default()),
422            runtime_id: RuntimeId::next(),
423        }
424    }
425
426    fn init_task_waker(this: &Rc<Self>) {
427        let weak = Rc::downgrade(this);
428        let waker = RuntimeTaskWaker::new(weak).into_waker();
429        *this.task_waker.borrow_mut() = Some(waker);
430    }
431
432    fn schedule(&self) {
433        *self.needs_frame.borrow_mut() = true;
434        self.scheduler.schedule_frame();
435    }
436
437    fn enqueue_update(&self, command: Command) {
438        self.node_updates.borrow_mut().push(command);
439        self.schedule(); // Ensure frame is scheduled to process the command
440    }
441
442    fn take_updates(&self) -> Vec<Command> {
443        let updates = self.node_updates.borrow_mut().drain(..).collect::<Vec<_>>();
444        updates
445    }
446
447    fn has_updates(&self) -> bool {
448        !self.node_updates.borrow().is_empty() || self.has_invalid_scopes()
449    }
450
451    fn register_invalid_scope(&self, id: ScopeId, scope: Weak<RecomposeScopeInner>) {
452        let mut invalid = self.invalid_scopes.borrow_mut();
453        if invalid.insert(id) {
454            self.scope_queue.borrow_mut().push((id, scope));
455            self.schedule();
456        }
457    }
458
459    fn mark_scope_recomposed(&self, id: ScopeId) {
460        self.invalid_scopes.borrow_mut().remove(&id);
461    }
462
463    fn take_invalidated_scopes(&self) -> Vec<(ScopeId, Weak<RecomposeScopeInner>)> {
464        let mut queue = self.scope_queue.borrow_mut();
465        if queue.is_empty() {
466            return Vec::new();
467        }
468        let pending: Vec<_> = queue.drain(..).collect();
469        drop(queue);
470        let invalid = self.invalid_scopes.borrow();
471        pending
472            .into_iter()
473            .filter(|(id, _)| invalid.contains(id))
474            .collect()
475    }
476
477    fn has_invalid_scopes(&self) -> bool {
478        !self.invalid_scopes.borrow().is_empty()
479    }
480
481    fn has_frame_callbacks(&self) -> bool {
482        !self.frame_callbacks.borrow().is_empty()
483    }
484
485    /// Queues a closure that is already bound to the UI thread's local queue.
486    ///
487    /// The closure may capture `Rc`/`RefCell` values because it never leaves the
488    /// runtime thread. Callers must only invoke this from the runtime thread.
489    fn enqueue_ui_task(&self, task: Box<dyn FnOnce() + 'static>) {
490        self.local_tasks.borrow_mut().push_back(task);
491        self.schedule();
492    }
493
494    fn spawn_ui_task(&self, future: Pin<Box<dyn Future<Output = ()> + 'static>>) -> u64 {
495        let id = self.next_task_id.get();
496        self.next_task_id.set(id + 1);
497        self.tasks.borrow_mut().push(TaskEntry { id, future });
498        self.schedule();
499        id
500    }
501
502    fn cancel_task(&self, id: u64) {
503        let mut tasks = self.tasks.borrow_mut();
504        if tasks.iter().any(|entry| entry.id == id) {
505            tasks.retain(|entry| entry.id != id);
506        }
507    }
508
509    fn poll_async_tasks(&self) -> bool {
510        let waker = match self.task_waker.borrow().as_ref() {
511            Some(waker) => waker.clone(),
512            None => return false,
513        };
514        let mut cx = Context::from_waker(&waker);
515        let mut tasks_ref = self.tasks.borrow_mut();
516        let tasks = std::mem::take(&mut *tasks_ref);
517        drop(tasks_ref);
518        let mut pending = Vec::with_capacity(tasks.len());
519        let mut made_progress = false;
520        for mut entry in tasks.into_iter() {
521            match entry.future.as_mut().poll(&mut cx) {
522                Poll::Ready(()) => {
523                    made_progress = true;
524                }
525                Poll::Pending => {
526                    pending.push(entry);
527                }
528            }
529        }
530        if !pending.is_empty() {
531            self.tasks.borrow_mut().extend(pending);
532        }
533        made_progress
534    }
535
536    fn drain_ui(&self) {
537        loop {
538            let mut executed = false;
539
540            {
541                let rx = &mut *self.ui_rx.borrow_mut();
542                for message in rx.try_iter() {
543                    executed = true;
544                    let _guard = PendingGuard::new(&self.ui_dispatcher.pending);
545                    match message {
546                        UiMessage::Task(task) => {
547                            task();
548                        }
549                        UiMessage::Invoke { id, value } => {
550                            self.invoke_ui_cont(id, value);
551                        }
552                    }
553                }
554            }
555
556            loop {
557                let task = {
558                    let mut local = self.local_tasks.borrow_mut();
559                    local.pop_front()
560                };
561
562                match task {
563                    Some(task) => {
564                        executed = true;
565                        task();
566                    }
567                    None => break,
568                }
569            }
570
571            if self.poll_async_tasks() {
572                executed = true;
573            }
574
575            if !executed {
576                break;
577            }
578        }
579    }
580
581    fn has_pending_ui(&self) -> bool {
582        let local_pending = self
583            .local_tasks
584            .try_borrow()
585            .map(|tasks| !tasks.is_empty())
586            .unwrap_or(true);
587
588        let async_pending = self
589            .tasks
590            .try_borrow()
591            .map(|tasks| !tasks.is_empty())
592            .unwrap_or(true);
593
594        local_pending || self.ui_dispatcher.has_pending() || async_pending
595    }
596
597    fn register_ui_cont<T: 'static>(&self, f: impl FnOnce(T) + 'static) -> u64 {
598        debug_assert_eq!(
599            std::thread::current().id(),
600            self.ui_thread_id,
601            "UI continuation registered off the runtime thread",
602        );
603        let id = self.next_cont_id.get();
604        self.next_cont_id.set(id + 1);
605        let callback = RefCell::new(Some(f));
606        self.ui_conts.borrow_mut().insert(
607            id,
608            Box::new(move |value: Box<dyn Any>| {
609                let slot = callback
610                    .borrow_mut()
611                    .take()
612                    .expect("UI continuation invoked more than once");
613                let value = value
614                    .downcast::<T>()
615                    .expect("UI continuation type mismatch");
616                slot(*value);
617            }),
618        );
619        id
620    }
621
622    fn invoke_ui_cont(&self, id: u64, value: Box<dyn Any + Send>) {
623        debug_assert_eq!(
624            std::thread::current().id(),
625            self.ui_thread_id,
626            "UI continuation invoked off the runtime thread",
627        );
628        if let Some(callback) = self.ui_conts.borrow_mut().remove(&id) {
629            let value: Box<dyn Any> = value;
630            callback(value);
631        }
632    }
633
634    fn cancel_ui_cont(&self, id: u64) {
635        self.ui_conts.borrow_mut().remove(&id);
636    }
637
638    fn register_frame_callback(&self, callback: Box<dyn FnOnce(u64) + 'static>) -> FrameCallbackId {
639        let id = self.next_frame_callback_id.get();
640        self.next_frame_callback_id.set(id + 1);
641        self.frame_callbacks
642            .borrow_mut()
643            .push_back(FrameCallbackEntry {
644                id,
645                callback: Some(callback),
646            });
647        self.schedule();
648        id
649    }
650
651    fn cancel_frame_callback(&self, id: FrameCallbackId) {
652        let mut callbacks = self.frame_callbacks.borrow_mut();
653        if let Some(index) = callbacks.iter().position(|entry| entry.id == id) {
654            callbacks.remove(index);
655        }
656        let callbacks_empty = callbacks.is_empty();
657        drop(callbacks);
658        let local_pending = self
659            .local_tasks
660            .try_borrow()
661            .map(|tasks| !tasks.is_empty())
662            .unwrap_or(true);
663        let async_pending = self
664            .tasks
665            .try_borrow()
666            .map(|tasks| !tasks.is_empty())
667            .unwrap_or(true);
668        if !self.has_invalid_scopes()
669            && !self.has_updates()
670            && callbacks_empty
671            && !local_pending
672            && !self.ui_dispatcher.has_pending()
673            && !async_pending
674        {
675            *self.needs_frame.borrow_mut() = false;
676        }
677    }
678
679    fn drain_frame_callbacks(&self, frame_time_nanos: u64) {
680        let mut callbacks = self.frame_callbacks.borrow_mut();
681        let mut pending: Vec<Box<dyn FnOnce(u64) + 'static>> = Vec::with_capacity(callbacks.len());
682        while let Some(mut entry) = callbacks.pop_front() {
683            if let Some(callback) = entry.callback.take() {
684                pending.push(callback);
685            }
686        }
687        drop(callbacks);
688
689        // Wrap ALL frame callbacks in a single mutable snapshot so state changes
690        // are properly applied to the global snapshot and visible to subsequent reads.
691        // Using a single snapshot for all callbacks avoids stack exhaustion from
692        // repeated snapshot creation in long-running animation loops.
693        if !pending.is_empty() {
694            let _ = crate::run_in_mutable_snapshot(|| {
695                for callback in pending {
696                    callback(frame_time_nanos);
697                }
698            });
699        }
700
701        if !self.has_invalid_scopes()
702            && !self.has_updates()
703            && !self.has_frame_callbacks()
704            && !self.has_pending_ui()
705        {
706            *self.needs_frame.borrow_mut() = false;
707        }
708    }
709
710    fn debug_stats(&self) -> RuntimeDebugStats {
711        let node_updates = self.node_updates.borrow();
712        let invalid_scopes = self.invalid_scopes.borrow();
713        let scope_queue = self.scope_queue.borrow();
714        let frame_callbacks = self.frame_callbacks.borrow();
715        let local_tasks = self.local_tasks.borrow();
716        let ui_conts = self.ui_conts.borrow();
717        let tasks = self.tasks.borrow();
718        let external_state_owners = self.external_state_owners.borrow();
719
720        RuntimeDebugStats {
721            node_updates_len: node_updates.len(),
722            node_updates_cap: node_updates.capacity(),
723            invalid_scopes_len: invalid_scopes.len(),
724            invalid_scopes_cap: invalid_scopes.capacity(),
725            scope_queue_len: scope_queue.len(),
726            scope_queue_cap: scope_queue.capacity(),
727            frame_callbacks_len: frame_callbacks.len(),
728            frame_callbacks_cap: frame_callbacks.capacity(),
729            local_tasks_len: local_tasks.len(),
730            local_tasks_cap: local_tasks.capacity(),
731            ui_conts_len: ui_conts.len(),
732            ui_conts_cap: ui_conts.capacity(),
733            tasks_len: tasks.len(),
734            tasks_cap: tasks.capacity(),
735            external_state_owners_len: external_state_owners.len(),
736            external_state_owners_cap: external_state_owners.capacity(),
737            ui_dispatcher_pending: self.ui_dispatcher.pending.load(Ordering::SeqCst),
738        }
739    }
740}
741
742#[derive(Clone)]
743pub struct Runtime {
744    inner: Rc<RuntimeInner>,
745}
746
747impl Runtime {
748    pub fn new(scheduler: Arc<dyn RuntimeScheduler>) -> Self {
749        let inner = Rc::new(RuntimeInner::new(scheduler));
750        RuntimeInner::init_task_waker(&inner);
751        let runtime = Self { inner };
752        let handle = runtime.handle();
753        register_runtime_handle(&handle);
754        LAST_RUNTIME.with(|slot| *slot.borrow_mut() = Some(handle));
755        runtime
756    }
757
758    pub fn handle(&self) -> RuntimeHandle {
759        RuntimeHandle {
760            inner: Rc::downgrade(&self.inner),
761            dispatcher: UiDispatcher::new(self.inner.ui_dispatcher.clone()),
762            ui_thread_id: self.inner.ui_thread_id,
763            id: self.inner.runtime_id,
764        }
765    }
766
767    pub fn has_updates(&self) -> bool {
768        self.inner.has_updates()
769    }
770
771    pub fn needs_frame(&self) -> bool {
772        *self.inner.needs_frame.borrow() || self.inner.ui_dispatcher.has_pending()
773    }
774
775    pub fn set_needs_frame(&self, value: bool) {
776        *self.inner.needs_frame.borrow_mut() = value;
777    }
778
779    #[cfg(any(feature = "internal", test))]
780    pub fn frame_clock(&self) -> FrameClock {
781        FrameClock::new(self.handle())
782    }
783}
784
785impl Drop for Runtime {
786    fn drop(&mut self) {
787        if Rc::strong_count(&self.inner) != 1 {
788            return;
789        }
790        unregister_runtime_handle(self.inner.runtime_id);
791        LAST_RUNTIME.with(|slot| {
792            let should_clear = slot
793                .borrow()
794                .as_ref()
795                .is_some_and(|handle| handle.id() == self.inner.runtime_id);
796            if should_clear {
797                *slot.borrow_mut() = None;
798            }
799        });
800    }
801}
802
803#[derive(Default)]
804pub struct DefaultScheduler;
805
806impl RuntimeScheduler for DefaultScheduler {
807    fn schedule_frame(&self) {}
808}
809
/// Test-only scheduler that ignores every frame request.
#[cfg(test)]
#[derive(Default)]
pub struct TestScheduler;

#[cfg(test)]
impl RuntimeScheduler for TestScheduler {
    fn schedule_frame(&self) {
        // Intentionally a no-op.
    }
}
818
/// Test-only wrapper owning a runtime driven by `TestScheduler`.
#[cfg(test)]
pub struct TestRuntime {
    runtime: Runtime,
}

#[cfg(test)]
impl Default for TestRuntime {
    fn default() -> Self {
        TestRuntime::new()
    }
}

#[cfg(test)]
impl TestRuntime {
    /// Creates a fresh runtime backed by a `TestScheduler`.
    pub fn new() -> Self {
        let runtime = Runtime::new(Arc::new(TestScheduler));
        TestRuntime { runtime }
    }

    /// Returns a handle to the wrapped runtime.
    pub fn handle(&self) -> RuntimeHandle {
        self.runtime.handle()
    }
}
843
844#[derive(Clone)]
845pub struct RuntimeHandle {
846    inner: Weak<RuntimeInner>,
847    dispatcher: UiDispatcher,
848    ui_thread_id: ThreadId,
849    id: RuntimeId,
850}
851
852pub struct TaskHandle {
853    id: u64,
854    runtime: RuntimeHandle,
855}
856
857struct DeferredStateRelease {
858    runtime: RuntimeHandle,
859    id: StateId,
860}
861
862pub(crate) struct StateHandleLease {
863    id: StateId,
864    runtime: RuntimeHandle,
865}
866
867impl StateHandleLease {
868    pub(crate) fn id(&self) -> StateId {
869        self.id
870    }
871
872    pub(crate) fn runtime(&self) -> RuntimeHandle {
873        self.runtime.clone()
874    }
875}
876
877impl Drop for StateHandleLease {
878    fn drop(&mut self) {
879        defer_state_release(self.runtime.clone(), self.id);
880    }
881}
882
883impl RuntimeHandle {
884    pub fn id(&self) -> RuntimeId {
885        self.id
886    }
887
888    pub(crate) fn alloc_state<T: Clone + 'static>(&self, value: T) -> Rc<StateHandleLease> {
889        let id = self.with_state_arena(|arena| arena.alloc(value, self.clone()));
890        let lease = Rc::new(StateHandleLease {
891            id,
892            runtime: self.clone(),
893        });
894        self.with_state_arena(|arena| arena.register_lease(id, &lease));
895        lease
896    }
897
898    pub(crate) fn alloc_state_with_policy<T: Clone + 'static>(
899        &self,
900        value: T,
901        policy: Arc<dyn MutationPolicy<T>>,
902    ) -> Rc<StateHandleLease> {
903        let id =
904            self.with_state_arena(|arena| arena.alloc_with_policy(value, self.clone(), policy));
905        let lease = Rc::new(StateHandleLease {
906            id,
907            runtime: self.clone(),
908        });
909        self.with_state_arena(|arena| arena.register_lease(id, &lease));
910        lease
911    }
912
913    pub(crate) fn alloc_persistent_state<T: Clone + 'static>(
914        &self,
915        value: T,
916    ) -> crate::MutableState<T> {
917        let lease = self.alloc_state(value);
918        if let Some(inner) = self.inner.upgrade() {
919            inner
920                .external_state_owners
921                .borrow_mut()
922                .insert(lease.id(), Rc::clone(&lease));
923        }
924        crate::MutableState::from_lease(&lease)
925    }
926
927    pub(crate) fn retain_state_lease(&self, id: StateId) -> Option<Rc<StateHandleLease>> {
928        self.with_state_arena(|arena| arena.retain_lease(id))
929    }
930
931    pub(crate) fn with_state_arena<R>(&self, f: impl FnOnce(&StateArena) -> R) -> R {
932        self.inner
933            .upgrade()
934            .map(|inner| f(&inner.state_arena))
935            .unwrap_or_else(|| panic!("runtime dropped"))
936    }
937
938    fn release_state_immediate(&self, id: StateId) {
939        if let Some(inner) = self.inner.upgrade() {
940            inner.state_arena.release(id);
941        }
942    }
943
944    pub fn state_arena_stats(&self) -> (usize, usize) {
945        self.with_state_arena(StateArena::stats)
946    }
947
948    pub fn state_arena_debug_stats(&self) -> StateArenaDebugStats {
949        self.with_state_arena(StateArena::debug_stats)
950    }
951
952    pub fn debug_stats(&self) -> RuntimeDebugStats {
953        self.inner
954            .upgrade()
955            .map(|inner| inner.debug_stats())
956            .unwrap_or_default()
957    }
958
959    pub(crate) fn unregister_state_scope(&self, id: StateId, scope_id: ScopeId) {
960        if let Some(inner) = self.inner.upgrade() {
961            inner.state_arena.unregister_scope(id, scope_id);
962        }
963    }
964
965    pub fn schedule(&self) {
966        if let Some(inner) = self.inner.upgrade() {
967            inner.schedule();
968        }
969    }
970
971    pub(crate) fn enqueue_node_update(&self, command: Command) {
972        if let Some(inner) = self.inner.upgrade() {
973            inner.enqueue_update(command);
974        }
975    }
976
977    /// Schedules work that must run on the runtime thread.
978    ///
979    /// The closure executes on the UI thread immediately when the runtime
980    /// drains its local queue, so it may capture `Rc`/`RefCell` values. Calling
981    /// this from any other thread is a logic error and will panic in debug
982    /// builds via the inner assertion.
983    pub fn enqueue_ui_task(&self, task: Box<dyn FnOnce() + 'static>) {
984        if let Some(inner) = self.inner.upgrade() {
985            inner.enqueue_ui_task(task);
986        } else {
987            task();
988        }
989    }
990
991    pub fn spawn_ui<F>(&self, fut: F) -> Option<TaskHandle>
992    where
993        F: Future<Output = ()> + 'static,
994    {
995        self.inner.upgrade().map(|inner| {
996            let id = inner.spawn_ui_task(Box::pin(fut));
997            TaskHandle {
998                id,
999                runtime: self.clone(),
1000            }
1001        })
1002    }
1003
1004    pub fn cancel_task(&self, id: u64) {
1005        if let Some(inner) = self.inner.upgrade() {
1006            inner.cancel_task(id);
1007        }
1008    }
1009
1010    /// Enqueues work from any thread to run on the UI thread.
1011    ///
1012    /// The closure must be `Send` because it may cross threads before executing
1013    /// on the runtime thread. Use this when posting from background work.
1014    pub fn post_ui(&self, task: impl FnOnce() + Send + 'static) {
1015        self.dispatcher.post(task);
1016    }
1017
1018    pub fn register_ui_cont<T: 'static>(&self, f: impl FnOnce(T) + 'static) -> Option<u64> {
1019        self.inner.upgrade().map(|inner| inner.register_ui_cont(f))
1020    }
1021
1022    pub fn cancel_ui_cont(&self, id: u64) {
1023        if let Some(inner) = self.inner.upgrade() {
1024            inner.cancel_ui_cont(id);
1025        }
1026    }
1027
1028    pub fn drain_ui(&self) {
1029        if let Some(inner) = self.inner.upgrade() {
1030            inner.drain_ui();
1031        }
1032    }
1033
1034    pub fn has_pending_ui(&self) -> bool {
1035        self.inner
1036            .upgrade()
1037            .map(|inner| inner.has_pending_ui())
1038            .unwrap_or_else(|| self.dispatcher.has_pending())
1039    }
1040
1041    pub fn register_frame_callback(
1042        &self,
1043        callback: impl FnOnce(u64) + 'static,
1044    ) -> Option<FrameCallbackId> {
1045        self.inner
1046            .upgrade()
1047            .map(|inner| inner.register_frame_callback(Box::new(callback)))
1048    }
1049
1050    pub fn cancel_frame_callback(&self, id: FrameCallbackId) {
1051        if let Some(inner) = self.inner.upgrade() {
1052            inner.cancel_frame_callback(id);
1053        }
1054    }
1055
1056    pub fn drain_frame_callbacks(&self, frame_time_nanos: u64) {
1057        if let Some(inner) = self.inner.upgrade() {
1058            inner.drain_frame_callbacks(frame_time_nanos);
1059        }
1060    }
1061
1062    #[cfg(any(feature = "internal", test))]
1063    pub fn frame_clock(&self) -> FrameClock {
1064        FrameClock::new(self.clone())
1065    }
1066
1067    pub fn set_needs_frame(&self, value: bool) {
1068        if let Some(inner) = self.inner.upgrade() {
1069            *inner.needs_frame.borrow_mut() = value;
1070        }
1071    }
1072
1073    pub(crate) fn take_updates(&self) -> Vec<Command> {
1074        self.inner
1075            .upgrade()
1076            .map(|inner| inner.take_updates())
1077            .unwrap_or_default()
1078    }
1079
1080    pub fn has_updates(&self) -> bool {
1081        self.inner
1082            .upgrade()
1083            .map(|inner| inner.has_updates())
1084            .unwrap_or(false)
1085    }
1086
1087    pub(crate) fn mark_scope_recomposed(&self, id: ScopeId) {
1088        if let Some(inner) = self.inner.upgrade() {
1089            inner.mark_scope_recomposed(id);
1090        }
1091    }
1092
1093    pub(crate) fn register_invalid_scope(&self, id: ScopeId, scope: Weak<RecomposeScopeInner>) {
1094        if let Some(inner) = self.inner.upgrade() {
1095            inner.register_invalid_scope(id, scope);
1096        }
1097    }
1098
1099    pub(crate) fn take_invalidated_scopes(&self) -> Vec<(ScopeId, Weak<RecomposeScopeInner>)> {
1100        self.inner
1101            .upgrade()
1102            .map(|inner| inner.take_invalidated_scopes())
1103            .unwrap_or_default()
1104    }
1105
1106    pub fn has_invalid_scopes(&self) -> bool {
1107        self.inner
1108            .upgrade()
1109            .map(|inner| inner.has_invalid_scopes())
1110            .unwrap_or(false)
1111    }
1112
1113    #[doc(hidden)]
1114    pub fn debug_invalid_scope_ids(&self) -> Vec<usize> {
1115        self.inner
1116            .upgrade()
1117            .map(|inner| inner.invalid_scopes.borrow().iter().copied().collect())
1118            .unwrap_or_default()
1119    }
1120
1121    pub fn has_frame_callbacks(&self) -> bool {
1122        self.inner
1123            .upgrade()
1124            .map(|inner| inner.has_frame_callbacks())
1125            .unwrap_or(false)
1126    }
1127
1128    pub fn assert_ui_thread(&self) {
1129        debug_assert_eq!(
1130            std::thread::current().id(),
1131            self.ui_thread_id,
1132            "state mutated off the runtime's UI thread"
1133        );
1134    }
1135
1136    pub fn dispatcher(&self) -> UiDispatcher {
1137        self.dispatcher.clone()
1138    }
1139
1140    #[doc(hidden)]
1141    pub fn with_deferred_state_releases<R>(&self, f: impl FnOnce() -> R) -> R {
1142        let _scope = enter_state_teardown_scope();
1143        f()
1144    }
1145}
1146
1147impl TaskHandle {
1148    pub fn cancel(self) {
1149        self.runtime.cancel_task(self.id);
1150    }
1151}
1152
1153pub(crate) struct FrameCallbackEntry {
1154    id: FrameCallbackId,
1155    callback: Option<Box<dyn FnOnce(u64) + 'static>>,
1156}
1157
1158struct RuntimeTaskWaker {
1159    scheduler: Arc<dyn RuntimeScheduler>,
1160}
1161
1162impl RuntimeTaskWaker {
1163    fn new(inner: Weak<RuntimeInner>) -> Self {
1164        // Extract the Arc<RuntimeScheduler> which IS Send+Sync
1165        // This way we can wake the runtime without storing the Rc::Weak
1166        let scheduler = inner
1167            .upgrade()
1168            .map(|rc| rc.scheduler.clone())
1169            .expect("RuntimeInner dropped before waker created");
1170        Self { scheduler }
1171    }
1172
1173    fn into_waker(self) -> Waker {
1174        futures_task::waker(Arc::new(self))
1175    }
1176}
1177
1178impl futures_task::ArcWake for RuntimeTaskWaker {
1179    fn wake_by_ref(arc_self: &Arc<Self>) {
1180        arc_self.scheduler.schedule_frame();
1181    }
1182}
1183
1184thread_local! {
1185    static ACTIVE_RUNTIMES: RefCell<Vec<RuntimeHandle>> = const { RefCell::new(Vec::new()) };
1186    static LAST_RUNTIME: RefCell<Option<RuntimeHandle>> = const { RefCell::new(None) };
1187    static REGISTERED_RUNTIMES: RefCell<HashMap<RuntimeId, RuntimeHandle>> = RefCell::new(HashMap::default());
1188    static STATE_TEARDOWN_DEPTH: Cell<usize> = const { Cell::new(0) };
1189    static DEFERRED_STATE_RELEASES: RefCell<Vec<DeferredStateRelease>> = const { RefCell::new(Vec::new()) };
1190}
1191
/// Length/capacity snapshot of this thread's runtime bookkeeping collections.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct RuntimeThreadLocalDebugStats {
    /// Number of handles on the active-runtime stack.
    pub active_runtimes_len: usize,
    /// Allocated capacity of the active-runtime stack.
    pub active_runtimes_cap: usize,
    /// Number of entries in the registered-runtime map.
    pub registered_runtimes_len: usize,
    /// Allocated capacity of the registered-runtime map.
    pub registered_runtimes_cap: usize,
    /// Number of queued deferred state releases.
    pub deferred_state_releases_len: usize,
    /// Allocated capacity of the deferred state release queue.
    pub deferred_state_releases_cap: usize,
}
1201
1202/// Gets the current runtime handle from thread-local storage.
1203///
1204/// Returns the most recently pushed active runtime, or the last known runtime.
1205/// Used by fling animation and other components that need access to the runtime.
1206pub fn current_runtime_handle() -> Option<RuntimeHandle> {
1207    if let Some(handle) = ACTIVE_RUNTIMES.with(|stack| stack.borrow().last().cloned()) {
1208        return Some(handle);
1209    }
1210    LAST_RUNTIME.with(|slot| slot.borrow().clone())
1211}
1212
1213pub(crate) fn runtime_handle_by_id(id: RuntimeId) -> Option<RuntimeHandle> {
1214    REGISTERED_RUNTIMES.with(|registry| registry.borrow().get(&id).cloned())
1215}
1216
1217pub fn debug_runtime_thread_local_stats() -> RuntimeThreadLocalDebugStats {
1218    let (active_runtimes_len, active_runtimes_cap) = ACTIVE_RUNTIMES.with(|stack| {
1219        let stack = stack.borrow();
1220        (stack.len(), stack.capacity())
1221    });
1222    let (registered_runtimes_len, registered_runtimes_cap) = REGISTERED_RUNTIMES.with(|registry| {
1223        let registry = registry.borrow();
1224        (registry.len(), registry.capacity())
1225    });
1226    let (deferred_state_releases_len, deferred_state_releases_cap) =
1227        DEFERRED_STATE_RELEASES.with(|releases| {
1228            let releases = releases.borrow();
1229            (releases.len(), releases.capacity())
1230        });
1231
1232    RuntimeThreadLocalDebugStats {
1233        active_runtimes_len,
1234        active_runtimes_cap,
1235        registered_runtimes_len,
1236        registered_runtimes_cap,
1237        deferred_state_releases_len,
1238        deferred_state_releases_cap,
1239    }
1240}
1241
1242fn register_runtime_handle(handle: &RuntimeHandle) {
1243    REGISTERED_RUNTIMES.with(|registry| {
1244        registry.borrow_mut().insert(handle.id(), handle.clone());
1245    });
1246}
1247
1248fn unregister_runtime_handle(id: RuntimeId) {
1249    REGISTERED_RUNTIMES.with(|registry| {
1250        registry.borrow_mut().remove(&id);
1251    });
1252}
1253
1254fn defer_state_release(runtime: RuntimeHandle, id: StateId) {
1255    let teardown_active = STATE_TEARDOWN_DEPTH.with(|depth| depth.get() > 0);
1256    if teardown_active {
1257        DEFERRED_STATE_RELEASES.with(|releases| {
1258            releases
1259                .borrow_mut()
1260                .push(DeferredStateRelease { runtime, id });
1261        });
1262    } else {
1263        runtime.release_state_immediate(id);
1264    }
1265}
1266
1267fn flush_deferred_state_releases() {
1268    DEFERRED_STATE_RELEASES.with(|releases| {
1269        let mut releases = releases.borrow_mut();
1270        while let Some(deferred) = releases.pop() {
1271            deferred.runtime.release_state_immediate(deferred.id);
1272        }
1273    });
1274}
1275
/// Guard for one open state-teardown scope on the current thread.
///
/// Dropping the guard decrements the thread's teardown depth. When the depth
/// reaches zero, the queued state releases are flushed. The guard must
/// therefore be bound to a variable for as long as the scope should stay open.
#[must_use = "the teardown scope ends as soon as this guard is dropped"]
pub(crate) struct StateTeardownScope;
1277
1278pub(crate) fn enter_state_teardown_scope() -> StateTeardownScope {
1279    STATE_TEARDOWN_DEPTH.with(|depth| depth.set(depth.get() + 1));
1280    StateTeardownScope
1281}
1282
1283impl Drop for StateTeardownScope {
1284    fn drop(&mut self) {
1285        STATE_TEARDOWN_DEPTH.with(|depth| {
1286            let next = depth.get().saturating_sub(1);
1287            depth.set(next);
1288            if next == 0 {
1289                flush_deferred_state_releases();
1290            }
1291        });
1292    }
1293}
1294
1295pub(crate) fn push_active_runtime(handle: &RuntimeHandle) {
1296    register_runtime_handle(handle);
1297    ACTIVE_RUNTIMES.with(|stack| stack.borrow_mut().push(handle.clone()));
1298    LAST_RUNTIME.with(|slot| *slot.borrow_mut() = Some(handle.clone()));
1299}
1300
1301pub(crate) fn pop_active_runtime() {
1302    ACTIVE_RUNTIMES.with(|stack| {
1303        stack.borrow_mut().pop();
1304    });
1305}
1306
1307/// Schedule a new frame render using the most recently active runtime handle.
1308pub fn schedule_frame() {
1309    if let Some(handle) = current_runtime_handle() {
1310        handle.schedule();
1311        return;
1312    }
1313    panic!("no runtime available to schedule frame");
1314}
1315
1316/// Schedule an in-place node update using the most recently active runtime.
1317pub fn schedule_node_update(
1318    update: impl FnOnce(&mut dyn Applier) -> Result<(), NodeError> + 'static,
1319) {
1320    let handle = current_runtime_handle().expect("no runtime available to schedule node update");
1321    handle.enqueue_node_update(Command::callback(update));
1322}