Skip to main content

cranpose_core/
snapshot_state_observer.rs

1// Complex types are inherent to the observer pattern with nested callbacks and state tracking
2#![allow(clippy::type_complexity)]
3
4use crate::collections::map::{HashMap, HashSet};
5use crate::hash::default as default_hash;
6use crate::snapshot_v2::{register_apply_observer, ReadObserver, StateObjectId};
7use crate::state::StateObject;
8use crate::{RecomposeScope, RecomposeScopeInner, ScopeId};
9use smallvec::SmallVec;
10use std::any::{Any, TypeId};
11use std::cell::{Cell, RefCell};
12use std::hash::{Hash, Hasher};
13use std::rc::{Rc, Weak};
14use std::sync::Arc;
15
/// Type-erased scheduler for change notifications.
///
/// It is handed a boxed one-shot callback and is responsible for invoking it
/// (immediately or later, at the executor's discretion).
type Executor = dyn Fn(Box<dyn FnOnce() + 'static>) + 'static;
18
/// Observer that records state object reads performed inside a given scope and
/// notifies the caller when any of the observed objects change.
///
/// This is a pragmatic Rust translation of Jetpack Compose's
/// `SnapshotStateObserver`. The implementation focuses on the core behaviour
/// needed by the Cranpose runtime:
/// - Tracking state object reads per logical scope.
/// - Reacting to snapshot apply notifications.
/// - Scheduling invalidation callbacks via the supplied executor.
///
/// Advanced features from the Kotlin version (derived state tracking, change
/// coalescing, queue minimisation) are deferred.
///
/// Cloning is cheap: every clone shares the same reference-counted inner state.
#[derive(Clone)]
pub struct SnapshotStateObserver {
    inner: Rc<SnapshotStateObserverInner>,
}
35
/// Point-in-time bookkeeping counters reported by
/// `SnapshotStateObserver::debug_stats`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct SnapshotStateObserverDebugStats {
    /// Number of tracked scope entries: owned-scope entries across all buckets
    /// plus entries in the `RecomposeScope` fast map.
    pub scopes_len: usize,
    /// Capacity of the owned-scope map, plus the capacity of every owned bucket,
    /// plus the capacity of the fast map.
    pub scopes_cap: usize,
    /// Number of entries in the `RecomposeScope` fast map.
    pub fast_scopes_len: usize,
    /// Capacity of the `RecomposeScope` fast map.
    pub fast_scopes_cap: usize,
    /// Number of tracked entries whose observed-state set is empty.
    pub stateless_scope_count: usize,
    /// Sum of observed state ids over all tracked entries.
    pub observed_state_count: usize,
    /// Sum of the observed-set capacities over all tracked entries.
    pub observed_state_capacity: usize,
}
46
impl SnapshotStateObserver {
    /// Create a new observer that schedules callbacks using `on_changed_executor`.
    ///
    /// The executor receives each change notification as a boxed one-shot
    /// closure and is responsible for running it.
    pub fn new(on_changed_executor: impl Fn(Box<dyn FnOnce() + 'static>) + 'static) -> Self {
        let inner = Rc::new(SnapshotStateObserverInner::new(on_changed_executor));
        // Hand the inner state a weak reference to itself.
        inner.set_self(Rc::downgrade(&inner));
        Self { inner }
    }

    /// Observe state object reads performed while executing `block`.
    ///
    /// Subsequent calls to `observe_reads` replace any previously recorded
    /// observations for the provided `scope`. When one of the observed objects
    /// mutates, `on_value_changed_for_scope` will be invoked on the executor.
    ///
    /// If the scope was already observed during the current frame (see
    /// [`begin_frame`](Self::begin_frame)), `block` is run without re-recording
    /// its reads. Returns whatever `block` returns.
    pub fn observe_reads<T, R>(
        &self,
        scope: T,
        on_value_changed_for_scope: impl Fn(&T) + 'static,
        block: impl FnOnce() -> R,
    ) -> R
    where
        T: Any + Clone + Eq + Hash + 'static,
    {
        self.inner
            .observe_reads(scope, on_value_changed_for_scope, block)
    }

    /// Notify the observer that a new composition frame is starting.
    ///
    /// This advances the internal frame version and prunes dead scopes.
    pub fn begin_frame(&self) {
        self.inner.begin_frame();
    }

    /// Drop bookkeeping for scopes that were released during the current frame.
    pub fn prune_dead_scopes(&self) {
        self.inner.prune_dead_scopes();
    }

    /// Temporarily pause read observation while executing `block`.
    ///
    /// Returns whatever `block` returns.
    pub fn with_no_observations<R>(&self, block: impl FnOnce() -> R) -> R {
        self.inner.with_no_observations(block)
    }

    /// Remove any recorded reads for `scope`.
    pub fn clear<T>(&self, scope: &T)
    where
        T: Any + Eq + Hash + 'static,
    {
        self.inner.clear(scope);
    }

    /// Remove recorded reads for scopes that satisfy `predicate`.
    ///
    /// The predicate receives each stored scope as `&dyn Any`.
    pub fn clear_if(&self, predicate: impl Fn(&dyn Any) -> bool) {
        self.inner.clear_if(predicate);
    }

    /// Remove all recorded observations.
    pub fn clear_all(&self) {
        self.inner.clear_all();
    }

    /// Begin listening for snapshot apply notifications.
    ///
    /// Calling this while already started is a no-op.
    pub fn start(&self) {
        // The apply observer only holds a weak reference to the inner state.
        let weak = Rc::downgrade(&self.inner);
        self.inner.start(weak);
    }

    /// Stop listening for snapshot apply notifications.
    pub fn stop(&self) {
        self.inner.stop();
    }

    /// Return a snapshot of the observer's internal bookkeeping counters.
    pub fn debug_stats(&self) -> SnapshotStateObserverDebugStats {
        self.inner.debug_stats()
    }

    /// Test-only helper to simulate snapshot changes.
    #[cfg(test)]
    pub fn notify_changes(&self, modified: &[Arc<dyn StateObject>]) {
        self.inner.handle_apply(modified);
    }
}
127
/// Shared state behind [`SnapshotStateObserver`]; all mutation goes through
/// `Cell`/`RefCell` interior mutability.
struct SnapshotStateObserverInner {
    // Borrowing invariants:
    // - `observe_reads` may push/pop `active_read_targets`, but it must not hold that
    //   RefCell borrow across the user-provided `block`; the scoped guard only keeps
    //   ownership of the target handle, not an active borrow.
    // - `read_dispatcher` borrows `active_read_targets` just long enough to clone the
    //   current target, then mutates only the selected `ObservedIds`.
    // - `handle_apply` snapshots `observed_to_scopes` and `indexed_scopes` into a
    //   separate notification list before borrowing any `ScopeEntry`, so map borrows
    //   are never held while callbacks may inspect or mutate entry state.
    // - `replace_observed_ids`, `clear`, `clear_if`, and `prune_dead_scopes` may borrow
    //   scope maps mutably, but they must release any `ScopeEntry` borrow before removing
    //   the entry from `indexed_scopes`, `fast_scopes`, `owned_scopes`, or
    //   `observed_to_scopes`.
    /// Scheduler that receives each change notification as a boxed closure.
    executor: Rc<Executor>,
    /// Entries for scopes that are not `RecomposeScope`, bucketed by type id and value hash.
    owned_scopes: RefCell<HashMap<OwnedScopeIndexKey, OwnedScopeBucket>>,
    /// Entries for `RecomposeScope` scopes, keyed directly by scope id.
    fast_scopes: RefCell<HashMap<ScopeId, Rc<RefCell<ScopeEntry>>>>,
    /// Every registered entry, keyed by its numeric entry id.
    indexed_scopes: RefCell<HashMap<usize, Rc<RefCell<ScopeEntry>>>>,
    /// Reverse index: state object id -> ids of the entries that read it.
    observed_to_scopes: RefCell<HashMap<StateObjectId, HashSet<usize>>>,
    /// Nesting depth of `with_no_observations`; reads are ignored while it is non-zero.
    pause_count: Rc<Cell<usize>>,
    /// Stack of in-progress observations; a read is recorded into the top-most target.
    active_read_targets: Rc<RefCell<Vec<Rc<RefCell<ObservedIds>>>>>,
    /// Read observer installed on the snapshot used while running an observed block.
    read_dispatcher: ReadObserver,
    /// Registration handle for the apply observer; `Some` while started.
    apply_handle: RefCell<Option<crate::snapshot_v2::ObserverHandle>>,
    /// Weak self-reference installed by `set_self`.
    // NOTE(review): no reader of this field is visible in this part of the file — confirm it is still needed.
    weak_self: RefCell<Weak<SnapshotStateObserverInner>>,
    /// Frame counter advanced by `begin_frame`; `0` means no frame has begun yet.
    frame_version: Cell<u64>,
    /// Id that will be assigned to the next inserted entry.
    next_entry_id: Cell<usize>,
}
155
/// Lookup key for owned (non-`RecomposeScope`) scopes: the scope's concrete type
/// plus the hash of its value. Distinct scopes may share a key, so each key maps
/// to a bucket that is searched by equality.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
struct OwnedScopeIndexKey {
    /// `TypeId` of the scope's concrete type.
    type_id: TypeId,
    /// Hash of the scope value as produced by `owned_scope_index_key`.
    value_hash: u64,
}
161
/// Entries sharing one `OwnedScopeIndexKey`; a single entry is stored inline.
type OwnedScopeBucket = SmallVec<[Rc<RefCell<ScopeEntry>>; 1]>;
163
164fn owned_scope_index_key<T>(scope: &T) -> OwnedScopeIndexKey
165where
166    T: Any + Hash + 'static,
167{
168    let mut hasher = default_hash::new();
169    scope.hash(&mut hasher);
170    OwnedScopeIndexKey {
171        type_id: TypeId::of::<T>(),
172        value_hash: hasher.finish(),
173    }
174}
175
176impl SnapshotStateObserverInner {
    /// Capacity floor passed to `shrink_map_if_sparse` for the scope maps: they are
    /// never rebuilt with less than this much reserved space.
    const MIN_RETAINED_SCOPE_CAPACITY: usize = 256;
178
179    fn new(on_changed_executor: impl Fn(Box<dyn FnOnce() + 'static>) + 'static) -> Self {
180        let pause_count = Rc::new(Cell::new(0));
181        let active_read_targets = Rc::new(RefCell::new(Vec::<Rc<RefCell<ObservedIds>>>::new()));
182        let dispatcher_pause_count = Rc::clone(&pause_count);
183        let dispatcher_targets = Rc::clone(&active_read_targets);
184        let read_dispatcher: ReadObserver = Arc::new(move |state| {
185            if dispatcher_pause_count.get() > 0 {
186                return;
187            }
188            let observed = dispatcher_targets.borrow().last().cloned();
189            if let Some(observed) = observed {
190                observed.borrow_mut().insert(state.object_id().as_usize());
191            }
192        });
193
194        Self {
195            executor: Rc::new(on_changed_executor),
196            owned_scopes: RefCell::new(HashMap::default()),
197            fast_scopes: RefCell::new(HashMap::default()),
198            indexed_scopes: RefCell::new(HashMap::default()),
199            observed_to_scopes: RefCell::new(HashMap::default()),
200            pause_count,
201            active_read_targets,
202            read_dispatcher,
203            apply_handle: RefCell::new(None),
204            weak_self: RefCell::new(Weak::new()),
205            frame_version: Cell::new(0),
206            next_entry_id: Cell::new(0),
207        }
208    }
209
210    fn set_self(&self, weak: Weak<SnapshotStateObserverInner>) {
211        self.weak_self.replace(weak);
212    }
213
214    fn begin_frame(&self) {
215        let next = self.frame_version.get().wrapping_add(1);
216        self.frame_version.set(next);
217        self.prune_dead_scopes();
218    }
219
    /// Runs `block` while recording which state objects it reads, then stores
    /// those reads as the observation set for `scope`.
    ///
    /// - If `scope` already has an entry that was stamped with the current
    ///   (non-zero) frame version, `block` runs directly and nothing is re-recorded.
    /// - If `block` reads nothing, any existing entry for `scope` is cleared.
    /// - Otherwise the entry is created or reused and its observed ids replaced.
    ///
    /// Returns whatever `block` returns.
    fn observe_reads<T, R>(
        &self,
        scope: T,
        on_value_changed_for_scope: impl Fn(&T) + 'static,
        block: impl FnOnce() -> R,
    ) -> R
    where
        T: Any + Clone + Eq + Hash + 'static,
    {
        // A frame version of 0 means `begin_frame` has not been called yet.
        let frame_version = self.frame_version.get();
        let has_frame_version = frame_version != 0;

        // Type-erase the callback: the stored closure receives `&dyn Any` and
        // forwards to the typed callback only when the downcast succeeds.
        let on_changed: Rc<dyn Fn(&dyn Any)> = {
            let callback = Rc::new(on_value_changed_for_scope);
            Rc::new(move |scope_any: &dyn Any| {
                if let Some(typed) = scope_any.downcast_ref::<T>() {
                    callback(typed);
                }
            })
        };

        let existing_entry = self.find_scope_entry(&scope);
        if let Some(entry) = existing_entry.as_ref() {
            // Refresh the stored scope/callback; the entry borrow ends with this
            // inner block, before `block` runs.
            let already_observed = {
                let mut entry_mut = entry.borrow_mut();
                entry_mut.update(scope.clone(), on_changed.clone());
                has_frame_version && entry_mut.last_seen_version == frame_version
            };
            if already_observed {
                // Already recorded during this frame: run without a new read target.
                return block();
            }
        }

        // Push a fresh read target; the guard pops it again when this function
        // exits, without keeping a `RefCell` borrow alive across `block`.
        let observed = Rc::new(RefCell::new(ObservedIds::new()));
        self.active_read_targets
            .borrow_mut()
            .push(Rc::clone(&observed));
        struct ActiveObservationGuard {
            stack: Rc<RefCell<Vec<Rc<RefCell<ObservedIds>>>>>,
        }
        impl Drop for ActiveObservationGuard {
            fn drop(&mut self) {
                self.stack.borrow_mut().pop();
            }
        }
        let _guard = ActiveObservationGuard {
            stack: Rc::clone(&self.active_read_targets),
        };

        let result = self.run_with_read_observer(block);

        // Nothing was read: drop any stale entry for this scope and stop here.
        if observed.borrow().is_empty() {
            if existing_entry.is_some() {
                self.clear(&scope);
            }
            return result;
        }

        // Move the recorded ids out of the shared target.
        let observed = {
            let mut observed = observed.borrow_mut();
            std::mem::replace(&mut *observed, ObservedIds::new())
        };
        let entry = existing_entry
            .unwrap_or_else(|| self.insert_scope_entry(scope.clone(), on_changed.clone()));
        {
            let mut entry_mut = entry.borrow_mut();
            entry_mut.update(scope, on_changed);
            // `u64::MAX` marks an entry recorded outside of any frame.
            entry_mut.last_seen_version = if has_frame_version {
                frame_version
            } else {
                u64::MAX
            };
        }
        self.replace_observed_ids(&entry, observed);

        result
    }
297
298    fn with_no_observations<R>(&self, block: impl FnOnce() -> R) -> R {
299        self.pause_count.set(self.pause_count.get() + 1);
300        let result = block();
301        self.pause_count
302            .set(self.pause_count.get().saturating_sub(1));
303        result
304    }
305
306    fn clear<T>(&self, scope: &T)
307    where
308        T: Any + Eq + Hash + 'static,
309    {
310        if let Some(rc_scope) = (scope as &dyn Any).downcast_ref::<RecomposeScope>() {
311            if let Some(entry) = self.fast_scopes.borrow_mut().remove(&rc_scope.id()) {
312                self.unregister_entry(&entry);
313            }
314            return;
315        }
316
317        let removed = self.remove_owned_scope_entry(scope);
318        if let Some(entry) = removed {
319            self.unregister_entry(&entry);
320        }
321    }
322
323    fn clear_if(&self, predicate: impl Fn(&dyn Any) -> bool) {
324        let removed_fast = {
325            let mut fast_scopes = self.fast_scopes.borrow_mut();
326            let removed_ids: Vec<_> = fast_scopes
327                .iter()
328                .filter(|(_, entry)| entry.borrow().matches_predicate(&predicate))
329                .map(|(scope_id, _)| *scope_id)
330                .collect();
331            removed_ids
332                .into_iter()
333                .filter_map(|scope_id| fast_scopes.remove(&scope_id))
334                .collect::<Vec<_>>()
335        };
336        let removed_owned =
337            { self.partition_owned_scopes(|entry| entry.matches_predicate(&predicate)) };
338
339        for entry in removed_fast.into_iter().chain(removed_owned) {
340            self.unregister_entry(&entry);
341        }
342    }
343
    /// Drops every tracked scope and the whole reverse index by clearing all four
    /// maps. Map capacity is left as-is.
    fn clear_all(&self) {
        self.fast_scopes.borrow_mut().clear();
        self.owned_scopes.borrow_mut().clear();
        self.indexed_scopes.borrow_mut().clear();
        self.observed_to_scopes.borrow_mut().clear();
    }
350
351    fn start(&self, weak_self: Weak<SnapshotStateObserverInner>) {
352        if self.apply_handle.borrow().is_some() {
353            return;
354        }
355
356        let handle = register_apply_observer(Rc::new(move |modified, _snapshot_id| {
357            if let Some(inner) = weak_self.upgrade() {
358                inner.handle_apply(modified);
359            }
360        }));
361        self.apply_handle.replace(Some(handle));
362    }
363
364    fn stop(&self) {
365        if let Some(handle) = self.apply_handle.borrow_mut().take() {
366            drop(handle);
367        }
368    }
369
370    fn find_scope_entry<T>(&self, scope: &T) -> Option<Rc<RefCell<ScopeEntry>>>
371    where
372        T: Any + Eq + Hash + 'static,
373    {
374        if let Some(scope) = (scope as &dyn Any).downcast_ref::<RecomposeScope>() {
375            return self.fast_scopes.borrow().get(&scope.id()).cloned();
376        }
377
378        self.find_owned_scope_entry(scope)
379    }
380
381    fn insert_scope_entry(
382        &self,
383        scope: impl Any + Clone + Eq + Hash + 'static,
384        on_changed: Rc<dyn Fn(&dyn Any)>,
385    ) -> Rc<RefCell<ScopeEntry>> {
386        let entry_id = self.next_entry_id.get();
387        self.next_entry_id.set(entry_id.wrapping_add(1));
388        let recompose_scope_id = (&scope as &dyn Any)
389            .downcast_ref::<RecomposeScope>()
390            .map(RecomposeScope::id);
391        let owned_scope_key = recompose_scope_id
392            .is_none()
393            .then(|| owned_scope_index_key(&scope));
394        let entry = Rc::new(RefCell::new(ScopeEntry::new(entry_id, scope, on_changed)));
395        self.indexed_scopes
396            .borrow_mut()
397            .insert(entry_id, Rc::clone(&entry));
398        if let Some(scope_id) = recompose_scope_id {
399            self.fast_scopes
400                .borrow_mut()
401                .insert(scope_id, Rc::clone(&entry));
402        } else if let Some(scope_key) = owned_scope_key {
403            self.owned_scopes
404                .borrow_mut()
405                .entry(scope_key)
406                .or_default()
407                .push(Rc::clone(&entry));
408        }
409        entry
410    }
411
412    fn prune_dead_scopes(&self) {
413        let removed_fast = {
414            let mut fast_scopes = self.fast_scopes.borrow_mut();
415            let removed_ids: Vec<_> = fast_scopes
416                .iter()
417                .filter(|(_, entry)| !entry.borrow().should_retain())
418                .map(|(scope_id, _)| *scope_id)
419                .collect();
420            let removed = removed_ids
421                .into_iter()
422                .filter_map(|scope_id| fast_scopes.remove(&scope_id))
423                .collect::<Vec<_>>();
424            shrink_map_if_sparse(&mut fast_scopes, Self::MIN_RETAINED_SCOPE_CAPACITY);
425            removed
426        };
427
428        let removed_owned = { self.partition_owned_scopes(|entry| !entry.should_retain()) };
429
430        for entry in removed_fast.into_iter().chain(removed_owned) {
431            self.unregister_entry(&entry);
432        }
433    }
434
435    fn find_owned_scope_entry<T>(&self, scope: &T) -> Option<Rc<RefCell<ScopeEntry>>>
436    where
437        T: Any + Eq + Hash + 'static,
438    {
439        let key = owned_scope_index_key(scope);
440        self.owned_scopes.borrow().get(&key).and_then(|bucket| {
441            bucket
442                .iter()
443                .find(|entry| entry.borrow().matches_scope(scope))
444                .cloned()
445        })
446    }
447
448    fn remove_owned_scope_entry<T>(&self, scope: &T) -> Option<Rc<RefCell<ScopeEntry>>>
449    where
450        T: Any + Eq + Hash + 'static,
451    {
452        let key = owned_scope_index_key(scope);
453        let mut owned_scopes = self.owned_scopes.borrow_mut();
454        let mut removed = None;
455        let mut remove_bucket = false;
456        if let Some(bucket) = owned_scopes.get_mut(&key) {
457            if let Some(index) = bucket
458                .iter()
459                .position(|entry| entry.borrow().matches_scope(scope))
460            {
461                removed = Some(bucket.remove(index));
462                remove_bucket = bucket.is_empty();
463            }
464        }
465        if remove_bucket {
466            owned_scopes.remove(&key);
467        }
468        shrink_map_if_sparse(&mut owned_scopes, Self::MIN_RETAINED_SCOPE_CAPACITY);
469        removed
470    }
471
472    fn partition_owned_scopes(
473        &self,
474        should_remove: impl Fn(&ScopeEntry) -> bool,
475    ) -> Vec<Rc<RefCell<ScopeEntry>>> {
476        let mut owned_scopes = self.owned_scopes.borrow_mut();
477        let mut retained = HashMap::default();
478        let mut removed = Vec::new();
479        for (key, mut bucket) in owned_scopes.drain() {
480            let mut retained_bucket = OwnedScopeBucket::new();
481            for entry in bucket.drain(..) {
482                if should_remove(&entry.borrow()) {
483                    removed.push(entry);
484                } else {
485                    retained_bucket.push(entry);
486                }
487            }
488            if !retained_bucket.is_empty() {
489                retained.insert(key, retained_bucket);
490            }
491        }
492        *owned_scopes = retained;
493        shrink_map_if_sparse(&mut owned_scopes, Self::MIN_RETAINED_SCOPE_CAPACITY);
494        removed
495    }
496
497    fn debug_stats(&self) -> SnapshotStateObserverDebugStats {
498        let owned_scopes = self.owned_scopes.borrow();
499        let fast_scopes = self.fast_scopes.borrow();
500        let owned_scope_len = owned_scopes.values().map(SmallVec::len).sum::<usize>();
501        let owned_scope_cap =
502            owned_scopes.capacity() + owned_scopes.values().map(SmallVec::capacity).sum::<usize>();
503        let scopes_len = owned_scope_len + fast_scopes.len();
504        let scopes_cap = owned_scope_cap + fast_scopes.capacity();
505        let mut observed_state_count = 0;
506        let mut observed_state_capacity = 0;
507        let mut stateless_scope_count = 0;
508
509        for entry in owned_scopes
510            .values()
511            .flat_map(|bucket| bucket.iter())
512            .chain(fast_scopes.values())
513        {
514            let entry = entry.borrow();
515            observed_state_count += entry.observed.len();
516            observed_state_capacity += entry.observed.capacity();
517            stateless_scope_count += usize::from(entry.observed.is_empty());
518        }
519
520        SnapshotStateObserverDebugStats {
521            scopes_len,
522            scopes_cap,
523            fast_scopes_len: fast_scopes.len(),
524            fast_scopes_cap: fast_scopes.capacity(),
525            stateless_scope_count,
526            observed_state_count,
527            observed_state_capacity,
528        }
529    }
530
    /// Runs `block` inside a transparent observer snapshot that reports reads to
    /// `read_dispatcher`, disposes the snapshot, and returns `block`'s result.
    fn run_with_read_observer<R>(&self, block: impl FnOnce() -> R) -> R {
        // Kotlin uses Snapshot.observeInternal which creates a TransparentObserverMutableSnapshot,
        // not a readonly snapshot. This allows writes to happen during observation (composition).
        use crate::snapshot_v2::take_transparent_observer_mutable_snapshot;

        // Create a transparent mutable snapshot (not readonly!) for observation
        // This matches Kotlin's Snapshot.observeInternal behavior
        let snapshot =
            take_transparent_observer_mutable_snapshot(Some(self.read_dispatcher.clone()), None);
        let result = snapshot.enter(block);
        // NOTE(review): if `block` unwinds, this explicit `dispose` is not reached;
        // confirm whether the snapshot type also cleans up when dropped.
        snapshot.dispose();
        result
    }
544
545    fn handle_apply(&self, modified: &[Arc<dyn StateObject>]) {
546        if modified.is_empty() {
547            return;
548        }
549
550        let mut seen_scope_ids = HashSet::default();
551        let mut to_notify: Vec<Rc<RefCell<ScopeEntry>>> = Vec::new();
552        {
553            let observed_to_scopes = self.observed_to_scopes.borrow();
554            let indexed_scopes = self.indexed_scopes.borrow();
555            for state in modified {
556                if let Some(scope_ids) = observed_to_scopes.get(&state.object_id().as_usize()) {
557                    let mut ordered_scope_ids: SmallVec<[usize; 8]> =
558                        scope_ids.iter().copied().collect();
559                    ordered_scope_ids.sort_unstable();
560                    for scope_id in ordered_scope_ids {
561                        if seen_scope_ids.insert(scope_id) {
562                            if let Some(entry) = indexed_scopes.get(&scope_id) {
563                                to_notify.push(entry.clone());
564                            }
565                        }
566                    }
567                }
568            }
569        }
570
571        if to_notify.is_empty() {
572            return;
573        }
574
575        for entry in to_notify {
576            let executor = self.executor.clone();
577            executor(Box::new(move || {
578                if let Ok(entry) = entry.try_borrow() {
579                    entry.notify();
580                }
581            }));
582        }
583    }
584
585    fn replace_observed_ids(&self, entry: &Rc<RefCell<ScopeEntry>>, observed: ObservedIds) {
586        let (entry_id, previous) = {
587            let mut entry_mut = entry.borrow_mut();
588            let entry_id = entry_mut.id;
589            let previous = std::mem::replace(&mut entry_mut.observed, observed);
590            (entry_id, previous)
591        };
592        self.unregister_observed_ids(entry_id, &previous);
593        let entry_ref = entry.borrow();
594        self.register_observed_ids(entry_id, &entry_ref.observed);
595    }
596
597    fn register_observed_ids(&self, entry_id: usize, observed: &ObservedIds) {
598        let mut observed_to_scopes = self.observed_to_scopes.borrow_mut();
599        for &state_id in observed.iter() {
600            let scope_ids = observed_to_scopes.entry(state_id).or_default();
601            scope_ids.insert(entry_id);
602        }
603    }
604
605    fn unregister_observed_ids(&self, entry_id: usize, observed: &ObservedIds) {
606        let mut observed_to_scopes = self.observed_to_scopes.borrow_mut();
607        let mut emptied = SmallVec::<[StateObjectId; MAX_OBSERVED_STATES]>::new();
608        for &state_id in observed.iter() {
609            if let Some(scope_ids) = observed_to_scopes.get_mut(&state_id) {
610                scope_ids.remove(&entry_id);
611                if scope_ids.is_empty() {
612                    emptied.push(state_id);
613                }
614            }
615        }
616        for state_id in emptied {
617            observed_to_scopes.remove(&state_id);
618        }
619        shrink_map_if_sparse(&mut observed_to_scopes, Self::MIN_RETAINED_SCOPE_CAPACITY);
620    }
621
622    fn unregister_entry(&self, entry: &Rc<RefCell<ScopeEntry>>) {
623        let (entry_id, observed) = {
624            let mut entry_mut = entry.borrow_mut();
625            let observed = std::mem::replace(&mut entry_mut.observed, ObservedIds::new());
626            (entry_mut.id, observed)
627        };
628        self.unregister_observed_ids(entry_id, &observed);
629        self.indexed_scopes.borrow_mut().remove(&entry_id);
630    }
631}
632
/// Rebuilds `map` with a smaller allocation when its capacity exceeds four
/// times `max(len, min_retained_capacity)`; otherwise leaves it untouched.
///
/// The rebuilt map reserves room for `max(len, min_retained_capacity)` entries
/// and contains exactly the same key/value pairs.
fn shrink_map_if_sparse<K, V>(map: &mut HashMap<K, V>, min_retained_capacity: usize)
where
    K: Eq + std::hash::Hash,
{
    let target = map.len().max(min_retained_capacity);
    let is_sparse = map.capacity() > target.saturating_mul(4);
    if is_sparse {
        let mut compacted = HashMap::default();
        compacted.reserve(target);
        compacted.extend(map.drain());
        *map = compacted;
    }
}
647
/// Set of state object ids read by one scope.
///
/// Starts as an inline small vector and switches to a hash set once more than
/// `MAX_OBSERVED_STATES` distinct ids have been inserted.
enum ObservedIds {
    /// Up to `MAX_OBSERVED_STATES` distinct ids, kept free of duplicates by `insert`.
    Small(SmallVec<[StateObjectId; MAX_OBSERVED_STATES]>),
    /// Hash-set representation used after the small form overflows.
    Large(HashSet<StateObjectId>),
}
652
653impl ObservedIds {
654    fn new() -> Self {
655        ObservedIds::Small(SmallVec::new())
656    }
657
658    fn insert(&mut self, id: StateObjectId) {
659        match self {
660            ObservedIds::Small(small) => {
661                if small.contains(&id) {
662                    return;
663                }
664                if small.len() < MAX_OBSERVED_STATES {
665                    small.push(id);
666                } else {
667                    let mut large =
668                        HashSet::with_capacity_and_hasher(small.len() + 1, Default::default());
669                    for existing in small.iter() {
670                        large.insert(*existing);
671                    }
672                    large.insert(id);
673                    *self = ObservedIds::Large(large);
674                }
675            }
676            ObservedIds::Large(large) => {
677                large.insert(id);
678            }
679        }
680    }
681
682    fn is_empty(&self) -> bool {
683        match self {
684            ObservedIds::Small(small) => small.is_empty(),
685            ObservedIds::Large(large) => large.is_empty(),
686        }
687    }
688
689    fn len(&self) -> usize {
690        match self {
691            ObservedIds::Small(small) => small.len(),
692            ObservedIds::Large(large) => large.len(),
693        }
694    }
695
696    fn capacity(&self) -> usize {
697        match self {
698            ObservedIds::Small(small) => small.capacity(),
699            ObservedIds::Large(large) => large.capacity(),
700        }
701    }
702
703    fn iter(&self) -> Box<dyn Iterator<Item = &StateObjectId> + '_> {
704        match self {
705            ObservedIds::Small(small) => Box::new(small.iter()),
706            ObservedIds::Large(large) => Box::new(large.iter()),
707        }
708    }
709}
710
// Most scopes observe only a handful of state objects, so up to this many ids are
// kept in inline storage. `ObservedIds` spills into a HashSet beyond that.
const MAX_OBSERVED_STATES: usize = 8;
713
/// How a tracked scope value is stored inside a `ScopeEntry`.
enum ScopeStorage {
    /// Any scope type other than `RecomposeScope`, stored as a boxed, type-erased value.
    Owned(Box<dyn Any>),
    /// A `RecomposeScope`, stored as its id plus a weak handle so the entry does
    /// not keep the scope alive.
    RecomposeScope {
        id: ScopeId,
        weak: Weak<RecomposeScopeInner>,
    },
}
721
/// Bookkeeping for one observed scope.
struct ScopeEntry {
    /// Entry id; the key used in `indexed_scopes` and in the reverse index sets.
    id: usize,
    /// The scope value (or a weak handle to it) passed to `on_changed`.
    scope: ScopeStorage,
    /// Type-erased change callback invoked by `notify`.
    on_changed: Rc<dyn Fn(&dyn Any)>,
    /// Ids of the state objects recorded for this scope.
    observed: ObservedIds,
    /// Frame version at which the reads were last recorded; `u64::MAX` for a new
    /// entry or one recorded before any frame began.
    last_seen_version: u64,
}
729
730impl ScopeEntry {
731    fn new<T>(id: usize, scope: T, on_changed: Rc<dyn Fn(&dyn Any)>) -> Self
732    where
733        T: Any + 'static,
734    {
735        Self {
736            id,
737            scope: ScopeStorage::from_value(scope),
738            on_changed,
739            observed: ObservedIds::new(),
740            last_seen_version: u64::MAX,
741        }
742    }
743
744    fn update<T>(&mut self, new_scope: T, on_changed: Rc<dyn Fn(&dyn Any)>)
745    where
746        T: Any + 'static,
747    {
748        self.scope = ScopeStorage::from_value(new_scope);
749        self.on_changed = on_changed;
750    }
751
752    fn matches_scope<T>(&self, scope: &T) -> bool
753    where
754        T: Any + Eq + 'static,
755    {
756        if let Some(scope) = (scope as &dyn Any).downcast_ref::<RecomposeScope>() {
757            return matches!(
758                &self.scope,
759                ScopeStorage::RecomposeScope { id, .. } if *id == scope.id()
760            );
761        }
762
763        match &self.scope {
764            ScopeStorage::Owned(stored) => stored
765                .downcast_ref::<T>()
766                .map(|stored| stored == scope)
767                .unwrap_or(false),
768            ScopeStorage::RecomposeScope { .. } => false,
769        }
770    }
771
772    fn matches_predicate(&self, predicate: &impl Fn(&dyn Any) -> bool) -> bool {
773        match &self.scope {
774            ScopeStorage::Owned(scope) => predicate(scope.as_ref()),
775            ScopeStorage::RecomposeScope { weak, .. } => weak
776                .upgrade()
777                .map(|inner| predicate(&RecomposeScope { inner }))
778                .unwrap_or(true),
779        }
780    }
781
782    fn should_retain(&self) -> bool {
783        match &self.scope {
784            ScopeStorage::Owned(_) => true,
785            ScopeStorage::RecomposeScope { weak, .. } => weak.upgrade().is_some(),
786        }
787    }
788
789    fn notify(&self) {
790        match &self.scope {
791            ScopeStorage::Owned(scope) => (self.on_changed)(scope.as_ref()),
792            ScopeStorage::RecomposeScope { weak, .. } => {
793                if let Some(inner) = weak.upgrade() {
794                    (self.on_changed)(&RecomposeScope { inner });
795                }
796            }
797        }
798    }
799}
800
801impl ScopeStorage {
802    fn from_value<T>(value: T) -> Self
803    where
804        T: Any + 'static,
805    {
806        let any = &value as &dyn Any;
807        if let Some(scope) = any.downcast_ref::<RecomposeScope>() {
808            Self::RecomposeScope {
809                id: scope.id(),
810                weak: scope.downgrade(),
811            }
812        } else {
813            Self::Owned(Box::new(value))
814        }
815    }
816}
817
818#[cfg(test)]
819mod tests {
820    use super::*;
821    use crate::snapshot_v2::take_mutable_snapshot;
822    use crate::snapshot_v2::{reset_runtime_for_tests, TestRuntimeGuard};
823    use crate::state::{NeverEqual, SnapshotMutableState};
824    use std::cell::Cell;
825
826    fn reset_runtime() -> TestRuntimeGuard {
827        reset_runtime_for_tests()
828    }
829
830    #[derive(Clone, Eq, Hash, PartialEq)]
831    struct TestScope(&'static str);
832
833    #[test]
834    fn notifies_scope_when_state_changes() {
835        let _guard = reset_runtime();
836
837        let state = SnapshotMutableState::new_in_arc(0, Arc::new(NeverEqual));
838        let triggered = Rc::new(Cell::new(0));
839        let observer_trigger = triggered.clone();
840
841        let observer = SnapshotStateObserver::new(|callback| callback());
842        observer.start();
843
844        let scope = TestScope("scope");
845        observer.observe_reads(
846            scope.clone(),
847            move |_| {
848                observer_trigger.set(observer_trigger.get() + 1);
849            },
850            || {
851                let _ = state.get();
852            },
853        );
854
855        let snapshot = take_mutable_snapshot(None, None);
856        snapshot.enter(|| {
857            state.set(1);
858        });
859        snapshot.apply().check();
860
861        assert_eq!(triggered.get(), 1);
862        observer.stop();
863    }
864
865    #[test]
866    fn clear_removes_scope_observation() {
867        let _guard = reset_runtime();
868
869        let state = SnapshotMutableState::new_in_arc(0, Arc::new(NeverEqual));
870        let triggered = Rc::new(Cell::new(0));
871        let observer_trigger = triggered.clone();
872
873        let observer = SnapshotStateObserver::new(|callback| callback());
874        observer.start();
875
876        let scope = TestScope("scope");
877        observer.observe_reads(
878            scope.clone(),
879            move |_| {
880                observer_trigger.set(observer_trigger.get() + 1);
881            },
882            || {
883                let _ = state.get();
884            },
885        );
886
887        observer.clear(&scope);
888
889        let snapshot = take_mutable_snapshot(None, None);
890        snapshot.enter(|| {
891            state.set(1);
892        });
893        snapshot.apply().check();
894
895        assert_eq!(triggered.get(), 0);
896        observer.stop();
897    }
898
899    #[test]
900    fn repeated_owned_scope_observations_reuse_the_same_entry() {
901        let _guard = reset_runtime();
902
903        let state = SnapshotMutableState::new_in_arc(0, Arc::new(NeverEqual));
904        let observer = SnapshotStateObserver::new(|callback| callback());
905        let scope = TestScope("scope");
906
907        observer.observe_reads(
908            scope.clone(),
909            |_| {},
910            || {
911                let _ = state.get();
912            },
913        );
914        observer.observe_reads(
915            scope,
916            |_| {},
917            || {
918                let _ = state.get();
919            },
920        );
921
922        let stats = observer.debug_stats();
923        assert_eq!(stats.scopes_len, 1);
924        assert_eq!(stats.fast_scopes_len, 0);
925    }
926
927    #[test]
928    fn with_no_observations_skips_reads() {
929        let _guard = reset_runtime();
930
931        let state = SnapshotMutableState::new_in_arc(0, Arc::new(NeverEqual));
932        let triggered = Rc::new(Cell::new(0));
933        let observer_trigger = triggered.clone();
934
935        let observer = SnapshotStateObserver::new(|callback| callback());
936        observer.start();
937
938        let scope = TestScope("scope");
939        observer.observe_reads(
940            scope.clone(),
941            move |_| {
942                observer_trigger.set(observer_trigger.get() + 1);
943            },
944            || {
945                observer.with_no_observations(|| {
946                    let _ = state.get();
947                });
948            },
949        );
950
951        let snapshot = take_mutable_snapshot(None, None);
952        snapshot.enter(|| {
953            state.set(1);
954        });
955        snapshot.apply().check();
956
957        assert_eq!(triggered.get(), 0);
958        observer.stop();
959    }
960
961    #[test]
962    fn nested_observe_reads_attributes_state_to_innermost_scope_only() {
963        let _guard = reset_runtime();
964
965        let state = SnapshotMutableState::new_in_arc(0, Arc::new(NeverEqual));
966        let outer_triggered = Rc::new(Cell::new(0));
967        let inner_triggered = Rc::new(Cell::new(0));
968
969        let observer = SnapshotStateObserver::new(|callback| callback());
970        observer.start();
971
972        let outer_scope = TestScope("outer");
973        let inner_scope = TestScope("inner");
974        observer.observe_reads(
975            outer_scope.clone(),
976            {
977                let outer_triggered = Rc::clone(&outer_triggered);
978                move |_| outer_triggered.set(outer_triggered.get() + 1)
979            },
980            || {
981                observer.observe_reads(
982                    inner_scope.clone(),
983                    {
984                        let inner_triggered = Rc::clone(&inner_triggered);
985                        move |_| inner_triggered.set(inner_triggered.get() + 1)
986                    },
987                    || {
988                        let _ = state.get();
989                    },
990                );
991            },
992        );
993
994        let snapshot = take_mutable_snapshot(None, None);
995        snapshot.enter(|| {
996            state.set(1);
997        });
998        snapshot.apply().check();
999
1000        assert_eq!(outer_triggered.get(), 0);
1001        assert_eq!(inner_triggered.get(), 1);
1002        observer.stop();
1003    }
1004
1005    #[test]
1006    fn clearing_one_scope_keeps_shared_state_registered_for_other_scope() {
1007        let _guard = reset_runtime();
1008
1009        let state = SnapshotMutableState::new_in_arc(0, Arc::new(NeverEqual));
1010        let first_triggered = Rc::new(Cell::new(0));
1011        let second_triggered = Rc::new(Cell::new(0));
1012
1013        let observer = SnapshotStateObserver::new(|callback| callback());
1014        observer.start();
1015
1016        let first_scope = TestScope("first");
1017        let second_scope = TestScope("second");
1018        observer.observe_reads(
1019            first_scope.clone(),
1020            {
1021                let first_triggered = Rc::clone(&first_triggered);
1022                move |_| first_triggered.set(first_triggered.get() + 1)
1023            },
1024            || {
1025                let _ = state.get();
1026            },
1027        );
1028        observer.observe_reads(
1029            second_scope.clone(),
1030            {
1031                let second_triggered = Rc::clone(&second_triggered);
1032                move |_| second_triggered.set(second_triggered.get() + 1)
1033            },
1034            || {
1035                let _ = state.get();
1036            },
1037        );
1038
1039        observer.clear(&first_scope);
1040
1041        let snapshot = take_mutable_snapshot(None, None);
1042        snapshot.enter(|| {
1043            state.set(1);
1044        });
1045        snapshot.apply().check();
1046
1047        assert_eq!(first_triggered.get(), 0);
1048        assert_eq!(second_triggered.get(), 1);
1049        observer.stop();
1050    }
1051
1052    #[test]
1053    fn shared_state_notifies_scopes_in_registration_order() {
1054        let _guard = reset_runtime();
1055
1056        let state = SnapshotMutableState::new_in_arc(0, Arc::new(NeverEqual));
1057        let notifications = Rc::new(RefCell::new(Vec::new()));
1058
1059        let observer = SnapshotStateObserver::new(|callback| callback());
1060        observer.start();
1061
1062        observer.observe_reads(
1063            TestScope("first"),
1064            {
1065                let notifications = Rc::clone(&notifications);
1066                move |_| notifications.borrow_mut().push("first")
1067            },
1068            || {
1069                let _ = state.get();
1070            },
1071        );
1072        observer.observe_reads(
1073            TestScope("second"),
1074            {
1075                let notifications = Rc::clone(&notifications);
1076                move |_| notifications.borrow_mut().push("second")
1077            },
1078            || {
1079                let _ = state.get();
1080            },
1081        );
1082
1083        let snapshot = take_mutable_snapshot(None, None);
1084        snapshot.enter(|| {
1085            state.set(1);
1086        });
1087        snapshot.apply().check();
1088
1089        assert_eq!(notifications.borrow().as_slice(), &["first", "second"]);
1090        observer.stop();
1091    }
1092
1093    #[test]
1094    fn stateless_recompose_scope_does_not_retain_observer_entry() {
1095        let _guard = reset_runtime();
1096
1097        let observer = SnapshotStateObserver::new(|callback| callback());
1098        let runtime = crate::TestRuntime::new();
1099        let scope = RecomposeScope::new_for_test(runtime.handle());
1100
1101        observer.observe_reads(scope, |_| {}, || {});
1102
1103        let stats = observer.debug_stats();
1104        assert_eq!(stats.scopes_len, 0);
1105        assert_eq!(stats.fast_scopes_len, 0);
1106        assert_eq!(stats.stateless_scope_count, 0);
1107    }
1108
1109    #[test]
1110    fn scope_that_stops_reading_state_is_removed_immediately() {
1111        let _guard = reset_runtime();
1112
1113        let state = SnapshotMutableState::new_in_arc(0, Arc::new(NeverEqual));
1114        let observer = SnapshotStateObserver::new(|callback| callback());
1115        let runtime = crate::TestRuntime::new();
1116        let scope = RecomposeScope::new_for_test(runtime.handle());
1117        let triggered = Rc::new(Cell::new(0));
1118        let observer_trigger = Rc::clone(&triggered);
1119
1120        observer.observe_reads(
1121            scope.clone(),
1122            move |_| observer_trigger.set(observer_trigger.get() + 1),
1123            || {
1124                let _ = state.get();
1125            },
1126        );
1127
1128        let after_stateful = observer.debug_stats();
1129        assert_eq!(after_stateful.scopes_len, 1);
1130        assert_eq!(after_stateful.fast_scopes_len, 1);
1131
1132        observer.observe_reads(scope, |_| {}, || {});
1133
1134        let after_stateless = observer.debug_stats();
1135        assert_eq!(after_stateless.scopes_len, 0);
1136        assert_eq!(after_stateless.fast_scopes_len, 0);
1137
1138        let snapshot = take_mutable_snapshot(None, None);
1139        snapshot.enter(|| {
1140            state.set(1);
1141        });
1142        snapshot.apply().check();
1143
1144        assert_eq!(triggered.get(), 0);
1145    }
1146
1147    #[test]
1148    fn begin_frame_prunes_dropped_recompose_scope_entries() {
1149        let _guard = reset_runtime();
1150
1151        let state = SnapshotMutableState::new_in_arc(0, Arc::new(NeverEqual));
1152        let observer = SnapshotStateObserver::new(|callback| callback());
1153        let runtime = crate::TestRuntime::new();
1154        let scope = RecomposeScope::new_for_test(runtime.handle());
1155
1156        observer.observe_reads(
1157            scope.clone(),
1158            |_| {},
1159            || {
1160                let _ = state.get();
1161            },
1162        );
1163
1164        let before_prune = observer.debug_stats();
1165        assert_eq!(before_prune.scopes_len, 1);
1166        assert_eq!(before_prune.fast_scopes_len, 1);
1167
1168        drop(scope);
1169        observer.begin_frame();
1170
1171        let after_prune = observer.debug_stats();
1172        assert_eq!(after_prune.scopes_len, 0);
1173        assert_eq!(after_prune.fast_scopes_len, 0);
1174    }
1175}