Skip to main content

lora_store/
lock_table.rs

1//! Per-record write locks for fine-grained concurrent mutations.
2//!
3//! [`LockTable`] is the sharded per-record `Mutex<()>` registry the
4//! auto-commit / OCC path consults at commit time. Writers translate
5//! their buffered mutation stream into a [`MutationWriteSet`] (defined
6//! in [`crate::mutation`]) and call [`WriteSetLocks::acquire`] to grab
7//! every per-record lock in sorted order before publishing.
8//!
9//! Concurrent writers on the same record serialize on the entry's
10//! `Mutex`; concurrent writers on disjoint record sets proceed
11//! independently.
12//!
13//! # Sharding
14//!
15//! The lock table is sharded into [`LOCK_TABLE_SHARDS`] independent
16//! buckets. Each shard owns a `HashMap<u64, Arc<Mutex<()>>>` behind
17//! its own `Mutex`. Acquiring a per-record lock looks up the
18//! `Arc<Mutex<()>>` in the relevant shard (taking only that shard's
19//! Mutex briefly, *not* a global table lock) and then locks it.
20//!
21//! Power-of-two shard count means the shard index is a single AND on
22//! the id, no division. 256 shards is generous for any realistic
23//! thread count and keeps the per-shard contention well below the
24//! per-record lock contention you'd expect for non-pathological
25//! workloads.
26//!
27//! # Lifetime
28//!
29//! Lock entries are *not* GC'd from the table after their owning tx
30//! drops them. Each entry is `Arc<Mutex<()>>` — small, and the lock
31//! gets reused if the same id is locked again. A future GC pass could
32//! sweep entries with refcount 1 if memory pressure warrants it.
33
34use std::collections::HashMap;
35use std::sync::{Arc, Mutex, MutexGuard};
36
37use crate::{MutationWriteSet, NodeId, RelationshipId};
38
/// Number of shards in each of the lock table's shard arrays.
///
/// Must be a power of two: the shard index is computed as
/// `id & SHARD_MASK`, which only reaches every shard when the count is
/// a power of two. Chosen large enough that per-shard contention stays
/// well below per-record contention for realistic thread counts, while
/// keeping the fixed cost of the empty shard arrays modest.
pub const LOCK_TABLE_SHARDS: usize = 256;

// Compile-time guard: with a non-power-of-two count the mask below
// would silently leave some shards unused instead of failing loudly.
const _: () = assert!(
    LOCK_TABLE_SHARDS.is_power_of_two(),
    "LOCK_TABLE_SHARDS must be a power of two"
);

