1use crate::proto::ErrorResponse;
19use std::fmt::{Debug, Display, Formatter};
20
/// Error reported for an API request: a numeric error code paired with a
/// human-readable description.
pub struct ApiError {
    /// Numeric error code; [`FlussError::for_code`] resolves it to a known variant.
    pub code: i32,
    /// Human-readable description of the failure.
    pub message: String,
}
26
27impl Debug for ApiError {
28 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
29 f.debug_struct("ApiError")
30 .field("code", &self.code)
31 .field("message", &self.message)
32 .finish()
33 }
34}
35
36impl Display for ApiError {
37 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
38 Debug::fmt(self, f)
39 }
40}
41
42impl ApiError {
43 pub fn is_retriable(&self) -> bool {
45 FlussError::for_code(self.code).is_retriable()
46 }
47}
48
/// Catalogue of known error kinds, each pinned to a fixed numeric code.
///
/// `#[repr(i32)]` together with the explicit discriminants makes `variant as i32`
/// yield that variant's code. Codes start at `-1` and run contiguously through `57`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[repr(i32)]
pub enum FlussError {
    /// The server hit an unexpected error while processing the request.
    UnknownServerError = -1,
    /// No error.
    None = 0,
    /// The server disconnected before a response was received.
    NetworkException = 1,
    /// The API version is not supported.
    UnsupportedVersion = 2,
    /// The message failed its CRC checksum, exceeds the valid size, has a null key
    /// for a primary key table, or is otherwise corrupt.
    CorruptMessage = 3,
    /// The database does not exist.
    DatabaseNotExist = 4,
    /// The database is not empty.
    DatabaseNotEmpty = 5,
    /// The database already exists.
    DatabaseAlreadyExist = 6,
    /// The table does not exist.
    TableNotExist = 7,
    /// The table already exists.
    TableAlreadyExist = 8,
    /// The schema does not exist.
    SchemaNotExist = 9,
    /// Failure while storing log data on the server.
    LogStorageException = 10,
    /// Failure while storing kv data on the server.
    KvStorageException = 11,
    /// The contacted server is neither leader nor follower.
    NotLeaderOrFollower = 12,
    /// The record is too large.
    RecordTooLargeException = 13,
    /// The record is corrupt.
    CorruptRecordException = 14,
    /// An operation was attempted on an invalid table.
    InvalidTableException = 15,
    /// An operation was attempted on an invalid database.
    InvalidDatabaseException = 16,
    /// The replication factor exceeds the number of available tablet servers.
    InvalidReplicationFactor = 17,
    /// A produce request specified an invalid value for required acks.
    InvalidRequiredAcks = 18,
    /// The log offset is out of range.
    LogOffsetOutOfRangeException = 19,
    /// The table is not a primary key table.
    NonPrimaryKeyTableException = 20,
    /// The table or bucket does not exist.
    UnknownTableOrBucketException = 21,
    /// The update version is invalid.
    InvalidUpdateVersionException = 22,
    /// The coordinator is invalid.
    InvalidCoordinatorException = 23,
    /// The leader epoch is invalid.
    FencedLeaderEpochException = 24,
    /// The request timed out.
    RequestTimeOut = 25,
    /// General storage failure.
    StorageException = 26,
    /// The server did not attempt to execute the operation.
    OperationNotAttemptedException = 27,
    /// Records were written, but to fewer in-sync replicas than required.
    NotEnoughReplicasAfterAppendException = 28,
    /// Messages were rejected because there are fewer in-sync replicas than required.
    NotEnoughReplicasException = 29,
    /// Failure while obtaining a file access security token.
    SecurityTokenException = 30,
    /// The tablet server received an out-of-order sequence batch.
    OutOfOrderSequenceException = 31,
    /// The tablet server received a duplicate sequence batch.
    DuplicateSequenceException = 32,
    /// The tablet server could not locate the writer metadata.
    UnknownWriterIdException = 33,
    /// The requested column projection is invalid.
    InvalidColumnProjection = 34,
    /// The requested target column to write is invalid.
    InvalidTargetColumn = 35,
    /// The partition does not exist.
    PartitionNotExists = 36,
    /// The table is not partitioned.
    TableNotPartitionedException = 37,
    /// The timestamp is invalid.
    InvalidTimestampException = 38,
    /// The config is invalid.
    InvalidConfigException = 39,
    /// The lake storage is not configured.
    LakeStorageNotConfiguredException = 40,
    /// The kv snapshot does not exist.
    KvSnapshotNotExist = 41,
    /// The partition already exists.
    PartitionAlreadyExists = 42,
    /// The partition spec is invalid.
    PartitionSpecInvalidException = 43,
    /// No leader is currently available for the given partition.
    LeaderNotAvailableException = 44,
    /// The maximum number of partitions was exceeded.
    PartitionMaxNumException = 45,
    /// Authentication failed.
    AuthenticateException = 46,
    /// Security is disabled.
    SecurityDisabledException = 47,
    /// Authorization failed.
    AuthorizationException = 48,
    /// The maximum number of buckets was exceeded.
    BucketMaxNumException = 49,
    /// The tiering epoch is invalid.
    FencedTieringEpochException = 50,
    /// Authentication failed with a retriable exception.
    RetriableAuthenticateException = 51,
    /// The server rack info is invalid.
    InvalidServerRackInfoException = 52,
    /// The lake snapshot does not exist.
    LakeSnapshotNotExist = 53,
    /// The lake table already exists.
    LakeTableAlreadyExist = 54,
    /// The new ISR contains at least one ineligible replica.
    IneligibleReplicaException = 55,
    /// The alter table request is invalid.
    InvalidAlterTableException = 56,
    /// Deletion operations are disabled on the table.
    DeletionDisabledException = 57,
}
175
176impl FlussError {
177 pub fn code(&self) -> i32 {
179 *self as i32
180 }
181
182 pub fn is_retriable(&self) -> bool {
183 matches!(
184 self,
185 FlussError::NetworkException
186 | FlussError::CorruptMessage
187 | FlussError::SchemaNotExist
188 | FlussError::LogStorageException
189 | FlussError::KvStorageException
190 | FlussError::NotLeaderOrFollower
191 | FlussError::CorruptRecordException
192 | FlussError::UnknownTableOrBucketException
193 | FlussError::RequestTimeOut
194 | FlussError::StorageException
195 | FlussError::NotEnoughReplicasAfterAppendException
196 | FlussError::NotEnoughReplicasException
197 | FlussError::LeaderNotAvailableException
198 )
199 }
200
201 pub fn message(&self) -> &'static str {
203 match self {
204 FlussError::UnknownServerError => {
205 "The server experienced an unexpected error when processing the request."
206 }
207 FlussError::None => "No error",
208 FlussError::NetworkException => {
209 "The server disconnected before a response was received."
210 }
211 FlussError::UnsupportedVersion => "The version of API is not supported.",
212 FlussError::CorruptMessage => {
213 "This message has failed its CRC checksum, exceeds the valid size, has a null key for a primary key table, or is otherwise corrupt."
214 }
215 FlussError::DatabaseNotExist => "The database does not exist.",
216 FlussError::DatabaseNotEmpty => "The database is not empty.",
217 FlussError::DatabaseAlreadyExist => "The database already exists.",
218 FlussError::TableNotExist => "The table does not exist.",
219 FlussError::TableAlreadyExist => "The table already exists.",
220 FlussError::SchemaNotExist => "The schema does not exist.",
221 FlussError::LogStorageException => {
222 "Exception occur while storage data for log in server."
223 }
224 FlussError::KvStorageException => {
225 "Exception occur while storage data for kv in server."
226 }
227 FlussError::NotLeaderOrFollower => "Not leader or follower.",
228 FlussError::RecordTooLargeException => "The record is too large.",
229 FlussError::CorruptRecordException => "The record is corrupt.",
230 FlussError::InvalidTableException => {
231 "The client has attempted to perform an operation on an invalid table."
232 }
233 FlussError::InvalidDatabaseException => {
234 "The client has attempted to perform an operation on an invalid database."
235 }
236 FlussError::InvalidReplicationFactor => {
237 "The replication factor is larger then the number of available tablet servers."
238 }
239 FlussError::InvalidRequiredAcks => {
240 "Produce request specified an invalid value for required acks."
241 }
242 FlussError::LogOffsetOutOfRangeException => "The log offset is out of range.",
243 FlussError::NonPrimaryKeyTableException => "The table is not primary key table.",
244 FlussError::UnknownTableOrBucketException => "The table or bucket does not exist.",
245 FlussError::InvalidUpdateVersionException => "The update version is invalid.",
246 FlussError::InvalidCoordinatorException => "The coordinator is invalid.",
247 FlussError::FencedLeaderEpochException => "The leader epoch is invalid.",
248 FlussError::RequestTimeOut => "The request time out.",
249 FlussError::StorageException => "The general storage exception.",
250 FlussError::OperationNotAttemptedException => {
251 "The server did not attempt to execute this operation."
252 }
253 FlussError::NotEnoughReplicasAfterAppendException => {
254 "Records are written to the server already, but to fewer in-sync replicas than required."
255 }
256 FlussError::NotEnoughReplicasException => {
257 "Messages are rejected since there are fewer in-sync replicas than required."
258 }
259 FlussError::SecurityTokenException => "Get file access security token exception.",
260 FlussError::OutOfOrderSequenceException => {
261 "The tablet server received an out of order sequence batch."
262 }
263 FlussError::DuplicateSequenceException => {
264 "The tablet server received a duplicate sequence batch."
265 }
266 FlussError::UnknownWriterIdException => {
267 "This exception is raised by the tablet server if it could not locate the writer metadata."
268 }
269 FlussError::InvalidColumnProjection => "The requested column projection is invalid.",
270 FlussError::InvalidTargetColumn => "The requested target column to write is invalid.",
271 FlussError::PartitionNotExists => "The partition does not exist.",
272 FlussError::TableNotPartitionedException => "The table is not partitioned.",
273 FlussError::InvalidTimestampException => "The timestamp is invalid.",
274 FlussError::InvalidConfigException => "The config is invalid.",
275 FlussError::LakeStorageNotConfiguredException => "The lake storage is not configured.",
276 FlussError::KvSnapshotNotExist => "The kv snapshot does not exist.",
277 FlussError::PartitionAlreadyExists => "The partition already exists.",
278 FlussError::PartitionSpecInvalidException => "The partition spec is invalid.",
279 FlussError::LeaderNotAvailableException => {
280 "There is no currently available leader for the given partition."
281 }
282 FlussError::PartitionMaxNumException => "Exceed the maximum number of partitions.",
283 FlussError::AuthenticateException => "Authentication failed.",
284 FlussError::SecurityDisabledException => "Security is disabled.",
285 FlussError::AuthorizationException => "Authorization failed.",
286 FlussError::BucketMaxNumException => "Exceed the maximum number of buckets.",
287 FlussError::FencedTieringEpochException => "The tiering epoch is invalid.",
288 FlussError::RetriableAuthenticateException => {
289 "Authentication failed with retriable exception."
290 }
291 FlussError::InvalidServerRackInfoException => "The server rack info is invalid.",
292 FlussError::LakeSnapshotNotExist => "The lake snapshot does not exist.",
293 FlussError::LakeTableAlreadyExist => "The lake table already exists.",
294 FlussError::IneligibleReplicaException => {
295 "The new ISR contains at least one ineligible replica."
296 }
297 FlussError::InvalidAlterTableException => "The alter table is invalid.",
298 FlussError::DeletionDisabledException => {
299 "Deletion operations are disabled on this table."
300 }
301 }
302 }
303
304 pub fn to_api_error(&self, message: Option<String>) -> ApiError {
306 ApiError {
307 code: self.code(),
308 message: message.unwrap_or(self.message().to_string()),
309 }
310 }
311
312 pub fn for_code(code: i32) -> Self {
315 match code {
316 -1 => FlussError::UnknownServerError,
317 0 => FlussError::None,
318 1 => FlussError::NetworkException,
319 2 => FlussError::UnsupportedVersion,
320 3 => FlussError::CorruptMessage,
321 4 => FlussError::DatabaseNotExist,
322 5 => FlussError::DatabaseNotEmpty,
323 6 => FlussError::DatabaseAlreadyExist,
324 7 => FlussError::TableNotExist,
325 8 => FlussError::TableAlreadyExist,
326 9 => FlussError::SchemaNotExist,
327 10 => FlussError::LogStorageException,
328 11 => FlussError::KvStorageException,
329 12 => FlussError::NotLeaderOrFollower,
330 13 => FlussError::RecordTooLargeException,
331 14 => FlussError::CorruptRecordException,
332 15 => FlussError::InvalidTableException,
333 16 => FlussError::InvalidDatabaseException,
334 17 => FlussError::InvalidReplicationFactor,
335 18 => FlussError::InvalidRequiredAcks,
336 19 => FlussError::LogOffsetOutOfRangeException,
337 20 => FlussError::NonPrimaryKeyTableException,
338 21 => FlussError::UnknownTableOrBucketException,
339 22 => FlussError::InvalidUpdateVersionException,
340 23 => FlussError::InvalidCoordinatorException,
341 24 => FlussError::FencedLeaderEpochException,
342 25 => FlussError::RequestTimeOut,
343 26 => FlussError::StorageException,
344 27 => FlussError::OperationNotAttemptedException,
345 28 => FlussError::NotEnoughReplicasAfterAppendException,
346 29 => FlussError::NotEnoughReplicasException,
347 30 => FlussError::SecurityTokenException,
348 31 => FlussError::OutOfOrderSequenceException,
349 32 => FlussError::DuplicateSequenceException,
350 33 => FlussError::UnknownWriterIdException,
351 34 => FlussError::InvalidColumnProjection,
352 35 => FlussError::InvalidTargetColumn,
353 36 => FlussError::PartitionNotExists,
354 37 => FlussError::TableNotPartitionedException,
355 38 => FlussError::InvalidTimestampException,
356 39 => FlussError::InvalidConfigException,
357 40 => FlussError::LakeStorageNotConfiguredException,
358 41 => FlussError::KvSnapshotNotExist,
359 42 => FlussError::PartitionAlreadyExists,
360 43 => FlussError::PartitionSpecInvalidException,
361 44 => FlussError::LeaderNotAvailableException,
362 45 => FlussError::PartitionMaxNumException,
363 46 => FlussError::AuthenticateException,
364 47 => FlussError::SecurityDisabledException,
365 48 => FlussError::AuthorizationException,
366 49 => FlussError::BucketMaxNumException,
367 50 => FlussError::FencedTieringEpochException,
368 51 => FlussError::RetriableAuthenticateException,
369 52 => FlussError::InvalidServerRackInfoException,
370 53 => FlussError::LakeSnapshotNotExist,
371 54 => FlussError::LakeTableAlreadyExist,
372 55 => FlussError::IneligibleReplicaException,
373 56 => FlussError::InvalidAlterTableException,
374 57 => FlussError::DeletionDisabledException,
375 _ => FlussError::UnknownServerError,
376 }
377 }
378}
379
380impl Display for FlussError {
381 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
382 write!(f, "{}", self.message())
383 }
384}
385
386impl From<ErrorResponse> for ApiError {
387 fn from(error_response: ErrorResponse) -> Self {
388 let fluss_error = FlussError::for_code(error_response.error_code);
389 fluss_error.to_api_error(error_response.error_message)
390 }
391}
392
393impl From<ApiError> for FlussError {
394 fn from(api_error: ApiError) -> Self {
395 FlussError::for_code(api_error.code)
396 }
397}
398
#[cfg(test)]
mod tests {
    use super::*;

