use ff_core::error::ErrorClass;
use crate::retry::is_retryable_kind;
/// Error type combining named failure codes with Valkey client and parse failures.
///
/// Every variant except [`ScriptError::Valkey`] and [`ScriptError::Parse`] renders
/// (through the `thiserror`-generated `Display` impl) a message that starts with a
/// snake_case code. That same code is the string `ScriptError::from_code` accepts
/// to rebuild the variant.
#[derive(Debug, thiserror::Error)]
pub enum ScriptError {
    #[error("stale_lease: lease superseded by reclaim")]
    StaleLease,
    #[error("lease_expired: lease TTL elapsed")]
    LeaseExpired,
    #[error("lease_revoked: operator revoked lease")]
    LeaseRevoked,
    #[error("execution_not_active: execution is not in active state")]
    ExecutionNotActive,
    #[error("no_active_lease: target has no active lease")]
    NoActiveLease,
    #[error("active_attempt_exists: invariant violation")]
    ActiveAttemptExists,
    #[error("use_claim_resumed_execution: attempt_interrupted, use resume claim path")]
    UseClaimResumedExecution,
    #[error("not_a_resumed_execution: use normal claim path")]
    NotAResumedExecution,
    #[error("execution_not_leaseable: state changed since grant")]
    ExecutionNotLeaseable,
    #[error("lease_conflict: another worker holds lease")]
    LeaseConflict,
    #[error("invalid_claim_grant: grant missing or mismatched")]
    InvalidClaimGrant,
    #[error("claim_grant_expired: grant TTL elapsed")]
    ClaimGrantExpired,
    #[error("no_eligible_execution: no execution available")]
    NoEligibleExecution,
    #[error("budget_exceeded: hard budget limit reached")]
    BudgetExceeded,
    #[error("budget_soft_exceeded: soft budget limit reached")]
    BudgetSoftExceeded,
    #[error("execution_not_suspended: already resumed or cancelled")]
    ExecutionNotSuspended,
    #[error("already_suspended: suspension already active")]
    AlreadySuspended,
    #[error("waitpoint_closed: waitpoint already closed")]
    WaitpointClosed,
    #[error("waitpoint_not_found: waitpoint does not exist yet")]
    WaitpointNotFound,
    #[error("target_not_signalable: no valid signal target")]
    TargetNotSignalable,
    #[error("waitpoint_pending_use_buffer_script: route to buffer script")]
    WaitpointPendingUseBufferScript,
    #[error("duplicate_signal: signal already delivered")]
    DuplicateSignal,
    #[error("payload_too_large: signal payload exceeds 64KB")]
    PayloadTooLarge,
    #[error("signal_limit_exceeded: max signals per execution reached")]
    SignalLimitExceeded,
    #[error("invalid_waitpoint_key: MAC verification failed")]
    InvalidWaitpointKey,
    #[error("invalid_lease_for_suspend: lease/attempt binding mismatch")]
    InvalidLeaseForSuspend,
    #[error("resume_condition_not_met: resume conditions not satisfied")]
    ResumeConditionNotMet,
    #[error("waitpoint_not_pending: waitpoint is not in pending state")]
    WaitpointNotPending,
    #[error("pending_waitpoint_expired: pending waitpoint aged out")]
    PendingWaitpointExpired,
    #[error("invalid_waitpoint_for_execution: waitpoint does not belong to execution")]
    InvalidWaitpointForExecution,
    #[error("waitpoint_already_exists: waitpoint already exists")]
    WaitpointAlreadyExists,
    #[error("waitpoint_not_open: waitpoint is not pending or active")]
    WaitpointNotOpen,
    #[error("execution_not_terminal: cannot replay non-terminal execution")]
    ExecutionNotTerminal,
    #[error("max_replays_exhausted: replay limit reached")]
    MaxReplaysExhausted,
    #[error("stream_closed: attempt terminal, no appends allowed")]
    StreamClosed,
    #[error("stale_owner_cannot_append: lease mismatch on append")]
    StaleOwnerCannotAppend,
    #[error("retention_limit_exceeded: frame exceeds size limit")]
    RetentionLimitExceeded,
    #[error("execution_not_eligible: state changed")]
    ExecutionNotEligible,
    #[error("execution_not_in_eligible_set: removed by another scheduler")]
    ExecutionNotInEligibleSet,
    #[error("grant_already_exists: grant already active")]
    GrantAlreadyExists,
    #[error("execution_not_reclaimable: already reclaimed or cancelled")]
    ExecutionNotReclaimable,
    #[error("invalid_dependency: dependency edge not found")]
    InvalidDependency,
    #[error("stale_graph_revision: graph has been updated")]
    StaleGraphRevision,
    #[error("execution_already_in_flow: execution belongs to another flow")]
    ExecutionAlreadyInFlow,
    #[error("cycle_detected: dependency edge would create cycle")]
    CycleDetected,
    #[error("flow_not_found: flow does not exist")]
    FlowNotFound,
    #[error("execution_not_in_flow: execution not in flow")]
    ExecutionNotInFlow,
    #[error("dependency_already_exists: edge already exists")]
    DependencyAlreadyExists,
    #[error("self_referencing_edge: upstream and downstream are the same")]
    SelfReferencingEdge,
    #[error("flow_already_terminal: flow is already terminal")]
    FlowAlreadyTerminal,
    #[error("deps_not_satisfied: dependencies still unresolved")]
    DepsNotSatisfied,
    #[error("not_blocked_by_deps: execution not blocked by dependencies")]
    NotBlockedByDeps,
    #[error("not_runnable: execution is not in runnable state")]
    NotRunnable,
    #[error("terminal: execution is already terminal")]
    Terminal,
    #[error("invalid_blocking_reason: unrecognized blocking reason")]
    InvalidBlockingReason,
    #[error("ok_already_applied: usage seq already processed")]
    OkAlreadyApplied,
    #[error("attempt_not_found: attempt index does not exist")]
    AttemptNotFound,
    #[error("attempt_not_in_created_state: internal sequencing error")]
    AttemptNotInCreatedState,
    #[error("attempt_not_started: attempt not in started state")]
    AttemptNotStarted,
    #[error("attempt_already_terminal: attempt already ended")]
    AttemptAlreadyTerminal,
    #[error("execution_not_found: execution does not exist")]
    ExecutionNotFound,
    #[error("execution_not_eligible_for_attempt: wrong state for new attempt")]
    ExecutionNotEligibleForAttempt,
    #[error("replay_not_allowed: execution not terminal or limit reached")]
    ReplayNotAllowed,
    #[error("max_retries_exhausted: retry limit reached")]
    MaxRetriesExhausted,
    #[error("stream_not_found: no frames appended yet")]
    StreamNotFound,
    #[error("stream_already_closed: stream already closed")]
    StreamAlreadyClosed,
    #[error("invalid_frame_type: unrecognized frame type")]
    InvalidFrameType,
    #[error("invalid_offset: invalid stream ID offset")]
    InvalidOffset,
    #[error("unauthorized: authentication/authorization failed")]
    Unauthorized,
    #[error("budget_not_found: budget does not exist")]
    BudgetNotFound,
    #[error("invalid_budget_scope: malformed budget scope")]
    InvalidBudgetScope,
    #[error("budget_attach_conflict: budget attachment conflict")]
    BudgetAttachConflict,
    #[error("budget_override_not_allowed: insufficient privileges")]
    BudgetOverrideNotAllowed,
    #[error("quota_policy_not_found: quota policy does not exist")]
    QuotaPolicyNotFound,
    #[error("rate_limit_exceeded: rate limit window full")]
    RateLimitExceeded,
    #[error("concurrency_limit_exceeded: concurrency cap reached")]
    ConcurrencyLimitExceeded,
    #[error("quota_attach_conflict: quota policy already attached")]
    QuotaAttachConflict,
    #[error("invalid_quota_spec: malformed quota policy definition")]
    InvalidQuotaSpec,

