Skip to main content

ff_backend_postgres/
worker_registry.rs

1//! RFC-025 Phase 3 — Postgres worker-registry bodies + TTL-sweep.
2//!
3//! Backs the five `EngineBackend` worker-registry methods
4//! (`register_worker`, `heartbeat_worker`, `mark_worker_dead`,
5//! `list_expired_leases`, `list_workers`) on Postgres, plus the
6//! `worker_registry_ttl_sweep` reconciler that prunes rows whose
7//! `last_heartbeat_ms + liveness_ttl_ms` has elapsed.
8//!
9//! Schema shape lives in `migrations/0021_worker_registry.sql`
10//! (current-state table + append-only event log). Partition key is
11//! `(fnv1a_u64(worker_instance_id.as_bytes()) % 256) as i16` — identical
12//! to the Valkey-side hashing so `register` / `heartbeat` /
13//! `mark_dead` / `ttl_sweep` all land on the same row for the same
14//! instance id.
15
16use std::collections::BTreeSet;
17
18use sqlx::{PgPool, Row};
19use uuid::Uuid;
20
21use ff_core::contracts::{
22    ExpiredLeaseInfo, ExpiredLeasesCursor, HeartbeatWorkerArgs, HeartbeatWorkerOutcome,
23    LIST_EXPIRED_LEASES_DEFAULT_LIMIT, LIST_EXPIRED_LEASES_MAX_LIMIT, ListExpiredLeasesArgs,
24    ListExpiredLeasesResult, ListWorkersArgs, ListWorkersResult, MARK_WORKER_DEAD_REASON_MAX_BYTES,
25    MarkWorkerDeadArgs, MarkWorkerDeadOutcome, RegisterWorkerArgs, RegisterWorkerOutcome,
26    WorkerInfo,
27};
28use ff_core::engine_error::{EngineError, ValidationKind};
29use ff_core::hash::fnv1a_u64;
30use ff_core::types::{
31    AttemptIndex, ExecutionId, LaneId, LeaseEpoch, LeaseId, Namespace, TimestampMs, WorkerId,
32    WorkerInstanceId,
33};
34
35use crate::error::map_sqlx_error;
36use crate::reconcilers::ScanReport;
37
38/// Derive the `ff_worker_registry.partition_key` for an instance id.
39/// Must match the Valkey-side hashing (both paths own the same row).
40pub fn worker_partition_key(worker_instance_id: &str) -> i16 {
41    (fnv1a_u64(worker_instance_id.as_bytes()) % 256) as i16
42}
43
44/// Postgres has no stable `lease_id` column on `ff_attempt` (identity
45/// is `(execution_id, attempt_index, lease_epoch)`; see
46/// `operator::synthetic_lease_id`). For `list_expired_leases` we need
47/// to surface a `LeaseId` — derive a deterministic UUID from the same
48/// triple so two callers that observe the same lease see the same id.
49fn synthetic_lease_uuid(exec_uuid: Uuid, attempt_index: i32, lease_epoch: i64) -> Uuid {
50    // 16 bytes of seed material: exec UUID (16) XOR-folded with
51    // attempt_index (4) + lease_epoch (8) packed into the tail. Not a
52    // cryptographic derivation — just stable + collision-resistant
53    // enough within a single execution's attempts.
54    let mut bytes = *exec_uuid.as_bytes();
55    let ai = attempt_index.to_be_bytes();
56    let le = lease_epoch.to_be_bytes();
57    for i in 0..4 {
58        bytes[8 + i] ^= ai[i];
59    }
60    for i in 0..8 {
61        bytes[i] ^= le[i];
62    }
63    Uuid::from_bytes(bytes)
64}
65
66// ── register_worker ──────────────────────────────────────────────
67
68pub async fn register_worker(
69    pool: &PgPool,
70    args: RegisterWorkerArgs,
71) -> Result<RegisterWorkerOutcome, EngineError> {
72    let partition_key = worker_partition_key(args.worker_instance_id.as_str());
73    let lanes_sorted: Vec<String> = args.lanes.iter().map(|l| l.0.clone()).collect();
74    let caps_csv: String = args
75        .capabilities
76        .iter()
77        .cloned()
78        .collect::<Vec<String>>()
79        .join(",");
80
81    // Single tx: (1) reject instance_id reassignment under a different
82    // `worker_id`; (2) upsert the row + return whether it was a fresh
83    // insert or an overwrite (`xmax = 0` returns true iff the row was
84    // inserted); (3) append a `registered` event.
85    let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
86
87    // Conflict check — if a row exists with the same (partition_key,
88    // namespace, worker_instance_id) but a different `worker_id`,
89    // reject per RFC-025 §4.
90    let existing_worker_id: Option<String> = sqlx::query_scalar(
91        "SELECT worker_id FROM ff_worker_registry \
92         WHERE partition_key = $1 AND namespace = $2 AND worker_instance_id = $3 \
93         FOR UPDATE",
94    )
95    .bind(partition_key)
96    .bind(args.namespace.as_str())
97    .bind(args.worker_instance_id.as_str())
98    .fetch_optional(&mut *tx)
99    .await
100    .map_err(map_sqlx_error)?;
101
102    if let Some(existing) = existing_worker_id.as_deref()
103        && existing != args.worker_id.as_str()
104    {
105        return Err(EngineError::Validation {
106            kind: ValidationKind::InvalidInput,
107            detail: "instance_id reassigned".into(),
108        });
109    }
110
111    // `RETURNING (xmax = 0)` is the idiomatic "was this an insert?"
112    // signal on Postgres UPSERTs: `xmax` is 0 on an insert, non-zero
113    // on an update-path through ON CONFLICT.
114    let registered: bool = sqlx::query_scalar(
115        "INSERT INTO ff_worker_registry (\
116             partition_key, namespace, worker_instance_id, worker_id, \
117             lanes, capabilities_csv, last_heartbeat_ms, liveness_ttl_ms, \
118             registered_at_ms\
119         ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) \
120         ON CONFLICT (partition_key, namespace, worker_instance_id) DO UPDATE SET \
121             worker_id = EXCLUDED.worker_id, \
122             lanes = EXCLUDED.lanes, \
123             capabilities_csv = EXCLUDED.capabilities_csv, \
124             last_heartbeat_ms = EXCLUDED.last_heartbeat_ms, \
125             liveness_ttl_ms = EXCLUDED.liveness_ttl_ms \
126         RETURNING (xmax = 0)",
127    )
128    .bind(partition_key)
129    .bind(args.namespace.as_str())
130    .bind(args.worker_instance_id.as_str())
131    .bind(args.worker_id.as_str())
132    .bind(&lanes_sorted)
133    .bind(caps_csv.as_str())
134    .bind(args.now.0)
135    .bind(args.liveness_ttl_ms as i64)
136    .bind(args.now.0)
137    .fetch_one(&mut *tx)
138    .await
139    .map_err(map_sqlx_error)?;
140
141    sqlx::query(
142        "INSERT INTO ff_worker_registry_event \
143             (partition_key, namespace, worker_instance_id, event_kind, event_at_ms, reason) \
144         VALUES ($1, $2, $3, 'registered', $4, NULL) \
145         ON CONFLICT DO NOTHING",
146    )
147    .bind(partition_key)
148    .bind(args.namespace.as_str())
149    .bind(args.worker_instance_id.as_str())
150    .bind(args.now.0)
151    .execute(&mut *tx)
152    .await
153    .map_err(map_sqlx_error)?;
154
155    tx.commit().await.map_err(map_sqlx_error)?;
156
157    Ok(if registered {
158        RegisterWorkerOutcome::Registered
159    } else {
160        RegisterWorkerOutcome::Refreshed
161    })
162}
163
164// ── heartbeat_worker ─────────────────────────────────────────────
165
166pub async fn heartbeat_worker(
167    pool: &PgPool,
168    args: HeartbeatWorkerArgs,
169) -> Result<HeartbeatWorkerOutcome, EngineError> {
170    let partition_key = worker_partition_key(args.worker_instance_id.as_str());
171
172    // Refresh `last_heartbeat_ms` only when the row is still within
173    // its declared TTL window. If the TTL has lapsed, the TTL-sweep
174    // scanner will drop the row on its next tick — surface
175    // `NotRegistered` rather than re-upping a logically-dead worker.
176    let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
177
178    let ttl_row: Option<i64> = sqlx::query_scalar(
179        "UPDATE ff_worker_registry SET last_heartbeat_ms = $4 \
180         WHERE partition_key = $1 AND namespace = $2 AND worker_instance_id = $3 \
181           AND last_heartbeat_ms + liveness_ttl_ms > $4 \
182         RETURNING liveness_ttl_ms",
183    )
184    .bind(partition_key)
185    .bind(args.namespace.as_str())
186    .bind(args.worker_instance_id.as_str())
187    .bind(args.now.0)
188    .fetch_optional(&mut *tx)
189    .await
190    .map_err(map_sqlx_error)?;
191
192    let Some(ttl_ms) = ttl_row else {
193        tx.commit().await.map_err(map_sqlx_error)?;
194        return Ok(HeartbeatWorkerOutcome::NotRegistered);
195    };
196
197    sqlx::query(
198        "INSERT INTO ff_worker_registry_event \
199             (partition_key, namespace, worker_instance_id, event_kind, event_at_ms, reason) \
200         VALUES ($1, $2, $3, 'heartbeat', $4, NULL) \
201         ON CONFLICT DO NOTHING",
202    )
203    .bind(partition_key)
204    .bind(args.namespace.as_str())
205    .bind(args.worker_instance_id.as_str())
206    .bind(args.now.0)
207    .execute(&mut *tx)
208    .await
209    .map_err(map_sqlx_error)?;
210
211    tx.commit().await.map_err(map_sqlx_error)?;
212
213    let next_expiry_ms = TimestampMs::from_millis(args.now.0.saturating_add(ttl_ms));
214    Ok(HeartbeatWorkerOutcome::Refreshed { next_expiry_ms })
215}
216
217// ── mark_worker_dead ─────────────────────────────────────────────
218
219pub async fn mark_worker_dead(
220    pool: &PgPool,
221    args: MarkWorkerDeadArgs,
222) -> Result<MarkWorkerDeadOutcome, EngineError> {
223    // Mirrors the Valkey body's validation (§Rev-2 item 9): 256-byte
224    // reason cap + no control characters. Reject oversize / invalid
225    // before touching storage.
226    if args.reason.len() > MARK_WORKER_DEAD_REASON_MAX_BYTES {
227        return Err(EngineError::Validation {
228            kind: ValidationKind::InvalidInput,
229            detail: format!(
230                "reason: exceeds {} bytes (got {})",
231                MARK_WORKER_DEAD_REASON_MAX_BYTES,
232                args.reason.len()
233            ),
234        });
235    }
236    if args.reason.chars().any(|c| c.is_control()) {
237        return Err(EngineError::Validation {
238            kind: ValidationKind::InvalidInput,
239            detail: "reason: must not contain control characters".into(),
240        });
241    }
242
243    let partition_key = worker_partition_key(args.worker_instance_id.as_str());
244
245    let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
246
247    let deleted: i64 = sqlx::query_scalar(
248        "WITH d AS (\
249             DELETE FROM ff_worker_registry \
250             WHERE partition_key = $1 AND namespace = $2 AND worker_instance_id = $3 \
251             RETURNING 1 AS x\
252         ) SELECT COUNT(*) FROM d",
253    )
254    .bind(partition_key)
255    .bind(args.namespace.as_str())
256    .bind(args.worker_instance_id.as_str())
257    .fetch_one(&mut *tx)
258    .await
259    .map_err(map_sqlx_error)?;
260
261    if deleted == 0 {
262        tx.commit().await.map_err(map_sqlx_error)?;
263        return Ok(MarkWorkerDeadOutcome::NotRegistered);
264    }
265
266    sqlx::query(
267        "INSERT INTO ff_worker_registry_event \
268             (partition_key, namespace, worker_instance_id, event_kind, event_at_ms, reason) \
269         VALUES ($1, $2, $3, 'marked_dead', $4, $5) \
270         ON CONFLICT DO NOTHING",
271    )
272    .bind(partition_key)
273    .bind(args.namespace.as_str())
274    .bind(args.worker_instance_id.as_str())
275    .bind(args.now.0)
276    .bind(args.reason.as_str())
277    .execute(&mut *tx)
278    .await
279    .map_err(map_sqlx_error)?;
280
281    tx.commit().await.map_err(map_sqlx_error)?;
282    Ok(MarkWorkerDeadOutcome::Marked)
283}
284
285// ── list_expired_leases ──────────────────────────────────────────
286
287pub async fn list_expired_leases(
288    pool: &PgPool,
289    args: ListExpiredLeasesArgs,
290) -> Result<ListExpiredLeasesResult, EngineError> {
291    let limit = args
292        .limit
293        .unwrap_or(LIST_EXPIRED_LEASES_DEFAULT_LIMIT)
294        .min(LIST_EXPIRED_LEASES_MAX_LIMIT) as i64;
295    // `max_partitions_per_call` is a Valkey ZSET fan-out knob; on PG
296    // the partial index `ff_attempt_lease_expiry_idx` already covers
297    // all partitions in one scan, so the value is accepted-and-ignored.
298    let _ = args.max_partitions_per_call;
299
300    // Cursor tuple: `(expires_at_ms, execution_id)` strict-greater
301    // than `(cursor.expires_at_ms, cursor.execution_id)` so pagination
302    // is stable under equal-expiry.
303    let (cursor_expiry, cursor_eid_str): (Option<i64>, Option<String>) = match args.after.as_ref() {
304        Some(c) => (Some(c.expires_at_ms.0), Some(c.execution_id.to_string())),
305        None => (None, None),
306    };
307    let namespace_filter: Option<&str> = args.namespace.as_ref().map(|n| n.as_str());
308
309    // Join `ff_attempt` (lease state) with `ff_exec_core` (namespace
310    // filter + partition-prefix reconstruction of the ExecutionId
311    // wire form). `ff_attempt_lease_expiry_idx` (partial, from
312    // migration 0001) keys the scan on `(partition_key,
313    // lease_expires_at_ms)`. Cross-partition order is enforced at
314    // the `ORDER BY` level.
315    // `ff_exec_core` has no `namespace` column — namespace lives in
316    // `raw_fields jsonb` (see migrations/0001_initial.sql:98-122).
317    // Phase 3 referenced `c.namespace` directly which would crash at
318    // runtime; `raw_fields->>'namespace'` is the authoritative read
319    // path.
320    let rows = sqlx::query(
321        "SELECT a.partition_key, a.execution_id, a.attempt_index, a.lease_epoch, \
322                a.worker_instance_id, a.lease_expires_at_ms \
323           FROM ff_attempt a \
324           JOIN ff_exec_core c \
325             ON c.partition_key = a.partition_key AND c.execution_id = a.execution_id \
326          WHERE a.lease_expires_at_ms IS NOT NULL \
327            AND a.lease_expires_at_ms <= $1 \
328            AND a.worker_instance_id IS NOT NULL \
329            AND c.public_state IN ('claimed', 'running') \
330            AND ($2::text IS NULL OR c.raw_fields->>'namespace' = $2) \
331            AND ($3::bigint IS NULL \
332                 OR (a.lease_expires_at_ms, a.execution_id::text) > ($3, $4)) \
333          ORDER BY a.lease_expires_at_ms ASC, a.execution_id ASC \
334          LIMIT $5",
335    )
336    .bind(args.as_of.0)
337    .bind(namespace_filter)
338    .bind(cursor_expiry)
339    .bind(cursor_eid_str.as_deref().unwrap_or(""))
340    .bind(limit)
341    .fetch_all(pool)
342    .await
343    .map_err(map_sqlx_error)?;
344
345    let mut entries: Vec<ExpiredLeaseInfo> = Vec::with_capacity(rows.len());
346    for row in &rows {
347        let partition_key: i16 = row.try_get("partition_key").map_err(map_sqlx_error)?;
348        let exec_uuid: Uuid = row.try_get("execution_id").map_err(map_sqlx_error)?;
349        let attempt_index: i32 = row.try_get("attempt_index").map_err(map_sqlx_error)?;
350        let lease_epoch: i64 = row.try_get("lease_epoch").map_err(map_sqlx_error)?;
351        let worker_inst: String = row.try_get("worker_instance_id").map_err(map_sqlx_error)?;
352        let expires_at_ms: i64 = row.try_get("lease_expires_at_ms").map_err(map_sqlx_error)?;
353
354        let eid_str = format!("{{fp:{}}}:{}", partition_key, exec_uuid);
355        let execution_id = ExecutionId::parse(&eid_str).map_err(|e| EngineError::Validation {
356            kind: ValidationKind::Corruption,
357            detail: format!("list_expired_leases: bad execution_id {eid_str:?}: {e}"),
358        })?;
359
360        let lease_id = LeaseId::from_uuid(synthetic_lease_uuid(exec_uuid, attempt_index, lease_epoch));
361        let attempt_index_u = u32::try_from(attempt_index.max(0)).unwrap_or(0);
362        let lease_epoch_u = u64::try_from(lease_epoch.max(0)).unwrap_or(0);
363
364        entries.push(ExpiredLeaseInfo::new(
365            execution_id,
366            lease_id,
367            LeaseEpoch::new(lease_epoch_u),
368            WorkerInstanceId::new(worker_inst),
369            TimestampMs::from_millis(expires_at_ms),
370            AttemptIndex::new(attempt_index_u),
371        ));
372    }
373
374    let page_full = rows.len() as i64 >= limit;
375    let cursor = if page_full {
376        entries
377            .last()
378            .map(|e| ExpiredLeasesCursor::new(e.expires_at_ms, e.execution_id.clone()))
379    } else {
380        None
381    };
382    Ok(ListExpiredLeasesResult::new(entries, cursor))
383}
384
385// ── list_workers ─────────────────────────────────────────────────
386
387pub async fn list_workers(
388    pool: &PgPool,
389    args: ListWorkersArgs,
390) -> Result<ListWorkersResult, EngineError> {
391    // RFC-025 §9.4: cross-namespace enumeration requires a two-key
392    // cursor (namespace, worker_instance_id) because instance_ids
393    // collide across namespaces. The Phase-1 contract's cursor is
394    // single-key `Option<WorkerInstanceId>`; expanding to a tuple
395    // would reopen Phase 1. Reject cross-ns with `Unavailable` to
396    // match Phase-2 Valkey behaviour; operator tooling can loop per
397    // namespace until the tuple-cursor RFC lands.
398    let Some(ns) = args.namespace.as_ref() else {
399        return Err(EngineError::Unavailable {
400            op: "list_workers (cross-namespace on Postgres — pass namespace explicitly)",
401        });
402    };
403    let limit = args.limit.unwrap_or(1000) as i64;
404    let namespace_filter: &str = ns.as_str();
405    let after_cursor: Option<&str> = args.after.as_ref().map(|w| w.as_str());
406
407    let rows = sqlx::query(
408        "SELECT worker_id, worker_instance_id, namespace, lanes, \
409                capabilities_csv, last_heartbeat_ms, liveness_ttl_ms, registered_at_ms \
410           FROM ff_worker_registry \
411          WHERE namespace = $1 \
412            AND ($2::text IS NULL OR worker_instance_id > $2) \
413          ORDER BY worker_instance_id ASC \
414          LIMIT $3",
415    )
416    .bind(namespace_filter)
417    .bind(after_cursor)
418    .bind(limit)
419    .fetch_all(pool)
420    .await
421    .map_err(map_sqlx_error)?;
422
423    let mut entries: Vec<WorkerInfo> = Vec::with_capacity(rows.len());
424    for row in &rows {
425        let worker_id: String = row.try_get("worker_id").map_err(map_sqlx_error)?;
426        let worker_inst: String = row.try_get("worker_instance_id").map_err(map_sqlx_error)?;
427        let namespace: String = row.try_get("namespace").map_err(map_sqlx_error)?;
428        let lanes_vec: Vec<String> = row.try_get("lanes").map_err(map_sqlx_error)?;
429        let caps_csv: String = row.try_get("capabilities_csv").map_err(map_sqlx_error)?;
430        let last_hb_ms: i64 = row.try_get("last_heartbeat_ms").map_err(map_sqlx_error)?;
431        let liveness_ttl_ms: i64 = row.try_get("liveness_ttl_ms").map_err(map_sqlx_error)?;
432        let registered_at_ms: i64 = row.try_get("registered_at_ms").map_err(map_sqlx_error)?;
433
434        let lanes: BTreeSet<LaneId> = lanes_vec.into_iter().map(LaneId).collect();
435        let capabilities: BTreeSet<String> = caps_csv
436            .split(',')
437            .filter(|s| !s.is_empty())
438            .map(str::to_owned)
439            .collect();
440
441        entries.push(WorkerInfo::new(
442            WorkerId::new(worker_id),
443            WorkerInstanceId::new(worker_inst),
444            Namespace::new(namespace),
445            lanes,
446            capabilities,
447            TimestampMs::from_millis(last_hb_ms),
448            u64::try_from(liveness_ttl_ms.max(0)).unwrap_or(0),
449            TimestampMs::from_millis(registered_at_ms),
450        ));
451    }
452
453    let page_full = rows.len() as i64 >= limit;
454    let cursor = if page_full {
455        entries.last().map(|w| w.worker_instance_id.clone())
456    } else {
457        None
458    };
459    Ok(ListWorkersResult::new(entries, cursor))
460}
461
462// ── TTL-sweep scanner ────────────────────────────────────────────
463
464/// Per-partition TTL sweep. Mirrors Valkey's native PEXPIRE behaviour
465/// for PG/SQLite: rows whose `last_heartbeat_ms + liveness_ttl_ms`
466/// falls strictly below `now_ms` are deleted and a `ttl_swept` event
467/// is appended to the audit log.
468///
469/// Returns a `ScanReport` for the supervisor's per-tick log.
470pub async fn ttl_sweep_tick(pool: &PgPool, partition_key: i16) -> Result<ScanReport, EngineError> {
471    let now_ms: i64 = i64::try_from(
472        std::time::SystemTime::now()
473            .duration_since(std::time::UNIX_EPOCH)
474            .map(|d| d.as_millis())
475            .unwrap_or(0),
476    )
477    .unwrap_or(i64::MAX);
478
479    // DELETE + append-to-event-log in a single statement via CTE so
480    // the sweep is atomic per row: never a row deleted without an
481    // event, never an event without a delete. Concurrent
482    // `mark_worker_dead` on the same row deletes first + writes its
483    // own event; the sweep's DELETE finds zero rows, the INSERT-SELECT
484    // inserts zero rows. Idempotent.
485    let report_rows = sqlx::query(
486        "WITH swept AS (\
487             DELETE FROM ff_worker_registry \
488             WHERE partition_key = $1 \
489               AND last_heartbeat_ms + liveness_ttl_ms < $2 \
490             RETURNING partition_key, namespace, worker_instance_id\
491         ), ev AS (\
492             INSERT INTO ff_worker_registry_event \
493                 (partition_key, namespace, worker_instance_id, event_kind, event_at_ms, reason) \
494             SELECT partition_key, namespace, worker_instance_id, 'ttl_swept', $2, NULL \
495               FROM swept \
496             ON CONFLICT DO NOTHING \
497             RETURNING 1\
498         ) SELECT COUNT(*) AS swept FROM swept",
499    )
500    .bind(partition_key)
501    .bind(now_ms)
502    .fetch_one(pool)
503    .await
504    .map_err(map_sqlx_error)?;
505
506    let processed: i64 = report_rows.try_get("swept").map_err(map_sqlx_error)?;
507    Ok(ScanReport {
508        processed: u32::try_from(processed.max(0)).unwrap_or(u32::MAX),
509        errors: 0,
510    })
511}