Skip to main content

grafeo_common/
mvcc.rs

1//! MVCC (Multi-Version Concurrency Control) primitives.
2//!
3//! This is how Grafeo handles concurrent reads and writes without blocking.
4//! Each entity has a [`VersionChain`] that tracks all versions. Readers see
5//! consistent snapshots, writers create new versions, and old versions get
6//! garbage collected when no one needs them anymore.
7
8use std::collections::VecDeque;
9
10#[cfg(feature = "tiered-storage")]
11use smallvec::SmallVec;
12
13use crate::types::{EpochId, TransactionId};
14
15/// Tracks when a version was created and deleted for visibility checks.
16#[derive(Debug, Clone, Copy)]
17pub struct VersionInfo {
18    /// The epoch this version was created in.
19    pub created_epoch: EpochId,
20    /// The epoch this version was deleted in (if any).
21    pub deleted_epoch: Option<EpochId>,
22    /// The transaction that created this version.
23    pub created_by: TransactionId,
24}
25
26impl VersionInfo {
27    /// Creates a new version info.
28    #[must_use]
29    pub fn new(created_epoch: EpochId, created_by: TransactionId) -> Self {
30        Self {
31            created_epoch,
32            deleted_epoch: None,
33            created_by,
34        }
35    }
36
37    /// Marks this version as deleted.
38    pub fn mark_deleted(&mut self, epoch: EpochId) {
39        self.deleted_epoch = Some(epoch);
40    }
41
42    /// Checks if this version is visible at the given epoch.
43    #[inline]
44    #[must_use]
45    pub fn is_visible_at(&self, epoch: EpochId) -> bool {
46        // Visible if created before or at the viewing epoch
47        // and not deleted before the viewing epoch
48        if !self.created_epoch.is_visible_at(epoch) {
49            return false;
50        }
51
52        if let Some(deleted) = self.deleted_epoch {
53            // Not visible if deleted at or before the viewing epoch
54            deleted.as_u64() > epoch.as_u64()
55        } else {
56            true
57        }
58    }
59
60    /// Checks if this version is visible to a specific transaction.
61    ///
62    /// A version is visible to a transaction if:
63    /// 1. It was created by the same transaction, OR
64    /// 2. It was created in an epoch before the transaction's start epoch
65    ///    and not deleted before that epoch
66    #[inline]
67    #[must_use]
68    pub fn is_visible_to(&self, viewing_epoch: EpochId, viewing_tx: TransactionId) -> bool {
69        // Own modifications are always visible
70        if self.created_by == viewing_tx {
71            return self.deleted_epoch.is_none();
72        }
73
74        // Otherwise, use epoch-based visibility
75        self.is_visible_at(viewing_epoch)
76    }
77}
78
79/// A single version of data.
80#[derive(Debug, Clone)]
81pub struct Version<T> {
82    /// Visibility metadata.
83    pub info: VersionInfo,
84    /// The actual data.
85    pub data: T,
86}
87
88impl<T> Version<T> {
89    /// Creates a new version.
90    #[must_use]
91    pub fn new(data: T, created_epoch: EpochId, created_by: TransactionId) -> Self {
92        Self {
93            info: VersionInfo::new(created_epoch, created_by),
94            data,
95        }
96    }
97}
98
99/// All versions of a single entity, newest first.
100///
101/// Each node/edge has one of these tracking its version history. Use
102/// [`visible_at()`](Self::visible_at) to get the version at a specific epoch,
103/// or [`visible_to()`](Self::visible_to) for transaction-aware visibility.
104#[derive(Debug, Clone)]
105pub struct VersionChain<T> {
106    /// Versions ordered newest-first.
107    versions: VecDeque<Version<T>>,
108}
109
110impl<T> VersionChain<T> {
111    /// Creates a new empty version chain.
112    #[must_use]
113    pub fn new() -> Self {
114        Self {
115            versions: VecDeque::new(),
116        }
117    }
118
119    /// Creates a version chain with an initial version.
120    #[must_use]
121    pub fn with_initial(data: T, created_epoch: EpochId, created_by: TransactionId) -> Self {
122        let mut chain = Self::new();
123        chain.add_version(data, created_epoch, created_by);
124        chain
125    }
126
127    /// Adds a new version to the chain.
128    ///
129    /// The new version becomes the head of the chain.
130    pub fn add_version(&mut self, data: T, created_epoch: EpochId, created_by: TransactionId) {
131        let version = Version::new(data, created_epoch, created_by);
132        self.versions.push_front(version);
133    }
134
135    /// Finds the version visible at the given epoch.
136    ///
137    /// Returns a reference to the visible version's data, or `None` if no version
138    /// is visible at that epoch.
139    #[inline]
140    #[must_use]
141    pub fn visible_at(&self, epoch: EpochId) -> Option<&T> {
142        self.versions
143            .iter()
144            .find(|v| v.info.is_visible_at(epoch))
145            .map(|v| &v.data)
146    }
147
148    /// Finds the version visible to a specific transaction.
149    ///
150    /// This considers both the transaction's epoch and its own uncommitted changes.
151    #[inline]
152    #[must_use]
153    pub fn visible_to(&self, epoch: EpochId, tx: TransactionId) -> Option<&T> {
154        self.versions
155            .iter()
156            .find(|v| v.info.is_visible_to(epoch, tx))
157            .map(|v| &v.data)
158    }
159
160    /// Marks the current visible version as deleted.
161    ///
162    /// Returns `true` if a version was marked, `false` if no visible version exists.
163    pub fn mark_deleted(&mut self, delete_epoch: EpochId) -> bool {
164        for version in &mut self.versions {
165            if version.info.deleted_epoch.is_none() {
166                version.info.mark_deleted(delete_epoch);
167                return true;
168            }
169        }
170        false
171    }
172
173    /// Checks if any version was modified by the given transaction.
174    #[must_use]
175    pub fn modified_by(&self, tx: TransactionId) -> bool {
176        self.versions.iter().any(|v| v.info.created_by == tx)
177    }
178
179    /// Removes all versions created by the given transaction.
180    ///
181    /// Used for rollback to discard uncommitted changes.
182    pub fn remove_versions_by(&mut self, tx: TransactionId) {
183        self.versions.retain(|v| v.info.created_by != tx);
184    }
185
186    /// Checks if there's a concurrent modification conflict.
187    ///
188    /// A conflict exists if another transaction modified this entity
189    /// after our start epoch.
190    #[must_use]
191    pub fn has_conflict(&self, start_epoch: EpochId, our_tx: TransactionId) -> bool {
192        self.versions.iter().any(|v| {
193            v.info.created_by != our_tx && v.info.created_epoch.as_u64() > start_epoch.as_u64()
194        })
195    }
196
197    /// Returns the number of versions in the chain.
198    #[must_use]
199    pub fn version_count(&self) -> usize {
200        self.versions.len()
201    }
202
203    /// Returns true if the chain has no versions.
204    #[must_use]
205    pub fn is_empty(&self) -> bool {
206        self.versions.is_empty()
207    }
208
209    /// Garbage collects old versions that are no longer visible to any transaction.
210    ///
211    /// Keeps versions that might still be visible to transactions at or after `min_epoch`.
212    pub fn gc(&mut self, min_epoch: EpochId) {
213        if self.versions.is_empty() {
214            return;
215        }
216
217        let mut keep_count = 0;
218        let mut found_old_visible = false;
219
220        for (i, version) in self.versions.iter().enumerate() {
221            if version.info.created_epoch.as_u64() >= min_epoch.as_u64() {
222                keep_count = i + 1;
223            } else if !found_old_visible {
224                // Keep the first (most recent) old version
225                found_old_visible = true;
226                keep_count = i + 1;
227            }
228        }
229
230        self.versions.truncate(keep_count);
231    }
232
233    /// Returns an iterator over all versions with their metadata, newest first.
234    ///
235    /// Each item is `(&VersionInfo, &T)` giving both the visibility metadata
236    /// and a reference to the version data.
237    pub fn history(&self) -> impl Iterator<Item = (&VersionInfo, &T)> {
238        self.versions.iter().map(|v| (&v.info, &v.data))
239    }
240
241    /// Returns a reference to the latest version's data regardless of visibility.
242    #[must_use]
243    pub fn latest(&self) -> Option<&T> {
244        self.versions.front().map(|v| &v.data)
245    }
246
247    /// Returns a mutable reference to the latest version's data.
248    #[must_use]
249    pub fn latest_mut(&mut self) -> Option<&mut T> {
250        self.versions.front_mut().map(|v| &mut v.data)
251    }
252}
253
254impl<T> Default for VersionChain<T> {
255    fn default() -> Self {
256        Self::new()
257    }
258}
259
260impl<T: Clone> VersionChain<T> {
261    /// Gets a mutable reference to the visible version's data for modification.
262    ///
263    /// If the version is not owned by this transaction, creates a new version
264    /// with a copy of the data.
265    pub fn get_mut(
266        &mut self,
267        epoch: EpochId,
268        tx: TransactionId,
269        modify_epoch: EpochId,
270    ) -> Option<&mut T> {
271        // Find the visible version
272        let visible_idx = self
273            .versions
274            .iter()
275            .position(|v| v.info.is_visible_to(epoch, tx))?;
276
277        let visible = &self.versions[visible_idx];
278
279        if visible.info.created_by == tx {
280            // Already our version, modify in place
281            Some(&mut self.versions[visible_idx].data)
282        } else {
283            // Create a new version with copied data
284            let new_data = visible.data.clone();
285            self.add_version(new_data, modify_epoch, tx);
286            Some(&mut self.versions[0].data)
287        }
288    }
289}
290
291// ============================================================================
292// Tiered Storage Types (Phase 13)
293// ============================================================================
294//
295// These types support the tiered hot/cold storage architecture where version
296// metadata is stored separately from version data. Data lives in arenas (hot)
297// or compressed epoch blocks (cold), while VersionIndex holds lightweight refs.
298
/// Compact, 4-byte replacement for `Option<EpochId>`.
///
/// The epoch is stored as a `u32`, with `u32::MAX` reserved as the sentinel
/// meaning "no epoch". Representable epochs therefore range from `0` to
/// `u32::MAX - 1` (~4 billion). Because the type is a transparent wrapper
/// around `u32`, a value occupies exactly four bytes.
///
/// NOTE(review): the derived `Default` produces `OptionalEpochId(0)`, which
/// decodes to `Some(epoch 0)`, *not* [`NONE`](Self::NONE). Use `NONE`
/// explicitly when "no epoch" is intended; confirm no caller relies on
/// `Default` meaning "absent".
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
#[repr(transparent)]
#[cfg(feature = "tiered-storage")]
pub struct OptionalEpochId(u32);

