infinite-db 0.4.0

A spatial-graph database using n-dimensional curves and hyperedges for engineering logic.
//! Stable revision watermark for repeatable reads.

use std::collections::{BTreeSet, VecDeque};

use parking_lot::Mutex;

use crate::infinitedb_core::address::RevisionId;

/// A revision whose write could not be made durable and was therefore abandoned.
///
/// Entries are produced by [`RevisionWatermark::retire_failed`].
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct FailedRevision {
    /// The revision id that will never be applied.
    pub revision: RevisionId,
    /// Description of why the write failed, intended for human readers.
    pub error: String,
}

/// Contiguous revision ids allocated in one [`RevisionWatermark::allocate_n`] call.
///
/// Both bounds are inclusive: a range built with [`Self::single`] holds exactly
/// one revision.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RevisionRange {
    first: RevisionId,
    last: RevisionId,
}

impl RevisionRange {
    /// Placeholder range with both bounds set to [`RevisionId::ZERO`].
    ///
    /// Because the bounds are inclusive, this value still names
    /// `RevisionId::ZERO` itself: it is a sentinel, not a zero-length range.
    pub fn empty() -> Self {
        Self::single(RevisionId::ZERO)
    }

    /// Range containing only `rev`.
    pub fn single(rev: RevisionId) -> Self {
        Self {
            first: rev,
            last: rev,
        }
    }

    /// Inclusive range from `first` through `last`.
    ///
    /// The bounds are stored as given; no ordering check is performed.
    pub fn new(first: RevisionId, last: RevisionId) -> Self {
        Self { first, last }
    }

    /// First revision in the range (inclusive).
    pub fn first(self) -> RevisionId {
        self.first
    }

    /// Last revision in the range (inclusive).
    pub fn last(self) -> RevisionId {
        self.last
    }

    /// Return the revision at `index` (0 = first) within the global session-0 stream.
    ///
    /// The result is computed on the legacy sequence as `first + index`.
    ///
    /// # Panics
    ///
    /// In debug builds, panics if the resulting revision lies past
    /// [`Self::last`] or if the sequence arithmetic overflows.
    pub fn nth(self, index: u64) -> RevisionId {
        let seq = self.first.legacy_sequence() + index;
        debug_assert!(
            seq <= self.last.legacy_sequence(),
            "RevisionRange::nth: index {index} is past the end of the range"
        );
        RevisionId::legacy(seq)
    }
}

/// Internal watermark state mutated under a single lock.
#[derive(Debug)]
struct WatermarkState {
    /// Allocation high-water mark. Advanced by allocation, registration and
    /// `seed_allocated`; overwritten (possibly lowered) by `set_revision`.
    allocated: RevisionId,
    /// Revisions allocated or registered but not yet retired (applied or failed).
    outstanding: BTreeSet<RevisionId>,
    /// Bounded log of abandoned revisions, oldest at the front; capped at
    /// `MAX_FAILED_RECORDS` entries.
    failed: VecDeque<FailedRevision>,
}

/// Pure interpretation of [`WatermarkState`] for the repeatable-read ceiling.
fn compute_stable(state: &WatermarkState) -> RevisionId {
    if state.outstanding.is_empty() {
        state.allocated
    } else {
        state
            .outstanding
            .first()
            .copied()
            .unwrap_or(RevisionId::ZERO)
            .predecessor()
    }
}

/// Tracks in-flight revisions and the highest durably applied revision.
///
/// `stable_revision` is the highest revision R such that every write with
/// revision ≤ R has been applied and published, or reported as failed.
/// [`Self::allocated`] may be higher while writes are still queued per shard.
///
/// Allocation and registration are a single atomic operation under one lock: a
/// revision is never visible to `stable_revision` as allocated-but-not-outstanding.
///
/// `Debug` is derived through the inner state (`WatermarkState: Debug`) and the
/// `Debug` impl of the parking_lot mutex wrapping it.
#[derive(Debug)]
pub struct RevisionWatermark {
    state: Mutex<WatermarkState>,
}

/// Maximum number of [`FailedRevision`] records retained by the failure log;
/// once full, the oldest record is evicted to make room for a new one.
const MAX_FAILED_RECORDS: usize = 64;

impl RevisionWatermark {
    /// Create a watermark with the allocation counter seeded to `initial`.
    ///
    /// Nothing is outstanding and the failure log is empty, so
    /// [`Self::stable_revision`] initially equals `RevisionId::legacy(initial)`.
    pub fn new(initial: u64) -> Self {
        Self {
            state: Mutex::new(WatermarkState {
                allocated: RevisionId::legacy(initial),
                outstanding: BTreeSet::new(),
                failed: VecDeque::new(),
            }),
        }
    }

    /// Allocate the next revision and register it as outstanding.
    ///
    /// The returned revision holds back [`Self::stable_revision`] until it is
    /// passed to [`Self::retire`] or [`Self::retire_failed`]. A revision that is
    /// never retired pins the stable ceiling below it indefinitely.
    pub fn allocate(&self) -> RevisionId {
        let mut state = self.state.lock();
        let rev = state.allocated.next_global();
        state.allocated = rev;
        state.outstanding.insert(rev);
        rev
    }

