/// Failure conditions reported by a worker, one variant per failing stage.
///
/// Every variant that has a `source` field interpolates that source's
/// `Display` output into its own message, so `to_string()` already contains
/// the underlying cause.
#[derive(thiserror::Error, Debug)]
pub enum WorkerError {
/// Connecting to the worker endpoint failed at the tonic transport layer.
#[error("failed to connect to worker endpoint: {source}")]
Connect {
/// Transport-level error returned by tonic.
source: tonic::transport::Error,
},
/// The worker handshake was rejected or failed with a gRPC status.
#[error("worker handshake failed: {source}")]
Handshake {
/// gRPC status describing the handshake failure.
source: tonic::Status,
},
/// Worker registration failed.
///
/// The cause is type-erased; it may or may not be a `tonic::Status`.
#[error("worker registration failed: {source}")]
Registration {
/// Boxed, type-erased cause of the registration failure.
source: Box<dyn std::error::Error + Send + Sync + 'static>,
},
/// A worker payload could not be decoded.
#[error("failed to decode worker payload: {source}")]
Decode {
/// Boxed, type-erased cause of the decode failure.
source: Box<dyn std::error::Error + Send + Sync + 'static>,
},
/// A worker payload could not be encoded.
#[error("failed to encode worker payload: {source}")]
Encode {
/// Boxed, type-erased cause of the encode failure.
source: Box<dyn std::error::Error + Send + Sync + 'static>,
},
/// The worker transport failed with a gRPC status.
#[error("worker transport failed: {source}")]
Transport {
/// gRPC status describing the transport failure.
source: tonic::Status,
},
/// The session's drop budget ran out because the server kept closing the
/// stream cleanly. This variant carries no underlying source.
#[error(
"worker session drop budget exhausted: the server repeatedly closed the stream cleanly"
)]
CleanCloseExhausted,
}
impl WorkerError {
    /// Builds a [`WorkerError::Registration`] from any thread-safe, `'static`
    /// error by boxing it as a type-erased source.
    pub fn registration(source: impl std::error::Error + Send + Sync + 'static) -> Self {
        Self::Registration {
            source: source.into(),
        }
    }

    /// Builds a [`WorkerError::Decode`] from any thread-safe, `'static` error
    /// by boxing it as a type-erased source.
    pub fn decode(source: impl std::error::Error + Send + Sync + 'static) -> Self {
        Self::Decode {
            source: source.into(),
        }
    }

    /// Builds a [`WorkerError::Encode`] from any thread-safe, `'static` error
    /// by boxing it as a type-erased source.
    pub fn encode(source: impl std::error::Error + Send + Sync + 'static) -> Self {
        Self::Encode {
            source: source.into(),
        }
    }

    /// Returns the gRPC status carried by this error, if there is one.
    ///
    /// `Handshake` and `Transport` hold a status directly. `Registration`
    /// yields one only when its boxed source is itself a `tonic::Status`.
    /// `Connect`, `Decode`, `Encode` and `CleanCloseExhausted` always return
    /// `None`; the boxed sources of `Decode` and `Encode` are not inspected.
    #[must_use]
    pub fn grpc_status(&self) -> Option<&tonic::Status> {
        match self {
            Self::Connect { .. }
            | Self::Decode { .. }
            | Self::Encode { .. }
            | Self::CleanCloseExhausted => None,
            Self::Registration { source: cause } => cause.downcast_ref::<tonic::Status>(),
            Self::Handshake { source: status }
            | Self::Transport { source: status } => Some(status),
        }
    }

    /// Reports whether the failed operation may be attempted again.
    ///
    /// Returns `false` only when [`Self::grpc_status`] exposes a status whose
    /// code is `PermissionDenied` or `Unauthenticated`. Every other error,
    /// including those without a gRPC status, is treated as retryable.
    #[must_use]
    pub fn is_retryable(&self) -> bool {
        match self.grpc_status() {
            // No gRPC status to rule the retry out.
            None => true,
            Some(status) => !matches!(
                status.code(),
                tonic::Code::PermissionDenied | tonic::Code::Unauthenticated
            ),
        }
    }
}
/// Error stating that an activity type has no registered handler.
///
/// Its message names the offending activity type in backticks.
#[derive(thiserror::Error, Debug, Clone, PartialEq, Eq)]
#[error("activity type `{activity_type}` has no registered handler")]
pub struct MissingActivityHandler {
/// Activity type name interpolated into the error message.
pub activity_type: String,
}
#[cfg(test)]
mod tests {
    use super::{MissingActivityHandler, WorkerError};

    /// Compile-time probe: instantiating it proves `T` is `Send + Sync + 'static`.
    fn assert_send_sync_static<T>()
    where
        T: Send + Sync + 'static,
    {
    }

    /// `WorkerError` must be `Send + Sync + 'static`.
    #[test]
    fn worker_error_is_send_sync_static() {
        assert_send_sync_static::<WorkerError>();
    }

    /// The registration message embeds the wrapped error's own message.
    #[test]
    fn display_messages_name_failed_condition() {
        let missing_handler = MissingActivityHandler {
            activity_type: String::from("charge-card"),
        };
        let rendered = WorkerError::registration(missing_handler).to_string();
        assert_eq!(
            rendered,
            "worker registration failed: activity type `charge-card` has no registered handler"
        );
    }

    /// A `tonic::Status` boxed into a registration error is recoverable
    /// through `grpc_status`, with its code and message intact.
    #[test]
    fn registration_error_exposes_boxed_grpc_status() {
        let denied = tonic::Status::permission_denied(
            "namespace `payments` is not granted to subject `worker-a`",
        );
        let error = WorkerError::registration(denied);
        let status = error.grpc_status();
        let code = status.map(tonic::Status::code);
        let message = status.map(tonic::Status::message);
        assert!(matches!(code, Some(tonic::Code::PermissionDenied)));
        assert_eq!(
            message,
            Some("namespace `payments` is not granted to subject `worker-a`")
        );
    }

    /// Permission and authentication rejections are never retryable,
    /// whichever variant carries the status.
    #[test]
    fn permission_denied_and_unauthenticated_are_not_retryable() {
        let denied_handshake = WorkerError::Handshake {
            source: tonic::Status::permission_denied("namespace not granted"),
        };
        assert!(!denied_handshake.is_retryable());

        let unauthenticated_transport = WorkerError::Transport {
            source: tonic::Status::unauthenticated("credentials rejected"),
        };
        assert!(!unauthenticated_transport.is_retryable());

        let denied_registration =
            WorkerError::registration(tonic::Status::permission_denied("namespace not granted"));
        assert!(!denied_registration.is_retryable());
    }

    /// Other gRPC codes and errors without a gRPC status remain retryable.
    #[test]
    fn transient_and_non_grpc_failures_stay_retryable() {
        let unavailable = WorkerError::Transport {
            source: tonic::Status::unavailable("engine unreachable"),
        };
        assert!(unavailable.is_retryable());

        let local_registration = WorkerError::registration(MissingActivityHandler {
            activity_type: String::from("charge-card"),
        });
        assert!(local_registration.is_retryable());
        assert!(local_registration.grpc_status().is_none());
    }
}