use std::time::Duration;
use ff_core::contracts::{EdgeDependencyPolicy, OnSatisfied};
use ff_core::engine_error::{ContentionKind, EngineError};
use serde_json::Value as JsonValue;
use sqlx::{PgPool, Row};
use uuid::Uuid;
use crate::error::map_sqlx_error;
/// Upper bound on how many times `advance_edge_group_with_retry` runs
/// `advance_edge_group` for one edge before giving up with
/// `ContentionKind::RetryExhausted`.
const ADVANCE_MAX_ATTEMPTS: u32 = 3;
/// Result of dispatching a single completion event.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum DispatchOutcome {
    /// No event row was claimed: either the event does not exist or it had
    /// already been dispatched.
    NoOp,
    /// The event was claimed by this call; carries the number of outgoing
    /// edges that were processed (may be zero).
    Advanced(usize),
}
/// Claims one completion event and propagates its outcome to the edge group
/// of every downstream execution reachable through an outgoing `ff_edge`.
///
/// The claim is an `UPDATE … WHERE dispatched_at_ms IS NULL RETURNING …`, so
/// only the caller that flips `dispatched_at_ms` from NULL proceeds.
///
/// # Returns
/// * `DispatchOutcome::NoOp` when no row was claimed (unknown event id, or
///   already dispatched).
/// * `DispatchOutcome::Advanced(n)` where `n` is the number of outgoing edges
///   processed; `0` when the event carries no `flow_id` or the execution has
///   no outgoing edges.
///
/// # Errors
/// Database errors (via `map_sqlx_error`) and any error from advancing an
/// edge group, including `ContentionKind::RetryExhausted`.
///
/// NOTE(review): the claim is committed on its own before any edge is
/// advanced and is not reverted on failure. An error part-way through the edge
/// loop therefore leaves the event marked dispatched with some edges not yet
/// advanced. Confirm that a reconciler or sweeper covers this case.
#[tracing::instrument(name = "pg.dispatch_completion", skip(pool))]
pub async fn dispatch_completion(
    pool: &PgPool,
    event_id: i64,
) -> Result<DispatchOutcome, EngineError> {
    let claimed_at = now_ms();
    let claimed = sqlx::query(
        r#"
UPDATE ff_completion_event
SET dispatched_at_ms = $2
WHERE event_id = $1
AND dispatched_at_ms IS NULL
RETURNING partition_key, execution_id, flow_id, outcome
"#,
    )
    .bind(event_id)
    .bind(claimed_at)
    .fetch_optional(pool)
    .await
    .map_err(map_sqlx_error)?;

    let event = match claimed {
        Some(event) => event,
        // Nothing claimed: missing or already-dispatched event.
        None => return Ok(DispatchOutcome::NoOp),
    };

    let partition_key: i16 = event.get("partition_key");
    let upstream_eid: Uuid = event.get("execution_id");
    let maybe_flow: Option<Uuid> = event.get("flow_id");
    let outcome_raw: String = event.get("outcome");

    // Without a flow id there is no edge set to look up.
    let flow_id = match maybe_flow {
        Some(id) => id,
        None => return Ok(DispatchOutcome::Advanced(0)),
    };

    let outgoing = sqlx::query(
        r#"
SELECT edge_id, downstream_eid
FROM ff_edge
WHERE partition_key = $1 AND flow_id = $2 AND upstream_eid = $3
"#,
    )
    .bind(partition_key)
    .bind(flow_id)
    .bind(upstream_eid)
    .fetch_all(pool)
    .await
    .map_err(map_sqlx_error)?;

    let outcome_kind = OutcomeKind::from_str(&outcome_raw);

    // Each edge is advanced in its own retried transaction. The first error
    // aborts the loop. On success every edge was processed, so the edge count
    // is the processed count (an empty edge list yields 0).
    for edge_row in &outgoing {
        let downstream_eid: Uuid = edge_row.get("downstream_eid");
        advance_edge_group_with_retry(
            pool,
            partition_key,
            flow_id,
            upstream_eid,
            downstream_eid,
            outcome_kind,
        )
        .await?;
    }
    Ok(DispatchOutcome::Advanced(outgoing.len()))
}
/// Coarse classification of a completion event's `outcome` column.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum OutcomeKind {
    /// The outcome string was exactly `"success"`.
    Success,
    /// Any outcome other than `"success"` or `"skipped"` (for example
    /// `"failed"` or `"cancelled"`).
    Fail,
    /// The outcome string was exactly `"skipped"`.
    Skip,
}

