use ff_core::contracts::*;
use crate::error::ScriptError;
use ff_core::keys::{ExecKeyContext, IndexKeys};
use ff_core::state::{AttemptType, PublicState};
use ff_core::types::*;
use crate::result::{FcallResult, FromFcallResult};
/// Claim outcome as decoded from the `ff_claim_execution` reply, before an
/// execution id has been attached.
///
/// Holds exactly the fields that `ClaimExecutionResultPartial::complete`
/// copies into `ClaimedExecution`; `execution_id` is the only field supplied
/// separately by the caller.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ClaimedExecutionPartial {
    /// Lease identifier (decoded from reply field 0).
    pub lease_id: LeaseId,
    /// Lease epoch (decoded from reply field 1 as a `u64`).
    pub lease_epoch: LeaseEpoch,
    /// Attempt index (decoded from reply field 4 as a `u32`).
    pub attempt_index: AttemptIndex,
    /// Attempt identifier (decoded from reply field 3).
    pub attempt_id: AttemptId,
    /// Attempt type (decoded from reply field 5 via `parse_attempt_type`).
    pub attempt_type: AttemptType,
    /// Lease expiry in milliseconds (decoded from reply field 2 as an `i64`).
    pub lease_expires_at: TimestampMs,
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ClaimExecutionResultPartial {
Claimed(ClaimedExecutionPartial),
}
impl ClaimExecutionResultPartial {
pub fn complete(self, execution_id: ExecutionId) -> ClaimExecutionResult {
match self {
Self::Claimed(p) => ClaimExecutionResult::Claimed(ClaimedExecution {
execution_id,
lease_id: p.lease_id,
lease_epoch: p.lease_epoch,
attempt_index: p.attempt_index,
attempt_id: p.attempt_id,
attempt_type: p.attempt_type,
lease_expires_at: p.lease_expires_at,
}),
}
}
}
/// Result of `ff_complete_execution` as decoded from the reply alone, i.e.
/// without the execution id.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum CompleteExecutionResultPartial {
    /// The execution was completed; `public_state` is the state to report.
    Completed { public_state: PublicState },
}

impl CompleteExecutionResultPartial {
    /// Attaches `execution_id` and converts into the full
    /// [`CompleteExecutionResult`], carrying `public_state` over unchanged.
    pub fn complete(self, execution_id: ExecutionId) -> CompleteExecutionResult {
        // Single-variant enum: the destructuring below is irrefutable.
        let Self::Completed { public_state } = self;
        CompleteExecutionResult::Completed {
            execution_id,
            public_state,
        }
    }
}
/// Result of `ff_cancel_execution` as decoded from the reply alone, i.e.
/// without the execution id.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum CancelExecutionResultPartial {
    /// The execution was cancelled; `public_state` is the state to report.
    Cancelled { public_state: PublicState },
}

impl CancelExecutionResultPartial {
    /// Attaches `execution_id` and converts into the full
    /// [`CancelExecutionResult`], carrying `public_state` over unchanged.
    pub fn complete(self, execution_id: ExecutionId) -> CancelExecutionResult {
        // Single-variant enum: the destructuring below is irrefutable.
        let Self::Cancelled { public_state } = self;
        CancelExecutionResult::Cancelled {
            execution_id,
            public_state,
        }
    }
}
/// Result of `ff_delay_execution` as decoded from the reply alone, i.e.
/// without the execution id.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum DelayExecutionResultPartial {
    /// The execution was delayed; `public_state` is the state to report.
    Delayed { public_state: PublicState },
}

impl DelayExecutionResultPartial {
    /// Attaches `execution_id` and converts into the full
    /// [`DelayExecutionResult`], carrying `public_state` over unchanged.
    pub fn complete(self, execution_id: ExecutionId) -> DelayExecutionResult {
        // Single-variant enum: the destructuring below is irrefutable.
        let Self::Delayed { public_state } = self;
        DelayExecutionResult::Delayed {
            execution_id,
            public_state,
        }
    }
}
/// Result of `ff_move_to_waiting_children` as decoded from the reply alone,
/// i.e. without the execution id.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum MoveToWaitingChildrenResultPartial {
    /// The execution was moved; `public_state` is the state to report.
    Moved { public_state: PublicState },
}

