Skip to main content

pipedream_rs/
error.rs

1use std::any::Any;
2use std::sync::Arc;
3
/// Error raised by a downstream handler in the stream.
///
/// The underlying error is stored behind an [`Arc`], so cloning a
/// `RelayError` is cheap and every clone refers to the same error value.
/// That lets one error flow both:
/// - Through the stream (subscribable via `subscribe::<RelayError>()`)
/// - Back to senders (via `send().await` returning `Err(SendError::Downstream(...))`)
#[derive(Debug, Clone)]
pub struct RelayError {
    /// The message ID that caused this error (0 if unknown).
    pub msg_id: u64,
    /// The underlying error, shared between all clones of this value.
    pub error: Arc<dyn std::error::Error + Send + Sync>,
    /// Coarse provenance indicator for the error source.
    ///
    /// This should be a compile-time constant string like "sink", "tap",
    /// "within", "subscription", or "forwarder". Not intended for user input.
    ///
    /// Note: this *field* is unrelated to the [`std::error::Error::source`]
    /// *method*, which returns the underlying `error` instead.
    pub source: &'static str,
}

impl RelayError {
    /// Create a new `RelayError`.
    ///
    /// * `msg_id` - ID of the message being handled when the error occurred
    ///   (0 if unknown).
    /// * `error` - the underlying error; it is moved into a fresh [`Arc`].
    /// * `source` - constant label naming the component that failed.
    #[must_use]
    pub fn new<E: std::error::Error + Send + Sync + 'static>(
        msg_id: u64,
        error: E,
        source: &'static str,
    ) -> Self {
        Self {
            msg_id,
            error: Arc::new(error),
            source,
        }
    }
}

impl std::fmt::Display for RelayError {
    /// Formats as `[<source>] msg <msg_id>: <underlying error>`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "[{}] msg {}: {}", self.source, self.msg_id, self.error)
    }
}

impl std::error::Error for RelayError {
    /// Returns the wrapped underlying error; this is always `Some`.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        // Coerces `&(dyn Error + Send + Sync)` to the plain `&dyn Error`
        // that the trait signature requires.
        Some(self.error.as_ref())
    }
}
48
/// Wrapper for panic payloads, converting them to an Error type.
///
/// Panic payloads are intentionally lossy - we extract the message if it's
/// a string type, otherwise provide a generic indicator. This is the right
/// behavior for async boundaries and cross-task error propagation.
#[derive(Debug, Clone)]
pub struct PanicError {
    /// Text recovered from the panic payload, or a generic placeholder when
    /// the payload was neither a `String` nor a `&'static str`.
    message: String,
}

impl PanicError {
    /// Create a `PanicError` from a panic payload.
    ///
    /// `payload` is the boxed value a panic carried (the type that
    /// `std::panic::catch_unwind` yields in its `Err` variant). The message
    /// is chosen as follows:
    /// - `String` payload: the string itself, moved out of the box without
    ///   copying;
    /// - `&'static str` payload: an owned copy of the text;
    /// - anything else: the fixed text `panic occurred (non-string payload)`.
    #[must_use]
    pub fn new(payload: Box<dyn Any + Send>) -> Self {
        // We own the box, so a `String` payload can be taken by value rather
        // than cloned. On a type mismatch `downcast` hands the box back
        // unchanged, so the remaining cases can still inspect it.
        let message = match payload.downcast::<String>() {
            Ok(owned) => *owned,
            Err(other) => match other.downcast_ref::<&'static str>() {
                Some(text) => (*text).to_string(),
                None => "panic occurred (non-string payload)".to_string(),
            },
        };
        Self { message }
    }
}

impl std::fmt::Display for PanicError {
    /// Formats as `panic: <message>`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "panic: {}", self.message)
    }
}

// No underlying cause is retained, so the default `source()` (`None`) applies.
impl std::error::Error for PanicError {}