Skip to main content

graphos_common/
mvcc.rs

//! MVCC (Multi-Version Concurrency Control) primitives.
//!
//! This module provides the version chain and visibility logic
//! for concurrent read/write access to graph data.
5
6use std::collections::VecDeque;
7
8use crate::types::{EpochId, TxId};
9
10/// Visibility information for a version.
11#[derive(Debug, Clone, Copy)]
12pub struct VersionInfo {
13    /// The epoch this version was created in.
14    pub created_epoch: EpochId,
15    /// The epoch this version was deleted in (if any).
16    pub deleted_epoch: Option<EpochId>,
17    /// The transaction that created this version.
18    pub created_by: TxId,
19}
20
21impl VersionInfo {
22    /// Creates a new version info.
23    #[must_use]
24    pub fn new(created_epoch: EpochId, created_by: TxId) -> Self {
25        Self {
26            created_epoch,
27            deleted_epoch: None,
28            created_by,
29        }
30    }
31
32    /// Marks this version as deleted.
33    pub fn mark_deleted(&mut self, epoch: EpochId) {
34        self.deleted_epoch = Some(epoch);
35    }
36
37    /// Checks if this version is visible at the given epoch.
38    #[must_use]
39    pub fn is_visible_at(&self, epoch: EpochId) -> bool {
40        // Visible if created before or at the viewing epoch
41        // and not deleted before the viewing epoch
42        if !self.created_epoch.is_visible_at(epoch) {
43            return false;
44        }
45
46        if let Some(deleted) = self.deleted_epoch {
47            // Not visible if deleted at or before the viewing epoch
48            deleted.as_u64() > epoch.as_u64()
49        } else {
50            true
51        }
52    }
53
54    /// Checks if this version is visible to a specific transaction.
55    ///
56    /// A version is visible to a transaction if:
57    /// 1. It was created by the same transaction, OR
58    /// 2. It was created in an epoch before the transaction's start epoch
59    ///    and not deleted before that epoch
60    #[must_use]
61    pub fn is_visible_to(&self, viewing_epoch: EpochId, viewing_tx: TxId) -> bool {
62        // Own modifications are always visible
63        if self.created_by == viewing_tx {
64            return self.deleted_epoch.is_none();
65        }
66
67        // Otherwise, use epoch-based visibility
68        self.is_visible_at(viewing_epoch)
69    }
70}
71
72/// A single version of data.
73#[derive(Debug, Clone)]
74pub struct Version<T> {
75    /// Visibility metadata.
76    pub info: VersionInfo,
77    /// The actual data.
78    pub data: T,
79}
80
81impl<T> Version<T> {
82    /// Creates a new version.
83    #[must_use]
84    pub fn new(data: T, created_epoch: EpochId, created_by: TxId) -> Self {
85        Self {
86            info: VersionInfo::new(created_epoch, created_by),
87            data,
88        }
89    }
90}
91
92/// A chain of versions for a single entity.
93///
94/// Versions are stored newest-first for efficient access to recent versions.
95/// Old versions can be garbage collected when no active transactions can see them.
96#[derive(Debug, Clone)]
97pub struct VersionChain<T> {
98    /// Versions ordered newest-first.
99    versions: VecDeque<Version<T>>,
100}
101
102impl<T> VersionChain<T> {
103    /// Creates a new empty version chain.
104    #[must_use]
105    pub fn new() -> Self {
106        Self {
107            versions: VecDeque::new(),
108        }
109    }
110
111    /// Creates a version chain with an initial version.
112    #[must_use]
113    pub fn with_initial(data: T, created_epoch: EpochId, created_by: TxId) -> Self {
114        let mut chain = Self::new();
115        chain.add_version(data, created_epoch, created_by);
116        chain
117    }
118
119    /// Adds a new version to the chain.
120    ///
121    /// The new version becomes the head of the chain.
122    pub fn add_version(&mut self, data: T, created_epoch: EpochId, created_by: TxId) {
123        let version = Version::new(data, created_epoch, created_by);
124        self.versions.push_front(version);
125    }
126
127    /// Finds the version visible at the given epoch.
128    ///
129    /// Returns a reference to the visible version's data, or `None` if no version
130    /// is visible at that epoch.
131    #[must_use]
132    pub fn visible_at(&self, epoch: EpochId) -> Option<&T> {
133        self.versions
134            .iter()
135            .find(|v| v.info.is_visible_at(epoch))
136            .map(|v| &v.data)
137    }
138
139    /// Finds the version visible to a specific transaction.
140    ///
141    /// This considers both the transaction's epoch and its own uncommitted changes.
142    #[must_use]
143    pub fn visible_to(&self, epoch: EpochId, tx: TxId) -> Option<&T> {
144        self.versions
145            .iter()
146            .find(|v| v.info.is_visible_to(epoch, tx))
147            .map(|v| &v.data)
148    }
149
150    /// Marks the current visible version as deleted.
151    ///
152    /// Returns `true` if a version was marked, `false` if no visible version exists.
153    pub fn mark_deleted(&mut self, delete_epoch: EpochId) -> bool {
154        for version in &mut self.versions {
155            if version.info.deleted_epoch.is_none() {
156                version.info.mark_deleted(delete_epoch);
157                return true;
158            }
159        }
160        false
161    }
162
163    /// Checks if any version was modified by the given transaction.
164    #[must_use]
165    pub fn modified_by(&self, tx: TxId) -> bool {
166        self.versions.iter().any(|v| v.info.created_by == tx)
167    }
168
169    /// Removes all versions created by the given transaction.
170    ///
171    /// Used for rollback to discard uncommitted changes.
172    pub fn remove_versions_by(&mut self, tx: TxId) {
173        self.versions.retain(|v| v.info.created_by != tx);
174    }
175
176    /// Checks if there's a concurrent modification conflict.
177    ///
178    /// A conflict exists if another transaction modified this entity
179    /// after our start epoch.
180    #[must_use]
181    pub fn has_conflict(&self, start_epoch: EpochId, our_tx: TxId) -> bool {
182        self.versions.iter().any(|v| {
183            v.info.created_by != our_tx && v.info.created_epoch.as_u64() > start_epoch.as_u64()
184        })
185    }
186
187    /// Returns the number of versions in the chain.
188    #[must_use]
189    pub fn version_count(&self) -> usize {
190        self.versions.len()
191    }
192
193    /// Returns true if the chain has no versions.
194    #[must_use]
195    pub fn is_empty(&self) -> bool {
196        self.versions.is_empty()
197    }
198
199    /// Garbage collects old versions that are no longer visible to any transaction.
200    ///
201    /// Keeps versions that might still be visible to transactions at or after `min_epoch`.
202    pub fn gc(&mut self, min_epoch: EpochId) {
203        if self.versions.is_empty() {
204            return;
205        }
206
207        let mut keep_count = 0;
208        let mut found_old_visible = false;
209
210        for (i, version) in self.versions.iter().enumerate() {
211            if version.info.created_epoch.as_u64() >= min_epoch.as_u64() {
212                keep_count = i + 1;
213            } else if !found_old_visible {
214                // Keep the first (most recent) old version
215                found_old_visible = true;
216                keep_count = i + 1;
217            }
218        }
219
220        self.versions.truncate(keep_count);
221    }
222
223    /// Returns a reference to the latest version's data regardless of visibility.
224    #[must_use]
225    pub fn latest(&self) -> Option<&T> {
226        self.versions.front().map(|v| &v.data)
227    }
228
229    /// Returns a mutable reference to the latest version's data.
230    #[must_use]
231    pub fn latest_mut(&mut self) -> Option<&mut T> {
232        self.versions.front_mut().map(|v| &mut v.data)
233    }
234}
235
236impl<T> Default for VersionChain<T> {
237    fn default() -> Self {
238        Self::new()
239    }
240}
241
242impl<T: Clone> VersionChain<T> {
243    /// Gets a mutable reference to the visible version's data for modification.
244    ///
245    /// If the version is not owned by this transaction, creates a new version
246    /// with a copy of the data.
247    pub fn get_mut(&mut self, epoch: EpochId, tx: TxId, modify_epoch: EpochId) -> Option<&mut T> {
248        // Find the visible version
249        let visible_idx = self
250            .versions
251            .iter()
252            .position(|v| v.info.is_visible_to(epoch, tx))?;
253
254        let visible = &self.versions[visible_idx];
255
256        if visible.info.created_by == tx {
257            // Already our version, modify in place
258            Some(&mut self.versions[visible_idx].data)
259        } else {
260            // Create a new version with copied data
261            let new_data = visible.data.clone();
262            self.add_version(new_data, modify_epoch, tx);
263            Some(&mut self.versions[0].data)
264        }
265    }
266}
267
#[cfg(test)]
mod tests {
    use super::*;

