1use std::collections::HashMap;
66use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering};
67use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
68
69use parking_lot::{Mutex, RwLock};
70
/// Outcome of [`AtomicClaimManager::claim`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ClaimResult {
    /// The caller now holds the claim: the task was free, or the caller already
    /// owned it and the lease was renewed under a new claim instance.
    Success {
        /// Token identifying the newly created claim.
        claim_token: ClaimToken,
    },
    /// A different owner holds a claim whose lease has not expired.
    AlreadyClaimed {
        /// Current holder of the claim.
        owner: String,
        /// End of the holder's lease, in milliseconds since the Unix epoch.
        expires_at: u64,
    },
    /// A different owner's lease had expired and the caller replaced it.
    TookOver {
        /// Owner whose expired claim was replaced.
        previous_owner: String,
        /// Token identifying the newly created claim.
        claim_token: ClaimToken,
    },
    /// Not returned by `AtomicClaimManager::claim`; presumably meant for
    /// backends that can report a missing task — TODO confirm.
    NotFound,
    /// Not returned by `AtomicClaimManager::claim`; presumably carries a
    /// backend error message — TODO confirm.
    Error(String),
}
102
/// Handle returned to the holder of a claim.
///
/// The claim manager compares `instance` and `owner` against the stored claim
/// before honouring a release or extension, so a token from an older claim
/// instance is rejected.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ClaimToken {
    /// Queue containing the claimed task.
    pub queue_id: String,
    /// Identifier of the claimed task.
    pub task_id: String,
    /// Holder of the claim.
    pub owner: String,
    /// Claim instance number this token was issued for.
    pub instance: u64,
    /// When the claim was made, in milliseconds since the Unix epoch.
    pub created_at: u64,
    /// When the lease ends, in milliseconds since the Unix epoch.
    pub expires_at: u64,
}

impl ClaimToken {
    /// `true` while `now_millis` is strictly before the lease end.
    pub fn is_valid(&self, now_millis: u64) -> bool {
        self.expires_at > now_millis
    }

    /// Milliseconds left on the lease at `now_millis`; zero once it has ended.
    pub fn remaining_ms(&self, now_millis: u64) -> u64 {
        if now_millis >= self.expires_at {
            0
        } else {
            self.expires_at - now_millis
        }
    }
}

/// Stored record of one claim on one task.
#[derive(Debug, Clone)]
struct ClaimEntry {
    /// Holder of the claim.
    owner: String,
    /// Claim instance number, copied into issued tokens.
    instance: u64,
    /// Claim time, in milliseconds since the Unix epoch.
    claimed_at: u64,
    /// Lease end, in milliseconds since the Unix epoch.
    expires_at: u64,
    /// How many times this task has been claimed in a row without the entry
    /// being removed.
    claim_count: u32,
}

impl ClaimEntry {
    /// The lease counts as expired from `expires_at` onwards (inclusive).
    fn is_expired(&self, now_millis: u64) -> bool {
        self.expires_at <= now_millis
    }

