Skip to main content

grafeo_common/
mvcc.rs

//! MVCC (Multi-Version Concurrency Control) primitives.
//!
//! This is how Grafeo handles concurrent reads and writes without blocking.
//! Each entity has a [`VersionChain`] that tracks all versions. Readers see
//! consistent snapshots, writers create new versions, and old versions get
//! garbage collected when no one needs them anymore.
7
8use std::collections::VecDeque;
9
10#[cfg(feature = "tiered-storage")]
11use smallvec::SmallVec;
12
13use crate::types::{EpochId, TxId};
14
15/// Tracks when a version was created and deleted for visibility checks.
16#[derive(Debug, Clone, Copy)]
17pub struct VersionInfo {
18    /// The epoch this version was created in.
19    pub created_epoch: EpochId,
20    /// The epoch this version was deleted in (if any).
21    pub deleted_epoch: Option<EpochId>,
22    /// The transaction that created this version.
23    pub created_by: TxId,
24}
25
26impl VersionInfo {
27    /// Creates a new version info.
28    #[must_use]
29    pub fn new(created_epoch: EpochId, created_by: TxId) -> Self {
30        Self {
31            created_epoch,
32            deleted_epoch: None,
33            created_by,
34        }
35    }
36
37    /// Marks this version as deleted.
38    pub fn mark_deleted(&mut self, epoch: EpochId) {
39        self.deleted_epoch = Some(epoch);
40    }
41
42    /// Checks if this version is visible at the given epoch.
43    #[inline]
44    #[must_use]
45    pub fn is_visible_at(&self, epoch: EpochId) -> bool {
46        // Visible if created before or at the viewing epoch
47        // and not deleted before the viewing epoch
48        if !self.created_epoch.is_visible_at(epoch) {
49            return false;
50        }
51
52        if let Some(deleted) = self.deleted_epoch {
53            // Not visible if deleted at or before the viewing epoch
54            deleted.as_u64() > epoch.as_u64()
55        } else {
56            true
57        }
58    }
59
60    /// Checks if this version is visible to a specific transaction.
61    ///
62    /// A version is visible to a transaction if:
63    /// 1. It was created by the same transaction, OR
64    /// 2. It was created in an epoch before the transaction's start epoch
65    ///    and not deleted before that epoch
66    #[inline]
67    #[must_use]
68    pub fn is_visible_to(&self, viewing_epoch: EpochId, viewing_tx: TxId) -> bool {
69        // Own modifications are always visible
70        if self.created_by == viewing_tx {
71            return self.deleted_epoch.is_none();
72        }
73
74        // Otherwise, use epoch-based visibility
75        self.is_visible_at(viewing_epoch)
76    }
77}
78
79/// A single version of data.
80#[derive(Debug, Clone)]
81pub struct Version<T> {
82    /// Visibility metadata.
83    pub info: VersionInfo,
84    /// The actual data.
85    pub data: T,
86}
87
88impl<T> Version<T> {
89    /// Creates a new version.
90    #[must_use]
91    pub fn new(data: T, created_epoch: EpochId, created_by: TxId) -> Self {
92        Self {
93            info: VersionInfo::new(created_epoch, created_by),
94            data,
95        }
96    }
97}
98
99/// All versions of a single entity, newest first.
100///
101/// Each node/edge has one of these tracking its version history. Use
102/// [`visible_at()`](Self::visible_at) to get the version at a specific epoch,
103/// or [`visible_to()`](Self::visible_to) for transaction-aware visibility.
104#[derive(Debug, Clone)]
105pub struct VersionChain<T> {
106    /// Versions ordered newest-first.
107    versions: VecDeque<Version<T>>,
108}
109
110impl<T> VersionChain<T> {
111    /// Creates a new empty version chain.
112    #[must_use]
113    pub fn new() -> Self {
114        Self {
115            versions: VecDeque::new(),
116        }
117    }
118
119    /// Creates a version chain with an initial version.
120    #[must_use]
121    pub fn with_initial(data: T, created_epoch: EpochId, created_by: TxId) -> Self {
122        let mut chain = Self::new();
123        chain.add_version(data, created_epoch, created_by);
124        chain
125    }
126
127    /// Adds a new version to the chain.
128    ///
129    /// The new version becomes the head of the chain.
130    pub fn add_version(&mut self, data: T, created_epoch: EpochId, created_by: TxId) {
131        let version = Version::new(data, created_epoch, created_by);
132        self.versions.push_front(version);
133    }
134
135    /// Finds the version visible at the given epoch.
136    ///
137    /// Returns a reference to the visible version's data, or `None` if no version
138    /// is visible at that epoch.
139    #[inline]
140    #[must_use]
141    pub fn visible_at(&self, epoch: EpochId) -> Option<&T> {
142        self.versions
143            .iter()
144            .find(|v| v.info.is_visible_at(epoch))
145            .map(|v| &v.data)
146    }
147
148    /// Finds the version visible to a specific transaction.
149    ///
150    /// This considers both the transaction's epoch and its own uncommitted changes.
151    #[inline]
152    #[must_use]
153    pub fn visible_to(&self, epoch: EpochId, tx: TxId) -> Option<&T> {
154        self.versions
155            .iter()
156            .find(|v| v.info.is_visible_to(epoch, tx))
157            .map(|v| &v.data)
158    }
159
160    /// Marks the current visible version as deleted.
161    ///
162    /// Returns `true` if a version was marked, `false` if no visible version exists.
163    pub fn mark_deleted(&mut self, delete_epoch: EpochId) -> bool {
164        for version in &mut self.versions {
165            if version.info.deleted_epoch.is_none() {
166                version.info.mark_deleted(delete_epoch);
167                return true;
168            }
169        }
170        false
171    }
172
173    /// Checks if any version was modified by the given transaction.
174    #[must_use]
175    pub fn modified_by(&self, tx: TxId) -> bool {
176        self.versions.iter().any(|v| v.info.created_by == tx)
177    }
178
179    /// Removes all versions created by the given transaction.
180    ///
181    /// Used for rollback to discard uncommitted changes.
182    pub fn remove_versions_by(&mut self, tx: TxId) {
183        self.versions.retain(|v| v.info.created_by != tx);
184    }
185
186    /// Checks if there's a concurrent modification conflict.
187    ///
188    /// A conflict exists if another transaction modified this entity
189    /// after our start epoch.
190    #[must_use]
191    pub fn has_conflict(&self, start_epoch: EpochId, our_tx: TxId) -> bool {
192        self.versions.iter().any(|v| {
193            v.info.created_by != our_tx && v.info.created_epoch.as_u64() > start_epoch.as_u64()
194        })
195    }
196
197    /// Returns the number of versions in the chain.
198    #[must_use]
199    pub fn version_count(&self) -> usize {
200        self.versions.len()
201    }
202
203    /// Returns true if the chain has no versions.
204    #[must_use]
205    pub fn is_empty(&self) -> bool {
206        self.versions.is_empty()
207    }
208
209    /// Garbage collects old versions that are no longer visible to any transaction.
210    ///
211    /// Keeps versions that might still be visible to transactions at or after `min_epoch`.
212    pub fn gc(&mut self, min_epoch: EpochId) {
213        if self.versions.is_empty() {
214            return;
215        }
216
217        let mut keep_count = 0;
218        let mut found_old_visible = false;
219
220        for (i, version) in self.versions.iter().enumerate() {
221            if version.info.created_epoch.as_u64() >= min_epoch.as_u64() {
222                keep_count = i + 1;
223            } else if !found_old_visible {
224                // Keep the first (most recent) old version
225                found_old_visible = true;
226                keep_count = i + 1;
227            }
228        }
229
230        self.versions.truncate(keep_count);
231    }
232
233    /// Returns a reference to the latest version's data regardless of visibility.
234    #[must_use]
235    pub fn latest(&self) -> Option<&T> {
236        self.versions.front().map(|v| &v.data)
237    }
238
239    /// Returns a mutable reference to the latest version's data.
240    #[must_use]
241    pub fn latest_mut(&mut self) -> Option<&mut T> {
242        self.versions.front_mut().map(|v| &mut v.data)
243    }
244}
245
246impl<T> Default for VersionChain<T> {
247    fn default() -> Self {
248        Self::new()
249    }
250}
251
252impl<T: Clone> VersionChain<T> {
253    /// Gets a mutable reference to the visible version's data for modification.
254    ///
255    /// If the version is not owned by this transaction, creates a new version
256    /// with a copy of the data.
257    pub fn get_mut(&mut self, epoch: EpochId, tx: TxId, modify_epoch: EpochId) -> Option<&mut T> {
258        // Find the visible version
259        let visible_idx = self
260            .versions
261            .iter()
262            .position(|v| v.info.is_visible_to(epoch, tx))?;
263
264        let visible = &self.versions[visible_idx];
265
266        if visible.info.created_by == tx {
267            // Already our version, modify in place
268            Some(&mut self.versions[visible_idx].data)
269        } else {
270            // Create a new version with copied data
271            let new_data = visible.data.clone();
272            self.add_version(new_data, modify_epoch, tx);
273            Some(&mut self.versions[0].data)
274        }
275    }
276}
277
// ============================================================================
// Tiered Storage Types (Phase 13)
// ============================================================================
//
// These types support the tiered hot/cold storage architecture where version
// metadata is stored separately from version data. Data lives in arenas (hot)
// or compressed epoch blocks (cold), while VersionIndex holds lightweight refs.
285
/// Compact representation of an optional epoch ID.
///
/// The epoch is stored in a single `u32`, with `u32::MAX` reserved as the
/// sentinel for "no epoch". Valid epochs therefore range up to
/// `u32::MAX - 1` (~4 billion). The whole value is 4 bytes, which is smaller
/// than an `Option<EpochId>` that has to hold a full-width epoch plus a
/// discriminant.
///
/// The [`Default`] value is [`OptionalEpochId::NONE`], so a
/// default-constructed value means "not deleted" rather than "epoch 0".
#[cfg(feature = "tiered-storage")]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(transparent)]
pub struct OptionalEpochId(u32);

