use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::sync::Arc;
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum LockKey {
Surrogate {
collection: Arc<str>,
surrogate: u32,
},
Kv {
collection: Arc<str>,
key: Arc<[u8]>,
},
Edge {
collection: Arc<str>,
src: u32,
dst: u32,
},
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct TxnId {
pub epoch: u64,
pub position: u32,
}
impl TxnId {
pub fn new(epoch: u64, position: u32) -> Self {
Self { epoch, position }
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AcquireOutcome {
Ready,
Blocked,
}
struct LockEntry {
holder: TxnId,
waiters: VecDeque<TxnId>,
}
pub struct LockManager {
table: BTreeMap<LockKey, LockEntry>,
held_locks: BTreeMap<TxnId, BTreeSet<LockKey>>,
pending_keys: BTreeMap<TxnId, BTreeSet<LockKey>>,
}
impl LockManager {
pub fn new() -> Self {
Self {
table: BTreeMap::new(),
held_locks: BTreeMap::new(),
pending_keys: BTreeMap::new(),
}
}
pub fn acquire(&mut self, txn: TxnId, keys: BTreeSet<LockKey>) -> AcquireOutcome {
let all_available = keys.iter().all(|k| match self.table.get(k) {
None => true,
Some(entry) => entry.holder == txn,
});
if all_available {
for key in &keys {
if !self.table.contains_key(key) {
self.table.insert(
key.clone(),
LockEntry {
holder: txn,
waiters: VecDeque::new(),
},
);
}
}
self.pending_keys.remove(&txn);
self.held_locks.insert(txn, keys);
AcquireOutcome::Ready
} else {
for key in &keys {
if let Some(entry) = self.table.get_mut(key)
&& entry.holder != txn
&& !entry.waiters.contains(&txn)
{
entry.waiters.push_back(txn);
}
}
self.pending_keys.insert(txn, keys);
AcquireOutcome::Blocked
}
}
pub fn release(&mut self, txn: TxnId) -> Vec<TxnId> {
let held = match self.held_locks.remove(&txn) {
Some(h) => h,
None => return Vec::new(),
};
let mut newly_promoted: BTreeSet<TxnId> = BTreeSet::new();
for key in &held {
if let Some(entry) = self.table.get_mut(key)
&& entry.holder == txn
{
if let Some(next) = entry.waiters.pop_front() {
entry.holder = next;
if let Some(pending) = self.pending_keys.get(&next) {
let all_held = pending
.iter()
.all(|k| self.table.get(k).is_none_or(|e| e.holder == next));
if all_held {
let keys = self.pending_keys.remove(&next).expect("invariant: pending_keys.get(&next) succeeded in the enclosing if-let");
self.held_locks.insert(next, keys);
newly_promoted.insert(next);
}
}
} else {
self.table.remove(key);
}
}
}
newly_promoted.into_iter().collect()
}
pub fn is_ready(&self, txn: TxnId, keys: &BTreeSet<LockKey>) -> bool {
keys.iter().all(|key| {
match self.table.get(key) {
None => true, Some(entry) => entry.holder == txn, }
})
}
#[cfg(test)]
pub fn lock_count(&self) -> usize {
self.table.len()
}
#[cfg(test)]
pub fn holder_count(&self) -> usize {
self.held_locks.len()
}
}
impl Default for LockManager {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
fn key(name: &str) -> LockKey {
LockKey::Surrogate {
collection: Arc::from(name),
surrogate: 1,
}
}
fn keyset(names: &[&str]) -> BTreeSet<LockKey> {
names.iter().map(|n| key(n)).collect()
}
fn txn(epoch: u64, pos: u32) -> TxnId {
TxnId::new(epoch, pos)
}
#[test]
fn acquire_free_keys_returns_ready() {
let mut lm = LockManager::new();
let t = txn(1, 0);
let outcome = lm.acquire(t, keyset(&["a", "b"]));
assert_eq!(outcome, AcquireOutcome::Ready);
assert_eq!(lm.lock_count(), 2);
}
#[test]
fn acquire_held_key_returns_blocked_and_enqueues_waiter() {
let mut lm = LockManager::new();
let t1 = txn(1, 0);
let t2 = txn(1, 1);
lm.acquire(t1, keyset(&["x"]));
let outcome = lm.acquire(t2, keyset(&["x"]));
assert_eq!(outcome, AcquireOutcome::Blocked);
assert!(lm.table.get(&key("x")).unwrap().waiters.contains(&t2));
}
#[test]
fn release_returns_unblocked_waiter_ids() {
let mut lm = LockManager::new();
let t1 = txn(1, 0);
let t2 = txn(1, 1);
lm.acquire(t1, keyset(&["x"]));
lm.acquire(t2, keyset(&["x"]));
let unblocked = lm.release(t1);
assert!(unblocked.contains(&t2));
}
#[test]
fn release_preserves_fifo_waiter_order() {
let mut lm = LockManager::new();
let t1 = txn(1, 0);
let t2 = txn(1, 1);
let t3 = txn(1, 2);
lm.acquire(t1, keyset(&["x"]));
lm.acquire(t2, keyset(&["x"]));
lm.acquire(t3, keyset(&["x"]));
lm.release(t1);
let holder = lm.table.get(&key("x")).unwrap().holder;
assert_eq!(holder, t2);
lm.release(t2);
let holder = lm.table.get(&key("x")).unwrap().holder;
assert_eq!(holder, t3);
}
#[test]
fn multi_key_txn_releases_all_atomically() {
let mut lm = LockManager::new();
let t1 = txn(1, 0);
lm.acquire(t1, keyset(&["a", "b", "c"]));
assert_eq!(lm.lock_count(), 3);
lm.release(t1);
assert_eq!(lm.lock_count(), 0);
assert_eq!(lm.holder_count(), 0);
}
#[test]
fn is_ready_returns_true_when_all_keys_free_or_self_at_front() {
let mut lm = LockManager::new();
let t1 = txn(1, 0);
let t2 = txn(1, 1);
lm.acquire(t1, keyset(&["x", "y"]));
lm.acquire(t2, keyset(&["x", "y"]));
assert!(!lm.is_ready(t2, &keyset(&["x", "y"])));
lm.release(t1);
assert!(lm.is_ready(t2, &keyset(&["x", "y"])));
}
}