Skip to main content

slipstream/
export_lease.rs

1//! Fleet-wide "export at most once per round" lease.
2//!
//! Every replica of a fold runs the same checkpoint loop; without coordination,
//! N nodes would each export every round — N redundant artifacts (each at its
//! own cursor) where one suffices. [`ExportLease`] makes
5//! exactly one of them do the work: each candidate calls
6//! [`try_acquire`](ExportLease::try_acquire) when its trigger fires; one wins,
7//! the rest skip the round.
8//!
9//! ## Mechanism: CAS + embedded expiry — no TTL machinery
10//!
11//! The lease is a single KV key. Acquisition is a **create-only** write
12//! (`KvWriter::create`): exactly one caller fleet-wide can create a missing
13//! key, so the race has one winner by construction. The lease's lifetime is an
14//! `expires_at_unix` timestamp **inside the value** — not a server-side TTL —
15//! and an expired (or unparseable) lease is taken over with a CAS
16//! [`update`](crate::KvWriter::update) against the observed version, so the
17//! steal race also has exactly one winner.
18//!
19//! Embedding the expiry rather than using per-message TTL keeps the lease
20//! portable to any [`KvWriter`] backend and free of server-version/bucket-flag
21//! requirements. The cost is wall-clock comparison across nodes: with
22//! NTP-sane clocks and round periods measured in minutes, skew is noise — and
23//! a premature steal is *safe* anyway, though NOT because concurrent
24//! exporters produce identical artifacts (they don't: each replica exports
25//! at its own applied cursor). Safety comes from the transport: payloads are
26//! content-addressed (concurrent uploads cannot collide or tear) and the
27//! published pointer only moves forward (an overrun round's stale artifact
28//! is refused, [`PublishOutcome::SupersededByNewer`](crate::PublishOutcome)),
29//! machine-checked in `tests/model.rs` with NO lease in the model at all.
30//! The lease is a work-deduplication optimization, never a correctness gate.
31//!
32//! ## Lifecycle
33//!
34//! A successful round leaves the key in place until it expires — that is the
35//! "at most once per `ttl`" semantic: `ttl` IS the round period. The winner
36//! calls [`LeaseGuard::complete`] after its upload succeeds, which (best
37//! effort) rewrites the value with the exported cursor and completion time, so
38//! the lease key doubles as the fleet-visible "last export" record. A crash
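//! mid-round simply lets the key expire; the next trigger elects someone else.
//! A winner whose export fails can instead call [`LeaseGuard::abandon`], which
//! tombstones the key so the next trigger can win a fresh round immediately.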
40
41use std::sync::Arc;
42use std::time::{Duration, SystemTime, UNIX_EPOCH};
43
44use serde::{Deserialize, Serialize};
45use tracing::{debug, warn};
46
47use crate::artifact::hex_encode;
48use crate::kv::{KvError, KvReader, KvWriter, VersionToken, WatchCursor};
49use crate::stores::KvStore;
50
51/// The lease key's value: who holds (or last held) the round, until when, and
52/// — after [`LeaseGuard::complete`] — what was exported.
53#[derive(Debug, Clone, Serialize, Deserialize)]
54pub struct LeaseRecord {
55    /// Identity of the node that won the round (caller-chosen, e.g. node id).
56    pub holder_id: String,
57    /// When the round was won, seconds since the Unix epoch.
58    pub acquired_at_unix: u64,
59    /// When the lease lapses and the next round may be won. This is the round
60    /// period: the "at most once per `ttl`" bound.
61    pub expires_at_unix: u64,
62    /// Hex of the exported artifact's cursor, set by [`LeaseGuard::complete`].
63    #[serde(default, skip_serializing_if = "Option::is_none")]
64    pub completed_cursor_hex: Option<String>,
65    /// When the round completed (artifact uploaded), set by
66    /// [`LeaseGuard::complete`].
67    #[serde(default, skip_serializing_if = "Option::is_none")]
68    pub completed_at_unix: Option<u64>,
69}
70
71/// Coordinates "at most one export per round" across every replica of a fold.
72/// See the module docs for the CAS + embedded-expiry mechanism.
73pub struct ExportLease {
74    reader: Arc<dyn KvReader>,
75    writer: Arc<dyn KvWriter>,
76    key: String,
77    holder_id: String,
78}
79
80/// Proof of a won round, returned by [`ExportLease::try_acquire`].
81///
82/// There is nothing to release — the round runs until the lease expires (that
83/// is the round period). Call [`complete`](Self::complete) after the artifact
84/// is safely uploaded to publish what was exported.
85pub struct LeaseGuard {
86    writer: Arc<dyn KvWriter>,
87    key: String,
88    record: LeaseRecord,
89    version: VersionToken,
90    /// Set by [`complete`](Self::complete) / [`abandon`](Self::abandon). A
91    /// guard dropped without either (early `?`, cancelled future) leaks the
92    /// round — the fleet waits out the ttl — so [`Drop`] logs it.
93    resolved: bool,
94}
95
96impl ExportLease {
97    /// A lease on `key` in `store`, identifying this node as `holder_id`.
98    ///
99    /// Fails with [`KvError::OperationFailed`] if the store has no writer.
100    pub fn new(
101        store: &dyn KvStore,
102        key: impl Into<String>,
103        holder_id: impl Into<String>,
104    ) -> Result<Self, KvError> {
105        let writer = store.writer().ok_or_else(|| {
106            KvError::OperationFailed(format!(
107                "store {:?} has no writer; an export lease needs create/update",
108                store.name()
109            ))
110        })?;
111        Ok(Self {
112            reader: store.reader(),
113            writer,
114            key: key.into(),
115            holder_id: holder_id.into(),
116        })
117    }
118
119    /// Try to win this export round. Exactly one caller fleet-wide gets
120    /// `Ok(Some(guard))` per round; everyone else gets `Ok(None)` and skips.
121    ///
122    /// `ttl` is the round period: the winner's lease suppresses further rounds
123    /// until it lapses, whether or not the winner survives. Crash mid-round →
124    /// the key expires → the next trigger elects someone else.
125    pub async fn try_acquire(&self, ttl: Duration) -> Result<Option<LeaseGuard>, KvError> {
126        let now = unix_now();
127        let record = LeaseRecord {
128            holder_id: self.holder_id.clone(),
129            acquired_at_unix: now,
130            expires_at_unix: now.saturating_add(ttl.as_secs()),
131            completed_cursor_hex: None,
132            completed_at_unix: None,
133        };
134        let bytes =
135            serde_json::to_vec(&record).map_err(|e| KvError::SerializationError(e.to_string()))?;
136
137        // Fast path: the round is open — create-only, one winner.
138        match self.writer.create(&self.key, &bytes).await {
139            Ok(version) => {
140                debug!(key = %self.key, holder = %self.holder_id, "export lease acquired (create)");
141                return Ok(Some(self.guard(record, version)));
142            }
143            Err(KvError::AlreadyExists) => {}
144            Err(e) => return Err(e),
145        }
146
147        // The key exists. `entry` (not `get`): a CAS-deleted lease is an
148        // empty-value tombstone that `get` hides, but its version is exactly
149        // what the takeover CAS needs.
150        let Some(entry) = self.reader.entry(&self.key).await? else {
151            // Deleted between create and read; treat as lost — the next
152            // trigger retries cleanly rather than looping here.
153            return Ok(None);
154        };
155
156        // A live, parseable, unexpired lease wins; anything else (expired,
157        // tombstone, unparseable garbage) is taken over. Unparseable leases
158        // MUST be stealable or one corrupt write wedges exports fleet-wide.
159        if let Ok(existing) = serde_json::from_slice::<LeaseRecord>(&entry.value)
160            && existing.expires_at_unix > now
161        {
162            return Ok(None);
163        }
164
165        // Takeover: CAS against the version we read — one winner.
166        match self.writer.update(&self.key, &bytes, &entry.version).await {
167            Ok(version) => {
168                debug!(key = %self.key, holder = %self.holder_id, "export lease acquired (takeover)");
169                Ok(Some(self.guard(record, version)))
170            }
171            // Someone else's create/update landed first: their round.
172            Err(KvError::RevisionMismatch | KvError::AlreadyExists | KvError::KeyNotFound) => {
173                Ok(None)
174            }
175            Err(e) => Err(e),
176        }
177    }
178
179    /// Read the current lease record, if any — the fleet-visible "last export"
180    /// state. `None` when no round has ever run (or the key was tombstoned);
181    /// [`KvError::SerializationError`] when the key holds unparseable bytes —
182    /// distinct from `None` so an operator can see "present but corrupt" (a
183    /// state [`try_acquire`](Self::try_acquire) will repair by takeover) rather
184    /// than a false "never ran".
185    pub async fn current(&self) -> Result<Option<LeaseRecord>, KvError> {
186        match self.reader.get(&self.key).await? {
187            Some(entry) => serde_json::from_slice(&entry.value).map(Some).map_err(|e| {
188                KvError::SerializationError(format!(
189                    "lease key {:?} holds an unparseable value: {e}",
190                    self.key
191                ))
192            }),
193            None => Ok(None),
194        }
195    }
196
197    fn guard(&self, record: LeaseRecord, version: VersionToken) -> LeaseGuard {
198        LeaseGuard {
199            writer: Arc::clone(&self.writer),
200            key: self.key.clone(),
201            record,
202            version,
203            resolved: false,
204        }
205    }
206}
207
208impl LeaseGuard {
209    /// The record this guard wrote when it won the round.
210    pub fn record(&self) -> &LeaseRecord {
211        &self.record
212    }
213
214    /// Give the round back early: a failed export/upload should not suppress
215    /// the fleet for the rest of the ttl. CAS-deletes the lease (tombstone)
216    /// against this guard's version, so the next trigger on any node can win a
217    /// fresh round immediately.
218    ///
219    /// Best-effort: a CAS conflict (someone already took over) or write error
220    /// is logged, not surfaced — worst case the round waits out its ttl, which
221    /// is the no-abandon behavior anyway.
222    ///
223    /// An unawaited `abandon()` does nothing (and would trip the [`Drop`]
224    /// warning); the future's inherent `#[must_use]` plus this crate's
225    /// `deny(unused_must_use)` make that a compile error, not a silent leak.
226    pub async fn abandon(mut self) {
227        self.resolved = true;
228        match self
229            .writer
230            .delete_with_version(&self.key, &self.version)
231            .await
232        {
233            Ok(_) => {
234                debug!(key = %self.key, holder = %self.record.holder_id, "export lease abandoned");
235            }
236            Err(e) => {
237                warn!(
238                    key = %self.key,
239                    holder = %self.record.holder_id,
240                    error = %e,
241                    "failed to abandon export lease; next round waits for expiry"
242                );
243            }
244        }
245    }
246
247    /// Publish the round's outcome: rewrite the lease value with the exported
248    /// cursor and completion time (expiry unchanged — the round still runs its
249    /// full period).
250    ///
251    /// Best-effort observability: a CAS conflict means the lease was already
252    /// taken over (this round overran its ttl) and is logged, not surfaced —
253    /// the artifact is already safe wherever the caller put it.
254    ///
255    /// An unawaited `complete()` records nothing (and would trip the [`Drop`]
256    /// warning); the future's inherent `#[must_use]` plus this crate's
257    /// `deny(unused_must_use)` make that a compile error, not a silent leak.
258    pub async fn complete(mut self, cursor: &WatchCursor) -> Result<(), KvError> {
259        self.resolved = true;
260        self.record.completed_cursor_hex = Some(hex_encode(cursor.version().as_bytes()));
261        self.record.completed_at_unix = Some(unix_now());
262        let bytes = serde_json::to_vec(&self.record)
263            .map_err(|e| KvError::SerializationError(e.to_string()))?;
264        match self.writer.update(&self.key, &bytes, &self.version).await {
265            Ok(_) => Ok(()),
266            Err(KvError::RevisionMismatch) => {
267                warn!(
268                    key = %self.key,
269                    holder = %self.record.holder_id,
270                    "export round overran its lease; completion record skipped"
271                );
272                Ok(())
273            }
274            Err(e) => Err(e),
275        }
276    }
277}
278
279impl Drop for LeaseGuard {
280    fn drop(&mut self) {
281        if !self.resolved {
282            warn!(
283                key = %self.key,
284                holder = %self.record.holder_id,
285                "LeaseGuard dropped without complete() or abandon(); the fleet waits out the lease ttl"
286            );
287        }
288    }
289}
290
291fn unix_now() -> u64 {
292    match SystemTime::now().duration_since(UNIX_EPOCH) {
293        Ok(d) => d.as_secs(),
294        Err(_) => {
295            // A pre-epoch clock means every lease this node writes is already
296            // expired (`expires_at = 0 + ttl` is in the past), so any node can
297            // steal it — duplicate exports every round, which the lease design
298            // tolerates (dedup, not correctness). That direction is deliberate:
299            // the alternative sentinel (`u64::MAX`) would mint a never-expiring
300            // lease that wedges the fleet until manual cleanup. But it must not
301            // be silent — duplicate artifacts with no log line is undebuggable.
302            warn!(
303                "system clock predates the Unix epoch; lease expiry math degraded (expect duplicate export rounds until the clock is fixed)"
304            );
305            0
306        }
307    }
308}