1use hashbrown::HashSet;
5use std::sync::Arc;
6
7use crate::lock_manager::LockManager;
8use crate::locker::Locker;
9use crate::{LockResult, LockType, TxnError};
10
11pub struct BasicLocker {
21 id: i64,
23
24 lock_manager: Arc<LockManager>,
26
27 locked_lsns: HashSet<u64>,
29
30 lock_timeout_ms: u64,
32
33 default_no_wait: bool,
35
36 is_open: bool,
38
39 locking_required: bool,
47}
48
49impl BasicLocker {
50 pub fn new(id: i64, lock_manager: Arc<LockManager>) -> Self {
56 BasicLocker {
57 id,
58 lock_manager,
59 locked_lsns: HashSet::new(),
60 lock_timeout_ms: 5000, default_no_wait: false,
62 is_open: true,
63 locking_required: true,
64 }
65 }
66
67 pub fn with_timeout(
69 id: i64,
70 lock_manager: Arc<LockManager>,
71 timeout_ms: u64,
72 ) -> Self {
73 BasicLocker {
74 id,
75 lock_manager,
76 locked_lsns: HashSet::new(),
77 lock_timeout_ms: timeout_ms,
78 default_no_wait: false,
79 is_open: true,
80 locking_required: true,
81 }
82 }
83
84 pub fn with_no_wait(id: i64, lock_manager: Arc<LockManager>) -> Self {
86 BasicLocker {
87 id,
88 lock_manager,
89 locked_lsns: HashSet::new(),
90 lock_timeout_ms: 5000,
91 default_no_wait: true,
92 is_open: true,
93 locking_required: true,
94 }
95 }
96
97 pub fn register_cursor(&mut self, is_internal_db_cursor: bool) {
105 self.locking_required = !is_internal_db_cursor;
106 }
107
108 pub fn release_all_locks(&mut self) -> Result<(), TxnError> {
112 for &lsn in &self.locked_lsns {
113 self.lock_manager.release(lsn, self.id)?;
114 }
115 self.locked_lsns.clear();
116 Ok(())
117 }
118
119 pub fn set_lock_timeout(&mut self, timeout_ms: u64) {
121 self.lock_timeout_ms = timeout_ms;
122 }
123
124 pub fn set_default_no_wait(&mut self, no_wait: bool) {
126 self.default_no_wait = no_wait;
127 }
128}
129
130impl Locker for BasicLocker {
131 fn id(&self) -> i64 {
132 self.id
133 }
134
135 fn lock(
136 &mut self,
137 lsn: u64,
138 lock_type: LockType,
139 non_blocking: bool,
140 ) -> Result<LockResult, TxnError> {
141 if !self.is_open {
142 return Err(TxnError::StateError("Locker is closed".to_string()));
143 }
144
145 let use_no_wait = non_blocking || self.default_no_wait;
147
148 let grant = self.lock_manager.lock(
150 lsn,
151 self.id,
152 lock_type,
153 use_no_wait,
154 false, )?;
156
157 if grant.is_granted() {
159 self.locked_lsns.insert(lsn);
160 }
161
162 Ok(LockResult::simple(grant))
164 }
165
166 fn release_lock(&mut self, lsn: u64) -> Result<(), TxnError> {
167 if self.locked_lsns.remove(&lsn) {
168 self.lock_manager.release(lsn, self.id)?;
169 }
170 Ok(())
171 }
172
173 fn owns_write_lock(&self, lsn: u64) -> bool {
174 self.lock_manager.is_owned_write_lock(lsn, self.id)
175 }
176
177 fn is_transactional(&self) -> bool {
178 false
179 }
180
181 fn lock_timeout_ms(&self) -> u64 {
182 self.lock_timeout_ms
183 }
184
185 fn default_no_wait(&self) -> bool {
186 self.default_no_wait
187 }
188
189 fn locking_required(&self) -> bool {
190 self.locking_required
191 }
192
193 fn operation_end(&mut self) -> Result<(), TxnError> {
194 self.release_all_locks()?;
195 self.close();
196 Ok(())
197 }
198
199 fn release_non_txn_locks(&mut self) -> Result<(), TxnError> {
200 self.release_all_locks()
201 }
202
203 fn non_txn_operation_end(&mut self) -> Result<(), TxnError> {
204 self.operation_end()
205 }
206
207 fn close(&mut self) {
208 self.is_open = false;
209 let _ = self.release_all_locks();
210 }
211
212 fn is_open(&self) -> bool {
213 self.is_open
214 }
215}
216
217impl Drop for BasicLocker {
218 fn drop(&mut self) {
219 let _ = self.release_all_locks();
221 }
222}
223
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a fresh lock manager together with a locker (id 1) that shares it.
    fn setup() -> (Arc<LockManager>, BasicLocker) {
        let manager = Arc::new(LockManager::new());
        let locker = BasicLocker::new(1, Arc::clone(&manager));
        (manager, locker)
    }

    /// A new locker is open, non-transactional and uses the default timeout.
    #[test]
    fn test_new() {
        let (_, fresh) = setup();
        assert_eq!(fresh.id(), 1);
        assert!(!fresh.is_transactional());
        assert!(fresh.is_open());
        assert_eq!(fresh.lock_timeout_ms(), 5000);
    }

    /// A granted write lock is owned until it is released.
    #[test]
    fn test_lock_and_release() {
        let (_, mut locker) = setup();

        let outcome = locker.lock(100, LockType::Write, false).unwrap();
        assert!(outcome.is_granted());
        assert!(locker.owns_write_lock(100));

        locker.release_lock(100).unwrap();
        assert!(!locker.owns_write_lock(100));
    }

    /// Releasing everything drops ownership of all write locks.
    #[test]
    fn test_release_all_locks() {
        let (_, mut locker) = setup();

        locker.lock(100, LockType::Write, false).unwrap();
        locker.lock(200, LockType::Write, false).unwrap();
        locker.lock(300, LockType::Read, false).unwrap();

        assert!(locker.owns_write_lock(100));
        assert!(locker.owns_write_lock(200));

        locker.release_all_locks().unwrap();

        assert!(!locker.owns_write_lock(100));
        assert!(!locker.owns_write_lock(200));
    }

    /// Closing the locker both marks it closed and releases its locks.
    #[test]
    fn test_close_releases_locks() {
        let (_, mut locker) = setup();

        locker.lock(100, LockType::Write, false).unwrap();
        assert!(locker.is_open());
        assert!(locker.owns_write_lock(100));

        locker.close();
        assert!(!locker.is_open());
        assert!(!locker.owns_write_lock(100));
    }

    /// `with_timeout` stores the requested timeout.
    #[test]
    fn test_with_timeout() {
        let manager = Arc::new(LockManager::new());
        let locker = BasicLocker::with_timeout(1, manager, 10000);
        assert_eq!(locker.lock_timeout_ms(), 10000);
    }

    /// `with_no_wait` turns the default no-wait flag on.
    #[test]
    fn test_with_no_wait() {
        let manager = Arc::new(LockManager::new());
        let locker = BasicLocker::with_no_wait(1, manager);
        assert!(locker.default_no_wait());
    }

    /// The timeout setter is reflected by the accessor.
    #[test]
    fn test_set_lock_timeout() {
        let (_, mut locker) = setup();
        locker.set_lock_timeout(20000);
        assert_eq!(locker.lock_timeout_ms(), 20000);
    }

    /// Locking through a closed locker fails with a state error mentioning "closed".
    #[test]
    fn test_lock_after_close_fails() {
        let (_, mut locker) = setup();
        locker.close();

        match locker.lock(100, LockType::Write, false) {
            Err(TxnError::StateError(msg)) => assert!(msg.contains("closed")),
            Err(_) => panic!("Expected StateError"),
            Ok(_) => panic!("assertion failed: result.is_err()"),
        }
    }
}