#![allow(clippy::arc_with_non_send_sync)]
use crate::collections::map::HashSet;
use std::any::Any;
use std::cell::{Cell, RefCell};
use std::sync::{Arc, Mutex, RwLock, Weak};
use crate::snapshot_id_set::{SnapshotId, SnapshotIdSet};
use crate::snapshot_pinning::lowest_pinned_snapshot;
use crate::snapshot_v2::{
advance_global_snapshot, allocate_record_id, current_snapshot, AnySnapshot, GlobalSnapshot,
};
/// Id carried by the tail record that `SnapshotMutableState::new_in_arc` creates next to
/// the initial head. `used_locked` skips records with this id and the global write path
/// of `SnapshotMutableState::set` never tombstones them.
pub(crate) const PREEXISTING_SNAPSHOT_ID: SnapshotId = 1;
/// Id marking a record that belongs to no snapshot: `record_is_valid_for` never treats
/// such a record as readable and `used_locked` hands it out for reuse immediately.
const INVALID_SNAPSHOT_ID: SnapshotId = 0;
/// Placeholder id stamped on a record by `new_overwritable_record_locked`; the caller is
/// expected to replace it with the real snapshot id afterwards.
const SNAPSHOT_ID_MAX: SnapshotId = usize::MAX;
/// Opaque identity of a state object, derived from the address of its `Arc` allocation.
///
/// The default value wraps `0`.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Hash)]
pub struct ObjectId(pub(crate) usize);

impl ObjectId {
    /// Builds an id from the address of the value owned by `object`.
    ///
    /// Pointer metadata (slice length, vtable) is discarded first, so only the data
    /// address contributes to the id.
    pub(crate) fn new<T: ?Sized + 'static>(object: &Arc<T>) -> Self {
        let data_ptr: *const () = Arc::as_ptr(object).cast();
        Self(data_ptr as usize)
    }

    /// Returns the raw address stored in this id.
    #[inline]
    pub(crate) fn as_usize(self) -> usize {
        let Self(raw) = self;
        raw
    }
}
/// One node of a state object's singly linked record chain.
///
/// A record carries the id of the snapshot that wrote it, a tombstone flag, a link to the
/// next record in the chain and a type-erased value slot. The slot is `None` once the
/// value has been cleared.
#[allow(clippy::arc_with_non_send_sync)]
pub struct StateRecord {
    snapshot_id: Cell<SnapshotId>,
    tombstone: Cell<bool>,
    next: Cell<Option<Arc<StateRecord>>>,
    value: RwLock<Option<Box<dyn Any>>>,
}

impl StateRecord {
    /// Allocates a non-tombstoned record holding `value`, tagged with `snapshot_id` and
    /// linked to `next`.
    pub(crate) fn new<T: Any>(
        snapshot_id: SnapshotId,
        value: T,
        next: Option<Arc<StateRecord>>,
    ) -> Arc<Self> {
        let erased: Box<dyn Any> = Box::new(value);
        let record = Self {
            snapshot_id: Cell::new(snapshot_id),
            tombstone: Cell::new(false),
            next: Cell::new(next),
            value: RwLock::new(Some(erased)),
        };
        Arc::new(record)
    }

    /// Returns the id of the snapshot this record is tagged with.
    #[inline]
    pub(crate) fn snapshot_id(&self) -> SnapshotId {
        self.snapshot_id.get()
    }

    /// Re-tags this record with `id`.
    #[inline]
    pub(crate) fn set_snapshot_id(&self, id: SnapshotId) {
        self.snapshot_id.set(id);
    }

    /// Returns a new handle to the following record, if there is one.
    #[inline]
    pub(crate) fn next(&self) -> Option<Arc<StateRecord>> {
        // A `Cell` cannot lend out its contents: move the link out, copy the handle and
        // put the link straight back.
        let link = self.next.take();
        let handle = link.clone();
        self.next.set(link);
        handle
    }

    /// Replaces the link to the following record.
    #[inline]
    pub(crate) fn set_next(&self, next: Option<Arc<StateRecord>>) {
        self.next.set(next);
    }

    /// Returns whether the tombstone flag is set.
    #[inline]
    pub(crate) fn is_tombstone(&self) -> bool {
        self.tombstone.get()
    }

    /// Sets or clears the tombstone flag.
    #[inline]
    pub(crate) fn set_tombstone(&self, tombstone: bool) {
        self.tombstone.set(tombstone);
    }

    /// Drops the stored value, leaving the slot empty.
    ///
    /// # Panics
    /// Panics if the value lock is poisoned.
    pub(crate) fn clear_value(&self) {
        let mut slot = self.value.write().expect("StateRecord lock poisoned");
        *slot = None;
    }

    /// Stores `new_value`, dropping whatever the slot held before.
    ///
    /// # Panics
    /// Panics if the value lock is poisoned.
    pub(crate) fn replace_value<T: Any>(&self, new_value: T) {
        let erased: Box<dyn Any> = Box::new(new_value);
        let mut slot = self.value.write().expect("StateRecord lock poisoned");
        *slot = Some(erased);
    }

    /// Runs `f` on the stored value viewed as a `T` and returns its result.
    ///
    /// The value lock is held for reading while `f` runs.
    ///
    /// # Panics
    /// Panics if the value lock is poisoned, if the slot is empty, or if the stored value
    /// is not a `T`.
    pub(crate) fn with_value<T: Any, R>(&self, f: impl FnOnce(&T) -> R) -> R {
        let slot = self.value.read().expect("StateRecord lock poisoned");
        let typed = slot
            .as_deref()
            .and_then(|erased| erased.downcast_ref::<T>())
            .expect("StateRecord value missing or wrong type");
        f(typed)
    }

    /// Empties the value slot; currently identical to [`StateRecord::clear_value`].
    #[allow(dead_code)]
    pub(crate) fn clear_for_reuse(&self) {
        self.clear_value();
    }

