1use anyhow::{Result, anyhow};
7use std::collections::{HashMap, HashSet};
8use std::path::{Path, PathBuf};
9use std::sync::Arc;
10use std::time::{Duration, Instant};
11use tokio::sync::RwLock;
12
13const DEFAULT_LOCK_TIMEOUT_SECS: u64 = 300;
14const LOCK_POLL_INTERVAL_MS: u64 = 50;
15
16#[derive(Debug, Clone, Copy, PartialEq, Eq)]
18pub enum LockType {
19 Read,
21 Write,
23}
24
25#[derive(Debug, Clone)]
27pub struct LockInfo {
28 pub agent_id: String,
30 pub lock_type: LockType,
32 pub acquired_at: Instant,
34 pub timeout: Option<Duration>,
36}
37
38impl LockInfo {
39 pub fn is_expired(&self) -> bool {
41 if let Some(timeout) = self.timeout {
42 self.acquired_at.elapsed() > timeout
43 } else {
44 false
45 }
46 }
47
48 pub fn time_remaining(&self) -> Option<Duration> {
50 self.timeout.map(|timeout| {
51 let elapsed = self.acquired_at.elapsed();
52 if elapsed >= timeout {
53 Duration::ZERO
54 } else {
55 timeout - elapsed
56 }
57 })
58 }
59}
60
61#[derive(Debug, Clone, Default)]
63struct FileLockState {
64 write_lock: Option<LockInfo>,
66 read_locks: Vec<LockInfo>,
68}
69
70pub struct LockGuard {
72 manager: Arc<FileLockManager>,
73 agent_id: String,
74 path: PathBuf,
75 lock_type: LockType,
76}
77
78impl std::fmt::Debug for LockGuard {
79 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
80 f.debug_struct("LockGuard")
81 .field("agent_id", &self.agent_id)
82 .field("path", &self.path)
83 .field("lock_type", &self.lock_type)
84 .finish()
85 }
86}
87
88impl Drop for LockGuard {
89 fn drop(&mut self) {
90 let manager = self.manager.clone();
92 let agent_id = self.agent_id.clone();
93 let path = self.path.clone();
94 let lock_type = self.lock_type;
95
96 tokio::spawn(async move {
98 if let Err(e) = manager
99 .release_lock_internal(&agent_id, &path, lock_type)
100 .await
101 {
102 eprintln!("Warning: Failed to release lock on drop: {}", e);
103 }
104 });
105 }
106}
107
108pub struct FileLockManager {
110 locks: RwLock<HashMap<PathBuf, FileLockState>>,
112 default_timeout: Option<Duration>,
114 waiting: RwLock<HashMap<String, HashSet<PathBuf>>>,
117}
118
119impl FileLockManager {
120 pub fn new() -> Self {
122 Self {
123 locks: RwLock::new(HashMap::new()),
124 default_timeout: Some(Duration::from_secs(DEFAULT_LOCK_TIMEOUT_SECS)),
125 waiting: RwLock::new(HashMap::new()),
126 }
127 }
128
129 pub fn with_timeout(timeout: Duration) -> Self {
131 Self {
132 locks: RwLock::new(HashMap::new()),
133 default_timeout: Some(timeout),
134 waiting: RwLock::new(HashMap::new()),
135 }
136 }
137
138 pub fn without_timeout() -> Self {
140 Self {
141 locks: RwLock::new(HashMap::new()),
142 default_timeout: None,
143 waiting: RwLock::new(HashMap::new()),
144 }
145 }
146
147 #[tracing::instrument(name = "agent.lock.acquire", skip_all, fields(agent_id, lock_type = ?lock_type))]
151 pub async fn acquire_lock(
152 self: &Arc<Self>,
153 agent_id: &str,
154 path: impl AsRef<Path>,
155 lock_type: LockType,
156 ) -> Result<LockGuard> {
157 self.acquire_lock_with_timeout(agent_id, path, lock_type, self.default_timeout)
158 .await
159 }
160
161 #[tracing::instrument(name = "agent.lock.acquire_timeout", skip_all, fields(agent_id, lock_type = ?lock_type))]
163 pub async fn acquire_lock_with_timeout(
164 self: &Arc<Self>,
165 agent_id: &str,
166 path: impl AsRef<Path>,
167 lock_type: LockType,
168 timeout: Option<Duration>,
169 ) -> Result<LockGuard> {
170 let path = path.as_ref().to_path_buf();
171 let mut locks = self.locks.write().await;
172
173 self.cleanup_expired_internal(&mut locks);
175
176 let state = locks.entry(path.clone()).or_default();
177
178 match lock_type {
179 LockType::Read => {
180 if let Some(write_lock) = &state.write_lock
182 && write_lock.agent_id != agent_id
183 {
184 return Err(anyhow!(
185 "File {} is write-locked by agent {}",
186 path.display(),
187 write_lock.agent_id
188 ));
189 }
190
191 state.read_locks.push(LockInfo {
193 agent_id: agent_id.to_string(),
194 lock_type: LockType::Read,
195 acquired_at: Instant::now(),
196 timeout,
197 });
198 }
199 LockType::Write => {
200 if let Some(write_lock) = &state.write_lock {
202 if write_lock.agent_id != agent_id {
203 return Err(anyhow!(
204 "File {} is already write-locked by agent {}",
205 path.display(),
206 write_lock.agent_id
207 ));
208 }
209 return Ok(LockGuard {
211 manager: Arc::clone(self),
212 agent_id: agent_id.to_string(),
213 path,
214 lock_type,
215 });
216 }
217
218 let other_readers: Vec<_> = state
220 .read_locks
221 .iter()
222 .filter(|lock| lock.agent_id != agent_id)
223 .map(|lock| lock.agent_id.clone())
224 .collect();
225
226 if !other_readers.is_empty() {
227 return Err(anyhow!(
228 "File {} has read locks from agents: {:?}",
229 path.display(),
230 other_readers
231 ));
232 }
233
234 state.write_lock = Some(LockInfo {
236 agent_id: agent_id.to_string(),
237 lock_type: LockType::Write,
238 acquired_at: Instant::now(),
239 timeout,
240 });
241 }
242 }
243
244 Ok(LockGuard {
245 manager: Arc::clone(self),
246 agent_id: agent_id.to_string(),
247 path,
248 lock_type,
249 })
250 }
251
252 #[tracing::instrument(name = "agent.lock.acquire_wait", skip_all, fields(agent_id, lock_type = ?lock_type))]
257 pub async fn acquire_with_wait(
258 self: &Arc<Self>,
259 agent_id: &str,
260 path: impl AsRef<Path>,
261 lock_type: LockType,
262 wait_timeout: Duration,
263 ) -> Result<LockGuard> {
264 let path = path.as_ref().to_path_buf();
265 let deadline = Instant::now() + wait_timeout;
266 let poll_interval = Duration::from_millis(LOCK_POLL_INTERVAL_MS);
267
268 loop {
269 if self.would_deadlock(agent_id, &path).await {
271 return Err(anyhow!(
272 "Deadlock detected: agent {} waiting for {} would create circular dependency",
273 agent_id,
274 path.display()
275 ));
276 }
277
278 match self
280 .acquire_lock_with_timeout(agent_id, &path, lock_type, self.default_timeout)
281 .await
282 {
283 Ok(guard) => {
284 self.stop_waiting(agent_id, &path).await;
286 return Ok(guard);
287 }
288 Err(_) if Instant::now() < deadline => {
289 self.start_waiting(agent_id, &path).await;
291
292 self.cleanup_expired().await;
294
295 tokio::time::sleep(poll_interval).await;
297 }
298 Err(e) => {
299 self.stop_waiting(agent_id, &path).await;
301 return Err(anyhow!(
302 "Lock acquisition timeout after {:?}: {}",
303 wait_timeout,
304 e
305 ));
306 }
307 }
308 }
309 }
310
311 async fn would_deadlock(&self, agent_id: &str, target_path: &Path) -> bool {
315 let locks = self.locks.read().await;
316 let waiting = self.waiting.read().await;
317
318 let current_holders = if let Some(state) = locks.get(target_path) {
320 let mut holders = HashSet::new();
321 if let Some(write_lock) = &state.write_lock {
322 holders.insert(write_lock.agent_id.clone());
323 }
324 for read_lock in &state.read_locks {
325 holders.insert(read_lock.agent_id.clone());
326 }
327 holders
328 } else {
329 return false; };
331
332 if current_holders.contains(agent_id) {
334 return false;
335 }
336
337 let mut visited = HashSet::new();
339 let mut stack = Vec::new();
340
341 for holder in current_holders {
342 stack.push(holder);
343 }
344
345 while let Some(current) = stack.pop() {
346 if current == agent_id {
347 return true; }
349
350 if visited.contains(¤t) {
351 continue;
352 }
353 visited.insert(current.clone());
354
355 if let Some(waiting_for) = waiting.get(¤t) {
357 for waiting_path in waiting_for {
359 if let Some(state) = locks.get(waiting_path) {
360 if let Some(write_lock) = &state.write_lock
361 && !visited.contains(&write_lock.agent_id)
362 {
363 stack.push(write_lock.agent_id.clone());
364 }
365 for read_lock in &state.read_locks {
366 if !visited.contains(&read_lock.agent_id) {
367 stack.push(read_lock.agent_id.clone());
368 }
369 }
370 }
371 }
372 }
373 }
374
375 false
376 }
377
378 async fn start_waiting(&self, agent_id: &str, path: &Path) {
380 let mut waiting = self.waiting.write().await;
381 waiting
382 .entry(agent_id.to_string())
383 .or_insert_with(HashSet::new)
384 .insert(path.to_path_buf());
385 }
386
387 async fn stop_waiting(&self, agent_id: &str, path: &Path) {
389 let mut waiting = self.waiting.write().await;
390 if let Some(paths) = waiting.get_mut(agent_id) {
391 paths.remove(path);
392 if paths.is_empty() {
393 waiting.remove(agent_id);
394 }
395 }
396 }
397
398 pub async fn clear_waiting(&self, agent_id: &str) {
400 let mut waiting = self.waiting.write().await;
401 waiting.remove(agent_id);
402 }
403
404 pub async fn get_waiting_agents(&self) -> HashMap<String, Vec<PathBuf>> {
406 let waiting = self.waiting.read().await;
407 waiting
408 .iter()
409 .map(|(k, v)| (k.clone(), v.iter().cloned().collect()))
410 .collect()
411 }
412
413 #[tracing::instrument(name = "agent.lock.release", skip_all, fields(agent_id, lock_type = ?lock_type))]
415 pub async fn release_lock(
416 &self,
417 agent_id: &str,
418 path: impl AsRef<Path>,
419 lock_type: LockType,
420 ) -> Result<()> {
421 self.release_lock_internal(agent_id, path.as_ref(), lock_type)
422 .await
423 }
424
425 async fn release_lock_internal(
427 &self,
428 agent_id: &str,
429 path: &Path,
430 lock_type: LockType,
431 ) -> Result<()> {
432 let mut locks = self.locks.write().await;
433
434 if let Some(state) = locks.get_mut(path) {
435 match lock_type {
436 LockType::Read => {
437 let original_len = state.read_locks.len();
439 state.read_locks.retain(|lock| lock.agent_id != agent_id);
440
441 if state.read_locks.len() == original_len {
442 return Err(anyhow!(
443 "No read lock found for agent {} on {}",
444 agent_id,
445 path.display()
446 ));
447 }
448 }
449 LockType::Write => {
450 if let Some(write_lock) = &state.write_lock {
452 if write_lock.agent_id == agent_id {
453 state.write_lock = None;
454 } else {
455 return Err(anyhow!(
456 "Write lock on {} belongs to agent {}, not {}",
457 path.display(),
458 write_lock.agent_id,
459 agent_id
460 ));
461 }
462 } else {
463 return Err(anyhow!("No write lock found on {}", path.display()));
464 }
465 }
466 }
467
468 if state.write_lock.is_none() && state.read_locks.is_empty() {
470 locks.remove(path);
471 }
472 } else {
473 return Err(anyhow!("No locks found for {}", path.display()));
474 }
475
476 Ok(())
477 }
478
479 #[tracing::instrument(name = "agent.lock.release_all", skip(self))]
481 pub async fn release_all_locks(&self, agent_id: &str) -> usize {
482 let mut locks = self.locks.write().await;
483 let mut released = 0;
484
485 for state in locks.values_mut() {
486 if let Some(write_lock) = &state.write_lock
488 && write_lock.agent_id == agent_id
489 {
490 state.write_lock = None;
491 released += 1;
492 }
493
494 let original_len = state.read_locks.len();
496 state.read_locks.retain(|lock| lock.agent_id != agent_id);
497 released += original_len - state.read_locks.len();
498 }
499
500 locks.retain(|_, state| state.write_lock.is_some() || !state.read_locks.is_empty());
502
503 released
504 }
505
506 pub async fn check_lock(&self, path: impl AsRef<Path>) -> Option<LockInfo> {
508 let locks = self.locks.read().await;
509
510 if let Some(state) = locks.get(path.as_ref()) {
511 if let Some(write_lock) = &state.write_lock {
513 return Some(write_lock.clone());
514 }
515 if let Some(read_lock) = state.read_locks.first() {
516 return Some(read_lock.clone());
517 }
518 }
519
520 None
521 }
522
523 pub async fn is_locked_by(&self, path: impl AsRef<Path>, agent_id: &str) -> bool {
525 let locks = self.locks.read().await;
526
527 if let Some(state) = locks.get(path.as_ref()) {
528 if let Some(write_lock) = &state.write_lock
529 && write_lock.agent_id == agent_id
530 {
531 return true;
532 }
533 if state
534 .read_locks
535 .iter()
536 .any(|lock| lock.agent_id == agent_id)
537 {
538 return true;
539 }
540 }
541
542 false
543 }
544
545 pub async fn can_acquire(
547 &self,
548 path: impl AsRef<Path>,
549 agent_id: &str,
550 lock_type: LockType,
551 ) -> bool {
552 let locks = self.locks.read().await;
553
554 if let Some(state) = locks.get(path.as_ref()) {
555 match lock_type {
556 LockType::Read => {
557 if let Some(write_lock) = &state.write_lock {
559 return write_lock.agent_id == agent_id;
560 }
561 true
562 }
563 LockType::Write => {
564 if let Some(write_lock) = &state.write_lock
566 && write_lock.agent_id != agent_id
567 {
568 return false;
569 }
570 !state
571 .read_locks
572 .iter()
573 .any(|lock| lock.agent_id != agent_id)
574 }
575 }
576 } else {
577 true
578 }
579 }
580
581 pub async fn force_release(&self, path: impl AsRef<Path>) -> Result<()> {
583 let mut locks = self.locks.write().await;
584
585 if locks.remove(path.as_ref()).is_some() {
586 Ok(())
587 } else {
588 Err(anyhow!("No locks found for {}", path.as_ref().display()))
589 }
590 }
591
592 pub async fn list_locks(&self) -> Vec<(PathBuf, LockInfo)> {
594 let locks = self.locks.read().await;
595 let mut result = Vec::new();
596
597 for (path, state) in locks.iter() {
598 if let Some(write_lock) = &state.write_lock {
599 result.push((path.clone(), write_lock.clone()));
600 }
601 for read_lock in &state.read_locks {
602 result.push((path.clone(), read_lock.clone()));
603 }
604 }
605
606 result
607 }
608
609 pub async fn locks_for_agent(&self, agent_id: &str) -> Vec<(PathBuf, LockInfo)> {
611 let locks = self.locks.read().await;
612 let mut result = Vec::new();
613
614 for (path, state) in locks.iter() {
615 if let Some(write_lock) = &state.write_lock
616 && write_lock.agent_id == agent_id
617 {
618 result.push((path.clone(), write_lock.clone()));
619 }
620 for read_lock in &state.read_locks {
621 if read_lock.agent_id == agent_id {
622 result.push((path.clone(), read_lock.clone()));
623 }
624 }
625 }
626
627 result
628 }
629
630 pub async fn cleanup_expired(&self) -> usize {
632 let mut locks = self.locks.write().await;
633 self.cleanup_expired_internal(&mut locks)
634 }
635
636 fn cleanup_expired_internal(&self, locks: &mut HashMap<PathBuf, FileLockState>) -> usize {
638 let mut cleaned = 0;
639
640 for state in locks.values_mut() {
641 if let Some(write_lock) = &state.write_lock
643 && write_lock.is_expired()
644 {
645 state.write_lock = None;
646 cleaned += 1;
647 }
648
649 let original_len = state.read_locks.len();
651 state.read_locks.retain(|lock| !lock.is_expired());
652 cleaned += original_len - state.read_locks.len();
653 }
654
655 locks.retain(|_, state| state.write_lock.is_some() || !state.read_locks.is_empty());
657
658 cleaned
659 }
660
661 pub async fn stats(&self) -> LockStats {
663 let locks = self.locks.read().await;
664
665 let mut total_files = 0;
666 let mut total_write_locks = 0;
667 let mut total_read_locks = 0;
668
669 for state in locks.values() {
670 total_files += 1;
671 if state.write_lock.is_some() {
672 total_write_locks += 1;
673 }
674 total_read_locks += state.read_locks.len();
675 }
676
677 LockStats {
678 total_files,
679 total_write_locks,
680 total_read_locks,
681 }
682 }
683}
684
685impl Default for FileLockManager {
686 fn default() -> Self {
687 Self::new()
688 }
689}
690
691#[derive(Debug, Clone)]
693pub struct LockStats {
694 pub total_files: usize,
696 pub total_write_locks: usize,
698 pub total_read_locks: usize,
700}
701
702#[cfg(test)]
703mod tests {
704 use super::*;
705
706 #[tokio::test]
707 async fn test_acquire_write_lock() {
708 let manager = Arc::new(FileLockManager::new());
709 let guard = manager
710 .acquire_lock("agent-1", "/test/file.txt", LockType::Write)
711 .await
712 .unwrap();
713
714 assert_eq!(guard.lock_type, LockType::Write);
715 assert!(manager.is_locked_by("/test/file.txt", "agent-1").await);
716 }
717
718 #[tokio::test]
719 async fn test_acquire_read_lock() {
720 let manager = Arc::new(FileLockManager::new());
721 let _guard = manager
722 .acquire_lock("agent-1", "/test/file.txt", LockType::Read)
723 .await
724 .unwrap();
725
726 assert!(manager.is_locked_by("/test/file.txt", "agent-1").await);
727 }
728
729 #[tokio::test]
730 async fn test_multiple_read_locks() {
731 let manager = Arc::new(FileLockManager::new());
732
733 let _guard1 = manager
734 .acquire_lock("agent-1", "/test/file.txt", LockType::Read)
735 .await
736 .unwrap();
737 let _guard2 = manager
738 .acquire_lock("agent-2", "/test/file.txt", LockType::Read)
739 .await
740 .unwrap();
741
742 assert!(manager.is_locked_by("/test/file.txt", "agent-1").await);
743 assert!(manager.is_locked_by("/test/file.txt", "agent-2").await);
744 }
745
746 #[tokio::test]
747 async fn test_write_lock_blocks_other_write() {
748 let manager = Arc::new(FileLockManager::new());
749
750 let _guard = manager
751 .acquire_lock("agent-1", "/test/file.txt", LockType::Write)
752 .await
753 .unwrap();
754
755 let result = manager
756 .acquire_lock("agent-2", "/test/file.txt", LockType::Write)
757 .await;
758
759 assert!(result.is_err());
760 }
761
762 #[tokio::test]
763 async fn test_write_lock_blocks_read() {
764 let manager = Arc::new(FileLockManager::new());
765
766 let _guard = manager
767 .acquire_lock("agent-1", "/test/file.txt", LockType::Write)
768 .await
769 .unwrap();
770
771 let result = manager
772 .acquire_lock("agent-2", "/test/file.txt", LockType::Read)
773 .await;
774
775 assert!(result.is_err());
776 }
777
778 #[tokio::test]
779 async fn test_read_lock_blocks_write() {
780 let manager = Arc::new(FileLockManager::new());
781
782 let _guard = manager
783 .acquire_lock("agent-1", "/test/file.txt", LockType::Read)
784 .await
785 .unwrap();
786
787 let result = manager
788 .acquire_lock("agent-2", "/test/file.txt", LockType::Write)
789 .await;
790
791 assert!(result.is_err());
792 }
793
794 #[tokio::test]
795 async fn test_same_agent_reacquire_write() {
796 let manager = Arc::new(FileLockManager::new());
797
798 let _guard1 = manager
799 .acquire_lock("agent-1", "/test/file.txt", LockType::Write)
800 .await
801 .unwrap();
802 let _guard2 = manager
803 .acquire_lock("agent-1", "/test/file.txt", LockType::Write)
804 .await
805 .unwrap();
806
807 assert!(manager.is_locked_by("/test/file.txt", "agent-1").await);
809 }
810
811 #[tokio::test]
812 async fn test_release_all_locks() {
813 let manager = Arc::new(FileLockManager::new());
814
815 let _guard1 = manager
816 .acquire_lock("agent-1", "/test/file1.txt", LockType::Write)
817 .await
818 .unwrap();
819 let _guard2 = manager
820 .acquire_lock("agent-1", "/test/file2.txt", LockType::Read)
821 .await
822 .unwrap();
823
824 std::mem::forget(_guard1);
826 std::mem::forget(_guard2);
827
828 let released = manager.release_all_locks("agent-1").await;
829 assert_eq!(released, 2);
830 }
831
832 #[tokio::test]
833 async fn test_lock_stats() {
834 let manager = Arc::new(FileLockManager::new());
835
836 let _guard1 = manager
837 .acquire_lock("agent-1", "/test/file1.txt", LockType::Write)
838 .await
839 .unwrap();
840 let _guard2 = manager
841 .acquire_lock("agent-2", "/test/file2.txt", LockType::Read)
842 .await
843 .unwrap();
844 let _guard3 = manager
845 .acquire_lock("agent-3", "/test/file2.txt", LockType::Read)
846 .await
847 .unwrap();
848
849 let stats = manager.stats().await;
850 assert_eq!(stats.total_files, 2);
851 assert_eq!(stats.total_write_locks, 1);
852 assert_eq!(stats.total_read_locks, 2);
853 }
854
855 #[tokio::test]
856 async fn test_can_acquire() {
857 let manager = Arc::new(FileLockManager::new());
858
859 assert!(
861 manager
862 .can_acquire("/test/file.txt", "agent-1", LockType::Write)
863 .await
864 );
865 assert!(
866 manager
867 .can_acquire("/test/file.txt", "agent-1", LockType::Read)
868 .await
869 );
870
871 let _guard = manager
872 .acquire_lock("agent-1", "/test/file.txt", LockType::Write)
873 .await
874 .unwrap();
875
876 assert!(
878 manager
879 .can_acquire("/test/file.txt", "agent-1", LockType::Write)
880 .await
881 );
882 assert!(
883 manager
884 .can_acquire("/test/file.txt", "agent-1", LockType::Read)
885 .await
886 );
887
888 assert!(
890 !manager
891 .can_acquire("/test/file.txt", "agent-2", LockType::Write)
892 .await
893 );
894 assert!(
895 !manager
896 .can_acquire("/test/file.txt", "agent-2", LockType::Read)
897 .await
898 );
899 }
900
901 #[tokio::test]
902 async fn test_expired_lock_cleanup() {
903 let manager = Arc::new(FileLockManager::new());
904
905 let _guard = manager
907 .acquire_lock_with_timeout(
908 "agent-1",
909 "/test/file.txt",
910 LockType::Write,
911 Some(Duration::from_millis(1)),
912 )
913 .await
914 .unwrap();
915
916 std::mem::forget(_guard);
918
919 tokio::time::sleep(Duration::from_millis(10)).await;
921
922 let cleaned = manager.cleanup_expired().await;
924 assert_eq!(cleaned, 1);
925
926 let result = manager
928 .acquire_lock("agent-2", "/test/file.txt", LockType::Write)
929 .await;
930 assert!(result.is_ok());
931 }
932
933 #[tokio::test]
934 async fn test_force_release() {
935 let manager = Arc::new(FileLockManager::new());
936
937 let _guard = manager
938 .acquire_lock("agent-1", "/test/file.txt", LockType::Write)
939 .await
940 .unwrap();
941
942 std::mem::forget(_guard);
944
945 manager.force_release("/test/file.txt").await.unwrap();
947
948 let result = manager
950 .acquire_lock("agent-2", "/test/file.txt", LockType::Write)
951 .await;
952 assert!(result.is_ok());
953 }
954
955 #[tokio::test]
956 async fn test_list_locks() {
957 let manager = Arc::new(FileLockManager::new());
958
959 let _guard1 = manager
960 .acquire_lock("agent-1", "/test/file1.txt", LockType::Write)
961 .await
962 .unwrap();
963 let _guard2 = manager
964 .acquire_lock("agent-2", "/test/file2.txt", LockType::Read)
965 .await
966 .unwrap();
967
968 let locks = manager.list_locks().await;
969 assert_eq!(locks.len(), 2);
970 }
971
972 #[tokio::test]
973 async fn test_locks_for_agent() {
974 let manager = Arc::new(FileLockManager::new());
975
976 let _guard1 = manager
977 .acquire_lock("agent-1", "/test/file1.txt", LockType::Write)
978 .await
979 .unwrap();
980 let _guard2 = manager
981 .acquire_lock("agent-1", "/test/file2.txt", LockType::Read)
982 .await
983 .unwrap();
984 let _guard3 = manager
985 .acquire_lock("agent-2", "/test/file3.txt", LockType::Write)
986 .await
987 .unwrap();
988
989 let agent1_locks = manager.locks_for_agent("agent-1").await;
990 assert_eq!(agent1_locks.len(), 2);
991
992 let agent2_locks = manager.locks_for_agent("agent-2").await;
993 assert_eq!(agent2_locks.len(), 1);
994 }
995
996 #[tokio::test]
997 async fn test_acquire_with_wait_success() {
998 let manager = Arc::new(FileLockManager::new());
999
1000 let guard = manager
1002 .acquire_lock("agent-1", "/test/file.txt", LockType::Write)
1003 .await
1004 .unwrap();
1005
1006 let manager_clone = manager.clone();
1008 tokio::spawn(async move {
1009 tokio::time::sleep(Duration::from_millis(50)).await;
1010 drop(guard);
1011 tokio::time::sleep(Duration::from_millis(10)).await;
1013 manager_clone.cleanup_expired().await;
1014 });
1015
1016 let result = manager
1018 .acquire_with_wait(
1019 "agent-2",
1020 "/test/file.txt",
1021 LockType::Write,
1022 Duration::from_millis(500),
1023 )
1024 .await;
1025
1026 assert!(result.is_ok());
1027 }
1028
1029 #[tokio::test]
1030 async fn test_acquire_with_wait_timeout() {
1031 let manager = Arc::new(FileLockManager::new());
1032
1033 let _guard = manager
1035 .acquire_lock("agent-1", "/test/file.txt", LockType::Write)
1036 .await
1037 .unwrap();
1038
1039 let result = manager
1041 .acquire_with_wait(
1042 "agent-2",
1043 "/test/file.txt",
1044 LockType::Write,
1045 Duration::from_millis(100),
1046 )
1047 .await;
1048
1049 assert!(result.is_err());
1050 assert!(result.unwrap_err().to_string().contains("timeout"));
1051 }
1052
1053 #[tokio::test]
1054 async fn test_deadlock_detection() {
1055 let manager = Arc::new(FileLockManager::new());
1056
1057 let _guard1 = manager
1059 .acquire_lock("agent-1", "/test/file1.txt", LockType::Write)
1060 .await
1061 .unwrap();
1062
1063 let _guard2 = manager
1065 .acquire_lock("agent-2", "/test/file2.txt", LockType::Write)
1066 .await
1067 .unwrap();
1068
1069 manager
1071 .start_waiting("agent-1", std::path::Path::new("/test/file2.txt"))
1072 .await;
1073
1074 assert!(
1076 manager
1077 .would_deadlock("agent-2", std::path::Path::new("/test/file1.txt"))
1078 .await
1079 );
1080
1081 assert!(
1083 !manager
1084 .would_deadlock("agent-3", std::path::Path::new("/test/file1.txt"))
1085 .await
1086 );
1087 }
1088
1089 #[tokio::test]
1090 async fn test_waiting_agents() {
1091 let manager = Arc::new(FileLockManager::new());
1092
1093 manager
1095 .start_waiting("agent-1", std::path::Path::new("/test/file1.txt"))
1096 .await;
1097 manager
1098 .start_waiting("agent-1", std::path::Path::new("/test/file2.txt"))
1099 .await;
1100 manager
1101 .start_waiting("agent-2", std::path::Path::new("/test/file1.txt"))
1102 .await;
1103
1104 let waiting = manager.get_waiting_agents().await;
1105 assert_eq!(waiting.len(), 2);
1106 assert_eq!(waiting.get("agent-1").map(|v| v.len()), Some(2));
1107 assert_eq!(waiting.get("agent-2").map(|v| v.len()), Some(1));
1108
1109 manager.clear_waiting("agent-1").await;
1111
1112 let waiting = manager.get_waiting_agents().await;
1113 assert_eq!(waiting.len(), 1);
1114 assert!(!waiting.contains_key("agent-1"));
1115 }
1116}