impl OutcomeKind {
    /// Maps a raw outcome string to its kind.
    ///
    /// Matching is exact and case-sensitive; every unrecognised value is
    /// treated as a failure.
    fn from_str(s: &str) -> Self {
        if s == "success" {
            Self::Success
        } else if s == "skipped" {
            Self::Skip
        } else {
            Self::Fail
        }
    }
}
/// Runs `advance_edge_group` with a bounded retry for transient
/// concurrency conflicts.
///
/// At most `ADVANCE_MAX_ATTEMPTS` attempts are made. When an attempt fails
/// with an error accepted by `is_serialization_conflict`, the task sleeps for
/// an exponentially growing delay (5 ms, 10 ms, 20 ms, …) before trying again.
/// When the last permitted attempt also conflicts, the result is
/// `EngineError::Contention(ContentionKind::RetryExhausted)`. Any other error
/// is returned immediately and unchanged.
async fn advance_edge_group_with_retry(
    pool: &PgPool,
    partition_key: i16,
    flow_id: Uuid,
    upstream_eid: Uuid,
    downstream_eid: Uuid,
    outcome: OutcomeKind,
) -> Result<(), EngineError> {
    let mut attempt: u32 = 0;
    while attempt < ADVANCE_MAX_ATTEMPTS {
        let failure = match advance_edge_group(
            pool,
            partition_key,
            flow_id,
            upstream_eid,
            downstream_eid,
            outcome,
        )
        .await
        {
            Ok(()) => return Ok(()),
            Err(failure) => failure,
        };

        if !is_serialization_conflict(&failure) {
            return Err(failure);
        }

        // `attempt < ADVANCE_MAX_ATTEMPTS`, so this subtraction cannot underflow.
        let attempts_left = ADVANCE_MAX_ATTEMPTS - (attempt + 1);
        if attempts_left == 0 {
            break;
        }

        let backoff = Duration::from_millis(5u64 * (1u64 << attempt));
        tokio::time::sleep(backoff).await;
        attempt += 1;
    }
    Err(EngineError::Contention(ContentionKind::RetryExhausted))
}
/// Returns `true` when `err` is a transient concurrency conflict after which
/// re-running the whole edge-group transaction is worthwhile.
///
/// Two shapes are recognised:
/// * an already-classified `EngineError::Contention(ContentionKind::LeaseConflict)`;
/// * an `EngineError::Transport` wrapping a `sqlx::Error` whose PostgreSQL
///   SQLSTATE is one of:
///   - `40001` `serialization_failure`: raised when a SERIALIZABLE
///     transaction cannot be serialized. PostgreSQL documents that such
///     transactions should be retried from the start.
///   - `40P01` `deadlock_detected`: the aborted victim can simply be retried.
///   - `55P03` `lock_not_available`: a lock could not be acquired, for
///     example because a lock timeout expired.
///
/// Every other error is treated as non-retryable.
fn is_serialization_conflict(err: &EngineError) -> bool {
    if matches!(err, EngineError::Contention(ContentionKind::LeaseConflict)) {
        return true;
    }
    if let EngineError::Transport { source, .. } = err
        && let Some(sqlx_err) = source.downcast_ref::<sqlx::Error>()
        && let Some(db) = sqlx_err.as_database_error()
        && let Some(code) = db.code()
    {
        // 40001 is the main failure mode of the SERIALIZABLE transaction this
        // retry path protects. Matching only 55P03 let raw serialization
        // failures and deadlocks escape the retry loop.
        // NOTE(review): if `map_sqlx_error` already maps 40001 to
        // `LeaseConflict`, the check above covers it and this is belt-and-braces.
        return matches!(&*code, "40001" | "40P01" | "55P03");
    }
    false
}
/// Applies one upstream completion to the edge group of `downstream_eid` and
/// acts on the resulting dependency decision, all inside one transaction run
/// at SERIALIZABLE isolation.
///
/// Steps:
/// 1. Lock the matching `ff_edge_group` row with `FOR UPDATE`. If no row
///    exists, commit and return `Ok(())` without doing anything else.
/// 2. Decrement `running_count` (never below zero) and increment the counter
///    matching `outcome`, then write the four counters back.
/// 3. Evaluate the decoded policy via `evaluate`:
///    * `Pending`: nothing further.
///    * `Satisfied`: set the downstream execution to `eligible_now`, moving
///      `blocked` to `runnable`, unless it is `terminal`/`cancelled`. When
///      `cancel_siblings` is requested and the group is not already flagged,
///      it also:
///      - collects the still-live sibling upstreams;
///      - enqueues the group in `ff_pending_cancel_groups`;
///      - stores the sibling ids in `cancel_siblings_pending_members` and
///        sets the flag.
///    * `Impossible`: move the downstream execution to terminal/skipped. If
///      that update changed a row, insert a `'skipped'` row into
///      `ff_completion_event` for it. That row is presumably later picked up
///      by `dispatch_completion` to propagate the skip (confirm against the
///      dispatcher loop).
///
/// # Errors
/// Database errors are mapped through `map_sqlx_error`. Serialization
/// conflicts are expected to be retried by `advance_edge_group_with_retry`.
/// Any early `?` return drops `tx` uncommitted, and sqlx rolls back a
/// transaction that is dropped without commit.
#[tracing::instrument(
    name = "pg.advance_edge_group",
    skip(pool),
    fields(
        part = partition_key,
        flow = %flow_id,
        downstream = %downstream_eid,
    )
)]
async fn advance_edge_group(
    pool: &PgPool,
    partition_key: i16,
    flow_id: Uuid,
    upstream_eid: Uuid,
    downstream_eid: Uuid,
    outcome: OutcomeKind,
) -> Result<(), EngineError> {
    let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
    // PostgreSQL only accepts SET TRANSACTION before the transaction's first
    // query, so this must stay the first statement after BEGIN.
    sqlx::query("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE")
        .execute(&mut *tx)
        .await
        .map_err(map_sqlx_error)?;
    // Lock the group row so concurrent completions for the same downstream
    // serialize on it.
    let row = sqlx::query(
        r#"
SELECT policy, success_count, fail_count, skip_count, running_count,
cancel_siblings_pending_flag
FROM ff_edge_group
WHERE partition_key = $1 AND flow_id = $2 AND downstream_eid = $3
FOR UPDATE
"#,
    )
    .bind(partition_key)
    .bind(flow_id)
    .bind(downstream_eid)
    .fetch_optional(&mut *tx)
    .await
    .map_err(map_sqlx_error)?;
    let Some(row) = row else {
        // No edge group for this downstream: nothing to advance.
        tx.commit().await.map_err(map_sqlx_error)?;
        return Ok(());
    };
    let policy_raw: JsonValue = row.get("policy");
    let mut success: i32 = row.get("success_count");
    let mut fail: i32 = row.get("fail_count");
    let mut skip: i32 = row.get("skip_count");
    let mut running: i32 = row.get("running_count");
    let already_flagged: bool = row.get("cancel_siblings_pending_flag");
    // The completing upstream leaves the running tally; never decrement past zero.
    if running > 0 {
        running -= 1;
    }
    match outcome {
        OutcomeKind::Success => success += 1,
        OutcomeKind::Fail => fail += 1,
        OutcomeKind::Skip => skip += 1,
    }
    let policy = decode_policy(&policy_raw);
    // `max(0)` only has an effect if the stored running_count was already negative.
    let total = success + fail + skip + running.max(0);
    let decision = evaluate(&policy, success, fail, skip, total);
    // Persist the new tallies regardless of the decision.
    sqlx::query(
        r#"
UPDATE ff_edge_group
SET success_count = $4,
fail_count = $5,
skip_count = $6,
running_count = $7
WHERE partition_key = $1 AND flow_id = $2 AND downstream_eid = $3
"#,
    )
    .bind(partition_key)
    .bind(flow_id)
    .bind(downstream_eid)
    .bind(success)
    .bind(fail)
    .bind(skip)
    .bind(running.max(0))
    .execute(&mut *tx)
    .await
    .map_err(map_sqlx_error)?;
    let now = now_ms();
    match decision {
        // Not decided yet: only the counters change.
        Decision::Pending => { }
        Decision::Satisfied { cancel_siblings } => {
            // Release the downstream unless it has already finished.
            // NOTE(review): this branch runs again on every later completion
            // once the policy is met (e.g. AnyOf after the first success). It
            // re-applies `eligibility_state = 'eligible_now'` to a downstream
            // that may already be past `blocked`. Confirm this is harmless for
            // non-blocked phases.
            sqlx::query(
                r#"
UPDATE ff_exec_core
SET eligibility_state = 'eligible_now',
lifecycle_phase = CASE
WHEN lifecycle_phase = 'blocked' THEN 'runnable'
ELSE lifecycle_phase
END
WHERE partition_key = $1 AND execution_id = $2
AND lifecycle_phase NOT IN ('terminal','cancelled')
"#,
            )
            .bind(partition_key)
            .bind(downstream_eid)
            .execute(&mut *tx)
            .await
            .map_err(map_sqlx_error)?;
            // Record the cancel-siblings request at most once per group.
            if cancel_siblings && !already_flagged {
                // Live sibling upstreams of this downstream: not terminal or
                // cancelled, not skipped, excluding the upstream that just
                // completed.
                // NOTE(review): `public_state <> 'skipped'` also drops rows
                // whose public_state is NULL. Confirm the column is NOT NULL.
                let sibling_rows = sqlx::query(
                    r#"
SELECT ff_exec_core.execution_id
FROM ff_exec_core
JOIN ff_edge ON ff_edge.upstream_eid = ff_exec_core.execution_id
WHERE ff_exec_core.partition_key = $1
AND ff_edge.partition_key = $1
AND ff_edge.flow_id = $2
AND ff_edge.downstream_eid = $3
AND ff_exec_core.lifecycle_phase NOT IN ('terminal','cancelled')
AND ff_exec_core.public_state <> 'skipped'
AND ff_exec_core.execution_id <> $4
"#,
                )
                .bind(partition_key)
                .bind(flow_id)
                .bind(downstream_eid)
                .bind(upstream_eid)
                .fetch_all(&mut *tx)
                .await
                .map_err(map_sqlx_error)?;
                // Sibling ids are stored as their string form.
                let members: Vec<String> = sibling_rows
                    .iter()
                    .map(|r| {
                        let u: Uuid = r.get("execution_id");
                        u.to_string()
                    })
                    .collect();
                // Enqueue the group; an existing queue entry is left untouched.
                sqlx::query(
                    r#"
INSERT INTO ff_pending_cancel_groups
(partition_key, flow_id, downstream_eid, enqueued_at_ms)
VALUES ($1, $2, $3, $4)
ON CONFLICT DO NOTHING
"#,
                )
                .bind(partition_key)
                .bind(flow_id)
                .bind(downstream_eid)
                .bind(now)
                .execute(&mut *tx)
                .await
                .map_err(map_sqlx_error)?;
                // Flag the group so later satisfied completions skip this block.
                sqlx::query(
                    r#"
UPDATE ff_edge_group
SET cancel_siblings_pending_flag = TRUE,
cancel_siblings_pending_members = $4
WHERE partition_key = $1 AND flow_id = $2 AND downstream_eid = $3
"#,
                )
                .bind(partition_key)
                .bind(flow_id)
                .bind(downstream_eid)
                .bind(&members)
                .execute(&mut *tx)
                .await
                .map_err(map_sqlx_error)?;
            }
        }
        Decision::Impossible => {
            // Skip the downstream; RETURNING tells us whether this call
            // actually transitioned it (it may already be terminal/cancelled).
            let updated = sqlx::query(
                r#"
UPDATE ff_exec_core
SET lifecycle_phase = 'terminal',
eligibility_state = 'not_applicable',
public_state = 'skipped',
attempt_state = 'attempt_terminal',
terminal_at_ms = COALESCE(terminal_at_ms, $3)
WHERE partition_key = $1 AND execution_id = $2
AND lifecycle_phase NOT IN ('terminal','cancelled')
RETURNING execution_id
"#,
            )
            .bind(partition_key)
            .bind(downstream_eid)
            .bind(now)
            .fetch_optional(&mut *tx)
            .await
            .map_err(map_sqlx_error)?;
            // Emit a 'skipped' completion event only for a fresh transition,
            // so repeated Impossible decisions do not insert duplicates.
            if updated.is_some() {
                sqlx::query(
                    r#"
INSERT INTO ff_completion_event
(partition_key, execution_id, flow_id, outcome, occurred_at_ms)
VALUES ($1, $2, $3, 'skipped', $4)
"#,
                )
                .bind(partition_key)
                .bind(downstream_eid)
                .bind(flow_id)
                .bind(now)
                .execute(&mut *tx)
                .await
                .map_err(map_sqlx_error)?;
            }
        }
    }
    tx.commit().await.map_err(map_sqlx_error)?;
    Ok(())
}
/// Verdict produced by evaluating an edge group's dependency policy.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum Decision {
    /// Too few upstreams have finished to decide either way.
    Pending,
    /// The policy is met. `cancel_siblings` requests cancellation of the
    /// upstreams that are still live.
    Satisfied { cancel_siblings: bool },
    /// The policy can no longer be met by any remaining upstream.
    Impossible,
}
/// Decides whether a dependency policy is satisfied, still pending, or can no
/// longer be satisfied, given the current upstream tallies.
///
/// * `success`, `fail`, `skip`: completions recorded so far per outcome.
///   Fail and skip both count as "non-success".
/// * `total`: the group's full upstream count. The caller passes
///   completed + still-running.
///
/// Rules:
/// * `AllOf`: satisfied when every upstream succeeded; impossible as soon as
///   any upstream did not succeed.
/// * `AnyOf`: satisfied on the first success; impossible once every upstream
///   finished without success.
/// * `Quorum { k }`: satisfied at `k` successes; impossible once the
///   upstreams that could still succeed (`total - nonsuccess`) are fewer than
///   `k`.
///
/// For `AnyOf`/`Quorum`, `cancel_siblings` mirrors `OnSatisfied::CancelRemaining`.
/// Any other policy variant (if the enum has more) evaluates to `Pending`.
fn evaluate(
    policy: &EdgeDependencyPolicy,
    success: i32,
    fail: i32,
    skip: i32,
    total: i32,
) -> Decision {
    let nonsuccess = fail + skip;
    match policy {
        EdgeDependencyPolicy::AllOf => {
            if success == total && total > 0 {
                Decision::Satisfied { cancel_siblings: false }
            } else if nonsuccess > 0 {
                Decision::Impossible
            } else {
                Decision::Pending
            }
        }
        EdgeDependencyPolicy::AnyOf { on_satisfied } => {
            if success >= 1 {
                Decision::Satisfied {
                    cancel_siblings: matches!(on_satisfied, OnSatisfied::CancelRemaining),
                }
            } else if nonsuccess >= total && total > 0 {
                Decision::Impossible
            } else {
                Decision::Pending
            }
        }
        EdgeDependencyPolicy::Quorum { k, on_satisfied } => {
            // `k` is a u32. An `as i32` cast would wrap values above i32::MAX
            // to negatives and make the quorum trivially satisfied. Clamping
            // keeps such a quorum unreachable, which is what it is.
            let k = i32::try_from(*k).unwrap_or(i32::MAX);
            if success >= k {
                Decision::Satisfied {
                    cancel_siblings: matches!(on_satisfied, OnSatisfied::CancelRemaining),
                }
            } else if total - nonsuccess < k {
                Decision::Impossible
            } else {
                Decision::Pending
            }
        }
        _ => Decision::Pending,
    }
}
fn decode_policy(v: &JsonValue) -> EdgeDependencyPolicy {
let kind = v.get("kind").and_then(|k| k.as_str()).unwrap_or("all_of");
match kind {
"any_of" => EdgeDependencyPolicy::AnyOf {
on_satisfied: parse_on_satisfied(v),
},
"quorum" => {
let k = v
.get("k")
.and_then(|x| x.as_u64())
.and_then(|n| u32::try_from(n).ok())
.unwrap_or(1);
EdgeDependencyPolicy::Quorum {
k,
on_satisfied: parse_on_satisfied(v),
}
}
_ => EdgeDependencyPolicy::AllOf,
}
}
fn parse_on_satisfied(v: &JsonValue) -> OnSatisfied {
match v.get("on_satisfied").and_then(|x| x.as_str()) {
Some("let_run") => OnSatisfied::LetRun,
_ => OnSatisfied::CancelRemaining,
}
}
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// A clock reading before the epoch yields 0. A value too large for `i64`
/// saturates to `i64::MAX`.
fn now_ms() -> i64 {
    let since_epoch = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH);
    let millis: u128 = match since_epoch {
        Ok(elapsed) => elapsed.as_millis(),
        Err(_) => 0,
    };
    i64::try_from(millis).unwrap_or(i64::MAX)
}
#[cfg(test)]
mod tests {
    //! Unit tests for the pure decision logic: policy evaluation, outcome
    //! classification, and policy JSON decoding. Database paths are not
    //! exercised here.
    use super::*;