#[cfg(feature = "tiered-storage")]
impl Default for OptionalEpochId {
    /// Returns [`OptionalEpochId::NONE`].
    fn default() -> Self {
        Self::NONE
    }
}
294
#[cfg(feature = "tiered-storage")]
impl OptionalEpochId {
    /// The "no epoch" value; stored as the sentinel `u32::MAX`.
    pub const NONE: Self = Self(u32::MAX);

    /// Packs `epoch` into an `OptionalEpochId`.
    ///
    /// # Panics
    /// Panics if the epoch does not fit below the sentinel, i.e. if it is
    /// greater than `u32::MAX - 1` (4,294,967,294).
    #[must_use]
    pub fn some(epoch: EpochId) -> Self {
        let raw = epoch.as_u64();
        match u32::try_from(raw) {
            // `u32::MAX` itself is reserved for `NONE`.
            Ok(compact) if compact != u32::MAX => Self(compact),
            _ => panic!(
                "epoch {} exceeds OptionalEpochId capacity (max {})",
                raw,
                u32::MAX as u64 - 1
            ),
        }
    }

    /// Unpacks the stored epoch; yields `None` for the sentinel value.
    #[inline]
    #[must_use]
    pub fn get(self) -> Option<EpochId> {
        match self.0 {
            u32::MAX => None,
            raw => Some(EpochId::new(u64::from(raw))),
        }
    }

