#[cfg(not(target_arch = "wasm32"))]
use std::fs::{File, OpenOptions};
#[cfg(not(target_arch = "wasm32"))]
use std::io::{ErrorKind, Write};
use std::path::{Path, PathBuf};
#[cfg(not(target_arch = "wasm32"))]
use std::process;
use std::sync::Arc;
#[cfg(not(target_arch = "wasm32"))]
use fs2::FileExt;
use crate::error::{Error, Result};
/// Handle on the `LOCK` file that marks a database directory as in use.
///
/// The lock itself is taken by `acquire` and given up by `release`; on
/// non-wasm targets the value also releases on drop.
pub(crate) struct LockFile {
    /// Location of the lock file: the directory handed to `new`, joined with
    /// `LOCK_FILE_NAME`.
    path: PathBuf,

    /// Open handle to the lock file. `None` until `acquire` succeeds and again
    /// after `release`; the field does not exist on `wasm32`.
    #[cfg(not(target_arch = "wasm32"))]
    file: Option<File>,
}
impl LockFile {
    /// File name of the lock file created inside the database directory.
    pub const LOCK_FILE_NAME: &'static str = "LOCK";

    /// Builds a handle for the lock file at `<path>/LOCK`.
    ///
    /// This only computes the path: nothing is created on disk and no lock is
    /// taken until [`acquire`](Self::acquire) is called.
    pub fn new<P: AsRef<Path>>(path: P) -> Self {
        let lock_path = path.as_ref().join(Self::LOCK_FILE_NAME);
        Self {
            path: lock_path,
            #[cfg(not(target_arch = "wasm32"))]
            file: None,
        }
    }

    /// Takes an exclusive, non-blocking lock on the lock file and records the
    /// current process id in it.
    ///
    /// The file is created if it does not exist. Its previous content is
    /// replaced with `"<pid>\n"`, but only once the lock has been obtained.
    ///
    /// # Errors
    ///
    /// * [`Error::Other`] with an "already locked" message when the lock
    ///   attempt reports `WouldBlock`, i.e. some other handle holds the lock.
    /// * [`Error::Io`] for every other I/O failure (open, lock, truncate,
    ///   write).
    #[cfg(not(target_arch = "wasm32"))]
    pub fn acquire(&mut self) -> Result<()> {
        // Open WITHOUT truncating. Truncating at open time would erase the PID
        // written by the current holder even when we then fail to get the lock.
        let file = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(false)
            .open(&self.path)
            .map_err(|e| Error::Io(Arc::new(e)))?;

        file.try_lock_exclusive().map_err(|e| match e.kind() {
            ErrorKind::WouldBlock => Error::Other(format!(
                "Database at {} is already locked by another process",
                self.path.display()
            )),
            _ => Error::Io(Arc::new(e)),
        })?;

        // We own the lock now, so it is safe to discard stale content and
        // record our PID. `&File` implements `Write`, so the handle does not
        // need to be duplicated to obtain a writer.
        let content = format!("{}\n", process::id());
        file.set_len(0)
            .and_then(|()| (&file).write_all(content.as_bytes()))
            .map_err(|e| Error::Io(Arc::new(e)))?;

        // Keeping the handle alive is what keeps the lock held.
        self.file = Some(file);
        Ok(())
    }

    /// Gives up the lock by closing the stored file handle, if any.
    ///
    /// Calling this without a held lock is a no-op. The `LOCK` file itself is
    /// left on disk. This implementation never returns an error.
    #[cfg(not(target_arch = "wasm32"))]
    pub fn release(&mut self) -> Result<()> {
        // Dropping the handle closes it; the lock taken through it goes away
        // with it.
        drop(self.file.take());
        Ok(())
    }

    /// No-op on `wasm32`: no file is opened and no lock is taken.
    #[cfg(target_arch = "wasm32")]
    pub fn acquire(&mut self) -> Result<()> {
        Ok(())
    }

    /// No-op on `wasm32`: there is never a lock to release.
    #[cfg(target_arch = "wasm32")]
    pub fn release(&mut self) -> Result<()> {
        Ok(())
    }
}
#[cfg(not(target_arch = "wasm32"))]
impl Drop for LockFile {
    /// Releases the lock when the value goes out of scope.
    ///
    /// `drop` has no way to report failure, so the outcome of `release` is
    /// deliberately discarded.
    fn drop(&mut self) {
        self.release().ok();
    }
}
#[cfg(test)]
mod tests {
    use std::fs;
    use std::sync::{Arc, Barrier};

    use tempfile::TempDir;
    use test_log::test;

    use super::*;
    use crate::error::Error;
    use crate::lsm::TreeBuilder;

