Skip to main content

grafeo_common/
mvcc.rs

1//! MVCC (Multi-Version Concurrency Control) primitives.
2//!
3//! This is how Grafeo handles concurrent reads and writes without blocking.
4//! Each entity has a [`VersionChain`] that tracks all versions. Readers see
5//! consistent snapshots, writers create new versions, and old versions get
6//! garbage collected when no one needs them anymore.
7
8use std::collections::VecDeque;
9
10#[cfg(feature = "tiered-storage")]
11use smallvec::SmallVec;
12
13use crate::types::{EpochId, TxId};
14
15/// Tracks when a version was created and deleted for visibility checks.
16#[derive(Debug, Clone, Copy)]
17pub struct VersionInfo {
18    /// The epoch this version was created in.
19    pub created_epoch: EpochId,
20    /// The epoch this version was deleted in (if any).
21    pub deleted_epoch: Option<EpochId>,
22    /// The transaction that created this version.
23    pub created_by: TxId,
24}
25
26impl VersionInfo {
27    /// Creates a new version info.
28    #[must_use]
29    pub fn new(created_epoch: EpochId, created_by: TxId) -> Self {
30        Self {
31            created_epoch,
32            deleted_epoch: None,
33            created_by,
34        }
35    }
36
37    /// Marks this version as deleted.
38    pub fn mark_deleted(&mut self, epoch: EpochId) {
39        self.deleted_epoch = Some(epoch);
40    }
41
42    /// Checks if this version is visible at the given epoch.
43    #[inline]
44    #[must_use]
45    pub fn is_visible_at(&self, epoch: EpochId) -> bool {
46        // Visible if created before or at the viewing epoch
47        // and not deleted before the viewing epoch
48        if !self.created_epoch.is_visible_at(epoch) {
49            return false;
50        }
51
52        if let Some(deleted) = self.deleted_epoch {
53            // Not visible if deleted at or before the viewing epoch
54            deleted.as_u64() > epoch.as_u64()
55        } else {
56            true
57        }
58    }
59
60    /// Checks if this version is visible to a specific transaction.
61    ///
62    /// A version is visible to a transaction if:
63    /// 1. It was created by the same transaction, OR
64    /// 2. It was created in an epoch before the transaction's start epoch
65    ///    and not deleted before that epoch
66    #[inline]
67    #[must_use]
68    pub fn is_visible_to(&self, viewing_epoch: EpochId, viewing_tx: TxId) -> bool {
69        // Own modifications are always visible
70        if self.created_by == viewing_tx {
71            return self.deleted_epoch.is_none();
72        }
73
74        // Otherwise, use epoch-based visibility
75        self.is_visible_at(viewing_epoch)
76    }
77}
78
79/// A single version of data.
80#[derive(Debug, Clone)]
81pub struct Version<T> {
82    /// Visibility metadata.
83    pub info: VersionInfo,
84    /// The actual data.
85    pub data: T,
86}
87
88impl<T> Version<T> {
89    /// Creates a new version.
90    #[must_use]
91    pub fn new(data: T, created_epoch: EpochId, created_by: TxId) -> Self {
92        Self {
93            info: VersionInfo::new(created_epoch, created_by),
94            data,
95        }
96    }
97}
98
99/// All versions of a single entity, newest first.
100///
101/// Each node/edge has one of these tracking its version history. Use
102/// [`visible_at()`](Self::visible_at) to get the version at a specific epoch,
103/// or [`visible_to()`](Self::visible_to) for transaction-aware visibility.
104#[derive(Debug, Clone)]
105pub struct VersionChain<T> {
106    /// Versions ordered newest-first.
107    versions: VecDeque<Version<T>>,
108}
109
110impl<T> VersionChain<T> {
111    /// Creates a new empty version chain.
112    #[must_use]
113    pub fn new() -> Self {
114        Self {
115            versions: VecDeque::new(),
116        }
117    }
118
119    /// Creates a version chain with an initial version.
120    #[must_use]
121    pub fn with_initial(data: T, created_epoch: EpochId, created_by: TxId) -> Self {
122        let mut chain = Self::new();
123        chain.add_version(data, created_epoch, created_by);
124        chain
125    }
126
127    /// Adds a new version to the chain.
128    ///
129    /// The new version becomes the head of the chain.
130    pub fn add_version(&mut self, data: T, created_epoch: EpochId, created_by: TxId) {
131        let version = Version::new(data, created_epoch, created_by);
132        self.versions.push_front(version);
133    }
134
135    /// Finds the version visible at the given epoch.
136    ///
137    /// Returns a reference to the visible version's data, or `None` if no version
138    /// is visible at that epoch.
139    #[inline]
140    #[must_use]
141    pub fn visible_at(&self, epoch: EpochId) -> Option<&T> {
142        self.versions
143            .iter()
144            .find(|v| v.info.is_visible_at(epoch))
145            .map(|v| &v.data)
146    }
147
148    /// Finds the version visible to a specific transaction.
149    ///
150    /// This considers both the transaction's epoch and its own uncommitted changes.
151    #[inline]
152    #[must_use]
153    pub fn visible_to(&self, epoch: EpochId, tx: TxId) -> Option<&T> {
154        self.versions
155            .iter()
156            .find(|v| v.info.is_visible_to(epoch, tx))
157            .map(|v| &v.data)
158    }
159
160    /// Marks the current visible version as deleted.
161    ///
162    /// Returns `true` if a version was marked, `false` if no visible version exists.
163    pub fn mark_deleted(&mut self, delete_epoch: EpochId) -> bool {
164        for version in &mut self.versions {
165            if version.info.deleted_epoch.is_none() {
166                version.info.mark_deleted(delete_epoch);
167                return true;
168            }
169        }
170        false
171    }
172
173    /// Checks if any version was modified by the given transaction.
174    #[must_use]
175    pub fn modified_by(&self, tx: TxId) -> bool {
176        self.versions.iter().any(|v| v.info.created_by == tx)
177    }
178
179    /// Removes all versions created by the given transaction.
180    ///
181    /// Used for rollback to discard uncommitted changes.
182    pub fn remove_versions_by(&mut self, tx: TxId) {
183        self.versions.retain(|v| v.info.created_by != tx);
184    }
185
186    /// Checks if there's a concurrent modification conflict.
187    ///
188    /// A conflict exists if another transaction modified this entity
189    /// after our start epoch.
190    #[must_use]
191    pub fn has_conflict(&self, start_epoch: EpochId, our_tx: TxId) -> bool {
192        self.versions.iter().any(|v| {
193            v.info.created_by != our_tx && v.info.created_epoch.as_u64() > start_epoch.as_u64()
194        })
195    }
196
197    /// Returns the number of versions in the chain.
198    #[must_use]
199    pub fn version_count(&self) -> usize {
200        self.versions.len()
201    }
202
203    /// Returns true if the chain has no versions.
204    #[must_use]
205    pub fn is_empty(&self) -> bool {
206        self.versions.is_empty()
207    }
208
209    /// Garbage collects old versions that are no longer visible to any transaction.
210    ///
211    /// Keeps versions that might still be visible to transactions at or after `min_epoch`.
212    pub fn gc(&mut self, min_epoch: EpochId) {
213        if self.versions.is_empty() {
214            return;
215        }
216
217        let mut keep_count = 0;
218        let mut found_old_visible = false;
219
220        for (i, version) in self.versions.iter().enumerate() {
221            if version.info.created_epoch.as_u64() >= min_epoch.as_u64() {
222                keep_count = i + 1;
223            } else if !found_old_visible {
224                // Keep the first (most recent) old version
225                found_old_visible = true;
226                keep_count = i + 1;
227            }
228        }
229
230        self.versions.truncate(keep_count);
231    }
232
233    /// Returns an iterator over all versions with their metadata, newest first.
234    ///
235    /// Each item is `(&VersionInfo, &T)` giving both the visibility metadata
236    /// and a reference to the version data.
237    pub fn history(&self) -> impl Iterator<Item = (&VersionInfo, &T)> {
238        self.versions.iter().map(|v| (&v.info, &v.data))
239    }
240
241    /// Returns a reference to the latest version's data regardless of visibility.
242    #[must_use]
243    pub fn latest(&self) -> Option<&T> {
244        self.versions.front().map(|v| &v.data)
245    }
246
247    /// Returns a mutable reference to the latest version's data.
248    #[must_use]
249    pub fn latest_mut(&mut self) -> Option<&mut T> {
250        self.versions.front_mut().map(|v| &mut v.data)
251    }
252}
253
254impl<T> Default for VersionChain<T> {
255    fn default() -> Self {
256        Self::new()
257    }
258}
259
260impl<T: Clone> VersionChain<T> {
261    /// Gets a mutable reference to the visible version's data for modification.
262    ///
263    /// If the version is not owned by this transaction, creates a new version
264    /// with a copy of the data.
265    pub fn get_mut(&mut self, epoch: EpochId, tx: TxId, modify_epoch: EpochId) -> Option<&mut T> {
266        // Find the visible version
267        let visible_idx = self
268            .versions
269            .iter()
270            .position(|v| v.info.is_visible_to(epoch, tx))?;
271
272        let visible = &self.versions[visible_idx];
273
274        if visible.info.created_by == tx {
275            // Already our version, modify in place
276            Some(&mut self.versions[visible_idx].data)
277        } else {
278            // Create a new version with copied data
279            let new_data = visible.data.clone();
280            self.add_version(new_data, modify_epoch, tx);
281            Some(&mut self.versions[0].data)
282        }
283    }
284}
285
286// ============================================================================
287// Tiered Storage Types (Phase 13)
288// ============================================================================
289//
290// These types support the tiered hot/cold storage architecture where version
291// metadata is stored separately from version data. Data lives in arenas (hot)
292// or compressed epoch blocks (cold), while VersionIndex holds lightweight refs.
293
294/// Compact representation of an optional epoch ID.
295///
296/// Uses `u32::MAX` as sentinel for `None`, allowing epochs up to ~4 billion.
297/// This saves 4 bytes compared to `Option<EpochId>` due to niche optimization.
298#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
299#[repr(transparent)]
300#[cfg(feature = "tiered-storage")]
301pub struct OptionalEpochId(u32);
302
303#[cfg(feature = "tiered-storage")]
304impl OptionalEpochId {
305    /// Represents no epoch (deleted_epoch = None).
306    pub const NONE: Self = Self(u32::MAX);
307
308    /// Creates an `OptionalEpochId` from an epoch.
309    ///
310    /// # Panics
311    /// Panics if epoch exceeds u32::MAX - 1 (4,294,967,294).
312    #[must_use]
313    pub fn some(epoch: EpochId) -> Self {
314        assert!(
315            epoch.as_u64() < u64::from(u32::MAX),
316            "epoch {} exceeds OptionalEpochId capacity (max {})",
317            epoch.as_u64(),
318            u32::MAX as u64 - 1
319        );
320        Self(epoch.as_u64() as u32)
321    }
322
323    /// Returns the contained epoch, or `None` if this is `NONE`.
324    #[inline]
325    #[must_use]
326    pub fn get(self) -> Option<EpochId> {
327        if self.0 == u32::MAX {
328            None
329        } else {
330            Some(EpochId::new(u64::from(self.0)))
331        }
332    }
333
334    /// Returns `true` if this contains an epoch.
335    #[must_use]
336    pub fn is_some(self) -> bool {
337        self.0 != u32::MAX
338    }
339
340    /// Returns `true` if this is `NONE`.
341    #[inline]
342    #[must_use]
343    pub fn is_none(self) -> bool {
344        self.0 == u32::MAX
345    }
346}
347
348/// Reference to a hot (arena-allocated) version.
349///
350/// Hot versions are stored in the epoch's arena and can be accessed directly.
351/// This struct only holds metadata; the actual data lives in the arena.
352///
353/// # Memory Layout
354/// - `epoch`: 8 bytes
355/// - `arena_offset`: 4 bytes
356/// - `created_by`: 8 bytes
357/// - `deleted_epoch`: 4 bytes
358/// - Total: 24 bytes
359#[derive(Debug, Clone, Copy, PartialEq, Eq)]
360#[cfg(feature = "tiered-storage")]
361pub struct HotVersionRef {
362    /// Epoch when this version was created.
363    pub epoch: EpochId,
364    /// Offset within the epoch's arena where the data is stored.
365    pub arena_offset: u32,
366    /// Transaction that created this version.
367    pub created_by: TxId,
368    /// Epoch when this version was deleted (NONE if still alive).
369    pub deleted_epoch: OptionalEpochId,
370}
371
372#[cfg(feature = "tiered-storage")]
373impl HotVersionRef {
374    /// Creates a new hot version reference.
375    #[must_use]
376    pub fn new(epoch: EpochId, arena_offset: u32, created_by: TxId) -> Self {
377        Self {
378            epoch,
379            arena_offset,
380            created_by,
381            deleted_epoch: OptionalEpochId::NONE,
382        }
383    }
384
385    /// Checks if this version is visible at the given epoch.
386    #[inline]
387    #[must_use]
388    pub fn is_visible_at(&self, viewing_epoch: EpochId) -> bool {
389        // Must be created at or before the viewing epoch
390        if !self.epoch.is_visible_at(viewing_epoch) {
391            return false;
392        }
393        // Must not be deleted at or before the viewing epoch
394        match self.deleted_epoch.get() {
395            Some(deleted) => deleted.as_u64() > viewing_epoch.as_u64(),
396            None => true,
397        }
398    }
399
400    /// Checks if this version is visible to a specific transaction.
401    #[inline]
402    #[must_use]
403    pub fn is_visible_to(&self, viewing_epoch: EpochId, viewing_tx: TxId) -> bool {
404        // Own modifications are always visible (if not deleted by self)
405        if self.created_by == viewing_tx {
406            return self.deleted_epoch.is_none();
407        }
408        // Otherwise use epoch-based visibility
409        self.is_visible_at(viewing_epoch)
410    }
411}
412
413/// Reference to a cold (compressed) version.
414///
415/// Cold versions are stored in compressed epoch blocks. This is a placeholder
416/// for Phase 14 - the actual compression logic will be implemented there.
417///
418/// # Memory Layout
419/// - `epoch`: 8 bytes
420/// - `block_offset`: 4 bytes
421/// - `length`: 2 bytes
422/// - `created_by`: 8 bytes
423/// - `deleted_epoch`: 4 bytes
424/// - Total: 26 bytes (+ 6 padding = 32 bytes aligned)
425#[derive(Debug, Clone, Copy, PartialEq, Eq)]
426#[cfg(feature = "tiered-storage")]
427pub struct ColdVersionRef {
428    /// Epoch when this version was created.
429    pub epoch: EpochId,
430    /// Offset within the compressed epoch block.
431    pub block_offset: u32,
432    /// Compressed length in bytes.
433    pub length: u16,
434    /// Transaction that created this version.
435    pub created_by: TxId,
436    /// Epoch when this version was deleted.
437    pub deleted_epoch: OptionalEpochId,
438}
439
440#[cfg(feature = "tiered-storage")]
441impl ColdVersionRef {
442    /// Checks if this version is visible at the given epoch.
443    #[inline]
444    #[must_use]
445    pub fn is_visible_at(&self, viewing_epoch: EpochId) -> bool {
446        if !self.epoch.is_visible_at(viewing_epoch) {
447            return false;
448        }
449        match self.deleted_epoch.get() {
450            Some(deleted) => deleted.as_u64() > viewing_epoch.as_u64(),
451            None => true,
452        }
453    }
454
455    /// Checks if this version is visible to a specific transaction.
456    #[inline]
457    #[must_use]
458    pub fn is_visible_to(&self, viewing_epoch: EpochId, viewing_tx: TxId) -> bool {
459        if self.created_by == viewing_tx {
460            return self.deleted_epoch.is_none();
461        }
462        self.is_visible_at(viewing_epoch)
463    }
464}
465
466/// Unified reference to either a hot or cold version.
467#[derive(Debug, Clone, Copy)]
468#[cfg(feature = "tiered-storage")]
469pub enum VersionRef {
470    /// Version data is in arena (hot tier).
471    Hot(HotVersionRef),
472    /// Version data is in compressed storage (cold tier).
473    Cold(ColdVersionRef),
474}
475
476#[cfg(feature = "tiered-storage")]
477impl VersionRef {
478    /// Returns the epoch when this version was created.
479    #[must_use]
480    pub fn epoch(&self) -> EpochId {
481        match self {
482            Self::Hot(h) => h.epoch,
483            Self::Cold(c) => c.epoch,
484        }
485    }
486
487    /// Returns the transaction that created this version.
488    #[must_use]
489    pub fn created_by(&self) -> TxId {
490        match self {
491            Self::Hot(h) => h.created_by,
492            Self::Cold(c) => c.created_by,
493        }
494    }
495
496    /// Returns `true` if this is a hot version.
497    #[must_use]
498    pub fn is_hot(&self) -> bool {
499        matches!(self, Self::Hot(_))
500    }
501
502    /// Returns `true` if this is a cold version.
503    #[must_use]
504    pub fn is_cold(&self) -> bool {
505        matches!(self, Self::Cold(_))
506    }
507
508    /// Returns the epoch when this version was deleted, if any.
509    #[must_use]
510    pub fn deleted_epoch(&self) -> Option<EpochId> {
511        match self {
512            Self::Hot(h) => h.deleted_epoch.get(),
513            Self::Cold(c) => c.deleted_epoch.get(),
514        }
515    }
516}
517
518/// Tiered version index - replaces `VersionChain<T>` for hot/cold storage.
519///
520/// Instead of storing data inline, `VersionIndex` holds lightweight references
521/// to data stored in arenas (hot) or compressed blocks (cold). This enables:
522///
523/// - **No heap allocation** for typical 1-2 version case (SmallVec inline)
524/// - **Separation of metadata and data** for compression
525/// - **Fast visibility checks** via cached `latest_epoch`
526/// - **O(1) epoch drops** instead of per-version deallocation
527///
528/// # Memory Layout
529/// - `hot`: SmallVec<[HotVersionRef; 2]> ≈ 56 bytes inline
530/// - `cold`: SmallVec<[ColdVersionRef; 4]> ≈ 136 bytes inline
531/// - `latest_epoch`: 8 bytes
532/// - Total: ~200 bytes, no heap for typical case
533#[derive(Debug, Clone)]
534#[cfg(feature = "tiered-storage")]
535pub struct VersionIndex {
536    /// Hot versions in arena storage (most recent first).
537    hot: SmallVec<[HotVersionRef; 2]>,
538    /// Cold versions in compressed storage (most recent first).
539    cold: SmallVec<[ColdVersionRef; 4]>,
540    /// Cached epoch of the latest version for fast staleness checks.
541    latest_epoch: EpochId,
542}
543
544#[cfg(feature = "tiered-storage")]
545impl VersionIndex {
546    /// Creates a new empty version index.
547    #[must_use]
548    pub fn new() -> Self {
549        Self {
550            hot: SmallVec::new(),
551            cold: SmallVec::new(),
552            latest_epoch: EpochId::INITIAL,
553        }
554    }
555
556    /// Creates a version index with an initial hot version.
557    #[must_use]
558    pub fn with_initial(hot_ref: HotVersionRef) -> Self {
559        let mut index = Self::new();
560        index.add_hot(hot_ref);
561        index
562    }
563
564    /// Adds a new hot version (becomes the latest).
565    pub fn add_hot(&mut self, hot_ref: HotVersionRef) {
566        // Insert at front (most recent first)
567        self.hot.insert(0, hot_ref);
568        self.latest_epoch = hot_ref.epoch;
569    }
570
571    /// Returns the latest epoch for quick staleness checks.
572    #[must_use]
573    pub fn latest_epoch(&self) -> EpochId {
574        self.latest_epoch
575    }
576
577    /// Returns `true` if this entity has no versions.
578    #[must_use]
579    pub fn is_empty(&self) -> bool {
580        self.hot.is_empty() && self.cold.is_empty()
581    }
582
583    /// Returns the total version count (hot + cold).
584    #[must_use]
585    pub fn version_count(&self) -> usize {
586        self.hot.len() + self.cold.len()
587    }
588
589    /// Returns the number of hot versions.
590    #[must_use]
591    pub fn hot_count(&self) -> usize {
592        self.hot.len()
593    }
594
595    /// Returns the number of cold versions.
596    #[must_use]
597    pub fn cold_count(&self) -> usize {
598        self.cold.len()
599    }
600
601    /// Finds the version visible at the given epoch.
602    #[inline]
603    #[must_use]
604    pub fn visible_at(&self, epoch: EpochId) -> Option<VersionRef> {
605        // Check hot versions first (most recent first, likely case)
606        for v in &self.hot {
607            if v.is_visible_at(epoch) {
608                return Some(VersionRef::Hot(*v));
609            }
610        }
611        // Fall back to cold versions
612        for v in &self.cold {
613            if v.is_visible_at(epoch) {
614                return Some(VersionRef::Cold(*v));
615            }
616        }
617        None
618    }
619
620    /// Finds the version visible to a specific transaction.
621    #[inline]
622    #[must_use]
623    pub fn visible_to(&self, epoch: EpochId, tx: TxId) -> Option<VersionRef> {
624        // Check hot versions first
625        for v in &self.hot {
626            if v.is_visible_to(epoch, tx) {
627                return Some(VersionRef::Hot(*v));
628            }
629        }
630        // Fall back to cold versions
631        for v in &self.cold {
632            if v.is_visible_to(epoch, tx) {
633                return Some(VersionRef::Cold(*v));
634            }
635        }
636        None
637    }
638
639    /// Marks the currently visible version as deleted at the given epoch.
640    ///
641    /// Returns `true` if a version was marked, `false` if no visible version exists.
642    pub fn mark_deleted(&mut self, delete_epoch: EpochId) -> bool {
643        // Find the first non-deleted hot version and mark it
644        for v in &mut self.hot {
645            if v.deleted_epoch.is_none() {
646                v.deleted_epoch = OptionalEpochId::some(delete_epoch);
647                return true;
648            }
649        }
650        // Check cold versions (rare case)
651        for v in &mut self.cold {
652            if v.deleted_epoch.is_none() {
653                v.deleted_epoch = OptionalEpochId::some(delete_epoch);
654                return true;
655            }
656        }
657        false
658    }
659
660    /// Checks if any version was created by the given transaction.
661    #[must_use]
662    pub fn modified_by(&self, tx: TxId) -> bool {
663        self.hot.iter().any(|v| v.created_by == tx) || self.cold.iter().any(|v| v.created_by == tx)
664    }
665
666    /// Removes all versions created by the given transaction (for rollback).
667    pub fn remove_versions_by(&mut self, tx: TxId) {
668        self.hot.retain(|v| v.created_by != tx);
669        self.cold.retain(|v| v.created_by != tx);
670        self.recalculate_latest_epoch();
671    }
672
673    /// Checks for write conflict with concurrent transaction.
674    ///
675    /// A conflict exists if another transaction modified this entity
676    /// after our start epoch.
677    #[must_use]
678    pub fn has_conflict(&self, start_epoch: EpochId, our_tx: TxId) -> bool {
679        self.hot
680            .iter()
681            .any(|v| v.created_by != our_tx && v.epoch.as_u64() > start_epoch.as_u64())
682            || self
683                .cold
684                .iter()
685                .any(|v| v.created_by != our_tx && v.epoch.as_u64() > start_epoch.as_u64())
686    }
687
688    /// Garbage collects old versions not needed by any active transaction.
689    ///
690    /// Keeps versions that might still be visible to transactions at or after `min_epoch`.
691    pub fn gc(&mut self, min_epoch: EpochId) {
692        if self.is_empty() {
693            return;
694        }
695
696        // Keep versions that:
697        // 1. Were created at or after min_epoch
698        // 2. The first (most recent) version created before min_epoch
699        let mut found_old_visible = false;
700
701        self.hot.retain(|v| {
702            if v.epoch.as_u64() >= min_epoch.as_u64() {
703                true
704            } else if !found_old_visible {
705                found_old_visible = true;
706                true
707            } else {
708                false
709            }
710        });
711
712        // Same for cold, but only if we haven't found an old visible in hot
713        if !found_old_visible {
714            self.cold.retain(|v| {
715                if v.epoch.as_u64() >= min_epoch.as_u64() {
716                    true
717                } else if !found_old_visible {
718                    found_old_visible = true;
719                    true
720                } else {
721                    false
722                }
723            });
724        } else {
725            // All cold versions are older, only keep those >= min_epoch
726            self.cold.retain(|v| v.epoch.as_u64() >= min_epoch.as_u64());
727        }
728    }
729
730    /// Returns epoch IDs of all versions, newest first.
731    ///
732    /// Combines hot and cold versions, sorted by epoch descending.
733    #[must_use]
734    pub fn version_epochs(&self) -> Vec<EpochId> {
735        let mut epochs: Vec<EpochId> = self
736            .hot
737            .iter()
738            .map(|v| v.epoch)
739            .chain(self.cold.iter().map(|v| v.epoch))
740            .collect();
741        epochs.sort_by_key(|e| std::cmp::Reverse(e.as_u64()));
742        epochs
743    }
744
745    /// Returns all version references with their metadata, newest first.
746    ///
747    /// Each item is `(EpochId, Option<EpochId>, VersionRef)` giving
748    /// the created epoch, deleted epoch, and a reference to read the data.
749    #[must_use]
750    pub fn version_history(&self) -> Vec<(EpochId, Option<EpochId>, VersionRef)> {
751        let mut versions: Vec<(EpochId, Option<EpochId>, VersionRef)> = self
752            .hot
753            .iter()
754            .map(|v| (v.epoch, v.deleted_epoch.get(), VersionRef::Hot(*v)))
755            .chain(
756                self.cold
757                    .iter()
758                    .map(|v| (v.epoch, v.deleted_epoch.get(), VersionRef::Cold(*v))),
759            )
760            .collect();
761        versions.sort_by_key(|v| std::cmp::Reverse(v.0.as_u64()));
762        versions
763    }
764
765    /// Returns a reference to the latest version regardless of visibility.
766    #[must_use]
767    pub fn latest(&self) -> Option<VersionRef> {
768        self.hot
769            .first()
770            .map(|v| VersionRef::Hot(*v))
771            .or_else(|| self.cold.first().map(|v| VersionRef::Cold(*v)))
772    }
773
774    /// Freezes hot versions for a given epoch into cold storage.
775    ///
776    /// This is called when an epoch is no longer needed by any active transaction.
777    /// The actual compression happens in Phase 14; for now this just moves refs.
778    pub fn freeze_epoch(
779        &mut self,
780        epoch: EpochId,
781        cold_refs: impl Iterator<Item = ColdVersionRef>,
782    ) {
783        // Remove hot refs for this epoch
784        self.hot.retain(|v| v.epoch != epoch);
785
786        // Add cold refs
787        self.cold.extend(cold_refs);
788
789        // Keep cold sorted by epoch (descending = most recent first)
790        self.cold
791            .sort_by(|a, b| b.epoch.as_u64().cmp(&a.epoch.as_u64()));
792
793        self.recalculate_latest_epoch();
794    }
795
796    /// Returns hot version refs for a specific epoch (for freeze operation).
797    pub fn hot_refs_for_epoch(&self, epoch: EpochId) -> impl Iterator<Item = &HotVersionRef> {
798        self.hot.iter().filter(move |v| v.epoch == epoch)
799    }
800
801    /// Returns `true` if the hot SmallVec has spilled to the heap.
802    #[must_use]
803    pub fn hot_spilled(&self) -> bool {
804        self.hot.spilled()
805    }
806
807    /// Returns `true` if the cold SmallVec has spilled to the heap.
808    #[must_use]
809    pub fn cold_spilled(&self) -> bool {
810        self.cold.spilled()
811    }
812
813    fn recalculate_latest_epoch(&mut self) {
814        self.latest_epoch = self
815            .hot
816            .first()
817            .map(|v| v.epoch)
818            .or_else(|| self.cold.first().map(|v| v.epoch))
819            .unwrap_or(EpochId::INITIAL);
820    }
821}
822
823#[cfg(feature = "tiered-storage")]
824impl Default for VersionIndex {
825    fn default() -> Self {
826        Self::new()
827    }
828}
829
830#[cfg(test)]
831mod tests {
832    use super::*;
833
834    #[test]
835    fn test_version_visibility() {
836        let v = VersionInfo::new(EpochId::new(5), TxId::new(1));
837
838        // Not visible before creation
839        assert!(!v.is_visible_at(EpochId::new(4)));
840
841        // Visible at creation epoch and after
842        assert!(v.is_visible_at(EpochId::new(5)));
843        assert!(v.is_visible_at(EpochId::new(10)));
844    }
845
846    #[test]
847    fn test_deleted_version_visibility() {
848        let mut v = VersionInfo::new(EpochId::new(5), TxId::new(1));
849        v.mark_deleted(EpochId::new(10));
850
851        // Visible between creation and deletion
852        assert!(v.is_visible_at(EpochId::new(5)));
853        assert!(v.is_visible_at(EpochId::new(9)));
854
855        // Not visible at or after deletion
856        assert!(!v.is_visible_at(EpochId::new(10)));
857        assert!(!v.is_visible_at(EpochId::new(15)));
858    }
859
860    #[test]
861    fn test_version_visibility_to_transaction() {
862        let v = VersionInfo::new(EpochId::new(5), TxId::new(1));
863
864        // Creator can see it even if viewing at earlier epoch
865        assert!(v.is_visible_to(EpochId::new(3), TxId::new(1)));
866
867        // Other transactions can only see it at or after creation epoch
868        assert!(!v.is_visible_to(EpochId::new(3), TxId::new(2)));
869        assert!(v.is_visible_to(EpochId::new(5), TxId::new(2)));
870    }
871
872    #[test]
873    fn test_version_chain_basic() {
874        let mut chain = VersionChain::with_initial("v1", EpochId::new(1), TxId::new(1));
875
876        // Should see v1 at epoch 1+
877        assert_eq!(chain.visible_at(EpochId::new(1)), Some(&"v1"));
878        assert_eq!(chain.visible_at(EpochId::new(0)), None);
879
880        // Add v2
881        chain.add_version("v2", EpochId::new(5), TxId::new(2));
882
883        // Should see v1 at epoch < 5, v2 at epoch >= 5
884        assert_eq!(chain.visible_at(EpochId::new(1)), Some(&"v1"));
885        assert_eq!(chain.visible_at(EpochId::new(4)), Some(&"v1"));
886        assert_eq!(chain.visible_at(EpochId::new(5)), Some(&"v2"));
887        assert_eq!(chain.visible_at(EpochId::new(10)), Some(&"v2"));
888    }
889
890    #[test]
891    fn test_version_chain_rollback() {
892        let mut chain = VersionChain::with_initial("v1", EpochId::new(1), TxId::new(1));
893        chain.add_version("v2", EpochId::new(5), TxId::new(2));
894        chain.add_version("v3", EpochId::new(6), TxId::new(2));
895
896        assert_eq!(chain.version_count(), 3);
897
898        // Rollback tx 2's changes
899        chain.remove_versions_by(TxId::new(2));
900
901        assert_eq!(chain.version_count(), 1);
902        assert_eq!(chain.visible_at(EpochId::new(10)), Some(&"v1"));
903    }
904
905    #[test]
906    fn test_version_chain_deletion() {
907        let mut chain = VersionChain::with_initial("v1", EpochId::new(1), TxId::new(1));
908
909        // Mark as deleted at epoch 5
910        assert!(chain.mark_deleted(EpochId::new(5)));
911
912        // Should see v1 before deletion, nothing after
913        assert_eq!(chain.visible_at(EpochId::new(4)), Some(&"v1"));
914        assert_eq!(chain.visible_at(EpochId::new(5)), None);
915        assert_eq!(chain.visible_at(EpochId::new(10)), None);
916    }
917}
918
919// ============================================================================
920// Tiered Storage Tests
921// ============================================================================
922
923#[cfg(all(test, feature = "tiered-storage"))]
924mod tiered_storage_tests {
925    use super::*;
926
927    #[test]
928    fn test_optional_epoch_id() {
929        // Test NONE
930        let none = OptionalEpochId::NONE;
931        assert!(none.is_none());
932        assert!(!none.is_some());
933        assert_eq!(none.get(), None);
934
935        // Test Some
936        let some = OptionalEpochId::some(EpochId::new(42));
937        assert!(some.is_some());
938        assert!(!some.is_none());
939        assert_eq!(some.get(), Some(EpochId::new(42)));
940
941        // Test zero epoch
942        let zero = OptionalEpochId::some(EpochId::new(0));
943        assert!(zero.is_some());
944        assert_eq!(zero.get(), Some(EpochId::new(0)));
945    }
946
947    #[test]
948    fn test_hot_version_ref_visibility() {
949        let hot = HotVersionRef::new(EpochId::new(5), 100, TxId::new(1));
950
951        // Not visible before creation
952        assert!(!hot.is_visible_at(EpochId::new(4)));
953
954        // Visible at creation and after
955        assert!(hot.is_visible_at(EpochId::new(5)));
956        assert!(hot.is_visible_at(EpochId::new(10)));
957    }
958
959    #[test]
960    fn test_hot_version_ref_deleted_visibility() {
961        let mut hot = HotVersionRef::new(EpochId::new(5), 100, TxId::new(1));
962        hot.deleted_epoch = OptionalEpochId::some(EpochId::new(10));
963
964        // Visible between creation and deletion
965        assert!(hot.is_visible_at(EpochId::new(5)));
966        assert!(hot.is_visible_at(EpochId::new(9)));
967
968        // Not visible at or after deletion
969        assert!(!hot.is_visible_at(EpochId::new(10)));
970        assert!(!hot.is_visible_at(EpochId::new(15)));
971    }
972
973    #[test]
974    fn test_hot_version_ref_transaction_visibility() {
975        let hot = HotVersionRef::new(EpochId::new(5), 100, TxId::new(1));
976
977        // Creator can see it even at earlier epoch
978        assert!(hot.is_visible_to(EpochId::new(3), TxId::new(1)));
979
980        // Other transactions can only see it at or after creation
981        assert!(!hot.is_visible_to(EpochId::new(3), TxId::new(2)));
982        assert!(hot.is_visible_to(EpochId::new(5), TxId::new(2)));
983    }
984
985    #[test]
986    fn test_version_index_basic() {
987        let hot = HotVersionRef::new(EpochId::new(1), 0, TxId::new(1));
988        let mut index = VersionIndex::with_initial(hot);
989
990        // Should see version at epoch 1+
991        assert!(index.visible_at(EpochId::new(1)).is_some());
992        assert!(index.visible_at(EpochId::new(0)).is_none());
993
994        // Add another version
995        let hot2 = HotVersionRef::new(EpochId::new(5), 100, TxId::new(2));
996        index.add_hot(hot2);
997
998        // Should see v1 at epoch < 5, v2 at epoch >= 5
999        let v1 = index.visible_at(EpochId::new(4)).unwrap();
1000        assert!(matches!(v1, VersionRef::Hot(h) if h.arena_offset == 0));
1001
1002        let v2 = index.visible_at(EpochId::new(5)).unwrap();
1003        assert!(matches!(v2, VersionRef::Hot(h) if h.arena_offset == 100));
1004    }
1005
1006    #[test]
1007    fn test_version_index_deletion() {
1008        let hot = HotVersionRef::new(EpochId::new(1), 0, TxId::new(1));
1009        let mut index = VersionIndex::with_initial(hot);
1010
1011        // Mark as deleted at epoch 5
1012        assert!(index.mark_deleted(EpochId::new(5)));
1013
1014        // Should see version before deletion, nothing after
1015        assert!(index.visible_at(EpochId::new(4)).is_some());
1016        assert!(index.visible_at(EpochId::new(5)).is_none());
1017        assert!(index.visible_at(EpochId::new(10)).is_none());
1018    }
1019
1020    #[test]
1021    fn test_version_index_transaction_visibility() {
1022        let tx = TxId::new(10);
1023        let hot = HotVersionRef::new(EpochId::new(5), 0, tx);
1024        let index = VersionIndex::with_initial(hot);
1025
1026        // Creator can see it even at earlier epoch
1027        assert!(index.visible_to(EpochId::new(3), tx).is_some());
1028
1029        // Other transactions cannot see it before creation
1030        assert!(index.visible_to(EpochId::new(3), TxId::new(20)).is_none());
1031        assert!(index.visible_to(EpochId::new(5), TxId::new(20)).is_some());
1032    }
1033
1034    #[test]
1035    fn test_version_index_rollback() {
1036        let tx1 = TxId::new(10);
1037        let tx2 = TxId::new(20);
1038
1039        let mut index = VersionIndex::new();
1040        index.add_hot(HotVersionRef::new(EpochId::new(1), 0, tx1));
1041        index.add_hot(HotVersionRef::new(EpochId::new(2), 100, tx2));
1042        index.add_hot(HotVersionRef::new(EpochId::new(3), 200, tx2));
1043
1044        assert_eq!(index.version_count(), 3);
1045        assert!(index.modified_by(tx1));
1046        assert!(index.modified_by(tx2));
1047
1048        // Rollback tx2's changes
1049        index.remove_versions_by(tx2);
1050
1051        assert_eq!(index.version_count(), 1);
1052        assert!(index.modified_by(tx1));
1053        assert!(!index.modified_by(tx2));
1054
1055        // Should only see tx1's version
1056        let v = index.visible_at(EpochId::new(10)).unwrap();
1057        assert!(matches!(v, VersionRef::Hot(h) if h.created_by == tx1));
1058    }
1059
1060    #[test]
1061    fn test_version_index_gc() {
1062        let mut index = VersionIndex::new();
1063
1064        // Add versions at epochs 1, 3, 5
1065        for epoch in [1, 3, 5] {
1066            index.add_hot(HotVersionRef::new(
1067                EpochId::new(epoch),
1068                epoch as u32 * 100,
1069                TxId::new(epoch),
1070            ));
1071        }
1072
1073        assert_eq!(index.version_count(), 3);
1074
1075        // GC with min_epoch = 4
1076        // Should keep: epoch 5 (>= 4) and epoch 3 (first old visible)
1077        index.gc(EpochId::new(4));
1078
1079        assert_eq!(index.version_count(), 2);
1080
1081        // Verify we kept epochs 5 and 3
1082        assert!(index.visible_at(EpochId::new(5)).is_some());
1083        assert!(index.visible_at(EpochId::new(3)).is_some());
1084    }
1085
1086    #[test]
1087    fn test_version_index_conflict_detection() {
1088        let tx1 = TxId::new(10);
1089        let tx2 = TxId::new(20);
1090
1091        let mut index = VersionIndex::new();
1092        index.add_hot(HotVersionRef::new(EpochId::new(1), 0, tx1));
1093        index.add_hot(HotVersionRef::new(EpochId::new(5), 100, tx2));
1094
1095        // tx1 started at epoch 0, tx2 modified at epoch 5 -> conflict for tx1
1096        assert!(index.has_conflict(EpochId::new(0), tx1));
1097
1098        // tx2 started at epoch 0, tx1 modified at epoch 1 -> also conflict for tx2
1099        assert!(index.has_conflict(EpochId::new(0), tx2));
1100
1101        // tx1 started after tx2's modification -> no conflict
1102        assert!(!index.has_conflict(EpochId::new(5), tx1));
1103
1104        // tx2 started after tx1's modification -> no conflict
1105        assert!(!index.has_conflict(EpochId::new(1), tx2));
1106
1107        // If only tx1's version exists, tx1 doesn't conflict with itself
1108        let mut index2 = VersionIndex::new();
1109        index2.add_hot(HotVersionRef::new(EpochId::new(5), 0, tx1));
1110        assert!(!index2.has_conflict(EpochId::new(0), tx1));
1111    }
1112
1113    #[test]
1114    fn test_version_index_smallvec_no_heap() {
1115        let mut index = VersionIndex::new();
1116
1117        // Add 2 hot versions (within inline capacity)
1118        for i in 0..2 {
1119            index.add_hot(HotVersionRef::new(EpochId::new(i), i as u32, TxId::new(i)));
1120        }
1121
1122        // SmallVec should not have spilled to heap
1123        assert!(!index.hot_spilled());
1124        assert!(!index.cold_spilled());
1125    }
1126
1127    #[test]
1128    fn test_version_index_freeze_epoch() {
1129        let mut index = VersionIndex::new();
1130        index.add_hot(HotVersionRef::new(EpochId::new(1), 0, TxId::new(1)));
1131        index.add_hot(HotVersionRef::new(EpochId::new(2), 100, TxId::new(2)));
1132
1133        assert_eq!(index.hot_count(), 2);
1134        assert_eq!(index.cold_count(), 0);
1135
1136        // Freeze epoch 1
1137        let cold_ref = ColdVersionRef {
1138            epoch: EpochId::new(1),
1139            block_offset: 0,
1140            length: 32,
1141            created_by: TxId::new(1),
1142            deleted_epoch: OptionalEpochId::NONE,
1143        };
1144        index.freeze_epoch(EpochId::new(1), std::iter::once(cold_ref));
1145
1146        // Hot should have 1, cold should have 1
1147        assert_eq!(index.hot_count(), 1);
1148        assert_eq!(index.cold_count(), 1);
1149
1150        // Visibility should still work
1151        assert!(index.visible_at(EpochId::new(1)).is_some());
1152        assert!(index.visible_at(EpochId::new(2)).is_some());
1153
1154        // Check that cold version is actually cold
1155        let v1 = index.visible_at(EpochId::new(1)).unwrap();
1156        assert!(v1.is_cold());
1157
1158        let v2 = index.visible_at(EpochId::new(2)).unwrap();
1159        assert!(v2.is_hot());
1160    }
1161
1162    #[test]
1163    fn test_version_ref_accessors() {
1164        let hot = HotVersionRef::new(EpochId::new(5), 100, TxId::new(10));
1165        let vr = VersionRef::Hot(hot);
1166
1167        assert_eq!(vr.epoch(), EpochId::new(5));
1168        assert_eq!(vr.created_by(), TxId::new(10));
1169        assert!(vr.is_hot());
1170        assert!(!vr.is_cold());
1171    }
1172
1173    #[test]
1174    fn test_version_index_latest_epoch() {
1175        let mut index = VersionIndex::new();
1176        assert_eq!(index.latest_epoch(), EpochId::INITIAL);
1177
1178        index.add_hot(HotVersionRef::new(EpochId::new(5), 0, TxId::new(1)));
1179        assert_eq!(index.latest_epoch(), EpochId::new(5));
1180
1181        index.add_hot(HotVersionRef::new(EpochId::new(10), 100, TxId::new(2)));
1182        assert_eq!(index.latest_epoch(), EpochId::new(10));
1183
1184        // After rollback, should recalculate
1185        index.remove_versions_by(TxId::new(2));
1186        assert_eq!(index.latest_epoch(), EpochId::new(5));
1187    }
1188
1189    #[test]
1190    fn test_version_index_default() {
1191        let index = VersionIndex::default();
1192        assert!(index.is_empty());
1193        assert_eq!(index.version_count(), 0);
1194    }
1195
1196    #[test]
1197    fn test_version_index_latest() {
1198        let mut index = VersionIndex::new();
1199        assert!(index.latest().is_none());
1200
1201        index.add_hot(HotVersionRef::new(EpochId::new(1), 0, TxId::new(1)));
1202        let latest = index.latest().unwrap();
1203        assert!(matches!(latest, VersionRef::Hot(h) if h.epoch == EpochId::new(1)));
1204
1205        index.add_hot(HotVersionRef::new(EpochId::new(5), 100, TxId::new(2)));
1206        let latest = index.latest().unwrap();
1207        assert!(matches!(latest, VersionRef::Hot(h) if h.epoch == EpochId::new(5)));
1208    }
1209}