    /// Returns `true` if an epoch is stored (the value is not the sentinel).
    #[must_use]
    pub fn is_some(self) -> bool {
        !matches!(self.0, u32::MAX)
    }

    /// Returns `true` if no epoch is stored (the value is the sentinel).
    #[inline]
    #[must_use]
    pub fn is_none(self) -> bool {
        matches!(self.0, u32::MAX)
    }
}
339
/// Metadata handle for a hot version, i.e. one whose data sits in an epoch
/// arena.
///
/// The struct carries no payload: it only records where the data is
/// (`arena_offset` within the arena of `epoch`) and who may see it.
///
/// # Memory Layout
/// - `epoch`: 8 bytes
/// - `arena_offset`: 4 bytes
/// - `created_by`: 8 bytes
/// - `deleted_epoch`: 4 bytes
/// - Total: 24 bytes
#[cfg(feature = "tiered-storage")]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct HotVersionRef {
    /// Creation epoch of this version.
    pub epoch: EpochId,
    /// Position of the data inside the arena belonging to `epoch`.
    pub arena_offset: u32,
    /// Transaction that wrote this version.
    pub created_by: TxId,
    /// Deletion epoch, or [`OptionalEpochId::NONE`] while not deleted.
    pub deleted_epoch: OptionalEpochId,
}
363
#[cfg(feature = "tiered-storage")]
impl HotVersionRef {
    /// Builds a reference to a live (not deleted) hot version.
    #[must_use]
    pub fn new(epoch: EpochId, arena_offset: u32, created_by: TxId) -> Self {
        let deleted_epoch = OptionalEpochId::NONE;
        Self {
            epoch,
            arena_offset,
            created_by,
            deleted_epoch,
        }
    }

    /// Returns `true` if a reader at `viewing_epoch` can see this version.
    ///
    /// The creation epoch must be visible at `viewing_epoch`, and any
    /// deletion epoch must be strictly greater than `viewing_epoch`.
    #[inline]
    #[must_use]
    pub fn is_visible_at(&self, viewing_epoch: EpochId) -> bool {
        self.epoch.is_visible_at(viewing_epoch)
            && self
                .deleted_epoch
                .get()
                .map_or(true, |gone| viewing_epoch.as_u64() < gone.as_u64())
    }

    /// Returns `true` if `viewing_tx`, reading at `viewing_epoch`, can see
    /// this version.
    ///
    /// The creating transaction sees the version whenever it carries no
    /// deletion epoch; everyone else gets epoch-based visibility.
    #[inline]
    #[must_use]
    pub fn is_visible_to(&self, viewing_epoch: EpochId, viewing_tx: TxId) -> bool {
        if viewing_tx == self.created_by {
            self.deleted_epoch.is_none()
        } else {
            self.is_visible_at(viewing_epoch)
        }
    }
}
404
/// Metadata handle for a cold version, i.e. one stored in a compressed epoch
/// block.
///
/// This is a placeholder for Phase 14, where the actual compression logic is
/// to be implemented.
///
/// # Memory Layout
/// - `epoch`: 8 bytes
/// - `block_offset`: 4 bytes
/// - `length`: 2 bytes
/// - `created_by`: 8 bytes
/// - `deleted_epoch`: 4 bytes
/// - Total: 26 bytes (+ 6 padding = 32 bytes aligned)
#[cfg(feature = "tiered-storage")]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct ColdVersionRef {
    /// Creation epoch of this version.
    pub epoch: EpochId,
    /// Position of the data inside the compressed epoch block.
    pub block_offset: u32,
    /// Size of the compressed data in bytes.
    pub length: u16,
    /// Transaction that wrote this version.
    pub created_by: TxId,
    /// Deletion epoch, or [`OptionalEpochId::NONE`] while not deleted.
    pub deleted_epoch: OptionalEpochId,
}
431
#[cfg(feature = "tiered-storage")]
impl ColdVersionRef {
    /// Returns `true` if a reader at `viewing_epoch` can see this version.
    ///
    /// The creation epoch must be visible at `viewing_epoch`, and any
    /// deletion epoch must be strictly greater than `viewing_epoch`.
    #[inline]
    #[must_use]
    pub fn is_visible_at(&self, viewing_epoch: EpochId) -> bool {
        let created_in_time = self.epoch.is_visible_at(viewing_epoch);
        created_in_time
            && self
                .deleted_epoch
                .get()
                .map_or(true, |gone| viewing_epoch.as_u64() < gone.as_u64())
    }

    /// Returns `true` if `viewing_tx`, reading at `viewing_epoch`, can see
    /// this version.
    ///
    /// The creating transaction sees the version whenever it carries no
    /// deletion epoch; everyone else gets epoch-based visibility.
    #[inline]
    #[must_use]
    pub fn is_visible_to(&self, viewing_epoch: EpochId, viewing_tx: TxId) -> bool {
        if viewing_tx != self.created_by {
            return self.is_visible_at(viewing_epoch);
        }
        self.deleted_epoch.is_none()
    }
}
457
/// Unified reference to either a hot or cold version.
///
/// Implements `PartialEq`/`Eq` (both wrapped reference types already do), so
/// lookup results can be compared directly.
#[cfg(feature = "tiered-storage")]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum VersionRef {
    /// Version data is in arena (hot tier).
    Hot(HotVersionRef),
    /// Version data is in compressed storage (cold tier).
    Cold(ColdVersionRef),
}
467
#[cfg(feature = "tiered-storage")]
impl VersionRef {
    /// Creation epoch of the referenced version, whichever tier it is in.
    #[must_use]
    pub fn epoch(&self) -> EpochId {
        let (Self::Hot(HotVersionRef { epoch, .. }) | Self::Cold(ColdVersionRef { epoch, .. })) =
            self;
        *epoch
    }