    #[test]
    fn evaluate_all_of_satisfied() {
        let d = evaluate(&EdgeDependencyPolicy::AllOf, 3, 0, 0, 3);
        assert_eq!(d, Decision::Satisfied { cancel_siblings: false });
    }

    #[test]
    fn evaluate_all_of_impossible_on_fail() {
        let d = evaluate(&EdgeDependencyPolicy::AllOf, 2, 1, 0, 3);
        assert_eq!(d, Decision::Impossible);
    }

    #[test]
    fn evaluate_all_of_impossible_on_skip() {
        // A skipped upstream counts as non-success for AllOf.
        let d = evaluate(&EdgeDependencyPolicy::AllOf, 2, 0, 1, 3);
        assert_eq!(d, Decision::Impossible);
    }

    #[test]
    fn evaluate_all_of_pending() {
        let d = evaluate(&EdgeDependencyPolicy::AllOf, 1, 0, 0, 3);
        assert_eq!(d, Decision::Pending);
    }

    #[test]
    fn evaluate_any_of_cancels_siblings() {
        let d = evaluate(
            &EdgeDependencyPolicy::AnyOf {
                on_satisfied: OnSatisfied::CancelRemaining,
            },
            1, 0, 0, 3,
        );
        assert_eq!(d, Decision::Satisfied { cancel_siblings: true });
    }

