use hashbrown::HashSet;
use std::sync::Arc;
use crate::lock_manager::LockManager;
use crate::locker::Locker;
use crate::{LockResult, LockType, TxnError};
/// Non-transactional locker (`is_transactional()` returns `false`) that
/// remembers which LSNs it has locked through a shared `LockManager`.
///
/// Every lock granted via `lock` is recorded so it can later be released
/// individually, in bulk, when the locker is closed, or when it is dropped.
pub struct BasicLocker {
/// Identifier reported by `id()` and handed to the lock manager on every
/// lock, release and ownership query.
id: i64,
/// Shared lock manager that all lock and release requests are delegated to.
lock_manager: Arc<LockManager>,
/// LSNs whose lock request was granted through this locker and which have
/// not yet been released through it.
locked_lsns: HashSet<u64>,
/// Lock timeout in milliseconds, exposed through `lock_timeout_ms()`.
/// NOTE(review): `lock` in this file does not forward this value to the
/// lock manager — confirm where the timeout is actually enforced.
lock_timeout_ms: u64,
/// When `true`, it is OR-ed with the per-call `non_blocking` flag, so every
/// request is forwarded with the no-wait flag set.
default_no_wait: bool,
/// `true` from construction until `close()` runs; `lock` returns a
/// `StateError` once this is `false`.
is_open: bool,
/// Starts as `true`; `register_cursor` overwrites it with the negation of
/// its `is_internal_db_cursor` argument.
locking_required: bool,
}
impl BasicLocker {
    /// Lock timeout, in milliseconds, used by constructors that take no
    /// explicit timeout.
    const DEFAULT_LOCK_TIMEOUT_MS: u64 = 5000;

    /// Creates an open locker with the default timeout (5000 ms) whose
    /// requests wait for locks unless the individual call asks not to.
    pub fn new(id: i64, lock_manager: Arc<LockManager>) -> Self {
        Self::with_timeout(id, lock_manager, Self::DEFAULT_LOCK_TIMEOUT_MS)
    }

    /// Creates an open locker like [`BasicLocker::new`] but with a
    /// caller-supplied lock timeout, in milliseconds.
    pub fn with_timeout(
        id: i64,
        lock_manager: Arc<LockManager>,
        timeout_ms: u64,
    ) -> Self {
        BasicLocker {
            id,
            lock_manager,
            locked_lsns: HashSet::new(),
            lock_timeout_ms: timeout_ms,
            default_no_wait: false,
            is_open: true,
            locking_required: true,
        }
    }

    /// Creates an open locker with the default timeout whose requests are
    /// non-blocking by default.
    pub fn with_no_wait(id: i64, lock_manager: Arc<LockManager>) -> Self {
        // Struct-update syntax (`..Self::new(..)`) is not usable here because
        // the type implements `Drop`, so the flag is flipped after the fact.
        let mut locker = Self::new(id, lock_manager);
        locker.default_no_wait = true;
        locker
    }

    /// Records whether locking is required for the cursor being registered:
    /// it is required exactly when the cursor is *not* an internal-database
    /// cursor. Each call overwrites the previous setting.
    pub fn register_cursor(&mut self, is_internal_db_cursor: bool) {
        self.locking_required = !is_internal_db_cursor;
    }

    /// Releases every lock tracked by this locker.
    ///
    /// Every tracked LSN is attempted, even when an earlier release fails, so
    /// one bad LSN cannot leave the remaining locks held. LSNs that were
    /// released successfully are forgotten; LSNs whose release failed stay
    /// tracked so that a later call can retry them.
    ///
    /// # Errors
    ///
    /// Returns the first error reported by the lock manager. The set is
    /// visited in unspecified order, so which failure counts as "first" is
    /// arbitrary when several releases fail.
    pub fn release_all_locks(&mut self) -> Result<(), TxnError> {
        let lock_manager = &self.lock_manager;
        let locker_id = self.id;
        // Funnel the manager call through `?` so its error is converted into
        // `TxnError` exactly as it was before this method collected errors.
        let release_one = |lsn: u64| -> Result<(), TxnError> {
            lock_manager.release(lsn, locker_id)?;
            Ok(())
        };

        let mut first_error: Option<TxnError> = None;
        self.locked_lsns.retain(|&lsn| match release_one(lsn) {
            // Released: stop tracking it.
            Ok(()) => false,
            // Still held as far as we know: keep it for a later retry.
            Err(err) => {
                if first_error.is_none() {
                    first_error = Some(err);
                }
                true
            }
        });

        match first_error {
            Some(err) => Err(err),
            None => Ok(()),
        }
    }

