Skip to main content

grafeo_common/
mvcc.rs

1//! MVCC (Multi-Version Concurrency Control) primitives.
2//!
3//! This is how Grafeo handles concurrent reads and writes without blocking.
4//! Each entity has a [`VersionChain`] that tracks all versions. Readers see
5//! consistent snapshots, writers create new versions, and old versions get
6//! garbage collected when no one needs them anymore.
7
8use std::collections::VecDeque;
9
10#[cfg(feature = "tiered-storage")]
11use smallvec::SmallVec;
12
13use crate::types::{EpochId, TxId};
14
15/// Tracks when a version was created and deleted for visibility checks.
16#[derive(Debug, Clone, Copy)]
17pub struct VersionInfo {
18    /// The epoch this version was created in.
19    pub created_epoch: EpochId,
20    /// The epoch this version was deleted in (if any).
21    pub deleted_epoch: Option<EpochId>,
22    /// The transaction that created this version.
23    pub created_by: TxId,
24}
25
26impl VersionInfo {
27    /// Creates a new version info.
28    #[must_use]
29    pub fn new(created_epoch: EpochId, created_by: TxId) -> Self {
30        Self {
31            created_epoch,
32            deleted_epoch: None,
33            created_by,
34        }
35    }
36
37    /// Marks this version as deleted.
38    pub fn mark_deleted(&mut self, epoch: EpochId) {
39        self.deleted_epoch = Some(epoch);
40    }
41
42    /// Checks if this version is visible at the given epoch.
43    #[must_use]
44    pub fn is_visible_at(&self, epoch: EpochId) -> bool {
45        // Visible if created before or at the viewing epoch
46        // and not deleted before the viewing epoch
47        if !self.created_epoch.is_visible_at(epoch) {
48            return false;
49        }
50
51        if let Some(deleted) = self.deleted_epoch {
52            // Not visible if deleted at or before the viewing epoch
53            deleted.as_u64() > epoch.as_u64()
54        } else {
55            true
56        }
57    }
58
59    /// Checks if this version is visible to a specific transaction.
60    ///
61    /// A version is visible to a transaction if:
62    /// 1. It was created by the same transaction, OR
63    /// 2. It was created in an epoch before the transaction's start epoch
64    ///    and not deleted before that epoch
65    #[must_use]
66    pub fn is_visible_to(&self, viewing_epoch: EpochId, viewing_tx: TxId) -> bool {
67        // Own modifications are always visible
68        if self.created_by == viewing_tx {
69            return self.deleted_epoch.is_none();
70        }
71
72        // Otherwise, use epoch-based visibility
73        self.is_visible_at(viewing_epoch)
74    }
75}
76
77/// A single version of data.
78#[derive(Debug, Clone)]
79pub struct Version<T> {
80    /// Visibility metadata.
81    pub info: VersionInfo,
82    /// The actual data.
83    pub data: T,
84}
85
86impl<T> Version<T> {
87    /// Creates a new version.
88    #[must_use]
89    pub fn new(data: T, created_epoch: EpochId, created_by: TxId) -> Self {
90        Self {
91            info: VersionInfo::new(created_epoch, created_by),
92            data,
93        }
94    }
95}
96
97/// All versions of a single entity, newest first.
98///
99/// Each node/edge has one of these tracking its version history. Use
100/// [`visible_at()`](Self::visible_at) to get the version at a specific epoch,
101/// or [`visible_to()`](Self::visible_to) for transaction-aware visibility.
102#[derive(Debug, Clone)]
103pub struct VersionChain<T> {
104    /// Versions ordered newest-first.
105    versions: VecDeque<Version<T>>,
106}
107
108impl<T> VersionChain<T> {
109    /// Creates a new empty version chain.
110    #[must_use]
111    pub fn new() -> Self {
112        Self {
113            versions: VecDeque::new(),
114        }
115    }
116
117    /// Creates a version chain with an initial version.
118    #[must_use]
119    pub fn with_initial(data: T, created_epoch: EpochId, created_by: TxId) -> Self {
120        let mut chain = Self::new();
121        chain.add_version(data, created_epoch, created_by);
122        chain
123    }
124
125    /// Adds a new version to the chain.
126    ///
127    /// The new version becomes the head of the chain.
128    pub fn add_version(&mut self, data: T, created_epoch: EpochId, created_by: TxId) {
129        let version = Version::new(data, created_epoch, created_by);
130        self.versions.push_front(version);
131    }
132
133    /// Finds the version visible at the given epoch.
134    ///
135    /// Returns a reference to the visible version's data, or `None` if no version
136    /// is visible at that epoch.
137    #[must_use]
138    pub fn visible_at(&self, epoch: EpochId) -> Option<&T> {
139        self.versions
140            .iter()
141            .find(|v| v.info.is_visible_at(epoch))
142            .map(|v| &v.data)
143    }
144
145    /// Finds the version visible to a specific transaction.
146    ///
147    /// This considers both the transaction's epoch and its own uncommitted changes.
148    #[must_use]
149    pub fn visible_to(&self, epoch: EpochId, tx: TxId) -> Option<&T> {
150        self.versions
151            .iter()
152            .find(|v| v.info.is_visible_to(epoch, tx))
153            .map(|v| &v.data)
154    }
155
156    /// Marks the current visible version as deleted.
157    ///
158    /// Returns `true` if a version was marked, `false` if no visible version exists.
159    pub fn mark_deleted(&mut self, delete_epoch: EpochId) -> bool {
160        for version in &mut self.versions {
161            if version.info.deleted_epoch.is_none() {
162                version.info.mark_deleted(delete_epoch);
163                return true;
164            }
165        }
166        false
167    }
168
169    /// Checks if any version was modified by the given transaction.
170    #[must_use]
171    pub fn modified_by(&self, tx: TxId) -> bool {
172        self.versions.iter().any(|v| v.info.created_by == tx)
173    }
174
175    /// Removes all versions created by the given transaction.
176    ///
177    /// Used for rollback to discard uncommitted changes.
178    pub fn remove_versions_by(&mut self, tx: TxId) {
179        self.versions.retain(|v| v.info.created_by != tx);
180    }
181
182    /// Checks if there's a concurrent modification conflict.
183    ///
184    /// A conflict exists if another transaction modified this entity
185    /// after our start epoch.
186    #[must_use]
187    pub fn has_conflict(&self, start_epoch: EpochId, our_tx: TxId) -> bool {
188        self.versions.iter().any(|v| {
189            v.info.created_by != our_tx && v.info.created_epoch.as_u64() > start_epoch.as_u64()
190        })
191    }
192
193    /// Returns the number of versions in the chain.
194    #[must_use]
195    pub fn version_count(&self) -> usize {
196        self.versions.len()
197    }
198
199    /// Returns true if the chain has no versions.
200    #[must_use]
201    pub fn is_empty(&self) -> bool {
202        self.versions.is_empty()
203    }
204
205    /// Garbage collects old versions that are no longer visible to any transaction.
206    ///
207    /// Keeps versions that might still be visible to transactions at or after `min_epoch`.
208    pub fn gc(&mut self, min_epoch: EpochId) {
209        if self.versions.is_empty() {
210            return;
211        }
212
213        let mut keep_count = 0;
214        let mut found_old_visible = false;
215
216        for (i, version) in self.versions.iter().enumerate() {
217            if version.info.created_epoch.as_u64() >= min_epoch.as_u64() {
218                keep_count = i + 1;
219            } else if !found_old_visible {
220                // Keep the first (most recent) old version
221                found_old_visible = true;
222                keep_count = i + 1;
223            }
224        }
225
226        self.versions.truncate(keep_count);
227    }
228
229    /// Returns a reference to the latest version's data regardless of visibility.
230    #[must_use]
231    pub fn latest(&self) -> Option<&T> {
232        self.versions.front().map(|v| &v.data)
233    }
234
235    /// Returns a mutable reference to the latest version's data.
236    #[must_use]
237    pub fn latest_mut(&mut self) -> Option<&mut T> {
238        self.versions.front_mut().map(|v| &mut v.data)
239    }
240}
241
242impl<T> Default for VersionChain<T> {
243    fn default() -> Self {
244        Self::new()
245    }
246}
247
248impl<T: Clone> VersionChain<T> {
249    /// Gets a mutable reference to the visible version's data for modification.
250    ///
251    /// If the version is not owned by this transaction, creates a new version
252    /// with a copy of the data.
253    pub fn get_mut(&mut self, epoch: EpochId, tx: TxId, modify_epoch: EpochId) -> Option<&mut T> {
254        // Find the visible version
255        let visible_idx = self
256            .versions
257            .iter()
258            .position(|v| v.info.is_visible_to(epoch, tx))?;
259
260        let visible = &self.versions[visible_idx];
261
262        if visible.info.created_by == tx {
263            // Already our version, modify in place
264            Some(&mut self.versions[visible_idx].data)
265        } else {
266            // Create a new version with copied data
267            let new_data = visible.data.clone();
268            self.add_version(new_data, modify_epoch, tx);
269            Some(&mut self.versions[0].data)
270        }
271    }
272}
273
274// ============================================================================
275// Tiered Storage Types (Phase 13)
276// ============================================================================
277//
278// These types support the tiered hot/cold storage architecture where version
279// metadata is stored separately from version data. Data lives in arenas (hot)
280// or compressed epoch blocks (cold), while VersionIndex holds lightweight refs.
281
282/// Compact representation of an optional epoch ID.
283///
284/// Uses `u32::MAX` as sentinel for `None`, allowing epochs up to ~4 billion.
285/// This saves 4 bytes compared to `Option<EpochId>` due to niche optimization.
286#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
287#[repr(transparent)]
288#[cfg(feature = "tiered-storage")]
289pub struct OptionalEpochId(u32);
290
291#[cfg(feature = "tiered-storage")]
292impl OptionalEpochId {
293    /// Represents no epoch (deleted_epoch = None).
294    pub const NONE: Self = Self(u32::MAX);
295
296    /// Creates an `OptionalEpochId` from an epoch.
297    ///
298    /// # Panics
299    /// Debug-panics if epoch exceeds u32::MAX - 1.
300    #[must_use]
301    pub fn some(epoch: EpochId) -> Self {
302        debug_assert!(
303            epoch.as_u64() < u64::from(u32::MAX),
304            "epoch {} exceeds OptionalEpochId capacity",
305            epoch.as_u64()
306        );
307        Self(epoch.as_u64() as u32)
308    }
309
310    /// Returns the contained epoch, or `None` if this is `NONE`.
311    #[must_use]
312    pub fn get(self) -> Option<EpochId> {
313        if self.0 == u32::MAX {
314            None
315        } else {
316            Some(EpochId::new(u64::from(self.0)))
317        }
318    }
319
320    /// Returns `true` if this contains an epoch.
321    #[must_use]
322    pub fn is_some(self) -> bool {
323        self.0 != u32::MAX
324    }
325
326    /// Returns `true` if this is `NONE`.
327    #[must_use]
328    pub fn is_none(self) -> bool {
329        self.0 == u32::MAX
330    }
331}
332
333/// Reference to a hot (arena-allocated) version.
334///
335/// Hot versions are stored in the epoch's arena and can be accessed directly.
336/// This struct only holds metadata; the actual data lives in the arena.
337///
338/// # Memory Layout
339/// - `epoch`: 8 bytes
340/// - `arena_offset`: 4 bytes
341/// - `created_by`: 8 bytes
342/// - `deleted_epoch`: 4 bytes
343/// - Total: 24 bytes
344#[derive(Debug, Clone, Copy, PartialEq, Eq)]
345#[cfg(feature = "tiered-storage")]
346pub struct HotVersionRef {
347    /// Epoch when this version was created.
348    pub epoch: EpochId,
349    /// Offset within the epoch's arena where the data is stored.
350    pub arena_offset: u32,
351    /// Transaction that created this version.
352    pub created_by: TxId,
353    /// Epoch when this version was deleted (NONE if still alive).
354    pub deleted_epoch: OptionalEpochId,
355}
356
357#[cfg(feature = "tiered-storage")]
358impl HotVersionRef {
359    /// Creates a new hot version reference.
360    #[must_use]
361    pub fn new(epoch: EpochId, arena_offset: u32, created_by: TxId) -> Self {
362        Self {
363            epoch,
364            arena_offset,
365            created_by,
366            deleted_epoch: OptionalEpochId::NONE,
367        }
368    }
369
370    /// Checks if this version is visible at the given epoch.
371    #[must_use]
372    pub fn is_visible_at(&self, viewing_epoch: EpochId) -> bool {
373        // Must be created at or before the viewing epoch
374        if !self.epoch.is_visible_at(viewing_epoch) {
375            return false;
376        }
377        // Must not be deleted at or before the viewing epoch
378        match self.deleted_epoch.get() {
379            Some(deleted) => deleted.as_u64() > viewing_epoch.as_u64(),
380            None => true,
381        }
382    }
383
384    /// Checks if this version is visible to a specific transaction.
385    #[must_use]
386    pub fn is_visible_to(&self, viewing_epoch: EpochId, viewing_tx: TxId) -> bool {
387        // Own modifications are always visible (if not deleted by self)
388        if self.created_by == viewing_tx {
389            return self.deleted_epoch.is_none();
390        }
391        // Otherwise use epoch-based visibility
392        self.is_visible_at(viewing_epoch)
393    }
394}
395
396/// Reference to a cold (compressed) version.
397///
398/// Cold versions are stored in compressed epoch blocks. This is a placeholder
399/// for Phase 14 - the actual compression logic will be implemented there.
400///
401/// # Memory Layout
402/// - `epoch`: 8 bytes
403/// - `block_offset`: 4 bytes
404/// - `length`: 2 bytes
405/// - `created_by`: 8 bytes
406/// - `deleted_epoch`: 4 bytes
407/// - Total: 26 bytes (+ 6 padding = 32 bytes aligned)
408#[derive(Debug, Clone, Copy, PartialEq, Eq)]
409#[cfg(feature = "tiered-storage")]
410pub struct ColdVersionRef {
411    /// Epoch when this version was created.
412    pub epoch: EpochId,
413    /// Offset within the compressed epoch block.
414    pub block_offset: u32,
415    /// Compressed length in bytes.
416    pub length: u16,
417    /// Transaction that created this version.
418    pub created_by: TxId,
419    /// Epoch when this version was deleted.
420    pub deleted_epoch: OptionalEpochId,
421}
422
423#[cfg(feature = "tiered-storage")]
424impl ColdVersionRef {
425    /// Checks if this version is visible at the given epoch.
426    #[must_use]
427    pub fn is_visible_at(&self, viewing_epoch: EpochId) -> bool {
428        if !self.epoch.is_visible_at(viewing_epoch) {
429            return false;
430        }
431        match self.deleted_epoch.get() {
432            Some(deleted) => deleted.as_u64() > viewing_epoch.as_u64(),
433            None => true,
434        }
435    }
436
437    /// Checks if this version is visible to a specific transaction.
438    #[must_use]
439    pub fn is_visible_to(&self, viewing_epoch: EpochId, viewing_tx: TxId) -> bool {
440        if self.created_by == viewing_tx {
441            return self.deleted_epoch.is_none();
442        }
443        self.is_visible_at(viewing_epoch)
444    }
445}
446
447/// Unified reference to either a hot or cold version.
448#[derive(Debug, Clone, Copy)]
449#[cfg(feature = "tiered-storage")]
450pub enum VersionRef {
451    /// Version data is in arena (hot tier).
452    Hot(HotVersionRef),
453    /// Version data is in compressed storage (cold tier).
454    Cold(ColdVersionRef),
455}
456
457#[cfg(feature = "tiered-storage")]
458impl VersionRef {
459    /// Returns the epoch when this version was created.
460    #[must_use]
461    pub fn epoch(&self) -> EpochId {
462        match self {
463            Self::Hot(h) => h.epoch,
464            Self::Cold(c) => c.epoch,
465        }
466    }
467
468    /// Returns the transaction that created this version.
469    #[must_use]
470    pub fn created_by(&self) -> TxId {
471        match self {
472            Self::Hot(h) => h.created_by,
473            Self::Cold(c) => c.created_by,
474        }
475    }
476
477    /// Returns `true` if this is a hot version.
478    #[must_use]
479    pub fn is_hot(&self) -> bool {
480        matches!(self, Self::Hot(_))
481    }
482
483    /// Returns `true` if this is a cold version.
484    #[must_use]
485    pub fn is_cold(&self) -> bool {
486        matches!(self, Self::Cold(_))
487    }
488}
489
490/// Tiered version index - replaces `VersionChain<T>` for hot/cold storage.
491///
492/// Instead of storing data inline, `VersionIndex` holds lightweight references
493/// to data stored in arenas (hot) or compressed blocks (cold). This enables:
494///
495/// - **No heap allocation** for typical 1-2 version case (SmallVec inline)
496/// - **Separation of metadata and data** for compression
497/// - **Fast visibility checks** via cached `latest_epoch`
498/// - **O(1) epoch drops** instead of per-version deallocation
499///
500/// # Memory Layout
501/// - `hot`: SmallVec<[HotVersionRef; 2]> ≈ 56 bytes inline
502/// - `cold`: SmallVec<[ColdVersionRef; 4]> ≈ 136 bytes inline
503/// - `latest_epoch`: 8 bytes
504/// - Total: ~200 bytes, no heap for typical case
505#[derive(Debug, Clone)]
506#[cfg(feature = "tiered-storage")]
507pub struct VersionIndex {
508    /// Hot versions in arena storage (most recent first).
509    hot: SmallVec<[HotVersionRef; 2]>,
510    /// Cold versions in compressed storage (most recent first).
511    cold: SmallVec<[ColdVersionRef; 4]>,
512    /// Cached epoch of the latest version for fast staleness checks.
513    latest_epoch: EpochId,
514}
515
516#[cfg(feature = "tiered-storage")]
517impl VersionIndex {
518    /// Creates a new empty version index.
519    #[must_use]
520    pub fn new() -> Self {
521        Self {
522            hot: SmallVec::new(),
523            cold: SmallVec::new(),
524            latest_epoch: EpochId::INITIAL,
525        }
526    }
527
528    /// Creates a version index with an initial hot version.
529    #[must_use]
530    pub fn with_initial(hot_ref: HotVersionRef) -> Self {
531        let mut index = Self::new();
532        index.add_hot(hot_ref);
533        index
534    }
535
536    /// Adds a new hot version (becomes the latest).
537    pub fn add_hot(&mut self, hot_ref: HotVersionRef) {
538        // Insert at front (most recent first)
539        self.hot.insert(0, hot_ref);
540        self.latest_epoch = hot_ref.epoch;
541    }
542
543    /// Returns the latest epoch for quick staleness checks.
544    #[must_use]
545    pub fn latest_epoch(&self) -> EpochId {
546        self.latest_epoch
547    }
548
549    /// Returns `true` if this entity has no versions.
550    #[must_use]
551    pub fn is_empty(&self) -> bool {
552        self.hot.is_empty() && self.cold.is_empty()
553    }
554
555    /// Returns the total version count (hot + cold).
556    #[must_use]
557    pub fn version_count(&self) -> usize {
558        self.hot.len() + self.cold.len()
559    }
560
561    /// Returns the number of hot versions.
562    #[must_use]
563    pub fn hot_count(&self) -> usize {
564        self.hot.len()
565    }
566
567    /// Returns the number of cold versions.
568    #[must_use]
569    pub fn cold_count(&self) -> usize {
570        self.cold.len()
571    }
572
573    /// Finds the version visible at the given epoch.
574    #[must_use]
575    pub fn visible_at(&self, epoch: EpochId) -> Option<VersionRef> {
576        // Check hot versions first (most recent first, likely case)
577        for v in &self.hot {
578            if v.is_visible_at(epoch) {
579                return Some(VersionRef::Hot(*v));
580            }
581        }
582        // Fall back to cold versions
583        for v in &self.cold {
584            if v.is_visible_at(epoch) {
585                return Some(VersionRef::Cold(*v));
586            }
587        }
588        None
589    }
590
591    /// Finds the version visible to a specific transaction.
592    #[must_use]
593    pub fn visible_to(&self, epoch: EpochId, tx: TxId) -> Option<VersionRef> {
594        // Check hot versions first
595        for v in &self.hot {
596            if v.is_visible_to(epoch, tx) {
597                return Some(VersionRef::Hot(*v));
598            }
599        }
600        // Fall back to cold versions
601        for v in &self.cold {
602            if v.is_visible_to(epoch, tx) {
603                return Some(VersionRef::Cold(*v));
604            }
605        }
606        None
607    }
608
609    /// Marks the currently visible version as deleted at the given epoch.
610    ///
611    /// Returns `true` if a version was marked, `false` if no visible version exists.
612    pub fn mark_deleted(&mut self, delete_epoch: EpochId) -> bool {
613        // Find the first non-deleted hot version and mark it
614        for v in &mut self.hot {
615            if v.deleted_epoch.is_none() {
616                v.deleted_epoch = OptionalEpochId::some(delete_epoch);
617                return true;
618            }
619        }
620        // Check cold versions (rare case)
621        for v in &mut self.cold {
622            if v.deleted_epoch.is_none() {
623                v.deleted_epoch = OptionalEpochId::some(delete_epoch);
624                return true;
625            }
626        }
627        false
628    }
629
630    /// Checks if any version was created by the given transaction.
631    #[must_use]
632    pub fn modified_by(&self, tx: TxId) -> bool {
633        self.hot.iter().any(|v| v.created_by == tx) || self.cold.iter().any(|v| v.created_by == tx)
634    }
635
636    /// Removes all versions created by the given transaction (for rollback).
637    pub fn remove_versions_by(&mut self, tx: TxId) {
638        self.hot.retain(|v| v.created_by != tx);
639        self.cold.retain(|v| v.created_by != tx);
640        self.recalculate_latest_epoch();
641    }
642
643    /// Checks for write conflict with concurrent transaction.
644    ///
645    /// A conflict exists if another transaction modified this entity
646    /// after our start epoch.
647    #[must_use]
648    pub fn has_conflict(&self, start_epoch: EpochId, our_tx: TxId) -> bool {
649        self.hot
650            .iter()
651            .any(|v| v.created_by != our_tx && v.epoch.as_u64() > start_epoch.as_u64())
652            || self
653                .cold
654                .iter()
655                .any(|v| v.created_by != our_tx && v.epoch.as_u64() > start_epoch.as_u64())
656    }
657
658    /// Garbage collects old versions not needed by any active transaction.
659    ///
660    /// Keeps versions that might still be visible to transactions at or after `min_epoch`.
661    pub fn gc(&mut self, min_epoch: EpochId) {
662        if self.is_empty() {
663            return;
664        }
665
666        // Keep versions that:
667        // 1. Were created at or after min_epoch
668        // 2. The first (most recent) version created before min_epoch
669        let mut found_old_visible = false;
670
671        self.hot.retain(|v| {
672            if v.epoch.as_u64() >= min_epoch.as_u64() {
673                true
674            } else if !found_old_visible {
675                found_old_visible = true;
676                true
677            } else {
678                false
679            }
680        });
681
682        // Same for cold, but only if we haven't found an old visible in hot
683        if !found_old_visible {
684            self.cold.retain(|v| {
685                if v.epoch.as_u64() >= min_epoch.as_u64() {
686                    true
687                } else if !found_old_visible {
688                    found_old_visible = true;
689                    true
690                } else {
691                    false
692                }
693            });
694        } else {
695            // All cold versions are older, only keep those >= min_epoch
696            self.cold.retain(|v| v.epoch.as_u64() >= min_epoch.as_u64());
697        }
698    }
699
700    /// Returns a reference to the latest version regardless of visibility.
701    #[must_use]
702    pub fn latest(&self) -> Option<VersionRef> {
703        self.hot
704            .first()
705            .map(|v| VersionRef::Hot(*v))
706            .or_else(|| self.cold.first().map(|v| VersionRef::Cold(*v)))
707    }
708
709    /// Freezes hot versions for a given epoch into cold storage.
710    ///
711    /// This is called when an epoch is no longer needed by any active transaction.
712    /// The actual compression happens in Phase 14; for now this just moves refs.
713    pub fn freeze_epoch(
714        &mut self,
715        epoch: EpochId,
716        cold_refs: impl Iterator<Item = ColdVersionRef>,
717    ) {
718        // Remove hot refs for this epoch
719        self.hot.retain(|v| v.epoch != epoch);
720
721        // Add cold refs
722        self.cold.extend(cold_refs);
723
724        // Keep cold sorted by epoch (descending = most recent first)
725        self.cold
726            .sort_by(|a, b| b.epoch.as_u64().cmp(&a.epoch.as_u64()));
727
728        self.recalculate_latest_epoch();
729    }
730
731    /// Returns hot version refs for a specific epoch (for freeze operation).
732    pub fn hot_refs_for_epoch(&self, epoch: EpochId) -> impl Iterator<Item = &HotVersionRef> {
733        self.hot.iter().filter(move |v| v.epoch == epoch)
734    }
735
736    /// Returns `true` if the hot SmallVec has spilled to the heap.
737    #[must_use]
738    pub fn hot_spilled(&self) -> bool {
739        self.hot.spilled()
740    }
741
742    /// Returns `true` if the cold SmallVec has spilled to the heap.
743    #[must_use]
744    pub fn cold_spilled(&self) -> bool {
745        self.cold.spilled()
746    }
747
748    fn recalculate_latest_epoch(&mut self) {
749        self.latest_epoch = self
750            .hot
751            .first()
752            .map(|v| v.epoch)
753            .or_else(|| self.cold.first().map(|v| v.epoch))
754            .unwrap_or(EpochId::INITIAL);
755    }
756}
757
758#[cfg(feature = "tiered-storage")]
759impl Default for VersionIndex {
760    fn default() -> Self {
761        Self::new()
762    }
763}
764
765#[cfg(test)]
766mod tests {
767    use super::*;
768
769    #[test]
770    fn test_version_visibility() {
771        let v = VersionInfo::new(EpochId::new(5), TxId::new(1));
772
773        // Not visible before creation
774        assert!(!v.is_visible_at(EpochId::new(4)));
775
776        // Visible at creation epoch and after
777        assert!(v.is_visible_at(EpochId::new(5)));
778        assert!(v.is_visible_at(EpochId::new(10)));
779    }
780
781    #[test]
782    fn test_deleted_version_visibility() {
783        let mut v = VersionInfo::new(EpochId::new(5), TxId::new(1));
784        v.mark_deleted(EpochId::new(10));
785
786        // Visible between creation and deletion
787        assert!(v.is_visible_at(EpochId::new(5)));
788        assert!(v.is_visible_at(EpochId::new(9)));
789
790        // Not visible at or after deletion
791        assert!(!v.is_visible_at(EpochId::new(10)));
792        assert!(!v.is_visible_at(EpochId::new(15)));
793    }
794
795    #[test]
796    fn test_version_visibility_to_transaction() {
797        let v = VersionInfo::new(EpochId::new(5), TxId::new(1));
798
799        // Creator can see it even if viewing at earlier epoch
800        assert!(v.is_visible_to(EpochId::new(3), TxId::new(1)));
801
802        // Other transactions can only see it at or after creation epoch
803        assert!(!v.is_visible_to(EpochId::new(3), TxId::new(2)));
804        assert!(v.is_visible_to(EpochId::new(5), TxId::new(2)));
805    }
806
807    #[test]
808    fn test_version_chain_basic() {
809        let mut chain = VersionChain::with_initial("v1", EpochId::new(1), TxId::new(1));
810
811        // Should see v1 at epoch 1+
812        assert_eq!(chain.visible_at(EpochId::new(1)), Some(&"v1"));
813        assert_eq!(chain.visible_at(EpochId::new(0)), None);
814
815        // Add v2
816        chain.add_version("v2", EpochId::new(5), TxId::new(2));
817
818        // Should see v1 at epoch < 5, v2 at epoch >= 5
819        assert_eq!(chain.visible_at(EpochId::new(1)), Some(&"v1"));
820        assert_eq!(chain.visible_at(EpochId::new(4)), Some(&"v1"));
821        assert_eq!(chain.visible_at(EpochId::new(5)), Some(&"v2"));
822        assert_eq!(chain.visible_at(EpochId::new(10)), Some(&"v2"));
823    }
824
825    #[test]
826    fn test_version_chain_rollback() {
827        let mut chain = VersionChain::with_initial("v1", EpochId::new(1), TxId::new(1));
828        chain.add_version("v2", EpochId::new(5), TxId::new(2));
829        chain.add_version("v3", EpochId::new(6), TxId::new(2));
830
831        assert_eq!(chain.version_count(), 3);
832
833        // Rollback tx 2's changes
834        chain.remove_versions_by(TxId::new(2));
835
836        assert_eq!(chain.version_count(), 1);
837        assert_eq!(chain.visible_at(EpochId::new(10)), Some(&"v1"));
838    }
839
840    #[test]
841    fn test_version_chain_deletion() {
842        let mut chain = VersionChain::with_initial("v1", EpochId::new(1), TxId::new(1));
843
844        // Mark as deleted at epoch 5
845        assert!(chain.mark_deleted(EpochId::new(5)));
846
847        // Should see v1 before deletion, nothing after
848        assert_eq!(chain.visible_at(EpochId::new(4)), Some(&"v1"));
849        assert_eq!(chain.visible_at(EpochId::new(5)), None);
850        assert_eq!(chain.visible_at(EpochId::new(10)), None);
851    }
852}
853
854// ============================================================================
855// Tiered Storage Tests
856// ============================================================================
857
858#[cfg(all(test, feature = "tiered-storage"))]
859mod tiered_storage_tests {
860    use super::*;
861
862    #[test]
863    fn test_optional_epoch_id() {
864        // Test NONE
865        let none = OptionalEpochId::NONE;
866        assert!(none.is_none());
867        assert!(!none.is_some());
868        assert_eq!(none.get(), None);
869
870        // Test Some
871        let some = OptionalEpochId::some(EpochId::new(42));
872        assert!(some.is_some());
873        assert!(!some.is_none());
874        assert_eq!(some.get(), Some(EpochId::new(42)));
875
876        // Test zero epoch
877        let zero = OptionalEpochId::some(EpochId::new(0));
878        assert!(zero.is_some());
879        assert_eq!(zero.get(), Some(EpochId::new(0)));
880    }
881
882    #[test]
883    fn test_hot_version_ref_visibility() {
884        let hot = HotVersionRef::new(EpochId::new(5), 100, TxId::new(1));
885
886        // Not visible before creation
887        assert!(!hot.is_visible_at(EpochId::new(4)));
888
889        // Visible at creation and after
890        assert!(hot.is_visible_at(EpochId::new(5)));
891        assert!(hot.is_visible_at(EpochId::new(10)));
892    }
893
894    #[test]
895    fn test_hot_version_ref_deleted_visibility() {
896        let mut hot = HotVersionRef::new(EpochId::new(5), 100, TxId::new(1));
897        hot.deleted_epoch = OptionalEpochId::some(EpochId::new(10));
898
899        // Visible between creation and deletion
900        assert!(hot.is_visible_at(EpochId::new(5)));
901        assert!(hot.is_visible_at(EpochId::new(9)));
902
903        // Not visible at or after deletion
904        assert!(!hot.is_visible_at(EpochId::new(10)));
905        assert!(!hot.is_visible_at(EpochId::new(15)));
906    }
907
908    #[test]
909    fn test_hot_version_ref_transaction_visibility() {
910        let hot = HotVersionRef::new(EpochId::new(5), 100, TxId::new(1));
911
912        // Creator can see it even at earlier epoch
913        assert!(hot.is_visible_to(EpochId::new(3), TxId::new(1)));
914
915        // Other transactions can only see it at or after creation
916        assert!(!hot.is_visible_to(EpochId::new(3), TxId::new(2)));
917        assert!(hot.is_visible_to(EpochId::new(5), TxId::new(2)));
918    }
919
920    #[test]
921    fn test_version_index_basic() {
922        let hot = HotVersionRef::new(EpochId::new(1), 0, TxId::new(1));
923        let mut index = VersionIndex::with_initial(hot);
924
925        // Should see version at epoch 1+
926        assert!(index.visible_at(EpochId::new(1)).is_some());
927        assert!(index.visible_at(EpochId::new(0)).is_none());
928
929        // Add another version
930        let hot2 = HotVersionRef::new(EpochId::new(5), 100, TxId::new(2));
931        index.add_hot(hot2);
932
933        // Should see v1 at epoch < 5, v2 at epoch >= 5
934        let v1 = index.visible_at(EpochId::new(4)).unwrap();
935        assert!(matches!(v1, VersionRef::Hot(h) if h.arena_offset == 0));
936
937        let v2 = index.visible_at(EpochId::new(5)).unwrap();
938        assert!(matches!(v2, VersionRef::Hot(h) if h.arena_offset == 100));
939    }
940
941    #[test]
942    fn test_version_index_deletion() {
943        let hot = HotVersionRef::new(EpochId::new(1), 0, TxId::new(1));
944        let mut index = VersionIndex::with_initial(hot);
945
946        // Mark as deleted at epoch 5
947        assert!(index.mark_deleted(EpochId::new(5)));
948
949        // Should see version before deletion, nothing after
950        assert!(index.visible_at(EpochId::new(4)).is_some());
951        assert!(index.visible_at(EpochId::new(5)).is_none());
952        assert!(index.visible_at(EpochId::new(10)).is_none());
953    }
954
955    #[test]
956    fn test_version_index_transaction_visibility() {
957        let tx = TxId::new(10);
958        let hot = HotVersionRef::new(EpochId::new(5), 0, tx);
959        let index = VersionIndex::with_initial(hot);
960
961        // Creator can see it even at earlier epoch
962        assert!(index.visible_to(EpochId::new(3), tx).is_some());
963
964        // Other transactions cannot see it before creation
965        assert!(index.visible_to(EpochId::new(3), TxId::new(20)).is_none());
966        assert!(index.visible_to(EpochId::new(5), TxId::new(20)).is_some());
967    }
968
969    #[test]
970    fn test_version_index_rollback() {
971        let tx1 = TxId::new(10);
972        let tx2 = TxId::new(20);
973
974        let mut index = VersionIndex::new();
975        index.add_hot(HotVersionRef::new(EpochId::new(1), 0, tx1));
976        index.add_hot(HotVersionRef::new(EpochId::new(2), 100, tx2));
977        index.add_hot(HotVersionRef::new(EpochId::new(3), 200, tx2));
978
979        assert_eq!(index.version_count(), 3);
980        assert!(index.modified_by(tx1));
981        assert!(index.modified_by(tx2));
982
983        // Rollback tx2's changes
984        index.remove_versions_by(tx2);
985
986        assert_eq!(index.version_count(), 1);
987        assert!(index.modified_by(tx1));
988        assert!(!index.modified_by(tx2));
989
990        // Should only see tx1's version
991        let v = index.visible_at(EpochId::new(10)).unwrap();
992        assert!(matches!(v, VersionRef::Hot(h) if h.created_by == tx1));
993    }
994
995    #[test]
996    fn test_version_index_gc() {
997        let mut index = VersionIndex::new();
998
999        // Add versions at epochs 1, 3, 5
1000        for epoch in [1, 3, 5] {
1001            index.add_hot(HotVersionRef::new(
1002                EpochId::new(epoch),
1003                epoch as u32 * 100,
1004                TxId::new(epoch),
1005            ));
1006        }
1007
1008        assert_eq!(index.version_count(), 3);
1009
1010        // GC with min_epoch = 4
1011        // Should keep: epoch 5 (>= 4) and epoch 3 (first old visible)
1012        index.gc(EpochId::new(4));
1013
1014        assert_eq!(index.version_count(), 2);
1015
1016        // Verify we kept epochs 5 and 3
1017        assert!(index.visible_at(EpochId::new(5)).is_some());
1018        assert!(index.visible_at(EpochId::new(3)).is_some());
1019    }
1020
1021    #[test]
1022    fn test_version_index_conflict_detection() {
1023        let tx1 = TxId::new(10);
1024        let tx2 = TxId::new(20);
1025
1026        let mut index = VersionIndex::new();
1027        index.add_hot(HotVersionRef::new(EpochId::new(1), 0, tx1));
1028        index.add_hot(HotVersionRef::new(EpochId::new(5), 100, tx2));
1029
1030        // tx1 started at epoch 0, tx2 modified at epoch 5 -> conflict for tx1
1031        assert!(index.has_conflict(EpochId::new(0), tx1));
1032
1033        // tx2 started at epoch 0, tx1 modified at epoch 1 -> also conflict for tx2
1034        assert!(index.has_conflict(EpochId::new(0), tx2));
1035
1036        // tx1 started after tx2's modification -> no conflict
1037        assert!(!index.has_conflict(EpochId::new(5), tx1));
1038
1039        // tx2 started after tx1's modification -> no conflict
1040        assert!(!index.has_conflict(EpochId::new(1), tx2));
1041
1042        // If only tx1's version exists, tx1 doesn't conflict with itself
1043        let mut index2 = VersionIndex::new();
1044        index2.add_hot(HotVersionRef::new(EpochId::new(5), 0, tx1));
1045        assert!(!index2.has_conflict(EpochId::new(0), tx1));
1046    }
1047
1048    #[test]
1049    fn test_version_index_smallvec_no_heap() {
1050        let mut index = VersionIndex::new();
1051
1052        // Add 2 hot versions (within inline capacity)
1053        for i in 0..2 {
1054            index.add_hot(HotVersionRef::new(EpochId::new(i), i as u32, TxId::new(i)));
1055        }
1056
1057        // SmallVec should not have spilled to heap
1058        assert!(!index.hot_spilled());
1059        assert!(!index.cold_spilled());
1060    }
1061
1062    #[test]
1063    fn test_version_index_freeze_epoch() {
1064        let mut index = VersionIndex::new();
1065        index.add_hot(HotVersionRef::new(EpochId::new(1), 0, TxId::new(1)));
1066        index.add_hot(HotVersionRef::new(EpochId::new(2), 100, TxId::new(2)));
1067
1068        assert_eq!(index.hot_count(), 2);
1069        assert_eq!(index.cold_count(), 0);
1070
1071        // Freeze epoch 1
1072        let cold_ref = ColdVersionRef {
1073            epoch: EpochId::new(1),
1074            block_offset: 0,
1075            length: 32,
1076            created_by: TxId::new(1),
1077            deleted_epoch: OptionalEpochId::NONE,
1078        };
1079        index.freeze_epoch(EpochId::new(1), std::iter::once(cold_ref));
1080
1081        // Hot should have 1, cold should have 1
1082        assert_eq!(index.hot_count(), 1);
1083        assert_eq!(index.cold_count(), 1);
1084
1085        // Visibility should still work
1086        assert!(index.visible_at(EpochId::new(1)).is_some());
1087        assert!(index.visible_at(EpochId::new(2)).is_some());
1088
1089        // Check that cold version is actually cold
1090        let v1 = index.visible_at(EpochId::new(1)).unwrap();
1091        assert!(v1.is_cold());
1092
1093        let v2 = index.visible_at(EpochId::new(2)).unwrap();
1094        assert!(v2.is_hot());
1095    }
1096
1097    #[test]
1098    fn test_version_ref_accessors() {
1099        let hot = HotVersionRef::new(EpochId::new(5), 100, TxId::new(10));
1100        let vr = VersionRef::Hot(hot);
1101
1102        assert_eq!(vr.epoch(), EpochId::new(5));
1103        assert_eq!(vr.created_by(), TxId::new(10));
1104        assert!(vr.is_hot());
1105        assert!(!vr.is_cold());
1106    }
1107
1108    #[test]
1109    fn test_version_index_latest_epoch() {
1110        let mut index = VersionIndex::new();
1111        assert_eq!(index.latest_epoch(), EpochId::INITIAL);
1112
1113        index.add_hot(HotVersionRef::new(EpochId::new(5), 0, TxId::new(1)));
1114        assert_eq!(index.latest_epoch(), EpochId::new(5));
1115
1116        index.add_hot(HotVersionRef::new(EpochId::new(10), 100, TxId::new(2)));
1117        assert_eq!(index.latest_epoch(), EpochId::new(10));
1118
1119        // After rollback, should recalculate
1120        index.remove_versions_by(TxId::new(2));
1121        assert_eq!(index.latest_epoch(), EpochId::new(5));
1122    }
1123
1124    #[test]
1125    fn test_version_index_default() {
1126        let index = VersionIndex::default();
1127        assert!(index.is_empty());
1128        assert_eq!(index.version_count(), 0);
1129    }
1130
1131    #[test]
1132    fn test_version_index_latest() {
1133        let mut index = VersionIndex::new();
1134        assert!(index.latest().is_none());
1135
1136        index.add_hot(HotVersionRef::new(EpochId::new(1), 0, TxId::new(1)));
1137        let latest = index.latest().unwrap();
1138        assert!(matches!(latest, VersionRef::Hot(h) if h.epoch == EpochId::new(1)));
1139
1140        index.add_hot(HotVersionRef::new(EpochId::new(5), 100, TxId::new(2)));
1141        let latest = index.latest().unwrap();
1142        assert!(matches!(latest, VersionRef::Hot(h) if h.epoch == EpochId::new(5)));
1143    }
1144}