celers_core/
exception.rs

1//! Exception Handling for Task Execution
2//!
3//! This module provides comprehensive exception handling capabilities for task execution,
4//! including exception classification, custom handlers, traceback preservation, and
5//! cross-language exception serialization.
6//!
7//! # Example
8//!
9//! ```rust
10//! use celers_core::exception::{
11//!     TaskException, ExceptionCategory, ExceptionPolicy, ExceptionAction,
12//! };
13//!
14//! // Create an exception with traceback
15//! let exception = TaskException::new("ValueError", "Invalid input: negative number")
16//!     .with_traceback(vec![
17//!         ("process_data".to_string(), "tasks.py".to_string(), 42),
18//!         ("validate_input".to_string(), "validators.py".to_string(), 15),
19//!     ])
20//!     .with_category(ExceptionCategory::Retryable);
21//!
22//! // Create a policy for handling exceptions
23//! let policy = ExceptionPolicy::new()
24//!     .ignore_on(&["NotFoundError"])
25//!     .retry_on(&["TimeoutError", "ConnectionError"])
26//!     .fail_on(&["ValidationError"]);
27//!
28//! // Determine action based on exception
29//! let action = policy.get_action(&exception);
30//! ```
31
32use serde::{Deserialize, Serialize};
33use std::collections::HashMap;
34use std::fmt;
35
/// Category of an exception that determines retry behavior.
///
/// [`ExceptionPolicy::get_action`] consults the category only after none of the
/// policy's type-name patterns matched the exception.
///
/// The variant order is significant: `TaskException::is_retryable` and friends
/// compare the variants through their `u8` discriminants.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum ExceptionCategory {
    /// Exception is retryable (transient errors like network issues)
    Retryable,
    /// Exception is fatal and should not be retried
    Fatal,
    /// Exception result should be ignored (task considered successful)
    Ignorable,
    /// Exception requires manual intervention
    RequiresIntervention,
    /// Unknown category - use default policy (this is the `Default` variant)
    #[default]
    Unknown,
}
51
52impl fmt::Display for ExceptionCategory {
53    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
54        match self {
55            Self::Retryable => write!(f, "RETRYABLE"),
56            Self::Fatal => write!(f, "FATAL"),
57            Self::Ignorable => write!(f, "IGNORABLE"),
58            Self::RequiresIntervention => write!(f, "REQUIRES_INTERVENTION"),
59            Self::Unknown => write!(f, "UNKNOWN"),
60        }
61    }
62}
63
/// Action to take when an exception occurs.
///
/// Produced by [`ExceptionPolicy::get_action`] and by
/// [`ExceptionHandler::handle`] implementations.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
pub enum ExceptionAction {
    /// Retry the task
    Retry,
    /// Fail the task and move to DLQ
    Fail,
    /// Ignore the exception and mark task as successful
    Ignore,
    /// Reject the task (don't retry, don't move to DLQ)
    Reject,
    /// Defer to default retry policy.
    ///
    /// `ExceptionHandlerChain::handle` treats this value as "no decision" and
    /// moves on to the next handler.
    #[default]
    Default,
}
79
80impl fmt::Display for ExceptionAction {
81    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
82        match self {
83            Self::Retry => write!(f, "RETRY"),
84            Self::Fail => write!(f, "FAIL"),
85            Self::Ignore => write!(f, "IGNORE"),
86            Self::Reject => write!(f, "REJECT"),
87            Self::Default => write!(f, "DEFAULT"),
88        }
89    }
90}
91
/// A single frame in an exception traceback.
///
/// Its `Display` output has the form
/// `  File "<file>", line <line>[, column <column>], in <function>`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct TracebackFrame {
    /// Function or method name
    pub function: String,
    /// File path or module name
    pub file: String,
    /// Line number
    pub line: u32,
    /// Optional column number
    pub column: Option<u32>,
    /// Optional local variables (for debugging).
    ///
    /// Left out of the serialized form when empty and defaulted to an empty map
    /// when absent on deserialization.
    #[serde(default, skip_serializing_if = "HashMap::is_empty")]
    pub locals: HashMap<String, String>,
}
107
108impl TracebackFrame {
109    /// Create a new traceback frame
110    pub fn new(function: impl Into<String>, file: impl Into<String>, line: u32) -> Self {
111        Self {
112            function: function.into(),
113            file: file.into(),
114            line,
115            column: None,
116            locals: HashMap::new(),
117        }
118    }
119
120    /// Add column information
121    #[must_use]
122    pub fn with_column(mut self, column: u32) -> Self {
123        self.column = Some(column);
124        self
125    }
126
127    /// Add a local variable
128    #[must_use]
129    pub fn with_local(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
130        self.locals.insert(name.into(), value.into());
131        self
132    }
133}
134
135impl fmt::Display for TracebackFrame {
136    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
137        write!(f, "  File \"{}\", line {}", self.file, self.line)?;
138        if let Some(col) = self.column {
139            write!(f, ", column {col}")?;
140        }
141        write!(f, ", in {}", self.function)?;
142        Ok(())
143    }
144}
145
/// A structured exception from task execution.
///
/// Only `exc_type` and `exc_message` are required when deserializing; every
/// other field carries `#[serde(default)]` and falls back to its default value
/// when absent from the input.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskException {
    /// Exception type name (e.g., "`ValueError`", "`TimeoutError`")
    pub exc_type: String,
    /// Exception message
    pub exc_message: String,
    /// Full traceback as structured frames
    #[serde(default)]
    pub traceback: Vec<TracebackFrame>,
    /// Raw traceback string (for Python compatibility).
    ///
    /// When present, `format_traceback` returns it instead of rendering
    /// `traceback`.
    #[serde(default)]
    pub traceback_str: Option<String>,
    /// Exception category
    #[serde(default)]
    pub category: ExceptionCategory,
    /// Cause exception (for chained exceptions); this is the link followed by
    /// `exception_chain` and by the `std::error::Error::source` implementation
    #[serde(default)]
    pub cause: Option<Box<TaskException>>,
    /// Context exception (for exception context)
    #[serde(default)]
    pub context: Option<Box<TaskException>>,
    /// Additional metadata (omitted from the serialized form when empty).
    ///
    /// `to_celery_format` reads the `"module"` key from this map.
    #[serde(default, skip_serializing_if = "HashMap::is_empty")]
    pub metadata: HashMap<String, serde_json::Value>,
    /// Timestamp when exception occurred (Unix timestamp, in seconds)
    #[serde(default)]
    pub timestamp: Option<f64>,
    /// Worker hostname where exception occurred
    #[serde(default)]
    pub hostname: Option<String>,
    /// Task ID that raised the exception
    #[serde(default)]
    pub task_id: Option<String>,
}
181
182impl TaskException {
183    /// Create a new task exception
184    pub fn new(exc_type: impl Into<String>, exc_message: impl Into<String>) -> Self {
185        Self {
186            exc_type: exc_type.into(),
187            exc_message: exc_message.into(),
188            traceback: Vec::new(),
189            traceback_str: None,
190            category: ExceptionCategory::Unknown,
191            cause: None,
192            context: None,
193            metadata: HashMap::new(),
194            timestamp: None,
195            hostname: None,
196            task_id: None,
197        }
198    }
199
200    /// Create from a Rust error
201    pub fn from_error<E: std::error::Error>(error: &E) -> Self {
202        let exc_type = std::any::type_name::<E>()
203            .rsplit("::")
204            .next()
205            .unwrap_or("Error")
206            .to_string();
207
208        let mut exception = Self::new(exc_type, error.to_string());
209
210        // Capture error chain as cause
211        if let Some(source) = error.source() {
212            exception.cause = Some(Box::new(Self::from_error_dyn(source)));
213        }
214
215        exception
216    }
217
218    /// Create from a dynamic error reference
219    fn from_error_dyn(error: &dyn std::error::Error) -> Self {
220        let exc_type = "Error".to_string();
221        let mut exception = Self::new(exc_type, error.to_string());
222
223        if let Some(source) = error.source() {
224            exception.cause = Some(Box::new(Self::from_error_dyn(source)));
225        }
226
227        exception
228    }
229
230    /// Add structured traceback frames
231    #[must_use]
232    pub fn with_traceback(mut self, frames: Vec<(String, String, u32)>) -> Self {
233        self.traceback = frames
234            .into_iter()
235            .map(|(func, file, line)| TracebackFrame::new(func, file, line))
236            .collect();
237        self
238    }
239
240    /// Add traceback frames
241    #[must_use]
242    pub fn with_traceback_frames(mut self, frames: Vec<TracebackFrame>) -> Self {
243        self.traceback = frames;
244        self
245    }
246
247    /// Add raw traceback string
248    #[must_use]
249    pub fn with_traceback_str(mut self, traceback: impl Into<String>) -> Self {
250        self.traceback_str = Some(traceback.into());
251        self
252    }
253
254    /// Set exception category
255    #[must_use]
256    pub fn with_category(mut self, category: ExceptionCategory) -> Self {
257        self.category = category;
258        self
259    }
260
261    /// Set cause exception
262    #[must_use]
263    pub fn with_cause(mut self, cause: TaskException) -> Self {
264        self.cause = Some(Box::new(cause));
265        self
266    }
267
268    /// Set context exception
269    #[must_use]
270    pub fn with_context(mut self, context: TaskException) -> Self {
271        self.context = Some(Box::new(context));
272        self
273    }
274
275    /// Add metadata
276    #[must_use]
277    pub fn with_metadata(mut self, key: impl Into<String>, value: serde_json::Value) -> Self {
278        self.metadata.insert(key.into(), value);
279        self
280    }
281
282    /// Set timestamp
283    #[must_use]
284    pub fn with_timestamp(mut self, timestamp: f64) -> Self {
285        self.timestamp = Some(timestamp);
286        self
287    }
288
289    /// Set timestamp to now
290    #[must_use]
291    pub fn with_timestamp_now(mut self) -> Self {
292        use std::time::{SystemTime, UNIX_EPOCH};
293        self.timestamp = Some(
294            SystemTime::now()
295                .duration_since(UNIX_EPOCH)
296                .unwrap_or_default()
297                .as_secs_f64(),
298        );
299        self
300    }
301
302    /// Set hostname
303    #[must_use]
304    pub fn with_hostname(mut self, hostname: impl Into<String>) -> Self {
305        self.hostname = Some(hostname.into());
306        self
307    }
308
309    /// Set task ID
310    #[must_use]
311    pub fn with_task_id(mut self, task_id: impl Into<String>) -> Self {
312        self.task_id = Some(task_id.into());
313        self
314    }
315
316    /// Check if this exception is retryable
317    #[inline]
318    #[must_use]
319    pub const fn is_retryable(&self) -> bool {
320        self.category as u8 == ExceptionCategory::Retryable as u8
321    }
322
323    /// Check if this exception is fatal
324    #[inline]
325    #[must_use]
326    pub const fn is_fatal(&self) -> bool {
327        self.category as u8 == ExceptionCategory::Fatal as u8
328    }
329
330    /// Check if this exception should be ignored
331    #[inline]
332    #[must_use]
333    pub const fn is_ignorable(&self) -> bool {
334        self.category as u8 == ExceptionCategory::Ignorable as u8
335    }
336
337    /// Get the full exception chain as a vector
338    #[must_use]
339    pub fn exception_chain(&self) -> Vec<&TaskException> {
340        let mut chain = vec![self];
341        let mut current = self;
342
343        while let Some(cause) = &current.cause {
344            chain.push(cause);
345            current = cause;
346        }
347
348        chain
349    }
350
351    /// Format the traceback as a string
352    #[must_use]
353    pub fn format_traceback(&self) -> String {
354        use std::fmt::Write;
355
356        if let Some(ref tb_str) = self.traceback_str {
357            return tb_str.clone();
358        }
359
360        if self.traceback.is_empty() {
361            return String::new();
362        }
363
364        let mut result = String::from("Traceback (most recent call last):\n");
365        for frame in &self.traceback {
366            let _ = writeln!(result, "{frame}");
367        }
368        let _ = write!(result, "{}: {}", self.exc_type, self.exc_message);
369        result
370    }
371
372    /// Serialize to JSON for cross-language compatibility
373    ///
374    /// # Errors
375    ///
376    /// Returns an error if serialization fails.
377    pub fn to_json(&self) -> Result<String, serde_json::Error> {
378        serde_json::to_string(self)
379    }
380
381    /// Deserialize from JSON
382    ///
383    /// # Errors
384    ///
385    /// Returns an error if deserialization fails.
386    pub fn from_json(json: &str) -> Result<Self, serde_json::Error> {
387        serde_json::from_str(json)
388    }
389
390    /// Convert to Celery-compatible format
391    #[must_use]
392    pub fn to_celery_format(&self) -> serde_json::Value {
393        serde_json::json!({
394            "exc_type": self.exc_type,
395            "exc_message": self.exc_message,
396            "exc_module": self.metadata.get("module").cloned().unwrap_or(serde_json::Value::Null),
397            "traceback": self.traceback_str.clone().unwrap_or_else(|| self.format_traceback()),
398        })
399    }
400}
401
402impl fmt::Display for TaskException {
403    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
404        write!(f, "{}: {}", self.exc_type, self.exc_message)
405    }
406}
407
408impl std::error::Error for TaskException {
409    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
410        self.cause
411            .as_ref()
412            .map(|e| e.as_ref() as &dyn std::error::Error)
413    }
414}
415
/// Policy for handling exceptions.
///
/// Each `*_on` list holds type-name patterns. A pattern is an exact type name,
/// `prefix*`, `*suffix`, or `*middle*`. `get_action` checks the lists in the
/// order ignore, reject, fail, retry, and the first list with a matching
/// pattern determines the action.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExceptionPolicy {
    /// Exception types to retry on (by type name pattern)
    #[serde(default)]
    pub retry_on: Vec<String>,
    /// Exception types to ignore (task succeeds despite exception)
    #[serde(default)]
    pub ignore_on: Vec<String>,
    /// Exception types to fail on (no retry, go to DLQ)
    #[serde(default)]
    pub fail_on: Vec<String>,
    /// Exception types to reject (no retry, no DLQ)
    #[serde(default)]
    pub reject_on: Vec<String>,
    /// Default action for unmatched exceptions whose category is `Unknown`
    #[serde(default)]
    pub default_action: ExceptionAction,
    /// Whether to preserve full traceback (`true` when absent from the
    /// deserialized input)
    #[serde(default = "default_true")]
    pub preserve_traceback: bool,
    /// Maximum traceback depth to preserve (`None` means no limit)
    #[serde(default)]
    pub max_traceback_depth: Option<usize>,
    /// Custom metadata to include in exceptions.
    ///
    /// NOTE(review): no method visible in this part of the file reads this map;
    /// confirm where it is consumed.
    #[serde(default, skip_serializing_if = "HashMap::is_empty")]
    pub include_metadata: HashMap<String, bool>,
}
444
/// Serde default provider for `ExceptionPolicy::preserve_traceback`: a field
/// missing from the input deserializes as `true`.
fn default_true() -> bool {
    true
}
448
449impl Default for ExceptionPolicy {
450    fn default() -> Self {
451        Self {
452            retry_on: Vec::new(),
453            ignore_on: Vec::new(),
454            fail_on: Vec::new(),
455            reject_on: Vec::new(),
456            default_action: ExceptionAction::default(),
457            preserve_traceback: true, // Default to preserving traceback
458            max_traceback_depth: None,
459            include_metadata: HashMap::new(),
460        }
461    }
462}
463
464impl ExceptionPolicy {
465    /// Create a new exception policy with defaults
466    #[must_use]
467    pub fn new() -> Self {
468        Self::default()
469    }
470
471    /// Set exception types to retry on
472    #[must_use]
473    pub fn retry_on(mut self, types: &[&str]) -> Self {
474        self.retry_on = types.iter().map(std::string::ToString::to_string).collect();
475        self
476    }
477
478    /// Add exception type to retry on
479    #[must_use]
480    pub fn add_retry_on(mut self, exc_type: impl Into<String>) -> Self {
481        self.retry_on.push(exc_type.into());
482        self
483    }
484
485    /// Set exception types to ignore
486    #[must_use]
487    pub fn ignore_on(mut self, types: &[&str]) -> Self {
488        self.ignore_on = types.iter().map(std::string::ToString::to_string).collect();
489        self
490    }
491
492    /// Add exception type to ignore
493    #[must_use]
494    pub fn add_ignore_on(mut self, exc_type: impl Into<String>) -> Self {
495        self.ignore_on.push(exc_type.into());
496        self
497    }
498
499    /// Set exception types to fail on
500    #[must_use]
501    pub fn fail_on(mut self, types: &[&str]) -> Self {
502        self.fail_on = types.iter().map(std::string::ToString::to_string).collect();
503        self
504    }
505
506    /// Add exception type to fail on
507    #[must_use]
508    pub fn add_fail_on(mut self, exc_type: impl Into<String>) -> Self {
509        self.fail_on.push(exc_type.into());
510        self
511    }
512
513    /// Set exception types to reject
514    #[must_use]
515    pub fn reject_on(mut self, types: &[&str]) -> Self {
516        self.reject_on = types.iter().map(std::string::ToString::to_string).collect();
517        self
518    }
519
520    /// Add exception type to reject
521    #[must_use]
522    pub fn add_reject_on(mut self, exc_type: impl Into<String>) -> Self {
523        self.reject_on.push(exc_type.into());
524        self
525    }
526
527    /// Set default action
528    #[must_use]
529    pub fn with_default_action(mut self, action: ExceptionAction) -> Self {
530        self.default_action = action;
531        self
532    }
533
534    /// Set traceback preservation
535    #[must_use]
536    pub fn with_traceback(mut self, preserve: bool) -> Self {
537        self.preserve_traceback = preserve;
538        self
539    }
540
541    /// Set maximum traceback depth
542    #[must_use]
543    pub fn with_max_traceback_depth(mut self, depth: usize) -> Self {
544        self.max_traceback_depth = Some(depth);
545        self
546    }
547
548    /// Get the action to take for an exception
549    #[must_use]
550    pub fn get_action(&self, exception: &TaskException) -> ExceptionAction {
551        let exc_type = &exception.exc_type;
552
553        // Check ignore first (takes precedence)
554        if self.matches_pattern(exc_type, &self.ignore_on) {
555            return ExceptionAction::Ignore;
556        }
557
558        // Check reject
559        if self.matches_pattern(exc_type, &self.reject_on) {
560            return ExceptionAction::Reject;
561        }
562
563        // Check fail
564        if self.matches_pattern(exc_type, &self.fail_on) {
565            return ExceptionAction::Fail;
566        }
567
568        // Check retry
569        if self.matches_pattern(exc_type, &self.retry_on) {
570            return ExceptionAction::Retry;
571        }
572        // Use category-based decision if available
573        match exception.category {
574            ExceptionCategory::Retryable => ExceptionAction::Retry,
575            ExceptionCategory::Fatal | ExceptionCategory::RequiresIntervention => {
576                ExceptionAction::Fail
577            }
578            ExceptionCategory::Ignorable => ExceptionAction::Ignore,
579            ExceptionCategory::Unknown => self.default_action,
580        }
581    }
582
583    /// Check if exception type matches any pattern
584    #[allow(clippy::unused_self)]
585    fn matches_pattern(&self, exc_type: &str, patterns: &[String]) -> bool {
586        for pattern in patterns {
587            if pattern == exc_type {
588                return true;
589            }
590            // Support contains pattern (*middle*)
591            if let Some(rest) = pattern.strip_prefix('*') {
592                if let Some(middle) = rest.strip_suffix('*') {
593                    if !middle.is_empty() && exc_type.contains(middle) {
594                        return true;
595                    }
596                } else {
597                    // Suffix pattern (*suffix)
598                    if exc_type.ends_with(rest) {
599                        return true;
600                    }
601                }
602            }
603            // Prefix pattern (prefix*)
604            if let Some(prefix) = pattern.strip_suffix('*') {
605                if exc_type.starts_with(prefix) {
606                    return true;
607                }
608            }
609        }
610        false
611    }
612
613    /// Process an exception according to policy
614    #[must_use]
615    pub fn process_exception(&self, mut exception: TaskException) -> TaskException {
616        // Truncate traceback if needed
617        if let Some(max_depth) = self.max_traceback_depth {
618            if exception.traceback.len() > max_depth {
619                exception.traceback.truncate(max_depth);
620            }
621        }
622
623        // Clear traceback if not preserving
624        if !self.preserve_traceback {
625            exception.traceback.clear();
626            exception.traceback_str = None;
627        }
628
629        exception
630    }
631}
632
/// Trait for custom exception handlers.
///
/// Only [`ExceptionHandler::handle`] is required; the other methods have
/// default implementations. Implementors must be `Send + Sync`.
pub trait ExceptionHandler: Send + Sync {
    /// Handle an exception and return the action to take.
    ///
    /// Returning [`ExceptionAction::Default`] lets `ExceptionHandlerChain`
    /// continue with the next handler.
    fn handle(&self, exception: &TaskException) -> ExceptionAction;

