Skip to main content

limbo_core/mvcc/database/
mod.rs

1use crate::mvcc::clock::LogicalClock;
2use crate::mvcc::errors::DatabaseError;
3use crate::mvcc::persistent_storage::Storage;
4use crossbeam_skiplist::{SkipMap, SkipSet};
5use std::fmt::Debug;
6use std::sync::atomic::{AtomicU64, Ordering};
7use std::sync::RwLock;
8
9pub type Result<T> = std::result::Result<T, DatabaseError>;
10
11#[cfg(test)]
12mod tests;
13
/// Identifies a single row: the table it belongs to plus its row id within that table.
///
/// The derived ordering is lexicographic on `(table_id, row_id)` (field declaration
/// order), so all rows of one table are contiguous in a sorted map keyed by `RowID`.
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct RowID {
    /// Table this row belongs to.
    pub table_id: u64,
    /// Row identifier within the table.
    pub row_id: i64,
}

impl RowID {
    /// Builds a `RowID` from its table id and row id.
    pub fn new(table_id: u64, row_id: i64) -> Self {
        RowID { row_id, table_id }
    }
}
25
26#[derive(Clone, Debug, PartialEq, PartialOrd)]
27
28pub struct Row {
29    pub id: RowID,
30    pub data: Vec<u8>,
31}
32
33impl Row {
34    pub fn new(id: RowID, data: Vec<u8>) -> Self {
35        Self { id, data }
36    }
37}
38
39/// A row version.
40#[derive(Clone, Debug, PartialEq)]
41pub struct RowVersion {
42    begin: TxTimestampOrID,
43    end: Option<TxTimestampOrID>,
44    row: Row,
45}
46
/// Identifier of a transaction, as handed out by `MvStore::get_tx_id`.
pub type TxID = u64;
48
49/// A log record contains all the versions inserted and deleted by a transaction.
50#[derive(Clone, Debug)]
51pub struct LogRecord {
52    pub(crate) tx_timestamp: TxID,
53    row_versions: Vec<RowVersion>,
54}
55
56impl LogRecord {
57    fn new(tx_timestamp: TxID) -> Self {
58        Self {
59            tx_timestamp,
60            row_versions: Vec::new(),
61        }
62    }
63}
64
65/// A transaction timestamp or ID.
66///
67/// Versions either track a timestamp or a transaction ID, depending on the
68/// phase of the transaction. During the active phase, new versions track the
69/// transaction ID in the `begin` and `end` fields. After a transaction commits,
70/// versions switch to tracking timestamps.
71#[derive(Clone, Debug, PartialEq, PartialOrd)]
72enum TxTimestampOrID {
73    /// A committed transaction's timestamp.
74    Timestamp(u64),
75    /// The ID of a non-committed transaction.
76    TxID(TxID),
77}
78
79/// Transaction
80#[derive(Debug)]
81pub struct Transaction {
82    /// The state of the transaction.
83    state: AtomicTransactionState,
84    /// The transaction ID.
85    tx_id: u64,
86    /// The transaction begin timestamp.
87    begin_ts: u64,
88    /// The transaction write set.
89    write_set: SkipSet<RowID>,
90    /// The transaction read set.
91    read_set: SkipSet<RowID>,
92}
93
94impl Transaction {
95    fn new(tx_id: u64, begin_ts: u64) -> Transaction {
96        Transaction {
97            state: TransactionState::Active.into(),
98            tx_id,
99            begin_ts,
100            write_set: SkipSet::new(),
101            read_set: SkipSet::new(),
102        }
103    }
104
105    fn insert_to_read_set(&self, id: RowID) {
106        self.read_set.insert(id);
107    }
108
109    fn insert_to_write_set(&mut self, id: RowID) {
110        self.write_set.insert(id);
111    }
112}
113
114impl std::fmt::Display for Transaction {
115    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::result::Result<(), std::fmt::Error> {
116        write!(
117            f,
118            "{{ state: {}, id: {}, begin_ts: {}, write_set: {:?}, read_set: {:?}",
119            self.state.load(),
120            self.tx_id,
121            self.begin_ts,
122            // FIXME: I'm sorry, we obviously shouldn't be cloning here.
123            self.write_set
124                .iter()
125                .map(|v| *v.value())
126                .collect::<Vec<RowID>>(),
127            self.read_set
128                .iter()
129                .map(|v| *v.value())
130                .collect::<Vec<RowID>>()
131        )
132    }
133}
134
/// Transaction state.
///
/// As driven by `MvStore` in this module, a transaction goes
/// `Active` -> `Preparing` -> `Committed(ts)` on commit, or
/// `Active` -> `Aborted` -> `Terminated` on rollback.
#[derive(Debug, Clone, PartialEq)]
enum TransactionState {
    Active,
    Preparing,
    Aborted,
    Terminated,
    /// Committed at the contained (end) timestamp.
    Committed(u64),
}

