use ff_core::backend::{
BackendTag, CapabilitySet, ClaimPolicy, FailOutcome, FailureClass, FailureReason, Handle,
HandleKind, LeaseRenewal, ReclaimToken,
};
use ff_core::caps::{matches as caps_matches, CapabilityRequirement};
use ff_core::engine_error::{ContentionKind, EngineError};
use ff_core::handle_codec::{decode as decode_opaque, encode as encode_opaque, HandlePayload};
use ff_core::types::{
AttemptId, AttemptIndex, ExecutionId, LaneId, LeaseEpoch, LeaseId, TimestampMs,
};
use sqlx::{PgPool, Row};
use std::time::{SystemTime, UNIX_EPOCH};
use uuid::Uuid;
use crate::error::map_sqlx_error;
/// Wall-clock time in milliseconds since the Unix epoch.
///
/// A system clock set before 1970 yields `0`; a millisecond count too large
/// for `i64` saturates at `i64::MAX`.
fn now_ms() -> i64 {
    let since_epoch_ms = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis(),
        Err(_) => 0,
    };
    i64::try_from(since_epoch_ms).unwrap_or(i64::MAX)
}
/// Split a Postgres-backend execution id of the form `{fp:<partition>}:<uuid>`
/// into its partition key and execution UUID.
///
/// # Errors
///
/// Returns `EngineError::Validation` (`InvalidInput`) when the `{fp:` prefix
/// or the `}:` separator is missing, when the partition index is not a
/// non-negative integer that fits the `i16` partition column, or when the
/// UUID part does not parse.
fn split_exec_id(eid: &ExecutionId) -> Result<(i16, Uuid), EngineError> {
    let s = eid.as_str();
    let rest = s.strip_prefix("{fp:").ok_or_else(|| EngineError::Validation {
        kind: ff_core::engine_error::ValidationKind::InvalidInput,
        detail: format!("execution_id missing `{{fp:` prefix: {s}"),
    })?;
    let close = rest.find("}:").ok_or_else(|| EngineError::Validation {
        kind: ff_core::engine_error::ValidationKind::InvalidInput,
        detail: format!("execution_id missing `}}:`: {s}"),
    })?;
    // Partition keys are never negative (claims scan partitions from 0
    // upward), so a `-N` index is malformed even though it fits in `i16`.
    let part = rest[..close]
        .parse::<i16>()
        .ok()
        .filter(|p| *p >= 0)
        .ok_or_else(|| EngineError::Validation {
            kind: ff_core::engine_error::ValidationKind::InvalidInput,
            detail: format!("execution_id partition index not u16: {s}"),
        })?;
    // `}:` is two ASCII bytes, so `close + 2` is always a char boundary.
    let uuid = Uuid::parse_str(&rest[close + 2..]).map_err(|_| EngineError::Validation {
        kind: ff_core::engine_error::ValidationKind::InvalidInput,
        detail: format!("execution_id UUID invalid: {s}"),
    })?;
    Ok((part, uuid))
}
/// Decode a handle's opaque payload, insisting it was minted by this backend.
///
/// Both the outer `Handle::backend` tag and the tag embedded in the encoded
/// payload must be `BackendTag::Postgres`.
///
/// # Errors
///
/// `EngineError::Validation` (`Corruption`) on either tag mismatch; errors
/// from `handle_codec::decode` are propagated unchanged.
fn decode_handle(handle: &Handle) -> Result<HandlePayload, EngineError> {
    let corruption = |detail: String| EngineError::Validation {
        kind: ff_core::engine_error::ValidationKind::Corruption,
        detail,
    };
    if handle.backend != BackendTag::Postgres {
        return Err(corruption(format!(
            "handle minted by {:?} passed to Postgres backend",
            handle.backend
        )));
    }
    let inner = decode_opaque(&handle.opaque)?;
    if inner.tag == BackendTag::Postgres {
        Ok(inner.payload)
    } else {
        Err(corruption(format!("inner handle tag mismatch: {:?}", inner.tag)))
    }
}
/// Wrap `payload` in a Postgres-tagged [`Handle`] of the given `kind`.
///
/// The payload is encoded with `BackendTag::Postgres` embedded, and the outer
/// handle carries the same tag; `decode_handle` checks both on the way back in.
fn mint_handle(payload: HandlePayload, kind: HandleKind) -> Handle {
    Handle::new(
        BackendTag::Postgres,
        kind,
        encode_opaque(BackendTag::Postgres, &payload),
    )
}
pub(crate) async fn claim(
pool: &PgPool,
lane: &LaneId,
capabilities: &CapabilitySet,
policy: &ClaimPolicy,
) -> Result<Option<Handle>, EngineError> {
let total_partitions: i16 = 256;
for part in 0..total_partitions {
match try_claim_in_partition(pool, part, lane, capabilities, policy).await? {
Some(h) => return Ok(Some(h)),
None => continue,
}
}
Ok(None)
}
/// Attempt to claim one runnable execution from a single partition.
///
/// Within one transaction this:
/// 1. row-locks (`FOR UPDATE SKIP LOCKED`) up to `CLAIM_SCAN_LIMIT`
///    executions on `lane` that are `runnable` / `eligible_now`, ordered by
///    priority (desc) then creation time (asc);
/// 2. picks the first whose `required_capabilities` are satisfied by
///    `capabilities`;
/// 3. upserts its `ff_attempt` row for the current `attempt_index` (epoch 1
///    on first insert, previous epoch + 1 on conflict) and arms the lease for
///    `policy.lease_ttl_ms`;
/// 4. marks the execution `active` / `leased` / `running_attempt`.
///
/// Returns `Ok(None)` (after rolling back) when nothing in the partition is
/// both unlocked and capability-compatible.
///
/// A small batch is scanned instead of only the head row. This way a worker
/// that cannot run the top-priority execution (e.g. it lacks a capability)
/// still picks up compatible work queued behind it, rather than skipping the
/// whole partition. The trade-off is that the other scanned rows stay locked,
/// and so are skipped by concurrent claimers, until this short transaction
/// ends.
async fn try_claim_in_partition(
    pool: &PgPool,
    part: i16,
    lane: &LaneId,
    capabilities: &CapabilitySet,
    policy: &ClaimPolicy,
) -> Result<Option<Handle>, EngineError> {
    // Upper bound on candidate rows examined (and briefly locked) per attempt.
    const CLAIM_SCAN_LIMIT: i64 = 16;

    let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
    let candidates = sqlx::query(
        r#"
SELECT execution_id, required_capabilities, attempt_index
FROM ff_exec_core
WHERE partition_key = $1
AND lane_id = $2
AND lifecycle_phase = 'runnable'
AND eligibility_state = 'eligible_now'
ORDER BY priority DESC, created_at_ms ASC
LIMIT $3
FOR UPDATE SKIP LOCKED
"#,
    )
    .bind(part)
    .bind(lane.as_str())
    .bind(CLAIM_SCAN_LIMIT)
    .fetch_all(&mut *tx)
    .await
    .map_err(map_sqlx_error)?;

    // Highest-priority candidate this worker can run; incompatible rows ahead
    // of it no longer hide it.
    let mut chosen: Option<(Uuid, i32)> = None;
    for row in &candidates {
        let required_caps: Vec<String> = row
            .try_get::<Vec<String>, _>("required_capabilities")
            .map_err(map_sqlx_error)?;
        if caps_matches(&CapabilityRequirement::new(required_caps), capabilities) {
            let exec_uuid: Uuid = row.try_get("execution_id").map_err(map_sqlx_error)?;
            let attempt_index_i: i32 = row.try_get("attempt_index").map_err(map_sqlx_error)?;
            chosen = Some((exec_uuid, attempt_index_i));
            break;
        }
    }
    let Some((exec_uuid, attempt_index_i)) = chosen else {
        tx.rollback().await.map_err(map_sqlx_error)?;
        return Ok(None);
    };

    let attempt_index = AttemptIndex::new(u32::try_from(attempt_index_i.max(0)).unwrap_or(0));
    let now = now_ms();
    let expires = now.saturating_add(i64::from(policy.lease_ttl_ms));
    // `RETURNING` reports the epoch from whichever branch ran, so no follow-up
    // SELECT is needed.
    let epoch_row = sqlx::query(
        r#"
INSERT INTO ff_attempt (
partition_key, execution_id, attempt_index,
worker_id, worker_instance_id,
lease_epoch, lease_expires_at_ms, started_at_ms
) VALUES ($1, $2, $3, $4, $5, 1, $6, $7)
ON CONFLICT (partition_key, execution_id, attempt_index)
DO UPDATE SET
worker_id = EXCLUDED.worker_id,
worker_instance_id = EXCLUDED.worker_instance_id,
lease_epoch = ff_attempt.lease_epoch + 1,
lease_expires_at_ms = EXCLUDED.lease_expires_at_ms,
started_at_ms = EXCLUDED.started_at_ms,
outcome = NULL
RETURNING lease_epoch
"#,
    )
    .bind(part)
    .bind(exec_uuid)
    .bind(attempt_index_i)
    .bind(policy.worker_id.as_str())
    .bind(policy.worker_instance_id.as_str())
    .bind(expires)
    .bind(now)
    .fetch_one(&mut *tx)
    .await
    .map_err(map_sqlx_error)?;
    let epoch_i: i64 = epoch_row.try_get("lease_epoch").map_err(map_sqlx_error)?;

    sqlx::query(
        r#"
UPDATE ff_exec_core
SET lifecycle_phase = 'active',
ownership_state = 'leased',
eligibility_state = 'not_applicable',
attempt_state = 'running_attempt'
WHERE partition_key = $1 AND execution_id = $2
"#,
    )
    .bind(part)
    .bind(exec_uuid)
    .execute(&mut *tx)
    .await
    .map_err(map_sqlx_error)?;
    tx.commit().await.map_err(map_sqlx_error)?;

    let exec_id = ExecutionId::parse(&format!("{{fp:{part}}}:{exec_uuid}")).map_err(|e| {
        EngineError::Validation {
            kind: ff_core::engine_error::ValidationKind::InvalidInput,
            detail: format!("reassembling exec id: {e}"),
        }
    })?;
    let payload = HandlePayload::new(
        exec_id,
        attempt_index,
        AttemptId::new(),
        LeaseId::new(),
        LeaseEpoch(u64::try_from(epoch_i).unwrap_or(1)),
        u64::from(policy.lease_ttl_ms),
        lane.clone(),
        policy.worker_instance_id.clone(),
    );
    Ok(Some(mint_handle(payload, HandleKind::Fresh)))
}
/// Re-acquire the lease on an execution's latest attempt using a
/// `ReclaimToken`, minting a `HandleKind::Resumed` handle.
///
/// The latest attempt (highest `attempt_index`) is row-locked. It is
/// reclaimable when its lease is unset or its expiry time has passed. On
/// success, the following happens:
/// * the attempt's `lease_epoch` is bumped, fencing out any handle held by
///   the previous owner;
/// * the lease is re-armed for `token.lease_ttl_ms`;
/// * the execution is marked `active` / `leased`.
///
/// Returns `Ok(None)` without changing anything in either of these cases:
/// * the current lease is still live;
/// * the execution is already `terminal`. Resurrecting a finished execution
///   would erase the attempt's recorded outcome and let it run (and emit a
///   completion event) a second time.
///
/// # Errors
///
/// * `EngineError::Validation` if the grant's execution id is malformed.
/// * `EngineError::NotFound { entity: "attempt" }` if the execution has no
///   attempt rows.
pub(crate) async fn claim_from_reclaim(
    pool: &PgPool,
    token: ReclaimToken,
) -> Result<Option<Handle>, EngineError> {
    let eid = &token.grant.execution_id;
    let (part, exec_uuid) = split_exec_id(eid)?;
    let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
    // The attempt row is locked before `ff_exec_core` is touched, matching the
    // order used by every `fence_check` caller.
    let row = sqlx::query(
        r#"
SELECT attempt_index, lease_epoch, lease_expires_at_ms
FROM ff_attempt
WHERE partition_key = $1 AND execution_id = $2
ORDER BY attempt_index DESC
FOR UPDATE
LIMIT 1
"#,
    )
    .bind(part)
    .bind(exec_uuid)
    .fetch_optional(&mut *tx)
    .await
    .map_err(map_sqlx_error)?;
    let Some(row) = row else {
        tx.rollback().await.map_err(map_sqlx_error)?;
        return Err(EngineError::NotFound { entity: "attempt" });
    };
    let attempt_index_i: i32 = row.try_get("attempt_index").map_err(map_sqlx_error)?;
    let current_epoch: i64 = row.try_get("lease_epoch").map_err(map_sqlx_error)?;
    let expires_at: Option<i64> = row
        .try_get::<Option<i64>, _>("lease_expires_at_ms")
        .map_err(map_sqlx_error)?;
    let now = now_ms();
    let live = matches!(expires_at, Some(exp) if exp > now);
    if live {
        tx.rollback().await.map_err(map_sqlx_error)?;
        return Ok(None);
    }

    // `complete`/`fail` also clear the lease, so a NULL expiry alone does not
    // mean "abandoned". This check runs as a fresh statement after the
    // attempt lock is held, and terminal transitions take that same lock first
    // (via `fence_check`), so any committed terminal transition is visible
    // here.
    let already_terminal = sqlx::query(
        r#"
SELECT 1 FROM ff_exec_core
WHERE partition_key = $1 AND execution_id = $2 AND lifecycle_phase = 'terminal'
"#,
    )
    .bind(part)
    .bind(exec_uuid)
    .fetch_optional(&mut *tx)
    .await
    .map_err(map_sqlx_error)?
    .is_some();
    if already_terminal {
        tx.rollback().await.map_err(map_sqlx_error)?;
        return Ok(None);
    }
    // NOTE(review): an attempt closed by a retryable `fail` (outcome 'retry',
    // exec `attempt_index` already advanced) also has a NULL lease and passes
    // both checks above; confirm whether reclaim should be restricted further.

    let lease_ttl_ms = i64::from(token.lease_ttl_ms);
    let new_expires = now.saturating_add(lease_ttl_ms);
    sqlx::query(
        r#"
UPDATE ff_attempt
SET worker_id = $1,
worker_instance_id = $2,
lease_epoch = lease_epoch + 1,
lease_expires_at_ms = $3,
started_at_ms = $4,
outcome = NULL
WHERE partition_key = $5 AND execution_id = $6 AND attempt_index = $7
"#,
    )
    .bind(token.worker_id.as_str())
    .bind(token.worker_instance_id.as_str())
    .bind(new_expires)
    .bind(now)
    .bind(part)
    .bind(exec_uuid)
    .bind(attempt_index_i)
    .execute(&mut *tx)
    .await
    .map_err(map_sqlx_error)?;
    sqlx::query(
        r#"
UPDATE ff_exec_core
SET lifecycle_phase = 'active',
ownership_state = 'leased',
eligibility_state = 'not_applicable',
attempt_state = 'running_attempt'
WHERE partition_key = $1 AND execution_id = $2
"#,
    )
    .bind(part)
    .bind(exec_uuid)
    .execute(&mut *tx)
    .await
    .map_err(map_sqlx_error)?;
    tx.commit().await.map_err(map_sqlx_error)?;

    // The row was locked throughout, so the stored epoch is exactly one more
    // than what was read.
    let new_epoch = current_epoch.saturating_add(1);
    let payload = HandlePayload::new(
        eid.clone(),
        AttemptIndex::new(u32::try_from(attempt_index_i.max(0)).unwrap_or(0)),
        AttemptId::new(),
        LeaseId::new(),
        LeaseEpoch(u64::try_from(new_epoch).unwrap_or(0)),
        u64::from(token.lease_ttl_ms),
        token.grant.lane_id.clone(),
        token.worker_instance_id.clone(),
    );
    Ok(Some(mint_handle(payload, HandleKind::Resumed)))
}
/// Verify, under a row lock, that the caller's handle still owns the attempt.
///
/// Locks the `ff_attempt` row (`FOR UPDATE`) for the rest of `tx`. It then
/// requires both of the following:
/// * its `lease_epoch` equals `expected_epoch`, i.e. nobody has reclaimed or
///   re-claimed the attempt since the handle was minted;
/// * its lease is still armed (`lease_expires_at_ms IS NOT NULL`).
///
/// The second condition is needed because `complete`, `fail`, `delay` and
/// `wait_children` clear the lease without bumping the epoch. Without it, a
/// handle reused after one of those calls would still pass the fence. It
/// could then:
/// * re-arm a finished attempt's lease via `renew`;
/// * emit a second completion event via `complete`;
/// * move a terminal execution back to `runnable` via `fail`.
///
/// A lease whose expiry time has passed is still accepted, as long as nobody
/// has reclaimed it (which would have bumped the epoch).
///
/// # Errors
///
/// * `EngineError::NotFound { entity: "attempt" }` if the attempt row does
///   not exist.
/// * `EngineError::Contention(ContentionKind::LeaseConflict)` if the epoch
///   differs or the attempt's lease has already been released.
async fn fence_check<'c>(
    tx: &mut sqlx::Transaction<'c, sqlx::Postgres>,
    part: i16,
    exec_uuid: Uuid,
    attempt_index: i32,
    expected_epoch: u64,
) -> Result<(), EngineError> {
    let row = sqlx::query(
        r#"
SELECT lease_epoch, lease_expires_at_ms FROM ff_attempt
WHERE partition_key = $1 AND execution_id = $2 AND attempt_index = $3
FOR UPDATE
"#,
    )
    .bind(part)
    .bind(exec_uuid)
    .bind(attempt_index)
    .fetch_optional(&mut **tx)
    .await
    .map_err(map_sqlx_error)?;
    let Some(row) = row else {
        return Err(EngineError::NotFound { entity: "attempt" });
    };
    let epoch_i: i64 = row.try_get("lease_epoch").map_err(map_sqlx_error)?;
    let lease_expires_at: Option<i64> = row
        .try_get::<Option<i64>, _>("lease_expires_at_ms")
        .map_err(map_sqlx_error)?;
    let observed = u64::try_from(epoch_i).unwrap_or(0);
    if observed != expected_epoch || lease_expires_at.is_none() {
        return Err(EngineError::Contention(ContentionKind::LeaseConflict));
    }
    Ok(())
}
/// Extend the lease held by `handle` by its TTL, measured from now.
///
/// Runs `fence_check` and then rewrites only `lease_expires_at_ms`, both in
/// the same transaction. The returned renewal carries the new expiry and the
/// handle's (unchanged) epoch.
///
/// # Errors
///
/// Handle-decoding and execution-id errors, fence failures from
/// `fence_check`, and database errors mapped through `map_sqlx_error`.
pub(crate) async fn renew(
    pool: &PgPool,
    handle: &Handle,
) -> Result<LeaseRenewal, EngineError> {
    let decoded = decode_handle(handle)?;
    let (part, exec_uuid) = split_exec_id(&decoded.execution_id)?;
    let attempt_idx = i32::try_from(decoded.attempt_index.0).unwrap_or(0);
    let held_epoch = decoded.lease_epoch.0;
    let ttl_ms = i64::try_from(decoded.lease_ttl_ms).unwrap_or(0);

    let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
    fence_check(&mut tx, part, exec_uuid, attempt_idx, held_epoch).await?;

    let renewed_until = now_ms().saturating_add(ttl_ms);
    let extend_lease = sqlx::query(
        r#"
UPDATE ff_attempt
SET lease_expires_at_ms = $1
WHERE partition_key = $2 AND execution_id = $3 AND attempt_index = $4
"#,
    )
    .bind(renewed_until)
    .bind(part)
    .bind(exec_uuid)
    .bind(attempt_idx);
    extend_lease.execute(&mut *tx).await.map_err(map_sqlx_error)?;
    tx.commit().await.map_err(map_sqlx_error)?;

    let expires_at_ms = u64::try_from(renewed_until).unwrap_or(0);
    Ok(LeaseRenewal::new(expires_at_ms, held_epoch))
}
/// Record progress for the attempt held by `handle`.
///
/// After `fence_check`, merges a JSON object into the execution's
/// `raw_fields`. The object contains `progress_pct` and/or
/// `progress_message`, one key for each argument that is `Some`. Values for
/// those keys overwrite previous ones; other keys are kept.
///
/// `raw_fields` is coalesced to `{}` before merging. In Postgres,
/// `NULL || jsonb` is `NULL`, so without the coalesce a row whose
/// `raw_fields` is NULL would silently drop every progress update.
///
/// # Errors
///
/// Handle-decoding and execution-id errors, fence failures from
/// `fence_check`, and database errors mapped through `map_sqlx_error`.
pub(crate) async fn progress(
    pool: &PgPool,
    handle: &Handle,
    percent: Option<u8>,
    message: Option<String>,
) -> Result<(), EngineError> {
    let payload = decode_handle(handle)?;
    let (part, exec_uuid) = split_exec_id(&payload.execution_id)?;
    let attempt_index = i32::try_from(payload.attempt_index.0).unwrap_or(0);
    let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
    fence_check(&mut tx, part, exec_uuid, attempt_index, payload.lease_epoch.0).await?;
    let mut patch = serde_json::Map::new();
    if let Some(pct) = percent {
        patch.insert("progress_pct".into(), serde_json::Value::from(pct));
    }
    if let Some(msg) = message {
        patch.insert("progress_message".into(), serde_json::Value::from(msg));
    }
    let patch_val = serde_json::Value::Object(patch);
    sqlx::query(
        r#"
UPDATE ff_exec_core
SET raw_fields = COALESCE(raw_fields, '{}'::jsonb) || $1::jsonb
WHERE partition_key = $2 AND execution_id = $3
"#,
    )
    .bind(patch_val)
    .bind(part)
    .bind(exec_uuid)
    .execute(&mut *tx)
    .await
    .map_err(map_sqlx_error)?;
    tx.commit().await.map_err(map_sqlx_error)?;
    Ok(())
}
/// Mark the attempt held by `handle` as successful and the execution as
/// terminal, all in one transaction.
///
/// After `fence_check`, three writes happen:
/// 1. The attempt gets `outcome = 'success'` and `terminal_at_ms`, and its
///    lease is cleared.
/// 2. The execution moves to `terminal` / `unowned`, with `payload_bytes`
///    (or NULL) stored in `result`.
/// 3. A `'success'` row is appended to `ff_completion_event`, copying the
///    execution's `flow_id`; `namespace` and `instance_tag` are left NULL.
///
/// # Errors
///
/// Handle-decoding and execution-id errors, fence failures from
/// `fence_check`, and database errors mapped through `map_sqlx_error`.
pub(crate) async fn complete(
    pool: &PgPool,
    handle: &Handle,
    payload_bytes: Option<Vec<u8>>,
) -> Result<(), EngineError> {
    let decoded = decode_handle(handle)?;
    let (part, exec_uuid) = split_exec_id(&decoded.execution_id)?;
    let attempt_idx = i32::try_from(decoded.attempt_index.0).unwrap_or(0);
    let held_epoch = decoded.lease_epoch.0;

    let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
    fence_check(&mut tx, part, exec_uuid, attempt_idx, held_epoch).await?;
    let finished_at = now_ms();
    let result_bytes: Option<&[u8]> = payload_bytes.as_deref();

    // 1. Close out the attempt.
    let close_attempt = sqlx::query(
        r#"
UPDATE ff_attempt
SET terminal_at_ms = $1,
outcome = 'success',
lease_expires_at_ms = NULL
WHERE partition_key = $2 AND execution_id = $3 AND attempt_index = $4
"#,
    )
    .bind(finished_at)
    .bind(part)
    .bind(exec_uuid)
    .bind(attempt_idx);
    close_attempt.execute(&mut *tx).await.map_err(map_sqlx_error)?;

    // 2. Make the execution terminal and store its result.
    let finish_exec = sqlx::query(
        r#"
UPDATE ff_exec_core
SET lifecycle_phase = 'terminal',
ownership_state = 'unowned',
eligibility_state = 'not_applicable',
attempt_state = 'attempt_terminal',
terminal_at_ms = $1,
result = $2
WHERE partition_key = $3 AND execution_id = $4
"#,
    )
    .bind(finished_at)
    .bind(result_bytes)
    .bind(part)
    .bind(exec_uuid);
    finish_exec.execute(&mut *tx).await.map_err(map_sqlx_error)?;

    // 3. Publish the completion event.
    let record_event = sqlx::query(
        r#"
INSERT INTO ff_completion_event (
partition_key, execution_id, flow_id, outcome,
namespace, instance_tag, occurred_at_ms
)
SELECT partition_key, execution_id, flow_id, 'success',
NULL, NULL, $3
FROM ff_exec_core
WHERE partition_key = $1 AND execution_id = $2
"#,
    )
    .bind(part)
    .bind(exec_uuid)
    .bind(finished_at);
    record_event.execute(&mut *tx).await.map_err(map_sqlx_error)?;

    tx.commit().await.map_err(map_sqlx_error)?;
    Ok(())
}
/// Record a failure of the attempt held by `handle`.
///
/// After `fence_check`, the outcome depends on `classification`:
/// * `Transient` / `InfraCrash`: the attempt is closed with outcome
///   `'retry'`. The execution goes back to `runnable` / `eligible_now` with
///   `attempt_index + 1`, so it is eligible again immediately. Returns
///   `FailOutcome::RetryScheduled` with `delay_until` = now.
/// * any other class: the attempt is closed with outcome `'failed'`. The
///   execution becomes `terminal`, and a `'failed'` row is appended to
///   `ff_completion_event`. Returns `FailOutcome::TerminalFailed`.
///
/// In both cases `reason.message` is merged into `raw_fields` as
/// `last_failure_message`. `raw_fields` is coalesced to `{}` first, because
/// in Postgres `NULL || jsonb` is `NULL` and the message would otherwise be
/// lost.
///
/// # Errors
///
/// Handle-decoding and execution-id errors, fence failures from
/// `fence_check`, and database errors mapped through `map_sqlx_error`.
pub(crate) async fn fail(
    pool: &PgPool,
    handle: &Handle,
    reason: FailureReason,
    classification: FailureClass,
) -> Result<FailOutcome, EngineError> {
    let payload = decode_handle(handle)?;
    let (part, exec_uuid) = split_exec_id(&payload.execution_id)?;
    let attempt_index = i32::try_from(payload.attempt_index.0).unwrap_or(0);
    let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
    fence_check(&mut tx, part, exec_uuid, attempt_index, payload.lease_epoch.0).await?;
    let now = now_ms();
    // Only transient and infrastructure failures are retried; every other
    // class is terminal.
    let retryable = matches!(
        classification,
        FailureClass::Transient | FailureClass::InfraCrash
    );
    if retryable {
        sqlx::query(
            r#"
UPDATE ff_attempt
SET terminal_at_ms = $1,
outcome = 'retry',
lease_expires_at_ms = NULL
WHERE partition_key = $2 AND execution_id = $3 AND attempt_index = $4
"#,
        )
        .bind(now)
        .bind(part)
        .bind(exec_uuid)
        .bind(attempt_index)
        .execute(&mut *tx)
        .await
        .map_err(map_sqlx_error)?;
        sqlx::query(
            r#"
UPDATE ff_exec_core
SET lifecycle_phase = 'runnable',
ownership_state = 'unowned',
eligibility_state = 'eligible_now',
attempt_state = 'pending_retry_attempt',
attempt_index = attempt_index + 1,
raw_fields = COALESCE(raw_fields, '{}'::jsonb) || jsonb_build_object('last_failure_message', $1::text)
WHERE partition_key = $2 AND execution_id = $3
"#,
        )
        .bind(&reason.message)
        .bind(part)
        .bind(exec_uuid)
        .execute(&mut *tx)
        .await
        .map_err(map_sqlx_error)?;
        tx.commit().await.map_err(map_sqlx_error)?;
        Ok(FailOutcome::RetryScheduled {
            delay_until: TimestampMs::from_millis(now),
        })
    } else {
        sqlx::query(
            r#"
UPDATE ff_attempt
SET terminal_at_ms = $1,
outcome = 'failed',
lease_expires_at_ms = NULL
WHERE partition_key = $2 AND execution_id = $3 AND attempt_index = $4
"#,
        )
        .bind(now)
        .bind(part)
        .bind(exec_uuid)
        .bind(attempt_index)
        .execute(&mut *tx)
        .await
        .map_err(map_sqlx_error)?;
        sqlx::query(
            r#"
UPDATE ff_exec_core
SET lifecycle_phase = 'terminal',
ownership_state = 'unowned',
eligibility_state = 'not_applicable',
attempt_state = 'attempt_terminal',
terminal_at_ms = $1,
raw_fields = COALESCE(raw_fields, '{}'::jsonb) || jsonb_build_object('last_failure_message', $2::text)
WHERE partition_key = $3 AND execution_id = $4
"#,
        )
        .bind(now)
        .bind(&reason.message)
        .bind(part)
        .bind(exec_uuid)
        .execute(&mut *tx)
        .await
        .map_err(map_sqlx_error)?;
        sqlx::query(
            r#"
INSERT INTO ff_completion_event (
partition_key, execution_id, flow_id, outcome,
namespace, instance_tag, occurred_at_ms
)
SELECT partition_key, execution_id, flow_id, 'failed',
NULL, NULL, $3
FROM ff_exec_core
WHERE partition_key = $1 AND execution_id = $2
"#,
        )
        .bind(part)
        .bind(exec_uuid)
        .bind(now)
        .execute(&mut *tx)
        .await
        .map_err(map_sqlx_error)?;
        tx.commit().await.map_err(map_sqlx_error)?;
        Ok(FailOutcome::TerminalFailed)
    }
}
/// Park the execution until `delay_until`, releasing the current lease.
///
/// After `fence_check`, two writes happen:
/// * the attempt is marked `outcome = 'interrupted'` and its lease is
///   cleared;
/// * the execution returns to `runnable` / `unowned` with eligibility
///   `not_eligible_until_time`, and `delay_until` is written to
///   `deadline_at_ms`.
///
/// NOTE(review): the wake-up time lives in `deadline_at_ms`; confirm that is
/// the column whatever promotes delayed executions reads.
///
/// # Errors
///
/// Handle-decoding and execution-id errors, fence failures from
/// `fence_check`, and database errors mapped through `map_sqlx_error`.
pub(crate) async fn delay(
    pool: &PgPool,
    handle: &Handle,
    delay_until: TimestampMs,
) -> Result<(), EngineError> {
    let decoded = decode_handle(handle)?;
    let (part, exec_uuid) = split_exec_id(&decoded.execution_id)?;
    let attempt_idx = i32::try_from(decoded.attempt_index.0).unwrap_or(0);
    let held_epoch = decoded.lease_epoch.0;

    let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
    fence_check(&mut tx, part, exec_uuid, attempt_idx, held_epoch).await?;

    // Release the lease and record why the attempt stopped.
    let interrupt_attempt = sqlx::query(
        r#"
UPDATE ff_attempt
SET outcome = 'interrupted',
lease_expires_at_ms = NULL
WHERE partition_key = $1 AND execution_id = $2 AND attempt_index = $3
"#,
    )
    .bind(part)
    .bind(exec_uuid)
    .bind(attempt_idx);
    interrupt_attempt.execute(&mut *tx).await.map_err(map_sqlx_error)?;

    // Hand the execution back to the scheduler, gated on the wake-up time.
    let wake_at_ms = delay_until.0;
    let park_exec = sqlx::query(
        r#"
UPDATE ff_exec_core
SET lifecycle_phase = 'runnable',
ownership_state = 'unowned',
eligibility_state = 'not_eligible_until_time',
attempt_state = 'attempt_interrupted',
deadline_at_ms = $1
WHERE partition_key = $2 AND execution_id = $3
"#,
    )
    .bind(wake_at_ms)
    .bind(part)
    .bind(exec_uuid);
    park_exec.execute(&mut *tx).await.map_err(map_sqlx_error)?;

    tx.commit().await.map_err(map_sqlx_error)?;
    Ok(())
}
/// Suspend the execution until its children finish, releasing the lease.
///
/// After `fence_check`, two writes happen:
/// * the attempt is marked `outcome = 'waiting_children'` and its lease is
///   cleared;
/// * the execution returns to `runnable` / `unowned` with eligibility
///   `blocked_by_dependencies` and `blocking_reason = 'waiting_for_children'`.
///
/// # Errors
///
/// Handle-decoding and execution-id errors, fence failures from
/// `fence_check`, and database errors mapped through `map_sqlx_error`.
pub(crate) async fn wait_children(
    pool: &PgPool,
    handle: &Handle,
) -> Result<(), EngineError> {
    let decoded = decode_handle(handle)?;
    let (part, exec_uuid) = split_exec_id(&decoded.execution_id)?;
    let attempt_idx = i32::try_from(decoded.attempt_index.0).unwrap_or(0);
    let held_epoch = decoded.lease_epoch.0;

    let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
    fence_check(&mut tx, part, exec_uuid, attempt_idx, held_epoch).await?;

    // Release the lease and record that the attempt is waiting on children.
    let suspend_attempt = sqlx::query(
        r#"
UPDATE ff_attempt
SET outcome = 'waiting_children',
lease_expires_at_ms = NULL
WHERE partition_key = $1 AND execution_id = $2 AND attempt_index = $3
"#,
    )
    .bind(part)
    .bind(exec_uuid)
    .bind(attempt_idx);
    suspend_attempt.execute(&mut *tx).await.map_err(map_sqlx_error)?;

    // Park the execution behind its dependency gate.
    let block_exec = sqlx::query(
        r#"
UPDATE ff_exec_core
SET lifecycle_phase = 'runnable',
ownership_state = 'unowned',
eligibility_state = 'blocked_by_dependencies',
attempt_state = 'attempt_interrupted',
blocking_reason = 'waiting_for_children'
WHERE partition_key = $1 AND execution_id = $2
"#,
    )
    .bind(part)
    .bind(exec_uuid);
    block_exec.execute(&mut *tx).await.map_err(map_sqlx_error)?;

    tx.commit().await.map_err(map_sqlx_error)?;
    Ok(())
}