Skip to main content

sochdb_kernel/
atomic_claim.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! Atomic Claim Protocol for Queue Operations
19//!
20//! This module provides linearizable "claim + delete" pop semantics for queue
21//! operations, ensuring no double-delivery under concurrent access.
22//!
23//! ## Problem: Double-Delivery Under Concurrency
24//!
25//! Without atomic claims, a naive "scan → delete" pattern can fail:
26//!
27//! ```text
28//! Worker A: scan() → finds task T
29//! Worker B: scan() → finds task T  (same task!)
30//! Worker A: delete(T) → success
31//! Worker B: delete(T) → fails or double-processes
32//! ```
33//!
34//! ## Solution: CAS-Based Claim Protocol
35//!
36//! The claim is the linearization point. Only one worker can successfully
37//! create the claim key, establishing ownership:
38//!
39//! ```text
40//! Worker A: scan() → finds task T
41//! Worker A: CAS(claim/T, absent → A) → SUCCESS
42//! Worker B: scan() → finds task T
43//! Worker B: CAS(claim/T, absent → B) → FAIL (key exists)
44//! Worker B: retry with next candidate
45//! ```
46//!
47//! ## Lease-Based Crash Recovery
48//!
49//! Claims have expiry times. If a worker crashes:
50//!
51//! 1. Claim expires after `lease_duration`
52//! 2. Next worker's scan finds task with expired claim
53//! 3. New claim can overwrite expired claim
54//! 4. Task is reprocessed (at-least-once delivery)
55//!
56//! ## Integration with SochDB's MVCC/SSI
57//!
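//! When claims are persisted through SochDB's storage layer (a backend
//! implementing [`CompareAndSwap`]), the claim protocol works with SochDB's
//! transaction model:
//!
//! - Claims use MVCC versioning for conflict detection
//! - The claim key insert is the durability boundary
//! - SSI's dangerous-structure detection catches anomalies
//! - WAL + fsync ensures the claim survives crashes
//!
//! The in-memory [`AtomicClaimManager`] in this module provides none of these
//! durability guarantees by itself; it is a reference implementation.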
64
65use std::collections::HashMap;
66use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering};
67use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
68
69use parking_lot::{Mutex, RwLock};
70
71// ============================================================================
72// ClaimResult - Result of a Claim Attempt
73// ============================================================================
74
75/// Result of attempting to claim a task
76#[derive(Debug, Clone, PartialEq, Eq)]
77pub enum ClaimResult {
78    /// Successfully claimed the task
79    Success {
80        /// The claim token for subsequent operations
81        claim_token: ClaimToken,
82    },
83    /// Task was already claimed by another worker
84    AlreadyClaimed {
85        /// Who holds the claim
86        owner: String,
87        /// When the claim expires
88        expires_at: u64,
89    },
90    /// Claim expired and was taken over
91    TookOver {
92        /// Previous owner whose claim expired
93        previous_owner: String,
94        /// New claim token
95        claim_token: ClaimToken,
96    },
97    /// Task not found
98    NotFound,
99    /// Internal error
100    Error(String),
101}
102
103// ============================================================================
104// ClaimToken - Proof of Ownership
105// ============================================================================
106
/// Proof that a particular worker owns a particular claim.
///
/// Hand it back to release or extend the claim. The manager rejects a token
/// whose `instance` or `owner` no longer matches the stored claim, so a worker
/// cannot act on a claim it has since lost.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ClaimToken {
    /// Queue the claimed task belongs to.
    pub queue_id: String,
    /// Identifier of the claimed task.
    pub task_id: String,
    /// Worker identity holding the claim.
    pub owner: String,
    /// Number unique to this particular grant; any later grant on the same
    /// task carries a different value, which is how stale tokens are detected.
    pub instance: u64,
    /// Grant time, in epoch milliseconds.
    pub created_at: u64,
    /// Lease end, in epoch milliseconds.
    pub expires_at: u64,
}
126
127impl ClaimToken {
128    /// Check if this token is still valid
129    pub fn is_valid(&self, now_millis: u64) -> bool {
130        now_millis < self.expires_at
131    }
132
133    /// Time remaining on the lease
134    pub fn remaining_ms(&self, now_millis: u64) -> u64 {
135        self.expires_at.saturating_sub(now_millis)
136    }
137}
138
139// ============================================================================
140// ClaimEntry - Internal Claim State
141// ============================================================================
142
/// Book-keeping stored for one claim until it is released or swept.
#[derive(Debug, Clone)]
struct ClaimEntry {
    /// Worker identity holding the claim.
    owner: String,
    /// Number unique to this grant, drawn from the manager's counter; a
    /// token with a different value is stale.
    instance: u64,
    /// Grant time, in epoch milliseconds.
    claimed_at: u64,
    /// Lease end, in epoch milliseconds (expired once `now >= expires_at`).
    expires_at: u64,
    /// Consecutive grants of this task since its entry was created; re-claims
    /// and takeovers bump it.
    claim_count: u32,
}
157
158impl ClaimEntry {
159    fn is_expired(&self, now_millis: u64) -> bool {
160        now_millis >= self.expires_at
161    }
162
163    fn to_token(&self, queue_id: &str, task_id: &str) -> ClaimToken {
164        ClaimToken {
165            queue_id: queue_id.to_string(),
166            task_id: task_id.to_string(),
167            owner: self.owner.clone(),
168            instance: self.instance,
169            created_at: self.claimed_at,
170            expires_at: self.expires_at,
171        }
172    }
173}
174
175// ============================================================================
176// AtomicClaimManager - The Core Claim Coordination Layer
177// ============================================================================
178
179/// Atomic claim manager for queue task ownership
180///
181/// This provides the CAS-based claim protocol that ensures linearizable
182/// task ownership under concurrent access.
183///
184/// ## Thread Safety
185///
186/// All operations are thread-safe. The manager uses fine-grained locking
187/// to minimize contention:
188/// - Per-queue locks for claim operations
189/// - Read-write locks for statistics
190///
191/// ## Durability
192///
193/// In production, claims should be persisted to storage with WAL durability.
194/// This in-memory implementation is for reference and testing.
195pub struct AtomicClaimManager {
196    /// Claims by queue_id -> (task_id -> ClaimEntry)
197    claims: RwLock<HashMap<String, HashMap<String, ClaimEntry>>>,
198    /// Instance counter for unique claim IDs
199    instance_counter: AtomicU64,
200    /// Statistics
201    stats: RwLock<ClaimStats>,
202    /// Mutex for claim operations (ensures CAS semantics)
203    claim_locks: RwLock<HashMap<String, std::sync::Arc<Mutex<()>>>>,
204}
205
/// Running counters describing claim traffic, as reported by
/// `AtomicClaimManager::stats`.
#[derive(Debug, Clone, Default)]
pub struct ClaimStats {
    /// Every call to `AtomicClaimManager::claim`, whatever its outcome.
    pub attempts: u64,
    /// Claims granted fresh or refreshed by their current owner (takeovers
    /// are tallied in `takeovers`, not here).
    pub successes: u64,
    /// Attempts turned away because another owner held a live lease.
    pub contentions: u64,
    /// Expired claims replaced by a different owner.
    pub takeovers: u64,
    /// Claims released through `AtomicClaimManager::release`.
    pub acks: u64,
    /// Negative acknowledgements; not incremented anywhere in this module.
    pub nacks: u64,
    /// Expired claims removed by `cleanup_expired` (expired claims taken
    /// over by `claim` are not counted here).
    pub expirations: u64,
}
224
225impl Default for AtomicClaimManager {
226    fn default() -> Self {
227        Self::new()
228    }
229}
230
231impl AtomicClaimManager {
232    /// Create a new claim manager
233    pub fn new() -> Self {
234        Self {
235            claims: RwLock::new(HashMap::new()),
236            instance_counter: AtomicU64::new(1),
237            stats: RwLock::new(ClaimStats::default()),
238            claim_locks: RwLock::new(HashMap::new()),
239        }
240    }
241
242    /// Get or create a lock for a specific claim key
243    fn get_claim_lock(&self, queue_id: &str, task_id: &str) -> std::sync::Arc<Mutex<()>> {
244        let key = format!("{}:{}", queue_id, task_id);
245
246        // Fast path: check if lock exists
247        {
248            let locks = self.claim_locks.read();
249            if let Some(lock) = locks.get(&key) {
250                return lock.clone();
251            }
252        }
253
254        // Slow path: create lock
255        let mut locks = self.claim_locks.write();
256        locks
257            .entry(key)
258            .or_insert_with(|| std::sync::Arc::new(Mutex::new(())))
259            .clone()
260    }
261
262    /// Attempt to claim a task
263    ///
264    /// This is the atomic CAS operation that establishes ownership.
265    ///
266    /// ## Semantics
267    ///
268    /// - If task is unclaimed: creates claim, returns Success
269    /// - If task is claimed by other worker with valid lease: returns AlreadyClaimed
270    /// - If task is claimed but lease expired: creates new claim, returns TookOver
271    ///
272    /// ## Complexity
273    ///
274    /// O(1) hash lookups + lock acquisition
275    pub fn claim(
276        &self,
277        queue_id: &str,
278        task_id: &str,
279        owner: &str,
280        lease_duration_ms: u64,
281    ) -> ClaimResult {
282        let now = current_time_millis();
283
284        // Get per-claim lock to ensure CAS semantics
285        let lock = self.get_claim_lock(queue_id, task_id);
286        let _guard = lock.lock();
287
288        // Update stats
289        self.stats.write().attempts += 1;
290
291        let mut claims = self.claims.write();
292        let queue_claims = claims
293            .entry(queue_id.to_string())
294            .or_insert_with(HashMap::new);
295
296        // Check existing claim
297        if let Some(existing) = queue_claims.get(task_id) {
298            if existing.owner == owner {
299                // Same owner re-claiming (extend)
300                let instance = self.instance_counter.fetch_add(1, AtomicOrdering::SeqCst);
301                let new_entry = ClaimEntry {
302                    owner: owner.to_string(),
303                    instance,
304                    claimed_at: now,
305                    expires_at: now + lease_duration_ms,
306                    claim_count: existing.claim_count + 1,
307                };
308                let token = new_entry.to_token(queue_id, task_id);
309                queue_claims.insert(task_id.to_string(), new_entry);
310
311                self.stats.write().successes += 1;
312                return ClaimResult::Success { claim_token: token };
313            }
314
315            if !existing.is_expired(now) {
316                // Valid claim by another worker
317                self.stats.write().contentions += 1;
318                return ClaimResult::AlreadyClaimed {
319                    owner: existing.owner.clone(),
320                    expires_at: existing.expires_at,
321                };
322            }
323
324            // Expired claim - take over
325            let previous_owner = existing.owner.clone();
326            let instance = self.instance_counter.fetch_add(1, AtomicOrdering::SeqCst);
327            let new_entry = ClaimEntry {
328                owner: owner.to_string(),
329                instance,
330                claimed_at: now,
331                expires_at: now + lease_duration_ms,
332                claim_count: existing.claim_count + 1,
333            };
334            let token = new_entry.to_token(queue_id, task_id);
335            queue_claims.insert(task_id.to_string(), new_entry);
336
337            self.stats.write().takeovers += 1;
338            return ClaimResult::TookOver {
339                previous_owner,
340                claim_token: token,
341            };
342        }
343
344        // No existing claim - create new
345        let instance = self.instance_counter.fetch_add(1, AtomicOrdering::SeqCst);
346        let entry = ClaimEntry {
347            owner: owner.to_string(),
348            instance,
349            claimed_at: now,
350            expires_at: now + lease_duration_ms,
351            claim_count: 1,
352        };
353        let token = entry.to_token(queue_id, task_id);
354        queue_claims.insert(task_id.to_string(), entry);
355
356        self.stats.write().successes += 1;
357        ClaimResult::Success { claim_token: token }
358    }
359
360    /// Release a claim (acknowledge successful processing)
361    ///
362    /// The claim token must be valid and owned by the caller.
363    pub fn release(&self, token: &ClaimToken) -> Result<(), String> {
364        let _now = current_time_millis();
365
366        let lock = self.get_claim_lock(&token.queue_id, &token.task_id);
367        let _guard = lock.lock();
368
369        let mut claims = self.claims.write();
370
371        if let Some(queue_claims) = claims.get_mut(&token.queue_id) {
372            if let Some(existing) = queue_claims.get(&token.task_id) {
373                // Verify ownership
374                if existing.instance != token.instance {
375                    return Err("Stale claim token".to_string());
376                }
377                if existing.owner != token.owner {
378                    return Err("Not claim owner".to_string());
379                }
380
381                queue_claims.remove(&token.task_id);
382                self.stats.write().acks += 1;
383                return Ok(());
384            }
385        }
386
387        Err("Claim not found".to_string())
388    }
389
390    /// Extend a claim's lease duration
391    ///
392    /// Useful when processing takes longer than expected.
393    pub fn extend(
394        &self,
395        queue_id: &str,
396        token: &ClaimToken,
397        additional_ms: u64,
398    ) -> Result<ClaimToken, String> {
399        let _now = current_time_millis();
400
401        let lock = self.get_claim_lock(queue_id, &token.task_id);
402        let _guard = lock.lock();
403
404        let mut claims = self.claims.write();
405
406        if let Some(queue_claims) = claims.get_mut(queue_id) {
407            if let Some(existing) = queue_claims.get_mut(&token.task_id) {
408                // Verify ownership
409                if existing.instance != token.instance {
410                    return Err("Stale claim token".to_string());
411                }
412                if existing.owner != token.owner {
413                    return Err("Not claim owner".to_string());
414                }
415
416                // Extend the lease
417                existing.expires_at += additional_ms;
418
419                return Ok(existing.to_token(queue_id, &token.task_id));
420            }
421        }
422
423        Err("Claim not found".to_string())
424    }
425
426    /// Check if a task is currently claimed
427    pub fn is_claimed(&self, queue_id: &str, task_id: &str) -> Option<(String, u64)> {
428        let now = current_time_millis();
429
430        let claims = self.claims.read();
431
432        if let Some(queue_claims) = claims.get(queue_id) {
433            if let Some(entry) = queue_claims.get(task_id) {
434                if !entry.is_expired(now) {
435                    return Some((entry.owner.clone(), entry.expires_at));
436                }
437            }
438        }
439
440        None
441    }
442
443    /// Get the current claim token for a task (if owned by the given worker)
444    pub fn get_token(&self, queue_id: &str, task_id: &str, owner: &str) -> Option<ClaimToken> {
445        let now = current_time_millis();
446
447        let claims = self.claims.read();
448
449        if let Some(queue_claims) = claims.get(queue_id) {
450            if let Some(entry) = queue_claims.get(task_id) {
451                if !entry.is_expired(now) && entry.owner == owner {
452                    return Some(entry.to_token(queue_id, task_id));
453                }
454            }
455        }
456
457        None
458    }
459
460    /// Clean up expired claims
461    ///
462    /// This should be called periodically (e.g., every few seconds).
463    /// Returns the number of claims cleaned up.
464    pub fn cleanup_expired(&self) -> usize {
465        let now = current_time_millis();
466        let mut cleaned = 0;
467
468        let mut claims = self.claims.write();
469
470        for queue_claims in claims.values_mut() {
471            queue_claims.retain(|_, entry| {
472                if entry.is_expired(now) {
473                    cleaned += 1;
474                    false
475                } else {
476                    true
477                }
478            });
479        }
480
481        if cleaned > 0 {
482            self.stats.write().expirations += cleaned as u64;
483        }
484
485        cleaned
486    }
487
488    /// Get statistics
489    pub fn stats(&self) -> ClaimStats {
490        self.stats.read().clone()
491    }
492
493    /// Get number of active claims for a queue
494    pub fn active_claims(&self, queue_id: &str) -> usize {
495        let now = current_time_millis();
496
497        self.claims
498            .read()
499            .get(queue_id)
500            .map(|q| q.values().filter(|e| !e.is_expired(now)).count())
501            .unwrap_or(0)
502    }
503
504    /// Get all active claims for a queue (for monitoring)
505    pub fn list_claims(&self, queue_id: &str) -> Vec<ClaimToken> {
506        let now = current_time_millis();
507
508        self.claims
509            .read()
510            .get(queue_id)
511            .map(|q| {
512                q.iter()
513                    .filter(|(_, e)| !e.is_expired(now))
514                    .map(|(task_id, e)| e.to_token(queue_id, task_id))
515                    .collect()
516            })
517            .unwrap_or_default()
518    }
519}
520
521// ============================================================================
522// CompareAndSwap Trait - For Storage Integration
523// ============================================================================
524
/// Conditional-write primitives a storage backend must offer so that claims
/// can be made durable.
///
/// Each method must be atomic with respect to the others on the same key.
/// Methods take `&self`, so implementations provide their own interior
/// mutability and synchronization. SochDB's storage layer is expected to
/// implement this for durable claims.
pub trait CompareAndSwap {
    /// Backend failure type (I/O, corruption, ...), distinct from a failed condition.
    type Error: std::fmt::Debug;

    /// Store `value` under `key` only when `key` is currently absent.
    ///
    /// `Ok(true)`: the value was written. `Ok(false)`: the key already
    /// existed and nothing changed. `Err`: the backend failed.
    fn insert_if_absent(&self, key: &[u8], value: &[u8]) -> Result<bool, Self::Error>;

    /// Replace the value under `key` with `new_value` only when it currently
    /// equals `expected`.
    ///
    /// `Ok(true)`: replaced. `Ok(false)`: current value differed. `Err`: the
    /// backend failed.
    fn compare_and_set(
        &self,
        key: &[u8],
        expected: &[u8],
        new_value: &[u8],
    ) -> Result<bool, Self::Error>;

    /// Remove `key` only when its current value equals `expected`.
    ///
    /// `Ok(true)`: removed. `Ok(false)`: current value differed. `Err`: the
    /// backend failed.
    fn delete_if_match(&self, key: &[u8], expected: &[u8]) -> Result<bool, Self::Error>;
}
553
554// ============================================================================
555// LeaseManager - Higher-Level Lease Coordination
556// ============================================================================
557
/// Tuning knobs for [`LeaseManager`].
///
/// All durations are in milliseconds. `min_lease_ms` is expected not to
/// exceed `max_lease_ms`.
#[derive(Debug, Clone)]
pub struct LeaseConfig {
    /// Lease length used when `acquire` is called without an explicit duration.
    pub default_lease_ms: u64,
    /// Lower bound applied to requested lease and extension durations.
    pub min_lease_ms: u64,
    /// Upper bound applied to requested lease and extension durations.
    pub max_lease_ms: u64,
    /// Minimum time between opportunistic sweeps of expired claims.
    pub cleanup_interval_ms: u64,
    /// Cap on the number of `extend` calls counted per claimed task.
    pub max_extensions: u32,
}
572
573impl Default for LeaseConfig {
574    fn default() -> Self {
575        Self {
576            default_lease_ms: 30_000,   // 30 seconds
577            min_lease_ms: 1_000,        // 1 second
578            max_lease_ms: 3_600_000,    // 1 hour
579            cleanup_interval_ms: 5_000, // 5 seconds
580            max_extensions: 10,
581        }
582    }
583}
584
585/// Higher-level lease manager with periodic cleanup
586pub struct LeaseManager {
587    /// Underlying claim manager
588    claim_manager: AtomicClaimManager,
589    /// Configuration
590    config: LeaseConfig,
591    /// Last cleanup time
592    last_cleanup: RwLock<Instant>,
593    /// Extension counts per task
594    extension_counts: RwLock<HashMap<String, u32>>,
595}
596
597impl LeaseManager {
598    /// Create a new lease manager
599    pub fn new(config: LeaseConfig) -> Self {
600        Self {
601            claim_manager: AtomicClaimManager::new(),
602            config,
603            last_cleanup: RwLock::new(Instant::now()),
604            extension_counts: RwLock::new(HashMap::new()),
605        }
606    }
607
608    /// Acquire a lease on a task
609    pub fn acquire(
610        &self,
611        queue_id: &str,
612        task_id: &str,
613        owner: &str,
614        lease_ms: Option<u64>,
615    ) -> ClaimResult {
616        self.maybe_cleanup();
617
618        let lease_duration = lease_ms
619            .unwrap_or(self.config.default_lease_ms)
620            .clamp(self.config.min_lease_ms, self.config.max_lease_ms);
621
622        self.claim_manager
623            .claim(queue_id, task_id, owner, lease_duration)
624    }
625
626    /// Release a lease
627    pub fn release(&self, queue_id: &str, token: &ClaimToken) -> Result<(), String> {
628        // Clear extension count
629        {
630            let key = format!("{}:{}", queue_id, token.task_id);
631            self.extension_counts.write().remove(&key);
632        }
633
634        // Delegate to claim manager (which now correctly uses token.queue_id)
635        self.claim_manager.release(token)
636    }
637
638    /// Extend a lease
639    pub fn extend(
640        &self,
641        queue_id: &str,
642        token: &ClaimToken,
643        additional_ms: u64,
644    ) -> Result<ClaimToken, String> {
645        let key = format!("{}:{}", queue_id, token.task_id);
646
647        // Check extension limit
648        {
649            let counts = self.extension_counts.read();
650            if let Some(&count) = counts.get(&key) {
651                if count >= self.config.max_extensions {
652                    return Err(format!(
653                        "Maximum extensions ({}) reached",
654                        self.config.max_extensions
655                    ));
656                }
657            }
658        }
659
660        // Clamp additional time
661        let additional = additional_ms.clamp(self.config.min_lease_ms, self.config.max_lease_ms);
662
663        let result = self.claim_manager.extend(queue_id, token, additional)?;
664
665        // Increment extension count
666        {
667            let mut counts = self.extension_counts.write();
668            *counts.entry(key).or_insert(0) += 1;
669        }
670
671        Ok(result)
672    }
673
674    /// Get claim manager statistics
675    pub fn stats(&self) -> ClaimStats {
676        self.claim_manager.stats()
677    }
678
679    /// Force cleanup of expired leases
680    pub fn cleanup(&self) -> usize {
681        *self.last_cleanup.write() = Instant::now();
682        self.claim_manager.cleanup_expired()
683    }
684
685    /// Check if cleanup should run and run it if needed
686    fn maybe_cleanup(&self) {
687        let should_cleanup = {
688            let last = self.last_cleanup.read();
689            last.elapsed() > Duration::from_millis(self.config.cleanup_interval_ms)
690        };
691
692        if should_cleanup {
693            self.cleanup();
694        }
695    }
696}
697
698// ============================================================================
699// Helper Functions
700// ============================================================================
701
/// Wall-clock time as whole milliseconds since the Unix epoch.
///
/// A system clock set before 1970 yields `0` instead of panicking. The
/// `u128 -> u64` cast cannot truncate for any realistic date: u64
/// milliseconds span about 584 million years.
fn current_time_millis() -> u64 {
    let since_epoch = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed,
        Err(_) => Duration::default(),
    };
    since_epoch.as_millis() as u64
}
709
710// ============================================================================
711// Tests
712// ============================================================================
713
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::AtomicUsize;
    use std::sync::Arc;
    use std::thread;

    /// Extract the token from a `ClaimResult::Success`, failing the test otherwise.
    fn expect_success(result: ClaimResult) -> ClaimToken {
        match result {
            ClaimResult::Success { claim_token } => claim_token,
            _ => panic!("Expected success"),
        }
    }

    #[test]
    fn test_claim_success() {
        let manager = AtomicClaimManager::new();

        let token = expect_success(manager.claim("queue1", "task1", "worker1", 30_000));
        assert_eq!(token.task_id, "task1");
        assert_eq!(token.owner, "worker1");
    }

    #[test]
    fn test_claim_contention() {
        let manager = AtomicClaimManager::new();

        // The first worker wins the claim...
        expect_success(manager.claim("queue1", "task1", "worker1", 30_000));

        // ...and a second worker is turned away while the lease is live.
        match manager.claim("queue1", "task1", "worker2", 30_000) {
            ClaimResult::AlreadyClaimed { owner, .. } => assert_eq!(owner, "worker1"),
            _ => panic!("Expected AlreadyClaimed"),
        }
    }

    #[test]
    fn test_claim_takeover() {
        let manager = AtomicClaimManager::new();

        // A 1 ms lease expires almost immediately.
        expect_success(manager.claim("queue1", "task1", "worker1", 1));
        thread::sleep(Duration::from_millis(10));

        // Another worker may now take the task over.
        let outcome = manager.claim("queue1", "task1", "worker2", 30_000);
        if let ClaimResult::TookOver { previous_owner, .. } = &outcome {
            assert_eq!(previous_owner, "worker1");
        } else {
            panic!("Expected TookOver, got {:?}", outcome);
        }
    }

    #[test]
    fn test_concurrent_claims() {
        let manager = Arc::new(AtomicClaimManager::new());
        let winners = Arc::new(AtomicUsize::new(0));

        let handles: Vec<_> = (0..10)
            .map(|i| {
                let manager = Arc::clone(&manager);
                let winners = Arc::clone(&winners);
                thread::spawn(move || {
                    let worker = format!("worker{}", i);
                    if matches!(
                        manager.claim("queue1", "task1", &worker, 30_000),
                        ClaimResult::Success { .. }
                    ) {
                        winners.fetch_add(1, AtomicOrdering::SeqCst);
                    }
                })
            })
            .collect();

        for handle in handles {
            handle.join().unwrap();
        }

        // Exactly one of the racing workers may own the task.
        assert_eq!(winners.load(AtomicOrdering::SeqCst), 1);
    }

    #[test]
    fn test_claim_release_wrong_queue() {
        let manager = AtomicClaimManager::new();

        let token = expect_success(manager.claim("queue1", "task1", "worker1", 30_000));

        // The token remembers the queue it was issued for.
        assert_eq!(token.queue_id, "queue1");
        assert!(manager.is_claimed("queue1", "task1").is_some());

        // Releasing with that token frees the task.
        manager.release(&token).unwrap();
        assert!(manager.is_claimed("queue1", "task1").is_none());
    }

    #[test]
    fn test_multiple_queue_isolation() {
        let manager = AtomicClaimManager::new();

        // The same task id is claimed independently in two queues.
        let first = expect_success(manager.claim("queue1", "task1", "worker1", 30_000));
        let second = expect_success(manager.claim("queue2", "task1", "worker1", 30_000));

        assert_eq!(first.queue_id, "queue1");
        assert_eq!(second.queue_id, "queue2");
        for queue in &["queue1", "queue2"] {
            assert!(manager.is_claimed(queue, "task1").is_some());
        }

        // Releasing in one queue leaves the other untouched.
        manager.release(&first).unwrap();
        assert!(manager.is_claimed("queue1", "task1").is_none());
        assert!(manager.is_claimed("queue2", "task1").is_some());

        manager.release(&second).unwrap();
        assert!(manager.is_claimed("queue2", "task1").is_none());
    }

    #[test]
    fn test_lease_manager_extension_limit() {
        let manager = LeaseManager::new(LeaseConfig {
            max_extensions: 2,
            default_lease_ms: 100,
            min_lease_ms: 10,
            max_lease_ms: 1000,
            cleanup_interval_ms: 10000,
        });

        let mut token = expect_success(manager.acquire("queue1", "task1", "worker1", None));

        // The first two extensions fit within the budget...
        for _ in 0..2 {
            token = manager.extend("queue1", &token, 100).unwrap();
        }

        // ...the third one is refused.
        assert!(manager.extend("queue1", &token, 100).is_err());
    }

    #[test]
    fn test_cleanup_expired() {
        let manager = AtomicClaimManager::new();

        // Two claims that expire almost immediately, one that lives on.
        for task in &["task1", "task2"] {
            manager.claim("queue1", task, "worker1", 1);
        }
        manager.claim("queue1", "task3", "worker1", 100_000);

        thread::sleep(Duration::from_millis(10));

        assert_eq!(manager.cleanup_expired(), 2);
        assert!(manager.is_claimed("queue1", "task3").is_some());
    }

    #[test]
    fn test_stats_tracking() {
        let manager = AtomicClaimManager::new();

        // One successful claim followed by one contended attempt.
        for worker in &["worker1", "worker2"] {
            manager.claim("queue1", "task1", worker, 30_000);
        }

        let stats = manager.stats();
        assert_eq!(
            (stats.attempts, stats.successes, stats.contentions),
            (2, 1, 1)
        );
    }
}