Skip to main content

grafeo_common/
mvcc.rs

1//! MVCC (Multi-Version Concurrency Control) primitives.
2//!
3//! This is how Grafeo handles concurrent reads and writes without blocking.
4//! Each entity has a [`VersionChain`] that tracks all versions. Readers see
5//! consistent snapshots, writers create new versions, and old versions get
6//! garbage collected when no one needs them anymore.
7
8use std::collections::VecDeque;
9
10#[cfg(feature = "tiered-storage")]
11use smallvec::SmallVec;
12
13use crate::types::{EpochId, TxId};
14
15/// Tracks when a version was created and deleted for visibility checks.
16#[derive(Debug, Clone, Copy)]
17pub struct VersionInfo {
18    /// The epoch this version was created in.
19    pub created_epoch: EpochId,
20    /// The epoch this version was deleted in (if any).
21    pub deleted_epoch: Option<EpochId>,
22    /// The transaction that created this version.
23    pub created_by: TxId,
24}
25
26impl VersionInfo {
27    /// Creates a new version info.
28    #[must_use]
29    pub fn new(created_epoch: EpochId, created_by: TxId) -> Self {
30        Self {
31            created_epoch,
32            deleted_epoch: None,
33            created_by,
34        }
35    }
36
37    /// Marks this version as deleted.
38    pub fn mark_deleted(&mut self, epoch: EpochId) {
39        self.deleted_epoch = Some(epoch);
40    }
41
42    /// Checks if this version is visible at the given epoch.
43    #[inline]
44    #[must_use]
45    pub fn is_visible_at(&self, epoch: EpochId) -> bool {
46        // Visible if created before or at the viewing epoch
47        // and not deleted before the viewing epoch
48        if !self.created_epoch.is_visible_at(epoch) {
49            return false;
50        }
51
52        if let Some(deleted) = self.deleted_epoch {
53            // Not visible if deleted at or before the viewing epoch
54            deleted.as_u64() > epoch.as_u64()
55        } else {
56            true
57        }
58    }
59
60    /// Checks if this version is visible to a specific transaction.
61    ///
62    /// A version is visible to a transaction if:
63    /// 1. It was created by the same transaction, OR
64    /// 2. It was created in an epoch before the transaction's start epoch
65    ///    and not deleted before that epoch
66    #[inline]
67    #[must_use]
68    pub fn is_visible_to(&self, viewing_epoch: EpochId, viewing_tx: TxId) -> bool {
69        // Own modifications are always visible
70        if self.created_by == viewing_tx {
71            return self.deleted_epoch.is_none();
72        }
73
74        // Otherwise, use epoch-based visibility
75        self.is_visible_at(viewing_epoch)
76    }
77}
78
79/// A single version of data.
80#[derive(Debug, Clone)]
81pub struct Version<T> {
82    /// Visibility metadata.
83    pub info: VersionInfo,
84    /// The actual data.
85    pub data: T,
86}
87
88impl<T> Version<T> {
89    /// Creates a new version.
90    #[must_use]
91    pub fn new(data: T, created_epoch: EpochId, created_by: TxId) -> Self {
92        Self {
93            info: VersionInfo::new(created_epoch, created_by),
94            data,
95        }
96    }
97}
98
99/// All versions of a single entity, newest first.
100///
101/// Each node/edge has one of these tracking its version history. Use
102/// [`visible_at()`](Self::visible_at) to get the version at a specific epoch,
103/// or [`visible_to()`](Self::visible_to) for transaction-aware visibility.
104#[derive(Debug, Clone)]
105pub struct VersionChain<T> {
106    /// Versions ordered newest-first.
107    versions: VecDeque<Version<T>>,
108}
109
110impl<T> VersionChain<T> {
111    /// Creates a new empty version chain.
112    #[must_use]
113    pub fn new() -> Self {
114        Self {
115            versions: VecDeque::new(),
116        }
117    }
118
119    /// Creates a version chain with an initial version.
120    #[must_use]
121    pub fn with_initial(data: T, created_epoch: EpochId, created_by: TxId) -> Self {
122        let mut chain = Self::new();
123        chain.add_version(data, created_epoch, created_by);
124        chain
125    }
126
127    /// Adds a new version to the chain.
128    ///
129    /// The new version becomes the head of the chain.
130    pub fn add_version(&mut self, data: T, created_epoch: EpochId, created_by: TxId) {
131        let version = Version::new(data, created_epoch, created_by);
132        self.versions.push_front(version);
133    }
134
135    /// Finds the version visible at the given epoch.
136    ///
137    /// Returns a reference to the visible version's data, or `None` if no version
138    /// is visible at that epoch.
139    #[must_use]
140    pub fn visible_at(&self, epoch: EpochId) -> Option<&T> {
141        self.versions
142            .iter()
143            .find(|v| v.info.is_visible_at(epoch))
144            .map(|v| &v.data)
145    }
146
147    /// Finds the version visible to a specific transaction.
148    ///
149    /// This considers both the transaction's epoch and its own uncommitted changes.
150    #[must_use]
151    pub fn visible_to(&self, epoch: EpochId, tx: TxId) -> Option<&T> {
152        self.versions
153            .iter()
154            .find(|v| v.info.is_visible_to(epoch, tx))
155            .map(|v| &v.data)
156    }
157
158    /// Marks the current visible version as deleted.
159    ///
160    /// Returns `true` if a version was marked, `false` if no visible version exists.
161    pub fn mark_deleted(&mut self, delete_epoch: EpochId) -> bool {
162        for version in &mut self.versions {
163            if version.info.deleted_epoch.is_none() {
164                version.info.mark_deleted(delete_epoch);
165                return true;
166            }
167        }
168        false
169    }
170
171    /// Checks if any version was modified by the given transaction.
172    #[must_use]
173    pub fn modified_by(&self, tx: TxId) -> bool {
174        self.versions.iter().any(|v| v.info.created_by == tx)
175    }
176
177    /// Removes all versions created by the given transaction.
178    ///
179    /// Used for rollback to discard uncommitted changes.
180    pub fn remove_versions_by(&mut self, tx: TxId) {
181        self.versions.retain(|v| v.info.created_by != tx);
182    }
183
184    /// Checks if there's a concurrent modification conflict.
185    ///
186    /// A conflict exists if another transaction modified this entity
187    /// after our start epoch.
188    #[must_use]
189    pub fn has_conflict(&self, start_epoch: EpochId, our_tx: TxId) -> bool {
190        self.versions.iter().any(|v| {
191            v.info.created_by != our_tx && v.info.created_epoch.as_u64() > start_epoch.as_u64()
192        })
193    }
194
195    /// Returns the number of versions in the chain.
196    #[must_use]
197    pub fn version_count(&self) -> usize {
198        self.versions.len()
199    }
200
201    /// Returns true if the chain has no versions.
202    #[must_use]
203    pub fn is_empty(&self) -> bool {
204        self.versions.is_empty()
205    }
206
207    /// Garbage collects old versions that are no longer visible to any transaction.
208    ///
209    /// Keeps versions that might still be visible to transactions at or after `min_epoch`.
210    pub fn gc(&mut self, min_epoch: EpochId) {
211        if self.versions.is_empty() {
212            return;
213        }
214
215        let mut keep_count = 0;
216        let mut found_old_visible = false;
217
218        for (i, version) in self.versions.iter().enumerate() {
219            if version.info.created_epoch.as_u64() >= min_epoch.as_u64() {
220                keep_count = i + 1;
221            } else if !found_old_visible {
222                // Keep the first (most recent) old version
223                found_old_visible = true;
224                keep_count = i + 1;
225            }
226        }
227
228        self.versions.truncate(keep_count);
229    }
230
231    /// Returns a reference to the latest version's data regardless of visibility.
232    #[must_use]
233    pub fn latest(&self) -> Option<&T> {
234        self.versions.front().map(|v| &v.data)
235    }
236
237    /// Returns a mutable reference to the latest version's data.
238    #[must_use]
239    pub fn latest_mut(&mut self) -> Option<&mut T> {
240        self.versions.front_mut().map(|v| &mut v.data)
241    }
242}
243
244impl<T> Default for VersionChain<T> {
245    fn default() -> Self {
246        Self::new()
247    }
248}
249
250impl<T: Clone> VersionChain<T> {
251    /// Gets a mutable reference to the visible version's data for modification.
252    ///
253    /// If the version is not owned by this transaction, creates a new version
254    /// with a copy of the data.
255    pub fn get_mut(&mut self, epoch: EpochId, tx: TxId, modify_epoch: EpochId) -> Option<&mut T> {
256        // Find the visible version
257        let visible_idx = self
258            .versions
259            .iter()
260            .position(|v| v.info.is_visible_to(epoch, tx))?;
261
262        let visible = &self.versions[visible_idx];
263
264        if visible.info.created_by == tx {
265            // Already our version, modify in place
266            Some(&mut self.versions[visible_idx].data)
267        } else {
268            // Create a new version with copied data
269            let new_data = visible.data.clone();
270            self.add_version(new_data, modify_epoch, tx);
271            Some(&mut self.versions[0].data)
272        }
273    }
274}
275
276// ============================================================================
277// Tiered Storage Types (Phase 13)
278// ============================================================================
279//
280// These types support the tiered hot/cold storage architecture where version
281// metadata is stored separately from version data. Data lives in arenas (hot)
282// or compressed epoch blocks (cold), while VersionIndex holds lightweight refs.
283
284/// Compact representation of an optional epoch ID.
285///
286/// Uses `u32::MAX` as sentinel for `None`, allowing epochs up to ~4 billion.
287/// This saves 4 bytes compared to `Option<EpochId>` due to niche optimization.
288#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
289#[repr(transparent)]
290#[cfg(feature = "tiered-storage")]
291pub struct OptionalEpochId(u32);
292
293#[cfg(feature = "tiered-storage")]
294impl OptionalEpochId {
295    /// Represents no epoch (deleted_epoch = None).
296    pub const NONE: Self = Self(u32::MAX);
297
298    /// Creates an `OptionalEpochId` from an epoch.
299    ///
300    /// # Panics
301    /// Debug-panics if epoch exceeds u32::MAX - 1.
302    #[must_use]
303    pub fn some(epoch: EpochId) -> Self {
304        debug_assert!(
305            epoch.as_u64() < u64::from(u32::MAX),
306            "epoch {} exceeds OptionalEpochId capacity",
307            epoch.as_u64()
308        );
309        Self(epoch.as_u64() as u32)
310    }
311
312    /// Returns the contained epoch, or `None` if this is `NONE`.
313    #[inline]
314    #[must_use]
315    pub fn get(self) -> Option<EpochId> {
316        if self.0 == u32::MAX {
317            None
318        } else {
319            Some(EpochId::new(u64::from(self.0)))
320        }
321    }
322
323    /// Returns `true` if this contains an epoch.
324    #[must_use]
325    pub fn is_some(self) -> bool {
326        self.0 != u32::MAX
327    }
328
329    /// Returns `true` if this is `NONE`.
330    #[inline]
331    #[must_use]
332    pub fn is_none(self) -> bool {
333        self.0 == u32::MAX
334    }
335}
336
337/// Reference to a hot (arena-allocated) version.
338///
339/// Hot versions are stored in the epoch's arena and can be accessed directly.
340/// This struct only holds metadata; the actual data lives in the arena.
341///
342/// # Memory Layout
343/// - `epoch`: 8 bytes
344/// - `arena_offset`: 4 bytes
345/// - `created_by`: 8 bytes
346/// - `deleted_epoch`: 4 bytes
347/// - Total: 24 bytes
348#[derive(Debug, Clone, Copy, PartialEq, Eq)]
349#[cfg(feature = "tiered-storage")]
350pub struct HotVersionRef {
351    /// Epoch when this version was created.
352    pub epoch: EpochId,
353    /// Offset within the epoch's arena where the data is stored.
354    pub arena_offset: u32,
355    /// Transaction that created this version.
356    pub created_by: TxId,
357    /// Epoch when this version was deleted (NONE if still alive).
358    pub deleted_epoch: OptionalEpochId,
359}
360
361#[cfg(feature = "tiered-storage")]
362impl HotVersionRef {
363    /// Creates a new hot version reference.
364    #[must_use]
365    pub fn new(epoch: EpochId, arena_offset: u32, created_by: TxId) -> Self {
366        Self {
367            epoch,
368            arena_offset,
369            created_by,
370            deleted_epoch: OptionalEpochId::NONE,
371        }
372    }
373
374    /// Checks if this version is visible at the given epoch.
375    #[inline]
376    #[must_use]
377    pub fn is_visible_at(&self, viewing_epoch: EpochId) -> bool {
378        // Must be created at or before the viewing epoch
379        if !self.epoch.is_visible_at(viewing_epoch) {
380            return false;
381        }
382        // Must not be deleted at or before the viewing epoch
383        match self.deleted_epoch.get() {
384            Some(deleted) => deleted.as_u64() > viewing_epoch.as_u64(),
385            None => true,
386        }
387    }
388
389    /// Checks if this version is visible to a specific transaction.
390    #[inline]
391    #[must_use]
392    pub fn is_visible_to(&self, viewing_epoch: EpochId, viewing_tx: TxId) -> bool {
393        // Own modifications are always visible (if not deleted by self)
394        if self.created_by == viewing_tx {
395            return self.deleted_epoch.is_none();
396        }
397        // Otherwise use epoch-based visibility
398        self.is_visible_at(viewing_epoch)
399    }
400}
401
402/// Reference to a cold (compressed) version.
403///
404/// Cold versions are stored in compressed epoch blocks. This is a placeholder
405/// for Phase 14 - the actual compression logic will be implemented there.
406///
407/// # Memory Layout
408/// - `epoch`: 8 bytes
409/// - `block_offset`: 4 bytes
410/// - `length`: 2 bytes
411/// - `created_by`: 8 bytes
412/// - `deleted_epoch`: 4 bytes
413/// - Total: 26 bytes (+ 6 padding = 32 bytes aligned)
414#[derive(Debug, Clone, Copy, PartialEq, Eq)]
415#[cfg(feature = "tiered-storage")]
416pub struct ColdVersionRef {
417    /// Epoch when this version was created.
418    pub epoch: EpochId,
419    /// Offset within the compressed epoch block.
420    pub block_offset: u32,
421    /// Compressed length in bytes.
422    pub length: u16,
423    /// Transaction that created this version.
424    pub created_by: TxId,
425    /// Epoch when this version was deleted.
426    pub deleted_epoch: OptionalEpochId,
427}
428
429#[cfg(feature = "tiered-storage")]
430impl ColdVersionRef {
431    /// Checks if this version is visible at the given epoch.
432    #[must_use]
433    pub fn is_visible_at(&self, viewing_epoch: EpochId) -> bool {
434        if !self.epoch.is_visible_at(viewing_epoch) {
435            return false;
436        }
437        match self.deleted_epoch.get() {
438            Some(deleted) => deleted.as_u64() > viewing_epoch.as_u64(),
439            None => true,
440        }
441    }
442
443    /// Checks if this version is visible to a specific transaction.
444    #[must_use]
445    pub fn is_visible_to(&self, viewing_epoch: EpochId, viewing_tx: TxId) -> bool {
446        if self.created_by == viewing_tx {
447            return self.deleted_epoch.is_none();
448        }
449        self.is_visible_at(viewing_epoch)
450    }
451}
452
453/// Unified reference to either a hot or cold version.
454#[derive(Debug, Clone, Copy)]
455#[cfg(feature = "tiered-storage")]
456pub enum VersionRef {
457    /// Version data is in arena (hot tier).
458    Hot(HotVersionRef),
459    /// Version data is in compressed storage (cold tier).
460    Cold(ColdVersionRef),
461}
462
463#[cfg(feature = "tiered-storage")]
464impl VersionRef {
465    /// Returns the epoch when this version was created.
466    #[must_use]
467    pub fn epoch(&self) -> EpochId {
468        match self {
469            Self::Hot(h) => h.epoch,
470            Self::Cold(c) => c.epoch,
471        }
472    }
473
474    /// Returns the transaction that created this version.
475    #[must_use]
476    pub fn created_by(&self) -> TxId {
477        match self {
478            Self::Hot(h) => h.created_by,
479            Self::Cold(c) => c.created_by,
480        }
481    }
482
483    /// Returns `true` if this is a hot version.
484    #[must_use]
485    pub fn is_hot(&self) -> bool {
486        matches!(self, Self::Hot(_))
487    }
488
489    /// Returns `true` if this is a cold version.
490    #[must_use]
491    pub fn is_cold(&self) -> bool {
492        matches!(self, Self::Cold(_))
493    }
494}
495
496/// Tiered version index - replaces `VersionChain<T>` for hot/cold storage.
497///
498/// Instead of storing data inline, `VersionIndex` holds lightweight references
499/// to data stored in arenas (hot) or compressed blocks (cold). This enables:
500///
501/// - **No heap allocation** for typical 1-2 version case (SmallVec inline)
502/// - **Separation of metadata and data** for compression
503/// - **Fast visibility checks** via cached `latest_epoch`
504/// - **O(1) epoch drops** instead of per-version deallocation
505///
506/// # Memory Layout
507/// - `hot`: SmallVec<[HotVersionRef; 2]> ≈ 56 bytes inline
508/// - `cold`: SmallVec<[ColdVersionRef; 4]> ≈ 136 bytes inline
509/// - `latest_epoch`: 8 bytes
510/// - Total: ~200 bytes, no heap for typical case
511#[derive(Debug, Clone)]
512#[cfg(feature = "tiered-storage")]
513pub struct VersionIndex {
514    /// Hot versions in arena storage (most recent first).
515    hot: SmallVec<[HotVersionRef; 2]>,
516    /// Cold versions in compressed storage (most recent first).
517    cold: SmallVec<[ColdVersionRef; 4]>,
518    /// Cached epoch of the latest version for fast staleness checks.
519    latest_epoch: EpochId,
520}
521
522#[cfg(feature = "tiered-storage")]
523impl VersionIndex {
524    /// Creates a new empty version index.
525    #[must_use]
526    pub fn new() -> Self {
527        Self {
528            hot: SmallVec::new(),
529            cold: SmallVec::new(),
530            latest_epoch: EpochId::INITIAL,
531        }
532    }
533
534    /// Creates a version index with an initial hot version.
535    #[must_use]
536    pub fn with_initial(hot_ref: HotVersionRef) -> Self {
537        let mut index = Self::new();
538        index.add_hot(hot_ref);
539        index
540    }
541
542    /// Adds a new hot version (becomes the latest).
543    pub fn add_hot(&mut self, hot_ref: HotVersionRef) {
544        // Insert at front (most recent first)
545        self.hot.insert(0, hot_ref);
546        self.latest_epoch = hot_ref.epoch;
547    }
548
549    /// Returns the latest epoch for quick staleness checks.
550    #[must_use]
551    pub fn latest_epoch(&self) -> EpochId {
552        self.latest_epoch
553    }
554
555    /// Returns `true` if this entity has no versions.
556    #[must_use]
557    pub fn is_empty(&self) -> bool {
558        self.hot.is_empty() && self.cold.is_empty()
559    }
560
561    /// Returns the total version count (hot + cold).
562    #[must_use]
563    pub fn version_count(&self) -> usize {
564        self.hot.len() + self.cold.len()
565    }
566
567    /// Returns the number of hot versions.
568    #[must_use]
569    pub fn hot_count(&self) -> usize {
570        self.hot.len()
571    }
572
573    /// Returns the number of cold versions.
574    #[must_use]
575    pub fn cold_count(&self) -> usize {
576        self.cold.len()
577    }
578
579    /// Finds the version visible at the given epoch.
580    #[must_use]
581    pub fn visible_at(&self, epoch: EpochId) -> Option<VersionRef> {
582        // Check hot versions first (most recent first, likely case)
583        for v in &self.hot {
584            if v.is_visible_at(epoch) {
585                return Some(VersionRef::Hot(*v));
586            }
587        }
588        // Fall back to cold versions
589        for v in &self.cold {
590            if v.is_visible_at(epoch) {
591                return Some(VersionRef::Cold(*v));
592            }
593        }
594        None
595    }
596
597    /// Finds the version visible to a specific transaction.
598    #[must_use]
599    pub fn visible_to(&self, epoch: EpochId, tx: TxId) -> Option<VersionRef> {
600        // Check hot versions first
601        for v in &self.hot {
602            if v.is_visible_to(epoch, tx) {
603                return Some(VersionRef::Hot(*v));
604            }
605        }
606        // Fall back to cold versions
607        for v in &self.cold {
608            if v.is_visible_to(epoch, tx) {
609                return Some(VersionRef::Cold(*v));
610            }
611        }
612        None
613    }
614
615    /// Marks the currently visible version as deleted at the given epoch.
616    ///
617    /// Returns `true` if a version was marked, `false` if no visible version exists.
618    pub fn mark_deleted(&mut self, delete_epoch: EpochId) -> bool {
619        // Find the first non-deleted hot version and mark it
620        for v in &mut self.hot {
621            if v.deleted_epoch.is_none() {
622                v.deleted_epoch = OptionalEpochId::some(delete_epoch);
623                return true;
624            }
625        }
626        // Check cold versions (rare case)
627        for v in &mut self.cold {
628            if v.deleted_epoch.is_none() {
629                v.deleted_epoch = OptionalEpochId::some(delete_epoch);
630                return true;
631            }
632        }
633        false
634    }
635
636    /// Checks if any version was created by the given transaction.
637    #[must_use]
638    pub fn modified_by(&self, tx: TxId) -> bool {
639        self.hot.iter().any(|v| v.created_by == tx) || self.cold.iter().any(|v| v.created_by == tx)
640    }
641
642    /// Removes all versions created by the given transaction (for rollback).
643    pub fn remove_versions_by(&mut self, tx: TxId) {
644        self.hot.retain(|v| v.created_by != tx);
645        self.cold.retain(|v| v.created_by != tx);
646        self.recalculate_latest_epoch();
647    }
648
649    /// Checks for write conflict with concurrent transaction.
650    ///
651    /// A conflict exists if another transaction modified this entity
652    /// after our start epoch.
653    #[must_use]
654    pub fn has_conflict(&self, start_epoch: EpochId, our_tx: TxId) -> bool {
655        self.hot
656            .iter()
657            .any(|v| v.created_by != our_tx && v.epoch.as_u64() > start_epoch.as_u64())
658            || self
659                .cold
660                .iter()
661                .any(|v| v.created_by != our_tx && v.epoch.as_u64() > start_epoch.as_u64())
662    }
663
664    /// Garbage collects old versions not needed by any active transaction.
665    ///
666    /// Keeps versions that might still be visible to transactions at or after `min_epoch`.
667    pub fn gc(&mut self, min_epoch: EpochId) {
668        if self.is_empty() {
669            return;
670        }
671
672        // Keep versions that:
673        // 1. Were created at or after min_epoch
674        // 2. The first (most recent) version created before min_epoch
675        let mut found_old_visible = false;
676
677        self.hot.retain(|v| {
678            if v.epoch.as_u64() >= min_epoch.as_u64() {
679                true
680            } else if !found_old_visible {
681                found_old_visible = true;
682                true
683            } else {
684                false
685            }
686        });
687
688        // Same for cold, but only if we haven't found an old visible in hot
689        if !found_old_visible {
690            self.cold.retain(|v| {
691                if v.epoch.as_u64() >= min_epoch.as_u64() {
692                    true
693                } else if !found_old_visible {
694                    found_old_visible = true;
695                    true
696                } else {
697                    false
698                }
699            });
700        } else {
701            // All cold versions are older, only keep those >= min_epoch
702            self.cold.retain(|v| v.epoch.as_u64() >= min_epoch.as_u64());
703        }
704    }
705
706    /// Returns a reference to the latest version regardless of visibility.
707    #[must_use]
708    pub fn latest(&self) -> Option<VersionRef> {
709        self.hot
710            .first()
711            .map(|v| VersionRef::Hot(*v))
712            .or_else(|| self.cold.first().map(|v| VersionRef::Cold(*v)))
713    }
714
715    /// Freezes hot versions for a given epoch into cold storage.
716    ///
717    /// This is called when an epoch is no longer needed by any active transaction.
718    /// The actual compression happens in Phase 14; for now this just moves refs.
719    pub fn freeze_epoch(
720        &mut self,
721        epoch: EpochId,
722        cold_refs: impl Iterator<Item = ColdVersionRef>,
723    ) {
724        // Remove hot refs for this epoch
725        self.hot.retain(|v| v.epoch != epoch);
726
727        // Add cold refs
728        self.cold.extend(cold_refs);
729
730        // Keep cold sorted by epoch (descending = most recent first)
731        self.cold
732            .sort_by(|a, b| b.epoch.as_u64().cmp(&a.epoch.as_u64()));
733
734        self.recalculate_latest_epoch();
735    }
736
737    /// Returns hot version refs for a specific epoch (for freeze operation).
738    pub fn hot_refs_for_epoch(&self, epoch: EpochId) -> impl Iterator<Item = &HotVersionRef> {
739        self.hot.iter().filter(move |v| v.epoch == epoch)
740    }
741
742    /// Returns `true` if the hot SmallVec has spilled to the heap.
743    #[must_use]
744    pub fn hot_spilled(&self) -> bool {
745        self.hot.spilled()
746    }
747
748    /// Returns `true` if the cold SmallVec has spilled to the heap.
749    #[must_use]
750    pub fn cold_spilled(&self) -> bool {
751        self.cold.spilled()
752    }
753
754    fn recalculate_latest_epoch(&mut self) {
755        self.latest_epoch = self
756            .hot
757            .first()
758            .map(|v| v.epoch)
759            .or_else(|| self.cold.first().map(|v| v.epoch))
760            .unwrap_or(EpochId::INITIAL);
761    }
762}
763
764#[cfg(feature = "tiered-storage")]
765impl Default for VersionIndex {
766    fn default() -> Self {
767        Self::new()
768    }
769}
770
771#[cfg(test)]
772mod tests {
773    use super::*;
774
775    #[test]
776    fn test_version_visibility() {
777        let v = VersionInfo::new(EpochId::new(5), TxId::new(1));
778
779        // Not visible before creation
780        assert!(!v.is_visible_at(EpochId::new(4)));
781
782        // Visible at creation epoch and after
783        assert!(v.is_visible_at(EpochId::new(5)));
784        assert!(v.is_visible_at(EpochId::new(10)));
785    }
786
787    #[test]
788    fn test_deleted_version_visibility() {
789        let mut v = VersionInfo::new(EpochId::new(5), TxId::new(1));
790        v.mark_deleted(EpochId::new(10));
791
792        // Visible between creation and deletion
793        assert!(v.is_visible_at(EpochId::new(5)));
794        assert!(v.is_visible_at(EpochId::new(9)));
795
796        // Not visible at or after deletion
797        assert!(!v.is_visible_at(EpochId::new(10)));
798        assert!(!v.is_visible_at(EpochId::new(15)));
799    }
800
801    #[test]
802    fn test_version_visibility_to_transaction() {
803        let v = VersionInfo::new(EpochId::new(5), TxId::new(1));
804
805        // Creator can see it even if viewing at earlier epoch
806        assert!(v.is_visible_to(EpochId::new(3), TxId::new(1)));
807
808        // Other transactions can only see it at or after creation epoch
809        assert!(!v.is_visible_to(EpochId::new(3), TxId::new(2)));
810        assert!(v.is_visible_to(EpochId::new(5), TxId::new(2)));
811    }
812
813    #[test]
814    fn test_version_chain_basic() {
815        let mut chain = VersionChain::with_initial("v1", EpochId::new(1), TxId::new(1));
816
817        // Should see v1 at epoch 1+
818        assert_eq!(chain.visible_at(EpochId::new(1)), Some(&"v1"));
819        assert_eq!(chain.visible_at(EpochId::new(0)), None);
820
821        // Add v2
822        chain.add_version("v2", EpochId::new(5), TxId::new(2));
823
824        // Should see v1 at epoch < 5, v2 at epoch >= 5
825        assert_eq!(chain.visible_at(EpochId::new(1)), Some(&"v1"));
826        assert_eq!(chain.visible_at(EpochId::new(4)), Some(&"v1"));
827        assert_eq!(chain.visible_at(EpochId::new(5)), Some(&"v2"));
828        assert_eq!(chain.visible_at(EpochId::new(10)), Some(&"v2"));
829    }
830
831    #[test]
832    fn test_version_chain_rollback() {
833        let mut chain = VersionChain::with_initial("v1", EpochId::new(1), TxId::new(1));
834        chain.add_version("v2", EpochId::new(5), TxId::new(2));
835        chain.add_version("v3", EpochId::new(6), TxId::new(2));
836
837        assert_eq!(chain.version_count(), 3);
838
839        // Rollback tx 2's changes
840        chain.remove_versions_by(TxId::new(2));
841
842        assert_eq!(chain.version_count(), 1);
843        assert_eq!(chain.visible_at(EpochId::new(10)), Some(&"v1"));
844    }
845
846    #[test]
847    fn test_version_chain_deletion() {
848        let mut chain = VersionChain::with_initial("v1", EpochId::new(1), TxId::new(1));
849
850        // Mark as deleted at epoch 5
851        assert!(chain.mark_deleted(EpochId::new(5)));
852
853        // Should see v1 before deletion, nothing after
854        assert_eq!(chain.visible_at(EpochId::new(4)), Some(&"v1"));
855        assert_eq!(chain.visible_at(EpochId::new(5)), None);
856        assert_eq!(chain.visible_at(EpochId::new(10)), None);
857    }
858}
859
860// ============================================================================
861// Tiered Storage Tests
862// ============================================================================
863
864#[cfg(all(test, feature = "tiered-storage"))]
865mod tiered_storage_tests {
866    use super::*;
867
868    #[test]
869    fn test_optional_epoch_id() {
870        // Test NONE
871        let none = OptionalEpochId::NONE;
872        assert!(none.is_none());
873        assert!(!none.is_some());
874        assert_eq!(none.get(), None);
875
876        // Test Some
877        let some = OptionalEpochId::some(EpochId::new(42));
878        assert!(some.is_some());
879        assert!(!some.is_none());
880        assert_eq!(some.get(), Some(EpochId::new(42)));
881
882        // Test zero epoch
883        let zero = OptionalEpochId::some(EpochId::new(0));
884        assert!(zero.is_some());
885        assert_eq!(zero.get(), Some(EpochId::new(0)));
886    }
887
888    #[test]
889    fn test_hot_version_ref_visibility() {
890        let hot = HotVersionRef::new(EpochId::new(5), 100, TxId::new(1));
891
892        // Not visible before creation
893        assert!(!hot.is_visible_at(EpochId::new(4)));
894
895        // Visible at creation and after
896        assert!(hot.is_visible_at(EpochId::new(5)));
897        assert!(hot.is_visible_at(EpochId::new(10)));
898    }
899
900    #[test]
901    fn test_hot_version_ref_deleted_visibility() {
902        let mut hot = HotVersionRef::new(EpochId::new(5), 100, TxId::new(1));
903        hot.deleted_epoch = OptionalEpochId::some(EpochId::new(10));
904
905        // Visible between creation and deletion
906        assert!(hot.is_visible_at(EpochId::new(5)));
907        assert!(hot.is_visible_at(EpochId::new(9)));
908
909        // Not visible at or after deletion
910        assert!(!hot.is_visible_at(EpochId::new(10)));
911        assert!(!hot.is_visible_at(EpochId::new(15)));
912    }
913
914    #[test]
915    fn test_hot_version_ref_transaction_visibility() {
916        let hot = HotVersionRef::new(EpochId::new(5), 100, TxId::new(1));
917
918        // Creator can see it even at earlier epoch
919        assert!(hot.is_visible_to(EpochId::new(3), TxId::new(1)));
920
921        // Other transactions can only see it at or after creation
922        assert!(!hot.is_visible_to(EpochId::new(3), TxId::new(2)));
923        assert!(hot.is_visible_to(EpochId::new(5), TxId::new(2)));
924    }
925
926    #[test]
927    fn test_version_index_basic() {
928        let hot = HotVersionRef::new(EpochId::new(1), 0, TxId::new(1));
929        let mut index = VersionIndex::with_initial(hot);
930
931        // Should see version at epoch 1+
932        assert!(index.visible_at(EpochId::new(1)).is_some());
933        assert!(index.visible_at(EpochId::new(0)).is_none());
934
935        // Add another version
936        let hot2 = HotVersionRef::new(EpochId::new(5), 100, TxId::new(2));
937        index.add_hot(hot2);
938
939        // Should see v1 at epoch < 5, v2 at epoch >= 5
940        let v1 = index.visible_at(EpochId::new(4)).unwrap();
941        assert!(matches!(v1, VersionRef::Hot(h) if h.arena_offset == 0));
942
943        let v2 = index.visible_at(EpochId::new(5)).unwrap();
944        assert!(matches!(v2, VersionRef::Hot(h) if h.arena_offset == 100));
945    }
946
947    #[test]
948    fn test_version_index_deletion() {
949        let hot = HotVersionRef::new(EpochId::new(1), 0, TxId::new(1));
950        let mut index = VersionIndex::with_initial(hot);
951
952        // Mark as deleted at epoch 5
953        assert!(index.mark_deleted(EpochId::new(5)));
954
955        // Should see version before deletion, nothing after
956        assert!(index.visible_at(EpochId::new(4)).is_some());
957        assert!(index.visible_at(EpochId::new(5)).is_none());
958        assert!(index.visible_at(EpochId::new(10)).is_none());
959    }
960
961    #[test]
962    fn test_version_index_transaction_visibility() {
963        let tx = TxId::new(10);
964        let hot = HotVersionRef::new(EpochId::new(5), 0, tx);
965        let index = VersionIndex::with_initial(hot);
966
967        // Creator can see it even at earlier epoch
968        assert!(index.visible_to(EpochId::new(3), tx).is_some());
969
970        // Other transactions cannot see it before creation
971        assert!(index.visible_to(EpochId::new(3), TxId::new(20)).is_none());
972        assert!(index.visible_to(EpochId::new(5), TxId::new(20)).is_some());
973    }
974
975    #[test]
976    fn test_version_index_rollback() {
977        let tx1 = TxId::new(10);
978        let tx2 = TxId::new(20);
979
980        let mut index = VersionIndex::new();
981        index.add_hot(HotVersionRef::new(EpochId::new(1), 0, tx1));
982        index.add_hot(HotVersionRef::new(EpochId::new(2), 100, tx2));
983        index.add_hot(HotVersionRef::new(EpochId::new(3), 200, tx2));
984
985        assert_eq!(index.version_count(), 3);
986        assert!(index.modified_by(tx1));
987        assert!(index.modified_by(tx2));
988
989        // Rollback tx2's changes
990        index.remove_versions_by(tx2);
991
992        assert_eq!(index.version_count(), 1);
993        assert!(index.modified_by(tx1));
994        assert!(!index.modified_by(tx2));
995
996        // Should only see tx1's version
997        let v = index.visible_at(EpochId::new(10)).unwrap();
998        assert!(matches!(v, VersionRef::Hot(h) if h.created_by == tx1));
999    }
1000
1001    #[test]
1002    fn test_version_index_gc() {
1003        let mut index = VersionIndex::new();
1004
1005        // Add versions at epochs 1, 3, 5
1006        for epoch in [1, 3, 5] {
1007            index.add_hot(HotVersionRef::new(
1008                EpochId::new(epoch),
1009                epoch as u32 * 100,
1010                TxId::new(epoch),
1011            ));
1012        }
1013
1014        assert_eq!(index.version_count(), 3);
1015
1016        // GC with min_epoch = 4
1017        // Should keep: epoch 5 (>= 4) and epoch 3 (first old visible)
1018        index.gc(EpochId::new(4));
1019
1020        assert_eq!(index.version_count(), 2);
1021
1022        // Verify we kept epochs 5 and 3
1023        assert!(index.visible_at(EpochId::new(5)).is_some());
1024        assert!(index.visible_at(EpochId::new(3)).is_some());
1025    }
1026
1027    #[test]
1028    fn test_version_index_conflict_detection() {
1029        let tx1 = TxId::new(10);
1030        let tx2 = TxId::new(20);
1031
1032        let mut index = VersionIndex::new();
1033        index.add_hot(HotVersionRef::new(EpochId::new(1), 0, tx1));
1034        index.add_hot(HotVersionRef::new(EpochId::new(5), 100, tx2));
1035
1036        // tx1 started at epoch 0, tx2 modified at epoch 5 -> conflict for tx1
1037        assert!(index.has_conflict(EpochId::new(0), tx1));
1038
1039        // tx2 started at epoch 0, tx1 modified at epoch 1 -> also conflict for tx2
1040        assert!(index.has_conflict(EpochId::new(0), tx2));
1041
1042        // tx1 started after tx2's modification -> no conflict
1043        assert!(!index.has_conflict(EpochId::new(5), tx1));
1044
1045        // tx2 started after tx1's modification -> no conflict
1046        assert!(!index.has_conflict(EpochId::new(1), tx2));
1047
1048        // If only tx1's version exists, tx1 doesn't conflict with itself
1049        let mut index2 = VersionIndex::new();
1050        index2.add_hot(HotVersionRef::new(EpochId::new(5), 0, tx1));
1051        assert!(!index2.has_conflict(EpochId::new(0), tx1));
1052    }
1053
1054    #[test]
1055    fn test_version_index_smallvec_no_heap() {
1056        let mut index = VersionIndex::new();
1057
1058        // Add 2 hot versions (within inline capacity)
1059        for i in 0..2 {
1060            index.add_hot(HotVersionRef::new(EpochId::new(i), i as u32, TxId::new(i)));
1061        }
1062
1063        // SmallVec should not have spilled to heap
1064        assert!(!index.hot_spilled());
1065        assert!(!index.cold_spilled());
1066    }
1067
1068    #[test]
1069    fn test_version_index_freeze_epoch() {
1070        let mut index = VersionIndex::new();
1071        index.add_hot(HotVersionRef::new(EpochId::new(1), 0, TxId::new(1)));
1072        index.add_hot(HotVersionRef::new(EpochId::new(2), 100, TxId::new(2)));
1073
1074        assert_eq!(index.hot_count(), 2);
1075        assert_eq!(index.cold_count(), 0);
1076
1077        // Freeze epoch 1
1078        let cold_ref = ColdVersionRef {
1079            epoch: EpochId::new(1),
1080            block_offset: 0,
1081            length: 32,
1082            created_by: TxId::new(1),
1083            deleted_epoch: OptionalEpochId::NONE,
1084        };
1085        index.freeze_epoch(EpochId::new(1), std::iter::once(cold_ref));
1086
1087        // Hot should have 1, cold should have 1
1088        assert_eq!(index.hot_count(), 1);
1089        assert_eq!(index.cold_count(), 1);
1090
1091        // Visibility should still work
1092        assert!(index.visible_at(EpochId::new(1)).is_some());
1093        assert!(index.visible_at(EpochId::new(2)).is_some());
1094
1095        // Check that cold version is actually cold
1096        let v1 = index.visible_at(EpochId::new(1)).unwrap();
1097        assert!(v1.is_cold());
1098
1099        let v2 = index.visible_at(EpochId::new(2)).unwrap();
1100        assert!(v2.is_hot());
1101    }
1102
1103    #[test]
1104    fn test_version_ref_accessors() {
1105        let hot = HotVersionRef::new(EpochId::new(5), 100, TxId::new(10));
1106        let vr = VersionRef::Hot(hot);
1107
1108        assert_eq!(vr.epoch(), EpochId::new(5));
1109        assert_eq!(vr.created_by(), TxId::new(10));
1110        assert!(vr.is_hot());
1111        assert!(!vr.is_cold());
1112    }
1113
1114    #[test]
1115    fn test_version_index_latest_epoch() {
1116        let mut index = VersionIndex::new();
1117        assert_eq!(index.latest_epoch(), EpochId::INITIAL);
1118
1119        index.add_hot(HotVersionRef::new(EpochId::new(5), 0, TxId::new(1)));
1120        assert_eq!(index.latest_epoch(), EpochId::new(5));
1121
1122        index.add_hot(HotVersionRef::new(EpochId::new(10), 100, TxId::new(2)));
1123        assert_eq!(index.latest_epoch(), EpochId::new(10));
1124
1125        // After rollback, should recalculate
1126        index.remove_versions_by(TxId::new(2));
1127        assert_eq!(index.latest_epoch(), EpochId::new(5));
1128    }
1129
1130    #[test]
1131    fn test_version_index_default() {
1132        let index = VersionIndex::default();
1133        assert!(index.is_empty());
1134        assert_eq!(index.version_count(), 0);
1135    }
1136
1137    #[test]
1138    fn test_version_index_latest() {
1139        let mut index = VersionIndex::new();
1140        assert!(index.latest().is_none());
1141
1142        index.add_hot(HotVersionRef::new(EpochId::new(1), 0, TxId::new(1)));
1143        let latest = index.latest().unwrap();
1144        assert!(matches!(latest, VersionRef::Hot(h) if h.epoch == EpochId::new(1)));
1145
1146        index.add_hot(HotVersionRef::new(EpochId::new(5), 100, TxId::new(2)));
1147        let latest = index.latest().unwrap();
1148        assert!(matches!(latest, VersionRef::Hot(h) if h.epoch == EpochId::new(5)));
1149    }
1150}