    /// Acquiring creates the `LOCK` file on disk; releasing afterwards succeeds.
    #[test]
    fn test_lock_acquisition_and_release() {
        let dir = TempDir::new().unwrap();
        let mut guard = LockFile::new(dir.path());

        assert!(guard.acquire().is_ok());
        let on_disk = dir.path().join(LockFile::LOCK_FILE_NAME);
        assert!(on_disk.exists());
        assert!(guard.release().is_ok());
    }

    /// A second handle on the same directory is refused until the first lets go.
    #[test]
    fn test_lock_contention() {
        let dir = TempDir::new().unwrap();
        let mut holder = LockFile::new(dir.path());
        let mut contender = LockFile::new(dir.path());

        assert!(holder.acquire().is_ok());
        assert!(contender.acquire().is_err());

        assert!(holder.release().is_ok());
        assert!(contender.acquire().is_ok());
    }

    /// The lock file content is the id of the process holding the lock.
    #[test]
    fn test_lock_with_process_id() {
        let dir = TempDir::new().unwrap();
        let mut guard = LockFile::new(dir.path());
        assert!(guard.acquire().is_ok());

        let written = fs::read_to_string(dir.path().join(LockFile::LOCK_FILE_NAME)).unwrap();
        assert_eq!(written.trim(), process::id().to_string());
    }

    /// Dropping a `LockFile` frees the lock for the next handle.
    #[test]
    fn test_drop_releases_lock() {
        let dir = TempDir::new().unwrap();

        let mut scoped = LockFile::new(dir.path());
        assert!(scoped.acquire().is_ok());
        drop(scoped);

        let mut successor = LockFile::new(dir.path());
        assert!(successor.acquire().is_ok());
    }

    /// A leftover `LOCK` file with some other PID in it does not prevent
    /// acquisition, and its content is overwritten with the current PID.
    #[test]
    fn test_stale_lockfile_doesnt_block() {
        let dir = TempDir::new().unwrap();
        let stale_path = dir.path().join(LockFile::LOCK_FILE_NAME);
        fs::write(&stale_path, "99999\n").unwrap();

        let mut guard = LockFile::new(dir.path());
        let acquired = guard.acquire();
        assert!(
            acquired.is_ok(),
            "Should be able to acquire lock even with stale LOCK file, as OS-level locks are auto-released"
        );

        let written = fs::read_to_string(&stale_path).unwrap();
        assert_eq!(
            written.trim(),
            process::id().to_string(),
            "Lock file should contain current PID, not stale PID"
        );

        guard.release().unwrap();
    }

    /// Opening a second tree on a directory that is already open fails with an
    /// "already locked" error; it succeeds once the first tree is closed.
    #[test(tokio::test)]
    async fn test_lock_file_prevents_concurrent_access() {
        let dir = TempDir::new().unwrap();
        let db_path = dir.path().to_path_buf();

        let first = TreeBuilder::new()
            .with_path(db_path.clone())
            .build()
            .expect("First tree should be created successfully");

        let second_attempt = TreeBuilder::new().with_path(db_path.clone()).build();
        assert!(second_attempt.is_err(), "Second tree should fail to acquire lock");
        match second_attempt {
            Err(Error::Other(msg)) => {
                assert!(msg.contains("already locked"), "Error should indicate database is locked");
            }
            _ => panic!("Expected a lock error"),
        }

        first.close().await.unwrap();

        let second = TreeBuilder::new()
            .with_path(db_path)
            .build()
            .expect("After closing first tree, second should succeed");
        second.close().await.unwrap();
    }

    /// Ten blocking tasks race to open the same directory after a shared
    /// barrier; exactly one of them may win.
    #[test(tokio::test(flavor = "multi_thread"))]
    async fn test_lock_file_multithreaded() {
        let dir = TempDir::new().unwrap();
        let db_path = dir.path().to_path_buf();
        let threads = 10;
        let barrier = Arc::new(Barrier::new(threads));

        // Spawn every contender up front; each one parks on the barrier so
        // that all `build` calls start together.
        let handles: Vec<_> = (0..threads)
            .map(|i| {
                let path = db_path.clone();
                let gate = Arc::clone(&barrier);
                tokio::task::spawn_blocking(move || {
                    gate.wait();
                    (i, TreeBuilder::new().with_path(path).build())
                })
            })
            .collect();

        let mut results = Vec::new();
        for handle in handles {
            results.push(handle.await.unwrap());
        }

        let success_count = results.iter().filter(|(_, outcome)| outcome.is_ok()).count();
        assert_eq!(success_count, 1, "Exactly one thread should acquire the lock");

        // Close whichever tree was opened so its lock is released cleanly.
        for tree in results.into_iter().filter_map(|(_, outcome)| outcome.ok()) {
            tree.close().await.unwrap();
        }
    }
}