1use crate::{
21 AuthCheckOwned, ClientId, ClientMessage, ErrorCode, Key, MetaData, Privilege,
22 ProtocolVersionSegment, RequestPattern, TransactionId, server::Err,
23};
24use http::StatusCode;
25#[cfg(feature = "ws")]
26use http::header::InvalidHeaderValue;
27use jsonwebtoken::Algorithm;
28use miette::Diagnostic;
29use opentelemetry_otlp::ExporterBuildError;
30use std::{fmt, io, net::AddrParseError, num::ParseIntError};
31use thiserror::Error;
32use tokio::sync::{
33 broadcast,
34 mpsc::{self, error::SendError},
35 oneshot,
36};
37
38#[derive(Debug, Error, Diagnostic)]
39pub enum ConfigError {
40 #[error("invalid separator: {0}; separator must be a single ASCII char")]
41 InvalidSeparator(String),
42 #[error("invalid wildcard: {0}; wildcard must be a single ASCII char")]
43 InvalidWildcard(String),
44 #[error("invalid multi-wildcard: {0}; multi-wildcard must be a single ASCII char")]
45 InvalidMultiWildcard(String),
46 #[error("invalid port: {0}")]
47 InvalidPort(ParseIntError),
48 #[error("invalid address: {0}")]
49 InvalidAddr(AddrParseError),
50 #[error("invalid interval: {0}")]
51 InvalidInterval(ParseIntError),
52 #[error("license file could not be loaded: {0}")]
53 InvalidLicense(String),
54 #[error("could not load config file: {0}")]
55 IoError(#[from] io::Error),
56 #[error("could not load config file: {0}")]
57 YamlError(#[from] serde_yaml::Error),
58 #[error("error setting up telemetry: {0}")]
59 ExporterBuildError(#[from] ExporterBuildError),
60 #[error("Parse error: {0}")]
61 ParseError(#[from] serde_json::Error),
62}
63
64pub trait ConfigIntContext<I> {
65 fn to_port(self) -> Result<I, ConfigError>;
66 fn to_interval(self) -> Result<I, ConfigError>;
67}
68
69impl<I> ConfigIntContext<I> for Result<I, ParseIntError> {
70 fn to_port(self) -> Result<I, ConfigError> {
71 self.map_err(ConfigError::InvalidPort)
72 }
73 fn to_interval(self) -> Result<I, ConfigError> {
74 self.map_err(ConfigError::InvalidInterval)
75 }
76}
77
78impl From<AddrParseError> for ConfigError {
79 fn from(e: AddrParseError) -> Self {
80 ConfigError::InvalidAddr(e)
81 }
82}
83
84pub type ConfigResult<T> = std::result::Result<T, ConfigError>;
85
86pub trait Context<T, E: std::error::Error> {
87 fn context(self, metadata: impl FnOnce() -> String) -> Result<T, WorterbuchError>;
88}
89#[derive(Debug, Clone, Error, Diagnostic)]
90pub enum AuthorizationError {
91 #[error("Client lacks privilege '{0} {1}'")]
92 InsufficientPrivileges(Privilege, AuthCheckOwned),
93 #[error("No JWT was included in the request")]
94 MissingToken,
95 #[error("No JWT was configured")]
96 MissingSecret,
97 #[error("Incorrect check provided. This is a bug.")]
99 InvalidCheck,
100 #[error("Unsupported encryption algorith: {0:?}")]
101 UnsupportedEncryptionAlgorithm(Algorithm),
102 #[error("JWT error: {0}")]
103 JwtError(#[from] jsonwebtoken::errors::Error),
104}
105
106pub type AuthorizationResult<T> = Result<T, AuthorizationError>;
107
108#[derive(Debug, Diagnostic, thiserror::Error)]
109pub enum WorterbuchError {
110 IllegalWildcard(RequestPattern),
111 IllegalMultiWildcard(RequestPattern),
112 MultiWildcardAtIllegalPosition(RequestPattern),
113 NoSuchValue(Key),
114 NotSubscribed,
115 IoError(io::Error, MetaData),
116 SerDeError(serde_json::Error, MetaData),
117 InvalidServerResponse(MetaData),
118 Other(Box<dyn std::error::Error + Send + Sync>, MetaData),
119 ServerResponse(Err),
120 ProtocolNegotiationFailed(ProtocolVersionSegment),
121 ReadOnlyKey(Key),
122 AuthorizationRequired(Privilege),
123 AlreadyAuthorized,
124 Unauthorized(#[from] AuthorizationError),
125 NoPubStream(TransactionId),
126 NotLeader,
127 Cas,
128 CasVersionMismatch,
129 NotImplemented,
130 KeyIsLocked(Key),
131 KeyIsNotLocked(Key),
132 FeatureDisabled(MetaData),
133 ClientIdCollision(ClientId),
134}
135
136impl fmt::Display for WorterbuchError {
137 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
138 match self {
139 WorterbuchError::IllegalWildcard(rp) => {
140 write!(f, "Key contains illegal wildcard: {rp}")
141 }
142 WorterbuchError::IllegalMultiWildcard(rp) => {
143 write!(f, "Key contains illegal multi-wildcard: {rp}")
144 }
145 WorterbuchError::MultiWildcardAtIllegalPosition(rp) => {
146 write!(f, "Key contains multi-wildcard at illegal position: {rp}")
147 }
148 WorterbuchError::NoSuchValue(key) => write!(f, "no value for key '{key}'"),
149 WorterbuchError::NotSubscribed => write!(f, "no such subscription"),
150 WorterbuchError::IoError(e, meta) => write!(f, "{meta}: {e}"),
151 WorterbuchError::SerDeError(e, meta) => write!(f, "{meta}: {e}"),
152 WorterbuchError::Other(e, meta) => write!(f, "{meta}: {e}"),
153 WorterbuchError::ServerResponse(e) => {
154 write!(f, "error {}: {}", e.error_code, e.metadata)
155 }
156 WorterbuchError::ProtocolNegotiationFailed(v) => {
157 write!(f, "The server does not implement protocol version {v}")
158 }
159 WorterbuchError::InvalidServerResponse(meta) => write!(
160 f,
161 "The server sent a response that is not valid for the issued request: {meta}"
162 ),
163 WorterbuchError::ReadOnlyKey(key) => {
164 write!(f, "Tried to modify a read only key: {key}")
165 }
166 WorterbuchError::AuthorizationRequired(op) => {
167 write!(f, "Operation {op} requires authorization")
168 }
169 WorterbuchError::AlreadyAuthorized => {
170 write!(f, "Handshake already done")
171 }
172 WorterbuchError::Unauthorized(err) => err.fmt(f),
173 WorterbuchError::NoPubStream(tid) => {
174 write!(
175 f,
176 "There is no active publish stream for transaction ID {tid}"
177 )
178 }
179 WorterbuchError::NotLeader => {
180 write!(
181 f,
182 "Node cannot process the request since it is not the current cluster leader"
183 )
184 }
185 WorterbuchError::Cas => {
186 write!(
187 f,
188 "Tried to modify a compare-and-swap value without providing a version number"
189 )
190 }
191 WorterbuchError::CasVersionMismatch => {
192 write!(
193 f,
194 "Tried to modify a compare-and-swap value with an out-of-sync version number"
195 )
196 }
197 WorterbuchError::NotImplemented => {
198 write!(
199 f,
200 "This function is not implemented in the negotiated protocol version",
201 )
202 }
203 WorterbuchError::KeyIsLocked(key) => {
204 write!(f, "Key {key} is locked by another client",)
205 }
206 WorterbuchError::KeyIsNotLocked(key) => {
207 write!(f, "Key {key} is not locked",)
208 }
209 WorterbuchError::FeatureDisabled(m) => m.fmt(f),
210 WorterbuchError::ClientIdCollision(id) => {
211 write!(f, "Client ID collision: {id} already exists",)
212 }
213 }
214 }
215}
216
217impl<T> Context<T, io::Error> for Result<T, io::Error> {
218 fn context(self, metadata: impl FnOnce() -> String) -> Result<T, WorterbuchError> {
219 self.map_err(|e| WorterbuchError::IoError(e, metadata()))
220 }
221}
222
223impl<T> Context<T, serde_json::Error> for Result<T, serde_json::Error> {
224 fn context(self, metadata: impl FnOnce() -> String) -> Result<T, WorterbuchError> {
225 self.map_err(|e| WorterbuchError::SerDeError(e, metadata()))
226 }
227}
228
229impl<T, V: fmt::Debug + 'static + Send + Sync> Context<T, SendError<V>>
230 for Result<T, SendError<V>>
231{
232 fn context(self, metadata: impl FnOnce() -> String) -> Result<T, WorterbuchError> {
233 self.map_err(|e| WorterbuchError::Other(Box::new(e), metadata()))
234 }
235}
236
237impl<T: Send + Sync + 'static> From<mpsc::error::SendError<T>> for WorterbuchError {
238 fn from(value: mpsc::error::SendError<T>) -> Self {
239 WorterbuchError::Other(Box::new(value), "Internal server error".to_owned())
240 }
241}
242
243impl From<oneshot::error::RecvError> for WorterbuchError {
244 fn from(value: oneshot::error::RecvError) -> Self {
245 WorterbuchError::Other(Box::new(value), "Internal server error".to_owned())
246 }
247}
248
249pub type WorterbuchResult<T> = std::result::Result<T, WorterbuchError>;
250
251#[derive(Debug, Diagnostic)]
252pub enum ConnectionError {
253 IoError(Box<io::Error>),
254 SendError(Box<dyn std::error::Error + Send + Sync>),
255 #[cfg(feature = "ws")]
256 WebsocketError(Box<tungstenite::Error>),
257 #[cfg(feature = "wasm")]
258 WebsocketWasmError(Box<tokio_tungstenite_wasm::Error>),
259 TrySendError(Box<dyn std::error::Error + Send + Sync>),
260 RecvError(Box<oneshot::error::RecvError>),
261 BcRecvError(Box<broadcast::error::RecvError>),
262 WorterbuchError(Box<WorterbuchError>),
263 ConfigError(Box<ConfigError>),
264 SerdeError(Box<serde_json::Error>),
265 AckError(Box<broadcast::error::SendError<u64>>),
266 Timeout(Box<String>),
267 #[cfg(feature = "ws")]
268 HttpError(Box<tungstenite::http::Error>),
269 AuthorizationError(Box<String>),
270 NoServerAddressesConfigured,
271 ServerResponse(Box<Err>),
272 #[cfg(feature = "ws")]
273 InvalidHeaderValue(Box<InvalidHeaderValue>),
274}
275
276impl std::error::Error for ConnectionError {}
277
278impl fmt::Display for ConnectionError {
279 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
280 match self {
281 Self::IoError(e) => e.fmt(f),
282 Self::SendError(e) => e.fmt(f),
283 #[cfg(feature = "ws")]
284 Self::WebsocketError(e) => e.fmt(f),
285 #[cfg(feature = "wasm")]
286 Self::WebsocketWasmError(e) => e.fmt(f),
287 Self::TrySendError(e) => e.fmt(f),
288 Self::RecvError(e) => e.fmt(f),
289 Self::BcRecvError(e) => e.fmt(f),
290 Self::WorterbuchError(e) => e.fmt(f),
291 Self::ConfigError(e) => e.fmt(f),
292 Self::SerdeError(e) => e.fmt(f),
293 Self::AckError(e) => e.fmt(f),
294 Self::Timeout(msg) => msg.fmt(f),
295 #[cfg(feature = "ws")]
296 Self::HttpError(e) => e.fmt(f),
297 Self::AuthorizationError(msg) => msg.fmt(f),
298 Self::NoServerAddressesConfigured => {
299 fmt::Display::fmt("no server addresses configured", f)
300 }
301 Self::ServerResponse(e) => e.fmt(f),
302 #[cfg(feature = "ws")]
303 Self::InvalidHeaderValue(e) => e.fmt(f),
304 }
305 }
306}
307
308pub type ConnectionResult<T> = std::result::Result<T, ConnectionError>;
309
310impl From<Err> for ConnectionError {
311 fn from(value: Err) -> Self {
312 ConnectionError::ServerResponse(Box::new(value))
313 }
314}
315
316impl From<io::Error> for ConnectionError {
317 fn from(e: io::Error) -> Self {
318 ConnectionError::IoError(Box::new(e))
319 }
320}
321
322impl<T: fmt::Debug + 'static + Send + Sync> From<SendError<T>> for ConnectionError {
323 fn from(e: SendError<T>) -> Self {
324 ConnectionError::SendError(Box::new(e))
325 }
326}
327
/// Boxes a `tungstenite` error into [`ConnectionError::WebsocketError`].
#[cfg(feature = "ws")]
impl From<tungstenite::Error> for ConnectionError {
    fn from(cause: tungstenite::Error) -> Self {
        Self::WebsocketError(Box::new(cause))
    }
}
334
/// Boxes a `tokio_tungstenite_wasm` error into [`ConnectionError::WebsocketWasmError`].
#[cfg(feature = "wasm")]
impl From<tokio_tungstenite_wasm::Error> for ConnectionError {
    fn from(cause: tokio_tungstenite_wasm::Error) -> Self {
        Self::WebsocketWasmError(Box::new(cause))
    }
}
341
342impl From<oneshot::error::RecvError> for ConnectionError {
343 fn from(e: oneshot::error::RecvError) -> Self {
344 ConnectionError::RecvError(Box::new(e))
345 }
346}
347
348impl From<broadcast::error::RecvError> for ConnectionError {
349 fn from(e: broadcast::error::RecvError) -> Self {
350 ConnectionError::BcRecvError(Box::new(e))
351 }
352}
353
354impl From<ConfigError> for ConnectionError {
355 fn from(e: ConfigError) -> Self {
356 ConnectionError::ConfigError(Box::new(e))
357 }
358}
359
360impl From<serde_json::Error> for ConnectionError {
361 fn from(e: serde_json::Error) -> Self {
362 Self::SerdeError(Box::new(e))
363 }
364}
365
366impl From<broadcast::error::SendError<u64>> for ConnectionError {
367 fn from(e: broadcast::error::SendError<u64>) -> Self {
368 Self::AckError(Box::new(e))
369 }
370}
371
372impl From<mpsc::error::TrySendError<ClientMessage>> for ConnectionError {
373 fn from(e: mpsc::error::TrySendError<ClientMessage>) -> Self {
374 Self::TrySendError(Box::new(Box::new(e)))
375 }
376}
377
/// Boxes a `tungstenite::http` error into [`ConnectionError::HttpError`].
#[cfg(feature = "ws")]
impl From<tungstenite::http::Error> for ConnectionError {
    fn from(cause: tungstenite::http::Error) -> Self {
        let boxed = Box::new(cause);
        ConnectionError::HttpError(boxed)
    }
}
384
/// Boxes an invalid header value error into [`ConnectionError::InvalidHeaderValue`].
#[cfg(feature = "ws")]
impl From<InvalidHeaderValue> for ConnectionError {
    fn from(cause: InvalidHeaderValue) -> Self {
        let boxed = Box::new(cause);
        ConnectionError::InvalidHeaderValue(boxed)
    }
}
391
392impl From<&WorterbuchError> for ErrorCode {
393 fn from(e: &WorterbuchError) -> Self {
394 match e {
395 WorterbuchError::IllegalWildcard(_) => ErrorCode::IllegalWildcard,
396 WorterbuchError::IllegalMultiWildcard(_) => ErrorCode::IllegalMultiWildcard,
397 WorterbuchError::MultiWildcardAtIllegalPosition(_) => {
398 ErrorCode::MultiWildcardAtIllegalPosition
399 }
400 WorterbuchError::NoSuchValue(_) => ErrorCode::NoSuchValue,
401 WorterbuchError::NotSubscribed => ErrorCode::NotSubscribed,
402 WorterbuchError::IoError(_, _) => ErrorCode::IoError,
403 WorterbuchError::SerDeError(_, _) => ErrorCode::SerdeError,
404 WorterbuchError::ProtocolNegotiationFailed(_) => ErrorCode::ProtocolNegotiationFailed,
405 WorterbuchError::InvalidServerResponse(_) => ErrorCode::InvalidServerResponse,
406 WorterbuchError::ReadOnlyKey(_) => ErrorCode::ReadOnlyKey,
407 WorterbuchError::AuthorizationRequired(_) => ErrorCode::AuthorizationRequired,
408 WorterbuchError::AlreadyAuthorized => ErrorCode::AlreadyAuthorized,
409 WorterbuchError::Unauthorized(_) => ErrorCode::Unauthorized,
410 WorterbuchError::NoPubStream(_) => ErrorCode::NoPubStream,
411 WorterbuchError::NotLeader => ErrorCode::NotLeader,
412 WorterbuchError::Cas => ErrorCode::Cas,
413 WorterbuchError::CasVersionMismatch => ErrorCode::CasVersionMismatch,
414 WorterbuchError::NotImplemented => ErrorCode::NotImplemented,
415 WorterbuchError::KeyIsLocked(_) => ErrorCode::KeyIsLocked,
416 WorterbuchError::KeyIsNotLocked(_) => ErrorCode::KeyIsNotLocked,
417 WorterbuchError::FeatureDisabled(_) => ErrorCode::KeyIsNotLocked,
418 WorterbuchError::ClientIdCollision(_) => ErrorCode::ClientIDCollision,
419 WorterbuchError::Other(_, _) | WorterbuchError::ServerResponse(_) => ErrorCode::Other,
420 }
421 }
422}
423
424impl From<WorterbuchError> for (StatusCode, String) {
425 fn from(e: WorterbuchError) -> Self {
426 match &e {
427 WorterbuchError::IllegalMultiWildcard(_)
428 | WorterbuchError::IllegalWildcard(_)
429 | WorterbuchError::MultiWildcardAtIllegalPosition(_)
430 | WorterbuchError::NotImplemented
431 | WorterbuchError::KeyIsNotLocked(_) => (StatusCode::BAD_REQUEST, e.to_string()),
432
433 WorterbuchError::AlreadyAuthorized
434 | WorterbuchError::NotSubscribed
435 | WorterbuchError::FeatureDisabled(_)
436 | WorterbuchError::NoPubStream(_) => (StatusCode::UNPROCESSABLE_ENTITY, e.to_string()),
437
438 WorterbuchError::KeyIsLocked(_)
439 | WorterbuchError::Cas
440 | WorterbuchError::CasVersionMismatch => (StatusCode::CONFLICT, e.to_string()),
441
442 WorterbuchError::ReadOnlyKey(_) => (StatusCode::METHOD_NOT_ALLOWED, e.to_string()),
443
444 WorterbuchError::AuthorizationRequired(_) => (StatusCode::UNAUTHORIZED, e.to_string()),
445
446 WorterbuchError::NoSuchValue(_) => (StatusCode::NOT_FOUND, e.to_string()),
447
448 WorterbuchError::Unauthorized(ae) => match &ae {
449 AuthorizationError::MissingToken => (StatusCode::UNAUTHORIZED, e.to_string()),
450 _ => (StatusCode::FORBIDDEN, e.to_string()),
451 },
452
453 WorterbuchError::IoError(_, _)
454 | WorterbuchError::SerDeError(_, _)
455 | WorterbuchError::InvalidServerResponse(_)
456 | WorterbuchError::Other(_, _)
457 | WorterbuchError::ServerResponse(_)
458 | WorterbuchError::ProtocolNegotiationFailed(_)
459 | WorterbuchError::ClientIdCollision(_) => {
460 (StatusCode::INTERNAL_SERVER_ERROR, e.to_string())
461 }
462
463 WorterbuchError::NotLeader => (StatusCode::NO_CONTENT, e.to_string()),
464 }
465 }
466}
467
/// `axum` response conversions for this module's error types
/// (only with the `axum-errors` feature).
#[cfg(feature = "axum-errors")]
pub mod axum {
    use crate::error::{AuthorizationError, WorterbuchError};
    use axum::response::IntoResponse;
    use http::StatusCode;

    /// Responds with the status code and message from the
    /// `(StatusCode, String)` conversion of the error.
    impl IntoResponse for WorterbuchError {
        fn into_response(self) -> axum::response::Response {
            <(StatusCode, String)>::from(self).into_response()
        }
    }

    /// Converts to a [`WorterbuchError`] first and responds as that error would.
    impl IntoResponse for AuthorizationError {
        fn into_response(self) -> axum::response::Response {
            WorterbuchError::from(self).into_response()
        }
    }
}