rustfs_lock/
drwmutex.rs

1// Copyright 2024 RustFS Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::time::{Duration, Instant};
16use tokio::{sync::mpsc::Sender, time::sleep};
17use tracing::{info, warn};
18
19use crate::{LockApi, Locker, lock_args::LockArgs};
20
21const DRW_MUTEX_REFRESH_INTERVAL: Duration = Duration::from_secs(10);
22const LOCK_RETRY_MIN_INTERVAL: Duration = Duration::from_millis(250);
23
24#[derive(Debug)]
25pub struct DRWMutex {
26    owner: String,
27    names: Vec<String>,
28    write_locks: Vec<String>,
29    read_locks: Vec<String>,
30    cancel_refresh_sender: Option<Sender<bool>>,
31    // rng: ThreadRng,
32    lockers: Vec<LockApi>,
33    refresh_interval: Duration,
34    lock_retry_min_interval: Duration,
35}
36
37#[derive(Debug, Default, Clone)]
38pub struct Granted {
39    index: usize,
40    lock_uid: String,
41}
42
43impl Granted {
44    fn is_locked(&self) -> bool {
45        is_locked(&self.lock_uid)
46    }
47}
48
49fn is_locked(uid: &str) -> bool {
50    !uid.is_empty()
51}
52
53#[derive(Debug, Clone)]
54pub struct Options {
55    pub timeout: Duration,
56    pub retry_interval: Duration,
57}
58
59impl DRWMutex {
60    pub fn new(owner: String, names: Vec<String>, lockers: Vec<LockApi>) -> Self {
61        let mut names = names;
62        names.sort();
63        Self {
64            owner,
65            names,
66            write_locks: vec![String::new(); lockers.len()],
67            read_locks: vec![String::new(); lockers.len()],
68            cancel_refresh_sender: None,
69            // rng: rand::thread_rng(),
70            lockers,
71            refresh_interval: DRW_MUTEX_REFRESH_INTERVAL,
72            lock_retry_min_interval: LOCK_RETRY_MIN_INTERVAL,
73        }
74    }
75
76    fn is_locked(&self) -> bool {
77        self.write_locks.iter().any(|w_lock| is_locked(w_lock))
78    }
79
80    fn is_r_locked(&self) -> bool {
81        self.read_locks.iter().any(|r_lock| is_locked(r_lock))
82    }
83}
84
85impl DRWMutex {
86    pub async fn lock(&mut self, id: &String, source: &String) {
87        let is_read_lock = false;
88        let opts = Options {
89            timeout: Duration::from_secs(10),
90            retry_interval: Duration::from_millis(50),
91        };
92        self.lock_blocking(id, source, is_read_lock, &opts).await;
93    }
94
95    pub async fn get_lock(&mut self, id: &String, source: &String, opts: &Options) -> bool {
96        let is_read_lock = false;
97        self.lock_blocking(id, source, is_read_lock, opts).await
98    }
99
100    pub async fn r_lock(&mut self, id: &String, source: &String) {
101        let is_read_lock = true;
102        let opts = Options {
103            timeout: Duration::from_secs(10),
104            retry_interval: Duration::from_millis(50),
105        };
106        self.lock_blocking(id, source, is_read_lock, &opts).await;
107    }
108
109    pub async fn get_r_lock(&mut self, id: &String, source: &String, opts: &Options) -> bool {
110        let is_read_lock = true;
111        self.lock_blocking(id, source, is_read_lock, opts).await
112    }
113
114    pub async fn lock_blocking(&mut self, id: &String, source: &String, is_read_lock: bool, opts: &Options) -> bool {
115        let locker_len = self.lockers.len();
116
117        // Handle edge case: no lockers available
118        if locker_len == 0 {
119            return false;
120        }
121
122        let mut tolerance = locker_len / 2;
123        let mut quorum = locker_len - tolerance;
124        if !is_read_lock {
125            // In situations for write locks, as a special case
126            // to avoid split brains we make sure to acquire
127            // quorum + 1 when tolerance is exactly half of the
128            // total locker clients.
129            if quorum == tolerance {
130                quorum += 1;
131            }
132        }
133        info!(
134            "lockBlocking {}/{} for {:?}: lockType readLock({}), additional opts: {:?}, quorum: {}, tolerance: {}, lockClients: {}\n",
135            id, source, self.names, is_read_lock, opts, quorum, tolerance, locker_len
136        );
137
138        // Recalculate tolerance after potential quorum adjustment
139        // Use saturating_sub to prevent underflow
140        tolerance = locker_len.saturating_sub(quorum);
141        let mut attempt = 0;
142        let mut locks = vec!["".to_string(); self.lockers.len()];
143
144        loop {
145            if self.inner_lock(&mut locks, id, source, is_read_lock, tolerance, quorum).await {
146                if is_read_lock {
147                    self.read_locks = locks;
148                } else {
149                    self.write_locks = locks;
150                }
151
152                info!("lock_blocking {}/{} for {:?}: granted", id, source, self.names);
153
154                return true;
155            }
156
157            attempt += 1;
158            if attempt >= 10 {
159                break;
160            }
161            sleep(opts.retry_interval).await;
162        }
163
164        false
165    }
166
167    async fn inner_lock(
168        &mut self,
169        locks: &mut [String],
170        id: &String,
171        source: &String,
172        is_read_lock: bool,
173        tolerance: usize,
174        quorum: usize,
175    ) -> bool {
176        locks.iter_mut().for_each(|lock| *lock = "".to_string());
177
178        let mut granteds = Vec::with_capacity(self.lockers.len());
179        let args = LockArgs {
180            uid: id.to_string(),
181            resources: self.names.clone(),
182            owner: self.owner.clone(),
183            source: source.to_string(),
184            quorum,
185        };
186
187        for (index, locker) in self.lockers.iter_mut().enumerate() {
188            let mut granted = Granted {
189                index,
190                ..Default::default()
191            };
192
193            if is_read_lock {
194                match locker.rlock(&args).await {
195                    Ok(locked) => {
196                        if locked {
197                            granted.lock_uid = id.to_string();
198                        }
199                    }
200                    Err(err) => {
201                        warn!("Unable to call RLock failed with {} for {} at {:?}", err, args, locker);
202                    }
203                }
204            } else {
205                match locker.lock(&args).await {
206                    Ok(locked) => {
207                        if locked {
208                            granted.lock_uid = id.to_string();
209                        }
210                    }
211                    Err(err) => {
212                        warn!("Unable to call Lock failed with {} for {} at {:?}", err, args, locker);
213                    }
214                }
215            }
216
217            granteds.push(granted);
218        }
219
220        granteds.iter().for_each(|granted| {
221            locks[granted.index] = granted.lock_uid.clone();
222        });
223
224        let quorum_locked = check_quorum_locked(locks, quorum);
225        if !quorum_locked {
226            info!("Unable to acquire lock in quorum, {}", args);
227            if !self.release_all(tolerance, locks, is_read_lock).await {
228                info!("Unable to release acquired locks, these locks will expire automatically {}", args);
229            }
230        }
231
232        quorum_locked
233    }
234
235    pub async fn un_lock(&mut self) {
236        if self.write_locks.is_empty() || !self.is_locked() {
237            warn!("Trying to un_lock() while no lock() is active, write_locks: {:?}", self.write_locks)
238        }
239
240        let tolerance = self.lockers.len() / 2;
241        let is_read_lock = false;
242        let mut locks = std::mem::take(&mut self.write_locks);
243        let start = Instant::now();
244        loop {
245            if self.release_all(tolerance, &mut locks, is_read_lock).await {
246                return;
247            }
248
249            sleep(self.lock_retry_min_interval).await;
250            if Instant::now().duration_since(start) > Duration::from_secs(30) {
251                return;
252            }
253        }
254    }
255
256    pub async fn un_r_lock(&mut self) {
257        if self.read_locks.is_empty() || !self.is_r_locked() {
258            warn!("Trying to un_r_lock() while no r_lock() is active, read_locks: {:?}", self.read_locks)
259        }
260
261        let tolerance = self.lockers.len() / 2;
262        let is_read_lock = true;
263        let mut locks = std::mem::take(&mut self.read_locks);
264        let start = Instant::now();
265        loop {
266            if self.release_all(tolerance, &mut locks, is_read_lock).await {
267                return;
268            }
269
270            sleep(self.lock_retry_min_interval).await;
271            if Instant::now().duration_since(start) > Duration::from_secs(30) {
272                return;
273            }
274        }
275    }
276
277    async fn release_all(&mut self, tolerance: usize, locks: &mut [String], is_read_lock: bool) -> bool {
278        for (index, locker) in self.lockers.iter_mut().enumerate() {
279            if send_release(locker, &locks[index], &self.owner, &self.names, is_read_lock).await {
280                locks[index] = "".to_string();
281            }
282        }
283
284        !check_failed_unlocks(locks, tolerance)
285    }
286}
287
288// async fn start_continuous_lock_refresh(lockers: &Vec<&mut LockApi>, id: &String, source: &String, quorum: usize, refresh_interval: Duration, mut cancel_refresh_receiver: Receiver<bool>) {
289//     let uid = id.to_string();
290//     tokio::spawn(async move {
291//         let mut ticker = interval(refresh_interval);
292//         let args = LockArgs {
293//             uid,
294//             ..Default::default()
295//         };
296
297//         loop {
298//             select! {
299//                 _ = ticker.tick() => {
300//                     for (index, locker) in lockers.iter().enumerate() {
301
302//                     }
303//                 },
304//                 _ = cancel_refresh_receiver.recv() => {
305//                     return;
306//                 }
307//             }
308//         }
309//     });
310// }
311
312fn check_failed_unlocks(locks: &[String], tolerance: usize) -> bool {
313    let mut un_locks_failed = 0;
314    locks.iter().for_each(|lock| {
315        if is_locked(lock) {
316            un_locks_failed += 1;
317        }
318    });
319
320    // Handle edge case: if tolerance is greater than or equal to locks.len(),
321    // we can tolerate all failures, so return false (no critical failure)
322    if tolerance >= locks.len() {
323        return false;
324    }
325
326    // Special case: when locks.len() - tolerance == tolerance (i.e., locks.len() == 2 * tolerance)
327    // This happens when we have an even number of lockers and tolerance is exactly half
328    if locks.len() - tolerance == tolerance {
329        return un_locks_failed >= tolerance;
330    }
331
332    // Normal case: failure if more than tolerance unlocks failed
333    un_locks_failed > tolerance
334}
335
336async fn send_release(locker: &mut LockApi, uid: &String, owner: &str, names: &[String], is_read_lock: bool) -> bool {
337    if uid.is_empty() {
338        return false;
339    }
340
341    let args = LockArgs {
342        uid: uid.to_string(),
343        owner: owner.to_owned(),
344        resources: names.to_owned(),
345        ..Default::default()
346    };
347
348    if is_read_lock {
349        match locker.runlock(&args).await {
350            Ok(locked) => {
351                if !locked {
352                    warn!("Unable to release runlock, args: {}", args);
353                    return false;
354                }
355            }
356            Err(err) => {
357                warn!("Unable to call RLock failed with {} for {} at {:?}", err, args, locker);
358                return false;
359            }
360        }
361    } else {
362        match locker.unlock(&args).await {
363            Ok(locked) => {
364                if !locked {
365                    warn!("Unable to release unlock, args: {}", args);
366                    return false;
367                }
368            }
369            Err(err) => {
370                warn!("Unable to call Lock failed with {} for {} at {:?}", err, args, locker);
371                return false;
372            }
373        }
374    }
375
376    true
377}
378
379fn check_quorum_locked(locks: &[String], quorum: usize) -> bool {
380    let mut count = 0;
381    locks.iter().for_each(|lock| {
382        if is_locked(lock) {
383            count += 1;
384        }
385    });
386
387    count >= quorum
388}
389
390#[cfg(test)]
391mod tests {
392    use super::*;
393    use crate::local_locker::LocalLocker;
394    use async_trait::async_trait;
395    use std::collections::HashMap;
396    use std::io::{Error, Result};
397    use std::sync::{Arc, Mutex};
398
399    // Mock locker for testing
400    #[derive(Debug, Clone)]
401    struct MockLocker {
402        id: String,
403        state: Arc<Mutex<MockLockerState>>,
404    }
405
406    #[derive(Debug, Default)]
407    struct MockLockerState {
408        locks: HashMap<String, String>,      // uid -> owner
409        read_locks: HashMap<String, String>, // uid -> owner
410        should_fail: bool,
411        is_online: bool,
412    }
413
414    impl MockLocker {
415        fn new(id: String) -> Self {
416            Self {
417                id,
418                state: Arc::new(Mutex::new(MockLockerState {
419                    is_online: true,
420                    ..Default::default()
421                })),
422            }
423        }
424
425        fn set_should_fail(&self, should_fail: bool) {
426            self.state.lock().unwrap().should_fail = should_fail;
427        }
428
429        fn set_online(&self, online: bool) {
430            self.state.lock().unwrap().is_online = online;
431        }
432
433        fn get_lock_count(&self) -> usize {
434            self.state.lock().unwrap().locks.len()
435        }
436
437        fn get_read_lock_count(&self) -> usize {
438            self.state.lock().unwrap().read_locks.len()
439        }
440
441        fn has_lock(&self, uid: &str) -> bool {
442            self.state.lock().unwrap().locks.contains_key(uid)
443        }
444
445        fn has_read_lock(&self, uid: &str) -> bool {
446            self.state.lock().unwrap().read_locks.contains_key(uid)
447        }
448    }
449
450    #[async_trait]
451    impl Locker for MockLocker {
452        async fn lock(&mut self, args: &LockArgs) -> Result<bool> {
453            let mut state = self.state.lock().unwrap();
454            if state.should_fail {
455                return Err(Error::other("Mock lock failure"));
456            }
457            if !state.is_online {
458                return Err(Error::other("Mock locker offline"));
459            }
460
461            // Check if already locked
462            if state.locks.contains_key(&args.uid) {
463                return Ok(false);
464            }
465
466            state.locks.insert(args.uid.clone(), args.owner.clone());
467            Ok(true)
468        }
469
470        async fn unlock(&mut self, args: &LockArgs) -> Result<bool> {
471            let mut state = self.state.lock().unwrap();
472            if state.should_fail {
473                return Err(Error::other("Mock unlock failure"));
474            }
475
476            Ok(state.locks.remove(&args.uid).is_some())
477        }
478
479        async fn rlock(&mut self, args: &LockArgs) -> Result<bool> {
480            let mut state = self.state.lock().unwrap();
481            if state.should_fail {
482                return Err(Error::other("Mock rlock failure"));
483            }
484            if !state.is_online {
485                return Err(Error::other("Mock locker offline"));
486            }
487
488            // Check if write lock exists
489            if state.locks.contains_key(&args.uid) {
490                return Ok(false);
491            }
492
493            state.read_locks.insert(args.uid.clone(), args.owner.clone());
494            Ok(true)
495        }
496
497        async fn runlock(&mut self, args: &LockArgs) -> Result<bool> {
498            let mut state = self.state.lock().unwrap();
499            if state.should_fail {
500                return Err(Error::other("Mock runlock failure"));
501            }
502
503            Ok(state.read_locks.remove(&args.uid).is_some())
504        }
505
506        async fn refresh(&mut self, _args: &LockArgs) -> Result<bool> {
507            let state = self.state.lock().unwrap();
508            if state.should_fail {
509                return Err(Error::other("Mock refresh failure"));
510            }
511            Ok(true)
512        }
513
514        async fn force_unlock(&mut self, args: &LockArgs) -> Result<bool> {
515            let mut state = self.state.lock().unwrap();
516            let removed_lock = state.locks.remove(&args.uid).is_some();
517            let removed_read_lock = state.read_locks.remove(&args.uid).is_some();
518            Ok(removed_lock || removed_read_lock)
519        }
520
521        async fn close(&self) {}
522
523        async fn is_online(&self) -> bool {
524            self.state.lock().unwrap().is_online
525        }
526
527        async fn is_local(&self) -> bool {
528            true
529        }
530    }
531
532    fn create_mock_lockers(count: usize) -> Vec<LockApi> {
533        // For testing, we'll use Local lockers which use the global local server
534        (0..count).map(|_| LockApi::Local).collect()
535    }
536
537    #[test]
538    fn test_drw_mutex_new() {
539        let names = vec!["resource1".to_string(), "resource2".to_string()];
540        let lockers = create_mock_lockers(3);
541        let mutex = DRWMutex::new("owner1".to_string(), names.clone(), lockers);
542
543        assert_eq!(mutex.owner, "owner1");
544        assert_eq!(mutex.names.len(), 2);
545        assert_eq!(mutex.lockers.len(), 3);
546        assert_eq!(mutex.write_locks.len(), 3);
547        assert_eq!(mutex.read_locks.len(), 3);
548        assert_eq!(mutex.refresh_interval, DRW_MUTEX_REFRESH_INTERVAL);
549        assert_eq!(mutex.lock_retry_min_interval, LOCK_RETRY_MIN_INTERVAL);
550
551        // Names should be sorted
552        let mut expected_names = names;
553        expected_names.sort();
554        assert_eq!(mutex.names, expected_names);
555    }
556
557    #[test]
558    fn test_drw_mutex_new_empty_names() {
559        let names = vec![];
560        let lockers = create_mock_lockers(1);
561        let mutex = DRWMutex::new("owner1".to_string(), names, lockers);
562
563        assert_eq!(mutex.names.len(), 0);
564        assert_eq!(mutex.lockers.len(), 1);
565    }
566
567    #[test]
568    fn test_drw_mutex_new_single_locker() {
569        let names = vec!["resource1".to_string()];
570        let lockers = create_mock_lockers(1);
571        let mutex = DRWMutex::new("owner1".to_string(), names, lockers);
572
573        assert_eq!(mutex.lockers.len(), 1);
574        assert_eq!(mutex.write_locks.len(), 1);
575        assert_eq!(mutex.read_locks.len(), 1);
576    }
577
578    #[test]
579    fn test_is_locked_function() {
580        assert!(!is_locked(""));
581        assert!(is_locked("some-uid"));
582        assert!(is_locked("any-non-empty-string"));
583    }
584
585    #[test]
586    fn test_granted_is_locked() {
587        let granted_empty = Granted {
588            index: 0,
589            lock_uid: "".to_string(),
590        };
591        assert!(!granted_empty.is_locked());
592
593        let granted_locked = Granted {
594            index: 1,
595            lock_uid: "test-uid".to_string(),
596        };
597        assert!(granted_locked.is_locked());
598    }
599
600    #[test]
601    fn test_drw_mutex_is_locked() {
602        let names = vec!["resource1".to_string()];
603        let lockers = create_mock_lockers(2);
604        let mut mutex = DRWMutex::new("owner1".to_string(), names, lockers);
605
606        // Initially not locked
607        assert!(!mutex.is_locked());
608        assert!(!mutex.is_r_locked());
609
610        // Set write locks
611        mutex.write_locks[0] = "test-uid".to_string();
612        assert!(mutex.is_locked());
613        assert!(!mutex.is_r_locked());
614
615        // Clear write locks, set read locks
616        mutex.write_locks[0] = "".to_string();
617        mutex.read_locks[1] = "read-uid".to_string();
618        assert!(!mutex.is_locked());
619        assert!(mutex.is_r_locked());
620    }
621
622    #[test]
623    fn test_options_debug() {
624        let opts = Options {
625            timeout: Duration::from_secs(5),
626            retry_interval: Duration::from_millis(100),
627        };
628        let debug_str = format!("{opts:?}");
629        assert!(debug_str.contains("timeout"));
630        assert!(debug_str.contains("retry_interval"));
631    }
632
633    #[test]
634    fn test_check_quorum_locked() {
635        // Test with empty locks
636        assert!(!check_quorum_locked(&[], 1));
637
638        // Test with all empty locks
639        let locks = vec!["".to_string(), "".to_string(), "".to_string()];
640        assert!(!check_quorum_locked(&locks, 1));
641        assert!(!check_quorum_locked(&locks, 2));
642
643        // Test with some locks
644        let locks = vec!["uid1".to_string(), "".to_string(), "uid3".to_string()];
645        assert!(check_quorum_locked(&locks, 1));
646        assert!(check_quorum_locked(&locks, 2));
647        assert!(!check_quorum_locked(&locks, 3));
648
649        // Test with all locks
650        let locks = vec!["uid1".to_string(), "uid2".to_string(), "uid3".to_string()];
651        assert!(check_quorum_locked(&locks, 1));
652        assert!(check_quorum_locked(&locks, 2));
653        assert!(check_quorum_locked(&locks, 3));
654        assert!(!check_quorum_locked(&locks, 4));
655    }
656
657    #[test]
658    fn test_check_failed_unlocks() {
659        // Test with empty locks
660        assert!(!check_failed_unlocks(&[], 0)); // tolerance >= locks.len(), so no critical failure
661        assert!(!check_failed_unlocks(&[], 1)); // tolerance >= locks.len(), so no critical failure
662
663        // Test with all unlocked
664        let locks = vec!["".to_string(), "".to_string(), "".to_string()];
665        assert!(!check_failed_unlocks(&locks, 1)); // 0 failed <= tolerance 1
666        assert!(!check_failed_unlocks(&locks, 2)); // 0 failed <= tolerance 2
667
668        // Test with some failed unlocks
669        let locks = vec!["uid1".to_string(), "".to_string(), "uid3".to_string()];
670        assert!(check_failed_unlocks(&locks, 1)); // 2 failed > tolerance 1
671        assert!(!check_failed_unlocks(&locks, 2)); // 2 failed <= tolerance 2
672
673        // Test special case: locks.len() - tolerance == tolerance
674        // This means locks.len() == 2 * tolerance
675        let locks = vec!["uid1".to_string(), "uid2".to_string()]; // len = 2
676        let tolerance = 1; // 2 - 1 == 1
677        assert!(check_failed_unlocks(&locks, tolerance)); // 2 failed >= tolerance 1
678
679        let locks = vec!["".to_string(), "uid2".to_string()]; // len = 2, 1 failed
680        assert!(check_failed_unlocks(&locks, tolerance)); // 1 failed >= tolerance 1
681
682        let locks = vec!["".to_string(), "".to_string()]; // len = 2, 0 failed
683        assert!(!check_failed_unlocks(&locks, tolerance)); // 0 failed < tolerance 1
684    }
685
686    #[test]
687    fn test_check_failed_unlocks_edge_cases() {
688        // Test with zero tolerance
689        let locks = vec!["uid1".to_string()];
690        assert!(check_failed_unlocks(&locks, 0)); // 1 failed > tolerance 0
691
692        // Test with tolerance equal to lock count
693        let locks = vec!["uid1".to_string(), "uid2".to_string()];
694        assert!(!check_failed_unlocks(&locks, 2)); // 2 failed <= tolerance 2
695
696        // Test with tolerance greater than lock count
697        let locks = vec!["uid1".to_string()];
698        assert!(!check_failed_unlocks(&locks, 5)); // 1 failed <= tolerance 5
699    }
700
701    // Async tests using the local locker infrastructure
702    #[tokio::test]
703    async fn test_drw_mutex_lock_basic_functionality() {
704        let names = vec!["resource1".to_string()];
705        let lockers = create_mock_lockers(1); // Single locker for simplicity
706        let mut mutex = DRWMutex::new("owner1".to_string(), names, lockers);
707
708        let id = "test-lock-id".to_string();
709        let source = "test-source".to_string();
710        let opts = Options {
711            timeout: Duration::from_secs(1),
712            retry_interval: Duration::from_millis(10),
713        };
714
715        // Test get_lock (result depends on local locker state)
716        let _result = mutex.get_lock(&id, &source, &opts).await;
717        // Just ensure the method doesn't panic and returns a boolean
718        // assert!(result || !result); // This is always true, so removed
719
720        // If lock was acquired, test unlock
721        if _result {
722            assert!(mutex.is_locked(), "Mutex should be in locked state");
723            mutex.un_lock().await;
724            assert!(!mutex.is_locked(), "Mutex should be unlocked after un_lock");
725        }
726    }
727
728    #[tokio::test]
729    async fn test_drw_mutex_rlock_basic_functionality() {
730        let names = vec!["resource1".to_string()];
731        let lockers = create_mock_lockers(1); // Single locker for simplicity
732        let mut mutex = DRWMutex::new("owner1".to_string(), names, lockers);
733
734        let id = "test-rlock-id".to_string();
735        let source = "test-source".to_string();
736        let opts = Options {
737            timeout: Duration::from_secs(1),
738            retry_interval: Duration::from_millis(10),
739        };
740
741        // Test get_r_lock (result depends on local locker state)
742        let _result = mutex.get_r_lock(&id, &source, &opts).await;
743        // Just ensure the method doesn't panic and returns a boolean
744        // assert!(result || !result); // This is always true, so removed
745
746        // If read lock was acquired, test runlock
747        if _result {
748            assert!(mutex.is_r_locked(), "Mutex should be in read locked state");
749            mutex.un_r_lock().await;
750            assert!(!mutex.is_r_locked(), "Mutex should be unlocked after un_r_lock");
751        }
752    }
753
754    #[tokio::test]
755    async fn test_drw_mutex_lock_with_multiple_lockers() {
756        let names = vec!["resource1".to_string()];
757        let lockers = create_mock_lockers(3); // 3 lockers, need quorum of 2
758        let mut mutex = DRWMutex::new("owner1".to_string(), names, lockers);
759
760        let id = "test-lock-id".to_string();
761        let source = "test-source".to_string();
762        let opts = Options {
763            timeout: Duration::from_secs(1),
764            retry_interval: Duration::from_millis(10),
765        };
766
767        // With 3 local lockers, the quorum calculation should be:
768        // tolerance = 3 / 2 = 1
769        // quorum = 3 - 1 = 2
770        // Since it's a write lock and quorum != tolerance, quorum stays 2
771        // The result depends on the actual locker implementation
772        let _result = mutex.get_lock(&id, &source, &opts).await;
773        // We don't assert success/failure here since it depends on the local locker state
774        // Just ensure the method doesn't panic and returns a boolean
775        // assert!(result || !result); // This is always true, so removed
776    }
777
778    #[tokio::test]
779    async fn test_drw_mutex_unlock_without_lock() {
780        let names = vec!["resource1".to_string()];
781        let lockers = create_mock_lockers(1);
782        let mut mutex = DRWMutex::new("owner1".to_string(), names, lockers);
783
784        // Try to unlock without having a lock - should not panic
785        mutex.un_lock().await;
786        assert!(!mutex.is_locked());
787
788        // Try to unlock read lock without having one - should not panic
789        mutex.un_r_lock().await;
790        assert!(!mutex.is_r_locked());
791    }
792
793    #[tokio::test]
794    async fn test_drw_mutex_multiple_resources() {
795        let names = vec!["resource1".to_string(), "resource2".to_string(), "resource3".to_string()];
796        let lockers = create_mock_lockers(1);
797        let mut mutex = DRWMutex::new("owner1".to_string(), names.clone(), lockers);
798
799        // Names should be sorted
800        let mut expected_names = names;
801        expected_names.sort();
802        assert_eq!(mutex.names, expected_names);
803
804        let id = "test-lock-id".to_string();
805        let source = "test-source".to_string();
806        let opts = Options {
807            timeout: Duration::from_secs(1),
808            retry_interval: Duration::from_millis(10),
809        };
810
811        let _result = mutex.get_lock(&id, &source, &opts).await;
812        // The result depends on the actual locker implementation
813        // Just ensure the method doesn't panic and returns a boolean
814        // assert!(result || !result); // This is always true, so removed
815    }
816
817    #[tokio::test]
818    async fn test_drw_mutex_concurrent_read_locks() {
819        // Clear global state before test to avoid interference from other tests
820        {
821            let mut global_server = crate::GLOBAL_LOCAL_SERVER.write().await;
822            *global_server = LocalLocker::new();
823        }
824
825        // Use a single mutex with one resource for simplicity
826        let names = vec!["test-resource".to_string()];
827        let lockers = create_mock_lockers(1);
828        let mut mutex = DRWMutex::new("owner1".to_string(), names, lockers);
829
830        let id1 = "test-rlock-id1".to_string();
831        let id2 = "test-rlock-id2".to_string();
832        let source = "test-source".to_string();
833        let opts = Options {
834            timeout: Duration::from_secs(5),
835            retry_interval: Duration::from_millis(50),
836        };
837
838        // First acquire a read lock
839        let result1 = mutex.get_r_lock(&id1, &source, &opts).await;
840        assert!(result1, "First read lock should succeed");
841
842        // Release the first read lock
843        mutex.un_r_lock().await;
844
845        // Then acquire another read lock with different ID - this should succeed
846        let result2 = mutex.get_r_lock(&id2, &source, &opts).await;
847        assert!(result2, "Second read lock should succeed after first is released");
848
849        // Clean up
850        mutex.un_r_lock().await;
851    }
852
853    #[tokio::test]
854    async fn test_send_release_with_empty_uid() {
855        let mut locker = LockApi::Local;
856        let result = send_release(&mut locker, &"".to_string(), "owner", &["resource".to_string()], false).await;
857        assert!(!result, "send_release should return false for empty uid");
858    }
859
860    #[test]
861    fn test_drw_mutex_debug() {
862        let names = vec!["resource1".to_string()];
863        let lockers = create_mock_lockers(1);
864        let mutex = DRWMutex::new("owner1".to_string(), names, lockers);
865
866        let debug_str = format!("{mutex:?}");
867        assert!(debug_str.contains("DRWMutex"));
868        assert!(debug_str.contains("owner"));
869        assert!(debug_str.contains("names"));
870    }
871
872    #[test]
873    fn test_granted_default() {
874        let granted = Granted::default();
875        assert_eq!(granted.index, 0);
876        assert_eq!(granted.lock_uid, "");
877        assert!(!granted.is_locked());
878    }
879
880    #[test]
881    fn test_granted_clone() {
882        let granted = Granted {
883            index: 5,
884            lock_uid: "test-uid".to_string(),
885        };
886        let cloned = granted.clone();
887        assert_eq!(granted.index, cloned.index);
888        assert_eq!(granted.lock_uid, cloned.lock_uid);
889    }
890
891    // Test potential bug scenarios
892    #[test]
893    fn test_potential_bug_check_failed_unlocks_logic() {
894        // This test highlights the potentially confusing logic in check_failed_unlocks
895
896        // Case 1: Even number of lockers
897        let locks = vec!["uid1".to_string(), "uid2".to_string(), "uid3".to_string(), "uid4".to_string()];
898        let tolerance = 2; // locks.len() / 2 = 4 / 2 = 2
899        // locks.len() - tolerance = 4 - 2 = 2, which equals tolerance
900        // So the special case applies: un_locks_failed >= tolerance
901
902        // All 4 failed unlocks
903        assert!(check_failed_unlocks(&locks, tolerance)); // 4 >= 2 = true
904
905        // 2 failed unlocks
906        let locks = vec!["uid1".to_string(), "uid2".to_string(), "".to_string(), "".to_string()];
907        assert!(check_failed_unlocks(&locks, tolerance)); // 2 >= 2 = true
908
909        // 1 failed unlock
910        let locks = vec!["uid1".to_string(), "".to_string(), "".to_string(), "".to_string()];
911        assert!(!check_failed_unlocks(&locks, tolerance)); // 1 >= 2 = false
912
913        // Case 2: Odd number of lockers
914        let locks = vec!["uid1".to_string(), "uid2".to_string(), "uid3".to_string()];
915        let tolerance = 1; // locks.len() / 2 = 3 / 2 = 1
916        // locks.len() - tolerance = 3 - 1 = 2, which does NOT equal tolerance (1)
917        // So the normal case applies: un_locks_failed > tolerance
918
919        // 3 failed unlocks
920        assert!(check_failed_unlocks(&locks, tolerance)); // 3 > 1 = true
921
922        // 2 failed unlocks
923        let locks = vec!["uid1".to_string(), "uid2".to_string(), "".to_string()];
924        assert!(check_failed_unlocks(&locks, tolerance)); // 2 > 1 = true
925
926        // 1 failed unlock
927        let locks = vec!["uid1".to_string(), "".to_string(), "".to_string()];
928        assert!(!check_failed_unlocks(&locks, tolerance)); // 1 > 1 = false
929    }
930
931    #[test]
932    fn test_quorum_calculation_edge_cases() {
933        // Test the quorum calculation logic that might have issues
934
935        // For 1 locker: tolerance = 0, quorum = 1
936        // Write lock: quorum == tolerance (1 == 0 is false), so quorum stays 1
937        // This seems wrong - with 1 locker, we should need that 1 locker
938
939        // For 2 lockers: tolerance = 1, quorum = 1
940        // Write lock: quorum == tolerance (1 == 1 is true), so quorum becomes 2
941        // This makes sense - we need both lockers for write lock
942
943        // For 3 lockers: tolerance = 1, quorum = 2
944        // Write lock: quorum == tolerance (2 == 1 is false), so quorum stays 2
945
946        // For 4 lockers: tolerance = 2, quorum = 2
947        // Write lock: quorum == tolerance (2 == 2 is true), so quorum becomes 3
948
949        // The logic seems to be: for write locks, if exactly half the lockers
950        // would be tolerance, we need one more to avoid split brain
951
952        // Let's verify this makes sense:
953        struct QuorumTest {
954            locker_count: usize,
955            expected_tolerance: usize,
956            expected_write_quorum: usize,
957            expected_read_quorum: usize,
958        }
959
960        let test_cases = vec![
961            QuorumTest {
962                locker_count: 1,
963                expected_tolerance: 0,
964                expected_write_quorum: 1,
965                expected_read_quorum: 1,
966            },
967            QuorumTest {
968                locker_count: 2,
969                expected_tolerance: 1,
970                expected_write_quorum: 2,
971                expected_read_quorum: 1,
972            },
973            QuorumTest {
974                locker_count: 3,
975                expected_tolerance: 1,
976                expected_write_quorum: 2,
977                expected_read_quorum: 2,
978            },
979            QuorumTest {
980                locker_count: 4,
981                expected_tolerance: 2,
982                expected_write_quorum: 3,
983                expected_read_quorum: 2,
984            },
985            QuorumTest {
986                locker_count: 5,
987                expected_tolerance: 2,
988                expected_write_quorum: 3,
989                expected_read_quorum: 3,
990            },
991        ];
992
993        for test_case in test_cases {
994            let tolerance = test_case.locker_count / 2;
995            let mut write_quorum = test_case.locker_count - tolerance;
996            let read_quorum = write_quorum;
997
998            // Apply write lock special case
999            if write_quorum == tolerance {
1000                write_quorum += 1;
1001            }
1002
1003            assert_eq!(
1004                tolerance, test_case.expected_tolerance,
1005                "Tolerance mismatch for {} lockers",
1006                test_case.locker_count
1007            );
1008            assert_eq!(
1009                write_quorum, test_case.expected_write_quorum,
1010                "Write quorum mismatch for {} lockers",
1011                test_case.locker_count
1012            );
1013            assert_eq!(
1014                read_quorum, test_case.expected_read_quorum,
1015                "Read quorum mismatch for {} lockers",
1016                test_case.locker_count
1017            );
1018        }
1019    }
1020
1021    #[test]
1022    fn test_potential_integer_overflow() {
1023        // Test potential issues with tolerance calculation
1024
1025        // What happens with 0 lockers? This should probably be an error case
1026        let locker_count = 0;
1027        let tolerance = locker_count / 2; // 0 / 2 = 0
1028        let quorum = locker_count - tolerance; // 0 - 0 = 0
1029
1030        // This would result in quorum = 0, which doesn't make sense
1031        assert_eq!(tolerance, 0);
1032        assert_eq!(quorum, 0);
1033
1034        // The code should probably validate that locker_count > 0
1035    }
1036
1037    #[test]
1038    fn test_drw_mutex_constants() {
1039        // Test that constants are reasonable
1040        assert!(DRW_MUTEX_REFRESH_INTERVAL.as_secs() > 0);
1041        assert!(LOCK_RETRY_MIN_INTERVAL.as_millis() > 0);
1042        assert!(DRW_MUTEX_REFRESH_INTERVAL > LOCK_RETRY_MIN_INTERVAL);
1043    }
1044
1045    #[test]
1046    fn test_drw_mutex_new_with_unsorted_names() {
1047        let names = vec!["zebra".to_string(), "alpha".to_string(), "beta".to_string()];
1048        let lockers = create_mock_lockers(1);
1049        let mutex = DRWMutex::new("owner1".to_string(), names, lockers);
1050
1051        // Names should be sorted
1052        assert_eq!(mutex.names, vec!["alpha", "beta", "zebra"]);
1053    }
1054
1055    #[test]
1056    fn test_drw_mutex_new_with_duplicate_names() {
1057        let names = vec![
1058            "resource1".to_string(),
1059            "resource2".to_string(),
1060            "resource1".to_string(), // Duplicate
1061        ];
1062        let lockers = create_mock_lockers(1);
1063        let mutex = DRWMutex::new("owner1".to_string(), names, lockers);
1064
1065        // Should keep duplicates but sort them
1066        assert_eq!(mutex.names, vec!["resource1", "resource1", "resource2"]);
1067    }
1068
1069    #[tokio::test]
1070    async fn test_drw_mutex_lock_and_rlock_methods() {
1071        let names = vec!["resource1".to_string()];
1072        let lockers = create_mock_lockers(1);
1073        let mut mutex = DRWMutex::new("owner1".to_string(), names, lockers);
1074
1075        let id = "test-id".to_string();
1076        let source = "test-source".to_string();
1077
1078        // Test the convenience methods (lock and r_lock)
1079        // These should not panic and should attempt to acquire locks
1080        mutex.lock(&id, &source).await;
1081        // Note: We can't easily test the result since these methods don't return bool
1082
1083        // Clear any state
1084        mutex.un_lock().await;
1085
1086        // Test r_lock
1087        mutex.r_lock(&id, &source).await;
1088        mutex.un_r_lock().await;
1089    }
1090
1091    #[tokio::test]
1092    async fn test_drw_mutex_zero_lockers() {
1093        let names = vec!["resource1".to_string()];
1094        let lockers = vec![]; // No lockers
1095        let mut mutex = DRWMutex::new("owner1".to_string(), names, lockers);
1096
1097        let id = "test-id".to_string();
1098        let source = "test-source".to_string();
1099        let opts = Options {
1100            timeout: Duration::from_secs(1),
1101            retry_interval: Duration::from_millis(10),
1102        };
1103
1104        // With 0 lockers, quorum calculation:
1105        // tolerance = 0 / 2 = 0
1106        // quorum = 0 - 0 = 0
1107        // This should fail because we can't achieve any quorum
1108        let _result = mutex.get_lock(&id, &source, &opts).await;
1109        assert!(!_result, "Should fail with zero lockers");
1110    }
1111
1112    #[test]
1113    fn test_check_quorum_locked_edge_cases() {
1114        // Test with quorum 0
1115        let locks = vec!["".to_string()];
1116        assert!(check_quorum_locked(&locks, 0)); // 0 >= 0
1117
1118        // Test with quorum larger than locks
1119        let locks = vec!["uid1".to_string()];
1120        assert!(!check_quorum_locked(&locks, 5)); // 1 < 5
1121
1122        // Test with all locks but high quorum
1123        let locks = vec!["uid1".to_string(), "uid2".to_string(), "uid3".to_string()];
1124        assert!(!check_quorum_locked(&locks, 4)); // 3 < 4
1125    }
1126
1127    #[test]
1128    fn test_check_failed_unlocks_comprehensive() {
1129        // Test all combinations for small lock counts
1130
1131        // 1 lock scenarios
1132        assert!(!check_failed_unlocks(&["".to_string()], 0)); // 1 success, tolerance 0 -> 1 > 0 = true, but tolerance >= len, so false
1133        assert!(!check_failed_unlocks(&["".to_string()], 1)); // tolerance >= len
1134        assert!(!check_failed_unlocks(&["uid".to_string()], 1)); // tolerance >= len
1135        assert!(check_failed_unlocks(&["uid".to_string()], 0)); // 1 failed > 0
1136
1137        // 2 lock scenarios
1138        let two_failed = vec!["uid1".to_string(), "uid2".to_string()];
1139        let one_failed = vec!["uid1".to_string(), "".to_string()];
1140        let zero_failed = vec!["".to_string(), "".to_string()];
1141
1142        // tolerance = 0
1143        assert!(check_failed_unlocks(&two_failed, 0)); // 2 > 0
1144        assert!(check_failed_unlocks(&one_failed, 0)); // 1 > 0
1145        assert!(!check_failed_unlocks(&zero_failed, 0)); // 0 > 0 = false
1146
1147        // tolerance = 1 (special case: 2 - 1 == 1)
1148        assert!(check_failed_unlocks(&two_failed, 1)); // 2 >= 1
1149        assert!(check_failed_unlocks(&one_failed, 1)); // 1 >= 1
1150        assert!(!check_failed_unlocks(&zero_failed, 1)); // 0 >= 1 = false
1151
1152        // tolerance = 2
1153        assert!(!check_failed_unlocks(&two_failed, 2)); // tolerance >= len
1154        assert!(!check_failed_unlocks(&one_failed, 2)); // tolerance >= len
1155        assert!(!check_failed_unlocks(&zero_failed, 2)); // tolerance >= len
1156    }
1157
1158    #[test]
1159    fn test_options_clone() {
1160        let opts = Options {
1161            timeout: Duration::from_secs(5),
1162            retry_interval: Duration::from_millis(100),
1163        };
1164        let cloned = opts.clone();
1165        assert_eq!(opts.timeout, cloned.timeout);
1166        assert_eq!(opts.retry_interval, cloned.retry_interval);
1167    }
1168
1169    #[tokio::test]
1170    async fn test_drw_mutex_release_all_edge_cases() {
1171        let names = vec!["resource1".to_string()];
1172        let lockers = create_mock_lockers(2);
1173        let mut mutex = DRWMutex::new("owner1".to_string(), names, lockers);
1174
1175        // Test release_all with empty locks
1176        let mut empty_locks = vec!["".to_string(), "".to_string()];
1177        let result = mutex.release_all(1, &mut empty_locks, false).await;
1178        assert!(result, "Should succeed when releasing empty locks");
1179
1180        // Test release_all with some locks
1181        let mut some_locks = vec!["uid1".to_string(), "uid2".to_string()];
1182        let result = mutex.release_all(1, &mut some_locks, false).await;
1183        // This should attempt to release the locks and may succeed or fail
1184        // depending on the local locker state - just ensure it doesn't panic
1185        let _ = result; // Suppress unused variable warning
1186    }
1187
1188    #[test]
1189    fn test_drw_mutex_struct_fields() {
1190        let names = vec!["resource1".to_string()];
1191        let lockers = create_mock_lockers(2);
1192        let mutex = DRWMutex::new("test-owner".to_string(), names, lockers);
1193
1194        // Test that all fields are properly initialized
1195        assert_eq!(mutex.owner, "test-owner");
1196        assert_eq!(mutex.names, vec!["resource1"]);
1197        assert_eq!(mutex.write_locks.len(), 2);
1198        assert_eq!(mutex.read_locks.len(), 2);
1199        assert_eq!(mutex.lockers.len(), 2);
1200        assert!(mutex.cancel_refresh_sender.is_none());
1201        assert_eq!(mutex.refresh_interval, DRW_MUTEX_REFRESH_INTERVAL);
1202        assert_eq!(mutex.lock_retry_min_interval, LOCK_RETRY_MIN_INTERVAL);
1203
1204        // All locks should be initially empty
1205        for lock in &mutex.write_locks {
1206            assert!(lock.is_empty());
1207        }
1208        for lock in &mutex.read_locks {
1209            assert!(lock.is_empty());
1210        }
1211    }
1212}