1use std::{collections::HashSet, error::Error as StdError, fmt};
21
22use itertools::Itertools;
23use tonic::{Code, Status};
24use tonic_types::StatusExt;
25
26use super::{address::Address, RequestID};
27
28macro_rules! error_messages {
29 {
30 $name:ident code: $code_pfx:literal, type: $message_pfx:literal,
31 $($error_name:ident $({ $($field:ident : $inner:ty),+ $(,)? })? = $code:literal: $body:literal),+ $(,)?
32 } => {
33 #[derive(Clone, Eq, PartialEq)]
34 pub enum $name {$(
35 $error_name$( { $($field: $inner),+ })?,
36 )*}
37
38 impl $name {
39 pub const PREFIX: &'static str = $code_pfx;
40
41 pub const fn code(&self) -> usize {
42 match self {$(
43 Self::$error_name $({ $($field: _),+ })? => $code,
44 )*}
45 }
46
47 pub fn format_code(&self) -> String {
48 format!(concat!("[", $code_pfx, "{}{}]"), self.padding(), self.code())
49 }
50
51 pub fn message(&self) -> String {
52 match self {$(
53 Self::$error_name $({$($field),+})? => format!($body $($(, $field = $field)+)?),
54 )*}
55 }
56
57 const fn max_code() -> usize {
58 let mut max = usize::MIN;
59 $(max = if $code > max { $code } else { max };)*
60 max
61 }
62
63 const fn num_digits(x: usize) -> usize {
64 if (x < 10) { 1 } else { 1 + Self::num_digits(x/10) }
65 }
66
67 const fn padding(&self) -> &'static str {
68 match Self::num_digits(Self::max_code()) - Self::num_digits(self.code()) {
69 0 => "",
70 1 => "0",
71 2 => "00",
72 3 => "000",
73 _ => unreachable!(),
74 }
75 }
76
77 const fn name(&self) -> &'static str {
78 match self {$(
79 Self::$error_name $({ $($field: _),+ })? => concat!(stringify!($name), "::", stringify!($error_name)),
80 )*}
81 }
82 }
83
84 impl fmt::Display for $name {
85 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
86 write!(
87 f,
88 concat!("[", $code_pfx, "{}{}] ", $message_pfx, ": {}"),
89 self.padding(),
90 self.code(),
91 self.message()
92 )
93 }
94 }
95
96 impl fmt::Debug for $name {
97 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
98 let mut debug_struct = f.debug_struct(self.name());
99 debug_struct.field("message", &format!("{}", self));
100 $(
101 $(
102 if let Self::$error_name { $($field),+ } = &self {
103 $(debug_struct.field(stringify!($field), &$field);)+
104 }
105 )?
106 )*
107 debug_struct.finish()
108 }
109 }
110
111 impl std::error::Error for $name {
112 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
113 None
114 }
115 }
116 };
117}
118
119error_messages! { AnalyzeError
120 code: "ANZ", type: "Analyze Error",
121 MissingResponseField { field: &'static str } =
122 1: "Missing field in message received from server: '{field}'. This is either a version compatibility issue or a bug.",
123 UnknownEnumValue { enum_name: &'static str, value: i32 } =
124 2: "Value '{value}' is out of bounds for enum '{enum_name}'. This is either a version compatibility issue or a bug.",
125}
126
127error_messages! { ConnectionError
128 code: "CXN", type: "Connection Error",
129 RPCMethodUnavailable { message: String } =
130 1: "The server does not support this method, please check the driver-server compatibility:\n'{message}'.",
131 ServerConnectionFailed { addresses: Vec<Address> } =
132 2: "Unable to connect to TypeDB server(s) at: \n{addresses:?}",
133 ServerConnectionFailedWithError { error: String } =
134 3: "Unable to connect to TypeDB server(s), received errors: \n{error}",
135 ServerConnectionFailedStatusError { error: String } =
136 4: "Unable to connect to TypeDB server(s), received network or protocol error: \n{error}",
137 ServerConnectionIsClosed =
138 5: "The connection has been closed and no further operation is allowed.",
139 TransactionIsClosed =
140 6: "The transaction is closed and no further operation is allowed.",
141 TransactionIsClosedWithErrors { errors: String } =
142 7: "The transaction is closed because of the error(s):\n{errors}",
143 DatabaseNotFound { name: String } =
144 8: "Database '{name}' not found.",
145 MissingResponseField { field: &'static str } =
146 9: "Missing field in message received from server: '{field}'. This is either a version compatibility issue or a bug.",
147 UnknownRequestId { request_id: RequestID } =
148 10: "Received a response with unknown request id '{request_id}'",
149 UnexpectedResponse { response: String } =
150 11: "Received unexpected response from server: '{response}'. This is either a version compatibility issue or a bug.",
151 InvalidResponseField { name: &'static str } =
152 12: "Invalid field in message received from server: '{name}'. This is either a version compatibility issue or a bug.",
153 QueryStreamNoResponse =
154 13: "Didn't receive any server responses for the query.",
155 UnexpectedQueryType { query_type: i32 } =
156 14: "Unexpected query type in message received from server: {query_type}. This is either a version compatibility issue or a bug.",
157 ClusterReplicaNotPrimary =
158 15: "The replica is not the primary replica.",
159 ClusterAllNodesFailed { errors: String } =
160 16: "Attempted connecting to all TypeDB Cluster servers, but the following errors occurred: \n{errors}.",
161 TokenCredentialInvalid =
162 17: "Invalid token credentials.",
163 EncryptionSettingsMismatch =
164 18: "Unable to connect to TypeDB: possible encryption settings mismatch.",
165 SSLCertificateNotValidated =
166 19: "SSL handshake with TypeDB failed: the server's identity could not be verified. Possible CA mismatch.",
167 BrokenPipe =
168 20: "Stream closed because of a broken pipe. This could happen if you are attempting to connect to an unencrypted TypeDB server using a TLS-enabled credentials.",
169 ConnectionFailed =
170 21: "Connection failed. Please check the server is running and the address is accessible. Encrypted TypeDB endpoints may also have misconfigured SSL certificates.",
171 MissingPort { address: String } =
172 22: "Invalid URL '{address}': missing port.",
173 AddressTranslationMismatch { unknown: HashSet<Address>, unmapped: HashSet<Address> } =
174 23: "Address translation map does not match the server's advertised address list. User-provided servers not in the advertised list: {unknown:?}. Advertised servers not mapped by user: {unmapped:?}.",
175 ValueTimeZoneNameNotRecognised { time_zone: String } =
176 24: "Time zone provided by the server has name '{time_zone}', which is not an officially recognized timezone.",
177 ValueTimeZoneOffsetNotRecognised { offset: i32 } =
178 25: "Time zone provided by the server has numerical offset '{offset}', which is not recognised as a valid value for offset in seconds.",
179 ValueStructNotImplemented =
180 26: "Struct valued responses are not yet supported by the driver.",
181 ListsNotImplemented =
182 27: "Lists are not yet supported by the driver.",
183 UnexpectedKind { kind: i32 } =
184 28: "Unexpected kind in message received from server: {kind}. This is either a version compatibility issue or a bug.",
185 UnexpectedConnectionClose =
186 29: "Connection closed unexpectedly.",
187 DatabaseImportChannelIsClosed =
188 30: "The database import channel is closed and no further operation is allowed.",
189 DatabaseImportStreamUnexpectedResponse =
190 31: "The database import stream received an unexpected response in the process. It is either a version compatibility issue or a bug.",
191 DatabaseExportChannelIsClosed =
192 32: "The database export channel is closed and no further operation is allowed.",
193 DatabaseExportStreamNoResponse =
194 33: "Didn't receive any server responses for the database export command.",
195 AbsentTlsConfigForTlsConnection =
196 34: "Could not establish a TLS connection without a TLS config specified. Please verify your driver options.",
197 TlsConnectionWithoutHttps =
198 35: "TLS connections can only be enabled when connecting to HTTPS endpoints, for example using 'https://<ip>:port'. Please modify the address, or disable TLS (WARNING: this will send passwords over plaintext).",
199 NonTlsConnectionWithHttps =
200 36: "Connecting to an https endpoint requires enabling TLS in driver options.",
201 AnalyzeNoResponse =
202 37: "Didn't receive any server responses for the analyze request.",
203}
204
205error_messages! { ConceptError
206 code: "CPT", type: "Concept Error",
207 UnavailableRowVariable { variable: String } =
208 1: "Cannot get concept from a concept row by variable '{variable}'.",
209 UnavailableRowIndex { index: usize } =
210 2: "Cannot get concept from a concept row by index '{index}'.",
211}
212
213error_messages! { MigrationError
214 code: "MGT", type: "Migration Error",
215 CannotOpenImportFile { path: String, reason: String } =
216 1: "Cannot open import file '{path}': {reason}",
217 CannotCreateExportFile { path: String, reason: String } =
218 2: "Cannot create export file '{path}': {reason}",
219 CannotExportToTheSameFile =
220 3: "Cannot export both schema and data to the same file.",
221 CannotDecodeImportedConcept =
222 4: "Cannot decode a concept from the provided import file. Make sure to pass a correct database file produced by a TypeDB export operation.",
223 CannotDecodeImportedConceptLength =
224 5: "Cannot decode a concept length from the provided import file. Make sure to pass a correct database file produced by a TypeDB export operation.",
225 CannotEncodeExportedConcept =
226 6: "Cannot encode a concept for export. It's either a version compatibility error or a bug."
227}
228
229error_messages! { InternalError
230 code: "INT", type: "Internal Error",
231 RecvError =
232 1: "Channel is closed.",
233 SendError =
234 2: "Unable to send response over callback channel (receiver dropped).",
235 UnexpectedRequestType { request_type: String } =
236 3: "Unexpected request type for remote procedure call: {request_type}. This is either a version compatibility issue or a bug.",
237 UnexpectedResponseType { response_type: String } =
238 4: "Unexpected response type for remote procedure call: {response_type}. This is either a version compatibility issue or a bug.",
239 UnknownServer { server: Address } =
240 5: "Received replica at unrecognized server: {server}.",
241 EnumOutOfBounds { value: i32, enum_name: &'static str } =
242 6: "Value '{value}' is out of bounds for enum '{enum_name}'.",
243}
244
245#[derive(Clone, PartialEq, Eq)]
246pub struct ServerError {
247 error_code: String,
248 error_domain: String,
249 message: String,
250 stack_trace: Vec<String>,
251}
252
253impl ServerError {
254 pub(crate) fn new(error_code: String, error_domain: String, message: String, stack_trace: Vec<String>) -> Self {
255 Self { error_code, error_domain, message, stack_trace }
256 }
257
258 pub(crate) fn format_code(&self) -> &str {
259 &self.error_code
260 }
261
262 pub(crate) fn message(&self) -> String {
263 self.to_string()
264 }
265
266 fn to_string(&self) -> String {
267 if self.stack_trace.is_empty() {
268 format!("[{}] {}. {}", self.error_code, self.error_domain, self.message)
269 } else {
270 format!("\n{}", self.stack_trace.join("\nCaused: "))
271 }
272 }
273}
274
275impl fmt::Display for ServerError {
276 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
277 write!(f, "{}", self.to_string())
278 }
279}
280
281impl fmt::Debug for ServerError {
282 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
283 fmt::Display::fmt(self, f)
284 }
285}
286
287#[derive(Clone, Debug, PartialEq, Eq)]
289pub enum Error {
290 Analyze(AnalyzeError),
291 Connection(ConnectionError),
292 Concept(ConceptError),
293 Migration(MigrationError),
294 Internal(InternalError),
295 Server(ServerError),
296 Other(String),
297}
298
299impl Error {
300 pub fn code(&self) -> String {
301 match self {
302 Self::Analyze(error) => error.format_code(),
303 Self::Connection(error) => error.format_code(),
304 Self::Concept(error) => error.format_code(),
305 Self::Migration(error) => error.format_code(),
306 Self::Internal(error) => error.format_code(),
307 Self::Server(error) => error.format_code().to_owned(),
308 Self::Other(_error) => String::new(),
309 }
310 }
311
312 pub fn message(&self) -> String {
313 match self {
314 Self::Analyze(error) => error.message(),
315 Self::Connection(error) => error.message(),
316 Self::Concept(error) => error.message(),
317 Self::Migration(error) => error.message(),
318 Self::Internal(error) => error.message(),
319 Self::Server(error) => error.message(),
320 Self::Other(error) => error.clone(),
321 }
322 }
323
324 fn try_extracting_connection_error(status: &Status, code: &str) -> Option<ConnectionError> {
325 match code {
329 "AUT2" | "AUT3" => Some(ConnectionError::TokenCredentialInvalid {}),
330 _ => None,
331 }
332 }
333
334 fn from_message(message: String) -> Self {
335 Self::Other(message)
337 }
338
339 fn parse_unavailable(status_message: &str) -> Error {
340 if status_message == "broken pipe" {
341 Error::Connection(ConnectionError::BrokenPipe)
342 } else if status_message.contains("received corrupt message") {
343 Error::Connection(ConnectionError::EncryptionSettingsMismatch)
344 } else if status_message.contains("UnknownIssuer") {
345 Error::Connection(ConnectionError::SSLCertificateNotValidated)
346 } else if status_message.contains("Connection refused") {
347 Error::Connection(ConnectionError::ConnectionFailed)
348 } else {
349 Error::Connection(ConnectionError::ServerConnectionFailedStatusError { error: status_message.to_owned() })
350 }
351 }
352}
353
354impl fmt::Display for Error {
355 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
356 match self {
357 Self::Analyze(error) => write!(f, "{error}"),
358 Self::Connection(error) => write!(f, "{error}"),
359 Self::Concept(error) => write!(f, "{error}"),
360 Self::Migration(error) => write!(f, "{error}"),
361 Self::Internal(error) => write!(f, "{error}"),
362 Self::Server(error) => write!(f, "{error}"),
363 Self::Other(message) => write!(f, "{message}"),
364 }
365 }
366}
367
368impl StdError for Error {
369 fn source(&self) -> Option<&(dyn StdError + 'static)> {
370 match self {
371 Self::Analyze(error) => Some(error),
372 Self::Connection(error) => Some(error),
373 Self::Concept(error) => Some(error),
374 Self::Migration(error) => Some(error),
375 Self::Internal(error) => Some(error),
376 Self::Server(_) => None,
377 Self::Other(_) => None,
378 }
379 }
380}
381
382impl From<AnalyzeError> for Error {
383 fn from(error: AnalyzeError) -> Self {
384 Self::Analyze(error)
385 }
386}
387
388impl From<ConnectionError> for Error {
389 fn from(error: ConnectionError) -> Self {
390 Self::Connection(error)
391 }
392}
393
394impl From<ConceptError> for Error {
395 fn from(error: ConceptError) -> Self {
396 Self::Concept(error)
397 }
398}
399
400impl From<InternalError> for Error {
401 fn from(error: InternalError) -> Self {
402 Self::Internal(error)
403 }
404}
405
406impl From<ServerError> for Error {
407 fn from(error: ServerError) -> Self {
408 Self::Server(error)
409 }
410}
411
412impl From<Status> for Error {
413 fn from(status: Status) -> Self {
414 if let Ok(details) = status.check_error_details() {
415 if let Some(bad_request) = details.bad_request() {
416 Self::Connection(ConnectionError::ServerConnectionFailedWithError {
417 error: format!("{:?}", bad_request),
418 })
419 } else if let Some(error_info) = details.error_info() {
420 let code = error_info.reason.clone();
421 if let Some(connection_error) = Self::try_extracting_connection_error(&status, &code) {
422 return Self::Connection(connection_error);
423 }
424 let domain = error_info.domain.clone();
425 let stack_trace =
426 if let Some(debug_info) = details.debug_info() { debug_info.stack_entries.clone() } else { vec![] };
427
428 Self::Server(ServerError::new(code, domain, status.message().to_owned(), stack_trace))
429 } else {
430 Self::from_message(concat_source_messages(&status))
431 }
432 } else {
433 if status.code() == Code::Unavailable {
434 Self::parse_unavailable(status.message())
435 } else if status.code() == Code::Unknown
436 || is_rst_stream(&status)
437 || status.code() == Code::FailedPrecondition
438 || status.code() == Code::AlreadyExists
439 {
440 Self::Connection(ConnectionError::ServerConnectionFailedStatusError {
441 error: status.message().to_owned(),
442 })
443 } else if status.code() == Code::Unimplemented {
444 Self::Connection(ConnectionError::RPCMethodUnavailable { message: status.message().to_owned() })
445 } else {
446 Self::from_message(concat_source_messages(&status))
447 }
448 }
449 }
450}
451
452fn is_rst_stream(status: &Status) -> bool {
453 status.message().contains("Received Rst Stream")
455}
456
457fn concat_source_messages(status: &Status) -> String {
458 let mut errors = String::new();
459 errors.push_str(status.message());
460 let mut err: Option<&dyn std::error::Error> = status.source();
461 while let Some(e) = err {
462 errors.push_str(": ");
463 errors.push_str(e.to_string().as_str());
464 err = e.source();
465 }
466 errors
467}
468
469impl From<http::uri::InvalidUri> for Error {
470 fn from(err: http::uri::InvalidUri) -> Self {
471 Self::Other(err.to_string())
472 }
473}
474
475impl From<tonic::transport::Error> for Error {
476 fn from(err: tonic::transport::Error) -> Self {
477 Self::Other(err.to_string())
478 }
479}
480
481impl<T> From<tokio::sync::mpsc::error::SendError<T>> for Error {
482 fn from(_err: tokio::sync::mpsc::error::SendError<T>) -> Self {
483 Self::Internal(InternalError::SendError)
484 }
485}
486
487impl From<tokio::sync::oneshot::error::RecvError> for Error {
488 fn from(_err: tokio::sync::oneshot::error::RecvError) -> Self {
489 Self::Internal(InternalError::RecvError)
490 }
491}
492
493impl From<crossbeam::channel::RecvError> for Error {
494 fn from(_err: crossbeam::channel::RecvError) -> Self {
495 Self::Internal(InternalError::RecvError)
496 }
497}
498
499impl<T> From<crossbeam::channel::SendError<T>> for Error {
500 fn from(_err: crossbeam::channel::SendError<T>) -> Self {
501 Self::Internal(InternalError::SendError)
502 }
503}
504
505impl From<String> for Error {
506 fn from(err: String) -> Self {
507 Self::Other(err)
508 }
509}
510
511impl From<std::io::Error> for Error {
512 fn from(err: std::io::Error) -> Self {
513 Self::Other(err.to_string())
514 }
515}