1use std::time::Duration;
25
26use ff_core::contracts::{
27 CancelExecutionArgs, CancelExecutionResult, CancelFlowArgs, CancelFlowHeader,
28 ChangePriorityArgs, ChangePriorityResult, ReplayExecutionArgs, ReplayExecutionResult,
29 RevokeLeaseArgs, RevokeLeaseResult,
30};
31use ff_core::engine_error::{ContentionKind, EngineError, StateKind, ValidationKind};
32use ff_core::state::PublicState;
33use ff_core::types::{CancelSource, ExecutionId, FlowId};
34use serde_json::json;
35use sqlx::{PgPool, Postgres, Row};
36use uuid::Uuid;
37
38use crate::error::map_sqlx_error;
39use crate::{lease_event, operator_event};
40
/// Maximum number of times one SERIALIZABLE transaction is attempted before the
/// retry loops in this module give up with `ContentionKind::RetryExhausted`.
/// Only errors accepted by `is_serialization_conflict` consume a retry.
const MAX_ATTEMPTS: u32 = 3;
44
45fn eid_uuid(eid: &ff_core::types::ExecutionId) -> Uuid {
48 let s = eid.as_str();
49 let suffix = s
50 .split_once("}:")
51 .map(|(_, u)| u)
52 .expect("ExecutionId has `}:` separator (invariant)");
53 Uuid::parse_str(suffix).expect("ExecutionId suffix is a valid UUID (invariant)")
54}
55
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// The millisecond count (a `u128`) saturates at `i64::MAX` instead of being
/// truncated by a lossy `as` cast.
///
/// # Panics
///
/// Panics if the system clock reports a time before `UNIX_EPOCH`.
fn now_ms() -> i64 {
    let since_epoch = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .expect("clock is after UNIX_EPOCH");
    i64::try_from(since_epoch.as_millis()).unwrap_or(i64::MAX)
}
62
63fn is_serialization_conflict(err: &EngineError) -> bool {
67 matches!(err, EngineError::Contention(ContentionKind::LeaseConflict))
68}
69
70async fn begin_serializable(pool: &PgPool) -> Result<sqlx::Transaction<'_, Postgres>, EngineError> {
71 let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
72 sqlx::query("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE")
73 .execute(&mut *tx)
74 .await
75 .map_err(map_sqlx_error)?;
76 Ok(tx)
77}
78
79fn synthetic_lease_id(exec_uuid: Uuid, attempt_index: i32, lease_epoch: i64) -> String {
85 format!("pg:{exec_uuid}:{attempt_index}:{lease_epoch}")
86}
87
/// One attempt at cancelling a single execution; `cancel_execution_impl` retries it.
///
/// Runs inside a single SERIALIZABLE transaction:
///
/// 1. Locks the `ff_exec_core` row (`FOR NO KEY UPDATE OF ec`), joined with its
///    current `ff_attempt` row. This reads the lifecycle/public state plus the
///    current lease holder (`worker_instance_id`) and `lease_epoch`.
/// 2. Leaves already-finished executions (`lifecycle_phase` of `terminal` or
///    `cancelled`) untouched. One that is already publicly `cancelled` returns
///    `Cancelled` again, so retries are idempotent. Any other finished state is
///    a validation error.
/// 3. When a lease is held (non-empty `worker_instance_id`) and the caller is not
///    `CancelSource::OperatorOverride`, the caller must supply a `lease_epoch`
///    that equals the stored epoch.
/// 4. Marks the execution cancelled. `COALESCE` keeps any previously recorded
///    `terminal_at_ms`, `cancellation_reason`, and `cancelled_by`.
/// 5. If a lease was held, clears it on the attempt, bumps `lease_epoch`,
///    records `outcome = 'cancelled'`, and emits a lease-revoked event.
///
/// # Errors
///
/// * `EngineError::NotFound` when the execution does not exist.
/// * `EngineError::Validation` (`InvalidInput`) when the execution is already
///   finished in a state other than `cancelled`, or when `lease_epoch` is
///   missing while a lease is active.
/// * `EngineError::State(StaleLease)` when the supplied `lease_epoch` does not match.
/// * Database failures, mapped through `map_sqlx_error`. `cancel_execution_impl`
///   retries the ones `is_serialization_conflict` accepts.
async fn cancel_execution_once(
    pool: &PgPool,
    args: &CancelExecutionArgs,
) -> Result<CancelExecutionResult, EngineError> {
    let partition_key: i16 = args.execution_id.partition() as i16;
    let exec_uuid = eid_uuid(&args.execution_id);
    // Caller-supplied timestamp; used for every `*_at_ms` / mutation stamp below.
    let now: i64 = args.now.0;

    let mut tx = begin_serializable(pool).await?;

    // Lock only the exec_core row (`OF ec`). The LEFT JOIN pulls the current
    // attempt's lease holder and epoch in the same round trip; both are NULL
    // when no attempt row exists.
    let row = sqlx::query(
        r#"
        SELECT ec.lifecycle_phase,
               ec.public_state,
               ec.attempt_index,
               a.worker_instance_id,
               a.lease_epoch
          FROM ff_exec_core ec
          LEFT JOIN ff_attempt a
            ON a.partition_key = ec.partition_key
           AND a.execution_id = ec.execution_id
           AND a.attempt_index = ec.attempt_index
         WHERE ec.partition_key = $1 AND ec.execution_id = $2
         FOR NO KEY UPDATE OF ec
        "#,
    )
    .bind(partition_key)
    .bind(exec_uuid)
    .fetch_optional(&mut *tx)
    .await
    .map_err(map_sqlx_error)?;

    let Some(row) = row else {
        tx.rollback().await.map_err(map_sqlx_error)?;
        return Err(EngineError::NotFound {
            entity: "execution",
        });
    };

    let lifecycle_phase: String = row.try_get("lifecycle_phase").map_err(map_sqlx_error)?;
    let public_state: String = row.try_get("public_state").map_err(map_sqlx_error)?;
    let attempt_index: i32 = row.try_get("attempt_index").map_err(map_sqlx_error)?;
    let worker_instance_id: Option<String> =
        row.try_get("worker_instance_id").map_err(map_sqlx_error)?;
    let lease_epoch: Option<i64> = row.try_get("lease_epoch").map_err(map_sqlx_error)?;

    // Already finished: never rewrite. Re-cancelling a cancelled execution
    // reports success so caller retries are idempotent.
    if matches!(lifecycle_phase.as_str(), "terminal" | "cancelled") {
        tx.rollback().await.map_err(map_sqlx_error)?;
        return if public_state == "cancelled" {
            Ok(CancelExecutionResult::Cancelled {
                execution_id: args.execution_id.clone(),
                public_state: PublicState::Cancelled,
            })
        } else {
            Err(EngineError::Validation {
                kind: ValidationKind::InvalidInput,
                detail: format!(
                    "cancel_execution: execution_id={}: already terminal in state '{}'",
                    args.execution_id, public_state
                ),
            })
        };
    }

    // A NULL or empty `worker_instance_id` both mean "no lease holder".
    let lease_active = worker_instance_id
        .as_deref()
        .is_some_and(|s| !s.is_empty());
    // Lease fencing: only an operator override may cancel a leased execution
    // without proving it knows the current epoch.
    if !matches!(args.source, CancelSource::OperatorOverride) && lease_active {
        let Some(expected_epoch) = args.lease_epoch.as_ref() else {
            tx.rollback().await.map_err(map_sqlx_error)?;
            return Err(EngineError::Validation {
                kind: ValidationKind::InvalidInput,
                detail: format!(
                    "cancel_execution: execution_id={}: lease_epoch required when source != operator_override and execution is active",
                    args.execution_id
                ),
            });
        };
        // Caller epochs that do not fit in i64 saturate to i64::MAX.
        let expected: i64 = i64::try_from(expected_epoch.0).unwrap_or(i64::MAX);
        // A NULL stored epoch compares as 0.
        if lease_epoch.unwrap_or(0) != expected {
            tx.rollback().await.map_err(map_sqlx_error)?;
            return Err(EngineError::State(
                ff_core::engine_error::StateKind::StaleLease,
            ));
        }
    }

    // Move the execution to the cancelled state. COALESCE keeps the first
    // recorded terminal time / reason / source if any were already set.
    sqlx::query(
        r#"
        UPDATE ff_exec_core
           SET lifecycle_phase = 'cancelled',
               ownership_state = 'unowned',
               eligibility_state = 'not_applicable',
               public_state = 'cancelled',
               attempt_state = 'cancelled',
               terminal_at_ms = COALESCE(terminal_at_ms, $3),
               cancellation_reason = COALESCE(cancellation_reason, $4),
               cancelled_by = COALESCE(cancelled_by, $5),
               raw_fields = jsonb_set(raw_fields,
                                      '{last_mutation_at}',
                                      to_jsonb($3::text))
         WHERE partition_key = $1 AND execution_id = $2
        "#,
    )
    .bind(partition_key)
    .bind(exec_uuid)
    .bind(now)
    .bind(&args.reason)
    .bind(args.source.as_str())
    .execute(&mut *tx)
    .await
    .map_err(map_sqlx_error)?;

    // Revoke the live lease: clear the holder, bump the epoch so the old
    // holder's epoch no longer matches, and record the attempt outcome.
    if lease_active {
        sqlx::query(
            r#"
            UPDATE ff_attempt
               SET worker_instance_id = NULL,
                   lease_expires_at_ms = NULL,
                   lease_epoch = lease_epoch + 1,
                   terminal_at_ms = $4,
                   outcome = 'cancelled'
             WHERE partition_key = $1 AND execution_id = $2 AND attempt_index = $3
            "#,
        )
        .bind(partition_key)
        .bind(exec_uuid)
        .bind(attempt_index)
        .bind(now)
        .execute(&mut *tx)
        .await
        .map_err(map_sqlx_error)?;

        lease_event::emit(
            &mut tx,
            partition_key,
            exec_uuid,
            None,
            lease_event::EVENT_REVOKED,
            now,
        )
        .await?;
    }

    tx.commit().await.map_err(map_sqlx_error)?;
    Ok(CancelExecutionResult::Cancelled {
        execution_id: args.execution_id.clone(),
        public_state: PublicState::Cancelled,
    })
}
283
284pub(super) async fn cancel_execution_impl(
287 pool: &PgPool,
288 args: CancelExecutionArgs,
289) -> Result<CancelExecutionResult, EngineError> {
290 let mut last: Option<EngineError> = None;
291 for attempt in 0..MAX_ATTEMPTS {
292 match cancel_execution_once(pool, &args).await {
293 Ok(r) => return Ok(r),
294 Err(err) => {
295 if is_serialization_conflict(&err) {
296 if attempt + 1 < MAX_ATTEMPTS {
297 let ms = 5u64 * (1u64 << attempt);
298 tokio::time::sleep(Duration::from_millis(ms)).await;
299 }
300 last = Some(err);
301 continue;
302 }
303 return Err(err);
304 }
305 }
306 }
307 let _ = last;
308 Err(EngineError::Contention(ContentionKind::RetryExhausted))
309}
310
311async fn revoke_lease_once(
314 pool: &PgPool,
315 args: &RevokeLeaseArgs,
316) -> Result<RevokeLeaseResult, EngineError> {
317 let partition_key: i16 = args.execution_id.partition() as i16;
318 let exec_uuid = eid_uuid(&args.execution_id);
319 let now = now_ms();
320
321 let mut tx = begin_serializable(pool).await?;
322
323 let ec_row = sqlx::query(
329 r#"
330 SELECT attempt_index
331 FROM ff_exec_core
332 WHERE partition_key = $1 AND execution_id = $2
333 FOR NO KEY UPDATE
334 "#,
335 )
336 .bind(partition_key)
337 .bind(exec_uuid)
338 .fetch_optional(&mut *tx)
339 .await
340 .map_err(map_sqlx_error)?;
341
342 let Some(ec_row) = ec_row else {
343 tx.rollback().await.map_err(map_sqlx_error)?;
344 return Err(EngineError::NotFound {
345 entity: "execution",
346 });
347 };
348 let attempt_index: i32 = ec_row.try_get("attempt_index").map_err(map_sqlx_error)?;
349
350 let att_row = sqlx::query(
353 r#"
354 SELECT worker_instance_id, lease_epoch
355 FROM ff_attempt
356 WHERE partition_key = $1 AND execution_id = $2 AND attempt_index = $3
357 FOR UPDATE
358 "#,
359 )
360 .bind(partition_key)
361 .bind(exec_uuid)
362 .bind(attempt_index)
363 .fetch_optional(&mut *tx)
364 .await
365 .map_err(map_sqlx_error)?;
366
367 let (worker_instance_id, lease_epoch): (Option<String>, Option<i64>) = match att_row {
368 Some(r) => (
369 r.try_get("worker_instance_id").map_err(map_sqlx_error)?,
370 r.try_get("lease_epoch").map_err(map_sqlx_error)?,
371 ),
372 None => (None, None),
373 };
374
375 let lease_active = worker_instance_id
376 .as_deref()
377 .is_some_and(|s| !s.is_empty());
378 if !lease_active {
379 tx.rollback().await.map_err(map_sqlx_error)?;
383 return Ok(RevokeLeaseResult::AlreadySatisfied {
384 reason: "no_active_lease".to_owned(),
385 });
386 }
387
388 let caller_wiid = args.worker_instance_id.as_str();
396 if !caller_wiid.is_empty()
397 && worker_instance_id.as_deref() != Some(caller_wiid)
398 {
399 tx.rollback().await.map_err(map_sqlx_error)?;
400 return Ok(RevokeLeaseResult::AlreadySatisfied {
401 reason: "different_worker_instance_id".to_owned(),
402 });
403 }
404
405 let prior_epoch = lease_epoch.unwrap_or(0);
406
407 if let Some(expected) = args
412 .expected_lease_id
413 .as_ref()
414 .filter(|s| !s.is_empty())
415 {
416 let current_id = synthetic_lease_id(exec_uuid, attempt_index, prior_epoch);
417 if expected != ¤t_id {
418 tx.rollback().await.map_err(map_sqlx_error)?;
419 return Ok(RevokeLeaseResult::AlreadySatisfied {
420 reason: "lease_id_mismatch".to_owned(),
421 });
422 }
423 }
424
425 let affected = sqlx::query(
429 r#"
430 UPDATE ff_attempt
431 SET worker_instance_id = NULL,
432 lease_expires_at_ms = NULL,
433 lease_epoch = lease_epoch + 1
434 WHERE partition_key = $1
435 AND execution_id = $2
436 AND attempt_index = $3
437 AND lease_epoch = $4
438 "#,
439 )
440 .bind(partition_key)
441 .bind(exec_uuid)
442 .bind(attempt_index)
443 .bind(prior_epoch)
444 .execute(&mut *tx)
445 .await
446 .map_err(map_sqlx_error)?
447 .rows_affected();
448
449 if affected == 0 {
450 tx.rollback().await.map_err(map_sqlx_error)?;
451 return Ok(RevokeLeaseResult::AlreadySatisfied {
452 reason: "epoch_moved".to_owned(),
453 });
454 }
455
456 sqlx::query(
462 r#"
463 UPDATE ff_exec_core
464 SET lifecycle_phase = 'runnable',
465 ownership_state = 'unowned',
466 eligibility_state = 'eligible_now',
467 attempt_state = 'attempt_interrupted',
468 raw_fields = jsonb_set(raw_fields,
469 '{last_mutation_at}',
470 to_jsonb($3::text))
471 WHERE partition_key = $1 AND execution_id = $2
472 AND lifecycle_phase = 'active'
473 "#,
474 )
475 .bind(partition_key)
476 .bind(exec_uuid)
477 .bind(now)
478 .execute(&mut *tx)
479 .await
480 .map_err(map_sqlx_error)?;
481
482 lease_event::emit(
484 &mut tx,
485 partition_key,
486 exec_uuid,
487 None,
488 lease_event::EVENT_REVOKED,
489 now,
490 )
491 .await?;
492
493 tx.commit().await.map_err(map_sqlx_error)?;
494
495 Ok(RevokeLeaseResult::Revoked {
496 lease_id: synthetic_lease_id(exec_uuid, attempt_index, prior_epoch),
497 lease_epoch: (prior_epoch + 1).to_string(),
498 })
499}
500
501pub(super) async fn revoke_lease_impl(
504 pool: &PgPool,
505 args: RevokeLeaseArgs,
506) -> Result<RevokeLeaseResult, EngineError> {
507 let mut last: Option<EngineError> = None;
508 for attempt in 0..MAX_ATTEMPTS {
509 match revoke_lease_once(pool, &args).await {
510 Ok(r) => return Ok(r),
511 Err(err) => {
512 if is_serialization_conflict(&err) {
513 if attempt + 1 < MAX_ATTEMPTS {
514 let ms = 5u64 * (1u64 << attempt);
515 tokio::time::sleep(Duration::from_millis(ms)).await;
516 }
517 last = Some(err);
518 continue;
519 }
520 return Err(err);
521 }
522 }
523 }
524 let _ = last;
525 Err(EngineError::Contention(ContentionKind::RetryExhausted))
526}
527
528async fn retry_serializable<F, Fut, T>(mut f: F) -> Result<T, EngineError>
533where
534 F: FnMut() -> Fut,
535 Fut: std::future::Future<Output = Result<T, EngineError>>,
536{
537 let mut last: Option<EngineError> = None;
538 for attempt in 0..MAX_ATTEMPTS {
539 match f().await {
540 Ok(v) => return Ok(v),
541 Err(err) => {
542 if is_serialization_conflict(&err) {
543 if attempt + 1 < MAX_ATTEMPTS {
544 let ms = 5u64 * (1u64 << attempt);
545 tokio::time::sleep(Duration::from_millis(ms)).await;
546 }
547 last = Some(err);
548 continue;
549 }
550 return Err(err);
551 }
552 }
553 }
554 let _ = last;
555 Err(EngineError::Contention(ContentionKind::RetryExhausted))
556}
557
558async fn change_priority_once(
559 pool: &PgPool,
560 args: &ChangePriorityArgs,
561) -> Result<ChangePriorityResult, EngineError> {
562 let partition_key: i16 = args.execution_id.partition() as i16;
563 let exec_uuid = eid_uuid(&args.execution_id);
564 let now: i64 = args.now.0;
565
566 let mut tx = begin_serializable(pool).await?;
567
568 let row = sqlx::query(
573 r#"
574 SELECT lifecycle_phase, eligibility_state, priority
575 FROM ff_exec_core
576 WHERE partition_key = $1 AND execution_id = $2
577 FOR NO KEY UPDATE
578 "#,
579 )
580 .bind(partition_key)
581 .bind(exec_uuid)
582 .fetch_optional(&mut *tx)
583 .await
584 .map_err(map_sqlx_error)?;
585
586 let Some(row) = row else {
587 tx.rollback().await.map_err(map_sqlx_error)?;
588 return Err(EngineError::NotFound {
589 entity: "execution",
590 });
591 };
592
593 let lifecycle_phase: String = row.try_get("lifecycle_phase").map_err(map_sqlx_error)?;
594 let eligibility_state: String = row.try_get("eligibility_state").map_err(map_sqlx_error)?;
595 let old_priority: i32 = row.try_get("priority").map_err(map_sqlx_error)?;
596
597 if lifecycle_phase != "runnable" || eligibility_state != "eligible_now" {
600 tx.rollback().await.map_err(map_sqlx_error)?;
601 return Err(EngineError::Contention(
602 ContentionKind::ExecutionNotEligible,
603 ));
604 }
605
606 let new_priority = args.new_priority.clamp(0, 9000);
610
611 let affected = sqlx::query(
616 r#"
617 UPDATE ff_exec_core
618 SET priority = $3,
619 raw_fields = jsonb_set(raw_fields,
620 '{last_mutation_at}',
621 to_jsonb($4::text))
622 WHERE partition_key = $1 AND execution_id = $2
623 AND lifecycle_phase = 'runnable'
624 AND eligibility_state = 'eligible_now'
625 "#,
626 )
627 .bind(partition_key)
628 .bind(exec_uuid)
629 .bind(new_priority)
630 .bind(now)
631 .execute(&mut *tx)
632 .await
633 .map_err(map_sqlx_error)?
634 .rows_affected();
635
636 if affected == 0 {
637 tx.rollback().await.map_err(map_sqlx_error)?;
638 return Err(EngineError::Contention(
639 ContentionKind::ExecutionNotEligible,
640 ));
641 }
642
643 operator_event::emit(
645 &mut tx,
646 partition_key,
647 exec_uuid,
648 operator_event::EVENT_PRIORITY_CHANGED,
649 json!({
650 "old_priority": old_priority,
651 "new_priority": new_priority,
652 }),
653 now,
654 )
655 .await?;
656
657 tx.commit().await.map_err(map_sqlx_error)?;
658
659 Ok(ChangePriorityResult::Changed {
660 execution_id: args.execution_id.clone(),
661 })
662}
663
664pub(super) async fn change_priority_impl(
665 pool: &PgPool,
666 args: ChangePriorityArgs,
667) -> Result<ChangePriorityResult, EngineError> {
668 retry_serializable(|| change_priority_once(pool, &args)).await
669}
670
671async fn replay_execution_once(
674 pool: &PgPool,
675 args: &ReplayExecutionArgs,
676) -> Result<ReplayExecutionResult, EngineError> {
677 let partition_key: i16 = args.execution_id.partition() as i16;
678 let exec_uuid = eid_uuid(&args.execution_id);
679 let now: i64 = args.now.0;
680
681 let mut tx = begin_serializable(pool).await?;
682
683 let ec_row = sqlx::query(
686 r#"
687 SELECT lifecycle_phase, flow_id, attempt_index, priority, raw_fields
688 FROM ff_exec_core
689 WHERE partition_key = $1 AND execution_id = $2
690 FOR NO KEY UPDATE
691 "#,
692 )
693 .bind(partition_key)
694 .bind(exec_uuid)
695 .fetch_optional(&mut *tx)
696 .await
697 .map_err(map_sqlx_error)?;
698
699 let Some(ec_row) = ec_row else {
700 tx.rollback().await.map_err(map_sqlx_error)?;
701 return Err(EngineError::NotFound {
702 entity: "execution",
703 });
704 };
705
706 let lifecycle_phase: String = ec_row
707 .try_get("lifecycle_phase")
708 .map_err(map_sqlx_error)?;
709 let flow_id: Option<Uuid> = ec_row.try_get("flow_id").map_err(map_sqlx_error)?;
710 let attempt_index: i32 = ec_row.try_get("attempt_index").map_err(map_sqlx_error)?;
711
712 if lifecycle_phase != "terminal" {
715 tx.rollback().await.map_err(map_sqlx_error)?;
716 return Err(EngineError::State(StateKind::ExecutionNotTerminal));
717 }
718
719 let att_row = sqlx::query(
722 r#"
723 SELECT outcome
724 FROM ff_attempt
725 WHERE partition_key = $1 AND execution_id = $2 AND attempt_index = $3
726 FOR UPDATE
727 "#,
728 )
729 .bind(partition_key)
730 .bind(exec_uuid)
731 .bind(attempt_index)
732 .fetch_optional(&mut *tx)
733 .await
734 .map_err(map_sqlx_error)?;
735
736 let attempt_outcome: Option<String> = match att_row.as_ref() {
737 Some(r) => r.try_get("outcome").map_err(map_sqlx_error)?,
738 None => None,
739 };
740
741 let is_skipped_flow_member =
745 attempt_outcome.as_deref() == Some("skipped") && flow_id.is_some();
746
747 let groups_reset: i64 = if is_skipped_flow_member {
752 let count = sqlx::query(
753 r#"
754 UPDATE ff_edge_group
755 SET skip_count = 0,
756 fail_count = 0,
757 running_count = 0
758 WHERE (partition_key, flow_id, downstream_eid) IN (
759 SELECT DISTINCT e.partition_key, e.flow_id, e.downstream_eid
760 FROM ff_edge e
761 WHERE e.partition_key = $1
762 AND e.downstream_eid = $2
763 )
764 "#,
765 )
766 .bind(partition_key)
767 .bind(exec_uuid)
768 .execute(&mut *tx)
769 .await
770 .map_err(map_sqlx_error)?
771 .rows_affected();
772 count as i64
773 } else {
774 0
775 };
776
777 let (eligibility_state, public_state) = if is_skipped_flow_member {
790 ("blocked_by_dependencies", "waiting_children")
791 } else {
792 ("eligible_now", "waiting")
793 };
794
795 sqlx::query(
796 r#"
797 UPDATE ff_exec_core
798 SET lifecycle_phase = 'runnable',
799 ownership_state = 'unowned',
800 eligibility_state = $3,
801 public_state = $4,
802 attempt_state = 'pending_replay_attempt',
803 terminal_at_ms = NULL,
804 result = NULL,
805 cancellation_reason = NULL,
806 cancelled_by = NULL,
807 raw_fields = jsonb_set(
808 jsonb_set(raw_fields, '{last_mutation_at}', to_jsonb($5::text)),
809 '{replay_count}',
810 to_jsonb(COALESCE((raw_fields->>'replay_count')::int, 0) + 1)
811 )
812 WHERE partition_key = $1 AND execution_id = $2
813 "#,
814 )
815 .bind(partition_key)
816 .bind(exec_uuid)
817 .bind(eligibility_state)
818 .bind(public_state)
819 .bind(now)
820 .execute(&mut *tx)
821 .await
822 .map_err(map_sqlx_error)?;
823
824 if att_row.is_some() {
831 sqlx::query(
832 r#"
833 UPDATE ff_attempt
834 SET outcome = NULL,
835 terminal_at_ms = NULL,
836 worker_id = NULL,
837 worker_instance_id = NULL,
838 lease_expires_at_ms = NULL,
839 lease_epoch = lease_epoch + 1
840 WHERE partition_key = $1 AND execution_id = $2 AND attempt_index = $3
841 "#,
842 )
843 .bind(partition_key)
844 .bind(exec_uuid)
845 .bind(attempt_index)
846 .execute(&mut *tx)
847 .await
848 .map_err(map_sqlx_error)?;
849 }
850
851 let details = if is_skipped_flow_member {
853 json!({
854 "branch": "skipped_flow_member",
855 "groups_reset": groups_reset,
856 })
857 } else {
858 json!({
859 "branch": "normal",
860 })
861 };
862 operator_event::emit(
863 &mut tx,
864 partition_key,
865 exec_uuid,
866 operator_event::EVENT_REPLAYED,
867 details,
868 now,
869 )
870 .await?;
871
872 tx.commit().await.map_err(map_sqlx_error)?;
873
874 let ps = if is_skipped_flow_member {
875 PublicState::WaitingChildren
876 } else {
877 PublicState::Waiting
878 };
879 Ok(ReplayExecutionResult::Replayed { public_state: ps })
880}
881
882pub(super) async fn replay_execution_impl(
883 pool: &PgPool,
884 args: ReplayExecutionArgs,
885) -> Result<ReplayExecutionResult, EngineError> {
886 retry_serializable(|| replay_execution_once(pool, &args)).await
887}
888
889fn member_wire_id(partition_key: i16, exec_uuid: Uuid) -> String {
894 format!("{{fp:{partition_key}}}:{exec_uuid}")
895}
896
897async fn cancel_flow_header_once(
898 pool: &PgPool,
899 partition_config: &ff_core::partition::PartitionConfig,
900 args: &CancelFlowArgs,
901) -> Result<CancelFlowHeader, EngineError> {
902 let flow_uuid: Uuid = args.flow_id.0;
903 let partition_key: i16 =
908 ff_core::partition::flow_partition(&args.flow_id, partition_config).index as i16;
909 let now: i64 = args.now.0;
910
911 let mut tx = begin_serializable(pool).await?;
912
913 let flow_row = sqlx::query(
920 r#"
921 SELECT public_flow_state, raw_fields
922 FROM ff_flow_core
923 WHERE partition_key = $1 AND flow_id = $2
924 FOR NO KEY UPDATE
925 "#,
926 )
927 .bind(partition_key)
928 .bind(flow_uuid)
929 .fetch_optional(&mut *tx)
930 .await
931 .map_err(map_sqlx_error)?;
932
933 let Some(flow_row) = flow_row else {
934 tx.rollback().await.map_err(map_sqlx_error)?;
935 return Err(EngineError::NotFound { entity: "flow" });
936 };
937
938 let public_flow_state: String = flow_row
939 .try_get("public_flow_state")
940 .map_err(map_sqlx_error)?;
941 let raw_fields: serde_json::Value = flow_row.try_get("raw_fields").map_err(map_sqlx_error)?;
942
943 if matches!(public_flow_state.as_str(), "cancelled" | "completed" | "failed") {
948 let stored_cancellation_policy = raw_fields
949 .get("cancellation_policy")
950 .and_then(|v| v.as_str())
951 .map(str::to_owned);
952 let stored_cancel_reason = raw_fields
953 .get("cancel_reason")
954 .and_then(|v| v.as_str())
955 .map(str::to_owned);
956
957 let member_rows = sqlx::query(
961 r#"
962 SELECT execution_id
963 FROM ff_cancel_backlog_member
964 WHERE partition_key = $1 AND flow_id = $2
965 "#,
966 )
967 .bind(partition_key)
968 .bind(flow_uuid)
969 .fetch_all(&mut *tx)
970 .await
971 .map_err(map_sqlx_error)?;
972
973 let members: Vec<String> = if member_rows.is_empty() {
974 let live = sqlx::query(
976 r#"
977 SELECT execution_id
978 FROM ff_exec_core
979 WHERE partition_key = $1 AND flow_id = $2
980 "#,
981 )
982 .bind(partition_key)
983 .bind(flow_uuid)
984 .fetch_all(&mut *tx)
985 .await
986 .map_err(map_sqlx_error)?;
987 live.iter()
988 .map(|r| {
989 let u: Uuid = r.get("execution_id");
990 member_wire_id(partition_key, u)
991 })
992 .collect()
993 } else {
994 member_rows
995 .iter()
996 .map(|r| r.get::<String, _>("execution_id"))
997 .collect()
998 };
999
1000 tx.commit().await.map_err(map_sqlx_error)?;
1001 return Ok(CancelFlowHeader::AlreadyTerminal {
1002 stored_cancellation_policy,
1003 stored_cancel_reason,
1004 member_execution_ids: members,
1005 });
1006 }
1007
1008 sqlx::query(
1013 r#"
1014 UPDATE ff_flow_core
1015 SET public_flow_state = 'cancelled',
1016 terminal_at_ms = COALESCE(terminal_at_ms, $3),
1017 raw_fields = raw_fields
1018 || jsonb_build_object(
1019 'cancellation_policy', $4::text,
1020 'cancel_reason', $5::text)
1021 WHERE partition_key = $1 AND flow_id = $2
1022 "#,
1023 )
1024 .bind(partition_key)
1025 .bind(flow_uuid)
1026 .bind(now)
1027 .bind(&args.cancellation_policy)
1028 .bind(&args.reason)
1029 .execute(&mut *tx)
1030 .await
1031 .map_err(map_sqlx_error)?;
1032
1033 sqlx::query(
1034 r#"
1035 INSERT INTO ff_cancel_backlog
1036 (partition_key, flow_id, requested_at_ms, requester, reason,
1037 cancellation_policy, status)
1038 VALUES ($1, $2, $3, '', $4, $5, 'pending')
1039 ON CONFLICT (partition_key, flow_id) DO NOTHING
1040 "#,
1041 )
1042 .bind(partition_key)
1043 .bind(flow_uuid)
1044 .bind(now)
1045 .bind(&args.reason)
1046 .bind(&args.cancellation_policy)
1047 .execute(&mut *tx)
1048 .await
1049 .map_err(map_sqlx_error)?;
1050
1051 let member_rows = sqlx::query(
1053 r#"
1054 SELECT execution_id
1055 FROM ff_exec_core
1056 WHERE partition_key = $1 AND flow_id = $2
1057 AND lifecycle_phase NOT IN ('terminal','cancelled')
1058 "#,
1059 )
1060 .bind(partition_key)
1061 .bind(flow_uuid)
1062 .fetch_all(&mut *tx)
1063 .await
1064 .map_err(map_sqlx_error)?;
1065
1066 let member_uuids: Vec<Uuid> = member_rows.iter().map(|r| r.get("execution_id")).collect();
1067 let member_execution_ids: Vec<String> = member_uuids
1068 .iter()
1069 .map(|u| member_wire_id(partition_key, *u))
1070 .collect();
1071
1072 if !member_uuids.is_empty() {
1073 sqlx::query(
1077 r#"
1078 INSERT INTO ff_cancel_backlog_member
1079 (partition_key, flow_id, execution_id)
1080 SELECT $1, $2, eid
1081 FROM UNNEST($3::text[]) AS eid
1082 ON CONFLICT (partition_key, flow_id, execution_id) DO NOTHING
1083 "#,
1084 )
1085 .bind(partition_key)
1086 .bind(flow_uuid)
1087 .bind(&member_execution_ids)
1088 .execute(&mut *tx)
1089 .await
1090 .map_err(map_sqlx_error)?;
1091
1092 sqlx::query(
1098 r#"
1099 UPDATE ff_exec_core
1100 SET lifecycle_phase = 'cancelled',
1101 eligibility_state = 'cancelled',
1102 public_state = 'cancelled',
1103 terminal_at_ms = COALESCE(terminal_at_ms, $3),
1104 cancellation_reason = COALESCE(cancellation_reason, $4),
1105 cancelled_by = COALESCE(cancelled_by, 'cancel_flow_header')
1106 WHERE partition_key = $1 AND execution_id = ANY($2::uuid[])
1107 "#,
1108 )
1109 .bind(partition_key)
1110 .bind(&member_uuids)
1111 .bind(now)
1112 .bind(&args.reason)
1113 .execute(&mut *tx)
1114 .await
1115 .map_err(map_sqlx_error)?;
1116 }
1117
1118 operator_event::emit(
1120 &mut tx,
1121 partition_key,
1122 flow_uuid,
1123 operator_event::EVENT_FLOW_CANCEL_REQUESTED,
1124 json!({
1125 "flow_id": flow_uuid.to_string(),
1126 "cancellation_policy": &args.cancellation_policy,
1127 "reason": &args.reason,
1128 "member_count": member_execution_ids.len(),
1129 }),
1130 now,
1131 )
1132 .await?;
1133
1134 tx.commit().await.map_err(map_sqlx_error)?;
1135
1136 Ok(CancelFlowHeader::Cancelled {
1137 cancellation_policy: args.cancellation_policy.clone(),
1138 member_execution_ids,
1139 })
1140}
1141
1142pub(super) async fn cancel_flow_header_impl(
1143 pool: &PgPool,
1144 partition_config: &ff_core::partition::PartitionConfig,
1145 args: CancelFlowArgs,
1146) -> Result<CancelFlowHeader, EngineError> {
1147 retry_serializable(|| cancel_flow_header_once(pool, partition_config, &args)).await
1148}
1149
1150async fn ack_cancel_member_once(
1153 pool: &PgPool,
1154 partition_config: &ff_core::partition::PartitionConfig,
1155 flow_id: &FlowId,
1156 execution_id: &ExecutionId,
1157) -> Result<(), EngineError> {
1158 let flow_uuid: Uuid = flow_id.0;
1159 let partition_key: i16 =
1160 ff_core::partition::flow_partition(flow_id, partition_config).index as i16;
1161 let member_wire = execution_id.as_str();
1162
1163 let mut tx = begin_serializable(pool).await?;
1164
1165 sqlx::query(
1184 r#"
1185 DELETE FROM ff_cancel_backlog_member
1186 WHERE partition_key = $1
1187 AND flow_id = $2
1188 AND execution_id = $3
1189 "#,
1190 )
1191 .bind(partition_key)
1192 .bind(flow_uuid)
1193 .bind(member_wire)
1194 .execute(&mut *tx)
1195 .await
1196 .map_err(map_sqlx_error)?;
1197
1198 sqlx::query(
1199 r#"
1200 DELETE FROM ff_cancel_backlog
1201 WHERE partition_key = $1
1202 AND flow_id = $2
1203 AND NOT EXISTS (
1204 SELECT 1 FROM ff_cancel_backlog_member
1205 WHERE partition_key = $1 AND flow_id = $2
1206 )
1207 "#,
1208 )
1209 .bind(partition_key)
1210 .bind(flow_uuid)
1211 .execute(&mut *tx)
1212 .await
1213 .map_err(map_sqlx_error)?;
1214
1215 tx.commit().await.map_err(map_sqlx_error)?;
1216 Ok(())
1217}
1218
1219pub(super) async fn ack_cancel_member_impl(
1220 pool: &PgPool,
1221 partition_config: &ff_core::partition::PartitionConfig,
1222 flow_id: FlowId,
1223 execution_id: ExecutionId,
1224) -> Result<(), EngineError> {
1225 retry_serializable(|| {
1226 ack_cancel_member_once(pool, partition_config, &flow_id, &execution_id)
1227 })
1228 .await
1229}