use std::collections::BTreeMap;
use std::time::Duration;
use ff_core::backend::CancelFlowPolicy;
use ff_core::contracts::{
CancelFlowResult, EdgeDependencyPolicy, EdgeDirection, EdgeGroupSnapshot, EdgeGroupState,
EdgeSnapshot, FlowSnapshot, FlowStatus, FlowSummary, ListFlowsPage, OnSatisfied,
SetEdgeGroupPolicyResult,
};
use ff_core::engine_error::{
ContentionKind, EngineError, ValidationKind,
};
use ff_core::partition::{Partition, PartitionFamily, PartitionKey};
use ff_core::types::{EdgeId, ExecutionId, FlowId, Namespace, TimestampMs};
use serde_json::Value as JsonValue;
use sqlx::postgres::PgRow;
use sqlx::{PgPool, Row};
use uuid::Uuid;
use crate::error::map_sqlx_error;
const CANCEL_FLOW_MAX_ATTEMPTS: u32 = 3;
/// Resolve a `PartitionKey` to the `i16` column value stored in Postgres.
///
/// Returns an `InvalidInput` validation error when the key cannot be
/// parsed.
fn partition_index_from_key(key: &PartitionKey) -> Result<i16, EngineError> {
    match key.parse() {
        Ok(parsed) => Ok(parsed.index as i16),
        Err(e) => Err(EngineError::Validation {
            kind: ValidationKind::InvalidInput,
            detail: format!("partition_key: {e}"),
        }),
    }
}
/// Compute the partition column value (`i16`) for a flow id under the
/// given partition configuration.
fn flow_partition_byte(
    flow_id: &FlowId,
    partition_config: &ff_core::partition::PartitionConfig,
) -> i16 {
    let partition = ff_core::partition::flow_partition(flow_id, partition_config);
    partition.index as i16
}
/// Borrow a string field out of a JSON object, if present and a string.
fn raw_str<'a>(raw: &'a JsonValue, key: &str) -> Option<&'a str> {
    match raw.get(key) {
        Some(v) => v.as_str(),
        None => None,
    }
}
/// Read a `u64` field from a JSON object, accepting either a JSON number
/// or a decimal string (some writers stringify large integers).
fn raw_u64(raw: &JsonValue, key: &str) -> Option<u64> {
    let v = raw.get(key)?;
    if let Some(n) = v.as_u64() {
        return Some(n);
    }
    v.as_str().and_then(|s| s.parse::<u64>().ok())
}
fn raw_u32(raw: &JsonValue, key: &str) -> Option<u32> {
raw.get(key).and_then(|v| {
v.as_u64()
.or_else(|| v.as_str().and_then(|s| s.parse::<u64>().ok()))
.and_then(|n| u32::try_from(n).ok())
})
}
/// Collect every namespaced tag (a `prefix.suffix`-style key mapped to a
/// string value) from a raw-fields JSON object into a sorted map.
/// Non-object input and non-string values are silently skipped.
fn extract_tags(raw: &JsonValue) -> BTreeMap<String, String> {
    raw.as_object()
        .map(|obj| {
            obj.iter()
                .filter(|(k, _)| is_namespaced_tag_key(k.as_str()))
                .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_owned())))
                .collect()
        })
        .unwrap_or_default()
}
/// A tag key is "namespaced" when it starts with an ASCII lowercase
/// letter, continues with `[a-z0-9_]`, and contains at least one `.`
/// separator. Characters after the first `.` are not validated.
fn is_namespaced_tag_key(k: &str) -> bool {
    let Some(dot) = k.find('.') else {
        return false;
    };
    let mut prefix = k[..dot].chars();
    match prefix.next() {
        Some(first) if first.is_ascii_lowercase() => {
            prefix.all(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || c == '_')
        }
        _ => false,
    }
}
/// Decode the stored `on_satisfied` variant string; anything unrecognized
/// falls back to `CancelRemaining` (the conservative default).
fn parse_on_satisfied(s: &str) -> OnSatisfied {
    if s == "let_run" {
        OnSatisfied::LetRun
    } else {
        OnSatisfied::CancelRemaining
    }
}
/// Decode a persisted edge-group policy JSON document.
///
/// A missing or unrecognized `kind` falls back to `AllOf`; a missing
/// `on_satisfied` falls back to `CancelRemaining`; a missing or
/// out-of-range quorum `k` falls back to 1.
fn decode_edge_policy(v: &JsonValue) -> EdgeDependencyPolicy {
    // Both non-trivial variants read the same optional field.
    let on_satisfied = || {
        v.get("on_satisfied")
            .and_then(|x| x.as_str())
            .map(parse_on_satisfied)
            .unwrap_or(OnSatisfied::CancelRemaining)
    };
    match v.get("kind").and_then(|k| k.as_str()).unwrap_or("all_of") {
        "any_of" => EdgeDependencyPolicy::AnyOf {
            on_satisfied: on_satisfied(),
        },
        "quorum" => EdgeDependencyPolicy::Quorum {
            k: v.get("k")
                .and_then(|x| x.as_u64())
                .and_then(|n| u32::try_from(n).ok())
                .unwrap_or(1),
            on_satisfied: on_satisfied(),
        },
        _ => EdgeDependencyPolicy::AllOf,
    }
}
/// Encode an edge-group policy as the JSON document stored in
/// `ff_edge_group.policy` (inverse of `decode_edge_policy`).
fn encode_edge_policy(p: &EdgeDependencyPolicy) -> JsonValue {
    match p {
        EdgeDependencyPolicy::AnyOf { on_satisfied } => serde_json::json!({
            "kind": "any_of",
            "on_satisfied": on_satisfied.variant_str(),
        }),
        EdgeDependencyPolicy::Quorum { k, on_satisfied } => serde_json::json!({
            "kind": "quorum",
            "k": k,
            "on_satisfied": on_satisfied.variant_str(),
        }),
        // `AllOf` — and any future variants the catch-all absorbs —
        // serialize as the default policy.
        _ => serde_json::json!({ "kind": "all_of" }),
    }
}
/// Decode one `ff_edge_group` row into an `EdgeGroupSnapshot`.
///
/// The `downstream_eid` column stores a bare UUID; the full `ExecutionId`
/// is reconstructed by prefixing the row's partition (`{fp:N}:uuid`).
/// Returns a `Corruption` validation error if that reconstruction fails.
fn decode_edge_group_row(row: &PgRow) -> Result<EdgeGroupSnapshot, EngineError> {
    let downstream_uuid: Uuid = row.get("downstream_eid");
    let policy_raw: JsonValue = row.get("policy");
    let k_target: i32 = row.get("k_target");
    let success_count: i32 = row.get("success_count");
    let fail_count: i32 = row.get("fail_count");
    let skip_count: i32 = row.get("skip_count");
    let running_count: i32 = row.get("running_count");
    let part: i16 = row.get("partition_key");
    let downstream_id = ExecutionId::parse(&format!("{{fp:{part}}}:{downstream_uuid}"))
        .map_err(|e| EngineError::Validation {
            kind: ValidationKind::Corruption,
            detail: format!("ff_edge_group.downstream_eid: {e}"),
        })?;
    let policy = decode_edge_policy(&policy_raw);
    // Counters are clamped at zero before widening so a (corrupt) negative
    // value cannot wrap around when cast to u32.
    let total: u32 = success_count.max(0) as u32
        + fail_count.max(0) as u32
        + skip_count.max(0) as u32
        + running_count.max(0) as u32;
    // Satisfaction is derived purely from the stored quorum target; a
    // non-positive k_target is treated as never-satisfied here.
    let state = if k_target > 0 && success_count >= k_target {
        EdgeGroupState::Satisfied
    } else {
        EdgeGroupState::Pending
    };
    Ok(EdgeGroupSnapshot::new(
        downstream_id,
        policy,
        total,
        success_count.max(0) as u32,
        fail_count.max(0) as u32,
        skip_count.max(0) as u32,
        running_count.max(0) as u32,
        state,
    ))
}
/// Decode an `ff_flow_core` row (plus pre-decoded edge groups) into a
/// `FlowSnapshot`.
///
/// Most optional metadata lives in the `raw_fields` JSONB column; each
/// field falls back to a benign default when missing so older rows still
/// decode.
fn decode_flow_row(
    flow_id: FlowId,
    row: &PgRow,
    edge_groups: Vec<EdgeGroupSnapshot>,
) -> Result<FlowSnapshot, EngineError> {
    let public_flow_state: String = row.get("public_flow_state");
    let graph_revision_i: i64 = row.get("graph_revision");
    let created_at_ms: i64 = row.get("created_at_ms");
    let terminal_at_ms: Option<i64> = row.get("terminal_at_ms");
    let raw_fields: JsonValue = row.get("raw_fields");
    let flow_kind = raw_str(&raw_fields, "flow_kind")
        .unwrap_or("")
        .to_owned();
    let namespace_str = raw_str(&raw_fields, "namespace").unwrap_or("default");
    let namespace = Namespace::new(namespace_str.to_owned());
    let node_count = raw_u32(&raw_fields, "node_count").unwrap_or(0);
    let edge_count = raw_u32(&raw_fields, "edge_count").unwrap_or(0);
    // If no mutation timestamp was recorded, treat creation time as the
    // last mutation.
    let last_mutation_at_ms =
        raw_u64(&raw_fields, "last_mutation_at_ms").map(|n| n as i64).unwrap_or(created_at_ms);
    // NOTE(review): terminal_at_ms is surfaced as `cancelled_at`, but
    // other terminal outcomes may also populate that column — confirm
    // the intended semantics with the writer side.
    let cancelled_at = terminal_at_ms.map(TimestampMs);
    let cancel_reason = raw_str(&raw_fields, "cancel_reason").map(str::to_owned);
    let cancellation_policy = raw_str(&raw_fields, "cancellation_policy").map(str::to_owned);
    let tags = extract_tags(&raw_fields);
    // A negative (corrupt) revision clamps to 0 rather than erroring.
    let graph_revision = u64::try_from(graph_revision_i).unwrap_or(0);
    Ok(FlowSnapshot::new(
        flow_id,
        flow_kind,
        namespace,
        public_flow_state,
        graph_revision,
        node_count,
        edge_count,
        TimestampMs(created_at_ms),
        TimestampMs(last_mutation_at_ms),
        cancelled_at,
        cancel_reason,
        cancellation_policy,
        tags,
        edge_groups,
    ))
}
/// Decode one `ff_edge` row into an `EdgeSnapshot`.
///
/// Upstream/downstream execution ids are reconstructed from the row's
/// partition plus the bare UUID columns (`{fp:N}:uuid`); edge metadata
/// (dependency kind, satisfaction condition, state, audit fields) is read
/// out of the `policy` JSONB column with benign defaults for missing
/// keys. Returns a `Corruption` validation error if an execution id
/// cannot be reconstructed.
fn decode_edge_row(row: &PgRow, flow_id: &FlowId) -> Result<EdgeSnapshot, EngineError> {
    let edge_uuid: Uuid = row.get("edge_id");
    let upstream_uuid: Uuid = row.get("upstream_eid");
    let downstream_uuid: Uuid = row.get("downstream_eid");
    let part: i16 = row.get("partition_key");
    let policy_raw: JsonValue = row.get("policy");
    let upstream = ExecutionId::parse(&format!("{{fp:{part}}}:{upstream_uuid}"))
        .map_err(|e| EngineError::Validation {
            kind: ValidationKind::Corruption,
            detail: format!("ff_edge.upstream_eid: {e}"),
        })?;
    let downstream = ExecutionId::parse(&format!("{{fp:{part}}}:{downstream_uuid}"))
        .map_err(|e| EngineError::Validation {
            kind: ValidationKind::Corruption,
            detail: format!("ff_edge.downstream_eid: {e}"),
        })?;
    let dependency_kind = raw_str(&policy_raw, "dependency_kind")
        .unwrap_or("success_only")
        .to_owned();
    let satisfaction_condition = raw_str(&policy_raw, "satisfaction_condition")
        .unwrap_or("all_required")
        .to_owned();
    // An empty string is normalized to "no data-passing ref".
    let data_passing_ref = raw_str(&policy_raw, "data_passing_ref")
        .filter(|s| !s.is_empty())
        .map(str::to_owned);
    let edge_state = raw_str(&policy_raw, "edge_state")
        .unwrap_or("pending")
        .to_owned();
    let created_at_ms =
        raw_u64(&policy_raw, "created_at_ms").map(|n| n as i64).unwrap_or(0);
    let created_by = raw_str(&policy_raw, "created_by")
        .unwrap_or("engine")
        .to_owned();
    Ok(EdgeSnapshot::new(
        EdgeId::from_uuid(edge_uuid),
        flow_id.clone(),
        upstream,
        downstream,
        dependency_kind,
        satisfaction_condition,
        data_passing_ref,
        edge_state,
        TimestampMs(created_at_ms),
        created_by,
    ))
}
pub async fn describe_flow(
pool: &PgPool,
partition_config: &ff_core::partition::PartitionConfig,
id: &FlowId,
) -> Result<Option<FlowSnapshot>, EngineError> {
let part = flow_partition_byte(id, partition_config);
let flow_uuid: Uuid = id.0;
let flow_row_opt = sqlx::query(
"SELECT partition_key, flow_id, graph_revision, public_flow_state, \
created_at_ms, terminal_at_ms, raw_fields \
FROM ff_flow_core \
WHERE partition_key = $1 AND flow_id = $2",
)
.bind(part)
.bind(flow_uuid)
.fetch_optional(pool)
.await
.map_err(map_sqlx_error)?;
let Some(flow_row) = flow_row_opt else {
return Ok(None);
};
let eg_rows = sqlx::query(
"SELECT partition_key, flow_id, downstream_eid, policy, \
k_target, success_count, fail_count, skip_count, running_count \
FROM ff_edge_group \
WHERE partition_key = $1 AND flow_id = $2 \
ORDER BY downstream_eid",
)
.bind(part)
.bind(flow_uuid)
.fetch_all(pool)
.await
.map_err(map_sqlx_error)?;
let mut edge_groups = Vec::with_capacity(eg_rows.len());
for row in &eg_rows {
edge_groups.push(decode_edge_group_row(row)?);
}
decode_flow_row(id.clone(), &flow_row, edge_groups).map(Some)
}
/// Page through flows in one partition, ordered by `flow_id`.
///
/// Fetches `limit + 1` rows so it can tell whether another page exists;
/// when it does, the returned cursor is the last flow id on this page.
/// A `limit` of zero short-circuits to an empty page.
pub async fn list_flows(
    pool: &PgPool,
    partition: PartitionKey,
    cursor: Option<FlowId>,
    limit: usize,
) -> Result<ListFlowsPage, EngineError> {
    if limit == 0 {
        return Ok(ListFlowsPage::new(Vec::new(), None));
    }
    let part = partition_index_from_key(&partition)?;
    let cursor_uuid: Option<Uuid> = cursor.as_ref().map(|f| f.0);
    let fetch_limit = (limit + 1) as i64;
    let rows = sqlx::query(
        "SELECT flow_id, created_at_ms, public_flow_state \
         FROM ff_flow_core \
         WHERE partition_key = $1 \
         AND ($2::uuid IS NULL OR flow_id > $2) \
         ORDER BY flow_id \
         LIMIT $3",
    )
    .bind(part)
    .bind(cursor_uuid)
    .bind(fetch_limit)
    .fetch_all(pool)
    .await
    .map_err(map_sqlx_error)?;
    // The sentinel (limit + 1)th row only signals that more pages exist;
    // it is never returned to the caller.
    let has_more = rows.len() > limit;
    let mut flows: Vec<FlowSummary> = Vec::with_capacity(rows.len().min(limit));
    for row in rows.iter().take(limit) {
        let flow_uuid: Uuid = row.get("flow_id");
        let created_at_ms: i64 = row.get("created_at_ms");
        let public_state: String = row.get("public_flow_state");
        flows.push(FlowSummary::new(
            FlowId::from_uuid(flow_uuid),
            TimestampMs(created_at_ms),
            FlowStatus::from_public_flow_state(&public_state),
        ));
    }
    let next_cursor = if has_more {
        flows.last().map(|last| last.flow_id.clone())
    } else {
        None
    };
    Ok(ListFlowsPage::new(flows, next_cursor))
}
/// List edges touching one node of a flow: outgoing edges filter on
/// `upstream_eid`, incoming edges on `downstream_eid`.
///
/// The column name interpolated into the SQL comes from a fixed two-arm
/// match, never from caller input, so it is not an injection vector.
pub async fn list_edges(
    pool: &PgPool,
    partition_config: &ff_core::partition::PartitionConfig,
    flow_id: &FlowId,
    direction: EdgeDirection,
) -> Result<Vec<EdgeSnapshot>, EngineError> {
    let part = flow_partition_byte(flow_id, partition_config);
    let flow_uuid: Uuid = flow_id.0;
    let (column_filter, subject_eid) = match &direction {
        EdgeDirection::Outgoing { from_node } => ("upstream_eid", from_node),
        EdgeDirection::Incoming { to_node } => ("downstream_eid", to_node),
    };
    let subject_uuid = parse_exec_uuid(subject_eid)?;
    let sql = format!(
        "SELECT partition_key, flow_id, edge_id, upstream_eid, downstream_eid, policy \
         FROM ff_edge \
         WHERE partition_key = $1 AND flow_id = $2 AND {column_filter} = $3 \
         ORDER BY edge_id"
    );
    let rows = sqlx::query(&sql)
        .bind(part)
        .bind(flow_uuid)
        .bind(subject_uuid)
        .fetch_all(pool)
        .await
        .map_err(map_sqlx_error)?;
    rows.iter()
        .map(|row| decode_edge_row(row, flow_id))
        .collect()
}
/// Fetch a single edge of a flow by id, or `None` when it does not exist.
pub async fn describe_edge(
    pool: &PgPool,
    partition_config: &ff_core::partition::PartitionConfig,
    flow_id: &FlowId,
    edge_id: &EdgeId,
) -> Result<Option<EdgeSnapshot>, EngineError> {
    let part = flow_partition_byte(flow_id, partition_config);
    let row_opt = sqlx::query(
        "SELECT partition_key, flow_id, edge_id, upstream_eid, downstream_eid, policy \
         FROM ff_edge \
         WHERE partition_key = $1 AND flow_id = $2 AND edge_id = $3",
    )
    .bind(part)
    .bind(flow_id.0)
    .bind(edge_id.0)
    .fetch_optional(pool)
    .await
    .map_err(map_sqlx_error)?;
    // Option<Result<_>> -> Result<Option<_>> keeps the decode error path.
    row_opt
        .map(|row| decode_edge_row(&row, flow_id))
        .transpose()
}
/// Extract the trailing UUID from an `ExecutionId` rendered as
/// `{fp:N}:<uuid>`, rejecting ids that lack the `}:` delimiter.
fn parse_exec_uuid(eid: &ExecutionId) -> Result<Uuid, EngineError> {
    let s = eid.as_str();
    let tail = match s.rfind("}:") {
        Some(pos) => &s[pos + 2..],
        None => {
            return Err(EngineError::Validation {
                kind: ValidationKind::InvalidInput,
                detail: format!("execution_id missing '}}:' delimiter: {s}"),
            });
        }
    };
    Uuid::parse_str(tail).map_err(|e| EngineError::Validation {
        kind: ValidationKind::InvalidInput,
        detail: format!("execution_id uuid suffix parse: {e}"),
    })
}
/// Map a cancellation policy to its persisted string form; `CancelAll`
/// and any future variants fall through to the broadest policy string.
fn cancel_policy_to_str(p: CancelFlowPolicy) -> &'static str {
    match p {
        CancelFlowPolicy::FlowOnly => "cancel_flow_only",
        CancelFlowPolicy::CancelPending => "cancel_pending",
        _ => "cancel_all",
    }
}
/// Cancel a flow, retrying up to `CANCEL_FLOW_MAX_ATTEMPTS` times when
/// `cancel_flow_once`'s serializable transaction hits a concurrency
/// conflict; any other error is returned immediately.
///
/// Backoff between attempts is exponential: 5ms, 10ms, 20ms, ...
/// After the final failed attempt it returns `RetryExhausted`.
pub async fn cancel_flow(
    pool: &PgPool,
    partition_config: &ff_core::partition::PartitionConfig,
    id: &FlowId,
    policy: CancelFlowPolicy,
) -> Result<CancelFlowResult, EngineError> {
    let part = flow_partition_byte(id, partition_config);
    let flow_uuid: Uuid = id.0;
    let policy_str = cancel_policy_to_str(policy);
    for attempt in 0..CANCEL_FLOW_MAX_ATTEMPTS {
        match cancel_flow_once(pool, part, flow_uuid, policy, policy_str).await {
            Ok(result) => return Ok(result),
            // Retry only on conflicts; sleep unless this was the last try.
            Err(err) if is_serialization_conflict(&err) => {
                if attempt + 1 < CANCEL_FLOW_MAX_ATTEMPTS {
                    // 5ms * 2^attempt. (Removed a no-op `.saturating_sub(0)`
                    // and a `last_transport` variable that was written but
                    // never read.)
                    let ms = 5u64 * (1u64 << attempt);
                    tokio::time::sleep(Duration::from_millis(ms)).await;
                }
            }
            Err(err) => return Err(err),
        }
    }
    Err(EngineError::Contention(ContentionKind::RetryExhausted))
}
/// True when `err` is a concurrency conflict that `cancel_flow` should
/// retry.
///
/// NOTE(review): only `LeaseConflict` is treated as retryable here —
/// confirm whether Postgres serialization failures are also mapped to
/// this variant by `map_sqlx_error`.
fn is_serialization_conflict(err: &EngineError) -> bool {
    matches!(
        err,
        EngineError::Contention(ContentionKind::LeaseConflict)
    )
}
/// One attempt at cancelling a flow inside a single SERIALIZABLE
/// transaction:
///
/// 1. Mark the flow row cancelled (`terminal_at_ms` is only set if not
///    already set) and record the cancellation policy in `raw_fields`.
/// 2. Depending on `policy`, select member executions and, for each one:
///    flip its state columns, insert a completion event, and insert a
///    lease-revocation event.
/// 3. For `CancelPending`, additionally enqueue edge groups that still
///    have running members into `ff_pending_cancel_groups`.
///
/// `cancel_flow` retries this function on serialization conflicts.
async fn cancel_flow_once(
    pool: &PgPool,
    part: i16,
    flow_uuid: Uuid,
    policy: CancelFlowPolicy,
    policy_str: &str,
) -> Result<CancelFlowResult, EngineError> {
    let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
    // SERIALIZABLE so concurrent completions/cancellations conflict
    // instead of interleaving with this multi-statement update.
    sqlx::query("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE")
        .execute(&mut *tx)
        .await
        .map_err(map_sqlx_error)?;
    let flow_found = sqlx::query(
        "UPDATE ff_flow_core \
         SET public_flow_state = 'cancelled', \
         terminal_at_ms = COALESCE(terminal_at_ms, \
         (extract(epoch from clock_timestamp())*1000)::bigint), \
         raw_fields = raw_fields \
         || jsonb_build_object('cancellation_policy', $3::text) \
         WHERE partition_key = $1 AND flow_id = $2 \
         RETURNING flow_id",
    )
    .bind(part)
    .bind(flow_uuid)
    .bind(policy_str)
    .fetch_optional(&mut *tx)
    .await
    .map_err(map_sqlx_error)?;
    // NOTE(review): an unknown flow still reports `Cancelled` with no
    // members — presumably for idempotence; confirm callers expect this
    // rather than a not-found error.
    if flow_found.is_none() {
        tx.commit().await.map_err(map_sqlx_error)?;
        return Ok(CancelFlowResult::Cancelled {
            cancellation_policy: policy_str.to_owned(),
            member_execution_ids: Vec::new(),
        });
    }
    // Pick which member executions the policy cancels; FlowOnly touches
    // none of them.
    let member_rows = if matches!(policy, CancelFlowPolicy::FlowOnly) {
        Vec::new()
    } else {
        let state_filter: &str = match policy {
            CancelFlowPolicy::CancelAll => {
                "lifecycle_phase NOT IN ('completed','failed','cancelled','expired')"
            }
            CancelFlowPolicy::CancelPending => {
                "lifecycle_phase IN ('pending','blocked','eligible')"
            }
            // Future variants default to the CancelAll filter.
            _ => "lifecycle_phase NOT IN ('completed','failed','cancelled','expired')",
        };
        // The filter is one of the fixed strings above, never user input.
        let sql = format!(
            "SELECT execution_id FROM ff_exec_core \
             WHERE partition_key = $1 AND flow_id = $2 AND {state_filter}"
        );
        sqlx::query(&sql)
            .bind(part)
            .bind(flow_uuid)
            .fetch_all(&mut *tx)
            .await
            .map_err(map_sqlx_error)?
    };
    let mut member_execution_ids: Vec<String> = Vec::with_capacity(member_rows.len());
    for row in &member_rows {
        let exec_uuid: Uuid = row.get("execution_id");
        // Terminal fields use COALESCE so an already-cancelled execution
        // keeps its original timestamp/reason/actor.
        sqlx::query(
            "UPDATE ff_exec_core \
             SET lifecycle_phase = 'cancelled', \
             eligibility_state = 'cancelled', \
             public_state = 'cancelled', \
             terminal_at_ms = COALESCE(terminal_at_ms, \
             (extract(epoch from clock_timestamp())*1000)::bigint), \
             cancellation_reason = COALESCE(cancellation_reason, 'flow_cancelled'), \
             cancelled_by = COALESCE(cancelled_by, 'cancel_flow') \
             WHERE partition_key = $1 AND execution_id = $2",
        )
        .bind(part)
        .bind(exec_uuid)
        .execute(&mut *tx)
        .await
        .map_err(map_sqlx_error)?;
        sqlx::query(
            "INSERT INTO ff_completion_event \
             (partition_key, execution_id, flow_id, outcome, occurred_at_ms) \
             VALUES ($1, $2, $3, 'cancelled', \
             (extract(epoch from clock_timestamp())*1000)::bigint)",
        )
        .bind(part)
        .bind(exec_uuid)
        .bind(flow_uuid)
        .execute(&mut *tx)
        .await
        .map_err(map_sqlx_error)?;
        // NOTE(review): ff_lease_event takes execution_id as text and the
        // partition as an i32 — unlike the other tables; confirm schema.
        sqlx::query(
            "INSERT INTO ff_lease_event \
             (execution_id, lease_id, event_type, occurred_at_ms, partition_key) \
             VALUES ($1, NULL, 'revoked', \
             (extract(epoch from clock_timestamp())*1000)::bigint, $2)",
        )
        .bind(exec_uuid.to_string())
        .bind(i32::from(part))
        .execute(&mut *tx)
        .await
        .map_err(map_sqlx_error)?;
        // Returned ids use the same `{fp:N}:uuid` rendering that
        // parse_exec_uuid expects.
        member_execution_ids.push(format!("{{fp:{part}}}:{exec_uuid}"));
    }
    if matches!(policy, CancelFlowPolicy::CancelPending) {
        // Edge groups with members still running cannot be settled here;
        // hand them to the pending-cancel queue (dedup via ON CONFLICT).
        sqlx::query(
            "INSERT INTO ff_pending_cancel_groups \
             (partition_key, flow_id, downstream_eid, enqueued_at_ms) \
             SELECT partition_key, flow_id, downstream_eid, \
             (extract(epoch from clock_timestamp())*1000)::bigint \
             FROM ff_edge_group \
             WHERE partition_key = $1 AND flow_id = $2 AND running_count > 0 \
             ON CONFLICT DO NOTHING",
        )
        .bind(part)
        .bind(flow_uuid)
        .execute(&mut *tx)
        .await
        .map_err(map_sqlx_error)?;
    }
    tx.commit().await.map_err(map_sqlx_error)?;
    Ok(CancelFlowResult::Cancelled {
        cancellation_policy: policy_str.to_owned(),
        member_execution_ids,
    })
}
/// Replace the dependency policy of an edge group, but only while no
/// dependency edges have been staged yet for the downstream node.
///
/// Validation: quorum `k` must be >= 1 and unknown policy variants are
/// rejected. Returns `AlreadySet` when the stored policy is identical to
/// the requested one, `Set` after an upsert.
pub async fn set_edge_group_policy(
    pool: &PgPool,
    partition_config: &ff_core::partition::PartitionConfig,
    flow_id: &FlowId,
    downstream_execution_id: &ExecutionId,
    policy: EdgeDependencyPolicy,
) -> Result<SetEdgeGroupPolicyResult, EngineError> {
    // Validate the policy before opening a transaction.
    match &policy {
        EdgeDependencyPolicy::AllOf => {}
        EdgeDependencyPolicy::AnyOf { .. } => {}
        EdgeDependencyPolicy::Quorum { k, .. } => {
            if *k == 0 {
                return Err(EngineError::Validation {
                    kind: ValidationKind::InvalidInput,
                    detail: "quorum k must be >= 1".to_string(),
                });
            }
        }
        _ => {
            return Err(EngineError::Validation {
                kind: ValidationKind::InvalidInput,
                detail: "unknown EdgeDependencyPolicy variant".to_string(),
            });
        }
    }
    let part = flow_partition_byte(flow_id, partition_config);
    let flow_uuid: Uuid = flow_id.0;
    let downstream_uuid = parse_exec_uuid(downstream_execution_id)?;
    let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
    // Once any edge targets this downstream node, the policy is frozen.
    let already_staged: i64 = sqlx::query_scalar(
        "SELECT COUNT(*) FROM ff_edge \
         WHERE partition_key = $1 AND flow_id = $2 AND downstream_eid = $3",
    )
    .bind(part)
    .bind(flow_uuid)
    .bind(downstream_uuid)
    .fetch_one(&mut *tx)
    .await
    .map_err(map_sqlx_error)?;
    if already_staged > 0 {
        // Rollback failure is ignored: the validation error dominates.
        let _ = tx.rollback().await;
        return Err(EngineError::Validation {
            kind: ValidationKind::InvalidInput,
            detail: "edge_group_policy_already_fixed: dependencies already staged".to_string(),
        });
    }
    let existing: Option<JsonValue> = sqlx::query_scalar(
        "SELECT policy FROM ff_edge_group \
         WHERE partition_key = $1 AND flow_id = $2 AND downstream_eid = $3",
    )
    .bind(part)
    .bind(flow_uuid)
    .bind(downstream_uuid)
    .fetch_optional(&mut *tx)
    .await
    .map_err(map_sqlx_error)?;
    let encoded = encode_edge_policy(&policy);
    // Idempotence: an identical stored policy short-circuits the upsert.
    if let Some(existing_policy) = existing
        && existing_policy == encoded
    {
        tx.commit().await.map_err(map_sqlx_error)?;
        return Ok(SetEdgeGroupPolicyResult::AlreadySet);
    }
    sqlx::query(
        "INSERT INTO ff_edge_group \
         (partition_key, flow_id, downstream_eid, policy) \
         VALUES ($1, $2, $3, $4) \
         ON CONFLICT (partition_key, flow_id, downstream_eid) \
         DO UPDATE SET policy = EXCLUDED.policy",
    )
    .bind(part)
    .bind(flow_uuid)
    .bind(downstream_uuid)
    .bind(&encoded)
    .execute(&mut *tx)
    .await
    .map_err(map_sqlx_error)?;
    tx.commit().await.map_err(map_sqlx_error)?;
    Ok(SetEdgeGroupPolicyResult::Set)
}
// Keeps the `Partition` and `PartitionFamily` imports referenced so the
// compiler does not flag them as unused; never called at runtime.
#[allow(dead_code)]
fn _unused_imports_anchor(_p: Partition, _f: PartitionFamily) {}