1use crate::{Error, Result};
7use std::collections::{BTreeMap, HashMap};
8use std::sync::atomic::{AtomicU64, Ordering};
9use std::sync::{Arc, RwLock};
10use std::time::{SystemTime, UNIX_EPOCH};
11
/// Isolation level requested by a caller when a transaction begins.
///
/// `RepeatableRead` is the default. Within this file the level is only
/// recorded (on the transaction handle and in the manager's active-transaction
/// table); reads and writes do not branch on it.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum IsolationLevel {
    ReadUncommitted,
    ReadCommitted,
    #[default]
    RepeatableRead,
    Serializable,
}
25
/// Identifier assigned to each transaction.
pub type TransactionId = u64;

/// Logical timestamp used for snapshots and version creation/deletion times.
pub type Timestamp = u64;
31
32#[derive(Debug, Clone)]
34pub struct VersionedValue {
35 pub value: Option<Vec<u8>>,
37 pub txn_id: TransactionId,
39 pub created_at: Timestamp,
41 pub deleted_at: Option<Timestamp>,
43 pub committed: bool,
45}
46
47impl VersionedValue {
48 pub fn is_visible(&self, snapshot_ts: Timestamp, current_txn_id: TransactionId) -> bool {
50 if !self.committed && self.txn_id != current_txn_id {
52 return false;
53 }
54
55 if self.created_at > snapshot_ts && self.txn_id != current_txn_id {
57 return false;
58 }
59
60 if let Some(deleted_ts) = self.deleted_at {
62 if deleted_ts <= snapshot_ts {
63 return false;
64 }
65 }
66
67 self.value.is_some()
69 }
70}
71
72#[derive(Debug, Clone)]
74pub struct VersionChain {
75 versions: Vec<VersionedValue>,
77}
78
79impl VersionChain {
80 pub fn new() -> Self {
82 Self {
83 versions: Vec::new(),
84 }
85 }
86
87 pub fn add_version(&mut self, version: VersionedValue) {
89 self.versions.insert(0, version);
90 }
91
92 pub fn get_visible(
94 &self,
95 snapshot_ts: Timestamp,
96 current_txn_id: TransactionId,
97 ) -> Option<Vec<u8>> {
98 for version in &self.versions {
99 if version.is_visible(snapshot_ts, current_txn_id) {
100 return version.value.clone();
101 }
102 }
103 None
104 }
105
106 pub fn commit_transaction(&mut self, txn_id: TransactionId) {
108 for version in &mut self.versions {
109 if version.txn_id == txn_id {
110 version.committed = true;
111 }
112 }
113 }
114
115 pub fn rollback_transaction(&mut self, txn_id: TransactionId) {
117 self.versions.retain(|v| v.txn_id != txn_id);
118 }
119
120 pub fn gc(&mut self, min_active_ts: Timestamp) {
122 let mut found_visible = false;
124 self.versions.retain(|v| {
125 if found_visible && v.committed && v.created_at < min_active_ts {
126 false
127 } else {
128 if v.committed && v.created_at <= min_active_ts {
129 found_visible = true;
130 }
131 true
132 }
133 });
134 }
135}
136
137impl Default for VersionChain {
138 fn default() -> Self {
139 Self::new()
140 }
141}
142
143pub struct MVCCStorage {
145 data: RwLock<HashMap<Vec<u8>, VersionChain>>,
147}
148
149impl MVCCStorage {
150 pub fn new() -> Self {
152 Self {
153 data: RwLock::new(HashMap::new()),
154 }
155 }
156
157 pub fn read(
159 &self,
160 key: &[u8],
161 snapshot_ts: Timestamp,
162 txn_id: TransactionId,
163 ) -> Result<Option<Vec<u8>>> {
164 let data = self.data.read().map_err(|_| Error::LockPoisoned)?;
165
166 if let Some(chain) = data.get(key) {
167 Ok(chain.get_visible(snapshot_ts, txn_id))
168 } else {
169 Ok(None)
170 }
171 }
172
173 pub fn write(
175 &self,
176 key: Vec<u8>,
177 value: Vec<u8>,
178 txn_id: TransactionId,
179 timestamp: Timestamp,
180 ) -> Result<()> {
181 let mut data = self.data.write().map_err(|_| Error::LockPoisoned)?;
182
183 let chain = data.entry(key).or_insert_with(VersionChain::new);
184
185 chain.add_version(VersionedValue {
186 value: Some(value),
187 txn_id,
188 created_at: timestamp,
189 deleted_at: None,
190 committed: false,
191 });
192
193 Ok(())
194 }
195
196 pub fn delete(&self, key: &[u8], txn_id: TransactionId, timestamp: Timestamp) -> Result<()> {
198 let mut data = self.data.write().map_err(|_| Error::LockPoisoned)?;
199
200 let chain = data.entry(key.to_vec()).or_insert_with(VersionChain::new);
201
202 if let Some(prev) = chain.versions.first_mut() {
204 if prev.txn_id != txn_id {
205 prev.deleted_at = Some(timestamp);
206 }
207 }
208
209 chain.add_version(VersionedValue {
211 value: None,
212 txn_id,
213 created_at: timestamp,
214 deleted_at: None,
215 committed: false,
216 });
217
218 Ok(())
219 }
220
221 pub fn commit(&self, txn_id: TransactionId) -> Result<()> {
223 let mut data = self.data.write().map_err(|_| Error::LockPoisoned)?;
224
225 for chain in data.values_mut() {
226 chain.commit_transaction(txn_id);
227 }
228
229 Ok(())
230 }
231
232 pub fn rollback(&self, txn_id: TransactionId) -> Result<()> {
234 let mut data = self.data.write().map_err(|_| Error::LockPoisoned)?;
235
236 for chain in data.values_mut() {
237 chain.rollback_transaction(txn_id);
238 }
239
240 data.retain(|_, chain| !chain.versions.is_empty());
242
243 Ok(())
244 }
245
246 pub fn gc(&self, min_active_ts: Timestamp) -> Result<()> {
248 let mut data = self.data.write().map_err(|_| Error::LockPoisoned)?;
249
250 for chain in data.values_mut() {
251 chain.gc(min_active_ts);
252 }
253
254 Ok(())
255 }
256
257 pub fn scan_prefix(
259 &self,
260 prefix: &[u8],
261 snapshot_ts: Timestamp,
262 txn_id: TransactionId,
263 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
264 let data = self.data.read().map_err(|_| Error::LockPoisoned)?;
265
266 let mut results = Vec::new();
267 for (key, chain) in data.iter() {
268 if key.starts_with(prefix) {
269 if let Some(value) = chain.get_visible(snapshot_ts, txn_id) {
270 results.push((key.clone(), value));
271 }
272 }
273 }
274
275 results.sort_by(|a, b| a.0.cmp(&b.0));
276 Ok(results)
277 }
278}
279
280impl Default for MVCCStorage {
281 fn default() -> Self {
282 Self::new()
283 }
284}
285
286#[derive(Debug, Clone)]
288#[allow(dead_code)]
289struct ActiveTransaction {
290 txn_id: TransactionId,
291 snapshot_ts: Timestamp,
292 isolation: IsolationLevel,
293}
294
295pub struct TransactionManager {
297 next_txn_id: AtomicU64,
299 next_timestamp: AtomicU64,
301 active_txns: RwLock<BTreeMap<TransactionId, ActiveTransaction>>,
303 storage: Arc<MVCCStorage>,
305 self_ref: RwLock<Option<std::sync::Weak<TransactionManager>>>,
307}
308
309impl TransactionManager {
310 pub fn new(storage: Arc<MVCCStorage>) -> Arc<Self> {
312 let manager = Arc::new(Self {
313 next_txn_id: AtomicU64::new(1),
314 next_timestamp: AtomicU64::new(Self::current_timestamp()),
315 active_txns: RwLock::new(BTreeMap::new()),
316 storage,
317 self_ref: RwLock::new(None),
318 });
319
320 *manager.self_ref.write().unwrap() = Some(Arc::downgrade(&manager));
322 manager
323 }
324
325 fn current_timestamp() -> u64 {
327 SystemTime::now()
328 .duration_since(UNIX_EPOCH)
329 .unwrap()
330 .as_millis() as u64
331 }
332
333 fn next_txn_id(&self) -> TransactionId {
335 self.next_txn_id.fetch_add(1, Ordering::SeqCst)
336 }
337
338 fn next_timestamp(&self) -> Timestamp {
340 self.next_timestamp.fetch_add(1, Ordering::SeqCst)
341 }
342
343 pub fn begin(self: &Arc<Self>, isolation: IsolationLevel) -> Result<Transaction> {
345 let txn_id = self.next_txn_id();
346 let snapshot_ts = self.next_timestamp();
347
348 let active_txn = ActiveTransaction {
349 txn_id,
350 snapshot_ts,
351 isolation,
352 };
353
354 {
355 let mut active = self.active_txns.write().map_err(|_| Error::LockPoisoned)?;
356 active.insert(txn_id, active_txn.clone());
357 }
358
359 Ok(Transaction {
360 txn_id,
361 snapshot_ts,
362 isolation,
363 storage: Arc::clone(&self.storage),
364 manager: Some(Arc::clone(self)),
365 write_set: RwLock::new(HashMap::new()),
366 committed: false,
367 })
368 }
369
370 pub fn commit(&self, txn_id: TransactionId) -> Result<()> {
372 self.storage.commit(txn_id)?;
377
378 {
380 let mut active = self.active_txns.write().map_err(|_| Error::LockPoisoned)?;
381 active.remove(&txn_id);
382 }
383
384 Ok(())
385 }
386
387 pub fn rollback(&self, txn_id: TransactionId) -> Result<()> {
389 self.storage.rollback(txn_id)?;
391
392 {
394 let mut active = self.active_txns.write().map_err(|_| Error::LockPoisoned)?;
395 active.remove(&txn_id);
396 }
397
398 Ok(())
399 }
400
401 pub fn gc(&self) -> Result<()> {
403 let min_active_ts = {
405 let active = self.active_txns.read().map_err(|_| Error::LockPoisoned)?;
406 active
407 .values()
408 .map(|txn| txn.snapshot_ts)
409 .min()
410 .unwrap_or(self.next_timestamp())
411 };
412
413 self.storage.gc(min_active_ts)
414 }
415}
416
417pub struct Transaction {
419 pub txn_id: TransactionId,
421 snapshot_ts: Timestamp,
423 isolation: IsolationLevel,
425 storage: Arc<MVCCStorage>,
427 manager: Option<Arc<TransactionManager>>,
429 write_set: RwLock<HashMap<Vec<u8>, Vec<u8>>>,
431 committed: bool,
433}
434
435impl Transaction {
436 pub fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
438 {
440 let write_set = self.write_set.read().map_err(|_| Error::LockPoisoned)?;
441 if let Some(value) = write_set.get(key) {
442 return Ok(Some(value.clone()));
443 }
444 }
445
446 self.storage.read(key, self.snapshot_ts, self.txn_id)
448 }
449
450 pub fn put(&mut self, key: Vec<u8>, value: Vec<u8>) -> Result<()> {
452 {
454 let mut write_set = self.write_set.write().map_err(|_| Error::LockPoisoned)?;
455 write_set.insert(key.clone(), value.clone());
456 }
457
458 self.storage
460 .write(key, value, self.txn_id, self.snapshot_ts)
461 }
462
463 pub fn delete(&mut self, key: &[u8]) -> Result<()> {
465 {
467 let mut write_set = self.write_set.write().map_err(|_| Error::LockPoisoned)?;
468 write_set.remove(key);
469 }
470
471 self.storage.delete(key, self.txn_id, self.snapshot_ts)
472 }
473
474 pub fn scan(&self, prefix: &[u8]) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
476 self.storage
477 .scan_prefix(prefix, self.snapshot_ts, self.txn_id)
478 }
479
480 pub fn commit(mut self) -> Result<()> {
482 if self.committed {
483 return Err(Error::Transaction("Transaction already committed".into()));
484 }
485
486 if let Some(manager) = &self.manager {
487 manager.commit(self.txn_id)?;
488 } else {
489 self.storage.commit(self.txn_id)?;
490 }
491
492 self.committed = true;
493 Ok(())
494 }
495
496 pub fn rollback(self) -> Result<()> {
498 if self.committed {
499 return Err(Error::Transaction("Transaction already committed".into()));
500 }
501
502 if let Some(manager) = &self.manager {
503 manager.rollback(self.txn_id)
504 } else {
505 self.storage.rollback(self.txn_id)
506 }
507 }
508
509 pub fn id(&self) -> TransactionId {
511 self.txn_id
512 }
513
514 pub fn isolation_level(&self) -> IsolationLevel {
516 self.isolation
517 }
518}