cranpose_core/snapshot_v2/
mutable.rs

1//! Mutable snapshot implementation.
2
3use super::*;
4use crate::collections::map::HashMap;
5use crate::state::{StateRecord, PREEXISTING_SNAPSHOT_ID};
6use std::sync::Arc;
7
8pub(super) fn find_record_by_id(
9    head: &Arc<StateRecord>,
10    target: SnapshotId,
11) -> Option<Arc<StateRecord>> {
12    let mut cursor = Some(Arc::clone(head));
13    while let Some(record) = cursor {
14        if !record.is_tombstone() && record.snapshot_id() == target {
15            return Some(record);
16        }
17        cursor = record.next();
18    }
19    None
20}
21
22pub(super) fn find_previous_record(
23    head: &Arc<StateRecord>,
24    base_snapshot_id: SnapshotId,
25) -> (Option<Arc<StateRecord>>, bool) {
26    let mut cursor = Some(Arc::clone(head));
27    let mut best: Option<Arc<StateRecord>> = None;
28    let mut fallback: Option<Arc<StateRecord>> = None;
29    let mut found_base = false;
30
31    while let Some(record) = cursor {
32        if !record.is_tombstone() {
33            fallback = Some(record.clone());
34            if record.snapshot_id() <= base_snapshot_id {
35                found_base = true;
36                let replace = best
37                    .as_ref()
38                    .map(|current| current.snapshot_id() < record.snapshot_id())
39                    .unwrap_or(true);
40                if replace {
41                    best = Some(record.clone());
42                }
43            }
44        }
45        cursor = record.next();
46    }
47
48    (best.or(fallback), found_base)
49}
50
51enum ApplyOperation {
52    PromoteChild {
53        object_id: StateObjectId,
54        state: Arc<dyn StateObject>,
55        writer_id: SnapshotId,
56    },
57    PromoteExisting {
58        object_id: StateObjectId,
59        state: Arc<dyn StateObject>,
60        source_id: SnapshotId,
61        applied: Arc<StateRecord>,
62    },
63    CommitMerged {
64        object_id: StateObjectId,
65        state: Arc<dyn StateObject>,
66        merged: Arc<StateRecord>,
67        applied: Arc<StateRecord>,
68    },
69}
70
71/// A mutable snapshot that allows isolated state changes.
72///
73/// Changes made in a mutable snapshot are isolated from other snapshots
74/// until `apply()` is called, at which point they become visible atomically.
75/// This is a root mutable snapshot (not nested).
76///
77/// # Thread Safety
78/// Contains `Cell<T>` which is not `Send`/`Sync`. This is safe because snapshots
79/// are stored in thread-local storage and never shared across threads. The `Arc`
80/// is used for cheap cloning within a single thread, not for cross-thread sharing.
81#[allow(clippy::arc_with_non_send_sync)]
82pub struct MutableSnapshot {
83    state: SnapshotState,
84    /// The parent's snapshot id at the time this snapshot was created
85    base_parent_id: SnapshotId,
86    /// Number of active nested snapshots
87    nested_count: Cell<usize>,
88    /// Whether this snapshot has been applied
89    applied: Cell<bool>,
90}
91
92impl MutableSnapshot {
93    pub(crate) fn from_parts(
94        id: SnapshotId,
95        invalid: SnapshotIdSet,
96        read_observer: Option<ReadObserver>,
97        write_observer: Option<WriteObserver>,
98        base_parent_id: SnapshotId,
99        runtime_tracked: bool,
100    ) -> Arc<Self> {
101        Arc::new(Self {
102            state: SnapshotState::new(id, invalid, read_observer, write_observer, runtime_tracked),
103            base_parent_id,
104            nested_count: Cell::new(0),
105            applied: Cell::new(false),
106        })
107    }
108
109    /// Create a new root mutable snapshot using the global runtime.
110    pub fn new_root(
111        read_observer: Option<ReadObserver>,
112        write_observer: Option<WriteObserver>,
113    ) -> Arc<Self> {
114        GlobalSnapshot::get_or_create().take_nested_mutable_snapshot(read_observer, write_observer)
115    }
116
117    /// Create a new root mutable snapshot.
118    pub fn new(
119        id: SnapshotId,
120        invalid: SnapshotIdSet,
121        read_observer: Option<ReadObserver>,
122        write_observer: Option<WriteObserver>,
123        base_parent_id: SnapshotId,
124    ) -> Arc<Self> {
125        Self::from_parts(
126            id,
127            invalid,
128            read_observer,
129            write_observer,
130            base_parent_id,
131            false,
132        )
133    }
134
135    fn validate_not_applied(&self) {
136        if self.applied.get() {
137            panic!("Snapshot has already been applied");
138        }
139    }
140
141    fn validate_not_disposed(&self) {
142        if self.state.disposed.get() {
143            panic!("Snapshot has been disposed");
144        }
145    }
146
147    pub fn snapshot_id(&self) -> SnapshotId {
148        self.state.id.get()
149    }
150
151    pub fn invalid(&self) -> SnapshotIdSet {
152        self.state.invalid.borrow().clone()
153    }
154
155    pub fn read_only(&self) -> bool {
156        false
157    }
158
159    pub(crate) fn set_on_dispose<F>(&self, f: F)
160    where
161        F: FnOnce() + 'static,
162    {
163        self.state.set_on_dispose(f);
164    }
165
166    pub fn root_mutable(self: &Arc<Self>) -> Arc<Self> {
167        self.clone()
168    }
169
170    pub fn enter<T>(self: &Arc<Self>, f: impl FnOnce() -> T) -> T {
171        let previous = current_snapshot();
172        set_current_snapshot(Some(AnySnapshot::Mutable(self.clone())));
173        let result = f();
174        set_current_snapshot(previous);
175        result
176    }
177
178    pub fn take_nested_snapshot(
179        self: &Arc<Self>,
180        read_observer: Option<ReadObserver>,
181    ) -> Arc<ReadonlySnapshot> {
182        self.validate_not_disposed();
183        self.validate_not_applied();
184
185        let merged_observer = merge_read_observers(read_observer, self.state.read_observer.clone());
186
187        // Create a nested read-only snapshot
188        let nested = ReadonlySnapshot::new(
189            self.state.id.get(),
190            self.state.invalid.borrow().clone(),
191            merged_observer,
192        );
193
194        self.nested_count.set(self.nested_count.get() + 1);
195
196        // When the nested snapshot is disposed, decrement this parent's nested_count
197        let parent_weak = Arc::downgrade(self);
198        nested.set_on_dispose(move || {
199            if let Some(parent) = parent_weak.upgrade() {
200                let cur = parent.nested_count.get();
201                if cur > 0 {
202                    parent.nested_count.set(cur - 1);
203                }
204            }
205        });
206        nested
207    }
208
209    pub fn has_pending_changes(&self) -> bool {
210        !self.state.modified.borrow().is_empty()
211    }
212
213    pub fn pending_children(&self) -> Vec<SnapshotId> {
214        self.state.pending_children()
215    }
216
217    pub fn has_pending_children(&self) -> bool {
218        self.state.has_pending_children()
219    }
220
221    pub fn dispose(&self) {
222        if !self.state.disposed.get() && self.nested_count.get() == 0 {
223            self.state.dispose();
224        }
225    }
226
227    pub fn record_read(&self, state: &dyn StateObject) {
228        self.state.record_read(state);
229    }
230
231    pub fn record_write(&self, state: Arc<dyn StateObject>) {
232        self.validate_not_applied();
233        self.validate_not_disposed();
234        self.state.record_write(state, self.state.id.get());
235    }
236
237    pub fn notify_objects_initialized(&self) {
238        if !self.applied.get() && !self.state.disposed.get() {
239            // Mark that objects are initialized
240            // In a full implementation, this would update internal state
241        }
242    }
243
244    pub fn close(&self) {
245        self.state.disposed.set(true);
246    }
247
248    pub fn is_disposed(&self) -> bool {
249        self.state.disposed.get()
250    }
251
252    pub fn apply(&self) -> SnapshotApplyResult {
253        // Check disposed state first - return Failure instead of panicking
254        if self.state.disposed.get() {
255            return SnapshotApplyResult::Failure;
256        }
257
258        if self.applied.get() {
259            return SnapshotApplyResult::Failure;
260        }
261
262        let modified = self.state.modified.borrow();
263        if modified.is_empty() {
264            // No changes to apply
265            self.applied.set(true);
266            self.state.dispose();
267            return SnapshotApplyResult::Success;
268        }
269
270        let this_id = self.state.id.get();
271        let mut modified_objects: Vec<(StateObjectId, Arc<dyn StateObject>, SnapshotId)> =
272            Vec::with_capacity(modified.len());
273        for (&obj_id, (obj, writer_id)) in modified.iter() {
274            modified_objects.push((obj_id, obj.clone(), *writer_id));
275        }
276
277        drop(modified);
278
279        let parent_snapshot = GlobalSnapshot::get_or_create();
280        let parent_snapshot_id = parent_snapshot.snapshot_id();
281        let parent_invalid = parent_snapshot.invalid();
282        drop(parent_snapshot);
283
284        let next_invalid = super::runtime::open_snapshots().clear(parent_snapshot_id);
285        let optimistic = super::optimistic_merges(
286            parent_snapshot_id,
287            self.base_parent_id,
288            &modified_objects,
289            &next_invalid,
290        );
291
292        let mut operations: Vec<ApplyOperation> = Vec::with_capacity(modified_objects.len());
293
294        for (obj_id, state, writer_id) in &modified_objects {
295            let head = state.first_record();
296            let applied = match find_record_by_id(&head, *writer_id) {
297                Some(record) => record,
298                None => return SnapshotApplyResult::Failure,
299            };
300
301            let current =
302                crate::state::readable_record_for(&head, parent_snapshot_id, &next_invalid)
303                    .unwrap_or_else(|| state.readable_record(parent_snapshot_id, &parent_invalid));
304            let (previous_opt, found_base) = find_previous_record(&head, self.base_parent_id);
305            let Some(previous) = previous_opt else {
306                return SnapshotApplyResult::Failure;
307            };
308
309            if !found_base || previous.snapshot_id() == PREEXISTING_SNAPSHOT_ID {
310                operations.push(ApplyOperation::PromoteChild {
311                    object_id: *obj_id,
312                    state: state.clone(),
313                    writer_id: *writer_id,
314                });
315                continue;
316            }
317
318            if Arc::ptr_eq(&current, &previous) {
319                operations.push(ApplyOperation::PromoteChild {
320                    object_id: *obj_id,
321                    state: state.clone(),
322                    writer_id: *writer_id,
323                });
324                continue;
325            }
326
327            let merged = if let Some(candidate) = optimistic
328                .as_ref()
329                .and_then(|map| map.get(&(Arc::as_ptr(&current) as usize)))
330                .cloned()
331            {
332                candidate
333            } else {
334                match state.merge_records(
335                    Arc::clone(&previous),
336                    Arc::clone(&current),
337                    Arc::clone(&applied),
338                ) {
339                    Some(record) => record,
340                    None => return SnapshotApplyResult::Failure,
341                }
342            };
343
344            if Arc::ptr_eq(&merged, &applied) {
345                operations.push(ApplyOperation::PromoteChild {
346                    object_id: *obj_id,
347                    state: state.clone(),
348                    writer_id: *writer_id,
349                });
350            } else if Arc::ptr_eq(&merged, &current) {
351                operations.push(ApplyOperation::PromoteExisting {
352                    object_id: *obj_id,
353                    state: state.clone(),
354                    source_id: current.snapshot_id(),
355                    applied: applied.clone(),
356                });
357            } else {
358                operations.push(ApplyOperation::CommitMerged {
359                    object_id: *obj_id,
360                    state: state.clone(),
361                    merged: merged.clone(),
362                    applied: applied.clone(),
363                });
364            }
365        }
366
367        let mut applied_info: Vec<(StateObjectId, Arc<dyn StateObject>, SnapshotId)> =
368            Vec::with_capacity(operations.len());
369
370        for operation in operations {
371            match operation {
372                ApplyOperation::PromoteChild {
373                    object_id,
374                    state,
375                    writer_id,
376                } => {
377                    if state.promote_record(writer_id).is_err() {
378                        return SnapshotApplyResult::Failure;
379                    }
380                    let new_head_id = state.first_record().snapshot_id();
381                    applied_info.push((object_id, state, new_head_id));
382                }
383                ApplyOperation::PromoteExisting {
384                    object_id,
385                    state,
386                    source_id,
387                    applied,
388                } => {
389                    if state.promote_record(source_id).is_err() {
390                        return SnapshotApplyResult::Failure;
391                    }
392                    applied.set_tombstone(true);
393                    applied.clear_value();
394                    let new_head_id = state.first_record().snapshot_id();
395                    applied_info.push((object_id, state, new_head_id));
396                }
397                ApplyOperation::CommitMerged {
398                    object_id,
399                    state,
400                    merged,
401                    applied,
402                } => {
403                    let Ok(new_head_id) = state.commit_merged_record(merged) else {
404                        return SnapshotApplyResult::Failure;
405                    };
406                    applied.set_tombstone(true);
407                    applied.clear_value();
408                    applied_info.push((object_id, state, new_head_id));
409                }
410            }
411        }
412
413        for (obj_id, _, head_id) in &applied_info {
414            super::set_last_write(*obj_id, *head_id);
415        }
416
417        self.applied.set(true);
418        self.state.dispose();
419
420        // Track modified states for future cleanup instead of cleaning immediately.
421        // This defers the O(record_chain) work to check_and_overwrite_unused_records_locked,
422        // which runs periodically (e.g., on global snapshot advance) rather than on every apply.
423        // This prevents performance regression during rapid scrolling while still preventing leaks.
424        for (_, state, _) in &applied_info {
425            super::EXTRA_STATE_OBJECTS.with(|cell| {
426                cell.borrow_mut().add_trait_object(state);
427            });
428        }
429
430        let observer_states: Vec<Arc<dyn StateObject>> = applied_info
431            .iter()
432            .map(|(_, state, _)| state.clone())
433            .collect();
434        super::notify_apply_observers(&observer_states, this_id);
435        SnapshotApplyResult::Success
436    }
437
438    pub fn take_nested_mutable_snapshot(
439        self: &Arc<Self>,
440        read_observer: Option<ReadObserver>,
441        write_observer: Option<WriteObserver>,
442    ) -> Arc<NestedMutableSnapshot> {
443        self.validate_not_disposed();
444        self.validate_not_applied();
445
446        let merged_read = merge_read_observers(read_observer, self.state.read_observer.clone());
447        let merged_write = merge_write_observers(write_observer, self.state.write_observer.clone());
448
449        let (new_id, runtime_invalid) = allocate_snapshot();
450
451        // Merge runtime invalid data with the parent's invalid set and ensure the parent
452        // also tracks the child snapshot id.
453        let mut parent_invalid = self.state.invalid.borrow().clone();
454        parent_invalid = parent_invalid.set(new_id);
455        self.state.invalid.replace(parent_invalid.clone());
456        let invalid = parent_invalid.or(&runtime_invalid);
457
458        let self_weak = Arc::downgrade(self);
459        let nested = NestedMutableSnapshot::new(
460            new_id,
461            invalid,
462            merged_read,
463            merged_write,
464            self_weak,
465            self.state.id.get(), // base_parent_id for child is this snapshot's id
466        );
467
468        self.nested_count.set(self.nested_count.get() + 1);
469        self.state.add_pending_child(new_id);
470
471        let parent_weak = Arc::downgrade(self);
472        nested.set_on_dispose({
473            let child_id = new_id;
474            move || {
475                if let Some(parent) = parent_weak.upgrade() {
476                    if parent.nested_count.get() > 0 {
477                        parent
478                            .nested_count
479                            .set(parent.nested_count.get().saturating_sub(1));
480                    }
481                    let mut invalid = parent.state.invalid.borrow_mut();
482                    let new_set = invalid.clone().clear(child_id);
483                    *invalid = new_set;
484                    parent.state.remove_pending_child(child_id);
485                }
486            }
487        });
488
489        nested
490    }
491
492    /// Merge a child's modified set into this snapshot's modified set.
493    ///
494    /// Returns Ok(()) on success, or Err(()) if a conflict is detected
495    /// (i.e., this snapshot already has a modification for the same object).
496    pub(crate) fn merge_child_modifications(
497        &self,
498        child_modified: &HashMap<StateObjectId, (Arc<dyn StateObject>, SnapshotId)>,
499    ) -> Result<(), ()> {
500        // Check for conflicts
501        {
502            let parent_mod = self.state.modified.borrow();
503            for key in child_modified.keys() {
504                if parent_mod.contains_key(key) {
505                    return Err(());
506                }
507            }
508        }
509
510        // Merge entries
511        let mut parent_mod = self.state.modified.borrow_mut();
512        for (key, value) in child_modified.iter() {
513            parent_mod.entry(*key).or_insert_with(|| value.clone());
514        }
515        Ok(())
516    }
517}
518
519#[cfg(test)]
520impl MutableSnapshot {
521    pub(crate) fn debug_modified_objects(
522        &self,
523    ) -> Vec<(StateObjectId, Arc<dyn StateObject>, SnapshotId)> {
524        let modified = self.state.modified.borrow();
525        modified
526            .iter()
527            .map(|(&obj_id, (state, writer_id))| (obj_id, state.clone(), *writer_id))
528            .collect()
529    }
530
531    pub(crate) fn debug_base_parent_id(&self) -> SnapshotId {
532        self.base_parent_id
533    }
534}
535
536#[cfg(test)]
537mod tests {
538    use super::*;
539    use crate::snapshot_v2::runtime::TestRuntimeGuard;
540    use crate::state::{NeverEqual, SnapshotMutableState, StateObject};
541    use std::cell::Cell;
542    use std::sync::Arc;
543
544    fn reset_runtime() -> TestRuntimeGuard {
545        reset_runtime_for_tests()
546    }
547
548    fn new_state(initial: i32) -> Arc<SnapshotMutableState<i32>> {
549        SnapshotMutableState::new_in_arc(initial, Arc::new(NeverEqual))
550    }
551
552    // Mock StateObject for testing
553    #[allow(dead_code)]
554    struct MockStateObject {
555        value: Cell<i32>,
556    }
557
558    impl StateObject for MockStateObject {
559        fn object_id(&self) -> crate::state::ObjectId {
560            crate::state::ObjectId(0)
561        }
562
563        fn first_record(&self) -> Arc<crate::state::StateRecord> {
564            unimplemented!("Not needed for tests")
565        }
566
567        fn readable_record(
568            &self,
569            _snapshot_id: crate::snapshot_id_set::SnapshotId,
570            _invalid: &SnapshotIdSet,
571        ) -> Arc<crate::state::StateRecord> {
572            unimplemented!("Not needed for tests")
573        }
574
575        fn prepend_state_record(&self, _record: Arc<crate::state::StateRecord>) {
576            unimplemented!("Not needed for tests")
577        }
578
579        fn promote_record(
580            &self,
581            _child_id: crate::snapshot_id_set::SnapshotId,
582        ) -> Result<(), &'static str> {
583            unimplemented!("Not needed for tests")
584        }
585
586        fn as_any(&self) -> &dyn std::any::Any {
587            self
588        }
589    }
590
591    #[test]
592    fn test_mutable_snapshot_creation() {
593        let _guard = reset_runtime();
594        let snapshot = MutableSnapshot::new(1, SnapshotIdSet::new(), None, None, 0);
595        assert_eq!(snapshot.snapshot_id(), 1);
596        assert!(!snapshot.read_only());
597        assert!(!snapshot.is_disposed());
598        assert!(!snapshot.applied.get());
599    }
600
601    #[test]
602    fn test_mutable_snapshot_no_pending_changes_initially() {
603        let _guard = reset_runtime();
604        let snapshot = MutableSnapshot::new(1, SnapshotIdSet::new(), None, None, 0);
605        assert!(!snapshot.has_pending_changes());
606    }
607
608    #[test]
609    fn test_mutable_snapshot_enter() {
610        let _guard = reset_runtime();
611        let snapshot = MutableSnapshot::new(1, SnapshotIdSet::new(), None, None, 0);
612
613        set_current_snapshot(None);
614        assert!(current_snapshot().is_none());
615
616        snapshot.enter(|| {
617            let current = current_snapshot();
618            assert!(current.is_some());
619            assert_eq!(current.unwrap().snapshot_id(), 1);
620        });
621
622        assert!(current_snapshot().is_none());
623    }
624
625    #[test]
626    fn test_mutable_snapshot_read_observer() {
627        let _guard = reset_runtime();
628        use std::sync::{Arc as StdArc, Mutex};
629
630        let read_count = StdArc::new(Mutex::new(0));
631        let read_count_clone = read_count.clone();
632
633        let observer = Arc::new(move |_: &dyn StateObject| {
634            *read_count_clone.lock().unwrap() += 1;
635        });
636
637        let snapshot = MutableSnapshot::new(1, SnapshotIdSet::new(), Some(observer), None, 0);
638        let mock_state = MockStateObject {
639            value: Cell::new(42),
640        };
641
642        snapshot.record_read(&mock_state);
643        snapshot.record_read(&mock_state);
644
645        assert_eq!(*read_count.lock().unwrap(), 2);
646    }
647
648    #[test]
649    fn test_mutable_snapshot_write_observer() {
650        let _guard = reset_runtime();
651        use std::sync::{Arc as StdArc, Mutex};
652
653        let write_count = StdArc::new(Mutex::new(0));
654        let write_count_clone = write_count.clone();
655
656        let observer = Arc::new(move |_: &dyn StateObject| {
657            *write_count_clone.lock().unwrap() += 1;
658        });
659
660        let snapshot = MutableSnapshot::new(1, SnapshotIdSet::new(), None, Some(observer), 0);
661        let mock_state = Arc::new(MockStateObject {
662            value: Cell::new(42),
663        });
664
665        snapshot.record_write(mock_state.clone());
666        snapshot.record_write(mock_state.clone()); // Second write should not call observer
667
668        // Note: Current implementation calls observer on every write
669        // In full implementation, it would only call on first write
670        assert!(*write_count.lock().unwrap() >= 1);
671    }
672
673    #[test]
674    fn test_mutable_snapshot_apply_empty() {
675        let _guard = reset_runtime();
676        let snapshot = MutableSnapshot::new(1, SnapshotIdSet::new(), None, None, 0);
677        let result = snapshot.apply();
678        assert!(result.is_success());
679        assert!(snapshot.applied.get());
680    }
681
682    #[test]
683    fn test_mutable_snapshot_apply_twice_fails() {
684        let _guard = reset_runtime();
685        let snapshot = MutableSnapshot::new(1, SnapshotIdSet::new(), None, None, 0);
686        snapshot.apply().check();
687
688        let result = snapshot.apply();
689        assert!(result.is_failure());
690    }
691
692    #[test]
693    fn test_mutable_snapshot_nested_readonly() {
694        let _guard = reset_runtime();
695        let parent = MutableSnapshot::new(1, SnapshotIdSet::new(), None, None, 0);
696        let nested = parent.take_nested_snapshot(None);
697
698        assert_eq!(nested.snapshot_id(), 1);
699        assert!(nested.read_only());
700        assert_eq!(parent.nested_count.get(), 1);
701    }
702
703    #[test]
704    fn test_mutable_snapshot_nested_mutable() {
705        let _guard = reset_runtime();
706        let parent = MutableSnapshot::new(1, SnapshotIdSet::new(), None, None, 0);
707        let nested = parent.take_nested_mutable_snapshot(None, None);
708
709        assert!(nested.snapshot_id() > parent.snapshot_id());
710        assert!(!nested.read_only());
711        assert_eq!(parent.nested_count.get(), 1);
712    }
713
714    #[test]
715    fn test_mutable_snapshot_nested_mutable_dispose_clears_invalid() {
716        let _guard = reset_runtime();
717        let parent = MutableSnapshot::new(1, SnapshotIdSet::new(), None, None, 0);
718        let nested = parent.take_nested_mutable_snapshot(None, None);
719
720        let child_id = nested.snapshot_id();
721        assert!(parent.state.invalid.borrow().get(child_id));
722
723        nested.dispose();
724
725        assert_eq!(parent.nested_count.get(), 0);
726        assert!(!parent.state.invalid.borrow().get(child_id));
727    }
728
729    #[test]
730    fn test_mutable_snapshot_nested_dispose() {
731        let _guard = reset_runtime();
732        let parent = MutableSnapshot::new(1, SnapshotIdSet::new(), None, None, 0);
733        let nested = parent.take_nested_snapshot(None);
734
735        assert_eq!(parent.nested_count.get(), 1);
736
737        nested.dispose();
738        assert_eq!(parent.nested_count.get(), 0);
739    }
740
741    #[test]
742    #[should_panic(expected = "Snapshot has already been applied")]
743    fn test_mutable_snapshot_write_after_apply_panics() {
744        let _guard = reset_runtime();
745        let snapshot = MutableSnapshot::new(1, SnapshotIdSet::new(), None, None, 0);
746        snapshot.apply().check();
747
748        let mock_state = Arc::new(MockStateObject {
749            value: Cell::new(42),
750        });
751        snapshot.record_write(mock_state);
752    }
753
754    #[test]
755    #[should_panic(expected = "Snapshot has been disposed")]
756    fn test_mutable_snapshot_write_after_dispose_panics() {
757        let _guard = reset_runtime();
758        let snapshot = MutableSnapshot::new(1, SnapshotIdSet::new(), None, None, 0);
759        snapshot.dispose();
760
761        let mock_state = Arc::new(MockStateObject {
762            value: Cell::new(42),
763        });
764        snapshot.record_write(mock_state);
765    }
766
767    #[test]
768    fn test_mutable_snapshot_dispose() {
769        let _guard = reset_runtime();
770        let snapshot = MutableSnapshot::new(1, SnapshotIdSet::new(), None, None, 0);
771        assert!(!snapshot.is_disposed());
772
773        snapshot.dispose();
774        assert!(snapshot.is_disposed());
775    }
776
777    #[test]
778    fn test_mutable_snapshot_apply_observer() {
779        let _guard = reset_runtime();
780        use std::sync::{Arc as StdArc, Mutex};
781
782        let applied_count = StdArc::new(Mutex::new(0));
783        let applied_count_clone = applied_count.clone();
784
785        let observer = Arc::new(
786            move |_modified: &[Arc<dyn StateObject>], _snapshot_id: SnapshotId| {
787                *applied_count_clone.lock().unwrap() += 1;
788            },
789        );
790
791        let _handle = register_apply_observer(observer);
792
793        let snapshot = MutableSnapshot::new(1, SnapshotIdSet::new(), None, None, 0);
794        let state = new_state(0);
795
796        snapshot.enter(|| state.set(10));
797        snapshot.apply().check();
798
799        assert_eq!(*applied_count.lock().unwrap(), 1);
800    }
801
802    #[test]
803    fn test_mutable_conflict_detection_same_object() {
804        let _guard = reset_runtime();
805        let global = GlobalSnapshot::get_or_create();
806        let state = new_state(0);
807
808        let s1 = global.take_nested_mutable_snapshot(None, None);
809        s1.enter(|| state.set(1));
810
811        let s2 = global.take_nested_mutable_snapshot(None, None);
812        s2.enter(|| state.set(2));
813
814        assert!(s1.apply().is_success());
815        assert!(s2.apply().is_failure());
816    }
817
818    #[test]
819    fn test_mutable_no_conflict_different_objects() {
820        let _guard = reset_runtime();
821        let global = GlobalSnapshot::get_or_create();
822        let state1 = new_state(0);
823        let state2 = new_state(0);
824
825        let s1 = global.take_nested_mutable_snapshot(None, None);
826        s1.enter(|| state1.set(10));
827
828        let s2 = global.take_nested_mutable_snapshot(None, None);
829        s2.enter(|| state2.set(20));
830
831        assert!(s1.apply().is_success());
832        assert!(s2.apply().is_success());
833    }
834}