use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering};
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use parking_lot::{Mutex, RwLock};
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ClaimResult {
Success {
claim_token: ClaimToken,
},
AlreadyClaimed {
owner: String,
expires_at: u64,
},
TookOver {
previous_owner: String,
claim_token: ClaimToken,
},
NotFound,
Error(String),
}
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ClaimToken {
pub queue_id: String,
pub task_id: String,
pub owner: String,
pub instance: u64,
pub created_at: u64,
pub expires_at: u64,
}
impl ClaimToken {
pub fn is_valid(&self, now_millis: u64) -> bool {
now_millis < self.expires_at
}
pub fn remaining_ms(&self, now_millis: u64) -> u64 {
self.expires_at.saturating_sub(now_millis)
}
}
#[derive(Debug, Clone)]
struct ClaimEntry {
owner: String,
instance: u64,
claimed_at: u64,
expires_at: u64,
claim_count: u32,
}
impl ClaimEntry {
fn is_expired(&self, now_millis: u64) -> bool {
now_millis >= self.expires_at
}
fn to_token(&self, queue_id: &str, task_id: &str) -> ClaimToken {
ClaimToken {
queue_id: queue_id.to_string(),
task_id: task_id.to_string(),
owner: self.owner.clone(),
instance: self.instance,
created_at: self.claimed_at,
expires_at: self.expires_at,
}
}
}
pub struct AtomicClaimManager {
claims: RwLock<HashMap<String, HashMap<String, ClaimEntry>>>,
instance_counter: AtomicU64,
stats: RwLock<ClaimStats>,
claim_locks: RwLock<HashMap<String, std::sync::Arc<Mutex<()>>>>,
}
#[derive(Debug, Clone, Default)]
pub struct ClaimStats {
pub attempts: u64,
pub successes: u64,
pub contentions: u64,
pub takeovers: u64,
pub acks: u64,
pub nacks: u64,
pub expirations: u64,
}
impl Default for AtomicClaimManager {
fn default() -> Self {
Self::new()
}
}
impl AtomicClaimManager {
pub fn new() -> Self {
Self {
claims: RwLock::new(HashMap::new()),
instance_counter: AtomicU64::new(1),
stats: RwLock::new(ClaimStats::default()),
claim_locks: RwLock::new(HashMap::new()),
}
}
fn get_claim_lock(&self, queue_id: &str, task_id: &str) -> std::sync::Arc<Mutex<()>> {
let key = format!("{}:{}", queue_id, task_id);
{
let locks = self.claim_locks.read();
if let Some(lock) = locks.get(&key) {
return lock.clone();
}
}
let mut locks = self.claim_locks.write();
locks
.entry(key)
.or_insert_with(|| std::sync::Arc::new(Mutex::new(())))
.clone()
}
pub fn claim(
&self,
queue_id: &str,
task_id: &str,
owner: &str,
lease_duration_ms: u64,
) -> ClaimResult {
let now = current_time_millis();
let lock = self.get_claim_lock(queue_id, task_id);
let _guard = lock.lock();
self.stats.write().attempts += 1;
let mut claims = self.claims.write();
let queue_claims = claims
.entry(queue_id.to_string())
.or_insert_with(HashMap::new);
if let Some(existing) = queue_claims.get(task_id) {
if existing.owner == owner {
let instance = self.instance_counter.fetch_add(1, AtomicOrdering::SeqCst);
let new_entry = ClaimEntry {
owner: owner.to_string(),
instance,
claimed_at: now,
expires_at: now + lease_duration_ms,
claim_count: existing.claim_count + 1,
};
let token = new_entry.to_token(queue_id, task_id);
queue_claims.insert(task_id.to_string(), new_entry);
self.stats.write().successes += 1;
return ClaimResult::Success { claim_token: token };
}
if !existing.is_expired(now) {
self.stats.write().contentions += 1;
return ClaimResult::AlreadyClaimed {
owner: existing.owner.clone(),
expires_at: existing.expires_at,
};
}
let previous_owner = existing.owner.clone();
let instance = self.instance_counter.fetch_add(1, AtomicOrdering::SeqCst);
let new_entry = ClaimEntry {
owner: owner.to_string(),
instance,
claimed_at: now,
expires_at: now + lease_duration_ms,
claim_count: existing.claim_count + 1,
};
let token = new_entry.to_token(queue_id, task_id);
queue_claims.insert(task_id.to_string(), new_entry);
self.stats.write().takeovers += 1;
return ClaimResult::TookOver {
previous_owner,
claim_token: token,
};
}
let instance = self.instance_counter.fetch_add(1, AtomicOrdering::SeqCst);
let entry = ClaimEntry {
owner: owner.to_string(),
instance,
claimed_at: now,
expires_at: now + lease_duration_ms,
claim_count: 1,
};
let token = entry.to_token(queue_id, task_id);
queue_claims.insert(task_id.to_string(), entry);
self.stats.write().successes += 1;
ClaimResult::Success { claim_token: token }
}
pub fn release(&self, token: &ClaimToken) -> Result<(), String> {
let _now = current_time_millis();
let lock = self.get_claim_lock(&token.queue_id, &token.task_id);
let _guard = lock.lock();
let mut claims = self.claims.write();
if let Some(queue_claims) = claims.get_mut(&token.queue_id) {
if let Some(existing) = queue_claims.get(&token.task_id) {
if existing.instance != token.instance {
return Err("Stale claim token".to_string());
}
if existing.owner != token.owner {
return Err("Not claim owner".to_string());
}
queue_claims.remove(&token.task_id);
self.stats.write().acks += 1;
return Ok(());
}
}
Err("Claim not found".to_string())
}
pub fn extend(
&self,
queue_id: &str,
token: &ClaimToken,
additional_ms: u64,
) -> Result<ClaimToken, String> {
let _now = current_time_millis();
let lock = self.get_claim_lock(queue_id, &token.task_id);
let _guard = lock.lock();
let mut claims = self.claims.write();
if let Some(queue_claims) = claims.get_mut(queue_id) {
if let Some(existing) = queue_claims.get_mut(&token.task_id) {
if existing.instance != token.instance {
return Err("Stale claim token".to_string());
}
if existing.owner != token.owner {
return Err("Not claim owner".to_string());
}
existing.expires_at += additional_ms;
return Ok(existing.to_token(queue_id, &token.task_id));
}
}
Err("Claim not found".to_string())
}
pub fn is_claimed(&self, queue_id: &str, task_id: &str) -> Option<(String, u64)> {
let now = current_time_millis();
let claims = self.claims.read();
if let Some(queue_claims) = claims.get(queue_id) {
if let Some(entry) = queue_claims.get(task_id) {
if !entry.is_expired(now) {
return Some((entry.owner.clone(), entry.expires_at));
}
}
}
None
}
pub fn get_token(&self, queue_id: &str, task_id: &str, owner: &str) -> Option<ClaimToken> {
let now = current_time_millis();
let claims = self.claims.read();
if let Some(queue_claims) = claims.get(queue_id) {
if let Some(entry) = queue_claims.get(task_id) {
if !entry.is_expired(now) && entry.owner == owner {
return Some(entry.to_token(queue_id, task_id));
}
}
}
None
}
pub fn cleanup_expired(&self) -> usize {
let now = current_time_millis();
let mut cleaned = 0;
let mut claims = self.claims.write();
for queue_claims in claims.values_mut() {
queue_claims.retain(|_, entry| {
if entry.is_expired(now) {
cleaned += 1;
false
} else {
true
}
});
}
if cleaned > 0 {
self.stats.write().expirations += cleaned as u64;
}
cleaned
}
pub fn stats(&self) -> ClaimStats {
self.stats.read().clone()
}
pub fn active_claims(&self, queue_id: &str) -> usize {
let now = current_time_millis();
self.claims
.read()
.get(queue_id)
.map(|q| q.values().filter(|e| !e.is_expired(now)).count())
.unwrap_or(0)
}
pub fn list_claims(&self, queue_id: &str) -> Vec<ClaimToken> {
let now = current_time_millis();
self.claims
.read()
.get(queue_id)
.map(|q| {
q.iter()
.filter(|(_, e)| !e.is_expired(now))
.map(|(task_id, e)| e.to_token(queue_id, task_id))
.collect()
})
.unwrap_or_default()
}
}
pub trait CompareAndSwap {
type Error: std::fmt::Debug;
fn insert_if_absent(&self, key: &[u8], value: &[u8]) -> Result<bool, Self::Error>;
fn compare_and_set(
&self,
key: &[u8],
expected: &[u8],
new_value: &[u8],
) -> Result<bool, Self::Error>;
fn delete_if_match(&self, key: &[u8], expected: &[u8]) -> Result<bool, Self::Error>;
}
#[derive(Debug, Clone)]
pub struct LeaseConfig {
pub default_lease_ms: u64,
pub min_lease_ms: u64,
pub max_lease_ms: u64,
pub cleanup_interval_ms: u64,
pub max_extensions: u32,
}
impl Default for LeaseConfig {
fn default() -> Self {
Self {
default_lease_ms: 30_000, min_lease_ms: 1_000, max_lease_ms: 3_600_000, cleanup_interval_ms: 5_000, max_extensions: 10,
}
}
}
pub struct LeaseManager {
claim_manager: AtomicClaimManager,
config: LeaseConfig,
last_cleanup: RwLock<Instant>,
extension_counts: RwLock<HashMap<String, u32>>,
}
impl LeaseManager {
pub fn new(config: LeaseConfig) -> Self {
Self {
claim_manager: AtomicClaimManager::new(),
config,
last_cleanup: RwLock::new(Instant::now()),
extension_counts: RwLock::new(HashMap::new()),
}
}
pub fn acquire(
&self,
queue_id: &str,
task_id: &str,
owner: &str,
lease_ms: Option<u64>,
) -> ClaimResult {
self.maybe_cleanup();
let lease_duration = lease_ms
.unwrap_or(self.config.default_lease_ms)
.clamp(self.config.min_lease_ms, self.config.max_lease_ms);
self.claim_manager
.claim(queue_id, task_id, owner, lease_duration)
}
pub fn release(&self, queue_id: &str, token: &ClaimToken) -> Result<(), String> {
{
let key = format!("{}:{}", queue_id, token.task_id);
self.extension_counts.write().remove(&key);
}
self.claim_manager.release(token)
}
pub fn extend(
&self,
queue_id: &str,
token: &ClaimToken,
additional_ms: u64,
) -> Result<ClaimToken, String> {
let key = format!("{}:{}", queue_id, token.task_id);
{
let counts = self.extension_counts.read();
if let Some(&count) = counts.get(&key) {
if count >= self.config.max_extensions {
return Err(format!(
"Maximum extensions ({}) reached",
self.config.max_extensions
));
}
}
}
let additional = additional_ms.clamp(self.config.min_lease_ms, self.config.max_lease_ms);
let result = self.claim_manager.extend(queue_id, token, additional)?;
{
let mut counts = self.extension_counts.write();
*counts.entry(key).or_insert(0) += 1;
}
Ok(result)
}
pub fn stats(&self) -> ClaimStats {
self.claim_manager.stats()
}
pub fn cleanup(&self) -> usize {
*self.last_cleanup.write() = Instant::now();
self.claim_manager.cleanup_expired()
}
fn maybe_cleanup(&self) {
let should_cleanup = {
let last = self.last_cleanup.read();
last.elapsed() > Duration::from_millis(self.config.cleanup_interval_ms)
};
if should_cleanup {
self.cleanup();
}
}
}
fn current_time_millis() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as u64
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::Arc;
use std::thread;
#[test]
fn test_claim_success() {
let manager = AtomicClaimManager::new();
match manager.claim("queue1", "task1", "worker1", 30_000) {
ClaimResult::Success { claim_token } => {
assert_eq!(claim_token.task_id, "task1");
assert_eq!(claim_token.owner, "worker1");
}
_ => panic!("Expected success"),
}
}
#[test]
fn test_claim_contention() {
let manager = AtomicClaimManager::new();
let result1 = manager.claim("queue1", "task1", "worker1", 30_000);
assert!(matches!(result1, ClaimResult::Success { .. }));
let result2 = manager.claim("queue1", "task1", "worker2", 30_000);
match result2 {
ClaimResult::AlreadyClaimed { owner, .. } => {
assert_eq!(owner, "worker1");
}
_ => panic!("Expected AlreadyClaimed"),
}
}
#[test]
fn test_claim_takeover() {
let manager = AtomicClaimManager::new();
let result1 = manager.claim("queue1", "task1", "worker1", 1);
assert!(matches!(result1, ClaimResult::Success { .. }));
thread::sleep(Duration::from_millis(10));
let result2 = manager.claim("queue1", "task1", "worker2", 30_000);
match result2 {
ClaimResult::TookOver { previous_owner, .. } => {
assert_eq!(previous_owner, "worker1");
}
_ => panic!("Expected TookOver, got {:?}", result2),
}
}
#[test]
fn test_concurrent_claims() {
let manager = Arc::new(AtomicClaimManager::new());
let successes = Arc::new(std::sync::atomic::AtomicUsize::new(0));
let mut handles = vec![];
for i in 0..10 {
let mgr = manager.clone();
let succ = successes.clone();
handles.push(thread::spawn(move || {
match mgr.claim("queue1", "task1", &format!("worker{}", i), 30_000) {
ClaimResult::Success { .. } => {
succ.fetch_add(1, AtomicOrdering::SeqCst);
}
_ => {}
}
}));
}
for h in handles {
h.join().unwrap();
}
assert_eq!(successes.load(AtomicOrdering::SeqCst), 1);
}
#[test]
fn test_claim_release_wrong_queue() {
let manager = AtomicClaimManager::new();
let token = match manager.claim("queue1", "task1", "worker1", 30_000) {
ClaimResult::Success { claim_token } => claim_token,
_ => panic!("Expected success"),
};
assert_eq!(token.queue_id, "queue1");
assert!(manager.is_claimed("queue1", "task1").is_some());
manager.release(&token).unwrap();
assert!(manager.is_claimed("queue1", "task1").is_none());
}
#[test]
fn test_multiple_queue_isolation() {
let manager = AtomicClaimManager::new();
let token1 = match manager.claim("queue1", "task1", "worker1", 30_000) {
ClaimResult::Success { claim_token } => claim_token,
_ => panic!("Expected success"),
};
let token2 = match manager.claim("queue2", "task1", "worker1", 30_000) {
ClaimResult::Success { claim_token } => claim_token,
_ => panic!("Expected success"),
};
assert_eq!(token1.queue_id, "queue1");
assert_eq!(token2.queue_id, "queue2");
assert!(manager.is_claimed("queue1", "task1").is_some());
assert!(manager.is_claimed("queue2", "task1").is_some());
manager.release(&token1).unwrap();
assert!(manager.is_claimed("queue1", "task1").is_none());
assert!(manager.is_claimed("queue2", "task1").is_some());
manager.release(&token2).unwrap();
assert!(manager.is_claimed("queue2", "task1").is_none());
}
#[test]
fn test_lease_manager_extension_limit() {
let config = LeaseConfig {
max_extensions: 2,
default_lease_ms: 100,
min_lease_ms: 10,
max_lease_ms: 1000,
cleanup_interval_ms: 10000,
};
let manager = LeaseManager::new(config);
let token = match manager.acquire("queue1", "task1", "worker1", None) {
ClaimResult::Success { claim_token } => claim_token,
_ => panic!("Expected success"),
};
let token = manager.extend("queue1", &token, 100).unwrap();
let token = manager.extend("queue1", &token, 100).unwrap();
let result = manager.extend("queue1", &token, 100);
assert!(result.is_err());
}
#[test]
fn test_cleanup_expired() {
let manager = AtomicClaimManager::new();
manager.claim("queue1", "task1", "worker1", 1);
manager.claim("queue1", "task2", "worker1", 1);
manager.claim("queue1", "task3", "worker1", 100_000);
thread::sleep(Duration::from_millis(10));
let cleaned = manager.cleanup_expired();
assert_eq!(cleaned, 2);
assert!(manager.is_claimed("queue1", "task3").is_some());
}
#[test]
fn test_stats_tracking() {
let manager = AtomicClaimManager::new();
manager.claim("queue1", "task1", "worker1", 30_000);
manager.claim("queue1", "task1", "worker2", 30_000);
let stats = manager.stats();
assert_eq!(stats.attempts, 2);
assert_eq!(stats.successes, 1);
assert_eq!(stats.contentions, 1);
}
}