    /// Transform an exception (e.g., add metadata, modify traceback).
    ///
    /// The default implementation returns the exception unchanged.
    fn transform(&self, exception: TaskException) -> TaskException {
        exception
    }

    /// Called before the exception is processed.
    ///
    /// The default implementation does nothing.
    fn on_exception(&self, _exception: &TaskException) {}

    /// Get handler name for logging
    fn name(&self) -> &'static str {
        "ExceptionHandler"
    }
}
651
/// A chain of exception handlers.
///
/// Handlers are stored, and later consulted, in the order they were added.
#[derive(Default)]
pub struct ExceptionHandlerChain {
    // Boxed trait objects so handlers of different concrete types can share one list.
    handlers: Vec<Box<dyn ExceptionHandler>>,
}
657
658impl ExceptionHandlerChain {
659    /// Create a new handler chain
660    #[must_use]
661    pub fn new() -> Self {
662        Self::default()
663    }
664
665    /// Add a handler to the chain
666    #[must_use]
667    pub fn add_handler<H: ExceptionHandler + 'static>(mut self, handler: H) -> Self {
668        self.handlers.push(Box::new(handler));
669        self
670    }
671
672    /// Handle an exception through the chain
673    ///
674    /// Returns the first non-Default action, or Default if all handlers return Default
675    #[must_use]
676    pub fn handle(&self, exception: &TaskException) -> ExceptionAction {
677        for handler in &self.handlers {
678            handler.on_exception(exception);
679            let action = handler.handle(exception);
680            if action != ExceptionAction::Default {
681                return action;
682            }
683        }
684        ExceptionAction::Default
685    }
686
687    /// Transform an exception through all handlers
688    #[must_use]
689    pub fn transform(&self, mut exception: TaskException) -> TaskException {
690        for handler in &self.handlers {
691            exception = handler.transform(exception);
692        }
693        exception
694    }
695}
696
/// Built-in handler that logs exceptions through `tracing`.
///
/// It never decides an action itself: its `handle` implementation always
/// returns `ExceptionAction::Default`.
pub struct LoggingExceptionHandler {
    /// Log level for exceptions (`ERROR` by default)
    pub log_level: tracing::Level,
}
702
703impl Default for LoggingExceptionHandler {
704    fn default() -> Self {
705        Self {
706            log_level: tracing::Level::ERROR,
707        }
708    }
709}
710
711impl LoggingExceptionHandler {
712    /// Create a new logging handler
713    #[must_use]
714    pub fn new() -> Self {
715        Self::default()
716    }
717
718    /// Set log level
719    #[must_use]
720    pub fn with_level(mut self, level: tracing::Level) -> Self {
721        self.log_level = level;
722        self
723    }
724}
725
726impl ExceptionHandler for LoggingExceptionHandler {
727    fn handle(&self, _exception: &TaskException) -> ExceptionAction {
728        ExceptionAction::Default
729    }
730
731    fn on_exception(&self, exception: &TaskException) {
732        match self.log_level {
733            tracing::Level::ERROR => {
734                tracing::error!(
735                    exc_type = %exception.exc_type,
736                    exc_message = %exception.exc_message,
737                    task_id = ?exception.task_id,
738                    "Task exception occurred"
739                );
740            }
741            tracing::Level::WARN => {
742                tracing::warn!(
743                    exc_type = %exception.exc_type,
744                    exc_message = %exception.exc_message,
745                    task_id = ?exception.task_id,
746                    "Task exception occurred"
747                );
748            }
749            _ => {
750                tracing::info!(
751                    exc_type = %exception.exc_type,
752                    exc_message = %exception.exc_message,
753                    task_id = ?exception.task_id,
754                    "Task exception occurred"
755                );
756            }
757        }
758    }
759
760    fn name(&self) -> &'static str {
761        "LoggingExceptionHandler"
762    }
763}
764
/// Built-in handler based on exception policy.
///
/// Both the action decision and the exception transformation are delegated to
/// the wrapped [`ExceptionPolicy`].
pub struct PolicyExceptionHandler {
    // The policy consulted by `handle` and `transform`.
    policy: ExceptionPolicy,
}
769
770impl PolicyExceptionHandler {
771    /// Create a handler from a policy
772    #[must_use]
773    pub fn new(policy: ExceptionPolicy) -> Self {
774        Self { policy }
775    }
776}
777
778impl ExceptionHandler for PolicyExceptionHandler {
779    fn handle(&self, exception: &TaskException) -> ExceptionAction {
780        self.policy.get_action(exception)
781    }
782
783    fn transform(&self, exception: TaskException) -> TaskException {
784        self.policy.process_exception(exception)
785    }
786
787    fn name(&self) -> &'static str {
788        "PolicyExceptionHandler"
789    }
790}
791
/// Common exception types for categorization.
///
/// Each constant is a `&[&str]` of type names, the same shape accepted by the
/// `ExceptionPolicy` list setters such as `retry_on` and `fail_on`.
pub mod exception_types {
    /// Network-related exceptions (typically retryable)
    pub const NETWORK_EXCEPTIONS: &[&str] = &[
        "ConnectionError",
        "TimeoutError",
        "ConnectionRefused",
        "ConnectionReset",
        "BrokenPipe",
        "NetworkError",
        "SocketError",
        "DNSError",
    ];

