1use std::collections::BTreeSet;
17
18use sqlx::{PgPool, Row};
19use uuid::Uuid;
20
21use ff_core::contracts::{
22 ExpiredLeaseInfo, ExpiredLeasesCursor, HeartbeatWorkerArgs, HeartbeatWorkerOutcome,
23 LIST_EXPIRED_LEASES_DEFAULT_LIMIT, LIST_EXPIRED_LEASES_MAX_LIMIT, ListExpiredLeasesArgs,
24 ListExpiredLeasesResult, ListWorkersArgs, ListWorkersResult, MARK_WORKER_DEAD_REASON_MAX_BYTES,
25 MarkWorkerDeadArgs, MarkWorkerDeadOutcome, RegisterWorkerArgs, RegisterWorkerOutcome,
26 WorkerInfo,
27};
28use ff_core::engine_error::{EngineError, ValidationKind};
29use ff_core::hash::fnv1a_u64;
30use ff_core::types::{
31 AttemptIndex, ExecutionId, LaneId, LeaseEpoch, LeaseId, Namespace, TimestampMs, WorkerId,
32 WorkerInstanceId,
33};
34
35use crate::error::map_sqlx_error;
36use crate::reconcilers::ScanReport;
37
38pub fn worker_partition_key(worker_instance_id: &str) -> i16 {
41 (fnv1a_u64(worker_instance_id.as_bytes()) % 256) as i16
42}
43
44fn synthetic_lease_uuid(exec_uuid: Uuid, attempt_index: i32, lease_epoch: i64) -> Uuid {
50 let mut bytes = *exec_uuid.as_bytes();
55 let ai = attempt_index.to_be_bytes();
56 let le = lease_epoch.to_be_bytes();
57 for i in 0..4 {
58 bytes[8 + i] ^= ai[i];
59 }
60 for i in 0..8 {
61 bytes[i] ^= le[i];
62 }
63 Uuid::from_bytes(bytes)
64}
65
66pub async fn register_worker(
69 pool: &PgPool,
70 args: RegisterWorkerArgs,
71) -> Result<RegisterWorkerOutcome, EngineError> {
72 let partition_key = worker_partition_key(args.worker_instance_id.as_str());
73 let lanes_sorted: Vec<String> = args.lanes.iter().map(|l| l.0.clone()).collect();
74 let caps_csv: String = args
75 .capabilities
76 .iter()
77 .cloned()
78 .collect::<Vec<String>>()
79 .join(",");
80
81 let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
86
87 let existing_worker_id: Option<String> = sqlx::query_scalar(
91 "SELECT worker_id FROM ff_worker_registry \
92 WHERE partition_key = $1 AND namespace = $2 AND worker_instance_id = $3 \
93 FOR UPDATE",
94 )
95 .bind(partition_key)
96 .bind(args.namespace.as_str())
97 .bind(args.worker_instance_id.as_str())
98 .fetch_optional(&mut *tx)
99 .await
100 .map_err(map_sqlx_error)?;
101
102 if let Some(existing) = existing_worker_id.as_deref()
103 && existing != args.worker_id.as_str()
104 {
105 return Err(EngineError::Validation {
106 kind: ValidationKind::InvalidInput,
107 detail: "instance_id reassigned".into(),
108 });
109 }
110
111 let registered: bool = sqlx::query_scalar(
115 "INSERT INTO ff_worker_registry (\
116 partition_key, namespace, worker_instance_id, worker_id, \
117 lanes, capabilities_csv, last_heartbeat_ms, liveness_ttl_ms, \
118 registered_at_ms\
119 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) \
120 ON CONFLICT (partition_key, namespace, worker_instance_id) DO UPDATE SET \
121 worker_id = EXCLUDED.worker_id, \
122 lanes = EXCLUDED.lanes, \
123 capabilities_csv = EXCLUDED.capabilities_csv, \
124 last_heartbeat_ms = EXCLUDED.last_heartbeat_ms, \
125 liveness_ttl_ms = EXCLUDED.liveness_ttl_ms \
126 RETURNING (xmax = 0)",
127 )
128 .bind(partition_key)
129 .bind(args.namespace.as_str())
130 .bind(args.worker_instance_id.as_str())
131 .bind(args.worker_id.as_str())
132 .bind(&lanes_sorted)
133 .bind(caps_csv.as_str())
134 .bind(args.now.0)
135 .bind(args.liveness_ttl_ms as i64)
136 .bind(args.now.0)
137 .fetch_one(&mut *tx)
138 .await
139 .map_err(map_sqlx_error)?;
140
141 sqlx::query(
142 "INSERT INTO ff_worker_registry_event \
143 (partition_key, namespace, worker_instance_id, event_kind, event_at_ms, reason) \
144 VALUES ($1, $2, $3, 'registered', $4, NULL) \
145 ON CONFLICT DO NOTHING",
146 )
147 .bind(partition_key)
148 .bind(args.namespace.as_str())
149 .bind(args.worker_instance_id.as_str())
150 .bind(args.now.0)
151 .execute(&mut *tx)
152 .await
153 .map_err(map_sqlx_error)?;
154
155 tx.commit().await.map_err(map_sqlx_error)?;
156
157 Ok(if registered {
158 RegisterWorkerOutcome::Registered
159 } else {
160 RegisterWorkerOutcome::Refreshed
161 })
162}
163
164pub async fn heartbeat_worker(
167 pool: &PgPool,
168 args: HeartbeatWorkerArgs,
169) -> Result<HeartbeatWorkerOutcome, EngineError> {
170 let partition_key = worker_partition_key(args.worker_instance_id.as_str());
171
172 let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
177
178 let ttl_row: Option<i64> = sqlx::query_scalar(
179 "UPDATE ff_worker_registry SET last_heartbeat_ms = $4 \
180 WHERE partition_key = $1 AND namespace = $2 AND worker_instance_id = $3 \
181 AND last_heartbeat_ms + liveness_ttl_ms > $4 \
182 RETURNING liveness_ttl_ms",
183 )
184 .bind(partition_key)
185 .bind(args.namespace.as_str())
186 .bind(args.worker_instance_id.as_str())
187 .bind(args.now.0)
188 .fetch_optional(&mut *tx)
189 .await
190 .map_err(map_sqlx_error)?;
191
192 let Some(ttl_ms) = ttl_row else {
193 tx.commit().await.map_err(map_sqlx_error)?;
194 return Ok(HeartbeatWorkerOutcome::NotRegistered);
195 };
196
197 sqlx::query(
198 "INSERT INTO ff_worker_registry_event \
199 (partition_key, namespace, worker_instance_id, event_kind, event_at_ms, reason) \
200 VALUES ($1, $2, $3, 'heartbeat', $4, NULL) \
201 ON CONFLICT DO NOTHING",
202 )
203 .bind(partition_key)
204 .bind(args.namespace.as_str())
205 .bind(args.worker_instance_id.as_str())
206 .bind(args.now.0)
207 .execute(&mut *tx)
208 .await
209 .map_err(map_sqlx_error)?;
210
211 tx.commit().await.map_err(map_sqlx_error)?;
212
213 let next_expiry_ms = TimestampMs::from_millis(args.now.0.saturating_add(ttl_ms));
214 Ok(HeartbeatWorkerOutcome::Refreshed { next_expiry_ms })
215}
216
217pub async fn mark_worker_dead(
220 pool: &PgPool,
221 args: MarkWorkerDeadArgs,
222) -> Result<MarkWorkerDeadOutcome, EngineError> {
223 if args.reason.len() > MARK_WORKER_DEAD_REASON_MAX_BYTES {
227 return Err(EngineError::Validation {
228 kind: ValidationKind::InvalidInput,
229 detail: format!(
230 "reason: exceeds {} bytes (got {})",
231 MARK_WORKER_DEAD_REASON_MAX_BYTES,
232 args.reason.len()
233 ),
234 });
235 }
236 if args.reason.chars().any(|c| c.is_control()) {
237 return Err(EngineError::Validation {
238 kind: ValidationKind::InvalidInput,
239 detail: "reason: must not contain control characters".into(),
240 });
241 }
242
243 let partition_key = worker_partition_key(args.worker_instance_id.as_str());
244
245 let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
246
247 let deleted: i64 = sqlx::query_scalar(
248 "WITH d AS (\
249 DELETE FROM ff_worker_registry \
250 WHERE partition_key = $1 AND namespace = $2 AND worker_instance_id = $3 \
251 RETURNING 1 AS x\
252 ) SELECT COUNT(*) FROM d",
253 )
254 .bind(partition_key)
255 .bind(args.namespace.as_str())
256 .bind(args.worker_instance_id.as_str())
257 .fetch_one(&mut *tx)
258 .await
259 .map_err(map_sqlx_error)?;
260
261 if deleted == 0 {
262 tx.commit().await.map_err(map_sqlx_error)?;
263 return Ok(MarkWorkerDeadOutcome::NotRegistered);
264 }
265
266 sqlx::query(
267 "INSERT INTO ff_worker_registry_event \
268 (partition_key, namespace, worker_instance_id, event_kind, event_at_ms, reason) \
269 VALUES ($1, $2, $3, 'marked_dead', $4, $5) \
270 ON CONFLICT DO NOTHING",
271 )
272 .bind(partition_key)
273 .bind(args.namespace.as_str())
274 .bind(args.worker_instance_id.as_str())
275 .bind(args.now.0)
276 .bind(args.reason.as_str())
277 .execute(&mut *tx)
278 .await
279 .map_err(map_sqlx_error)?;
280
281 tx.commit().await.map_err(map_sqlx_error)?;
282 Ok(MarkWorkerDeadOutcome::Marked)
283}
284
285pub async fn list_expired_leases(
288 pool: &PgPool,
289 args: ListExpiredLeasesArgs,
290) -> Result<ListExpiredLeasesResult, EngineError> {
291 let limit = args
292 .limit
293 .unwrap_or(LIST_EXPIRED_LEASES_DEFAULT_LIMIT)
294 .min(LIST_EXPIRED_LEASES_MAX_LIMIT) as i64;
295 let _ = args.max_partitions_per_call;
299
300 let (cursor_expiry, cursor_eid_str): (Option<i64>, Option<String>) = match args.after.as_ref() {
304 Some(c) => (Some(c.expires_at_ms.0), Some(c.execution_id.to_string())),
305 None => (None, None),
306 };
307 let namespace_filter: Option<&str> = args.namespace.as_ref().map(|n| n.as_str());
308
309 let rows = sqlx::query(
321 "SELECT a.partition_key, a.execution_id, a.attempt_index, a.lease_epoch, \
322 a.worker_instance_id, a.lease_expires_at_ms \
323 FROM ff_attempt a \
324 JOIN ff_exec_core c \
325 ON c.partition_key = a.partition_key AND c.execution_id = a.execution_id \
326 WHERE a.lease_expires_at_ms IS NOT NULL \
327 AND a.lease_expires_at_ms <= $1 \
328 AND a.worker_instance_id IS NOT NULL \
329 AND c.public_state IN ('claimed', 'running') \
330 AND ($2::text IS NULL OR c.raw_fields->>'namespace' = $2) \
331 AND ($3::bigint IS NULL \
332 OR (a.lease_expires_at_ms, a.execution_id::text) > ($3, $4)) \
333 ORDER BY a.lease_expires_at_ms ASC, a.execution_id ASC \
334 LIMIT $5",
335 )
336 .bind(args.as_of.0)
337 .bind(namespace_filter)
338 .bind(cursor_expiry)
339 .bind(cursor_eid_str.as_deref().unwrap_or(""))
340 .bind(limit)
341 .fetch_all(pool)
342 .await
343 .map_err(map_sqlx_error)?;
344
345 let mut entries: Vec<ExpiredLeaseInfo> = Vec::with_capacity(rows.len());
346 for row in &rows {
347 let partition_key: i16 = row.try_get("partition_key").map_err(map_sqlx_error)?;
348 let exec_uuid: Uuid = row.try_get("execution_id").map_err(map_sqlx_error)?;
349 let attempt_index: i32 = row.try_get("attempt_index").map_err(map_sqlx_error)?;
350 let lease_epoch: i64 = row.try_get("lease_epoch").map_err(map_sqlx_error)?;
351 let worker_inst: String = row.try_get("worker_instance_id").map_err(map_sqlx_error)?;
352 let expires_at_ms: i64 = row.try_get("lease_expires_at_ms").map_err(map_sqlx_error)?;
353
354 let eid_str = format!("{{fp:{}}}:{}", partition_key, exec_uuid);
355 let execution_id = ExecutionId::parse(&eid_str).map_err(|e| EngineError::Validation {
356 kind: ValidationKind::Corruption,
357 detail: format!("list_expired_leases: bad execution_id {eid_str:?}: {e}"),
358 })?;
359
360 let lease_id = LeaseId::from_uuid(synthetic_lease_uuid(exec_uuid, attempt_index, lease_epoch));
361 let attempt_index_u = u32::try_from(attempt_index.max(0)).unwrap_or(0);
362 let lease_epoch_u = u64::try_from(lease_epoch.max(0)).unwrap_or(0);
363
364 entries.push(ExpiredLeaseInfo::new(
365 execution_id,
366 lease_id,
367 LeaseEpoch::new(lease_epoch_u),
368 WorkerInstanceId::new(worker_inst),
369 TimestampMs::from_millis(expires_at_ms),
370 AttemptIndex::new(attempt_index_u),
371 ));
372 }
373
374 let page_full = rows.len() as i64 >= limit;
375 let cursor = if page_full {
376 entries
377 .last()
378 .map(|e| ExpiredLeasesCursor::new(e.expires_at_ms, e.execution_id.clone()))
379 } else {
380 None
381 };
382 Ok(ListExpiredLeasesResult::new(entries, cursor))
383}
384
385pub async fn list_workers(
388 pool: &PgPool,
389 args: ListWorkersArgs,
390) -> Result<ListWorkersResult, EngineError> {
391 let Some(ns) = args.namespace.as_ref() else {
399 return Err(EngineError::Unavailable {
400 op: "list_workers (cross-namespace on Postgres — pass namespace explicitly)",
401 });
402 };
403 let limit = args.limit.unwrap_or(1000) as i64;
404 let namespace_filter: &str = ns.as_str();
405 let after_cursor: Option<&str> = args.after.as_ref().map(|w| w.as_str());
406
407 let rows = sqlx::query(
408 "SELECT worker_id, worker_instance_id, namespace, lanes, \
409 capabilities_csv, last_heartbeat_ms, liveness_ttl_ms, registered_at_ms \
410 FROM ff_worker_registry \
411 WHERE namespace = $1 \
412 AND ($2::text IS NULL OR worker_instance_id > $2) \
413 ORDER BY worker_instance_id ASC \
414 LIMIT $3",
415 )
416 .bind(namespace_filter)
417 .bind(after_cursor)
418 .bind(limit)
419 .fetch_all(pool)
420 .await
421 .map_err(map_sqlx_error)?;
422
423 let mut entries: Vec<WorkerInfo> = Vec::with_capacity(rows.len());
424 for row in &rows {
425 let worker_id: String = row.try_get("worker_id").map_err(map_sqlx_error)?;
426 let worker_inst: String = row.try_get("worker_instance_id").map_err(map_sqlx_error)?;
427 let namespace: String = row.try_get("namespace").map_err(map_sqlx_error)?;
428 let lanes_vec: Vec<String> = row.try_get("lanes").map_err(map_sqlx_error)?;
429 let caps_csv: String = row.try_get("capabilities_csv").map_err(map_sqlx_error)?;
430 let last_hb_ms: i64 = row.try_get("last_heartbeat_ms").map_err(map_sqlx_error)?;
431 let liveness_ttl_ms: i64 = row.try_get("liveness_ttl_ms").map_err(map_sqlx_error)?;
432 let registered_at_ms: i64 = row.try_get("registered_at_ms").map_err(map_sqlx_error)?;
433
434 let lanes: BTreeSet<LaneId> = lanes_vec.into_iter().map(LaneId).collect();
435 let capabilities: BTreeSet<String> = caps_csv
436 .split(',')
437 .filter(|s| !s.is_empty())
438 .map(str::to_owned)
439 .collect();
440
441 entries.push(WorkerInfo::new(
442 WorkerId::new(worker_id),
443 WorkerInstanceId::new(worker_inst),
444 Namespace::new(namespace),
445 lanes,
446 capabilities,
447 TimestampMs::from_millis(last_hb_ms),
448 u64::try_from(liveness_ttl_ms.max(0)).unwrap_or(0),
449 TimestampMs::from_millis(registered_at_ms),
450 ));
451 }
452
453 let page_full = rows.len() as i64 >= limit;
454 let cursor = if page_full {
455 entries.last().map(|w| w.worker_instance_id.clone())
456 } else {
457 None
458 };
459 Ok(ListWorkersResult::new(entries, cursor))
460}
461
462pub async fn ttl_sweep_tick(pool: &PgPool, partition_key: i16) -> Result<ScanReport, EngineError> {
471 let now_ms: i64 = i64::try_from(
472 std::time::SystemTime::now()
473 .duration_since(std::time::UNIX_EPOCH)
474 .map(|d| d.as_millis())
475 .unwrap_or(0),
476 )
477 .unwrap_or(i64::MAX);
478
479 let report_rows = sqlx::query(
486 "WITH swept AS (\
487 DELETE FROM ff_worker_registry \
488 WHERE partition_key = $1 \
489 AND last_heartbeat_ms + liveness_ttl_ms < $2 \
490 RETURNING partition_key, namespace, worker_instance_id\
491 ), ev AS (\
492 INSERT INTO ff_worker_registry_event \
493 (partition_key, namespace, worker_instance_id, event_kind, event_at_ms, reason) \
494 SELECT partition_key, namespace, worker_instance_id, 'ttl_swept', $2, NULL \
495 FROM swept \
496 ON CONFLICT DO NOTHING \
497 RETURNING 1\
498 ) SELECT COUNT(*) AS swept FROM swept",
499 )
500 .bind(partition_key)
501 .bind(now_ms)
502 .fetch_one(pool)
503 .await
504 .map_err(map_sqlx_error)?;
505
506 let processed: i64 = report_rows.try_get("swept").map_err(map_sqlx_error)?;
507 Ok(ScanReport {
508 processed: u32::try_from(processed.max(0)).unwrap_or(u32::MAX),
509 errors: 0,
510 })
511}