use std::fs::{self, File, OpenOptions};
use std::io::{ErrorKind, Write};
use std::path::{Path, PathBuf};
use chrono::{DateTime, Utc};
use fs2::FileExt;
use serde::Serialize;
/// Failure modes of the active-store lock.
#[derive(Debug)]
pub enum LockError {
    /// A marker file is already present at `path`, so `create_new` refused to
    /// create another one.
    MarkerAlreadyExists {
        /// Marker path that already existed.
        path: PathBuf,
    },
    /// Taking the OS exclusive lock on the lock companion file failed
    /// (typically because another handle already holds it).
    Contended {
        /// Companion file the OS lock was requested on.
        path: PathBuf,
        /// Text of the underlying OS error.
        reason: String,
    },
    /// A filesystem operation on the lock files failed.
    Io {
        /// Short label for the step that failed, e.g. `"marker create_new"`.
        operation: &'static str,
        /// Path the failing operation was applied to.
        path: PathBuf,
        /// Text of the underlying error.
        message: String,
    },
}
impl std::fmt::Display for LockError {
    /// Formats an operator-facing description of the failure, including the
    /// path involved.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::MarkerAlreadyExists { path } => write!(
                f,
                "active-store lock marker `{}` already exists; another restore or writer may hold the active store",
                path.display()
            ),
            // `path` here is the `.flock` companion, not the marker, and the
            // lock call may have failed for a reason other than contention, so
            // the message names the companion and hedges on who holds it.
            Self::Contended { path, reason } => write!(
                f,
                "could not take the OS exclusive lock on active-store lock companion `{}`; another process may hold it: {reason}",
                path.display()
            ),
            Self::Io {
                operation,
                path,
                message,
            } => write!(
                f,
                "active-store lock {operation} failed for `{}`: {message}",
                path.display()
            ),
        }
    }
}