    /// Copies the `T` stored in `source` into this record.
    ///
    /// The clone is taken before this record's slot is locked for writing, so `source`
    /// may be the record itself.
    ///
    /// # Panics
    /// Panics under the same conditions as [`StateRecord::with_value`] on `source`.
    pub(crate) fn assign_value<T: Any + Clone>(&self, source: &StateRecord) {
        let copy = source.with_value(|original: &T| original.clone());
        self.replace_value(copy);
    }
}
#[inline]
fn record_is_valid_for(
record: &Arc<StateRecord>,
snapshot_id: SnapshotId,
invalid: &SnapshotIdSet,
) -> bool {
if record.is_tombstone() {
return false;
}
let candidate = record.snapshot_id();
if candidate == INVALID_SNAPSHOT_ID || candidate > snapshot_id {
return false;
}
candidate == snapshot_id || !invalid.get(candidate)
}
/// Walks the chain starting at `head` and returns the valid record with the highest
/// snapshot id for the snapshot `snapshot_id`, or `None` when no record is valid.
///
/// Validity is decided by `record_is_valid_for`. When several valid records share the
/// highest id, the one closest to `head` wins.
pub(crate) fn readable_record_for(
    head: &Arc<StateRecord>,
    snapshot_id: SnapshotId,
    invalid: &SnapshotIdSet,
) -> Option<Arc<StateRecord>> {
    std::iter::successors(Some(Arc::clone(head)), |node| node.next())
        .filter(|node| record_is_valid_for(node, snapshot_id, invalid))
        .fold(None, |best: Option<Arc<StateRecord>>, candidate| match best {
            // Only a strictly newer candidate displaces the current best.
            Some(current) if current.snapshot_id() >= candidate.snapshot_id() => Some(current),
            _ => Some(candidate),
        })
}
/// Returns the first record in the chain that satisfies `predicate`; if none does,
/// returns the record with the greatest snapshot id (the one closest to `head` on ties).
fn find_youngest_or<F>(head: &Arc<StateRecord>, predicate: F) -> Arc<StateRecord>
where
    F: Fn(&Arc<StateRecord>) -> bool,
{
    let mut youngest = Arc::clone(head);
    let mut cursor = Some(Arc::clone(head));
    loop {
        let Some(node) = cursor.take() else {
            return youngest;
        };
        if predicate(&node) {
            return node;
        }
        cursor = node.next();
        if node.snapshot_id() > youngest.snapshot_id() {
            youngest = node;
        }
    }
}
pub(crate) fn used_locked(head: &Arc<StateRecord>) -> Option<Arc<StateRecord>> {
let mut current = Some(Arc::clone(head));
let mut valid_record: Option<Arc<StateRecord>> = None;
let reuse_limit = lowest_pinned_snapshot()
.map(|lowest| lowest.saturating_sub(1))
.unwrap_or_else(|| allocate_record_id().saturating_sub(1));
let invalid = SnapshotIdSet::EMPTY;
while let Some(record) = current {
let current_id = record.snapshot_id();
if current_id == PREEXISTING_SNAPSHOT_ID {
current = record.next();
continue;
}
if current_id == INVALID_SNAPSHOT_ID {
return Some(record);
}
if record_is_valid_for(&record, reuse_limit, &invalid) {
if let Some(ref existing) = valid_record {
return Some(if current_id < existing.snapshot_id() {
record
} else {
Arc::clone(existing)
});
} else {
valid_record = Some(record.clone());
}
}
current = record.next();
}
None
}
/// Returns a record of `state` that the caller may overwrite, tagged `SNAPSHOT_ID_MAX`.
///
/// A record found by `used_locked` is recycled when available. Otherwise a new record
/// holding `()` is created and handed to `state.prepend_state_record`.
pub(crate) fn new_overwritable_record_locked(state: &dyn StateObject) -> Arc<StateRecord> {
    let chain_head = state.first_record();
    match used_locked(&chain_head) {
        Some(recycled) => {
            recycled.set_snapshot_id(SNAPSHOT_ID_MAX);
            recycled
        }
        None => {
            let fresh = StateRecord::new(SNAPSHOT_ID_MAX, (), None);
            state.prepend_state_record(Arc::clone(&fresh));
            fresh
        }
    }
}
/// Invalidates records of `state` that no snapshot can select any more.
///
/// The reuse limit is the lowest pinned snapshot id or, when nothing is pinned, the value
/// of `peek_next_snapshot_id`. Among the records below the limit only the one with the
/// highest id is kept. Every other one is tagged `INVALID_SNAPSHOT_ID` and has its value
/// overwritten with a clone of the value of a record chosen by `find_youngest_or` (the
/// first record at or above the limit, else the youngest record in the chain).
///
/// Returns `true` when more than one record is still retained afterwards.
///
/// # Panics
/// Panics if the record chosen as the value source does not hold a `T`.
pub(crate) fn overwrite_unused_records_locked<T: Any + Clone>(state: &dyn StateObject) -> bool {
    let head = state.first_record();
    let reuse_limit =
        lowest_pinned_snapshot().unwrap_or_else(crate::snapshot_v2::peek_next_snapshot_id);

    // Highest record seen so far below the limit; it must survive.
    let mut keeper: Option<Arc<StateRecord>> = None;
    // Record whose value is copied into invalidated records; looked up lazily, once.
    let mut donor: Option<Arc<StateRecord>> = None;
    let mut retained = 0;

    let mut cursor = Some(Arc::clone(&head));
    while let Some(node) = cursor {
        let id = node.snapshot_id();
        if id == INVALID_SNAPSHOT_ID {
            // Already invalidated; nothing to do.
        } else if id >= reuse_limit {
            retained += 1;
        } else {
            match keeper.take() {
                None => {
                    keeper = Some(Arc::clone(&node));
                    retained += 1;
                }
                Some(previous) => {
                    // Keep whichever of the two is newer and invalidate the other.
                    let victim = if id < previous.snapshot_id() {
                        keeper = Some(previous);
                        Arc::clone(&node)
                    } else {
                        keeper = Some(Arc::clone(&node));
                        previous
                    };
                    let source = Arc::clone(donor.get_or_insert_with(|| {
                        find_youngest_or(&head, |r| r.snapshot_id() >= reuse_limit)
                    }));
                    victim.set_snapshot_id(INVALID_SNAPSHOT_ID);
                    victim.assign_value::<T>(&source);
                }
            }
        }
        cursor = node.next();
    }
    retained > 1
}
/// Returns the snapshot reported by `current_snapshot()`, falling back to the global
/// snapshot (created on demand) when there is none.
fn active_snapshot() -> AnySnapshot {
    match current_snapshot() {
        Some(snapshot) => snapshot,
        None => AnySnapshot::Global(GlobalSnapshot::get_or_create()),
    }
}
/// Strategy used by a state object to compare values and, optionally, to merge them.
pub(crate) trait MutationPolicy<T>: Send + Sync {
/// Returns `true` when `a` and `b` should be treated as the same value.
fn equivalent(&self, a: &T, b: &T) -> bool;
/// Attempts to combine three versions of a value into one.
///
/// The default implementation never merges and returns `None`.
fn merge(&self, _previous: &T, _current: &T, _applied: &T) -> Option<T> {
None
}
}
/// Policy under which no two values are ever equivalent; merging uses the trait default.
pub(crate) struct NeverEqual;
impl<T> MutationPolicy<T> for NeverEqual {
/// Always returns `false`, regardless of the arguments.
fn equivalent(&self, _a: &T, _b: &T) -> bool {
false
}
}
/// Type-erased interface to a state object whose values live in a chain of
/// [`StateRecord`]s.
pub trait StateObject: Any {
/// Returns the identity of this state object.
fn object_id(&self) -> ObjectId;
/// Returns the record currently at the head of the chain.
fn first_record(&self) -> Arc<StateRecord>;
/// Returns the record to read for the snapshot `snapshot_id`, given its `invalid` set.
fn readable_record(&self, snapshot_id: SnapshotId, invalid: &SnapshotIdSet)
-> Arc<StateRecord>;
/// Makes `record` the new head of the chain.
fn prepend_state_record(&self, record: Arc<StateRecord>);
/// Attempts to merge three records into one.
///
/// The default implementation does not merge and returns `None`.
fn merge_records(
&self,
_previous: Arc<StateRecord>,
_current: Arc<StateRecord>,
_applied: Arc<StateRecord>,
) -> Option<Arc<StateRecord>> {
None
}
/// Commits a record produced by [`StateObject::merge_records`] and returns the id it
/// was committed under.
///
/// The default implementation rejects the request with an error message.
fn commit_merged_record(&self, _merged: Arc<StateRecord>) -> Result<SnapshotId, &'static str> {
Err("StateObject does not support merged record commits")
}
/// Promotes the record written by the snapshot `child_id`.
fn promote_record(&self, child_id: SnapshotId) -> Result<(), &'static str>;
/// Gives the object a chance to invalidate records that are no longer needed.
///
/// The default implementation does nothing and returns `false`.
fn overwrite_unused_records(&self) -> bool {
false }
/// Returns `self` as `&dyn Any` so callers can downcast to the concrete type.
fn as_any(&self) -> &dyn Any;
}
/// A mutable state cell whose value history is kept as a chain of [`StateRecord`]s.
pub(crate) struct SnapshotMutableState<T> {
    /// Newest record of the chain.
    head: RwLock<Arc<StateRecord>>,
    /// Decides value equivalence and merging.
    policy: Arc<dyn MutationPolicy<T>>,
    /// Identity of this state object.
    id: ObjectId,
    /// Weak handle to the `Arc` that owns this state, once one has been installed.
    weak_self: Mutex<Option<Weak<Self>>>,
    /// Callbacks registered through `add_apply_observer`.
    apply_observers: Mutex<Vec<Box<dyn Fn() + 'static>>>,
}

