Skip to main content

omne_cli/
ulid.rs

1#![allow(dead_code)]
2
3//! Monotonic ULID allocator backed by an advisory file lock.
4//!
5//! `allocate(lock_path)` returns a `Ulid` strictly greater than every
6//! prior ULID minted at the same lock path, across processes. The
7//! `run_id = <pipe>-<lowercase-ulid>` composition (plan R4) lives at
8//! the call site; this module returns the raw `Ulid`.
9//!
10//! Cross-process ordering uses `fs2` advisory exclusive locks on the
11//! lock file itself. Default acquire budget is 5s; exceeding yields
12//! `Error::LockTimeout { path }` (surfaced by `CliError::UlidLockTimeout`).
13//!
14//! Monotonicity under clock skew: if `Ulid::new()` produces a value not
15//! strictly greater than the persisted ULID (host clock ran backward, or
16//! two calls within the same millisecond happen to land with a smaller
17//! random field), we bump the persisted value via `Ulid::increment()` and
18//! return that instead.
19
20use std::fs::{File, OpenOptions};
21use std::io::{Read, Seek, SeekFrom, Write};
22use std::path::{Path, PathBuf};
23use std::str::FromStr;
24use std::thread;
25use std::time::{Duration, Instant};
26
27use fs2::FileExt;
28use thiserror::Error;
29use ulid::Ulid;
30
/// Default budget for acquiring the advisory lock on the lock file.
///
/// This is the timeout `allocate` hands to `allocate_with_timeout`.
pub const DEFAULT_LOCK_TIMEOUT: Duration = Duration::from_secs(5);

/// Longest pause between two non-blocking lock attempts while the lock is
/// contended. `acquire_lock` sleeps for this long, or for the remaining
/// budget if that is shorter.
const LOCK_POLL_INTERVAL: Duration = Duration::from_millis(25);
35
/// Failures reported by the ULID allocator.
///
/// Every variant carries the lock-file path that the failing call was given,
/// so the rendered message identifies which lock was involved.
#[derive(Debug, Error)]
pub enum Error {
    /// The advisory lock was still contended when the acquire budget ran out.
    #[error("ULID lock {path} was not released within the acquire budget")]
    LockTimeout { path: PathBuf },

    /// A filesystem operation failed: creating the parent directory, opening
    /// or locking the file, or seeking/reading/writing/flushing its contents.
    /// `source` is the underlying `std::io::Error`.
    #[error("ULID lock {path} I/O error: {source}")]
    Io {
        path: PathBuf,
        #[source]
        source: std::io::Error,
    },

