1use std::error::Error;
33use std::fmt;
34
35use asupersync::channel::mpsc;
36use asupersync::runtime::JoinError;
37use pureflow_types::IdentifierError;
38
39use crate::capability::CapabilityValidationError;
40use crate::ports::{PortRecvError, PortSendError};
41
/// Stable, machine-readable code identifying an error category.
///
/// Each variant has a fixed string form, available through [`ErrorCode::as_str`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ErrorCode {
    /// Identifier validation failure (`CDT-VAL-001`).
    InvalidIdentifier,
    /// Capability validation failure (`CDT-VAL-002`).
    InvalidCapabilities,
    /// Node execution failure (`CDT-EXEC-001`).
    NodeExecutionFailed,
    /// Cancelled execution (`CDT-CANCEL-001`).
    ExecutionCancelled,
    /// Lifecycle observation failure (`CDT-LIFE-001`).
    LifecycleObservationFailed,
    /// Metadata collection failure (`CDT-META-001`).
    MetadataCollectionFailed,
}

impl ErrorCode {
    /// Returns the fixed string form of this code.
    ///
    /// Usable in `const` contexts.
    #[must_use]
    pub const fn as_str(self) -> &'static str {
        match self {
            // Runtime categories.
            Self::MetadataCollectionFailed => "CDT-META-001",
            Self::LifecycleObservationFailed => "CDT-LIFE-001",
            Self::ExecutionCancelled => "CDT-CANCEL-001",
            Self::NodeExecutionFailed => "CDT-EXEC-001",
            // Validation category.
            Self::InvalidCapabilities => "CDT-VAL-002",
            Self::InvalidIdentifier => "CDT-VAL-001",
        }
    }
}
73
/// Visibility classification reported by `PureflowError::visibility`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ErrorVisibility {
    /// Reported for validation and cancellation errors.
    User,
    /// Reported for execution, lifecycle, and metadata errors.
    Internal,
}
82
/// Retry guidance reported by `PureflowError::retry_disposition`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RetryDisposition {
    /// Reported for validation errors.
    Never,
    /// Reported for cancellation errors.
    Safe,
    /// Reported for execution, lifecycle, and metadata errors.
    Unknown,
}
93
94#[derive(Debug, Clone, PartialEq, Eq)]
96pub enum ValidationError {
97 Identifier(IdentifierError),
99 Capability(CapabilityValidationError),
101}
102
103impl ValidationError {
104 const fn code(&self) -> ErrorCode {
105 match self {
106 Self::Identifier(_) => ErrorCode::InvalidIdentifier,
107 Self::Capability(_) => ErrorCode::InvalidCapabilities,
108 }
109 }
110}
111
112impl fmt::Display for ValidationError {
113 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
114 match self {
115 Self::Identifier(err) => write!(f, "identifier validation failed: {err}"),
116 Self::Capability(err) => write!(f, "capability validation failed: {err}"),
117 }
118 }
119}
120
121impl Error for ValidationError {
122 fn source(&self) -> Option<&(dyn Error + 'static)> {
123 match self {
124 Self::Identifier(err) => Some(err),
125 Self::Capability(err) => Some(err),
126 }
127 }
128}
129
130impl From<IdentifierError> for ValidationError {
131 fn from(value: IdentifierError) -> Self {
132 Self::Identifier(value)
133 }
134}
135
136impl From<CapabilityValidationError> for ValidationError {
137 fn from(value: CapabilityValidationError) -> Self {
138 Self::Capability(value)
139 }
140}
141
/// Error describing a failed node execution, carrying a free-form message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExecutionError {
    message: String,
}

impl ExecutionError {
    /// Creates an execution error from anything convertible into a `String`.
    #[must_use]
    pub fn new(message: impl Into<String>) -> Self {
        let message = message.into();
        Self { message }
    }

    /// Returns the message supplied at construction, without any prefix.
    #[must_use]
    pub fn message(&self) -> &str {
        self.message.as_str()
    }
}

/// Renders as `node execution failed: <message>`.
impl fmt::Display for ExecutionError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self { message } = self;
        write!(f, "node execution failed: {message}")
    }
}

impl Error for ExecutionError {}
171
/// Error describing a cancelled execution, carrying a free-form reason.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CancellationError {
    reason: String,
}

impl CancellationError {
    /// Creates a cancellation error from anything convertible into a `String`.
    #[must_use]
    pub fn new(reason: impl Into<String>) -> Self {
        let reason = reason.into();
        Self { reason }
    }

    /// Returns the reason supplied at construction, without any prefix.
    #[must_use]
    pub fn reason(&self) -> &str {
        self.reason.as_str()
    }
}