impl<T> SnapshotMutableState<T> {
    /// Walks the whole record chain and panics if a record is reached twice.
    ///
    /// `caller` and `snapshot_context` only enrich the panic message.
    ///
    /// # Panics
    /// Panics when the chain contains a duplicate record or cycle, when the scan collects
    /// no ids, or when the head lock is poisoned.
    fn assert_chain_integrity(&self, caller: &str, snapshot_context: Option<SnapshotId>) {
        let start: Arc<StateRecord> = self.head.read().expect("State head lock poisoned").clone();
        let mut visited: HashSet<usize> = HashSet::default();
        let mut ids = Vec::new();

        let mut cursor = Some(start);
        while let Some(node) = cursor.take() {
            let raw = Arc::as_ptr(&node);
            if !visited.insert(raw as usize) {
                panic!(
                    "SnapshotMutableState::{} detected duplicate/cycle at record {:p} for state {:?} (snapshot_context={:?}, chain_ids={:?})",
                    caller, raw, self.id, snapshot_context, ids
                );
            }
            ids.push(node.snapshot_id());
            cursor = node.next();
        }

        if ids.is_empty() {
            panic!(
                "SnapshotMutableState::{} finished integrity scan with empty id list for state {:?} (snapshot_context={:?})",
                caller, self.id, snapshot_context
            );
        }
    }
}
impl<T: Clone + 'static> SnapshotMutableState<T> {
fn readable_for(
&self,
snapshot_id: SnapshotId,
invalid: &SnapshotIdSet,
) -> Option<Arc<StateRecord>> {
let head = self.first_record();
readable_record_for(&head, snapshot_id, invalid)
}
fn writable_record(
&self,
snapshot_id: SnapshotId,
invalid: &SnapshotIdSet,
) -> Arc<StateRecord> {
let readable = match self.readable_for(snapshot_id, invalid) {
Some(record) => record,
None => {
let mut head_guard = self.head.write().expect("State head lock poisoned");
let current_head = head_guard.clone();
let refreshed = readable_record_for(¤t_head, snapshot_id, invalid);
let source = refreshed.unwrap_or_else(|| current_head.clone());
let cloned_value = source.with_value(|value: &T| value.clone());
let new_head = StateRecord::new(snapshot_id, cloned_value, Some(current_head));
*head_guard = new_head.clone();
drop(head_guard);
self.assert_chain_integrity("writable_record(recover)", Some(snapshot_id));
return new_head;
}
};
if readable.snapshot_id() == snapshot_id {
return readable;
}
let refreshed = {
let head_guard = self.head.read().expect("State head lock poisoned");
let current_head = head_guard.clone();
let refreshed = readable_record_for(¤t_head, snapshot_id, invalid).unwrap_or_else(
|| {
panic!(
"SnapshotMutableState::writable_record failed to locate refreshed readable record (state {:?}, snapshot_id={}, invalid={:?})",
self.id, snapshot_id, invalid
)
},
);
if refreshed.snapshot_id() == snapshot_id {
return refreshed;
}
Arc::clone(&refreshed)
};
let overwritable = new_overwritable_record_locked(self);
overwritable.assign_value::<T>(&refreshed);
overwritable.set_snapshot_id(snapshot_id);
overwritable.set_tombstone(false);
self.assert_chain_integrity("writable_record(reuse)", Some(snapshot_id));
overwritable
}
pub(crate) fn new_in_arc(initial: T, policy: Arc<dyn MutationPolicy<T>>) -> Arc<Self> {
let snapshot = active_snapshot();
let snapshot_id = snapshot.snapshot_id();
let tail = StateRecord::new(PREEXISTING_SNAPSHOT_ID, initial.clone(), None);
let head = StateRecord::new(snapshot_id, initial, Some(tail));
let mut state = Arc::new(Self {
head: RwLock::new(head),
policy,
id: ObjectId::default(),
weak_self: Mutex::new(None),
apply_observers: Mutex::new(Vec::new()),
});
let id = ObjectId::new(&state);
Arc::get_mut(&mut state).expect("fresh Arc").id = id;
*state.weak_self.lock().expect("Weak self lock poisoned") = Some(Arc::downgrade(&state));
state
}
pub(crate) fn add_apply_observer(&self, observer: Box<dyn Fn() + 'static>) {
self.apply_observers
.lock()
.expect("Observers lock poisoned")
.push(observer);
}
fn notify_applied(&self) {
let observers = self
.apply_observers
.lock()
.expect("Observers lock poisoned");
for observer in observers.iter() {
observer();
}
}
#[inline]
pub(crate) fn id(&self) -> ObjectId {
self.id
}
pub(crate) fn get(&self) -> T {
let snapshot = active_snapshot();
if let Some(state) = self
.weak_self
.lock()
.expect("Weak self lock poisoned")
.as_ref()
.and_then(|weak| weak.upgrade())
{
snapshot.record_read(&*state);
}
let snapshot_id = snapshot.snapshot_id();
let invalid = snapshot.invalid();
if let Some(record) = self.readable_for(snapshot_id, &invalid) {
return record.with_value(|value: &T| value.clone());
}
let fresh_snapshot = active_snapshot();
let fresh_id = fresh_snapshot.snapshot_id();
let fresh_invalid = fresh_snapshot.invalid();
if let Some(record) = self.readable_for(fresh_id, &fresh_invalid) {
return record.with_value(|value: &T| value.clone());
}
let head = self.first_record();
let mut chain_ids = Vec::new();
let mut cursor = Some(head);
while let Some(record) = cursor {
chain_ids.push((record.snapshot_id(), record.is_tombstone()));
cursor = record.next();
}
panic!(
"Reading a state that was created after the snapshot was taken or in a snapshot that has not yet been applied\n\
state={:?}, snapshot_id={}, fresh_snapshot_id={}, fresh_invalid={:?}\n\
record_chain={:?}",
self.id, snapshot_id, fresh_id, fresh_invalid, chain_ids
);
}
pub(crate) fn set(&self, new_value: T) {
#[cfg(debug_assertions)]
{
let in_handler = crate::in_event_handler();
let in_snapshot = crate::in_applied_snapshot();
if in_handler && !in_snapshot {
eprintln!(
"⚠️ WARNING: State modified in event handler without run_in_mutable_snapshot!\n\
This can cause state updates to be invisible to other contexts.\n\
Wrap your handler in run_in_mutable_snapshot() or dispatch_ui_event().\n\
State: {:?}",
self.id
);
}
}
let snapshot = active_snapshot();
if let Some(state) = self
.weak_self
.lock()
.expect("Weak self lock poisoned")
.as_ref()
.and_then(|weak| weak.upgrade())
{
let trait_object: Arc<dyn StateObject> = state.clone();
snapshot.record_write(trait_object);
}
mark_update_write(self.id);
let snapshot_id = snapshot.snapshot_id();
match &snapshot {
AnySnapshot::Global(global) => {
let mut head_guard = self.head.write().expect("State head lock poisoned");
let head = head_guard.clone();
if global.has_pending_children() {
panic!(
"SnapshotMutableState::set attempted global write while pending children {:?} exist (state {:?}, snapshot_id={})",
global.pending_children(),
self.id,
snapshot_id
);
}
let new_id = allocate_record_id();
let record = StateRecord::new(new_id, new_value, Some(head));
*head_guard = record.clone();
drop(head_guard);
advance_global_snapshot(new_id);
self.assert_chain_integrity("set(global-push)", Some(snapshot_id));
if !global.has_pending_children() {
let mut cursor = record.next();
while let Some(node) = cursor {
if !node.is_tombstone() && node.snapshot_id() != PREEXISTING_SNAPSHOT_ID {
node.clear_value();
node.set_tombstone(true);
}
cursor = node.next();
}
self.assert_chain_integrity("set(global-tombstone)", Some(snapshot_id));
}
}
AnySnapshot::Mutable(_)
| AnySnapshot::NestedMutable(_)
| AnySnapshot::TransparentMutable(_) => {
let invalid = snapshot.invalid();
let record = self.writable_record(snapshot_id, &invalid);
let equivalent =
record.with_value(|current: &T| self.policy.equivalent(current, &new_value));
if !equivalent {
record.replace_value(new_value);
}
self.assert_chain_integrity("set(child-writable)", Some(snapshot_id));
}
AnySnapshot::Readonly(_)
| AnySnapshot::NestedReadonly(_)
| AnySnapshot::TransparentReadonly(_) => {
panic!("Cannot write to a read-only snapshot");
}
}
}
}
thread_local! {
    /// Ids of state objects that currently have a live [`UpdateScope`] on this thread.
    static ACTIVE_UPDATES: RefCell<HashSet<ObjectId>> = RefCell::new(HashSet::default());
    /// Ids passed to `mark_update_write` while they were in `ACTIVE_UPDATES`.
    static PENDING_WRITES: RefCell<HashSet<ObjectId>> = RefCell::new(HashSet::default());
}

