Skip to main content

mimir_core/
workspace_lock.rs

1//! Cross-process workspace write lock.
2//!
3//! The append-only log assumes one writer per workspace. This module
4//! provides a small filesystem lockfile guard that higher-level write
5//! surfaces can share before opening or writing a canonical log.
6
7use std::ffi::OsString;
8use std::fs::{self, File, OpenOptions};
9use std::io::Write;
10use std::path::{Path, PathBuf};
11use std::sync::atomic::{AtomicU64, Ordering};
12use std::time::{SystemTime, UNIX_EPOCH};
13
14use thiserror::Error;
15
16static NEXT_LOCK_ID: AtomicU64 = AtomicU64::new(1);
17
18/// Exclusive write guard for one canonical log path.
19///
20/// The guard creates `<canonical-log>.lock` using `create_new(true)`,
21/// so acquisition is atomic across processes on local filesystems. The
22/// lockfile is removed when the guard drops. If a process crashes, the
23/// file can remain behind; operators should inspect and remove that
24/// stale file deliberately rather than have Mimir guess liveness.
25#[derive(Debug)]
26pub struct WorkspaceWriteLock {
27    path: PathBuf,
28    lock_id: String,
29    _file: File,
30}
31
32impl WorkspaceWriteLock {
33    /// Acquire the lock associated with `log_path`.
34    ///
35    /// # Errors
36    ///
37    /// Returns [`WorkspaceLockError::AlreadyHeld`] when another
38    /// holder's lockfile already exists, or
39    /// [`WorkspaceLockError::Io`] for filesystem failures creating,
40    /// writing, or syncing the lockfile.
41    pub fn acquire_for_log(log_path: impl AsRef<Path>) -> Result<Self, WorkspaceLockError> {
42        Self::acquire_for_log_with_owner(log_path, default_owner())
43    }
44
45    /// Acquire the lock associated with `log_path` and write an
46    /// operator-visible owner string into the lockfile.
47    ///
48    /// # Errors
49    ///
50    /// Same as [`Self::acquire_for_log`].
51    pub fn acquire_for_log_with_owner(
52        log_path: impl AsRef<Path>,
53        owner: impl AsRef<str>,
54    ) -> Result<Self, WorkspaceLockError> {
55        let log_path = log_path.as_ref();
56        let path = lock_path_for_log(log_path);
57        if let Some(parent) = parent_to_create(&path) {
58            fs::create_dir_all(parent).map_err(|source| WorkspaceLockError::Io {
59                path: parent.to_path_buf(),
60                source,
61            })?;
62        }
63
64        let mut file = match OpenOptions::new().write(true).create_new(true).open(&path) {
65            Ok(file) => file,
66            Err(source) if source.kind() == std::io::ErrorKind::AlreadyExists => {
67                return Err(WorkspaceLockError::AlreadyHeld { path });
68            }
69            Err(source) => {
70                return Err(WorkspaceLockError::Io { path, source });
71            }
72        };
73
74        let metadata = LockMetadata::new(log_path, owner.as_ref());
75        write_lock_metadata(&mut file, &metadata).map_err(|source| {
76            let _ = fs::remove_file(&path);
77            WorkspaceLockError::Io {
78                path: path.clone(),
79                source,
80            }
81        })?;
82        file.sync_all().map_err(|source| {
83            let _ = fs::remove_file(&path);
84            WorkspaceLockError::Io {
85                path: path.clone(),
86                source,
87            }
88        })?;
89
90        Ok(Self {
91            path,
92            lock_id: metadata.lock_id,
93            _file: file,
94        })
95    }
96
97    /// Filesystem path of the held lockfile.
98    #[must_use]
99    pub fn path(&self) -> &Path {
100        &self.path
101    }
102}
103
104impl Drop for WorkspaceWriteLock {
105    fn drop(&mut self) {
106        if lock_file_still_owned(&self.path, &self.lock_id) {
107            let _ = fs::remove_file(&self.path);
108        }
109    }
110}
111
112/// Error returned when a workspace write lock cannot be acquired.
113#[derive(Debug, Error)]
114pub enum WorkspaceLockError {
115    /// A lockfile already exists for this canonical log.
116    #[error("workspace write lock already held: {path}")]
117    AlreadyHeld {
118        /// Existing lockfile path.
119        path: PathBuf,
120    },
121
122    /// Filesystem failure while creating, writing, syncing, or
123    /// cleaning up a lockfile.
124    #[error("workspace write lock i/o failed at {path}: {source}")]
125    Io {
126        /// Path involved in the failing operation.
127        path: PathBuf,
128        /// Underlying I/O error.
129        #[source]
130        source: std::io::Error,
131    },
132}
133
134/// Return the lockfile path associated with `log_path`.
135///
136/// `canonical.log` maps to `canonical.log.lock`; paths without a file
137/// name use `.mimir-workspace.lock` inside that directory.
138#[must_use]
139pub fn lock_path_for_log(log_path: impl AsRef<Path>) -> PathBuf {
140    let log_path = log_path.as_ref();
141    let mut file_name = log_path
142        .file_name()
143        .map_or_else(|| OsString::from(".mimir-workspace"), OsString::from);
144    file_name.push(".lock");
145    match log_path.parent() {
146        Some(parent) => parent.join(file_name),
147        None => PathBuf::from(file_name),
148    }
149}
150
151struct LockMetadata {
152    lock_id: String,
153    owner: String,
154    pid: u32,
155    acquired_at_ms: u128,
156    log_path: PathBuf,
157}
158
159impl LockMetadata {
160    fn new(log_path: &Path, owner: &str) -> Self {
161        let acquired_at_ms = unix_time_millis();
162        let pid = std::process::id();
163        let sequence = NEXT_LOCK_ID.fetch_add(1, Ordering::Relaxed);
164        Self {
165            lock_id: format!("{pid}-{acquired_at_ms}-{sequence}"),
166            owner: owner.to_string(),
167            pid,
168            acquired_at_ms,
169            log_path: log_path.to_path_buf(),
170        }
171    }
172}
173
174fn write_lock_metadata(file: &mut File, metadata: &LockMetadata) -> Result<(), std::io::Error> {
175    writeln!(file, "lock_id={}", metadata.lock_id)?;
176    writeln!(file, "owner={}", metadata.owner)?;
177    writeln!(file, "pid={}", metadata.pid)?;
178    writeln!(file, "acquired_at_ms={}", metadata.acquired_at_ms)?;
179    writeln!(file, "log_path={}", metadata.log_path.display())?;
180    Ok(())
181}
182
183fn unix_time_millis() -> u128 {
184    SystemTime::now()
185        .duration_since(UNIX_EPOCH)
186        .map_or(0, |duration| duration.as_millis())
187}
188
189fn parent_to_create(path: &Path) -> Option<&Path> {
190    path.parent()
191        .filter(|parent| !parent.as_os_str().is_empty())
192}
193
194fn lock_file_still_owned(path: &Path, lock_id: &str) -> bool {
195    let Ok(contents) = fs::read_to_string(path) else {
196        return false;
197    };
198    let expected = format!("lock_id={lock_id}");
199    contents.lines().any(|line| line == expected)
200}
201
202fn default_owner() -> String {
203    std::env::args()
204        .next()
205        .unwrap_or_else(|| "mimir".to_string())
206}
207
208#[cfg(test)]
209mod tests {
210    use super::*;
211
212    #[test]
213    fn lock_path_appends_lock_suffix() {
214        let path = lock_path_for_log("/tmp/canonical.log");
215        assert_eq!(path, PathBuf::from("/tmp/canonical.log.lock"));
216    }
217
218    #[test]
219    fn relative_lock_path_has_no_parent_to_create() {
220        assert_eq!(parent_to_create(Path::new("canonical.log.lock")), None);
221    }
222}