lora_store/lock_table.rs
1//! Per-record write locks for fine-grained concurrent mutations.
2//!
3//! [`LockTable`] is the sharded per-record `Mutex<()>` registry the
4//! auto-commit / OCC path consults at commit time. Writers translate
5//! their buffered mutation stream into a [`MutationWriteSet`] (defined
6//! in [`crate::mutation`]) and call [`WriteSetLocks::acquire`] to grab
7//! every per-record lock in sorted order before publishing.
8//!
9//! Concurrent writers on the same record serialize on the entry's
10//! `Mutex`; concurrent writers on disjoint record sets proceed
11//! independently.
12//!
13//! # Sharding
14//!
//! The lock table is sharded into [`LOCK_TABLE_SHARDS`] independent
//! buckets per record kind: nodes and relationships each get their own
//! array of shards. Each shard owns a `HashMap` from record id to
//! `Arc<Mutex<()>>` behind its own `Mutex`. Acquiring a per-record lock
//! looks up the `Arc<Mutex<()>>` in the relevant shard (holding only that
//! shard's `Mutex`, and only for the lookup — *not* a global table lock),
//! releases the shard, and then locks the record's own `Mutex`.
20//!
21//! Power-of-two shard count means the shard index is a single AND on
22//! the id, no division. 256 shards is generous for any realistic
23//! thread count and keeps the per-shard contention well below the
24//! per-record lock contention you'd expect for non-pathological
25//! workloads.
26//!
27//! # Lifetime
28//!
29//! Lock entries are *not* GC'd from the table after their owning tx
30//! drops them. Each entry is `Arc<Mutex<()>>` — small, and the lock
31//! gets reused if the same id is locked again. A future GC pass could
32//! sweep entries with refcount 1 if memory pressure warrants it.
33
34use std::collections::HashMap;
35use std::sync::{Arc, Mutex, MutexGuard};
36
37use crate::{MutationWriteSet, NodeId, RelationshipId};
38
/// Number of shards in each of the lock table's shard arrays (one array
/// for nodes, one for relationships).
///
/// Must be a power of two: the shard index is computed as
/// `id & SHARD_MASK`, which only reaches every shard when the mask is all
/// ones in its low bits. The `const` assertion below turns a
/// non-power-of-two value into a compile error instead of silently leaving
/// shards unused.
///
/// Chosen large enough that per-shard contention stays well below
/// per-record contention for realistic thread counts. Every shard costs one
/// `Mutex<HashMap<..>>` even when empty, and there are
/// `2 * LOCK_TABLE_SHARDS` of them. NOTE(review): by rough estimate that is
/// a few tens of KB on 64-bit targets, not "a few KB"; confirm with
/// `size_of::<LockTable>()` if it matters.
pub const LOCK_TABLE_SHARDS: usize = 256;

// Compile-time guard for the power-of-two invariant `SHARD_MASK` relies on.
const _: () = assert!(LOCK_TABLE_SHARDS.is_power_of_two());