    /// A live version is hidden before its creation epoch and visible from it on.
    #[test]
    fn test_version_visibility() {
        let info = VersionInfo::new(EpochId::new(5), TxId::new(1));

        for (at, expected) in [(4, false), (5, true), (10, true)] {
            assert_eq!(
                info.is_visible_at(EpochId::new(at)),
                expected,
                "unexpected visibility at epoch {}",
                at
            );
        }
    }

    /// A deleted version is visible only in `[created, deleted)`.
    #[test]
    fn test_deleted_version_visibility() {
        let mut info = VersionInfo::new(EpochId::new(5), TxId::new(1));
        info.mark_deleted(EpochId::new(10));

        for (at, expected) in [(5, true), (9, true), (10, false), (15, false)] {
            assert_eq!(
                info.is_visible_at(EpochId::new(at)),
                expected,
                "unexpected visibility at epoch {}",
                at
            );
        }
    }

    /// The creating transaction sees its version early; others wait for the epoch.
    #[test]
    fn test_version_visibility_to_transaction() {
        let info = VersionInfo::new(EpochId::new(5), TxId::new(1));
        let creator = TxId::new(1);
        let other = TxId::new(2);

        assert!(info.is_visible_to(EpochId::new(3), creator));

        assert!(!info.is_visible_to(EpochId::new(3), other));
        assert!(info.is_visible_to(EpochId::new(5), other));
    }

