1use std::collections::HashMap;
9use std::fs::{File, OpenOptions};
10use std::io;
11use std::path::{Path, PathBuf};
12use std::time::{Duration, Instant};
13
14use super::backend_registry::BackendKey;
15
/// Default number of spawn attempts a single backend may start within one budget window.
pub const DEFAULT_SPAWN_ATTEMPTS_PER_WINDOW: u32 = 3;

/// Default length of a spawn budget window; the attempt count resets once it has elapsed.
pub const DEFAULT_SPAWN_BUDGET_WINDOW: Duration = Duration::from_millis(30 * 1_000);
21
22pub fn acquire_spawn_lock(path: impl AsRef<Path>) -> Result<SpawnLockGuard, SpawnLockError> {
35 acquire_spawn_lock_with_hook(path.as_ref(), |_, _| {})
36}
37
38fn acquire_spawn_lock_with_hook<F>(
39 path: &Path,
40 mut before_lock: F,
41) -> Result<SpawnLockGuard, SpawnLockError>
42where
43 F: FnMut(&Path, &File),
44{
45 let path_buf = path.to_path_buf();
46 let file = open_lock_file(path).map_err(|source| SpawnLockError::Open {
47 path: path_buf.clone(),
48 source,
49 })?;
50
51 before_lock(path, &file);
52
53 try_lock_file(&file).map_err(|source| {
54 if is_lock_conflict(&source) {
55 SpawnLockError::AlreadyLocked {
56 path: path_buf.clone(),
57 }
58 } else {
59 SpawnLockError::Lock {
60 path: path_buf.clone(),
61 source,
62 }
63 }
64 })?;
65
66 let opened_identity =
67 file_identity(&file).map_err(|source| lock_identity_error(&path_buf, &file, source))?;
68 let current_identity = match path_identity(path) {
69 Ok(identity) => identity,
70 Err(source) if source.kind() == io::ErrorKind::NotFound => {
71 let _ = try_unlock_file(&file);
72 return Err(SpawnLockError::DeletedOrRecreated {
73 path: path_buf,
74 opened_identity,
75 current_identity: None,
76 });
77 }
78 Err(source) => return Err(lock_identity_error(&path_buf, &file, source)),
79 };
80
81 if opened_identity != current_identity {
82 let _ = try_unlock_file(&file);
83 return Err(SpawnLockError::DeletedOrRecreated {
84 path: path_buf,
85 opened_identity,
86 current_identity,
87 });
88 }
89
90 Ok(SpawnLockGuard {
91 file,
92 path: path_buf,
93 identity: opened_identity,
94 })
95}
96
97fn lock_identity_error(path: &Path, file: &File, source: io::Error) -> SpawnLockError {
98 let _ = try_unlock_file(file);
99 SpawnLockError::Identity {
100 path: path.to_path_buf(),
101 source,
102 }
103}
104
105#[must_use = "dropping the guard releases the backend spawn lock immediately"]
107#[derive(Debug)]
108pub struct SpawnLockGuard {
109 file: File,
110 path: PathBuf,
111 identity: Option<SpawnLockFileIdentity>,
112}
113
114impl SpawnLockGuard {
115 pub fn path(&self) -> &Path {
117 &self.path
118 }
119
120 pub fn file_identity(&self) -> Option<SpawnLockFileIdentity> {
122 self.identity
123 }
124}
125
126impl Drop for SpawnLockGuard {
127 fn drop(&mut self) {
128 let _ = try_unlock_file(&self.file);
129 }
130}
131
/// Platform identity of a lock file: the pair that tells one on-disk file apart from a
/// different file that later appears at the same path.
///
/// On Unix this is the device and inode number. On Windows it is the volume serial
/// number and the 64-bit file index. `Hash` is derived so identities can key hash-based
/// collections.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct SpawnLockFileIdentity {
    /// Device (Unix) or volume serial number (Windows) holding the file.
    pub device: u64,
    /// Inode number (Unix) or file index (Windows) within that device/volume.
    pub file: u64,
}
140
141#[derive(Debug, thiserror::Error)]
143pub enum SpawnLockError {
144 #[error("failed to open backend spawn lock file {path}: {source}")]
146 Open {
147 path: PathBuf,
149 #[source]
151 source: io::Error,
152 },
153 #[error("backend spawn lock file {path} is already locked")]
155 AlreadyLocked {
156 path: PathBuf,
158 },
159 #[error("failed to lock backend spawn lock file {path}: {source}")]
161 Lock {
162 path: PathBuf,
164 #[source]
166 source: io::Error,
167 },
168 #[error("backend spawn lock file {path} was deleted or recreated during acquisition")]
170 DeletedOrRecreated {
171 path: PathBuf,
173 opened_identity: Option<SpawnLockFileIdentity>,
175 current_identity: Option<SpawnLockFileIdentity>,
177 },
178 #[error("failed to verify backend spawn lock file identity for {path}: {source}")]
180 Identity {
181 path: PathBuf,
183 #[source]
185 source: io::Error,
186 },
187}
188
/// Limits on how often a backend may be spawned.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub struct SpawnBudgetConfig {
    /// Maximum number of spawn attempts allowed per window.
    pub max_attempts: u32,
    /// Length of the window over which attempts are counted.
    pub window: Duration,
}