#[cfg(feature = "tiered-storage")]
impl OptionalEpochId {
    /// Represents no epoch (deleted_epoch = None).
    pub const NONE: Self = Self(u32::MAX);

    /// Creates an `OptionalEpochId` holding `epoch`.
    ///
    /// # Panics
    /// Panics if `epoch` exceeds `u32::MAX - 1` (4,294,967,294): larger values
    /// either do not fit in a `u32` or collide with the `NONE` sentinel.
    #[must_use]
    pub fn some(epoch: EpochId) -> Self {
        let raw = epoch.as_u64();
        // A single checked conversion rejects both out-of-range values and the
        // sentinel itself, so no truncating `as` cast is needed.
        match u32::try_from(raw) {
            Ok(compact) if compact != Self::NONE.0 => Self(compact),
            _ => panic!(
                "epoch {} exceeds OptionalEpochId capacity (max {})",
                raw,
                u32::MAX - 1
            ),
        }
    }

    /// Returns the contained epoch, or `None` if this is `NONE`.
    #[inline]
    #[must_use]
    pub fn get(self) -> Option<EpochId> {
        if self.is_none() {
            None
        } else {
            Some(EpochId::new(u64::from(self.0)))
        }
    }

    /// Returns `true` if this contains an epoch.
    #[inline]
    #[must_use]
    pub fn is_some(self) -> bool {
        self.0 != Self::NONE.0
    }