impl MoveToWaitingChildrenResultPartial {
    /// Attaches `execution_id` and converts into the full
    /// [`MoveToWaitingChildrenResult`], carrying `public_state` over unchanged.
    pub fn complete(self, execution_id: ExecutionId) -> MoveToWaitingChildrenResult {
        // Single-variant enum: the destructuring below is irrefutable.
        let Self::Moved { public_state } = self;
        MoveToWaitingChildrenResult::Moved {
            execution_id,
            public_state,
        }
    }
}
/// Result of `ff_expire_execution` as decoded from the reply alone, i.e.
/// without the execution id.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ExpireExecutionResultPartial {
    /// The execution was expired by this call.
    Expired,
    /// Nothing was expired (see the `FromFcallResult` impl for the reply
    /// sub-statuses that map here).
    AlreadyTerminal,
}

impl ExpireExecutionResultPartial {
    /// Converts into the full [`ExpireExecutionResult`].
    ///
    /// `execution_id` is attached to the `Expired` variant only; for
    /// `AlreadyTerminal` it is dropped because that variant carries no data.
    pub fn complete(self, execution_id: ExecutionId) -> ExpireExecutionResult {
        // Exhaustive on purpose: a new variant must be handled explicitly.
        let full = match self {
            Self::AlreadyTerminal => ExpireExecutionResult::AlreadyTerminal,
            Self::Expired => ExpireExecutionResult::Expired { execution_id },
        };
        full
    }
}
/// Borrowed inputs for building the key lists of the execution functions
/// declared in this file.
///
/// Each `ff_function!` declaration receives one of these as `k` in its
/// `keys(...)` section and calls the builders on `ctx` / `idx` with
/// `lane_id` and `worker_instance_id` as arguments.
pub struct ExecOpKeys<'a> {
    /// Builder for the per-execution keys (`core()`, `lease_current()`, ...).
    pub ctx: &'a ExecKeyContext,
    /// Builder for the index keys (`lane_eligible(..)`, `lease_expiry()`, ...).
    pub idx: &'a IndexKeys,
    /// Lane passed to the lane-scoped index builders.
    pub lane_id: &'a LaneId,
    /// Worker instance passed to `IndexKeys::worker_leases`.
    pub worker_instance_id: &'a WorkerInstanceId,
}
// Declares `ff_create_execution` through `ff_function!`.
//
// The `keys` and `argv` lists are order-sensitive (presumably forwarded
// positionally to the server-side function; confirm against the script), so
// entries must not be reordered. Trailing numbers are the 1-based position of
// each entry within its list.
ff_function! {
    pub ff_create_execution(args: CreateExecutionArgs) -> CreateExecutionResult {
        keys(k: &ExecOpKeys<'_>) {
            k.ctx.core(),                                   // 1
            k.ctx.payload(),                                // 2
            k.ctx.policy(),                                 // 3
            k.ctx.tags(),                                   // 4
            // 5: the lane's delayed key when `delay_until` is set, otherwise
            // the lane's eligible key.
            if args.delay_until.is_some() {
                k.idx.lane_delayed(k.lane_id)
            } else {
                k.idx.lane_eligible(k.lane_id)
            },
            // 6: idempotency key; a missing or empty key falls back to the
            // noop key.
            args.idempotency_key.as_ref().filter(|ik| !ik.is_empty()).map(|ik| {
                ff_core::keys::idempotency_key(k.ctx.hash_tag(), args.namespace.as_str(), ik)
            }).unwrap_or_else(|| k.ctx.noop()),
            k.idx.execution_deadline(),                     // 7
            k.idx.all_executions(),                         // 8
        }
        argv {
            args.execution_id.to_string(),                  // 1
            args.namespace.to_string(),                     // 2
            args.lane_id.to_string(),                       // 3
            args.execution_kind.clone(),                    // 4
            args.priority.to_string(),                      // 5
            args.creator_identity.clone(),                  // 6
            // 7: policy as JSON; a missing policy or a serialization failure
            // is sent as "{}".
            args.policy.as_ref().map(|p| serde_json::to_string(p).unwrap_or_else(|_| "{}".into())).unwrap_or_else(|| "{}".into()),
            // 8: payload as text. `from_utf8_lossy` replaces invalid UTF-8
            // sequences with U+FFFD, so non-UTF-8 payloads are not preserved.
            String::from_utf8_lossy(&args.input_payload).into_owned(),
            args.delay_until.map(|t| t.to_string()).unwrap_or_default(), // 9 (empty when unset)
            // 10: "86400000" only when a non-empty idempotency key is present
            // (presumably a 24 h dedup TTL in ms; confirm against the
            // script). The same non-empty filter as key 6 is applied so an
            // empty key never pairs the noop key with a dedup value.
            args.idempotency_key.as_ref().filter(|ik| !ik.is_empty()).map(|_| "86400000".to_string()).unwrap_or_default(),
            // 11: tags as JSON; a serialization failure is sent as "{}".
            serde_json::to_string(&args.tags).unwrap_or_else(|_| "{}".into()),
            // 12: always sent empty here.
            // NOTE(review): meaning of this slot is defined by the script; confirm.
            String::new(),
            args.partition_id.to_string(),                  // 13
        }
    }
}
impl FromFcallResult for CreateExecutionResult {
    /// Decodes the `ff_create_execution` reply.
    ///
    /// * Status `"DUPLICATE"`: field 0 is the execution id, giving `Duplicate`.
    /// * Otherwise the reply must be a success (`into_success`): field 0 is
    ///   the execution id and field 1 the public state, giving `Created`.
    ///
    /// # Errors
    /// Propagates reply-level errors from `FcallResult::parse` and
    /// `into_success`, and returns `ScriptError::Parse` for a malformed
    /// execution id or an unknown public state.
    fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
        let parsed = FcallResult::parse(raw)?;