    /// The payload is rendered after the `invalid_input: ` prefix.
    #[error("invalid_input: {0}")]
    InvalidInput(String),
    /// The payload is rendered after `capability_mismatch: missing `.
    #[error("capability_mismatch: missing {0}")]
    CapabilityMismatch(String),
    /// The payload is rendered after the `invalid_capabilities: ` prefix.
    #[error("invalid_capabilities: {0}")]
    InvalidCapabilities(String),
    /// The payload is rendered after the `invalid_policy_json: ` prefix.
    #[error("invalid_policy_json: {0}")]
    InvalidPolicyJson(String),
    /// Unlike the other coded variants, this one renders the bare code with
    /// no `: description` suffix.
    #[error("waitpoint_not_token_bound")]
    WaitpointNotTokenBound,

    /// Wraps a `ferriskey::Error`; `#[from]` provides the `From` conversion,
    /// so `?` can lift client errors into this type.
    #[error("valkey: {0}")]
    Valkey(#[from] ferriskey::Error),
    /// Free-form parse failure; the payload is the whole description.
    #[error("parse error: {0}")]
    Parse(String),
}
impl ScriptError {
    /// Returns the `ferriskey::ErrorKind` of the wrapped client error.
    ///
    /// Only [`ScriptError::Valkey`] carries a client error; every other
    /// variant yields `None`.
    pub fn valkey_kind(&self) -> Option<ferriskey::ErrorKind> {
        if let Self::Valkey(inner) = self {
            Some(inner.kind())
        } else {
            None
        }
    }

