use std::{
sync::{
Arc,
atomic::{AtomicBool, AtomicU64, Ordering},
},
time::{Duration, SystemTime, UNIX_EPOCH},
};
use chrono::{DateTime, Utc};
use thiserror::Error;
use tokio::task::JoinHandle;
use crate::supertable::wal::{
persistence::{Etag, WalStore, WalStoreError},
state_doc::{Lease, SupertableHandleId, WalId, WalState, WalStateDoc},
};
/// Default lease length: 60 seconds.
pub const DEFAULT_LEASE_DURATION: Duration = Duration::from_secs(60);
/// Default heartbeat period: 10 seconds, i.e. one sixth of [`DEFAULT_LEASE_DURATION`].
pub const DEFAULT_HEARTBEAT_INTERVAL: Duration = Duration::from_secs(10);
/// Failures reported by the lease operations in this module
/// (`try_acquire`, `try_heartbeat`, `try_release`).
#[derive(Debug, Error)]
pub enum LeaseError {
/// `try_acquire` found a lease that has not expired yet and belongs to a
/// different owner.
#[error("lease held by another owner ({held_owner:?}) until {held_expires_at}; cannot acquire")]
Conflict {
/// Owner recorded on the lease that blocked the acquire.
held_owner: SupertableHandleId,
/// Expiry recorded on the lease that blocked the acquire.
held_expires_at: DateTime<Utc>,
},
/// `try_heartbeat` / `try_release` found a lease whose owner is not the caller.
#[error("WAL preempted: lease owner is now {actual_owner:?}, not {expected_owner:?}")]
Preempted {
/// Owner the caller passed in.
expected_owner: SupertableHandleId,
/// Owner actually recorded in the state doc.
actual_owner: SupertableHandleId,
},
/// `try_heartbeat` / `try_release` found no lease on the state doc at all.
#[error("WAL has no lease; expected owner {expected_owner:?}")]
LeaseMissing { expected_owner: SupertableHandleId },
/// The etag-guarded write failed with `WalStoreError::CasFailed`: the state
/// doc changed between this operation's read and its write.
#[error("etag CAS lost; WAL state doc was updated by another writer")]
CasLost,
/// `try_acquire` was called on a WAL whose state is `WalState::Complete`.
#[error("lease op invalid on WAL in state {state:?}")]
InvalidPreState { state: WalState },
/// Any other store failure, converted automatically through `#[from]`.
#[error("wal store error: {0}")]
WalStore(#[from] WalStoreError),
}
/// Attempts to take (or re-take) the lease on `wal_id` for `owner`.
///
/// Reads the state doc, replaces its lease with
/// `{ owner, acquired_at: now, expires_at: now + lease_duration }` and writes
/// the doc back guarded by the etag obtained from the read. An existing lease
/// is overwritten when it has expired (`expires_at <= now`) or already belongs
/// to `owner`.
///
/// Returns the doc as written together with its new etag.
///
/// # Errors
///
/// * [`LeaseError::InvalidPreState`] if the WAL is in `WalState::Complete`.
/// * [`LeaseError::Conflict`] if a different owner holds an unexpired lease.
/// * [`LeaseError::CasLost`] if the store reports `WalStoreError::CasFailed`.
/// * [`LeaseError::WalStore`] for any other store failure.
pub async fn try_acquire(
    store: &WalStore,
    wal_id: WalId,
    owner: SupertableHandleId,
    now: DateTime<Utc>,
    lease_duration: Duration,
) -> Result<(WalStateDoc, Etag), LeaseError> {
    let (mut doc, etag) = store.read(wal_id).await?;

    if doc.state == WalState::Complete {
        return Err(LeaseError::InvalidPreState { state: doc.state });
    }

    // Only a live lease held by somebody else blocks the acquire.
    if let Some(held) = doc.lease.as_ref() {
        let still_live = held.expires_at > now;
        let foreign = held.owner != owner;
        if still_live && foreign {
            return Err(LeaseError::Conflict {
                held_owner: held.owner,
                held_expires_at: held.expires_at,
            });
        }
    }

    // A `lease_duration` chrono cannot represent falls back to chrono's
    // default (zero-length) duration, so such a lease expires at `now`.
    let ttl = chrono::Duration::from_std(lease_duration).unwrap_or_default();
    doc.lease = Some(Lease {
        owner,
        acquired_at: now,
        expires_at: now + ttl,
    });

    let written = store.update_with_etag(wal_id, &etag, &doc).await;
    written
        .map(|fresh_etag| (doc, fresh_etag))
        .map_err(|err| match err {
            WalStoreError::CasFailed { .. } => LeaseError::CasLost,
            other => LeaseError::from(other),
        })
}
/// Renews `owner`'s existing lease on `wal_id`.
///
/// Reads the state doc, checks that a lease exists and is owned by `owner`,
/// moves its `expires_at` to `now + lease_duration` (leaving `acquired_at`
/// untouched) and writes the doc back guarded by the etag from the read.
///
/// Returns the doc as written together with its new etag.
///
/// # Errors
///
/// * [`LeaseError::LeaseMissing`] if the doc carries no lease.
/// * [`LeaseError::Preempted`] if the lease belongs to a different owner.
/// * [`LeaseError::CasLost`] if the store reports `WalStoreError::CasFailed`.
/// * [`LeaseError::WalStore`] for any other store failure.
pub async fn try_heartbeat(
    store: &WalStore,
    wal_id: WalId,
    owner: SupertableHandleId,
    now: DateTime<Utc>,
    lease_duration: Duration,
) -> Result<(WalStateDoc, Etag), LeaseError> {
    let (mut doc, etag) = store.read(wal_id).await?;

    let Some(held) = doc.lease.as_mut() else {
        return Err(LeaseError::LeaseMissing {
            expected_owner: owner,
        });
    };
    if held.owner != owner {
        return Err(LeaseError::Preempted {
            expected_owner: owner,
            actual_owner: held.owner,
        });
    }

    // A `lease_duration` chrono cannot represent falls back to chrono's
    // default (zero-length) duration.
    let ttl = chrono::Duration::from_std(lease_duration).unwrap_or_default();
    held.expires_at = now + ttl;

    let written = store.update_with_etag(wal_id, &etag, &doc).await;
    written
        .map(|fresh_etag| (doc, fresh_etag))
        .map_err(|err| match err {
            WalStoreError::CasFailed { .. } => LeaseError::CasLost,
            other => LeaseError::from(other),
        })
}
/// Clears `owner`'s lease on `wal_id`.
///
/// Reads the state doc, checks that a lease exists and is owned by `owner`,
/// sets the lease to `None` and writes the doc back guarded by the etag from
/// the read.
///
/// Returns the doc as written (without a lease) together with its new etag.
///
/// # Errors
///
/// * [`LeaseError::LeaseMissing`] if the doc carries no lease.
/// * [`LeaseError::Preempted`] if the lease belongs to a different owner.
/// * [`LeaseError::CasLost`] if the store reports `WalStoreError::CasFailed`.
/// * [`LeaseError::WalStore`] for any other store failure.
pub async fn try_release(
    store: &WalStore,
    wal_id: WalId,
    owner: SupertableHandleId,
) -> Result<(WalStateDoc, Etag), LeaseError> {
    let (mut doc, etag) = store.read(wal_id).await?;

    // Taking the lease out already leaves `doc.lease` as `None`; on the error
    // paths below the local copy of the doc is simply discarded.
    let Some(held) = doc.lease.take() else {
        return Err(LeaseError::LeaseMissing {
            expected_owner: owner,
        });
    };
    if held.owner != owner {
        return Err(LeaseError::Preempted {
            expected_owner: owner,
            actual_owner: held.owner,
        });
    }

    let written = store.update_with_etag(wal_id, &etag, &doc).await;
    written
        .map(|fresh_etag| (doc, fresh_etag))
        .map_err(|err| match err {
            WalStoreError::CasFailed { .. } => LeaseError::CasLost,
            other => LeaseError::from(other),
        })
}
/// Liveness state shared between a worker and the heartbeat task created by
/// `spawn_heartbeat`.
///
/// It records the wall-clock time of the most recent progress mark and a stop
/// flag. Both are atomics accessed with `Ordering::Relaxed`.
#[derive(Debug)]
pub struct ProgressTracker {
    /// Time of the latest progress mark, in milliseconds since `UNIX_EPOCH`.
    last_progress_at_unix_ms: AtomicU64,
    /// Becomes `true` once a stop is requested; nothing in this type resets it.
    stop_requested: AtomicBool,
}

impl ProgressTracker {
    /// Creates a tracker whose last progress time is `now` and whose stop flag
    /// is clear.
    pub fn new(now: SystemTime) -> Self {
        let initial_ms = unix_ms(now);
        Self {
            last_progress_at_unix_ms: AtomicU64::new(initial_ms),
            stop_requested: AtomicBool::new(false),
        }
    }