    #[test]
    fn evaluate_any_of_let_run() {
        let d = evaluate(
            &EdgeDependencyPolicy::AnyOf {
                on_satisfied: OnSatisfied::LetRun,
            },
            1, 0, 0, 3,
        );
        assert_eq!(d, Decision::Satisfied { cancel_siblings: false });
    }

    #[test]
    fn evaluate_any_of_pending_while_upstreams_remain() {
        // One failure out of three: the other two can still succeed.
        let d = evaluate(
            &EdgeDependencyPolicy::AnyOf {
                on_satisfied: OnSatisfied::CancelRemaining,
            },
            0, 1, 0, 3,
        );
        assert_eq!(d, Decision::Pending);
    }

    #[test]
    fn evaluate_any_of_impossible_when_all_fail() {
        let d = evaluate(
            &EdgeDependencyPolicy::AnyOf {
                on_satisfied: OnSatisfied::CancelRemaining,
            },
            0, 3, 0, 3,
        );
        assert_eq!(d, Decision::Impossible);
    }

    #[test]
    fn evaluate_quorum_satisfied_at_k() {
        let d = evaluate(
            &EdgeDependencyPolicy::Quorum {
                k: 2,
                on_satisfied: OnSatisfied::LetRun,
            },
            2, 0, 1, 3,
        );
        assert_eq!(d, Decision::Satisfied { cancel_siblings: false });
    }

