slipstream/export_lease.rs
1//! Fleet-wide "export at most once per round" lease.
2//!
3//! Every replica of a fold runs the same checkpoint loop; without coordination,
4//! N nodes would produce N identical artifacts per round. [`ExportLease`] makes
5//! exactly one of them do the work: each candidate calls
6//! [`try_acquire`](ExportLease::try_acquire) when its trigger fires; one wins,
7//! the rest skip the round.
8//!
9//! ## Mechanism: CAS + embedded expiry — no TTL machinery
10//!
11//! The lease is a single KV key. Acquisition is a **create-only** write
12//! (`KvWriter::create`): exactly one caller fleet-wide can create a missing
13//! key, so the race has one winner by construction. The lease's lifetime is an
14//! `expires_at_unix` timestamp **inside the value** — not a server-side TTL —
15//! and an expired (or unparseable) lease is taken over with a CAS
16//! [`update`](crate::KvWriter::update) against the observed version, so the
17//! steal race also has exactly one winner.
18//!
19//! Embedding the expiry rather than using per-message TTL keeps the lease
20//! portable to any [`KvWriter`] backend and free of server-version/bucket-flag
21//! requirements. The cost is wall-clock comparison across nodes: with
22//! NTP-sane clocks and round periods measured in minutes, skew is noise — and
23//! a premature steal is *safe* anyway (two exporters produce two identical
24//! artifacts; the upload is last-write-wins on the same key). The lease is a
25//! work-deduplication optimization, never a correctness gate.
26//!
27//! ## Lifecycle
28//!
//! A successful round leaves the key in place until it expires — that is the
//! "at most once per `ttl`" semantic: `ttl` IS the round period. The winner
//! calls [`LeaseGuard::complete`] after its upload succeeds, which (best
//! effort) rewrites the value with the exported cursor and completion time, so
//! the lease key doubles as the fleet-visible "last export" record. A winner
//! whose export fails calls [`LeaseGuard::abandon`] instead, which (best
//! effort) CAS-deletes the lease so the next trigger can win a fresh round
//! without waiting out the `ttl`. A crash mid-round simply lets the key
//! expire; the next trigger elects someone else.
35
36use std::sync::Arc;
37use std::time::{Duration, SystemTime, UNIX_EPOCH};
38
39use serde::{Deserialize, Serialize};
40use tracing::{debug, warn};
41
42use crate::artifact::hex_encode;
43use crate::kv::{KvError, KvReader, KvWriter, VersionToken, WatchCursor};
44use crate::stores::KvStore;
45
46/// The lease key's value: who holds (or last held) the round, until when, and
47/// — after [`LeaseGuard::complete`] — what was exported.
48#[derive(Debug, Clone, Serialize, Deserialize)]
49pub struct LeaseRecord {
50 /// Identity of the node that won the round (caller-chosen, e.g. node id).
51 pub holder_id: String,
52 /// When the round was won, seconds since the Unix epoch.
53 pub acquired_at_unix: u64,
54 /// When the lease lapses and the next round may be won. This is the round
55 /// period: the "at most once per `ttl`" bound.
56 pub expires_at_unix: u64,
57 /// Hex of the exported artifact's cursor, set by [`LeaseGuard::complete`].
58 #[serde(default, skip_serializing_if = "Option::is_none")]
59 pub completed_cursor_hex: Option<String>,
60 /// When the round completed (artifact uploaded), set by
61 /// [`LeaseGuard::complete`].
62 #[serde(default, skip_serializing_if = "Option::is_none")]
63 pub completed_at_unix: Option<u64>,
64}
65
66/// Coordinates "at most one export per round" across every replica of a fold.
67/// See the module docs for the CAS + embedded-expiry mechanism.
68pub struct ExportLease {
69 reader: Arc<dyn KvReader>,
70 writer: Arc<dyn KvWriter>,
71 key: String,
72 holder_id: String,
73}
74
75/// Proof of a won round, returned by [`ExportLease::try_acquire`].
76///
77/// There is nothing to release — the round runs until the lease expires (that
78/// is the round period). Call [`complete`](Self::complete) after the artifact
79/// is safely uploaded to publish what was exported.
80pub struct LeaseGuard {
81 writer: Arc<dyn KvWriter>,
82 key: String,
83 record: LeaseRecord,
84 version: VersionToken,
85 /// Set by [`complete`](Self::complete) / [`abandon`](Self::abandon). A
86 /// guard dropped without either (early `?`, cancelled future) leaks the
87 /// round — the fleet waits out the ttl — so [`Drop`] logs it.
88 resolved: bool,
89}
90
91impl ExportLease {
92 /// A lease on `key` in `store`, identifying this node as `holder_id`.
93 ///
94 /// Fails with [`KvError::OperationFailed`] if the store has no writer.
95 pub fn new(
96 store: &dyn KvStore,
97 key: impl Into<String>,
98 holder_id: impl Into<String>,
99 ) -> Result<Self, KvError> {
100 let writer = store.writer().ok_or_else(|| {
101 KvError::OperationFailed(format!(
102 "store {:?} has no writer; an export lease needs create/update",
103 store.name()
104 ))
105 })?;
106 Ok(Self {
107 reader: store.reader(),
108 writer,
109 key: key.into(),
110 holder_id: holder_id.into(),
111 })
112 }
113
114 /// Try to win this export round. Exactly one caller fleet-wide gets
115 /// `Ok(Some(guard))` per round; everyone else gets `Ok(None)` and skips.
116 ///
117 /// `ttl` is the round period: the winner's lease suppresses further rounds
118 /// until it lapses, whether or not the winner survives. Crash mid-round →
119 /// the key expires → the next trigger elects someone else.
120 pub async fn try_acquire(&self, ttl: Duration) -> Result<Option<LeaseGuard>, KvError> {
121 let now = unix_now();
122 let record = LeaseRecord {
123 holder_id: self.holder_id.clone(),
124 acquired_at_unix: now,
125 expires_at_unix: now.saturating_add(ttl.as_secs()),
126 completed_cursor_hex: None,
127 completed_at_unix: None,
128 };
129 let bytes =
130 serde_json::to_vec(&record).map_err(|e| KvError::SerializationError(e.to_string()))?;
131
132 // Fast path: the round is open — create-only, one winner.
133 match self.writer.create(&self.key, &bytes).await {
134 Ok(version) => {
135 debug!(key = %self.key, holder = %self.holder_id, "export lease acquired (create)");
136 return Ok(Some(self.guard(record, version)));
137 }
138 Err(KvError::AlreadyExists) => {}
139 Err(e) => return Err(e),
140 }
141
142 // The key exists. `entry` (not `get`): a CAS-deleted lease is an
143 // empty-value tombstone that `get` hides, but its version is exactly
144 // what the takeover CAS needs.
145 let Some(entry) = self.reader.entry(&self.key).await? else {
146 // Deleted between create and read; treat as lost — the next
147 // trigger retries cleanly rather than looping here.
148 return Ok(None);
149 };
150
151 // A live, parseable, unexpired lease wins; anything else (expired,
152 // tombstone, unparseable garbage) is taken over. Unparseable leases
153 // MUST be stealable or one corrupt write wedges exports fleet-wide.
154 if let Ok(existing) = serde_json::from_slice::<LeaseRecord>(&entry.value)
155 && existing.expires_at_unix > now
156 {
157 return Ok(None);
158 }
159
160 // Takeover: CAS against the version we read — one winner.
161 match self.writer.update(&self.key, &bytes, &entry.version).await {
162 Ok(version) => {
163 debug!(key = %self.key, holder = %self.holder_id, "export lease acquired (takeover)");
164 Ok(Some(self.guard(record, version)))
165 }
166 // Someone else's create/update landed first: their round.
167 Err(KvError::RevisionMismatch | KvError::AlreadyExists | KvError::KeyNotFound) => {
168 Ok(None)
169 }
170 Err(e) => Err(e),
171 }
172 }
173
174 /// Read the current lease record, if any — the fleet-visible "last export"
175 /// state. `None` when no round has ever run (or the key was tombstoned);
176 /// [`KvError::SerializationError`] when the key holds unparseable bytes —
177 /// distinct from `None` so an operator can see "present but corrupt" (a
178 /// state [`try_acquire`](Self::try_acquire) will repair by takeover) rather
179 /// than a false "never ran".
180 pub async fn current(&self) -> Result<Option<LeaseRecord>, KvError> {
181 match self.reader.get(&self.key).await? {
182 Some(entry) => serde_json::from_slice(&entry.value).map(Some).map_err(|e| {
183 KvError::SerializationError(format!(
184 "lease key {:?} holds an unparseable value: {e}",
185 self.key
186 ))
187 }),
188 None => Ok(None),
189 }
190 }
191
192 fn guard(&self, record: LeaseRecord, version: VersionToken) -> LeaseGuard {
193 LeaseGuard {
194 writer: Arc::clone(&self.writer),
195 key: self.key.clone(),
196 record,
197 version,
198 resolved: false,
199 }
200 }
201}
202
203impl LeaseGuard {
204 /// The record this guard wrote when it won the round.
205 pub fn record(&self) -> &LeaseRecord {
206 &self.record
207 }
208
209 /// Give the round back early: a failed export/upload should not suppress
210 /// the fleet for the rest of the ttl. CAS-deletes the lease (tombstone)
211 /// against this guard's version, so the next trigger on any node can win a
212 /// fresh round immediately.
213 ///
214 /// Best-effort: a CAS conflict (someone already took over) or write error
215 /// is logged, not surfaced — worst case the round waits out its ttl, which
216 /// is the no-abandon behavior anyway.
217 pub async fn abandon(mut self) {
218 self.resolved = true;
219 match self
220 .writer
221 .delete_with_version(&self.key, &self.version)
222 .await
223 {
224 Ok(_) => {
225 debug!(key = %self.key, holder = %self.record.holder_id, "export lease abandoned");
226 }
227 Err(e) => {
228 warn!(
229 key = %self.key,
230 holder = %self.record.holder_id,
231 error = %e,
232 "failed to abandon export lease; next round waits for expiry"
233 );
234 }
235 }
236 }
237
238 /// Publish the round's outcome: rewrite the lease value with the exported
239 /// cursor and completion time (expiry unchanged — the round still runs its
240 /// full period).
241 ///
242 /// Best-effort observability: a CAS conflict means the lease was already
243 /// taken over (this round overran its ttl) and is logged, not surfaced —
244 /// the artifact is already safe wherever the caller put it.
245 pub async fn complete(mut self, cursor: &WatchCursor) -> Result<(), KvError> {
246 self.resolved = true;
247 self.record.completed_cursor_hex = Some(hex_encode(cursor.version().as_bytes()));
248 self.record.completed_at_unix = Some(unix_now());
249 let bytes = serde_json::to_vec(&self.record)
250 .map_err(|e| KvError::SerializationError(e.to_string()))?;
251 match self.writer.update(&self.key, &bytes, &self.version).await {
252 Ok(_) => Ok(()),
253 Err(KvError::RevisionMismatch) => {
254 warn!(
255 key = %self.key,
256 holder = %self.record.holder_id,
257 "export round overran its lease; completion record skipped"
258 );
259 Ok(())
260 }
261 Err(e) => Err(e),
262 }
263 }
264}
265
266impl Drop for LeaseGuard {
267 fn drop(&mut self) {
268 if !self.resolved {
269 warn!(
270 key = %self.key,
271 holder = %self.record.holder_id,
272 "LeaseGuard dropped without complete() or abandon(); the fleet waits out the lease ttl"
273 );
274 }
275 }
276}
277
278fn unix_now() -> u64 {
279 match SystemTime::now().duration_since(UNIX_EPOCH) {
280 Ok(d) => d.as_secs(),
281 Err(_) => {
282 // A pre-epoch clock means every lease this node writes is already
283 // expired (`expires_at = 0 + ttl` is in the past), so any node can
284 // steal it — duplicate exports every round, which the lease design
285 // tolerates (dedup, not correctness). That direction is deliberate:
286 // the alternative sentinel (`u64::MAX`) would mint a never-expiring
287 // lease that wedges the fleet until manual cleanup. But it must not
288 // be silent — duplicate artifacts with no log line is undebuggable.
289 warn!(
290 "system clock predates the Unix epoch; lease expiry math degraded (expect duplicate export rounds until the clock is fixed)"
291 );
292 0
293 }
294 }
295}