Skip to main content

aion_worker/
error.rs

1//! `WorkerError` taxonomy.
2
3/// Errors produced by worker configuration, protocol, transport, and payload boundaries.
4#[derive(thiserror::Error, Debug)]
5pub enum WorkerError {
6    /// The worker could not connect to the configured endpoint.
7    #[error("failed to connect to worker endpoint: {source}")]
8    Connect {
9        /// Transport connection failure reported by tonic.
10        source: tonic::transport::Error,
11    },
12
13    /// The worker could not perform the initial protocol handshake.
14    #[error("worker handshake failed: {source}")]
15    Handshake {
16        /// Handshake failure reported by the underlying transport.
17        source: tonic::Status,
18    },
19
20    /// The worker could not register activity types.
21    #[error("worker registration failed: {source}")]
22    Registration {
23        /// Registration failure source.
24        source: Box<dyn std::error::Error + Send + Sync + 'static>,
25    },
26
27    /// A payload or wire value could not be decoded.
28    #[error("failed to decode worker payload: {source}")]
29    Decode {
30        /// Decode failure source.
31        source: Box<dyn std::error::Error + Send + Sync + 'static>,
32    },
33
34    /// A payload or wire value could not be encoded.
35    #[error("failed to encode worker payload: {source}")]
36    Encode {
37        /// Encode failure source.
38        source: Box<dyn std::error::Error + Send + Sync + 'static>,
39    },
40
41    /// An established worker transport failed.
42    #[error("worker transport failed: {source}")]
43    Transport {
44        /// Transport failure reported by tonic.
45        source: tonic::Status,
46    },
47
48    /// The server kept closing the worker stream cleanly until the cumulative
49    /// session-drop budget ran out without any session proving healthy.
50    ///
51    /// A single clean close is a retryable drop (the worker redials through
52    /// the budgeted backoff cycle); this error surfaces only when a
53    /// persistent clean-close loop exhausts `reconnect.max_attempts`.
54    #[error(
55        "worker session drop budget exhausted: the server repeatedly closed the stream cleanly"
56    )]
57    CleanCloseExhausted,
58}
59
60impl WorkerError {
61    /// Creates a registration error from any source error.
62    pub fn registration(source: impl std::error::Error + Send + Sync + 'static) -> Self {
63        Self::Registration {
64            source: Box::new(source),
65        }
66    }
67
68    /// Returns the underlying gRPC status carried by this error, if any.
69    ///
70    /// Handshake and transport failures carry a [`tonic::Status`] directly;
71    /// registration failures preserve the status as their boxed source when the
72    /// server rejected the registration over the wire.
73    #[must_use]
74    pub fn grpc_status(&self) -> Option<&tonic::Status> {
75        match self {
76            Self::Handshake { source } | Self::Transport { source } => Some(source),
77            Self::Registration { source } => source.downcast_ref::<tonic::Status>(),
78            Self::Connect { .. }
79            | Self::Decode { .. }
80            | Self::Encode { .. }
81            | Self::CleanCloseExhausted => None,
82        }
83    }
84
85    /// Returns whether retrying connection or registration can ever succeed.
86    ///
87    /// `PermissionDenied` and `Unauthenticated` are deterministic server
88    /// denials (ungranted namespace, rejected credentials): retrying them only
89    /// burns the reconnect budget and delays the surfaced error. Every other
90    /// failure (transport unavailability, decode faults, local validation) is
91    /// treated as transient for the bounded backoff loop.
92    #[must_use]
93    pub fn is_retryable(&self) -> bool {
94        !matches!(
95            self.grpc_status().map(tonic::Status::code),
96            Some(tonic::Code::PermissionDenied | tonic::Code::Unauthenticated)
97        )
98    }
99
100    /// Creates a decode error from any source error.
101    pub fn decode(source: impl std::error::Error + Send + Sync + 'static) -> Self {
102        Self::Decode {
103            source: Box::new(source),
104        }
105    }
106
107    /// Creates an encode error from any source error.
108    pub fn encode(source: impl std::error::Error + Send + Sync + 'static) -> Self {
109        Self::Encode {
110            source: Box::new(source),
111        }
112    }
113}
114
115/// Error returned before serving when a requested activity type has no handler.
116#[derive(thiserror::Error, Debug, Clone, PartialEq, Eq)]
117#[error("activity type `{activity_type}` has no registered handler")]
118pub struct MissingActivityHandler {
119    /// Activity type requested for registration.
120    pub activity_type: String,
121}
122
#[cfg(test)]
mod tests {
    use super::{MissingActivityHandler, WorkerError};

    /// Compiles only when `T` is `Send + Sync + 'static`.
    fn assert_send_sync_static<T: Send + Sync + 'static>() {}

    /// Registration failure caused locally by a missing `charge-card` handler.
    fn missing_charge_card_handler() -> WorkerError {
        WorkerError::registration(MissingActivityHandler {
            activity_type: "charge-card".to_owned(),
        })
    }

    #[test]
    fn worker_error_is_send_sync_static() {
        assert_send_sync_static::<WorkerError>();
    }

    #[test]
    fn display_messages_name_failed_condition() {
        let rendered = missing_charge_card_handler().to_string();

        assert_eq!(
            rendered,
            "worker registration failed: activity type `charge-card` has no registered handler"
        );
    }

    #[test]
    fn registration_error_exposes_boxed_grpc_status() {
        let error = WorkerError::registration(tonic::Status::permission_denied(
            "namespace `payments` is not granted to subject `worker-a`",
        ));

        let code = error.grpc_status().map(tonic::Status::code);
        let message = error.grpc_status().map(tonic::Status::message);

        assert_eq!(code, Some(tonic::Code::PermissionDenied));
        assert_eq!(
            message,
            Some("namespace `payments` is not granted to subject `worker-a`")
        );
    }

    #[test]
    fn permission_denied_and_unauthenticated_are_not_retryable() {
        // One deterministic denial per status-carrying variant.
        let handshake_denied = WorkerError::Handshake {
            source: tonic::Status::permission_denied("namespace not granted"),
        };
        let transport_unauthenticated = WorkerError::Transport {
            source: tonic::Status::unauthenticated("credentials rejected"),
        };
        let registration_denied =
            WorkerError::registration(tonic::Status::permission_denied("namespace not granted"));

        assert!(!handshake_denied.is_retryable());
        assert!(!transport_unauthenticated.is_retryable());
        assert!(!registration_denied.is_retryable());
    }

    #[test]
    fn transient_and_non_grpc_failures_stay_retryable() {
        let engine_down = WorkerError::Transport {
            source: tonic::Status::unavailable("engine unreachable"),
        };
        let local_failure = missing_charge_card_handler();

        assert!(engine_down.is_retryable());
        assert!(local_failure.is_retryable());
        assert!(local_failure.grpc_status().is_none());
    }
}