// reddb_server/runtime/locking.rs
//! Intent-lock hierarchy adapter — `Resource` naming + `LockerGuard` RAII.
//!
//! Thin layer over `crate::storage::transaction::lock::LockManager`
//! that gives the runtime dispatch paths a typed API:
//!
//! - Read dispatch: `(Global, IS) → (Collection, IS)`
//! - Write dispatch: `(Global, IX) → (Collection, IX)`
//! - DDL dispatch: `(Global, IX) → (Collection, X)`
//!
//! The adapter owns:
//!
//! 1. A `Resource` enum that maps the two hierarchy levels to the
//!    byte-key format `LockManager` expects (no string collisions,
//!    cheap encoding).
//! 2. A `LockerGuard` that records each `(resource, mode)` pair in
//!    acquisition order and releases them on drop. Releases run in
//!    reverse order so the global lock is the last to go.
//! 3. A tiny monotonic `TxnId` allocator keyed by the current
//!    connection id — enough for deadlock detection to distinguish
//!    concurrent acquirers.
//!
//! The adapter does **not** implement ordered-acquire enforcement
//! via phantom types yet (TODO for P1.T4); callers currently discipline
//! themselves by always going `Global → Collection`.
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

use crate::storage::transaction::lock::{LockManager, LockMode, LockResult, TxnId};
30
/// Hierarchical resources the runtime locks on. The byte-key encoding
/// prefixes each level with a discriminator so `Collection("global")`
/// can never collide with the true global-scope `Resource::Global`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum Resource {
    Global,
    Collection(String),
}

impl Resource {
    /// Deterministic byte-key for the underlying `LockManager`:
    /// `Global` encodes as `b"G/"`, `Collection(name)` as `b"C/"`
    /// followed by the raw name bytes. The one-letter discriminator
    /// keeps the two hierarchy levels disjoint.
    pub fn key(&self) -> Vec<u8> {
        let mut encoded: Vec<u8> = Vec::new();
        match self {
            Resource::Global => encoded.extend_from_slice(b"G/"),
            Resource::Collection(name) => {
                // Pre-size for prefix + name so the extends don't regrow.
                encoded.reserve(2 + name.len());
                encoded.extend_from_slice(b"C/");
                encoded.extend_from_slice(name.as_bytes());
            }
        }
        encoded
    }
}
54
/// Error surfaced by guard acquisition when a lock cannot be taken.
/// `Granted` / `Upgraded` manager results fall through to the guard;
/// the rest bubble up as one of these variants so the caller can log
/// or retry.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AcquireError {
    /// Deadlock detected by the manager; carries the txn-id cycle for
    /// diagnostics.
    Deadlock(Vec<TxnId>),
    /// The blocking acquire exceeded the manager's wait budget.
    Timeout,
    /// This txn hit the manager's per-txn lock cap.
    LockLimitExceeded,
    /// Requested mode isn't compatible with a mode this guard already
    /// holds on the same resource and can't be upgraded there.
    IncompatibleEscalation {
        resource: Resource,
        held: LockMode,
        requested: LockMode,
    },
}
71
72impl std::fmt::Display for AcquireError {
73    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
74        match self {
75            Self::Deadlock(cycle) => write!(f, "deadlock detected (cycle: {cycle:?})"),
76            Self::Timeout => f.write_str("lock acquire timed out"),
77            Self::LockLimitExceeded => f.write_str("per-txn lock limit exceeded"),
78            Self::IncompatibleEscalation {
79                resource,
80                held,
81                requested,
82            } => write!(
83                f,
84                "cannot escalate lock on {resource:?}: held={held:?} requested={requested:?}"
85            ),
86        }
87    }
88}
89
90impl std::error::Error for AcquireError {}
91
/// Monotonic per-process TxnId allocator. Deadlock detection needs
/// acquirer uniqueness, not connection identity, so a plain counter
/// suffices. `0` is reserved by the underlying lock manager for "not
/// a real txn", so allocation starts at `1`.
static NEXT_TXN_ID: AtomicU64 = AtomicU64::new(1);

