upstream_rs/services/storage/
lock_storage.rs1use anyhow::{Context, Result, anyhow};
2use std::{
3 fs::{self, OpenOptions},
4 io::{ErrorKind, Write},
5 path::{Path, PathBuf},
6 process, thread,
7 time::{Duration, SystemTime, UNIX_EPOCH},
8};
9
10use crate::utils::platform::process_id;
11use crate::utils::static_paths::UpstreamPaths;
12
/// Handle for a lock file held on disk.
///
/// Values are produced by [`LockStorage::acquire`]; the `Drop` implementation
/// deletes the file at `path`, which is what releases the lock.
#[derive(Debug)]
pub struct LockStorage {
    /// Location of the lock file owned by this handle.
    path: PathBuf,
}
17
/// How long the acquire loop sleeps between attempts while the lock is held
/// by someone else.
const LOCK_POLL_INTERVAL: Duration = Duration::from_secs(1);
19
/// Fields recovered from the `key=value` lines of a lock file.
///
/// Every field is optional: a key that is absent (or, for the numeric fields,
/// unparseable) is left as `None`.
#[derive(Default, Debug)]
struct LockMetadata {
    /// Value of the `pid=` line, if it parsed as a `u32`.
    pid: Option<u32>,
    /// Non-empty value of the `pid_start_token=` line; compared against the
    /// token reported by the process probe when deciding whether a lock is stale.
    pid_start_token: Option<String>,
    /// Non-empty value of the `operation=` line.
    operation: Option<String>,
    /// Value of the `started_at_unix=` line (seconds since the Unix epoch),
    /// if it parsed as a `u64`.
    started_at_unix: Option<u64>,
}
27
/// Result of a single, non-blocking attempt to take the lock.
enum AcquireOutcome {
    /// The lock file was created; the wrapped handle now owns it.
    Acquired(LockStorage),
    /// The lock file already exists and was not recovered; the caller should
    /// retry later.
    Waiting,
}
32
33impl LockStorage {
34 pub fn acquire(paths: &UpstreamPaths, operation: &str) -> Result<Self> {
35 let lock_path = paths.dirs.metadata_dir.join("lock");
36 Self::acquire_at(&lock_path, operation)
37 }
38
39 fn acquire_at(lock_path: &Path, operation: &str) -> Result<Self> {
40 let mut printed_wait_notice = false;
41
42 loop {
43 match Self::try_acquire_at_internal(lock_path, operation, true)? {
44 AcquireOutcome::Acquired(lock) => return Ok(lock),
45 AcquireOutcome::Waiting => {
46 if !printed_wait_notice {
47 eprintln!("Waiting for lock file...");
48 printed_wait_notice = true;
49 }
50 thread::sleep(LOCK_POLL_INTERVAL);
51 }
52 }
53 }
54 }
55
56 fn try_acquire_at_internal(
57 lock_path: &Path,
58 operation: &str,
59 allow_recovery: bool,
60 ) -> Result<AcquireOutcome> {
61 let lock_parent = lock_path
62 .parent()
63 .ok_or_else(|| anyhow!("Invalid lock path '{}'", lock_path.display()))?;
64
65 fs::create_dir_all(lock_parent).with_context(|| {
66 format!(
67 "Failed to create lock directory '{}'",
68 lock_parent.display()
69 )
70 })?;
71
72 let mut file = match OpenOptions::new()
73 .write(true)
74 .create_new(true)
75 .open(lock_path)
76 {
77 Ok(file) => file,
78 Err(err) if err.kind() == ErrorKind::AlreadyExists => {
79 let lock_info = fs::read_to_string(lock_path)
80 .unwrap_or_else(|_| "<lock details unavailable>".to_string());
81
82 if allow_recovery && Self::is_stale_lock(&lock_info) {
83 match fs::remove_file(lock_path) {
84 Ok(_) => {
85 return Self::try_acquire_at_internal(lock_path, operation, false);
86 }
87 Err(remove_err) if remove_err.kind() == ErrorKind::NotFound => {
88 return Self::try_acquire_at_internal(lock_path, operation, false);
89 }
90 Err(remove_err) => {
91 return Err(remove_err).context(format!(
92 "Lock at '{}' appears stale but could not be removed",
93 lock_path.display()
94 ));
95 }
96 }
97 }
98
99 return Ok(AcquireOutcome::Waiting);
100 }
101 Err(err) => {
102 return Err(err).with_context(|| {
103 format!("Failed to create lock file '{}'", lock_path.display())
104 });
105 }
106 };
107
108 let since_epoch = SystemTime::now()
109 .duration_since(UNIX_EPOCH)
110 .map(|d| d.as_secs())
111 .unwrap_or(0);
112 writeln!(file, "pid={}", process::id()).ok();
113 if let Some(identity) = process_id::current_process_identity() {
114 writeln!(file, "pid_start_token={}", identity.start_token).ok();
115 }
116 writeln!(file, "operation={}", operation).ok();
117 writeln!(file, "started_at_unix={}", since_epoch).ok();
118
119 Ok(AcquireOutcome::Acquired(Self {
120 path: lock_path.to_path_buf(),
121 }))
122 }
123
124 fn parse_lock_metadata(lock_info: &str) -> LockMetadata {
125 let mut meta = LockMetadata::default();
126 for raw_line in lock_info.lines() {
127 let line = raw_line.trim();
128 if let Some(value) = line.strip_prefix("pid=") {
129 meta.pid = value.trim().parse::<u32>().ok();
130 } else if let Some(value) = line.strip_prefix("pid_start_token=") {
131 let token = value.trim();
132 if !token.is_empty() {
133 meta.pid_start_token = Some(token.to_string());
134 }
135 } else if let Some(value) = line.strip_prefix("operation=") {
136 let op = value.trim();
137 if !op.is_empty() {
138 meta.operation = Some(op.to_string());
139 }
140 } else if let Some(value) = line.strip_prefix("started_at_unix=") {
141 meta.started_at_unix = value.trim().parse::<u64>().ok();
142 }
143 }
144 meta
145 }
146
147 fn is_stale_lock(lock_info: &str) -> bool {
148 let meta = Self::parse_lock_metadata(lock_info);
149
150 if let Some(pid) = meta.pid {
151 let probe = process_id::probe_process(pid);
152 if !probe.exists {
153 return true;
154 }
155
156 if let (Some(expected), Some(actual)) = (
157 meta.pid_start_token.as_deref(),
158 probe.start_token.as_deref(),
159 ) {
160 return expected != actual;
161 }
162
163 return false;
164 }
165
166 true
169 }
170}
171
172impl Drop for LockStorage {
173 fn drop(&mut self) {
174 let _ = fs::remove_file(&self.path);
175 }
176}
177
#[cfg(test)]
mod tests {
    use super::{AcquireOutcome, LockStorage};
    use crate::utils::platform::process_id;
    use std::{
        fs,
        path::{Path, PathBuf},
        thread,
        time::{Duration, SystemTime, UNIX_EPOCH},
    };