/// Guard that tracks whether a state object is written while the guard is alive.
///
/// Dropping the guard without calling [`UpdateScope::finish`] discards the tracking
/// information for its id.
pub(crate) struct UpdateScope {
    id: ObjectId,
    finished: bool,
}

impl UpdateScope {
    /// Starts tracking writes to `id`, forgetting any write recorded for it earlier.
    pub(crate) fn new(id: ObjectId) -> Self {
        ACTIVE_UPDATES.with(|active| active.borrow_mut().insert(id));
        PENDING_WRITES.with(|pending| pending.borrow_mut().remove(&id));
        Self {
            id,
            finished: false,
        }
    }

    /// Ends the scope and returns `true` if the id was written while the scope was active.
    pub(crate) fn finish(mut self) -> bool {
        self.finished = true;
        Self::untrack(self.id)
    }

    /// Removes `id` from both tracking sets and reports whether a write was pending.
    fn untrack(id: ObjectId) -> bool {
        ACTIVE_UPDATES.with(|active| active.borrow_mut().remove(&id));
        PENDING_WRITES.with(|pending| pending.borrow_mut().remove(&id))
    }
}

impl Drop for UpdateScope {
    fn drop(&mut self) {
        // `finish` has already cleaned up; only an abandoned scope needs it here.
        if !self.finished {
            Self::untrack(self.id);
        }
    }
}

