dbx_core/transaction/mvcc/
version_manager.rs1use crate::error::{DbxError, DbxResult};
6use crate::transaction::mvcc::manager::TimestampOracle;
7use crate::transaction::mvcc::versionable::Versionable;
8use std::collections::BTreeMap;
9use std::sync::{Arc, RwLock};
10
/// Shared, lock-protected map from a key's raw bytes to that key's version
/// list, where each entry is a `(commit_ts, value)` pair.
type VersionStorage<T> = Arc<RwLock<BTreeMap<Vec<u8>, Vec<(u64, T)>>>>;
41
42#[derive(Clone)]
43pub struct VersionManager<T: Versionable> {
44 versions: VersionStorage<T>,
47
48 #[allow(dead_code)]
50 oracle: Option<Arc<TimestampOracle>>,
51}
52
53impl<T: Versionable> VersionManager<T> {
54 pub fn new(oracle: Arc<TimestampOracle>) -> Self {
60 Self {
61 versions: Arc::new(RwLock::new(BTreeMap::new())),
62 oracle: Some(oracle),
63 }
64 }
65
66 pub fn new_without_oracle() -> Self {
70 Self {
71 versions: Arc::new(RwLock::new(BTreeMap::new())),
72 oracle: None,
73 }
74 }
75
76 pub fn add_version(&self, key: Vec<u8>, value: T, commit_ts: u64) -> DbxResult<()> {
88 let mut versions = self
89 .versions
90 .write()
91 .map_err(|e| DbxError::Storage(format!("Lock error: {}", e)))?;
92
93 let version_list = versions.entry(key).or_insert_with(Vec::new);
94
95 let insert_pos = version_list
97 .binary_search_by(|(ts, _)| commit_ts.cmp(ts))
98 .unwrap_or_else(|pos| pos);
99
100 version_list.insert(insert_pos, (commit_ts, value));
101
102 Ok(())
103 }
104
105 pub fn get_at_snapshot(&self, key: &[u8], read_ts: u64) -> DbxResult<Option<T>> {
118 let versions = self
119 .versions
120 .read()
121 .map_err(|e| DbxError::Storage(format!("Lock error: {}", e)))?;
122
123 if let Some(version_list) = versions.get(key) {
124 for (commit_ts, value) in version_list {
127 if *commit_ts <= read_ts {
128 return Ok(Some(value.clone()));
129 }
130 }
131 }
132
133 Ok(None)
134 }
135
136 pub fn collect_garbage(&self, min_active_ts: u64) -> DbxResult<usize> {
149 let mut versions = self
150 .versions
151 .write()
152 .map_err(|e| DbxError::Storage(format!("Lock error: {}", e)))?;
153
154 let mut deleted_count = 0;
155
156 for (_key, version_list) in versions.iter_mut() {
157 if version_list.len() <= 1 {
158 continue;
160 }
161
162 let mut to_remove = Vec::new();
164
165 for (i, (commit_ts, _)) in version_list.iter().enumerate() {
166 if i == 0 {
168 continue;
169 }
170
171 if *commit_ts < min_active_ts {
173 to_remove.push(i);
174 }
175 }
176
177 for &idx in to_remove.iter().rev() {
179 version_list.remove(idx);
180 deleted_count += 1;
181 }
182 }
183
184 Ok(deleted_count)
185 }
186
187 pub fn version_count(&self, key: &[u8]) -> DbxResult<usize> {
189 let versions = self
190 .versions
191 .read()
192 .map_err(|e| DbxError::Storage(format!("Lock error: {}", e)))?;
193
194 Ok(versions.get(key).map(|v| v.len()).unwrap_or(0))
195 }
196
197 pub fn key_count(&self) -> DbxResult<usize> {
199 let versions = self
200 .versions
201 .read()
202 .map_err(|e| DbxError::Storage(format!("Lock error: {}", e)))?;
203
204 Ok(versions.len())
205 }
206
207 pub fn total_version_count(&self) -> DbxResult<usize> {
209 let versions = self
210 .versions
211 .read()
212 .map_err(|e| DbxError::Storage(format!("Lock error: {}", e)))?;
213
214 Ok(versions.values().map(|v| v.len()).sum())
215 }
216}
217
#[cfg(test)]
mod tests {
    use super::*;

    /// Versions added at 10/20/30 are each visible from their own commit
    /// timestamp onward, and nothing is visible before the first commit.
    #[test]
    fn test_version_manager_add_and_get() -> DbxResult<()> {
        let oracle = Arc::new(TimestampOracle::default());
        let manager = VersionManager::<String>::new(Arc::clone(&oracle));

        manager.add_version(b"user:1".to_vec(), "Alice v1".to_string(), 10)?;
        manager.add_version(b"user:1".to_vec(), "Alice v2".to_string(), 20)?;
        manager.add_version(b"user:1".to_vec(), "Alice v3".to_string(), 30)?;

        assert_eq!(manager.get_at_snapshot(b"user:1", 5)?, None);
        assert_eq!(
            manager.get_at_snapshot(b"user:1", 15)?,
            Some("Alice v1".to_string())
        );
        assert_eq!(
            manager.get_at_snapshot(b"user:1", 25)?,
            Some("Alice v2".to_string())
        );
        assert_eq!(
            manager.get_at_snapshot(b"user:1", 35)?,
            Some("Alice v3".to_string())
        );

        Ok(())
    }

