typedb_driver/common/
error.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *   http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied.  See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20use std::{collections::HashSet, error::Error as StdError, fmt};
21
22use itertools::Itertools;
23use tonic::{Code, Status};
24use tonic_types::{ErrorDetails, ErrorInfo, StatusExt};
25
26use super::{address::Address, RequestID};
27
28macro_rules! error_messages {
29    {
30        $name:ident code: $code_pfx:literal, type: $message_pfx:literal,
31        $($error_name:ident $({ $($field:ident : $inner:ty),+ $(,)? })? = $code:literal: $body:literal),+ $(,)?
32    } => {
33        #[derive(Clone, Eq, PartialEq)]
34        pub enum $name {$(
35            $error_name$( { $($field: $inner),+ })?,
36        )*}
37
38        impl $name {
39            pub const PREFIX: &'static str = $code_pfx;
40
41            pub const fn code(&self) -> usize {
42                match self {$(
43                    Self::$error_name $({ $($field: _),+ })? => $code,
44                )*}
45            }
46
47            pub fn format_code(&self) -> String {
48                format!(concat!("[", $code_pfx, "{}{}]"), self.padding(), self.code())
49            }
50
51            pub fn message(&self) -> String {
52                match self {$(
53                    Self::$error_name $({$($field),+})? => format!($body $($(, $field = $field)+)?),
54                )*}
55            }
56
57            const fn max_code() -> usize {
58                let mut max = usize::MIN;
59                $(max = if $code > max { $code } else { max };)*
60                max
61            }
62
63            const fn num_digits(x: usize) -> usize {
64                if (x < 10) { 1 } else { 1 + Self::num_digits(x/10) }
65            }
66
67            const fn padding(&self) -> &'static str {
68                match Self::num_digits(Self::max_code()) - Self::num_digits(self.code()) {
69                    0 => "",
70                    1 => "0",
71                    2 => "00",
72                    3 => "000",
73                    _ => unreachable!(),
74                }
75            }
76
77            const fn name(&self) -> &'static str {
78                match self {$(
79                    Self::$error_name $({ $($field: _),+ })? => concat!(stringify!($name), "::", stringify!($error_name)),
80                )*}
81            }
82        }
83
84        impl std::fmt::Display for $name {
85            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
86                write!(
87                    f,
88                    concat!("[", $code_pfx, "{}{}] ", $message_pfx, ": {}"),
89                    self.padding(),
90                    self.code(),
91                    self.message()
92                )
93            }
94        }
95
96        impl std::fmt::Debug for $name {
97            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
98                let mut debug_struct = f.debug_struct(self.name());
99                debug_struct.field("message", &format!("{}", self));
100                $(
101                    $(
102                        if let Self::$error_name { $($field),+ } = &self {
103                            $(debug_struct.field(stringify!($field), &$field);)+
104                        }
105                    )?
106                )*
107                debug_struct.finish()
108            }
109        }
110
111        impl std::error::Error for $name {
112            fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
113                None
114            }
115        }
116    };
117}
118
119error_messages! { ConnectionError
120    code: "CXN", type: "Connection Error",
121    RPCMethodUnavailable { message: String } =
122        1: "The server does not support this method, please check the driver-server compatibility:\n'{message}'.",
123    ServerConnectionFailed { addresses: Vec<Address> } =
124        2: "Unable to connect to TypeDB server(s) at: \n{addresses:?}",
125    ServerConnectionFailedWithError { error: String } =
126        3: "Unable to connect to TypeDB server(s), received errors: \n{error}",
127    ServerConnectionFailedStatusError { error: String } =
128        4: "Unable to connect to TypeDB server(s), received network or protocol error: \n{error}",
129    ServerConnectionIsClosed =
130        5: "The connection has been closed and no further operation is allowed.",
131    TransactionIsClosed =
132        6: "The transaction is closed and no further operation is allowed.",
133    TransactionIsClosedWithErrors { errors: String } =
134        7: "The transaction is closed because of the error(s):\n{errors}",
135    DatabaseNotFound { name: String } =
136        8: "Database '{name}' not found.",
137    MissingResponseField { field: &'static str } =
138        9: "Missing field in message received from server: '{field}'. This is either a version compatibility issue or a bug.",
139    UnknownRequestId { request_id: RequestID } =
140        10: "Received a response with unknown request id '{request_id}'",
141    UnexpectedResponse { response: String } =
142        11: "Received unexpected response from server: '{response}'. This is either a version compatibility issue or a bug.",
143    InvalidResponseField { name: &'static str } =
144        12: "Invalid field in message received from server: '{name}'. This is either a version compatibility issue or a bug.",
145    QueryStreamNoResponse =
146        13: "Didn't receive any server responses for the query.",
147    UnexpectedQueryType { query_type: i32 } =
148        14: "Unexpected query type in message received from server: {query_type}. This is either a version compatibility issue or a bug.",
149    ClusterReplicaNotPrimary =
150        15: "The replica is not the primary replica.",
151    ClusterAllNodesFailed { errors: String } =
152        16: "Attempted connecting to all TypeDB Cluster servers, but the following errors occurred: \n{errors}.",
153    TokenCredentialInvalid =
154        17: "Invalid token credentials.",
155    EncryptionSettingsMismatch =
156        18: "Unable to connect to TypeDB: possible encryption settings mismatch.",
157    SSLCertificateNotValidated =
158        19: "SSL handshake with TypeDB failed: the server's identity could not be verified. Possible CA mismatch.",
159    BrokenPipe =
160        20: "Stream closed because of a broken pipe. This could happen if you are attempting to connect to an unencrypted TypeDB server using a TLS-enabled credentials.",
161    ConnectionFailed =
162        21: "Connection failed. Please check the server is running and the address is accessible. Encrypted TypeDB endpoints may also have misconfigured SSL certificates.",
163    MissingPort { address: String } =
164        22: "Invalid URL '{address}': missing port.",
165    AddressTranslationMismatch { unknown: HashSet<Address>, unmapped: HashSet<Address> } =
166        23: "Address translation map does not match the server's advertised address list. User-provided servers not in the advertised list: {unknown:?}. Advertised servers not mapped by user: {unmapped:?}.",
167    ValueTimeZoneNameNotRecognised { time_zone: String } =
168        24: "Time zone provided by the server has name '{time_zone}', which is not an officially recognized timezone.",
169    ValueTimeZoneOffsetNotRecognised { offset: i32 } =
170        25: "Time zone provided by the server has numerical offset '{offset}', which is not recognised as a valid value for offset in seconds.",
171    ValueStructNotImplemented =
172        26: "Struct valued responses are not yet supported by the driver.",
173    ListsNotImplemented =
174        27: "Lists are not yet supported by the driver.",
175    UnexpectedKind { kind: i32 } =
176        28: "Unexpected kind in message received from server: {kind}. This is either a version compatibility issue or a bug.",
177    UnexpectedConnectionClose =
178        29: "Connection closed unexpectedly.",
179    DatabaseImportChannelIsClosed =
180        30: "The database import channel is closed and no further operation is allowed.",
181    DatabaseImportStreamUnexpectedResponse =
182        31: "The database import stream received an unexpected response in the process. It is either a version compatibility issue or a bug.",
183    DatabaseExportChannelIsClosed =
184        32: "The database export channel is closed and no further operation is allowed.",
185    DatabaseExportStreamNoResponse =
186        33: "Didn't receive any server responses for the database export command.",
187    AbsentTlsConfigForTlsConnection =
188        34: "Could not establish a TLS connection without a TLS config specified. Please verify your driver options.",
189    TlsConnectionWithoutHttps =
190        35: "TLS connections can only be enabled when connecting to HTTPS endpoints, for example using 'https://<ip>:port'. Please modify the address, or disable TLS (WARNING: this will send passwords over plaintext).",
191    NonTlsConnectionWithHttps =
192        36: "Connecting to an https endpoint requires enabling TLS in driver options.",
193}
194
195error_messages! { ConceptError
196    code: "CPT", type: "Concept Error",
197    UnavailableRowVariable { variable: String } =
198        1: "Cannot get concept from a concept row by variable '{variable}'.",
199    UnavailableRowIndex { index: usize } =
200        2: "Cannot get concept from a concept row by index '{index}'.",
201}
202
203error_messages! { MigrationError
204    code: "MGT", type: "Migration Error",
205    CannotOpenImportFile { path: String, reason: String } =
206        1: "Cannot open import file '{path}': {reason}",
207    CannotCreateExportFile { path: String, reason: String } =
208        2: "Cannot create export file '{path}': {reason}",
209    CannotExportToTheSameFile =
210        3: "Cannot export both schema and data to the same file.",
211    CannotDecodeImportedConcept =
212        4: "Cannot decode a concept from the provided import file. Make sure to pass a correct database file produced by a TypeDB export operation.",
213    CannotDecodeImportedConceptLength =
214        5: "Cannot decode a concept length from the provided import file. Make sure to pass a correct database file produced by a TypeDB export operation.",
215    CannotEncodeExportedConcept =
216        6: "Cannot encode a concept for export. It's either a version compatibility error or a bug."
217}
218
219error_messages! { InternalError
220    code: "INT", type: "Internal Error",
221    RecvError =
222        1: "Channel is closed.",
223    SendError =
224        2: "Unable to send response over callback channel (receiver dropped).",
225    UnexpectedRequestType { request_type: String } =
226        3: "Unexpected request type for remote procedure call: {request_type}. This is either a version compatibility issue or a bug.",
227    UnexpectedResponseType { response_type: String } =
228        4: "Unexpected response type for remote procedure call: {response_type}. This is either a version compatibility issue or a bug.",
229    UnknownServer { server: Address } =
230        5: "Received replica at unrecognized server: {server}.",
231    EnumOutOfBounds { value: i32, enum_name: &'static str } =
232        6: "Value '{value}' is out of bounds for enum '{enum_name}'.",
233}
234
235#[derive(Clone, PartialEq, Eq)]
236pub struct ServerError {
237    error_code: String,
238    error_domain: String,
239    message: String,
240    stack_trace: Vec<String>,
241}
242
243impl ServerError {
244    pub(crate) fn new(error_code: String, error_domain: String, message: String, stack_trace: Vec<String>) -> Self {
245        Self { error_code, error_domain, message, stack_trace }
246    }
247
248    pub(crate) fn format_code(&self) -> &str {
249        &self.error_code
250    }
251
252    pub(crate) fn message(&self) -> String {
253        self.to_string()
254    }
255
256    fn to_string(&self) -> String {
257        if self.stack_trace.is_empty() {
258            format!("[{}] {}. {}", self.error_code, self.error_domain, self.message)
259        } else {
260            format!("\n{}", self.stack_trace.join("\nCaused: "))
261        }
262    }
263}
264
265impl fmt::Display for ServerError {
266    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
267        write!(f, "{}", self.to_string())
268    }
269}
270
271impl fmt::Debug for ServerError {
272    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
273        fmt::Display::fmt(self, f)
274    }
275}
276
277/// Represents errors encountered during operation.
278#[derive(Clone, Debug, PartialEq, Eq)]
279pub enum Error {
280    Connection(ConnectionError),
281    Concept(ConceptError),
282    Migration(MigrationError),
283    Internal(InternalError),
284    Server(ServerError),
285    Other(String),
286}
287
288impl Error {
289    pub fn code(&self) -> String {
290        match self {
291            Self::Connection(error) => error.format_code(),
292            Self::Concept(error) => error.format_code(),
293            Self::Migration(error) => error.format_code(),
294            Self::Internal(error) => error.format_code(),
295            Self::Server(error) => error.format_code().to_owned(),
296            Self::Other(_error) => String::new(),
297        }
298    }
299
300    pub fn message(&self) -> String {
301        match self {
302            Self::Connection(error) => error.message(),
303            Self::Concept(error) => error.message(),
304            Self::Migration(error) => error.message(),
305            Self::Internal(error) => error.message(),
306            Self::Server(error) => error.message(),
307            Self::Other(error) => error.clone(),
308        }
309    }
310
311    fn try_extracting_connection_error(status: &Status, code: &str) -> Option<ConnectionError> {
312        // TODO: We should probably catch more connection errors instead of wrapping them into
313        // ServerErrors. However, the most valuable information even for connection is inside
314        // stacktraces now.
315        match code {
316            "AUT2" | "AUT3" => Some(ConnectionError::TokenCredentialInvalid {}),
317            _ => None,
318        }
319    }
320
321    fn from_message(message: &str) -> Self {
322        // TODO: Consider converting some of the messages to connection errors
323        Self::Other(message.to_owned())
324    }
325
326    fn parse_unavailable(status_message: &str) -> Error {
327        if status_message == "broken pipe" {
328            Error::Connection(ConnectionError::BrokenPipe)
329        } else if status_message.contains("received corrupt message") {
330            Error::Connection(ConnectionError::EncryptionSettingsMismatch)
331        } else if status_message.contains("UnknownIssuer") {
332            Error::Connection(ConnectionError::SSLCertificateNotValidated)
333        } else if status_message.contains("Connection refused") {
334            Error::Connection(ConnectionError::ConnectionFailed)
335        } else {
336            Error::Connection(ConnectionError::ServerConnectionFailedStatusError { error: status_message.to_owned() })
337        }
338    }
339}
340
341impl fmt::Display for Error {
342    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
343        match self {
344            Self::Connection(error) => write!(f, "{error}"),
345            Self::Concept(error) => write!(f, "{error}"),
346            Self::Migration(error) => write!(f, "{error}"),
347            Self::Internal(error) => write!(f, "{error}"),
348            Self::Server(error) => write!(f, "{error}"),
349            Self::Other(message) => write!(f, "{message}"),
350        }
351    }
352}
353
354impl StdError for Error {
355    fn source(&self) -> Option<&(dyn StdError + 'static)> {
356        match self {
357            Self::Connection(error) => Some(error),
358            Self::Concept(error) => Some(error),
359            Self::Migration(error) => Some(error),
360            Self::Internal(error) => Some(error),
361            Self::Server(_) => None,
362            Self::Other(_) => None,
363        }
364    }
365}
366
367impl From<ConnectionError> for Error {
368    fn from(error: ConnectionError) -> Self {
369        Self::Connection(error)
370    }
371}
372
373impl From<ConceptError> for Error {
374    fn from(error: ConceptError) -> Self {
375        Self::Concept(error)
376    }
377}
378
379impl From<InternalError> for Error {
380    fn from(error: InternalError) -> Self {
381        Self::Internal(error)
382    }
383}
384
385impl From<ServerError> for Error {
386    fn from(error: ServerError) -> Self {
387        Self::Server(error)
388    }
389}
390
391impl From<Status> for Error {
392    fn from(status: Status) -> Self {
393        if let Ok(details) = status.check_error_details() {
394            if let Some(bad_request) = details.bad_request() {
395                Self::Connection(ConnectionError::ServerConnectionFailedWithError {
396                    error: format!("{:?}", bad_request),
397                })
398            } else if let Some(error_info) = details.error_info() {
399                let code = error_info.reason.clone();
400                if let Some(connection_error) = Self::try_extracting_connection_error(&status, &code) {
401                    return Self::Connection(connection_error);
402                }
403                let domain = error_info.domain.clone();
404                let stack_trace =
405                    if let Some(debug_info) = details.debug_info() { debug_info.stack_entries.clone() } else { vec![] };
406
407                Self::Server(ServerError::new(code, domain, status.message().to_owned(), stack_trace))
408            } else {
409                Self::from_message(status.message())
410            }
411        } else {
412            if status.code() == Code::Unavailable {
413                Self::parse_unavailable(status.message())
414            } else if status.code() == Code::Unknown
415                || is_rst_stream(&status)
416                || status.code() == Code::FailedPrecondition
417                || status.code() == Code::AlreadyExists
418            {
419                Self::Connection(ConnectionError::ServerConnectionFailedStatusError {
420                    error: status.message().to_owned(),
421                })
422            } else if status.code() == Code::Unimplemented {
423                Self::Connection(ConnectionError::RPCMethodUnavailable { message: status.message().to_owned() })
424            } else {
425                Self::from_message(status.message())
426            }
427        }
428    }
429}
430
431fn is_rst_stream(status: &Status) -> bool {
432    // "Received Rst Stream" occurs if the server is in the process of shutting down.
433    status.message().contains("Received Rst Stream")
434}
435
436impl From<http::uri::InvalidUri> for Error {
437    fn from(err: http::uri::InvalidUri) -> Self {
438        Self::Other(err.to_string())
439    }
440}
441
442impl From<tonic::transport::Error> for Error {
443    fn from(err: tonic::transport::Error) -> Self {
444        Self::Other(err.to_string())
445    }
446}
447
448impl<T> From<tokio::sync::mpsc::error::SendError<T>> for Error {
449    fn from(_err: tokio::sync::mpsc::error::SendError<T>) -> Self {
450        Self::Internal(InternalError::SendError)
451    }
452}
453
454impl From<tokio::sync::oneshot::error::RecvError> for Error {
455    fn from(_err: tokio::sync::oneshot::error::RecvError) -> Self {
456        Self::Internal(InternalError::RecvError)
457    }
458}
459
460impl From<crossbeam::channel::RecvError> for Error {
461    fn from(_err: crossbeam::channel::RecvError) -> Self {
462        Self::Internal(InternalError::RecvError)
463    }
464}
465
466impl<T> From<crossbeam::channel::SendError<T>> for Error {
467    fn from(_err: crossbeam::channel::SendError<T>) -> Self {
468        Self::Internal(InternalError::SendError)
469    }
470}
471
472impl From<String> for Error {
473    fn from(err: String) -> Self {
474        Self::Other(err)
475    }
476}
477
478impl From<std::io::Error> for Error {
479    fn from(err: std::io::Error) -> Self {
480        Self::Other(err.to_string())
481    }
482}