Skip to main content

upstream_rs/services/storage/
lock_storage.rs

1use anyhow::{Context, Result, anyhow};
2use std::{
3    fs::{self, OpenOptions},
4    io::{ErrorKind, Write},
5    path::{Path, PathBuf},
6    process, thread,
7    time::{Duration, SystemTime, UNIX_EPOCH},
8};
9
10use crate::utils::platform::process_id;
11use crate::utils::static_paths::UpstreamPaths;
12
/// Guard for an exclusive, file-based lock.
///
/// The guard only remembers where its lock file lives; dropping the guard
/// deletes that file, which is what releases the lock.
#[derive(Debug)]
pub struct LockStorage {
    /// Location of the lock file owned by this guard.
    path: PathBuf,
}
17
/// Pause between two consecutive attempts to take a lock that is currently held.
const LOCK_POLL_INTERVAL: Duration = Duration::from_millis(1_000);
19
/// Fields recovered from the `key=value` lines of a lock file.
///
/// Every field is optional: a key that is absent, or whose value cannot be
/// used, is simply left as `None`.
#[derive(Debug, Default)]
struct LockMetadata {
    /// Value of the `pid=` line, when it parses as a `u32`.
    pid: Option<u32>,
    /// Value of the `pid_start_token=` line, when non-blank.
    pid_start_token: Option<String>,
    /// Value of the `operation=` line, when non-blank.
    operation: Option<String>,
    /// Value of the `started_at_unix=` line, when it parses as a `u64`.
    started_at_unix: Option<u64>,
}
27
28enum AcquireOutcome {
29    Acquired(LockStorage),
30    Waiting,
31}
32
33impl LockStorage {
34    pub fn acquire(paths: &UpstreamPaths, operation: &str) -> Result<Self> {
35        let lock_path = paths.dirs.metadata_dir.join("lock");
36        Self::acquire_at(&lock_path, operation)
37    }
38
39    fn acquire_at(lock_path: &Path, operation: &str) -> Result<Self> {
40        let mut printed_wait_notice = false;
41
42        loop {
43            match Self::try_acquire_at_internal(lock_path, operation, true)? {
44                AcquireOutcome::Acquired(lock) => return Ok(lock),
45                AcquireOutcome::Waiting => {
46                    if !printed_wait_notice {
47                        eprintln!("Waiting for lock file...");
48                        printed_wait_notice = true;
49                    }
50                    thread::sleep(LOCK_POLL_INTERVAL);
51                }
52            }
53        }
54    }
55
56    fn try_acquire_at_internal(
57        lock_path: &Path,
58        operation: &str,
59        allow_recovery: bool,
60    ) -> Result<AcquireOutcome> {
61        let lock_parent = lock_path
62            .parent()
63            .ok_or_else(|| anyhow!("Invalid lock path '{}'", lock_path.display()))?;
64
65        fs::create_dir_all(lock_parent).with_context(|| {
66            format!(
67                "Failed to create lock directory '{}'",
68                lock_parent.display()
69            )
70        })?;
71
72        let mut file = match OpenOptions::new()
73            .write(true)
74            .create_new(true)
75            .open(lock_path)
76        {
77            Ok(file) => file,
78            Err(err) if err.kind() == ErrorKind::AlreadyExists => {
79                let lock_info = fs::read_to_string(lock_path)
80                    .unwrap_or_else(|_| "<lock details unavailable>".to_string());
81
82                if allow_recovery && Self::is_stale_lock(&lock_info) {
83                    match fs::remove_file(lock_path) {
84                        Ok(_) => {
85                            return Self::try_acquire_at_internal(lock_path, operation, false);
86                        }
87                        Err(remove_err) if remove_err.kind() == ErrorKind::NotFound => {
88                            return Self::try_acquire_at_internal(lock_path, operation, false);
89                        }
90                        Err(remove_err) => {
91                            return Err(remove_err).context(format!(
92                                "Lock at '{}' appears stale but could not be removed",
93                                lock_path.display()
94                            ));
95                        }
96                    }
97                }
98
99                return Ok(AcquireOutcome::Waiting);
100            }
101            Err(err) => {
102                return Err(err).with_context(|| {
103                    format!("Failed to create lock file '{}'", lock_path.display())
104                });
105            }
106        };
107
108        let since_epoch = SystemTime::now()
109            .duration_since(UNIX_EPOCH)
110            .map(|d| d.as_secs())
111            .unwrap_or(0);
112        writeln!(file, "pid={}", process::id()).ok();
113        if let Some(identity) = process_id::current_process_identity() {
114            writeln!(file, "pid_start_token={}", identity.start_token).ok();
115        }
116        writeln!(file, "operation={}", operation).ok();
117        writeln!(file, "started_at_unix={}", since_epoch).ok();
118
119        Ok(AcquireOutcome::Acquired(Self {
120            path: lock_path.to_path_buf(),
121        }))
122    }
123
124    fn parse_lock_metadata(lock_info: &str) -> LockMetadata {
125        let mut meta = LockMetadata::default();
126        for raw_line in lock_info.lines() {
127            let line = raw_line.trim();
128            if let Some(value) = line.strip_prefix("pid=") {
129                meta.pid = value.trim().parse::<u32>().ok();
130            } else if let Some(value) = line.strip_prefix("pid_start_token=") {
131                let token = value.trim();
132                if !token.is_empty() {
133                    meta.pid_start_token = Some(token.to_string());
134                }
135            } else if let Some(value) = line.strip_prefix("operation=") {
136                let op = value.trim();
137                if !op.is_empty() {
138                    meta.operation = Some(op.to_string());
139                }
140            } else if let Some(value) = line.strip_prefix("started_at_unix=") {
141                meta.started_at_unix = value.trim().parse::<u64>().ok();
142            }
143        }
144        meta
145    }
146
147    fn is_stale_lock(lock_info: &str) -> bool {
148        let meta = Self::parse_lock_metadata(lock_info);
149
150        if let Some(pid) = meta.pid {
151            let probe = process_id::probe_process(pid);
152            if !probe.exists {
153                return true;
154            }
155
156            if let (Some(expected), Some(actual)) = (
157                meta.pid_start_token.as_deref(),
158                probe.start_token.as_deref(),
159            ) {
160                return expected != actual;
161            }
162
163            return false;
164        }
165
166        // Missing or malformed pid values are treated as stale to prevent deadlocks
167        // caused by corrupted or manually-created lock files.
168        true
169    }
170}
171
172impl Drop for LockStorage {
173    fn drop(&mut self) {
174        let _ = fs::remove_file(&self.path);
175    }
176}
177
#[cfg(test)]
mod tests {
    use super::{AcquireOutcome, LockStorage};
    use crate::utils::platform::process_id;
    use std::{
        fs,
        path::{Path, PathBuf},
        thread,
        time::{Duration, SystemTime, UNIX_EPOCH},
    };

