sochdb_storage/
version_store.rs1use parking_lot::RwLock;
24use std::collections::HashMap;
25use std::sync::atomic::{AtomicU64, Ordering};
26
27pub type VersionId = u64;
29
30pub type TxnId = u64;
32
33#[derive(Debug, Clone)]
35pub struct VersionedValue<T> {
36 pub value: T,
38 pub created_by: TxnId,
40 pub deleted_by: Option<TxnId>,
42 pub created_at: u64,
44}
45
46#[derive(Debug)]
48pub struct VersionChain<T> {
49 versions: RwLock<Vec<VersionedValue<T>>>,
51}
52
53impl<T: Clone> VersionChain<T> {
54 pub fn new() -> Self {
56 Self {
57 versions: RwLock::new(Vec::new()),
58 }
59 }
60
61 pub fn add_version(&self, value: T, txn_id: TxnId, timestamp: u64) {
63 let version = VersionedValue {
64 value,
65 created_by: txn_id,
66 deleted_by: None,
67 created_at: timestamp,
68 };
69 self.versions.write().insert(0, version);
70 }
71
72 pub fn get_visible(&self, read_txn: TxnId) -> Option<T> {
74 let versions = self.versions.read();
75 for version in versions.iter() {
76 if version.created_by > read_txn {
78 continue;
79 }
80 if let Some(deleted_by) = version.deleted_by
82 && deleted_by <= read_txn
83 {
84 continue;
85 }
86 return Some(version.value.clone());
87 }
88 None
89 }
90
91 pub fn mark_deleted(&self, txn_id: TxnId) -> bool {
93 let mut versions = self.versions.write();
94 if let Some(version) = versions.first_mut()
95 && version.deleted_by.is_none()
96 {
97 version.deleted_by = Some(txn_id);
98 return true;
99 }
100 false
101 }
102
103 pub fn gc(&self, oldest_active_txn: TxnId) {
105 let mut versions = self.versions.write();
106 if versions.len() <= 1 {
108 return;
109 }
110
111 versions.retain(|v| {
112 v.created_by >= oldest_active_txn || v.deleted_by.is_none()
115 });
116 }
117}
118
119impl<T: Clone> Default for VersionChain<T> {
120 fn default() -> Self {
121 Self::new()
122 }
123}
124
125pub struct VersionStore<K, V> {
127 chains: RwLock<HashMap<K, VersionChain<V>>>,
129 next_txn_id: AtomicU64,
131}
132
133impl<K: std::hash::Hash + Eq + Clone, V: Clone> VersionStore<K, V> {
134 pub fn new() -> Self {
136 Self {
137 chains: RwLock::new(HashMap::new()),
138 next_txn_id: AtomicU64::new(1),
139 }
140 }
141
142 pub fn next_txn_id(&self) -> TxnId {
144 self.next_txn_id.fetch_add(1, Ordering::SeqCst)
145 }
146
147 pub fn put(&self, key: K, value: V, txn_id: TxnId) {
149 let timestamp = std::time::SystemTime::now()
150 .duration_since(std::time::UNIX_EPOCH)
151 .unwrap()
152 .as_micros() as u64;
153
154 let chains = self.chains.read();
155 if let Some(chain) = chains.get(&key) {
156 chain.add_version(value, txn_id, timestamp);
157 return;
158 }
159 drop(chains);
160
161 let mut chains = self.chains.write();
163 let chain = chains.entry(key).or_default();
164 chain.add_version(value, txn_id, timestamp);
165 }
166
167 pub fn get(&self, key: &K, read_txn: TxnId) -> Option<V> {
169 let chains = self.chains.read();
170 chains
171 .get(key)
172 .and_then(|chain| chain.get_visible(read_txn))
173 }
174
175 pub fn delete(&self, key: &K, txn_id: TxnId) -> bool {
177 let chains = self.chains.read();
178 if let Some(chain) = chains.get(key) {
179 return chain.mark_deleted(txn_id);
180 }
181 false
182 }
183
184 pub fn gc(&self, oldest_active_txn: TxnId) {
186 let chains = self.chains.read();
187 for chain in chains.values() {
188 chain.gc(oldest_active_txn);
189 }
190 }
191}
192
193impl<K: std::hash::Hash + Eq + Clone, V: Clone> Default for VersionStore<K, V> {
194 fn default() -> Self {
195 Self::new()
196 }
197}
198
199#[cfg(test)]
200mod tests {
201 use super::*;
202
203 #[test]
204 fn test_version_chain_basic() {
205 let chain = VersionChain::<i32>::new();
206
207 chain.add_version(1, 1, 100);
208 chain.add_version(2, 2, 200);
209 chain.add_version(3, 3, 300);
210
211 assert_eq!(chain.get_visible(2), Some(2));
213
214 assert_eq!(chain.get_visible(4), Some(3));
216
217 assert_eq!(chain.get_visible(1), Some(1));
219 }
220
221 #[test]
222 fn test_version_store_mvcc() {
223 let store = VersionStore::<String, i32>::new();
224
225 store.put("key1".to_string(), 100, 1);
227
228 store.put("key1".to_string(), 200, 2);
230
231 assert_eq!(store.get(&"key1".to_string(), 1), Some(100));
233
234 assert_eq!(store.get(&"key1".to_string(), 3), Some(200));
236 }
237
238 #[test]
239 fn test_delete() {
240 let store = VersionStore::<String, i32>::new();
241
242 store.put("key1".to_string(), 100, 1);
243
244 assert!(store.delete(&"key1".to_string(), 2));
246
247 assert_eq!(store.get(&"key1".to_string(), 1), Some(100));
249
250 assert_eq!(store.get(&"key1".to_string(), 3), None);
252 }
253}