use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use serde::{Deserialize, Serialize};
use tracing::{debug, warn};
use crate::artifact::hex_encode;
use crate::kv::{KvError, KvReader, KvWriter, VersionToken, WatchCursor};
use crate::stores::KvStore;
/// Lease state stored as JSON under the lease key.
///
/// A record is written when a holder acquires the lease and rewritten with the
/// completion fields filled in when that holder completes its round.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LeaseRecord {
/// Identifier of the holder that acquired the lease.
pub holder_id: String,
/// Acquisition time, in seconds since the Unix epoch.
pub acquired_at_unix: u64,
/// Expiry time, in seconds since the Unix epoch. Other holders leave the
/// lease alone while this is strictly greater than their current time.
pub expires_at_unix: u64,
/// Hex encoding of the version bytes of the cursor passed on completion.
/// Absent (and omitted from the serialized form) until the round completes.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub completed_cursor_hex: Option<String>,
/// Completion time, in seconds since the Unix epoch. Absent (and omitted
/// from the serialized form) until the round completes.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub completed_at_unix: Option<u64>,
}
/// Handle through which one holder contends for the lease stored under a
/// single key.
pub struct ExportLease {
// Read side of the store; used to inspect the current lease entry.
reader: Arc<dyn KvReader>,
// Write side of the store; used to create and update the lease entry.
writer: Arc<dyn KvWriter>,
// Key the lease record is stored under.
key: String,
// Identifier written into every record this handle acquires.
holder_id: String,
}
/// An acquired lease, meant to be resolved with `complete` or `abandon`.
///
/// Dropping an unresolved guard only logs a warning; the stored record is left
/// in place.
pub struct LeaseGuard {
// Write side of the store; used to update or delete the lease entry.
writer: Arc<dyn KvWriter>,
// Key the lease record is stored under.
key: String,
// The record this guard wrote when it acquired the lease.
record: LeaseRecord,
// Version returned by the write that acquired the lease; later writes are
// conditioned on it.
version: VersionToken,
// Set once `complete` or `abandon` has been called; silences the drop warning.
resolved: bool,
}
impl ExportLease {
    /// Builds a lease handle for `key` on `store`, contending as `holder_id`.
    ///
    /// # Errors
    ///
    /// Returns [`KvError::OperationFailed`] when `store` exposes no writer,
    /// since acquiring a lease requires create/update.
    pub fn new(
        store: &dyn KvStore,
        key: impl Into<String>,
        holder_id: impl Into<String>,
    ) -> Result<Self, KvError> {
        let writer = store.writer().ok_or_else(|| {
            KvError::OperationFailed(format!(
                "store {:?} has no writer; an export lease needs create/update",
                store.name()
            ))
        })?;
        Ok(Self {
            reader: store.reader(),
            writer,
            key: key.into(),
            holder_id: holder_id.into(),
        })
    }

    /// Tries to take the lease for `ttl`.
    ///
    /// The key is created if it does not exist. If it does exist, the current
    /// entry is read and overwritten with a versioned update when its record
    /// has expired or cannot be parsed.
    ///
    /// Expiry is stored with whole-second resolution, so `ttl` is rounded *up*
    /// to the next second: the stored lease never covers less than the
    /// requested duration.
    ///
    /// Returns `Ok(Some(guard))` when the lease was acquired and `Ok(None)`
    /// when it was not: another record is still unexpired, the entry vanished
    /// between the create and the read, or the takeover update lost a race.
    ///
    /// # Errors
    ///
    /// Returns [`KvError::SerializationError`] if the record cannot be
    /// serialized, and propagates any other store error.
    pub async fn try_acquire(&self, ttl: Duration) -> Result<Option<LeaseGuard>, KvError> {
        let now = unix_now();
        // `as_secs` truncates; a sub-second remainder must extend the lease by
        // a full second, otherwise e.g. a 500 ms ttl expires the moment it is
        // written and any other holder can take it over immediately.
        let ttl_secs = ttl
            .as_secs()
            .saturating_add(u64::from(ttl.subsec_nanos() > 0));
        let record = LeaseRecord {
            holder_id: self.holder_id.clone(),
            acquired_at_unix: now,
            expires_at_unix: now.saturating_add(ttl_secs),
            completed_cursor_hex: None,
            completed_at_unix: None,
        };
        let bytes =
            serde_json::to_vec(&record).map_err(|e| KvError::SerializationError(e.to_string()))?;

        // Fast path: nobody has ever held (or everybody has released) the lease.
        match self.writer.create(&self.key, &bytes).await {
            Ok(version) => {
                debug!(key = %self.key, holder = %self.holder_id, "export lease acquired (create)");
                return Ok(Some(self.guard(record, version)));
            }
            Err(KvError::AlreadyExists) => {}
            Err(e) => return Err(e),
        }

        // The key exists: inspect the current record before attempting a takeover.
        let Some(entry) = self.reader.entry(&self.key).await? else {
            return Ok(None);
        };
        // An unparseable value falls through to the takeover attempt.
        if let Ok(existing) = serde_json::from_slice::<LeaseRecord>(&entry.value)
            && existing.expires_at_unix > now
        {
            return Ok(None);
        }

        // Conditional on the version just read, so only one contender can win.
        match self.writer.update(&self.key, &bytes, &entry.version).await {
            Ok(version) => {
                debug!(key = %self.key, holder = %self.holder_id, "export lease acquired (takeover)");
                Ok(Some(self.guard(record, version)))
            }
            Err(KvError::RevisionMismatch | KvError::AlreadyExists | KvError::KeyNotFound) => {
                Ok(None)
            }
            Err(e) => Err(e),
        }
    }