    /// Pid used to simulate a dead lock owner (`i32::MAX`).
    ///
    /// It lies far above the pid ranges operating systems hand out (Linux caps
    /// `pid_max` at 2^22), so it cannot collide with a live process the way a
    /// small "unlikely" pid can, and it stays positive if a probe converts it
    /// to a signed pid type.
    const DEAD_PID: u32 = 2_147_483_647;

    /// Builds a lock path inside a temp directory unique to this test run.
    fn unique_lock_path(name: &str) -> PathBuf {
        let nanos = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map(|d| d.as_nanos())
            .unwrap_or(0);
        std::env::temp_dir()
            .join(format!("upstream-lock-test-{name}-{nanos}"))
            .join("metadata")
            .join("lock")
    }

    /// Creates the lock's parent directory and plants a pre-existing lock file.
    fn write_lock_file(lock_path: &Path, contents: &str) {
        fs::create_dir_all(lock_path.parent().expect("lock parent")).expect("create lock parent");
        fs::write(lock_path, contents).expect("write lock file");
    }

    /// Best-effort removal of the per-test directory (two levels above the lock file).
    fn cleanup(lock_path: &Path) {
        if let Some(test_root) = lock_path.parent().and_then(Path::parent) {
            let _ = fs::remove_dir_all(test_root);
        }
    }

    /// Asserts that the planted lock is taken over and rewritten by a new acquire.
    fn assert_recovered(lock_path: &Path) {
        let _guard = LockStorage::acquire_at(lock_path, "new-op").expect("recover stale lock");
        let contents = fs::read_to_string(lock_path).expect("read lock");
        assert!(contents.contains("operation=new-op"));
    }