/// Bit mask that maps a record id to its shard index;
/// `id & SHARD_MASK` is always `< LOCK_TABLE_SHARDS`.
const SHARD_MASK: u64 = (LOCK_TABLE_SHARDS as u64) - 1;
46
47/// Sharded per-record write-lock registry. Cloning is cheap (one
48/// `Arc` per shard map) so the table can live behind an `Arc` shared
49/// across all writers.
50pub struct LockTable {
51    nodes: [Mutex<HashMap<NodeId, Arc<Mutex<()>>>>; LOCK_TABLE_SHARDS],
52    rels: [Mutex<HashMap<RelationshipId, Arc<Mutex<()>>>>; LOCK_TABLE_SHARDS],
53}
54
55impl Default for LockTable {
56    fn default() -> Self {
57        Self::new()
58    }
59}
60
61impl LockTable {
62    pub fn new() -> Self {
63        Self {
64            nodes: std::array::from_fn(|_| Mutex::new(HashMap::new())),
65            rels: std::array::from_fn(|_| Mutex::new(HashMap::new())),
66        }
67    }
68
69    /// Look up (or insert) the per-node `Arc<Mutex<()>>`. Caller is
70    /// expected to `lock()` the returned `Arc` and hold it across the
71    /// mutation. Sharded so two threads locking different node ids in
72    /// the same id-mod-shards bucket only contend for the brief
73    /// shard-table acquisition, not on each other's record lock.
74    pub fn node_lock_arc(&self, id: NodeId) -> Arc<Mutex<()>> {
75        let shard = (id & SHARD_MASK) as usize;
76        let mut map = self.nodes[shard].lock().unwrap_or_else(|p| p.into_inner());
77        map.entry(id)
78            .or_insert_with(|| Arc::new(Mutex::new(())))
79            .clone()
80    }
81
82    /// Same as [`Self::node_lock_arc`] but for relationships.
83    pub fn rel_lock_arc(&self, id: RelationshipId) -> Arc<Mutex<()>> {
84        let shard = (id & SHARD_MASK) as usize;
85        let mut map = self.rels[shard].lock().unwrap_or_else(|p| p.into_inner());
86        map.entry(id)
87            .or_insert_with(|| Arc::new(Mutex::new(())))
88            .clone()
89    }
90}
91
92/// Bundle of per-record locks held by a single writer's commit
93/// critical section. Holding the locks here (rather than scattered
94/// `MutexGuard`s) makes drop order deterministic — all locks release
95/// when this struct drops, regardless of which release path
96/// (commit / abort / panic) the writer took.
97///
98/// Lock acquisition is **always sorted by id** so any two transactions
99/// touching the same record set acquire the locks in the same order.
100/// That's the standard prevention strategy for the simplest deadlock
101/// scenario — two writers each holding one of the other's locks.
102pub struct WriteSetLocks {
103    /// Boxed because `MutexGuard` is `!Unpin` and we want to keep the
104    /// guards live by value. The `Arc<Mutex<()>>` originates from the
105    /// `LockTable`; we keep both the Arc *and* the guard so the
106    /// `'static` guard is anchored by the Arc that owns the lock.
107    _guards: Vec<OwnedMutexGuard>,
108}
109
/// A held lock on a `Mutex<()>` bundled with the `Arc` that owns that
/// mutex.
///
/// A plain `MutexGuard<'_>` borrows from its mutex and therefore
/// cannot be stored next to the `Arc` it came from. This wrapper
/// erases the borrow's lifetime and carries the `Arc` along, so the
/// mutex is guaranteed to outlive the guard. The guard is always
/// released before the `Arc`: by field declaration order, and
/// explicitly in the `Drop` impl.
struct OwnedMutexGuard {
    /// The held guard. Declared before `_arc` so it drops first, and
    /// wrapped in `Option` so `Drop` can release it explicitly.
    guard: Option<MutexGuard<'static, ()>>,
    /// Keeps the mutex alive for as long as `guard` exists.
    _arc: Arc<Mutex<()>>,
}

impl OwnedMutexGuard {
    /// Block until the mutex behind `arc` is acquired, then return the
    /// guard together with `arc`. A poisoned mutex is recovered via
    /// `into_inner` rather than treated as an error.
    fn lock(arc: Arc<Mutex<()>>) -> Self {
        let borrowed: MutexGuard<'_, ()> =
            arc.lock().unwrap_or_else(|poisoned| poisoned.into_inner());
        // SAFETY: this only widens the guard's lifetime to `'static`.
        // The guard points at the `Mutex<()>` owned by `arc`, and `arc`
        // is stored in the same struct, so the mutex stays alive for
        // the guard's whole life. `Drop` for `OwnedMutexGuard` releases
        // the guard before `_arc` is dropped.
        let guard = unsafe {
            std::mem::transmute::<MutexGuard<'_, ()>, MutexGuard<'static, ()>>(borrowed)
        };
        Self {
            guard: Some(guard),
            _arc: arc,
        }
    }
}

