// reddb_server/runtime/locking.rs
//! Intent-lock hierarchy adapter — `Resource` naming + `LockerGuard` RAII.
//!
//! Thin layer over `crate::storage::transaction::lock::LockManager`
//! that gives the runtime dispatch paths a typed API:
//!
//! - Read dispatch: `(Global, IS) → (Collection, IS)`
//! - Write dispatch: `(Global, IX) → (Collection, IX)`
//! - DDL dispatch: `(Global, IX) → (Collection, X)`
//!
//! The adapter owns:
//!
//! 1. A `Resource` enum that maps the two hierarchy levels to the
//!    byte-key format `LockManager` expects (no string collisions,
//!    cheap encoding).
//! 2. A `LockerGuard` that records each `(resource, mode)` pair in
//!    acquisition order and releases them on drop. Releases run in
//!    reverse order so the global lock is the last to go.
//! 3. A tiny process-wide monotonic `TxnId` allocator — acquirer
//!    uniqueness (not connection identity) is all deadlock detection
//!    needs to distinguish concurrent acquirers.
//!
//! The adapter does **not** implement ordered-acquire enforcement
//! via phantom types yet (TODO for P1.T4); callers currently discipline
//! themselves by always going `Global → Collection`.

use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

use crate::storage::transaction::lock::{LockManager, LockMode, LockResult, TxnId};

/// Hierarchical resources the runtime locks on. The byte-key encoding
/// prefixes each level with a discriminator so `Collection("global")`
/// can never collide with the true global-scope `Resource::Global`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum Resource {
    Global,
    Collection(String),
}

impl Resource {
    /// Deterministic byte-key for the underlying `LockManager`.
    ///
    /// `Global` encodes as `b"G/"`; `Collection(name)` encodes as
    /// `b"C/"` followed by the name's UTF-8 bytes. The distinct
    /// prefixes keep the two levels in disjoint key spaces.
    pub fn key(&self) -> Vec<u8> {
        match self {
            Resource::Global => b"G/".to_vec(),
            Resource::Collection(name) => {
                // 2 prefix bytes + name length, sized up front so the
                // vec never regrows.
                let mut out = Vec::with_capacity(2 + name.len());
                out.extend_from_slice(b"C/");
                out.extend_from_slice(name.as_bytes());
                out
            }
        }
    }
}