    /// Replaces the lock timeout, in milliseconds.
    pub fn set_lock_timeout(&mut self, timeout_ms: u64) {
        self.lock_timeout_ms = timeout_ms;
    }

    /// Sets whether lock requests are non-blocking by default.
    pub fn set_default_no_wait(&mut self, no_wait: bool) {
        self.default_no_wait = no_wait;
    }
}
impl Locker for BasicLocker {
    /// Returns the identifier this locker presents to the lock manager.
    fn id(&self) -> i64 {
        self.id
    }

    /// Requests a lock on `lsn` and starts tracking it if it is granted.
    ///
    /// The request is forwarded with the no-wait flag set when either
    /// `non_blocking` is `true` or the locker's default is no-wait.
    ///
    /// # Errors
    ///
    /// Returns `TxnError::StateError` if the locker has been closed, or
    /// whatever error the lock manager reports.
    fn lock(
        &mut self,
        lsn: u64,
        lock_type: LockType,
        non_blocking: bool,
    ) -> Result<LockResult, TxnError> {
        if !self.is_open {
            return Err(TxnError::StateError(String::from("Locker is closed")));
        }

        let no_wait = self.default_no_wait || non_blocking;
        // The trailing `false` is a flag defined by `LockManager::lock`; its
        // meaning is not visible from this file.
        let outcome = self
            .lock_manager
            .lock(lsn, self.id, lock_type, no_wait, false)?;

        // Only granted locks are remembered for later release.
        if outcome.is_granted() {
            self.locked_lsns.insert(lsn);
        }
        Ok(LockResult::simple(outcome))
    }

    /// Releases the lock on `lsn` if this locker is tracking it; otherwise
    /// does nothing and reports success.
    ///
    /// The LSN is removed from the tracked set before the lock manager is
    /// asked to release it.
    fn release_lock(&mut self, lsn: u64) -> Result<(), TxnError> {
        let was_tracked = self.locked_lsns.remove(&lsn);
        if !was_tracked {
            return Ok(());
        }
        self.lock_manager.release(lsn, self.id)?;
        Ok(())
    }

    /// Delegates to `LockManager::is_owned_write_lock` for this locker's id.
    fn owns_write_lock(&self, lsn: u64) -> bool {
        self.lock_manager.is_owned_write_lock(lsn, self.id)
    }

    /// Always `false`: this locker is not transactional.
    fn is_transactional(&self) -> bool {
        false
    }

    /// Returns the configured lock timeout in milliseconds.
    fn lock_timeout_ms(&self) -> u64 {
        self.lock_timeout_ms
    }

    /// Returns whether lock requests are non-blocking by default.
    fn default_no_wait(&self) -> bool {
        self.default_no_wait
    }

    /// Returns the flag last set by `register_cursor` (initially `true`).
    fn locking_required(&self) -> bool {
        self.locking_required
    }

    /// Releases all tracked locks and, if that succeeds, closes the locker.
    ///
    /// When the release fails the error is returned and the locker is left
    /// open.
    fn operation_end(&mut self) -> Result<(), TxnError> {
        let released = self.release_all_locks();
        if released.is_ok() {
            self.close();
        }
        released
    }

    /// Releases all tracked locks; every lock held here is non-transactional.
    fn release_non_txn_locks(&mut self) -> Result<(), TxnError> {
        self.release_all_locks()
    }

    /// Same as `operation_end`.
    fn non_txn_operation_end(&mut self) -> Result<(), TxnError> {
        self.operation_end()
    }

    /// Marks the locker closed, then makes a best-effort attempt to release
    /// all tracked locks. A release error is discarded because this method
    /// has no way to report it.
    fn close(&mut self) {
        self.is_open = false;
        self.release_all_locks().ok();
    }

