Skip to main content

sochdb_storage/
version_store.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! Version Store for MVCC
19//!
20//! Manages version chains for multi-version concurrency control.
21//! This module provides the storage layer for transaction isolation.
22
23use parking_lot::RwLock;
24use std::collections::HashMap;
25use std::sync::atomic::{AtomicU64, Ordering};
26
27/// Version identifier
28pub type VersionId = u64;
29
30/// Transaction ID for MVCC visibility
31pub type TxnId = u64;
32
33/// A versioned value with transaction metadata
34#[derive(Debug, Clone)]
35pub struct VersionedValue<T> {
36    /// The value
37    pub value: T,
38    /// Transaction that created this version
39    pub created_by: TxnId,
40    /// Transaction that deleted this version (None if active)
41    pub deleted_by: Option<TxnId>,
42    /// Timestamp of creation
43    pub created_at: u64,
44}
45
46/// Version store for a single key's version chain
47#[derive(Debug)]
48pub struct VersionChain<T> {
49    /// Versions ordered by creation time (newest first)
50    versions: RwLock<Vec<VersionedValue<T>>>,
51}
52
53impl<T: Clone> VersionChain<T> {
54    /// Create a new empty version chain
55    pub fn new() -> Self {
56        Self {
57            versions: RwLock::new(Vec::new()),
58        }
59    }
60
61    /// Add a new version
62    pub fn add_version(&self, value: T, txn_id: TxnId, timestamp: u64) {
63        let version = VersionedValue {
64            value,
65            created_by: txn_id,
66            deleted_by: None,
67            created_at: timestamp,
68        };
69        self.versions.write().insert(0, version);
70    }
71
72    /// Get the visible version for a transaction
73    pub fn get_visible(&self, read_txn: TxnId) -> Option<T> {
74        let versions = self.versions.read();
75        for version in versions.iter() {
76            // Skip if created by a future transaction
77            if version.created_by > read_txn {
78                continue;
79            }
80            // Skip if deleted by a committed transaction <= read_txn
81            if let Some(deleted_by) = version.deleted_by
82                && deleted_by <= read_txn
83            {
84                continue;
85            }
86            return Some(version.value.clone());
87        }
88        None
89    }
90
91    /// Mark the current version as deleted
92    pub fn mark_deleted(&self, txn_id: TxnId) -> bool {
93        let mut versions = self.versions.write();
94        if let Some(version) = versions.first_mut()
95            && version.deleted_by.is_none()
96        {
97            version.deleted_by = Some(txn_id);
98            return true;
99        }
100        false
101    }
102
103    /// Garbage collect old versions not visible to any active transaction
104    pub fn gc(&self, oldest_active_txn: TxnId) {
105        let mut versions = self.versions.write();
106        // Keep at least one version, remove those not visible to any active txn
107        if versions.len() <= 1 {
108            return;
109        }
110
111        versions.retain(|v| {
112            // Keep if created by a transaction >= oldest active
113            // or if it's the most recent version
114            v.created_by >= oldest_active_txn || v.deleted_by.is_none()
115        });
116    }
117}
118
119impl<T: Clone> Default for VersionChain<T> {
120    fn default() -> Self {
121        Self::new()
122    }
123}
124
125/// Version store managing version chains for all keys
126pub struct VersionStore<K, V> {
127    /// Version chains by key
128    chains: RwLock<HashMap<K, VersionChain<V>>>,
129    /// Next transaction ID
130    next_txn_id: AtomicU64,
131}
132
133impl<K: std::hash::Hash + Eq + Clone, V: Clone> VersionStore<K, V> {
134    /// Create a new version store
135    pub fn new() -> Self {
136        Self {
137            chains: RwLock::new(HashMap::new()),
138            next_txn_id: AtomicU64::new(1),
139        }
140    }
141
142    /// Get the next transaction ID
143    pub fn next_txn_id(&self) -> TxnId {
144        self.next_txn_id.fetch_add(1, Ordering::SeqCst)
145    }
146
147    /// Insert or update a value
148    pub fn put(&self, key: K, value: V, txn_id: TxnId) {
149        let timestamp = std::time::SystemTime::now()
150            .duration_since(std::time::UNIX_EPOCH)
151            .unwrap()
152            .as_micros() as u64;
153
154        let chains = self.chains.read();
155        if let Some(chain) = chains.get(&key) {
156            chain.add_version(value, txn_id, timestamp);
157            return;
158        }
159        drop(chains);
160
161        // Need to create new chain
162        let mut chains = self.chains.write();
163        let chain = chains.entry(key).or_default();
164        chain.add_version(value, txn_id, timestamp);
165    }
166
167    /// Get the visible value for a transaction
168    pub fn get(&self, key: &K, read_txn: TxnId) -> Option<V> {
169        let chains = self.chains.read();
170        chains
171            .get(key)
172            .and_then(|chain| chain.get_visible(read_txn))
173    }
174
175    /// Delete a key
176    pub fn delete(&self, key: &K, txn_id: TxnId) -> bool {
177        let chains = self.chains.read();
178        if let Some(chain) = chains.get(key) {
179            return chain.mark_deleted(txn_id);
180        }
181        false
182    }
183
184    /// Garbage collect old versions
185    pub fn gc(&self, oldest_active_txn: TxnId) {
186        let chains = self.chains.read();
187        for chain in chains.values() {
188            chain.gc(oldest_active_txn);
189        }
190    }
191}
192
193impl<K: std::hash::Hash + Eq + Clone, V: Clone> Default for VersionStore<K, V> {
194    fn default() -> Self {
195        Self::new()
196    }
197}
198
199#[cfg(test)]
200mod tests {
201    use super::*;
202
203    #[test]
204    fn test_version_chain_basic() {
205        let chain = VersionChain::<i32>::new();
206
207        chain.add_version(1, 1, 100);
208        chain.add_version(2, 2, 200);
209        chain.add_version(3, 3, 300);
210
211        // Transaction 2 should see version 2
212        assert_eq!(chain.get_visible(2), Some(2));
213
214        // Transaction 4 should see version 3
215        assert_eq!(chain.get_visible(4), Some(3));
216
217        // Transaction 1 should see version 1
218        assert_eq!(chain.get_visible(1), Some(1));
219    }
220
221    #[test]
222    fn test_version_store_mvcc() {
223        let store = VersionStore::<String, i32>::new();
224
225        // Insert with transaction 1
226        store.put("key1".to_string(), 100, 1);
227
228        // Update with transaction 2
229        store.put("key1".to_string(), 200, 2);
230
231        // Transaction 1 should see 100
232        assert_eq!(store.get(&"key1".to_string(), 1), Some(100));
233
234        // Transaction 3 should see 200
235        assert_eq!(store.get(&"key1".to_string(), 3), Some(200));
236    }
237
238    #[test]
239    fn test_delete() {
240        let store = VersionStore::<String, i32>::new();
241
242        store.put("key1".to_string(), 100, 1);
243
244        // Delete with transaction 2
245        assert!(store.delete(&"key1".to_string(), 2));
246
247        // Transaction 1 should still see 100
248        assert_eq!(store.get(&"key1".to_string(), 1), Some(100));
249
250        // Transaction 3 should see None (deleted)
251        assert_eq!(store.get(&"key1".to_string(), 3), None);
252    }
253}