        if parsed.status == "DUPLICATE" {
            // Checked before `into_success` so a duplicate is reported as a
            // result, not as an error.
            let execution_id = ExecutionId::parse(&parsed.field_str(0))
                .map_err(|e| ScriptError::Parse(format!("bad execution_id: {e}")))?;
            Ok(Self::Duplicate { execution_id })
        } else {
            let success = parsed.into_success()?;
            let raw_id = success.field_str(0);
            let raw_state = success.field_str(1);

            // The id is validated first so its error takes precedence.
            let execution_id = ExecutionId::parse(&raw_id)
                .map_err(|e| ScriptError::Parse(format!("bad execution_id: {e}")))?;
            let public_state = parse_public_state(&raw_state)?;

            Ok(Self::Created {
                execution_id,
                public_state,
            })
        }
    }
}
// Declares `ff_claim_execution` through `ff_function!`.
//
// The `keys` and `argv` lists are order-sensitive (presumably forwarded
// positionally to the server-side function; confirm against the script), so
// entries must not be reordered. Trailing numbers are the 1-based position of
// each entry within its list.
ff_function! {
    pub ff_claim_execution(args: ClaimExecutionArgs) -> ClaimExecutionResultPartial {
        keys(k: &ExecOpKeys<'_>) {
            k.ctx.core(), // 1
            k.ctx.claim_grant(), // 2
            k.idx.lane_eligible(k.lane_id), // 3
            k.idx.lease_expiry(), // 4
            k.idx.worker_leases(k.worker_instance_id), // 5
            // 6-8: per-attempt keys, all built from the attempt index the
            // caller expects to claim.
            k.ctx.attempt_hash(args.expected_attempt_index),
            k.ctx.attempt_usage(args.expected_attempt_index),
            k.ctx.attempt_policy(args.expected_attempt_index),
            k.ctx.attempts(), // 9
            k.ctx.lease_current(), // 10
            k.ctx.lease_history(), // 11
            k.idx.lane_active(k.lane_id), // 12
            k.idx.attempt_timeout(), // 13
            k.idx.execution_deadline(), // 14
        }
        argv {
            args.execution_id.to_string(), // 1
            args.worker_id.to_string(), // 2
            args.worker_instance_id.to_string(), // 3
            args.lane_id.to_string(), // 4
            // 5: always sent empty here.
            // NOTE(review): meaning of this slot is defined by the script; confirm.
            String::new(),
            args.lease_id.to_string(), // 6
            args.lease_ttl_ms.to_string(), // 7
            // 8: two thirds of the lease TTL, using integer arithmetic
            // (presumably a renewal deadline; confirm against the script).
            (args.lease_ttl_ms * 2 / 3).to_string(),
            args.attempt_id.to_string(), // 9
            args.attempt_policy_json.clone(), // 10
            args.attempt_timeout_ms.map(|t| t.to_string()).unwrap_or_default(), // 11 (empty when unset)
            args.execution_deadline_at.map(|t| t.to_string()).unwrap_or_default(), // 12 (empty when unset)
        }
    }
}
impl FromFcallResult for ClaimExecutionResultPartial {
    /// Decodes a successful `ff_claim_execution` reply.
    ///
    /// Field layout: 0 = lease id, 1 = lease epoch (`u64`), 2 = lease expiry
    /// in ms (`i64`), 3 = attempt id, 4 = attempt index (`u32`),
    /// 5 = attempt type.
    ///
    /// # Errors
    /// Propagates reply-level errors from `FcallResult::parse` and
    /// `into_success`; returns `ScriptError::Parse` for the first field (in
    /// the order above) that fails to decode.
    fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
        let reply = FcallResult::parse(raw)?.into_success()?;