    /// Maps this error to its [`ErrorClass`].
    ///
    /// The mapping is a fixed per-variant table, with one exception:
    /// [`ScriptError::Valkey`] is `Retryable` when `is_retryable_kind`
    /// accepts the wrapped error's kind and `Terminal` otherwise.
    pub fn class(&self) -> ErrorClass {
        match self {
            // Classes with only one or two members.
            Self::BudgetExceeded => ErrorClass::Cooperative,
            Self::StreamNotFound => ErrorClass::Expected,
            Self::InvalidFrameType => ErrorClass::SoftError,
            Self::ActiveAttemptExists | Self::AttemptNotInCreatedState => ErrorClass::Bug,

            // Client errors: the class depends on the underlying error kind.
            Self::Valkey(inner) if is_retryable_kind(inner.kind()) => ErrorClass::Retryable,
            Self::Valkey(_) => ErrorClass::Terminal,

            Self::UseClaimResumedExecution
            | Self::NotAResumedExecution
            | Self::ExecutionNotLeaseable
            | Self::LeaseConflict
            | Self::InvalidClaimGrant
            | Self::ClaimGrantExpired
            | Self::NoEligibleExecution
            | Self::WaitpointNotFound
            | Self::WaitpointPendingUseBufferScript
            | Self::StaleGraphRevision
            | Self::RateLimitExceeded
            | Self::ConcurrencyLimitExceeded
            | Self::CapabilityMismatch(_)
            | Self::InvalidOffset => ErrorClass::Retryable,

            Self::ExecutionNotSuspended
            | Self::AlreadySuspended
            | Self::WaitpointClosed
            | Self::DuplicateSignal
            | Self::ExecutionNotEligible
            | Self::ExecutionNotInEligibleSet
            | Self::GrantAlreadyExists
            | Self::ExecutionNotReclaimable
            | Self::NoActiveLease
            | Self::OkAlreadyApplied
            | Self::AttemptAlreadyTerminal
            | Self::StreamAlreadyClosed
            | Self::BudgetSoftExceeded
            | Self::WaitpointAlreadyExists
            | Self::WaitpointNotOpen
            | Self::WaitpointNotPending
            | Self::PendingWaitpointExpired
            | Self::NotBlockedByDeps
            | Self::DepsNotSatisfied => ErrorClass::Informational,

            Self::StaleLease
            | Self::LeaseExpired
            | Self::LeaseRevoked
            | Self::ExecutionNotActive
            | Self::TargetNotSignalable
            | Self::PayloadTooLarge
            | Self::SignalLimitExceeded
            | Self::InvalidWaitpointKey
            | Self::ExecutionNotTerminal
            | Self::MaxReplaysExhausted
            | Self::StreamClosed
            | Self::StaleOwnerCannotAppend
            | Self::RetentionLimitExceeded
            | Self::InvalidLeaseForSuspend
            | Self::ResumeConditionNotMet
            | Self::InvalidDependency
            | Self::ExecutionAlreadyInFlow
            | Self::CycleDetected
            | Self::FlowNotFound
            | Self::ExecutionNotInFlow
            | Self::DependencyAlreadyExists
            | Self::SelfReferencingEdge
            | Self::FlowAlreadyTerminal
            | Self::InvalidWaitpointForExecution
            | Self::InvalidBlockingReason
            | Self::NotRunnable
            | Self::Terminal
            | Self::AttemptNotFound
            | Self::AttemptNotStarted
            | Self::ExecutionNotFound
            | Self::ExecutionNotEligibleForAttempt
            | Self::ReplayNotAllowed
            | Self::MaxRetriesExhausted
            | Self::Unauthorized
            | Self::BudgetNotFound
            | Self::InvalidBudgetScope
            | Self::BudgetAttachConflict
            | Self::BudgetOverrideNotAllowed
            | Self::QuotaPolicyNotFound
            | Self::QuotaAttachConflict
            | Self::InvalidQuotaSpec
            | Self::InvalidInput(_)
            | Self::InvalidCapabilities(_)
            | Self::InvalidPolicyJson(_)
            | Self::WaitpointNotTokenBound
            | Self::Parse(_) => ErrorClass::Terminal,
        }
    }

