1use std::collections::HashMap;
66use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering};
67use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
68
69use parking_lot::{Mutex, RwLock};
70
71#[derive(Debug, Clone, PartialEq, Eq)]
77pub enum ClaimResult {
78 Success {
80 claim_token: ClaimToken,
82 },
83 AlreadyClaimed {
85 owner: String,
87 expires_at: u64,
89 },
90 TookOver {
92 previous_owner: String,
94 claim_token: ClaimToken,
96 },
97 NotFound,
99 Error(String),
101}
102
/// Proof of a claim, handed to the worker that obtained it.
///
/// `instance` identifies the specific claim that issued the token, so a token
/// from an earlier or superseded claim on the same task can be told apart.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct ClaimToken {
    /// Task the claim covers.
    pub task_id: String,
    /// Worker holding the claim.
    pub owner: String,
    /// Claim instance number drawn from the manager's counter.
    pub instance: u64,
    /// When the claim was issued, in epoch milliseconds.
    pub created_at: u64,
    /// When the claim lapses, in epoch milliseconds.
    pub expires_at: u64,
}

impl ClaimToken {
    /// Returns `true` while `now_millis` is strictly before the expiry time.
    pub fn is_valid(&self, now_millis: u64) -> bool {
        self.expires_at > now_millis
    }