    /// Transaction that created the referenced version, whichever tier it
    /// is in.
    #[must_use]
    pub fn created_by(&self) -> TxId {
        let (Self::Hot(HotVersionRef { created_by, .. })
        | Self::Cold(ColdVersionRef { created_by, .. })) = self;
        *created_by
    }

    /// Returns `true` for the [`VersionRef::Hot`] variant.
    #[must_use]
    pub fn is_hot(&self) -> bool {
        match self {
            Self::Hot(_) => true,
            Self::Cold(_) => false,
        }
    }

    /// Returns `true` for the [`VersionRef::Cold`] variant.
    #[must_use]
    pub fn is_cold(&self) -> bool {
        match self {
            Self::Cold(_) => true,
            Self::Hot(_) => false,
        }
    }
}
500
/// Tiered counterpart of `VersionChain<T>` for hot/cold storage.
///
/// A `VersionIndex` stores no data itself. It keeps small references to data
/// that lives in arenas (hot) or compressed blocks (cold), which gives:
///
/// - **No heap allocation** for typical 1-2 version case (SmallVec inline)
/// - **Separation of metadata and data** for compression
/// - **Fast visibility checks** via cached `latest_epoch`
/// - **O(1) epoch drops** instead of per-version deallocation
///
/// # Memory Layout
/// - `hot`: SmallVec<[HotVersionRef; 2]> ≈ 56 bytes inline
/// - `cold`: SmallVec<[ColdVersionRef; 4]> ≈ 136 bytes inline
/// - `latest_epoch`: 8 bytes
/// - Total: ~200 bytes, no heap for typical case
#[cfg(feature = "tiered-storage")]
#[derive(Clone, Debug)]
pub struct VersionIndex {
    /// References to arena-backed versions, most recent first.
    hot: SmallVec<[HotVersionRef; 2]>,
    /// References to compressed versions, most recent first.
    cold: SmallVec<[ColdVersionRef; 4]>,
    /// Cached creation epoch of the latest version, for fast staleness checks.
    latest_epoch: EpochId,
}
526
#[cfg(feature = "tiered-storage")]
impl VersionIndex {
    /// Creates a new empty version index.
    #[must_use]
    pub fn new() -> Self {
        Self {
            hot: SmallVec::new(),
            cold: SmallVec::new(),
            latest_epoch: EpochId::INITIAL,
        }
    }

    /// Creates a version index with an initial hot version.
    #[must_use]
    pub fn with_initial(hot_ref: HotVersionRef) -> Self {
        let mut index = Self::new();
        index.add_hot(hot_ref);
        index
    }

    /// Adds a new hot version (becomes the latest).
    ///
    /// The reference is inserted at the front of the hot list and its epoch
    /// becomes the cached latest epoch.
    pub fn add_hot(&mut self, hot_ref: HotVersionRef) {
        // Insert at front (most recent first).
        self.hot.insert(0, hot_ref);
        self.latest_epoch = hot_ref.epoch;
    }

    /// Returns the latest epoch for quick staleness checks.
    #[must_use]
    pub fn latest_epoch(&self) -> EpochId {
        self.latest_epoch
    }

