Skip to main content

cranpose_core/snapshot_v2/
mutable.rs

1//! Mutable snapshot implementation.
2
3use super::*;
4use crate::collections::map::HashMap;
5use crate::state::{StateRecord, PREEXISTING_SNAPSHOT_ID};
6use std::rc::Rc;
7use std::sync::Arc;
8
9pub(super) fn find_record_by_id(
10    head: &Rc<StateRecord>,
11    target: SnapshotId,
12) -> Option<Rc<StateRecord>> {
13    let mut cursor = Some(Rc::clone(head));
14    while let Some(record) = cursor {
15        if !record.is_tombstone() && record.snapshot_id() == target {
16            return Some(record);
17        }
18        cursor = record.next();
19    }
20    None
21}
22
23pub(super) fn find_previous_record(
24    head: &Rc<StateRecord>,
25    base_snapshot_id: SnapshotId,
26) -> (Option<Rc<StateRecord>>, bool) {
27    let mut cursor = Some(Rc::clone(head));
28    let mut best: Option<Rc<StateRecord>> = None;
29    let mut fallback: Option<Rc<StateRecord>> = None;
30    let mut found_base = false;
31
32    while let Some(record) = cursor {
33        if !record.is_tombstone() {
34            fallback = Some(record.clone());
35            if record.snapshot_id() <= base_snapshot_id {
36                found_base = true;
37                let replace = best
38                    .as_ref()
39                    .map(|current| current.snapshot_id() < record.snapshot_id())
40                    .unwrap_or(true);
41                if replace {
42                    best = Some(record.clone());
43                }
44            }
45        }
46        cursor = record.next();
47    }
48
49    (best.or(fallback), found_base)
50}
51
/// One planned step of `MutableSnapshot::apply`.
///
/// `apply` first builds a list of these for every modified object and only then
/// executes them, so each variant carries everything its commit step needs.
enum ApplyOperation {
    /// Promote the record written by `writer_id` via `promote_record(writer_id)`.
    PromoteChild {
        object_id: StateObjectId,
        state: Arc<dyn StateObject>,
        writer_id: SnapshotId,
    },
    /// Promote the already-existing record identified by `source_id`; the snapshot's
    /// own `applied` record is then tombstoned and its value cleared.
    PromoteExisting {
        object_id: StateObjectId,
        state: Arc<dyn StateObject>,
        source_id: SnapshotId,
        applied: Rc<StateRecord>,
    },
    /// Commit the `merged` record via `commit_merged_record`; the snapshot's own
    /// `applied` record is then tombstoned and its value cleared.
    CommitMerged {
        object_id: StateObjectId,
        state: Arc<dyn StateObject>,
        merged: Rc<StateRecord>,
        applied: Rc<StateRecord>,
    },
}
71
/// A mutable snapshot that allows isolated state changes.
///
/// Changes made in a mutable snapshot are isolated from other snapshots
/// until `apply()` is called, at which point they become visible atomically.
/// This is a root mutable snapshot (not nested).
///
/// # Thread Safety
/// Contains `Cell<T>` which is not `Send`/`Sync`. This is safe because snapshots
/// are stored in thread-local storage and never shared across threads. The `Arc`
/// is used for cheap cloning within a single thread, not for cross-thread sharing.
#[allow(clippy::arc_with_non_send_sync)]
pub struct MutableSnapshot {
    /// Shared snapshot bookkeeping: id, invalid set, observers, modified objects,
    /// disposed flag and pending children, as accessed by the methods of this type.
    state: SnapshotState,
    /// The parent's snapshot id at the time this snapshot was created
    base_parent_id: SnapshotId,
    /// Number of active nested snapshots
    nested_count: Cell<usize>,
    /// Whether this snapshot has been applied
    applied: Cell<bool>,
}
92
93impl MutableSnapshot {
94    pub(crate) fn from_parts(
95        id: SnapshotId,
96        invalid: SnapshotIdSet,
97        read_observer: Option<ReadObserver>,
98        write_observer: Option<WriteObserver>,
99        base_parent_id: SnapshotId,
100        runtime_tracked: bool,
101    ) -> Arc<Self> {
102        Arc::new(Self {
103            state: SnapshotState::new(id, invalid, read_observer, write_observer, runtime_tracked),
104            base_parent_id,
105            nested_count: Cell::new(0),
106            applied: Cell::new(false),
107        })
108    }
109
110    /// Create a new root mutable snapshot using the global runtime.
111    pub fn new_root(
112        read_observer: Option<ReadObserver>,
113        write_observer: Option<WriteObserver>,
114    ) -> Arc<Self> {
115        GlobalSnapshot::get_or_create().take_nested_mutable_snapshot(read_observer, write_observer)
116    }
117
118    /// Create a new root mutable snapshot.
119    pub fn new(
120        id: SnapshotId,
121        invalid: SnapshotIdSet,
122        read_observer: Option<ReadObserver>,
123        write_observer: Option<WriteObserver>,
124        base_parent_id: SnapshotId,
125    ) -> Arc<Self> {
126        Self::from_parts(
127            id,
128            invalid,
129            read_observer,
130            write_observer,
131            base_parent_id,
132            false,
133        )
134    }
135
136    fn validate_not_applied(&self) {
137        if self.applied.get() {
138            panic!("Snapshot has already been applied");
139        }
140    }
141
142    fn validate_not_disposed(&self) {
143        if self.state.disposed.get() {
144            panic!("Snapshot has been disposed");
145        }
146    }
147
    /// Returns the id currently stored in this snapshot's state.
    pub fn snapshot_id(&self) -> SnapshotId {
        self.state.id.get()
    }
151
152    pub fn invalid(&self) -> SnapshotIdSet {
153        self.state.invalid.borrow().clone()
154    }
155
    /// Always `false`: this snapshot type records writes (see `record_write`).
    pub fn read_only(&self) -> bool {
        false
    }
159
    /// Forwards `f` to the snapshot state's `set_on_dispose`.
    ///
    /// NOTE(review): presumably `f` runs once when the state is disposed — confirm
    /// against `SnapshotState`.
    pub(crate) fn set_on_dispose<F>(&self, f: F)
    where
        F: FnOnce() + 'static,
    {
        self.state.set_on_dispose(f);
    }
166
167    pub fn root_mutable(self: &Arc<Self>) -> Arc<Self> {
168        self.clone()
169    }
170
171    pub fn enter<T>(self: &Arc<Self>, f: impl FnOnce() -> T) -> T {
172        let previous = current_snapshot();
173        set_current_snapshot(Some(AnySnapshot::Mutable(self.clone())));
174        let result = f();
175        set_current_snapshot(previous);
176        result
177    }
178
179    pub fn take_nested_snapshot(
180        self: &Arc<Self>,
181        read_observer: Option<ReadObserver>,
182    ) -> Arc<ReadonlySnapshot> {
183        self.validate_not_disposed();
184        self.validate_not_applied();
185
186        let merged_observer = merge_read_observers(read_observer, self.state.read_observer.clone());
187
188        // Create a nested read-only snapshot
189        let nested = ReadonlySnapshot::new(
190            self.state.id.get(),
191            self.state.invalid.borrow().clone(),
192            merged_observer,
193        );
194
195        self.nested_count.set(self.nested_count.get() + 1);
196
197        // When the nested snapshot is disposed, decrement this parent's nested_count
198        let parent_weak = Arc::downgrade(self);
199        nested.set_on_dispose(move || {
200            if let Some(parent) = parent_weak.upgrade() {
201                let cur = parent.nested_count.get();
202                if cur > 0 {
203                    parent.nested_count.set(cur - 1);
204                }
205            }
206        });
207        nested
208    }
209
210    pub fn has_pending_changes(&self) -> bool {
211        !self.state.modified.borrow().is_empty()
212    }
213
    /// Returns the child snapshot ids reported by the snapshot state's `pending_children()`.
    ///
    /// Ids are registered in `take_nested_mutable_snapshot` and removed again when
    /// the corresponding child is disposed.
    pub fn pending_children(&self) -> Vec<SnapshotId> {
        self.state.pending_children()
    }
217
    /// Returns the snapshot state's `has_pending_children()` result.
    ///
    /// NOTE(review): presumably `true` while `pending_children()` is non-empty — confirm
    /// against `SnapshotState`.
    pub fn has_pending_children(&self) -> bool {
        self.state.has_pending_children()
    }
221
222    pub fn dispose(&self) {
223        if !self.state.disposed.get() && self.nested_count.get() == 0 {
224            self.state.dispose();
225        }
226    }
227
    /// Forwards a read of `state` to the snapshot state's `record_read`.
    ///
    /// Unlike `record_write`, this performs no applied/disposed validation.
    pub fn record_read(&self, state: &dyn StateObject) {
        self.state.record_read(state);
    }
231
232    pub fn record_write(&self, state: Arc<dyn StateObject>) {
233        self.validate_not_applied();
234        self.validate_not_disposed();
235        self.state.record_write(state, self.state.id.get());
236    }
237
    /// Placeholder hook: currently has no effect.
    ///
    /// The applied/disposed check guards an empty body, so calling this changes nothing.
    pub fn notify_objects_initialized(&self) {
        if !self.applied.get() && !self.state.disposed.get() {
            // Mark that objects are initialized
            // In a full implementation, this would update internal state
        }
    }
244
    /// Marks the snapshot as disposed by setting the flag directly.
    ///
    /// Unlike `dispose`, this neither checks for outstanding nested snapshots nor
    /// calls `self.state.dispose()`.
    /// NOTE(review): any work `SnapshotState::dispose` does (for example running an
    /// on-dispose callback) is presumably skipped here — confirm this is intended.
    pub fn close(&self) {
        self.state.disposed.set(true);
    }
248
    /// Returns the snapshot state's disposed flag.
    pub fn is_disposed(&self) -> bool {
        self.state.disposed.get()
    }
252
253    pub fn apply(&self) -> SnapshotApplyResult {
254        // Check disposed state first - return Failure instead of panicking
255        if self.state.disposed.get() {
256            return SnapshotApplyResult::Failure;
257        }
258
259        if self.applied.get() {
260            return SnapshotApplyResult::Failure;
261        }
262
263        let modified = self.state.modified.borrow();
264        if modified.is_empty() {
265            // No changes to apply
266            self.applied.set(true);
267            self.state.dispose();
268            return SnapshotApplyResult::Success;
269        }
270
271        let this_id = self.state.id.get();
272        let mut modified_objects: Vec<(StateObjectId, Arc<dyn StateObject>, SnapshotId)> =
273            Vec::with_capacity(modified.len());
274        for (&obj_id, (obj, writer_id)) in modified.iter() {
275            modified_objects.push((obj_id, obj.clone(), *writer_id));
276        }
277
278        drop(modified);
279
280        let parent_snapshot = GlobalSnapshot::get_or_create();
281        let parent_snapshot_id = parent_snapshot.snapshot_id();
282        let parent_invalid = parent_snapshot.invalid();
283        drop(parent_snapshot);
284
285        let next_invalid = super::runtime::open_snapshots().clear(parent_snapshot_id);
286        let optimistic = super::optimistic_merges(
287            parent_snapshot_id,
288            self.base_parent_id,
289            &modified_objects,
290            &next_invalid,
291        );
292
293        let mut operations: Vec<ApplyOperation> = Vec::with_capacity(modified_objects.len());
294
295        for (obj_id, state, writer_id) in &modified_objects {
296            let head = state.first_record();
297            let applied = match find_record_by_id(&head, *writer_id) {
298                Some(record) => record,
299                None => return SnapshotApplyResult::Failure,
300            };
301
302            let current =
303                crate::state::readable_record_for(&head, parent_snapshot_id, &next_invalid)
304                    .unwrap_or_else(|| state.readable_record(parent_snapshot_id, &parent_invalid));
305            let (previous_opt, found_base) = find_previous_record(&head, self.base_parent_id);
306            let Some(previous) = previous_opt else {
307                return SnapshotApplyResult::Failure;
308            };
309
310            if !found_base || previous.snapshot_id() == PREEXISTING_SNAPSHOT_ID {
311                operations.push(ApplyOperation::PromoteChild {
312                    object_id: *obj_id,
313                    state: state.clone(),
314                    writer_id: *writer_id,
315                });
316                continue;
317            }
318
319            if Rc::ptr_eq(&current, &previous) {
320                operations.push(ApplyOperation::PromoteChild {
321                    object_id: *obj_id,
322                    state: state.clone(),
323                    writer_id: *writer_id,
324                });
325                continue;
326            }
327
328            let merged = if let Some(candidate) = optimistic
329                .as_ref()
330                .and_then(|map| map.get(&(Rc::as_ptr(&current) as usize)))
331                .cloned()
332            {
333                candidate
334            } else {
335                match state.merge_records(
336                    Rc::clone(&previous),
337                    Rc::clone(&current),
338                    Rc::clone(&applied),
339                ) {
340                    Some(record) => record,
341                    None => return SnapshotApplyResult::Failure,
342                }
343            };
344
345            if Rc::ptr_eq(&merged, &applied) {
346                operations.push(ApplyOperation::PromoteChild {
347                    object_id: *obj_id,
348                    state: state.clone(),
349                    writer_id: *writer_id,
350                });
351            } else if Rc::ptr_eq(&merged, &current) {
352                operations.push(ApplyOperation::PromoteExisting {
353                    object_id: *obj_id,
354                    state: state.clone(),
355                    source_id: current.snapshot_id(),
356                    applied: applied.clone(),
357                });
358            } else {
359                operations.push(ApplyOperation::CommitMerged {
360                    object_id: *obj_id,
361                    state: state.clone(),
362                    merged: merged.clone(),
363                    applied: applied.clone(),
364                });
365            }
366        }
367
368        let mut applied_info: Vec<(StateObjectId, Arc<dyn StateObject>, SnapshotId)> =
369            Vec::with_capacity(operations.len());
370
371        for operation in operations {
372            match operation {
373                ApplyOperation::PromoteChild {
374                    object_id,
375                    state,
376                    writer_id,
377                } => {
378                    if state.promote_record(writer_id).is_err() {
379                        return SnapshotApplyResult::Failure;
380                    }
381                    let new_head_id = state.first_record().snapshot_id();
382                    applied_info.push((object_id, state, new_head_id));
383                }
384                ApplyOperation::PromoteExisting {
385                    object_id,
386                    state,
387                    source_id,
388                    applied,
389                } => {
390                    if state.promote_record(source_id).is_err() {
391                        return SnapshotApplyResult::Failure;
392                    }
393                    applied.set_tombstone(true);
394                    applied.clear_value();
395                    let new_head_id = state.first_record().snapshot_id();
396                    applied_info.push((object_id, state, new_head_id));
397                }
398                ApplyOperation::CommitMerged {
399                    object_id,
400                    state,
401                    merged,
402                    applied,
403                } => {
404                    let Ok(new_head_id) = state.commit_merged_record(merged) else {
405                        return SnapshotApplyResult::Failure;
406                    };
407                    applied.set_tombstone(true);
408                    applied.clear_value();
409                    applied_info.push((object_id, state, new_head_id));
410                }
411            }
412        }
413
414        for (obj_id, _, head_id) in &applied_info {
415            super::set_last_write(*obj_id, *head_id);
416        }
417
418        self.applied.set(true);
419        self.state.dispose();
420
421        // Track modified states for future cleanup instead of cleaning immediately.
422        // This defers the O(record_chain) work to check_and_overwrite_unused_records_locked,
423        // which runs periodically (e.g., on global snapshot advance) rather than on every apply.
424        // This prevents performance regression during rapid scrolling while still preventing leaks.
425        for (_, state, _) in &applied_info {
426            super::EXTRA_STATE_OBJECTS.with(|cell| {
427                cell.borrow_mut().add_trait_object(state);
428            });
429        }
430
431        let observer_states: Vec<Arc<dyn StateObject>> = applied_info
432            .iter()
433            .map(|(_, state, _)| state.clone())
434            .collect();
435        super::notify_apply_observers(&observer_states, this_id);
436        SnapshotApplyResult::Success
437    }
438
439    pub fn take_nested_mutable_snapshot(
440        self: &Arc<Self>,
441        read_observer: Option<ReadObserver>,
442        write_observer: Option<WriteObserver>,
443    ) -> Arc<NestedMutableSnapshot> {
444        self.validate_not_disposed();
445        self.validate_not_applied();
446
447        let merged_read = merge_read_observers(read_observer, self.state.read_observer.clone());
448        let merged_write = merge_write_observers(write_observer, self.state.write_observer.clone());
449
450        let (new_id, runtime_invalid) = allocate_snapshot();
451
452        // Merge runtime invalid data with the parent's invalid set and ensure the parent
453        // also tracks the child snapshot id.
454        let mut parent_invalid = self.state.invalid.borrow().clone();
455        parent_invalid = parent_invalid.set(new_id);
456        self.state.invalid.replace(parent_invalid.clone());
457        let invalid = parent_invalid.or(&runtime_invalid);
458
459        let self_weak = Arc::downgrade(self);
460        let nested = NestedMutableSnapshot::new(
461            new_id,
462            invalid,
463            merged_read,
464            merged_write,
465            self_weak,
466            self.state.id.get(), // base_parent_id for child is this snapshot's id
467        );
468
469        self.nested_count.set(self.nested_count.get() + 1);
470        self.state.add_pending_child(new_id);
471
472        let parent_weak = Arc::downgrade(self);
473        nested.set_on_dispose({
474            let child_id = new_id;
475            move || {
476                if let Some(parent) = parent_weak.upgrade() {
477                    if parent.nested_count.get() > 0 {
478                        parent
479                            .nested_count
480                            .set(parent.nested_count.get().saturating_sub(1));
481                    }
482                    let mut invalid = parent.state.invalid.borrow_mut();
483                    let new_set = invalid.clone().clear(child_id);
484                    *invalid = new_set;
485                    parent.state.remove_pending_child(child_id);
486                }
487            }
488        });
489
490        nested
491    }
492
493    /// Merge a child's modified set into this snapshot's modified set.
494    ///
495    /// Returns Ok(()) on success, or Err(()) if a conflict is detected
496    /// (i.e., this snapshot already has a modification for the same object).
497    pub(crate) fn merge_child_modifications(
498        &self,
499        child_modified: &HashMap<StateObjectId, (Arc<dyn StateObject>, SnapshotId)>,
500    ) -> Result<(), ()> {
501        // Check for conflicts
502        {
503            let parent_mod = self.state.modified.borrow();
504            for key in child_modified.keys() {
505                if parent_mod.contains_key(key) {
506                    return Err(());
507                }
508            }
509        }
510
511        // Merge entries
512        let mut parent_mod = self.state.modified.borrow_mut();
513        for (key, value) in child_modified.iter() {
514            parent_mod.entry(*key).or_insert_with(|| value.clone());
515        }
516        Ok(())
517    }
518}
519
#[cfg(test)]
impl MutableSnapshot {
    /// Test-only: returns a copy of the modified set as `(object id, object, writer id)` tuples.
    pub(crate) fn debug_modified_objects(
        &self,
    ) -> Vec<(StateObjectId, Arc<dyn StateObject>, SnapshotId)> {
        let modified = self.state.modified.borrow();
        let mut entries = Vec::with_capacity(modified.len());
        for (&obj_id, (state, writer_id)) in modified.iter() {
            entries.push((obj_id, state.clone(), *writer_id));
        }
        entries
    }