    /// Returns `true` if this is `NONE`.
    #[inline]
    #[must_use]
    pub fn is_none(self) -> bool {
        self.0 == Self::NONE.0
    }
}
352
/// Metadata handle for a hot version whose data lives in an epoch arena.
///
/// Only bookkeeping is stored here; the payload itself is found in the arena
/// of `epoch` at `arena_offset`.
///
/// # Memory Layout
/// Field sizes as budgeted by the original design notes:
/// - `epoch`: 8 bytes
/// - `arena_offset`: 4 bytes
/// - `created_by`: 8 bytes
/// - `deleted_epoch`: 4 bytes
/// - Total: 24 bytes
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg(feature = "tiered-storage")]
pub struct HotVersionRef {
    /// Creation epoch of this version.
    pub epoch: EpochId,
    /// Position of the data inside the arena belonging to `epoch`.
    pub arena_offset: u32,
    /// Transaction responsible for creating this version.
    pub created_by: TransactionId,
    /// Deletion epoch, or `OptionalEpochId::NONE` while the version is live.
    pub deleted_epoch: OptionalEpochId,
}

#[cfg(feature = "tiered-storage")]
impl HotVersionRef {
    /// Builds a reference to a live (not deleted) hot version.
    #[must_use]
    pub fn new(epoch: EpochId, arena_offset: u32, created_by: TransactionId) -> Self {
        let deleted_epoch = OptionalEpochId::NONE;
        Self {
            epoch,
            arena_offset,
            created_by,
            deleted_epoch,
        }
    }

    /// Reports whether a reader at `viewing_epoch` can see this version.
    ///
    /// The creation epoch must be visible at `viewing_epoch`, and the version
    /// must not have been deleted at or before `viewing_epoch`.
    #[inline]
    #[must_use]
    pub fn is_visible_at(&self, viewing_epoch: EpochId) -> bool {
        let deleted_by_then = matches!(
            self.deleted_epoch.get(),
            Some(gone) if gone.as_u64() <= viewing_epoch.as_u64()
        );

        self.epoch.is_visible_at(viewing_epoch) && !deleted_by_then
    }

    /// Reports whether transaction `viewing_tx`, reading at `viewing_epoch`,
    /// can see this version.
    ///
    /// The creating transaction sees its own version whenever it carries no
    /// deletion epoch; everyone else is subject to
    /// [`is_visible_at`](Self::is_visible_at).
    #[inline]
    #[must_use]
    pub fn is_visible_to(&self, viewing_epoch: EpochId, viewing_tx: TransactionId) -> bool {
        if self.created_by != viewing_tx {
            self.is_visible_at(viewing_epoch)
        } else {
            self.deleted_epoch.is_none()
        }
    }
}
417
/// Metadata handle for a cold version stored in a compressed epoch block.
///
/// This is a placeholder for Phase 14, where the actual compression logic is
/// planned; for now it only carries the location and visibility metadata.
///
/// # Memory Layout
/// Field sizes as budgeted by the original design notes:
/// - `epoch`: 8 bytes
/// - `block_offset`: 4 bytes
/// - `length`: 2 bytes
/// - `created_by`: 8 bytes
/// - `deleted_epoch`: 4 bytes
/// - Total: 26 bytes (+ 6 padding = 32 bytes aligned)
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg(feature = "tiered-storage")]
pub struct ColdVersionRef {
    /// Creation epoch of this version.
    pub epoch: EpochId,
    /// Position of the data inside the compressed epoch block.
    pub block_offset: u32,
    /// Size of the compressed data in bytes.
    pub length: u16,
    /// Transaction responsible for creating this version.
    pub created_by: TransactionId,
    /// Deletion epoch, or `OptionalEpochId::NONE` if never deleted.
    pub deleted_epoch: OptionalEpochId,
}

#[cfg(feature = "tiered-storage")]
impl ColdVersionRef {
    /// Reports whether a reader at `viewing_epoch` can see this version.
    ///
    /// The version is hidden when it was deleted at or before `viewing_epoch`;
    /// otherwise visibility is decided by the creation epoch.
    #[inline]
    #[must_use]
    pub fn is_visible_at(&self, viewing_epoch: EpochId) -> bool {
        if let Some(gone) = self.deleted_epoch.get() {
            if gone.as_u64() <= viewing_epoch.as_u64() {
                return false;
            }
        }
        self.epoch.is_visible_at(viewing_epoch)
    }

    /// Reports whether transaction `viewing_tx`, reading at `viewing_epoch`,
    /// can see this version.
    ///
    /// The creating transaction sees its own version whenever it carries no
    /// deletion epoch; everyone else is subject to
    /// [`is_visible_at`](Self::is_visible_at).
    #[inline]
    #[must_use]
    pub fn is_visible_to(&self, viewing_epoch: EpochId, viewing_tx: TransactionId) -> bool {
        let is_own_write = self.created_by == viewing_tx;
        if is_own_write {
            self.deleted_epoch.is_none()
        } else {
            self.is_visible_at(viewing_epoch)
        }
    }
}
470
/// A version reference that is either hot or cold.
#[derive(Debug, Clone, Copy)]
#[cfg(feature = "tiered-storage")]
pub enum VersionRef {
    /// The data is held in an arena (hot tier).
    Hot(HotVersionRef),
    /// The data is held in compressed storage (cold tier).
    Cold(ColdVersionRef),
}