    /// Returns `true` if this entity has no versions.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.hot.is_empty() && self.cold.is_empty()
    }

    /// Returns the total version count (hot + cold).
    #[must_use]
    pub fn version_count(&self) -> usize {
        self.hot.len() + self.cold.len()
    }

    /// Returns the number of hot versions.
    #[must_use]
    pub fn hot_count(&self) -> usize {
        self.hot.len()
    }

    /// Returns the number of cold versions.
    #[must_use]
    pub fn cold_count(&self) -> usize {
        self.cold.len()
    }

    /// Finds the version visible at the given epoch.
    ///
    /// Hot versions are searched before cold ones, each list newest first.
    /// The first version whose creation epoch is visible at `epoch` decides
    /// the outcome: it is returned unless it was deleted at or before
    /// `epoch`, in which case the result is `None`. Older versions are
    /// superseded and never returned, so a deleted entity cannot reappear
    /// through a stale reference.
    #[inline]
    #[must_use]
    pub fn visible_at(&self, epoch: EpochId) -> Option<VersionRef> {
        // Check hot versions first (most recent first, likely case).
        if let Some(hot) = self.hot.iter().find(|v| v.epoch.is_visible_at(epoch)) {
            return hot.is_visible_at(epoch).then(|| VersionRef::Hot(*hot));
        }
        // Fall back to cold versions.
        self.cold
            .iter()
            .find(|v| v.epoch.is_visible_at(epoch))
            .filter(|v| v.is_visible_at(epoch))
            .map(|v| VersionRef::Cold(*v))
    }

    /// Finds the version visible to a specific transaction.
    ///
    /// Works like [`visible_at()`](Self::visible_at), except that versions
    /// created by `tx` itself count as existing regardless of their epoch.
    /// The first such candidate is returned if its `is_visible_to` check
    /// passes; otherwise the result is `None`.
    #[inline]
    #[must_use]
    pub fn visible_to(&self, epoch: EpochId, tx: TxId) -> Option<VersionRef> {
        // Check hot versions first.
        if let Some(hot) = self
            .hot
            .iter()
            .find(|v| v.created_by == tx || v.epoch.is_visible_at(epoch))
        {
            return hot.is_visible_to(epoch, tx).then(|| VersionRef::Hot(*hot));
        }
        // Fall back to cold versions.
        self.cold
            .iter()
            .find(|v| v.created_by == tx || v.epoch.is_visible_at(epoch))
            .filter(|v| v.is_visible_to(epoch, tx))
            .map(|v| VersionRef::Cold(*v))
    }

    /// Marks the first version without a deletion epoch as deleted at
    /// `delete_epoch`, searching hot versions before cold ones.
    ///
    /// Returns `true` if a version was marked, `false` if every version
    /// already carries a deletion epoch (or the index is empty).
    ///
    /// # Panics
    /// Panics if a version is to be marked and `delete_epoch` does not fit
    /// into an [`OptionalEpochId`] (see [`OptionalEpochId::some`]).
    pub fn mark_deleted(&mut self, delete_epoch: EpochId) -> bool {
        if let Some(v) = self.hot.iter_mut().find(|v| v.deleted_epoch.is_none()) {
            v.deleted_epoch = OptionalEpochId::some(delete_epoch);
            return true;
        }
        // Check cold versions (rare case).
        if let Some(v) = self.cold.iter_mut().find(|v| v.deleted_epoch.is_none()) {
            v.deleted_epoch = OptionalEpochId::some(delete_epoch);
            return true;
        }
        false
    }

    /// Checks if any version was created by the given transaction.
    #[must_use]
    pub fn modified_by(&self, tx: TxId) -> bool {
        self.hot.iter().any(|v| v.created_by == tx) || self.cold.iter().any(|v| v.created_by == tx)
    }

    /// Removes all versions created by the given transaction (for rollback)
    /// and refreshes the cached latest epoch.
    pub fn remove_versions_by(&mut self, tx: TxId) {
        self.hot.retain(|v| v.created_by != tx);
        self.cold.retain(|v| v.created_by != tx);
        self.recalculate_latest_epoch();
    }

    /// Checks for write conflict with concurrent transaction.
    ///
    /// A conflict exists if a transaction other than `our_tx` created a
    /// version (hot or cold) with an epoch strictly greater than
    /// `start_epoch`.
    #[must_use]
    pub fn has_conflict(&self, start_epoch: EpochId, our_tx: TxId) -> bool {
        let started = start_epoch.as_u64();
        self.hot
            .iter()
            .any(|v| v.created_by != our_tx && v.epoch.as_u64() > started)
            || self
                .cold
                .iter()
                .any(|v| v.created_by != our_tx && v.epoch.as_u64() > started)
    }

    /// Garbage collects old versions not needed by any reader at or after
    /// `min_epoch`.
    ///
    /// Kept are all versions created at or after `min_epoch`, plus the first
    /// version created before it that is encountered when walking hot and
    /// then cold references in order.
    pub fn gc(&mut self, min_epoch: EpochId) {
        if self.is_empty() {
            return;
        }

        let horizon = min_epoch.as_u64();
        // Set once the single pre-horizon version has been kept; shared by
        // the hot and the cold pass.
        let mut found_old_visible = false;
        let mut keep = |created: EpochId| {
            if created.as_u64() >= horizon {
                true
            } else if !found_old_visible {
                found_old_visible = true;
                true
            } else {
                false
            }
        };

        self.hot.retain(|v| keep(v.epoch));
        self.cold.retain(|v| keep(v.epoch));
    }

    /// Returns the first hot reference, or the first cold one if there are
    /// no hot versions, regardless of visibility.
    #[must_use]
    pub fn latest(&self) -> Option<VersionRef> {
        self.hot
            .first()
            .map(|v| VersionRef::Hot(*v))
            .or_else(|| self.cold.first().map(|v| VersionRef::Cold(*v)))
    }

    /// Freezes hot versions for a given epoch into cold storage.
    ///
    /// This is called when an epoch is no longer needed by any active transaction.
    /// The actual compression happens in Phase 14; for now this just moves refs:
    /// hot references created in `epoch` are dropped, `cold_refs` are added,
    /// the cold list is re-sorted newest first, and the cached latest epoch
    /// is refreshed.
    pub fn freeze_epoch(
        &mut self,
        epoch: EpochId,
        cold_refs: impl IntoIterator<Item = ColdVersionRef>,
    ) {
        // Remove hot refs for this epoch.
        self.hot.retain(|v| v.epoch != epoch);

        // Add cold refs.
        self.cold.extend(cold_refs);

        // Keep cold sorted by epoch (descending = most recent first).
        self.cold
            .sort_by(|a, b| b.epoch.as_u64().cmp(&a.epoch.as_u64()));

        self.recalculate_latest_epoch();
    }

    /// Returns hot version refs for a specific epoch (for freeze operation).
    pub fn hot_refs_for_epoch(&self, epoch: EpochId) -> impl Iterator<Item = &HotVersionRef> {
        self.hot.iter().filter(move |v| v.epoch == epoch)
    }

    /// Returns `true` if the hot SmallVec has spilled to the heap.
    #[must_use]
    pub fn hot_spilled(&self) -> bool {
        self.hot.spilled()
    }

    /// Returns `true` if the cold SmallVec has spilled to the heap.
    #[must_use]
    pub fn cold_spilled(&self) -> bool {
        self.cold.spilled()
    }

    /// Recomputes the cached latest epoch from the heads of both lists.
    ///
    /// The newer of the two heads wins, so the cache stays correct even if
    /// the most recent epoch has just been frozen while older hot references
    /// remain. An empty index resets to `EpochId::INITIAL`.
    fn recalculate_latest_epoch(&mut self) {
        let newest_hot = self.hot.first().map(|v| v.epoch);
        let newest_cold = self.cold.first().map(|v| v.epoch);

        self.latest_epoch = match (newest_hot, newest_cold) {
            (Some(h), Some(c)) => {
                if c.as_u64() > h.as_u64() {
                    c
                } else {
                    h
                }
            }
            (Some(only), None) | (None, Some(only)) => only,
            (None, None) => EpochId::INITIAL,
        };
    }
}
770
#[cfg(feature = "tiered-storage")]
impl Default for VersionIndex {
    /// The default index is the empty index, same as [`VersionIndex::new`].
    fn default() -> Self {
        VersionIndex::new()
    }
}
777
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for building an epoch in assertions.
    fn at(n: u64) -> EpochId {
        EpochId::new(n)
    }

    /// A live version is invisible before its creation epoch and visible
    /// from that epoch on.
    #[test]
    fn test_version_visibility() {
        let info = VersionInfo::new(at(5), TxId::new(1));

        for (epoch, expected) in [(4, false), (5, true), (10, true)] {
            assert_eq!(info.is_visible_at(at(epoch)), expected, "epoch {}", epoch);
        }
    }

    /// A deleted version is visible from creation up to, but not including,
    /// its deletion epoch.
    #[test]
    fn test_deleted_version_visibility() {
        let mut info = VersionInfo::new(at(5), TxId::new(1));
        info.mark_deleted(at(10));

        for (epoch, expected) in [(5, true), (9, true), (10, false), (15, false)] {
            assert_eq!(info.is_visible_at(at(epoch)), expected, "epoch {}", epoch);
        }
    }

    /// The creator sees its version at any epoch; other transactions need
    /// an epoch at or after creation.
    #[test]
    fn test_version_visibility_to_transaction() {
        let info = VersionInfo::new(at(5), TxId::new(1));

        let seen_by_creator_early = info.is_visible_to(at(3), TxId::new(1));
        let seen_by_other_early = info.is_visible_to(at(3), TxId::new(2));
        let seen_by_other_later = info.is_visible_to(at(5), TxId::new(2));

        assert!(seen_by_creator_early);
        assert!(!seen_by_other_early);
        assert!(seen_by_other_later);
    }

    /// Snapshot reads return the version that was current at the epoch.
    #[test]
    fn test_version_chain_basic() {
        let mut chain = VersionChain::with_initial("v1", at(1), TxId::new(1));

        // Only v1 exists: visible from epoch 1, nothing before.
        assert_eq!(chain.visible_at(at(1)), Some(&"v1"));
        assert_eq!(chain.visible_at(at(0)), None);

        chain.add_version("v2", at(5), TxId::new(2));

        // v1 below epoch 5, v2 from epoch 5 on.
        for (epoch, expected) in [(1, "v1"), (4, "v1"), (5, "v2"), (10, "v2")] {
            assert_eq!(chain.visible_at(at(epoch)), Some(&expected), "epoch {}", epoch);
        }
    }

    /// Rolling back a transaction removes exactly its versions.
    #[test]
    fn test_version_chain_rollback() {
        let mut chain = VersionChain::with_initial("v1", at(1), TxId::new(1));
        let writer = TxId::new(2);
        chain.add_version("v2", at(5), writer);
        chain.add_version("v3", at(6), writer);

        assert_eq!(chain.version_count(), 3);

        chain.remove_versions_by(TxId::new(2));

        assert_eq!(chain.version_count(), 1);
        assert_eq!(chain.visible_at(at(10)), Some(&"v1"));
    }

    /// Deleting hides the entity from the deletion epoch onwards.
    #[test]
    fn test_version_chain_deletion() {
        let mut chain = VersionChain::with_initial("v1", at(1), TxId::new(1));

        let marked = chain.mark_deleted(at(5));
        assert!(marked);

        assert_eq!(chain.visible_at(at(4)), Some(&"v1"));
        assert_eq!(chain.visible_at(at(5)), None);
        assert_eq!(chain.visible_at(at(10)), None);
    }
}
866
// ============================================================================
// Tiered Storage Tests
// ============================================================================
870
#[cfg(all(test, feature = "tiered-storage"))]
mod tiered_storage_tests {
    use super::*;

