use std::collections::HashMap;
use std::time::{SystemTime, UNIX_EPOCH};
use ff_core::contracts::decode::build_execution_snapshot;
use ff_core::contracts::{CreateExecutionArgs, ExecutionInfo, ExecutionSnapshot, ListExecutionsPage};
use ff_core::engine_error::{EngineError, ValidationKind};
use ff_core::partition::{PartitionConfig, PartitionKey};
use ff_core::state::{
AttemptState, BlockingReason, EligibilityState, LifecyclePhase, OwnershipState, PublicState,
StateVector, TerminalOutcome,
};
use ff_core::types::{ExecutionId, FlowId};
use serde_json::Value as JsonValue;
use sqlx::{PgPool, Row};
use uuid::Uuid;
use crate::error::map_sqlx_error;
/// Extract the UUID component from an `ExecutionId`.
///
/// An `ExecutionId` has the textual shape `{fp:<partition>}:<uuid>`; this
/// returns the `<uuid>` part. Panics if the invariant does not hold, which
/// would indicate an id constructed outside the normal parsing path.
fn eid_uuid(eid: &ExecutionId) -> Uuid {
    let text = eid.as_str();
    let (_, uuid_part) = text
        .split_once("}:")
        .expect("ExecutionId has `}:` separator (invariant)");
    Uuid::parse_str(uuid_part).expect("ExecutionId suffix is a valid UUID (invariant)")
}
/// Rebuild a full `ExecutionId` (`{fp:<partition>}:<uuid>`) from the
/// partition index and UUID stored in the database.
///
/// Both components come out of the DB, so a parse failure is reported as a
/// `Corruption` validation error rather than invalid input.
fn eid_from_parts(partition: u16, uuid: Uuid) -> Result<ExecutionId, EngineError> {
    let s = format!("{{fp:{partition}}}:{uuid}");
    match ExecutionId::parse(&s) {
        Ok(id) => Ok(id),
        Err(e) => Err(EngineError::Validation {
            kind: ValidationKind::Corruption,
            detail: format!("exec_core: execution_id: could not reassemble '{s}': {e}"),
        }),
    }
}
/// Current wall-clock time in milliseconds since the UNIX epoch, clamped to
/// be non-negative so it always fits signed DB timestamp columns.
fn now_ms() -> i64 {
    let elapsed = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("clock is after UNIX_EPOCH");
    i64::max(elapsed.as_millis() as i64, 0)
}
/// Create a new execution row and ensure its lane is registered, in one
/// transaction.
///
/// Columns not bound explicitly start at their initial values: no flow,
/// empty capability list, attempt_index 0, and the initial state vector
/// `submitted` / `unowned` / `eligible_now` / `waiting` / `pending`.
/// Metadata without a dedicated column (namespace, execution_kind,
/// creator_identity, optional idempotency key and payload encoding, tags,
/// last_mutation_at, total_attempt_count) travels in the `raw_fields` JSONB
/// column.
///
/// Idempotency: `ON CONFLICT ... DO NOTHING` means re-creating an existing
/// (partition_key, execution_id) succeeds without modifying the stored row.
/// NOTE(review): a re-create with *different* arguments is silently ignored
/// too — confirm callers rely on that contract.
pub(super) async fn create_execution_impl(
    pool: &PgPool,
    _partition_config: &PartitionConfig,
    args: CreateExecutionArgs,
) -> Result<ExecutionId, EngineError> {
    let partition_key: i16 = args.execution_id.partition() as i16;
    let execution_id = eid_uuid(&args.execution_id);
    let lane_id = args.lane_id.as_str().to_owned();
    let priority: i32 = args.priority;
    // Caller-supplied clock; also recorded as the initial last_mutation_at
    // and as the lane registration timestamp below.
    let created_at_ms: i64 = args.now.0;
    let deadline_at_ms: Option<i64> = args.execution_deadline_at.map(|t| t.0);
    // Assemble the raw_fields JSON object.
    let mut raw: serde_json::Map<String, JsonValue> = serde_json::Map::new();
    raw.insert(
        "namespace".into(),
        JsonValue::String(args.namespace.as_str().to_owned()),
    );
    raw.insert("execution_kind".into(), JsonValue::String(args.execution_kind));
    raw.insert(
        "creator_identity".into(),
        JsonValue::String(args.creator_identity),
    );
    if let Some(k) = args.idempotency_key {
        raw.insert("idempotency_key".into(), JsonValue::String(k));
    }
    if let Some(enc) = args.payload_encoding {
        raw.insert("payload_encoding".into(), JsonValue::String(enc));
    }
    // Timestamps/counters are stored as JSON *strings*, matching how
    // describe_execution_impl reads them back out.
    raw.insert(
        "last_mutation_at".into(),
        JsonValue::String(created_at_ms.to_string()),
    );
    raw.insert(
        "total_attempt_count".into(),
        JsonValue::String("0".to_owned()),
    );
    // Tags are stored as a nested string->string JSON object.
    let tags_json: serde_json::Map<String, JsonValue> = args
        .tags
        .into_iter()
        .map(|(k, v)| (k, JsonValue::String(v)))
        .collect();
    raw.insert("tags".into(), JsonValue::Object(tags_json));
    let raw_fields = JsonValue::Object(raw);
    // Policy is optional; a serialization failure is reported as caller error.
    let policy_json: Option<JsonValue> = match args.policy {
        Some(p) => Some(serde_json::to_value(&p).map_err(|e| EngineError::Validation {
            kind: ValidationKind::InvalidInput,
            detail: format!("create_execution: policy: serialize failed: {e}"),
        })?),
        None => None,
    };
    let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
    sqlx::query(
        r#"
        INSERT INTO ff_exec_core (
            partition_key, execution_id, flow_id, lane_id,
            required_capabilities, attempt_index,
            lifecycle_phase, ownership_state, eligibility_state,
            public_state, attempt_state,
            priority, created_at_ms, deadline_at_ms,
            payload, policy, raw_fields
        ) VALUES (
            $1, $2, NULL, $3,
            '{}'::text[], 0,
            'submitted', 'unowned', 'eligible_now',
            'waiting', 'pending',
            $4, $5, $6,
            $7, $8, $9
        )
        ON CONFLICT (partition_key, execution_id) DO NOTHING
        "#,
    )
    .bind(partition_key)
    .bind(execution_id)
    .bind(&lane_id)
    .bind(priority)
    .bind(created_at_ms)
    .bind(deadline_at_ms)
    .bind(&args.input_payload)
    .bind(policy_json)
    .bind(&raw_fields)
    .execute(&mut *tx)
    .await
    .map_err(map_sqlx_error)?;
    // Best-effort lane registration in the same transaction: the first
    // writer wins, later creates on the same lane are no-ops.
    sqlx::query(
        r#"
        INSERT INTO ff_lane_registry (lane_id, registered_at_ms, registered_by)
        VALUES ($1, $2, $3)
        ON CONFLICT (lane_id) DO NOTHING
        "#,
    )
    .bind(&lane_id)
    .bind(created_at_ms)
    .bind("create_execution")
    .execute(&mut *tx)
    .await
    .map_err(map_sqlx_error)?;
    tx.commit().await.map_err(map_sqlx_error)?;
    // The caller's id is echoed back regardless of whether the insert
    // actually created a row (see idempotency note above).
    Ok(args.execution_id)
}
/// Load a single execution and project it into an `ExecutionSnapshot`.
///
/// Returns `Ok(None)` when no row exists for the id. Otherwise the selected
/// columns, plus a whitelisted subset of string entries from `raw_fields`,
/// are flattened into a string->string map and handed to
/// `build_execution_snapshot`, which owns the snapshot decoding rules.
pub(super) async fn describe_execution_impl(
    pool: &PgPool,
    _partition_config: &PartitionConfig,
    id: &ExecutionId,
) -> Result<Option<ExecutionSnapshot>, EngineError> {
    let partition_key: i16 = id.partition() as i16;
    let execution_id = eid_uuid(id);
    let row = sqlx::query(
        r#"
        SELECT flow_id, lane_id, public_state, blocking_reason,
               created_at_ms, raw_fields
        FROM ff_exec_core
        WHERE partition_key = $1 AND execution_id = $2
        "#,
    )
    .bind(partition_key)
    .bind(execution_id)
    .fetch_optional(pool)
    .await
    .map_err(map_sqlx_error)?;
    let Some(row) = row else {
        return Ok(None);
    };
    let flow_id_uuid: Option<Uuid> = row.try_get("flow_id").map_err(map_sqlx_error)?;
    let lane_id: String = row.try_get("lane_id").map_err(map_sqlx_error)?;
    let public_state: String = row.try_get("public_state").map_err(map_sqlx_error)?;
    let blocking_reason: Option<String> =
        row.try_get("blocking_reason").map_err(map_sqlx_error)?;
    let created_at_ms: i64 = row.try_get("created_at_ms").map_err(map_sqlx_error)?;
    let raw_fields: JsonValue = row.try_get("raw_fields").map_err(map_sqlx_error)?;
    // Flat string map consumed by build_execution_snapshot.
    let mut core: HashMap<String, String> = HashMap::new();
    core.insert("public_state".into(), public_state);
    core.insert("lane_id".into(), lane_id);
    if let Some(fid) = flow_id_uuid {
        // The flow id is re-prefixed with this execution's partition,
        // yielding the `{fp:N}:<uuid>` textual form.
        core.insert(
            "flow_id".into(),
            format!("{{fp:{part}}}:{fid}", part = id.partition()),
        );
    }
    if let Some(r) = blocking_reason {
        core.insert("blocking_reason".into(), r);
    }
    core.insert("created_at".into(), created_at_ms.to_string());
    // Only these raw_fields keys are surfaced, and only when they hold
    // JSON strings; anything else is ignored.
    if let JsonValue::Object(map) = &raw_fields {
        for key in [
            "namespace",
            "last_mutation_at",
            "total_attempt_count",
            "current_attempt_id",
            "current_attempt_index",
            "current_waitpoint_id",
            "blocking_detail",
        ] {
            if let Some(JsonValue::String(s)) = map.get(key) {
                core.insert(key.to_owned(), s.clone());
            }
        }
    }
    // Tags: the nested "tags" object, keeping only string values
    // (non-string tag values are silently dropped).
    let tags_raw: HashMap<String, String> = match &raw_fields {
        JsonValue::Object(map) => match map.get("tags") {
            Some(JsonValue::Object(tag_map)) => tag_map
                .iter()
                .filter_map(|(k, v)| {
                    v.as_str().map(|s| (k.clone(), s.to_owned()))
                })
                .collect(),
            _ => HashMap::new(),
        },
        _ => HashMap::new(),
    };
    build_execution_snapshot(id.clone(), &core, tags_raw)
}
/// Keyset-paginated listing of execution ids within one partition.
///
/// Orders by the `execution_id` UUID column ascending and, when a cursor is
/// supplied, resumes strictly after it. Fetches `limit + 1` rows (with
/// `limit` capped at 1000) so `has_more` can be decided without a second
/// query; the next cursor is the last id actually returned.
/// NOTE(review): pagination order relies on Postgres uuid comparison —
/// confirm ids are meant to sort this way for callers.
pub(super) async fn list_executions_impl(
    pool: &PgPool,
    _partition_config: &PartitionConfig,
    partition: PartitionKey,
    cursor: Option<ExecutionId>,
    limit: usize,
) -> Result<ListExecutionsPage, EngineError> {
    // A zero limit short-circuits to an empty page with no cursor.
    if limit == 0 {
        return Ok(ListExecutionsPage::new(Vec::new(), None));
    }
    let parsed = partition.parse().map_err(|e| EngineError::Validation {
        kind: ValidationKind::InvalidInput,
        detail: format!("list_executions: partition: '{partition}': {e}"),
    })?;
    let partition_key: i16 = parsed.index as i16;
    let cursor_uuid: Option<Uuid> = cursor.as_ref().map(eid_uuid);
    let effective_limit = limit.min(1000);
    // One extra row is fetched purely to detect whether more pages exist.
    let fetch_limit: i64 = (effective_limit as i64).saturating_add(1);
    let rows = sqlx::query(
        r#"
        SELECT execution_id
        FROM ff_exec_core
        WHERE partition_key = $1
          AND ($2::uuid IS NULL OR execution_id > $2)
        ORDER BY execution_id ASC
        LIMIT $3
        "#,
    )
    .bind(partition_key)
    .bind(cursor_uuid)
    .bind(fetch_limit)
    .fetch_all(pool)
    .await
    .map_err(map_sqlx_error)?;
    let mut ids: Vec<ExecutionId> = Vec::with_capacity(rows.len());
    for row in &rows {
        let u: Uuid = row.try_get("execution_id").map_err(map_sqlx_error)?;
        ids.push(eid_from_parts(parsed.index, u)?);
    }
    // The sentinel row (if present) is trimmed before building the page.
    let has_more = ids.len() > effective_limit;
    if has_more {
        ids.truncate(effective_limit);
    }
    let next_cursor = if has_more { ids.last().cloned() } else { None };
    Ok(ListExecutionsPage::new(ids, next_cursor))
}
/// Cancel an execution, transitioning it to the terminal/cancelled state.
///
/// Locks the row (`SELECT ... FOR UPDATE`) inside a transaction, then:
/// * missing row              -> `InvalidInput` error
/// * already cancelled        -> `Ok(())` (idempotent)
/// * terminal in another state-> `InvalidInput` error
/// * otherwise                -> rewrite the state vector to cancelled,
///   stamp `terminal_at_ms` / `cancellation_reason`, and bump the
///   `last_mutation_at` entry inside `raw_fields`.
///
/// NOTE(review): `cancelled_by` is hard-coded to 'worker' — confirm this
/// entry point is only reached from worker-initiated cancels.
pub(super) async fn cancel_impl(
    pool: &PgPool,
    _partition_config: &PartitionConfig,
    execution_id: &ExecutionId,
    reason: &str,
) -> Result<(), EngineError> {
    let partition_key: i16 = execution_id.partition() as i16;
    let eid_uuid = eid_uuid(execution_id);
    // Cancellation uses the server clock, unlike create which takes the
    // caller-supplied `args.now`.
    let now = now_ms();
    let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
    let current: Option<(String, String)> = sqlx::query_as(
        r#"
        SELECT lifecycle_phase, public_state
        FROM ff_exec_core
        WHERE partition_key = $1 AND execution_id = $2
        FOR UPDATE
        "#,
    )
    .bind(partition_key)
    .bind(eid_uuid)
    .fetch_optional(&mut *tx)
    .await
    .map_err(map_sqlx_error)?;
    let Some((lifecycle_phase, public_state)) = current else {
        // Explicit rollback so the lock attempt is released before erroring.
        tx.rollback().await.map_err(map_sqlx_error)?;
        return Err(EngineError::Validation {
            kind: ValidationKind::InvalidInput,
            detail: format!(
                "cancel: execution_id={execution_id}: row not found on partition_key={partition_key}"
            ),
        });
    };
    if lifecycle_phase == "terminal" {
        tx.rollback().await.map_err(map_sqlx_error)?;
        // Re-cancelling an already-cancelled execution is a no-op; any other
        // terminal state rejects the cancel.
        return if public_state == "cancelled" {
            Ok(())
        } else {
            Err(EngineError::Validation {
                kind: ValidationKind::InvalidInput,
                detail: format!(
                    "cancel: execution_id={execution_id}: already terminal in state '{public_state}'"
                ),
            })
        };
    }
    // last_mutation_at is stored as a JSON string inside raw_fields, hence
    // the `::text` cast before to_jsonb.
    sqlx::query(
        r#"
        UPDATE ff_exec_core
        SET lifecycle_phase = 'terminal',
            ownership_state = 'unowned',
            eligibility_state = 'not_applicable',
            public_state = 'cancelled',
            attempt_state = 'cancelled',
            terminal_at_ms = $3,
            cancellation_reason = $4,
            cancelled_by = 'worker',
            raw_fields = jsonb_set(raw_fields, '{last_mutation_at}', to_jsonb($3::text))
        WHERE partition_key = $1 AND execution_id = $2
        "#,
    )
    .bind(partition_key)
    .bind(eid_uuid)
    .bind(now)
    .bind(reason)
    .execute(&mut *tx)
    .await
    .map_err(map_sqlx_error)?;
    tx.commit().await.map_err(map_sqlx_error)?;
    Ok(())
}
/// Look up the `flow_id` column for an execution, if any.
///
/// Returns `Ok(None)` both when the execution row is missing and when the
/// row exists but has no flow assigned (NULL column). A stored UUID that
/// fails to parse as a `FlowId` is reported as a `Corruption` error.
pub(super) async fn resolve_execution_flow_id_impl(
    pool: &PgPool,
    _partition_config: &PartitionConfig,
    eid: &ExecutionId,
) -> Result<Option<FlowId>, EngineError> {
    let partition_key: i16 = eid.partition() as i16;
    let execution_id = eid_uuid(eid);
    let row: Option<(Option<Uuid>,)> = sqlx::query_as(
        r#"
        SELECT flow_id
        FROM ff_exec_core
        WHERE partition_key = $1 AND execution_id = $2
        "#,
    )
    .bind(partition_key)
    .bind(execution_id)
    .fetch_optional(pool)
    .await
    .map_err(map_sqlx_error)?;
    // Collapse "no row" and "row with NULL flow_id" into Ok(None).
    let fid_uuid = match row {
        Some((Some(u),)) => u,
        _ => return Ok(None),
    };
    let s = fid_uuid.to_string();
    match FlowId::parse(&s) {
        Ok(fid) => Ok(Some(fid)),
        Err(e) => Err(EngineError::Validation {
            kind: ValidationKind::Corruption,
            detail: format!(
                "resolve_execution_flow_id: exec_core.flow_id='{s}' is not a valid FlowId: {e}"
            ),
        }),
    }
}
/// Map legacy / transitional lifecycle phase spellings from the DB onto the
/// canonical names understood by `LifecyclePhase` decoding. Canonical and
/// unknown values pass through untouched so the enum decode can report them.
fn normalise_lifecycle_phase(raw: &str) -> &str {
    if matches!(raw, "cancelled" | "terminal") {
        return "terminal";
    }
    if matches!(raw, "pending" | "runnable" | "eligible" | "blocked") {
        return "runnable";
    }
    // "active", "suspended" and "submitted" are already canonical; anything
    // else is passed through verbatim.
    raw
}
/// Map DB ownership values onto the canonical `OwnershipState` names.
/// "released" is an alias for "unowned"; every other value (known or not)
/// already matches its canonical spelling and passes through.
fn normalise_ownership_state(raw: &str) -> &str {
    if raw == "released" { "unowned" } else { raw }
}
/// Map DB eligibility values onto canonical `EligibilityState` names:
/// "cancelled" becomes "not_applicable", "pending_claim" becomes
/// "eligible_now", everything else passes through untouched.
fn normalise_eligibility_state(raw: &str) -> &str {
    if raw == "cancelled" {
        "not_applicable"
    } else if raw == "pending_claim" {
        "eligible_now"
    } else {
        raw
    }
}
/// Map DB attempt-state values onto canonical `AttemptState` names.
/// Unknown values pass through so enum decoding can flag them.
fn normalise_attempt_state(raw: &str) -> &str {
    if matches!(raw, "pending" | "pending_claim") {
        return "pending_first_attempt";
    }
    if raw == "running" {
        return "running_attempt";
    }
    if raw == "cancelled" {
        return "attempt_terminal";
    }
    raw
}
/// Map DB public-state values onto canonical `PublicState` names:
/// only "running" is aliased (to "active"); everything else passes through.
fn normalise_public_state(raw: &str) -> &str {
    if raw == "running" { "active" } else { raw }
}
/// Decode a raw DB string into a serde-deserializable enum value.
///
/// The raw value is wrapped in double quotes so `serde_json` parses it as a
/// JSON string, then decoded into `$ty`. An unrecognised value surfaces as a
/// `Corruption` validation error naming the field and the offending value.
///
/// Expands to a `Result<$ty, EngineError>` expression.
macro_rules! json_enum {
    ($ty:ty, $field:expr, $raw:expr) => {{
        let quoted = format!("\"{}\"", $raw);
        // Fix: the argument here was garbled to `"ed` (a stray string
        // literal), which did not compile; the intent — per the `quoted`
        // binding above — is to parse the quoted raw value.
        serde_json::from_str::<$ty>(&quoted).map_err(|e| EngineError::Validation {
            kind: ValidationKind::Corruption,
            detail: format!(
                "exec_core: {}: '{}' is not a known value: {}",
                $field, $raw, e
            ),
        })
    }};
}
/// Derive the `TerminalOutcome` component of the state vector.
///
/// Non-terminal executions always yield `None`. For terminal executions, a
/// raw phase of "cancelled" wins outright; otherwise the current attempt's
/// outcome string decides, with missing or unrecognised outcomes mapping to
/// `None`.
fn derive_terminal_outcome(
    phase_norm: &str,
    phase_raw: &str,
    attempt_outcome: Option<&str>,
) -> TerminalOutcome {
    if phase_norm != "terminal" {
        return TerminalOutcome::None;
    }
    if phase_raw == "cancelled" {
        return TerminalOutcome::Cancelled;
    }
    // An absent outcome falls into the catch-all arm, same as unknown text.
    match attempt_outcome.unwrap_or("") {
        "success" => TerminalOutcome::Success,
        "failed" => TerminalOutcome::Failed,
        "cancelled" => TerminalOutcome::Cancelled,
        "expired" => TerminalOutcome::Expired,
        "skipped" => TerminalOutcome::Skipped,
        _ => TerminalOutcome::None,
    }
}
/// Read just the `public_state` column and decode it into a `PublicState`.
///
/// Returns `Ok(None)` for a missing row. The raw value is passed through
/// `normalise_public_state` before the serde-based decode (so legacy
/// "running" rows decode as active); an unknown value becomes a
/// `Corruption` validation error via `json_enum!`.
pub(super) async fn read_execution_state_impl(
    pool: &PgPool,
    _partition_config: &PartitionConfig,
    id: &ExecutionId,
) -> Result<Option<PublicState>, EngineError> {
    let partition_key: i16 = id.partition() as i16;
    let execution_id = eid_uuid(id);
    let row: Option<(String,)> = sqlx::query_as(
        r#"
        SELECT public_state
        FROM ff_exec_core
        WHERE partition_key = $1 AND execution_id = $2
        "#,
    )
    .bind(partition_key)
    .bind(execution_id)
    .fetch_optional(pool)
    .await
    .map_err(map_sqlx_error)?;
    let Some((raw,)) = row else {
        return Ok(None);
    };
    let parsed: PublicState =
        json_enum!(PublicState, "public_state", normalise_public_state(&raw))?;
    Ok(Some(parsed))
}
/// Assemble the full `ExecutionInfo` view for one execution.
///
/// A single query joins the core row with two LATERAL lookups into
/// `ff_attempt`: the outcome of the *current* attempt (the one matching
/// `ec.attempt_index`) and the start time of the earliest attempt that ever
/// started. Returns `Ok(None)` when the execution does not exist.
///
/// Each state column is first normalised (legacy value aliases) and then
/// decoded via `json_enum!`, so an unrecognised stored value surfaces as a
/// `Corruption` error rather than being silently coerced.
pub(super) async fn read_execution_info_impl(
    pool: &PgPool,
    _partition_config: &PartitionConfig,
    id: &ExecutionId,
) -> Result<Option<ExecutionInfo>, EngineError> {
    let partition_key: i16 = id.partition() as i16;
    let execution_id = eid_uuid(id);
    let row = sqlx::query(
        r#"
        SELECT ec.flow_id,
               ec.lane_id,
               ec.priority,
               ec.lifecycle_phase,
               ec.ownership_state,
               ec.eligibility_state,
               ec.public_state,
               ec.attempt_state,
               ec.blocking_reason,
               ec.attempt_index,
               ec.created_at_ms,
               ec.terminal_at_ms,
               ec.raw_fields,
               cur.outcome AS attempt_outcome,
               first_att.started_at_ms AS first_started_at_ms
        FROM ff_exec_core ec
        LEFT JOIN LATERAL (
            SELECT outcome
            FROM ff_attempt
            WHERE partition_key = ec.partition_key
              AND execution_id = ec.execution_id
              AND attempt_index = ec.attempt_index
        ) cur ON TRUE
        LEFT JOIN LATERAL (
            SELECT started_at_ms
            FROM ff_attempt
            WHERE partition_key = ec.partition_key
              AND execution_id = ec.execution_id
              AND started_at_ms IS NOT NULL
            ORDER BY attempt_index ASC
            LIMIT 1
        ) first_att ON TRUE
        WHERE ec.partition_key = $1 AND ec.execution_id = $2
        "#,
    )
    .bind(partition_key)
    .bind(execution_id)
    .fetch_optional(pool)
    .await
    .map_err(map_sqlx_error)?;
    let Some(row) = row else {
        return Ok(None);
    };
    // Pull every column out up front; any type mismatch maps to an engine
    // error via map_sqlx_error.
    let flow_id_uuid: Option<Uuid> = row.try_get("flow_id").map_err(map_sqlx_error)?;
    let lane_id: String = row.try_get("lane_id").map_err(map_sqlx_error)?;
    let priority: i32 = row.try_get("priority").map_err(map_sqlx_error)?;
    let lifecycle_phase_raw: String =
        row.try_get("lifecycle_phase").map_err(map_sqlx_error)?;
    let ownership_state_raw: String =
        row.try_get("ownership_state").map_err(map_sqlx_error)?;
    let eligibility_state_raw: String =
        row.try_get("eligibility_state").map_err(map_sqlx_error)?;
    let public_state_raw: String = row.try_get("public_state").map_err(map_sqlx_error)?;
    let attempt_state_raw: String = row.try_get("attempt_state").map_err(map_sqlx_error)?;
    let blocking_reason_opt: Option<String> =
        row.try_get("blocking_reason").map_err(map_sqlx_error)?;
    let attempt_index: i32 = row.try_get("attempt_index").map_err(map_sqlx_error)?;
    let created_at_ms: i64 = row.try_get("created_at_ms").map_err(map_sqlx_error)?;
    let terminal_at_ms_opt: Option<i64> =
        row.try_get("terminal_at_ms").map_err(map_sqlx_error)?;
    let raw_fields: JsonValue = row.try_get("raw_fields").map_err(map_sqlx_error)?;
    let attempt_outcome_opt: Option<String> =
        row.try_get("attempt_outcome").map_err(map_sqlx_error)?;
    let first_started_at_ms_opt: Option<i64> =
        row.try_get("first_started_at_ms").map_err(map_sqlx_error)?;
    // Decode each state dimension: normalise legacy spellings, then parse.
    let lifecycle_phase: LifecyclePhase = json_enum!(
        LifecyclePhase,
        "lifecycle_phase",
        normalise_lifecycle_phase(&lifecycle_phase_raw)
    )?;
    let ownership_state: OwnershipState = json_enum!(
        OwnershipState,
        "ownership_state",
        normalise_ownership_state(&ownership_state_raw)
    )?;
    let eligibility_state: EligibilityState = json_enum!(
        EligibilityState,
        "eligibility_state",
        normalise_eligibility_state(&eligibility_state_raw)
    )?;
    let public_state: PublicState = json_enum!(
        PublicState,
        "public_state",
        normalise_public_state(&public_state_raw)
    )?;
    let attempt_state: AttemptState = json_enum!(
        AttemptState,
        "attempt_state",
        normalise_attempt_state(&attempt_state_raw)
    )?;
    // NULL or empty-string blocking_reason both mean "none".
    let blocking_reason: BlockingReason = match blocking_reason_opt
        .as_deref()
        .filter(|s| !s.is_empty())
    {
        None => BlockingReason::None,
        Some(raw) => json_enum!(BlockingReason, "blocking_reason", raw)?,
    };
    let terminal_outcome = derive_terminal_outcome(
        normalise_lifecycle_phase(&lifecycle_phase_raw),
        &lifecycle_phase_raw,
        attempt_outcome_opt.as_deref(),
    );
    let state_vector = StateVector {
        lifecycle_phase,
        ownership_state,
        eligibility_state,
        blocking_reason,
        terminal_outcome,
        attempt_state,
        public_state,
    };
    // Optional string metadata from raw_fields; absent keys leave the
    // corresponding ExecutionInfo fields as empty strings.
    let mut namespace = String::new();
    let mut execution_kind = String::new();
    let mut blocking_detail = String::new();
    if let JsonValue::Object(map) = &raw_fields {
        if let Some(JsonValue::String(s)) = map.get("namespace") {
            namespace = s.clone();
        }
        if let Some(JsonValue::String(s)) = map.get("execution_kind") {
            execution_kind = s.clone();
        }
        if let Some(JsonValue::String(s)) = map.get("blocking_detail") {
            blocking_detail = s.clone();
        }
    }
    // NOTE(review): here flow_id is rendered as a bare UUID string, whereas
    // describe_execution_impl renders it as `{fp:N}:<uuid>` — confirm which
    // representation ExecutionInfo.flow_id is expected to carry.
    let flow_id = flow_id_uuid.map(|fid| fid.to_string());
    Ok(Some(ExecutionInfo {
        execution_id: id.clone(),
        namespace,
        lane_id,
        priority,
        execution_kind,
        state_vector,
        public_state,
        created_at: created_at_ms.to_string(),
        started_at: first_started_at_ms_opt.map(|v| v.to_string()),
        completed_at: terminal_at_ms_opt.map(|v| v.to_string()),
        // attempt_index is clamped at zero before the unsigned conversion.
        current_attempt_index: attempt_index.max(0) as u32,
        flow_id,
        blocking_detail,
    }))
}
/// Fetch the stored `result` payload bytes for an execution.
///
/// Returns `Ok(None)` both when the row does not exist and when the row
/// exists with a NULL result — callers cannot distinguish the two cases
/// through this function.
pub(super) async fn get_execution_result_impl(
    pool: &PgPool,
    _partition_config: &PartitionConfig,
    id: &ExecutionId,
) -> Result<Option<Vec<u8>>, EngineError> {
    let partition_key: i16 = id.partition() as i16;
    let execution_id = eid_uuid(id);
    let row: Option<(Option<Vec<u8>>,)> = sqlx::query_as(
        r#"
        SELECT result
        FROM ff_exec_core
        WHERE partition_key = $1 AND execution_id = $2
        "#,
    )
    .bind(partition_key)
    .bind(execution_id)
    .fetch_optional(pool)
    .await
    .map_err(map_sqlx_error)?;
    // Missing row and NULL column both collapse to None.
    Ok(row.and_then(|(payload,)| payload))
}