1use std::time::Duration;
22
23use ff_core::backend::{BackendTag, Handle, HandleKind};
24use ff_core::contracts::{
25 IssueReclaimGrantArgs, IssueReclaimGrantOutcome, ReclaimExecutionArgs,
26 ReclaimExecutionOutcome, ReclaimGrant,
27};
28use ff_core::engine_error::{ContentionKind, EngineError, ValidationKind};
29use ff_core::handle_codec::{encode as encode_opaque, HandlePayload};
30use ff_core::partition::{Partition, PartitionFamily, PartitionKey};
31use ff_core::types::{AttemptIndex, ExecutionId, LeaseEpoch};
32use sha2::{Digest, Sha256};
33use sqlx::{PgPool, Postgres, Row};
34use uuid::Uuid;
35
36use crate::error::map_sqlx_error;
37use crate::lease_event;
38use crate::signal::{current_active_kid, hmac_sign};
39
/// How many times `issue_reclaim_grant_impl` / `reclaim_execution_impl` run their
/// single-attempt function (when it keeps failing with a retryable error) before
/// giving up with `ContentionKind::RetryExhausted`.
const MAX_ATTEMPTS: u32 = 3;

/// Reclaim ceiling per execution. `issue_reclaim_grant_once` refuses once
/// `lease_reclaim_count` has reached this value, and `reclaim_execution_once`
/// uses it when the caller leaves `max_reclaim_count` unset.
pub const DEFAULT_MAX_RECLAIM_COUNT: u32 = 1_000;
44
45pub fn grant_id_from_key(grant_key: &str) -> Vec<u8> {
47 let mut h = Sha256::new();
48 h.update(grant_key.as_bytes());
49 h.finalize().to_vec()
50}
51
52fn capabilities_jsonb(caps: &std::collections::BTreeSet<String>) -> serde_json::Value {
53 serde_json::Value::Array(
54 caps.iter()
55 .map(|c| serde_json::Value::String(c.clone()))
56 .collect(),
57 )
58}
59
60#[allow(clippy::too_many_arguments)]
62pub async fn write_claim_grant<'c>(
63 tx: &mut sqlx::Transaction<'c, Postgres>,
64 partition_key: i16,
65 grant_key: &str,
66 execution_id: Uuid,
67 worker_id: &str,
68 worker_instance_id: &str,
69 grant_ttl_ms: u64,
70 issued_at_ms: i64,
71 expires_at_ms: i64,
72) -> Result<(), EngineError> {
73 let grant_id = grant_id_from_key(grant_key);
74 sqlx::query(
75 r#"
76 INSERT INTO ff_claim_grant (
77 partition_key, grant_id, execution_id, kind,
78 worker_id, worker_instance_id, lane_id,
79 capability_hash, worker_capabilities,
80 route_snapshot_json, admission_summary,
81 grant_ttl_ms, issued_at_ms, expires_at_ms
82 ) VALUES (
83 $1, $2, $3, 'claim',
84 $4, $5, NULL,
85 NULL, '[]'::jsonb,
86 NULL, NULL,
87 $6, $7, $8
88 )
89 ON CONFLICT (partition_key, grant_id) DO UPDATE SET
90 worker_id = EXCLUDED.worker_id,
91 worker_instance_id = EXCLUDED.worker_instance_id,
92 grant_ttl_ms = EXCLUDED.grant_ttl_ms,
93 issued_at_ms = EXCLUDED.issued_at_ms,
94 expires_at_ms = EXCLUDED.expires_at_ms
95 "#,
96 )
97 .bind(partition_key)
98 .bind(&grant_id)
99 .bind(execution_id)
100 .bind(worker_id)
101 .bind(worker_instance_id)
102 .bind(i64::try_from(grant_ttl_ms).unwrap_or(i64::MAX))
103 .bind(issued_at_ms)
104 .bind(expires_at_ms)
105 .execute(&mut **tx)
106 .await
107 .map_err(map_sqlx_error)?;
108
109 let grant_json = serde_json::json!({
123 "grant_key": grant_key,
124 "worker_id": worker_id,
125 "worker_instance_id": worker_instance_id,
126 "expires_at_ms": expires_at_ms,
127 "issued_at_ms": issued_at_ms,
128 });
129 sqlx::query(
130 r#"
131 UPDATE ff_exec_core
132 SET raw_fields = jsonb_set(
133 COALESCE(raw_fields, '{}'::jsonb),
134 '{claim_grant}',
135 $3::jsonb,
136 true)
137 WHERE partition_key = $1 AND execution_id = $2
138 "#,
139 )
140 .bind(partition_key)
141 .bind(execution_id)
142 .bind(grant_json)
143 .execute(&mut **tx)
144 .await
145 .map_err(map_sqlx_error)?;
146
147 Ok(())
148}
149
150pub async fn read_claim_grant_identity(
153 pool: &PgPool,
154 partition_key: i16,
155 execution_id: Uuid,
156) -> Result<Option<(String, String)>, EngineError> {
157 let row = sqlx::query(
158 r#"
159 SELECT worker_id, worker_instance_id
160 FROM ff_claim_grant
161 WHERE partition_key = $1
162 AND execution_id = $2
163 AND kind = 'claim'
164 ORDER BY issued_at_ms DESC
165 LIMIT 1
166 "#,
167 )
168 .bind(partition_key)
169 .bind(execution_id)
170 .fetch_optional(pool)
171 .await
172 .map_err(map_sqlx_error)?;
173 Ok(row.map(|r| {
174 (
175 r.get::<String, _>("worker_id"),
176 r.get::<String, _>("worker_instance_id"),
177 )
178 }))
179}
180
181fn is_retryable_serialization(err: &EngineError) -> bool {
184 matches!(err, EngineError::Contention(ContentionKind::LeaseConflict))
185}
186
187async fn begin_serializable(pool: &PgPool) -> Result<sqlx::Transaction<'_, Postgres>, EngineError> {
188 let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
189 sqlx::query("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE")
190 .execute(&mut *tx)
191 .await
192 .map_err(map_sqlx_error)?;
193 Ok(tx)
194}
195
196fn split_exec_id(eid: &ExecutionId) -> Result<(i16, Uuid), EngineError> {
197 let s = eid.as_str();
198 let rest = s.strip_prefix("{fp:").ok_or_else(|| EngineError::Validation {
199 kind: ValidationKind::InvalidInput,
200 detail: format!("execution_id missing `{{fp:` prefix: {s}"),
201 })?;
202 let close = rest.find("}:").ok_or_else(|| EngineError::Validation {
203 kind: ValidationKind::InvalidInput,
204 detail: format!("execution_id missing `}}:`: {s}"),
205 })?;
206 let part_u: u16 = rest[..close].parse().map_err(|_| EngineError::Validation {
213 kind: ValidationKind::InvalidInput,
214 detail: format!("execution_id partition index not u16: {s}"),
215 })?;
216 let part = part_u as i16;
217 let uuid = Uuid::parse_str(&rest[close + 2..]).map_err(|_| EngineError::Validation {
218 kind: ValidationKind::InvalidInput,
219 detail: format!("execution_id UUID invalid: {s}"),
220 })?;
221 Ok((part, uuid))
222}
223
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// Returns 0 if the system clock reads earlier than the epoch. If the
/// millisecond count does not fit in `i64`, it saturates at `i64::MAX`. (The
/// previous `u128 as i64` cast silently truncated the value and could wrap.)
fn now_ms() -> i64 {
    let since_epoch = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or(Duration::ZERO);
    i64::try_from(since_epoch.as_millis()).unwrap_or(i64::MAX)
}
230
/// One attempt at issuing a reclaim grant for `args.execution_id`.
///
/// The HMAC signing key is fetched first, through the pool and outside the
/// transaction. Everything after that runs in one SERIALIZABLE transaction from
/// `begin_serializable`, which:
/// 1. locks the `ff_exec_core` row and reads the current attempt's lease data;
/// 2. refuses once `lease_reclaim_count` has reached `DEFAULT_MAX_RECLAIM_COUNT`;
/// 3. refuses unless the execution is `active` and either in a reclaimable
///    ownership state or past its lease deadline;
/// 4. inserts a `kind = 'reclaim'` row into `ff_claim_grant` and commits.
///
/// # Returns
/// - `Granted`, carrying the signed grant key, its expiry and `args.lane_id`;
/// - `ReclaimCapExceeded` when the reclaim cap is already reached;
/// - `NotReclaimable` when the state checks in step 3 fail.
///
/// # Errors
/// - `EngineError::Validation` when the execution id cannot be parsed;
/// - `EngineError::Unavailable` when `current_active_kid` returns no key;
/// - `EngineError::NotFound` when no `ff_exec_core` row matches;
/// - database failures mapped through `map_sqlx_error`. `issue_reclaim_grant_impl`
///   retries those that `is_retryable_serialization` accepts.
async fn issue_reclaim_grant_once(
    pool: &PgPool,
    args: &IssueReclaimGrantArgs,
) -> Result<IssueReclaimGrantOutcome, EngineError> {
    let (part, exec_uuid) = split_exec_id(&args.execution_id)?;
    let (kid, secret) = current_active_kid(pool)
        .await?
        .ok_or(EngineError::Unavailable {
            op: "issue_reclaim_grant: ff_waitpoint_hmac keystore empty",
        })?;

    let mut tx = begin_serializable(pool).await?;

    // Only the exec_core row is locked (`OF ec`). The LEFT JOIN picks the attempt
    // row for the execution's current attempt_index. Its columns come back NULL
    // when no such attempt row exists.
    let row = sqlx::query(
        r#"
        SELECT ec.lifecycle_phase,
               ec.ownership_state,
               ec.lease_reclaim_count,
               a.lease_expires_at_ms,
               a.worker_instance_id
        FROM ff_exec_core ec
        LEFT JOIN ff_attempt a
          ON a.partition_key = ec.partition_key
         AND a.execution_id = ec.execution_id
         AND a.attempt_index = ec.attempt_index
        WHERE ec.partition_key = $1 AND ec.execution_id = $2
        FOR NO KEY UPDATE OF ec
        "#,
    )
    .bind(part)
    .bind(exec_uuid)
    .fetch_optional(&mut *tx)
    .await
    .map_err(map_sqlx_error)?;

    let Some(row) = row else {
        tx.rollback().await.map_err(map_sqlx_error)?;
        return Err(EngineError::NotFound {
            entity: "execution",
        });
    };

    let lifecycle_phase: String = row.try_get("lifecycle_phase").map_err(map_sqlx_error)?;
    let ownership_state: String = row.try_get("ownership_state").map_err(map_sqlx_error)?;
    let reclaim_count: i32 = row.try_get("lease_reclaim_count").map_err(map_sqlx_error)?;
    let lease_expires_at: Option<i64> = row
        .try_get::<Option<i64>, _>("lease_expires_at_ms")
        .map_err(map_sqlx_error)?;
    let worker_instance_id: Option<String> = row
        .try_get::<Option<String>, _>("worker_instance_id")
        .map_err(map_sqlx_error)?;

    // Negative stored counts are treated as 0.
    // NOTE(review): the cap here is always DEFAULT_MAX_RECLAIM_COUNT, whereas
    // reclaim_execution_once honours args.max_reclaim_count. Confirm that the
    // mismatch is intended.
    let reclaim_count_u = u32::try_from(reclaim_count.max(0)).unwrap_or(0);
    if reclaim_count_u >= DEFAULT_MAX_RECLAIM_COUNT {
        tx.rollback().await.map_err(map_sqlx_error)?;
        return Ok(IssueReclaimGrantOutcome::ReclaimCapExceeded {
            execution_id: args.execution_id.clone(),
            reclaim_count: reclaim_count_u,
        });
    }

    let now = now_ms();
    // The lease counts as expired when its recorded deadline has passed. With no
    // deadline recorded, it counts as expired when there is no worker_instance_id
    // or the value is empty.
    let lease_expired = match lease_expires_at {
        Some(exp) => exp <= now,
        None => worker_instance_id.is_none() || worker_instance_id.as_deref() == Some(""),
    };
    let reclaimable_state = matches!(
        ownership_state.as_str(),
        "lease_expired_reclaimable" | "lease_revoked"
    );
    let phase_active = lifecycle_phase == "active";
    if !(phase_active && (reclaimable_state || lease_expired)) {
        tx.rollback().await.map_err(map_sqlx_error)?;
        return Ok(IssueReclaimGrantOutcome::NotReclaimable {
            execution_id: args.execution_id.clone(),
            detail: format!(
                "execution not reclaimable: lifecycle_phase={lifecycle_phase}, ownership_state={ownership_state}"
            ),
        });
    }

    // `part as u16` undoes the `u16 as i16` reinterpretation done in split_exec_id.
    let partition = Partition {
        family: PartitionFamily::Execution,
        index: part as u16,
    };
    let hash_tag = partition.hash_tag();
    // saturating_add_unsigned clamps at i64::MAX instead of overflowing.
    let expires_at_ms = now.saturating_add_unsigned(args.grant_ttl_ms.min(i64::MAX as u64));
    // The signed message binds the partition tag, execution, worker identity,
    // expiry and the literal `reclaim` purpose. The grant key then embeds the
    // signature, and its SHA-256 becomes the row's grant_id.
    let message = format!(
        "{hash_tag}|{exec_uuid}|{wid}|{wiid}|{exp}|reclaim",
        wid = args.worker_id.as_str(),
        wiid = args.worker_instance_id.as_str(),
        exp = expires_at_ms,
    );
    let sig = hmac_sign(&secret, &kid, message.as_bytes());
    let grant_key = format!("pg:reclaim:{hash_tag}:{exec_uuid}:{expires_at_ms}:{sig}");
    let grant_id = grant_id_from_key(&grant_key);

    // Plain INSERT with no ON CONFLICT: a (partition_key, grant_id) collision
    // surfaces as an error rather than an upsert.
    sqlx::query(
        r#"
        INSERT INTO ff_claim_grant (
            partition_key, grant_id, execution_id, kind,
            worker_id, worker_instance_id, lane_id,
            capability_hash, worker_capabilities,
            route_snapshot_json, admission_summary,
            grant_ttl_ms, issued_at_ms, expires_at_ms
        ) VALUES (
            $1, $2, $3, 'reclaim',
            $4, $5, $6,
            $7, $8,
            $9, $10,
            $11, $12, $13
        )
        "#,
    )
    .bind(part)
    .bind(&grant_id)
    .bind(exec_uuid)
    .bind(args.worker_id.as_str())
    .bind(args.worker_instance_id.as_str())
    .bind(args.lane_id.as_str())
    .bind(args.capability_hash.as_deref())
    .bind(capabilities_jsonb(&args.worker_capabilities))
    .bind(args.route_snapshot_json.as_deref())
    .bind(args.admission_summary.as_deref())
    .bind(i64::try_from(args.grant_ttl_ms).unwrap_or(i64::MAX))
    .bind(now)
    .bind(expires_at_ms)
    .execute(&mut *tx)
    .await
    .map_err(map_sqlx_error)?;

    tx.commit().await.map_err(map_sqlx_error)?;

    // expires_at_ms >= now >= 0 (now_ms never returns a negative value), so the
    // cast does not wrap.
    Ok(IssueReclaimGrantOutcome::Granted(ReclaimGrant::new(
        args.execution_id.clone(),
        PartitionKey::from(&partition),
        grant_key,
        expires_at_ms as u64,
        args.lane_id.clone(),
    )))
}
378
379pub async fn issue_reclaim_grant_impl(
380 pool: &PgPool,
381 args: IssueReclaimGrantArgs,
382) -> Result<IssueReclaimGrantOutcome, EngineError> {
383 let mut last: Option<EngineError> = None;
384 for attempt in 0..MAX_ATTEMPTS {
385 match issue_reclaim_grant_once(pool, &args).await {
386 Ok(r) => return Ok(r),
387 Err(err) if is_retryable_serialization(&err) => {
388 if attempt + 1 < MAX_ATTEMPTS {
389 let ms = 5u64 * (1u64 << attempt);
390 tokio::time::sleep(Duration::from_millis(ms)).await;
391 }
392 last = Some(err);
393 continue;
394 }
395 Err(err) => return Err(err),
396 }
397 }
398 let _ = last;
399 Err(EngineError::Contention(ContentionKind::RetryExhausted))
400}
401
/// One attempt at redeeming the newest reclaim grant for `args.execution_id` and
/// starting a fresh attempt owned by `args.worker_id` / `args.worker_instance_id`.
///
/// Everything runs in one SERIALIZABLE transaction from `begin_serializable`.
///
/// # Returns
/// - `GrantNotFound` if no `kind = 'reclaim'` grant exists, or the newest one has expired;
/// - `NotReclaimable` if `lifecycle_phase` is `terminal` or `cancelled`;
/// - `ReclaimCapExceeded` if this reclaim would push `lease_reclaim_count` above
///   `args.max_reclaim_count` (default `DEFAULT_MAX_RECLAIM_COUNT`). In that case
///   the execution is marked terminal/failed, the grant is deleted, a `failed`
///   completion event is inserted and `EVENT_REVOKED` is emitted;
/// - `Claimed` with a `HandleKind::Reclaimed` handle for the new attempt otherwise.
///
/// # Errors
/// - `EngineError::Validation` for an unparseable execution id, or when the grant
///   was issued to a different `worker_id`;
/// - `EngineError::NotFound` when the `ff_exec_core` row is missing;
/// - database failures mapped through `map_sqlx_error`. `reclaim_execution_impl`
///   retries those that `is_retryable_serialization` accepts.
async fn reclaim_execution_once(
    pool: &PgPool,
    args: &ReclaimExecutionArgs,
) -> Result<ReclaimExecutionOutcome, EngineError> {
    let (part, exec_uuid) = split_exec_id(&args.execution_id)?;
    let max_reclaim_count = args.max_reclaim_count.unwrap_or(DEFAULT_MAX_RECLAIM_COUNT);

    let mut tx = begin_serializable(pool).await?;

    // Newest reclaim grant (by issued_at_ms), row-locked because it is deleted
    // later in this transaction.
    let grant_row = sqlx::query(
        r#"
        SELECT grant_id, worker_id, expires_at_ms, lane_id
        FROM ff_claim_grant
        WHERE partition_key = $1
          AND execution_id = $2
          AND kind = 'reclaim'
        ORDER BY issued_at_ms DESC
        FOR UPDATE
        LIMIT 1
        "#,
    )
    .bind(part)
    .bind(exec_uuid)
    .fetch_optional(&mut *tx)
    .await
    .map_err(map_sqlx_error)?;

    let Some(grant_row) = grant_row else {
        tx.rollback().await.map_err(map_sqlx_error)?;
        return Ok(ReclaimExecutionOutcome::GrantNotFound {
            execution_id: args.execution_id.clone(),
        });
    };
    // `lane_id` is selected above but not read; the handle uses `args.lane_id`.
    let grant_id: Vec<u8> = grant_row.try_get("grant_id").map_err(map_sqlx_error)?;
    let grant_worker_id: String = grant_row.try_get("worker_id").map_err(map_sqlx_error)?;
    let grant_expires_at_ms: i64 = grant_row
        .try_get("expires_at_ms")
        .map_err(map_sqlx_error)?;

    // Only worker_id is compared; the grant's worker_instance_id is not checked.
    if grant_worker_id != args.worker_id.as_str() {
        tx.rollback().await.map_err(map_sqlx_error)?;
        return Err(EngineError::Validation {
            kind: ValidationKind::InvalidInput,
            detail: format!(
                "reclaim_execution: grant.worker_id={grant_worker_id} != args.worker_id={}",
                args.worker_id.as_str()
            ),
        });
    }
    let now = now_ms();
    // An expired grant is reported the same way as a missing one.
    if grant_expires_at_ms <= now {
        tx.rollback().await.map_err(map_sqlx_error)?;
        return Ok(ReclaimExecutionOutcome::GrantNotFound {
            execution_id: args.execution_id.clone(),
        });
    }

    // Lock exec_core (only `OF ec`) and read the current attempt's lease epoch.
    // The epoch is 0 when no attempt row matches.
    let core_row = sqlx::query(
        r#"
        SELECT ec.lifecycle_phase, ec.attempt_index, ec.lease_reclaim_count,
               COALESCE(a.lease_epoch, 0) AS prior_lease_epoch
        FROM ff_exec_core ec
        LEFT JOIN ff_attempt a
          ON a.partition_key = ec.partition_key
         AND a.execution_id = ec.execution_id
         AND a.attempt_index = ec.attempt_index
        WHERE ec.partition_key = $1 AND ec.execution_id = $2
        FOR NO KEY UPDATE OF ec
        "#,
    )
    .bind(part)
    .bind(exec_uuid)
    .fetch_optional(&mut *tx)
    .await
    .map_err(map_sqlx_error)?;
    let Some(core_row) = core_row else {
        tx.rollback().await.map_err(map_sqlx_error)?;
        return Err(EngineError::NotFound {
            entity: "execution",
        });
    };
    let lifecycle_phase: String = core_row.try_get("lifecycle_phase").map_err(map_sqlx_error)?;
    let cur_attempt_index: i32 = core_row.try_get("attempt_index").map_err(map_sqlx_error)?;
    let cur_reclaim_count: i32 = core_row
        .try_get("lease_reclaim_count")
        .map_err(map_sqlx_error)?;
    let prior_lease_epoch: i64 = core_row
        .try_get("prior_lease_epoch")
        .map_err(map_sqlx_error)?;

    // NOTE(review): only terminal/cancelled phases are rejected here.
    // ownership_state and the current lease deadline are not re-checked after the
    // grant was issued. Confirm that a lease renewal elsewhere invalidates
    // outstanding reclaim grants.
    if lifecycle_phase == "terminal" || lifecycle_phase == "cancelled" {
        tx.rollback().await.map_err(map_sqlx_error)?;
        return Ok(ReclaimExecutionOutcome::NotReclaimable {
            execution_id: args.execution_id.clone(),
            detail: format!("execution terminal: lifecycle_phase={lifecycle_phase}"),
        });
    }

    // This reclaim would be number `next_reclaim_count` (negative stored counts
    // are treated as 0). Going above the cap fails the execution instead of
    // starting a new attempt.
    let next_reclaim_count = (cur_reclaim_count.max(0) as u32).saturating_add(1);
    if next_reclaim_count > max_reclaim_count {
        // Fail the execution terminally and record the incremented count. The
        // COALESCEs keep any terminal_at_ms / cancellation_reason already set.
        // NOTE(review): the current ff_attempt row is left untouched on this path;
        // confirm that is intended.
        sqlx::query(
            r#"
            UPDATE ff_exec_core
               SET lifecycle_phase = 'terminal',
                   ownership_state = 'unowned',
                   eligibility_state = 'not_applicable',
                   public_state = 'failed',
                   attempt_state = 'terminal_failed',
                   terminal_at_ms = COALESCE(terminal_at_ms, $3),
                   lease_reclaim_count = $4,
                   cancellation_reason = COALESCE(cancellation_reason, 'reclaim_cap_exceeded')
             WHERE partition_key = $1 AND execution_id = $2
            "#,
        )
        .bind(part)
        .bind(exec_uuid)
        .bind(now)
        .bind(i32::try_from(next_reclaim_count).unwrap_or(i32::MAX))
        .execute(&mut *tx)
        .await
        .map_err(map_sqlx_error)?;
        // Consume the grant.
        sqlx::query("DELETE FROM ff_claim_grant WHERE partition_key = $1 AND grant_id = $2")
            .bind(part)
            .bind(&grant_id)
            .execute(&mut *tx)
            .await
            .map_err(map_sqlx_error)?;

        // Record a `failed` completion event, copying flow_id from exec_core.
        sqlx::query(
            r#"
            INSERT INTO ff_completion_event (
                partition_key, execution_id, flow_id, outcome,
                namespace, instance_tag, occurred_at_ms
            )
            SELECT partition_key, execution_id, flow_id, 'failed',
                   NULL, NULL, $3
              FROM ff_exec_core
             WHERE partition_key = $1 AND execution_id = $2
            "#,
        )
        .bind(part)
        .bind(exec_uuid)
        .bind(now)
        .execute(&mut *tx)
        .await
        .map_err(map_sqlx_error)?;
        lease_event::emit(
            &mut tx,
            part,
            exec_uuid,
            None,
            lease_event::EVENT_REVOKED,
            now,
        )
        .await?;

        tx.commit().await.map_err(map_sqlx_error)?;
        return Ok(ReclaimExecutionOutcome::ReclaimCapExceeded {
            execution_id: args.execution_id.clone(),
            reclaim_count: next_reclaim_count,
        });
    }

    // Close out the interrupted attempt, keeping any terminal_at_ms already set.
    sqlx::query(
        r#"
        UPDATE ff_attempt
           SET outcome = 'interrupted_reclaimed',
               terminal_at_ms = COALESCE(terminal_at_ms, $4)
         WHERE partition_key = $1 AND execution_id = $2 AND attempt_index = $3
        "#,
    )
    .bind(part)
    .bind(exec_uuid)
    .bind(cur_attempt_index)
    .bind(now)
    .execute(&mut *tx)
    .await
    .map_err(map_sqlx_error)?;

    let new_attempt_index = cur_attempt_index + 1;
    // The new attempt gets a strictly higher lease epoch than the one it replaces.
    let new_lease_epoch: i64 = prior_lease_epoch.max(0).saturating_add(1);
    let lease_ttl_ms = i64::try_from(args.lease_ttl_ms).unwrap_or(i64::MAX);
    let new_lease_expires = now.saturating_add(lease_ttl_ms);
    // Placeholder order differs from column order: $8 = lease_epoch,
    // $6 = lease_expires_at_ms, $7 = started_at_ms.
    sqlx::query(
        r#"
        INSERT INTO ff_attempt (
            partition_key, execution_id, attempt_index,
            worker_id, worker_instance_id,
            lease_epoch, lease_expires_at_ms, started_at_ms,
            raw_fields
        ) VALUES (
            $1, $2, $3,
            $4, $5,
            $8, $6, $7,
            jsonb_build_object('attempt_type', 'reclaim')
        )
        "#,
    )
    .bind(part)
    .bind(exec_uuid)
    .bind(new_attempt_index)
    .bind(args.worker_id.as_str())
    .bind(args.worker_instance_id.as_str())
    .bind(new_lease_expires)
    .bind(now)
    .bind(new_lease_epoch)
    .execute(&mut *tx)
    .await
    .map_err(map_sqlx_error)?;

    // Point exec_core at the new attempt and mark it leased/running. started_at_ms
    // is only set if it was previously NULL.
    sqlx::query(
        r#"
        UPDATE ff_exec_core
           SET lifecycle_phase = 'active',
               ownership_state = 'leased',
               eligibility_state = 'not_applicable',
               public_state = 'running',
               attempt_state = 'running_attempt',
               attempt_index = $3,
               lease_reclaim_count = $4,
               started_at_ms = COALESCE(started_at_ms, $5)
         WHERE partition_key = $1 AND execution_id = $2
        "#,
    )
    .bind(part)
    .bind(exec_uuid)
    .bind(new_attempt_index)
    .bind(i32::try_from(next_reclaim_count).unwrap_or(i32::MAX))
    .bind(now)
    .execute(&mut *tx)
    .await
    .map_err(map_sqlx_error)?;

    // Delete the redeemed grant.
    sqlx::query("DELETE FROM ff_claim_grant WHERE partition_key = $1 AND grant_id = $2")
        .bind(part)
        .bind(&grant_id)
        .execute(&mut *tx)
        .await
        .map_err(map_sqlx_error)?;

    lease_event::emit(
        &mut tx,
        part,
        exec_uuid,
        None,
        lease_event::EVENT_RECLAIMED,
        now,
    )
    .await?;

    tx.commit().await.map_err(map_sqlx_error)?;

    // Build the handle for the new attempt. Negative index/epoch values clamp to 0,
    // and the lease TTL is clamped to u32::MAX ms before being widened back to u64.
    let payload = HandlePayload::new(
        args.execution_id.clone(),
        AttemptIndex::new(u32::try_from(new_attempt_index.max(0)).unwrap_or(0)),
        args.attempt_id.clone(),
        args.lease_id.clone(),
        LeaseEpoch(u64::try_from(new_lease_epoch.max(0)).unwrap_or(0)),
        u32::try_from(args.lease_ttl_ms.min(u32::MAX as u64)).unwrap_or(u32::MAX) as u64,
        args.lane_id.clone(),
        args.worker_instance_id.clone(),
    );
    Ok(ReclaimExecutionOutcome::Claimed(mint_handle(
        payload,
        HandleKind::Reclaimed,
    )))
}
707
708pub async fn reclaim_execution_impl(
709 pool: &PgPool,
710 args: ReclaimExecutionArgs,
711) -> Result<ReclaimExecutionOutcome, EngineError> {
712 let mut last: Option<EngineError> = None;
713 for attempt in 0..MAX_ATTEMPTS {
714 match reclaim_execution_once(pool, &args).await {
715 Ok(r) => return Ok(r),
716 Err(err) if is_retryable_serialization(&err) => {
717 if attempt + 1 < MAX_ATTEMPTS {
718 let ms = 5u64 * (1u64 << attempt);
719 tokio::time::sleep(Duration::from_millis(ms)).await;
720 }
721 last = Some(err);
722 continue;
723 }
724 Err(err) => return Err(err),
725 }
726 }
727 let _ = last;
728 Err(EngineError::Contention(ContentionKind::RetryExhausted))
729}
730
731fn mint_handle(payload: HandlePayload, kind: HandleKind) -> Handle {
732 let op = encode_opaque(BackendTag::Postgres, &payload);
733 Handle::new(BackendTag::Postgres, kind, op)
734}