    /// `OptionalEpochId` reports `NONE` as absent and any wrapped epoch,
    /// including epoch zero, as present.
    #[test]
    fn test_optional_epoch_id() {
        // The NONE constant carries no epoch.
        let absent = OptionalEpochId::NONE;
        assert!(absent.is_none());
        assert!(!absent.is_some());
        assert_eq!(absent.get(), None);

        // A wrapped epoch round-trips through `get`.
        let present = OptionalEpochId::some(EpochId::new(42));
        assert!(present.is_some());
        assert!(!present.is_none());
        assert_eq!(present.get(), Some(EpochId::new(42)));

        // Epoch zero is a real value, not "none".
        let epoch_zero = OptionalEpochId::some(EpochId::new(0));
        assert!(epoch_zero.is_some());
        assert_eq!(epoch_zero.get(), Some(EpochId::new(0)));
    }

    /// A live hot version is invisible before its creation epoch and visible
    /// from that epoch onwards.
    #[test]
    fn test_hot_version_ref_visibility() {
        let version = HotVersionRef::new(EpochId::new(5), 100, TxId::new(1));

        // One epoch too early.
        assert!(!version.is_visible_at(EpochId::new(4)));

        // The creation epoch itself, then a later one.
        for epoch in [5, 10] {
            assert!(version.is_visible_at(EpochId::new(epoch)));
        }
    }

