fluvio_dataplane_protocol/
error_code.rs1use std::fmt::{Display, Formatter};
8use flv_util::string_helper::upper_cammel_case_to_sentence;
9use fluvio_protocol::{Encoder, Decoder};
10use crate::smartmodule::SmartModuleRuntimeError;
11
12#[repr(i16)]
17#[derive(thiserror::Error, Encoder, Decoder, PartialEq, Debug, Clone)]
18#[non_exhaustive]
19pub enum ErrorCode {
20 #[fluvio(tag = -1)]
21 #[error("An unknown server error occurred")]
22 UnknownServerError,
23
24 #[fluvio(tag = 0)]
26 #[error("ErrorCode indicated success. If you see this it is likely a bug.")]
27 None,
28
29 #[fluvio(tag = 2)]
30 #[error("Other error: {0}")]
31 Other(String),
32
33 #[fluvio(tag = 1)]
34 #[error("Offset out of range")]
35 OffsetOutOfRange,
36 #[fluvio(tag = 6)]
37 #[error("the given SPU is not the leader for the partition")]
38 NotLeaderForPartition,
39 #[fluvio(tag = 7)]
40 #[error("the request '{kind}' exceeded the timeout {timeout_ms} ms")]
41 RequestTimedOut { timeout_ms: u64, kind: RequestKind },
42 #[fluvio(tag = 10)]
43 #[error("the message is too large to send")]
44 MessageTooLarge,
45 #[fluvio(tag = 13)]
46 #[error("permission denied")]
47 PermissionDenied,
48 #[fluvio(tag = 56)]
49 #[error("a storage error occurred")]
50 StorageError,
51 #[fluvio(tag = 60)]
52 #[error("invalid create request")]
53 InvalidCreateRequest,
54 #[fluvio(tag = 61)]
55 #[error("invalid Delete request")]
56 InvalidDeleteRequest,
57
58 #[fluvio(tag = 1000)]
60 #[error("an error occurred on the SPU")]
61 SpuError,
62 #[fluvio(tag = 1001)]
63 #[error("failed to register an SPU")]
64 SpuRegisterationFailed,
65 #[fluvio(tag = 1002)]
66 #[error("the SPU is offline")]
67 SpuOffline,
68 #[fluvio(tag = 1003)]
69 #[error("the SPU was not found")]
70 SpuNotFound,
71 #[fluvio(tag = 1004)]
72 #[error("the SPU already exists")]
73 SpuAlreadyExists,
74
75 #[fluvio(tag = 2000)]
77 #[error("a topic error occurred")]
78 TopicError,
79 #[fluvio(tag = 2001)]
80 #[error("the topic was not found")]
81 TopicNotFound,
82 #[fluvio(tag = 2002)]
83 #[error("the topic already exists")]
84 TopicAlreadyExists,
85 #[fluvio(tag = 2003)]
86 #[error("the topic has not been initialized")]
87 TopicPendingInitialization,
88 #[fluvio(tag = 2004)]
89 #[error("the topic configuration is invalid")]
90 TopicInvalidConfiguration,
91 #[fluvio(tag = 2005)]
92 #[error("the topic is not provisioned")]
93 TopicNotProvisioned,
94 #[fluvio(tag = 2006)]
95 #[error("the topic name is invalid")]
96 TopicInvalidName,
97
98 #[fluvio(tag = 3000)]
100 #[error("the partition is not initialized")]
101 PartitionPendingInitialization,
102 #[fluvio(tag = 3001)]
103 #[error("the partition is not a leader")]
104 PartitionNotLeader,
105
106 #[fluvio(tag = 3002)]
108 #[error("the fetch session was not found")]
109 FetchSessionNotFoud,
110
111 #[deprecated(since = "0.9.13")]
113 #[fluvio(tag = 4000)]
114 #[error("a legacy SmartModule error occurred")]
115 LegacySmartModuleError(#[from] LegacySmartModuleError),
116
117 #[fluvio(tag = 5000)]
119 #[error("an error occurred while managing a connector")]
120 ManagedConnectorError,
121
122 #[fluvio(tag = 5001)]
123 #[error("the managed connector was not found")]
124 ManagedConnectorNotFound,
125
126 #[fluvio(tag = 5002)]
127 #[error("an error occurred while managing a connector")]
128 ManagedConnectorAlreadyExists,
129
130 #[fluvio(tag = 6000)]
132 #[error("an error occurred while managing a SmartModule")]
133 SmartModuleError,
134 #[fluvio(tag = 6001)]
135 #[error("SmartModule {name} was not found")]
136 SmartModuleNotFound { name: String },
137 #[fluvio(tag = 6002)]
138 #[error("SmartModule is invalid: {error}")]
139 SmartModuleInvalid { error: String, name: Option<String> },
140 #[fluvio(tag = 6003)]
141 #[error("SmartModule is not a valid '{kind}' SmartModule due to {error}. Are you missing a #[smartmodule({kind})] attribute?")]
142 SmartModuleInvalidExports { error: String, kind: String },
143 #[fluvio(tag = 6004)]
144 #[error("SmartModule runtime error {0}")]
145 SmartModuleRuntimeError(SmartModuleRuntimeError),
146
147 #[fluvio(tag = 7000)]
149 #[error("a tableformat error occurred")]
150 TableFormatError,
151 #[fluvio(tag = 7001)]
152 #[error("the tableformat was not found")]
153 TableFormatNotFound,
154 #[fluvio(tag = 7002)]
155 #[error("the tableformat already exists")]
156 TableFormatAlreadyExists,
157
158 #[fluvio(tag = 8000)]
160 #[error("DerivedStream object error")]
161 DerivedStreamObjectError,
162 #[fluvio(tag = 8001)]
163 #[error("the derivedstream was not found")]
164 DerivedStreamNotFound(String),
165 #[fluvio(tag = 8002)]
166 #[error("the derivedstream join data cannot be fetched")]
167 DerivedStreamJoinFetchError,
168 #[fluvio(tag = 8003)]
169 #[error("the derivedstream {0} is invalid")]
170 DerivedStreamInvalid(String),
171 #[error("can't do recursive derivedstream yet: {0}->{1}")]
172 DerivedStreamRecursion(String, String),
173
174 #[fluvio(tag = 9000)]
176 #[error("a compression error occurred in the SPU")]
177 CompressionError,
178}
179
180impl Default for ErrorCode {
181 fn default() -> ErrorCode {
182 ErrorCode::None
183 }
184}
185
186impl ErrorCode {
187 pub fn is_ok(&self) -> bool {
188 matches!(self, ErrorCode::None)
189 }
190
191 pub fn to_sentence(&self) -> String {
192 match self {
193 ErrorCode::None => "".to_owned(),
194 _ => upper_cammel_case_to_sentence(format!("{:?}", self), true),
195 }
196 }
197
198 pub fn is_error(&self) -> bool {
199 !self.is_ok()
200 }
201}
202
203#[derive(thiserror::Error, Debug, Clone, PartialEq, Encoder, Decoder)]
205pub enum LegacySmartModuleError {
206 #[error("Runtime error")]
207 Runtime(#[from] SmartModuleRuntimeError),
208 #[error("WASM Module error: {0}")]
209 InvalidWasmModule(String),
210 #[error("WASM module is not a valid '{0}' DerivedStream. Are you missing a #[smartmodule({0})] attribute?")]
211 InvalidDerivedStreamModule(String),
212}
213
214impl Default for LegacySmartModuleError {
215 fn default() -> Self {
216 Self::Runtime(Default::default())
217 }
218}
219
220#[derive(Debug, Clone, PartialEq, Encoder, Decoder)]
221#[non_exhaustive]
222pub enum RequestKind {
223 Produce,
224}
225
226impl Default for RequestKind {
227 fn default() -> Self {
228 RequestKind::Produce
229 }
230}
231
232impl Display for RequestKind {
233 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
234 write!(f, "{:?}", self)
235 }
236}
237
238#[cfg(test)]
243mod test {
244 use super::*;
245
246 #[test]
247 fn test_protocol_tags_unchanged() {
248 macro_rules! assert_tag {
249 ($variant:expr, $tag:expr, $version:expr) => {{
250 let mut data = Vec::new();
251 let mut value = ErrorCode::default();
252
253 fluvio_protocol::Encoder::encode(&$variant, &mut data, $version)
254 .expect(&format!("Failed to encode {}", stringify!($variant)));
255 assert_eq!(
256 data[..2],
257 ($tag as i16).to_be_bytes(),
258 "Data check failed for {}",
259 stringify!($variant)
260 );
261 fluvio_protocol::Decoder::decode(
262 &mut value,
263 &mut std::io::Cursor::new(&data),
264 $version,
265 )
266 .expect(&format!("Failed to decode {}", stringify!($variant)));
267 assert_eq!(
268 &value,
269 &$variant,
270 "Value check failed for {}",
271 stringify!($variant)
272 );
273 }};
274 }
275
276 assert_tag!(ErrorCode::UnknownServerError, -1, 0);
277 assert_tag!(ErrorCode::None, 0, 0);
278 assert_tag!(ErrorCode::OffsetOutOfRange, 1, 0);
279 assert_tag!(ErrorCode::NotLeaderForPartition, 6, 0);
280 assert_tag!(
281 ErrorCode::RequestTimedOut {
282 kind: RequestKind::Produce,
283 timeout_ms: 1
284 },
285 7,
286 0
287 );
288 assert_tag!(ErrorCode::MessageTooLarge, 10, 0);
289 assert_tag!(ErrorCode::PermissionDenied, 13, 0);
290 assert_tag!(ErrorCode::StorageError, 56, 0);
291
292 assert_tag!(ErrorCode::SpuError, 1000, 0);
294 assert_tag!(ErrorCode::SpuRegisterationFailed, 1001, 0);
295 assert_tag!(ErrorCode::SpuOffline, 1002, 0);
296 assert_tag!(ErrorCode::SpuNotFound, 1003, 0);
297 assert_tag!(ErrorCode::SpuAlreadyExists, 1004, 0);
298
299 assert_tag!(ErrorCode::TopicError, 2000, 0);
301 assert_tag!(ErrorCode::TopicNotFound, 2001, 0);
302 assert_tag!(ErrorCode::TopicAlreadyExists, 2002, 0);
303 assert_tag!(ErrorCode::TopicPendingInitialization, 2003, 0);
304 assert_tag!(ErrorCode::TopicInvalidConfiguration, 2004, 0);
305 assert_tag!(ErrorCode::TopicNotProvisioned, 2005, 0);
306
307 assert_tag!(ErrorCode::PartitionPendingInitialization, 3000, 0);
309 assert_tag!(ErrorCode::PartitionNotLeader, 3001, 0);
310
311 assert_tag!(ErrorCode::FetchSessionNotFoud, 3002, 0);
313 }
314
315 #[test]
316 fn test_errorcode_impls_error() {
317 fn assert_error<E: std::error::Error>() {}
318 assert_error::<ErrorCode>();
319 }
320}