    /// Test-only: returns the parent snapshot id captured when this snapshot was created.
    pub(crate) fn debug_base_parent_id(&self) -> SnapshotId {
        self.base_parent_id
    }
}
536
#[cfg(test)]
mod tests {
    use super::*;
    use crate::snapshot_v2::runtime::TestRuntimeGuard;
    use crate::state::{NeverEqual, SnapshotMutableState, StateObject};
    use std::cell::Cell;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;

    /// Resets the snapshot runtime; the returned guard is held for the duration of a test.
    fn reset_runtime() -> TestRuntimeGuard {
        reset_runtime_for_tests()
    }

    /// Builds a real state object whose equality policy is `NeverEqual`.
    fn new_state(initial: i32) -> Arc<SnapshotMutableState<i32>> {
        SnapshotMutableState::new_in_arc(initial, Arc::new(NeverEqual))
    }

    /// Builds the observer-less snapshot most tests use: id 1, empty invalid set, base id 0.
    fn root_snapshot() -> Arc<MutableSnapshot> {
        MutableSnapshot::new(1, SnapshotIdSet::new(), None, None, 0)
    }

    /// Builds a mock state object wrapped in an `Arc`.
    fn mock_state() -> Arc<MockStateObject> {
        Arc::new(MockStateObject {
            value: Cell::new(42),
        })
    }

