1#![allow(clippy::type_complexity)]
3
4use crate::collections::map::{HashMap, HashSet};
5use crate::hash::default as default_hash;
6use crate::snapshot_v2::{register_apply_observer, ReadObserver, StateObjectId};
7use crate::state::StateObject;
8use crate::{RecomposeScope, RecomposeScopeInner, ScopeId};
9use smallvec::SmallVec;
10use std::any::{Any, TypeId};
11use std::cell::{Cell, RefCell};
12use std::hash::{Hash, Hasher};
13use std::rc::{Rc, Weak};
14use std::sync::Arc;
15
16type Executor = dyn Fn(Box<dyn FnOnce() + 'static>) + 'static;
18
19#[derive(Clone)]
32pub struct SnapshotStateObserver {
33 inner: Rc<SnapshotStateObserverInner>,
34}
35
36#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
37pub struct SnapshotStateObserverDebugStats {
38 pub scopes_len: usize,
39 pub scopes_cap: usize,
40 pub fast_scopes_len: usize,
41 pub fast_scopes_cap: usize,
42 pub stateless_scope_count: usize,
43 pub observed_state_count: usize,
44 pub observed_state_capacity: usize,
45}
46
47impl SnapshotStateObserver {
48 pub fn new(on_changed_executor: impl Fn(Box<dyn FnOnce() + 'static>) + 'static) -> Self {
50 let inner = Rc::new(SnapshotStateObserverInner::new(on_changed_executor));
51 inner.set_self(Rc::downgrade(&inner));
52 Self { inner }
53 }
54
55 pub fn observe_reads<T, R>(
61 &self,
62 scope: T,
63 on_value_changed_for_scope: impl Fn(&T) + 'static,
64 block: impl FnOnce() -> R,
65 ) -> R
66 where
67 T: Any + Clone + Eq + Hash + 'static,
68 {
69 self.inner
70 .observe_reads(scope, on_value_changed_for_scope, block)
71 }
72
73 pub fn begin_frame(&self) {
75 self.inner.begin_frame();
76 }
77
78 pub fn prune_dead_scopes(&self) {
80 self.inner.prune_dead_scopes();
81 }
82
83 pub fn with_no_observations<R>(&self, block: impl FnOnce() -> R) -> R {
85 self.inner.with_no_observations(block)
86 }
87
88 pub fn clear<T>(&self, scope: &T)
90 where
91 T: Any + Eq + Hash + 'static,
92 {
93 self.inner.clear(scope);
94 }
95
96 pub fn clear_if(&self, predicate: impl Fn(&dyn Any) -> bool) {
98 self.inner.clear_if(predicate);
99 }
100
101 pub fn clear_all(&self) {
103 self.inner.clear_all();
104 }
105
106 pub fn start(&self) {
108 let weak = Rc::downgrade(&self.inner);
109 self.inner.start(weak);
110 }
111
112 pub fn stop(&self) {
114 self.inner.stop();
115 }
116
117 pub fn debug_stats(&self) -> SnapshotStateObserverDebugStats {
118 self.inner.debug_stats()
119 }
120
121 #[cfg(test)]
123 pub fn notify_changes(&self, modified: &[Arc<dyn StateObject>]) {
124 self.inner.handle_apply(modified);
125 }
126}
127
128struct SnapshotStateObserverInner {
129 executor: Rc<Executor>,
143 owned_scopes: RefCell<HashMap<OwnedScopeIndexKey, OwnedScopeBucket>>,
144 fast_scopes: RefCell<HashMap<ScopeId, Rc<RefCell<ScopeEntry>>>>,
145 indexed_scopes: RefCell<HashMap<usize, Rc<RefCell<ScopeEntry>>>>,
146 observed_to_scopes: RefCell<HashMap<StateObjectId, HashSet<usize>>>,
147 pause_count: Rc<Cell<usize>>,
148 active_read_targets: Rc<RefCell<Vec<Rc<RefCell<ObservedIds>>>>>,
149 read_dispatcher: ReadObserver,
150 apply_handle: RefCell<Option<crate::snapshot_v2::ObserverHandle>>,
151 weak_self: RefCell<Weak<SnapshotStateObserverInner>>,
152 frame_version: Cell<u64>,
153 next_entry_id: Cell<usize>,
154}
155
156#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
157struct OwnedScopeIndexKey {
158 type_id: TypeId,
159 value_hash: u64,
160}
161
162type OwnedScopeBucket = SmallVec<[Rc<RefCell<ScopeEntry>>; 1]>;
163
164fn owned_scope_index_key<T>(scope: &T) -> OwnedScopeIndexKey
165where
166 T: Any + Hash + 'static,
167{
168 let mut hasher = default_hash::new();
169 scope.hash(&mut hasher);
170 OwnedScopeIndexKey {
171 type_id: TypeId::of::<T>(),
172 value_hash: hasher.finish(),
173 }
174}
175
176impl SnapshotStateObserverInner {
177 const MIN_RETAINED_SCOPE_CAPACITY: usize = 256;
178
179 fn new(on_changed_executor: impl Fn(Box<dyn FnOnce() + 'static>) + 'static) -> Self {
180 let pause_count = Rc::new(Cell::new(0));
181 let active_read_targets = Rc::new(RefCell::new(Vec::<Rc<RefCell<ObservedIds>>>::new()));
182 let dispatcher_pause_count = Rc::clone(&pause_count);
183 let dispatcher_targets = Rc::clone(&active_read_targets);
184 let read_dispatcher: ReadObserver = Arc::new(move |state| {
185 if dispatcher_pause_count.get() > 0 {
186 return;
187 }
188 let observed = dispatcher_targets.borrow().last().cloned();
189 if let Some(observed) = observed {
190 observed.borrow_mut().insert(state.object_id().as_usize());
191 }
192 });
193
194 Self {
195 executor: Rc::new(on_changed_executor),
196 owned_scopes: RefCell::new(HashMap::default()),
197 fast_scopes: RefCell::new(HashMap::default()),
198 indexed_scopes: RefCell::new(HashMap::default()),
199 observed_to_scopes: RefCell::new(HashMap::default()),
200 pause_count,
201 active_read_targets,
202 read_dispatcher,
203 apply_handle: RefCell::new(None),
204 weak_self: RefCell::new(Weak::new()),
205 frame_version: Cell::new(0),
206 next_entry_id: Cell::new(0),
207 }
208 }
209
210 fn set_self(&self, weak: Weak<SnapshotStateObserverInner>) {
211 self.weak_self.replace(weak);
212 }
213
214 fn begin_frame(&self) {
215 let next = self.frame_version.get().wrapping_add(1);
216 self.frame_version.set(next);
217 self.prune_dead_scopes();
218 }
219
220 fn observe_reads<T, R>(
221 &self,
222 scope: T,
223 on_value_changed_for_scope: impl Fn(&T) + 'static,
224 block: impl FnOnce() -> R,
225 ) -> R
226 where
227 T: Any + Clone + Eq + Hash + 'static,
228 {
229 let frame_version = self.frame_version.get();
230 let has_frame_version = frame_version != 0;
231
232 let on_changed: Rc<dyn Fn(&dyn Any)> = {
233 let callback = Rc::new(on_value_changed_for_scope);
234 Rc::new(move |scope_any: &dyn Any| {
235 if let Some(typed) = scope_any.downcast_ref::<T>() {
236 callback(typed);
237 }
238 })
239 };
240
241 let existing_entry = self.find_scope_entry(&scope);
242 if let Some(entry) = existing_entry.as_ref() {
243 let already_observed = {
244 let mut entry_mut = entry.borrow_mut();
245 entry_mut.update(scope.clone(), on_changed.clone());
246 has_frame_version && entry_mut.last_seen_version == frame_version
247 };
248 if already_observed {
249 return block();
250 }
251 }
252
253 let observed = Rc::new(RefCell::new(ObservedIds::new()));
254 self.active_read_targets
255 .borrow_mut()
256 .push(Rc::clone(&observed));
257 struct ActiveObservationGuard {
258 stack: Rc<RefCell<Vec<Rc<RefCell<ObservedIds>>>>>,
259 }
260 impl Drop for ActiveObservationGuard {
261 fn drop(&mut self) {
262 self.stack.borrow_mut().pop();
263 }
264 }
265 let _guard = ActiveObservationGuard {
266 stack: Rc::clone(&self.active_read_targets),
267 };
268
269 let result = self.run_with_read_observer(block);
270
271 if observed.borrow().is_empty() {
272 if existing_entry.is_some() {
273 self.clear(&scope);
274 }
275 return result;
276 }
277
278 let observed = {
279 let mut observed = observed.borrow_mut();
280 std::mem::replace(&mut *observed, ObservedIds::new())
281 };
282 let entry = existing_entry
283 .unwrap_or_else(|| self.insert_scope_entry(scope.clone(), on_changed.clone()));
284 {
285 let mut entry_mut = entry.borrow_mut();
286 entry_mut.update(scope, on_changed);
287 entry_mut.last_seen_version = if has_frame_version {
288 frame_version
289 } else {
290 u64::MAX
291 };
292 }
293 self.replace_observed_ids(&entry, observed);
294
295 result
296 }
297
298 fn with_no_observations<R>(&self, block: impl FnOnce() -> R) -> R {
299 self.pause_count.set(self.pause_count.get() + 1);
300 let result = block();
301 self.pause_count
302 .set(self.pause_count.get().saturating_sub(1));
303 result
304 }
305
306 fn clear<T>(&self, scope: &T)
307 where
308 T: Any + Eq + Hash + 'static,
309 {
310 if let Some(rc_scope) = (scope as &dyn Any).downcast_ref::<RecomposeScope>() {
311 if let Some(entry) = self.fast_scopes.borrow_mut().remove(&rc_scope.id()) {
312 self.unregister_entry(&entry);
313 }
314 return;
315 }
316
317 let removed = self.remove_owned_scope_entry(scope);
318 if let Some(entry) = removed {
319 self.unregister_entry(&entry);
320 }
321 }
322
323 fn clear_if(&self, predicate: impl Fn(&dyn Any) -> bool) {
324 let removed_fast = {
325 let mut fast_scopes = self.fast_scopes.borrow_mut();
326 let removed_ids: Vec<_> = fast_scopes
327 .iter()
328 .filter(|(_, entry)| entry.borrow().matches_predicate(&predicate))
329 .map(|(scope_id, _)| *scope_id)
330 .collect();
331 removed_ids
332 .into_iter()
333 .filter_map(|scope_id| fast_scopes.remove(&scope_id))
334 .collect::<Vec<_>>()
335 };
336 let removed_owned =
337 { self.partition_owned_scopes(|entry| entry.matches_predicate(&predicate)) };
338
339 for entry in removed_fast.into_iter().chain(removed_owned) {
340 self.unregister_entry(&entry);
341 }
342 }
343
344 fn clear_all(&self) {
345 self.fast_scopes.borrow_mut().clear();
346 self.owned_scopes.borrow_mut().clear();
347 self.indexed_scopes.borrow_mut().clear();
348 self.observed_to_scopes.borrow_mut().clear();
349 }
350
351 fn start(&self, weak_self: Weak<SnapshotStateObserverInner>) {
352 if self.apply_handle.borrow().is_some() {
353 return;
354 }
355
356 let handle = register_apply_observer(Rc::new(move |modified, _snapshot_id| {
357 if let Some(inner) = weak_self.upgrade() {
358 inner.handle_apply(modified);
359 }
360 }));
361 self.apply_handle.replace(Some(handle));
362 }
363
364 fn stop(&self) {
365 if let Some(handle) = self.apply_handle.borrow_mut().take() {
366 drop(handle);
367 }
368 }
369
370 fn find_scope_entry<T>(&self, scope: &T) -> Option<Rc<RefCell<ScopeEntry>>>
371 where
372 T: Any + Eq + Hash + 'static,
373 {
374 if let Some(scope) = (scope as &dyn Any).downcast_ref::<RecomposeScope>() {
375 return self.fast_scopes.borrow().get(&scope.id()).cloned();
376 }
377
378 self.find_owned_scope_entry(scope)
379 }
380
381 fn insert_scope_entry(
382 &self,
383 scope: impl Any + Clone + Eq + Hash + 'static,
384 on_changed: Rc<dyn Fn(&dyn Any)>,
385 ) -> Rc<RefCell<ScopeEntry>> {
386 let entry_id = self.next_entry_id.get();
387 self.next_entry_id.set(entry_id.wrapping_add(1));
388 let recompose_scope_id = (&scope as &dyn Any)
389 .downcast_ref::<RecomposeScope>()
390 .map(RecomposeScope::id);
391 let owned_scope_key = recompose_scope_id
392 .is_none()
393 .then(|| owned_scope_index_key(&scope));
394 let entry = Rc::new(RefCell::new(ScopeEntry::new(entry_id, scope, on_changed)));
395 self.indexed_scopes
396 .borrow_mut()
397 .insert(entry_id, Rc::clone(&entry));
398 if let Some(scope_id) = recompose_scope_id {
399 self.fast_scopes
400 .borrow_mut()
401 .insert(scope_id, Rc::clone(&entry));
402 } else if let Some(scope_key) = owned_scope_key {
403 self.owned_scopes
404 .borrow_mut()
405 .entry(scope_key)
406 .or_default()
407 .push(Rc::clone(&entry));
408 }
409 entry
410 }
411
412 fn prune_dead_scopes(&self) {
413 let removed_fast = {
414 let mut fast_scopes = self.fast_scopes.borrow_mut();
415 let removed_ids: Vec<_> = fast_scopes
416 .iter()
417 .filter(|(_, entry)| !entry.borrow().should_retain())
418 .map(|(scope_id, _)| *scope_id)
419 .collect();
420 let removed = removed_ids
421 .into_iter()
422 .filter_map(|scope_id| fast_scopes.remove(&scope_id))
423 .collect::<Vec<_>>();
424 shrink_map_if_sparse(&mut fast_scopes, Self::MIN_RETAINED_SCOPE_CAPACITY);
425 removed
426 };
427
428 let removed_owned = { self.partition_owned_scopes(|entry| !entry.should_retain()) };
429
430 for entry in removed_fast.into_iter().chain(removed_owned) {
431 self.unregister_entry(&entry);
432 }
433 }
434
435 fn find_owned_scope_entry<T>(&self, scope: &T) -> Option<Rc<RefCell<ScopeEntry>>>
436 where
437 T: Any + Eq + Hash + 'static,
438 {
439 let key = owned_scope_index_key(scope);
440 self.owned_scopes.borrow().get(&key).and_then(|bucket| {
441 bucket
442 .iter()
443 .find(|entry| entry.borrow().matches_scope(scope))
444 .cloned()
445 })
446 }
447
448 fn remove_owned_scope_entry<T>(&self, scope: &T) -> Option<Rc<RefCell<ScopeEntry>>>
449 where
450 T: Any + Eq + Hash + 'static,
451 {
452 let key = owned_scope_index_key(scope);
453 let mut owned_scopes = self.owned_scopes.borrow_mut();
454 let mut removed = None;
455 let mut remove_bucket = false;
456 if let Some(bucket) = owned_scopes.get_mut(&key) {
457 if let Some(index) = bucket
458 .iter()
459 .position(|entry| entry.borrow().matches_scope(scope))
460 {
461 removed = Some(bucket.remove(index));
462 remove_bucket = bucket.is_empty();
463 }
464 }
465 if remove_bucket {
466 owned_scopes.remove(&key);
467 }
468 shrink_map_if_sparse(&mut owned_scopes, Self::MIN_RETAINED_SCOPE_CAPACITY);
469 removed
470 }
471
472 fn partition_owned_scopes(
473 &self,
474 should_remove: impl Fn(&ScopeEntry) -> bool,
475 ) -> Vec<Rc<RefCell<ScopeEntry>>> {
476 let mut owned_scopes = self.owned_scopes.borrow_mut();
477 let mut retained = HashMap::default();
478 let mut removed = Vec::new();
479 for (key, mut bucket) in owned_scopes.drain() {
480 let mut retained_bucket = OwnedScopeBucket::new();
481 for entry in bucket.drain(..) {
482 if should_remove(&entry.borrow()) {
483 removed.push(entry);
484 } else {
485 retained_bucket.push(entry);
486 }
487 }
488 if !retained_bucket.is_empty() {
489 retained.insert(key, retained_bucket);
490 }
491 }
492 *owned_scopes = retained;
493 shrink_map_if_sparse(&mut owned_scopes, Self::MIN_RETAINED_SCOPE_CAPACITY);
494 removed
495 }
496
497 fn debug_stats(&self) -> SnapshotStateObserverDebugStats {
498 let owned_scopes = self.owned_scopes.borrow();
499 let fast_scopes = self.fast_scopes.borrow();
500 let owned_scope_len = owned_scopes.values().map(SmallVec::len).sum::<usize>();
501 let owned_scope_cap =
502 owned_scopes.capacity() + owned_scopes.values().map(SmallVec::capacity).sum::<usize>();
503 let scopes_len = owned_scope_len + fast_scopes.len();
504 let scopes_cap = owned_scope_cap + fast_scopes.capacity();
505 let mut observed_state_count = 0;
506 let mut observed_state_capacity = 0;
507 let mut stateless_scope_count = 0;
508
509 for entry in owned_scopes
510 .values()
511 .flat_map(|bucket| bucket.iter())
512 .chain(fast_scopes.values())
513 {
514 let entry = entry.borrow();
515 observed_state_count += entry.observed.len();
516 observed_state_capacity += entry.observed.capacity();
517 stateless_scope_count += usize::from(entry.observed.is_empty());
518 }
519
520 SnapshotStateObserverDebugStats {
521 scopes_len,
522 scopes_cap,
523 fast_scopes_len: fast_scopes.len(),
524 fast_scopes_cap: fast_scopes.capacity(),
525 stateless_scope_count,
526 observed_state_count,
527 observed_state_capacity,
528 }
529 }
530
531 fn run_with_read_observer<R>(&self, block: impl FnOnce() -> R) -> R {
532 use crate::snapshot_v2::take_transparent_observer_mutable_snapshot;
535
536 let snapshot =
539 take_transparent_observer_mutable_snapshot(Some(self.read_dispatcher.clone()), None);
540 let result = snapshot.enter(block);
541 snapshot.dispose();
542 result
543 }
544
545 fn handle_apply(&self, modified: &[Arc<dyn StateObject>]) {
546 if modified.is_empty() {
547 return;
548 }
549
550 let mut seen_scope_ids = HashSet::default();
551 let mut to_notify: Vec<Rc<RefCell<ScopeEntry>>> = Vec::new();
552 {
553 let observed_to_scopes = self.observed_to_scopes.borrow();
554 let indexed_scopes = self.indexed_scopes.borrow();
555 for state in modified {
556 if let Some(scope_ids) = observed_to_scopes.get(&state.object_id().as_usize()) {
557 let mut ordered_scope_ids: SmallVec<[usize; 8]> =
558 scope_ids.iter().copied().collect();
559 ordered_scope_ids.sort_unstable();
560 for scope_id in ordered_scope_ids {
561 if seen_scope_ids.insert(scope_id) {
562 if let Some(entry) = indexed_scopes.get(&scope_id) {
563 to_notify.push(entry.clone());
564 }
565 }
566 }
567 }
568 }
569 }
570
571 if to_notify.is_empty() {
572 return;
573 }
574
575 for entry in to_notify {
576 let executor = self.executor.clone();
577 executor(Box::new(move || {
578 if let Ok(entry) = entry.try_borrow() {
579 entry.notify();
580 }
581 }));
582 }
583 }
584
585 fn replace_observed_ids(&self, entry: &Rc<RefCell<ScopeEntry>>, observed: ObservedIds) {
586 let (entry_id, previous) = {
587 let mut entry_mut = entry.borrow_mut();
588 let entry_id = entry_mut.id;
589 let previous = std::mem::replace(&mut entry_mut.observed, observed);
590 (entry_id, previous)
591 };
592 self.unregister_observed_ids(entry_id, &previous);
593 let entry_ref = entry.borrow();
594 self.register_observed_ids(entry_id, &entry_ref.observed);
595 }
596
597 fn register_observed_ids(&self, entry_id: usize, observed: &ObservedIds) {
598 let mut observed_to_scopes = self.observed_to_scopes.borrow_mut();
599 for &state_id in observed.iter() {
600 let scope_ids = observed_to_scopes.entry(state_id).or_default();
601 scope_ids.insert(entry_id);
602 }
603 }
604
605 fn unregister_observed_ids(&self, entry_id: usize, observed: &ObservedIds) {
606 let mut observed_to_scopes = self.observed_to_scopes.borrow_mut();
607 let mut emptied = SmallVec::<[StateObjectId; MAX_OBSERVED_STATES]>::new();
608 for &state_id in observed.iter() {
609 if let Some(scope_ids) = observed_to_scopes.get_mut(&state_id) {
610 scope_ids.remove(&entry_id);
611 if scope_ids.is_empty() {
612 emptied.push(state_id);
613 }
614 }
615 }
616 for state_id in emptied {
617 observed_to_scopes.remove(&state_id);
618 }
619 shrink_map_if_sparse(&mut observed_to_scopes, Self::MIN_RETAINED_SCOPE_CAPACITY);
620 }
621
622 fn unregister_entry(&self, entry: &Rc<RefCell<ScopeEntry>>) {
623 let (entry_id, observed) = {
624 let mut entry_mut = entry.borrow_mut();
625 let observed = std::mem::replace(&mut entry_mut.observed, ObservedIds::new());
626 (entry_mut.id, observed)
627 };
628 self.unregister_observed_ids(entry_id, &observed);
629 self.indexed_scopes.borrow_mut().remove(&entry_id);
630 }
631}
632
633fn shrink_map_if_sparse<K, V>(map: &mut HashMap<K, V>, min_retained_capacity: usize)
634where
635 K: Eq + std::hash::Hash,
636{
637 if map.capacity() <= map.len().max(min_retained_capacity).saturating_mul(4) {
638 return;
639 }
640
641 let retained = map.len().max(min_retained_capacity);
642 let mut rebuilt = HashMap::default();
643 rebuilt.reserve(retained);
644 rebuilt.extend(map.drain());
645 *map = rebuilt;
646}
647
648enum ObservedIds {
649 Small(SmallVec<[StateObjectId; MAX_OBSERVED_STATES]>),
650 Large(HashSet<StateObjectId>),
651}
652
653impl ObservedIds {
654 fn new() -> Self {
655 ObservedIds::Small(SmallVec::new())
656 }
657
658 fn insert(&mut self, id: StateObjectId) {
659 match self {
660 ObservedIds::Small(small) => {
661 if small.contains(&id) {
662 return;
663 }
664 if small.len() < MAX_OBSERVED_STATES {
665 small.push(id);
666 } else {
667 let mut large =
668 HashSet::with_capacity_and_hasher(small.len() + 1, Default::default());
669 for existing in small.iter() {
670 large.insert(*existing);
671 }
672 large.insert(id);
673 *self = ObservedIds::Large(large);
674 }
675 }
676 ObservedIds::Large(large) => {
677 large.insert(id);
678 }
679 }
680 }
681
682 fn is_empty(&self) -> bool {
683 match self {
684 ObservedIds::Small(small) => small.is_empty(),
685 ObservedIds::Large(large) => large.is_empty(),
686 }
687 }
688
689 fn len(&self) -> usize {
690 match self {
691 ObservedIds::Small(small) => small.len(),
692 ObservedIds::Large(large) => large.len(),
693 }
694 }
695
696 fn capacity(&self) -> usize {
697 match self {
698 ObservedIds::Small(small) => small.capacity(),
699 ObservedIds::Large(large) => large.capacity(),
700 }
701 }
702
703 fn iter(&self) -> Box<dyn Iterator<Item = &StateObjectId> + '_> {
704 match self {
705 ObservedIds::Small(small) => Box::new(small.iter()),
706 ObservedIds::Large(large) => Box::new(large.iter()),
707 }
708 }
709}
710
711const MAX_OBSERVED_STATES: usize = 8;
713
714enum ScopeStorage {
715 Owned(Box<dyn Any>),
716 RecomposeScope {
717 id: ScopeId,
718 weak: Weak<RecomposeScopeInner>,
719 },
720}
721
722struct ScopeEntry {
723 id: usize,
724 scope: ScopeStorage,
725 on_changed: Rc<dyn Fn(&dyn Any)>,
726 observed: ObservedIds,
727 last_seen_version: u64,
728}
729
730impl ScopeEntry {
731 fn new<T>(id: usize, scope: T, on_changed: Rc<dyn Fn(&dyn Any)>) -> Self
732 where
733 T: Any + 'static,
734 {
735 Self {
736 id,
737 scope: ScopeStorage::from_value(scope),
738 on_changed,
739 observed: ObservedIds::new(),
740 last_seen_version: u64::MAX,
741 }
742 }
743
744 fn update<T>(&mut self, new_scope: T, on_changed: Rc<dyn Fn(&dyn Any)>)
745 where
746 T: Any + 'static,
747 {
748 self.scope = ScopeStorage::from_value(new_scope);
749 self.on_changed = on_changed;
750 }
751
752 fn matches_scope<T>(&self, scope: &T) -> bool
753 where
754 T: Any + Eq + 'static,
755 {
756 if let Some(scope) = (scope as &dyn Any).downcast_ref::<RecomposeScope>() {
757 return matches!(
758 &self.scope,
759 ScopeStorage::RecomposeScope { id, .. } if *id == scope.id()
760 );
761 }
762
763 match &self.scope {
764 ScopeStorage::Owned(stored) => stored
765 .downcast_ref::<T>()
766 .map(|stored| stored == scope)
767 .unwrap_or(false),
768 ScopeStorage::RecomposeScope { .. } => false,
769 }
770 }
771
772 fn matches_predicate(&self, predicate: &impl Fn(&dyn Any) -> bool) -> bool {
773 match &self.scope {
774 ScopeStorage::Owned(scope) => predicate(scope.as_ref()),
775 ScopeStorage::RecomposeScope { weak, .. } => weak
776 .upgrade()
777 .map(|inner| predicate(&RecomposeScope { inner }))
778 .unwrap_or(true),
779 }
780 }
781
782 fn should_retain(&self) -> bool {
783 match &self.scope {
784 ScopeStorage::Owned(_) => true,
785 ScopeStorage::RecomposeScope { weak, .. } => weak.upgrade().is_some(),
786 }
787 }
788
789 fn notify(&self) {
790 match &self.scope {
791 ScopeStorage::Owned(scope) => (self.on_changed)(scope.as_ref()),
792 ScopeStorage::RecomposeScope { weak, .. } => {
793 if let Some(inner) = weak.upgrade() {
794 (self.on_changed)(&RecomposeScope { inner });
795 }
796 }
797 }
798 }
799}
800
801impl ScopeStorage {
802 fn from_value<T>(value: T) -> Self
803 where
804 T: Any + 'static,
805 {
806 let any = &value as &dyn Any;
807 if let Some(scope) = any.downcast_ref::<RecomposeScope>() {
808 Self::RecomposeScope {
809 id: scope.id(),
810 weak: scope.downgrade(),
811 }
812 } else {
813 Self::Owned(Box::new(value))
814 }
815 }
816}
817
818#[cfg(test)]
819mod tests {
820 use super::*;
821 use crate::snapshot_v2::take_mutable_snapshot;
822 use crate::snapshot_v2::{reset_runtime_for_tests, TestRuntimeGuard};
823 use crate::state::{NeverEqual, SnapshotMutableState};
824 use std::cell::Cell;
825
826 fn reset_runtime() -> TestRuntimeGuard {
827 reset_runtime_for_tests()
828 }
829
830 #[derive(Clone, Eq, Hash, PartialEq)]
831 struct TestScope(&'static str);
832
833 #[test]
834 fn notifies_scope_when_state_changes() {
835 let _guard = reset_runtime();
836
837 let state = SnapshotMutableState::new_in_arc(0, Arc::new(NeverEqual));
838 let triggered = Rc::new(Cell::new(0));
839 let observer_trigger = triggered.clone();
840
841 let observer = SnapshotStateObserver::new(|callback| callback());
842 observer.start();
843
844 let scope = TestScope("scope");
845 observer.observe_reads(
846 scope.clone(),
847 move |_| {
848 observer_trigger.set(observer_trigger.get() + 1);
849 },
850 || {
851 let _ = state.get();
852 },
853 );
854
855 let snapshot = take_mutable_snapshot(None, None);
856 snapshot.enter(|| {
857 state.set(1);
858 });
859 snapshot.apply().check();
860
861 assert_eq!(triggered.get(), 1);
862 observer.stop();
863 }
864
865 #[test]
866 fn clear_removes_scope_observation() {
867 let _guard = reset_runtime();
868
869 let state = SnapshotMutableState::new_in_arc(0, Arc::new(NeverEqual));
870 let triggered = Rc::new(Cell::new(0));
871 let observer_trigger = triggered.clone();
872
873 let observer = SnapshotStateObserver::new(|callback| callback());
874 observer.start();
875
876 let scope = TestScope("scope");
877 observer.observe_reads(
878 scope.clone(),
879 move |_| {
880 observer_trigger.set(observer_trigger.get() + 1);
881 },
882 || {
883 let _ = state.get();
884 },
885 );
886
887 observer.clear(&scope);
888
889 let snapshot = take_mutable_snapshot(None, None);
890 snapshot.enter(|| {
891 state.set(1);
892 });
893 snapshot.apply().check();
894
895 assert_eq!(triggered.get(), 0);
896 observer.stop();
897 }
898
899 #[test]
900 fn repeated_owned_scope_observations_reuse_the_same_entry() {
901 let _guard = reset_runtime();
902
903 let state = SnapshotMutableState::new_in_arc(0, Arc::new(NeverEqual));
904 let observer = SnapshotStateObserver::new(|callback| callback());
905 let scope = TestScope("scope");
906
907 observer.observe_reads(
908 scope.clone(),
909 |_| {},
910 || {
911 let _ = state.get();
912 },
913 );
914 observer.observe_reads(
915 scope,
916 |_| {},
917 || {
918 let _ = state.get();
919 },
920 );
921
922 let stats = observer.debug_stats();
923 assert_eq!(stats.scopes_len, 1);
924 assert_eq!(stats.fast_scopes_len, 0);
925 }
926
927 #[test]
928 fn with_no_observations_skips_reads() {
929 let _guard = reset_runtime();
930
931 let state = SnapshotMutableState::new_in_arc(0, Arc::new(NeverEqual));
932 let triggered = Rc::new(Cell::new(0));
933 let observer_trigger = triggered.clone();
934
935 let observer = SnapshotStateObserver::new(|callback| callback());
936 observer.start();
937
938 let scope = TestScope("scope");
939 observer.observe_reads(
940 scope.clone(),
941 move |_| {
942 observer_trigger.set(observer_trigger.get() + 1);
943 },
944 || {
945 observer.with_no_observations(|| {
946 let _ = state.get();
947 });
948 },
949 );
950
951 let snapshot = take_mutable_snapshot(None, None);
952 snapshot.enter(|| {
953 state.set(1);
954 });
955 snapshot.apply().check();
956
957 assert_eq!(triggered.get(), 0);
958 observer.stop();
959 }
960
961 #[test]
962 fn nested_observe_reads_attributes_state_to_innermost_scope_only() {
963 let _guard = reset_runtime();
964
965 let state = SnapshotMutableState::new_in_arc(0, Arc::new(NeverEqual));
966 let outer_triggered = Rc::new(Cell::new(0));
967 let inner_triggered = Rc::new(Cell::new(0));
968
969 let observer = SnapshotStateObserver::new(|callback| callback());
970 observer.start();
971
972 let outer_scope = TestScope("outer");
973 let inner_scope = TestScope("inner");
974 observer.observe_reads(
975 outer_scope.clone(),
976 {
977 let outer_triggered = Rc::clone(&outer_triggered);
978 move |_| outer_triggered.set(outer_triggered.get() + 1)
979 },
980 || {
981 observer.observe_reads(
982 inner_scope.clone(),
983 {
984 let inner_triggered = Rc::clone(&inner_triggered);
985 move |_| inner_triggered.set(inner_triggered.get() + 1)
986 },
987 || {
988 let _ = state.get();
989 },
990 );
991 },
992 );
993
994 let snapshot = take_mutable_snapshot(None, None);
995 snapshot.enter(|| {
996 state.set(1);
997 });
998 snapshot.apply().check();
999
1000 assert_eq!(outer_triggered.get(), 0);
1001 assert_eq!(inner_triggered.get(), 1);
1002 observer.stop();
1003 }
1004
1005 #[test]
1006 fn clearing_one_scope_keeps_shared_state_registered_for_other_scope() {
1007 let _guard = reset_runtime();
1008
1009 let state = SnapshotMutableState::new_in_arc(0, Arc::new(NeverEqual));
1010 let first_triggered = Rc::new(Cell::new(0));
1011 let second_triggered = Rc::new(Cell::new(0));
1012
1013 let observer = SnapshotStateObserver::new(|callback| callback());
1014 observer.start();
1015
1016 let first_scope = TestScope("first");
1017 let second_scope = TestScope("second");
1018 observer.observe_reads(
1019 first_scope.clone(),
1020 {
1021 let first_triggered = Rc::clone(&first_triggered);
1022 move |_| first_triggered.set(first_triggered.get() + 1)
1023 },
1024 || {
1025 let _ = state.get();
1026 },
1027 );
1028 observer.observe_reads(
1029 second_scope.clone(),
1030 {
1031 let second_triggered = Rc::clone(&second_triggered);
1032 move |_| second_triggered.set(second_triggered.get() + 1)
1033 },
1034 || {
1035 let _ = state.get();
1036 },
1037 );
1038
1039 observer.clear(&first_scope);
1040
1041 let snapshot = take_mutable_snapshot(None, None);
1042 snapshot.enter(|| {
1043 state.set(1);
1044 });
1045 snapshot.apply().check();
1046
1047 assert_eq!(first_triggered.get(), 0);
1048 assert_eq!(second_triggered.get(), 1);
1049 observer.stop();
1050 }
1051
1052 #[test]
1053 fn shared_state_notifies_scopes_in_registration_order() {
1054 let _guard = reset_runtime();
1055
1056 let state = SnapshotMutableState::new_in_arc(0, Arc::new(NeverEqual));
1057 let notifications = Rc::new(RefCell::new(Vec::new()));
1058
1059 let observer = SnapshotStateObserver::new(|callback| callback());
1060 observer.start();
1061
1062 observer.observe_reads(
1063 TestScope("first"),
1064 {
1065 let notifications = Rc::clone(¬ifications);
1066 move |_| notifications.borrow_mut().push("first")
1067 },
1068 || {
1069 let _ = state.get();
1070 },
1071 );
1072 observer.observe_reads(
1073 TestScope("second"),
1074 {
1075 let notifications = Rc::clone(¬ifications);
1076 move |_| notifications.borrow_mut().push("second")
1077 },
1078 || {
1079 let _ = state.get();
1080 },
1081 );
1082
1083 let snapshot = take_mutable_snapshot(None, None);
1084 snapshot.enter(|| {
1085 state.set(1);
1086 });
1087 snapshot.apply().check();
1088
1089 assert_eq!(notifications.borrow().as_slice(), &["first", "second"]);
1090 observer.stop();
1091 }
1092
1093 #[test]
1094 fn stateless_recompose_scope_does_not_retain_observer_entry() {
1095 let _guard = reset_runtime();
1096
1097 let observer = SnapshotStateObserver::new(|callback| callback());
1098 let runtime = crate::TestRuntime::new();
1099 let scope = RecomposeScope::new_for_test(runtime.handle());
1100
1101 observer.observe_reads(scope, |_| {}, || {});
1102
1103 let stats = observer.debug_stats();
1104 assert_eq!(stats.scopes_len, 0);
1105 assert_eq!(stats.fast_scopes_len, 0);
1106 assert_eq!(stats.stateless_scope_count, 0);
1107 }
1108
1109 #[test]
1110 fn scope_that_stops_reading_state_is_removed_immediately() {
1111 let _guard = reset_runtime();
1112
1113 let state = SnapshotMutableState::new_in_arc(0, Arc::new(NeverEqual));
1114 let observer = SnapshotStateObserver::new(|callback| callback());
1115 let runtime = crate::TestRuntime::new();
1116 let scope = RecomposeScope::new_for_test(runtime.handle());
1117 let triggered = Rc::new(Cell::new(0));
1118 let observer_trigger = Rc::clone(&triggered);
1119
1120 observer.observe_reads(
1121 scope.clone(),
1122 move |_| observer_trigger.set(observer_trigger.get() + 1),
1123 || {
1124 let _ = state.get();
1125 },
1126 );
1127
1128 let after_stateful = observer.debug_stats();
1129 assert_eq!(after_stateful.scopes_len, 1);
1130 assert_eq!(after_stateful.fast_scopes_len, 1);
1131
1132 observer.observe_reads(scope, |_| {}, || {});
1133
1134 let after_stateless = observer.debug_stats();
1135 assert_eq!(after_stateless.scopes_len, 0);
1136 assert_eq!(after_stateless.fast_scopes_len, 0);
1137
1138 let snapshot = take_mutable_snapshot(None, None);
1139 snapshot.enter(|| {
1140 state.set(1);
1141 });
1142 snapshot.apply().check();
1143
1144 assert_eq!(triggered.get(), 0);
1145 }
1146
1147 #[test]
1148 fn begin_frame_prunes_dropped_recompose_scope_entries() {
1149 let _guard = reset_runtime();
1150
1151 let state = SnapshotMutableState::new_in_arc(0, Arc::new(NeverEqual));
1152 let observer = SnapshotStateObserver::new(|callback| callback());
1153 let runtime = crate::TestRuntime::new();
1154 let scope = RecomposeScope::new_for_test(runtime.handle());
1155
1156 observer.observe_reads(
1157 scope.clone(),
1158 |_| {},
1159 || {
1160 let _ = state.get();
1161 },
1162 );
1163
1164 let before_prune = observer.debug_stats();
1165 assert_eq!(before_prune.scopes_len, 1);
1166 assert_eq!(before_prune.fast_scopes_len, 1);
1167
1168 drop(scope);
1169 observer.begin_frame();
1170
1171 let after_prune = observer.debug_stats();
1172 assert_eq!(after_prune.scopes_len, 0);
1173 assert_eq!(after_prune.fast_scopes_len, 0);
1174 }
1175}