1use ff_core::backend::{
37 BackendTag, CapabilitySet, ClaimPolicy, FailOutcome, FailureClass, FailureReason, Handle,
38 HandleKind, LeaseRenewal, ResumeToken,
39};
40use ff_core::caps::{matches as caps_matches, CapabilityRequirement};
41use ff_core::engine_error::{ContentionKind, EngineError};
42use ff_core::handle_codec::{decode as decode_opaque, encode as encode_opaque, HandlePayload};
43use ff_core::types::{
44 AttemptId, AttemptIndex, ExecutionId, LaneId, LeaseEpoch, LeaseId, TimestampMs,
45};
46use sqlx::{PgPool, Row};
47use std::time::{SystemTime, UNIX_EPOCH};
48use uuid::Uuid;
49
50use crate::error::map_sqlx_error;
51use crate::lease_event;
52
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// A system clock set before 1970 yields `0`; a millisecond count too large
/// for `i64` saturates to `i64::MAX`.
fn now_ms() -> i64 {
    let millis: u128 = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis(),
        Err(_) => 0,
    };
    i64::try_from(millis).unwrap_or(i64::MAX)
}
64
65pub(crate) fn split_exec_id(eid: &ExecutionId) -> Result<(i16, Uuid), EngineError> {
68 let s = eid.as_str();
69 let rest = s.strip_prefix("{fp:").ok_or_else(|| EngineError::Validation {
70 kind: ff_core::engine_error::ValidationKind::InvalidInput,
71 detail: format!("execution_id missing `{{fp:` prefix: {s}"),
72 })?;
73 let close = rest.find("}:").ok_or_else(|| EngineError::Validation {
74 kind: ff_core::engine_error::ValidationKind::InvalidInput,
75 detail: format!("execution_id missing `}}:`: {s}"),
76 })?;
77 let part: i16 = rest[..close]
78 .parse()
79 .map_err(|_| EngineError::Validation {
80 kind: ff_core::engine_error::ValidationKind::InvalidInput,
81 detail: format!("execution_id partition index not u16: {s}"),
82 })?;
83 let uuid = Uuid::parse_str(&rest[close + 2..]).map_err(|_| EngineError::Validation {
84 kind: ff_core::engine_error::ValidationKind::InvalidInput,
85 detail: format!("execution_id UUID invalid: {s}"),
86 })?;
87 Ok((part, uuid))
88}
89
90fn decode_handle(handle: &Handle) -> Result<HandlePayload, EngineError> {
91 if handle.backend != BackendTag::Postgres {
92 return Err(EngineError::Validation {
93 kind: ff_core::engine_error::ValidationKind::Corruption,
94 detail: format!(
95 "handle minted by {:?} passed to Postgres backend",
96 handle.backend
97 ),
98 });
99 }
100 let decoded = decode_opaque(&handle.opaque)?;
101 if decoded.tag != BackendTag::Postgres {
102 return Err(EngineError::Validation {
103 kind: ff_core::engine_error::ValidationKind::Corruption,
104 detail: format!("inner handle tag mismatch: {:?}", decoded.tag),
105 });
106 }
107 Ok(decoded.payload)
108}
109
110fn mint_handle(payload: HandlePayload, kind: HandleKind) -> Handle {
111 let op = encode_opaque(BackendTag::Postgres, &payload);
112 Handle::new(BackendTag::Postgres, kind, op)
113}
114
115pub(crate) async fn claim(
118 pool: &PgPool,
119 lane: &LaneId,
120 capabilities: &CapabilitySet,
121 policy: &ClaimPolicy,
122) -> Result<Option<Handle>, EngineError> {
123 let total_partitions: i16 = 256;
128 for part in 0..total_partitions {
129 match try_claim_in_partition(pool, part, lane, capabilities, policy).await? {
130 Some(h) => return Ok(Some(h)),
131 None => continue,
132 }
133 }
134 Ok(None)
135}
136
137async fn try_claim_in_partition(
138 pool: &PgPool,
139 part: i16,
140 lane: &LaneId,
141 capabilities: &CapabilitySet,
142 policy: &ClaimPolicy,
143) -> Result<Option<Handle>, EngineError> {
144 let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
145 let row = sqlx::query(
148 r#"
149 SELECT execution_id, required_capabilities, attempt_index
150 FROM ff_exec_core
151 WHERE partition_key = $1
152 AND lane_id = $2
153 AND lifecycle_phase = 'runnable'
154 AND eligibility_state = 'eligible_now'
155 ORDER BY priority DESC, created_at_ms ASC
156 FOR UPDATE SKIP LOCKED
157 LIMIT 1
158 "#,
159 )
160 .bind(part)
161 .bind(lane.as_str())
162 .fetch_optional(&mut *tx)
163 .await
164 .map_err(map_sqlx_error)?;
165
166 let Some(row) = row else {
167 tx.rollback().await.map_err(map_sqlx_error)?;
169 return Ok(None);
170 };
171
172 let exec_uuid: Uuid = row.try_get("execution_id").map_err(map_sqlx_error)?;
173 let required_caps: Vec<String> = row
174 .try_get::<Vec<String>, _>("required_capabilities")
175 .map_err(map_sqlx_error)?;
176 let attempt_index_i: i32 = row.try_get("attempt_index").map_err(map_sqlx_error)?;
177 let req = CapabilityRequirement::new(required_caps);
178 if !caps_matches(&req, capabilities) {
179 tx.rollback().await.map_err(map_sqlx_error)?;
181 return Ok(None);
182 }
183
184 let attempt_index = AttemptIndex::new(u32::try_from(attempt_index_i.max(0)).unwrap_or(0));
185 let now = now_ms();
186 let lease_ttl_ms = i64::from(policy.lease_ttl_ms);
187 let expires = now.saturating_add(lease_ttl_ms);
188
189 sqlx::query(
193 r#"
194 INSERT INTO ff_attempt (
195 partition_key, execution_id, attempt_index,
196 worker_id, worker_instance_id,
197 lease_epoch, lease_expires_at_ms, started_at_ms
198 ) VALUES ($1, $2, $3, $4, $5, 1, $6, $7)
199 ON CONFLICT (partition_key, execution_id, attempt_index)
200 DO UPDATE SET
201 worker_id = EXCLUDED.worker_id,
202 worker_instance_id = EXCLUDED.worker_instance_id,
203 lease_epoch = ff_attempt.lease_epoch + 1,
204 lease_expires_at_ms = EXCLUDED.lease_expires_at_ms,
205 started_at_ms = EXCLUDED.started_at_ms,
206 outcome = NULL
207 "#,
208 )
209 .bind(part)
210 .bind(exec_uuid)
211 .bind(attempt_index_i)
212 .bind(policy.worker_id.as_str())
213 .bind(policy.worker_instance_id.as_str())
214 .bind(expires)
215 .bind(now)
216 .execute(&mut *tx)
217 .await
218 .map_err(map_sqlx_error)?;
219
220 let epoch_row = sqlx::query(
222 r#"
223 SELECT lease_epoch FROM ff_attempt
224 WHERE partition_key = $1 AND execution_id = $2 AND attempt_index = $3
225 "#,
226 )
227 .bind(part)
228 .bind(exec_uuid)
229 .bind(attempt_index_i)
230 .fetch_one(&mut *tx)
231 .await
232 .map_err(map_sqlx_error)?;
233 let epoch_i: i64 = epoch_row.try_get("lease_epoch").map_err(map_sqlx_error)?;
234
235 sqlx::query(
246 r#"
247 UPDATE ff_exec_core
248 SET lifecycle_phase = 'active',
249 ownership_state = 'leased',
250 eligibility_state = 'not_applicable',
251 public_state = 'running',
252 attempt_state = 'running_attempt',
253 started_at_ms = COALESCE(started_at_ms, $3)
254 WHERE partition_key = $1 AND execution_id = $2
255 "#,
256 )
257 .bind(part)
258 .bind(exec_uuid)
259 .bind(now)
260 .execute(&mut *tx)
261 .await
262 .map_err(map_sqlx_error)?;
263
264 lease_event::emit(
266 &mut tx,
267 part,
268 exec_uuid,
269 None,
270 lease_event::EVENT_ACQUIRED,
271 now,
272 )
273 .await?;
274
275 tx.commit().await.map_err(map_sqlx_error)?;
276
277 let exec_id = ExecutionId::parse(&format!("{{fp:{part}}}:{exec_uuid}")).map_err(|e| {
278 EngineError::Validation {
279 kind: ff_core::engine_error::ValidationKind::InvalidInput,
280 detail: format!("reassembling exec id: {e}"),
281 }
282 })?;
283 let payload = HandlePayload::new(
284 exec_id,
285 attempt_index,
286 AttemptId::new(),
287 LeaseId::new(),
288 LeaseEpoch(u64::try_from(epoch_i).unwrap_or(1)),
289 u64::from(policy.lease_ttl_ms),
290 lane.clone(),
291 policy.worker_instance_id.clone(),
292 );
293 Ok(Some(mint_handle(payload, HandleKind::Fresh)))
294}
295
296pub(crate) async fn claim_from_resume_grant(
299 pool: &PgPool,
300 token: ResumeToken,
301) -> Result<Option<Handle>, EngineError> {
302 let eid = &token.grant.execution_id;
303 let (part, exec_uuid) = split_exec_id(eid)?;
304 let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
305 let row = sqlx::query(
307 r#"
308 SELECT attempt_index, lease_epoch, lease_expires_at_ms
309 FROM ff_attempt
310 WHERE partition_key = $1 AND execution_id = $2
311 ORDER BY attempt_index DESC
312 FOR UPDATE
313 LIMIT 1
314 "#,
315 )
316 .bind(part)
317 .bind(exec_uuid)
318 .fetch_optional(&mut *tx)
319 .await
320 .map_err(map_sqlx_error)?;
321 let Some(row) = row else {
322 tx.rollback().await.map_err(map_sqlx_error)?;
323 return Err(EngineError::NotFound { entity: "attempt" });
324 };
325 let attempt_index_i: i32 = row.try_get("attempt_index").map_err(map_sqlx_error)?;
326 let current_epoch: i64 = row.try_get("lease_epoch").map_err(map_sqlx_error)?;
327 let expires_at: Option<i64> = row
328 .try_get::<Option<i64>, _>("lease_expires_at_ms")
329 .map_err(map_sqlx_error)?;
330
331 let now = now_ms();
335 let live = matches!(expires_at, Some(exp) if exp > now);
336 if live {
337 tx.rollback().await.map_err(map_sqlx_error)?;
338 return Ok(None); }
340
341 let lease_ttl_ms = i64::from(token.lease_ttl_ms);
343 let new_expires = now.saturating_add(lease_ttl_ms);
344 sqlx::query(
345 r#"
346 UPDATE ff_attempt
347 SET worker_id = $1,
348 worker_instance_id = $2,
349 lease_epoch = lease_epoch + 1,
350 lease_expires_at_ms = $3,
351 started_at_ms = $4,
352 outcome = NULL
353 WHERE partition_key = $5 AND execution_id = $6 AND attempt_index = $7
354 "#,
355 )
356 .bind(token.worker_id.as_str())
357 .bind(token.worker_instance_id.as_str())
358 .bind(new_expires)
359 .bind(now)
360 .bind(part)
361 .bind(exec_uuid)
362 .bind(attempt_index_i)
363 .execute(&mut *tx)
364 .await
365 .map_err(map_sqlx_error)?;
366
367 sqlx::query(
368 r#"
369 UPDATE ff_exec_core
370 SET lifecycle_phase = 'active',
371 ownership_state = 'leased',
372 eligibility_state = 'not_applicable',
373 attempt_state = 'running_attempt'
374 WHERE partition_key = $1 AND execution_id = $2
375 "#,
376 )
377 .bind(part)
378 .bind(exec_uuid)
379 .execute(&mut *tx)
380 .await
381 .map_err(map_sqlx_error)?;
382
383 lease_event::emit(
385 &mut tx,
386 part,
387 exec_uuid,
388 None,
389 lease_event::EVENT_RECLAIMED,
390 now,
391 )
392 .await?;
393
394 tx.commit().await.map_err(map_sqlx_error)?;
395
396 let new_epoch = current_epoch.saturating_add(1);
397 let payload = HandlePayload::new(
398 eid.clone(),
399 AttemptIndex::new(u32::try_from(attempt_index_i.max(0)).unwrap_or(0)),
400 AttemptId::new(),
401 LeaseId::new(),
402 LeaseEpoch(u64::try_from(new_epoch).unwrap_or(0)),
403 u64::from(token.lease_ttl_ms),
404 token.grant.lane_id.clone(),
405 token.worker_instance_id.clone(),
406 );
407 Ok(Some(mint_handle(payload, HandleKind::Resumed)))
408}
409
410async fn fence_check<'c>(
417 tx: &mut sqlx::Transaction<'c, sqlx::Postgres>,
418 part: i16,
419 exec_uuid: Uuid,
420 attempt_index: i32,
421 expected_epoch: u64,
422) -> Result<(), EngineError> {
423 let row = sqlx::query(
424 r#"
425 SELECT lease_epoch FROM ff_attempt
426 WHERE partition_key = $1 AND execution_id = $2 AND attempt_index = $3
427 FOR UPDATE
428 "#,
429 )
430 .bind(part)
431 .bind(exec_uuid)
432 .bind(attempt_index)
433 .fetch_optional(&mut **tx)
434 .await
435 .map_err(map_sqlx_error)?;
436 let Some(row) = row else {
437 return Err(EngineError::NotFound { entity: "attempt" });
438 };
439 let epoch_i: i64 = row.try_get("lease_epoch").map_err(map_sqlx_error)?;
440 let observed = u64::try_from(epoch_i).unwrap_or(0);
441 if observed != expected_epoch {
442 return Err(EngineError::Contention(ContentionKind::LeaseConflict));
443 }
444 Ok(())
445}
446
447pub(crate) async fn renew(
450 pool: &PgPool,
451 handle: &Handle,
452) -> Result<LeaseRenewal, EngineError> {
453 let payload = decode_handle(handle)?;
454 let (part, exec_uuid) = split_exec_id(&payload.execution_id)?;
455 let attempt_index = i32::try_from(payload.attempt_index.0).unwrap_or(0);
456 let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
457 fence_check(&mut tx, part, exec_uuid, attempt_index, payload.lease_epoch.0).await?;
458 let now = now_ms();
459 let new_expires = now.saturating_add(i64::try_from(payload.lease_ttl_ms).unwrap_or(0));
460 sqlx::query(
461 r#"
462 UPDATE ff_attempt
463 SET lease_expires_at_ms = $1
464 WHERE partition_key = $2 AND execution_id = $3 AND attempt_index = $4
465 "#,
466 )
467 .bind(new_expires)
468 .bind(part)
469 .bind(exec_uuid)
470 .bind(attempt_index)
471 .execute(&mut *tx)
472 .await
473 .map_err(map_sqlx_error)?;
474 lease_event::emit(
476 &mut tx,
477 part,
478 exec_uuid,
479 None,
480 lease_event::EVENT_RENEWED,
481 now,
482 )
483 .await?;
484 tx.commit().await.map_err(map_sqlx_error)?;
485 Ok(LeaseRenewal::new(
486 u64::try_from(new_expires).unwrap_or(0),
487 payload.lease_epoch.0,
488 ))
489}
490
491pub(crate) async fn progress(
494 pool: &PgPool,
495 handle: &Handle,
496 percent: Option<u8>,
497 message: Option<String>,
498) -> Result<(), EngineError> {
499 let payload = decode_handle(handle)?;
500 let (part, exec_uuid) = split_exec_id(&payload.execution_id)?;
501 let attempt_index = i32::try_from(payload.attempt_index.0).unwrap_or(0);
502 let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
503 fence_check(&mut tx, part, exec_uuid, attempt_index, payload.lease_epoch.0).await?;
504 let mut patch = serde_json::Map::new();
511 if let Some(pct) = percent {
512 patch.insert("progress_pct".into(), serde_json::Value::from(pct));
513 }
514 if let Some(msg) = message {
515 patch.insert("progress_message".into(), serde_json::Value::from(msg));
516 }
517 let patch_val = serde_json::Value::Object(patch);
518 sqlx::query(
519 r#"
520 UPDATE ff_exec_core
521 SET raw_fields = raw_fields || $1::jsonb
522 WHERE partition_key = $2 AND execution_id = $3
523 "#,
524 )
525 .bind(patch_val)
526 .bind(part)
527 .bind(exec_uuid)
528 .execute(&mut *tx)
529 .await
530 .map_err(map_sqlx_error)?;
531 tx.commit().await.map_err(map_sqlx_error)?;
532 Ok(())
533}
534
535pub(crate) async fn complete(
538 pool: &PgPool,
539 handle: &Handle,
540 payload_bytes: Option<Vec<u8>>,
541) -> Result<(), EngineError> {
542 let payload = decode_handle(handle)?;
543 let (part, exec_uuid) = split_exec_id(&payload.execution_id)?;
544 let attempt_index = i32::try_from(payload.attempt_index.0).unwrap_or(0);
545 let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
546 fence_check(&mut tx, part, exec_uuid, attempt_index, payload.lease_epoch.0).await?;
547 let now = now_ms();
548
549 sqlx::query(
550 r#"
551 UPDATE ff_attempt
552 SET terminal_at_ms = $1,
553 outcome = 'success',
554 lease_expires_at_ms = NULL
555 WHERE partition_key = $2 AND execution_id = $3 AND attempt_index = $4
556 "#,
557 )
558 .bind(now)
559 .bind(part)
560 .bind(exec_uuid)
561 .bind(attempt_index)
562 .execute(&mut *tx)
563 .await
564 .map_err(map_sqlx_error)?;
565
566 sqlx::query(
567 r#"
568 UPDATE ff_exec_core
569 SET lifecycle_phase = 'terminal',
570 ownership_state = 'unowned',
571 eligibility_state = 'not_applicable',
572 attempt_state = 'attempt_terminal',
573 terminal_at_ms = $1,
574 result = $2
575 WHERE partition_key = $3 AND execution_id = $4
576 "#,
577 )
578 .bind(now)
579 .bind(payload_bytes.as_deref())
580 .bind(part)
581 .bind(exec_uuid)
582 .execute(&mut *tx)
583 .await
584 .map_err(map_sqlx_error)?;
585
586 sqlx::query(
588 r#"
589 INSERT INTO ff_completion_event (
590 partition_key, execution_id, flow_id, outcome,
591 namespace, instance_tag, occurred_at_ms
592 )
593 SELECT partition_key, execution_id, flow_id, 'success',
594 NULL, NULL, $3
595 FROM ff_exec_core
596 WHERE partition_key = $1 AND execution_id = $2
597 "#,
598 )
599 .bind(part)
600 .bind(exec_uuid)
601 .bind(now)
602 .execute(&mut *tx)
603 .await
604 .map_err(map_sqlx_error)?;
605
606 lease_event::emit(
608 &mut tx,
609 part,
610 exec_uuid,
611 None,
612 lease_event::EVENT_REVOKED,
613 now,
614 )
615 .await?;
616
617 tx.commit().await.map_err(map_sqlx_error)?;
618 Ok(())
619}
620
621pub(crate) async fn fail(
624 pool: &PgPool,
625 handle: &Handle,
626 reason: FailureReason,
627 classification: FailureClass,
628) -> Result<FailOutcome, EngineError> {
629 let payload = decode_handle(handle)?;
630 let (part, exec_uuid) = split_exec_id(&payload.execution_id)?;
631 let attempt_index = i32::try_from(payload.attempt_index.0).unwrap_or(0);
632 let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
633 fence_check(&mut tx, part, exec_uuid, attempt_index, payload.lease_epoch.0).await?;
634 let now = now_ms();
635 let retryable = matches!(
636 classification,
637 FailureClass::Transient | FailureClass::InfraCrash
638 );
639
640 if retryable {
641 sqlx::query(
643 r#"
644 UPDATE ff_attempt
645 SET terminal_at_ms = $1,
646 outcome = 'retry',
647 lease_expires_at_ms = NULL
648 WHERE partition_key = $2 AND execution_id = $3 AND attempt_index = $4
649 "#,
650 )
651 .bind(now)
652 .bind(part)
653 .bind(exec_uuid)
654 .bind(attempt_index)
655 .execute(&mut *tx)
656 .await
657 .map_err(map_sqlx_error)?;
658
659 sqlx::query(
660 r#"
661 UPDATE ff_exec_core
662 SET lifecycle_phase = 'runnable',
663 ownership_state = 'unowned',
664 eligibility_state = 'eligible_now',
665 attempt_state = 'pending_retry_attempt',
666 attempt_index = attempt_index + 1,
667 raw_fields = raw_fields || jsonb_build_object('last_failure_message', $1::text)
668 WHERE partition_key = $2 AND execution_id = $3
669 "#,
670 )
671 .bind(&reason.message)
672 .bind(part)
673 .bind(exec_uuid)
674 .execute(&mut *tx)
675 .await
676 .map_err(map_sqlx_error)?;
677
678 lease_event::emit(
680 &mut tx,
681 part,
682 exec_uuid,
683 None,
684 lease_event::EVENT_REVOKED,
685 now,
686 )
687 .await?;
688
689 tx.commit().await.map_err(map_sqlx_error)?;
690 Ok(FailOutcome::RetryScheduled {
691 delay_until: TimestampMs::from_millis(now),
692 })
693 } else {
694 sqlx::query(
696 r#"
697 UPDATE ff_attempt
698 SET terminal_at_ms = $1,
699 outcome = 'failed',
700 lease_expires_at_ms = NULL
701 WHERE partition_key = $2 AND execution_id = $3 AND attempt_index = $4
702 "#,
703 )
704 .bind(now)
705 .bind(part)
706 .bind(exec_uuid)
707 .bind(attempt_index)
708 .execute(&mut *tx)
709 .await
710 .map_err(map_sqlx_error)?;
711
712 sqlx::query(
713 r#"
714 UPDATE ff_exec_core
715 SET lifecycle_phase = 'terminal',
716 ownership_state = 'unowned',
717 eligibility_state = 'not_applicable',
718 attempt_state = 'attempt_terminal',
719 terminal_at_ms = $1,
720 raw_fields = raw_fields || jsonb_build_object('last_failure_message', $2::text)
721 WHERE partition_key = $3 AND execution_id = $4
722 "#,
723 )
724 .bind(now)
725 .bind(&reason.message)
726 .bind(part)
727 .bind(exec_uuid)
728 .execute(&mut *tx)
729 .await
730 .map_err(map_sqlx_error)?;
731
732 sqlx::query(
733 r#"
734 INSERT INTO ff_completion_event (
735 partition_key, execution_id, flow_id, outcome,
736 namespace, instance_tag, occurred_at_ms
737 )
738 SELECT partition_key, execution_id, flow_id, 'failed',
739 NULL, NULL, $3
740 FROM ff_exec_core
741 WHERE partition_key = $1 AND execution_id = $2
742 "#,
743 )
744 .bind(part)
745 .bind(exec_uuid)
746 .bind(now)
747 .execute(&mut *tx)
748 .await
749 .map_err(map_sqlx_error)?;
750
751 lease_event::emit(
753 &mut tx,
754 part,
755 exec_uuid,
756 None,
757 lease_event::EVENT_REVOKED,
758 now,
759 )
760 .await?;
761
762 tx.commit().await.map_err(map_sqlx_error)?;
763 Ok(FailOutcome::TerminalFailed)
764 }
765}
766
767pub(crate) async fn delay(
770 pool: &PgPool,
771 handle: &Handle,
772 delay_until: TimestampMs,
773) -> Result<(), EngineError> {
774 let payload = decode_handle(handle)?;
775 let (part, exec_uuid) = split_exec_id(&payload.execution_id)?;
776 let attempt_index = i32::try_from(payload.attempt_index.0).unwrap_or(0);
777 let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
778 fence_check(&mut tx, part, exec_uuid, attempt_index, payload.lease_epoch.0).await?;
779
780 sqlx::query(
781 r#"
782 UPDATE ff_attempt
783 SET outcome = 'interrupted',
784 lease_expires_at_ms = NULL
785 WHERE partition_key = $1 AND execution_id = $2 AND attempt_index = $3
786 "#,
787 )
788 .bind(part)
789 .bind(exec_uuid)
790 .bind(attempt_index)
791 .execute(&mut *tx)
792 .await
793 .map_err(map_sqlx_error)?;
794
795 sqlx::query(
796 r#"
797 UPDATE ff_exec_core
798 SET lifecycle_phase = 'runnable',
799 ownership_state = 'unowned',
800 eligibility_state = 'not_eligible_until_time',
801 attempt_state = 'attempt_interrupted',
802 deadline_at_ms = $1
803 WHERE partition_key = $2 AND execution_id = $3
804 "#,
805 )
806 .bind(delay_until.0)
807 .bind(part)
808 .bind(exec_uuid)
809 .execute(&mut *tx)
810 .await
811 .map_err(map_sqlx_error)?;
812
813 lease_event::emit(
815 &mut tx,
816 part,
817 exec_uuid,
818 None,
819 lease_event::EVENT_REVOKED,
820 now_ms(),
821 )
822 .await?;
823
824 tx.commit().await.map_err(map_sqlx_error)?;
825 Ok(())
826}
827
828pub(crate) async fn wait_children(
831 pool: &PgPool,
832 handle: &Handle,
833) -> Result<(), EngineError> {
834 let payload = decode_handle(handle)?;
835 let (part, exec_uuid) = split_exec_id(&payload.execution_id)?;
836 let attempt_index = i32::try_from(payload.attempt_index.0).unwrap_or(0);
837 let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
838 fence_check(&mut tx, part, exec_uuid, attempt_index, payload.lease_epoch.0).await?;
839
840 sqlx::query(
841 r#"
842 UPDATE ff_attempt
843 SET outcome = 'waiting_children',
844 lease_expires_at_ms = NULL
845 WHERE partition_key = $1 AND execution_id = $2 AND attempt_index = $3
846 "#,
847 )
848 .bind(part)
849 .bind(exec_uuid)
850 .bind(attempt_index)
851 .execute(&mut *tx)
852 .await
853 .map_err(map_sqlx_error)?;
854
855 sqlx::query(
856 r#"
857 UPDATE ff_exec_core
858 SET lifecycle_phase = 'runnable',
859 ownership_state = 'unowned',
860 eligibility_state = 'blocked_by_dependencies',
861 attempt_state = 'attempt_interrupted',
862 blocking_reason = 'waiting_for_children'
863 WHERE partition_key = $1 AND execution_id = $2
864 "#,
865 )
866 .bind(part)
867 .bind(exec_uuid)
868 .execute(&mut *tx)
869 .await
870 .map_err(map_sqlx_error)?;
871
872 lease_event::emit(
874 &mut tx,
875 part,
876 exec_uuid,
877 None,
878 lease_event::EVENT_REVOKED,
879 now_ms(),
880 )
881 .await?;
882
883 tx.commit().await.map_err(map_sqlx_error)?;
884 Ok(())
885}
886