1use crate::{
21 AuthCheckOwned, ClientMessage, ErrorCode, Key, MetaData, Privilege, ProtocolVersionSegment,
22 RequestPattern, TransactionId, server::Err,
23};
24use http::StatusCode;
25#[cfg(feature = "ws")]
26use http::header::InvalidHeaderValue;
27use jsonwebtoken::Algorithm;
28use miette::Diagnostic;
29use opentelemetry_otlp::ExporterBuildError;
30use std::{fmt, io, net::AddrParseError, num::ParseIntError};
31use thiserror::Error;
32use tokio::sync::{
33 broadcast,
34 mpsc::{self, error::SendError},
35 oneshot,
36};
37use uuid::Uuid;
38
39#[derive(Debug, Error, Diagnostic)]
40pub enum ConfigError {
41 #[error("invalid separator: {0}; separator must be a single ASCII char")]
42 InvalidSeparator(String),
43 #[error("invalid wildcard: {0}; wildcard must be a single ASCII char")]
44 InvalidWildcard(String),
45 #[error("invalid multi-wildcard: {0}; multi-wildcard must be a single ASCII char")]
46 InvalidMultiWildcard(String),
47 #[error("invalid port: {0}")]
48 InvalidPort(ParseIntError),
49 #[error("invalid address: {0}")]
50 InvalidAddr(AddrParseError),
51 #[error("invalid interval: {0}")]
52 InvalidInterval(ParseIntError),
53 #[error("license file could not be loaded: {0}")]
54 InvalidLicense(String),
55 #[error("could not load config file: {0}")]
56 IoError(#[from] io::Error),
57 #[error("could not load config file: {0}")]
58 YamlError(#[from] serde_yaml::Error),
59 #[error("error setting up telemetry: {0}")]
60 ExporterBuildError(#[from] ExporterBuildError),
61 #[error("Parse error: {0}")]
62 ParseError(#[from] serde_json::Error),
63}
64
65pub trait ConfigIntContext<I> {
66 fn to_port(self) -> Result<I, ConfigError>;
67 fn to_interval(self) -> Result<I, ConfigError>;
68}
69
70impl<I> ConfigIntContext<I> for Result<I, ParseIntError> {
71 fn to_port(self) -> Result<I, ConfigError> {
72 self.map_err(ConfigError::InvalidPort)
73 }
74 fn to_interval(self) -> Result<I, ConfigError> {
75 self.map_err(ConfigError::InvalidInterval)
76 }
77}
78
79impl From<AddrParseError> for ConfigError {
80 fn from(e: AddrParseError) -> Self {
81 ConfigError::InvalidAddr(e)
82 }
83}
84
85pub type ConfigResult<T> = std::result::Result<T, ConfigError>;
86
87pub trait Context<T, E: std::error::Error> {
88 fn context(self, metadata: impl FnOnce() -> String) -> Result<T, WorterbuchError>;
89}
90#[derive(Debug, Clone, Error, Diagnostic)]
91pub enum AuthorizationError {
92 #[error("Client lacks privilege '{0} {1}'")]
93 InsufficientPrivileges(Privilege, AuthCheckOwned),
94 #[error("No JWT was included in the request")]
95 MissingToken,
96 #[error("No JWT was configured")]
97 MissingSecret,
98 #[error("Incorrect check provided. This is a bug.")]
100 InvalidCheck,
101 #[error("Unsupported encryption algorith: {0:?}")]
102 UnsupportedEncryptionAlgorithm(Algorithm),
103 #[error("JWT error: {0}")]
104 JwtError(#[from] jsonwebtoken::errors::Error),
105}
106
107pub type AuthorizationResult<T> = Result<T, AuthorizationError>;
108
109#[derive(Debug, Diagnostic, thiserror::Error)]
110pub enum WorterbuchError {
111 IllegalWildcard(RequestPattern),
112 IllegalMultiWildcard(RequestPattern),
113 MultiWildcardAtIllegalPosition(RequestPattern),
114 NoSuchValue(Key),
115 NotSubscribed,
116 IoError(io::Error, MetaData),
117 SerDeError(serde_json::Error, MetaData),
118 InvalidServerResponse(MetaData),
119 Other(Box<dyn std::error::Error + Send + Sync>, MetaData),
120 ServerResponse(Err),
121 ProtocolNegotiationFailed(ProtocolVersionSegment),
122 ReadOnlyKey(Key),
123 AuthorizationRequired(Privilege),
124 AlreadyAuthorized,
125 Unauthorized(#[from] AuthorizationError),
126 NoPubStream(TransactionId),
127 NotLeader,
128 Cas,
129 CasVersionMismatch,
130 NotImplemented,
131 KeyIsLocked(Key),
132 KeyIsNotLocked(Key),
133 FeatureDisabled(MetaData),
134 ClientIdCollision(Uuid),
135}
136
137impl fmt::Display for WorterbuchError {
138 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
139 match self {
140 WorterbuchError::IllegalWildcard(rp) => {
141 write!(f, "Key contains illegal wildcard: {rp}")
142 }
143 WorterbuchError::IllegalMultiWildcard(rp) => {
144 write!(f, "Key contains illegal multi-wildcard: {rp}")
145 }
146 WorterbuchError::MultiWildcardAtIllegalPosition(rp) => {
147 write!(f, "Key contains multi-wildcard at illegal position: {rp}")
148 }
149 WorterbuchError::NoSuchValue(key) => write!(f, "no value for key '{key}'"),
150 WorterbuchError::NotSubscribed => write!(f, "no such subscription"),
151 WorterbuchError::IoError(e, meta) => write!(f, "{meta}: {e}"),
152 WorterbuchError::SerDeError(e, meta) => write!(f, "{meta}: {e}"),
153 WorterbuchError::Other(e, meta) => write!(f, "{meta}: {e}"),
154 WorterbuchError::ServerResponse(e) => {
155 write!(f, "error {}: {}", e.error_code, e.metadata)
156 }
157 WorterbuchError::ProtocolNegotiationFailed(v) => {
158 write!(f, "The server does not implement protocol version {v}")
159 }
160 WorterbuchError::InvalidServerResponse(meta) => write!(
161 f,
162 "The server sent a response that is not valid for the issued request: {meta}"
163 ),
164 WorterbuchError::ReadOnlyKey(key) => {
165 write!(f, "Tried to modify a read only key: {key}")
166 }
167 WorterbuchError::AuthorizationRequired(op) => {
168 write!(f, "Operation {op} requires authorization")
169 }
170 WorterbuchError::AlreadyAuthorized => {
171 write!(f, "Handshake already done")
172 }
173 WorterbuchError::Unauthorized(err) => err.fmt(f),
174 WorterbuchError::NoPubStream(tid) => {
175 write!(
176 f,
177 "There is no active publish stream for transaction ID {tid}"
178 )
179 }
180 WorterbuchError::NotLeader => {
181 write!(
182 f,
183 "Node cannot process the request since it is not the current cluster leader"
184 )
185 }
186 WorterbuchError::Cas => {
187 write!(
188 f,
189 "Tried to modify a compare-and-swap value without providing a version number"
190 )
191 }
192 WorterbuchError::CasVersionMismatch => {
193 write!(
194 f,
195 "Tried to modify a compare-and-swap value with an out-of-sync version number"
196 )
197 }
198 WorterbuchError::NotImplemented => {
199 write!(
200 f,
201 "This function is not implemented in the negotiated protocol version",
202 )
203 }
204 WorterbuchError::KeyIsLocked(key) => {
205 write!(f, "Key {key} is locked by another client",)
206 }
207 WorterbuchError::KeyIsNotLocked(key) => {
208 write!(f, "Key {key} is not locked",)
209 }
210 WorterbuchError::FeatureDisabled(m) => m.fmt(f),
211 WorterbuchError::ClientIdCollision(id) => {
212 write!(f, "Client ID collision: {id} already exists",)
213 }
214 }
215 }
216}
217
218impl<T> Context<T, io::Error> for Result<T, io::Error> {
219 fn context(self, metadata: impl FnOnce() -> String) -> Result<T, WorterbuchError> {
220 self.map_err(|e| WorterbuchError::IoError(e, metadata()))
221 }
222}
223
224impl<T> Context<T, serde_json::Error> for Result<T, serde_json::Error> {
225 fn context(self, metadata: impl FnOnce() -> String) -> Result<T, WorterbuchError> {
226 self.map_err(|e| WorterbuchError::SerDeError(e, metadata()))
227 }
228}
229
230impl<T, V: fmt::Debug + 'static + Send + Sync> Context<T, SendError<V>>
231 for Result<T, SendError<V>>
232{
233 fn context(self, metadata: impl FnOnce() -> String) -> Result<T, WorterbuchError> {
234 self.map_err(|e| WorterbuchError::Other(Box::new(e), metadata()))
235 }
236}
237
238impl<T: Send + Sync + 'static> From<mpsc::error::SendError<T>> for WorterbuchError {
239 fn from(value: mpsc::error::SendError<T>) -> Self {
240 WorterbuchError::Other(Box::new(value), "Internal server error".to_owned())
241 }
242}
243
244impl From<oneshot::error::RecvError> for WorterbuchError {
245 fn from(value: oneshot::error::RecvError) -> Self {
246 WorterbuchError::Other(Box::new(value), "Internal server error".to_owned())
247 }
248}
249
250pub type WorterbuchResult<T> = std::result::Result<T, WorterbuchError>;
251
252#[derive(Debug, Diagnostic)]
253pub enum ConnectionError {
254 IoError(Box<io::Error>),
255 SendError(Box<dyn std::error::Error + Send + Sync>),
256 #[cfg(feature = "ws")]
257 WebsocketError(Box<tungstenite::Error>),
258 #[cfg(feature = "wasm")]
259 WebsocketError(Box<tokio_tungstenite_wasm::Error>),
260 TrySendError(Box<dyn std::error::Error + Send + Sync>),
261 RecvError(Box<oneshot::error::RecvError>),
262 BcRecvError(Box<broadcast::error::RecvError>),
263 WorterbuchError(Box<WorterbuchError>),
264 ConfigError(Box<ConfigError>),
265 SerdeError(Box<serde_json::Error>),
266 AckError(Box<broadcast::error::SendError<u64>>),
267 Timeout(Box<String>),
268 #[cfg(feature = "ws")]
269 HttpError(Box<tungstenite::http::Error>),
270 AuthorizationError(Box<String>),
271 NoServerAddressesConfigured,
272 ServerResponse(Box<Err>),
273 #[cfg(feature = "ws")]
274 InvalidHeaderValue(Box<InvalidHeaderValue>),
275}
276
277impl std::error::Error for ConnectionError {}
278
279impl fmt::Display for ConnectionError {
280 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
281 match self {
282 Self::IoError(e) => e.fmt(f),
283 Self::SendError(e) => e.fmt(f),
284 #[cfg(any(feature = "ws", feature = "wasm"))]
285 Self::WebsocketError(e) => e.fmt(f),
286 Self::TrySendError(e) => e.fmt(f),
287 Self::RecvError(e) => e.fmt(f),
288 Self::BcRecvError(e) => e.fmt(f),
289 Self::WorterbuchError(e) => e.fmt(f),
290 Self::ConfigError(e) => e.fmt(f),
291 Self::SerdeError(e) => e.fmt(f),
292 Self::AckError(e) => e.fmt(f),
293 Self::Timeout(msg) => msg.fmt(f),
294 #[cfg(feature = "ws")]
295 Self::HttpError(e) => e.fmt(f),
296 Self::AuthorizationError(msg) => msg.fmt(f),
297 Self::NoServerAddressesConfigured => {
298 fmt::Display::fmt("no server addresses configured", f)
299 }
300 Self::ServerResponse(e) => e.fmt(f),
301 #[cfg(feature = "ws")]
302 Self::InvalidHeaderValue(e) => e.fmt(f),
303 }
304 }
305}
306
307pub type ConnectionResult<T> = std::result::Result<T, ConnectionError>;
308
309impl From<Err> for ConnectionError {
310 fn from(value: Err) -> Self {
311 ConnectionError::ServerResponse(Box::new(value))
312 }
313}
314
315impl From<io::Error> for ConnectionError {
316 fn from(e: io::Error) -> Self {
317 ConnectionError::IoError(Box::new(e))
318 }
319}
320
321impl<T: fmt::Debug + 'static + Send + Sync> From<SendError<T>> for ConnectionError {
322 fn from(e: SendError<T>) -> Self {
323 ConnectionError::SendError(Box::new(e))
324 }
325}
326
327#[cfg(feature = "ws")]
328impl From<tungstenite::Error> for ConnectionError {
329 fn from(e: tungstenite::Error) -> Self {
330 ConnectionError::WebsocketError(Box::new(e))
331 }
332}
333
334#[cfg(feature = "wasm")]
335impl From<tokio_tungstenite_wasm::Error> for ConnectionError {
336 fn from(e: tokio_tungstenite_wasm::Error) -> Self {
337 ConnectionError::WebsocketError(Box::new(e))
338 }
339}
340
341impl From<oneshot::error::RecvError> for ConnectionError {
342 fn from(e: oneshot::error::RecvError) -> Self {
343 ConnectionError::RecvError(Box::new(e))
344 }
345}
346
347impl From<broadcast::error::RecvError> for ConnectionError {
348 fn from(e: broadcast::error::RecvError) -> Self {
349 ConnectionError::BcRecvError(Box::new(e))
350 }
351}
352
353impl From<ConfigError> for ConnectionError {
354 fn from(e: ConfigError) -> Self {
355 ConnectionError::ConfigError(Box::new(e))
356 }
357}
358
359impl From<serde_json::Error> for ConnectionError {
360 fn from(e: serde_json::Error) -> Self {
361 Self::SerdeError(Box::new(e))
362 }
363}
364
365impl From<broadcast::error::SendError<u64>> for ConnectionError {
366 fn from(e: broadcast::error::SendError<u64>) -> Self {
367 Self::AckError(Box::new(e))
368 }
369}
370
371impl From<mpsc::error::TrySendError<ClientMessage>> for ConnectionError {
372 fn from(e: mpsc::error::TrySendError<ClientMessage>) -> Self {
373 Self::TrySendError(Box::new(Box::new(e)))
374 }
375}
376
377#[cfg(feature = "ws")]
378impl From<tungstenite::http::Error> for ConnectionError {
379 fn from(e: tungstenite::http::Error) -> Self {
380 Self::HttpError(Box::new(e))
381 }
382}
383
384#[cfg(feature = "ws")]
385impl From<InvalidHeaderValue> for ConnectionError {
386 fn from(e: InvalidHeaderValue) -> Self {
387 Self::InvalidHeaderValue(Box::new(e))
388 }
389}
390
391impl From<&WorterbuchError> for ErrorCode {
392 fn from(e: &WorterbuchError) -> Self {
393 match e {
394 WorterbuchError::IllegalWildcard(_) => ErrorCode::IllegalWildcard,
395 WorterbuchError::IllegalMultiWildcard(_) => ErrorCode::IllegalMultiWildcard,
396 WorterbuchError::MultiWildcardAtIllegalPosition(_) => {
397 ErrorCode::MultiWildcardAtIllegalPosition
398 }
399 WorterbuchError::NoSuchValue(_) => ErrorCode::NoSuchValue,
400 WorterbuchError::NotSubscribed => ErrorCode::NotSubscribed,
401 WorterbuchError::IoError(_, _) => ErrorCode::IoError,
402 WorterbuchError::SerDeError(_, _) => ErrorCode::SerdeError,
403 WorterbuchError::ProtocolNegotiationFailed(_) => ErrorCode::ProtocolNegotiationFailed,
404 WorterbuchError::InvalidServerResponse(_) => ErrorCode::InvalidServerResponse,
405 WorterbuchError::ReadOnlyKey(_) => ErrorCode::ReadOnlyKey,
406 WorterbuchError::AuthorizationRequired(_) => ErrorCode::AuthorizationRequired,
407 WorterbuchError::AlreadyAuthorized => ErrorCode::AlreadyAuthorized,
408 WorterbuchError::Unauthorized(_) => ErrorCode::Unauthorized,
409 WorterbuchError::NoPubStream(_) => ErrorCode::NoPubStream,
410 WorterbuchError::NotLeader => ErrorCode::NotLeader,
411 WorterbuchError::Cas => ErrorCode::Cas,
412 WorterbuchError::CasVersionMismatch => ErrorCode::CasVersionMismatch,
413 WorterbuchError::NotImplemented => ErrorCode::NotImplemented,
414 WorterbuchError::KeyIsLocked(_) => ErrorCode::KeyIsLocked,
415 WorterbuchError::KeyIsNotLocked(_) => ErrorCode::KeyIsNotLocked,
416 WorterbuchError::FeatureDisabled(_) => ErrorCode::KeyIsNotLocked,
417 WorterbuchError::ClientIdCollision(_) => ErrorCode::ClientIDCollision,
418 WorterbuchError::Other(_, _) | WorterbuchError::ServerResponse(_) => ErrorCode::Other,
419 }
420 }
421}
422
423impl From<WorterbuchError> for (StatusCode, String) {
424 fn from(e: WorterbuchError) -> Self {
425 match &e {
426 WorterbuchError::IllegalMultiWildcard(_)
427 | WorterbuchError::IllegalWildcard(_)
428 | WorterbuchError::MultiWildcardAtIllegalPosition(_)
429 | WorterbuchError::NotImplemented
430 | WorterbuchError::KeyIsNotLocked(_) => (StatusCode::BAD_REQUEST, e.to_string()),
431
432 WorterbuchError::AlreadyAuthorized
433 | WorterbuchError::NotSubscribed
434 | WorterbuchError::FeatureDisabled(_)
435 | WorterbuchError::NoPubStream(_) => (StatusCode::UNPROCESSABLE_ENTITY, e.to_string()),
436
437 WorterbuchError::KeyIsLocked(_)
438 | WorterbuchError::Cas
439 | WorterbuchError::CasVersionMismatch => (StatusCode::CONFLICT, e.to_string()),
440
441 WorterbuchError::ReadOnlyKey(_) => (StatusCode::METHOD_NOT_ALLOWED, e.to_string()),
442
443 WorterbuchError::AuthorizationRequired(_) => (StatusCode::UNAUTHORIZED, e.to_string()),
444
445 WorterbuchError::NoSuchValue(_) => (StatusCode::NOT_FOUND, e.to_string()),
446
447 WorterbuchError::Unauthorized(ae) => match &ae {
448 AuthorizationError::MissingToken => (StatusCode::UNAUTHORIZED, e.to_string()),
449 _ => (StatusCode::FORBIDDEN, e.to_string()),
450 },
451
452 WorterbuchError::IoError(_, _)
453 | WorterbuchError::SerDeError(_, _)
454 | WorterbuchError::InvalidServerResponse(_)
455 | WorterbuchError::Other(_, _)
456 | WorterbuchError::ServerResponse(_)
457 | WorterbuchError::ProtocolNegotiationFailed(_)
458 | WorterbuchError::ClientIdCollision(_) => {
459 (StatusCode::INTERNAL_SERVER_ERROR, e.to_string())
460 }
461
462 WorterbuchError::NotLeader => (StatusCode::NO_CONTENT, e.to_string()),
463 }
464 }
465}
466
467#[cfg(feature = "axum-errors")]
468pub mod axum {
469 use crate::error::{AuthorizationError, WorterbuchError};
470 use axum::response::IntoResponse;
471 use http::StatusCode;
472
473 impl IntoResponse for WorterbuchError {
474 fn into_response(self) -> axum::response::Response {
475 let err: (StatusCode, String) = self.into();
476 err.into_response()
477 }
478 }
479
480 impl IntoResponse for AuthorizationError {
481 fn into_response(self) -> axum::response::Response {
482 let err: WorterbuchError = self.into();
483 err.into_response()
484 }
485 }
486}