impl SpawnBudgetConfig {
    /// Builds a config and clamps degenerate inputs.
    ///
    /// `max_attempts` is raised to at least 1, and a zero `window` is replaced by 1 ms.
    pub fn new(max_attempts: u32, window: Duration) -> Self {
        const MIN_WINDOW: Duration = Duration::from_millis(1);

        let max_attempts = std::cmp::max(max_attempts, 1);
        let window = Some(window)
            .filter(|candidate| !candidate.is_zero())
            .unwrap_or(MIN_WINDOW);
        Self {
            max_attempts,
            window,
        }
    }
}
211
212impl Default for SpawnBudgetConfig {
213 fn default() -> Self {
214 Self {
215 max_attempts: DEFAULT_SPAWN_ATTEMPTS_PER_WINDOW,
216 window: DEFAULT_SPAWN_BUDGET_WINDOW,
217 }
218 }
219}
220
221#[derive(Debug)]
223pub struct SpawnCoordinator {
224 config: SpawnBudgetConfig,
225 states: HashMap<BackendKey, SpawnBudgetState>,
226}
227
228impl SpawnCoordinator {
229 pub fn new() -> Self {
231 Self::with_config(SpawnBudgetConfig::default())
232 }
233
234 pub fn with_config(config: SpawnBudgetConfig) -> Self {
236 Self {
237 config,
238 states: HashMap::new(),
239 }
240 }
241
242 pub fn try_begin(
248 &mut self,
249 key: BackendKey,
250 now: Instant,
251 ) -> Result<SpawnPermit, SpawnBeginError> {
252 let state = self
253 .states
254 .entry(key.clone())
255 .or_insert_with(|| SpawnBudgetState::new(now));
256 state.refresh(now, self.config.window);
257
258 if state.in_flight {
259 return Err(SpawnBeginError::AlreadyInProgress);
260 }
261
262 if state.attempts_used >= self.config.max_attempts {
263 return Err(SpawnBeginError::BudgetExhausted {
264 retry_after: retry_after(state.window_started_at, now, self.config.window),
265 remaining: 0,
266 });
267 }
268
269 state.attempts_used += 1;
270 state.in_flight = true;
271 Ok(SpawnPermit {
272 key,
273 attempt_number: state.attempts_used,
274 remaining_after_begin: self.config.max_attempts - state.attempts_used,
275 })
276 }
277
278 pub fn finish(&mut self, key: &BackendKey, outcome: SpawnOutcome, now: Instant) {
280 let Some(state) = self.states.get_mut(key) else {
281 return;
282 };
283 state.refresh(now, self.config.window);
284 state.in_flight = false;
285 if outcome == SpawnOutcome::Success {
286 state.window_started_at = now;
287 state.attempts_used = 0;
288 }
289 }
290
291 pub fn snapshot(&mut self, key: BackendKey, now: Instant) -> SpawnBudgetSnapshot {
293 let state = self
294 .states
295 .entry(key.clone())
296 .or_insert_with(|| SpawnBudgetState::new(now));
297 state.refresh(now, self.config.window);
298 snapshot_for(key, state, self.config, now)
299 }
300}
301
302impl Default for SpawnCoordinator {
303 fn default() -> Self {
304 Self::new()
305 }
306}
307
308#[derive(Clone, Debug, PartialEq, Eq)]
310pub struct SpawnPermit {
311 pub key: BackendKey,
313 pub attempt_number: u32,
315 pub remaining_after_begin: u32,
317}
318
/// How a spawn attempt ended, as reported to [`SpawnCoordinator::finish`].
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum SpawnOutcome {
    /// The backend started; the coordinator resets the key's budget window.
    Success,
    /// The attempt failed; it remains counted against the current window.
    Failed,
}
327
328#[derive(Clone, Debug, PartialEq, Eq, thiserror::Error)]
330pub enum SpawnBeginError {
331 #[error("backend spawn already in progress")]
333 AlreadyInProgress,
334 #[error("backend spawn budget exhausted; retry after {retry_after:?}")]
336 BudgetExhausted {
337 retry_after: Duration,
339 remaining: u32,
341 },
342}
343
344#[derive(Clone, Debug, PartialEq, Eq)]
346pub struct SpawnBudgetSnapshot {
347 pub key: BackendKey,
349 pub attempts_used: u32,
351 pub remaining: u32,
353 pub in_flight: bool,
355 pub retry_after: Option<Duration>,
357}
358
359#[derive(Clone, Debug)]
360struct SpawnBudgetState {
361 window_started_at: Instant,
362 attempts_used: u32,
363 in_flight: bool,
364}
365
366impl SpawnBudgetState {
367 fn new(now: Instant) -> Self {
368 Self {
369 window_started_at: now,
370 attempts_used: 0,
371 in_flight: false,
372 }
373 }
374
375 fn refresh(&mut self, now: Instant, window: Duration) {
376 if elapsed_since(self.window_started_at, now) >= window {
377 self.window_started_at = now;
378 self.attempts_used = 0;
379 self.in_flight = false;
380 }
381 }
382}
383
384fn snapshot_for(
385 key: BackendKey,
386 state: &SpawnBudgetState,
387 config: SpawnBudgetConfig,
388 now: Instant,
389) -> SpawnBudgetSnapshot {
390 let remaining = config.max_attempts.saturating_sub(state.attempts_used);
391 SpawnBudgetSnapshot {
392 key,
393 attempts_used: state.attempts_used,
394 remaining,
395 in_flight: state.in_flight,
396 retry_after: (remaining == 0)
397 .then(|| retry_after(state.window_started_at, now, config.window)),
398 }
399}
400
/// Time left, as of `now`, before a window that began at `window_started_at` and lasts
/// `window` expires. Zero once it has expired.
fn retry_after(window_started_at: Instant, now: Instant, window: Duration) -> Duration {
    let elapsed = elapsed_since(window_started_at, now);
    window.checked_sub(elapsed).unwrap_or_default()
}