/// Allocate a fresh, process-unique `TxnId`.
///
/// `Relaxed` is sufficient here: only the counter value itself must be
/// unique; no other memory is ordered against the increment.
pub fn fresh_txn_id() -> TxnId {
    NEXT_TXN_ID.fetch_add(1, Ordering::Relaxed)
}
101
/// Acquisition record tracked by the guard so drop can release in
/// reverse order.
#[derive(Debug, Clone)]
struct Held {
    // Which hierarchy level (Global / Collection) was locked.
    resource: Resource,
    // Mode requested at acquire time; used to validate later upgrades.
    mode: LockMode,
}
109
/// RAII guard over a set of acquired locks. Drop releases every
/// acquired resource in reverse-acquire order, so the common
/// `Global → Collection` path releases `Collection` first then
/// `Global`.
pub struct LockerGuard {
    // Shared handle to the process-wide lock manager.
    manager: Arc<LockManager>,
    // Identity used for every acquisition; allocated via `fresh_txn_id`.
    txn_id: TxnId,
    // Acquisition log in acquire order; drained in reverse on drop.
    held: Vec<Held>,
}
119
120impl LockerGuard {
121    /// Start a new guard bound to the given manager. No locks are
122    /// acquired yet — callers chain `acquire` calls.
123    pub fn new(manager: Arc<LockManager>) -> Self {
124        Self {
125            manager,
126            txn_id: fresh_txn_id(),
127            held: Vec::with_capacity(2),
128        }
129    }
130
131    /// Acquire a lock on `resource` with `mode`. Records the
132    /// acquisition so drop can reverse it. Rejects illegal upgrades
133    /// (already holding Exclusive, requesting Shared) with
134    /// `IncompatibleEscalation` so bugs in the dispatch layer don't
135    /// silently downgrade.
136    pub fn acquire(&mut self, resource: Resource, mode: LockMode) -> Result<(), AcquireError> {
137        // If we already hold this resource, only allow legal upgrades.
138        if let Some(existing) = self.held.iter().find(|h| h.resource == resource) {
139            let already = existing.mode;
140            if already == mode {
141                return Ok(());
142            }
143            if !already.can_upgrade_to(&mode) {
144                return Err(AcquireError::IncompatibleEscalation {
145                    resource,
146                    held: already,
147                    requested: mode,
148                });
149            }
150        }
151
152        let key = resource.key();
153        match self.manager.acquire(self.txn_id, &key, mode) {
154            LockResult::Granted | LockResult::Upgraded | LockResult::Waiting => {
155                // `Waiting` shouldn't surface under the blocking
156                // `acquire()` — treat it defensively as granted.
157                self.held.push(Held { resource, mode });
158                Ok(())
159            }
160            LockResult::Deadlock(cycle) => Err(AcquireError::Deadlock(cycle)),
161            LockResult::Timeout => Err(AcquireError::Timeout),
162            LockResult::LockLimitExceeded => Err(AcquireError::LockLimitExceeded),
163            // `AlreadyHeld` only fires when our own txn already had
164            // the lock — equivalent to a no-op acquire from the
165            // caller's POV. `TxnNotFound` shouldn't reach this layer
166            // (release-before-acquire bug); treat both as success.
167            LockResult::AlreadyHeld | LockResult::TxnNotFound => {
168                self.held.push(Held { resource, mode });
169                Ok(())
170            }
171        }
172    }
173
174    /// Number of currently-held resources. Useful for lock-stats
175    /// assertions in tests.
176    pub fn held_count(&self) -> usize {
177        self.held.len()
178    }
179
180    /// The txn id this guard uses for underlying `LockManager`
181    /// acquisitions. Exposed for lock-stats inspection in tests.
182    pub fn txn_id(&self) -> TxnId {
183        self.txn_id
184    }
185}
186
187impl Drop for LockerGuard {
188    fn drop(&mut self) {
189        // Release in reverse acquire order. The per-resource release
190        // is already robust against the lock not existing (manager
191        // returns false), so we can just drain.
192        while let Some(Held { resource, .. }) = self.held.pop() {
193            let key = resource.key();
194            self.manager.release(self.txn_id, &key);
195        }
196        // Belt-and-suspenders — clear any residue so deadlock
197        // detection's wait-graph doesn't leak this txn.
198        self.manager.release_all(self.txn_id);
199    }
200}
// Unit-level tests live in `tests/unit_locking.rs` because the
// project's lib-test target has pre-existing unrelated compile
// errors that would block `cargo test --lib`.