    /// Builds the variant named by a snake_case error `code`.
    ///
    /// Returns `None` for an unrecognised code. Variants that carry a
    /// `String` payload are created with an empty string; use
    /// [`ScriptError::from_code_with_detail`] to supply the payload.
    /// [`ScriptError::Valkey`] and [`ScriptError::Parse`] have no code and
    /// are never produced here.
    pub fn from_code(code: &str) -> Option<Self> {
        let variant = match code {
            "stale_lease" => Self::StaleLease,
            "lease_expired" => Self::LeaseExpired,
            "lease_revoked" => Self::LeaseRevoked,
            "execution_not_active" => Self::ExecutionNotActive,
            "no_active_lease" => Self::NoActiveLease,
            "active_attempt_exists" => Self::ActiveAttemptExists,
            "use_claim_resumed_execution" => Self::UseClaimResumedExecution,
            "not_a_resumed_execution" => Self::NotAResumedExecution,
            "execution_not_leaseable" => Self::ExecutionNotLeaseable,
            "lease_conflict" => Self::LeaseConflict,
            "invalid_claim_grant" => Self::InvalidClaimGrant,
            "claim_grant_expired" => Self::ClaimGrantExpired,
            "no_eligible_execution" => Self::NoEligibleExecution,
            "budget_exceeded" => Self::BudgetExceeded,
            "budget_soft_exceeded" => Self::BudgetSoftExceeded,
            "execution_not_suspended" => Self::ExecutionNotSuspended,
            "already_suspended" => Self::AlreadySuspended,
            "waitpoint_closed" => Self::WaitpointClosed,
            "waitpoint_not_found" => Self::WaitpointNotFound,
            "target_not_signalable" => Self::TargetNotSignalable,
            "waitpoint_pending_use_buffer_script" => Self::WaitpointPendingUseBufferScript,
            "duplicate_signal" => Self::DuplicateSignal,
            "payload_too_large" => Self::PayloadTooLarge,
            "signal_limit_exceeded" => Self::SignalLimitExceeded,
            "invalid_waitpoint_key" => Self::InvalidWaitpointKey,
            "invalid_lease_for_suspend" => Self::InvalidLeaseForSuspend,
            "resume_condition_not_met" => Self::ResumeConditionNotMet,
            "waitpoint_not_pending" => Self::WaitpointNotPending,
            "pending_waitpoint_expired" => Self::PendingWaitpointExpired,
            "invalid_waitpoint_for_execution" => Self::InvalidWaitpointForExecution,
            "waitpoint_already_exists" => Self::WaitpointAlreadyExists,
            "waitpoint_not_open" => Self::WaitpointNotOpen,
            "execution_not_terminal" => Self::ExecutionNotTerminal,
            "max_replays_exhausted" => Self::MaxReplaysExhausted,
            "stream_closed" => Self::StreamClosed,
            "stale_owner_cannot_append" => Self::StaleOwnerCannotAppend,
            "retention_limit_exceeded" => Self::RetentionLimitExceeded,
            "execution_not_eligible" => Self::ExecutionNotEligible,
            "execution_not_in_eligible_set" => Self::ExecutionNotInEligibleSet,
            "grant_already_exists" => Self::GrantAlreadyExists,
            "execution_not_reclaimable" => Self::ExecutionNotReclaimable,
            "invalid_dependency" => Self::InvalidDependency,
            "stale_graph_revision" => Self::StaleGraphRevision,
            "execution_already_in_flow" => Self::ExecutionAlreadyInFlow,
            "cycle_detected" => Self::CycleDetected,
            "flow_not_found" => Self::FlowNotFound,
            "execution_not_in_flow" => Self::ExecutionNotInFlow,
            "dependency_already_exists" => Self::DependencyAlreadyExists,
            "self_referencing_edge" => Self::SelfReferencingEdge,
            "flow_already_terminal" => Self::FlowAlreadyTerminal,
            "deps_not_satisfied" => Self::DepsNotSatisfied,
            "not_blocked_by_deps" => Self::NotBlockedByDeps,
            "not_runnable" => Self::NotRunnable,
            "terminal" => Self::Terminal,
            "invalid_blocking_reason" => Self::InvalidBlockingReason,
            "ok_already_applied" => Self::OkAlreadyApplied,
            "attempt_not_found" => Self::AttemptNotFound,
            "attempt_not_in_created_state" => Self::AttemptNotInCreatedState,
            "attempt_not_started" => Self::AttemptNotStarted,
            "attempt_already_terminal" => Self::AttemptAlreadyTerminal,
            "execution_not_found" => Self::ExecutionNotFound,
            "execution_not_eligible_for_attempt" => Self::ExecutionNotEligibleForAttempt,
            "replay_not_allowed" => Self::ReplayNotAllowed,
            "max_retries_exhausted" => Self::MaxRetriesExhausted,
            "stream_not_found" => Self::StreamNotFound,
            "stream_already_closed" => Self::StreamAlreadyClosed,
            "invalid_frame_type" => Self::InvalidFrameType,
            "invalid_offset" => Self::InvalidOffset,
            "unauthorized" => Self::Unauthorized,
            "budget_not_found" => Self::BudgetNotFound,
            "invalid_budget_scope" => Self::InvalidBudgetScope,
            "budget_attach_conflict" => Self::BudgetAttachConflict,
            "budget_override_not_allowed" => Self::BudgetOverrideNotAllowed,
            "quota_policy_not_found" => Self::QuotaPolicyNotFound,
            "rate_limit_exceeded" => Self::RateLimitExceeded,
            "concurrency_limit_exceeded" => Self::ConcurrencyLimitExceeded,
            "quota_attach_conflict" => Self::QuotaAttachConflict,
            "invalid_quota_spec" => Self::InvalidQuotaSpec,
            // Payload-carrying variants start out with an empty payload.
            "invalid_input" => Self::InvalidInput(String::new()),
            "capability_mismatch" => Self::CapabilityMismatch(String::new()),
            "invalid_capabilities" => Self::InvalidCapabilities(String::new()),
            "invalid_policy_json" => Self::InvalidPolicyJson(String::new()),
            "waitpoint_not_token_bound" => Self::WaitpointNotTokenBound,
            _ => return None,
        };
        Some(variant)
    }