#[cfg(feature = "tiered-storage")]
impl VersionRef {
    /// Creation epoch of the referenced version, whichever tier it is in.
    #[must_use]
    pub fn epoch(&self) -> EpochId {
        match *self {
            Self::Hot(HotVersionRef { epoch, .. }) | Self::Cold(ColdVersionRef { epoch, .. }) => {
                epoch
            }
        }
    }

    /// Transaction that created the referenced version.
    #[must_use]
    pub fn created_by(&self) -> TransactionId {
        match *self {
            Self::Hot(HotVersionRef { created_by, .. })
            | Self::Cold(ColdVersionRef { created_by, .. }) => created_by,
        }
    }

    /// `true` for the [`Hot`](Self::Hot) variant.
    #[must_use]
    pub fn is_hot(&self) -> bool {
        match self {
            Self::Hot(_) => true,
            Self::Cold(_) => false,
        }
    }

    /// `true` for the [`Cold`](Self::Cold) variant.
    #[must_use]
    pub fn is_cold(&self) -> bool {
        match self {
            Self::Cold(_) => true,
            Self::Hot(_) => false,
        }
    }

    /// Deletion epoch of the referenced version, or `None` if it has none.
    #[must_use]
    pub fn deleted_epoch(&self) -> Option<EpochId> {
        let compact = match self {
            Self::Hot(hot) => hot.deleted_epoch,
            Self::Cold(cold) => cold.deleted_epoch,
        };
        compact.get()
    }
}
522
/// Tiered counterpart of `VersionChain<T>` for hot/cold storage.
///
/// Rather than owning version data, a `VersionIndex` keeps lightweight
/// references to data held in arenas (hot) or compressed blocks (cold):
///
/// - Up to two hot and four cold references are stored inline in their
///   `SmallVec`s before either list spills to the heap.
/// - Metadata is kept apart from the data it describes.
/// - The epoch of the newest version is cached in `latest_epoch`.
///
/// # Memory Layout
/// Approximate figures from the original design notes:
/// - `hot`: SmallVec<[HotVersionRef; 2]> ≈ 56 bytes inline
/// - `cold`: SmallVec<[ColdVersionRef; 4]> ≈ 136 bytes inline
/// - `latest_epoch`: 8 bytes
/// - Total: ~200 bytes, no heap for typical case
#[derive(Debug, Clone)]
#[cfg(feature = "tiered-storage")]
pub struct VersionIndex {
    /// References to arena-backed versions, most recent first.
    hot: SmallVec<[HotVersionRef; 2]>,
    /// References to compressed versions, most recent first.
    cold: SmallVec<[ColdVersionRef; 4]>,
    /// Cached creation epoch of the newest version.
    latest_epoch: EpochId,
}

#[cfg(feature = "tiered-storage")]
impl VersionIndex {
    /// Creates an index without any versions; `latest_epoch` starts at
    /// `EpochId::INITIAL`.
    #[must_use]
    pub fn new() -> Self {
        let hot = SmallVec::new();
        let cold = SmallVec::new();
        Self {
            hot,
            cold,
            latest_epoch: EpochId::INITIAL,
        }
    }

    /// Creates an index containing exactly one hot version.
    #[must_use]
    pub fn with_initial(hot_ref: HotVersionRef) -> Self {
        let mut fresh = Self::new();
        fresh.add_hot(hot_ref);
        fresh
    }

    /// Registers `hot_ref` as the newest version and updates the cached epoch.
    pub fn add_hot(&mut self, hot_ref: HotVersionRef) {
        self.latest_epoch = hot_ref.epoch;
        // Index 0 is the newest entry, so new versions go to the front.
        self.hot.insert(0, hot_ref);
    }

    /// Cached creation epoch of the newest version, for cheap staleness checks.
    #[must_use]
    pub fn latest_epoch(&self) -> EpochId {
        self.latest_epoch
    }