    /// Asserts that a single acquisition attempt leaves the planted lock alone.
    fn assert_blocked(lock_path: &Path) {
        let outcome =
            LockStorage::try_acquire_at_internal(lock_path, "next-op", true).expect("try acquire");
        assert!(matches!(outcome, AcquireOutcome::Waiting));
    }

    #[test]
    fn lock_waits_for_concurrent_acquire_to_finish() {
        let lock_path = unique_lock_path("concurrent");
        let guard = LockStorage::acquire_at(&lock_path, "test").expect("first lock");

        let releaser = thread::spawn(move || {
            thread::sleep(Duration::from_millis(100));
            drop(guard);
        });

        let second = LockStorage::acquire_at(&lock_path, "test").expect("lock after wait");
        releaser.join().expect("join releaser");

        // Release before deleting the directory the lock file lives in.
        drop(second);
        cleanup(&lock_path);
    }

    #[test]
    fn lock_releases_on_drop() {
        let lock_path = unique_lock_path("release");
        drop(LockStorage::acquire_at(&lock_path, "test").expect("first lock"));

        let second = LockStorage::acquire_at(&lock_path, "test").expect("lock after drop");

        drop(second);
        cleanup(&lock_path);
    }

    #[test]
    fn stale_lock_is_recovered_automatically() {
        let lock_path = unique_lock_path("stale-recover");
        // Deliberately non-existent pid with old start time.
        write_lock_file(
            &lock_path,
            &format!("pid={DEAD_PID}\noperation=test\nstarted_at_unix=1\n"),
        );

        assert_recovered(&lock_path);
        cleanup(&lock_path);
    }

    #[test]
    fn parse_lock_metadata_extracts_known_fields() {
        let meta = LockStorage::parse_lock_metadata(
            "pid=123\npid_start_token=abc123\noperation=upgrade\nstarted_at_unix=456\nunknown=ignored\n",
        );
        assert_eq!(meta.pid, Some(123));
        assert_eq!(meta.pid_start_token.as_deref(), Some("abc123"));
        assert_eq!(meta.operation.as_deref(), Some("upgrade"));
        assert_eq!(meta.started_at_unix, Some(456));
    }

    #[test]
    fn active_lock_still_blocks_second_acquire() {
        let lock_path = unique_lock_path("active-block");
        let current_pid = std::process::id();
        let now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map(|d| d.as_secs())
            .unwrap_or(0);
        write_lock_file(
            &lock_path,
            &format!("pid={current_pid}\noperation=test\nstarted_at_unix={now}\n"),
        );

        assert_blocked(&lock_path);
        cleanup(&lock_path);
    }

    #[test]
    fn active_lock_is_not_recovered_even_if_timestamp_is_old() {
        let lock_path = unique_lock_path("active-old");
        let current_pid = std::process::id();
        write_lock_file(
            &lock_path,
            &format!("pid={current_pid}\noperation=test\nstarted_at_unix=1\n"),
        );

        assert_blocked(&lock_path);
        cleanup(&lock_path);
    }

    #[test]
    fn lock_without_pid_is_recovered_automatically() {
        let lock_path = unique_lock_path("missing-pid");
        write_lock_file(&lock_path, "operation=test\nstarted_at_unix=1\n");

        assert_recovered(&lock_path);
        cleanup(&lock_path);
    }

    #[test]
    fn lock_with_malformed_pid_is_recovered_automatically() {
        let lock_path = unique_lock_path("malformed-pid");
        write_lock_file(&lock_path, "pid=abc\noperation=test\nstarted_at_unix=1\n");

        assert_recovered(&lock_path);
        cleanup(&lock_path);
    }

    #[test]
    fn stale_lock_is_recovered_when_pid_start_token_mismatches() {
        // Without a process identity there is no start token to mismatch against.
        let Some(identity) = process_id::current_process_identity() else {
            return;
        };

        let lock_path = unique_lock_path("token-mismatch");
        let current_pid = std::process::id();
        write_lock_file(
            &lock_path,
            &format!(
                "pid={current_pid}\npid_start_token={}-mismatch\noperation=test\nstarted_at_unix=1\n",
                identity.start_token
            ),
        );

        assert_recovered(&lock_path);
        cleanup(&lock_path);
    }
}