fluvio_dataplane_protocol/
error_code.rs

1//!
2//! # Fluvio Error Codes
3//!
4//! Error code definitions described here.
5//!
6
7use std::fmt::{Display, Formatter};
8use flv_util::string_helper::upper_cammel_case_to_sentence;
9use fluvio_protocol::{Encoder, Decoder};
10use crate::smartmodule::SmartModuleRuntimeError;
11
12// -----------------------------------
13// Error Definition & Implementation
14// -----------------------------------
15
16#[repr(i16)]
17#[derive(thiserror::Error, Encoder, Decoder, PartialEq, Debug, Clone)]
18#[non_exhaustive]
19pub enum ErrorCode {
20    #[fluvio(tag = -1)]
21    #[error("An unknown server error occurred")]
22    UnknownServerError,
23
24    // Not an error
25    #[fluvio(tag = 0)]
26    #[error("ErrorCode indicated success. If you see this it is likely a bug.")]
27    None,
28
29    #[fluvio(tag = 2)]
30    #[error("Other error: {0}")]
31    Other(String),
32
33    #[fluvio(tag = 1)]
34    #[error("Offset out of range")]
35    OffsetOutOfRange,
36    #[fluvio(tag = 6)]
37    #[error("the given SPU is not the leader for the partition")]
38    NotLeaderForPartition,
39    #[fluvio(tag = 7)]
40    #[error("the request '{kind}' exceeded the timeout {timeout_ms} ms")]
41    RequestTimedOut { timeout_ms: u64, kind: RequestKind },
42    #[fluvio(tag = 10)]
43    #[error("the message is too large to send")]
44    MessageTooLarge,
45    #[fluvio(tag = 13)]
46    #[error("permission denied")]
47    PermissionDenied,
48    #[fluvio(tag = 56)]
49    #[error("a storage error occurred")]
50    StorageError,
51    #[fluvio(tag = 60)]
52    #[error("invalid create request")]
53    InvalidCreateRequest,
54    #[fluvio(tag = 61)]
55    #[error("invalid Delete request")]
56    InvalidDeleteRequest,
57
58    // Spu errors
59    #[fluvio(tag = 1000)]
60    #[error("an error occurred on the SPU")]
61    SpuError,
62    #[fluvio(tag = 1001)]
63    #[error("failed to register an SPU")]
64    SpuRegisterationFailed,
65    #[fluvio(tag = 1002)]
66    #[error("the SPU is offline")]
67    SpuOffline,
68    #[fluvio(tag = 1003)]
69    #[error("the SPU was not found")]
70    SpuNotFound,
71    #[fluvio(tag = 1004)]
72    #[error("the SPU already exists")]
73    SpuAlreadyExists,
74
75    // Topic errors
76    #[fluvio(tag = 2000)]
77    #[error("a topic error occurred")]
78    TopicError,
79    #[fluvio(tag = 2001)]
80    #[error("the topic was not found")]
81    TopicNotFound,
82    #[fluvio(tag = 2002)]
83    #[error("the topic already exists")]
84    TopicAlreadyExists,
85    #[fluvio(tag = 2003)]
86    #[error("the topic has not been initialized")]
87    TopicPendingInitialization,
88    #[fluvio(tag = 2004)]
89    #[error("the topic configuration is invalid")]
90    TopicInvalidConfiguration,
91    #[fluvio(tag = 2005)]
92    #[error("the topic is not provisioned")]
93    TopicNotProvisioned,
94    #[fluvio(tag = 2006)]
95    #[error("the topic name is invalid")]
96    TopicInvalidName,
97
98    // Partition errors
99    #[fluvio(tag = 3000)]
100    #[error("the partition is not initialized")]
101    PartitionPendingInitialization,
102    #[fluvio(tag = 3001)]
103    #[error("the partition is not a leader")]
104    PartitionNotLeader,
105
106    // Stream Fetch error
107    #[fluvio(tag = 3002)]
108    #[error("the fetch session was not found")]
109    FetchSessionNotFoud,
110
111    // Legacy SmartModule errors
112    #[deprecated(since = "0.9.13")]
113    #[fluvio(tag = 4000)]
114    #[error("a legacy SmartModule error occurred")]
115    LegacySmartModuleError(#[from] LegacySmartModuleError),
116
117    // Managed Connector Errors
118    #[fluvio(tag = 5000)]
119    #[error("an error occurred while managing a connector")]
120    ManagedConnectorError,
121
122    #[fluvio(tag = 5001)]
123    #[error("the managed connector was not found")]
124    ManagedConnectorNotFound,
125
126    #[fluvio(tag = 5002)]
127    #[error("an error occurred while managing a connector")]
128    ManagedConnectorAlreadyExists,
129
130    // SmartModule Errors
131    #[fluvio(tag = 6000)]
132    #[error("an error occurred while managing a SmartModule")]
133    SmartModuleError,
134    #[fluvio(tag = 6001)]
135    #[error("SmartModule {name} was not found")]
136    SmartModuleNotFound { name: String },
137    #[fluvio(tag = 6002)]
138    #[error("SmartModule is invalid: {error}")]
139    SmartModuleInvalid { error: String, name: Option<String> },
140    #[fluvio(tag = 6003)]
141    #[error("SmartModule is not a valid '{kind}' SmartModule due to {error}. Are you missing a #[smartmodule({kind})] attribute?")]
142    SmartModuleInvalidExports { error: String, kind: String },
143    #[fluvio(tag = 6004)]
144    #[error("SmartModule runtime error {0}")]
145    SmartModuleRuntimeError(SmartModuleRuntimeError),
146
147    // TableFormat Errors
148    #[fluvio(tag = 7000)]
149    #[error("a tableformat error occurred")]
150    TableFormatError,
151    #[fluvio(tag = 7001)]
152    #[error("the tableformat was not found")]
153    TableFormatNotFound,
154    #[fluvio(tag = 7002)]
155    #[error("the tableformat already exists")]
156    TableFormatAlreadyExists,
157
158    // DerivedStream Object Errors
159    #[fluvio(tag = 8000)]
160    #[error("DerivedStream object error")]
161    DerivedStreamObjectError,
162    #[fluvio(tag = 8001)]
163    #[error("the derivedstream was not found")]
164    DerivedStreamNotFound(String),
165    #[fluvio(tag = 8002)]
166    #[error("the derivedstream join data cannot be fetched")]
167    DerivedStreamJoinFetchError,
168    #[fluvio(tag = 8003)]
169    #[error("the derivedstream {0} is invalid")]
170    DerivedStreamInvalid(String),
171    #[error("can't do recursive derivedstream yet: {0}->{1}")]
172    DerivedStreamRecursion(String, String),
173
174    // Compression errors
175    #[fluvio(tag = 9000)]
176    #[error("a compression error occurred in the SPU")]
177    CompressionError,
178}
179
180impl Default for ErrorCode {
181    fn default() -> ErrorCode {
182        ErrorCode::None
183    }
184}
185
186impl ErrorCode {
187    pub fn is_ok(&self) -> bool {
188        matches!(self, ErrorCode::None)
189    }
190
191    pub fn to_sentence(&self) -> String {
192        match self {
193            ErrorCode::None => "".to_owned(),
194            _ => upper_cammel_case_to_sentence(format!("{:?}", self), true),
195        }
196    }
197
198    pub fn is_error(&self) -> bool {
199        !self.is_ok()
200    }
201}
202
203/// Deprecated. A type representing the possible errors that may occur during DerivedStream execution.
204#[derive(thiserror::Error, Debug, Clone, PartialEq, Encoder, Decoder)]
205pub enum LegacySmartModuleError {
206    #[error("Runtime error")]
207    Runtime(#[from] SmartModuleRuntimeError),
208    #[error("WASM Module error: {0}")]
209    InvalidWasmModule(String),
210    #[error("WASM module is not a valid '{0}' DerivedStream. Are you missing a #[smartmodule({0})] attribute?")]
211    InvalidDerivedStreamModule(String),
212}
213
214impl Default for LegacySmartModuleError {
215    fn default() -> Self {
216        Self::Runtime(Default::default())
217    }
218}
219
220#[derive(Debug, Clone, PartialEq, Encoder, Decoder)]
221#[non_exhaustive]
222pub enum RequestKind {
223    Produce,
224}
225
226impl Default for RequestKind {
227    fn default() -> Self {
228        RequestKind::Produce
229    }
230}
231
232impl Display for RequestKind {
233    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
234        write!(f, "{:?}", self)
235    }
236}
237
238// -----------------------------------
239// Unit Tests
240// -----------------------------------
241
#[cfg(test)]
mod test {
    use super::*;

    /// Pins the numeric tag of `ErrorCode` variants and checks that each one
    /// survives an encode/decode round trip.
    #[test]
    fn test_protocol_tags_unchanged() {
        // Encodes `$variant` at protocol `$version`, asserts that the first two
        // bytes are `$tag` as a big-endian i16, then decodes the bytes back and
        // asserts the decoded value equals the original.
        macro_rules! assert_tag {
            ($variant:expr, $tag:expr, $version:expr) => {{
                let mut data = Vec::new();
                let mut value = ErrorCode::default();

                // `unwrap_or_else` + `panic!` builds the message only on failure,
                // unlike `expect(&format!(..))`, which formats it on every call.
                fluvio_protocol::Encoder::encode(&$variant, &mut data, $version).unwrap_or_else(
                    |err| panic!("Failed to encode {}: {:?}", stringify!($variant), err),
                );
                assert_eq!(
                    data[..2],
                    ($tag as i16).to_be_bytes(),
                    "Data check failed for {}",
                    stringify!($variant)
                );
                fluvio_protocol::Decoder::decode(
                    &mut value,
                    &mut std::io::Cursor::new(&data),
                    $version,
                )
                .unwrap_or_else(|err| {
                    panic!("Failed to decode {}: {:?}", stringify!($variant), err)
                });
                assert_eq!(
                    &value,
                    &$variant,
                    "Value check failed for {}",
                    stringify!($variant)
                );
            }};
        }

        assert_tag!(ErrorCode::UnknownServerError, -1, 0);
        assert_tag!(ErrorCode::None, 0, 0);
        assert_tag!(ErrorCode::OffsetOutOfRange, 1, 0);
        assert_tag!(ErrorCode::NotLeaderForPartition, 6, 0);
        assert_tag!(
            ErrorCode::RequestTimedOut {
                kind: RequestKind::Produce,
                timeout_ms: 1
            },
            7,
            0
        );
        assert_tag!(ErrorCode::MessageTooLarge, 10, 0);
        assert_tag!(ErrorCode::PermissionDenied, 13, 0);
        assert_tag!(ErrorCode::StorageError, 56, 0);
        assert_tag!(ErrorCode::InvalidCreateRequest, 60, 0);
        assert_tag!(ErrorCode::InvalidDeleteRequest, 61, 0);

        // Spu errors
        assert_tag!(ErrorCode::SpuError, 1000, 0);
        assert_tag!(ErrorCode::SpuRegisterationFailed, 1001, 0);
        assert_tag!(ErrorCode::SpuOffline, 1002, 0);
        assert_tag!(ErrorCode::SpuNotFound, 1003, 0);
        assert_tag!(ErrorCode::SpuAlreadyExists, 1004, 0);

        // Topic errors
        assert_tag!(ErrorCode::TopicError, 2000, 0);
        assert_tag!(ErrorCode::TopicNotFound, 2001, 0);
        assert_tag!(ErrorCode::TopicAlreadyExists, 2002, 0);
        assert_tag!(ErrorCode::TopicPendingInitialization, 2003, 0);
        assert_tag!(ErrorCode::TopicInvalidConfiguration, 2004, 0);
        assert_tag!(ErrorCode::TopicNotProvisioned, 2005, 0);
        assert_tag!(ErrorCode::TopicInvalidName, 2006, 0);

        // Partition errors
        assert_tag!(ErrorCode::PartitionPendingInitialization, 3000, 0);
        assert_tag!(ErrorCode::PartitionNotLeader, 3001, 0);

        // Stream Fetch error
        assert_tag!(ErrorCode::FetchSessionNotFoud, 3002, 0);

        // Managed Connector errors
        assert_tag!(ErrorCode::ManagedConnectorError, 5000, 0);
        assert_tag!(ErrorCode::ManagedConnectorNotFound, 5001, 0);
        assert_tag!(ErrorCode::ManagedConnectorAlreadyExists, 5002, 0);

        // SmartModule errors (unit variant only)
        assert_tag!(ErrorCode::SmartModuleError, 6000, 0);

        // TableFormat errors
        assert_tag!(ErrorCode::TableFormatError, 7000, 0);
        assert_tag!(ErrorCode::TableFormatNotFound, 7001, 0);
        assert_tag!(ErrorCode::TableFormatAlreadyExists, 7002, 0);

        // DerivedStream errors (unit variants only)
        assert_tag!(ErrorCode::DerivedStreamObjectError, 8000, 0);
        assert_tag!(ErrorCode::DerivedStreamJoinFetchError, 8002, 0);

        // Compression errors
        assert_tag!(ErrorCode::CompressionError, 9000, 0);
    }

    /// Compile-time check that `ErrorCode` implements `std::error::Error`.
    #[test]
    fn test_errorcode_impls_error() {
        fn assert_error<E: std::error::Error>() {}
        assert_error::<ErrorCode>();
    }
}