Skip to main content

grafeo_common/
mvcc.rs

1//! MVCC (Multi-Version Concurrency Control) primitives.
2//!
3//! This is how Grafeo handles concurrent reads and writes without blocking.
4//! Each entity has a [`VersionChain`] that tracks all versions. Readers see
5//! consistent snapshots, writers create new versions, and old versions get
6//! garbage collected when no one needs them anymore.
7
8use std::collections::VecDeque;
9
10#[cfg(feature = "tiered-storage")]
11use smallvec::SmallVec;
12
13use crate::types::{EpochId, TxId};
14
15/// Tracks when a version was created and deleted for visibility checks.
16#[derive(Debug, Clone, Copy)]
17pub struct VersionInfo {
18    /// The epoch this version was created in.
19    pub created_epoch: EpochId,
20    /// The epoch this version was deleted in (if any).
21    pub deleted_epoch: Option<EpochId>,
22    /// The transaction that created this version.
23    pub created_by: TxId,
24}
25
26impl VersionInfo {
27    /// Creates a new version info.
28    #[must_use]
29    pub fn new(created_epoch: EpochId, created_by: TxId) -> Self {
30        Self {
31            created_epoch,
32            deleted_epoch: None,
33            created_by,
34        }
35    }
36
37    /// Marks this version as deleted.
38    pub fn mark_deleted(&mut self, epoch: EpochId) {
39        self.deleted_epoch = Some(epoch);
40    }
41
42    /// Checks if this version is visible at the given epoch.
43    #[inline]
44    #[must_use]
45    pub fn is_visible_at(&self, epoch: EpochId) -> bool {
46        // Visible if created before or at the viewing epoch
47        // and not deleted before the viewing epoch
48        if !self.created_epoch.is_visible_at(epoch) {
49            return false;
50        }
51
52        if let Some(deleted) = self.deleted_epoch {
53            // Not visible if deleted at or before the viewing epoch
54            deleted.as_u64() > epoch.as_u64()
55        } else {
56            true
57        }
58    }
59
60    /// Checks if this version is visible to a specific transaction.
61    ///
62    /// A version is visible to a transaction if:
63    /// 1. It was created by the same transaction, OR
64    /// 2. It was created in an epoch before the transaction's start epoch
65    ///    and not deleted before that epoch
66    #[inline]
67    #[must_use]
68    pub fn is_visible_to(&self, viewing_epoch: EpochId, viewing_tx: TxId) -> bool {
69        // Own modifications are always visible
70        if self.created_by == viewing_tx {
71            return self.deleted_epoch.is_none();
72        }
73
74        // Otherwise, use epoch-based visibility
75        self.is_visible_at(viewing_epoch)
76    }
77}
78
79/// A single version of data.
80#[derive(Debug, Clone)]
81pub struct Version<T> {
82    /// Visibility metadata.
83    pub info: VersionInfo,
84    /// The actual data.
85    pub data: T,
86}
87
88impl<T> Version<T> {
89    /// Creates a new version.
90    #[must_use]
91    pub fn new(data: T, created_epoch: EpochId, created_by: TxId) -> Self {
92        Self {
93            info: VersionInfo::new(created_epoch, created_by),
94            data,
95        }
96    }
97}
98
99/// All versions of a single entity, newest first.
100///
101/// Each node/edge has one of these tracking its version history. Use
102/// [`visible_at()`](Self::visible_at) to get the version at a specific epoch,
103/// or [`visible_to()`](Self::visible_to) for transaction-aware visibility.
104#[derive(Debug, Clone)]
105pub struct VersionChain<T> {
106    /// Versions ordered newest-first.
107    versions: VecDeque<Version<T>>,
108}
109
110impl<T> VersionChain<T> {
111    /// Creates a new empty version chain.
112    #[must_use]
113    pub fn new() -> Self {
114        Self {
115            versions: VecDeque::new(),
116        }
117    }
118
119    /// Creates a version chain with an initial version.
120    #[must_use]
121    pub fn with_initial(data: T, created_epoch: EpochId, created_by: TxId) -> Self {
122        let mut chain = Self::new();
123        chain.add_version(data, created_epoch, created_by);
124        chain
125    }
126
127    /// Adds a new version to the chain.
128    ///
129    /// The new version becomes the head of the chain.
130    pub fn add_version(&mut self, data: T, created_epoch: EpochId, created_by: TxId) {
131        let version = Version::new(data, created_epoch, created_by);
132        self.versions.push_front(version);
133    }
134
135    /// Finds the version visible at the given epoch.
136    ///
137    /// Returns a reference to the visible version's data, or `None` if no version
138    /// is visible at that epoch.
139    #[inline]
140    #[must_use]
141    pub fn visible_at(&self, epoch: EpochId) -> Option<&T> {
142        self.versions
143            .iter()
144            .find(|v| v.info.is_visible_at(epoch))
145            .map(|v| &v.data)
146    }
147
148    /// Finds the version visible to a specific transaction.
149    ///
150    /// This considers both the transaction's epoch and its own uncommitted changes.
151    #[inline]
152    #[must_use]
153    pub fn visible_to(&self, epoch: EpochId, tx: TxId) -> Option<&T> {
154        self.versions
155            .iter()
156            .find(|v| v.info.is_visible_to(epoch, tx))
157            .map(|v| &v.data)
158    }
159
160    /// Marks the current visible version as deleted.
161    ///
162    /// Returns `true` if a version was marked, `false` if no visible version exists.
163    pub fn mark_deleted(&mut self, delete_epoch: EpochId) -> bool {
164        for version in &mut self.versions {
165            if version.info.deleted_epoch.is_none() {
166                version.info.mark_deleted(delete_epoch);
167                return true;
168            }
169        }
170        false
171    }
172
173    /// Checks if any version was modified by the given transaction.
174    #[must_use]
175    pub fn modified_by(&self, tx: TxId) -> bool {
176        self.versions.iter().any(|v| v.info.created_by == tx)
177    }
178
179    /// Removes all versions created by the given transaction.
180    ///
181    /// Used for rollback to discard uncommitted changes.
182    pub fn remove_versions_by(&mut self, tx: TxId) {
183        self.versions.retain(|v| v.info.created_by != tx);
184    }
185
186    /// Checks if there's a concurrent modification conflict.
187    ///
188    /// A conflict exists if another transaction modified this entity
189    /// after our start epoch.
190    #[must_use]
191    pub fn has_conflict(&self, start_epoch: EpochId, our_tx: TxId) -> bool {
192        self.versions.iter().any(|v| {
193            v.info.created_by != our_tx && v.info.created_epoch.as_u64() > start_epoch.as_u64()
194        })
195    }
196
197    /// Returns the number of versions in the chain.
198    #[must_use]
199    pub fn version_count(&self) -> usize {
200        self.versions.len()
201    }
202
203    /// Returns true if the chain has no versions.
204    #[must_use]
205    pub fn is_empty(&self) -> bool {
206        self.versions.is_empty()
207    }
208
209    /// Garbage collects old versions that are no longer visible to any transaction.
210    ///
211    /// Keeps versions that might still be visible to transactions at or after `min_epoch`.
212    pub fn gc(&mut self, min_epoch: EpochId) {
213        if self.versions.is_empty() {
214            return;
215        }
216
217        let mut keep_count = 0;
218        let mut found_old_visible = false;
219
220        for (i, version) in self.versions.iter().enumerate() {
221            if version.info.created_epoch.as_u64() >= min_epoch.as_u64() {
222                keep_count = i + 1;
223            } else if !found_old_visible {
224                // Keep the first (most recent) old version
225                found_old_visible = true;
226                keep_count = i + 1;
227            }
228        }
229
230        self.versions.truncate(keep_count);
231    }
232
233    /// Returns a reference to the latest version's data regardless of visibility.
234    #[must_use]
235    pub fn latest(&self) -> Option<&T> {
236        self.versions.front().map(|v| &v.data)
237    }
238
239    /// Returns a mutable reference to the latest version's data.
240    #[must_use]
241    pub fn latest_mut(&mut self) -> Option<&mut T> {
242        self.versions.front_mut().map(|v| &mut v.data)
243    }
244}
245
246impl<T> Default for VersionChain<T> {
247    fn default() -> Self {
248        Self::new()
249    }
250}
251
252impl<T: Clone> VersionChain<T> {
253    /// Gets a mutable reference to the visible version's data for modification.
254    ///
255    /// If the version is not owned by this transaction, creates a new version
256    /// with a copy of the data.
257    pub fn get_mut(&mut self, epoch: EpochId, tx: TxId, modify_epoch: EpochId) -> Option<&mut T> {
258        // Find the visible version
259        let visible_idx = self
260            .versions
261            .iter()
262            .position(|v| v.info.is_visible_to(epoch, tx))?;
263
264        let visible = &self.versions[visible_idx];
265
266        if visible.info.created_by == tx {
267            // Already our version, modify in place
268            Some(&mut self.versions[visible_idx].data)
269        } else {
270            // Create a new version with copied data
271            let new_data = visible.data.clone();
272            self.add_version(new_data, modify_epoch, tx);
273            Some(&mut self.versions[0].data)
274        }
275    }
276}
277
278// ============================================================================
279// Tiered Storage Types (Phase 13)
280// ============================================================================
281//
282// These types support the tiered hot/cold storage architecture where version
283// metadata is stored separately from version data. Data lives in arenas (hot)
284// or compressed epoch blocks (cold), while VersionIndex holds lightweight refs.
285
286/// Compact representation of an optional epoch ID.
287///
288/// Uses `u32::MAX` as sentinel for `None`, allowing epochs up to ~4 billion.
289/// This saves 4 bytes compared to `Option<EpochId>` due to niche optimization.
290#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
291#[repr(transparent)]
292#[cfg(feature = "tiered-storage")]
293pub struct OptionalEpochId(u32);
294
295#[cfg(feature = "tiered-storage")]
296impl OptionalEpochId {
297    /// Represents no epoch (deleted_epoch = None).
298    pub const NONE: Self = Self(u32::MAX);
299
300    /// Creates an `OptionalEpochId` from an epoch.
301    ///
302    /// # Panics
303    /// Debug-panics if epoch exceeds u32::MAX - 1.
304    #[must_use]
305    pub fn some(epoch: EpochId) -> Self {
306        debug_assert!(
307            epoch.as_u64() < u64::from(u32::MAX),
308            "epoch {} exceeds OptionalEpochId capacity",
309            epoch.as_u64()
310        );
311        Self(epoch.as_u64() as u32)
312    }
313
314    /// Returns the contained epoch, or `None` if this is `NONE`.
315    #[inline]
316    #[must_use]
317    pub fn get(self) -> Option<EpochId> {
318        if self.0 == u32::MAX {
319            None
320        } else {
321            Some(EpochId::new(u64::from(self.0)))
322        }
323    }
324
325    /// Returns `true` if this contains an epoch.
326    #[must_use]
327    pub fn is_some(self) -> bool {
328        self.0 != u32::MAX
329    }
330
331    /// Returns `true` if this is `NONE`.
332    #[inline]
333    #[must_use]
334    pub fn is_none(self) -> bool {
335        self.0 == u32::MAX
336    }
337}
338
339/// Reference to a hot (arena-allocated) version.
340///
341/// Hot versions are stored in the epoch's arena and can be accessed directly.
342/// This struct only holds metadata; the actual data lives in the arena.
343///
344/// # Memory Layout
345/// - `epoch`: 8 bytes
346/// - `arena_offset`: 4 bytes
347/// - `created_by`: 8 bytes
348/// - `deleted_epoch`: 4 bytes
349/// - Total: 24 bytes
350#[derive(Debug, Clone, Copy, PartialEq, Eq)]
351#[cfg(feature = "tiered-storage")]
352pub struct HotVersionRef {
353    /// Epoch when this version was created.
354    pub epoch: EpochId,
355    /// Offset within the epoch's arena where the data is stored.
356    pub arena_offset: u32,
357    /// Transaction that created this version.
358    pub created_by: TxId,
359    /// Epoch when this version was deleted (NONE if still alive).
360    pub deleted_epoch: OptionalEpochId,
361}
362
363#[cfg(feature = "tiered-storage")]
364impl HotVersionRef {
365    /// Creates a new hot version reference.
366    #[must_use]
367    pub fn new(epoch: EpochId, arena_offset: u32, created_by: TxId) -> Self {
368        Self {
369            epoch,
370            arena_offset,
371            created_by,
372            deleted_epoch: OptionalEpochId::NONE,
373        }
374    }
375
376    /// Checks if this version is visible at the given epoch.
377    #[inline]
378    #[must_use]
379    pub fn is_visible_at(&self, viewing_epoch: EpochId) -> bool {
380        // Must be created at or before the viewing epoch
381        if !self.epoch.is_visible_at(viewing_epoch) {
382            return false;
383        }
384        // Must not be deleted at or before the viewing epoch
385        match self.deleted_epoch.get() {
386            Some(deleted) => deleted.as_u64() > viewing_epoch.as_u64(),
387            None => true,
388        }
389    }
390
391    /// Checks if this version is visible to a specific transaction.
392    #[inline]
393    #[must_use]
394    pub fn is_visible_to(&self, viewing_epoch: EpochId, viewing_tx: TxId) -> bool {
395        // Own modifications are always visible (if not deleted by self)
396        if self.created_by == viewing_tx {
397            return self.deleted_epoch.is_none();
398        }
399        // Otherwise use epoch-based visibility
400        self.is_visible_at(viewing_epoch)
401    }
402}
403
404/// Reference to a cold (compressed) version.
405///
406/// Cold versions are stored in compressed epoch blocks. This is a placeholder
407/// for Phase 14 - the actual compression logic will be implemented there.
408///
409/// # Memory Layout
410/// - `epoch`: 8 bytes
411/// - `block_offset`: 4 bytes
412/// - `length`: 2 bytes
413/// - `created_by`: 8 bytes
414/// - `deleted_epoch`: 4 bytes
415/// - Total: 26 bytes (+ 6 padding = 32 bytes aligned)
416#[derive(Debug, Clone, Copy, PartialEq, Eq)]
417#[cfg(feature = "tiered-storage")]
418pub struct ColdVersionRef {
419    /// Epoch when this version was created.
420    pub epoch: EpochId,
421    /// Offset within the compressed epoch block.
422    pub block_offset: u32,
423    /// Compressed length in bytes.
424    pub length: u16,
425    /// Transaction that created this version.
426    pub created_by: TxId,
427    /// Epoch when this version was deleted.
428    pub deleted_epoch: OptionalEpochId,
429}
430
431#[cfg(feature = "tiered-storage")]
432impl ColdVersionRef {
433    /// Checks if this version is visible at the given epoch.
434    #[inline]
435    #[must_use]
436    pub fn is_visible_at(&self, viewing_epoch: EpochId) -> bool {
437        if !self.epoch.is_visible_at(viewing_epoch) {
438            return false;
439        }
440        match self.deleted_epoch.get() {
441            Some(deleted) => deleted.as_u64() > viewing_epoch.as_u64(),
442            None => true,
443        }
444    }
445
446    /// Checks if this version is visible to a specific transaction.
447    #[inline]
448    #[must_use]
449    pub fn is_visible_to(&self, viewing_epoch: EpochId, viewing_tx: TxId) -> bool {
450        if self.created_by == viewing_tx {
451            return self.deleted_epoch.is_none();
452        }
453        self.is_visible_at(viewing_epoch)
454    }
455}
456
457/// Unified reference to either a hot or cold version.
458#[derive(Debug, Clone, Copy)]
459#[cfg(feature = "tiered-storage")]
460pub enum VersionRef {
461    /// Version data is in arena (hot tier).
462    Hot(HotVersionRef),
463    /// Version data is in compressed storage (cold tier).
464    Cold(ColdVersionRef),
465}
466
467#[cfg(feature = "tiered-storage")]
468impl VersionRef {
469    /// Returns the epoch when this version was created.
470    #[must_use]
471    pub fn epoch(&self) -> EpochId {
472        match self {
473            Self::Hot(h) => h.epoch,
474            Self::Cold(c) => c.epoch,
475        }
476    }
477
478    /// Returns the transaction that created this version.
479    #[must_use]
480    pub fn created_by(&self) -> TxId {
481        match self {
482            Self::Hot(h) => h.created_by,
483            Self::Cold(c) => c.created_by,
484        }
485    }
486
487    /// Returns `true` if this is a hot version.
488    #[must_use]
489    pub fn is_hot(&self) -> bool {
490        matches!(self, Self::Hot(_))
491    }
492
493    /// Returns `true` if this is a cold version.
494    #[must_use]
495    pub fn is_cold(&self) -> bool {
496        matches!(self, Self::Cold(_))
497    }
498}
499
500/// Tiered version index - replaces `VersionChain<T>` for hot/cold storage.
501///
502/// Instead of storing data inline, `VersionIndex` holds lightweight references
503/// to data stored in arenas (hot) or compressed blocks (cold). This enables:
504///
505/// - **No heap allocation** for typical 1-2 version case (SmallVec inline)
506/// - **Separation of metadata and data** for compression
507/// - **Fast visibility checks** via cached `latest_epoch`
508/// - **O(1) epoch drops** instead of per-version deallocation
509///
510/// # Memory Layout
511/// - `hot`: SmallVec<[HotVersionRef; 2]> ≈ 56 bytes inline
512/// - `cold`: SmallVec<[ColdVersionRef; 4]> ≈ 136 bytes inline
513/// - `latest_epoch`: 8 bytes
514/// - Total: ~200 bytes, no heap for typical case
515#[derive(Debug, Clone)]
516#[cfg(feature = "tiered-storage")]
517pub struct VersionIndex {
518    /// Hot versions in arena storage (most recent first).
519    hot: SmallVec<[HotVersionRef; 2]>,
520    /// Cold versions in compressed storage (most recent first).
521    cold: SmallVec<[ColdVersionRef; 4]>,
522    /// Cached epoch of the latest version for fast staleness checks.
523    latest_epoch: EpochId,
524}
525
526#[cfg(feature = "tiered-storage")]
527impl VersionIndex {
528    /// Creates a new empty version index.
529    #[must_use]
530    pub fn new() -> Self {
531        Self {
532            hot: SmallVec::new(),
533            cold: SmallVec::new(),
534            latest_epoch: EpochId::INITIAL,
535        }
536    }
537
538    /// Creates a version index with an initial hot version.
539    #[must_use]
540    pub fn with_initial(hot_ref: HotVersionRef) -> Self {
541        let mut index = Self::new();
542        index.add_hot(hot_ref);
543        index
544    }
545
546    /// Adds a new hot version (becomes the latest).
547    pub fn add_hot(&mut self, hot_ref: HotVersionRef) {
548        // Insert at front (most recent first)
549        self.hot.insert(0, hot_ref);
550        self.latest_epoch = hot_ref.epoch;
551    }
552
553    /// Returns the latest epoch for quick staleness checks.
554    #[must_use]
555    pub fn latest_epoch(&self) -> EpochId {
556        self.latest_epoch
557    }
558
559    /// Returns `true` if this entity has no versions.
560    #[must_use]
561    pub fn is_empty(&self) -> bool {
562        self.hot.is_empty() && self.cold.is_empty()
563    }
564
565    /// Returns the total version count (hot + cold).
566    #[must_use]
567    pub fn version_count(&self) -> usize {
568        self.hot.len() + self.cold.len()
569    }
570
571    /// Returns the number of hot versions.
572    #[must_use]
573    pub fn hot_count(&self) -> usize {
574        self.hot.len()
575    }
576
577    /// Returns the number of cold versions.
578    #[must_use]
579    pub fn cold_count(&self) -> usize {
580        self.cold.len()
581    }
582
583    /// Finds the version visible at the given epoch.
584    #[inline]
585    #[must_use]
586    pub fn visible_at(&self, epoch: EpochId) -> Option<VersionRef> {
587        // Check hot versions first (most recent first, likely case)
588        for v in &self.hot {
589            if v.is_visible_at(epoch) {
590                return Some(VersionRef::Hot(*v));
591            }
592        }
593        // Fall back to cold versions
594        for v in &self.cold {
595            if v.is_visible_at(epoch) {
596                return Some(VersionRef::Cold(*v));
597            }
598        }
599        None
600    }
601
602    /// Finds the version visible to a specific transaction.
603    #[inline]
604    #[must_use]
605    pub fn visible_to(&self, epoch: EpochId, tx: TxId) -> Option<VersionRef> {
606        // Check hot versions first
607        for v in &self.hot {
608            if v.is_visible_to(epoch, tx) {
609                return Some(VersionRef::Hot(*v));
610            }
611        }
612        // Fall back to cold versions
613        for v in &self.cold {
614            if v.is_visible_to(epoch, tx) {
615                return Some(VersionRef::Cold(*v));
616            }
617        }
618        None
619    }
620
621    /// Marks the currently visible version as deleted at the given epoch.
622    ///
623    /// Returns `true` if a version was marked, `false` if no visible version exists.
624    pub fn mark_deleted(&mut self, delete_epoch: EpochId) -> bool {
625        // Find the first non-deleted hot version and mark it
626        for v in &mut self.hot {
627            if v.deleted_epoch.is_none() {
628                v.deleted_epoch = OptionalEpochId::some(delete_epoch);
629                return true;
630            }
631        }
632        // Check cold versions (rare case)
633        for v in &mut self.cold {
634            if v.deleted_epoch.is_none() {
635                v.deleted_epoch = OptionalEpochId::some(delete_epoch);
636                return true;
637            }
638        }
639        false
640    }
641
642    /// Checks if any version was created by the given transaction.
643    #[must_use]
644    pub fn modified_by(&self, tx: TxId) -> bool {
645        self.hot.iter().any(|v| v.created_by == tx) || self.cold.iter().any(|v| v.created_by == tx)
646    }
647
648    /// Removes all versions created by the given transaction (for rollback).
649    pub fn remove_versions_by(&mut self, tx: TxId) {
650        self.hot.retain(|v| v.created_by != tx);
651        self.cold.retain(|v| v.created_by != tx);
652        self.recalculate_latest_epoch();
653    }
654
655    /// Checks for write conflict with concurrent transaction.
656    ///
657    /// A conflict exists if another transaction modified this entity
658    /// after our start epoch.
659    #[must_use]
660    pub fn has_conflict(&self, start_epoch: EpochId, our_tx: TxId) -> bool {
661        self.hot
662            .iter()
663            .any(|v| v.created_by != our_tx && v.epoch.as_u64() > start_epoch.as_u64())
664            || self
665                .cold
666                .iter()
667                .any(|v| v.created_by != our_tx && v.epoch.as_u64() > start_epoch.as_u64())
668    }
669
670    /// Garbage collects old versions not needed by any active transaction.
671    ///
672    /// Keeps versions that might still be visible to transactions at or after `min_epoch`.
673    pub fn gc(&mut self, min_epoch: EpochId) {
674        if self.is_empty() {
675            return;
676        }
677
678        // Keep versions that:
679        // 1. Were created at or after min_epoch
680        // 2. The first (most recent) version created before min_epoch
681        let mut found_old_visible = false;
682
683        self.hot.retain(|v| {
684            if v.epoch.as_u64() >= min_epoch.as_u64() {
685                true
686            } else if !found_old_visible {
687                found_old_visible = true;
688                true
689            } else {
690                false
691            }
692        });
693
694        // Same for cold, but only if we haven't found an old visible in hot
695        if !found_old_visible {
696            self.cold.retain(|v| {
697                if v.epoch.as_u64() >= min_epoch.as_u64() {
698                    true
699                } else if !found_old_visible {
700                    found_old_visible = true;
701                    true
702                } else {
703                    false
704                }
705            });
706        } else {
707            // All cold versions are older, only keep those >= min_epoch
708            self.cold.retain(|v| v.epoch.as_u64() >= min_epoch.as_u64());
709        }
710    }
711
712    /// Returns a reference to the latest version regardless of visibility.
713    #[must_use]
714    pub fn latest(&self) -> Option<VersionRef> {
715        self.hot
716            .first()
717            .map(|v| VersionRef::Hot(*v))
718            .or_else(|| self.cold.first().map(|v| VersionRef::Cold(*v)))
719    }
720
721    /// Freezes hot versions for a given epoch into cold storage.
722    ///
723    /// This is called when an epoch is no longer needed by any active transaction.
724    /// The actual compression happens in Phase 14; for now this just moves refs.
725    pub fn freeze_epoch(
726        &mut self,
727        epoch: EpochId,
728        cold_refs: impl Iterator<Item = ColdVersionRef>,
729    ) {
730        // Remove hot refs for this epoch
731        self.hot.retain(|v| v.epoch != epoch);
732
733        // Add cold refs
734        self.cold.extend(cold_refs);
735
736        // Keep cold sorted by epoch (descending = most recent first)
737        self.cold
738            .sort_by(|a, b| b.epoch.as_u64().cmp(&a.epoch.as_u64()));
739
740        self.recalculate_latest_epoch();
741    }
742
743    /// Returns hot version refs for a specific epoch (for freeze operation).
744    pub fn hot_refs_for_epoch(&self, epoch: EpochId) -> impl Iterator<Item = &HotVersionRef> {
745        self.hot.iter().filter(move |v| v.epoch == epoch)
746    }
747
748    /// Returns `true` if the hot SmallVec has spilled to the heap.
749    #[must_use]
750    pub fn hot_spilled(&self) -> bool {
751        self.hot.spilled()
752    }
753
754    /// Returns `true` if the cold SmallVec has spilled to the heap.
755    #[must_use]
756    pub fn cold_spilled(&self) -> bool {
757        self.cold.spilled()
758    }
759
760    fn recalculate_latest_epoch(&mut self) {
761        self.latest_epoch = self
762            .hot
763            .first()
764            .map(|v| v.epoch)
765            .or_else(|| self.cold.first().map(|v| v.epoch))
766            .unwrap_or(EpochId::INITIAL);
767    }
768}
769
770#[cfg(feature = "tiered-storage")]
771impl Default for VersionIndex {
772    fn default() -> Self {
773        Self::new()
774    }
775}
776
777#[cfg(test)]
778mod tests {
779    use super::*;
780
781    #[test]
782    fn test_version_visibility() {
783        let v = VersionInfo::new(EpochId::new(5), TxId::new(1));
784
785        // Not visible before creation
786        assert!(!v.is_visible_at(EpochId::new(4)));
787
788        // Visible at creation epoch and after
789        assert!(v.is_visible_at(EpochId::new(5)));
790        assert!(v.is_visible_at(EpochId::new(10)));
791    }
792
793    #[test]
794    fn test_deleted_version_visibility() {
795        let mut v = VersionInfo::new(EpochId::new(5), TxId::new(1));
796        v.mark_deleted(EpochId::new(10));
797
798        // Visible between creation and deletion
799        assert!(v.is_visible_at(EpochId::new(5)));
800        assert!(v.is_visible_at(EpochId::new(9)));
801
802        // Not visible at or after deletion
803        assert!(!v.is_visible_at(EpochId::new(10)));
804        assert!(!v.is_visible_at(EpochId::new(15)));
805    }
806
807    #[test]
808    fn test_version_visibility_to_transaction() {
809        let v = VersionInfo::new(EpochId::new(5), TxId::new(1));
810
811        // Creator can see it even if viewing at earlier epoch
812        assert!(v.is_visible_to(EpochId::new(3), TxId::new(1)));
813
814        // Other transactions can only see it at or after creation epoch
815        assert!(!v.is_visible_to(EpochId::new(3), TxId::new(2)));
816        assert!(v.is_visible_to(EpochId::new(5), TxId::new(2)));
817    }
818
819    #[test]
820    fn test_version_chain_basic() {
821        let mut chain = VersionChain::with_initial("v1", EpochId::new(1), TxId::new(1));
822
823        // Should see v1 at epoch 1+
824        assert_eq!(chain.visible_at(EpochId::new(1)), Some(&"v1"));
825        assert_eq!(chain.visible_at(EpochId::new(0)), None);
826
827        // Add v2
828        chain.add_version("v2", EpochId::new(5), TxId::new(2));
829
830        // Should see v1 at epoch < 5, v2 at epoch >= 5
831        assert_eq!(chain.visible_at(EpochId::new(1)), Some(&"v1"));
832        assert_eq!(chain.visible_at(EpochId::new(4)), Some(&"v1"));
833        assert_eq!(chain.visible_at(EpochId::new(5)), Some(&"v2"));
834        assert_eq!(chain.visible_at(EpochId::new(10)), Some(&"v2"));
835    }
836
837    #[test]
838    fn test_version_chain_rollback() {
839        let mut chain = VersionChain::with_initial("v1", EpochId::new(1), TxId::new(1));
840        chain.add_version("v2", EpochId::new(5), TxId::new(2));
841        chain.add_version("v3", EpochId::new(6), TxId::new(2));
842
843        assert_eq!(chain.version_count(), 3);
844
845        // Rollback tx 2's changes
846        chain.remove_versions_by(TxId::new(2));
847
848        assert_eq!(chain.version_count(), 1);
849        assert_eq!(chain.visible_at(EpochId::new(10)), Some(&"v1"));
850    }
851
852    #[test]
853    fn test_version_chain_deletion() {
854        let mut chain = VersionChain::with_initial("v1", EpochId::new(1), TxId::new(1));
855
856        // Mark as deleted at epoch 5
857        assert!(chain.mark_deleted(EpochId::new(5)));
858
859        // Should see v1 before deletion, nothing after
860        assert_eq!(chain.visible_at(EpochId::new(4)), Some(&"v1"));
861        assert_eq!(chain.visible_at(EpochId::new(5)), None);
862        assert_eq!(chain.visible_at(EpochId::new(10)), None);
863    }
864}
865
866// ============================================================================
867// Tiered Storage Tests
868// ============================================================================
869
870#[cfg(all(test, feature = "tiered-storage"))]
871mod tiered_storage_tests {
872    use super::*;
873
874    #[test]
875    fn test_optional_epoch_id() {
876        // Test NONE
877        let none = OptionalEpochId::NONE;
878        assert!(none.is_none());
879        assert!(!none.is_some());
880        assert_eq!(none.get(), None);
881
882        // Test Some
883        let some = OptionalEpochId::some(EpochId::new(42));
884        assert!(some.is_some());
885        assert!(!some.is_none());
886        assert_eq!(some.get(), Some(EpochId::new(42)));
887
888        // Test zero epoch
889        let zero = OptionalEpochId::some(EpochId::new(0));
890        assert!(zero.is_some());
891        assert_eq!(zero.get(), Some(EpochId::new(0)));
892    }
893
894    #[test]
895    fn test_hot_version_ref_visibility() {
896        let hot = HotVersionRef::new(EpochId::new(5), 100, TxId::new(1));
897
898        // Not visible before creation
899        assert!(!hot.is_visible_at(EpochId::new(4)));
900
901        // Visible at creation and after
902        assert!(hot.is_visible_at(EpochId::new(5)));
903        assert!(hot.is_visible_at(EpochId::new(10)));
904    }
905
906    #[test]
907    fn test_hot_version_ref_deleted_visibility() {
908        let mut hot = HotVersionRef::new(EpochId::new(5), 100, TxId::new(1));
909        hot.deleted_epoch = OptionalEpochId::some(EpochId::new(10));
910
911        // Visible between creation and deletion
912        assert!(hot.is_visible_at(EpochId::new(5)));
913        assert!(hot.is_visible_at(EpochId::new(9)));
914
915        // Not visible at or after deletion
916        assert!(!hot.is_visible_at(EpochId::new(10)));
917        assert!(!hot.is_visible_at(EpochId::new(15)));
918    }
919
920    #[test]
921    fn test_hot_version_ref_transaction_visibility() {
922        let hot = HotVersionRef::new(EpochId::new(5), 100, TxId::new(1));
923
924        // Creator can see it even at earlier epoch
925        assert!(hot.is_visible_to(EpochId::new(3), TxId::new(1)));
926
927        // Other transactions can only see it at or after creation
928        assert!(!hot.is_visible_to(EpochId::new(3), TxId::new(2)));
929        assert!(hot.is_visible_to(EpochId::new(5), TxId::new(2)));
930    }
931
932    #[test]
933    fn test_version_index_basic() {
934        let hot = HotVersionRef::new(EpochId::new(1), 0, TxId::new(1));
935        let mut index = VersionIndex::with_initial(hot);
936
937        // Should see version at epoch 1+
938        assert!(index.visible_at(EpochId::new(1)).is_some());
939        assert!(index.visible_at(EpochId::new(0)).is_none());
940
941        // Add another version
942        let hot2 = HotVersionRef::new(EpochId::new(5), 100, TxId::new(2));
943        index.add_hot(hot2);
944
945        // Should see v1 at epoch < 5, v2 at epoch >= 5
946        let v1 = index.visible_at(EpochId::new(4)).unwrap();
947        assert!(matches!(v1, VersionRef::Hot(h) if h.arena_offset == 0));
948
949        let v2 = index.visible_at(EpochId::new(5)).unwrap();
950        assert!(matches!(v2, VersionRef::Hot(h) if h.arena_offset == 100));
951    }
952
953    #[test]
954    fn test_version_index_deletion() {
955        let hot = HotVersionRef::new(EpochId::new(1), 0, TxId::new(1));
956        let mut index = VersionIndex::with_initial(hot);
957
958        // Mark as deleted at epoch 5
959        assert!(index.mark_deleted(EpochId::new(5)));
960
961        // Should see version before deletion, nothing after
962        assert!(index.visible_at(EpochId::new(4)).is_some());
963        assert!(index.visible_at(EpochId::new(5)).is_none());
964        assert!(index.visible_at(EpochId::new(10)).is_none());
965    }
966
967    #[test]
968    fn test_version_index_transaction_visibility() {
969        let tx = TxId::new(10);
970        let hot = HotVersionRef::new(EpochId::new(5), 0, tx);
971        let index = VersionIndex::with_initial(hot);
972
973        // Creator can see it even at earlier epoch
974        assert!(index.visible_to(EpochId::new(3), tx).is_some());
975
976        // Other transactions cannot see it before creation
977        assert!(index.visible_to(EpochId::new(3), TxId::new(20)).is_none());
978        assert!(index.visible_to(EpochId::new(5), TxId::new(20)).is_some());
979    }
980
981    #[test]
982    fn test_version_index_rollback() {
983        let tx1 = TxId::new(10);
984        let tx2 = TxId::new(20);
985
986        let mut index = VersionIndex::new();
987        index.add_hot(HotVersionRef::new(EpochId::new(1), 0, tx1));
988        index.add_hot(HotVersionRef::new(EpochId::new(2), 100, tx2));
989        index.add_hot(HotVersionRef::new(EpochId::new(3), 200, tx2));
990
991        assert_eq!(index.version_count(), 3);
992        assert!(index.modified_by(tx1));
993        assert!(index.modified_by(tx2));
994
995        // Rollback tx2's changes
996        index.remove_versions_by(tx2);
997
998        assert_eq!(index.version_count(), 1);
999        assert!(index.modified_by(tx1));
1000        assert!(!index.modified_by(tx2));
1001
1002        // Should only see tx1's version
1003        let v = index.visible_at(EpochId::new(10)).unwrap();
1004        assert!(matches!(v, VersionRef::Hot(h) if h.created_by == tx1));
1005    }
1006
1007    #[test]
1008    fn test_version_index_gc() {
1009        let mut index = VersionIndex::new();
1010
1011        // Add versions at epochs 1, 3, 5
1012        for epoch in [1, 3, 5] {
1013            index.add_hot(HotVersionRef::new(
1014                EpochId::new(epoch),
1015                epoch as u32 * 100,
1016                TxId::new(epoch),
1017            ));
1018        }
1019
1020        assert_eq!(index.version_count(), 3);
1021
1022        // GC with min_epoch = 4
1023        // Should keep: epoch 5 (>= 4) and epoch 3 (first old visible)
1024        index.gc(EpochId::new(4));
1025
1026        assert_eq!(index.version_count(), 2);
1027
1028        // Verify we kept epochs 5 and 3
1029        assert!(index.visible_at(EpochId::new(5)).is_some());
1030        assert!(index.visible_at(EpochId::new(3)).is_some());
1031    }
1032
1033    #[test]
1034    fn test_version_index_conflict_detection() {
1035        let tx1 = TxId::new(10);
1036        let tx2 = TxId::new(20);
1037
1038        let mut index = VersionIndex::new();
1039        index.add_hot(HotVersionRef::new(EpochId::new(1), 0, tx1));
1040        index.add_hot(HotVersionRef::new(EpochId::new(5), 100, tx2));
1041
1042        // tx1 started at epoch 0, tx2 modified at epoch 5 -> conflict for tx1
1043        assert!(index.has_conflict(EpochId::new(0), tx1));
1044
1045        // tx2 started at epoch 0, tx1 modified at epoch 1 -> also conflict for tx2
1046        assert!(index.has_conflict(EpochId::new(0), tx2));
1047
1048        // tx1 started after tx2's modification -> no conflict
1049        assert!(!index.has_conflict(EpochId::new(5), tx1));
1050
1051        // tx2 started after tx1's modification -> no conflict
1052        assert!(!index.has_conflict(EpochId::new(1), tx2));
1053
1054        // If only tx1's version exists, tx1 doesn't conflict with itself
1055        let mut index2 = VersionIndex::new();
1056        index2.add_hot(HotVersionRef::new(EpochId::new(5), 0, tx1));
1057        assert!(!index2.has_conflict(EpochId::new(0), tx1));
1058    }
1059
1060    #[test]
1061    fn test_version_index_smallvec_no_heap() {
1062        let mut index = VersionIndex::new();
1063
1064        // Add 2 hot versions (within inline capacity)
1065        for i in 0..2 {
1066            index.add_hot(HotVersionRef::new(EpochId::new(i), i as u32, TxId::new(i)));
1067        }
1068
1069        // SmallVec should not have spilled to heap
1070        assert!(!index.hot_spilled());
1071        assert!(!index.cold_spilled());
1072    }
1073
1074    #[test]
1075    fn test_version_index_freeze_epoch() {
1076        let mut index = VersionIndex::new();
1077        index.add_hot(HotVersionRef::new(EpochId::new(1), 0, TxId::new(1)));
1078        index.add_hot(HotVersionRef::new(EpochId::new(2), 100, TxId::new(2)));
1079
1080        assert_eq!(index.hot_count(), 2);
1081        assert_eq!(index.cold_count(), 0);
1082
1083        // Freeze epoch 1
1084        let cold_ref = ColdVersionRef {
1085            epoch: EpochId::new(1),
1086            block_offset: 0,
1087            length: 32,
1088            created_by: TxId::new(1),
1089            deleted_epoch: OptionalEpochId::NONE,
1090        };
1091        index.freeze_epoch(EpochId::new(1), std::iter::once(cold_ref));
1092
1093        // Hot should have 1, cold should have 1
1094        assert_eq!(index.hot_count(), 1);
1095        assert_eq!(index.cold_count(), 1);
1096
1097        // Visibility should still work
1098        assert!(index.visible_at(EpochId::new(1)).is_some());
1099        assert!(index.visible_at(EpochId::new(2)).is_some());
1100
1101        // Check that cold version is actually cold
1102        let v1 = index.visible_at(EpochId::new(1)).unwrap();
1103        assert!(v1.is_cold());
1104
1105        let v2 = index.visible_at(EpochId::new(2)).unwrap();
1106        assert!(v2.is_hot());
1107    }
1108
1109    #[test]
1110    fn test_version_ref_accessors() {
1111        let hot = HotVersionRef::new(EpochId::new(5), 100, TxId::new(10));
1112        let vr = VersionRef::Hot(hot);
1113
1114        assert_eq!(vr.epoch(), EpochId::new(5));
1115        assert_eq!(vr.created_by(), TxId::new(10));
1116        assert!(vr.is_hot());
1117        assert!(!vr.is_cold());
1118    }
1119
1120    #[test]
1121    fn test_version_index_latest_epoch() {
1122        let mut index = VersionIndex::new();
1123        assert_eq!(index.latest_epoch(), EpochId::INITIAL);
1124
1125        index.add_hot(HotVersionRef::new(EpochId::new(5), 0, TxId::new(1)));
1126        assert_eq!(index.latest_epoch(), EpochId::new(5));
1127
1128        index.add_hot(HotVersionRef::new(EpochId::new(10), 100, TxId::new(2)));
1129        assert_eq!(index.latest_epoch(), EpochId::new(10));
1130
1131        // After rollback, should recalculate
1132        index.remove_versions_by(TxId::new(2));
1133        assert_eq!(index.latest_epoch(), EpochId::new(5));
1134    }
1135
1136    #[test]
1137    fn test_version_index_default() {
1138        let index = VersionIndex::default();
1139        assert!(index.is_empty());
1140        assert_eq!(index.version_count(), 0);
1141    }
1142
1143    #[test]
1144    fn test_version_index_latest() {
1145        let mut index = VersionIndex::new();
1146        assert!(index.latest().is_none());
1147
1148        index.add_hot(HotVersionRef::new(EpochId::new(1), 0, TxId::new(1)));
1149        let latest = index.latest().unwrap();
1150        assert!(matches!(latest, VersionRef::Hot(h) if h.epoch == EpochId::new(1)));
1151
1152        index.add_hot(HotVersionRef::new(EpochId::new(5), 100, TxId::new(2)));
1153        let latest = index.latest().unwrap();
1154        assert!(matches!(latest, VersionRef::Hot(h) if h.epoch == EpochId::new(5)));
1155    }
1156}