worterbuch_common/
error.rs

1/*
2 *  Worterbuch error handling module
3 *
4 *  Copyright (C) 2024 Michael Bachmann
5 *
6 *  This program is free software: you can redistribute it and/or modify
7 *  it under the terms of the GNU Affero General Public License as published by
8 *  the Free Software Foundation, either version 3 of the License, or
9 *  (at your option) any later version.
10 *
11 *  This program is distributed in the hope that it will be useful,
12 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
13 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 *  GNU Affero General Public License for more details.
15 *
16 *  You should have received a copy of the GNU Affero General Public License
17 *  along with this program.  If not, see <https://www.gnu.org/licenses/>.
18 */
19
20use crate::{
21    AuthCheckOwned, ClientMessage, ErrorCode, Key, MetaData, Privilege, ProtocolVersionSegment,
22    RequestPattern, TransactionId, server::Err,
23};
24use http::StatusCode;
25#[cfg(feature = "ws")]
26use http::header::InvalidHeaderValue;
27use jsonwebtoken::Algorithm;
28use miette::Diagnostic;
29use opentelemetry_otlp::ExporterBuildError;
30use std::{fmt, io, net::AddrParseError, num::ParseIntError};
31use thiserror::Error;
32use tokio::sync::{
33    broadcast,
34    mpsc::{self, error::SendError},
35    oneshot,
36};
37use uuid::Uuid;
38
39#[derive(Debug, Error, Diagnostic)]
40pub enum ConfigError {
41    #[error("invalid separator: {0}; separator must be a single ASCII char")]
42    InvalidSeparator(String),
43    #[error("invalid wildcard: {0}; wildcard must be a single ASCII char")]
44    InvalidWildcard(String),
45    #[error("invalid multi-wildcard: {0}; multi-wildcard must be a single ASCII char")]
46    InvalidMultiWildcard(String),
47    #[error("invalid port: {0}")]
48    InvalidPort(ParseIntError),
49    #[error("invalid address: {0}")]
50    InvalidAddr(AddrParseError),
51    #[error("invalid interval: {0}")]
52    InvalidInterval(ParseIntError),
53    #[error("license file could not be loaded: {0}")]
54    InvalidLicense(String),
55    #[error("could not load config file: {0}")]
56    IoError(#[from] io::Error),
57    #[error("could not load config file: {0}")]
58    YamlError(#[from] serde_yaml::Error),
59    #[error("error setting up telemetry: {0}")]
60    ExporterBuildError(#[from] ExporterBuildError),
61    #[error("Parse error: {0}")]
62    ParseError(#[from] serde_json::Error),
63}
64
65pub trait ConfigIntContext<I> {
66    fn to_port(self) -> Result<I, ConfigError>;
67    fn to_interval(self) -> Result<I, ConfigError>;
68}
69
70impl<I> ConfigIntContext<I> for Result<I, ParseIntError> {
71    fn to_port(self) -> Result<I, ConfigError> {
72        self.map_err(ConfigError::InvalidPort)
73    }
74    fn to_interval(self) -> Result<I, ConfigError> {
75        self.map_err(ConfigError::InvalidInterval)
76    }
77}
78
79impl From<AddrParseError> for ConfigError {
80    fn from(e: AddrParseError) -> Self {
81        ConfigError::InvalidAddr(e)
82    }
83}
84
85pub type ConfigResult<T> = std::result::Result<T, ConfigError>;
86
87pub trait Context<T, E: std::error::Error> {
88    fn context(self, metadata: impl FnOnce() -> String) -> Result<T, WorterbuchError>;
89}
90#[derive(Debug, Clone, Error, Diagnostic)]
91pub enum AuthorizationError {
92    #[error("Client lacks privilege '{0} {1}'")]
93    InsufficientPrivileges(Privilege, AuthCheckOwned),
94    #[error("No JWT was included in the request")]
95    MissingToken,
96    #[error("No JWT was configured")]
97    MissingSecret,
98    // TODO should this not trigger a crash?
99    #[error("Incorrect check provided. This is a bug.")]
100    InvalidCheck,
101    #[error("Unsupported encryption algorith: {0:?}")]
102    UnsupportedEncryptionAlgorithm(Algorithm),
103    #[error("JWT error: {0}")]
104    JwtError(#[from] jsonwebtoken::errors::Error),
105}
106
107pub type AuthorizationResult<T> = Result<T, AuthorizationError>;
108
109#[derive(Debug, Diagnostic, thiserror::Error)]
110pub enum WorterbuchError {
111    IllegalWildcard(RequestPattern),
112    IllegalMultiWildcard(RequestPattern),
113    MultiWildcardAtIllegalPosition(RequestPattern),
114    NoSuchValue(Key),
115    NotSubscribed,
116    IoError(io::Error, MetaData),
117    SerDeError(serde_json::Error, MetaData),
118    InvalidServerResponse(MetaData),
119    Other(Box<dyn std::error::Error + Send + Sync>, MetaData),
120    ServerResponse(Err),
121    ProtocolNegotiationFailed(ProtocolVersionSegment),
122    ReadOnlyKey(Key),
123    AuthorizationRequired(Privilege),
124    AlreadyAuthorized,
125    Unauthorized(#[from] AuthorizationError),
126    NoPubStream(TransactionId),
127    NotLeader,
128    Cas,
129    CasVersionMismatch,
130    NotImplemented,
131    KeyIsLocked(Key),
132    KeyIsNotLocked(Key),
133    FeatureDisabled(MetaData),
134    ClientIdCollision(Uuid),
135}
136
137impl fmt::Display for WorterbuchError {
138    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
139        match self {
140            WorterbuchError::IllegalWildcard(rp) => {
141                write!(f, "Key contains illegal wildcard: {rp}")
142            }
143            WorterbuchError::IllegalMultiWildcard(rp) => {
144                write!(f, "Key contains illegal multi-wildcard: {rp}")
145            }
146            WorterbuchError::MultiWildcardAtIllegalPosition(rp) => {
147                write!(f, "Key contains multi-wildcard at illegal position: {rp}")
148            }
149            WorterbuchError::NoSuchValue(key) => write!(f, "no value for key '{key}'"),
150            WorterbuchError::NotSubscribed => write!(f, "no such subscription"),
151            WorterbuchError::IoError(e, meta) => write!(f, "{meta}: {e}"),
152            WorterbuchError::SerDeError(e, meta) => write!(f, "{meta}: {e}"),
153            WorterbuchError::Other(e, meta) => write!(f, "{meta}: {e}"),
154            WorterbuchError::ServerResponse(e) => {
155                write!(f, "error {}: {}", e.error_code, e.metadata)
156            }
157            WorterbuchError::ProtocolNegotiationFailed(v) => {
158                write!(f, "The server does not implement protocol version {v}")
159            }
160            WorterbuchError::InvalidServerResponse(meta) => write!(
161                f,
162                "The server sent a response that is not valid for the issued request: {meta}"
163            ),
164            WorterbuchError::ReadOnlyKey(key) => {
165                write!(f, "Tried to modify a read only key: {key}")
166            }
167            WorterbuchError::AuthorizationRequired(op) => {
168                write!(f, "Operation {op} requires authorization")
169            }
170            WorterbuchError::AlreadyAuthorized => {
171                write!(f, "Handshake already done")
172            }
173            WorterbuchError::Unauthorized(err) => err.fmt(f),
174            WorterbuchError::NoPubStream(tid) => {
175                write!(
176                    f,
177                    "There is no active publish stream for transaction ID {tid}"
178                )
179            }
180            WorterbuchError::NotLeader => {
181                write!(
182                    f,
183                    "Node cannot process the request since it is not the current cluster leader"
184                )
185            }
186            WorterbuchError::Cas => {
187                write!(
188                    f,
189                    "Tried to modify a compare-and-swap value without providing a version number"
190                )
191            }
192            WorterbuchError::CasVersionMismatch => {
193                write!(
194                    f,
195                    "Tried to modify a compare-and-swap value with an out-of-sync version number"
196                )
197            }
198            WorterbuchError::NotImplemented => {
199                write!(
200                    f,
201                    "This function is not implemented in the negotiated protocol version",
202                )
203            }
204            WorterbuchError::KeyIsLocked(key) => {
205                write!(f, "Key {key} is locked by another client",)
206            }
207            WorterbuchError::KeyIsNotLocked(key) => {
208                write!(f, "Key {key} is not locked",)
209            }
210            WorterbuchError::FeatureDisabled(m) => m.fmt(f),
211            WorterbuchError::ClientIdCollision(id) => {
212                write!(f, "Client ID collision: {id} already exists",)
213            }
214        }
215    }
216}
217
218impl<T> Context<T, io::Error> for Result<T, io::Error> {
219    fn context(self, metadata: impl FnOnce() -> String) -> Result<T, WorterbuchError> {
220        self.map_err(|e| WorterbuchError::IoError(e, metadata()))
221    }
222}
223
224impl<T> Context<T, serde_json::Error> for Result<T, serde_json::Error> {
225    fn context(self, metadata: impl FnOnce() -> String) -> Result<T, WorterbuchError> {
226        self.map_err(|e| WorterbuchError::SerDeError(e, metadata()))
227    }
228}
229
230impl<T, V: fmt::Debug + 'static + Send + Sync> Context<T, SendError<V>>
231    for Result<T, SendError<V>>
232{
233    fn context(self, metadata: impl FnOnce() -> String) -> Result<T, WorterbuchError> {
234        self.map_err(|e| WorterbuchError::Other(Box::new(e), metadata()))
235    }
236}
237
238impl<T: Send + Sync + 'static> From<mpsc::error::SendError<T>> for WorterbuchError {
239    fn from(value: mpsc::error::SendError<T>) -> Self {
240        WorterbuchError::Other(Box::new(value), "Internal server error".to_owned())
241    }
242}
243
244impl From<oneshot::error::RecvError> for WorterbuchError {
245    fn from(value: oneshot::error::RecvError) -> Self {
246        WorterbuchError::Other(Box::new(value), "Internal server error".to_owned())
247    }
248}
249
250pub type WorterbuchResult<T> = std::result::Result<T, WorterbuchError>;
251
252#[derive(Debug, Diagnostic)]
253pub enum ConnectionError {
254    IoError(Box<io::Error>),
255    SendError(Box<dyn std::error::Error + Send + Sync>),
256    #[cfg(feature = "ws")]
257    WebsocketError(Box<tungstenite::Error>),
258    #[cfg(feature = "wasm")]
259    WebsocketError(Box<tokio_tungstenite_wasm::Error>),
260    TrySendError(Box<dyn std::error::Error + Send + Sync>),
261    RecvError(Box<oneshot::error::RecvError>),
262    BcRecvError(Box<broadcast::error::RecvError>),
263    WorterbuchError(Box<WorterbuchError>),
264    ConfigError(Box<ConfigError>),
265    SerdeError(Box<serde_json::Error>),
266    AckError(Box<broadcast::error::SendError<u64>>),
267    Timeout(Box<String>),
268    #[cfg(feature = "ws")]
269    HttpError(Box<tungstenite::http::Error>),
270    AuthorizationError(Box<String>),
271    NoServerAddressesConfigured,
272    ServerResponse(Box<Err>),
273    #[cfg(feature = "ws")]
274    InvalidHeaderValue(Box<InvalidHeaderValue>),
275}
276
277impl std::error::Error for ConnectionError {}
278
279impl fmt::Display for ConnectionError {
280    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
281        match self {
282            Self::IoError(e) => e.fmt(f),
283            Self::SendError(e) => e.fmt(f),
284            #[cfg(any(feature = "ws", feature = "wasm"))]
285            Self::WebsocketError(e) => e.fmt(f),
286            Self::TrySendError(e) => e.fmt(f),
287            Self::RecvError(e) => e.fmt(f),
288            Self::BcRecvError(e) => e.fmt(f),
289            Self::WorterbuchError(e) => e.fmt(f),
290            Self::ConfigError(e) => e.fmt(f),
291            Self::SerdeError(e) => e.fmt(f),
292            Self::AckError(e) => e.fmt(f),
293            Self::Timeout(msg) => msg.fmt(f),
294            #[cfg(feature = "ws")]
295            Self::HttpError(e) => e.fmt(f),
296            Self::AuthorizationError(msg) => msg.fmt(f),
297            Self::NoServerAddressesConfigured => {
298                fmt::Display::fmt("no server addresses configured", f)
299            }
300            Self::ServerResponse(e) => e.fmt(f),
301            #[cfg(feature = "ws")]
302            Self::InvalidHeaderValue(e) => e.fmt(f),
303        }
304    }
305}
306
307pub type ConnectionResult<T> = std::result::Result<T, ConnectionError>;
308
309impl From<Err> for ConnectionError {
310    fn from(value: Err) -> Self {
311        ConnectionError::ServerResponse(Box::new(value))
312    }
313}
314
315impl From<io::Error> for ConnectionError {
316    fn from(e: io::Error) -> Self {
317        ConnectionError::IoError(Box::new(e))
318    }
319}
320
321impl<T: fmt::Debug + 'static + Send + Sync> From<SendError<T>> for ConnectionError {
322    fn from(e: SendError<T>) -> Self {
323        ConnectionError::SendError(Box::new(e))
324    }
325}
326
#[cfg(feature = "ws")]
impl From<tungstenite::Error> for ConnectionError {
    /// Boxes a `tungstenite` error into [`ConnectionError::WebsocketError`].
    fn from(cause: tungstenite::Error) -> Self {
        Self::WebsocketError(Box::new(cause))
    }
}
333
#[cfg(feature = "wasm")]
impl From<tokio_tungstenite_wasm::Error> for ConnectionError {
    /// Boxes a `tokio_tungstenite_wasm` error into
    /// [`ConnectionError::WebsocketError`].
    fn from(cause: tokio_tungstenite_wasm::Error) -> Self {
        Self::WebsocketError(Box::new(cause))
    }
}
340
341impl From<oneshot::error::RecvError> for ConnectionError {
342    fn from(e: oneshot::error::RecvError) -> Self {
343        ConnectionError::RecvError(Box::new(e))
344    }
345}
346
347impl From<broadcast::error::RecvError> for ConnectionError {
348    fn from(e: broadcast::error::RecvError) -> Self {
349        ConnectionError::BcRecvError(Box::new(e))
350    }
351}
352
353impl From<ConfigError> for ConnectionError {
354    fn from(e: ConfigError) -> Self {
355        ConnectionError::ConfigError(Box::new(e))
356    }
357}
358
359impl From<serde_json::Error> for ConnectionError {
360    fn from(e: serde_json::Error) -> Self {
361        Self::SerdeError(Box::new(e))
362    }
363}
364
365impl From<broadcast::error::SendError<u64>> for ConnectionError {
366    fn from(e: broadcast::error::SendError<u64>) -> Self {
367        Self::AckError(Box::new(e))
368    }
369}
370
371impl From<mpsc::error::TrySendError<ClientMessage>> for ConnectionError {
372    fn from(e: mpsc::error::TrySendError<ClientMessage>) -> Self {
373        Self::TrySendError(Box::new(Box::new(e)))
374    }
375}
376
#[cfg(feature = "ws")]
impl From<tungstenite::http::Error> for ConnectionError {
    /// Boxes an HTTP error into [`ConnectionError::HttpError`].
    fn from(cause: tungstenite::http::Error) -> Self {
        ConnectionError::HttpError(Box::new(cause))
    }
}
383
#[cfg(feature = "ws")]
impl From<InvalidHeaderValue> for ConnectionError {
    /// Boxes an invalid header value error into
    /// [`ConnectionError::InvalidHeaderValue`].
    fn from(cause: InvalidHeaderValue) -> Self {
        ConnectionError::InvalidHeaderValue(Box::new(cause))
    }
}
390
391impl From<&WorterbuchError> for ErrorCode {
392    fn from(e: &WorterbuchError) -> Self {
393        match e {
394            WorterbuchError::IllegalWildcard(_) => ErrorCode::IllegalWildcard,
395            WorterbuchError::IllegalMultiWildcard(_) => ErrorCode::IllegalMultiWildcard,
396            WorterbuchError::MultiWildcardAtIllegalPosition(_) => {
397                ErrorCode::MultiWildcardAtIllegalPosition
398            }
399            WorterbuchError::NoSuchValue(_) => ErrorCode::NoSuchValue,
400            WorterbuchError::NotSubscribed => ErrorCode::NotSubscribed,
401            WorterbuchError::IoError(_, _) => ErrorCode::IoError,
402            WorterbuchError::SerDeError(_, _) => ErrorCode::SerdeError,
403            WorterbuchError::ProtocolNegotiationFailed(_) => ErrorCode::ProtocolNegotiationFailed,
404            WorterbuchError::InvalidServerResponse(_) => ErrorCode::InvalidServerResponse,
405            WorterbuchError::ReadOnlyKey(_) => ErrorCode::ReadOnlyKey,
406            WorterbuchError::AuthorizationRequired(_) => ErrorCode::AuthorizationRequired,
407            WorterbuchError::AlreadyAuthorized => ErrorCode::AlreadyAuthorized,
408            WorterbuchError::Unauthorized(_) => ErrorCode::Unauthorized,
409            WorterbuchError::NoPubStream(_) => ErrorCode::NoPubStream,
410            WorterbuchError::NotLeader => ErrorCode::NotLeader,
411            WorterbuchError::Cas => ErrorCode::Cas,
412            WorterbuchError::CasVersionMismatch => ErrorCode::CasVersionMismatch,
413            WorterbuchError::NotImplemented => ErrorCode::NotImplemented,
414            WorterbuchError::KeyIsLocked(_) => ErrorCode::KeyIsLocked,
415            WorterbuchError::KeyIsNotLocked(_) => ErrorCode::KeyIsNotLocked,
416            WorterbuchError::FeatureDisabled(_) => ErrorCode::KeyIsNotLocked,
417            WorterbuchError::ClientIdCollision(_) => ErrorCode::ClientIDCollision,
418            WorterbuchError::Other(_, _) | WorterbuchError::ServerResponse(_) => ErrorCode::Other,
419        }
420    }
421}
422
423impl From<WorterbuchError> for (StatusCode, String) {
424    fn from(e: WorterbuchError) -> Self {
425        match &e {
426            WorterbuchError::IllegalMultiWildcard(_)
427            | WorterbuchError::IllegalWildcard(_)
428            | WorterbuchError::MultiWildcardAtIllegalPosition(_)
429            | WorterbuchError::NotImplemented
430            | WorterbuchError::KeyIsNotLocked(_) => (StatusCode::BAD_REQUEST, e.to_string()),
431
432            WorterbuchError::AlreadyAuthorized
433            | WorterbuchError::NotSubscribed
434            | WorterbuchError::FeatureDisabled(_)
435            | WorterbuchError::NoPubStream(_) => (StatusCode::UNPROCESSABLE_ENTITY, e.to_string()),
436
437            WorterbuchError::KeyIsLocked(_)
438            | WorterbuchError::Cas
439            | WorterbuchError::CasVersionMismatch => (StatusCode::CONFLICT, e.to_string()),
440
441            WorterbuchError::ReadOnlyKey(_) => (StatusCode::METHOD_NOT_ALLOWED, e.to_string()),
442
443            WorterbuchError::AuthorizationRequired(_) => (StatusCode::UNAUTHORIZED, e.to_string()),
444
445            WorterbuchError::NoSuchValue(_) => (StatusCode::NOT_FOUND, e.to_string()),
446
447            WorterbuchError::Unauthorized(ae) => match &ae {
448                AuthorizationError::MissingToken => (StatusCode::UNAUTHORIZED, e.to_string()),
449                _ => (StatusCode::FORBIDDEN, e.to_string()),
450            },
451
452            WorterbuchError::IoError(_, _)
453            | WorterbuchError::SerDeError(_, _)
454            | WorterbuchError::InvalidServerResponse(_)
455            | WorterbuchError::Other(_, _)
456            | WorterbuchError::ServerResponse(_)
457            | WorterbuchError::ProtocolNegotiationFailed(_)
458            | WorterbuchError::ClientIdCollision(_) => {
459                (StatusCode::INTERNAL_SERVER_ERROR, e.to_string())
460            }
461
462            WorterbuchError::NotLeader => (StatusCode::NO_CONTENT, e.to_string()),
463        }
464    }
465}
466
/// `axum` response conversions for this crate's error types, compiled only
/// with the `axum-errors` feature.
#[cfg(feature = "axum-errors")]
pub mod axum {
    use crate::error::{AuthorizationError, WorterbuchError};
    use axum::response::{IntoResponse, Response};
    use http::StatusCode;

    impl IntoResponse for WorterbuchError {
        /// Responds with the status code and message produced by the
        /// `(StatusCode, String)` conversion of the error.
        fn into_response(self) -> Response {
            <(StatusCode, String)>::from(self).into_response()
        }
    }

    impl IntoResponse for AuthorizationError {
        /// Converts into a [`WorterbuchError`] first and responds the same
        /// way that type does.
        fn into_response(self) -> Response {
            WorterbuchError::from(self).into_response()
        }
    }
}