upstream_rs/services/storage/
lock_storage.rs1use anyhow::{Context, Result, anyhow};
2#[cfg(unix)]
3use std::process::Command;
4use std::{
5 fs::{self, OpenOptions},
6 io::{ErrorKind, Write},
7 path::{Path, PathBuf},
8 process, thread,
9 time::{Duration, SystemTime, UNIX_EPOCH},
10};
11
12use crate::application::cli::arguments::Commands;
13use crate::utils::static_paths::UpstreamPaths;
14
/// Guard for a held lock file.
///
/// A value of this type is only produced after the lock file at `path` has
/// been created exclusively; dropping it removes that file again.
#[derive(Debug)]
pub struct LockStorage {
    /// Location of the lock file owned by this guard.
    path: PathBuf,
}
19
/// Age after which a lock is treated as stale. Only consulted when the lock
/// file records a start timestamp but no parseable PID; a recorded PID takes
/// precedence over the timestamp.
const STALE_LOCK_MAX_AGE: Duration = Duration::from_mins(5);
/// Delay between two acquisition attempts while the lock is held elsewhere.
const LOCK_POLL_INTERVAL: Duration = Duration::from_secs(1);
22
/// Fields parsed from the `key=value` lines of a lock file.
///
/// Every field is `None` when its line is missing or cannot be parsed.
#[derive(Default, Debug)]
struct LockMetadata {
    /// Value of the `pid=` line: process ID recorded by the lock holder.
    pid: Option<u32>,
    /// Value of the `operation=` line; `None` when the value is empty.
    operation: Option<String>,
    /// Value of the `started_at_unix=` line: seconds since the Unix epoch at
    /// which the lock file was written.
    started_at_unix: Option<u64>,
}
29
/// Result of a single, non-blocking attempt to take the lock.
enum AcquireOutcome {
    /// The lock file was created; the contained guard owns it.
    Acquired(LockStorage),
    /// The lock file already exists and was not recovered; the caller should
    /// retry later.
    Waiting,
}
34
35impl LockStorage {
36 pub fn acquire(paths: &UpstreamPaths, command: &Commands) -> Result<Self> {
37 let lock_path = paths.dirs.metadata_dir.join("lock");
38 let operation = command.to_string();
39 Self::acquire_at(&lock_path, &operation)
40 }
41
42 fn acquire_at(lock_path: &Path, operation: &str) -> Result<Self> {
43 let mut printed_wait_notice = false;
44
45 loop {
46 match Self::try_acquire_at_internal(lock_path, operation, true)? {
47 AcquireOutcome::Acquired(lock) => return Ok(lock),
48 AcquireOutcome::Waiting => {
49 if !printed_wait_notice {
50 eprintln!("Waiting for lock file...");
51 printed_wait_notice = true;
52 }
53 thread::sleep(LOCK_POLL_INTERVAL);
54 }
55 }
56 }
57 }
58
59 fn try_acquire_at_internal(
60 lock_path: &Path,
61 operation: &str,
62 allow_recovery: bool,
63 ) -> Result<AcquireOutcome> {
64 let lock_parent = lock_path
65 .parent()
66 .ok_or_else(|| anyhow!("Invalid lock path '{}'", lock_path.display()))?;
67
68 fs::create_dir_all(lock_parent).with_context(|| {
69 format!(
70 "Failed to create lock directory '{}'",
71 lock_parent.display()
72 )
73 })?;
74
75 let mut file = match OpenOptions::new()
76 .write(true)
77 .create_new(true)
78 .open(lock_path)
79 {
80 Ok(file) => file,
81 Err(err) if err.kind() == ErrorKind::AlreadyExists => {
82 let lock_info = fs::read_to_string(lock_path)
83 .unwrap_or_else(|_| "<lock details unavailable>".to_string());
84
85 if allow_recovery && Self::is_stale_lock(&lock_info) {
86 match fs::remove_file(lock_path) {
87 Ok(_) => {
88 return Self::try_acquire_at_internal(lock_path, operation, false);
89 }
90 Err(remove_err) if remove_err.kind() == ErrorKind::NotFound => {
91 return Self::try_acquire_at_internal(lock_path, operation, false);
92 }
93 Err(remove_err) => {
94 return Err(remove_err).context(format!(
95 "Lock at '{}' appears stale but could not be removed",
96 lock_path.display()
97 ));
98 }
99 }
100 }
101
102 return Ok(AcquireOutcome::Waiting);
103 }
104 Err(err) => {
105 return Err(err).with_context(|| {
106 format!("Failed to create lock file '{}'", lock_path.display())
107 });
108 }
109 };
110
111 let since_epoch = SystemTime::now()
112 .duration_since(UNIX_EPOCH)
113 .map(|d| d.as_secs())
114 .unwrap_or(0);
115 writeln!(file, "pid={}", process::id()).ok();
116 writeln!(file, "operation={}", operation).ok();
117 writeln!(file, "started_at_unix={}", since_epoch).ok();
118
119 Ok(AcquireOutcome::Acquired(Self {
120 path: lock_path.to_path_buf(),
121 }))
122 }
123
124 fn parse_lock_metadata(lock_info: &str) -> LockMetadata {
125 let mut meta = LockMetadata::default();
126 for raw_line in lock_info.lines() {
127 let line = raw_line.trim();
128 if let Some(value) = line.strip_prefix("pid=") {
129 meta.pid = value.trim().parse::<u32>().ok();
130 } else if let Some(value) = line.strip_prefix("operation=") {
131 let op = value.trim();
132 if !op.is_empty() {
133 meta.operation = Some(op.to_string());
134 }
135 } else if let Some(value) = line.strip_prefix("started_at_unix=") {
136 meta.started_at_unix = value.trim().parse::<u64>().ok();
137 }
138 }
139 meta
140 }
141
142 fn is_stale_lock(lock_info: &str) -> bool {
143 let meta = Self::parse_lock_metadata(lock_info);
144
145 if let Some(pid) = meta.pid {
146 return !Self::process_exists(pid);
148 }
149
150 if let Some(started_at) = meta.started_at_unix {
151 let now = SystemTime::now()
152 .duration_since(UNIX_EPOCH)
153 .map(|d| d.as_secs())
154 .unwrap_or(started_at);
155 if Duration::from_secs(now.saturating_sub(started_at)) > STALE_LOCK_MAX_AGE {
156 return true;
157 }
158 }
159
160 false
161 }
162
163 fn process_exists(pid: u32) -> bool {
164 if pid == 0 {
165 return false;
166 }
167
168 #[cfg(unix)]
169 {
170 if Path::new("/proc").exists() {
172 return Path::new("/proc").join(pid.to_string()).exists();
173 }
174
175 Command::new("kill")
178 .arg("-0")
179 .arg(pid.to_string())
180 .status()
181 .map(|status| status.success())
182 .unwrap_or(true)
183 }
184
185 #[cfg(not(unix))]
186 {
187 let _ = pid;
188 true
189 }
190 }
191}
192
193impl Drop for LockStorage {
194 fn drop(&mut self) {
195 let _ = fs::remove_file(&self.path);
196 }
197}
198
#[cfg(test)]
mod tests {
    use super::{AcquireOutcome, LockStorage};
    use std::{
        fs,
        path::{Path, PathBuf},
        thread,
        time::{Duration, SystemTime, UNIX_EPOCH},
    };

