1use crate::error::ScriptError;
20use ff_core::engine_error::{
21 BugKind, ConflictKind, ContentionKind, EngineError, StateKind, ValidationKind,
22};
23use ff_core::error::ErrorClass;
24
25pub fn transport_script(err: ScriptError) -> EngineError {
29 EngineError::Transport {
30 backend: "valkey",
31 source: Box::new(err),
32 }
33}
34
35pub fn transport_script_ref(err: &EngineError) -> Option<&ScriptError> {
40 match err {
41 EngineError::Transport { source, .. } => source.downcast_ref::<ScriptError>(),
42 EngineError::Contextual { source, .. } => transport_script_ref(source),
45 _ => None,
46 }
47}
48
49#[cfg(feature = "valkey-client")]
58pub fn valkey_kind(err: &EngineError) -> Option<ferriskey::ErrorKind> {
59 match err {
60 EngineError::Transport { source, .. } => source
61 .downcast_ref::<ScriptError>()
62 .and_then(|s| s.valkey_kind()),
63 EngineError::Contextual { source, .. } => valkey_kind(source),
65 _ => None,
66 }
67}
68
69pub fn class(err: &EngineError) -> ErrorClass {
77 match err {
78 EngineError::Transport { source, .. } => source
79 .downcast_ref::<ScriptError>()
80 .map(|s| s.class())
81 .unwrap_or(ErrorClass::Terminal),
82 EngineError::Contextual { source, .. } => class(source),
85 other => other.class(),
86 }
87}
88
89impl From<ScriptError> for EngineError {
90 fn from(err: ScriptError) -> Self {
91 use ScriptError as S;
92 match err {
93 S::ExecutionNotFound => Self::NotFound {
95 entity: "execution",
96 },
97 S::FlowNotFound => Self::NotFound { entity: "flow" },
98 S::AttemptNotFound => Self::NotFound { entity: "attempt" },
99 S::BudgetNotFound => Self::NotFound { entity: "budget" },
100 S::QuotaPolicyNotFound => Self::NotFound {
101 entity: "quota_policy",
102 },
103 S::StreamNotFound => Self::NotFound { entity: "stream" },
104
105 S::InvalidInput(d) => Self::Validation {
107 kind: ValidationKind::InvalidInput,
108 detail: d,
109 },
110 S::CapabilityMismatch(d) => Self::Validation {
111 kind: ValidationKind::CapabilityMismatch,
112 detail: d,
113 },
114 S::InvalidCapabilities(d) => Self::Validation {
115 kind: ValidationKind::InvalidCapabilities,
116 detail: d,
117 },
118 S::InvalidPolicyJson(d) => Self::Validation {
119 kind: ValidationKind::InvalidPolicyJson,
120 detail: d,
121 },
122 S::InvalidTagKey(d) => Self::Validation {
123 kind: ValidationKind::InvalidTagKey,
124 detail: d,
125 },
126 S::PayloadTooLarge => Self::Validation {
128 kind: ValidationKind::PayloadTooLarge,
129 detail: String::new(),
130 },
131 S::SignalLimitExceeded => Self::Validation {
132 kind: ValidationKind::SignalLimitExceeded,
133 detail: String::new(),
134 },
135 S::InvalidWaitpointKey => Self::Validation {
136 kind: ValidationKind::InvalidWaitpointKey,
137 detail: String::new(),
138 },
139 S::InvalidToken => Self::Validation {
140 kind: ValidationKind::InvalidToken,
141 detail: String::new(),
142 },
143 S::WaitpointNotTokenBound => Self::Validation {
144 kind: ValidationKind::WaitpointNotTokenBound,
145 detail: String::new(),
146 },
147 S::RetentionLimitExceeded => Self::Validation {
148 kind: ValidationKind::RetentionLimitExceeded,
149 detail: String::new(),
150 },
151 S::InvalidLeaseForSuspend => Self::Validation {
152 kind: ValidationKind::InvalidLeaseForSuspend,
153 detail: String::new(),
154 },
155 S::InvalidDependency => Self::Validation {
156 kind: ValidationKind::InvalidDependency,
157 detail: String::new(),
158 },
159 S::InvalidWaitpointForExecution => Self::Validation {
160 kind: ValidationKind::InvalidWaitpointForExecution,
161 detail: String::new(),
162 },
163 S::InvalidBlockingReason => Self::Validation {
164 kind: ValidationKind::InvalidBlockingReason,
165 detail: String::new(),
166 },
167 S::InvalidOffset => Self::Validation {
168 kind: ValidationKind::InvalidOffset,
169 detail: String::new(),
170 },
171 S::Unauthorized => Self::Validation {
172 kind: ValidationKind::Unauthorized,
173 detail: String::new(),
174 },
175 S::InvalidBudgetScope => Self::Validation {
176 kind: ValidationKind::InvalidBudgetScope,
177 detail: String::new(),
178 },
179 S::BudgetOverrideNotAllowed => Self::Validation {
180 kind: ValidationKind::BudgetOverrideNotAllowed,
181 detail: String::new(),
182 },
183 S::InvalidQuotaSpec => Self::Validation {
184 kind: ValidationKind::InvalidQuotaSpec,
185 detail: String::new(),
186 },
187 S::InvalidKid => Self::Validation {
188 kind: ValidationKind::InvalidKid,
189 detail: String::new(),
190 },
191 S::InvalidSecretHex => Self::Validation {
192 kind: ValidationKind::InvalidSecretHex,
193 detail: String::new(),
194 },
195 S::InvalidGraceMs => Self::Validation {
196 kind: ValidationKind::InvalidGraceMs,
197 detail: String::new(),
198 },
199 S::InvalidFrameType => Self::Validation {
200 kind: ValidationKind::InvalidFrameType,
201 detail: String::new(),
202 },
203
204 S::UseClaimResumedExecution => {
206 Self::Contention(ContentionKind::UseClaimResumedExecution)
207 }
208 S::NotAResumedExecution => Self::Contention(ContentionKind::NotAResumedExecution),
209 S::ExecutionNotLeaseable => Self::Contention(ContentionKind::ExecutionNotLeaseable),
210 S::LeaseConflict => Self::Contention(ContentionKind::LeaseConflict),
211 S::InvalidClaimGrant => Self::Contention(ContentionKind::InvalidClaimGrant),
212 S::ClaimGrantExpired => Self::Contention(ContentionKind::ClaimGrantExpired),
213 S::NoEligibleExecution => Self::Contention(ContentionKind::NoEligibleExecution),
214 S::WaitpointNotFound => Self::Contention(ContentionKind::WaitpointNotFound),
215 S::WaitpointPendingUseBufferScript => {
216 Self::Contention(ContentionKind::WaitpointPendingUseBufferScript)
217 }
218 S::StaleGraphRevision => Self::Contention(ContentionKind::StaleGraphRevision),
219 S::ExecutionNotActive {
220 terminal_outcome,
221 lease_epoch,
222 lifecycle_phase,
223 attempt_id,
224 } => Self::Contention(ContentionKind::ExecutionNotActive {
225 terminal_outcome,
226 lease_epoch,
227 lifecycle_phase,
228 attempt_id,
229 }),
230 S::ExecutionNotEligible => Self::Contention(ContentionKind::ExecutionNotEligible),
231 S::ExecutionNotInEligibleSet => {
232 Self::Contention(ContentionKind::ExecutionNotInEligibleSet)
233 }
234 S::ExecutionNotReclaimable => {
235 Self::Contention(ContentionKind::ExecutionNotReclaimable)
236 }
237 S::NoActiveLease => Self::Contention(ContentionKind::NoActiveLease),
238 S::RateLimitExceeded => Self::Contention(ContentionKind::RateLimitExceeded),
239 S::ConcurrencyLimitExceeded => {
240 Self::Contention(ContentionKind::ConcurrencyLimitExceeded)
241 }
242
243 S::DependencyAlreadyExists => transport_script(S::DependencyAlreadyExists),
251 S::CycleDetected => Self::Conflict(ConflictKind::CycleDetected),
252 S::SelfReferencingEdge => Self::Conflict(ConflictKind::SelfReferencingEdge),
253 S::ExecutionAlreadyInFlow => Self::Conflict(ConflictKind::ExecutionAlreadyInFlow),
254 S::WaitpointAlreadyExists => Self::Conflict(ConflictKind::WaitpointAlreadyExists),
255 S::BudgetAttachConflict => Self::Conflict(ConflictKind::BudgetAttachConflict),
256 S::QuotaAttachConflict => Self::Conflict(ConflictKind::QuotaAttachConflict),
257 S::RotationConflict(kid) => Self::Conflict(ConflictKind::RotationConflict(kid)),
258 S::ActiveAttemptExists => Self::Conflict(ConflictKind::ActiveAttemptExists),
259
260 S::StaleLease => Self::State(StateKind::StaleLease),
262 S::LeaseExpired => Self::State(StateKind::LeaseExpired),
263 S::LeaseRevoked => Self::State(StateKind::LeaseRevoked),
264 S::ExecutionNotSuspended => Self::State(StateKind::ExecutionNotSuspended),
265 S::AlreadySuspended => Self::State(StateKind::AlreadySuspended),
266 S::WaitpointClosed => Self::State(StateKind::WaitpointClosed),
267 S::TargetNotSignalable => Self::State(StateKind::TargetNotSignalable),
268 S::DuplicateSignal => Self::State(StateKind::DuplicateSignal),
269 S::ResumeConditionNotMet => Self::State(StateKind::ResumeConditionNotMet),
270 S::WaitpointNotPending => Self::State(StateKind::WaitpointNotPending),
271 S::PendingWaitpointExpired => Self::State(StateKind::PendingWaitpointExpired),
272 S::WaitpointNotOpen => Self::State(StateKind::WaitpointNotOpen),
273 S::ExecutionNotTerminal => Self::State(StateKind::ExecutionNotTerminal),
274 S::MaxReplaysExhausted => Self::State(StateKind::MaxReplaysExhausted),
275 S::StreamClosed => Self::State(StateKind::StreamClosed),
276 S::StaleOwnerCannotAppend => Self::State(StateKind::StaleOwnerCannotAppend),
277 S::GrantAlreadyExists => Self::State(StateKind::GrantAlreadyExists),
278 S::ExecutionNotInFlow => Self::State(StateKind::ExecutionNotInFlow),
279 S::FlowAlreadyTerminal => Self::State(StateKind::FlowAlreadyTerminal),
280 S::DepsNotSatisfied => Self::State(StateKind::DepsNotSatisfied),
281 S::NotBlockedByDeps => Self::State(StateKind::NotBlockedByDeps),
282 S::NotRunnable => Self::State(StateKind::NotRunnable),
283 S::Terminal => Self::State(StateKind::Terminal),
284 S::BudgetExceeded => Self::State(StateKind::BudgetExceeded),
285 S::BudgetSoftExceeded => Self::State(StateKind::BudgetSoftExceeded),
286 S::OkAlreadyApplied => Self::State(StateKind::OkAlreadyApplied),
287 S::AttemptNotStarted => Self::State(StateKind::AttemptNotStarted),
288 S::AttemptAlreadyTerminal => Self::State(StateKind::AttemptAlreadyTerminal),
289 S::ExecutionNotEligibleForAttempt => {
290 Self::State(StateKind::ExecutionNotEligibleForAttempt)
291 }
292 S::ReplayNotAllowed => Self::State(StateKind::ReplayNotAllowed),
293 S::MaxRetriesExhausted => Self::State(StateKind::MaxRetriesExhausted),
294 S::StreamAlreadyClosed => Self::State(StateKind::StreamAlreadyClosed),
295
296 S::AttemptNotInCreatedState => Self::Bug(BugKind::AttemptNotInCreatedState),
298
299 #[cfg(feature = "valkey-client")]
301 e @ (S::Parse { .. } | S::Valkey(_)) => transport_script(e),
302 #[cfg(not(feature = "valkey-client"))]
303 e @ S::Parse { .. } => transport_script(e),
304
305 other => transport_script(other),
313 }
314 }
315}
316
317#[cfg(test)]
318mod tests {
319 use super::*;
320
321 #[test]
322 fn not_found_mappings() {
323 assert!(matches!(
324 EngineError::from(ScriptError::ExecutionNotFound),
325 EngineError::NotFound { entity: "execution" }
326 ));
327 assert!(matches!(
328 EngineError::from(ScriptError::FlowNotFound),
329 EngineError::NotFound { entity: "flow" }
330 ));
331 }
332
333 #[test]
334 fn validation_detail_preserved() {
335 match EngineError::from(ScriptError::CapabilityMismatch("gpu,cuda".into())) {
336 EngineError::Validation {
337 kind: ValidationKind::CapabilityMismatch,
338 detail,
339 } => assert_eq!(detail, "gpu,cuda"),
340 other => panic!("{other:?}"),
341 }
342 }
343
344 #[test]
345 fn contention_bucket() {
346 assert!(matches!(
347 EngineError::from(ScriptError::LeaseConflict),
348 EngineError::Contention(ContentionKind::LeaseConflict)
349 ));
350 assert!(matches!(
351 EngineError::from(ScriptError::UseClaimResumedExecution),
352 EngineError::Contention(ContentionKind::UseClaimResumedExecution)
353 ));
354 }
355
356 #[test]
357 fn execution_not_active_detail_flows_through() {
358 let src = ScriptError::ExecutionNotActive {
359 terminal_outcome: "success".into(),
360 lease_epoch: "3".into(),
361 lifecycle_phase: "terminal".into(),
362 attempt_id: "att-1".into(),
363 };
364 match EngineError::from(src) {
365 EngineError::Contention(ContentionKind::ExecutionNotActive {
366 terminal_outcome,
367 lease_epoch,
368 lifecycle_phase,
369 attempt_id,
370 }) => {
371 assert_eq!(terminal_outcome, "success");
372 assert_eq!(lease_epoch, "3");
373 assert_eq!(lifecycle_phase, "terminal");
374 assert_eq!(attempt_id, "att-1");
375 }
376 other => panic!("{other:?}"),
377 }
378 }
379
380 #[test]
381 fn dependency_already_exists_falls_through_to_transport_without_enrich() {
382 let err = EngineError::from(ScriptError::DependencyAlreadyExists);
383 match &err {
384 EngineError::Transport { backend, source } => {
385 assert_eq!(*backend, "valkey");
386 assert!(matches!(
387 source.downcast_ref::<ScriptError>(),
388 Some(ScriptError::DependencyAlreadyExists)
389 ));
390 }
391 other => panic!("{other:?}"),
392 }
393 }
394
395 #[test]
396 fn conflict_variants() {
397 assert!(matches!(
398 EngineError::from(ScriptError::CycleDetected),
399 EngineError::Conflict(ConflictKind::CycleDetected)
400 ));
401 assert!(matches!(
402 EngineError::from(ScriptError::ExecutionAlreadyInFlow),
403 EngineError::Conflict(ConflictKind::ExecutionAlreadyInFlow)
404 ));
405 match EngineError::from(ScriptError::RotationConflict("kid-1".into())) {
406 EngineError::Conflict(ConflictKind::RotationConflict(k)) => assert_eq!(k, "kid-1"),
407 other => panic!("{other:?}"),
408 }
409 }
410
411 #[test]
412 fn state_variants() {
413 assert!(matches!(
414 EngineError::from(ScriptError::StaleLease),
415 EngineError::State(StateKind::StaleLease)
416 ));
417 assert!(matches!(
418 EngineError::from(ScriptError::BudgetExceeded),
419 EngineError::State(StateKind::BudgetExceeded)
420 ));
421 }
422
423 #[test]
424 fn bug_variants() {
425 assert!(matches!(
426 EngineError::from(ScriptError::AttemptNotInCreatedState),
427 EngineError::Bug(BugKind::AttemptNotInCreatedState)
428 ));
429 }
430
431 #[test]
432 fn transport_preserves_parse() {
433 let err = EngineError::from(ScriptError::Parse {
434 fcall: "test_fn".into(),
435 execution_id: None,
436 message: "bad envelope".into(),
437 });
438 match &err {
439 EngineError::Transport { backend, source } => {
440 assert_eq!(*backend, "valkey");
441 assert!(matches!(
442 source.downcast_ref::<ScriptError>(),
443 Some(ScriptError::Parse { .. })
444 ));
445 }
446 other => panic!("{other:?}"),
447 }
448 }
449
450 #[test]
451 fn transport_script_helper_round_trips() {
452 let err = transport_script(ScriptError::AttemptNotFound);
453 assert!(matches!(
454 transport_script_ref(&err),
455 Some(ScriptError::AttemptNotFound)
456 ));
457 assert_eq!(class(&err), ScriptError::AttemptNotFound.class());
458 }
459
460 #[cfg(feature = "valkey-client")]
461 #[test]
462 fn transport_preserves_valkey_kind() {
463 let src = ScriptError::Valkey(ferriskey::Error::from((
464 ferriskey::ErrorKind::IoError,
465 "boom",
466 )));
467 let err = EngineError::from(src);
468 assert_eq!(valkey_kind(&err), Some(ferriskey::ErrorKind::IoError));
469 }
470
471 #[cfg(feature = "valkey-client")]
472 #[test]
473 fn class_transport_delegates() {
474 let err = EngineError::from(ScriptError::Valkey(ferriskey::Error::from((
475 ferriskey::ErrorKind::IoError,
476 "x",
477 ))));
478 assert_eq!(class(&err), ErrorClass::Retryable);
479 }
480
481 #[test]
482 fn class_transport_with_non_script_source_terminal() {
483 let raw = std::io::Error::other("simulated postgres net error");
484 let err = EngineError::Transport {
485 backend: "postgres",
486 source: Box::new(raw),
487 };
488 assert_eq!(class(&err), ErrorClass::Terminal);
489 #[cfg(feature = "valkey-client")]
490 assert!(valkey_kind(&err).is_none());
491 assert!(transport_script_ref(&err).is_none());
492 }
493
494 #[test]
495 fn unavailable_has_no_script_source() {
496 let err = EngineError::Unavailable { op: "claim" };
497 assert_eq!(class(&err), ErrorClass::Terminal);
498 #[cfg(feature = "valkey-client")]
499 assert!(valkey_kind(&err).is_none());
500 assert!(transport_script_ref(&err).is_none());
501 }
502}