1#[derive(thiserror::Error, Debug)]
5pub enum WorkerError {
6 #[error("failed to connect to worker endpoint: {source}")]
8 Connect {
9 source: tonic::transport::Error,
11 },
12
13 #[error("worker handshake failed: {source}")]
15 Handshake {
16 source: tonic::Status,
18 },
19
20 #[error("worker registration failed: {source}")]
22 Registration {
23 source: Box<dyn std::error::Error + Send + Sync + 'static>,
25 },
26
27 #[error("failed to decode worker payload: {source}")]
29 Decode {
30 source: Box<dyn std::error::Error + Send + Sync + 'static>,
32 },
33
34 #[error("failed to encode worker payload: {source}")]
36 Encode {
37 source: Box<dyn std::error::Error + Send + Sync + 'static>,
39 },
40
41 #[error("worker transport failed: {source}")]
43 Transport {
44 source: tonic::Status,
46 },
47
48 #[error(
55 "worker session drop budget exhausted: the server repeatedly closed the stream cleanly"
56 )]
57 CleanCloseExhausted,
58}
59
60impl WorkerError {
61 pub fn registration(source: impl std::error::Error + Send + Sync + 'static) -> Self {
63 Self::Registration {
64 source: Box::new(source),
65 }
66 }
67
68 #[must_use]
74 pub fn grpc_status(&self) -> Option<&tonic::Status> {
75 match self {
76 Self::Handshake { source } | Self::Transport { source } => Some(source),
77 Self::Registration { source } => source.downcast_ref::<tonic::Status>(),
78 Self::Connect { .. }
79 | Self::Decode { .. }
80 | Self::Encode { .. }
81 | Self::CleanCloseExhausted => None,
82 }
83 }
84
85 #[must_use]
93 pub fn is_retryable(&self) -> bool {
94 !matches!(
95 self.grpc_status().map(tonic::Status::code),
96 Some(tonic::Code::PermissionDenied | tonic::Code::Unauthenticated)
97 )
98 }
99
100 pub fn decode(source: impl std::error::Error + Send + Sync + 'static) -> Self {
102 Self::Decode {
103 source: Box::new(source),
104 }
105 }
106
107 pub fn encode(source: impl std::error::Error + Send + Sync + 'static) -> Self {
109 Self::Encode {
110 source: Box::new(source),
111 }
112 }
113}
114
115#[derive(thiserror::Error, Debug, Clone, PartialEq, Eq)]
117#[error("activity type `{activity_type}` has no registered handler")]
118pub struct MissingActivityHandler {
119 pub activity_type: String,
121}
122
#[cfg(test)]
mod tests {
    use super::{MissingActivityHandler, WorkerError};

    /// Compiles only when `T` is `Send + Sync + 'static`.
    fn assert_send_sync_static<T: Send + Sync + 'static>() {}

    /// `WorkerError` must satisfy the `Send + Sync + 'static` bounds.
    #[test]
    fn worker_error_is_send_sync_static() {
        assert_send_sync_static::<WorkerError>();
    }

    /// The rendered message includes both the variant text and the source text.
    #[test]
    fn display_messages_name_failed_condition() {
        let missing_handler = MissingActivityHandler {
            activity_type: String::from("charge-card"),
        };
        let rendered = WorkerError::registration(missing_handler).to_string();

        assert_eq!(
            rendered,
            "worker registration failed: activity type `charge-card` has no registered handler"
        );
    }

    /// A `tonic::Status` boxed inside `Registration` is reachable via `grpc_status`.
    #[test]
    fn registration_error_exposes_boxed_grpc_status() {
        let wrapped = WorkerError::registration(tonic::Status::permission_denied(
            "namespace `payments` is not granted to subject `worker-a`",
        ));

        let found = wrapped.grpc_status();
        assert_eq!(
            found.map(tonic::Status::code),
            Some(tonic::Code::PermissionDenied)
        );
        assert_eq!(
            found.map(tonic::Status::message),
            Some("namespace `payments` is not granted to subject `worker-a`")
        );
    }

    /// `PermissionDenied` and `Unauthenticated` statuses are not retryable.
    #[test]
    fn permission_denied_and_unauthenticated_are_not_retryable() {
        let handshake_denied = WorkerError::Handshake {
            source: tonic::Status::permission_denied("namespace not granted"),
        };
        assert!(!handshake_denied.is_retryable());

        let transport_unauthenticated = WorkerError::Transport {
            source: tonic::Status::unauthenticated("credentials rejected"),
        };
        assert!(!transport_unauthenticated.is_retryable());

        let registration_denied =
            WorkerError::registration(tonic::Status::permission_denied("namespace not granted"));
        assert!(!registration_denied.is_retryable());
    }

    /// Other gRPC codes and errors without a gRPC status remain retryable.
    #[test]
    fn transient_and_non_grpc_failures_stay_retryable() {
        let transport_unavailable = WorkerError::Transport {
            source: tonic::Status::unavailable("engine unreachable"),
        };
        assert!(transport_unavailable.is_retryable());

        let missing_handler = WorkerError::registration(MissingActivityHandler {
            activity_type: String::from("charge-card"),
        });
        assert!(missing_handler.is_retryable());
        assert!(missing_handler.grpc_status().is_none());
    }
}