/// Renders as `execution cancelled: <reason>`.
impl fmt::Display for CancellationError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self { reason } = self;
        write!(f, "execution cancelled: {reason}")
    }
}

impl Error for CancellationError {}
201
/// Error describing a failed lifecycle observation, carrying a free-form message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LifecycleError {
    message: String,
}

impl LifecycleError {
    /// Creates a lifecycle error from anything convertible into a `String`.
    #[must_use]
    pub fn new(message: impl Into<String>) -> Self {
        let message = message.into();
        Self { message }
    }

    /// Returns the message supplied at construction, without any prefix.
    #[must_use]
    pub fn message(&self) -> &str {
        self.message.as_str()
    }
}

/// Renders as `lifecycle observation failed: <message>`.
impl fmt::Display for LifecycleError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self { message } = self;
        write!(f, "lifecycle observation failed: {message}")
    }
}

impl Error for LifecycleError {}
231
/// Error describing a failed metadata collection, carrying a free-form message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MetadataError {
    message: String,
}

impl MetadataError {
    /// Creates a metadata error from anything convertible into a `String`.
    #[must_use]
    pub fn new(message: impl Into<String>) -> Self {
        let message = message.into();
        Self { message }
    }

    /// Returns the message supplied at construction, without any prefix.
    #[must_use]
    pub fn message(&self) -> &str {
        self.message.as_str()
    }
}

/// Renders as `metadata collection failed: <message>`.
impl fmt::Display for MetadataError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self { message } = self;
        write!(f, "metadata collection failed: {message}")
    }
}

