Skip to main content

upstream_rs/services/storage/
lock_storage.rs

1use anyhow::{Context, Result, anyhow};
2#[cfg(unix)]
3use std::process::Command;
4use std::{
5    fs::{self, OpenOptions},
6    io::{ErrorKind, Write},
7    path::{Path, PathBuf},
8    process, thread,
9    time::{Duration, SystemTime, UNIX_EPOCH},
10};
11
12use crate::application::cli::arguments::Commands;
13use crate::utils::static_paths::UpstreamPaths;
14
/// Guard representing ownership of the on-disk lock file.
///
/// A value exists only after this process has created the lock file at
/// `path`; dropping the value deletes that file again (see the `Drop` impl).
#[derive(Debug)]
pub struct LockStorage {
    /// Location of the lock file created by this guard.
    path: PathBuf,
}
19
/// Age beyond which a lock file that records no PID is treated as abandoned.
const STALE_LOCK_MAX_AGE: Duration = Duration::from_secs(5 * 60);
/// Delay between successive attempts while the lock is held elsewhere.
const LOCK_POLL_INTERVAL: Duration = Duration::from_millis(1_000);
22
/// Fields recovered from the `key=value` lines of a lock file.
///
/// Every field is optional: a missing, empty or unparsable line leaves the
/// corresponding field as `None`.
#[derive(Default, Debug)]
struct LockMetadata {
    /// Value of the `pid=` line, when it parses as a `u32`.
    pid: Option<u32>,
    /// Trimmed, non-empty value of the `operation=` line.
    operation: Option<String>,
    /// Value of the `started_at_unix=` line (seconds since the Unix epoch).
    started_at_unix: Option<u64>,
}
29
/// Result of a single, non-blocking attempt to take the lock.
enum AcquireOutcome {
    /// The lock file was created; the guard owns it.
    Acquired(LockStorage),
    /// A lock file already exists and was not recovered; the caller should retry.
    Waiting,
}
34
35impl LockStorage {
36    pub fn acquire(paths: &UpstreamPaths, command: &Commands) -> Result<Self> {
37        let lock_path = paths.dirs.metadata_dir.join("lock");
38        let operation = command.to_string();
39        Self::acquire_at(&lock_path, &operation)
40    }
41
42    fn acquire_at(lock_path: &Path, operation: &str) -> Result<Self> {
43        let mut printed_wait_notice = false;
44
45        loop {
46            match Self::try_acquire_at_internal(lock_path, operation, true)? {
47                AcquireOutcome::Acquired(lock) => return Ok(lock),
48                AcquireOutcome::Waiting => {
49                    if !printed_wait_notice {
50                        eprintln!("Waiting for lock file...");
51                        printed_wait_notice = true;
52                    }
53                    thread::sleep(LOCK_POLL_INTERVAL);
54                }
55            }
56        }
57    }
58
59    fn try_acquire_at_internal(
60        lock_path: &Path,
61        operation: &str,
62        allow_recovery: bool,
63    ) -> Result<AcquireOutcome> {
64        let lock_parent = lock_path
65            .parent()
66            .ok_or_else(|| anyhow!("Invalid lock path '{}'", lock_path.display()))?;
67
68        fs::create_dir_all(lock_parent).with_context(|| {
69            format!(
70                "Failed to create lock directory '{}'",
71                lock_parent.display()
72            )
73        })?;
74
75        let mut file = match OpenOptions::new()
76            .write(true)
77            .create_new(true)
78            .open(lock_path)
79        {
80            Ok(file) => file,
81            Err(err) if err.kind() == ErrorKind::AlreadyExists => {
82                let lock_info = fs::read_to_string(lock_path)
83                    .unwrap_or_else(|_| "<lock details unavailable>".to_string());
84
85                if allow_recovery && Self::is_stale_lock(&lock_info) {
86                    match fs::remove_file(lock_path) {
87                        Ok(_) => {
88                            return Self::try_acquire_at_internal(lock_path, operation, false);
89                        }
90                        Err(remove_err) if remove_err.kind() == ErrorKind::NotFound => {
91                            return Self::try_acquire_at_internal(lock_path, operation, false);
92                        }
93                        Err(remove_err) => {
94                            return Err(remove_err).context(format!(
95                                "Lock at '{}' appears stale but could not be removed",
96                                lock_path.display()
97                            ));
98                        }
99                    }
100                }
101
102                return Ok(AcquireOutcome::Waiting);
103            }
104            Err(err) => {
105                return Err(err).with_context(|| {
106                    format!("Failed to create lock file '{}'", lock_path.display())
107                });
108            }
109        };
110
111        let since_epoch = SystemTime::now()
112            .duration_since(UNIX_EPOCH)
113            .map(|d| d.as_secs())
114            .unwrap_or(0);
115        writeln!(file, "pid={}", process::id()).ok();
116        writeln!(file, "operation={}", operation).ok();
117        writeln!(file, "started_at_unix={}", since_epoch).ok();
118
119        Ok(AcquireOutcome::Acquired(Self {
120            path: lock_path.to_path_buf(),
121        }))
122    }
123
124    fn parse_lock_metadata(lock_info: &str) -> LockMetadata {
125        let mut meta = LockMetadata::default();
126        for raw_line in lock_info.lines() {
127            let line = raw_line.trim();
128            if let Some(value) = line.strip_prefix("pid=") {
129                meta.pid = value.trim().parse::<u32>().ok();
130            } else if let Some(value) = line.strip_prefix("operation=") {
131                let op = value.trim();
132                if !op.is_empty() {
133                    meta.operation = Some(op.to_string());
134                }
135            } else if let Some(value) = line.strip_prefix("started_at_unix=") {
136                meta.started_at_unix = value.trim().parse::<u64>().ok();
137            }
138        }
139        meta
140    }
141
142    fn is_stale_lock(lock_info: &str) -> bool {
143        let meta = Self::parse_lock_metadata(lock_info);
144
145        if let Some(pid) = meta.pid {
146            // If the PID is still alive, never steal the lock based on age alone.
147            return !Self::process_exists(pid);
148        }
149
150        if let Some(started_at) = meta.started_at_unix {
151            let now = SystemTime::now()
152                .duration_since(UNIX_EPOCH)
153                .map(|d| d.as_secs())
154                .unwrap_or(started_at);
155            if Duration::from_secs(now.saturating_sub(started_at)) > STALE_LOCK_MAX_AGE {
156                return true;
157            }
158        }
159
160        false
161    }
162
163    fn process_exists(pid: u32) -> bool {
164        if pid == 0 {
165            return false;
166        }
167
168        #[cfg(unix)]
169        {
170            // Linux: /proc is cheap and reliable.
171            if Path::new("/proc").exists() {
172                return Path::new("/proc").join(pid.to_string()).exists();
173            }
174
175            // macOS/BSD fallback: `kill -0 <pid>` checks whether the process exists.
176            // If the probe command is unavailable, avoid false stale detection.
177            Command::new("kill")
178                .arg("-0")
179                .arg(pid.to_string())
180                .status()
181                .map(|status| status.success())
182                .unwrap_or(true)
183        }
184
185        #[cfg(not(unix))]
186        {
187            let _ = pid;
188            true
189        }
190    }
191}
192
193impl Drop for LockStorage {
194    fn drop(&mut self) {
195        let _ = fs::remove_file(&self.path);
196    }
197}
198
199#[cfg(test)]
200mod tests {
201    use super::LockStorage;
202    use std::{
203        fs,
204        path::PathBuf,
205        thread,
206        time::{Duration, SystemTime, UNIX_EPOCH},
207    };
208
209    fn unique_lock_path(name: &str) -> PathBuf {
210        let nanos = SystemTime::now()
211            .duration_since(UNIX_EPOCH)
212            .map(|d| d.as_nanos())
213            .unwrap_or(0);
214        std::env::temp_dir()
215            .join(format!("upstream-lock-test-{name}-{nanos}"))
216            .join("metadata")
217            .join("lock")
218    }
219
220    #[test]
221    fn lock_waits_for_concurrent_acquire_to_finish() {
222        let lock_path = unique_lock_path("concurrent");
223        let guard = LockStorage::acquire_at(&lock_path, "test").expect("first lock");
224        let release_path = lock_path.clone();
225
226        let releaser = thread::spawn(move || {
227            thread::sleep(Duration::from_millis(100));
228            drop(guard);
229        });
230
231        let _guard = LockStorage::acquire_at(&lock_path, "test").expect("lock after wait");
232        releaser.join().expect("join releaser");
233
234        let _ = fs::remove_dir_all(release_path.parent().unwrap().parent().unwrap());
235    }
236
237    #[test]
238    fn lock_releases_on_drop() {
239        let lock_path = unique_lock_path("release");
240        {
241            let _guard = LockStorage::acquire_at(&lock_path, "test").expect("first lock");
242        }
243
244        let _guard2 = LockStorage::acquire_at(&lock_path, "test").expect("lock after drop");
245
246        let _ = fs::remove_dir_all(lock_path.parent().unwrap().parent().unwrap());
247    }
248
249    #[test]
250    fn stale_lock_is_recovered_automatically() {
251        let lock_path = unique_lock_path("stale-recover");
252        fs::create_dir_all(lock_path.parent().expect("lock parent")).expect("create lock parent");
253        // Deliberately invalid/non-existent pid with old start time.
254        fs::write(
255            &lock_path,
256            "pid=999999\noperation=test\nstarted_at_unix=1\n",
257        )
258        .expect("write stale lock");
259
260        let _guard = LockStorage::acquire_at(&lock_path, "new-op").expect("recover stale lock");
261        let contents = fs::read_to_string(&lock_path).expect("read lock");
262        assert!(contents.contains("operation=new-op"));
263
264        let _ = fs::remove_dir_all(lock_path.parent().unwrap().parent().unwrap());
265    }
266
267    #[test]
268    fn parse_lock_metadata_extracts_known_fields() {
269        let meta = LockStorage::parse_lock_metadata(
270            "pid=123\noperation=upgrade\nstarted_at_unix=456\nunknown=ignored\n",
271        );
272        assert_eq!(meta.pid, Some(123));
273        assert_eq!(meta.operation.as_deref(), Some("upgrade"));
274        assert_eq!(meta.started_at_unix, Some(456));
275    }
276
277    #[test]
278    fn active_lock_still_blocks_second_acquire() {
279        let lock_path = unique_lock_path("active-block");
280        fs::create_dir_all(lock_path.parent().expect("lock parent")).expect("create lock parent");
281        let current_pid = std::process::id();
282        let now = SystemTime::now()
283            .duration_since(UNIX_EPOCH)
284            .map(|d| d.as_secs())
285            .unwrap_or(0);
286        fs::write(
287            &lock_path,
288            format!("pid={current_pid}\noperation=test\nstarted_at_unix={now}\n"),
289        )
290        .expect("write active lock");
291
292        let outcome =
293            LockStorage::try_acquire_at_internal(&lock_path, "next-op", true).expect("try acquire");
294        assert!(matches!(outcome, super::AcquireOutcome::Waiting));
295
296        let _ = fs::remove_dir_all(lock_path.parent().unwrap().parent().unwrap());
297    }
298
299    #[test]
300    fn active_lock_is_not_recovered_even_if_timestamp_is_old() {
301        let lock_path = unique_lock_path("active-old");
302        fs::create_dir_all(lock_path.parent().expect("lock parent")).expect("create lock parent");
303        let current_pid = std::process::id();
304        fs::write(
305            &lock_path,
306            format!("pid={current_pid}\noperation=test\nstarted_at_unix=1\n"),
307        )
308        .expect("write active lock with old timestamp");
309
310        let outcome =
311            LockStorage::try_acquire_at_internal(&lock_path, "next-op", true).expect("try acquire");
312        assert!(matches!(outcome, super::AcquireOutcome::Waiting));
313
314        let _ = fs::remove_dir_all(lock_path.parent().unwrap().parent().unwrap());
315    }
316}