    /// Allocate a contiguous run of `count` revisions in one lock acquisition
    /// and register every one of them as outstanding.
    ///
    /// `count` must be non-zero; this is checked only in debug builds. In
    /// release builds a zero `count` registers nothing and returns a range that
    /// ends before it starts.
    pub fn allocate_n(&self, count: u64) -> RevisionRange {
        debug_assert!(count > 0, "allocate_n requires count > 0");
        let mut state = self.state.lock();
        let first = state.allocated.next_global();
        let first_seq = first.legacy_sequence();
        let last_seq = first_seq + count - 1;
        let last = RevisionId::legacy(last_seq);
        state.allocated = last;
        // An inclusive range iterator terminates correctly even when `last_seq`
        // is `u64::MAX`, where a manual `seq += 1` loop would overflow.
        state
            .outstanding
            .extend((first_seq..=last_seq).map(RevisionId::legacy));
        RevisionRange { first, last }
    }

    /// Highest allocated revision id (may not yet be durable).
    pub fn allocated(&self) -> RevisionId {
        self.state.lock().allocated
    }

    /// Seed the allocation counter (database open / recovery).
    ///
    /// Unlike [`Self::seed_allocated`], this overwrites the counter even when
    /// `value` is lower, and it does not inspect outstanding revisions. If
    /// revisions above `value` are still outstanding, [`Self::stable_revision`]
    /// can afterwards exceed [`Self::allocated`].
    pub fn set_revision(&self, value: u64) {
        self.state.lock().allocated = RevisionId::legacy(value);
    }

    /// Raise the allocation high-water without registering outstanding work.
    ///
    /// No effect if `rev` is not greater than the current high-water mark.
    /// With nothing outstanding, this immediately raises
    /// [`Self::stable_revision`] to `rev`.
    pub fn seed_allocated(&self, rev: RevisionId) {
        let mut state = self.state.lock();
        if rev > state.allocated {
            state.allocated = rev;
        }
    }

    /// Register an externally stamped revision as outstanding (HLC sessions).
    ///
    /// Also raises the allocation high-water to `rev` when `rev` is higher.
    /// Registering a revision at or below the current stable ceiling pulls
    /// [`Self::stable_revision`] back below `rev` until it is retired.
    pub fn register_outstanding(&self, rev: RevisionId) {
        let mut state = self.state.lock();
        state.outstanding.insert(rev);
        if rev > state.allocated {
            state.allocated = rev;
        }
    }

    /// Retire a revision after durable apply and live-tail publish.
    ///
    /// Retiring a revision that is not outstanding is a no-op.
    pub fn retire(&self, rev: RevisionId) {
        self.state.lock().outstanding.remove(&rev);
    }

    /// Abandon a revision that can no longer succeed (I/O failure).
    ///
    /// The revision stops holding back [`Self::stable_revision`]. A
    /// [`FailedRevision`] record is appended to a log that keeps only the
    /// most recent `MAX_FAILED_RECORDS` entries, evicting the oldest first.
    pub fn retire_failed(&self, rev: RevisionId, error: impl Into<String>) {
        // Build the record (which may allocate) before taking the lock so the
        // critical section stays minimal.
        let record = FailedRevision {
            revision: rev,
            error: error.into(),
        };
        let mut state = self.state.lock();
        state.outstanding.remove(&rev);
        if state.failed.len() >= MAX_FAILED_RECORDS {
            state.failed.pop_front();
        }
        state.failed.push_back(record);
    }

    /// Snapshot the failure log, oldest first, without clearing it.
    ///
    /// Only the most recent `MAX_FAILED_RECORDS` failures are retained.
    pub fn failed_revisions(&self) -> Vec<FailedRevision> {
        self.state.lock().failed.iter().cloned().collect()
    }

    /// Remove and return every record in the failure log, oldest first.
    ///
    /// Only the most recent `MAX_FAILED_RECORDS` failures are retained.
    pub fn take_failed(&self) -> Vec<FailedRevision> {
        self.state.lock().failed.drain(..).collect()
    }

