limbo_core/mvcc/database/mod.rs
1use crate::mvcc::clock::LogicalClock;
2use crate::mvcc::errors::DatabaseError;
3use crate::mvcc::persistent_storage::Storage;
4use crossbeam_skiplist::{SkipMap, SkipSet};
5use std::fmt::Debug;
6use std::sync::atomic::{AtomicU64, Ordering};
7use std::sync::RwLock;
8
9pub type Result<T> = std::result::Result<T, DatabaseError>;
10
11#[cfg(test)]
12mod tests;
13
/// Identity of a row: the table it lives in plus its row id within that table.
///
/// Ordering is lexicographic (`table_id` first, then `row_id`), so all rows
/// of one table form a contiguous key range.
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct RowID {
    /// Table the row belongs to.
    pub table_id: u64,
    /// Row id inside the table.
    pub row_id: i64,
}
19
20impl RowID {
21 pub fn new(table_id: u64, row_id: i64) -> Self {
22 Self { table_id, row_id }
23 }
24}
25
26#[derive(Clone, Debug, PartialEq, PartialOrd)]
27
28pub struct Row {
29 pub id: RowID,
30 pub data: Vec<u8>,
31}
32
33impl Row {
34 pub fn new(id: RowID, data: Vec<u8>) -> Self {
35 Self { id, data }
36 }
37}
38
39/// A row version.
40#[derive(Clone, Debug, PartialEq)]
41pub struct RowVersion {
42 begin: TxTimestampOrID,
43 end: Option<TxTimestampOrID>,
44 row: Row,
45}
46
/// Identifier of a transaction, handed out by `MvStore::get_tx_id`.
pub type TxID = u64;
48
49/// A log record contains all the versions inserted and deleted by a transaction.
50#[derive(Clone, Debug)]
51pub struct LogRecord {
52 pub(crate) tx_timestamp: TxID,
53 row_versions: Vec<RowVersion>,
54}
55
56impl LogRecord {
57 fn new(tx_timestamp: TxID) -> Self {
58 Self {
59 tx_timestamp,
60 row_versions: Vec::new(),
61 }
62 }
63}
64
65/// A transaction timestamp or ID.
66///
67/// Versions either track a timestamp or a transaction ID, depending on the
68/// phase of the transaction. During the active phase, new versions track the
69/// transaction ID in the `begin` and `end` fields. After a transaction commits,
70/// versions switch to tracking timestamps.
71#[derive(Clone, Debug, PartialEq, PartialOrd)]
72enum TxTimestampOrID {
73 /// A committed transaction's timestamp.
74 Timestamp(u64),
75 /// The ID of a non-committed transaction.
76 TxID(TxID),
77}
78
79/// Transaction
80#[derive(Debug)]
81pub struct Transaction {
82 /// The state of the transaction.
83 state: AtomicTransactionState,
84 /// The transaction ID.
85 tx_id: u64,
86 /// The transaction begin timestamp.
87 begin_ts: u64,
88 /// The transaction write set.
89 write_set: SkipSet<RowID>,
90 /// The transaction read set.
91 read_set: SkipSet<RowID>,
92}
93
94impl Transaction {
95 fn new(tx_id: u64, begin_ts: u64) -> Transaction {
96 Transaction {
97 state: TransactionState::Active.into(),
98 tx_id,
99 begin_ts,
100 write_set: SkipSet::new(),
101 read_set: SkipSet::new(),
102 }
103 }
104
105 fn insert_to_read_set(&self, id: RowID) {
106 self.read_set.insert(id);
107 }
108
109 fn insert_to_write_set(&mut self, id: RowID) {
110 self.write_set.insert(id);
111 }
112}
113
114impl std::fmt::Display for Transaction {
115 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::result::Result<(), std::fmt::Error> {
116 write!(
117 f,
118 "{{ state: {}, id: {}, begin_ts: {}, write_set: {:?}, read_set: {:?}",
119 self.state.load(),
120 self.tx_id,
121 self.begin_ts,
122 // FIXME: I'm sorry, we obviously shouldn't be cloning here.
123 self.write_set
124 .iter()
125 .map(|v| *v.value())
126 .collect::<Vec<RowID>>(),
127 self.read_set
128 .iter()
129 .map(|v| *v.value())
130 .collect::<Vec<RowID>>()
131 )
132 }
133}
134
/// Lifecycle state of a transaction.
#[derive(Clone, PartialEq, Debug)]
enum TransactionState {
    /// Running; may read and write.
    Active,
    /// Commit has started but not finished.
    Preparing,
    /// Rolled back; its changes are being discarded.
    Aborted,
    /// Rollback has finished.
    Terminated,
    /// Committed with the given end timestamp.
    Committed(u64),
}
144
145impl TransactionState {
146 pub fn encode(&self) -> u64 {
147 match self {
148 TransactionState::Active => 0,
149 TransactionState::Preparing => 1,
150 TransactionState::Aborted => 2,
151 TransactionState::Terminated => 3,
152 TransactionState::Committed(ts) => {
153 // We only support 2*62 - 1 timestamps, because the extra bit
154 // is used to encode the type.
155 assert!(ts & 0x8000_0000_0000_0000 == 0);
156 0x8000_0000_0000_0000 | ts
157 }
158 }
159 }
160
161 pub fn decode(v: u64) -> Self {
162 match v {
163 0 => TransactionState::Active,
164 1 => TransactionState::Preparing,
165 2 => TransactionState::Aborted,
166 3 => TransactionState::Terminated,
167 v if v & 0x8000_0000_0000_0000 != 0 => {
168 TransactionState::Committed(v & 0x7fff_ffff_ffff_ffff)
169 }
170 _ => panic!("Invalid transaction state"),
171 }
172 }
173}
174
/// A [`TransactionState`] stored as its `encode`d `u64` in one atomic word,
/// so the state can be read and updated without locking the transaction.
#[derive(Debug)]
pub(crate) struct AtomicTransactionState {
    /// Encoded state word.
    pub(crate) state: AtomicU64,
}
180
181impl From<TransactionState> for AtomicTransactionState {
182 fn from(state: TransactionState) -> Self {
183 Self {
184 state: AtomicU64::new(state.encode()),
185 }
186 }
187}
188
189impl From<AtomicTransactionState> for TransactionState {
190 fn from(state: AtomicTransactionState) -> Self {
191 let encoded = state.state.load(Ordering::Acquire);
192 TransactionState::decode(encoded)
193 }
194}
195
196impl std::cmp::PartialEq<TransactionState> for AtomicTransactionState {
197 fn eq(&self, other: &TransactionState) -> bool {
198 &self.load() == other
199 }
200}
201
202impl std::fmt::Display for TransactionState {
203 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::result::Result<(), std::fmt::Error> {
204 match self {
205 TransactionState::Active => write!(f, "Active"),
206 TransactionState::Preparing => write!(f, "Preparing"),
207 TransactionState::Committed(ts) => write!(f, "Committed({ts})"),
208 TransactionState::Aborted => write!(f, "Aborted"),
209 TransactionState::Terminated => write!(f, "Terminated"),
210 }
211 }
212}
213
214impl AtomicTransactionState {
215 fn store(&self, state: TransactionState) {
216 self.state.store(state.encode(), Ordering::Release);
217 }
218
219 fn load(&self) -> TransactionState {
220 TransactionState::decode(self.state.load(Ordering::Acquire))
221 }
222}
223
224/// A multi-version concurrency control database.
225#[derive(Debug)]
226pub struct MvStore<Clock: LogicalClock> {
227 rows: SkipMap<RowID, RwLock<Vec<RowVersion>>>,
228 txs: SkipMap<TxID, RwLock<Transaction>>,
229 tx_ids: AtomicU64,
230 clock: Clock,
231 storage: Storage,
232}
233
234impl<Clock: LogicalClock> MvStore<Clock> {
235 /// Creates a new database.
236 pub fn new(clock: Clock, storage: Storage) -> Self {
237 Self {
238 rows: SkipMap::new(),
239 txs: SkipMap::new(),
240 tx_ids: AtomicU64::new(1), // let's reserve transaction 0 for special purposes
241 clock,
242 storage,
243 }
244 }
245
246 /// Inserts a new row into the database.
247 ///
248 /// This function inserts a new `row` into the database within the context
249 /// of the transaction `tx_id`.
250 ///
251 /// # Arguments
252 ///
253 /// * `tx_id` - the ID of the transaction in which to insert the new row.
254 /// * `row` - the row object containing the values to be inserted.
255 ///
256 pub fn insert(&self, tx_id: TxID, row: Row) -> Result<()> {
257 tracing::trace!("insert(tx_id={}, row.id={:?})", tx_id, row.id);
258 let tx = self
259 .txs
260 .get(&tx_id)
261 .ok_or(DatabaseError::NoSuchTransactionID(tx_id))?;
262 let mut tx = tx.value().write().unwrap();
263 assert_eq!(tx.state, TransactionState::Active);
264 let id = row.id;
265 let row_version = RowVersion {
266 begin: TxTimestampOrID::TxID(tx.tx_id),
267 end: None,
268 row,
269 };
270 tx.insert_to_write_set(id);
271 drop(tx);
272 self.insert_version(id, row_version);
273 Ok(())
274 }
275
276 /// Updates a row in the database with new values.
277 ///
278 /// This function updates an existing row in the database within the
279 /// context of the transaction `tx_id`. The `row` argument identifies the
280 /// row to be updated as `id` and contains the new values to be inserted.
281 ///
282 /// If the row identified by the `id` does not exist, this function does
283 /// nothing and returns `false`. Otherwise, the function updates the row
284 /// with the new values and returns `true`.
285 ///
286 /// # Arguments
287 ///
288 /// * `tx_id` - the ID of the transaction in which to update the new row.
289 /// * `row` - the row object containing the values to be updated.
290 ///
291 /// # Returns
292 ///
293 /// Returns `true` if the row was successfully updated, and `false` otherwise.
294 pub fn update(&self, tx_id: TxID, row: Row) -> Result<bool> {
295 tracing::trace!("update(tx_id={}, row.id={:?})", tx_id, row.id);
296 if !self.delete(tx_id, row.id)? {
297 return Ok(false);
298 }
299 self.insert(tx_id, row)?;
300 Ok(true)
301 }
302
303 /// Inserts a row in the database with new values, previously deleting
304 /// any old data if it existed. Bails on a delete error, e.g. write-write conflict.
305 pub fn upsert(&self, tx_id: TxID, row: Row) -> Result<()> {
306 tracing::trace!("upsert(tx_id={}, row.id={:?})", tx_id, row.id);
307 self.delete(tx_id, row.id)?;
308 self.insert(tx_id, row)
309 }
310
311 /// Deletes a row from the table with the given `id`.
312 ///
313 /// This function deletes an existing row `id` in the database within the
314 /// context of the transaction `tx_id`.
315 ///
316 /// # Arguments
317 ///
318 /// * `tx_id` - the ID of the transaction in which to delete the new row.
319 /// * `id` - the ID of the row to delete.
320 ///
321 /// # Returns
322 ///
323 /// Returns `true` if the row was successfully deleted, and `false` otherwise.
324 ///
325 pub fn delete(&self, tx_id: TxID, id: RowID) -> Result<bool> {
326 tracing::trace!("delete(tx_id={}, id={:?})", tx_id, id);
327 let row_versions_opt = self.rows.get(&id);
328 if let Some(ref row_versions) = row_versions_opt {
329 let mut row_versions = row_versions.value().write().unwrap();
330 for rv in row_versions.iter_mut().rev() {
331 let tx = self
332 .txs
333 .get(&tx_id)
334 .ok_or(DatabaseError::NoSuchTransactionID(tx_id))?;
335 let tx = tx.value().read().unwrap();
336 assert_eq!(tx.state, TransactionState::Active);
337 // A transaction cannot delete a version that it cannot see,
338 // nor can it conflict with it.
339 if !rv.is_visible_to(&tx, &self.txs) {
340 continue;
341 }
342 if is_write_write_conflict(&self.txs, &tx, rv) {
343 drop(row_versions);
344 drop(row_versions_opt);
345 drop(tx);
346 self.rollback_tx(tx_id);
347 return Err(DatabaseError::WriteWriteConflict);
348 }
349
350 rv.end = Some(TxTimestampOrID::TxID(tx.tx_id));
351 drop(row_versions);
352 drop(row_versions_opt);
353 drop(tx);
354 let tx = self
355 .txs
356 .get(&tx_id)
357 .ok_or(DatabaseError::NoSuchTransactionID(tx_id))?;
358 let mut tx = tx.value().write().unwrap();
359 tx.insert_to_write_set(id);
360 return Ok(true);
361 }
362 }
363 Ok(false)
364 }
365
366 /// Retrieves a row from the table with the given `id`.
367 ///
368 /// This operation is performed within the scope of the transaction identified
369 /// by `tx_id`.
370 ///
371 /// # Arguments
372 ///
373 /// * `tx_id` - The ID of the transaction to perform the read operation in.
374 /// * `id` - The ID of the row to retrieve.
375 ///
376 /// # Returns
377 ///
378 /// Returns `Some(row)` with the row data if the row with the given `id` exists,
379 /// and `None` otherwise.
380 pub fn read(&self, tx_id: TxID, id: RowID) -> Result<Option<Row>> {
381 tracing::trace!("read(tx_id={}, id={:?})", tx_id, id);
382 let tx = self.txs.get(&tx_id).unwrap();
383 let tx = tx.value().read().unwrap();
384 assert_eq!(tx.state, TransactionState::Active);
385 if let Some(row_versions) = self.rows.get(&id) {
386 let row_versions = row_versions.value().read().unwrap();
387 if let Some(rv) = row_versions
388 .iter()
389 .rev()
390 .filter(|rv| rv.is_visible_to(&tx, &self.txs))
391 .next()
392 {
393 tx.insert_to_read_set(id);
394 return Ok(Some(rv.row.clone()));
395 }
396 }
397 Ok(None)
398 }
399
400 /// Gets all row ids in the database.
401 pub fn scan_row_ids(&self) -> Result<Vec<RowID>> {
402 tracing::trace!("scan_row_ids");
403 let keys = self.rows.iter().map(|entry| *entry.key());
404 Ok(keys.collect())
405 }
406
407 /// Gets all row ids in the database for a given table.
408 pub fn scan_row_ids_for_table(&self, table_id: u64) -> Result<Vec<RowID>> {
409 tracing::trace!("scan_row_ids_for_table(table_id={})", table_id);
410 Ok(self
411 .rows
412 .range(
413 RowID {
414 table_id,
415 row_id: 0,
416 }..RowID {
417 table_id,
418 row_id: i64::MAX,
419 },
420 )
421 .map(|entry| *entry.key())
422 .collect())
423 }
424
425 pub fn get_row_id_range(
426 &self,
427 table_id: u64,
428 start: i64,
429 bucket: &mut Vec<RowID>,
430 max_items: u64,
431 ) -> Result<()> {
432 tracing::trace!(
433 "get_row_id_in_range(table_id={}, range_start={})",
434 table_id,
435 start,
436 );
437 let start_id = RowID {
438 table_id,
439 row_id: start,
440 };
441
442 let end_id = RowID {
443 table_id,
444 row_id: i64::MAX,
445 };
446
447 self.rows
448 .range(start_id..end_id)
449 .take(max_items as usize)
450 .for_each(|entry| bucket.push(*entry.key()));
451
452 Ok(())
453 }
454
455 pub fn get_next_row_id_for_table(&self, table_id: u64, start: i64) -> Option<RowID> {
456 tracing::trace!(
457 "getting_next_id_for_table(table_id={}, range_start={})",
458 table_id,
459 start,
460 );
461 let min_bound = RowID {
462 table_id,
463 row_id: start,
464 };
465
466 let max_bound = RowID {
467 table_id,
468 row_id: i64::MAX,
469 };
470
471 self.rows
472 .range(min_bound..max_bound)
473 .next()
474 .map(|entry| *entry.key())
475 }
476
477 /// Begins a new transaction in the database.
478 ///
479 /// This function starts a new transaction in the database and returns a `TxID` value
480 /// that you can use to perform operations within the transaction. All changes made within the
481 /// transaction are isolated from other transactions until you commit the transaction.
482 pub fn begin_tx(&self) -> TxID {
483 let tx_id = self.get_tx_id();
484 let begin_ts = self.get_timestamp();
485 let tx = Transaction::new(tx_id, begin_ts);
486 tracing::trace!("begin_tx(tx_id={})", tx_id);
487 self.txs.insert(tx_id, RwLock::new(tx));
488 tx_id
489 }
490
491 /// Commits a transaction with the specified transaction ID.
492 ///
493 /// This function commits the changes made within the specified transaction and finalizes the
494 /// transaction. Once a transaction has been committed, all changes made within the transaction
495 /// are visible to other transactions that access the same data.
496 ///
497 /// # Arguments
498 ///
499 /// * `tx_id` - The ID of the transaction to commit.
500 pub fn commit_tx(&self, tx_id: TxID) -> Result<()> {
501 let end_ts = self.get_timestamp();
502 // NOTICE: the first shadowed tx keeps the entry alive in the map
503 // for the duration of this whole function, which is important for correctness!
504 let tx = self.txs.get(&tx_id).ok_or(DatabaseError::TxTerminated)?;
505 let tx = tx.value().write().unwrap();
506 match tx.state.load() {
507 TransactionState::Terminated => return Err(DatabaseError::TxTerminated),
508 _ => {
509 assert_eq!(tx.state, TransactionState::Active);
510 }
511 }
512 tx.state.store(TransactionState::Preparing);
513 tracing::trace!("prepare_tx(tx_id={})", tx_id);
514
515 /* TODO: The code we have here is sufficient for snapshot isolation.
516 ** In order to implement serializability, we need the following steps:
517 **
518 ** 1. Validate if all read versions are still visible by inspecting the read_set
519 ** 2. Validate if there are no phantoms by walking the scans from scan_set (which we don't even have yet)
520 ** - a phantom is a version that became visible in the middle of our transaction,
521 ** but wasn't taken into account during one of the scans from the scan_set
522 ** 3. Wait for commit dependencies, which we don't even track yet...
523 ** Excerpt from what's a commit dependency and how it's tracked in the original paper:
524 ** """
525 A transaction T1 has a commit dependency on another transaction
526 T2, if T1 is allowed to commit only if T2 commits. If T2 aborts,
527 T1 must also abort, so cascading aborts are possible. T1 acquires a
528 commit dependency either by speculatively reading or speculatively ignoring a version,
529 instead of waiting for T2 to commit.
530 We implement commit dependencies by a register-and-report
531 approach: T1 registers its dependency with T2 and T2 informs T1
532 when it has committed or aborted. Each transaction T contains a
533 counter, CommitDepCounter, that counts how many unresolved
534 commit dependencies it still has. A transaction cannot commit
535 until this counter is zero. In addition, T has a Boolean variable
536 AbortNow that other transactions can set to tell T to abort. Each
537 transaction T also has a set, CommitDepSet, that stores transaction IDs
538 of the transactions that depend on T.
539 To take a commit dependency on a transaction T2, T1 increments
540 its CommitDepCounter and adds its transaction ID to T2’s CommitDepSet.
541 When T2 has committed, it locates each transaction in
542 its CommitDepSet and decrements their CommitDepCounter. If
543 T2 aborted, it tells the dependent transactions to also abort by
544 setting their AbortNow flags. If a dependent transaction is not
545 found, this means that it has already aborted.
546 Note that a transaction with commit dependencies may not have to
547 wait at all - the dependencies may have been resolved before it is
548 ready to commit. Commit dependencies consolidate all waits into
549 a single wait and postpone the wait to just before commit.
550 Some transactions may have to wait before commit.
551 Waiting raises a concern of deadlocks.
552 However, deadlocks cannot occur because an older transaction never
553 waits on a younger transaction. In
554 a wait-for graph the direction of edges would always be from a
555 younger transaction (higher end timestamp) to an older transaction
556 (lower end timestamp) so cycles are impossible.
557 """
558 ** If you're wondering when a speculative read happens, here you go:
559 ** Case 1: speculative read of TB:
560 """
561 If transaction TB is in the Preparing state, it has acquired an end
562 timestamp TS which will be V’s begin timestamp if TB commits.
563 A safe approach in this situation would be to have transaction T
564 wait until transaction TB commits. However, we want to avoid all
565 blocking during normal processing so instead we continue with
566 the visibility test and, if the test returns true, allow T to
567 speculatively read V. Transaction T acquires a commit dependency on
568 TB, restricting the serialization order of the two transactions. That
569 is, T is allowed to commit only if TB commits.
570 """
571 ** Case 2: speculative ignore of TE:
572 """
573 If TE’s state is Preparing, it has an end timestamp TS that will become
574 the end timestamp of V if TE does commit. If TS is greater than the read
575 time RT, it is obvious that V will be visible if TE commits. If TE
576 aborts, V will still be visible, because any transaction that updates
577 V after TE has aborted will obtain an end timestamp greater than
578 TS. If TS is less than RT, we have a more complicated situation:
579 if TE commits, V will not be visible to T but if TE aborts, it will
580 be visible. We could handle this by forcing T to wait until TE
581 commits or aborts but we want to avoid all blocking during normal processing.
582 Instead we allow T to speculatively ignore V and
583 proceed with its processing. Transaction T acquires a commit
584 dependency (see Section 2.7) on TE, that is, T is allowed to commit
585 only if TE commits.
586 """
587 */
588 tx.state.store(TransactionState::Committed(end_ts));
589 tracing::trace!("commit_tx(tx_id={})", tx_id);
590 let write_set: Vec<RowID> = tx.write_set.iter().map(|v| *v.value()).collect();
591 drop(tx);
592 // Postprocessing: inserting row versions and logging the transaction to persistent storage.
593 // TODO: we should probably save to persistent storage first, and only then update the in-memory structures.
594 let mut log_record = LogRecord::new(end_ts);
595 for ref id in write_set {
596 if let Some(row_versions) = self.rows.get(id) {
597 let mut row_versions = row_versions.value().write().unwrap();
598 for row_version in row_versions.iter_mut() {
599 if let TxTimestampOrID::TxID(id) = row_version.begin {
600 if id == tx_id {
601 // New version is valid STARTING FROM committing transaction's end timestamp
602 // See diagram on page 299: https://www.cs.cmu.edu/~15721-f24/papers/Hekaton.pdf
603 row_version.begin = TxTimestampOrID::Timestamp(end_ts);
604 self.insert_version_raw(
605 &mut log_record.row_versions,
606 row_version.clone(),
607 ); // FIXME: optimize cloning out
608 }
609 }
610 if let Some(TxTimestampOrID::TxID(id)) = row_version.end {
611 if id == tx_id {
612 // Old version is valid UNTIL committing transaction's end timestamp
613 // See diagram on page 299: https://www.cs.cmu.edu/~15721-f24/papers/Hekaton.pdf
614 row_version.end = Some(TxTimestampOrID::Timestamp(end_ts));
615 self.insert_version_raw(
616 &mut log_record.row_versions,
617 row_version.clone(),
618 ); // FIXME: optimize cloning out
619 }
620 }
621 }
622 }
623 }
624 tracing::trace!("updated(tx_id={})", tx_id);
625 // We have now updated all the versions with a reference to the
626 // transaction ID to a timestamp and can, therefore, remove the
627 // transaction. Please note that when we move to lockless, the
628 // invariant doesn't necessarily hold anymore because another thread
629 // might have speculatively read a version that we want to remove.
630 // But that's a problem for another day.
631 // FIXME: it actually just become a problem for today!!!
632 // TODO: test that reproduces this failure, and then a fix
633 self.txs.remove(&tx_id);
634 if !log_record.row_versions.is_empty() {
635 self.storage.log_tx(log_record)?;
636 }
637 tracing::trace!("logged(tx_id={})", tx_id);
638 Ok(())
639 }
640
641 /// Rolls back a transaction with the specified ID.
642 ///
643 /// This function rolls back a transaction with the specified `tx_id` by
644 /// discarding any changes made by the transaction.
645 ///
646 /// # Arguments
647 ///
648 /// * `tx_id` - The ID of the transaction to abort.
649 pub fn rollback_tx(&self, tx_id: TxID) {
650 let tx_unlocked = self.txs.get(&tx_id).unwrap();
651 let tx = tx_unlocked.value().write().unwrap();
652 assert_eq!(tx.state, TransactionState::Active);
653 tx.state.store(TransactionState::Aborted);
654 tracing::trace!("abort(tx_id={})", tx_id);
655 let write_set: Vec<RowID> = tx.write_set.iter().map(|v| *v.value()).collect();
656 drop(tx);
657
658 for ref id in write_set {
659 if let Some(row_versions) = self.rows.get(id) {
660 let mut row_versions = row_versions.value().write().unwrap();
661 row_versions.retain(|rv| rv.begin != TxTimestampOrID::TxID(tx_id));
662 if row_versions.is_empty() {
663 self.rows.remove(id);
664 }
665 }
666 }
667
668 let tx = tx_unlocked.value().read().unwrap();
669 tx.state.store(TransactionState::Terminated);
670 tracing::trace!("terminate(tx_id={})", tx_id);
671 // FIXME: verify that we can already remove the transaction here!
672 // Maybe it's fine for snapshot isolation, but too early for serializable?
673 self.txs.remove(&tx_id);
674 }
675
676 /// Generates next unique transaction id
677 pub fn get_tx_id(&self) -> u64 {
678 self.tx_ids.fetch_add(1, Ordering::SeqCst)
679 }
680
681 /// Gets current timestamp
682 pub fn get_timestamp(&self) -> u64 {
683 self.clock.get_timestamp()
684 }
685
686 /// Removes unused row versions with very loose heuristics,
687 /// which sometimes leaves versions intact for too long.
688 /// Returns the number of removed versions.
689 pub fn drop_unused_row_versions(&self) -> usize {
690 tracing::trace!(
691 "drop_unused_row_versions() -> txs: {}; rows: {}",
692 self.txs.len(),
693 self.rows.len()
694 );
695 let mut dropped = 0;
696 let mut to_remove = Vec::new();
697 for entry in self.rows.iter() {
698 let mut row_versions = entry.value().write().unwrap();
699 row_versions.retain(|rv| {
700 // FIXME: should take rv.begin into account as well
701 let should_stay = match rv.end {
702 Some(TxTimestampOrID::Timestamp(version_end_ts)) => {
703 // a transaction started before this row version ended, ergo row version is needed
704 // NOTICE: O(row_versions x transactions), but also lock-free, so sounds acceptable
705 self.txs.iter().any(|tx| {
706 let tx = tx.value().read().unwrap();
707 // FIXME: verify!
708 match tx.state.load() {
709 TransactionState::Active | TransactionState::Preparing => {
710 version_end_ts > tx.begin_ts
711 }
712 _ => false,
713 }
714 })
715 }
716 // Let's skip potentially complex logic if the transafction is still
717 // active/tracked. We will drop the row version when the transaction
718 // gets garbage-collected itself, it will always happen eventually.
719 Some(TxTimestampOrID::TxID(tx_id)) => !self.txs.contains_key(&tx_id),
720 // this row version is current, ergo visible
721 None => true,
722 };
723 if !should_stay {
724 dropped += 1;
725 tracing::trace!(
726 "Dropping row version {:?} {:?}-{:?}",
727 entry.key(),
728 rv.begin,
729 rv.end
730 );
731 }
732 should_stay
733 });
734 if row_versions.is_empty() {
735 to_remove.push(*entry.key());
736 }
737 }
738 for id in to_remove {
739 self.rows.remove(&id);
740 }
741 dropped
742 }
743
744 pub fn recover(&self) -> Result<()> {
745 let tx_log = self.storage.read_tx_log()?;
746 for record in tx_log {
747 tracing::debug!("recover() -> tx_timestamp={}", record.tx_timestamp);
748 for version in record.row_versions {
749 self.insert_version(version.row.id, version);
750 }
751 self.clock.reset(record.tx_timestamp);
752 }
753 Ok(())
754 }
755
756 // Extracts the begin timestamp from a transaction
757 fn get_begin_timestamp(&self, ts_or_id: &TxTimestampOrID) -> u64 {
758 match ts_or_id {
759 TxTimestampOrID::Timestamp(ts) => *ts,
760 TxTimestampOrID::TxID(tx_id) => {
761 self.txs
762 .get(tx_id)
763 .unwrap()
764 .value()
765 .read()
766 .unwrap()
767 .begin_ts
768 }
769 }
770 }
771
772 /// Inserts a new row version into the database, while making sure that
773 /// the row version is inserted in the correct order.
774 fn insert_version(&self, id: RowID, row_version: RowVersion) {
775 let versions = self.rows.get_or_insert_with(id, || RwLock::new(Vec::new()));
776 let mut versions = versions.value().write().unwrap();
777 self.insert_version_raw(&mut versions, row_version)
778 }
779
780 /// Inserts a new row version into the internal data structure for versions,
781 /// while making sure that the row version is inserted in the correct order.
782 fn insert_version_raw(&self, versions: &mut Vec<RowVersion>, row_version: RowVersion) {
783 // NOTICE: this is an insert a'la insertion sort, with pessimistic linear complexity.
784 // However, we expect the number of versions to be nearly sorted, so we deem it worthy
785 // to search linearly for the insertion point instead of paying the price of using
786 // another data structure, e.g. a BTreeSet. If it proves to be too quadratic empirically,
787 // we can either switch to a tree-like structure, or at least use partition_point()
788 // which performs a binary search for the insertion point.
789 let position = versions
790 .iter()
791 .rposition(|v| {
792 self.get_begin_timestamp(&v.begin) < self.get_begin_timestamp(&row_version.begin)
793 })
794 .map(|p| p + 1)
795 .unwrap_or(0);
796 if versions.len() - position > 3 {
797 tracing::debug!(
798 "Inserting a row version {} positions from the end",
799 versions.len() - position
800 );
801 }
802 versions.insert(position, row_version);
803 }
804}
805
806/// A write-write conflict happens when transaction T_current attempts to update a
807/// row version that is:
808/// a) currently being updated by an active transaction T_previous, or
809/// b) was updated by an ended transaction T_previous that committed AFTER T_current started
810/// but BEFORE T_previous commits.
811///
812/// "Suppose transaction T wants to update a version V. V is updatable
813/// only if it is the latest version, that is, it has an end timestamp equal
814/// to infinity or its End field contains the ID of a transaction TE and
815/// TE’s state is Aborted"
816/// Ref: https://www.cs.cmu.edu/~15721-f24/papers/Hekaton.pdf , page 301,
817/// 2.6. Updating a Version.
818pub(crate) fn is_write_write_conflict(
819 txs: &SkipMap<TxID, RwLock<Transaction>>,
820 tx: &Transaction,
821 rv: &RowVersion,
822) -> bool {
823 match rv.end {
824 Some(TxTimestampOrID::TxID(rv_end)) => {
825 let te = txs.get(&rv_end).unwrap();
826 let te = te.value().read().unwrap();
827 if te.tx_id == tx.tx_id {
828 return false;
829 }
830 te.state.load() != TransactionState::Aborted
831 }
832 // A non-"infinity" end timestamp (here modeled by Some(ts)) functions as a write lock
833 // on the row, so it can never be updated by another transaction.
834 // Ref: https://www.cs.cmu.edu/~15721-f24/papers/Hekaton.pdf , page 301,
835 // 2.6. Updating a Version.
836 Some(TxTimestampOrID::Timestamp(_)) => true,
837 None => false,
838 }
839}
840
841impl RowVersion {
842 pub fn is_visible_to(
843 &self,
844 tx: &Transaction,
845 txs: &SkipMap<TxID, RwLock<Transaction>>,
846 ) -> bool {
847 is_begin_visible(txs, tx, self) && is_end_visible(txs, tx, self)
848 }
849}
850
851fn is_begin_visible(
852 txs: &SkipMap<TxID, RwLock<Transaction>>,
853 tx: &Transaction,
854 rv: &RowVersion,
855) -> bool {
856 match rv.begin {
857 TxTimestampOrID::Timestamp(rv_begin_ts) => tx.begin_ts >= rv_begin_ts,
858 TxTimestampOrID::TxID(rv_begin) => {
859 let tb = txs.get(&rv_begin).unwrap();
860 let tb = tb.value().read().unwrap();
861 let visible = match tb.state.load() {
862 TransactionState::Active => tx.tx_id == tb.tx_id && rv.end.is_none(),
863 TransactionState::Preparing => false, // NOTICE: makes sense for snapshot isolation, not so much for serializable!
864 TransactionState::Committed(committed_ts) => tx.begin_ts >= committed_ts,
865 TransactionState::Aborted => false,
866 TransactionState::Terminated => {
867 tracing::debug!("TODO: should reread rv's end field - it should have updated the timestamp in the row version by now");
868 false
869 }
870 };
871 tracing::trace!(
872 "is_begin_visible: tx={tx}, tb={tb} rv = {:?}-{:?} visible = {visible}",
873 rv.begin,
874 rv.end
875 );
876 visible
877 }
878 }
879}
880
881fn is_end_visible(
882 txs: &SkipMap<TxID, RwLock<Transaction>>,
883 tx: &Transaction,
884 rv: &RowVersion,
885) -> bool {
886 match rv.end {
887 Some(TxTimestampOrID::Timestamp(rv_end_ts)) => tx.begin_ts < rv_end_ts,
888 Some(TxTimestampOrID::TxID(rv_end)) => {
889 let te = txs.get(&rv_end).unwrap();
890 let te = te.value().read().unwrap();
891 let visible = match te.state.load() {
892 TransactionState::Active => tx.tx_id != te.tx_id,
893 TransactionState::Preparing => false, // NOTICE: makes sense for snapshot isolation, not so much for serializable!
894 TransactionState::Committed(committed_ts) => tx.begin_ts < committed_ts,
895 TransactionState::Aborted => false,
896 TransactionState::Terminated => {
897 tracing::debug!("TODO: should reread rv's end field - it should have updated the timestamp in the row version by now");
898 false
899 }
900 };
901 tracing::trace!(
902 "is_end_visible: tx={tx}, te={te} rv = {:?}-{:?} visible = {visible}",
903 rv.begin,
904 rv.end
905 );
906 visible
907 }
908 None => true,
909 }
910}