Skip to main content

cranpose_core/
state.rs

1// StateRecord uses Rc with Cell for single-threaded shared ownership in the snapshot system.
2#![allow(clippy::arc_with_non_send_sync)]
3
4use crate::collections::map::{HashMap, HashSet};
5use crate::debug_trace::debug_record_scope_invalidation;
6use std::any::Any;
7use std::cell::{Cell, RefCell};
8use std::fmt;
9use std::hash::Hash;
10use std::marker::PhantomData;
11use std::ops::Deref;
12use std::rc::{Rc, Weak as RcWeak};
13use std::sync::{Arc, Mutex, Weak};
14
15use crate::snapshot_id_set::{SnapshotId, SnapshotIdSet};
16use crate::snapshot_pinning::lowest_pinned_snapshot;
17use crate::snapshot_v2::{
18    advance_global_snapshot, allocate_record_id, current_snapshot, AnySnapshot, GlobalSnapshot,
19};
20use crate::{runtime, with_current_composer_opt, RecomposeScope, RuntimeHandle, ScopeId, StateId};
21
/// Snapshot ID given to the tail record that `SnapshotMutableState::new_in_arc` creates.
///
/// `used_locked` skips records carrying this ID, so they are never handed out for reuse.
pub(crate) const PREEXISTING_SNAPSHOT_ID: SnapshotId = 1;

/// Snapshot ID marking a record as invalid: `record_is_valid_for` never accepts such a
/// record, and `used_locked` returns it for reuse immediately.
const INVALID_SNAPSHOT_ID: SnapshotId = 0;

/// Maximum snapshot ID used to mark records as invisible during initialization
const SNAPSHOT_ID_MAX: SnapshotId = usize::MAX;
28
/// Opaque identity of a state object, derived from the address of its `Arc` allocation.
///
/// The `Default` value wraps `0`.
#[derive(Clone, Copy, Eq, PartialEq, Hash, Debug, Default)]
pub struct ObjectId(pub(crate) usize);

impl ObjectId {
    /// Builds an id from the data pointer of `object`.
    ///
    /// Any pointer metadata (for unsized `T`) is discarded; only the address is kept.
    pub(crate) fn new<T: ?Sized + 'static>(object: &Arc<T>) -> Self {
        let thin: *const () = Arc::as_ptr(object).cast();
        Self(thin as usize)
    }

    /// Returns the raw address backing this id.
    #[inline]
    pub(crate) fn as_usize(self) -> usize {
        let Self(address) = self;
        address
    }
}
42
/// A record in the state history chain.
///
/// Each record carries the snapshot ID it belongs to, a tombstone flag, a link to the
/// next record in the chain, and a type-erased value. Every field uses interior
/// mutability so a record can be updated through a shared `Rc`.
///
/// # Thread Safety
/// Contains `Cell<T>` which is not `Send`/`Sync`. This is safe because state records
/// are accessed only from the UI thread via thread-local snapshot system. The `Rc`
/// is used for cheap cloning and shared ownership within a single thread.
pub struct StateRecord {
    /// Snapshot ID this record is tagged with; rewritten when the record is reused
    /// or invalidated.
    snapshot_id: Cell<SnapshotId>,
    /// When `true`, `record_is_valid_for` never selects this record.
    tombstone: Cell<bool>,
    /// Link to the next record in the chain; `None` at the tail.
    next: Cell<Option<Rc<StateRecord>>>,
    /// Type-erased payload; `None` after `clear_value`.
    value: RefCell<Option<Box<dyn Any>>>,
}
55
56impl StateRecord {
57    pub(crate) fn new<T: Any>(
58        snapshot_id: SnapshotId,
59        value: T,
60        next: Option<Rc<StateRecord>>,
61    ) -> Rc<Self> {
62        Rc::new(Self {
63            snapshot_id: Cell::new(snapshot_id),
64            tombstone: Cell::new(false),
65            next: Cell::new(next),
66            value: RefCell::new(Some(Box::new(value))),
67        })
68    }
69
70    #[inline]
71    pub(crate) fn snapshot_id(&self) -> SnapshotId {
72        self.snapshot_id.get()
73    }
74
75    #[inline]
76    pub(crate) fn set_snapshot_id(&self, id: SnapshotId) {
77        self.snapshot_id.set(id);
78    }
79
80    #[inline]
81    pub(crate) fn next(&self) -> Option<Rc<StateRecord>> {
82        self.next.take().inspect(|record| {
83            self.next.set(Some(Rc::clone(record)));
84        })
85    }
86
87    #[inline]
88    pub(crate) fn set_next(&self, next: Option<Rc<StateRecord>>) {
89        self.next.set(next);
90    }
91
92    #[inline]
93    pub(crate) fn is_tombstone(&self) -> bool {
94        self.tombstone.get()
95    }
96
97    #[inline]
98    pub(crate) fn set_tombstone(&self, tombstone: bool) {
99        self.tombstone.set(tombstone);
100    }
101
102    pub(crate) fn clear_value(&self) {
103        self.value.borrow_mut().take();
104    }
105
106    pub(crate) fn replace_value<T: Any>(&self, new_value: T) {
107        *self.value.borrow_mut() = Some(Box::new(new_value));
108    }
109
110    pub(crate) fn with_value<T: Any, R>(&self, f: impl FnOnce(&T) -> R) -> R {
111        let guard = self.value.borrow();
112        let value = guard
113            .as_ref()
114            .and_then(|boxed| boxed.downcast_ref::<T>())
115            .expect("StateRecord value missing or wrong type");
116        f(value)
117    }
118
119    /// Clears the value from this record to free memory.
120    /// Used when marking records as reusable - clears the value to reduce memory usage.
121    #[cfg(test)]
122    pub(crate) fn clear_for_reuse(&self) {
123        self.clear_value();
124    }
125
126    /// Copies the value from the source record into this record.
127    ///
128    /// This is used during record reuse to copy valid data from a readable record
129    /// into a reused record, and during cleanup to preserve data in records being
130    /// marked as INVALID_SNAPSHOT.
131    ///
132    /// # Type Safety
133    /// The caller must ensure both records contain values of type `T`.
134    /// Panics if the source record doesn't contain a value of type `T`.
135    pub(crate) fn assign_value<T: Any + Clone>(&self, source: &StateRecord) {
136        let cloned_value = source.with_value(|value: &T| value.clone());
137        self.replace_value(cloned_value);
138    }
139}
140
141impl Drop for StateRecord {
142    fn drop(&mut self) {
143        // Prevent recursive drop of deep chains which can cause stack overflow.
144        // We iteratively detach and drop the next record if we are the sole owner.
145        let mut next = self.next.take();
146        while let Some(node) = next {
147            match Rc::try_unwrap(node) {
148                Ok(record) => {
149                    // We were the last owner. Take its next pointer to continue the loop.
150                    // The record itself will be dropped here, but since next is None,
151                    // it won't recurse.
152                    next = record.next.take();
153                }
154                Err(_) => {
155                    // Someone else holds a reference to this node.
156                    // The chain destruction stops here.
157                    break;
158                }
159            }
160        }
161    }
162}
163
164/// Owns the mutable head pointer for a state's record chain.
165///
166/// Snapshot code clones the current head, prepends new records, and swaps in
167/// replacement heads frequently. Centralizing those operations keeps the
168/// `RefCell<Rc<StateRecord>>` borrow protocol out of the higher-level state
169/// logic.
170struct CurrentRecord {
171    head: RefCell<Rc<StateRecord>>,
172}
173
174impl CurrentRecord {
175    fn new(head: Rc<StateRecord>) -> Self {
176        Self {
177            head: RefCell::new(head),
178        }
179    }
180
181    fn clone_head(&self) -> Rc<StateRecord> {
182        self.head.borrow().clone()
183    }
184
185    fn replace(&self, new_head: Rc<StateRecord>) {
186        *self.head.borrow_mut() = new_head;
187    }
188
189    fn prepend(&self, record: Rc<StateRecord>) {
190        let current_head = self.clone_head();
191        record.set_next(Some(current_head));
192        self.replace(record);
193    }
194}
195
196#[inline]
197fn record_is_valid_for(
198    record: &Rc<StateRecord>,
199    snapshot_id: SnapshotId,
200    invalid: &SnapshotIdSet,
201) -> bool {
202    if record.is_tombstone() {
203        return false;
204    }
205
206    let candidate = record.snapshot_id();
207    if candidate == INVALID_SNAPSHOT_ID || candidate > snapshot_id {
208        return false;
209    }
210
211    candidate == snapshot_id || !invalid.get(candidate)
212}
213
214pub(crate) fn readable_record_for(
215    head: &Rc<StateRecord>,
216    snapshot_id: SnapshotId,
217    invalid: &SnapshotIdSet,
218) -> Option<Rc<StateRecord>> {
219    // Find the highest valid record in the chain.
220    // We must scan the full chain because reused records may not be prepended
221    // as the head but still need to be found (e.g., after writes using record reuse).
222    let mut best: Option<Rc<StateRecord>> = None;
223    let mut cursor = Some(Rc::clone(head));
224
225    while let Some(record) = cursor {
226        if record_is_valid_for(&record, snapshot_id, invalid) {
227            let replace = best
228                .as_ref()
229                .map(|current| current.snapshot_id() < record.snapshot_id())
230                .unwrap_or(true);
231            if replace {
232                best = Some(Rc::clone(&record));
233            }
234        }
235        cursor = record.next();
236    }
237
238    best
239}
240
241/// Finds the youngest record in the chain, or the first one matching the predicate.
242///
243/// Searches the record chain starting from the given head:
244/// - If a record matches the predicate, returns it immediately
245/// - Otherwise, tracks the youngest record (highest snapshot_id) and returns it
246fn find_youngest_or<F>(head: &Rc<StateRecord>, predicate: F) -> Rc<StateRecord>
247where
248    F: Fn(&Rc<StateRecord>) -> bool,
249{
250    let mut current = Some(Rc::clone(head));
251    let mut youngest = Rc::clone(head);
252
253    while let Some(record) = current {
254        if predicate(&record) {
255            return record;
256        }
257        if youngest.snapshot_id() < record.snapshot_id() {
258            youngest = Rc::clone(&record);
259        }
260        current = record.next();
261    }
262
263    youngest
264}
265
266/// Finds a StateRecord that can be safely reused because no open snapshot can see it.
267///
268/// Returns a record that either:
269/// 1. Is marked as INVALID_SNAPSHOT (abandoned/tombstone)
270/// 2. Is obscured by a newer record (both are below the reuse limit)
271///
272/// The reuse limit is `lowest_pinned_snapshot - 1`, meaning any record with a snapshot ID
273/// at or below this value cannot be selected by any currently open snapshot.
274///
275/// Note: PREEXISTING records (snapshot_id=1) are never reused to maintain the ability
276/// for all snapshots to read the initial state.
277pub(crate) fn used_locked(head: &Rc<StateRecord>) -> Option<Rc<StateRecord>> {
278    let mut current = Some(Rc::clone(head));
279    let mut valid_record: Option<Rc<StateRecord>> = None;
280
281    // Calculate reuse limit: records below this ID are invisible to all open snapshots
282    let reuse_limit = lowest_pinned_snapshot()
283        .map(|lowest| lowest.saturating_sub(1))
284        .unwrap_or_else(|| allocate_record_id().saturating_sub(1));
285
286    let invalid = SnapshotIdSet::EMPTY;
287
288    while let Some(record) = current {
289        let current_id = record.snapshot_id();
290
291        // Never reuse PREEXISTING records - they must always be available as a fallback
292        if current_id == PREEXISTING_SNAPSHOT_ID {
293            current = record.next();
294            continue;
295        }
296
297        // Fast path: records marked INVALID_SNAPSHOT can be reused immediately
298        if current_id == INVALID_SNAPSHOT_ID {
299            return Some(record);
300        }
301
302        if record.is_tombstone() && current_id < reuse_limit {
303            return Some(record);
304        }
305
306        // Check if this record is valid for snapshots at or below the reuse limit
307        if record_is_valid_for(&record, reuse_limit, &invalid) {
308            if let Some(ref existing) = valid_record {
309                // We found two valid records below the reuse limit.
310                // This means one obscures the other - return the older one for reuse.
311                return Some(if current_id < existing.snapshot_id() {
312                    record
313                } else {
314                    Rc::clone(existing)
315                });
316            } else {
317                // First valid record below reuse limit - keep looking
318                valid_record = Some(record.clone());
319            }
320        }
321
322        current = record.next();
323    }
324
325    // No reusable record found
326    None
327}
328
329/// Creates a new overwritable record for a state object, reusing an existing record if possible.
330///
331/// The record is initially marked with SNAPSHOT_ID_MAX to make it invisible to all snapshots
332/// during initialization. The caller must:
333/// 1. Copy/set the desired value into the record
334/// 2. Set the final snapshot_id
335///
336/// Returns a record that is either:
337/// - A reused record (if `used_locked()` found one), marked with SNAPSHOT_ID_MAX
338/// - A newly created record, prepended to the state's record chain via `prepend_state_record()`
339pub(crate) fn new_overwritable_record_locked(state: &dyn StateObject) -> Rc<StateRecord> {
340    let state_head = state.first_record();
341
342    // Try to reuse an existing record
343    if let Some(reusable) = used_locked(&state_head) {
344        // Mark as invisible during initialization
345        reusable.set_snapshot_id(SNAPSHOT_ID_MAX);
346        return reusable;
347    }
348
349    // No reusable record found - create a new one
350    // The new record is prepended to the chain with a placeholder value
351    // Caller must use replace_value() to set the actual value
352    let new_record = StateRecord::new(
353        SNAPSHOT_ID_MAX,
354        (),   // Placeholder value - caller will replace this
355        None, // next will be set by prepend_state_record
356    );
357
358    // Prepend the new record to the state's chain
359    state.prepend_state_record(Rc::clone(&new_record));
360
361    new_record
362}
363
364/// Creates an overwritable record and ensures it is the head of the record chain.
365///
366/// This is used for global snapshot writes where the newest record must be at the head
367/// to keep tombstoning logic consistent. Reused records are unlinked from their current
368/// position before being prepended.
369pub(crate) fn new_overwritable_record_as_head_locked(state: &dyn StateObject) -> Rc<StateRecord> {
370    let head = state.first_record();
371
372    if let Some(reusable) = used_locked(&head) {
373        reusable.set_snapshot_id(SNAPSHOT_ID_MAX);
374
375        if !Rc::ptr_eq(&head, &reusable) {
376            let mut cursor = Some(Rc::clone(&head));
377            let mut unlinked = false;
378
379            while let Some(node) = cursor {
380                let next = node.next();
381                if let Some(next_record) = next {
382                    if Rc::ptr_eq(&next_record, &reusable) {
383                        node.set_next(reusable.next());
384                        unlinked = true;
385                        break;
386                    }
387                    cursor = Some(next_record);
388                } else {
389                    break;
390                }
391            }
392
393            if !unlinked {
394                debug_assert!(
395                    false,
396                    "new_overwritable_record_as_head_locked: reusable record not found in chain"
397                );
398                let new_record = StateRecord::new(SNAPSHOT_ID_MAX, (), None);
399                state.prepend_state_record(Rc::clone(&new_record));
400                return new_record;
401            }
402
403            state.prepend_state_record(Rc::clone(&reusable));
404        }
405
406        return reusable;
407    }
408
409    let new_record = StateRecord::new(SNAPSHOT_ID_MAX, (), None);
410    state.prepend_state_record(Rc::clone(&new_record));
411    new_record
412}
413
/// Overwrites unused records in a state object's record chain with data from retained records.
///
/// This function implements Kotlin's `overwriteUnusedRecordsLocked` to reclaim memory by:
/// 1. Finding records below the reuse limit (records invisible to all open snapshots)
/// 2. Keeping the highest record below the reuse limit (so lowest pinned snapshot can see it)
/// 3. Marking older obscured records as INVALID_SNAPSHOT and copying valid data into them
///
/// The valid data is copied from a "young" record (above reuse limit) to ensure that if
/// an invalidated record is somehow accessed, it contains current valid data rather than
/// cleared/garbage values. If no record is at or above the reuse limit, the record with
/// the highest snapshot ID is used as the source instead (see `find_youngest_or`).
///
/// `T` must be the value type stored in the chain's records.
///
/// Returns `true` if the state has multiple retained records and should stay in extraStateObjects,
/// `false` if it can be removed from tracking.
///
/// # Panics
/// Panics (via `StateRecord::assign_value`) if the record chosen as the copy source
/// holds no value or a value that is not a `T`.
pub(crate) fn overwrite_unused_records_locked<T: Any + Clone>(state: &dyn StateObject) -> bool {
    let head = state.first_record();
    let mut current = Some(Rc::clone(&head));
    // Source record for overwrites; looked up lazily the first time it is needed.
    let mut overwrite_record: Option<Rc<StateRecord>> = None;
    // Highest record seen so far that lies below the reuse limit.
    let mut valid_record: Option<Rc<StateRecord>> = None;

    // Calculate reuse limit: records below this ID are invisible to all open snapshots
    // Mirrors Kotlin's: val reuseLimit = pinningTable.lowestOrDefault(nextSnapshotId)
    let reuse_limit =
        lowest_pinned_snapshot().unwrap_or_else(crate::snapshot_v2::peek_next_snapshot_id);

    let mut retained_records = 0;

    while let Some(record) = current {
        let current_id = record.snapshot_id();

        if current_id == INVALID_SNAPSHOT_ID {
            // Already invalid, skip
        } else if current_id < reuse_limit {
            if valid_record.is_none() {
                // If any records are below reuse_limit, we must keep the highest one
                // so the lowest snapshot can select it
                valid_record = Some(Rc::clone(&record));
                retained_records += 1;
            } else {
                // We have two records below the reuse limit - one obscures the other
                // Overwrite the older one (lower snapshot_id)
                // `valid_record` is `Some` here: the `is_none()` branch above handles
                // the other case, so this `unwrap` cannot fail.
                let valid = valid_record.as_ref().unwrap();
                let record_to_overwrite = if current_id < valid.snapshot_id() {
                    Rc::clone(&record)
                } else {
                    // Keep current as valid, overwrite the previous valid
                    let to_overwrite = Rc::clone(valid);
                    valid_record = Some(Rc::clone(&record));
                    to_overwrite
                };

                // Lazily find a young record to copy data from
                if overwrite_record.is_none() {
                    // Find the youngest record, or first record >= reuseLimit
                    overwrite_record =
                        Some(find_youngest_or(&head, |r| r.snapshot_id() >= reuse_limit));
                }

                // Mark the old record as invalid and copy valid data into it
                record_to_overwrite.set_snapshot_id(INVALID_SNAPSHOT_ID);
                record_to_overwrite.assign_value::<T>(overwrite_record.as_ref().unwrap());
            }
        } else {
            // Record is above reuse limit - it's still visible and must be kept
            retained_records += 1;
        }

        current = record.next();
    }

    // Return true if we have multiple records that must be retained
    // (state should stay in extraStateObjects for future cleanup)
    retained_records > 1
}
487
488fn active_snapshot() -> AnySnapshot {
489    current_snapshot().unwrap_or_else(|| AnySnapshot::Global(GlobalSnapshot::get_or_create()))
490}
491
/// Policy that controls how writes to a `SnapshotMutableState<T>` are compared and merged.
pub(crate) trait MutationPolicy<T>: Send + Sync {
    /// Returns `true` when `a` and `b` should be treated as the same value.
    ///
    /// `SnapshotMutableState::set` returns early without writing when the currently
    /// readable value is equivalent to the new one.
    fn equivalent(&self, a: &T, b: &T) -> bool;

