sochdb_storage/
version_store.rs

1// Copyright 2025 Sushanth (https://github.com/sushanthpy)
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Version Store for MVCC
16//!
17//! Manages version chains for multi-version concurrency control.
18//! This module provides the storage layer for transaction isolation.
19
20use parking_lot::RwLock;
21use std::collections::HashMap;
22use std::sync::atomic::{AtomicU64, Ordering};
23
24/// Version identifier
25pub type VersionId = u64;
26
27/// Transaction ID for MVCC visibility
28pub type TxnId = u64;
29
30/// A versioned value with transaction metadata
31#[derive(Debug, Clone)]
32pub struct VersionedValue<T> {
33    /// The value
34    pub value: T,
35    /// Transaction that created this version
36    pub created_by: TxnId,
37    /// Transaction that deleted this version (None if active)
38    pub deleted_by: Option<TxnId>,
39    /// Timestamp of creation
40    pub created_at: u64,
41}
42
43/// Version store for a single key's version chain
44#[derive(Debug)]
45pub struct VersionChain<T> {
46    /// Versions ordered by creation time (newest first)
47    versions: RwLock<Vec<VersionedValue<T>>>,
48}
49
50impl<T: Clone> VersionChain<T> {
51    /// Create a new empty version chain
52    pub fn new() -> Self {
53        Self {
54            versions: RwLock::new(Vec::new()),
55        }
56    }
57
58    /// Add a new version
59    pub fn add_version(&self, value: T, txn_id: TxnId, timestamp: u64) {
60        let version = VersionedValue {
61            value,
62            created_by: txn_id,
63            deleted_by: None,
64            created_at: timestamp,
65        };
66        self.versions.write().insert(0, version);
67    }
68
69    /// Get the visible version for a transaction
70    pub fn get_visible(&self, read_txn: TxnId) -> Option<T> {
71        let versions = self.versions.read();
72        for version in versions.iter() {
73            // Skip if created by a future transaction
74            if version.created_by > read_txn {
75                continue;
76            }
77            // Skip if deleted by a committed transaction <= read_txn
78            if let Some(deleted_by) = version.deleted_by
79                && deleted_by <= read_txn
80            {
81                continue;
82            }
83            return Some(version.value.clone());
84        }
85        None
86    }
87
88    /// Mark the current version as deleted
89    pub fn mark_deleted(&self, txn_id: TxnId) -> bool {
90        let mut versions = self.versions.write();
91        if let Some(version) = versions.first_mut()
92            && version.deleted_by.is_none()
93        {
94            version.deleted_by = Some(txn_id);
95            return true;
96        }
97        false
98    }
99
100    /// Garbage collect old versions not visible to any active transaction
101    pub fn gc(&self, oldest_active_txn: TxnId) {
102        let mut versions = self.versions.write();
103        // Keep at least one version, remove those not visible to any active txn
104        if versions.len() <= 1 {
105            return;
106        }
107
108        versions.retain(|v| {
109            // Keep if created by a transaction >= oldest active
110            // or if it's the most recent version
111            v.created_by >= oldest_active_txn || v.deleted_by.is_none()
112        });
113    }
114}
115
116impl<T: Clone> Default for VersionChain<T> {
117    fn default() -> Self {
118        Self::new()
119    }
120}
121
122/// Version store managing version chains for all keys
123pub struct VersionStore<K, V> {
124    /// Version chains by key
125    chains: RwLock<HashMap<K, VersionChain<V>>>,
126    /// Next transaction ID
127    next_txn_id: AtomicU64,
128}
129
130impl<K: std::hash::Hash + Eq + Clone, V: Clone> VersionStore<K, V> {
131    /// Create a new version store
132    pub fn new() -> Self {
133        Self {
134            chains: RwLock::new(HashMap::new()),
135            next_txn_id: AtomicU64::new(1),
136        }
137    }
138
139    /// Get the next transaction ID
140    pub fn next_txn_id(&self) -> TxnId {
141        self.next_txn_id.fetch_add(1, Ordering::SeqCst)
142    }
143
144    /// Insert or update a value
145    pub fn put(&self, key: K, value: V, txn_id: TxnId) {
146        let timestamp = std::time::SystemTime::now()
147            .duration_since(std::time::UNIX_EPOCH)
148            .unwrap()
149            .as_micros() as u64;
150
151        let chains = self.chains.read();
152        if let Some(chain) = chains.get(&key) {
153            chain.add_version(value, txn_id, timestamp);
154            return;
155        }
156        drop(chains);
157
158        // Need to create new chain
159        let mut chains = self.chains.write();
160        let chain = chains.entry(key).or_default();
161        chain.add_version(value, txn_id, timestamp);
162    }
163
164    /// Get the visible value for a transaction
165    pub fn get(&self, key: &K, read_txn: TxnId) -> Option<V> {
166        let chains = self.chains.read();
167        chains
168            .get(key)
169            .and_then(|chain| chain.get_visible(read_txn))
170    }
171
172    /// Delete a key
173    pub fn delete(&self, key: &K, txn_id: TxnId) -> bool {
174        let chains = self.chains.read();
175        if let Some(chain) = chains.get(key) {
176            return chain.mark_deleted(txn_id);
177        }
178        false
179    }
180
181    /// Garbage collect old versions
182    pub fn gc(&self, oldest_active_txn: TxnId) {
183        let chains = self.chains.read();
184        for chain in chains.values() {
185            chain.gc(oldest_active_txn);
186        }
187    }
188}
189
190impl<K: std::hash::Hash + Eq + Clone, V: Clone> Default for VersionStore<K, V> {
191    fn default() -> Self {
192        Self::new()
193    }
194}
195
196#[cfg(test)]
197mod tests {
198    use super::*;
199
200    #[test]
201    fn test_version_chain_basic() {
202        let chain = VersionChain::<i32>::new();
203
204        chain.add_version(1, 1, 100);
205        chain.add_version(2, 2, 200);
206        chain.add_version(3, 3, 300);
207
208        // Transaction 2 should see version 2
209        assert_eq!(chain.get_visible(2), Some(2));
210
211        // Transaction 4 should see version 3
212        assert_eq!(chain.get_visible(4), Some(3));
213
214        // Transaction 1 should see version 1
215        assert_eq!(chain.get_visible(1), Some(1));
216    }
217
218    #[test]
219    fn test_version_store_mvcc() {
220        let store = VersionStore::<String, i32>::new();
221
222        // Insert with transaction 1
223        store.put("key1".to_string(), 100, 1);
224
225        // Update with transaction 2
226        store.put("key1".to_string(), 200, 2);
227
228        // Transaction 1 should see 100
229        assert_eq!(store.get(&"key1".to_string(), 1), Some(100));
230
231        // Transaction 3 should see 200
232        assert_eq!(store.get(&"key1".to_string(), 3), Some(200));
233    }
234
235    #[test]
236    fn test_delete() {
237        let store = VersionStore::<String, i32>::new();
238
239        store.put("key1".to_string(), 100, 1);
240
241        // Delete with transaction 2
242        assert!(store.delete(&"key1".to_string(), 2));
243
244        // Transaction 1 should still see 100
245        assert_eq!(store.get(&"key1".to_string(), 1), Some(100));
246
247        // Transaction 3 should see None (deleted)
248        assert_eq!(store.get(&"key1".to_string(), 3), None);
249    }
250}