    /// Readers get the newest version that exists at their epoch.
    #[test]
    fn test_version_chain_basic() {
        let mut chain = VersionChain::with_initial("v1", EpochId::new(1), TxId::new(1));

        assert_eq!(chain.visible_at(EpochId::new(1)), Some(&"v1"));
        assert_eq!(chain.visible_at(EpochId::new(0)), None);

        chain.add_version("v2", EpochId::new(5), TxId::new(2));

        for (at, expected) in [(1, "v1"), (4, "v1"), (5, "v2"), (10, "v2")] {
            assert_eq!(chain.visible_at(EpochId::new(at)), Some(&expected));
        }
    }

    /// Rolling back a transaction removes exactly the versions it created.
    #[test]
    fn test_version_chain_rollback() {
        let rolled_back = TxId::new(2);
        let mut chain = VersionChain::with_initial("v1", EpochId::new(1), TxId::new(1));
        chain.add_version("v2", EpochId::new(5), rolled_back);
        chain.add_version("v3", EpochId::new(6), rolled_back);
        assert_eq!(chain.version_count(), 3);

        chain.remove_versions_by(rolled_back);

        assert_eq!(chain.version_count(), 1);
        assert_eq!(chain.visible_at(EpochId::new(10)), Some(&"v1"));
    }

    /// A deletion hides the entity from the deletion epoch onwards.
    #[test]
    fn test_version_chain_deletion() {
        let mut chain = VersionChain::with_initial("v1", EpochId::new(1), TxId::new(1));

        assert!(chain.mark_deleted(EpochId::new(5)));

        for (at, expected) in [(4, Some(&"v1")), (5, None), (10, None)] {
            assert_eq!(chain.visible_at(EpochId::new(at)), expected);
        }
    }
}