    /// Builds the public token for this entry under `queue_id` / `task_id`.
    fn to_token(&self, queue_id: &str, task_id: &str) -> ClaimToken {
        let Self {
            owner,
            instance,
            claimed_at,
            expires_at,
            ..
        } = self;
        ClaimToken {
            queue_id: queue_id.to_owned(),
            task_id: task_id.to_owned(),
            owner: owner.clone(),
            instance: *instance,
            created_at: *claimed_at,
            expires_at: *expires_at,
        }
    }
}
174
/// In-memory registry of task claims with expiring leases, keyed by queue id
/// and task id.
pub struct AtomicClaimManager {
    /// Claims, keyed first by queue id and then by task id.
    claims: RwLock<HashMap<String, HashMap<String, ClaimEntry>>>,
    /// Source of claim instance numbers; bumped once per newly stored claim.
    instance_counter: AtomicU64,
    /// Running counters, returned by `stats()`.
    stats: RwLock<ClaimStats>,
    /// Per-task mutexes keyed by `"{queue_id}:{task_id}"`; taken before the
    /// `claims` write lock in `claim`, `release` and `extend`.
    claim_locks: RwLock<HashMap<String, std::sync::Arc<Mutex<()>>>>,
}
205
/// Counters kept by [`AtomicClaimManager`].
#[derive(Debug, Clone, Default)]
pub struct ClaimStats {
    /// Number of `claim` calls.
    pub attempts: u64,
    /// Claims answered with `ClaimResult::Success` (new or renewed).
    pub successes: u64,
    /// Claims refused with `ClaimResult::AlreadyClaimed`.
    pub contentions: u64,
    /// Claims answered with `ClaimResult::TookOver`.
    pub takeovers: u64,
    /// Successful `release` calls.
    pub acks: u64,
    /// Not updated by `AtomicClaimManager`; presumably negative
    /// acknowledgements recorded elsewhere — TODO confirm.
    pub nacks: u64,
    /// Expired claims removed by `cleanup_expired`.
    pub expirations: u64,
}
224
225impl Default for AtomicClaimManager {
226 fn default() -> Self {
227 Self::new()
228 }
229}
230
231impl AtomicClaimManager {
232 pub fn new() -> Self {
234 Self {
235 claims: RwLock::new(HashMap::new()),
236 instance_counter: AtomicU64::new(1),
237 stats: RwLock::new(ClaimStats::default()),
238 claim_locks: RwLock::new(HashMap::new()),
239 }
240 }
241
242 fn get_claim_lock(&self, queue_id: &str, task_id: &str) -> std::sync::Arc<Mutex<()>> {
244 let key = format!("{}:{}", queue_id, task_id);
245
246 {
248 let locks = self.claim_locks.read();
249 if let Some(lock) = locks.get(&key) {
250 return lock.clone();
251 }
252 }
253
254 let mut locks = self.claim_locks.write();
256 locks
257 .entry(key)
258 .or_insert_with(|| std::sync::Arc::new(Mutex::new(())))
259 .clone()
260 }
261
262 pub fn claim(
276 &self,
277 queue_id: &str,
278 task_id: &str,
279 owner: &str,
280 lease_duration_ms: u64,
281 ) -> ClaimResult {
282 let now = current_time_millis();
283
284 let lock = self.get_claim_lock(queue_id, task_id);
286 let _guard = lock.lock();
287
288 self.stats.write().attempts += 1;
290
291 let mut claims = self.claims.write();
292 let queue_claims = claims
293 .entry(queue_id.to_string())
294 .or_insert_with(HashMap::new);
295
296 if let Some(existing) = queue_claims.get(task_id) {
298 if existing.owner == owner {
299 let instance = self.instance_counter.fetch_add(1, AtomicOrdering::SeqCst);
301 let new_entry = ClaimEntry {
302 owner: owner.to_string(),
303 instance,
304 claimed_at: now,
305 expires_at: now + lease_duration_ms,
306 claim_count: existing.claim_count + 1,
307 };
308 let token = new_entry.to_token(queue_id, task_id);
309 queue_claims.insert(task_id.to_string(), new_entry);
310
311 self.stats.write().successes += 1;
312 return ClaimResult::Success { claim_token: token };
313 }
314
315 if !existing.is_expired(now) {
316 self.stats.write().contentions += 1;
318 return ClaimResult::AlreadyClaimed {
319 owner: existing.owner.clone(),
320 expires_at: existing.expires_at,
321 };
322 }
323
324 let previous_owner = existing.owner.clone();
326 let instance = self.instance_counter.fetch_add(1, AtomicOrdering::SeqCst);
327 let new_entry = ClaimEntry {
328 owner: owner.to_string(),
329 instance,
330 claimed_at: now,
331 expires_at: now + lease_duration_ms,
332 claim_count: existing.claim_count + 1,
333 };
334 let token = new_entry.to_token(queue_id, task_id);
335 queue_claims.insert(task_id.to_string(), new_entry);
336
337 self.stats.write().takeovers += 1;
338 return ClaimResult::TookOver {
339 previous_owner,
340 claim_token: token,
341 };
342 }
343
344 let instance = self.instance_counter.fetch_add(1, AtomicOrdering::SeqCst);
346 let entry = ClaimEntry {
347 owner: owner.to_string(),
348 instance,
349 claimed_at: now,
350 expires_at: now + lease_duration_ms,
351 claim_count: 1,
352 };
353 let token = entry.to_token(queue_id, task_id);
354 queue_claims.insert(task_id.to_string(), entry);
355
356 self.stats.write().successes += 1;
357 ClaimResult::Success { claim_token: token }
358 }
359
360 pub fn release(&self, token: &ClaimToken) -> Result<(), String> {
364 let _now = current_time_millis();
365
366 let lock = self.get_claim_lock(&token.queue_id, &token.task_id);
367 let _guard = lock.lock();
368
369 let mut claims = self.claims.write();
370
371 if let Some(queue_claims) = claims.get_mut(&token.queue_id) {
372 if let Some(existing) = queue_claims.get(&token.task_id) {
373 if existing.instance != token.instance {
375 return Err("Stale claim token".to_string());
376 }
377 if existing.owner != token.owner {
378 return Err("Not claim owner".to_string());
379 }
380
381 queue_claims.remove(&token.task_id);
382 self.stats.write().acks += 1;
383 return Ok(());
384 }
385 }
386
387 Err("Claim not found".to_string())
388 }
389
390 pub fn extend(
394 &self,
395 queue_id: &str,
396 token: &ClaimToken,
397 additional_ms: u64,
398 ) -> Result<ClaimToken, String> {
399 let _now = current_time_millis();
400
401 let lock = self.get_claim_lock(queue_id, &token.task_id);
402 let _guard = lock.lock();
403
404 let mut claims = self.claims.write();
405
406 if let Some(queue_claims) = claims.get_mut(queue_id) {
407 if let Some(existing) = queue_claims.get_mut(&token.task_id) {
408 if existing.instance != token.instance {
410 return Err("Stale claim token".to_string());
411 }
412 if existing.owner != token.owner {
413 return Err("Not claim owner".to_string());
414 }
415
416 existing.expires_at += additional_ms;
418
419 return Ok(existing.to_token(queue_id, &token.task_id));
420 }
421 }
422
423 Err("Claim not found".to_string())
424 }
425
426 pub fn is_claimed(&self, queue_id: &str, task_id: &str) -> Option<(String, u64)> {
428 let now = current_time_millis();
429
430 let claims = self.claims.read();
431
432 if let Some(queue_claims) = claims.get(queue_id) {
433 if let Some(entry) = queue_claims.get(task_id) {
434 if !entry.is_expired(now) {
435 return Some((entry.owner.clone(), entry.expires_at));
436 }
437 }
438 }
439
440 None
441 }
442
443 pub fn get_token(&self, queue_id: &str, task_id: &str, owner: &str) -> Option<ClaimToken> {
445 let now = current_time_millis();
446
447 let claims = self.claims.read();
448
449 if let Some(queue_claims) = claims.get(queue_id) {
450 if let Some(entry) = queue_claims.get(task_id) {
451 if !entry.is_expired(now) && entry.owner == owner {
452 return Some(entry.to_token(queue_id, task_id));
453 }
454 }
455 }
456
457 None
458 }
459
460 pub fn cleanup_expired(&self) -> usize {
465 let now = current_time_millis();
466 let mut cleaned = 0;
467
468 let mut claims = self.claims.write();
469
470 for queue_claims in claims.values_mut() {
471 queue_claims.retain(|_, entry| {
472 if entry.is_expired(now) {
473 cleaned += 1;
474 false
475 } else {
476 true
477 }
478 });
479 }
480
481 if cleaned > 0 {
482 self.stats.write().expirations += cleaned as u64;
483 }
484
485 cleaned
486 }
487
488 pub fn stats(&self) -> ClaimStats {
490 self.stats.read().clone()
491 }
492
493 pub fn active_claims(&self, queue_id: &str) -> usize {
495 let now = current_time_millis();
496
497 self.claims
498 .read()
499 .get(queue_id)
500 .map(|q| q.values().filter(|e| !e.is_expired(now)).count())
501 .unwrap_or(0)
502 }
503
504 pub fn list_claims(&self, queue_id: &str) -> Vec<ClaimToken> {
506 let now = current_time_millis();
507
508 self.claims
509 .read()
510 .get(queue_id)
511 .map(|q| {
512 q.iter()
513 .filter(|(_, e)| !e.is_expired(now))
514 .map(|(task_id, e)| e.to_token(queue_id, task_id))
515 .collect()
516 })
517 .unwrap_or_default()
518 }
519}
520
/// Conditional (compare-and-swap style) operations on a byte-keyed store.
///
/// NOTE(review): no implementor or caller of this trait is visible here. The
/// per-method semantics below are inferred from the method names and must be
/// confirmed against the implementations.
pub trait CompareAndSwap {
    /// Error reported by the underlying store.
    type Error: std::fmt::Debug;

