Skip to main content

upstream_rs/services/storage/
lock_storage.rs

1use anyhow::{Context, Result, anyhow};
2use std::{
3    fs::{self, OpenOptions},
4    io::{ErrorKind, Write},
5    path::{Path, PathBuf},
6    process, thread,
7    time::{Duration, SystemTime, UNIX_EPOCH},
8};
9
10use crate::application::cli::arguments::Commands;
11use crate::utils::platform::process_id;
12use crate::utils::static_paths::UpstreamPaths;
13
/// Guard for the upstream lock file.
///
/// Obtained through `LockStorage::acquire`; dropping the guard deletes the
/// lock file at `path`, which releases the lock for other processes.
#[derive(Debug)]
pub struct LockStorage {
    /// Location of the lock file owned by this guard.
    path: PathBuf,
}
18
19const LOCK_POLL_INTERVAL: Duration = Duration::from_secs(1);
20
/// Fields recovered from the `key=value` lines of an existing lock file.
///
/// Every field is optional because the file may be partial, hand-written,
/// or corrupted.
#[derive(Debug, Default)]
struct LockMetadata {
    /// Process id recorded by the lock holder (`pid=`).
    pid: Option<u32>,
    /// Holder's process start token (`pid_start_token=`), compared against a
    /// fresh probe to detect a different process behind the same pid.
    pid_start_token: Option<String>,
    /// Operation the holder was performing (`operation=`).
    operation: Option<String>,
    /// Acquisition time in seconds since the Unix epoch (`started_at_unix=`).
    started_at_unix: Option<u64>,
}
28
29enum AcquireOutcome {
30    Acquired(LockStorage),
31    Waiting,
32}
33
34impl LockStorage {
35    pub fn acquire(paths: &UpstreamPaths, command: &Commands) -> Result<Self> {
36        let lock_path = paths.dirs.metadata_dir.join("lock");
37        let operation = command.to_string();
38        Self::acquire_at(&lock_path, &operation)
39    }
40
41    fn acquire_at(lock_path: &Path, operation: &str) -> Result<Self> {
42        let mut printed_wait_notice = false;
43
44        loop {
45            match Self::try_acquire_at_internal(lock_path, operation, true)? {
46                AcquireOutcome::Acquired(lock) => return Ok(lock),
47                AcquireOutcome::Waiting => {
48                    if !printed_wait_notice {
49                        eprintln!("Waiting for lock file...");
50                        printed_wait_notice = true;
51                    }
52                    thread::sleep(LOCK_POLL_INTERVAL);
53                }
54            }
55        }
56    }
57
58    fn try_acquire_at_internal(
59        lock_path: &Path,
60        operation: &str,
61        allow_recovery: bool,
62    ) -> Result<AcquireOutcome> {
63        let lock_parent = lock_path
64            .parent()
65            .ok_or_else(|| anyhow!("Invalid lock path '{}'", lock_path.display()))?;
66
67        fs::create_dir_all(lock_parent).with_context(|| {
68            format!(
69                "Failed to create lock directory '{}'",
70                lock_parent.display()
71            )
72        })?;
73
74        let mut file = match OpenOptions::new()
75            .write(true)
76            .create_new(true)
77            .open(lock_path)
78        {
79            Ok(file) => file,
80            Err(err) if err.kind() == ErrorKind::AlreadyExists => {
81                let lock_info = fs::read_to_string(lock_path)
82                    .unwrap_or_else(|_| "<lock details unavailable>".to_string());
83
84                if allow_recovery && Self::is_stale_lock(&lock_info) {
85                    match fs::remove_file(lock_path) {
86                        Ok(_) => {
87                            return Self::try_acquire_at_internal(lock_path, operation, false);
88                        }
89                        Err(remove_err) if remove_err.kind() == ErrorKind::NotFound => {
90                            return Self::try_acquire_at_internal(lock_path, operation, false);
91                        }
92                        Err(remove_err) => {
93                            return Err(remove_err).context(format!(
94                                "Lock at '{}' appears stale but could not be removed",
95                                lock_path.display()
96                            ));
97                        }
98                    }
99                }
100
101                return Ok(AcquireOutcome::Waiting);
102            }
103            Err(err) => {
104                return Err(err).with_context(|| {
105                    format!("Failed to create lock file '{}'", lock_path.display())
106                });
107            }
108        };
109
110        let since_epoch = SystemTime::now()
111            .duration_since(UNIX_EPOCH)
112            .map(|d| d.as_secs())
113            .unwrap_or(0);
114        writeln!(file, "pid={}", process::id()).ok();
115        if let Some(identity) = process_id::current_process_identity() {
116            writeln!(file, "pid_start_token={}", identity.start_token).ok();
117        }
118        writeln!(file, "operation={}", operation).ok();
119        writeln!(file, "started_at_unix={}", since_epoch).ok();
120
121        Ok(AcquireOutcome::Acquired(Self {
122            path: lock_path.to_path_buf(),
123        }))
124    }
125
126    fn parse_lock_metadata(lock_info: &str) -> LockMetadata {
127        let mut meta = LockMetadata::default();
128        for raw_line in lock_info.lines() {
129            let line = raw_line.trim();
130            if let Some(value) = line.strip_prefix("pid=") {
131                meta.pid = value.trim().parse::<u32>().ok();
132            } else if let Some(value) = line.strip_prefix("pid_start_token=") {
133                let token = value.trim();
134                if !token.is_empty() {
135                    meta.pid_start_token = Some(token.to_string());
136                }
137            } else if let Some(value) = line.strip_prefix("operation=") {
138                let op = value.trim();
139                if !op.is_empty() {
140                    meta.operation = Some(op.to_string());
141                }
142            } else if let Some(value) = line.strip_prefix("started_at_unix=") {
143                meta.started_at_unix = value.trim().parse::<u64>().ok();
144            }
145        }
146        meta
147    }
148
149    fn is_stale_lock(lock_info: &str) -> bool {
150        let meta = Self::parse_lock_metadata(lock_info);
151
152        if let Some(pid) = meta.pid {
153            let probe = process_id::probe_process(pid);
154            if !probe.exists {
155                return true;
156            }
157
158            if let (Some(expected), Some(actual)) = (
159                meta.pid_start_token.as_deref(),
160                probe.start_token.as_deref(),
161            ) {
162                return expected != actual;
163            }
164
165            return false;
166        }
167
168        // Missing or malformed pid values are treated as stale to prevent deadlocks
169        // caused by corrupted or manually-created lock files.
170        true
171    }
172}
173
174impl Drop for LockStorage {
175    fn drop(&mut self) {
176        let _ = fs::remove_file(&self.path);
177    }
178}
179
#[cfg(test)]
mod tests {
    use super::LockStorage;
    use crate::utils::platform::process_id;
    use std::{
        fs,
        path::{Path, PathBuf},
        thread,
        time::{Duration, SystemTime, UNIX_EPOCH},
    };