    /// Optionally produces a merged value from three versions of the state.
    ///
    /// The default implementation returns `None`.
    // NOTE(review): presumably `None` means "cannot merge" for the snapshot apply
    // logic — confirm against the callers, which are not part of this section.
    fn merge(&self, _previous: &T, _current: &T, _applied: &T) -> Option<T> {
        None
    }
}
498
/// Mutation policy that never considers two values equivalent.
pub(crate) struct NeverEqual;

impl<T> MutationPolicy<T> for NeverEqual {
    /// Always returns `false`, regardless of the two values.
    fn equivalent(&self, _a: &T, _b: &T) -> bool {
        false
    }
}
506
/// Interface the snapshot system uses to work with a state object and its record chain.
pub trait StateObject: Any {
    /// Returns the identity of this state object.
    fn object_id(&self) -> ObjectId;

    /// Returns the record at the head of this state's record chain.
    fn first_record(&self) -> Rc<StateRecord>;

    /// Returns the record to read for `snapshot_id`, given the set of `invalid` snapshot IDs.
    // NOTE(review): behavior when no record is readable is implementation-defined and
    // not visible in this section — confirm against the implementations.
    fn readable_record(&self, snapshot_id: SnapshotId, invalid: &SnapshotIdSet) -> Rc<StateRecord>;

    /// Prepends a record to the head of the record chain.
    /// This is used when reusing records - the record's next pointer is updated to point to the current head,
    /// and the head is updated to point to the new record.
    fn prepend_state_record(&self, record: Rc<StateRecord>);

    /// Optionally merges three records into one.
    ///
    /// The default implementation returns `None`.
    fn merge_records(
        &self,
        _previous: Rc<StateRecord>,
        _current: Rc<StateRecord>,
        _applied: Rc<StateRecord>,
    ) -> Option<Rc<StateRecord>> {
        None
    }