55/// Outcome of a `try_acquire` — `Granted` / `Upgraded` fall through to
56/// the guard, the rest bubble up as an error the caller can log or
57/// retry.
58#[derive(Debug, Clone, PartialEq, Eq)]
59pub enum AcquireError {
60 Deadlock(Vec<TxnId>),
61 Timeout,
62 LockLimitExceeded,
63 /// Requested mode isn't compatible with a mode this guard already
64 /// holds on the same resource and can't be upgraded there.
65 IncompatibleEscalation {
66 resource: Resource,
67 held: LockMode,
68 requested: LockMode,
69 },
70}
71
72impl std::fmt::Display for AcquireError {
73 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
74 match self {
75 Self::Deadlock(cycle) => write!(f, "deadlock detected (cycle: {cycle:?})"),
76 Self::Timeout => f.write_str("lock acquire timed out"),
77 Self::LockLimitExceeded => f.write_str("per-txn lock limit exceeded"),
78 Self::IncompatibleEscalation {
79 resource,
80 held,
81 requested,
82 } => write!(
83 f,
84 "cannot escalate lock on {resource:?}: held={held:?} requested={requested:?}"
85 ),
86 }
87 }
88}
89
90impl std::error::Error for AcquireError {}
91
92/// Monotonic per-process TxnId allocator. Deadlock detection needs
93/// acquirer uniqueness, not connection identity, so a plain counter
94/// suffices. `0` is reserved by the underlying lock manager for "not
95/// a real txn", so allocation starts at `1`.
96static NEXT_TXN_ID: AtomicU64 = AtomicU64::new(1);
97
98pub fn fresh_txn_id() -> TxnId {
99 NEXT_TXN_ID.fetch_add(1, Ordering::Relaxed)
100}
101
102/// Acquisition record tracked by the guard so drop can release in
103/// reverse order.
104#[derive(Debug, Clone)]
105struct Held {
106 resource: Resource,
107 mode: LockMode,
108}
109
110/// RAII guard over a set of acquired locks. Drop releases every
111/// acquired resource in reverse-acquire order, so the common
112/// `Global → Collection` path releases `Collection` first then
113/// `Global`.
114pub struct LockerGuard {
115 manager: Arc<LockManager>,
116 txn_id: TxnId,
117 held: Vec<Held>,
118}
119
120impl LockerGuard {
121 /// Start a new guard bound to the given manager. No locks are
122 /// acquired yet — callers chain `acquire` calls.
123 pub fn new(manager: Arc<LockManager>) -> Self {
124 Self {
125 manager,
126 txn_id: fresh_txn_id(),
127 held: Vec::with_capacity(2),
128 }
129 }
130
131 /// Acquire a lock on `resource` with `mode`. Records the
132 /// acquisition so drop can reverse it. Rejects illegal upgrades
133 /// (already holding Exclusive, requesting Shared) with
134 /// `IncompatibleEscalation` so bugs in the dispatch layer don't
135 /// silently downgrade.
136 pub fn acquire(&mut self, resource: Resource, mode: LockMode) -> Result<(), AcquireError> {
137 // If we already hold this resource, only allow legal upgrades.
138 if let Some(existing) = self.held.iter().find(|h| h.resource == resource) {
139 let already = existing.mode;
140 if already == mode {
141 return Ok(());
142 }
143 if !already.can_upgrade_to(&mode) {
144 return Err(AcquireError::IncompatibleEscalation {
145 resource,
146 held: already,
147 requested: mode,
148 });
149 }
150 }
151
152 let key = resource.key();
153 match self.manager.acquire(self.txn_id, &key, mode) {
154 LockResult::Granted | LockResult::Upgraded | LockResult::Waiting => {
155 // `Waiting` shouldn't surface under the blocking
156 // `acquire()` — treat it defensively as granted.
157 self.held.push(Held { resource, mode });
158 Ok(())
159 }
160 LockResult::Deadlock(cycle) => Err(AcquireError::Deadlock(cycle)),
161 LockResult::Timeout => Err(AcquireError::Timeout),
162 LockResult::LockLimitExceeded => Err(AcquireError::LockLimitExceeded),
163 // `AlreadyHeld` only fires when our own txn already had
164 // the lock — equivalent to a no-op acquire from the
165 // caller's POV. `TxnNotFound` shouldn't reach this layer
166 // (release-before-acquire bug); treat both as success.
167 LockResult::AlreadyHeld | LockResult::TxnNotFound => {
168 self.held.push(Held { resource, mode });
169 Ok(())
170 }
171 }
172 }
173
174 /// Number of currently-held resources. Useful for lock-stats
175 /// assertions in tests.
176 pub fn held_count(&self) -> usize {
177 self.held.len()
178 }
179
180 /// The txn id this guard uses for underlying `LockManager`
181 /// acquisitions. Exposed for lock-stats inspection in tests.
182 pub fn txn_id(&self) -> TxnId {
183 self.txn_id
184 }
185}
186
187impl Drop for LockerGuard {
188 fn drop(&mut self) {
189 // Release in reverse acquire order. The per-resource release
190 // is already robust against the lock not existing (manager
191 // returns false), so we can just drain.
192 while let Some(Held { resource, .. }) = self.held.pop() {
193 let key = resource.key();
194 self.manager.release(self.txn_id, &key);
195 }
196 // Belt-and-suspenders — clear any residue so deadlock
197 // detection's wait-graph doesn't leak this txn.
198 self.manager.release_all(self.txn_id);
199 }
200}
201
// Unit-level tests live in `tests/unit_locking.rs` because the
// project's lib-test target has pre-existing unrelated compile
// errors that would block `cargo test --lib`.