    /// Milliseconds left before expiry; `0` once the expiry time has been reached.
    pub fn remaining_ms(&self, now_millis: u64) -> u64 {
        if now_millis < self.expires_at {
            self.expires_at - now_millis
        } else {
            0
        }
    }
}
136
137#[derive(Debug, Clone)]
143struct ClaimEntry {
144 owner: String,
146 instance: u64,
148 claimed_at: u64,
150 expires_at: u64,
152 claim_count: u32,
154}
155
156impl ClaimEntry {
157 fn is_expired(&self, now_millis: u64) -> bool {
158 now_millis >= self.expires_at
159 }
160
161 fn to_token(&self, task_id: &str) -> ClaimToken {
162 ClaimToken {
163 task_id: task_id.to_string(),
164 owner: self.owner.clone(),
165 instance: self.instance,
166 created_at: self.claimed_at,
167 expires_at: self.expires_at,
168 }
169 }
170}
171
172pub struct AtomicClaimManager {
193 claims: RwLock<HashMap<String, HashMap<String, ClaimEntry>>>,
195 instance_counter: AtomicU64,
197 stats: RwLock<ClaimStats>,
199 claim_locks: RwLock<HashMap<String, std::sync::Arc<Mutex<()>>>>,
201}
202
/// Snapshot of the counters kept by `AtomicClaimManager`.
#[derive(Clone, Debug, Default)]
pub struct ClaimStats {
    /// Calls to `claim`, whatever their outcome.
    pub attempts: u64,
    /// Claims that ended in `ClaimResult::Success`.
    pub successes: u64,
    /// Claims rejected because another owner held an unexpired claim.
    pub contentions: u64,
    /// Claims that replaced another owner's expired claim.
    pub takeovers: u64,
    /// Successful releases.
    pub acks: u64,
    /// Not incremented by any code visible in this file.
    pub nacks: u64,
    /// Expired claims removed by cleanup.
    pub expirations: u64,
}
221
222impl Default for AtomicClaimManager {
223 fn default() -> Self {
224 Self::new()
225 }
226}
227
228impl AtomicClaimManager {
229 pub fn new() -> Self {
231 Self {
232 claims: RwLock::new(HashMap::new()),
233 instance_counter: AtomicU64::new(1),
234 stats: RwLock::new(ClaimStats::default()),
235 claim_locks: RwLock::new(HashMap::new()),
236 }
237 }
238
239 fn get_claim_lock(&self, queue_id: &str, task_id: &str) -> std::sync::Arc<Mutex<()>> {
241 let key = format!("{}:{}", queue_id, task_id);
242
243 {
245 let locks = self.claim_locks.read();
246 if let Some(lock) = locks.get(&key) {
247 return lock.clone();
248 }
249 }
250
251 let mut locks = self.claim_locks.write();
253 locks.entry(key)
254 .or_insert_with(|| std::sync::Arc::new(Mutex::new(())))
255 .clone()
256 }
257
258 pub fn claim(
272 &self,
273 queue_id: &str,
274 task_id: &str,
275 owner: &str,
276 lease_duration_ms: u64,
277 ) -> ClaimResult {
278 let now = current_time_millis();
279
280 let lock = self.get_claim_lock(queue_id, task_id);
282 let _guard = lock.lock();
283
284 self.stats.write().attempts += 1;
286
287 let mut claims = self.claims.write();
288 let queue_claims = claims.entry(queue_id.to_string()).or_insert_with(HashMap::new);
289
290 if let Some(existing) = queue_claims.get(task_id) {
292 if existing.owner == owner {
293 let instance = self.instance_counter.fetch_add(1, AtomicOrdering::SeqCst);
295 let new_entry = ClaimEntry {
296 owner: owner.to_string(),
297 instance,
298 claimed_at: now,
299 expires_at: now + lease_duration_ms,
300 claim_count: existing.claim_count + 1,
301 };
302 let token = new_entry.to_token(task_id);
303 queue_claims.insert(task_id.to_string(), new_entry);
304
305 self.stats.write().successes += 1;
306 return ClaimResult::Success { claim_token: token };
307 }
308
309 if !existing.is_expired(now) {
310 self.stats.write().contentions += 1;
312 return ClaimResult::AlreadyClaimed {
313 owner: existing.owner.clone(),
314 expires_at: existing.expires_at,
315 };
316 }
317
318 let previous_owner = existing.owner.clone();
320 let instance = self.instance_counter.fetch_add(1, AtomicOrdering::SeqCst);
321 let new_entry = ClaimEntry {
322 owner: owner.to_string(),
323 instance,
324 claimed_at: now,
325 expires_at: now + lease_duration_ms,
326 claim_count: existing.claim_count + 1,
327 };
328 let token = new_entry.to_token(task_id);
329 queue_claims.insert(task_id.to_string(), new_entry);
330
331 self.stats.write().takeovers += 1;
332 return ClaimResult::TookOver {
333 previous_owner,
334 claim_token: token,
335 };
336 }
337
338 let instance = self.instance_counter.fetch_add(1, AtomicOrdering::SeqCst);
340 let entry = ClaimEntry {
341 owner: owner.to_string(),
342 instance,
343 claimed_at: now,
344 expires_at: now + lease_duration_ms,
345 claim_count: 1,
346 };
347 let token = entry.to_token(task_id);
348 queue_claims.insert(task_id.to_string(), entry);
349
350 self.stats.write().successes += 1;
351 ClaimResult::Success { claim_token: token }
352 }
353
354 pub fn release(&self, token: &ClaimToken) -> Result<(), String> {
358 let _now = current_time_millis();
359
360 let lock = self.get_claim_lock(&token.owner, &token.task_id);
361 let _guard = lock.lock();
362
363 let mut claims = self.claims.write();
364
365 if let Some(queue_claims) = claims.get_mut(&token.owner) {
366 if let Some(existing) = queue_claims.get(&token.task_id) {
367 if existing.instance != token.instance {
369 return Err("Stale claim token".to_string());
370 }
371 if existing.owner != token.owner {
372 return Err("Not claim owner".to_string());
373 }
374
375 queue_claims.remove(&token.task_id);
376 self.stats.write().acks += 1;
377 return Ok(());
378 }
379 }
380
381 Err("Claim not found".to_string())
382 }
383
384 pub fn extend(
388 &self,
389 queue_id: &str,
390 token: &ClaimToken,
391 additional_ms: u64,
392 ) -> Result<ClaimToken, String> {
393 let _now = current_time_millis();
394
395 let lock = self.get_claim_lock(queue_id, &token.task_id);
396 let _guard = lock.lock();
397
398 let mut claims = self.claims.write();
399
400 if let Some(queue_claims) = claims.get_mut(queue_id) {
401 if let Some(existing) = queue_claims.get_mut(&token.task_id) {
402 if existing.instance != token.instance {
404 return Err("Stale claim token".to_string());
405 }
406 if existing.owner != token.owner {
407 return Err("Not claim owner".to_string());
408 }
409
410 existing.expires_at += additional_ms;
412
413 return Ok(existing.to_token(&token.task_id));
414 }
415 }
416
417 Err("Claim not found".to_string())
418 }
419
420 pub fn is_claimed(&self, queue_id: &str, task_id: &str) -> Option<(String, u64)> {
422 let now = current_time_millis();
423
424 let claims = self.claims.read();
425
426 if let Some(queue_claims) = claims.get(queue_id) {
427 if let Some(entry) = queue_claims.get(task_id) {
428 if !entry.is_expired(now) {
429 return Some((entry.owner.clone(), entry.expires_at));
430 }
431 }
432 }
433
434 None
435 }
436
437 pub fn get_token(&self, queue_id: &str, task_id: &str, owner: &str) -> Option<ClaimToken> {
439 let now = current_time_millis();
440
441 let claims = self.claims.read();
442
443 if let Some(queue_claims) = claims.get(queue_id) {
444 if let Some(entry) = queue_claims.get(task_id) {
445 if !entry.is_expired(now) && entry.owner == owner {
446 return Some(entry.to_token(task_id));
447 }
448 }
449 }
450
451 None
452 }
453
454 pub fn cleanup_expired(&self) -> usize {
459 let now = current_time_millis();
460 let mut cleaned = 0;
461
462 let mut claims = self.claims.write();
463
464 for queue_claims in claims.values_mut() {
465 queue_claims.retain(|_, entry| {
466 if entry.is_expired(now) {
467 cleaned += 1;
468 false
469 } else {
470 true
471 }
472 });
473 }
474
475 if cleaned > 0 {
476 self.stats.write().expirations += cleaned as u64;
477 }
478
479 cleaned
480 }
481
482 pub fn stats(&self) -> ClaimStats {
484 self.stats.read().clone()
485 }
486
487 pub fn active_claims(&self, queue_id: &str) -> usize {
489 let now = current_time_millis();
490
491 self.claims.read()
492 .get(queue_id)
493 .map(|q| q.values().filter(|e| !e.is_expired(now)).count())
494 .unwrap_or(0)
495 }
496
497 pub fn list_claims(&self, queue_id: &str) -> Vec<ClaimToken> {
499 let now = current_time_millis();
500
501 self.claims.read()
502 .get(queue_id)
503 .map(|q| {
504 q.iter()
505 .filter(|(_, e)| !e.is_expired(now))
506 .map(|(task_id, e)| e.to_token(task_id))
507 .collect()
508 })
509 .unwrap_or_default()
510 }
511}
512
/// Conditional-write operations (compare-and-swap style) over a store with
/// byte-string keys and values.
///
/// No implementation appears in this file. The meaning of the returned `bool`
/// below is inferred from the method names.
/// NOTE(review): confirm against the implementors.
pub trait CompareAndSwap {
    /// Error reported by the backing store.
    type Error: std::fmt::Debug;