/// Low-bit mask that maps a record id onto a shard index in
/// `0..LOCK_TABLE_SHARDS`.
const SHARD_MASK: u64 = (LOCK_TABLE_SHARDS as u64) - 1;
46
47/// Sharded per-record write-lock registry. Cloning is cheap (one
48/// `Arc` per shard map) so the table can live behind an `Arc` shared
49/// across all writers.
50pub struct LockTable {
51 nodes: [Mutex<HashMap<NodeId, Arc<Mutex<()>>>>; LOCK_TABLE_SHARDS],
52 rels: [Mutex<HashMap<RelationshipId, Arc<Mutex<()>>>>; LOCK_TABLE_SHARDS],
53}
54
55impl Default for LockTable {
56 fn default() -> Self {
57 Self::new()
58 }
59}
60
61impl LockTable {
62 pub fn new() -> Self {
63 Self {
64 nodes: std::array::from_fn(|_| Mutex::new(HashMap::new())),
65 rels: std::array::from_fn(|_| Mutex::new(HashMap::new())),
66 }
67 }
68
69 /// Look up (or insert) the per-node `Arc<Mutex<()>>`. Caller is
70 /// expected to `lock()` the returned `Arc` and hold it across the
71 /// mutation. Sharded so two threads locking different node ids in
72 /// the same id-mod-shards bucket only contend for the brief
73 /// shard-table acquisition, not on each other's record lock.
74 pub fn node_lock_arc(&self, id: NodeId) -> Arc<Mutex<()>> {
75 let shard = (id & SHARD_MASK) as usize;
76 let mut map = self.nodes[shard].lock().unwrap_or_else(|p| p.into_inner());
77 map.entry(id)
78 .or_insert_with(|| Arc::new(Mutex::new(())))
79 .clone()
80 }
81
82 /// Same as [`Self::node_lock_arc`] but for relationships.
83 pub fn rel_lock_arc(&self, id: RelationshipId) -> Arc<Mutex<()>> {
84 let shard = (id & SHARD_MASK) as usize;
85 let mut map = self.rels[shard].lock().unwrap_or_else(|p| p.into_inner());
86 map.entry(id)
87 .or_insert_with(|| Arc::new(Mutex::new(())))
88 .clone()
89 }
90}
91
92/// Bundle of per-record locks held by a single writer's commit
93/// critical section. Holding the locks here (rather than scattered
94/// `MutexGuard`s) makes drop order deterministic — all locks release
95/// when this struct drops, regardless of which release path
96/// (commit / abort / panic) the writer took.
97///
98/// Lock acquisition is **always sorted by id** so any two transactions
99/// touching the same record set acquire the locks in the same order.
100/// That's the standard prevention strategy for the simplest deadlock
101/// scenario — two writers each holding one of the other's locks.
102pub struct WriteSetLocks {
103 /// Boxed because `MutexGuard` is `!Unpin` and we want to keep the
104 /// guards live by value. The `Arc<Mutex<()>>` originates from the
105 /// `LockTable`; we keep both the Arc *and* the guard so the
106 /// `'static` guard is anchored by the Arc that owns the lock.
107 _guards: Vec<OwnedMutexGuard>,
108}
109
/// A `MutexGuard` bundled with the `Arc` that owns its `Mutex`, so the
/// guard can be stored without a borrow of that `Arc` in scope.
///
/// [`OwnedMutexGuard::lock`] erases the guard's lifetime to `'static`.
/// That is sound only because of two facts:
/// * `_arc` keeps the `Mutex` allocation alive for as long as this struct
///   exists. Moving the `Arc` does not move the heap-allocated `Mutex` the
///   guard points at.
/// * The guard is always released before `_arc` is dropped. `Drop` below
///   takes it out first. Field declaration order (`guard` before `_arc`)
///   would produce the same order even without that impl.
///
/// `MutexGuard` is `!Send`, which makes this type `!Send` as well. The lock
/// is therefore always released on the thread that acquired it.
#[must_use = "dropping an OwnedMutexGuard immediately unlocks its mutex"]
struct OwnedMutexGuard {
    /// Declared before `_arc` so the default field drop order also
    /// releases the guard first. `Option` lets `Drop` take it explicitly.
    guard: Option<MutexGuard<'static, ()>>,
    /// Owns the `Mutex` the guard refers to. Never read; only kept alive.
    _arc: Arc<Mutex<()>>,
}

impl OwnedMutexGuard {
    /// Block until `arc`'s mutex is locked, and return the owned guard.
    ///
    /// A poisoned mutex is locked as normal. It guards `()`, so a
    /// panicking previous holder cannot have left any data inconsistent.
    fn lock(arc: Arc<Mutex<()>>) -> Self {
        let guard = arc.lock().unwrap_or_else(|p| p.into_inner());
        // SAFETY: the guard borrows the `Mutex` inside `arc`'s heap
        // allocation, not the `arc` handle itself. Moving `arc` into `Self`
        // below therefore leaves the guard valid. `Self` owns that `Arc` for
        // as long as the guard exists, and `Drop` releases the guard before
        // the `Arc` can be dropped.
        let guard: MutexGuard<'static, ()> =
            unsafe { std::mem::transmute::<MutexGuard<'_, ()>, MutexGuard<'static, ()>>(guard) };
        Self {
            guard: Some(guard),
            _arc: arc,
        }
    }
}