    /// Unique scratch directory for one test, laid out as `<root>/metadata/lock`.
    ///
    /// The directory is removed when this value is dropped, so it is cleaned up
    /// even when an assertion fails part-way through a test. Declare it before
    /// any lock guard so the guards are dropped (and release the file) first.
    struct TestLockDir {
        root: PathBuf,
        lock_path: PathBuf,
    }

    impl TestLockDir {
        /// Picks a fresh, not-yet-created location for the test called `name`.
        fn new(name: &str) -> Self {
            let nanos = SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .map(|d| d.as_nanos())
                .unwrap_or(0);
            let root = std::env::temp_dir().join(format!("upstream-lock-test-{name}-{nanos}"));
            let lock_path = root.join("metadata").join("lock");
            Self { root, lock_path }
        }

        fn lock_path(&self) -> &Path {
            &self.lock_path
        }

        /// Creates the lock directory and plants `contents` as an existing lock file.
        fn write_lock(&self, contents: &str) {
            let parent = self.lock_path.parent().expect("lock parent");
            fs::create_dir_all(parent).expect("create lock parent");
            fs::write(&self.lock_path, contents).expect("write lock file");
        }

        fn read_lock(&self) -> String {
            fs::read_to_string(&self.lock_path).expect("read lock")
        }
    }

    impl Drop for TestLockDir {
        fn drop(&mut self) {
            let _ = fs::remove_dir_all(&self.root);
        }
    }

