use async_trait::async_trait;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use std::time::Duration;
use uuid::Uuid;
use super::error::{StorageError, StorageResult};
/// Metadata describing a single lock: what is locked, who holds it, and for how long.
///
/// Derives `Serialize`/`Deserialize`, so values can be round-tripped through serde.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StorageLock {
/// Key naming the locked resource.
pub key: String,
/// Identifier of the party holding the lock.
pub holder: String,
/// UTC instant at which the lock was created; `new` sets this to `Utc::now()`.
pub acquired_at: DateTime<Utc>,
/// Lifetime of the lock, measured from `acquired_at`.
pub ttl: Duration,
/// Random UUID v4 string generated by `new`.
/// NOTE(review): presumably distinguishes successive acquisitions of the same key — confirm against the backends.
pub token: String,
}
impl StorageLock {
pub fn new(key: String, holder: String, ttl: Duration) -> Self {
Self {
key,
holder,
acquired_at: Utc::now(),
ttl,
token: Uuid::new_v4().to_string(),
}
}
pub fn is_expired(&self) -> bool {
match chrono::Duration::from_std(self.ttl) {
Ok(duration) => {
let expiry = self.acquired_at + duration;
Utc::now() > expiry
}
Err(_) => {
true
}
}
}
pub fn remaining_ttl(&self) -> Option<Duration> {
match chrono::Duration::from_std(self.ttl) {
Ok(duration) => {
let expiry = self.acquired_at + duration;
let remaining = expiry - Utc::now();
if remaining > chrono::Duration::zero() {
remaining.to_std().ok()
} else {
None
}
}
Err(_) => {
None
}
}
}
}
/// Handle to a held lock, handed out as a boxed trait object (`Box<dyn StorageLockGuard>`).
#[async_trait]
pub trait StorageLockGuard: Send + Sync {
/// Returns the [`StorageLock`] metadata associated with this guard.
fn lock_info(&self) -> &StorageLock;
/// Consumes the boxed guard and releases the lock it represents.
async fn release(self: Box<Self>) -> StorageResult<()>;
/// Lengthens the lock's time-to-live by `additional_ttl`.
async fn extend(&mut self, additional_ttl: Duration) -> StorageResult<()>;
/// Reports whether the lock is still considered held through this guard.
/// The exact criteria are left to each implementation.
async fn is_valid(&self) -> StorageResult<bool>;
}
/// [`StorageLockGuard`] backed by a lock file on the local filesystem.
pub struct FileLockGuard {
// In-memory metadata for the held lock.
lock: StorageLock,
// Path of the lock file; the file is deleted when the guard is released or dropped.
lock_file: std::path::PathBuf,
}
impl FileLockGuard {
pub fn new(lock: StorageLock, lock_file: std::path::PathBuf) -> Self {
Self { lock, lock_file }
}
}
#[async_trait]
impl StorageLockGuard for FileLockGuard {
    /// Returns the in-memory metadata of the held lock.
    fn lock_info(&self) -> &StorageLock {
        &self.lock
    }

    /// Releases the lock by deleting its lock file.
    ///
    /// # Errors
    /// Returns a lock error carrying the underlying I/O error message when the
    /// file cannot be removed.
    async fn release(self: Box<Self>) -> StorageResult<()> {
        tokio::fs::remove_file(&self.lock_file)
            .await
            .map_err(|e| StorageError::lock(format!("Failed to release lock: {}", e)))?;
        Ok(())
    }

    /// Adds `additional_ttl` to the in-memory TTL of the lock.
    ///
    /// Only the in-memory [`StorageLock`] is updated; nothing is written to the
    /// lock file. The TTL is left untouched when the call fails.
    ///
    /// # Errors
    /// Propagates the I/O error if the lock file's metadata cannot be read
    /// (for example because the file no longer exists), and returns a lock
    /// error if the extended TTL would overflow `Duration`.
    async fn extend(&mut self, additional_ttl: Duration) -> StorageResult<()> {
        // Verify the lock file is still there *before* mutating any state, so a
        // failed extension does not leave a silently lengthened TTL behind.
        tokio::fs::metadata(&self.lock_file).await?;

        // `Duration += Duration` panics on overflow; surface it as an error instead.
        let extended = self.lock.ttl.checked_add(additional_ttl).ok_or_else(|| {
            StorageError::lock(format!(
                "Failed to extend lock: TTL overflow when adding {:?}",
                additional_ttl
            ))
        })?;
        self.lock.ttl = extended;
        Ok(())
    }

