// reddb_server/storage/btree/version.rs
pub use crate::storage::primitives::ids::{current_timestamp, next_timestamp, Timestamp, TxnId};
11
/// Result of testing one [`Version`] against a [`Snapshot`]
/// (see `Version::check_visibility`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum VersionVisibility {
    /// The snapshot should read this version.
    Visible,
    /// The creating transaction is not committed from the snapshot's point of view.
    Uncommitted,
    /// The version counts as deleted for the snapshot.
    Deleted,
    /// The version was created after the snapshot's start timestamp.
    Future,
}
24
25#[derive(Debug, Clone)]
27pub struct Version<V: Clone> {
28 pub created_by: TxnId,
30 pub created_at: Timestamp,
32 pub deleted_by: TxnId,
34 pub deleted_at: Timestamp,
36 pub value: Option<V>,
38 pub prev: Option<Box<Version<V>>>,
40}
41
42impl<V: Clone> Version<V> {
43 pub fn new(value: V, txn_id: TxnId, timestamp: Timestamp) -> Self {
45 Self {
46 created_by: txn_id,
47 created_at: timestamp,
48 deleted_by: TxnId::ZERO,
49 deleted_at: Timestamp::EPOCH,
50 value: Some(value),
51 prev: None,
52 }
53 }
54
55 pub fn tombstone(txn_id: TxnId, timestamp: Timestamp) -> Self {
57 Self {
58 created_by: txn_id,
59 created_at: timestamp,
60 deleted_by: TxnId::ZERO,
61 deleted_at: Timestamp::EPOCH,
62 value: None,
63 prev: None,
64 }
65 }
66
67 pub fn is_tombstone(&self) -> bool {
69 self.value.is_none()
70 }
71
72 pub fn is_deleted(&self) -> bool {
74 !self.deleted_by.is_zero()
75 }
76
77 pub fn mark_deleted(&mut self, txn_id: TxnId, timestamp: Timestamp) {
79 self.deleted_by = txn_id;
80 self.deleted_at = timestamp;
81 }
82
83 pub fn check_visibility(&self, snapshot: &Snapshot) -> VersionVisibility {
85 if !self.created_by.is_zero() && !snapshot.is_committed(self.created_by) {
87 if self.created_by == snapshot.txn_id {
88 if self.is_deleted() && self.deleted_by == snapshot.txn_id {
90 return VersionVisibility::Deleted;
91 }
92 return VersionVisibility::Visible;
93 }
94 return VersionVisibility::Uncommitted;
95 }
96
97 if self.created_at > snapshot.start_ts {
99 return VersionVisibility::Future;
100 }
101
102 if self.is_deleted()
104 && snapshot.is_committed(self.deleted_by)
105 && self.deleted_at <= snapshot.start_ts
106 {
107 return VersionVisibility::Deleted;
108 }
109
110 VersionVisibility::Visible
112 }
113}
114
115#[derive(Debug, Clone)]
117pub struct VersionChain<V: Clone> {
118 head: Option<Box<Version<V>>>,
120 version_count: usize,
122 oldest_ts: Timestamp,
124}
125
126impl<V: Clone> VersionChain<V> {
127 pub fn new() -> Self {
129 Self {
130 head: None,
131 version_count: 0,
132 oldest_ts: Timestamp::EPOCH,
133 }
134 }
135
136 pub fn with_value(value: V, txn_id: TxnId, timestamp: Timestamp) -> Self {
138 Self {
139 head: Some(Box::new(Version::new(value, txn_id, timestamp))),
140 version_count: 1,
141 oldest_ts: timestamp,
142 }
143 }
144
145 pub fn is_empty(&self) -> bool {
147 self.head.is_none()
148 }
149
150 pub fn len(&self) -> usize {
152 self.version_count
153 }
154
155 pub fn get(&self, snapshot: &Snapshot) -> Option<&V> {
157 let mut current = self.head.as_ref();
158
159 while let Some(version) = current {
160 match version.check_visibility(snapshot) {
161 VersionVisibility::Visible => {
162 return version.value.as_ref();
163 }
164 VersionVisibility::Deleted => {
165 return None;
166 }
167 VersionVisibility::Uncommitted | VersionVisibility::Future => {
168 current = version.prev.as_ref();
170 }
171 }
172 }
173
174 None
175 }
176
177 pub fn insert(&mut self, value: V, txn_id: TxnId, timestamp: Timestamp) {
179 let mut new_version = Box::new(Version::new(value, txn_id, timestamp));
180 new_version.prev = self.head.take();
181 self.head = Some(new_version);
182 self.version_count += 1;
183
184 if self.oldest_ts.is_epoch() {
185 self.oldest_ts = timestamp;
186 }
187 }
188
189 pub fn update(&mut self, value: V, txn_id: TxnId, timestamp: Timestamp) {
191 self.insert(value, txn_id, timestamp);
192 }
193
194 pub fn delete(&mut self, txn_id: TxnId, timestamp: Timestamp) {
196 let mut tombstone = Box::new(Version::tombstone(txn_id, timestamp));
197 tombstone.prev = self.head.take();
198 self.head = Some(tombstone);
199 self.version_count += 1;
200 }
201
202 pub fn head(&self) -> Option<&Version<V>> {
204 self.head.as_ref().map(|v| v.as_ref())
205 }
206
207 pub fn head_mut(&mut self) -> Option<&mut Version<V>> {
209 self.head.as_mut().map(|v| v.as_mut())
210 }
211
212 pub fn gc(&mut self, watermark: Timestamp) -> usize {
214 let mut removed = 0;
215
216 let mut current = &mut self.head;
218 let mut found_visible = false;
219
220 while let Some(version) = current {
221 if version.created_at <= watermark {
223 if found_visible {
224 if let Some(prev) = version.prev.take() {
226 removed += 1 + self.count_chain(&prev);
227 }
228 break;
229 }
230 found_visible = true;
231 }
232 current = &mut version.prev;
233 }
234
235 self.version_count -= removed;
236 removed
237 }
238
239 fn count_chain(&self, version: &Version<V>) -> usize {
241 let mut count = 1;
242 let mut current = version.prev.as_ref();
243 while let Some(v) = current {
244 count += 1;
245 current = v.prev.as_ref();
246 }
247 count
248 }
249
250 pub fn is_all_deleted(&self) -> bool {
252 let mut current = self.head.as_ref();
253 while let Some(version) = current {
254 if !version.is_tombstone() {
255 return false;
256 }
257 current = version.prev.as_ref();
258 }
259 true
260 }
261
262 pub fn oldest_timestamp(&self) -> Timestamp {
264 self.oldest_ts
265 }
266}
267
268impl<V: Clone> Default for VersionChain<V> {
269 fn default() -> Self {
270 Self::new()
271 }
272}
273
274#[derive(Debug, Clone)]
276pub struct Snapshot {
277 pub txn_id: TxnId,
279 pub start_ts: Timestamp,
281 active_txns: Vec<TxnId>,
283 committed_txns: Vec<TxnId>,
285}
286
287impl Snapshot {
288 pub fn new(txn_id: TxnId, start_ts: Timestamp) -> Self {
290 Self {
291 txn_id,
292 start_ts,
293 active_txns: Vec::new(),
294 committed_txns: Vec::new(),
295 }
296 }
297
298 pub fn with_active(txn_id: TxnId, start_ts: Timestamp, active: Vec<TxnId>) -> Self {
300 Self {
301 txn_id,
302 start_ts,
303 active_txns: active,
304 committed_txns: Vec::new(),
305 }
306 }
307
308 pub fn add_committed(&mut self, txn_id: TxnId) {
310 if !self.committed_txns.contains(&txn_id) {
311 self.committed_txns.push(txn_id);
312 }
313 }
314
315 pub fn is_committed(&self, txn_id: TxnId) -> bool {
317 if txn_id.is_zero() {
319 return true;
320 }
321
322 if self.active_txns.contains(&txn_id) {
324 return false;
325 }
326
327 if self.committed_txns.contains(&txn_id) {
329 return true;
330 }
331
332 true
335 }
336
337 pub fn is_active(&self, txn_id: TxnId) -> bool {
339 self.active_txns.contains(&txn_id)
340 }
341}
342
/// Lifecycle state of a transaction.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TxnState {
    /// The transaction has neither committed nor aborted.
    Active,
    /// The transaction has committed.
    Committed,
    /// The transaction has aborted.
    Aborted,
}
353
354#[derive(Debug)]
356pub struct ActiveTransaction {
357 pub id: TxnId,
359 pub start_ts: Timestamp,
361 pub state: TxnState,
363 pub snapshot: Snapshot,
365 write_set: Vec<Vec<u8>>,
367 read_set: Vec<Vec<u8>>,
369}
370
371impl ActiveTransaction {
372 pub fn new(id: TxnId, active_txns: Vec<TxnId>) -> Self {
374 let start_ts = next_timestamp();
375 Self {
376 id,
377 start_ts,
378 state: TxnState::Active,
379 snapshot: Snapshot::with_active(id, start_ts, active_txns),
380 write_set: Vec::new(),
381 read_set: Vec::new(),
382 }
383 }
384
385 pub fn record_read(&mut self, key: &[u8]) {
387 if !self.read_set.iter().any(|k| k == key) {
388 self.read_set.push(key.to_vec());
389 }
390 }
391
392 pub fn record_write(&mut self, key: &[u8]) {
394 if !self.write_set.iter().any(|k| k == key) {
395 self.write_set.push(key.to_vec());
396 }
397 }
398
399 pub fn write_set(&self) -> &[Vec<u8>] {
401 &self.write_set
402 }
403
404 pub fn read_set(&self) -> &[Vec<u8>] {
406 &self.read_set
407 }
408
409 pub fn commit(&mut self) {
411 self.state = TxnState::Committed;
412 }
413
414 pub fn abort(&mut self) {
416 self.state = TxnState::Aborted;
417 }
418
419 pub fn is_active(&self) -> bool {
421 self.state == TxnState::Active
422 }
423}
424
#[cfg(test)]
mod tests {
    use super::*;

