1use std::time::{Duration, Instant};
16use tokio::{sync::mpsc::Sender, time::sleep};
17use tracing::{info, warn};
18
19use crate::{LockApi, Locker, lock_args::LockArgs};
20
/// Default value that `DRWMutex::new` stores in `DRWMutex::refresh_interval`.
const DRW_MUTEX_REFRESH_INTERVAL: Duration = Duration::from_secs(10);
/// Default value that `DRWMutex::new` stores in `DRWMutex::lock_retry_min_interval`,
/// the pause between two release attempts in `un_lock` / `un_r_lock`.
const LOCK_RETRY_MIN_INTERVAL: Duration = Duration::from_millis(250);
23
/// A read/write mutex whose state is spread over a set of `LockApi` lockers.
///
/// Every entry of `lockers` has a matching slot (same index) in `write_locks` and
/// `read_locks`. A slot holds the uid of the lock granted by that locker, or an
/// empty string when nothing is held there.
#[derive(Debug)]
pub struct DRWMutex {
    /// Owner identity forwarded to the lockers in every `LockArgs`.
    owner: String,
    /// Resource names guarded by this mutex; sorted by `DRWMutex::new`.
    names: Vec<String>,
    /// Per-locker uid of the currently held write lock (empty = not held).
    write_locks: Vec<String>,
    /// Per-locker uid of the currently held read lock (empty = not held).
    read_locks: Vec<String>,
    /// Initialised to `None` by `DRWMutex::new`.
    /// NOTE(review): presumably used to cancel a background refresh task — confirm,
    /// nothing visible here sends on it.
    cancel_refresh_sender: Option<Sender<bool>>,
    /// Lockers that are asked to grant or release the lock.
    lockers: Vec<LockApi>,
    /// Initialised from `DRW_MUTEX_REFRESH_INTERVAL`.
    refresh_interval: Duration,
    /// Pause between two release attempts in `un_lock` / `un_r_lock`.
    lock_retry_min_interval: Duration,
}
36
/// Outcome of a lock request sent to a single locker.
#[derive(Debug, Default, Clone)]
pub struct Granted {
    /// Position of the locker inside the mutex's locker list.
    index: usize,
    /// Uid of the granted lock; empty when the locker did not grant it.
    lock_uid: String,
}

