1use crate::{
21 AuthCheckOwned, ClientMessage, ErrorCode, Key, MetaData, Privilege, ProtocolVersionSegment,
22 RequestPattern, TransactionId, server::Err,
23};
24use http::StatusCode;
25use miette::Diagnostic;
26use opentelemetry_otlp::ExporterBuildError;
27use std::{fmt, io, net::AddrParseError, num::ParseIntError};
28use thiserror::Error;
29use tokio::sync::{
30 broadcast,
31 mpsc::{self, error::SendError},
32 oneshot,
33};
34
/// Errors raised while loading or validating the configuration.
///
/// The `Display` text of every variant is generated by `thiserror` from the `#[error]` attribute.
#[derive(Debug, Error, Diagnostic)]
pub enum ConfigError {
    /// The configured separator is invalid; it must be a single ASCII char.
    #[error("invalid separator: {0}; separator must be a single ASCII char")]
    InvalidSeparator(String),
    /// The configured wildcard is invalid; it must be a single ASCII char.
    #[error("invalid wildcard: {0}; wildcard must be a single ASCII char")]
    InvalidWildcard(String),
    /// The configured multi-wildcard is invalid; it must be a single ASCII char.
    #[error("invalid multi-wildcard: {0}; multi-wildcard must be a single ASCII char")]
    InvalidMultiWildcard(String),
    /// A port value could not be parsed as an integer.
    #[error("invalid port: {0}")]
    InvalidPort(ParseIntError),
    /// An address could not be parsed.
    #[error("invalid address: {0}")]
    InvalidAddr(AddrParseError),
    /// An interval value could not be parsed as an integer.
    #[error("invalid interval: {0}")]
    InvalidInterval(ParseIntError),
    /// The license file could not be loaded; the payload is the text shown in the message.
    #[error("license file could not be loaded: {0}")]
    InvalidLicense(String),
    /// An I/O error; `?` converts `io::Error` into this variant via `#[from]`.
    #[error("could not load config file: {0}")]
    IoError(#[from] io::Error),
    /// A YAML error; `?` converts `serde_yaml::Error` into this variant via `#[from]`.
    #[error("could not load config file: {0}")]
    YamlError(#[from] serde_yaml::Error),
    /// The telemetry exporter could not be built; converted via `#[from]`.
    #[error("error setting up telemetry: {0}")]
    ExporterBuildError(#[from] ExporterBuildError),
}
58
/// Extension trait that tags an integer parse result with the kind of config value it belongs
/// to, producing a [`ConfigError`] on failure.
pub trait ConfigIntContext<I> {
    /// Interprets the value as a port; the implementation in this file reports failures as
    /// [`ConfigError::InvalidPort`].
    fn to_port(self) -> Result<I, ConfigError>;
    /// Interprets the value as an interval; the implementation in this file reports failures as
    /// [`ConfigError::InvalidInterval`].
    fn to_interval(self) -> Result<I, ConfigError>;
}
63
64impl<I> ConfigIntContext<I> for Result<I, ParseIntError> {
65 fn to_port(self) -> Result<I, ConfigError> {
66 self.map_err(ConfigError::InvalidPort)
67 }
68 fn to_interval(self) -> Result<I, ConfigError> {
69 self.map_err(ConfigError::InvalidInterval)
70 }
71}
72
73impl From<AddrParseError> for ConfigError {
74 fn from(e: AddrParseError) -> Self {
75 ConfigError::InvalidAddr(e)
76 }
77}
78
/// Result type whose error is a [`ConfigError`].
pub type ConfigResult<T> = std::result::Result<T, ConfigError>;
80
/// Extension trait for attaching descriptive metadata to a failed result while converting it
/// into a [`WorterbuchError`].
///
/// `E` names the source error type an implementation converts from.
pub trait Context<T, E: std::error::Error> {
    /// Converts `self` into a result with a [`WorterbuchError`] error.
    ///
    /// The implementations in this file call `metadata` only on the error path and store the
    /// returned text next to the original error.
    fn context(self, metadata: impl FnOnce() -> String) -> Result<T, WorterbuchError>;
}
/// Reasons an authorization attempt or privilege check can fail.
///
/// `Display` and `std::error::Error` are implemented by hand further down in this file.
#[derive(Debug, Clone, Diagnostic)]
pub enum AuthorizationError {
    /// The client lacks the given privilege for the given check.
    InsufficientPrivileges(Privilege, AuthCheckOwned),
    /// Token decoding failed; the payload is used verbatim as the display text.
    TokenDecodeError(String),
    /// No JWT was included in the request.
    MissingToken,
    /// Displayed as "No JWT was configured".
    MissingSecret,
    /// An incorrect check was provided; the display text flags this as a bug.
    InvalidCheck,
}
92
93impl fmt::Display for AuthorizationError {
94 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
95 match self {
96 AuthorizationError::InsufficientPrivileges(privilege, check) => {
97 write!(f, "Client lacks privilege '{privilege} {check}'")
98 }
99 AuthorizationError::TokenDecodeError(msg) => msg.fmt(f),
100 AuthorizationError::MissingToken => "No JWT was included in the request".fmt(f),
101 AuthorizationError::MissingSecret => "No JWT was configured".fmt(f),
102 AuthorizationError::InvalidCheck => "Incorrect check provided. This is a bug.".fmt(f),
103 }
104 }
105}
// Empty impl: relies on the default `Error` methods, so no underlying source is reported.
impl std::error::Error for AuthorizationError {}
107
/// Result type whose error is an [`AuthorizationError`].
pub type AuthorizationResult<T> = Result<T, AuthorizationError>;
109
/// Main error type of this module.
///
/// `thiserror` is only used for the `std::error::Error` impl and the `#[from]` conversion; the
/// `Display` text is written by hand in the `impl fmt::Display` block in this file.
#[derive(Debug, Diagnostic, thiserror::Error)]
pub enum WorterbuchError {
    /// The key/pattern contains an illegal wildcard.
    IllegalWildcard(RequestPattern),
    /// The key/pattern contains an illegal multi-wildcard.
    IllegalMultiWildcard(RequestPattern),
    /// The key/pattern contains a multi-wildcard at an illegal position.
    MultiWildcardAtIllegalPosition(RequestPattern),
    /// There is no value for the given key.
    NoSuchValue(Key),
    /// There is no such subscription.
    NotSubscribed,
    /// An I/O error together with descriptive metadata.
    IoError(io::Error, MetaData),
    /// A JSON (de)serialization error together with descriptive metadata.
    SerDeError(serde_json::Error, MetaData),
    /// The server sent a response that is not valid for the issued request.
    InvalidServerResponse(MetaData),
    /// Any other error together with descriptive metadata.
    Other(Box<dyn std::error::Error + Send + Sync>, MetaData),
    /// An error message received from the server.
    ServerResponse(Err),
    /// The server does not implement the given protocol version.
    ProtocolNegotiationFailed(ProtocolVersionSegment),
    /// An attempt was made to modify a read-only key.
    ReadOnlyKey(Key),
    /// The operation requires authorization.
    AuthorizationRequired(Privilege),
    /// The handshake was already done.
    AlreadyAuthorized,
    /// Authorization failed; `#[from]` generates `From<AuthorizationError>`.
    Unauthorized(#[from] AuthorizationError),
    /// There is no active publish stream for the given transaction ID.
    NoPubStream(TransactionId),
    /// The node is not the current cluster leader and cannot process the request.
    NotLeader,
    /// A compare-and-swap value was modified without providing a version number.
    Cas,
    /// A compare-and-swap value was modified with an out-of-sync version number.
    CasVersionMismatch,
    /// The function is not implemented in the negotiated protocol version.
    NotImplemented,
    /// The key is locked by another client.
    KeyIsLocked(Key),
    /// The key is not locked.
    KeyIsNotLocked(Key),
    /// A feature is disabled; the payload is used verbatim as the display text.
    FeatureDisabled(MetaData),
}
136
137impl fmt::Display for WorterbuchError {
138 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
139 match self {
140 WorterbuchError::IllegalWildcard(rp) => {
141 write!(f, "Key contains illegal wildcard: {rp}")
142 }
143 WorterbuchError::IllegalMultiWildcard(rp) => {
144 write!(f, "Key contains illegal multi-wildcard: {rp}")
145 }
146 WorterbuchError::MultiWildcardAtIllegalPosition(rp) => {
147 write!(f, "Key contains multi-wildcard at illegal position: {rp}")
148 }
149 WorterbuchError::NoSuchValue(key) => write!(f, "no value for key '{key}'"),
150 WorterbuchError::NotSubscribed => write!(f, "no such subscription"),
151 WorterbuchError::IoError(e, meta) => write!(f, "{meta}: {e}"),
152 WorterbuchError::SerDeError(e, meta) => write!(f, "{meta}: {e}"),
153 WorterbuchError::Other(e, meta) => write!(f, "{meta}: {e}"),
154 WorterbuchError::ServerResponse(e) => {
155 write!(f, "error {}: {}", e.error_code, e.metadata)
156 }
157 WorterbuchError::ProtocolNegotiationFailed(v) => {
158 write!(f, "The server does not implement protocol version {v}")
159 }
160 WorterbuchError::InvalidServerResponse(meta) => write!(
161 f,
162 "The server sent a response that is not valid for the issued request: {meta}"
163 ),
164 WorterbuchError::ReadOnlyKey(key) => {
165 write!(f, "Tried to modify a read only key: {key}")
166 }
167 WorterbuchError::AuthorizationRequired(op) => {
168 write!(f, "Operation {op} requires authorization")
169 }
170 WorterbuchError::AlreadyAuthorized => {
171 write!(f, "Handshake already done")
172 }
173 WorterbuchError::Unauthorized(err) => err.fmt(f),
174 WorterbuchError::NoPubStream(tid) => {
175 write!(
176 f,
177 "There is no active publish stream for transaction ID {tid}"
178 )
179 }
180 WorterbuchError::NotLeader => {
181 write!(
182 f,
183 "Node cannot process the request since it is not the current cluster leader"
184 )
185 }
186 WorterbuchError::Cas => {
187 write!(
188 f,
189 "Tried to modify a compare-and-swap value without providing a version number"
190 )
191 }
192 WorterbuchError::CasVersionMismatch => {
193 write!(
194 f,
195 "Tried to modify a compare-and-swap value with an out-of-sync version number"
196 )
197 }
198 WorterbuchError::NotImplemented => {
199 write!(
200 f,
201 "This function is not implemented in the negotiated protocol version",
202 )
203 }
204 WorterbuchError::KeyIsLocked(key) => {
205 write!(f, "Key {key} is locked by another client",)
206 }
207 WorterbuchError::KeyIsNotLocked(key) => {
208 write!(f, "Key {key} is not locked",)
209 }
210 WorterbuchError::FeatureDisabled(m) => m.fmt(f),
211 }
212 }
213}
214
215impl<T> Context<T, io::Error> for Result<T, io::Error> {
216 fn context(self, metadata: impl FnOnce() -> String) -> Result<T, WorterbuchError> {
217 self.map_err(|e| WorterbuchError::IoError(e, metadata()))
218 }
219}
220
221impl<T> Context<T, serde_json::Error> for Result<T, serde_json::Error> {
222 fn context(self, metadata: impl FnOnce() -> String) -> Result<T, WorterbuchError> {
223 self.map_err(|e| WorterbuchError::SerDeError(e, metadata()))
224 }
225}
226
227impl<T, V: fmt::Debug + 'static + Send + Sync> Context<T, SendError<V>>
228 for Result<T, SendError<V>>
229{
230 fn context(self, metadata: impl FnOnce() -> String) -> Result<T, WorterbuchError> {
231 self.map_err(|e| WorterbuchError::Other(Box::new(e), metadata()))
232 }
233}
234
235impl<T: Send + Sync + 'static> From<mpsc::error::SendError<T>> for WorterbuchError {
236 fn from(value: mpsc::error::SendError<T>) -> Self {
237 WorterbuchError::Other(Box::new(value), "Internal server error".to_owned())
238 }
239}
240
241impl From<oneshot::error::RecvError> for WorterbuchError {
242 fn from(value: oneshot::error::RecvError) -> Self {
243 WorterbuchError::Other(Box::new(value), "Internal server error".to_owned())
244 }
245}
246
/// Result type whose error is a [`WorterbuchError`].
pub type WorterbuchResult<T> = std::result::Result<T, WorterbuchError>;
248
/// Connection-level error type.
///
/// Every payload is boxed. `Display` and `std::error::Error` are implemented by hand further
/// down in this file; `Display` forwards to the payload in every variant that has one.
#[derive(Debug, Diagnostic)]
pub enum ConnectionError {
    /// An I/O error.
    IoError(Box<io::Error>),
    /// A type-erased send error; built from `mpsc` `SendError<T>` by a `From` impl in this file.
    SendError(Box<dyn std::error::Error + Send + Sync>),
    /// A `tungstenite` websocket error (only with the `ws` feature).
    // NOTE(review): the `wasm` variant below has the same name; enabling both features at once
    // would presumably be a duplicate-variant compile error. Confirm they are mutually exclusive.
    #[cfg(feature = "ws")]
    WebsocketError(Box<tungstenite::Error>),
    /// A `tokio_tungstenite_wasm` websocket error (only with the `wasm` feature).
    #[cfg(feature = "wasm")]
    WebsocketError(Box<tokio_tungstenite_wasm::Error>),
    /// A type-erased non-blocking send error; built from `mpsc` `TrySendError<ClientMessage>`.
    TrySendError(Box<dyn std::error::Error + Send + Sync>),
    /// A `oneshot` receive error.
    RecvError(Box<oneshot::error::RecvError>),
    /// A `broadcast` receive error.
    BcRecvError(Box<broadcast::error::RecvError>),
    /// A wrapped [`WorterbuchError`].
    WorterbuchError(Box<WorterbuchError>),
    /// A wrapped [`ConfigError`].
    ConfigError(Box<ConfigError>),
    /// A JSON (de)serialization error.
    SerdeError(Box<serde_json::Error>),
    /// A `broadcast` send error for a `u64` value.
    AckError(Box<broadcast::error::SendError<u64>>),
    /// A timeout; the payload is used verbatim as the display text.
    Timeout(Box<String>),
    /// A `tungstenite::http` error (only with the `ws` feature).
    #[cfg(feature = "ws")]
    HttpError(Box<tungstenite::http::Error>),
    /// An authorization failure; the payload is used verbatim as the display text.
    AuthorizationError(Box<String>),
    /// No server addresses are configured.
    NoServerAddressesConfigured,
    /// An error message received from the server.
    ServerResponse(Box<Err>),
}
271
// Empty impl: relies on the default `Error` methods, so no underlying source is reported.
impl std::error::Error for ConnectionError {}
273
274impl fmt::Display for ConnectionError {
275 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
276 match self {
277 Self::IoError(e) => e.fmt(f),
278 Self::SendError(e) => e.fmt(f),
279 #[cfg(any(feature = "ws", feature = "wasm"))]
280 Self::WebsocketError(e) => e.fmt(f),
281 Self::TrySendError(e) => e.fmt(f),
282 Self::RecvError(e) => e.fmt(f),
283 Self::BcRecvError(e) => e.fmt(f),
284 Self::WorterbuchError(e) => e.fmt(f),
285 Self::ConfigError(e) => e.fmt(f),
286 Self::SerdeError(e) => e.fmt(f),
287 Self::AckError(e) => e.fmt(f),
288 Self::Timeout(msg) => msg.fmt(f),
289 #[cfg(feature = "ws")]
290 Self::HttpError(e) => e.fmt(f),
291 Self::AuthorizationError(msg) => msg.fmt(f),
292 Self::NoServerAddressesConfigured => {
293 fmt::Display::fmt("no server addresses configured", f)
294 }
295 Self::ServerResponse(e) => e.fmt(f),
296 }
297 }
298}
299
/// Result type whose error is a [`ConnectionError`].
pub type ConnectionResult<T> = std::result::Result<T, ConnectionError>;
301
302impl From<Err> for ConnectionError {
303 fn from(value: Err) -> Self {
304 ConnectionError::ServerResponse(Box::new(value))
305 }
306}
307
308impl From<io::Error> for ConnectionError {
309 fn from(e: io::Error) -> Self {
310 ConnectionError::IoError(Box::new(e))
311 }
312}
313
314impl<T: fmt::Debug + 'static + Send + Sync> From<SendError<T>> for ConnectionError {
315 fn from(e: SendError<T>) -> Self {
316 ConnectionError::SendError(Box::new(e))
317 }
318}
319
/// Boxes a `tungstenite` error into [`ConnectionError::WebsocketError`] (`ws` feature only).
#[cfg(feature = "ws")]
impl From<tungstenite::Error> for ConnectionError {
    fn from(cause: tungstenite::Error) -> Self {
        Self::WebsocketError(Box::new(cause))
    }
}
326
/// Boxes a `tokio_tungstenite_wasm` error into [`ConnectionError::WebsocketError`]
/// (`wasm` feature only).
#[cfg(feature = "wasm")]
impl From<tokio_tungstenite_wasm::Error> for ConnectionError {
    fn from(cause: tokio_tungstenite_wasm::Error) -> Self {
        Self::WebsocketError(Box::new(cause))
    }
}
333
334impl From<oneshot::error::RecvError> for ConnectionError {
335 fn from(e: oneshot::error::RecvError) -> Self {
336 ConnectionError::RecvError(Box::new(e))
337 }
338}
339
340impl From<broadcast::error::RecvError> for ConnectionError {
341 fn from(e: broadcast::error::RecvError) -> Self {
342 ConnectionError::BcRecvError(Box::new(e))
343 }
344}
345
346impl From<ConfigError> for ConnectionError {
347 fn from(e: ConfigError) -> Self {
348 ConnectionError::ConfigError(Box::new(e))
349 }
350}
351
352impl From<serde_json::Error> for ConnectionError {
353 fn from(e: serde_json::Error) -> Self {
354 Self::SerdeError(Box::new(e))
355 }
356}
357
358impl From<broadcast::error::SendError<u64>> for ConnectionError {
359 fn from(e: broadcast::error::SendError<u64>) -> Self {
360 Self::AckError(Box::new(e))
361 }
362}
363
364impl From<mpsc::error::TrySendError<ClientMessage>> for ConnectionError {
365 fn from(e: mpsc::error::TrySendError<ClientMessage>) -> Self {
366 Self::TrySendError(Box::new(Box::new(e)))
367 }
368}
369
/// Boxes a `tungstenite::http` error into [`ConnectionError::HttpError`] (`ws` feature only).
#[cfg(feature = "ws")]
impl From<tungstenite::http::Error> for ConnectionError {
    fn from(cause: tungstenite::http::Error) -> Self {
        let boxed = Box::new(cause);
        ConnectionError::HttpError(boxed)
    }
}
376
377impl From<&WorterbuchError> for ErrorCode {
378 fn from(e: &WorterbuchError) -> Self {
379 match e {
380 WorterbuchError::IllegalWildcard(_) => ErrorCode::IllegalWildcard,
381 WorterbuchError::IllegalMultiWildcard(_) => ErrorCode::IllegalMultiWildcard,
382 WorterbuchError::MultiWildcardAtIllegalPosition(_) => {
383 ErrorCode::MultiWildcardAtIllegalPosition
384 }
385 WorterbuchError::NoSuchValue(_) => ErrorCode::NoSuchValue,
386 WorterbuchError::NotSubscribed => ErrorCode::NotSubscribed,
387 WorterbuchError::IoError(_, _) => ErrorCode::IoError,
388 WorterbuchError::SerDeError(_, _) => ErrorCode::SerdeError,
389 WorterbuchError::ProtocolNegotiationFailed(_) => ErrorCode::ProtocolNegotiationFailed,
390 WorterbuchError::InvalidServerResponse(_) => ErrorCode::InvalidServerResponse,
391 WorterbuchError::ReadOnlyKey(_) => ErrorCode::ReadOnlyKey,
392 WorterbuchError::AuthorizationRequired(_) => ErrorCode::AuthorizationRequired,
393 WorterbuchError::AlreadyAuthorized => ErrorCode::AlreadyAuthorized,
394 WorterbuchError::Unauthorized(_) => ErrorCode::Unauthorized,
395 WorterbuchError::NoPubStream(_) => ErrorCode::NoPubStream,
396 WorterbuchError::NotLeader => ErrorCode::NotLeader,
397 WorterbuchError::Cas => ErrorCode::Cas,
398 WorterbuchError::CasVersionMismatch => ErrorCode::CasVersionMismatch,
399 WorterbuchError::NotImplemented => ErrorCode::NotImplemented,
400 WorterbuchError::KeyIsLocked(_) => ErrorCode::KeyIsLocked,
401 WorterbuchError::KeyIsNotLocked(_) => ErrorCode::KeyIsNotLocked,
402 WorterbuchError::FeatureDisabled(_) => ErrorCode::KeyIsNotLocked,
403 WorterbuchError::Other(_, _) | WorterbuchError::ServerResponse(_) => ErrorCode::Other,
404 }
405 }
406}
407
408impl From<WorterbuchError> for (StatusCode, String) {
409 fn from(e: WorterbuchError) -> Self {
410 match &e {
411 WorterbuchError::IllegalMultiWildcard(_)
412 | WorterbuchError::IllegalWildcard(_)
413 | WorterbuchError::MultiWildcardAtIllegalPosition(_)
414 | WorterbuchError::NotImplemented
415 | WorterbuchError::KeyIsNotLocked(_) => (StatusCode::BAD_REQUEST, e.to_string()),
416
417 WorterbuchError::AlreadyAuthorized
418 | WorterbuchError::NotSubscribed
419 | WorterbuchError::FeatureDisabled(_)
420 | WorterbuchError::NoPubStream(_) => (StatusCode::UNPROCESSABLE_ENTITY, e.to_string()),
421
422 WorterbuchError::KeyIsLocked(_)
423 | WorterbuchError::Cas
424 | WorterbuchError::CasVersionMismatch => (StatusCode::CONFLICT, e.to_string()),
425
426 WorterbuchError::ReadOnlyKey(_) => (StatusCode::METHOD_NOT_ALLOWED, e.to_string()),
427
428 WorterbuchError::AuthorizationRequired(_) => (StatusCode::UNAUTHORIZED, e.to_string()),
429
430 WorterbuchError::NoSuchValue(_) => (StatusCode::NOT_FOUND, e.to_string()),
431
432 WorterbuchError::Unauthorized(ae) => match &ae {
433 AuthorizationError::MissingToken => (StatusCode::UNAUTHORIZED, e.to_string()),
434 _ => (StatusCode::FORBIDDEN, e.to_string()),
435 },
436
437 WorterbuchError::IoError(_, _)
438 | WorterbuchError::SerDeError(_, _)
439 | WorterbuchError::InvalidServerResponse(_)
440 | WorterbuchError::Other(_, _)
441 | WorterbuchError::ServerResponse(_)
442 | WorterbuchError::ProtocolNegotiationFailed(_) => {
443 (StatusCode::INTERNAL_SERVER_ERROR, e.to_string())
444 }
445
446 WorterbuchError::NotLeader => (StatusCode::NO_CONTENT, e.to_string()),
447 }
448 }
449}
450
/// `axum` response conversions for the error types of this module (`axum-errors` feature only).
#[cfg(feature = "axum-errors")]
pub mod axum {
    use crate::error::{AuthorizationError, WorterbuchError};
    use axum::response::IntoResponse;
    use http::StatusCode;

    /// Responds with the status code and message produced by the
    /// `From<WorterbuchError> for (StatusCode, String)` conversion.
    impl IntoResponse for WorterbuchError {
        fn into_response(self) -> axum::response::Response {
            <(StatusCode, String)>::from(self).into_response()
        }
    }

    /// Converts into a [`WorterbuchError`] first and responds the same way that error does.
    impl IntoResponse for AuthorizationError {
        fn into_response(self) -> axum::response::Response {
            WorterbuchError::from(self).into_response()
        }
    }
}