start/db/
recovery_unit.rs

1use std::{cell::RefCell, rc::Rc};
2
3use log::trace;
4
5use super::storage::start_storage::StartStorage;
6
7/// Write operation, contains information:
8/// 
9/// Offset to certain place in physical db
10/// 
11/// Old-data for rollback
12/// 
13/// New data to write for commit
14pub struct WriteOp {
15    offset: usize,
16    new_data: Vec<u8>,
17    old_data: Vec<u8>,
18}
19
20/// RecoveryUnit is core concept of database
21/// 
22/// It provides utils for `Atomic transactions`
23/// 
24/// Stores changes that user written inside
25///
26/// Commit it to storage on commit operation,
27/// 
28/// or rollback it
29pub struct RecoveryUnit {
30    storage: Rc<RefCell<StartStorage>>,
31    pending_ops: Vec<WriteOp>,
32    committed: bool
33}
34
35impl RecoveryUnit {
36    pub fn new(storage: Rc<RefCell<StartStorage>>) -> Self {
37        Self {
38            storage,
39            pending_ops: vec![],
40            committed: false,
41        }
42    }
43
44    /// Operation to add pending operation, that might be written on storage on commit
45    pub fn write(&mut self, offset: usize, data: &[u8]) {
46        let old = self.storage.borrow()[offset..offset+data.len()].to_vec();
47
48        self.pending_ops.push(WriteOp {
49            offset,
50            new_data: data.to_vec(),
51            old_data: old,
52        });
53    }
54
55    /// Apply all actual pending operations
56    pub fn commit(&mut self) {
57        let mut ss = self.storage.borrow_mut();
58        for op in self.pending_ops.iter() {
59            trace!("Commiting op");
60            trace!("{}: '{:?}' to '{:?}'", op.offset, op.old_data, op.new_data);
61            ss[op.offset..op.offset+op.new_data.len()].copy_from_slice(&op.new_data);
62        }
63        self.committed = true;
64    }
65
66    /// Clear pending operations
67    pub fn rollback(&mut self) {
68        self.pending_ops.clear();
69    }
70
71    /// Gives actual content of database with apply of recovery unit context
72    pub fn effective_view(&self, offset: usize, len: usize) -> Vec<u8> {
73        // Start with the base data from the storage
74        let mut result = self.storage.borrow()[offset..offset + len].to_vec();
75
76        // Apply all pending writes that affect this range
77        for op in &self.pending_ops {
78            let op_start = op.offset;
79            let op_end = op.offset + op.new_data.len();
80            let view_start = offset;
81            let view_end = offset + len;
82
83            // Find overlap
84            if op_end > view_start && op_start < view_end {
85                let overlap_start = op_start.max(view_start);
86                let overlap_end = op_end.min(view_end);
87                let result_start = overlap_start - view_start;
88                let op_data_start = overlap_start - op_start;
89
90                let count = overlap_end - overlap_start;
91                result[result_start..result_start + count]
92                    .copy_from_slice(&op.new_data[op_data_start..op_data_start + count]);
93            }
94        }
95
96        result
97    }
98
99    pub fn is_committed(&self) -> bool {
100        self.committed
101    }
102}
103
104/// Like RAII in Mongo DB
105impl Drop for RecoveryUnit {
106    fn drop(&mut self) {
107        if !self.committed {
108            self.rollback();
109        }
110    }
111}
112
113#[test]
114fn test_atomic_commit_and_rollback() {
115    use std::rc::Rc;
116    use std::cell::RefCell;
117
118    // Set up initial storage
119    let storage = Rc::new(RefCell::new(StartStorage::in_memory()));
120    storage.borrow_mut().resize(16).unwrap();
121    {
122        let mut s = storage.borrow_mut();
123        s[0..4].copy_from_slice(&[1, 2, 3, 4]);
124        s[4..8].copy_from_slice(&[5, 6, 7, 8]);
125    }
126
127    // Simulate commit
128    {
129        let mut ru = RecoveryUnit::new(storage.clone());
130        ru.write(0, &[10, 11, 12, 13]);
131        ru.write(4, &[20, 21, 22, 23]);
132        ru.commit();
133        assert!(ru.is_committed());
134    }
135
136    {
137        let s = storage.borrow();
138        assert_eq!(&s[0..4], &[10, 11, 12, 13]);
139        assert_eq!(&s[4..8], &[20, 21, 22, 23]);
140    }
141
142    // Simulate rollback
143    {
144        let mut ru = RecoveryUnit::new(storage.clone());
145        ru.write(0, &[100, 101, 102, 103]);
146        ru.write(4, &[200, 201, 202, 203]);
147        assert!(!ru.is_committed()); // hasn't been committed yet
148        // ru drops without commit => triggers rollback
149    }
150
151    {
152        let s = storage.borrow();
153        assert_eq!(&s[0..4], &[10, 11, 12, 13]);
154        assert_eq!(&s[4..8], &[20, 21, 22, 23]);
155    }
156}