1use std::collections::BTreeSet;
17
18use sqlx::{PgPool, Row};
19use uuid::Uuid;
20
21use ff_core::contracts::{
22 ExpiredLeaseInfo, ExpiredLeasesCursor, HeartbeatWorkerArgs, HeartbeatWorkerOutcome,
23 LIST_EXPIRED_LEASES_DEFAULT_LIMIT, LIST_EXPIRED_LEASES_MAX_LIMIT, ListExpiredLeasesArgs,
24 ListExpiredLeasesResult, ListWorkersArgs, ListWorkersResult, MARK_WORKER_DEAD_REASON_MAX_BYTES,
25 MarkWorkerDeadArgs, MarkWorkerDeadOutcome, RegisterWorkerArgs, RegisterWorkerOutcome,
26 WorkerInfo,
27};
28use ff_core::engine_error::{EngineError, ValidationKind};
29use ff_core::hash::fnv1a_u64;
30use ff_core::types::{
31 AttemptIndex, ExecutionId, LaneId, LeaseEpoch, LeaseId, Namespace, TimestampMs, WorkerId,
32 WorkerInstanceId,
33};
34
35use crate::error::map_sqlx_error;
36use crate::reconcilers::ScanReport;
37
38pub fn worker_partition_key(worker_instance_id: &str) -> i16 {
41 (fnv1a_u64(worker_instance_id.as_bytes()) % 256) as i16
42}
43
44fn synthetic_lease_uuid(exec_uuid: Uuid, attempt_index: i32, lease_epoch: i64) -> Uuid {
50 let mut bytes = *exec_uuid.as_bytes();
55 let ai = attempt_index.to_be_bytes();
56 let le = lease_epoch.to_be_bytes();
57 for i in 0..4 {
58 bytes[8 + i] ^= ai[i];
59 }
60 for i in 0..8 {
61 bytes[i] ^= le[i];
62 }
63 Uuid::from_bytes(bytes)
64}
65
66pub async fn register_worker(
69 pool: &PgPool,
70 args: RegisterWorkerArgs,
71) -> Result<RegisterWorkerOutcome, EngineError> {
72 let partition_key = worker_partition_key(args.worker_instance_id.as_str());
73 let lanes_sorted: Vec<String> = args.lanes.iter().map(|l| l.0.clone()).collect();
74 let caps_csv: String = args
75 .capabilities
76 .iter()
77 .cloned()
78 .collect::<Vec<String>>()
79 .join(",");
80
81 let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
86
87 let existing_worker_id: Option<String> = sqlx::query_scalar(
91 "SELECT worker_id FROM ff_worker_registry \
92 WHERE partition_key = $1 AND namespace = $2 AND worker_instance_id = $3 \
93 FOR UPDATE",
94 )
95 .bind(partition_key)
96 .bind(args.namespace.as_str())
97 .bind(args.worker_instance_id.as_str())
98 .fetch_optional(&mut *tx)
99 .await
100 .map_err(map_sqlx_error)?;
101
102 if let Some(existing) = existing_worker_id.as_deref()
103 && existing != args.worker_id.as_str()
104 {
105 return Err(EngineError::Validation {
106 kind: ValidationKind::InvalidInput,
107 detail: "instance_id reassigned".into(),
108 });
109 }
110
111 let pre_existed: Option<i64> = sqlx::query_scalar(
126 "SELECT 1::bigint FROM ff_worker_registry \
127 WHERE partition_key = $1 \
128 AND namespace = $2 \
129 AND worker_instance_id = $3",
130 )
131 .bind(partition_key)
132 .bind(args.namespace.as_str())
133 .bind(args.worker_instance_id.as_str())
134 .fetch_optional(&mut *tx)
135 .await
136 .map_err(map_sqlx_error)?;
137
138 sqlx::query(
139 "INSERT INTO ff_worker_registry (\
140 partition_key, namespace, worker_instance_id, worker_id, \
141 lanes, capabilities_csv, last_heartbeat_ms, liveness_ttl_ms, \
142 registered_at_ms\
143 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) \
144 ON CONFLICT (partition_key, namespace, worker_instance_id) DO UPDATE SET \
145 worker_id = EXCLUDED.worker_id, \
146 lanes = EXCLUDED.lanes, \
147 capabilities_csv = EXCLUDED.capabilities_csv, \
148 last_heartbeat_ms = EXCLUDED.last_heartbeat_ms, \
149 liveness_ttl_ms = EXCLUDED.liveness_ttl_ms",
150 )
151 .bind(partition_key)
152 .bind(args.namespace.as_str())
153 .bind(args.worker_instance_id.as_str())
154 .bind(args.worker_id.as_str())
155 .bind(&lanes_sorted)
156 .bind(caps_csv.as_str())
157 .bind(args.now.0)
158 .bind(args.liveness_ttl_ms as i64)
159 .bind(args.now.0)
160 .execute(&mut *tx)
161 .await
162 .map_err(map_sqlx_error)?;
163 let registered = pre_existed.is_none();
164
165 sqlx::query(
166 "INSERT INTO ff_worker_registry_event \
167 (partition_key, namespace, worker_instance_id, event_kind, event_at_ms, reason) \
168 VALUES ($1, $2, $3, 'registered', $4, NULL) \
169 ON CONFLICT DO NOTHING",
170 )
171 .bind(partition_key)
172 .bind(args.namespace.as_str())
173 .bind(args.worker_instance_id.as_str())
174 .bind(args.now.0)
175 .execute(&mut *tx)
176 .await
177 .map_err(map_sqlx_error)?;
178
179 tx.commit().await.map_err(map_sqlx_error)?;
180
181 Ok(if registered {
182 RegisterWorkerOutcome::Registered
183 } else {
184 RegisterWorkerOutcome::Refreshed
185 })
186}
187
188pub async fn heartbeat_worker(
191 pool: &PgPool,
192 args: HeartbeatWorkerArgs,
193) -> Result<HeartbeatWorkerOutcome, EngineError> {
194 let partition_key = worker_partition_key(args.worker_instance_id.as_str());
195
196 let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
201
202 let ttl_row: Option<i64> = sqlx::query_scalar(
203 "UPDATE ff_worker_registry SET last_heartbeat_ms = $4 \
204 WHERE partition_key = $1 AND namespace = $2 AND worker_instance_id = $3 \
205 AND last_heartbeat_ms + liveness_ttl_ms > $4 \
206 RETURNING liveness_ttl_ms",
207 )
208 .bind(partition_key)
209 .bind(args.namespace.as_str())
210 .bind(args.worker_instance_id.as_str())
211 .bind(args.now.0)
212 .fetch_optional(&mut *tx)
213 .await
214 .map_err(map_sqlx_error)?;
215
216 let Some(ttl_ms) = ttl_row else {
217 tx.commit().await.map_err(map_sqlx_error)?;
218 return Ok(HeartbeatWorkerOutcome::NotRegistered);
219 };
220
221 sqlx::query(
222 "INSERT INTO ff_worker_registry_event \
223 (partition_key, namespace, worker_instance_id, event_kind, event_at_ms, reason) \
224 VALUES ($1, $2, $3, 'heartbeat', $4, NULL) \
225 ON CONFLICT DO NOTHING",
226 )
227 .bind(partition_key)
228 .bind(args.namespace.as_str())
229 .bind(args.worker_instance_id.as_str())
230 .bind(args.now.0)
231 .execute(&mut *tx)
232 .await
233 .map_err(map_sqlx_error)?;
234
235 tx.commit().await.map_err(map_sqlx_error)?;
236
237 let next_expiry_ms = TimestampMs::from_millis(args.now.0.saturating_add(ttl_ms));
238 Ok(HeartbeatWorkerOutcome::Refreshed { next_expiry_ms })
239}
240
241pub async fn mark_worker_dead(
244 pool: &PgPool,
245 args: MarkWorkerDeadArgs,
246) -> Result<MarkWorkerDeadOutcome, EngineError> {
247 if args.reason.len() > MARK_WORKER_DEAD_REASON_MAX_BYTES {
251 return Err(EngineError::Validation {
252 kind: ValidationKind::InvalidInput,
253 detail: format!(
254 "reason: exceeds {} bytes (got {})",
255 MARK_WORKER_DEAD_REASON_MAX_BYTES,
256 args.reason.len()
257 ),
258 });
259 }
260 if args.reason.chars().any(|c| c.is_control()) {
261 return Err(EngineError::Validation {
262 kind: ValidationKind::InvalidInput,
263 detail: "reason: must not contain control characters".into(),
264 });
265 }
266
267 let partition_key = worker_partition_key(args.worker_instance_id.as_str());
268
269 let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
270
271 let deleted: i64 = sqlx::query_scalar(
272 "WITH d AS (\
273 DELETE FROM ff_worker_registry \
274 WHERE partition_key = $1 AND namespace = $2 AND worker_instance_id = $3 \
275 RETURNING 1 AS x\
276 ) SELECT COUNT(*) FROM d",
277 )
278 .bind(partition_key)
279 .bind(args.namespace.as_str())
280 .bind(args.worker_instance_id.as_str())
281 .fetch_one(&mut *tx)
282 .await
283 .map_err(map_sqlx_error)?;
284
285 if deleted == 0 {
286 tx.commit().await.map_err(map_sqlx_error)?;
287 return Ok(MarkWorkerDeadOutcome::NotRegistered);
288 }
289
290 sqlx::query(
291 "INSERT INTO ff_worker_registry_event \
292 (partition_key, namespace, worker_instance_id, event_kind, event_at_ms, reason) \
293 VALUES ($1, $2, $3, 'marked_dead', $4, $5) \
294 ON CONFLICT DO NOTHING",
295 )
296 .bind(partition_key)
297 .bind(args.namespace.as_str())
298 .bind(args.worker_instance_id.as_str())
299 .bind(args.now.0)
300 .bind(args.reason.as_str())
301 .execute(&mut *tx)
302 .await
303 .map_err(map_sqlx_error)?;
304
305 tx.commit().await.map_err(map_sqlx_error)?;
306 Ok(MarkWorkerDeadOutcome::Marked)
307}
308
309pub async fn list_expired_leases(
312 pool: &PgPool,
313 args: ListExpiredLeasesArgs,
314) -> Result<ListExpiredLeasesResult, EngineError> {
315 let limit = args
316 .limit
317 .unwrap_or(LIST_EXPIRED_LEASES_DEFAULT_LIMIT)
318 .min(LIST_EXPIRED_LEASES_MAX_LIMIT) as i64;
319 let _ = args.max_partitions_per_call;
323
324 let (cursor_expiry, cursor_eid_str): (Option<i64>, Option<String>) = match args.after.as_ref() {
328 Some(c) => (Some(c.expires_at_ms.0), Some(c.execution_id.to_string())),
329 None => (None, None),
330 };
331 let namespace_filter: Option<&str> = args.namespace.as_ref().map(|n| n.as_str());
332
333 let rows = sqlx::query(
345 "SELECT a.partition_key, a.execution_id, a.attempt_index, a.lease_epoch, \
346 a.worker_instance_id, a.lease_expires_at_ms \
347 FROM ff_attempt a \
348 JOIN ff_exec_core c \
349 ON c.partition_key = a.partition_key AND c.execution_id = a.execution_id \
350 WHERE a.lease_expires_at_ms IS NOT NULL \
351 AND a.lease_expires_at_ms <= $1 \
352 AND a.worker_instance_id IS NOT NULL \
353 AND c.public_state IN ('claimed', 'running') \
354 AND ($2::text IS NULL OR c.raw_fields->>'namespace' = $2) \
355 AND ($3::bigint IS NULL \
356 OR (a.lease_expires_at_ms, a.execution_id::text) > ($3, $4)) \
357 ORDER BY a.lease_expires_at_ms ASC, a.execution_id ASC \
358 LIMIT $5",
359 )
360 .bind(args.as_of.0)
361 .bind(namespace_filter)
362 .bind(cursor_expiry)
363 .bind(cursor_eid_str.as_deref().unwrap_or(""))
364 .bind(limit)
365 .fetch_all(pool)
366 .await
367 .map_err(map_sqlx_error)?;
368
369 let mut entries: Vec<ExpiredLeaseInfo> = Vec::with_capacity(rows.len());
370 for row in &rows {
371 let partition_key: i16 = row.try_get("partition_key").map_err(map_sqlx_error)?;
372 let exec_uuid: Uuid = row.try_get("execution_id").map_err(map_sqlx_error)?;
373 let attempt_index: i32 = row.try_get("attempt_index").map_err(map_sqlx_error)?;
374 let lease_epoch: i64 = row.try_get("lease_epoch").map_err(map_sqlx_error)?;
375 let worker_inst: String = row.try_get("worker_instance_id").map_err(map_sqlx_error)?;
376 let expires_at_ms: i64 = row.try_get("lease_expires_at_ms").map_err(map_sqlx_error)?;
377
378 let eid_str = format!("{{fp:{}}}:{}", partition_key, exec_uuid);
379 let execution_id = ExecutionId::parse(&eid_str).map_err(|e| EngineError::Validation {
380 kind: ValidationKind::Corruption,
381 detail: format!("list_expired_leases: bad execution_id {eid_str:?}: {e}"),
382 })?;
383
384 let lease_id = LeaseId::from_uuid(synthetic_lease_uuid(exec_uuid, attempt_index, lease_epoch));
385 let attempt_index_u = u32::try_from(attempt_index.max(0)).unwrap_or(0);
386 let lease_epoch_u = u64::try_from(lease_epoch.max(0)).unwrap_or(0);
387
388 entries.push(ExpiredLeaseInfo::new(
389 execution_id,
390 lease_id,
391 LeaseEpoch::new(lease_epoch_u),
392 WorkerInstanceId::new(worker_inst),
393 TimestampMs::from_millis(expires_at_ms),
394 AttemptIndex::new(attempt_index_u),
395 ));
396 }
397
398 let page_full = rows.len() as i64 >= limit;
399 let cursor = if page_full {
400 entries
401 .last()
402 .map(|e| ExpiredLeasesCursor::new(e.expires_at_ms, e.execution_id.clone()))
403 } else {
404 None
405 };
406 Ok(ListExpiredLeasesResult::new(entries, cursor))
407}
408
409pub async fn list_workers(
412 pool: &PgPool,
413 args: ListWorkersArgs,
414) -> Result<ListWorkersResult, EngineError> {
415 let Some(ns) = args.namespace.as_ref() else {
423 return Err(EngineError::Unavailable {
424 op: "list_workers (cross-namespace on Postgres — pass namespace explicitly)",
425 });
426 };
427 let limit = args.limit.unwrap_or(1000) as i64;
428 let namespace_filter: &str = ns.as_str();
429 let after_cursor: Option<&str> = args.after.as_ref().map(|w| w.as_str());
430
431 let rows = sqlx::query(
432 "SELECT worker_id, worker_instance_id, namespace, lanes, \
433 capabilities_csv, last_heartbeat_ms, liveness_ttl_ms, registered_at_ms \
434 FROM ff_worker_registry \
435 WHERE namespace = $1 \
436 AND ($2::text IS NULL OR worker_instance_id > $2) \
437 ORDER BY worker_instance_id ASC \
438 LIMIT $3",
439 )
440 .bind(namespace_filter)
441 .bind(after_cursor)
442 .bind(limit)
443 .fetch_all(pool)
444 .await
445 .map_err(map_sqlx_error)?;
446
447 let mut entries: Vec<WorkerInfo> = Vec::with_capacity(rows.len());
448 for row in &rows {
449 let worker_id: String = row.try_get("worker_id").map_err(map_sqlx_error)?;
450 let worker_inst: String = row.try_get("worker_instance_id").map_err(map_sqlx_error)?;
451 let namespace: String = row.try_get("namespace").map_err(map_sqlx_error)?;
452 let lanes_vec: Vec<String> = row.try_get("lanes").map_err(map_sqlx_error)?;
453 let caps_csv: String = row.try_get("capabilities_csv").map_err(map_sqlx_error)?;
454 let last_hb_ms: i64 = row.try_get("last_heartbeat_ms").map_err(map_sqlx_error)?;
455 let liveness_ttl_ms: i64 = row.try_get("liveness_ttl_ms").map_err(map_sqlx_error)?;
456 let registered_at_ms: i64 = row.try_get("registered_at_ms").map_err(map_sqlx_error)?;
457
458 let lanes: BTreeSet<LaneId> = lanes_vec.into_iter().map(LaneId).collect();
459 let capabilities: BTreeSet<String> = caps_csv
460 .split(',')
461 .filter(|s| !s.is_empty())
462 .map(str::to_owned)
463 .collect();
464
465 entries.push(WorkerInfo::new(
466 WorkerId::new(worker_id),
467 WorkerInstanceId::new(worker_inst),
468 Namespace::new(namespace),
469 lanes,
470 capabilities,
471 TimestampMs::from_millis(last_hb_ms),
472 u64::try_from(liveness_ttl_ms.max(0)).unwrap_or(0),
473 TimestampMs::from_millis(registered_at_ms),
474 ));
475 }
476
477 let page_full = rows.len() as i64 >= limit;
478 let cursor = if page_full {
479 entries.last().map(|w| w.worker_instance_id.clone())
480 } else {
481 None
482 };
483 Ok(ListWorkersResult::new(entries, cursor))
484}
485
486pub async fn ttl_sweep_tick(pool: &PgPool, partition_key: i16) -> Result<ScanReport, EngineError> {
495 let now_ms: i64 = i64::try_from(
496 std::time::SystemTime::now()
497 .duration_since(std::time::UNIX_EPOCH)
498 .map(|d| d.as_millis())
499 .unwrap_or(0),
500 )
501 .unwrap_or(i64::MAX);
502
503 let report_rows = sqlx::query(
510 "WITH swept AS (\
511 DELETE FROM ff_worker_registry \
512 WHERE partition_key = $1 \
513 AND last_heartbeat_ms + liveness_ttl_ms < $2 \
514 RETURNING partition_key, namespace, worker_instance_id\
515 ), ev AS (\
516 INSERT INTO ff_worker_registry_event \
517 (partition_key, namespace, worker_instance_id, event_kind, event_at_ms, reason) \
518 SELECT partition_key, namespace, worker_instance_id, 'ttl_swept', $2, NULL \
519 FROM swept \
520 ON CONFLICT DO NOTHING \
521 RETURNING 1\
522 ) SELECT COUNT(*) AS swept FROM swept",
523 )
524 .bind(partition_key)
525 .bind(now_ms)
526 .fetch_one(pool)
527 .await
528 .map_err(map_sqlx_error)?;
529
530 let processed: i64 = report_rows.try_get("swept").map_err(map_sqlx_error)?;
531 Ok(ScanReport {
532 processed: u32::try_from(processed.max(0)).unwrap_or(u32::MAX),
533 errors: 0,
534 })
535}