upstream_rs/services/storage/
lock_storage.rs1use anyhow::{Context, Result, anyhow};
2use std::{
3 fs::{self, OpenOptions},
4 io::{ErrorKind, Write},
5 path::{Path, PathBuf},
6 process, thread,
7 time::{Duration, SystemTime, UNIX_EPOCH},
8};
9
10use crate::application::cli::arguments::Commands;
11use crate::utils::platform::process_id;
12use crate::utils::static_paths::UpstreamPaths;
13
14#[derive(Debug)]
15pub struct LockStorage {
16 path: PathBuf,
17}
18
19const LOCK_POLL_INTERVAL: Duration = Duration::from_secs(1);
20
21#[derive(Default, Debug)]
22struct LockMetadata {
23 pid: Option<u32>,
24 pid_start_token: Option<String>,
25 operation: Option<String>,
26 started_at_unix: Option<u64>,
27}
28
29enum AcquireOutcome {
30 Acquired(LockStorage),
31 Waiting,
32}
33
34impl LockStorage {
35 pub fn acquire(paths: &UpstreamPaths, command: &Commands) -> Result<Self> {
36 let lock_path = paths.dirs.metadata_dir.join("lock");
37 let operation = command.to_string();
38 Self::acquire_at(&lock_path, &operation)
39 }
40
41 fn acquire_at(lock_path: &Path, operation: &str) -> Result<Self> {
42 let mut printed_wait_notice = false;
43
44 loop {
45 match Self::try_acquire_at_internal(lock_path, operation, true)? {
46 AcquireOutcome::Acquired(lock) => return Ok(lock),
47 AcquireOutcome::Waiting => {
48 if !printed_wait_notice {
49 eprintln!("Waiting for lock file...");
50 printed_wait_notice = true;
51 }
52 thread::sleep(LOCK_POLL_INTERVAL);
53 }
54 }
55 }
56 }
57
58 fn try_acquire_at_internal(
59 lock_path: &Path,
60 operation: &str,
61 allow_recovery: bool,
62 ) -> Result<AcquireOutcome> {
63 let lock_parent = lock_path
64 .parent()
65 .ok_or_else(|| anyhow!("Invalid lock path '{}'", lock_path.display()))?;
66
67 fs::create_dir_all(lock_parent).with_context(|| {
68 format!(
69 "Failed to create lock directory '{}'",
70 lock_parent.display()
71 )
72 })?;
73
74 let mut file = match OpenOptions::new()
75 .write(true)
76 .create_new(true)
77 .open(lock_path)
78 {
79 Ok(file) => file,
80 Err(err) if err.kind() == ErrorKind::AlreadyExists => {
81 let lock_info = fs::read_to_string(lock_path)
82 .unwrap_or_else(|_| "<lock details unavailable>".to_string());
83
84 if allow_recovery && Self::is_stale_lock(&lock_info) {
85 match fs::remove_file(lock_path) {
86 Ok(_) => {
87 return Self::try_acquire_at_internal(lock_path, operation, false);
88 }
89 Err(remove_err) if remove_err.kind() == ErrorKind::NotFound => {
90 return Self::try_acquire_at_internal(lock_path, operation, false);
91 }
92 Err(remove_err) => {
93 return Err(remove_err).context(format!(
94 "Lock at '{}' appears stale but could not be removed",
95 lock_path.display()
96 ));
97 }
98 }
99 }
100
101 return Ok(AcquireOutcome::Waiting);
102 }
103 Err(err) => {
104 return Err(err).with_context(|| {
105 format!("Failed to create lock file '{}'", lock_path.display())
106 });
107 }
108 };
109
110 let since_epoch = SystemTime::now()
111 .duration_since(UNIX_EPOCH)
112 .map(|d| d.as_secs())
113 .unwrap_or(0);
114 writeln!(file, "pid={}", process::id()).ok();
115 if let Some(identity) = process_id::current_process_identity() {
116 writeln!(file, "pid_start_token={}", identity.start_token).ok();
117 }
118 writeln!(file, "operation={}", operation).ok();
119 writeln!(file, "started_at_unix={}", since_epoch).ok();
120
121 Ok(AcquireOutcome::Acquired(Self {
122 path: lock_path.to_path_buf(),
123 }))
124 }
125
126 fn parse_lock_metadata(lock_info: &str) -> LockMetadata {
127 let mut meta = LockMetadata::default();
128 for raw_line in lock_info.lines() {
129 let line = raw_line.trim();
130 if let Some(value) = line.strip_prefix("pid=") {
131 meta.pid = value.trim().parse::<u32>().ok();
132 } else if let Some(value) = line.strip_prefix("pid_start_token=") {
133 let token = value.trim();
134 if !token.is_empty() {
135 meta.pid_start_token = Some(token.to_string());
136 }
137 } else if let Some(value) = line.strip_prefix("operation=") {
138 let op = value.trim();
139 if !op.is_empty() {
140 meta.operation = Some(op.to_string());
141 }
142 } else if let Some(value) = line.strip_prefix("started_at_unix=") {
143 meta.started_at_unix = value.trim().parse::<u64>().ok();
144 }
145 }
146 meta
147 }
148
149 fn is_stale_lock(lock_info: &str) -> bool {
150 let meta = Self::parse_lock_metadata(lock_info);
151
152 if let Some(pid) = meta.pid {
153 let probe = process_id::probe_process(pid);
154 if !probe.exists {
155 return true;
156 }
157
158 if let (Some(expected), Some(actual)) = (
159 meta.pid_start_token.as_deref(),
160 probe.start_token.as_deref(),
161 ) {
162 return expected != actual;
163 }
164
165 return false;
166 }
167
168 true
171 }
172}
173
174impl Drop for LockStorage {
175 fn drop(&mut self) {
176 let _ = fs::remove_file(&self.path);
177 }
178}
179
180#[cfg(test)]
181mod tests {
182 use super::LockStorage;
183 use crate::utils::platform::process_id;
184 use std::{
185 fs,
186 path::PathBuf,
187 thread,
188 time::{Duration, SystemTime, UNIX_EPOCH},
189 };
190
191 fn unique_lock_path(name: &str) -> PathBuf {
192 let nanos = SystemTime::now()
193 .duration_since(UNIX_EPOCH)
194 .map(|d| d.as_nanos())
195 .unwrap_or(0);
196 std::env::temp_dir()
197 .join(format!("upstream-lock-test-{name}-{nanos}"))
198 .join("metadata")
199 .join("lock")
200 }
201
202 #[test]
203 fn lock_waits_for_concurrent_acquire_to_finish() {
204 let lock_path = unique_lock_path("concurrent");
205 let guard = LockStorage::acquire_at(&lock_path, "test").expect("first lock");
206 let release_path = lock_path.clone();
207
208 let releaser = thread::spawn(move || {
209 thread::sleep(Duration::from_millis(100));
210 drop(guard);
211 });
212
213 let _guard = LockStorage::acquire_at(&lock_path, "test").expect("lock after wait");
214 releaser.join().expect("join releaser");
215
216 let _ = fs::remove_dir_all(release_path.parent().unwrap().parent().unwrap());
217 }
218
219 #[test]
220 fn lock_releases_on_drop() {
221 let lock_path = unique_lock_path("release");
222 {
223 let _guard = LockStorage::acquire_at(&lock_path, "test").expect("first lock");
224 }
225
226 let _guard2 = LockStorage::acquire_at(&lock_path, "test").expect("lock after drop");
227
228 let _ = fs::remove_dir_all(lock_path.parent().unwrap().parent().unwrap());
229 }
230
231 #[test]
232 fn stale_lock_is_recovered_automatically() {
233 let lock_path = unique_lock_path("stale-recover");
234 fs::create_dir_all(lock_path.parent().expect("lock parent")).expect("create lock parent");
235 fs::write(
237 &lock_path,
238 "pid=999999\noperation=test\nstarted_at_unix=1\n",
239 )
240 .expect("write stale lock");
241
242 let _guard = LockStorage::acquire_at(&lock_path, "new-op").expect("recover stale lock");
243 let contents = fs::read_to_string(&lock_path).expect("read lock");
244 assert!(contents.contains("operation=new-op"));
245
246 let _ = fs::remove_dir_all(lock_path.parent().unwrap().parent().unwrap());
247 }
248
249 #[test]
250 fn parse_lock_metadata_extracts_known_fields() {
251 let meta = LockStorage::parse_lock_metadata(
252 "pid=123\npid_start_token=abc123\noperation=upgrade\nstarted_at_unix=456\nunknown=ignored\n",
253 );
254 assert_eq!(meta.pid, Some(123));
255 assert_eq!(meta.pid_start_token.as_deref(), Some("abc123"));
256 assert_eq!(meta.operation.as_deref(), Some("upgrade"));
257 assert_eq!(meta.started_at_unix, Some(456));
258 }
259
260 #[test]
261 fn active_lock_still_blocks_second_acquire() {
262 let lock_path = unique_lock_path("active-block");
263 fs::create_dir_all(lock_path.parent().expect("lock parent")).expect("create lock parent");
264 let current_pid = std::process::id();
265 let now = SystemTime::now()
266 .duration_since(UNIX_EPOCH)
267 .map(|d| d.as_secs())
268 .unwrap_or(0);
269 fs::write(
270 &lock_path,
271 format!("pid={current_pid}\noperation=test\nstarted_at_unix={now}\n"),
272 )
273 .expect("write active lock");
274
275 let outcome =
276 LockStorage::try_acquire_at_internal(&lock_path, "next-op", true).expect("try acquire");
277 assert!(matches!(outcome, super::AcquireOutcome::Waiting));
278
279 let _ = fs::remove_dir_all(lock_path.parent().unwrap().parent().unwrap());
280 }
281
282 #[test]
283 fn active_lock_is_not_recovered_even_if_timestamp_is_old() {
284 let lock_path = unique_lock_path("active-old");
285 fs::create_dir_all(lock_path.parent().expect("lock parent")).expect("create lock parent");
286 let current_pid = std::process::id();
287 fs::write(
288 &lock_path,
289 format!("pid={current_pid}\noperation=test\nstarted_at_unix=1\n"),
290 )
291 .expect("write active lock with old timestamp");
292
293 let outcome =
294 LockStorage::try_acquire_at_internal(&lock_path, "next-op", true).expect("try acquire");
295 assert!(matches!(outcome, super::AcquireOutcome::Waiting));
296
297 let _ = fs::remove_dir_all(lock_path.parent().unwrap().parent().unwrap());
298 }
299
300 #[test]
301 fn lock_without_pid_is_recovered_automatically() {
302 let lock_path = unique_lock_path("missing-pid");
303 fs::create_dir_all(lock_path.parent().expect("lock parent")).expect("create lock parent");
304 fs::write(&lock_path, "operation=test\nstarted_at_unix=1\n").expect("write stale lock");
305
306 let _guard = LockStorage::acquire_at(&lock_path, "new-op").expect("recover stale lock");
307 let contents = fs::read_to_string(&lock_path).expect("read lock");
308 assert!(contents.contains("operation=new-op"));
309
310 let _ = fs::remove_dir_all(lock_path.parent().unwrap().parent().unwrap());
311 }
312
313 #[test]
314 fn lock_with_malformed_pid_is_recovered_automatically() {
315 let lock_path = unique_lock_path("malformed-pid");
316 fs::create_dir_all(lock_path.parent().expect("lock parent")).expect("create lock parent");
317 fs::write(&lock_path, "pid=abc\noperation=test\nstarted_at_unix=1\n")
318 .expect("write stale lock");
319
320 let _guard = LockStorage::acquire_at(&lock_path, "new-op").expect("recover stale lock");
321 let contents = fs::read_to_string(&lock_path).expect("read lock");
322 assert!(contents.contains("operation=new-op"));
323
324 let _ = fs::remove_dir_all(lock_path.parent().unwrap().parent().unwrap());
325 }
326
327 #[test]
328 fn stale_lock_is_recovered_when_pid_start_token_mismatches() {
329 let Some(identity) = process_id::current_process_identity() else {
330 return;
331 };
332
333 let lock_path = unique_lock_path("token-mismatch");
334 fs::create_dir_all(lock_path.parent().expect("lock parent")).expect("create lock parent");
335 let current_pid = std::process::id();
336 fs::write(
337 &lock_path,
338 format!(
339 "pid={current_pid}\npid_start_token={}-mismatch\noperation=test\nstarted_at_unix=1\n",
340 identity.start_token
341 ),
342 )
343 .expect("write mismatched lock");
344
345 let _guard = LockStorage::acquire_at(&lock_path, "new-op").expect("recover stale lock");
346 let contents = fs::read_to_string(&lock_path).expect("read lock");
347 assert!(contents.contains("operation=new-op"));
348
349 let _ = fs::remove_dir_all(lock_path.parent().unwrap().parent().unwrap());
350 }
351}