tonbo 0.4.0-a1

Embedded database for serverless and edge runtimes, storing data as Parquet on S3
Documentation
//! MVCC timestamps, read visibility, and commit clock helpers.

use std::fmt;

use serde::{Deserialize, Serialize};

/// Canonical name of the column that holds MVCC commit timestamps next to the
/// Arrow payload columns.
pub(crate) const MVCC_COMMIT_COL: &str = "_commit_ts";

/// Logical commit timestamp attached to mutations and read views.
///
/// A thin newtype over a monotonically increasing `u64`; every committed
/// version of the database is identified by one such value. Typical uses:
/// - Point-in-time queries via [`DB::snapshot_at`](crate::db::DB::snapshot_at)
/// - Listing historical versions via [`DB::list_versions`](crate::db::DB::list_versions)
#[derive(Clone, Copy, Hash, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct Timestamp(u64);

impl Timestamp {
    /// Least possible timestamp (used for uninitialised clocks).
    pub const MIN: Self = Self(0);
    /// Greatest possible timestamp (used for open-ended visibility).
    pub const MAX: Self = Self(u64::MAX);

    /// Construct a timestamp from a raw `u64`.
    #[inline]
    pub const fn new(raw: u64) -> Self {
        Self(raw)
    }

    /// Returns the raw `u64` value backing this timestamp.
    #[inline]
    pub const fn get(self) -> u64 {
        self.0
    }

    /// Returns the next timestamp after `self`, saturating on overflow.
    #[inline]
    pub(crate) const fn next(self) -> Self {
        Self(self.0.saturating_add(1))
    }

    /// Add `delta` while saturating on overflow.
    #[inline]
    pub(crate) const fn saturating_add(self, delta: u64) -> Self {
        Self(self.0.saturating_add(delta))
    }

    /// Subtract `delta` while saturating at [`Timestamp::MIN`].
    #[inline]
    pub(crate) const fn saturating_sub(self, delta: u64) -> Self {
        Self(self.0.saturating_sub(delta))
    }
}

impl From<u64> for Timestamp {
    fn from(value: u64) -> Self {
        Self(value)
    }
}

impl From<Timestamp> for u64 {
    fn from(ts: Timestamp) -> Self {
        ts.0
    }
}

impl fmt::Debug for Timestamp {
    /// Renders as `Timestamp(<raw>)`, honouring the `{:#?}` pretty-print flag.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut tuple = f.debug_tuple("Timestamp");
        tuple.field(&self.0);
        tuple.finish()
    }
}

use std::sync::atomic::{AtomicU64, Ordering};

/// Monotonic allocator for MVCC commit timestamps backed by atomics.
///
/// Tracks the next commit timestamp to allocate.
#[derive(Debug)]
pub struct CommitClock {
    /// Raw value of the timestamp that the next `alloc` call will return.
    next: AtomicU64,
}

impl CommitClock {
    /// Create a clock whose first [`alloc`](Self::alloc) call returns `start`.
    #[inline]
    pub(crate) const fn new(start: Timestamp) -> Self {
        Self {
            next: AtomicU64::new(start.get()),
        }
    }

    /// Allocate and return the next commit timestamp.
    ///
    /// The counter is read and bumped by one in a single atomic step, so
    /// concurrent callers each receive a distinct value. `Relaxed` ordering is
    /// used: the counter guarantees uniqueness of the handed-out values but does
    /// not by itself publish any other memory to other threads.
    ///
    /// NOTE(review): `AtomicU64::fetch_add` wraps on overflow. If the counter
    /// ever reaches `u64::MAX` (e.g. via `advance_to_at_least(Timestamp::MAX)`),
    /// the allocation after that restarts at `0`. This differs from
    /// [`Timestamp::next`], which saturates.
    #[inline]
    pub(crate) fn alloc(&self) -> Timestamp {
        let current = self.next.fetch_add(1, Ordering::Relaxed);
        Timestamp::new(current)
    }

    /// Return the timestamp that will be handed out next.
    ///
    /// This is a relaxed snapshot. A concurrent `alloc` or `advance_to_at_least`
    /// may move the counter before the caller acts on the value.
    #[inline]
    pub(crate) fn peek(&self) -> Timestamp {
        Timestamp::new(self.next.load(Ordering::Relaxed))
    }

    /// Advance the clock so that it will hand out at least `candidate`.
    ///
    /// Never moves the clock backwards: a `candidate` at or below the current
    /// value leaves it unchanged. Useful after recovery, when the highest
    /// observed commit is already known.
    #[inline]
    pub(crate) fn advance_to_at_least(&self, candidate: Timestamp) {
        // `fetch_max` atomically stores max(current, candidate). Concurrent
        // advances therefore cannot lose a larger candidate; this replaces the
        // hand-written compare_exchange loop with the same semantics.
        self.next.fetch_max(candidate.get(), Ordering::Relaxed);
    }
}

impl Default for CommitClock {
    fn default() -> Self {
        Self::new(Timestamp::MIN)
    }
}

/// Immutable snapshot handle a reader holds while evaluating MVCC visibility.
///
/// The view carries a single read timestamp fixed at construction and is
/// cheap to copy.
#[derive(Clone, Copy, Debug)]
pub struct ReadView {
    read_ts: Timestamp,
}

impl ReadView {
    /// Pin a new view at `read_ts`.
    #[inline]
    pub(crate) const fn new(read_ts: Timestamp) -> Self {
        ReadView { read_ts }
    }

    /// The commit timestamp visible to this view (inclusive).
    #[inline]
    pub(crate) const fn read_ts(&self) -> Timestamp {
        self.read_ts
    }
}