impl Drop for OwnedMutexGuard {
    fn drop(&mut self) {
        // Unlock before the `_arc` field (and possibly the `Mutex` itself)
        // is dropped.
        self.guard.take();
    }
}
147
148impl WriteSetLocks {
149 /// Acquire write locks for every record in the write set, sorted
150 /// by id. Returns once all locks are held; the caller's commit
151 /// runs without contention on these records until `WriteSetLocks`
152 /// drops.
153 pub fn acquire(table: &LockTable, write_set: &MutationWriteSet) -> Self {
154 // Sort to prevent deadlock from inconsistent acquisition order
155 // across concurrent writers touching overlapping record sets.
156 let mut node_ids: Vec<NodeId> = write_set.nodes.iter().copied().collect();
157 node_ids.sort_unstable();
158 let mut rel_ids: Vec<RelationshipId> = write_set.rels.iter().copied().collect();
159 rel_ids.sort_unstable();
160
161 // Acquire nodes first, then rels. The two namespaces never
162 // alias because a NodeId and a RelationshipId can't share a
163 // lock entry (separate shard arrays in the LockTable).
164 let mut guards = Vec::with_capacity(node_ids.len() + rel_ids.len());
165 for id in node_ids {
166 guards.push(OwnedMutexGuard::lock(table.node_lock_arc(id)));
167 }
168 for id in rel_ids {
169 guards.push(OwnedMutexGuard::lock(table.rel_lock_arc(id)));
170 }
171 Self { _guards: guards }
172 }
173}
174
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;
    use std::thread;
    use std::time::Duration;

    #[test]
    fn lock_table_returns_same_arc_for_same_id() {
        let table = LockTable::new();
        let a = table.node_lock_arc(42);
        let b = table.node_lock_arc(42);
        assert!(Arc::ptr_eq(&a, &b));
    }

    #[test]
    fn lock_table_distinct_ids_get_distinct_locks() {
        let table = LockTable::new();
        let a = table.node_lock_arc(1);
        let b = table.node_lock_arc(2);
        assert!(!Arc::ptr_eq(&a, &b));
    }

    #[test]
    fn node_and_rel_namespaces_are_separate() {
        let table = LockTable::new();
        let n = table.node_lock_arc(7);
        let r = table.rel_lock_arc(7);
        assert!(!Arc::ptr_eq(&n, &r));
    }

    /// Several writers lock the same node id. At most one may be inside the
    /// critical section at a time. `inside` is an atomic rather than a
    /// `Mutex`, so the only thing serializing the writers is the record
    /// lock under test.
    #[test]
    fn write_set_locks_serialize_same_id() {
        const WRITERS: usize = 4;
        let table = Arc::new(LockTable::new());
        let inside = Arc::new(AtomicUsize::new(0));
        let completed = Arc::new(AtomicUsize::new(0));

        let handles: Vec<_> = (0..WRITERS)
            .map(|_| {
                let table = Arc::clone(&table);
                let inside = Arc::clone(&inside);
                let completed = Arc::clone(&completed);
                thread::spawn(move || {
                    let mut ws = MutationWriteSet::new();
                    ws.nodes.insert(99);
                    let _locks = WriteSetLocks::acquire(&table, &ws);
                    let already_inside = inside.fetch_add(1, Ordering::SeqCst);
                    assert_eq!(already_inside, 0, "lock did not serialize");
                    // Stay in the critical section long enough for the other
                    // writers to reach their (blocked) `acquire`.
                    thread::sleep(Duration::from_millis(5));
                    inside.fetch_sub(1, Ordering::SeqCst);
                    completed.fetch_add(1, Ordering::SeqCst);
                })
            })
            .collect();
        for h in handles {
            h.join().unwrap();
        }
        assert_eq!(completed.load(Ordering::SeqCst), WRITERS);
    }

    #[test]
    fn write_set_locks_release_on_drop() {
        let table = LockTable::new();
        let mut ws = MutationWriteSet::new();
        ws.nodes.insert(5);
        ws.rels.insert(6);

        let locks = WriteSetLocks::acquire(&table, &ws);
        assert!(table.node_lock_arc(5).try_lock().is_err());
        assert!(table.rel_lock_arc(6).try_lock().is_err());
        // Records outside the write set (here node 6) stay free.
        assert!(table.node_lock_arc(6).try_lock().is_ok());

        drop(locks);
        assert!(table.node_lock_arc(5).try_lock().is_ok());
        assert!(table.rel_lock_arc(6).try_lock().is_ok());
    }
}