    /// Like [`ScriptError::from_code`], but stores `detail` in the payload of
    /// the four string-carrying coded variants (`InvalidInput`,
    /// `CapabilityMismatch`, `InvalidCapabilities`, `InvalidPolicyJson`).
    ///
    /// For every other recognised code `detail` is ignored. Returns `None`
    /// when `code` is unknown.
    pub fn from_code_with_detail(code: &str, detail: &str) -> Option<Self> {
        Self::from_code(code).map(|parsed| match parsed {
            Self::InvalidInput(_) => Self::InvalidInput(detail.to_owned()),
            Self::CapabilityMismatch(_) => Self::CapabilityMismatch(detail.to_owned()),
            Self::InvalidCapabilities(_) => Self::InvalidCapabilities(detail.to_owned()),
            Self::InvalidPolicyJson(_) => Self::InvalidPolicyJson(detail.to_owned()),
            payload_free => payload_free,
        })
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Every code accepted by `ScriptError::from_code`, in the order the
    /// match arms appear there.
    const ALL_CODES: &[&str] = &[
        "stale_lease",
        "lease_expired",
        "lease_revoked",
        "execution_not_active",
        "no_active_lease",
        "active_attempt_exists",
        "use_claim_resumed_execution",
        "not_a_resumed_execution",
        "execution_not_leaseable",
        "lease_conflict",
        "invalid_claim_grant",
        "claim_grant_expired",
        "no_eligible_execution",
        "budget_exceeded",
        "budget_soft_exceeded",
        "execution_not_suspended",
        "already_suspended",
        "waitpoint_closed",
        "waitpoint_not_found",
        "target_not_signalable",
        "waitpoint_pending_use_buffer_script",
        "duplicate_signal",
        "payload_too_large",
        "signal_limit_exceeded",
        "invalid_waitpoint_key",
        "invalid_lease_for_suspend",
        "resume_condition_not_met",
        "waitpoint_not_pending",
        "pending_waitpoint_expired",
        "invalid_waitpoint_for_execution",
        "waitpoint_already_exists",
        "waitpoint_not_open",
        "execution_not_terminal",
        "max_replays_exhausted",
        "stream_closed",
        "stale_owner_cannot_append",
        "retention_limit_exceeded",
        "execution_not_eligible",
        "execution_not_in_eligible_set",
        "grant_already_exists",
        "execution_not_reclaimable",
        "invalid_dependency",
        "stale_graph_revision",
        "execution_already_in_flow",
        "cycle_detected",
        "flow_not_found",
        "execution_not_in_flow",
        "dependency_already_exists",
        "self_referencing_edge",
        "flow_already_terminal",
        "deps_not_satisfied",
        "not_blocked_by_deps",
        "not_runnable",
        "terminal",
        "invalid_blocking_reason",
        "ok_already_applied",
        "attempt_not_found",
        "attempt_not_in_created_state",
        "attempt_not_started",
        "attempt_already_terminal",
        "execution_not_found",
        "execution_not_eligible_for_attempt",
        "replay_not_allowed",
        "max_retries_exhausted",
        "stream_not_found",
        "stream_already_closed",
        "invalid_frame_type",
        "invalid_offset",
        "unauthorized",
        "budget_not_found",
        "invalid_budget_scope",
        "budget_attach_conflict",
        "budget_override_not_allowed",
        "quota_policy_not_found",
        "rate_limit_exceeded",
        "concurrency_limit_exceeded",
        "quota_attach_conflict",
        "invalid_quota_spec",
        "invalid_input",
        "capability_mismatch",
        "invalid_capabilities",
        "invalid_policy_json",
        "waitpoint_not_token_bound",
    ];

    #[test]
    fn error_classification_terminal() {
        assert_eq!(ScriptError::StaleLease.class(), ErrorClass::Terminal);
        assert_eq!(ScriptError::LeaseExpired.class(), ErrorClass::Terminal);
        assert_eq!(ScriptError::ExecutionNotFound.class(), ErrorClass::Terminal);
    }

    #[test]
    fn error_classification_retryable() {
        assert_eq!(
            ScriptError::UseClaimResumedExecution.class(),
            ErrorClass::Retryable
        );
        assert_eq!(
            ScriptError::NoEligibleExecution.class(),
            ErrorClass::Retryable
        );
        assert_eq!(
            ScriptError::WaitpointNotFound.class(),
            ErrorClass::Retryable
        );
        assert_eq!(
            ScriptError::RateLimitExceeded.class(),
            ErrorClass::Retryable
        );
    }

    #[test]
    fn error_classification_cooperative() {
        assert_eq!(ScriptError::BudgetExceeded.class(), ErrorClass::Cooperative);
    }

    #[test]
    fn error_classification_valkey_transient_is_retryable() {
        use ferriskey::ErrorKind;
        let transient = ScriptError::Valkey(ferriskey::Error::from((
            ErrorKind::IoError,
            "connection dropped",
        )));
        assert_eq!(transient.class(), ErrorClass::Retryable);
    }

    #[test]
    fn error_classification_valkey_permanent_is_terminal() {
        use ferriskey::ErrorKind;
        let permanent = ScriptError::Valkey(ferriskey::Error::from((
            ErrorKind::AuthenticationFailed,
            "bad creds",
        )));
        assert_eq!(permanent.class(), ErrorClass::Terminal);
        let fatal_recv = ScriptError::Valkey(ferriskey::Error::from((
            ErrorKind::FatalReceiveError,
            "response lost",
        )));
        assert_eq!(fatal_recv.class(), ErrorClass::Terminal);
    }

    #[test]
    fn error_classification_informational() {
        assert_eq!(
            ScriptError::ExecutionNotSuspended.class(),
            ErrorClass::Informational
        );
        assert_eq!(
            ScriptError::DuplicateSignal.class(),
            ErrorClass::Informational
        );
        assert_eq!(
            ScriptError::OkAlreadyApplied.class(),
            ErrorClass::Informational
        );
    }

    #[test]
    fn error_classification_bug() {
        assert_eq!(ScriptError::ActiveAttemptExists.class(), ErrorClass::Bug);
        assert_eq!(
            ScriptError::AttemptNotInCreatedState.class(),
            ErrorClass::Bug
        );
    }

    #[test]
    fn error_classification_expected() {
        assert_eq!(ScriptError::StreamNotFound.class(), ErrorClass::Expected);
    }

    #[test]
    fn error_classification_budget_soft_exceeded() {
        assert_eq!(
            ScriptError::BudgetSoftExceeded.class(),
            ErrorClass::Informational
        );
    }

    #[test]
    fn error_classification_soft_error() {
        assert_eq!(ScriptError::InvalidFrameType.class(), ErrorClass::SoftError);
    }

    /// Each code must parse, and the resulting variant's `Display` text must
    /// start with that same code. Checking only `is_some()` would not catch
    /// a code wired to the wrong variant.
    #[test]
    fn from_code_roundtrip() {
        for &code in ALL_CODES {
            let err = ScriptError::from_code(code)
                .unwrap_or_else(|| panic!("failed to parse code: {code}"));
            let rendered = err.to_string();
            // The code is everything before the first ':'; a message without
            // a ':' (e.g. `waitpoint_not_token_bound`) is the code itself.
            let prefix = rendered.split(':').next().unwrap_or("");
            assert_eq!(prefix, code, "display prefix mismatch for code: {code}");
        }
    }

    #[test]
    fn from_code_unknown_returns_none() {
        assert!(ScriptError::from_code("nonexistent_error").is_none());
        // `Valkey` and `Parse` have no code of their own.
        assert!(ScriptError::from_code("valkey").is_none());
        assert!(ScriptError::from_code("parse error").is_none());
        assert!(ScriptError::from_code("").is_none());
    }

    #[test]
    fn from_code_payload_variants_start_empty() {
        let rendered = ScriptError::from_code("invalid_input").map(|e| e.to_string());
        assert_eq!(rendered.as_deref(), Some("invalid_input: "));
        let rendered = ScriptError::from_code("capability_mismatch").map(|e| e.to_string());
        assert_eq!(rendered.as_deref(), Some("capability_mismatch: missing "));
    }

    #[test]
    fn from_code_with_detail_fills_payload() {
        let cases = [
            ("invalid_input", "bad field", "invalid_input: bad field"),
            ("capability_mismatch", "gpu", "capability_mismatch: missing gpu"),
            ("invalid_capabilities", "dup", "invalid_capabilities: dup"),
            ("invalid_policy_json", "eof", "invalid_policy_json: eof"),
        ];
        for (code, detail, expected) in cases {
            let rendered =
                ScriptError::from_code_with_detail(code, detail).map(|e| e.to_string());
            assert_eq!(rendered.as_deref(), Some(expected), "code: {code}");
        }
    }

    #[test]
    fn from_code_with_detail_ignores_detail_for_unit_variants() {
        assert!(matches!(
            ScriptError::from_code_with_detail("stale_lease", "ignored"),
            Some(ScriptError::StaleLease)
        ));
        assert!(ScriptError::from_code_with_detail("nonexistent_error", "x").is_none());
    }

    #[test]
    fn valkey_kind_only_for_valkey_variant() {
        use ferriskey::ErrorKind;
        assert!(ScriptError::StaleLease.valkey_kind().is_none());
        assert!(ScriptError::Parse(String::new()).valkey_kind().is_none());
        let wrapped = ScriptError::Valkey(ferriskey::Error::from((
            ErrorKind::IoError,
            "connection dropped",
        )));
        assert!(matches!(wrapped.valkey_kind(), Some(ErrorKind::IoError)));
    }
}