impl Error for MetadataError {}
261
/// Top-level error type of this module; each variant wraps one category-specific error.
///
/// The variant determines the values reported by `code`, `visibility`, and
/// `retry_disposition`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PureflowError {
    /// Identifier or capability validation failure.
    Validation(ValidationError),
    /// Node execution failure.
    Execution(ExecutionError),
    /// Cancelled execution.
    Cancellation(CancellationError),
    /// Lifecycle observation failure.
    Lifecycle(LifecycleError),
    /// Metadata collection failure.
    Metadata(MetadataError),
}
276
277impl PureflowError {
278 #[must_use]
280 pub fn execution(message: impl Into<String>) -> Self {
281 Self::Execution(ExecutionError::new(message))
282 }
283
284 #[must_use]
286 pub fn cancelled(reason: impl Into<String>) -> Self {
287 Self::Cancellation(CancellationError::new(reason))
288 }
289
290 #[must_use]
292 pub fn lifecycle(message: impl Into<String>) -> Self {
293 Self::Lifecycle(LifecycleError::new(message))
294 }
295
296 #[must_use]
298 pub fn metadata(message: impl Into<String>) -> Self {
299 Self::Metadata(MetadataError::new(message))
300 }
301
302 #[must_use]
304 pub const fn code(&self) -> ErrorCode {
305 match self {
306 Self::Validation(err) => err.code(),
307 Self::Execution(_) => ErrorCode::NodeExecutionFailed,
308 Self::Cancellation(_) => ErrorCode::ExecutionCancelled,
309 Self::Lifecycle(_) => ErrorCode::LifecycleObservationFailed,
310 Self::Metadata(_) => ErrorCode::MetadataCollectionFailed,
311 }
312 }
313
314 #[must_use]
316 pub const fn visibility(&self) -> ErrorVisibility {
317 match self {
318 Self::Validation(_) | Self::Cancellation(_) => ErrorVisibility::User,
319 Self::Execution(_) | Self::Lifecycle(_) | Self::Metadata(_) => {
320 ErrorVisibility::Internal
321 }
322 }
323 }
324
325 #[must_use]
327 pub const fn retry_disposition(&self) -> RetryDisposition {
328 match self {
329 Self::Validation(_) => RetryDisposition::Never,
330 Self::Execution(_) | Self::Lifecycle(_) | Self::Metadata(_) => {
331 RetryDisposition::Unknown
332 }
333 Self::Cancellation(_) => RetryDisposition::Safe,
334 }
335 }
336}
337
338impl fmt::Display for PureflowError {
339 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
340 match self {
341 Self::Validation(err) => write!(f, "{}: {err}", self.code().as_str()),
342 Self::Execution(err) => write!(f, "{}: {err}", self.code().as_str()),
343 Self::Cancellation(err) => write!(f, "{}: {err}", self.code().as_str()),
344 Self::Lifecycle(err) => write!(f, "{}: {err}", self.code().as_str()),
345 Self::Metadata(err) => write!(f, "{}: {err}", self.code().as_str()),
346 }
347 }
348}
349
350impl Error for PureflowError {
351 fn source(&self) -> Option<&(dyn Error + 'static)> {
352 match self {
353 Self::Validation(err) => Some(err),
354 Self::Execution(err) => Some(err),
355 Self::Cancellation(err) => Some(err),
356 Self::Lifecycle(err) => Some(err),
357 Self::Metadata(err) => Some(err),
358 }
359 }
360}
361
362impl From<ValidationError> for PureflowError {
363 fn from(value: ValidationError) -> Self {
364 Self::Validation(value)
365 }
366}
367
368impl From<IdentifierError> for PureflowError {
369 fn from(value: IdentifierError) -> Self {
370 Self::Validation(value.into())
371 }
372}
373
374impl From<CapabilityValidationError> for PureflowError {
375 fn from(value: CapabilityValidationError) -> Self {
376 Self::Validation(value.into())
377 }
378}
379
380impl From<ExecutionError> for PureflowError {
381 fn from(value: ExecutionError) -> Self {
382 Self::Execution(value)
383 }
384}
385
386impl From<CancellationError> for PureflowError {
387 fn from(value: CancellationError) -> Self {
388 Self::Cancellation(value)
389 }
390}
391
392impl From<LifecycleError> for PureflowError {
393 fn from(value: LifecycleError) -> Self {
394 Self::Lifecycle(value)
395 }
396}
397
398impl From<MetadataError> for PureflowError {
399 fn from(value: MetadataError) -> Self {
400 Self::Metadata(value)
401 }
402}
403
404impl From<JoinError> for PureflowError {
405 fn from(value: JoinError) -> Self {
406 match value {
407 JoinError::Cancelled(reason) => Self::cancelled(reason.to_string()),
408 JoinError::Panicked(payload) => {
409 Self::execution(format!("asupersync task panicked: {payload}"))
410 }
411 JoinError::PolledAfterCompletion => {
412 Self::execution("asupersync task join polled after completion")
413 }
414 }
415 }
416}
417
418impl<T> From<mpsc::SendError<T>> for PureflowError {
419 fn from(value: mpsc::SendError<T>) -> Self {
420 match value {
421 mpsc::SendError::Disconnected(_) => {
422 Self::execution("asupersync send failed: receiver disconnected")
423 }
424 mpsc::SendError::Cancelled(_) => Self::cancelled("asupersync send cancelled"),
425 mpsc::SendError::Full(_) => {
426 Self::execution("asupersync send failed: bounded channel full")
427 }
428 }
429 }
430}
431
432impl From<mpsc::RecvError> for PureflowError {
433 fn from(value: mpsc::RecvError) -> Self {
434 match value {
435 mpsc::RecvError::Disconnected => {
436 Self::execution("asupersync receive failed: sender disconnected")
437 }
438 mpsc::RecvError::Cancelled => Self::cancelled("asupersync receive cancelled"),
439 mpsc::RecvError::Empty => Self::execution("asupersync receive failed: channel empty"),
440 }
441 }
442}
443
444impl From<PortSendError> for PureflowError {
445 fn from(value: PortSendError) -> Self {
446 match value {
447 PortSendError::Cancelled { .. } => Self::cancelled(value.to_string()),
448 _ => Self::execution(value.to_string()),
449 }
450 }
451}
452
453impl From<PortRecvError> for PureflowError {
454 fn from(value: PortRecvError) -> Self {
455 match value {
456 PortRecvError::Cancelled { .. } => Self::cancelled(value.to_string()),
457 _ => Self::execution(value.to_string()),
458 }
459 }
460}
461
#[cfg(test)]
mod tests {
    //! Unit tests pinning the code, visibility, retry disposition, and rendered text of
    //! each error category, plus the conversions from runtime, channel, and port errors.

    use std::collections::BTreeSet;

    use super::*;
    use crate::capability::{EffectCapability, NodeCapabilities};
    use asupersync::types::{CancelReason, PanicPayload};
    use pureflow_types::NodeId;

    // Every `ErrorCode` variant; keep in sync with the enum when a variant is added.
    const ALL_ERROR_CODES: [ErrorCode; 6] = [
        ErrorCode::InvalidIdentifier,
        ErrorCode::InvalidCapabilities,
        ErrorCode::NodeExecutionFailed,
        ErrorCode::ExecutionCancelled,
        ErrorCode::LifecycleObservationFailed,
        ErrorCode::MetadataCollectionFailed,
    ];