    /// Returns `true` until `close` has been called.
    fn is_open(&self) -> bool {
        self.is_open
    }
}
impl Drop for BasicLocker {
    /// Makes a final, best-effort attempt to release every tracked lock.
    ///
    /// A destructor cannot report failure, so any error from the release is
    /// discarded.
    fn drop(&mut self) {
        self.release_all_locks().ok();
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a fresh lock manager and a default-configured locker with id 1.
    fn setup() -> (Arc<LockManager>, BasicLocker) {
        let lm = Arc::new(LockManager::new());
        let locker = BasicLocker::new(1, lm.clone());
        (lm, locker)
    }

    #[test]
    fn test_new() {
        let (_, locker) = setup();
        assert_eq!(locker.id(), 1);
        assert!(!locker.is_transactional());
        assert!(locker.is_open());
        assert_eq!(locker.lock_timeout_ms(), 5000);
    }

    #[test]
    fn test_lock_and_release() {
        let (_, mut locker) = setup();
        let result = locker.lock(100, LockType::Write, false).unwrap();
        assert!(result.is_granted());
        assert!(locker.owns_write_lock(100));
        locker.release_lock(100).unwrap();
        assert!(!locker.owns_write_lock(100));
    }

    #[test]
    fn test_release_all_locks() {
        let (_, mut locker) = setup();
        locker.lock(100, LockType::Write, false).unwrap();
        locker.lock(200, LockType::Write, false).unwrap();
        locker.lock(300, LockType::Read, false).unwrap();
        assert!(locker.owns_write_lock(100));
        assert!(locker.owns_write_lock(200));
        locker.release_all_locks().unwrap();
        assert!(!locker.owns_write_lock(100));
        assert!(!locker.owns_write_lock(200));
    }

    #[test]
    fn test_close_releases_locks() {
        let (_, mut locker) = setup();
        locker.lock(100, LockType::Write, false).unwrap();
        assert!(locker.is_open());
        assert!(locker.owns_write_lock(100));
        locker.close();
        assert!(!locker.is_open());
        assert!(!locker.owns_write_lock(100));
    }

    #[test]
    fn test_with_timeout() {
        let lm = Arc::new(LockManager::new());
        let locker = BasicLocker::with_timeout(1, lm, 10000);
        assert_eq!(locker.lock_timeout_ms(), 10000);
    }

    #[test]
    fn test_with_no_wait() {
        let lm = Arc::new(LockManager::new());
        let locker = BasicLocker::with_no_wait(1, lm);
        assert!(locker.default_no_wait());
    }

    #[test]
    fn test_set_lock_timeout() {
        let (_, mut locker) = setup();
        locker.set_lock_timeout(20000);
        assert_eq!(locker.lock_timeout_ms(), 20000);
    }

    #[test]
    fn test_lock_after_close_fails() {
        let (_, mut locker) = setup();
        locker.close();
        let result = locker.lock(100, LockType::Write, false);
        assert!(result.is_err());
        match result.unwrap_err() {
            TxnError::StateError(msg) => assert!(msg.contains("closed")),
            _ => panic!("Expected StateError"),
        }
    }

    // The default constructor waits for locks; the setter flips that in
    // both directions.
    #[test]
    fn test_set_default_no_wait() {
        let (_, mut locker) = setup();
        assert!(!locker.default_no_wait());
        locker.set_default_no_wait(true);
        assert!(locker.default_no_wait());
        locker.set_default_no_wait(false);
        assert!(!locker.default_no_wait());
    }

    // Locking is required by default and only switched off while an
    // internal-database cursor is registered.
    #[test]
    fn test_register_cursor_sets_locking_required() {
        let (_, mut locker) = setup();
        assert!(locker.locking_required());
        locker.register_cursor(true);
        assert!(!locker.locking_required());
        locker.register_cursor(false);
        assert!(locker.locking_required());
    }

    // Releasing an LSN this locker never locked succeeds without effect.
    #[test]
    fn test_release_untracked_lock_is_noop() {
        let (_, mut locker) = setup();
        locker.release_lock(42).unwrap();
        assert!(locker.is_open());
    }

    // Ending the operation releases held locks and closes the locker.
    #[test]
    fn test_operation_end_releases_and_closes() {
        let (_, mut locker) = setup();
        locker.lock(100, LockType::Write, false).unwrap();
        assert!(locker.owns_write_lock(100));
        locker.operation_end().unwrap();
        assert!(!locker.is_open());
        assert!(!locker.owns_write_lock(100));
    }
}