use std::collections::HashMap;
use std::sync::{Arc, Weak};
use std::time::{Duration, Instant};
use super::{LockGuard, LockManager, key::validate_key};
/// Default minimum time between automatic purges of dead key entries;
/// `InternalMemoryLockManager::new` installs it as the initial purge interval.
const DEFAULT_PURGE_INTERVAL: Duration = Duration::from_secs(60);
/// Lock manager that keeps one async mutex per string key in process memory.
///
/// The map stores only `Weak` references, so the map by itself never keeps a
/// mutex alive; entries whose mutex has no strong reference left are removed
/// by `purge`.
pub struct InternalMemoryLockManager {
/// Key -> weakly held per-key mutex, guarded by a synchronous mutex.
keys: parking_lot::Mutex<HashMap<String, Weak<tokio::sync::Mutex<()>>>>,
/// Instant of the most recent automatic purge (set to "now" at construction).
last_purge: parking_lot::Mutex<Instant>,
/// Minimum elapsed time before another automatic purge is triggered.
purge_interval: Duration,
}
impl InternalMemoryLockManager {
pub fn new() -> Self {
Self {
keys: parking_lot::Mutex::new(HashMap::new()),
last_purge: parking_lot::Mutex::new(Instant::now()),
purge_interval: DEFAULT_PURGE_INTERVAL,
}
}
#[cfg(test)]
fn set_purge_interval(&mut self, interval: Duration) {
self.purge_interval = interval;
}
pub fn purge(&self) -> usize {
let mut keys = self.keys.lock();
let before = keys.len();
keys.retain(|_, weak| weak.strong_count() > 0);
before - keys.len()
}
fn key_mutex(&self, key: &str) -> Arc<tokio::sync::Mutex<()>> {
self.maybe_purge();
let mut keys = self.keys.lock();
if let Some(weak) = keys.get(key) {
if let Some(arc) = weak.upgrade() {
return arc;
}
}
let arc = Arc::new(tokio::sync::Mutex::new(()));
keys.insert(key.to_string(), Arc::downgrade(&arc));
arc
}
fn maybe_purge(&self) {
let now = Instant::now();
let mut last = self.last_purge.lock();
if now.duration_since(*last) >= self.purge_interval {
*last = now;
drop(last);
self.purge();
}
}
}
impl Default for InternalMemoryLockManager {
fn default() -> Self { Self::new() }
}
impl std::fmt::Debug for InternalMemoryLockManager {
    /// Prints only the type name; no field is included in the output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // A field-less `debug_struct(..).finish()` emits just the name, so
        // writing the name directly produces the same text.
        f.write_str("InternalMemoryLockManager")
    }
}
#[async_trait::async_trait]
impl LockManager for InternalMemoryLockManager {
    /// Attempts to take the lock for `key` without waiting.
    ///
    /// Returns `None` when the key fails validation (a warning is logged) or
    /// when the per-key mutex is currently held.
    async fn try_lock(&self, key: &str) -> Option<Box<dyn LockGuard>> {
        if let Err(reason) = validate_key(key) {
            log::warn!("rejecting lock key {:?}: {:#}", key, reason);
            return None;
        }
        match self.key_mutex(key).try_lock_owned() {
            Ok(held) => Some(Box::new(InternalMemoryGuard { _guard: held })),
            Err(_) => None,
        }
    }

    /// Waits up to `timeout` for the lock on `key`.
    ///
    /// # Errors
    ///
    /// Fails when the key does not pass validation or when the lock could
    /// not be acquired before the timeout expired.
    async fn lock(
        &self,
        key: &str,
        timeout: std::time::Duration,
    ) -> anyhow::Result<Box<dyn LockGuard>> {
        validate_key(key)?;
        let pending = self.key_mutex(key).lock_owned();
        let held = match tokio::time::timeout(timeout, pending).await {
            Ok(held) => held,
            Err(_) => anyhow::bail!(
                "timed out acquiring lock {:?} after {:?}", key, timeout,
            ),
        };
        Ok(Box::new(InternalMemoryGuard { _guard: held }))
    }
}
/// Guard handed back to callers; wraps the owned tokio guard of one per-key
/// mutex.
///
/// The field is never read: it exists so the mutex stays locked for as long
/// as this value is alive.
struct InternalMemoryGuard {
/// Owned guard for the per-key mutex; the lock is released when it drops.
_guard: tokio::sync::OwnedMutexGuard<()>,
}
// Marker impl: nothing is overridden here, so the guard relies entirely on
// whatever defaults the `LockGuard` trait (declared outside this file) provides.
impl LockGuard for InternalMemoryGuard {}
impl std::fmt::Debug for InternalMemoryGuard {
    /// Prints only the type name; the wrapped guard is not shown.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Same text a field-less `debug_struct(..).finish()` would emit.
        f.write_str("InternalMemoryGuard")
    }
}
#[cfg(all(test, not(target_arch = "wasm32")))]
mod test {
    use super::*;

    /// An explicit `purge` removes entries whose guard was dropped and keeps
    /// the entry whose guard is still held.
    #[tokio::test]
    async fn purge_drops_dead_entries_only() {
        let mut manager = InternalMemoryLockManager::new();
        // Long interval: only the explicit purge() calls below touch the map.
        manager.set_purge_interval(Duration::from_secs(3600));

        // Acquire and immediately release two locks, leaving dead entries.
        drop(manager.try_lock("dead-a").await.unwrap());
        drop(manager.try_lock("dead-b").await.unwrap());

        // This guard stays alive across both purges.
        let _held = manager.try_lock("alive").await.unwrap();

        let first_pass = manager.purge();
        assert_eq!(first_pass, 2, "should drop the two dead entries");
        let second_pass = manager.purge();
        assert_eq!(second_pass, 0, "second purge has nothing to do");
    }

    /// With a zero interval every lookup purges first, so the dead entry is
    /// already gone by the time `purge` is called explicitly.
    #[tokio::test]
    async fn auto_purges_on_try_lock_after_interval() {
        let mut manager = InternalMemoryLockManager::new();
        manager.set_purge_interval(Duration::from_millis(0));

        drop(manager.try_lock("dead").await.unwrap());

        // This acquisition triggers the automatic purge of "dead".
        let _held = manager.try_lock("next").await.unwrap();
        let leftover = manager.purge();
        assert_eq!(leftover, 0);
    }
}