1use crate::Snapshot;
4use bytes::{Buf, BufMut};
5use featherdb_core::{Error, Result, TransactionId, Value};
6use std::collections::HashMap;
7use std::sync::atomic::{AtomicU64, Ordering};
8
9#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
11pub struct VersionPtr(pub u64);
12
13impl VersionPtr {
14 pub const NULL: VersionPtr = VersionPtr(0);
15
16 pub fn is_null(&self) -> bool {
17 self.0 == 0
18 }
19}
20
21#[derive(Debug, Clone)]
23pub struct VersionedValue {
24 pub current: Vec<Value>,
26 pub created_by: TransactionId,
28 pub deleted_by: Option<TransactionId>,
30 pub version_chain: Option<VersionPtr>,
32}
33
34impl VersionedValue {
35 pub fn new(data: Vec<Value>, created_by: TransactionId) -> Self {
37 VersionedValue {
38 current: data,
39 created_by,
40 deleted_by: None,
41 version_chain: None,
42 }
43 }
44
45 pub fn mark_deleted(&mut self, deleted_by: TransactionId) {
47 self.deleted_by = Some(deleted_by);
48 }
49
50 pub fn is_visible(&self, snapshot: &Snapshot) -> bool {
52 snapshot.is_visible(self.created_by, self.deleted_by)
53 }
54
55 pub fn serialize(&self) -> Vec<u8> {
57 let mut buf = Vec::new();
58
59 buf.put_u64_le(self.created_by.0);
61
62 match self.deleted_by {
64 Some(txn) => {
65 buf.put_u8(1);
66 buf.put_u64_le(txn.0);
67 }
68 None => {
69 buf.put_u8(0);
70 }
71 }
72
73 match self.version_chain {
75 Some(ptr) => {
76 buf.put_u8(1);
77 buf.put_u64_le(ptr.0);
78 }
79 None => {
80 buf.put_u8(0);
81 }
82 }
83
84 buf.put_u32_le(self.current.len() as u32);
86
87 for value in &self.current {
89 value.serialize(&mut buf);
90 }
91
92 buf
93 }
94
95 pub fn deserialize(data: &[u8]) -> Result<Self> {
97 let mut cursor = data;
98
99 if cursor.remaining() < 9 {
100 return Err(Error::Serialization("VersionedValue too small".into()));
101 }
102
103 let created_by = TransactionId(cursor.get_u64_le());
104
105 let deleted_by = if cursor.get_u8() == 1 {
106 Some(TransactionId(cursor.get_u64_le()))
107 } else {
108 None
109 };
110
111 let version_chain = if cursor.get_u8() == 1 {
112 Some(VersionPtr(cursor.get_u64_le()))
113 } else {
114 None
115 };
116
117 let value_count = cursor.get_u32_le() as usize;
118 let mut current = Vec::with_capacity(value_count);
119
120 for _ in 0..value_count {
121 current.push(Value::deserialize(&mut cursor)?);
122 }
123
124 Ok(VersionedValue {
125 current,
126 created_by,
127 deleted_by,
128 version_chain,
129 })
130 }
131}
132
133#[derive(Debug, Clone)]
135pub struct OldVersion {
136 pub data: Vec<Value>,
138 pub created_by: TransactionId,
140 pub deleted_by: Option<TransactionId>,
142 pub prev_version: Option<VersionPtr>,
144}
145
146impl OldVersion {
147 pub fn serialize(&self) -> Vec<u8> {
149 let mut buf = Vec::new();
150
151 buf.put_u64_le(self.created_by.0);
152
153 match self.deleted_by {
154 Some(txn) => {
155 buf.put_u8(1);
156 buf.put_u64_le(txn.0);
157 }
158 None => {
159 buf.put_u8(0);
160 }
161 }
162
163 match self.prev_version {
164 Some(ptr) => {
165 buf.put_u8(1);
166 buf.put_u64_le(ptr.0);
167 }
168 None => {
169 buf.put_u8(0);
170 }
171 }
172
173 buf.put_u32_le(self.data.len() as u32);
174 for value in &self.data {
175 value.serialize(&mut buf);
176 }
177
178 buf
179 }
180
181 pub fn deserialize(data: &[u8]) -> Result<Self> {
183 let mut cursor = data;
184
185 let created_by = TransactionId(cursor.get_u64_le());
186
187 let deleted_by = if cursor.get_u8() == 1 {
188 Some(TransactionId(cursor.get_u64_le()))
189 } else {
190 None
191 };
192
193 let prev_version = if cursor.get_u8() == 1 {
194 Some(VersionPtr(cursor.get_u64_le()))
195 } else {
196 None
197 };
198
199 let value_count = cursor.get_u32_le() as usize;
200 let mut data_values = Vec::with_capacity(value_count);
201
202 for _ in 0..value_count {
203 data_values.push(Value::deserialize(&mut cursor)?);
204 }
205
206 Ok(OldVersion {
207 data: data_values,
208 created_by,
209 deleted_by,
210 prev_version,
211 })
212 }
213}
214
215pub struct VersionStore {
217 versions: HashMap<VersionPtr, OldVersion>,
219 next_ptr: AtomicU64,
221}
222
223impl VersionStore {
224 pub fn new() -> Self {
226 VersionStore {
227 versions: HashMap::new(),
228 next_ptr: AtomicU64::new(1), }
230 }
231
232 pub fn insert(&mut self, version: OldVersion) -> VersionPtr {
234 let ptr = VersionPtr(self.next_ptr.fetch_add(1, Ordering::SeqCst));
235 self.versions.insert(ptr, version);
236 ptr
237 }
238
239 pub fn get(&self, ptr: VersionPtr) -> Option<&OldVersion> {
241 self.versions.get(&ptr)
242 }
243
244 pub fn remove(&mut self, ptr: VersionPtr) -> Option<OldVersion> {
246 self.versions.remove(&ptr)
247 }
248
249 pub fn len(&self) -> usize {
251 self.versions.len()
252 }
253
254 pub fn is_empty(&self) -> bool {
256 self.versions.is_empty()
257 }
258}
259
260impl Default for VersionStore {
261 fn default() -> Self {
262 Self::new()
263 }
264}
265
266#[derive(Debug, Clone, Default)]
268pub struct GcStats {
269 pub versions_removed: usize,
271 pub bytes_freed: usize,
273}
274
275impl VersionStore {
276 pub fn gc(&mut self, oldest_active_txn: Option<featherdb_core::TransactionId>) -> GcStats {
289 let mut stats = GcStats::default();
290
291 let oldest = match oldest_active_txn {
294 Some(txn) => txn.0,
295 None => u64::MAX, };
297
298 let to_remove: Vec<VersionPtr> = self
300 .versions
301 .iter()
302 .filter_map(|(ptr, version)| {
303 if let Some(deleted_by) = version.deleted_by {
307 if deleted_by.0 < oldest {
308 return Some(*ptr);
311 }
312 }
313 None
314 })
315 .collect();
316
317 for ptr in to_remove {
319 if let Some(version) = self.versions.remove(&ptr) {
320 stats.versions_removed += 1;
321 stats.bytes_freed += version
323 .data
324 .iter()
325 .map(|v| v.size_estimate())
326 .sum::<usize>();
327 }
328 }
329
330 stats
331 }
332
333 pub fn pointers(&self) -> impl Iterator<Item = &VersionPtr> {
335 self.versions.keys()
336 }
337}
338
339pub struct VersionChain<'a> {
341 version_store: &'a VersionStore,
342 current: Option<VersionPtr>,
343}
344
345impl<'a> VersionChain<'a> {
346 pub fn new(version_store: &'a VersionStore, start: Option<VersionPtr>) -> Self {
348 VersionChain {
349 version_store,
350 current: start,
351 }
352 }
353
354 pub fn find_visible(&mut self, snapshot: &Snapshot) -> Option<&'a OldVersion> {
356 while let Some(ptr) = self.current {
357 if let Some(version) = self.version_store.get(ptr) {
358 if snapshot.is_visible(version.created_by, version.deleted_by) {
359 return Some(version);
360 }
361 self.current = version.prev_version;
362 } else {
363 break;
364 }
365 }
366 None
367 }
368}
369
370impl<'a> Iterator for VersionChain<'a> {
371 type Item = &'a OldVersion;
372
373 fn next(&mut self) -> Option<Self::Item> {
374 let ptr = self.current?;
375 let version = self.version_store.get(ptr)?;
376 self.current = version.prev_version;
377 Some(version)
378 }
379}
380
381#[cfg(test)]
382mod tests {
383 use super::*;
384
385 #[test]
386 fn test_versioned_value_serialization() {
387 let value = VersionedValue {
388 current: vec![Value::Integer(42), Value::Text("hello".into())],
389 created_by: TransactionId(10),
390 deleted_by: Some(TransactionId(20)),
391 version_chain: Some(VersionPtr(5)),
392 };
393
394 let serialized = value.serialize();
395 let deserialized = VersionedValue::deserialize(&serialized).unwrap();
396
397 assert_eq!(value.created_by, deserialized.created_by);
398 assert_eq!(value.deleted_by, deserialized.deleted_by);
399 assert_eq!(value.version_chain, deserialized.version_chain);
400 assert_eq!(value.current.len(), deserialized.current.len());
401 }
402
403 #[test]
404 fn test_version_store() {
405 let mut store = VersionStore::new();
406
407 let v1 = OldVersion {
408 data: vec![Value::Integer(1)],
409 created_by: TransactionId(1),
410 deleted_by: None,
411 prev_version: None,
412 };
413
414 let ptr1 = store.insert(v1);
415 assert!(!ptr1.is_null());
416 assert!(store.get(ptr1).is_some());
417 }
418
419 #[test]
420 fn test_version_chain() {
421 let mut store = VersionStore::new();
422
423 let v1 = OldVersion {
425 data: vec![Value::Integer(1)],
426 created_by: TransactionId(1),
427 deleted_by: Some(TransactionId(2)),
428 prev_version: None,
429 };
430 let ptr1 = store.insert(v1);
431
432 let v2 = OldVersion {
433 data: vec![Value::Integer(2)],
434 created_by: TransactionId(2),
435 deleted_by: Some(TransactionId(3)),
436 prev_version: Some(ptr1),
437 };
438 let ptr2 = store.insert(v2);
439
440 let chain: Vec<_> = VersionChain::new(&store, Some(ptr2)).collect();
442 assert_eq!(chain.len(), 2);
443 assert_eq!(chain[0].data[0], Value::Integer(2));
444 assert_eq!(chain[1].data[0], Value::Integer(1));
445 }
446
447 #[test]
448 fn test_gc_removes_old_deleted_versions() {
449 let mut store = VersionStore::new();
450
451 let v1 = OldVersion {
453 data: vec![Value::Integer(100)],
454 created_by: TransactionId(1),
455 deleted_by: Some(TransactionId(5)),
456 prev_version: None,
457 };
458 store.insert(v1);
459
460 let v2 = OldVersion {
462 data: vec![Value::Integer(200)],
463 created_by: TransactionId(2),
464 deleted_by: Some(TransactionId(10)),
465 prev_version: None,
466 };
467 store.insert(v2);
468
469 let v3 = OldVersion {
471 data: vec![Value::Integer(300)],
472 created_by: TransactionId(3),
473 deleted_by: None,
474 prev_version: None,
475 };
476 store.insert(v3);
477
478 assert_eq!(store.len(), 3);
479
480 let stats = store.gc(Some(TransactionId(7)));
485
486 assert_eq!(stats.versions_removed, 1);
487 assert_eq!(store.len(), 2);
488 }
489
490 #[test]
491 fn test_gc_with_no_active_transactions() {
492 let mut store = VersionStore::new();
493
494 let v1 = OldVersion {
496 data: vec![Value::Integer(100)],
497 created_by: TransactionId(1),
498 deleted_by: Some(TransactionId(5)),
499 prev_version: None,
500 };
501 store.insert(v1);
502
503 let v2 = OldVersion {
504 data: vec![Value::Integer(200)],
505 created_by: TransactionId(2),
506 deleted_by: Some(TransactionId(10)),
507 prev_version: None,
508 };
509 store.insert(v2);
510
511 let v3 = OldVersion {
513 data: vec![Value::Integer(300)],
514 created_by: TransactionId(3),
515 deleted_by: None,
516 prev_version: None,
517 };
518 store.insert(v3);
519
520 assert_eq!(store.len(), 3);
521
522 let stats = store.gc(None);
525
526 assert_eq!(stats.versions_removed, 2);
527 assert_eq!(store.len(), 1); }
529
530 #[test]
531 fn test_gc_bytes_freed() {
532 let mut store = VersionStore::new();
533
534 let v1 = OldVersion {
536 data: vec![Value::Text("hello world".to_string())],
537 created_by: TransactionId(1),
538 deleted_by: Some(TransactionId(5)),
539 prev_version: None,
540 };
541 store.insert(v1);
542
543 let stats = store.gc(Some(TransactionId(10)));
544
545 assert_eq!(stats.versions_removed, 1);
546 assert!(stats.bytes_freed > 0);
547 }
548}