    /// `true` when neither tier holds a version.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.version_count() == 0
    }

    /// Number of versions across both tiers.
    #[must_use]
    pub fn version_count(&self) -> usize {
        self.hot_count() + self.cold_count()
    }

    /// Number of versions in the hot tier.
    #[must_use]
    pub fn hot_count(&self) -> usize {
        self.hot.len()
    }

    /// Number of versions in the cold tier.
    #[must_use]
    pub fn cold_count(&self) -> usize {
        self.cold.len()
    }

    /// Returns the first version visible at `epoch`, searching the hot tier
    /// before the cold tier.
    #[inline]
    #[must_use]
    pub fn visible_at(&self, epoch: EpochId) -> Option<VersionRef> {
        self.hot
            .iter()
            .find(|v| v.is_visible_at(epoch))
            .copied()
            .map(VersionRef::Hot)
            .or_else(|| {
                self.cold
                    .iter()
                    .find(|v| v.is_visible_at(epoch))
                    .copied()
                    .map(VersionRef::Cold)
            })
    }

    /// Returns the first version visible to transaction `tx` reading at
    /// `epoch`, searching the hot tier before the cold tier.
    #[inline]
    #[must_use]
    pub fn visible_to(&self, epoch: EpochId, tx: TransactionId) -> Option<VersionRef> {
        self.hot
            .iter()
            .find(|v| v.is_visible_to(epoch, tx))
            .copied()
            .map(VersionRef::Hot)
            .or_else(|| {
                self.cold
                    .iter()
                    .find(|v| v.is_visible_to(epoch, tx))
                    .copied()
                    .map(VersionRef::Cold)
            })
    }

    /// Stamps `delete_epoch` on the first version without a deletion epoch,
    /// looking through the hot tier first and the cold tier second.
    ///
    /// Returns `true` when a version was marked and `false` when every version
    /// already has a deletion epoch (or the index is empty).
    ///
    /// # Panics
    /// Panics via `OptionalEpochId::some` if a version is found and
    /// `delete_epoch` does not fit into an `OptionalEpochId`.
    pub fn mark_deleted(&mut self, delete_epoch: EpochId) -> bool {
        // Walk the deletion slots of both tiers in priority order.
        let first_open_slot = self
            .hot
            .iter_mut()
            .map(|v| &mut v.deleted_epoch)
            .chain(self.cold.iter_mut().map(|v| &mut v.deleted_epoch))
            .find(|slot| slot.is_none());

        match first_open_slot {
            Some(slot) => {
                // Converted only once a target exists, so nothing can panic
                // when there is no version to mark.
                *slot = OptionalEpochId::some(delete_epoch);
                true
            }
            None => false,
        }
    }

    /// Reports whether at least one version, hot or cold, was created by `tx`.
    #[must_use]
    pub fn modified_by(&self, tx: TransactionId) -> bool {
        self.hot
            .iter()
            .map(|v| v.created_by)
            .chain(self.cold.iter().map(|v| v.created_by))
            .any(|author| author == tx)
    }

    /// Drops every version created by `tx` (rollback) and refreshes the
    /// cached latest epoch.
    pub fn remove_versions_by(&mut self, tx: TransactionId) {
        self.hot.retain(|v| !(v.created_by == tx));
        self.cold.retain(|v| !(v.created_by == tx));
        self.recalculate_latest_epoch();
    }

    /// Detects a write conflict with a concurrent transaction.
    ///
    /// Returns `true` if some version, in either tier, was created by a
    /// transaction other than `our_tx` in an epoch strictly after `start_epoch`.
    #[must_use]
    pub fn has_conflict(&self, start_epoch: EpochId, our_tx: TransactionId) -> bool {
        let started = start_epoch.as_u64();
        self.hot
            .iter()
            .map(|v| (v.created_by, v.epoch))
            .chain(self.cold.iter().map(|v| (v.created_by, v.epoch)))
            .any(|(author, created)| author != our_tx && created.as_u64() > started)
    }

    /// Discards versions that no reader at or after `min_epoch` needs.
    ///
    /// Retained are all versions created at or after `min_epoch`, plus the
    /// single newest version created before it (hot tier searched first, then
    /// cold). Every other older version is removed.
    pub fn gc(&mut self, min_epoch: EpochId) {
        let cutoff = min_epoch.as_u64();
        // Flips to `true` once the one pre-cutoff survivor has been chosen;
        // the flag is shared by both tiers.
        let mut stale_survivor_chosen = false;

        let mut should_keep = |created: EpochId| {
            if created.as_u64() >= cutoff {
                true
            } else if stale_survivor_chosen {
                false
            } else {
                stale_survivor_chosen = true;
                true
            }
        };

        self.hot.retain(|v| should_keep(v.epoch));
        self.cold.retain(|v| should_keep(v.epoch));
    }

    /// Creation epochs of all versions from both tiers, newest first.
    #[must_use]
    pub fn version_epochs(&self) -> Vec<EpochId> {
        let mut epochs: Vec<EpochId> = Vec::with_capacity(self.version_count());
        epochs.extend(self.hot.iter().map(|v| v.epoch));
        epochs.extend(self.cold.iter().map(|v| v.epoch));
        // Stable sort, descending by epoch.
        epochs.sort_by(|a, b| b.as_u64().cmp(&a.as_u64()));
        epochs
    }

    /// All versions from both tiers with their metadata, newest first.
    ///
    /// Each item is `(EpochId, Option<EpochId>, VersionRef)`: the creation
    /// epoch, the deletion epoch (if any), and the reference used to read
    /// the data.
    #[must_use]
    pub fn version_history(&self) -> Vec<(EpochId, Option<EpochId>, VersionRef)> {
        let mut entries = Vec::with_capacity(self.version_count());
        entries.extend(
            self.hot
                .iter()
                .map(|v| (v.epoch, v.deleted_epoch.get(), VersionRef::Hot(*v))),
        );
        entries.extend(
            self.cold
                .iter()
                .map(|v| (v.epoch, v.deleted_epoch.get(), VersionRef::Cold(*v))),
        );
        // Stable sort, descending by creation epoch.
        entries.sort_by(|a, b| b.0.as_u64().cmp(&a.0.as_u64()));
        entries
    }

    /// The newest version irrespective of visibility: the first hot entry if
    /// there is one, otherwise the first cold entry.
    #[must_use]
    pub fn latest(&self) -> Option<VersionRef> {
        if let Some(newest_hot) = self.hot.first() {
            return Some(VersionRef::Hot(*newest_hot));
        }
        self.cold.first().copied().map(VersionRef::Cold)
    }

    /// Moves the versions of `epoch` from the hot tier to the cold tier.
    ///
    /// Hot references created in `epoch` are dropped and `cold_refs` are added
    /// in their place. The cold list is then re-sorted newest first and the
    /// cached latest epoch is refreshed. No compression happens here yet
    /// (planned for Phase 14); only references are moved.
    pub fn freeze_epoch(
        &mut self,
        epoch: EpochId,
        cold_refs: impl Iterator<Item = ColdVersionRef>,
    ) {
        self.hot.retain(|v| !(v.epoch == epoch));
        self.cold.extend(cold_refs);

        // Stable sort, descending by epoch, so index 0 stays the newest.
        self.cold
            .sort_by_key(|v| std::cmp::Reverse(v.epoch.as_u64()));

        self.recalculate_latest_epoch();
    }

    /// Iterates over the hot references created in `epoch` (input for freezing).
    pub fn hot_refs_for_epoch(&self, epoch: EpochId) -> impl Iterator<Item = &HotVersionRef> {
        let wanted = epoch;
        self.hot.iter().filter(move |hot_ref| hot_ref.epoch == wanted)
    }

    /// `true` once the hot list has outgrown its inline storage.
    #[must_use]
    pub fn hot_spilled(&self) -> bool {
        self.hot.spilled()
    }

    /// `true` once the cold list has outgrown its inline storage.
    #[must_use]
    pub fn cold_spilled(&self) -> bool {
        self.cold.spilled()
    }

    /// Re-derives `latest_epoch` from the first hot entry, else the first cold
    /// entry, else `EpochId::INITIAL` when the index is empty.
    fn recalculate_latest_epoch(&mut self) {
        self.latest_epoch = match (self.hot.first(), self.cold.first()) {
            (Some(newest_hot), _) => newest_hot.epoch,
            (None, Some(newest_cold)) => newest_cold.epoch,
            (None, None) => EpochId::INITIAL,
        };
    }
}