    /// Returns `<tmp>/upstream-lock-test-<name>-<nanos>/metadata/lock`, a lock
    /// path inside a directory named after the test and the current time.
    fn unique_lock_path(name: &str) -> PathBuf {
        let nanos = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map_or(0, |d| d.as_nanos());
        let mut path = std::env::temp_dir();
        path.push(format!("upstream-lock-test-{name}-{nanos}"));
        path.push("metadata");
        path.push("lock");
        path
    }

    /// Creates the directory that will hold the lock file.
    #[track_caller]
    fn create_lock_parent(lock_path: &Path) {
        let parent = lock_path.parent().expect("lock parent");
        fs::create_dir_all(parent).expect("create lock parent");
    }

    /// Best-effort removal of the per-test root (two levels above the lock file).
    #[track_caller]
    fn remove_test_root(lock_path: &Path) {
        let root = lock_path.parent().unwrap().parent().unwrap();
        let _ = fs::remove_dir_all(root);
    }

    /// Asserts that the lock file on disk was rewritten by the `new-op` acquirer.
    #[track_caller]
    fn assert_rewritten_by_new_op(lock_path: &Path) {
        let contents = fs::read_to_string(lock_path).expect("read lock");
        assert!(contents.contains("operation=new-op"));
    }

    #[test]
    fn lock_waits_for_concurrent_acquire_to_finish() {
        let lock_path = unique_lock_path("concurrent");
        let first = LockStorage::acquire_at(&lock_path, "test").expect("first lock");

        // Release the first lock from another thread shortly after we start waiting.
        let releaser = thread::spawn(move || {
            thread::sleep(Duration::from_millis(100));
            drop(first);
        });

        let _second = LockStorage::acquire_at(&lock_path, "test").expect("lock after wait");
        releaser.join().expect("join releaser");

        remove_test_root(&lock_path);
    }