/// Records a write to `id` if an [`UpdateScope`] for it is active on this thread.
fn mark_update_write(id: ObjectId) {
    let scope_is_active = ACTIVE_UPDATES.with(|active| active.borrow().contains(&id));
    if scope_is_active {
        PENDING_WRITES.with(|pending| {
            pending.borrow_mut().insert(id);
        });
    }
}
impl<T: Clone + 'static> SnapshotMutableState<T> {
fn try_readable_record(
&self,
snapshot_id: SnapshotId,
invalid: &SnapshotIdSet,
) -> Option<Arc<StateRecord>> {
self.readable_for(snapshot_id, invalid)
}
}
impl<T: Clone + 'static> SnapshotMutableState<T> {
    /// Pushes `value` as the new head under a freshly allocated record id, advances the
    /// global snapshot to that id and returns it.
    fn push_applied_head(&self, value: T) -> SnapshotId {
        let new_id = allocate_record_id();
        {
            let mut head_slot = self.head.write().expect("State head lock poisoned");
            let previous_head = head_slot.clone();
            *head_slot = StateRecord::new(new_id, value, Some(previous_head));
        }
        advance_global_snapshot(new_id);
        new_id
    }
}

impl<T: Clone + 'static> StateObject for SnapshotMutableState<T> {
    fn object_id(&self) -> ObjectId {
        self.id
    }

    /// Returns a handle to the current head record.
    fn first_record(&self) -> Arc<StateRecord> {
        let head_slot = self.head.read().expect("State head lock poisoned");
        head_slot.clone()
    }

    /// Returns the record readable by `snapshot_id`.
    ///
    /// # Panics
    /// Panics when no record is readable.
    fn readable_record(
        &self,
        snapshot_id: SnapshotId,
        invalid: &SnapshotIdSet,
    ) -> Arc<StateRecord> {
        match self.try_readable_record(snapshot_id, invalid) {
            Some(record) => record,
            None => panic!(
                "SnapshotMutableState::readable_record returned null (state={:?}, snapshot_id={})",
                self.id, snapshot_id
            ),
        }
    }

    /// Links `record` in front of the current head and makes it the new head.
    fn prepend_state_record(&self, record: Arc<StateRecord>) {
        let mut head_slot = self.head.write().expect("State head lock poisoned");
        record.set_next(Some(head_slot.clone()));
        *head_slot = record;
    }

    /// Resolves a conflict between `current` and `applied`.
    ///
    /// Returns `current` when the policy considers both values equivalent. Otherwise the
    /// policy's `merge` is consulted, and a successful merge is wrapped in a new unlinked
    /// record tagged with the id of `applied`.
    fn merge_records(
        &self,
        previous: Arc<StateRecord>,
        current: Arc<StateRecord>,
        applied: Arc<StateRecord>,
    ) -> Option<Arc<StateRecord>> {
        let unchanged = current.with_value(|ours: &T| {
            applied.with_value(|theirs: &T| self.policy.equivalent(ours, theirs))
        });
        if unchanged {
            return Some(current);
        }

        let merged = previous.with_value(|base: &T| {
            current.with_value(|ours: &T| {
                applied.with_value(|theirs: &T| self.policy.merge(base, ours, theirs))
            })
        })?;
        Some(StateRecord::new(applied.snapshot_id(), merged, None))
    }

    /// Re-publishes the value written by the snapshot `child_id` as a new head record and
    /// notifies the apply observers.
    ///
    /// # Panics
    /// Panics when the chain holds no record tagged `child_id`.
    fn promote_record(&self, child_id: SnapshotId) -> Result<(), &'static str> {
        let mut cursor = Some(self.first_record());
        while let Some(node) = cursor {
            if node.snapshot_id() != child_id {
                cursor = node.next();
                continue;
            }
            let promoted = node.with_value(|value: &T| value.clone());
            self.push_applied_head(promoted);
            self.notify_applied();
            self.assert_chain_integrity("promote_record", Some(child_id));
            return Ok(());
        }
        panic!(
            "SnapshotMutableState::promote_record missing child record (state {:?}, child_id={})",
            self.id, child_id
        );
    }

    /// Publishes the value of `merged` as a new head record, notifies the apply observers
    /// and returns the id the new record was given.
    fn commit_merged_record(&self, merged: Arc<StateRecord>) -> Result<SnapshotId, &'static str> {
        let committed = merged.with_value(|value: &T| value.clone());
        let new_id = self.push_applied_head(committed);
        self.notify_applied();
        self.assert_chain_integrity("commit_merged_record", Some(new_id));
        Ok(new_id)
    }

    fn overwrite_unused_records(&self) -> bool {
        overwrite_unused_records_locked::<T>(self)
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}
// Unit tests for the record-chain helpers and for `SnapshotMutableState`.
// Several tests reset and use the snapshot pinning table before exercising reuse logic.
#[cfg(test)]
mod tests {
use super::*;
// Builds a chain whose records carry `ids` in order (first id is the head), each
// holding `0i32`.
fn create_record_chain(ids: &[SnapshotId]) -> Arc<StateRecord> {
let mut head: Option<Arc<StateRecord>> = None;
for &id in ids.iter().rev() {
head = Some(StateRecord::new(id, 0i32, head));
}
head.expect("create_record_chain called with empty ids")
}
// Minimal `StateObject` over a fixed head; prepending and promotion are no-ops.
struct ManualState {
head: Arc<StateRecord>,
}
impl ManualState {
fn new(head: Arc<StateRecord>) -> Self {
Self { head }
}
}
impl StateObject for ManualState {
fn object_id(&self) -> ObjectId {
ObjectId(999)
}
fn first_record(&self) -> Arc<StateRecord> {
Arc::clone(&self.head)
}
fn readable_record(&self, _: SnapshotId, _: &SnapshotIdSet) -> Arc<StateRecord> {
Arc::clone(&self.head)
}
fn prepend_state_record(&self, _: Arc<StateRecord>) {}
fn promote_record(&self, _: SnapshotId) -> Result<(), &'static str> {
Ok(())
}
fn as_any(&self) -> &dyn Any {
self
}
}
// A record tagged INVALID_SNAPSHOT_ID is returned by `used_locked`.
#[test]
fn test_used_locked_finds_invalid_snapshot() {
let tail = StateRecord::new(PREEXISTING_SNAPSHOT_ID, 0i32, None);
let invalid_rec = StateRecord::new(INVALID_SNAPSHOT_ID, 0i32, Some(tail));
let head = StateRecord::new(10, 0i32, Some(invalid_rec.clone()));
let result = used_locked(&head);
assert!(result.is_some());
assert_eq!(result.unwrap().snapshot_id(), INVALID_SNAPSHOT_ID);
}
// With snapshot 10 pinned and the chain 100 -> 5 -> 2, the record with id 2 is returned.
#[test]
fn test_used_locked_finds_obscured_record() {
crate::snapshot_pinning::reset_pinning_table();
let pin_handle = crate::snapshot_pinning::track_pinning(10, &SnapshotIdSet::EMPTY);
let oldest = StateRecord::new(2, 0i32, None);
let newer = StateRecord::new(5, 0i32, Some(oldest.clone()));
let head = StateRecord::new(100, 0i32, Some(newer));
let result = used_locked(&head);
assert!(result.is_some());
let reused = result.unwrap();
assert_eq!(
reused.snapshot_id(),
2,
"Should return the oldest obscured record"
);
crate::snapshot_pinning::release_pinning(pin_handle);
}
// A chain whose ids are all far above the next record id offers nothing to reuse.
#[test]
fn test_used_locked_no_reusable_record() {
crate::snapshot_pinning::reset_pinning_table();
let high_id = allocate_record_id() + 1000;
let head = create_record_chain(&[high_id, high_id + 1, high_id + 2]);
let result = used_locked(&head);
assert!(
result.is_none(),
"Should find no reusable records when all are recent"
);
}
// For the chain 100 -> 2 with nothing pinned, `used_locked` returns `None`.
#[test]
fn test_used_locked_single_old_record() {
crate::snapshot_pinning::reset_pinning_table();
let old = StateRecord::new(2, 0i32, None);
let head = StateRecord::new(100, 0i32, Some(old));
let result = used_locked(&head);
assert!(result.is_none(), "Single old record should not be reused");
}
// A lone PREEXISTING record is readable by snapshot 10.
#[test]
fn test_readable_record_for_preexisting() {
let head = create_record_chain(&[PREEXISTING_SNAPSHOT_ID]);
let invalid = SnapshotIdSet::EMPTY;
let result = readable_record_for(&head, 10, &invalid);
assert!(result.is_some());
assert_eq!(result.unwrap().snapshot_id(), PREEXISTING_SNAPSHOT_ID);
}
// The highest id not above the reading snapshot wins: 10 for snapshot 10, 5 for snapshot 7.
#[test]
fn test_readable_record_for_picks_highest_valid() {
let head = create_record_chain(&[10, 5, PREEXISTING_SNAPSHOT_ID]);
let invalid = SnapshotIdSet::EMPTY;
let result = readable_record_for(&head, 10, &invalid);
assert!(result.is_some());
assert_eq!(result.unwrap().snapshot_id(), 10);
let result = readable_record_for(&head, 7, &invalid);
assert!(result.is_some());
assert_eq!(result.unwrap().snapshot_id(), 5);
}
// An INVALID record spliced in after the head is recycled and tagged SNAPSHOT_ID_MAX.
#[test]
fn test_new_overwritable_record_locked_reuses_invalid() {
let state = SnapshotMutableState::new_in_arc(100i32, Arc::new(NeverEqual));
let current_head = state.first_record();
let invalid_rec = StateRecord::new(INVALID_SNAPSHOT_ID, 0i32, current_head.next());
current_head.set_next(Some(invalid_rec.clone()));
let result = new_overwritable_record_locked(&*state);
assert!(Arc::ptr_eq(&result, &invalid_rec));
assert_eq!(result.snapshot_id(), SNAPSHOT_ID_MAX);
}
// With snapshot 1 pinned, a new record is created, becomes the head and links to the old head.
#[test]
fn test_new_overwritable_record_locked_creates_new() {
crate::snapshot_pinning::reset_pinning_table();
let _pin_handle = crate::snapshot_pinning::track_pinning(1, &SnapshotIdSet::EMPTY);
let state = SnapshotMutableState::new_in_arc(100i32, Arc::new(NeverEqual));
let old_head = state.first_record();
let result = new_overwritable_record_locked(&*state);
assert_eq!(result.snapshot_id(), SNAPSHOT_ID_MAX);
let new_head = state.first_record();
assert!(
Arc::ptr_eq(&new_head, &result),
"new_head ({:p}) should equal result ({:p})",
Arc::as_ptr(&new_head),
Arc::as_ptr(&result)
);
assert!(result.next().is_some());
assert!(Arc::ptr_eq(&result.next().unwrap(), &old_head));
}
// `writable_record` recycles an INVALID record, re-tags it and copies the readable value.
#[test]
fn test_writable_record_reuses_invalid_record() {
crate::snapshot_pinning::reset_pinning_table();
let state = SnapshotMutableState::new_in_arc(7i32, Arc::new(NeverEqual));
let head = state.first_record();
let invalid = StateRecord::new(INVALID_SNAPSHOT_ID, 0i32, head.next());
head.set_next(Some(invalid.clone()));
let snapshot_id = allocate_record_id();
let result = state.writable_record(snapshot_id, &SnapshotIdSet::EMPTY);
assert!(
Arc::ptr_eq(&result, &invalid),
"Expected writable_record to reuse the INVALID record"
);
assert_eq!(result.snapshot_id(), snapshot_id);
result.with_value(|value: &i32| {
assert_eq!(*value, 7, "Reused record should copy the readable value");
});
assert!(!result.is_tombstone());
}
// With snapshot 1 pinned, `writable_record` creates a new head instead of reusing a record.
#[test]
fn test_writable_record_creates_new_when_reuse_disallowed() {
crate::snapshot_pinning::reset_pinning_table();
let pin = crate::snapshot_pinning::track_pinning(1, &SnapshotIdSet::EMPTY);
let state = SnapshotMutableState::new_in_arc(42i32, Arc::new(NeverEqual));
let original_head = state.first_record();
let preexisting = original_head
.next()
.expect("preexisting record should exist for newly created state");
let snapshot_id = allocate_record_id();
let result = state.writable_record(snapshot_id, &SnapshotIdSet::EMPTY);
assert!(
!Arc::ptr_eq(&result, &original_head),
"Should not reuse the current head when reuse is disallowed"
);
assert!(
!Arc::ptr_eq(&result, &preexisting),
"Should not reuse the PREEXISTING record"
);
assert_eq!(result.snapshot_id(), snapshot_id);
result.with_value(|value: &i32| assert_eq!(*value, 42));
let new_head = state.first_record();
assert!(
Arc::ptr_eq(&new_head, &result),
"Newly created record should become the head of the chain"
);
crate::snapshot_pinning::release_pinning(pin);
}
// `clear_for_reuse` leaves the snapshot id untouched.
#[test]
fn test_state_record_clear_for_reuse() {
let record = StateRecord::new(10, 42i32, None);
record.with_value(|val: &i32| {
assert_eq!(*val, 42);
});
record.clear_for_reuse();
assert_eq!(record.snapshot_id(), 10);
}
// On a fresh state with snapshot 1 pinned, nothing is invalidated and `true` is returned.
#[test]
fn test_overwrite_unused_records_no_old_records() {
crate::snapshot_pinning::reset_pinning_table();
let state = SnapshotMutableState::new_in_arc(42i32, Arc::new(NeverEqual));
let _pin = crate::snapshot_pinning::track_pinning(1, &SnapshotIdSet::EMPTY);
let should_retain = state.overwrite_unused_records();
assert!(
should_retain,
"Should retain multiple records when none are old enough"
);
let mut cursor = Some(state.first_record());
while let Some(record) = cursor {
assert_ne!(record.snapshot_id(), INVALID_SNAPSHOT_ID);
cursor = record.next();
}
}
// With snapshot 1000 pinned and the chain 300 -> 200 -> 100, only 300 survives.
#[test]
fn test_overwrite_unused_records_basic_cleanup() {
crate::snapshot_pinning::reset_pinning_table();
let rec1 = StateRecord::new(100, 1i32, None);
let rec2 = StateRecord::new(200, 2i32, Some(rec1.clone()));
let rec3 = StateRecord::new(300, 3i32, Some(rec2.clone()));
struct TestState {
head: Arc<StateRecord>,
}
impl StateObject for TestState {
fn object_id(&self) -> ObjectId {
ObjectId(999)
}
fn first_record(&self) -> Arc<StateRecord> {
Arc::clone(&self.head)
}
fn readable_record(&self, _: SnapshotId, _: &SnapshotIdSet) -> Arc<StateRecord> {
Arc::clone(&self.head)
}
fn prepend_state_record(&self, _: Arc<StateRecord>) {}
fn promote_record(&self, _: SnapshotId) -> Result<(), &'static str> {
Ok(())
}
fn as_any(&self) -> &dyn Any {
self
}
}
let test_state = TestState { head: rec3.clone() };
let _pin = crate::snapshot_pinning::track_pinning(1000, &SnapshotIdSet::EMPTY);
let result = overwrite_unused_records_locked::<i32>(&test_state);
assert_eq!(rec3.snapshot_id(), 300);
assert_eq!(rec2.snapshot_id(), INVALID_SNAPSHOT_ID);
assert_eq!(rec1.snapshot_id(), INVALID_SNAPSHOT_ID);
assert!(!result);
}
// A state whose chain has been cut down to the head alone reports `false`.
#[test]
fn test_overwrite_unused_records_single_record_only() {
crate::snapshot_pinning::reset_pinning_table();
let state = SnapshotMutableState::new_in_arc(42i32, Arc::new(NeverEqual));
let head = state.first_record();
head.set_next(None);
let should_retain = state.overwrite_unused_records();
assert!(!should_retain, "Single record should return false");
}
// With snapshot 100 pinned and the chain 150 -> 3 -> 2 -> PREEXISTING, record 2 is invalidated.
#[test]
fn test_overwrite_unused_records_clears_values() {
crate::snapshot_pinning::reset_pinning_table();
let tail = StateRecord::new(PREEXISTING_SNAPSHOT_ID, 0i32, None);
let old_rec1 = StateRecord::new(2, 999i32, Some(tail.clone()));
let old_rec2 = StateRecord::new(3, 888i32, Some(old_rec1.clone()));
let head = StateRecord::new(150, 42i32, Some(old_rec2.clone()));
let state = ManualState::new(head.clone());
old_rec1.with_value(|val: &i32| {
assert_eq!(*val, 999);
});
let _pin = crate::snapshot_pinning::track_pinning(100, &SnapshotIdSet::EMPTY);
overwrite_unused_records_locked::<i32>(&state);
assert_eq!(old_rec1.snapshot_id(), INVALID_SNAPSHOT_ID);
}
// With snapshot 40 pinned, records 50 and 5 keep their ids while record 2 is invalidated.
#[test]
fn test_overwrite_unused_records_mixed_old_and_new() {
crate::snapshot_pinning::reset_pinning_table();
let preexisting = StateRecord::new(PREEXISTING_SNAPSHOT_ID, 0i32, None);
let rec2 = StateRecord::new(2, 100i32, Some(preexisting.clone()));
let rec5 = StateRecord::new(5, 100i32, Some(rec2.clone()));
let rec50 = StateRecord::new(50, 100i32, Some(rec5.clone()));
let head = StateRecord::new(120, 100i32, Some(rec50.clone()));
let state = ManualState::new(head.clone());
let _pin = crate::snapshot_pinning::track_pinning(40, &SnapshotIdSet::EMPTY);
let should_retain = overwrite_unused_records_locked::<i32>(&state);
assert!(should_retain);
assert_eq!(rec50.snapshot_id(), 50);
assert_eq!(rec5.snapshot_id(), 5);
assert_eq!(rec2.snapshot_id(), INVALID_SNAPSHOT_ID);
}
// Ids in the invalid set are skipped: snapshot 7 falls back to the PREEXISTING record.
#[test]
fn test_readable_record_for_skips_invalid_set() {
let head = create_record_chain(&[10, 5, PREEXISTING_SNAPSHOT_ID]);
let invalid = SnapshotIdSet::new().set(5);
let result = readable_record_for(&head, 10, &invalid);
assert!(result.is_some());
assert_eq!(result.unwrap().snapshot_id(), 10);
let result = readable_record_for(&head, 7, &invalid);
assert!(result.is_some());
assert_eq!(result.unwrap().snapshot_id(), PREEXISTING_SNAPSHOT_ID);
}
// `assign_value` copies the value and leaves both snapshot ids unchanged.
#[test]
fn test_assign_value_copies_int() {
let source = StateRecord::new(10, 42i32, None);
let target = StateRecord::new(20, 0i32, None);
target.assign_value::<i32>(&source);
target.with_value(|val: &i32| {
assert_eq!(*val, 42);
});
source.with_value(|val: &i32| {
assert_eq!(*val, 42);
});
assert_eq!(source.snapshot_id(), 10);
assert_eq!(target.snapshot_id(), 20);
}
// `assign_value` works for `String` and leaves the source intact.
#[test]
fn test_assign_value_copies_string() {
let source = StateRecord::new(10, "hello".to_string(), None);
let target = StateRecord::new(20, "world".to_string(), None);
target.assign_value::<String>(&source);
target.with_value(|val: &String| {
assert_eq!(val, "hello");
});
source.with_value(|val: &String| {
assert_eq!(val, "hello");
});
}
// Assigning from a cleared source panics with the "missing or wrong type" message.
#[test]
#[should_panic(expected = "StateRecord value missing or wrong type")]
fn test_assign_value_copies_from_cleared_source_panics() {
let source = StateRecord::new(10, 42i32, None);
let target = StateRecord::new(20, 0i32, None);
source.clear_value();
target.assign_value::<i32>(&source);
}
// `assign_value` replaces whatever the target held before.
#[test]
fn test_assign_value_overwrites_existing_value() {
let source = StateRecord::new(10, 100i32, None);
let target = StateRecord::new(20, 999i32, None);
target.with_value(|val: &i32| {
assert_eq!(*val, 999);
});
target.assign_value::<i32>(&source);
target.with_value(|val: &i32| {
assert_eq!(*val, 100);
});
}
// `assign_value` works for a user-defined `Clone` type.
#[test]
fn test_assign_value_with_custom_type() {
#[derive(Clone, PartialEq, Debug)]
struct Point {
x: f64,
y: f64,
}
let source = StateRecord::new(10, Point { x: 1.5, y: 2.5 }, None);
let target = StateRecord::new(20, Point { x: 0.0, y: 0.0 }, None);
target.assign_value::<Point>(&source);
target.with_value(|val: &Point| {
assert_eq!(val, &Point { x: 1.5, y: 2.5 });
});
}
// Assigning a record to itself completes and keeps the value.
#[test]
fn test_assign_value_self_assignment() {
let record = StateRecord::new(10, 42i32, None);
record.assign_value::<i32>(&record);
record.with_value(|val: &i32| {
assert_eq!(*val, 42);
});
}
// The assigned `Vec` is an independent copy: later changes to the source do not show.
#[test]
fn test_assign_value_with_vec() {
let source = StateRecord::new(10, vec![1, 2, 3, 4, 5], None);
let target = StateRecord::new(20, Vec::<i32>::new(), None);
target.assign_value::<Vec<i32>>(&source);
target.with_value(|val: &Vec<i32>| {
assert_eq!(val, &vec![1, 2, 3, 4, 5]);
});
source.replace_value(vec![10, 20]);
target.with_value(|val: &Vec<i32>| {
assert_eq!(val, &vec![1, 2, 3, 4, 5]);
});
}
}