    /// Writes `value` under `key` only when `key` is absent.
    /// Presumably `Ok(true)` means the write happened.
    fn insert_if_absent(&self, key: &[u8], value: &[u8]) -> Result<bool, Self::Error>;

    /// Replaces the value under `key` with `new_value` only when it currently
    /// equals `expected`. Presumably `Ok(true)` means the swap happened.
    fn compare_and_set(
        &self,
        key: &[u8],
        expected: &[u8],
        new_value: &[u8],
    ) -> Result<bool, Self::Error>;

    /// Deletes `key` only when its value equals `expected`.
    /// Presumably `Ok(true)` means the deletion happened.
    fn delete_if_match(&self, key: &[u8], expected: &[u8]) -> Result<bool, Self::Error>;
}
545
/// Policy settings for `LeaseManager`. All durations are in milliseconds.
#[derive(Clone, Debug)]
pub struct LeaseConfig {
    /// Lease length used when `acquire` is called without an explicit duration.
    pub default_lease_ms: u64,
    /// Lower bound applied to requested lease and extension durations.
    pub min_lease_ms: u64,
    /// Upper bound applied to requested lease and extension durations.
    pub max_lease_ms: u64,
    /// Minimum time between cleanups triggered automatically by `acquire`.
    pub cleanup_interval_ms: u64,
    /// Maximum number of successful extensions tracked per queue/task pair.
    pub max_extensions: u32,
}