impl Drop for OwnedMutexGuard {
    fn drop(&mut self) {
        // Unlock first; `_arc` is dropped afterwards with the remaining
        // fields.
        drop(self.guard.take());
    }
}
147
148impl WriteSetLocks {
149    /// Acquire write locks for every record in the write set, sorted
150    /// by id. Returns once all locks are held; the caller's commit
151    /// runs without contention on these records until `WriteSetLocks`
152    /// drops.
153    pub fn acquire(table: &LockTable, write_set: &MutationWriteSet) -> Self {
154        // Sort to prevent deadlock from inconsistent acquisition order
155        // across concurrent writers touching overlapping record sets.
156        let mut node_ids: Vec<NodeId> = write_set.nodes.iter().copied().collect();
157        node_ids.sort_unstable();
158        let mut rel_ids: Vec<RelationshipId> = write_set.rels.iter().copied().collect();
159        rel_ids.sort_unstable();
160
161        // Acquire nodes first, then rels. The two namespaces never
162        // alias because a NodeId and a RelationshipId can't share a
163        // lock entry (separate shard arrays in the LockTable).
164        let mut guards = Vec::with_capacity(node_ids.len() + rel_ids.len());
165        for id in node_ids {
166            guards.push(OwnedMutexGuard::lock(table.node_lock_arc(id)));
167        }
168        for id in rel_ids {
169            guards.push(OwnedMutexGuard::lock(table.rel_lock_arc(id)));
170        }
171        Self { _guards: guards }
172    }
173}
174
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
    use std::sync::Arc;
    use std::thread;
    use std::time::Duration;

    /// Repeated lookups of one node id must hand back the same mutex.
    #[test]
    fn lock_table_returns_same_arc_for_same_id() {
        let table = LockTable::new();
        let a = table.node_lock_arc(42);
        let b = table.node_lock_arc(42);
        assert!(Arc::ptr_eq(&a, &b));
    }

    /// Different node ids must not share a mutex.
    #[test]
    fn lock_table_distinct_ids_get_distinct_locks() {
        let table = LockTable::new();
        let a = table.node_lock_arc(1);
        let b = table.node_lock_arc(2);
        assert!(!Arc::ptr_eq(&a, &b));
    }

    /// A node and a relationship with the same numeric id are
    /// independent records and must get independent locks.
    #[test]
    fn node_and_rel_namespaces_are_separate() {
        let table = LockTable::new();
        let n = table.node_lock_arc(7);
        let r = table.rel_lock_arc(7);
        assert!(!Arc::ptr_eq(&n, &r));
    }

    /// `acquire` must lock exactly the requested entries, in the right
    /// namespace, and dropping the bundle must release all of them.
    #[test]
    fn write_set_locks_hold_until_dropped() {
        let table = LockTable::new();
        let mut ws = MutationWriteSet::new();
        ws.nodes.insert(5);
        ws.rels.insert(6);

        let node = table.node_lock_arc(5);
        let rel = table.rel_lock_arc(6);
        let other_node = table.node_lock_arc(6);

        let locks = WriteSetLocks::acquire(&table, &ws);
        assert!(node.try_lock().is_err());
        assert!(rel.try_lock().is_err());
        // Node 6 is not in the write set; only relationship 6 is.
        assert!(other_node.try_lock().is_ok());

        drop(locks);
        assert!(node.try_lock().is_ok());
        assert!(rel.try_lock().is_ok());
    }

    /// Writers locking the same node id must never overlap.
    ///
    /// Mutual exclusion is observed through atomics only, so the
    /// per-record lock under test is the sole thing that can
    /// serialize the threads.
    #[test]
    fn write_set_locks_serialize_same_id() {
        let table = Arc::new(LockTable::new());
        let in_section = Arc::new(AtomicBool::new(false));
        let completed = Arc::new(AtomicU32::new(0));

        let mut handles = Vec::new();
        for _ in 0..4 {
            let table = Arc::clone(&table);
            let in_section = Arc::clone(&in_section);
            let completed = Arc::clone(&completed);
            handles.push(thread::spawn(move || {
                let mut ws = MutationWriteSet::new();
                ws.nodes.insert(99);
                let _locks = WriteSetLocks::acquire(&table, &ws);
                // If another writer were inside, the flag would
                // already be set.
                assert!(
                    !in_section.swap(true, Ordering::SeqCst),
                    "lock did not serialize"
                );
                // Stay inside long enough for the other threads to
                // reach `acquire` and block on the same id's lock.
                thread::sleep(Duration::from_millis(5));
                in_section.store(false, Ordering::SeqCst);
                completed.fetch_add(1, Ordering::SeqCst);
            }));
        }
        for h in handles {
            h.join().unwrap();
        }
        assert_eq!(completed.load(Ordering::SeqCst), 4);
    }
}