    /// A deleted hot version is visible only in the window between its
    /// creation epoch (inclusive) and its deletion epoch (exclusive).
    #[test]
    fn test_hot_version_ref_deleted_visibility() {
        let mut version = HotVersionRef::new(EpochId::new(5), 100, TxId::new(1));
        version.deleted_epoch = OptionalEpochId::some(EpochId::new(10));

        // Inside the [created, deleted) window.
        for epoch in [5, 9] {
            assert!(version.is_visible_at(EpochId::new(epoch)));
        }

        // The deletion epoch and anything later.
        for epoch in [10, 15] {
            assert!(!version.is_visible_at(EpochId::new(epoch)));
        }
    }

    /// The creating transaction sees its own version early; other
    /// transactions have to wait for the creation epoch.
    #[test]
    fn test_hot_version_ref_transaction_visibility() {
        let creator = TxId::new(1);
        let outsider = TxId::new(2);
        let version = HotVersionRef::new(EpochId::new(5), 100, creator);

        // Epoch 3 precedes creation, yet the creator still sees the version.
        assert!(version.is_visible_to(EpochId::new(3), creator));

        // Everyone else is bound by the creation epoch.
        assert!(!version.is_visible_to(EpochId::new(3), outsider));
        assert!(version.is_visible_to(EpochId::new(5), outsider));
    }

    /// An index picks the version matching the reader's epoch once a second
    /// hot version has been added.
    #[test]
    fn test_version_index_basic() {
        let mut index =
            VersionIndex::with_initial(HotVersionRef::new(EpochId::new(1), 0, TxId::new(1)));

        // The initial version exists from epoch 1, not before.
        assert!(index.visible_at(EpochId::new(1)).is_some());
        assert!(index.visible_at(EpochId::new(0)).is_none());

        // Second version, created at epoch 5 with a distinct arena offset.
        index.add_hot(HotVersionRef::new(EpochId::new(5), 100, TxId::new(2)));

        // Readers below epoch 5 get the first version (offset 0) ...
        let older = index.visible_at(EpochId::new(4)).unwrap();
        assert!(matches!(older, VersionRef::Hot(h) if h.arena_offset == 0));

        // ... readers at epoch 5 get the second one (offset 100).
        let newer = index.visible_at(EpochId::new(5)).unwrap();
        assert!(matches!(newer, VersionRef::Hot(h) if h.arena_offset == 100));
    }

    /// Marking the index deleted hides the version from the deletion epoch on.
    #[test]
    fn test_version_index_deletion() {
        let mut index =
            VersionIndex::with_initial(HotVersionRef::new(EpochId::new(1), 0, TxId::new(1)));

        // Delete at epoch 5; the call reports success.
        assert!(index.mark_deleted(EpochId::new(5)));

        // Still there just before the deletion epoch.
        assert!(index.visible_at(EpochId::new(4)).is_some());

        // Gone at the deletion epoch and afterwards.
        for epoch in [5, 10] {
            assert!(index.visible_at(EpochId::new(epoch)).is_none());
        }
    }

    /// Transaction-aware lookups on the index follow the same creator
    /// exception as individual hot versions.
    #[test]
    fn test_version_index_transaction_visibility() {
        let creator = TxId::new(10);
        let outsider = TxId::new(20);
        let index = VersionIndex::with_initial(HotVersionRef::new(EpochId::new(5), 0, creator));

        // The creator finds its version even at the earlier epoch 3.
        assert!(index.visible_to(EpochId::new(3), creator).is_some());

        // Another transaction finds nothing until the creation epoch.
        assert!(index.visible_to(EpochId::new(3), outsider).is_none());
        assert!(index.visible_to(EpochId::new(5), outsider).is_some());
    }

    /// Removing one transaction's versions leaves the other transaction's
    /// version in place.
    #[test]
    fn test_version_index_rollback() {
        let survivor = TxId::new(10);
        let aborted = TxId::new(20);

        let mut index = VersionIndex::new();
        index.add_hot(HotVersionRef::new(EpochId::new(1), 0, survivor));
        index.add_hot(HotVersionRef::new(EpochId::new(2), 100, aborted));
        index.add_hot(HotVersionRef::new(EpochId::new(3), 200, aborted));

        assert_eq!(index.version_count(), 3);
        assert!(index.modified_by(survivor));
        assert!(index.modified_by(aborted));

        // Undo everything the second transaction wrote.
        index.remove_versions_by(aborted);

        assert_eq!(index.version_count(), 1);
        assert!(index.modified_by(survivor));
        assert!(!index.modified_by(aborted));

        // A late reader now resolves to the surviving transaction's version.
        let remaining = index.visible_at(EpochId::new(10)).unwrap();
        assert!(matches!(remaining, VersionRef::Hot(h) if h.created_by == survivor));
    }

    /// Garbage collection at a minimum epoch keeps the newer versions plus
    /// the first older one.
    #[test]
    fn test_version_index_gc() {
        let mut index = VersionIndex::new();

        // Three versions, created at epochs 1, 3 and 5.
        for epoch in [1, 3, 5] {
            let version =
                HotVersionRef::new(EpochId::new(epoch), (epoch as u32) * 100, TxId::new(epoch));
            index.add_hot(version);
        }

        assert_eq!(index.version_count(), 3);

        // min_epoch = 4: expect epoch 5 (>= 4) and epoch 3 (first old visible)
        // to be retained.
        index.gc(EpochId::new(4));

        assert_eq!(index.version_count(), 2);

        // Both retained epochs still resolve to a version.
        for epoch in [5, 3] {
            assert!(index.visible_at(EpochId::new(epoch)).is_some());
        }
    }