        // Fields are decoded in index order so the reported error is always
        // the lowest-numbered malformed field.
        let lease_id = LeaseId::parse(&reply.field_str(0))
            .map_err(|e| ScriptError::Parse(format!("bad lease_id: {e}")))?;
        let epoch_raw: u64 = reply
            .field_str(1)
            .parse()
            .map_err(|e| ScriptError::Parse(format!("bad epoch: {e}")))?;
        let expires_at_ms: i64 = reply
            .field_str(2)
            .parse()
            .map_err(|e| ScriptError::Parse(format!("bad expires_at: {e}")))?;
        let attempt_id = AttemptId::parse(&reply.field_str(3))
            .map_err(|e| ScriptError::Parse(format!("bad attempt_id: {e}")))?;
        let attempt_index_raw: u32 = reply
            .field_str(4)
            .parse()
            .map_err(|e| ScriptError::Parse(format!("bad attempt_index: {e}")))?;
        let attempt_type = parse_attempt_type(&reply.field_str(5))?;

        let claimed = ClaimedExecutionPartial {
            lease_id,
            lease_epoch: LeaseEpoch::new(epoch_raw),
            attempt_index: AttemptIndex::new(attempt_index_raw),
            attempt_id,
            attempt_type,
            lease_expires_at: TimestampMs::from_millis(expires_at_ms),
        };
        Ok(Self::Claimed(claimed))
    }
}
// Declares `ff_complete_execution` through `ff_function!`.
//
// The `keys` and `argv` lists are order-sensitive (presumably forwarded
// positionally to the server-side function; confirm against the script), so
// entries must not be reordered. Trailing numbers are the 1-based position of
// each entry within its list.
ff_function! {
    pub ff_complete_execution(args: CompleteExecutionArgs) -> CompleteExecutionResultPartial {
        keys(k: &ExecOpKeys<'_>) {
            k.ctx.core(), // 1
            k.ctx.attempt_hash(args.attempt_index), // 2
            k.idx.lease_expiry(), // 3
            k.idx.worker_leases(k.worker_instance_id), // 4
            k.idx.lane_terminal(k.lane_id), // 5
            k.ctx.lease_current(), // 6
            k.ctx.lease_history(), // 7
            k.idx.lane_active(k.lane_id), // 8
            k.ctx.stream_meta(args.attempt_index), // 9
            k.ctx.result(), // 10
            k.idx.attempt_timeout(), // 11
            k.idx.execution_deadline(), // 12
        }
        argv {
            args.execution_id.to_string(), // 1
            args.lease_id.to_string(), // 2
            args.lease_epoch.to_string(), // 3
            args.attempt_id.to_string(), // 4
            // 5: result payload as text, empty when there is none.
            // `from_utf8_lossy` replaces invalid UTF-8 sequences with U+FFFD,
            // so non-UTF-8 payloads are not preserved.
            args.result_payload.as_ref()
                .map(|p| String::from_utf8_lossy(p).into_owned())
                .unwrap_or_default(),
        }
    }
}
impl FromFcallResult for CompleteExecutionResultPartial {
    /// Decodes the `ff_complete_execution` reply.
    ///
    /// Only the success/failure status is checked; no reply field is read and
    /// the reported state is always `PublicState::Completed`.
    ///
    /// # Errors
    /// Propagates errors from `FcallResult::parse` and `into_success`.
    fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
        // Bound only to validate the reply; its fields are intentionally unused.
        let _success = FcallResult::parse(raw)?.into_success()?;
        let public_state = PublicState::Completed;
        Ok(Self::Completed { public_state })
    }
}
// Declares `ff_cancel_execution` through `ff_function!`.
//
// The `keys` and `argv` lists are order-sensitive (presumably forwarded
// positionally to the server-side function; confirm against the script), so
// entries must not be reordered. Trailing numbers are the 1-based position of
// each entry within its list.
ff_function! {
    pub ff_cancel_execution(args: CancelExecutionArgs) -> CancelExecutionResultPartial {
        keys(k: &ExecOpKeys<'_>) {
            k.ctx.core(),                                   // 1
            // 2-3: built with a fixed attempt index of 0.
            // NOTE(review): presumably placeholders; confirm against the script.
            k.ctx.attempt_hash(AttemptIndex::new(0)),
            k.ctx.stream_meta(AttemptIndex::new(0)),
            k.ctx.lease_current(),                          // 4
            k.ctx.lease_history(),                          // 5
            k.idx.lease_expiry(),                           // 6
            k.idx.worker_leases(k.worker_instance_id),      // 7
            // 8-10: the same key is passed three times.
            // NOTE(review): presumably 9 and 10 are placeholders for slots the
            // script expects; confirm against the script's key layout.
            k.ctx.suspension_current(),
            k.ctx.suspension_current(),
            k.ctx.suspension_current(),
            k.idx.suspension_timeout(),                     // 11
            k.idx.lane_terminal(k.lane_id),                 // 12
            k.idx.attempt_timeout(),                        // 13
            k.idx.execution_deadline(),                     // 14
            k.idx.lane_eligible(k.lane_id),                 // 15
            k.idx.lane_delayed(k.lane_id),                  // 16
            k.idx.lane_blocked_dependencies(k.lane_id),     // 17
            k.idx.lane_blocked_budget(k.lane_id),           // 18
            k.idx.lane_blocked_quota(k.lane_id),            // 19
            k.idx.lane_blocked_route(k.lane_id),            // 20
            k.idx.lane_blocked_operator(k.lane_id),         // 21
        }
        argv {
            args.execution_id.to_string(),                  // 1
            args.reason.clone(),                            // 2
            args.source.to_string(),                        // 3
            // 4-5: optional lease details, sent as empty strings when absent.
            args.lease_id.as_ref().map(|lease| lease.to_string()).unwrap_or_default(),
            args.lease_epoch.as_ref().map(|epoch| epoch.to_string()).unwrap_or_default(),
        }
    }
}
impl FromFcallResult for CancelExecutionResultPartial {
    /// Decodes the `ff_cancel_execution` reply.
    ///
    /// Only the success/failure status is checked; no reply field is read and
    /// the reported state is always `PublicState::Cancelled`.
    ///
    /// # Errors
    /// Propagates errors from `FcallResult::parse` and `into_success`.
    fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
        // Bound only to validate the reply; its fields are intentionally unused.
        let _success = FcallResult::parse(raw)?.into_success()?;
        let public_state = PublicState::Cancelled;
        Ok(Self::Cancelled { public_state })
    }
}
// Declares `ff_delay_execution` through `ff_function!`.
//
// The `keys` and `argv` lists are order-sensitive (presumably forwarded
// positionally to the server-side function; confirm against the script), so
// entries must not be reordered. Trailing numbers are the 1-based position of
// each entry within its list.
ff_function! {
    pub ff_delay_execution(args: DelayExecutionArgs) -> DelayExecutionResultPartial {
        keys(k: &ExecOpKeys<'_>) {
            k.ctx.core(), // 1
            k.ctx.attempt_hash(args.attempt_index), // 2
            k.ctx.lease_current(), // 3
            k.ctx.lease_history(), // 4
            k.idx.lease_expiry(), // 5
            k.idx.worker_leases(k.worker_instance_id), // 6
            k.idx.lane_active(k.lane_id), // 7
            k.idx.lane_delayed(k.lane_id), // 8
            k.idx.attempt_timeout(), // 9
        }
        argv {
            args.execution_id.to_string(), // 1
            args.lease_id.to_string(), // 2
            args.lease_epoch.to_string(), // 3
            args.attempt_id.to_string(), // 4
            args.delay_until.to_string(), // 5
        }
    }
}
impl FromFcallResult for DelayExecutionResultPartial {
    /// Decodes the `ff_delay_execution` reply.
    ///
    /// Only the success/failure status is checked; no reply field is read and
    /// the reported state is always `PublicState::Delayed`.
    ///
    /// # Errors
    /// Propagates errors from `FcallResult::parse` and `into_success`.
    fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
        // Bound only to validate the reply; its fields are intentionally unused.
        let _success = FcallResult::parse(raw)?.into_success()?;
        let public_state = PublicState::Delayed;
        Ok(Self::Delayed { public_state })
    }
}
// Declares `ff_move_to_waiting_children` through `ff_function!`.
//
// The `keys` and `argv` lists are order-sensitive (presumably forwarded
// positionally to the server-side function; confirm against the script), so
// entries must not be reordered. Trailing numbers are the 1-based position of
// each entry within its list.
ff_function! {
    pub ff_move_to_waiting_children(args: MoveToWaitingChildrenArgs) -> MoveToWaitingChildrenResultPartial {
        keys(k: &ExecOpKeys<'_>) {
            k.ctx.core(), // 1
            k.ctx.attempt_hash(args.attempt_index), // 2
            k.ctx.lease_current(), // 3
            k.ctx.lease_history(), // 4
            k.idx.lease_expiry(), // 5
            k.idx.worker_leases(k.worker_instance_id), // 6
            k.idx.lane_active(k.lane_id), // 7
            k.idx.lane_blocked_dependencies(k.lane_id), // 8
            k.idx.attempt_timeout(), // 9
        }
        argv {
            args.execution_id.to_string(), // 1
            args.lease_id.to_string(), // 2
            args.lease_epoch.to_string(), // 3
            args.attempt_id.to_string(), // 4
        }
    }
}
impl FromFcallResult for MoveToWaitingChildrenResultPartial {
    /// Decodes the `ff_move_to_waiting_children` reply.
    ///
    /// Only the success/failure status is checked; no reply field is read and
    /// the reported state is always `PublicState::WaitingChildren`.
    ///
    /// # Errors
    /// Propagates errors from `FcallResult::parse` and `into_success`.
    fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
        // Bound only to validate the reply; its fields are intentionally unused.
        let _success = FcallResult::parse(raw)?.into_success()?;
        let public_state = PublicState::WaitingChildren;
        Ok(Self::Moved { public_state })
    }
}
// Declares `ff_fail_execution` through `ff_function!`.
//
// The `keys` and `argv` lists are order-sensitive (presumably forwarded
// positionally to the server-side function; confirm against the script), so
// entries must not be reordered. Trailing numbers are the 1-based position of
// each entry within its list.
ff_function! {
    pub ff_fail_execution(args: FailExecutionArgs) -> FailExecutionResult {
        keys(k: &ExecOpKeys<'_>) {
            k.ctx.core(), // 1
            k.ctx.attempt_hash(args.attempt_index), // 2
            k.idx.lease_expiry(), // 3
            k.idx.worker_leases(k.worker_instance_id), // 4
            k.idx.lane_terminal(k.lane_id), // 5
            k.idx.lane_delayed(k.lane_id), // 6
            k.ctx.lease_current(), // 7
            k.ctx.lease_history(), // 8
            k.idx.lane_active(k.lane_id), // 9
            k.ctx.stream_meta(args.attempt_index), // 10
            k.idx.attempt_timeout(), // 11
            k.idx.execution_deadline(), // 12
        }
        argv {
            args.execution_id.to_string(), // 1
            args.lease_id.to_string(), // 2
            args.lease_epoch.to_string(), // 3
            args.attempt_id.to_string(), // 4
            args.failure_reason.clone(), // 5
            args.failure_category.clone(), // 6
            args.retry_policy_json.clone(), // 7
        }
    }
}
impl FromFcallResult for FailExecutionResult {
    /// Decodes a successful `ff_fail_execution` reply.
    ///
    /// Field 0 is a sub-status:
    /// * `"retry_scheduled"`: field 1 is the retry time in ms (`i64`),
    ///   giving `RetryScheduled`.
    /// * `"terminal_failed"`: gives `TerminalFailed`.
    ///
    /// # Errors
    /// Propagates errors from `FcallResult::parse` and `into_success`;
    /// returns `ScriptError::Parse` for an unparsable delay or any other
    /// sub-status.
    fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
        let reply = FcallResult::parse(raw)?.into_success()?;
        let sub_status = reply.field_str(0);

