sochdb_storage/
version_store.rs1use parking_lot::RwLock;
21use std::collections::HashMap;
22use std::sync::atomic::{AtomicU64, Ordering};
23
24pub type VersionId = u64;
26
27pub type TxnId = u64;
29
30#[derive(Debug, Clone)]
32pub struct VersionedValue<T> {
33 pub value: T,
35 pub created_by: TxnId,
37 pub deleted_by: Option<TxnId>,
39 pub created_at: u64,
41}
42
43#[derive(Debug)]
45pub struct VersionChain<T> {
46 versions: RwLock<Vec<VersionedValue<T>>>,
48}
49
50impl<T: Clone> VersionChain<T> {
51 pub fn new() -> Self {
53 Self {
54 versions: RwLock::new(Vec::new()),
55 }
56 }
57
58 pub fn add_version(&self, value: T, txn_id: TxnId, timestamp: u64) {
60 let version = VersionedValue {
61 value,
62 created_by: txn_id,
63 deleted_by: None,
64 created_at: timestamp,
65 };
66 self.versions.write().insert(0, version);
67 }
68
69 pub fn get_visible(&self, read_txn: TxnId) -> Option<T> {
71 let versions = self.versions.read();
72 for version in versions.iter() {
73 if version.created_by > read_txn {
75 continue;
76 }
77 if let Some(deleted_by) = version.deleted_by
79 && deleted_by <= read_txn
80 {
81 continue;
82 }
83 return Some(version.value.clone());
84 }
85 None
86 }
87
88 pub fn mark_deleted(&self, txn_id: TxnId) -> bool {
90 let mut versions = self.versions.write();
91 if let Some(version) = versions.first_mut()
92 && version.deleted_by.is_none()
93 {
94 version.deleted_by = Some(txn_id);
95 return true;
96 }
97 false
98 }
99
100 pub fn gc(&self, oldest_active_txn: TxnId) {
102 let mut versions = self.versions.write();
103 if versions.len() <= 1 {
105 return;
106 }
107
108 versions.retain(|v| {
109 v.created_by >= oldest_active_txn || v.deleted_by.is_none()
112 });
113 }
114}
115
116impl<T: Clone> Default for VersionChain<T> {
117 fn default() -> Self {
118 Self::new()
119 }
120}
121
122pub struct VersionStore<K, V> {
124 chains: RwLock<HashMap<K, VersionChain<V>>>,
126 next_txn_id: AtomicU64,
128}
129
130impl<K: std::hash::Hash + Eq + Clone, V: Clone> VersionStore<K, V> {
131 pub fn new() -> Self {
133 Self {
134 chains: RwLock::new(HashMap::new()),
135 next_txn_id: AtomicU64::new(1),
136 }
137 }
138
139 pub fn next_txn_id(&self) -> TxnId {
141 self.next_txn_id.fetch_add(1, Ordering::SeqCst)
142 }
143
144 pub fn put(&self, key: K, value: V, txn_id: TxnId) {
146 let timestamp = std::time::SystemTime::now()
147 .duration_since(std::time::UNIX_EPOCH)
148 .unwrap()
149 .as_micros() as u64;
150
151 let chains = self.chains.read();
152 if let Some(chain) = chains.get(&key) {
153 chain.add_version(value, txn_id, timestamp);
154 return;
155 }
156 drop(chains);
157
158 let mut chains = self.chains.write();
160 let chain = chains.entry(key).or_default();
161 chain.add_version(value, txn_id, timestamp);
162 }
163
164 pub fn get(&self, key: &K, read_txn: TxnId) -> Option<V> {
166 let chains = self.chains.read();
167 chains
168 .get(key)
169 .and_then(|chain| chain.get_visible(read_txn))
170 }
171
172 pub fn delete(&self, key: &K, txn_id: TxnId) -> bool {
174 let chains = self.chains.read();
175 if let Some(chain) = chains.get(key) {
176 return chain.mark_deleted(txn_id);
177 }
178 false
179 }
180
181 pub fn gc(&self, oldest_active_txn: TxnId) {
183 let chains = self.chains.read();
184 for chain in chains.values() {
185 chain.gc(oldest_active_txn);
186 }
187 }
188}
189
190impl<K: std::hash::Hash + Eq + Clone, V: Clone> Default for VersionStore<K, V> {
191 fn default() -> Self {
192 Self::new()
193 }
194}
195
196#[cfg(test)]
197mod tests {
198 use super::*;
199
200 #[test]
201 fn test_version_chain_basic() {
202 let chain = VersionChain::<i32>::new();
203
204 chain.add_version(1, 1, 100);
205 chain.add_version(2, 2, 200);
206 chain.add_version(3, 3, 300);
207
208 assert_eq!(chain.get_visible(2), Some(2));
210
211 assert_eq!(chain.get_visible(4), Some(3));
213
214 assert_eq!(chain.get_visible(1), Some(1));
216 }
217
218 #[test]
219 fn test_version_store_mvcc() {
220 let store = VersionStore::<String, i32>::new();
221
222 store.put("key1".to_string(), 100, 1);
224
225 store.put("key1".to_string(), 200, 2);
227
228 assert_eq!(store.get(&"key1".to_string(), 1), Some(100));
230
231 assert_eq!(store.get(&"key1".to_string(), 3), Some(200));
233 }
234
235 #[test]
236 fn test_delete() {
237 let store = VersionStore::<String, i32>::new();
238
239 store.put("key1".to_string(), 100, 1);
240
241 assert!(store.delete(&"key1".to_string(), 2));
243
244 assert_eq!(store.get(&"key1".to_string(), 1), Some(100));
246
247 assert_eq!(store.get(&"key1".to_string(), 3), None);
249 }
250}