1use std::{borrow::Cow, fmt, sync::Arc};
4
5use rmp::{
6 decode::{MarkerReadError, NumValueReadError, ValueReadError},
7 encode::{RmpWriteErr, ValueWriteError},
8};
9use rmpv::Value;
10use tokio::{task::JoinError, time::error::Elapsed};
11
12#[derive(Clone, Debug, thiserror::Error)]
14#[error("{description} (code {code})")]
15pub struct ErrorResponse {
16 pub code: u32,
17 pub description: String,
18 pub extra: Option<rmpv::Value>,
19}
20
21impl ErrorResponse {
22 pub fn new(code: u32, description: String, extra: Option<rmpv::Value>) -> Self {
23 Self {
24 code,
25 description,
26 extra,
27 }
28 }
29}
30
31#[non_exhaustive]
33#[derive(Debug, thiserror::Error)]
34pub enum Error {
35 #[error("Error response: {0}")]
37 Response(#[from] ErrorResponse),
38 #[error("Call or eval error: {0}")]
40 CallEval(Value),
41
42 #[error("Timeout")]
44 Timeout,
45 #[error("Connect timeout")]
47 ConnectTimeout,
48
49 #[error("Authorization error: {} (code {})" ,.0.description, .0.code)]
51 Auth(#[source] ErrorResponse),
52
53 #[error(transparent)]
55 Encode(#[from] EncodingError),
56 #[error(transparent)]
58 Decode(#[from] DecodingError),
59
60 #[error("Duplicated sync '{0}'")]
62 DuplicatedSync(u32),
63
64 #[error("Space is missing primary index")]
66 SpaceMissingPrimaryIndex,
67
68 #[error("TCP connection IO error")]
70 Io(#[from] Arc<tokio::io::Error>),
71 #[error("TCP connection closed")]
73 ConnectionClosed,
74
75 #[error(transparent)]
77 Other(anyhow::Error),
78}
79
80impl From<tokio::io::Error> for Error {
81 fn from(v: tokio::io::Error) -> Self {
82 Self::Io(Arc::new(v))
83 }
84}
85
86impl From<CodecDecodeError> for Error {
87 fn from(value: CodecDecodeError) -> Self {
88 match value {
89 CodecDecodeError::Io(x) => x.into(),
90 CodecDecodeError::Decode(x) => x.into(),
91 }
92 }
93}
94
95impl From<CodecEncodeError> for Error {
96 fn from(value: CodecEncodeError) -> Self {
97 match value {
98 CodecEncodeError::Io(x) => x.into(),
99 CodecEncodeError::Encode(x) => x.into(),
100 }
101 }
102}
103
104impl From<ConnectionError> for Error {
105 fn from(value: ConnectionError) -> Self {
106 match value {
107 ConnectionError::Io(x) => x.into(),
108 ConnectionError::ConnectionClosed => Self::ConnectionClosed,
109 ConnectionError::Decode(x) => x.into(),
110 err @ ConnectionError::JoinError(_) => Self::Other(err.into()),
111 }
112 }
113}
114
115impl From<Elapsed> for Error {
116 fn from(_: Elapsed) -> Self {
117 Self::Timeout
118 }
119}
120
121#[non_exhaustive]
123#[derive(Debug, thiserror::Error)]
124pub enum EncodingError {
125 #[error("Failed to encode data into MessagePack")]
127 MessagePack(#[source] anyhow::Error),
128}
129
130impl<E> From<ValueWriteError<E>> for EncodingError
131where
132 E: RmpWriteErr + Send + Sync,
133{
134 fn from(v: ValueWriteError<E>) -> Self {
135 Self::MessagePack(v.into())
136 }
137}
138
139impl From<std::io::Error> for EncodingError {
140 fn from(value: std::io::Error) -> Self {
141 Self::MessagePack(value.into())
142 }
143}
144
145impl From<rmp_serde::encode::Error> for EncodingError {
146 fn from(value: rmp_serde::encode::Error) -> Self {
147 Self::MessagePack(value.into())
148 }
149}
150
151#[derive(Clone, Debug, thiserror::Error)]
153#[error("{kind}{}", DecodingErrorLocation::display_in_error(.location))]
154pub struct DecodingError {
155 #[source]
156 kind: Arc<DecodingErrorDetails>,
157 location: Option<DecodingErrorLocation>,
158}
159
160impl DecodingError {
161 pub(crate) fn new(kind: DecodingErrorDetails) -> Self {
162 Self {
163 kind: Arc::new(kind),
164 location: None,
165 }
166 }
167
168 pub(crate) fn missing_key(key: &'static str) -> Self {
169 DecodingErrorDetails::MissingKey(key).into()
170 }
171
172 pub(crate) fn type_mismatch(
173 expected: &'static str,
174 actual: impl Into<Cow<'static, str>>,
175 ) -> Self {
176 DecodingErrorDetails::TypeMismatch {
177 expected,
178 actual: actual.into(),
179 }
180 .into()
181 }
182
183 pub(crate) fn message_pack(err: impl Into<anyhow::Error>) -> Self {
184 DecodingErrorDetails::MessagePack(err.into()).into()
185 }
186
187 pub(crate) fn unknown_response_code(code: u32) -> Self {
188 DecodingErrorDetails::UnknownResponseCode(code).into()
189 }
190
191 pub(crate) fn invalid_tuple_length(expected: usize, actual: usize) -> Self {
192 DecodingErrorDetails::InvalidTupleLength { expected, actual }.into()
193 }
194
195 pub(crate) fn with_location(mut self, location: DecodingErrorLocation) -> Self {
196 self.location = Some(location);
197 self
198 }
199
200 pub(crate) fn in_key(self, key: &'static str) -> Self {
201 self.with_location(DecodingErrorLocation::Key(key))
202 }
203
204 pub(crate) fn in_other(self, other: &'static str) -> Self {
205 self.with_location(DecodingErrorLocation::Other(other))
206 }
207
208 pub fn kind(&self) -> &DecodingErrorDetails {
209 &self.kind
210 }
211
212 pub fn location(&self) -> Option<&DecodingErrorLocation> {
213 self.location.as_ref()
214 }
215}
216
217impl From<DecodingErrorDetails> for DecodingError {
218 fn from(value: DecodingErrorDetails) -> Self {
219 Self::new(value)
220 }
221}
222
223#[non_exhaustive]
225#[derive(Debug, thiserror::Error)]
226pub enum DecodingErrorDetails {
227 #[error("unknown response code: {0}")]
229 UnknownResponseCode(u32),
230 #[error("Missing key in response: {0}")]
232 MissingKey(&'static str),
233 #[error("Type mismatch, expected '{expected}', actual '{actual}'")]
235 TypeMismatch {
236 expected: &'static str,
237 actual: Cow<'static, str>,
238 },
239 #[error("Invalid tuple length {actual}, expected {expected}")]
241 InvalidTupleLength { expected: usize, actual: usize },
242
243 #[error("Failed to deserialize rmpv::Value")]
245 Serde(#[source] rmpv::ext::Error),
246 #[error("Failed to decode data from MessagePack")]
248 MessagePack(#[source] anyhow::Error),
249}
250
251impl From<ValueReadError> for DecodingError {
252 fn from(v: ValueReadError) -> Self {
253 DecodingErrorDetails::MessagePack(v.into()).into()
254 }
255}
256
257impl From<rmpv::decode::Error> for DecodingError {
258 fn from(v: rmpv::decode::Error) -> Self {
259 DecodingErrorDetails::MessagePack(v.into()).into()
260 }
261}
262
263impl From<rmpv::ext::Error> for DecodingError {
264 fn from(v: rmpv::ext::Error) -> Self {
265 DecodingErrorDetails::Serde(v).into()
266 }
267}
268
269impl From<NumValueReadError> for DecodingError {
270 fn from(v: NumValueReadError) -> Self {
271 DecodingErrorDetails::MessagePack(v.into()).into()
272 }
273}
274
275impl From<MarkerReadError> for DecodingError {
276 fn from(v: MarkerReadError) -> Self {
277 DecodingErrorDetails::MessagePack(v.0.into()).into()
278 }
279}
280
281#[derive(Clone, Debug)]
282pub enum DecodingErrorLocation {
283 Key(&'static str),
284 FrameLengthField,
285 Other(&'static str),
286}
287
288impl fmt::Display for DecodingErrorLocation {
289 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
290 match self {
291 DecodingErrorLocation::Key(x) => write!(f, "key '{x}'"),
292 DecodingErrorLocation::FrameLengthField => write!(f, "frame length field"),
293 DecodingErrorLocation::Other(x) => write!(f, "{x}"),
294 }
295 }
296}
297
298impl DecodingErrorLocation {
299 fn display_in_error(value: &Option<Self>) -> String {
300 if let Some(x) = value {
301 format!(" (in {x})")
302 } else {
303 String::new()
304 }
305 }
306}
307
308#[derive(Debug, thiserror::Error)]
310pub(crate) enum CodecDecodeError {
311 #[error(transparent)]
312 Io(#[from] tokio::io::Error),
313 #[error(transparent)]
314 Decode(#[from] DecodingError),
315}
316
317#[derive(Debug, thiserror::Error)]
319pub(crate) enum CodecEncodeError {
320 #[error(transparent)]
321 Io(#[from] tokio::io::Error),
322 #[error(transparent)]
323 Encode(#[from] EncodingError),
324}
325
326#[derive(Clone, Debug, thiserror::Error)]
328pub(crate) enum ConnectionError {
329 #[error(transparent)]
330 Io(Arc<tokio::io::Error>),
331 #[error("Connection closed")]
332 ConnectionClosed,
333 #[error(transparent)]
334 Decode(#[from] DecodingError),
335 #[error("Tokio JoinHandle error: {0:?}")]
336 JoinError(#[source] Arc<JoinError>),
337}
338
339impl From<tokio::io::Error> for ConnectionError {
340 fn from(value: tokio::io::Error) -> Self {
341 Self::Io(Arc::new(value))
342 }
343}
344
345impl From<CodecDecodeError> for ConnectionError {
346 fn from(value: CodecDecodeError) -> Self {
347 match value {
348 CodecDecodeError::Io(x) => x.into(),
349 CodecDecodeError::Decode(x) => x.into(),
350 }
351 }
352}
353
354impl From<JoinError> for ConnectionError {
355 fn from(value: JoinError) -> Self {
356 Self::JoinError(Arc::new(value))
357 }
358}