        if sub_status.as_str() == "terminal_failed" {
            return Ok(Self::TerminalFailed);
        }
        if sub_status.as_str() != "retry_scheduled" {
            return Err(ScriptError::Parse(format!(
                "unexpected fail sub-status: {sub_status}"
            )));
        }

        // Retry path: field 1 is only read once the sub-status is known.
        let retry_at_ms: i64 = reply
            .field_str(1)
            .parse()
            .map_err(|e| ScriptError::Parse(format!("bad delay_until: {e}")))?;

        Ok(Self::RetryScheduled {
            delay_until: TimestampMs::from_millis(retry_at_ms),
            // NOTE(review): the next attempt index is not read from the reply
            // and is always reported as 0; confirm callers do not rely on it.
            next_attempt_index: AttemptIndex::new(0),
        })
    }
}
// Declares `ff_expire_execution` through `ff_function!`.
//
// The `keys` and `argv` lists are order-sensitive (presumably forwarded
// positionally to the server-side function; confirm against the script), so
// entries must not be reordered. Trailing numbers are the 1-based position of
// each entry within its list.
ff_function! {
    pub ff_expire_execution(args: ExpireExecutionArgs) -> ExpireExecutionResultPartial {
        keys(k: &ExecOpKeys<'_>) {
            k.ctx.core(),                                   // 1
            // 2-3: built with a fixed attempt index of 0.
            // NOTE(review): presumably placeholders; confirm against the script.
            k.ctx.attempt_hash(AttemptIndex::new(0)),
            k.ctx.stream_meta(AttemptIndex::new(0)),
            k.ctx.lease_current(),                          // 4
            k.ctx.lease_history(),                          // 5
            k.idx.lease_expiry(),                           // 6
            k.idx.worker_leases(k.worker_instance_id),      // 7
            k.idx.lane_active(k.lane_id),                   // 8
            k.idx.lane_terminal(k.lane_id),                 // 9
            k.idx.attempt_timeout(),                        // 10
            k.idx.execution_deadline(),                     // 11
            k.idx.lane_suspended(k.lane_id),                // 12
            k.idx.suspension_timeout(),                     // 13
            k.ctx.suspension_current(),                     // 14
        }
        argv {
            args.execution_id.to_string(),                  // 1
            args.expire_reason.clone(),                     // 2
        }
    }
}
impl FromFcallResult for ExpireExecutionResultPartial {
    /// Decodes a successful `ff_expire_execution` reply.
    ///
    /// Field 0 is a sub-status: `"already_terminal"` and
    /// `"not_found_cleaned"` map to `AlreadyTerminal`; every other value,
    /// including `"expired"`, maps to `Expired`.
    ///
    /// # Errors
    /// Propagates errors from `FcallResult::parse` and `into_success`. An
    /// unrecognised sub-status is not an error.
    fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
        let reply = FcallResult::parse(raw)?.into_success()?;
        let sub_status = reply.field_str(0);