    /// The lock file's trimmed contents did not parse as a ULID; `value`
    /// holds the offending text. `mint` also reports this variant, with a
    /// fixed explanatory message in `value`, when the persisted ULID cannot
    /// be incremented any further.
    #[error("ULID lock {path} holds a malformed value: {value:?}")]
    MalformedState { path: PathBuf, value: String },
}
51
52/// Allocate a strictly-monotonic ULID using the default 5s lock budget.
53pub fn allocate(lock_path: &Path) -> Result<Ulid, Error> {
54    allocate_with_timeout(lock_path, DEFAULT_LOCK_TIMEOUT)
55}
56
57/// Allocate with an explicit lock-acquisition timeout. Used by tests.
58pub fn allocate_with_timeout(lock_path: &Path, timeout: Duration) -> Result<Ulid, Error> {
59    if let Some(parent) = lock_path.parent() {
60        if !parent.as_os_str().is_empty() {
61            std::fs::create_dir_all(parent).map_err(|source| Error::Io {
62                path: lock_path.to_path_buf(),
63                source,
64            })?;
65        }
66    }
67
68    let file = OpenOptions::new()
69        .create(true)
70        .read(true)
71        .write(true)
72        .truncate(false)
73        .open(lock_path)
74        .map_err(|source| Error::Io {
75            path: lock_path.to_path_buf(),
76            source,
77        })?;
78
79    acquire_lock(&file, lock_path, timeout)?;
80
81    let result = mint(&file, lock_path);
82
83    // Release regardless of mint outcome; ignore unlock errors — a dropped
84    // file handle releases the advisory lock anyway on every platform fs2
85    // supports.
86    let _ = FileExt::unlock(&file);
87
88    result
89}
90
91fn acquire_lock(file: &File, lock_path: &Path, timeout: Duration) -> Result<(), Error> {
92    // `WouldBlock` is the Unix mapping of EWOULDBLOCK; Windows instead
93    // returns ERROR_LOCK_VIOLATION (OS code 33) which Rust classifies as
94    // `Uncategorized`. `fs2::lock_contended_error()` is the canonical
95    // "lock held" sentinel — we compare `raw_os_error()` against it to
96    // stay portable.
97    let contended_os = fs2::lock_contended_error().raw_os_error();
98    let deadline = Instant::now() + timeout;
99    loop {
100        // Check the deadline before every lock attempt (and before any
101        // sleep), so the worst-case overshoot is bounded by the syscall
102        // duration of one `try_lock_exclusive`, not by an additional
103        // `LOCK_POLL_INTERVAL` of sleep that fires after the budget
104        // already expired.
105        if Instant::now() >= deadline {
106            return Err(Error::LockTimeout {
107                path: lock_path.to_path_buf(),
108            });
109        }
110        match FileExt::try_lock_exclusive(file) {
111            Ok(()) => return Ok(()),
112            Err(e)
113                if e.kind() == std::io::ErrorKind::WouldBlock
114                    || e.raw_os_error() == contended_os =>
115            {
116                // Sleep for the smaller of the poll interval and the
117                // remaining budget so the next iteration's deadline
118                // check fires close to the requested timeout.
119                let remaining = deadline.saturating_duration_since(Instant::now());
120                thread::sleep(LOCK_POLL_INTERVAL.min(remaining));
121            }
122            Err(source) => {
123                return Err(Error::Io {
124                    path: lock_path.to_path_buf(),
125                    source,
126                });
127            }
128        }
129    }
130}
131
132fn mint(mut file: &File, lock_path: &Path) -> Result<Ulid, Error> {
133    let prior = read_state(&mut file, lock_path)?;
134    let candidate = Ulid::new();
135    let next = match prior {
136        Some(p) if candidate <= p => p.increment().ok_or_else(|| Error::MalformedState {
137            path: lock_path.to_path_buf(),
138            value: "ULID space exhausted at persisted value".to_string(),
139        })?,
140        _ => candidate,
141    };
142    write_state(&mut file, lock_path, next)?;
143    Ok(next)
144}
145
146fn read_state(file: &mut &File, lock_path: &Path) -> Result<Option<Ulid>, Error> {
147    let mut buf = String::new();
148    (*file)
149        .seek(SeekFrom::Start(0))
150        .map_err(|source| Error::Io {
151            path: lock_path.to_path_buf(),
152            source,
153        })?;
154    (*file)
155        .read_to_string(&mut buf)
156        .map_err(|source| Error::Io {
157            path: lock_path.to_path_buf(),
158            source,
159        })?;
160    let trimmed = buf.trim();
161    if trimmed.is_empty() {
162        return Ok(None);
163    }
164    Ulid::from_str(trimmed)
165        .map(Some)
166        .map_err(|_| Error::MalformedState {
167            path: lock_path.to_path_buf(),
168            value: trimmed.to_string(),
169        })
170}
171
172fn write_state(file: &mut &File, lock_path: &Path, ulid: Ulid) -> Result<(), Error> {
173    // ULIDs serialize to a fixed 26-character ASCII string. We
174    // overwrite in place from offset 0 without truncating first; that
175    // means a torn write — kill mid-`write_all` — leaves the file
176    // either with the prior 26-character ULID intact (if the write
177    // started but no bytes hit disk) or with a partial ULID prefix +
178    // tail of the prior ULID (`MalformedState` on next read,
179    // recoverable by deleting `.ulid-last`). Truncating first would
180    // turn the same crash into an empty file that the next caller
181    // mis-reads as "first allocation" — silently losing monotonicity.
182    // Constant-length overwrite keeps the failure mode loud.
183    (*file)
184        .seek(SeekFrom::Start(0))
185        .map_err(|source| Error::Io {
186            path: lock_path.to_path_buf(),
187            source,
188        })?;
189    let bytes = ulid.to_string();
190    debug_assert_eq!(bytes.len(), 26, "ULID string must be exactly 26 chars");
191    (*file)
192        .write_all(bytes.as_bytes())
193        .map_err(|source| Error::Io {
194            path: lock_path.to_path_buf(),
195            source,
196        })?;
197    (*file).flush().map_err(|source| Error::Io {
198        path: lock_path.to_path_buf(),
199        source,
200    })
201}