use std::time::Duration;
use ff_core::contracts::{
CancelExecutionArgs, CancelExecutionResult, CancelFlowArgs, CancelFlowHeader,
ChangePriorityArgs, ChangePriorityResult, ReplayExecutionArgs, ReplayExecutionResult,
RevokeLeaseArgs, RevokeLeaseResult,
};
use ff_core::engine_error::{ContentionKind, EngineError, StateKind, ValidationKind};
use ff_core::state::PublicState;
use ff_core::types::{CancelSource, ExecutionId, FlowId};
use serde_json::json;
use sqlx::{PgPool, Postgres, Row};
use uuid::Uuid;
use crate::error::map_sqlx_error;
use crate::{lease_event, operator_event};
// Total tries (first attempt + retries) for one logical operation when its
// SERIALIZABLE transaction aborts with a serialization conflict.
const MAX_ATTEMPTS: u32 = 3;
/// Extracts the UUID component of a wire-format execution id.
///
/// Wire ids look like `{fp:<partition>}:<uuid>` (see `member_wire_id`), so
/// everything after the `}:` separator is the UUID. Panics only if the
/// wire-format invariant is violated, which would be a bug upstream.
fn eid_uuid(eid: &ff_core::types::ExecutionId) -> Uuid {
    let wire = eid.as_str();
    let (_prefix, uuid_part) = wire
        .split_once("}:")
        .expect("ExecutionId has `}:` separator (invariant)");
    Uuid::parse_str(uuid_part).expect("ExecutionId suffix is a valid UUID (invariant)")
}
/// Current wall-clock time as milliseconds since the UNIX epoch, clamped to
/// be non-negative.
///
/// Panics only if the system clock reports a time before the epoch.
fn now_ms() -> i64 {
    let since_epoch = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .expect("clock is after UNIX_EPOCH");
    let millis = since_epoch.as_millis() as i64;
    millis.max(0)
}
/// True when `err` is the contention variant the retry loops in this module
/// treat as a retryable Postgres serialization failure.
///
/// NOTE(review): this assumes `map_sqlx_error` surfaces SQLSTATE 40001
/// (serialization_failure) as `ContentionKind::LeaseConflict` — confirm
/// against `crate::error::map_sqlx_error`.
fn is_serialization_conflict(err: &EngineError) -> bool {
matches!(err, EngineError::Contention(ContentionKind::LeaseConflict))
}
/// Opens a transaction on `pool` and raises it to SERIALIZABLE isolation.
///
/// `SET TRANSACTION` is issued as the first statement of the new
/// transaction, which is when Postgres permits changing the isolation level.
async fn begin_serializable(pool: &PgPool) -> Result<sqlx::Transaction<'_, Postgres>, EngineError> {
    let mut txn = pool.begin().await.map_err(map_sqlx_error)?;
    let escalate = sqlx::query("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE");
    escalate
        .execute(&mut *txn)
        .await
        .map_err(map_sqlx_error)?;
    Ok(txn)
}
/// Derives the lease identifier exposed to callers. No lease id is stored in
/// Postgres, so one is built deterministically from the attempt coordinates;
/// the same (execution, attempt, epoch) triple always yields the same id.
fn synthetic_lease_id(exec_uuid: Uuid, attempt_index: i32, lease_epoch: i64) -> String {
    format!("pg:{}:{}:{}", exec_uuid, attempt_index, lease_epoch)
}
/// Single-shot body for `cancel_execution_impl`: cancels one execution inside
/// one SERIALIZABLE transaction. The caller retries on serialization
/// conflicts.
///
/// # Errors
/// - `NotFound` when the execution row does not exist.
/// - `Validation(InvalidInput)` when the execution is terminal in a state
///   other than `cancelled`, or when a non-operator caller omits
///   `lease_epoch` while a lease is active.
/// - `State(StaleLease)` when the supplied lease epoch does not match the
///   stored one.
async fn cancel_execution_once(
pool: &PgPool,
args: &CancelExecutionArgs,
) -> Result<CancelExecutionResult, EngineError> {
let partition_key: i16 = args.execution_id.partition() as i16;
let exec_uuid = eid_uuid(&args.execution_id);
let now: i64 = args.now.0;
let mut tx = begin_serializable(pool).await?;
// Lock only the core row (`FOR NO KEY UPDATE OF ec`); the current attempt is
// joined in for its worker/lease columns but is not row-locked here.
let row = sqlx::query(
r#"
SELECT ec.lifecycle_phase,
ec.public_state,
ec.attempt_index,
a.worker_instance_id,
a.lease_epoch
FROM ff_exec_core ec
LEFT JOIN ff_attempt a
ON a.partition_key = ec.partition_key
AND a.execution_id = ec.execution_id
AND a.attempt_index = ec.attempt_index
WHERE ec.partition_key = $1 AND ec.execution_id = $2
FOR NO KEY UPDATE OF ec
"#,
)
.bind(partition_key)
.bind(exec_uuid)
.fetch_optional(&mut *tx)
.await
.map_err(map_sqlx_error)?;
let Some(row) = row else {
tx.rollback().await.map_err(map_sqlx_error)?;
return Err(EngineError::NotFound {
entity: "execution",
});
};
let lifecycle_phase: String = row.try_get("lifecycle_phase").map_err(map_sqlx_error)?;
let public_state: String = row.try_get("public_state").map_err(map_sqlx_error)?;
let attempt_index: i32 = row.try_get("attempt_index").map_err(map_sqlx_error)?;
let worker_instance_id: Option<String> =
row.try_get("worker_instance_id").map_err(map_sqlx_error)?;
let lease_epoch: Option<i64> = row.try_get("lease_epoch").map_err(map_sqlx_error)?;
// Already-finished executions: cancelling a cancelled execution is
// idempotent (reports success); any other terminal state is rejected.
if matches!(lifecycle_phase.as_str(), "terminal" | "cancelled") {
tx.rollback().await.map_err(map_sqlx_error)?;
return if public_state == "cancelled" {
Ok(CancelExecutionResult::Cancelled {
execution_id: args.execution_id.clone(),
public_state: PublicState::Cancelled,
})
} else {
Err(EngineError::Validation {
kind: ValidationKind::InvalidInput,
detail: format!(
"cancel_execution: execution_id={}: already terminal in state '{}'",
args.execution_id, public_state
),
})
};
}
// A non-empty worker_instance_id on the current attempt marks the lease as
// held by a worker.
let lease_active = worker_instance_id
.as_deref()
.is_some_and(|s| !s.is_empty());
// Non-operator cancels of a leased execution are compare-and-cancel: the
// caller must present the lease epoch it believes is current. Operator
// overrides skip the check entirely.
if !matches!(args.source, CancelSource::OperatorOverride) && lease_active {
let Some(expected_epoch) = args.lease_epoch.as_ref() else {
tx.rollback().await.map_err(map_sqlx_error)?;
return Err(EngineError::Validation {
kind: ValidationKind::InvalidInput,
detail: format!(
"cancel_execution: execution_id={}: lease_epoch required when source != operator_override and execution is active",
args.execution_id
),
});
};
// An epoch wider than i64 can never equal a stored epoch, so saturating to
// i64::MAX simply makes the comparison fail as a stale lease.
let expected: i64 = i64::try_from(expected_epoch.0).unwrap_or(i64::MAX);
if lease_epoch.unwrap_or(0) != expected {
tx.rollback().await.map_err(map_sqlx_error)?;
return Err(EngineError::State(
ff_core::engine_error::StateKind::StaleLease,
));
}
}
// Flip the execution to its cancelled terminal shape. The COALESCEs keep
// the first-written terminal timestamp and cancellation metadata if a
// concurrent writer got there first.
sqlx::query(
r#"
UPDATE ff_exec_core
SET lifecycle_phase = 'cancelled',
ownership_state = 'unowned',
eligibility_state = 'not_applicable',
public_state = 'cancelled',
attempt_state = 'cancelled',
terminal_at_ms = COALESCE(terminal_at_ms, $3),
cancellation_reason = COALESCE(cancellation_reason, $4),
cancelled_by = COALESCE(cancelled_by, $5),
raw_fields = jsonb_set(raw_fields,
'{last_mutation_at}',
to_jsonb($3::text))
WHERE partition_key = $1 AND execution_id = $2
"#,
)
.bind(partition_key)
.bind(exec_uuid)
.bind(now)
.bind(&args.reason)
.bind(args.source.as_str())
.execute(&mut *tx)
.await
.map_err(map_sqlx_error)?;
// Release the active lease (if any) and bump the epoch so a stale holder's
// later writes keyed on the old epoch cannot succeed.
if lease_active {
sqlx::query(
r#"
UPDATE ff_attempt
SET worker_instance_id = NULL,
lease_expires_at_ms = NULL,
lease_epoch = lease_epoch + 1,
terminal_at_ms = $4,
outcome = 'cancelled'
WHERE partition_key = $1 AND execution_id = $2 AND attempt_index = $3
"#,
)
.bind(partition_key)
.bind(exec_uuid)
.bind(attempt_index)
.bind(now)
.execute(&mut *tx)
.await
.map_err(map_sqlx_error)?;
// Record the forced revocation in the lease event stream.
lease_event::emit(
&mut tx,
partition_key,
exec_uuid,
None, lease_event::EVENT_REVOKED,
now,
)
.await?;
}
tx.commit().await.map_err(map_sqlx_error)?;
Ok(CancelExecutionResult::Cancelled {
execution_id: args.execution_id.clone(),
public_state: PublicState::Cancelled,
})
}
pub(super) async fn cancel_execution_impl(
pool: &PgPool,
args: CancelExecutionArgs,
) -> Result<CancelExecutionResult, EngineError> {
let mut last: Option<EngineError> = None;
for attempt in 0..MAX_ATTEMPTS {
match cancel_execution_once(pool, &args).await {
Ok(r) => return Ok(r),
Err(err) => {
if is_serialization_conflict(&err) {
if attempt + 1 < MAX_ATTEMPTS {
let ms = 5u64 * (1u64 << attempt);
tokio::time::sleep(Duration::from_millis(ms)).await;
}
last = Some(err);
continue;
}
return Err(err);
}
}
}
let _ = last;
Err(EngineError::Contention(ContentionKind::RetryExhausted))
}
/// Single-shot body for `revoke_lease_impl`: forcibly releases the active
/// lease on an execution's current attempt inside one SERIALIZABLE
/// transaction.
///
/// Idempotence: every "nothing to do" condition (no active lease, lease held
/// by a different worker, stale expected lease id, epoch moved concurrently)
/// returns `RevokeLeaseResult::AlreadySatisfied` rather than an error.
///
/// # Errors
/// - `NotFound` when the execution row does not exist.
async fn revoke_lease_once(
    pool: &PgPool,
    args: &RevokeLeaseArgs,
) -> Result<RevokeLeaseResult, EngineError> {
    let partition_key: i16 = args.execution_id.partition() as i16;
    let exec_uuid = eid_uuid(&args.execution_id);
    let now = now_ms();
    let mut tx = begin_serializable(pool).await?;
    // Lock the core row first (same lock ordering as the other operations in
    // this module), then the attempt row it points at.
    let ec_row = sqlx::query(
        r#"
        SELECT attempt_index
        FROM ff_exec_core
        WHERE partition_key = $1 AND execution_id = $2
        FOR NO KEY UPDATE
        "#,
    )
    .bind(partition_key)
    .bind(exec_uuid)
    .fetch_optional(&mut *tx)
    .await
    .map_err(map_sqlx_error)?;
    let Some(ec_row) = ec_row else {
        tx.rollback().await.map_err(map_sqlx_error)?;
        return Err(EngineError::NotFound {
            entity: "execution",
        });
    };
    let attempt_index: i32 = ec_row.try_get("attempt_index").map_err(map_sqlx_error)?;
    let att_row = sqlx::query(
        r#"
        SELECT worker_instance_id, lease_epoch
        FROM ff_attempt
        WHERE partition_key = $1 AND execution_id = $2 AND attempt_index = $3
        FOR UPDATE
        "#,
    )
    .bind(partition_key)
    .bind(exec_uuid)
    .bind(attempt_index)
    .fetch_optional(&mut *tx)
    .await
    .map_err(map_sqlx_error)?;
    // The attempt row may be missing entirely; treat that like "no lease".
    let (worker_instance_id, lease_epoch): (Option<String>, Option<i64>) = match att_row {
        Some(r) => (
            r.try_get("worker_instance_id").map_err(map_sqlx_error)?,
            r.try_get("lease_epoch").map_err(map_sqlx_error)?,
        ),
        None => (None, None),
    };
    // A non-empty worker_instance_id is what marks the lease as held.
    let lease_active = worker_instance_id
        .as_deref()
        .is_some_and(|s| !s.is_empty());
    if !lease_active {
        tx.rollback().await.map_err(map_sqlx_error)?;
        return Ok(RevokeLeaseResult::AlreadySatisfied {
            reason: "no_active_lease".to_owned(),
        });
    }
    // An empty caller worker_instance_id means "revoke whoever holds it";
    // otherwise the caller must match the current holder.
    let caller_wiid = args.worker_instance_id.as_str();
    if !caller_wiid.is_empty()
        && worker_instance_id.as_deref() != Some(caller_wiid)
    {
        tx.rollback().await.map_err(map_sqlx_error)?;
        return Ok(RevokeLeaseResult::AlreadySatisfied {
            reason: "different_worker_instance_id".to_owned(),
        });
    }
    let prior_epoch = lease_epoch.unwrap_or(0);
    // Optional compare-and-revoke on the synthetic lease id; an empty string
    // is treated the same as an absent expectation.
    if let Some(expected) = args
        .expected_lease_id
        .as_ref()
        .filter(|s| !s.is_empty())
    {
        let current_id = synthetic_lease_id(exec_uuid, attempt_index, prior_epoch);
        // BUG FIX: this comparison previously read `expected != ¤t_id`,
        // a mojibake corruption of `&current_id` (the HTML entity `&curren;`
        // ate the `&curren` prefix), which does not compile.
        if expected != &current_id {
            tx.rollback().await.map_err(map_sqlx_error)?;
            return Ok(RevokeLeaseResult::AlreadySatisfied {
                reason: "lease_id_mismatch".to_owned(),
            });
        }
    }
    // Guarded by `lease_epoch = $4`: if another transaction advanced the
    // epoch between our read and this write, we touch nothing and report
    // the revocation as already satisfied.
    let affected = sqlx::query(
        r#"
        UPDATE ff_attempt
        SET worker_instance_id = NULL,
            lease_expires_at_ms = NULL,
            lease_epoch = lease_epoch + 1
        WHERE partition_key = $1
          AND execution_id = $2
          AND attempt_index = $3
          AND lease_epoch = $4
        "#,
    )
    .bind(partition_key)
    .bind(exec_uuid)
    .bind(attempt_index)
    .bind(prior_epoch)
    .execute(&mut *tx)
    .await
    .map_err(map_sqlx_error)?
    .rows_affected();
    if affected == 0 {
        tx.rollback().await.map_err(map_sqlx_error)?;
        return Ok(RevokeLeaseResult::AlreadySatisfied {
            reason: "epoch_moved".to_owned(),
        });
    }
    // Only an 'active' execution is flipped back to runnable; the guard
    // leaves a concurrently-terminal execution untouched.
    sqlx::query(
        r#"
        UPDATE ff_exec_core
        SET lifecycle_phase = 'runnable',
            ownership_state = 'unowned',
            eligibility_state = 'eligible_now',
            attempt_state = 'attempt_interrupted',
            raw_fields = jsonb_set(raw_fields,
                                   '{last_mutation_at}',
                                   to_jsonb($3::text))
        WHERE partition_key = $1 AND execution_id = $2
          AND lifecycle_phase = 'active'
        "#,
    )
    .bind(partition_key)
    .bind(exec_uuid)
    .bind(now)
    .execute(&mut *tx)
    .await
    .map_err(map_sqlx_error)?;
    // Record the revocation in the lease event stream.
    lease_event::emit(
        &mut tx,
        partition_key,
        exec_uuid,
        None,
        lease_event::EVENT_REVOKED,
        now,
    )
    .await?;
    tx.commit().await.map_err(map_sqlx_error)?;
    // Report the id that was revoked (prior epoch) and the epoch the row now
    // carries after the bump.
    Ok(RevokeLeaseResult::Revoked {
        lease_id: synthetic_lease_id(exec_uuid, attempt_index, prior_epoch),
        lease_epoch: (prior_epoch + 1).to_string(),
    })
}
pub(super) async fn revoke_lease_impl(
pool: &PgPool,
args: RevokeLeaseArgs,
) -> Result<RevokeLeaseResult, EngineError> {
let mut last: Option<EngineError> = None;
for attempt in 0..MAX_ATTEMPTS {
match revoke_lease_once(pool, &args).await {
Ok(r) => return Ok(r),
Err(err) => {
if is_serialization_conflict(&err) {
if attempt + 1 < MAX_ATTEMPTS {
let ms = 5u64 * (1u64 << attempt);
tokio::time::sleep(Duration::from_millis(ms)).await;
}
last = Some(err);
continue;
}
return Err(err);
}
}
}
let _ = last;
Err(EngineError::Contention(ContentionKind::RetryExhausted))
}
/// Runs `f` until it succeeds, fails with a non-retryable error, or
/// `MAX_ATTEMPTS` serialization conflicts have been observed.
///
/// Backoff between conflicting attempts is exponential (5 ms, 10 ms, 20 ms,
/// ...); no sleep follows the final attempt. When every attempt hit a
/// serialization conflict the caller gets `ContentionKind::RetryExhausted`.
async fn retry_serializable<F, Fut, T>(mut f: F) -> Result<T, EngineError>
where
    F: FnMut() -> Fut,
    Fut: std::future::Future<Output = Result<T, EngineError>>,
{
    for attempt in 0..MAX_ATTEMPTS {
        match f().await {
            Ok(v) => return Ok(v),
            Err(err) if is_serialization_conflict(&err) => {
                // Retryable conflict: back off unless this was the last try.
                // (The previous version also stashed the error in a `last`
                // variable that was never read; that dead bookkeeping is
                // removed.)
                if attempt + 1 < MAX_ATTEMPTS {
                    let ms = 5u64 * (1u64 << attempt);
                    tokio::time::sleep(Duration::from_millis(ms)).await;
                }
            }
            // Anything else is a real failure; surface it immediately.
            Err(err) => return Err(err),
        }
    }
    Err(EngineError::Contention(ContentionKind::RetryExhausted))
}
/// Single-shot body for `change_priority_impl`: updates the scheduling
/// priority of one execution inside one SERIALIZABLE transaction.
///
/// Only executions that are currently runnable and eligible may have their
/// priority changed; anything else yields
/// `Contention(ExecutionNotEligible)`.
///
/// # Errors
/// - `NotFound` when the execution row does not exist.
/// - `Contention(ExecutionNotEligible)` when the execution is not in the
///   `runnable` / `eligible_now` state (checked both before and during the
///   guarded UPDATE).
async fn change_priority_once(
pool: &PgPool,
args: &ChangePriorityArgs,
) -> Result<ChangePriorityResult, EngineError> {
let partition_key: i16 = args.execution_id.partition() as i16;
let exec_uuid = eid_uuid(&args.execution_id);
let now: i64 = args.now.0;
let mut tx = begin_serializable(pool).await?;
let row = sqlx::query(
r#"
SELECT lifecycle_phase, eligibility_state, priority
FROM ff_exec_core
WHERE partition_key = $1 AND execution_id = $2
FOR NO KEY UPDATE
"#,
)
.bind(partition_key)
.bind(exec_uuid)
.fetch_optional(&mut *tx)
.await
.map_err(map_sqlx_error)?;
let Some(row) = row else {
tx.rollback().await.map_err(map_sqlx_error)?;
return Err(EngineError::NotFound {
entity: "execution",
});
};
let lifecycle_phase: String = row.try_get("lifecycle_phase").map_err(map_sqlx_error)?;
let eligibility_state: String = row.try_get("eligibility_state").map_err(map_sqlx_error)?;
// Old priority is captured so the operator event can record the change.
let old_priority: i32 = row.try_get("priority").map_err(map_sqlx_error)?;
if lifecycle_phase != "runnable" || eligibility_state != "eligible_now" {
tx.rollback().await.map_err(map_sqlx_error)?;
return Err(EngineError::Contention(
ContentionKind::ExecutionNotEligible,
));
}
// Requested priority is silently clamped to the accepted range [0, 9000].
let new_priority = args.new_priority.clamp(0, 9000);
// The WHERE clause re-checks eligibility so a state transition between the
// read above and this write surfaces as affected == 0 instead of silently
// re-prioritising a non-eligible execution.
let affected = sqlx::query(
r#"
UPDATE ff_exec_core
SET priority = $3,
raw_fields = jsonb_set(raw_fields,
'{last_mutation_at}',
to_jsonb($4::text))
WHERE partition_key = $1 AND execution_id = $2
AND lifecycle_phase = 'runnable'
AND eligibility_state = 'eligible_now'
"#,
)
.bind(partition_key)
.bind(exec_uuid)
.bind(new_priority)
.bind(now)
.execute(&mut *tx)
.await
.map_err(map_sqlx_error)?
.rows_affected();
if affected == 0 {
tx.rollback().await.map_err(map_sqlx_error)?;
return Err(EngineError::Contention(
ContentionKind::ExecutionNotEligible,
));
}
// Audit trail: record old and new priority in the operator event stream.
operator_event::emit(
&mut tx,
partition_key,
exec_uuid,
operator_event::EVENT_PRIORITY_CHANGED,
json!({
"old_priority": old_priority,
"new_priority": new_priority,
}),
now,
)
.await?;
tx.commit().await.map_err(map_sqlx_error)?;
Ok(ChangePriorityResult::Changed {
execution_id: args.execution_id.clone(),
})
}
/// Changes an execution's priority; serialization conflicts are retried via
/// the shared `retry_serializable` backoff loop.
pub(super) async fn change_priority_impl(
    pool: &PgPool,
    args: ChangePriorityArgs,
) -> Result<ChangePriorityResult, EngineError> {
    let run_once = || change_priority_once(pool, &args);
    retry_serializable(run_once).await
}
/// Single-shot body for `replay_execution_impl`: returns a terminal
/// execution to the runnable pool inside one SERIALIZABLE transaction.
///
/// Two branches:
/// - "skipped flow member" (last attempt outcome was `skipped` and the
///   execution belongs to a flow): edge-group counters feeding this
///   execution are reset and it waits on dependencies again
///   (`blocked_by_dependencies` / `waiting_children`);
/// - "normal": the execution becomes immediately eligible
///   (`eligible_now` / `waiting`).
///
/// # Errors
/// - `NotFound` when the execution row does not exist.
/// - `State(ExecutionNotTerminal)` when the execution is not terminal.
async fn replay_execution_once(
pool: &PgPool,
args: &ReplayExecutionArgs,
) -> Result<ReplayExecutionResult, EngineError> {
let partition_key: i16 = args.execution_id.partition() as i16;
let exec_uuid = eid_uuid(&args.execution_id);
let now: i64 = args.now.0;
let mut tx = begin_serializable(pool).await?;
let ec_row = sqlx::query(
r#"
SELECT lifecycle_phase, flow_id, attempt_index, priority, raw_fields
FROM ff_exec_core
WHERE partition_key = $1 AND execution_id = $2
FOR NO KEY UPDATE
"#,
)
.bind(partition_key)
.bind(exec_uuid)
.fetch_optional(&mut *tx)
.await
.map_err(map_sqlx_error)?;
let Some(ec_row) = ec_row else {
tx.rollback().await.map_err(map_sqlx_error)?;
return Err(EngineError::NotFound {
entity: "execution",
});
};
let lifecycle_phase: String = ec_row
.try_get("lifecycle_phase")
.map_err(map_sqlx_error)?;
let flow_id: Option<Uuid> = ec_row.try_get("flow_id").map_err(map_sqlx_error)?;
let attempt_index: i32 = ec_row.try_get("attempt_index").map_err(map_sqlx_error)?;
// Only fully-terminal executions can be replayed ('cancelled' does not
// qualify here).
if lifecycle_phase != "terminal" {
tx.rollback().await.map_err(map_sqlx_error)?;
return Err(EngineError::State(StateKind::ExecutionNotTerminal));
}
// Lock and read the current attempt's outcome to decide the branch.
let att_row = sqlx::query(
r#"
SELECT outcome
FROM ff_attempt
WHERE partition_key = $1 AND execution_id = $2 AND attempt_index = $3
FOR UPDATE
"#,
)
.bind(partition_key)
.bind(exec_uuid)
.bind(attempt_index)
.fetch_optional(&mut *tx)
.await
.map_err(map_sqlx_error)?;
let attempt_outcome: Option<String> = match att_row.as_ref() {
Some(r) => r.try_get("outcome").map_err(map_sqlx_error)?,
None => None,
};
let is_skipped_flow_member =
attempt_outcome.as_deref() == Some("skipped") && flow_id.is_some();
// For a skipped flow member, zero the counters of every edge group whose
// downstream is this execution.
// NOTE(review): presumably this forces the scheduler to re-evaluate the
// upstream gating for the replayed node — confirm against the edge-group
// consumer.
let groups_reset: i64 = if is_skipped_flow_member {
let count = sqlx::query(
r#"
UPDATE ff_edge_group
SET skip_count = 0,
fail_count = 0,
running_count = 0
WHERE (partition_key, flow_id, downstream_eid) IN (
SELECT DISTINCT e.partition_key, e.flow_id, e.downstream_eid
FROM ff_edge e
WHERE e.partition_key = $1
AND e.downstream_eid = $2
)
"#,
)
.bind(partition_key)
.bind(exec_uuid)
.execute(&mut *tx)
.await
.map_err(map_sqlx_error)?
.rows_affected();
count as i64
} else {
0
};
// Branch-dependent target states (see the function docs above).
let (eligibility_state, public_state) = if is_skipped_flow_member {
("blocked_by_dependencies", "waiting_children")
} else {
("eligible_now", "waiting")
};
// Reset the execution core: clear terminal metadata/result and bump the
// replay_count counter kept inside raw_fields.
sqlx::query(
r#"
UPDATE ff_exec_core
SET lifecycle_phase = 'runnable',
ownership_state = 'unowned',
eligibility_state = $3,
public_state = $4,
attempt_state = 'pending_replay_attempt',
terminal_at_ms = NULL,
result = NULL,
cancellation_reason = NULL,
cancelled_by = NULL,
raw_fields = jsonb_set(
jsonb_set(raw_fields, '{last_mutation_at}', to_jsonb($5::text)),
'{replay_count}',
to_jsonb(COALESCE((raw_fields->>'replay_count')::int, 0) + 1)
)
WHERE partition_key = $1 AND execution_id = $2
"#,
)
.bind(partition_key)
.bind(exec_uuid)
.bind(eligibility_state)
.bind(public_state)
.bind(now)
.execute(&mut *tx)
.await
.map_err(map_sqlx_error)?;
// Reset the attempt row in place; bumping lease_epoch invalidates any
// lease id derived from the pre-replay epoch.
if att_row.is_some() {
sqlx::query(
r#"
UPDATE ff_attempt
SET outcome = NULL,
terminal_at_ms = NULL,
worker_id = NULL,
worker_instance_id = NULL,
lease_expires_at_ms = NULL,
lease_epoch = lease_epoch + 1
WHERE partition_key = $1 AND execution_id = $2 AND attempt_index = $3
"#,
)
.bind(partition_key)
.bind(exec_uuid)
.bind(attempt_index)
.execute(&mut *tx)
.await
.map_err(map_sqlx_error)?;
}
// Audit trail: record which branch was taken (and how many edge groups
// were reset, when applicable).
let details = if is_skipped_flow_member {
json!({
"branch": "skipped_flow_member",
"groups_reset": groups_reset,
})
} else {
json!({
"branch": "normal",
})
};
operator_event::emit(
&mut tx,
partition_key,
exec_uuid,
operator_event::EVENT_REPLAYED,
details,
now,
)
.await?;
tx.commit().await.map_err(map_sqlx_error)?;
let ps = if is_skipped_flow_member {
PublicState::WaitingChildren
} else {
PublicState::Waiting
};
Ok(ReplayExecutionResult::Replayed { public_state: ps })
}
/// Replays a terminal execution; serialization conflicts are retried via the
/// shared `retry_serializable` backoff loop.
pub(super) async fn replay_execution_impl(
    pool: &PgPool,
    args: ReplayExecutionArgs,
) -> Result<ReplayExecutionResult, EngineError> {
    let run_once = || replay_execution_once(pool, &args);
    retry_serializable(run_once).await
}
/// Renders an execution id in wire format: `{fp:<partition>}:<uuid>`.
/// This is the inverse of the parse performed by `eid_uuid`.
fn member_wire_id(partition_key: i16, exec_uuid: Uuid) -> String {
    format!("{{fp:{}}}:{}", partition_key, exec_uuid)
}
/// Single-shot body for `cancel_flow_header_impl`: cancels a flow's header
/// row and eagerly cancels its live member executions, all in one
/// SERIALIZABLE transaction.
///
/// If the flow is already terminal (`cancelled`/`completed`/`failed`), the
/// stored cancellation metadata and member list are returned without any
/// writes. Otherwise the flow is marked cancelled, a cancel-backlog entry
/// (header + per-member rows, consumed via `ack_cancel_member_impl`) is
/// recorded, and all non-terminal members are flipped to cancelled.
///
/// # Errors
/// - `NotFound` when the flow row does not exist.
async fn cancel_flow_header_once(
pool: &PgPool,
partition_config: &ff_core::partition::PartitionConfig,
args: &CancelFlowArgs,
) -> Result<CancelFlowHeader, EngineError> {
let flow_uuid: Uuid = args.flow_id.0;
// Flows hash to a partition via the flow id, not an execution id.
let partition_key: i16 =
ff_core::partition::flow_partition(&args.flow_id, partition_config).index as i16;
let now: i64 = args.now.0;
let mut tx = begin_serializable(pool).await?;
let flow_row = sqlx::query(
r#"
SELECT public_flow_state, raw_fields
FROM ff_flow_core
WHERE partition_key = $1 AND flow_id = $2
FOR NO KEY UPDATE
"#,
)
.bind(partition_key)
.bind(flow_uuid)
.fetch_optional(&mut *tx)
.await
.map_err(map_sqlx_error)?;
let Some(flow_row) = flow_row else {
tx.rollback().await.map_err(map_sqlx_error)?;
return Err(EngineError::NotFound { entity: "flow" });
};
let public_flow_state: String = flow_row
.try_get("public_flow_state")
.map_err(map_sqlx_error)?;
let raw_fields: serde_json::Value = flow_row.try_get("raw_fields").map_err(map_sqlx_error)?;
// Already-terminal flow: read-only path. Report whatever cancellation
// metadata a previous cancel stored (absent for completed/failed flows).
if matches!(public_flow_state.as_str(), "cancelled" | "completed" | "failed") {
let stored_cancellation_policy = raw_fields
.get("cancellation_policy")
.and_then(|v| v.as_str())
.map(str::to_owned);
let stored_cancel_reason = raw_fields
.get("cancel_reason")
.and_then(|v| v.as_str())
.map(str::to_owned);
// Prefer the not-yet-acked backlog members; backlog execution_id values
// are stored as wire-format strings.
let member_rows = sqlx::query(
r#"
SELECT execution_id
FROM ff_cancel_backlog_member
WHERE partition_key = $1 AND flow_id = $2
"#,
)
.bind(partition_key)
.bind(flow_uuid)
.fetch_all(&mut *tx)
.await
.map_err(map_sqlx_error)?;
// Fall back to the live ff_exec_core membership (UUIDs re-encoded into
// wire format) when the backlog has been fully acked or never existed.
let members: Vec<String> = if member_rows.is_empty() {
let live = sqlx::query(
r#"
SELECT execution_id
FROM ff_exec_core
WHERE partition_key = $1 AND flow_id = $2
"#,
)
.bind(partition_key)
.bind(flow_uuid)
.fetch_all(&mut *tx)
.await
.map_err(map_sqlx_error)?;
live.iter()
.map(|r| {
let u: Uuid = r.get("execution_id");
member_wire_id(partition_key, u)
})
.collect()
} else {
member_rows
.iter()
.map(|r| r.get::<String, _>("execution_id"))
.collect()
};
// Nothing was written on this path; committing simply ends the
// read-only transaction.
tx.commit().await.map_err(map_sqlx_error)?;
return Ok(CancelFlowHeader::AlreadyTerminal {
stored_cancellation_policy,
stored_cancel_reason,
member_execution_ids: members,
});
}
// Mark the flow cancelled; policy and reason are stashed inside raw_fields
// so the already-terminal path above can report them later.
sqlx::query(
r#"
UPDATE ff_flow_core
SET public_flow_state = 'cancelled',
terminal_at_ms = COALESCE(terminal_at_ms, $3),
raw_fields = raw_fields
|| jsonb_build_object(
'cancellation_policy', $4::text,
'cancel_reason', $5::text)
WHERE partition_key = $1 AND flow_id = $2
"#,
)
.bind(partition_key)
.bind(flow_uuid)
.bind(now)
.bind(&args.cancellation_policy)
.bind(&args.reason)
.execute(&mut *tx)
.await
.map_err(map_sqlx_error)?;
// Record the backlog header; ON CONFLICT keeps the first request's
// metadata if a concurrent cancel already inserted it.
sqlx::query(
r#"
INSERT INTO ff_cancel_backlog
(partition_key, flow_id, requested_at_ms, requester, reason,
cancellation_policy, status)
VALUES ($1, $2, $3, '', $4, $5, 'pending')
ON CONFLICT (partition_key, flow_id) DO NOTHING
"#,
)
.bind(partition_key)
.bind(flow_uuid)
.bind(now)
.bind(&args.reason)
.bind(&args.cancellation_policy)
.execute(&mut *tx)
.await
.map_err(map_sqlx_error)?;
// Snapshot the members that are still live; these are the ones that need
// backlog entries and eager cancellation.
let member_rows = sqlx::query(
r#"
SELECT execution_id
FROM ff_exec_core
WHERE partition_key = $1 AND flow_id = $2
AND lifecycle_phase NOT IN ('terminal','cancelled')
"#,
)
.bind(partition_key)
.bind(flow_uuid)
.fetch_all(&mut *tx)
.await
.map_err(map_sqlx_error)?;
let member_uuids: Vec<Uuid> = member_rows.iter().map(|r| r.get("execution_id")).collect();
let member_execution_ids: Vec<String> = member_uuids
.iter()
.map(|u| member_wire_id(partition_key, *u))
.collect();
if !member_uuids.is_empty() {
// Backlog members are keyed by wire-format id (text), matching what
// ack_cancel_member_once deletes by.
sqlx::query(
r#"
INSERT INTO ff_cancel_backlog_member
(partition_key, flow_id, execution_id)
SELECT $1, $2, eid
FROM UNNEST($3::text[]) AS eid
ON CONFLICT (partition_key, flow_id, execution_id) DO NOTHING
"#,
)
.bind(partition_key)
.bind(flow_uuid)
.bind(&member_execution_ids)
.execute(&mut *tx)
.await
.map_err(map_sqlx_error)?;
// Eagerly cancel the live members (bound as a UUID array here, unlike
// the text wire ids above).
// NOTE(review): unlike cancel_execution_once, this bulk update does not
// touch ownership_state/attempt rows or bump lease epochs — presumably
// lease holders learn of cancellation via the backlog; confirm.
sqlx::query(
r#"
UPDATE ff_exec_core
SET lifecycle_phase = 'cancelled',
eligibility_state = 'cancelled',
public_state = 'cancelled',
terminal_at_ms = COALESCE(terminal_at_ms, $3),
cancellation_reason = COALESCE(cancellation_reason, $4),
cancelled_by = COALESCE(cancelled_by, 'cancel_flow_header')
WHERE partition_key = $1 AND execution_id = ANY($2::uuid[])
"#,
)
.bind(partition_key)
.bind(&member_uuids)
.bind(now)
.bind(&args.reason)
.execute(&mut *tx)
.await
.map_err(map_sqlx_error)?;
}
// Audit trail for the flow-level cancel request.
operator_event::emit(
&mut tx,
partition_key,
flow_uuid,
operator_event::EVENT_FLOW_CANCEL_REQUESTED,
json!({
"flow_id": flow_uuid.to_string(),
"cancellation_policy": &args.cancellation_policy,
"reason": &args.reason,
"member_count": member_execution_ids.len(),
}),
now,
)
.await?;
tx.commit().await.map_err(map_sqlx_error)?;
Ok(CancelFlowHeader::Cancelled {
cancellation_policy: args.cancellation_policy.clone(),
member_execution_ids,
})
}
/// Cancels a flow header (and its live members); serialization conflicts are
/// retried via the shared `retry_serializable` backoff loop.
pub(super) async fn cancel_flow_header_impl(
    pool: &PgPool,
    partition_config: &ff_core::partition::PartitionConfig,
    args: CancelFlowArgs,
) -> Result<CancelFlowHeader, EngineError> {
    let run_once = || cancel_flow_header_once(pool, partition_config, &args);
    retry_serializable(run_once).await
}
/// Single-shot body for `ack_cancel_member_impl`: acknowledges that one
/// flow member's cancellation has been processed.
///
/// Deletes the member's cancel-backlog row (keyed by wire-format execution
/// id, matching what `cancel_flow_header_once` inserted) and, when that was
/// the flow's last outstanding member, deletes the backlog header row too.
/// Acking an already-acked (or never-present) member is a harmless no-op.
async fn ack_cancel_member_once(
pool: &PgPool,
partition_config: &ff_core::partition::PartitionConfig,
flow_id: &FlowId,
execution_id: &ExecutionId,
) -> Result<(), EngineError> {
let flow_uuid: Uuid = flow_id.0;
let partition_key: i16 =
ff_core::partition::flow_partition(flow_id, partition_config).index as i16;
// Backlog member rows store the full wire-format id, not a bare UUID.
let member_wire = execution_id.as_str();
let mut tx = begin_serializable(pool).await?;
sqlx::query(
r#"
DELETE FROM ff_cancel_backlog_member
WHERE partition_key = $1
AND flow_id = $2
AND execution_id = $3
"#,
)
.bind(partition_key)
.bind(flow_uuid)
.bind(member_wire)
.execute(&mut *tx)
.await
.map_err(map_sqlx_error)?;
// Garbage-collect the backlog header once no members remain. The
// unqualified columns inside NOT EXISTS resolve to
// ff_cancel_backlog_member (the subquery's own table), so the check is
// simply "no member row left for this (partition, flow)".
sqlx::query(
r#"
DELETE FROM ff_cancel_backlog
WHERE partition_key = $1
AND flow_id = $2
AND NOT EXISTS (
SELECT 1 FROM ff_cancel_backlog_member
WHERE partition_key = $1 AND flow_id = $2
)
"#,
)
.bind(partition_key)
.bind(flow_uuid)
.execute(&mut *tx)
.await
.map_err(map_sqlx_error)?;
tx.commit().await.map_err(map_sqlx_error)?;
Ok(())
}
/// Acknowledges one member of a flow's cancel backlog; serialization
/// conflicts are retried via the shared `retry_serializable` backoff loop.
pub(super) async fn ack_cancel_member_impl(
    pool: &PgPool,
    partition_config: &ff_core::partition::PartitionConfig,
    flow_id: FlowId,
    execution_id: ExecutionId,
) -> Result<(), EngineError> {
    let run_once = || ack_cancel_member_once(pool, partition_config, &flow_id, &execution_id);
    retry_serializable(run_once).await
}