    /// Acquires the lock as `new-op`, which requires recovering the planted lock,
    /// and checks that the file now records the new operation.
    fn assert_recovered_by_new_op(dir: &TestLockDir) {
        let _guard =
            LockStorage::acquire_at(dir.lock_path(), "new-op").expect("recover stale lock");
        assert!(dir.read_lock().contains("operation=new-op"));
    }

    /// Checks that one recovery-enabled attempt waits and leaves the planted lock
    /// (written with `operation=test`) untouched.
    fn assert_still_blocked(dir: &TestLockDir) {
        let outcome = LockStorage::try_acquire_at_internal(dir.lock_path(), "next-op", true)
            .expect("try acquire");
        assert!(matches!(outcome, super::AcquireOutcome::Waiting));
        assert!(dir.read_lock().contains("operation=test"));
    }

    #[test]
    fn lock_waits_for_concurrent_acquire_to_finish() {
        let dir = TestLockDir::new("concurrent");
        let guard = LockStorage::acquire_at(dir.lock_path(), "test").expect("first lock");

        let releaser = thread::spawn(move || {
            thread::sleep(Duration::from_millis(100));
            drop(guard);
        });

        let _guard = LockStorage::acquire_at(dir.lock_path(), "test").expect("lock after wait");
        releaser.join().expect("join releaser");
    }

    #[test]
    fn lock_releases_on_drop() {
        let dir = TestLockDir::new("release");
        {
            let _guard = LockStorage::acquire_at(dir.lock_path(), "test").expect("first lock");
        }
        assert!(!dir.lock_path().exists(), "dropping the guard should delete the lock file");

        let _guard2 = LockStorage::acquire_at(dir.lock_path(), "test").expect("lock after drop");
    }

    #[test]
    fn stale_lock_is_recovered_automatically() {
        let dir = TestLockDir::new("stale-recover");
        // Deliberately invalid/non-existent pid with old start time.
        dir.write_lock("pid=999999\noperation=test\nstarted_at_unix=1\n");
        assert_recovered_by_new_op(&dir);
    }

    #[test]
    fn parse_lock_metadata_extracts_known_fields() {
        let meta = LockStorage::parse_lock_metadata(
            "pid=123\npid_start_token=abc123\noperation=upgrade\nstarted_at_unix=456\nunknown=ignored\n",
        );
        assert_eq!(meta.pid, Some(123));
        assert_eq!(meta.pid_start_token.as_deref(), Some("abc123"));
        assert_eq!(meta.operation.as_deref(), Some("upgrade"));
        assert_eq!(meta.started_at_unix, Some(456));
    }

    #[test]
    fn active_lock_still_blocks_second_acquire() {
        let dir = TestLockDir::new("active-block");
        let current_pid = std::process::id();
        let now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map(|d| d.as_secs())
            .unwrap_or(0);
        dir.write_lock(&format!(
            "pid={current_pid}\noperation=test\nstarted_at_unix={now}\n"
        ));
        assert_still_blocked(&dir);
    }

    #[test]
    fn active_lock_is_not_recovered_even_if_timestamp_is_old() {
        let dir = TestLockDir::new("active-old");
        let current_pid = std::process::id();
        dir.write_lock(&format!("pid={current_pid}\noperation=test\nstarted_at_unix=1\n"));
        assert_still_blocked(&dir);
    }

    #[test]
    fn lock_without_pid_is_recovered_automatically() {
        let dir = TestLockDir::new("missing-pid");
        dir.write_lock("operation=test\nstarted_at_unix=1\n");
        assert_recovered_by_new_op(&dir);
    }

    #[test]
    fn lock_with_malformed_pid_is_recovered_automatically() {
        let dir = TestLockDir::new("malformed-pid");
        dir.write_lock("pid=abc\noperation=test\nstarted_at_unix=1\n");
        assert_recovered_by_new_op(&dir);
    }

    #[test]
    fn stale_lock_is_recovered_when_pid_start_token_mismatches() {
        let Some(identity) = process_id::current_process_identity() else {
            return;
        };

        let dir = TestLockDir::new("token-mismatch");
        let current_pid = std::process::id();
        dir.write_lock(&format!(
            "pid={current_pid}\npid_start_token={}-mismatch\noperation=test\nstarted_at_unix=1\n",
            identity.start_token
        ));
        assert_recovered_by_new_op(&dir);
    }
}