    // Category prefix each code string is expected to start with.
    const fn expected_category_prefix(code: ErrorCode) -> &'static str {
        match code {
            ErrorCode::InvalidIdentifier | ErrorCode::InvalidCapabilities => "CDT-VAL-",
            ErrorCode::NodeExecutionFailed => "CDT-EXEC-",
            ErrorCode::ExecutionCancelled => "CDT-CANCEL-",
            ErrorCode::LifecycleObservationFailed => "CDT-LIFE-",
            ErrorCode::MetadataCollectionFailed => "CDT-META-",
        }
    }

    #[test]
    fn error_code_strings_are_unique_nonempty_and_category_prefixed() {
        let mut seen: BTreeSet<&'static str> = BTreeSet::new();

        for code in ALL_ERROR_CODES {
            let value: &'static str = code.as_str();

            assert!(!value.is_empty(), "error code must not be empty: {code:?}");
            assert!(
                value.starts_with(expected_category_prefix(code)),
                "error code {value} should match category for {code:?}"
            );
            assert!(seen.insert(value), "duplicate error code string: {value}");
        }

        assert_eq!(seen.len(), ALL_ERROR_CODES.len());
    }

    #[test]
    fn identifier_errors_map_to_user_facing_non_retryable_codes() {
        let err: PureflowError = IdentifierError::Whitespace {
            kind: pureflow_types::IdentifierKind::Workflow,
        }
        .into();

        assert_eq!(err.code(), ErrorCode::InvalidIdentifier);
        assert_eq!(err.code().as_str(), "CDT-VAL-001");
        assert_eq!(err.visibility(), ErrorVisibility::User);
        assert_eq!(err.retry_disposition(), RetryDisposition::Never);
        assert_eq!(
            err.to_string(),
            "CDT-VAL-001: identifier validation failed: workflow id must not contain whitespace"
        );
    }

    #[test]
    fn capability_errors_map_to_validation_codes() {
        let err: PureflowError = NodeCapabilities::new(
            NodeId::new("reader").expect("valid node id"),
            Vec::new(),
            [
                EffectCapability::FileSystemRead,
                EffectCapability::FileSystemRead,
            ],
        )
        .expect_err("duplicate effect must fail")
        .into();

        assert_eq!(err.code(), ErrorCode::InvalidCapabilities);
        assert_eq!(err.visibility(), ErrorVisibility::User);
        assert_eq!(err.retry_disposition(), RetryDisposition::Never);
    }

    #[test]
    fn execution_errors_are_internal_with_unknown_retry_safety() {
        let err: PureflowError = PureflowError::execution("executor returned failure");

        assert_eq!(err.code(), ErrorCode::NodeExecutionFailed);
        assert_eq!(err.visibility(), ErrorVisibility::Internal);
        assert_eq!(err.retry_disposition(), RetryDisposition::Unknown);
        assert_eq!(
            err.to_string(),
            "CDT-EXEC-001: node execution failed: executor returned failure"
        );
    }

    #[test]
    fn cancellation_errors_are_user_facing_and_safe_to_retry() {
        let err: PureflowError = PureflowError::cancelled("shutdown requested");

        assert_eq!(err.code(), ErrorCode::ExecutionCancelled);
        assert_eq!(err.visibility(), ErrorVisibility::User);
        assert_eq!(err.retry_disposition(), RetryDisposition::Safe);
        assert_eq!(
            err.to_string(),
            "CDT-CANCEL-001: execution cancelled: shutdown requested"
        );
    }

    // Lifecycle is the one top-level variant that previously had no dedicated test.
    #[test]
    fn lifecycle_errors_are_internal_with_unknown_retry_safety() {
        let err: PureflowError = PureflowError::lifecycle("observer unavailable");

        assert_eq!(err.code(), ErrorCode::LifecycleObservationFailed);
        assert_eq!(err.visibility(), ErrorVisibility::Internal);
        assert_eq!(err.retry_disposition(), RetryDisposition::Unknown);
        assert_eq!(
            err.to_string(),
            "CDT-LIFE-001: lifecycle observation failed: observer unavailable"
        );
    }

    #[test]
    fn metadata_errors_are_internal_with_unknown_retry_safety() {
        let err: PureflowError = PureflowError::metadata("collector unavailable");

        assert_eq!(err.code(), ErrorCode::MetadataCollectionFailed);
        assert_eq!(err.visibility(), ErrorVisibility::Internal);
        assert_eq!(err.retry_disposition(), RetryDisposition::Unknown);
        assert_eq!(
            err.to_string(),
            "CDT-META-001: metadata collection failed: collector unavailable"
        );
    }

    #[test]
    fn asupersync_join_cancel_maps_to_cancellation() {
        let err: PureflowError = JoinError::Cancelled(CancelReason::user("shutdown")).into();

        assert_eq!(err.code(), ErrorCode::ExecutionCancelled);
        assert_eq!(err.visibility(), ErrorVisibility::User);
        assert_eq!(err.retry_disposition(), RetryDisposition::Safe);
        assert!(err.to_string().contains("shutdown"));
    }

    #[test]
    fn asupersync_join_panic_maps_to_execution_failure() {
        let err: PureflowError = JoinError::Panicked(PanicPayload::new("boom")).into();

        assert_eq!(err.code(), ErrorCode::NodeExecutionFailed);
        assert_eq!(err.visibility(), ErrorVisibility::Internal);
        assert_eq!(err.retry_disposition(), RetryDisposition::Unknown);
        assert_eq!(
            err.to_string(),
            "CDT-EXEC-001: node execution failed: asupersync task panicked: panic: boom"
        );
    }

    #[test]
    fn asupersync_join_polled_after_completion_maps_to_execution_failure() {
        let err: PureflowError = JoinError::PolledAfterCompletion.into();

        assert_eq!(err.code(), ErrorCode::NodeExecutionFailed);
        assert_eq!(
            err.to_string(),
            "CDT-EXEC-001: node execution failed: asupersync task join polled after completion"
        );
    }

    #[test]
    fn asupersync_send_cancel_maps_to_cancellation() {
        let err: PureflowError = mpsc::SendError::Cancelled(()).into();

        assert_eq!(err.code(), ErrorCode::ExecutionCancelled);
        assert_eq!(
            err.to_string(),
            "CDT-CANCEL-001: execution cancelled: asupersync send cancelled"
        );
    }

    #[test]
    fn asupersync_send_full_maps_to_execution_failure() {
        let err: PureflowError = mpsc::SendError::Full(()).into();

        assert_eq!(err.code(), ErrorCode::NodeExecutionFailed);
        assert_eq!(
            err.to_string(),
            "CDT-EXEC-001: node execution failed: asupersync send failed: bounded channel full"
        );
    }

    #[test]
    fn asupersync_send_disconnected_maps_to_execution_failure() {
        let err: PureflowError = mpsc::SendError::Disconnected(()).into();

        assert_eq!(err.code(), ErrorCode::NodeExecutionFailed);
        assert_eq!(
            err.to_string(),
            "CDT-EXEC-001: node execution failed: asupersync send failed: receiver disconnected"
        );
    }

    #[test]
    fn asupersync_recv_disconnected_maps_to_execution_failure() {
        let err: PureflowError = mpsc::RecvError::Disconnected.into();

        assert_eq!(err.code(), ErrorCode::NodeExecutionFailed);
        assert_eq!(
            err.to_string(),
            "CDT-EXEC-001: node execution failed: asupersync receive failed: sender disconnected"
        );
    }

    #[test]
    fn asupersync_recv_cancel_maps_to_cancellation() {
        let err: PureflowError = mpsc::RecvError::Cancelled.into();

        assert_eq!(err.code(), ErrorCode::ExecutionCancelled);
        assert_eq!(
            err.to_string(),
            "CDT-CANCEL-001: execution cancelled: asupersync receive cancelled"
        );
    }

    #[test]
    fn asupersync_recv_empty_maps_to_execution_failure() {
        let err: PureflowError = mpsc::RecvError::Empty.into();

        assert_eq!(err.code(), ErrorCode::NodeExecutionFailed);
        assert_eq!(
            err.to_string(),
            "CDT-EXEC-001: node execution failed: asupersync receive failed: channel empty"
        );
    }

    #[test]
    fn port_errors_map_to_execution_failures() {
        let port_id: pureflow_types::PortId =
            pureflow_types::PortId::new("out").expect("valid port id");
        let err: PureflowError = PortSendError::Full { port_id }.into();

        assert_eq!(err.code(), ErrorCode::NodeExecutionFailed);
        assert_eq!(
            err.to_string(),
            "CDT-EXEC-001: node execution failed: output port `out` is full"
        );
    }

    #[test]
    fn cancelled_port_errors_map_to_cancellation_failures() {
        let port_id: pureflow_types::PortId =
            pureflow_types::PortId::new("out").expect("valid port id");
        let err: PureflowError = PortSendError::Cancelled { port_id }.into();

        assert_eq!(err.code(), ErrorCode::ExecutionCancelled);
        assert_eq!(
            err.to_string(),
            "CDT-CANCEL-001: execution cancelled: output port `out` send cancelled"
        );
    }
}