    /// Known codes resolve to their variant; an unmapped code falls back to
    /// `UnknownServerError`.
    #[test]
    fn for_code_maps_known_and_unknown() {
        let authorization = FlussError::AuthorizationException;

        assert_eq!(FlussError::for_code(0), FlussError::None);
        assert_eq!(FlussError::for_code(authorization.code()), authorization);
        assert_eq!(FlussError::for_code(9999), FlussError::UnknownServerError);
    }

    /// Without an explicit message the default text of the variant is used.
    #[test]
    fn to_api_error_uses_message() {
        let kind = FlussError::InvalidTableException;
        let err = kind.to_api_error(None);

        assert_eq!(err.code, kind.code());
        assert!(err.message.contains("invalid table"));
    }

    /// `ErrorResponse` -> `ApiError` -> `FlussError` keeps both the code and the
    /// server-supplied message.
    #[test]
    fn error_response_conversion_round_trip() {
        let expected_code = FlussError::TableNotExist.code();
        let response = ErrorResponse {
            error_code: expected_code,
            error_message: Some("missing".to_string()),
        };

        let api_error: ApiError = response.into();
        assert_eq!(api_error.code, expected_code);
        assert_eq!(api_error.message, "missing");

        let fluss_error: FlussError = api_error.into();
        assert_eq!(fluss_error, FlussError::TableNotExist);
    }