    /// Highest revision guaranteed visible to readers (repeatable-read ceiling).
    ///
    /// No revision ≤ the returned value is still outstanding. Each such revision
    /// has been durably applied and retired via [`Self::retire`], or abandoned via
    /// [`Self::retire_failed`]. Stable never waits on a revision that can no
    /// longer succeed.
    ///
    /// The value is not guaranteed to be monotonic: see
    /// [`Self::register_outstanding`] and [`Self::set_revision`].
    pub fn stable_revision(&self) -> RevisionId {
        compute_stable(&self.state.lock())
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn stable_with_non_dense_outstanding_ids() {
        let wm = RevisionWatermark::new(0);
        wm.allocate(); // 1
        wm.allocate(); // 2
        wm.allocate(); // 3
        wm.retire(RevisionId::legacy(1));
        wm.retire(RevisionId::legacy(3));
        // Outstanding: {2}. Stable should be 1 (predecessor of 2), not 0.
        assert_eq!(wm.stable_revision(), RevisionId::legacy(1));
        wm.retire(RevisionId::legacy(2));
        assert_eq!(wm.stable_revision(), RevisionId::legacy(3));
    }

    #[test]
    fn allocate_registers_before_stable_observes() {
        let wm = RevisionWatermark::new(0);
        let rev = wm.allocate();
        assert!(
            wm.stable_revision() < rev,
            "stable must lag behind an unretired allocation"
        );
        wm.retire(rev);
        assert_eq!(wm.stable_revision(), rev);
    }

    #[test]
    fn compute_stable_cases() {
        let empty = WatermarkState {
            allocated: RevisionId::legacy(5),
            outstanding: BTreeSet::new(),
            failed: VecDeque::new(),
        };
        assert_eq!(compute_stable(&empty), RevisionId::legacy(5));

        let with_gap = WatermarkState {
            allocated: RevisionId::legacy(3),
            outstanding: BTreeSet::from([RevisionId::legacy(2)]),
            failed: VecDeque::new(),
        };
        assert_eq!(compute_stable(&with_gap), RevisionId::legacy(1));

        let dense = WatermarkState {
            allocated: RevisionId::legacy(3),
            outstanding: BTreeSet::from([
                RevisionId::legacy(1),
                RevisionId::legacy(2),
                RevisionId::legacy(3),
            ]),
            failed: VecDeque::new(),
        };
        assert_eq!(compute_stable(&dense), RevisionId::ZERO);
    }

    #[test]
    fn revision_range_nth() {
        let range = RevisionRange {
            first: RevisionId::legacy(10),
            last: RevisionId::legacy(12),
        };
        assert_eq!(range.nth(0), RevisionId::legacy(10));
        assert_eq!(range.nth(2), RevisionId::legacy(12));
    }

    #[test]
    fn allocate_n_registers_whole_range() {
        let wm = RevisionWatermark::new(0);
        let range = wm.allocate_n(3);
        assert_eq!(range.first(), RevisionId::legacy(1));
        assert_eq!(range.last(), RevisionId::legacy(3));
        assert_eq!(range.nth(1), RevisionId::legacy(2));
        assert_eq!(wm.allocated(), RevisionId::legacy(3));

        // Every revision in the range must hold back stable until retired.
        wm.retire(RevisionId::legacy(1));
        assert_eq!(wm.stable_revision(), RevisionId::legacy(1));
        wm.retire(RevisionId::legacy(3));
        assert_eq!(wm.stable_revision(), RevisionId::legacy(1));
        wm.retire(RevisionId::legacy(2));
        assert_eq!(wm.stable_revision(), RevisionId::legacy(3));
    }

    #[test]
    fn retire_failed_unblocks_stable_and_records_error() {
        let wm = RevisionWatermark::new(0);
        let a = wm.allocate();
        let b = wm.allocate();
        wm.retire_failed(a, "disk full");
        assert_eq!(wm.stable_revision(), a);
        wm.retire(b);
        assert_eq!(wm.stable_revision(), b);

        let expected = vec![FailedRevision {
            revision: a,
            error: String::from("disk full"),
        }];
        // Snapshot does not clear; take drains.
        assert_eq!(wm.failed_revisions(), expected);
        assert_eq!(wm.failed_revisions(), expected);
        assert_eq!(wm.take_failed(), expected);
        assert!(wm.failed_revisions().is_empty());
    }

    #[test]
    fn failed_log_is_bounded_and_keeps_most_recent() {
        let wm = RevisionWatermark::new(0);
        let total = MAX_FAILED_RECORDS as u64 + 5;
        for _ in 0..total {
            let rev = wm.allocate();
            wm.retire_failed(rev, "io error");
        }
        let failed = wm.take_failed();
        assert_eq!(failed.len(), MAX_FAILED_RECORDS);
        // Revisions 1..=5 were evicted; 6..=total remain, oldest first.
        assert_eq!(failed[0].revision, RevisionId::legacy(6));
        assert_eq!(
            failed[MAX_FAILED_RECORDS - 1].revision,
            RevisionId::legacy(total)
        );
        assert_eq!(wm.stable_revision(), RevisionId::legacy(total));
    }

    #[test]
    fn allocation_registration_atomic_under_contention() {
        use std::sync::Arc;
        use std::thread;

        let wm = Arc::new(RevisionWatermark::new(0));
        let wm_writer = Arc::clone(&wm);
        let writer = thread::spawn(move || {
            for _ in 0..500 {
                let rev = wm_writer.allocate();
                assert!(
                    wm_writer.stable_revision() < rev,
                    "stable must lag behind an unretired allocation"
                );
                thread::yield_now();
                wm_writer.retire(rev);
            }
        });

        for _ in 0..10_000 {
            let stable = wm.stable_revision();
            let allocated = wm.allocated();
            assert!(
                stable <= allocated,
                "stable {stable:?} must not exceed allocated {allocated:?}"
            );
            thread::yield_now();
        }
        writer.join().unwrap();
    }
}