    /// Writes `value` at `key` only if `key` has no value. Presumably
    /// `Ok(true)` when the write happened and `Ok(false)` when the key already
    /// existed — TODO confirm.
    fn insert_if_absent(&self, key: &[u8], value: &[u8]) -> Result<bool, Self::Error>;

    /// Replaces the value at `key` with `new_value` only if it currently equals
    /// `expected`. Presumably `Ok(true)` when the swap happened — TODO confirm.
    fn compare_and_set(
        &self,
        key: &[u8],
        expected: &[u8],
        new_value: &[u8],
    ) -> Result<bool, Self::Error>;

    /// Deletes `key` only if its value equals `expected`. Presumably
    /// `Ok(true)` when something was deleted — TODO confirm.
    fn delete_if_match(&self, key: &[u8], expected: &[u8]) -> Result<bool, Self::Error>;
}
553
/// Lease bounds and limits applied by `LeaseManager`. All durations are in
/// milliseconds.
#[derive(Debug, Clone)]
pub struct LeaseConfig {
    /// Lease length used when `acquire` is given no explicit lease.
    pub default_lease_ms: u64,
    /// Lower bound applied to requested lease and extension lengths.
    pub min_lease_ms: u64,
    /// Upper bound applied to requested lease and extension lengths.
    pub max_lease_ms: u64,
    /// Minimum time between automatic expired-claim sweeps.
    pub cleanup_interval_ms: u64,
    /// Cap on the number of successful extensions counted by `LeaseManager::extend`.
    pub max_extensions: u32,
}

