1use ff_core::backend::{
37 BackendTag, CapabilitySet, ClaimPolicy, FailOutcome, FailureClass, FailureReason, Handle,
38 HandleKind, LeaseRenewal, ReclaimToken,
39};
40use ff_core::caps::{matches as caps_matches, CapabilityRequirement};
41use ff_core::engine_error::{ContentionKind, EngineError};
42use ff_core::handle_codec::{decode as decode_opaque, encode as encode_opaque, HandlePayload};
43use ff_core::types::{
44 AttemptId, AttemptIndex, ExecutionId, LaneId, LeaseEpoch, LeaseId, TimestampMs,
45};
46use sqlx::{PgPool, Row};
47use std::time::{SystemTime, UNIX_EPOCH};
48use uuid::Uuid;
49
50use crate::error::map_sqlx_error;
51use crate::lease_event;
52
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// A clock reading earlier than the epoch is reported as `0`, and a millisecond count that does
/// not fit in an `i64` is clamped to `i64::MAX`, so this function never fails.
fn now_ms() -> i64 {
    let elapsed_ms = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis(),
        // The system clock is set before 1970-01-01; fall back to the epoch itself.
        Err(_) => 0,
    };
    i64::try_from(elapsed_ms).unwrap_or(i64::MAX)
}
64
65fn split_exec_id(eid: &ExecutionId) -> Result<(i16, Uuid), EngineError> {
68 let s = eid.as_str();
69 let rest = s.strip_prefix("{fp:").ok_or_else(|| EngineError::Validation {
70 kind: ff_core::engine_error::ValidationKind::InvalidInput,
71 detail: format!("execution_id missing `{{fp:` prefix: {s}"),
72 })?;
73 let close = rest.find("}:").ok_or_else(|| EngineError::Validation {
74 kind: ff_core::engine_error::ValidationKind::InvalidInput,
75 detail: format!("execution_id missing `}}:`: {s}"),
76 })?;
77 let part: i16 = rest[..close]
78 .parse()
79 .map_err(|_| EngineError::Validation {
80 kind: ff_core::engine_error::ValidationKind::InvalidInput,
81 detail: format!("execution_id partition index not u16: {s}"),
82 })?;
83 let uuid = Uuid::parse_str(&rest[close + 2..]).map_err(|_| EngineError::Validation {
84 kind: ff_core::engine_error::ValidationKind::InvalidInput,
85 detail: format!("execution_id UUID invalid: {s}"),
86 })?;
87 Ok((part, uuid))
88}
89
90fn decode_handle(handle: &Handle) -> Result<HandlePayload, EngineError> {
91 if handle.backend != BackendTag::Postgres {
92 return Err(EngineError::Validation {
93 kind: ff_core::engine_error::ValidationKind::Corruption,
94 detail: format!(
95 "handle minted by {:?} passed to Postgres backend",
96 handle.backend
97 ),
98 });
99 }
100 let decoded = decode_opaque(&handle.opaque)?;
101 if decoded.tag != BackendTag::Postgres {
102 return Err(EngineError::Validation {
103 kind: ff_core::engine_error::ValidationKind::Corruption,
104 detail: format!("inner handle tag mismatch: {:?}", decoded.tag),
105 });
106 }
107 Ok(decoded.payload)
108}
109
110fn mint_handle(payload: HandlePayload, kind: HandleKind) -> Handle {
111 let op = encode_opaque(BackendTag::Postgres, &payload);
112 Handle::new(BackendTag::Postgres, kind, op)
113}
114
115pub(crate) async fn claim(
118 pool: &PgPool,
119 lane: &LaneId,
120 capabilities: &CapabilitySet,
121 policy: &ClaimPolicy,
122) -> Result<Option<Handle>, EngineError> {
123 let total_partitions: i16 = 256;
128 for part in 0..total_partitions {
129 match try_claim_in_partition(pool, part, lane, capabilities, policy).await? {
130 Some(h) => return Ok(Some(h)),
131 None => continue,
132 }
133 }
134 Ok(None)
135}
136
137async fn try_claim_in_partition(
138 pool: &PgPool,
139 part: i16,
140 lane: &LaneId,
141 capabilities: &CapabilitySet,
142 policy: &ClaimPolicy,
143) -> Result<Option<Handle>, EngineError> {
144 let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
145 let row = sqlx::query(
148 r#"
149 SELECT execution_id, required_capabilities, attempt_index
150 FROM ff_exec_core
151 WHERE partition_key = $1
152 AND lane_id = $2
153 AND lifecycle_phase = 'runnable'
154 AND eligibility_state = 'eligible_now'
155 ORDER BY priority DESC, created_at_ms ASC
156 FOR UPDATE SKIP LOCKED
157 LIMIT 1
158 "#,
159 )
160 .bind(part)
161 .bind(lane.as_str())
162 .fetch_optional(&mut *tx)
163 .await
164 .map_err(map_sqlx_error)?;
165
166 let Some(row) = row else {
167 tx.rollback().await.map_err(map_sqlx_error)?;
169 return Ok(None);
170 };
171
172 let exec_uuid: Uuid = row.try_get("execution_id").map_err(map_sqlx_error)?;
173 let required_caps: Vec<String> = row
174 .try_get::<Vec<String>, _>("required_capabilities")
175 .map_err(map_sqlx_error)?;
176 let attempt_index_i: i32 = row.try_get("attempt_index").map_err(map_sqlx_error)?;
177 let req = CapabilityRequirement::new(required_caps);
178 if !caps_matches(&req, capabilities) {
179 tx.rollback().await.map_err(map_sqlx_error)?;
181 return Ok(None);
182 }
183
184 let attempt_index = AttemptIndex::new(u32::try_from(attempt_index_i.max(0)).unwrap_or(0));
185 let now = now_ms();
186 let lease_ttl_ms = i64::from(policy.lease_ttl_ms);
187 let expires = now.saturating_add(lease_ttl_ms);
188
189 sqlx::query(
193 r#"
194 INSERT INTO ff_attempt (
195 partition_key, execution_id, attempt_index,
196 worker_id, worker_instance_id,
197 lease_epoch, lease_expires_at_ms, started_at_ms
198 ) VALUES ($1, $2, $3, $4, $5, 1, $6, $7)
199 ON CONFLICT (partition_key, execution_id, attempt_index)
200 DO UPDATE SET
201 worker_id = EXCLUDED.worker_id,
202 worker_instance_id = EXCLUDED.worker_instance_id,
203 lease_epoch = ff_attempt.lease_epoch + 1,
204 lease_expires_at_ms = EXCLUDED.lease_expires_at_ms,
205 started_at_ms = EXCLUDED.started_at_ms,
206 outcome = NULL
207 "#,
208 )
209 .bind(part)
210 .bind(exec_uuid)
211 .bind(attempt_index_i)
212 .bind(policy.worker_id.as_str())
213 .bind(policy.worker_instance_id.as_str())
214 .bind(expires)
215 .bind(now)
216 .execute(&mut *tx)
217 .await
218 .map_err(map_sqlx_error)?;
219
220 let epoch_row = sqlx::query(
222 r#"
223 SELECT lease_epoch FROM ff_attempt
224 WHERE partition_key = $1 AND execution_id = $2 AND attempt_index = $3
225 "#,
226 )
227 .bind(part)
228 .bind(exec_uuid)
229 .bind(attempt_index_i)
230 .fetch_one(&mut *tx)
231 .await
232 .map_err(map_sqlx_error)?;
233 let epoch_i: i64 = epoch_row.try_get("lease_epoch").map_err(map_sqlx_error)?;
234
235 sqlx::query(
237 r#"
238 UPDATE ff_exec_core
239 SET lifecycle_phase = 'active',
240 ownership_state = 'leased',
241 eligibility_state = 'not_applicable',
242 attempt_state = 'running_attempt'
243 WHERE partition_key = $1 AND execution_id = $2
244 "#,
245 )
246 .bind(part)
247 .bind(exec_uuid)
248 .execute(&mut *tx)
249 .await
250 .map_err(map_sqlx_error)?;
251
252 lease_event::emit(
254 &mut tx,
255 part,
256 exec_uuid,
257 None,
258 lease_event::EVENT_ACQUIRED,
259 now,
260 )
261 .await?;
262
263 tx.commit().await.map_err(map_sqlx_error)?;
264
265 let exec_id = ExecutionId::parse(&format!("{{fp:{part}}}:{exec_uuid}")).map_err(|e| {
266 EngineError::Validation {
267 kind: ff_core::engine_error::ValidationKind::InvalidInput,
268 detail: format!("reassembling exec id: {e}"),
269 }
270 })?;
271 let payload = HandlePayload::new(
272 exec_id,
273 attempt_index,
274 AttemptId::new(),
275 LeaseId::new(),
276 LeaseEpoch(u64::try_from(epoch_i).unwrap_or(1)),
277 u64::from(policy.lease_ttl_ms),
278 lane.clone(),
279 policy.worker_instance_id.clone(),
280 );
281 Ok(Some(mint_handle(payload, HandleKind::Fresh)))
282}
283
284pub(crate) async fn claim_from_reclaim(
287 pool: &PgPool,
288 token: ReclaimToken,
289) -> Result<Option<Handle>, EngineError> {
290 let eid = &token.grant.execution_id;
291 let (part, exec_uuid) = split_exec_id(eid)?;
292 let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
293 let row = sqlx::query(
295 r#"
296 SELECT attempt_index, lease_epoch, lease_expires_at_ms
297 FROM ff_attempt
298 WHERE partition_key = $1 AND execution_id = $2
299 ORDER BY attempt_index DESC
300 FOR UPDATE
301 LIMIT 1
302 "#,
303 )
304 .bind(part)
305 .bind(exec_uuid)
306 .fetch_optional(&mut *tx)
307 .await
308 .map_err(map_sqlx_error)?;
309 let Some(row) = row else {
310 tx.rollback().await.map_err(map_sqlx_error)?;
311 return Err(EngineError::NotFound { entity: "attempt" });
312 };
313 let attempt_index_i: i32 = row.try_get("attempt_index").map_err(map_sqlx_error)?;
314 let current_epoch: i64 = row.try_get("lease_epoch").map_err(map_sqlx_error)?;
315 let expires_at: Option<i64> = row
316 .try_get::<Option<i64>, _>("lease_expires_at_ms")
317 .map_err(map_sqlx_error)?;
318
319 let now = now_ms();
323 let live = matches!(expires_at, Some(exp) if exp > now);
324 if live {
325 tx.rollback().await.map_err(map_sqlx_error)?;
326 return Ok(None); }
328
329 let lease_ttl_ms = i64::from(token.lease_ttl_ms);
331 let new_expires = now.saturating_add(lease_ttl_ms);
332 sqlx::query(
333 r#"
334 UPDATE ff_attempt
335 SET worker_id = $1,
336 worker_instance_id = $2,
337 lease_epoch = lease_epoch + 1,
338 lease_expires_at_ms = $3,
339 started_at_ms = $4,
340 outcome = NULL
341 WHERE partition_key = $5 AND execution_id = $6 AND attempt_index = $7
342 "#,
343 )
344 .bind(token.worker_id.as_str())
345 .bind(token.worker_instance_id.as_str())
346 .bind(new_expires)
347 .bind(now)
348 .bind(part)
349 .bind(exec_uuid)
350 .bind(attempt_index_i)
351 .execute(&mut *tx)
352 .await
353 .map_err(map_sqlx_error)?;
354
355 sqlx::query(
356 r#"
357 UPDATE ff_exec_core
358 SET lifecycle_phase = 'active',
359 ownership_state = 'leased',
360 eligibility_state = 'not_applicable',
361 attempt_state = 'running_attempt'
362 WHERE partition_key = $1 AND execution_id = $2
363 "#,
364 )
365 .bind(part)
366 .bind(exec_uuid)
367 .execute(&mut *tx)
368 .await
369 .map_err(map_sqlx_error)?;
370
371 lease_event::emit(
373 &mut tx,
374 part,
375 exec_uuid,
376 None,
377 lease_event::EVENT_RECLAIMED,
378 now,
379 )
380 .await?;
381
382 tx.commit().await.map_err(map_sqlx_error)?;
383
384 let new_epoch = current_epoch.saturating_add(1);
385 let payload = HandlePayload::new(
386 eid.clone(),
387 AttemptIndex::new(u32::try_from(attempt_index_i.max(0)).unwrap_or(0)),
388 AttemptId::new(),
389 LeaseId::new(),
390 LeaseEpoch(u64::try_from(new_epoch).unwrap_or(0)),
391 u64::from(token.lease_ttl_ms),
392 token.grant.lane_id.clone(),
393 token.worker_instance_id.clone(),
394 );
395 Ok(Some(mint_handle(payload, HandleKind::Resumed)))
396}
397
398async fn fence_check<'c>(
405 tx: &mut sqlx::Transaction<'c, sqlx::Postgres>,
406 part: i16,
407 exec_uuid: Uuid,
408 attempt_index: i32,
409 expected_epoch: u64,
410) -> Result<(), EngineError> {
411 let row = sqlx::query(
412 r#"
413 SELECT lease_epoch FROM ff_attempt
414 WHERE partition_key = $1 AND execution_id = $2 AND attempt_index = $3
415 FOR UPDATE
416 "#,
417 )
418 .bind(part)
419 .bind(exec_uuid)
420 .bind(attempt_index)
421 .fetch_optional(&mut **tx)
422 .await
423 .map_err(map_sqlx_error)?;
424 let Some(row) = row else {
425 return Err(EngineError::NotFound { entity: "attempt" });
426 };
427 let epoch_i: i64 = row.try_get("lease_epoch").map_err(map_sqlx_error)?;
428 let observed = u64::try_from(epoch_i).unwrap_or(0);
429 if observed != expected_epoch {
430 return Err(EngineError::Contention(ContentionKind::LeaseConflict));
431 }
432 Ok(())
433}
434
435pub(crate) async fn renew(
438 pool: &PgPool,
439 handle: &Handle,
440) -> Result<LeaseRenewal, EngineError> {
441 let payload = decode_handle(handle)?;
442 let (part, exec_uuid) = split_exec_id(&payload.execution_id)?;
443 let attempt_index = i32::try_from(payload.attempt_index.0).unwrap_or(0);
444 let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
445 fence_check(&mut tx, part, exec_uuid, attempt_index, payload.lease_epoch.0).await?;
446 let now = now_ms();
447 let new_expires = now.saturating_add(i64::try_from(payload.lease_ttl_ms).unwrap_or(0));
448 sqlx::query(
449 r#"
450 UPDATE ff_attempt
451 SET lease_expires_at_ms = $1
452 WHERE partition_key = $2 AND execution_id = $3 AND attempt_index = $4
453 "#,
454 )
455 .bind(new_expires)
456 .bind(part)
457 .bind(exec_uuid)
458 .bind(attempt_index)
459 .execute(&mut *tx)
460 .await
461 .map_err(map_sqlx_error)?;
462 lease_event::emit(
464 &mut tx,
465 part,
466 exec_uuid,
467 None,
468 lease_event::EVENT_RENEWED,
469 now,
470 )
471 .await?;
472 tx.commit().await.map_err(map_sqlx_error)?;
473 Ok(LeaseRenewal::new(
474 u64::try_from(new_expires).unwrap_or(0),
475 payload.lease_epoch.0,
476 ))
477}
478
479pub(crate) async fn progress(
482 pool: &PgPool,
483 handle: &Handle,
484 percent: Option<u8>,
485 message: Option<String>,
486) -> Result<(), EngineError> {
487 let payload = decode_handle(handle)?;
488 let (part, exec_uuid) = split_exec_id(&payload.execution_id)?;
489 let attempt_index = i32::try_from(payload.attempt_index.0).unwrap_or(0);
490 let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
491 fence_check(&mut tx, part, exec_uuid, attempt_index, payload.lease_epoch.0).await?;
492 let mut patch = serde_json::Map::new();
499 if let Some(pct) = percent {
500 patch.insert("progress_pct".into(), serde_json::Value::from(pct));
501 }
502 if let Some(msg) = message {
503 patch.insert("progress_message".into(), serde_json::Value::from(msg));
504 }
505 let patch_val = serde_json::Value::Object(patch);
506 sqlx::query(
507 r#"
508 UPDATE ff_exec_core
509 SET raw_fields = raw_fields || $1::jsonb
510 WHERE partition_key = $2 AND execution_id = $3
511 "#,
512 )
513 .bind(patch_val)
514 .bind(part)
515 .bind(exec_uuid)
516 .execute(&mut *tx)
517 .await
518 .map_err(map_sqlx_error)?;
519 tx.commit().await.map_err(map_sqlx_error)?;
520 Ok(())
521}
522
523pub(crate) async fn complete(
526 pool: &PgPool,
527 handle: &Handle,
528 payload_bytes: Option<Vec<u8>>,
529) -> Result<(), EngineError> {
530 let payload = decode_handle(handle)?;
531 let (part, exec_uuid) = split_exec_id(&payload.execution_id)?;
532 let attempt_index = i32::try_from(payload.attempt_index.0).unwrap_or(0);
533 let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
534 fence_check(&mut tx, part, exec_uuid, attempt_index, payload.lease_epoch.0).await?;
535 let now = now_ms();
536
537 sqlx::query(
538 r#"
539 UPDATE ff_attempt
540 SET terminal_at_ms = $1,
541 outcome = 'success',
542 lease_expires_at_ms = NULL
543 WHERE partition_key = $2 AND execution_id = $3 AND attempt_index = $4
544 "#,
545 )
546 .bind(now)
547 .bind(part)
548 .bind(exec_uuid)
549 .bind(attempt_index)
550 .execute(&mut *tx)
551 .await
552 .map_err(map_sqlx_error)?;
553
554 sqlx::query(
555 r#"
556 UPDATE ff_exec_core
557 SET lifecycle_phase = 'terminal',
558 ownership_state = 'unowned',
559 eligibility_state = 'not_applicable',
560 attempt_state = 'attempt_terminal',
561 terminal_at_ms = $1,
562 result = $2
563 WHERE partition_key = $3 AND execution_id = $4
564 "#,
565 )
566 .bind(now)
567 .bind(payload_bytes.as_deref())
568 .bind(part)
569 .bind(exec_uuid)
570 .execute(&mut *tx)
571 .await
572 .map_err(map_sqlx_error)?;
573
574 sqlx::query(
576 r#"
577 INSERT INTO ff_completion_event (
578 partition_key, execution_id, flow_id, outcome,
579 namespace, instance_tag, occurred_at_ms
580 )
581 SELECT partition_key, execution_id, flow_id, 'success',
582 NULL, NULL, $3
583 FROM ff_exec_core
584 WHERE partition_key = $1 AND execution_id = $2
585 "#,
586 )
587 .bind(part)
588 .bind(exec_uuid)
589 .bind(now)
590 .execute(&mut *tx)
591 .await
592 .map_err(map_sqlx_error)?;
593
594 lease_event::emit(
596 &mut tx,
597 part,
598 exec_uuid,
599 None,
600 lease_event::EVENT_REVOKED,
601 now,
602 )
603 .await?;
604
605 tx.commit().await.map_err(map_sqlx_error)?;
606 Ok(())
607}
608
609pub(crate) async fn fail(
612 pool: &PgPool,
613 handle: &Handle,
614 reason: FailureReason,
615 classification: FailureClass,
616) -> Result<FailOutcome, EngineError> {
617 let payload = decode_handle(handle)?;
618 let (part, exec_uuid) = split_exec_id(&payload.execution_id)?;
619 let attempt_index = i32::try_from(payload.attempt_index.0).unwrap_or(0);
620 let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
621 fence_check(&mut tx, part, exec_uuid, attempt_index, payload.lease_epoch.0).await?;
622 let now = now_ms();
623 let retryable = matches!(
624 classification,
625 FailureClass::Transient | FailureClass::InfraCrash
626 );
627
628 if retryable {
629 sqlx::query(
631 r#"
632 UPDATE ff_attempt
633 SET terminal_at_ms = $1,
634 outcome = 'retry',
635 lease_expires_at_ms = NULL
636 WHERE partition_key = $2 AND execution_id = $3 AND attempt_index = $4
637 "#,
638 )
639 .bind(now)
640 .bind(part)
641 .bind(exec_uuid)
642 .bind(attempt_index)
643 .execute(&mut *tx)
644 .await
645 .map_err(map_sqlx_error)?;
646
647 sqlx::query(
648 r#"
649 UPDATE ff_exec_core
650 SET lifecycle_phase = 'runnable',
651 ownership_state = 'unowned',
652 eligibility_state = 'eligible_now',
653 attempt_state = 'pending_retry_attempt',
654 attempt_index = attempt_index + 1,
655 raw_fields = raw_fields || jsonb_build_object('last_failure_message', $1::text)
656 WHERE partition_key = $2 AND execution_id = $3
657 "#,
658 )
659 .bind(&reason.message)
660 .bind(part)
661 .bind(exec_uuid)
662 .execute(&mut *tx)
663 .await
664 .map_err(map_sqlx_error)?;
665
666 lease_event::emit(
668 &mut tx,
669 part,
670 exec_uuid,
671 None,
672 lease_event::EVENT_REVOKED,
673 now,
674 )
675 .await?;
676
677 tx.commit().await.map_err(map_sqlx_error)?;
678 Ok(FailOutcome::RetryScheduled {
679 delay_until: TimestampMs::from_millis(now),
680 })
681 } else {
682 sqlx::query(
684 r#"
685 UPDATE ff_attempt
686 SET terminal_at_ms = $1,
687 outcome = 'failed',
688 lease_expires_at_ms = NULL
689 WHERE partition_key = $2 AND execution_id = $3 AND attempt_index = $4
690 "#,
691 )
692 .bind(now)
693 .bind(part)
694 .bind(exec_uuid)
695 .bind(attempt_index)
696 .execute(&mut *tx)
697 .await
698 .map_err(map_sqlx_error)?;
699
700 sqlx::query(
701 r#"
702 UPDATE ff_exec_core
703 SET lifecycle_phase = 'terminal',
704 ownership_state = 'unowned',
705 eligibility_state = 'not_applicable',
706 attempt_state = 'attempt_terminal',
707 terminal_at_ms = $1,
708 raw_fields = raw_fields || jsonb_build_object('last_failure_message', $2::text)
709 WHERE partition_key = $3 AND execution_id = $4
710 "#,
711 )
712 .bind(now)
713 .bind(&reason.message)
714 .bind(part)
715 .bind(exec_uuid)
716 .execute(&mut *tx)
717 .await
718 .map_err(map_sqlx_error)?;
719
720 sqlx::query(
721 r#"
722 INSERT INTO ff_completion_event (
723 partition_key, execution_id, flow_id, outcome,
724 namespace, instance_tag, occurred_at_ms
725 )
726 SELECT partition_key, execution_id, flow_id, 'failed',
727 NULL, NULL, $3
728 FROM ff_exec_core
729 WHERE partition_key = $1 AND execution_id = $2
730 "#,
731 )
732 .bind(part)
733 .bind(exec_uuid)
734 .bind(now)
735 .execute(&mut *tx)
736 .await
737 .map_err(map_sqlx_error)?;
738
739 lease_event::emit(
741 &mut tx,
742 part,
743 exec_uuid,
744 None,
745 lease_event::EVENT_REVOKED,
746 now,
747 )
748 .await?;
749
750 tx.commit().await.map_err(map_sqlx_error)?;
751 Ok(FailOutcome::TerminalFailed)
752 }
753}
754
755pub(crate) async fn delay(
758 pool: &PgPool,
759 handle: &Handle,
760 delay_until: TimestampMs,
761) -> Result<(), EngineError> {
762 let payload = decode_handle(handle)?;
763 let (part, exec_uuid) = split_exec_id(&payload.execution_id)?;
764 let attempt_index = i32::try_from(payload.attempt_index.0).unwrap_or(0);
765 let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
766 fence_check(&mut tx, part, exec_uuid, attempt_index, payload.lease_epoch.0).await?;
767
768 sqlx::query(
769 r#"
770 UPDATE ff_attempt
771 SET outcome = 'interrupted',
772 lease_expires_at_ms = NULL
773 WHERE partition_key = $1 AND execution_id = $2 AND attempt_index = $3
774 "#,
775 )
776 .bind(part)
777 .bind(exec_uuid)
778 .bind(attempt_index)
779 .execute(&mut *tx)
780 .await
781 .map_err(map_sqlx_error)?;
782
783 sqlx::query(
784 r#"
785 UPDATE ff_exec_core
786 SET lifecycle_phase = 'runnable',
787 ownership_state = 'unowned',
788 eligibility_state = 'not_eligible_until_time',
789 attempt_state = 'attempt_interrupted',
790 deadline_at_ms = $1
791 WHERE partition_key = $2 AND execution_id = $3
792 "#,
793 )
794 .bind(delay_until.0)
795 .bind(part)
796 .bind(exec_uuid)
797 .execute(&mut *tx)
798 .await
799 .map_err(map_sqlx_error)?;
800
801 lease_event::emit(
803 &mut tx,
804 part,
805 exec_uuid,
806 None,
807 lease_event::EVENT_REVOKED,
808 now_ms(),
809 )
810 .await?;
811
812 tx.commit().await.map_err(map_sqlx_error)?;
813 Ok(())
814}
815
816pub(crate) async fn wait_children(
819 pool: &PgPool,
820 handle: &Handle,
821) -> Result<(), EngineError> {
822 let payload = decode_handle(handle)?;
823 let (part, exec_uuid) = split_exec_id(&payload.execution_id)?;
824 let attempt_index = i32::try_from(payload.attempt_index.0).unwrap_or(0);
825 let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
826 fence_check(&mut tx, part, exec_uuid, attempt_index, payload.lease_epoch.0).await?;
827
828 sqlx::query(
829 r#"
830 UPDATE ff_attempt
831 SET outcome = 'waiting_children',
832 lease_expires_at_ms = NULL
833 WHERE partition_key = $1 AND execution_id = $2 AND attempt_index = $3
834 "#,
835 )
836 .bind(part)
837 .bind(exec_uuid)
838 .bind(attempt_index)
839 .execute(&mut *tx)
840 .await
841 .map_err(map_sqlx_error)?;
842
843 sqlx::query(
844 r#"
845 UPDATE ff_exec_core
846 SET lifecycle_phase = 'runnable',
847 ownership_state = 'unowned',
848 eligibility_state = 'blocked_by_dependencies',
849 attempt_state = 'attempt_interrupted',
850 blocking_reason = 'waiting_for_children'
851 WHERE partition_key = $1 AND execution_id = $2
852 "#,
853 )
854 .bind(part)
855 .bind(exec_uuid)
856 .execute(&mut *tx)
857 .await
858 .map_err(map_sqlx_error)?;
859
860 lease_event::emit(
862 &mut tx,
863 part,
864 exec_uuid,
865 None,
866 lease_event::EVENT_REVOKED,
867 now_ms(),
868 )
869 .await?;
870
871 tx.commit().await.map_err(map_sqlx_error)?;
872 Ok(())
873}
874