/// Time from `started_at` to `now`, clamped to zero when `now` is earlier than
/// `started_at` (for example, an out-of-order timestamp from the caller).
fn elapsed_since(started_at: Instant, now: Instant) -> Duration {
    now.saturating_duration_since(started_at)
}
409
410fn open_lock_file(path: &Path) -> io::Result<File> {
411 let mut options = OpenOptions::new();
412 options.read(true).write(true).create(true);
413 configure_lock_file_options(&mut options);
414 options.open(path)
415}
416
/// Unix: newly created lock files get owner-only read/write permissions (`0o600`,
/// subject to the process umask).
#[cfg(unix)]
fn configure_lock_file_options(options: &mut OpenOptions) {
    use std::os::unix::fs::OpenOptionsExt;

    const OWNER_READ_WRITE: u32 = 0o600;
    options.mode(OWNER_READ_WRITE);
}

/// Windows: let other handles read, write, and delete/rename the file while it is open.
///
/// NOTE(review): std documents this same combination as the default share mode, so this
/// call appears to make the intent explicit rather than change behavior.
#[cfg(windows)]
fn configure_lock_file_options(options: &mut OpenOptions) {
    use std::os::windows::fs::OpenOptionsExt;
    use winapi::um::winnt::{FILE_SHARE_DELETE, FILE_SHARE_READ, FILE_SHARE_WRITE};

    let share_everything = FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE;
    options.share_mode(share_everything);
}