    /// Records the current wall-clock time as the latest progress mark.
    pub fn mark_progress(&self) {
        self.last_progress_at_unix_ms
            .store(unix_ms(SystemTime::now()), Ordering::Relaxed);
    }

    /// Returns `true` once a stop has been requested.
    pub fn stop_requested(&self) -> bool {
        self.stop_requested.load(Ordering::Relaxed)
    }

    /// Raises the stop flag.
    pub fn request_stop(&self) {
        self.stop_requested.store(true, Ordering::Relaxed);
    }

    /// Returns the latest progress mark as a `SystemTime` (millisecond
    /// resolution).
    fn last_progress_at(&self) -> SystemTime {
        let since_epoch =
            Duration::from_millis(self.last_progress_at_unix_ms.load(Ordering::Relaxed));
        UNIX_EPOCH + since_epoch
    }
}
/// Converts `t` to whole milliseconds since `UNIX_EPOCH`.
///
/// Times before the epoch map to `0`. Times whose millisecond count does not
/// fit in a `u64` saturate at `u64::MAX` instead of wrapping around.
fn unix_ms(t: SystemTime) -> u64 {
    t.duration_since(UNIX_EPOCH).map_or(0, |since_epoch| {
        // `as_millis` yields a `u128`; convert with a checked cast so an
        // out-of-range value can never silently truncate.
        u64::try_from(since_epoch.as_millis()).unwrap_or(u64::MAX)
    })
}
/// Owner-side handle for the background task started by `spawn_heartbeat`.
///
/// Dropping the handle raises the stop flag and aborts the task; call
/// [`HeartbeatHandle::stop`] to wait for the task to finish instead.
#[derive(Debug)]
pub struct HeartbeatHandle {
    /// Join handle of the heartbeat task; `None` once `stop` or `drop` took it.
    join: Option<JoinHandle<()>>,
    /// Tracker shared with the heartbeat task.
    tracker: Arc<ProgressTracker>,
}

impl HeartbeatHandle {
    /// Raises the stop flag and waits for the heartbeat task to finish.
    ///
    /// The task's join result (including a panic or cancellation) is ignored.
    pub async fn stop(mut self) {
        self.tracker.request_stop();
        let Some(task) = self.join.take() else {
            return;
        };
        let _ = task.await;
    }