    #[test]
    fn evaluate_quorum_pending_while_reachable() {
        // One success, two still running: k = 2 is still reachable.
        let d = evaluate(
            &EdgeDependencyPolicy::Quorum {
                k: 2,
                on_satisfied: OnSatisfied::LetRun,
            },
            1, 0, 0, 3,
        );
        assert_eq!(d, Decision::Pending);
    }

    #[test]
    fn evaluate_quorum_impossible_when_headroom_exhausted() {
        let d = evaluate(
            &EdgeDependencyPolicy::Quorum {
                k: 3,
                on_satisfied: OnSatisfied::CancelRemaining,
            },
            0, 3, 0, 5,
        );
        assert_eq!(d, Decision::Impossible);
    }

    #[test]
    fn outcome_kind_mapping() {
        assert_eq!(OutcomeKind::from_str("success"), OutcomeKind::Success);
        assert_eq!(OutcomeKind::from_str("failed"), OutcomeKind::Fail);
        assert_eq!(OutcomeKind::from_str("skipped"), OutcomeKind::Skip);
        assert_eq!(OutcomeKind::from_str("cancelled"), OutcomeKind::Fail);
    }

    #[test]
    fn decode_policy_defaults_to_all_of() {
        assert!(matches!(
            decode_policy(&serde_json::json!({})),
            EdgeDependencyPolicy::AllOf
        ));
        assert!(matches!(
            decode_policy(&serde_json::json!({"kind": "bogus"})),
            EdgeDependencyPolicy::AllOf
        ));
    }