    /// A PID that cannot belong to a running process: it equals `i32::MAX`,
    /// well above the PID limits of mainstream Unix kernels (Linux caps PIDs
    /// at 4_194_304). A "plausible" PID such as 999999 may be in use on a busy
    /// machine, which would make a stale-lock test wait forever.
    #[cfg(unix)]
    const DEAD_PID: u32 = 2_147_483_647;

    /// Builds a lock path inside a fresh, uniquely named temp directory:
    /// `<tmp>/upstream-lock-test-<name>-<nanos>/metadata/lock`.
    fn unique_lock_path(name: &str) -> PathBuf {
        let nanos = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map(|d| d.as_nanos())
            .unwrap_or(0);
        std::env::temp_dir()
            .join(format!("upstream-lock-test-{name}-{nanos}"))
            .join("metadata")
            .join("lock")
    }

    /// Best-effort removal of the per-test root directory created for
    /// `lock_path` (two levels above the lock file).
    fn remove_test_root(lock_path: &Path) {
        if let Some(root) = lock_path.parent().and_then(Path::parent) {
            let _ = fs::remove_dir_all(root);
        }
    }

    /// Creates the parent directory and writes `contents` as a pre-existing
    /// lock file.
    fn write_existing_lock(lock_path: &Path, contents: &str) {
        fs::create_dir_all(lock_path.parent().expect("lock parent")).expect("create lock parent");
        fs::write(lock_path, contents).expect("write lock file");
    }

