use std::sync::atomic::AtomicBool;
use std::sync::{Arc, LazyLock};
use std::time::Duration;
use fs4::fs_std::FileExt;
use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use polars_core::runtime::ASYNC;
use super::utils::FILE_CACHE_PREFIX;
pub(super) static GLOBAL_FILE_CACHE_LOCK: LazyLock<GlobalLock> = LazyLock::new(|| {
let path = FILE_CACHE_PREFIX.join(".process-lock");
let file = std::fs::OpenOptions::new()
.write(true)
.create(true)
.truncate(false)
.open(path)
.map_err(|err| {
panic!("failed to open/create global file cache lockfile: {err}");
})
.unwrap();
let at_bool = Arc::new(AtomicBool::new(false));
let access_tracker = AccessTracker(at_bool.clone());
let notify_lock_acquired = Arc::new(tokio::sync::Notify::new());
let notify_lock_acquired_2 = notify_lock_acquired.clone();
ASYNC.spawn(async move {
let at_bool = std::mem::ManuallyDrop::new(at_bool);
let access_tracker = at_bool.as_ref();
let notify_lock_acquired = notify_lock_acquired_2;
let verbose = false;
loop {
if verbose {
eprintln!("file cache background unlock: waiting for acquisition notification");
}
notify_lock_acquired.notified().await;
if verbose {
eprintln!("file cache background unlock: got acquisition notification");
}
loop {
if !access_tracker.swap(false, std::sync::atomic::Ordering::Relaxed) {
if let Some(unlocked_by_this_call) = GLOBAL_FILE_CACHE_LOCK.try_unlock() {
if unlocked_by_this_call && verbose {
eprintln!(
"file cache background unlock: unlocked global file cache lockfile"
);
}
break;
}
}
tokio::time::sleep(Duration::from_secs(3)).await;
}
}
});
GlobalLock {
inner: RwLock::new(GlobalLockData { file, state: None }),
access_tracker,
notify_lock_acquired,
}
});
pub(super) enum LockedState {
Shared,
#[allow(dead_code)]
Eviction,
}
#[allow(dead_code)]
pub(super) type GlobalFileCacheGuardAny<'a> = RwLockReadGuard<'a, GlobalLockData>;
pub(super) type GlobalFileCacheGuardExclusive<'a> = RwLockWriteGuard<'a, GlobalLockData>;
pub(super) struct GlobalLockData {
file: std::fs::File,
state: Option<LockedState>,
}
pub(super) struct GlobalLock {
inner: RwLock<GlobalLockData>,
access_tracker: AccessTracker,
notify_lock_acquired: Arc<tokio::sync::Notify>,
}
#[derive(Clone)]
struct AccessTracker(Arc<AtomicBool>);
impl Drop for AccessTracker {
fn drop(&mut self) {
self.0.store(true, std::sync::atomic::Ordering::Relaxed);
}
}
struct NotifyOnDrop(Arc<tokio::sync::Notify>);
impl Drop for NotifyOnDrop {
fn drop(&mut self) {
self.0.notify_one();
}
}
impl GlobalLock {
fn get_access_tracker(&self) -> AccessTracker {
let at = self.access_tracker.clone();
at.0.store(true, std::sync::atomic::Ordering::Relaxed);
at
}
fn try_unlock(&self) -> Option<bool> {
if let Some(mut this) = self.inner.try_write() {
if Arc::strong_count(&self.access_tracker.0) <= 2 {
return if this.state.take().is_some() {
FileExt::unlock(&this.file).unwrap();
Some(true)
} else {
Some(false)
};
}
}
None
}
pub(super) fn lock_shared(&self) -> GlobalFileCacheGuardAny<'_> {
let access_tracker = self.get_access_tracker();
let _notify_on_drop = NotifyOnDrop(self.notify_lock_acquired.clone());
{
let this = self.inner.read();
if let Some(LockedState::Shared) = this.state {
return this;
}
}
{
let mut this = self.inner.write();
if let Some(LockedState::Eviction) = this.state {
FileExt::unlock(&this.file).unwrap();
this.state = None;
}
if this.state.is_none() {
FileExt::lock_shared(&this.file).unwrap();
this.state = Some(LockedState::Shared);
}
}
debug_assert!(Arc::strong_count(&access_tracker.0) > 2);
{
let this = self.inner.read();
if let Some(LockedState::Eviction) = this.state {
drop(this);
return self.lock_shared();
}
assert!(
this.state.is_some(),
"impl error: global file cache lock was unlocked"
);
this
}
}
#[allow(dead_code)]
pub(super) fn try_lock_eviction(&self) -> Option<GlobalFileCacheGuardExclusive<'_>> {
let access_tracker = self.get_access_tracker();
if let Some(mut this) = self.inner.try_write() {
if
Arc::strong_count(&access_tracker.0) > 3 {
return None;
}
let _notify_on_drop = NotifyOnDrop(self.notify_lock_acquired.clone());
if let Some(ref state) = this.state {
if matches!(state, LockedState::Eviction) {
return Some(this);
}
}
if this.state.take().is_some() {
FileExt::unlock(&this.file).unwrap();
}
if this.file.try_lock_exclusive().is_ok() {
this.state = Some(LockedState::Eviction);
return Some(this);
}
}
None
}
}
#[cfg(test)]
mod tests {
use super::*;
fn make_test_lock() -> Arc<GlobalLock> {
let at_bool = Arc::new(AtomicBool::new(false));
std::mem::forget(at_bool.clone());
Arc::new(GlobalLock {
inner: RwLock::new(GlobalLockData {
file: tempfile::tempfile().unwrap(),
state: None,
}),
access_tracker: AccessTracker(at_bool),
notify_lock_acquired: Arc::new(tokio::sync::Notify::new()),
})
}
#[test]
fn try_unlock_when_not_locked() {
let lock = make_test_lock();
assert_eq!(lock.try_unlock(), Some(false));
}
#[test]
fn lock_shared_lifecycle() {
let lock = make_test_lock();
let guard = lock.lock_shared();
assert!(matches!(guard.state, Some(LockedState::Shared)));
drop(guard);
let guard = lock.lock_shared();
assert!(matches!(guard.state, Some(LockedState::Shared)));
drop(guard);
assert_eq!(lock.try_unlock(), Some(true));
assert_eq!(lock.try_unlock(), Some(false));
}
#[test]
fn concurrent_mixed_operations_stress() {
let lock = make_test_lock();
let mut handles = Vec::new();
for _ in 0..6 {
let lock = Arc::clone(&lock);
handles.push(std::thread::spawn(move || {
for _ in 0..200 {
let _guard = lock.lock_shared();
std::thread::yield_now();
}
}));
}
for _ in 0..2 {
let lock = Arc::clone(&lock);
handles.push(std::thread::spawn(move || {
for _ in 0..200 {
let _ = lock.try_unlock();
std::thread::yield_now();
}
}));
}
for _ in 0..2 {
let lock = Arc::clone(&lock);
handles.push(std::thread::spawn(move || {
for _ in 0..200 {
drop(lock.try_lock_eviction());
std::thread::yield_now();
}
}));
}
for h in handles {
h.join().unwrap();
}
}
}