Skip to main content

grafeo_common/
mvcc.rs

1//! MVCC (Multi-Version Concurrency Control) primitives.
2//!
3//! This is how Grafeo handles concurrent reads and writes without blocking.
4//! Each entity has a [`VersionChain`] that tracks all versions. Readers see
5//! consistent snapshots, writers create new versions, and old versions get
6//! garbage collected when no one needs them anymore.
7
8use std::collections::VecDeque;
9
10#[cfg(feature = "tiered-storage")]
11use smallvec::SmallVec;
12
13use crate::types::{EpochId, TransactionId};
14
15/// Tracks when a version was created and deleted for visibility checks.
16#[derive(Debug, Clone, Copy)]
17pub struct VersionInfo {
18    /// The epoch this version was created in.
19    pub created_epoch: EpochId,
20    /// The epoch this version was deleted in (if any).
21    pub deleted_epoch: Option<EpochId>,
22    /// The transaction that created this version.
23    pub created_by: TransactionId,
24    /// The transaction that deleted this version (if any).
25    /// Used for rollback: only the deleting transaction's rollback can undelete.
26    pub deleted_by: Option<TransactionId>,
27}
28
29impl VersionInfo {
30    /// Creates a new version info.
31    #[must_use]
32    pub fn new(created_epoch: EpochId, created_by: TransactionId) -> Self {
33        Self {
34            created_epoch,
35            deleted_epoch: None,
36            created_by,
37            deleted_by: None,
38        }
39    }
40
41    /// Marks this version as deleted by a specific transaction.
42    pub fn mark_deleted(&mut self, epoch: EpochId, deleted_by: TransactionId) {
43        self.deleted_epoch = Some(epoch);
44        self.deleted_by = Some(deleted_by);
45    }
46
47    /// Unmarks deletion if deleted by the given transaction. Returns `true` if undeleted.
48    ///
49    /// Used during rollback to restore versions deleted within the rolled-back transaction.
50    pub fn unmark_deleted_by(&mut self, transaction_id: TransactionId) -> bool {
51        if self.deleted_by == Some(transaction_id) {
52            self.deleted_epoch = None;
53            self.deleted_by = None;
54            true
55        } else {
56            false
57        }
58    }
59
60    /// Checks if this version is visible at the given epoch.
61    #[inline]
62    #[must_use]
63    pub fn is_visible_at(&self, epoch: EpochId) -> bool {
64        // Visible if created before or at the viewing epoch
65        // and not deleted before the viewing epoch
66        if !self.created_epoch.is_visible_at(epoch) {
67            return false;
68        }
69
70        if let Some(deleted) = self.deleted_epoch {
71            // Not visible if deleted at or before the viewing epoch
72            deleted.as_u64() > epoch.as_u64()
73        } else {
74            true
75        }
76    }
77
78    /// Checks if this version is visible to a specific transaction.
79    ///
80    /// A version is visible to a transaction if:
81    /// 1. It was created by the same transaction and not deleted by self, OR
82    /// 2. It was created in an epoch before the transaction's start epoch
83    ///    and not deleted before that epoch, and not deleted by this transaction
84    #[inline]
85    #[must_use]
86    pub fn is_visible_to(&self, viewing_epoch: EpochId, viewing_tx: TransactionId) -> bool {
87        // If this version was deleted by the viewing transaction, it is not visible
88        if self.deleted_by == Some(viewing_tx) {
89            return false;
90        }
91
92        // Own modifications are always visible (if not deleted)
93        if self.created_by == viewing_tx {
94            return self.deleted_epoch.is_none();
95        }
96
97        // Otherwise, use epoch-based visibility
98        self.is_visible_at(viewing_epoch)
99    }
100}
101
102/// A single version of data.
103#[derive(Debug, Clone)]
104pub struct Version<T> {
105    /// Visibility metadata.
106    pub info: VersionInfo,
107    /// The actual data.
108    pub data: T,
109}
110
111impl<T> Version<T> {
112    /// Creates a new version.
113    #[must_use]
114    pub fn new(data: T, created_epoch: EpochId, created_by: TransactionId) -> Self {
115        Self {
116            info: VersionInfo::new(created_epoch, created_by),
117            data,
118        }
119    }
120}
121
122/// All versions of a single entity, newest first.
123///
124/// Each node/edge has one of these tracking its version history. Use
125/// [`visible_at()`](Self::visible_at) to get the version at a specific epoch,
126/// or [`visible_to()`](Self::visible_to) for transaction-aware visibility.
127#[derive(Debug, Clone)]
128pub struct VersionChain<T> {
129    /// Versions ordered newest-first.
130    versions: VecDeque<Version<T>>,
131}
132
133impl<T> VersionChain<T> {
134    /// Creates a new empty version chain.
135    #[must_use]
136    pub fn new() -> Self {
137        Self {
138            versions: VecDeque::new(),
139        }
140    }
141
142    /// Creates a version chain with an initial version.
143    #[must_use]
144    pub fn with_initial(data: T, created_epoch: EpochId, created_by: TransactionId) -> Self {
145        Self {
146            versions: VecDeque::from(vec![Version::new(data, created_epoch, created_by)]),
147        }
148    }
149
150    /// Adds a new version to the chain.
151    ///
152    /// The new version becomes the head of the chain.
153    pub fn add_version(&mut self, data: T, created_epoch: EpochId, created_by: TransactionId) {
154        let version = Version::new(data, created_epoch, created_by);
155        self.versions.push_front(version);
156    }
157
158    /// Finds the version visible at the given epoch.
159    ///
160    /// Returns a reference to the visible version's data, or `None` if no version
161    /// is visible at that epoch.
162    #[inline]
163    #[must_use]
164    pub fn visible_at(&self, epoch: EpochId) -> Option<&T> {
165        self.versions
166            .iter()
167            .find(|v| v.info.is_visible_at(epoch))
168            .map(|v| &v.data)
169    }
170
171    /// Finds the version visible to a specific transaction.
172    ///
173    /// This considers both the transaction's epoch and its own uncommitted changes.
174    #[inline]
175    #[must_use]
176    pub fn visible_to(&self, epoch: EpochId, tx: TransactionId) -> Option<&T> {
177        self.versions
178            .iter()
179            .find(|v| v.info.is_visible_to(epoch, tx))
180            .map(|v| &v.data)
181    }
182
183    /// Marks the current visible version as deleted by a specific transaction.
184    ///
185    /// Returns `true` if a version was marked, `false` if no visible version exists.
186    pub fn mark_deleted(&mut self, delete_epoch: EpochId, deleted_by: TransactionId) -> bool {
187        for version in &mut self.versions {
188            if version.info.deleted_epoch.is_none() {
189                version.info.mark_deleted(delete_epoch, deleted_by);
190                return true;
191            }
192        }
193        false
194    }
195
196    /// Unmarks deletion for versions deleted by the given transaction.
197    ///
198    /// Returns `true` if any version was undeleted. Used during rollback to
199    /// restore versions that were deleted within the rolled-back transaction.
200    pub fn unmark_deleted_by(&mut self, tx: TransactionId) -> bool {
201        let mut any_undeleted = false;
202        for version in &mut self.versions {
203            if version.info.unmark_deleted_by(tx) {
204                any_undeleted = true;
205            }
206        }
207        any_undeleted
208    }
209
210    /// Checks if any version was modified by the given transaction.
211    #[must_use]
212    pub fn modified_by(&self, tx: TransactionId) -> bool {
213        self.versions.iter().any(|v| v.info.created_by == tx)
214    }
215
216    /// Checks if any version was deleted by the given transaction.
217    #[must_use]
218    pub fn deleted_by(&self, tx: TransactionId) -> bool {
219        self.versions.iter().any(|v| v.info.deleted_by == Some(tx))
220    }
221
222    /// Removes all versions created by the given transaction.
223    ///
224    /// Used for rollback to discard uncommitted changes.
225    pub fn remove_versions_by(&mut self, tx: TransactionId) {
226        self.versions.retain(|v| v.info.created_by != tx);
227    }
228
229    /// Finalizes PENDING epochs for versions created by the given transaction.
230    ///
231    /// Called at commit time to make uncommitted versions visible at the
232    /// real commit epoch instead of `EpochId::PENDING`.
233    pub fn finalize_epochs(&mut self, transaction_id: TransactionId, commit_epoch: EpochId) {
234        for version in &mut self.versions {
235            if version.info.created_by == transaction_id
236                && version.info.created_epoch == EpochId::PENDING
237            {
238                version.info.created_epoch = commit_epoch;
239            }
240        }
241    }
242
243    /// Checks if there's a concurrent modification conflict.
244    ///
245    /// A conflict exists if another transaction modified this entity
246    /// after our start epoch.
247    #[must_use]
248    pub fn has_conflict(&self, start_epoch: EpochId, our_tx: TransactionId) -> bool {
249        self.versions.iter().any(|v| {
250            v.info.created_by != our_tx && v.info.created_epoch.as_u64() > start_epoch.as_u64()
251        })
252    }
253
254    /// Returns the number of versions in the chain.
255    #[must_use]
256    pub fn version_count(&self) -> usize {
257        self.versions.len()
258    }
259
260    /// Returns true if the chain has no versions.
261    #[must_use]
262    pub fn is_empty(&self) -> bool {
263        self.versions.is_empty()
264    }
265
266    /// Garbage collects old versions that are no longer visible to any transaction.
267    ///
268    /// Keeps versions that might still be visible to transactions at or after `min_epoch`.
269    pub fn gc(&mut self, min_epoch: EpochId) {
270        if self.versions.is_empty() {
271            return;
272        }
273
274        let mut keep_count = 0;
275        let mut found_old_visible = false;
276
277        for (i, version) in self.versions.iter().enumerate() {
278            if version.info.created_epoch.as_u64() >= min_epoch.as_u64() {
279                keep_count = i + 1;
280            } else if !found_old_visible {
281                // Keep the first (most recent) old version
282                found_old_visible = true;
283                keep_count = i + 1;
284            }
285        }
286
287        self.versions.truncate(keep_count);
288    }
289
290    /// Returns an iterator over all versions with their metadata, newest first.
291    ///
292    /// Each item is `(&VersionInfo, &T)` giving both the visibility metadata
293    /// and a reference to the version data.
294    pub fn history(&self) -> impl Iterator<Item = (&VersionInfo, &T)> {
295        self.versions.iter().map(|v| (&v.info, &v.data))
296    }
297
298    /// Returns a reference to the latest version's data regardless of visibility.
299    #[must_use]
300    pub fn latest(&self) -> Option<&T> {
301        self.versions.front().map(|v| &v.data)
302    }
303
304    /// Returns a mutable reference to the latest version's data.
305    #[must_use]
306    pub fn latest_mut(&mut self) -> Option<&mut T> {
307        self.versions.front_mut().map(|v| &mut v.data)
308    }
309
310    /// Returns estimated heap memory in bytes for this version chain.
311    ///
312    /// Counts the `Vec` capacity overhead. Does not include the
313    /// size of `T` payloads (the caller accounts for those).
314    #[must_use]
315    pub fn heap_memory_bytes(&self) -> usize {
316        self.versions.capacity() * std::mem::size_of::<Version<T>>()
317    }
318}
319
320impl<T> Default for VersionChain<T> {
321    fn default() -> Self {
322        Self::new()
323    }
324}
325
326impl<T: Clone> VersionChain<T> {
327    /// Gets a mutable reference to the visible version's data for modification.
328    ///
329    /// If the version is not owned by this transaction, creates a new version
330    /// with a copy of the data.
331    pub fn get_mut(
332        &mut self,
333        epoch: EpochId,
334        tx: TransactionId,
335        modify_epoch: EpochId,
336    ) -> Option<&mut T> {
337        // Find the visible version
338        let visible_idx = self
339            .versions
340            .iter()
341            .position(|v| v.info.is_visible_to(epoch, tx))?;
342
343        let visible = &self.versions[visible_idx];
344
345        if visible.info.created_by == tx {
346            // Already our version, modify in place
347            Some(&mut self.versions[visible_idx].data)
348        } else {
349            // Create a new version with copied data
350            let new_data = visible.data.clone();
351            self.add_version(new_data, modify_epoch, tx);
352            Some(&mut self.versions[0].data)
353        }
354    }
355}
356
357// ============================================================================
358// Tiered Storage Types (Phase 13)
359// ============================================================================
360//
361// These types support the tiered hot/cold storage architecture where version
362// metadata is stored separately from version data. Data lives in arenas (hot)
363// or compressed epoch blocks (cold), while VersionIndex holds lightweight refs.
364
365/// Compact representation of an optional epoch ID.
366///
367/// Uses `u32::MAX` as sentinel for `None`, allowing epochs up to ~4 billion.
368/// This saves 4 bytes compared to `Option<EpochId>` due to niche optimization.
369#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
370#[repr(transparent)]
371#[cfg(feature = "tiered-storage")]
372pub struct OptionalEpochId(u32);
373
374#[cfg(feature = "tiered-storage")]
375impl OptionalEpochId {
376    /// Represents no epoch (deleted_epoch = None).
377    pub const NONE: Self = Self(u32::MAX);
378
379    /// Creates an `OptionalEpochId` from an epoch.
380    ///
381    /// # Panics
382    /// Panics if epoch exceeds u32::MAX - 1 (4,294,967,294).
383    #[must_use]
384    pub fn some(epoch: EpochId) -> Self {
385        assert!(
386            epoch.as_u64() < u64::from(u32::MAX),
387            "epoch {} exceeds OptionalEpochId capacity (max {})",
388            epoch.as_u64(),
389            u32::MAX as u64 - 1
390        );
391        // reason: the assert above guarantees epoch < u32::MAX
392        #[allow(clippy::cast_possible_truncation)]
393        Self(epoch.as_u64() as u32)
394    }
395
396    /// Returns the contained epoch, or `None` if this is `NONE`.
397    #[inline]
398    #[must_use]
399    pub fn get(self) -> Option<EpochId> {
400        if self.0 == u32::MAX {
401            None
402        } else {
403            Some(EpochId::new(u64::from(self.0)))
404        }
405    }
406
407    /// Returns `true` if this contains an epoch.
408    #[must_use]
409    pub fn is_some(self) -> bool {
410        self.0 != u32::MAX
411    }
412
413    /// Returns `true` if this is `NONE`.
414    #[inline]
415    #[must_use]
416    pub fn is_none(self) -> bool {
417        self.0 == u32::MAX
418    }
419}
420
421/// Reference to a hot (arena-allocated) version.
422///
423/// Hot versions are stored in the epoch's arena and can be accessed directly.
424/// This struct only holds metadata; the actual data lives in the arena.
425///
426/// # Memory Layout
427/// - `epoch`: 8 bytes
428/// - `arena_epoch`: 8 bytes
429/// - `arena_offset`: 4 bytes
430/// - `created_by`: 8 bytes
431/// - `deleted_epoch`: 4 bytes
432/// - Total: 32 bytes
433#[derive(Debug, Clone, Copy, PartialEq, Eq)]
434#[cfg(feature = "tiered-storage")]
435pub struct HotVersionRef {
436    /// Epoch used for MVCC visibility checks.
437    ///
438    /// Set to `EpochId::PENDING` for uncommitted transactional versions,
439    /// then finalized to the real commit epoch on commit.
440    pub epoch: EpochId,
441    /// Epoch whose arena holds the actual data.
442    ///
443    /// Always the real epoch (never PENDING), used for arena lookups.
444    pub arena_epoch: EpochId,
445    /// Offset within the epoch's arena where the data is stored.
446    pub arena_offset: u32,
447    /// Transaction that created this version.
448    pub created_by: TransactionId,
449    /// Epoch when this version was deleted (NONE if still alive).
450    pub deleted_epoch: OptionalEpochId,
451    /// Transaction that deleted this version (for rollback support).
452    pub deleted_by: Option<TransactionId>,
453}
454
455#[cfg(feature = "tiered-storage")]
456impl HotVersionRef {
457    /// Creates a new hot version reference.
458    ///
459    /// `epoch` is used for MVCC visibility (may be `PENDING` for uncommitted versions).
460    /// `arena_epoch` is the real epoch whose arena holds the data.
461    #[must_use]
462    pub fn new(
463        epoch: EpochId,
464        arena_epoch: EpochId,
465        arena_offset: u32,
466        created_by: TransactionId,
467    ) -> Self {
468        Self {
469            epoch,
470            arena_epoch,
471            arena_offset,
472            created_by,
473            deleted_epoch: OptionalEpochId::NONE,
474            deleted_by: None,
475        }
476    }
477
478    /// Marks this version as deleted by a specific transaction.
479    pub fn mark_deleted(&mut self, epoch: EpochId, deleted_by: TransactionId) {
480        self.deleted_epoch = OptionalEpochId::some(epoch);
481        self.deleted_by = Some(deleted_by);
482    }
483
484    /// Unmarks deletion if deleted by the given transaction. Returns `true` if undeleted.
485    pub fn unmark_deleted_by(&mut self, transaction_id: TransactionId) -> bool {
486        if self.deleted_by == Some(transaction_id) {
487            self.deleted_epoch = OptionalEpochId::NONE;
488            self.deleted_by = None;
489            true
490        } else {
491            false
492        }
493    }
494
495    /// Checks if this version is visible at the given epoch.
496    #[inline]
497    #[must_use]
498    pub fn is_visible_at(&self, viewing_epoch: EpochId) -> bool {
499        // Must be created at or before the viewing epoch
500        if !self.epoch.is_visible_at(viewing_epoch) {
501            return false;
502        }
503        // Must not be deleted at or before the viewing epoch
504        match self.deleted_epoch.get() {
505            Some(deleted) => deleted.as_u64() > viewing_epoch.as_u64(),
506            None => true,
507        }
508    }
509
510    /// Checks if this version is visible to a specific transaction.
511    #[inline]
512    #[must_use]
513    pub fn is_visible_to(&self, viewing_epoch: EpochId, viewing_tx: TransactionId) -> bool {
514        // If deleted by the viewing transaction, not visible
515        if self.deleted_by == Some(viewing_tx) {
516            return false;
517        }
518        // Own modifications are always visible (if not deleted)
519        if self.created_by == viewing_tx {
520            return self.deleted_epoch.is_none();
521        }
522        // Otherwise use epoch-based visibility
523        self.is_visible_at(viewing_epoch)
524    }
525}
526
527/// Reference to a cold (compressed) version.
528///
529/// Cold versions are stored in compressed epoch blocks. This is a placeholder
530/// for Phase 14 - the actual compression logic will be implemented there.
531///
532/// # Memory Layout
533/// - `epoch`: 8 bytes
534/// - `block_offset`: 4 bytes
535/// - `length`: 2 bytes
536/// - `created_by`: 8 bytes
537/// - `deleted_epoch`: 4 bytes
538/// - Total: 26 bytes (+ 6 padding = 32 bytes aligned)
539#[derive(Debug, Clone, Copy, PartialEq, Eq)]
540#[cfg(feature = "tiered-storage")]
541pub struct ColdVersionRef {
542    /// Epoch when this version was created.
543    pub epoch: EpochId,
544    /// Offset within the compressed epoch block.
545    pub block_offset: u32,
546    /// Compressed length in bytes.
547    pub length: u16,
548    /// Transaction that created this version.
549    pub created_by: TransactionId,
550    /// Epoch when this version was deleted.
551    pub deleted_epoch: OptionalEpochId,
552    /// Transaction that deleted this version (for rollback support).
553    pub deleted_by: Option<TransactionId>,
554}
555
556#[cfg(feature = "tiered-storage")]
557impl ColdVersionRef {
558    /// Checks if this version is visible at the given epoch.
559    #[inline]
560    #[must_use]
561    pub fn is_visible_at(&self, viewing_epoch: EpochId) -> bool {
562        if !self.epoch.is_visible_at(viewing_epoch) {
563            return false;
564        }
565        match self.deleted_epoch.get() {
566            Some(deleted) => deleted.as_u64() > viewing_epoch.as_u64(),
567            None => true,
568        }
569    }
570
571    /// Checks if this version is visible to a specific transaction.
572    #[inline]
573    #[must_use]
574    pub fn is_visible_to(&self, viewing_epoch: EpochId, viewing_tx: TransactionId) -> bool {
575        // If deleted by the viewing transaction, not visible
576        if self.deleted_by == Some(viewing_tx) {
577            return false;
578        }
579        if self.created_by == viewing_tx {
580            return self.deleted_epoch.is_none();
581        }
582        self.is_visible_at(viewing_epoch)
583    }
584}
585
586/// Unified reference to either a hot or cold version.
587#[derive(Debug, Clone, Copy)]
588#[cfg(feature = "tiered-storage")]
589#[non_exhaustive]
590pub enum VersionRef {
591    /// Version data is in arena (hot tier).
592    Hot(HotVersionRef),
593    /// Version data is in compressed storage (cold tier).
594    Cold(ColdVersionRef),
595}
596
597#[cfg(feature = "tiered-storage")]
598impl VersionRef {
599    /// Returns the epoch when this version was created.
600    #[must_use]
601    pub fn epoch(&self) -> EpochId {
602        match self {
603            Self::Hot(h) => h.epoch,
604            Self::Cold(c) => c.epoch,
605        }
606    }
607
608    /// Returns the transaction that created this version.
609    #[must_use]
610    pub fn created_by(&self) -> TransactionId {
611        match self {
612            Self::Hot(h) => h.created_by,
613            Self::Cold(c) => c.created_by,
614        }
615    }
616
617    /// Returns `true` if this is a hot version.
618    #[must_use]
619    pub fn is_hot(&self) -> bool {
620        matches!(self, Self::Hot(_))
621    }
622
623    /// Returns `true` if this is a cold version.
624    #[must_use]
625    pub fn is_cold(&self) -> bool {
626        matches!(self, Self::Cold(_))
627    }
628
629    /// Returns the epoch when this version was deleted, if any.
630    #[must_use]
631    pub fn deleted_epoch(&self) -> Option<EpochId> {
632        match self {
633            Self::Hot(h) => h.deleted_epoch.get(),
634            Self::Cold(c) => c.deleted_epoch.get(),
635        }
636    }
637}
638
639/// Tiered version index - replaces `VersionChain<T>` for hot/cold storage.
640///
641/// Instead of storing data inline, `VersionIndex` holds lightweight references
642/// to data stored in arenas (hot) or compressed blocks (cold). This enables:
643///
644/// - **No heap allocation** for typical 1-2 version case (SmallVec inline)
645/// - **Separation of metadata and data** for compression
646/// - **Fast visibility checks** via cached `latest_epoch`
647/// - **O(1) epoch drops** instead of per-version deallocation
648///
649/// # Memory Layout
650/// - `hot`: SmallVec<[HotVersionRef; 2]> ≈ 56 bytes inline
651/// - `cold`: SmallVec<[ColdVersionRef; 4]> ≈ 136 bytes inline
652/// - `latest_epoch`: 8 bytes
653/// - Total: ~200 bytes, no heap for typical case
654#[derive(Debug, Clone)]
655#[cfg(feature = "tiered-storage")]
656pub struct VersionIndex {
657    /// Hot versions in arena storage (most recent first).
658    hot: SmallVec<[HotVersionRef; 2]>,
659    /// Cold versions in compressed storage (most recent first).
660    cold: SmallVec<[ColdVersionRef; 4]>,
661    /// Cached epoch of the latest version for fast staleness checks.
662    latest_epoch: EpochId,
663}
664
665#[cfg(feature = "tiered-storage")]
666impl VersionIndex {
667    /// Creates a new empty version index.
668    #[must_use]
669    pub fn new() -> Self {
670        Self {
671            hot: SmallVec::new(),
672            cold: SmallVec::new(),
673            latest_epoch: EpochId::INITIAL,
674        }
675    }
676
677    /// Creates a version index with an initial hot version.
678    #[must_use]
679    pub fn with_initial(hot_ref: HotVersionRef) -> Self {
680        let mut index = Self::new();
681        index.add_hot(hot_ref);
682        index
683    }
684
685    /// Adds a new hot version (becomes the latest).
686    pub fn add_hot(&mut self, hot_ref: HotVersionRef) {
687        // Insert at front (most recent first)
688        self.hot.insert(0, hot_ref);
689        self.latest_epoch = hot_ref.epoch;
690    }
691
692    /// Returns the latest epoch for quick staleness checks.
693    #[must_use]
694    pub fn latest_epoch(&self) -> EpochId {
695        self.latest_epoch
696    }
697
698    /// Returns `true` if this entity has no versions.
699    #[must_use]
700    pub fn is_empty(&self) -> bool {
701        self.hot.is_empty() && self.cold.is_empty()
702    }
703
704    /// Returns the total version count (hot + cold).
705    #[must_use]
706    pub fn version_count(&self) -> usize {
707        self.hot.len() + self.cold.len()
708    }
709
710    /// Returns the number of hot versions.
711    #[must_use]
712    pub fn hot_count(&self) -> usize {
713        self.hot.len()
714    }
715
716    /// Returns the number of cold versions.
717    #[must_use]
718    pub fn cold_count(&self) -> usize {
719        self.cold.len()
720    }
721
722    /// Finds the version visible at the given epoch.
723    #[inline]
724    #[must_use]
725    pub fn visible_at(&self, epoch: EpochId) -> Option<VersionRef> {
726        // Check hot versions first (most recent first, likely case)
727        for v in &self.hot {
728            if v.is_visible_at(epoch) {
729                return Some(VersionRef::Hot(*v));
730            }
731        }
732        // Fall back to cold versions
733        for v in &self.cold {
734            if v.is_visible_at(epoch) {
735                return Some(VersionRef::Cold(*v));
736            }
737        }
738        None
739    }
740
741    /// Finds the version visible to a specific transaction.
742    #[inline]
743    #[must_use]
744    pub fn visible_to(&self, epoch: EpochId, tx: TransactionId) -> Option<VersionRef> {
745        // Check hot versions first
746        for v in &self.hot {
747            if v.is_visible_to(epoch, tx) {
748                return Some(VersionRef::Hot(*v));
749            }
750        }
751        // Fall back to cold versions
752        for v in &self.cold {
753            if v.is_visible_to(epoch, tx) {
754                return Some(VersionRef::Cold(*v));
755            }
756        }
757        None
758    }
759
760    /// Marks the currently visible version as deleted by a specific transaction.
761    ///
762    /// Returns `true` if a version was marked, `false` if no visible version exists.
763    pub fn mark_deleted(&mut self, delete_epoch: EpochId, deleted_by: TransactionId) -> bool {
764        // Find the first non-deleted hot version and mark it
765        for v in &mut self.hot {
766            if v.deleted_epoch.is_none() {
767                v.mark_deleted(delete_epoch, deleted_by);
768                return true;
769            }
770        }
771        // Check cold versions (rare case)
772        for v in &mut self.cold {
773            if v.deleted_epoch.is_none() {
774                v.deleted_epoch = OptionalEpochId::some(delete_epoch);
775                v.deleted_by = Some(deleted_by);
776                return true;
777            }
778        }
779        false
780    }
781
782    /// Unmarks deletion for versions deleted by the given transaction.
783    ///
784    /// Returns `true` if any version was undeleted. Used during rollback.
785    pub fn unmark_deleted_by(&mut self, tx: TransactionId) -> bool {
786        let mut any_undeleted = false;
787        for v in &mut self.hot {
788            if v.unmark_deleted_by(tx) {
789                any_undeleted = true;
790            }
791        }
792        for v in &mut self.cold {
793            if v.deleted_by == Some(tx) {
794                v.deleted_epoch = OptionalEpochId::NONE;
795                v.deleted_by = None;
796                any_undeleted = true;
797            }
798        }
799        any_undeleted
800    }
801
802    /// Checks if any version was created by the given transaction.
803    #[must_use]
804    pub fn modified_by(&self, tx: TransactionId) -> bool {
805        self.hot.iter().any(|v| v.created_by == tx) || self.cold.iter().any(|v| v.created_by == tx)
806    }
807
808    /// Checks if any version was deleted by the given transaction.
809    #[must_use]
810    pub fn deleted_by(&self, tx: TransactionId) -> bool {
811        self.hot.iter().any(|v| v.deleted_by == Some(tx))
812            || self.cold.iter().any(|v| v.deleted_by == Some(tx))
813    }
814
815    /// Removes all versions created by the given transaction (for rollback).
816    pub fn remove_versions_by(&mut self, tx: TransactionId) {
817        self.hot.retain(|v| v.created_by != tx);
818        self.cold.retain(|v| v.created_by != tx);
819        self.recalculate_latest_epoch();
820    }
821
822    /// Finalizes PENDING epochs for hot versions created by the given transaction.
823    ///
824    /// Called at commit time to make uncommitted versions visible at the
825    /// real commit epoch instead of `EpochId::PENDING`.
826    pub fn finalize_epochs(&mut self, transaction_id: TransactionId, commit_epoch: EpochId) {
827        for v in &mut self.hot {
828            if v.created_by == transaction_id && v.epoch == EpochId::PENDING {
829                v.epoch = commit_epoch;
830            }
831        }
832        self.recalculate_latest_epoch();
833    }
834
835    /// Checks for write conflict with concurrent transaction.
836    ///
837    /// A conflict exists if another transaction modified this entity
838    /// after our start epoch.
839    #[must_use]
840    pub fn has_conflict(&self, start_epoch: EpochId, our_tx: TransactionId) -> bool {
841        self.hot
842            .iter()
843            .any(|v| v.created_by != our_tx && v.epoch.as_u64() > start_epoch.as_u64())
844            || self
845                .cold
846                .iter()
847                .any(|v| v.created_by != our_tx && v.epoch.as_u64() > start_epoch.as_u64())
848    }
849
850    /// Garbage collects old versions not needed by any active transaction.
851    ///
852    /// Keeps versions that might still be visible to transactions at or after `min_epoch`.
853    pub fn gc(&mut self, min_epoch: EpochId) {
854        if self.is_empty() {
855            return;
856        }
857
858        // Keep versions that:
859        // 1. Were created at or after min_epoch
860        // 2. The first (most recent) version created before min_epoch
861        let mut found_old_visible = false;
862
863        self.hot.retain(|v| {
864            if v.epoch.as_u64() >= min_epoch.as_u64() {
865                true
866            } else if !found_old_visible {
867                found_old_visible = true;
868                true
869            } else {
870                false
871            }
872        });
873
874        // Same for cold, but only if we haven't found an old visible in hot
875        if !found_old_visible {
876            self.cold.retain(|v| {
877                if v.epoch.as_u64() >= min_epoch.as_u64() {
878                    true
879                } else if !found_old_visible {
880                    found_old_visible = true;
881                    true
882                } else {
883                    false
884                }
885            });
886        } else {
887            // All cold versions are older, only keep those >= min_epoch
888            self.cold.retain(|v| v.epoch.as_u64() >= min_epoch.as_u64());
889        }
890    }
891
892    /// Returns epoch IDs of all versions, newest first.
893    ///
894    /// Combines hot and cold versions, sorted by epoch descending.
895    #[must_use]
896    pub fn version_epochs(&self) -> Vec<EpochId> {
897        let mut epochs: Vec<EpochId> = self
898            .hot
899            .iter()
900            .map(|v| v.epoch)
901            .chain(self.cold.iter().map(|v| v.epoch))
902            .collect();
903        epochs.sort_by_key(|e| std::cmp::Reverse(e.as_u64()));
904        epochs
905    }
906
907    /// Returns all version references with their metadata, newest first.
908    ///
909    /// Each item is `(EpochId, Option<EpochId>, VersionRef)` giving
910    /// the created epoch, deleted epoch, and a reference to read the data.
911    #[must_use]
912    pub fn version_history(&self) -> Vec<(EpochId, Option<EpochId>, VersionRef)> {
913        let mut versions: Vec<(EpochId, Option<EpochId>, VersionRef)> = self
914            .hot
915            .iter()
916            .map(|v| (v.epoch, v.deleted_epoch.get(), VersionRef::Hot(*v)))
917            .chain(
918                self.cold
919                    .iter()
920                    .map(|v| (v.epoch, v.deleted_epoch.get(), VersionRef::Cold(*v))),
921            )
922            .collect();
923        versions.sort_by_key(|v| std::cmp::Reverse(v.0.as_u64()));
924        versions
925    }
926
927    /// Returns a reference to the latest version regardless of visibility.
928    #[must_use]
929    pub fn latest(&self) -> Option<VersionRef> {
930        self.hot
931            .first()
932            .map(|v| VersionRef::Hot(*v))
933            .or_else(|| self.cold.first().map(|v| VersionRef::Cold(*v)))
934    }
935
936    /// Freezes hot versions for a given epoch into cold storage.
937    ///
938    /// This is called when an epoch is no longer needed by any active transaction.
939    /// The actual compression happens in Phase 14; for now this just moves refs.
940    pub fn freeze_epoch(
941        &mut self,
942        epoch: EpochId,
943        cold_refs: impl Iterator<Item = ColdVersionRef>,
944    ) {
945        // Remove hot refs for this epoch
946        self.hot.retain(|v| v.epoch != epoch);
947
948        // Add cold refs
949        self.cold.extend(cold_refs);
950
951        // Keep cold sorted by epoch (descending = most recent first)
952        self.cold
953            .sort_by(|a, b| b.epoch.as_u64().cmp(&a.epoch.as_u64()));
954
955        self.recalculate_latest_epoch();
956    }
957
958    /// Returns hot version refs for a specific epoch (for freeze operation).
959    pub fn hot_refs_for_epoch(&self, epoch: EpochId) -> impl Iterator<Item = &HotVersionRef> {
960        self.hot.iter().filter(move |v| v.epoch == epoch)
961    }
962
963    /// Returns `true` if the hot SmallVec has spilled to the heap.
964    #[must_use]
965    pub fn hot_spilled(&self) -> bool {
966        self.hot.spilled()
967    }
968
969    /// Returns `true` if the cold SmallVec has spilled to the heap.
970    #[must_use]
971    pub fn cold_spilled(&self) -> bool {
972        self.cold.spilled()
973    }
974
975    fn recalculate_latest_epoch(&mut self) {
976        self.latest_epoch = self
977            .hot
978            .first()
979            .map(|v| v.epoch)
980            .or_else(|| self.cold.first().map(|v| v.epoch))
981            .unwrap_or(EpochId::INITIAL);
982    }
983}
984
985#[cfg(feature = "tiered-storage")]
986impl Default for VersionIndex {
987    fn default() -> Self {
988        Self::new()
989    }
990}
991
992#[cfg(test)]
993mod tests {
994    use super::*;
995
996    #[test]
997    fn test_version_visibility() {
998        let v = VersionInfo::new(EpochId::new(5), TransactionId::new(1));
999
1000        // Not visible before creation
1001        assert!(!v.is_visible_at(EpochId::new(4)));
1002
1003        // Visible at creation epoch and after
1004        assert!(v.is_visible_at(EpochId::new(5)));
1005        assert!(v.is_visible_at(EpochId::new(10)));
1006    }
1007
1008    #[test]
1009    fn test_deleted_version_visibility() {
1010        let mut v = VersionInfo::new(EpochId::new(5), TransactionId::new(1));
1011        v.mark_deleted(EpochId::new(10), TransactionId::new(99));
1012
1013        // Visible between creation and deletion
1014        assert!(v.is_visible_at(EpochId::new(5)));
1015        assert!(v.is_visible_at(EpochId::new(9)));
1016
1017        // Not visible at or after deletion
1018        assert!(!v.is_visible_at(EpochId::new(10)));
1019        assert!(!v.is_visible_at(EpochId::new(15)));
1020    }
1021
1022    #[test]
1023    fn test_version_visibility_to_transaction() {
1024        let v = VersionInfo::new(EpochId::new(5), TransactionId::new(1));
1025
1026        // Creator can see it even if viewing at earlier epoch
1027        assert!(v.is_visible_to(EpochId::new(3), TransactionId::new(1)));
1028
1029        // Other transactions can only see it at or after creation epoch
1030        assert!(!v.is_visible_to(EpochId::new(3), TransactionId::new(2)));
1031        assert!(v.is_visible_to(EpochId::new(5), TransactionId::new(2)));
1032    }
1033
1034    #[test]
1035    fn test_version_chain_basic() {
1036        let mut chain = VersionChain::with_initial("v1", EpochId::new(1), TransactionId::new(1));
1037
1038        // Should see v1 at epoch 1+
1039        assert_eq!(chain.visible_at(EpochId::new(1)), Some(&"v1"));
1040        assert_eq!(chain.visible_at(EpochId::new(0)), None);
1041
1042        // Add v2
1043        chain.add_version("v2", EpochId::new(5), TransactionId::new(2));
1044
1045        // Should see v1 at epoch < 5, v2 at epoch >= 5
1046        assert_eq!(chain.visible_at(EpochId::new(1)), Some(&"v1"));
1047        assert_eq!(chain.visible_at(EpochId::new(4)), Some(&"v1"));
1048        assert_eq!(chain.visible_at(EpochId::new(5)), Some(&"v2"));
1049        assert_eq!(chain.visible_at(EpochId::new(10)), Some(&"v2"));
1050    }
1051
1052    #[test]
1053    fn test_version_chain_rollback() {
1054        let mut chain = VersionChain::with_initial("v1", EpochId::new(1), TransactionId::new(1));
1055        chain.add_version("v2", EpochId::new(5), TransactionId::new(2));
1056        chain.add_version("v3", EpochId::new(6), TransactionId::new(2));
1057
1058        assert_eq!(chain.version_count(), 3);
1059
1060        // Rollback tx 2's changes
1061        chain.remove_versions_by(TransactionId::new(2));
1062
1063        assert_eq!(chain.version_count(), 1);
1064        assert_eq!(chain.visible_at(EpochId::new(10)), Some(&"v1"));
1065    }
1066
1067    #[test]
1068    fn test_version_chain_deletion() {
1069        let mut chain = VersionChain::with_initial("v1", EpochId::new(1), TransactionId::new(1));
1070
1071        // Mark as deleted at epoch 5 by transaction 99 (system)
1072        assert!(chain.mark_deleted(EpochId::new(5), TransactionId::new(99)));
1073
1074        // Should see v1 before deletion, nothing after
1075        assert_eq!(chain.visible_at(EpochId::new(4)), Some(&"v1"));
1076        assert_eq!(chain.visible_at(EpochId::new(5)), None);
1077        assert_eq!(chain.visible_at(EpochId::new(10)), None);
1078    }
1079}
1080
1081// ============================================================================
1082// Tiered Storage Tests
1083// ============================================================================
1084
1085#[cfg(all(test, feature = "tiered-storage"))]
1086mod tiered_storage_tests {
1087    use super::*;
1088
1089    #[test]
1090    fn test_optional_epoch_id() {
1091        // Test NONE
1092        let none = OptionalEpochId::NONE;
1093        assert!(none.is_none());
1094        assert!(!none.is_some());
1095        assert_eq!(none.get(), None);
1096
1097        // Test Some
1098        let some = OptionalEpochId::some(EpochId::new(42));
1099        assert!(some.is_some());
1100        assert!(!some.is_none());
1101        assert_eq!(some.get(), Some(EpochId::new(42)));
1102
1103        // Test zero epoch
1104        let zero = OptionalEpochId::some(EpochId::new(0));
1105        assert!(zero.is_some());
1106        assert_eq!(zero.get(), Some(EpochId::new(0)));
1107    }
1108
1109    #[test]
1110    fn test_hot_version_ref_visibility() {
1111        let hot = HotVersionRef::new(EpochId::new(5), EpochId::new(5), 100, TransactionId::new(1));
1112
1113        // Not visible before creation
1114        assert!(!hot.is_visible_at(EpochId::new(4)));
1115
1116        // Visible at creation and after
1117        assert!(hot.is_visible_at(EpochId::new(5)));
1118        assert!(hot.is_visible_at(EpochId::new(10)));
1119    }
1120
1121    #[test]
1122    fn test_hot_version_ref_deleted_visibility() {
1123        let mut hot =
1124            HotVersionRef::new(EpochId::new(5), EpochId::new(5), 100, TransactionId::new(1));
1125        hot.deleted_epoch = OptionalEpochId::some(EpochId::new(10));
1126
1127        // Visible between creation and deletion
1128        assert!(hot.is_visible_at(EpochId::new(5)));
1129        assert!(hot.is_visible_at(EpochId::new(9)));
1130
1131        // Not visible at or after deletion
1132        assert!(!hot.is_visible_at(EpochId::new(10)));
1133        assert!(!hot.is_visible_at(EpochId::new(15)));
1134    }
1135
1136    #[test]
1137    fn test_hot_version_ref_transaction_visibility() {
1138        let hot = HotVersionRef::new(EpochId::new(5), EpochId::new(5), 100, TransactionId::new(1));
1139
1140        // Creator can see it even at earlier epoch
1141        assert!(hot.is_visible_to(EpochId::new(3), TransactionId::new(1)));
1142
1143        // Other transactions can only see it at or after creation
1144        assert!(!hot.is_visible_to(EpochId::new(3), TransactionId::new(2)));
1145        assert!(hot.is_visible_to(EpochId::new(5), TransactionId::new(2)));
1146    }
1147
1148    #[test]
1149    fn test_version_index_basic() {
1150        let hot = HotVersionRef::new(EpochId::new(1), EpochId::new(1), 0, TransactionId::new(1));
1151        let mut index = VersionIndex::with_initial(hot);
1152
1153        // Should see version at epoch 1+
1154        assert!(index.visible_at(EpochId::new(1)).is_some());
1155        assert!(index.visible_at(EpochId::new(0)).is_none());
1156
1157        // Add another version
1158        let hot2 = HotVersionRef::new(EpochId::new(5), EpochId::new(5), 100, TransactionId::new(2));
1159        index.add_hot(hot2);
1160
1161        // Should see v1 at epoch < 5, v2 at epoch >= 5
1162        let v1 = index.visible_at(EpochId::new(4)).unwrap();
1163        assert!(matches!(v1, VersionRef::Hot(h) if h.arena_offset == 0));
1164
1165        let v2 = index.visible_at(EpochId::new(5)).unwrap();
1166        assert!(matches!(v2, VersionRef::Hot(h) if h.arena_offset == 100));
1167    }
1168
1169    #[test]
1170    fn test_version_index_deletion() {
1171        let hot = HotVersionRef::new(EpochId::new(1), EpochId::new(1), 0, TransactionId::new(1));
1172        let mut index = VersionIndex::with_initial(hot);
1173
1174        // Mark as deleted at epoch 5
1175        assert!(index.mark_deleted(EpochId::new(5), TransactionId::new(99)));
1176
1177        // Should see version before deletion, nothing after
1178        assert!(index.visible_at(EpochId::new(4)).is_some());
1179        assert!(index.visible_at(EpochId::new(5)).is_none());
1180        assert!(index.visible_at(EpochId::new(10)).is_none());
1181    }
1182
1183    #[test]
1184    fn test_version_index_transaction_visibility() {
1185        let tx = TransactionId::new(10);
1186        let hot = HotVersionRef::new(EpochId::new(5), EpochId::new(5), 0, tx);
1187        let index = VersionIndex::with_initial(hot);
1188
1189        // Creator can see it even at earlier epoch
1190        assert!(index.visible_to(EpochId::new(3), tx).is_some());
1191
1192        // Other transactions cannot see it before creation
1193        assert!(
1194            index
1195                .visible_to(EpochId::new(3), TransactionId::new(20))
1196                .is_none()
1197        );
1198        assert!(
1199            index
1200                .visible_to(EpochId::new(5), TransactionId::new(20))
1201                .is_some()
1202        );
1203    }
1204
1205    #[test]
1206    fn test_version_index_rollback() {
1207        let tx1 = TransactionId::new(10);
1208        let tx2 = TransactionId::new(20);
1209
1210        let mut index = VersionIndex::new();
1211        index.add_hot(HotVersionRef::new(EpochId::new(1), EpochId::new(1), 0, tx1));
1212        index.add_hot(HotVersionRef::new(
1213            EpochId::new(2),
1214            EpochId::new(2),
1215            100,
1216            tx2,
1217        ));
1218        index.add_hot(HotVersionRef::new(
1219            EpochId::new(3),
1220            EpochId::new(3),
1221            200,
1222            tx2,
1223        ));
1224
1225        assert_eq!(index.version_count(), 3);
1226        assert!(index.modified_by(tx1));
1227        assert!(index.modified_by(tx2));
1228
1229        // Rollback tx2's changes
1230        index.remove_versions_by(tx2);
1231
1232        assert_eq!(index.version_count(), 1);
1233        assert!(index.modified_by(tx1));
1234        assert!(!index.modified_by(tx2));
1235
1236        // Should only see tx1's version
1237        let v = index.visible_at(EpochId::new(10)).unwrap();
1238        assert!(matches!(v, VersionRef::Hot(h) if h.created_by == tx1));
1239    }
1240
1241    #[test]
1242    // reason: test epoch values are small, fit u32
1243    #[allow(clippy::cast_possible_truncation)]
1244    fn test_version_index_gc() {
1245        let mut index = VersionIndex::new();
1246
1247        // Add versions at epochs 1, 3, 5
1248        for epoch in [1, 3, 5] {
1249            index.add_hot(HotVersionRef::new(
1250                EpochId::new(epoch),
1251                EpochId::new(epoch),
1252                epoch as u32 * 100,
1253                TransactionId::new(epoch),
1254            ));
1255        }
1256
1257        assert_eq!(index.version_count(), 3);
1258
1259        // GC with min_epoch = 4
1260        // Should keep: epoch 5 (>= 4) and epoch 3 (first old visible)
1261        index.gc(EpochId::new(4));
1262
1263        assert_eq!(index.version_count(), 2);
1264
1265        // Verify we kept epochs 5 and 3
1266        assert!(index.visible_at(EpochId::new(5)).is_some());
1267        assert!(index.visible_at(EpochId::new(3)).is_some());
1268    }
1269
1270    #[test]
1271    fn test_version_index_conflict_detection() {
1272        let tx1 = TransactionId::new(10);
1273        let tx2 = TransactionId::new(20);
1274
1275        let mut index = VersionIndex::new();
1276        index.add_hot(HotVersionRef::new(EpochId::new(1), EpochId::new(1), 0, tx1));
1277        index.add_hot(HotVersionRef::new(
1278            EpochId::new(5),
1279            EpochId::new(5),
1280            100,
1281            tx2,
1282        ));
1283
1284        // tx1 started at epoch 0, tx2 modified at epoch 5 -> conflict for tx1
1285        assert!(index.has_conflict(EpochId::new(0), tx1));
1286
1287        // tx2 started at epoch 0, tx1 modified at epoch 1 -> also conflict for tx2
1288        assert!(index.has_conflict(EpochId::new(0), tx2));
1289
1290        // tx1 started after tx2's modification -> no conflict
1291        assert!(!index.has_conflict(EpochId::new(5), tx1));
1292
1293        // tx2 started after tx1's modification -> no conflict
1294        assert!(!index.has_conflict(EpochId::new(1), tx2));
1295
1296        // If only tx1's version exists, tx1 doesn't conflict with itself
1297        let mut index2 = VersionIndex::new();
1298        index2.add_hot(HotVersionRef::new(EpochId::new(5), EpochId::new(5), 0, tx1));
1299        assert!(!index2.has_conflict(EpochId::new(0), tx1));
1300    }
1301
1302    #[test]
1303    // reason: test loop index 0..2 fits u32
1304    #[allow(clippy::cast_possible_truncation)]
1305    fn test_version_index_smallvec_no_heap() {
1306        let mut index = VersionIndex::new();
1307
1308        // Add 2 hot versions (within inline capacity)
1309        for i in 0..2 {
1310            index.add_hot(HotVersionRef::new(
1311                EpochId::new(i),
1312                EpochId::new(i),
1313                i as u32,
1314                TransactionId::new(i),
1315            ));
1316        }
1317
1318        // SmallVec should not have spilled to heap
1319        assert!(!index.hot_spilled());
1320        assert!(!index.cold_spilled());
1321    }
1322
1323    #[test]
1324    fn test_version_index_freeze_epoch() {
1325        let mut index = VersionIndex::new();
1326        index.add_hot(HotVersionRef::new(
1327            EpochId::new(1),
1328            EpochId::new(1),
1329            0,
1330            TransactionId::new(1),
1331        ));
1332        index.add_hot(HotVersionRef::new(
1333            EpochId::new(2),
1334            EpochId::new(2),
1335            100,
1336            TransactionId::new(2),
1337        ));
1338
1339        assert_eq!(index.hot_count(), 2);
1340        assert_eq!(index.cold_count(), 0);
1341
1342        // Freeze epoch 1
1343        let cold_ref = ColdVersionRef {
1344            epoch: EpochId::new(1),
1345            block_offset: 0,
1346            length: 32,
1347            created_by: TransactionId::new(1),
1348            deleted_epoch: OptionalEpochId::NONE,
1349            deleted_by: None,
1350        };
1351        index.freeze_epoch(EpochId::new(1), std::iter::once(cold_ref));
1352
1353        // Hot should have 1, cold should have 1
1354        assert_eq!(index.hot_count(), 1);
1355        assert_eq!(index.cold_count(), 1);
1356
1357        // Visibility should still work
1358        assert!(index.visible_at(EpochId::new(1)).is_some());
1359        assert!(index.visible_at(EpochId::new(2)).is_some());
1360
1361        // Check that cold version is actually cold
1362        let v1 = index.visible_at(EpochId::new(1)).unwrap();
1363        assert!(v1.is_cold());
1364
1365        let v2 = index.visible_at(EpochId::new(2)).unwrap();
1366        assert!(v2.is_hot());
1367    }
1368
1369    #[test]
1370    fn test_version_ref_accessors() {
1371        let hot = HotVersionRef::new(
1372            EpochId::new(5),
1373            EpochId::new(5),
1374            100,
1375            TransactionId::new(10),
1376        );
1377        let vr = VersionRef::Hot(hot);
1378
1379        assert_eq!(vr.epoch(), EpochId::new(5));
1380        assert_eq!(vr.created_by(), TransactionId::new(10));
1381        assert!(vr.is_hot());
1382        assert!(!vr.is_cold());
1383    }
1384
1385    #[test]
1386    fn test_version_index_latest_epoch() {
1387        let mut index = VersionIndex::new();
1388        assert_eq!(index.latest_epoch(), EpochId::INITIAL);
1389
1390        index.add_hot(HotVersionRef::new(
1391            EpochId::new(5),
1392            EpochId::new(5),
1393            0,
1394            TransactionId::new(1),
1395        ));
1396        assert_eq!(index.latest_epoch(), EpochId::new(5));
1397
1398        index.add_hot(HotVersionRef::new(
1399            EpochId::new(10),
1400            EpochId::new(10),
1401            100,
1402            TransactionId::new(2),
1403        ));
1404        assert_eq!(index.latest_epoch(), EpochId::new(10));
1405
1406        // After rollback, should recalculate
1407        index.remove_versions_by(TransactionId::new(2));
1408        assert_eq!(index.latest_epoch(), EpochId::new(5));
1409    }
1410
1411    #[test]
1412    fn test_version_index_default() {
1413        let index = VersionIndex::default();
1414        assert!(index.is_empty());
1415        assert_eq!(index.version_count(), 0);
1416    }
1417
1418    #[test]
1419    fn test_version_index_latest() {
1420        let mut index = VersionIndex::new();
1421        assert!(index.latest().is_none());
1422
1423        index.add_hot(HotVersionRef::new(
1424            EpochId::new(1),
1425            EpochId::new(1),
1426            0,
1427            TransactionId::new(1),
1428        ));
1429        let latest = index.latest().unwrap();
1430        assert!(matches!(latest, VersionRef::Hot(h) if h.epoch == EpochId::new(1)));
1431
1432        index.add_hot(HotVersionRef::new(
1433            EpochId::new(5),
1434            EpochId::new(5),
1435            100,
1436            TransactionId::new(2),
1437        ));
1438        let latest = index.latest().unwrap();
1439        assert!(matches!(latest, VersionRef::Hot(h) if h.epoch == EpochId::new(5)));
1440    }
1441}