Skip to main content

fileloft_store_fs/
locker.rs

1use std::path::PathBuf;
2use std::time::Duration;
3
4use fileloft_core::{
5    error::TusError,
6    info::UploadId,
7    lock::{SendLock, SendLocker},
8};
9use fs4::fs_std::FileExt;
10
11/// Directory for per-upload `*.lock` files (shared across processes using the same upload root).
12#[derive(Clone, Debug)]
13pub struct FileLocker {
14    lock_dir: PathBuf,
15    pub timeout: Duration,
16}
17
18impl FileLocker {
19    pub fn new(lock_dir: impl Into<PathBuf>) -> Self {
20        Self {
21            lock_dir: lock_dir.into(),
22            timeout: Duration::from_secs(20),
23        }
24    }
25
26    pub fn with_timeout(mut self, timeout: Duration) -> Self {
27        self.timeout = timeout;
28        self
29    }
30
31    fn lock_path(&self, id: &UploadId) -> PathBuf {
32        self.lock_dir.join(format!("{}.lock", id.as_str()))
33    }
34}
35
36impl SendLocker for FileLocker {
37    type LockType = FileLock;
38
39    async fn acquire(&self, id: &UploadId) -> Result<FileLock, TusError> {
40        id.validate()?;
41        tokio::fs::create_dir_all(&self.lock_dir)
42            .await
43            .map_err(TusError::Io)?;
44        let path = self.lock_path(id);
45        let timeout = self.timeout;
46        let id_str = id.to_string();
47
48        tokio::task::spawn_blocking(move || {
49            let deadline = std::time::Instant::now() + timeout;
50            loop {
51                let f = std::fs::OpenOptions::new()
52                    .create(true)
53                    .truncate(false)
54                    .read(true)
55                    .write(true)
56                    .open(&path)
57                    .map_err(TusError::Io)?;
58                match f.try_lock_exclusive() {
59                    Ok(true) => {
60                        return Ok(FileLock {
61                            file: Some(f),
62                            path,
63                        });
64                    }
65                    Ok(false) | Err(_) => {
66                        drop(f);
67                    }
68                }
69                if std::time::Instant::now() >= deadline {
70                    return Err(TusError::LockTimeout(id_str));
71                }
72                std::thread::sleep(Duration::from_millis(10));
73            }
74        })
75        .await
76        .map_err(|e| TusError::Internal(format!("lock join: {e}")))?
77    }
78}
79
/// Handle to a held per-upload lock file.
///
/// Releasing the lock, or dropping the handle, unlocks the file and attempts to delete it.
#[derive(Debug)]
pub struct FileLock {
    /// Open, locked lock file; `None` once the handle has been taken for release.
    file: Option<std::fs::File>,
    /// Location of the lock file on disk.
    path: PathBuf,
}
84
85impl SendLock for FileLock {
86    async fn release(mut self) -> Result<(), TusError> {
87        let path = self.path.clone();
88        let file = self
89            .file
90            .take()
91            .ok_or_else(|| TusError::Internal("lock already released".into()))?;
92        std::mem::forget(self);
93        tokio::task::spawn_blocking(move || {
94            FileExt::unlock(&file).map_err(TusError::Io)?;
95            let _ = std::fs::remove_file(&path);
96            Ok::<(), TusError>(())
97        })
98        .await
99        .map_err(|e| TusError::Internal(format!("unlock join: {e}")))?
100    }
101}
102
103impl Drop for FileLock {
104    fn drop(&mut self) {
105        if let Some(f) = self.file.take() {
106            let _ = FileExt::unlock(&f);
107        }
108        let _ = std::fs::remove_file(&self.path);
109    }
110}