    /// Returns another reference to the tracker shared with the heartbeat task.
    pub fn tracker(&self) -> Arc<ProgressTracker> {
        Arc::clone(&self.tracker)
    }
}

impl Drop for HeartbeatHandle {
    /// Raises the stop flag and aborts the task if it has not been joined yet.
    fn drop(&mut self) {
        self.tracker.request_stop();
        match self.join.take() {
            Some(task) => task.abort(),
            None => {}
        }
    }
}
pub fn spawn_heartbeat(
store: WalStore,
wal_id: WalId,
owner: SupertableHandleId,
lease_duration: Duration,
interval: Duration,
) -> HeartbeatHandle {
let tracker = Arc::new(ProgressTracker::new(SystemTime::now()));
let tracker_for_task = Arc::clone(&tracker);
let stuck_threshold = lease_duration / 2;
let join = tokio::spawn(async move {
let mut ticker = tokio::time::interval(interval);
ticker.tick().await;
loop {
ticker.tick().await;
if tracker_for_task.stop_requested() {
return;
}
let last = tracker_for_task.last_progress_at();
if let Ok(elapsed) = SystemTime::now().duration_since(last)
&& elapsed > stuck_threshold
{
tracker_for_task.request_stop();
return;
}
match try_heartbeat(&store, wal_id, owner, Utc::now(), lease_duration).await {
Ok(_) => {}
Err(_) => {
tracker_for_task.request_stop();
return;
}
}
}
});
HeartbeatHandle {
join: Some(join),
tracker,
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use chrono::Duration as ChronoDuration;
use tempfile::TempDir;
use uuid::Uuid;
use super::*;
use crate::{
storage::{LocalFsStorageProvider, StorageProvider},
supertable::wal::state_doc::{
OpKind, RowId, SCHEMA_VERSION, SupertableHandleId, TombstoneEntry, TombstoneOutcome,
WalState,
},
};
/// Builds a minimal delete-op WAL doc in `WalState::Intent` with no lease,
/// one target row (`RowId(1)`) and one pending tombstone entry for that row.
fn sample_intent_wal(wal_id_v: i128) -> WalStateDoc {
    let pending_tombstone = TombstoneEntry {
        target_id: RowId(1),
        outcome: TombstoneOutcome::Pending,
        tombstoned_in_superfile: None,
    };
    WalStateDoc {
        wal_id: WalId(wal_id_v),
        schema_version: SCHEMA_VERSION,
        op_kind: OpKind::Delete,
        state: WalState::Intent,
        created_at: Utc::now(),
        lease: None,
        predicate_repr: "test".into(),
        target_ids: vec![RowId(1)],
        new_row_count: None,
        new_row_content_hash: None,
        preallocated_superfile_id: None,
        minted_id_spans: Vec::new(),
        tombstone_progress: vec![pending_tombstone],
    }
}
/// Creates a temporary directory and a `WalStore` backed by a local-filesystem
/// storage provider rooted there. The `TempDir` is returned so the caller
/// keeps the directory alive for the duration of the test.
fn fixture() -> (TempDir, WalStore) {
    let dir = TempDir::new().expect("tempdir");
    let provider = LocalFsStorageProvider::new(dir.path()).expect("provider");
    let storage: Arc<dyn StorageProvider> = Arc::new(provider);
    let store = WalStore::new(storage);
    (dir, store)
}
/// Creates `doc` in `store` and returns the etag of the new document,
/// panicking if the create fails.
async fn put_wal(store: &WalStore, doc: &WalStateDoc) -> Etag {
    let created = store.create(doc).await;
    created.expect("create")
}
/// Acquiring on a doc without a lease stores the caller as owner, sets an
/// expiry after `now`, and the returned etag matches what a re-read reports.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn acquire_on_vacant_lease_succeeds_and_writes_owner() {
    let (_dir, store) = fixture();
    let doc = sample_intent_wal(1);
    let _ = put_wal(&store, &doc).await;

    let owner = SupertableHandleId(0x1111);
    let now = Utc::now();
    let acquired = try_acquire(&store, doc.wal_id, owner, now, Duration::from_secs(30)).await;
    let (granted, granted_etag) = acquired.expect("acquire");

    let granted_lease = granted.lease.expect("set");
    assert_eq!(granted_lease.owner, owner);
    assert!(granted_lease.expires_at > now);

    // The lease must be persisted, not just present in the returned copy.
    let (persisted, persisted_etag) = store.read(doc.wal_id).await.expect("read");
    let persisted_lease = persisted.lease.expect("set");
    assert_eq!(persisted_lease.owner, owner);
    assert_eq!(persisted_etag, granted_etag);
}
/// Acquiring while another owner holds an unexpired lease is rejected with
/// `Conflict`, naming the current holder.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn acquire_on_live_lease_returns_conflict() {
    let (_dir, store) = fixture();
    let mut doc = sample_intent_wal(2);
    let now = Utc::now();
    let holder = SupertableHandleId(0xAAAA);
    let challenger = SupertableHandleId(0xBBBB);
    doc.lease = Some(Lease {
        owner: holder,
        acquired_at: now,
        expires_at: now + ChronoDuration::seconds(60),
    });
    let _ = put_wal(&store, &doc).await;

    let attempt = try_acquire(&store, doc.wal_id, challenger, now, Duration::from_secs(30)).await;
    let err = attempt.expect_err("must conflict");

    assert!(matches!(err, LeaseError::Conflict { held_owner, .. } if held_owner == holder));
}
/// A lease whose expiry lies in the past can be taken over by a new owner.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn acquire_on_expired_lease_preempts() {
    let (_dir, store) = fixture();
    let mut doc = sample_intent_wal(3);
    let now = Utc::now();
    let newcomer = SupertableHandleId(0xBBBB);
    // Seed a lease that ran out ten seconds before `now`.
    doc.lease = Some(Lease {
        owner: SupertableHandleId(0xAAAA),
        acquired_at: now - ChronoDuration::seconds(120),
        expires_at: now - ChronoDuration::seconds(10),
    });
    let _ = put_wal(&store, &doc).await;