impl Default for LeaseConfig {
    /// 30 s default lease bounded to [1 s, 1 h], a sweep at most every 5 s,
    /// and up to 10 extensions.
    fn default() -> Self {
        const SECOND_MS: u64 = 1_000;
        LeaseConfig {
            default_lease_ms: 30 * SECOND_MS,
            min_lease_ms: SECOND_MS,
            max_lease_ms: 60 * 60 * SECOND_MS,
            cleanup_interval_ms: 5 * SECOND_MS,
            max_extensions: 10,
        }
    }
}
584
585pub struct LeaseManager {
587 claim_manager: AtomicClaimManager,
589 config: LeaseConfig,
591 last_cleanup: RwLock<Instant>,
593 extension_counts: RwLock<HashMap<String, u32>>,
595}
596
597impl LeaseManager {
598 pub fn new(config: LeaseConfig) -> Self {
600 Self {
601 claim_manager: AtomicClaimManager::new(),
602 config,
603 last_cleanup: RwLock::new(Instant::now()),
604 extension_counts: RwLock::new(HashMap::new()),
605 }
606 }
607
608 pub fn acquire(
610 &self,
611 queue_id: &str,
612 task_id: &str,
613 owner: &str,
614 lease_ms: Option<u64>,
615 ) -> ClaimResult {
616 self.maybe_cleanup();
617
618 let lease_duration = lease_ms
619 .unwrap_or(self.config.default_lease_ms)
620 .clamp(self.config.min_lease_ms, self.config.max_lease_ms);
621
622 self.claim_manager
623 .claim(queue_id, task_id, owner, lease_duration)
624 }
625
626 pub fn release(&self, queue_id: &str, token: &ClaimToken) -> Result<(), String> {
628 {
630 let key = format!("{}:{}", queue_id, token.task_id);
631 self.extension_counts.write().remove(&key);
632 }
633
634 self.claim_manager.release(token)
636 }
637
638 pub fn extend(
640 &self,
641 queue_id: &str,
642 token: &ClaimToken,
643 additional_ms: u64,
644 ) -> Result<ClaimToken, String> {
645 let key = format!("{}:{}", queue_id, token.task_id);
646
647 {
649 let counts = self.extension_counts.read();
650 if let Some(&count) = counts.get(&key) {
651 if count >= self.config.max_extensions {
652 return Err(format!(
653 "Maximum extensions ({}) reached",
654 self.config.max_extensions
655 ));
656 }
657 }
658 }
659
660 let additional = additional_ms.clamp(self.config.min_lease_ms, self.config.max_lease_ms);
662
663 let result = self.claim_manager.extend(queue_id, token, additional)?;
664
665 {
667 let mut counts = self.extension_counts.write();
668 *counts.entry(key).or_insert(0) += 1;
669 }
670
671 Ok(result)
672 }
673
674 pub fn stats(&self) -> ClaimStats {
676 self.claim_manager.stats()
677 }
678
679 pub fn cleanup(&self) -> usize {
681 *self.last_cleanup.write() = Instant::now();
682 self.claim_manager.cleanup_expired()
683 }
684
685 fn maybe_cleanup(&self) {
687 let should_cleanup = {
688 let last = self.last_cleanup.read();
689 last.elapsed() > Duration::from_millis(self.config.cleanup_interval_ms)
690 };
691
692 if should_cleanup {
693 self.cleanup();
694 }
695 }
696}
697
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// Returns 0 if the system clock reads earlier than the epoch. The `u128`
/// millisecond count is saturated into `u64` instead of being silently
/// truncated by an `as` cast.
fn current_time_millis() -> u64 {
    let millis = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_millis();
    // Lossless after the `min`: the value is guaranteed to fit in a u64.
    millis.min(u128::from(u64::MAX)) as u64
}
709
// Unit tests for AtomicClaimManager and LeaseManager. Tests that need an
// expired lease use a 1 ms lease and then sleep 10 ms.
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::thread;

    // A first claim on a free task succeeds; the token echoes task and owner.
    #[test]
    fn test_claim_success() {
        let manager = AtomicClaimManager::new();

        match manager.claim("queue1", "task1", "worker1", 30_000) {
            ClaimResult::Success { claim_token } => {
                assert_eq!(claim_token.task_id, "task1");
                assert_eq!(claim_token.owner, "worker1");
            }
            _ => panic!("Expected success"),
        }
    }

    // A second owner is refused while the first owner's lease is live.
    #[test]
    fn test_claim_contention() {
        let manager = AtomicClaimManager::new();

        let result1 = manager.claim("queue1", "task1", "worker1", 30_000);
        assert!(matches!(result1, ClaimResult::Success { .. }));

        let result2 = manager.claim("queue1", "task1", "worker2", 30_000);
        match result2 {
            ClaimResult::AlreadyClaimed { owner, .. } => {
                assert_eq!(owner, "worker1");
            }
            _ => panic!("Expected AlreadyClaimed"),
        }
    }

    // After the first owner's 1 ms lease lapses, a different owner takes over.
    #[test]
    fn test_claim_takeover() {
        let manager = AtomicClaimManager::new();

        let result1 = manager.claim("queue1", "task1", "worker1", 1);
        assert!(matches!(result1, ClaimResult::Success { .. }));

        // Sleep well past the 1 ms lease so it is expired.
        thread::sleep(Duration::from_millis(10));

        let result2 = manager.claim("queue1", "task1", "worker2", 30_000);
        match result2 {
            ClaimResult::TookOver { previous_owner, .. } => {
                assert_eq!(previous_owner, "worker1");
            }
            _ => panic!("Expected TookOver, got {:?}", result2),
        }
    }

    // Ten threads with distinct owners race for one task; exactly one wins.
    #[test]
    fn test_concurrent_claims() {
        let manager = Arc::new(AtomicClaimManager::new());
        let successes = Arc::new(std::sync::atomic::AtomicUsize::new(0));

        let mut handles = vec![];

        for i in 0..10 {
            let mgr = manager.clone();
            let succ = successes.clone();

            handles.push(thread::spawn(move || {
                match mgr.claim("queue1", "task1", &format!("worker{}", i), 30_000) {
                    ClaimResult::Success { .. } => {
                        succ.fetch_add(1, AtomicOrdering::SeqCst);
                    }
                    _ => {}
                }
            }));
        }

        for h in handles {
            h.join().unwrap();
        }

        assert_eq!(successes.load(AtomicOrdering::SeqCst), 1);
    }

    // The token records its queue, and release (which reads the queue from the
    // token) frees the task.
    #[test]
    fn test_claim_release_wrong_queue() {
        let manager = AtomicClaimManager::new();

        let token = match manager.claim("queue1", "task1", "worker1", 30_000) {
            ClaimResult::Success { claim_token } => claim_token,
            _ => panic!("Expected success"),
        };

        assert_eq!(token.queue_id, "queue1");

        assert!(manager.is_claimed("queue1", "task1").is_some());

        manager.release(&token).unwrap();

        assert!(manager.is_claimed("queue1", "task1").is_none());
    }

    // The same task id in two queues gives two independent claims.
    #[test]
    fn test_multiple_queue_isolation() {
        let manager = AtomicClaimManager::new();

        let token1 = match manager.claim("queue1", "task1", "worker1", 30_000) {
            ClaimResult::Success { claim_token } => claim_token,
            _ => panic!("Expected success"),
        };

        let token2 = match manager.claim("queue2", "task1", "worker1", 30_000) {
            ClaimResult::Success { claim_token } => claim_token,
            _ => panic!("Expected success"),
        };

        assert_eq!(token1.queue_id, "queue1");
        assert_eq!(token2.queue_id, "queue2");

        assert!(manager.is_claimed("queue1", "task1").is_some());
        assert!(manager.is_claimed("queue2", "task1").is_some());

        manager.release(&token1).unwrap();
        assert!(manager.is_claimed("queue1", "task1").is_none());
        assert!(manager.is_claimed("queue2", "task1").is_some());

        manager.release(&token2).unwrap();
        assert!(manager.is_claimed("queue2", "task1").is_none());
    }

    // With max_extensions = 2, the third extend call is rejected.
    #[test]
    fn test_lease_manager_extension_limit() {
        let config = LeaseConfig {
            max_extensions: 2,
            default_lease_ms: 100,
            min_lease_ms: 10,
            max_lease_ms: 1000,
            cleanup_interval_ms: 10000,
        };

        let manager = LeaseManager::new(config);

        let token = match manager.acquire("queue1", "task1", "worker1", None) {
            ClaimResult::Success { claim_token } => claim_token,
            _ => panic!("Expected success"),
        };

        let token = manager.extend("queue1", &token, 100).unwrap();

        let token = manager.extend("queue1", &token, 100).unwrap();

        let result = manager.extend("queue1", &token, 100);
        assert!(result.is_err());
    }

    // cleanup_expired removes the two lapsed 1 ms claims and keeps the long one.
    #[test]
    fn test_cleanup_expired() {
        let manager = AtomicClaimManager::new();

        manager.claim("queue1", "task1", "worker1", 1);
        manager.claim("queue1", "task2", "worker1", 1);
        manager.claim("queue1", "task3", "worker1", 100_000);
        thread::sleep(Duration::from_millis(10));

        let cleaned = manager.cleanup_expired();
        assert_eq!(cleaned, 2);
        assert!(manager.is_claimed("queue1", "task3").is_some());
    }

    // One success plus one contention gives attempts = 2.
    #[test]
    fn test_stats_tracking() {
        let manager = AtomicClaimManager::new();

        manager.claim("queue1", "task1", "worker1", 30_000);

        manager.claim("queue1", "task1", "worker2", 30_000);

        let stats = manager.stats();
        assert_eq!(stats.attempts, 2);
        assert_eq!(stats.successes, 1);
        assert_eq!(stats.contentions, 1);
    }
}