impl TransactionState {
    /// Top bit of the encoded form; set only for `Committed`. The remaining 63 bits
    /// hold the commit timestamp.
    const COMMITTED_FLAG: u64 = 1 << 63;

    /// Encodes the state into a single `u64` so it can live in an `AtomicU64`.
    ///
    /// `Active`, `Preparing`, `Aborted` and `Terminated` map to 0, 1, 2 and 3.
    /// `Committed(ts)` sets the top bit and stores `ts` in the low 63 bits.
    ///
    /// # Panics
    ///
    /// Panics if a committed timestamp does not fit in 63 bits (`ts >= 2^63`).
    pub fn encode(&self) -> u64 {
        match *self {
            TransactionState::Active => 0,
            TransactionState::Preparing => 1,
            TransactionState::Aborted => 2,
            TransactionState::Terminated => 3,
            TransactionState::Committed(ts) => {
                // Only timestamps up to 2^63 - 1 are representable, because the top
                // bit is used to encode the variant.
                assert!((ts & Self::COMMITTED_FLAG) == 0);
                Self::COMMITTED_FLAG | ts
            }
        }
    }

    /// Decodes a value produced by [`TransactionState::encode`].
    ///
    /// # Panics
    ///
    /// Panics with "Invalid transaction state" for values no state encodes to
    /// (4 through 2^63 - 1).
    pub fn decode(v: u64) -> Self {
        match v {
            0 => TransactionState::Active,
            1 => TransactionState::Preparing,
            2 => TransactionState::Aborted,
            3 => TransactionState::Terminated,
            v if (v & Self::COMMITTED_FLAG) != 0 => {
                TransactionState::Committed(v & !Self::COMMITTED_FLAG)
            }
            _ => panic!("Invalid transaction state"),
        }
    }
}
174
/// Transaction state encoded into a single 64-bit atomic, so it can be read and
/// updated through a shared reference.
#[derive(Debug)]
pub(crate) struct AtomicTransactionState {
    /// The encoded state, in the form produced by `TransactionState::encode`.
    pub(crate) state: AtomicU64,
}
180
181impl From<TransactionState> for AtomicTransactionState {
182    fn from(state: TransactionState) -> Self {
183        Self {
184            state: AtomicU64::new(state.encode()),
185        }
186    }
187}
188
189impl From<AtomicTransactionState> for TransactionState {
190    fn from(state: AtomicTransactionState) -> Self {
191        let encoded = state.state.load(Ordering::Acquire);
192        TransactionState::decode(encoded)
193    }
194}
195
196impl std::cmp::PartialEq<TransactionState> for AtomicTransactionState {
197    fn eq(&self, other: &TransactionState) -> bool {
198        &self.load() == other
199    }
200}
201
202impl std::fmt::Display for TransactionState {
203    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::result::Result<(), std::fmt::Error> {
204        match self {
205            TransactionState::Active => write!(f, "Active"),
206            TransactionState::Preparing => write!(f, "Preparing"),
207            TransactionState::Committed(ts) => write!(f, "Committed({ts})"),
208            TransactionState::Aborted => write!(f, "Aborted"),
209            TransactionState::Terminated => write!(f, "Terminated"),
210        }
211    }
212}
213
214impl AtomicTransactionState {
215    fn store(&self, state: TransactionState) {
216        self.state.store(state.encode(), Ordering::Release);
217    }
218
219    fn load(&self) -> TransactionState {
220        TransactionState::decode(self.state.load(Ordering::Acquire))
221    }
222}
223
224/// A multi-version concurrency control database.
225#[derive(Debug)]
226pub struct MvStore<Clock: LogicalClock> {
227    rows: SkipMap<RowID, RwLock<Vec<RowVersion>>>,
228    txs: SkipMap<TxID, RwLock<Transaction>>,
229    tx_ids: AtomicU64,
230    clock: Clock,
231    storage: Storage,
232}
233
234impl<Clock: LogicalClock> MvStore<Clock> {
235    /// Creates a new database.
236    pub fn new(clock: Clock, storage: Storage) -> Self {
237        Self {
238            rows: SkipMap::new(),
239            txs: SkipMap::new(),
240            tx_ids: AtomicU64::new(1), // let's reserve transaction 0 for special purposes
241            clock,
242            storage,
243        }
244    }
245
246    /// Inserts a new row into the database.
247    ///
248    /// This function inserts a new `row` into the database within the context
249    /// of the transaction `tx_id`.
250    ///
251    /// # Arguments
252    ///
253    /// * `tx_id` - the ID of the transaction in which to insert the new row.
254    /// * `row` - the row object containing the values to be inserted.
255    ///
256    pub fn insert(&self, tx_id: TxID, row: Row) -> Result<()> {
257        tracing::trace!("insert(tx_id={}, row.id={:?})", tx_id, row.id);
258        let tx = self
259            .txs
260            .get(&tx_id)
261            .ok_or(DatabaseError::NoSuchTransactionID(tx_id))?;
262        let mut tx = tx.value().write().unwrap();
263        assert_eq!(tx.state, TransactionState::Active);
264        let id = row.id;
265        let row_version = RowVersion {
266            begin: TxTimestampOrID::TxID(tx.tx_id),
267            end: None,
268            row,
269        };
270        tx.insert_to_write_set(id);
271        drop(tx);
272        self.insert_version(id, row_version);
273        Ok(())
274    }
275
276    /// Updates a row in the database with new values.
277    ///
278    /// This function updates an existing row in the database within the
279    /// context of the transaction `tx_id`. The `row` argument identifies the
280    /// row to be updated as `id` and contains the new values to be inserted.
281    ///
282    /// If the row identified by the `id` does not exist, this function does
283    /// nothing and returns `false`. Otherwise, the function updates the row
284    /// with the new values and returns `true`.
285    ///
286    /// # Arguments
287    ///
288    /// * `tx_id` - the ID of the transaction in which to update the new row.
289    /// * `row` - the row object containing the values to be updated.
290    ///
291    /// # Returns
292    ///
293    /// Returns `true` if the row was successfully updated, and `false` otherwise.
294    pub fn update(&self, tx_id: TxID, row: Row) -> Result<bool> {
295        tracing::trace!("update(tx_id={}, row.id={:?})", tx_id, row.id);
296        if !self.delete(tx_id, row.id)? {
297            return Ok(false);
298        }
299        self.insert(tx_id, row)?;
300        Ok(true)
301    }
302
303    /// Inserts a row in the database with new values, previously deleting
304    /// any old data if it existed. Bails on a delete error, e.g. write-write conflict.
305    pub fn upsert(&self, tx_id: TxID, row: Row) -> Result<()> {
306        tracing::trace!("upsert(tx_id={}, row.id={:?})", tx_id, row.id);
307        self.delete(tx_id, row.id)?;
308        self.insert(tx_id, row)
309    }
310
311    /// Deletes a row from the table with the given `id`.
312    ///
313    /// This function deletes an existing row `id` in the database within the
314    /// context of the transaction `tx_id`.
315    ///
316    /// # Arguments
317    ///
318    /// * `tx_id` - the ID of the transaction in which to delete the new row.
319    /// * `id` - the ID of the row to delete.
320    ///
321    /// # Returns
322    ///
323    /// Returns `true` if the row was successfully deleted, and `false` otherwise.
324    ///
325    pub fn delete(&self, tx_id: TxID, id: RowID) -> Result<bool> {
326        tracing::trace!("delete(tx_id={}, id={:?})", tx_id, id);
327        let row_versions_opt = self.rows.get(&id);
328        if let Some(ref row_versions) = row_versions_opt {
329            let mut row_versions = row_versions.value().write().unwrap();
330            for rv in row_versions.iter_mut().rev() {
331                let tx = self
332                    .txs
333                    .get(&tx_id)
334                    .ok_or(DatabaseError::NoSuchTransactionID(tx_id))?;
335                let tx = tx.value().read().unwrap();
336                assert_eq!(tx.state, TransactionState::Active);
337                // A transaction cannot delete a version that it cannot see,
338                // nor can it conflict with it.
339                if !rv.is_visible_to(&tx, &self.txs) {
340                    continue;
341                }
342                if is_write_write_conflict(&self.txs, &tx, rv) {
343                    drop(row_versions);
344                    drop(row_versions_opt);
345                    drop(tx);
346                    self.rollback_tx(tx_id);
347                    return Err(DatabaseError::WriteWriteConflict);
348                }
349
350                rv.end = Some(TxTimestampOrID::TxID(tx.tx_id));
351                drop(row_versions);
352                drop(row_versions_opt);
353                drop(tx);
354                let tx = self
355                    .txs
356                    .get(&tx_id)
357                    .ok_or(DatabaseError::NoSuchTransactionID(tx_id))?;
358                let mut tx = tx.value().write().unwrap();
359                tx.insert_to_write_set(id);
360                return Ok(true);
361            }
362        }
363        Ok(false)
364    }
365
366    /// Retrieves a row from the table with the given `id`.
367    ///
368    /// This operation is performed within the scope of the transaction identified
369    /// by `tx_id`.
370    ///
371    /// # Arguments
372    ///
373    /// * `tx_id` - The ID of the transaction to perform the read operation in.
374    /// * `id` - The ID of the row to retrieve.
375    ///
376    /// # Returns
377    ///
378    /// Returns `Some(row)` with the row data if the row with the given `id` exists,
379    /// and `None` otherwise.
380    pub fn read(&self, tx_id: TxID, id: RowID) -> Result<Option<Row>> {
381        tracing::trace!("read(tx_id={}, id={:?})", tx_id, id);
382        let tx = self.txs.get(&tx_id).unwrap();
383        let tx = tx.value().read().unwrap();
384        assert_eq!(tx.state, TransactionState::Active);
385        if let Some(row_versions) = self.rows.get(&id) {
386            let row_versions = row_versions.value().read().unwrap();
387            if let Some(rv) = row_versions
388                .iter()
389                .rev()
390                .filter(|rv| rv.is_visible_to(&tx, &self.txs))
391                .next()
392            {
393                tx.insert_to_read_set(id);
394                return Ok(Some(rv.row.clone()));
395            }
396        }
397        Ok(None)
398    }
399
400    /// Gets all row ids in the database.
401    pub fn scan_row_ids(&self) -> Result<Vec<RowID>> {
402        tracing::trace!("scan_row_ids");
403        let keys = self.rows.iter().map(|entry| *entry.key());
404        Ok(keys.collect())
405    }
406
407    /// Gets all row ids in the database for a given table.
408    pub fn scan_row_ids_for_table(&self, table_id: u64) -> Result<Vec<RowID>> {
409        tracing::trace!("scan_row_ids_for_table(table_id={})", table_id);
410        Ok(self
411            .rows
412            .range(
413                RowID {
414                    table_id,
415                    row_id: 0,
416                }..RowID {
417                    table_id,
418                    row_id: i64::MAX,
419                },
420            )
421            .map(|entry| *entry.key())
422            .collect())
423    }
424
425    pub fn get_row_id_range(
426        &self,
427        table_id: u64,
428        start: i64,
429        bucket: &mut Vec<RowID>,
430        max_items: u64,
431    ) -> Result<()> {
432        tracing::trace!(
433            "get_row_id_in_range(table_id={}, range_start={})",
434            table_id,
435            start,
436        );
437        let start_id = RowID {
438            table_id,
439            row_id: start,
440        };
441
442        let end_id = RowID {
443            table_id,
444            row_id: i64::MAX,
445        };
446
447        self.rows
448            .range(start_id..end_id)
449            .take(max_items as usize)
450            .for_each(|entry| bucket.push(*entry.key()));
451
452        Ok(())
453    }
454
455    pub fn get_next_row_id_for_table(&self, table_id: u64, start: i64) -> Option<RowID> {
456        tracing::trace!(
457            "getting_next_id_for_table(table_id={}, range_start={})",
458            table_id,
459            start,
460        );
461        let min_bound = RowID {
462            table_id,
463            row_id: start,
464        };
465
466        let max_bound = RowID {
467            table_id,
468            row_id: i64::MAX,
469        };
470
471        self.rows
472            .range(min_bound..max_bound)
473            .next()
474            .map(|entry| *entry.key())
475    }
476
477    /// Begins a new transaction in the database.
478    ///
479    /// This function starts a new transaction in the database and returns a `TxID` value
480    /// that you can use to perform operations within the transaction. All changes made within the
481    /// transaction are isolated from other transactions until you commit the transaction.
482    pub fn begin_tx(&self) -> TxID {
483        let tx_id = self.get_tx_id();
484        let begin_ts = self.get_timestamp();
485        let tx = Transaction::new(tx_id, begin_ts);
486        tracing::trace!("begin_tx(tx_id={})", tx_id);
487        self.txs.insert(tx_id, RwLock::new(tx));
488        tx_id
489    }
490
491    /// Commits a transaction with the specified transaction ID.
492    ///
493    /// This function commits the changes made within the specified transaction and finalizes the
494    /// transaction. Once a transaction has been committed, all changes made within the transaction
495    /// are visible to other transactions that access the same data.
496    ///
497    /// # Arguments
498    ///
499    /// * `tx_id` - The ID of the transaction to commit.
500    pub fn commit_tx(&self, tx_id: TxID) -> Result<()> {
501        let end_ts = self.get_timestamp();
502        // NOTICE: the first shadowed tx keeps the entry alive in the map
503        // for the duration of this whole function, which is important for correctness!
504        let tx = self.txs.get(&tx_id).ok_or(DatabaseError::TxTerminated)?;
505        let tx = tx.value().write().unwrap();
506        match tx.state.load() {
507            TransactionState::Terminated => return Err(DatabaseError::TxTerminated),
508            _ => {
509                assert_eq!(tx.state, TransactionState::Active);
510            }
511        }
512        tx.state.store(TransactionState::Preparing);
513        tracing::trace!("prepare_tx(tx_id={})", tx_id);
514
515        /* TODO: The code we have here is sufficient for snapshot isolation.
516        ** In order to implement serializability, we need the following steps:
517        **
518        ** 1. Validate if all read versions are still visible by inspecting the read_set
519        ** 2. Validate if there are no phantoms by walking the scans from scan_set (which we don't even have yet)
520        **    - a phantom is a version that became visible in the middle of our transaction,
521        **      but wasn't taken into account during one of the scans from the scan_set
522        ** 3. Wait for commit dependencies, which we don't even track yet...
523        **    Excerpt from what's a commit dependency and how it's tracked in the original paper:
524        **    """
525                A transaction T1 has a commit dependency on another transaction
526                T2, if T1 is allowed to commit only if T2 commits. If T2 aborts,
527                T1 must also abort, so cascading aborts are possible. T1 acquires a
528                commit dependency either by speculatively reading or speculatively ignoring a version,
529                instead of waiting for T2 to commit.
530                We implement commit dependencies by a register-and-report
531                approach: T1 registers its dependency with T2 and T2 informs T1
532                when it has committed or aborted. Each transaction T contains a
533                counter, CommitDepCounter, that counts how many unresolved
534                commit dependencies it still has. A transaction cannot commit
535                until this counter is zero. In addition, T has a Boolean variable
536                AbortNow that other transactions can set to tell T to abort. Each
537                transaction T also has a set, CommitDepSet, that stores transaction IDs
538                of the transactions that depend on T.
539                To take a commit dependency on a transaction T2, T1 increments
540                its CommitDepCounter and adds its transaction ID to T2’s CommitDepSet.
541                When T2 has committed, it locates each transaction in
542                its CommitDepSet and decrements their CommitDepCounter. If
543                T2 aborted, it tells the dependent transactions to also abort by
544                setting their AbortNow flags. If a dependent transaction is not
545                found, this means that it has already aborted.
546                Note that a transaction with commit dependencies may not have to
547                wait at all - the dependencies may have been resolved before it is
548                ready to commit. Commit dependencies consolidate all waits into
549                a single wait and postpone the wait to just before commit.
550                Some transactions may have to wait before commit.
551                Waiting raises a concern of deadlocks.
552                However, deadlocks cannot occur because an older transaction never
553                waits on a younger transaction. In
554                a wait-for graph the direction of edges would always be from a
555                younger transaction (higher end timestamp) to an older transaction
556                (lower end timestamp) so cycles are impossible.
557            """
558        **  If you're wondering when a speculative read happens, here you go:
559        **  Case 1: speculative read of TB:
560            """
561                If transaction TB is in the Preparing state, it has acquired an end
562                timestamp TS which will be V’s begin timestamp if TB commits.
563                A safe approach in this situation would be to have transaction T
564                wait until transaction TB commits. However, we want to avoid all
565                blocking during normal processing so instead we continue with
566                the visibility test and, if the test returns true, allow T to
567                speculatively read V. Transaction T acquires a commit dependency on
568                TB, restricting the serialization order of the two transactions. That
569                is, T is allowed to commit only if TB commits.
570            """
571        **  Case 2: speculative ignore of TE:
572            """
573                If TE’s state is Preparing, it has an end timestamp TS that will become
574                the end timestamp of V if TE does commit. If TS is greater than the read
575                time RT, it is obvious that V will be visible if TE commits. If TE
576                aborts, V will still be visible, because any transaction that updates
577                V after TE has aborted will obtain an end timestamp greater than
578                TS. If TS is less than RT, we have a more complicated situation:
579                if TE commits, V will not be visible to T but if TE aborts, it will
580                be visible. We could handle this by forcing T to wait until TE
581                commits or aborts but we want to avoid all blocking during normal processing.
582                Instead we allow T to speculatively ignore V and
583                proceed with its processing. Transaction T acquires a commit
584                dependency (see Section 2.7) on TE, that is, T is allowed to commit
585                only if TE commits.
586            """
587        */
588        tx.state.store(TransactionState::Committed(end_ts));
589        tracing::trace!("commit_tx(tx_id={})", tx_id);
590        let write_set: Vec<RowID> = tx.write_set.iter().map(|v| *v.value()).collect();
591        drop(tx);
592        // Postprocessing: inserting row versions and logging the transaction to persistent storage.
593        // TODO: we should probably save to persistent storage first, and only then update the in-memory structures.
594        let mut log_record = LogRecord::new(end_ts);
595        for ref id in write_set {
596            if let Some(row_versions) = self.rows.get(id) {
597                let mut row_versions = row_versions.value().write().unwrap();
598                for row_version in row_versions.iter_mut() {
599                    if let TxTimestampOrID::TxID(id) = row_version.begin {
600                        if id == tx_id {
601                            // New version is valid STARTING FROM committing transaction's end timestamp
602                            // See diagram on page 299: https://www.cs.cmu.edu/~15721-f24/papers/Hekaton.pdf
603                            row_version.begin = TxTimestampOrID::Timestamp(end_ts);
604                            self.insert_version_raw(
605                                &mut log_record.row_versions,
606                                row_version.clone(),
607                            ); // FIXME: optimize cloning out
608                        }
609                    }
610                    if let Some(TxTimestampOrID::TxID(id)) = row_version.end {
611                        if id == tx_id {
612                            // Old version is valid UNTIL committing transaction's end timestamp
613                            // See diagram on page 299: https://www.cs.cmu.edu/~15721-f24/papers/Hekaton.pdf
614                            row_version.end = Some(TxTimestampOrID::Timestamp(end_ts));
615                            self.insert_version_raw(
616                                &mut log_record.row_versions,
617                                row_version.clone(),
618                            ); // FIXME: optimize cloning out
619                        }
620                    }
621                }
622            }
623        }
624        tracing::trace!("updated(tx_id={})", tx_id);
625        // We have now updated all the versions with a reference to the
626        // transaction ID to a timestamp and can, therefore, remove the
627        // transaction. Please note that when we move to lockless, the
628        // invariant doesn't necessarily hold anymore because another thread
629        // might have speculatively read a version that we want to remove.
630        // But that's a problem for another day.
631        // FIXME: it actually just become a problem for today!!!
632        // TODO: test that reproduces this failure, and then a fix
633        self.txs.remove(&tx_id);
634        if !log_record.row_versions.is_empty() {
635            self.storage.log_tx(log_record)?;
636        }
637        tracing::trace!("logged(tx_id={})", tx_id);
638        Ok(())
639    }
640
641    /// Rolls back a transaction with the specified ID.
642    ///
643    /// This function rolls back a transaction with the specified `tx_id` by
644    /// discarding any changes made by the transaction.
645    ///
646    /// # Arguments
647    ///
648    /// * `tx_id` - The ID of the transaction to abort.
649    pub fn rollback_tx(&self, tx_id: TxID) {
650        let tx_unlocked = self.txs.get(&tx_id).unwrap();
651        let tx = tx_unlocked.value().write().unwrap();
652        assert_eq!(tx.state, TransactionState::Active);
653        tx.state.store(TransactionState::Aborted);
654        tracing::trace!("abort(tx_id={})", tx_id);
655        let write_set: Vec<RowID> = tx.write_set.iter().map(|v| *v.value()).collect();
656        drop(tx);
657
658        for ref id in write_set {
659            if let Some(row_versions) = self.rows.get(id) {
660                let mut row_versions = row_versions.value().write().unwrap();
661                row_versions.retain(|rv| rv.begin != TxTimestampOrID::TxID(tx_id));
662                if row_versions.is_empty() {
663                    self.rows.remove(id);
664                }
665            }
666        }
667
668        let tx = tx_unlocked.value().read().unwrap();
669        tx.state.store(TransactionState::Terminated);
670        tracing::trace!("terminate(tx_id={})", tx_id);
671        // FIXME: verify that we can already remove the transaction here!
672        // Maybe it's fine for snapshot isolation, but too early for serializable?
673        self.txs.remove(&tx_id);
674    }
675
676    /// Generates next unique transaction id
677    pub fn get_tx_id(&self) -> u64 {
678        self.tx_ids.fetch_add(1, Ordering::SeqCst)
679    }
680
681    /// Gets current timestamp
682    pub fn get_timestamp(&self) -> u64 {
683        self.clock.get_timestamp()
684    }
685
686    /// Removes unused row  versions with very loose heuristics,
687    /// which sometimes leaves versions intact for too long.
688    /// Returns the number of removed versions.
689    pub fn drop_unused_row_versions(&self) -> usize {
690        tracing::trace!(
691            "drop_unused_row_versions() -> txs: {}; rows: {}",
692            self.txs.len(),
693            self.rows.len()
694        );
695        let mut dropped = 0;
696        let mut to_remove = Vec::new();
697        for entry in self.rows.iter() {
698            let mut row_versions = entry.value().write().unwrap();
699            row_versions.retain(|rv| {
700                // FIXME: should take rv.begin into account as well
701                let should_stay = match rv.end {
702                    Some(TxTimestampOrID::Timestamp(version_end_ts)) => {
703                        // a transaction started before this row version ended, ergo row version is needed
704                        // NOTICE: O(row_versions x transactions), but also lock-free, so sounds acceptable
705                        self.txs.iter().any(|tx| {
706                            let tx = tx.value().read().unwrap();
707                            // FIXME: verify!
708                            match tx.state.load() {
709                                TransactionState::Active | TransactionState::Preparing => {
710                                    version_end_ts > tx.begin_ts
711                                }
712                                _ => false,
713                            }
714                        })
715                    }
716                    // Let's skip potentially complex logic if the transafction is still
717                    // active/tracked. We will drop the row version when the transaction
718                    // gets garbage-collected itself, it will always happen eventually.
719                    Some(TxTimestampOrID::TxID(tx_id)) => !self.txs.contains_key(&tx_id),
720                    // this row version is current, ergo visible
721                    None => true,
722                };
723                if !should_stay {
724                    dropped += 1;
725                    tracing::trace!(
726                        "Dropping row version {:?} {:?}-{:?}",
727                        entry.key(),
728                        rv.begin,
729                        rv.end
730                    );
731                }
732                should_stay
733            });
734            if row_versions.is_empty() {
735                to_remove.push(*entry.key());
736            }
737        }
738        for id in to_remove {
739            self.rows.remove(&id);
740        }
741        dropped
742    }
743
744    pub fn recover(&self) -> Result<()> {
745        let tx_log = self.storage.read_tx_log()?;
746        for record in tx_log {
747            tracing::debug!("recover() -> tx_timestamp={}", record.tx_timestamp);
748            for version in record.row_versions {
749                self.insert_version(version.row.id, version);
750            }
751            self.clock.reset(record.tx_timestamp);
752        }
753        Ok(())
754    }
755
756    // Extracts the begin timestamp from a transaction
757    fn get_begin_timestamp(&self, ts_or_id: &TxTimestampOrID) -> u64 {
758        match ts_or_id {
759            TxTimestampOrID::Timestamp(ts) => *ts,
760            TxTimestampOrID::TxID(tx_id) => {
761                self.txs
762                    .get(tx_id)
763                    .unwrap()
764                    .value()
765                    .read()
766                    .unwrap()
767                    .begin_ts
768            }
769        }
770    }
771
772    /// Inserts a new row version into the database, while making sure that
773    /// the row version is inserted in the correct order.
774    fn insert_version(&self, id: RowID, row_version: RowVersion) {
775        let versions = self.rows.get_or_insert_with(id, || RwLock::new(Vec::new()));
776        let mut versions = versions.value().write().unwrap();
777        self.insert_version_raw(&mut versions, row_version)
778    }
779
780    /// Inserts a new row version into the internal data structure for versions,
781    /// while making sure that the row version is inserted in the correct order.
782    fn insert_version_raw(&self, versions: &mut Vec<RowVersion>, row_version: RowVersion) {
783        // NOTICE: this is an insert a'la insertion sort, with pessimistic linear complexity.
784        // However, we expect the number of versions to be nearly sorted, so we deem it worthy
785        // to search linearly for the insertion point instead of paying the price of using
786        // another data structure, e.g. a BTreeSet. If it proves to be too quadratic empirically,
787        // we can either switch to a tree-like structure, or at least use partition_point()
788        // which performs a binary search for the insertion point.
789        let position = versions
790            .iter()
791            .rposition(|v| {
792                self.get_begin_timestamp(&v.begin) < self.get_begin_timestamp(&row_version.begin)
793            })
794            .map(|p| p + 1)
795            .unwrap_or(0);
796        if versions.len() - position > 3 {
797            tracing::debug!(
798                "Inserting a row version {} positions from the end",
799                versions.len() - position
800            );
801        }
802        versions.insert(position, row_version);
803    }
804}
805
806/// A write-write conflict happens when transaction T_current attempts to update a
807/// row version that is:
808/// a) currently being updated by an active transaction T_previous, or
809/// b) was updated by an ended transaction T_previous that committed AFTER T_current started
810/// but BEFORE T_previous commits.
811///
812/// "Suppose transaction T wants to update a version V. V is updatable
813/// only if it is the latest version, that is, it has an end timestamp equal
814/// to infinity or its End field contains the ID of a transaction TE and
815/// TE’s state is Aborted"
816/// Ref: https://www.cs.cmu.edu/~15721-f24/papers/Hekaton.pdf , page 301,
817/// 2.6. Updating a Version.
818pub(crate) fn is_write_write_conflict(
819    txs: &SkipMap<TxID, RwLock<Transaction>>,
820    tx: &Transaction,
821    rv: &RowVersion,
822) -> bool {
823    match rv.end {
824        Some(TxTimestampOrID::TxID(rv_end)) => {
825            let te = txs.get(&rv_end).unwrap();
826            let te = te.value().read().unwrap();
827            if te.tx_id == tx.tx_id {
828                return false;
829            }
830            te.state.load() != TransactionState::Aborted
831        }
832        // A non-"infinity" end timestamp (here modeled by Some(ts)) functions as a write lock
833        // on the row, so it can never be updated by another transaction.
834        // Ref: https://www.cs.cmu.edu/~15721-f24/papers/Hekaton.pdf , page 301,
835        // 2.6. Updating a Version.
836        Some(TxTimestampOrID::Timestamp(_)) => true,
837        None => false,
838    }
839}
840
841impl RowVersion {
842    pub fn is_visible_to(
843        &self,
844        tx: &Transaction,
845        txs: &SkipMap<TxID, RwLock<Transaction>>,
846    ) -> bool {
847        is_begin_visible(txs, tx, self) && is_end_visible(txs, tx, self)
848    }
849}
850
851fn is_begin_visible(
852    txs: &SkipMap<TxID, RwLock<Transaction>>,
853    tx: &Transaction,
854    rv: &RowVersion,
855) -> bool {
856    match rv.begin {
857        TxTimestampOrID::Timestamp(rv_begin_ts) => tx.begin_ts >= rv_begin_ts,
858        TxTimestampOrID::TxID(rv_begin) => {
859            let tb = txs.get(&rv_begin).unwrap();
860            let tb = tb.value().read().unwrap();
861            let visible = match tb.state.load() {
862                TransactionState::Active => tx.tx_id == tb.tx_id && rv.end.is_none(),
863                TransactionState::Preparing => false, // NOTICE: makes sense for snapshot isolation, not so much for serializable!
864                TransactionState::Committed(committed_ts) => tx.begin_ts >= committed_ts,
865                TransactionState::Aborted => false,
866                TransactionState::Terminated => {
867                    tracing::debug!("TODO: should reread rv's end field - it should have updated the timestamp in the row version by now");
868                    false
869                }
870            };
871            tracing::trace!(
872                "is_begin_visible: tx={tx}, tb={tb} rv = {:?}-{:?} visible = {visible}",
873                rv.begin,
874                rv.end
875            );
876            visible
877        }
878    }
879}
880
881fn is_end_visible(
882    txs: &SkipMap<TxID, RwLock<Transaction>>,
883    tx: &Transaction,
884    rv: &RowVersion,
885) -> bool {
886    match rv.end {
887        Some(TxTimestampOrID::Timestamp(rv_end_ts)) => tx.begin_ts < rv_end_ts,
888        Some(TxTimestampOrID::TxID(rv_end)) => {
889            let te = txs.get(&rv_end).unwrap();
890            let te = te.value().read().unwrap();
891            let visible = match te.state.load() {
892                TransactionState::Active => tx.tx_id != te.tx_id,
893                TransactionState::Preparing => false, // NOTICE: makes sense for snapshot isolation, not so much for serializable!
894                TransactionState::Committed(committed_ts) => tx.begin_ts < committed_ts,
895                TransactionState::Aborted => false,
896                TransactionState::Terminated => {
897                    tracing::debug!("TODO: should reread rv's end field - it should have updated the timestamp in the row version by now");
898                    false
899                }
900            };
901            tracing::trace!(
902                "is_end_visible: tx={tx}, te={te} rv = {:?}-{:?}  visible = {visible}",
903                rv.begin,
904                rv.end
905            );
906            visible
907        }
908        None => true,
909    }
910}