Skip to main content

ff_backend_postgres/
worker_registry.rs

1//! RFC-025 Phase 3 — Postgres worker-registry bodies + TTL-sweep.
2//!
3//! Backs the five `EngineBackend` worker-registry methods
4//! (`register_worker`, `heartbeat_worker`, `mark_worker_dead`,
5//! `list_expired_leases`, `list_workers`) on Postgres, plus the
6//! `worker_registry_ttl_sweep` reconciler that prunes rows whose
7//! `last_heartbeat_ms + liveness_ttl_ms` has elapsed.
8//!
9//! Schema shape lives in `migrations/0021_worker_registry.sql`
10//! (current-state table + append-only event log). Partition key is
11//! `(fnv1a_u64(worker_instance_id.as_bytes()) % 256) as i16` — identical
12//! to the Valkey-side hashing so `register` / `heartbeat` /
13//! `mark_dead` / `ttl_sweep` all land on the same row for the same
14//! instance id.
15
16use std::collections::BTreeSet;
17
18use sqlx::{PgPool, Row};
19use uuid::Uuid;
20
21use ff_core::contracts::{
22    ExpiredLeaseInfo, ExpiredLeasesCursor, HeartbeatWorkerArgs, HeartbeatWorkerOutcome,
23    LIST_EXPIRED_LEASES_DEFAULT_LIMIT, LIST_EXPIRED_LEASES_MAX_LIMIT, ListExpiredLeasesArgs,
24    ListExpiredLeasesResult, ListWorkersArgs, ListWorkersResult, MARK_WORKER_DEAD_REASON_MAX_BYTES,
25    MarkWorkerDeadArgs, MarkWorkerDeadOutcome, RegisterWorkerArgs, RegisterWorkerOutcome,
26    WorkerInfo,
27};
28use ff_core::engine_error::{EngineError, ValidationKind};
29use ff_core::hash::fnv1a_u64;
30use ff_core::types::{
31    AttemptIndex, ExecutionId, LaneId, LeaseEpoch, LeaseId, Namespace, TimestampMs, WorkerId,
32    WorkerInstanceId,
33};
34
35use crate::error::map_sqlx_error;
36use crate::reconcilers::ScanReport;
37
38/// Derive the `ff_worker_registry.partition_key` for an instance id.
39/// Must match the Valkey-side hashing (both paths own the same row).
40pub fn worker_partition_key(worker_instance_id: &str) -> i16 {
41    (fnv1a_u64(worker_instance_id.as_bytes()) % 256) as i16
42}
43
44/// Postgres has no stable `lease_id` column on `ff_attempt` (identity
45/// is `(execution_id, attempt_index, lease_epoch)`; see
46/// `operator::synthetic_lease_id`). For `list_expired_leases` we need
47/// to surface a `LeaseId` — derive a deterministic UUID from the same
48/// triple so two callers that observe the same lease see the same id.
49fn synthetic_lease_uuid(exec_uuid: Uuid, attempt_index: i32, lease_epoch: i64) -> Uuid {
50    // 16 bytes of seed material: exec UUID (16) XOR-folded with
51    // attempt_index (4) + lease_epoch (8) packed into the tail. Not a
52    // cryptographic derivation — just stable + collision-resistant
53    // enough within a single execution's attempts.
54    let mut bytes = *exec_uuid.as_bytes();
55    let ai = attempt_index.to_be_bytes();
56    let le = lease_epoch.to_be_bytes();
57    for i in 0..4 {
58        bytes[8 + i] ^= ai[i];
59    }
60    for i in 0..8 {
61        bytes[i] ^= le[i];
62    }
63    Uuid::from_bytes(bytes)
64}
65
66// ── register_worker ──────────────────────────────────────────────
67
68pub async fn register_worker(
69    pool: &PgPool,
70    args: RegisterWorkerArgs,
71) -> Result<RegisterWorkerOutcome, EngineError> {
72    let partition_key = worker_partition_key(args.worker_instance_id.as_str());
73    let lanes_sorted: Vec<String> = args.lanes.iter().map(|l| l.0.clone()).collect();
74    let caps_csv: String = args
75        .capabilities
76        .iter()
77        .cloned()
78        .collect::<Vec<String>>()
79        .join(",");
80
81    // Single tx: (1) reject instance_id reassignment under a different
82    // `worker_id`; (2) upsert the row + return whether it was a fresh
83    // insert or an overwrite (`xmax = 0` returns true iff the row was
84    // inserted); (3) append a `registered` event.
85    let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
86
87    // Conflict check — if a row exists with the same (partition_key,
88    // namespace, worker_instance_id) but a different `worker_id`,
89    // reject per RFC-025 §4.
90    let existing_worker_id: Option<String> = sqlx::query_scalar(
91        "SELECT worker_id FROM ff_worker_registry \
92         WHERE partition_key = $1 AND namespace = $2 AND worker_instance_id = $3 \
93         FOR UPDATE",
94    )
95    .bind(partition_key)
96    .bind(args.namespace.as_str())
97    .bind(args.worker_instance_id.as_str())
98    .fetch_optional(&mut *tx)
99    .await
100    .map_err(map_sqlx_error)?;
101
102    if let Some(existing) = existing_worker_id.as_deref()
103        && existing != args.worker_id.as_str()
104    {
105        return Err(EngineError::Validation {
106            kind: ValidationKind::InvalidInput,
107            detail: "instance_id reassigned".into(),
108        });
109    }
110
111    // "Was this an INSERT vs a conflict-UPDATE?" signal.
112    //
113    // Postgres 16 + sqlx 0.8.x rejects any `RETURNING` projection
114    // that references the `xmax` system column with SQLSTATE 0A000
115    // "cannot retrieve a system column in this context"
116    // (tts_virtual_getsysattr, execTuples.c:144 — cairn bug FF #508).
117    // This is true even for `xmax::text` / `(xmax = 0)::bool` etc.
118    // because sqlx's portal path uses a virtual tuple slot that PG
119    // 16 rejects system-column access from.
120    //
121    // Workaround: issue a separate SELECT to learn if the target
122    // row existed before the UPSERT. Wrap both statements in the
123    // transaction we already hold so the read is consistent with
124    // the write.
125    let pre_existed: Option<i64> = sqlx::query_scalar(
126        "SELECT 1::bigint FROM ff_worker_registry \
127          WHERE partition_key = $1 \
128            AND namespace = $2 \
129            AND worker_instance_id = $3",
130    )
131    .bind(partition_key)
132    .bind(args.namespace.as_str())
133    .bind(args.worker_instance_id.as_str())
134    .fetch_optional(&mut *tx)
135    .await
136    .map_err(map_sqlx_error)?;
137
138    sqlx::query(
139        "INSERT INTO ff_worker_registry (\
140             partition_key, namespace, worker_instance_id, worker_id, \
141             lanes, capabilities_csv, last_heartbeat_ms, liveness_ttl_ms, \
142             registered_at_ms\
143         ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) \
144         ON CONFLICT (partition_key, namespace, worker_instance_id) DO UPDATE SET \
145             worker_id = EXCLUDED.worker_id, \
146             lanes = EXCLUDED.lanes, \
147             capabilities_csv = EXCLUDED.capabilities_csv, \
148             last_heartbeat_ms = EXCLUDED.last_heartbeat_ms, \
149             liveness_ttl_ms = EXCLUDED.liveness_ttl_ms",
150    )
151    .bind(partition_key)
152    .bind(args.namespace.as_str())
153    .bind(args.worker_instance_id.as_str())
154    .bind(args.worker_id.as_str())
155    .bind(&lanes_sorted)
156    .bind(caps_csv.as_str())
157    .bind(args.now.0)
158    .bind(args.liveness_ttl_ms as i64)
159    .bind(args.now.0)
160    .execute(&mut *tx)
161    .await
162    .map_err(map_sqlx_error)?;
163    let registered = pre_existed.is_none();
164
165    sqlx::query(
166        "INSERT INTO ff_worker_registry_event \
167             (partition_key, namespace, worker_instance_id, event_kind, event_at_ms, reason) \
168         VALUES ($1, $2, $3, 'registered', $4, NULL) \
169         ON CONFLICT DO NOTHING",
170    )
171    .bind(partition_key)
172    .bind(args.namespace.as_str())
173    .bind(args.worker_instance_id.as_str())
174    .bind(args.now.0)
175    .execute(&mut *tx)
176    .await
177    .map_err(map_sqlx_error)?;
178
179    tx.commit().await.map_err(map_sqlx_error)?;
180
181    Ok(if registered {
182        RegisterWorkerOutcome::Registered
183    } else {
184        RegisterWorkerOutcome::Refreshed
185    })
186}
187
188// ── heartbeat_worker ─────────────────────────────────────────────
189
190pub async fn heartbeat_worker(
191    pool: &PgPool,
192    args: HeartbeatWorkerArgs,
193) -> Result<HeartbeatWorkerOutcome, EngineError> {
194    let partition_key = worker_partition_key(args.worker_instance_id.as_str());
195
196    // Refresh `last_heartbeat_ms` only when the row is still within
197    // its declared TTL window. If the TTL has lapsed, the TTL-sweep
198    // scanner will drop the row on its next tick — surface
199    // `NotRegistered` rather than re-upping a logically-dead worker.
200    let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
201
202    let ttl_row: Option<i64> = sqlx::query_scalar(
203        "UPDATE ff_worker_registry SET last_heartbeat_ms = $4 \
204         WHERE partition_key = $1 AND namespace = $2 AND worker_instance_id = $3 \
205           AND last_heartbeat_ms + liveness_ttl_ms > $4 \
206         RETURNING liveness_ttl_ms",
207    )
208    .bind(partition_key)
209    .bind(args.namespace.as_str())
210    .bind(args.worker_instance_id.as_str())
211    .bind(args.now.0)
212    .fetch_optional(&mut *tx)
213    .await
214    .map_err(map_sqlx_error)?;
215
216    let Some(ttl_ms) = ttl_row else {
217        tx.commit().await.map_err(map_sqlx_error)?;
218        return Ok(HeartbeatWorkerOutcome::NotRegistered);
219    };
220
221    sqlx::query(
222        "INSERT INTO ff_worker_registry_event \
223             (partition_key, namespace, worker_instance_id, event_kind, event_at_ms, reason) \
224         VALUES ($1, $2, $3, 'heartbeat', $4, NULL) \
225         ON CONFLICT DO NOTHING",
226    )
227    .bind(partition_key)
228    .bind(args.namespace.as_str())
229    .bind(args.worker_instance_id.as_str())
230    .bind(args.now.0)
231    .execute(&mut *tx)
232    .await
233    .map_err(map_sqlx_error)?;
234
235    tx.commit().await.map_err(map_sqlx_error)?;
236
237    let next_expiry_ms = TimestampMs::from_millis(args.now.0.saturating_add(ttl_ms));
238    Ok(HeartbeatWorkerOutcome::Refreshed { next_expiry_ms })
239}
240
241// ── mark_worker_dead ─────────────────────────────────────────────
242
243pub async fn mark_worker_dead(
244    pool: &PgPool,
245    args: MarkWorkerDeadArgs,
246) -> Result<MarkWorkerDeadOutcome, EngineError> {
247    // Mirrors the Valkey body's validation (§Rev-2 item 9): 256-byte
248    // reason cap + no control characters. Reject oversize / invalid
249    // before touching storage.
250    if args.reason.len() > MARK_WORKER_DEAD_REASON_MAX_BYTES {
251        return Err(EngineError::Validation {
252            kind: ValidationKind::InvalidInput,
253            detail: format!(
254                "reason: exceeds {} bytes (got {})",
255                MARK_WORKER_DEAD_REASON_MAX_BYTES,
256                args.reason.len()
257            ),
258        });
259    }
260    if args.reason.chars().any(|c| c.is_control()) {
261        return Err(EngineError::Validation {
262            kind: ValidationKind::InvalidInput,
263            detail: "reason: must not contain control characters".into(),
264        });
265    }
266
267    let partition_key = worker_partition_key(args.worker_instance_id.as_str());
268
269    let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
270
271    let deleted: i64 = sqlx::query_scalar(
272        "WITH d AS (\
273             DELETE FROM ff_worker_registry \
274             WHERE partition_key = $1 AND namespace = $2 AND worker_instance_id = $3 \
275             RETURNING 1 AS x\
276         ) SELECT COUNT(*) FROM d",
277    )
278    .bind(partition_key)
279    .bind(args.namespace.as_str())
280    .bind(args.worker_instance_id.as_str())
281    .fetch_one(&mut *tx)
282    .await
283    .map_err(map_sqlx_error)?;
284
285    if deleted == 0 {
286        tx.commit().await.map_err(map_sqlx_error)?;
287        return Ok(MarkWorkerDeadOutcome::NotRegistered);
288    }
289
290    sqlx::query(
291        "INSERT INTO ff_worker_registry_event \
292             (partition_key, namespace, worker_instance_id, event_kind, event_at_ms, reason) \
293         VALUES ($1, $2, $3, 'marked_dead', $4, $5) \
294         ON CONFLICT DO NOTHING",
295    )
296    .bind(partition_key)
297    .bind(args.namespace.as_str())
298    .bind(args.worker_instance_id.as_str())
299    .bind(args.now.0)
300    .bind(args.reason.as_str())
301    .execute(&mut *tx)
302    .await
303    .map_err(map_sqlx_error)?;
304
305    tx.commit().await.map_err(map_sqlx_error)?;
306    Ok(MarkWorkerDeadOutcome::Marked)
307}
308
309// ── list_expired_leases ──────────────────────────────────────────
310
311pub async fn list_expired_leases(
312    pool: &PgPool,
313    args: ListExpiredLeasesArgs,
314) -> Result<ListExpiredLeasesResult, EngineError> {
315    let limit = args
316        .limit
317        .unwrap_or(LIST_EXPIRED_LEASES_DEFAULT_LIMIT)
318        .min(LIST_EXPIRED_LEASES_MAX_LIMIT) as i64;
319    // `max_partitions_per_call` is a Valkey ZSET fan-out knob; on PG
320    // the partial index `ff_attempt_lease_expiry_idx` already covers
321    // all partitions in one scan, so the value is accepted-and-ignored.
322    let _ = args.max_partitions_per_call;
323
324    // Cursor tuple: `(expires_at_ms, execution_id)` strict-greater
325    // than `(cursor.expires_at_ms, cursor.execution_id)` so pagination
326    // is stable under equal-expiry.
327    let (cursor_expiry, cursor_eid_str): (Option<i64>, Option<String>) = match args.after.as_ref() {
328        Some(c) => (Some(c.expires_at_ms.0), Some(c.execution_id.to_string())),
329        None => (None, None),
330    };
331    let namespace_filter: Option<&str> = args.namespace.as_ref().map(|n| n.as_str());
332
333    // Join `ff_attempt` (lease state) with `ff_exec_core` (namespace
334    // filter + partition-prefix reconstruction of the ExecutionId
335    // wire form). `ff_attempt_lease_expiry_idx` (partial, from
336    // migration 0001) keys the scan on `(partition_key,
337    // lease_expires_at_ms)`. Cross-partition order is enforced at
338    // the `ORDER BY` level.
339    // `ff_exec_core` has no `namespace` column — namespace lives in
340    // `raw_fields jsonb` (see migrations/0001_initial.sql:98-122).
341    // Phase 3 referenced `c.namespace` directly which would crash at
342    // runtime; `raw_fields->>'namespace'` is the authoritative read
343    // path.
344    let rows = sqlx::query(
345        "SELECT a.partition_key, a.execution_id, a.attempt_index, a.lease_epoch, \
346                a.worker_instance_id, a.lease_expires_at_ms \
347           FROM ff_attempt a \
348           JOIN ff_exec_core c \
349             ON c.partition_key = a.partition_key AND c.execution_id = a.execution_id \
350          WHERE a.lease_expires_at_ms IS NOT NULL \
351            AND a.lease_expires_at_ms <= $1 \
352            AND a.worker_instance_id IS NOT NULL \
353            AND c.public_state IN ('claimed', 'running') \
354            AND ($2::text IS NULL OR c.raw_fields->>'namespace' = $2) \
355            AND ($3::bigint IS NULL \
356                 OR (a.lease_expires_at_ms, a.execution_id::text) > ($3, $4)) \
357          ORDER BY a.lease_expires_at_ms ASC, a.execution_id ASC \
358          LIMIT $5",
359    )
360    .bind(args.as_of.0)
361    .bind(namespace_filter)
362    .bind(cursor_expiry)
363    .bind(cursor_eid_str.as_deref().unwrap_or(""))
364    .bind(limit)
365    .fetch_all(pool)
366    .await
367    .map_err(map_sqlx_error)?;
368
369    let mut entries: Vec<ExpiredLeaseInfo> = Vec::with_capacity(rows.len());
370    for row in &rows {
371        let partition_key: i16 = row.try_get("partition_key").map_err(map_sqlx_error)?;
372        let exec_uuid: Uuid = row.try_get("execution_id").map_err(map_sqlx_error)?;
373        let attempt_index: i32 = row.try_get("attempt_index").map_err(map_sqlx_error)?;
374        let lease_epoch: i64 = row.try_get("lease_epoch").map_err(map_sqlx_error)?;
375        let worker_inst: String = row.try_get("worker_instance_id").map_err(map_sqlx_error)?;
376        let expires_at_ms: i64 = row.try_get("lease_expires_at_ms").map_err(map_sqlx_error)?;
377
378        let eid_str = format!("{{fp:{}}}:{}", partition_key, exec_uuid);
379        let execution_id = ExecutionId::parse(&eid_str).map_err(|e| EngineError::Validation {
380            kind: ValidationKind::Corruption,
381            detail: format!("list_expired_leases: bad execution_id {eid_str:?}: {e}"),
382        })?;
383
384        let lease_id = LeaseId::from_uuid(synthetic_lease_uuid(exec_uuid, attempt_index, lease_epoch));
385        let attempt_index_u = u32::try_from(attempt_index.max(0)).unwrap_or(0);
386        let lease_epoch_u = u64::try_from(lease_epoch.max(0)).unwrap_or(0);
387
388        entries.push(ExpiredLeaseInfo::new(
389            execution_id,
390            lease_id,
391            LeaseEpoch::new(lease_epoch_u),
392            WorkerInstanceId::new(worker_inst),
393            TimestampMs::from_millis(expires_at_ms),
394            AttemptIndex::new(attempt_index_u),
395        ));
396    }
397
398    let page_full = rows.len() as i64 >= limit;
399    let cursor = if page_full {
400        entries
401            .last()
402            .map(|e| ExpiredLeasesCursor::new(e.expires_at_ms, e.execution_id.clone()))
403    } else {
404        None
405    };
406    Ok(ListExpiredLeasesResult::new(entries, cursor))
407}
408
409// ── list_workers ─────────────────────────────────────────────────
410
411pub async fn list_workers(
412    pool: &PgPool,
413    args: ListWorkersArgs,
414) -> Result<ListWorkersResult, EngineError> {
415    // RFC-025 §9.4: cross-namespace enumeration requires a two-key
416    // cursor (namespace, worker_instance_id) because instance_ids
417    // collide across namespaces. The Phase-1 contract's cursor is
418    // single-key `Option<WorkerInstanceId>`; expanding to a tuple
419    // would reopen Phase 1. Reject cross-ns with `Unavailable` to
420    // match Phase-2 Valkey behaviour; operator tooling can loop per
421    // namespace until the tuple-cursor RFC lands.
422    let Some(ns) = args.namespace.as_ref() else {
423        return Err(EngineError::Unavailable {
424            op: "list_workers (cross-namespace on Postgres — pass namespace explicitly)",
425        });
426    };
427    let limit = args.limit.unwrap_or(1000) as i64;
428    let namespace_filter: &str = ns.as_str();
429    let after_cursor: Option<&str> = args.after.as_ref().map(|w| w.as_str());
430
431    let rows = sqlx::query(
432        "SELECT worker_id, worker_instance_id, namespace, lanes, \
433                capabilities_csv, last_heartbeat_ms, liveness_ttl_ms, registered_at_ms \
434           FROM ff_worker_registry \
435          WHERE namespace = $1 \
436            AND ($2::text IS NULL OR worker_instance_id > $2) \
437          ORDER BY worker_instance_id ASC \
438          LIMIT $3",
439    )
440    .bind(namespace_filter)
441    .bind(after_cursor)
442    .bind(limit)
443    .fetch_all(pool)
444    .await
445    .map_err(map_sqlx_error)?;
446
447    let mut entries: Vec<WorkerInfo> = Vec::with_capacity(rows.len());
448    for row in &rows {
449        let worker_id: String = row.try_get("worker_id").map_err(map_sqlx_error)?;
450        let worker_inst: String = row.try_get("worker_instance_id").map_err(map_sqlx_error)?;
451        let namespace: String = row.try_get("namespace").map_err(map_sqlx_error)?;
452        let lanes_vec: Vec<String> = row.try_get("lanes").map_err(map_sqlx_error)?;
453        let caps_csv: String = row.try_get("capabilities_csv").map_err(map_sqlx_error)?;
454        let last_hb_ms: i64 = row.try_get("last_heartbeat_ms").map_err(map_sqlx_error)?;
455        let liveness_ttl_ms: i64 = row.try_get("liveness_ttl_ms").map_err(map_sqlx_error)?;
456        let registered_at_ms: i64 = row.try_get("registered_at_ms").map_err(map_sqlx_error)?;
457
458        let lanes: BTreeSet<LaneId> = lanes_vec.into_iter().map(LaneId).collect();
459        let capabilities: BTreeSet<String> = caps_csv
460            .split(',')
461            .filter(|s| !s.is_empty())
462            .map(str::to_owned)
463            .collect();
464
465        entries.push(WorkerInfo::new(
466            WorkerId::new(worker_id),
467            WorkerInstanceId::new(worker_inst),
468            Namespace::new(namespace),
469            lanes,
470            capabilities,
471            TimestampMs::from_millis(last_hb_ms),
472            u64::try_from(liveness_ttl_ms.max(0)).unwrap_or(0),
473            TimestampMs::from_millis(registered_at_ms),
474        ));
475    }
476
477    let page_full = rows.len() as i64 >= limit;
478    let cursor = if page_full {
479        entries.last().map(|w| w.worker_instance_id.clone())
480    } else {
481        None
482    };
483    Ok(ListWorkersResult::new(entries, cursor))
484}
485
486// ── TTL-sweep scanner ────────────────────────────────────────────
487
488/// Per-partition TTL sweep. Mirrors Valkey's native PEXPIRE behaviour
489/// for PG/SQLite: rows whose `last_heartbeat_ms + liveness_ttl_ms`
490/// falls strictly below `now_ms` are deleted and a `ttl_swept` event
491/// is appended to the audit log.
492///
493/// Returns a `ScanReport` for the supervisor's per-tick log.
494pub async fn ttl_sweep_tick(pool: &PgPool, partition_key: i16) -> Result<ScanReport, EngineError> {
495    let now_ms: i64 = i64::try_from(
496        std::time::SystemTime::now()
497            .duration_since(std::time::UNIX_EPOCH)
498            .map(|d| d.as_millis())
499            .unwrap_or(0),
500    )
501    .unwrap_or(i64::MAX);
502
503    // DELETE + append-to-event-log in a single statement via CTE so
504    // the sweep is atomic per row: never a row deleted without an
505    // event, never an event without a delete. Concurrent
506    // `mark_worker_dead` on the same row deletes first + writes its
507    // own event; the sweep's DELETE finds zero rows, the INSERT-SELECT
508    // inserts zero rows. Idempotent.
509    let report_rows = sqlx::query(
510        "WITH swept AS (\
511             DELETE FROM ff_worker_registry \
512             WHERE partition_key = $1 \
513               AND last_heartbeat_ms + liveness_ttl_ms < $2 \
514             RETURNING partition_key, namespace, worker_instance_id\
515         ), ev AS (\
516             INSERT INTO ff_worker_registry_event \
517                 (partition_key, namespace, worker_instance_id, event_kind, event_at_ms, reason) \
518             SELECT partition_key, namespace, worker_instance_id, 'ttl_swept', $2, NULL \
519               FROM swept \
520             ON CONFLICT DO NOTHING \
521             RETURNING 1\
522         ) SELECT COUNT(*) AS swept FROM swept",
523    )
524    .bind(partition_key)
525    .bind(now_ms)
526    .fetch_one(pool)
527    .await
528    .map_err(map_sqlx_error)?;
529
530    let processed: i64 = report_rows.try_get("swept").map_err(map_sqlx_error)?;
531    Ok(ScanReport {
532        processed: u32::try_from(processed.max(0)).unwrap_or(u32::MAX),
533        errors: 0,
534    })
535}