use crate::error::ErrorClass;
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum EngineError {
#[error("not found: {entity}")]
NotFound { entity: &'static str },
#[error("validation: {kind:?}: {detail}")]
Validation {
kind: ValidationKind,
detail: String,
},
#[error("contention: {0:?}")]
Contention(ContentionKind),
#[error("conflict: {0:?}")]
Conflict(ConflictKind),
#[error("state: {0:?}")]
State(StateKind),
#[error("bug: {0:?}")]
Bug(BugKind),
#[error("transport ({backend}): {source}")]
Transport {
backend: &'static str,
#[source]
source: Box<dyn std::error::Error + Send + Sync + 'static>,
},
#[error("unavailable: {op}")]
Unavailable { op: &'static str },
#[error("resource exhausted: pool={pool} max={max}")]
ResourceExhausted {
pool: &'static str,
max: u32,
retry_after_ms: Option<u32>,
},
#[error("timeout: op={op} elapsed={elapsed:?}")]
Timeout {
op: &'static str,
elapsed: std::time::Duration,
},
#[error("stream disconnected; reconnect with returned cursor")]
StreamDisconnected {
cursor: crate::stream_subscribe::StreamCursor,
},
#[error("stream backpressure; events dropped")]
StreamBackpressure,
#[error("{context}: {source}")]
Contextual {
#[source]
source: Box<EngineError>,
context: String,
},
}
pub fn backend_context(err: EngineError, context: impl Into<String>) -> EngineError {
match err {
EngineError::Transport { .. }
| EngineError::Unavailable { .. }
| EngineError::ResourceExhausted { .. }
| EngineError::Timeout { .. }
| EngineError::Contextual { .. } => EngineError::Contextual {
source: Box::new(err),
context: context.into(),
},
other => other,
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum ValidationKind {
InvalidInput,
CapabilityMismatch,
InvalidCapabilities,
InvalidPolicyJson,
PayloadTooLarge,
SignalLimitExceeded,
InvalidWaitpointKey,
InvalidToken,
WaitpointNotTokenBound,
RetentionLimitExceeded,
InvalidLeaseForSuspend,
InvalidDependency,
InvalidWaitpointForExecution,
InvalidBlockingReason,
InvalidOffset,
Unauthorized,
InvalidBudgetScope,
BudgetOverrideNotAllowed,
InvalidQuotaSpec,
InvalidKid,
InvalidSecretHex,
InvalidGraceMs,
InvalidTagKey,
InvalidFrameType,
Corruption,
HandleFromOtherBackend,
}
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum ContentionKind {
UseClaimResumedExecution,
NotAResumedExecution,
ExecutionNotLeaseable,
LeaseConflict,
InvalidClaimGrant,
ClaimGrantExpired,
NoEligibleExecution,
WaitpointNotFound,
WaitpointPendingUseBufferScript,
StaleGraphRevision,
ExecutionNotActive {
terminal_outcome: String,
lease_epoch: String,
lifecycle_phase: String,
attempt_id: String,
},
ExecutionNotEligible,
ExecutionNotInEligibleSet,
ExecutionNotReclaimable,
NoActiveLease,
RateLimitExceeded,
ConcurrencyLimitExceeded,
RetryExhausted,
}
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum ConflictKind {
DependencyAlreadyExists {
existing: crate::contracts::EdgeSnapshot,
},
CycleDetected,
SelfReferencingEdge,
ExecutionAlreadyInFlow,
WaitpointAlreadyExists,
BudgetAttachConflict,
QuotaAttachConflict,
RotationConflict(String),
ActiveAttemptExists,
}
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum StateKind {
StaleLease,
LeaseExpired,
LeaseRevoked,
ExecutionNotSuspended,
AlreadySuspended,
WaitpointClosed,
TargetNotSignalable,
DuplicateSignal,
ResumeConditionNotMet,
WaitpointNotPending,
PendingWaitpointExpired,
WaitpointNotOpen,
ExecutionNotTerminal,
MaxReplaysExhausted,
StreamClosed,
StaleOwnerCannotAppend,
GrantAlreadyExists,
ExecutionNotInFlow,
FlowAlreadyTerminal,
DepsNotSatisfied,
NotBlockedByDeps,
NotRunnable,
Terminal,
BudgetExceeded,
BudgetSoftExceeded,
OkAlreadyApplied,
AttemptNotStarted,
AttemptAlreadyTerminal,
ExecutionNotEligibleForAttempt,
ReplayNotAllowed,
MaxRetriesExhausted,
StreamAlreadyClosed,
AlreadySatisfied,
}
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum BugKind {
AttemptNotInCreatedState,
}
#[derive(Debug, Clone, thiserror::Error)]
#[non_exhaustive]
pub enum BackendError {
#[error("valkey backend: {kind:?}: {message}")]
Valkey {
kind: BackendErrorKind,
message: String,
},
}
impl BackendError {
pub fn kind(&self) -> BackendErrorKind {
match self {
Self::Valkey { kind, .. } => *kind,
}
}
pub fn message(&self) -> &str {
match self {
Self::Valkey { message, .. } => message.as_str(),
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum BackendErrorKind {
Transport,
Protocol,
Timeout,
Auth,
Cluster,
BusyLoading,
ScriptNotLoaded,
Other,
}
impl BackendErrorKind {
pub fn as_stable_str(&self) -> &'static str {
match self {
Self::Transport => "transport",
Self::Protocol => "protocol",
Self::Timeout => "timeout",
Self::Auth => "auth",
Self::Cluster => "cluster",
Self::BusyLoading => "busy_loading",
Self::ScriptNotLoaded => "script_not_loaded",
Self::Other => "other",
}
}
pub fn is_retryable(&self) -> bool {
matches!(
self,
Self::Transport | Self::Timeout | Self::Cluster | Self::BusyLoading
)
}
}
impl EngineError {
pub fn class(&self) -> ErrorClass {
match self {
Self::NotFound { .. } => ErrorClass::Terminal,
Self::Validation { .. } => ErrorClass::Terminal,
Self::Contention(_) => ErrorClass::Retryable,
Self::Conflict(_) => ErrorClass::Terminal,
Self::State(StateKind::BudgetExceeded) => ErrorClass::Cooperative,
Self::State(
StateKind::ExecutionNotSuspended
| StateKind::AlreadySuspended
| StateKind::AlreadySatisfied
| StateKind::WaitpointClosed
| StateKind::DuplicateSignal
| StateKind::GrantAlreadyExists
| StateKind::OkAlreadyApplied
| StateKind::AttemptAlreadyTerminal
| StateKind::StreamAlreadyClosed
| StateKind::BudgetSoftExceeded
| StateKind::WaitpointNotOpen
| StateKind::WaitpointNotPending
| StateKind::PendingWaitpointExpired
| StateKind::NotBlockedByDeps
| StateKind::DepsNotSatisfied,
) => ErrorClass::Informational,
Self::State(_) => ErrorClass::Terminal,
Self::Bug(_) => ErrorClass::Bug,
Self::Transport { .. } => ErrorClass::Terminal,
Self::Unavailable { .. } => ErrorClass::Terminal,
Self::ResourceExhausted { .. } => ErrorClass::Retryable,
Self::Timeout { .. } => ErrorClass::Terminal,
Self::StreamDisconnected { .. } => ErrorClass::Terminal,
Self::StreamBackpressure => ErrorClass::Informational,
Self::Contextual { source, .. } => source.class(),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn class_contention_is_retryable() {
let err = EngineError::Contention(ContentionKind::LeaseConflict);
assert_eq!(err.class(), ErrorClass::Retryable);
}
#[test]
fn class_budget_exceeded_is_cooperative() {
let err = EngineError::State(StateKind::BudgetExceeded);
assert_eq!(err.class(), ErrorClass::Cooperative);
}
#[test]
fn class_duplicate_signal_is_informational() {
let err = EngineError::State(StateKind::DuplicateSignal);
assert_eq!(err.class(), ErrorClass::Informational);
}
#[test]
fn class_bug_variant() {
let err = EngineError::Bug(BugKind::AttemptNotInCreatedState);
assert_eq!(err.class(), ErrorClass::Bug);
}
#[test]
fn class_transport_defaults_terminal() {
let raw = std::io::Error::other("simulated transport error");
let err = EngineError::Transport {
backend: "test",
source: Box::new(raw),
};
assert_eq!(err.class(), ErrorClass::Terminal);
}
#[test]
fn unavailable_is_terminal() {
assert_eq!(
EngineError::Unavailable { op: "foo" }.class(),
ErrorClass::Terminal
);
}
#[test]
fn backend_context_wraps_transport_and_preserves_typed() {
let raw = std::io::Error::other("simulated transport error");
let wrapped = backend_context(
EngineError::Transport {
backend: "valkey",
source: Box::new(raw),
},
"renew: FCALL ff_renew_lease",
);
let rendered = format!("{wrapped}");
assert!(
rendered.starts_with("renew: FCALL ff_renew_lease: transport (valkey): "),
"expected context prefix, got: {rendered}"
);
let wrapped = backend_context(EngineError::Unavailable { op: "x" }, "ctx");
assert!(matches!(wrapped, EngineError::Contextual { .. }));
let inner = EngineError::Validation {
kind: ValidationKind::Corruption,
detail: "bad".into(),
};
let passthrough = backend_context(inner, "describe_edge: HGETALL edge");
match passthrough {
EngineError::Validation { kind, .. } => {
assert_eq!(kind, ValidationKind::Corruption);
}
other => panic!("expected Validation, got {other:?}"),
}
let inner = EngineError::Contention(ContentionKind::LeaseConflict);
assert_eq!(
backend_context(inner, "renew: FCALL ff_renew_lease").class(),
ErrorClass::Retryable
);
}
#[test]
fn backend_error_kind_round_trip() {
let be = BackendError::Valkey {
kind: BackendErrorKind::Transport,
message: "connection reset".into(),
};
assert_eq!(be.kind(), BackendErrorKind::Transport);
assert_eq!(be.message(), "connection reset");
}
#[test]
fn backend_kind_stable_strings_fixed() {
assert_eq!(BackendErrorKind::Transport.as_stable_str(), "transport");
assert_eq!(BackendErrorKind::Protocol.as_stable_str(), "protocol");
assert_eq!(BackendErrorKind::Timeout.as_stable_str(), "timeout");
assert_eq!(BackendErrorKind::Auth.as_stable_str(), "auth");
assert_eq!(BackendErrorKind::Cluster.as_stable_str(), "cluster");
assert_eq!(
BackendErrorKind::BusyLoading.as_stable_str(),
"busy_loading"
);
assert_eq!(
BackendErrorKind::ScriptNotLoaded.as_stable_str(),
"script_not_loaded"
);
assert_eq!(BackendErrorKind::Other.as_stable_str(), "other");
}
#[test]
fn backend_kind_retryability() {
for k in [
BackendErrorKind::Transport,
BackendErrorKind::Timeout,
BackendErrorKind::Cluster,
BackendErrorKind::BusyLoading,
] {
assert!(k.is_retryable(), "{k:?} should be retryable");
}
for k in [
BackendErrorKind::Protocol,
BackendErrorKind::Auth,
BackendErrorKind::ScriptNotLoaded,
BackendErrorKind::Other,
] {
assert!(!k.is_retryable(), "{k:?} should NOT be retryable");
}
}
}