Skip to main content

grafeo_common/
mvcc.rs

1//! MVCC (Multi-Version Concurrency Control) primitives.
2//!
3//! This is how Grafeo handles concurrent reads and writes without blocking.
4//! Each entity has a [`VersionChain`] that tracks all versions. Readers see
5//! consistent snapshots, writers create new versions, and old versions get
6//! garbage collected when no one needs them anymore.
7
8use std::collections::VecDeque;
9
10use crate::types::{EpochId, TxId};
11
12/// Tracks when a version was created and deleted for visibility checks.
13#[derive(Debug, Clone, Copy)]
14pub struct VersionInfo {
15    /// The epoch this version was created in.
16    pub created_epoch: EpochId,
17    /// The epoch this version was deleted in (if any).
18    pub deleted_epoch: Option<EpochId>,
19    /// The transaction that created this version.
20    pub created_by: TxId,
21}
22
23impl VersionInfo {
24    /// Creates a new version info.
25    #[must_use]
26    pub fn new(created_epoch: EpochId, created_by: TxId) -> Self {
27        Self {
28            created_epoch,
29            deleted_epoch: None,
30            created_by,
31        }
32    }
33
34    /// Marks this version as deleted.
35    pub fn mark_deleted(&mut self, epoch: EpochId) {
36        self.deleted_epoch = Some(epoch);
37    }
38
39    /// Checks if this version is visible at the given epoch.
40    #[must_use]
41    pub fn is_visible_at(&self, epoch: EpochId) -> bool {
42        // Visible if created before or at the viewing epoch
43        // and not deleted before the viewing epoch
44        if !self.created_epoch.is_visible_at(epoch) {
45            return false;
46        }
47
48        if let Some(deleted) = self.deleted_epoch {
49            // Not visible if deleted at or before the viewing epoch
50            deleted.as_u64() > epoch.as_u64()
51        } else {
52            true
53        }
54    }
55
56    /// Checks if this version is visible to a specific transaction.
57    ///
58    /// A version is visible to a transaction if:
59    /// 1. It was created by the same transaction, OR
60    /// 2. It was created in an epoch before the transaction's start epoch
61    ///    and not deleted before that epoch
62    #[must_use]
63    pub fn is_visible_to(&self, viewing_epoch: EpochId, viewing_tx: TxId) -> bool {
64        // Own modifications are always visible
65        if self.created_by == viewing_tx {
66            return self.deleted_epoch.is_none();
67        }
68
69        // Otherwise, use epoch-based visibility
70        self.is_visible_at(viewing_epoch)
71    }
72}
73
74/// A single version of data.
75#[derive(Debug, Clone)]
76pub struct Version<T> {
77    /// Visibility metadata.
78    pub info: VersionInfo,
79    /// The actual data.
80    pub data: T,
81}
82
83impl<T> Version<T> {
84    /// Creates a new version.
85    #[must_use]
86    pub fn new(data: T, created_epoch: EpochId, created_by: TxId) -> Self {
87        Self {
88            info: VersionInfo::new(created_epoch, created_by),
89            data,
90        }
91    }
92}
93
94/// All versions of a single entity, newest first.
95///
96/// Each node/edge has one of these tracking its version history. Use
97/// [`visible_at()`](Self::visible_at) to get the version at a specific epoch,
98/// or [`visible_to()`](Self::visible_to) for transaction-aware visibility.
99#[derive(Debug, Clone)]
100pub struct VersionChain<T> {
101    /// Versions ordered newest-first.
102    versions: VecDeque<Version<T>>,
103}
104
105impl<T> VersionChain<T> {
106    /// Creates a new empty version chain.
107    #[must_use]
108    pub fn new() -> Self {
109        Self {
110            versions: VecDeque::new(),
111        }
112    }
113
114    /// Creates a version chain with an initial version.
115    #[must_use]
116    pub fn with_initial(data: T, created_epoch: EpochId, created_by: TxId) -> Self {
117        let mut chain = Self::new();
118        chain.add_version(data, created_epoch, created_by);
119        chain
120    }
121
122    /// Adds a new version to the chain.
123    ///
124    /// The new version becomes the head of the chain.
125    pub fn add_version(&mut self, data: T, created_epoch: EpochId, created_by: TxId) {
126        let version = Version::new(data, created_epoch, created_by);
127        self.versions.push_front(version);
128    }
129
130    /// Finds the version visible at the given epoch.
131    ///
132    /// Returns a reference to the visible version's data, or `None` if no version
133    /// is visible at that epoch.
134    #[must_use]
135    pub fn visible_at(&self, epoch: EpochId) -> Option<&T> {
136        self.versions
137            .iter()
138            .find(|v| v.info.is_visible_at(epoch))
139            .map(|v| &v.data)
140    }
141
142    /// Finds the version visible to a specific transaction.
143    ///
144    /// This considers both the transaction's epoch and its own uncommitted changes.
145    #[must_use]
146    pub fn visible_to(&self, epoch: EpochId, tx: TxId) -> Option<&T> {
147        self.versions
148            .iter()
149            .find(|v| v.info.is_visible_to(epoch, tx))
150            .map(|v| &v.data)
151    }
152
153    /// Marks the current visible version as deleted.
154    ///
155    /// Returns `true` if a version was marked, `false` if no visible version exists.
156    pub fn mark_deleted(&mut self, delete_epoch: EpochId) -> bool {
157        for version in &mut self.versions {
158            if version.info.deleted_epoch.is_none() {
159                version.info.mark_deleted(delete_epoch);
160                return true;
161            }
162        }
163        false
164    }
165
166    /// Checks if any version was modified by the given transaction.
167    #[must_use]
168    pub fn modified_by(&self, tx: TxId) -> bool {
169        self.versions.iter().any(|v| v.info.created_by == tx)
170    }
171
172    /// Removes all versions created by the given transaction.
173    ///
174    /// Used for rollback to discard uncommitted changes.
175    pub fn remove_versions_by(&mut self, tx: TxId) {
176        self.versions.retain(|v| v.info.created_by != tx);
177    }
178
179    /// Checks if there's a concurrent modification conflict.
180    ///
181    /// A conflict exists if another transaction modified this entity
182    /// after our start epoch.
183    #[must_use]
184    pub fn has_conflict(&self, start_epoch: EpochId, our_tx: TxId) -> bool {
185        self.versions.iter().any(|v| {
186            v.info.created_by != our_tx && v.info.created_epoch.as_u64() > start_epoch.as_u64()
187        })
188    }
189
190    /// Returns the number of versions in the chain.
191    #[must_use]
192    pub fn version_count(&self) -> usize {
193        self.versions.len()
194    }
195
196    /// Returns true if the chain has no versions.
197    #[must_use]
198    pub fn is_empty(&self) -> bool {
199        self.versions.is_empty()
200    }
201
202    /// Garbage collects old versions that are no longer visible to any transaction.
203    ///
204    /// Keeps versions that might still be visible to transactions at or after `min_epoch`.
205    pub fn gc(&mut self, min_epoch: EpochId) {
206        if self.versions.is_empty() {
207            return;
208        }
209
210        let mut keep_count = 0;
211        let mut found_old_visible = false;
212
213        for (i, version) in self.versions.iter().enumerate() {
214            if version.info.created_epoch.as_u64() >= min_epoch.as_u64() {
215                keep_count = i + 1;
216            } else if !found_old_visible {
217                // Keep the first (most recent) old version
218                found_old_visible = true;
219                keep_count = i + 1;
220            }
221        }
222
223        self.versions.truncate(keep_count);
224    }
225
226    /// Returns a reference to the latest version's data regardless of visibility.
227    #[must_use]
228    pub fn latest(&self) -> Option<&T> {
229        self.versions.front().map(|v| &v.data)
230    }
231
232    /// Returns a mutable reference to the latest version's data.
233    #[must_use]
234    pub fn latest_mut(&mut self) -> Option<&mut T> {
235        self.versions.front_mut().map(|v| &mut v.data)
236    }
237}
238
239impl<T> Default for VersionChain<T> {
240    fn default() -> Self {
241        Self::new()
242    }
243}
244
245impl<T: Clone> VersionChain<T> {
246    /// Gets a mutable reference to the visible version's data for modification.
247    ///
248    /// If the version is not owned by this transaction, creates a new version
249    /// with a copy of the data.
250    pub fn get_mut(&mut self, epoch: EpochId, tx: TxId, modify_epoch: EpochId) -> Option<&mut T> {
251        // Find the visible version
252        let visible_idx = self
253            .versions
254            .iter()
255            .position(|v| v.info.is_visible_to(epoch, tx))?;
256
257        let visible = &self.versions[visible_idx];
258
259        if visible.info.created_by == tx {
260            // Already our version, modify in place
261            Some(&mut self.versions[visible_idx].data)
262        } else {
263            // Create a new version with copied data
264            let new_data = visible.data.clone();
265            self.add_version(new_data, modify_epoch, tx);
266            Some(&mut self.versions[0].data)
267        }
268    }
269}
270
271#[cfg(test)]
272mod tests {
273    use super::*;
274
275    #[test]
276    fn test_version_visibility() {
277        let v = VersionInfo::new(EpochId::new(5), TxId::new(1));
278
279        // Not visible before creation
280        assert!(!v.is_visible_at(EpochId::new(4)));
281
282        // Visible at creation epoch and after
283        assert!(v.is_visible_at(EpochId::new(5)));
284        assert!(v.is_visible_at(EpochId::new(10)));
285    }
286
287    #[test]
288    fn test_deleted_version_visibility() {
289        let mut v = VersionInfo::new(EpochId::new(5), TxId::new(1));
290        v.mark_deleted(EpochId::new(10));
291
292        // Visible between creation and deletion
293        assert!(v.is_visible_at(EpochId::new(5)));
294        assert!(v.is_visible_at(EpochId::new(9)));
295
296        // Not visible at or after deletion
297        assert!(!v.is_visible_at(EpochId::new(10)));
298        assert!(!v.is_visible_at(EpochId::new(15)));
299    }
300
301    #[test]
302    fn test_version_visibility_to_transaction() {
303        let v = VersionInfo::new(EpochId::new(5), TxId::new(1));
304
305        // Creator can see it even if viewing at earlier epoch
306        assert!(v.is_visible_to(EpochId::new(3), TxId::new(1)));
307
308        // Other transactions can only see it at or after creation epoch
309        assert!(!v.is_visible_to(EpochId::new(3), TxId::new(2)));
310        assert!(v.is_visible_to(EpochId::new(5), TxId::new(2)));
311    }
312
313    #[test]
314    fn test_version_chain_basic() {
315        let mut chain = VersionChain::with_initial("v1", EpochId::new(1), TxId::new(1));
316
317        // Should see v1 at epoch 1+
318        assert_eq!(chain.visible_at(EpochId::new(1)), Some(&"v1"));
319        assert_eq!(chain.visible_at(EpochId::new(0)), None);
320
321        // Add v2
322        chain.add_version("v2", EpochId::new(5), TxId::new(2));
323
324        // Should see v1 at epoch < 5, v2 at epoch >= 5
325        assert_eq!(chain.visible_at(EpochId::new(1)), Some(&"v1"));
326        assert_eq!(chain.visible_at(EpochId::new(4)), Some(&"v1"));
327        assert_eq!(chain.visible_at(EpochId::new(5)), Some(&"v2"));
328        assert_eq!(chain.visible_at(EpochId::new(10)), Some(&"v2"));
329    }
330
331    #[test]
332    fn test_version_chain_rollback() {
333        let mut chain = VersionChain::with_initial("v1", EpochId::new(1), TxId::new(1));
334        chain.add_version("v2", EpochId::new(5), TxId::new(2));
335        chain.add_version("v3", EpochId::new(6), TxId::new(2));
336
337        assert_eq!(chain.version_count(), 3);
338
339        // Rollback tx 2's changes
340        chain.remove_versions_by(TxId::new(2));
341
342        assert_eq!(chain.version_count(), 1);
343        assert_eq!(chain.visible_at(EpochId::new(10)), Some(&"v1"));
344    }
345
346    #[test]
347    fn test_version_chain_deletion() {
348        let mut chain = VersionChain::with_initial("v1", EpochId::new(1), TxId::new(1));
349
350        // Mark as deleted at epoch 5
351        assert!(chain.mark_deleted(EpochId::new(5)));
352
353        // Should see v1 before deletion, nothing after
354        assert_eq!(chain.visible_at(EpochId::new(4)), Some(&"v1"));
355        assert_eq!(chain.visible_at(EpochId::new(5)), None);
356        assert_eq!(chain.visible_at(EpochId::new(10)), None);
357    }
358}