1use serde::{Deserialize, Serialize};
33use std::collections::HashMap;
34use std::fmt;
35
36#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Hash, Serialize, Deserialize)]
38pub enum ExceptionCategory {
39 Retryable,
41 Fatal,
43 Ignorable,
45 RequiresIntervention,
47 #[default]
49 Unknown,
50}
51
52impl fmt::Display for ExceptionCategory {
53 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
54 match self {
55 Self::Retryable => write!(f, "RETRYABLE"),
56 Self::Fatal => write!(f, "FATAL"),
57 Self::Ignorable => write!(f, "IGNORABLE"),
58 Self::RequiresIntervention => write!(f, "REQUIRES_INTERVENTION"),
59 Self::Unknown => write!(f, "UNKNOWN"),
60 }
61 }
62}
63
64#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
66pub enum ExceptionAction {
67 Retry,
69 Fail,
71 Ignore,
73 Reject,
75 #[default]
77 Default,
78}
79
80impl fmt::Display for ExceptionAction {
81 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
82 match self {
83 Self::Retry => write!(f, "RETRY"),
84 Self::Fail => write!(f, "FAIL"),
85 Self::Ignore => write!(f, "IGNORE"),
86 Self::Reject => write!(f, "REJECT"),
87 Self::Default => write!(f, "DEFAULT"),
88 }
89 }
90}
91
92#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
94pub struct TracebackFrame {
95 pub function: String,
97 pub file: String,
99 pub line: u32,
101 pub column: Option<u32>,
103 #[serde(default, skip_serializing_if = "HashMap::is_empty")]
105 pub locals: HashMap<String, String>,
106}
107
108impl TracebackFrame {
109 pub fn new(function: impl Into<String>, file: impl Into<String>, line: u32) -> Self {
111 Self {
112 function: function.into(),
113 file: file.into(),
114 line,
115 column: None,
116 locals: HashMap::new(),
117 }
118 }
119
120 #[must_use]
122 pub fn with_column(mut self, column: u32) -> Self {
123 self.column = Some(column);
124 self
125 }
126
127 #[must_use]
129 pub fn with_local(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
130 self.locals.insert(name.into(), value.into());
131 self
132 }
133}
134
135impl fmt::Display for TracebackFrame {
136 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
137 write!(f, " File \"{}\", line {}", self.file, self.line)?;
138 if let Some(col) = self.column {
139 write!(f, ", column {col}")?;
140 }
141 write!(f, ", in {}", self.function)?;
142 Ok(())
143 }
144}
145
146#[derive(Debug, Clone, Serialize, Deserialize)]
148pub struct TaskException {
149 pub exc_type: String,
151 pub exc_message: String,
153 #[serde(default)]
155 pub traceback: Vec<TracebackFrame>,
156 #[serde(default)]
158 pub traceback_str: Option<String>,
159 #[serde(default)]
161 pub category: ExceptionCategory,
162 #[serde(default)]
164 pub cause: Option<Box<TaskException>>,
165 #[serde(default)]
167 pub context: Option<Box<TaskException>>,
168 #[serde(default, skip_serializing_if = "HashMap::is_empty")]
170 pub metadata: HashMap<String, serde_json::Value>,
171 #[serde(default)]
173 pub timestamp: Option<f64>,
174 #[serde(default)]
176 pub hostname: Option<String>,
177 #[serde(default)]
179 pub task_id: Option<String>,
180}
181
182impl TaskException {
183 pub fn new(exc_type: impl Into<String>, exc_message: impl Into<String>) -> Self {
185 Self {
186 exc_type: exc_type.into(),
187 exc_message: exc_message.into(),
188 traceback: Vec::new(),
189 traceback_str: None,
190 category: ExceptionCategory::Unknown,
191 cause: None,
192 context: None,
193 metadata: HashMap::new(),
194 timestamp: None,
195 hostname: None,
196 task_id: None,
197 }
198 }
199
200 pub fn from_error<E: std::error::Error>(error: &E) -> Self {
202 let exc_type = std::any::type_name::<E>()
203 .rsplit("::")
204 .next()
205 .unwrap_or("Error")
206 .to_string();
207
208 let mut exception = Self::new(exc_type, error.to_string());
209
210 if let Some(source) = error.source() {
212 exception.cause = Some(Box::new(Self::from_error_dyn(source)));
213 }
214
215 exception
216 }
217
218 fn from_error_dyn(error: &dyn std::error::Error) -> Self {
220 let exc_type = "Error".to_string();
221 let mut exception = Self::new(exc_type, error.to_string());
222
223 if let Some(source) = error.source() {
224 exception.cause = Some(Box::new(Self::from_error_dyn(source)));
225 }
226
227 exception
228 }
229
230 #[must_use]
232 pub fn with_traceback(mut self, frames: Vec<(String, String, u32)>) -> Self {
233 self.traceback = frames
234 .into_iter()
235 .map(|(func, file, line)| TracebackFrame::new(func, file, line))
236 .collect();
237 self
238 }
239
240 #[must_use]
242 pub fn with_traceback_frames(mut self, frames: Vec<TracebackFrame>) -> Self {
243 self.traceback = frames;
244 self
245 }
246
247 #[must_use]
249 pub fn with_traceback_str(mut self, traceback: impl Into<String>) -> Self {
250 self.traceback_str = Some(traceback.into());
251 self
252 }
253
254 #[must_use]
256 pub fn with_category(mut self, category: ExceptionCategory) -> Self {
257 self.category = category;
258 self
259 }
260
261 #[must_use]
263 pub fn with_cause(mut self, cause: TaskException) -> Self {
264 self.cause = Some(Box::new(cause));
265 self
266 }
267
268 #[must_use]
270 pub fn with_context(mut self, context: TaskException) -> Self {
271 self.context = Some(Box::new(context));
272 self
273 }
274
275 #[must_use]
277 pub fn with_metadata(mut self, key: impl Into<String>, value: serde_json::Value) -> Self {
278 self.metadata.insert(key.into(), value);
279 self
280 }
281
282 #[must_use]
284 pub fn with_timestamp(mut self, timestamp: f64) -> Self {
285 self.timestamp = Some(timestamp);
286 self
287 }
288
289 #[must_use]
291 pub fn with_timestamp_now(mut self) -> Self {
292 use std::time::{SystemTime, UNIX_EPOCH};
293 self.timestamp = Some(
294 SystemTime::now()
295 .duration_since(UNIX_EPOCH)
296 .unwrap_or_default()
297 .as_secs_f64(),
298 );
299 self
300 }
301
302 #[must_use]
304 pub fn with_hostname(mut self, hostname: impl Into<String>) -> Self {
305 self.hostname = Some(hostname.into());
306 self
307 }
308
309 #[must_use]
311 pub fn with_task_id(mut self, task_id: impl Into<String>) -> Self {
312 self.task_id = Some(task_id.into());
313 self
314 }
315
316 #[inline]
318 #[must_use]
319 pub const fn is_retryable(&self) -> bool {
320 self.category as u8 == ExceptionCategory::Retryable as u8
321 }
322
323 #[inline]
325 #[must_use]
326 pub const fn is_fatal(&self) -> bool {
327 self.category as u8 == ExceptionCategory::Fatal as u8
328 }
329
330 #[inline]
332 #[must_use]
333 pub const fn is_ignorable(&self) -> bool {
334 self.category as u8 == ExceptionCategory::Ignorable as u8
335 }
336
337 #[must_use]
339 pub fn exception_chain(&self) -> Vec<&TaskException> {
340 let mut chain = vec![self];
341 let mut current = self;
342
343 while let Some(cause) = ¤t.cause {
344 chain.push(cause);
345 current = cause;
346 }
347
348 chain
349 }
350
351 #[must_use]
353 pub fn format_traceback(&self) -> String {
354 use std::fmt::Write;
355
356 if let Some(ref tb_str) = self.traceback_str {
357 return tb_str.clone();
358 }
359
360 if self.traceback.is_empty() {
361 return String::new();
362 }
363
364 let mut result = String::from("Traceback (most recent call last):\n");
365 for frame in &self.traceback {
366 let _ = writeln!(result, "{frame}");
367 }
368 let _ = write!(result, "{}: {}", self.exc_type, self.exc_message);
369 result
370 }
371
372 pub fn to_json(&self) -> Result<String, serde_json::Error> {
378 serde_json::to_string(self)
379 }
380
381 pub fn from_json(json: &str) -> Result<Self, serde_json::Error> {
387 serde_json::from_str(json)
388 }
389
390 #[must_use]
392 pub fn to_celery_format(&self) -> serde_json::Value {
393 serde_json::json!({
394 "exc_type": self.exc_type,
395 "exc_message": self.exc_message,
396 "exc_module": self.metadata.get("module").cloned().unwrap_or(serde_json::Value::Null),
397 "traceback": self.traceback_str.clone().unwrap_or_else(|| self.format_traceback()),
398 })
399 }
400}
401
402impl fmt::Display for TaskException {
403 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
404 write!(f, "{}: {}", self.exc_type, self.exc_message)
405 }
406}
407
408impl std::error::Error for TaskException {
409 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
410 self.cause
411 .as_ref()
412 .map(|e| e.as_ref() as &dyn std::error::Error)
413 }
414}
415
416#[derive(Debug, Clone, Serialize, Deserialize)]
418pub struct ExceptionPolicy {
419 #[serde(default)]
421 pub retry_on: Vec<String>,
422 #[serde(default)]
424 pub ignore_on: Vec<String>,
425 #[serde(default)]
427 pub fail_on: Vec<String>,
428 #[serde(default)]
430 pub reject_on: Vec<String>,
431 #[serde(default)]
433 pub default_action: ExceptionAction,
434 #[serde(default = "default_true")]
436 pub preserve_traceback: bool,
437 #[serde(default)]
439 pub max_traceback_depth: Option<usize>,
440 #[serde(default, skip_serializing_if = "HashMap::is_empty")]
442 pub include_metadata: HashMap<String, bool>,
443}
444
445fn default_true() -> bool {
446 true
447}
448
449impl Default for ExceptionPolicy {
450 fn default() -> Self {
451 Self {
452 retry_on: Vec::new(),
453 ignore_on: Vec::new(),
454 fail_on: Vec::new(),
455 reject_on: Vec::new(),
456 default_action: ExceptionAction::default(),
457 preserve_traceback: true, max_traceback_depth: None,
459 include_metadata: HashMap::new(),
460 }
461 }
462}
463
464impl ExceptionPolicy {
465 #[must_use]
467 pub fn new() -> Self {
468 Self::default()
469 }
470
471 #[must_use]
473 pub fn retry_on(mut self, types: &[&str]) -> Self {
474 self.retry_on = types.iter().map(std::string::ToString::to_string).collect();
475 self
476 }
477
478 #[must_use]
480 pub fn add_retry_on(mut self, exc_type: impl Into<String>) -> Self {
481 self.retry_on.push(exc_type.into());
482 self
483 }
484
485 #[must_use]
487 pub fn ignore_on(mut self, types: &[&str]) -> Self {
488 self.ignore_on = types.iter().map(std::string::ToString::to_string).collect();
489 self
490 }
491
492 #[must_use]
494 pub fn add_ignore_on(mut self, exc_type: impl Into<String>) -> Self {
495 self.ignore_on.push(exc_type.into());
496 self
497 }
498
499 #[must_use]
501 pub fn fail_on(mut self, types: &[&str]) -> Self {
502 self.fail_on = types.iter().map(std::string::ToString::to_string).collect();
503 self
504 }
505
506 #[must_use]
508 pub fn add_fail_on(mut self, exc_type: impl Into<String>) -> Self {
509 self.fail_on.push(exc_type.into());
510 self
511 }
512
513 #[must_use]
515 pub fn reject_on(mut self, types: &[&str]) -> Self {
516 self.reject_on = types.iter().map(std::string::ToString::to_string).collect();
517 self
518 }
519
520 #[must_use]
522 pub fn add_reject_on(mut self, exc_type: impl Into<String>) -> Self {
523 self.reject_on.push(exc_type.into());
524 self
525 }
526
527 #[must_use]
529 pub fn with_default_action(mut self, action: ExceptionAction) -> Self {
530 self.default_action = action;
531 self
532 }
533
534 #[must_use]
536 pub fn with_traceback(mut self, preserve: bool) -> Self {
537 self.preserve_traceback = preserve;
538 self
539 }
540
541 #[must_use]
543 pub fn with_max_traceback_depth(mut self, depth: usize) -> Self {
544 self.max_traceback_depth = Some(depth);
545 self
546 }
547
548 #[must_use]
550 pub fn get_action(&self, exception: &TaskException) -> ExceptionAction {
551 let exc_type = &exception.exc_type;
552
553 if self.matches_pattern(exc_type, &self.ignore_on) {
555 return ExceptionAction::Ignore;
556 }
557
558 if self.matches_pattern(exc_type, &self.reject_on) {
560 return ExceptionAction::Reject;
561 }
562
563 if self.matches_pattern(exc_type, &self.fail_on) {
565 return ExceptionAction::Fail;
566 }
567
568 if self.matches_pattern(exc_type, &self.retry_on) {
570 return ExceptionAction::Retry;
571 }
572 match exception.category {
574 ExceptionCategory::Retryable => ExceptionAction::Retry,
575 ExceptionCategory::Fatal | ExceptionCategory::RequiresIntervention => {
576 ExceptionAction::Fail
577 }
578 ExceptionCategory::Ignorable => ExceptionAction::Ignore,
579 ExceptionCategory::Unknown => self.default_action,
580 }
581 }
582
583 #[allow(clippy::unused_self)]
585 fn matches_pattern(&self, exc_type: &str, patterns: &[String]) -> bool {
586 for pattern in patterns {
587 if pattern == exc_type {
588 return true;
589 }
590 if let Some(rest) = pattern.strip_prefix('*') {
592 if let Some(middle) = rest.strip_suffix('*') {
593 if !middle.is_empty() && exc_type.contains(middle) {
594 return true;
595 }
596 } else {
597 if exc_type.ends_with(rest) {
599 return true;
600 }
601 }
602 }
603 if let Some(prefix) = pattern.strip_suffix('*') {
605 if exc_type.starts_with(prefix) {
606 return true;
607 }
608 }
609 }
610 false
611 }
612
613 #[must_use]
615 pub fn process_exception(&self, mut exception: TaskException) -> TaskException {
616 if let Some(max_depth) = self.max_traceback_depth {
618 if exception.traceback.len() > max_depth {
619 exception.traceback.truncate(max_depth);
620 }
621 }
622
623 if !self.preserve_traceback {
625 exception.traceback.clear();
626 exception.traceback_str = None;
627 }
628
629 exception
630 }
631}
632
633pub trait ExceptionHandler: Send + Sync {
635 fn handle(&self, exception: &TaskException) -> ExceptionAction;
637
638 fn transform(&self, exception: TaskException) -> TaskException {
640 exception
641 }
642
643 fn on_exception(&self, _exception: &TaskException) {}
645
646 fn name(&self) -> &'static str {
648 "ExceptionHandler"
649 }
650}
651
652#[derive(Default)]
654pub struct ExceptionHandlerChain {
655 handlers: Vec<Box<dyn ExceptionHandler>>,
656}
657
658impl ExceptionHandlerChain {
659 #[must_use]
661 pub fn new() -> Self {
662 Self::default()
663 }
664
665 #[must_use]
667 pub fn add_handler<H: ExceptionHandler + 'static>(mut self, handler: H) -> Self {
668 self.handlers.push(Box::new(handler));
669 self
670 }
671
672 #[must_use]
676 pub fn handle(&self, exception: &TaskException) -> ExceptionAction {
677 for handler in &self.handlers {
678 handler.on_exception(exception);
679 let action = handler.handle(exception);
680 if action != ExceptionAction::Default {
681 return action;
682 }
683 }
684 ExceptionAction::Default
685 }
686
687 #[must_use]
689 pub fn transform(&self, mut exception: TaskException) -> TaskException {
690 for handler in &self.handlers {
691 exception = handler.transform(exception);
692 }
693 exception
694 }
695}
696
697pub struct LoggingExceptionHandler {
699 pub log_level: tracing::Level,
701}
702
703impl Default for LoggingExceptionHandler {
704 fn default() -> Self {
705 Self {
706 log_level: tracing::Level::ERROR,
707 }
708 }
709}
710
711impl LoggingExceptionHandler {
712 #[must_use]
714 pub fn new() -> Self {
715 Self::default()
716 }
717
718 #[must_use]
720 pub fn with_level(mut self, level: tracing::Level) -> Self {
721 self.log_level = level;
722 self
723 }
724}
725
726impl ExceptionHandler for LoggingExceptionHandler {
727 fn handle(&self, _exception: &TaskException) -> ExceptionAction {
728 ExceptionAction::Default
729 }
730
731 fn on_exception(&self, exception: &TaskException) {
732 match self.log_level {
733 tracing::Level::ERROR => {
734 tracing::error!(
735 exc_type = %exception.exc_type,
736 exc_message = %exception.exc_message,
737 task_id = ?exception.task_id,
738 "Task exception occurred"
739 );
740 }
741 tracing::Level::WARN => {
742 tracing::warn!(
743 exc_type = %exception.exc_type,
744 exc_message = %exception.exc_message,
745 task_id = ?exception.task_id,
746 "Task exception occurred"
747 );
748 }
749 _ => {
750 tracing::info!(
751 exc_type = %exception.exc_type,
752 exc_message = %exception.exc_message,
753 task_id = ?exception.task_id,
754 "Task exception occurred"
755 );
756 }
757 }
758 }
759
760 fn name(&self) -> &'static str {
761 "LoggingExceptionHandler"
762 }
763}
764
765pub struct PolicyExceptionHandler {
767 policy: ExceptionPolicy,
768}
769
770impl PolicyExceptionHandler {
771 #[must_use]
773 pub fn new(policy: ExceptionPolicy) -> Self {
774 Self { policy }
775 }
776}
777
778impl ExceptionHandler for PolicyExceptionHandler {
779 fn handle(&self, exception: &TaskException) -> ExceptionAction {
780 self.policy.get_action(exception)
781 }
782
783 fn transform(&self, exception: TaskException) -> TaskException {
784 self.policy.process_exception(exception)
785 }
786
787 fn name(&self) -> &'static str {
788 "PolicyExceptionHandler"
789 }
790}
791
792pub mod exception_types {
794 pub const NETWORK_EXCEPTIONS: &[&str] = &[
796 "ConnectionError",
797 "TimeoutError",
798 "ConnectionRefused",
799 "ConnectionReset",
800 "BrokenPipe",
801 "NetworkError",
802 "SocketError",
803 "DNSError",
804 ];
805
806 pub const DATABASE_EXCEPTIONS: &[&str] = &[
808 "DatabaseError",
809 "OperationalError",
810 "InterfaceError",
811 "ConnectionPoolError",
812 "DeadlockError",
813 "LockTimeout",
814 ];
815
816 pub const VALIDATION_EXCEPTIONS: &[&str] = &[
818 "ValidationError",
819 "ValueError",
820 "TypeError",
821 "InvalidArgument",
822 "SchemaError",
823 ];
824
825 pub const RESOURCE_EXCEPTIONS: &[&str] = &[
827 "ResourceExhausted",
828 "QuotaExceeded",
829 "RateLimitExceeded",
830 "OutOfMemory",
831 "DiskFull",
832 ];
833
834 pub const AUTH_EXCEPTIONS: &[&str] = &[
836 "AuthenticationError",
837 "AuthorizationError",
838 "PermissionDenied",
839 "TokenExpired",
840 "InvalidCredentials",
841 ];
842}
843
844#[cfg(test)]
845mod tests {
846 use super::*;
847
848 #[test]
849 fn test_task_exception_creation() {
850 let exc = TaskException::new("ValueError", "Invalid input");
851 assert_eq!(exc.exc_type, "ValueError");
852 assert_eq!(exc.exc_message, "Invalid input");
853 assert_eq!(exc.category, ExceptionCategory::Unknown);
854 }
855
856 #[test]
857 fn test_task_exception_with_traceback() {
858 let exc = TaskException::new("RuntimeError", "Something went wrong").with_traceback(vec![
859 ("main".to_string(), "app.rs".to_string(), 10),
860 ("process".to_string(), "tasks.rs".to_string(), 25),
861 ]);
862
863 assert_eq!(exc.traceback.len(), 2);
864 assert_eq!(exc.traceback[0].function, "main");
865 assert_eq!(exc.traceback[0].file, "app.rs");
866 assert_eq!(exc.traceback[0].line, 10);
867 }
868
869 #[test]
870 fn test_task_exception_with_cause() {
871 let cause = TaskException::new("IOError", "File not found");
872 let exc = TaskException::new("ProcessingError", "Failed to process file").with_cause(cause);
873
874 assert!(exc.cause.is_some());
875 assert_eq!(exc.cause.as_ref().unwrap().exc_type, "IOError");
876 }
877
878 #[test]
879 fn test_exception_chain() {
880 let root = TaskException::new("RootError", "Root cause");
881 let middle = TaskException::new("MiddleError", "Middle error").with_cause(root);
882 let top = TaskException::new("TopError", "Top level error").with_cause(middle);
883
884 let chain = top.exception_chain();
885 assert_eq!(chain.len(), 3);
886 assert_eq!(chain[0].exc_type, "TopError");
887 assert_eq!(chain[1].exc_type, "MiddleError");
888 assert_eq!(chain[2].exc_type, "RootError");
889 }
890
891 #[test]
892 fn test_exception_category() {
893 let retryable = TaskException::new("TimeoutError", "Request timed out")
894 .with_category(ExceptionCategory::Retryable);
895 let fatal = TaskException::new("ValidationError", "Invalid data")
896 .with_category(ExceptionCategory::Fatal);
897
898 assert!(retryable.is_retryable());
899 assert!(!retryable.is_fatal());
900 assert!(fatal.is_fatal());
901 assert!(!fatal.is_retryable());
902 }
903
904 #[test]
905 fn test_exception_policy_retry_on() {
906 let policy = ExceptionPolicy::new().retry_on(&["TimeoutError", "ConnectionError"]);
907
908 let timeout_exc = TaskException::new("TimeoutError", "Timed out");
909 let validation_exc = TaskException::new("ValidationError", "Invalid");
910
911 assert_eq!(policy.get_action(&timeout_exc), ExceptionAction::Retry);
912 assert_eq!(policy.get_action(&validation_exc), ExceptionAction::Default);
913 }
914
915 #[test]
916 fn test_exception_policy_ignore_on() {
917 let policy = ExceptionPolicy::new().ignore_on(&["NotFoundError"]);
918
919 let not_found = TaskException::new("NotFoundError", "Resource not found");
920 assert_eq!(policy.get_action(¬_found), ExceptionAction::Ignore);
921 }
922
923 #[test]
924 fn test_exception_policy_fail_on() {
925 let policy = ExceptionPolicy::new().fail_on(&["ValidationError"]);
926
927 let validation = TaskException::new("ValidationError", "Invalid input");
928 assert_eq!(policy.get_action(&validation), ExceptionAction::Fail);
929 }
930
931 #[test]
932 fn test_exception_policy_pattern_matching() {
933 let policy = ExceptionPolicy::new()
934 .retry_on(&["*Error"])
935 .fail_on(&["Validation*"]);
936
937 let timeout = TaskException::new("TimeoutError", "Timed out");
938 let validation = TaskException::new("ValidationFailed", "Invalid");
939
940 assert_eq!(policy.get_action(&validation), ExceptionAction::Fail);
942 assert_eq!(policy.get_action(&timeout), ExceptionAction::Retry);
943 }
944
945 #[test]
946 fn test_exception_policy_category_fallback() {
947 let policy = ExceptionPolicy::new();
948
949 let retryable =
950 TaskException::new("CustomError", "Error").with_category(ExceptionCategory::Retryable);
951 let fatal =
952 TaskException::new("CustomError", "Error").with_category(ExceptionCategory::Fatal);
953
954 assert_eq!(policy.get_action(&retryable), ExceptionAction::Retry);
955 assert_eq!(policy.get_action(&fatal), ExceptionAction::Fail);
956 }
957
958 #[test]
959 fn test_exception_policy_default_action() {
960 let policy = ExceptionPolicy::new().with_default_action(ExceptionAction::Retry);
961
962 let unknown = TaskException::new("UnknownError", "Unknown");
963 assert_eq!(policy.get_action(&unknown), ExceptionAction::Retry);
964 }
965
966 #[test]
967 fn test_exception_policy_traceback_truncation() {
968 let policy = ExceptionPolicy::new().with_max_traceback_depth(2);
969
970 let exc = TaskException::new("Error", "Error").with_traceback(vec![
971 ("f1".to_string(), "a.rs".to_string(), 1),
972 ("f2".to_string(), "b.rs".to_string(), 2),
973 ("f3".to_string(), "c.rs".to_string(), 3),
974 ("f4".to_string(), "d.rs".to_string(), 4),
975 ]);
976
977 let processed = policy.process_exception(exc);
978 assert_eq!(processed.traceback.len(), 2);
979 }
980
981 #[test]
982 fn test_exception_policy_no_traceback() {
983 let policy = ExceptionPolicy::new().with_traceback(false);
984
985 let exc = TaskException::new("Error", "Error")
986 .with_traceback(vec![("f1".to_string(), "a.rs".to_string(), 1)])
987 .with_traceback_str("Traceback...");
988
989 let processed = policy.process_exception(exc);
990 assert!(processed.traceback.is_empty());
991 assert!(processed.traceback_str.is_none());
992 }
993
994 #[test]
995 fn test_traceback_frame_display() {
996 let frame = TracebackFrame::new("process_data", "tasks.py", 42).with_column(10);
997
998 let display = format!("{frame}");
999 assert!(display.contains("tasks.py"));
1000 assert!(display.contains("42"));
1001 assert!(display.contains("process_data"));
1002 assert!(display.contains("column 10"));
1003 }
1004
1005 #[test]
1006 fn test_task_exception_serialization() {
1007 let exc = TaskException::new("TestError", "Test message")
1008 .with_category(ExceptionCategory::Retryable)
1009 .with_traceback(vec![("test".to_string(), "test.rs".to_string(), 1)]);
1010
1011 let json = exc.to_json().unwrap();
1012 let parsed = TaskException::from_json(&json).unwrap();
1013
1014 assert_eq!(parsed.exc_type, "TestError");
1015 assert_eq!(parsed.exc_message, "Test message");
1016 assert_eq!(parsed.category, ExceptionCategory::Retryable);
1017 assert_eq!(parsed.traceback.len(), 1);
1018 }
1019
1020 #[test]
1021 fn test_task_exception_celery_format() {
1022 let exc = TaskException::new("ValueError", "Invalid value")
1023 .with_traceback_str("Traceback (most recent call last):\n File \"test.py\"");
1024
1025 let celery = exc.to_celery_format();
1026 assert_eq!(celery["exc_type"], "ValueError");
1027 assert_eq!(celery["exc_message"], "Invalid value");
1028 }
1029
1030 #[test]
1031 fn test_exception_display() {
1032 let exc = TaskException::new("ValueError", "Invalid input");
1033 assert_eq!(format!("{exc}"), "ValueError: Invalid input");
1034 }
1035
1036 #[test]
1037 fn test_exception_handler_chain() {
1038 struct RetryHandler;
1039 impl ExceptionHandler for RetryHandler {
1040 fn handle(&self, exc: &TaskException) -> ExceptionAction {
1041 if exc.exc_type.contains("Timeout") {
1042 ExceptionAction::Retry
1043 } else {
1044 ExceptionAction::Default
1045 }
1046 }
1047 fn name(&self) -> &'static str {
1048 "RetryHandler"
1049 }
1050 }
1051
1052 struct FailHandler;
1053 impl ExceptionHandler for FailHandler {
1054 fn handle(&self, exc: &TaskException) -> ExceptionAction {
1055 if exc.exc_type.contains("Fatal") {
1056 ExceptionAction::Fail
1057 } else {
1058 ExceptionAction::Default
1059 }
1060 }
1061 fn name(&self) -> &'static str {
1062 "FailHandler"
1063 }
1064 }
1065
1066 let chain = ExceptionHandlerChain::new()
1067 .add_handler(RetryHandler)
1068 .add_handler(FailHandler);
1069
1070 let timeout = TaskException::new("TimeoutError", "Timed out");
1071 let fatal = TaskException::new("FatalError", "Fatal");
1072 let unknown = TaskException::new("UnknownError", "Unknown");
1073
1074 assert_eq!(chain.handle(&timeout), ExceptionAction::Retry);
1075 assert_eq!(chain.handle(&fatal), ExceptionAction::Fail);
1076 assert_eq!(chain.handle(&unknown), ExceptionAction::Default);
1077 }
1078
1079 #[test]
1080 fn test_policy_exception_handler() {
1081 let policy = ExceptionPolicy::new()
1082 .retry_on(&["TimeoutError"])
1083 .fail_on(&["ValidationError"]);
1084
1085 let handler = PolicyExceptionHandler::new(policy);
1086
1087 let timeout = TaskException::new("TimeoutError", "Timed out");
1088 let validation = TaskException::new("ValidationError", "Invalid");
1089
1090 assert_eq!(handler.handle(&timeout), ExceptionAction::Retry);
1091 assert_eq!(handler.handle(&validation), ExceptionAction::Fail);
1092 }
1093}