        // NOTE(review): unknown sub-statuses fall through to `Expired` rather
        // than failing like the fail-execution decoder does; confirm this
        // leniency is intended.
        let nothing_expired = matches!(
            sub_status.as_str(),
            "already_terminal" | "not_found_cleaned"
        );
        let outcome = if nothing_expired {
            Self::AlreadyTerminal
        } else {
            Self::Expired
        };
        Ok(outcome)
    }
}
/// Maps the wire name of a public state to its [`PublicState`] variant.
///
/// Matching is exact: lowercase snake_case names only.
///
/// # Errors
/// Returns `ScriptError::Parse` naming the offending value when `s` is not
/// one of the known state names.
fn parse_public_state(s: &str) -> Result<PublicState, ScriptError> {
    let state = match s {
        "waiting" => PublicState::Waiting,
        "delayed" => PublicState::Delayed,
        "rate_limited" => PublicState::RateLimited,
        "waiting_children" => PublicState::WaitingChildren,
        "active" => PublicState::Active,
        "suspended" => PublicState::Suspended,
        "completed" => PublicState::Completed,
        "failed" => PublicState::Failed,
        "cancelled" => PublicState::Cancelled,
        "expired" => PublicState::Expired,
        "skipped" => PublicState::Skipped,
        _ => return Err(ScriptError::Parse(format!("unknown public_state: {s}"))),
    };
    Ok(state)
}
/// Maps the wire name of an attempt type to its [`AttemptType`] variant.
///
/// Matching is exact: lowercase names only.
///
/// # Errors
/// Returns `ScriptError::Parse` naming the offending value when `s` is not
/// one of the known attempt-type names.
fn parse_attempt_type(s: &str) -> Result<AttemptType, ScriptError> {
    let attempt_type = match s {
        "initial" => AttemptType::Initial,
        "retry" => AttemptType::Retry,
        "reclaim" => AttemptType::Reclaim,
        "replay" => AttemptType::Replay,
        "fallback" => AttemptType::Fallback,
        _ => return Err(ScriptError::Parse(format!("unknown attempt_type: {s}"))),
    };
    Ok(attempt_type)
}
/// Unit tests for the `*Partial::complete` conversions: each one checks that
/// the supplied execution id ends up in the full result (or is dropped where
/// the target variant has no id).
#[cfg(test)]
mod partial_tests {
    use super::*;
    use ff_core::partition::PartitionConfig;