    let attempt = try_acquire(&store, doc.wal_id, newcomer, now, Duration::from_secs(30)).await;
    let (post, _etag) = attempt.expect("expired → preempt");

    let lease = post.lease.expect("set");
    assert_eq!(lease.owner, newcomer);
}
/// Re-acquiring a still-live lease as the same owner succeeds and moves the
/// expiry to `now` plus the requested duration.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn acquire_re_takes_for_same_owner_renewing_expiry() {
    let (_dir, store) = fixture();
    let mut doc = sample_intent_wal(4);
    let owner = SupertableHandleId(0xC0C0);
    let now = Utc::now();
    // The seeded lease still has 15 seconds left.
    doc.lease = Some(Lease {
        owner,
        acquired_at: now - ChronoDuration::seconds(45),
        expires_at: now + ChronoDuration::seconds(15),
    });
    let _ = put_wal(&store, &doc).await;

    let attempt = try_acquire(&store, doc.wal_id, owner, now, Duration::from_secs(60)).await;
    let (post, _etag) = attempt.expect("re-acquire");

    let lease = post.lease.expect("set");
    assert_eq!(lease.owner, owner);
    let expected = now + ChronoDuration::seconds(60);
    let drift_ms = (lease.expires_at - expected).num_milliseconds().abs();
    assert!(drift_ms < 10);
}
/// Acquiring a lease on a WAL in `WalState::Complete` is rejected with
/// `InvalidPreState`.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn acquire_on_complete_wal_is_invalid_pre_state() {
    let (_dir, store) = fixture();
    let mut doc = sample_intent_wal(5);
    doc.state = WalState::Complete;
    let _ = put_wal(&store, &doc).await;

    let requester = SupertableHandleId(0xDEAD);
    let attempt = try_acquire(
        &store,
        doc.wal_id,
        requester,
        Utc::now(),
        Duration::from_secs(30),
    )
    .await;
    let err = attempt.expect_err("must reject");

    assert!(matches!(
        err,
        LeaseError::InvalidPreState {
            state: WalState::Complete
        }
    ));
}
/// A heartbeat by the lease owner moves the expiry to the heartbeat time plus
/// the lease duration and keeps the owner unchanged.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn heartbeat_extends_expires_at() {
    let (_dir, store) = fixture();
    let mut doc = sample_intent_wal(6);
    let owner = SupertableHandleId(0x10001);
    let now = Utc::now();
    doc.lease = Some(Lease {
        owner,
        acquired_at: now,
        expires_at: now + ChronoDuration::seconds(20),
    });
    let _ = put_wal(&store, &doc).await;

    // Heartbeat "ten seconds later" by passing an explicit timestamp.
    let later = now + ChronoDuration::seconds(10);
    let beat = try_heartbeat(&store, doc.wal_id, owner, later, Duration::from_secs(60)).await;
    let (post, _etag) = beat.expect("heartbeat");

    let lease = post.lease.expect("still held");
    let expected = later + ChronoDuration::seconds(60);
    let drift_ms = (lease.expires_at - expected).num_milliseconds().abs();
    assert!(drift_ms < 10);
    assert_eq!(lease.owner, owner);
}
/// A heartbeat from an owner that no longer holds the lease fails with
/// `Preempted`, reporting both the expected and the actual owner.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn heartbeat_on_preempted_lease_returns_preempted() {
    let (_dir, store) = fixture();
    let mut doc = sample_intent_wal(7);
    let original = SupertableHandleId(0xAAAA);
    let now = Utc::now();
    let usurper = SupertableHandleId(0xBBBB);
    doc.lease = Some(Lease {
        owner: usurper,
        acquired_at: now,
        expires_at: now + ChronoDuration::seconds(60),
    });
    let _ = put_wal(&store, &doc).await;

    let beat = try_heartbeat(&store, doc.wal_id, original, now, Duration::from_secs(60)).await;
    let err = beat.expect_err("preempted");

    assert!(matches!(
        err,
        LeaseError::Preempted {
            expected_owner,
            actual_owner,
        } if expected_owner == original && actual_owner == usurper
    ));
}
/// A heartbeat against a doc that carries no lease fails with `LeaseMissing`.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn heartbeat_on_cleared_lease_returns_lease_missing() {
    let (_dir, store) = fixture();
    let doc = sample_intent_wal(8);
    let _ = put_wal(&store, &doc).await;

    let caller = SupertableHandleId(0xAAAA);
    let beat = try_heartbeat(
        &store,
        doc.wal_id,
        caller,
        Utc::now(),
        Duration::from_secs(60),
    )
    .await;
    let err = beat.expect_err("no lease");

    assert!(matches!(err, LeaseError::LeaseMissing { .. }));
}
/// Releasing as the lease owner returns a doc without a lease.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn release_clears_lease_for_matching_owner() {
    let (_dir, store) = fixture();
    let mut doc = sample_intent_wal(9);
    let owner = SupertableHandleId(0xC0DE);
    let now = Utc::now();
    doc.lease = Some(Lease {
        owner,
        acquired_at: now,
        expires_at: now + ChronoDuration::seconds(60),
    });
    let _ = put_wal(&store, &doc).await;

    let released = try_release(&store, doc.wal_id, owner).await;
    let (post, _etag) = released.expect("release");

    assert!(post.lease.is_none());
}
/// Releasing a lease that belongs to a different owner fails with `Preempted`.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn release_on_wrong_owner_returns_preempted() {
    let (_dir, store) = fixture();
    let mut doc = sample_intent_wal(10);
    let now = Utc::now();
    doc.lease = Some(Lease {
        owner: SupertableHandleId(0xAAAA),
        acquired_at: now,
        expires_at: now + ChronoDuration::seconds(60),
    });
    let _ = put_wal(&store, &doc).await;

    let intruder = SupertableHandleId(0xBBBB);
    let released = try_release(&store, doc.wal_id, intruder).await;
    let err = released.expect_err("preempted");

    assert!(matches!(err, LeaseError::Preempted { .. }));
}
/// Pins the default lease duration (60 s) and heartbeat interval (10 s).
#[test]
fn defaults_match_plan_constants() {
    let expected_lease = Duration::from_secs(60);
    let expected_heartbeat = Duration::from_secs(10);
    assert_eq!(DEFAULT_LEASE_DURATION, expected_lease);
    assert_eq!(DEFAULT_HEARTBEAT_INTERVAL, expected_heartbeat);
}
/// While the worker keeps marking progress, the spawned heartbeat must push
/// the lease expiry beyond the expiry the lease was seeded with.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn heartbeat_extends_lease_while_worker_marks_progress() {
    /// Length of the seeded lease, in milliseconds, measured from `now`.
    const INITIAL_LEASE_MS: i64 = 1_000;

    let (_dir, store) = fixture();
    let mut doc = sample_intent_wal(20);
    let owner = SupertableHandleId(0xC0FFEE);
    let now = Utc::now();
    doc.lease = Some(Lease {
        owner,
        acquired_at: now,
        expires_at: now + ChronoDuration::seconds(1),
    });
    let _ = put_wal(&store, &doc).await;

    // 800 ms lease renewed every 100 ms; the stuck threshold is 400 ms.
    let handle = spawn_heartbeat(
        store.clone(),
        doc.wal_id,
        owner,
        Duration::from_millis(800),
        Duration::from_millis(100),
    );
    let tracker = handle.tracker();

    // Simulate a healthy worker for ~400 ms, marking progress every 50 ms so
    // the stuck check never fires.
    for _ in 0..8 {
        tokio::time::sleep(Duration::from_millis(50)).await;
        tracker.mark_progress();
    }

    let (post, _etag) = store.read(doc.wal_id).await.expect("read");
    let lease = post.lease.expect("still held");
    assert_eq!(lease.owner, owner);

    // The seeded lease already expires `INITIAL_LEASE_MS` after `now`, so only
    // an expiry strictly beyond that shows a heartbeat actually ran. Heartbeats
    // at >= 300 ms put the expiry at >= 1100 ms after `now`.
    let elapsed_since_initial = lease.expires_at - now;
    assert!(
        elapsed_since_initial.num_milliseconds() > INITIAL_LEASE_MS,
        "expected heartbeat to extend lease past initial 1s budget; got {} ms",
        elapsed_since_initial.num_milliseconds()
    );
    handle.stop().await;
}
/// When the worker never marks progress, the heartbeat task must raise the
/// stop flag on its own (here: within two seconds).
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn heartbeat_signals_stop_when_worker_is_stuck() {
    let (_dir, store) = fixture();
    let mut doc = sample_intent_wal(21);
    let owner = SupertableHandleId(0xDEAD_BEEF);
    let now = Utc::now();
    doc.lease = Some(Lease {
        owner,
        acquired_at: now,
        expires_at: now + ChronoDuration::seconds(5),
    });
    let _ = put_wal(&store, &doc).await;

    // 400 ms lease => 200 ms stuck threshold, checked every 50 ms.
    let handle = spawn_heartbeat(
        store.clone(),
        doc.wal_id,
        owner,
        Duration::from_millis(400),
        Duration::from_millis(50),
    );
    let tracker = handle.tracker();

    // Poll the flag without ever calling `mark_progress`.
    let wait_for_stop = async {
        while !tracker.stop_requested() {
            tokio::time::sleep(Duration::from_millis(25)).await;
        }
    };
    tokio::time::timeout(Duration::from_secs(2), wait_for_stop)
        .await
        .expect("expected heartbeat to flip stop_requested on stuck worker");

    handle.stop().await;
}
/// When another owner overwrites the lease, the heartbeat task must notice on
/// a later tick and raise the stop flag, even though the worker keeps marking
/// progress.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn heartbeat_signals_stop_on_preemption() {
    let (_dir, store) = fixture();
    let mut doc = sample_intent_wal(22);
    let original = SupertableHandleId(0xAAAA);
    let now = Utc::now();
    doc.lease = Some(Lease {
        owner: original,
        acquired_at: now,
        expires_at: now + ChronoDuration::seconds(60),
    });
    let _ = put_wal(&store, &doc).await;

    let handle = spawn_heartbeat(
        store.clone(),
        doc.wal_id,
        original,
        Duration::from_secs(60),
        Duration::from_millis(50),
    );
    let tracker = handle.tracker();

    // Simulate a preemption: rewrite the lease for a different owner through
    // the same etag-guarded update the production code uses.
    let (mut current, etag) = store.read(doc.wal_id).await.expect("read");
    current.lease = Some(Lease {
        owner: SupertableHandleId(0xBBBB),
        acquired_at: now,
        expires_at: now + ChronoDuration::seconds(60),
    });
    let _ = store
        .update_with_etag(doc.wal_id, &etag, &current)
        .await
        .expect("preempt cas");

    // Keep the worker "healthy" so only the preemption can trigger the stop;
    // give the heartbeat up to ~300 ms to observe it.
    for _ in 0..6 {
        tracker.mark_progress();
        tokio::time::sleep(Duration::from_millis(50)).await;
        if tracker.stop_requested() {
            break;
        }
    }
    assert!(
        tracker.stop_requested(),
        "expected heartbeat to flip stop_requested on preemption"
    );
    handle.stop().await;
}
/// Calling `stop` right after spawning must return, i.e. the heartbeat task
/// winds down instead of leaving the `await` hanging.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn explicit_stop_winds_down_heartbeat_task() {
    let (_dir, store) = fixture();
    let mut doc = sample_intent_wal(23);
    let owner = SupertableHandleId(0x4242);
    let now = Utc::now();
    doc.lease = Some(Lease {
        owner,
        acquired_at: now,
        expires_at: now + ChronoDuration::seconds(60),
    });
    let _ = put_wal(&store, &doc).await;

    let lease_duration = Duration::from_secs(60);
    let interval = Duration::from_millis(50);
    let handle = spawn_heartbeat(store.clone(), doc.wal_id, owner, lease_duration, interval);

    handle.stop().await;
}
/// Touches `Uuid` so the `uuid` import in this test module is used.
#[test]
fn unused_uuid_smoke() {
    let _nil_uuid = Uuid::nil();
}
}