    /// Reads and parses the record currently stored under the lease key.
    ///
    /// Returns `Ok(None)` when the key has no value.
    ///
    /// # Errors
    ///
    /// Returns [`KvError::SerializationError`] when the stored value is not a
    /// valid [`LeaseRecord`], and propagates read errors from the store.
    pub async fn current(&self) -> Result<Option<LeaseRecord>, KvError> {
        match self.reader.get(&self.key).await? {
            Some(entry) => serde_json::from_slice(&entry.value).map(Some).map_err(|e| {
                KvError::SerializationError(format!(
                    "lease key {:?} holds an unparseable value: {e}",
                    self.key
                ))
            }),
            None => Ok(None),
        }
    }

    /// Wraps a freshly written `record` and its `version` in an unresolved guard.
    fn guard(&self, record: LeaseRecord, version: VersionToken) -> LeaseGuard {
        LeaseGuard {
            writer: Arc::clone(&self.writer),
            key: self.key.clone(),
            record,
            version,
            resolved: false,
        }
    }
}
impl LeaseGuard {
    /// The record this guard wrote when it acquired the lease.
    pub fn record(&self) -> &LeaseRecord {
        &self.record
    }

    /// Gives the lease up by deleting the key, conditioned on the version this
    /// guard holds.
    ///
    /// Best effort: a failed delete is logged rather than returned.
    pub async fn abandon(mut self) {
        // Mark resolved first so the drop warning stays quiet whatever happens.
        self.resolved = true;
        match self
            .writer
            .delete_with_version(&self.key, &self.version)
            .await
        {
            Ok(_) => {
                debug!(key = %self.key, holder = %self.record.holder_id, "export lease abandoned");
            }
            Err(e) => {
                warn!(
                    key = %self.key,
                    holder = %self.record.holder_id,
                    error = %e,
                    "failed to abandon export lease; next round waits for expiry"
                );
            }
        }
    }

    /// Records completion of the round: stores the hex-encoded version bytes of
    /// `cursor` and the completion time in the lease record.
    ///
    /// The write is conditioned on the version this guard holds. If the lease
    /// was lost in the meantime, the completion record is skipped with a
    /// warning and `Ok(())` is returned.
    ///
    /// # Errors
    ///
    /// Returns [`KvError::SerializationError`] if the record cannot be
    /// serialized, and propagates any store error that is not a lost-lease
    /// outcome.
    pub async fn complete(mut self, cursor: &WatchCursor) -> Result<(), KvError> {
        // Mark resolved first so the drop warning stays quiet whatever happens.
        self.resolved = true;
        self.record.completed_cursor_hex = Some(hex_encode(cursor.version().as_bytes()));
        self.record.completed_at_unix = Some(unix_now());
        let bytes = serde_json::to_vec(&self.record)
            .map_err(|e| KvError::SerializationError(e.to_string()))?;
        match self.writer.update(&self.key, &bytes, &self.version).await {
            Ok(_) => Ok(()),
            // Same lost-race outcomes the takeover update in `try_acquire`
            // tolerates. A successor that took the lease over and then deleted
            // the key surfaces as `KeyNotFound` rather than `RevisionMismatch`,
            // but it is the same overrun.
            Err(KvError::RevisionMismatch | KvError::AlreadyExists | KvError::KeyNotFound) => {
                warn!(
                    key = %self.key,
                    holder = %self.record.holder_id,
                    "export round overran its lease; completion record skipped"
                );
                Ok(())
            }
            Err(e) => Err(e),
        }
    }
}
impl Drop for LeaseGuard {
    /// Logs a warning when the guard goes away without having been resolved.
    ///
    /// Nothing is written to the store here; the stored record stays as it is.
    fn drop(&mut self) {
        // Resolved guards have nothing to report.
        if self.resolved {
            return;
        }
        warn!(
            key = %self.key,
            holder = %self.record.holder_id,
            "LeaseGuard dropped without complete() or abandon(); the fleet waits out the lease ttl"
        );
    }
}
/// Current wall-clock time as whole seconds since the Unix epoch.
///
/// Falls back to `0` (after logging a warning) when the system clock reads
/// earlier than the epoch.
fn unix_now() -> u64 {
    let Ok(since_epoch) = SystemTime::now().duration_since(UNIX_EPOCH) else {
        warn!(
            "system clock predates the Unix epoch; lease expiry math degraded (expect duplicate export rounds until the clock is fixed)"
        );
        return 0;
    };
    since_epoch.as_secs()
}