    /// Commits a record produced by `merge_records`.
    ///
    /// The default implementation always returns an error saying merged commits are
    /// unsupported.
    fn commit_merged_record(&self, _merged: Rc<StateRecord>) -> Result<SnapshotId, &'static str> {
        Err("StateObject does not support merged record commits")
    }

    /// Promotes the record associated with `child_id`.
    // NOTE(review): presumably re-tags a child snapshot's record for its parent —
    // confirm against the implementations, which are not part of this section.
    fn promote_record(&self, child_id: SnapshotId) -> Result<(), &'static str>;

    /// Overwrites unused records in this state's record chain with valid data.
    ///
    /// Returns `true` if the state has multiple retained records and should stay in extraStateObjects,
    /// `false` if it can be removed from tracking.
    fn overwrite_unused_records(&self) -> bool {
        false // Default implementation for states that don't support cleanup
    }

    /// Downcast to Any for testing/debugging purposes.
    fn as_any(&self) -> &dyn Any;
}
542
/// Snapshot-aware mutable state holding values of type `T` in a chain of `StateRecord`s.
pub(crate) struct SnapshotMutableState<T> {
    /// Head of the record chain.
    head: CurrentRecord,
    /// Decides whether a new value is equivalent to the current one (consulted by `set`).
    policy: Arc<dyn MutationPolicy<T>>,
    /// Identity of this state; `new_in_arc` derives it from the address of the owning `Arc`.
    id: ObjectId,
    /// Weak back-reference to the owning `Arc`, installed by `new_in_arc` and upgraded
    /// when the state has to be passed to the active snapshot.
    weak_self: Mutex<Option<Weak<Self>>>,
    /// Callbacks invoked by `notify_applied`, in registration order.
    apply_observers: Mutex<Vec<Box<dyn Fn() + 'static>>>,
}
550
551impl<T> SnapshotMutableState<T> {
552    fn assert_chain_integrity(&self, caller: &str, snapshot_context: Option<SnapshotId>) {
553        if !should_check_chain_integrity() {
554            return;
555        }
556        let head = self.head.clone_head();
557        let mut cursor = Some(head);
558        let mut seen: HashSet<usize> = HashSet::default();
559        let mut ids = Vec::new();
560
561        while let Some(record) = cursor {
562            let addr = Rc::as_ptr(&record) as usize;
563            assert!(
564                seen.insert(addr),
565                "SnapshotMutableState::{} detected duplicate/cycle at record {:p} for state {:?} (snapshot_context={:?}, chain_ids={:?})",
566                caller,
567                Rc::as_ptr(&record),
568                self.id,
569                snapshot_context,
570                ids
571            );
572            ids.push(record.snapshot_id());
573            cursor = record.next();
574        }
575
576        assert!(
577            !ids.is_empty(),
578            "SnapshotMutableState::{} finished integrity scan with empty id list for state {:?} (snapshot_context={:?})",
579            caller,
580            self.id,
581            snapshot_context
582        );
583    }
584}
585
/// Tells whether record-chain integrity checks should run.
///
/// Always `true` when debug assertions are enabled. Otherwise the checks are opt-in:
/// they run only if the `CRANPOSE_ASSERT_STATE_CHAIN` environment variable is set,
/// which is read once and cached.
fn should_check_chain_integrity() -> bool {
    if cfg!(debug_assertions) {
        return true;
    }

    use std::sync::OnceLock;
    static CHECK: OnceLock<bool> = OnceLock::new();
    *CHECK.get_or_init(|| std::env::var_os("CRANPOSE_ASSERT_STATE_CHAIN").is_some())
}
599
600impl<T: Clone + 'static> SnapshotMutableState<T> {
601    fn readable_for(
602        &self,
603        snapshot_id: SnapshotId,
604        invalid: &SnapshotIdSet,
605    ) -> Option<Rc<StateRecord>> {
606        let head = self.first_record();
607        readable_record_for(&head, snapshot_id, invalid)
608    }
609
    /// Returns the record that snapshot `snapshot_id` should write into.
    ///
    /// - If no record is readable for the snapshot, a new head record tagged
    ///   `snapshot_id` is created from a clone of the best available value
    ///   (falling back to the current head's value) and returned.
    /// - If the readable record is already tagged `snapshot_id`, it is returned as is.
    /// - Otherwise an overwritable record is obtained from
    ///   `new_overwritable_record_locked`, filled with a clone of the readable value,
    ///   tagged `snapshot_id`, and un-tombstoned.
    ///
    /// # Panics
    /// Panics if the readable record cannot be located again on the second lookup, or
    /// if the source record holds no value of type `T`.
    fn writable_record(&self, snapshot_id: SnapshotId, invalid: &SnapshotIdSet) -> Rc<StateRecord> {
        let readable = match self.readable_for(snapshot_id, invalid) {
            Some(record) => record,
            None => {
                // Recovery path: nothing is readable for this snapshot, so build a
                // new head from whatever value is available.
                let current_head = self.head.clone_head();
                let refreshed = readable_record_for(&current_head, snapshot_id, invalid);
                let source = refreshed.unwrap_or_else(|| current_head.clone());

                // Create a new record
                // Record reuse is NOT used here to preserve history for conflict detection
                // Reuse happens during cleanup (overwrite_unused_records_locked)
                let cloned_value = source.with_value(|value: &T| value.clone());
                let new_head = StateRecord::new(snapshot_id, cloned_value, Some(current_head));
                self.head.replace(new_head.clone());
                self.assert_chain_integrity("writable_record(recover)", Some(snapshot_id));
                return new_head;
            }
        };

        // The snapshot already owns a record: write into it directly.
        if readable.snapshot_id() == snapshot_id {
            return readable;
        }

        // Look the readable record up again from the current head before copying from it.
        let refreshed = {
            let current_head = self.head.clone_head();
            let refreshed = readable_record_for(&current_head, snapshot_id, invalid).unwrap_or_else(
                || {
                    panic!(
                        "SnapshotMutableState::writable_record failed to locate refreshed readable record (state {:?}, snapshot_id={}, invalid={:?})",
                        self.id, snapshot_id, invalid
                    )
                },
            );

            if refreshed.snapshot_id() == snapshot_id {
                return refreshed;
            }

            Rc::clone(&refreshed)
        };

        // Obtain a record (recycled or new), copy the readable value into it, and only
        // then give it its real snapshot ID so it becomes visible.
        let overwritable = new_overwritable_record_locked(self);
        overwritable.assign_value::<T>(&refreshed);
        overwritable.set_snapshot_id(snapshot_id);
        overwritable.set_tombstone(false);

        self.assert_chain_integrity("writable_record(reuse)", Some(snapshot_id));

        overwritable
    }
660
661    pub(crate) fn new_in_arc(initial: T, policy: Arc<dyn MutationPolicy<T>>) -> Arc<Self> {
662        let snapshot = active_snapshot();
663        let snapshot_id = snapshot.snapshot_id();
664
665        let tail = StateRecord::new(PREEXISTING_SNAPSHOT_ID, initial.clone(), None);
666        let head = StateRecord::new(snapshot_id, initial, Some(tail));
667
668        let mut state = Arc::new(Self {
669            head: CurrentRecord::new(head),
670            policy,
671            id: ObjectId::default(),
672            weak_self: Mutex::new(None),
673            apply_observers: Mutex::new(Vec::new()),
674        });
675
676        let id = ObjectId::new(&state);
677        Arc::get_mut(&mut state).expect("fresh Arc").id = id;
678
679        *state.weak_self.lock().expect("Weak self lock poisoned") = Some(Arc::downgrade(&state));
680
681        // No need to advance the global snapshot for initial state creation
682
683        state
684    }
685
686    pub(crate) fn add_apply_observer(&self, observer: Box<dyn Fn() + 'static>) {
687        self.apply_observers
688            .lock()
689            .expect("Observers lock poisoned")
690            .push(observer);
691    }
692
693    fn notify_applied(&self) {
694        let observers = self
695            .apply_observers
696            .lock()
697            .expect("Observers lock poisoned");
698        for observer in observers.iter() {
699            observer();
700        }
701    }
702
    /// Returns the identity assigned to this state in `new_in_arc`.
    #[inline]
    pub(crate) fn id(&self) -> ObjectId {
        self.id
    }