    #[test]
    fn lock_waits_for_concurrent_acquire_to_finish() {
        let lock_path = unique_lock_path("concurrent");
        let guard = LockStorage::acquire_at(&lock_path, "test").expect("first lock");

        // Release the first lock from another thread shortly after the second
        // acquisition has started waiting.
        let releaser = thread::spawn(move || {
            thread::sleep(Duration::from_millis(100));
            drop(guard);
        });

        let second = LockStorage::acquire_at(&lock_path, "test").expect("lock after wait");
        releaser.join().expect("join releaser");

        // Release before cleaning up so the guard's drop still finds its file.
        drop(second);
        remove_test_root(&lock_path);
    }

    #[test]
    fn lock_releases_on_drop() {
        let lock_path = unique_lock_path("release");
        {
            let _guard = LockStorage::acquire_at(&lock_path, "test").expect("first lock");
        }

        let second = LockStorage::acquire_at(&lock_path, "test").expect("lock after drop");

        drop(second);
        remove_test_root(&lock_path);
    }

    // Liveness can only be disproved on Unix; elsewhere every recorded PID is
    // treated as alive, so this scenario would block forever.
    #[cfg(unix)]
    #[test]
    fn stale_lock_is_recovered_automatically() {
        let lock_path = unique_lock_path("stale-recover");
        write_existing_lock(
            &lock_path,
            &format!("pid={DEAD_PID}\noperation=test\nstarted_at_unix=1\n"),
        );

        let guard = LockStorage::acquire_at(&lock_path, "new-op").expect("recover stale lock");
        let contents = fs::read_to_string(&lock_path).expect("read lock");
        assert!(contents.contains("operation=new-op"));

        drop(guard);
        remove_test_root(&lock_path);
    }

    #[test]
    fn parse_lock_metadata_extracts_known_fields() {
        let meta = LockStorage::parse_lock_metadata(
            "pid=123\noperation=upgrade\nstarted_at_unix=456\nunknown=ignored\n",
        );
        assert_eq!(meta.pid, Some(123));
        assert_eq!(meta.operation.as_deref(), Some("upgrade"));
        assert_eq!(meta.started_at_unix, Some(456));
    }

    #[test]
    fn active_lock_still_blocks_second_acquire() {
        let lock_path = unique_lock_path("active-block");
        let current_pid = std::process::id();
        let now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map(|d| d.as_secs())
            .unwrap_or(0);
        write_existing_lock(
            &lock_path,
            &format!("pid={current_pid}\noperation=test\nstarted_at_unix={now}\n"),
        );

        let outcome =
            LockStorage::try_acquire_at_internal(&lock_path, "next-op", true).expect("try acquire");
        assert!(matches!(outcome, AcquireOutcome::Waiting));

        remove_test_root(&lock_path);
    }

    #[test]
    fn active_lock_is_not_recovered_even_if_timestamp_is_old() {
        let lock_path = unique_lock_path("active-old");
        let current_pid = std::process::id();
        // The holder (this test process) is alive, so the ancient timestamp
        // must not cause the lock to be treated as stale.
        write_existing_lock(
            &lock_path,
            &format!("pid={current_pid}\noperation=test\nstarted_at_unix=1\n"),
        );

        let outcome =
            LockStorage::try_acquire_at_internal(&lock_path, "next-op", true).expect("try acquire");
        assert!(matches!(outcome, AcquireOutcome::Waiting));

        remove_test_root(&lock_path);
    }
}