use std::collections::HashMap;
use std::time::{SystemTime, UNIX_EPOCH};
use ff_core::backend::{BackendTag, Handle, HandleKind, HandleOpaque, ResumeSignal, WaitpointHmac};
use ff_core::contracts::{
AdditionalWaitpointBinding, ClaimResumedExecutionArgs, ClaimResumedExecutionResult,
ClaimedResumedExecution, DeliverSignalArgs, DeliverSignalResult, ResumeCondition, SuspendArgs,
SuspendOutcome, SuspendOutcomeDetails, WaitpointBinding,
};
use ff_core::engine_error::{ContentionKind, EngineError, ValidationKind};
use ff_core::handle_codec::{encode as encode_opaque, HandlePayload};
use ff_core::partition::PartitionConfig;
use ff_core::types::{
AttemptId, AttemptIndex, ExecutionId, LeaseEpoch, LeaseFence, SignalId, SuspensionId,
TimestampMs, WaitpointId,
};
use serde_json::{json, Value as JsonValue};
use sqlx::{PgPool, Postgres, Transaction};
use uuid::Uuid;
use crate::error::map_sqlx_error;
use crate::lease_event;
use crate::signal::{hmac_sign, hmac_verify, is_retryable_serialization, SERIALIZABLE_RETRY_BUDGET};
use crate::signal_event;
use crate::suspend::evaluate;
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// A system clock set before 1970 yields `0`. A duration too large for `i64` saturates at
/// `i64::MAX` instead of silently wrapping through an `as` cast.
fn now_ms() -> i64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    i64::try_from(since_epoch.as_millis()).unwrap_or(i64::MAX)
}
/// Splits a Postgres-backend execution id of the form `{fp:<partition>}:<uuid>` into its
/// partition index (as stored in the `smallint` `partition_key` columns) and its UUID.
///
/// # Errors
/// Returns `EngineError::Validation { kind: InvalidInput, .. }` when:
/// - the `{fp:` prefix or the `}:` separator is missing;
/// - the partition index is not a non-negative integer that fits in `i16`;
/// - the trailing UUID does not parse.
fn split_exec_id(eid: &ExecutionId) -> Result<(i16, Uuid), EngineError> {
    let s = eid.as_str();
    let invalid = |detail: String| EngineError::Validation {
        kind: ValidationKind::InvalidInput,
        detail,
    };
    let rest = s
        .strip_prefix("{fp:")
        .ok_or_else(|| invalid(format!("execution_id missing `{{fp:` prefix: {s}")))?;
    let (part_str, uuid_str) = rest
        .split_once("}:")
        .ok_or_else(|| invalid(format!("execution_id missing `}}:`: {s}")))?;
    // Parse as u16 first so a negative index ("-1") is rejected, then narrow to the i16
    // used by the `smallint` partition columns.
    let part: i16 = part_str
        .parse::<u16>()
        .ok()
        .and_then(|p| i16::try_from(p).ok())
        .ok_or_else(|| {
            invalid(format!(
                "execution_id partition index not a u16 within i16 range: {s}"
            ))
        })?;
    let uuid = Uuid::parse_str(uuid_str)
        .map_err(|_| invalid(format!("execution_id UUID invalid: {s}")))?;
    Ok((part, uuid))
}
/// Unwraps a Postgres-backend [`Handle`] into the [`HandlePayload`] it carries.
///
/// The handle is rejected with `ValidationKind::HandleFromOtherBackend` unless both its
/// outer `backend` tag and the tag embedded in the encoded opaque bytes are
/// `BackendTag::Postgres`. Errors from `handle_codec::decode` propagate unchanged.
fn decode_handle(handle: &Handle) -> Result<HandlePayload, EngineError> {
    let foreign = |detail: String| EngineError::Validation {
        kind: ValidationKind::HandleFromOtherBackend,
        detail,
    };
    if handle.backend != BackendTag::Postgres {
        return Err(foreign(format!("expected Postgres, got {:?}", handle.backend)));
    }
    let decoded = ff_core::handle_codec::decode(&handle.opaque)?;
    if decoded.tag == BackendTag::Postgres {
        Ok(decoded.payload)
    } else {
        Err(foreign(format!("embedded tag {:?}", decoded.tag)))
    }
}
/// Parses a [`WaitpointId`] (via its `Display` form) into the UUID bound to the
/// `waitpoint_id` columns; a non-UUID id is reported as `ValidationKind::InvalidInput`.
fn wp_uuid(w: &WaitpointId) -> Result<Uuid, EngineError> {
    let rendered = w.to_string();
    match Uuid::parse_str(&rendered) {
        Ok(parsed) => Ok(parsed),
        Err(e) => Err(EngineError::Validation {
            kind: ValidationKind::InvalidInput,
            detail: format!("waitpoint_id not a UUID: {e}"),
        }),
    }
}
/// Parses a [`SuspensionId`] (via its `Display` form) into the UUID bound to the
/// `suspension_id` column; a non-UUID id is reported as `ValidationKind::InvalidInput`.
fn susp_uuid(s: &SuspensionId) -> Result<Uuid, EngineError> {
    let rendered = s.to_string();
    match Uuid::parse_str(&rendered) {
        Ok(parsed) => Ok(parsed),
        Err(e) => Err(EngineError::Validation {
            kind: ValidationKind::InvalidInput,
            detail: format!("suspension_id not a UUID: {e}"),
        }),
    }
}
/// Whether an error returned by a `run_serializable` transaction body is worth retrying
/// in a fresh transaction.
///
/// Only transport errors whose text names a Postgres serialization failure (`40001`) or
/// deadlock (`40P01`) qualify; those are the races SERIALIZABLE isolation expects callers
/// to retry.
///
/// `Contention(LeaseConflict)` is deliberately NOT retried. In this module it comes from
/// the lease-epoch fence, which compares the caller's handle against committed state: a
/// concurrent epoch bump already surfaces as `40001`, so a mismatch means the handle is
/// stale and re-running cannot succeed. Retrying it only burned the budget and replaced
/// the fence error with `RetryExhausted`.
fn is_retryable_engine(err: &EngineError) -> bool {
    const RETRYABLE_MARKERS: [&str; 4] = [
        "40001",
        "40P01",
        "serialization_failure",
        "deadlock_detected",
    ];
    match err {
        EngineError::Transport { source, .. } => {
            let text = source.to_string();
            RETRYABLE_MARKERS.iter().any(|marker| text.contains(marker))
        }
        _ => false,
    }
}
/// Runs `op` inside a fresh SERIALIZABLE transaction, re-running it from scratch when the
/// body or the commit fails with a retryable serialization error.
///
/// - A body error accepted by `is_retryable_engine`, or a commit error accepted by
///   `is_retryable_serialization`, triggers another attempt.
/// - Any other body error is returned after a best-effort rollback.
/// - Any other commit error is mapped through `map_sqlx_error`.
/// - Failing to begin the transaction or to set its isolation level returns immediately.
///
/// After `SERIALIZABLE_RETRY_BUDGET` attempts, the result is
/// `Contention(ContentionKind::RetryExhausted)`.
async fn run_serializable<T, F>(pool: &PgPool, mut op: F) -> Result<T, EngineError>
where
    T: Send,
    F: for<'a> FnMut(
            &'a mut Transaction<'_, Postgres>,
        ) -> std::pin::Pin<
            Box<dyn std::future::Future<Output = Result<T, EngineError>> + Send + 'a>,
        > + Send,
{
    for _attempt in 0..SERIALIZABLE_RETRY_BUDGET {
        let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
        sqlx::query("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE")
            .execute(&mut *tx)
            .await
            .map_err(map_sqlx_error)?;

        let attempt_outcome = op(&mut tx).await;
        let value = match attempt_outcome {
            Ok(value) => value,
            Err(err) => {
                // Rollback is best-effort: the transaction is abandoned either way.
                let _ = tx.rollback().await;
                if is_retryable_engine(&err) {
                    continue;
                }
                return Err(err);
            }
        };

        match tx.commit().await {
            Ok(()) => return Ok(value),
            // Serialization failure at commit: fall through to the next attempt.
            Err(err) if is_retryable_serialization(&err) => {}
            Err(err) => return Err(map_sqlx_error(err)),
        }
    }
    Err(EngineError::Contention(ContentionKind::RetryExhausted))
}
/// Serialises a [`SuspendOutcome`] into the JSON document cached in `ff_suspend_dedup`, so
/// an idempotent retry can replay it.
///
/// The document has three top-level keys:
/// - `variant`: `"AlreadySatisfied"` for that variant, `"Suspended"` for everything else.
/// - `details`: the primary waitpoint binding plus `extras` for the additional ones.
/// - `handle_opaque_hex`: the hex-encoded handle bytes for `Suspended`, JSON `null` otherwise.
fn outcome_to_dedup_json(outcome: &SuspendOutcome) -> JsonValue {
    let details = outcome.details();

    let mut extras: Vec<JsonValue> = Vec::new();
    for extra in details.additional_waitpoints.iter() {
        extras.push(json!({
            "waitpoint_id": extra.waitpoint_id.to_string(),
            "waitpoint_key": extra.waitpoint_key,
            "token": extra.waitpoint_token.as_str(),
        }));
    }

    let variant = if matches!(outcome, SuspendOutcome::AlreadySatisfied { .. }) {
        "AlreadySatisfied"
    } else {
        "Suspended"
    };
    let handle_opaque_hex: Option<String> = match outcome {
        SuspendOutcome::Suspended { handle, .. } => Some(hex::encode(handle.opaque.as_bytes())),
        _ => None,
    };

    json!({
        "variant": variant,
        "details": {
            "suspension_id": details.suspension_id.to_string(),
            "waitpoint_id": details.waitpoint_id.to_string(),
            "waitpoint_key": details.waitpoint_key,
            "token": details.waitpoint_token.as_str(),
            "extras": extras,
        },
        "handle_opaque_hex": handle_opaque_hex,
    })
}
/// Rebuilds a [`SuspendOutcome`] from a document cached in `ff_suspend_dedup`.
///
/// Decoding rules:
/// - Missing string fields decode as `""`.
/// - Unparseable ids and a non-hex handle are reported as `ValidationKind::Corruption`.
/// - Any `variant` other than `"AlreadySatisfied"` (including a missing one) yields
///   `Suspended`, whose Postgres handle is decoded from `handle_opaque_hex`; an absent value
///   decodes to empty bytes.
fn outcome_from_dedup_json(v: &JsonValue) -> Result<SuspendOutcome, EngineError> {
    /// String at `node[key]`, or `""` when absent or not a string.
    fn str_field<'a>(node: &'a JsonValue, key: &str) -> &'a str {
        node[key].as_str().unwrap_or("")
    }
    let corrupt = |detail: String| EngineError::Validation {
        kind: ValidationKind::Corruption,
        detail,
    };

    let det = &v["details"];
    let suspension_id = SuspensionId::parse(str_field(det, "suspension_id"))
        .map_err(|e| corrupt(format!("dedup suspension_id: {e}")))?;
    let waitpoint_id = WaitpointId::parse(str_field(det, "waitpoint_id"))
        .map_err(|e| corrupt(format!("dedup waitpoint_id: {e}")))?;
    let waitpoint_key = str_field(det, "waitpoint_key").to_owned();
    let token = str_field(det, "token").to_owned();

    let extras: Vec<AdditionalWaitpointBinding> = match det["extras"].as_array() {
        None => Vec::new(),
        Some(entries) => entries
            .iter()
            .map(|entry| -> Result<AdditionalWaitpointBinding, EngineError> {
                let wid = WaitpointId::parse(str_field(entry, "waitpoint_id"))
                    .map_err(|err| corrupt(format!("dedup extra wp_id: {err}")))?;
                Ok(AdditionalWaitpointBinding::new(
                    wid,
                    str_field(entry, "waitpoint_key").to_owned(),
                    WaitpointHmac::new(str_field(entry, "token").to_owned()),
                ))
            })
            .collect::<Result<Vec<AdditionalWaitpointBinding>, EngineError>>()?,
    };

    let details = SuspendOutcomeDetails::new(
        suspension_id,
        waitpoint_id,
        waitpoint_key,
        WaitpointHmac::new(token),
    )
    .with_additional_waitpoints(extras);

    if v["variant"].as_str() == Some("AlreadySatisfied") {
        return Ok(SuspendOutcome::AlreadySatisfied { details });
    }

    let bytes = hex::decode(str_field(v, "handle_opaque_hex"))
        .map_err(|e| corrupt(format!("dedup handle hex: {e}")))?;
    let handle = Handle::new(
        BackendTag::Postgres,
        HandleKind::Suspended,
        HandleOpaque::new(bytes.into_boxed_slice()),
    );
    Ok(SuspendOutcome::Suspended { details, handle })
}
/// Suspends the execution identified by a Postgres lease `handle`.
///
/// The handle is decoded with `decode_handle`, and the work is delegated to
/// `suspend_core`. `_partition_config` is accepted for interface parity and is not used.
pub(crate) async fn suspend_impl(
    pool: &PgPool,
    _partition_config: &PartitionConfig,
    handle: &Handle,
    args: SuspendArgs,
) -> Result<SuspendOutcome, EngineError> {
    suspend_core(pool, decode_handle(handle)?, args).await
}
/// Suspends an execution identified by id plus a [`LeaseFence`] rather than a handle.
///
/// A synthetic [`HandlePayload`] is built from:
/// - the execution's current `attempt_index` in `ff_exec_core`;
/// - the fence's attempt id, lease id and lease epoch;
/// - a `0` sixth argument and empty lane / worker-instance ids.
///
/// The payload is then passed to `suspend_core`.
///
/// # Errors
/// `NotFound { entity: "execution" }` if no `ff_exec_core` row exists. Other errors come
/// from `split_exec_id`, the query itself, or `suspend_core`.
pub(crate) async fn suspend_by_triple_impl(
    pool: &PgPool,
    _partition_config: &PartitionConfig,
    exec_id: ExecutionId,
    triple: LeaseFence,
    args: SuspendArgs,
) -> Result<SuspendOutcome, EngineError> {
    let (part, exec_uuid) = split_exec_id(&exec_id)?;

    let current: Option<(i32,)> = sqlx::query_as(
        "SELECT attempt_index FROM ff_exec_core \
         WHERE partition_key = $1 AND execution_id = $2",
    )
    .bind(part)
    .bind(exec_uuid)
    .fetch_optional(pool)
    .await
    .map_err(map_sqlx_error)?;
    let Some((raw_attempt_index,)) = current else {
        return Err(EngineError::NotFound { entity: "execution" });
    };

    // Negative indices clamp to 0 before the unsigned conversion.
    let attempt_index = AttemptIndex::new(u32::try_from(raw_attempt_index.max(0)).unwrap_or(0));
    let synthetic = HandlePayload::new(
        exec_id,
        attempt_index,
        triple.attempt_id,
        triple.lease_id,
        triple.lease_epoch,
        0,
        ff_core::types::LaneId::new(""),
        ff_core::types::WorkerInstanceId::new(""),
    );
    suspend_core(pool, synthetic, args).await
}
/// Shared suspend path behind `suspend_impl` and `suspend_by_triple_impl`.
///
/// Everything runs in one SERIALIZABLE transaction (via `run_serializable`):
/// 1. If `args.idempotency_key` is set and `ff_suspend_dedup` already holds a row for it in
///    this partition, the cached outcome is decoded and returned unchanged.
/// 2. The attempt row is locked (`FOR UPDATE`) and its `lease_epoch` must equal
///    `payload.lease_epoch`; a mismatch fails the body with `Contention(LeaseConflict)`.
///    Whether that is retried is up to `is_retryable_engine`.
/// 3. The newest active kid is read from `ff_waitpoint_hmac`. Each waitpoint binding gets
///    an HMAC token over `"<execution_id>:<waitpoint_id>"`, and its `ff_waitpoint_pending`
///    row is upserted.
/// 4. `ff_suspension_current` is upserted with `satisfied_set` / `member_map` reset, and
///    `ff_exec_core` moves to the suspended phase.
/// 5. The attempt's worker and lease columns are cleared, `lease_epoch` is bumped, and a
///    lease-revoked event is emitted.
/// 6. The resulting `Suspended` outcome is cached under the idempotency key, if any.
///
/// # Errors
/// - `NotFound { entity: "attempt" }` when the attempt row is missing.
/// - `Validation { InvalidInput }` when:
///   - no active HMAC kid exists;
///   - a `WaitpointBinding` variant is unsupported;
///   - `args.waitpoints` is empty. This previously panicked on `signed[0]`.
/// - `Contention(RetryExhausted)` when serialization retries run out.
async fn suspend_core(
    pool: &PgPool,
    payload: HandlePayload,
    args: SuspendArgs,
) -> Result<SuspendOutcome, EngineError> {
    let (part, exec_uuid) = split_exec_id(&payload.execution_id)?;
    let attempt_index_i = i32::try_from(payload.attempt_index.0).unwrap_or(0);
    let expected_epoch = payload.lease_epoch.0;
    let idem_key = args.idempotency_key.as_ref().map(|k| k.as_str().to_owned());
    run_serializable(pool, move |tx| {
        // The closure may run several times (retries), so each run works on its own copies.
        let args = args.clone();
        let idem = idem_key.clone();
        let payload = payload.clone();
        Box::pin(async move {
            // 1. Idempotent replay: this check runs before the epoch fence, so a replay still
            //    succeeds after the first call already bumped the epoch.
            if let Some(key) = idem.as_deref() {
                let row: Option<(JsonValue,)> = sqlx::query_as(
                    "SELECT outcome_json FROM ff_suspend_dedup \
                     WHERE partition_key = $1 AND idempotency_key = $2",
                )
                .bind(part)
                .bind(key)
                .fetch_optional(&mut **tx)
                .await
                .map_err(map_sqlx_error)?;
                if let Some((cached,)) = row {
                    return outcome_from_dedup_json(&cached);
                }
            }

            // 2. Lease-epoch fence on the row-locked attempt.
            let epoch_row: Option<(i64,)> = sqlx::query_as(
                "SELECT lease_epoch FROM ff_attempt \
                 WHERE partition_key = $1 AND execution_id = $2 AND attempt_index = $3 \
                 FOR UPDATE",
            )
            .bind(part)
            .bind(exec_uuid)
            .bind(attempt_index_i)
            .fetch_optional(&mut **tx)
            .await
            .map_err(map_sqlx_error)?;
            let observed_epoch: u64 = match epoch_row {
                Some((e,)) => u64::try_from(e).unwrap_or(0),
                None => return Err(EngineError::NotFound { entity: "attempt" }),
            };
            if observed_epoch != expected_epoch {
                return Err(EngineError::Contention(ContentionKind::LeaseConflict));
            }

            // 3. Sign every waitpoint binding with the newest active kid.
            let kid_row: Option<(String, Vec<u8>)> = sqlx::query_as(
                "SELECT kid, secret FROM ff_waitpoint_hmac \
                 WHERE active = TRUE \
                 ORDER BY rotated_at_ms DESC LIMIT 1",
            )
            .fetch_optional(&mut **tx)
            .await
            .map_err(map_sqlx_error)?;
            let (kid, secret) = kid_row.ok_or_else(|| EngineError::Validation {
                kind: ValidationKind::InvalidInput,
                detail: "ff_waitpoint_hmac empty — rotate a kid before suspend".into(),
            })?;
            let now = args.now.0;
            let mut signed: Vec<(WaitpointId, String, String)> = Vec::new();
            for binding in args.waitpoints.iter() {
                let (wp_id, wp_key) = match binding {
                    WaitpointBinding::Fresh {
                        waitpoint_id,
                        waitpoint_key,
                    } => (waitpoint_id.clone(), waitpoint_key.clone()),
                    WaitpointBinding::UsePending { waitpoint_id } => {
                        // NOTE(review): a missing pending row yields an empty key, and the
                        // upsert below then creates a fresh row. Confirm this leniency is
                        // intended rather than a NotFound.
                        let row: Option<(String,)> = sqlx::query_as(
                            "SELECT waitpoint_key FROM ff_waitpoint_pending \
                             WHERE partition_key = $1 AND waitpoint_id = $2",
                        )
                        .bind(part)
                        .bind(wp_uuid(waitpoint_id)?)
                        .fetch_optional(&mut **tx)
                        .await
                        .map_err(map_sqlx_error)?;
                        let wp_key = row.map(|(k,)| k).unwrap_or_default();
                        (waitpoint_id.clone(), wp_key)
                    }
                    _ => {
                        return Err(EngineError::Validation {
                            kind: ValidationKind::InvalidInput,
                            detail: "unsupported WaitpointBinding variant".into(),
                        });
                    }
                };
                // Must match the message verified in `deliver_signal_impl`.
                let msg = format!("{}:{}", payload.execution_id, wp_id);
                let token = hmac_sign(&secret, &kid, msg.as_bytes());
                sqlx::query(
                    "INSERT INTO ff_waitpoint_pending \
                     (partition_key, waitpoint_id, execution_id, token_kid, token, \
                     created_at_ms, expires_at_ms, waitpoint_key) \
                     VALUES ($1, $2, $3, $4, $5, $6, $7, $8) \
                     ON CONFLICT (partition_key, waitpoint_id) DO UPDATE SET \
                     token_kid = EXCLUDED.token_kid, token = EXCLUDED.token, \
                     waitpoint_key = EXCLUDED.waitpoint_key",
                )
                .bind(part)
                .bind(wp_uuid(&wp_id)?)
                .bind(exec_uuid)
                .bind(&kid)
                .bind(&token)
                .bind(now)
                .bind(args.timeout_at.map(|t| t.0))
                .bind(&wp_key)
                .execute(&mut **tx)
                .await
                .map_err(map_sqlx_error)?;
                signed.push((wp_id, wp_key, token));
            }

            // A suspension needs a primary waitpoint. Reject an empty binding list before
            // any suspension state is written, instead of panicking on `signed[0]` later.
            let Some(((primary_id, primary_key, primary_token), extra_signed)) =
                signed.split_first()
            else {
                return Err(EngineError::Validation {
                    kind: ValidationKind::InvalidInput,
                    detail: "suspend requires at least one waitpoint binding".into(),
                });
            };

            // 4. Record the suspension and flip the execution to suspended.
            let condition_json =
                serde_json::to_value(&args.resume_condition).map_err(|e| {
                    EngineError::Validation {
                        kind: ValidationKind::Corruption,
                        detail: format!("resume_condition serialize: {e}"),
                    }
                })?;
            sqlx::query(
                "INSERT INTO ff_suspension_current \
                 (partition_key, execution_id, suspension_id, suspended_at_ms, \
                 timeout_at_ms, reason_code, condition, satisfied_set, member_map, \
                 timeout_behavior) \
                 VALUES ($1, $2, $3, $4, $5, $6, $7, '[]'::jsonb, '{}'::jsonb, $8) \
                 ON CONFLICT (partition_key, execution_id) DO UPDATE SET \
                 suspension_id = EXCLUDED.suspension_id, \
                 suspended_at_ms = EXCLUDED.suspended_at_ms, \
                 timeout_at_ms = EXCLUDED.timeout_at_ms, \
                 reason_code = EXCLUDED.reason_code, \
                 condition = EXCLUDED.condition, \
                 satisfied_set = '[]'::jsonb, \
                 member_map = '{}'::jsonb, \
                 timeout_behavior = EXCLUDED.timeout_behavior",
            )
            .bind(part)
            .bind(exec_uuid)
            .bind(susp_uuid(&args.suspension_id)?)
            .bind(now)
            .bind(args.timeout_at.map(|t| t.0))
            .bind(args.reason_code.as_wire_str())
            .bind(&condition_json)
            .bind(args.timeout_behavior.as_wire_str())
            .execute(&mut **tx)
            .await
            .map_err(map_sqlx_error)?;
            sqlx::query(
                "UPDATE ff_exec_core \
                 SET lifecycle_phase = 'suspended', \
                 ownership_state = 'released', \
                 eligibility_state = 'not_applicable', \
                 public_state = 'suspended', \
                 attempt_state = 'attempt_interrupted' \
                 WHERE partition_key = $1 AND execution_id = $2",
            )
            .bind(part)
            .bind(exec_uuid)
            .execute(&mut **tx)
            .await
            .map_err(map_sqlx_error)?;

            // 5. Release the lease. Bumping the epoch invalidates the caller's lease handle.
            sqlx::query(
                "UPDATE ff_attempt \
                 SET worker_id = NULL, \
                 worker_instance_id = NULL, \
                 lease_expires_at_ms = NULL, \
                 lease_epoch = lease_epoch + 1, \
                 outcome = 'attempt_interrupted' \
                 WHERE partition_key = $1 AND execution_id = $2 AND attempt_index = $3",
            )
            .bind(part)
            .bind(exec_uuid)
            .bind(attempt_index_i)
            .execute(&mut **tx)
            .await
            .map_err(map_sqlx_error)?;
            lease_event::emit(
                tx,
                part,
                exec_uuid,
                None,
                lease_event::EVENT_REVOKED,
                now,
            )
            .await?;

            // Assemble the outcome: the first binding is primary, the rest are extras.
            let extras: Vec<AdditionalWaitpointBinding> = extra_signed
                .iter()
                .map(|(id, key, tok)| {
                    AdditionalWaitpointBinding::new(
                        id.clone(),
                        key.clone(),
                        WaitpointHmac::new(tok.clone()),
                    )
                })
                .collect();
            let details = SuspendOutcomeDetails::new(
                args.suspension_id.clone(),
                primary_id.clone(),
                primary_key.clone(),
                WaitpointHmac::new(primary_token.clone()),
            )
            .with_additional_waitpoints(extras);
            let opaque = encode_opaque(BackendTag::Postgres, &payload);
            let suspended_handle =
                Handle::new(BackendTag::Postgres, HandleKind::Suspended, opaque);
            let outcome = SuspendOutcome::Suspended {
                details,
                handle: suspended_handle,
            };

            // 6. Cache for idempotent replay; an existing row wins (DO NOTHING).
            if let Some(key) = idem.as_deref() {
                let cached = outcome_to_dedup_json(&outcome);
                sqlx::query(
                    "INSERT INTO ff_suspend_dedup \
                     (partition_key, idempotency_key, outcome_json, created_at_ms) \
                     VALUES ($1, $2, $3, $4) \
                     ON CONFLICT DO NOTHING",
                )
                .bind(part)
                .bind(key)
                .bind(&cached)
                .bind(now)
                .execute(&mut **tx)
                .await
                .map_err(map_sqlx_error)?;
            }
            Ok(outcome)
        })
    })
    .await
}
/// Delivers a signal to one waitpoint of a suspended execution.
///
/// Runs in a SERIALIZABLE transaction (via `run_serializable`):
/// 1. Row-locks the `ff_waitpoint_pending` row and checks it belongs to
///    `args.execution_id`.
/// 2. Verifies `args.waitpoint_token` as an HMAC over `"<execution_id>:<waitpoint_id>"`,
///    using the kid recorded on the pending row, and requires it to equal the stored token.
/// 3. Appends a JSON blob describing the signal to `member_map[waitpoint_key]` of the
///    row-locked `ff_suspension_current` row, then re-evaluates the stored resume condition
///    over all signals in `member_map`.
/// 4. If the condition is satisfied, marks the execution resumable/runnable, deletes all of
///    its pending waitpoints, and inserts a `'resumable'` row into `ff_completion_event`.
/// 5. Emits a signal event in either case.
///
/// Returns `DeliverSignalResult::Accepted`. Its `effect` is `"resume_condition_satisfied"`
/// or `"appended_to_waitpoint"`.
///
/// NOTE(review): nothing here deduplicates by `signal_id`, so a redelivered signal is
/// appended again. Confirm dedup happens upstream.
///
/// # Errors
/// - `NotFound { entity: "waitpoint" }` / `NotFound { entity: "suspension" }` for missing rows.
/// - `Validation { InvalidInput }` when:
///   - the waitpoint belongs to another execution;
///   - the kid is missing from the keystore;
///   - the token fails verification or differs from the minted one.
/// - `Validation { Corruption }` for a malformed `member_map` or `condition` document.
pub(crate) async fn deliver_signal_impl(
    pool: &PgPool,
    _partition_config: &PartitionConfig,
    args: DeliverSignalArgs,
) -> Result<DeliverSignalResult, EngineError> {
    let (part, exec_uuid) = split_exec_id(&args.execution_id)?;
    let wp_u = wp_uuid(&args.waitpoint_id)?;
    run_serializable(pool, move |tx| {
        // Each retry works on its own copy of the arguments.
        let args = args.clone();
        Box::pin(async move {
            // Lock the pending waitpoint so concurrent deliveries serialize on it.
            let row: Option<(String, String, String, Uuid)> = sqlx::query_as(
                "SELECT token_kid, token, waitpoint_key, execution_id \
                 FROM ff_waitpoint_pending \
                 WHERE partition_key = $1 AND waitpoint_id = $2 \
                 FOR UPDATE",
            )
            .bind(part)
            .bind(wp_u)
            .fetch_optional(&mut **tx)
            .await
            .map_err(map_sqlx_error)?;
            let (kid, stored_token, wp_key, stored_exec) = match row {
                Some(r) => r,
                None => return Err(EngineError::NotFound { entity: "waitpoint" }),
            };
            if stored_exec != exec_uuid {
                return Err(EngineError::Validation {
                    kind: ValidationKind::InvalidInput,
                    detail: "waitpoint belongs to a different execution".into(),
                });
            }
            // Look up the secret by the kid that minted the token. The query does not
            // filter on `active`, so a kid can still verify after a newer one is active.
            let secret_row: Option<(Vec<u8>,)> = sqlx::query_as(
                "SELECT secret FROM ff_waitpoint_hmac WHERE kid = $1",
            )
            .bind(&kid)
            .fetch_optional(&mut **tx)
            .await
            .map_err(map_sqlx_error)?;
            let (secret,) = secret_row.ok_or_else(|| EngineError::Validation {
                kind: ValidationKind::InvalidInput,
                detail: format!("kid {kid} missing from keystore"),
            })?;
            let presented = args.waitpoint_token.as_str();
            // Same message layout as signed in `suspend_core`.
            let msg = format!("{}:{}", args.execution_id, args.waitpoint_id);
            hmac_verify(&secret, &kid, msg.as_bytes(), presented).map_err(|e| {
                EngineError::Validation {
                    kind: ValidationKind::InvalidInput,
                    detail: format!("waitpoint_token verify: {e}"),
                }
            })?;
            // Beyond a valid signature, require the exact token minted for this waitpoint.
            if presented != stored_token {
                return Err(EngineError::Validation {
                    kind: ValidationKind::InvalidInput,
                    detail: "waitpoint_token does not match minted token".into(),
                });
            }
            let susp_row: Option<(JsonValue, JsonValue)> = sqlx::query_as(
                "SELECT condition, member_map FROM ff_suspension_current \
                 WHERE partition_key = $1 AND execution_id = $2 \
                 FOR UPDATE",
            )
            .bind(part)
            .bind(exec_uuid)
            .fetch_optional(&mut **tx)
            .await
            .map_err(map_sqlx_error)?;
            let (condition_json, mut member_map) = match susp_row {
                Some(r) => r,
                None => return Err(EngineError::NotFound { entity: "suspension" }),
            };
            // Wire shape read back by `resume_signal_from_json`.
            let signal_blob = json!({
                "signal_id": args.signal_id.to_string(),
                "signal_name": args.signal_name,
                "signal_category": args.signal_category,
                "source_type": args.source_type,
                "source_identity": args.source_identity,
                "correlation_id": args.correlation_id.clone().unwrap_or_default(),
                "accepted_at": args.now.0,
                "payload_hex": args.payload.as_ref().map(hex::encode),
            });
            // member_map: waitpoint_key -> array of signal blobs.
            let map_obj = member_map.as_object_mut().ok_or_else(|| {
                EngineError::Validation {
                    kind: ValidationKind::Corruption,
                    detail: "member_map not a JSON object".into(),
                }
            })?;
            let entry = map_obj.entry(wp_key.clone()).or_insert_with(|| json!([]));
            entry
                .as_array_mut()
                .ok_or_else(|| EngineError::Validation {
                    kind: ValidationKind::Corruption,
                    detail: "member_map[wp_key] not a JSON array".into(),
                })?
                .push(signal_blob);
            let condition: ResumeCondition = serde_json::from_value(condition_json)
                .map_err(|e| EngineError::Validation {
                    kind: ValidationKind::Corruption,
                    detail: format!("condition deserialize: {e}"),
                })?;
            // Decode every recorded signal (unparseable blobs are skipped) and evaluate the
            // condition over the whole map, including the signal just appended.
            let signals_by_wp: HashMap<String, Vec<ResumeSignal>> = map_obj
                .iter()
                .map(|(k, v)| {
                    let sigs: Vec<ResumeSignal> = v
                        .as_array()
                        .map(|arr| arr.iter().filter_map(resume_signal_from_json).collect())
                        .unwrap_or_default();
                    (k.clone(), sigs)
                })
                .collect();
            let borrowed: HashMap<&str, &[ResumeSignal]> = signals_by_wp
                .iter()
                .map(|(k, v)| (k.as_str(), v.as_slice()))
                .collect();
            let satisfied = evaluate(&condition, &borrowed);
            sqlx::query(
                "UPDATE ff_suspension_current SET member_map = $1 \
                 WHERE partition_key = $2 AND execution_id = $3",
            )
            .bind(&member_map)
            .bind(part)
            .bind(exec_uuid)
            .execute(&mut **tx)
            .await
            .map_err(map_sqlx_error)?;
            let effect = if satisfied {
                // Make the execution claimable via `claim_resumed_execution_impl`, which
                // requires public_state = 'resumable'.
                sqlx::query(
                    "UPDATE ff_exec_core \
                     SET public_state = 'resumable', \
                     lifecycle_phase = 'runnable', \
                     eligibility_state = 'eligible_now' \
                     WHERE partition_key = $1 AND execution_id = $2",
                )
                .bind(part)
                .bind(exec_uuid)
                .execute(&mut **tx)
                .await
                .map_err(map_sqlx_error)?;
                // Drop every pending waitpoint of this execution; later deliveries then get
                // NotFound { entity: "waitpoint" }.
                sqlx::query(
                    "DELETE FROM ff_waitpoint_pending \
                     WHERE partition_key = $1 AND execution_id = $2",
                )
                .bind(part)
                .bind(exec_uuid)
                .execute(&mut **tx)
                .await
                .map_err(map_sqlx_error)?;
                sqlx::query(
                    "INSERT INTO ff_completion_event \
                     (partition_key, execution_id, outcome, occurred_at_ms) \
                     VALUES ($1, $2, 'resumable', $3)",
                )
                .bind(part)
                .bind(exec_uuid)
                .bind(args.now.0)
                .execute(&mut **tx)
                .await
                .map_err(map_sqlx_error)?;
                "resume_condition_satisfied"
            } else {
                "appended_to_waitpoint"
            };
            let wp_id_str = args.waitpoint_id.to_string();
            signal_event::emit(
                tx,
                part,
                exec_uuid,
                &args.signal_id.to_string(),
                Some(wp_id_str.as_str()),
                Some(args.source_identity.as_str()),
                args.now.0,
            )
            .await?;
            Ok(DeliverSignalResult::Accepted {
                signal_id: args.signal_id.clone(),
                effect: effect.to_owned(),
            })
        })
    })
    .await
}
/// Decodes one signal blob (as stored in `member_map` by `deliver_signal_impl`) into a
/// [`ResumeSignal`].
///
/// Returns `None` when `signal_id` is missing or unparseable, or when `signal_name` is
/// missing. Other fields are lenient:
/// - absent text fields become `""`;
/// - `accepted_at` defaults to `0`;
/// - an absent or non-hex `payload_hex` becomes `None`.
fn resume_signal_from_json(v: &JsonValue) -> Option<ResumeSignal> {
    let text_or_empty = |key: &str| v[key].as_str().unwrap_or("").to_owned();

    let signal_id = v["signal_id"]
        .as_str()
        .and_then(|raw| SignalId::parse(raw).ok())?;
    let signal_name = v["signal_name"].as_str()?.to_owned();
    let accepted_at = TimestampMs::from_millis(v["accepted_at"].as_i64().unwrap_or(0));
    let payload = match v["payload_hex"].as_str() {
        Some(h) => hex::decode(h).ok(),
        None => None,
    };

    Some(ResumeSignal {
        signal_id,
        signal_name,
        signal_category: text_or_empty("signal_category"),
        source_type: text_or_empty("source_type"),
        source_identity: text_or_empty("source_identity"),
        correlation_id: text_or_empty("correlation_id"),
        accepted_at,
        payload,
    })
}
/// Claims an execution whose resume condition has been satisfied
/// (`public_state = 'resumable'`) and grants a fresh lease on its current attempt.
///
/// Runs in one transaction:
/// 1. Row-locks `ff_exec_core` and checks the public state.
/// 2. Assigns the worker to the attempt, bumps `lease_epoch`, sets
///    `lease_expires_at_ms = now + lease_ttl_ms` (saturating), and reads the new epoch back
///    with `RETURNING`.
/// 3. Moves `ff_exec_core` to the active/leased/running state.
/// 4. Emits a lease-acquired event.
///
/// NOTE(review): `attempt_id` in the result is freshly generated with `AttemptId::new()`
/// and is not read from the attempt row. Confirm callers do not correlate it with
/// persisted state.
///
/// # Errors
/// - `NotFound { entity: "execution" }` if the execution row is missing.
/// - `NotFound { entity: "attempt" }` if its current attempt row is missing.
/// - `Contention(NotAResumedExecution)` if the execution is not `resumable`.
pub(crate) async fn claim_resumed_execution_impl(
    pool: &PgPool,
    _partition_config: &PartitionConfig,
    args: ClaimResumedExecutionArgs,
) -> Result<ClaimResumedExecutionResult, EngineError> {
    let (part, exec_uuid) = split_exec_id(&args.execution_id)?;
    // Early `?` returns drop `tx`, which rolls it back.
    let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
    let row: Option<(String, i32)> = sqlx::query_as(
        "SELECT public_state, attempt_index FROM ff_exec_core \
         WHERE partition_key = $1 AND execution_id = $2 \
         FOR UPDATE",
    )
    .bind(part)
    .bind(exec_uuid)
    .fetch_optional(&mut *tx)
    .await
    .map_err(map_sqlx_error)?;
    let Some((public_state, attempt_index_i)) = row else {
        tx.rollback().await.ok();
        return Err(EngineError::NotFound { entity: "execution" });
    };
    if public_state != "resumable" {
        tx.rollback().await.ok();
        return Err(EngineError::Contention(
            ContentionKind::NotAResumedExecution,
        ));
    }

    let now = now_ms();
    // An oversize TTL saturates instead of collapsing to 0, which would hand out an
    // already-expired lease.
    let lease_ttl = i64::try_from(args.lease_ttl_ms).unwrap_or(i64::MAX);
    let new_expires = now.saturating_add(lease_ttl);

    // RETURNING reads the bumped epoch in the same statement and detects a missing row.
    let epoch_row: Option<(i64,)> = sqlx::query_as(
        "UPDATE ff_attempt \
         SET worker_id = $1, worker_instance_id = $2, \
         lease_epoch = lease_epoch + 1, \
         lease_expires_at_ms = $3, started_at_ms = $4, outcome = NULL \
         WHERE partition_key = $5 AND execution_id = $6 AND attempt_index = $7 \
         RETURNING lease_epoch",
    )
    .bind(args.worker_id.as_str())
    .bind(args.worker_instance_id.as_str())
    .bind(new_expires)
    .bind(now)
    .bind(part)
    .bind(exec_uuid)
    .bind(attempt_index_i)
    .fetch_optional(&mut *tx)
    .await
    .map_err(map_sqlx_error)?;
    let Some((new_epoch,)) = epoch_row else {
        tx.rollback().await.ok();
        return Err(EngineError::NotFound { entity: "attempt" });
    };

    sqlx::query(
        "UPDATE ff_exec_core \
         SET lifecycle_phase = 'active', ownership_state = 'leased', \
         eligibility_state = 'not_applicable', \
         public_state = 'running', attempt_state = 'running_attempt' \
         WHERE partition_key = $1 AND execution_id = $2",
    )
    .bind(part)
    .bind(exec_uuid)
    .execute(&mut *tx)
    .await
    .map_err(map_sqlx_error)?;

    let lease_id_str = args.lease_id.to_string();
    lease_event::emit(
        &mut tx,
        part,
        exec_uuid,
        Some(&lease_id_str),
        lease_event::EVENT_ACQUIRED,
        now,
    )
    .await?;
    tx.commit().await.map_err(map_sqlx_error)?;

    let attempt_index = AttemptIndex::new(u32::try_from(attempt_index_i.max(0)).unwrap_or(0));
    let lease_epoch = LeaseEpoch(u64::try_from(new_epoch).unwrap_or(0));
    let attempt_id = AttemptId::new();
    Ok(ClaimResumedExecutionResult::Claimed(
        ClaimedResumedExecution {
            execution_id: args.execution_id.clone(),
            lease_id: args.lease_id.clone(),
            lease_epoch,
            attempt_index,
            attempt_id,
            lease_expires_at: TimestampMs::from_millis(new_expires),
        },
    ))
}
pub(crate) async fn observe_signals_impl(
pool: &PgPool,
handle: &Handle,
) -> Result<Vec<ResumeSignal>, EngineError> {
let payload = decode_handle(handle)?;
let (part, exec_uuid) = split_exec_id(&payload.execution_id)?;
let row: Option<(JsonValue,)> = sqlx::query_as(
"SELECT member_map FROM ff_suspension_current \
WHERE partition_key = $1 AND execution_id = $2",
)
.bind(part)
.bind(exec_uuid)
.fetch_optional(pool)
.await
.map_err(map_sqlx_error)?;
let Some((member_map,)) = row else {
return Ok(Vec::new());
};
let mut out: Vec<ResumeSignal> = Vec::new();
if let Some(map) = member_map.as_object() {
for (_wp_key, arr) in map {
if let Some(sigs) = arr.as_array() {
for v in sigs {
if let Some(s) = resume_signal_from_json(v) {
out.push(s);
}
}
}
}
}
Ok(out)
}