    #[test]
    fn lock_releases_on_drop() {
        let lock_path = unique_lock_path("release");
        drop(LockStorage::acquire_at(&lock_path, "test").expect("first lock"));

        let _guard2 = LockStorage::acquire_at(&lock_path, "test").expect("lock after drop");

        remove_test_root(&lock_path);
    }

    #[test]
    fn stale_lock_is_recovered_automatically() {
        let lock_path = unique_lock_path("stale-recover");
        create_lock_parent(&lock_path);
        let stale = "pid=999999\noperation=test\nstarted_at_unix=1\n";
        fs::write(&lock_path, stale).expect("write stale lock");

        let _guard = LockStorage::acquire_at(&lock_path, "new-op").expect("recover stale lock");
        assert_rewritten_by_new_op(&lock_path);

        remove_test_root(&lock_path);
    }

    #[test]
    fn parse_lock_metadata_extracts_known_fields() {
        let input =
            "pid=123\npid_start_token=abc123\noperation=upgrade\nstarted_at_unix=456\nunknown=ignored\n";
        let meta = LockStorage::parse_lock_metadata(input);

        assert_eq!(meta.pid, Some(123));
        assert_eq!(meta.pid_start_token.as_deref(), Some("abc123"));
        assert_eq!(meta.operation.as_deref(), Some("upgrade"));
        assert_eq!(meta.started_at_unix, Some(456));
    }

    #[test]
    fn active_lock_still_blocks_second_acquire() {
        let lock_path = unique_lock_path("active-block");
        create_lock_parent(&lock_path);
        let current_pid = std::process::id();
        let now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map_or(0, |d| d.as_secs());
        let active = format!("pid={current_pid}\noperation=test\nstarted_at_unix={now}\n");
        fs::write(&lock_path, active).expect("write active lock");

        let outcome =
            LockStorage::try_acquire_at_internal(&lock_path, "next-op", true).expect("try acquire");
        assert!(matches!(outcome, AcquireOutcome::Waiting));

        remove_test_root(&lock_path);
    }

    #[test]
    fn active_lock_is_not_recovered_even_if_timestamp_is_old() {
        let lock_path = unique_lock_path("active-old");
        create_lock_parent(&lock_path);
        let current_pid = std::process::id();
        let active = format!("pid={current_pid}\noperation=test\nstarted_at_unix=1\n");
        fs::write(&lock_path, active).expect("write active lock with old timestamp");

        let outcome =
            LockStorage::try_acquire_at_internal(&lock_path, "next-op", true).expect("try acquire");
        assert!(matches!(outcome, AcquireOutcome::Waiting));

        remove_test_root(&lock_path);
    }

    #[test]
    fn lock_without_pid_is_recovered_automatically() {
        let lock_path = unique_lock_path("missing-pid");
        create_lock_parent(&lock_path);
        fs::write(&lock_path, "operation=test\nstarted_at_unix=1\n").expect("write stale lock");

        let _guard = LockStorage::acquire_at(&lock_path, "new-op").expect("recover stale lock");
        assert_rewritten_by_new_op(&lock_path);

        remove_test_root(&lock_path);
    }

    #[test]
    fn lock_with_malformed_pid_is_recovered_automatically() {
        let lock_path = unique_lock_path("malformed-pid");
        create_lock_parent(&lock_path);
        fs::write(&lock_path, "pid=abc\noperation=test\nstarted_at_unix=1\n")
            .expect("write stale lock");

        let _guard = LockStorage::acquire_at(&lock_path, "new-op").expect("recover stale lock");
        assert_rewritten_by_new_op(&lock_path);

        remove_test_root(&lock_path);
    }

    #[test]
    fn stale_lock_is_recovered_when_pid_start_token_mismatches() {
        // Skip when the platform cannot report an identity for this process.
        let Some(identity) = process_id::current_process_identity() else {
            return;
        };

        let lock_path = unique_lock_path("token-mismatch");
        create_lock_parent(&lock_path);
        let current_pid = std::process::id();
        let mismatched = format!(
            "pid={current_pid}\npid_start_token={}-mismatch\noperation=test\nstarted_at_unix=1\n",
            identity.start_token
        );
        fs::write(&lock_path, mismatched).expect("write mismatched lock");

        let _guard = LockStorage::acquire_at(&lock_path, "new-op").expect("recover stale lock");
        assert_rewritten_by_new_op(&lock_path);

        remove_test_root(&lock_path);
    }
}