    /// Builds an execution id for a fresh flow under the default partition
    /// configuration.
    fn test_eid() -> ExecutionId {
        let flow = FlowId::new();
        let config = PartitionConfig::default();
        ExecutionId::for_flow(&flow, &config)
    }

    #[test]
    fn claim_partial_complete_attaches_execution_id() {
        let claimed = ClaimedExecutionPartial {
            lease_id: LeaseId::new(),
            lease_epoch: LeaseEpoch::new(1),
            attempt_index: AttemptIndex::new(0),
            attempt_id: AttemptId::new(),
            attempt_type: AttemptType::Initial,
            lease_expires_at: TimestampMs::from_millis(1000),
        };
        let partial = ClaimExecutionResultPartial::Claimed(claimed);
        let expected = test_eid();

        let ClaimExecutionResult::Claimed(full) = partial.complete(expected.clone());
        assert_eq!(full.execution_id, expected);
    }

    #[test]
    fn complete_partial_complete_attaches_execution_id() {
        let partial = CompleteExecutionResultPartial::Completed {
            public_state: PublicState::Completed,
        };
        let expected = test_eid();

        let CompleteExecutionResult::Completed { execution_id, .. } =
            partial.complete(expected.clone());
        assert_eq!(execution_id, expected);
    }

    #[test]
    fn cancel_partial_complete_attaches_execution_id() {
        let partial = CancelExecutionResultPartial::Cancelled {
            public_state: PublicState::Cancelled,
        };
        let expected = test_eid();

        let CancelExecutionResult::Cancelled { execution_id, .. } =
            partial.complete(expected.clone());
        assert_eq!(execution_id, expected);
    }

