Skip to main content

running_process/broker/server/
spawn_coordinator.rs

1//! Spawn coordination contract for broker-managed backends.
2//!
3//! This module does not launch child processes yet. It owns the state that
4//! Phase 4/5 launch code needs before spawning: per-backend-key budget windows,
5//! single-flight protection, retry-after hints for refused Hello replies, and
6//! process-wide file locks for backend spawn ownership.
7
8use std::collections::HashMap;
9use std::fs::{File, OpenOptions};
10use std::io;
11use std::path::{Path, PathBuf};
12use std::time::{Duration, Instant};
13
14use super::backend_registry::BackendKey;
15
/// How many backend spawn attempts one budget window allows when no explicit
/// [`SpawnBudgetConfig`] is supplied.
pub const DEFAULT_SPAWN_ATTEMPTS_PER_WINDOW: u32 = 3;
18
/// Length of one spawn budget window when no explicit [`SpawnBudgetConfig`] is
/// supplied.
pub const DEFAULT_SPAWN_BUDGET_WINDOW: Duration = Duration::from_secs(30);
21
22/// Acquire the backend spawn lock at `path`.
23///
24/// The returned guard owns an exclusive OS file lock until it is dropped. The
25/// lock file is intentionally left in place on drop; ownership is attached to
26/// the open file handle, not to path existence.
27///
28/// On Unix and Windows this helper verifies file identity after taking the
29/// lock. If another coordinator deletes, renames, or recreates the lock file
30/// between open and lock acquisition, the helper refuses the stale handle with
31/// [`SpawnLockError::DeletedOrRecreated`]. On platforms where file identity is
32/// not available, callers must keep lock files in a trusted broker-owned
33/// directory and treat path deletion/recreation detection as best-effort.
34pub fn acquire_spawn_lock(path: impl AsRef<Path>) -> Result<SpawnLockGuard, SpawnLockError> {
35    acquire_spawn_lock_with_hook(path.as_ref(), |_, _| {})
36}
37
38fn acquire_spawn_lock_with_hook<F>(
39    path: &Path,
40    mut before_lock: F,
41) -> Result<SpawnLockGuard, SpawnLockError>
42where
43    F: FnMut(&Path, &File),
44{
45    let path_buf = path.to_path_buf();
46    let file = open_lock_file(path).map_err(|source| SpawnLockError::Open {
47        path: path_buf.clone(),
48        source,
49    })?;
50
51    before_lock(path, &file);
52
53    try_lock_file(&file).map_err(|source| {
54        if is_lock_conflict(&source) {
55            SpawnLockError::AlreadyLocked {
56                path: path_buf.clone(),
57            }
58        } else {
59            SpawnLockError::Lock {
60                path: path_buf.clone(),
61                source,
62            }
63        }
64    })?;
65
66    let opened_identity =
67        file_identity(&file).map_err(|source| lock_identity_error(&path_buf, &file, source))?;
68    let current_identity = match path_identity(path) {
69        Ok(identity) => identity,
70        Err(source) if source.kind() == io::ErrorKind::NotFound => {
71            let _ = try_unlock_file(&file);
72            return Err(SpawnLockError::DeletedOrRecreated {
73                path: path_buf,
74                opened_identity,
75                current_identity: None,
76            });
77        }
78        Err(source) => return Err(lock_identity_error(&path_buf, &file, source)),
79    };
80
81    if opened_identity != current_identity {
82        let _ = try_unlock_file(&file);
83        return Err(SpawnLockError::DeletedOrRecreated {
84            path: path_buf,
85            opened_identity,
86            current_identity,
87        });
88    }
89
90    Ok(SpawnLockGuard {
91        file,
92        path: path_buf,
93        identity: opened_identity,
94    })
95}
96
97fn lock_identity_error(path: &Path, file: &File, source: io::Error) -> SpawnLockError {
98    let _ = try_unlock_file(file);
99    SpawnLockError::Identity {
100        path: path.to_path_buf(),
101        source,
102    }
103}
104
105/// RAII guard for an acquired backend spawn lock.
106#[must_use = "dropping the guard releases the backend spawn lock immediately"]
107#[derive(Debug)]
108pub struct SpawnLockGuard {
109    file: File,
110    path: PathBuf,
111    identity: Option<SpawnLockFileIdentity>,
112}
113
114impl SpawnLockGuard {
115    /// Lock file path that was acquired.
116    pub fn path(&self) -> &Path {
117        &self.path
118    }
119
120    /// Platform file identity captured for the lock file, when available.
121    pub fn file_identity(&self) -> Option<SpawnLockFileIdentity> {
122        self.identity
123    }
124}
125
126impl Drop for SpawnLockGuard {
127    fn drop(&mut self) {
128        let _ = try_unlock_file(&self.file);
129    }
130}
131
/// Identity of an opened lock file, for platforms that can report one.
///
/// Two values compare equal only when both components match.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub struct SpawnLockFileIdentity {
    /// Namespace the file lives in: device, volume, or the platform's
    /// equivalent.
    pub device: u64,
    /// Number of the file inside that namespace: inode, file index, or the
    /// platform's equivalent.
    pub file: u64,
}
140
141/// Errors returned while acquiring a backend spawn lock.
142#[derive(Debug, thiserror::Error)]
143pub enum SpawnLockError {
144    /// The lock path could not be opened or created.
145    #[error("failed to open backend spawn lock file {path}: {source}")]
146    Open {
147        /// Lock path.
148        path: PathBuf,
149        /// Underlying I/O error.
150        #[source]
151        source: io::Error,
152    },
153    /// Another broker worker already owns the lock.
154    #[error("backend spawn lock file {path} is already locked")]
155    AlreadyLocked {
156        /// Lock path.
157        path: PathBuf,
158    },
159    /// The platform lock operation failed for a reason other than contention.
160    #[error("failed to lock backend spawn lock file {path}: {source}")]
161    Lock {
162        /// Lock path.
163        path: PathBuf,
164        /// Underlying I/O error.
165        #[source]
166        source: io::Error,
167    },
168    /// The lock path no longer names the file that was locked.
169    #[error("backend spawn lock file {path} was deleted or recreated during acquisition")]
170    DeletedOrRecreated {
171        /// Lock path.
172        path: PathBuf,
173        /// Identity of the opened file handle.
174        opened_identity: Option<SpawnLockFileIdentity>,
175        /// Identity currently reachable through the lock path.
176        current_identity: Option<SpawnLockFileIdentity>,
177    },
178    /// File identity could not be read.
179    #[error("failed to verify backend spawn lock file identity for {path}: {source}")]
180    Identity {
181        /// Lock path.
182        path: PathBuf,
183        /// Underlying I/O error.
184        #[source]
185        source: io::Error,
186    },
187}
188
/// Tuning knobs for the per-key spawn budget.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub struct SpawnBudgetConfig {
    /// Upper bound on spawn attempts within a single window.
    pub max_attempts: u32,
    /// Length of a single budget window.
    pub window: Duration,
}