    /// Every variant on the retriable list must report `true`.
    #[test]
    fn is_retriable_known_retriable_errors() {
        let retriable = [
            FlussError::NetworkException,
            FlussError::CorruptMessage,
            FlussError::SchemaNotExist,
            FlussError::LogStorageException,
            FlussError::KvStorageException,
            FlussError::NotLeaderOrFollower,
            FlussError::CorruptRecordException,
            FlussError::UnknownTableOrBucketException,
            FlussError::RequestTimeOut,
            FlussError::StorageException,
            FlussError::NotEnoughReplicasAfterAppendException,
            FlussError::NotEnoughReplicasException,
            FlussError::LeaderNotAvailableException,
        ];

        for err in retriable.iter() {
            assert!(err.is_retriable(), "{err:?} should be retriable");
        }
    }

    /// A sample of variants outside the retriable list must report `false`.
    #[test]
    fn is_retriable_known_non_retriable_errors() {
        let non_retriable = [
            FlussError::UnknownServerError,
            FlussError::None,
            FlussError::TableNotExist,
            FlussError::AuthenticateException,
            FlussError::AuthorizationException,
            FlussError::RecordTooLargeException,
            FlussError::DeletionDisabledException,
            FlussError::InvalidCoordinatorException,
            FlussError::FencedLeaderEpochException,
            FlussError::FencedTieringEpochException,
            FlussError::RetriableAuthenticateException,
        ];

        for err in non_retriable.iter() {
            assert!(!err.is_retriable(), "{err:?} should not be retriable");
        }
    }

    /// `ApiError::is_retriable` answers exactly as the underlying `FlussError` does.
    #[test]
    fn api_error_is_retriable_delegates_to_fluss_error() {
        let retriable_api = FlussError::RequestTimeOut.to_api_error(None);
        let permanent_api = FlussError::TableNotExist.to_api_error(None);

        assert!(retriable_api.is_retriable());
        assert!(!permanent_api.is_retriable());
    }
}