    /// A transaction conflicts with versions written by others after its
    /// start epoch, and never with its own writes alone.
    #[test]
    fn test_version_index_conflict_detection() {
        let first = TxId::new(10);
        let second = TxId::new(20);

        let mut shared = VersionIndex::new();
        shared.add_hot(HotVersionRef::new(EpochId::new(1), 0, first));
        shared.add_hot(HotVersionRef::new(EpochId::new(5), 100, second));

        // Started at epoch 0: `first` collides with the epoch-5 write by `second`.
        assert!(shared.has_conflict(EpochId::new(0), first));

        // Started at epoch 0: `second` collides with the epoch-1 write by `first`.
        assert!(shared.has_conflict(EpochId::new(0), second));

        // `first` starting at epoch 5 is no longer behind `second`'s write.
        assert!(!shared.has_conflict(EpochId::new(5), first));

        // `second` starting at epoch 1 is no longer behind `first`'s write.
        assert!(!shared.has_conflict(EpochId::new(1), second));

        // An index holding only `first`'s version never conflicts with `first`.
        let mut own_only = VersionIndex::new();
        own_only.add_hot(HotVersionRef::new(EpochId::new(5), 0, first));
        assert!(!own_only.has_conflict(EpochId::new(0), first));
    }

    /// Two hot versions stay within inline capacity, so neither the hot nor
    /// the cold storage reports a spill.
    #[test]
    fn test_version_index_smallvec_no_heap() {
        let mut index = VersionIndex::new();

        // Insert two hot versions keyed by the loop counter.
        for n in 0..2 {
            let version = HotVersionRef::new(EpochId::new(n), n as u32, TxId::new(n));
            index.add_hot(version);
        }

        // Neither SmallVec should have spilled to the heap.
        assert!(!index.hot_spilled());
        assert!(!index.cold_spilled());
    }

    /// Freezing an epoch moves its version from hot to cold storage without
    /// breaking visibility lookups.
    #[test]
    fn test_version_index_freeze_epoch() {
        let mut index = VersionIndex::new();
        index.add_hot(HotVersionRef::new(EpochId::new(1), 0, TxId::new(1)));
        index.add_hot(HotVersionRef::new(EpochId::new(2), 100, TxId::new(2)));

        assert_eq!(index.hot_count(), 2);
        assert_eq!(index.cold_count(), 0);

        // Cold replacement for the epoch-1 version.
        let frozen = ColdVersionRef {
            epoch: EpochId::new(1),
            block_offset: 0,
            length: 32,
            created_by: TxId::new(1),
            deleted_epoch: OptionalEpochId::NONE,
        };
        index.freeze_epoch(EpochId::new(1), std::iter::once(frozen));

        // One version on each side after the freeze.
        assert_eq!(index.hot_count(), 1);
        assert_eq!(index.cold_count(), 1);

        // Lookups at both epochs still succeed.
        for epoch in [1, 2] {
            assert!(index.visible_at(EpochId::new(epoch)).is_some());
        }

        // Epoch 1 now resolves to the cold reference ...
        let at_one = index.visible_at(EpochId::new(1)).unwrap();
        assert!(at_one.is_cold());

        // ... while epoch 2 still resolves to a hot one.
        let at_two = index.visible_at(EpochId::new(2)).unwrap();
        assert!(at_two.is_hot());
    }

    /// `VersionRef` accessors forward the data of the wrapped hot version.
    #[test]
    fn test_version_ref_accessors() {
        let wrapped = VersionRef::Hot(HotVersionRef::new(EpochId::new(5), 100, TxId::new(10)));

        assert_eq!(wrapped.epoch(), EpochId::new(5));
        assert_eq!(wrapped.created_by(), TxId::new(10));
        assert!(wrapped.is_hot());
        assert!(!wrapped.is_cold());
    }

    /// `latest_epoch` follows additions and is recomputed after a rollback.
    #[test]
    fn test_version_index_latest_epoch() {
        let mut index = VersionIndex::new();
        assert_eq!(index.latest_epoch(), EpochId::INITIAL);

        index.add_hot(HotVersionRef::new(EpochId::new(5), 0, TxId::new(1)));
        assert_eq!(index.latest_epoch(), EpochId::new(5));

        index.add_hot(HotVersionRef::new(EpochId::new(10), 100, TxId::new(2)));
        assert_eq!(index.latest_epoch(), EpochId::new(10));

        // Dropping the epoch-10 writer falls back to epoch 5.
        index.remove_versions_by(TxId::new(2));
        assert_eq!(index.latest_epoch(), EpochId::new(5));
    }

    /// A default-constructed index is empty.
    #[test]
    fn test_version_index_default() {
        let empty = VersionIndex::default();
        assert!(empty.is_empty());
        assert_eq!(empty.version_count(), 0);
    }

    /// `latest` is `None` on an empty index and otherwise returns the most
    /// recently added version.
    #[test]
    fn test_version_index_latest() {
        let mut index = VersionIndex::new();
        assert!(index.latest().is_none());

        index.add_hot(HotVersionRef::new(EpochId::new(1), 0, TxId::new(1)));
        let newest = index.latest().unwrap();
        assert!(matches!(newest, VersionRef::Hot(h) if h.epoch == EpochId::new(1)));

        index.add_hot(HotVersionRef::new(EpochId::new(5), 100, TxId::new(2)));
        let newest = index.latest().unwrap();
        assert!(matches!(newest, VersionRef::Hot(h) if h.epoch == EpochId::new(5)));
    }
}