    // Mock StateObject for testing: only identity matters, record access is never exercised.
    #[allow(dead_code)]
    struct MockStateObject {
        value: Cell<i32>,
    }

    impl StateObject for MockStateObject {
        fn object_id(&self) -> crate::state::ObjectId {
            crate::state::ObjectId(0)
        }

        fn first_record(&self) -> Rc<crate::state::StateRecord> {
            unimplemented!("Not needed for tests")
        }

        fn readable_record(
            &self,
            _snapshot_id: crate::snapshot_id_set::SnapshotId,
            _invalid: &SnapshotIdSet,
        ) -> Rc<crate::state::StateRecord> {
            unimplemented!("Not needed for tests")
        }

        fn prepend_state_record(&self, _record: Rc<crate::state::StateRecord>) {
            unimplemented!("Not needed for tests")
        }

        fn promote_record(
            &self,
            _child_id: crate::snapshot_id_set::SnapshotId,
        ) -> Result<(), &'static str> {
            unimplemented!("Not needed for tests")
        }

        fn as_any(&self) -> &dyn std::any::Any {
            self
        }
    }

    #[test]
    fn test_mutable_snapshot_creation() {
        let _guard = reset_runtime();
        let snapshot = root_snapshot();

        assert_eq!(snapshot.snapshot_id(), 1);
        assert!(!snapshot.read_only());
        assert!(!snapshot.is_disposed());
        assert!(!snapshot.applied.get());
    }

    #[test]
    fn test_mutable_snapshot_no_pending_changes_initially() {
        let _guard = reset_runtime();
        let snapshot = root_snapshot();

        assert!(!snapshot.has_pending_changes());
    }

    #[test]
    fn test_mutable_snapshot_enter() {
        let _guard = reset_runtime();
        let snapshot = root_snapshot();

        set_current_snapshot(None);
        assert!(current_snapshot().is_none());

        snapshot.enter(|| {
            let active = current_snapshot().expect("a snapshot should be current inside enter");
            assert_eq!(active.snapshot_id(), 1);
        });

        // The previous (absent) current snapshot is restored afterwards.
        assert!(current_snapshot().is_none());
    }

    #[test]
    fn test_mutable_snapshot_read_observer() {
        let _guard = reset_runtime();

        let reads = Arc::new(AtomicUsize::new(0));
        let reads_in_observer = Arc::clone(&reads);
        let observer = Arc::new(move |_: &dyn StateObject| {
            reads_in_observer.fetch_add(1, Ordering::SeqCst);
        });

        let snapshot = MutableSnapshot::new(1, SnapshotIdSet::new(), Some(observer), None, 0);
        let state = MockStateObject {
            value: Cell::new(42),
        };

        for _ in 0..2 {
            snapshot.record_read(&state);
        }

        assert_eq!(reads.load(Ordering::SeqCst), 2);
    }

    #[test]
    fn test_mutable_snapshot_write_observer() {
        let _guard = reset_runtime();

        let writes = Arc::new(AtomicUsize::new(0));
        let writes_in_observer = Arc::clone(&writes);
        let observer = Arc::new(move |_: &dyn StateObject| {
            writes_in_observer.fetch_add(1, Ordering::SeqCst);
        });

        let snapshot = MutableSnapshot::new(1, SnapshotIdSet::new(), None, Some(observer), 0);
        let state = mock_state();

        snapshot.record_write(state.clone());
        snapshot.record_write(state.clone());

        // The current implementation may notify on every write rather than only on the
        // first one, so the test only requires at least one notification.
        assert!(writes.load(Ordering::SeqCst) >= 1);
    }

    #[test]
    fn test_mutable_snapshot_apply_empty() {
        let _guard = reset_runtime();
        let snapshot = root_snapshot();

        assert!(snapshot.apply().is_success());
        assert!(snapshot.applied.get());
    }

    #[test]
    fn test_mutable_snapshot_apply_twice_fails() {
        let _guard = reset_runtime();
        let snapshot = root_snapshot();
        snapshot.apply().check();

        assert!(snapshot.apply().is_failure());
    }

    #[test]
    fn test_mutable_snapshot_nested_readonly() {
        let _guard = reset_runtime();
        let parent = root_snapshot();
        let nested = parent.take_nested_snapshot(None);

        assert_eq!(nested.snapshot_id(), 1);
        assert!(nested.read_only());
        assert_eq!(parent.nested_count.get(), 1);
    }

    #[test]
    fn test_mutable_snapshot_nested_mutable() {
        let _guard = reset_runtime();
        let parent = root_snapshot();
        let nested = parent.take_nested_mutable_snapshot(None, None);

        assert!(nested.snapshot_id() > parent.snapshot_id());
        assert!(!nested.read_only());
        assert_eq!(parent.nested_count.get(), 1);
    }

    #[test]
    fn test_mutable_snapshot_nested_mutable_dispose_clears_invalid() {
        let _guard = reset_runtime();
        let parent = root_snapshot();
        let nested = parent.take_nested_mutable_snapshot(None, None);

        let child_id = nested.snapshot_id();
        assert!(parent.state.invalid.borrow().get(child_id));

        nested.dispose();

        assert_eq!(parent.nested_count.get(), 0);
        assert!(!parent.state.invalid.borrow().get(child_id));
    }

    #[test]
    fn test_mutable_snapshot_nested_dispose() {
        let _guard = reset_runtime();
        let parent = root_snapshot();
        let nested = parent.take_nested_snapshot(None);
        assert_eq!(parent.nested_count.get(), 1);

        nested.dispose();

        assert_eq!(parent.nested_count.get(), 0);
    }

    #[test]
    #[should_panic(expected = "Snapshot has already been applied")]
    fn test_mutable_snapshot_write_after_apply_panics() {
        let _guard = reset_runtime();
        let snapshot = root_snapshot();
        snapshot.apply().check();

        snapshot.record_write(mock_state());
    }

    #[test]
    #[should_panic(expected = "Snapshot has been disposed")]
    fn test_mutable_snapshot_write_after_dispose_panics() {
        let _guard = reset_runtime();
        let snapshot = root_snapshot();
        snapshot.dispose();

        snapshot.record_write(mock_state());
    }

    #[test]
    fn test_mutable_snapshot_dispose() {
        let _guard = reset_runtime();
        let snapshot = root_snapshot();
        assert!(!snapshot.is_disposed());

        snapshot.dispose();

        assert!(snapshot.is_disposed());
    }

    #[test]
    fn test_mutable_snapshot_apply_observer() {
        let _guard = reset_runtime();

        let notifications = Arc::new(AtomicUsize::new(0));
        let notifications_in_observer = Arc::clone(&notifications);
        let observer = Arc::new(
            move |_modified: &[Arc<dyn StateObject>], _snapshot_id: SnapshotId| {
                notifications_in_observer.fetch_add(1, Ordering::SeqCst);
            },
        );

        let _handle = register_apply_observer(observer);

        let snapshot = root_snapshot();
        let state = new_state(0);

        snapshot.enter(|| state.set(10));
        snapshot.apply().check();

        assert_eq!(notifications.load(Ordering::SeqCst), 1);
    }

    #[test]
    fn test_mutable_conflict_detection_same_object() {
        let _guard = reset_runtime();
        let global = GlobalSnapshot::get_or_create();
        let state = new_state(0);

        // Two sibling snapshots write the same object; only the first apply may win.
        let first = global.take_nested_mutable_snapshot(None, None);
        first.enter(|| state.set(1));

        let second = global.take_nested_mutable_snapshot(None, None);
        second.enter(|| state.set(2));

        assert!(first.apply().is_success());
        assert!(second.apply().is_failure());
    }

    #[test]
    fn test_mutable_no_conflict_different_objects() {
        let _guard = reset_runtime();
        let global = GlobalSnapshot::get_or_create();
        let state_a = new_state(0);
        let state_b = new_state(0);

        // Sibling snapshots touching different objects do not conflict.
        let first = global.take_nested_mutable_snapshot(None, None);
        first.enter(|| state_a.set(10));

        let second = global.take_nested_mutable_snapshot(None, None);
        second.enter(|| state_b.set(20));

        assert!(first.apply().is_success());
        assert!(second.apply().is_success());
    }
}
835}