slipstream/export_lease.rs
1//! Fleet-wide "export at most once per round" lease.
2//!
3//! Every replica of a fold runs the same checkpoint loop; without coordination,
4//! N nodes would produce N identical artifacts per round. [`ExportLease`] makes
5//! exactly one of them do the work: each candidate calls
6//! [`try_acquire`](ExportLease::try_acquire) when its trigger fires; one wins,
7//! the rest skip the round.
8//!
9//! ## Mechanism: CAS + embedded expiry — no TTL machinery
10//!
11//! The lease is a single KV key. Acquisition is a **create-only** write
12//! (`KvWriter::create`): exactly one caller fleet-wide can create a missing
13//! key, so the race has one winner by construction. The lease's lifetime is an
14//! `expires_at_unix` timestamp **inside the value** — not a server-side TTL —
15//! and an expired (or unparseable) lease is taken over with a CAS
16//! [`update`](crate::KvWriter::update) against the observed version, so the
17//! steal race also has exactly one winner.
18//!
19//! Embedding the expiry rather than using per-message TTL keeps the lease
20//! portable to any [`KvWriter`] backend and free of server-version/bucket-flag
21//! requirements. The cost is wall-clock comparison across nodes: with
22//! NTP-sane clocks and round periods measured in minutes, skew is noise — and
23//! a premature steal is *safe* anyway, though NOT because concurrent
24//! exporters produce identical artifacts (they don't: each replica exports
25//! at its own applied cursor). Safety comes from the transport: payloads are
26//! content-addressed (concurrent uploads cannot collide or tear) and the
27//! published pointer only moves forward (an overrun round's stale artifact
28//! is refused, [`PublishOutcome::SupersededByNewer`](crate::PublishOutcome)),
29//! machine-checked in `tests/model.rs` with NO lease in the model at all.
30//! The lease is a work-deduplication optimization, never a correctness gate.
31//!
32//! ## Lifecycle
33//!
//! A successful round leaves the key in place until it expires — that is the
//! "at most once per `ttl`" semantic: `ttl` IS the round period. The winner
//! calls [`LeaseGuard::complete`] after its upload succeeds, which (best
//! effort) rewrites the value with the exported cursor and completion time, so
//! the lease key doubles as the fleet-visible "last export" record. A winner
//! whose export fails calls [`LeaseGuard::abandon`] instead, which (best
//! effort) CAS-deletes the key so the next trigger can win a fresh round
//! without waiting out the `ttl`. A crash mid-round simply lets the key
//! expire; the next trigger elects someone else.
40
41use std::sync::Arc;
42use std::time::{Duration, SystemTime, UNIX_EPOCH};
43
44use serde::{Deserialize, Serialize};
45use tracing::{debug, warn};
46
47use crate::artifact::hex_encode;
48use crate::kv::{KvError, KvReader, KvWriter, VersionToken, WatchCursor};
49use crate::stores::KvStore;
50
51/// The lease key's value: who holds (or last held) the round, until when, and
52/// — after [`LeaseGuard::complete`] — what was exported.
53#[derive(Debug, Clone, Serialize, Deserialize)]
54pub struct LeaseRecord {
55 /// Identity of the node that won the round (caller-chosen, e.g. node id).
56 pub holder_id: String,
57 /// When the round was won, seconds since the Unix epoch.
58 pub acquired_at_unix: u64,
59 /// When the lease lapses and the next round may be won. This is the round
60 /// period: the "at most once per `ttl`" bound.
61 pub expires_at_unix: u64,
62 /// Hex of the exported artifact's cursor, set by [`LeaseGuard::complete`].
63 #[serde(default, skip_serializing_if = "Option::is_none")]
64 pub completed_cursor_hex: Option<String>,
65 /// When the round completed (artifact uploaded), set by
66 /// [`LeaseGuard::complete`].
67 #[serde(default, skip_serializing_if = "Option::is_none")]
68 pub completed_at_unix: Option<u64>,
69}
70
71/// Coordinates "at most one export per round" across every replica of a fold.
72/// See the module docs for the CAS + embedded-expiry mechanism.
73pub struct ExportLease {
74 reader: Arc<dyn KvReader>,
75 writer: Arc<dyn KvWriter>,
76 key: String,
77 holder_id: String,
78}
79
80/// Proof of a won round, returned by [`ExportLease::try_acquire`].
81///
82/// There is nothing to release — the round runs until the lease expires (that
83/// is the round period). Call [`complete`](Self::complete) after the artifact
84/// is safely uploaded to publish what was exported.
85pub struct LeaseGuard {
86 writer: Arc<dyn KvWriter>,
87 key: String,
88 record: LeaseRecord,
89 version: VersionToken,
90 /// Set by [`complete`](Self::complete) / [`abandon`](Self::abandon). A
91 /// guard dropped without either (early `?`, cancelled future) leaks the
92 /// round — the fleet waits out the ttl — so [`Drop`] logs it.
93 resolved: bool,
94}
95
96impl ExportLease {
97 /// A lease on `key` in `store`, identifying this node as `holder_id`.
98 ///
99 /// Fails with [`KvError::OperationFailed`] if the store has no writer.
100 pub fn new(
101 store: &dyn KvStore,
102 key: impl Into<String>,
103 holder_id: impl Into<String>,
104 ) -> Result<Self, KvError> {
105 let writer = store.writer().ok_or_else(|| {
106 KvError::OperationFailed(format!(
107 "store {:?} has no writer; an export lease needs create/update",
108 store.name()
109 ))
110 })?;
111 Ok(Self {
112 reader: store.reader(),
113 writer,
114 key: key.into(),
115 holder_id: holder_id.into(),
116 })
117 }
118
119 /// Try to win this export round. Exactly one caller fleet-wide gets
120 /// `Ok(Some(guard))` per round; everyone else gets `Ok(None)` and skips.
121 ///
122 /// `ttl` is the round period: the winner's lease suppresses further rounds
123 /// until it lapses, whether or not the winner survives. Crash mid-round →
124 /// the key expires → the next trigger elects someone else.
125 pub async fn try_acquire(&self, ttl: Duration) -> Result<Option<LeaseGuard>, KvError> {
126 let now = unix_now();
127 let record = LeaseRecord {
128 holder_id: self.holder_id.clone(),
129 acquired_at_unix: now,
130 expires_at_unix: now.saturating_add(ttl.as_secs()),
131 completed_cursor_hex: None,
132 completed_at_unix: None,
133 };
134 let bytes =
135 serde_json::to_vec(&record).map_err(|e| KvError::SerializationError(e.to_string()))?;
136
137 // Fast path: the round is open — create-only, one winner.
138 match self.writer.create(&self.key, &bytes).await {
139 Ok(version) => {
140 debug!(key = %self.key, holder = %self.holder_id, "export lease acquired (create)");
141 return Ok(Some(self.guard(record, version)));
142 }
143 Err(KvError::AlreadyExists) => {}
144 Err(e) => return Err(e),
145 }
146
147 // The key exists. `entry` (not `get`): a CAS-deleted lease is an
148 // empty-value tombstone that `get` hides, but its version is exactly
149 // what the takeover CAS needs.
150 let Some(entry) = self.reader.entry(&self.key).await? else {
151 // Deleted between create and read; treat as lost — the next
152 // trigger retries cleanly rather than looping here.
153 return Ok(None);
154 };
155
156 // A live, parseable, unexpired lease wins; anything else (expired,
157 // tombstone, unparseable garbage) is taken over. Unparseable leases
158 // MUST be stealable or one corrupt write wedges exports fleet-wide.
159 if let Ok(existing) = serde_json::from_slice::<LeaseRecord>(&entry.value)
160 && existing.expires_at_unix > now
161 {
162 return Ok(None);
163 }
164
165 // Takeover: CAS against the version we read — one winner.
166 match self.writer.update(&self.key, &bytes, &entry.version).await {
167 Ok(version) => {
168 debug!(key = %self.key, holder = %self.holder_id, "export lease acquired (takeover)");
169 Ok(Some(self.guard(record, version)))
170 }
171 // Someone else's create/update landed first: their round.
172 Err(KvError::RevisionMismatch | KvError::AlreadyExists | KvError::KeyNotFound) => {
173 Ok(None)
174 }
175 Err(e) => Err(e),
176 }
177 }
178
179 /// Read the current lease record, if any — the fleet-visible "last export"
180 /// state. `None` when no round has ever run (or the key was tombstoned);
181 /// [`KvError::SerializationError`] when the key holds unparseable bytes —
182 /// distinct from `None` so an operator can see "present but corrupt" (a
183 /// state [`try_acquire`](Self::try_acquire) will repair by takeover) rather
184 /// than a false "never ran".
185 pub async fn current(&self) -> Result<Option<LeaseRecord>, KvError> {
186 match self.reader.get(&self.key).await? {
187 Some(entry) => serde_json::from_slice(&entry.value).map(Some).map_err(|e| {
188 KvError::SerializationError(format!(
189 "lease key {:?} holds an unparseable value: {e}",
190 self.key
191 ))
192 }),
193 None => Ok(None),
194 }
195 }
196
197 fn guard(&self, record: LeaseRecord, version: VersionToken) -> LeaseGuard {
198 LeaseGuard {
199 writer: Arc::clone(&self.writer),
200 key: self.key.clone(),
201 record,
202 version,
203 resolved: false,
204 }
205 }
206}
207
208impl LeaseGuard {
209 /// The record this guard wrote when it won the round.
210 pub fn record(&self) -> &LeaseRecord {
211 &self.record
212 }
213
214 /// Give the round back early: a failed export/upload should not suppress
215 /// the fleet for the rest of the ttl. CAS-deletes the lease (tombstone)
216 /// against this guard's version, so the next trigger on any node can win a
217 /// fresh round immediately.
218 ///
219 /// Best-effort: a CAS conflict (someone already took over) or write error
220 /// is logged, not surfaced — worst case the round waits out its ttl, which
221 /// is the no-abandon behavior anyway.
222 ///
223 /// An unawaited `abandon()` does nothing (and would trip the [`Drop`]
224 /// warning); the future's inherent `#[must_use]` plus this crate's
225 /// `deny(unused_must_use)` make that a compile error, not a silent leak.
226 pub async fn abandon(mut self) {
227 self.resolved = true;
228 match self
229 .writer
230 .delete_with_version(&self.key, &self.version)
231 .await
232 {
233 Ok(_) => {
234 debug!(key = %self.key, holder = %self.record.holder_id, "export lease abandoned");
235 }
236 Err(e) => {
237 warn!(
238 key = %self.key,
239 holder = %self.record.holder_id,
240 error = %e,
241 "failed to abandon export lease; next round waits for expiry"
242 );
243 }
244 }
245 }
246
247 /// Publish the round's outcome: rewrite the lease value with the exported
248 /// cursor and completion time (expiry unchanged — the round still runs its
249 /// full period).
250 ///
251 /// Best-effort observability: a CAS conflict means the lease was already
252 /// taken over (this round overran its ttl) and is logged, not surfaced —
253 /// the artifact is already safe wherever the caller put it.
254 ///
255 /// An unawaited `complete()` records nothing (and would trip the [`Drop`]
256 /// warning); the future's inherent `#[must_use]` plus this crate's
257 /// `deny(unused_must_use)` make that a compile error, not a silent leak.
258 pub async fn complete(mut self, cursor: &WatchCursor) -> Result<(), KvError> {
259 self.resolved = true;
260 self.record.completed_cursor_hex = Some(hex_encode(cursor.version().as_bytes()));
261 self.record.completed_at_unix = Some(unix_now());
262 let bytes = serde_json::to_vec(&self.record)
263 .map_err(|e| KvError::SerializationError(e.to_string()))?;
264 match self.writer.update(&self.key, &bytes, &self.version).await {
265 Ok(_) => Ok(()),
266 Err(KvError::RevisionMismatch) => {
267 warn!(
268 key = %self.key,
269 holder = %self.record.holder_id,
270 "export round overran its lease; completion record skipped"
271 );
272 Ok(())
273 }
274 Err(e) => Err(e),
275 }
276 }
277}
278
279impl Drop for LeaseGuard {
280 fn drop(&mut self) {
281 if !self.resolved {
282 warn!(
283 key = %self.key,
284 holder = %self.record.holder_id,
285 "LeaseGuard dropped without complete() or abandon(); the fleet waits out the lease ttl"
286 );
287 }
288 }
289}
290
291fn unix_now() -> u64 {
292 match SystemTime::now().duration_since(UNIX_EPOCH) {
293 Ok(d) => d.as_secs(),
294 Err(_) => {
295 // A pre-epoch clock means every lease this node writes is already
296 // expired (`expires_at = 0 + ttl` is in the past), so any node can
297 // steal it — duplicate exports every round, which the lease design
298 // tolerates (dedup, not correctness). That direction is deliberate:
299 // the alternative sentinel (`u64::MAX`) would mint a never-expiring
300 // lease that wedges the fleet until manual cleanup. But it must not
301 // be silent — duplicate artifacts with no log line is undebuggable.
302 warn!(
303 "system clock predates the Unix epoch; lease expiry math degraded (expect duplicate export rounds until the clock is fixed)"
304 );
305 0
306 }
307 }
308}