    /// A read at a fixed snapshot timestamp keeps returning the same value
    /// even after a newer version has been committed.
    #[test]
    fn test_version_manager_snapshot_isolation() -> DbxResult<()> {
        let oracle = Arc::new(TimestampOracle::default());
        let manager = VersionManager::<Vec<u8>>::new(Arc::clone(&oracle));

        manager.add_version(b"key1".to_vec(), b"value1".to_vec(), 10)?;

        let snapshot_ts = 15;
        let value_at_15 = manager.get_at_snapshot(b"key1", snapshot_ts)?;
        assert_eq!(value_at_15, Some(b"value1".to_vec()));

        manager.add_version(b"key1".to_vec(), b"value2".to_vec(), 20)?;

        let value_at_15_again = manager.get_at_snapshot(b"key1", snapshot_ts)?;
        assert_eq!(value_at_15_again, Some(b"value1".to_vec()));

        let value_at_25 = manager.get_at_snapshot(b"key1", 25)?;
        assert_eq!(value_at_25, Some(b"value2".to_vec()));

        Ok(())
    }

    /// Collecting with a threshold of 25 removes the versions committed at
    /// 10 and 20 and leaves those committed at 30 and 40 readable.
    #[test]
    fn test_version_manager_garbage_collection() -> DbxResult<()> {
        let oracle = Arc::new(TimestampOracle::default());
        let manager = VersionManager::<String>::new(Arc::clone(&oracle));

        manager.add_version(b"key1".to_vec(), "v1".to_string(), 10)?;
        manager.add_version(b"key1".to_vec(), "v2".to_string(), 20)?;
        manager.add_version(b"key1".to_vec(), "v3".to_string(), 30)?;
        manager.add_version(b"key1".to_vec(), "v4".to_string(), 40)?;

        assert_eq!(manager.version_count(b"key1")?, 4);

        let deleted = manager.collect_garbage(25)?;
        assert_eq!(deleted, 2);

        assert_eq!(manager.version_count(b"key1")?, 2);

        assert_eq!(manager.get_at_snapshot(b"key1", 15)?, None);
        assert_eq!(
            manager.get_at_snapshot(b"key1", 35)?,
            Some("v3".to_string())
        );
        assert_eq!(
            manager.get_at_snapshot(b"key1", 45)?,
            Some("v4".to_string())
        );

        Ok(())
    }

    /// Version lists of different keys are independent of each other.
    #[test]
    fn test_version_manager_multiple_keys() -> DbxResult<()> {
        let manager = VersionManager::<String>::new_without_oracle();

        manager.add_version(b"user:1".to_vec(), "Alice".to_string(), 10)?;
        manager.add_version(b"user:2".to_vec(), "Bob".to_string(), 15)?;
        manager.add_version(b"user:1".to_vec(), "Alice Updated".to_string(), 20)?;

        assert_eq!(manager.key_count()?, 2);

        assert_eq!(manager.total_version_count()?, 3);

        assert_eq!(
            manager.get_at_snapshot(b"user:1", 12)?,
            Some("Alice".to_string())
        );
        assert_eq!(
            manager.get_at_snapshot(b"user:2", 18)?,
            Some("Bob".to_string())
        );
        assert_eq!(
            manager.get_at_snapshot(b"user:1", 25)?,
            Some("Alice Updated".to_string())
        );

        Ok(())
    }

    /// A key's only version survives collection even when it is older than
    /// the threshold.
    #[test]
    fn test_version_manager_gc_preserves_latest() -> DbxResult<()> {
        let manager = VersionManager::<Vec<u8>>::new_without_oracle();

        manager.add_version(b"key1".to_vec(), b"value1".to_vec(), 10)?;

        let deleted = manager.collect_garbage(100)?;

        assert_eq!(deleted, 0);
        assert_eq!(manager.version_count(b"key1")?, 1);

        Ok(())
    }

    /// Lookups and counts on a key that was never written report "nothing"
    /// rather than failing.
    #[test]
    fn test_version_manager_missing_key() -> DbxResult<()> {
        let manager = VersionManager::<String>::new_without_oracle();

        assert_eq!(manager.get_at_snapshot(b"absent", 100)?, None);
        assert_eq!(manager.version_count(b"absent")?, 0);
        assert_eq!(manager.key_count()?, 0);
        assert_eq!(manager.total_version_count()?, 0);
        assert_eq!(manager.collect_garbage(100)?, 0);

        Ok(())
    }
}