1use ff_core::backend::{
37 BackendTag, CapabilitySet, ClaimPolicy, FailOutcome, FailureClass, FailureReason, Handle,
38 HandleKind, LeaseRenewal, ReclaimToken,
39};
40use ff_core::caps::{matches as caps_matches, CapabilityRequirement};
41use ff_core::engine_error::{ContentionKind, EngineError};
42use ff_core::handle_codec::{decode as decode_opaque, encode as encode_opaque, HandlePayload};
43use ff_core::types::{
44 AttemptId, AttemptIndex, ExecutionId, LaneId, LeaseEpoch, LeaseId, TimestampMs,
45};
46use sqlx::{PgPool, Row};
47use std::time::{SystemTime, UNIX_EPOCH};
48use uuid::Uuid;
49
50use crate::error::map_sqlx_error;
51
/// Returns the current wall-clock time in milliseconds since the Unix epoch.
///
/// A system clock that reads earlier than the epoch yields `0`; a millisecond
/// count that does not fit in an `i64` saturates to `i64::MAX`.
fn now_ms() -> i64 {
    let elapsed_ms = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis(),
        Err(_) => 0,
    };
    i64::try_from(elapsed_ms).unwrap_or(i64::MAX)
}
63
64fn split_exec_id(eid: &ExecutionId) -> Result<(i16, Uuid), EngineError> {
67 let s = eid.as_str();
68 let rest = s.strip_prefix("{fp:").ok_or_else(|| EngineError::Validation {
69 kind: ff_core::engine_error::ValidationKind::InvalidInput,
70 detail: format!("execution_id missing `{{fp:` prefix: {s}"),
71 })?;
72 let close = rest.find("}:").ok_or_else(|| EngineError::Validation {
73 kind: ff_core::engine_error::ValidationKind::InvalidInput,
74 detail: format!("execution_id missing `}}:`: {s}"),
75 })?;
76 let part: i16 = rest[..close]
77 .parse()
78 .map_err(|_| EngineError::Validation {
79 kind: ff_core::engine_error::ValidationKind::InvalidInput,
80 detail: format!("execution_id partition index not u16: {s}"),
81 })?;
82 let uuid = Uuid::parse_str(&rest[close + 2..]).map_err(|_| EngineError::Validation {
83 kind: ff_core::engine_error::ValidationKind::InvalidInput,
84 detail: format!("execution_id UUID invalid: {s}"),
85 })?;
86 Ok((part, uuid))
87}
88
89fn decode_handle(handle: &Handle) -> Result<HandlePayload, EngineError> {
90 if handle.backend != BackendTag::Postgres {
91 return Err(EngineError::Validation {
92 kind: ff_core::engine_error::ValidationKind::Corruption,
93 detail: format!(
94 "handle minted by {:?} passed to Postgres backend",
95 handle.backend
96 ),
97 });
98 }
99 let decoded = decode_opaque(&handle.opaque)?;
100 if decoded.tag != BackendTag::Postgres {
101 return Err(EngineError::Validation {
102 kind: ff_core::engine_error::ValidationKind::Corruption,
103 detail: format!("inner handle tag mismatch: {:?}", decoded.tag),
104 });
105 }
106 Ok(decoded.payload)
107}
108
109fn mint_handle(payload: HandlePayload, kind: HandleKind) -> Handle {
110 let op = encode_opaque(BackendTag::Postgres, &payload);
111 Handle::new(BackendTag::Postgres, kind, op)
112}
113
114pub(crate) async fn claim(
117 pool: &PgPool,
118 lane: &LaneId,
119 capabilities: &CapabilitySet,
120 policy: &ClaimPolicy,
121) -> Result<Option<Handle>, EngineError> {
122 let total_partitions: i16 = 256;
127 for part in 0..total_partitions {
128 match try_claim_in_partition(pool, part, lane, capabilities, policy).await? {
129 Some(h) => return Ok(Some(h)),
130 None => continue,
131 }
132 }
133 Ok(None)
134}
135
136async fn try_claim_in_partition(
137 pool: &PgPool,
138 part: i16,
139 lane: &LaneId,
140 capabilities: &CapabilitySet,
141 policy: &ClaimPolicy,
142) -> Result<Option<Handle>, EngineError> {
143 let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
144 let row = sqlx::query(
147 r#"
148 SELECT execution_id, required_capabilities, attempt_index
149 FROM ff_exec_core
150 WHERE partition_key = $1
151 AND lane_id = $2
152 AND lifecycle_phase = 'runnable'
153 AND eligibility_state = 'eligible_now'
154 ORDER BY priority DESC, created_at_ms ASC
155 FOR UPDATE SKIP LOCKED
156 LIMIT 1
157 "#,
158 )
159 .bind(part)
160 .bind(lane.as_str())
161 .fetch_optional(&mut *tx)
162 .await
163 .map_err(map_sqlx_error)?;
164
165 let Some(row) = row else {
166 tx.rollback().await.map_err(map_sqlx_error)?;
168 return Ok(None);
169 };
170
171 let exec_uuid: Uuid = row.try_get("execution_id").map_err(map_sqlx_error)?;
172 let required_caps: Vec<String> = row
173 .try_get::<Vec<String>, _>("required_capabilities")
174 .map_err(map_sqlx_error)?;
175 let attempt_index_i: i32 = row.try_get("attempt_index").map_err(map_sqlx_error)?;
176 let req = CapabilityRequirement::new(required_caps);
177 if !caps_matches(&req, capabilities) {
178 tx.rollback().await.map_err(map_sqlx_error)?;
180 return Ok(None);
181 }
182
183 let attempt_index = AttemptIndex::new(u32::try_from(attempt_index_i.max(0)).unwrap_or(0));
184 let now = now_ms();
185 let lease_ttl_ms = i64::from(policy.lease_ttl_ms);
186 let expires = now.saturating_add(lease_ttl_ms);
187
188 sqlx::query(
192 r#"
193 INSERT INTO ff_attempt (
194 partition_key, execution_id, attempt_index,
195 worker_id, worker_instance_id,
196 lease_epoch, lease_expires_at_ms, started_at_ms
197 ) VALUES ($1, $2, $3, $4, $5, 1, $6, $7)
198 ON CONFLICT (partition_key, execution_id, attempt_index)
199 DO UPDATE SET
200 worker_id = EXCLUDED.worker_id,
201 worker_instance_id = EXCLUDED.worker_instance_id,
202 lease_epoch = ff_attempt.lease_epoch + 1,
203 lease_expires_at_ms = EXCLUDED.lease_expires_at_ms,
204 started_at_ms = EXCLUDED.started_at_ms,
205 outcome = NULL
206 "#,
207 )
208 .bind(part)
209 .bind(exec_uuid)
210 .bind(attempt_index_i)
211 .bind(policy.worker_id.as_str())
212 .bind(policy.worker_instance_id.as_str())
213 .bind(expires)
214 .bind(now)
215 .execute(&mut *tx)
216 .await
217 .map_err(map_sqlx_error)?;
218
219 let epoch_row = sqlx::query(
221 r#"
222 SELECT lease_epoch FROM ff_attempt
223 WHERE partition_key = $1 AND execution_id = $2 AND attempt_index = $3
224 "#,
225 )
226 .bind(part)
227 .bind(exec_uuid)
228 .bind(attempt_index_i)
229 .fetch_one(&mut *tx)
230 .await
231 .map_err(map_sqlx_error)?;
232 let epoch_i: i64 = epoch_row.try_get("lease_epoch").map_err(map_sqlx_error)?;
233
234 sqlx::query(
236 r#"
237 UPDATE ff_exec_core
238 SET lifecycle_phase = 'active',
239 ownership_state = 'leased',
240 eligibility_state = 'not_applicable',
241 attempt_state = 'running_attempt'
242 WHERE partition_key = $1 AND execution_id = $2
243 "#,
244 )
245 .bind(part)
246 .bind(exec_uuid)
247 .execute(&mut *tx)
248 .await
249 .map_err(map_sqlx_error)?;
250
251 tx.commit().await.map_err(map_sqlx_error)?;
252
253 let exec_id = ExecutionId::parse(&format!("{{fp:{part}}}:{exec_uuid}")).map_err(|e| {
254 EngineError::Validation {
255 kind: ff_core::engine_error::ValidationKind::InvalidInput,
256 detail: format!("reassembling exec id: {e}"),
257 }
258 })?;
259 let payload = HandlePayload::new(
260 exec_id,
261 attempt_index,
262 AttemptId::new(),
263 LeaseId::new(),
264 LeaseEpoch(u64::try_from(epoch_i).unwrap_or(1)),
265 u64::from(policy.lease_ttl_ms),
266 lane.clone(),
267 policy.worker_instance_id.clone(),
268 );
269 Ok(Some(mint_handle(payload, HandleKind::Fresh)))
270}
271
272pub(crate) async fn claim_from_reclaim(
275 pool: &PgPool,
276 token: ReclaimToken,
277) -> Result<Option<Handle>, EngineError> {
278 let eid = &token.grant.execution_id;
279 let (part, exec_uuid) = split_exec_id(eid)?;
280 let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
281 let row = sqlx::query(
283 r#"
284 SELECT attempt_index, lease_epoch, lease_expires_at_ms
285 FROM ff_attempt
286 WHERE partition_key = $1 AND execution_id = $2
287 ORDER BY attempt_index DESC
288 FOR UPDATE
289 LIMIT 1
290 "#,
291 )
292 .bind(part)
293 .bind(exec_uuid)
294 .fetch_optional(&mut *tx)
295 .await
296 .map_err(map_sqlx_error)?;
297 let Some(row) = row else {
298 tx.rollback().await.map_err(map_sqlx_error)?;
299 return Err(EngineError::NotFound { entity: "attempt" });
300 };
301 let attempt_index_i: i32 = row.try_get("attempt_index").map_err(map_sqlx_error)?;
302 let current_epoch: i64 = row.try_get("lease_epoch").map_err(map_sqlx_error)?;
303 let expires_at: Option<i64> = row
304 .try_get::<Option<i64>, _>("lease_expires_at_ms")
305 .map_err(map_sqlx_error)?;
306
307 let now = now_ms();
311 let live = matches!(expires_at, Some(exp) if exp > now);
312 if live {
313 tx.rollback().await.map_err(map_sqlx_error)?;
314 return Ok(None); }
316
317 let lease_ttl_ms = i64::from(token.lease_ttl_ms);
319 let new_expires = now.saturating_add(lease_ttl_ms);
320 sqlx::query(
321 r#"
322 UPDATE ff_attempt
323 SET worker_id = $1,
324 worker_instance_id = $2,
325 lease_epoch = lease_epoch + 1,
326 lease_expires_at_ms = $3,
327 started_at_ms = $4,
328 outcome = NULL
329 WHERE partition_key = $5 AND execution_id = $6 AND attempt_index = $7
330 "#,
331 )
332 .bind(token.worker_id.as_str())
333 .bind(token.worker_instance_id.as_str())
334 .bind(new_expires)
335 .bind(now)
336 .bind(part)
337 .bind(exec_uuid)
338 .bind(attempt_index_i)
339 .execute(&mut *tx)
340 .await
341 .map_err(map_sqlx_error)?;
342
343 sqlx::query(
344 r#"
345 UPDATE ff_exec_core
346 SET lifecycle_phase = 'active',
347 ownership_state = 'leased',
348 eligibility_state = 'not_applicable',
349 attempt_state = 'running_attempt'
350 WHERE partition_key = $1 AND execution_id = $2
351 "#,
352 )
353 .bind(part)
354 .bind(exec_uuid)
355 .execute(&mut *tx)
356 .await
357 .map_err(map_sqlx_error)?;
358
359 tx.commit().await.map_err(map_sqlx_error)?;
360
361 let new_epoch = current_epoch.saturating_add(1);
362 let payload = HandlePayload::new(
363 eid.clone(),
364 AttemptIndex::new(u32::try_from(attempt_index_i.max(0)).unwrap_or(0)),
365 AttemptId::new(),
366 LeaseId::new(),
367 LeaseEpoch(u64::try_from(new_epoch).unwrap_or(0)),
368 u64::from(token.lease_ttl_ms),
369 token.grant.lane_id.clone(),
370 token.worker_instance_id.clone(),
371 );
372 Ok(Some(mint_handle(payload, HandleKind::Resumed)))
373}
374
375async fn fence_check<'c>(
382 tx: &mut sqlx::Transaction<'c, sqlx::Postgres>,
383 part: i16,
384 exec_uuid: Uuid,
385 attempt_index: i32,
386 expected_epoch: u64,
387) -> Result<(), EngineError> {
388 let row = sqlx::query(
389 r#"
390 SELECT lease_epoch FROM ff_attempt
391 WHERE partition_key = $1 AND execution_id = $2 AND attempt_index = $3
392 FOR UPDATE
393 "#,
394 )
395 .bind(part)
396 .bind(exec_uuid)
397 .bind(attempt_index)
398 .fetch_optional(&mut **tx)
399 .await
400 .map_err(map_sqlx_error)?;
401 let Some(row) = row else {
402 return Err(EngineError::NotFound { entity: "attempt" });
403 };
404 let epoch_i: i64 = row.try_get("lease_epoch").map_err(map_sqlx_error)?;
405 let observed = u64::try_from(epoch_i).unwrap_or(0);
406 if observed != expected_epoch {
407 return Err(EngineError::Contention(ContentionKind::LeaseConflict));
408 }
409 Ok(())
410}
411
412pub(crate) async fn renew(
415 pool: &PgPool,
416 handle: &Handle,
417) -> Result<LeaseRenewal, EngineError> {
418 let payload = decode_handle(handle)?;
419 let (part, exec_uuid) = split_exec_id(&payload.execution_id)?;
420 let attempt_index = i32::try_from(payload.attempt_index.0).unwrap_or(0);
421 let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
422 fence_check(&mut tx, part, exec_uuid, attempt_index, payload.lease_epoch.0).await?;
423 let now = now_ms();
424 let new_expires = now.saturating_add(i64::try_from(payload.lease_ttl_ms).unwrap_or(0));
425 sqlx::query(
426 r#"
427 UPDATE ff_attempt
428 SET lease_expires_at_ms = $1
429 WHERE partition_key = $2 AND execution_id = $3 AND attempt_index = $4
430 "#,
431 )
432 .bind(new_expires)
433 .bind(part)
434 .bind(exec_uuid)
435 .bind(attempt_index)
436 .execute(&mut *tx)
437 .await
438 .map_err(map_sqlx_error)?;
439 tx.commit().await.map_err(map_sqlx_error)?;
440 Ok(LeaseRenewal::new(
441 u64::try_from(new_expires).unwrap_or(0),
442 payload.lease_epoch.0,
443 ))
444}
445
446pub(crate) async fn progress(
449 pool: &PgPool,
450 handle: &Handle,
451 percent: Option<u8>,
452 message: Option<String>,
453) -> Result<(), EngineError> {
454 let payload = decode_handle(handle)?;
455 let (part, exec_uuid) = split_exec_id(&payload.execution_id)?;
456 let attempt_index = i32::try_from(payload.attempt_index.0).unwrap_or(0);
457 let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
458 fence_check(&mut tx, part, exec_uuid, attempt_index, payload.lease_epoch.0).await?;
459 let mut patch = serde_json::Map::new();
466 if let Some(pct) = percent {
467 patch.insert("progress_pct".into(), serde_json::Value::from(pct));
468 }
469 if let Some(msg) = message {
470 patch.insert("progress_message".into(), serde_json::Value::from(msg));
471 }
472 let patch_val = serde_json::Value::Object(patch);
473 sqlx::query(
474 r#"
475 UPDATE ff_exec_core
476 SET raw_fields = raw_fields || $1::jsonb
477 WHERE partition_key = $2 AND execution_id = $3
478 "#,
479 )
480 .bind(patch_val)
481 .bind(part)
482 .bind(exec_uuid)
483 .execute(&mut *tx)
484 .await
485 .map_err(map_sqlx_error)?;
486 tx.commit().await.map_err(map_sqlx_error)?;
487 Ok(())
488}
489
490pub(crate) async fn complete(
493 pool: &PgPool,
494 handle: &Handle,
495 payload_bytes: Option<Vec<u8>>,
496) -> Result<(), EngineError> {
497 let payload = decode_handle(handle)?;
498 let (part, exec_uuid) = split_exec_id(&payload.execution_id)?;
499 let attempt_index = i32::try_from(payload.attempt_index.0).unwrap_or(0);
500 let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
501 fence_check(&mut tx, part, exec_uuid, attempt_index, payload.lease_epoch.0).await?;
502 let now = now_ms();
503
504 sqlx::query(
505 r#"
506 UPDATE ff_attempt
507 SET terminal_at_ms = $1,
508 outcome = 'success',
509 lease_expires_at_ms = NULL
510 WHERE partition_key = $2 AND execution_id = $3 AND attempt_index = $4
511 "#,
512 )
513 .bind(now)
514 .bind(part)
515 .bind(exec_uuid)
516 .bind(attempt_index)
517 .execute(&mut *tx)
518 .await
519 .map_err(map_sqlx_error)?;
520
521 sqlx::query(
522 r#"
523 UPDATE ff_exec_core
524 SET lifecycle_phase = 'terminal',
525 ownership_state = 'unowned',
526 eligibility_state = 'not_applicable',
527 attempt_state = 'attempt_terminal',
528 terminal_at_ms = $1,
529 result = $2
530 WHERE partition_key = $3 AND execution_id = $4
531 "#,
532 )
533 .bind(now)
534 .bind(payload_bytes.as_deref())
535 .bind(part)
536 .bind(exec_uuid)
537 .execute(&mut *tx)
538 .await
539 .map_err(map_sqlx_error)?;
540
541 sqlx::query(
543 r#"
544 INSERT INTO ff_completion_event (
545 partition_key, execution_id, flow_id, outcome,
546 namespace, instance_tag, occurred_at_ms
547 )
548 SELECT partition_key, execution_id, flow_id, 'success',
549 NULL, NULL, $3
550 FROM ff_exec_core
551 WHERE partition_key = $1 AND execution_id = $2
552 "#,
553 )
554 .bind(part)
555 .bind(exec_uuid)
556 .bind(now)
557 .execute(&mut *tx)
558 .await
559 .map_err(map_sqlx_error)?;
560
561 tx.commit().await.map_err(map_sqlx_error)?;
562 Ok(())
563}
564
565pub(crate) async fn fail(
568 pool: &PgPool,
569 handle: &Handle,
570 reason: FailureReason,
571 classification: FailureClass,
572) -> Result<FailOutcome, EngineError> {
573 let payload = decode_handle(handle)?;
574 let (part, exec_uuid) = split_exec_id(&payload.execution_id)?;
575 let attempt_index = i32::try_from(payload.attempt_index.0).unwrap_or(0);
576 let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
577 fence_check(&mut tx, part, exec_uuid, attempt_index, payload.lease_epoch.0).await?;
578 let now = now_ms();
579 let retryable = matches!(
580 classification,
581 FailureClass::Transient | FailureClass::InfraCrash
582 );
583
584 if retryable {
585 sqlx::query(
587 r#"
588 UPDATE ff_attempt
589 SET terminal_at_ms = $1,
590 outcome = 'retry',
591 lease_expires_at_ms = NULL
592 WHERE partition_key = $2 AND execution_id = $3 AND attempt_index = $4
593 "#,
594 )
595 .bind(now)
596 .bind(part)
597 .bind(exec_uuid)
598 .bind(attempt_index)
599 .execute(&mut *tx)
600 .await
601 .map_err(map_sqlx_error)?;
602
603 sqlx::query(
604 r#"
605 UPDATE ff_exec_core
606 SET lifecycle_phase = 'runnable',
607 ownership_state = 'unowned',
608 eligibility_state = 'eligible_now',
609 attempt_state = 'pending_retry_attempt',
610 attempt_index = attempt_index + 1,
611 raw_fields = raw_fields || jsonb_build_object('last_failure_message', $1::text)
612 WHERE partition_key = $2 AND execution_id = $3
613 "#,
614 )
615 .bind(&reason.message)
616 .bind(part)
617 .bind(exec_uuid)
618 .execute(&mut *tx)
619 .await
620 .map_err(map_sqlx_error)?;
621
622 tx.commit().await.map_err(map_sqlx_error)?;
623 Ok(FailOutcome::RetryScheduled {
624 delay_until: TimestampMs::from_millis(now),
625 })
626 } else {
627 sqlx::query(
629 r#"
630 UPDATE ff_attempt
631 SET terminal_at_ms = $1,
632 outcome = 'failed',
633 lease_expires_at_ms = NULL
634 WHERE partition_key = $2 AND execution_id = $3 AND attempt_index = $4
635 "#,
636 )
637 .bind(now)
638 .bind(part)
639 .bind(exec_uuid)
640 .bind(attempt_index)
641 .execute(&mut *tx)
642 .await
643 .map_err(map_sqlx_error)?;
644
645 sqlx::query(
646 r#"
647 UPDATE ff_exec_core
648 SET lifecycle_phase = 'terminal',
649 ownership_state = 'unowned',
650 eligibility_state = 'not_applicable',
651 attempt_state = 'attempt_terminal',
652 terminal_at_ms = $1,
653 raw_fields = raw_fields || jsonb_build_object('last_failure_message', $2::text)
654 WHERE partition_key = $3 AND execution_id = $4
655 "#,
656 )
657 .bind(now)
658 .bind(&reason.message)
659 .bind(part)
660 .bind(exec_uuid)
661 .execute(&mut *tx)
662 .await
663 .map_err(map_sqlx_error)?;
664
665 sqlx::query(
666 r#"
667 INSERT INTO ff_completion_event (
668 partition_key, execution_id, flow_id, outcome,
669 namespace, instance_tag, occurred_at_ms
670 )
671 SELECT partition_key, execution_id, flow_id, 'failed',
672 NULL, NULL, $3
673 FROM ff_exec_core
674 WHERE partition_key = $1 AND execution_id = $2
675 "#,
676 )
677 .bind(part)
678 .bind(exec_uuid)
679 .bind(now)
680 .execute(&mut *tx)
681 .await
682 .map_err(map_sqlx_error)?;
683
684 tx.commit().await.map_err(map_sqlx_error)?;
685 Ok(FailOutcome::TerminalFailed)
686 }
687}
688
689pub(crate) async fn delay(
692 pool: &PgPool,
693 handle: &Handle,
694 delay_until: TimestampMs,
695) -> Result<(), EngineError> {
696 let payload = decode_handle(handle)?;
697 let (part, exec_uuid) = split_exec_id(&payload.execution_id)?;
698 let attempt_index = i32::try_from(payload.attempt_index.0).unwrap_or(0);
699 let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
700 fence_check(&mut tx, part, exec_uuid, attempt_index, payload.lease_epoch.0).await?;
701
702 sqlx::query(
703 r#"
704 UPDATE ff_attempt
705 SET outcome = 'interrupted',
706 lease_expires_at_ms = NULL
707 WHERE partition_key = $1 AND execution_id = $2 AND attempt_index = $3
708 "#,
709 )
710 .bind(part)
711 .bind(exec_uuid)
712 .bind(attempt_index)
713 .execute(&mut *tx)
714 .await
715 .map_err(map_sqlx_error)?;
716
717 sqlx::query(
718 r#"
719 UPDATE ff_exec_core
720 SET lifecycle_phase = 'runnable',
721 ownership_state = 'unowned',
722 eligibility_state = 'not_eligible_until_time',
723 attempt_state = 'attempt_interrupted',
724 deadline_at_ms = $1
725 WHERE partition_key = $2 AND execution_id = $3
726 "#,
727 )
728 .bind(delay_until.0)
729 .bind(part)
730 .bind(exec_uuid)
731 .execute(&mut *tx)
732 .await
733 .map_err(map_sqlx_error)?;
734
735 tx.commit().await.map_err(map_sqlx_error)?;
736 Ok(())
737}
738
739pub(crate) async fn wait_children(
742 pool: &PgPool,
743 handle: &Handle,
744) -> Result<(), EngineError> {
745 let payload = decode_handle(handle)?;
746 let (part, exec_uuid) = split_exec_id(&payload.execution_id)?;
747 let attempt_index = i32::try_from(payload.attempt_index.0).unwrap_or(0);
748 let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
749 fence_check(&mut tx, part, exec_uuid, attempt_index, payload.lease_epoch.0).await?;
750
751 sqlx::query(
752 r#"
753 UPDATE ff_attempt
754 SET outcome = 'waiting_children',
755 lease_expires_at_ms = NULL
756 WHERE partition_key = $1 AND execution_id = $2 AND attempt_index = $3
757 "#,
758 )
759 .bind(part)
760 .bind(exec_uuid)
761 .bind(attempt_index)
762 .execute(&mut *tx)
763 .await
764 .map_err(map_sqlx_error)?;
765
766 sqlx::query(
767 r#"
768 UPDATE ff_exec_core
769 SET lifecycle_phase = 'runnable',
770 ownership_state = 'unowned',
771 eligibility_state = 'blocked_by_dependencies',
772 attempt_state = 'attempt_interrupted',
773 blocking_reason = 'waiting_for_children'
774 WHERE partition_key = $1 AND execution_id = $2
775 "#,
776 )
777 .bind(part)
778 .bind(exec_uuid)
779 .execute(&mut *tx)
780 .await
781 .map_err(map_sqlx_error)?;
782
783 tx.commit().await.map_err(map_sqlx_error)?;
784 Ok(())
785}
786