impl Default for LeaseConfig {
    fn default() -> Self {
        Self {
            default_lease_ms: 30_000,    // 30 seconds
            min_lease_ms: 1_000,         // 1 second
            max_lease_ms: 3_600_000,     // 1 hour
            cleanup_interval_ms: 5_000,  // 5 seconds
            max_extensions: 10,
        }
    }
}
576
577pub struct LeaseManager {
579 claim_manager: AtomicClaimManager,
581 config: LeaseConfig,
583 last_cleanup: RwLock<Instant>,
585 extension_counts: RwLock<HashMap<String, u32>>,
587}
588
589impl LeaseManager {
590 pub fn new(config: LeaseConfig) -> Self {
592 Self {
593 claim_manager: AtomicClaimManager::new(),
594 config,
595 last_cleanup: RwLock::new(Instant::now()),
596 extension_counts: RwLock::new(HashMap::new()),
597 }
598 }
599
600 pub fn acquire(
602 &self,
603 queue_id: &str,
604 task_id: &str,
605 owner: &str,
606 lease_ms: Option<u64>,
607 ) -> ClaimResult {
608 self.maybe_cleanup();
609
610 let lease_duration = lease_ms
611 .unwrap_or(self.config.default_lease_ms)
612 .clamp(self.config.min_lease_ms, self.config.max_lease_ms);
613
614 self.claim_manager.claim(queue_id, task_id, owner, lease_duration)
615 }
616
617 pub fn release(&self, queue_id: &str, token: &ClaimToken) -> Result<(), String> {
619 {
621 let key = format!("{}:{}", queue_id, token.task_id);
622 self.extension_counts.write().remove(&key);
623 }
624
625 let _now = current_time_millis();
629
630 let mut claims = self.claim_manager.claims.write();
631 if let Some(queue_claims) = claims.get_mut(queue_id) {
632 if let Some(existing) = queue_claims.get(&token.task_id) {
633 if existing.instance == token.instance {
634 queue_claims.remove(&token.task_id);
635 self.claim_manager.stats.write().acks += 1;
636 return Ok(());
637 } else {
638 return Err("Stale claim token".to_string());
639 }
640 }
641 }
642
643 Err("Claim not found".to_string())
644 }
645
646 pub fn extend(
648 &self,
649 queue_id: &str,
650 token: &ClaimToken,
651 additional_ms: u64,
652 ) -> Result<ClaimToken, String> {
653 let key = format!("{}:{}", queue_id, token.task_id);
654
655 {
657 let counts = self.extension_counts.read();
658 if let Some(&count) = counts.get(&key) {
659 if count >= self.config.max_extensions {
660 return Err(format!(
661 "Maximum extensions ({}) reached",
662 self.config.max_extensions
663 ));
664 }
665 }
666 }
667
668 let additional = additional_ms.clamp(
670 self.config.min_lease_ms,
671 self.config.max_lease_ms,
672 );
673
674 let result = self.claim_manager.extend(queue_id, token, additional)?;
675
676 {
678 let mut counts = self.extension_counts.write();
679 *counts.entry(key).or_insert(0) += 1;
680 }
681
682 Ok(result)
683 }
684
685 pub fn stats(&self) -> ClaimStats {
687 self.claim_manager.stats()
688 }
689
690 pub fn cleanup(&self) -> usize {
692 *self.last_cleanup.write() = Instant::now();
693 self.claim_manager.cleanup_expired()
694 }
695
696 fn maybe_cleanup(&self) {
698 let should_cleanup = {
699 let last = self.last_cleanup.read();
700 last.elapsed() > Duration::from_millis(self.config.cleanup_interval_ms)
701 };
702
703 if should_cleanup {
704 self.cleanup();
705 }
706 }
707}
708
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// A clock set before 1970 yields `0` instead of an error. The `u128` count is
/// narrowed with `as u64`; truncation would require a clock about 584 million
/// years past the epoch.
fn current_time_millis() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}
720
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::thread;

    /// Unwraps the token from a `Success` result, panicking on any other outcome.
    fn expect_success(result: ClaimResult) -> ClaimToken {
        match result {
            ClaimResult::Success { claim_token } => claim_token,
            other => panic!("Expected success, got {:?}", other),
        }
    }

    #[test]
    fn test_claim_success() {
        let manager = AtomicClaimManager::new();

        let token = expect_success(manager.claim("queue1", "task1", "worker1", 30_000));
        assert_eq!(token.task_id, "task1");
        assert_eq!(token.owner, "worker1");
    }

    #[test]
    fn test_claim_contention() {
        let manager = AtomicClaimManager::new();

        let result1 = manager.claim("queue1", "task1", "worker1", 30_000);
        assert!(matches!(result1, ClaimResult::Success { .. }));

        match manager.claim("queue1", "task1", "worker2", 30_000) {
            ClaimResult::AlreadyClaimed { owner, .. } => assert_eq!(owner, "worker1"),
            other => panic!("Expected AlreadyClaimed, got {:?}", other),
        }
    }

    #[test]
    fn test_claim_takeover() {
        let manager = AtomicClaimManager::new();

        let result1 = manager.claim("queue1", "task1", "worker1", 1);
        assert!(matches!(result1, ClaimResult::Success { .. }));

        // Let the 1 ms lease lapse.
        thread::sleep(Duration::from_millis(10));

        match manager.claim("queue1", "task1", "worker2", 30_000) {
            ClaimResult::TookOver { previous_owner, .. } => assert_eq!(previous_owner, "worker1"),
            other => panic!("Expected TookOver, got {:?}", other),
        }
    }

    #[test]
    fn test_concurrent_claims() {
        let manager = Arc::new(AtomicClaimManager::new());
        let successes = Arc::new(std::sync::atomic::AtomicUsize::new(0));

        let handles: Vec<_> = (0..10)
            .map(|i| {
                let mgr = Arc::clone(&manager);
                let succ = Arc::clone(&successes);
                thread::spawn(move || {
                    let owner = format!("worker{}", i);
                    if matches!(
                        mgr.claim("queue1", "task1", &owner, 30_000),
                        ClaimResult::Success { .. }
                    ) {
                        succ.fetch_add(1, AtomicOrdering::SeqCst);
                    }
                })
            })
            .collect();

        for handle in handles {
            handle.join().unwrap();
        }

        // Exactly one of the racing workers may win the claim.
        assert_eq!(successes.load(AtomicOrdering::SeqCst), 1);
    }

    #[test]
    fn test_claim_release() {
        let manager = AtomicClaimManager::new();

        let token = expect_success(manager.claim("queue1", "task1", "worker1", 30_000));
        assert!(manager.is_claimed("queue1", "task1").is_some());
        assert_eq!(
            manager.get_token("queue1", "task1", "worker1"),
            Some(token.clone())
        );

        // A token carrying a different instance number must not release the claim.
        let mut stale = token;
        stale.instance += 1_000;
        assert!(manager.release(&stale).is_err());
        assert!(manager.is_claimed("queue1", "task1").is_some());

        // An unexpired claim survives cleanup.
        assert_eq!(manager.cleanup_expired(), 0);
        assert!(manager.is_claimed("queue1", "task1").is_some());
    }

    #[test]
    fn test_lease_manager_release() {
        let manager = LeaseManager::new(LeaseConfig::default());

        let token = expect_success(manager.acquire("queue1", "task1", "worker1", None));
        assert!(manager.release("queue1", &token).is_ok());
        assert!(manager.claim_manager.is_claimed("queue1", "task1").is_none());
        assert_eq!(manager.stats().acks, 1);

        // The claim is gone, so a second release must fail.
        assert!(manager.release("queue1", &token).is_err());
    }

    #[test]
    fn test_lease_manager_extension_limit() {
        let config = LeaseConfig {
            max_extensions: 2,
            default_lease_ms: 100,
            min_lease_ms: 10,
            max_lease_ms: 1000,
            cleanup_interval_ms: 10000,
        };
        let manager = LeaseManager::new(config);

        let token = expect_success(manager.acquire("queue1", "task1", "worker1", None));

        let token = manager.extend("queue1", &token, 100).unwrap();
        let token = manager.extend("queue1", &token, 100).unwrap();

        // The third extension exceeds `max_extensions`.
        assert!(manager.extend("queue1", &token, 100).is_err());
    }

    #[test]
    fn test_cleanup_expired() {
        let manager = AtomicClaimManager::new();

        manager.claim("queue1", "task1", "worker1", 1);
        manager.claim("queue1", "task2", "worker1", 1);
        manager.claim("queue1", "task3", "worker1", 100_000);
        thread::sleep(Duration::from_millis(10));

        let cleaned = manager.cleanup_expired();
        assert_eq!(cleaned, 2);
        assert!(manager.is_claimed("queue1", "task3").is_some());
    }

    #[test]
    fn test_stats_tracking() {
        let manager = AtomicClaimManager::new();

        manager.claim("queue1", "task1", "worker1", 30_000);
        manager.claim("queue1", "task1", "worker2", 30_000);

        let stats = manager.stats();
        assert_eq!(stats.attempts, 2);
        assert_eq!(stats.successes, 1);
        assert_eq!(stats.contentions, 1);
    }
}