use std::collections::{BTreeSet, VecDeque};
use parking_lot::Mutex;
use crate::infinitedb_core::address::RevisionId;
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FailedRevision {
pub revision: RevisionId,
pub error: String,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RevisionRange {
first: RevisionId,
last: RevisionId,
}
impl RevisionRange {
pub fn empty() -> Self {
Self {
first: RevisionId::ZERO,
last: RevisionId::ZERO,
}
}
pub fn single(rev: RevisionId) -> Self {
Self {
first: rev,
last: rev,
}
}
pub fn new(first: RevisionId, last: RevisionId) -> Self {
Self { first, last }
}
pub fn first(self) -> RevisionId {
self.first
}
pub fn last(self) -> RevisionId {
self.last
}
pub fn nth(self, index: u64) -> RevisionId {
RevisionId::legacy(self.first.legacy_sequence() + index)
}
}
#[derive(Debug)]
struct WatermarkState {
allocated: RevisionId,
outstanding: BTreeSet<RevisionId>,
failed: VecDeque<FailedRevision>,
}
fn compute_stable(state: &WatermarkState) -> RevisionId {
if state.outstanding.is_empty() {
state.allocated
} else {
state
.outstanding
.first()
.copied()
.unwrap_or(RevisionId::ZERO)
.predecessor()
}
}
pub struct RevisionWatermark {
state: Mutex<WatermarkState>,
}
const MAX_FAILED_RECORDS: usize = 64;
impl RevisionWatermark {
pub fn new(initial: u64) -> Self {
Self {
state: Mutex::new(WatermarkState {
allocated: RevisionId::legacy(initial),
outstanding: BTreeSet::new(),
failed: VecDeque::new(),
}),
}
}
pub fn allocate(&self) -> RevisionId {
let mut state = self.state.lock();
let rev = state.allocated.next_global();
state.allocated = rev;
state.outstanding.insert(rev);
rev
}
pub fn allocate_n(&self, count: u64) -> RevisionRange {
debug_assert!(count > 0, "allocate_n requires count > 0");
let mut state = self.state.lock();
let first = state.allocated.next_global();
let last = RevisionId::legacy(first.legacy_sequence() + count - 1);
state.allocated = last;
let mut seq = first.legacy_sequence();
while seq <= last.legacy_sequence() {
state.outstanding.insert(RevisionId::legacy(seq));
seq += 1;
}
RevisionRange { first, last }
}
pub fn allocated(&self) -> RevisionId {
self.state.lock().allocated
}
pub fn set_revision(&self, value: u64) {
self.state.lock().allocated = RevisionId::legacy(value);
}
pub fn seed_allocated(&self, rev: RevisionId) {
let mut state = self.state.lock();
if rev > state.allocated {
state.allocated = rev;
}
}
pub fn register_outstanding(&self, rev: RevisionId) {
let mut state = self.state.lock();
state.outstanding.insert(rev);
if rev > state.allocated {
state.allocated = rev;
}
}
pub fn retire(&self, rev: RevisionId) {
self.state.lock().outstanding.remove(&rev);
}
pub fn retire_failed(&self, rev: RevisionId, error: impl Into<String>) {
let mut state = self.state.lock();
state.outstanding.remove(&rev);
if state.failed.len() >= MAX_FAILED_RECORDS {
state.failed.pop_front();
}
state.failed.push_back(FailedRevision {
revision: rev,
error: error.into(),
});
}
pub fn failed_revisions(&self) -> Vec<FailedRevision> {
self.state.lock().failed.iter().cloned().collect()
}
pub fn take_failed(&self) -> Vec<FailedRevision> {
self.state.lock().failed.drain(..).collect()
}
pub fn stable_revision(&self) -> RevisionId {
compute_stable(&self.state.lock())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn stable_with_non_dense_outstanding_ids() {
let wm = RevisionWatermark::new(0);
wm.allocate(); wm.allocate(); wm.allocate(); wm.retire(RevisionId::legacy(1));
wm.retire(RevisionId::legacy(3));
assert_eq!(wm.stable_revision(), RevisionId::legacy(1));
wm.retire(RevisionId::legacy(2));
assert_eq!(wm.stable_revision(), RevisionId::legacy(3));
}
#[test]
fn allocate_registers_before_stable_observes() {
let wm = RevisionWatermark::new(0);
let rev = wm.allocate();
assert!(
wm.stable_revision() < rev,
"stable must lag behind an unretired allocation"
);
wm.retire(rev);
assert_eq!(wm.stable_revision(), rev);
}
#[test]
fn compute_stable_cases() {
let empty = WatermarkState {
allocated: RevisionId::legacy(5),
outstanding: BTreeSet::new(),
failed: VecDeque::new(),
};
assert_eq!(compute_stable(&empty), RevisionId::legacy(5));
let with_gap = WatermarkState {
allocated: RevisionId::legacy(3),
outstanding: BTreeSet::from([RevisionId::legacy(2)]),
failed: VecDeque::new(),
};
assert_eq!(compute_stable(&with_gap), RevisionId::legacy(1));
let dense = WatermarkState {
allocated: RevisionId::legacy(3),
outstanding: BTreeSet::from([
RevisionId::legacy(1),
RevisionId::legacy(2),
RevisionId::legacy(3),
]),
failed: VecDeque::new(),
};
assert_eq!(compute_stable(&dense), RevisionId::ZERO);
}
#[test]
fn revision_range_nth() {
let range = RevisionRange {
first: RevisionId::legacy(10),
last: RevisionId::legacy(12),
};
assert_eq!(range.nth(0), RevisionId::legacy(10));
assert_eq!(range.nth(2), RevisionId::legacy(12));
}
#[test]
fn allocation_registration_atomic_under_contention() {
use std::sync::Arc;
use std::thread;
let wm = Arc::new(RevisionWatermark::new(0));
let wm_writer = Arc::clone(&wm);
let writer = thread::spawn(move || {
for _ in 0..500 {
let rev = wm_writer.allocate();
assert!(
wm_writer.stable_revision() < rev,
"stable must lag behind an unretired allocation"
);
thread::yield_now();
wm_writer.retire(rev);
}
});
for _ in 0..10_000 {
let stable = wm.stable_revision();
let allocated = wm.allocated();
assert!(
stable <= allocated,
"stable {stable:?} must not exceed allocated {allocated:?}"
);
thread::yield_now();
}
writer.join().unwrap();
}
}