Skip to main content

slipstream/
export_lease.rs

1//! Fleet-wide "export at most once per round" lease.
2//!
3//! Every replica of a fold runs the same checkpoint loop; without coordination,
4//! N nodes would produce N identical artifacts per round. [`ExportLease`] makes
5//! exactly one of them do the work: each candidate calls
6//! [`try_acquire`](ExportLease::try_acquire) when its trigger fires; one wins,
7//! the rest skip the round.
8//!
9//! ## Mechanism: CAS + embedded expiry — no TTL machinery
10//!
11//! The lease is a single KV key. Acquisition is a **create-only** write
12//! (`KvWriter::create`): exactly one caller fleet-wide can create a missing
13//! key, so the race has one winner by construction. The lease's lifetime is an
14//! `expires_at_unix` timestamp **inside the value** — not a server-side TTL —
15//! and an expired (or unparseable) lease is taken over with a CAS
16//! [`update`](crate::KvWriter::update) against the observed version, so the
17//! steal race also has exactly one winner.
18//!
19//! Embedding the expiry rather than using per-message TTL keeps the lease
20//! portable to any [`KvWriter`] backend and free of server-version/bucket-flag
21//! requirements. The cost is wall-clock comparison across nodes: with
22//! NTP-sane clocks and round periods measured in minutes, skew is noise — and
23//! a premature steal is *safe* anyway (two exporters produce two identical
24//! artifacts; the upload is last-write-wins on the same key). The lease is a
25//! work-deduplication optimization, never a correctness gate.
26//!
27//! ## Lifecycle
28//!
29//! A successful round leaves the key in place until it expires — that is the
30//! "at most once per `ttl`" semantic: `ttl` IS the round period. The winner
31//! calls [`LeaseGuard::complete`] after its upload succeeds, which (best
32//! effort) rewrites the value with the exported cursor and completion time, so
33//! the lease key doubles as the fleet-visible "last export" record. A crash
34//! mid-round simply lets the key expire; the next trigger elects someone else.
35
36use std::sync::Arc;
37use std::time::{Duration, SystemTime, UNIX_EPOCH};
38
39use serde::{Deserialize, Serialize};
40use tracing::{debug, warn};
41
42use crate::artifact::hex_encode;
43use crate::kv::{KvError, KvReader, KvWriter, VersionToken, WatchCursor};
44use crate::stores::KvStore;
45
46/// The lease key's value: who holds (or last held) the round, until when, and
47/// — after [`LeaseGuard::complete`] — what was exported.
48#[derive(Debug, Clone, Serialize, Deserialize)]
49pub struct LeaseRecord {
50    /// Identity of the node that won the round (caller-chosen, e.g. node id).
51    pub holder_id: String,
52    /// When the round was won, seconds since the Unix epoch.
53    pub acquired_at_unix: u64,
54    /// When the lease lapses and the next round may be won. This is the round
55    /// period: the "at most once per `ttl`" bound.
56    pub expires_at_unix: u64,
57    /// Hex of the exported artifact's cursor, set by [`LeaseGuard::complete`].
58    #[serde(default, skip_serializing_if = "Option::is_none")]
59    pub completed_cursor_hex: Option<String>,
60    /// When the round completed (artifact uploaded), set by
61    /// [`LeaseGuard::complete`].
62    #[serde(default, skip_serializing_if = "Option::is_none")]
63    pub completed_at_unix: Option<u64>,
64}
65
66/// Coordinates "at most one export per round" across every replica of a fold.
67/// See the module docs for the CAS + embedded-expiry mechanism.
68pub struct ExportLease {
69    reader: Arc<dyn KvReader>,
70    writer: Arc<dyn KvWriter>,
71    key: String,
72    holder_id: String,
73}
74
75/// Proof of a won round, returned by [`ExportLease::try_acquire`].
76///
77/// There is nothing to release — the round runs until the lease expires (that
78/// is the round period). Call [`complete`](Self::complete) after the artifact
79/// is safely uploaded to publish what was exported.
80pub struct LeaseGuard {
81    writer: Arc<dyn KvWriter>,
82    key: String,
83    record: LeaseRecord,
84    version: VersionToken,
85    /// Set by [`complete`](Self::complete) / [`abandon`](Self::abandon). A
86    /// guard dropped without either (early `?`, cancelled future) leaks the
87    /// round — the fleet waits out the ttl — so [`Drop`] logs it.
88    resolved: bool,
89}
90
91impl ExportLease {
92    /// A lease on `key` in `store`, identifying this node as `holder_id`.
93    ///
94    /// Fails with [`KvError::OperationFailed`] if the store has no writer.
95    pub fn new(
96        store: &dyn KvStore,
97        key: impl Into<String>,
98        holder_id: impl Into<String>,
99    ) -> Result<Self, KvError> {
100        let writer = store.writer().ok_or_else(|| {
101            KvError::OperationFailed(format!(
102                "store {:?} has no writer; an export lease needs create/update",
103                store.name()
104            ))
105        })?;
106        Ok(Self {
107            reader: store.reader(),
108            writer,
109            key: key.into(),
110            holder_id: holder_id.into(),
111        })
112    }
113
114    /// Try to win this export round. Exactly one caller fleet-wide gets
115    /// `Ok(Some(guard))` per round; everyone else gets `Ok(None)` and skips.
116    ///
117    /// `ttl` is the round period: the winner's lease suppresses further rounds
118    /// until it lapses, whether or not the winner survives. Crash mid-round →
119    /// the key expires → the next trigger elects someone else.
120    pub async fn try_acquire(&self, ttl: Duration) -> Result<Option<LeaseGuard>, KvError> {
121        let now = unix_now();
122        let record = LeaseRecord {
123            holder_id: self.holder_id.clone(),
124            acquired_at_unix: now,
125            expires_at_unix: now.saturating_add(ttl.as_secs()),
126            completed_cursor_hex: None,
127            completed_at_unix: None,
128        };
129        let bytes =
130            serde_json::to_vec(&record).map_err(|e| KvError::SerializationError(e.to_string()))?;
131
132        // Fast path: the round is open — create-only, one winner.
133        match self.writer.create(&self.key, &bytes).await {
134            Ok(version) => {
135                debug!(key = %self.key, holder = %self.holder_id, "export lease acquired (create)");
136                return Ok(Some(self.guard(record, version)));
137            }
138            Err(KvError::AlreadyExists) => {}
139            Err(e) => return Err(e),
140        }
141
142        // The key exists. `entry` (not `get`): a CAS-deleted lease is an
143        // empty-value tombstone that `get` hides, but its version is exactly
144        // what the takeover CAS needs.
145        let Some(entry) = self.reader.entry(&self.key).await? else {
146            // Deleted between create and read; treat as lost — the next
147            // trigger retries cleanly rather than looping here.
148            return Ok(None);
149        };
150
151        // A live, parseable, unexpired lease wins; anything else (expired,
152        // tombstone, unparseable garbage) is taken over. Unparseable leases
153        // MUST be stealable or one corrupt write wedges exports fleet-wide.
154        if let Ok(existing) = serde_json::from_slice::<LeaseRecord>(&entry.value)
155            && existing.expires_at_unix > now
156        {
157            return Ok(None);
158        }
159
160        // Takeover: CAS against the version we read — one winner.
161        match self.writer.update(&self.key, &bytes, &entry.version).await {
162            Ok(version) => {
163                debug!(key = %self.key, holder = %self.holder_id, "export lease acquired (takeover)");
164                Ok(Some(self.guard(record, version)))
165            }
166            // Someone else's create/update landed first: their round.
167            Err(KvError::RevisionMismatch | KvError::AlreadyExists | KvError::KeyNotFound) => {
168                Ok(None)
169            }
170            Err(e) => Err(e),
171        }
172    }
173
174    /// Read the current lease record, if any — the fleet-visible "last export"
175    /// state. `None` when no round has ever run (or the key was tombstoned);
176    /// [`KvError::SerializationError`] when the key holds unparseable bytes —
177    /// distinct from `None` so an operator can see "present but corrupt" (a
178    /// state [`try_acquire`](Self::try_acquire) will repair by takeover) rather
179    /// than a false "never ran".
180    pub async fn current(&self) -> Result<Option<LeaseRecord>, KvError> {
181        match self.reader.get(&self.key).await? {
182            Some(entry) => serde_json::from_slice(&entry.value).map(Some).map_err(|e| {
183                KvError::SerializationError(format!(
184                    "lease key {:?} holds an unparseable value: {e}",
185                    self.key
186                ))
187            }),
188            None => Ok(None),
189        }
190    }
191
192    fn guard(&self, record: LeaseRecord, version: VersionToken) -> LeaseGuard {
193        LeaseGuard {
194            writer: Arc::clone(&self.writer),
195            key: self.key.clone(),
196            record,
197            version,
198            resolved: false,
199        }
200    }
201}
202
203impl LeaseGuard {
204    /// The record this guard wrote when it won the round.
205    pub fn record(&self) -> &LeaseRecord {
206        &self.record
207    }
208
209    /// Give the round back early: a failed export/upload should not suppress
210    /// the fleet for the rest of the ttl. CAS-deletes the lease (tombstone)
211    /// against this guard's version, so the next trigger on any node can win a
212    /// fresh round immediately.
213    ///
214    /// Best-effort: a CAS conflict (someone already took over) or write error
215    /// is logged, not surfaced — worst case the round waits out its ttl, which
216    /// is the no-abandon behavior anyway.
217    pub async fn abandon(mut self) {
218        self.resolved = true;
219        match self
220            .writer
221            .delete_with_version(&self.key, &self.version)
222            .await
223        {
224            Ok(_) => {
225                debug!(key = %self.key, holder = %self.record.holder_id, "export lease abandoned");
226            }
227            Err(e) => {
228                warn!(
229                    key = %self.key,
230                    holder = %self.record.holder_id,
231                    error = %e,
232                    "failed to abandon export lease; next round waits for expiry"
233                );
234            }
235        }
236    }
237
238    /// Publish the round's outcome: rewrite the lease value with the exported
239    /// cursor and completion time (expiry unchanged — the round still runs its
240    /// full period).
241    ///
242    /// Best-effort observability: a CAS conflict means the lease was already
243    /// taken over (this round overran its ttl) and is logged, not surfaced —
244    /// the artifact is already safe wherever the caller put it.
245    pub async fn complete(mut self, cursor: &WatchCursor) -> Result<(), KvError> {
246        self.resolved = true;
247        self.record.completed_cursor_hex = Some(hex_encode(cursor.version().as_bytes()));
248        self.record.completed_at_unix = Some(unix_now());
249        let bytes = serde_json::to_vec(&self.record)
250            .map_err(|e| KvError::SerializationError(e.to_string()))?;
251        match self.writer.update(&self.key, &bytes, &self.version).await {
252            Ok(_) => Ok(()),
253            Err(KvError::RevisionMismatch) => {
254                warn!(
255                    key = %self.key,
256                    holder = %self.record.holder_id,
257                    "export round overran its lease; completion record skipped"
258                );
259                Ok(())
260            }
261            Err(e) => Err(e),
262        }
263    }
264}
265
266impl Drop for LeaseGuard {
267    fn drop(&mut self) {
268        if !self.resolved {
269            warn!(
270                key = %self.key,
271                holder = %self.record.holder_id,
272                "LeaseGuard dropped without complete() or abandon(); the fleet waits out the lease ttl"
273            );
274        }
275    }
276}
277
278fn unix_now() -> u64 {
279    match SystemTime::now().duration_since(UNIX_EPOCH) {
280        Ok(d) => d.as_secs(),
281        Err(_) => {
282            // A pre-epoch clock means every lease this node writes is already
283            // expired (`expires_at = 0 + ttl` is in the past), so any node can
284            // steal it — duplicate exports every round, which the lease design
285            // tolerates (dedup, not correctness). That direction is deliberate:
286            // the alternative sentinel (`u64::MAX`) would mint a never-expiring
287            // lease that wedges the fleet until manual cleanup. But it must not
288            // be silent — duplicate artifacts with no log line is undebuggable.
289            warn!(
290                "system clock predates the Unix epoch; lease expiry math degraded (expect duplicate export rounds until the clock is fixed)"
291            );
292            0
293        }
294    }
295}