    #[test]
    fn delay_partial_complete_attaches_execution_id() {
        let partial = DelayExecutionResultPartial::Delayed {
            public_state: PublicState::Delayed,
        };
        let expected = test_eid();

        let DelayExecutionResult::Delayed { execution_id, .. } =
            partial.complete(expected.clone());
        assert_eq!(execution_id, expected);
    }

    #[test]
    fn move_to_waiting_children_partial_complete_attaches_execution_id() {
        let partial = MoveToWaitingChildrenResultPartial::Moved {
            public_state: PublicState::WaitingChildren,
        };
        let expected = test_eid();

        let MoveToWaitingChildrenResult::Moved { execution_id, .. } =
            partial.complete(expected.clone());
        assert_eq!(execution_id, expected);
    }

    #[test]
    fn expire_partial_expired_variant_attaches_execution_id() {
        let partial = ExpireExecutionResultPartial::Expired;
        let expected = test_eid();

        let full = partial.complete(expected.clone());
        if let ExpireExecutionResult::Expired { execution_id } = full {
            assert_eq!(execution_id, expected);
        } else {
            panic!("expected Expired variant");
        }
    }

    #[test]
    fn expire_partial_already_terminal_variant_ignores_execution_id() {
        let partial = ExpireExecutionResultPartial::AlreadyTerminal;
        let unused_id = test_eid();

        // The id is consumed but must not surface in the result.
        let full = partial.complete(unused_id);
        assert!(matches!(full, ExpireExecutionResult::AlreadyTerminal));
    }
}