impl SpawnBudgetConfig {
    /// Construct a config, replacing zero inputs with the smallest usable
    /// values: a zero `max_attempts` becomes 1 and a zero `window` becomes one
    /// millisecond. Non-zero inputs are kept unchanged.
    pub fn new(max_attempts: u32, window: Duration) -> Self {
        let max_attempts = if max_attempts == 0 { 1 } else { max_attempts };
        let window = if window == Duration::ZERO {
            Duration::from_millis(1)
        } else {
            window
        };
        Self {
            max_attempts,
            window,
        }
    }
}
211
212impl Default for SpawnBudgetConfig {
213    fn default() -> Self {
214        Self {
215            max_attempts: DEFAULT_SPAWN_ATTEMPTS_PER_WINDOW,
216            window: DEFAULT_SPAWN_BUDGET_WINDOW,
217        }
218    }
219}
220
221/// Coordinates bounded spawn attempts for backend keys.
222#[derive(Debug)]
223pub struct SpawnCoordinator {
224    config: SpawnBudgetConfig,
225    states: HashMap<BackendKey, SpawnBudgetState>,
226}
227
228impl SpawnCoordinator {
229    /// Create an empty coordinator with default budget settings.
230    pub fn new() -> Self {
231        Self::with_config(SpawnBudgetConfig::default())
232    }
233
234    /// Create an empty coordinator with explicit budget settings.
235    pub fn with_config(config: SpawnBudgetConfig) -> Self {
236        Self {
237            config,
238            states: HashMap::new(),
239        }
240    }
241
242    /// Begin one spawn attempt for `key`.
243    ///
244    /// The returned permit is a contract token for the caller that will perform
245    /// the actual child-process launch in later slices. Call [`Self::finish`]
246    /// when that launch path succeeds or fails.
247    pub fn try_begin(
248        &mut self,
249        key: BackendKey,
250        now: Instant,
251    ) -> Result<SpawnPermit, SpawnBeginError> {
252        let state = self
253            .states
254            .entry(key.clone())
255            .or_insert_with(|| SpawnBudgetState::new(now));
256        state.refresh(now, self.config.window);
257
258        if state.in_flight {
259            return Err(SpawnBeginError::AlreadyInProgress);
260        }
261
262        if state.attempts_used >= self.config.max_attempts {
263            return Err(SpawnBeginError::BudgetExhausted {
264                retry_after: retry_after(state.window_started_at, now, self.config.window),
265                remaining: 0,
266            });
267        }
268
269        state.attempts_used += 1;
270        state.in_flight = true;
271        Ok(SpawnPermit {
272            key,
273            attempt_number: state.attempts_used,
274            remaining_after_begin: self.config.max_attempts - state.attempts_used,
275        })
276    }
277
278    /// Finish an in-flight spawn attempt.
279    pub fn finish(&mut self, key: &BackendKey, outcome: SpawnOutcome, now: Instant) {
280        let Some(state) = self.states.get_mut(key) else {
281            return;
282        };
283        state.refresh(now, self.config.window);
284        state.in_flight = false;
285        if outcome == SpawnOutcome::Success {
286            state.window_started_at = now;
287            state.attempts_used = 0;
288        }
289    }
290
291    /// Return the current budget snapshot for one backend key.
292    pub fn snapshot(&mut self, key: BackendKey, now: Instant) -> SpawnBudgetSnapshot {
293        let state = self
294            .states
295            .entry(key.clone())
296            .or_insert_with(|| SpawnBudgetState::new(now));
297        state.refresh(now, self.config.window);
298        snapshot_for(key, state, self.config, now)
299    }
300}
301
302impl Default for SpawnCoordinator {
303    fn default() -> Self {
304        Self::new()
305    }
306}
307
308/// Token returned for a spawn attempt that may proceed.
309#[derive(Clone, Debug, PartialEq, Eq)]
310pub struct SpawnPermit {
311    /// Backend key this permit covers.
312    pub key: BackendKey,
313    /// 1-based attempt number inside the current window.
314    pub attempt_number: u32,
315    /// Budget remaining after this attempt starts.
316    pub remaining_after_begin: u32,
317}
318
/// How a spawn attempt ended.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum SpawnOutcome {
    /// The backend process launched and passed verification.
    Success,
    /// Launching or verifying the backend process failed.
    Failed,
}
327
328/// Errors returned when a spawn attempt cannot begin.
329#[derive(Clone, Debug, PartialEq, Eq, thiserror::Error)]
330pub enum SpawnBeginError {
331    /// Another worker is already launching this backend key.
332    #[error("backend spawn already in progress")]
333    AlreadyInProgress,
334    /// The per-key spawn budget is exhausted.
335    #[error("backend spawn budget exhausted; retry after {retry_after:?}")]
336    BudgetExhausted {
337        /// Time until the budget window resets.
338        retry_after: Duration,
339        /// Remaining attempts, always zero for this variant.
340        remaining: u32,
341    },
342}
343
344/// Current budget state for metrics/admin snapshots.
345#[derive(Clone, Debug, PartialEq, Eq)]
346pub struct SpawnBudgetSnapshot {
347    /// Backend key this snapshot describes.
348    pub key: BackendKey,
349    /// Attempts used in the active window.
350    pub attempts_used: u32,
351    /// Attempts still available in the active window.
352    pub remaining: u32,
353    /// Whether a spawn is currently in flight.
354    pub in_flight: bool,
355    /// Retry-after hint when no attempts remain.
356    pub retry_after: Option<Duration>,
357}
358
/// Mutable budget bookkeeping for a single backend key.
#[derive(Debug, Clone)]
struct SpawnBudgetState {
    /// Instant at which the active window began.
    window_started_at: Instant,
    /// Attempts started since the active window began.
    attempts_used: u32,
    /// Whether an attempt has begun and not yet been finished.
    in_flight: bool,
}
365
366impl SpawnBudgetState {
367    fn new(now: Instant) -> Self {
368        Self {
369            window_started_at: now,
370            attempts_used: 0,
371            in_flight: false,
372        }
373    }
374
375    fn refresh(&mut self, now: Instant, window: Duration) {
376        if elapsed_since(self.window_started_at, now) >= window {
377            self.window_started_at = now;
378            self.attempts_used = 0;
379            self.in_flight = false;
380        }
381    }
382}
383
384fn snapshot_for(
385    key: BackendKey,
386    state: &SpawnBudgetState,
387    config: SpawnBudgetConfig,
388    now: Instant,
389) -> SpawnBudgetSnapshot {
390    let remaining = config.max_attempts.saturating_sub(state.attempts_used);
391    SpawnBudgetSnapshot {
392        key,
393        attempts_used: state.attempts_used,
394        remaining,
395        in_flight: state.in_flight,
396        retry_after: (remaining == 0)
397            .then(|| retry_after(state.window_started_at, now, config.window)),
398    }
399}
400
401fn retry_after(window_started_at: Instant, now: Instant, window: Duration) -> Duration {
402    window.saturating_sub(elapsed_since(window_started_at, now))
403}
404
/// Duration from `started_at` to `now`, or zero when `now` is earlier than
/// `started_at`.
fn elapsed_since(started_at: Instant, now: Instant) -> Duration {
    now.saturating_duration_since(started_at)
}
409
410fn open_lock_file(path: &Path) -> io::Result<File> {
411    let mut options = OpenOptions::new();
412    options.read(true).write(true).create(true);
413    configure_lock_file_options(&mut options);
414    options.open(path)
415}
416
/// Unix: request owner-only read/write permission bits for the lock file.
#[cfg(unix)]
fn configure_lock_file_options(options: &mut OpenOptions) {
    use std::os::unix::fs::OpenOptionsExt;

    const OWNER_READ_WRITE: u32 = 0o600;
    options.mode(OWNER_READ_WRITE);
}
423
/// Windows: open the lock file with read, write, and delete sharing enabled.
#[cfg(windows)]
fn configure_lock_file_options(options: &mut OpenOptions) {
    use std::os::windows::fs::OpenOptionsExt;
    use winapi::um::winnt::{FILE_SHARE_DELETE, FILE_SHARE_READ, FILE_SHARE_WRITE};

    let share_everything = FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE;
    options.share_mode(share_everything);
}
431
/// Other platforms: no extra open options are applied.
#[cfg(not(any(unix, windows)))]
fn configure_lock_file_options(_options: &mut OpenOptions) {
    // Intentionally empty.
}
434
435#[cfg(unix)]
436fn try_lock_file(file: &File) -> io::Result<()> {
437    use std::os::unix::io::AsRawFd;
438
439    let result = unsafe { libc::flock(file.as_raw_fd(), libc::LOCK_EX | libc::LOCK_NB) };
440    if result == 0 {
441        Ok(())
442    } else {
443        Err(io::Error::last_os_error())
444    }
445}
446
447#[cfg(unix)]
448fn try_unlock_file(file: &File) -> io::Result<()> {
449    use std::os::unix::io::AsRawFd;
450
451    let result = unsafe { libc::flock(file.as_raw_fd(), libc::LOCK_UN) };
452    if result == 0 {
453        Ok(())
454    } else {
455        Err(io::Error::last_os_error())
456    }
457}
458
459#[cfg(unix)]
460fn is_lock_conflict(error: &io::Error) -> bool {
461    error.raw_os_error() == Some(libc::EWOULDBLOCK) || error.raw_os_error() == Some(libc::EAGAIN)
462}
463
464#[cfg(unix)]
465fn file_identity(file: &File) -> io::Result<Option<SpawnLockFileIdentity>> {
466    use std::os::unix::fs::MetadataExt;
467
468    let metadata = file.metadata()?;
469    Ok(Some(SpawnLockFileIdentity {
470        device: metadata.dev(),
471        file: metadata.ino(),
472    }))
473}
474
475#[cfg(unix)]
476fn path_identity(path: &Path) -> io::Result<Option<SpawnLockFileIdentity>> {
477    use std::os::unix::fs::MetadataExt;
478
479    let metadata = path.metadata()?;
480    Ok(Some(SpawnLockFileIdentity {
481        device: metadata.dev(),
482        file: metadata.ino(),
483    }))
484}
485
/// Windows: take an exclusive byte-range lock on `file` via `LockFileEx`,
/// failing immediately instead of waiting.
///
/// A zero return from `LockFileEx` is reported as the last OS error.
#[cfg(windows)]
fn try_lock_file(file: &File) -> io::Result<()> {
    use std::mem;
    use std::os::windows::io::AsRawHandle;
    use winapi::um::fileapi::LockFileEx;
    use winapi::um::minwinbase::{LOCKFILE_EXCLUSIVE_LOCK, LOCKFILE_FAIL_IMMEDIATELY, OVERLAPPED};
    use winapi::um::winnt::HANDLE;

    let handle = file.as_raw_handle() as HANDLE;
    let flags = LOCKFILE_EXCLUSIVE_LOCK | LOCKFILE_FAIL_IMMEDIATELY;

    // SAFETY: `OVERLAPPED` is a plain C structure; an all-zero value is a
    // valid instance with a zero offset.
    let mut overlapped: OVERLAPPED = unsafe { mem::zeroed() };
    // SAFETY: `handle` belongs to `file`, which is borrowed for the whole
    // call, and `overlapped` is a live local for the duration of the call.
    let locked = unsafe { LockFileEx(handle, flags, 0, u32::MAX, u32::MAX, &mut overlapped) };

    if locked != 0 {
        Ok(())
    } else {
        Err(io::Error::last_os_error())
    }
}
511
/// Windows: release the byte-range lock on `file` via `UnlockFileEx`, using
/// the same range that `try_lock_file` locks.
///
/// A zero return from `UnlockFileEx` is reported as the last OS error.
#[cfg(windows)]
fn try_unlock_file(file: &File) -> io::Result<()> {
    use std::mem;
    use std::os::windows::io::AsRawHandle;
    use winapi::um::fileapi::UnlockFileEx;
    use winapi::um::minwinbase::OVERLAPPED;
    use winapi::um::winnt::HANDLE;

    let handle = file.as_raw_handle() as HANDLE;

    // SAFETY: `OVERLAPPED` is a plain C structure; an all-zero value is a
    // valid instance with a zero offset.
    let mut overlapped: OVERLAPPED = unsafe { mem::zeroed() };
    // SAFETY: `handle` belongs to `file`, which is borrowed for the whole
    // call, and `overlapped` is a live local for the duration of the call.
    let unlocked = unsafe { UnlockFileEx(handle, 0, u32::MAX, u32::MAX, &mut overlapped) };

    if unlocked != 0 {
        Ok(())
    } else {
        Err(io::Error::last_os_error())
    }
}
536
/// Windows: whether `error` carries the `ERROR_LOCK_VIOLATION` OS code, which
/// the caller treats as "someone else holds the lock".
#[cfg(windows)]
fn is_lock_conflict(error: &io::Error) -> bool {
    use winapi::shared::winerror::ERROR_LOCK_VIOLATION;

    let lock_violation = ERROR_LOCK_VIOLATION as i32;
    matches!(error.raw_os_error(), Some(code) if code == lock_violation)
}
543
/// Windows: identity of the already opened `file`, made of the volume serial
/// number and the 64-bit file index from `GetFileInformationByHandle`.
#[cfg(windows)]
fn file_identity(file: &File) -> io::Result<Option<SpawnLockFileIdentity>> {
    use std::mem::MaybeUninit;
    use std::os::windows::io::AsRawHandle;
    use winapi::um::fileapi::{GetFileInformationByHandle, BY_HANDLE_FILE_INFORMATION};
    use winapi::um::winnt::HANDLE;

    let handle = file.as_raw_handle() as HANDLE;
    let mut raw_info = MaybeUninit::<BY_HANDLE_FILE_INFORMATION>::uninit();

    // SAFETY: `handle` belongs to `file`, which is borrowed for the whole
    // call, and the out-pointer refers to storage of the expected type.
    let succeeded = unsafe { GetFileInformationByHandle(handle, raw_info.as_mut_ptr()) };
    if succeeded == 0 {
        return Err(io::Error::last_os_error());
    }

    // SAFETY: the call reported success, so it filled in the structure.
    let info = unsafe { raw_info.assume_init() };

    // The file index arrives as two 32-bit halves.
    let file_index = ((info.nFileIndexHigh as u64) << 32) | info.nFileIndexLow as u64;
    Ok(Some(SpawnLockFileIdentity {
        device: info.dwVolumeSerialNumber as u64,
        file: file_index,
    }))
}
564
/// Windows: identity of whatever `path` names right now.
///
/// The path is opened a second time (read/write, never created, with the same
/// platform options as the lock file) and the identity is read from that
/// temporary handle.
#[cfg(windows)]
fn path_identity(path: &Path) -> io::Result<Option<SpawnLockFileIdentity>> {
    let mut probe_options = OpenOptions::new();
    probe_options.read(true);
    probe_options.write(true);
    configure_lock_file_options(&mut probe_options);
    probe_options
        .open(path)
        .and_then(|probe| file_identity(&probe))
}
573
/// Other platforms: locking is not implemented, so every attempt fails with
/// an `Unsupported` error.
#[cfg(not(any(unix, windows)))]
fn try_lock_file(_file: &File) -> io::Result<()> {
    let unsupported = io::Error::new(
        io::ErrorKind::Unsupported,
        "backend spawn file locks are supported only on Unix and Windows",
    );
    Err(unsupported)
}
581
/// Other platforms: no lock can have been taken, so unlocking always
/// succeeds without doing anything.
#[cfg(not(any(unix, windows)))]
fn try_unlock_file(_file: &File) -> io::Result<()> {
    io::Result::Ok(())
}
586
/// Other platforms: no error is ever classified as lock contention.
#[cfg(not(any(unix, windows)))]
fn is_lock_conflict(_error: &io::Error) -> bool {
    const NEVER_A_CONFLICT: bool = false;
    NEVER_A_CONFLICT
}
591
/// Other platforms: file identity is unavailable, so `None` is reported.
#[cfg(not(any(unix, windows)))]
fn file_identity(_file: &File) -> io::Result<Option<SpawnLockFileIdentity>> {
    let unavailable: Option<SpawnLockFileIdentity> = None;
    Ok(unavailable)
}
596
/// Other platforms: file identity is unavailable, so `None` is reported.
#[cfg(not(any(unix, windows)))]
fn path_identity(_path: &Path) -> io::Result<Option<SpawnLockFileIdentity>> {
    let unavailable: Option<SpawnLockFileIdentity> = None;
    Ok(unavailable)
}
601
#[cfg(test)]
mod tests {
    use std::fs;