impl Granted {
    /// Returns `true` when the locker granted the lock, i.e. `lock_uid` is non-empty.
    fn is_locked(&self) -> bool {
        let granted_uid: &str = &self.lock_uid;
        !granted_uid.is_empty()
    }
}
48
/// Returns `true` when `uid` is non-empty.
///
/// The lock tables in this file use an empty string to mean "no lock held in
/// this slot", so any non-empty uid counts as a held lock.
fn is_locked(uid: &str) -> bool {
    !uid.is_empty()
}
52
/// Tuning knobs passed to `get_lock`, `get_r_lock` and `lock_blocking`.
#[derive(Debug, Clone)]
pub struct Options {
    /// Time budget for the acquisition.
    /// NOTE(review): confirm how `lock_blocking` applies this value.
    pub timeout: Duration,
    /// Pause between two consecutive acquisition attempts in `lock_blocking`.
    pub retry_interval: Duration,
}
58
59impl DRWMutex {
60 pub fn new(owner: String, names: Vec<String>, lockers: Vec<LockApi>) -> Self {
61 let mut names = names;
62 names.sort();
63 Self {
64 owner,
65 names,
66 write_locks: vec![String::new(); lockers.len()],
67 read_locks: vec![String::new(); lockers.len()],
68 cancel_refresh_sender: None,
69 lockers,
71 refresh_interval: DRW_MUTEX_REFRESH_INTERVAL,
72 lock_retry_min_interval: LOCK_RETRY_MIN_INTERVAL,
73 }
74 }
75
76 fn is_locked(&self) -> bool {
77 self.write_locks.iter().any(|w_lock| is_locked(w_lock))
78 }
79
80 fn is_r_locked(&self) -> bool {
81 self.read_locks.iter().any(|r_lock| is_locked(r_lock))
82 }
83}
84
85impl DRWMutex {
86 pub async fn lock(&mut self, id: &String, source: &String) {
87 let is_read_lock = false;
88 let opts = Options {
89 timeout: Duration::from_secs(10),
90 retry_interval: Duration::from_millis(50),
91 };
92 self.lock_blocking(id, source, is_read_lock, &opts).await;
93 }
94
95 pub async fn get_lock(&mut self, id: &String, source: &String, opts: &Options) -> bool {
96 let is_read_lock = false;
97 self.lock_blocking(id, source, is_read_lock, opts).await
98 }
99
100 pub async fn r_lock(&mut self, id: &String, source: &String) {
101 let is_read_lock = true;
102 let opts = Options {
103 timeout: Duration::from_secs(10),
104 retry_interval: Duration::from_millis(50),
105 };
106 self.lock_blocking(id, source, is_read_lock, &opts).await;
107 }
108
109 pub async fn get_r_lock(&mut self, id: &String, source: &String, opts: &Options) -> bool {
110 let is_read_lock = true;
111 self.lock_blocking(id, source, is_read_lock, opts).await
112 }
113
114 pub async fn lock_blocking(&mut self, id: &String, source: &String, is_read_lock: bool, opts: &Options) -> bool {
115 let locker_len = self.lockers.len();
116
117 if locker_len == 0 {
119 return false;
120 }
121
122 let mut tolerance = locker_len / 2;
123 let mut quorum = locker_len - tolerance;
124 if !is_read_lock {
125 if quorum == tolerance {
130 quorum += 1;
131 }
132 }
133 info!(
134 "lockBlocking {}/{} for {:?}: lockType readLock({}), additional opts: {:?}, quorum: {}, tolerance: {}, lockClients: {}\n",
135 id, source, self.names, is_read_lock, opts, quorum, tolerance, locker_len
136 );
137
138 tolerance = locker_len.saturating_sub(quorum);
141 let mut attempt = 0;
142 let mut locks = vec!["".to_string(); self.lockers.len()];
143
144 loop {
145 if self.inner_lock(&mut locks, id, source, is_read_lock, tolerance, quorum).await {
146 if is_read_lock {
147 self.read_locks = locks;
148 } else {
149 self.write_locks = locks;
150 }
151
152 info!("lock_blocking {}/{} for {:?}: granted", id, source, self.names);
153
154 return true;
155 }
156
157 attempt += 1;
158 if attempt >= 10 {
159 break;
160 }
161 sleep(opts.retry_interval).await;
162 }
163
164 false
165 }
166
167 async fn inner_lock(
168 &mut self,
169 locks: &mut [String],
170 id: &String,
171 source: &String,
172 is_read_lock: bool,
173 tolerance: usize,
174 quorum: usize,
175 ) -> bool {
176 locks.iter_mut().for_each(|lock| *lock = "".to_string());
177
178 let mut granteds = Vec::with_capacity(self.lockers.len());
179 let args = LockArgs {
180 uid: id.to_string(),
181 resources: self.names.clone(),
182 owner: self.owner.clone(),
183 source: source.to_string(),
184 quorum,
185 };
186
187 for (index, locker) in self.lockers.iter_mut().enumerate() {
188 let mut granted = Granted {
189 index,
190 ..Default::default()
191 };
192
193 if is_read_lock {
194 match locker.rlock(&args).await {
195 Ok(locked) => {
196 if locked {
197 granted.lock_uid = id.to_string();
198 }
199 }
200 Err(err) => {
201 warn!("Unable to call RLock failed with {} for {} at {:?}", err, args, locker);
202 }
203 }
204 } else {
205 match locker.lock(&args).await {
206 Ok(locked) => {
207 if locked {
208 granted.lock_uid = id.to_string();
209 }
210 }
211 Err(err) => {
212 warn!("Unable to call Lock failed with {} for {} at {:?}", err, args, locker);
213 }
214 }
215 }
216
217 granteds.push(granted);
218 }
219
220 granteds.iter().for_each(|granted| {
221 locks[granted.index] = granted.lock_uid.clone();
222 });
223
224 let quorum_locked = check_quorum_locked(locks, quorum);
225 if !quorum_locked {
226 info!("Unable to acquire lock in quorum, {}", args);
227 if !self.release_all(tolerance, locks, is_read_lock).await {
228 info!("Unable to release acquired locks, these locks will expire automatically {}", args);
229 }
230 }
231
232 quorum_locked
233 }
234
235 pub async fn un_lock(&mut self) {
236 if self.write_locks.is_empty() || !self.is_locked() {
237 warn!("Trying to un_lock() while no lock() is active, write_locks: {:?}", self.write_locks)
238 }
239
240 let tolerance = self.lockers.len() / 2;
241 let is_read_lock = false;
242 let mut locks = std::mem::take(&mut self.write_locks);
243 let start = Instant::now();
244 loop {
245 if self.release_all(tolerance, &mut locks, is_read_lock).await {
246 return;
247 }
248
249 sleep(self.lock_retry_min_interval).await;
250 if Instant::now().duration_since(start) > Duration::from_secs(30) {
251 return;
252 }
253 }
254 }
255
256 pub async fn un_r_lock(&mut self) {
257 if self.read_locks.is_empty() || !self.is_r_locked() {
258 warn!("Trying to un_r_lock() while no r_lock() is active, read_locks: {:?}", self.read_locks)
259 }
260
261 let tolerance = self.lockers.len() / 2;
262 let is_read_lock = true;
263 let mut locks = std::mem::take(&mut self.read_locks);
264 let start = Instant::now();
265 loop {
266 if self.release_all(tolerance, &mut locks, is_read_lock).await {
267 return;
268 }
269
270 sleep(self.lock_retry_min_interval).await;
271 if Instant::now().duration_since(start) > Duration::from_secs(30) {
272 return;
273 }
274 }
275 }
276
277 async fn release_all(&mut self, tolerance: usize, locks: &mut [String], is_read_lock: bool) -> bool {
278 for (index, locker) in self.lockers.iter_mut().enumerate() {
279 if send_release(locker, &locks[index], &self.owner, &self.names, is_read_lock).await {
280 locks[index] = "".to_string();
281 }
282 }
283
284 !check_failed_unlocks(locks, tolerance)
285 }
286}
287
288fn check_failed_unlocks(locks: &[String], tolerance: usize) -> bool {
313 let mut un_locks_failed = 0;
314 locks.iter().for_each(|lock| {
315 if is_locked(lock) {
316 un_locks_failed += 1;
317 }
318 });
319
320 if tolerance >= locks.len() {
323 return false;
324 }
325
326 if locks.len() - tolerance == tolerance {
329 return un_locks_failed >= tolerance;
330 }
331
332 un_locks_failed > tolerance
334}
335
336async fn send_release(locker: &mut LockApi, uid: &String, owner: &str, names: &[String], is_read_lock: bool) -> bool {
337 if uid.is_empty() {
338 return false;
339 }
340
341 let args = LockArgs {
342 uid: uid.to_string(),
343 owner: owner.to_owned(),
344 resources: names.to_owned(),
345 ..Default::default()
346 };
347
348 if is_read_lock {
349 match locker.runlock(&args).await {
350 Ok(locked) => {
351 if !locked {
352 warn!("Unable to release runlock, args: {}", args);
353 return false;
354 }
355 }
356 Err(err) => {
357 warn!("Unable to call RLock failed with {} for {} at {:?}", err, args, locker);
358 return false;
359 }
360 }
361 } else {
362 match locker.unlock(&args).await {
363 Ok(locked) => {
364 if !locked {
365 warn!("Unable to release unlock, args: {}", args);
366 return false;
367 }
368 }
369 Err(err) => {
370 warn!("Unable to call Lock failed with {} for {} at {:?}", err, args, locker);
371 return false;
372 }
373 }
374 }
375
376 true
377}
378
379fn check_quorum_locked(locks: &[String], quorum: usize) -> bool {
380 let mut count = 0;
381 locks.iter().for_each(|lock| {
382 if is_locked(lock) {
383 count += 1;
384 }
385 });
386
387 count >= quorum
388}
389
390#[cfg(test)]
391mod tests {
392 use super::*;
393 use crate::local_locker::LocalLocker;
394 use async_trait::async_trait;
395 use std::collections::HashMap;
396 use std::io::{Error, Result};
397 use std::sync::{Arc, Mutex};
398
/// In-memory locker used by the tests.
///
/// Clones share the same state through the `Arc`, so a flag flipped on one
/// handle is visible through every other handle.
#[derive(Debug, Clone)]
struct MockLocker {
    id: String,
    state: Arc<Mutex<MockLockerState>>,
}

/// Mutable state behind a `MockLocker`.
#[derive(Debug, Default)]
struct MockLockerState {
    /// Held write locks, keyed by uid.
    locks: HashMap<String, String>,
    /// Held read locks, keyed by uid.
    read_locks: HashMap<String, String>,
    /// When set, the fallible operations of the mock return an error.
    should_fail: bool,
    /// Whether the mock reports itself as online.
    is_online: bool,
}

impl MockLocker {
    /// Creates an online mock with no locks held and failures disabled.
    fn new(id: String) -> Self {
        let initial = MockLockerState {
            is_online: true,
            ..Default::default()
        };
        Self {
            id,
            state: Arc::new(Mutex::new(initial)),
        }
    }

    /// Turns simulated failures on or off.
    fn set_should_fail(&self, should_fail: bool) {
        let mut guard = self.state.lock().unwrap();
        guard.should_fail = should_fail;
    }

    /// Marks the mock as online or offline.
    fn set_online(&self, online: bool) {
        let mut guard = self.state.lock().unwrap();
        guard.is_online = online;
    }

    /// Number of write locks currently held.
    fn get_lock_count(&self) -> usize {
        let guard = self.state.lock().unwrap();
        guard.locks.len()
    }

    /// Number of read locks currently held.
    fn get_read_lock_count(&self) -> usize {
        let guard = self.state.lock().unwrap();
        guard.read_locks.len()
    }

    /// Whether a write lock is recorded for `uid`.
    fn has_lock(&self, uid: &str) -> bool {
        let guard = self.state.lock().unwrap();
        guard.locks.contains_key(uid)
    }

    /// Whether a read lock is recorded for `uid`.
    fn has_read_lock(&self, uid: &str) -> bool {
        let guard = self.state.lock().unwrap();
        guard.read_locks.contains_key(uid)
    }
}
449
450 #[async_trait]
451 impl Locker for MockLocker {
452 async fn lock(&mut self, args: &LockArgs) -> Result<bool> {
453 let mut state = self.state.lock().unwrap();
454 if state.should_fail {
455 return Err(Error::other("Mock lock failure"));
456 }
457 if !state.is_online {
458 return Err(Error::other("Mock locker offline"));
459 }
460
461 if state.locks.contains_key(&args.uid) {
463 return Ok(false);
464 }
465
466 state.locks.insert(args.uid.clone(), args.owner.clone());
467 Ok(true)
468 }
469
470 async fn unlock(&mut self, args: &LockArgs) -> Result<bool> {
471 let mut state = self.state.lock().unwrap();
472 if state.should_fail {
473 return Err(Error::other("Mock unlock failure"));
474 }
475
476 Ok(state.locks.remove(&args.uid).is_some())
477 }
478
479 async fn rlock(&mut self, args: &LockArgs) -> Result<bool> {
480 let mut state = self.state.lock().unwrap();
481 if state.should_fail {
482 return Err(Error::other("Mock rlock failure"));
483 }
484 if !state.is_online {
485 return Err(Error::other("Mock locker offline"));
486 }
487
488 if state.locks.contains_key(&args.uid) {
490 return Ok(false);
491 }
492
493 state.read_locks.insert(args.uid.clone(), args.owner.clone());
494 Ok(true)
495 }
496
497 async fn runlock(&mut self, args: &LockArgs) -> Result<bool> {
498 let mut state = self.state.lock().unwrap();
499 if state.should_fail {
500 return Err(Error::other("Mock runlock failure"));
501 }
502
503 Ok(state.read_locks.remove(&args.uid).is_some())
504 }
505
506 async fn refresh(&mut self, _args: &LockArgs) -> Result<bool> {
507 let state = self.state.lock().unwrap();
508 if state.should_fail {
509 return Err(Error::other("Mock refresh failure"));
510 }
511 Ok(true)
512 }
513
514 async fn force_unlock(&mut self, args: &LockArgs) -> Result<bool> {
515 let mut state = self.state.lock().unwrap();
516 let removed_lock = state.locks.remove(&args.uid).is_some();
517 let removed_read_lock = state.read_locks.remove(&args.uid).is_some();
518 Ok(removed_lock || removed_read_lock)
519 }
520
521 async fn close(&self) {}
522
523 async fn is_online(&self) -> bool {
524 self.state.lock().unwrap().is_online
525 }
526
527 async fn is_local(&self) -> bool {
528 true
529 }
530 }
531
532 fn create_mock_lockers(count: usize) -> Vec<LockApi> {
533 (0..count).map(|_| LockApi::Local).collect()
535 }
536
/// `new` stores the owner, sorts the names and sizes both lock tables to the
/// number of lockers.
#[test]
fn test_drw_mutex_new() {
    let requested = vec!["resource1".to_string(), "resource2".to_string()];
    let mutex = DRWMutex::new("owner1".to_string(), requested.clone(), create_mock_lockers(3));

    assert_eq!(mutex.owner, "owner1");
    assert_eq!(mutex.names.len(), 2);
    assert_eq!(mutex.lockers.len(), 3);
    assert_eq!(mutex.write_locks.len(), 3);
    assert_eq!(mutex.read_locks.len(), 3);
    assert_eq!(mutex.refresh_interval, DRW_MUTEX_REFRESH_INTERVAL);
    assert_eq!(mutex.lock_retry_min_interval, LOCK_RETRY_MIN_INTERVAL);

    let mut sorted = requested;
    sorted.sort();
    assert_eq!(mutex.names, sorted);
}
556
/// A mutex may be created without any resource name.
#[test]
fn test_drw_mutex_new_empty_names() {
    let mutex = DRWMutex::new("owner1".to_string(), Vec::new(), create_mock_lockers(1));

    assert_eq!(mutex.names.len(), 0);
    assert_eq!(mutex.lockers.len(), 1);
}
566
/// With one locker, both lock tables have exactly one slot.
#[test]
fn test_drw_mutex_new_single_locker() {
    let resources = vec!["resource1".to_string()];
    let mutex = DRWMutex::new("owner1".to_string(), resources, create_mock_lockers(1));

    assert_eq!(mutex.lockers.len(), 1);
    assert_eq!(mutex.write_locks.len(), 1);
    assert_eq!(mutex.read_locks.len(), 1);
}
577
/// Only the empty uid counts as "not locked".
#[test]
fn test_is_locked_function() {
    assert!(!is_locked(""));

    for uid in ["some-uid", "any-non-empty-string"].iter() {
        assert!(is_locked(uid));
    }
}
584
/// `Granted::is_locked` follows the emptiness of `lock_uid`.
#[test]
fn test_granted_is_locked() {
    let cases = [(0usize, "", false), (1usize, "test-uid", true)];

    for &(index, uid, expected) in cases.iter() {
        let granted = Granted {
            index,
            lock_uid: uid.to_string(),
        };
        assert_eq!(granted.is_locked(), expected);
    }
}
599
/// `is_locked` / `is_r_locked` look at the write and read tables independently.
#[test]
fn test_drw_mutex_is_locked() {
    let resources = vec!["resource1".to_string()];
    let mut mutex = DRWMutex::new("owner1".to_string(), resources, create_mock_lockers(2));

    // Fresh mutex: nothing held.
    assert!(!mutex.is_locked());
    assert!(!mutex.is_r_locked());

    // A uid in the write table only affects the write state.
    mutex.write_locks[0] = String::from("test-uid");
    assert!(mutex.is_locked());
    assert!(!mutex.is_r_locked());

    // Move the uid to the read table: the states swap.
    mutex.write_locks[0] = String::new();
    mutex.read_locks[1] = String::from("read-uid");
    assert!(!mutex.is_locked());
    assert!(mutex.is_r_locked());
}
621
/// The `Debug` output of `Options` names both fields.
#[test]
fn test_options_debug() {
    let rendered = format!(
        "{:?}",
        Options {
            timeout: Duration::from_secs(5),
            retry_interval: Duration::from_millis(100),
        }
    );

    for field in ["timeout", "retry_interval"].iter() {
        assert!(rendered.contains(field));
    }
}
632
/// `check_quorum_locked` compares the number of non-empty uids with the quorum.
#[test]
fn test_check_quorum_locked() {
    fn uids(raw: &[&str]) -> Vec<String> {
        raw.iter().map(|s| s.to_string()).collect()
    }

    // No slots at all.
    assert!(!check_quorum_locked(&[], 1));

    // Slots present, none granted.
    let none_granted = uids(&["", "", ""]);
    assert!(!check_quorum_locked(&none_granted, 1));
    assert!(!check_quorum_locked(&none_granted, 2));

    // Two of three granted.
    let two_granted = uids(&["uid1", "", "uid3"]);
    assert!(check_quorum_locked(&two_granted, 1));
    assert!(check_quorum_locked(&two_granted, 2));
    assert!(!check_quorum_locked(&two_granted, 3));

    // All three granted.
    let all_granted = uids(&["uid1", "uid2", "uid3"]);
    assert!(check_quorum_locked(&all_granted, 1));
    assert!(check_quorum_locked(&all_granted, 2));
    assert!(check_quorum_locked(&all_granted, 3));
    assert!(!check_quorum_locked(&all_granted, 4));
}
656
/// Basic expectations for `check_failed_unlocks`, including the even split.
#[test]
fn test_check_failed_unlocks() {
    fn uids(raw: &[&str]) -> Vec<String> {
        raw.iter().map(|s| s.to_string()).collect()
    }

    // No slots: never a failure.
    assert!(!check_failed_unlocks(&[], 0));
    assert!(!check_failed_unlocks(&[], 1));

    // Everything released.
    let released = uids(&["", "", ""]);
    assert!(!check_failed_unlocks(&released, 1));
    assert!(!check_failed_unlocks(&released, 2));

    // Two of three still held.
    let two_held = uids(&["uid1", "", "uid3"]);
    assert!(check_failed_unlocks(&two_held, 1));
    assert!(!check_failed_unlocks(&two_held, 2));

    // Two slots with tolerance 1: the table splits exactly in half.
    let tolerance = 1;
    assert!(check_failed_unlocks(&uids(&["uid1", "uid2"]), tolerance));
    assert!(check_failed_unlocks(&uids(&["", "uid2"]), tolerance));
    assert!(!check_failed_unlocks(&uids(&["", ""]), tolerance));
}
685
/// Zero tolerance and tolerances at or above the slot count.
#[test]
fn test_check_failed_unlocks_edge_cases() {
    let single = vec!["uid1".to_string()];
    let pair = vec!["uid1".to_string(), "uid2".to_string()];

    // Zero tolerance: one remaining uid is already a failure.
    assert!(check_failed_unlocks(&single, 0));
    // Tolerance equal to the slot count.
    assert!(!check_failed_unlocks(&pair, 2));
    // Tolerance above the slot count.
    assert!(!check_failed_unlocks(&single, 5));
}
700
701 #[tokio::test]
703 async fn test_drw_mutex_lock_basic_functionality() {
704 let names = vec!["resource1".to_string()];
705 let lockers = create_mock_lockers(1); let mut mutex = DRWMutex::new("owner1".to_string(), names, lockers);
707
708 let id = "test-lock-id".to_string();
709 let source = "test-source".to_string();
710 let opts = Options {
711 timeout: Duration::from_secs(1),
712 retry_interval: Duration::from_millis(10),
713 };
714
715 let _result = mutex.get_lock(&id, &source, &opts).await;
717 if _result {
722 assert!(mutex.is_locked(), "Mutex should be in locked state");
723 mutex.un_lock().await;
724 assert!(!mutex.is_locked(), "Mutex should be unlocked after un_lock");
725 }
726 }
727
728 #[tokio::test]
729 async fn test_drw_mutex_rlock_basic_functionality() {
730 let names = vec!["resource1".to_string()];
731 let lockers = create_mock_lockers(1); let mut mutex = DRWMutex::new("owner1".to_string(), names, lockers);
733
734 let id = "test-rlock-id".to_string();
735 let source = "test-source".to_string();
736 let opts = Options {
737 timeout: Duration::from_secs(1),
738 retry_interval: Duration::from_millis(10),
739 };
740
741 let _result = mutex.get_r_lock(&id, &source, &opts).await;
743 if _result {
748 assert!(mutex.is_r_locked(), "Mutex should be in read locked state");
749 mutex.un_r_lock().await;
750 assert!(!mutex.is_r_locked(), "Mutex should be unlocked after un_r_lock");
751 }
752 }
753
754 #[tokio::test]
755 async fn test_drw_mutex_lock_with_multiple_lockers() {
756 let names = vec!["resource1".to_string()];
757 let lockers = create_mock_lockers(3); let mut mutex = DRWMutex::new("owner1".to_string(), names, lockers);
759
760 let id = "test-lock-id".to_string();
761 let source = "test-source".to_string();
762 let opts = Options {
763 timeout: Duration::from_secs(1),
764 retry_interval: Duration::from_millis(10),
765 };
766
767 let _result = mutex.get_lock(&id, &source, &opts).await;
773 }
777
778 #[tokio::test]
779 async fn test_drw_mutex_unlock_without_lock() {
780 let names = vec!["resource1".to_string()];
781 let lockers = create_mock_lockers(1);
782 let mut mutex = DRWMutex::new("owner1".to_string(), names, lockers);
783
784 mutex.un_lock().await;
786 assert!(!mutex.is_locked());
787
788 mutex.un_r_lock().await;
790 assert!(!mutex.is_r_locked());
791 }
792
793 #[tokio::test]
794 async fn test_drw_mutex_multiple_resources() {
795 let names = vec!["resource1".to_string(), "resource2".to_string(), "resource3".to_string()];
796 let lockers = create_mock_lockers(1);
797 let mut mutex = DRWMutex::new("owner1".to_string(), names.clone(), lockers);
798
799 let mut expected_names = names;
801 expected_names.sort();
802 assert_eq!(mutex.names, expected_names);
803
804 let id = "test-lock-id".to_string();
805 let source = "test-source".to_string();
806 let opts = Options {
807 timeout: Duration::from_secs(1),
808 retry_interval: Duration::from_millis(10),
809 };
810
811 let _result = mutex.get_lock(&id, &source, &opts).await;
812 }
816
817 #[tokio::test]
818 async fn test_drw_mutex_concurrent_read_locks() {
819 {
821 let mut global_server = crate::GLOBAL_LOCAL_SERVER.write().await;
822 *global_server = LocalLocker::new();
823 }
824
825 let names = vec!["test-resource".to_string()];
827 let lockers = create_mock_lockers(1);
828 let mut mutex = DRWMutex::new("owner1".to_string(), names, lockers);
829
830 let id1 = "test-rlock-id1".to_string();
831 let id2 = "test-rlock-id2".to_string();
832 let source = "test-source".to_string();
833 let opts = Options {
834 timeout: Duration::from_secs(5),
835 retry_interval: Duration::from_millis(50),
836 };
837
838 let result1 = mutex.get_r_lock(&id1, &source, &opts).await;
840 assert!(result1, "First read lock should succeed");
841
842 mutex.un_r_lock().await;
844
845 let result2 = mutex.get_r_lock(&id2, &source, &opts).await;
847 assert!(result2, "Second read lock should succeed after first is released");
848
849 mutex.un_r_lock().await;
851 }
852
853 #[tokio::test]
854 async fn test_send_release_with_empty_uid() {
855 let mut locker = LockApi::Local;
856 let result = send_release(&mut locker, &"".to_string(), "owner", &["resource".to_string()], false).await;
857 assert!(!result, "send_release should return false for empty uid");
858 }
859
/// The `Debug` output of `DRWMutex` contains the type name and key field names.
#[test]
fn test_drw_mutex_debug() {
    let resources = vec!["resource1".to_string()];
    let mutex = DRWMutex::new("owner1".to_string(), resources, create_mock_lockers(1));

    let rendered = format!("{:?}", mutex);
    for needle in ["DRWMutex", "owner", "names"].iter() {
        assert!(rendered.contains(needle));
    }
}
871
/// The default `Granted` has index 0, an empty uid and is not locked.
#[test]
fn test_granted_default() {
    let Granted { index, lock_uid } = Granted::default();
    assert_eq!(index, 0);
    assert_eq!(lock_uid, "");
    assert!(!Granted::default().is_locked());
}
879
/// Cloning a `Granted` copies both fields.
#[test]
fn test_granted_clone() {
    let original = Granted {
        index: 5,
        lock_uid: String::from("test-uid"),
    };
    let copy = original.clone();

    assert_eq!(original.index, copy.index);
    assert_eq!(original.lock_uid, copy.lock_uid);
}
890
/// Pins `check_failed_unlocks` for an even (4 slots, tolerance 2) and an odd
/// (3 slots, tolerance 1) table.
#[test]
fn test_potential_bug_check_failed_unlocks_logic() {
    fn uids(raw: &[&str]) -> Vec<String> {
        raw.iter().map(|s| s.to_string()).collect()
    }

    // Four slots, tolerance 2: the table splits in half, so two failures
    // are already too many.
    let tolerance = 2;
    assert!(check_failed_unlocks(&uids(&["uid1", "uid2", "uid3", "uid4"]), tolerance));
    assert!(check_failed_unlocks(&uids(&["uid1", "uid2", "", ""]), tolerance));
    assert!(!check_failed_unlocks(&uids(&["uid1", "", "", ""]), tolerance));

    // Three slots, tolerance 1: failures must exceed the tolerance.
    let tolerance = 1;
    assert!(check_failed_unlocks(&uids(&["uid1", "uid2", "uid3"]), tolerance));
    assert!(check_failed_unlocks(&uids(&["uid1", "uid2", ""]), tolerance));
    assert!(!check_failed_unlocks(&uids(&["uid1", "", ""]), tolerance));
}
930
/// Re-derives tolerance and quorums for 1 to 5 lockers with the same
/// arithmetic as `lock_blocking` and compares them with a table.
#[test]
fn test_quorum_calculation_edge_cases() {
    struct QuorumTest {
        locker_count: usize,
        expected_tolerance: usize,
        expected_write_quorum: usize,
        expected_read_quorum: usize,
    }

    // (locker_count, tolerance, write quorum, read quorum)
    let table: [(usize, usize, usize, usize); 5] = [(1, 0, 1, 1), (2, 1, 2, 1), (3, 1, 2, 2), (4, 2, 3, 2), (5, 2, 3, 3)];

    let test_cases = table.iter().map(|&(locker_count, tolerance, write_quorum, read_quorum)| QuorumTest {
        locker_count,
        expected_tolerance: tolerance,
        expected_write_quorum: write_quorum,
        expected_read_quorum: read_quorum,
    });

    for case in test_cases {
        let tolerance = case.locker_count / 2;
        let read_quorum = case.locker_count - tolerance;
        // A write lock needs one more grant when the lockers split in half.
        let write_quorum = if read_quorum == tolerance {
            read_quorum + 1
        } else {
            read_quorum
        };

        assert_eq!(
            tolerance, case.expected_tolerance,
            "Tolerance mismatch for {} lockers",
            case.locker_count
        );
        assert_eq!(
            write_quorum, case.expected_write_quorum,
            "Write quorum mismatch for {} lockers",
            case.locker_count
        );
        assert_eq!(
            read_quorum, case.expected_read_quorum,
            "Read quorum mismatch for {} lockers",
            case.locker_count
        );
    }
}
1020
/// With zero lockers the quorum arithmetic must not underflow.
///
/// `locker_count` is typed as `usize`, the type used for the locker count in
/// `lock_blocking`, so the subtraction is the unsigned one that could
/// underflow. `checked_sub` makes the "no underflow" expectation explicit.
#[test]
fn test_potential_integer_overflow() {
    let locker_count: usize = 0;
    let tolerance = locker_count / 2;
    let quorum = locker_count.checked_sub(tolerance);

    assert_eq!(tolerance, 0);
    assert_eq!(quorum, Some(0));
}
1036
/// Both interval constants are positive and the refresh interval is the
/// longer of the two.
#[test]
fn test_drw_mutex_constants() {
    let refresh = DRW_MUTEX_REFRESH_INTERVAL;
    let retry = LOCK_RETRY_MIN_INTERVAL;

    assert!(refresh.as_secs() > 0);
    assert!(retry.as_millis() > 0);
    assert!(refresh > retry);
}
1044
/// `new` sorts the resource names.
#[test]
fn test_drw_mutex_new_with_unsorted_names() {
    let unsorted: Vec<String> = ["zebra", "alpha", "beta"].iter().map(|name| name.to_string()).collect();
    let mutex = DRWMutex::new("owner1".to_string(), unsorted, create_mock_lockers(1));

    assert_eq!(mutex.names, vec!["alpha", "beta", "zebra"]);
}
1054
/// Duplicate resource names are kept (sorted, not de-duplicated).
#[test]
fn test_drw_mutex_new_with_duplicate_names() {
    let with_duplicate: Vec<String> = ["resource1", "resource2", "resource1"]
        .iter()
        .map(|name| name.to_string())
        .collect();
    let mutex = DRWMutex::new("owner1".to_string(), with_duplicate, create_mock_lockers(1));

    assert_eq!(mutex.names, vec!["resource1", "resource1", "resource2"]);
}
1068
1069 #[tokio::test]
1070 async fn test_drw_mutex_lock_and_rlock_methods() {
1071 let names = vec!["resource1".to_string()];
1072 let lockers = create_mock_lockers(1);
1073 let mut mutex = DRWMutex::new("owner1".to_string(), names, lockers);
1074
1075 let id = "test-id".to_string();
1076 let source = "test-source".to_string();
1077
1078 mutex.lock(&id, &source).await;
1081 mutex.un_lock().await;
1085
1086 mutex.r_lock(&id, &source).await;
1088 mutex.un_r_lock().await;
1089 }
1090
1091 #[tokio::test]
1092 async fn test_drw_mutex_zero_lockers() {
1093 let names = vec!["resource1".to_string()];
1094 let lockers = vec![]; let mut mutex = DRWMutex::new("owner1".to_string(), names, lockers);
1096
1097 let id = "test-id".to_string();
1098 let source = "test-source".to_string();
1099 let opts = Options {
1100 timeout: Duration::from_secs(1),
1101 retry_interval: Duration::from_millis(10),
1102 };
1103
1104 let _result = mutex.get_lock(&id, &source, &opts).await;
1109 assert!(!_result, "Should fail with zero lockers");
1110 }
1111
/// Quorum of zero and quorums larger than the number of slots.
#[test]
fn test_check_quorum_locked_edge_cases() {
    // A quorum of zero is met even when nothing is granted.
    let nothing_granted = vec![String::new()];
    assert!(check_quorum_locked(&nothing_granted, 0));

    // A quorum above the slot count can never be met.
    let one_granted = vec!["uid1".to_string()];
    assert!(!check_quorum_locked(&one_granted, 5));

    let three_granted = vec!["uid1".to_string(), "uid2".to_string(), "uid3".to_string()];
    assert!(!check_quorum_locked(&three_granted, 4));
}
1126
/// Walks `check_failed_unlocks` through one- and two-slot tables for
/// tolerances 0, 1 and 2.
#[test]
fn test_check_failed_unlocks_comprehensive() {
    // Single slot.
    let released = vec![String::new()];
    let held = vec!["uid".to_string()];
    assert!(!check_failed_unlocks(&released, 0));
    assert!(!check_failed_unlocks(&released, 1));
    assert!(!check_failed_unlocks(&held, 1));
    assert!(check_failed_unlocks(&held, 0));

    // Two slots.
    let two_failed = vec!["uid1".to_string(), "uid2".to_string()];
    let one_failed = vec!["uid1".to_string(), String::new()];
    let zero_failed = vec![String::new(), String::new()];

    // Tolerance 0: any remaining uid is a failure.
    assert!(check_failed_unlocks(&two_failed, 0));
    assert!(check_failed_unlocks(&one_failed, 0));
    assert!(!check_failed_unlocks(&zero_failed, 0));

    // Tolerance 1: the table splits in half, one failure is enough.
    assert!(check_failed_unlocks(&two_failed, 1));
    assert!(check_failed_unlocks(&one_failed, 1));
    assert!(!check_failed_unlocks(&zero_failed, 1));

    // Tolerance 2: equal to the slot count, never a failure.
    assert!(!check_failed_unlocks(&two_failed, 2));
    assert!(!check_failed_unlocks(&one_failed, 2));
    assert!(!check_failed_unlocks(&zero_failed, 2));
}
1157
/// Cloning `Options` copies both durations.
#[test]
fn test_options_clone() {
    let original = Options {
        timeout: Duration::from_secs(5),
        retry_interval: Duration::from_millis(100),
    };
    let copy = original.clone();

    assert_eq!(original.timeout, copy.timeout);
    assert_eq!(original.retry_interval, copy.retry_interval);
}
1168
1169 #[tokio::test]
1170 async fn test_drw_mutex_release_all_edge_cases() {
1171 let names = vec!["resource1".to_string()];
1172 let lockers = create_mock_lockers(2);
1173 let mut mutex = DRWMutex::new("owner1".to_string(), names, lockers);
1174
1175 let mut empty_locks = vec!["".to_string(), "".to_string()];
1177 let result = mutex.release_all(1, &mut empty_locks, false).await;
1178 assert!(result, "Should succeed when releasing empty locks");
1179
1180 let mut some_locks = vec!["uid1".to_string(), "uid2".to_string()];
1182 let result = mutex.release_all(1, &mut some_locks, false).await;
1183 let _ = result; }
1187
/// Checks every field of a freshly built mutex, including that all lock slots
/// start out empty.
#[test]
fn test_drw_mutex_struct_fields() {
    let resources = vec!["resource1".to_string()];
    let mutex = DRWMutex::new("test-owner".to_string(), resources, create_mock_lockers(2));

    assert_eq!(mutex.owner, "test-owner");
    assert_eq!(mutex.names, vec!["resource1"]);
    assert_eq!(mutex.write_locks.len(), 2);
    assert_eq!(mutex.read_locks.len(), 2);
    assert_eq!(mutex.lockers.len(), 2);
    assert!(mutex.cancel_refresh_sender.is_none());
    assert_eq!(mutex.refresh_interval, DRW_MUTEX_REFRESH_INTERVAL);
    assert_eq!(mutex.lock_retry_min_interval, LOCK_RETRY_MIN_INTERVAL);

    let all_slots = mutex.write_locks.iter().chain(mutex.read_locks.iter());
    for slot in all_slots {
        assert!(slot.is_empty());
    }
}
1212}