1use crate::error::ScriptError;
20use ff_core::engine_error::{
21 BugKind, ConflictKind, ContentionKind, EngineError, StateKind, ValidationKind,
22};
23use ff_core::error::ErrorClass;
24
25pub fn transport_script(err: ScriptError) -> EngineError {
29 EngineError::Transport {
30 backend: "valkey",
31 source: Box::new(err),
32 }
33}
34
35pub fn transport_script_ref(err: &EngineError) -> Option<&ScriptError> {
40 match err {
41 EngineError::Transport { source, .. } => source.downcast_ref::<ScriptError>(),
42 EngineError::Contextual { source, .. } => transport_script_ref(source),
45 _ => None,
46 }
47}
48
49#[cfg(feature = "valkey-client")]
58pub fn valkey_kind(err: &EngineError) -> Option<ferriskey::ErrorKind> {
59 match err {
60 EngineError::Transport { source, .. } => source
61 .downcast_ref::<ScriptError>()
62 .and_then(|s| s.valkey_kind()),
63 EngineError::Contextual { source, .. } => valkey_kind(source),
65 _ => None,
66 }
67}
68
69pub fn class(err: &EngineError) -> ErrorClass {
77 match err {
78 EngineError::Transport { source, .. } => source
79 .downcast_ref::<ScriptError>()
80 .map(|s| s.class())
81 .unwrap_or(ErrorClass::Terminal),
82 EngineError::Contextual { source, .. } => class(source),
85 other => other.class(),
86 }
87}
88
89impl From<ScriptError> for EngineError {
90 fn from(err: ScriptError) -> Self {
91 use ScriptError as S;
92 match err {
93 S::ExecutionNotFound => Self::NotFound {
95 entity: "execution",
96 },
97 S::FlowNotFound => Self::NotFound { entity: "flow" },
98 S::AttemptNotFound => Self::NotFound { entity: "attempt" },
99 S::BudgetNotFound => Self::NotFound { entity: "budget" },
100 S::QuotaPolicyNotFound => Self::NotFound {
101 entity: "quota_policy",
102 },
103 S::StreamNotFound => Self::NotFound { entity: "stream" },
104
105 S::InvalidInput(d) => Self::Validation {
107 kind: ValidationKind::InvalidInput,
108 detail: d,
109 },
110 S::CapabilityMismatch(d) => Self::Validation {
111 kind: ValidationKind::CapabilityMismatch,
112 detail: d,
113 },
114 S::InvalidCapabilities(d) => Self::Validation {
115 kind: ValidationKind::InvalidCapabilities,
116 detail: d,
117 },
118 S::InvalidPolicyJson(d) => Self::Validation {
119 kind: ValidationKind::InvalidPolicyJson,
120 detail: d,
121 },
122 S::InvalidTagKey(d) => Self::Validation {
123 kind: ValidationKind::InvalidTagKey,
124 detail: d,
125 },
126 S::PayloadTooLarge => Self::Validation {
128 kind: ValidationKind::PayloadTooLarge,
129 detail: String::new(),
130 },
131 S::SignalLimitExceeded => Self::Validation {
132 kind: ValidationKind::SignalLimitExceeded,
133 detail: String::new(),
134 },
135 S::InvalidWaitpointKey => Self::Validation {
136 kind: ValidationKind::InvalidWaitpointKey,
137 detail: String::new(),
138 },
139 S::WaitpointNotTokenBound => Self::Validation {
140 kind: ValidationKind::WaitpointNotTokenBound,
141 detail: String::new(),
142 },
143 S::RetentionLimitExceeded => Self::Validation {
144 kind: ValidationKind::RetentionLimitExceeded,
145 detail: String::new(),
146 },
147 S::InvalidLeaseForSuspend => Self::Validation {
148 kind: ValidationKind::InvalidLeaseForSuspend,
149 detail: String::new(),
150 },
151 S::InvalidDependency => Self::Validation {
152 kind: ValidationKind::InvalidDependency,
153 detail: String::new(),
154 },
155 S::InvalidWaitpointForExecution => Self::Validation {
156 kind: ValidationKind::InvalidWaitpointForExecution,
157 detail: String::new(),
158 },
159 S::InvalidBlockingReason => Self::Validation {
160 kind: ValidationKind::InvalidBlockingReason,
161 detail: String::new(),
162 },
163 S::InvalidOffset => Self::Validation {
164 kind: ValidationKind::InvalidOffset,
165 detail: String::new(),
166 },
167 S::Unauthorized => Self::Validation {
168 kind: ValidationKind::Unauthorized,
169 detail: String::new(),
170 },
171 S::InvalidBudgetScope => Self::Validation {
172 kind: ValidationKind::InvalidBudgetScope,
173 detail: String::new(),
174 },
175 S::BudgetOverrideNotAllowed => Self::Validation {
176 kind: ValidationKind::BudgetOverrideNotAllowed,
177 detail: String::new(),
178 },
179 S::InvalidQuotaSpec => Self::Validation {
180 kind: ValidationKind::InvalidQuotaSpec,
181 detail: String::new(),
182 },
183 S::InvalidKid => Self::Validation {
184 kind: ValidationKind::InvalidKid,
185 detail: String::new(),
186 },
187 S::InvalidSecretHex => Self::Validation {
188 kind: ValidationKind::InvalidSecretHex,
189 detail: String::new(),
190 },
191 S::InvalidGraceMs => Self::Validation {
192 kind: ValidationKind::InvalidGraceMs,
193 detail: String::new(),
194 },
195 S::InvalidFrameType => Self::Validation {
196 kind: ValidationKind::InvalidFrameType,
197 detail: String::new(),
198 },
199
200 S::UseClaimResumedExecution => {
202 Self::Contention(ContentionKind::UseClaimResumedExecution)
203 }
204 S::NotAResumedExecution => Self::Contention(ContentionKind::NotAResumedExecution),
205 S::ExecutionNotLeaseable => Self::Contention(ContentionKind::ExecutionNotLeaseable),
206 S::LeaseConflict => Self::Contention(ContentionKind::LeaseConflict),
207 S::InvalidClaimGrant => Self::Contention(ContentionKind::InvalidClaimGrant),
208 S::ClaimGrantExpired => Self::Contention(ContentionKind::ClaimGrantExpired),
209 S::NoEligibleExecution => Self::Contention(ContentionKind::NoEligibleExecution),
210 S::WaitpointNotFound => Self::Contention(ContentionKind::WaitpointNotFound),
211 S::WaitpointPendingUseBufferScript => {
212 Self::Contention(ContentionKind::WaitpointPendingUseBufferScript)
213 }
214 S::StaleGraphRevision => Self::Contention(ContentionKind::StaleGraphRevision),
215 S::ExecutionNotActive {
216 terminal_outcome,
217 lease_epoch,
218 lifecycle_phase,
219 attempt_id,
220 } => Self::Contention(ContentionKind::ExecutionNotActive {
221 terminal_outcome,
222 lease_epoch,
223 lifecycle_phase,
224 attempt_id,
225 }),
226 S::ExecutionNotEligible => Self::Contention(ContentionKind::ExecutionNotEligible),
227 S::ExecutionNotInEligibleSet => {
228 Self::Contention(ContentionKind::ExecutionNotInEligibleSet)
229 }
230 S::ExecutionNotReclaimable => {
231 Self::Contention(ContentionKind::ExecutionNotReclaimable)
232 }
233 S::NoActiveLease => Self::Contention(ContentionKind::NoActiveLease),
234 S::RateLimitExceeded => Self::Contention(ContentionKind::RateLimitExceeded),
235 S::ConcurrencyLimitExceeded => {
236 Self::Contention(ContentionKind::ConcurrencyLimitExceeded)
237 }
238
239 S::DependencyAlreadyExists => transport_script(S::DependencyAlreadyExists),
247 S::CycleDetected => Self::Conflict(ConflictKind::CycleDetected),
248 S::SelfReferencingEdge => Self::Conflict(ConflictKind::SelfReferencingEdge),
249 S::ExecutionAlreadyInFlow => Self::Conflict(ConflictKind::ExecutionAlreadyInFlow),
250 S::WaitpointAlreadyExists => Self::Conflict(ConflictKind::WaitpointAlreadyExists),
251 S::BudgetAttachConflict => Self::Conflict(ConflictKind::BudgetAttachConflict),
252 S::QuotaAttachConflict => Self::Conflict(ConflictKind::QuotaAttachConflict),
253 S::RotationConflict(kid) => Self::Conflict(ConflictKind::RotationConflict(kid)),
254 S::ActiveAttemptExists => Self::Conflict(ConflictKind::ActiveAttemptExists),
255
256 S::StaleLease => Self::State(StateKind::StaleLease),
258 S::LeaseExpired => Self::State(StateKind::LeaseExpired),
259 S::LeaseRevoked => Self::State(StateKind::LeaseRevoked),
260 S::ExecutionNotSuspended => Self::State(StateKind::ExecutionNotSuspended),
261 S::AlreadySuspended => Self::State(StateKind::AlreadySuspended),
262 S::WaitpointClosed => Self::State(StateKind::WaitpointClosed),
263 S::TargetNotSignalable => Self::State(StateKind::TargetNotSignalable),
264 S::DuplicateSignal => Self::State(StateKind::DuplicateSignal),
265 S::ResumeConditionNotMet => Self::State(StateKind::ResumeConditionNotMet),
266 S::WaitpointNotPending => Self::State(StateKind::WaitpointNotPending),
267 S::PendingWaitpointExpired => Self::State(StateKind::PendingWaitpointExpired),
268 S::WaitpointNotOpen => Self::State(StateKind::WaitpointNotOpen),
269 S::ExecutionNotTerminal => Self::State(StateKind::ExecutionNotTerminal),
270 S::MaxReplaysExhausted => Self::State(StateKind::MaxReplaysExhausted),
271 S::StreamClosed => Self::State(StateKind::StreamClosed),
272 S::StaleOwnerCannotAppend => Self::State(StateKind::StaleOwnerCannotAppend),
273 S::GrantAlreadyExists => Self::State(StateKind::GrantAlreadyExists),
274 S::ExecutionNotInFlow => Self::State(StateKind::ExecutionNotInFlow),
275 S::FlowAlreadyTerminal => Self::State(StateKind::FlowAlreadyTerminal),
276 S::DepsNotSatisfied => Self::State(StateKind::DepsNotSatisfied),
277 S::NotBlockedByDeps => Self::State(StateKind::NotBlockedByDeps),
278 S::NotRunnable => Self::State(StateKind::NotRunnable),
279 S::Terminal => Self::State(StateKind::Terminal),
280 S::BudgetExceeded => Self::State(StateKind::BudgetExceeded),
281 S::BudgetSoftExceeded => Self::State(StateKind::BudgetSoftExceeded),
282 S::OkAlreadyApplied => Self::State(StateKind::OkAlreadyApplied),
283 S::AttemptNotStarted => Self::State(StateKind::AttemptNotStarted),
284 S::AttemptAlreadyTerminal => Self::State(StateKind::AttemptAlreadyTerminal),
285 S::ExecutionNotEligibleForAttempt => {
286 Self::State(StateKind::ExecutionNotEligibleForAttempt)
287 }
288 S::ReplayNotAllowed => Self::State(StateKind::ReplayNotAllowed),
289 S::MaxRetriesExhausted => Self::State(StateKind::MaxRetriesExhausted),
290 S::StreamAlreadyClosed => Self::State(StateKind::StreamAlreadyClosed),
291
292 S::AttemptNotInCreatedState => Self::Bug(BugKind::AttemptNotInCreatedState),
294
295 #[cfg(feature = "valkey-client")]
297 e @ (S::Parse { .. } | S::Valkey(_)) => transport_script(e),
298 #[cfg(not(feature = "valkey-client"))]
299 e @ S::Parse { .. } => transport_script(e),
300
301 other => transport_script(other),
309 }
310 }
311}
312
313#[cfg(test)]
314mod tests {
315 use super::*;
316
317 #[test]
318 fn not_found_mappings() {
319 assert!(matches!(
320 EngineError::from(ScriptError::ExecutionNotFound),
321 EngineError::NotFound { entity: "execution" }
322 ));
323 assert!(matches!(
324 EngineError::from(ScriptError::FlowNotFound),
325 EngineError::NotFound { entity: "flow" }
326 ));
327 }
328
329 #[test]
330 fn validation_detail_preserved() {
331 match EngineError::from(ScriptError::CapabilityMismatch("gpu,cuda".into())) {
332 EngineError::Validation {
333 kind: ValidationKind::CapabilityMismatch,
334 detail,
335 } => assert_eq!(detail, "gpu,cuda"),
336 other => panic!("{other:?}"),
337 }
338 }
339
340 #[test]
341 fn contention_bucket() {
342 assert!(matches!(
343 EngineError::from(ScriptError::LeaseConflict),
344 EngineError::Contention(ContentionKind::LeaseConflict)
345 ));
346 assert!(matches!(
347 EngineError::from(ScriptError::UseClaimResumedExecution),
348 EngineError::Contention(ContentionKind::UseClaimResumedExecution)
349 ));
350 }
351
352 #[test]
353 fn execution_not_active_detail_flows_through() {
354 let src = ScriptError::ExecutionNotActive {
355 terminal_outcome: "success".into(),
356 lease_epoch: "3".into(),
357 lifecycle_phase: "terminal".into(),
358 attempt_id: "att-1".into(),
359 };
360 match EngineError::from(src) {
361 EngineError::Contention(ContentionKind::ExecutionNotActive {
362 terminal_outcome,
363 lease_epoch,
364 lifecycle_phase,
365 attempt_id,
366 }) => {
367 assert_eq!(terminal_outcome, "success");
368 assert_eq!(lease_epoch, "3");
369 assert_eq!(lifecycle_phase, "terminal");
370 assert_eq!(attempt_id, "att-1");
371 }
372 other => panic!("{other:?}"),
373 }
374 }
375
376 #[test]
377 fn dependency_already_exists_falls_through_to_transport_without_enrich() {
378 let err = EngineError::from(ScriptError::DependencyAlreadyExists);
379 match &err {
380 EngineError::Transport { backend, source } => {
381 assert_eq!(*backend, "valkey");
382 assert!(matches!(
383 source.downcast_ref::<ScriptError>(),
384 Some(ScriptError::DependencyAlreadyExists)
385 ));
386 }
387 other => panic!("{other:?}"),
388 }
389 }
390
391 #[test]
392 fn conflict_variants() {
393 assert!(matches!(
394 EngineError::from(ScriptError::CycleDetected),
395 EngineError::Conflict(ConflictKind::CycleDetected)
396 ));
397 assert!(matches!(
398 EngineError::from(ScriptError::ExecutionAlreadyInFlow),
399 EngineError::Conflict(ConflictKind::ExecutionAlreadyInFlow)
400 ));
401 match EngineError::from(ScriptError::RotationConflict("kid-1".into())) {
402 EngineError::Conflict(ConflictKind::RotationConflict(k)) => assert_eq!(k, "kid-1"),
403 other => panic!("{other:?}"),
404 }
405 }
406
407 #[test]
408 fn state_variants() {
409 assert!(matches!(
410 EngineError::from(ScriptError::StaleLease),
411 EngineError::State(StateKind::StaleLease)
412 ));
413 assert!(matches!(
414 EngineError::from(ScriptError::BudgetExceeded),
415 EngineError::State(StateKind::BudgetExceeded)
416 ));
417 }
418
419 #[test]
420 fn bug_variants() {
421 assert!(matches!(
422 EngineError::from(ScriptError::AttemptNotInCreatedState),
423 EngineError::Bug(BugKind::AttemptNotInCreatedState)
424 ));
425 }
426
427 #[test]
428 fn transport_preserves_parse() {
429 let err = EngineError::from(ScriptError::Parse {
430 fcall: "test_fn".into(),
431 execution_id: None,
432 message: "bad envelope".into(),
433 });
434 match &err {
435 EngineError::Transport { backend, source } => {
436 assert_eq!(*backend, "valkey");
437 assert!(matches!(
438 source.downcast_ref::<ScriptError>(),
439 Some(ScriptError::Parse { .. })
440 ));
441 }
442 other => panic!("{other:?}"),
443 }
444 }
445
446 #[test]
447 fn transport_script_helper_round_trips() {
448 let err = transport_script(ScriptError::AttemptNotFound);
449 assert!(matches!(
450 transport_script_ref(&err),
451 Some(ScriptError::AttemptNotFound)
452 ));
453 assert_eq!(class(&err), ScriptError::AttemptNotFound.class());
454 }
455
456 #[cfg(feature = "valkey-client")]
457 #[test]
458 fn transport_preserves_valkey_kind() {
459 let src = ScriptError::Valkey(ferriskey::Error::from((
460 ferriskey::ErrorKind::IoError,
461 "boom",
462 )));
463 let err = EngineError::from(src);
464 assert_eq!(valkey_kind(&err), Some(ferriskey::ErrorKind::IoError));
465 }
466
467 #[cfg(feature = "valkey-client")]
468 #[test]
469 fn class_transport_delegates() {
470 let err = EngineError::from(ScriptError::Valkey(ferriskey::Error::from((
471 ferriskey::ErrorKind::IoError,
472 "x",
473 ))));
474 assert_eq!(class(&err), ErrorClass::Retryable);
475 }
476
477 #[test]
478 fn class_transport_with_non_script_source_terminal() {
479 let raw = std::io::Error::other("simulated postgres net error");
480 let err = EngineError::Transport {
481 backend: "postgres",
482 source: Box::new(raw),
483 };
484 assert_eq!(class(&err), ErrorClass::Terminal);
485 #[cfg(feature = "valkey-client")]
486 assert!(valkey_kind(&err).is_none());
487 assert!(transport_script_ref(&err).is_none());
488 }
489
490 #[test]
491 fn unavailable_has_no_script_source() {
492 let err = EngineError::Unavailable { op: "claim" };
493 assert_eq!(class(&err), ErrorClass::Terminal);
494 #[cfg(feature = "valkey-client")]
495 assert!(valkey_kind(&err).is_none());
496 assert!(transport_script_ref(&err).is_none());
497 }
498}