    #[test]
    fn decode_policy_quorum_k_and_defaults() {
        assert!(matches!(
            decode_policy(&serde_json::json!({"kind": "quorum"})),
            EdgeDependencyPolicy::Quorum {
                k: 1,
                on_satisfied: OnSatisfied::CancelRemaining
            }
        ));
        // A `k` that does not fit in u32 falls back to 1.
        assert!(matches!(
            decode_policy(&serde_json::json!({"kind": "quorum", "k": 5_000_000_000u64})),
            EdgeDependencyPolicy::Quorum { k: 1, .. }
        ));
        assert!(matches!(
            decode_policy(&serde_json::json!({"kind": "quorum", "k": 2, "on_satisfied": "let_run"})),
            EdgeDependencyPolicy::Quorum {
                k: 2,
                on_satisfied: OnSatisfied::LetRun
            }
        ));
    }

    #[test]
    fn decode_policy_any_of_on_satisfied() {
        assert!(matches!(
            decode_policy(&serde_json::json!({"kind": "any_of", "on_satisfied": "let_run"})),
            EdgeDependencyPolicy::AnyOf {
                on_satisfied: OnSatisfied::LetRun
            }
        ));
        assert!(matches!(
            decode_policy(&serde_json::json!({"kind": "any_of", "on_satisfied": "other"})),
            EdgeDependencyPolicy::AnyOf {
                on_satisfied: OnSatisfied::CancelRemaining
            }
        ));
    }
}