Skip to main content

brainwires_agents/
file_locks.rs

1//! File locking system for multi-agent coordination
2//!
3//! Provides a mechanism for agents to "checkout" files, preventing concurrent
4//! modifications and ensuring consistency across background task agents.
5
6use anyhow::{Result, anyhow};
7use std::collections::{HashMap, HashSet};
8use std::path::{Path, PathBuf};
9use std::sync::Arc;
10use std::time::{Duration, Instant};
11use tokio::sync::RwLock;
12
/// Auto-release timeout, in seconds, given to locks by a manager built with
/// `FileLockManager::new()`.
const DEFAULT_LOCK_TIMEOUT_SECS: u64 = 300;
/// Pause, in milliseconds, between acquisition attempts in
/// `FileLockManager::acquire_with_wait`.
const LOCK_POLL_INTERVAL_MS: u64 = 50;
15
/// Kind of lock an agent can request on a file.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LockType {
    /// Shared read lock - several agents may hold one on the same file at once
    Read,
    /// Exclusive write lock - at most one agent may hold it on a file
    Write,
}
24
/// Snapshot describing a single lock held on a file.
#[derive(Debug, Clone)]
pub struct LockInfo {
    /// ID of the agent holding the lock
    pub agent_id: String,
    /// Whether this is a shared read lock or an exclusive write lock
    pub lock_type: LockType,
    /// Monotonic timestamp taken when the lock was granted
    pub acquired_at: Instant,
    /// Optional lifetime after which the lock counts as expired;
    /// `None` means the lock never expires on its own
    pub timeout: Option<Duration>,
}
37
38impl LockInfo {
39    /// Check if the lock has expired
40    pub fn is_expired(&self) -> bool {
41        if let Some(timeout) = self.timeout {
42            self.acquired_at.elapsed() > timeout
43        } else {
44            false
45        }
46    }
47
48    /// Get remaining time before timeout
49    pub fn time_remaining(&self) -> Option<Duration> {
50        self.timeout.map(|timeout| {
51            let elapsed = self.acquired_at.elapsed();
52            if elapsed >= timeout {
53                Duration::ZERO
54            } else {
55                timeout - elapsed
56            }
57        })
58    }
59}
60
/// Internal lock state for a single file path.
///
/// The default value (no write lock, no read locks) represents an unlocked file.
#[derive(Debug, Clone, Default)]
struct FileLockState {
    /// The exclusive write lock, if one is currently held
    write_lock: Option<LockInfo>,
    /// Shared read locks, one entry per successful read acquisition
    read_locks: Vec<LockInfo>,
}
69
/// Guard that releases a lock when dropped.
///
/// The fields record which lock the guard stands for so that the `Drop`
/// implementation can ask the manager to release it.
pub struct LockGuard {
    /// Manager that the release request is sent to on drop
    manager: Arc<FileLockManager>,
    /// ID of the agent the lock was granted to
    agent_id: String,
    /// Path of the locked file
    path: PathBuf,
    /// Kind of lock (read or write) this guard represents
    lock_type: LockType,
}
77
78impl std::fmt::Debug for LockGuard {
79    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
80        f.debug_struct("LockGuard")
81            .field("agent_id", &self.agent_id)
82            .field("path", &self.path)
83            .field("lock_type", &self.lock_type)
84            .finish()
85    }
86}
87
88impl Drop for LockGuard {
89    fn drop(&mut self) {
90        // Use blocking release since we're in Drop
91        let manager = self.manager.clone();
92        let agent_id = self.agent_id.clone();
93        let path = self.path.clone();
94        let lock_type = self.lock_type;
95
96        // Spawn a task to release the lock asynchronously
97        tokio::spawn(async move {
98            if let Err(e) = manager
99                .release_lock_internal(&agent_id, &path, lock_type)
100                .await
101            {
102                eprintln!("Warning: Failed to release lock on drop: {}", e);
103            }
104        });
105    }
106}
107
/// Manages file locks across multiple agents.
///
/// All state lives in memory behind async `RwLock`s; lock-taking methods take
/// `self: &Arc<Self>` so the guards they return can hold a handle back to the
/// manager.
pub struct FileLockManager {
    /// Map of file paths to their lock states; paths with no remaining locks
    /// are removed from the map
    locks: RwLock<HashMap<PathBuf, FileLockState>>,
    /// Timeout applied by `acquire_lock` and `acquire_with_wait`;
    /// `None` means locks taken through them never expire
    default_timeout: Option<Duration>,
    /// Waiting agents: agent_id -> set of paths they're waiting for.
    /// Used for deadlock detection
    waiting: RwLock<HashMap<String, HashSet<PathBuf>>>,
}
118
119impl FileLockManager {
120    /// Create a new file lock manager
121    pub fn new() -> Self {
122        Self {
123            locks: RwLock::new(HashMap::new()),
124            default_timeout: Some(Duration::from_secs(DEFAULT_LOCK_TIMEOUT_SECS)),
125            waiting: RwLock::new(HashMap::new()),
126        }
127    }
128
129    /// Create a file lock manager with a custom default timeout
130    pub fn with_timeout(timeout: Duration) -> Self {
131        Self {
132            locks: RwLock::new(HashMap::new()),
133            default_timeout: Some(timeout),
134            waiting: RwLock::new(HashMap::new()),
135        }
136    }
137
138    /// Create a file lock manager with no default timeout
139    pub fn without_timeout() -> Self {
140        Self {
141            locks: RwLock::new(HashMap::new()),
142            default_timeout: None,
143            waiting: RwLock::new(HashMap::new()),
144        }
145    }
146
147    /// Acquire a lock on a file
148    ///
149    /// Returns a LockGuard that automatically releases the lock when dropped.
150    #[tracing::instrument(name = "agent.lock.acquire", skip_all, fields(agent_id, lock_type = ?lock_type))]
151    pub async fn acquire_lock(
152        self: &Arc<Self>,
153        agent_id: &str,
154        path: impl AsRef<Path>,
155        lock_type: LockType,
156    ) -> Result<LockGuard> {
157        self.acquire_lock_with_timeout(agent_id, path, lock_type, self.default_timeout)
158            .await
159    }
160
161    /// Acquire a lock with a specific timeout
162    #[tracing::instrument(name = "agent.lock.acquire_timeout", skip_all, fields(agent_id, lock_type = ?lock_type))]
163    pub async fn acquire_lock_with_timeout(
164        self: &Arc<Self>,
165        agent_id: &str,
166        path: impl AsRef<Path>,
167        lock_type: LockType,
168        timeout: Option<Duration>,
169    ) -> Result<LockGuard> {
170        let path = path.as_ref().to_path_buf();
171        let mut locks = self.locks.write().await;
172
173        // Clean up expired locks first
174        self.cleanup_expired_internal(&mut locks);
175
176        let state = locks.entry(path.clone()).or_default();
177
178        match lock_type {
179            LockType::Read => {
180                // Check for write lock
181                if let Some(write_lock) = &state.write_lock
182                    && write_lock.agent_id != agent_id
183                {
184                    return Err(anyhow!(
185                        "File {} is write-locked by agent {}",
186                        path.display(),
187                        write_lock.agent_id
188                    ));
189                }
190
191                // Add read lock
192                state.read_locks.push(LockInfo {
193                    agent_id: agent_id.to_string(),
194                    lock_type: LockType::Read,
195                    acquired_at: Instant::now(),
196                    timeout,
197                });
198            }
199            LockType::Write => {
200                // Check for existing write lock
201                if let Some(write_lock) = &state.write_lock {
202                    if write_lock.agent_id != agent_id {
203                        return Err(anyhow!(
204                            "File {} is already write-locked by agent {}",
205                            path.display(),
206                            write_lock.agent_id
207                        ));
208                    }
209                    // Same agent already has write lock, return success
210                    return Ok(LockGuard {
211                        manager: Arc::clone(self),
212                        agent_id: agent_id.to_string(),
213                        path,
214                        lock_type,
215                    });
216                }
217
218                // Check for read locks by other agents
219                let other_readers: Vec<_> = state
220                    .read_locks
221                    .iter()
222                    .filter(|lock| lock.agent_id != agent_id)
223                    .map(|lock| lock.agent_id.clone())
224                    .collect();
225
226                if !other_readers.is_empty() {
227                    return Err(anyhow!(
228                        "File {} has read locks from agents: {:?}",
229                        path.display(),
230                        other_readers
231                    ));
232                }
233
234                // Set write lock
235                state.write_lock = Some(LockInfo {
236                    agent_id: agent_id.to_string(),
237                    lock_type: LockType::Write,
238                    acquired_at: Instant::now(),
239                    timeout,
240                });
241            }
242        }
243
244        Ok(LockGuard {
245            manager: Arc::clone(self),
246            agent_id: agent_id.to_string(),
247            path,
248            lock_type,
249        })
250    }
251
252    /// Acquire a lock with waiting and timeout
253    ///
254    /// This method will wait up to `wait_timeout` for the lock to become available.
255    /// It includes deadlock detection to prevent circular wait scenarios.
256    #[tracing::instrument(name = "agent.lock.acquire_wait", skip_all, fields(agent_id, lock_type = ?lock_type))]
257    pub async fn acquire_with_wait(
258        self: &Arc<Self>,
259        agent_id: &str,
260        path: impl AsRef<Path>,
261        lock_type: LockType,
262        wait_timeout: Duration,
263    ) -> Result<LockGuard> {
264        let path = path.as_ref().to_path_buf();
265        let deadline = Instant::now() + wait_timeout;
266        let poll_interval = Duration::from_millis(LOCK_POLL_INTERVAL_MS);
267
268        loop {
269            // Check for deadlock before waiting
270            if self.would_deadlock(agent_id, &path).await {
271                return Err(anyhow!(
272                    "Deadlock detected: agent {} waiting for {} would create circular dependency",
273                    agent_id,
274                    path.display()
275                ));
276            }
277
278            // Try to acquire the lock
279            match self
280                .acquire_lock_with_timeout(agent_id, &path, lock_type, self.default_timeout)
281                .await
282            {
283                Ok(guard) => {
284                    // Successfully acquired - remove from waiting set
285                    self.stop_waiting(agent_id, &path).await;
286                    return Ok(guard);
287                }
288                Err(_) if Instant::now() < deadline => {
289                    // Record that we're waiting for this path
290                    self.start_waiting(agent_id, &path).await;
291
292                    // Clean up expired locks that might be blocking us
293                    self.cleanup_expired().await;
294
295                    // Wait before retrying
296                    tokio::time::sleep(poll_interval).await;
297                }
298                Err(e) => {
299                    // Timeout or other error
300                    self.stop_waiting(agent_id, &path).await;
301                    return Err(anyhow!(
302                        "Lock acquisition timeout after {:?}: {}",
303                        wait_timeout,
304                        e
305                    ));
306                }
307            }
308        }
309    }
310
311    /// Check if acquiring a lock would cause a deadlock
312    ///
313    /// Uses cycle detection in the wait-for graph.
314    async fn would_deadlock(&self, agent_id: &str, target_path: &Path) -> bool {
315        let locks = self.locks.read().await;
316        let waiting = self.waiting.read().await;
317
318        // Find who currently holds the lock on target_path
319        let current_holders = if let Some(state) = locks.get(target_path) {
320            let mut holders = HashSet::new();
321            if let Some(write_lock) = &state.write_lock {
322                holders.insert(write_lock.agent_id.clone());
323            }
324            for read_lock in &state.read_locks {
325                holders.insert(read_lock.agent_id.clone());
326            }
327            holders
328        } else {
329            return false; // No one holds the lock
330        };
331
332        // If we already hold the lock, no deadlock
333        if current_holders.contains(agent_id) {
334            return false;
335        }
336
337        // DFS to detect cycle: would any holder eventually wait for us?
338        let mut visited = HashSet::new();
339        let mut stack = Vec::new();
340
341        for holder in current_holders {
342            stack.push(holder);
343        }
344
345        while let Some(current) = stack.pop() {
346            if current == agent_id {
347                return true; // Cycle detected
348            }
349
350            if visited.contains(&current) {
351                continue;
352            }
353            visited.insert(current.clone());
354
355            // Find what paths this agent is waiting for
356            if let Some(waiting_for) = waiting.get(&current) {
357                // Find who holds those paths
358                for waiting_path in waiting_for {
359                    if let Some(state) = locks.get(waiting_path) {
360                        if let Some(write_lock) = &state.write_lock
361                            && !visited.contains(&write_lock.agent_id)
362                        {
363                            stack.push(write_lock.agent_id.clone());
364                        }
365                        for read_lock in &state.read_locks {
366                            if !visited.contains(&read_lock.agent_id) {
367                                stack.push(read_lock.agent_id.clone());
368                            }
369                        }
370                    }
371                }
372            }
373        }
374
375        false
376    }
377
378    /// Record that an agent is waiting for a path
379    async fn start_waiting(&self, agent_id: &str, path: &Path) {
380        let mut waiting = self.waiting.write().await;
381        waiting
382            .entry(agent_id.to_string())
383            .or_insert_with(HashSet::new)
384            .insert(path.to_path_buf());
385    }
386
387    /// Remove an agent from the waiting set for a path
388    async fn stop_waiting(&self, agent_id: &str, path: &Path) {
389        let mut waiting = self.waiting.write().await;
390        if let Some(paths) = waiting.get_mut(agent_id) {
391            paths.remove(path);
392            if paths.is_empty() {
393                waiting.remove(agent_id);
394            }
395        }
396    }
397
398    /// Clear all waiting entries for an agent (e.g., when agent exits)
399    pub async fn clear_waiting(&self, agent_id: &str) {
400        let mut waiting = self.waiting.write().await;
401        waiting.remove(agent_id);
402    }
403
404    /// Get all agents currently waiting for locks
405    pub async fn get_waiting_agents(&self) -> HashMap<String, Vec<PathBuf>> {
406        let waiting = self.waiting.read().await;
407        waiting
408            .iter()
409            .map(|(k, v)| (k.clone(), v.iter().cloned().collect()))
410            .collect()
411    }
412
413    /// Release a specific lock
414    #[tracing::instrument(name = "agent.lock.release", skip_all, fields(agent_id, lock_type = ?lock_type))]
415    pub async fn release_lock(
416        &self,
417        agent_id: &str,
418        path: impl AsRef<Path>,
419        lock_type: LockType,
420    ) -> Result<()> {
421        self.release_lock_internal(agent_id, path.as_ref(), lock_type)
422            .await
423    }
424
425    /// Internal release implementation
426    async fn release_lock_internal(
427        &self,
428        agent_id: &str,
429        path: &Path,
430        lock_type: LockType,
431    ) -> Result<()> {
432        let mut locks = self.locks.write().await;
433
434        if let Some(state) = locks.get_mut(path) {
435            match lock_type {
436                LockType::Read => {
437                    // Remove matching read lock
438                    let original_len = state.read_locks.len();
439                    state.read_locks.retain(|lock| lock.agent_id != agent_id);
440
441                    if state.read_locks.len() == original_len {
442                        return Err(anyhow!(
443                            "No read lock found for agent {} on {}",
444                            agent_id,
445                            path.display()
446                        ));
447                    }
448                }
449                LockType::Write => {
450                    // Remove write lock if it belongs to this agent
451                    if let Some(write_lock) = &state.write_lock {
452                        if write_lock.agent_id == agent_id {
453                            state.write_lock = None;
454                        } else {
455                            return Err(anyhow!(
456                                "Write lock on {} belongs to agent {}, not {}",
457                                path.display(),
458                                write_lock.agent_id,
459                                agent_id
460                            ));
461                        }
462                    } else {
463                        return Err(anyhow!("No write lock found on {}", path.display()));
464                    }
465                }
466            }
467
468            // Clean up empty state
469            if state.write_lock.is_none() && state.read_locks.is_empty() {
470                locks.remove(path);
471            }
472        } else {
473            return Err(anyhow!("No locks found for {}", path.display()));
474        }
475
476        Ok(())
477    }
478
479    /// Release all locks held by an agent
480    #[tracing::instrument(name = "agent.lock.release_all", skip(self))]
481    pub async fn release_all_locks(&self, agent_id: &str) -> usize {
482        let mut locks = self.locks.write().await;
483        let mut released = 0;
484
485        for state in locks.values_mut() {
486            // Release write lock
487            if let Some(write_lock) = &state.write_lock
488                && write_lock.agent_id == agent_id
489            {
490                state.write_lock = None;
491                released += 1;
492            }
493
494            // Release read locks
495            let original_len = state.read_locks.len();
496            state.read_locks.retain(|lock| lock.agent_id != agent_id);
497            released += original_len - state.read_locks.len();
498        }
499
500        // Clean up empty entries
501        locks.retain(|_, state| state.write_lock.is_some() || !state.read_locks.is_empty());
502
503        released
504    }
505
506    /// Check if a file is locked
507    pub async fn check_lock(&self, path: impl AsRef<Path>) -> Option<LockInfo> {
508        let locks = self.locks.read().await;
509
510        if let Some(state) = locks.get(path.as_ref()) {
511            // Return write lock if present, otherwise first read lock
512            if let Some(write_lock) = &state.write_lock {
513                return Some(write_lock.clone());
514            }
515            if let Some(read_lock) = state.read_locks.first() {
516                return Some(read_lock.clone());
517            }
518        }
519
520        None
521    }
522
523    /// Check if a file is locked by a specific agent
524    pub async fn is_locked_by(&self, path: impl AsRef<Path>, agent_id: &str) -> bool {
525        let locks = self.locks.read().await;
526
527        if let Some(state) = locks.get(path.as_ref()) {
528            if let Some(write_lock) = &state.write_lock
529                && write_lock.agent_id == agent_id
530            {
531                return true;
532            }
533            if state
534                .read_locks
535                .iter()
536                .any(|lock| lock.agent_id == agent_id)
537            {
538                return true;
539            }
540        }
541
542        false
543    }
544
545    /// Check if a file can be locked with a specific type by an agent
546    pub async fn can_acquire(
547        &self,
548        path: impl AsRef<Path>,
549        agent_id: &str,
550        lock_type: LockType,
551    ) -> bool {
552        let locks = self.locks.read().await;
553
554        if let Some(state) = locks.get(path.as_ref()) {
555            match lock_type {
556                LockType::Read => {
557                    // Can read if no write lock or own write lock
558                    if let Some(write_lock) = &state.write_lock {
559                        return write_lock.agent_id == agent_id;
560                    }
561                    true
562                }
563                LockType::Write => {
564                    // Can write if no other agent has any lock
565                    if let Some(write_lock) = &state.write_lock
566                        && write_lock.agent_id != agent_id
567                    {
568                        return false;
569                    }
570                    !state
571                        .read_locks
572                        .iter()
573                        .any(|lock| lock.agent_id != agent_id)
574                }
575            }
576        } else {
577            true
578        }
579    }
580
581    /// Force release a lock (admin operation)
582    pub async fn force_release(&self, path: impl AsRef<Path>) -> Result<()> {
583        let mut locks = self.locks.write().await;
584
585        if locks.remove(path.as_ref()).is_some() {
586            Ok(())
587        } else {
588            Err(anyhow!("No locks found for {}", path.as_ref().display()))
589        }
590    }
591
592    /// Get all currently held locks
593    pub async fn list_locks(&self) -> Vec<(PathBuf, LockInfo)> {
594        let locks = self.locks.read().await;
595        let mut result = Vec::new();
596
597        for (path, state) in locks.iter() {
598            if let Some(write_lock) = &state.write_lock {
599                result.push((path.clone(), write_lock.clone()));
600            }
601            for read_lock in &state.read_locks {
602                result.push((path.clone(), read_lock.clone()));
603            }
604        }
605
606        result
607    }
608
609    /// Get locks held by a specific agent
610    pub async fn locks_for_agent(&self, agent_id: &str) -> Vec<(PathBuf, LockInfo)> {
611        let locks = self.locks.read().await;
612        let mut result = Vec::new();
613
614        for (path, state) in locks.iter() {
615            if let Some(write_lock) = &state.write_lock
616                && write_lock.agent_id == agent_id
617            {
618                result.push((path.clone(), write_lock.clone()));
619            }
620            for read_lock in &state.read_locks {
621                if read_lock.agent_id == agent_id {
622                    result.push((path.clone(), read_lock.clone()));
623                }
624            }
625        }
626
627        result
628    }
629
630    /// Clean up expired locks
631    pub async fn cleanup_expired(&self) -> usize {
632        let mut locks = self.locks.write().await;
633        self.cleanup_expired_internal(&mut locks)
634    }
635
636    /// Internal cleanup implementation
637    fn cleanup_expired_internal(&self, locks: &mut HashMap<PathBuf, FileLockState>) -> usize {
638        let mut cleaned = 0;
639
640        for state in locks.values_mut() {
641            // Clean expired write lock
642            if let Some(write_lock) = &state.write_lock
643                && write_lock.is_expired()
644            {
645                state.write_lock = None;
646                cleaned += 1;
647            }
648
649            // Clean expired read locks
650            let original_len = state.read_locks.len();
651            state.read_locks.retain(|lock| !lock.is_expired());
652            cleaned += original_len - state.read_locks.len();
653        }
654
655        // Remove empty entries
656        locks.retain(|_, state| state.write_lock.is_some() || !state.read_locks.is_empty());
657
658        cleaned
659    }
660
661    /// Get statistics about current locks
662    pub async fn stats(&self) -> LockStats {
663        let locks = self.locks.read().await;
664
665        let mut total_files = 0;
666        let mut total_write_locks = 0;
667        let mut total_read_locks = 0;
668
669        for state in locks.values() {
670            total_files += 1;
671            if state.write_lock.is_some() {
672                total_write_locks += 1;
673            }
674            total_read_locks += state.read_locks.len();
675        }
676
677        LockStats {
678            total_files,
679            total_write_locks,
680            total_read_locks,
681        }
682    }
683}
684
685impl Default for FileLockManager {
686    fn default() -> Self {
687        Self::new()
688    }
689}
690
/// Statistics about current locks, as reported by `FileLockManager::stats`.
#[derive(Debug, Clone)]
pub struct LockStats {
    /// Number of file paths that have at least one lock registered
    pub total_files: usize,
    /// Number of write locks (at most one per file)
    pub total_write_locks: usize,
    /// Number of read-lock entries across all files
    pub total_read_locks: usize,
}
701
702#[cfg(test)]
703mod tests {
704    use super::*;
705
706    #[tokio::test]
707    async fn test_acquire_write_lock() {
708        let manager = Arc::new(FileLockManager::new());
709        let guard = manager
710            .acquire_lock("agent-1", "/test/file.txt", LockType::Write)
711            .await
712            .unwrap();
713
714        assert_eq!(guard.lock_type, LockType::Write);
715        assert!(manager.is_locked_by("/test/file.txt", "agent-1").await);
716    }
717
718    #[tokio::test]
719    async fn test_acquire_read_lock() {
720        let manager = Arc::new(FileLockManager::new());
721        let _guard = manager
722            .acquire_lock("agent-1", "/test/file.txt", LockType::Read)
723            .await
724            .unwrap();
725
726        assert!(manager.is_locked_by("/test/file.txt", "agent-1").await);
727    }
728
729    #[tokio::test]
730    async fn test_multiple_read_locks() {
731        let manager = Arc::new(FileLockManager::new());
732
733        let _guard1 = manager
734            .acquire_lock("agent-1", "/test/file.txt", LockType::Read)
735            .await
736            .unwrap();
737        let _guard2 = manager
738            .acquire_lock("agent-2", "/test/file.txt", LockType::Read)
739            .await
740            .unwrap();
741
742        assert!(manager.is_locked_by("/test/file.txt", "agent-1").await);
743        assert!(manager.is_locked_by("/test/file.txt", "agent-2").await);
744    }
745
746    #[tokio::test]
747    async fn test_write_lock_blocks_other_write() {
748        let manager = Arc::new(FileLockManager::new());
749
750        let _guard = manager
751            .acquire_lock("agent-1", "/test/file.txt", LockType::Write)
752            .await
753            .unwrap();
754
755        let result = manager
756            .acquire_lock("agent-2", "/test/file.txt", LockType::Write)
757            .await;
758
759        assert!(result.is_err());
760    }
761
762    #[tokio::test]
763    async fn test_write_lock_blocks_read() {
764        let manager = Arc::new(FileLockManager::new());
765
766        let _guard = manager
767            .acquire_lock("agent-1", "/test/file.txt", LockType::Write)
768            .await
769            .unwrap();
770
771        let result = manager
772            .acquire_lock("agent-2", "/test/file.txt", LockType::Read)
773            .await;
774
775        assert!(result.is_err());
776    }
777
778    #[tokio::test]
779    async fn test_read_lock_blocks_write() {
780        let manager = Arc::new(FileLockManager::new());
781
782        let _guard = manager
783            .acquire_lock("agent-1", "/test/file.txt", LockType::Read)
784            .await
785            .unwrap();
786
787        let result = manager
788            .acquire_lock("agent-2", "/test/file.txt", LockType::Write)
789            .await;
790
791        assert!(result.is_err());
792    }
793
794    #[tokio::test]
795    async fn test_same_agent_reacquire_write() {
796        let manager = Arc::new(FileLockManager::new());
797
798        let _guard1 = manager
799            .acquire_lock("agent-1", "/test/file.txt", LockType::Write)
800            .await
801            .unwrap();
802        let _guard2 = manager
803            .acquire_lock("agent-1", "/test/file.txt", LockType::Write)
804            .await
805            .unwrap();
806
807        // Same agent can reacquire their own write lock
808        assert!(manager.is_locked_by("/test/file.txt", "agent-1").await);
809    }
810
811    #[tokio::test]
812    async fn test_release_all_locks() {
813        let manager = Arc::new(FileLockManager::new());
814
815        let _guard1 = manager
816            .acquire_lock("agent-1", "/test/file1.txt", LockType::Write)
817            .await
818            .unwrap();
819        let _guard2 = manager
820            .acquire_lock("agent-1", "/test/file2.txt", LockType::Read)
821            .await
822            .unwrap();
823
824        // Forget guards to prevent auto-release
825        std::mem::forget(_guard1);
826        std::mem::forget(_guard2);
827
828        let released = manager.release_all_locks("agent-1").await;
829        assert_eq!(released, 2);
830    }
831
832    #[tokio::test]
833    async fn test_lock_stats() {
834        let manager = Arc::new(FileLockManager::new());
835
836        let _guard1 = manager
837            .acquire_lock("agent-1", "/test/file1.txt", LockType::Write)
838            .await
839            .unwrap();
840        let _guard2 = manager
841            .acquire_lock("agent-2", "/test/file2.txt", LockType::Read)
842            .await
843            .unwrap();
844        let _guard3 = manager
845            .acquire_lock("agent-3", "/test/file2.txt", LockType::Read)
846            .await
847            .unwrap();
848
849        let stats = manager.stats().await;
850        assert_eq!(stats.total_files, 2);
851        assert_eq!(stats.total_write_locks, 1);
852        assert_eq!(stats.total_read_locks, 2);
853    }
854
855    #[tokio::test]
856    async fn test_can_acquire() {
857        let manager = Arc::new(FileLockManager::new());
858
859        // No locks - can acquire anything
860        assert!(
861            manager
862                .can_acquire("/test/file.txt", "agent-1", LockType::Write)
863                .await
864        );
865        assert!(
866            manager
867                .can_acquire("/test/file.txt", "agent-1", LockType::Read)
868                .await
869        );
870
871        let _guard = manager
872            .acquire_lock("agent-1", "/test/file.txt", LockType::Write)
873            .await
874            .unwrap();
875
876        // Same agent can acquire
877        assert!(
878            manager
879                .can_acquire("/test/file.txt", "agent-1", LockType::Write)
880                .await
881        );
882        assert!(
883            manager
884                .can_acquire("/test/file.txt", "agent-1", LockType::Read)
885                .await
886        );
887
888        // Other agent cannot
889        assert!(
890            !manager
891                .can_acquire("/test/file.txt", "agent-2", LockType::Write)
892                .await
893        );
894        assert!(
895            !manager
896                .can_acquire("/test/file.txt", "agent-2", LockType::Read)
897                .await
898        );
899    }
900
901    #[tokio::test]
902    async fn test_expired_lock_cleanup() {
903        let manager = Arc::new(FileLockManager::new());
904
905        // Acquire lock with very short timeout
906        let _guard = manager
907            .acquire_lock_with_timeout(
908                "agent-1",
909                "/test/file.txt",
910                LockType::Write,
911                Some(Duration::from_millis(1)),
912            )
913            .await
914            .unwrap();
915
916        // Forget guard to prevent auto-release
917        std::mem::forget(_guard);
918
919        // Wait for expiration
920        tokio::time::sleep(Duration::from_millis(10)).await;
921
922        // Cleanup should remove expired lock
923        let cleaned = manager.cleanup_expired().await;
924        assert_eq!(cleaned, 1);
925
926        // Now another agent can acquire
927        let result = manager
928            .acquire_lock("agent-2", "/test/file.txt", LockType::Write)
929            .await;
930        assert!(result.is_ok());
931    }
932
933    #[tokio::test]
934    async fn test_force_release() {
935        let manager = Arc::new(FileLockManager::new());
936
937        let _guard = manager
938            .acquire_lock("agent-1", "/test/file.txt", LockType::Write)
939            .await
940            .unwrap();
941
942        // Forget guard
943        std::mem::forget(_guard);
944
945        // Force release
946        manager.force_release("/test/file.txt").await.unwrap();
947
948        // Another agent can now acquire
949        let result = manager
950            .acquire_lock("agent-2", "/test/file.txt", LockType::Write)
951            .await;
952        assert!(result.is_ok());
953    }
954
955    #[tokio::test]
956    async fn test_list_locks() {
957        let manager = Arc::new(FileLockManager::new());
958
959        let _guard1 = manager
960            .acquire_lock("agent-1", "/test/file1.txt", LockType::Write)
961            .await
962            .unwrap();
963        let _guard2 = manager
964            .acquire_lock("agent-2", "/test/file2.txt", LockType::Read)
965            .await
966            .unwrap();
967
968        let locks = manager.list_locks().await;
969        assert_eq!(locks.len(), 2);
970    }
971
972    #[tokio::test]
973    async fn test_locks_for_agent() {
974        let manager = Arc::new(FileLockManager::new());
975
976        let _guard1 = manager
977            .acquire_lock("agent-1", "/test/file1.txt", LockType::Write)
978            .await
979            .unwrap();
980        let _guard2 = manager
981            .acquire_lock("agent-1", "/test/file2.txt", LockType::Read)
982            .await
983            .unwrap();
984        let _guard3 = manager
985            .acquire_lock("agent-2", "/test/file3.txt", LockType::Write)
986            .await
987            .unwrap();
988
989        let agent1_locks = manager.locks_for_agent("agent-1").await;
990        assert_eq!(agent1_locks.len(), 2);
991
992        let agent2_locks = manager.locks_for_agent("agent-2").await;
993        assert_eq!(agent2_locks.len(), 1);
994    }
995
996    #[tokio::test]
997    async fn test_acquire_with_wait_success() {
998        let manager = Arc::new(FileLockManager::new());
999
1000        // Acquire initial lock
1001        let guard = manager
1002            .acquire_lock("agent-1", "/test/file.txt", LockType::Write)
1003            .await
1004            .unwrap();
1005
1006        // Spawn task to release after delay
1007        let manager_clone = manager.clone();
1008        tokio::spawn(async move {
1009            tokio::time::sleep(Duration::from_millis(50)).await;
1010            drop(guard);
1011            // Give time for the drop to process
1012            tokio::time::sleep(Duration::from_millis(10)).await;
1013            manager_clone.cleanup_expired().await;
1014        });
1015
1016        // Agent 2 should wait and eventually acquire
1017        let result = manager
1018            .acquire_with_wait(
1019                "agent-2",
1020                "/test/file.txt",
1021                LockType::Write,
1022                Duration::from_millis(500),
1023            )
1024            .await;
1025
1026        assert!(result.is_ok());
1027    }
1028
1029    #[tokio::test]
1030    async fn test_acquire_with_wait_timeout() {
1031        let manager = Arc::new(FileLockManager::new());
1032
1033        // Acquire lock that won't be released
1034        let _guard = manager
1035            .acquire_lock("agent-1", "/test/file.txt", LockType::Write)
1036            .await
1037            .unwrap();
1038
1039        // Agent 2 should timeout
1040        let result = manager
1041            .acquire_with_wait(
1042                "agent-2",
1043                "/test/file.txt",
1044                LockType::Write,
1045                Duration::from_millis(100),
1046            )
1047            .await;
1048
1049        assert!(result.is_err());
1050        assert!(result.unwrap_err().to_string().contains("timeout"));
1051    }
1052
1053    #[tokio::test]
1054    async fn test_deadlock_detection() {
1055        let manager = Arc::new(FileLockManager::new());
1056
1057        // Agent 1 holds lock on file1
1058        let _guard1 = manager
1059            .acquire_lock("agent-1", "/test/file1.txt", LockType::Write)
1060            .await
1061            .unwrap();
1062
1063        // Agent 2 holds lock on file2
1064        let _guard2 = manager
1065            .acquire_lock("agent-2", "/test/file2.txt", LockType::Write)
1066            .await
1067            .unwrap();
1068
1069        // Simulate agent 1 waiting for file2
1070        manager
1071            .start_waiting("agent-1", std::path::Path::new("/test/file2.txt"))
1072            .await;
1073
1074        // Agent 2 trying to acquire file1 would create a deadlock
1075        assert!(
1076            manager
1077                .would_deadlock("agent-2", std::path::Path::new("/test/file1.txt"))
1078                .await
1079        );
1080
1081        // But agent 3 trying to acquire file1 would NOT create a deadlock
1082        assert!(
1083            !manager
1084                .would_deadlock("agent-3", std::path::Path::new("/test/file1.txt"))
1085                .await
1086        );
1087    }
1088
1089    #[tokio::test]
1090    async fn test_waiting_agents() {
1091        let manager = Arc::new(FileLockManager::new());
1092
1093        // Record some waiting agents
1094        manager
1095            .start_waiting("agent-1", std::path::Path::new("/test/file1.txt"))
1096            .await;
1097        manager
1098            .start_waiting("agent-1", std::path::Path::new("/test/file2.txt"))
1099            .await;
1100        manager
1101            .start_waiting("agent-2", std::path::Path::new("/test/file1.txt"))
1102            .await;
1103
1104        let waiting = manager.get_waiting_agents().await;
1105        assert_eq!(waiting.len(), 2);
1106        assert_eq!(waiting.get("agent-1").map(|v| v.len()), Some(2));
1107        assert_eq!(waiting.get("agent-2").map(|v| v.len()), Some(1));
1108
1109        // Clear agent 1's waiting
1110        manager.clear_waiting("agent-1").await;
1111
1112        let waiting = manager.get_waiting_agents().await;
1113        assert_eq!(waiting.len(), 1);
1114        assert!(!waiting.contains_key("agent-1"));
1115    }
1116}