    /// Inserting and updating each add one version to the chain.
    #[test]
    fn test_version_chain_basic() {
        let mut chain: VersionChain<String> = VersionChain::new();
        assert!(chain.is_empty());

        chain.insert("v1".to_string(), TxnId(1), Timestamp(1));
        assert_eq!(chain.len(), 1);

        chain.update("v2".to_string(), TxnId(2), Timestamp(2));
        assert_eq!(chain.len(), 2);
    }

    /// A snapshot reads the newest version created at or before its start.
    #[test]
    fn test_version_visibility() {
        let mut chain: VersionChain<String> = VersionChain::new();

        chain.insert("v1".to_string(), TxnId(1), Timestamp(1));
        chain.update("v2".to_string(), TxnId(2), Timestamp(2));

        // Taken at timestamp 1: v2 is in the future, so v1 is read.
        let snap1 = Snapshot::new(TxnId(3), Timestamp(1));
        assert_eq!(chain.get(&snap1), Some(&"v1".to_string()));

        // Taken at timestamp 2: v2 is the newest readable version.
        let snap2 = Snapshot::new(TxnId(3), Timestamp(2));
        assert_eq!(chain.get(&snap2), Some(&"v2".to_string()));
    }

    /// A tombstone at the head hides the value from later snapshots.
    #[test]
    fn test_version_delete() {
        let mut chain: VersionChain<String> = VersionChain::new();

        chain.insert("v1".to_string(), TxnId(1), Timestamp(1));
        chain.delete(TxnId(2), Timestamp(2));

        let snap = Snapshot::new(TxnId(3), Timestamp(2));
        assert!(chain.get(&snap).is_none());
    }