    use super::*;

    /// Swapping the lock file for a new one after it was opened, but before it
    /// is locked, must be reported as `DeletedOrRecreated`; a later plain
    /// acquisition on the same path must then succeed.
    #[test]
    #[cfg(any(unix, windows))]
    fn acquire_spawn_lock_detects_lock_file_replacement_between_open_and_lock() {
        let scratch_dir = tempfile::tempdir().unwrap();
        let lock_path = scratch_dir.path().join("backend.spawn.lock");
        let moved_aside = scratch_dir.path().join("backend.spawn.lock.replaced");

        // The hook runs between open and lock: move the opened file away and
        // put a different file at the same path.
        let outcome = acquire_spawn_lock_with_hook(&lock_path, |hook_path, _opened| {
            fs::rename(hook_path, &moved_aside).unwrap();
            fs::write(hook_path, b"replacement lock file").unwrap();
        });
        let err = outcome.unwrap_err();

        let SpawnLockError::DeletedOrRecreated {
            path: reported_path,
            opened_identity: Some(held),
            current_identity: Some(on_disk),
        } = err
        else {
            panic!("expected deleted/recreated error, got {err:?}");
        };

        assert_eq!(reported_path, lock_path);
        assert_ne!(held, on_disk);

        let _guard = acquire_spawn_lock(&lock_path).unwrap();
    }
}
635}