    /// Database-related exceptions (often retryable)
    pub const DATABASE_EXCEPTIONS: &[&str] = &[
        "DatabaseError",
        "OperationalError",
        "InterfaceError",
        "ConnectionPoolError",
        "DeadlockError",
        "LockTimeout",
    ];

    /// Validation exceptions (typically not retryable)
    pub const VALIDATION_EXCEPTIONS: &[&str] = &[
        "ValidationError",
        "ValueError",
        "TypeError",
        "InvalidArgument",
        "SchemaError",
    ];

    /// Resource exceptions (may be retryable)
    pub const RESOURCE_EXCEPTIONS: &[&str] = &[
        "ResourceExhausted",
        "QuotaExceeded",
        "RateLimitExceeded",
        "OutOfMemory",
        "DiskFull",
    ];

    /// Authentication exceptions (typically not retryable)
    pub const AUTH_EXCEPTIONS: &[&str] = &[
        "AuthenticationError",
        "AuthorizationError",
        "PermissionDenied",
        "TokenExpired",
        "InvalidCredentials",
    ];
}
843
844#[cfg(test)]
845mod tests {
846    use super::*;
847
848    #[test]
849    fn test_task_exception_creation() {
850        let exc = TaskException::new("ValueError", "Invalid input");
851        assert_eq!(exc.exc_type, "ValueError");
852        assert_eq!(exc.exc_message, "Invalid input");
853        assert_eq!(exc.category, ExceptionCategory::Unknown);
854    }
855
856    #[test]
857    fn test_task_exception_with_traceback() {
858        let exc = TaskException::new("RuntimeError", "Something went wrong").with_traceback(vec![
859            ("main".to_string(), "app.rs".to_string(), 10),
860            ("process".to_string(), "tasks.rs".to_string(), 25),
861        ]);
862
863        assert_eq!(exc.traceback.len(), 2);
864        assert_eq!(exc.traceback[0].function, "main");
865        assert_eq!(exc.traceback[0].file, "app.rs");
866        assert_eq!(exc.traceback[0].line, 10);
867    }
868
869    #[test]
870    fn test_task_exception_with_cause() {
871        let cause = TaskException::new("IOError", "File not found");
872        let exc = TaskException::new("ProcessingError", "Failed to process file").with_cause(cause);
873
874        assert!(exc.cause.is_some());
875        assert_eq!(exc.cause.as_ref().unwrap().exc_type, "IOError");
876    }
877
878    #[test]
879    fn test_exception_chain() {
880        let root = TaskException::new("RootError", "Root cause");
881        let middle = TaskException::new("MiddleError", "Middle error").with_cause(root);
882        let top = TaskException::new("TopError", "Top level error").with_cause(middle);
883
884        let chain = top.exception_chain();
885        assert_eq!(chain.len(), 3);
886        assert_eq!(chain[0].exc_type, "TopError");
887        assert_eq!(chain[1].exc_type, "MiddleError");
888        assert_eq!(chain[2].exc_type, "RootError");
889    }
890
891    #[test]
892    fn test_exception_category() {
893        let retryable = TaskException::new("TimeoutError", "Request timed out")
894            .with_category(ExceptionCategory::Retryable);
895        let fatal = TaskException::new("ValidationError", "Invalid data")
896            .with_category(ExceptionCategory::Fatal);
897
898        assert!(retryable.is_retryable());
899        assert!(!retryable.is_fatal());
900        assert!(fatal.is_fatal());
901        assert!(!fatal.is_retryable());
902    }
903
904    #[test]
905    fn test_exception_policy_retry_on() {
906        let policy = ExceptionPolicy::new().retry_on(&["TimeoutError", "ConnectionError"]);
907
908        let timeout_exc = TaskException::new("TimeoutError", "Timed out");
909        let validation_exc = TaskException::new("ValidationError", "Invalid");
910
911        assert_eq!(policy.get_action(&timeout_exc), ExceptionAction::Retry);
912        assert_eq!(policy.get_action(&validation_exc), ExceptionAction::Default);
913    }
914
915    #[test]
916    fn test_exception_policy_ignore_on() {
917        let policy = ExceptionPolicy::new().ignore_on(&["NotFoundError"]);
918
919        let not_found = TaskException::new("NotFoundError", "Resource not found");
920        assert_eq!(policy.get_action(&not_found), ExceptionAction::Ignore);
921    }
922
923    #[test]
924    fn test_exception_policy_fail_on() {
925        let policy = ExceptionPolicy::new().fail_on(&["ValidationError"]);
926
927        let validation = TaskException::new("ValidationError", "Invalid input");
928        assert_eq!(policy.get_action(&validation), ExceptionAction::Fail);
929    }
930
931    #[test]
932    fn test_exception_policy_pattern_matching() {
933        let policy = ExceptionPolicy::new()
934            .retry_on(&["*Error"])
935            .fail_on(&["Validation*"]);
936
937        let timeout = TaskException::new("TimeoutError", "Timed out");
938        let validation = TaskException::new("ValidationFailed", "Invalid");
939
940        // fail_on takes precedence due to order in get_action
941        assert_eq!(policy.get_action(&validation), ExceptionAction::Fail);
942        assert_eq!(policy.get_action(&timeout), ExceptionAction::Retry);
943    }
944
945    #[test]
946    fn test_exception_policy_category_fallback() {
947        let policy = ExceptionPolicy::new();
948
949        let retryable =
950            TaskException::new("CustomError", "Error").with_category(ExceptionCategory::Retryable);
951        let fatal =
952            TaskException::new("CustomError", "Error").with_category(ExceptionCategory::Fatal);
953
954        assert_eq!(policy.get_action(&retryable), ExceptionAction::Retry);
955        assert_eq!(policy.get_action(&fatal), ExceptionAction::Fail);
956    }
957
958    #[test]
959    fn test_exception_policy_default_action() {
960        let policy = ExceptionPolicy::new().with_default_action(ExceptionAction::Retry);
961
962        let unknown = TaskException::new("UnknownError", "Unknown");
963        assert_eq!(policy.get_action(&unknown), ExceptionAction::Retry);
964    }
965
966    #[test]
967    fn test_exception_policy_traceback_truncation() {
968        let policy = ExceptionPolicy::new().with_max_traceback_depth(2);
969
970        let exc = TaskException::new("Error", "Error").with_traceback(vec![
971            ("f1".to_string(), "a.rs".to_string(), 1),
972            ("f2".to_string(), "b.rs".to_string(), 2),
973            ("f3".to_string(), "c.rs".to_string(), 3),
974            ("f4".to_string(), "d.rs".to_string(), 4),
975        ]);
976
977        let processed = policy.process_exception(exc);
978        assert_eq!(processed.traceback.len(), 2);
979    }
980
981    #[test]
982    fn test_exception_policy_no_traceback() {
983        let policy = ExceptionPolicy::new().with_traceback(false);
984
985        let exc = TaskException::new("Error", "Error")
986            .with_traceback(vec![("f1".to_string(), "a.rs".to_string(), 1)])
987            .with_traceback_str("Traceback...");
988
989        let processed = policy.process_exception(exc);
990        assert!(processed.traceback.is_empty());
991        assert!(processed.traceback_str.is_none());
992    }
993
994    #[test]
995    fn test_traceback_frame_display() {
996        let frame = TracebackFrame::new("process_data", "tasks.py", 42).with_column(10);
997
998        let display = format!("{frame}");
999        assert!(display.contains("tasks.py"));
1000        assert!(display.contains("42"));
1001        assert!(display.contains("process_data"));
1002        assert!(display.contains("column 10"));
1003    }
1004
1005    #[test]
1006    fn test_task_exception_serialization() {
1007        let exc = TaskException::new("TestError", "Test message")
1008            .with_category(ExceptionCategory::Retryable)
1009            .with_traceback(vec![("test".to_string(), "test.rs".to_string(), 1)]);
1010
1011        let json = exc.to_json().unwrap();
1012        let parsed = TaskException::from_json(&json).unwrap();
1013
1014        assert_eq!(parsed.exc_type, "TestError");
1015        assert_eq!(parsed.exc_message, "Test message");
1016        assert_eq!(parsed.category, ExceptionCategory::Retryable);
1017        assert_eq!(parsed.traceback.len(), 1);
1018    }
1019
1020    #[test]
1021    fn test_task_exception_celery_format() {
1022        let exc = TaskException::new("ValueError", "Invalid value")
1023            .with_traceback_str("Traceback (most recent call last):\n  File \"test.py\"");
1024
1025        let celery = exc.to_celery_format();
1026        assert_eq!(celery["exc_type"], "ValueError");
1027        assert_eq!(celery["exc_message"], "Invalid value");
1028    }
1029
1030    #[test]
1031    fn test_exception_display() {
1032        let exc = TaskException::new("ValueError", "Invalid input");
1033        assert_eq!(format!("{exc}"), "ValueError: Invalid input");
1034    }
1035
1036    #[test]
1037    fn test_exception_handler_chain() {
1038        struct RetryHandler;
1039        impl ExceptionHandler for RetryHandler {
1040            fn handle(&self, exc: &TaskException) -> ExceptionAction {
1041                if exc.exc_type.contains("Timeout") {
1042                    ExceptionAction::Retry
1043                } else {
1044                    ExceptionAction::Default
1045                }
1046            }
1047            fn name(&self) -> &'static str {
1048                "RetryHandler"
1049            }
1050        }
1051
1052        struct FailHandler;
1053        impl ExceptionHandler for FailHandler {
1054            fn handle(&self, exc: &TaskException) -> ExceptionAction {
1055                if exc.exc_type.contains("Fatal") {
1056                    ExceptionAction::Fail
1057                } else {
1058                    ExceptionAction::Default
1059                }
1060            }
1061            fn name(&self) -> &'static str {
1062                "FailHandler"
1063            }
1064        }
1065
1066        let chain = ExceptionHandlerChain::new()
1067            .add_handler(RetryHandler)
1068            .add_handler(FailHandler);
1069
1070        let timeout = TaskException::new("TimeoutError", "Timed out");
1071        let fatal = TaskException::new("FatalError", "Fatal");
1072        let unknown = TaskException::new("UnknownError", "Unknown");
1073
1074        assert_eq!(chain.handle(&timeout), ExceptionAction::Retry);
1075        assert_eq!(chain.handle(&fatal), ExceptionAction::Fail);
1076        assert_eq!(chain.handle(&unknown), ExceptionAction::Default);
1077    }
1078
1079    #[test]
1080    fn test_policy_exception_handler() {
1081        let policy = ExceptionPolicy::new()
1082            .retry_on(&["TimeoutError"])
1083            .fail_on(&["ValidationError"]);
1084
1085        let handler = PolicyExceptionHandler::new(policy);
1086
1087        let timeout = TaskException::new("TimeoutError", "Timed out");
1088        let validation = TaskException::new("ValidationError", "Invalid");
1089
1090        assert_eq!(handler.handle(&timeout), ExceptionAction::Retry);
1091        assert_eq!(handler.handle(&validation), ExceptionAction::Fail);
1092    }
1093}