1use base64::prelude::*;
2use prost::Message;
3use thiserror::Error;
4use tonic::{codegen::http::uri, Status};
5
6use danube_core::proto::{ErrorMessage, ErrorType};
7
8pub type Result<T> = std::result::Result<T, DanubeError>;
9
10#[derive(Debug, Error)]
11pub enum DanubeError {
12 #[error("transport error: {0}")]
13 TonicTransportError(#[from] tonic::transport::Error),
14
15 #[error("from status error: {0}, with message: {1:?}")]
16 FromStatus(tonic::Status, Option<ErrorMessage>),
17
18 #[error("unable to parse the address: {0}")]
19 UrlParseError(#[from] uri::InvalidUri),
20
21 #[error("unable to parse the address")]
22 ParseError,
23
24 #[error("unable to load the certificate: {0}")]
25 IoError(#[from] std::io::Error),
26
27 #[error("unable to perform operation: {0}")]
28 Unrecoverable(String),
29
30 #[error("invalid token")]
31 InvalidToken,
32}
33
34impl DanubeError {
35 pub fn extract_status(&self) -> Option<&tonic::Status> {
36 match self {
37 DanubeError::FromStatus(status, _) => Some(status),
38 _ => None,
39 }
40 }
41}
42
43pub(crate) fn decode_error_details(status: &Status) -> Option<ErrorMessage> {
44 if let Some(metadata_value) = status.metadata().get_bin("error-message-bin") {
45 let base64_buffer = metadata_value.as_encoded_bytes();
46
47 let buffer = BASE64_STANDARD_NO_PAD.decode(base64_buffer).unwrap();
49
50 match ErrorMessage::decode(&buffer[..]) {
52 Ok(error_message) => Some(error_message),
53 Err(err) => {
54 eprintln!("Error decoding error message: {}", err);
55 None
56 }
57 }
58 } else {
59 None
60 }
61}
62
63#[allow(dead_code)]
65pub(crate) fn error_type_from_i32(value: i32) -> Option<ErrorType> {
66 match value {
67 0 => Some(ErrorType::UnknownError),
68 1 => Some(ErrorType::InvalidTopicName),
69 2 => Some(ErrorType::TopicNotFound),
70 3 => Some(ErrorType::ServiceNotReady),
71 4 => Some(ErrorType::ProducerAlreadyExists),
72 5 => Some(ErrorType::SubscribePermissionDenied),
73 6 => Some(ErrorType::SubscriptionNotFound),
74 _ => None,
75 }
76}