/// Other platforms: no extra open options.
#[cfg(not(any(unix, windows)))]
fn configure_lock_file_options(_options: &mut OpenOptions) {}
434
435#[cfg(unix)]
436fn try_lock_file(file: &File) -> io::Result<()> {
437 use std::os::unix::io::AsRawFd;
438
439 let result = unsafe { libc::flock(file.as_raw_fd(), libc::LOCK_EX | libc::LOCK_NB) };
440 if result == 0 {
441 Ok(())
442 } else {
443 Err(io::Error::last_os_error())
444 }
445}
446
447#[cfg(unix)]
448fn try_unlock_file(file: &File) -> io::Result<()> {
449 use std::os::unix::io::AsRawFd;
450
451 let result = unsafe { libc::flock(file.as_raw_fd(), libc::LOCK_UN) };
452 if result == 0 {
453 Ok(())
454 } else {
455 Err(io::Error::last_os_error())
456 }
457}
458
459#[cfg(unix)]
460fn is_lock_conflict(error: &io::Error) -> bool {
461 error.raw_os_error() == Some(libc::EWOULDBLOCK) || error.raw_os_error() == Some(libc::EAGAIN)
462}
463
464#[cfg(unix)]
465fn file_identity(file: &File) -> io::Result<Option<SpawnLockFileIdentity>> {
466 use std::os::unix::fs::MetadataExt;
467
468 let metadata = file.metadata()?;
469 Ok(Some(SpawnLockFileIdentity {
470 device: metadata.dev(),
471 file: metadata.ino(),
472 }))
473}
474
475#[cfg(unix)]
476fn path_identity(path: &Path) -> io::Result<Option<SpawnLockFileIdentity>> {
477 use std::os::unix::fs::MetadataExt;
478
479 let metadata = path.metadata()?;
480 Ok(Some(SpawnLockFileIdentity {
481 device: metadata.dev(),
482 file: metadata.ino(),
483 }))
484}
485
/// Windows: tries to take an exclusive `LockFileEx` lock on `file` without waiting.
///
/// The locked range starts at offset 0 and has the maximum length.
#[cfg(windows)]
fn try_lock_file(file: &File) -> io::Result<()> {
    use std::os::windows::io::AsRawHandle;
    use winapi::um::fileapi::LockFileEx;
    use winapi::um::minwinbase::{LOCKFILE_EXCLUSIVE_LOCK, LOCKFILE_FAIL_IMMEDIATELY, OVERLAPPED};
    use winapi::um::winnt::HANDLE;

    let handle = file.as_raw_handle() as HANDLE;
    let flags = LOCKFILE_EXCLUSIVE_LOCK | LOCKFILE_FAIL_IMMEDIATELY;
    // SAFETY: an all-zero OVERLAPPED is valid (offset 0, no event handle).
    let mut overlapped: OVERLAPPED = unsafe { std::mem::zeroed() };
    // SAFETY: `handle` is owned by `file`, which is borrowed for the whole call, and
    // `overlapped` is a valid, exclusively borrowed OVERLAPPED for its duration.
    let locked = unsafe { LockFileEx(handle, flags, 0, u32::MAX, u32::MAX, &mut overlapped) };
    if locked != 0 {
        Ok(())
    } else {
        Err(io::Error::last_os_error())
    }
}
511
/// Windows: releases the lock `try_lock_file` took on `file` (same offset and length).
#[cfg(windows)]
fn try_unlock_file(file: &File) -> io::Result<()> {
    use std::os::windows::io::AsRawHandle;
    use winapi::um::fileapi::UnlockFileEx;
    use winapi::um::minwinbase::OVERLAPPED;
    use winapi::um::winnt::HANDLE;

    let handle = file.as_raw_handle() as HANDLE;
    // SAFETY: an all-zero OVERLAPPED is valid (offset 0, no event handle).
    let mut overlapped: OVERLAPPED = unsafe { std::mem::zeroed() };
    // SAFETY: `handle` is owned by `file`, which is borrowed for the whole call, and
    // `overlapped` is a valid, exclusively borrowed OVERLAPPED for its duration.
    let unlocked = unsafe { UnlockFileEx(handle, 0, u32::MAX, u32::MAX, &mut overlapped) };
    if unlocked != 0 {
        Ok(())
    } else {
        Err(io::Error::last_os_error())
    }
}
536
/// Windows: whether a lock `error` means the range is already locked
/// (`ERROR_LOCK_VIOLATION`).
#[cfg(windows)]
fn is_lock_conflict(error: &io::Error) -> bool {
    use winapi::shared::winerror::ERROR_LOCK_VIOLATION;

    matches!(error.raw_os_error(), Some(code) if code == ERROR_LOCK_VIOLATION as i32)
}
543
/// Windows: identity of the open `file` (volume serial number + 64-bit file index),
/// read with `GetFileInformationByHandle`.
#[cfg(windows)]
fn file_identity(file: &File) -> io::Result<Option<SpawnLockFileIdentity>> {
    use std::mem::MaybeUninit;
    use std::os::windows::io::AsRawHandle;
    use winapi::um::fileapi::{GetFileInformationByHandle, BY_HANDLE_FILE_INFORMATION};
    use winapi::um::winnt::HANDLE;

    let handle = file.as_raw_handle() as HANDLE;
    let mut raw_info = MaybeUninit::<BY_HANDLE_FILE_INFORMATION>::uninit();
    // SAFETY: `handle` is owned by `file`, which is borrowed for the whole call, and
    // `raw_info` is writable storage for exactly one BY_HANDLE_FILE_INFORMATION.
    let succeeded = unsafe { GetFileInformationByHandle(handle, raw_info.as_mut_ptr()) };
    if succeeded == 0 {
        return Err(io::Error::last_os_error());
    }
    // SAFETY: the call reported success, so it filled in `raw_info`.
    let info = unsafe { raw_info.assume_init() };

    let file_index = (u64::from(info.nFileIndexHigh) << 32) | u64::from(info.nFileIndexLow);
    Ok(Some(SpawnLockFileIdentity {
        device: u64::from(info.dwVolumeSerialNumber),
        file: file_index,
    }))
}
564
/// Windows: identity of whatever `path` currently names.
///
/// Works by opening the path (without creating it) with the lock file's share mode and
/// querying the new handle. Returns the open error (e.g. `NotFound`) if nothing is there.
///
/// NOTE(review): on Windows versions without POSIX delete semantics, a deleted file that
/// is still open elsewhere may fail to open with an access-denied error rather than
/// `NotFound`. Confirm how the caller should classify that.
#[cfg(windows)]
fn path_identity(path: &Path) -> io::Result<Option<SpawnLockFileIdentity>> {
    let mut probe_options = OpenOptions::new();
    probe_options.read(true);
    probe_options.write(true);
    configure_lock_file_options(&mut probe_options);

    let probe = probe_options.open(path)?;
    file_identity(&probe)
}
573
/// Fallback platforms: file locking is unsupported, so acquisition always fails here.
#[cfg(not(any(unix, windows)))]
fn try_lock_file(_file: &File) -> io::Result<()> {
    let message = "backend spawn file locks are supported only on Unix and Windows";
    Err(io::Error::new(io::ErrorKind::Unsupported, message))
}

