Skip to main content

noxu_txn/
thread_locker.rs

//! ThreadLocker - per-thread, non-transactional locker.
//!
//! Lockers created on the same thread share locks with one another.
3
4use hashbrown::HashSet;
5use std::sync::Arc;
6use std::thread;
7
8use crate::lock_manager::LockManager;
9use crate::locker::Locker;
10use crate::{LockResult, LockType, TxnError};
11
12/// A thread-based locker that shares locks with other ThreadLockers
13/// on the same thread.
14///
15/// ThreadLocker extends BasicLocker to track which thread created it.
16/// All ThreadLockers on the same thread share locks with each other,
17/// which allows multiple cursors to operate without lock conflicts.
18///
19/// This is used for auto-commit operations where a transaction context
20/// is not explicitly provided.
21///
22///
23pub struct ThreadLocker {
24    /// Unique locker ID.
25    id: i64,
26
27    /// Shared lock manager.
28    lock_manager: Arc<LockManager>,
29
30    /// Thread ID that created this locker (hashed for stable u64 representation).
31    ///
32    /// All ThreadLockers on the same thread share the same `thread_id` and
33    /// therefore share locks with each other.
34    thread_id: u64,
35
36    /// Set of LSNs currently locked by this locker.
37    locked_lsns: HashSet<u64>,
38
39    /// Lock timeout in milliseconds (0 = infinite).
40    lock_timeout_ms: u64,
41
42    /// Whether this locker uses non-blocking locks by default.
43    default_no_wait: bool,
44
45    /// Whether this locker is open.
46    is_open: bool,
47}
48
49impl ThreadLocker {
50    /// Creates a new ThreadLocker for the current thread.
51    ///
52    /// Registers this locker's thread ID in the LockManager's sharing registry
53    /// so that `LockImpl::try_lock()` can bypass conflict detection for co-owning
54    /// ThreadLockers on the same thread.
55    ///
56    /// # Arguments
57    /// * `id` - Unique locker ID
58    /// * `lock_manager` - Shared lock manager
59    pub fn new(id: i64, lock_manager: Arc<LockManager>) -> Self {
60        let tid = get_thread_id();
61        lock_manager.register_locker_sharing(id, tid as i64);
62        ThreadLocker {
63            id,
64            lock_manager,
65            thread_id: tid,
66            locked_lsns: HashSet::new(),
67            lock_timeout_ms: 5000, // Default 5 second timeout
68            default_no_wait: false,
69            is_open: true,
70        }
71    }
72
73    /// Creates a ThreadLocker with a specified timeout.
74    pub fn with_timeout(
75        id: i64,
76        lock_manager: Arc<LockManager>,
77        timeout_ms: u64,
78    ) -> Self {
79        let tid = get_thread_id();
80        lock_manager.register_locker_sharing(id, tid as i64);
81        ThreadLocker {
82            id,
83            lock_manager,
84            thread_id: tid,
85            locked_lsns: HashSet::new(),
86            lock_timeout_ms: timeout_ms,
87            default_no_wait: false,
88            is_open: true,
89        }
90    }
91
92    /// Returns the thread ID that created this locker.
93    pub fn get_thread_id(&self) -> u64 {
94        self.thread_id
95    }
96
97    /// Release all locks held by this locker.
98    pub fn release_all_locks(&mut self) -> Result<(), TxnError> {
99        for &lsn in &self.locked_lsns {
100            self.lock_manager.release(lsn, self.id)?;
101        }
102        self.locked_lsns.clear();
103        Ok(())
104    }
105
106    /// Sets the lock timeout.
107    pub fn set_lock_timeout(&mut self, timeout_ms: u64) {
108        self.lock_timeout_ms = timeout_ms;
109    }
110
111    /// Checks that this locker is being used by the correct thread.
112    fn check_thread(&self) -> Result<(), TxnError> {
113        let current_thread = get_thread_id();
114        if current_thread != self.thread_id {
115            return Err(TxnError::StateError(format!(
116                "ThreadLocker created on thread {} but used on thread {}",
117                self.thread_id, current_thread
118            )));
119        }
120        Ok(())
121    }
122}
123
124impl Locker for ThreadLocker {
125    fn id(&self) -> i64 {
126        self.id
127    }
128
129    fn lock(
130        &mut self,
131        lsn: u64,
132        lock_type: LockType,
133        non_blocking: bool,
134    ) -> Result<LockResult, TxnError> {
135        if !self.is_open {
136            return Err(TxnError::StateError("Locker is closed".to_string()));
137        }
138
139        // Check that we're on the right thread
140        self.check_thread()?;
141
142        // Use non_blocking parameter or default
143        let use_no_wait = non_blocking || self.default_no_wait;
144
145        // Ask the lock manager for the lock
146        let grant = self.lock_manager.lock(
147            lsn,
148            self.id,
149            lock_type,
150            use_no_wait,
151            false, // jump_ahead
152        )?;
153
154        // Track this lock
155        if grant.is_granted() {
156            self.locked_lsns.insert(lsn);
157        }
158
159        // ThreadLocker doesn't track write lock info (non-transactional)
160        Ok(LockResult::simple(grant))
161    }
162
163    fn release_lock(&mut self, lsn: u64) -> Result<(), TxnError> {
164        if self.locked_lsns.remove(&lsn) {
165            self.lock_manager.release(lsn, self.id)?;
166        }
167        Ok(())
168    }
169
170    fn owns_write_lock(&self, lsn: u64) -> bool {
171        self.lock_manager.is_owned_write_lock(lsn, self.id)
172    }
173
174    fn is_transactional(&self) -> bool {
175        false
176    }
177
178    fn lock_timeout_ms(&self) -> u64 {
179        self.lock_timeout_ms
180    }
181
182    fn default_no_wait(&self) -> bool {
183        self.default_no_wait
184    }
185
186    /// Returns true if the other locker was created on the same thread.
187    ///
188    /// Both lockers must be
189    /// ThreadLockers **and** have the same originating thread for sharing.
190    /// We check via the LockManager's sharing registry (locker_id → thread_id).
191    fn shares_locks_with(&self, other_id: i64) -> bool {
192        self.lock_manager.same_share_group(self.id, other_id)
193    }
194
195    fn close(&mut self) {
196        self.is_open = false;
197        let _ = self.release_all_locks();
198    }
199
200    fn is_open(&self) -> bool {
201        self.is_open
202    }
203}
204
205impl Drop for ThreadLocker {
206    fn drop(&mut self) {
207        // Ensure locks are released when locker is dropped.
208        let _ = self.release_all_locks();
209        // Deregister from the sharing registry.
210        self.lock_manager.unregister_locker_sharing(self.id);
211    }
212}
213
/// Returns a `u64` identifier for the calling thread.
///
/// `ThreadId::as_u64()` is unstable, so the `ThreadId` is fed through a
/// freshly constructed `DefaultHasher` instead. Repeated calls on one thread
/// yield the same value within a process.
fn get_thread_id() -> u64 {
    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};

    let mut state = DefaultHasher::new();
    thread::current().id().hash(&mut state);
    state.finish()
}
226
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a fresh lock manager plus a `ThreadLocker` with ID 1 created on
    /// the calling thread.
    fn setup() -> (Arc<LockManager>, ThreadLocker) {
        let lm = Arc::new(LockManager::new());
        let locker = ThreadLocker::new(1, lm.clone());
        (lm, locker)
    }

    #[test]
    fn test_new() {
        let (_, locker) = setup();
        assert_eq!(locker.id(), 1);
        assert!(!locker.is_transactional());
        assert!(locker.is_open());
        // The recorded ID must be the hash of the constructing thread.
        // Comparing against zero would only test a property of the hash.
        assert_eq!(locker.get_thread_id(), get_thread_id());
        // Defaults set by `new`.
        assert_eq!(locker.lock_timeout_ms(), 5000);
        assert!(!locker.default_no_wait());
    }

    #[test]
    fn test_with_timeout_and_set_lock_timeout() {
        let lm = Arc::new(LockManager::new());
        let mut locker = ThreadLocker::with_timeout(2, lm, 250);
        assert_eq!(locker.id(), 2);
        assert_eq!(locker.lock_timeout_ms(), 250);

        locker.set_lock_timeout(0);
        assert_eq!(locker.lock_timeout_ms(), 0);
    }

    #[test]
    fn test_lock_and_release() {
        let (_, mut locker) = setup();

        // Acquire a write lock
        let result = locker.lock(100, LockType::Write, false).unwrap();
        assert!(result.is_granted());

        // Check that we own the lock
        assert!(locker.owns_write_lock(100));

        // Release the lock
        locker.release_lock(100).unwrap();
        assert!(!locker.owns_write_lock(100));
    }

    #[test]
    fn test_release_untracked_lock_is_noop() {
        let (_, mut locker) = setup();

        // Releasing an LSN this locker never locked must succeed quietly.
        locker.release_lock(999).unwrap();
        assert!(!locker.owns_write_lock(999));
    }

    #[test]
    fn test_release_all_locks() {
        let (_, mut locker) = setup();

        // Acquire multiple locks
        locker.lock(100, LockType::Write, false).unwrap();
        locker.lock(200, LockType::Write, false).unwrap();

        locker.release_all_locks().unwrap();

        assert!(!locker.owns_write_lock(100));
        assert!(!locker.owns_write_lock(200));
    }

    #[test]
    fn test_close_releases_locks() {
        let (_, mut locker) = setup();

        locker.lock(100, LockType::Write, false).unwrap();
        assert!(locker.is_open());

        locker.close();
        assert!(!locker.is_open());
        assert!(!locker.owns_write_lock(100));
    }

    #[test]
    fn test_lock_after_close_fails() {
        let (_, mut locker) = setup();

        locker.close();
        // A closed locker must refuse new lock requests.
        assert!(locker.lock(100, LockType::Write, false).is_err());
        assert!(!locker.owns_write_lock(100));
    }

    #[test]
    fn test_same_thread_check() {
        let (_, mut locker) = setup();
        // Should succeed on same thread
        let result = locker.lock(100, LockType::Write, false);
        assert!(result.is_ok());
    }

    #[test]
    fn test_get_thread_id() {
        let id1 = get_thread_id();
        let id2 = get_thread_id();
        // Same thread should have same ID
        assert_eq!(id1, id2);
    }
}