// fileloft_store_fs/locker.rs
use std::path::PathBuf;
use std::time::Duration;

use fileloft_core::{
    error::TusError,
    info::UploadId,
    lock::{SendLock, SendLocker},
};
use fs4::fs_std::FileExt;
10
/// Locker that hands out per-upload locks backed by lock files on disk.
///
/// One `<id>.lock` file per upload is placed under `lock_dir`.
#[derive(Debug, Clone)]
pub struct FileLocker {
    /// Directory in which the lock files are created.
    lock_dir: PathBuf,
    /// How long an acquisition keeps retrying before it gives up.
    pub timeout: Duration,
}
17
18impl FileLocker {
19 pub fn new(lock_dir: impl Into<PathBuf>) -> Self {
20 Self {
21 lock_dir: lock_dir.into(),
22 timeout: Duration::from_secs(20),
23 }
24 }
25
26 pub fn with_timeout(mut self, timeout: Duration) -> Self {
27 self.timeout = timeout;
28 self
29 }
30
31 fn lock_path(&self, id: &UploadId) -> PathBuf {
32 self.lock_dir.join(format!("{}.lock", id.as_str()))
33 }
34}
35
36impl SendLocker for FileLocker {
37 type LockType = FileLock;
38
39 async fn acquire(&self, id: &UploadId) -> Result<FileLock, TusError> {
40 id.validate()?;
41 tokio::fs::create_dir_all(&self.lock_dir)
42 .await
43 .map_err(TusError::Io)?;
44 let path = self.lock_path(id);
45 let timeout = self.timeout;
46 let id_str = id.to_string();
47
48 tokio::task::spawn_blocking(move || {
49 let deadline = std::time::Instant::now() + timeout;
50 loop {
51 let f = std::fs::OpenOptions::new()
52 .create(true)
53 .truncate(false)
54 .read(true)
55 .write(true)
56 .open(&path)
57 .map_err(TusError::Io)?;
58 match f.try_lock_exclusive() {
59 Ok(true) => {
60 return Ok(FileLock {
61 file: Some(f),
62 path,
63 });
64 }
65 Ok(false) | Err(_) => {
66 drop(f);
67 }
68 }
69 if std::time::Instant::now() >= deadline {
70 return Err(TusError::LockTimeout(id_str));
71 }
72 std::thread::sleep(Duration::from_millis(10));
73 }
74 })
75 .await
76 .map_err(|e| TusError::Internal(format!("lock join: {e}")))?
77 }
78}
79
/// Guard for an exclusively locked lock file.
pub struct FileLock {
    /// Open handle to the locked file; `None` once the handle has been taken
    /// out for unlocking.
    file: Option<std::fs::File>,
    /// Location of the lock file on disk, used to delete it on release.
    path: PathBuf,
}
84
85impl SendLock for FileLock {
86 async fn release(mut self) -> Result<(), TusError> {
87 let path = self.path.clone();
88 let file = self
89 .file
90 .take()
91 .ok_or_else(|| TusError::Internal("lock already released".into()))?;
92 std::mem::forget(self);
93 tokio::task::spawn_blocking(move || {
94 FileExt::unlock(&file).map_err(TusError::Io)?;
95 let _ = std::fs::remove_file(&path);
96 Ok::<(), TusError>(())
97 })
98 .await
99 .map_err(|e| TusError::Internal(format!("unlock join: {e}")))?
100 }
101}
102
103impl Drop for FileLock {
104 fn drop(&mut self) {
105 if let Some(f) = self.file.take() {
106 let _ = FileExt::unlock(&f);
107 }
108 let _ = std::fs::remove_file(&self.path);
109 }
110}