/// Fallback platforms: no lock is ever held, so unlocking trivially succeeds.
#[cfg(not(any(unix, windows)))]
fn try_unlock_file(_file: &File) -> io::Result<()> {
    Ok(())
}

/// Fallback platforms: no error is ever classified as lock contention.
#[cfg(not(any(unix, windows)))]
fn is_lock_conflict(_error: &io::Error) -> bool {
    false
}

/// Fallback platforms: no file identity is available.
#[cfg(not(any(unix, windows)))]
fn file_identity(_file: &File) -> io::Result<Option<SpawnLockFileIdentity>> {
    Ok(None)
}

/// Fallback platforms: no path identity is available.
#[cfg(not(any(unix, windows)))]
fn path_identity(_path: &Path) -> io::Result<Option<SpawnLockFileIdentity>> {
    Ok(None)
}
601
#[cfg(test)]
mod tests {
    use std::fs;

    use super::*;

    /// Simulates the lock file being swapped out after it was opened but before it was
    /// locked. Acquisition must fail with `DeletedOrRecreated`, and a later attempt on the
    /// (now stable) replacement file must succeed.
    #[test]
    #[cfg(any(unix, windows))]
    fn acquire_spawn_lock_detects_lock_file_replacement_between_open_and_lock() {
        let dir = tempfile::tempdir().unwrap();
        let lock_path = dir.path().join("backend.spawn.lock");
        let moved_aside = dir.path().join("backend.spawn.lock.replaced");

        let err = acquire_spawn_lock_with_hook(&lock_path, |path, _file| {
            // Move the opened file away and put a different file at the same path.
            fs::rename(path, &moved_aside).unwrap();
            fs::write(path, b"replacement lock file").unwrap();
        })
        .unwrap_err();

        match err {
            SpawnLockError::DeletedOrRecreated {
                path,
                opened_identity: Some(opened),
                current_identity: Some(current),
            } => {
                assert_eq!(path, lock_path);
                assert_ne!(opened, current);
            }
            err => panic!("expected deleted/recreated error, got {err:?}"),
        }

        // The replacement file now stays put, so a fresh acquisition must succeed.
        let _guard = acquire_spawn_lock(&lock_path).unwrap();
    }
}