/// `LockError` carries no wrapped source error; the underlying error text is
/// stored in the variant's string fields instead.
impl std::error::Error for LockError {}
/// First line of every marker body; identifies the marker format and version.
pub const MARKER_HEADER: &str = "cortex-restore-active-store-lock-v1";
/// Value written to the `scope=` line of every marker body.
pub const MARKER_SCOPE_PRODUCTION: &str = "production";
/// Identifying data recorded in the active-store lock marker.
///
/// Field order is kept stable because it determines the `Serialize` output order.
#[derive(Debug, Clone, Serialize)]
pub struct LockMarkerPayload {
    /// Written to the marker as `deployment_id=`.
    pub deployment_id: String,
    /// Written to the marker as `operator_principal_id=`.
    pub operator_principal_id: String,
    /// Written to the marker as `restore_intent_blake3=`. Presumably a
    /// `blake3:`-prefixed digest of the restore intent (the test fixture uses
    /// that form) — confirm against callers.
    pub restore_intent_blake3: String,
    /// Written to the marker as `acquired_at=` in RFC 3339 form.
    pub acquired_at: DateTime<Utc>,
    /// Written to the marker as `host=`.
    pub host: String,
}
/// Guard for the active-store lock, returned by `ActiveStoreLockGuard::acquire`.
///
/// While alive it owns the marker file at `path` and holds an OS exclusive lock
/// on the companion file at `lock_companion_path`. Dropping it releases the lock
/// and, unless `leak_for_rollback_failure` was called, removes both files.
#[must_use = "dropping the guard immediately releases the active-store lock"]
#[derive(Debug)]
pub struct ActiveStoreLockGuard {
    /// Handle holding the OS exclusive lock; `None` once it has been released.
    lock_file: Option<File>,
    /// Marker file created by `acquire`.
    path: PathBuf,
    /// Companion file the OS lock is taken on.
    lock_companion_path: PathBuf,
    /// Payload that was rendered into the marker. It is only read through
    /// `payload()`, which may have no in-crate caller, hence the lint allowance.
    #[allow(dead_code)]
    payload: LockMarkerPayload,
    /// When `true`, dropping the guard leaves the marker and companion on disk.
    leaked: bool,
}
/// Suffix appended to the full marker path to name the file the OS lock is held on.
pub const LOCK_COMPANION_SUFFIX: &str = ".flock";
impl ActiveStoreLockGuard {
pub fn acquire(path: &Path, payload: LockMarkerPayload) -> Result<Self, LockError> {
if let Some(parent) = path.parent() {
if !parent.as_os_str().is_empty() && !parent.exists() {
return Err(LockError::Io {
operation: "marker parent directory probe",
path: parent.to_path_buf(),
message: "parent directory does not exist".to_string(),
});
}
}
let mut marker_file = match OpenOptions::new().write(true).create_new(true).open(path) {
Ok(file) => file,
Err(err) if err.kind() == ErrorKind::AlreadyExists => {
return Err(LockError::MarkerAlreadyExists {
path: path.to_path_buf(),
});
}
Err(err) => {
return Err(LockError::Io {
operation: "marker create_new",
path: path.to_path_buf(),
message: err.to_string(),
});
}
};
let body = render_marker_body(&payload);
if let Err(err) = marker_file
.write_all(body.as_bytes())
.and_then(|()| marker_file.sync_all())
{
drop(marker_file);
let _ = fs::remove_file(path);
return Err(LockError::Io {
operation: "marker body write+sync",
path: path.to_path_buf(),
message: err.to_string(),
});
}
drop(marker_file);
let lock_companion_path = make_companion_path(path);
let lock_file = match OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(&lock_companion_path)
{
Ok(file) => file,
Err(err) => {
let _ = fs::remove_file(path);
return Err(LockError::Io {
operation: "lock companion open",
path: lock_companion_path,
message: err.to_string(),
});
}
};
if let Err(err) = FileExt::try_lock_exclusive(&lock_file) {
drop(lock_file);
let _ = fs::remove_file(&lock_companion_path);
let _ = fs::remove_file(path);
return Err(LockError::Contended {
path: lock_companion_path,
reason: err.to_string(),
});
}
Ok(Self {
lock_file: Some(lock_file),
path: path.to_path_buf(),
lock_companion_path,
payload,
leaked: false,
})
}
#[must_use]
pub fn marker_path(&self) -> &Path {
&self.path
}
#[must_use]
#[allow(dead_code)]
pub fn lock_companion_path(&self) -> &Path {
&self.lock_companion_path
}
#[must_use]
#[allow(dead_code)]
pub fn payload(&self) -> &LockMarkerPayload {
&self.payload
}
pub fn leak_for_rollback_failure(&mut self) {
self.leaked = true;
}
fn release_locked_handle(&mut self) {
if let Some(file) = self.lock_file.take() {
let _ = FileExt::unlock(&file);
drop(file);
}
}
}
impl Drop for ActiveStoreLockGuard {
    /// Releases the active-store lock.
    ///
    /// Unless `leak_for_rollback_failure` was called, the companion and then the
    /// marker are removed *before* the OS lock is released. Unlinking the
    /// companion while still holding its lock means no other process can open
    /// that same inode and lock it in the gap between our unlock and our unlink.
    /// If that happened, the other process would end up holding a lock on an
    /// unlinked file, and a third process could lock a brand-new companion at
    /// the same time. The companion is removed before the marker so that a
    /// waiter cannot pass the marker check while the old companion still exists.
    fn drop(&mut self) {
        if !self.leaked {
            let _ = fs::remove_file(&self.lock_companion_path);
            let _ = fs::remove_file(&self.path);
        }
        self.release_locked_handle();
    }
}
/// Renders the marker file body: [`MARKER_HEADER`] on the first line, followed
/// by one `key=value` line per field, each terminated by `\n`.
///
/// Field order: `pid` (this process's id), `host`, `deployment_id`,
/// `operator_principal_id`, `acquired_at` (RFC 3339), `scope` (always
/// [`MARKER_SCOPE_PRODUCTION`]), `restore_intent_blake3`.
///
/// NOTE(review): values are written verbatim with no escaping, so a payload
/// value containing `\n` would spill into extra lines of the marker.
fn render_marker_body(payload: &LockMarkerPayload) -> String {
    let pid = std::process::id().to_string();
    let acquired_at = payload.acquired_at.to_rfc3339();
    let fields: [(&str, &str); 7] = [
        ("pid", pid.as_str()),
        ("host", payload.host.as_str()),
        ("deployment_id", payload.deployment_id.as_str()),
        ("operator_principal_id", payload.operator_principal_id.as_str()),
        ("acquired_at", acquired_at.as_str()),
        ("scope", MARKER_SCOPE_PRODUCTION),
        ("restore_intent_blake3", payload.restore_intent_blake3.as_str()),
    ];

    let mut body = String::from(MARKER_HEADER);
    body.push('\n');
    for (key, value) in fields {
        body.push_str(key);
        body.push('=');
        body.push_str(value);
        body.push('\n');
    }
    body
}
/// Derives the OS-lock companion path by appending [`LOCK_COMPANION_SUFFIX`] to
/// the full marker path (e.g. `active-store.lock` -> `active-store.lock.flock`).
///
/// The suffix is appended to the raw OS string rather than via
/// `Path::with_extension`, so any existing extension on the marker is kept.
fn make_companion_path(marker: &Path) -> PathBuf {
    let mut raw = marker.to_path_buf().into_os_string();
    raw.push(LOCK_COMPANION_SUFFIX);
    raw.into()
}
/// Moves a marker believed to be stale out of the way by renaming it, within
/// the same directory, to `<marker-name>.stale-<UTC timestamp>`.
///
/// The timestamp has second resolution (`%Y%m%dT%H%M%SZ`). If a file with that
/// name already exists (e.g. two quarantines within the same second), a numeric
/// suffix `-1`, `-2`, ... is appended so the rename does not overwrite the
/// earlier quarantined marker. If the marker path has no UTF-8 file name, the
/// fixed base name `.cortex-restore-active-store.lock` is used instead.
///
/// Only the marker itself is moved; the lock companion file is not touched.
///
/// Returns the path the marker was moved to.
///
/// # Errors
///
/// [`LockError::Io`] (operation `"stale marker rename"`) if the rename fails,
/// including when no marker exists at `path`.
pub fn quarantine_stale_marker(path: &Path) -> Result<PathBuf, LockError> {
    let timestamp = Utc::now().format("%Y%m%dT%H%M%SZ");
    let base_name = match path.file_name().and_then(|name| name.to_str()) {
        Some(name) => format!("{name}.stale-{timestamp}"),
        None => format!(".cortex-restore-active-store.lock.stale-{timestamp}"),
    };
    let sibling = |name: &str| {
        path.parent()
            .map(|parent| parent.join(name))
            .unwrap_or_else(|| PathBuf::from(name))
    };

    // `fs::rename` replaces an existing destination on Unix, which would
    // silently destroy a previously quarantined marker. Probe for a free name.
    // The probe and the rename are not atomic together; this guards against
    // accidental same-second collisions, not a deliberate race.
    let mut stale_path = sibling(&base_name);
    let mut attempt: u32 = 0;
    while fs::symlink_metadata(&stale_path).is_ok() {
        attempt += 1;
        stale_path = sibling(&format!("{base_name}-{attempt}"));
    }

    fs::rename(path, &stale_path).map_err(|err| LockError::Io {
        operation: "stale marker rename",
        path: path.to_path_buf(),
        message: err.to_string(),
    })?;
    Ok(stale_path)
}
/// Reads the marker file at `path` as text.
///
/// Returns `Ok(None)` when no marker exists, or `Ok(Some(text))` with the raw
/// marker body otherwise.
///
/// # Errors
///
/// [`LockError::Io`] (operation `"marker read"`) for any read failure other
/// than "not found", including a body that is not valid UTF-8.
pub fn read_marker_file(path: &Path) -> Result<Option<String>, LockError> {
    fs::read_to_string(path).map(Some).or_else(|err| {
        if err.kind() == ErrorKind::NotFound {
            return Ok(None);
        }
        Err(LockError::Io {
            operation: "marker read",
            path: path.to_path_buf(),
            message: err.to_string(),
        })
    })
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Payload shared by the tests; only `acquired_at` varies between runs.
    fn fixture_payload() -> LockMarkerPayload {
        let digest = "blake3:0000000000000000000000000000000000000000000000000000000000000000";
        LockMarkerPayload {
            deployment_id: String::from("dep-test-0001"),
            operator_principal_id: String::from("operator-test"),
            restore_intent_blake3: String::from(digest),
            acquired_at: Utc::now(),
            host: String::from("test-host"),
        }
    }

    #[test]
    fn acquire_creates_marker_and_releases_on_drop() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("active-store.lock");

        let guard = ActiveStoreLockGuard::acquire(&path, fixture_payload()).unwrap();
        let marker = guard.marker_path();
        assert!(marker.exists());
        let body = fs::read_to_string(marker).unwrap();
        assert!(body.starts_with(MARKER_HEADER));
        for expected in ["scope=production", "deployment_id=dep-test-0001"] {
            assert!(body.contains(expected));
        }
        drop(guard);

        assert!(!path.exists(), "drop must clean the marker");
    }

    #[test]
    fn second_acquire_fails_while_first_is_held() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("active-store.lock");

        let _held = ActiveStoreLockGuard::acquire(&path, fixture_payload()).unwrap();
        let second = ActiveStoreLockGuard::acquire(&path, fixture_payload());
        assert!(
            matches!(second, Err(LockError::MarkerAlreadyExists { .. })),
            "expected MarkerAlreadyExists, got {second:?}"
        );
    }

    #[test]
    fn quarantine_renames_stale_marker() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("active-store.lock");
        fs::write(&path, "stale-body").unwrap();

        let stale = quarantine_stale_marker(&path).unwrap();

        assert!(!path.exists());
        assert!(stale.exists());
        assert_eq!(fs::read_to_string(&stale).unwrap(), "stale-body");
        let stale_name = stale.file_name().unwrap().to_string_lossy();
        assert!(stale_name.contains(".stale-"));
    }

    #[test]
    fn leak_keeps_marker_after_drop() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("active-store.lock");

        let mut guard = ActiveStoreLockGuard::acquire(&path, fixture_payload()).unwrap();
        guard.leak_for_rollback_failure();
        drop(guard);

        assert!(path.exists(), "leak_for_rollback_failure preserves marker");
        let _ = fs::remove_file(&path);
    }
}