#[cfg(feature = "tiered-storage")]
impl Default for VersionIndex {
    fn default() -> Self {
        Self::new()
    }
}
834
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for building an epoch from its raw number.
    fn epoch(raw: u64) -> EpochId {
        EpochId::new(raw)
    }

    #[test]
    fn test_version_visibility() {
        let info = VersionInfo::new(epoch(5), TransactionId::new(1));

        // One epoch too early: the version does not exist yet.
        assert!(!info.is_visible_at(epoch(4)));

        // From its creation epoch onwards it can be seen.
        assert!(info.is_visible_at(epoch(5)));
        assert!(info.is_visible_at(epoch(10)));
    }

    #[test]
    fn test_deleted_version_visibility() {
        let mut info = VersionInfo::new(epoch(5), TransactionId::new(1));
        info.mark_deleted(epoch(10));

        // Still observable in the window [created, deleted).
        assert!(info.is_visible_at(epoch(5)));
        assert!(info.is_visible_at(epoch(9)));

        // Gone from the deletion epoch onwards.
        assert!(!info.is_visible_at(epoch(10)));
        assert!(!info.is_visible_at(epoch(15)));
    }

    #[test]
    fn test_version_visibility_to_transaction() {
        let creator = TransactionId::new(1);
        let other = TransactionId::new(2);
        let info = VersionInfo::new(epoch(5), creator);

        // The creating transaction sees its write even from an earlier epoch.
        assert!(info.is_visible_to(epoch(3), creator));

        // Any other transaction has to read at or after the creation epoch.
        assert!(!info.is_visible_to(epoch(3), other));
        assert!(info.is_visible_to(epoch(5), other));
    }

    #[test]
    fn test_version_chain_basic() {
        let mut chain = VersionChain::with_initial("v1", epoch(1), TransactionId::new(1));

        // v1 exists from epoch 1; nothing exists before that.
        assert_eq!(chain.visible_at(epoch(1)), Some(&"v1"));
        assert_eq!(chain.visible_at(epoch(0)), None);

        // A second version arrives at epoch 5.
        chain.add_version("v2", epoch(5), TransactionId::new(2));

        // Readers before epoch 5 keep seeing v1, later readers get v2.
        assert_eq!(chain.visible_at(epoch(1)), Some(&"v1"));
        assert_eq!(chain.visible_at(epoch(4)), Some(&"v1"));
        assert_eq!(chain.visible_at(epoch(5)), Some(&"v2"));
        assert_eq!(chain.visible_at(epoch(10)), Some(&"v2"));
    }

    #[test]
    fn test_version_chain_rollback() {
        let rolled_back = TransactionId::new(2);

        let mut chain = VersionChain::with_initial("v1", epoch(1), TransactionId::new(1));
        chain.add_version("v2", epoch(5), rolled_back);
        chain.add_version("v3", epoch(6), rolled_back);

        assert_eq!(chain.version_count(), 3);

        // Discard everything transaction 2 wrote.
        chain.remove_versions_by(rolled_back);

        assert_eq!(chain.version_count(), 1);
        assert_eq!(chain.visible_at(epoch(10)), Some(&"v1"));
    }

    #[test]
    fn test_version_chain_deletion() {
        let mut chain = VersionChain::with_initial("v1", epoch(1), TransactionId::new(1));

        // Delete the only version at epoch 5.
        assert!(chain.mark_deleted(epoch(5)));

        // v1 is readable before the deletion epoch and absent from it onwards.
        assert_eq!(chain.visible_at(epoch(4)), Some(&"v1"));
        assert_eq!(chain.visible_at(epoch(5)), None);
        assert_eq!(chain.visible_at(epoch(10)), None);
    }
}
923
924// ============================================================================
925// Tiered Storage Tests
926// ============================================================================
927
928#[cfg(all(test, feature = "tiered-storage"))]
929mod tiered_storage_tests {
930    use super::*;
931
932    #[test]
933    fn test_optional_epoch_id() {
934        // Test NONE
935        let none = OptionalEpochId::NONE;
936        assert!(none.is_none());
937        assert!(!none.is_some());
938        assert_eq!(none.get(), None);
939
940        // Test Some
941        let some = OptionalEpochId::some(EpochId::new(42));
942        assert!(some.is_some());
943        assert!(!some.is_none());
944        assert_eq!(some.get(), Some(EpochId::new(42)));
945
946        // Test zero epoch
947        let zero = OptionalEpochId::some(EpochId::new(0));
948        assert!(zero.is_some());
949        assert_eq!(zero.get(), Some(EpochId::new(0)));
950    }
951
952    #[test]
953    fn test_hot_version_ref_visibility() {
954        let hot = HotVersionRef::new(EpochId::new(5), 100, TransactionId::new(1));
955
956        // Not visible before creation
957        assert!(!hot.is_visible_at(EpochId::new(4)));
958
959        // Visible at creation and after
960        assert!(hot.is_visible_at(EpochId::new(5)));
961        assert!(hot.is_visible_at(EpochId::new(10)));
962    }
963
964    #[test]
965    fn test_hot_version_ref_deleted_visibility() {
966        let mut hot = HotVersionRef::new(EpochId::new(5), 100, TransactionId::new(1));
967        hot.deleted_epoch = OptionalEpochId::some(EpochId::new(10));
968
969        // Visible between creation and deletion
970        assert!(hot.is_visible_at(EpochId::new(5)));
971        assert!(hot.is_visible_at(EpochId::new(9)));
972
973        // Not visible at or after deletion
974        assert!(!hot.is_visible_at(EpochId::new(10)));
975        assert!(!hot.is_visible_at(EpochId::new(15)));
976    }
977
978    #[test]
979    fn test_hot_version_ref_transaction_visibility() {
980        let hot = HotVersionRef::new(EpochId::new(5), 100, TransactionId::new(1));
981
982        // Creator can see it even at earlier epoch
983        assert!(hot.is_visible_to(EpochId::new(3), TransactionId::new(1)));
984
985        // Other transactions can only see it at or after creation
986        assert!(!hot.is_visible_to(EpochId::new(3), TransactionId::new(2)));
987        assert!(hot.is_visible_to(EpochId::new(5), TransactionId::new(2)));
988    }
989
990    #[test]
991    fn test_version_index_basic() {
992        let hot = HotVersionRef::new(EpochId::new(1), 0, TransactionId::new(1));
993        let mut index = VersionIndex::with_initial(hot);
994
995        // Should see version at epoch 1+
996        assert!(index.visible_at(EpochId::new(1)).is_some());
997        assert!(index.visible_at(EpochId::new(0)).is_none());
998
999        // Add another version
1000        let hot2 = HotVersionRef::new(EpochId::new(5), 100, TransactionId::new(2));
1001        index.add_hot(hot2);
1002
1003        // Should see v1 at epoch < 5, v2 at epoch >= 5
1004        let v1 = index.visible_at(EpochId::new(4)).unwrap();
1005        assert!(matches!(v1, VersionRef::Hot(h) if h.arena_offset == 0));
1006
1007        let v2 = index.visible_at(EpochId::new(5)).unwrap();
1008        assert!(matches!(v2, VersionRef::Hot(h) if h.arena_offset == 100));
1009    }
1010
1011    #[test]
1012    fn test_version_index_deletion() {
1013        let hot = HotVersionRef::new(EpochId::new(1), 0, TransactionId::new(1));
1014        let mut index = VersionIndex::with_initial(hot);
1015
1016        // Mark as deleted at epoch 5
1017        assert!(index.mark_deleted(EpochId::new(5)));
1018
1019        // Should see version before deletion, nothing after
1020        assert!(index.visible_at(EpochId::new(4)).is_some());
1021        assert!(index.visible_at(EpochId::new(5)).is_none());
1022        assert!(index.visible_at(EpochId::new(10)).is_none());
1023    }
1024
1025    #[test]
1026    fn test_version_index_transaction_visibility() {
1027        let tx = TransactionId::new(10);
1028        let hot = HotVersionRef::new(EpochId::new(5), 0, tx);
1029        let index = VersionIndex::with_initial(hot);
1030
1031        // Creator can see it even at earlier epoch
1032        assert!(index.visible_to(EpochId::new(3), tx).is_some());
1033
1034        // Other transactions cannot see it before creation
1035        assert!(
1036            index
1037                .visible_to(EpochId::new(3), TransactionId::new(20))
1038                .is_none()
1039        );
1040        assert!(
1041            index
1042                .visible_to(EpochId::new(5), TransactionId::new(20))
1043                .is_some()
1044        );
1045    }
1046
1047    #[test]
1048    fn test_version_index_rollback() {
1049        let tx1 = TransactionId::new(10);
1050        let tx2 = TransactionId::new(20);
1051
1052        let mut index = VersionIndex::new();
1053        index.add_hot(HotVersionRef::new(EpochId::new(1), 0, tx1));
1054        index.add_hot(HotVersionRef::new(EpochId::new(2), 100, tx2));
1055        index.add_hot(HotVersionRef::new(EpochId::new(3), 200, tx2));
1056
1057        assert_eq!(index.version_count(), 3);
1058        assert!(index.modified_by(tx1));
1059        assert!(index.modified_by(tx2));
1060
1061        // Rollback tx2's changes
1062        index.remove_versions_by(tx2);
1063
1064        assert_eq!(index.version_count(), 1);
1065        assert!(index.modified_by(tx1));
1066        assert!(!index.modified_by(tx2));
1067
1068        // Should only see tx1's version
1069        let v = index.visible_at(EpochId::new(10)).unwrap();
1070        assert!(matches!(v, VersionRef::Hot(h) if h.created_by == tx1));
1071    }
1072
1073    #[test]
1074    fn test_version_index_gc() {
1075        let mut index = VersionIndex::new();
1076
1077        // Add versions at epochs 1, 3, 5
1078        for epoch in [1, 3, 5] {
1079            index.add_hot(HotVersionRef::new(
1080                EpochId::new(epoch),
1081                epoch as u32 * 100,
1082                TransactionId::new(epoch),
1083            ));
1084        }
1085
1086        assert_eq!(index.version_count(), 3);
1087
1088        // GC with min_epoch = 4
1089        // Should keep: epoch 5 (>= 4) and epoch 3 (first old visible)
1090        index.gc(EpochId::new(4));
1091
1092        assert_eq!(index.version_count(), 2);
1093
1094        // Verify we kept epochs 5 and 3
1095        assert!(index.visible_at(EpochId::new(5)).is_some());
1096        assert!(index.visible_at(EpochId::new(3)).is_some());
1097    }
1098
1099    #[test]
1100    fn test_version_index_conflict_detection() {
1101        let tx1 = TransactionId::new(10);
1102        let tx2 = TransactionId::new(20);
1103
1104        let mut index = VersionIndex::new();
1105        index.add_hot(HotVersionRef::new(EpochId::new(1), 0, tx1));
1106        index.add_hot(HotVersionRef::new(EpochId::new(5), 100, tx2));
1107
1108        // tx1 started at epoch 0, tx2 modified at epoch 5 -> conflict for tx1
1109        assert!(index.has_conflict(EpochId::new(0), tx1));
1110
1111        // tx2 started at epoch 0, tx1 modified at epoch 1 -> also conflict for tx2
1112        assert!(index.has_conflict(EpochId::new(0), tx2));
1113
1114        // tx1 started after tx2's modification -> no conflict
1115        assert!(!index.has_conflict(EpochId::new(5), tx1));
1116
1117        // tx2 started after tx1's modification -> no conflict
1118        assert!(!index.has_conflict(EpochId::new(1), tx2));
1119
1120        // If only tx1's version exists, tx1 doesn't conflict with itself
1121        let mut index2 = VersionIndex::new();
1122        index2.add_hot(HotVersionRef::new(EpochId::new(5), 0, tx1));
1123        assert!(!index2.has_conflict(EpochId::new(0), tx1));
1124    }
1125
1126    #[test]
1127    fn test_version_index_smallvec_no_heap() {
1128        let mut index = VersionIndex::new();
1129
1130        // Add 2 hot versions (within inline capacity)
1131        for i in 0..2 {
1132            index.add_hot(HotVersionRef::new(
1133                EpochId::new(i),
1134                i as u32,
1135                TransactionId::new(i),
1136            ));
1137        }
1138
1139        // SmallVec should not have spilled to heap
1140        assert!(!index.hot_spilled());
1141        assert!(!index.cold_spilled());
1142    }
1143
1144    #[test]
1145    fn test_version_index_freeze_epoch() {
1146        let mut index = VersionIndex::new();
1147        index.add_hot(HotVersionRef::new(
1148            EpochId::new(1),
1149            0,
1150            TransactionId::new(1),
1151        ));
1152        index.add_hot(HotVersionRef::new(
1153            EpochId::new(2),
1154            100,
1155            TransactionId::new(2),
1156        ));
1157
1158        assert_eq!(index.hot_count(), 2);
1159        assert_eq!(index.cold_count(), 0);
1160
1161        // Freeze epoch 1
1162        let cold_ref = ColdVersionRef {
1163            epoch: EpochId::new(1),
1164            block_offset: 0,
1165            length: 32,
1166            created_by: TransactionId::new(1),
1167            deleted_epoch: OptionalEpochId::NONE,
1168        };
1169        index.freeze_epoch(EpochId::new(1), std::iter::once(cold_ref));
1170
1171        // Hot should have 1, cold should have 1
1172        assert_eq!(index.hot_count(), 1);
1173        assert_eq!(index.cold_count(), 1);
1174
1175        // Visibility should still work
1176        assert!(index.visible_at(EpochId::new(1)).is_some());
1177        assert!(index.visible_at(EpochId::new(2)).is_some());
1178
1179        // Check that cold version is actually cold
1180        let v1 = index.visible_at(EpochId::new(1)).unwrap();
1181        assert!(v1.is_cold());
1182
1183        let v2 = index.visible_at(EpochId::new(2)).unwrap();
1184        assert!(v2.is_hot());
1185    }
1186
1187    #[test]
1188    fn test_version_ref_accessors() {
1189        let hot = HotVersionRef::new(EpochId::new(5), 100, TransactionId::new(10));
1190        let vr = VersionRef::Hot(hot);
1191
1192        assert_eq!(vr.epoch(), EpochId::new(5));
1193        assert_eq!(vr.created_by(), TransactionId::new(10));
1194        assert!(vr.is_hot());
1195        assert!(!vr.is_cold());
1196    }
1197
1198    #[test]
1199    fn test_version_index_latest_epoch() {
1200        let mut index = VersionIndex::new();
1201        assert_eq!(index.latest_epoch(), EpochId::INITIAL);
1202
1203        index.add_hot(HotVersionRef::new(
1204            EpochId::new(5),
1205            0,
1206            TransactionId::new(1),
1207        ));
1208        assert_eq!(index.latest_epoch(), EpochId::new(5));
1209
1210        index.add_hot(HotVersionRef::new(
1211            EpochId::new(10),
1212            100,
1213            TransactionId::new(2),
1214        ));
1215        assert_eq!(index.latest_epoch(), EpochId::new(10));
1216
1217        // After rollback, should recalculate
1218        index.remove_versions_by(TransactionId::new(2));
1219        assert_eq!(index.latest_epoch(), EpochId::new(5));
1220    }
1221
1222    #[test]
1223    fn test_version_index_default() {
1224        let index = VersionIndex::default();
1225        assert!(index.is_empty());
1226        assert_eq!(index.version_count(), 0);
1227    }
1228
1229    #[test]
1230    fn test_version_index_latest() {
1231        let mut index = VersionIndex::new();
1232        assert!(index.latest().is_none());
1233
1234        index.add_hot(HotVersionRef::new(
1235            EpochId::new(1),
1236            0,
1237            TransactionId::new(1),
1238        ));
1239        let latest = index.latest().unwrap();
1240        assert!(matches!(latest, VersionRef::Hot(h) if h.epoch == EpochId::new(1)));
1241
1242        index.add_hot(HotVersionRef::new(
1243            EpochId::new(5),
1244            100,
1245            TransactionId::new(2),
1246        ));
1247        let latest = index.latest().unwrap();
1248        assert!(matches!(latest, VersionRef::Hot(h) if h.epoch == EpochId::new(5)));
1249    }
1250}