noxu_txn/
thread_locker.rs1use hashbrown::HashSet;
5use std::sync::Arc;
6use std::thread;
7
8use crate::lock_manager::LockManager;
9use crate::locker::Locker;
10use crate::{LockResult, LockType, TxnError};
11
12pub struct ThreadLocker {
24 id: i64,
26
27 lock_manager: Arc<LockManager>,
29
30 thread_id: u64,
35
36 locked_lsns: HashSet<u64>,
38
39 lock_timeout_ms: u64,
41
42 default_no_wait: bool,
44
45 is_open: bool,
47}
48
49impl ThreadLocker {
50 pub fn new(id: i64, lock_manager: Arc<LockManager>) -> Self {
60 let tid = get_thread_id();
61 lock_manager.register_locker_sharing(id, tid as i64);
62 ThreadLocker {
63 id,
64 lock_manager,
65 thread_id: tid,
66 locked_lsns: HashSet::new(),
67 lock_timeout_ms: 5000, default_no_wait: false,
69 is_open: true,
70 }
71 }
72
73 pub fn with_timeout(
75 id: i64,
76 lock_manager: Arc<LockManager>,
77 timeout_ms: u64,
78 ) -> Self {
79 let tid = get_thread_id();
80 lock_manager.register_locker_sharing(id, tid as i64);
81 ThreadLocker {
82 id,
83 lock_manager,
84 thread_id: tid,
85 locked_lsns: HashSet::new(),
86 lock_timeout_ms: timeout_ms,
87 default_no_wait: false,
88 is_open: true,
89 }
90 }
91
92 pub fn get_thread_id(&self) -> u64 {
94 self.thread_id
95 }
96
97 pub fn release_all_locks(&mut self) -> Result<(), TxnError> {
99 for &lsn in &self.locked_lsns {
100 self.lock_manager.release(lsn, self.id)?;
101 }
102 self.locked_lsns.clear();
103 Ok(())
104 }
105
106 pub fn set_lock_timeout(&mut self, timeout_ms: u64) {
108 self.lock_timeout_ms = timeout_ms;
109 }
110
111 fn check_thread(&self) -> Result<(), TxnError> {
113 let current_thread = get_thread_id();
114 if current_thread != self.thread_id {
115 return Err(TxnError::StateError(format!(
116 "ThreadLocker created on thread {} but used on thread {}",
117 self.thread_id, current_thread
118 )));
119 }
120 Ok(())
121 }
122}
123
124impl Locker for ThreadLocker {
125 fn id(&self) -> i64 {
126 self.id
127 }
128
129 fn lock(
130 &mut self,
131 lsn: u64,
132 lock_type: LockType,
133 non_blocking: bool,
134 ) -> Result<LockResult, TxnError> {
135 if !self.is_open {
136 return Err(TxnError::StateError("Locker is closed".to_string()));
137 }
138
139 self.check_thread()?;
141
142 let use_no_wait = non_blocking || self.default_no_wait;
144
145 let grant = self.lock_manager.lock(
147 lsn,
148 self.id,
149 lock_type,
150 use_no_wait,
151 false, )?;
153
154 if grant.is_granted() {
156 self.locked_lsns.insert(lsn);
157 }
158
159 Ok(LockResult::simple(grant))
161 }
162
163 fn release_lock(&mut self, lsn: u64) -> Result<(), TxnError> {
164 if self.locked_lsns.remove(&lsn) {
165 self.lock_manager.release(lsn, self.id)?;
166 }
167 Ok(())
168 }
169
170 fn owns_write_lock(&self, lsn: u64) -> bool {
171 self.lock_manager.is_owned_write_lock(lsn, self.id)
172 }
173
174 fn is_transactional(&self) -> bool {
175 false
176 }
177
178 fn lock_timeout_ms(&self) -> u64 {
179 self.lock_timeout_ms
180 }
181
182 fn default_no_wait(&self) -> bool {
183 self.default_no_wait
184 }
185
186 fn shares_locks_with(&self, other_id: i64) -> bool {
192 self.lock_manager.same_share_group(self.id, other_id)
193 }
194
195 fn close(&mut self) {
196 self.is_open = false;
197 let _ = self.release_all_locks();
198 }
199
200 fn is_open(&self) -> bool {
201 self.is_open
202 }
203}
204
205impl Drop for ThreadLocker {
206 fn drop(&mut self) {
207 let _ = self.release_all_locks();
209 self.lock_manager.unregister_locker_sharing(self.id);
211 }
212}
213
/// Returns a `u64` identifying the calling thread.
///
/// The value is the `DefaultHasher` hash of the current thread's
/// `std::thread::ThreadId`. Repeated calls on one thread yield the same
/// value; being a hash, values for distinct threads are not strictly
/// guaranteed to differ.
fn get_thread_id() -> u64 {
    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};

    let mut state = DefaultHasher::new();
    thread::current().id().hash(&mut state);
    state.finish()
}
226
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a fresh lock manager and a locker with id 1 bound to the
    /// calling (test) thread.
    fn setup() -> (Arc<LockManager>, ThreadLocker) {
        let lm = Arc::new(LockManager::new());
        let locker = ThreadLocker::new(1, lm.clone());
        (lm, locker)
    }

    #[test]
    fn test_new() {
        let (_, locker) = setup();
        assert_eq!(locker.id(), 1);
        assert!(!locker.is_transactional());
        assert!(locker.is_open());
        // The recorded id must be the creating thread's id; comparing against
        // zero would say nothing meaningful about a hash value.
        assert_eq!(locker.get_thread_id(), get_thread_id());
    }

    #[test]
    fn test_new_defaults() {
        let (_, locker) = setup();
        assert_eq!(locker.lock_timeout_ms(), 5000);
        assert!(!locker.default_no_wait());
    }

    #[test]
    fn test_timeout_configuration() {
        let lm = Arc::new(LockManager::new());
        let mut locker = ThreadLocker::with_timeout(2, lm, 250);
        assert_eq!(locker.lock_timeout_ms(), 250);

        locker.set_lock_timeout(750);
        assert_eq!(locker.lock_timeout_ms(), 750);
    }

    #[test]
    fn test_lock_and_release() {
        let (_, mut locker) = setup();

        let result = locker.lock(100, LockType::Write, false).unwrap();
        assert!(result.is_granted());

        assert!(locker.owns_write_lock(100));

        locker.release_lock(100).unwrap();
        assert!(!locker.owns_write_lock(100));
    }

    #[test]
    fn test_release_untracked_lock_is_noop() {
        let (_, mut locker) = setup();
        // Nothing was locked, so the release must succeed without effect.
        assert!(locker.release_lock(999).is_ok());
    }

    #[test]
    fn test_release_all_locks() {
        let (_, mut locker) = setup();

        locker.lock(100, LockType::Write, false).unwrap();
        locker.lock(200, LockType::Write, false).unwrap();

        locker.release_all_locks().unwrap();

        assert!(!locker.owns_write_lock(100));
        assert!(!locker.owns_write_lock(200));
    }

    #[test]
    fn test_close_releases_locks() {
        let (_, mut locker) = setup();

        locker.lock(100, LockType::Write, false).unwrap();
        assert!(locker.is_open());

        locker.close();
        assert!(!locker.is_open());
        assert!(!locker.owns_write_lock(100));
    }

    #[test]
    fn test_lock_after_close_fails() {
        let (_, mut locker) = setup();
        locker.close();
        // A closed locker rejects lock requests before reaching the manager.
        assert!(locker.lock(100, LockType::Write, false).is_err());
        assert!(!locker.owns_write_lock(100));
    }

    #[test]
    fn test_same_thread_check() {
        let (_, mut locker) = setup();
        let result = locker.lock(100, LockType::Write, false);
        assert!(result.is_ok());
    }

    #[test]
    fn test_get_thread_id() {
        let id1 = get_thread_id();
        let id2 = get_thread_id();
        assert_eq!(id1, id2);
    }
}