    /// Returns `Ok(true)` while the lock has not expired and its lock file exists.
    async fn is_valid(&self) -> StorageResult<bool> {
        if self.lock.is_expired() {
            return Ok(false);
        }
        // Same check as `Path::exists` (metadata lookup succeeded), but routed
        // through tokio so the async executor thread is not blocked on the stat.
        Ok(tokio::fs::metadata(&self.lock_file).await.is_ok())
    }
}
impl Drop for FileLockGuard {
    /// Best-effort cleanup: deletes the lock file when the guard goes out of scope.
    ///
    /// Errors are deliberately ignored because `drop` cannot report them; a
    /// missing file (e.g. already removed by `release`) is the common case.
    fn drop(&mut self) {
        // Remove unconditionally instead of `exists()` followed by `remove_file`:
        // the pre-check costs an extra stat and leaves a window between check
        // and removal, while a failed removal is ignored either way.
        let _ = std::fs::remove_file(&self.lock_file);
    }
}
/// Front end that adds retry and wait-with-timeout behaviour on top of a [`LockBackend`].
pub struct LockManager {
// Shared backend whose `try_acquire` performs the actual acquisition attempts.
backend: Arc<dyn LockBackend>,
}
impl LockManager {
    /// Creates a manager that delegates acquisition attempts to `backend`.
    pub fn new(backend: Arc<dyn LockBackend>) -> Self {
        Self { backend }
    }

    /// Tries to acquire `key` for `holder`, retrying on contention.
    ///
    /// Makes at most `max_retries + 1` attempts, sleeping `retry_delay` after
    /// every attempt that fails with a conflict error (`is_conflict()`).
    ///
    /// # Errors
    /// Returns the backend error immediately if it is not a conflict, or the
    /// last conflict error once the retries are exhausted.
    pub async fn acquire_with_retry(
        &self,
        key: &str,
        holder: &str,
        ttl: Duration,
        max_retries: u32,
        retry_delay: Duration,
    ) -> StorageResult<Box<dyn StorageLockGuard>> {
        let mut retries_left = max_retries;
        loop {
            match self.backend.try_acquire(key, holder, ttl).await {
                Ok(guard) => return Ok(guard),
                Err(e) if e.is_conflict() && retries_left > 0 => retries_left -= 1,
                Err(e) => return Err(e),
            }
            tokio::time::sleep(retry_delay).await;
        }
    }

    /// Polls the backend until `key` is acquired or `timeout` elapses.
    ///
    /// At least one acquisition attempt is always made, even with a zero
    /// `timeout`. Between attempts the task sleeps for 100 ms, shortened to the
    /// time remaining so the deadline is not overshot, and one final attempt is
    /// made at the deadline. A `timeout` too large to be added to the current
    /// instant is treated as "no deadline".
    ///
    /// # Errors
    /// Returns `StorageError::Timeout(timeout)` if the lock is still contended
    /// at the deadline, or the backend error for any non-conflict failure.
    pub async fn wait_for_lock(
        &self,
        key: &str,
        holder: &str,
        ttl: Duration,
        timeout: Duration,
    ) -> StorageResult<Box<dyn StorageLockGuard>> {
        const POLL_INTERVAL: Duration = Duration::from_millis(100);

        // `Instant + Duration` panics on overflow; `None` here means unbounded wait.
        let deadline = tokio::time::Instant::now().checked_add(timeout);

        loop {
            match self.backend.try_acquire(key, holder, ttl).await {
                Ok(guard) => return Ok(guard),
                Err(e) if e.is_conflict() => {}
                Err(e) => return Err(e),
            }

            let pause = match deadline {
                Some(deadline) => {
                    let remaining =
                        deadline.saturating_duration_since(tokio::time::Instant::now());
                    if remaining.is_zero() {
                        return Err(StorageError::Timeout(timeout));
                    }
                    // Never sleep past the deadline.
                    remaining.min(POLL_INTERVAL)
                }
                None => POLL_INTERVAL,
            };
            tokio::time::sleep(pause).await;
        }
    }
}
/// Storage-specific locking primitive that `LockManager` builds on.
#[async_trait]
pub trait LockBackend: Send + Sync {
/// Makes a single attempt to acquire `key` for `holder` with lifetime `ttl`.
///
/// On success returns a guard for the held lock. `LockManager` treats an error
/// whose `is_conflict()` is true as contention and retries it; any other error
/// is returned to the caller unchanged.
async fn try_acquire(
&self,
key: &str,
holder: &str,
ttl: Duration,
) -> StorageResult<Box<dyn StorageLockGuard>>;
/// Reports whether a lock for `key` currently exists.
/// NOTE(review): whether expired locks count as existing is implementation-defined — confirm per backend.
async fn exists(&self, key: &str) -> StorageResult<bool>;
/// Releases the lock for `key`.
/// NOTE(review): presumably does so regardless of the current holder — confirm per backend.
async fn force_release(&self, key: &str) -> StorageResult<()>;
/// Returns the metadata of the locks known to this backend.
async fn list_locks(&self) -> StorageResult<Vec<StorageLock>>;
}