707
708    pub(crate) fn get(&self) -> T {
709        let snapshot = active_snapshot();
710        if let Some(state) = self
711            .weak_self
712            .lock()
713            .expect("Weak self lock poisoned")
714            .as_ref()
715            .and_then(|weak| weak.upgrade())
716        {
717            snapshot.record_read(&*state);
718        }
719
720        let snapshot_id = snapshot.snapshot_id();
721        let invalid = snapshot.invalid();
722
723        if let Some(record) = self.readable_for(snapshot_id, &invalid) {
724            return record.with_value(|value: &T| value.clone());
725        }
726
727        // Retry with fresh snapshot in case global snapshot was advanced
728        let fresh_snapshot = active_snapshot();
729        let fresh_id = fresh_snapshot.snapshot_id();
730        let fresh_invalid = fresh_snapshot.invalid();
731
732        if let Some(record) = self.readable_for(fresh_id, &fresh_invalid) {
733            return record.with_value(|value: &T| value.clone());
734        }
735
736        // Fallback: try reading directly from the global snapshot.
737        // This handles the case where sibling mutable snapshots have been applied
738        // but our current snapshot's invalid set was fixed at creation time.
739        // The global snapshot should have all applied changes visible.
740        let global = GlobalSnapshot::get_or_create();
741        let global_id = global.snapshot_id();
742        let global_invalid = global.invalid();
743
744        if let Some(record) = self.readable_for(global_id, &global_invalid) {
745            return record.with_value(|value: &T| value.clone());
746        }
747
748        // Debug: print the record chain to understand what's available
749        let head = self.first_record();
750        let mut chain_ids = Vec::new();
751        let mut cursor = Some(head);
752        while let Some(record) = cursor {
753            chain_ids.push((record.snapshot_id(), record.is_tombstone()));
754            cursor = record.next();
755        }
756
757        // If still null, this is an error condition
758        panic!(
759            "Reading a state that was created after the snapshot was taken or in a snapshot that has not yet been applied\n\
760             state={:?}, snapshot_id={}, fresh_snapshot_id={}, fresh_invalid={:?}\n\
761             record_chain={:?}",
762            self.id, snapshot_id, fresh_id, fresh_invalid, chain_ids
763        );
764    }
765
766    pub(crate) fn set(&self, new_value: T) -> bool {
767        // Debug-only check: warn if modifying state in event handler without proper snapshot
768        #[cfg(debug_assertions)]
769        {
770            let in_handler = crate::in_event_handler();
771            let in_snapshot = crate::in_applied_snapshot();
772            if in_handler && !in_snapshot {
773                log::warn!(
774                    target: "cranpose::state",
775                    "State modified in event handler without run_in_mutable_snapshot; \
776                     this can make updates invisible to other contexts. Wrap the handler \
777                     in run_in_mutable_snapshot() or dispatch_ui_event(). State: {:?}",
778                    self.id
779                );
780            }
781        }
782
783        let snapshot = active_snapshot();
784        let snapshot_id = snapshot.snapshot_id();
785
786        match &snapshot {
787            AnySnapshot::Global(global) => {
788                let invalid = snapshot.invalid();
789                let equivalent = self
790                    .readable_for(snapshot_id, &invalid)
791                    .map(|record| {
792                        record.with_value(|current: &T| self.policy.equivalent(current, &new_value))
793                    })
794                    .unwrap_or(false);
795                if equivalent {
796                    return false;
797                }
798
799                if global.has_pending_children() {
800                    panic!(
801                        "SnapshotMutableState::set attempted global write while pending children {:?} exist (state {:?}, snapshot_id={})",
802                        global.pending_children(),
803                        self.id,
804                        snapshot_id
805                    );
806                }
807
808                let mut written_state: Option<Arc<dyn StateObject>> = None;
809                if let Some(state) = self
810                    .weak_self
811                    .lock()
812                    .expect("Weak self lock poisoned")
813                    .as_ref()
814                    .and_then(|weak| weak.upgrade())
815                {
816                    let trait_object: Arc<dyn StateObject> = state.clone();
817                    snapshot.record_write(trait_object.clone());
818                    written_state = Some(trait_object);
819                }
820                mark_update_write(self.id);
821
822                let new_id = allocate_record_id();
823                let record = new_overwritable_record_as_head_locked(self);
824                record.replace_value(new_value);
825                record.set_snapshot_id(new_id);
826                record.set_tombstone(false);
827                advance_global_snapshot(new_id);
828                self.assert_chain_integrity("set(global-push)", Some(snapshot_id));
829
830                if !global.has_pending_children() {
831                    let mut cursor = record.next();
832                    while let Some(node) = cursor {
833                        if !node.is_tombstone() && node.snapshot_id() != PREEXISTING_SNAPSHOT_ID {
834                            node.clear_value();
835                            node.set_tombstone(true);
836                        }
837                        cursor = node.next();
838                    }
839                    self.assert_chain_integrity("set(global-tombstone)", Some(snapshot_id));
840                }
841
842                if let Some(modified) = written_state.as_ref() {
843                    crate::snapshot_v2::notify_apply_observers(
844                        std::slice::from_ref(modified),
845                        new_id,
846                    );
847                }
848            }
849            AnySnapshot::Mutable(_)
850            | AnySnapshot::NestedMutable(_)
851            | AnySnapshot::TransparentMutable(_) => {
852                let invalid = snapshot.invalid();
853                let equivalent = self
854                    .readable_for(snapshot_id, &invalid)
855                    .map(|record| {
856                        record.with_value(|current: &T| self.policy.equivalent(current, &new_value))
857                    })
858                    .unwrap_or(false);
859                if equivalent {
860                    return false;
861                }
862
863                if let Some(state) = self
864                    .weak_self
865                    .lock()
866                    .expect("Weak self lock poisoned")
867                    .as_ref()
868                    .and_then(|weak| weak.upgrade())
869                {
870                    let trait_object: Arc<dyn StateObject> = state.clone();
871                    snapshot.record_write(trait_object);
872                }
873                mark_update_write(self.id);
874
875                let record = self.writable_record(snapshot_id, &invalid);
876                record.replace_value(new_value);
877                self.assert_chain_integrity("set(child-writable)", Some(snapshot_id));
878            }
879            AnySnapshot::Readonly(_)
880            | AnySnapshot::NestedReadonly(_)
881            | AnySnapshot::TransparentReadonly(_) => {
882                panic!("Cannot write to a read-only snapshot");
883            }
884        }
885
886        // Retain the prior record chain so concurrent readers never observe freed nodes.
887        // Compose proper prunes when it can prove no readers exist; for now we keep
888        // the historical chain with tombstoned values to avoid use-after-free crashes
889        // under heavy UI load.
890        true
891    }
892}
893
894thread_local! {
895    static ACTIVE_UPDATES: RefCell<HashSet<ObjectId>> = RefCell::new(HashSet::default());
896    static PENDING_WRITES: RefCell<HashSet<ObjectId>> = RefCell::new(HashSet::default());
897}
898
899pub(crate) struct UpdateScope {
900    id: ObjectId,
901    finished: bool,
902}
903
904impl UpdateScope {
905    pub(crate) fn new(id: ObjectId) -> Self {
906        ACTIVE_UPDATES.with(|active| {
907            active.borrow_mut().insert(id);
908        });
909        PENDING_WRITES.with(|pending| {
910            pending.borrow_mut().remove(&id);
911        });
912        Self {
913            id,
914            finished: false,
915        }
916    }
917
918    pub(crate) fn finish(mut self) -> bool {
919        self.finished = true;
920        ACTIVE_UPDATES.with(|active| {
921            active.borrow_mut().remove(&self.id);
922        });
923        PENDING_WRITES.with(|pending| pending.borrow_mut().remove(&self.id))
924    }
925}
926
927impl Drop for UpdateScope {
928    fn drop(&mut self) {
929        if self.finished {
930            return;
931        }
932        ACTIVE_UPDATES.with(|active| {
933            active.borrow_mut().remove(&self.id);
934        });
935        PENDING_WRITES.with(|pending| {
936            pending.borrow_mut().remove(&self.id);
937        });
938    }
939}
940
941fn mark_update_write(id: ObjectId) {
942    ACTIVE_UPDATES.with(|active| {
943        if active.borrow().contains(&id) {
944            PENDING_WRITES.with(|pending| {
945                pending.borrow_mut().insert(id);
946            });
947        }
948    });
949}
950
951impl<T: Clone + 'static> SnapshotMutableState<T> {
952    /// Try to find a readable record, returning None if no valid record exists.
953    fn try_readable_record(
954        &self,
955        snapshot_id: SnapshotId,
956        invalid: &SnapshotIdSet,
957    ) -> Option<Rc<StateRecord>> {
958        self.readable_for(snapshot_id, invalid)
959    }
960}
961
962impl<T: Clone + 'static> StateObject for SnapshotMutableState<T> {
963    fn object_id(&self) -> ObjectId {
964        self.id
965    }
966
967    fn first_record(&self) -> Rc<StateRecord> {
968        self.head.clone_head()
969    }
970
971    fn readable_record(&self, snapshot_id: SnapshotId, invalid: &SnapshotIdSet) -> Rc<StateRecord> {
972        self.try_readable_record(snapshot_id, invalid)
973            .unwrap_or_else(|| {
974                panic!(
975                    "SnapshotMutableState::readable_record returned null (state={:?}, snapshot_id={})",
976                    self.id, snapshot_id
977                )
978            })
979    }
980
981    fn prepend_state_record(&self, record: Rc<StateRecord>) {
982        self.head.prepend(record);
983    }
984
985    fn merge_records(
986        &self,
987        previous: Rc<StateRecord>,
988        current: Rc<StateRecord>,
989        applied: Rc<StateRecord>,
990    ) -> Option<Rc<StateRecord>> {
991        let current_vs_applied = current.with_value(|current: &T| {
992            applied.with_value(|applied_value: &T| self.policy.equivalent(current, applied_value))
993        });
994        if current_vs_applied {
995            return Some(current);
996        }
997
998        previous
999            .with_value(|prev: &T| {
1000                current.with_value(|current_value: &T| {
1001                    applied.with_value(|applied_value: &T| {
1002                        self.policy.merge(prev, current_value, applied_value)
1003                    })
1004                })
1005            })
1006            .map(|merged| StateRecord::new(applied.snapshot_id(), merged, None))
1007    }
1008
1009    fn promote_record(&self, child_id: SnapshotId) -> Result<(), &'static str> {
1010        let head = self.first_record();
1011        let mut cursor = Some(head);
1012        while let Some(record) = cursor {
1013            if record.snapshot_id() == child_id {
1014                let cloned = record.with_value(|value: &T| value.clone());
1015                let new_id = allocate_record_id();
1016                let current_head = self.head.clone_head();
1017                let new_head = StateRecord::new(new_id, cloned, Some(current_head));
1018                self.head.replace(new_head);
1019                advance_global_snapshot(new_id);
1020                self.notify_applied();
1021                self.assert_chain_integrity("promote_record", Some(child_id));
1022                return Ok(());
1023            }
1024            cursor = record.next();
1025        }
1026        panic!(
1027            "SnapshotMutableState::promote_record missing child record (state {:?}, child_id={})",
1028            self.id, child_id
1029        );
1030    }
1031
1032    fn commit_merged_record(&self, merged: Rc<StateRecord>) -> Result<SnapshotId, &'static str> {
1033        let value = merged.with_value(|value: &T| value.clone());
1034        let new_id = allocate_record_id();
1035        let current_head = self.head.clone_head();
1036        let new_head = StateRecord::new(new_id, value, Some(current_head));
1037        self.head.replace(new_head);
1038        advance_global_snapshot(new_id);
1039        self.notify_applied();
1040        self.assert_chain_integrity("commit_merged_record", Some(new_id));
1041        Ok(new_id)
1042    }
1043
1044    fn overwrite_unused_records(&self) -> bool {
1045        overwrite_unused_records_locked::<T>(self)
1046    }
1047
1048    fn as_any(&self) -> &dyn Any {
1049        self
1050    }
1051}
1052
1053pub(crate) struct MutableStateInner<T: Clone + 'static> {
1054    pub(crate) state: Arc<SnapshotMutableState<T>>,
1055    pub(crate) watchers: RefCell<HashMap<ScopeId, RcWeak<crate::RecomposeScopeInner>>>,
1056    runtime: RuntimeHandle,
1057    state_id: Cell<Option<StateId>>,
1058}
1059
1060fn shrink_watchers_if_sparse(watchers: &mut HashMap<ScopeId, RcWeak<crate::RecomposeScopeInner>>) {
1061    let len = watchers.len();
1062    let capacity = watchers.capacity();
1063    if capacity > len.saturating_mul(4).max(32) {
1064        watchers.shrink_to_fit();
1065    }
1066}
1067
1068impl<T: Clone + 'static> MutableStateInner<T> {
1069    pub(crate) fn new_with_policy(
1070        value: T,
1071        runtime: RuntimeHandle,
1072        policy: Arc<dyn MutationPolicy<T>>,
1073    ) -> Self {
1074        Self {
1075            state: SnapshotMutableState::new_in_arc(value, policy),
1076            watchers: RefCell::new(HashMap::default()),
1077            runtime,
1078            state_id: Cell::new(None),
1079        }
1080    }
1081
1082    pub(crate) fn install_snapshot_observer(&self, state_id: StateId) {
1083        self.state_id.set(Some(state_id));
1084        let runtime_handle = self.runtime.clone();
1085        self.state.add_apply_observer(Box::new(move || {
1086            let runtime = runtime_handle.clone();
1087            runtime_handle.enqueue_ui_task(Box::new(move || {
1088                runtime.with_state_arena(|arena| {
1089                    let _ = arena.with_typed_opt::<T, _>(state_id, |inner| {
1090                        inner.invalidate_watchers();
1091                    });
1092                });
1093            }));
1094        }));
1095    }
1096
1097    fn with_value<R>(&self, f: impl FnOnce(&T) -> R) -> R {
1098        let value = self.state.get();
1099        f(&value)
1100    }
1101
1102    fn register_scope(&self, scope: &RecomposeScope) -> bool {
1103        let mut watchers = self.watchers.borrow_mut();
1104        match watchers.get(&scope.id()) {
1105            Some(existing) if existing.upgrade().is_some() => false,
1106            _ => {
1107                watchers.insert(scope.id(), scope.downgrade());
1108                true
1109            }
1110        }
1111    }
1112
1113    pub(crate) fn unregister_scope(&self, scope_id: ScopeId) {
1114        let mut watchers = self.watchers.borrow_mut();
1115        watchers.remove(&scope_id);
1116        shrink_watchers_if_sparse(&mut watchers);
1117    }
1118
1119    fn state_id(&self) -> Option<StateId> {
1120        self.state_id.get()
1121    }
1122
1123    fn invalidate_watchers(&self) {
1124        let watchers: Vec<RecomposeScope> = {
1125            let mut watchers = self.watchers.borrow_mut();
1126            let mut live = Vec::with_capacity(watchers.len());
1127            watchers.retain(|_, weak| {
1128                if let Some(inner) = weak.upgrade() {
1129                    live.push(RecomposeScope { inner });
1130                    true
1131                } else {
1132                    false
1133                }
1134            });
1135            shrink_watchers_if_sparse(&mut watchers);
1136            live
1137        };
1138
1139        for watcher in watchers {
1140            debug_record_scope_invalidation::<T>(watcher.id(), self.state_id.get());
1141            watcher.invalidate();
1142        }
1143    }
1144}
1145
1146fn register_current_state_scope<T: Clone + 'static>(inner: &MutableStateInner<T>) {
1147    let Some(Some(scope)) =
1148        with_current_composer_opt(|composer| composer.current_recranpose_scope())
1149    else {
1150        return;
1151    };
1152    if inner.register_scope(&scope) {
1153        if let Some(state_id) = inner.state_id() {
1154            scope.record_state_subscription(state_id);
1155        }
1156    }
1157}
1158
1159/// Cheap copyable read-only view of a state cell.
1160pub struct State<T: Clone + 'static> {
1161    id: StateId,
1162    runtime_id: runtime::RuntimeId,
1163    _marker: PhantomData<fn() -> T>,
1164}
1165
1166/// Cheap copyable mutable view of a state cell.
1167///
1168/// Ownership lives elsewhere: a composition slot, an [`OwnedMutableState`], or
1169/// the runtime for states created with [`crate::mutableStateOf`] /
1170/// [`MutableState::with_runtime`].
1171pub struct MutableState<T: Clone + 'static> {
1172    id: StateId,
1173    runtime_id: runtime::RuntimeId,
1174    _marker: PhantomData<fn() -> T>,
1175}
1176
1177/// Owning state handle for reclaimable state cells.
1178#[derive(Clone)]
1179pub struct OwnedMutableState<T: Clone + 'static> {
1180    state: MutableState<T>,
1181    _lease: Rc<runtime::StateHandleLease>,
1182    _marker: PhantomData<fn() -> T>,
1183}
1184
1185impl<T: Clone + 'static> PartialEq for State<T> {
1186    fn eq(&self, other: &Self) -> bool {
1187        self.state_id() == other.state_id() && self.runtime_id() == other.runtime_id()
1188    }
1189}
1190
1191impl<T: Clone + 'static> Eq for State<T> {}
1192
1193impl<T: Clone + 'static> PartialEq for MutableState<T> {
1194    fn eq(&self, other: &Self) -> bool {
1195        self.state_id() == other.state_id() && self.runtime_id() == other.runtime_id()
1196    }
1197}
1198
1199impl<T: Clone + 'static> Eq for MutableState<T> {}
1200
1201impl<T: Clone + 'static> Copy for State<T> {}
1202
1203impl<T: Clone + 'static> Clone for State<T> {
1204    fn clone(&self) -> Self {
1205        *self
1206    }
1207}
1208
1209impl<T: Clone + 'static> Copy for MutableState<T> {}
1210
1211impl<T: Clone + 'static> Clone for MutableState<T> {
1212    fn clone(&self) -> Self {
1213        *self
1214    }
1215}
1216
1217impl<T: Clone + 'static> State<T> {
1218    fn state_id(&self) -> StateId {
1219        self.id
1220    }
1221
1222    fn runtime_id(&self) -> runtime::RuntimeId {
1223        self.runtime_id
1224    }
1225
1226    fn runtime_handle(&self) -> RuntimeHandle {
1227        runtime::runtime_handle_by_id(self.runtime_id())
1228            .unwrap_or_else(|| panic!("runtime {:?} dropped", self.runtime_id()))
1229    }
1230
1231    fn with_inner<R>(&self, f: impl FnOnce(&MutableStateInner<T>) -> R) -> R {
1232        self.runtime_handle()
1233            .with_state_arena(|arena| arena.with_typed::<T, R>(self.state_id(), f))
1234    }
1235
1236    fn subscribe_current_scope(&self) {
1237        self.with_inner(register_current_state_scope::<T>);
1238    }
1239
1240    pub fn with<R>(&self, f: impl FnOnce(&T) -> R) -> R {
1241        self.subscribe_current_scope();
1242        self.with_inner(|inner| inner.with_value(f))
1243    }
1244
1245    pub fn value(&self) -> T {
1246        self.subscribe_current_scope();
1247        self.with_inner(|inner| inner.state.get())
1248    }
1249
1250    pub fn get(&self) -> T {
1251        self.value()
1252    }
1253}
1254
1255impl<T: Clone + 'static> MutableState<T> {
1256    pub fn with_runtime(value: T, runtime: RuntimeHandle) -> Self {
1257        runtime.alloc_persistent_state(value)
1258    }
1259
1260    fn from_parts(id: StateId, runtime_id: runtime::RuntimeId) -> Self {
1261        Self {
1262            id,
1263            runtime_id,
1264            _marker: PhantomData,
1265        }
1266    }
1267
1268    pub(crate) fn from_lease(lease: &Rc<runtime::StateHandleLease>) -> Self {
1269        Self::from_parts(lease.id(), lease.runtime().id())
1270    }
1271
1272    fn state_id(&self) -> StateId {
1273        self.id
1274    }
1275
1276    fn runtime_id(&self) -> runtime::RuntimeId {
1277        self.runtime_id
1278    }
1279
1280    fn runtime_handle(&self) -> RuntimeHandle {
1281        runtime::runtime_handle_by_id(self.runtime_id())
1282            .unwrap_or_else(|| panic!("runtime {:?} dropped", self.runtime_id()))
1283    }
1284
1285    fn with_inner<R>(&self, f: impl FnOnce(&MutableStateInner<T>) -> R) -> R {
1286        self.runtime_handle()
1287            .with_state_arena(|arena| arena.with_typed::<T, R>(self.state_id(), f))
1288    }
1289
1290    fn try_with_inner<R>(&self, f: impl FnOnce(&MutableStateInner<T>) -> R) -> Option<R> {
1291        self.runtime_handle()
1292            .with_state_arena(|arena| arena.with_typed_opt::<T, R>(self.state_id(), f))
1293    }
1294
1295    pub fn is_alive(&self) -> bool {
1296        self.try_with_inner(|_| ()).is_some()
1297    }
1298
1299    pub fn try_with<R>(&self, f: impl FnOnce(&T) -> R) -> Option<R> {
1300        self.try_with_inner(|inner| inner.with_value(f))
1301    }
1302
1303    pub fn try_value(&self) -> Option<T> {
1304        self.try_with_inner(|inner| inner.state.get())
1305    }
1306
1307    pub fn as_state(&self) -> State<T> {
1308        State {
1309            id: self.id,
1310            runtime_id: self.runtime_id,
1311            _marker: PhantomData,
1312        }
1313    }
1314
1315    pub fn try_retain(&self) -> Option<OwnedMutableState<T>> {
1316        let lease = self.runtime_handle().retain_state_lease(self.state_id())?;
1317        Some(OwnedMutableState {
1318            state: *self,
1319            _lease: lease,
1320            _marker: PhantomData,
1321        })
1322    }
1323
1324    pub fn retain(&self) -> OwnedMutableState<T> {
1325        self.try_retain()
1326            .unwrap_or_else(|| panic!("state {:?} is no longer alive", self.state_id()))
1327    }
1328
1329    pub fn with<R>(&self, f: impl FnOnce(&T) -> R) -> R {
1330        self.subscribe_current_scope();
1331        self.with_inner(|inner| inner.with_value(f))
1332    }
1333
1334    pub fn update<R>(&self, f: impl FnOnce(&mut T) -> R) -> R {
1335        let runtime = self.runtime_handle();
1336        runtime.assert_ui_thread();
1337        runtime.with_state_arena(|arena| {
1338            arena.with_typed::<T, R>(self.state_id(), |inner| {
1339                let mut value = inner.state.get();
1340                let tracker = UpdateScope::new(inner.state.id());
1341                let result = f(&mut value);
1342                let wrote_elsewhere = tracker.finish();
1343                if !wrote_elsewhere && inner.state.set(value) {
1344                    inner.invalidate_watchers();
1345                }
1346                result
1347            })
1348        })
1349    }
1350
1351    pub fn replace(&self, value: T) {
1352        let runtime = self.runtime_handle();
1353        runtime.assert_ui_thread();
1354        runtime.with_state_arena(|arena| {
1355            if arena
1356                .with_typed_opt::<T, ()>(self.state_id(), |inner| {
1357                    if inner.state.set(value) {
1358                        inner.invalidate_watchers();
1359                    }
1360                })
1361                .is_none()
1362            {
1363                log::debug!(
1364                    "MutableState::replace skipped: state cell released (slot={}, gen={})",
1365                    self.state_id().slot(),
1366                    self.state_id().generation(),
1367                );
1368            }
1369        });
1370    }
1371
1372    pub fn set_value(&self, value: T) {
1373        self.replace(value);
1374    }
1375
1376    pub fn set(&self, value: T) {
1377        self.replace(value);
1378    }
1379
1380    pub fn value(&self) -> T {
1381        self.subscribe_current_scope();
1382        self.with_inner(|inner| inner.state.get())
1383    }
1384
1385    pub fn get(&self) -> T {
1386        self.value()
1387    }
1388
1389    pub fn get_non_reactive(&self) -> T {
1390        self.with_inner(|inner| inner.state.get())
1391    }
1392
1393    fn subscribe_current_scope(&self) {
1394        self.with_inner(register_current_state_scope::<T>);
1395    }
1396
1397    #[cfg(test)]
1398    pub(crate) fn watcher_count(&self) -> usize {
1399        self.with_inner(|inner| inner.watchers.borrow().len())
1400    }
1401
1402    #[cfg(test)]
1403    pub(crate) fn watcher_capacity(&self) -> usize {
1404        self.with_inner(|inner| inner.watchers.borrow().capacity())
1405    }
1406
1407    #[cfg(test)]
1408    pub(crate) fn state_id_for_test(&self) -> StateId {
1409        self.state_id()
1410    }
1411
1412    #[cfg(test)]
1413    pub(crate) fn subscribe_scope_for_test(&self, scope: &RecomposeScope) {
1414        self.as_state().subscribe_scope_for_test(scope);
1415    }
1416}
1417
1418impl<T: Clone + 'static> OwnedMutableState<T> {
1419    pub fn with_runtime(value: T, runtime: RuntimeHandle) -> Self {
1420        let lease = runtime.alloc_state(value);
1421        Self {
1422            state: MutableState::from_lease(&lease),
1423            _lease: lease,
1424            _marker: PhantomData,
1425        }
1426    }
1427
1428    pub(crate) fn with_runtime_and_policy(
1429        value: T,
1430        runtime: RuntimeHandle,
1431        policy: Arc<dyn MutationPolicy<T>>,
1432    ) -> Self {
1433        let lease = runtime.alloc_state_with_policy(value, policy);
1434        Self {
1435            state: MutableState::from_lease(&lease),
1436            _lease: lease,
1437            _marker: PhantomData,
1438        }
1439    }
1440
1441    pub fn handle(&self) -> MutableState<T> {
1442        self.state
1443    }
1444
1445    pub fn as_state(&self) -> State<T> {
1446        self.state.as_state()
1447    }
1448}
1449
1450impl<T: Clone + 'static> Deref for OwnedMutableState<T> {
1451    type Target = MutableState<T>;
1452
1453    fn deref(&self) -> &Self::Target {
1454        &self.state
1455    }
1456}
1457
#[cfg(test)]
impl<T: Clone + 'static> State<T> {
    /// Subscribes `scope` to this cell directly, without consulting the current
    /// composer. A new subscription is also recorded on the scope when the cell
    /// has a state id.
    pub(crate) fn subscribe_scope_for_test(&self, scope: &RecomposeScope) {
        self.with_inner(|inner| {
            if !inner.register_scope(scope) {
                return;
            }
            if let Some(state_id) = inner.state_id() {
                scope.record_state_subscription(state_id);
            }
        });
    }
}
1470
1471impl<T: fmt::Debug + Clone + 'static> fmt::Debug for MutableState<T> {
1472    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1473        self.with_inner(|inner| {
1474            inner.with_value(|value| {
1475                f.debug_struct("MutableState")
1476                    .field("value", value)
1477                    .finish()
1478            })
1479        })
1480    }
1481}
1482
1483#[derive(Clone)]
1484pub struct SnapshotStateList<T: Clone + 'static> {
1485    state: OwnedMutableState<Vec<T>>,
1486}
1487
1488impl<T: Clone + 'static> SnapshotStateList<T> {
1489    pub fn with_runtime<I>(values: I, runtime: RuntimeHandle) -> Self
1490    where
1491        I: IntoIterator<Item = T>,
1492    {
1493        let initial: Vec<T> = values.into_iter().collect();
1494        Self {
1495            state: OwnedMutableState::with_runtime(initial, runtime),
1496        }
1497    }
1498
1499    pub fn as_state(&self) -> State<Vec<T>> {
1500        self.state.as_state()
1501    }
1502
1503    pub fn as_mutable_state(&self) -> MutableState<Vec<T>> {
1504        self.state.handle()
1505    }
1506
1507    pub fn len(&self) -> usize {
1508        self.state.with(|values| values.len())
1509    }
1510
1511    pub fn is_empty(&self) -> bool {
1512        self.len() == 0
1513    }
1514
1515    pub fn to_vec(&self) -> Vec<T> {
1516        self.state.with(|values| values.clone())
1517    }
1518
1519    pub fn iter(&self) -> Vec<T> {
1520        self.to_vec()
1521    }
1522
1523    pub fn get(&self, index: usize) -> T {
1524        self.state.with(|values| values[index].clone())
1525    }
1526
1527    pub fn get_opt(&self, index: usize) -> Option<T> {
1528        self.state.with(|values| values.get(index).cloned())
1529    }
1530
1531    pub fn first(&self) -> Option<T> {
1532        self.get_opt(0)
1533    }
1534
1535    pub fn last(&self) -> Option<T> {
1536        self.state.with(|values| values.last().cloned())
1537    }
1538
1539    pub fn push(&self, value: T) {
1540        self.state.update(|values| values.push(value));
1541    }
1542
1543    pub fn extend<I>(&self, iter: I)
1544    where
1545        I: IntoIterator<Item = T>,
1546    {
1547        self.state.update(|values| values.extend(iter));
1548    }
1549
1550    pub fn insert(&self, index: usize, value: T) {
1551        self.state.update(|values| values.insert(index, value));
1552    }
1553
1554    pub fn set(&self, index: usize, value: T) -> T {
1555        self.state
1556            .update(|values| std::mem::replace(&mut values[index], value))
1557    }
1558
1559    pub fn remove(&self, index: usize) -> T {
1560        self.state.update(|values| values.remove(index))
1561    }
1562
1563    pub fn pop(&self) -> Option<T> {
1564        self.state.update(|values| values.pop())
1565    }
1566
1567    pub fn clear(&self) {
1568        self.state.replace(Vec::new());
1569    }
1570
1571    pub fn retain<F>(&self, mut predicate: F)
1572    where
1573        F: FnMut(&T) -> bool,
1574    {
1575        self.state
1576            .update(|values| values.retain(|value| predicate(value)));
1577    }
1578
1579    pub fn replace_with<I>(&self, iter: I)
1580    where
1581        I: IntoIterator<Item = T>,
1582    {
1583        self.state.replace(iter.into_iter().collect());
1584    }
1585}
1586
1587impl<T: fmt::Debug + Clone + 'static> fmt::Debug for SnapshotStateList<T> {
1588    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1589        let contents = self.to_vec();
1590        f.debug_struct("SnapshotStateList")
1591            .field("values", &contents)
1592            .finish()
1593    }
1594}
1595
1596#[derive(Clone)]
1597pub struct SnapshotStateMap<K, V>
1598where
1599    K: Clone + Eq + Hash + 'static,
1600    V: Clone + 'static,
1601{
1602    state: OwnedMutableState<HashMap<K, V>>,
1603}
1604
1605impl<K, V> SnapshotStateMap<K, V>
1606where
1607    K: Clone + Eq + Hash + 'static,
1608    V: Clone + 'static,
1609{
1610    pub fn with_runtime<I>(pairs: I, runtime: RuntimeHandle) -> Self
1611    where
1612        I: IntoIterator<Item = (K, V)>,
1613    {
1614        let map: HashMap<K, V> = pairs.into_iter().collect();
1615        Self {
1616            state: OwnedMutableState::with_runtime(map, runtime),
1617        }
1618    }
1619
1620    pub fn as_state(&self) -> State<HashMap<K, V>> {
1621        self.state.as_state()
1622    }
1623
1624    pub fn as_mutable_state(&self) -> MutableState<HashMap<K, V>> {
1625        self.state.handle()
1626    }
1627
1628    pub fn len(&self) -> usize {
1629        self.state.with(|map| map.len())
1630    }
1631
1632    pub fn is_empty(&self) -> bool {
1633        self.state.with(|map| map.is_empty())
1634    }
1635
1636    pub fn contains_key(&self, key: &K) -> bool {
1637        self.state.with(|map| map.contains_key(key))
1638    }
1639
1640    pub fn get(&self, key: &K) -> Option<V> {
1641        self.state.with(|map| map.get(key).cloned())
1642    }
1643
1644    pub fn to_hash_map(&self) -> HashMap<K, V> {
1645        self.state.with(|map| map.clone())
1646    }
1647
1648    pub fn insert(&self, key: K, value: V) -> Option<V> {
1649        self.state.update(|map| map.insert(key, value))
1650    }
1651
1652    pub fn extend<I>(&self, iter: I)
1653    where
1654        I: IntoIterator<Item = (K, V)>,
1655    {
1656        self.state.update(|map| map.extend(iter));
1657    }
1658
1659    pub fn remove(&self, key: &K) -> Option<V> {
1660        self.state.update(|map| map.remove(key))
1661    }
1662
1663    pub fn clear(&self) {
1664        self.state.replace(HashMap::default());
1665    }
1666
1667    pub fn retain<F>(&self, mut predicate: F)
1668    where
1669        F: FnMut(&K, &mut V) -> bool,
1670    {
1671        self.state.update(|map| map.retain(|k, v| predicate(k, v)));
1672    }
1673}
1674
1675impl<K, V> fmt::Debug for SnapshotStateMap<K, V>
1676where
1677    K: Clone + Eq + Hash + fmt::Debug + 'static,
1678    V: Clone + fmt::Debug + 'static,
1679{
1680    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1681        let contents = self.to_hash_map();
1682        f.debug_struct("SnapshotStateMap")
1683            .field("entries", &contents)
1684            .finish()
1685    }
1686}
1687
1688pub(crate) struct DerivedState<T: Clone + 'static> {
1689    compute: Rc<dyn Fn() -> T>,
1690    pub(crate) state: OwnedMutableState<T>,
1691}
1692
1693impl<T: Clone + 'static> DerivedState<T> {
1694    pub(crate) fn new(runtime: RuntimeHandle, compute: Rc<dyn Fn() -> T>) -> Self {
1695        let initial = compute();
1696        Self {
1697            compute,
1698            state: OwnedMutableState::with_runtime(initial, runtime),
1699        }
1700    }
1701
1702    pub(crate) fn set_compute(&mut self, compute: Rc<dyn Fn() -> T>) {
1703        self.compute = compute;
1704    }
1705
1706    pub(crate) fn recompute(&self) {
1707        let value = (self.compute)();
1708        self.state.set_value(value);
1709    }
1710}
1711
1712impl<T: fmt::Debug + Clone + 'static> fmt::Debug for State<T> {
1713    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1714        self.with_inner(|inner| {
1715            inner.with_value(|value| f.debug_struct("State").field("value", value).finish())
1716        })
1717    }
1718}
1719
1720#[cfg(test)]
1721mod tests {
1722    use super::*;
1723
1724    /// Helper to create a chain of records for testing
1725    fn create_record_chain(ids: &[SnapshotId]) -> Rc<StateRecord> {
1726        let mut head: Option<Rc<StateRecord>> = None;
1727
1728        // Build chain in reverse order (last ID becomes the tail)
1729        for &id in ids.iter().rev() {
1730            head = Some(StateRecord::new(id, 0i32, head));
1731        }
1732
1733        head.expect("create_record_chain called with empty ids")
1734    }
1735
1736    struct ManualState {
1737        head: Rc<StateRecord>,
1738    }
1739
1740    impl ManualState {
1741        fn new(head: Rc<StateRecord>) -> Self {
1742            Self { head }
1743        }
1744    }
1745
1746    impl StateObject for ManualState {
1747        fn object_id(&self) -> ObjectId {
1748            ObjectId(999)
1749        }
1750
1751        fn first_record(&self) -> Rc<StateRecord> {
1752            Rc::clone(&self.head)
1753        }
1754
1755        fn readable_record(&self, _: SnapshotId, _: &SnapshotIdSet) -> Rc<StateRecord> {
1756            Rc::clone(&self.head)
1757        }
1758
1759        fn prepend_state_record(&self, _: Rc<StateRecord>) {}
1760
1761        fn promote_record(&self, _: SnapshotId) -> Result<(), &'static str> {
1762            Ok(())
1763        }
1764
1765        fn as_any(&self) -> &dyn Any {
1766            self
1767        }
1768    }
1769
1770    #[test]
1771    fn test_used_locked_finds_invalid_snapshot() {
1772        // Create a chain with an INVALID_SNAPSHOT record
1773        let tail = StateRecord::new(PREEXISTING_SNAPSHOT_ID, 0i32, None);
1774        let invalid_rec = StateRecord::new(INVALID_SNAPSHOT_ID, 0i32, Some(tail));
1775        let head = StateRecord::new(10, 0i32, Some(invalid_rec.clone()));
1776
1777        let result = used_locked(&head);
1778        assert!(result.is_some());
1779        assert_eq!(result.unwrap().snapshot_id(), INVALID_SNAPSHOT_ID);
1780    }
1781
1782    #[test]
1783    fn test_used_locked_finds_obscured_record() {
1784        // Reset pinning state for clean test
1785        crate::snapshot_pinning::reset_pinning_table();
1786
1787        // Pin a high snapshot to set a known reuse limit
1788        // This ensures records 2 and 5 are both below (reuse_limit = 10 - 1 = 9)
1789        let pin_handle = crate::snapshot_pinning::track_pinning(10, &SnapshotIdSet::EMPTY);
1790
1791        // Create a chain with two old records below the reuse limit
1792        let oldest = StateRecord::new(2, 0i32, None);
1793        let newer = StateRecord::new(5, 0i32, Some(oldest.clone()));
1794        let head = StateRecord::new(100, 0i32, Some(newer));
1795
1796        let result = used_locked(&head);
1797
1798        // Should find the older of the two records below reuse limit
1799        assert!(result.is_some());
1800        let reused = result.unwrap();
1801        assert_eq!(
1802            reused.snapshot_id(),
1803            2,
1804            "Should return the oldest obscured record"
1805        );
1806
1807        // Clean up
1808        crate::snapshot_pinning::release_pinning(pin_handle);
1809    }
1810
1811    #[test]
1812    fn test_used_locked_no_reusable_record() {
1813        // Reset pinning state
1814        crate::snapshot_pinning::reset_pinning_table();
1815
1816        // Create a chain where all records are recent (above reuse limit)
1817        // Use very high IDs to ensure they're above any reuse limit
1818        let high_id = allocate_record_id() + 1000;
1819        let head = create_record_chain(&[high_id, high_id + 1, high_id + 2]);
1820
1821        let result = used_locked(&head);
1822        assert!(
1823            result.is_none(),
1824            "Should find no reusable records when all are recent"
1825        );
1826    }
1827
1828    #[test]
1829    fn test_used_locked_single_old_record() {
1830        // Reset pinning state
1831        crate::snapshot_pinning::reset_pinning_table();
1832
1833        // Create a chain with only one old record (should not be reused)
1834        let old = StateRecord::new(2, 0i32, None);
1835        let head = StateRecord::new(100, 0i32, Some(old));
1836
1837        let result = used_locked(&head);
1838        // With only ONE record below reuse limit, it's still valid and should not be reused
1839        assert!(result.is_none(), "Single old record should not be reused");
1840    }
1841
1842    #[test]
1843    fn test_readable_record_for_preexisting() {
1844        let head = create_record_chain(&[PREEXISTING_SNAPSHOT_ID]);
1845        let invalid = SnapshotIdSet::EMPTY;
1846
1847        let result = readable_record_for(&head, 10, &invalid);
1848        assert!(result.is_some());
1849        assert_eq!(result.unwrap().snapshot_id(), PREEXISTING_SNAPSHOT_ID);
1850    }
1851
1852    #[test]
1853    fn test_readable_record_for_picks_highest_valid() {
1854        let head = create_record_chain(&[10, 5, PREEXISTING_SNAPSHOT_ID]);
1855        let invalid = SnapshotIdSet::EMPTY;
1856
1857        // Reading at snapshot 10 should return record 10
1858        let result = readable_record_for(&head, 10, &invalid);
1859        assert!(result.is_some());
1860        assert_eq!(result.unwrap().snapshot_id(), 10);
1861
1862        // Reading at snapshot 7 should skip record 10 and return record 5
1863        let result = readable_record_for(&head, 7, &invalid);
1864        assert!(result.is_some());
1865        assert_eq!(result.unwrap().snapshot_id(), 5);
1866    }
1867
1868    #[test]
1869    fn test_new_overwritable_record_locked_reuses_invalid() {
1870        // Create a state with an INVALID record in the chain
1871        let state = SnapshotMutableState::new_in_arc(100i32, Arc::new(NeverEqual));
1872
1873        // Manually insert an INVALID record into the chain
1874        let current_head = state.first_record();
1875        let invalid_rec = StateRecord::new(INVALID_SNAPSHOT_ID, 0i32, current_head.next());
1876        current_head.set_next(Some(invalid_rec.clone()));
1877
1878        let result = new_overwritable_record_locked(&*state);
1879
1880        // Should reuse the INVALID record
1881        assert!(Rc::ptr_eq(&result, &invalid_rec));
1882        assert_eq!(result.snapshot_id(), SNAPSHOT_ID_MAX);
1883    }
1884
1885    #[test]
1886    fn test_new_overwritable_record_locked_creates_new() {
1887        crate::snapshot_pinning::reset_pinning_table();
1888
1889        // Pin snapshot 1 to prevent PREEXISTING (id=1) from being reusable
1890        // This ensures the reuse limit is above 1, so PREEXISTING won't be obscured
1891        let _pin_handle = crate::snapshot_pinning::track_pinning(1, &SnapshotIdSet::EMPTY);
1892
1893        // Create a state with all recent records (no reusable ones)
1894        let state = SnapshotMutableState::new_in_arc(100i32, Arc::new(NeverEqual));
1895        let old_head = state.first_record();
1896
1897        let result = new_overwritable_record_locked(&*state);
1898
1899        // Should create a new record
1900        assert_eq!(result.snapshot_id(), SNAPSHOT_ID_MAX);
1901
1902        // Should be prepended to the chain (becomes new head)
1903        let new_head = state.first_record();
1904        assert!(
1905            Rc::ptr_eq(&new_head, &result),
1906            "new_head ({:p}) should equal result ({:p})",
1907            Rc::as_ptr(&new_head),
1908            Rc::as_ptr(&result)
1909        );
1910
1911        // The new record should point to the old head
1912        assert!(result.next().is_some());
1913        assert!(Rc::ptr_eq(&result.next().unwrap(), &old_head));
1914    }
1915
1916    #[test]
1917    fn test_writable_record_reuses_invalid_record() {
1918        crate::snapshot_pinning::reset_pinning_table();
1919
1920        let state = SnapshotMutableState::new_in_arc(7i32, Arc::new(NeverEqual));
1921
1922        // Inject an INVALID record that should be reused on next write.
1923        let head = state.first_record();
1924        let invalid = StateRecord::new(INVALID_SNAPSHOT_ID, 0i32, head.next());
1925        head.set_next(Some(invalid.clone()));
1926
1927        let snapshot_id = allocate_record_id();
1928        let result = state.writable_record(snapshot_id, &SnapshotIdSet::EMPTY);
1929
1930        assert!(
1931            Rc::ptr_eq(&result, &invalid),
1932            "Expected writable_record to reuse the INVALID record"
1933        );
1934        assert_eq!(result.snapshot_id(), snapshot_id);
1935        result.with_value(|value: &i32| {
1936            assert_eq!(*value, 7, "Reused record should copy the readable value");
1937        });
1938        assert!(!result.is_tombstone());
1939    }
1940
1941    #[test]
1942    fn test_writable_record_creates_new_when_reuse_disallowed() {
1943        crate::snapshot_pinning::reset_pinning_table();
1944        let pin = crate::snapshot_pinning::track_pinning(1, &SnapshotIdSet::EMPTY);
1945
1946        let state = SnapshotMutableState::new_in_arc(42i32, Arc::new(NeverEqual));
1947        let original_head = state.first_record();
1948        let preexisting = original_head
1949            .next()
1950            .expect("preexisting record should exist for newly created state");
1951
1952        let snapshot_id = allocate_record_id();
1953        let result = state.writable_record(snapshot_id, &SnapshotIdSet::EMPTY);
1954
1955        assert!(
1956            !Rc::ptr_eq(&result, &original_head),
1957            "Should not reuse the current head when reuse is disallowed"
1958        );
1959        assert!(
1960            !Rc::ptr_eq(&result, &preexisting),
1961            "Should not reuse the PREEXISTING record"
1962        );
1963        assert_eq!(result.snapshot_id(), snapshot_id);
1964        result.with_value(|value: &i32| assert_eq!(*value, 42));
1965
1966        let new_head = state.first_record();
1967        assert!(
1968            Rc::ptr_eq(&new_head, &result),
1969            "Newly created record should become the head of the chain"
1970        );
1971
1972        crate::snapshot_pinning::release_pinning(pin);
1973    }
1974
1975    #[test]
1976    fn test_state_record_clear_for_reuse() {
1977        let record = StateRecord::new(10, 42i32, None);
1978
1979        // Verify value exists before clearing
1980        record.with_value(|val: &i32| {
1981            assert_eq!(*val, 42);
1982        });
1983
1984        // Clear the record for reuse
1985        record.clear_for_reuse();
1986
1987        // Value should be cleared (will panic if we try to access it)
1988        // Just verify snapshot_id is unchanged
1989        assert_eq!(record.snapshot_id(), 10);
1990    }
1991
1992    #[test]
1993    fn test_overwrite_unused_records_no_old_records() {
1994        crate::snapshot_pinning::reset_pinning_table();
1995
1996        // Create state first to establish snapshot IDs
1997        let state = SnapshotMutableState::new_in_arc(42i32, Arc::new(NeverEqual));
1998
1999        // Pin snapshot 1 so reuse limit is 1, making both initial records (1 and 2) above it
2000        // This ensures PREEXISTING won't be overwritten
2001        let _pin = crate::snapshot_pinning::track_pinning(1, &SnapshotIdSet::EMPTY);
2002
2003        let should_retain = state.overwrite_unused_records();
2004
2005        // With both records above/at reuse limit, we have 2 retained
2006        assert!(
2007            should_retain,
2008            "Should retain multiple records when none are old enough"
2009        );
2010
2011        // No records should be marked as INVALID
2012        let mut cursor = Some(state.first_record());
2013        while let Some(record) = cursor {
2014            assert_ne!(record.snapshot_id(), INVALID_SNAPSHOT_ID);
2015            cursor = record.next();
2016        }
2017    }
2018
2019    #[test]
2020    fn test_overwrite_unused_records_basic_cleanup() {
2021        // Test that old records get marked invalid when newer ones exist
2022        crate::snapshot_pinning::reset_pinning_table();
2023
2024        // Create simple manual chain to avoid snapshot ID allocation complexity
2025        let rec1 = StateRecord::new(100, 1i32, None);
2026        let rec2 = StateRecord::new(200, 2i32, Some(rec1.clone()));
2027        let rec3 = StateRecord::new(300, 3i32, Some(rec2.clone()));
2028
2029        // Mock state object for testing
2030        struct TestState {
2031            head: Rc<StateRecord>,
2032        }
2033        impl StateObject for TestState {
2034            fn object_id(&self) -> ObjectId {
2035                ObjectId(999)
2036            }
2037            fn first_record(&self) -> Rc<StateRecord> {
2038                Rc::clone(&self.head)
2039            }
2040            fn readable_record(&self, _: SnapshotId, _: &SnapshotIdSet) -> Rc<StateRecord> {
2041                Rc::clone(&self.head)
2042            }
2043            fn prepend_state_record(&self, _: Rc<StateRecord>) {}
2044            fn promote_record(&self, _: SnapshotId) -> Result<(), &'static str> {
2045                Ok(())
2046            }
2047            fn as_any(&self) -> &dyn Any {
2048                self
2049            }
2050        }
2051
2052        let test_state = TestState { head: rec3.clone() };
2053
2054        // Pin at 1000 so all three records (100, 200, 300) are below reuse limit
2055        let _pin = crate::snapshot_pinning::track_pinning(1000, &SnapshotIdSet::EMPTY);
2056
2057        let result = overwrite_unused_records_locked::<i32>(&test_state);
2058
2059        // Should keep highest (300), mark others invalid
2060        assert_eq!(rec3.snapshot_id(), 300);
2061        assert_eq!(rec2.snapshot_id(), INVALID_SNAPSHOT_ID);
2062        assert_eq!(rec1.snapshot_id(), INVALID_SNAPSHOT_ID);
2063
2064        // Only one record retained (300), so should return false
2065        assert!(!result);
2066    }
2067
2068    #[test]
2069    fn test_overwrite_unused_records_single_record_only() {
2070        crate::snapshot_pinning::reset_pinning_table();
2071
2072        let state = SnapshotMutableState::new_in_arc(42i32, Arc::new(NeverEqual));
2073
2074        // Remove the PREEXISTING record by setting next to None
2075        let head = state.first_record();
2076        head.set_next(None);
2077
2078        let should_retain = state.overwrite_unused_records();
2079
2080        // With only one record, should return false
2081        assert!(!should_retain, "Single record should return false");
2082    }
2083
2084    #[test]
2085    fn test_overwrite_unused_records_clears_values() {
2086        crate::snapshot_pinning::reset_pinning_table();
2087
2088        let tail = StateRecord::new(PREEXISTING_SNAPSHOT_ID, 0i32, None);
2089        let old_rec1 = StateRecord::new(2, 999i32, Some(tail.clone()));
2090        let old_rec2 = StateRecord::new(3, 888i32, Some(old_rec1.clone()));
2091        let head = StateRecord::new(150, 42i32, Some(old_rec2.clone()));
2092        let state = ManualState::new(head.clone());
2093
2094        // Verify value exists before cleanup
2095        old_rec1.with_value(|val: &i32| {
2096            assert_eq!(*val, 999);
2097        });
2098
2099        let _pin = crate::snapshot_pinning::track_pinning(100, &SnapshotIdSet::EMPTY);
2100        overwrite_unused_records_locked::<i32>(&state);
2101
2102        // The invalidated record should have its value cleared
2103        assert_eq!(old_rec1.snapshot_id(), INVALID_SNAPSHOT_ID);
2104        // Value access would panic, so we just verify it was marked invalid
2105    }
2106
2107    #[test]
2108    fn test_overwrite_unused_records_mixed_old_and_new() {
2109        crate::snapshot_pinning::reset_pinning_table();
2110
2111        // Create mixed chain: recent (50) -> old (5) -> old (2) -> PREEXISTING
2112        let preexisting = StateRecord::new(PREEXISTING_SNAPSHOT_ID, 0i32, None);
2113        let rec2 = StateRecord::new(2, 100i32, Some(preexisting.clone()));
2114        let rec5 = StateRecord::new(5, 100i32, Some(rec2.clone()));
2115        let rec50 = StateRecord::new(50, 100i32, Some(rec5.clone()));
2116        let head = StateRecord::new(120, 100i32, Some(rec50.clone()));
2117        let state = ManualState::new(head.clone());
2118
2119        // Pin snapshot 40 so reuse limit is ~40, making 2 and 5 old but 50 recent
2120        let _pin = crate::snapshot_pinning::track_pinning(40, &SnapshotIdSet::EMPTY);
2121
2122        let should_retain = overwrite_unused_records_locked::<i32>(&state);
2123        assert!(should_retain);
2124
2125        // rec50 is above reuse limit - should stay valid
2126        assert_eq!(rec50.snapshot_id(), 50);
2127        // rec5 is highest below reuse limit - should stay valid
2128        assert_eq!(rec5.snapshot_id(), 5);
2129        // rec2 is older and below reuse limit - should be invalidated
2130        assert_eq!(rec2.snapshot_id(), INVALID_SNAPSHOT_ID);
2131    }
2132
2133    #[test]
2134    fn test_readable_record_for_skips_invalid_set() {
2135        let head = create_record_chain(&[10, 5, PREEXISTING_SNAPSHOT_ID]);
2136        let invalid = SnapshotIdSet::new().set(5);
2137
2138        // Reading at snapshot 10 should skip record 5 (in invalid set)
2139        let result = readable_record_for(&head, 10, &invalid);
2140        assert!(result.is_some());
2141        assert_eq!(result.unwrap().snapshot_id(), 10);
2142
2143        // Reading at snapshot 7 should skip 5 and fall back to PREEXISTING
2144        let result = readable_record_for(&head, 7, &invalid);
2145        assert!(result.is_some());
2146        assert_eq!(result.unwrap().snapshot_id(), PREEXISTING_SNAPSHOT_ID);
2147    }
2148
2149    // ========== Tests for assign_value() ==========
2150
2151    #[test]
2152    fn test_assign_value_copies_int() {
2153        let source = StateRecord::new(10, 42i32, None);
2154        let target = StateRecord::new(20, 0i32, None);
2155
2156        target.assign_value::<i32>(&source);
2157
2158        // Verify the value was copied
2159        target.with_value(|val: &i32| {
2160            assert_eq!(*val, 42);
2161        });
2162
2163        // Verify source is unchanged
2164        source.with_value(|val: &i32| {
2165            assert_eq!(*val, 42);
2166        });
2167
2168        // Verify snapshot IDs are unchanged
2169        assert_eq!(source.snapshot_id(), 10);
2170        assert_eq!(target.snapshot_id(), 20);
2171    }
2172
2173    #[test]
2174    fn test_assign_value_copies_string() {
2175        let source = StateRecord::new(10, "hello".to_string(), None);
2176        let target = StateRecord::new(20, "world".to_string(), None);
2177
2178        target.assign_value::<String>(&source);
2179
2180        // Verify the value was copied
2181        target.with_value(|val: &String| {
2182            assert_eq!(val, "hello");
2183        });
2184
2185        // Verify source is unchanged
2186        source.with_value(|val: &String| {
2187            assert_eq!(val, "hello");
2188        });
2189    }
2190
2191    #[test]
2192    #[should_panic(expected = "StateRecord value missing or wrong type")]
2193    fn test_assign_value_copies_from_cleared_source_panics() {
2194        let source = StateRecord::new(10, 42i32, None);
2195        let target = StateRecord::new(20, 0i32, None);
2196
2197        // Clear the source value
2198        source.clear_value();
2199
2200        // Should panic because source has no value
2201        target.assign_value::<i32>(&source);
2202    }
2203
2204    #[test]
2205    fn test_assign_value_overwrites_existing_value() {
2206        let source = StateRecord::new(10, 100i32, None);
2207        let target = StateRecord::new(20, 999i32, None);
2208
2209        // Verify target has initial value
2210        target.with_value(|val: &i32| {
2211            assert_eq!(*val, 999);
2212        });
2213
2214        // Assign from source
2215        target.assign_value::<i32>(&source);
2216
2217        // Verify target now has source's value
2218        target.with_value(|val: &i32| {
2219            assert_eq!(*val, 100);
2220        });
2221    }
2222
2223    #[test]
2224    fn test_assign_value_with_custom_type() {
2225        #[derive(Clone, PartialEq, Debug)]
2226        struct Point {
2227            x: f64,
2228            y: f64,
2229        }
2230
2231        let source = StateRecord::new(10, Point { x: 1.5, y: 2.5 }, None);
2232        let target = StateRecord::new(20, Point { x: 0.0, y: 0.0 }, None);
2233
2234        target.assign_value::<Point>(&source);
2235
2236        target.with_value(|val: &Point| {
2237            assert_eq!(val, &Point { x: 1.5, y: 2.5 });
2238        });
2239    }
2240
2241    #[test]
2242    fn test_assign_value_self_assignment() {
2243        let record = StateRecord::new(10, 42i32, None);
2244
2245        // Self-assignment should work (though not useful in practice)
2246        record.assign_value::<i32>(&record);
2247
2248        record.with_value(|val: &i32| {
2249            assert_eq!(*val, 42);
2250        });
2251    }
2252
2253    #[test]
2254    fn test_assign_value_with_vec() {
2255        let source = StateRecord::new(10, vec![1, 2, 3, 4, 5], None);
2256        let target = StateRecord::new(20, Vec::<i32>::new(), None);
2257
2258        target.assign_value::<Vec<i32>>(&source);
2259
2260        target.with_value(|val: &Vec<i32>| {
2261            assert_eq!(val, &vec![1, 2, 3, 4, 5]);
2262        });
2263
2264        // Verify it's a deep copy (modifying source won't affect target)
2265        source.replace_value(vec![10, 20]);
2266        target.with_value(|val: &Vec<i32>| {
2267            assert_eq!(val, &vec![1, 2, 3, 4, 5]);
2268        });
2269    }
2270}