    /// Garbage collection below a watermark shrinks the chain.
    #[test]
    fn test_version_gc() {
        let mut chain: VersionChain<String> = VersionChain::new();

        chain.insert("v1".to_string(), TxnId(1), Timestamp(1));
        chain.update("v2".to_string(), TxnId(2), Timestamp(2));
        chain.update("v3".to_string(), TxnId(3), Timestamp(3));
        chain.update("v4".to_string(), TxnId(4), Timestamp(4));

        assert_eq!(chain.len(), 4);

        let removed = chain.gc(Timestamp(3));
        assert!(removed > 0);
        assert!(chain.len() < 4);
    }

    /// With no active list, every transaction id counts as committed.
    #[test]
    fn test_snapshot() {
        let snap = Snapshot::new(TxnId(5), Timestamp(10));

        assert!(snap.is_committed(TxnId::ZERO));
        assert!(snap.is_committed(TxnId(3)));
    }

    /// Ids in the active list are uncommitted; all others are committed.
    #[test]
    fn test_snapshot_with_active() {
        let snap = Snapshot::with_active(TxnId(5), Timestamp(10), vec![TxnId(3), TxnId(4)]);

        assert!(!snap.is_committed(TxnId(3)));
        assert!(!snap.is_committed(TxnId(4)));

        assert!(snap.is_committed(TxnId(1)));
        assert!(snap.is_committed(TxnId(2)));
    }

    /// Read/write tracking and the commit state transition.
    #[test]
    fn test_active_transaction() {
        let mut txn = ActiveTransaction::new(TxnId(1), vec![]);

        assert!(txn.is_active());

        txn.record_read(b"key1");
        txn.record_write(b"key2");

        assert_eq!(txn.read_set().len(), 1);
        assert_eq!(txn.write_set().len(), 1);

        txn.commit();
        assert!(!txn.is_active());
        assert_eq!(txn.state, TxnState::Committed);
    }

    /// Successive timestamps are strictly increasing.
    #[test]
    fn test_timestamp_generation() {
        let ts1 = next_timestamp();
        let ts2 = next_timestamp();
        let ts3 = next_timestamp();

        assert!(ts2 > ts1);
        assert!(ts3 > ts2);
    }
}